[core worker] Python core worker normal task submission (#5566)

This commit is contained in:
Edward Oakes
2019-09-14 13:02:53 -07:00
committed by Eric Liang
parent 5f88823c49
commit a5d7de6aaf
11 changed files with 194 additions and 80 deletions
+66 -5
View File
@@ -27,6 +27,8 @@ from ray.includes.common cimport (
CRayObject,
CRayStatus,
CGcsClientOptions,
CTaskArg,
CRayFunction,
LocalMemoryBuffer,
LANGUAGE_CPP,
LANGUAGE_JAVA,
@@ -46,7 +48,7 @@ from ray.includes.unique_ids cimport (
CObjectID,
CClientID,
)
from ray.includes.libcoreworker cimport CCoreWorker
from ray.includes.libcoreworker cimport CCoreWorker, CTaskOptions
from ray.includes.task cimport CTaskSpec
from ray.includes.ray_config cimport RayConfig
from ray.exceptions import RayletError, ObjectStoreFullError
@@ -96,6 +98,13 @@ cdef int check_status(const CRayStatus& status) nogil except -1:
raise RayletError(message)
cdef VectorToObjectIDs(const c_vector[CObjectID] &object_ids):
    """Convert a C++ vector of object IDs into a Python list of ObjectIDs.

    Each element is rebuilt on the Python side from its binary
    representation, preserving the order of the input vector.
    """
    return [ObjectID(object_ids[idx].Binary())
            for idx in range(object_ids.size())]
cdef c_vector[CObjectID] ObjectIDsToVector(object_ids):
"""A helper function that converts a Python list of object IDs to a vector.
@@ -244,6 +253,16 @@ cdef unordered_map[c_string, double] resource_map_from_dict(resource_map):
return out
cdef c_vector[c_string] string_vector_from_list(list string_list):
    """Copy a Python list of bytes objects into a C++ vector of strings.

    Raises:
        TypeError: if any element of string_list is not a bytes instance.
    """
    cdef c_vector[c_string] converted
    for item in string_list:
        if isinstance(item, bytes):
            converted.push_back(item)
        else:
            raise TypeError("string_list elements must be bytes")
    return converted
cdef class RayletClient:
cdef CRayletClient* client
@@ -400,6 +419,10 @@ cdef class CoreWorker:
"outside _raylet. See __init__.py for "
"details.")
def disconnect(self):
# Calls Disconnect() on the wrapped C++ core worker. The GIL is released
# for the duration of the call.
with nogil:
self.core_worker.get().Disconnect()
def get_objects(self, object_ids, TaskID current_task_id):
cdef:
c_vector[shared_ptr[CRayObject]] results
@@ -538,6 +561,48 @@ cdef class CoreWorker:
with nogil:
self.core_worker.get().SetCurrentJobId(c_job_id)
def submit_task(self,
function_descriptor,
args,
int num_return_vals,
resources):
"""Submit a task through the core worker's task interface.

Args:
function_descriptor: List of bytes objects describing the function;
converted with string_vector_from_list, which raises TypeError for
non-bytes elements.
args: Iterable of task arguments. ObjectID instances are passed by
reference; every other value is pickled and passed by value.
num_return_vals: Number of return values, forwarded to CTaskOptions.
resources: Resource requirements, converted with
resource_map_from_dict.

Returns:
A Python list of ObjectIDs built from the return IDs filled in by
SubmitTask.
"""
cdef:
unordered_map[c_string, double] c_resources
CTaskOptions task_options
CRayFunction ray_function
c_vector[CTaskArg] args_vector
c_vector[CObjectID] return_ids
c_string pickled_str
shared_ptr[CBuffer] arg_data
# Never assigned below, so by-value arguments carry an empty metadata
# pointer.
shared_ptr[CBuffer] arg_metadata
c_resources = resource_map_from_dict(resources)
task_options = CTaskOptions(num_return_vals, c_resources)
ray_function = CRayFunction(
LANGUAGE_PYTHON, string_vector_from_list(function_descriptor))
for arg in args:
if isinstance(arg, ObjectID):
args_vector.push_back(
CTaskArg.PassByReference((<ObjectID>arg).native()))
else:
pickled_str = pickle.dumps(
arg, protocol=pickle.HIGHEST_PROTOCOL)
# The trailing True is the copy_data flag of LocalMemoryBuffer;
# pickled_str is reused on the next iteration, so the buffer must not
# alias it.
arg_data = dynamic_pointer_cast[CBuffer, LocalMemoryBuffer](
make_shared[LocalMemoryBuffer](
<uint8_t*>(pickled_str.data()),
pickled_str.size(),
True))
args_vector.push_back(
CTaskArg.PassByValue(
make_shared[CRayObject](arg_data, arg_metadata)))
# The submission runs without the GIL; check_status converts the returned
# status into a Python exception on failure.
with nogil:
check_status(self.core_worker.get().Tasks().SubmitTask(
ray_function, args_vector, task_options, &return_ids))
return VectorToObjectIDs(return_ids)
def set_object_store_client_options(self, c_string client_name,
int64_t limit_bytes):
with nogil:
@@ -552,7 +617,3 @@ cdef class CoreWorker:
message = self.core_worker.get().Objects().MemoryUsageString()
return message.decode("utf-8")
def disconnect(self):
with nogil:
self.core_worker.get().Disconnect()
+50 -6
View File
@@ -2,11 +2,13 @@ from libcpp cimport bool as c_bool
from libcpp.memory cimport shared_ptr
from libcpp.string cimport string as c_string
from libc.stdint cimport uint8_t
from libc.stdint cimport uint8_t, uint64_t, int64_t
from libcpp.unordered_map cimport unordered_map
from libcpp.vector cimport vector as c_vector
from ray.includes.unique_ids cimport (
CActorID,
CActorHandleID,
CJobID,
CWorkerID,
CObjectID,
@@ -138,15 +140,57 @@ cdef extern from "ray/common/buffer.h" namespace "ray" nogil:
size_t Size() const
cdef cppclass LocalMemoryBuffer(CBuffer):
LocalMemoryBuffer(uint8_t *data, size_t size)
LocalMemoryBuffer(uint8_t *data, size_t size, c_bool copy_data)
cdef extern from "ray/core_worker/store_provider/store_provider.h" nogil:
cdef extern from "ray/common/ray_object.h" nogil:
cdef cppclass CRayObject "ray::RayObject":
const shared_ptr[CBuffer] &GetData()
const size_t DataSize() const
const shared_ptr[CBuffer] &GetMetadata() const
c_bool HasData() const
c_bool HasMetadata() const
const size_t DataSize() const
const shared_ptr[CBuffer] &GetData()
const shared_ptr[CBuffer] &GetMetadata() const
cdef extern from "ray/core_worker/common.h" nogil:
    # Wrapper declaration for ray::RayFunction (language + function
    # descriptor). The signatures mirror the C++ class: the descriptor is
    # taken by const reference and both getters are const methods.
    cdef cppclass CRayFunction "ray::RayFunction":
        CRayFunction()
        CRayFunction(CLanguage language,
                     const c_vector[c_string] &function_descriptor)
        CLanguage GetLanguage() const
        c_vector[c_string] GetFunctionDescriptor() const

    # Wrapper declaration for ray::TaskArg. Arguments are created through
    # the two static factories below.
    cdef cppclass CTaskArg "ray::TaskArg":
        @staticmethod
        CTaskArg PassByReference(const CObjectID &object_id)

        @staticmethod
        CTaskArg PassByValue(const shared_ptr[CRayObject] &data)
cdef extern from "ray/core_worker/task_interface.h" nogil:
    # Options of a non-actor-creation task. `resources` is declared as a
    # const reference, matching CActorCreationOptions below, because the
    # constructor only reads the map.
    cdef cppclass CTaskOptions "ray::TaskOptions":
        CTaskOptions()
        CTaskOptions(int num_returns,
                     const unordered_map[c_string, double] &resources)

    # Options of an actor creation task.
    cdef cppclass CActorCreationOptions "ray::ActorCreationOptions":
        CActorCreationOptions(uint64_t max_reconstructions,
                              const unordered_map[c_string, double] &resources)

    # Wrapper declaration for ray::ActorHandle.
    cdef cppclass CActorHandle "ray::ActorHandle":
        CActorHandle(
            const CActorID &actor_id, const CActorHandleID &actor_handle_id,
            const CLanguage actor_language,
            const c_vector[c_string] &actor_creation_task_function_descriptor)
        CActorHandle(const CActorHandle &other)

        CActorID ActorID() const
        CActorHandleID ActorHandleID() const
        c_vector[c_string] ActorCreationTaskFunctionDescriptor() const
        CObjectID ActorCursor() const
        int64_t TaskCursor() const
        int64_t NumForks() const
        CActorHandle Fork()
        void Serialize(c_string *output)
        CActorHandle Deserialize(const c_string &data)
cdef extern from "ray/gcs/gcs_client_interface.h" nogil:
cdef cppclass CGcsClientOptions "ray::gcs::GcsClientOptions":
+23 -4
View File
@@ -1,6 +1,6 @@
from libc.stdint cimport int64_t
from libcpp cimport bool as c_bool
from libcpp.memory cimport shared_ptr
from libcpp.memory cimport shared_ptr, unique_ptr
from libcpp.string cimport string as c_string
from libcpp.vector cimport vector as c_vector
@@ -10,9 +10,14 @@ from ray.includes.unique_ids cimport (
CObjectID,
)
from ray.includes.common cimport (
CActorCreationOptions,
CActorHandle,
CBuffer,
CRayStatus,
CRayFunction,
CRayObject,
CRayStatus,
CTaskArg,
CTaskOptions,
CWorkerType,
CLanguage,
CGcsClientOptions,
@@ -20,6 +25,20 @@ from ray.includes.common cimport (
from ray.includes.libraylet cimport CRayletClient
cdef extern from "ray/core_worker/task_interface.h" namespace "ray" nogil:
# Declares the C++ task submission interface (CoreWorkerTaskInterface in
# namespace ray). Every method returns a CRayStatus and reports its results
# through the trailing output pointer.
cdef cppclass CTaskSubmissionInterface "CoreWorkerTaskInterface":
# Submits a normal task; return_ids is filled with the task's return IDs.
CRayStatus SubmitTask(
const CRayFunction &function, const c_vector[CTaskArg] &args,
const CTaskOptions &options, c_vector[CObjectID] *return_ids)
# Creates an actor; handle receives the new actor's handle.
CRayStatus CreateActor(
const CRayFunction &function, const c_vector[CTaskArg] &args,
const CActorCreationOptions &options,
unique_ptr[CActorHandle] *handle)
# Submits a task to the actor identified by handle.
CRayStatus SubmitActorTask(
CActorHandle &handle, const CRayFunction &function,
const c_vector[CTaskArg] &args, const CTaskOptions &options,
c_vector[CObjectID] *return_ids)
cdef extern from "ray/core_worker/object_interface.h" nogil:
cdef cppclass CObjectInterface "ray::CoreWorkerObjectInterface":
CRayStatus SetClientOptions(c_string client_name, int64_t limit)
@@ -50,13 +69,13 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CWorkerType &GetWorkerType()
CLanguage &GetLanguage()
CObjectInterface &Objects()
# CTaskSubmissionInterface &Tasks()
CTaskSubmissionInterface &Tasks()
# CTaskExecutionInterface &Execution()
# TODO(edoakes): remove this once the raylet client is no longer used
# directly.
CRayletClient &GetRayletClient()
# TODO(edoakes): remove this once the Python core worker uses the task
# TODO(edoakes): remove these once the Python core worker uses the task
# interfaces
void SetCurrentJobId(const CJobID &job_id)
void SetCurrentTaskId(const CTaskID &task_id)
+7 -4
View File
@@ -707,11 +707,14 @@ class Worker(object):
self.current_job_id, self.current_task_id,
self.task_context.task_index, actor_id)
else:
# This is a normal task.
task_id = TaskID.for_normal_task(self.current_job_id,
self.current_task_id,
self.task_context.task_index)
# Normal tasks are submitted through the core worker (in the
# future, all tasks will be).
return self.core_worker.submit_task(function_descriptor_list,
args_for_raylet,
num_return_vals, resources)
# Actor creation tasks and actor tasks are submitted directly to
# the raylet.
task = ray._raylet.TaskSpec(
task_id,
job_id,
+13 -5
View File
@@ -13,11 +13,19 @@ namespace ray {
using WorkerType = rpc::WorkerType;
/// Information about a remote function.
struct RayFunction {
/// Language of the remote function.
const Language language;
/// Function descriptor of the remote function.
const std::vector<std::string> function_descriptor;
class RayFunction {
public:
RayFunction() {}
RayFunction(Language language, const std::vector<std::string> &function_descriptor)
: language_(language), function_descriptor_(function_descriptor) {}
Language GetLanguage() const { return language_; }
std::vector<std::string> GetFunctionDescriptor() const { return function_descriptor_; }
private:
Language language_;
std::vector<std::string> function_descriptor_;
};
/// Argument of a task.
+1 -1
View File
@@ -83,7 +83,7 @@ void WorkerContext::SetCurrentTaskId(const TaskID &task_id) {
}
void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) {
current_job_id_ = task_spec.JobId();
SetCurrentJobId(task_spec.JobId());
GetThreadContext().SetCurrentTask(task_spec);
if (task_spec.IsActorCreationTask()) {
RAY_CHECK(current_actor_id_.IsNil());
@@ -46,7 +46,7 @@ JNIEXPORT jlong JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeInitCoreWork
RAY_CHECK(local_java_task_executor);
// convert RayFunction
jobject ray_function_array_list =
NativeStringVectorToJavaStringList(env, ray_function.function_descriptor);
NativeStringVectorToJavaStringList(env, ray_function.GetFunctionDescriptor());
// convert args
// TODO (kfstorm): Avoid copying binary data from Java to C++
jobject args_array_list = NativeVectorToJavaList<std::shared_ptr<ray::RayObject>>(
@@ -26,26 +26,6 @@ class CoreWorkerMemoryStore {
/// \return Status.
Status Put(const ObjectID &object_id, const RayObject &object);
/// Create and return a buffer in the object store that can be directly written
/// into. After writing to the buffer, the caller must call `Seal()` to finalize
/// the object. The `Create()` and `Seal()` combination is an alternative interface
/// to `Put()` that allows frontends to avoid an extra copy when possible.
///
/// \param[in] metadata Metadata of the object to be written.
/// \param[in] data_size Size of the object to be written.
/// \param[in] object_id Object ID specified by the user.
/// \param[out] data Buffer for the user to write the object into.
/// \return Status.
Status Create(const std::shared_ptr<Buffer> &metadata, const size_t data_size,
const ObjectID &object_id, std::shared_ptr<Buffer> *data);
/// Finalize placing an object into the object store. This should be called after
/// a corresponding `Create()` call and then writing into the returned buffer.
///
/// \param[in] object_id Object ID corresponding to the object.
/// \return Status.
Status Seal(const ObjectID &object_id);
/// Get a list of objects from the object store.
///
/// \param[in] object_ids IDs of the objects to get. Duplicates are not allowed.
+6 -6
View File
@@ -121,10 +121,10 @@ void CoreWorkerTaskInterface::BuildCommonTaskSpec(
const std::unordered_map<std::string, double> &required_placement_resources,
TaskTransportType transport_type, std::vector<ObjectID> *return_ids) {
// Build common task spec.
builder.SetCommonTaskSpec(task_id, function.language, function.function_descriptor,
worker_context_.GetCurrentJobID(),
worker_context_.GetCurrentTaskID(), task_index, num_returns,
required_resources, required_placement_resources);
builder.SetCommonTaskSpec(
task_id, function.GetLanguage(), function.GetFunctionDescriptor(),
worker_context_.GetCurrentJobID(), worker_context_.GetCurrentTaskID(), task_index,
num_returns, required_resources, required_placement_resources);
// Set task arguments.
for (const auto &arg : args) {
if (arg.IsPassedByReference()) {
@@ -177,8 +177,8 @@ Status CoreWorkerTaskInterface::CreateActor(
actor_creation_options.is_direct_call);
*actor_handle = std::unique_ptr<ActorHandle>(new ActorHandle(
actor_id, ActorHandleID::Nil(), function.language,
actor_creation_options.is_direct_call, function.function_descriptor));
actor_id, ActorHandleID::Nil(), function.GetLanguage(),
actor_creation_options.is_direct_call, function.GetFunctionDescriptor()));
(*actor_handle)->IncreaseTaskCounter();
(*actor_handle)->SetActorCursor(return_ids[0]);
+3 -3
View File
@@ -22,13 +22,13 @@ class CoreWorker;
/// Options of a non-actor-creation task.
struct TaskOptions {
  TaskOptions() {}

  /// `resources` is only copied, so it is taken by const reference. This accepts
  /// const maps and temporaries as well as mutable lvalues.
  ///
  /// \param[in] num_returns Number of returns of this task.
  /// \param[in] resources Resources required by this task.
  TaskOptions(int num_returns, const std::unordered_map<std::string, double> &resources)
      : num_returns(num_returns), resources(resources) {}

  // The members are intentionally non-const so that a default-constructed
  // TaskOptions can be assigned to later.

  /// Number of returns of this task.
  int num_returns = 1;
  /// Resources required by this task.
  std::unordered_map<std::string, double> resources;
};
/// Options of an actor creation task.
+24 -25
View File
@@ -49,7 +49,7 @@ std::shared_ptr<Buffer> GenerateRandomBuffer() {
}
std::unique_ptr<ActorHandle> CreateActorHelper(
CoreWorker &worker, const std::unordered_map<std::string, double> &resources,
CoreWorker &worker, std::unordered_map<std::string, double> &resources,
bool is_direct_call, uint64_t max_reconstructions) {
std::unique_ptr<ActorHandle> actor_handle;
@@ -57,7 +57,7 @@ std::unique_ptr<ActorHandle> CreateActorHelper(
uint8_t array[] = {1, 2, 3};
auto buffer = std::make_shared<LocalMemoryBuffer>(array, sizeof(array));
RayFunction func{ray::Language::PYTHON, {"actor creation task"}};
RayFunction func(ray::Language::PYTHON, {"actor creation task"});
std::vector<TaskArg> args;
args.emplace_back(TaskArg::PassByValue(std::make_shared<RayObject>(buffer, nullptr)));
@@ -171,22 +171,22 @@ class CoreWorkerTest : public ::testing::Test {
void TestStoreProvider(StoreProviderType type);
// Test normal tasks.
void TestNormalTask(const std::unordered_map<std::string, double> &resources);
void TestNormalTask(std::unordered_map<std::string, double> &resources);
// Test actor tasks.
void TestActorTask(const std::unordered_map<std::string, double> &resources,
void TestActorTask(std::unordered_map<std::string, double> &resources,
bool is_direct_call);
// Test actor failure case, verify that the tasks would either succeed or
// fail with exceptions, in that case the return objects fetched from `Get`
// contain errors.
void TestActorFailure(const std::unordered_map<std::string, double> &resources,
void TestActorFailure(std::unordered_map<std::string, double> &resources,
bool is_direct_call);
// Test actor failover case. Verify that actor can be reconstructed successfully,
// and as long as we wait for actor reconstruction before submitting new tasks,
// it is guaranteed that all tasks are successfully completed.
void TestActorReconstruction(const std::unordered_map<std::string, double> &resources,
void TestActorReconstruction(std::unordered_map<std::string, double> &resources,
bool is_direct_call);
protected:
@@ -216,8 +216,7 @@ bool CoreWorkerTest::WaitForDirectCallActorState(CoreWorker &worker,
return WaitForCondition(condition_func, timeout_ms);
}
void CoreWorkerTest::TestNormalTask(
const std::unordered_map<std::string, double> &resources) {
void CoreWorkerTest::TestNormalTask(std::unordered_map<std::string, double> &resources) {
CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0],
raylet_socket_names_[0], NextJobId(), gcs_options_, "", nullptr);
@@ -236,7 +235,7 @@ void CoreWorkerTest::TestNormalTask(
TaskArg::PassByValue(std::make_shared<RayObject>(buffer1, nullptr)));
args.emplace_back(TaskArg::PassByReference(object_id));
RayFunction func{ray::Language::PYTHON, {}};
RayFunction func(ray::Language::PYTHON, {});
TaskOptions options;
std::vector<ObjectID> return_ids;
@@ -258,8 +257,8 @@ void CoreWorkerTest::TestNormalTask(
}
}
void CoreWorkerTest::TestActorTask(
const std::unordered_map<std::string, double> &resources, bool is_direct_call) {
void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &resources,
bool is_direct_call) {
CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0],
raylet_socket_names_[0], NextJobId(), gcs_options_, "", nullptr);
@@ -281,7 +280,7 @@ void CoreWorkerTest::TestActorTask(
TaskOptions options{1, resources};
std::vector<ObjectID> return_ids;
RayFunction func{ray::Language::PYTHON, {}};
RayFunction func(ray::Language::PYTHON, {});
RAY_CHECK_OK(driver.Tasks().SubmitActorTask(*actor_handle, func, args, options,
&return_ids));
@@ -323,7 +322,7 @@ void CoreWorkerTest::TestActorTask(
TaskOptions options{1, resources};
std::vector<ObjectID> return_ids;
RayFunction func{ray::Language::PYTHON, {}};
RayFunction func(ray::Language::PYTHON, {});
auto status =
driver.Tasks().SubmitActorTask(*actor_handle, func, args, options, &return_ids);
if (is_direct_call) {
@@ -348,7 +347,7 @@ void CoreWorkerTest::TestActorTask(
}
void CoreWorkerTest::TestActorReconstruction(
const std::unordered_map<std::string, double> &resources, bool is_direct_call) {
std::unordered_map<std::string, double> &resources, bool is_direct_call) {
CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0],
raylet_socket_names_[0], NextJobId(), gcs_options_, "", nullptr);
@@ -389,7 +388,7 @@ void CoreWorkerTest::TestActorReconstruction(
TaskOptions options{1, resources};
std::vector<ObjectID> return_ids;
RayFunction func{ray::Language::PYTHON, {}};
RayFunction func(ray::Language::PYTHON, {});
auto status =
driver.Tasks().SubmitActorTask(*actor_handle, func, args, options, &return_ids);
@@ -405,8 +404,8 @@ void CoreWorkerTest::TestActorReconstruction(
}
}
void CoreWorkerTest::TestActorFailure(
const std::unordered_map<std::string, double> &resources, bool is_direct_call) {
void CoreWorkerTest::TestActorFailure(std::unordered_map<std::string, double> &resources,
bool is_direct_call) {
CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0],
raylet_socket_names_[0], NextJobId(), gcs_options_, "", nullptr);
@@ -435,7 +434,7 @@ void CoreWorkerTest::TestActorFailure(
TaskOptions options{1, resources};
std::vector<ObjectID> return_ids;
RayFunction func{ray::Language::PYTHON, {}};
RayFunction func(ray::Language::PYTHON, {});
auto status =
driver.Tasks().SubmitActorTask(*actor_handle, func, args, options, &return_ids);
@@ -641,7 +640,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) {
// to benchmark performance.
uint8_t array[] = {1, 2, 3};
auto buffer = std::make_shared<LocalMemoryBuffer>(array, sizeof(array));
RayFunction function{ray::Language::PYTHON, {}};
RayFunction function(ray::Language::PYTHON, {});
std::vector<TaskArg> args;
args.emplace_back(TaskArg::PassByValue(std::make_shared<RayObject>(buffer, nullptr)));
@@ -649,8 +648,8 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) {
ActorCreationOptions actor_options{0, /*is_direct_call*/ true, resources, {}};
const auto job_id = NextJobId();
ActorHandle actor_handle(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1),
ActorHandleID::Nil(), function.language, true,
function.function_descriptor);
ActorHandleID::Nil(), function.GetLanguage(), true,
function.GetFunctionDescriptor());
// Manually create `num_tasks` task specs, and for each of them create a
// `PushTaskRequest`, this is to batch performance of TaskSpec
@@ -664,8 +663,8 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) {
auto num_returns = options.num_returns;
TaskSpecBuilder builder;
builder.SetCommonTaskSpec(RandomTaskId(), function.language,
function.function_descriptor, job_id, RandomTaskId(), 0,
builder.SetCommonTaskSpec(RandomTaskId(), function.GetLanguage(),
function.GetFunctionDescriptor(), job_id, RandomTaskId(), 0,
num_returns, resources, resources);
// Set task arguments.
for (const auto &arg : args) {
@@ -703,7 +702,7 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) {
// Test creating actor.
uint8_t array[] = {1, 2, 3};
auto buffer = std::make_shared<LocalMemoryBuffer>(array, sizeof(array));
RayFunction func{ray::Language::PYTHON, {}};
RayFunction func(ray::Language::PYTHON, {});
std::vector<TaskArg> args;
args.emplace_back(TaskArg::PassByValue(std::make_shared<RayObject>(buffer, nullptr)));
@@ -725,7 +724,7 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) {
TaskOptions options{1, resources};
std::vector<ObjectID> return_ids;
RayFunction func{ray::Language::PYTHON, {}};
RayFunction func(ray::Language::PYTHON, {});
RAY_CHECK_OK(
driver.Tasks().SubmitActorTask(*actor_handle, func, args, options, &return_ids));