Pull Plasma from Apache Arrow and remove Plasma store from Ray. (#692)

* Rebase Ray on top of Plasma in Apache Arrow

* add thirdparty building scripts

* use rebased arrow

* fix

* fix build

* fix python visibility

* comment out C tests for now

* fix multithreading

* fix

* reduce logging

* fix plasma manager multithreading

* make sure old and new object IDs can coexist peacefully

* more rebasing

* update

* fixes

* fix

* install pyarrow

* install cython

* fix

* install newer cmake

* fix

* rebase on top of latest arrow

* get runtest.py to run locally (this required commenting out a test)

* work on plasma tests

* more fixes

* fix local scheduler tests

* fix global scheduler test

* more fixes

* fix python 3 bytes vs string

* fix manager tests valgrind

* fix documentation building

* fix linting

* fix c++ linting

* fix linting

* add tests back in

* Install without sudo.

* Set PKG_CONFIG_PATH in build.sh so that Ray can find plasma.

* Install pkg-config

* Link -lpthread, note that find_package(Threads) doesn't seem to work reliably.

* Re-enable (uncomment) testGPUIDs in runtest.py.

* Set PKG_CONFIG_PATH when building pyarrow.

* Pull apache/arrow and not pcmoritz/arrow.

* Fix installation in docker image.

* adapt to changes of the plasma api

* Fix installation of pyarrow module.

* Fix linting.

* Use correct python executable to build pyarrow.
This commit is contained in:
Philipp Moritz
2017-07-31 21:04:15 -07:00
committed by Robert Nishihara
parent dfcd399dbb
commit c3b39b4d86
64 changed files with 470 additions and 5761 deletions
+1 -1
View File
@@ -465,7 +465,7 @@ class GlobalState(object):
**params)
for (event, score) in event_list:
event_dict = json.loads(event)
event_dict = json.loads(event.decode())
task_id = ""
for event in event_dict:
if "task_id" in event[3]:
+12 -10
View File
@@ -10,6 +10,7 @@ import sys
import time
import unittest
import pyarrow as pa
import ray.global_scheduler as global_scheduler
import ray.local_scheduler as local_scheduler
import ray.plasma as plasma
@@ -87,8 +88,8 @@ class TestGlobalScheduler(unittest.TestCase):
self.plasma_manager_pids.append(p3)
plasma_address = "{}:{}".format(self.node_ip_address,
plasma_manager_port)
plasma_client = plasma.PlasmaClient(plasma_store_name,
plasma_manager_name)
plasma_client = pa.plasma.connect(plasma_store_name,
plasma_manager_name, 64)
self.plasma_clients.append(plasma_client)
# Start the local scheduler.
local_scheduler_name, p4 = local_scheduler.start_local_scheduler(
@@ -203,9 +204,10 @@ class TestGlobalScheduler(unittest.TestCase):
# Sleep before submitting task to local scheduler.
time.sleep(0.1)
# Submit a task to Redis.
task = local_scheduler.Task(random_driver_id(), random_function_id(),
[local_scheduler.ObjectID(object_dep)],
num_return_vals[0], random_task_id(), 0)
task = local_scheduler.Task(
random_driver_id(), random_function_id(),
[local_scheduler.ObjectID(object_dep.binary())],
num_return_vals[0], random_task_id(), 0)
self.local_scheduler_clients[0].submit(task)
time.sleep(0.1)
# There should now be a task in Redis, and it should get assigned to
@@ -256,11 +258,11 @@ class TestGlobalScheduler(unittest.TestCase):
# Give 10ms for object info handler to fire (long enough to
# yield CPU).
time.sleep(0.010)
task = local_scheduler.Task(random_driver_id(),
random_function_id(),
[local_scheduler.ObjectID(object_dep)],
num_return_vals[0], random_task_id(),
0)
task = local_scheduler.Task(
random_driver_id(),
random_function_id(),
[local_scheduler.ObjectID(object_dep.binary())],
num_return_vals[0], random_task_id(), 0)
self.local_scheduler_clients[0].submit(task)
# Check that there are the correct number of tasks in Redis and that
# they all get assigned to the local scheduler.
+14 -13
View File
@@ -12,6 +12,7 @@ import unittest
import ray.local_scheduler as local_scheduler
import ray.plasma as plasma
import pyarrow as pa
USE_VALGRIND = False
ID_SIZE = 20
@@ -41,8 +42,7 @@ class TestLocalSchedulerClient(unittest.TestCase):
def setUp(self):
# Start Plasma store.
plasma_store_name, self.p1 = plasma.start_plasma_store()
self.plasma_client = plasma.PlasmaClient(plasma_store_name,
release_delay=0)
self.plasma_client = pa.plasma.connect(plasma_store_name, "", 0)
# Start a local scheduler.
scheduler_name, self.p2 = local_scheduler.start_local_scheduler(
plasma_store_name, use_valgrind=USE_VALGRIND)
@@ -72,8 +72,8 @@ class TestLocalSchedulerClient(unittest.TestCase):
# Create and seal the objects in the object store so that we can
# schedule all of the subsequent tasks.
for object_id in object_ids:
self.plasma_client.create(object_id.id(), 0)
self.plasma_client.seal(object_id.id())
self.plasma_client.create(pa.plasma.ObjectID(object_id.id()), 0)
self.plasma_client.seal(pa.plasma.ObjectID(object_id.id()))
# Define some arguments to use for the tasks.
args_list = [
[],
@@ -153,8 +153,8 @@ class TestLocalSchedulerClient(unittest.TestCase):
time.sleep(0.1)
# Create and seal the object ID in the object store. This should
# trigger a scheduling event.
self.plasma_client.create(object_id.id(), 0)
self.plasma_client.seal(object_id.id())
self.plasma_client.create(pa.plasma.ObjectID(object_id.id()), 0)
self.plasma_client.seal(pa.plasma.ObjectID(object_id.id()))
# Wait until the thread finishes so that we know the task was
# scheduled.
t.join()
@@ -175,8 +175,8 @@ class TestLocalSchedulerClient(unittest.TestCase):
t.start()
# Make one of the dependencies available.
buf = self.plasma_client.create(object_id1.id(), 1)
self.plasma_client.seal(object_id1.id())
buf = self.plasma_client.create(pa.plasma.ObjectID(object_id1.id()), 1)
self.plasma_client.seal(pa.plasma.ObjectID(object_id1.id()))
# Release the object.
del buf
# Check that the thread is still waiting for a task.
@@ -188,23 +188,24 @@ class TestLocalSchedulerClient(unittest.TestCase):
time.sleep(0.1)
self.assertTrue(t.is_alive())
# Check that the first object dependency was evicted.
object1 = self.plasma_client.get([object_id1.id()], timeout_ms=0)
object1 = self.plasma_client.get([pa.plasma.ObjectID(object_id1.id())],
timeout_ms=0)
self.assertEqual(object1, [None])
# Check that the thread is still waiting for a task.
time.sleep(0.1)
self.assertTrue(t.is_alive())
# Create the second dependency.
self.plasma_client.create(object_id2.id(), 1)
self.plasma_client.seal(object_id2.id())
self.plasma_client.create(pa.plasma.ObjectID(object_id2.id()), 1)
self.plasma_client.seal(pa.plasma.ObjectID(object_id2.id()))
# Check that the thread is still waiting for a task.
time.sleep(0.1)
self.assertTrue(t.is_alive())
# Create the first dependency again. Both dependencies are now
# available.
self.plasma_client.create(object_id1.id(), 1)
self.plasma_client.seal(object_id1.id())
self.plasma_client.create(pa.plasma.ObjectID(object_id1.id()), 1)
self.plasma_client.seal(pa.plasma.ObjectID(object_id1.id()))
# Wait until the thread finishes so that we know the task was
# scheduled.
+2 -7
View File
@@ -2,13 +2,8 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.plasma.plasma import (PlasmaBuffer, buffers_equal, PlasmaClient,
start_plasma_store, start_plasma_manager,
plasma_object_exists_error,
plasma_out_of_memory_error,
from ray.plasma.plasma import (start_plasma_store, start_plasma_manager,
DEFAULT_PLASMA_STORE_MEMORY)
__all__ = ["PlasmaBuffer", "buffers_equal", "PlasmaClient",
"start_plasma_store", "start_plasma_manager",
"plasma_object_exists_error", "plasma_out_of_memory_error",
__all__ = ["start_plasma_store", "start_plasma_manager",
"DEFAULT_PLASMA_STORE_MEMORY"]
+1 -302
View File
@@ -5,315 +5,14 @@ from __future__ import print_function
import os
import random
import subprocess
import sys
import time
import ray.core.src.plasma.libplasma as libplasma
from ray.core.src.plasma.libplasma import plasma_object_exists_error
from ray.core.src.plasma.libplasma import plasma_out_of_memory_error
__all__ = ["PlasmaBuffer", "buffers_equal", "PlasmaClient",
"start_plasma_store", "start_plasma_manager",
"plasma_object_exists_error", "plasma_out_of_memory_error",
__all__ = ["start_plasma_store", "start_plasma_manager",
"DEFAULT_PLASMA_STORE_MEMORY"]
PLASMA_WAIT_TIMEOUT = 2 ** 30
class PlasmaBuffer(object):
    """This is the type returned by calls to get with a PlasmaClient.

    We define our own class instead of directly returning a buffer object so
    that we can add a custom destructor which notifies Plasma that the object
    is no longer being used, so the memory in the Plasma store backing the
    object can potentially be freed.

    Attributes:
        buffer (buffer): A buffer containing an object in the Plasma store.
        plasma_id (PlasmaID): The ID of the object in the buffer.
        plasma_client (PlasmaClient): The PlasmaClient that we use to
            communicate with the store and manager.
    """

    def __init__(self, buff, plasma_id, plasma_client):
        """Initialize a PlasmaBuffer."""
        self.buffer = buff
        self.plasma_id = plasma_id
        self.plasma_client = plasma_client

    def __del__(self):
        """Notify Plasma that the object is no longer needed.

        If the plasma client has been shut down, then don't do anything.
        """
        if self.plasma_client.alive:
            libplasma.release(self.plasma_client.conn, self.plasma_id)

    @staticmethod
    def _reject_slice(index):
        """Raise AssertionError if index is a slice.

        We currently don't allow slicing plasma buffers. We should handle
        this better, but it requires some care because the slice may be
        backed by the same memory in the object store, but the original
        plasma buffer may go out of scope causing the memory to no longer be
        accessible.

        The error is raised explicitly (rather than with an assert statement)
        so that the guard is still enforced when Python runs with -O; the
        exception type is unchanged.
        """
        if isinstance(index, slice):
            raise AssertionError("Slicing a PlasmaBuffer is not supported.")

    def __getitem__(self, index):
        """Read from the PlasmaBuffer as if it were just a regular buffer.

        Args:
            index (int): Position to read. Slices are rejected.

        Returns:
            The element at index. On Python 3 the integer read from the
            underlying buffer is converted to a one-character string with
            chr.

        Raises:
            AssertionError: If index is a slice.
        """
        self._reject_slice(index)
        value = self.buffer[index]
        if sys.version_info >= (3, 0):
            value = chr(value)
        return value

    def __setitem__(self, index, value):
        """Write to the PlasmaBuffer as if it were just a regular buffer.

        This should fail because the buffer should be read only.

        Args:
            index (int): Position to write. Slices are rejected.
            value: The element to store. On Python 3 it is converted with
                ord before being written to the underlying buffer.

        Raises:
            AssertionError: If index is a slice.
        """
        self._reject_slice(index)
        if sys.version_info >= (3, 0):
            value = ord(value)
        self.buffer[index] = value

    def __len__(self):
        """Return the length of the buffer."""
        return len(self.buffer)
def buffers_equal(buff1, buff2):
    """Return whether two buffers hold equal contents.

    Either argument may be a PlasmaBuffer, in which case its underlying
    ``buffer`` attribute is compared; any other object is compared as is.

    This method should only be used in the tests. Comparing full slices is
    much faster than comparing element by element, but slicing is not exposed
    on PlasmaBuffer itself because it currently is not safe.
    """
    left, right = [candidate.buffer if isinstance(candidate, PlasmaBuffer)
                   else candidate
                   for candidate in (buff1, buff2)]
    return left[:] == right[:]
class PlasmaClient(object):
    """The PlasmaClient is used to interface with a plasma store and manager.

    The PlasmaClient can ask the PlasmaStore to allocate a new buffer, seal a
    buffer, and get a buffer. Buffers are referred to by object IDs, which are
    strings.
    """

    def __init__(self, store_socket_name, manager_socket_name=None,
                 release_delay=64):
        """Initialize the PlasmaClient.

        Args:
            store_socket_name (str): Name of the socket the plasma store is
                listening at.
            manager_socket_name (str): Name of the socket the plasma manager
                is listening at.
            release_delay (int): The maximum number of objects that the
                client will keep and delay releasing (for caching reasons).
        """
        self.store_socket_name = store_socket_name
        self.manager_socket_name = manager_socket_name
        self.alive = True
        # libplasma.connect is given an empty manager socket name when no
        # manager was specified.
        manager_name = ("" if manager_socket_name is None
                        else manager_socket_name)
        self.conn = libplasma.connect(store_socket_name, manager_name,
                                      release_delay)

    def shutdown(self):
        """Shutdown the client so that it does not send messages.

        If we kill the Plasma store and Plasma manager that this client is
        connected to, then we can use this method to prevent the client from
        trying to send messages to the killed processes.
        """
        if self.alive:
            libplasma.disconnect(self.conn)
        self.alive = False

    def create(self, object_id, size, metadata=None):
        """Create a new buffer in the PlasmaStore for a particular object ID.

        The returned buffer is mutable until seal is called.

        Args:
            object_id (str): A string used to identify an object.
            size (int): The size in bytes of the created buffer.
            metadata (buffer): An optional buffer encoding whatever metadata
                the user wishes to encode.

        Returns:
            A PlasmaBuffer wrapping the newly created buffer.

        Raises:
            plasma_object_exists_error: This exception is raised if the
                object could not be created because there already is an
                object with the same ID in the plasma store.
            plasma_out_of_memory_error: This exception is raised if the
                object could not be created because the plasma store is
                unable to evict enough objects to create room for it.
        """
        # Turn the metadata into the right type.
        metadata = bytearray(b"") if metadata is None else metadata
        buff = libplasma.create(self.conn, object_id, size, metadata)
        return PlasmaBuffer(buff, object_id, self)

    def _get_buffers(self, object_ids, timeout_ms, part):
        """Fetch objects and wrap one element of each result.

        Args:
            object_ids (List[str]): A list of strings used to identify some
                objects.
            timeout_ms (int): Timeout forwarded to libplasma.get.
            part (int): Which element of each per-object result to wrap
                (0 is used by get, 1 by get_metadata).

        Returns:
            A list with one entry per object ID: None where libplasma.get
            returned None, otherwise a PlasmaBuffer.
        """
        results = libplasma.get(self.conn, object_ids, timeout_ms)
        assert len(object_ids) == len(results)
        return [None if result is None
                else PlasmaBuffer(result[part], object_id, self)
                for object_id, result in zip(object_ids, results)]

    def get(self, object_ids, timeout_ms=-1):
        """Create a buffer from the PlasmaStore based on object ID.

        If the object has not been sealed yet, this call will block. The
        retrieved buffer is immutable.

        Args:
            object_ids (List[str]): A list of strings used to identify some
                objects.
            timeout_ms (int): The number of milliseconds that the get call
                should block before timing out and returning. Pass -1 if the
                call should block and 0 if the call should return
                immediately.

        Returns:
            A list with one entry per object ID: a PlasmaBuffer, or None for
            objects that libplasma.get did not return.
        """
        return self._get_buffers(object_ids, timeout_ms, 0)

    def get_metadata(self, object_ids, timeout_ms=-1):
        """Get the metadata buffers of objects in the PlasmaStore.

        If the object has not been sealed yet, this call will block until the
        object has been sealed. The retrieved buffer is immutable.

        Args:
            object_ids (List[str]): A list of strings used to identify some
                objects.
            timeout_ms (int): The number of milliseconds that the get call
                should block before timing out and returning. Pass -1 if the
                call should block and 0 if the call should return
                immediately.

        Returns:
            A list with one entry per object ID: a PlasmaBuffer wrapping the
            second element of the result returned by libplasma.get (get wraps
            the first), or None for objects that were not returned.
        """
        return self._get_buffers(object_ids, timeout_ms, 1)

    def contains(self, object_id):
        """Check if the object is present and has been sealed.

        Args:
            object_id (str): A string used to identify an object.
        """
        return libplasma.contains(self.conn, object_id)

    def hash(self, object_id):
        """Compute the hash of an object in the object store.

        Args:
            object_id (str): A string used to identify an object.

        Returns:
            A digest string object's SHA256 hash. If the object isn't in the
            object store, the string will have length zero.
        """
        return libplasma.hash(self.conn, object_id)

    def seal(self, object_id):
        """Seal the buffer in the PlasmaStore for a particular object ID.

        Once a buffer has been sealed, the buffer is immutable and can only be
        accessed through get.

        Args:
            object_id (str): A string used to identify an object.
        """
        libplasma.seal(self.conn, object_id)

    def delete(self, object_id):
        """Delete the buffer in the PlasmaStore for a particular object ID.

        Once a buffer has been deleted, the buffer is no longer accessible.

        Args:
            object_id (str): A string used to identify an object.
        """
        libplasma.delete(self.conn, object_id)

    def evict(self, num_bytes):
        """Evict objects in order to recover some bytes.

        Recover at least num_bytes bytes if possible.

        Args:
            num_bytes (int): The number of bytes to attempt to recover.

        Returns:
            Whatever libplasma.evict returns.
        """
        return libplasma.evict(self.conn, num_bytes)

    def transfer(self, addr, port, object_id):
        """Transfer local object with id object_id to another plasma instance.

        Args:
            addr (str): IPv4 address of the plasma instance the object is
                sent to.
            port (int): Port number of the plasma instance the object is sent
                to.
            object_id (str): A string used to identify an object.
        """
        return libplasma.transfer(self.conn, object_id, addr, port)

    def fetch(self, object_ids):
        """Fetch the objects with the given IDs from other plasma managers.

        Args:
            object_ids (List[str]): A list of strings used to identify the
                objects.
        """
        return libplasma.fetch(self.conn, object_ids)

    def wait(self, object_ids, timeout=PLASMA_WAIT_TIMEOUT, num_returns=1):
        """Wait until num_returns objects in object_ids are ready.

        Currently, the object ID arguments to wait must be unique.

        Args:
            object_ids (List[str]): List of object IDs to wait for.
            timeout (int): Return to the caller after timeout milliseconds.
            num_returns (int): We are waiting for this number of objects to
                be ready.

        Returns:
            ready_ids, waiting_ids (List[str], List[str]): List of object IDs
                that are ready and list of object IDs we might still wait on
                respectively.

        Raises:
            Exception: If object_ids contains duplicates.
        """
        # Check that the object ID arguments are unique. The plasma manager
        # currently crashes if given duplicate object IDs.
        if len(object_ids) != len(set(object_ids)):
            raise Exception("Wait requires a list of unique object IDs.")
        ready_ids, waiting_ids = libplasma.wait(self.conn, object_ids,
                                                timeout, num_returns)
        return ready_ids, list(waiting_ids)

    def subscribe(self):
        """Subscribe to notifications about sealed objects."""
        self.notification_fd = libplasma.subscribe(self.conn)

    def get_next_notification(self):
        """Get the next notification from the notification socket.

        subscribe must have been called first, since it is what sets
        self.notification_fd.
        """
        return libplasma.receive_notification(self.notification_fd)
DEFAULT_PLASMA_STORE_MEMORY = 10 ** 9
+74 -450
View File
@@ -3,16 +3,20 @@ from __future__ import division
from __future__ import print_function
import numpy as np
from numpy.testing import assert_equal
import os
import random
import signal
import subprocess
import sys
import threading
import time
import unittest
import ray.plasma as plasma
from ray.plasma.utils import (random_object_id, generate_metadata,
import pyarrow as pa
import pyarrow.plasma as plasma
import ray
from ray.plasma.utils import (random_object_id,
create_object_with_id, create_object)
from ray import services
@@ -20,6 +24,10 @@ USE_VALGRIND = False
PLASMA_STORE_MEMORY = 1000000000
def random_name():
    """Return a random integer from 0 to 99999999 as a decimal string."""
    suffix = random.randint(0, 99999999)
    return str(suffix)
def assert_get_object_equal(unit_test, client1, client2, object_id,
memory_buffer=None, metadata=None):
client1_buff = client1.get([object_id])[0]
@@ -29,473 +37,88 @@ def assert_get_object_equal(unit_test, client1, client2, object_id,
unit_test.assertEqual(len(client1_buff), len(client2_buff))
unit_test.assertEqual(len(client1_metadata), len(client2_metadata))
# Check that the buffers from the two clients are the same.
unit_test.assertTrue(plasma.buffers_equal(client1_buff, client2_buff))
assert_equal(np.frombuffer(client1_buff, dtype="uint8"),
np.frombuffer(client2_buff, dtype="uint8"))
# Check that the metadata buffers from the two clients are the same.
unit_test.assertTrue(plasma.buffers_equal(client1_metadata,
client2_metadata))
assert_equal(np.frombuffer(client1_metadata, dtype="uint8"),
np.frombuffer(client2_metadata, dtype="uint8"))
# If a reference buffer was provided, check that it is the same as well.
if memory_buffer is not None:
unit_test.assertTrue(plasma.buffers_equal(memory_buffer, client1_buff))
assert_equal(np.frombuffer(memory_buffer, dtype="uint8"),
np.frombuffer(client1_buff, dtype="uint8"))
# If reference metadata was provided, check that it is the same as well.
if metadata is not None:
unit_test.assertTrue(plasma.buffers_equal(metadata, client1_metadata))
assert_equal(np.frombuffer(metadata, dtype="uint8"),
np.frombuffer(client1_metadata, dtype="uint8"))
class TestPlasmaClient(unittest.TestCase):
DEFAULT_PLASMA_STORE_MEMORY = 10 ** 9
def setUp(self):
# Start Plasma store.
plasma_store_name, self.p = plasma.start_plasma_store(
use_valgrind=USE_VALGRIND)
# Connect to Plasma.
self.plasma_client = plasma.PlasmaClient(plasma_store_name, None, 64)
# For the eviction test
self.plasma_client2 = plasma.PlasmaClient(plasma_store_name, None, 0)
def tearDown(self):
# Check that the Plasma store is still alive.
self.assertEqual(self.p.poll(), None)
# Kill the plasma store process.
if USE_VALGRIND:
self.p.send_signal(signal.SIGTERM)
self.p.wait()
if self.p.returncode != 0:
os._exit(-1)
else:
self.p.kill()
def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
                       use_valgrind=False, use_profiler=False,
                       stdout_file=None, stderr_file=None):
    """Start a plasma store process.

    The store executable is looked up inside the installed pyarrow package
    and is told to listen on a randomly named socket under /tmp.

    Args:
        plasma_store_memory (int): Value passed to the store's "-m" option.
        use_valgrind (bool): True if the plasma store should be started inside
            of valgrind. If this is True, use_profiler must be False.
        use_profiler (bool): True if the plasma store should be started inside
            a profiler. If this is True, use_valgrind must be False.
        stdout_file: A file handle opened for writing to redirect stdout to. If
            no redirection should happen, then this should be None.
        stderr_file: A file handle opened for writing to redirect stderr to. If
            no redirection should happen, then this should be None.

    Return:
        A tuple of the name of the plasma store socket and the
        subprocess.Popen object of the plasma store process.

    Raises:
        Exception: If both use_valgrind and use_profiler are True.
    """
    if use_valgrind and use_profiler:
        raise Exception("Cannot use valgrind and profiler at the same time.")
    plasma_store_executable = os.path.join(pa.__path__[0], "plasma_store")
    plasma_store_name = "/tmp/plasma_store{}".format(random_name())
    command = [plasma_store_executable,
               "-s", plasma_store_name,
               "-m", str(plasma_store_memory)]
    # Pick the wrapper command and how long to sleep after launching; the
    # sleep presumably gives the store time to come up before callers
    # connect, and is longer when running under valgrind.
    if use_valgrind:
        wrapper = ["valgrind",
                   "--track-origins=yes",
                   "--leak-check=full",
                   "--show-leak-kinds=all",
                   "--leak-check-heuristics=stdstring",
                   "--error-exitcode=1"]
        startup_delay = 1.0
    elif use_profiler:
        wrapper = ["valgrind", "--tool=callgrind"]
        startup_delay = 1.0
    else:
        wrapper = []
        startup_delay = 0.1
    process = subprocess.Popen(wrapper + command,
                               stdout=stdout_file, stderr=stderr_file)
    time.sleep(startup_delay)
    return plasma_store_name, process
def test_create(self):
# Create an object id string.
object_id = random_object_id()
# Create a new buffer and write to it.
length = 50
memory_buffer = self.plasma_client.create(object_id, length)
for i in range(length):
memory_buffer[i] = chr(i % 256)
# Seal the object.
self.plasma_client.seal(object_id)
# Get the object.
memory_buffer = self.plasma_client.get([object_id])[0]
for i in range(length):
self.assertEqual(memory_buffer[i], chr(i % 256))
def test_create_with_metadata(self):
for length in range(1000):
# Create an object id string.
object_id = random_object_id()
# Create a random metadata string.
metadata = generate_metadata(length)
# Create a new buffer and write to it.
memory_buffer = self.plasma_client.create(object_id, length,
metadata)
for i in range(length):
memory_buffer[i] = chr(i % 256)
# Seal the object.
self.plasma_client.seal(object_id)
# Get the object.
memory_buffer = self.plasma_client.get([object_id])[0]
for i in range(length):
self.assertEqual(memory_buffer[i], chr(i % 256))
# Get the metadata.
metadata_buffer = self.plasma_client.get_metadata([object_id])[0]
self.assertEqual(len(metadata), len(metadata_buffer))
for i in range(len(metadata)):
self.assertEqual(chr(metadata[i]), metadata_buffer[i])
def test_create_existing(self):
    """Check that creating an object under an existing ID raises."""
    # This test is partially used to test the code path in which we create
    # an object with an ID that already exists
    length = 100
    for _ in range(1000):
        object_id = random_object_id()
        self.plasma_client.create(object_id, length,
                                  generate_metadata(length))
        # The second create with the same ID must be rejected; assertRaises
        # fails the test if no exception is raised.
        with self.assertRaises(plasma.plasma_object_exists_error):
            self.plasma_client.create(object_id, length,
                                      generate_metadata(length))
def test_get(self):
    """Check get with timeouts for missing and partially present objects."""
    num_object_ids = 100
    # Test timing out of get with various timeouts.
    for timeout in [0, 10, 100, 1000]:
        object_ids = [random_object_id() for _ in range(num_object_ids)]
        results = self.plasma_client.get(object_ids, timeout_ms=timeout)
        self.assertEqual(results, num_object_ids * [None])

    # Create only the even-indexed objects of the last batch of IDs.
    data_buffers = []
    metadata_buffers = []
    for i in range(num_object_ids):
        if i % 2 == 0:
            data_buffer, metadata_buffer = create_object_with_id(
                self.plasma_client, object_ids[i], 2000, 2000)
            data_buffers.append(data_buffer)
            metadata_buffers.append(metadata_buffer)

    # Test timing out from some but not all get calls with various
    # timeouts.
    for timeout in [0, 10, 100, 1000]:
        data_results = self.plasma_client.get(object_ids,
                                              timeout_ms=timeout)
        for i in range(num_object_ids):
            if i % 2 == 0:
                self.assertTrue(plasma.buffers_equal(data_buffers[i // 2],
                                                     data_results[i]))
            else:
                # Check the result of this get call, not the stale
                # `results` list left over from the first loop.
                self.assertIsNone(data_results[i])
def test_store_full(self):
    """Check that create raises once the store has no room left."""
    # The store is started with 1GB, so make sure that create throws an
    # exception when it is full.
    def assert_create_raises_plasma_full(unit_test, size):
        # Split `size` randomly across the two size arguments of
        # create_object (presumably data and metadata sizes).
        partial_size = np.random.randint(size)
        # assertRaises fails the test if no exception is raised.
        with unit_test.assertRaises(plasma.plasma_out_of_memory_error):
            create_object(unit_test.plasma_client, partial_size,
                          size - partial_size)

    # Create a list to keep some of the buffers in scope.
    memory_buffers = []
    _, memory_buffer, _ = create_object(self.plasma_client, 5 * 10 ** 8, 0)
    memory_buffers.append(memory_buffer)
    # Remaining space is 5 * 10 ** 8. Make sure that we can't create an
    # object of size 5 * 10 ** 8 + 1, but we can create one of size
    # 2 * 10 ** 8.
    assert_create_raises_plasma_full(self, 5 * 10 ** 8 + 1)
    _, memory_buffer, _ = create_object(self.plasma_client, 2 * 10 ** 8, 0)
    del memory_buffer
    _, memory_buffer, _ = create_object(self.plasma_client, 2 * 10 ** 8, 0)
    del memory_buffer
    assert_create_raises_plasma_full(self, 5 * 10 ** 8 + 1)

    _, memory_buffer, _ = create_object(self.plasma_client, 2 * 10 ** 8, 0)
    memory_buffers.append(memory_buffer)
    # Remaining space is 3 * 10 ** 8.
    assert_create_raises_plasma_full(self, 3 * 10 ** 8 + 1)

    _, memory_buffer, _ = create_object(self.plasma_client, 10 ** 8, 0)
    memory_buffers.append(memory_buffer)
    # Remaining space is 2 * 10 ** 8.
    assert_create_raises_plasma_full(self, 2 * 10 ** 8 + 1)
def test_contains(self):
fake_object_ids = [random_object_id() for _ in range(100)]
real_object_ids = [random_object_id() for _ in range(100)]
for object_id in real_object_ids:
self.assertFalse(self.plasma_client.contains(object_id))
self.plasma_client.create(object_id, 100)
self.plasma_client.seal(object_id)
self.assertTrue(self.plasma_client.contains(object_id))
for object_id in fake_object_ids:
self.assertFalse(self.plasma_client.contains(object_id))
for object_id in real_object_ids:
self.assertTrue(self.plasma_client.contains(object_id))
def test_hash(self):
# Check the hash of an object that doesn't exist.
object_id1 = random_object_id()
self.plasma_client.hash(object_id1)
length = 1000
# Create a random object, and check that the hash function always
# returns the same value.
metadata = generate_metadata(length)
memory_buffer = self.plasma_client.create(object_id1, length, metadata)
for i in range(length):
memory_buffer[i] = chr(i % 256)
self.plasma_client.seal(object_id1)
self.assertEqual(self.plasma_client.hash(object_id1),
self.plasma_client.hash(object_id1))
# Create a second object with the same value as the first, and check
# that their hashes are equal.
object_id2 = random_object_id()
memory_buffer = self.plasma_client.create(object_id2, length, metadata)
for i in range(length):
memory_buffer[i] = chr(i % 256)
self.plasma_client.seal(object_id2)
self.assertEqual(self.plasma_client.hash(object_id1),
self.plasma_client.hash(object_id2))
# Create a third object with a different value from the first two, and
# check that its hash is different.
object_id3 = random_object_id()
metadata = generate_metadata(length)
memory_buffer = self.plasma_client.create(object_id3, length, metadata)
for i in range(length):
memory_buffer[i] = chr((i + 1) % 256)
self.plasma_client.seal(object_id3)
self.assertNotEqual(self.plasma_client.hash(object_id1),
self.plasma_client.hash(object_id3))
# Create a fourth object with the same value as the third, but
# different metadata. Check that its hash is different from any of the
# previous three.
object_id4 = random_object_id()
metadata4 = generate_metadata(length)
memory_buffer = self.plasma_client.create(object_id4, length,
metadata4)
for i in range(length):
memory_buffer[i] = chr((i + 1) % 256)
self.plasma_client.seal(object_id4)
self.assertNotEqual(self.plasma_client.hash(object_id1),
self.plasma_client.hash(object_id4))
self.assertNotEqual(self.plasma_client.hash(object_id3),
self.plasma_client.hash(object_id4))
def test_many_hashes(self):
hashes = []
length = 2 ** 10
for i in range(256):
object_id = random_object_id()
memory_buffer = self.plasma_client.create(object_id, length)
for j in range(length):
memory_buffer[j] = chr(i)
self.plasma_client.seal(object_id)
hashes.append(self.plasma_client.hash(object_id))
# Create objects of varying length. Each pair has two bits different.
for i in range(length):
object_id = random_object_id()
memory_buffer = self.plasma_client.create(object_id, length)
for j in range(length):
memory_buffer[j] = chr(0)
memory_buffer[i] = chr(1)
self.plasma_client.seal(object_id)
hashes.append(self.plasma_client.hash(object_id))
# Create objects of varying length, all with value 0.
for i in range(length):
object_id = random_object_id()
memory_buffer = self.plasma_client.create(object_id, i)
for j in range(i):
memory_buffer[j] = chr(0)
self.plasma_client.seal(object_id)
hashes.append(self.plasma_client.hash(object_id))
# Check that all hashes were unique.
self.assertEqual(len(set(hashes)), 256 + length + length)
# def test_individual_delete(self):
# length = 100
# # Create an object id string.
# object_id = random_object_id()
# # Create a random metadata string.
# metadata = generate_metadata(100)
# # Create a new buffer and write to it.
# memory_buffer = self.plasma_client.create(object_id, length, metadata)
# for i in range(length):
# memory_buffer[i] = chr(i % 256)
# # Seal the object.
# self.plasma_client.seal(object_id)
# # Check that the object is present.
# self.assertTrue(self.plasma_client.contains(object_id))
# # Delete the object.
# self.plasma_client.delete(object_id)
# # Make sure the object is no longer present.
# self.assertFalse(self.plasma_client.contains(object_id))
#
# def test_delete(self):
# # Create some objects.
# object_ids = [random_object_id() for _ in range(100)]
# for object_id in object_ids:
# length = 100
# # Create a random metadata string.
# metadata = generate_metadata(100)
# # Create a new buffer and write to it.
# memory_buffer = self.plasma_client.create(object_id, length,
# metadata)
# for i in range(length):
# memory_buffer[i] = chr(i % 256)
# # Seal the object.
# self.plasma_client.seal(object_id)
# # Check that the object is present.
# self.assertTrue(self.plasma_client.contains(object_id))
#
# # Delete the objects and make sure they are no longer present.
# for object_id in object_ids:
# # Delete the object.
# self.plasma_client.delete(object_id)
# # Make sure the object is no longer present.
# self.assertFalse(self.plasma_client.contains(object_id))
def test_illegal_functionality(self):
    """Check that invalid buffer accesses on a Plasma object raise.

    A 1000-byte object is created and indexed one position past its end,
    which must raise. The object is then sealed, fetched again through
    ``get``, and an item assignment into the fetched buffer must raise.
    """
    length = 1000
    object_id = random_object_id()
    # Freshly created (not yet sealed) buffer for the object.
    buf = self.plasma_client.create(object_id, length)
    # Valid offsets are 0 .. length - 1, so ``length`` is out of bounds.
    with self.assertRaises(Exception):
        buf[length]
    # Seal the object so that it can be fetched.
    self.plasma_client.seal(object_id)
    # The check below is disabled because it currently fails: writing
    # through the creator's buffer after sealing does not raise.
    # # Make sure the object is read only now.
    # with self.assertRaises(Exception):
    #     buf[0] = chr(0)
    # Rebind the same name to the buffer returned by ``get``.
    buf = self.plasma_client.get([object_id])[0]
    # The fetched buffer must reject item assignment.
    # NOTE(review): chr(0) is a str; on Python 3 the assignment may raise
    # because of the value type rather than because the buffer is read
    # only -- confirm that this check is not vacuous.
    with self.assertRaises(Exception):
        buf[0] = chr(0)
def test_evict(self):
    """Check the byte counts reported by ``evict`` on the second client.

    Objects are created, sealed and released in several interleavings and
    every ``evict`` call is compared against the total size of the objects
    it is expected to free. The expected values follow the order in which
    the objects were sealed and released, not the order of creation.
    """
    store = self.plasma_client2

    # A single sealed and released object: requesting 1 byte is expected
    # to free the whole 1000-byte object.
    id_a = random_object_id()
    buf_a = store.create(id_a, 1000)
    store.seal(id_a)
    del buf_a
    self.assertEqual(store.evict(1), 1000)

    # The 999-byte object is still unsealed and referenced at this point;
    # the assertion expects only the 998-byte object to be freed.
    id_b = random_object_id()
    id_c = random_object_id()
    buf_b = store.create(id_b, 999)
    buf_c = store.create(id_c, 998)
    store.seal(id_c)
    del buf_c
    self.assertEqual(store.evict(1000), 998)

    # The 997-byte object is sealed and released before the 999-byte one,
    # and the two evictions are expected to come back in that order.
    id_d = random_object_id()
    buf_d = store.create(id_d, 997)
    store.seal(id_d)
    del buf_d
    store.seal(id_b)
    del buf_b
    self.assertEqual(store.evict(1), 997)
    self.assertEqual(store.evict(1), 999)

    # Three released objects: a 2000-byte request is expected to free all
    # of them in a single call.
    id_e = random_object_id()
    id_f = random_object_id()
    id_g = random_object_id()
    buf_e = store.create(id_e, 996)
    buf_f = store.create(id_f, 995)
    buf_g = store.create(id_g, 994)
    store.seal(id_e)
    store.seal(id_f)
    store.seal(id_g)
    del buf_e
    del buf_f
    del buf_g
    self.assertEqual(store.evict(2000), 996 + 995 + 994)
def test_subscribe(self):
    """Check that one notification arrives per sealed object, in order.

    For batches of increasing size, objects with random data and metadata
    sizes are created and sealed; the notifications read back afterwards
    must carry the same object IDs and sizes in creation order.
    """
    # Subscribe to notifications from the Plasma Store.
    self.plasma_client.subscribe()
    for batch_size in [1, 10, 100, 1000, 10000, 100000]:
        object_ids = [random_object_id() for _ in range(batch_size)]
        metadata_sizes = [np.random.randint(1000)
                          for _ in range(batch_size)]
        data_sizes = [np.random.randint(1000) for _ in range(batch_size)]
        # Create and seal every object of the batch.
        for oid, dsize, msize in zip(object_ids, data_sizes,
                                     metadata_sizes):
            self.plasma_client.create(
                oid, size=dsize,
                metadata=bytearray(np.random.bytes(msize)))
            self.plasma_client.seal(oid)
        # Read one notification per object and compare it with what was
        # created.
        for oid, dsize, msize in zip(object_ids, data_sizes,
                                     metadata_sizes):
            notification = self.plasma_client.get_next_notification()
            recv_objid, recv_dsize, recv_msize = notification
            self.assertEqual(oid, recv_objid)
            self.assertEqual(dsize, recv_dsize)
            self.assertEqual(msize, recv_msize)
def test_subscribe_deletions(self):
    """Check creation and deletion notifications on the second client.

    Part one creates batches of nonzero-sized objects, checks the creation
    notifications, then evicts the objects one at a time and checks that
    each eviction produces a deletion notification (both sizes reported
    as -1) for the matching object ID.

    Part two creates nine zero-sized objects followed by one nonzero-sized
    object and checks that a single ``evict(1)`` call yields a deletion
    notification for all ten.
    """
    # Subscribe to notifications from the Plasma Store. We use
    # plasma_client2 to make sure that all used objects will get evicted
    # properly.
    self.plasma_client2.subscribe()
    for i in [1, 10, 100, 1000, 10000, 100000]:
        object_ids = [random_object_id() for _ in range(i)]
        # Add 1 to the sizes to make sure we have nonzero object sizes.
        metadata_sizes = [np.random.randint(1000) + 1 for _ in range(i)]
        data_sizes = [np.random.randint(1000) + 1 for _ in range(i)]
        for j in range(i):
            x = self.plasma_client2.create(
                object_ids[j], size=data_sizes[j],
                metadata=bytearray(np.random.bytes(metadata_sizes[j])))
            self.plasma_client2.seal(object_ids[j])
            # Drop the buffer reference right away; the evictions below
            # rely on the objects no longer being held by this test.
            del x
        # Check that we received notifications for creating all of the
        # objects.
        for j in range(i):
            notification_info = self.plasma_client2.get_next_notification()
            recv_objid, recv_dsize, recv_msize = notification_info
            self.assertEqual(object_ids[j], recv_objid)
            self.assertEqual(data_sizes[j], recv_dsize)
            self.assertEqual(metadata_sizes[j], recv_msize)
        # Check that we receive notifications for deleting all objects, as
        # we evict them. Each evict(1) call is expected to free exactly
        # the next object in creation order.
        for j in range(i):
            self.assertEqual(self.plasma_client2.evict(1),
                             data_sizes[j] + metadata_sizes[j])
            notification_info = self.plasma_client2.get_next_notification()
            recv_objid, recv_dsize, recv_msize = notification_info
            self.assertEqual(object_ids[j], recv_objid)
            self.assertEqual(-1, recv_dsize)
            self.assertEqual(-1, recv_msize)

    # Test multiple deletion notifications. The first 9 object IDs have
    # size 0, and the last has a nonzero size. When Plasma evicts 1 byte,
    # it will evict all objects, so we should receive deletion
    # notifications for each.
    num_object_ids = 10
    object_ids = [random_object_id() for _ in range(num_object_ids)]
    metadata_sizes = [0] * (num_object_ids - 1)
    data_sizes = [0] * (num_object_ids - 1)
    # randint(1000) may return 0, so add 1 (as in the loop above) to
    # guarantee that the last object really has a nonzero size.
    metadata_sizes.append(np.random.randint(1000) + 1)
    data_sizes.append(np.random.randint(1000) + 1)
    for i in range(num_object_ids):
        x = self.plasma_client2.create(
            object_ids[i], size=data_sizes[i],
            metadata=bytearray(np.random.bytes(metadata_sizes[i])))
        self.plasma_client2.seal(object_ids[i])
        del x
    # Creation notifications arrive first, one per object.
    for i in range(num_object_ids):
        notification_info = self.plasma_client2.get_next_notification()
        recv_objid, recv_dsize, recv_msize = notification_info
        self.assertEqual(object_ids[i], recv_objid)
        self.assertEqual(data_sizes[i], recv_dsize)
        self.assertEqual(metadata_sizes[i], recv_msize)
    # Only the last object contributes bytes to the evicted total.
    self.assertEqual(self.plasma_client2.evict(1),
                     data_sizes[-1] + metadata_sizes[-1])
    # A deletion notification is expected for every one of the objects.
    for i in range(num_object_ids):
        notification_info = self.plasma_client2.get_next_notification()
        recv_objid, recv_dsize, recv_msize = notification_info
        self.assertEqual(object_ids[i], recv_objid)
        self.assertEqual(-1, recv_dsize)
        self.assertEqual(-1, recv_msize)
# Plasma client tests were moved into arrow
class TestPlasmaManager(unittest.TestCase):
def setUp(self):
# Start two PlasmaStores.
store_name1, self.p2 = plasma.start_plasma_store(
store_name1, self.p2 = start_plasma_store(
use_valgrind=USE_VALGRIND)
store_name2, self.p3 = plasma.start_plasma_store(
store_name2, self.p3 = start_plasma_store(
use_valgrind=USE_VALGRIND)
# Start a Redis server.
redis_address, _ = services.start_redis("127.0.0.1")
# Start two PlasmaManagers.
manager_name1, self.p4, self.port1 = plasma.start_plasma_manager(
manager_name1, self.p4, self.port1 = ray.plasma.start_plasma_manager(
store_name1, redis_address, use_valgrind=USE_VALGRIND)
manager_name2, self.p5, self.port2 = plasma.start_plasma_manager(
manager_name2, self.p5, self.port2 = ray.plasma.start_plasma_manager(
store_name2, redis_address, use_valgrind=USE_VALGRIND)
# Connect two PlasmaClients.
self.client1 = plasma.PlasmaClient(store_name1, manager_name1)
self.client2 = plasma.PlasmaClient(store_name2, manager_name2)
self.client1 = plasma.connect(store_name1, manager_name1, 64)
self.client2 = plasma.connect(store_name2, manager_name2, 64)
# Store the processes that will be explicitly killed during tearDown so
# that a test case can remove ones that will be killed during the test.
@@ -719,7 +342,8 @@ class TestPlasmaManager(unittest.TestCase):
# Make sure that wait returns when the requested number of object IDs
# are available and does not wait for all object IDs to be available.
object_ids = [random_object_id() for _ in range(9)] + [20 * b'\x00']
object_ids = [random_object_id() for _ in range(9)] + \
[plasma.ObjectID(20 * b'\x00')]
object_ids_perm = object_ids[:]
random.shuffle(object_ids_perm)
for i in range(10):
@@ -812,17 +436,17 @@ class TestPlasmaManagerRecovery(unittest.TestCase):
def setUp(self):
# Start a Plasma store.
self.store_name, self.p2 = plasma.start_plasma_store(
self.store_name, self.p2 = start_plasma_store(
use_valgrind=USE_VALGRIND)
# Start a Redis server.
self.redis_address, _ = services.start_redis("127.0.0.1")
# Start a PlasmaManager.
manager_name, self.p3, self.port1 = plasma.start_plasma_manager(
manager_name, self.p3, self.port1 = ray.plasma.start_plasma_manager(
self.store_name,
self.redis_address,
use_valgrind=USE_VALGRIND)
# Connect a PlasmaClient.
self.client = plasma.PlasmaClient(self.store_name, manager_name)
self.client = plasma.connect(self.store_name, manager_name, 64)
# Store the processes that will be explicitly killed during tearDown so
# that a test case can remove ones that will be killed during the test.
@@ -865,12 +489,12 @@ class TestPlasmaManagerRecovery(unittest.TestCase):
self.assertEqual(waiting, [])
# Start a second plasma manager attached to the same store.
manager_name, self.p5, self.port2 = plasma.start_plasma_manager(
manager_name, self.p5, self.port2 = ray.plasma.start_plasma_manager(
self.store_name, self.redis_address, use_valgrind=USE_VALGRIND)
self.processes_to_kill = [self.p5] + self.processes_to_kill
# Check that the second manager knows about existing objects.
client2 = plasma.PlasmaClient(self.store_name, manager_name)
client2 = plasma.connect(self.store_name, manager_name, 64)
ready, waiting = [], object_ids
while True:
ready, waiting = client2.wait(object_ids, num_returns=num_objects,
+7 -4
View File
@@ -5,9 +5,11 @@ from __future__ import print_function
import numpy as np
import random
import pyarrow.plasma as plasma
def random_object_id():
    """Return a random object ID for use in tests.

    Returns:
        A ``plasma.ObjectID`` built from 20 random bytes drawn with
        ``np.random.bytes``.
    """
    return plasma.ObjectID(np.random.bytes(20))
def generate_metadata(length):
@@ -22,11 +24,12 @@ def generate_metadata(length):
def write_to_data_buffer(buff, length):
    """Scribble random byte values into a writable buffer, in place.

    The buffer is viewed as an array of ``uint8`` and written through that
    view with integer values, so the same code works on Python 2 and 3
    (assigning ``chr(...)`` strings into a byte buffer fails on Python 3).

    Args:
        buff: A writable object exposing the buffer protocol.
        length: Number of leading bytes that may be overwritten by the
            random writes. If it is 0, nothing is written.
    """
    # The view shares memory with ``buff``, so writes land in the buffer.
    array = np.frombuffer(buff, dtype="uint8")
    if length > 0:
        # Always touch both ends of the buffer, then 100 random offsets.
        array[0] = random.randint(0, 255)
        array[-1] = random.randint(0, 255)
        for _ in range(100):
            array[random.randint(0, length - 1)] = random.randint(0, 255)
def create_object_with_id(client, object_id, data_size, metadata_size,
+16 -10
View File
@@ -20,6 +20,7 @@ import time
import traceback
# Ray modules
import pyarrow.plasma as plasma
import ray.experimental.state as state
import ray.serialization as serialization
import ray.services as services
@@ -300,7 +301,8 @@ class Worker(object):
"type {}.".format(type(value)))
counter += 1
try:
ray.numbuf.store_list(object_id.id(), self.plasma_client.conn,
ray.numbuf.store_list(object_id.id(),
self.plasma_client.to_capsule(),
[value])
break
except serialization.RaySerializationException as e:
@@ -375,7 +377,7 @@ class Worker(object):
for i in range(0, len(object_ids), get_request_size):
results += ray.numbuf.retrieve_list(
object_ids[i:(i + get_request_size)],
self.plasma_client.conn,
self.plasma_client.to_capsule(),
timeout)
return results
except serialization.RayDeserializationException as e:
@@ -420,7 +422,8 @@ class Worker(object):
# smaller fetches so as to not block the manager for a prolonged period
# of time in a single call.
fetch_request_size = 10000
plain_object_ids = [object_id.id() for object_id in object_ids]
plain_object_ids = [plasma.ObjectID(object_id.id())
for object_id in object_ids]
for i in range(0, len(object_ids), fetch_request_size):
self.plasma_client.fetch(
plain_object_ids[i:(i + fetch_request_size)])
@@ -443,7 +446,8 @@ class Worker(object):
# in case they were evicted since the last fetch. We divide the
# fetch into smaller fetches so as to not block the manager for a
# prolonged period of time in a single call.
object_ids_to_fetch = list(unready_ids.keys())
object_ids_to_fetch = list(map(
plasma.ObjectID, unready_ids.keys()))
for i in range(0, len(object_ids_to_fetch), fetch_request_size):
self.plasma_client.fetch(
object_ids_to_fetch[i:(i + fetch_request_size)])
@@ -1026,7 +1030,7 @@ def cleanup(worker=global_worker):
if hasattr(worker, "local_scheduler_client"):
del worker.local_scheduler_client
if hasattr(worker, "plasma_client"):
worker.plasma_client.shutdown()
worker.plasma_client.disconnect()
if worker.mode in [SCRIPT_MODE, SILENT_MODE]:
# If this is a driver, push the finish time to Redis and clean up any
@@ -1371,8 +1375,9 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
raise Exception("This code should be unreachable.")
# Create an object store client.
worker.plasma_client = ray.plasma.PlasmaClient(info["store_socket_name"],
info["manager_socket_name"])
worker.plasma_client = plasma.connect(info["store_socket_name"],
info["manager_socket_name"],
64)
# Create the local scheduler client.
if worker.actor_id != NIL_ACTOR_ID:
num_gpus = int(worker.redis_client.hget(b"Actor:" + actor_id,
@@ -1713,14 +1718,15 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
check_connected(worker)
with log_span("ray:wait", worker=worker):
check_main_thread()
object_id_strs = [object_id.id() for object_id in object_ids]
object_id_strs = [plasma.ObjectID(object_id.id())
for object_id in object_ids]
timeout = timeout if timeout is not None else 2 ** 30
ready_ids, remaining_ids = worker.plasma_client.wait(object_id_strs,
timeout,
num_returns)
ready_ids = [ray.local_scheduler.ObjectID(object_id)
ready_ids = [ray.local_scheduler.ObjectID(object_id.binary())
for object_id in ready_ids]
remaining_ids = [ray.local_scheduler.ObjectID(object_id)
remaining_ids = [ray.local_scheduler.ObjectID(object_id.binary())
for object_id in remaining_ids]
return ready_ids, remaining_ids
+22 -2
View File
@@ -5,14 +5,35 @@ from __future__ import print_function
import os
import shutil
import subprocess
import sys
from setuptools import setup, find_packages, Distribution
import setuptools.command.build_ext as _build_ext
# This used to be the first line of the run method in the build_ext class.
# However, we moved it here because the previous approach seemed to fail in
# Docker. Inside of the build.sh script, we install the pyarrow Python module.
# Something about calling "python setup.py install" inside of the build_ext
# run method doesn't work (this is easily reproducible in Docker with just a
# couple files to simulate two Python modules). The problem is that the pyarrow
# module doesn't get added to the easy-install.pth file, so it never gets added
# to the Python path even though the package is built and copied to the right
# location. An alternative fix would be to manually modify the easy-install.pth
# file. TODO(rkn): Fix all of this.
#
# Note: We are passing in sys.executable so that we use the same version of
# Python to build pyarrow inside the build.sh script. Note that certain flags
# will not be passed along such as --user or sudo. TODO(rkn): Fix this.
subprocess.check_call(["../build.sh", sys.executable])
class build_ext(_build_ext.build_ext):
def run(self):
subprocess.check_call(["../build.sh"])
# The line below has been moved outside of the build_ext class. See the
# explanation there.
# subprocess.check_call(["../build.sh"])
# Ideally, we could include these files by putting them in a
# MANIFEST.in or using the package_data argument to setup, but the
# MANIFEST.in gets applied at the very beginning when setup.py runs
@@ -46,7 +67,6 @@ files_to_include = [
"ray/core/src/common/redis_module/libray_redis_module.so",
"ray/core/src/plasma/plasma_store",
"ray/core/src/plasma/plasma_manager",
"ray/core/src/plasma/libplasma.so",
"ray/core/src/local_scheduler/local_scheduler",
"ray/core/src/local_scheduler/liblocal_scheduler_library.so",
"ray/core/src/numbuf/libnumbuf.so",