diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 51446b44a..4d9d9e7ef 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -65,7 +65,6 @@ except ImportError as e: from ray._raylet import ( ActorCheckpointID, ActorClassID, - ActorHandleID, ActorID, ClientID, Config as _Config, @@ -154,7 +153,6 @@ __all__ = [ __all__ += [ "ActorCheckpointID", "ActorClassID", - "ActorHandleID", "ActorID", "ClientID", "JobID", diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 7c5f75b46..a149bd550 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -23,6 +23,7 @@ from libcpp.vector cimport vector as c_vector from cython.operator import dereference, postincrement from ray.includes.common cimport ( + CActorHandle, CLanguage, CRayObject, CRayStatus, @@ -432,6 +433,13 @@ cdef class CoreWorker: with nogil: self.core_worker.get().SetCurrentTaskId(c_task_id) + def set_actor_id(self, ActorID actor_id): + cdef: + CActorID c_actor_id = actor_id.native() + + with nogil: + self.core_worker.get().SetActorId(c_actor_id) + def get_current_task_id(self): return TaskID(self.core_worker.get().GetCurrentTaskId().Binary()) @@ -622,6 +630,7 @@ cdef class CoreWorker: CRayFunction ray_function c_vector[CTaskArg] args_vector c_vector[CObjectID] return_ids + CTaskID caller_id with profiling.profile("submit_task"): prepare_resources(resources, &c_resources) @@ -629,9 +638,11 @@ cdef class CoreWorker: ray_function = CRayFunction( LANGUAGE_PYTHON, string_vector_from_list(function_descriptor)) prepare_args(args, &args_vector) + caller_id = self.core_worker.get().GetCallerId() with nogil: check_status(self.core_worker.get().Tasks().SubmitTask( + caller_id, ray_function, args_vector, task_options, &return_ids)) return VectorToObjectIDs(return_ids) @@ -643,12 +654,13 @@ cdef class CoreWorker: resources, placement_resources): cdef: - ActorHandle actor_handle = ActorHandle.__new__(ActorHandle) + unique_ptr[CActorHandle] actor_handle CRayFunction ray_function c_vector[CTaskArg] args_vector c_vector[c_string] dynamic_worker_options unordered_map[c_string, double] c_resources unordered_map[c_string, double] c_placement_resources + CTaskID caller_id with profiling.profile("submit_task"): prepare_resources(resources, &c_resources) @@ -656,30 +668,38 @@ cdef class CoreWorker: ray_function = CRayFunction( LANGUAGE_PYTHON, string_vector_from_list(function_descriptor)) prepare_args(args, &args_vector) + caller_id = self.core_worker.get().GetCallerId() with nogil: check_status(self.core_worker.get().Tasks().CreateActor( + caller_id, ray_function, args_vector, CActorCreationOptions( max_reconstructions, False, c_resources, c_placement_resources, dynamic_worker_options), - &actor_handle.inner)) + &actor_handle)) - return actor_handle + actor_id = ActorID(actor_handle.get().GetActorID().Binary()) + inserted = self.core_worker.get().AddActorHandle( + move(actor_handle)) + assert inserted, "Actor {} already exists".format(actor_id) + return actor_id def submit_actor_task(self, - ActorHandle handle, + ActorID actor_id, function_descriptor, args, int num_return_vals, resources): cdef: + CActorID c_actor_id = actor_id.native() unordered_map[c_string, double] c_resources CTaskOptions task_options CRayFunction ray_function c_vector[CTaskArg] args_vector c_vector[CObjectID] return_ids + CTaskID caller_id with profiling.profile("submit_task"): prepare_resources(resources, &c_resources) @@ -687,10 +707,13 @@ cdef class CoreWorker: ray_function = CRayFunction( LANGUAGE_PYTHON, string_vector_from_list(function_descriptor)) prepare_args(args, &args_vector) + caller_id = self.core_worker.get().GetCallerId() with nogil: check_status(self.core_worker.get().Tasks().SubmitActorTask( - handle.inner.get()[0], ray_function, + caller_id, + self.core_worker.get().GetActorHandle(c_actor_id), + ray_function, args_vector, task_options, &return_ids)) return VectorToObjectIDs(return_ids) @@ -702,3 +725,18 @@ cdef class CoreWorker: return ProfileEvent.make( self.core_worker.get().CreateProfileEvent(c_event_type), extra_data) + + def deserialize_actor_handle(self, c_string bytes): + cdef: + unique_ptr[CActorHandle] actor_handle + actor_handle.reset(new CActorHandle(bytes)) + actor_id = ActorID(actor_handle.get().GetActorID().Binary()) + self.core_worker.get().AddActorHandle(move(actor_handle)) + return actor_id + + def serialize_actor_handle(self, ActorID actor_id): + cdef: + CActorID c_actor_id = actor_id.native() + c_string output + self.core_worker.get().GetActorHandle(c_actor_id).Serialize(&output) + return output diff --git a/python/ray/actor.py b/python/ray/actor.py index 39775ad09..7da3967c3 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -16,7 +16,7 @@ import ray.ray_constants as ray_constants import ray._raylet import ray.signature as signature import ray.worker -from ray import ActorID, ActorHandleID, ActorClassID, profiling +from ray import ActorID, ActorClassID, profiling logger = logging.getLogger(__name__) @@ -372,9 +372,6 @@ class ActorClass(object): actor_id = ActorID.from_random() worker.actors[actor_id] = meta.modified_class( *copy.deepcopy(args), **copy.deepcopy(kwargs)) - core_handle = ray._raylet.ActorHandle( - actor_id, ActorHandleID.nil(), worker.current_job_id, - function_descriptor.get_function_descriptor_list()) else: # Export the actor. if (meta.last_export_session_and_job != @@ -404,13 +401,13 @@ class ActorClass(object): function_signature = meta.method_signatures[function_name] creation_args = signature.extend_args(function_signature, args, kwargs) - core_handle = worker.core_worker.create_actor( + actor_id = worker.core_worker.create_actor( function_descriptor.get_function_descriptor_list(), creation_args, meta.max_reconstructions, resources, actor_placement_resources) actor_handle = ActorHandle( - core_handle, + actor_id, meta.modified_class.__module__, meta.class_name, meta.actor_method_names, @@ -436,7 +433,7 @@ class ActorHandle(object): cloudpickle). Attributes: - _ray_core_handle: Core worker actor handle for this actor. + _ray_actor_id: Actor ID. _ray_module_name: The module name of this actor. _ray_actor_method_names: The names of the actor methods. _ray_method_decorators: Optional decorators for the function @@ -454,7 +451,7 @@ class ActorHandle(object): """ def __init__(self, - core_handle, + actor_id, module_name, class_name, actor_method_names, @@ -464,7 +461,7 @@ class ActorHandle(object): actor_method_cpus, session_and_job, original_handle=False): - self._ray_core_handle = core_handle + self._ray_actor_id = actor_id self._ray_module_name = module_name self._ray_original_handle = original_handle self._ray_actor_method_names = actor_method_names @@ -518,7 +515,7 @@ class ActorHandle(object): function, function_descriptor, args, num_return_vals) else: object_ids = worker.core_worker.submit_actor_task( - self._ray_core_handle, + self._ray_actor_id, function_descriptor.get_function_descriptor_list(), args, num_return_vals, {"CPU": self._ray_actor_method_cpus}) @@ -579,8 +576,8 @@ class ActorHandle(object): # and we don't need to send `__ray_terminate__` again. logger.warning( "Actor is garbage collected in the wrong driver." + - " Actor id = %s, class name = %s.", - self._ray_core_handle.actor_id(), self._ray_class_name) + " Actor id = %s, class name = %s.", self._ray_actor_id, + self._ray_class_name) return if worker.connected and self._ray_original_handle: # TODO(rkn): Should we be passing in the actor cursor as a @@ -589,11 +586,7 @@ class ActorHandle(object): @property def _actor_id(self): - return self._ray_core_handle.actor_id() - - @property - def _actor_handle_id(self): - return self._ray_core_handle.actor_handle_id() + return self._ray_actor_id def _serialization_helper(self, ray_forking): """This is defined in order to make pickling work. @@ -605,8 +598,13 @@ class ActorHandle(object): Returns: A dictionary of the information needed to reconstruct the object. """ + worker = ray.worker.get_global_worker() + worker.check_connected() state = { - "core_handle": self._ray_core_handle.fork(ray_forking).to_bytes(), + # Local mode just uses the actor ID. + "core_handle": worker.core_worker.serialize_actor_handle( + self._ray_actor_id) + if hasattr(worker, "core_worker") else self._ray_actor_id, "module_name": self._ray_module_name, "class_name": self._ray_class_name, "actor_method_names": self._ray_actor_method_names, @@ -632,8 +630,9 @@ class ActorHandle(object): self.__init__( # TODO(swang): Accessing the worker's current task ID is not # thread-safe. - ray._raylet.ActorHandle.from_bytes(state["core_handle"], - worker.current_task_id), + # Local mode just uses the actor ID. + worker.core_worker.deserialize_actor_handle(state["core_handle"]) + if hasattr(worker, "core_worker") else state["core_handle"], state["module_name"], state["class_name"], state["actor_method_names"], diff --git a/python/ray/experimental/serve/metric.py b/python/ray/experimental/serve/metric.py index 756b1ecc1..b153261c2 100644 --- a/python/ray/experimental/serve/metric.py +++ b/python/ray/experimental/serve/metric.py @@ -27,11 +27,11 @@ class MetricMonitor: return True def add_target(self, target_handle): - hex_id = target_handle._ray_core_handle.actor_id().hex() + hex_id = target_handle._actor_id.hex() self.actor_handles[hex_id] = target_handle def remove_target(self, target_handle): - hex_id = target_handle._ray_core_handle.actor_id().hex() + hex_id = target_handle._actor_id.hex() self.actor_handles.pop(hex_id) def scrape(self): diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 04ae17942..d259158a5 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -8,7 +8,6 @@ from libcpp.vector cimport vector as c_vector from ray.includes.unique_ids cimport ( CActorID, - CActorHandleID, CJobID, CWorkerID, CObjectID, @@ -199,19 +198,9 @@ cdef extern from "ray/core_worker/task_interface.h" nogil: const c_vector[c_string] &dynamic_worker_options) cdef cppclass CActorHandle "ray::ActorHandle": - CActorHandle( - const CActorID &actor_id, const CActorHandleID &actor_handle_id, - const CJobID &job_id, const CObjectID &initial_cursor, - const CLanguage actor_language, c_bool is_direct_call, - const c_vector[c_string] &actor_creation_task_function_descriptor) - CActorHandle(CActorHandle &other, c_bool in_band) - CActorHandle( - const c_string &serialized, const CTaskID ¤t_task_id) + CActorHandle(const c_string &serialized) CActorID GetActorID() const - CActorHandleID GetActorHandleID() const - unique_ptr[CActorHandle] Fork() - unique_ptr[CActorHandle] ForkForSerialization() void Serialize(c_string *output) cdef extern from "ray/gcs/gcs_client_interface.h" nogil: diff --git a/python/ray/includes/common.pxi b/python/ray/includes/common.pxi index 71cfdfd64..7313cd618 100644 --- a/python/ray/includes/common.pxi +++ b/python/ray/includes/common.pxi @@ -3,7 +3,6 @@ from libcpp.string cimport string as c_string from libcpp.vector cimport vector as c_vector from ray.includes.common cimport ( - CActorHandle, CGcsClientOptions, ) @@ -25,48 +24,3 @@ cdef class GcsClientOptions: cdef CGcsClientOptions* native(self): return (self.inner.get()) - -cdef class ActorHandle: - """Cython wrapper class of C++ `ray::ActorHandle`.""" - cdef: - unique_ptr[CActorHandle] inner - - def __init__(self, ActorID actor_id, ActorHandleID actor_handle_id, - JobID job_id, list creation_function_descriptor): - cdef: - c_vector[c_string] c_descriptor - ObjectID cursor = ObjectID.from_random() - - c_descriptor = string_vector_from_list(creation_function_descriptor) - self.inner.reset(new CActorHandle( - actor_id.native(), actor_handle_id.native(), job_id.native(), - cursor.native(), LANGUAGE_PYTHON, False, c_descriptor)) - - def fork(self, c_bool ray_forking): - cdef: - ActorHandle other = ActorHandle.__new__(ActorHandle) - if ray_forking: - other.inner = self.inner.get().Fork() - else: - other.inner = self.inner.get().ForkForSerialization() - return other - - @staticmethod - def from_bytes(c_string bytes, TaskID current_task_id): - cdef: - ActorHandle self = ActorHandle.__new__(ActorHandle) - self.inner.reset(new CActorHandle(bytes, current_task_id.native())) - return self - - def to_bytes(self): - cdef: - c_string output - - self.inner.get().Serialize(&output) - return output - - def actor_id(self): - return ActorID(self.inner.get().GetActorID().Binary()) - - def actor_handle_id(self): - return ActorHandleID(self.inner.get().GetActorHandleID().Binary()) diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 0d7766db3..d8c1e2942 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -5,6 +5,7 @@ from libcpp.string cimport string as c_string from libcpp.vector cimport vector as c_vector from ray.includes.unique_ids cimport ( + CActorID, CJobID, CTaskID, CObjectID, @@ -32,13 +33,16 @@ cdef extern from "ray/core_worker/profiling.h" nogil: cdef extern from "ray/core_worker/task_interface.h" namespace "ray" nogil: cdef cppclass CTaskSubmissionInterface "CoreWorkerTaskInterface": CRayStatus SubmitTask( + const CTaskID &caller_id, const CRayFunction &function, const c_vector[CTaskArg] &args, const CTaskOptions &options, c_vector[CObjectID] *return_ids) CRayStatus CreateActor( + const CTaskID &caller_id, const CRayFunction &function, const c_vector[CTaskArg] &args, const CActorCreationOptions &options, unique_ptr[CActorHandle] *handle) CRayStatus SubmitActorTask( + const CTaskID &caller_id, CActorHandle &handle, const CRayFunction &function, const c_vector[CTaskArg] &args, const CTaskOptions &options, c_vector[CObjectID] *return_ids) @@ -87,3 +91,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: void SetCurrentJobId(const CJobID &job_id) CTaskID GetCurrentTaskId() void SetCurrentTaskId(const CTaskID &task_id) + void SetActorId(const CActorID &actor_id) + const CActorID &GetActorId() + CTaskID GetCallerId() + c_bool AddActorHandle(unique_ptr[CActorHandle] handle) + CActorHandle &GetActorHandle(const CActorID &actor_id) diff --git a/python/ray/includes/task.pxd b/python/ray/includes/task.pxd index 4034d4c3e..bfd31fc93 100644 --- a/python/ray/includes/task.pxd +++ b/python/ray/includes/task.pxd @@ -10,7 +10,6 @@ from ray.includes.common cimport ( ResourceSet, ) from ray.includes.unique_ids cimport ( - CActorHandleID, CActorID, CJobID, CObjectID, @@ -71,10 +70,8 @@ cdef extern from "ray/common/task/task_spec.h" nogil: CObjectID PreviousActorTaskDummyObjectId() const uint64_t MaxActorReconstructions() const CActorID ActorId() const - CActorHandleID ActorHandleId() const uint64_t ActorCounter() const CObjectID ActorDummyObject() const - c_vector[CActorHandleID] NewActorHandles() const cdef extern from "ray/common/task/task_execution_spec.h" nogil: diff --git a/python/ray/includes/unique_ids.pxd b/python/ray/includes/unique_ids.pxd index 5987325f3..29fd7932d 100644 --- a/python/ray/includes/unique_ids.pxd +++ b/python/ray/includes/unique_ids.pxd @@ -68,11 +68,6 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil: CActorID Of(CJobID job_id, CTaskID parent_task_id, int64_t parent_task_counter) - cdef cppclass CActorHandleID "ray::ActorHandleID"(CUniqueID): - - @staticmethod - CActorHandleID FromBinary(const c_string &binary) - cdef cppclass CClientID "ray::ClientID"(CUniqueID): @staticmethod diff --git a/python/ray/includes/unique_ids.pxi b/python/ray/includes/unique_ids.pxi index 985030e09..bf6183978 100644 --- a/python/ray/includes/unique_ids.pxi +++ b/python/ray/includes/unique_ids.pxi @@ -11,7 +11,6 @@ import os from ray.includes.unique_ids cimport ( CActorCheckpointID, CActorClassID, - CActorHandleID, CActorID, CClientID, CConfigID, @@ -343,16 +342,6 @@ cdef class ActorID(BaseID): return self.data.Hash() -cdef class ActorHandleID(UniqueID): - - def __init__(self, id): - check_id(id) - self.data = CActorHandleID.FromBinary(id) - - cdef CActorHandleID native(self): - return self.data - - cdef class ActorCheckpointID(UniqueID): def __init__(self, id): @@ -385,7 +374,6 @@ cdef class ActorClassID(UniqueID): _ID_TYPES = [ ActorCheckpointID, ActorClassID, - ActorHandleID, ActorID, ClientID, JobID, diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index f4aad579e..1e334e1b4 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -1447,14 +1447,14 @@ def test_multithreading(ray_start_2_cpus): time.sleep(delay_ms / 1000.0) return value - @ray.remote - class Echo(object): - def echo(self, value): - return value - def test_api_in_multi_threads(): """Test using Ray api in multiple threads.""" + @ray.remote + class Echo(object): + def echo(self, value): + return value + # Test calling remote functions in multiple threads. def test_remote_call(): value = random.randint(0, 1000000) diff --git a/python/ray/worker.py b/python/ray/worker.py index b4f2938ac..e3e94ac05 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -867,7 +867,9 @@ class Worker(object): # TODO(rkn): It would be preferable for actor creation tasks to share # more of the code path with regular task execution. if task.is_actor_creation_task(): + # TODO: Remove Worker.actor_id and just use CoreWorker.GetActorId. self.actor_id = task.actor_creation_id() + self.core_worker.set_actor_id(task.actor_creation_id()) self.actor_creation_task_id = task.task_id() actor_class = self.function_actor_manager.load_actor_class( job_id, function_descriptor) diff --git a/src/ray/common/id.cc b/src/ray/common/id.cc index 2789f1838..2612c7008 100644 --- a/src/ray/common/id.cc +++ b/src/ray/common/id.cc @@ -346,39 +346,6 @@ ObjectID ObjectID::GenerateObjectId(const std::string &task_id_binary, return ret; } -const ActorHandleID ComputeForkedActorHandleId(const ActorHandleID &actor_handle_id, - int64_t num_forks) { - // Compute hashes. - SHA256_CTX ctx; - sha256_init(&ctx); - sha256_update(&ctx, reinterpret_cast(actor_handle_id.Data()), - actor_handle_id.Size()); - sha256_update(&ctx, reinterpret_cast(&num_forks), sizeof(num_forks)); - - // Compute the final actor handle ID from the hash. - BYTE buff[DIGEST_SIZE]; - sha256_final(&ctx, buff); - RAY_CHECK(DIGEST_SIZE >= ActorHandleID::Size()); - return ActorHandleID::FromBinary(std::string(buff, buff + ActorHandleID::Size())); -} - -const ActorHandleID ComputeSerializedActorHandleId(const ActorHandleID &actor_handle_id, - const TaskID ¤t_task_id) { - // Compute hashes. - SHA256_CTX ctx; - sha256_init(&ctx); - sha256_update(&ctx, reinterpret_cast(actor_handle_id.Data()), - actor_handle_id.Size()); - sha256_update(&ctx, reinterpret_cast(current_task_id.Data()), - current_task_id.Size()); - - // Compute the final actor handle ID from the hash. - BYTE buff[DIGEST_SIZE]; - sha256_final(&ctx, buff); - RAY_CHECK(DIGEST_SIZE >= ActorHandleID::Size()); - return ActorHandleID::FromBinary(std::string(buff, buff + ActorHandleID::Size())); -} - JobID JobID::FromInt(uint32_t value) { std::vector data(JobID::Size(), 0); std::memcpy(data.data(), &value, JobID::Size()); diff --git a/src/ray/common/id.h b/src/ray/common/id.h index 8e55f23a7..0c611f405 100644 --- a/src/ray/common/id.h +++ b/src/ray/common/id.h @@ -369,23 +369,6 @@ std::ostream &operator<<(std::ostream &os, const ObjectID &id); // Restore the compiler alignment to default (8 bytes). #pragma pack(pop) -/// Compute an actor handle ID for a newly forked actor handle. -/// -/// \param actor_handle_id The actor handle ID of the existing actor handle. -/// \param num_forks The number of forks of the existing actor handle. -/// \return Generated actor handle ID. -const ActorHandleID ComputeForkedActorHandleId(const ActorHandleID &actor_handle_id, - int64_t num_forks); - -/// Compute an actor handle ID for a new actor handle created by an -/// out-of-band serialization mechanism. -/// -/// \param actor_handle_id The actor handle ID of the existing actor handle. -/// \param current_task_id The current task ID. -/// \return Generated actor handle ID. -const ActorHandleID ComputeSerializedActorHandleId(const ActorHandleID &actor_handle_id, - const TaskID ¤t_task_id); - template BaseID::BaseID() { // Using const_cast to directly change data is dangerous. The cached diff --git a/src/ray/common/id_def.h b/src/ray/common/id_def.h index ed5c2d343..31fd1cb0b 100644 --- a/src/ray/common/id_def.h +++ b/src/ray/common/id_def.h @@ -6,7 +6,6 @@ DEFINE_UNIQUE_ID(FunctionID) DEFINE_UNIQUE_ID(ActorClassID) -DEFINE_UNIQUE_ID(ActorHandleID) DEFINE_UNIQUE_ID(ActorCheckpointID) DEFINE_UNIQUE_ID(WorkerID) DEFINE_UNIQUE_ID(ConfigID) diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 41b4f3542..de53e6dee 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -151,6 +151,10 @@ std::vector TaskSpecification::DynamicWorkerOptions() const { message_->actor_creation_task_spec().dynamic_worker_options()); } +TaskID TaskSpecification::CallerId() const { + return TaskID::FromBinary(message_->caller_id()); +} + // === Below are getter methods specific to actor tasks. ActorID TaskSpecification::ActorId() const { @@ -158,11 +162,6 @@ ActorID TaskSpecification::ActorId() const { return ActorID::FromBinary(message_->actor_task_spec().actor_id()); } -ActorHandleID TaskSpecification::ActorHandleId() const { - RAY_CHECK(IsActorTask()); - return ActorHandleID::FromBinary(message_->actor_task_spec().actor_handle_id()); -} - uint64_t TaskSpecification::ActorCounter() const { RAY_CHECK(IsActorTask()); return message_->actor_task_spec().actor_counter(); @@ -185,12 +184,6 @@ ObjectID TaskSpecification::ActorDummyObject() const { return ReturnId(NumReturns() - 1); } -std::vector TaskSpecification::NewActorHandles() const { - RAY_CHECK(IsActorTask()); - return IdVectorFromProtobuf( - message_->actor_task_spec().new_actor_handles()); -} - bool TaskSpecification::IsDirectCall() const { RAY_CHECK(IsActorCreationTask()); return message_->actor_creation_task_spec().is_direct_call(); @@ -224,8 +217,8 @@ std::string TaskSpecification::DebugString() const { } else if (IsActorTask()) { // Print actor task spec. stream << ", actor_task_spec={actor_id=" << ActorId() - << ", actor_handle_id=" << ActorHandleId() - << ", actor_counter=" << ActorCounter() << "}"; + << ", actor_caller_id=" << CallerId() << ", actor_counter=" << ActorCounter() + << "}"; } return stream.str(); diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index e69241ec1..4f9cbe001 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -132,7 +132,7 @@ class TaskSpecification : public MessageWrapper { ActorID ActorId() const; - ActorHandleID ActorHandleId() const; + TaskID CallerId() const; uint64_t ActorCounter() const; @@ -140,8 +140,6 @@ class TaskSpecification : public MessageWrapper { ObjectID PreviousActorTaskDummyObjectId() const; - std::vector NewActorHandles() const; - bool IsDirectCall() const; ObjectID ActorDummyObject() const; diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index c51c38de2..6ddd7275b 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -26,7 +26,8 @@ class TaskSpecBuilder { TaskSpecBuilder &SetCommonTaskSpec( const TaskID &task_id, const Language &language, const std::vector &function_descriptor, const JobID &job_id, - const TaskID &parent_task_id, uint64_t parent_counter, uint64_t num_returns, + const TaskID &parent_task_id, uint64_t parent_counter, const TaskID &caller_id, + uint64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources) { message_->set_type(TaskType::NORMAL_TASK); @@ -38,6 +39,7 @@ class TaskSpecBuilder { message_->set_task_id(task_id.Binary()); message_->set_parent_task_id(parent_task_id.Binary()); message_->set_parent_counter(parent_counter); + message_->set_caller_id(caller_id.Binary()); message_->set_num_returns(num_returns); message_->mutable_required_resources()->insert(required_resources.begin(), required_resources.end()); @@ -107,23 +109,18 @@ class TaskSpecBuilder { /// See `common.proto` for meaning of the arguments. /// /// \return Reference to the builder object itself. - TaskSpecBuilder &SetActorTaskSpec( - const ActorID &actor_id, const ActorHandleID &actor_handle_id, - const ObjectID &actor_creation_dummy_object_id, - const ObjectID &previous_actor_task_dummy_object_id, uint64_t actor_counter, - const std::vector &new_handle_ids = {}) { + TaskSpecBuilder &SetActorTaskSpec(const ActorID &actor_id, + const ObjectID &actor_creation_dummy_object_id, + const ObjectID &previous_actor_task_dummy_object_id, + uint64_t actor_counter) { message_->set_type(TaskType::ACTOR_TASK); auto actor_spec = message_->mutable_actor_task_spec(); actor_spec->set_actor_id(actor_id.Binary()); - actor_spec->set_actor_handle_id(actor_handle_id.Binary()); actor_spec->set_actor_creation_dummy_object_id( actor_creation_dummy_object_id.Binary()); actor_spec->set_previous_actor_task_dummy_object_id( previous_actor_task_dummy_object_id.Binary()); actor_spec->set_actor_counter(actor_counter); - for (const auto &id : new_handle_ids) { - actor_spec->add_new_actor_handles(id.Binary()); - } return *this; } diff --git a/src/ray/core_worker/actor_handle.cc b/src/ray/core_worker/actor_handle.cc index 576181546..8ac249fa6 100644 --- a/src/ray/core_worker/actor_handle.cc +++ b/src/ray/core_worker/actor_handle.cc @@ -2,84 +2,44 @@ #include "ray/core_worker/actor_handle.h" +namespace { + +ray::rpc::ActorHandle CreateInnerActorHandle( + const class ActorID &actor_id, const class JobID &job_id, + const ObjectID &initial_cursor, const Language actor_language, bool is_direct_call, + const std::vector &actor_creation_task_function_descriptor) { + ray::rpc::ActorHandle inner; + inner.set_actor_id(actor_id.Data(), actor_id.Size()); + inner.set_creation_job_id(job_id.Data(), job_id.Size()); + inner.set_actor_language(actor_language); + *inner.mutable_actor_creation_task_function_descriptor() = { + actor_creation_task_function_descriptor.begin(), + actor_creation_task_function_descriptor.end()}; + inner.set_actor_cursor(initial_cursor.Binary()); + inner.set_is_direct_call(is_direct_call); + return inner; +} + +ray::rpc::ActorHandle CreateInnerActorHandleFromString(const std::string &serialized) { + ray::rpc::ActorHandle inner; + inner.ParseFromString(serialized); + return inner; +} + +} // namespace + namespace ray { ActorHandle::ActorHandle( - const class ActorID &actor_id, const class ActorHandleID &actor_handle_id, - const class JobID &job_id, const ObjectID &initial_cursor, - const Language actor_language, bool is_direct_call, - const std::vector &actor_creation_task_function_descriptor) { - inner_.set_actor_id(actor_id.Data(), actor_id.Size()); - inner_.set_actor_handle_id(actor_handle_id.Data(), actor_handle_id.Size()); - inner_.set_creation_job_id(job_id.Data(), job_id.Size()); - inner_.set_actor_language(actor_language); - *inner_.mutable_actor_creation_task_function_descriptor() = { - actor_creation_task_function_descriptor.begin(), - actor_creation_task_function_descriptor.end()}; - inner_.set_actor_cursor(initial_cursor.Binary()); - inner_.set_is_direct_call(is_direct_call); - // Increment the task counter to account for the actor creation task. - task_counter_++; -} + const class ActorID &actor_id, const class JobID &job_id, + const ObjectID &initial_cursor, const Language actor_language, bool is_direct_call, + const std::vector &actor_creation_task_function_descriptor) + : ActorHandle(CreateInnerActorHandle(actor_id, job_id, initial_cursor, actor_language, + is_direct_call, + actor_creation_task_function_descriptor)) {} -std::unique_ptr ActorHandle::Fork() { - std::unique_lock guard(mutex_); - std::unique_ptr child = - std::unique_ptr(new ActorHandle(inner_)); - child->inner_ = inner_; - const class ActorHandleID new_actor_handle_id = - ComputeForkedActorHandleId(GetActorHandleID(), num_forks_++); - // Notify the backend to expect this new actor handle. The backend will - // not release the cursor for any new handles until the first task for - // each of the new handles is submitted. - // NOTE(swang): There is currently no garbage collection for actor - // handles until the actor itself is removed. - new_actor_handles_.push_back(new_actor_handle_id); - guard.unlock(); - - child->inner_.set_actor_handle_id(new_actor_handle_id.Data(), - new_actor_handle_id.Size()); - return child; -} - -std::unique_ptr ActorHandle::ForkForSerialization() { - std::unique_lock guard(mutex_); - std::unique_ptr child = - std::unique_ptr(new ActorHandle(inner_)); - child->inner_ = inner_; - // The execution dependency for a serialized actor handle is never safe - // to release, since it could be deserialized and submit another - // dependent task at any time. Therefore, we notify the backend of a - // random handle ID that will never actually be used. - new_actor_handles_.push_back(ActorHandleID::FromRandom()); - guard.unlock(); - - // We set the actor handle ID to nil to signal that this actor handle was - // created by an out-of-band fork. A new actor handle ID will be computed - // when the handle is deserialized. - const class ActorHandleID new_actor_handle_id = ActorHandleID::Nil(); - child->inner_.set_actor_handle_id(new_actor_handle_id.Data(), - new_actor_handle_id.Size()); - return child; -} - -ActorHandle::ActorHandle(const std::string &serialized, const TaskID ¤t_task_id) { - inner_.ParseFromString(serialized); - // If the actor handle ID is nil, this serialized handle was created by an out-of-band - // mechanism (see fork constructor above), so we compute a new actor handle ID. - // TODO(pcm): This still leads to a lot of actor handles being - // created, there should be a better way to handle serialized - // actor handles. - // TODO(swang): Deserializing the same actor handle twice in the same - // task will break the application, and deserializing it twice in the - // same actor is likely a performance bug. We should consider - // logging a warning in these cases. - if (ActorHandleID::FromBinary(inner_.actor_handle_id()).IsNil()) { - const class ActorHandleID new_actor_handle_id = ComputeSerializedActorHandleId( - ActorHandleID::FromBinary(inner_.actor_handle_id()), current_task_id); - inner_.set_actor_handle_id(new_actor_handle_id.Data(), new_actor_handle_id.Size()); - } -} +ActorHandle::ActorHandle(const std::string &serialized) + : ActorHandle(CreateInnerActorHandleFromString(serialized)) {} void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder, const TaskTransportType transport_type, @@ -90,18 +50,18 @@ void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder, const ObjectID actor_creation_dummy_object_id = ObjectID::ForTaskReturn(actor_creation_task_id, /*index=*/1, /*transport_type=*/static_cast(transport_type)); - builder.SetActorTaskSpec(GetActorID(), GetActorHandleID(), - actor_creation_dummy_object_id, - /*previous_actor_task_dummy_object_id=*/ActorCursor(), - task_counter_++, new_actor_handles_); - - inner_.set_actor_cursor(new_cursor.Binary()); - new_actor_handles_.clear(); + builder.SetActorTaskSpec(GetActorID(), actor_creation_dummy_object_id, + /*previous_actor_task_dummy_object_id=*/actor_cursor_, + task_counter_++); + actor_cursor_ = new_cursor; } -void ActorHandle::Serialize(std::string *output) { +void ActorHandle::Serialize(std::string *output) { inner_.SerializeToString(output); } + +void ActorHandle::Reset() { std::unique_lock guard(mutex_); - inner_.SerializeToString(output); + task_counter_ = 0; + actor_cursor_ = ObjectID::FromBinary(inner_.actor_cursor()); } } // namespace ray diff --git a/src/ray/core_worker/actor_handle.h b/src/ray/core_worker/actor_handle.h index c54d21a57..a8c7ead48 100644 --- a/src/ray/core_worker/actor_handle.h +++ b/src/ray/core_worker/actor_handle.h @@ -13,33 +13,20 @@ namespace ray { class ActorHandle { public: - ActorHandle(ray::rpc::ActorHandle inner) : inner_(inner) {} + ActorHandle(ray::rpc::ActorHandle inner) + : inner_(inner), actor_cursor_(ObjectID::FromBinary(inner_.actor_cursor())) {} // Constructs a new ActorHandle as part of the actor creation process. - ActorHandle(const ActorID &actor_id, const ActorHandleID &actor_handle_id, - const JobID &job_id, const ObjectID &initial_cursor, - const Language actor_language, bool is_direct_call, + ActorHandle(const ActorID &actor_id, const JobID &job_id, + const ObjectID &initial_cursor, const Language actor_language, + bool is_direct_call, const std::vector &actor_creation_task_function_descriptor); /// Constructs an ActorHandle from a serialized string. - ActorHandle(const std::string &serialized, const TaskID ¤t_task_id); - - /// Forks a child ActorHandle. This will modify the handle to account for the newly - /// forked child handle. This should only be used for forks that are part of a Ray - /// API call (e.g., passing an actor handle into a remote function). - std::unique_ptr Fork(); - - /// Forks a child ActorHandle. This will *not* modify the handle to account for the - /// newly forked child handle. This should be used by application-level code for - /// serialization in order to pass an actor handle for uses not covered by the Ray API. - std::unique_ptr ForkForSerialization(); + ActorHandle(const std::string &serialized); ActorID GetActorID() const { return ActorID::FromBinary(inner_.actor_id()); }; - ActorHandleID GetActorHandleID() const { - return ActorHandleID::FromBinary(inner_.actor_handle_id()); - }; - /// ID of the job that created the actor (it is possible that the handle /// exists on a job with a different job ID). JobID CreationJobID() const { return JobID::FromBinary(inner_.creation_job_id()); }; @@ -50,8 +37,6 @@ class ActorHandle { return VectorFromProtobuf(inner_.actor_creation_task_function_descriptor()); }; - ObjectID ActorCursor() const { return ObjectID::FromBinary(inner_.actor_cursor()); } - bool IsDirectCallActor() const { return inner_.is_direct_call(); } void SetActorTaskSpec(TaskSpecBuilder &builder, const TaskTransportType transport_type, @@ -59,22 +44,27 @@ class ActorHandle { void Serialize(std::string *output); + /// Reset the handle state next task submitted. + /// + /// This should be called whenever the actor is restarted, since the new + /// instance of the actor does not have the previous sequence number. + /// TODO: We should also move the other actor state (status and IP) inside + /// ActorHandle and reset them in this method. + void Reset(); + private: // Protobuf-defined persistent state of the actor handle. - ray::rpc::ActorHandle inner_; - - // Number of times this handle has been forked. - uint64_t num_forks_ = 0; + const ray::rpc::ActorHandle inner_; + /// The unique id of the dummy object returned by the previous task. + /// TODO: This can be removed once we schedule actor tasks by task counter + /// only. + // TODO: Save this state in the core worker. + ObjectID actor_cursor_; // Number of tasks that have been submitted on this handle. uint64_t task_counter_ = 0; - /// The new actor handles that were created from this handle - /// since the last task on this handle was submitted. This is - /// used to garbage-collect dummy objects that are no longer - /// necessary in the backend. - std::vector new_actor_handles_; - + /// Guards actor_cursor_ and task_counter_. std::mutex mutex_; FRIEND_TEST(ZeroNodeTest, TestActorHandle); diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index aeb1ed301..62cd6d646 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -49,7 +49,7 @@ CoreWorker::CoreWorker( std::unique_ptr(new CoreWorkerObjectInterface( worker_context_, raylet_client_, store_socket, use_memory_store)); task_interface_ = std::unique_ptr(new CoreWorkerTaskInterface( - worker_context_, raylet_client_, *object_interface_, io_service_, *gcs_client_)); + worker_context_, raylet_client_, *object_interface_, io_service_)); // Initialize task execution. int rpc_server_port = 0; @@ -90,7 +90,7 @@ CoreWorker::CoreWorker( builder.SetCommonTaskSpec(task_id, language_, empty_descriptor, worker_context_.GetCurrentJobID(), TaskID::ComputeDriverTaskId(worker_context_.GetWorkerID()), - 0, 0, empty_resources, empty_resources); + 0, GetCallerId(), 0, empty_resources, empty_resources); std::shared_ptr data = std::make_shared(); data->mutable_task()->mutable_task_spec()->CopyFrom(builder.Build().GetMessage()); @@ -127,4 +127,71 @@ std::unique_ptr CoreWorker::CreateProfileEvent( new worker::ProfileEvent(profiler_, event_type)); } +void CoreWorker::SetCurrentTaskId(const TaskID &task_id) { + worker_context_.SetCurrentTaskId(task_id); + main_thread_task_id_ = task_id; + // Clear all actor handles at the end of each non-actor task. + if (actor_id_.IsNil() && task_id.IsNil()) { + for (const auto &handle : actor_handles_) { + RAY_CHECK_OK(gcs_client_->Actors().AsyncUnsubscribe(handle.first, nullptr)); + } + actor_handles_.clear(); + } +} + +TaskID CoreWorker::GetCallerId() const { + TaskID caller_id; + ActorID actor_id = GetActorId(); + if (!actor_id.IsNil()) { + caller_id = TaskID::ForActorCreationTask(actor_id); + } else { + caller_id = main_thread_task_id_; + } + return caller_id; +} + +bool CoreWorker::AddActorHandle(std::unique_ptr actor_handle) { + const auto &actor_id = actor_handle->GetActorID(); + auto inserted = actor_handles_.emplace(actor_id, std::move(actor_handle)).second; + if (inserted) { + // Register a callback to handle actor notifications. + auto actor_notification_callback = [this](const ActorID &actor_id, + const gcs::ActorTableData &actor_data) { + if (actor_data.state() == gcs::ActorTableData::RECONSTRUCTING) { + auto it = actor_handles_.find(actor_id); + RAY_CHECK(it != actor_handles_.end()); + if (it->second->IsDirectCallActor()) { + // We have to reset the actor handle since the next instance of the + // actor will not have the last sequence number that we sent. + // TODO: Remove the check for direct calls. We do not reset for the + // raylet codepath because it tries to replay all tasks since the + // last actor checkpoint. + it->second->Reset(); + } + } else if (actor_data.state() == gcs::ActorTableData::DEAD) { + RAY_CHECK_OK(gcs_client_->Actors().AsyncUnsubscribe(actor_id, nullptr)); + // We cannot erase the actor handle here because clients can still + // submit tasks to dead actors. + } + + task_interface_->HandleDirectActorUpdate(actor_id, actor_data); + + RAY_LOG(INFO) << "received notification on actor, state=" + << static_cast(actor_data.state()) << ", actor_id: " << actor_id + << ", ip address: " << actor_data.ip_address() + << ", port: " << actor_data.port(); + }; + + RAY_CHECK_OK(gcs_client_->Actors().AsyncSubscribe( + actor_id, actor_notification_callback, nullptr)); + } + return inserted; +} + +ActorHandle &CoreWorker::GetActorHandle(const ActorID &actor_id) { + auto it = actor_handles_.find(actor_id); + RAY_CHECK(it != actor_handles_.end()); + return *it->second; +} + } // namespace ray diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 6ee3fcc8c..5c98af38f 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -82,10 +82,42 @@ class CoreWorker { void SetCurrentJobId(const JobID &job_id) { worker_context_.SetCurrentJobId(job_id); } // TODO(edoakes): remove this once Python core worker uses the task interfaces. - void SetCurrentTaskId(const TaskID &task_id) { - worker_context_.SetCurrentTaskId(task_id); + void SetCurrentTaskId(const TaskID &task_id); + + void SetActorId(const ActorID &actor_id) { + RAY_CHECK(actor_id_.IsNil()); + actor_id_ = actor_id; } + const ActorID &GetActorId() const { return actor_id_; } + + /// Get the caller ID used to submit tasks from this worker to an actor. + /// + /// \return The caller ID. For non-actor tasks, this is the current task ID. + /// For actors, this is the current actor ID. To make sure that all caller + /// IDs have the same type, we embed the actor ID in a TaskID with the rest + /// of the bytes zeroed out. + TaskID GetCallerId() const; + + /// Give this worker a handle to an actor. + /// + /// This handle will remain as long as the current actor or task is + /// executing, even if the Python handle goes out of scope. Tasks submitted + /// through this handle are guaranteed to execute in the same order in which + /// they are submitted. + /// + /// \param actor_handle The handle to the actor. + /// \return True if the handle was added and False if we already had a handle + /// to the same actor. + bool AddActorHandle(std::unique_ptr actor_handle); + + /// Get a handle to an actor. This asserts that the worker actually has this + /// handle. + /// + /// \param actor_id The actor handle to get. + /// \return A handle to the requested actor. + ActorHandle &GetActorHandle(const ActorID &actor_id); + private: void StartIOService(); @@ -94,6 +126,12 @@ class CoreWorker { const std::string raylet_socket_; const std::string log_dir_; WorkerContext worker_context_; + /// The ID of the current task being executed by the main thread. If there + /// are multiple threads, they will have a thread-local task ID stored in the + /// worker context. + TaskID main_thread_task_id_; + /// Our actor ID. If this is nil, then we execute only stateless tasks. + ActorID actor_id_; /// Event loop where the IO events are handled. e.g. async GCS operations. boost::asio::io_service io_service_; @@ -107,6 +145,9 @@ class CoreWorker { std::unique_ptr task_interface_; std::unique_ptr object_interface_; + /// Map from actor ID to a handle to that actor. + std::unordered_map> actor_handles_; + /// Only available if it's not a driver. std::unique_ptr task_execution_interface_; }; diff --git a/src/ray/core_worker/task_interface.cc b/src/ray/core_worker/task_interface.cc index f6fb92a5c..7923afb29 100644 --- a/src/ray/core_worker/task_interface.cc +++ b/src/ray/core_worker/task_interface.cc @@ -8,32 +8,30 @@ namespace ray { CoreWorkerTaskInterface::CoreWorkerTaskInterface( WorkerContext &worker_context, std::unique_ptr &raylet_client, - CoreWorkerObjectInterface &object_interface, boost::asio::io_service &io_service, - gcs::RedisGcsClient &gcs_client) + CoreWorkerObjectInterface &object_interface, boost::asio::io_service &io_service) : worker_context_(worker_context) { task_submitters_.emplace(TaskTransportType::RAYLET, std::unique_ptr( new CoreWorkerRayletTaskSubmitter(raylet_client))); - task_submitters_.emplace( - TaskTransportType::DIRECT_ACTOR, - std::unique_ptr( - new CoreWorkerDirectActorTaskSubmitter( - io_service, gcs_client, - object_interface.CreateStoreProvider(StoreProviderType::MEMORY)))); + task_submitters_.emplace(TaskTransportType::DIRECT_ACTOR, + std::unique_ptr( + new CoreWorkerDirectActorTaskSubmitter( + io_service, object_interface.CreateStoreProvider( + StoreProviderType::MEMORY)))); } void CoreWorkerTaskInterface::BuildCommonTaskSpec( TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id, - const int task_index, const RayFunction &function, const std::vector &args, - uint64_t num_returns, + const int task_index, const TaskID &caller_id, const RayFunction &function, + const std::vector &args, uint64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, TaskTransportType transport_type, std::vector *return_ids) { // Build common task spec. - builder.SetCommonTaskSpec(task_id, function.GetLanguage(), - function.GetFunctionDescriptor(), job_id, - worker_context_.GetCurrentTaskID(), task_index, num_returns, - required_resources, required_placement_resources); + builder.SetCommonTaskSpec( + task_id, function.GetLanguage(), function.GetFunctionDescriptor(), job_id, + worker_context_.GetCurrentTaskID(), task_index, caller_id, num_returns, + required_resources, required_placement_resources); // Set task arguments. for (const auto &arg : args) { if (arg.IsPassedByReference()) { @@ -52,7 +50,8 @@ void CoreWorkerTaskInterface::BuildCommonTaskSpec( } } -Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function, +Status CoreWorkerTaskInterface::SubmitTask(const TaskID &caller_id, + const RayFunction &function, const std::vector &args, const TaskOptions &task_options, std::vector *return_ids) { @@ -62,14 +61,15 @@ Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function, TaskID::ForNormalTask(worker_context_.GetCurrentJobID(), worker_context_.GetCurrentTaskID(), next_task_index); BuildCommonTaskSpec(builder, worker_context_.GetCurrentJobID(), task_id, - next_task_index, function, args, task_options.num_returns, - task_options.resources, {}, TaskTransportType::RAYLET, return_ids); + next_task_index, caller_id, function, args, + task_options.num_returns, task_options.resources, {}, + TaskTransportType::RAYLET, return_ids); return task_submitters_[TaskTransportType::RAYLET]->SubmitTask(builder.Build()); } Status CoreWorkerTaskInterface::CreateActor( - const RayFunction &function, const std::vector &args, - const ActorCreationOptions &actor_creation_options, + const TaskID &caller_id, const RayFunction &function, + const std::vector &args, const ActorCreationOptions &actor_creation_options, std::unique_ptr *actor_handle) { const int next_task_index = worker_context_.GetNextTaskIndex(); const ActorID actor_id = @@ -79,8 +79,8 @@ Status CoreWorkerTaskInterface::CreateActor( const JobID job_id = worker_context_.GetCurrentJobID(); std::vector return_ids; TaskSpecBuilder builder; - BuildCommonTaskSpec(builder, job_id, actor_creation_task_id, next_task_index, function, - args, 1, actor_creation_options.resources, + BuildCommonTaskSpec(builder, job_id, actor_creation_task_id, next_task_index, caller_id, + function, args, 1, actor_creation_options.resources, actor_creation_options.placement_resources, TaskTransportType::RAYLET, &return_ids); builder.SetActorCreationTaskSpec(actor_id, actor_creation_options.max_reconstructions, @@ -88,14 +88,14 @@ Status CoreWorkerTaskInterface::CreateActor( actor_creation_options.is_direct_call); *actor_handle = std::unique_ptr(new ActorHandle( - actor_id, ActorHandleID::Nil(), job_id, /*actor_cursor=*/return_ids[0], - function.GetLanguage(), actor_creation_options.is_direct_call, - function.GetFunctionDescriptor())); + actor_id, job_id, /*actor_cursor=*/return_ids[0], function.GetLanguage(), + actor_creation_options.is_direct_call, function.GetFunctionDescriptor())); return task_submitters_[TaskTransportType::RAYLET]->SubmitTask(builder.Build()); } -Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle, +Status CoreWorkerTaskInterface::SubmitActorTask(const TaskID &caller_id, + ActorHandle &actor_handle, const RayFunction &function, const std::vector &args, const TaskOptions &task_options, @@ -114,7 +114,7 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle, worker_context_.GetCurrentJobID(), worker_context_.GetCurrentTaskID(), next_task_index, actor_handle.GetActorID()); BuildCommonTaskSpec(builder, actor_handle.CreationJobID(), actor_task_id, - next_task_index, function, args, num_returns, + next_task_index, caller_id, function, args, num_returns, task_options.resources, {}, transport_type, return_ids); const ObjectID new_cursor = return_ids->back(); @@ -128,4 +128,12 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle, return status; } +void CoreWorkerTaskInterface::HandleDirectActorUpdate( + const ActorID &actor_id, const gcs::ActorTableData &actor_data) { + auto &submitter = task_submitters_[TaskTransportType::DIRECT_ACTOR]; + auto &direct_actor_submitter = + reinterpret_cast &>(submitter); + direct_actor_submitter->HandleActorUpdate(actor_id, actor_data); +} + } // namespace ray diff --git a/src/ray/core_worker/task_interface.h b/src/ray/core_worker/task_interface.h index ea4201848..e9c08f9bc 100644 --- a/src/ray/core_worker/task_interface.h +++ b/src/ray/core_worker/task_interface.h @@ -65,43 +65,54 @@ class CoreWorkerTaskInterface { CoreWorkerTaskInterface(WorkerContext &worker_context, std::unique_ptr &raylet_client, CoreWorkerObjectInterface &object_interface, - boost::asio::io_service &io_service, - gcs::RedisGcsClient &gcs_client); + boost::asio::io_service &io_service); /// Submit a normal task. /// + /// \param[in] caller_id ID of the task submitter. /// \param[in] function The remote function to execute. /// \param[in] args Arguments of this task. /// \param[in] task_options Options for this task. /// \param[out] return_ids Ids of the return objects. /// \return Status. - Status SubmitTask(const RayFunction &function, const std::vector &args, - const TaskOptions &task_options, std::vector *return_ids); + Status SubmitTask(const TaskID &caller_id, const RayFunction &function, + const std::vector &args, const TaskOptions &task_options, + std::vector *return_ids); /// Create an actor. /// + /// \param[in] caller_id ID of the task submitter. /// \param[in] function The remote function that generates the actor object. /// \param[in] args Arguments of this task. /// \param[in] actor_creation_options Options for this actor creation task. /// \param[out] actor_handle Handle to the actor. /// \return Status. - Status CreateActor(const RayFunction &function, const std::vector &args, + Status CreateActor(const TaskID &caller_id, const RayFunction &function, + const std::vector &args, const ActorCreationOptions &actor_creation_options, std::unique_ptr *actor_handle); /// Submit an actor task. /// + /// \param[in] caller_id ID of the task submitter. /// \param[in] actor_handle Handle to the actor. /// \param[in] function The remote function to execute. /// \param[in] args Arguments of this task. /// \param[in] task_options Options for this task. /// \param[out] return_ids Ids of the return objects. /// \return Status. - Status SubmitActorTask(ActorHandle &actor_handle, const RayFunction &function, - const std::vector &args, + Status SubmitActorTask(const TaskID &caller_id, ActorHandle &actor_handle, + const RayFunction &function, const std::vector &args, const TaskOptions &task_options, std::vector *return_ids); + /// Handle an update about an actor. + /// + /// \param[in] actor_id The ID of the actor whose status has changed. + /// \param[in] actor_data The actor's new status information. + void HandleDirectActorUpdate(const ActorID &actor_id, + const gcs::ActorTableData &actor_data); + private: /// Build common attributes of the task spec, and compute return ids. /// @@ -120,8 +131,8 @@ class CoreWorkerTaskInterface { /// \return Void. void BuildCommonTaskSpec( TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id, - const int task_index, const RayFunction &function, const std::vector &args, - uint64_t num_returns, + const int task_index, const TaskID &caller_id, const RayFunction &function, + const std::vector &args, uint64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, TaskTransportType transport_type, std::vector *return_ids); diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 04bead63e..b41cfa055 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -48,12 +48,11 @@ std::shared_ptr GenerateRandomBuffer() { return std::make_shared(arg1.data(), arg1.size(), true); } -std::unique_ptr CreateActorHelper( - CoreWorker &worker, std::unordered_map &resources, - bool is_direct_call, uint64_t max_reconstructions) { +ActorHandle &CreateActorHelper(CoreWorker &worker, + std::unordered_map &resources, + bool is_direct_call, uint64_t max_reconstructions) { std::unique_ptr actor_handle; - // Test creating actor. uint8_t array[] = {1, 2, 3}; auto buffer = std::make_shared(array, sizeof(array)); @@ -65,8 +64,11 @@ std::unique_ptr CreateActorHelper( max_reconstructions, is_direct_call, resources, resources, {}}; // Create an actor. - RAY_CHECK_OK(worker.Tasks().CreateActor(func, args, actor_options, &actor_handle)); - return actor_handle; + RAY_CHECK_OK(worker.Tasks().CreateActor(worker.GetCallerId(), func, args, actor_options, + &actor_handle)); + ActorID actor_id = actor_handle->GetActorID(); + RAY_CHECK(worker.AddActorHandle(std::move(actor_handle))); + return worker.GetActorHandle(actor_id); } class CoreWorkerTest : public ::testing::Test { @@ -241,7 +243,8 @@ void CoreWorkerTest::TestNormalTask(std::unordered_map &res TaskOptions options; std::vector return_ids; - RAY_CHECK_OK(driver.Tasks().SubmitTask(func, args, options, &return_ids)); + RAY_CHECK_OK(driver.Tasks().SubmitTask(driver.GetCallerId(), func, args, options, + &return_ids)); ASSERT_EQ(return_ids.size(), 1); @@ -265,7 +268,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso raylet_socket_names_[0], NextJobId(), gcs_options_, "", "127.0.0.1", nullptr); - auto actor_handle = CreateActorHelper(driver, resources, is_direct_call, 1000); + auto &actor_handle = CreateActorHelper(driver, resources, is_direct_call, 1000); // Test submitting some tasks with by-value args for that actor. { @@ -285,8 +288,8 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso std::vector return_ids; RayFunction func(ray::Language::PYTHON, {}); - RAY_CHECK_OK(driver.Tasks().SubmitActorTask(*actor_handle, func, args, options, - &return_ids)); + RAY_CHECK_OK(driver.Tasks().SubmitActorTask(driver.GetCallerId(), actor_handle, + func, args, options, &return_ids)); ASSERT_EQ(return_ids.size(), 1); ASSERT_TRUE(return_ids[0].IsReturnObject()); ASSERT_EQ( @@ -326,8 +329,8 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso TaskOptions options{1, resources}; std::vector return_ids; RayFunction func(ray::Language::PYTHON, {}); - auto status = - driver.Tasks().SubmitActorTask(*actor_handle, func, args, options, &return_ids); + auto status = driver.Tasks().SubmitActorTask(driver.GetCallerId(), actor_handle, func, + args, options, &return_ids); if (is_direct_call) { // For direct actor call, submitting a task with by-reference arguments // would fail. @@ -356,10 +359,10 @@ void CoreWorkerTest::TestActorReconstruction( nullptr); // creating actor. - auto actor_handle = CreateActorHelper(driver, resources, is_direct_call, 1000); + auto &actor_handle = CreateActorHelper(driver, resources, is_direct_call, 1000); // Wait for actor alive event. - ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle->GetActorID(), true, + ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle.GetActorID(), true, 30 * 1000 /* 30s */)); RAY_LOG(INFO) << "actor has been created"; @@ -374,9 +377,9 @@ void CoreWorkerTest::TestActorReconstruction( ASSERT_EQ(system("pkill mock_worker"), 0); // Wait for actor restruction event, and then for alive event. - ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle->GetActorID(), false, + ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle.GetActorID(), false, 30 * 1000 /* 30s */)); - ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle->GetActorID(), true, + ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle.GetActorID(), true, 30 * 1000 /* 30s */)); RAY_LOG(INFO) << "actor has been reconstructed"; @@ -394,8 +397,8 @@ void CoreWorkerTest::TestActorReconstruction( std::vector return_ids; RayFunction func(ray::Language::PYTHON, {}); - auto status = - driver.Tasks().SubmitActorTask(*actor_handle, func, args, options, &return_ids); + auto status = driver.Tasks().SubmitActorTask(driver.GetCallerId(), actor_handle, + func, args, options, &return_ids); RAY_CHECK_OK(status); ASSERT_EQ(return_ids.size(), 1); // Verify if it's expected data. @@ -415,7 +418,7 @@ void CoreWorkerTest::TestActorFailure(std::unordered_map &r nullptr); // creating actor. - auto actor_handle = + auto &actor_handle = CreateActorHelper(driver, resources, is_direct_call, 0 /* not reconstructable */); // Test submitting some tasks with by-value args for that actor. @@ -441,8 +444,8 @@ void CoreWorkerTest::TestActorFailure(std::unordered_map &r std::vector return_ids; RayFunction func(ray::Language::PYTHON, {}); - auto status = - driver.Tasks().SubmitActorTask(*actor_handle, func, args, options, &return_ids); + auto status = driver.Tasks().SubmitActorTask(driver.GetCallerId(), actor_handle, + func, args, options, &return_ids); if (i < task_index_to_kill_worker) { RAY_CHECK_OK(status); } @@ -653,9 +656,8 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { ActorCreationOptions actor_options{ 0, /*is_direct_call*/ true, resources, resources, {}}; const auto job_id = NextJobId(); - ActorHandle actor_handle(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1), - ActorHandleID::Nil(), job_id, ObjectID::FromRandom(), - function.GetLanguage(), true, + ActorHandle actor_handle(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1), job_id, + ObjectID::FromRandom(), function.GetLanguage(), true, function.GetFunctionDescriptor()); // Manually create `num_tasks` task specs, and for each of them create a @@ -672,7 +674,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { TaskSpecBuilder builder; builder.SetCommonTaskSpec(RandomTaskId(), function.GetLanguage(), function.GetFunctionDescriptor(), job_id, RandomTaskId(), 0, - num_returns, resources, resources); + RandomTaskId(), num_returns, resources, resources); // Set task arguments. for (const auto &arg : args) { if (arg.IsPassedByReference()) { @@ -699,23 +701,14 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) { CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], raylet_socket_names_[0], JobID::FromInt(1), gcs_options_, "", "127.0.0.1", nullptr); - std::unique_ptr actor_handle; std::vector object_ids; - - // Test creating actor. - uint8_t array[] = {1, 2, 3}; - auto buffer = std::make_shared(array, sizeof(array)); - RayFunction func(ray::Language::PYTHON, {}); - std::vector args; - args.emplace_back(TaskArg::PassByValue(std::make_shared(buffer, nullptr))); - - std::unordered_map resources; - ActorCreationOptions actor_options{ - 0, /*is_direct_call*/ true, resources, resources, {}}; // Create an actor. - RAY_CHECK_OK(driver.Tasks().CreateActor(func, args, actor_options, &actor_handle)); + std::unordered_map resources; + auto &actor_handle = CreateActorHelper(driver, resources, + /*is_direct_call=*/true, + /*max_reconstructions=*/0); // wait for actor creation finish. - ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle->GetActorID(), true, + ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle.GetActorID(), true, 30 * 1000 /* 30s */)); // Test submitting some tasks with by-value args for that actor. int64_t start_ms = current_time_ms(); @@ -733,8 +726,8 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) { std::vector return_ids; RayFunction func(ray::Language::PYTHON, {}); - RAY_CHECK_OK( - driver.Tasks().SubmitActorTask(*actor_handle, func, args, options, &return_ids)); + RAY_CHECK_OK(driver.Tasks().SubmitActorTask(driver.GetCallerId(), actor_handle, func, + args, options, &return_ids)); ASSERT_EQ(return_ids.size(), 1); object_ids.emplace_back(return_ids[0]); } @@ -776,75 +769,20 @@ TEST_F(ZeroNodeTest, TestWorkerContext) { } TEST_F(ZeroNodeTest, TestActorHandle) { - const JobID job_id = NextJobId(); - const TaskID task_id = TaskID::ForDriverTask(job_id); - const ActorHandleID actor_handle_id = ActorHandleID::FromRandom(); - ActorHandle parent(ActorID::Of(job_id, task_id, 1), actor_handle_id, job_id, - ObjectID::FromRandom(), Language::JAVA, false, - {"org.ray.exampleClass", "exampleMethod", "exampleSignature"}); + // Test actor handle serialization and deserialization round trip. + JobID job_id = NextJobId(); + ActorHandle original(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 0), job_id, + ObjectID::FromRandom(), Language::PYTHON, /*is_direct_call=*/false, + {}); + std::string output; + original.Serialize(&output); + ActorHandle deserialized(output); + ASSERT_EQ(deserialized.GetActorID(), original.GetActorID()); + ASSERT_EQ(deserialized.ActorLanguage(), original.ActorLanguage()); + ASSERT_EQ(deserialized.ActorCreationTaskFunctionDescriptor(), + original.ActorCreationTaskFunctionDescriptor()); - // Test in-band forking logic. - std::unique_ptr forkedHandle1 = parent.Fork(); - ASSERT_EQ(1, parent.num_forks_); - ASSERT_EQ(parent.GetActorID(), forkedHandle1->GetActorID()); - ASSERT_EQ(actor_handle_id, parent.GetActorHandleID()); - ASSERT_NE(parent.GetActorHandleID(), forkedHandle1->GetActorHandleID()); - ASSERT_EQ(parent.ActorLanguage(), forkedHandle1->ActorLanguage()); - ASSERT_EQ(parent.ActorCreationTaskFunctionDescriptor(), - forkedHandle1->ActorCreationTaskFunctionDescriptor()); - ASSERT_EQ(parent.ActorCursor(), forkedHandle1->ActorCursor()); - ASSERT_EQ(0, forkedHandle1->task_counter_); - ASSERT_EQ(0, forkedHandle1->num_forks_); - ASSERT_EQ(parent.new_actor_handles_.size(), 1); - ASSERT_EQ(parent.new_actor_handles_.back(), forkedHandle1->GetActorHandleID()); - parent.new_actor_handles_.clear(); - - std::unique_ptr forkedHandle2 = parent.Fork(); - ASSERT_EQ(2, parent.num_forks_); - ASSERT_EQ(0, forkedHandle2->task_counter_); - ASSERT_EQ(0, forkedHandle2->num_forks_); - ASSERT_EQ(parent.new_actor_handles_.size(), 1); - ASSERT_EQ(parent.new_actor_handles_.back(), forkedHandle2->GetActorHandleID()); - parent.new_actor_handles_.clear(); - - // Test serialization and deserialization for in-band fork. - std::string buffer1; - forkedHandle2->Serialize(&buffer1); - ActorHandle deserializedHandle1(buffer1, task_id); - ASSERT_EQ(forkedHandle2->GetActorID(), deserializedHandle1.GetActorID()); - ASSERT_EQ(forkedHandle2->GetActorHandleID(), deserializedHandle1.GetActorHandleID()); - ASSERT_EQ(forkedHandle2->ActorLanguage(), deserializedHandle1.ActorLanguage()); - ASSERT_EQ(forkedHandle2->ActorCreationTaskFunctionDescriptor(), - deserializedHandle1.ActorCreationTaskFunctionDescriptor()); - ASSERT_EQ(forkedHandle2->ActorCursor(), deserializedHandle1.ActorCursor()); - - // Test out-of-band forking logic. - std::unique_ptr forkedHandle3 = parent.ForkForSerialization(); - ASSERT_EQ(2, parent.num_forks_); - ASSERT_EQ(parent.GetActorID(), forkedHandle3->GetActorID()); - ASSERT_EQ(actor_handle_id, parent.GetActorHandleID()); - ASSERT_NE(parent.GetActorHandleID(), forkedHandle3->GetActorHandleID()); - ASSERT_NE(forkedHandle2->GetActorHandleID(), forkedHandle3->GetActorHandleID()); - ASSERT_EQ(parent.ActorLanguage(), forkedHandle3->ActorLanguage()); - ASSERT_EQ(parent.ActorCreationTaskFunctionDescriptor(), - forkedHandle3->ActorCreationTaskFunctionDescriptor()); - ASSERT_EQ(parent.ActorCursor(), forkedHandle3->ActorCursor()); - ASSERT_EQ(0, forkedHandle3->task_counter_); - ASSERT_EQ(0, forkedHandle3->num_forks_); - ASSERT_EQ(parent.new_actor_handles_.size(), 1); - ASSERT_NE(parent.new_actor_handles_.back(), forkedHandle3->GetActorHandleID()); - parent.new_actor_handles_.clear(); - - // Test serialization and deserialization for out-of-band fork. - std::string buffer2; - forkedHandle3->Serialize(&buffer2); - ActorHandle deserializedHandle2(buffer2, task_id); - ASSERT_EQ(forkedHandle3->GetActorID(), deserializedHandle2.GetActorID()); - ASSERT_NE(forkedHandle3->GetActorHandleID(), deserializedHandle2.GetActorHandleID()); - ASSERT_EQ(forkedHandle3->ActorLanguage(), deserializedHandle2.ActorLanguage()); - ASSERT_EQ(forkedHandle3->ActorCreationTaskFunctionDescriptor(), - deserializedHandle2.ActorCreationTaskFunctionDescriptor()); - ASSERT_EQ(forkedHandle3->ActorCursor(), deserializedHandle2.ActorCursor()); + // TODO: Test submission from different handles. } TEST_F(SingleNodeTest, TestMemoryStoreProvider) { diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index aafe319e7..d630711da 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -15,10 +15,9 @@ bool HasByReferenceArgs(const TaskSpecification &spec) { } CoreWorkerDirectActorTaskSubmitter::CoreWorkerDirectActorTaskSubmitter( - boost::asio::io_service &io_service, gcs::RedisGcsClient &gcs_client, + boost::asio::io_service &io_service, std::unique_ptr store_provider) : io_service_(io_service), - gcs_client_(gcs_client), client_call_manager_(io_service), store_provider_(std::move(store_provider)) {} @@ -40,11 +39,6 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask( std::unique_lock guard(mutex_); - if (subscribed_actors_.find(actor_id) == subscribed_actors_.end()) { - RAY_CHECK_OK(SubscribeActorUpdates(actor_id)); - subscribed_actors_.insert(actor_id); - } - auto iter = actor_states_.find(actor_id); if (iter == actor_states_.end() || iter->second.state_ == ActorTableData::RECONSTRUCTING) { @@ -78,61 +72,48 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask( } } -Status CoreWorkerDirectActorTaskSubmitter::SubscribeActorUpdates( - const ActorID &actor_id) { - // Register a callback to handle actor notifications. - auto actor_notification_callback = [this](const ActorID &actor_id, - const ActorTableData &actor_data) { - std::unique_lock guard(mutex_); - actor_states_.erase(actor_id); - actor_states_.emplace( - actor_id, - ActorStateData(actor_data.state(), actor_data.ip_address(), actor_data.port())); +void CoreWorkerDirectActorTaskSubmitter::HandleActorUpdate( + const ActorID &actor_id, const ActorTableData &actor_data) { + std::unique_lock guard(mutex_); + actor_states_.erase(actor_id); + actor_states_.emplace( + actor_id, + ActorStateData(actor_data.state(), actor_data.ip_address(), actor_data.port())); - if (actor_data.state() == ActorTableData::ALIVE) { - // Check if this actor is the one that we're interested, if we already have - // a connection to the actor, or have pending requests for it, we should - // create a new connection. - if (pending_requests_.count(actor_id) > 0) { - ConnectAndSendPendingTasks(actor_id, actor_data.ip_address(), actor_data.port()); - } - } else { - // Remove rpc client if it's dead or being reconstructed. - rpc_clients_.erase(actor_id); + if (actor_data.state() == ActorTableData::ALIVE) { + // Check if this actor is the one that we're interested, if we already have + // a connection to the actor, or have pending requests for it, we should + // create a new connection. + if (pending_requests_.count(actor_id) > 0) { + ConnectAndSendPendingTasks(actor_id, actor_data.ip_address(), actor_data.port()); + } + } else { + // Remove rpc client if it's dead or being reconstructed. + rpc_clients_.erase(actor_id); - // For tasks that have been sent and are waiting for replies, treat them - // as failed when the destination actor is dead or reconstructing. - auto iter = waiting_reply_tasks_.find(actor_id); - if (iter != waiting_reply_tasks_.end()) { - for (const auto &entry : iter->second) { - const auto &task_id = entry.first; - const auto num_returns = entry.second; - TreatTaskAsFailed(task_id, num_returns, rpc::ErrorType::ACTOR_DIED); - } - waiting_reply_tasks_.erase(actor_id); - } - - // If this actor is permanently dead and there are pending requests, treat - // the pending tasks as failed. - if (actor_data.state() == ActorTableData::DEAD && - pending_requests_.count(actor_id) > 0) { - for (const auto &request : pending_requests_[actor_id]) { - TreatTaskAsFailed(TaskID::FromBinary(request->task_spec().task_id()), - request->task_spec().num_returns(), - rpc::ErrorType::ACTOR_DIED); - } - pending_requests_.erase(actor_id); + // For tasks that have been sent and are waiting for replies, treat them + // as failed when the destination actor is dead or reconstructing. + auto iter = waiting_reply_tasks_.find(actor_id); + if (iter != waiting_reply_tasks_.end()) { + for (const auto &entry : iter->second) { + const auto &task_id = entry.first; + const auto num_returns = entry.second; + TreatTaskAsFailed(task_id, num_returns, rpc::ErrorType::ACTOR_DIED); } + waiting_reply_tasks_.erase(actor_id); } - RAY_LOG(INFO) << "received notification on actor, state=" - << static_cast(actor_data.state()) << ", actor_id: " << actor_id - << ", ip address: " << actor_data.ip_address() - << ", port: " << actor_data.port(); - }; - - return gcs_client_.Actors().AsyncSubscribe(actor_id, actor_notification_callback, - nullptr); + // If this actor is permanently dead and there are pending requests, treat + // the pending tasks as failed. + if (actor_data.state() == ActorTableData::DEAD && + pending_requests_.count(actor_id) > 0) { + for (const auto &request : pending_requests_[actor_id]) { + TreatTaskAsFailed(TaskID::FromBinary(request->task_spec().task_id()), + request->task_spec().num_returns(), rpc::ErrorType::ACTOR_DIED); + } + pending_requests_.erase(actor_id); + } + } } void CoreWorkerDirectActorTaskSubmitter::ConnectAndSendPendingTasks( @@ -211,14 +192,9 @@ void CoreWorkerDirectActorTaskSubmitter::TreatTaskAsFailed( } } -bool CoreWorkerDirectActorTaskSubmitter::IsActorAlive(const ActorID &actor_id) { +bool CoreWorkerDirectActorTaskSubmitter::IsActorAlive(const ActorID &actor_id) const { std::unique_lock guard(mutex_); - if (subscribed_actors_.find(actor_id) == subscribed_actors_.end()) { - RAY_CHECK_OK(SubscribeActorUpdates(actor_id)); - subscribed_actors_.insert(actor_id); - } - auto iter = actor_states_.find(actor_id); return (iter != actor_states_.end() && iter->second.state_ == ActorTableData::ALIVE); } @@ -252,10 +228,10 @@ void CoreWorkerDirectActorTaskReceiver::HandlePushTask( return; } - auto it = scheduling_queue_.find(task_spec.ActorHandleId()); + auto it = scheduling_queue_.find(task_spec.CallerId()); if (it == scheduling_queue_.end()) { auto result = scheduling_queue_.emplace( - task_spec.ActorHandleId(), + task_spec.CallerId(), std::unique_ptr(new SchedulingQueue(io_service_))); it = result.first; } diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index de8054fb1..489608cf3 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -35,7 +35,7 @@ struct ActorStateData { class CoreWorkerDirectActorTaskSubmitter : public CoreWorkerTaskSubmitter { public: CoreWorkerDirectActorTaskSubmitter( - boost::asio::io_service &io_service, gcs::RedisGcsClient &gcs_client, + boost::asio::io_service &io_service, std::unique_ptr store_provider); /// Submit a task to an actor for execution. @@ -44,10 +44,13 @@ class CoreWorkerDirectActorTaskSubmitter : public CoreWorkerTaskSubmitter { /// \return Status. Status SubmitTask(const TaskSpecification &task_spec) override; - private: - /// Subscribe to updates of an actor. - Status SubscribeActorUpdates(const ActorID &actor_id); + /// Handle an update about an actor. + /// + /// \param[in] actor_id The ID of the actor whose status has changed. + /// \param[in] actor_data The actor's new status information. + void HandleActorUpdate(const ActorID &actor_id, const gcs::ActorTableData &actor_data); + private: /// Push a task to a remote actor via the given client. /// Note, this function doesn't return any error status code. If an error occurs while /// sending the request, this task will be treated as failed. @@ -86,14 +89,11 @@ class CoreWorkerDirectActorTaskSubmitter : public CoreWorkerTaskSubmitter { /// /// \param[in] actor_id The actor ID. /// \return Whether this actor is alive. - bool IsActorAlive(const ActorID &actor_id); + bool IsActorAlive(const ActorID &actor_id) const; /// The IO event loop. boost::asio::io_service &io_service_; - /// Gcs client. - gcs::RedisGcsClient &gcs_client_; - /// The `ClientCallManager` object that is shared by all `DirectActorClient`s. rpc::ClientCallManager client_call_manager_; @@ -117,9 +117,6 @@ class CoreWorkerDirectActorTaskSubmitter : public CoreWorkerTaskSubmitter { /// Map from actor id to the tasks that are waiting for reply. std::unordered_map> waiting_reply_tasks_; - /// The set of actors which are subscribed for further updates. - std::unordered_set subscribed_actors_; - /// The store provider. std::unique_ptr store_provider_; @@ -230,7 +227,7 @@ class CoreWorkerDirectActorTaskReceiver : public CoreWorkerTaskReceiver, TaskHandler task_handler_; /// Queue of pending requests per actor handle. /// TODO(ekl) GC these queues once the handle is no longer active. - std::unordered_map> scheduling_queue_; + std::unordered_map> scheduling_queue_; }; } // namespace ray diff --git a/src/ray/gcs/subscription_executor.cc b/src/ray/gcs/subscription_executor.cc index 9b2ff20c9..f6a9540b8 100644 --- a/src/ray/gcs/subscription_executor.cc +++ b/src/ray/gcs/subscription_executor.cc @@ -149,6 +149,7 @@ Status SubscriptionExecutor::AsyncSubscribe( template Status SubscriptionExecutor::AsyncUnsubscribe( const ClientID &client_id, const ID &id, const StatusCallback &done) { + SubscribeCallback subscribe = nullptr; { std::unique_lock lock(mutex_); const auto it = id_to_callback_map_.find(id); @@ -156,14 +157,25 @@ Status SubscriptionExecutor::AsyncUnsubscribe( RAY_LOG(DEBUG) << "Invalid Unsubscribe! id " << id << " client_id " << client_id; return Status::Invalid("Invalid Unsubscribe, no existing subscription found."); } + subscribe = std::move(it->second); + id_to_callback_map_.erase(it); } - auto on_done = [this, id, done](Status status) { - if (status.ok()) { + RAY_CHECK(subscribe != nullptr); + auto on_done = [this, id, subscribe, done](Status status) { + if (!status.ok()) { std::unique_lock lock(mutex_); const auto it = id_to_callback_map_.find(id); if (it != id_to_callback_map_.end()) { - id_to_callback_map_.erase(it); + // The initial AsyncUnsubscribe deleted the callback, but the client + // has subscribed again in the meantime. This new callback will be + // called if we receive more notifications. + RAY_LOG(WARNING) + << "Client called AsyncSubscribe on " << id + << " while AsyncUnsubscribe was pending, but the unsubscribe failed."; + } else { + // The Unsubscribe failed, so restore the initial callback. + id_to_callback_map_[id] = subscribe; } } if (done != nullptr) { diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 737e879a7..b53e253ec 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -48,21 +48,24 @@ message TaskSpec { bytes parent_task_id = 6; // A count of the number of tasks submitted by the parent task before this one. uint64 parent_counter = 7; + // Task ID of the caller. This is the same as parent_task_id for non-actors. + // This is the actor ID (embedded in a nil task ID) for actors. + bytes caller_id = 8; // Task arguments. - repeated TaskArg args = 8; + repeated TaskArg args = 9; // Number of return objects. - uint64 num_returns = 9; + uint64 num_returns = 10; // Quantities of the different resources required by this task. - map required_resources = 10; + map required_resources = 11; // The resources required for placing this task on a node. If this is empty, // then the placement resources are equal to the required_resources. - map required_placement_resources = 11; + map required_placement_resources = 12; // Task specification for an actor creation task. // This field is only valid when `type == ACTOR_CREATION_TASK`. - ActorCreationTaskSpec actor_creation_task_spec = 14; + ActorCreationTaskSpec actor_creation_task_spec = 13; // Task specification for an actor task. // This field is only valid when `type == ACTOR_TASK`. - ActorTaskSpec actor_task_spec = 15; + ActorTaskSpec actor_task_spec = 14; } // Argument in the task. @@ -98,16 +101,10 @@ message ActorCreationTaskSpec { message ActorTaskSpec { // Actor ID of the actor that this task is executed on. bytes actor_id = 2; - // The ID of the handle that was used to submit the task. This should be - // unique across handles with the same actor_id. - bytes actor_handle_id = 3; // The dummy object ID of the actor creation task. bytes actor_creation_dummy_object_id = 4; // Number of tasks that have been submitted to this actor so far. uint64 actor_counter = 5; - // This will be populated with all of the new actor handles that were forked - // from this handle since the last task on this handle was submitted. - repeated bytes new_actor_handles = 6; // The dummy object ID of the previous actor task. bytes previous_actor_task_dummy_object_id = 7; } diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index a50d1a1e9..764529bf1 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -9,9 +9,6 @@ message ActorHandle { // ID of the actor. bytes actor_id = 1; - // ID of this actor handle. - bytes actor_handle_id = 2; - // ID of the job that created the actor (it is possible that the handle // exists on a job with a different job ID). bytes creation_job_id = 3; @@ -22,8 +19,9 @@ message ActorHandle { // Function descriptor of actor creation task. repeated string actor_creation_task_function_descriptor = 5; - // The unique id of the last return of the last task. - // It's used as a dependency for the next task. + // The unique id of the dummy object returned by the actor creation task. + // It's used as a dependency for the first task. + // TODO: Remove this once scheduling is done by task counter only. bytes actor_cursor = 6; // Whether direct actor call is used. diff --git a/src/ray/raylet/actor_registration.cc b/src/ray/raylet/actor_registration.cc index 7574a57db..622af4df7 100644 --- a/src/ray/raylet/actor_registration.cc +++ b/src/ray/raylet/actor_registration.cc @@ -9,7 +9,11 @@ namespace ray { namespace raylet { ActorRegistration::ActorRegistration(const ActorTableData &actor_table_data) - : actor_table_data_(actor_table_data) {} + : actor_table_data_(actor_table_data) { + // The first task submitted on each new actor handle will depend on the actor + // creation object, so we always pin it. + dummy_objects_[GetActorCreationDependency()]++; +} ActorRegistration::ActorRegistration(const ActorTableData &actor_table_data, const ActorCheckpointData &checkpoint_data) @@ -18,8 +22,8 @@ ActorRegistration::ActorRegistration(const ActorTableData &actor_table_data, ObjectID::FromBinary(checkpoint_data.execution_dependency())) { // Restore `frontier_`. for (int64_t i = 0; i < checkpoint_data.handle_ids_size(); i++) { - auto handle_id = ActorHandleID::FromBinary(checkpoint_data.handle_ids(i)); - auto &frontier_entry = frontier_[handle_id]; + auto caller_id = TaskID::FromBinary(checkpoint_data.handle_ids(i)); + auto &frontier_entry = frontier_[caller_id]; frontier_entry.task_counter = checkpoint_data.task_counters(i); frontier_entry.execution_dependency = ObjectID::FromBinary(checkpoint_data.frontier_dependencies(i)); @@ -55,14 +59,14 @@ const int64_t ActorRegistration::GetRemainingReconstructions() const { return actor_table_data_.remaining_reconstructions(); } -const std::unordered_map +const std::unordered_map &ActorRegistration::GetFrontier() const { return frontier_; } -ObjectID ActorRegistration::ExtendFrontier(const ActorHandleID &handle_id, +ObjectID ActorRegistration::ExtendFrontier(const TaskID &caller_id, const ObjectID &execution_dependency) { - auto &frontier_entry = frontier_[handle_id]; + auto &frontier_entry = frontier_[caller_id]; // Release the reference to the previous cursor for this // actor handle, if there was one. ObjectID object_to_release; @@ -85,16 +89,6 @@ ObjectID ActorRegistration::ExtendFrontier(const ActorHandleID &handle_id, return object_to_release; } -void ActorRegistration::AddHandle(const ActorHandleID &handle_id, - const ObjectID &execution_dependency) { - if (frontier_.find(handle_id) == frontier_.end()) { - auto &new_handle = frontier_[handle_id]; - new_handle.task_counter = 0; - new_handle.execution_dependency = execution_dependency; - dummy_objects_[execution_dependency]++; - } -} - int ActorRegistration::NumHandles() const { return frontier_.size(); } std::shared_ptr ActorRegistration::GenerateCheckpointData( @@ -102,14 +96,14 @@ std::shared_ptr ActorRegistration::GenerateCheckpointData( // Make a copy of the actor registration ActorRegistration copy = *this; if (task) { - const auto actor_handle_id = task->GetTaskSpecification().ActorHandleId(); + const auto actor_caller_id = task->GetTaskSpecification().CallerId(); const auto dummy_object = task->GetTaskSpecification().ActorDummyObject(); // Extend its frontier to include the most recent task. // NOTE(hchen): For non-direct-call actors, this is needed because this method is // called before `FinishAssignedTask`, which will be called when the worker tries to // fetch the next task. For direct-call actors, checkpoint data doesn't contain // frontier info, so we don't need to do `ExtendFrontier` here. - copy.ExtendFrontier(actor_handle_id, dummy_object); + copy.ExtendFrontier(actor_caller_id, dummy_object); } // Use actor's current state to generate checkpoint data. diff --git a/src/ray/raylet/actor_registration.h b/src/ray/raylet/actor_registration.h index 8aa40253b..24b1fa7bd 100644 --- a/src/ray/raylet/actor_registration.h +++ b/src/ray/raylet/actor_registration.h @@ -97,7 +97,7 @@ class ActorRegistration { /// /// \return The actor frontier, a map from handle ID to execution state for /// that handle. - const std::unordered_map &GetFrontier() const; + const std::unordered_map &GetFrontier() const; /// Get all the dummy objects of this actor's tasks. const std::unordered_map &GetDummyObjects() const { @@ -112,18 +112,7 @@ class ActorRegistration { /// state. This is the execution dependency returned by the task. /// \return The dummy object that can be released as a result of the executed /// task. If no dummy object can be released, then this is nil. - ObjectID ExtendFrontier(const ActorHandleID &handle_id, - const ObjectID &execution_dependency); - - /// Add a new handle to the actor frontier. This does nothing if the actor - /// handle already exists. - /// - /// \param handle_id The ID of the handle to add. - /// \param execution_dependency This is the expected execution dependency for - /// the first task submitted on the new handle. If the new handle hasn't been - /// seen yet, then this dependency will be added to the actor frontier and is - /// not safe to release until the first task has been submitted. - void AddHandle(const ActorHandleID &handle_id, const ObjectID &execution_dependency); + ObjectID ExtendFrontier(const TaskID &caller_id, const ObjectID &execution_dependency); /// Returns num handles to this actor entry. /// @@ -150,7 +139,7 @@ class ActorRegistration { /// The execution frontier of the actor, which represents which tasks have /// executed so far and which tasks may execute next, based on execution /// dependencies. This is indexed by handle. - std::unordered_map frontier_; + std::unordered_map frontier_; /// This map is used to track all the unreleased dummy objects for this /// actor. The map key is the dummy object ID, and the map value is the /// number of actor handles that depend on that dummy object. When the map diff --git a/src/ray/raylet/lineage_cache_test.cc b/src/ray/raylet/lineage_cache_test.cc index e6760dd5f..2e9974108 100644 --- a/src/ray/raylet/lineage_cache_test.cc +++ b/src/ray/raylet/lineage_cache_test.cc @@ -139,7 +139,7 @@ static inline Task ExampleTask(const std::vector &arguments, uint64_t num_returns) { TaskSpecBuilder builder; builder.SetCommonTaskSpec(RandomTaskId(), Language::PYTHON, {"", "", ""}, JobID::Nil(), - RandomTaskId(), 0, num_returns, {}, {}); + RandomTaskId(), 0, RandomTaskId(), num_returns, {}, {}); for (const auto &arg : arguments) { builder.AddByRefArg(arg); } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 5cf4c1019..c191ca0b1 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -23,12 +23,12 @@ namespace { int64_t GetExpectedTaskCounter( const std::unordered_map &actor_registry, - const ray::ActorID &actor_id, const ray::ActorHandleID &actor_handle_id) { + const ray::ActorID &actor_id, const ray::TaskID &actor_caller_id) { auto actor_entry = actor_registry.find(actor_id); RAY_CHECK(actor_entry != actor_registry.end()); const auto &frontier = actor_entry->second.GetFrontier(); int64_t expected_task_counter = 0; - auto frontier_entry = frontier.find(actor_handle_id); + auto frontier_entry = frontier.find(actor_caller_id); if (frontier_entry != frontier.end()) { expected_task_counter = frontier_entry->second.task_counter; } @@ -1605,8 +1605,8 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag auto node_manager_id = actor_entry->second.GetNodeManagerId(); if (node_manager_id == gcs_client_->client_table().GetLocalClientId()) { // The actor is local. - int64_t expected_task_counter = GetExpectedTaskCounter( - actor_registry_, spec.ActorId(), spec.ActorHandleId()); + int64_t expected_task_counter = + GetExpectedTaskCounter(actor_registry_, spec.ActorId(), spec.CallerId()); if (static_cast(spec.ActorCounter()) < expected_task_counter) { // A task that has already been executed before has been found. The // task will be treated as failed if at least one of the task's @@ -1825,7 +1825,7 @@ bool NodeManager::AssignTask(const Task &task) { // An actor task should only be ready to be assigned if it matches the // expected task counter. int64_t expected_task_counter = - GetExpectedTaskCounter(actor_registry_, spec.ActorId(), spec.ActorHandleId()); + GetExpectedTaskCounter(actor_registry_, spec.ActorId(), spec.CallerId()); RAY_CHECK(static_cast(spec.ActorCounter()) == expected_task_counter) << "Expected actor counter: " << expected_task_counter << ", task " << spec.TaskId() << " has: " << spec.ActorCounter(); @@ -1985,18 +1985,18 @@ std::shared_ptr NodeManager::CreateActorTableDataFromCreationTas void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) { ActorID actor_id; - ActorHandleID actor_handle_id; + TaskID caller_id; const TaskSpecification task_spec = task.GetTaskSpecification(); bool resumed_from_checkpoint = false; if (task_spec.IsActorCreationTask()) { actor_id = task_spec.ActorCreationId(); - actor_handle_id = ActorHandleID::Nil(); + caller_id = TaskID::Nil(); if (checkpoint_id_to_restore_.count(actor_id) > 0) { resumed_from_checkpoint = true; } } else { actor_id = task_spec.ActorId(); - actor_handle_id = task_spec.ActorHandleId(); + caller_id = task_spec.CallerId(); } if (task_spec.IsActorCreationTask()) { @@ -2004,7 +2004,6 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) { worker.AssignActorId(actor_id); // Lookup the parent actor id. auto parent_task_id = task_spec.ParentTaskId(); - RAY_CHECK(actor_handle_id.IsNil()); int port = worker.Port(); RAY_CHECK_OK(gcs_client_->raylet_task_table().Lookup( JobID::Nil(), parent_task_id, @@ -2052,36 +2051,28 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) { resumed_from_checkpoint, port); })); } else { - // The actor was not resumed from a checkpoint. We extend the actor's - // frontier as usual since there is no frontier to restore. - ExtendActorFrontier(task_spec.ActorDummyObject(), actor_id, actor_handle_id); + auto actor_entry = actor_registry_.find(actor_id); + RAY_CHECK(actor_entry != actor_registry_.end()); + // Extend the actor's frontier to include the executed task. + const ObjectID object_to_release = + actor_entry->second.ExtendFrontier(caller_id, task_spec.ActorDummyObject()); + if (!object_to_release.IsNil()) { + // If there were no new actor handles created, then no other actor task + // will depend on this execution dependency, so it safe to release. + HandleObjectMissing(object_to_release); + } + // Mark the dummy object as locally available to indicate that the actor's + // state has changed and the next method can run. This is not added to the + // object table, so the update will be invisible to both the local object + // manager and the other nodes. + // NOTE(swang): The dummy objects must be marked as local whenever + // ExtendFrontier is called, and vice versa, so that we can clean up the + // dummy objects properly in case the actor fails and needs to be + // reconstructed. + HandleObjectLocal(task_spec.ActorDummyObject()); } } -void NodeManager::ExtendActorFrontier(const ObjectID &dummy_object, - const ActorID &actor_id, - const ActorHandleID &actor_handle_id) { - auto actor_entry = actor_registry_.find(actor_id); - RAY_CHECK(actor_entry != actor_registry_.end()); - // Extend the actor's frontier to include the executed task. - const ObjectID object_to_release = - actor_entry->second.ExtendFrontier(actor_handle_id, dummy_object); - if (!object_to_release.IsNil()) { - // If there were no new actor handles created, then no other actor task - // will depend on this execution dependency, so it safe to release. - HandleObjectMissing(object_to_release); - } - // Mark the dummy object as locally available to indicate that the actor's - // state has changed and the next method can run. This is not added to the - // object table, so the update will be invisible to both the local object - // manager and the other nodes. - // NOTE(swang): The dummy objects must be marked as local whenever - // ExtendFrontier is called, and vice versa, so that we can clean up the - // dummy objects properly in case the actor fails and needs to be - // reconstructed. - HandleObjectLocal(dummy_object); -} - void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id, const TaskSpecification &task_spec, bool resumed_from_checkpoint, @@ -2141,9 +2132,10 @@ void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id } } if (!resumed_from_checkpoint) { - // The actor was not resumed from a checkpoint. We extend the actor's - // frontier as usual since there is no frontier to restore. - ExtendActorFrontier(task_spec.ActorDummyObject(), actor_id, ActorHandleID::Nil()); + // The actor was not resumed from a checkpoint. Store the + // initial dummy object. All future handles to the actor will + // depend on this object. + HandleObjectLocal(task_spec.ActorDummyObject()); } } @@ -2450,28 +2442,10 @@ void NodeManager::FinishAssignTask(const TaskID &task_id, Worker &worker, bool s // We successfully assigned the task to the worker. worker.AssignTaskId(spec.TaskId()); worker.AssignJobId(spec.JobId()); - // Actor tasks require extra accounting to track the actor's state. - if (spec.IsActorTask()) { - auto actor_entry = actor_registry_.find(spec.ActorId()); - RAY_CHECK(actor_entry != actor_registry_.end()); - // Process any new actor handles that were created since the - // previous task on this handle was executed. The first task - // submitted on a new actor handle will depend on the dummy object - // returned by the previous task, so the dependency will not be - // released until this first task is submitted. - for (auto &new_handle_id : spec.NewActorHandles()) { - const auto prev_actor_task_id = spec.PreviousActorTaskDummyObjectId(); - RAY_CHECK(!prev_actor_task_id.IsNil()); - // Add the new handle and give it a reference to the finished task's - // execution dependency. - actor_entry->second.AddHandle(new_handle_id, prev_actor_task_id); - } - - // TODO(swang): For actors with multiple actor handles, to - // guarantee that tasks are replayed in the same order after a - // failure, we must update the task's execution dependency to be - // the actor's current execution dependency. - } + // TODO(swang): For actors with multiple actor handles, to + // guarantee that tasks are replayed in the same order after a + // failure, we must update the task's execution dependency to be + // the actor's current execution dependency. // Mark the task as running. // (See design_docs/task_states.rst for the state transition diagram.) diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 58b4b381a..173f1e0b7 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -243,13 +243,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler { void FinishAssignedActorCreationTask(const ActorID &parent_actor_id, const TaskSpecification &task_spec, bool resumed_from_checkpoint, int port); - /// Extend actor frontier after an actor task or actor creation task executes. - /// - /// \param dummy_object Dummy object corresponding to the task. - /// \param actor_id The relevant actor ID. - /// \param actor_handle_id The relevant actor handle ID. - void ExtendActorFrontier(const ObjectID &dummy_object, const ActorID &actor_id, - const ActorHandleID &actor_handle_id); /// Make a placement decision for placeable tasks given the resource_map /// provided. This will perform task state transitions and task forwarding. /// diff --git a/src/ray/raylet/task_dependency_manager_test.cc b/src/ray/raylet/task_dependency_manager_test.cc index 510bd0397..3cf67e6f1 100644 --- a/src/ray/raylet/task_dependency_manager_test.cc +++ b/src/ray/raylet/task_dependency_manager_test.cc @@ -76,7 +76,7 @@ static inline Task ExampleTask(const std::vector &arguments, uint64_t num_returns) { TaskSpecBuilder builder; builder.SetCommonTaskSpec(RandomTaskId(), Language::PYTHON, {"", "", ""}, JobID::Nil(), - RandomTaskId(), 0, num_returns, {}, {}); + RandomTaskId(), 0, RandomTaskId(), num_returns, {}, {}); for (const auto &arg : arguments) { builder.AddByRefArg(arg); } diff --git a/src/ray/rpc/worker/direct_actor_client.h b/src/ray/rpc/worker/direct_actor_client.h index 238100a90..ae685ccad 100644 --- a/src/ray/rpc/worker/direct_actor_client.h +++ b/src/ray/rpc/worker/direct_actor_client.h @@ -40,13 +40,16 @@ class DirectActorClient : public std::enable_shared_from_this /// \return if the rpc call succeeds ray::Status PushTask(std::unique_ptr request, const ClientCallback &callback) { - request->set_sequence_number(next_seq_no_++); - send_queue_.push_back(std::make_pair(std::move(request), callback)); + request->set_sequence_number(request->task_spec().actor_task_spec().actor_counter()); + { + std::lock_guard lock(mutex_); + send_queue_.push_back(std::make_pair(std::move(request), callback)); + } SendRequests(); return ray::Status::OK(); } - /// Send as many pending tasks as possible. This method is thread-safe. + /// Send as many pending tasks as possible. This method is NOT thread-safe. /// /// The client will guarantee no more than kMaxBytesInFlight bytes of RPCs are being /// sent at once. This prevents the server scheduling queue from being overwhelmed. @@ -115,9 +118,6 @@ class DirectActorClient : public std::enable_shared_from_this std::deque, ClientCallback>> send_queue_; - /// The next sequence number to assign to a task for this server. - int64_t next_seq_no_ = 0; - /// The number of bytes currently in flight. int64_t rpc_bytes_in_flight_ = 0;