diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 28afc8588..f70e64ea4 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -3,11 +3,21 @@ # cython: embedsignature = True # cython: language_level = 3 +from cpython.exc cimport PyErr_CheckSignals + import numpy import time import logging +import os +import sys -from libc.stdint cimport uint8_t, int32_t, int64_t, uint64_t +from libc.stdint cimport ( + int32_t, + int64_t, + INT64_MAX, + uint64_t, + uint8_t, +) from libcpp cimport bool as c_bool from libcpp.memory cimport ( dynamic_pointer_cast, @@ -28,6 +38,7 @@ from ray.includes.common cimport ( CRayStatus, CGcsClientOptions, CTaskArg, + CTaskType, CRayFunction, LocalMemoryBuffer, move, @@ -35,6 +46,9 @@ from ray.includes.common cimport ( LANGUAGE_JAVA, LANGUAGE_PYTHON, LocalMemoryBuffer, + TASK_TYPE_NORMAL_TASK, + TASK_TYPE_ACTOR_CREATION_TASK, + TASK_TYPE_ACTOR_TASK, WORKER_TYPE_WORKER, WORKER_TYPE_DRIVER, ) @@ -42,10 +56,10 @@ from ray.includes.libraylet cimport ( CRayletClient, GCSProfileEvent, GCSProfileTableData, - ResourceMappingType, WaitResultPair, ) from ray.includes.unique_ids cimport ( + CActorID, CActorCheckpointID, CObjectID, CClientID, @@ -54,12 +68,22 @@ from ray.includes.libcoreworker cimport ( CActorCreationOptions, CCoreWorker, CTaskOptions, + ResourceMappingType, ) from ray.includes.task cimport CTaskSpec from ray.includes.ray_config cimport RayConfig + import ray +import ray.experimental.signal as ray_signal +import ray.ray_constants as ray_constants from ray import profiling -from ray.exceptions import RayletError, ObjectStoreFullError +from ray.exceptions import ( + RayError, + RayletError, + RayTaskError, + ObjectStoreFullError +) +from ray.function_manager import FunctionDescriptor from ray.utils import decode from ray.ray_constants import ( DEFAULT_PUT_OBJECT_DELAY, @@ -105,9 +129,30 @@ cdef int check_status(const CRayStatus& status) nogil except -1: if status.IsObjectStoreFull(): raise ObjectStoreFullError(message) + elif status.IsInterrupted(): + raise KeyboardInterrupt() else: raise RayletError(message) +cdef RayObjectsToDataMetadataPairs( + const c_vector[shared_ptr[CRayObject]] objects): + data_metadata_pairs = [] + for i in range(objects.size()): + # core_worker will return a nullptr for objects that couldn't be + # retrieved from the store or if an object was an exception. + if not objects[i].get(): + data_metadata_pairs.append((None, None)) + else: + data = None + metadata = None + if objects[i].get().HasData(): + data = Buffer.make(objects[i].get().GetData()) + if objects[i].get().HasMetadata(): + metadata = Buffer.make( + objects[i].get().GetMetadata()).to_pybytes() + data_metadata_pairs.append((data, metadata)) + return data_metadata_pairs + cdef VectorToObjectIDs(const c_vector[CObjectID] &object_ids): result = [] @@ -327,17 +372,6 @@ cdef class RayletClient: # initialized before the raylet client. self.client = &core_worker.core_worker.get().GetRayletClient() - def get_task(self): - cdef: - unique_ptr[CTaskSpec] task_spec - - with nogil: - check_status(self.client.GetTask(&task_spec)) - return TaskSpec.make(task_spec) - - def task_done(self): - check_status(self.client.TaskDone()) - def fetch_or_reconstruct(self, object_ids, c_bool fetch_only, TaskID current_task_id=TaskID.nil()): @@ -345,27 +379,6 @@ cdef class RayletClient: check_status(self.client.FetchOrReconstruct( fetch_ids, fetch_only, current_task_id.native())) - def resource_ids(self): - cdef: - ResourceMappingType resource_mapping = ( - self.client.GetResourceIDs()) - unordered_map[ - c_string, c_vector[pair[int64_t, double]] - ].iterator iterator = resource_mapping.begin() - c_vector[pair[int64_t, double]] c_value - - resources_dict = {} - while iterator != resource_mapping.end(): - key = decode(dereference(iterator).first) - c_value = dereference(iterator).second - ids_and_fractions = [] - for i in range(c_value.size()): - ids_and_fractions.append( - (c_value[i].first, c_value[i].second)) - resources_dict[key] = ids_and_fractions - postincrement(iterator) - return resources_dict - def push_error(self, JobID job_id, error_type, error_message, double timestamp): check_status(self.client.PushError(job_id.native(), @@ -403,6 +416,272 @@ cdef class RayletClient: def is_worker(self): return self.client.IsWorker() +cdef deserialize_args( + const c_vector[shared_ptr[CRayObject]] &c_args, + const c_vector[CObjectID] &arg_reference_ids): + cdef: + c_vector[shared_ptr[CRayObject]] by_reference_objects + + args = [] + by_reference_ids = [] + by_reference_indices = [] + for i in range(c_args.size()): + # Passed by value. + if arg_reference_ids[i].IsNil(): + data = Buffer.make(c_args[i].get().GetData()) + if (c_args[i].get().HasMetadata() + and Buffer.make( + c_args[i].get().GetMetadata()).to_pybytes() + == RAW_BUFFER_METADATA): + args.append(data) + else: + args.append(pickle.loads(data.to_pybytes())) + # Passed by reference. + else: + by_reference_ids.append( + ObjectID(arg_reference_ids[i].Binary())) + by_reference_indices.append(i) + by_reference_objects.push_back(c_args[i]) + args.append(None) + + data_metadata_pairs = RayObjectsToDataMetadataPairs( + by_reference_objects) + for i, arg in enumerate( + ray.worker.global_worker.deserialize_objects( + data_metadata_pairs, by_reference_ids)): + args[by_reference_indices[i]] = arg + + for arg in args: + if isinstance(arg, RayError): + raise arg + + return ray.signature.recover_args(args) + +cdef _check_worker_state(worker, CTaskType task_type, JobID job_id): + assert worker.current_task_id.is_nil() + assert worker.task_context.task_index == 0 + assert worker.task_context.put_index == 1 + + # If this worker is not an actor, check that `current_job_id` + # was reset when the worker finished the previous task. + if task_type in [TASK_TYPE_NORMAL_TASK, + TASK_TYPE_ACTOR_CREATION_TASK]: + assert worker.current_job_id.is_nil() + # Set the driver ID of the current running task. This is + # needed so that if the task throws an exception, we propagate + # the error message to the correct driver. + worker.current_job_id = job_id + else: + # If this worker is an actor, current_job_id wasn't reset. + # Check that current task's driver ID equals the previous + # one. + assert worker.current_job_id == job_id + + +cdef _store_task_outputs(worker, return_ids, outputs): + for i in range(len(return_ids)): + return_id, output = return_ids[i], outputs[i] + if isinstance(output, ray.actor.ActorHandle): + raise Exception("Returning an actor handle from a remote " + "function is not allowed).") + if output is ray.experimental.no_return.NoReturn: + if not worker.core_worker.object_exists(return_id): + raise RuntimeError( + "Attempting to return 'ray.experimental.NoReturn' " + "from a remote function, but the corresponding " + "ObjectID does not exist in the local object store.") + else: + worker.put_object(return_id, output) + + +cdef execute_task( + CTaskType task_type, + const CRayFunction &ray_function, + const CJobID &c_job_id, + const CActorID &c_actor_id, + const unordered_map[c_string, double] &c_resources, + const c_vector[shared_ptr[CRayObject]] &c_args, + const c_vector[CObjectID] &c_arg_reference_ids, + const c_vector[CObjectID] &c_return_ids, + c_vector[shared_ptr[CRayObject]] *returns): + + worker = ray.worker.global_worker + + actor_id = ActorID(c_actor_id.Binary()) + job_id = JobID(c_job_id.Binary()) + task_id = worker.core_worker.get_current_task_id() + + # Check that the worker is in the expected state to execute the task. + _check_worker_state(worker, task_type, job_id) + worker.task_context.current_task_id = task_id + + # Automatically restrict the GPUs available to this task. + ray.utils.set_cuda_visible_devices(ray.get_gpu_ids()) + + function_descriptor = FunctionDescriptor.from_bytes_list( + ray_function.GetFunctionDescriptor()) + + if task_type == TASK_TYPE_ACTOR_CREATION_TASK: + worker.actor_id = actor_id + actor_class = worker.function_actor_manager.load_actor_class( + job_id, function_descriptor) + worker.actors[actor_id] = actor_class.__new__(actor_class) + worker.actor_checkpoint_info[actor_id] = ( + ray.worker.ActorCheckpointInfo( + num_tasks_since_last_checkpoint=0, + last_checkpoint_timestamp=int(1000 * time.time()), + checkpoint_ids=[])) + + execution_info = worker.function_actor_manager.get_execution_info( + job_id, function_descriptor) + function_name = execution_info.function_name + extra_data = {"name": function_name, "task_id": task_id.hex()} + + if task_type == TASK_TYPE_NORMAL_TASK: + title = "ray_worker:{}()".format(function_name) + next_title = "ray_worker" + function_executor = execution_info.function + else: + actor = worker.actors[actor_id] + class_name = actor.__class__.__name__ + title = "ray_{}:{}()".format(class_name, function_name) + next_title = "ray_{}".format(class_name) + worker_name = "ray_{}_{}".format(class_name, os.getpid()) + if c_resources.find(b"memory") != c_resources.end(): + worker.memory_monitor.set_heap_limit( + worker_name, + ray_constants.from_memory_units( + dereference(c_resources.find(b"memory")).second)) + if c_resources.find(b"object_store_memory") != c_resources.end(): + worker._set_object_store_client_options( + worker_name, + int(ray_constants.from_memory_units( + dereference( + c_resources.find(b"object_store_memory")).second))) + + def function_executor(*arguments, **kwarguments): + return execution_info.function(actor, *arguments, **kwarguments) + + return_ids = VectorToObjectIDs(c_return_ids) + with profiling.profile("task", extra_data=extra_data): + try: + task_exception = False + if not (task_type == TASK_TYPE_ACTOR_TASK + and function_name == "__ray_terminate__"): + worker.reraise_actor_init_error() + worker.memory_monitor.raise_if_low_memory() + + with profiling.profile("task:deserialize_arguments"): + args, kwargs = deserialize_args(c_args, c_arg_reference_ids) + + # Execute the task. + with ray.worker._changeproctitle(title, next_title): + with profiling.profile("task:execute"): + task_exception = True + outputs = function_executor(*args, **kwargs) + task_exception = False + if len(return_ids) == 1: + outputs = (outputs,) + + # Store the outputs in the object store. + with profiling.profile("task:store_outputs"): + _store_task_outputs(worker, return_ids, outputs) + except Exception as error: + if (task_type == TASK_TYPE_ACTOR_CREATION_TASK): + worker.mark_actor_init_failed(error) + + backtrace = ray.utils.format_error_message( + traceback.format_exc(), task_exception=task_exception) + if isinstance(error, RayTaskError): + # Avoid recursive nesting of RayTaskError. + failure_object = RayTaskError(function_name, backtrace, + error.cause_cls) + else: + failure_object = RayTaskError(function_name, backtrace, + error.__class__) + _store_task_outputs( + worker, return_ids, [failure_object] * len(return_ids)) + ray.utils.push_error_to_driver( + worker, + ray_constants.TASK_PUSH_ERROR, + str(failure_object), + job_id=worker.current_job_id) + + # Send signal with the error. + ray_signal.send(ray_signal.ErrorSignal(str(failure_object))) + + # Reset the state fields so the next task can run. + worker.task_context.current_task_id = TaskID.nil() + worker.core_worker.set_current_task_id(TaskID.nil()) + worker.task_context.task_index = 0 + worker.task_context.put_index = 1 + + # Don't need to reset `current_job_id` if the worker is an + # actor. Because the following tasks should all have the + # same driver id. + if task_type == TASK_TYPE_NORMAL_TASK: + worker.current_job_id = JobID.nil() + worker.core_worker.set_current_job_id(JobID.nil()) + + # Reset signal counters so that the next task can get + # all past signals. + ray_signal.reset() + + # Reset the state of the worker for the next task to execute. + # Increase the task execution counter. + worker.function_actor_manager.increase_task_counter( + job_id, function_descriptor) + + # If we've reached the max number of executions for this worker, exit. + reached_max_executions = ( + worker.function_actor_manager.get_task_counter( + job_id, function_descriptor) == execution_info.max_calls) + if reached_max_executions: + worker.core_worker.disconnect() + sys.exit(0) + +cdef CRayStatus task_execution_handler( + CTaskType task_type, + const CRayFunction &ray_function, + const CJobID &c_job_id, + const CActorID &c_actor_id, + const unordered_map[c_string, double] &c_resources, + const c_vector[shared_ptr[CRayObject]] &c_args, + const c_vector[CObjectID] &c_arg_reference_ids, + const c_vector[CObjectID] &c_return_ids, + c_vector[shared_ptr[CRayObject]] *returns) nogil: + + with gil: + try: + # The call to execute_task should never raise an exception. If it + # does, that indicates that there was an unexpected internal error. + execute_task(task_type, ray_function, c_job_id, + c_actor_id, c_resources, c_args, + c_arg_reference_ids, c_return_ids, returns) + except Exception: + traceback_str = traceback.format_exc() + ( + "An unexpected internal error occurred while the worker was" + "executing a task.") + ray.utils.push_error_to_driver( + ray.worker.global_worker, + "worker_crash", + traceback_str, + job_id=None) + # TODO(rkn): Note that if the worker was in the middle of executing + # a task, then any worker or driver that is blocking in a get call + # and waiting for the output of that task will hang. We need to + # address this. + sys.exit(1) + + return CRayStatus.OK() + +cdef CRayStatus check_signals() nogil: + with gil: + try: + PyErr_CheckSignals() + except KeyboardInterrupt: + return CRayStatus.Interrupted(b"") + return CRayStatus.OK() cdef class CoreWorker: cdef unique_ptr[CCoreWorker] core_worker @@ -419,12 +698,20 @@ cdef class CoreWorker: LANGUAGE_PYTHON, store_socket.encode("ascii"), raylet_socket.encode("ascii"), job_id.native(), gcs_options.native()[0], log_dir.encode("utf-8"), - node_ip_address.encode("utf-8"), NULL, False)) + node_ip_address.encode("utf-8"), task_execution_handler, + check_signals, False)) def disconnect(self): with nogil: self.core_worker.get().Disconnect() + def run_task_loop(self): + with nogil: + self.core_worker.get().Execution().Run() + + def get_current_task_id(self): + return TaskID(self.core_worker.get().GetCurrentTaskId().Binary()) + def set_current_task_id(self, TaskID task_id): cdef: CTaskID c_task_id = task_id.native() @@ -432,15 +719,8 @@ cdef class CoreWorker: with nogil: self.core_worker.get().SetCurrentTaskId(c_task_id) - def set_actor_id(self, ActorID actor_id): - cdef: - CActorID c_actor_id = actor_id.native() - - with nogil: - self.core_worker.get().SetActorId(c_actor_id) - - def get_current_task_id(self): - return TaskID(self.core_worker.get().GetCurrentTaskId().Binary()) + def get_current_job_id(self): + return JobID(self.core_worker.get().GetCurrentJobId().Binary()) def set_current_job_id(self, JobID job_id): cdef: @@ -449,7 +729,8 @@ cdef class CoreWorker: with nogil: self.core_worker.get().SetCurrentJobId(c_job_id) - def get_objects(self, object_ids, TaskID current_task_id): + def get_objects(self, object_ids, TaskID current_task_id, + int64_t timeout_ms=-1): cdef: c_vector[shared_ptr[CRayObject]] results CTaskID c_task_id = current_task_id.native() @@ -457,25 +738,9 @@ cdef class CoreWorker: with nogil: check_status(self.core_worker.get().Objects().Get( - c_object_ids, -1, &results)) + c_object_ids, timeout_ms, &results)) - data_metadata_pairs = [] - for result in results: - # core_worker will return a nullptr for objects that couldn't be - # retrieved from the store or if an object was an exception. - if not result.get(): - data_metadata_pairs.append((None, None)) - else: - data = None - metadata = None - if result.get().HasData(): - data = Buffer.make(result.get().GetData()) - if result.get().HasMetadata(): - metadata = Buffer.make( - result.get().GetMetadata()).to_pybytes() - data_metadata_pairs.append((data, metadata)) - - return data_metadata_pairs + return RayObjectsToDataMetadataPairs(results) def object_exists(self, ObjectID object_id): cdef: @@ -570,7 +835,7 @@ cdef class CoreWorker: with nogil: check_status(self.core_worker.get().Objects().Seal(c_object_id)) - def wait(self, object_ids, int num_returns, int64_t timeout_milliseconds, + def wait(self, object_ids, int num_returns, int64_t timeout_ms, TaskID current_task_id): cdef: WaitResultPair result @@ -581,7 +846,7 @@ cdef class CoreWorker: wait_ids = ObjectIDsToVector(object_ids) with nogil: check_status(self.core_worker.get().Objects().Wait( - wait_ids, num_returns, timeout_milliseconds, &results)) + wait_ids, num_returns, timeout_ms, &results)) assert len(results) == len(object_ids) @@ -704,6 +969,28 @@ cdef class CoreWorker: return VectorToObjectIDs(return_ids) + def resource_ids(self): + cdef: + ResourceMappingType resource_mapping = ( + self.core_worker.get().GetResourceIDs()) + unordered_map[ + c_string, c_vector[pair[int64_t, double]] + ].iterator iterator = resource_mapping.begin() + c_vector[pair[int64_t, double]] c_value + + resources_dict = {} + while iterator != resource_mapping.end(): + key = decode(dereference(iterator).first) + c_value = dereference(iterator).second + ids_and_fractions = [] + for i in range(c_value.size()): + ids_and_fractions.append( + (c_value[i].first, c_value[i].second)) + resources_dict[key] = ids_and_fractions + postincrement(iterator) + + return resources_dict + def profile_event(self, event_type, dict extra_data): cdef: c_string c_event_type = event_type.encode("ascii") diff --git a/python/ray/experimental/async_plasma.py b/python/ray/experimental/async_plasma.py index 9809feef2..674c6089a 100644 --- a/python/ray/experimental/async_plasma.py +++ b/python/ray/experimental/async_plasma.py @@ -199,8 +199,8 @@ class PlasmaEventHandler: del self._waiting_dict[fut.object_id] def _complete_future(self, fut): - obj = self._worker.retrieve_and_deserialize( - [ray.ObjectID(fut.object_id.binary())], 0)[0] + obj = self._worker.get_objects([ray.ObjectID( + fut.object_id.binary())])[0] fut.set_result(obj) def as_future(self, object_id, check_ready=True): diff --git a/python/ray/experimental/signal.py b/python/ray/experimental/signal.py index 39de8d861..f9b3d1c41 100644 --- a/python/ray/experimental/signal.py +++ b/python/ray/experimental/signal.py @@ -69,11 +69,10 @@ def send(signal): Args: signal: Signal to be sent. """ - if hasattr(ray.worker.global_worker, "actor_creation_task_id"): - source_key = ray.worker.global_worker.actor_id.hex() - else: - # No actors; this function must have been called from a task + if ray.worker.global_worker.actor_id.is_nil(): source_key = ray.worker.global_worker.current_task_id.hex() + else: + source_key = ray.worker.global_worker.actor_id.hex() encoded_signal = ray.utils.binary_to_hex(cloudpickle.dumps(signal)) ray.worker.global_worker.redis_client.execute_command( diff --git a/python/ray/function_manager.py b/python/ray/function_manager.py index 8d8c9cfa9..259ef0f6f 100644 --- a/python/ray/function_manager.py +++ b/python/ray/function_manager.py @@ -763,7 +763,7 @@ class FunctionActorManager(object): worker's internal state to record the executed method. """ - def actor_method_executor(dummy_return_id, actor, *args, **kwargs): + def actor_method_executor(actor, *args, **kwargs): # Update the actor's task counter to reflect the task we're about # to execute. self._worker.actor_task_counter += 1 diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index a7268a152..8a5d37165 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -47,31 +47,34 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil: CRayStatus OK() @staticmethod - CRayStatus OutOfMemory() + CRayStatus OutOfMemory(const c_string &msg) @staticmethod - CRayStatus KeyError() + CRayStatus KeyError(const c_string &msg) @staticmethod - CRayStatus Invalid() + CRayStatus Invalid(const c_string &msg) @staticmethod - CRayStatus IOError() + CRayStatus IOError(const c_string &msg) @staticmethod - CRayStatus TypeError() + CRayStatus TypeError(const c_string &msg) @staticmethod - CRayStatus UnknownError() + CRayStatus UnknownError(const c_string &msg) @staticmethod - CRayStatus NotImplemented() + CRayStatus NotImplemented(const c_string &msg) @staticmethod - CRayStatus RedisError() + CRayStatus ObjectStoreFull(const c_string &msg) @staticmethod - CRayStatus ObjectStoreFull() + CRayStatus RedisError(const c_string &msg) + + @staticmethod + CRayStatus Interrupted(const c_string &msg) c_bool ok() c_bool IsOutOfMemory() @@ -81,8 +84,9 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil: c_bool IsTypeError() c_bool IsUnknownError() c_bool IsNotImplemented() - c_bool IsRedisError() c_bool IsObjectStoreFull() + c_bool IsRedisError() + c_bool IsInterrupted() c_string ToString() c_string CodeAsString() @@ -92,6 +96,7 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil: # We can later add more of the common status factory methods as needed cdef CRayStatus RayStatus_OK "Status::OK"() cdef CRayStatus RayStatus_Invalid "Status::Invalid"() + cdef CRayStatus RayStatus_NotImplemented "Status::NotImplemented"() cdef extern from "ray/common/status.h" namespace "ray::StatusCode" nogil: @@ -117,6 +122,8 @@ cdef extern from "ray/protobuf/common.pb.h" nogil: pass cdef cppclass CWorkerType "ray::WorkerType": pass + cdef cppclass CTaskType "ray::TaskType": + pass # This is a workaround for C++ enum class since Cython has no corresponding @@ -130,6 +137,11 @@ cdef extern from "ray/protobuf/common.pb.h" nogil: cdef CWorkerType WORKER_TYPE_WORKER "ray::WorkerType::WORKER" cdef CWorkerType WORKER_TYPE_DRIVER "ray::WorkerType::DRIVER" +cdef extern from "ray/protobuf/common.pb.h" nogil: + cdef CTaskType TASK_TYPE_NORMAL_TASK "ray::TaskType::NORMAL_TASK" + cdef CTaskType TASK_TYPE_ACTOR_CREATION_TASK "ray::TaskType::ACTOR_CREATION_TASK" # noqa: E501 + cdef CTaskType TASK_TYPE_ACTOR_TASK "ray::TaskType::ACTOR_TASK" + cdef extern from "ray/common/task/scheduling_resources.h" nogil: cdef cppclass ResourceSet "ray::ResourceSet": diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index a1e0aff84..ddead813e 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -1,7 +1,13 @@ +# cython: profile = False +# distutils: language = c++ +# cython: embedsignature = True + from libc.stdint cimport int64_t from libcpp cimport bool as c_bool from libcpp.memory cimport shared_ptr, unique_ptr from libcpp.string cimport string as c_string +from libcpp.unordered_map cimport unordered_map +from libcpp.utility cimport pair from libcpp.vector cimport vector as c_vector from ray.includes.unique_ids cimport ( @@ -18,12 +24,30 @@ from ray.includes.common cimport ( CRayStatus, CTaskArg, CTaskOptions, + CTaskType, CWorkerType, CLanguage, CGcsClientOptions, ) +from ray.includes.task cimport CTaskSpec from ray.includes.libraylet cimport CRayletClient +ctypedef unordered_map[c_string, c_vector[pair[int64_t, double]]] \ + ResourceMappingType + +cdef extern from "ray/core_worker/task_execution.h" namespace "ray" nogil: + cdef cppclass CTaskExecutionInterface "CoreWorkerTaskExecutionInterface": + void Run() + void Stop() + +cdef extern from "ray/core_worker/profiling.h" nogil: + cdef cppclass CProfiler "ray::worker::Profiler": + void Start() + + cdef cppclass CProfileEvent "ray::worker::ProfileEvent": + CProfileEvent(const shared_ptr[CProfiler] profiler, + const c_string &event_type) + void SetExtraData(const c_string &extra_data) cdef extern from "ray/core_worker/profiling.h" nogil: cdef cppclass CProfileEvent "ray::worker::ProfileEvent": @@ -54,12 +78,23 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const c_string &raylet_socket, const CJobID &job_id, const CGcsClientOptions &gcs_options, const c_string &log_dir, const c_string &node_ip_address, - void* execution_callback, + CRayStatus ( + CTaskType task_type, + const CRayFunction &ray_function, + const CJobID &job_id, + const CActorID &actor_id, + const unordered_map[c_string, double] &resources, + const c_vector[shared_ptr[CRayObject]] &args, + const c_vector[CObjectID] &arg_reference_ids, + const c_vector[CObjectID] &return_ids, + c_vector[shared_ptr[CRayObject]] *returns) nogil, + CRayStatus() nogil, c_bool use_memory_store_) void Disconnect() CWorkerType &GetWorkerType() CLanguage &GetLanguage() CObjectInterface &Objects() + CTaskExecutionInterface &Execution() CRayStatus SubmitTask( const CRayFunction &function, const c_vector[CTaskArg] &args, @@ -72,7 +107,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const c_vector[CTaskArg] &args, const CTaskOptions &options, c_vector[CObjectID] *return_ids) - # CTaskExecutionInterface &Execution() unique_ptr[CProfileEvent] CreateProfileEvent( const c_string &event_type) @@ -81,12 +115,13 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CRayletClient &GetRayletClient() # TODO(edoakes): remove these once the Python core worker uses the task # interfaces + CJobID GetCurrentJobId() void SetCurrentJobId(const CJobID &job_id) CTaskID GetCurrentTaskId() void SetCurrentTaskId(const CTaskID &task_id) - void SetActorId(const CActorID &actor_id) const CActorID &GetActorId() CTaskID GetCallerId() + const ResourceMappingType &GetResourceIDs() const CActorID DeserializeAndRegisterActorHandle(const c_string &bytes) CRayStatus SerializeActorHandle(const CActorID &actor_id, c_string *bytes) diff --git a/python/ray/includes/libraylet.pxd b/python/ray/includes/libraylet.pxd index 9f045ee18..55b0fe148 100644 --- a/python/ray/includes/libraylet.pxd +++ b/python/ray/includes/libraylet.pxd @@ -3,7 +3,6 @@ from libcpp cimport bool as c_bool from libcpp.memory cimport unique_ptr from libcpp.string cimport string as c_string from libcpp.utility cimport pair -from libcpp.unordered_map cimport unordered_map from libcpp.vector cimport vector as c_vector from ray.includes.common cimport ( @@ -38,8 +37,6 @@ cdef extern from "ray/protobuf/gcs.pb.h" nogil: GCSProfileTableData() -ctypedef unordered_map[c_string, c_vector[pair[int64_t, double]]] \ - ResourceMappingType ctypedef pair[c_vector[CObjectID], c_vector[CObjectID]] WaitResultPair @@ -78,4 +75,3 @@ cdef extern from "ray/raylet/raylet_client.h" nogil: CWorkerID GetWorkerID() const CJobID GetJobID() const c_bool IsWorker() const - const ResourceMappingType &GetResourceIDs() const diff --git a/python/ray/includes/task.pxi b/python/ray/includes/task.pxi index f1210572f..6c37a3247 100644 --- a/python/ray/includes/task.pxi +++ b/python/ray/includes/task.pxi @@ -14,12 +14,6 @@ cdef class TaskSpec: cdef: unique_ptr[CTaskSpec] task_spec - @staticmethod - cdef make(unique_ptr[CTaskSpec]& task_spec): - cdef TaskSpec self = TaskSpec.__new__(TaskSpec) - self.task_spec.reset(task_spec.release()) - return self - @staticmethod def from_string(const c_string& task_spec_str): """Convert a string to a Ray task specification Python object. @@ -82,23 +76,23 @@ cdef class TaskSpec: def arguments(self): """Return the arguments for the task.""" cdef: - CTaskSpec*task_spec = self.task_spec.get() - int64_t num_args = task_spec.NumArgs() - int32_t lang = task_spec.GetLanguage() + int64_t num_args = self.task_spec.get().NumArgs() + int32_t lang = self.task_spec.get().GetLanguage() int count arg_list = [] if lang == LANGUAGE_PYTHON: for i in range(num_args): - count = task_spec.ArgIdCount(i) + count = self.task_spec.get().ArgIdCount(i) if count > 0: assert count == 1 arg_list.append( - ObjectID(task_spec.ArgId(i, 0).Binary())) + ObjectID(self.task_spec.get().ArgId(i, 0).Binary())) else: - data = task_spec.ArgData(i)[:task_spec.ArgDataSize(i)] - metadata = task_spec.ArgMetadata(i)[ - :task_spec.ArgMetadataSize(i)] + data = self.task_spec.get().ArgData(i)[ + :self.task_spec.get().ArgDataSize(i)] + metadata = self.task_spec.get().ArgMetadata(i)[ + :self.task_spec.get().ArgMetadataSize(i)] if metadata == RAW_BUFFER_METADATA: obj = data else: @@ -111,10 +105,10 @@ cdef class TaskSpec: def returns(self): """Return the object IDs for the return values of the task.""" - cdef CTaskSpec *task_spec = self.task_spec.get() return_id_list = [] - for i in range(task_spec.NumReturns()): - return_id_list.append(ObjectID(task_spec.ReturnId(i).Binary())) + for i in range(self.task_spec.get().NumReturns()): + return_id_list.append( + ObjectID(self.task_spec.get().ReturnId(i).Binary())) return return_id_list def required_resources(self): diff --git a/python/ray/state.py b/python/ray/state.py index e75141da4..f561a793e 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -505,6 +505,10 @@ class GlobalState(object): node_ip_address = profile_table_message.node_ip_address for profile_event_message in profile_table_message.profile_events: + try: + extra_data = json.loads(profile_event_message.extra_data) + except ValueError: + extra_data = {} profile_event = { "event_type": profile_event_message.event_type, "component_id": component_id, @@ -512,7 +516,7 @@ class GlobalState(object): "component_type": component_type, "start_time": profile_event_message.start_time, "end_time": profile_event_message.end_time, - "extra_data": json.loads(profile_event_message.extra_data), + "extra_data": extra_data } profile_events.append(profile_event) diff --git a/python/ray/tests/cluster_utils.py b/python/ray/tests/cluster_utils.py index e92b1642f..80107f6d3 100644 --- a/python/ray/tests/cluster_utils.py +++ b/python/ray/tests/cluster_utils.py @@ -106,7 +106,7 @@ class Cluster(object): return node - def remove_node(self, node, allow_graceful=False): + def remove_node(self, node, allow_graceful=True): """Kills all processes associated with worker node. Args: diff --git a/python/ray/tests/test_debug_tools.py b/python/ray/tests/test_debug_tools.py index 642b82cb5..1c6e4668c 100644 --- a/python/ray/tests/test_debug_tools.py +++ b/python/ray/tests/test_debug_tools.py @@ -47,4 +47,3 @@ def test_raylet_gdb(ray_gdb_start): stdout=subprocess.PIPE, stderr=subprocess.PIPE) assert pgrep_command.communicate()[0] - subprocess.call(["pkill", "-f", "gdb.*{}".format(process_name)]) diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 7f747cb3e..27a971a8e 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -292,7 +292,7 @@ def test_incorrect_method_calls(ray_start_regular): def test_worker_raising_exception(ray_start_regular): @ray.remote def f(): - ray.worker.global_worker._get_next_task_from_raylet = None + ray.worker.global_worker.function_actor_manager = None # Running this task should cause the worker to raise an exception after # the task has successfully completed. @@ -618,12 +618,17 @@ def test_warning_for_too_many_nested_tasks(shutdown_only): time.sleep(1000) return 1 + @ray.remote + def h(): + time.sleep(1) + ray.get(f.remote()) + @ray.remote def g(): # Sleep so that the f tasks all get submitted to the scheduler after # the g tasks. time.sleep(1) - ray.get(f.remote()) + ray.get(h.remote()) [g.remote() for _ in range(num_cpus * 4)] wait_for_errors(ray_constants.WORKER_POOL_LARGE_ERROR, 1) @@ -705,8 +710,6 @@ def test_warning_for_dead_node(ray_start_cluster_2_nodes): def test_raylet_crash_when_get(ray_start_regular): - nonexistent_id = ray.ObjectID.from_random() - def sleep_to_kill_raylet(): # Don't kill raylet before default workers get connected. time.sleep(2) @@ -715,14 +718,14 @@ def test_raylet_crash_when_get(ray_start_regular): thread = threading.Thread(target=sleep_to_kill_raylet) thread.start() with pytest.raises(ray.exceptions.UnreconstructableError): - ray.get(nonexistent_id) + ray.get(ray.ObjectID.from_random()) thread.join() def test_connect_with_disconnected_node(shutdown_only): config = json.dumps({ "num_heartbeats_timeout": 50, - "heartbeat_timeout_milliseconds": 10, + "raylet_heartbeat_timeout_milliseconds": 10, }) cluster = Cluster() cluster.add_node(num_cpus=0, _internal_config=config) diff --git a/python/ray/tests/test_multi_node_2.py b/python/ray/tests/test_multi_node_2.py index 7d84b12b8..a0249336e 100644 --- a/python/ray/tests/test_multi_node_2.py +++ b/python/ray/tests/test_multi_node_2.py @@ -52,7 +52,7 @@ def test_internal_config(ray_start_cluster_head): worker = cluster.add_node() cluster.wait_for_nodes() - cluster.remove_node(worker) + cluster.remove_node(worker, allow_graceful=False) time.sleep(1) assert ray.cluster_resources()["CPU"] == 2 diff --git a/python/ray/tests/test_set_task_returns.py b/python/ray/tests/test_set_task_returns.py deleted file mode 100644 index 9accfc74c..000000000 --- a/python/ray/tests/test_set_task_returns.py +++ /dev/null @@ -1,86 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import pytest - -import ray -import ray.exceptions -import ray.experimental.no_return -import ray.worker - - -def test_set_single_output(ray_start_regular): - @ray.remote - def f(): - return_object_ids = ray.worker.global_worker._current_task.returns() - ray.worker.global_worker.put_object(return_object_ids[0], 123) - return ray.experimental.no_return.NoReturn - - assert ray.get(f.remote()) == 123 - - -def test_set_multiple_outputs(ray_start_regular): - @ray.remote(num_return_vals=3) - def f(set_out0, set_out1, set_out2): - returns = [] - return_object_ids = ray.worker.global_worker._current_task.returns() - for i, set_out in enumerate([set_out0, set_out1, set_out2]): - if set_out: - ray.worker.global_worker.put_object(return_object_ids[i], True) - returns.append(ray.experimental.no_return.NoReturn) - else: - returns.append(False) - return tuple(returns) - - for set_out0 in [True, False]: - for set_out1 in [True, False]: - for set_out2 in [True, False]: - result_object_ids = f.remote(set_out0, set_out1, set_out2) - assert ray.get(result_object_ids) == [ - set_out0, set_out1, set_out2 - ] - - -def test_set_actor_method(ray_start_regular): - @ray.remote - class Actor(object): - def __init__(self): - pass - - def ping(self): - return_object_ids = ray.worker.global_worker._current_task.returns( - ) - ray.worker.global_worker.put_object(return_object_ids[0], 123) - return ray.experimental.no_return.NoReturn - - actor = Actor.remote() - assert ray.get(actor.ping.remote()) == 123 - - -def test_exception(ray_start_regular): - @ray.remote(num_return_vals=2) - def f(): - return_object_ids = ray.worker.global_worker._current_task.returns() - # The first return value is successfully stored in the object store - ray.worker.global_worker.put_object(return_object_ids[0], 123) - raise Exception("Error") - # The exception is stored at the second return objcet ID. - return ray.experimental.no_return.NoReturn, 456 - - object_id, exception_id = f.remote() - - assert ray.get(object_id) == 123 - with pytest.raises(ray.exceptions.RayTaskError): - ray.get(exception_id) - - -def test_no_set_and_no_return(ray_start_regular): - @ray.remote - def f(): - return ray.experimental.no_return.NoReturn - - object_id = f.remote() - with pytest.raises(ray.exceptions.RayTaskError) as e: - ray.get(object_id) - assert "Attempting to return 'ray.experimental.NoReturn'" in str(e.value) diff --git a/python/ray/worker.py b/python/ray/worker.py index ef35798aa..13095a622 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -26,7 +26,6 @@ import random import pyarrow import pyarrow.plasma as plasma import ray.cloudpickle as pickle -import ray.experimental.signal as ray_signal import ray.experimental.no_return import ray.gcs_utils import ray.memory_monitor as memory_monitor @@ -41,7 +40,6 @@ import ray.state from ray import ( ActorID, - WorkerID, JobID, ObjectID, TaskID, @@ -60,10 +58,7 @@ from ray.exceptions import ( UnreconstructableError, RAY_EXCEPTION_TYPES, ) -from ray.function_manager import ( - FunctionActorManager, - FunctionDescriptor, -) +from ray.function_manager import FunctionActorManager from ray.utils import ( _random_string, check_oversized_pickle, @@ -156,7 +151,6 @@ class Worker(object): # Index of the current session. This number will # increment every time when `ray.shutdown` is called. self._session_index = 0 - self._current_task = None # Functions to run to process the values returned by ray.get. Each # postprocessor must take two arguments ("object_ids", and "values"). self._post_get_hooks = [] @@ -473,9 +467,10 @@ class Worker(object): logger.warning(warning_message) self.store_and_register(object_id, value) - def retrieve_and_deserialize(self, object_ids, error_timeout=10): - data_metadata_pairs = self.core_worker.get_objects( - object_ids, self.current_task_id) + def deserialize_objects(self, + data_metadata_pairs, + object_ids, + error_timeout=10): assert len(data_metadata_pairs) == len(object_ids) start_time = time.time() @@ -571,9 +566,9 @@ class Worker(object): if self.mode == LOCAL_MODE: return self.local_mode_manager.get_objects(object_ids) - results = self.retrieve_and_deserialize(object_ids) - assert len(results) == len(object_ids) - return results + data_metadata_pairs = self.core_worker.get_objects( + object_ids, self.current_task_id) + return self.deserialize_objects(data_metadata_pairs, object_ids) def run_function_on_all_workers(self, function, run_on_other_drivers=False): @@ -679,149 +674,6 @@ class Worker(object): return ray.signature.recover_args(arguments) - def _store_outputs_in_object_store(self, object_ids, outputs): - """Store the outputs of a remote function in the local object store. - - This stores the values that were returned by a remote function in the - local object store. If any of the return values are object IDs, then - these object IDs are aliased with the object IDs that the scheduler - assigned for the return values. This is called by the worker that - executes the remote function. - - Note: - The arguments object_ids and outputs should have the same length. - - Args: - object_ids (List[ObjectID]): The object IDs that were assigned to - the outputs of the remote function call. - outputs (Tuple): The value returned by the remote function. If the - remote function was supposed to only return one value, then its - output was wrapped in a tuple with one element prior to being - passed into this function. - """ - for i in range(len(object_ids)): - if isinstance(outputs[i], ray.actor.ActorHandle): - raise Exception("Returning an actor handle from a remote " - "function is not allowed).") - if outputs[i] is ray.experimental.no_return.NoReturn: - if not self.core_worker.object_exists(object_ids[i]): - raise RuntimeError( - "Attempting to return 'ray.experimental.NoReturn' " - "from a remote function, but the corresponding " - "ObjectID does not exist in the local object store.") - else: - self.put_object(object_ids[i], outputs[i]) - - def _process_task(self, task, function_execution_info): - """Execute a task assigned to this worker. - - This method deserializes a task from the scheduler, and attempts to - execute the task. If the task succeeds, the outputs are stored in the - local object store. If the task throws an exception, RayTaskError - objects are stored in the object store to represent the failed task - (these will be retrieved by calls to get or by subsequent tasks that - use the outputs of this task). - """ - assert self.current_task_id.is_nil() - assert self.task_context.task_index == 0 - assert self.task_context.put_index == 1 - if not task.is_actor_task(): - # If this worker is not an actor, check that `current_job_id` - # was reset when the worker finished the previous task. - assert self.current_job_id.is_nil() - # Set the driver ID of the current running task. This is - # needed so that if the task throws an exception, we propagate - # the error message to the correct driver. - self.current_job_id = task.job_id() - self.core_worker.set_current_job_id(task.job_id()) - else: - # If this worker is an actor, current_job_id wasn't reset. - # Check that current task's driver ID equals the previous one. - assert self.current_job_id == task.job_id() - - self.task_context.current_task_id = task.task_id() - self.core_worker.set_current_task_id(task.task_id()) - - function_descriptor = FunctionDescriptor.from_bytes_list( - task.function_descriptor_list()) - serialized_args = task.arguments() - return_object_ids = task.returns() - if task.is_actor_task() or task.is_actor_creation_task(): - dummy_return_id = return_object_ids.pop() - function_executor = function_execution_info.function - function_name = function_execution_info.function_name - - # Get task arguments from the object store. - try: - if function_name != "__ray_terminate__": - self.reraise_actor_init_error() - self.memory_monitor.raise_if_low_memory() - with profiling.profile("task:deserialize_arguments"): - function_args, function_kwargs = ( - self._get_arguments_for_execution(function_name, - serialized_args)) - except Exception as e: - self._handle_process_task_failure( - function_descriptor, return_object_ids, e, - ray.utils.format_error_message(traceback.format_exc())) - return - - # Execute the task. - try: - self._current_task = task - with profiling.profile("task:execute"): - if task.is_normal_task(): - outputs = function_executor(*function_args, - **function_kwargs) - else: - if task.is_actor_task(): - key = task.actor_id() - else: - key = task.actor_creation_id() - worker_name = "ray_{}_{}".format( - self.actors[key].__class__.__name__, os.getpid()) - if "memory" in task.required_resources(): - self.memory_monitor.set_heap_limit( - worker_name, - ray_constants.from_memory_units( - task.required_resources()["memory"])) - if "object_store_memory" in task.required_resources(): - self._set_object_store_client_options( - worker_name, - int( - ray_constants.from_memory_units( - task.required_resources()[ - "object_store_memory"]))) - outputs = function_executor( - dummy_return_id, self.actors[key], *function_args, - **function_kwargs) - except Exception as e: - # Determine whether the exception occured during a task, not an - # actor method. - task_exception = not task.is_actor_task() - traceback_str = ray.utils.format_error_message( - traceback.format_exc(), task_exception=task_exception) - self._handle_process_task_failure( - function_descriptor, return_object_ids, e, traceback_str) - return - finally: - self._current_task = None - - # Store the outputs in the local object store. - try: - with profiling.profile("task:store_outputs"): - # If this is an actor task, then the last object ID returned by - # the task is a dummy output, not returned by the function - # itself. Decrement to get the correct number of return values. - num_returns = len(return_object_ids) - if num_returns == 1: - outputs = (outputs, ) - self._store_outputs_in_object_store(return_object_ids, outputs) - except Exception as e: - self._handle_process_task_failure( - function_descriptor, return_object_ids, e, - ray.utils.format_error_message(traceback.format_exc())) - def _set_object_store_client_options(self, name, object_store_memory): try: logger.debug("Setting plasma memory limit to {} for {}".format( @@ -838,133 +690,15 @@ class Worker(object): "object store memory status is:\n\n{}".format( object_store_memory, name, e)) - def _handle_process_task_failure(self, function_descriptor, - return_object_ids, error, backtrace): - function_name = function_descriptor.function_name - if isinstance(error, RayTaskError): - # avoid recursively nesting of RayTaskError - failure_object = RayTaskError(function_name, backtrace, - error.cause_cls) - else: - failure_object = RayTaskError(function_name, backtrace, - error.__class__) - failure_objects = [ - failure_object for _ in range(len(return_object_ids)) - ] - self._store_outputs_in_object_store(return_object_ids, failure_objects) - # Log the error message. - ray.utils.push_error_to_driver( - self, - ray_constants.TASK_PUSH_ERROR, - str(failure_object), - job_id=self.current_job_id) - # Mark the actor init as failed - if not self.actor_id.is_nil() and function_name == "__init__": - self.mark_actor_init_failed(error) - # Send signal with the error. - ray_signal.send(ray_signal.ErrorSignal(str(failure_object))) - - def _wait_for_and_process_task(self, task): - """Wait for a task to be ready and process the task. - - Args: - task: The task to execute. - """ - function_descriptor = FunctionDescriptor.from_bytes_list( - task.function_descriptor_list()) - job_id = task.job_id() - - # TODO(rkn): It would be preferable for actor creation tasks to share - # more of the code path with regular task execution. - if task.is_actor_creation_task(): - # TODO: Remove Worker.actor_id and just use CoreWorker.GetActorId. - self.actor_id = task.actor_creation_id() - self.core_worker.set_actor_id(task.actor_creation_id()) - self.actor_creation_task_id = task.task_id() - actor_class = self.function_actor_manager.load_actor_class( - job_id, function_descriptor) - self.actors[self.actor_id] = actor_class.__new__(actor_class) - self.actor_checkpoint_info[self.actor_id] = ActorCheckpointInfo( - num_tasks_since_last_checkpoint=0, - last_checkpoint_timestamp=int(1000 * time.time()), - checkpoint_ids=[], - ) - - execution_info = self.function_actor_manager.get_execution_info( - job_id, function_descriptor) - - # Execute the task. - function_name = execution_info.function_name - extra_data = {"name": function_name, "task_id": task.task_id().hex()} - if not task.is_actor_task(): - if not task.is_actor_creation_task(): - title = "ray_worker:{}()".format(function_name) - next_title = "ray_worker" - else: - actor = self.actors[task.actor_creation_id()] - title = "ray_{}:{}()".format(actor.__class__.__name__, - function_name) - next_title = "ray_{}".format(actor.__class__.__name__) - else: - actor = self.actors[task.actor_id()] - title = "ray_{}:{}()".format(actor.__class__.__name__, - function_name) - next_title = "ray_{}".format(actor.__class__.__name__) - - with profiling.profile("task", extra_data=extra_data): - with _changeproctitle(title, next_title): - self._process_task(task, execution_info) - # Reset the state fields so the next task can run. - self.task_context.current_task_id = TaskID.nil() - self.core_worker.set_current_task_id(TaskID.nil()) - self.task_context.task_index = 0 - self.task_context.put_index = 1 - if self.actor_id.is_nil(): - # Don't need to reset `current_job_id` if the worker is an - # actor. Because the following tasks should all have the - # same driver id. - self.current_job_id = WorkerID.nil() - self.core_worker.set_current_job_id(JobID.nil()) - # Reset signal counters so that the next task can get - # all past signals. - ray_signal.reset() - - # Increase the task execution counter. - self.function_actor_manager.increase_task_counter( - job_id, function_descriptor) - - reached_max_executions = (self.function_actor_manager.get_task_counter( - job_id, function_descriptor) == execution_info.max_calls) - if reached_max_executions: - self.core_worker.disconnect() - sys.exit(0) - - def _get_next_task_from_raylet(self): - """Get the next task from the raylet. - - Returns: - A task from the raylet. - """ - with profiling.profile("worker_idle"): - task = self.raylet_client.get_task() - - # Automatically restrict the GPUs available to this task. - ray.utils.set_cuda_visible_devices(ray.get_gpu_ids()) - - return task - def main_loop(self): """The main loop a worker runs to receive and execute tasks.""" - def exit(signum, frame): - shutdown() - sys.exit(0) + def sigterm_handler(signum, frame): + shutdown(True) + sys.exit(1) - signal.signal(signal.SIGTERM, exit) - - while True: - task = self._get_next_task_from_raylet() - self._wait_for_and_process_task(task) + signal.signal(signal.SIGTERM, sigterm_handler) + self.core_worker.run_task_loop() def get_gpu_ids(): @@ -982,7 +716,7 @@ def get_gpu_ids(): raise Exception("ray.get_gpu_ids() currently does not work in LOCAL " "MODE.") - all_resource_ids = global_worker.raylet_client.resource_ids() + all_resource_ids = global_worker.core_worker.resource_ids() assigned_ids = [ resource_id for resource_id, _ in all_resource_ids.get("GPU", []) ] @@ -1010,7 +744,7 @@ def get_resource_ids(): "ray.get_resource_ids() currently does not work in LOCAL " "MODE.") - return global_worker.raylet_client.resource_ids() + return global_worker.core_worker.resource_ids() def get_webui_url(): @@ -1437,7 +1171,7 @@ def shutdown(exiting_interpreter=False): # to make sure that log messages finish printing. time.sleep(0.5) - disconnect() + disconnect(exiting_interpreter) # Disconnect global state from GCS. ray.state.state.disconnect() @@ -1456,6 +1190,13 @@ def shutdown(exiting_interpreter=False): atexit.register(shutdown, True) + +def sigterm_handler(signum, frame): + sys.exit(signal.SIGTERM) + + +signal.signal(signal.SIGTERM, sigterm_handler) + # Define a custom excepthook so that if the driver exits with an exception, we # can push that exception to Redis. normal_excepthook = sys.excepthook @@ -1900,7 +1641,7 @@ def connect(node, worker.cached_functions_to_run = None -def disconnect(): +def disconnect(exiting_interpreter=False): """Disconnect this worker from the raylet and object store.""" # Reset the list of cached remote functions and actors so that if more # remote functions or actors are defined and then connect is called again, @@ -1928,10 +1669,12 @@ def disconnect(): worker.function_actor_manager.reset_cache() worker.serialization_context_map.clear() - if hasattr(worker, "raylet_client"): - del worker.raylet_client - if hasattr(worker, "core_worker"): - del worker.core_worker + if not exiting_interpreter: + if hasattr(worker, "raylet_client"): + del worker.raylet_client + + if hasattr(worker, "core_worker"): + del worker.core_worker @contextmanager diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index 594218480..26c4030ea 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -3,7 +3,6 @@ from __future__ import division from __future__ import print_function import argparse -import traceback import ray import ray.actor @@ -86,30 +85,5 @@ if __name__ == "__main__": node = ray.node.Node( ray_params, head=False, shutdown_at_exit=False, connect_only=True) ray.worker._global_node = node - ray.worker.connect(node, mode=ray.WORKER_MODE) - - error_explanation = """ - This error is unexpected and should not have happened. Somehow a worker - crashed in an unanticipated way causing the main_loop to throw an exception, - which is being caught in "python/ray/workers/default_worker.py". - """ - - try: - # This call to main_loop should never return if things are working. - # Most exceptions that are thrown (e.g., inside the execution of a - # task) should be caught and handled inside of the call to - # main_loop. If an exception is thrown here, then that means that - # there is some error that we didn't anticipate. - ray.worker.global_worker.main_loop() - except Exception: - traceback_str = traceback.format_exc() + error_explanation - ray.utils.push_error_to_driver( - ray.worker.global_worker, - "worker_crash", - traceback_str, - job_id=None) - # TODO(rkn): Note that if the worker was in the middle of executing - # a task, then any worker or driver that is blocking in a get call - # and waiting for the output of that task will hang. We need to - # address this. + ray.worker.global_worker.main_loop() diff --git a/src/ray/common/status.cc b/src/ray/common/status.cc index 9c6dcef68..f7345c35c 100644 --- a/src/ray/common/status.cc +++ b/src/ray/common/status.cc @@ -74,6 +74,9 @@ std::string Status::CodeAsString() const { case StatusCode::RedisError: type = "RedisError"; break; + case StatusCode::Interrupted: + type = "Interrupted"; + break; default: type = "Unknown"; break; diff --git a/src/ray/common/status.h b/src/ray/common/status.h index 3a8d2f0f4..fb421221a 100644 --- a/src/ray/common/status.h +++ b/src/ray/common/status.h @@ -79,6 +79,7 @@ enum class StatusCode : char { UnknownError = 9, NotImplemented = 10, RedisError = 11, + Interrupted = 12, }; #if defined(__clang__) @@ -142,6 +143,10 @@ class RAY_EXPORT Status { return Status(StatusCode::RedisError, msg); } + static Status Interrupted(const std::string &msg) { + return Status(StatusCode::Interrupted, msg); + } + // Returns true iff the status indicates success. bool ok() const { return (state_ == NULL); } @@ -155,6 +160,7 @@ class RAY_EXPORT Status { bool IsUnknownError() const { return code() == StatusCode::UnknownError; } bool IsNotImplemented() const { return code() == StatusCode::NotImplemented; } bool IsRedisError() const { return code() == StatusCode::RedisError; } + bool IsInterrupted() const { return code() == StatusCode::Interrupted; } // Return a string representation of this status suitable for printing. // Returns the string "OK" for success. diff --git a/src/ray/common/task/scheduling_resources.cc b/src/ray/common/task/scheduling_resources.cc index 775e4bd6d..4844326d9 100644 --- a/src/ray/common/task/scheduling_resources.cc +++ b/src/ray/common/task/scheduling_resources.cc @@ -713,8 +713,7 @@ std::vector> ResourceIdSet::ToF const std::string ResourceIdSet::Serialize() const { flatbuffers::FlatBufferBuilder fbb; - auto resource_id_set_flatbuf = ToFlatbuf(fbb); - fbb.Finish(fbb.CreateVector(resource_id_set_flatbuf)); + fbb.Finish(protocol::CreateResourceIdSetInfos(fbb, fbb.CreateVector(ToFlatbuf(fbb)))); return std::string(fbb.GetBufferPointer(), fbb.GetBufferPointer() + fbb.GetSize()); } diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index b463ff760..0db95165a 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1,9 +1,7 @@ -#include - +#include "ray/core_worker/core_worker.h" #include "ray/common/ray_config.h" #include "ray/common/task/task_util.h" #include "ray/core_worker/context.h" -#include "ray/core_worker/core_worker.h" namespace { @@ -42,13 +40,13 @@ void BuildCommonTaskSpec( namespace ray { -CoreWorker::CoreWorker( - const WorkerType worker_type, const Language language, - const std::string &store_socket, const std::string &raylet_socket, - const JobID &job_id, const gcs::GcsClientOptions &gcs_options, - const std::string &log_dir, const std::string &node_ip_address, - const CoreWorkerTaskExecutionInterface::TaskExecutor &execution_callback, - bool use_memory_store) +CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, + const std::string &store_socket, const std::string &raylet_socket, + const JobID &job_id, const gcs::GcsClientOptions &gcs_options, + const std::string &log_dir, const std::string &node_ip_address, + const CoreWorkerTaskExecutionInterface::TaskExecutionCallback + &task_execution_callback, + std::function check_signals, bool use_memory_store) : worker_type_(worker_type), language_(language), raylet_socket_(raylet_socket), @@ -66,14 +64,6 @@ CoreWorker::CoreWorker( RayLog::InstallFailureSignalHandler(); } - boost::asio::signal_set signals(io_service_, SIGINT, SIGTERM); - signals.async_wait( - [](const boost::system::error_code &error, int signal_number) -> void { - if (!error) { - exit(signal_number); - } - }); - // Initialize gcs client. gcs_client_ = std::unique_ptr(new gcs::RedisGcsClient(gcs_options)); @@ -83,21 +73,18 @@ CoreWorker::CoreWorker( profiler_ = std::make_shared(worker_context_, node_ip_address, io_service_, gcs_client_); - object_interface_ = - std::unique_ptr(new CoreWorkerObjectInterface( - worker_context_, raylet_client_, store_socket, use_memory_store)); + object_interface_ = std::unique_ptr( + new CoreWorkerObjectInterface(worker_context_, raylet_client_, store_socket, + use_memory_store, check_signals)); // Initialize task execution. int rpc_server_port = 0; if (worker_type_ == WorkerType::WORKER) { - // TODO(edoakes): Remove this check once Python core worker migration is complete. - if (language != Language::PYTHON || execution_callback != nullptr) { - RAY_CHECK(execution_callback != nullptr); - task_execution_interface_ = std::unique_ptr( - new CoreWorkerTaskExecutionInterface(worker_context_, raylet_client_, - *object_interface_, execution_callback)); - rpc_server_port = task_execution_interface_->worker_server_.GetPort(); - } + task_execution_interface_ = std::unique_ptr( + new CoreWorkerTaskExecutionInterface(*this, worker_context_, raylet_client_, + *object_interface_, profiler_, + task_execution_callback)); + rpc_server_port = task_execution_interface_->worker_server_.GetPort(); } // Initialize raylet client. @@ -139,7 +126,6 @@ CoreWorker::CoreWorker( std::shared_ptr data = std::make_shared(); data->mutable_task()->mutable_task_spec()->CopyFrom(builder.Build().GetMessage()); RAY_CHECK_OK(gcs_client_->raylet_task_table().Add(job_id, task_id, data, nullptr)); - worker_context_.SetCurrentTaskId(task_id); SetCurrentTaskId(task_id); } @@ -201,6 +187,7 @@ CoreWorker::~CoreWorker() { } void CoreWorker::Disconnect() { + io_service_.stop(); if (gcs_client_) { gcs_client_->Disconnect(); } @@ -209,6 +196,17 @@ void CoreWorker::Disconnect() { } } +void CoreWorker::StartIOService() { + // Block SIGINT and SIGTERM so they will be handled by the main thread. + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, SIGINT); + sigaddset(&mask, SIGTERM); + pthread_sigmask(SIG_BLOCK, &mask, NULL); + + io_service_.run(); +} + std::unique_ptr CoreWorker::CreateProfileEvent( const std::string &event_type) { return std::unique_ptr( @@ -390,4 +388,12 @@ Status CoreWorker::SerializeActorHandle(const ActorID &actor_id, return status; } +const ResourceMappingType CoreWorker::GetResourceIDs() const { + if (worker_type_ == WorkerType::DRIVER) { + ResourceMappingType empty; + return empty; + } + return task_execution_interface_->GetResourceIDs(); +} + } // namespace ray diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index d0cd16c71..a7d5b9490 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -30,7 +30,10 @@ class CoreWorker { /// \param[in] log_dir Directory to write logs to. If this is empty, logs /// won't be written to a file. /// \param[in] node_ip_address IP address of the node. - /// \param[in] execution_callback Language worker callback to execute tasks. + /// \param[in] task_execution_callback Language worker callback to execute tasks. + /// \parma[in] check_signals Language worker function to check for signals and handle + /// them. If the function returns anything but StatusOK, any long-running + /// operations in the core worker will short circuit and return that status. /// \param[in] use_memory_store Whether or not to use the in-memory object store /// in addition to the plasma store. /// @@ -42,7 +45,9 @@ class CoreWorker { const std::string &store_socket, const std::string &raylet_socket, const JobID &job_id, const gcs::GcsClientOptions &gcs_options, const std::string &log_dir, const std::string &node_ip_address, - const CoreWorkerTaskExecutionInterface::TaskExecutor &execution_callback, + const CoreWorkerTaskExecutionInterface::TaskExecutionCallback + &task_execution_callback, + std::function check_signals = nullptr, bool use_memory_store = true); ~CoreWorker(); @@ -73,13 +78,20 @@ class CoreWorker { return *task_execution_interface_; } + // Get the resource IDs available to this worker (as assigned by the raylet). + const ResourceMappingType GetResourceIDs() const; + + // TODO(edoakes): remove this once Python core worker uses the task interfaces. const TaskID &GetCurrentTaskId() const { return worker_context_.GetCurrentTaskID(); } // TODO(edoakes): remove this once Python core worker uses the task interfaces. - void SetCurrentJobId(const JobID &job_id) { worker_context_.SetCurrentJobId(job_id); } + void SetCurrentTaskId(const TaskID &task_id); // TODO(edoakes): remove this once Python core worker uses the task interfaces. - void SetCurrentTaskId(const TaskID &task_id); + const JobID &GetCurrentJobId() const { return worker_context_.GetCurrentJobID(); } + + // TODO(edoakes): remove this once Python core worker uses the task interfaces. + void SetCurrentJobId(const JobID &job_id) { worker_context_.SetCurrentJobId(job_id); } void SetActorId(const ActorID &actor_id) { RAY_CHECK(actor_id_.IsNil()); @@ -187,7 +199,7 @@ class CoreWorker { /// \return Status::Invalid if we don't have this actor handle. Status GetActorHandle(const ActorID &actor_id, ActorHandle **actor_handle) const; - void StartIOService() { io_service_.run(); } + void StartIOService(); void ReportActiveObjectIDs(); @@ -211,13 +223,24 @@ class CoreWorker { /// raylet. boost::asio::steady_timer heartbeat_timer_; + // Thread that runs a boost::asio service to process IO events. std::thread io_thread_; - std::shared_ptr profiler_; - std::unique_ptr raylet_client_; - std::unique_ptr direct_actor_submitter_; + + // Client to the GCS shared by core worker interfaces. std::unique_ptr gcs_client_; + + // Client to the raylet shared by core worker interfaces. + std::unique_ptr raylet_client_; + + // Interface to submit tasks directly to other actors. + std::unique_ptr direct_actor_submitter_; + + // Interface for storing and retrieving shared objects. std::unique_ptr object_interface_; + // Profiler including a background thread that pushes profiling events to the GCS. + std::shared_ptr profiler_; + /// Map from actor ID to a handle to that actor. std::unordered_map > actor_handles_; diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc index b0128e5df..a530f59e8 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc @@ -37,42 +37,46 @@ JNIEXPORT jlong JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeInitCoreWork auto job_id = JavaByteArrayToId(env, jobId); auto gcs_client_options = ToGcsClientOptions(env, gcsClientOptions); - auto executor_func = [](const ray::RayFunction &ray_function, - const std::vector> &args, - int num_returns, - std::vector> *results) { - JNIEnv *env = local_env; - RAY_CHECK(env); - RAY_CHECK(local_java_task_executor); - // convert RayFunction - jobject ray_function_array_list = - NativeStringVectorToJavaStringList(env, ray_function.GetFunctionDescriptor()); - // convert args - // TODO (kfstorm): Avoid copying binary data from Java to C++ - jobject args_array_list = NativeVectorToJavaList>( - env, args, NativeRayObjectToJavaNativeRayObject); + auto task_execution_callback = + [](ray::TaskType task_type, const ray::RayFunction &ray_function, + const JobID &job_id, const ActorID &actor_id, + const std::unordered_map &required_resources, + const std::vector> &args, + const std::vector &arg_reference_ids, + const std::vector &return_ids, + std::vector> *results) { + JNIEnv *env = local_env; + RAY_CHECK(env); + RAY_CHECK(local_java_task_executor); + // convert RayFunction + jobject ray_function_array_list = + NativeStringVectorToJavaStringList(env, ray_function.GetFunctionDescriptor()); + // convert args + // TODO (kfstorm): Avoid copying binary data from Java to C++ + jobject args_array_list = NativeVectorToJavaList>( + env, args, NativeRayObjectToJavaNativeRayObject); - // invoke Java method - jobject java_return_objects = - env->CallObjectMethod(local_java_task_executor, java_task_executor_execute, - ray_function_array_list, args_array_list); - std::vector> return_objects; - JavaListToNativeVector>( - env, java_return_objects, &return_objects, - [](JNIEnv *env, jobject java_native_ray_object) { - return JavaNativeRayObjectToNativeRayObject(env, java_native_ray_object); - }); - for (auto &obj : return_objects) { - results->push_back(obj); - } - return ray::Status::OK(); - }; + // invoke Java method + jobject java_return_objects = + env->CallObjectMethod(local_java_task_executor, java_task_executor_execute, + ray_function_array_list, args_array_list); + std::vector> return_objects; + JavaListToNativeVector>( + env, java_return_objects, &return_objects, + [](JNIEnv *env, jobject java_native_ray_object) { + return JavaNativeRayObjectToNativeRayObject(env, java_native_ray_object); + }); + for (auto &obj : return_objects) { + results->push_back(obj); + } + return ray::Status::OK(); + }; try { auto core_worker = new ray::CoreWorker( static_cast(workerMode), ::Language::JAVA, native_store_socket, native_raylet_socket, job_id, gcs_client_options, /*log_dir=*/"", - /*node_ip_address=*/"", executor_func); + /*node_ip_address=*/"", task_execution_callback); return reinterpret_cast(core_worker); } catch (const std::exception &e) { std::ostringstream oss; diff --git a/src/ray/core_worker/object_interface.cc b/src/ray/core_worker/object_interface.cc index acdf844aa..586e67393 100644 --- a/src/ray/core_worker/object_interface.cc +++ b/src/ray/core_worker/object_interface.cc @@ -37,12 +37,14 @@ void CoreWorkerObjectInterface::GroupObjectIdsByStoreProvider( CoreWorkerObjectInterface::CoreWorkerObjectInterface( WorkerContext &worker_context, std::unique_ptr &raylet_client, - const std::string &store_socket, bool use_memory_store) + const std::string &store_socket, bool use_memory_store, + std::function check_signals) : worker_context_(worker_context), raylet_client_(raylet_client), store_socket_(store_socket), use_memory_store_(use_memory_store), memory_store_(std::make_shared()) { + check_signals_ = check_signals; AddStoreProvider(StoreProviderType::PLASMA); AddStoreProvider(StoreProviderType::MEMORY); } @@ -269,7 +271,7 @@ std::unique_ptr CoreWorkerObjectInterface::CreateStoreP switch (type) { case StoreProviderType::PLASMA: return std::unique_ptr( - new CoreWorkerPlasmaStoreProvider(store_socket_, raylet_client_)); + new CoreWorkerPlasmaStoreProvider(store_socket_, raylet_client_, check_signals_)); case StoreProviderType::MEMORY: return std::unique_ptr( new CoreWorkerMemoryStoreProvider(memory_store_)); diff --git a/src/ray/core_worker/object_interface.h b/src/ray/core_worker/object_interface.h index 9d576c319..050400357 100644 --- a/src/ray/core_worker/object_interface.h +++ b/src/ray/core_worker/object_interface.h @@ -24,8 +24,8 @@ class CoreWorkerObjectInterface { /// in addition to the plasma store. CoreWorkerObjectInterface(WorkerContext &worker_context, std::unique_ptr &raylet_client, - const std::string &store_socket, - bool use_memory_store = true); + const std::string &store_socket, bool use_memory_store = true, + std::function check_signals = nullptr); /// Set options for this client's interactions with the object store. /// @@ -160,6 +160,8 @@ class CoreWorkerObjectInterface { EnumUnorderedMap> store_providers_; + std::function check_signals_; + friend class CoreWorkerTaskInterface; /// TODO(zhijunfu): This is necessary as direct call task submitter needs to create diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index 0e43b445b..4e071578f 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -8,8 +8,10 @@ namespace ray { CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider( - const std::string &store_socket, std::unique_ptr &raylet_client) + const std::string &store_socket, std::unique_ptr &raylet_client, + std::function check_signals) : raylet_client_(raylet_client) { + check_signals_ = check_signals; RAY_ARROW_CHECK_OK(store_client_.Connect(store_socket)); } @@ -183,6 +185,14 @@ Status CoreWorkerPlasmaStoreProvider::Get( unsuccessful_attempts++; WarnIfAttemptedTooManyTimes(unsuccessful_attempts, remaining); } + if (check_signals_) { + Status status = check_signals_(); + if (!status.ok()) { + // TODO(edoakes): in this case which status should we return? + RAY_RETURN_NOT_OK(raylet_client_->NotifyUnblocked(task_id)); + return status; + } + } } // Notify unblocked because we blocked when calling FetchOrReconstruct with @@ -201,15 +211,32 @@ Status CoreWorkerPlasmaStoreProvider::Wait(const std::unordered_set &o int num_objects, int64_t timeout_ms, const TaskID &task_id, std::unordered_set *ready) { - WaitResultPair result_pair; std::vector id_vector(object_ids.begin(), object_ids.end()); - RAY_RETURN_NOT_OK(raylet_client_->Wait(id_vector, num_objects, timeout_ms, false, - task_id, &result_pair)); - for (const auto &entry : result_pair.first) { - ready->insert(entry); + bool should_break = false; + int64_t remaining_timeout = timeout_ms; + while (!should_break) { + WaitResultPair result_pair; + int64_t call_timeout = RayConfig::instance().get_timeout_milliseconds(); + if (remaining_timeout >= 0) { + call_timeout = std::min(remaining_timeout, call_timeout); + remaining_timeout -= call_timeout; + should_break = remaining_timeout <= 0; + } + + RAY_RETURN_NOT_OK(raylet_client_->Wait(id_vector, num_objects, call_timeout, false, + task_id, &result_pair)); + + if (result_pair.first.size() >= static_cast(num_objects)) { + should_break = true; + } + for (const auto &entry : result_pair.first) { + ready->insert(entry); + } + if (check_signals_) { + RAY_RETURN_NOT_OK(check_signals_()); + } } - return Status::OK(); } diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index 1640fff12..b7fabc2f8 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -20,7 +20,8 @@ class CoreWorker; class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider { public: CoreWorkerPlasmaStoreProvider(const std::string &store_socket, - std::unique_ptr &raylet_client); + std::unique_ptr &raylet_client, + std::function check_signals); ~CoreWorkerPlasmaStoreProvider(); @@ -83,6 +84,7 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider { std::unique_ptr &raylet_client_; plasma::PlasmaClient store_client_; std::mutex store_client_mutex_; + std::function check_signals_; }; } // namespace ray diff --git a/src/ray/core_worker/task_execution.cc b/src/ray/core_worker/task_execution.cc index 3d1688d6b..5e027f3c3 100644 --- a/src/ray/core_worker/task_execution.cc +++ b/src/ray/core_worker/task_execution.cc @@ -7,18 +7,24 @@ namespace ray { CoreWorkerTaskExecutionInterface::CoreWorkerTaskExecutionInterface( - WorkerContext &worker_context, std::unique_ptr &raylet_client, - CoreWorkerObjectInterface &object_interface, const TaskExecutor &executor) - : worker_context_(worker_context), + CoreWorker &core_worker, WorkerContext &worker_context, + std::unique_ptr &raylet_client, + CoreWorkerObjectInterface &object_interface, + const std::shared_ptr profiler, + const TaskExecutionCallback &task_execution_callback) + : core_worker_(core_worker), + worker_context_(worker_context), object_interface_(object_interface), - execution_callback_(executor), + profiler_(profiler), + task_execution_callback_(task_execution_callback), worker_server_("Worker", 0 /* let grpc choose port */), main_service_(std::make_shared()), main_work_(*main_service_) { - RAY_CHECK(execution_callback_ != nullptr); + RAY_CHECK(task_execution_callback_ != nullptr); - auto func = std::bind(&CoreWorkerTaskExecutionInterface::ExecuteTask, this, - std::placeholders::_1, std::placeholders::_2); + auto func = + std::bind(&CoreWorkerTaskExecutionInterface::ExecuteTask, this, + std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); task_receivers_.emplace( TaskTransportType::RAYLET, std::unique_ptr(new CoreWorkerRayletTaskReceiver( @@ -35,33 +41,54 @@ CoreWorkerTaskExecutionInterface::CoreWorkerTaskExecutionInterface( } Status CoreWorkerTaskExecutionInterface::ExecuteTask( - const TaskSpecification &task_spec, + const TaskSpecification &task_spec, const ResourceMappingType &resource_ids, std::vector> *results) { + idle_profile_event_.reset(); RAY_LOG(DEBUG) << "Executing task " << task_spec.TaskId(); + resource_ids_ = resource_ids; worker_context_.SetCurrentTask(task_spec); + core_worker_.SetCurrentTaskId(task_spec.TaskId()); RayFunction func{task_spec.GetLanguage(), task_spec.FunctionDescriptor()}; std::vector> args; - RAY_CHECK_OK(BuildArgsForExecutor(task_spec, &args)); + std::vector arg_reference_ids; + RAY_CHECK_OK(BuildArgsForExecutor(task_spec, &args, &arg_reference_ids)); - auto num_returns = task_spec.NumReturns(); - if (task_spec.IsActorCreationTask() || task_spec.IsActorTask()) { - RAY_CHECK(num_returns > 0); - // Decrease to account for the dummy object id. - num_returns--; + std::vector return_ids; + for (size_t i = 0; i < task_spec.NumReturns(); i++) { + return_ids.push_back(task_spec.ReturnId(i)); } - auto status = execution_callback_(func, args, num_returns, results); + Status status; + ActorID actor_id = ActorID::Nil(); + TaskType task_type = TaskType::NORMAL_TASK; + if (task_spec.IsActorCreationTask()) { + RAY_CHECK(return_ids.size() > 0); + return_ids.pop_back(); + actor_id = task_spec.ActorCreationId(); + task_type = TaskType::ACTOR_CREATION_TASK; + core_worker_.SetActorId(actor_id); + } else if (task_spec.IsActorTask()) { + RAY_CHECK(return_ids.size() > 0); + return_ids.pop_back(); + actor_id = task_spec.ActorId(); + task_type = TaskType::ACTOR_TASK; + } + status = task_execution_callback_(task_type, func, task_spec.JobId(), actor_id, + task_spec.GetRequiredResources().GetResourceMap(), + args, arg_reference_ids, return_ids, results); + // TODO(zhijunfu): // 1. Check and handle failure. // 2. Save or load checkpoint. + idle_profile_event_.reset(new worker::ProfileEvent(profiler_, "worker_idle")); return status; } void CoreWorkerTaskExecutionInterface::Run() { - // Run main IO service. + idle_profile_event_.reset(new worker::ProfileEvent(profiler_, "worker_idle")); main_service_->run(); } @@ -70,13 +97,16 @@ void CoreWorkerTaskExecutionInterface::Stop() { std::shared_ptr main_service = main_service_; // Delay the execution of io_service::stop() to avoid deadlock if // CoreWorkerTaskExecutionInterface::Stop is called inside a task. + idle_profile_event_.reset(); main_service_->post([main_service]() { main_service->stop(); }); } Status CoreWorkerTaskExecutionInterface::BuildArgsForExecutor( - const TaskSpecification &task, std::vector> *args) { + const TaskSpecification &task, std::vector> *args, + std::vector *arg_reference_ids) { auto num_args = task.NumArgs(); - (*args).resize(num_args); + args->resize(num_args); + arg_reference_ids->resize(num_args); std::vector object_ids_to_fetch; std::vector indices; @@ -88,6 +118,7 @@ Status CoreWorkerTaskExecutionInterface::BuildArgsForExecutor( RAY_CHECK(count == 1); object_ids_to_fetch.push_back(task.ArgId(i, 0)); indices.push_back(i); + arg_reference_ids->at(i) = task.ArgId(i, 0); } else { // pass by value. std::shared_ptr data = nullptr; @@ -100,7 +131,8 @@ Status CoreWorkerTaskExecutionInterface::BuildArgsForExecutor( metadata = std::make_shared( const_cast(task.ArgMetadata(i)), task.ArgMetadataSize(i)); } - (*args)[i] = std::make_shared(data, metadata); + args->at(i) = std::make_shared(data, metadata); + arg_reference_ids->at(i) = ObjectID::Nil(); } } @@ -108,7 +140,7 @@ Status CoreWorkerTaskExecutionInterface::BuildArgsForExecutor( auto status = object_interface_.Get(object_ids_to_fetch, -1, &results); if (status.ok()) { for (size_t i = 0; i < results.size(); i++) { - (*args)[indices[i]] = results[i]; + args->at(indices[i]) = results[i]; } } diff --git a/src/ray/core_worker/task_execution.h b/src/ray/core_worker/task_execution.h index 6e25680a1..20e39c405 100644 --- a/src/ray/core_worker/task_execution.h +++ b/src/ray/core_worker/task_execution.h @@ -6,6 +6,7 @@ #include "ray/core_worker/common.h" #include "ray/core_worker/context.h" #include "ray/core_worker/object_interface.h" +#include "ray/core_worker/profiling.h" #include "ray/core_worker/transport/transport.h" #include "ray/rpc/client_call.h" #include "ray/rpc/worker/worker_client.h" @@ -23,21 +24,25 @@ class TaskSpecification; /// execution. class CoreWorkerTaskExecutionInterface { public: - /// The callback provided app-language workers that executes tasks. - /// - /// \param ray_function[in] Information about the function to execute. - /// \param args[in] Arguments of the task. - /// \param results[out] Results of the task execution. - /// \return Status. - using TaskExecutor = std::function> &args, int num_returns, + // Callback that must be implemented and provided by the language-specific worker + // frontend to execute tasks and return their results. + using TaskExecutionCallback = std::function &required_resources, + const std::vector> &args, + const std::vector &arg_reference_ids, + const std::vector &return_ids, std::vector> *results)>; - CoreWorkerTaskExecutionInterface(WorkerContext &worker_context, + CoreWorkerTaskExecutionInterface(CoreWorker &core_worker, WorkerContext &worker_context, std::unique_ptr &raylet_client, CoreWorkerObjectInterface &object_interface, - const TaskExecutor &executor); + const std::shared_ptr profiler, + const TaskExecutionCallback &task_execution_callback); + + // Get the resource IDs available to this worker (as assigned by the raylet). + const ResourceMappingType &GetResourceIDs() const { return resource_ids_; } /// Start receiving and executing tasks. /// \return void. @@ -54,26 +59,45 @@ class CoreWorkerTaskExecutionInterface { /// just copy their content. /// /// \param spec[in] Task specification. - /// \param args[out] The arguments for passing to task executor. - /// - Status BuildArgsForExecutor(const TaskSpecification &spec, - std::vector> *args); + /// \param args[out] Argument data as RayObjects. + /// \param args[out] ObjectIDs corresponding to each by reference argument. The length + /// of this vector will be the same as args, and by value arguments + /// will have ObjectID::Nil(). + /// // TODO(edoakes): this is a bit of a hack that's necessary because + /// we have separate serialization paths for by-value and by-reference + /// arguments in Python. This should ideally be handled better there. + /// \return The arguments for passing to task executor. + Status BuildArgsForExecutor(const TaskSpecification &task, + std::vector> *args, + std::vector *arg_reference_ids); /// Execute a task. /// /// \param spec[in] Task specification. + /// \param spec[in] Resource IDs of resources assigned to this worker. /// \param results[out] Results for task execution. /// \return Status. - Status ExecuteTask(const TaskSpecification &spec, + Status ExecuteTask(const TaskSpecification &task_spec, + const ResourceMappingType &resource_ids, std::vector> *results); + /// Reference to the parent CoreWorker. + /// TODO(edoakes) this is very ugly, but unfortunately necessary so that we + /// can clear the ActorHandle state when we start executing a task. Two + /// possible solutions are to either move the ActorHandle state into the + /// WorkerContext or to remove this interface entirely. + CoreWorker &core_worker_; + /// Reference to the parent CoreWorker's context. WorkerContext &worker_context_; /// Reference to the parent CoreWorker's objects interface. CoreWorkerObjectInterface &object_interface_; + // Reference to the parent CoreWorker's profiler. + const std::shared_ptr profiler_; + // Task execution callback. - TaskExecutor execution_callback_; + TaskExecutionCallback task_execution_callback_; /// All the task task receivers supported. EnumUnorderedMap> @@ -88,6 +112,15 @@ class CoreWorkerTaskExecutionInterface { /// The asio work to keep main_service_ alive. boost::asio::io_service::work main_work_; + /// A map from resource name to the resource IDs that are currently reserved + /// for this worker. Each pair consists of the resource ID and the fraction + /// of that resource allocated for this worker. + ResourceMappingType resource_ids_; + + // Profile event for when the worker is idle. Should be reset when the worker + // enters and exits an idle period. + std::unique_ptr idle_profile_event_; + friend class CoreWorker; }; diff --git a/src/ray/core_worker/test/mock_worker.cc b/src/ray/core_worker/test/mock_worker.cc index d68b7a5a6..f60a8e221 100644 --- a/src/ray/core_worker/test/mock_worker.cc +++ b/src/ray/core_worker/test/mock_worker.cc @@ -25,7 +25,8 @@ class MockWorker { : worker_(WorkerType::WORKER, Language::PYTHON, store_socket, raylet_socket, JobID::FromInt(1), gcs_options, /*log_dir=*/"", /*node_id_address=*/"127.0.0.1", - std::bind(&MockWorker::ExecuteTask, this, _1, _2, _3, _4)) {} + std::bind(&MockWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7, _8, + _9)) {} void Run() { // Start executing tasks. @@ -33,11 +34,15 @@ class MockWorker { } private: - Status ExecuteTask(const RayFunction &ray_function, - const std::vector> &args, int num_returns, + Status ExecuteTask(TaskType task_type, const RayFunction &ray_function, + const JobID &job_id, const ActorID &actor_id, + const std::unordered_map &required_resources, + const std::vector> &args, + const std::vector &arg_reference_ids, + const std::vector &return_ids, std::vector> *results) { // Note that this doesn't include dummy object id. - RAY_CHECK(num_returns >= 0); + RAY_CHECK(return_ids.size() >= 0); // Merge all the content from input args. std::vector buffer; @@ -59,7 +64,7 @@ class MockWorker { std::make_shared(buffer.data(), buffer.size(), true); // Write the merged content to each of return ids. - for (int i = 0; i < num_returns; i++) { + for (size_t i = 0; i < return_ids.size(); i++) { results->push_back(std::make_shared(memory_buffer, nullptr)); } diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 5e6e992c0..1b14dee76 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -243,12 +243,15 @@ void CoreWorkerDirectActorTaskReceiver::HandlePushTask( // Decrease to account for the dummy object id. num_returns--; + // TODO(edoakes): resource IDs are currently kept track of in the raylet, + // need to come up with a solution for this. + ResourceMappingType resource_ids; std::vector> results; - auto status = task_handler_(task_spec, &results); + auto status = task_handler_(task_spec, resource_ids, &results); RAY_CHECK(results.size() == num_returns) << results.size() << " " << num_returns; for (size_t i = 0; i < results.size(); i++) { - auto return_object = (*reply).add_return_objects(); + auto return_object = reply->add_return_objects(); ObjectID id = ObjectID::ForTaskReturn( task_spec.TaskId(), /*index=*/i + 1, /*transport_type=*/static_cast(TaskTransportType::DIRECT_ACTOR)); diff --git a/src/ray/core_worker/transport/raylet_transport.cc b/src/ray/core_worker/transport/raylet_transport.cc index e17f3acca..d65a20d5f 100644 --- a/src/ray/core_worker/transport/raylet_transport.cc +++ b/src/ray/core_worker/transport/raylet_transport.cc @@ -1,5 +1,6 @@ #include "ray/core_worker/transport/raylet_transport.h" +#include "ray/common/common_protocol.h" #include "ray/common/task/task.h" namespace ray { @@ -28,8 +29,34 @@ void CoreWorkerRayletTaskReceiver::HandleAssignTask( return; } + // Set the resource IDs for this task. + // TODO: convert the resource map to protobuf and change this. + ResourceMappingType resource_ids; + auto resource_infos = + flatbuffers::GetRoot(request.resource_ids().data()) + ->resource_infos(); + for (size_t i = 0; i < resource_infos->size(); ++i) { + auto const &fractional_resource_ids = resource_infos->Get(i); + auto &acquired_resources = + resource_ids[string_from_flatbuf(*fractional_resource_ids->resource_name())]; + + size_t num_resource_ids = fractional_resource_ids->resource_ids()->size(); + size_t num_resource_fractions = fractional_resource_ids->resource_fractions()->size(); + RAY_CHECK(num_resource_ids == num_resource_fractions); + RAY_CHECK(num_resource_ids > 0); + for (size_t j = 0; j < num_resource_ids; ++j) { + int64_t resource_id = fractional_resource_ids->resource_ids()->Get(j); + double resource_fraction = fractional_resource_ids->resource_fractions()->Get(j); + if (num_resource_ids > 1) { + int64_t whole_fraction = resource_fraction; + RAY_CHECK(whole_fraction == resource_fraction); + } + acquired_resources.push_back(std::make_pair(resource_id, resource_fraction)); + } + } + std::vector> results; - auto status = task_handler_(task_spec, &results); + auto status = task_handler_(task_spec, resource_ids, &results); auto num_returns = task_spec.NumReturns(); if (task_spec.IsActorCreationTask() || task_spec.IsActorTask()) { @@ -40,20 +67,22 @@ void CoreWorkerRayletTaskReceiver::HandleAssignTask( RAY_LOG(DEBUG) << "Assigned task " << task_spec.TaskId() << " finished execution. num_returns: " << num_returns; - RAY_CHECK(results.size() == num_returns); - for (size_t i = 0; i < num_returns; i++) { - ObjectID id = ObjectID::ForTaskReturn( - task_spec.TaskId(), /*index=*/i + 1, - /*transport_type=*/static_cast(TaskTransportType::RAYLET)); - Status status = object_interface_.Put(*results[i], id); - if (!status.ok()) { - // NOTE(hchen): `PlasmaObjectExists` error is already ignored inside - // `ObjectInterface::Put`, we treat other error types as fatal here. - RAY_LOG(FATAL) << "Task " << task_spec.TaskId() << " failed to put object " << id - << " in store: " << status.message(); - } else { - RAY_LOG(DEBUG) << "Task " << task_spec.TaskId() << " put object " << id - << " in store."; + if (results.size() != 0) { + RAY_CHECK(results.size() == num_returns); + for (size_t i = 0; i < num_returns; i++) { + ObjectID id = ObjectID::ForTaskReturn( + task_spec.TaskId(), /*index=*/i + 1, + /*transport_type=*/static_cast(TaskTransportType::RAYLET)); + Status status = object_interface_.Put(*results[i], id); + if (!status.ok()) { + // NOTE(hchen): `PlasmaObjectExists` error is already ignored inside + // `ObjectInterface::Put`, we treat other error types as fatal here. + RAY_LOG(FATAL) << "Task " << task_spec.TaskId() << " failed to put object " << id + << " in store: " << status.message(); + } else { + RAY_LOG(DEBUG) << "Task " << task_spec.TaskId() << " put object " << id + << " in store."; + } } } diff --git a/src/ray/core_worker/transport/transport.h b/src/ray/core_worker/transport/transport.h index 51a8d9549..1e54e09e8 100644 --- a/src/ray/core_worker/transport/transport.h +++ b/src/ray/core_worker/transport/transport.h @@ -15,9 +15,9 @@ namespace ray { /// This class receives tasks for execution. class CoreWorkerTaskReceiver { public: - using TaskHandler = - std::function> *results)>; + using TaskHandler = std::function> *results)>; virtual ~CoreWorkerTaskReceiver() {} }; diff --git a/src/ray/raylet/format/node_manager.fbs b/src/ray/raylet/format/node_manager.fbs index 39ef1fc49..feb278c8a 100644 --- a/src/ray/raylet/format/node_manager.fbs +++ b/src/ray/raylet/format/node_manager.fbs @@ -115,12 +115,16 @@ table ResourceIdSetInfo { table DisconnectClient { } +table ResourceIdSetInfos { + resource_infos: [ResourceIdSetInfo]; +} + // This message is sent from the raylet to a worker. table GetTaskReply { // A string of bytes representing the task specification. task_spec: string; // A list of the resources reserved for this worker. - fractional_resource_ids: [ResourceIdSetInfo]; + fractional_resource_ids: ResourceIdSetInfos; } // This struct is used to register a new worker with the raylet. diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 05f53b272..e2261a0c8 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -167,11 +167,11 @@ int main(int argc, char *argv[]) { // We should stop the service and remove the local socket file. auto handler = [&main_service, &raylet_socket_name, &server, &gcs_client]( const boost::system::error_code &error, int signal_number) { + RAY_LOG(INFO) << "Raylet received SIGTERM, shutting down..."; auto shutdown_callback = [&server, &main_service, &gcs_client]() { server.reset(); gcs_client->Disconnect(); main_service.stop(); - RAY_LOG(INFO) << "Raylet server received SIGTERM message, shutting down..."; }; RAY_CHECK_OK(gcs_client->client_table().Disconnect(shutdown_callback)); // Give a timeout for this Disconnect operation. @@ -180,6 +180,7 @@ int main(int argc, char *argv[]) { timer.expires_from_now(stop_timeout); timer.async_wait([shutdown_callback](const boost::system::error_code &error) { if (!error) { + RAY_LOG(INFO) << "Disconnect from client table timed out, forcing shutdown."; shutdown_callback(); } }); diff --git a/src/ray/raylet/raylet_client.cc b/src/ray/raylet/raylet_client.cc index 36dd2caa7..029397559 100644 --- a/src/ray/raylet/raylet_client.cc +++ b/src/ray/raylet/raylet_client.cc @@ -238,9 +238,10 @@ ray::Status RayletClient::GetTask(std::unique_ptr *task_ auto reply_message = flatbuffers::GetRoot(reply.get()); // Set the resource IDs for this task. resource_ids_.clear(); - for (size_t i = 0; i < reply_message->fractional_resource_ids()->size(); ++i) { + for (size_t i = 0; + i < reply_message->fractional_resource_ids()->resource_infos()->size(); ++i) { auto const &fractional_resource_ids = - reply_message->fractional_resource_ids()->Get(i); + reply_message->fractional_resource_ids()->resource_infos()->Get(i); auto &acquired_resources = resource_ids_[string_from_flatbuf(*fractional_resource_ids->resource_name())]; diff --git a/src/ray/raylet/worker.cc b/src/ray/raylet/worker.cc index e8256bf78..dc2b223d7 100644 --- a/src/ray/raylet/worker.cc +++ b/src/ray/raylet/worker.cc @@ -139,7 +139,7 @@ void Worker::AssignTask(const Task &task, const ResourceIdSet &resource_id_set, auto status = rpc_client_->AssignTask(request, [](Status status, const rpc::AssignTaskReply &reply) { if (!status.ok()) { - RAY_LOG(ERROR) << "Worker failed to finish executing task: " << status.ToString(); + RAY_LOG(DEBUG) << "Worker failed to finish executing task: " << status.ToString(); } // Worker has finished this task. There's nothing to do here // and assigning new task will be done when raylet receives @@ -161,7 +161,8 @@ void Worker::AssignTask(const Task &task, const ResourceIdSet &resource_id_set, auto message = protocol::CreateGetTaskReply(fbb, fbb.CreateString(spec.Serialize()), - fbb.CreateVector(resource_id_set_flatbuf)); + protocol::CreateResourceIdSetInfos( + fbb, fbb.CreateVector(resource_id_set_flatbuf))); fbb.Finish(message); Connection()->WriteMessageAsync( static_cast(protocol::MessageType::ExecuteTask), fbb.GetSize(), diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index b303fb602..984c95736 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -3,7 +3,6 @@ #include #include -#include #include "ray/common/constants.h" #include "ray/common/ray_config.h" diff --git a/src/ray/rpc/client_call.h b/src/ray/rpc/client_call.h index 27935f489..fe1462535 100644 --- a/src/ray/rpc/client_call.h +++ b/src/ray/rpc/client_call.h @@ -177,18 +177,28 @@ class ClientCallManager { void PollEventsFromCompletionQueue() { void *got_tag; bool ok = false; + auto deadline = gpr_inf_future(GPR_CLOCK_REALTIME); // Keep reading events from the `CompletionQueue` until it's shutdown. - while (cq_.Next(&got_tag, &ok)) { - auto tag = reinterpret_cast(got_tag); - if (ok && !main_service_.stopped()) { - // Post the callback to the main event loop. - main_service_.post([tag]() { - tag->GetCall()->OnReplyReceived(); - // The call is finished, and we can delete this tag now. + // NOTE(edoakes): we use AsyncNext here because for some unknown reason, + // synchronous cq_.Next blocks indefinitely in the case that the process + // received a SIGTERM. + while (true) { + auto status = cq_.AsyncNext(&got_tag, &ok, deadline); + if (status == grpc::CompletionQueue::SHUTDOWN) { + break; + } + if (status != grpc::CompletionQueue::TIMEOUT) { + auto tag = reinterpret_cast(got_tag); + if (ok && !main_service_.stopped()) { + // Post the callback to the main event loop. + main_service_.post([tag]() { + tag->GetCall()->OnReplyReceived(); + // The call is finished, and we can delete this tag now. + delete tag; + }); + } else { delete tag; - }); - } else { - delete tag; + } } } }