[core worker] Python core worker object interface (#5272)

This commit is contained in:
Edward Oakes
2019-09-12 23:07:46 -07:00
committed by Eric Liang
parent 1b880191b0
commit 07c4c6367a
49 changed files with 1157 additions and 552 deletions
+9
View File
@@ -33,6 +33,15 @@ If you are using Anaconda, try fixing this problem by running:
try:
import pyarrow # noqa: F401
# pyarrow is not imported inside of _raylet because of the issue described
# above. In order for Cython to compile _raylet, pyarrow is set to None
# in _raylet instead, so we give _raylet a real reference to it here.
# We first do the attribute checks here so that building the documentation
# succeeds without fully installing ray.
# TODO(edoakes): Fix this.
if hasattr(ray, "_raylet") and hasattr(ray._raylet, "pyarrow"):
ray._raylet.pyarrow = pyarrow
except ImportError as e:
if ((hasattr(e, "msg") and isinstance(e.msg, str)
and ("libstdc++" in e.msg or "CXX" in e.msg))):
+245 -72
View File
@@ -4,10 +4,17 @@
# cython: language_level = 3
import numpy
import time
import logging
from libc.stdint cimport int32_t, int64_t
from libc.stdint cimport uint8_t, int32_t, int64_t
from libcpp cimport bool as c_bool
from libcpp.memory cimport unique_ptr
from libcpp.memory cimport (
dynamic_pointer_cast,
make_shared,
shared_ptr,
unique_ptr,
)
from libcpp.string cimport string as c_string
from libcpp.utility cimport pair
from libcpp.unordered_map cimport unordered_map
@@ -17,10 +24,15 @@ from cython.operator import dereference, postincrement
from ray.includes.common cimport (
CLanguage,
CRayObject,
CRayStatus,
CGcsClientOptions,
LocalMemoryBuffer,
LANGUAGE_CPP,
LANGUAGE_JAVA,
LANGUAGE_PYTHON,
WORKER_TYPE_WORKER,
WORKER_TYPE_DRIVER,
)
from ray.includes.libraylet cimport (
CRayletClient,
@@ -34,16 +46,35 @@ from ray.includes.unique_ids cimport (
CObjectID,
CClientID,
)
from ray.includes.libcoreworker cimport CCoreWorker
from ray.includes.task cimport CTaskSpec
from ray.includes.ray_config cimport RayConfig
from ray.exceptions import RayletError
from ray.exceptions import RayletError, ObjectStoreFullError
from ray.utils import decode
from ray.ray_constants import (
DEFAULT_PUT_OBJECT_DELAY,
DEFAULT_PUT_OBJECT_RETRIES,
RAW_BUFFER_METADATA,
)
# pyarrow cannot be imported until after _raylet finishes initializing
# (see ray/__init__.py for details).
# Unfortunately, Cython won't compile if 'pyarrow' is undefined, so we
# "forward declare" it here and then replace it with a reference to the
# imported package from ray/__init__.py.
# TODO(edoakes): Fix this.
pyarrow = None
cimport cpython
include "includes/unique_ids.pxi"
include "includes/ray_config.pxi"
include "includes/task.pxi"
include "includes/buffer.pxi"
include "includes/common.pxi"
logger = logging.getLogger(__name__)
if cpython.PY_MAJOR_VERSION >= 3:
@@ -58,6 +89,10 @@ cdef int check_status(const CRayStatus& status) nogil except -1:
with gil:
message = status.message().decode()
if status.IsObjectStoreFull():
raise ObjectStoreFullError(message)
else:
raise RayletError(message)
@@ -78,13 +113,6 @@ cdef c_vector[CObjectID] ObjectIDsToVector(object_ids):
return result
cdef VectorToObjectIDs(c_vector[CObjectID] object_ids):
result = []
for i in range(object_ids.size()):
result.append(ObjectID(object_ids[i].Binary()))
return result
def compute_put_id(TaskID task_id, int64_t put_index):
if put_index < 1 or put_index > <int64_t>CObjectID.MaxObjectIndex():
raise ValueError("The range of 'put_index' should be [1, %d]"
@@ -217,26 +245,23 @@ cdef unordered_map[c_string, double] resource_map_from_dict(resource_map):
cdef class RayletClient:
cdef unique_ptr[CRayletClient] client
cdef CRayletClient* client
def __cinit__(self, raylet_socket,
WorkerID worker_id,
c_bool is_worker,
JobID job_id):
# We know that we are using Python, so just skip the language
# parameter.
# TODO(suquark): Should we allow unicode chars in "raylet_socket"?
self.client.reset(new CRayletClient(
raylet_socket.encode("ascii"), worker_id.native(), is_worker,
job_id.native(), LANGUAGE_PYTHON))
def disconnect(self):
check_status(self.client.get().Disconnect())
def __cinit__(self, CoreWorker core_worker):
# The core worker and raylet client need to share an underlying
# raylet client, so we take a reference to the core worker's client
# here. The client is a raw pointer because it is only a temporary
# workaround and will be removed once the core worker transition is
# complete, so we don't want to change the unique_ptr in core worker
# to a shared_ptr. This means the core worker *must* be
# initialized before the raylet client.
self.client = &core_worker.core_worker.get().GetRayletClient()
def submit_task(self, TaskSpec task_spec):
cdef:
CObjectID c_id
check_status(self.client.get().SubmitTask(
check_status(self.client.SubmitTask(
task_spec.task_spec.get()[0]))
def get_task(self):
@@ -244,45 +269,28 @@ cdef class RayletClient:
unique_ptr[CTaskSpec] task_spec
with nogil:
check_status(self.client.get().GetTask(&task_spec))
check_status(self.client.GetTask(&task_spec))
return TaskSpec.make(task_spec)
def task_done(self):
check_status(self.client.get().TaskDone())
check_status(self.client.TaskDone())
def fetch_or_reconstruct(self, object_ids,
c_bool fetch_only,
TaskID current_task_id=TaskID.nil()):
cdef c_vector[CObjectID] fetch_ids = ObjectIDsToVector(object_ids)
check_status(self.client.get().FetchOrReconstruct(
check_status(self.client.FetchOrReconstruct(
fetch_ids, fetch_only, current_task_id.native()))
def notify_unblocked(self, TaskID current_task_id):
check_status(self.client.get().NotifyUnblocked(current_task_id.native()))
def wait(self, object_ids, int num_returns, int64_t timeout_milliseconds,
c_bool wait_local, TaskID current_task_id):
cdef:
WaitResultPair result
c_vector[CObjectID] wait_ids
CTaskID c_task_id = current_task_id.native()
wait_ids = ObjectIDsToVector(object_ids)
with nogil:
check_status(self.client.get().Wait(wait_ids, num_returns,
timeout_milliseconds,
wait_local,
c_task_id, &result))
return (VectorToObjectIDs(result.first),
VectorToObjectIDs(result.second))
def resource_ids(self):
cdef:
ResourceMappingType resource_mapping = (
self.client.get().GetResourceIDs())
self.client.GetResourceIDs())
unordered_map[
c_string, c_vector[pair[int64_t, double]]
].iterator iterator = resource_mapping.begin()
c_vector[pair[int64_t, double]] c_value
resources_dict = {}
while iterator != resource_mapping.end():
key = decode(dereference(iterator).first)
@@ -297,10 +305,10 @@ cdef class RayletClient:
def push_error(self, JobID job_id, error_type, error_message,
double timestamp):
check_status(self.client.get().PushError(job_id.native(),
error_type.encode("ascii"),
error_message.encode("ascii"),
timestamp))
check_status(self.client.PushError(job_id.native(),
error_type.encode("ascii"),
error_message.encode("ascii"),
timestamp))
def push_profile_events(self, component_type, UniqueID component_id,
node_ip_address, profile_data):
@@ -344,42 +352,207 @@ cdef class RayletClient:
raise ValueError(
"Unknown profile event key '%s'" % key_string)
check_status(self.client.get().PushProfileEvents(profile_info))
def free_objects(self, object_ids, c_bool local_only, c_bool delete_creating_tasks):
cdef c_vector[CObjectID] free_ids = ObjectIDsToVector(object_ids)
check_status(self.client.get().FreeObjects(free_ids, local_only, delete_creating_tasks))
check_status(self.client.PushProfileEvents(profile_info))
def prepare_actor_checkpoint(self, ActorID actor_id):
cdef CActorCheckpointID checkpoint_id
cdef CActorID c_actor_id = actor_id.native()
cdef:
CActorCheckpointID checkpoint_id
CActorID c_actor_id = actor_id.native()
# PrepareActorCheckpoint will wait for raylet's reply, release
# the GIL so other Python threads can run.
with nogil:
check_status(self.client.get().PrepareActorCheckpoint(
check_status(self.client.PrepareActorCheckpoint(
c_actor_id, checkpoint_id))
return ActorCheckpointID(checkpoint_id.Binary())
def notify_actor_resumed_from_checkpoint(self, ActorID actor_id,
ActorCheckpointID checkpoint_id):
check_status(self.client.get().NotifyActorResumedFromCheckpoint(
check_status(self.client.NotifyActorResumedFromCheckpoint(
actor_id.native(), checkpoint_id.native()))
def set_resource(self, basestring resource_name, double capacity, ClientID client_id):
self.client.get().SetResource(resource_name.encode("ascii"), capacity, CClientID.FromBinary(client_id.binary()))
@property
def language(self):
return Language.from_native(self.client.get().GetLanguage())
@property
def client_id(self):
return ClientID(self.client.get().GetWorkerID().Binary())
def set_resource(self, basestring resource_name,
double capacity, ClientID client_id):
self.client.SetResource(resource_name.encode("ascii"), capacity,
CClientID.FromBinary(client_id.binary()))
@property
def job_id(self):
return JobID(self.client.get().GetJobID().Binary())
return JobID(self.client.GetJobID().Binary())
@property
def is_worker(self):
return self.client.get().IsWorker()
return self.client.IsWorker()
cdef class CoreWorker:
    """Cython wrapper around the C++ `ray::CoreWorker`.

    Exposes the core worker's object interface (create/seal, get, contains,
    wait, delete) plus a few bookkeeping setters to Python. Calls into the
    C++ object interface are made inside `with nogil` blocks so other
    Python threads can run while they are in progress.
    """
    cdef unique_ptr[CCoreWorker] core_worker

    def __cinit__(self, is_driver, store_socket, raylet_socket,
                  JobID job_id, GcsClientOptions gcs_options, log_dir):
        self.core_worker.reset(new CCoreWorker(
            WORKER_TYPE_DRIVER if is_driver else WORKER_TYPE_WORKER,
            LANGUAGE_PYTHON, store_socket.encode("ascii"),
            raylet_socket.encode("ascii"), job_id.native(),
            gcs_options.native()[0], log_dir.encode("utf-8"), NULL, False))

        # The put_* methods below call into pyarrow, which this module only
        # "forward declares" as None; it must have been replaced by now.
        assert pyarrow is not None, ("Expected pyarrow to be imported from "
                                     "outside _raylet. See __init__.py for "
                                     "details.")

    def get_objects(self, object_ids, TaskID current_task_id):
        """Fetch objects and return a list of (data, metadata) pairs.

        Each pair holds a `Buffer` (or None) for the data and a bytes object
        (or None) for the metadata. Objects for which the core worker
        returned a null result are reported as (None, None).
        """
        cdef:
            c_vector[shared_ptr[CRayObject]] results
            CTaskID c_task_id = current_task_id.native()
            c_vector[CObjectID] c_object_ids = ObjectIDsToVector(object_ids)

        # NOTE(review): the -1 timeout presumably means "no timeout" --
        # confirm against the C++ object interface.
        with nogil:
            check_status(self.core_worker.get().Objects().Get(
                c_object_ids, -1, &results))

        data_metadata_pairs = []
        for result in results:
            # core_worker will return a nullptr for objects that couldn't be
            # retrieved from the store or if an object was an exception.
            if not result.get():
                data_metadata_pairs.append((None, None))
            else:
                data = None
                metadata = None
                if result.get().HasData():
                    data = Buffer.make(result.get().GetData())
                if result.get().HasMetadata():
                    metadata = Buffer.make(
                        result.get().GetMetadata()).to_pybytes()
                data_metadata_pairs.append((data, metadata))

        return data_metadata_pairs

    def object_exists(self, ObjectID object_id):
        """Return whether the object interface reports `object_id` present."""
        cdef:
            c_bool has_object
            CObjectID c_object_id = object_id.native()

        with nogil:
            check_status(self.core_worker.get().Objects().Contains(
                c_object_id, &has_object))

        return has_object

    def put_serialized_object(self, serialized_object, ObjectID object_id,
                              int memcopy_threads=6):
        """Create `object_id`, write `serialized_object` into it and seal it.

        The object is created without metadata. Nothing is written if the
        ObjectID already exists.
        """
        cdef:
            shared_ptr[CBuffer] data
            shared_ptr[CBuffer] metadata
            CObjectID c_object_id = object_id.native()
            size_t data_size

        data_size = serialized_object.total_bytes

        with nogil:
            check_status(self.core_worker.get().Objects().Create(
                metadata, data_size, c_object_id, &data))

        # If data is nullptr, that means the ObjectID already existed,
        # which we ignore.
        # TODO(edoakes): this is hacky, we should return the error instead
        # and deal with it here.
        if not data:
            return

        stream = pyarrow.FixedSizeBufferWriter(
            pyarrow.py_buffer(Buffer.make(data)))
        stream.set_memcopy_threads(memcopy_threads)
        serialized_object.write_to(stream)

        with nogil:
            check_status(self.core_worker.get().Objects().Seal(c_object_id))

    def put_raw_buffer(self, c_string value, ObjectID object_id,
                       int memcopy_threads=6):
        """Create `object_id`, copy the raw bytes `value` into it and seal it.

        The object is tagged with RAW_BUFFER_METADATA. Nothing is written if
        the ObjectID already exists.
        """
        cdef:
            c_string metadata_str = RAW_BUFFER_METADATA
            CObjectID c_object_id = object_id.native()
            shared_ptr[CBuffer] data
            shared_ptr[CBuffer] metadata = dynamic_pointer_cast[
                CBuffer, LocalMemoryBuffer](
                    make_shared[LocalMemoryBuffer](
                        <uint8_t*>(metadata_str.data()), metadata_str.size()))

        with nogil:
            check_status(self.core_worker.get().Objects().Create(
                metadata, value.size(), c_object_id, &data))

        # As in put_serialized_object: a null `data` means the ObjectID
        # already existed. Bail out instead of wrapping and writing through
        # a null buffer.
        if not data:
            return

        stream = pyarrow.FixedSizeBufferWriter(
            pyarrow.py_buffer(Buffer.make(data)))
        stream.set_memcopy_threads(memcopy_threads)
        stream.write(pyarrow.py_buffer(value))

        with nogil:
            check_status(self.core_worker.get().Objects().Seal(c_object_id))

    def wait(self, object_ids, int num_returns, int64_t timeout_milliseconds,
             TaskID current_task_id):
        """Wait on `object_ids` and split them into (ready, not_ready).

        Both lists contain the caller's original ObjectID objects, in the
        order they were passed in.
        """
        cdef:
            c_vector[CObjectID] wait_ids
            c_vector[c_bool] results
            CTaskID c_task_id = current_task_id.native()

        wait_ids = ObjectIDsToVector(object_ids)
        with nogil:
            check_status(self.core_worker.get().Objects().Wait(
                wait_ids, num_returns, timeout_milliseconds, &results))

        # One boolean per requested ID, in the same order as the input.
        assert len(results) == len(object_ids)

        ready, not_ready = [], []
        for i, object_id in enumerate(object_ids):
            if results[i]:
                ready.append(object_id)
            else:
                not_ready.append(object_id)

        return (ready, not_ready)

    def free_objects(self, object_ids, c_bool local_only,
                     c_bool delete_creating_tasks):
        """Delete `object_ids` through the core worker's object interface."""
        cdef:
            c_vector[CObjectID] free_ids = ObjectIDsToVector(object_ids)

        with nogil:
            check_status(self.core_worker.get().Objects().Delete(
                free_ids, local_only, delete_creating_tasks))

    def set_current_task_id(self, TaskID task_id):
        """Forward the current task ID to the C++ core worker."""
        cdef:
            CTaskID c_task_id = task_id.native()

        with nogil:
            self.core_worker.get().SetCurrentTaskId(c_task_id)

    def set_current_job_id(self, JobID job_id):
        """Forward the current job ID to the C++ core worker."""
        cdef:
            CJobID c_job_id = job_id.native()

        with nogil:
            self.core_worker.get().SetCurrentJobId(c_job_id)

    def set_object_store_client_options(self, c_string client_name,
                                        int64_t limit_bytes):
        """Pass a client name and byte limit to the object interface."""
        with nogil:
            check_status(self.core_worker.get().Objects().SetClientOptions(
                client_name, limit_bytes))

    def object_store_memory_usage_string(self):
        """Return the object interface's memory usage report as a str."""
        cdef:
            c_string message

        with nogil:
            message = self.core_worker.get().Objects().MemoryUsageString()

        return message.decode("utf-8")

    def disconnect(self):
        """Call Disconnect() on the underlying C++ core worker."""
        with nogil:
            self.core_worker.get().Disconnect()
+2 -3
View File
@@ -862,10 +862,9 @@ def exit_actor():
"""
worker = ray.worker.global_worker
if worker.mode == ray.WORKER_MODE and not worker.actor_id.is_nil():
# Disconnect the worker from the raylet. The point of
# this is so that when the worker kills itself below, the
# Intentionally disconnect the core worker from the raylet so the
# raylet won't push an error message to the driver.
worker.raylet_client.disconnect()
worker.core_worker.disconnect()
ray.disconnect()
# Disconnect global state from GCS.
ray.state.state.disconnect()
+10
View File
@@ -90,6 +90,15 @@ class RayletError(RayError):
return "The Raylet died with this message: {}".format(self.client_exc)
class ObjectStoreFullError(RayError):
    """Raised when an object cannot be stored because the store is full.

    The error signals that the attempt to store the object failed, even
    after multiple retries, because the object store was full.
    """
class UnreconstructableError(RayError):
"""Indicates that an object is lost and cannot be reconstructed.
@@ -120,5 +129,6 @@ RAY_EXCEPTION_TYPES = [
RayTaskError,
RayWorkerError,
RayActorError,
ObjectStoreFullError,
UnreconstructableError,
]
+61 -3
View File
@@ -1,6 +1,10 @@
# Note: asyncio is only compatible with Python 3
import asyncio
import functools
import threading
import pyarrow.plasma as plasma
import ray
from ray.experimental.async_plasma import PlasmaProtocol, PlasmaEventHandler
@@ -11,16 +15,70 @@ transport = None
protocol = None
class _ThreadSafeProxy(object):
    """Proxy that guards every method call on a wrapped object with a lock.

    Attribute access is forwarded to the wrapped object. Non-callable
    attributes are returned unchanged; callables are returned as wrappers
    that hold ``lock`` for the duration of each call.

    Attributes:
        orig_obj (object): the original object.
        lock (threading.Lock): the lock object.
        _wrapper_cache (dict): a cache from original object's methods to
            the proxy methods.
    """

    def __init__(self, orig_obj, lock):
        self.orig_obj = orig_obj
        self.lock = lock
        self._wrapper_cache = {}

    def __getattr__(self, attr):
        """Forward ``attr`` to the wrapped object.

        Args:
            attr (str): name of the attribute being looked up.

        Returns:
            The wrapped object's attribute if it is not callable, otherwise
            a cached wrapper that calls it while holding the lock.

        Raises:
            AttributeError: if the wrapped object has no such attribute, or
                if the proxy itself has not been initialized yet.
        """
        # __getattr__ only runs when normal lookup fails. If one of the
        # proxy's own fields is missing (e.g. on a bare instance created by
        # copy/pickle before its state is restored), forwarding would read
        # self.orig_obj again and recurse without bound. Fail cleanly.
        if attr in ("orig_obj", "lock", "_wrapper_cache"):
            raise AttributeError(attr)

        orig_attr = getattr(self.orig_obj, attr)
        if not callable(orig_attr):
            # If the original attr is a field, just return it.
            return orig_attr

        # If the original attr is a method, return a wrapper that guards
        # the original method with a lock. Wrappers are cached per name.
        wrapper = self._wrapper_cache.get(attr)
        if wrapper is None:

            @functools.wraps(orig_attr)
            def _wrapper(*args, **kwargs):
                with self.lock:
                    return orig_attr(*args, **kwargs)

            self._wrapper_cache[attr] = _wrapper
            wrapper = _wrapper
        return wrapper
def thread_safe_client(client, lock=None):
    """Wrap ``client`` in a proxy that locks around every method call.

    Args:
        client: the client object to be guarded.
        lock: the lock object used to guard the client's methods.
            If None, a new lock is created.

    Returns:
        A thread-safe proxy for the given client.
    """
    guard = threading.Lock() if lock is None else lock
    return _ThreadSafeProxy(client, guard)
async def _async_init():
global handler, transport, protocol
if handler is None:
worker = ray.worker.global_worker
plasma_client = thread_safe_client(
plasma.connect(worker.node.plasma_store_socket_name, None, 0, 300))
loop = asyncio.get_event_loop()
worker.plasma_client.subscribe()
rsock = worker.plasma_client.get_notification_socket()
plasma_client.subscribe()
rsock = plasma_client.get_notification_socket()
handler = PlasmaEventHandler(loop, worker)
transport, protocol = await loop.create_connection(
lambda: PlasmaProtocol(worker.plasma_client, handler), sock=rsock)
lambda: PlasmaProtocol(plasma_client, handler), sock=rsock)
logger.debug("AsyncPlasma Connection Created!")
+2 -1
View File
@@ -199,7 +199,8 @@ class PlasmaEventHandler:
del self._waiting_dict[fut.object_id]
def _complete_future(self, fut):
obj = self._worker.retrieve_and_deserialize([fut.object_id], 0)[0]
obj = self._worker.retrieve_and_deserialize(
[ray.ObjectID(fut.object_id.binary())], 0)[0]
fut.set_result(obj)
def as_future(self, object_id, check_ready=True):
@@ -21,18 +21,6 @@ def plasma_prefetch(object_id):
local_sched_client.fetch_or_reconstruct([ray_obj_id], True)
def plasma_get(object_id):
"""Get an object directly from plasma without going through object table.
Precondition: plasma_prefetch(object_id) has been called before.
"""
client = ray.worker.global_worker.plasma_client
plasma_id = ray.pyarrow.plasma.ObjectID(object_id)
while not client.contains(plasma_id):
pass
return client.get(plasma_id)
# TODO: doing the timer in Python land is a bit slow
class FlushThread(threading.Thread):
"""A thread that flushes periodically to plasma.
@@ -191,7 +179,8 @@ class BatchedQueue(object):
self.read_batch_offset + self.prefetch_depth):
plasma_prefetch(self._batch_id(self.prefetch_batch_offset))
self.prefetch_batch_offset += 1
self.read_buffer = plasma_get(self._batch_id(self.read_batch_offset))
self.read_buffer = ray.get(
ray.ObjectID(self._batch_id(self.read_batch_offset)))
self.read_batch_offset += 1
logger.debug("[reader] Fetched batch {} offset {} size {}".format(
self.read_batch_offset, self.read_item_offset,
+79
View File
@@ -0,0 +1,79 @@
from cpython cimport Py_buffer, PyBytes_FromStringAndSize
from libc.stdint cimport int64_t, uintptr_t
from libc.stdio cimport printf
from libcpp.memory cimport shared_ptr
from ray.includes.common cimport CBuffer
cdef class Buffer:
    """Cython wrapper class of C++ `ray::Buffer`.

    This class implements the Python 'buffer protocol', which allows
    us to use it for calls into pyarrow (and other Python libraries
    down the line) without having to copy the data.

    See https://docs.python.org/3/c-api/buffer.html for details.
    """
    cdef:
        # Shared reference to the wrapped C++ buffer.
        shared_ptr[CBuffer] buffer
        # Single-element shape/strides storage; __getbuffer__ hands out
        # pointers to these fields.
        Py_ssize_t shape
        Py_ssize_t strides

    @staticmethod
    cdef make(const shared_ptr[CBuffer]& buffer):
        # Factory: build an instance via __new__, store the shared_ptr, and
        # record the size in bytes as the shape with a stride of 1.
        cdef Buffer self = Buffer.__new__(Buffer)
        self.buffer = buffer
        self.shape = <Py_ssize_t>self.size
        self.strides = <Py_ssize_t>(1)
        return self

    def __len__(self):
        return self.size

    @property
    def size(self):
        """
        The buffer size in bytes.
        """
        return self.buffer.get().Size()

    def to_pybytes(self):
        """
        Return this buffer as a Python bytes object. Memory is copied.
        """
        return PyBytes_FromStringAndSize(
            <const char*>self.buffer.get().Data(),
            self.buffer.get().Size())

    def __getbuffer__(self, Py_buffer* buffer, int flags):
        # Describe the wrapped memory as a writable, one-dimensional array
        # of one-byte items. `flags` is not consulted.
        buffer.readonly = 0
        buffer.buf = <char *>self.buffer.get().Data()
        buffer.format = 'b'
        buffer.internal = NULL
        buffer.itemsize = 1
        buffer.len = self.size
        buffer.ndim = 1
        buffer.obj = self
        buffer.shape = &self.shape
        buffer.strides = &self.strides
        buffer.suboffsets = NULL

    # The three methods below expose the data as a single segment (index 0)
    # through the legacy segment-based buffer hooks.
    def __getsegcount__(self, Py_ssize_t *len_out):
        if len_out != NULL:
            len_out[0] = <Py_ssize_t>self.size
        return 1

    def __getreadbuffer__(self, Py_ssize_t idx, void **p):
        if idx != 0:
            raise SystemError("accessing non-existent buffer segment")
        if p != NULL:
            p[0] = <void*> self.buffer.get().Data()
        return self.size

    def __getwritebuffer__(self, Py_ssize_t idx, void **p):
        if idx != 0:
            raise SystemError("accessing non-existent buffer segment")
        if p != NULL:
            p[0] = <void*> self.buffer.get().Data()
        return self.size
+40 -7
View File
@@ -1,7 +1,8 @@
from libcpp.string cimport string as c_string
from libcpp cimport bool as c_bool
from libcpp.memory cimport shared_ptr
from libcpp.string cimport string as c_string
from libc.stdint cimport int64_t
from libc.stdint cimport uint8_t
from libcpp.unordered_map cimport unordered_map
from libcpp.vector cimport vector as c_vector
@@ -49,6 +50,9 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil:
@staticmethod
CRayStatus RedisError()
@staticmethod
CRayStatus ObjectStoreFull()
c_bool ok()
c_bool IsOutOfMemory()
c_bool IsKeyError()
@@ -58,6 +62,7 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil:
c_bool IsUnknownError()
c_bool IsNotImplemented()
c_bool IsRedisError()
c_bool IsObjectStoreFull()
c_string ToString()
c_string CodeAsString()
@@ -90,19 +95,24 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
cdef extern from "ray/protobuf/common.pb.h" nogil:
cdef cppclass CLanguage "Language":
pass
cdef cppclass CWorkerType "ray::WorkerType":
pass
# This is a workaround for C++ enum class since Cython has no corresponding
# representation.
cdef extern from "ray/protobuf/common.pb.h" namespace "Language" nogil:
cdef extern from "ray/protobuf/common.pb.h" nogil:
cdef CLanguage LANGUAGE_PYTHON "Language::PYTHON"
cdef CLanguage LANGUAGE_CPP "Language::CPP"
cdef CLanguage LANGUAGE_JAVA "Language::JAVA"
cdef extern from "ray/protobuf/common.pb.h" nogil:
cdef CWorkerType WORKER_TYPE_WORKER "ray::WorkerType::WORKER"
cdef CWorkerType WORKER_TYPE_DRIVER "ray::WorkerType::DRIVER"
cdef extern from "ray/common/task/scheduling_resources.h" \
namespace "ray" nogil:
cdef cppclass ResourceSet "ResourceSet":
cdef extern from "ray/common/task/scheduling_resources.h" nogil:
cdef cppclass ResourceSet "ray::ResourceSet":
ResourceSet()
ResourceSet(const unordered_map[c_string, double] &resource_map)
ResourceSet(const c_vector[c_string] &resource_labels,
@@ -111,7 +121,8 @@ cdef extern from "ray/common/task/scheduling_resources.h" \
c_bool IsEqual(const ResourceSet &other) const
c_bool IsSubset(const ResourceSet &other) const
c_bool IsSuperset(const ResourceSet &other) const
c_bool AddOrUpdateResource(const c_string &resource_name, double capacity)
c_bool AddOrUpdateResource(const c_string &resource_name,
double capacity)
c_bool RemoveResource(const c_string &resource_name)
void AddResources(const ResourceSet &other)
c_bool SubtractResourcesStrict(const ResourceSet &other)
@@ -120,3 +131,25 @@ cdef extern from "ray/common/task/scheduling_resources.h" \
c_bool IsEmpty() const
const unordered_map[c_string, double] &GetResourceMap() const
const c_string ToString() const
cdef extern from "ray/common/buffer.h" namespace "ray" nogil:
cdef cppclass CBuffer "ray::Buffer":
uint8_t *Data() const
size_t Size() const
cdef cppclass LocalMemoryBuffer(CBuffer):
LocalMemoryBuffer(uint8_t *data, size_t size)
cdef extern from "ray/core_worker/store_provider/store_provider.h" nogil:
cdef cppclass CRayObject "ray::RayObject":
const shared_ptr[CBuffer] &GetData()
const size_t DataSize() const
const shared_ptr[CBuffer] &GetMetadata() const
c_bool HasData() const
c_bool HasMetadata() const
cdef extern from "ray/gcs/gcs_client_interface.h" nogil:
cdef cppclass CGcsClientOptions "ray::gcs::GcsClientOptions":
CGcsClientOptions(const c_string &ip, int port,
const c_string &password,
c_bool is_test_client)
+23
View File
@@ -0,0 +1,23 @@
from libcpp cimport bool as c_bool
from libcpp.string cimport string as c_string
from ray.includes.common cimport CGcsClientOptions
cdef class GcsClientOptions:
    """Owns a C++ `ray::gcs::GcsClientOptions` built from Python values."""
    cdef:
        unique_ptr[CGcsClientOptions] gcs_client_options

    def __init__(self, redis_ip, int redis_port,
                 redis_password, c_bool is_test_client=False):
        # A missing or empty password is passed on as the empty string.
        password = redis_password or ""
        ip_bytes = redis_ip.encode("ascii")
        password_bytes = password.encode("ascii")
        self.gcs_client_options.reset(
            new CGcsClientOptions(ip_bytes, redis_port,
                                  password_bytes, is_test_client))

    cdef CGcsClientOptions* native(self):
        # Raw pointer to the owned options object; ownership stays here.
        return <CGcsClientOptions*>(self.gcs_client_options.get())
+62
View File
@@ -0,0 +1,62 @@
from libc.stdint cimport int64_t
from libcpp cimport bool as c_bool
from libcpp.memory cimport shared_ptr
from libcpp.string cimport string as c_string
from libcpp.vector cimport vector as c_vector
from ray.includes.unique_ids cimport (
CJobID,
CTaskID,
CObjectID,
)
from ray.includes.common cimport (
CBuffer,
CRayStatus,
CRayObject,
CWorkerType,
CLanguage,
CGcsClientOptions,
)
from ray.includes.libraylet cimport CRayletClient
cdef extern from "ray/core_worker/object_interface.h" nogil:
cdef cppclass CObjectInterface "ray::CoreWorkerObjectInterface":
CRayStatus SetClientOptions(c_string client_name, int64_t limit)
CRayStatus Put(const CRayObject &object, CObjectID *object_id)
CRayStatus Put(const CRayObject &object, const CObjectID &object_id)
CRayStatus Create(const shared_ptr[CBuffer] &metadata,
const size_t data_size, const CObjectID &object_id,
shared_ptr[CBuffer] *data)
CRayStatus Seal(const CObjectID &object_id)
CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms,
c_vector[shared_ptr[CRayObject]] *results)
CRayStatus Contains(const CObjectID &object_id, c_bool *has_object)
CRayStatus Wait(const c_vector[CObjectID] &object_ids, int num_objects,
int64_t timeout_ms, c_vector[c_bool] *results)
CRayStatus Delete(const c_vector[CObjectID] &object_ids,
c_bool local_only, c_bool delete_creating_tasks)
c_string MemoryUsageString()
cdef extern from "ray/core_worker/core_worker.h" nogil:
cdef cppclass CCoreWorker "ray::CoreWorker":
CCoreWorker(const CWorkerType worker_type, const CLanguage language,
const c_string &store_socket,
const c_string &raylet_socket, const CJobID &job_id,
const CGcsClientOptions &gcs_options,
const c_string log_dir, void* execution_callback,
c_bool use_memory_store_)
void Disconnect()
CWorkerType &GetWorkerType()
CLanguage &GetLanguage()
CObjectInterface &Objects()
# CTaskSubmissionInterface &Tasks()
# CTaskExecutionInterface &Execution()
# TODO(edoakes): remove this once the raylet client is no longer used
# directly.
CRayletClient &GetRayletClient()
# TODO(edoakes): remove this once the Python core worker uses the task
# interfaces
void SetCurrentJobId(const CJobID &job_id)
void SetCurrentTaskId(const CTaskID &task_id)
+3 -1
View File
@@ -71,7 +71,9 @@ cdef extern from "ray/raylet/raylet_client.h" nogil:
CActorCheckpointID &checkpoint_id)
CRayStatus NotifyActorResumedFromCheckpoint(
const CActorID &actor_id, const CActorCheckpointID &checkpoint_id)
CRayStatus SetResource(const c_string &resource_name, const double capacity, const CClientID &client_Id)
CRayStatus SetResource(const c_string &resource_name,
const double capacity,
const CClientID &client_Id)
CLanguage GetLanguage() const
CWorkerID GetWorkerID() const
CJobID GetJobID() const
+14 -12
View File
@@ -17,7 +17,7 @@ from ray.includes.unique_ids cimport (
CTaskID,
)
cdef extern from "ray/protobuf/common.pb.h" namespace "ray::rpc" nogil:
cdef extern from "ray/protobuf/common.pb.h" nogil:
cdef cppclass RpcTaskSpec "ray::rpc::TaskSpec":
void CopyFrom(const RpcTaskSpec &value)
@@ -29,13 +29,13 @@ cdef extern from "ray/protobuf/common.pb.h" namespace "ray::rpc" nogil:
RpcTaskSpec *mutable_task_spec()
cdef extern from "ray/protobuf/gcs.pb.h" namespace "ray::rpc" nogil:
cdef extern from "ray/protobuf/gcs.pb.h" nogil:
cdef cppclass TaskTableData "ray::rpc::TaskTableData":
RpcTask *mutable_task()
const c_string &SerializeAsString()
cdef extern from "ray/common/task/task_spec.h" namespace "ray" nogil:
cdef extern from "ray/common/task/task_spec.h" nogil:
cdef cppclass CTaskSpec "ray::TaskSpecification":
CTaskSpec(const RpcTaskSpec message)
CTaskSpec(const c_string &serialized_binary)
@@ -77,18 +77,20 @@ cdef extern from "ray/common/task/task_spec.h" namespace "ray" nogil:
c_vector[CActorHandleID] NewActorHandles() const
cdef extern from "ray/common/task/task_util.h" namespace "ray" nogil:
cdef extern from "ray/common/task/task_util.h" nogil:
cdef cppclass TaskSpecBuilder "ray::TaskSpecBuilder":
TaskSpecBuilder &SetCommonTaskSpec(
const CTaskID &task_id, const CLanguage &language,
const c_vector[c_string] &function_descriptor, const CJobID &job_id,
const CTaskID &parent_task_id, uint64_t parent_counter,
uint64_t num_returns, const unordered_map[c_string, double] &required_resources,
const unordered_map[c_string, double] &required_placement_resources)
const c_vector[c_string] &function_descriptor,
const CJobID &job_id, const CTaskID &parent_task_id,
uint64_t parent_counter, uint64_t num_returns,
const unordered_map[c_string, double] &required_resources,
const unordered_map[c_string, double] &required_placement_resources) # noqa: E501
TaskSpecBuilder &AddByRefArg(const CObjectID &arg_id)
TaskSpecBuilder &AddByValueArg(const c_string &data, const c_string &metadata)
TaskSpecBuilder &AddByValueArg(const c_string &data,
const c_string &metadata)
TaskSpecBuilder &SetActorCreationTaskSpec(
const CActorID &actor_id, uint64_t max_reconstructions,
@@ -100,12 +102,12 @@ cdef extern from "ray/common/task/task_util.h" namespace "ray" nogil:
const CObjectID &actor_creation_dummy_object_id,
const CObjectID &previous_actor_task_dummy_object_id,
uint64_t actor_counter,
const c_vector[CActorHandleID] &new_handle_ids);
const c_vector[CActorHandleID] &new_handle_ids)
RpcTaskSpec GetMessage()
cdef extern from "ray/common/task/task_execution_spec.h" namespace "ray" nogil:
cdef extern from "ray/common/task/task_execution_spec.h" nogil:
cdef cppclass CTaskExecutionSpec "ray::TaskExecutionSpecification":
CTaskExecutionSpec(RpcTaskExecutionSpec message)
CTaskExecutionSpec(const c_string &serialized_binary)
@@ -113,6 +115,6 @@ cdef extern from "ray/common/task/task_execution_spec.h" namespace "ray" nogil:
c_vector[CObjectID] ExecutionDependencies()
uint64_t NumForwards()
cdef extern from "ray/common/task/task.h" namespace "ray" nogil:
cdef extern from "ray/common/task/task.h" nogil:
cdef cppclass CTask "ray::Task":
CTask(CTaskSpec task_spec, CTaskExecutionSpec task_execution_spec)
+3 -8
View File
@@ -1,9 +1,3 @@
from libc.stdint cimport uint8_t
from libcpp.memory cimport (
make_shared,
shared_ptr,
static_pointer_cast,
)
from ray.includes.task cimport (
CTask,
CTaskExecutionSpec,
@@ -184,8 +178,9 @@ cdef class TaskSpec:
arg_list.append(
ObjectID(task_spec.ArgId(i, 0).Binary()))
else:
data = (task_spec.ArgData(i)[:task_spec.ArgDataSize(i)])
metadata = (task_spec.ArgMetadata(i)[:task_spec.ArgMetadataSize(i)])
data = task_spec.ArgData(i)[:task_spec.ArgDataSize(i)]
metadata = task_spec.ArgMetadata(i)[
:task_spec.ArgMetadataSize(i)]
if metadata == RAW_BUFFER_METADATA:
obj = data
else:
+12 -9
View File
@@ -20,10 +20,10 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
c_bool IsNil() const
c_bool operator==(const CBaseID &rhs) const
c_bool operator!=(const CBaseID &rhs) const
const uint8_t *data() const;
const uint8_t *data() const
c_string Binary() const;
c_string Hex() const;
c_string Binary() const
c_string Hex() const
cdef cppclass CUniqueID "ray::UniqueID"(CBaseID):
CUniqueID()
@@ -65,8 +65,8 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
size_t Size()
@staticmethod
CActorID Of(CJobID job_id, CTaskID parent_task_id, int64_t parent_task_counter)
CActorID Of(CJobID job_id, CTaskID parent_task_id,
int64_t parent_task_counter)
cdef cppclass CActorHandleID "ray::ActorHandleID"(CUniqueID):
@@ -123,10 +123,12 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
CTaskID ForActorCreationTask(CActorID actor_id)
@staticmethod
CTaskID ForActorTask(CJobID job_id, CTaskID parent_task_id, int64_t parent_task_counter, CActorID actor_id)
CTaskID ForActorTask(CJobID job_id, CTaskID parent_task_id,
int64_t parent_task_counter, CActorID actor_id)
@staticmethod
CTaskID ForNormalTask(CJobID job_id, CTaskID parent_task_id, int64_t parent_task_counter)
CTaskID ForNormalTask(CJobID job_id, CTaskID parent_task_id,
int64_t parent_task_counter)
cdef cppclass CObjectID" ray::ObjectID"(CBaseID[CObjectID]):
@@ -140,10 +142,11 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
const CObjectID Nil()
@staticmethod
CObjectID ForPut(const CTaskID &task_id, int64_t index, int64_t transport_type);
CObjectID ForPut(const CTaskID &task_id, int64_t index,
int64_t transport_type)
@staticmethod
CObjectID ForTaskReturn(const CTaskID &task_id, int64_t index);
CObjectID ForTaskReturn(const CTaskID &task_id, int64_t index)
@staticmethod
size_t Size()
+8 -10
View File
@@ -4,12 +4,11 @@ from __future__ import print_function
import ray.worker
from ray import profiling
import pyarrow
__all__ = ["free", "pin_object_data"]
def pin_object_data(obj_id):
def pin_object_data(object_id):
"""Pin the object data referenced by this object id in memory.
The object data cannot be evicted while there exists a Python reference to
@@ -25,14 +24,13 @@ def pin_object_data(obj_id):
Note that ray will automatically do this for objects created with
ray.put() already, unless you ray.put with weakref=True.
"""
worker = ray.worker.get_global_worker()
ray.get(obj_id)
obj_id.set_buffer_ref(
ray.worker.global_worker.plasma_client.get_buffers(
[pyarrow.plasma.ObjectID(obj_id.binary())]))
object_id.set_buffer_ref(
worker.core_worker.get_objects([object_id], worker.current_task_id))
def unpin_object_data(obj_id):
def unpin_object_data(object_id):
"""Unpin an object pinned by pin_object_id.
Examples:
@@ -41,7 +39,7 @@ def unpin_object_data(obj_id):
>>> unpin_object_id(x_id) # as if the pin didn't happen
"""
obj_id.set_buffer_ref(None)
object_id.set_buffer_ref(None)
def free(object_ids, local_only=False, delete_creating_tasks=False):
@@ -94,5 +92,5 @@ def free(object_ids, local_only=False, delete_creating_tasks=False):
if len(object_ids) == 0:
return
worker.raylet_client.free_objects(object_ids, local_only,
delete_creating_tasks)
worker.core_worker.free_objects(object_ids, local_only,
delete_creating_tasks)
+1 -1
View File
@@ -83,7 +83,7 @@ class LocalModeManager(object):
object_id.value = value
return object_id
def get_object(self, object_ids):
def get_objects(self, object_ids):
"""Fetch objects from the emulated object store.
Accepts only LocalModeObjectIDs and reads values directly from them.
+6
View File
@@ -230,6 +230,12 @@ class Node(object):
"""Get the node's plasma store socket name."""
return self._plasma_store_socket_name
@property
def unique_id(self):
    """Get a unique identifier for this node.

    The identifier is the node's IP address and its plasma store socket
    name, joined by a colon.
    """
    ip_address = self.node_ip_address
    socket_name = self._plasma_store_socket_name
    return "{}:{}".format(ip_address, socket_name)
@property
def webui_url(self):
"""Get the cluster's web UI url."""
+42 -52
View File
@@ -15,7 +15,6 @@ except ImportError:
import signal
import sys
import time
from pyarrow import plasma
import ray
import ray.ray_constants as ray_constants
@@ -41,8 +40,8 @@ def ray_checkpointable_actor_cls(request):
self.resumed_from_checkpoint = False
self.checkpoint_dir = checkpoint_dir
def local_plasma(self):
return ray.worker.global_worker.plasma_client.store_socket_name
def node_id(self):
return ray.worker.global_worker.node.unique_id
def increase(self):
self.value += 1
@@ -904,7 +903,7 @@ def test_actor_load_balancing(ray_start_cluster):
pass
def get_location(self):
return ray.worker.global_worker.plasma_client.store_socket_name
return ray.worker.global_worker.node.unique_id
# Create a bunch of actors.
num_actors = 30
@@ -972,7 +971,7 @@ def test_actor_gpus(ray_start_cluster):
def get_location_and_ids(self):
assert ray.get_gpu_ids() == self.gpu_ids
return (ray.worker.global_worker.plasma_client.store_socket_name,
return (ray.worker.global_worker.node.unique_id,
tuple(self.gpu_ids))
# Create one actor per GPU.
@@ -1011,7 +1010,7 @@ def test_actor_multiple_gpus(ray_start_cluster):
def get_location_and_ids(self):
assert ray.get_gpu_ids() == self.gpu_ids
return (ray.worker.global_worker.plasma_client.store_socket_name,
return (ray.worker.global_worker.node.unique_id,
tuple(self.gpu_ids))
# Create some actors.
@@ -1042,7 +1041,7 @@ def test_actor_multiple_gpus(ray_start_cluster):
self.gpu_ids = ray.get_gpu_ids()
def get_location_and_ids(self):
return (ray.worker.global_worker.plasma_client.store_socket_name,
return (ray.worker.global_worker.node.unique_id,
tuple(self.gpu_ids))
# Create some actors.
@@ -1080,7 +1079,7 @@ def test_actor_different_numbers_of_gpus(ray_start_cluster):
self.gpu_ids = ray.get_gpu_ids()
def get_location_and_ids(self):
return (ray.worker.global_worker.plasma_client.store_socket_name,
return (ray.worker.global_worker.node.unique_id,
tuple(self.gpu_ids))
# Create some actors.
@@ -1126,8 +1125,7 @@ def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster):
self.gpu_ids = ray.get_gpu_ids()
def get_location_and_ids(self):
return ((
ray.worker.global_worker.plasma_client.store_socket_name),
return ((ray.worker.global_worker.node.unique_id),
tuple(self.gpu_ids))
def sleep(self):
@@ -1173,7 +1171,7 @@ def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster):
self.gpu_ids = ray.get_gpu_ids()
def get_location_and_ids(self):
return (ray.worker.global_worker.plasma_client.store_socket_name,
return (ray.worker.global_worker.node.unique_id,
tuple(self.gpu_ids))
# All the GPUs should be used up now.
@@ -1217,8 +1215,8 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster):
gpu_ids = ray.get_gpu_ids()
assert len(gpu_ids) == 1
assert gpu_ids[0] in range(num_gpus_per_raylet)
return (ray.worker.global_worker.plasma_client.store_socket_name,
tuple(gpu_ids), [t1, t2])
return (ray.worker.global_worker.node.unique_id, tuple(gpu_ids),
[t1, t2])
@ray.remote(num_gpus=2)
def f2():
@@ -1229,8 +1227,8 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster):
assert len(gpu_ids) == 2
assert gpu_ids[0] in range(num_gpus_per_raylet)
assert gpu_ids[1] in range(num_gpus_per_raylet)
return (ray.worker.global_worker.plasma_client.store_socket_name,
tuple(gpu_ids), [t1, t2])
return (ray.worker.global_worker.node.unique_id, tuple(gpu_ids),
[t1, t2])
@ray.remote(num_gpus=1)
class Actor1(object):
@@ -1241,7 +1239,7 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster):
def get_location_and_ids(self):
assert ray.get_gpu_ids() == self.gpu_ids
return (ray.worker.global_worker.plasma_client.store_socket_name,
return (ray.worker.global_worker.node.unique_id,
tuple(self.gpu_ids))
def locations_to_intervals_for_many_tasks():
@@ -1416,8 +1414,8 @@ def test_exception_raised_when_actor_node_dies(ray_start_cluster_head):
def __init__(self):
self.x = 0
def local_plasma(self):
return ray.worker.global_worker.plasma_client.store_socket_name
def node_id(self):
return ray.worker.global_worker.node.unique_id
def inc(self):
self.x += 1
@@ -1425,8 +1423,7 @@ def test_exception_raised_when_actor_node_dies(ray_start_cluster_head):
# Create an actor that is not on the raylet.
actor = Counter.remote()
while (ray.get(actor.local_plasma.remote()) !=
remote_node.plasma_store_socket_name):
while (ray.get(actor.node_id.remote()) != remote_node.unique_id):
actor = Counter.remote()
# Kill the second node.
@@ -1526,8 +1523,8 @@ def setup_counter_actor(test_checkpoint=False,
self.save_exception = save_exception
self.restored = False
def local_plasma(self):
return ray.worker.global_worker.plasma_client.store_socket_name
def node_id(self):
return ray.worker.global_worker.node.unique_id
def inc(self, *xs):
self.x += 1
@@ -1554,11 +1551,11 @@ def setup_counter_actor(test_checkpoint=False,
self.num_inc_calls = 0
self.restored = True
local_plasma = ray.worker.global_worker.plasma_client.store_socket_name
node_id = ray.worker.global_worker.node.unique_id
# Create an actor that is not on the raylet.
actor = Counter.remote(save_exception)
while ray.get(actor.local_plasma.remote()) == local_plasma:
while ray.get(actor.node_id.remote()) == node_id:
actor = Counter.remote(save_exception)
args = [ray.put(0) for _ in range(100)]
@@ -1689,8 +1686,8 @@ def _test_nondeterministic_reconstruction(
def __init__(self):
self.queue = []
def local_plasma(self):
return ray.worker.global_worker.plasma_client.store_socket_name
def node_id(self):
return ray.worker.global_worker.node.unique_id
def push(self, item):
self.queue.append(item)
@@ -1699,9 +1696,9 @@ def _test_nondeterministic_reconstruction(
return self.queue
# Schedule the shared queue onto the remote raylet.
local_plasma = ray.worker.global_worker.plasma_client.store_socket_name
node_id = ray.worker.global_worker.node.unique_id
actor = Queue.remote()
while ray.get(actor.local_plasma.remote()) == local_plasma:
while ray.get(actor.node_id.remote()) == node_id:
actor = Queue.remote()
# A task that takes in the shared queue and a list of items to enqueue,
@@ -2066,14 +2063,14 @@ def test_custom_label_placement(ray_start_cluster):
@ray.remote(resources={"CustomResource1": 1})
class ResourceActor1(object):
def get_location(self):
return ray.worker.global_worker.plasma_client.store_socket_name
return ray.worker.global_worker.node.unique_id
@ray.remote(resources={"CustomResource2": 1})
class ResourceActor2(object):
def get_location(self):
return ray.worker.global_worker.plasma_client.store_socket_name
return ray.worker.global_worker.node.unique_id
local_plasma = ray.worker.global_worker.plasma_client.store_socket_name
node_id = ray.worker.global_worker.node.unique_id
# Create some actors.
actors1 = [ResourceActor1.remote() for _ in range(2)]
@@ -2081,9 +2078,9 @@ def test_custom_label_placement(ray_start_cluster):
locations1 = ray.get([a.get_location.remote() for a in actors1])
locations2 = ray.get([a.get_location.remote() for a in actors2])
for location in locations1:
assert location == local_plasma
assert location == node_id
for location in locations2:
assert location != local_plasma
assert location != node_id
def test_creating_more_actors_than_resources(shutdown_only):
@@ -2225,21 +2222,15 @@ def test_actor_reconstruction(ray_start_regular):
def test_actor_reconstruction_without_task(ray_start_regular):
"""Test a dead actor can be reconstructed without sending task to it."""
def object_exists(obj_id):
    """Check whether an object exists in plasma store.

    Args:
        obj_id: The ID to look up; its `binary()` form is used to build
            the plasma object ID.

    Returns:
        True if the plasma client's `get` (called with `timeout_ms=0`)
        returned something other than `plasma.ObjectNotAvailable`.
    """
    plasma_client = ray.worker.global_worker.plasma_client
    plasma_id = plasma.ObjectID(obj_id.binary())
    return plasma_client.get(
        plasma_id, timeout_ms=0) != plasma.ObjectNotAvailable
@ray.remote(max_reconstructions=1)
class ReconstructableActor(object):
def __init__(self, obj_ids):
for obj_id in obj_ids:
# Every time the actor gets constructed,
# put a new object in plasma store.
if not object_exists(obj_id):
ray.worker.global_worker.put_object(obj_id, 1)
global_worker = ray.worker.global_worker
if not global_worker.core_worker.object_exists(obj_id):
global_worker.put_object(obj_id, 1)
break
def get_pid(self):
@@ -2252,7 +2243,8 @@ def test_actor_reconstruction_without_task(ray_start_regular):
os.kill(pid, signal.SIGKILL)
# Wait until the actor is reconstructed.
assert wait_for_condition(
lambda: object_exists(obj_ids[1]), timeout_ms=5000)
lambda: ray.worker.global_worker.core_worker.object_exists(obj_ids[1]),
timeout_ms=5000)
def test_actor_reconstruction_on_node_failure(ray_start_cluster_head):
@@ -2271,10 +2263,10 @@ def test_actor_reconstruction_on_node_failure(ray_start_cluster_head):
}),
)
def kill_node(object_store_socket):
def kill_node(node_id):
node_to_remove = None
for node in cluster.worker_nodes:
if object_store_socket == node.plasma_store_socket_name:
if node_id == node.unique_id:
node_to_remove = node
cluster.remove_node(node_to_remove)
@@ -2288,7 +2280,7 @@ def test_actor_reconstruction_on_node_failure(ray_start_cluster_head):
return self.value
def get_object_store_socket(self):
return ray.worker.global_worker.plasma_client.store_socket_name
return ray.worker.global_worker.node.unique_id
actor = MyActor.remote()
# Call increase 3 times.
@@ -2481,8 +2473,7 @@ def test_checkpointing_on_node_failure(ray_start_cluster_2_nodes,
remote_node = [node for node in cluster.worker_nodes]
actor_cls = ray.remote(max_reconstructions=1)(ray_checkpointable_actor_cls)
actor = actor_cls.remote()
while (ray.get(actor.local_plasma.remote()) !=
remote_node[0].plasma_store_socket_name):
while (ray.get(actor.node_id.remote()) != remote_node[0].unique_id):
actor = actor_cls.remote()
# Call increase several times.
@@ -2725,8 +2716,8 @@ def test_ray_wait_dead_actor(ray_start_cluster):
def __init__(self):
pass
def local_plasma(self):
return ray.worker.global_worker.plasma_client.store_socket_name
def node_id(self):
return ray.worker.global_worker.node.unique_id
def ping(self):
time.sleep(1)
@@ -2743,8 +2734,7 @@ def test_ray_wait_dead_actor(ray_start_cluster):
remote_node = cluster.list_all_nodes()[-1]
remote_ping_id = None
for i, actor in enumerate(actors):
if ray.get(actor.local_plasma.remote()
) == remote_node.plasma_store_socket_name:
if ray.get(actor.node_id.remote()) == remote_node.unique_id:
remote_ping_id = ping_ids[i]
ray.internal.free([remote_ping_id], local_only=True)
cluster.remove_node(remote_node)
+26 -27
View File
@@ -1538,7 +1538,7 @@ def test_free_objects_multi_node(ray_start_cluster):
class RawActor(object):
def get(self):
return ray.worker.global_worker.plasma_client.store_socket_name
return ray.worker.global_worker.node.unique_id
ActorOnNode0 = ray.remote(resources={"Custom0": 1})(RawActor)
ActorOnNode1 = ray.remote(resources={"Custom1": 1})(RawActor)
@@ -1585,7 +1585,7 @@ def test_free_objects_multi_node(ray_start_cluster):
assert len(l1) == 2
assert len(l2) == 1
# The deleted object will have the same store as the driver.
local_return = ray.worker.global_worker.plasma_client.store_socket_name
local_return = ray.worker.global_worker.node.unique_id
for object_id in l1:
assert ray.get(object_id) != local_return
@@ -1998,16 +1998,16 @@ def test_zero_cpus_actor(ray_start_cluster):
cluster.add_node(num_cpus=2)
ray.init(address=cluster.address)
local_plasma = ray.worker.global_worker.plasma_client.store_socket_name
node_id = ray.worker.global_worker.node.unique_id
@ray.remote
class Foo(object):
def method(self):
return ray.worker.global_worker.plasma_client.store_socket_name
return ray.worker.global_worker.node.unique_id
# Make sure tasks and actors run on the remote raylet.
a = Foo.remote()
assert ray.get(a.method.remote()) != local_plasma
assert ray.get(a.method.remote()) != node_id
def test_fractional_resources(shutdown_only):
@@ -2080,32 +2080,32 @@ def test_multiple_raylets(ray_start_cluster):
# This must be run on the zeroth raylet.
@ray.remote(num_cpus=11)
def run_on_0():
return ray.worker.global_worker.plasma_client.store_socket_name
return ray.worker.global_worker.node.plasma_store_socket_name
# This must be run on the first raylet.
@ray.remote(num_gpus=2)
def run_on_1():
return ray.worker.global_worker.plasma_client.store_socket_name
return ray.worker.global_worker.node.plasma_store_socket_name
# This must be run on the second raylet.
@ray.remote(num_cpus=6, num_gpus=1)
def run_on_2():
return ray.worker.global_worker.plasma_client.store_socket_name
return ray.worker.global_worker.node.plasma_store_socket_name
# This can be run anywhere.
@ray.remote(num_cpus=0, num_gpus=0)
def run_on_0_1_2():
return ray.worker.global_worker.plasma_client.store_socket_name
return ray.worker.global_worker.node.plasma_store_socket_name
# This must be run on the first or second raylet.
@ray.remote(num_gpus=1)
def run_on_1_2():
return ray.worker.global_worker.plasma_client.store_socket_name
return ray.worker.global_worker.node.plasma_store_socket_name
# This must be run on the zeroth or second raylet.
@ray.remote(num_cpus=8)
def run_on_0_2():
return ray.worker.global_worker.plasma_client.store_socket_name
return ray.worker.global_worker.node.plasma_store_socket_name
def run_lots_of_tasks():
names = []
@@ -2196,27 +2196,27 @@ def test_custom_resources(ray_start_cluster):
@ray.remote
def f():
time.sleep(0.001)
return ray.worker.global_worker.plasma_client.store_socket_name
return ray.worker.global_worker.node.unique_id
@ray.remote(resources={"CustomResource": 1})
def g():
time.sleep(0.001)
return ray.worker.global_worker.plasma_client.store_socket_name
return ray.worker.global_worker.node.unique_id
@ray.remote(resources={"CustomResource": 1})
def h():
ray.get([f.remote() for _ in range(5)])
return ray.worker.global_worker.plasma_client.store_socket_name
return ray.worker.global_worker.node.unique_id
# The f tasks should be scheduled on both raylets.
assert len(set(ray.get([f.remote() for _ in range(50)]))) == 2
local_plasma = ray.worker.global_worker.plasma_client.store_socket_name
node_id = ray.worker.global_worker.node.unique_id
# The g tasks should be scheduled only on the second raylet.
raylet_ids = set(ray.get([g.remote() for _ in range(50)]))
assert len(raylet_ids) == 1
assert list(raylet_ids)[0] != local_plasma
assert list(raylet_ids)[0] != node_id
# Make sure that resource bookkeeping works when a task that uses a
# custom resources gets blocked.
@@ -2240,38 +2240,38 @@ def test_two_custom_resources(ray_start_cluster):
@ray.remote(resources={"CustomResource1": 1})
def f():
time.sleep(0.001)
return ray.worker.global_worker.plasma_client.store_socket_name
return ray.worker.global_worker.node.unique_id
@ray.remote(resources={"CustomResource2": 1})
def g():
time.sleep(0.001)
return ray.worker.global_worker.plasma_client.store_socket_name
return ray.worker.global_worker.node.unique_id
@ray.remote(resources={"CustomResource1": 1, "CustomResource2": 3})
def h():
time.sleep(0.001)
return ray.worker.global_worker.plasma_client.store_socket_name
return ray.worker.global_worker.node.unique_id
@ray.remote(resources={"CustomResource1": 4})
def j():
time.sleep(0.001)
return ray.worker.global_worker.plasma_client.store_socket_name
return ray.worker.global_worker.node.unique_id
@ray.remote(resources={"CustomResource3": 1})
def k():
time.sleep(0.001)
return ray.worker.global_worker.plasma_client.store_socket_name
return ray.worker.global_worker.node.unique_id
# The f and g tasks should be scheduled on both raylets.
assert len(set(ray.get([f.remote() for _ in range(50)]))) == 2
assert len(set(ray.get([g.remote() for _ in range(50)]))) == 2
local_plasma = ray.worker.global_worker.plasma_client.store_socket_name
node_id = ray.worker.global_worker.node.unique_id
# The h tasks should be scheduled only on the second raylet.
raylet_ids = set(ray.get([h.remote() for _ in range(50)]))
assert len(raylet_ids) == 1
assert list(raylet_ids)[0] != local_plasma
assert list(raylet_ids)[0] != node_id
# Make sure that tasks with unsatisfied custom resource requirements do
# not get scheduled.
@@ -2473,7 +2473,7 @@ def test_load_balancing(ray_start_cluster):
@ray.remote
def f():
time.sleep(0.01)
return ray.worker.global_worker.plasma_client.store_socket_name
return ray.worker.global_worker.node.unique_id
attempt_to_load_balance(f, [], 100, num_nodes, 10)
attempt_to_load_balance(f, [], 1000, num_nodes, 100)
@@ -2491,7 +2491,7 @@ def test_load_balancing_with_dependencies(ray_start_cluster):
@ray.remote
def f(x):
time.sleep(0.010)
return ray.worker.global_worker.plasma_client.store_socket_name
return ray.worker.global_worker.node.unique_id
# This object will be local to one of the raylets. Make sure
# this doesn't prevent tasks from being scheduled on other raylets.
@@ -2820,8 +2820,7 @@ def test_wait_reconstruction(shutdown_only):
x_id = f.remote()
ray.wait([x_id])
ray.wait([f.remote()])
assert not ray.worker.global_worker.plasma_client.contains(
ray.pyarrow.plasma.ObjectID(x_id.binary()))
assert not ray.worker.global_worker.core_worker.object_exists(x_id)
ready_ids, _ = ray.wait([x_id])
assert len(ready_ids) == 1
+2 -3
View File
@@ -4,7 +4,6 @@ from __future__ import print_function
import json
import os
import pyarrow.plasma as plasma
import pytest
import sys
import tempfile
@@ -767,7 +766,7 @@ def test_parallel_actor_fill_plasma_retry(ray_start_cluster_head, num_actors):
"object_store_memory": 10**8
}],
indirect=True)
def test_fill_plasma_exception(ray_start_cluster_head):
def test_fill_object_store_exception(ray_start_cluster_head):
@ray.remote
class LargeMemoryActor(object):
def some_expensive_task(self):
@@ -782,5 +781,5 @@ def test_fill_plasma_exception(ray_start_cluster_head):
# Make sure actor does not die
ray.get(actor.test.remote())
with pytest.raises(plasma.PlasmaStoreFull):
with pytest.raises(ray.exceptions.ObjectStoreFullError):
ray.put(np.zeros(10**8 + 2, dtype=np.uint8))
+2 -3
View File
@@ -2,12 +2,11 @@ import numpy as np
import unittest
import ray
import pyarrow
MB = 1024 * 1024
OBJECT_EVICTED = ray.exceptions.UnreconstructableError
OBJECT_TOO_LARGE = pyarrow._plasma.PlasmaStoreFull
OBJECT_TOO_LARGE = ray.exceptions.ObjectStoreFullError
@ray.remote
@@ -77,7 +76,7 @@ class TestMemoryLimits(unittest.TestCase):
print("Raised exception", type(e), e)
raise e
finally:
print(ray.worker.global_worker.plasma_client.debug_string())
print(ray.worker.global_worker.dump_object_store_memory_usage())
ray.shutdown()
+1 -1
View File
@@ -145,7 +145,7 @@ class TestMemoryScheduling(unittest.TestCase):
resources_per_trial={"object_store_memory": 150 * 1024 * 1024},
raise_on_failed_trial=False)
self.assertTrue(result.trials[0].status, "ERROR")
self.assertTrue("PlasmaStoreFull: object does not fit" in
self.assertTrue("ObjectStoreFullError: Failed to put" in
result.trials[0].error_msg)
finally:
ray.shutdown()
+6 -6
View File
@@ -237,8 +237,8 @@ def test_object_transfer_retry(ray_start_cluster):
x_ids = [f.remote(10**i) for i in [1, 2, 3, 4]]
assert not any(
ray.worker.global_worker.plasma_client.contains(
ray.pyarrow.plasma.ObjectID(x_id.binary())) for x_id in x_ids)
ray.worker.global_worker.core_worker.object_exists(x_id)
for x_id in x_ids)
# Get the objects locally to cause them to be transferred. This is the
# first time the objects are getting transferred, so it should happen
@@ -257,8 +257,8 @@ def test_object_transfer_retry(ray_start_cluster):
for _ in range(15):
ray.put(x)
assert not any(
ray.worker.global_worker.plasma_client.contains(
ray.pyarrow.plasma.ObjectID(x_id.binary())) for x_id in x_ids)
ray.worker.global_worker.core_worker.object_exists(x_id)
for x_id in x_ids)
end_time = time.time()
# Make sure that the first time the objects get transferred, it happens
@@ -277,8 +277,8 @@ def test_object_transfer_retry(ray_start_cluster):
for _ in range(15):
ray.put(x)
assert not any(
ray.worker.global_worker.plasma_client.contains(
ray.pyarrow.plasma.ObjectID(x_id.binary())) for x_id in x_ids)
ray.worker.global_worker.core_worker.object_exists(x_id)
for x_id in x_ids)
time.sleep(repeated_push_delay)
+3 -4
View File
@@ -281,16 +281,15 @@ def test_signal_on_node_failure(two_node_cluster):
def __init__(self):
pass
def local_plasma(self):
return ray.worker.global_worker.plasma_client.store_socket_name
def node_id(self):
return ray.worker.global_worker.node.unique_id
# Place the actor on the remote node.
cluster, remote_node = two_node_cluster
actor_cls = ray.remote(max_reconstructions=0)(ActorSignal)
actor = actor_cls.remote()
# Try until we put an actor on a different node.
while (ray.get(actor.local_plasma.remote()) !=
remote_node.plasma_store_socket_name):
while (ray.get(actor.node_id.remote()) != remote_node.unique_id):
actor = actor_cls.remote()
# Kill actor process.
-55
View File
@@ -4,7 +4,6 @@ from __future__ import print_function
import binascii
import errno
import functools
import hashlib
import inspect
import logging
@@ -518,60 +517,6 @@ def check_oversized_pickle(pickled, name, obj_type, worker):
job_id=worker.current_job_id)
class _ThreadSafeProxy(object):
"""This class is used to create a thread-safe proxy for a given object.
Every method call will be guarded with a lock.
Attributes:
orig_obj (object): the original object.
lock (threading.Lock): the lock object.
_wrapper_cache (dict): a cache from original object's methods to
the proxy methods.
"""
def __init__(self, orig_obj, lock):
self.orig_obj = orig_obj
self.lock = lock
self._wrapper_cache = {}
def __getattr__(self, attr):
orig_attr = getattr(self.orig_obj, attr)
if not callable(orig_attr):
# If the original attr is a field, just return it.
return orig_attr
else:
# If the orginal attr is a method,
# return a wrapper that guards the original method with a lock.
wrapper = self._wrapper_cache.get(attr)
if wrapper is None:
@functools.wraps(orig_attr)
def _wrapper(*args, **kwargs):
with self.lock:
return orig_attr(*args, **kwargs)
self._wrapper_cache[attr] = _wrapper
wrapper = _wrapper
return wrapper
def thread_safe_client(client, lock=None):
    """Wrap a client in a proxy that takes a lock around every method call.

    Args:
        client: the client object to be guarded.
        lock: the lock to hold while the client's methods run.
            If None, a new `threading.Lock` is created for this proxy.

    Returns:
        A `_ThreadSafeProxy` wrapping the given client.
    """
    guard = threading.Lock() if lock is None else lock
    return _ThreadSafeProxy(client, guard)
def is_main_thread():
    """Return whether the calling thread is named "MainThread".

    Returns:
        True if the current thread's name is "MainThread", else False.
    """
    # Read the `name` attribute rather than calling getName():
    # Thread.getName() is deprecated since Python 3.10, while `name` is
    # available on both Python 2 and Python 3.
    return threading.current_thread().name == "MainThread"
+89 -158
View File
@@ -55,6 +55,7 @@ from ray.exceptions import (
RayError,
RayTaskError,
RayWorkerError,
ObjectStoreFullError,
UnreconstructableError,
RAY_EXCEPTION_TYPES,
)
@@ -67,7 +68,6 @@ from ray.utils import (
check_oversized_pickle,
is_cython,
setup_logger,
thread_safe_client,
)
from ray.local_mode_manager import LocalModeManager
@@ -312,18 +312,15 @@ class Worker(object):
# If the object is a byte array, skip serializing it and
# use a special metadata to indicate it's raw binary. So
# that this object can also be read by Java.
self.plasma_client.put_raw_buffer(
value,
object_id=pyarrow.plasma.ObjectID(object_id.binary()),
metadata=ray_constants.RAW_BUFFER_METADATA,
memcopy_threads=self.memcopy_threads)
self.core_worker.put_raw_buffer(
value, object_id, memcopy_threads=self.memcopy_threads)
else:
self.plasma_client.put(
value,
object_id=pyarrow.plasma.ObjectID(object_id.binary()),
memcopy_threads=self.memcopy_threads,
serialization_context=self.get_serialization_context(
self.current_job_id))
serialization_context = self.get_serialization_context(
self.current_job_id)
self.core_worker.put_serialized_object(
pyarrow.serialize(value, serialization_context),
object_id,
memcopy_threads=self.memcopy_threads)
break
except pyarrow.SerializationCallbackError as e:
try:
@@ -377,9 +374,9 @@ class Worker(object):
value: The value to put in the object store.
Raises:
plasma.PlasmaStoreFull: This is raised if the attempt to store the
object fails because the object store is full even after
multiple retries.
ray.exceptions.ObjectStoreFullError: This is raised if the attempt
to store the object fails because the object store is full even
after multiple retries.
"""
# Make sure that the value is not an object ID.
if isinstance(value, ObjectID):
@@ -396,7 +393,7 @@ class Worker(object):
try:
self._try_store_and_register(object_id, value)
break
except pyarrow.plasma.PlasmaStoreFull as plasma_exc:
except ObjectStoreFullError as e:
if attempt:
logger.warning("Waiting {} seconds for space to free up "
"in the object store.".format(delay))
@@ -404,13 +401,12 @@ class Worker(object):
delay *= 2
else:
self.dump_object_store_memory_usage()
raise plasma_exc
raise e
def dump_object_store_memory_usage(self):
"""Prints object store debug string to stdout."""
msg = "\n" + self.plasma_client.debug_string()
msg = msg.replace("\n", "\nplasma: ")
logger.warning("Local object store memory usage:\n{}\n".format(msg))
logger.warning("Local object store memory usage:\n{}\n".format(
self.core_worker.object_store_memory_usage_string()))
def _try_store_and_register(self, object_id, value):
"""Wraps `store_and_register` with cases for existence and pickling.
@@ -422,14 +418,6 @@ class Worker(object):
"""
try:
self.store_and_register(object_id, value)
except pyarrow.plasma.PlasmaObjectExists:
# The object already exists in the object store, so there is no
# need to add it again. TODO(rkn): We need to compare hashes
# and make sure that the objects are in fact the same. We also
# should return an error code to caller instead of printing a
# message.
logger.info("The object with ID {} already exists "
"in the object store.".format(object_id))
except TypeError:
# TypeError can happen because one of the members of the object
# may not be serializable for cloudpickle. So we need
@@ -442,36 +430,25 @@ class Worker(object):
logger.warning(warning_message)
self.store_and_register(object_id, value)
def retrieve_and_deserialize(self, object_ids, timeout, error_timeout=10):
def retrieve_and_deserialize(self, object_ids, error_timeout=10):
data_metadata_pairs = self.core_worker.get_objects(
object_ids, self.current_task_id)
assert len(data_metadata_pairs) == len(object_ids)
start_time = time.time()
# Only send the warning once.
warning_sent = False
serialization_context = self.get_serialization_context(
self.current_job_id)
while True:
results = []
warning_sent = False
i = 0
while i < len(object_ids):
object_id = object_ids[i]
data, metadata = data_metadata_pairs[i]
try:
# We divide very large get requests into smaller get requests
# so that a single get request doesn't block the store for a
# long time, if the store is blocked, it can block the manager
# as well as a consequence.
results = []
batch_size = ray._config.worker_fetch_request_size()
for i in range(0, len(object_ids), batch_size):
metadata_data_pairs = self.plasma_client.get_buffers(
object_ids[i:i + batch_size],
timeout,
with_meta=True,
)
for j in range(len(metadata_data_pairs)):
metadata, data = metadata_data_pairs[j]
results.append(
self._deserialize_object_from_arrow(
data,
metadata,
object_ids[i + j],
serialization_context,
))
return results
results.append(
self._deserialize_object_from_arrow(
data, metadata, object_id, serialization_context))
i += 1
except pyarrow.DeserializationCallbackError:
# Wait a little bit for the import thread to import the class.
# If we currently have the worker lock, we need to release it
@@ -492,11 +469,15 @@ class Worker(object):
job_id=self.current_job_id)
warning_sent = True
return results
def _deserialize_object_from_arrow(self, data, metadata, object_id,
serialization_context):
if metadata:
# Check if the object should be returned as raw bytes.
if metadata == ray_constants.RAW_BUFFER_METADATA:
if data is None:
return b""
return data.to_pybytes()
# Otherwise, return an exception object based on
# the error type.
@@ -511,16 +492,13 @@ class Worker(object):
assert False, "Unrecognized error type " + str(error_type)
elif data:
# If data is not empty, deserialize the object.
# Note, the lock is needed because `serialization_context` isn't
# thread-safe.
with self.plasma_client.lock:
return pyarrow.deserialize(data, serialization_context)
return pyarrow.deserialize(data, serialization_context)
else:
# Object isn't available in plasma.
return plasma.ObjectNotAvailable
def get_object(self, object_ids):
"""Get the value or values in the object store associated with the IDs.
def get_objects(self, object_ids):
"""Get the values in the object store associated with the IDs.
Return the values from the local object store for object_ids. This will
block until all the values for object_ids have been written to the
@@ -542,72 +520,11 @@ class Worker(object):
"which is not an ray.ObjectID.".format(object_id))
if self.mode == LOCAL_MODE:
return self.local_mode_manager.get_object(object_ids)
return self.local_mode_manager.get_objects(object_ids)
# Do an initial fetch for remote objects. We divide the fetch into
# smaller fetches so as to not block the manager for a prolonged period
# of time in a single call.
plain_object_ids = [
plasma.ObjectID(object_id.binary()) for object_id in object_ids
]
for i in range(0, len(object_ids),
ray._config.worker_fetch_request_size()):
self.raylet_client.fetch_or_reconstruct(
object_ids[i:(i + ray._config.worker_fetch_request_size())],
True)
# Get the objects. We initially try to get the objects immediately.
final_results = self.retrieve_and_deserialize(plain_object_ids, 0)
# Construct a dictionary mapping object IDs that we haven't gotten yet
# to their original index in the object_ids argument.
unready_ids = {
plain_object_ids[i].binary(): i
for (i, val) in enumerate(final_results)
if val is plasma.ObjectNotAvailable
}
if len(unready_ids) > 0:
# Try reconstructing any objects we haven't gotten yet. Try to
# get them until at least get_timeout_milliseconds
# milliseconds passes, then repeat.
while len(unready_ids) > 0:
object_ids_to_fetch = [
plasma.ObjectID(unready_id)
for unready_id in unready_ids.keys()
]
ray_object_ids_to_fetch = [
ObjectID(unready_id) for unready_id in unready_ids.keys()
]
fetch_request_size = ray._config.worker_fetch_request_size()
for i in range(0, len(object_ids_to_fetch),
fetch_request_size):
self.raylet_client.fetch_or_reconstruct(
ray_object_ids_to_fetch[i:(i + fetch_request_size)],
False,
self.current_task_id,
)
results = self.retrieve_and_deserialize(
object_ids_to_fetch,
max([
ray._config.get_timeout_milliseconds(),
int(0.01 * len(unready_ids)),
]),
)
# Remove any entries for objects we received during this
# iteration so we don't retrieve the same object twice.
for i, val in enumerate(results):
if val is not plasma.ObjectNotAvailable:
object_id = object_ids_to_fetch[i].binary()
index = unready_ids[object_id]
final_results[index] = val
unready_ids.pop(object_id)
# If there were objects that we weren't able to get locally,
# let the raylet know that we're now unblocked.
self.raylet_client.notify_unblocked(self.current_task_id)
assert len(final_results) == len(object_ids)
return final_results
results = self.retrieve_and_deserialize(object_ids)
assert len(results) == len(object_ids)
return results
def submit_task(self,
function_descriptor,
@@ -859,7 +776,7 @@ class Worker(object):
# Get the objects from the local object store.
if len(object_ids) > 0:
values = self.get_object(object_ids)
values = self.get_objects(object_ids)
for i, value in enumerate(values):
if isinstance(value, RayError):
raise value
@@ -893,8 +810,7 @@ class Worker(object):
raise Exception("Returning an actor handle from a remote "
"function is not allowed).")
if outputs[i] is ray.experimental.no_return.NoReturn:
if not self.plasma_client.contains(
pyarrow.plasma.ObjectID(object_ids[i].binary())):
if not self.core_worker.object_exists(object_ids[i]):
raise RuntimeError(
"Attempting to return 'ray.experimental.NoReturn' "
"from a remote function, but the corresponding "
@@ -923,12 +839,14 @@ class Worker(object):
# needed so that if the task throws an exception, we propagate
# the error message to the correct driver.
self.current_job_id = task.job_id()
self.core_worker.set_current_job_id(task.job_id())
else:
# If this worker is an actor, current_job_id wasn't reset.
# Check that current task's driver ID equals the previous one.
assert self.current_job_id == task.job_id()
self.task_context.current_task_id = task.task_id()
self.core_worker.set_current_task_id(task.task_id())
function_descriptor = FunctionDescriptor.from_bytes_list(
task.function_descriptor_list())
@@ -972,7 +890,7 @@ class Worker(object):
ray_constants.from_memory_units(
task.required_resources()["memory"]))
if "object_store_memory" in task.required_resources():
self._set_plasma_client_options(
self._set_object_store_client_options(
worker_name,
int(
ray_constants.from_memory_units(
@@ -1007,20 +925,21 @@ class Worker(object):
function_descriptor, return_object_ids, e,
ray.utils.format_error_message(traceback.format_exc()))
def _set_plasma_client_options(self, client_name, object_store_memory):
def _set_object_store_client_options(self, name, object_store_memory):
try:
logger.debug("Setting plasma memory limit to {} for {}".format(
object_store_memory, client_name))
self.plasma_client.set_client_options(client_name,
object_store_memory)
except pyarrow._plasma.PlasmaStoreFull:
object_store_memory, name))
self.core_worker.set_object_store_client_options(
name.encode("ascii"), object_store_memory)
except RayError as e:
self.dump_object_store_memory_usage()
raise memory_monitor.RayOutOfMemoryError(
"Failed to set object_store_memory={} for {}. The "
"plasma store may have insufficient memory remaining "
"to satisfy this limit (30% of object store memory is "
"permanently reserved for shared usage).".format(
object_store_memory, client_name))
"permanently reserved for shared usage). The current "
"object store memory status is:\n\n{}".format(
object_store_memory, name, e))
def _handle_process_task_failure(self, function_descriptor,
return_object_ids, error, backtrace):
@@ -1092,6 +1011,7 @@ class Worker(object):
self._process_task(task, execution_info)
# Reset the state fields so the next task can run.
self.task_context.current_task_id = TaskID.nil()
self.core_worker.set_current_task_id(TaskID.nil())
self.task_context.task_index = 0
self.task_context.put_index = 1
if self.actor_id.is_nil():
@@ -1099,6 +1019,7 @@ class Worker(object):
# actor. Because the following tasks should all have the
# same driver id.
self.current_job_id = WorkerID.nil()
self.core_worker.set_current_job_id(JobID.nil())
# Reset signal counters so that the next task can get
# all past signals.
ray_signal.reset()
@@ -1110,7 +1031,7 @@ class Worker(object):
reached_max_executions = (self.function_actor_manager.get_task_counter(
job_id, function_descriptor) == execution_info.max_calls)
if reached_max_executions:
self.raylet_client.disconnect()
self.core_worker.disconnect()
sys.exit(0)
def _get_next_task_from_raylet(self):
@@ -1967,14 +1888,6 @@ def connect(node,
else:
raise Exception("This code should be unreachable.")
# Create an object store client.
worker.plasma_client = thread_safe_client(
plasma.connect(node.plasma_store_socket_name, None, 0, 300))
if driver_object_store_memory is not None:
worker._set_plasma_client_options("ray_driver_{}".format(os.getpid()),
driver_object_store_memory)
# If this is a driver, set the current task ID, the task driver ID, and set
# the task index to 0.
if mode == SCRIPT_MODE:
@@ -2036,12 +1949,27 @@ def connect(node,
# driver task.
worker.task_context.current_task_id = driver_task_spec.task_id()
worker.raylet_client = ray._raylet.RayletClient(
node.raylet_socket_name,
WorkerID(worker.worker_id),
(mode == WORKER_MODE),
worker.current_job_id,
redis_address, redis_port = node.redis_address.split(":")
gcs_options = ray._raylet.GcsClientOptions(
redis_address,
int(redis_port),
node.redis_password,
)
worker.core_worker = ray._raylet.CoreWorker(
(mode == SCRIPT_MODE),
node.plasma_store_socket_name,
node.raylet_socket_name,
worker.current_job_id,
gcs_options,
node.get_logs_dir_path(),
)
worker.core_worker.set_current_job_id(worker.current_job_id)
worker.core_worker.set_current_task_id(worker.current_task_id)
worker.raylet_client = ray._raylet.RayletClient(worker.core_worker)
if driver_object_store_memory is not None:
worker._set_object_store_client_options(
"ray_driver_{}".format(os.getpid()), driver_object_store_memory)
# Start the import thread
worker.import_thread = import_thread.ImportThread(worker, mode,
@@ -2141,8 +2069,8 @@ def disconnect():
if hasattr(worker, "raylet_client"):
del worker.raylet_client
if hasattr(worker, "plasma_client"):
worker.plasma_client.disconnect()
if hasattr(worker, "core_worker"):
del worker.core_worker
@contextmanager
@@ -2331,7 +2259,7 @@ def get(object_ids):
"or a list of object IDs.")
global last_task_error_raise_time
values = worker.get_object(object_ids)
values = worker.get_objects(object_ids)
for i, value in enumerate(values):
if isinstance(value, RayError):
last_task_error_raise_time = time.time()
@@ -2376,7 +2304,7 @@ def put(value, weakref=False):
)
try:
worker.put_object(object_id, value)
except pyarrow.plasma.PlasmaStoreFull:
except ObjectStoreFullError:
logger.info(
"Put failed since the value was either too large or the "
"store was full of pinned objects. If you are putting "
@@ -2387,10 +2315,14 @@ def put(value, weakref=False):
worker.task_context.put_index += 1
# Pin the object buffer with the returned id. This avoids put returns
# from getting evicted out from under the id.
# TODO(edoakes): we should be able to avoid this extra IPC by holding
# a reference to the buffer created when putting the object, but the
# buffer returned by the plasma store create method doesn't prevent
# the object from being evicted.
if not weakref and not worker.mode == LOCAL_MODE:
object_id.set_buffer_ref(
worker.plasma_client.get_buffers(
[pyarrow.plasma.ObjectID(object_id.binary())]))
worker.core_worker.get_objects([object_id],
worker.current_task_id))
return object_id
@@ -2479,11 +2411,10 @@ def wait(object_ids, num_returns=1, timeout=None):
timeout = timeout if timeout is not None else 10**6
timeout_milliseconds = int(timeout * 1000)
ready_ids, remaining_ids = worker.raylet_client.wait(
ready_ids, remaining_ids = worker.core_worker.wait(
object_ids,
num_returns,
timeout_milliseconds,
False,
worker.current_task_id,
)
return ready_ids, remaining_ids