diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index baed93a85..e3fe76352 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -6,6 +6,7 @@ from cpython.exc cimport PyErr_CheckSignals import numpy +import threading import time import logging import os @@ -647,28 +648,34 @@ cdef CRayStatus task_execution_handler( with gil: try: - # The call to execute_task should never raise an exception. If it - # does, that indicates that there was an unexpected internal error. - execute_task(task_type, ray_function, c_resources, c_args, - c_arg_reference_ids, c_return_ids, - return_results_directly, returns) - except Exception: - traceback_str = traceback.format_exc() + ( - "An unexpected internal error occurred while the worker was" - "executing a task.") - ray.utils.push_error_to_driver( - ray.worker.global_worker, - "worker_crash", - traceback_str, - job_id=None) - # TODO(rkn): Note that if the worker was in the middle of executing - # a task, then any worker or driver that is blocking in a get call - # and waiting for the output of that task will hang. We need to - # address this. - sys.exit(1) + try: + # The call to execute_task should never raise an exception. If + # it does, that indicates that there was an internal error. + execute_task(task_type, ray_function, c_resources, c_args, + c_arg_reference_ids, c_return_ids, + return_results_directly, returns) + except Exception: + traceback_str = traceback.format_exc() + ( + "An unexpected internal error occurred while the worker " + "was executing a task.") + ray.utils.push_error_to_driver( + ray.worker.global_worker, + "worker_crash", + traceback_str, + job_id=None) + sys.exit(1) + except SystemExit: + if isinstance(threading.current_thread(), threading._MainThread): + raise + else: + # We cannot exit from a non-main thread, so return a special + # status that tells the core worker to call sys.exit() on the + # main thread instead. This only applies to direct actor calls. 
+ return CRayStatus.SystemExit() return CRayStatus.OK() + cdef CRayStatus check_signals() nogil: with gil: try: @@ -678,6 +685,11 @@ cdef CRayStatus check_signals() nogil: return CRayStatus.OK() +cdef void exit_handler() nogil: + with gil: + sys.exit(0) + + cdef void push_objects_into_return_vector( py_objects, c_vector[shared_ptr[CRayObject]] *returns): @@ -733,7 +745,7 @@ cdef class CoreWorker: raylet_socket.encode("ascii"), job_id.native(), gcs_options.native()[0], log_dir.encode("utf-8"), node_ip_address.encode("utf-8"), task_execution_handler, - check_signals)) + check_signals, exit_handler)) def disconnect(self): with nogil: @@ -966,6 +978,7 @@ cdef class CoreWorker: resources, placement_resources, c_bool is_direct_call, + int32_t max_concurrency, c_bool is_detached): cdef: CRayFunction ray_function @@ -986,9 +999,9 @@ cdef class CoreWorker: check_status(self.core_worker.get().CreateActor( ray_function, args_vector, CActorCreationOptions( - max_reconstructions, is_direct_call, c_resources, - c_placement_resources, dynamic_worker_options, - is_detached), + max_reconstructions, is_direct_call, max_concurrency, + c_resources, c_placement_resources, + dynamic_worker_options, is_detached), &c_actor_id)) return ActorID(c_actor_id.Binary()) diff --git a/python/ray/actor.py b/python/ray/actor.py index a03319074..7fa2bad41 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -326,6 +326,26 @@ class ActorClass(object): """ return self._remote(args=args, kwargs=kwargs) + def options(self, **options): + """Convenience method for creating an actor with options. + + Same arguments as Actor._remote(), but returns a wrapped actor class + that a non-underscore .remote() can be called on. + + Examples: + # The following two calls are equivalent. 
+ >>> Actor._remote(num_cpus=4, max_concurrency=8, args=[x, y]) + >>> Actor.options(num_cpus=4, max_concurrency=8).remote(x, y) + """ + + actor_cls = self + + class ActorOptionWrapper(object): + def remote(self, *args, **kwargs): + return actor_cls._remote(args=args, kwargs=kwargs, **options) + + return ActorOptionWrapper() + def _remote(self, args=None, kwargs=None, @@ -335,6 +355,7 @@ class ActorClass(object): object_store_memory=None, resources=None, is_direct_call=None, + max_concurrency=None, name=None, detached=False): """Create an actor. @@ -354,6 +375,8 @@ class ActorClass(object): resources: The custom resources required by the actor creation task. is_direct_call: Use direct actor calls. + max_concurrency: The max number of concurrent calls to allow for + this actor. This only works with direct actor calls. name: The globally unique name for the actor. detached: Whether the actor should be kept alive after driver exits. @@ -365,6 +388,16 @@ class ActorClass(object): args = [] if kwargs is None: kwargs = {} + if is_direct_call is None: + is_direct_call = False + if max_concurrency is None: + max_concurrency = 1 + + if max_concurrency > 1 and not is_direct_call: + raise ValueError( + "setting max_concurrency requires is_direct_call=True") + if max_concurrency < 1: + raise ValueError("max_concurrency must be >= 1") worker = ray.worker.get_global_worker() if worker.mode is None: @@ -452,7 +485,8 @@ class ActorClass(object): actor_id = worker.core_worker.create_actor( function_descriptor.get_function_descriptor_list(), creation_args, meta.max_reconstructions, resources, - actor_placement_resources, is_direct_call, detached) + actor_placement_resources, is_direct_call, max_concurrency, + detached) actor_handle = ActorHandle( actor_id, diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index ad3a24c79..b7187f87e 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -2,7 +2,7 @@ from libcpp cimport bool as 
c_bool from libcpp.memory cimport shared_ptr, unique_ptr from libcpp.string cimport string as c_string -from libc.stdint cimport uint8_t, uint64_t, int64_t +from libc.stdint cimport uint8_t, int32_t, uint64_t, int64_t from libcpp.unordered_map cimport unordered_map from libcpp.vector cimport vector as c_vector @@ -76,6 +76,9 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil: @staticmethod CRayStatus Interrupted(const c_string &msg) + @staticmethod + CRayStatus SystemExit() + c_bool ok() c_bool IsOutOfMemory() c_bool IsKeyError() @@ -205,6 +208,7 @@ cdef extern from "ray/core_worker/common.h" nogil: CActorCreationOptions() CActorCreationOptions( uint64_t max_reconstructions, c_bool is_direct_call, + int32_t max_concurrency, const unordered_map[c_string, double] &resources, const unordered_map[c_string, double] &placement_resources, const c_vector[c_string] &dynamic_worker_options, diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 858914c60..cbde7a278 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -64,7 +64,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const c_vector[CObjectID] &return_ids, c_bool is_direct_call, c_vector[shared_ptr[CRayObject]] *returns) nogil, - CRayStatus() nogil) + CRayStatus() nogil, + void () nogil) void Disconnect() CWorkerType &GetWorkerType() CLanguage &GetLanguage() diff --git a/python/ray/ray_perf.py b/python/ray/ray_perf.py index 1bdaefc5a..78ab7eaa3 100644 --- a/python/ray/ray_perf.py +++ b/python/ray/ray_perf.py @@ -142,14 +142,28 @@ def main(): def actor_sync(): ray.get(a.small_value.remote()) - timeit("single client actor calls sync", actor_sync) + timeit("1:1 actor calls sync", actor_sync) a = Actor.remote() def actor_async(): ray.get([a.small_value.remote() for _ in range(1000)]) - timeit("single client actor calls async", actor_async, 1000) + timeit("1:1 actor calls async", actor_async, 1000) + + a = 
Actor.options(is_direct_call=True).remote() + + def actor_concurrent(): + ray.get([a.small_value.remote() for _ in range(1000)]) + + timeit("1:1 direct actor calls async", actor_concurrent, 1000) + + a = Actor.options(is_direct_call=True, max_concurrency=16).remote() + + def actor_concurrent(): + ray.get([a.small_value.remote() for _ in range(1000)]) + + timeit("1:1 direct actor calls concurrent", actor_concurrent, 1000) n_cpu = multiprocessing.cpu_count() // 2 a = [Actor.remote() for _ in range(n_cpu)] @@ -161,7 +175,7 @@ def main(): def actor_multi2(): ray.get([work.remote(a) for _ in range(m)]) - timeit("multi client actor calls async", actor_multi2, m * n) + timeit("n:n actor calls async", actor_multi2, m * n) n = 5000 n_cpu = multiprocessing.cpu_count() // 2 @@ -171,15 +185,24 @@ def main(): def actor_async_direct(): ray.get(client.small_value_batch.remote(n)) - timeit("single client direct actor calls async", actor_async_direct, - n * len(actors)) + timeit("1:n direct actor calls async", actor_async_direct, n * len(actors)) clients = [Client.remote(a) for a in actors] def actor_multi2_direct(): ray.get([c.small_value_batch.remote(n) for c in clients]) - timeit("multi client direct actor calls async", actor_multi2_direct, + timeit("n:n direct actor calls async", actor_multi2_direct, + n * len(clients)) + + n = 1000 + actors = [Actor._remote(is_direct_call=True) for _ in range(n_cpu)] + clients = [Client.remote(a) for a in actors] + + def actor_multi2_direct_arg(): + ray.get([c.small_value_batch_arg.remote(n) for c in clients]) + + timeit("n:n direct actor calls with arg async", actor_multi2_direct_arg, n * len(clients)) n = 1000 diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index 38e9b9d83..69083f354 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -110,6 +110,26 @@ class RemoteFunction(object): num_gpus=num_gpus, resources=resources) + def options(self, **options): + """Convenience method for 
executing a task with options. + + Same arguments as func._remote(), but returns a wrapped function + that a non-underscore .remote() can be called on. + + Examples: + # The following two calls are equivalent. + >>> func._remote(num_cpus=4, args=[x, y]) + >>> func.options(num_cpus=4).remote(x, y) + """ + + func_cls = self + + class FuncWrapper(object): + def remote(self, *args, **kwargs): + return func_cls._remote(args=args, kwargs=kwargs, **options) + + return FuncWrapper() + def _remote(self, args=None, kwargs=None, diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index aa6ba0d18..d9554b7ef 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -1318,6 +1318,32 @@ def test_direct_actor_recursive(ray_start_regular): assert result == [x * 2 for x in range(100)] +def test_direct_actor_concurrent(ray_start_regular): + @ray.remote + class Batcher(object): + def __init__(self): + self.batch = [] + self.event = threading.Event() + + def add(self, x): + self.batch.append(x) + if len(self.batch) >= 3: + self.event.set() + else: + self.event.wait() + return sorted(self.batch) + + a = Batcher.options(is_direct_call=True, max_concurrency=3).remote() + x1 = a.add.remote(1) + x2 = a.add.remote(2) + x3 = a.add.remote(3) + r1 = ray.get(x1) + r2 = ray.get(x2) + r3 = ray.get(x3) + assert r1 == [1, 2, 3] + assert r1 == r2 == r3 + + def test_wait(ray_start_regular): @ray.remote def f(delay): @@ -1516,7 +1542,6 @@ def test_profiling_api(ray_start_2_cpus): profile_data = ray.timeline() event_types = {event["cat"] for event in profile_data} expected_types = [ - "worker_idle", "task", "task:deserialize_arguments", "task:execute", diff --git a/src/ray/common/status.h b/src/ray/common/status.h index fb421221a..e4e970727 100644 --- a/src/ray/common/status.h +++ b/src/ray/common/status.h @@ -80,6 +80,7 @@ enum class StatusCode : char { NotImplemented = 10, RedisError = 11, Interrupted = 12, + SystemExit = 13, }; #if defined(__clang__) 
@@ -147,6 +148,10 @@ class RAY_EXPORT Status { return Status(StatusCode::Interrupted, msg); } + static Status SystemExit() { + return Status(StatusCode::SystemExit, "process requested exit"); + } + // Returns true iff the status indicates success. bool ok() const { return (state_ == NULL); } @@ -161,6 +166,7 @@ class RAY_EXPORT Status { bool IsNotImplemented() const { return code() == StatusCode::NotImplemented; } bool IsRedisError() const { return code() == StatusCode::RedisError; } bool IsInterrupted() const { return code() == StatusCode::Interrupted; } + bool IsSystemExit() const { return code() == StatusCode::SystemExit; } // Return a string representation of this status suitable for printing. // Returns the string "OK" for success. diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 115a38ac0..6e6ec3a33 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -189,6 +189,11 @@ bool TaskSpecification::IsDirectCall() const { return message_->actor_creation_task_spec().is_direct_call(); } +int TaskSpecification::MaxActorConcurrency() const { + RAY_CHECK(IsActorCreationTask()); + return message_->actor_creation_task_spec().max_concurrency(); +} + bool TaskSpecification::IsDetachedActor() const { RAY_CHECK(IsActorCreationTask()); return message_->actor_creation_task_spec().is_detached(); diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index 29f4ec9be..bd925ea35 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -142,6 +142,8 @@ class TaskSpecification : public MessageWrapper { bool IsDirectCall() const; + int MaxActorConcurrency() const; + bool IsDetachedActor() const; ObjectID ActorDummyObject() const; diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index c723bdc7f..bbe855f6a 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -93,7 +93,7 @@ class TaskSpecBuilder { 
TaskSpecBuilder &SetActorCreationTaskSpec( const ActorID &actor_id, uint64_t max_reconstructions = 0, const std::vector &dynamic_worker_options = {}, - bool is_direct_call = false, bool is_detached = false) { + bool is_direct_call = false, int max_concurrency = 1, bool is_detached = false) { message_->set_type(TaskType::ACTOR_CREATION_TASK); auto actor_creation_spec = message_->mutable_actor_creation_task_spec(); actor_creation_spec->set_actor_id(actor_id.Binary()); @@ -102,6 +102,7 @@ class TaskSpecBuilder { actor_creation_spec->add_dynamic_worker_options(option); } actor_creation_spec->set_is_direct_call(is_direct_call); + actor_creation_spec->set_max_concurrency(max_concurrency); actor_creation_spec->set_is_detached(is_detached); return *this; } diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index 2f171ddef..a25f377eb 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -97,12 +97,14 @@ struct TaskOptions { struct ActorCreationOptions { ActorCreationOptions() {} ActorCreationOptions(uint64_t max_reconstructions, bool is_direct_call, + int max_concurrency, const std::unordered_map &resources, const std::unordered_map &placement_resources, const std::vector &dynamic_worker_options, bool is_detached) : max_reconstructions(max_reconstructions), is_direct_call(is_direct_call), + max_concurrency(max_concurrency), resources(resources), placement_resources(placement_resources), dynamic_worker_options(dynamic_worker_options), @@ -114,6 +116,8 @@ struct ActorCreationOptions { /// Whether to use direct actor call. If this is set to true, callers will submit /// tasks directly to the created actor without going through raylet. const bool is_direct_call = false; + /// The max number of concurrent tasks to run on this direct call actor. + const int max_concurrency = 1; /// Resources required by the whole lifetime of this actor. const std::unordered_map resources; /// Resources required to place this actor. 
diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index 0fcce82ed..8ffbffdbd 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -21,7 +21,6 @@ struct WorkerThreadContext { void SetCurrentTaskId(const TaskID &task_id) { current_task_id_ = task_id; } void SetCurrentTask(const TaskSpecification &task_spec) { - RAY_CHECK(current_task_id_.IsNil()); RAY_CHECK(task_index_ == 0); RAY_CHECK(put_index_ == 0); SetCurrentTaskId(task_spec.TaskId()); @@ -97,6 +96,7 @@ void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) { RAY_CHECK(current_actor_id_.IsNil()); current_actor_id_ = task_spec.ActorCreationId(); current_actor_use_direct_call_ = task_spec.IsDirectCall(); + current_actor_max_concurrency_ = task_spec.MaxActorConcurrency(); } else if (task_spec.IsActorTask()) { RAY_CHECK(current_job_id_ == task_spec.JobId()); RAY_CHECK(current_actor_id_ == task_spec.ActorId()); @@ -122,21 +122,13 @@ bool WorkerContext::CurrentActorUseDirectCall() const { return current_actor_use_direct_call_; } -WorkerThreadContext &WorkerContext::GetThreadContext(bool for_main_thread) { - // Flag used to ensure that we only print a warning about multithreading once per - // process. - static bool multithreading_warning_printed = false; +int WorkerContext::CurrentActorMaxConcurrency() const { + return current_actor_max_concurrency_; +} +WorkerThreadContext &WorkerContext::GetThreadContext(bool for_main_thread) { if (thread_context_ == nullptr) { thread_context_ = std::unique_ptr(new WorkerThreadContext()); - if (!for_main_thread && !multithreading_warning_printed) { - std::cout << "WARNING: " - << "Calling ray.get or ray.wait in a separate thread " - << "may lead to deadlock if the main thread blocks on " - << "this thread and there are not enough resources to " - << "execute more tasks." 
<< std::endl; - multithreading_warning_printed = true; - } } return *thread_context_; diff --git a/src/ray/core_worker/context.h b/src/ray/core_worker/context.h index aadd1e447..d8fb63ae5 100644 --- a/src/ray/core_worker/context.h +++ b/src/ray/core_worker/context.h @@ -36,6 +36,8 @@ class WorkerContext { bool CurrentActorUseDirectCall() const; + int CurrentActorMaxConcurrency() const; + int GetNextTaskIndex(); int GetNextPutIndex(); @@ -46,6 +48,7 @@ class WorkerContext { JobID current_job_id_; ActorID current_actor_id_; bool current_actor_use_direct_call_; + int current_actor_max_concurrency_; private: static WorkerThreadContext &GetThreadContext(bool for_main_thread = false); diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 8930865ec..aaf30e59f 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -73,7 +73,8 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, const JobID &job_id, const gcs::GcsClientOptions &gcs_options, const std::string &log_dir, const std::string &node_ip_address, const TaskExecutionCallback &task_execution_callback, - std::function check_signals) + std::function check_signals, + const std::function exit_handler) : worker_type_(worker_type), language_(language), log_dir_(log_dir), @@ -116,7 +117,8 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, execute_task)); direct_actor_task_receiver_ = std::unique_ptr( new CoreWorkerDirectActorTaskReceiver(worker_context_, task_execution_service_, - worker_server_, execute_task)); + worker_server_, execute_task, + exit_handler)); } // Start RPC server after all the task receivers are properly initialized. 
@@ -483,6 +485,7 @@ Status CoreWorker::CreateActor(const RayFunction &function, builder.SetActorCreationTaskSpec(actor_id, actor_creation_options.max_reconstructions, actor_creation_options.dynamic_worker_options, actor_creation_options.is_direct_call, + actor_creation_options.max_concurrency, actor_creation_options.is_detached); std::unique_ptr actor_handle(new ActorHandle( @@ -607,17 +610,11 @@ std::unique_ptr CoreWorker::CreateProfileEvent( new worker::ProfileEvent(profiler_, event_type)); } -void CoreWorker::StartExecutingTasks() { - idle_profile_event_.reset(new worker::ProfileEvent(profiler_, "worker_idle")); - task_execution_service_.run(); -} +void CoreWorker::StartExecutingTasks() { task_execution_service_.run(); } Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, const ResourceMappingType &resource_ids, std::vector> *results) { - idle_profile_event_.reset(); - RAY_LOG(DEBUG) << "Executing task " << task_spec.TaskId(); - resource_ids_ = resource_ids; worker_context_.SetCurrentTask(task_spec); SetCurrentTaskId(task_spec.TaskId()); @@ -670,11 +667,6 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, } } } - - // TODO(zhijunfu): - // 1. Check and handle failure. - // 2. Save or load checkpoint. - idle_profile_event_.reset(new worker::ProfileEvent(profiler_, "worker_idle")); return status; } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index c7fd6a959..e4d58b689 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -52,6 +52,8 @@ class CoreWorker { /// \parma[in] check_signals Language worker function to check for signals and handle /// them. If the function returns anything but StatusOK, any long-running /// operations in the core worker will short circuit and return that status. + /// \param[in] exit_handler Language worker function to shut down the worker cleanly. + /// We guarantee this will be run on the main thread of the worker. 
/// /// NOTE(zhijunfu): the constructor would throw if a failure happens. CoreWorker(const WorkerType worker_type, const Language language, @@ -59,7 +61,8 @@ class CoreWorker { const JobID &job_id, const gcs::GcsClientOptions &gcs_options, const std::string &log_dir, const std::string &node_ip_address, const TaskExecutionCallback &task_execution_callback, - std::function check_signals = nullptr); + std::function check_signals = nullptr, + std::function exit_handler = nullptr); ~CoreWorker(); @@ -438,10 +441,6 @@ class CoreWorker { /// Profiler including a background thread that pushes profiling events to the GCS. std::shared_ptr profiler_; - /// Profile event for when the worker is idle. Should be reset when the worker - /// enters and exits an idle period. - std::unique_ptr idle_profile_event_; - /// Task execution callback. TaskExecutionCallback task_execution_callback_; diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 246c933c8..1d1924923 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -62,9 +62,9 @@ ActorID CreateActorHelper(CoreWorker &worker, std::vector args; args.emplace_back(TaskArg::PassByValue(std::make_shared(buffer, nullptr))); - ActorCreationOptions actor_options{ - max_reconstructions, is_direct_call, resources, resources, {}, - /*is_detached*/ false}; + ActorCreationOptions actor_options{max_reconstructions, is_direct_call, + /*max_concurrency*/ 1, resources, resources, {}, + /*is_detached*/ false}; // Create an actor. 
ActorID actor_id; @@ -492,7 +492,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { args.emplace_back(TaskArg::PassByValue(std::make_shared(buffer, nullptr))); std::unordered_map resources; - ActorCreationOptions actor_options{0, /*is_direct_call*/ true, resources, resources, + ActorCreationOptions actor_options{0, /*is_direct_call*/ true, 1, resources, resources, {}, /*is_detached*/ false}; const auto job_id = NextJobId(); ActorHandle actor_handle(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1), job_id, @@ -592,7 +592,6 @@ TEST_F(ZeroNodeTest, TestWorkerContext) { auto thread_func = [&context]() { // Verify that task_index, put_index are thread-local. - ASSERT_TRUE(!context.GetCurrentTaskID().IsNil()); ASSERT_EQ(context.GetNextTaskIndex(), 1); ASSERT_EQ(context.GetNextPutIndex(), 1); }; diff --git a/src/ray/core_worker/test/scheduling_queue_test.cc b/src/ray/core_worker/test/scheduling_queue_test.cc index 46b78832d..bc402de88 100644 --- a/src/ray/core_worker/test/scheduling_queue_test.cc +++ b/src/ray/core_worker/test/scheduling_queue_test.cc @@ -23,7 +23,7 @@ class MockWaiter : public DependencyWaiter { TEST(SchedulingQueueTest, TestInOrder) { boost::asio::io_service io_service; MockWaiter waiter; - SchedulingQueue queue(io_service, waiter, 0); + SchedulingQueue queue(io_service, waiter, nullptr, 0); int n_ok = 0; int n_rej = 0; auto fn_ok = [&n_ok]() { n_ok++; }; @@ -43,7 +43,7 @@ TEST(SchedulingQueueTest, TestWaitForObjects) { ObjectID obj3 = ObjectID::FromRandom(); boost::asio::io_service io_service; MockWaiter waiter; - SchedulingQueue queue(io_service, waiter, 0); + SchedulingQueue queue(io_service, waiter, nullptr, 0); int n_ok = 0; int n_rej = 0; auto fn_ok = [&n_ok]() { n_ok++; }; @@ -68,7 +68,7 @@ TEST(SchedulingQueueTest, TestWaitForObjectsNotSubjectToSeqTimeout) { ObjectID obj1 = ObjectID::FromRandom(); boost::asio::io_service io_service; MockWaiter waiter; - SchedulingQueue queue(io_service, waiter, 0); + SchedulingQueue queue(io_service, waiter, 
nullptr, 0); int n_ok = 0; int n_rej = 0; auto fn_ok = [&n_ok]() { n_ok++; }; @@ -85,7 +85,7 @@ TEST(SchedulingQueueTest, TestWaitForObjectsNotSubjectToSeqTimeout) { TEST(SchedulingQueueTest, TestOutOfOrder) { boost::asio::io_service io_service; MockWaiter waiter; - SchedulingQueue queue(io_service, waiter, 0); + SchedulingQueue queue(io_service, waiter, nullptr, 0); int n_ok = 0; int n_rej = 0; auto fn_ok = [&n_ok]() { n_ok++; }; @@ -102,7 +102,7 @@ TEST(SchedulingQueueTest, TestOutOfOrder) { TEST(SchedulingQueueTest, TestSeqWaitTimeout) { boost::asio::io_service io_service; MockWaiter waiter; - SchedulingQueue queue(io_service, waiter, 0); + SchedulingQueue queue(io_service, waiter, nullptr, 0); int n_ok = 0; int n_rej = 0; auto fn_ok = [&n_ok]() { n_ok++; }; @@ -124,7 +124,7 @@ TEST(SchedulingQueueTest, TestSeqWaitTimeout) { TEST(SchedulingQueueTest, TestSkipAlreadyProcessedByClient) { boost::asio::io_service io_service; MockWaiter waiter; - SchedulingQueue queue(io_service, waiter, 0); + SchedulingQueue queue(io_service, waiter, nullptr, 0); int n_ok = 0; int n_rej = 0; auto fn_ok = [&n_ok]() { n_ok++; }; diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 016a2ee97..07d9f2d66 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -146,7 +146,7 @@ void CoreWorkerDirectActorTaskSubmitter::PushTask( if (!status.ok()) { // Note that this might be the __ray_terminate__ task, so we don't log // loudly with ERROR here. 
- RAY_LOG(DEBUG) << "Task failed with error: " << status; + RAY_LOG(INFO) << "Task failed with error: " << status; TreatTaskAsFailed(task_id, num_returns, rpc::ErrorType::ACTOR_DIED); return; } @@ -200,10 +200,12 @@ bool CoreWorkerDirectActorTaskSubmitter::IsActorAlive(const ActorID &actor_id) c CoreWorkerDirectActorTaskReceiver::CoreWorkerDirectActorTaskReceiver( WorkerContext &worker_context, boost::asio::io_service &main_io_service, - rpc::GrpcServer &server, const TaskHandler &task_handler) + rpc::GrpcServer &server, const TaskHandler &task_handler, + const std::function &exit_handler) : worker_context_(worker_context), task_service_(main_io_service, *this), task_handler_(task_handler), + exit_handler_(exit_handler), task_main_io_service_(main_io_service) { server.RegisterService(task_service_); } @@ -212,6 +214,15 @@ void CoreWorkerDirectActorTaskReceiver::Init(RayletClient &raylet_client) { waiter_.reset(new DependencyWaiterImpl(raylet_client)); } +void CoreWorkerDirectActorTaskReceiver::SetMaxActorConcurrency(int max_concurrency) { + if (max_concurrency != max_concurrency_) { + RAY_LOG(INFO) << "Creating new thread pool of size " << max_concurrency; + RAY_CHECK(pool_ == nullptr) << "Cannot change max concurrency at runtime."; + pool_.reset(new BoundedExecutor(max_concurrency)); + max_concurrency_ = max_concurrency; + } +} + void CoreWorkerDirectActorTaskReceiver::HandlePushTask( const rpc::PushTaskRequest &request, rpc::PushTaskReply *reply, rpc::SendReplyCallback send_reply_callback) { @@ -223,6 +234,7 @@ void CoreWorkerDirectActorTaskReceiver::HandlePushTask( nullptr, nullptr); return; } + SetMaxActorConcurrency(worker_context_.CurrentActorMaxConcurrency()); // TODO(ekl) resolving object dependencies is expensive and requires an IPC to // the raylet, which is a central bottleneck. 
In the future, we should inline @@ -238,8 +250,8 @@ void CoreWorkerDirectActorTaskReceiver::HandlePushTask( auto it = scheduling_queue_.find(task_spec.CallerId()); if (it == scheduling_queue_.end()) { auto result = scheduling_queue_.emplace( - task_spec.CallerId(), std::unique_ptr( - new SchedulingQueue(task_main_io_service_, *waiter_))); + task_spec.CallerId(), std::unique_ptr(new SchedulingQueue( + task_main_io_service_, *waiter_, pool_))); it = result.first; } it->second->Add( @@ -256,6 +268,13 @@ void CoreWorkerDirectActorTaskReceiver::HandlePushTask( ResourceMappingType resource_ids; std::vector> results; auto status = task_handler_(task_spec, resource_ids, &results); + if (status.IsSystemExit()) { + // In Python, SystemExit only exits the process when raised on the main thread. + // To work around this when we are executing tasks on worker threads, we re-post + // the exit event explicitly on the main thread. + task_main_io_service_.post([this]() { exit_handler_(); }); + return; + } RAY_CHECK(results.size() == num_returns) << results.size() << " " << num_returns; for (size_t i = 0; i < results.size(); i++) { diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index 0fbf217f6..20ea36dd5 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -1,11 +1,14 @@ #ifndef RAY_CORE_WORKER_DIRECT_ACTOR_TRANSPORT_H #define RAY_CORE_WORKER_DIRECT_ACTOR_TRANSPORT_H +#include #include #include #include #include +#include "absl/base/thread_annotations.h" +#include "absl/synchronization/mutex.h" #include "ray/common/id.h" #include "ray/common/ray_object.h" #include "ray/core_worker/context.h" @@ -180,16 +183,52 @@ class DependencyWaiterImpl : public DependencyWaiter { RayletClient &raylet_client_; }; +/// Wraps a thread-pool to block posts until the pool has free slots. 
This is used +/// by the SchedulingQueue to provide backpressure to clients. +class BoundedExecutor { + public: + BoundedExecutor(int max_concurrency) + : num_running_(0), max_concurrency_(max_concurrency), pool_(max_concurrency){}; + + /// Posts work to the pool, blocking if no free threads are available. + void PostBlocking(std::function fn) { + mu_.LockWhen(absl::Condition(this, &BoundedExecutor::ThreadsAvailable)); + num_running_ += 1; + mu_.Unlock(); + boost::asio::post(pool_, [this, fn]() { + fn(); + absl::MutexLock lock(&mu_); + num_running_ -= 1; + }); + } + + private: + bool ThreadsAvailable() EXCLUSIVE_LOCKS_REQUIRED(mu_) { + return num_running_ < max_concurrency_; + } + + /// Protects access to the counters below. + absl::Mutex mu_; + /// The number of currently running tasks. + int num_running_ GUARDED_BY(mu_); + /// The max number of concurrently running tasks allowed. + const int max_concurrency_; + /// The underlying thread pool for running tasks. + boost::asio::thread_pool pool_; +}; + /// Used to ensure serial order of task execution per actor handle. /// See direct_actor.proto for a description of the ordering protocol. 
class SchedulingQueue { public: SchedulingQueue(boost::asio::io_service &main_io_service, DependencyWaiter &waiter, + std::shared_ptr pool = nullptr, int64_t reorder_wait_seconds = kMaxReorderWaitSeconds) : wait_timer_(main_io_service), waiter_(waiter), reorder_wait_seconds_(reorder_wait_seconds), - main_thread_id_(boost::this_thread::get_id()) {} + main_thread_id_(boost::this_thread::get_id()), + pool_(pool) {} void Add(int64_t seq_no, int64_t client_processed_up_to, std::function accept_request, std::function reject_request, @@ -229,7 +268,12 @@ class SchedulingQueue { while (!pending_tasks_.empty() && pending_tasks_.begin()->first == next_seq_no_ && pending_tasks_.begin()->second.CanExecute()) { auto head = pending_tasks_.begin(); - head->second.Accept(); + auto request = head->second; + if (pool_ != nullptr) { + pool_->PostBlocking([request]() mutable { request.Accept(); }); + } else { + request.Accept(); + } pending_tasks_.erase(head); next_seq_no_++; } @@ -270,12 +314,15 @@ class SchedulingQueue { std::map pending_tasks_; /// The next sequence number we are waiting for to arrive. int64_t next_seq_no_ = 0; - /// Timer for waiting on dependencies. + /// Timer for waiting on dependencies. Note that this is set on the task main + /// io service, which is fine since it only ever fires if no tasks are running. boost::asio::deadline_timer wait_timer_; /// The id of the thread that constructed this scheduling queue. boost::thread::id main_thread_id_; /// Reference to the waiter owned by the task receiver. DependencyWaiter &waiter_; + /// If concurrent calls are allowed, holds the pool for executing these tasks. 
+ std::shared_ptr pool_; friend class SchedulingQueueTest; }; @@ -289,7 +336,8 @@ class CoreWorkerDirectActorTaskReceiver : public rpc::DirectActorHandler { CoreWorkerDirectActorTaskReceiver(WorkerContext &worker_context, boost::asio::io_service &main_io_service, rpc::GrpcServer &server, - const TaskHandler &task_handler); + const TaskHandler &task_handler, + const std::function &exit_handler); /// Initialize this receiver. This must be called prior to use. void Init(RayletClient &client); @@ -312,6 +360,9 @@ class CoreWorkerDirectActorTaskReceiver : public rpc::DirectActorHandler { rpc::DirectActorCallArgWaitCompleteReply *reply, rpc::SendReplyCallback send_reply_callback) override; + /// Set the max concurrency at runtime. It cannot be changed once set. + void SetMaxActorConcurrency(int max_concurrency); + private: // Worker context. WorkerContext &worker_context_; @@ -319,6 +370,8 @@ class CoreWorkerDirectActorTaskReceiver : public rpc::DirectActorHandler { rpc::DirectActorGrpcService task_service_; /// The callback function to process a task. TaskHandler task_handler_; + /// The callback function to exit the worker. + std::function exit_handler_; /// The IO event loop for running tasks on. boost::asio::io_service &task_main_io_service_; /// Shared waiter for dependencies required by incoming tasks. @@ -326,6 +379,10 @@ class CoreWorkerDirectActorTaskReceiver : public rpc::DirectActorHandler { /// Queue of pending requests per actor handle. /// TODO(ekl) GC these queues once the handle is no longer active. std::unordered_map> scheduling_queue_; + /// The max number of concurrent calls to allow. + int max_concurrency_ = 1; + /// If concurrent calls are allowed, holds the pool for executing these tasks. 
+ std::shared_ptr pool_; }; } // namespace ray diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 4c2476a44..e23c40afd 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -95,8 +95,10 @@ message ActorCreationTaskSpec { repeated string dynamic_worker_options = 4; // Whether direct actor call is used. bool is_direct_call = 5; + // The max number of concurrent calls for direct call actors. + int32 max_concurrency = 6; // Whether the actor is persistent - bool is_detached = 6; + bool is_detached = 7; } // Task spec of an actor task.