From 07c4c6367af43380b4019202488f6370a4773df2 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Thu, 12 Sep 2019 23:07:46 -0700 Subject: [PATCH] [core worker] Python core worker object interface (#5272) --- BUILD.bazel | 5 +- ci/travis/format.sh | 2 +- python/ray/__init__.py | 9 + python/ray/_raylet.pyx | 317 ++++++++++++++---- python/ray/actor.py | 5 +- python/ray/exceptions.py | 10 + python/ray/experimental/async_api.py | 64 +++- python/ray/experimental/async_plasma.py | 3 +- .../experimental/streaming/batched_queue.py | 15 +- python/ray/includes/buffer.pxi | 79 +++++ python/ray/includes/common.pxd | 47 ++- python/ray/includes/common.pxi | 23 ++ python/ray/includes/libcoreworker.pxd | 62 ++++ python/ray/includes/libraylet.pxd | 4 +- python/ray/includes/task.pxd | 26 +- python/ray/includes/task.pxi | 11 +- python/ray/includes/unique_ids.pxd | 21 +- python/ray/internal/internal_api.py | 18 +- python/ray/local_mode_manager.py | 2 +- python/ray/node.py | 6 + python/ray/tests/test_actor.py | 94 +++--- python/ray/tests/test_basic.py | 53 ++- python/ray/tests/test_failure.py | 5 +- python/ray/tests/test_memory_limits.py | 5 +- python/ray/tests/test_memory_scheduling.py | 2 +- python/ray/tests/test_object_manager.py | 12 +- python/ray/tests/test_signal.py | 7 +- python/ray/utils.py | 55 --- python/ray/worker.py | 247 +++++--------- rllib/utils/actors.py | 6 +- src/ray/common/status.cc | 6 + src/ray/common/status.h | 18 +- src/ray/core_worker/context.cc | 8 + src/ray/core_worker/context.h | 6 + src/ray/core_worker/core_worker.cc | 56 +++- src/ray/core_worker/core_worker.h | 53 +-- .../java/org_ray_runtime_RayNativeRuntime.cc | 6 +- src/ray/core_worker/object_interface.cc | 35 +- src/ray/core_worker/object_interface.h | 64 +++- .../memory_store/memory_store.h | 20 ++ .../store_provider/memory_store_provider.cc | 34 +- .../store_provider/memory_store_provider.h | 16 +- .../store_provider/plasma_store_provider.cc | 76 ++++- .../store_provider/plasma_store_provider.h | 23 +- 
.../store_provider/store_provider.h | 42 ++- src/ray/core_worker/test/core_worker_test.cc | 23 +- src/ray/core_worker/test/mock_worker.cc | 2 +- .../transport/direct_actor_transport.cc | 4 +- src/ray/gcs/redis_context.cc | 2 +- 49 files changed, 1157 insertions(+), 552 deletions(-) create mode 100644 python/ray/includes/buffer.pxi create mode 100644 python/ray/includes/common.pxi create mode 100644 python/ray/includes/libcoreworker.pxd diff --git a/BUILD.bazel b/BUILD.bazel index 275a74540..58b621290 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -722,7 +722,10 @@ pyx_library( "python/ray/includes/*.pxd", "python/ray/includes/*.pxi", ]), - deps = ["//:raylet_lib"], + deps = [ + "//:core_worker_lib", + "//:raylet_lib", + ], ) cc_binary( diff --git a/ci/travis/format.sh b/ci/travis/format.sh index 26b23486b..8e43dba2b 100755 --- a/ci/travis/format.sh +++ b/ci/travis/format.sh @@ -86,7 +86,7 @@ format_changed() { if ! git diff --diff-filter=ACM --quiet --exit-code "$MERGEBASE" -- '*.pyx' '*.pxd' '*.pxi' &>/dev/null; then if which flake8 >/dev/null; then git diff --name-only --diff-filter=ACM "$MERGEBASE" -- '*.pyx' '*.pxd' '*.pxi' | xargs -P 5 \ - flake8 --inline-quotes '"' --no-avoid-escape --exclude=python/ray/core/generated/,doc/source/conf.py,python/ray/cloudpickle/ --ignore=C408,E121,E123,E126,E226,E24,E704,W503,W504,W605 + flake8 --inline-quotes '"' --no-avoid-escape --exclude=python/ray/core/generated/,doc/source/conf.py,python/ray/cloudpickle/ --ignore=C408,E121,E123,E126,E211,E225,E226,E227,E24,E704,E999,W503,W504,W605 fi fi diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 5a9515fd7..da985b926 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -33,6 +33,15 @@ If you are using Anaconda, try fixing this problem by running: try: import pyarrow # noqa: F401 + + # pyarrow is not imported inside of _raylet because of the issue described + # above. 
In order for Cython to compile _raylet, pyarrow is set to None + # in _raylet instead, so we give _raylet a real reference to it here. + # We first do the attribute checks here so that building the documentation + # succeeds without fully installing ray.. + # TODO(edoakes): Fix this. + if hasattr(ray, "_raylet") and hasattr(ray._raylet, "pyarrow"): + ray._raylet.pyarrow = pyarrow except ImportError as e: if ((hasattr(e, "msg") and isinstance(e.msg, str) and ("libstdc++" in e.msg or "CXX" in e.msg))): diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index a1e9387fe..dba03b2fe 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -4,10 +4,17 @@ # cython: language_level = 3 import numpy +import time +import logging -from libc.stdint cimport int32_t, int64_t +from libc.stdint cimport uint8_t, int32_t, int64_t from libcpp cimport bool as c_bool -from libcpp.memory cimport unique_ptr +from libcpp.memory cimport ( + dynamic_pointer_cast, + make_shared, + shared_ptr, + unique_ptr, +) from libcpp.string cimport string as c_string from libcpp.utility cimport pair from libcpp.unordered_map cimport unordered_map @@ -17,10 +24,15 @@ from cython.operator import dereference, postincrement from ray.includes.common cimport ( CLanguage, + CRayObject, CRayStatus, + CGcsClientOptions, + LocalMemoryBuffer, LANGUAGE_CPP, LANGUAGE_JAVA, LANGUAGE_PYTHON, + WORKER_TYPE_WORKER, + WORKER_TYPE_DRIVER, ) from ray.includes.libraylet cimport ( CRayletClient, @@ -34,16 +46,35 @@ from ray.includes.unique_ids cimport ( CObjectID, CClientID, ) +from ray.includes.libcoreworker cimport CCoreWorker from ray.includes.task cimport CTaskSpec from ray.includes.ray_config cimport RayConfig -from ray.exceptions import RayletError +from ray.exceptions import RayletError, ObjectStoreFullError from ray.utils import decode +from ray.ray_constants import ( + DEFAULT_PUT_OBJECT_DELAY, + DEFAULT_PUT_OBJECT_RETRIES, + RAW_BUFFER_METADATA, +) + +# pyarrow cannot be imported until after 
_raylet finishes initializing +# (see ray/__init__.py for details). +# Unfortunately, Cython won't compile if 'pyarrow' is undefined, so we +# "forward declare" it here and then replace it with a reference to the +# imported package from ray/__init__.py. +# TODO(edoakes): Fix this. +pyarrow = None cimport cpython include "includes/unique_ids.pxi" include "includes/ray_config.pxi" include "includes/task.pxi" +include "includes/buffer.pxi" +include "includes/common.pxi" + + +logger = logging.getLogger(__name__) if cpython.PY_MAJOR_VERSION >= 3: @@ -58,6 +89,10 @@ cdef int check_status(const CRayStatus& status) nogil except -1: with gil: message = status.message().decode() + + if status.IsObjectStoreFull(): + raise ObjectStoreFullError(message) + else: raise RayletError(message) @@ -78,13 +113,6 @@ cdef c_vector[CObjectID] ObjectIDsToVector(object_ids): return result -cdef VectorToObjectIDs(c_vector[CObjectID] object_ids): - result = [] - for i in range(object_ids.size()): - result.append(ObjectID(object_ids[i].Binary())) - return result - - def compute_put_id(TaskID task_id, int64_t put_index): if put_index < 1 or put_index > CObjectID.MaxObjectIndex(): raise ValueError("The range of 'put_index' should be [1, %d]" @@ -217,26 +245,23 @@ cdef unordered_map[c_string, double] resource_map_from_dict(resource_map): cdef class RayletClient: - cdef unique_ptr[CRayletClient] client + cdef CRayletClient* client - def __cinit__(self, raylet_socket, - WorkerID worker_id, - c_bool is_worker, - JobID job_id): - # We know that we are using Python, so just skip the language - # parameter. - # TODO(suquark): Should we allow unicode chars in "raylet_socket"? 
- self.client.reset(new CRayletClient( - raylet_socket.encode("ascii"), worker_id.native(), is_worker, - job_id.native(), LANGUAGE_PYTHON)) - - def disconnect(self): - check_status(self.client.get().Disconnect()) + def __cinit__(self, CoreWorker core_worker): + # The core worker and raylet client need to share an underlying + # raylet client, so we take a reference to the core worker's client + # here. The client is a raw pointer because it is only a temporary + # workaround and will be removed once the core worker transition is + # complete, so we don't want to change the unique_ptr in core worker + # to a shared_ptr. This means the core worker *must* be + # initialized before the raylet client. + self.client = &core_worker.core_worker.get().GetRayletClient() def submit_task(self, TaskSpec task_spec): cdef: CObjectID c_id - check_status(self.client.get().SubmitTask( + + check_status(self.client.SubmitTask( task_spec.task_spec.get()[0])) def get_task(self): @@ -244,45 +269,28 @@ cdef class RayletClient: unique_ptr[CTaskSpec] task_spec with nogil: - check_status(self.client.get().GetTask(&task_spec)) + check_status(self.client.GetTask(&task_spec)) return TaskSpec.make(task_spec) def task_done(self): - check_status(self.client.get().TaskDone()) + check_status(self.client.TaskDone()) def fetch_or_reconstruct(self, object_ids, c_bool fetch_only, TaskID current_task_id=TaskID.nil()): cdef c_vector[CObjectID] fetch_ids = ObjectIDsToVector(object_ids) - check_status(self.client.get().FetchOrReconstruct( + check_status(self.client.FetchOrReconstruct( fetch_ids, fetch_only, current_task_id.native())) - def notify_unblocked(self, TaskID current_task_id): - check_status(self.client.get().NotifyUnblocked(current_task_id.native())) - - def wait(self, object_ids, int num_returns, int64_t timeout_milliseconds, - c_bool wait_local, TaskID current_task_id): - cdef: - WaitResultPair result - c_vector[CObjectID] wait_ids - CTaskID c_task_id = current_task_id.native() - wait_ids = 
ObjectIDsToVector(object_ids) - with nogil: - check_status(self.client.get().Wait(wait_ids, num_returns, - timeout_milliseconds, - wait_local, - c_task_id, &result)) - return (VectorToObjectIDs(result.first), - VectorToObjectIDs(result.second)) - def resource_ids(self): cdef: ResourceMappingType resource_mapping = ( - self.client.get().GetResourceIDs()) + self.client.GetResourceIDs()) unordered_map[ c_string, c_vector[pair[int64_t, double]] ].iterator iterator = resource_mapping.begin() c_vector[pair[int64_t, double]] c_value + resources_dict = {} while iterator != resource_mapping.end(): key = decode(dereference(iterator).first) @@ -297,10 +305,10 @@ cdef class RayletClient: def push_error(self, JobID job_id, error_type, error_message, double timestamp): - check_status(self.client.get().PushError(job_id.native(), - error_type.encode("ascii"), - error_message.encode("ascii"), - timestamp)) + check_status(self.client.PushError(job_id.native(), + error_type.encode("ascii"), + error_message.encode("ascii"), + timestamp)) def push_profile_events(self, component_type, UniqueID component_id, node_ip_address, profile_data): @@ -344,42 +352,207 @@ cdef class RayletClient: raise ValueError( "Unknown profile event key '%s'" % key_string) - check_status(self.client.get().PushProfileEvents(profile_info)) - - def free_objects(self, object_ids, c_bool local_only, c_bool delete_creating_tasks): - cdef c_vector[CObjectID] free_ids = ObjectIDsToVector(object_ids) - check_status(self.client.get().FreeObjects(free_ids, local_only, delete_creating_tasks)) + check_status(self.client.PushProfileEvents(profile_info)) def prepare_actor_checkpoint(self, ActorID actor_id): - cdef CActorCheckpointID checkpoint_id - cdef CActorID c_actor_id = actor_id.native() + cdef: + CActorCheckpointID checkpoint_id + CActorID c_actor_id = actor_id.native() + # PrepareActorCheckpoint will wait for raylet's reply, release # the GIL so other Python threads can run. 
with nogil: - check_status(self.client.get().PrepareActorCheckpoint( + check_status(self.client.PrepareActorCheckpoint( c_actor_id, checkpoint_id)) return ActorCheckpointID(checkpoint_id.Binary()) def notify_actor_resumed_from_checkpoint(self, ActorID actor_id, ActorCheckpointID checkpoint_id): - check_status(self.client.get().NotifyActorResumedFromCheckpoint( + check_status(self.client.NotifyActorResumedFromCheckpoint( actor_id.native(), checkpoint_id.native())) - def set_resource(self, basestring resource_name, double capacity, ClientID client_id): - self.client.get().SetResource(resource_name.encode("ascii"), capacity, CClientID.FromBinary(client_id.binary())) - - @property - def language(self): - return Language.from_native(self.client.get().GetLanguage()) - - @property - def client_id(self): - return ClientID(self.client.get().GetWorkerID().Binary()) + def set_resource(self, basestring resource_name, + double capacity, ClientID client_id): + self.client.SetResource(resource_name.encode("ascii"), capacity, + CClientID.FromBinary(client_id.binary())) @property def job_id(self): - return JobID(self.client.get().GetJobID().Binary()) + return JobID(self.client.GetJobID().Binary()) @property def is_worker(self): - return self.client.get().IsWorker() + return self.client.IsWorker() + + +cdef class CoreWorker: + cdef unique_ptr[CCoreWorker] core_worker + + def __cinit__(self, is_driver, store_socket, raylet_socket, + JobID job_id, GcsClientOptions gcs_options, log_dir): + self.core_worker.reset(new CCoreWorker( + WORKER_TYPE_DRIVER if is_driver else WORKER_TYPE_WORKER, + LANGUAGE_PYTHON, store_socket.encode("ascii"), + raylet_socket.encode("ascii"), job_id.native(), + gcs_options.native()[0], log_dir.encode("utf-8"), NULL, False)) + + assert pyarrow is not None, ("Expected pyarrow to be imported from " + "outside _raylet. 
See __init__.py for " + "details.") + + def get_objects(self, object_ids, TaskID current_task_id): + cdef: + c_vector[shared_ptr[CRayObject]] results + CTaskID c_task_id = current_task_id.native() + c_vector[CObjectID] c_object_ids = ObjectIDsToVector(object_ids) + + with nogil: + check_status(self.core_worker.get().Objects().Get( + c_object_ids, -1, &results)) + + data_metadata_pairs = [] + for result in results: + # core_worker will return a nullptr for objects that couldn't be + # retrieved from the store or if an object was an exception. + if not result.get(): + data_metadata_pairs.append((None, None)) + else: + data = None + metadata = None + if result.get().HasData(): + data = Buffer.make(result.get().GetData()) + if result.get().HasMetadata(): + metadata = Buffer.make( + result.get().GetMetadata()).to_pybytes() + data_metadata_pairs.append((data, metadata)) + + return data_metadata_pairs + + def object_exists(self, ObjectID object_id): + cdef: + c_bool has_object + CObjectID c_object_id = object_id.native() + + with nogil: + check_status(self.core_worker.get().Objects().Contains( + c_object_id, &has_object)) + + return has_object + + def put_serialized_object(self, serialized_object, ObjectID object_id, + int memcopy_threads=6): + cdef: + shared_ptr[CBuffer] data + shared_ptr[CBuffer] metadata + CObjectID c_object_id = object_id.native() + size_t data_size + + data_size = serialized_object.total_bytes + + with nogil: + check_status(self.core_worker.get().Objects().Create( + metadata, data_size, c_object_id, &data)) + + # If data is nullptr, that means the ObjectID already existed, + # which we ignore. + # TODO(edoakes): this is hacky, we should return the error instead + # and deal with it here. 
+ if not data: + return + + stream = pyarrow.FixedSizeBufferWriter( + pyarrow.py_buffer(Buffer.make(data))) + stream.set_memcopy_threads(memcopy_threads) + serialized_object.write_to(stream) + + with nogil: + check_status(self.core_worker.get().Objects().Seal(c_object_id)) + + def put_raw_buffer(self, c_string value, ObjectID object_id, + int memcopy_threads=6): + cdef: + c_string metadata_str = RAW_BUFFER_METADATA + CObjectID c_object_id = object_id.native() + shared_ptr[CBuffer] data + shared_ptr[CBuffer] metadata = dynamic_pointer_cast[ + CBuffer, LocalMemoryBuffer]( + make_shared[LocalMemoryBuffer]( + (metadata_str.data()), metadata_str.size())) + + with nogil: + check_status(self.core_worker.get().Objects().Create( + metadata, value.size(), c_object_id, &data)) + + stream = pyarrow.FixedSizeBufferWriter( + pyarrow.py_buffer(Buffer.make(data))) + stream.set_memcopy_threads(memcopy_threads) + stream.write(pyarrow.py_buffer(value)) + + with nogil: + check_status(self.core_worker.get().Objects().Seal(c_object_id)) + + def wait(self, object_ids, int num_returns, int64_t timeout_milliseconds, + TaskID current_task_id): + cdef: + WaitResultPair result + c_vector[CObjectID] wait_ids + c_vector[c_bool] results + CTaskID c_task_id = current_task_id.native() + + wait_ids = ObjectIDsToVector(object_ids) + with nogil: + check_status(self.core_worker.get().Objects().Wait( + wait_ids, num_returns, timeout_milliseconds, &results)) + + assert len(results) == len(object_ids) + + ready, not_ready = [], [] + for i, object_id in enumerate(object_ids): + if results[i]: + ready.append(object_id) + else: + not_ready.append(object_id) + + return (ready, not_ready) + + def free_objects(self, object_ids, c_bool local_only, + c_bool delete_creating_tasks): + cdef: + c_vector[CObjectID] free_ids = ObjectIDsToVector(object_ids) + + with nogil: + check_status(self.core_worker.get().Objects().Delete( + free_ids, local_only, delete_creating_tasks)) + + def set_current_task_id(self, TaskID 
task_id): + cdef: + CTaskID c_task_id = task_id.native() + + with nogil: + self.core_worker.get().SetCurrentTaskId(c_task_id) + + def set_current_job_id(self, JobID job_id): + cdef: + CJobID c_job_id = job_id.native() + + with nogil: + self.core_worker.get().SetCurrentJobId(c_job_id) + + def set_object_store_client_options(self, c_string client_name, + int64_t limit_bytes): + with nogil: + check_status(self.core_worker.get().Objects().SetClientOptions( + client_name, limit_bytes)) + + def object_store_memory_usage_string(self): + cdef: + c_string message + + with nogil: + message = self.core_worker.get().Objects().MemoryUsageString() + + return message.decode("utf-8") + + def disconnect(self): + with nogil: + self.core_worker.get().Disconnect() diff --git a/python/ray/actor.py b/python/ray/actor.py index c4a166def..d367c7e11 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -862,10 +862,9 @@ def exit_actor(): """ worker = ray.worker.global_worker if worker.mode == ray.WORKER_MODE and not worker.actor_id.is_nil(): - # Disconnect the worker from the raylet. The point of - # this is so that when the worker kills itself below, the + # Intentionally disconnect the core worker from the raylet so the # raylet won't push an error message to the driver. - worker.raylet_client.disconnect() + worker.core_worker.disconnect() ray.disconnect() # Disconnect global state from GCS. ray.state.state.disconnect() diff --git a/python/ray/exceptions.py b/python/ray/exceptions.py index 2159b5a1a..c60adae2d 100644 --- a/python/ray/exceptions.py +++ b/python/ray/exceptions.py @@ -90,6 +90,15 @@ class RayletError(RayError): return "The Raylet died with this message: {}".format(self.client_exc) +class ObjectStoreFullError(RayError): + """Indicates that the object store is full. + + This is raised if the attempt to store the object fails + because the object store is full even after multiple retries. 
+ """ + pass + + class UnreconstructableError(RayError): """Indicates that an object is lost and cannot be reconstructed. @@ -120,5 +129,6 @@ RAY_EXCEPTION_TYPES = [ RayTaskError, RayWorkerError, RayActorError, + ObjectStoreFullError, UnreconstructableError, ] diff --git a/python/ray/experimental/async_api.py b/python/ray/experimental/async_api.py index 5dde449a5..18b511190 100644 --- a/python/ray/experimental/async_api.py +++ b/python/ray/experimental/async_api.py @@ -1,6 +1,10 @@ # Note: asyncio is only compatible with Python 3 import asyncio +import functools +import threading + +import pyarrow.plasma as plasma import ray from ray.experimental.async_plasma import PlasmaProtocol, PlasmaEventHandler @@ -11,16 +15,70 @@ transport = None protocol = None +class _ThreadSafeProxy(object): + """This class is used to create a thread-safe proxy for a given object. + Every method call will be guarded with a lock. + + Attributes: + orig_obj (object): the original object. + lock (threading.Lock): the lock object. + _wrapper_cache (dict): a cache from original object's methods to + the proxy methods. + """ + + def __init__(self, orig_obj, lock): + self.orig_obj = orig_obj + self.lock = lock + self._wrapper_cache = {} + + def __getattr__(self, attr): + orig_attr = getattr(self.orig_obj, attr) + if not callable(orig_attr): + # If the original attr is a field, just return it. + return orig_attr + else: + # If the orginal attr is a method, + # return a wrapper that guards the original method with a lock. + wrapper = self._wrapper_cache.get(attr) + if wrapper is None: + + @functools.wraps(orig_attr) + def _wrapper(*args, **kwargs): + with self.lock: + return orig_attr(*args, **kwargs) + + self._wrapper_cache[attr] = _wrapper + wrapper = _wrapper + return wrapper + + +def thread_safe_client(client, lock=None): + """Create a thread-safe proxy which locks every method call + for the given client. + Args: + client: the client object to be guarded. 
+ lock: the lock object that will be used to lock client's methods. + If None, a new lock will be used. + Returns: + A thread-safe proxy for the given client. + """ + if lock is None: + lock = threading.Lock() + return _ThreadSafeProxy(client, lock) + + async def _async_init(): global handler, transport, protocol if handler is None: worker = ray.worker.global_worker + plasma_client = thread_safe_client( + plasma.connect(worker.node.plasma_store_socket_name, None, 0, 300)) loop = asyncio.get_event_loop() - worker.plasma_client.subscribe() - rsock = worker.plasma_client.get_notification_socket() + plasma_client.subscribe() + rsock = plasma_client.get_notification_socket() handler = PlasmaEventHandler(loop, worker) transport, protocol = await loop.create_connection( - lambda: PlasmaProtocol(worker.plasma_client, handler), sock=rsock) + lambda: PlasmaProtocol(plasma_client, handler), sock=rsock) logger.debug("AsyncPlasma Connection Created!") diff --git a/python/ray/experimental/async_plasma.py b/python/ray/experimental/async_plasma.py index 58bb85940..9809feef2 100644 --- a/python/ray/experimental/async_plasma.py +++ b/python/ray/experimental/async_plasma.py @@ -199,7 +199,8 @@ class PlasmaEventHandler: del self._waiting_dict[fut.object_id] def _complete_future(self, fut): - obj = self._worker.retrieve_and_deserialize([fut.object_id], 0)[0] + obj = self._worker.retrieve_and_deserialize( + [ray.ObjectID(fut.object_id.binary())], 0)[0] fut.set_result(obj) def as_future(self, object_id, check_ready=True): diff --git a/python/ray/experimental/streaming/batched_queue.py b/python/ray/experimental/streaming/batched_queue.py index df4c16d83..13cbf5673 100644 --- a/python/ray/experimental/streaming/batched_queue.py +++ b/python/ray/experimental/streaming/batched_queue.py @@ -21,18 +21,6 @@ def plasma_prefetch(object_id): local_sched_client.fetch_or_reconstruct([ray_obj_id], True) -def plasma_get(object_id): - """Get an object directly from plasma without going through object 
table. - - Precondition: plasma_prefetch(object_id) has been called before. - """ - client = ray.worker.global_worker.plasma_client - plasma_id = ray.pyarrow.plasma.ObjectID(object_id) - while not client.contains(plasma_id): - pass - return client.get(plasma_id) - - # TODO: doing the timer in Python land is a bit slow class FlushThread(threading.Thread): """A thread that flushes periodically to plasma. @@ -191,7 +179,8 @@ class BatchedQueue(object): self.read_batch_offset + self.prefetch_depth): plasma_prefetch(self._batch_id(self.prefetch_batch_offset)) self.prefetch_batch_offset += 1 - self.read_buffer = plasma_get(self._batch_id(self.read_batch_offset)) + self.read_buffer = ray.get( + ray.ObjectID(self._batch_id(self.read_batch_offset))) self.read_batch_offset += 1 logger.debug("[reader] Fetched batch {} offset {} size {}".format( self.read_batch_offset, self.read_item_offset, diff --git a/python/ray/includes/buffer.pxi b/python/ray/includes/buffer.pxi new file mode 100644 index 000000000..ae272b0cd --- /dev/null +++ b/python/ray/includes/buffer.pxi @@ -0,0 +1,79 @@ +from cpython cimport Py_buffer, PyBytes_FromStringAndSize +from libc.stdint cimport int64_t, uintptr_t +from libc.stdio cimport printf +from libcpp.memory cimport shared_ptr + +from ray.includes.common cimport CBuffer + + +cdef class Buffer: + """Cython wrapper class of C++ `ray::Buffer`. + + This class implements the Python 'buffer protocol', which allows + us to use it for calls into pyarrow (and other Python libraries + down the line) without having to copy the data. + + See https://docs.python.org/3/c-api/buffer.html for details. 
+ """ + cdef: + shared_ptr[CBuffer] buffer + Py_ssize_t shape + Py_ssize_t strides + + @staticmethod + cdef make(const shared_ptr[CBuffer]& buffer): + cdef Buffer self = Buffer.__new__(Buffer) + self.buffer = buffer + self.shape = self.size + self.strides = (1) + return self + + def __len__(self): + return self.size + + @property + def size(self): + """ + The buffer size in bytes. + """ + return self.buffer.get().Size() + + def to_pybytes(self): + """ + Return this buffer as a Python bytes object. Memory is copied. + """ + return PyBytes_FromStringAndSize( + self.buffer.get().Data(), + self.buffer.get().Size()) + + def __getbuffer__(self, Py_buffer* buffer, int flags): + buffer.readonly = 0 + buffer.buf = self.buffer.get().Data() + buffer.format = 'b' + buffer.internal = NULL + buffer.itemsize = 1 + buffer.len = self.size + buffer.ndim = 1 + buffer.obj = self + buffer.shape = &self.shape + buffer.strides = &self.strides + buffer.suboffsets = NULL + + def __getsegcount__(self, Py_ssize_t *len_out): + if len_out != NULL: + len_out[0] = self.size + return 1 + + def __getreadbuffer__(self, Py_ssize_t idx, void **p): + if idx != 0: + raise SystemError("accessing non-existent buffer segment") + if p != NULL: + p[0] = self.buffer.get().Data() + return self.size + + def __getwritebuffer__(self, Py_ssize_t idx, void **p): + if idx != 0: + raise SystemError("accessing non-existent buffer segment") + if p != NULL: + p[0] = self.buffer.get().Data() + return self.size diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 18a248304..8feeec351 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -1,7 +1,8 @@ -from libcpp.string cimport string as c_string from libcpp cimport bool as c_bool +from libcpp.memory cimport shared_ptr +from libcpp.string cimport string as c_string -from libc.stdint cimport int64_t +from libc.stdint cimport uint8_t from libcpp.unordered_map cimport unordered_map from libcpp.vector cimport vector 
as c_vector @@ -49,6 +50,9 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil: @staticmethod CRayStatus RedisError() + @staticmethod + CRayStatus ObjectStoreFull() + c_bool ok() c_bool IsOutOfMemory() c_bool IsKeyError() @@ -58,6 +62,7 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil: c_bool IsUnknownError() c_bool IsNotImplemented() c_bool IsRedisError() + c_bool IsObjectStoreFull() c_string ToString() c_string CodeAsString() @@ -90,19 +95,24 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil: cdef extern from "ray/protobuf/common.pb.h" nogil: cdef cppclass CLanguage "Language": pass + cdef cppclass CWorkerType "ray::WorkerType": + pass # This is a workaround for C++ enum class since Cython has no corresponding # representation. -cdef extern from "ray/protobuf/common.pb.h" namespace "Language" nogil: +cdef extern from "ray/protobuf/common.pb.h" nogil: cdef CLanguage LANGUAGE_PYTHON "Language::PYTHON" cdef CLanguage LANGUAGE_CPP "Language::CPP" cdef CLanguage LANGUAGE_JAVA "Language::JAVA" +cdef extern from "ray/protobuf/common.pb.h" nogil: + cdef CWorkerType WORKER_TYPE_WORKER "ray::WorkerType::WORKER" + cdef CWorkerType WORKER_TYPE_DRIVER "ray::WorkerType::DRIVER" -cdef extern from "ray/common/task/scheduling_resources.h" \ - namespace "ray" nogil: - cdef cppclass ResourceSet "ResourceSet": + +cdef extern from "ray/common/task/scheduling_resources.h" nogil: + cdef cppclass ResourceSet "ray::ResourceSet": ResourceSet() ResourceSet(const unordered_map[c_string, double] &resource_map) ResourceSet(const c_vector[c_string] &resource_labels, @@ -111,7 +121,8 @@ cdef extern from "ray/common/task/scheduling_resources.h" \ c_bool IsEqual(const ResourceSet &other) const c_bool IsSubset(const ResourceSet &other) const c_bool IsSuperset(const ResourceSet &other) const - c_bool AddOrUpdateResource(const c_string &resource_name, double capacity) + c_bool AddOrUpdateResource(const c_string &resource_name, + double capacity) c_bool 
RemoveResource(const c_string &resource_name) void AddResources(const ResourceSet &other) c_bool SubtractResourcesStrict(const ResourceSet &other) @@ -120,3 +131,25 @@ cdef extern from "ray/common/task/scheduling_resources.h" \ c_bool IsEmpty() const const unordered_map[c_string, double] &GetResourceMap() const const c_string ToString() const + +cdef extern from "ray/common/buffer.h" namespace "ray" nogil: + cdef cppclass CBuffer "ray::Buffer": + uint8_t *Data() const + size_t Size() const + + cdef cppclass LocalMemoryBuffer(CBuffer): + LocalMemoryBuffer(uint8_t *data, size_t size) + +cdef extern from "ray/core_worker/store_provider/store_provider.h" nogil: + cdef cppclass CRayObject "ray::RayObject": + const shared_ptr[CBuffer] &GetData() + const size_t DataSize() const + const shared_ptr[CBuffer] &GetMetadata() const + c_bool HasData() const + c_bool HasMetadata() const + +cdef extern from "ray/gcs/gcs_client_interface.h" nogil: + cdef cppclass CGcsClientOptions "ray::gcs::GcsClientOptions": + CGcsClientOptions(const c_string &ip, int port, + const c_string &password, + c_bool is_test_client) diff --git a/python/ray/includes/common.pxi b/python/ray/includes/common.pxi new file mode 100644 index 000000000..971b4cfa7 --- /dev/null +++ b/python/ray/includes/common.pxi @@ -0,0 +1,23 @@ +from libcpp cimport bool as c_bool +from libcpp.string cimport string as c_string + +from ray.includes.common cimport CGcsClientOptions + + +cdef class GcsClientOptions: + """Cython wrapper class of C++ `ray::gcs::GcsClientOptions`.""" + cdef: + unique_ptr[CGcsClientOptions] gcs_client_options + + def __init__(self, redis_ip, int redis_port, + redis_password, c_bool is_test_client=False): + if not redis_password: + redis_password = "" + self.gcs_client_options.reset( + new CGcsClientOptions(redis_ip.encode("ascii"), + redis_port, + redis_password.encode("ascii"), + is_test_client)) + + cdef CGcsClientOptions* native(self): + return (self.gcs_client_options.get()) diff --git 
a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd new file mode 100644 index 000000000..bcf747694 --- /dev/null +++ b/python/ray/includes/libcoreworker.pxd @@ -0,0 +1,62 @@ +from libc.stdint cimport int64_t +from libcpp cimport bool as c_bool +from libcpp.memory cimport shared_ptr +from libcpp.string cimport string as c_string +from libcpp.vector cimport vector as c_vector + +from ray.includes.unique_ids cimport ( + CJobID, + CTaskID, + CObjectID, +) +from ray.includes.common cimport ( + CBuffer, + CRayStatus, + CRayObject, + CWorkerType, + CLanguage, + CGcsClientOptions, +) +from ray.includes.libraylet cimport CRayletClient + + +cdef extern from "ray/core_worker/object_interface.h" nogil: + cdef cppclass CObjectInterface "ray::CoreWorkerObjectInterface": + CRayStatus SetClientOptions(c_string client_name, int64_t limit) + CRayStatus Put(const CRayObject &object, CObjectID *object_id) + CRayStatus Put(const CRayObject &object, const CObjectID &object_id) + CRayStatus Create(const shared_ptr[CBuffer] &metadata, + const size_t data_size, const CObjectID &object_id, + shared_ptr[CBuffer] *data) + CRayStatus Seal(const CObjectID &object_id) + CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms, + c_vector[shared_ptr[CRayObject]] *results) + CRayStatus Contains(const CObjectID &object_id, c_bool *has_object) + CRayStatus Wait(const c_vector[CObjectID] &object_ids, int num_objects, + int64_t timeout_ms, c_vector[c_bool] *results) + CRayStatus Delete(const c_vector[CObjectID] &object_ids, + c_bool local_only, c_bool delete_creating_tasks) + c_string MemoryUsageString() + +cdef extern from "ray/core_worker/core_worker.h" nogil: + cdef cppclass CCoreWorker "ray::CoreWorker": + CCoreWorker(const CWorkerType worker_type, const CLanguage language, + const c_string &store_socket, + const c_string &raylet_socket, const CJobID &job_id, + const CGcsClientOptions &gcs_options, + const c_string log_dir, void* execution_callback, + c_bool 
use_memory_store_) + void Disconnect() + CWorkerType &GetWorkerType() + CLanguage &GetLanguage() + CObjectInterface &Objects() + # CTaskSubmissionInterface &Tasks() + # CTaskExecutionInterface &Execution() + + # TODO(edoakes): remove this once the raylet client is no longer used + # directly. + CRayletClient &GetRayletClient() + # TODO(edoakes): remove this once the Python core worker uses the task + # interfaces + void SetCurrentJobId(const CJobID &job_id) + void SetCurrentTaskId(const CTaskID &task_id) diff --git a/python/ray/includes/libraylet.pxd b/python/ray/includes/libraylet.pxd index 45746da2f..9f045ee18 100644 --- a/python/ray/includes/libraylet.pxd +++ b/python/ray/includes/libraylet.pxd @@ -71,7 +71,9 @@ cdef extern from "ray/raylet/raylet_client.h" nogil: CActorCheckpointID &checkpoint_id) CRayStatus NotifyActorResumedFromCheckpoint( const CActorID &actor_id, const CActorCheckpointID &checkpoint_id) - CRayStatus SetResource(const c_string &resource_name, const double capacity, const CClientID &client_Id) + CRayStatus SetResource(const c_string &resource_name, + const double capacity, + const CClientID &client_Id) CLanguage GetLanguage() const CWorkerID GetWorkerID() const CJobID GetJobID() const diff --git a/python/ray/includes/task.pxd b/python/ray/includes/task.pxd index ba306e09f..8b2f040a6 100644 --- a/python/ray/includes/task.pxd +++ b/python/ray/includes/task.pxd @@ -17,7 +17,7 @@ from ray.includes.unique_ids cimport ( CTaskID, ) -cdef extern from "ray/protobuf/common.pb.h" namespace "ray::rpc" nogil: +cdef extern from "ray/protobuf/common.pb.h" nogil: cdef cppclass RpcTaskSpec "ray::rpc::TaskSpec": void CopyFrom(const RpcTaskSpec &value) @@ -29,13 +29,13 @@ cdef extern from "ray/protobuf/common.pb.h" namespace "ray::rpc" nogil: RpcTaskSpec *mutable_task_spec() -cdef extern from "ray/protobuf/gcs.pb.h" namespace "ray::rpc" nogil: +cdef extern from "ray/protobuf/gcs.pb.h" nogil: cdef cppclass TaskTableData "ray::rpc::TaskTableData": RpcTask 
*mutable_task() const c_string &SerializeAsString() -cdef extern from "ray/common/task/task_spec.h" namespace "ray" nogil: +cdef extern from "ray/common/task/task_spec.h" nogil: cdef cppclass CTaskSpec "ray::TaskSpecification": CTaskSpec(const RpcTaskSpec message) CTaskSpec(const c_string &serialized_binary) @@ -77,18 +77,20 @@ cdef extern from "ray/common/task/task_spec.h" namespace "ray" nogil: c_vector[CActorHandleID] NewActorHandles() const -cdef extern from "ray/common/task/task_util.h" namespace "ray" nogil: +cdef extern from "ray/common/task/task_util.h" nogil: cdef cppclass TaskSpecBuilder "ray::TaskSpecBuilder": TaskSpecBuilder &SetCommonTaskSpec( const CTaskID &task_id, const CLanguage &language, - const c_vector[c_string] &function_descriptor, const CJobID &job_id, - const CTaskID &parent_task_id, uint64_t parent_counter, - uint64_t num_returns, const unordered_map[c_string, double] &required_resources, - const unordered_map[c_string, double] &required_placement_resources) + const c_vector[c_string] &function_descriptor, + const CJobID &job_id, const CTaskID &parent_task_id, + uint64_t parent_counter, uint64_t num_returns, + const unordered_map[c_string, double] &required_resources, + const unordered_map[c_string, double] &required_placement_resources) # noqa: E501 TaskSpecBuilder &AddByRefArg(const CObjectID &arg_id) - TaskSpecBuilder &AddByValueArg(const c_string &data, const c_string &metadata) + TaskSpecBuilder &AddByValueArg(const c_string &data, + const c_string &metadata) TaskSpecBuilder &SetActorCreationTaskSpec( const CActorID &actor_id, uint64_t max_reconstructions, @@ -100,12 +102,12 @@ cdef extern from "ray/common/task/task_util.h" namespace "ray" nogil: const CObjectID &actor_creation_dummy_object_id, const CObjectID &previous_actor_task_dummy_object_id, uint64_t actor_counter, - const c_vector[CActorHandleID] &new_handle_ids); + const c_vector[CActorHandleID] &new_handle_ids) RpcTaskSpec GetMessage() -cdef extern from 
"ray/common/task/task_execution_spec.h" namespace "ray" nogil: +cdef extern from "ray/common/task/task_execution_spec.h" nogil: cdef cppclass CTaskExecutionSpec "ray::TaskExecutionSpecification": CTaskExecutionSpec(RpcTaskExecutionSpec message) CTaskExecutionSpec(const c_string &serialized_binary) @@ -113,6 +115,6 @@ cdef extern from "ray/common/task/task_execution_spec.h" namespace "ray" nogil: c_vector[CObjectID] ExecutionDependencies() uint64_t NumForwards() -cdef extern from "ray/common/task/task.h" namespace "ray" nogil: +cdef extern from "ray/common/task/task.h" nogil: cdef cppclass CTask "ray::Task": CTask(CTaskSpec task_spec, CTaskExecutionSpec task_execution_spec) diff --git a/python/ray/includes/task.pxi b/python/ray/includes/task.pxi index 5d0d1251c..f2a38ec4d 100644 --- a/python/ray/includes/task.pxi +++ b/python/ray/includes/task.pxi @@ -1,9 +1,3 @@ -from libc.stdint cimport uint8_t -from libcpp.memory cimport ( - make_shared, - shared_ptr, - static_pointer_cast, -) from ray.includes.task cimport ( CTask, CTaskExecutionSpec, @@ -184,8 +178,9 @@ cdef class TaskSpec: arg_list.append( ObjectID(task_spec.ArgId(i, 0).Binary())) else: - data = (task_spec.ArgData(i)[:task_spec.ArgDataSize(i)]) - metadata = (task_spec.ArgMetadata(i)[:task_spec.ArgMetadataSize(i)]) + data = task_spec.ArgData(i)[:task_spec.ArgDataSize(i)] + metadata = task_spec.ArgMetadata(i)[ + :task_spec.ArgMetadataSize(i)] if metadata == RAW_BUFFER_METADATA: obj = data else: diff --git a/python/ray/includes/unique_ids.pxd b/python/ray/includes/unique_ids.pxd index e09b5c7fd..5987325f3 100644 --- a/python/ray/includes/unique_ids.pxd +++ b/python/ray/includes/unique_ids.pxd @@ -20,10 +20,10 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil: c_bool IsNil() const c_bool operator==(const CBaseID &rhs) const c_bool operator!=(const CBaseID &rhs) const - const uint8_t *data() const; + const uint8_t *data() const - c_string Binary() const; - c_string Hex() const; + c_string Binary() const + 
c_string Hex() const cdef cppclass CUniqueID "ray::UniqueID"(CBaseID): CUniqueID() @@ -65,8 +65,8 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil: size_t Size() @staticmethod - CActorID Of(CJobID job_id, CTaskID parent_task_id, int64_t parent_task_counter) - + CActorID Of(CJobID job_id, CTaskID parent_task_id, + int64_t parent_task_counter) cdef cppclass CActorHandleID "ray::ActorHandleID"(CUniqueID): @@ -123,10 +123,12 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil: CTaskID ForActorCreationTask(CActorID actor_id) @staticmethod - CTaskID ForActorTask(CJobID job_id, CTaskID parent_task_id, int64_t parent_task_counter, CActorID actor_id) + CTaskID ForActorTask(CJobID job_id, CTaskID parent_task_id, + int64_t parent_task_counter, CActorID actor_id) @staticmethod - CTaskID ForNormalTask(CJobID job_id, CTaskID parent_task_id, int64_t parent_task_counter) + CTaskID ForNormalTask(CJobID job_id, CTaskID parent_task_id, + int64_t parent_task_counter) cdef cppclass CObjectID" ray::ObjectID"(CBaseID[CObjectID]): @@ -140,10 +142,11 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil: const CObjectID Nil() @staticmethod - CObjectID ForPut(const CTaskID &task_id, int64_t index, int64_t transport_type); + CObjectID ForPut(const CTaskID &task_id, int64_t index, + int64_t transport_type) @staticmethod - CObjectID ForTaskReturn(const CTaskID &task_id, int64_t index); + CObjectID ForTaskReturn(const CTaskID &task_id, int64_t index) @staticmethod size_t Size() diff --git a/python/ray/internal/internal_api.py b/python/ray/internal/internal_api.py index ff6dd7fc5..6a6fc627a 100644 --- a/python/ray/internal/internal_api.py +++ b/python/ray/internal/internal_api.py @@ -4,12 +4,11 @@ from __future__ import print_function import ray.worker from ray import profiling -import pyarrow __all__ = ["free", "pin_object_data"] -def pin_object_data(obj_id): +def pin_object_data(object_id): """Pin the object data referenced by this object id in memory. 
The object data cannot be evicted while there exists a Python reference to @@ -25,14 +24,13 @@ def pin_object_data(obj_id): Note that ray will automatically do this for objects created with ray.put() already, unless you ray.put with weakref=True. """ + worker = ray.worker.get_global_worker() - ray.get(obj_id) - obj_id.set_buffer_ref( - ray.worker.global_worker.plasma_client.get_buffers( - [pyarrow.plasma.ObjectID(obj_id.binary())])) + object_id.set_buffer_ref( + worker.core_worker.get_objects([object_id], worker.current_task_id)) -def unpin_object_data(obj_id): +def unpin_object_data(object_id): """Unpin an object pinned by pin_object_id. Examples: @@ -41,7 +39,7 @@ def unpin_object_data(obj_id): >>> unpin_object_id(x_id) # as if the pin didn't happen """ - obj_id.set_buffer_ref(None) + object_id.set_buffer_ref(None) def free(object_ids, local_only=False, delete_creating_tasks=False): @@ -94,5 +92,5 @@ def free(object_ids, local_only=False, delete_creating_tasks=False): if len(object_ids) == 0: return - worker.raylet_client.free_objects(object_ids, local_only, - delete_creating_tasks) + worker.core_worker.free_objects(object_ids, local_only, + delete_creating_tasks) diff --git a/python/ray/local_mode_manager.py b/python/ray/local_mode_manager.py index 50b27d47c..1deb0533b 100644 --- a/python/ray/local_mode_manager.py +++ b/python/ray/local_mode_manager.py @@ -83,7 +83,7 @@ class LocalModeManager(object): object_id.value = value return object_id - def get_object(self, object_ids): + def get_objects(self, object_ids): """Fetch objects from the emulated object store. Accepts only LocalModeObjectIDs and reads values directly from them. 
diff --git a/python/ray/node.py b/python/ray/node.py index e016ffb75..cf0733a93 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -230,6 +230,12 @@ class Node(object): """Get the node's plasma store socket name.""" return self._plasma_store_socket_name + @property + def unique_id(self): + """Get a unique identifier for this node.""" + return "{}:{}".format(self.node_ip_address, + self._plasma_store_socket_name) + @property def webui_url(self): """Get the cluster's web UI url.""" diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index e28ee35cf..7189ca0fa 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -15,7 +15,6 @@ except ImportError: import signal import sys import time -from pyarrow import plasma import ray import ray.ray_constants as ray_constants @@ -41,8 +40,8 @@ def ray_checkpointable_actor_cls(request): self.resumed_from_checkpoint = False self.checkpoint_dir = checkpoint_dir - def local_plasma(self): - return ray.worker.global_worker.plasma_client.store_socket_name + def node_id(self): + return ray.worker.global_worker.node.unique_id def increase(self): self.value += 1 @@ -904,7 +903,7 @@ def test_actor_load_balancing(ray_start_cluster): pass def get_location(self): - return ray.worker.global_worker.plasma_client.store_socket_name + return ray.worker.global_worker.node.unique_id # Create a bunch of actors. num_actors = 30 @@ -972,7 +971,7 @@ def test_actor_gpus(ray_start_cluster): def get_location_and_ids(self): assert ray.get_gpu_ids() == self.gpu_ids - return (ray.worker.global_worker.plasma_client.store_socket_name, + return (ray.worker.global_worker.node.unique_id, tuple(self.gpu_ids)) # Create one actor per GPU. 
@@ -1011,7 +1010,7 @@ def test_actor_multiple_gpus(ray_start_cluster): def get_location_and_ids(self): assert ray.get_gpu_ids() == self.gpu_ids - return (ray.worker.global_worker.plasma_client.store_socket_name, + return (ray.worker.global_worker.node.unique_id, tuple(self.gpu_ids)) # Create some actors. @@ -1042,7 +1041,7 @@ def test_actor_multiple_gpus(ray_start_cluster): self.gpu_ids = ray.get_gpu_ids() def get_location_and_ids(self): - return (ray.worker.global_worker.plasma_client.store_socket_name, + return (ray.worker.global_worker.node.unique_id, tuple(self.gpu_ids)) # Create some actors. @@ -1080,7 +1079,7 @@ def test_actor_different_numbers_of_gpus(ray_start_cluster): self.gpu_ids = ray.get_gpu_ids() def get_location_and_ids(self): - return (ray.worker.global_worker.plasma_client.store_socket_name, + return (ray.worker.global_worker.node.unique_id, tuple(self.gpu_ids)) # Create some actors. @@ -1126,8 +1125,7 @@ def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster): self.gpu_ids = ray.get_gpu_ids() def get_location_and_ids(self): - return (( - ray.worker.global_worker.plasma_client.store_socket_name), + return ((ray.worker.global_worker.node.unique_id), tuple(self.gpu_ids)) def sleep(self): @@ -1173,7 +1171,7 @@ def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster): self.gpu_ids = ray.get_gpu_ids() def get_location_and_ids(self): - return (ray.worker.global_worker.plasma_client.store_socket_name, + return (ray.worker.global_worker.node.unique_id, tuple(self.gpu_ids)) # All the GPUs should be used up now. 
@@ -1217,8 +1215,8 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster): gpu_ids = ray.get_gpu_ids() assert len(gpu_ids) == 1 assert gpu_ids[0] in range(num_gpus_per_raylet) - return (ray.worker.global_worker.plasma_client.store_socket_name, - tuple(gpu_ids), [t1, t2]) + return (ray.worker.global_worker.node.unique_id, tuple(gpu_ids), + [t1, t2]) @ray.remote(num_gpus=2) def f2(): @@ -1229,8 +1227,8 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster): assert len(gpu_ids) == 2 assert gpu_ids[0] in range(num_gpus_per_raylet) assert gpu_ids[1] in range(num_gpus_per_raylet) - return (ray.worker.global_worker.plasma_client.store_socket_name, - tuple(gpu_ids), [t1, t2]) + return (ray.worker.global_worker.node.unique_id, tuple(gpu_ids), + [t1, t2]) @ray.remote(num_gpus=1) class Actor1(object): @@ -1241,7 +1239,7 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster): def get_location_and_ids(self): assert ray.get_gpu_ids() == self.gpu_ids - return (ray.worker.global_worker.plasma_client.store_socket_name, + return (ray.worker.global_worker.node.unique_id, tuple(self.gpu_ids)) def locations_to_intervals_for_many_tasks(): @@ -1416,8 +1414,8 @@ def test_exception_raised_when_actor_node_dies(ray_start_cluster_head): def __init__(self): self.x = 0 - def local_plasma(self): - return ray.worker.global_worker.plasma_client.store_socket_name + def node_id(self): + return ray.worker.global_worker.node.unique_id def inc(self): self.x += 1 @@ -1425,8 +1423,7 @@ def test_exception_raised_when_actor_node_dies(ray_start_cluster_head): # Create an actor that is not on the raylet. actor = Counter.remote() - while (ray.get(actor.local_plasma.remote()) != - remote_node.plasma_store_socket_name): + while (ray.get(actor.node_id.remote()) != remote_node.unique_id): actor = Counter.remote() # Kill the second node. 
@@ -1526,8 +1523,8 @@ def setup_counter_actor(test_checkpoint=False, self.save_exception = save_exception self.restored = False - def local_plasma(self): - return ray.worker.global_worker.plasma_client.store_socket_name + def node_id(self): + return ray.worker.global_worker.node.unique_id def inc(self, *xs): self.x += 1 @@ -1554,11 +1551,11 @@ def setup_counter_actor(test_checkpoint=False, self.num_inc_calls = 0 self.restored = True - local_plasma = ray.worker.global_worker.plasma_client.store_socket_name + node_id = ray.worker.global_worker.node.unique_id # Create an actor that is not on the raylet. actor = Counter.remote(save_exception) - while ray.get(actor.local_plasma.remote()) == local_plasma: + while ray.get(actor.node_id.remote()) == node_id: actor = Counter.remote(save_exception) args = [ray.put(0) for _ in range(100)] @@ -1689,8 +1686,8 @@ def _test_nondeterministic_reconstruction( def __init__(self): self.queue = [] - def local_plasma(self): - return ray.worker.global_worker.plasma_client.store_socket_name + def node_id(self): + return ray.worker.global_worker.node.unique_id def push(self, item): self.queue.append(item) @@ -1699,9 +1696,9 @@ def _test_nondeterministic_reconstruction( return self.queue # Schedule the shared queue onto the remote raylet. 
- local_plasma = ray.worker.global_worker.plasma_client.store_socket_name + node_id = ray.worker.global_worker.node.unique_id actor = Queue.remote() - while ray.get(actor.local_plasma.remote()) == local_plasma: + while ray.get(actor.node_id.remote()) == node_id: actor = Queue.remote() # A task that takes in the shared queue and a list of items to enqueue, @@ -2066,14 +2063,14 @@ def test_custom_label_placement(ray_start_cluster): @ray.remote(resources={"CustomResource1": 1}) class ResourceActor1(object): def get_location(self): - return ray.worker.global_worker.plasma_client.store_socket_name + return ray.worker.global_worker.node.unique_id @ray.remote(resources={"CustomResource2": 1}) class ResourceActor2(object): def get_location(self): - return ray.worker.global_worker.plasma_client.store_socket_name + return ray.worker.global_worker.node.unique_id - local_plasma = ray.worker.global_worker.plasma_client.store_socket_name + node_id = ray.worker.global_worker.node.unique_id # Create some actors. 
actors1 = [ResourceActor1.remote() for _ in range(2)] @@ -2081,9 +2078,9 @@ def test_custom_label_placement(ray_start_cluster): locations1 = ray.get([a.get_location.remote() for a in actors1]) locations2 = ray.get([a.get_location.remote() for a in actors2]) for location in locations1: - assert location == local_plasma + assert location == node_id for location in locations2: - assert location != local_plasma + assert location != node_id def test_creating_more_actors_than_resources(shutdown_only): @@ -2225,21 +2222,15 @@ def test_actor_reconstruction(ray_start_regular): def test_actor_reconstruction_without_task(ray_start_regular): """Test a dead actor can be reconstructed without sending task to it.""" - def object_exists(obj_id): - """Check wether an object exists in plasma store.""" - plasma_client = ray.worker.global_worker.plasma_client - plasma_id = plasma.ObjectID(obj_id.binary()) - return plasma_client.get( - plasma_id, timeout_ms=0) != plasma.ObjectNotAvailable - @ray.remote(max_reconstructions=1) class ReconstructableActor(object): def __init__(self, obj_ids): for obj_id in obj_ids: # Every time the actor gets constructed, # put a new object in plasma store. - if not object_exists(obj_id): - ray.worker.global_worker.put_object(obj_id, 1) + global_worker = ray.worker.global_worker + if not global_worker.core_worker.object_exists(obj_id): + global_worker.put_object(obj_id, 1) break def get_pid(self): @@ -2252,7 +2243,8 @@ def test_actor_reconstruction_without_task(ray_start_regular): os.kill(pid, signal.SIGKILL) # Wait until the actor is reconstructed. 
assert wait_for_condition( - lambda: object_exists(obj_ids[1]), timeout_ms=5000) + lambda: ray.worker.global_worker.core_worker.object_exists(obj_ids[1]), + timeout_ms=5000) def test_actor_reconstruction_on_node_failure(ray_start_cluster_head): @@ -2271,10 +2263,10 @@ def test_actor_reconstruction_on_node_failure(ray_start_cluster_head): }), ) - def kill_node(object_store_socket): + def kill_node(node_id): node_to_remove = None for node in cluster.worker_nodes: - if object_store_socket == node.plasma_store_socket_name: + if node_id == node.unique_id: node_to_remove = node cluster.remove_node(node_to_remove) @@ -2288,7 +2280,7 @@ def test_actor_reconstruction_on_node_failure(ray_start_cluster_head): return self.value def get_object_store_socket(self): - return ray.worker.global_worker.plasma_client.store_socket_name + return ray.worker.global_worker.node.unique_id actor = MyActor.remote() # Call increase 3 times. @@ -2481,8 +2473,7 @@ def test_checkpointing_on_node_failure(ray_start_cluster_2_nodes, remote_node = [node for node in cluster.worker_nodes] actor_cls = ray.remote(max_reconstructions=1)(ray_checkpointable_actor_cls) actor = actor_cls.remote() - while (ray.get(actor.local_plasma.remote()) != - remote_node[0].plasma_store_socket_name): + while (ray.get(actor.node_id.remote()) != remote_node[0].unique_id): actor = actor_cls.remote() # Call increase several times. 
@@ -2725,8 +2716,8 @@ def test_ray_wait_dead_actor(ray_start_cluster): def __init__(self): pass - def local_plasma(self): - return ray.worker.global_worker.plasma_client.store_socket_name + def node_id(self): + return ray.worker.global_worker.node.unique_id def ping(self): time.sleep(1) @@ -2743,8 +2734,7 @@ def test_ray_wait_dead_actor(ray_start_cluster): remote_node = cluster.list_all_nodes()[-1] remote_ping_id = None for i, actor in enumerate(actors): - if ray.get(actor.local_plasma.remote() - ) == remote_node.plasma_store_socket_name: + if ray.get(actor.node_id.remote()) == remote_node.unique_id: remote_ping_id = ping_ids[i] ray.internal.free([remote_ping_id], local_only=True) cluster.remove_node(remote_node) diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index c63f93bc1..3f48851ec 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -1538,7 +1538,7 @@ def test_free_objects_multi_node(ray_start_cluster): class RawActor(object): def get(self): - return ray.worker.global_worker.plasma_client.store_socket_name + return ray.worker.global_worker.node.unique_id ActorOnNode0 = ray.remote(resources={"Custom0": 1})(RawActor) ActorOnNode1 = ray.remote(resources={"Custom1": 1})(RawActor) @@ -1585,7 +1585,7 @@ def test_free_objects_multi_node(ray_start_cluster): assert len(l1) == 2 assert len(l2) == 1 # The deleted object will have the same store with the driver. 
- local_return = ray.worker.global_worker.plasma_client.store_socket_name + local_return = ray.worker.global_worker.node.unique_id for object_id in l1: assert ray.get(object_id) != local_return @@ -1998,16 +1998,16 @@ def test_zero_cpus_actor(ray_start_cluster): cluster.add_node(num_cpus=2) ray.init(address=cluster.address) - local_plasma = ray.worker.global_worker.plasma_client.store_socket_name + node_id = ray.worker.global_worker.node.unique_id @ray.remote class Foo(object): def method(self): - return ray.worker.global_worker.plasma_client.store_socket_name + return ray.worker.global_worker.node.unique_id # Make sure tasks and actors run on the remote raylet. a = Foo.remote() - assert ray.get(a.method.remote()) != local_plasma + assert ray.get(a.method.remote()) != node_id def test_fractional_resources(shutdown_only): @@ -2080,32 +2080,32 @@ def test_multiple_raylets(ray_start_cluster): # This must be run on the zeroth raylet. @ray.remote(num_cpus=11) def run_on_0(): - return ray.worker.global_worker.plasma_client.store_socket_name + return ray.worker.global_worker.node.plasma_store_socket_name # This must be run on the first raylet. @ray.remote(num_gpus=2) def run_on_1(): - return ray.worker.global_worker.plasma_client.store_socket_name + return ray.worker.global_worker.node.plasma_store_socket_name # This must be run on the second raylet. @ray.remote(num_cpus=6, num_gpus=1) def run_on_2(): - return ray.worker.global_worker.plasma_client.store_socket_name + return ray.worker.global_worker.node.plasma_store_socket_name # This can be run anywhere. @ray.remote(num_cpus=0, num_gpus=0) def run_on_0_1_2(): - return ray.worker.global_worker.plasma_client.store_socket_name + return ray.worker.global_worker.node.plasma_store_socket_name # This must be run on the first or second raylet. 
@ray.remote(num_gpus=1) def run_on_1_2(): - return ray.worker.global_worker.plasma_client.store_socket_name + return ray.worker.global_worker.node.plasma_store_socket_name # This must be run on the zeroth or second raylet. @ray.remote(num_cpus=8) def run_on_0_2(): - return ray.worker.global_worker.plasma_client.store_socket_name + return ray.worker.global_worker.node.plasma_store_socket_name def run_lots_of_tasks(): names = [] @@ -2196,27 +2196,27 @@ def test_custom_resources(ray_start_cluster): @ray.remote def f(): time.sleep(0.001) - return ray.worker.global_worker.plasma_client.store_socket_name + return ray.worker.global_worker.node.unique_id @ray.remote(resources={"CustomResource": 1}) def g(): time.sleep(0.001) - return ray.worker.global_worker.plasma_client.store_socket_name + return ray.worker.global_worker.node.unique_id @ray.remote(resources={"CustomResource": 1}) def h(): ray.get([f.remote() for _ in range(5)]) - return ray.worker.global_worker.plasma_client.store_socket_name + return ray.worker.global_worker.node.unique_id # The f tasks should be scheduled on both raylets. assert len(set(ray.get([f.remote() for _ in range(50)]))) == 2 - local_plasma = ray.worker.global_worker.plasma_client.store_socket_name + node_id = ray.worker.global_worker.node.unique_id # The g tasks should be scheduled only on the second raylet. raylet_ids = set(ray.get([g.remote() for _ in range(50)])) assert len(raylet_ids) == 1 - assert list(raylet_ids)[0] != local_plasma + assert list(raylet_ids)[0] != node_id # Make sure that resource bookkeeping works when a task that uses a # custom resources gets blocked. 
@@ -2240,38 +2240,38 @@ def test_two_custom_resources(ray_start_cluster): @ray.remote(resources={"CustomResource1": 1}) def f(): time.sleep(0.001) - return ray.worker.global_worker.plasma_client.store_socket_name + return ray.worker.global_worker.node.unique_id @ray.remote(resources={"CustomResource2": 1}) def g(): time.sleep(0.001) - return ray.worker.global_worker.plasma_client.store_socket_name + return ray.worker.global_worker.node.unique_id @ray.remote(resources={"CustomResource1": 1, "CustomResource2": 3}) def h(): time.sleep(0.001) - return ray.worker.global_worker.plasma_client.store_socket_name + return ray.worker.global_worker.node.unique_id @ray.remote(resources={"CustomResource1": 4}) def j(): time.sleep(0.001) - return ray.worker.global_worker.plasma_client.store_socket_name + return ray.worker.global_worker.node.unique_id @ray.remote(resources={"CustomResource3": 1}) def k(): time.sleep(0.001) - return ray.worker.global_worker.plasma_client.store_socket_name + return ray.worker.global_worker.node.unique_id # The f and g tasks should be scheduled on both raylets. assert len(set(ray.get([f.remote() for _ in range(50)]))) == 2 assert len(set(ray.get([g.remote() for _ in range(50)]))) == 2 - local_plasma = ray.worker.global_worker.plasma_client.store_socket_name + node_id = ray.worker.global_worker.node.unique_id # The h tasks should be scheduled only on the second raylet. raylet_ids = set(ray.get([h.remote() for _ in range(50)])) assert len(raylet_ids) == 1 - assert list(raylet_ids)[0] != local_plasma + assert list(raylet_ids)[0] != node_id # Make sure that tasks with unsatisfied custom resource requirements do # not get scheduled. 
@@ -2473,7 +2473,7 @@ def test_load_balancing(ray_start_cluster): @ray.remote def f(): time.sleep(0.01) - return ray.worker.global_worker.plasma_client.store_socket_name + return ray.worker.global_worker.node.unique_id attempt_to_load_balance(f, [], 100, num_nodes, 10) attempt_to_load_balance(f, [], 1000, num_nodes, 100) @@ -2491,7 +2491,7 @@ def test_load_balancing_with_dependencies(ray_start_cluster): @ray.remote def f(x): time.sleep(0.010) - return ray.worker.global_worker.plasma_client.store_socket_name + return ray.worker.global_worker.node.unique_id # This object will be local to one of the raylets. Make sure # this doesn't prevent tasks from being scheduled on other raylets. @@ -2820,8 +2820,7 @@ def test_wait_reconstruction(shutdown_only): x_id = f.remote() ray.wait([x_id]) ray.wait([f.remote()]) - assert not ray.worker.global_worker.plasma_client.contains( - ray.pyarrow.plasma.ObjectID(x_id.binary())) + assert not ray.worker.global_worker.core_worker.object_exists(x_id) ready_ids, _ = ray.wait([x_id]) assert len(ready_ids) == 1 diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 4319c12c8..7b9a010dd 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -4,7 +4,6 @@ from __future__ import print_function import json import os -import pyarrow.plasma as plasma import pytest import sys import tempfile @@ -767,7 +766,7 @@ def test_parallel_actor_fill_plasma_retry(ray_start_cluster_head, num_actors): "object_store_memory": 10**8 }], indirect=True) -def test_fill_plasma_exception(ray_start_cluster_head): +def test_fill_object_store_exception(ray_start_cluster_head): @ray.remote class LargeMemoryActor(object): def some_expensive_task(self): @@ -782,5 +781,5 @@ def test_fill_plasma_exception(ray_start_cluster_head): # Make sure actor does not die ray.get(actor.test.remote()) - with pytest.raises(plasma.PlasmaStoreFull): + with pytest.raises(ray.exceptions.ObjectStoreFullError): 
ray.put(np.zeros(10**8 + 2, dtype=np.uint8)) diff --git a/python/ray/tests/test_memory_limits.py b/python/ray/tests/test_memory_limits.py index bff1f7e2f..75d40b661 100644 --- a/python/ray/tests/test_memory_limits.py +++ b/python/ray/tests/test_memory_limits.py @@ -2,12 +2,11 @@ import numpy as np import unittest import ray -import pyarrow MB = 1024 * 1024 OBJECT_EVICTED = ray.exceptions.UnreconstructableError -OBJECT_TOO_LARGE = pyarrow._plasma.PlasmaStoreFull +OBJECT_TOO_LARGE = ray.exceptions.ObjectStoreFullError @ray.remote @@ -77,7 +76,7 @@ class TestMemoryLimits(unittest.TestCase): print("Raised exception", type(e), e) raise e finally: - print(ray.worker.global_worker.plasma_client.debug_string()) + print(ray.worker.global_worker.dump_object_store_memory_usage()) ray.shutdown() diff --git a/python/ray/tests/test_memory_scheduling.py b/python/ray/tests/test_memory_scheduling.py index ad39a7e1b..36faa8075 100644 --- a/python/ray/tests/test_memory_scheduling.py +++ b/python/ray/tests/test_memory_scheduling.py @@ -145,7 +145,7 @@ class TestMemoryScheduling(unittest.TestCase): resources_per_trial={"object_store_memory": 150 * 1024 * 1024}, raise_on_failed_trial=False) self.assertTrue(result.trials[0].status, "ERROR") - self.assertTrue("PlasmaStoreFull: object does not fit" in + self.assertTrue("ObjectStoreFullError: Failed to put" in result.trials[0].error_msg) finally: ray.shutdown() diff --git a/python/ray/tests/test_object_manager.py b/python/ray/tests/test_object_manager.py index 5dd3a2a8a..61b2a7a16 100644 --- a/python/ray/tests/test_object_manager.py +++ b/python/ray/tests/test_object_manager.py @@ -237,8 +237,8 @@ def test_object_transfer_retry(ray_start_cluster): x_ids = [f.remote(10**i) for i in [1, 2, 3, 4]] assert not any( - ray.worker.global_worker.plasma_client.contains( - ray.pyarrow.plasma.ObjectID(x_id.binary())) for x_id in x_ids) + ray.worker.global_worker.core_worker.object_exists(x_id) + for x_id in x_ids) # Get the objects locally to cause 
them to be transferred. This is the # first time the objects are getting transferred, so it should happen @@ -257,8 +257,8 @@ def test_object_transfer_retry(ray_start_cluster): for _ in range(15): ray.put(x) assert not any( - ray.worker.global_worker.plasma_client.contains( - ray.pyarrow.plasma.ObjectID(x_id.binary())) for x_id in x_ids) + ray.worker.global_worker.core_worker.object_exists(x_id) + for x_id in x_ids) end_time = time.time() # Make sure that the first time the objects get transferred, it happens @@ -277,8 +277,8 @@ def test_object_transfer_retry(ray_start_cluster): for _ in range(15): ray.put(x) assert not any( - ray.worker.global_worker.plasma_client.contains( - ray.pyarrow.plasma.ObjectID(x_id.binary())) for x_id in x_ids) + ray.worker.global_worker.core_worker.object_exists(x_id) + for x_id in x_ids) time.sleep(repeated_push_delay) diff --git a/python/ray/tests/test_signal.py b/python/ray/tests/test_signal.py index 176fbd45b..85e6cc6a5 100644 --- a/python/ray/tests/test_signal.py +++ b/python/ray/tests/test_signal.py @@ -281,16 +281,15 @@ def test_signal_on_node_failure(two_node_cluster): def __init__(self): pass - def local_plasma(self): - return ray.worker.global_worker.plasma_client.store_socket_name + def node_id(self): + return ray.worker.global_worker.node.unique_id # Place the actor on the remote node. cluster, remote_node = two_node_cluster actor_cls = ray.remote(max_reconstructions=0)(ActorSignal) actor = actor_cls.remote() # Try until we put an actor on a different node. - while (ray.get(actor.local_plasma.remote()) != - remote_node.plasma_store_socket_name): + while (ray.get(actor.node_id.remote()) != remote_node.unique_id): actor = actor_cls.remote() # Kill actor process. 
diff --git a/python/ray/utils.py b/python/ray/utils.py index 1914b2615..baada7b56 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -4,7 +4,6 @@ from __future__ import print_function import binascii import errno -import functools import hashlib import inspect import logging @@ -518,60 +517,6 @@ def check_oversized_pickle(pickled, name, obj_type, worker): job_id=worker.current_job_id) -class _ThreadSafeProxy(object): - """This class is used to create a thread-safe proxy for a given object. - Every method call will be guarded with a lock. - - Attributes: - orig_obj (object): the original object. - lock (threading.Lock): the lock object. - _wrapper_cache (dict): a cache from original object's methods to - the proxy methods. - """ - - def __init__(self, orig_obj, lock): - self.orig_obj = orig_obj - self.lock = lock - self._wrapper_cache = {} - - def __getattr__(self, attr): - orig_attr = getattr(self.orig_obj, attr) - if not callable(orig_attr): - # If the original attr is a field, just return it. - return orig_attr - else: - # If the orginal attr is a method, - # return a wrapper that guards the original method with a lock. - wrapper = self._wrapper_cache.get(attr) - if wrapper is None: - - @functools.wraps(orig_attr) - def _wrapper(*args, **kwargs): - with self.lock: - return orig_attr(*args, **kwargs) - - self._wrapper_cache[attr] = _wrapper - wrapper = _wrapper - return wrapper - - -def thread_safe_client(client, lock=None): - """Create a thread-safe proxy which locks every method call - for the given client. - - Args: - client: the client object to be guarded. - lock: the lock object that will be used to lock client's methods. - If None, a new lock will be used. - - Returns: - A thread-safe proxy for the given client. 
- """ - if lock is None: - lock = threading.Lock() - return _ThreadSafeProxy(client, lock) - - def is_main_thread(): return threading.current_thread().getName() == "MainThread" diff --git a/python/ray/worker.py b/python/ray/worker.py index cb9286799..e34616788 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -55,6 +55,7 @@ from ray.exceptions import ( RayError, RayTaskError, RayWorkerError, + ObjectStoreFullError, UnreconstructableError, RAY_EXCEPTION_TYPES, ) @@ -67,7 +68,6 @@ from ray.utils import ( check_oversized_pickle, is_cython, setup_logger, - thread_safe_client, ) from ray.local_mode_manager import LocalModeManager @@ -312,18 +312,15 @@ class Worker(object): # If the object is a byte array, skip serializing it and # use a special metadata to indicate it's raw binary. So # that this object can also be read by Java. - self.plasma_client.put_raw_buffer( - value, - object_id=pyarrow.plasma.ObjectID(object_id.binary()), - metadata=ray_constants.RAW_BUFFER_METADATA, - memcopy_threads=self.memcopy_threads) + self.core_worker.put_raw_buffer( + value, object_id, memcopy_threads=self.memcopy_threads) else: - self.plasma_client.put( - value, - object_id=pyarrow.plasma.ObjectID(object_id.binary()), - memcopy_threads=self.memcopy_threads, - serialization_context=self.get_serialization_context( - self.current_job_id)) + serialization_context = self.get_serialization_context( + self.current_job_id) + self.core_worker.put_serialized_object( + pyarrow.serialize(value, serialization_context), + object_id, + memcopy_threads=self.memcopy_threads) break except pyarrow.SerializationCallbackError as e: try: @@ -377,9 +374,9 @@ class Worker(object): value: The value to put in the object store. Raises: - plasma.PlasmaStoreFull: This is raised if the attempt to store the - object fails because the object store is full even after - multiple retries. 
+ ray.exceptions.ObjectStoreFullError: This is raised if the attempt + to store the object fails because the object store is full even + after multiple retries. """ # Make sure that the value is not an object ID. if isinstance(value, ObjectID): @@ -396,7 +393,7 @@ class Worker(object): try: self._try_store_and_register(object_id, value) break - except pyarrow.plasma.PlasmaStoreFull as plasma_exc: + except ObjectStoreFullError as e: if attempt: logger.warning("Waiting {} seconds for space to free up " "in the object store.".format(delay)) @@ -404,13 +401,12 @@ class Worker(object): delay *= 2 else: self.dump_object_store_memory_usage() - raise plasma_exc + raise e def dump_object_store_memory_usage(self): """Prints object store debug string to stdout.""" - msg = "\n" + self.plasma_client.debug_string() - msg = msg.replace("\n", "\nplasma: ") - logger.warning("Local object store memory usage:\n{}\n".format(msg)) + logger.warning("Local object store memory usage:\n{}\n".format( + self.core_worker.object_store_memory_usage_string())) def _try_store_and_register(self, object_id, value): """Wraps `store_and_register` with cases for existence and pickling. @@ -422,14 +418,6 @@ class Worker(object): """ try: self.store_and_register(object_id, value) - except pyarrow.plasma.PlasmaObjectExists: - # The object already exists in the object store, so there is no - # need to add it again. TODO(rkn): We need to compare hashes - # and make sure that the objects are in fact the same. We also - # should return an error code to caller instead of printing a - # message. - logger.info("The object with ID {} already exists " - "in the object store.".format(object_id)) except TypeError: # TypeError can happen because one of the members of the object # may not be serializable for cloudpickle. 
So we need @@ -442,36 +430,25 @@ class Worker(object): logger.warning(warning_message) self.store_and_register(object_id, value) - def retrieve_and_deserialize(self, object_ids, timeout, error_timeout=10): + def retrieve_and_deserialize(self, object_ids, error_timeout=10): + data_metadata_pairs = self.core_worker.get_objects( + object_ids, self.current_task_id) + assert len(data_metadata_pairs) == len(object_ids) + start_time = time.time() - # Only send the warning once. - warning_sent = False serialization_context = self.get_serialization_context( self.current_job_id) - while True: + results = [] + warning_sent = False + i = 0 + while i < len(object_ids): + object_id = object_ids[i] + data, metadata = data_metadata_pairs[i] try: - # We divide very large get requests into smaller get requests - # so that a single get request doesn't block the store for a - # long time, if the store is blocked, it can block the manager - # as well as a consequence. - results = [] - batch_size = ray._config.worker_fetch_request_size() - for i in range(0, len(object_ids), batch_size): - metadata_data_pairs = self.plasma_client.get_buffers( - object_ids[i:i + batch_size], - timeout, - with_meta=True, - ) - for j in range(len(metadata_data_pairs)): - metadata, data = metadata_data_pairs[j] - results.append( - self._deserialize_object_from_arrow( - data, - metadata, - object_ids[i + j], - serialization_context, - )) - return results + results.append( + self._deserialize_object_from_arrow( + data, metadata, object_id, serialization_context)) + i += 1 except pyarrow.DeserializationCallbackError: # Wait a little bit for the import thread to import the class. # If we currently have the worker lock, we need to release it @@ -492,11 +469,15 @@ class Worker(object): job_id=self.current_job_id) warning_sent = True + return results + def _deserialize_object_from_arrow(self, data, metadata, object_id, serialization_context): if metadata: # Check if the object should be returned as raw bytes. 
if metadata == ray_constants.RAW_BUFFER_METADATA: + if data is None: + return b"" return data.to_pybytes() # Otherwise, return an exception object based on # the error type. @@ -511,16 +492,13 @@ class Worker(object): assert False, "Unrecognized error type " + str(error_type) elif data: # If data is not empty, deserialize the object. - # Note, the lock is needed because `serialization_context` isn't - # thread-safe. - with self.plasma_client.lock: - return pyarrow.deserialize(data, serialization_context) + return pyarrow.deserialize(data, serialization_context) else: # Object isn't available in plasma. return plasma.ObjectNotAvailable - def get_object(self, object_ids): - """Get the value or values in the object store associated with the IDs. + def get_objects(self, object_ids): + """Get the values in the object store associated with the IDs. Return the values from the local object store for object_ids. This will block until all the values for object_ids have been written to the @@ -542,72 +520,11 @@ class Worker(object): "which is not an ray.ObjectID.".format(object_id)) if self.mode == LOCAL_MODE: - return self.local_mode_manager.get_object(object_ids) + return self.local_mode_manager.get_objects(object_ids) - # Do an initial fetch for remote objects. We divide the fetch into - # smaller fetches so as to not block the manager for a prolonged period - # of time in a single call. - plain_object_ids = [ - plasma.ObjectID(object_id.binary()) for object_id in object_ids - ] - for i in range(0, len(object_ids), - ray._config.worker_fetch_request_size()): - self.raylet_client.fetch_or_reconstruct( - object_ids[i:(i + ray._config.worker_fetch_request_size())], - True) - - # Get the objects. We initially try to get the objects immediately. - final_results = self.retrieve_and_deserialize(plain_object_ids, 0) - # Construct a dictionary mapping object IDs that we haven't gotten yet - # to their original index in the object_ids argument. 
- unready_ids = { - plain_object_ids[i].binary(): i - for (i, val) in enumerate(final_results) - if val is plasma.ObjectNotAvailable - } - - if len(unready_ids) > 0: - # Try reconstructing any objects we haven't gotten yet. Try to - # get them until at least get_timeout_milliseconds - # milliseconds passes, then repeat. - while len(unready_ids) > 0: - object_ids_to_fetch = [ - plasma.ObjectID(unready_id) - for unready_id in unready_ids.keys() - ] - ray_object_ids_to_fetch = [ - ObjectID(unready_id) for unready_id in unready_ids.keys() - ] - fetch_request_size = ray._config.worker_fetch_request_size() - for i in range(0, len(object_ids_to_fetch), - fetch_request_size): - self.raylet_client.fetch_or_reconstruct( - ray_object_ids_to_fetch[i:(i + fetch_request_size)], - False, - self.current_task_id, - ) - results = self.retrieve_and_deserialize( - object_ids_to_fetch, - max([ - ray._config.get_timeout_milliseconds(), - int(0.01 * len(unready_ids)), - ]), - ) - # Remove any entries for objects we received during this - # iteration so we don't retrieve the same object twice. - for i, val in enumerate(results): - if val is not plasma.ObjectNotAvailable: - object_id = object_ids_to_fetch[i].binary() - index = unready_ids[object_id] - final_results[index] = val - unready_ids.pop(object_id) - - # If there were objects that we weren't able to get locally, - # let the raylet know that we're now unblocked. - self.raylet_client.notify_unblocked(self.current_task_id) - - assert len(final_results) == len(object_ids) - return final_results + results = self.retrieve_and_deserialize(object_ids) + assert len(results) == len(object_ids) + return results def submit_task(self, function_descriptor, @@ -859,7 +776,7 @@ class Worker(object): # Get the objects from the local object store. 
if len(object_ids) > 0: - values = self.get_object(object_ids) + values = self.get_objects(object_ids) for i, value in enumerate(values): if isinstance(value, RayError): raise value @@ -893,8 +810,7 @@ class Worker(object): raise Exception("Returning an actor handle from a remote " "function is not allowed).") if outputs[i] is ray.experimental.no_return.NoReturn: - if not self.plasma_client.contains( - pyarrow.plasma.ObjectID(object_ids[i].binary())): + if not self.core_worker.object_exists(object_ids[i]): raise RuntimeError( "Attempting to return 'ray.experimental.NoReturn' " "from a remote function, but the corresponding " @@ -923,12 +839,14 @@ class Worker(object): # needed so that if the task throws an exception, we propagate # the error message to the correct driver. self.current_job_id = task.job_id() + self.core_worker.set_current_job_id(task.job_id()) else: # If this worker is an actor, current_job_id wasn't reset. # Check that current task's driver ID equals the previous one. assert self.current_job_id == task.job_id() self.task_context.current_task_id = task.task_id() + self.core_worker.set_current_task_id(task.task_id()) function_descriptor = FunctionDescriptor.from_bytes_list( task.function_descriptor_list()) @@ -972,7 +890,7 @@ class Worker(object): ray_constants.from_memory_units( task.required_resources()["memory"])) if "object_store_memory" in task.required_resources(): - self._set_plasma_client_options( + self._set_object_store_client_options( worker_name, int( ray_constants.from_memory_units( @@ -1007,20 +925,21 @@ class Worker(object): function_descriptor, return_object_ids, e, ray.utils.format_error_message(traceback.format_exc())) - def _set_plasma_client_options(self, client_name, object_store_memory): + def _set_object_store_client_options(self, name, object_store_memory): try: logger.debug("Setting plasma memory limit to {} for {}".format( - object_store_memory, client_name)) - self.plasma_client.set_client_options(client_name, - 
object_store_memory) - except pyarrow._plasma.PlasmaStoreFull: + object_store_memory, name)) + self.core_worker.set_object_store_client_options( + name.encode("ascii"), object_store_memory) + except RayError as e: self.dump_object_store_memory_usage() raise memory_monitor.RayOutOfMemoryError( "Failed to set object_store_memory={} for {}. The " "plasma store may have insufficient memory remaining " "to satisfy this limit (30% of object store memory is " - "permanently reserved for shared usage).".format( - object_store_memory, client_name)) + "permanently reserved for shared usage). The current " + "object store memory status is:\n\n{}".format( + object_store_memory, name, e)) def _handle_process_task_failure(self, function_descriptor, return_object_ids, error, backtrace): @@ -1092,6 +1011,7 @@ class Worker(object): self._process_task(task, execution_info) # Reset the state fields so the next task can run. self.task_context.current_task_id = TaskID.nil() + self.core_worker.set_current_task_id(TaskID.nil()) self.task_context.task_index = 0 self.task_context.put_index = 1 if self.actor_id.is_nil(): @@ -1099,6 +1019,7 @@ class Worker(object): # actor. Because the following tasks should all have the # same driver id. self.current_job_id = WorkerID.nil() + self.core_worker.set_current_job_id(JobID.nil()) # Reset signal counters so that the next task can get # all past signals. ray_signal.reset() @@ -1110,7 +1031,7 @@ class Worker(object): reached_max_executions = (self.function_actor_manager.get_task_counter( job_id, function_descriptor) == execution_info.max_calls) if reached_max_executions: - self.raylet_client.disconnect() + self.core_worker.disconnect() sys.exit(0) def _get_next_task_from_raylet(self): @@ -1967,14 +1888,6 @@ def connect(node, else: raise Exception("This code should be unreachable.") - # Create an object store client. 
- worker.plasma_client = thread_safe_client( - plasma.connect(node.plasma_store_socket_name, None, 0, 300)) - - if driver_object_store_memory is not None: - worker._set_plasma_client_options("ray_driver_{}".format(os.getpid()), - driver_object_store_memory) - # If this is a driver, set the current task ID, the task driver ID, and set # the task index to 0. if mode == SCRIPT_MODE: @@ -2036,12 +1949,27 @@ def connect(node, # driver task. worker.task_context.current_task_id = driver_task_spec.task_id() - worker.raylet_client = ray._raylet.RayletClient( - node.raylet_socket_name, - WorkerID(worker.worker_id), - (mode == WORKER_MODE), - worker.current_job_id, + redis_address, redis_port = node.redis_address.split(":") + gcs_options = ray._raylet.GcsClientOptions( + redis_address, + int(redis_port), + node.redis_password, ) + worker.core_worker = ray._raylet.CoreWorker( + (mode == SCRIPT_MODE), + node.plasma_store_socket_name, + node.raylet_socket_name, + worker.current_job_id, + gcs_options, + node.get_logs_dir_path(), + ) + worker.core_worker.set_current_job_id(worker.current_job_id) + worker.core_worker.set_current_task_id(worker.current_task_id) + worker.raylet_client = ray._raylet.RayletClient(worker.core_worker) + + if driver_object_store_memory is not None: + worker._set_object_store_client_options( + "ray_driver_{}".format(os.getpid()), driver_object_store_memory) # Start the import thread worker.import_thread = import_thread.ImportThread(worker, mode, @@ -2141,8 +2069,8 @@ def disconnect(): if hasattr(worker, "raylet_client"): del worker.raylet_client - if hasattr(worker, "plasma_client"): - worker.plasma_client.disconnect() + if hasattr(worker, "core_worker"): + del worker.core_worker @contextmanager @@ -2331,7 +2259,7 @@ def get(object_ids): "or a list of object IDs.") global last_task_error_raise_time - values = worker.get_object(object_ids) + values = worker.get_objects(object_ids) for i, value in enumerate(values): if isinstance(value, RayError): 
last_task_error_raise_time = time.time() @@ -2376,7 +2304,7 @@ def put(value, weakref=False): ) try: worker.put_object(object_id, value) - except pyarrow.plasma.PlasmaStoreFull: + except ObjectStoreFullError: logger.info( "Put failed since the value was either too large or the " "store was full of pinned objects. If you are putting " @@ -2387,10 +2315,14 @@ def put(value, weakref=False): worker.task_context.put_index += 1 # Pin the object buffer with the returned id. This avoids put returns # from getting evicted out from under the id. + # TODO(edoakes): we should be able to avoid this extra IPC by holding + # a reference to the buffer created when putting the object, but the + # buffer returned by the plasma store create method doesn't prevent + # the object from being evicted. if not weakref and not worker.mode == LOCAL_MODE: object_id.set_buffer_ref( - worker.plasma_client.get_buffers( - [pyarrow.plasma.ObjectID(object_id.binary())])) + worker.core_worker.get_objects([object_id], + worker.current_task_id)) return object_id @@ -2479,11 +2411,10 @@ def wait(object_ids, num_returns=1, timeout=None): timeout = timeout if timeout is not None else 10**6 timeout_milliseconds = int(timeout * 1000) - ready_ids, remaining_ids = worker.raylet_client.wait( + ready_ids, remaining_ids = worker.core_worker.wait( object_ids, num_returns, timeout_milliseconds, - False, worker.current_task_id, ) return ready_ids, remaining_ids diff --git a/rllib/utils/actors.py b/rllib/utils/actors.py index 8907aa5c9..8c4929949 100644 --- a/rllib/utils/actors.py +++ b/rllib/utils/actors.py @@ -40,7 +40,6 @@ class TaskPool(object): Assumes obj_id only is one id.""" for worker, obj_id in self.completed(blocking_wait=blocking_wait): - plasma_id = ray.pyarrow.plasma.ObjectID(obj_id.binary()) (ray.worker.global_worker.raylet_client.fetch_or_reconstruct( [obj_id], True)) self._fetching.append((worker, obj_id)) @@ -48,10 +47,9 @@ class TaskPool(object): remaining = [] num_yielded = 0 for worker, obj_id 
in self._fetching: - plasma_id = ray.pyarrow.plasma.ObjectID(obj_id.binary()) if (num_yielded < max_yield - and ray.worker.global_worker.plasma_client.contains( - plasma_id)): + and ray.worker.global_worker.core_worker.object_exists( + obj_id)): yield (worker, obj_id) num_yielded += 1 else: diff --git a/src/ray/common/status.cc b/src/ray/common/status.cc index 01abacde6..9c6dcef68 100644 --- a/src/ray/common/status.cc +++ b/src/ray/common/status.cc @@ -59,6 +59,12 @@ std::string Status::CodeAsString() const { case StatusCode::IOError: type = "IOError"; break; + case StatusCode::ObjectExists: + type = "ObjectExists"; + break; + case StatusCode::ObjectStoreFull: + type = "ObjectStoreFull"; + break; case StatusCode::UnknownError: type = "Unknown error"; break; diff --git a/src/ray/common/status.h b/src/ray/common/status.h index 4c4cb972a..3a8d2f0f4 100644 --- a/src/ray/common/status.h +++ b/src/ray/common/status.h @@ -75,9 +75,10 @@ enum class StatusCode : char { Invalid = 4, IOError = 5, ObjectExists = 6, + ObjectStoreFull = 7, UnknownError = 9, NotImplemented = 10, - RedisError = 11 + RedisError = 11, }; #if defined(__clang__) @@ -129,14 +130,18 @@ class RAY_EXPORT Status { return Status(StatusCode::IOError, msg); } - static Status RedisError(const std::string &msg) { - return Status(StatusCode::RedisError, msg); - } - static Status ObjectExists(const std::string &msg) { return Status(StatusCode::ObjectExists, msg); } + static Status ObjectStoreFull(const std::string &msg) { + return Status(StatusCode::ObjectStoreFull, msg); + } + + static Status RedisError(const std::string &msg) { + return Status(StatusCode::RedisError, msg); + } + // Returns true iff the status indicates success. 
bool ok() const { return (state_ == NULL); } @@ -144,11 +149,12 @@ class RAY_EXPORT Status { bool IsKeyError() const { return code() == StatusCode::KeyError; } bool IsInvalid() const { return code() == StatusCode::Invalid; } bool IsIOError() const { return code() == StatusCode::IOError; } + bool IsObjectExists() const { return code() == StatusCode::ObjectExists; } + bool IsObjectStoreFull() const { return code() == StatusCode::ObjectStoreFull; } bool IsTypeError() const { return code() == StatusCode::TypeError; } bool IsUnknownError() const { return code() == StatusCode::UnknownError; } bool IsNotImplemented() const { return code() == StatusCode::NotImplemented; } bool IsRedisError() const { return code() == StatusCode::RedisError; } - bool IsObjectExists() const { return code() == StatusCode::ObjectExists; } // Return a string representation of this status suitable for printing. // Returns the string "OK" for success. diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index 39d4ec3a5..82f3c80bb 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -74,6 +74,14 @@ const TaskID &WorkerContext::GetCurrentTaskID() const { return GetThreadContext().GetCurrentTaskID(); } +// TODO(edoakes): remove this once Python core worker uses the task interfaces. +void WorkerContext::SetCurrentJobId(const JobID &job_id) { current_job_id_ = job_id; } + +// TODO(edoakes): remove this once Python core worker uses the task interfaces. 
+void WorkerContext::SetCurrentTaskId(const TaskID &task_id) { + GetThreadContext().SetCurrentTaskId(task_id); +} + void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) { current_job_id_ = task_spec.JobId(); GetThreadContext().SetCurrentTask(task_spec); diff --git a/src/ray/core_worker/context.h b/src/ray/core_worker/context.h index 3c53e415e..d552c59ff 100644 --- a/src/ray/core_worker/context.h +++ b/src/ray/core_worker/context.h @@ -20,6 +20,12 @@ class WorkerContext { const TaskID &GetCurrentTaskID() const; + // TODO(edoakes): remove this once Python core worker uses the task interfaces. + void SetCurrentJobId(const JobID &job_id); + + // TODO(edoakes): remove this once Python core worker uses the task interfaces. + void SetCurrentTaskId(const TaskID &task_id); + void SetCurrentTask(const TaskSpecification &task_spec); std::shared_ptr GetCurrentTask() const; diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index fb01789c7..42d6fb7e3 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -7,30 +7,58 @@ CoreWorker::CoreWorker( const WorkerType worker_type, const Language language, const std::string &store_socket, const std::string &raylet_socket, const JobID &job_id, const gcs::GcsClientOptions &gcs_options, - const CoreWorkerTaskExecutionInterface::TaskExecutor &execution_callback) + const std::string &log_dir, + const CoreWorkerTaskExecutionInterface::TaskExecutor &execution_callback, + bool use_memory_store) : worker_type_(worker_type), language_(language), raylet_socket_(raylet_socket), + log_dir_(log_dir), worker_context_(worker_type, job_id), io_work_(io_service_) { - // Initialize gcs client + // Initialize logging if log_dir is passed. Otherwise, it must be initialized + // and cleaned up by the caller. 
+ if (!log_dir_.empty()) { + std::stringstream app_name; + if (language_ == Language::PYTHON) { + app_name << "python-"; + } else if (language == Language::JAVA) { + app_name << "java-"; + } + if (worker_type_ == WorkerType::DRIVER) { + app_name << "core-driver-" << worker_context_.GetWorkerID(); + } else { + app_name << "core-worker-" << worker_context_.GetWorkerID(); + } + RayLog::StartRayLog(app_name.str(), RayLogLevel::INFO, log_dir_); + RayLog::InstallFailureSignalHandler(); + } + + // Initialize gcs client. gcs_client_ = std::unique_ptr(new gcs::RedisGcsClient(gcs_options)); RAY_CHECK_OK(gcs_client_->Connect(io_service_)); - object_interface_ = std::unique_ptr( - new CoreWorkerObjectInterface(worker_context_, raylet_client_, store_socket)); + object_interface_ = + std::unique_ptr(new CoreWorkerObjectInterface( + worker_context_, raylet_client_, store_socket, use_memory_store)); task_interface_ = std::unique_ptr(new CoreWorkerTaskInterface( worker_context_, raylet_client_, *object_interface_, io_service_, *gcs_client_)); + // Initialize task execution. int rpc_server_port = 0; if (worker_type_ == WorkerType::WORKER) { - RAY_CHECK(execution_callback != nullptr); - task_execution_interface_ = std::unique_ptr( - new CoreWorkerTaskExecutionInterface(worker_context_, raylet_client_, - *object_interface_, execution_callback)); - rpc_server_port = task_execution_interface_->worker_server_.GetPort(); + // TODO(edoakes): Remove this check once Python core worker migration is complete. + if (language != Language::PYTHON || execution_callback != nullptr) { + RAY_CHECK(execution_callback != nullptr); + task_execution_interface_ = std::unique_ptr( + new CoreWorkerTaskExecutionInterface(worker_context_, raylet_client_, + *object_interface_, execution_callback)); + rpc_server_port = task_execution_interface_->worker_server_.GetPort(); + } } + + // Initialize raylet client. 
// TODO(zhijunfu): currently RayletClient would crash in its constructor if it cannot // connect to Raylet after a number of retries, this can be changed later // so that the worker (java/python .etc) can retrieve and handle the error @@ -44,12 +72,20 @@ CoreWorker::CoreWorker( } CoreWorker::~CoreWorker() { - gcs_client_->Disconnect(); io_service_.stop(); io_thread_.join(); if (task_execution_interface_) { task_execution_interface_->Stop(); } + if (!log_dir_.empty()) { + RayLog::ShutDownRayLog(); + } +} + +void CoreWorker::Disconnect() { + if (gcs_client_) { + gcs_client_->Disconnect(); + } if (raylet_client_) { RAY_IGNORE_EXPR(raylet_client_->Disconnect()); } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 249e78fe4..3fcdbfdbb 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -20,16 +20,32 @@ class CoreWorker { /// Construct a CoreWorker instance. /// /// \param[in] worker_type Type of this worker. - /// \param[in] langauge Language of this worker. + /// \param[in] language Language of this worker. + /// \param[in] store_socket Object store socket to connect to. + /// \param[in] raylet_socket Raylet socket to connect to. + /// \param[in] job_id Job ID of this worker. + /// \param[in] gcs_options Options for the GCS client. + /// \param[in] log_dir Directory to write logs to. If this is empty, logs + /// won't be written to a file. + /// \param[in] execution_callback Language worker callback to execute tasks. + /// \param[in] use_memory_store Whether or not to use the in-memory object store + /// in addition to the plasma store. /// /// NOTE(zhijunfu): the constructor would throw if a failure happens. + /// NOTE(edoakes): the use_memory_store flag is a stop-gap solution to the issue + /// that randomly generated ObjectIDs may use the memory store + /// instead of the plasma store. 
CoreWorker(const WorkerType worker_type, const Language language, const std::string &store_socket, const std::string &raylet_socket, const JobID &job_id, const gcs::GcsClientOptions &gcs_options, - const CoreWorkerTaskExecutionInterface::TaskExecutor &execution_callback); + const std::string &log_dir, + const CoreWorkerTaskExecutionInterface::TaskExecutor &execution_callback, + bool use_memory_store = true); ~CoreWorker(); + void Disconnect(); + /// Type of this worker. WorkerType GetWorkerType() const { return worker_type_; } @@ -55,44 +71,35 @@ class CoreWorker { return *task_execution_interface_; } + // TODO(edoakes): remove this once Python core worker uses the task interfaces. + void SetCurrentJobId(const JobID &job_id) { worker_context_.SetCurrentJobId(job_id); } + + // TODO(edoakes): remove this once Python core worker uses the task interfaces. + void SetCurrentTaskId(const TaskID &task_id) { + worker_context_.SetCurrentTaskId(task_id); + } + private: void StartIOService(); - /// Type of this worker. const WorkerType worker_type_; - - /// Language of this worker. const Language language_; - - /// raylet socket name. const std::string raylet_socket_; - - /// Worker context. + const std::string log_dir_; WorkerContext worker_context_; - /// event loop where the IO events are handled. e.g. async GCS operations. + /// Event loop where the IO events are handled. e.g. async GCS operations. boost::asio::io_service io_service_; - - /// keeps io_service_ alive. + /// Keeps the io_service_ alive. boost::asio::io_service::work io_work_; - /// The thread to handle IO events. std::thread io_thread_; - - /// Raylet client. std::unique_ptr raylet_client_; - - /// GCS client. std::unique_ptr gcs_client_; - - /// The `CoreWorkerTaskInterface` instance. std::unique_ptr task_interface_; - - /// The `CoreWorkerObjectInterface` instance. std::unique_ptr object_interface_; - /// The `CoreWorkerTaskExecutionInterface` instance. - /// This is only available if it's not a driver. 
+ /// Only available if it's not a driver. std::unique_ptr task_execution_interface_; }; diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc index abd898f55..ea9082188 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc @@ -71,7 +71,7 @@ JNIEXPORT jlong JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeInitCoreWork try { auto core_worker = new ray::CoreWorker( static_cast(workerMode), ::Language::JAVA, native_store_socket, - native_raylet_socket, job_id, gcs_client_options, executor_func); + native_raylet_socket, job_id, gcs_client_options, /*log_dir=*/"", executor_func); return reinterpret_cast(core_worker); } catch (const std::exception &e) { std::ostringstream oss; @@ -103,7 +103,9 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeRunTaskExecut */ JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeDestroyCoreWorker( JNIEnv *env, jclass o, jlong nativeCoreWorkerPointer) { - delete reinterpret_cast(nativeCoreWorkerPointer); + auto core_worker = reinterpret_cast(nativeCoreWorkerPointer); + core_worker->Disconnect(); + delete core_worker; } /* diff --git a/src/ray/core_worker/object_interface.cc b/src/ray/core_worker/object_interface.cc index 9b2e3ea45..acdf844aa 100644 --- a/src/ray/core_worker/object_interface.cc +++ b/src/ray/core_worker/object_interface.cc @@ -8,7 +8,7 @@ namespace ray { // Group object ids according the the corresponding store providers. -void GroupObjectIdsByStoreProvider( +void CoreWorkerObjectInterface::GroupObjectIdsByStoreProvider( const std::vector &object_ids, EnumUnorderedMap> *results) { // There are two cases: @@ -25,7 +25,7 @@ void GroupObjectIdsByStoreProvider( // and are only used locally. 
// Thus we need to check whether this object is a task return object in additional // to whether it's from direct actor call before we can choose memory store provider. - if (object_id.IsReturnObject() && + if (use_memory_store_ && object_id.IsReturnObject() && object_id.GetTransportType() == static_cast(TaskTransportType::DIRECT_ACTOR)) { type = StoreProviderType::MEMORY; @@ -37,15 +37,22 @@ void GroupObjectIdsByStoreProvider( CoreWorkerObjectInterface::CoreWorkerObjectInterface( WorkerContext &worker_context, std::unique_ptr &raylet_client, - const std::string &store_socket) + const std::string &store_socket, bool use_memory_store) : worker_context_(worker_context), raylet_client_(raylet_client), store_socket_(store_socket), + use_memory_store_(use_memory_store), memory_store_(std::make_shared()) { AddStoreProvider(StoreProviderType::PLASMA); AddStoreProvider(StoreProviderType::MEMORY); } +Status CoreWorkerObjectInterface::SetClientOptions(std::string name, + int64_t limit_bytes) { + // Currently only the Plasma store supports client options. 
+ return store_providers_[StoreProviderType::PLASMA]->SetClientOptions(name, limit_bytes); +} + Status CoreWorkerObjectInterface::Put(const RayObject &object, ObjectID *object_id) { ObjectID put_id = ObjectID::ForPut(worker_context_.GetCurrentTaskID(), worker_context_.GetNextPutIndex(), @@ -62,6 +69,18 @@ Status CoreWorkerObjectInterface::Put(const RayObject &object, return store_providers_[StoreProviderType::PLASMA]->Put(object, object_id); } +Status CoreWorkerObjectInterface::Create(const std::shared_ptr &metadata, + const size_t data_size, + const ObjectID &object_id, + std::shared_ptr *data) { + return store_providers_[StoreProviderType::PLASMA]->Create(metadata, data_size, + object_id, data); +} + +Status CoreWorkerObjectInterface::Seal(const ObjectID &object_id) { + return store_providers_[StoreProviderType::PLASMA]->Seal(object_id); +} + Status CoreWorkerObjectInterface::Get(const std::vector &ids, int64_t timeout_ms, std::vector> *results) { @@ -124,6 +143,11 @@ Status CoreWorkerObjectInterface::Get(const std::vector &ids, return Status::OK(); } +Status CoreWorkerObjectInterface::Contains(const ObjectID &object_id, bool *has_object) { + // Currently only the Plasma store supports Contains(). + return store_providers_[StoreProviderType::PLASMA]->Contains(object_id, has_object); +} + Status CoreWorkerObjectInterface::Wait(const std::vector &ids, int num_objects, int64_t timeout_ms, std::vector *results) { (*results).resize(ids.size(), false); @@ -231,6 +255,11 @@ Status CoreWorkerObjectInterface::Delete(const std::vector &object_ids return Status::OK(); } +std::string CoreWorkerObjectInterface::MemoryUsageString() { + // Currently only the Plasma store returns a debug string. 
+ return store_providers_[StoreProviderType::PLASMA]->MemoryUsageString(); +} + void CoreWorkerObjectInterface::AddStoreProvider(StoreProviderType type) { store_providers_.emplace(type, CreateStoreProvider(type)); } diff --git a/src/ray/core_worker/object_interface.h b/src/ray/core_worker/object_interface.h index 227f6cdae..90665d69e 100644 --- a/src/ray/core_worker/object_interface.h +++ b/src/ray/core_worker/object_interface.h @@ -15,12 +15,24 @@ class CoreWorker; class CoreWorkerStoreProvider; class CoreWorkerMemoryStore; -/// The interface that contains all `CoreWorker` methods that are related to object store. +/// The interface that contains all `CoreWorker` methods related to the object store. class CoreWorkerObjectInterface { public: + /// \param[in] worker_context WorkerContext of the parent CoreWorker. + /// \param[in] store_socket Path to the plasma store socket. + /// \param[in] use_memory_store Whether or not to use the in-memory object store + /// in addition to the plasma store. CoreWorkerObjectInterface(WorkerContext &worker_context, std::unique_ptr &raylet_client, - const std::string &store_socket); + const std::string &store_socket, + bool use_memory_store = true); + + /// Set options for this client's interactions with the object store. + /// + /// \param[in] name Unique name for this object store client. + /// \param[in] limit_bytes The maximum amount of memory in bytes that this client + /// can use in the object store. + Status SetClientOptions(std::string name, int64_t limit_bytes); /// Put an object into object store. /// @@ -32,11 +44,32 @@ class CoreWorkerObjectInterface { /// Put an object with specified ID into object store. /// /// \param[in] object The ray object. - /// \param[in] object_id Object ID specified by user. + /// \param[in] object_id Object ID specified by the user. /// \return Status. Status Put(const RayObject &object, const ObjectID &object_id); - /// Get a list of objects from the object store.
Duplicate object ids are supported. + /// Create and return a buffer in the object store that can be directly written + /// into. After writing to the buffer, the caller must call `Seal()` to finalize + /// the object. The `Create()` and `Seal()` combination is an alternative interface + /// to `Put()` that allows frontends to avoid an extra copy when possible. + /// + /// \param[in] metadata Metadata of the object to be written. + /// \param[in] data_size Size of the object to be written. + /// \param[in] object_id Object ID specified by the user. + /// \param[out] data Buffer for the user to write the object into. + /// \return Status. + Status Create(const std::shared_ptr &metadata, const size_t data_size, + const ObjectID &object_id, std::shared_ptr *data); + + /// Finalize placing an object into the object store. This should be called after + /// a corresponding `Create()` call and then writing into the returned buffer. + /// + /// \param[in] object_id Object ID corresponding to the object. + /// \return Status. + Status Seal(const ObjectID &object_id); + + /// Get a list of objects from the object store. Objects that failed to be retrieved + /// will be returned as nullptrs. /// /// \param[in] ids IDs of the objects to get. /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. @@ -45,6 +78,13 @@ class CoreWorkerObjectInterface { Status Get(const std::vector &ids, int64_t timeout_ms, std::vector> *results); + /// Return whether or not the object store contains the given object. + /// + /// \param[in] object_id ID of the object to check for. + /// \param[out] has_object Whether or not the object is present. + /// \return Status. + Status Contains(const ObjectID &object_id, bool *has_object); + /// Wait for a list of objects to appear in the object store. /// Duplicate object ids are supported, and `num_objects` includes duplicate ids in this /// case.
@@ -70,7 +110,21 @@ class CoreWorkerObjectInterface { Status Delete(const std::vector &object_ids, bool local_only, bool delete_creating_tasks); + /// Get a string describing object store memory usage for debugging purposes. + /// + /// \return std::string The string describing memory usage. + std::string MemoryUsageString(); + private: + /// Helper function to group object IDs by the store provider that should be used + /// for them. + /// + /// \param[in] object_ids Object IDs to group. + /// \param[out] results Map of provider type to object IDs. + void GroupObjectIdsByStoreProvider( + const std::vector &object_ids, + EnumUnorderedMap> *results); + /// Helper function to get a set of objects from different store providers. /// /// \param[in] ids_per_provider A map from store provider type to the set of @@ -96,8 +150,8 @@ class CoreWorkerObjectInterface { /// Reference to the parent CoreWorker's raylet client. std::unique_ptr &raylet_client_; - /// Store socket name. std::string store_socket_; + bool use_memory_store_; /// In-memory store for return objects. This is used for `MEMORY` store provider. std::shared_ptr memory_store_; diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.h b/src/ray/core_worker/store_provider/memory_store/memory_store.h index 0a8422f81..8af5f2540 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.h +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.h @@ -26,6 +26,26 @@ class CoreWorkerMemoryStore { /// \return Status. Status Put(const ObjectID &object_id, const RayObject &object); + /// Create and return a buffer in the object store that can be directly written + /// into. After writing to the buffer, the caller must call `Seal()` to finalize + /// the object. The `Create()` and `Seal()` combination is an alternative interface + /// to `Put()` that allows frontends to avoid an extra copy when possible. + /// + /// \param[in] metadata Metadata of the object to be written. 
+ /// \param[in] data_size Size of the object to be written. + /// \param[in] object_id Object ID specified by the user. + /// \param[out] data Buffer for the user to write the object into. + /// \return Status. + Status Create(const std::shared_ptr &metadata, const size_t data_size, + const ObjectID &object_id, std::shared_ptr *data); + + /// Finalize placing an object into the object store. This should be called after + /// a corresponding `Create()` call and then writing into the returned buffer. + /// + /// \param[in] object_id Object ID corresponding to the object. + /// \return Status. + Status Seal(const ObjectID &object_id); + /// Get a list of objects from the object store. /// /// \param[in] object_ids IDs of the objects to get. Duplicates are not allowed. diff --git a/src/ray/core_worker/store_provider/memory_store_provider.cc b/src/ray/core_worker/store_provider/memory_store_provider.cc index 676456613..6a8a8dbcf 100644 --- a/src/ray/core_worker/store_provider/memory_store_provider.cc +++ b/src/ray/core_worker/store_provider/memory_store_provider.cc @@ -7,24 +7,39 @@ namespace ray { -// -// CoreWorkerMemoryStoreProvider functions -// CoreWorkerMemoryStoreProvider::CoreWorkerMemoryStoreProvider( std::shared_ptr store) : store_(store) { RAY_CHECK(store != nullptr); } +Status CoreWorkerMemoryStoreProvider::SetClientOptions(std::string name, + int64_t limit_bytes) { + return Status::NotImplemented( + "SetClientOptions() not implemented for in-memory store."); +} + Status CoreWorkerMemoryStoreProvider::Put(const RayObject &object, const ObjectID &object_id) { - auto status = store_->Put(object_id, object); + Status status = store_->Put(object_id, object); if (status.IsObjectExists()) { // Object already exists in store, treat it as ok. 
return Status::OK(); - } else { - return status; } + return status; +} + +Status CoreWorkerMemoryStoreProvider::Create(const std::shared_ptr &metadata, + const size_t data_size, + const ObjectID &object_id, + std::shared_ptr *data) { + return Status::NotImplemented( + "Create/Seal interface not implemented for in-memory store."); +} + +Status CoreWorkerMemoryStoreProvider::Seal(const ObjectID &object_id) { + return Status::NotImplemented( + "Create/Seal interface not implemented for in-memory store."); } Status CoreWorkerMemoryStoreProvider::Get( @@ -48,6 +63,11 @@ Status CoreWorkerMemoryStoreProvider::Get( return Status::OK(); } +Status CoreWorkerMemoryStoreProvider::Contains(const ObjectID &object_id, + bool *has_object) { + return Status::NotImplemented("Contains() not implemented for in-memory store."); +} + Status CoreWorkerMemoryStoreProvider::Wait(const std::unordered_set &object_ids, int num_objects, int64_t timeout_ms, const TaskID &task_id, @@ -74,4 +94,6 @@ Status CoreWorkerMemoryStoreProvider::Delete(const std::vector &object return Status::OK(); } +std::string CoreWorkerMemoryStoreProvider::MemoryUsageString() { return ""; } + } // namespace ray diff --git a/src/ray/core_worker/store_provider/memory_store_provider.h b/src/ray/core_worker/store_provider/memory_store_provider.h index cad734b49..51d018064 100644 --- a/src/ray/core_worker/store_provider/memory_store_provider.h +++ b/src/ray/core_worker/store_provider/memory_store_provider.h @@ -15,30 +15,38 @@ class CoreWorker; /// The class provides implementations for accessing local process memory store. /// An example usage for this is to retrieve the returned objects from direct /// actor call (see direct_actor_transport.cc). +/// See `CoreWorkerStoreProvider` for the semantics of public methods. class CoreWorkerMemoryStoreProvider : public CoreWorkerStoreProvider { public: CoreWorkerMemoryStoreProvider(std::shared_ptr store); - /// See `CoreWorkerStoreProvider::Put` for semantics. 
+ Status SetClientOptions(std::string name, int64_t limit_bytes) override; + Status Put(const RayObject &object, const ObjectID &object_id) override; - /// See `CoreWorkerStoreProvider::Get` for semantics. + Status Create(const std::shared_ptr &metadata, const size_t data_size, + const ObjectID &object_id, std::shared_ptr *data) override; + + Status Seal(const ObjectID &object_id) override; + Status Get(const std::unordered_set &object_ids, int64_t timeout_ms, const TaskID &task_id, std::unordered_map> *results, bool *got_exception) override; - /// See `CoreWorkerStoreProvider::Wait` for semantics. + Status Contains(const ObjectID &object_id, bool *has_object) override; + /// Note that `num_objects` must equal to number of items in `object_ids`. Status Wait(const std::unordered_set &object_ids, int num_objects, int64_t timeout_ms, const TaskID &task_id, std::unordered_set *ready) override; - /// See `CoreWorkerStoreProvider::Delete` for semantics. /// Note that `local_only` must be true, and `delete_creating_tasks` must be false here. Status Delete(const std::vector &object_ids, bool local_only = true, bool delete_creating_tasks = false) override; + std::string MemoryUsageString() override; + private: /// Implementation. 
std::shared_ptr store_; diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index 618748a96..019d0f31d 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -13,32 +13,66 @@ CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider( RAY_ARROW_CHECK_OK(store_client_.Connect(store_socket)); } +CoreWorkerPlasmaStoreProvider::~CoreWorkerPlasmaStoreProvider() { + RAY_IGNORE_EXPR(store_client_.Disconnect()); +} + +Status CoreWorkerPlasmaStoreProvider::SetClientOptions(std::string name, + int64_t limit_bytes) { + std::lock_guard guard(store_client_mutex_); + RAY_ARROW_RETURN_NOT_OK(store_client_.SetClientOptions(name, limit_bytes)); + return Status::OK(); +} + Status CoreWorkerPlasmaStoreProvider::Put(const RayObject &object, const ObjectID &object_id) { + std::shared_ptr data; + RAY_RETURN_NOT_OK(Create(object.GetMetadata(), + object.HasData() ? object.GetData()->Size() : 0, object_id, + &data)); + // data could be a nullptr if the ObjectID already existed, but this does + // not throw an error. + if (data != nullptr) { + if (object.HasData()) { + memcpy(data->Data(), object.GetData()->Data(), object.GetData()->Size()); + } + RAY_RETURN_NOT_OK(Seal(object_id)); + } + return Status::OK(); +} + +Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr &metadata, + const size_t data_size, + const ObjectID &object_id, + std::shared_ptr *data) { auto plasma_id = object_id.ToPlasmaId(); - auto data = object.GetData(); - auto metadata = object.GetMetadata(); - std::shared_ptr out_buffer; + std::shared_ptr arrow_buffer; { - std::unique_lock guard(store_client_mutex_); - arrow::Status status = store_client_.Create( - plasma_id, data ? data->Size() : 0, metadata ? metadata->Data() : nullptr, - metadata ? 
metadata->Size() : 0, &out_buffer); + std::lock_guard guard(store_client_mutex_); + arrow::Status status = + store_client_.Create(plasma_id, data_size, metadata ? metadata->Data() : nullptr, + metadata ? metadata->Size() : 0, &arrow_buffer); if (plasma::IsPlasmaObjectExists(status)) { - // TODO(hchen): Should we propagate this error out of `ObjectInterface::put`? RAY_LOG(WARNING) << "Trying to put an object that already existed in plasma: " << object_id << "."; return Status::OK(); } + if (plasma::IsPlasmaStoreFull(status)) { + std::ostringstream message; + message << "Failed to put object " << object_id + << " in object store because it is full: " << status.message(); + return Status::ObjectStoreFull(message.str()); + } RAY_ARROW_RETURN_NOT_OK(status); } + *data = std::make_shared(PlasmaBuffer(arrow_buffer)); + return Status::OK(); +} - if (data != nullptr) { - memcpy(out_buffer->mutable_data(), data->Data(), data->Size()); - } - +Status CoreWorkerPlasmaStoreProvider::Seal(const ObjectID &object_id) { + auto plasma_id = object_id.ToPlasmaId(); { - std::unique_lock guard(store_client_mutex_); + std::lock_guard guard(store_client_mutex_); RAY_ARROW_RETURN_NOT_OK(store_client_.Seal(plasma_id)); RAY_ARROW_RETURN_NOT_OK(store_client_.Release(plasma_id)); } @@ -50,7 +84,7 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore( int64_t timeout_ms, bool fetch_only, const TaskID &task_id, std::unordered_map> *results, bool *got_exception) { - RAY_CHECK_OK(raylet_client_->FetchOrReconstruct(batch_ids, fetch_only, task_id)); + RAY_RETURN_NOT_OK(raylet_client_->FetchOrReconstruct(batch_ids, fetch_only, task_id)); std::vector plasma_batch_ids; plasma_batch_ids.reserve(batch_ids.size()); @@ -59,7 +93,7 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore( } std::vector plasma_results; { - std::unique_lock guard(store_client_mutex_); + std::lock_guard guard(store_client_mutex_); RAY_ARROW_RETURN_NOT_OK( store_client_.Get(plasma_batch_ids, timeout_ms, 
&plasma_results)); } @@ -156,6 +190,13 @@ Status CoreWorkerPlasmaStoreProvider::Get( return raylet_client_->NotifyUnblocked(task_id); } +Status CoreWorkerPlasmaStoreProvider::Contains(const ObjectID &object_id, + bool *has_object) { + std::lock_guard guard(store_client_mutex_); + RAY_ARROW_RETURN_NOT_OK(store_client_.Contains(object_id.ToPlasmaId(), has_object)); + return Status::OK(); +} + Status CoreWorkerPlasmaStoreProvider::Wait(const std::unordered_set &object_ids, int num_objects, int64_t timeout_ms, const TaskID &task_id, @@ -178,6 +219,11 @@ Status CoreWorkerPlasmaStoreProvider::Delete(const std::vector &object return raylet_client_->FreeObjects(object_ids, local_only, delete_creating_tasks); } +std::string CoreWorkerPlasmaStoreProvider::MemoryUsageString() { + std::lock_guard guard(store_client_mutex_); + return store_client_.DebugString(); +} + void CoreWorkerPlasmaStoreProvider::WarnIfAttemptedTooManyTimes( int num_attempts, const std::unordered_set &remaining) { if (num_attempts % RayConfig::instance().object_store_get_warn_per_num_attempts() == diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index 1ca5249bb..1640fff12 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -14,30 +14,41 @@ namespace ray { class CoreWorker; /// The class provides implementations for accessing plasma store, which includes both -/// local and remote store, remote access is done via raylet. +/// local and remote stores. Local access is done via a +/// CoreWorkerLocalPlasmaStoreProvider and remote access goes through the raylet. +/// See `CoreWorkerStoreProvider` for the semantics of public methods.
class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider { public: CoreWorkerPlasmaStoreProvider(const std::string &store_socket, std::unique_ptr &raylet_client); - /// See `CoreWorkerStoreProvider::Put` for semantics. + ~CoreWorkerPlasmaStoreProvider(); + + Status SetClientOptions(std::string name, int64_t limit_bytes); + Status Put(const RayObject &object, const ObjectID &object_id) override; - /// See `CoreWorkerStoreProvider::Get` for semantics. + Status Create(const std::shared_ptr &metadata, const size_t data_size, + const ObjectID &object_id, std::shared_ptr *data) override; + + Status Seal(const ObjectID &object_id) override; + Status Get(const std::unordered_set &object_ids, int64_t timeout_ms, const TaskID &task_id, std::unordered_map> *results, bool *got_exception) override; - /// See `CoreWorkerStoreProvider::Wait` for semantics. + Status Contains(const ObjectID &object_id, bool *has_object) override; + Status Wait(const std::unordered_set &object_ids, int num_objects, int64_t timeout_ms, const TaskID &task_id, std::unordered_set *ready) override; - /// See `CoreWorkerStoreProvider::Delete` for semantics. Status Delete(const std::vector &object_ids, bool local_only = true, bool delete_creating_tasks = false) override; + std::string MemoryUsageString() override; + private: /// Ask the raylet to fetch a set of objects and then attempt to get them /// from the local plasma store. Successfully fetched objects will be removed @@ -62,7 +73,7 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider { bool *got_exception); /// Print a warning if we've attempted too many times, but some objects are still - /// unavailable. + /// unavailable. Only the keys in the 'remaining' map are used. /// /// \param[in] num_attemps The number of attempted times. /// \param[in] remaining The remaining objects. 
diff --git a/src/ray/core_worker/store_provider/store_provider.h b/src/ray/core_worker/store_provider/store_provider.h index 394fc2196..ca389f232 100644 --- a/src/ray/core_worker/store_provider/store_provider.h +++ b/src/ray/core_worker/store_provider/store_provider.h @@ -18,6 +18,13 @@ class CoreWorkerStoreProvider { virtual ~CoreWorkerStoreProvider() {} + /// Set options for this client's interactions with the object store. + /// + /// \param[in] name Unique name for this object store client. + /// \param[in] limit_bytes The maximum amount of memory in bytes that this client + /// can use in the object store. + virtual Status SetClientOptions(std::string name, int64_t limit_bytes) = 0; + /// Put an object with specified ID into object store. /// /// \param[in] object The ray object. @@ -25,6 +32,26 @@ class CoreWorkerStoreProvider { /// \return Status. virtual Status Put(const RayObject &object, const ObjectID &object_id) = 0; + /// Create and return a buffer in the object store that can be directly written + /// into. After writing to the buffer, the caller must call `Seal()` to finalize + /// the object. The `Create()` and `Seal()` combination is an alternative interface + /// to `Put()` that allows frontends to avoid an extra copy when possible. + /// + /// \param[in] metadata Metadata of the object to be written. + /// \param[in] data_size Size of the object to be written. + /// \param[in] object_id Object ID specified by the user. + /// \param[out] data Buffer for the user to write the object into. + /// \return Status. + virtual Status Create(const std::shared_ptr &metadata, const size_t data_size, + const ObjectID &object_id, std::shared_ptr *data) = 0; + + /// Finalize placing an object into the object store. This should be called after + /// a corresponding `Create()` call and then writing into the returned buffer. + /// + /// \param[in] object_id Object ID corresponding to the object. + /// \return Status.
+ virtual Status Seal(const ObjectID &object_id) = 0; + /// Get a set of objects from the object store. /// /// \param[in] object_ids IDs of the objects to get. @@ -33,12 +60,20 @@ class CoreWorkerStoreProvider { /// \param[out] results Map of objects to write results into. Get will only add to this /// map, not clear or remove from it, so the caller can pass in a non-empty map. /// \param[out] got_exception Set to true if any of the fetched results were an - /// exception. \return Status. + /// exception. + /// \return Status. virtual Status Get(const std::unordered_set &object_ids, int64_t timeout_ms, const TaskID &task_id, std::unordered_map> *results, bool *got_exception) = 0; + /// Return whether or not the object store contains the given object. + /// + /// \param[in] object_id ID of the object to check for. + /// \param[out] has_object Whether or not the object is present. + /// \return Status. + virtual Status Contains(const ObjectID &object_id, bool *has_object) = 0; + /// Wait for a list of objects to appear in the object store. Objects that appear will /// be added to the ready set. /// @@ -63,6 +98,11 @@ class CoreWorkerStoreProvider { /// \return Status. virtual Status Delete(const std::vector &object_ids, bool local_only = true, bool delete_creating_tasks = false) = 0; + + /// Get a string describing object store memory usage for debugging purposes. + /// + /// \return std::string The string describing memory usage.
+ virtual std::string MemoryUsageString() = 0; }; } // namespace ray diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index a1068d867..35a37ef60 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -30,8 +30,6 @@ std::string store_executable; std::string raylet_executable; std::string mock_worker_executable; -ray::ObjectID RandomObjectID() { return ObjectID::FromRandom(); } - static void flushall_redis(void) { redisContext *context = redisConnect("127.0.0.1", 6379); freeReplyObject(redisCommand(context, "FLUSHALL")); @@ -112,7 +110,7 @@ class CoreWorkerTest : public ::testing::Test { } std::string StartStore() { - std::string store_socket_name = "/tmp/store" + RandomObjectID().Hex(); + std::string store_socket_name = "/tmp/store" + ObjectID::FromRandom().Hex(); std::string store_pid = store_socket_name + ".pid"; std::string plasma_command = store_executable + " -m 10000000 -s " + store_socket_name + @@ -134,7 +132,7 @@ class CoreWorkerTest : public ::testing::Test { std::string StartRaylet(std::string store_socket_name, std::string node_ip_address, std::string redis_address, std::string resource) { - std::string raylet_socket_name = "/tmp/raylet" + RandomObjectID().Hex(); + std::string raylet_socket_name = "/tmp/raylet" + ObjectID::FromRandom().Hex(); std::string ray_start_cmd = raylet_executable; ray_start_cmd.append(" --raylet_socket_name=" + raylet_socket_name) .append(" --store_socket_name=" + store_socket_name) @@ -221,7 +219,7 @@ bool CoreWorkerTest::WaitForDirectCallActorState(CoreWorker &worker, void CoreWorkerTest::TestNormalTask( const std::unordered_map &resources) { CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], - raylet_socket_names_[0], NextJobId(), gcs_options_, nullptr); + raylet_socket_names_[0], NextJobId(), gcs_options_, "", nullptr); // Test for tasks with by-value and by-ref args. 
{ @@ -263,7 +261,7 @@ void CoreWorkerTest::TestNormalTask( void CoreWorkerTest::TestActorTask( const std::unordered_map &resources, bool is_direct_call) { CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], - raylet_socket_names_[0], NextJobId(), gcs_options_, nullptr); + raylet_socket_names_[0], NextJobId(), gcs_options_, "", nullptr); auto actor_handle = CreateActorHelper(driver, resources, is_direct_call, 1000); @@ -352,7 +350,7 @@ void CoreWorkerTest::TestActorTask( void CoreWorkerTest::TestActorReconstruction( const std::unordered_map &resources, bool is_direct_call) { CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], - raylet_socket_names_[0], NextJobId(), gcs_options_, nullptr); + raylet_socket_names_[0], NextJobId(), gcs_options_, "", nullptr); // creating actor. auto actor_handle = CreateActorHelper(driver, resources, is_direct_call, 1000); @@ -410,7 +408,7 @@ void CoreWorkerTest::TestActorReconstruction( void CoreWorkerTest::TestActorFailure( const std::unordered_map &resources, bool is_direct_call) { CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], - raylet_socket_names_[0], NextJobId(), gcs_options_, nullptr); + raylet_socket_names_[0], NextJobId(), gcs_options_, "", nullptr); // creating actor. auto actor_handle = @@ -698,7 +696,8 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) { CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], - raylet_socket_names_[0], JobID::FromInt(1), gcs_options_, nullptr); + raylet_socket_names_[0], JobID::FromInt(1), gcs_options_, "", + nullptr); std::unique_ptr actor_handle; // Test creating actor. 
@@ -802,7 +801,7 @@ TEST_F(SingleNodeTest, TestMemoryStoreProvider) { TEST_F(SingleNodeTest, TestObjectInterface) { CoreWorker core_worker(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], raylet_socket_names_[0], - JobID::FromInt(1), gcs_options_, nullptr); + JobID::FromInt(1), gcs_options_, "", nullptr); uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8}; uint8_t array2[] = {10, 11, 12, 13, 14, 15}; @@ -873,10 +872,10 @@ TEST_F(SingleNodeTest, TestObjectInterface) { TEST_F(TwoNodeTest, TestObjectInterfaceCrossNodes) { CoreWorker worker1(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], - raylet_socket_names_[0], NextJobId(), gcs_options_, nullptr); + raylet_socket_names_[0], NextJobId(), gcs_options_, "", nullptr); CoreWorker worker2(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[1], - raylet_socket_names_[1], NextJobId(), gcs_options_, nullptr); + raylet_socket_names_[1], NextJobId(), gcs_options_, "", nullptr); uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8}; uint8_t array2[] = {10, 11, 12, 13, 14, 15}; diff --git a/src/ray/core_worker/test/mock_worker.cc b/src/ray/core_worker/test/mock_worker.cc index 1db55b3fc..e93107bdc 100644 --- a/src/ray/core_worker/test/mock_worker.cc +++ b/src/ray/core_worker/test/mock_worker.cc @@ -23,7 +23,7 @@ class MockWorker { MockWorker(const std::string &store_socket, const std::string &raylet_socket, const gcs::GcsClientOptions &gcs_options) : worker_(WorkerType::WORKER, Language::PYTHON, store_socket, raylet_socket, - JobID::FromInt(1), gcs_options, + JobID::FromInt(1), gcs_options, /*log_dir=*/"", std::bind(&MockWorker::ExecuteTask, this, _1, _2, _3, _4)) {} void Run() { diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 617f6ff0b..0a6a7d2db 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -27,7 +27,7 @@ Status 
CoreWorkerDirectActorTaskSubmitter::SubmitTask( const TaskSpecification &task_spec) { RAY_LOG(DEBUG) << "Submitting task " << task_spec.TaskId(); if (HasByReferenceArgs(task_spec)) { - return Status::Invalid("direct actor call only supports by-value arguments"); + return Status::Invalid("Direct actor call only supports by-value arguments"); } RAY_CHECK(task_spec.IsActorTask()); @@ -243,7 +243,7 @@ void CoreWorkerDirectActorTaskReceiver::HandlePushTask( RAY_LOG(DEBUG) << "Received task " << task_spec.TaskId(); if (HasByReferenceArgs(task_spec)) { send_reply_callback( - Status::Invalid("direct actor call only supports by value arguments"), nullptr, + Status::Invalid("Direct actor call only supports by value arguments"), nullptr, nullptr); return; } diff --git a/src/ray/gcs/redis_context.cc b/src/ray/gcs/redis_context.cc index 08a4855df..ea71c8831 100644 --- a/src/ray/gcs/redis_context.cc +++ b/src/ray/gcs/redis_context.cc @@ -187,7 +187,7 @@ Status AuthenticateRedis(redisAsyncContext *context, const std::string &password } void RedisAsyncContextDisconnectCallback(const redisAsyncContext *context, int status) { - RAY_LOG(WARNING) << "Redis async context disconnected. Status: " << status; + RAY_LOG(INFO) << "Redis async context disconnected. Status: " << status; // Reset raw 'redisAsyncContext' to nullptr because hiredis will release this context. reinterpret_cast(context->data)->ResetRawRedisAsyncContext(); }