Remove raylet client from Python worker (#6018)

This commit is contained in:
Edward Oakes
2020-01-31 18:23:01 -08:00
committed by GitHub
parent 341a921d81
commit 92525f35d1
14 changed files with 112 additions and 165 deletions
+29 -61
View File
@@ -58,12 +58,6 @@ from ray.includes.common cimport (
WORKER_TYPE_WORKER,
WORKER_TYPE_DRIVER,
)
from ray.includes.libraylet cimport (
CRayletClient,
GCSProfileEvent,
GCSProfileTableData,
WaitResultPair,
)
from ray.includes.unique_ids cimport (
CActorID,
CActorCheckpointID,
@@ -306,60 +300,6 @@ cdef void prepare_args(
(<ObjectID>core_worker.put_serialized_object(
serialized_arg)).native()))
cdef class RayletClient:
cdef CRayletClient* client
def __cinit__(self, CoreWorker core_worker):
# The core worker and raylet client need to share an underlying
# raylet client, so we take a reference to the core worker's client
# here. The client is a raw pointer because it is only a temporary
# workaround and will be removed once the core worker transition is
# complete, so we don't want to change the unique_ptr in core worker
# to a shared_ptr. This means the core worker *must* be
# initialized before the raylet client.
self.client = &core_worker.core_worker.get().GetRayletClient()
def fetch_or_reconstruct(self, object_ids,
c_bool fetch_only,
TaskID current_task_id=TaskID.nil()):
cdef c_vector[CObjectID] fetch_ids = ObjectIDsToVector(object_ids)
check_status(self.client.FetchOrReconstruct(
fetch_ids, fetch_only, True, current_task_id.native()))
def push_error(self, JobID job_id, error_type, error_message,
double timestamp):
check_status(self.client.PushError(job_id.native(),
error_type.encode("ascii"),
error_message.encode("ascii"),
timestamp))
def prepare_actor_checkpoint(self, ActorID actor_id):
cdef:
CActorCheckpointID checkpoint_id
CActorID c_actor_id = actor_id.native()
# PrepareActorCheckpoint will wait for raylet's reply, release
# the GIL so other Python threads can run.
with nogil:
check_status(self.client.PrepareActorCheckpoint(
c_actor_id, checkpoint_id))
return ActorCheckpointID(checkpoint_id.Binary())
def notify_actor_resumed_from_checkpoint(self, ActorID actor_id,
ActorCheckpointID checkpoint_id):
check_status(self.client.NotifyActorResumedFromCheckpoint(
actor_id.native(), checkpoint_id.native()))
def set_resource(self, basestring resource_name,
double capacity, ClientID client_id):
self.client.SetResource(resource_name.encode("ascii"), capacity,
CClientID.FromBinary(client_id.binary()))
@property
def job_id(self):
return JobID(self.client.GetJobID().Binary())
cdef deserialize_args(
const c_vector[shared_ptr[CRayObject]] &c_args,
const c_vector[CObjectID] &arg_reference_ids):
@@ -770,7 +710,6 @@ cdef class CoreWorker:
def wait(self, object_ids, int num_returns, int64_t timeout_ms,
TaskID current_task_id):
cdef:
WaitResultPair result
c_vector[CObjectID] wait_ids
c_vector[c_bool] results
CTaskID c_task_id = current_task_id.native()
@@ -1099,6 +1038,35 @@ cdef class CoreWorker:
async_retry_with_plasma_callback,
<void*>future)
def push_error(self, JobID job_id, error_type, error_message,
double timestamp):
check_status(self.core_worker.get().PushError(
job_id.native(), error_type.encode("ascii"),
error_message.encode("ascii"), timestamp))
def prepare_actor_checkpoint(self, ActorID actor_id):
cdef:
CActorCheckpointID checkpoint_id
CActorID c_actor_id = actor_id.native()
# PrepareActorCheckpoint will wait for raylet's reply, release
# the GIL so other Python threads can run.
with nogil:
check_status(self.core_worker.get().PrepareActorCheckpoint(
c_actor_id, &checkpoint_id))
return ActorCheckpointID(checkpoint_id.Binary())
def notify_actor_resumed_from_checkpoint(self, ActorID actor_id,
ActorCheckpointID checkpoint_id):
check_status(self.core_worker.get().NotifyActorResumedFromCheckpoint(
actor_id.native(), checkpoint_id.native()))
def set_resource(self, basestring resource_name,
double capacity, ClientID client_id):
self.core_worker.get().SetResource(
resource_name.encode("ascii"), capacity,
CClientID.FromBinary(client_id.binary()))
cdef void async_set_result_callback(shared_ptr[CRayObject] obj,
CObjectID object_id,
void *future) with gil:
+1 -1
View File
@@ -31,5 +31,5 @@ def set_resource(resource_name, capacity, client_id=None):
if (capacity < 0) or (capacity != int(capacity)):
raise ValueError(
"Capacity {} must be a non-negative integer.".format(capacity))
return ray.worker.global_worker.raylet_client.set_resource(
return ray.worker.global_worker.core_worker.set_resource(
resource_name, capacity, client_id_obj)
+4 -3
View File
@@ -817,8 +817,9 @@ class FunctionActorManager:
if actor.should_checkpoint(checkpoint_context):
try:
now = int(1000 * time.time())
checkpoint_id = (self._worker.raylet_client.
prepare_actor_checkpoint(actor_id))
checkpoint_id = (
self._worker.core_worker.prepare_actor_checkpoint(actor_id)
)
checkpoint_info.checkpoint_ids.append(checkpoint_id)
actor.save_checkpoint(actor_id, checkpoint_id)
if (len(checkpoint_info.checkpoint_ids) >
@@ -865,7 +866,7 @@ class FunctionActorManager:
for checkpoint in checkpoints), msg
# Notify raylet that this actor has been resumed from
# a checkpoint.
(self._worker.raylet_client.
(self._worker.core_worker.
notify_actor_resumed_from_checkpoint(
actor_id, checkpoint_id))
except Exception:
+12 -4
View File
@@ -12,6 +12,8 @@ from libcpp.vector cimport vector as c_vector
from ray.includes.unique_ids cimport (
CActorID,
CActorCheckpointID,
CClientID,
CJobID,
CTaskID,
CObjectID,
@@ -31,7 +33,6 @@ from ray.includes.common cimport (
CGcsClientOptions,
)
from ray.includes.task cimport CTaskSpec
from ray.includes.libraylet cimport CRayletClient
ctypedef unordered_map[c_string, c_vector[pair[int64_t, double]]] \
ResourceMappingType
@@ -107,9 +108,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const c_vector[shared_ptr[CBuffer]] &metadatas,
c_vector[shared_ptr[CRayObject]] *return_objects)
# TODO(edoakes): remove this once the raylet client is no longer used
# directly.
CRayletClient &GetRayletClient()
CJobID GetCurrentJobId()
CTaskID GetCurrentTaskId()
const CActorID &GetActorId()
@@ -159,3 +157,13 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
ray_callback_function successs_callback,
ray_callback_function fallback_callback,
void* python_future)
CRayStatus PushError(const CJobID &job_id, const c_string &type,
const c_string &error_message, double timestamp)
CRayStatus PrepareActorCheckpoint(const CActorID &actor_id,
CActorCheckpointID *checkpoint_id)
CRayStatus NotifyActorResumedFromCheckpoint(
const CActorID &actor_id, const CActorCheckpointID &checkpoint_id)
CRayStatus SetResource(const c_string &resource_name,
const double capacity,
const CClientID &client_Id)
-77
View File
@@ -1,77 +0,0 @@
from libc.stdint cimport int64_t
from libcpp cimport bool as c_bool
from libcpp.memory cimport unique_ptr
from libcpp.string cimport string as c_string
from libcpp.utility cimport pair
from libcpp.vector cimport vector as c_vector
from ray.includes.common cimport (
CLanguage,
CRayStatus,
)
from ray.includes.unique_ids cimport (
CActorCheckpointID,
CActorID,
CClientID,
CJobID,
CWorkerID,
CObjectID,
CTaskID,
)
from ray.includes.task cimport CTaskSpec
cdef extern from "ray/protobuf/gcs.pb.h" nogil:
cdef cppclass GCSProfileEvent "ProfileTableData::ProfileEvent":
void set_event_type(const c_string &value)
void set_start_time(double value)
void set_end_time(double value)
c_string set_extra_data(const c_string &value)
GCSProfileEvent()
cdef cppclass GCSProfileTableData "ProfileTableData":
void set_component_type(const c_string &value)
void set_component_id(const c_string &value)
void set_node_ip_address(const c_string &value)
GCSProfileEvent *add_profile_events()
GCSProfileTableData()
ctypedef pair[c_vector[CObjectID], c_vector[CObjectID]] WaitResultPair
cdef extern from "ray/raylet/raylet_client.h" nogil:
cdef cppclass CRayletClient "ray::raylet::RayletClient":
CRayletClient(const c_string &raylet_socket,
const CWorkerID &worker_id,
c_bool is_worker, const CJobID &job_id,
const CLanguage &language)
CRayStatus Disconnect()
CRayStatus SubmitTask(const CTaskSpec &task_spec)
CRayStatus FetchOrReconstruct(c_vector[CObjectID] &object_ids,
c_bool fetch_only,
c_bool is_direct_call_task,
const CTaskID &current_task_id)
CRayStatus NotifyUnblocked(const CTaskID &current_task_id)
CRayStatus Wait(const c_vector[CObjectID] &object_ids,
int num_returns, int64_t timeout_milliseconds,
c_bool wait_local, c_bool is_direct_call_task,
const CTaskID &current_task_id,
WaitResultPair *result)
CRayStatus PushError(const CJobID &job_id, const c_string &type,
const c_string &error_message, double timestamp)
CRayStatus PushProfileEvents(
const GCSProfileTableData &profile_events)
CRayStatus FreeObjects(const c_vector[CObjectID] &object_ids,
c_bool local_only, c_bool delete_creating_tasks)
CRayStatus PrepareActorCheckpoint(const CActorID &actor_id,
CActorCheckpointID &checkpoint_id)
CRayStatus NotifyActorResumedFromCheckpoint(
const CActorID &actor_id, const CActorCheckpointID &checkpoint_id)
CRayStatus SetResource(const c_string &resource_name,
const double capacity,
const CClientID &client_Id)
CLanguage GetLanguage() const
CWorkerID GetWorkerID() const
CJobID GetJobID() const
c_bool IsWorker() const
+1 -2
View File
@@ -332,8 +332,7 @@ def test_driver_put_errors(ray_start_object_store_memory):
# were evicted and whose originating tasks are still running, this
# for-loop should hang on its first iteration and push an error to the
# driver.
ray.worker.global_worker.raylet_client.fetch_or_reconstruct([args[0]],
False)
ray.wait([args[0]], timeout=30)
def error_check(errors):
return len(errors) > 1
+1 -1
View File
@@ -60,7 +60,7 @@ def push_error_to_driver(worker, error_type, message, job_id=None):
if job_id is None:
job_id = ray.JobID.nil()
assert isinstance(job_id, ray.JobID)
worker.raylet_client.push_error(job_id, error_type, message, time.time())
worker.core_worker.push_error(job_id, error_type, message, time.time())
def push_error_to_driver_through_redis(redis_client,
-1
View File
@@ -1252,7 +1252,6 @@ def connect(node,
node.node_ip_address,
node.node_manager_port,
)
worker.raylet_client = ray._raylet.RayletClient(worker.core_worker)
if driver_object_store_memory is not None:
worker.core_worker.set_object_store_client_options(
+20 -2
View File
@@ -1,7 +1,5 @@
#include "ray/core_worker/core_worker.h"
#include <cstdlib>
#include "boost/fiber/all.hpp"
#include "ray/common/ray_config.h"
#include "ray/common/task/task_util.h"
@@ -646,6 +644,26 @@ TaskID CoreWorker::GetCallerId() const {
return caller_id;
}
Status CoreWorker::PushError(const JobID &job_id, const std::string &type,
const std::string &error_message, double timestamp) {
return local_raylet_client_->PushError(job_id, type, error_message, timestamp);
}
Status CoreWorker::PrepareActorCheckpoint(const ActorID &actor_id,
ActorCheckpointID *checkpoint_id) {
return local_raylet_client_->PrepareActorCheckpoint(actor_id, checkpoint_id);
}
Status CoreWorker::NotifyActorResumedFromCheckpoint(
const ActorID &actor_id, const ActorCheckpointID &checkpoint_id) {
return local_raylet_client_->NotifyActorResumedFromCheckpoint(actor_id, checkpoint_id);
}
Status CoreWorker::SetResource(const std::string &resource_name, const double capacity,
const ClientID &client_id) {
return local_raylet_client_->SetResource(resource_name, capacity, client_id);
}
Status CoreWorker::SubmitTask(const RayFunction &function,
const std::vector<TaskArg> &args,
const TaskOptions &task_options,
+34
View File
@@ -286,6 +286,40 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// of the bytes zeroed out.
TaskID GetCallerId() const;
/// Push an error to the relevant driver.
///
/// \param[in] The ID of the job_id that the error is for.
/// \param[in] The type of the error.
/// \param[in] The error message.
/// \param[in] The timestamp of the error.
/// \return Status.
Status PushError(const JobID &job_id, const std::string &type,
const std::string &error_message, double timestamp);
/// Request raylet backend to prepare a checkpoint for an actor.
///
/// \param[in] actor_id ID of the actor.
/// \param[out] checkpoint_id ID of the new checkpoint (output parameter).
/// \return Status.
Status PrepareActorCheckpoint(const ActorID &actor_id,
ActorCheckpointID *checkpoint_id);
/// Notify raylet backend that an actor was resumed from a checkpoint.
///
/// \param[in] actor_id ID of the actor.
/// \param[in] checkpoint_id ID of the checkpoint from which the actor was resumed.
/// \return Status.
Status NotifyActorResumedFromCheckpoint(const ActorID &actor_id,
const ActorCheckpointID &checkpoint_id);
/// Sets a resource with the specified capacity and client id
/// \param[in] resource_name Name of the resource to be set.
/// \param[in] capacity Capacity of the resource.
/// \param[in] client_Id ClientID where the resource is to be set.
/// \return Status
Status SetResource(const std::string &resource_name, const double capacity,
const ClientID &client_id);
/// Submit a normal task.
///
/// \param[in] function The remote function to execute.
@@ -125,10 +125,9 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeSetResource(
const auto node_id = JavaByteArrayToId<ClientID>(env, nodeId);
const char *native_resource_name = env->GetStringUTFChars(resourceName, JNI_FALSE);
auto &raylet_client =
reinterpret_cast<ray::CoreWorker *>(nativeCoreWorkerPointer)->GetRayletClient();
auto status = raylet_client.SetResource(native_resource_name,
static_cast<double>(capacity), node_id);
auto status =
reinterpret_cast<ray::CoreWorker *>(nativeCoreWorkerPointer)
->SetResource(native_resource_name, static_cast<double>(capacity), node_id);
env->ReleaseStringUTFChars(resourceName, native_resource_name);
THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0);
}
@@ -20,8 +20,7 @@ Java_org_ray_runtime_task_NativeTaskExecutor_nativePrepareCheckpoint(
const auto &task_spec = core_worker.GetWorkerContext().GetCurrentTask();
RAY_CHECK(task_spec->IsActorTask());
ActorCheckpointID checkpoint_id;
auto status =
core_worker.GetRayletClient().PrepareActorCheckpoint(actor_id, checkpoint_id);
auto status = core_worker.PrepareActorCheckpoint(actor_id, &checkpoint_id);
THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr);
jbyteArray result = env->NewByteArray(checkpoint_id.Size());
env->SetByteArrayRegion(result, 0, checkpoint_id.Size(),
@@ -35,8 +34,7 @@ Java_org_ray_runtime_task_NativeTaskExecutor_nativeNotifyActorResumedFromCheckpo
auto &core_worker = *reinterpret_cast<ray::CoreWorker *>(nativeCoreWorkerPointer);
const auto &actor_id = core_worker.GetWorkerContext().GetCurrentActorID();
const auto checkpoint_id = JavaByteArrayToId<ActorCheckpointID>(env, checkpointId);
auto status = core_worker.GetRayletClient().NotifyActorResumedFromCheckpoint(
actor_id, checkpoint_id);
auto status = core_worker.NotifyActorResumedFromCheckpoint(actor_id, checkpoint_id);
THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0);
}
+2 -2
View File
@@ -307,7 +307,7 @@ Status raylet::RayletClient::FreeObjects(const std::vector<ObjectID> &object_ids
}
Status raylet::RayletClient::PrepareActorCheckpoint(const ActorID &actor_id,
ActorCheckpointID &checkpoint_id) {
ActorCheckpointID *checkpoint_id) {
flatbuffers::FlatBufferBuilder fbb;
auto message =
protocol::CreatePrepareActorCheckpointRequest(fbb, to_flatbuf(fbb, actor_id));
@@ -320,7 +320,7 @@ Status raylet::RayletClient::PrepareActorCheckpoint(const ActorID &actor_id,
if (!status.ok()) return status;
auto reply_message =
flatbuffers::GetRoot<protocol::PrepareActorCheckpointReply>(reply.get());
checkpoint_id = ActorCheckpointID::FromBinary(reply_message->checkpoint_id()->str());
*checkpoint_id = ActorCheckpointID::FromBinary(reply_message->checkpoint_id()->str());
return Status::OK();
}
+3 -3
View File
@@ -221,11 +221,11 @@ class RayletClient : public WorkerLeaseInterface {
/// Request raylet backend to prepare a checkpoint for an actor.
///
/// \param actor_id ID of the actor.
/// \param checkpoint_id ID of the new checkpoint (output parameter).
/// \param[in] actor_id ID of the actor.
/// \param[out] checkpoint_id ID of the new checkpoint (output parameter).
/// \return ray::Status.
ray::Status PrepareActorCheckpoint(const ActorID &actor_id,
ActorCheckpointID &checkpoint_id);
ActorCheckpointID *checkpoint_id);
/// Notify raylet backend that an actor was resumed from a checkpoint.
///