From 92525f35d164dccaeeaaff76fce9371ba34c008d Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Fri, 31 Jan 2020 18:23:01 -0800 Subject: [PATCH] Remove raylet client from Python worker (#6018) --- python/ray/_raylet.pyx | 90 ++++++------------- python/ray/experimental/dynamic_resources.py | 2 +- python/ray/function_manager.py | 7 +- python/ray/includes/libcoreworker.pxd | 16 +++- python/ray/includes/libraylet.pxd | 77 ---------------- python/ray/tests/test_stress_failure.py | 3 +- python/ray/utils.py | 2 +- python/ray/worker.py | 1 - src/ray/core_worker/core_worker.cc | 22 ++++- src/ray/core_worker/core_worker.h | 34 +++++++ .../java/org_ray_runtime_RayNativeRuntime.cc | 7 +- ...org_ray_runtime_task_NativeTaskExecutor.cc | 6 +- src/ray/raylet/raylet_client.cc | 4 +- src/ray/raylet/raylet_client.h | 6 +- 14 files changed, 112 insertions(+), 165 deletions(-) delete mode 100644 python/ray/includes/libraylet.pxd diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index bc330c88c..48bd4063f 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -58,12 +58,6 @@ from ray.includes.common cimport ( WORKER_TYPE_WORKER, WORKER_TYPE_DRIVER, ) -from ray.includes.libraylet cimport ( - CRayletClient, - GCSProfileEvent, - GCSProfileTableData, - WaitResultPair, -) from ray.includes.unique_ids cimport ( CActorID, CActorCheckpointID, @@ -306,60 +300,6 @@ cdef void prepare_args( (core_worker.put_serialized_object( serialized_arg)).native())) - -cdef class RayletClient: - cdef CRayletClient* client - - def __cinit__(self, CoreWorker core_worker): - # The core worker and raylet client need to share an underlying - # raylet client, so we take a reference to the core worker's client - # here. The client is a raw pointer because it is only a temporary - # workaround and will be removed once the core worker transition is - # complete, so we don't want to change the unique_ptr in core worker - # to a shared_ptr. This means the core worker *must* be - # initialized before the raylet client. - self.client = &core_worker.core_worker.get().GetRayletClient() - - def fetch_or_reconstruct(self, object_ids, - c_bool fetch_only, - TaskID current_task_id=TaskID.nil()): - cdef c_vector[CObjectID] fetch_ids = ObjectIDsToVector(object_ids) - check_status(self.client.FetchOrReconstruct( - fetch_ids, fetch_only, True, current_task_id.native())) - - def push_error(self, JobID job_id, error_type, error_message, - double timestamp): - check_status(self.client.PushError(job_id.native(), - error_type.encode("ascii"), - error_message.encode("ascii"), - timestamp)) - - def prepare_actor_checkpoint(self, ActorID actor_id): - cdef: - CActorCheckpointID checkpoint_id - CActorID c_actor_id = actor_id.native() - - # PrepareActorCheckpoint will wait for raylet's reply, release - # the GIL so other Python threads can run. - with nogil: - check_status(self.client.PrepareActorCheckpoint( - c_actor_id, checkpoint_id)) - return ActorCheckpointID(checkpoint_id.Binary()) - - def notify_actor_resumed_from_checkpoint(self, ActorID actor_id, - ActorCheckpointID checkpoint_id): - check_status(self.client.NotifyActorResumedFromCheckpoint( - actor_id.native(), checkpoint_id.native())) - - def set_resource(self, basestring resource_name, - double capacity, ClientID client_id): - self.client.SetResource(resource_name.encode("ascii"), capacity, - CClientID.FromBinary(client_id.binary())) - - @property - def job_id(self): - return JobID(self.client.GetJobID().Binary()) - cdef deserialize_args( const c_vector[shared_ptr[CRayObject]] &c_args, const c_vector[CObjectID] &arg_reference_ids): @@ -770,7 +710,6 @@ cdef class CoreWorker: def wait(self, object_ids, int num_returns, int64_t timeout_ms, TaskID current_task_id): cdef: - WaitResultPair result c_vector[CObjectID] wait_ids c_vector[c_bool] results CTaskID c_task_id = current_task_id.native() @@ -1099,6 +1038,35 @@ cdef class CoreWorker: async_retry_with_plasma_callback, future) + def push_error(self, JobID job_id, error_type, error_message, + double timestamp): + check_status(self.core_worker.get().PushError( + job_id.native(), error_type.encode("ascii"), + error_message.encode("ascii"), timestamp)) + + def prepare_actor_checkpoint(self, ActorID actor_id): + cdef: + CActorCheckpointID checkpoint_id + CActorID c_actor_id = actor_id.native() + + # PrepareActorCheckpoint will wait for raylet's reply, release + # the GIL so other Python threads can run. + with nogil: + check_status(self.core_worker.get().PrepareActorCheckpoint( + c_actor_id, &checkpoint_id)) + return ActorCheckpointID(checkpoint_id.Binary()) + + def notify_actor_resumed_from_checkpoint(self, ActorID actor_id, + ActorCheckpointID checkpoint_id): + check_status(self.core_worker.get().NotifyActorResumedFromCheckpoint( + actor_id.native(), checkpoint_id.native())) + + def set_resource(self, basestring resource_name, + double capacity, ClientID client_id): + self.core_worker.get().SetResource( + resource_name.encode("ascii"), capacity, + CClientID.FromBinary(client_id.binary())) + cdef void async_set_result_callback(shared_ptr[CRayObject] obj, CObjectID object_id, void *future) with gil: diff --git a/python/ray/experimental/dynamic_resources.py b/python/ray/experimental/dynamic_resources.py index 34b2b99e6..7eb5cc056 100644 --- a/python/ray/experimental/dynamic_resources.py +++ b/python/ray/experimental/dynamic_resources.py @@ -31,5 +31,5 @@ def set_resource(resource_name, capacity, client_id=None): if (capacity < 0) or (capacity != int(capacity)): raise ValueError( "Capacity {} must be a non-negative integer.".format(capacity)) - return ray.worker.global_worker.raylet_client.set_resource( + return ray.worker.global_worker.core_worker.set_resource( resource_name, capacity, client_id_obj) diff --git a/python/ray/function_manager.py b/python/ray/function_manager.py index 1d8372e94..6bebc905a 100644 --- a/python/ray/function_manager.py +++ b/python/ray/function_manager.py @@ -817,8 +817,9 @@ class FunctionActorManager: if actor.should_checkpoint(checkpoint_context): try: now = int(1000 * time.time()) - checkpoint_id = (self._worker.raylet_client. - prepare_actor_checkpoint(actor_id)) + checkpoint_id = ( + self._worker.core_worker.prepare_actor_checkpoint(actor_id) + ) checkpoint_info.checkpoint_ids.append(checkpoint_id) actor.save_checkpoint(actor_id, checkpoint_id) if (len(checkpoint_info.checkpoint_ids) > @@ -865,7 +866,7 @@ class FunctionActorManager: for checkpoint in checkpoints), msg # Notify raylet that this actor has been resumed from # a checkpoint. - (self._worker.raylet_client. + (self._worker.core_worker. notify_actor_resumed_from_checkpoint( actor_id, checkpoint_id)) except Exception: diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 49aa706fb..540624483 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -12,6 +12,8 @@ from libcpp.vector cimport vector as c_vector from ray.includes.unique_ids cimport ( CActorID, + CActorCheckpointID, + CClientID, CJobID, CTaskID, CObjectID, @@ -31,7 +33,6 @@ from ray.includes.common cimport ( CGcsClientOptions, ) from ray.includes.task cimport CTaskSpec -from ray.includes.libraylet cimport CRayletClient ctypedef unordered_map[c_string, c_vector[pair[int64_t, double]]] \ ResourceMappingType @@ -107,9 +108,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const c_vector[shared_ptr[CBuffer]] &metadatas, c_vector[shared_ptr[CRayObject]] *return_objects) - # TODO(edoakes): remove this once the raylet client is no longer used - # directly. - CRayletClient &GetRayletClient() CJobID GetCurrentJobId() CTaskID GetCurrentTaskId() const CActorID &GetActorId() @@ -159,3 +157,13 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: ray_callback_function successs_callback, ray_callback_function fallback_callback, void* python_future) + + CRayStatus PushError(const CJobID &job_id, const c_string &type, + const c_string &error_message, double timestamp) + CRayStatus PrepareActorCheckpoint(const CActorID &actor_id, + CActorCheckpointID *checkpoint_id) + CRayStatus NotifyActorResumedFromCheckpoint( + const CActorID &actor_id, const CActorCheckpointID &checkpoint_id) + CRayStatus SetResource(const c_string &resource_name, + const double capacity, + const CClientID &client_Id) diff --git a/python/ray/includes/libraylet.pxd b/python/ray/includes/libraylet.pxd deleted file mode 100644 index c7f100aad..000000000 --- a/python/ray/includes/libraylet.pxd +++ /dev/null @@ -1,77 +0,0 @@ -from libc.stdint cimport int64_t -from libcpp cimport bool as c_bool -from libcpp.memory cimport unique_ptr -from libcpp.string cimport string as c_string -from libcpp.utility cimport pair -from libcpp.vector cimport vector as c_vector - -from ray.includes.common cimport ( - CLanguage, - CRayStatus, -) -from ray.includes.unique_ids cimport ( - CActorCheckpointID, - CActorID, - CClientID, - CJobID, - CWorkerID, - CObjectID, - CTaskID, -) -from ray.includes.task cimport CTaskSpec - - -cdef extern from "ray/protobuf/gcs.pb.h" nogil: - cdef cppclass GCSProfileEvent "ProfileTableData::ProfileEvent": - void set_event_type(const c_string &value) - void set_start_time(double value) - void set_end_time(double value) - c_string set_extra_data(const c_string &value) - GCSProfileEvent() - - cdef cppclass GCSProfileTableData "ProfileTableData": - void set_component_type(const c_string &value) - void set_component_id(const c_string &value) - void set_node_ip_address(const c_string &value) - GCSProfileEvent *add_profile_events() - GCSProfileTableData() - - -ctypedef pair[c_vector[CObjectID], c_vector[CObjectID]] WaitResultPair - - -cdef extern from "ray/raylet/raylet_client.h" nogil: - cdef cppclass CRayletClient "ray::raylet::RayletClient": - CRayletClient(const c_string &raylet_socket, - const CWorkerID &worker_id, - c_bool is_worker, const CJobID &job_id, - const CLanguage &language) - CRayStatus Disconnect() - CRayStatus SubmitTask(const CTaskSpec &task_spec) - CRayStatus FetchOrReconstruct(c_vector[CObjectID] &object_ids, - c_bool fetch_only, - c_bool is_direct_call_task, - const CTaskID ¤t_task_id) - CRayStatus NotifyUnblocked(const CTaskID ¤t_task_id) - CRayStatus Wait(const c_vector[CObjectID] &object_ids, - int num_returns, int64_t timeout_milliseconds, - c_bool wait_local, c_bool is_direct_call_task, - const CTaskID ¤t_task_id, - WaitResultPair *result) - CRayStatus PushError(const CJobID &job_id, const c_string &type, - const c_string &error_message, double timestamp) - CRayStatus PushProfileEvents( - const GCSProfileTableData &profile_events) - CRayStatus FreeObjects(const c_vector[CObjectID] &object_ids, - c_bool local_only, c_bool delete_creating_tasks) - CRayStatus PrepareActorCheckpoint(const CActorID &actor_id, - CActorCheckpointID &checkpoint_id) - CRayStatus NotifyActorResumedFromCheckpoint( - const CActorID &actor_id, const CActorCheckpointID &checkpoint_id) - CRayStatus SetResource(const c_string &resource_name, - const double capacity, - const CClientID &client_Id) - CLanguage GetLanguage() const - CWorkerID GetWorkerID() const - CJobID GetJobID() const - c_bool IsWorker() const diff --git a/python/ray/tests/test_stress_failure.py b/python/ray/tests/test_stress_failure.py index 3a4719f67..b307341bc 100644 --- a/python/ray/tests/test_stress_failure.py +++ b/python/ray/tests/test_stress_failure.py @@ -332,8 +332,7 @@ def test_driver_put_errors(ray_start_object_store_memory): # were evicted and whose originating tasks are still running, this # for-loop should hang on its first iteration and push an error to the # driver. - ray.worker.global_worker.raylet_client.fetch_or_reconstruct([args[0]], - False) + ray.wait([args[0]], timeout=30) def error_check(errors): return len(errors) > 1 diff --git a/python/ray/utils.py b/python/ray/utils.py index 53e6a5adf..152c4d1e4 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -60,7 +60,7 @@ def push_error_to_driver(worker, error_type, message, job_id=None): if job_id is None: job_id = ray.JobID.nil() assert isinstance(job_id, ray.JobID) - worker.raylet_client.push_error(job_id, error_type, message, time.time()) + worker.core_worker.push_error(job_id, error_type, message, time.time()) def push_error_to_driver_through_redis(redis_client, diff --git a/python/ray/worker.py b/python/ray/worker.py index b1bddecac..edbd312dd 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1252,7 +1252,6 @@ def connect(node, node.node_ip_address, node.node_manager_port, ) - worker.raylet_client = ray._raylet.RayletClient(worker.core_worker) if driver_object_store_memory is not None: worker.core_worker.set_object_store_client_options( diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 704528cbf..4312e2b2a 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1,7 +1,5 @@ #include "ray/core_worker/core_worker.h" -#include - #include "boost/fiber/all.hpp" #include "ray/common/ray_config.h" #include "ray/common/task/task_util.h" @@ -646,6 +644,26 @@ TaskID CoreWorker::GetCallerId() const { return caller_id; } +Status CoreWorker::PushError(const JobID &job_id, const std::string &type, + const std::string &error_message, double timestamp) { + return local_raylet_client_->PushError(job_id, type, error_message, timestamp); +} + +Status CoreWorker::PrepareActorCheckpoint(const ActorID &actor_id, + ActorCheckpointID *checkpoint_id) { + return local_raylet_client_->PrepareActorCheckpoint(actor_id, checkpoint_id); +} + +Status CoreWorker::NotifyActorResumedFromCheckpoint( + const ActorID &actor_id, const ActorCheckpointID &checkpoint_id) { + return local_raylet_client_->NotifyActorResumedFromCheckpoint(actor_id, checkpoint_id); +} + +Status CoreWorker::SetResource(const std::string &resource_name, const double capacity, + const ClientID &client_id) { + return local_raylet_client_->SetResource(resource_name, capacity, client_id); +} + Status CoreWorker::SubmitTask(const RayFunction &function, const std::vector &args, const TaskOptions &task_options, diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 88f518a1f..ea965125a 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -286,6 +286,40 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// of the bytes zeroed out. TaskID GetCallerId() const; + /// Push an error to the relevant driver. + /// + /// \param[in] The ID of the job_id that the error is for. + /// \param[in] The type of the error. + /// \param[in] The error message. + /// \param[in] The timestamp of the error. + /// \return Status. + Status PushError(const JobID &job_id, const std::string &type, + const std::string &error_message, double timestamp); + + /// Request raylet backend to prepare a checkpoint for an actor. + /// + /// \param[in] actor_id ID of the actor. + /// \param[out] checkpoint_id ID of the new checkpoint (output parameter). + /// \return Status. + Status PrepareActorCheckpoint(const ActorID &actor_id, + ActorCheckpointID *checkpoint_id); + + /// Notify raylet backend that an actor was resumed from a checkpoint. + /// + /// \param[in] actor_id ID of the actor. + /// \param[in] checkpoint_id ID of the checkpoint from which the actor was resumed. + /// \return Status. + Status NotifyActorResumedFromCheckpoint(const ActorID &actor_id, + const ActorCheckpointID &checkpoint_id); + + /// Sets a resource with the specified capacity and client id + /// \param[in] resource_name Name of the resource to be set. + /// \param[in] capacity Capacity of the resource. + /// \param[in] client_Id ClientID where the resource is to be set. + /// \return Status + Status SetResource(const std::string &resource_name, const double capacity, + const ClientID &client_id); + /// Submit a normal task. /// /// \param[in] function The remote function to execute. diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc index 0c44e8e4f..9eaa9c7ff 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc @@ -125,10 +125,9 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeSetResource( const auto node_id = JavaByteArrayToId(env, nodeId); const char *native_resource_name = env->GetStringUTFChars(resourceName, JNI_FALSE); - auto &raylet_client = - reinterpret_cast(nativeCoreWorkerPointer)->GetRayletClient(); - auto status = raylet_client.SetResource(native_resource_name, - static_cast(capacity), node_id); + auto status = + reinterpret_cast(nativeCoreWorkerPointer) + ->SetResource(native_resource_name, static_cast(capacity), node_id); env->ReleaseStringUTFChars(resourceName, native_resource_name); THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0); } diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskExecutor.cc b/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskExecutor.cc index 86229cfac..7a0172d68 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskExecutor.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskExecutor.cc @@ -20,8 +20,7 @@ Java_org_ray_runtime_task_NativeTaskExecutor_nativePrepareCheckpoint( const auto &task_spec = core_worker.GetWorkerContext().GetCurrentTask(); RAY_CHECK(task_spec->IsActorTask()); ActorCheckpointID checkpoint_id; - auto status = - core_worker.GetRayletClient().PrepareActorCheckpoint(actor_id, checkpoint_id); + auto status = core_worker.PrepareActorCheckpoint(actor_id, &checkpoint_id); THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr); jbyteArray result = env->NewByteArray(checkpoint_id.Size()); env->SetByteArrayRegion(result, 0, checkpoint_id.Size(), @@ -35,8 +34,7 @@ Java_org_ray_runtime_task_NativeTaskExecutor_nativeNotifyActorResumedFromCheckpo auto &core_worker = *reinterpret_cast(nativeCoreWorkerPointer); const auto &actor_id = core_worker.GetWorkerContext().GetCurrentActorID(); const auto checkpoint_id = JavaByteArrayToId(env, checkpointId); - auto status = core_worker.GetRayletClient().NotifyActorResumedFromCheckpoint( - actor_id, checkpoint_id); + auto status = core_worker.NotifyActorResumedFromCheckpoint(actor_id, checkpoint_id); THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0); } diff --git a/src/ray/raylet/raylet_client.cc b/src/ray/raylet/raylet_client.cc index 6bb07befb..cd373a039 100644 --- a/src/ray/raylet/raylet_client.cc +++ b/src/ray/raylet/raylet_client.cc @@ -307,7 +307,7 @@ Status raylet::RayletClient::FreeObjects(const std::vector &object_ids } Status raylet::RayletClient::PrepareActorCheckpoint(const ActorID &actor_id, - ActorCheckpointID &checkpoint_id) { + ActorCheckpointID *checkpoint_id) { flatbuffers::FlatBufferBuilder fbb; auto message = protocol::CreatePrepareActorCheckpointRequest(fbb, to_flatbuf(fbb, actor_id)); @@ -320,7 +320,7 @@ Status raylet::RayletClient::PrepareActorCheckpoint(const ActorID &actor_id, if (!status.ok()) return status; auto reply_message = flatbuffers::GetRoot(reply.get()); - checkpoint_id = ActorCheckpointID::FromBinary(reply_message->checkpoint_id()->str()); + *checkpoint_id = ActorCheckpointID::FromBinary(reply_message->checkpoint_id()->str()); return Status::OK(); } diff --git a/src/ray/raylet/raylet_client.h b/src/ray/raylet/raylet_client.h index 4f4e81f75..4e6770ec2 100644 --- a/src/ray/raylet/raylet_client.h +++ b/src/ray/raylet/raylet_client.h @@ -221,11 +221,11 @@ class RayletClient : public WorkerLeaseInterface { /// Request raylet backend to prepare a checkpoint for an actor. /// - /// \param actor_id ID of the actor. - /// \param checkpoint_id ID of the new checkpoint (output parameter). + /// \param[in] actor_id ID of the actor. + /// \param[out] checkpoint_id ID of the new checkpoint (output parameter). /// \return ray::Status. ray::Status PrepareActorCheckpoint(const ActorID &actor_id, - ActorCheckpointID &checkpoint_id); + ActorCheckpointID *checkpoint_id); /// Notify raylet backend that an actor was resumed from a checkpoint. ///