diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 5be509a7d..339c4f3a1 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -109,7 +109,8 @@ include "includes/serialization.pxi" include "includes/libcoreworker.pxi" include "includes/global_state_accessor.pxi" -# Expose GCC & Clang macro to report whether C++ optimizations were enabled during compilation. +# Expose GCC & Clang macro to report whether C++ optimizations were enabled +# during compilation. OPTIMIZED = __OPTIMIZE__ logger = logging.getLogger(__name__) @@ -953,11 +954,10 @@ cdef class CoreWorker: prepare_args(self, language, args, &args_vector) with nogil: - check_status( - CCoreWorkerProcess.GetCoreWorker().SubmitActorTask( - c_actor_id, - ray_function, - args_vector, task_options, &return_ids)) + CCoreWorkerProcess.GetCoreWorker().SubmitActorTask( + c_actor_id, + ray_function, + args_vector, task_options, &return_ids) return VectorToObjectIDs(return_ids) diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 84f4cc054..a9a8c43d6 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -89,7 +89,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const CRayFunction &function, const c_vector[CTaskArg] &args, const CActorCreationOptions &options, const c_string &extension_data, CActorID *actor_id) - CRayStatus SubmitActorTask( + void SubmitActorTask( const CActorID &actor_id, const CRayFunction &function, const c_vector[CTaskArg] &args, const CTaskOptions &options, c_vector[CObjectID] *return_ids) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index ae193e96e..535c02442 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1171,12 +1171,12 @@ Status CoreWorker::CreateActor(const RayFunction &function, return status; } -Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &function, - const std::vector &args, - const TaskOptions &task_options, - std::vector *return_ids) { +void CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &function, + const std::vector &args, + const TaskOptions &task_options, + std::vector *return_ids) { ActorHandle *actor_handle = nullptr; - RAY_RETURN_NOT_OK(GetActorHandle(actor_id, &actor_handle)); + RAY_CHECK_OK(GetActorHandle(actor_id, &actor_handle)); // Add one for actor cursor object id for tasks. const int num_returns = task_options.num_returns + 1; @@ -1199,16 +1199,16 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &f return_ids->pop_back(); // Submit task. - Status status; TaskSpecification task_spec = builder.Build(); if (options_.is_local_mode) { ExecuteTaskLocalMode(task_spec, actor_id); } else { task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, CurrentCallSite(), actor_handle->MaxTaskRetries()); - status = direct_actor_submitter_->SubmitTask(task_spec); + io_service_.post([this, task_spec]() { + RAY_UNUSED(direct_actor_submitter_->SubmitTask(task_spec)); + }); } - return status; } Status CoreWorker::CancelTask(const ObjectID &object_id, bool force_kill) { diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 728a67b8f..816b41439 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -579,10 +579,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \return Status error if the task is invalid or if the task submission /// failed. Tasks can be invalid for direct actor calls because not all tasks /// are currently supported. - Status SubmitActorTask(const ActorID &actor_id, const RayFunction &function, - const std::vector &args, - const TaskOptions &task_options, - std::vector *return_ids); + void SubmitActorTask(const ActorID &actor_id, const RayFunction &function, + const std::vector &args, const TaskOptions &task_options, + std::vector *return_ids); /// Tell an actor to exit immediately, without completing outstanding work. /// diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc index e76f02c89..dda4f1d12 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc @@ -170,10 +170,9 @@ Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSubmitActorTask( auto task_options = ToTaskOptions(env, numReturns, callOptions); std::vector return_ids; - auto status = ray::CoreWorkerProcess::GetCoreWorker().SubmitActorTask( + ray::CoreWorkerProcess::GetCoreWorker().SubmitActorTask( actor_id, ray_function, task_args, task_options, &return_ids); - THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr); return NativeIdVectorToJavaByteArrayList(env, return_ids); } diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 2242bfcfe..193cf2899 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -225,8 +225,8 @@ int CoreWorkerTest::GetActorPid(const ActorID &actor_id, RayFunction func{Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "GetWorkerPid", "", "", "")}; - RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().SubmitActorTask(actor_id, func, args, - options, &return_ids)); + CoreWorkerProcess::GetCoreWorker().SubmitActorTask(actor_id, func, args, options, + &return_ids); std::vector> results; RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().Get(return_ids, -1, &results)); @@ -306,7 +306,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); - RAY_CHECK_OK(driver.SubmitActorTask(actor_id, func, args, options, &return_ids)); + driver.SubmitActorTask(actor_id, func, args, options, &return_ids); ASSERT_EQ(return_ids.size(), 1); ASSERT_TRUE(return_ids[0].IsReturnObject()); @@ -348,8 +348,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso std::vector return_ids; RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); - auto status = driver.SubmitActorTask(actor_id, func, args, options, &return_ids); - ASSERT_TRUE(status.ok()); + driver.SubmitActorTask(actor_id, func, args, options, &return_ids); ASSERT_EQ(return_ids.size(), 1); @@ -412,7 +411,7 @@ void CoreWorkerTest::TestActorRestart( RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); - RAY_CHECK_OK(driver.SubmitActorTask(actor_id, func, args, options, &return_ids)); + driver.SubmitActorTask(actor_id, func, args, options, &return_ids); ASSERT_EQ(return_ids.size(), 1); // Verify if it's expected data. std::vector> results; @@ -455,7 +454,7 @@ void CoreWorkerTest::TestActorFailure( RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); - RAY_CHECK_OK(driver.SubmitActorTask(actor_id, func, args, options, &return_ids)); + driver.SubmitActorTask(actor_id, func, args, options, &return_ids); ASSERT_EQ(return_ids.size(), 1); all_results.emplace_back(std::make_pair(return_ids[0], buffer1)); @@ -606,7 +605,7 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) { RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); - RAY_CHECK_OK(driver.SubmitActorTask(actor_id, func, args, options, &return_ids)); + driver.SubmitActorTask(actor_id, func, args, options, &return_ids); ASSERT_EQ(return_ids.size(), 1); object_ids.emplace_back(return_ids[0]); } diff --git a/streaming/src/queue/transport.cc b/streaming/src/queue/transport.cc index 427e5cab5..fa819d5e9 100644 --- a/streaming/src/queue/transport.cc +++ b/streaming/src/queue/transport.cc @@ -29,11 +29,8 @@ void Transport::SendInternal(std::shared_ptr buffer, std::move(buffer), meta, std::vector(), true))); std::vector> results; - ray::Status st = CoreWorkerProcess::GetCoreWorker().SubmitActorTask( - peer_actor_id_, function, args, options, &return_ids); - if (!st.ok()) { - STREAMING_LOG(ERROR) << "SubmitActorTask failed. " << st; - } + CoreWorkerProcess::GetCoreWorker().SubmitActorTask(peer_actor_id_, function, args, + options, &return_ids); } void Transport::Send(std::shared_ptr buffer) { diff --git a/streaming/src/test/queue_tests_base.h b/streaming/src/test/queue_tests_base.h index b78fb3fbc..42d259919 100644 --- a/streaming/src/test/queue_tests_base.h +++ b/streaming/src/test/queue_tests_base.h @@ -91,7 +91,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython("", "", "init", "")}; - RAY_CHECK_OK(driver.SubmitActorTask(self_actor_id, func, args, options, &return_ids)); + driver.SubmitActorTask(self_actor_id, func, args, options, &return_ids); } void SubmitTestToActor(ActorID &actor_id, const std::string test) { @@ -107,7 +107,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "", test, "execute_test", "")}; - RAY_CHECK_OK(driver.SubmitActorTask(actor_id, func, args, options, &return_ids)); + driver.SubmitActorTask(actor_id, func, args, options, &return_ids); } bool CheckCurTest(ActorID &actor_id, const std::string test_name) { @@ -123,7 +123,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "", "", "check_current_test_status", "")}; - RAY_CHECK_OK(driver.SubmitActorTask(actor_id, func, args, options, &return_ids)); + driver.SubmitActorTask(actor_id, func, args, options, &return_ids); std::vector wait_results; std::vector> results;