diff --git a/build.sh b/build.sh index 592e472fd..4a6800741 100755 --- a/build.sh +++ b/build.sh @@ -117,6 +117,14 @@ fi pushd "$BUILD_DIR" +# The following line installs pyarrow from S3, these wheels have been +# generated from https://github.com/ray-project/arrow-build from +# the commit listed in the command. +if [ -z "$SKIP_THIRDPARTY_INSTALL" ]; then + "$PYTHON_EXECUTABLE" -m pip install -q \ + --target="$ROOT_DIR/python/ray/pyarrow_files" pyarrow==0.14.0.RAY \ + --find-links https://s3-us-west-2.amazonaws.com/arrow-wheels/3a11193d9530fe8ec7fdb98057f853b708f6f6ae/index.html +fi WORK_DIR=`mktemp -d` pushd $WORK_DIR diff --git a/ci/travis/format.sh b/ci/travis/format.sh index 0f7ce3a6a..94aa47eae 100755 --- a/ci/travis/format.sh +++ b/ci/travis/format.sh @@ -79,6 +79,7 @@ YAPF_FLAGS=( YAPF_EXCLUDES=( '--exclude' 'python/ray/cloudpickle/*' '--exclude' 'python/build/*' + '--exclude' 'python/ray/pyarrow_files/*' '--exclude' 'python/ray/core/src/ray/gcs/*' '--exclude' 'python/ray/thirdparty_files/*' ) @@ -144,7 +145,6 @@ fi # Ensure import ordering # Make sure that for every import psutil; import setpproctitle # There's a import ray above it. - python ci/travis/check_import_order.py . -s ci -s python/ray/pyarrow_files -s python/ray/thirdparty_files -s python/build if ! git diff --quiet &>/dev/null; then diff --git a/doc/requirements-doc.txt b/doc/requirements-doc.txt index c609099fe..8a2cb75fd 100644 --- a/doc/requirements-doc.txt +++ b/doc/requirements-doc.txt @@ -10,6 +10,7 @@ mock numpy opencv-python-headless pandas +pyarrow pygments psutil pyyaml diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 9874d36c2..a8f42f9be 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -38,8 +38,24 @@ if os.path.exists(so_path): from ctypes import CDLL CDLL(so_path, ctypes.RTLD_GLOBAL) +# MUST import ray._raylet before pyarrow to initialize some global variables. +# It seems the library related to memory allocation in pyarrow will destroy the +# initialization of grpc if we import pyarrow at first. +# NOTE(JoeyJiang): See https://github.com/ray-project/ray/issues/5219 for more +# details. import ray._raylet # noqa: E402 +if "pyarrow" in sys.modules: + raise ImportError("Ray must be imported before pyarrow because Ray " + "requires a specific version of pyarrow (which is " + "packaged along with Ray).") + +# Add the directory containing pyarrow to the Python path so that we find the +# pyarrow version packaged with ray and not a pre-existing pyarrow. +pyarrow_path = os.path.join( + os.path.abspath(os.path.dirname(__file__)), "pyarrow_files") +sys.path.insert(0, pyarrow_path) + # See https://github.com/ray-project/ray/issues/131. helpful_message = """ @@ -48,6 +64,37 @@ If you are using Anaconda, try fixing this problem by running: conda install libgcc """ +try: + import pyarrow # noqa: F401 + + # pyarrow is not imported inside of _raylet because of the issue described + # above. In order for Cython to compile _raylet, pyarrow is set to None + # in _raylet instead, so we give _raylet a real reference to it here. + # We first do the attribute checks here so that building the documentation + # succeeds without fully installing ray.. + # TODO(edoakes): Fix this. + if hasattr(ray, "_raylet") and hasattr(ray._raylet, "pyarrow"): + ray._raylet.pyarrow = pyarrow +except ImportError as e: + if ((hasattr(e, "msg") and isinstance(e.msg, str) + and ("libstdc++" in e.msg or "CXX" in e.msg))): + # This code path should be taken with Python 3. + e.msg += helpful_message + elif (hasattr(e, "message") and isinstance(e.message, str) + and ("libstdc++" in e.message or "CXX" in e.message)): + # This code path should be taken with Python 2. + condition = (hasattr(e, "args") and isinstance(e.args, tuple) + and len(e.args) == 1 and isinstance(e.args[0], str)) + if condition: + e.args = (e.args[0] + helpful_message, ) + else: + if not hasattr(e, "args"): + e.args = () + elif not isinstance(e.args, tuple): + e.args = (e.args, ) + e.args += (helpful_message, ) + raise + from ray._raylet import ( ActorCheckpointID, ActorClassID, diff --git a/python/ray/_raylet.pxd b/python/ray/_raylet.pxd index 53157a1cb..d72f8cfa3 100644 --- a/python/ray/_raylet.pxd +++ b/python/ray/_raylet.pxd @@ -75,7 +75,6 @@ cdef class CoreWorker: unique_ptr[CCoreWorker] core_worker object async_thread object async_event_loop - object plasma_event_handler cdef _create_put_buffer(self, shared_ptr[CBuffer] &metadata, size_t data_size, ObjectID object_id, diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index bcc1d9347..a5554d239 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -98,6 +98,14 @@ from ray.ray_constants import ( DEFAULT_PUT_OBJECT_RETRIES, ) +# pyarrow cannot be imported until after _raylet finishes initializing +# (see ray/__init__.py for details). +# Unfortunately, Cython won't compile if 'pyarrow' is undefined, so we +# "forward declare" it here and then replace it with a reference to the +# imported package from ray/__init__.py. +# TODO(edoakes): Fix this. +pyarrow = None + cimport cpython include "includes/unique_ids.pxi" @@ -539,14 +547,6 @@ cdef CRayStatus task_execution_handler( return CRayStatus.OK() -cdef void async_plasma_callback(CObjectID object_id, - int64_t data_size, - int64_t metadata_size) with gil: - message = [tuple([ObjectID(object_id.Binary()), data_size, metadata_size])] - core_worker = ray.worker.global_worker.core_worker - event_handler = core_worker.get_plasma_event_handler() - if event_handler is not None: - event_handler.process_notifications(message) cdef CRayStatus check_signals() nogil: with gil: @@ -569,20 +569,17 @@ cdef shared_ptr[CBuffer] string_to_buffer(c_string& c_str): cdef write_serialized_object( serialized_object, const shared_ptr[CBuffer]& buf): + # avoid initializing pyarrow before raylet from ray.serialization import Pickle5SerializedObject, RawSerializedObject if isinstance(serialized_object, RawSerializedObject): if buf.get() != NULL and buf.get().Size() > 0: - size = serialized_object.total_bytes - if MEMCOPY_THREADS > 1 and size > kMemcopyDefaultThreshold: - parallel_memcopy(buf.get().Data(), - serialized_object.value, - size, kMemcopyDefaultBlocksize, - MEMCOPY_THREADS) - else: - memcpy(buf.get().Data(), - serialized_object.value, size) - + buffer = Buffer.make(buf) + # `Buffer` has a nullptr buffer underlying if size is 0, + # which will cause `pyarrow.py_buffer` crash + stream = pyarrow.FixedSizeBufferWriter(pyarrow.py_buffer(buffer)) + stream.set_memcopy_threads(MEMCOPY_THREADS) + stream.write(pyarrow.py_buffer(serialized_object.value)) elif isinstance(serialized_object, Pickle5SerializedObject): (serialized_object.writer).write_to( serialized_object.inband, buf, MEMCOPY_THREADS) @@ -595,6 +592,9 @@ cdef class CoreWorker: def __cinit__(self, is_driver, store_socket, raylet_socket, JobID job_id, GcsClientOptions gcs_options, log_dir, node_ip_address, node_manager_port): + assert pyarrow is not None, ("Expected pyarrow to be imported from " + "outside _raylet. See __init__.py for " + "details.") self.core_worker.reset(new CCoreWorker( WORKER_TYPE_DRIVER if is_driver else WORKER_TYPE_WORKER, @@ -623,13 +623,6 @@ cdef class CoreWorker: def set_actor_title(self, title): self.core_worker.get().SetActorTitle(title) - def subscribe_to_plasma(self, plasma_event_handler): - self.plasma_event_handler = plasma_event_handler - self.core_worker.get().SubscribeToAsyncPlasma(async_plasma_callback) - - def get_plasma_event_handler(self): - return self.plasma_event_handler - def get_objects(self, object_ids, TaskID current_task_id, int64_t timeout_ms=-1): cdef: diff --git a/python/ray/exceptions.py b/python/ray/exceptions.py index 441c6e103..e6aa2ef41 100644 --- a/python/ray/exceptions.py +++ b/python/ray/exceptions.py @@ -162,13 +162,7 @@ class RayTimeoutError(RayError): pass -class PlasmaObjectNotAvailable(RayError): - """Called when an object was not available within the given timeout.""" - pass - - RAY_EXCEPTION_TYPES = [ - PlasmaObjectNotAvailable, RayError, RayTaskError, RayWorkerError, diff --git a/python/ray/experimental/async_api.py b/python/ray/experimental/async_api.py index c15c9e3cc..0d10cca1d 100644 --- a/python/ray/experimental/async_api.py +++ b/python/ray/experimental/async_api.py @@ -1,22 +1,84 @@ # Note: asyncio is only compatible with Python 3 import asyncio +import functools import threading +import pyarrow.plasma as plasma + import ray -from ray.experimental.async_plasma import PlasmaEventHandler +from ray.experimental.async_plasma import PlasmaProtocol, PlasmaEventHandler from ray.services import logger handler = None +transport = None +protocol = None + + +class _ThreadSafeProxy: + """This class is used to create a thread-safe proxy for a given object. + Every method call will be guarded with a lock. + + Attributes: + orig_obj (object): the original object. + lock (threading.Lock): the lock object. + _wrapper_cache (dict): a cache from original object's methods to + the proxy methods. + """ + + def __init__(self, orig_obj, lock): + self.orig_obj = orig_obj + self.lock = lock + self._wrapper_cache = {} + + def __getattr__(self, attr): + orig_attr = getattr(self.orig_obj, attr) + if not callable(orig_attr): + # If the original attr is a field, just return it. + return orig_attr + else: + # If the orginal attr is a method, + # return a wrapper that guards the original method with a lock. + wrapper = self._wrapper_cache.get(attr) + if wrapper is None: + + @functools.wraps(orig_attr) + def _wrapper(*args, **kwargs): + with self.lock: + return orig_attr(*args, **kwargs) + + self._wrapper_cache[attr] = _wrapper + wrapper = _wrapper + return wrapper + + +def thread_safe_client(client, lock=None): + """Create a thread-safe proxy which locks every method call + for the given client. + Args: + client: the client object to be guarded. + lock: the lock object that will be used to lock client's methods. + If None, a new lock will be used. + Returns: + A thread-safe proxy for the given client. + """ + if lock is None: + lock = threading.Lock() + return _ThreadSafeProxy(client, lock) async def _async_init(): - global handler + global handler, transport, protocol if handler is None: worker = ray.worker.global_worker + plasma_client = thread_safe_client( + plasma.connect(worker.node.plasma_store_socket_name, 300)) loop = asyncio.get_event_loop() + plasma_client.subscribe() + rsock = plasma_client.get_notification_socket() handler = PlasmaEventHandler(loop, worker) - worker.core_worker.subscribe_to_plasma(handler) + transport, protocol = await loop.create_connection( + lambda: PlasmaProtocol(plasma_client, handler), sock=rsock) logger.debug("AsyncPlasma Connection Created!") @@ -64,7 +126,10 @@ def shutdown(): Cancels all related tasks and all the socket transportation. """ - global handler + global handler, transport, protocol if handler is not None: handler.close() + transport.close() handler = None + transport = None + protocol = None diff --git a/python/ray/experimental/async_plasma.py b/python/ray/experimental/async_plasma.py index abc7f48d2..53e240afc 100644 --- a/python/ray/experimental/async_plasma.py +++ b/python/ray/experimental/async_plasma.py @@ -1,13 +1,179 @@ import asyncio +import ctypes +import sys + +import pyarrow.plasma as plasma import ray from ray.services import logger -from collections import defaultdict + +INT64_SIZE = ctypes.sizeof(ctypes.c_int64) + + +def _release_waiter(waiter, *_): + if not waiter.done(): + waiter.set_result(None) + + +class PlasmaProtocol(asyncio.Protocol): + """Protocol control for the asyncio connection.""" + + def __init__(self, plasma_client, plasma_event_handler): + self.plasma_client = plasma_client + self.plasma_event_handler = plasma_event_handler + self.transport = None + self._buffer = b"" + + def connection_made(self, transport): + self.transport = transport + + def data_received(self, data): + self._buffer += data + messages = [] + i = 0 + while i + INT64_SIZE <= len(self._buffer): + msg_len = int.from_bytes(self._buffer[i:i + INT64_SIZE], + sys.byteorder) + if i + INT64_SIZE + msg_len > len(self._buffer): + break + i += INT64_SIZE + segment = self._buffer[i:i + msg_len] + i += msg_len + (object_ids, object_sizes, + metadata_sizes) = self.plasma_client.decode_notifications(segment) + assert len(object_ids) == len(object_sizes) == len(metadata_sizes) + for j in range(len(object_ids)): + messages.append((object_ids[j], object_sizes[j], + metadata_sizes[j])) + + self._buffer = self._buffer[i:] + self.plasma_event_handler.process_notifications(messages) + + def connection_lost(self, exc): + # The socket has been closed + logger.debug("PlasmaProtocol - connection lost.") + + def eof_received(self): + logger.debug("PlasmaProtocol - EOF received.") + self.transport.close() class PlasmaObjectFuture(asyncio.Future): - """This class is a wrapper for a Future on Plasma.""" - pass + """This class manages the lifecycle of a Future contains an object_id. + + Note: + This Future is an item in an linked list. + + Attributes: + object_id: The object_id this Future contains. + """ + + def __init__(self, loop, object_id): + super().__init__(loop=loop) + self.object_id = object_id + self.prev = None + self.next = None + + @property + def ray_object_id(self): + return ray.ObjectID(self.object_id.binary()) + + def __repr__(self): + return super().__repr__() + "{object_id=%s}" % self.object_id + + +class PlasmaObjectLinkedList(asyncio.Future): + """This class is a doubly-linked list. + It holds a ObjectID and maintains futures assigned to the ObjectID. + + Args: + loop: an event loop. + plain_object_id (plasma.ObjectID): + The plasma ObjectID this class holds. + """ + + def __init__(self, loop, plain_object_id): + super().__init__(loop=loop) + assert isinstance(plain_object_id, plasma.ObjectID) + self.object_id = plain_object_id + self.head = None + self.tail = None + + def append(self, future): + """Append an object to the linked list. + + Args: + future (PlasmaObjectFuture): A PlasmaObjectFuture instance. + """ + future.prev = self.tail + if self.tail is None: + assert self.head is None + self.head = future + else: + self.tail.next = future + self.tail = future + # Once done, it will be removed from the list. + future.add_done_callback(self.remove) + + def remove(self, future): + """Remove an object from the linked list. + + Args: + future (PlasmaObjectFuture): A PlasmaObjectFuture instance. + """ + if self._loop.get_debug(): + logger.debug("Removing %s from the linked list.", future) + if future.prev is None: + assert future is self.head + self.head = future.next + if self.head is None: + self.tail = None + if not self.cancelled(): + self.set_result(None) + else: + self.head.prev = None + elif future.next is None: + assert future is self.tail + self.tail = future.prev + if self.tail is None: + self.head = None + if not self.cancelled(): + self.set_result(None) + else: + self.tail.prev = None + + def cancel(self, *args, **kwargs): + """Manually cancel all tasks assigned to this event loop.""" + # Because remove all futures will trigger `set_result`, + # we cancel itself first. + super().cancel() + for future in self.traverse(): + # All cancelled futures should have callbacks to removed itself + # from this linked list. However, these callbacks are scheduled in + # an event loop, so we could still find them in our list. + if not future.cancelled(): + future.cancel() + + def set_result(self, result): + """Complete all tasks. """ + for future in self.traverse(): + # All cancelled futures should have callbacks to removed itself + # from this linked list. However, these callbacks are scheduled in + # an event loop, so we could still find them in our list. + future.set_result(result) + if not self.done(): + super().set_result(result) + + def traverse(self): + """Traverse this linked list. + + Yields: + PlasmaObjectFuture: PlasmaObjectFuture instances. + """ + current = self.head + while current is not None: + yield current + current = current.next class PlasmaEventHandler: @@ -17,46 +183,30 @@ class PlasmaEventHandler: super().__init__() self._loop = loop self._worker = worker - self._waiting_dict = defaultdict(list) + self._waiting_dict = {} def process_notifications(self, messages): """Process notifications.""" for object_id, object_size, metadata_size in messages: if object_size > 0 and object_id in self._waiting_dict: - self._complete_future(object_id) + linked_list = self._waiting_dict[object_id] + self._complete_future(linked_list) def close(self): """Clean up this handler.""" - for futures in self._waiting_dict.values(): - for fut in futures: - fut.cancel() + for linked_list in self._waiting_dict.values(): + linked_list.cancel() + # All cancelled linked lists should have callbacks to removed itself + # from the waiting dict. However, these callbacks are scheduled in + # an event loop, so we don't check them now. - def _complete_future(self, ray_object_id): - # TODO(ilr): Consider race condition between popping from the - # waiting_dict and as_future appending to the waiting_dict's list. - logger.debug( - "Completing plasma futures for object id {}".format(ray_object_id)) + def _unregister_callback(self, fut): + del self._waiting_dict[fut.object_id] - obj = self._worker.get_objects([ray_object_id])[0] - futures = self._waiting_dict.pop(ray_object_id) - for fut in futures: - loop = fut._loop - - def complete_closure(): - try: - fut.set_result(obj) - except asyncio.InvalidStateError: - # Avoid issues where process_notifications - # and check_ready both get executed - logger.debug("Failed to set result for future {}." - "Most likely already set.".format(fut)) - - loop.call_soon_threadsafe(complete_closure) - - def check_immediately(self, object_id): - ready, _ = ray.wait([object_id], timeout=0) - if ready: - self._complete_future(object_id) + def _complete_future(self, fut): + obj = self._worker.get_objects([ray.ObjectID( + fut.object_id.binary())])[0] + fut.set_result(obj) def as_future(self, object_id, check_ready=True): """Turn an object_id into a Future object. @@ -69,10 +219,25 @@ class PlasmaEventHandler: PlasmaObjectFuture: A future object that waits the object_id. """ if not isinstance(object_id, ray.ObjectID): - raise TypeError("Input should be a Ray ObjectID.") + raise TypeError("Input should be an ObjectID.") - future = PlasmaObjectFuture(loop=self._loop) - self._waiting_dict[object_id].append(future) - self.check_immediately(object_id) + plain_object_id = plasma.ObjectID(object_id.binary()) + fut = PlasmaObjectFuture(loop=self._loop, object_id=plain_object_id) - return future + if check_ready: + ready, _ = ray.wait([object_id], timeout=0) + if ready: + if self._loop.get_debug(): + logger.debug("%s has been ready.", plain_object_id) + self._complete_future(fut) + return fut + + if plain_object_id not in self._waiting_dict: + linked_list = PlasmaObjectLinkedList(self._loop, plain_object_id) + linked_list.add_done_callback(self._unregister_callback) + self._waiting_dict[plain_object_id] = linked_list + self._waiting_dict[plain_object_id].append(fut) + if self._loop.get_debug(): + logger.debug("%s added to the waiting list.", fut) + + return fut diff --git a/python/ray/experimental/test/async_test.py b/python/ray/experimental/test/async_test.py index 3a8f8b922..3bdca4118 100644 --- a/python/ray/experimental/test/async_test.py +++ b/python/ray/experimental/test/async_test.py @@ -1,6 +1,5 @@ import asyncio import time -import os import pytest @@ -10,7 +9,6 @@ from ray.experimental import async_api @pytest.fixture def init(): - os.environ["RAY_FORCE_DIRECT"] = "0" ray.init(num_cpus=4) async_api.init() asyncio.get_event_loop().set_debug(False) diff --git a/python/ray/includes/buffer.pxi b/python/ray/includes/buffer.pxi index 34e07412e..6ac8b1db5 100644 --- a/python/ray/includes/buffer.pxi +++ b/python/ray/includes/buffer.pxi @@ -10,8 +10,8 @@ cdef class Buffer: """Cython wrapper class of C++ `ray::Buffer`. This class implements the Python 'buffer protocol', which allows - us to use it for calls into Python libraries without having to - copy the data. + us to use it for calls into pyarrow (and other Python libraries + down the line) without having to copy the data. See https://docs.python.org/3/c-api/buffer.html for details. """ diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 3973e11ff..5cd5b134e 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -45,9 +45,6 @@ ctypedef void (*ray_callback_function) \ (shared_ptr[CRayObject] result_object, CObjectID object_id, void* user_data) -ctypedef void (*plasma_callback_function) \ - (CObjectID object_id, int64_t data_size, int64_t metadata_size) - cdef extern from "ray/core_worker/profiling.h" nogil: cdef cppclass CProfiler "ray::worker::Profiler": void Start() @@ -196,5 +193,3 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CRayStatus SetResource(const c_string &resource_name, const double capacity, const CClientID &client_Id) - - void SubscribeToAsyncPlasma(plasma_callback_function callback) diff --git a/python/ray/serialization.py b/python/ray/serialization.py index 8dd4c8559..b11b63b64 100644 --- a/python/ray/serialization.py +++ b/python/ray/serialization.py @@ -3,13 +3,14 @@ import logging import time import threading +import pyarrow.plasma as plasma + import ray.cloudpickle as pickle from ray import ray_constants, JobID import ray.utils from ray.utils import _random_string from ray.gcs_utils import ErrorType from ray.exceptions import ( - PlasmaObjectNotAvailable, RayActorError, RayWorkerError, UnreconstructableError, @@ -224,7 +225,7 @@ class SerializationContext: if not self.use_pickle: raise ValueError("Receiving pickle5 serialized objects " "while the serialization context is " - "using a custom raw backend.") + "using pyarrow as the backend.") try: in_band, buffers = unpack_pickle5_buffers(data) if len(buffers) > 0: @@ -271,7 +272,7 @@ class SerializationContext: # to the user. We should only reach this line if this object was # deserialized as part of a list, and another object in the list # throws an exception. - return PlasmaObjectNotAvailable + return plasma.ObjectNotAvailable def deserialize_objects(self, data_metadata_pairs, diff --git a/python/ray/services.py b/python/ray/services.py index 9a798d93e..c93d673f1 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -13,6 +13,7 @@ import time import redis import colorama +import pyarrow # Ray modules import ray import ray.ray_constants as ray_constants @@ -506,21 +507,22 @@ def wait_for_redis_to_start(redis_ip_address, def _compute_version_info(): - """Compute the versions of Python, and Ray. + """Compute the versions of Python, pyarrow, and Ray. Returns: A tuple containing the version information. """ ray_version = ray.__version__ python_version = ".".join(map(str, sys.version_info[:3])) - return ray_version, python_version + pyarrow_version = pyarrow.__version__ + return ray_version, python_version, pyarrow_version def _put_version_info_in_redis(redis_client): """Store version information in Redis. This will be used to detect if workers or drivers are started using - different versions of Python, or Ray. + different versions of Python, pyarrow, or Ray. Args: redis_client: A client for the primary Redis shard. @@ -532,7 +534,7 @@ def check_version_info(redis_client): """Check if various version info of this process is correct. This will be used to detect if workers or drivers are started using - different versions of Python, or Ray. If the version + different versions of Python, pyarrow, or Ray. If the version information is not present in Redis, then no check is done. Args: @@ -555,10 +557,12 @@ def check_version_info(redis_client): error_message = ("Version mismatch: The cluster was started with:\n" " Ray: " + true_version_info[0] + "\n" " Python: " + true_version_info[1] + "\n" + " Pyarrow: " + str(true_version_info[2]) + "\n" "This process on node " + node_ip_address + " was started with:" + "\n" " Ray: " + version_info[0] + "\n" - " Python: " + version_info[1] + "\n") + " Python: " + version_info[1] + "\n" + " Pyarrow: " + str(version_info[2])) if version_info[:2] != true_version_info[:2]: raise Exception(error_message) else: diff --git a/python/setup.py b/python/setup.py index 445dd4242..62b384275 100644 --- a/python/setup.py +++ b/python/setup.py @@ -92,7 +92,7 @@ extras["all"] = list(set(chain.from_iterable(extras.values()))) class build_ext(_build_ext.build_ext): def run(self): # Note: We are passing in sys.executable so that we use the same - # version of Python to build packages inside the build.sh script. Note + # version of Python to build pyarrow inside the build.sh script. Note # that certain flags will not be passed along such as --user or sudo. # TODO(rkn): Fix this. command = ["../build.sh", "-p", sys.executable] @@ -101,13 +101,18 @@ class build_ext(_build_ext.build_ext): command += ["-l", "python,java"] subprocess.check_call(command) + # We also need to install pyarrow along with Ray, so make sure that the + # relevant non-Python pyarrow files get copied. + pyarrow_files = self.walk_directory("./ray/pyarrow_files/pyarrow") + # We also need to install pickle5 along with Ray, so make sure that the # relevant non-Python pickle5 files get copied. pickle5_files = self.walk_directory("./ray/pickle5_files/pickle5") thirdparty_files = self.walk_directory("./ray/thirdparty_files") - files_to_include = ray_files + pickle5_files + thirdparty_files + files_to_include = ray_files + pyarrow_files + pickle5_files + \ + thirdparty_files # Copy over the autogenerated protobuf Python bindings. for directory in generated_python_directories: diff --git a/rllib/utils/compression.py b/rllib/utils/compression.py index db7674a37..e979ef902 100644 --- a/rllib/utils/compression.py +++ b/rllib/utils/compression.py @@ -4,7 +4,7 @@ import logging import time import base64 import numpy as np -from ray import cloudpickle as pickle +import pyarrow from six import string_types logger = logging.getLogger(__name__) @@ -27,7 +27,7 @@ def compression_supported(): @DeveloperAPI def pack(data): if LZ4_ENABLED: - data = pickle.dumps(data) + data = pyarrow.serialize(data).to_buffer().to_pybytes() data = lz4.frame.compress(data) # TODO(ekl) we shouldn't need to base64 encode this data, but this # seems to not survive a transfer through the object store if we don't. @@ -47,7 +47,7 @@ def unpack(data): if LZ4_ENABLED: data = base64.b64decode(data) data = lz4.frame.decompress(data) - data = pickle.loads(data) + data = pyarrow.deserialize(data) return data diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index fe455c1ec..cc17249db 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -97,6 +97,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, RayLog::InstallFailureSignalHandler(); } RAY_LOG(INFO) << "Initializing worker " << worker_context_.GetWorkerID(); + // Initialize gcs client. gcs_client_ = std::make_shared(gcs_options); RAY_CHECK_OK(gcs_client_->Connect(io_service_)); @@ -234,7 +235,6 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, if (direct_task_receiver_ != nullptr) { direct_task_receiver_->Init(client_factory, rpc_address_); } - plasma_notifier_.reset(new ObjectStoreNotificationManager(io_service_, store_socket)); } CoreWorker::~CoreWorker() { @@ -1239,14 +1239,6 @@ void CoreWorker::GetAsync(const ObjectID &object_id, SetResultCallback success_c }); } -void CoreWorker::SubscribeToAsyncPlasma(PlasmaSubscriptionCallback subscribe_callback) { - plasma_notifier_->SubscribeObjAdded( - [subscribe_callback](const object_manager::protocol::ObjectInfoT &info) { - subscribe_callback(ObjectID::FromPlasmaIdBinary(info.object_id), info.data_size, - info.metadata_size); - }); -} - void CoreWorker::SetActorId(const ActorID &actor_id) { absl::MutexLock lock(&mutex_); RAY_CHECK(actor_id_.IsNil()); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 1ed76bb14..9610a8dc9 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -18,7 +18,6 @@ #include "ray/core_worker/transport/raylet_transport.h" #include "ray/gcs/redis_gcs_client.h" #include "ray/gcs/subscription_executor.h" -#include "ray/object_manager/object_store_notification_manager.h" #include "ray/raylet/raylet_client.h" #include "ray/rpc/node_manager/node_manager_client.h" #include "ray/rpc/worker/core_worker_client.h" @@ -500,15 +499,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { void GetAsync(const ObjectID &object_id, SetResultCallback success_callback, SetResultCallback fallback_callback, void *python_future); - /// Connect to plasma store for async futures - using PlasmaSubscriptionCallback = std::function; - - /// Subscribe to plasma store - /// - /// \param[in] subscribe_callback The callback when an item is added to plasma. - /// \return void - void SubscribeToAsyncPlasma(PlasmaSubscriptionCallback subscribe_callback); - private: /// Run the io_service_ event loop. This should be called in a background thread. void RunIOService(); @@ -754,9 +744,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { // Queue of tasks to resubmit when the specified time passes. std::deque> to_resubmit_ GUARDED_BY(mutex_); - // Plasma notification manager - std::unique_ptr plasma_notifier_; - friend class CoreWorkerTest; };