mirror of
https://github.com/wassname/ray.git
synced 2026-07-01 05:41:51 +08:00
Add entries to in-memory store on Put() (#7085)
This commit is contained in:
+8
-13
@@ -177,10 +177,9 @@ cdef c_vector[CObjectID] ObjectIDsToVector(object_ids):
|
||||
The output vector.
|
||||
"""
|
||||
cdef:
|
||||
ObjectID object_id
|
||||
c_vector[CObjectID] result
|
||||
for object_id in object_ids:
|
||||
result.push_back(object_id.native())
|
||||
result.push_back((<ObjectID>object_id).native())
|
||||
return result
|
||||
|
||||
|
||||
@@ -267,7 +266,6 @@ cdef void prepare_args(
|
||||
int64_t put_threshold
|
||||
shared_ptr[CBuffer] arg_data
|
||||
c_vector[CObjectID] inlined_ids
|
||||
ObjectID obj_id
|
||||
|
||||
worker = ray.worker.global_worker
|
||||
put_threshold = RayConfig.instance().max_direct_call_object_size()
|
||||
@@ -288,8 +286,8 @@ cdef void prepare_args(
|
||||
arg_data = dynamic_pointer_cast[CBuffer, LocalMemoryBuffer](
|
||||
make_shared[LocalMemoryBuffer](size))
|
||||
write_serialized_object(serialized_arg, arg_data)
|
||||
for obj_id in serialized_arg.contained_object_ids:
|
||||
inlined_ids.push_back(obj_id.native())
|
||||
for object_id in serialized_arg.contained_object_ids:
|
||||
inlined_ids.push_back((<ObjectID>object_id).native())
|
||||
args_vector.push_back(
|
||||
CTaskArg.PassByValue(make_shared[CRayObject](
|
||||
arg_data, string_to_buffer(serialized_arg.metadata),
|
||||
@@ -298,7 +296,7 @@ cdef void prepare_args(
|
||||
else:
|
||||
args_vector.push_back(
|
||||
CTaskArg.PassByReference((CObjectID.FromBinary(
|
||||
core_worker.put_serialized_cobject(serialized_arg)))))
|
||||
core_worker.put_serialized_object(serialized_arg)))))
|
||||
|
||||
cdef deserialize_args(
|
||||
const c_vector[shared_ptr[CRayObject]] &c_args,
|
||||
@@ -694,12 +692,6 @@ cdef class CoreWorker:
|
||||
def put_serialized_object(self, serialized_object,
|
||||
ObjectID object_id=None,
|
||||
c_bool pin_object=True):
|
||||
return ObjectID(self.put_serialized_cobject(
|
||||
serialized_object, object_id, pin_object))
|
||||
|
||||
def put_serialized_cobject(self, serialized_object,
|
||||
ObjectID object_id=None,
|
||||
c_bool pin_object=True):
|
||||
cdef:
|
||||
CObjectID c_object_id
|
||||
shared_ptr[CBuffer] data
|
||||
@@ -916,7 +908,9 @@ cdef class CoreWorker:
|
||||
extra_data)
|
||||
|
||||
def deserialize_and_register_actor_handle(self, const c_string &bytes):
|
||||
cdef CActorHandle* c_actor_handle
|
||||
cdef:
|
||||
CActorHandle* c_actor_handle
|
||||
|
||||
worker = ray.worker.get_global_worker()
|
||||
worker.check_connected()
|
||||
manager = worker.function_actor_manager
|
||||
@@ -996,6 +990,7 @@ cdef class CoreWorker:
|
||||
CObjectID c_outer_object_id = outer_object_id.native()
|
||||
CTaskID c_owner_id = CTaskID.FromBinary(owner_id_binary)
|
||||
CAddress c_owner_address = CAddress()
|
||||
|
||||
c_owner_address.ParseFromString(serialized_owner_address)
|
||||
self.core_worker.get().RegisterOwnershipInfoAndResolveFuture(
|
||||
c_object_id,
|
||||
|
||||
@@ -40,8 +40,9 @@ class PlasmaEventHandler:
|
||||
"""Process notifications."""
|
||||
for object_id, object_size, metadata_size in messages:
|
||||
if object_size > 0 and object_id in self._waiting_dict:
|
||||
# This must be asynchronous to allow objects to be locally
|
||||
# received
|
||||
# This must be asynchronous because it runs on the main IO
|
||||
# thread in the worker. If this is blocked, other messages
|
||||
# won't be received.
|
||||
self._loop.call_soon_threadsafe(_complete_future, self,
|
||||
object_id)
|
||||
|
||||
|
||||
@@ -152,6 +152,8 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
|
||||
|
||||
CObjectID WithPlasmaTransportType()
|
||||
|
||||
CObjectID WithDirectTransportType()
|
||||
|
||||
int64_t ObjectIndex() const
|
||||
|
||||
CTaskID TaskId() const
|
||||
|
||||
@@ -185,7 +185,7 @@ cdef class ObjectID(BaseID):
|
||||
|
||||
@classmethod
|
||||
def from_random(cls):
|
||||
return cls(CObjectID.FromRandom().Binary())
|
||||
return cls(CObjectID.FromRandom().WithDirectTransportType().Binary())
|
||||
|
||||
def __await__(self):
|
||||
# Delayed import because this can only be imported in py3.
|
||||
|
||||
@@ -192,6 +192,10 @@ class SerializationContext:
|
||||
# UniqueIDs are serialized as
|
||||
# (class name, (unique bytes,)).
|
||||
outer_id = context.get_outer_object_id()
|
||||
# outer_id is None in the case that this ObjectID was closed
|
||||
# over in a function or pickled directly using pickle.dumps().
|
||||
if outer_id is None:
|
||||
outer_id = ray.ObjectID.nil()
|
||||
worker.core_worker.deserialize_and_register_object_id(
|
||||
obj_id[1][0], outer_id, owner_id[1][0], owner_address)
|
||||
return deserialized_object_id
|
||||
|
||||
@@ -235,3 +235,15 @@ class RemoteSignal:
|
||||
|
||||
def wait(self):
|
||||
ray.get(self.signal_actor.wait.remote())
|
||||
|
||||
|
||||
@ray.remote
|
||||
def _put(obj):
|
||||
return obj
|
||||
|
||||
|
||||
def put_object(obj, use_ray_put):
|
||||
if use_ray_put:
|
||||
return ray.put(obj)
|
||||
else:
|
||||
return _put.remote(obj)
|
||||
|
||||
@@ -527,8 +527,8 @@ def test_put_pins_object(ray_start_object_store_memory):
|
||||
del x_id
|
||||
for _ in range(10):
|
||||
ray.put(np.zeros(10 * 1024 * 1024))
|
||||
with pytest.raises(ray.exceptions.UnreconstructableError):
|
||||
ray.get(ray.ObjectID(x_binary))
|
||||
assert not ray.worker.global_worker.core_worker.object_exists(
|
||||
ray.ObjectID(x_binary))
|
||||
|
||||
# weakref put
|
||||
y_id = ray.put("HI", weakref=True)
|
||||
|
||||
@@ -163,19 +163,18 @@ def test_asyncio_actor_async_get(ray_start_regular_shared):
|
||||
def remote_task():
|
||||
return 1
|
||||
|
||||
plasma_object = ray.put(2)
|
||||
|
||||
@ray.remote
|
||||
class AsyncGetter:
|
||||
async def get(self):
|
||||
return await remote_task.remote()
|
||||
|
||||
async def plasma_get(self):
|
||||
return await plasma_object
|
||||
async def plasma_get(self, plasma_object):
|
||||
return await plasma_object[0]
|
||||
|
||||
getter = AsyncGetter.options().remote()
|
||||
plasma_object = ray.put(2)
|
||||
getter = AsyncGetter.remote()
|
||||
assert ray.get(getter.get.remote()) == 1
|
||||
assert ray.get(getter.plasma_get.remote()) == 2
|
||||
assert ray.get(getter.plasma_get.remote([plasma_object])) == 2
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -841,10 +841,15 @@ def test_raylet_crash_when_get(ray_start_regular):
|
||||
time.sleep(2)
|
||||
ray.worker._global_node.kill_raylet()
|
||||
|
||||
object_id = ray.put(None)
|
||||
ray.internal.free(object_id)
|
||||
while ray.worker.global_worker.core_worker.object_exists(object_id):
|
||||
time.sleep(1)
|
||||
|
||||
thread = threading.Thread(target=sleep_to_kill_raylet)
|
||||
thread.start()
|
||||
with pytest.raises(ray.exceptions.UnreconstructableError):
|
||||
ray.get(ray.ObjectID.from_random())
|
||||
ray.get(object_id)
|
||||
thread.join()
|
||||
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
# coding: utf-8
|
||||
import asyncio
|
||||
import copy
|
||||
import json
|
||||
import logging
|
||||
@@ -12,7 +13,7 @@ import pytest
|
||||
|
||||
import ray
|
||||
import ray.cluster_utils
|
||||
from ray.test_utils import SignalActor, wait_for_condition
|
||||
from ray.test_utils import SignalActor, put_object, wait_for_condition
|
||||
from ray.internal.internal_api import global_gc
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -372,19 +373,14 @@ def test_feature_flag(shutdown_only):
|
||||
# Remote function takes serialized reference and doesn't hold onto it after
|
||||
# finishing. Referenced object shouldn't be evicted while the task is pending
|
||||
# and should be evicted after it returns.
|
||||
def test_basic_serialized_reference(one_worker_100MiB):
|
||||
@pytest.mark.parametrize("use_ray_put", [False, True])
|
||||
def test_basic_serialized_reference(one_worker_100MiB, use_ray_put):
|
||||
@ray.remote
|
||||
def pending(ref, dep):
|
||||
ray.get(ref[0])
|
||||
|
||||
# TODO(edoakes): currently these tests don't work with ray.put() so we need
|
||||
# to return from a task like this instead. Once that is fixed, should have
|
||||
# tests run with both codepaths.
|
||||
@ray.remote
|
||||
def put():
|
||||
return np.zeros(40 * 1024 * 1024, dtype=np.uint8)
|
||||
|
||||
array_oid = put.remote()
|
||||
array_oid = put_object(
|
||||
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
|
||||
signal = SignalActor.remote()
|
||||
oid = pending.remote([array_oid], signal.wait.remote())
|
||||
|
||||
@@ -406,7 +402,19 @@ def test_basic_serialized_reference(one_worker_100MiB):
|
||||
# Call a recursive chain of tasks that pass a serialized reference to the end
|
||||
# of the chain. The reference should still exist while the final task in the
|
||||
# chain is running and should be removed once it finishes.
|
||||
def test_recursive_serialized_reference(one_worker_100MiB):
|
||||
@pytest.mark.parametrize("use_ray_put", [False, True])
|
||||
def test_recursive_serialized_reference(one_worker_100MiB, use_ray_put):
|
||||
@ray.remote(num_cpus=0)
|
||||
class Signal:
|
||||
def __init__(self):
|
||||
self.ready_event = asyncio.Event()
|
||||
|
||||
def send(self):
|
||||
self.ready_event.set()
|
||||
|
||||
async def wait(self):
|
||||
await self.ready_event.wait()
|
||||
|
||||
@ray.remote
|
||||
def recursive(ref, signal, max_depth, depth=0):
|
||||
ray.get(ref[0])
|
||||
@@ -415,14 +423,11 @@ def test_recursive_serialized_reference(one_worker_100MiB):
|
||||
else:
|
||||
return recursive.remote(ref, signal, max_depth, depth + 1)
|
||||
|
||||
@ray.remote
|
||||
def put():
|
||||
return np.zeros(40 * 1024 * 1024, dtype=np.uint8)
|
||||
|
||||
signal = SignalActor.remote()
|
||||
|
||||
max_depth = 5
|
||||
array_oid = put.remote()
|
||||
array_oid = put_object(
|
||||
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
|
||||
head_oid = recursive.remote([array_oid], signal, max_depth)
|
||||
|
||||
# Remove the local reference.
|
||||
@@ -447,7 +452,8 @@ def test_recursive_serialized_reference(one_worker_100MiB):
|
||||
# Test that a passed reference held by an actor after the method finishes
|
||||
# is kept until the reference is removed from the actor. Also tests giving
|
||||
# the actor a duplicate reference to the same object ID.
|
||||
def test_actor_holding_serialized_reference(one_worker_100MiB):
|
||||
@pytest.mark.parametrize("use_ray_put", [False, True])
|
||||
def test_actor_holding_serialized_reference(one_worker_100MiB, use_ray_put):
|
||||
@ray.remote
|
||||
class GreedyActor(object):
|
||||
def __init__(self):
|
||||
@@ -465,12 +471,9 @@ def test_actor_holding_serialized_reference(one_worker_100MiB):
|
||||
def delete_ref2(self):
|
||||
self.ref2 = None
|
||||
|
||||
@ray.remote
|
||||
def put():
|
||||
return np.zeros(40 * 1024 * 1024, dtype=np.uint8)
|
||||
|
||||
# Test that the reference held by the actor isn't evicted.
|
||||
array_oid = put.remote()
|
||||
array_oid = put_object(
|
||||
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
|
||||
actor = GreedyActor.remote()
|
||||
actor.set_ref1.remote([array_oid])
|
||||
|
||||
@@ -496,7 +499,19 @@ def test_actor_holding_serialized_reference(one_worker_100MiB):
|
||||
# Test that a passed reference held by an actor after a task finishes
|
||||
# is kept until the reference is removed from the worker. Also tests giving
|
||||
# the worker a duplicate reference to the same object ID.
|
||||
def test_worker_holding_serialized_reference(one_worker_100MiB):
|
||||
@pytest.mark.parametrize("use_ray_put", [False, True])
|
||||
def test_worker_holding_serialized_reference(one_worker_100MiB, use_ray_put):
|
||||
@ray.remote(num_cpus=0)
|
||||
class Signal:
|
||||
def __init__(self):
|
||||
self.ready_event = asyncio.Event()
|
||||
|
||||
def send(self):
|
||||
self.ready_event.set()
|
||||
|
||||
async def wait(self):
|
||||
await self.ready_event.wait()
|
||||
|
||||
@ray.remote
|
||||
def child(dep1, dep2):
|
||||
return
|
||||
@@ -505,14 +520,11 @@ def test_worker_holding_serialized_reference(one_worker_100MiB):
|
||||
def launch_pending_task(ref, signal):
|
||||
return child.remote(ref[0], signal.wait.remote())
|
||||
|
||||
@ray.remote
|
||||
def put():
|
||||
return np.zeros(40 * 1024 * 1024, dtype=np.uint8)
|
||||
|
||||
signal = SignalActor.remote()
|
||||
|
||||
# Test that the reference held by the actor isn't evicted.
|
||||
array_oid = put.remote()
|
||||
array_oid = put_object(
|
||||
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
|
||||
child_return_id = ray.get(launch_pending_task.remote([array_oid], signal))
|
||||
|
||||
# Remove the local reference.
|
||||
@@ -548,7 +560,19 @@ def test_basic_nested_ids(one_worker_100MiB):
|
||||
|
||||
# Test that an object containing object IDs within it pins the inner IDs
|
||||
# recursively and for submitted tasks.
|
||||
def test_recursively_nest_ids(one_worker_100MiB):
|
||||
@pytest.mark.parametrize("use_ray_put", [False, True])
|
||||
def test_recursively_nest_ids(one_worker_100MiB, use_ray_put):
|
||||
@ray.remote(num_cpus=0)
|
||||
class Signal:
|
||||
def __init__(self):
|
||||
self.ready_event = asyncio.Event()
|
||||
|
||||
def send(self):
|
||||
self.ready_event.set()
|
||||
|
||||
async def wait(self):
|
||||
await self.ready_event.wait()
|
||||
|
||||
@ray.remote
|
||||
def recursive(ref, signal, max_depth, depth=0):
|
||||
unwrapped = ray.get(ref[0])
|
||||
@@ -557,14 +581,11 @@ def test_recursively_nest_ids(one_worker_100MiB):
|
||||
else:
|
||||
return recursive.remote(unwrapped, signal, max_depth, depth + 1)
|
||||
|
||||
@ray.remote
|
||||
def put():
|
||||
return np.zeros(40 * 1024 * 1024, dtype=np.uint8)
|
||||
|
||||
signal = SignalActor.remote()
|
||||
|
||||
max_depth = 5
|
||||
array_oid = put.remote()
|
||||
array_oid = put_object(
|
||||
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
|
||||
nested_oid = array_oid
|
||||
for _ in range(max_depth):
|
||||
nested_oid = ray.put([nested_oid])
|
||||
@@ -591,14 +612,14 @@ def test_recursively_nest_ids(one_worker_100MiB):
|
||||
|
||||
# Test that serialized objectIDs returned from remote tasks are pinned until
|
||||
# they go out of scope on the caller side.
|
||||
def test_return_object_id(one_worker_100MiB):
|
||||
@ray.remote
|
||||
def put():
|
||||
return np.zeros(40 * 1024 * 1024, dtype=np.uint8)
|
||||
|
||||
@pytest.mark.parametrize("use_ray_put", [False, True])
|
||||
def test_return_object_id(one_worker_100MiB, use_ray_put):
|
||||
@ray.remote
|
||||
def return_an_id():
|
||||
return [put.remote()]
|
||||
return [
|
||||
put_object(
|
||||
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
|
||||
]
|
||||
|
||||
outer_oid = return_an_id.remote()
|
||||
inner_oid_binary = ray.get(outer_oid)[0].binary()
|
||||
@@ -619,14 +640,29 @@ def test_return_object_id(one_worker_100MiB):
|
||||
|
||||
# Test that serialized objectIDs returned from remote tasks are pinned if
|
||||
# passed into another remote task by the caller.
|
||||
def test_pass_returned_object_id(one_worker_100MiB):
|
||||
@pytest.mark.parametrize("use_ray_put", [False, True])
|
||||
def test_pass_returned_object_id(one_worker_100MiB, use_ray_put):
|
||||
@ray.remote(num_cpus=0)
|
||||
class Signal:
|
||||
def __init__(self):
|
||||
self.ready_event = asyncio.Event()
|
||||
|
||||
def send(self):
|
||||
self.ready_event.set()
|
||||
|
||||
async def wait(self):
|
||||
await self.ready_event.wait()
|
||||
|
||||
@ray.remote
|
||||
def put():
|
||||
return np.zeros(40 * 1024 * 1024, dtype=np.uint8)
|
||||
return
|
||||
|
||||
@ray.remote
|
||||
def return_an_id():
|
||||
return [put.remote()]
|
||||
return [
|
||||
put_object(
|
||||
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
|
||||
]
|
||||
|
||||
@ray.remote
|
||||
def pending(ref, signal):
|
||||
@@ -654,14 +690,25 @@ def test_pass_returned_object_id(one_worker_100MiB):
|
||||
# returned by another task to the end of the chain. The reference should still
|
||||
# exist while the final task in the chain is running and should be removed once
|
||||
# it finishes.
|
||||
def test_recursively_pass_returned_object_id(one_worker_100MiB):
|
||||
@ray.remote
|
||||
def put():
|
||||
return np.zeros(40 * 1024 * 1024, dtype=np.uint8)
|
||||
@pytest.mark.parametrize("use_ray_put", [False, True])
|
||||
def test_recursively_pass_returned_object_id(one_worker_100MiB, use_ray_put):
|
||||
@ray.remote(num_cpus=0)
|
||||
class Signal:
|
||||
def __init__(self):
|
||||
self.ready_event = asyncio.Event()
|
||||
|
||||
def send(self):
|
||||
self.ready_event.set()
|
||||
|
||||
async def wait(self):
|
||||
await self.ready_event.wait()
|
||||
|
||||
@ray.remote
|
||||
def return_an_id():
|
||||
return [put.remote()]
|
||||
return [
|
||||
put_object(
|
||||
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
|
||||
]
|
||||
|
||||
@ray.remote
|
||||
def recursive(ref, signal, max_depth, depth=0):
|
||||
@@ -700,15 +747,13 @@ def test_recursively_pass_returned_object_id(one_worker_100MiB):
|
||||
# returns the same ObjectID by calling ray.get() on its submitted task and
|
||||
# returning the result. The reference should still exist while the driver has a
|
||||
# reference to the final task's ObjectID.
|
||||
def test_recursively_return_borrowed_object_id(one_worker_100MiB):
|
||||
@ray.remote
|
||||
def put():
|
||||
return np.zeros(40 * 1024 * 1024, dtype=np.uint8)
|
||||
|
||||
@pytest.mark.parametrize("use_ray_put", [False, True])
|
||||
def test_recursively_return_borrowed_object_id(one_worker_100MiB, use_ray_put):
|
||||
@ray.remote
|
||||
def recursive(num_tasks_left):
|
||||
if num_tasks_left == 0:
|
||||
return put.remote()
|
||||
return put_object(
|
||||
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
|
||||
|
||||
final_id = ray.get(recursive.remote(num_tasks_left - 1))
|
||||
ray.get(final_id)
|
||||
|
||||
@@ -270,8 +270,15 @@ class Worker:
|
||||
"call 'put' on it (or return it).")
|
||||
|
||||
serialized_value = self.get_serialization_context().serialize(value)
|
||||
return self.core_worker.put_serialized_object(
|
||||
serialized_value, object_id=object_id, pin_object=pin_object)
|
||||
# This *must* be the first place that we construct this python
|
||||
# ObjectID because an entry with 0 local references is created when
|
||||
# the object is Put() in the core worker, expecting that this python
|
||||
# reference will be created. If another reference is created and
|
||||
# removed before this one, it will corrupt the state in the
|
||||
# reference counter.
|
||||
return ray.ObjectID(
|
||||
self.core_worker.put_serialized_object(
|
||||
serialized_value, object_id=object_id, pin_object=pin_object))
|
||||
|
||||
def deserialize_objects(self, data_metadata_pairs, object_ids):
|
||||
context = self.get_serialization_context()
|
||||
|
||||
@@ -390,7 +390,7 @@ Status CoreWorker::Put(const RayObject &object,
|
||||
ObjectID *object_id) {
|
||||
*object_id = ObjectID::ForPut(worker_context_.GetCurrentTaskID(),
|
||||
worker_context_.GetNextPutIndex(),
|
||||
static_cast<uint8_t>(TaskTransportType::RAYLET));
|
||||
static_cast<uint8_t>(TaskTransportType::DIRECT));
|
||||
reference_counter_->AddOwnedObject(*object_id, contained_object_ids, GetCallerId(),
|
||||
rpc_address_);
|
||||
return Put(object, contained_object_ids, *object_id, /*pin_object=*/true);
|
||||
@@ -419,7 +419,7 @@ Status CoreWorker::Put(const RayObject &object,
|
||||
RAY_RETURN_NOT_OK(plasma_store_provider_->Release(object_id));
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
return memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id);
|
||||
}
|
||||
|
||||
Status CoreWorker::Create(const std::shared_ptr<Buffer> &metadata, const size_t data_size,
|
||||
@@ -427,7 +427,7 @@ Status CoreWorker::Create(const std::shared_ptr<Buffer> &metadata, const size_t
|
||||
ObjectID *object_id, std::shared_ptr<Buffer> *data) {
|
||||
*object_id = ObjectID::ForPut(worker_context_.GetCurrentTaskID(),
|
||||
worker_context_.GetNextPutIndex(),
|
||||
static_cast<uint8_t>(TaskTransportType::RAYLET));
|
||||
static_cast<uint8_t>(TaskTransportType::DIRECT));
|
||||
RAY_RETURN_NOT_OK(
|
||||
plasma_store_provider_->Create(metadata, data_size, *object_id, data));
|
||||
// Only add the object to the reference counter if it didn't already exist.
|
||||
@@ -462,7 +462,7 @@ Status CoreWorker::Seal(const ObjectID &object_id, bool pin_object,
|
||||
} else {
|
||||
RAY_RETURN_NOT_OK(plasma_store_provider_->Release(object_id));
|
||||
}
|
||||
return Status::OK();
|
||||
return memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id);
|
||||
}
|
||||
|
||||
Status CoreWorker::Get(const std::vector<ObjectID> &ids, const int64_t timeout_ms,
|
||||
@@ -538,13 +538,9 @@ Status CoreWorker::Get(const std::vector<ObjectID> &ids, const int64_t timeout_m
|
||||
|
||||
Status CoreWorker::Contains(const ObjectID &object_id, bool *has_object) {
|
||||
bool found = false;
|
||||
if (object_id.IsDirectCallType()) {
|
||||
bool in_plasma = false;
|
||||
found = memory_store_->Contains(object_id, &in_plasma);
|
||||
if (in_plasma) {
|
||||
RAY_RETURN_NOT_OK(plasma_store_provider_->Contains(object_id, &found));
|
||||
}
|
||||
} else {
|
||||
bool in_plasma = false;
|
||||
found = memory_store_->Contains(object_id, &in_plasma);
|
||||
if (in_plasma) {
|
||||
RAY_RETURN_NOT_OK(plasma_store_provider_->Contains(object_id, &found));
|
||||
}
|
||||
*has_object = found;
|
||||
@@ -667,14 +663,9 @@ Status CoreWorker::Delete(const std::vector<ObjectID> &object_ids, bool local_on
|
||||
|
||||
// We only delete from plasma, which avoids hangs (issue #7105). In-memory
|
||||
// objects are always handled by ref counting only.
|
||||
absl::flat_hash_set<ObjectID> plasma_object_ids;
|
||||
for (const auto &obj_id : object_ids) {
|
||||
plasma_object_ids.insert(obj_id);
|
||||
}
|
||||
RAY_RETURN_NOT_OK(plasma_store_provider_->Delete(plasma_object_ids, local_only,
|
||||
delete_creating_tasks));
|
||||
|
||||
return Status::OK();
|
||||
absl::flat_hash_set<ObjectID> plasma_object_ids(object_ids.begin(), object_ids.end());
|
||||
return plasma_store_provider_->Delete(plasma_object_ids, local_only,
|
||||
delete_creating_tasks);
|
||||
}
|
||||
|
||||
void CoreWorker::TriggerGlobalGC() {
|
||||
@@ -1407,7 +1398,10 @@ void CoreWorker::HandlePlasmaObjectReady(const rpc::PlasmaObjectReadyRequest &re
|
||||
rpc::PlasmaObjectReadyReply *reply,
|
||||
rpc::SendReplyCallback send_reply_callback) {
|
||||
RAY_CHECK(plasma_done_callback_ != nullptr) << "Plasma done callback not defined.";
|
||||
// This callback must be asynchronous to allow plasma to receive objects
|
||||
// This callback needs to be asynchronous because it runs on the io_service_, so no
|
||||
// RPCs can be processed while it's running. This can easily lead to deadlock (for
|
||||
// example if the callback calls ray.get() on an object that is dependent on an RPC
|
||||
// to be ready).
|
||||
plasma_done_callback_(ObjectID::FromBinary(request.object_id()), request.data_size(),
|
||||
request.metadata_size());
|
||||
send_reply_callback(Status::OK(), nullptr, nullptr);
|
||||
|
||||
@@ -129,4 +129,4 @@ class FiberState {
|
||||
|
||||
} // namespace ray
|
||||
|
||||
#endif // RAY_CORE_WORKER_FIBER_H
|
||||
#endif // RAY_CORE_WORKER_FIBER_H
|
||||
|
||||
@@ -158,11 +158,13 @@ std::shared_ptr<RayObject> CoreWorkerMemoryStore::GetOrPromoteToPlasma(
|
||||
}
|
||||
|
||||
Status CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &object_id) {
|
||||
RAY_CHECK(object_id.IsDirectCallType());
|
||||
std::vector<std::function<void(std::shared_ptr<RayObject>)>> async_callbacks;
|
||||
auto object_entry = std::make_shared<RayObject>(object.GetData(), object.GetMetadata(),
|
||||
object.GetNestedIds(), true);
|
||||
|
||||
// TODO(edoakes): we should instead return a flag to the caller to put the object in
|
||||
// plasma.
|
||||
bool should_put_in_plasma = false;
|
||||
{
|
||||
absl::MutexLock lock(&mu_);
|
||||
|
||||
@@ -181,11 +183,9 @@ Status CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &objec
|
||||
auto promoted_it = promoted_to_plasma_.find(object_id);
|
||||
if (promoted_it != promoted_to_plasma_.end()) {
|
||||
RAY_CHECK(store_in_plasma_ != nullptr);
|
||||
if (!object.IsInPlasmaError()) {
|
||||
// Only need to promote to plasma if it wasn't already put into plasma
|
||||
// by the task that created the object.
|
||||
store_in_plasma_(object, object_id);
|
||||
}
|
||||
// Only need to promote to plasma if it wasn't already put into plasma
|
||||
// by the task that created the object.
|
||||
should_put_in_plasma = !object.IsInPlasmaError();
|
||||
promoted_to_plasma_.erase(promoted_it);
|
||||
}
|
||||
|
||||
@@ -212,6 +212,13 @@ Status CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &objec
|
||||
}
|
||||
}
|
||||
|
||||
// Must be called without holding the lock because store_in_plasma_ goes
|
||||
// through the regular CoreWorker::Put() codepath, which calls into the
|
||||
// in-memory store (would cause deadlock).
|
||||
if (should_put_in_plasma) {
|
||||
store_in_plasma_(object, object_id);
|
||||
}
|
||||
|
||||
// It's important for performance to run the callbacks outside the lock.
|
||||
for (const auto &cb : async_callbacks) {
|
||||
cb(object_entry);
|
||||
@@ -250,7 +257,6 @@ Status CoreWorkerMemoryStore::GetImpl(const std::vector<ObjectID> &object_ids,
|
||||
if (iter != objects_.end()) {
|
||||
(*results)[i] = iter->second;
|
||||
if (remove_after_get) {
|
||||
RAY_LOG(ERROR) << "REMOVE_AFTER_GET";
|
||||
// Note that we cannot remove the object_id from `objects_` now,
|
||||
// because `object_ids` might have duplicate ids.
|
||||
ids_to_remove.insert(object_id);
|
||||
@@ -437,7 +443,6 @@ bool CoreWorkerMemoryStore::Contains(const ObjectID &object_id, bool *in_plasma)
|
||||
if (it != objects_.end()) {
|
||||
if (it->second->IsInPlasmaError()) {
|
||||
*in_plasma = true;
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -111,7 +111,7 @@ class CoreWorkerMemoryStore {
|
||||
///
|
||||
/// \param[in] object_id The object to check.
|
||||
/// \param[out] in_plasma Set to true if the object was spilled to plasma.
|
||||
/// If this is set to true, Contains() will return false.
|
||||
/// Will only be true if the store contains the object.
|
||||
/// \return Whether the store has the object.
|
||||
bool Contains(const ObjectID &object_id, bool *in_plasma);
|
||||
|
||||
|
||||
@@ -951,76 +951,6 @@ TEST_F(SingleNodeTest, TestObjectInterface) {
|
||||
ASSERT_TRUE(!results[1]);
|
||||
}
|
||||
|
||||
TEST_F(TwoNodeTest, TestObjectInterfaceCrossNodes) {
|
||||
CoreWorker worker1(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0],
|
||||
raylet_socket_names_[0], NextJobId(), gcs_options_, "", "127.0.0.1",
|
||||
node_manager_port, nullptr);
|
||||
|
||||
CoreWorker worker2(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[1],
|
||||
raylet_socket_names_[1], NextJobId(), gcs_options_, "", "127.0.0.1",
|
||||
node_manager_port, nullptr);
|
||||
|
||||
uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8};
|
||||
uint8_t array2[] = {10, 11, 12, 13, 14, 15};
|
||||
|
||||
std::vector<std::shared_ptr<LocalMemoryBuffer>> buffers;
|
||||
buffers.emplace_back(std::make_shared<LocalMemoryBuffer>(array1, sizeof(array1)));
|
||||
buffers.emplace_back(std::make_shared<LocalMemoryBuffer>(array2, sizeof(array2)));
|
||||
|
||||
std::vector<ObjectID> ids(buffers.size());
|
||||
for (size_t i = 0; i < ids.size(); i++) {
|
||||
RAY_CHECK_OK(worker1.Put(RayObject(buffers[i], nullptr, std::vector<ObjectID>()), {},
|
||||
&ids[i]));
|
||||
}
|
||||
|
||||
// Test Get() from remote node.
|
||||
std::vector<std::shared_ptr<RayObject>> results;
|
||||
RAY_CHECK_OK(worker2.Get(ids, -1, &results));
|
||||
|
||||
ASSERT_EQ(results.size(), 2);
|
||||
for (size_t i = 0; i < ids.size(); i++) {
|
||||
ASSERT_EQ(results[i]->GetData()->Size(), buffers[i]->Size());
|
||||
ASSERT_EQ(*(results[i]->GetData()), *buffers[i]);
|
||||
}
|
||||
|
||||
// Test Wait() from remote node.
|
||||
ObjectID non_existent_id = ObjectID::FromRandom();
|
||||
std::vector<ObjectID> all_ids(ids);
|
||||
all_ids.push_back(non_existent_id);
|
||||
|
||||
std::vector<bool> wait_results;
|
||||
RAY_CHECK_OK(worker2.Wait(all_ids, 2, -1, &wait_results));
|
||||
ASSERT_EQ(wait_results.size(), 3);
|
||||
ASSERT_EQ(wait_results, std::vector<bool>({true, true, false}));
|
||||
|
||||
RAY_CHECK_OK(worker2.Wait(all_ids, 3, 100, &wait_results));
|
||||
ASSERT_EQ(wait_results.size(), 3);
|
||||
ASSERT_EQ(wait_results, std::vector<bool>({true, true, false}));
|
||||
|
||||
// Test Delete() from all machines.
|
||||
// clear the reference held by PlasmaBuffer.
|
||||
results.clear();
|
||||
RAY_CHECK_OK(worker2.Delete(ids, false, false));
|
||||
|
||||
// Note that Delete() calls RayletClient::FreeObjects and would not
|
||||
// wait for objects being deleted, so wait a while for plasma store
|
||||
// to process the command.
|
||||
usleep(1000 * 1000);
|
||||
// Verify objects are deleted from both machines.
|
||||
ASSERT_TRUE(worker2.Get(ids, 0, &results).IsTimedOut());
|
||||
ASSERT_EQ(results.size(), 2);
|
||||
ASSERT_TRUE(!results[0]);
|
||||
ASSERT_TRUE(!results[1]);
|
||||
|
||||
// TODO(edoakes): this currently fails because the object is pinned on the
|
||||
// creating node. Should be fixed or removed once we decide the semantics
|
||||
// for Delete() with pinning.
|
||||
// ASSERT_TRUE(worker1.Get(ids, 0, &results).IsTimedOut());
|
||||
// ASSERT_EQ(results.size(), 2);
|
||||
// ASSERT_TRUE(!results[0]);
|
||||
// ASSERT_TRUE(!results[1]);
|
||||
}
|
||||
|
||||
TEST_F(SingleNodeTest, TestNormalTaskLocal) {
|
||||
std::unordered_map<std::string, double> resources;
|
||||
TestNormalTask(resources);
|
||||
|
||||
@@ -24,7 +24,7 @@ TEST(SchedulingQueueTest, TestInOrder) {
|
||||
boost::asio::io_service io_service;
|
||||
MockWaiter waiter;
|
||||
WorkerContext context(WorkerType::WORKER, JobID::Nil());
|
||||
SchedulingQueue queue(io_service, waiter, context, 0);
|
||||
SchedulingQueue queue(io_service, waiter, context);
|
||||
int n_ok = 0;
|
||||
int n_rej = 0;
|
||||
auto fn_ok = [&n_ok]() { n_ok++; };
|
||||
@@ -45,7 +45,7 @@ TEST(SchedulingQueueTest, TestWaitForObjects) {
|
||||
boost::asio::io_service io_service;
|
||||
MockWaiter waiter;
|
||||
WorkerContext context(WorkerType::WORKER, JobID::Nil());
|
||||
SchedulingQueue queue(io_service, waiter, context, 0);
|
||||
SchedulingQueue queue(io_service, waiter, context);
|
||||
int n_ok = 0;
|
||||
int n_rej = 0;
|
||||
auto fn_ok = [&n_ok]() { n_ok++; };
|
||||
@@ -71,7 +71,7 @@ TEST(SchedulingQueueTest, TestWaitForObjectsNotSubjectToSeqTimeout) {
|
||||
boost::asio::io_service io_service;
|
||||
MockWaiter waiter;
|
||||
WorkerContext context(WorkerType::WORKER, JobID::Nil());
|
||||
SchedulingQueue queue(io_service, waiter, context, 0);
|
||||
SchedulingQueue queue(io_service, waiter, context);
|
||||
int n_ok = 0;
|
||||
int n_rej = 0;
|
||||
auto fn_ok = [&n_ok]() { n_ok++; };
|
||||
@@ -89,7 +89,7 @@ TEST(SchedulingQueueTest, TestOutOfOrder) {
|
||||
boost::asio::io_service io_service;
|
||||
MockWaiter waiter;
|
||||
WorkerContext context(WorkerType::WORKER, JobID::Nil());
|
||||
SchedulingQueue queue(io_service, waiter, context, 0);
|
||||
SchedulingQueue queue(io_service, waiter, context);
|
||||
int n_ok = 0;
|
||||
int n_rej = 0;
|
||||
auto fn_ok = [&n_ok]() { n_ok++; };
|
||||
@@ -107,7 +107,7 @@ TEST(SchedulingQueueTest, TestSeqWaitTimeout) {
|
||||
boost::asio::io_service io_service;
|
||||
MockWaiter waiter;
|
||||
WorkerContext context(WorkerType::WORKER, JobID::Nil());
|
||||
SchedulingQueue queue(io_service, waiter, context, 0);
|
||||
SchedulingQueue queue(io_service, waiter, context);
|
||||
int n_ok = 0;
|
||||
int n_rej = 0;
|
||||
auto fn_ok = [&n_ok]() { n_ok++; };
|
||||
@@ -130,7 +130,7 @@ TEST(SchedulingQueueTest, TestSkipAlreadyProcessedByClient) {
|
||||
boost::asio::io_service io_service;
|
||||
MockWaiter waiter;
|
||||
WorkerContext context(WorkerType::WORKER, JobID::Nil());
|
||||
SchedulingQueue queue(io_service, waiter, context, 0);
|
||||
SchedulingQueue queue(io_service, waiter, context);
|
||||
int n_ok = 0;
|
||||
int n_rej = 0;
|
||||
auto fn_ok = [&n_ok]() { n_ok++; };
|
||||
|
||||
@@ -183,7 +183,6 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask(
|
||||
rpc::SendReplyCallback send_reply_callback) {
|
||||
RAY_CHECK(waiter_ != nullptr) << "Must call init() prior to use";
|
||||
const TaskSpecification task_spec(request.task_spec());
|
||||
RAY_LOG(DEBUG) << "Received task " << task_spec.DebugString();
|
||||
if (task_spec.IsActorTask() && !worker_context_.CurrentTaskIsDirectCall()) {
|
||||
send_reply_callback(Status::Invalid("This actor doesn't accept direct calls."),
|
||||
nullptr, nullptr);
|
||||
|
||||
@@ -178,14 +178,6 @@ raylet::RayletClient::RayletClient(
|
||||
}
|
||||
|
||||
Status raylet::RayletClient::SubmitTask(const TaskSpecification &task_spec) {
|
||||
for (size_t i = 0; i < task_spec.NumArgs(); i++) {
|
||||
if (task_spec.ArgByRef(i)) {
|
||||
for (size_t j = 0; j < task_spec.ArgIdCount(i); j++) {
|
||||
RAY_CHECK(!task_spec.ArgId(i, j).IsDirectCallType())
|
||||
<< "Passing direct call objects to non-direct tasks is not allowed.";
|
||||
}
|
||||
}
|
||||
}
|
||||
flatbuffers::FlatBufferBuilder fbb;
|
||||
auto message =
|
||||
protocol::CreateSubmitTaskRequest(fbb, fbb.CreateString(task_spec.Serialize()));
|
||||
|
||||
Reference in New Issue
Block a user