From a5d7de6aaf7114c629c10cf71fb33dbd8c03576b Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Sat, 14 Sep 2019 13:02:53 -0700 Subject: [PATCH] [core worker] Python core worker normal task submission (#5566) --- python/ray/_raylet.pyx | 71 +++++++++++++++++-- python/ray/includes/common.pxd | 56 +++++++++++++-- python/ray/includes/libcoreworker.pxd | 27 +++++-- python/ray/worker.py | 11 +-- src/ray/core_worker/common.h | 18 +++-- src/ray/core_worker/context.cc | 2 +- .../java/org_ray_runtime_RayNativeRuntime.cc | 2 +- .../memory_store/memory_store.h | 20 ------ src/ray/core_worker/task_interface.cc | 12 ++-- src/ray/core_worker/task_interface.h | 6 +- src/ray/core_worker/test/core_worker_test.cc | 49 +++++++------ 11 files changed, 194 insertions(+), 80 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index dba03b2fe..c0bd06926 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -27,6 +27,8 @@ from ray.includes.common cimport ( CRayObject, CRayStatus, CGcsClientOptions, + CTaskArg, + CRayFunction, LocalMemoryBuffer, LANGUAGE_CPP, LANGUAGE_JAVA, @@ -46,7 +48,7 @@ from ray.includes.unique_ids cimport ( CObjectID, CClientID, ) -from ray.includes.libcoreworker cimport CCoreWorker +from ray.includes.libcoreworker cimport CCoreWorker, CTaskOptions from ray.includes.task cimport CTaskSpec from ray.includes.ray_config cimport RayConfig from ray.exceptions import RayletError, ObjectStoreFullError @@ -96,6 +98,13 @@ cdef int check_status(const CRayStatus& status) nogil except -1: raise RayletError(message) +cdef VectorToObjectIDs(const c_vector[CObjectID] &object_ids): + result = [] + for i in range(object_ids.size()): + result.append(ObjectID(object_ids[i].Binary())) + return result + + cdef c_vector[CObjectID] ObjectIDsToVector(object_ids): """A helper function that converts a Python list of object IDs to a vector. @@ -244,6 +253,16 @@ cdef unordered_map[c_string, double] resource_map_from_dict(resource_map): return out +cdef c_vector[c_string] string_vector_from_list(list string_list): + cdef: + c_vector[c_string] out + for s in string_list: + if not isinstance(s, bytes): + raise TypeError("string_list elements must be bytes") + out.push_back(s) + return out + + cdef class RayletClient: cdef CRayletClient* client @@ -400,6 +419,10 @@ cdef class CoreWorker: "outside _raylet. See __init__.py for " "details.") + def disconnect(self): + with nogil: + self.core_worker.get().Disconnect() + def get_objects(self, object_ids, TaskID current_task_id): cdef: c_vector[shared_ptr[CRayObject]] results @@ -538,6 +561,48 @@ cdef class CoreWorker: with nogil: self.core_worker.get().SetCurrentJobId(c_job_id) + def submit_task(self, + function_descriptor, + args, + int num_return_vals, + resources): + cdef: + unordered_map[c_string, double] c_resources + CTaskOptions task_options + CRayFunction ray_function + c_vector[CTaskArg] args_vector + c_vector[CObjectID] return_ids + c_string pickled_str + shared_ptr[CBuffer] arg_data + shared_ptr[CBuffer] arg_metadata + + c_resources = resource_map_from_dict(resources) + task_options = CTaskOptions(num_return_vals, c_resources) + ray_function = CRayFunction( + LANGUAGE_PYTHON, string_vector_from_list(function_descriptor)) + + for arg in args: + if isinstance(arg, ObjectID): + args_vector.push_back( + CTaskArg.PassByReference((arg).native())) + else: + pickled_str = pickle.dumps( + arg, protocol=pickle.HIGHEST_PROTOCOL) + arg_data = dynamic_pointer_cast[CBuffer, LocalMemoryBuffer]( + make_shared[LocalMemoryBuffer]( + (pickled_str.data()), + pickled_str.size(), + True)) + args_vector.push_back( + CTaskArg.PassByValue( + make_shared[CRayObject](arg_data, arg_metadata))) + + with nogil: + check_status(self.core_worker.get().Tasks().SubmitTask( + ray_function, args_vector, task_options, &return_ids)) + + return VectorToObjectIDs(return_ids) + def set_object_store_client_options(self, c_string client_name, int64_t limit_bytes): with nogil: @@ -552,7 +617,3 @@ cdef class CoreWorker: message = self.core_worker.get().Objects().MemoryUsageString() return message.decode("utf-8") - - def disconnect(self): - with nogil: - self.core_worker.get().Disconnect() diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 8feeec351..beb716ac2 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -2,11 +2,13 @@ from libcpp cimport bool as c_bool from libcpp.memory cimport shared_ptr from libcpp.string cimport string as c_string -from libc.stdint cimport uint8_t +from libc.stdint cimport uint8_t, uint64_t, int64_t from libcpp.unordered_map cimport unordered_map from libcpp.vector cimport vector as c_vector from ray.includes.unique_ids cimport ( + CActorID, + CActorHandleID, CJobID, CWorkerID, CObjectID, @@ -138,15 +140,57 @@ cdef extern from "ray/common/buffer.h" namespace "ray" nogil: size_t Size() const cdef cppclass LocalMemoryBuffer(CBuffer): - LocalMemoryBuffer(uint8_t *data, size_t size) + LocalMemoryBuffer(uint8_t *data, size_t size, c_bool copy_data) -cdef extern from "ray/core_worker/store_provider/store_provider.h" nogil: +cdef extern from "ray/common/ray_object.h" nogil: cdef cppclass CRayObject "ray::RayObject": - const shared_ptr[CBuffer] &GetData() - const size_t DataSize() const - const shared_ptr[CBuffer] &GetMetadata() const c_bool HasData() const c_bool HasMetadata() const + const size_t DataSize() const + const shared_ptr[CBuffer] &GetData() + const shared_ptr[CBuffer] &GetMetadata() const + +cdef extern from "ray/core_worker/common.h" nogil: + cdef cppclass CRayFunction "ray::RayFunction": + CRayFunction() + CRayFunction(CLanguage language, + const c_vector[c_string] function_descriptor) + CLanguage GetLanguage() + c_vector[c_string] GetFunctionDescriptor() + + cdef cppclass CTaskArg "ray::TaskArg": + @staticmethod + CTaskArg PassByReference(const CObjectID &object_id) + + @staticmethod + CTaskArg PassByValue(const shared_ptr[CRayObject] &data) + +cdef extern from "ray/core_worker/task_interface.h" nogil: + cdef cppclass CTaskOptions "ray::TaskOptions": + CTaskOptions() + CTaskOptions(int num_returns, + unordered_map[c_string, double] &resources) + + cdef cppclass CActorCreationOptions "ray::ActorCreationOptions": + CActorCreationOptions(uint64_t max_reconstructions, + const unordered_map[c_string, double] &resources) + + cdef cppclass CActorHandle "ray::ActorHandle": + CActorHandle( + const CActorID &actor_id, const CActorHandleID &actor_handle_id, + const CLanguage actor_language, + const c_vector[c_string] &actor_creation_task_function_descriptor) + + CActorHandle(const CActorHandle &other) + CActorID ActorID() const + CActorHandleID ActorHandleID() const + c_vector[c_string] ActorCreationTaskFunctionDescriptor() const + CObjectID ActorCursor() const + int64_t TaskCursor() const + int64_t NumForks() const + CActorHandle Fork() + void Serialize(c_string *output) + CActorHandle Deserialize(const c_string &data) cdef extern from "ray/gcs/gcs_client_interface.h" nogil: cdef cppclass CGcsClientOptions "ray::gcs::GcsClientOptions": diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index bcf747694..3199ed451 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -1,6 +1,6 @@ from libc.stdint cimport int64_t from libcpp cimport bool as c_bool -from libcpp.memory cimport shared_ptr +from libcpp.memory cimport shared_ptr, unique_ptr from libcpp.string cimport string as c_string from libcpp.vector cimport vector as c_vector @@ -10,9 +10,14 @@ from ray.includes.unique_ids cimport ( CObjectID, ) from ray.includes.common cimport ( + CActorCreationOptions, + CActorHandle, CBuffer, - CRayStatus, + CRayFunction, CRayObject, + CRayStatus, + CTaskArg, + CTaskOptions, CWorkerType, CLanguage, CGcsClientOptions, @@ -20,6 +25,20 @@ from ray.includes.common cimport ( from ray.includes.libraylet cimport CRayletClient +cdef extern from "ray/core_worker/task_interface.h" namespace "ray" nogil: + cdef cppclass CTaskSubmissionInterface "CoreWorkerTaskInterface": + CRayStatus SubmitTask( + const CRayFunction &function, const c_vector[CTaskArg] &args, + const CTaskOptions &options, c_vector[CObjectID] *return_ids) + CRayStatus CreateActor( + const CRayFunction &function, const c_vector[CTaskArg] &args, + const CActorCreationOptions &options, + unique_ptr[CActorHandle] *handle) + CRayStatus SubmitActorTask( + CActorHandle &handle, const CRayFunction &function, + const c_vector[CTaskArg] &args, const CTaskOptions &options, + c_vector[CObjectID] *return_ids) + cdef extern from "ray/core_worker/object_interface.h" nogil: cdef cppclass CObjectInterface "ray::CoreWorkerObjectInterface": CRayStatus SetClientOptions(c_string client_name, int64_t limit) @@ -50,13 +69,13 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CWorkerType &GetWorkerType() CLanguage &GetLanguage() CObjectInterface &Objects() - # CTaskSubmissionInterface &Tasks() + CTaskSubmissionInterface &Tasks() # CTaskExecutionInterface &Execution() # TODO(edoakes): remove this once the raylet client is no longer used # directly. CRayletClient &GetRayletClient() - # TODO(edoakes): remove this once the Python core worker uses the task + # TODO(edoakes): remove these once the Python core worker uses the task # interfaces void SetCurrentJobId(const CJobID &job_id) void SetCurrentTaskId(const CTaskID &task_id) diff --git a/python/ray/worker.py b/python/ray/worker.py index 0d9673e8a..bdc0e09ef 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -707,11 +707,14 @@ class Worker(object): self.current_job_id, self.current_task_id, self.task_context.task_index, actor_id) else: - # This is a normal task. - task_id = TaskID.for_normal_task(self.current_job_id, - self.current_task_id, - self.task_context.task_index) + # Normal tasks are submitted through the core worker (in the + # future, all tasks will be). + return self.core_worker.submit_task(function_descriptor_list, + args_for_raylet, + num_return_vals, resources) + # Actor creation tasks and actor tasks are submitted directly to + # the raylet. task = ray._raylet.TaskSpec( task_id, job_id, diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index ad540e083..f57ae3995 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -13,11 +13,19 @@ namespace ray { using WorkerType = rpc::WorkerType; /// Information about a remote function. -struct RayFunction { - /// Language of the remote function. - const Language language; - /// Function descriptor of the remote function. - const std::vector function_descriptor; +class RayFunction { + public: + RayFunction() {} + RayFunction(Language language, const std::vector &function_descriptor) + : language_(language), function_descriptor_(function_descriptor) {} + + Language GetLanguage() const { return language_; } + + std::vector GetFunctionDescriptor() const { return function_descriptor_; } + + private: + Language language_; + std::vector function_descriptor_; }; /// Argument of a task. diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index 82f3c80bb..863dc8c37 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -83,7 +83,7 @@ void WorkerContext::SetCurrentTaskId(const TaskID &task_id) { } void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) { - current_job_id_ = task_spec.JobId(); + SetCurrentJobId(task_spec.JobId()); GetThreadContext().SetCurrentTask(task_spec); if (task_spec.IsActorCreationTask()) { RAY_CHECK(current_actor_id_.IsNil()); diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc index ea9082188..084a622ac 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc @@ -46,7 +46,7 @@ JNIEXPORT jlong JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeInitCoreWork RAY_CHECK(local_java_task_executor); // convert RayFunction jobject ray_function_array_list = - NativeStringVectorToJavaStringList(env, ray_function.function_descriptor); + NativeStringVectorToJavaStringList(env, ray_function.GetFunctionDescriptor()); // convert args // TODO (kfstorm): Avoid copying binary data from Java to C++ jobject args_array_list = NativeVectorToJavaList>( diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.h b/src/ray/core_worker/store_provider/memory_store/memory_store.h index 8af5f2540..0a8422f81 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.h +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.h @@ -26,26 +26,6 @@ class CoreWorkerMemoryStore { /// \return Status. Status Put(const ObjectID &object_id, const RayObject &object); - /// Create and return a buffer in the object store that can be directly written - /// into. After writing to the buffer, the caller must call `Seal()` to finalize - /// the object. The `Create()` and `Seal()` combination is an alternative interface - /// to `Put()` that allows frontends to avoid an extra copy when possible. - /// - /// \param[in] metadata Metadata of the object to be written. - /// \param[in] data_size Size of the object to be written. - /// \param[in] object_id Object ID specified by the user. - /// \param[out] data Buffer for the user to write the object into. - /// \return Status. - Status Create(const std::shared_ptr &metadata, const size_t data_size, - const ObjectID &object_id, std::shared_ptr *data); - - /// Finalize placing an object into the object store. This should be called after - /// a corresponding `Create()` call and then writing into the returned buffer. - /// - /// \param[in] object_id Object ID corresponding to the object. - /// \return Status. - Status Seal(const ObjectID &object_id); - /// Get a list of objects from the object store. /// /// \param[in] object_ids IDs of the objects to get. Duplicates are not allowed. diff --git a/src/ray/core_worker/task_interface.cc b/src/ray/core_worker/task_interface.cc index 09774cd0e..5eeb4c32f 100644 --- a/src/ray/core_worker/task_interface.cc +++ b/src/ray/core_worker/task_interface.cc @@ -121,10 +121,10 @@ void CoreWorkerTaskInterface::BuildCommonTaskSpec( const std::unordered_map &required_placement_resources, TaskTransportType transport_type, std::vector *return_ids) { // Build common task spec. - builder.SetCommonTaskSpec(task_id, function.language, function.function_descriptor, - worker_context_.GetCurrentJobID(), - worker_context_.GetCurrentTaskID(), task_index, num_returns, - required_resources, required_placement_resources); + builder.SetCommonTaskSpec( + task_id, function.GetLanguage(), function.GetFunctionDescriptor(), + worker_context_.GetCurrentJobID(), worker_context_.GetCurrentTaskID(), task_index, + num_returns, required_resources, required_placement_resources); // Set task arguments. for (const auto &arg : args) { if (arg.IsPassedByReference()) { @@ -177,8 +177,8 @@ Status CoreWorkerTaskInterface::CreateActor( actor_creation_options.is_direct_call); *actor_handle = std::unique_ptr(new ActorHandle( - actor_id, ActorHandleID::Nil(), function.language, - actor_creation_options.is_direct_call, function.function_descriptor)); + actor_id, ActorHandleID::Nil(), function.GetLanguage(), + actor_creation_options.is_direct_call, function.GetFunctionDescriptor())); (*actor_handle)->IncreaseTaskCounter(); (*actor_handle)->SetActorCursor(return_ids[0]); diff --git a/src/ray/core_worker/task_interface.h b/src/ray/core_worker/task_interface.h index 35f481b41..802a7a504 100644 --- a/src/ray/core_worker/task_interface.h +++ b/src/ray/core_worker/task_interface.h @@ -22,13 +22,13 @@ class CoreWorker; /// Options of a non-actor-creation task. struct TaskOptions { TaskOptions() {} - TaskOptions(int num_returns, const std::unordered_map &resources) + TaskOptions(int num_returns, std::unordered_map &resources) : num_returns(num_returns), resources(resources) {} /// Number of returns of this task. - const int num_returns = 1; + int num_returns = 1; /// Resources required by this task. - const std::unordered_map resources; + std::unordered_map resources; }; /// Options of an actor creation task. diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 35a37ef60..b5853463d 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -49,7 +49,7 @@ std::shared_ptr GenerateRandomBuffer() { } std::unique_ptr CreateActorHelper( - CoreWorker &worker, const std::unordered_map &resources, + CoreWorker &worker, std::unordered_map &resources, bool is_direct_call, uint64_t max_reconstructions) { std::unique_ptr actor_handle; @@ -57,7 +57,7 @@ std::unique_ptr CreateActorHelper( uint8_t array[] = {1, 2, 3}; auto buffer = std::make_shared(array, sizeof(array)); - RayFunction func{ray::Language::PYTHON, {"actor creation task"}}; + RayFunction func(ray::Language::PYTHON, {"actor creation task"}); std::vector args; args.emplace_back(TaskArg::PassByValue(std::make_shared(buffer, nullptr))); @@ -171,22 +171,22 @@ class CoreWorkerTest : public ::testing::Test { void TestStoreProvider(StoreProviderType type); // Test normal tasks. - void TestNormalTask(const std::unordered_map &resources); + void TestNormalTask(std::unordered_map &resources); // Test actor tasks. - void TestActorTask(const std::unordered_map &resources, + void TestActorTask(std::unordered_map &resources, bool is_direct_call); // Test actor failure case, verify that the tasks would either succeed or // fail with exceptions, in that case the return objects fetched from `Get` // contain errors. - void TestActorFailure(const std::unordered_map &resources, + void TestActorFailure(std::unordered_map &resources, bool is_direct_call); // Test actor failover case. Verify that actor can be reconstructed successfully, // and as long as we wait for actor reconstruction before submitting new tasks, // it is guaranteed that all tasks are successfully completed. - void TestActorReconstruction(const std::unordered_map &resources, + void TestActorReconstruction(std::unordered_map &resources, bool is_direct_call); protected: @@ -216,8 +216,7 @@ bool CoreWorkerTest::WaitForDirectCallActorState(CoreWorker &worker, return WaitForCondition(condition_func, timeout_ms); } -void CoreWorkerTest::TestNormalTask( - const std::unordered_map &resources) { +void CoreWorkerTest::TestNormalTask(std::unordered_map &resources) { CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], raylet_socket_names_[0], NextJobId(), gcs_options_, "", nullptr); @@ -236,7 +235,7 @@ void CoreWorkerTest::TestNormalTask( TaskArg::PassByValue(std::make_shared(buffer1, nullptr))); args.emplace_back(TaskArg::PassByReference(object_id)); - RayFunction func{ray::Language::PYTHON, {}}; + RayFunction func(ray::Language::PYTHON, {}); TaskOptions options; std::vector return_ids; @@ -258,8 +257,8 @@ void CoreWorkerTest::TestNormalTask( } } -void CoreWorkerTest::TestActorTask( - const std::unordered_map &resources, bool is_direct_call) { +void CoreWorkerTest::TestActorTask(std::unordered_map &resources, + bool is_direct_call) { CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], raylet_socket_names_[0], NextJobId(), gcs_options_, "", nullptr); @@ -281,7 +280,7 @@ void CoreWorkerTest::TestActorTask( TaskOptions options{1, resources}; std::vector return_ids; - RayFunction func{ray::Language::PYTHON, {}}; + RayFunction func(ray::Language::PYTHON, {}); RAY_CHECK_OK(driver.Tasks().SubmitActorTask(*actor_handle, func, args, options, &return_ids)); @@ -323,7 +322,7 @@ void CoreWorkerTest::TestActorTask( TaskOptions options{1, resources}; std::vector return_ids; - RayFunction func{ray::Language::PYTHON, {}}; + RayFunction func(ray::Language::PYTHON, {}); auto status = driver.Tasks().SubmitActorTask(*actor_handle, func, args, options, &return_ids); if (is_direct_call) { @@ -348,7 +347,7 @@ void CoreWorkerTest::TestActorTask( } void CoreWorkerTest::TestActorReconstruction( - const std::unordered_map &resources, bool is_direct_call) { + std::unordered_map &resources, bool is_direct_call) { CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], raylet_socket_names_[0], NextJobId(), gcs_options_, "", nullptr); @@ -389,7 +388,7 @@ void CoreWorkerTest::TestActorReconstruction( TaskOptions options{1, resources}; std::vector return_ids; - RayFunction func{ray::Language::PYTHON, {}}; + RayFunction func(ray::Language::PYTHON, {}); auto status = driver.Tasks().SubmitActorTask(*actor_handle, func, args, options, &return_ids); @@ -405,8 +404,8 @@ void CoreWorkerTest::TestActorReconstruction( } } -void CoreWorkerTest::TestActorFailure( - const std::unordered_map &resources, bool is_direct_call) { +void CoreWorkerTest::TestActorFailure(std::unordered_map &resources, + bool is_direct_call) { CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], raylet_socket_names_[0], NextJobId(), gcs_options_, "", nullptr); @@ -435,7 +434,7 @@ void CoreWorkerTest::TestActorFailure( TaskOptions options{1, resources}; std::vector return_ids; - RayFunction func{ray::Language::PYTHON, {}}; + RayFunction func(ray::Language::PYTHON, {}); auto status = driver.Tasks().SubmitActorTask(*actor_handle, func, args, options, &return_ids); @@ -641,7 +640,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { // to benchmark performance. uint8_t array[] = {1, 2, 3}; auto buffer = std::make_shared(array, sizeof(array)); - RayFunction function{ray::Language::PYTHON, {}}; + RayFunction function(ray::Language::PYTHON, {}); std::vector args; args.emplace_back(TaskArg::PassByValue(std::make_shared(buffer, nullptr))); @@ -649,8 +648,8 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { ActorCreationOptions actor_options{0, /*is_direct_call*/ true, resources, {}}; const auto job_id = NextJobId(); ActorHandle actor_handle(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1), - ActorHandleID::Nil(), function.language, true, - function.function_descriptor); + ActorHandleID::Nil(), function.GetLanguage(), true, + function.GetFunctionDescriptor()); // Manually create `num_tasks` task specs, and for each of them create a // `PushTaskRequest`, this is to batch performance of TaskSpec @@ -664,8 +663,8 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { auto num_returns = options.num_returns; TaskSpecBuilder builder; - builder.SetCommonTaskSpec(RandomTaskId(), function.language, - function.function_descriptor, job_id, RandomTaskId(), 0, + builder.SetCommonTaskSpec(RandomTaskId(), function.GetLanguage(), + function.GetFunctionDescriptor(), job_id, RandomTaskId(), 0, num_returns, resources, resources); // Set task arguments. for (const auto &arg : args) { @@ -703,7 +702,7 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) { // Test creating actor. uint8_t array[] = {1, 2, 3}; auto buffer = std::make_shared(array, sizeof(array)); - RayFunction func{ray::Language::PYTHON, {}}; + RayFunction func(ray::Language::PYTHON, {}); std::vector args; args.emplace_back(TaskArg::PassByValue(std::make_shared(buffer, nullptr))); @@ -725,7 +724,7 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) { TaskOptions options{1, resources}; std::vector return_ids; - RayFunction func{ray::Language::PYTHON, {}}; + RayFunction func(ray::Language::PYTHON, {}); RAY_CHECK_OK( driver.Tasks().SubmitActorTask(*actor_handle, func, args, options, &return_ids));