diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 1118313ae..8fb884ff8 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -78,27 +78,11 @@ from ray.profiling import profile # noqa: E402 from ray.state import (jobs, nodes, actors, objects, timeline, object_transfer_timeline, cluster_resources, available_resources) # noqa: E402 -from ray.worker import ( - LOCAL_MODE, - SCRIPT_MODE, - WORKER_MODE, - cancel, - connect, - disconnect, - get, - get_actor, - get_gpu_ids, - get_resource_ids, - get_webui_url, - init, - is_initialized, - put, - kill, - register_custom_serializer, - remote, - shutdown, - show_in_webui, - wait, +from ray.worker import ( # noqa: F401 + LOCAL_MODE, SCRIPT_MODE, WORKER_MODE, IO_WORKER_MODE, cancel, connect, + disconnect, get, get_actor, get_gpu_ids, get_resource_ids, get_webui_url, + init, is_initialized, put, kill, register_custom_serializer, remote, + shutdown, show_in_webui, wait, ) # noqa: E402 import ray.internal # noqa: E402 import ray.projects # noqa: E402 diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 5a52a22c1..b0bb90957 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -66,6 +66,7 @@ from ray.includes.common cimport ( TASK_TYPE_ACTOR_TASK, WORKER_TYPE_WORKER, WORKER_TYPE_DRIVER, + WORKER_TYPE_IO_WORKER, PLACEMENT_STRATEGY_PACK, PLACEMENT_STRATEGY_SPREAD, ) @@ -90,6 +91,7 @@ from ray.includes.ray_config cimport RayConfig from ray.includes.global_state_accessor cimport CGlobalStateAccessor import ray +from ray import external_storage from ray.async_compat import ( sync_to_async, get_new_event_loop) import ray.memory_monitor as memory_monitor @@ -590,6 +592,49 @@ cdef void gc_collect() nogil: num_freed, end - start)) +cdef c_vector[c_string] spill_objects_handler( + const c_vector[CObjectID]& object_ids_to_spill) nogil: + cdef c_vector[c_string] return_urls + with gil: + object_refs = VectorToObjectRefs(object_ids_to_spill) + try: + urls = 
external_storage.spill_objects(object_refs) + for url in urls: + return_urls.push_back(url) + except Exception: + exception_str = ( + "An unexpected internal error occurred while the IO worker " + "was spilling objects.") + logger.exception(exception_str) + ray.utils.push_error_to_driver( + ray.worker.global_worker, + "io_worker_spill_objects_error", + traceback.format_exc() + exception_str, + job_id=None) + return return_urls + + +cdef void restore_spilled_objects_handler( + const c_vector[c_string]& object_urls) nogil: + with gil: + urls = [] + size = object_urls.size() + for i in range(size): + urls.append(object_urls[i]) + try: + external_storage.restore_spilled_objects(urls) + except Exception: + exception_str = ( + "An unexpected internal error occurred while the IO worker " + "was restoring spilled objects.") + logger.exception(exception_str) + ray.utils.push_error_to_driver( + ray.worker.global_worker, + "io_worker_retore_spilled_objects_error", + traceback.format_exc() + exception_str, + job_id=None) + + # This function introduces ~2-7us of overhead per call (i.e., it can be called # up to hundreds of thousands of times per second). 
cdef void get_py_stack(c_string* stack_out) nogil: @@ -650,17 +695,25 @@ cdef void terminate_asyncio_thread() nogil: cdef class CoreWorker: - def __cinit__(self, is_driver, store_socket, raylet_socket, + def __cinit__(self, worker_type, store_socket, raylet_socket, JobID job_id, GcsClientOptions gcs_options, log_dir, node_ip_address, node_manager_port, raylet_ip_address, local_mode, driver_name, stdout_file, stderr_file, serialized_job_config, metrics_agent_port): - self.is_driver = is_driver self.is_local_mode = local_mode cdef CCoreWorkerOptions options = CCoreWorkerOptions() - options.worker_type = ( - WORKER_TYPE_DRIVER if is_driver else WORKER_TYPE_WORKER) + if worker_type in (ray.LOCAL_MODE, ray.SCRIPT_MODE): + self.is_driver = True + options.worker_type = WORKER_TYPE_DRIVER + elif worker_type == ray.WORKER_MODE: + self.is_driver = False + options.worker_type = WORKER_TYPE_WORKER + elif worker_type == ray.IO_WORKER_MODE: + self.is_driver = False + options.worker_type = WORKER_TYPE_IO_WORKER + else: + raise ValueError(f"Unknown worker type: {worker_type}") options.language = LANGUAGE_PYTHON options.store_socket = store_socket.encode("ascii") options.raylet_socket = raylet_socket.encode("ascii") @@ -678,6 +731,8 @@ cdef class CoreWorker: options.task_execution_callback = task_execution_handler options.check_signals = check_signals options.gc_collect = gc_collect + options.spill_objects = spill_objects_handler + options.restore_spilled_objects = restore_spilled_objects_handler options.get_lang_stack = get_py_stack options.ref_counting_enabled = True options.is_local_mode = local_mode @@ -725,15 +780,15 @@ cdef class CoreWorker: return self.plasma_event_handler def get_objects(self, object_refs, TaskID current_task_id, - int64_t timeout_ms=-1): + int64_t timeout_ms=-1, plasma_objects_only=False): cdef: c_vector[shared_ptr[CRayObject]] results CTaskID c_task_id = current_task_id.native() c_vector[CObjectID] c_object_ids = ObjectRefsToVector(object_refs) - + c_bool 
_plasma_objects_only = plasma_objects_only with nogil: check_status(CCoreWorkerProcess.GetCoreWorker().Get( - c_object_ids, timeout_ms, &results)) + c_object_ids, timeout_ms, &results, _plasma_objects_only)) return RayObjectsToDataMetadataPairs(results) @@ -771,6 +826,48 @@ cdef class CoreWorker: # and deal with it here. return data.get() == NULL + def put_file_like_object( + self, metadata, data_size, file_like, ObjectRef object_ref=None): + """Directly create a new Plasma Store object from a file like + object. This avoids extra memory copy. + + Args: + metadata (bytes): The metadata of the object. + data_size (int): The size of the data buffer. + file_like: A python file object that provides the `readinto` + interface. + object_ref: The new ObjectRef. + """ + cdef: + CObjectID c_object_id + shared_ptr[CBuffer] data_buf + shared_ptr[CBuffer] metadata_buf + int64_t put_threshold + c_bool put_small_object_in_memory_store + c_vector[CObjectID] c_object_id_vector + # TODO(suquark): This method does not support put objects to + # in memory store currently. + metadata_buf = string_to_buffer(metadata) + object_already_exists = self._create_put_buffer( + metadata_buf, data_size, object_ref, + ObjectRefsToVector([]), + &c_object_id, &data_buf) + if object_already_exists: + logger.debug("Object already exists in 'put_file_like_object'.") + return + data = Buffer.make(data_buf) + view = memoryview(data) + index = 0 + while index < data_size: + bytes_read = file_like.readinto(view[index:]) + index += bytes_read + with nogil: + # Using custom object refs is not supported because we + # can't track their lifecycle, so we don't pin the object + # in this case. 
+ check_status(CCoreWorkerProcess.GetCoreWorker().Seal( + c_object_id, pin_object=object_ref is None)) + def put_serialized_object(self, serialized_object, ObjectRef object_ref=None, c_bool pin_object=True): @@ -1342,6 +1439,20 @@ cdef class CoreWorker: resource_name.encode("ascii"), capacity, CClientID.FromBinary(client_id.binary())) + def force_spill_objects(self, object_refs): + cdef c_vector[CObjectID] object_ids + object_ids = ObjectRefsToVector(object_refs) + with nogil: + check_status(CCoreWorkerProcess.GetCoreWorker() + .ForceSpillObjects(object_ids)) + + def force_restore_spilled_objects(self, object_refs): + cdef c_vector[CObjectID] object_ids + object_ids = ObjectRefsToVector(object_refs) + with nogil: + check_status(CCoreWorkerProcess.GetCoreWorker() + .ForceRestoreSpilledObjects(object_ids)) + cdef void async_set_result(shared_ptr[CRayObject] obj, CObjectID object_ref, void *future) with gil: diff --git a/python/ray/experimental/__init__.py b/python/ray/experimental/__init__.py index 64db9e7fc..cd0878256 100644 --- a/python/ray/experimental/__init__.py +++ b/python/ray/experimental/__init__.py @@ -1,10 +1,13 @@ from .api import get, wait from .dynamic_resources import set_resource +from .object_spilling import force_spill_objects, force_restore_spilled_objects from .placement_group import ( placement_group, ) __all__ = [ "get", "wait", "set_resource", + "force_spill_objects", + "force_restore_spilled_objects", "placement_group", ] diff --git a/python/ray/experimental/object_spilling.py b/python/ray/experimental/object_spilling.py new file mode 100644 index 000000000..6a61ae22b --- /dev/null +++ b/python/ray/experimental/object_spilling.py @@ -0,0 +1,35 @@ +import ray + + +def force_spill_objects(object_refs): + """Force spilling objects to external storage. + + Args: + object_refs: Object refs of the objects to be + spilled. + """ + core_worker = ray.worker.global_worker.core_worker + # Make sure that the values are object refs. 
+ for object_ref in object_refs: + if not isinstance(object_ref, ray.ObjectRef): + raise TypeError( + f"Attempting to call `force_spill_objects` on the " + f"value {object_ref}, which is not an ray.ObjectRef.") + return core_worker.force_spill_objects(object_refs) + + +def force_restore_spilled_objects(object_refs): + """Force restoring objects from external storage. + + Args: + object_refs: Object refs of the objects to be + restored. + """ + core_worker = ray.worker.global_worker.core_worker + # Make sure that the values are object refs. + for object_ref in object_refs: + if not isinstance(object_ref, ray.ObjectRef): + raise TypeError( + f"Attempting to call `force_restore_spilled_objects` on the " + f"value {object_ref}, which is not an ray.ObjectRef.") + return core_worker.force_restore_spilled_objects(object_refs) diff --git a/python/ray/external_storage.py b/python/ray/external_storage.py new file mode 100644 index 000000000..093ab2bd4 --- /dev/null +++ b/python/ray/external_storage.py @@ -0,0 +1,129 @@ +import abc +import os +from typing import List +import ray + + +class ExternalStorage(metaclass=abc.ABCMeta): + """The base class for external storage. + + This class provides some useful functions for zero-copy object + put/get from plasma store. Also it specifies the interface for + object spilling. + """ + + def _get_objects_from_store(self, object_refs): + worker = ray.worker.global_worker + ray_object_pairs = worker.core_worker.get_objects( + object_refs, + worker.current_task_id, + timeout_ms=0, + plasma_objects_only=True) + return ray_object_pairs + + def _put_object_to_store(self, metadata, data_size, file_like, object_ref): + worker = ray.worker.global_worker + worker.core_worker.put_file_like_object(metadata, data_size, file_like, + object_ref) + + @abc.abstractmethod + def spill_objects(self, object_refs): + """Spill objects to the external storage. Objects are specified + by their object refs. 
+ + Args: + object_refs: The list of the refs of the objects to be spilled. + Returns: + A list of keys corresponding to the input object refs. + """ + + @abc.abstractmethod + def restore_spilled_objects(self, keys: List[bytes]): + """Restore objects from the external storage. Objects are specified + by their keys. + + Args: + keys: A list of bytes corresponding to the spilled objects. + """ + + +class NullStorage(ExternalStorage): + """The class that represents an uninitialized external storage.""" + + def spill_objects(self, object_refs): + raise NotImplementedError("External storage is not initialized") + + def restore_spilled_objects(self, keys): + raise NotImplementedError("External storage is not initialized") + + +class FileSystemStorage(ExternalStorage): + """The class for filesystem-like external storage.""" + + def __init__(self, directory_path): + self.directory_path = directory_path + self.prefix = "ray_spilled_object_" + + def spill_objects(self, object_refs): + keys = [] + ray_object_pairs = self._get_objects_from_store(object_refs) + for ref, (buf, metadata) in zip(object_refs, ray_object_pairs): + filename = self.prefix + ref.hex() + with open(os.path.join(self.directory_path, filename), "wb") as f: + metadata_len = len(metadata) + buf_len = len(buf) + f.write(metadata_len.to_bytes(8, byteorder="little")) + f.write(buf_len.to_bytes(8, byteorder="little")) + f.write(metadata) + f.write(memoryview(buf)) + keys.append(filename.encode()) + return keys + + def restore_spilled_objects(self, keys): + for k in keys: + filename = k.decode() + ref = ray.ObjectRef(bytes.fromhex(filename[len(self.prefix):])) + with open(os.path.join(self.directory_path, filename), "rb") as f: + metadata_len = int.from_bytes(f.read(8), byteorder="little") + buf_len = int.from_bytes(f.read(8), byteorder="little") + metadata = f.read(metadata_len) + # read remaining data to our buffer + self._put_object_to_store(metadata, buf_len, f, ref) + + +_external_storage = NullStorage() 
+ + +def setup_external_storage(config): + """Setup the external storage according to the config.""" + global _external_storage + if config: + storage_type = config["type"] + if storage_type == "filesystem": + _external_storage = FileSystemStorage(**config["params"]) + else: + raise ValueError(f"Unknown external storage type: {storage_type}") + else: + _external_storage = NullStorage() + + +def spill_objects(object_refs): + """Spill objects to the external storage. Objects are specified + by their object refs. + + Args: + object_refs: The list of the refs of the objects to be spilled. + Returns: + A list of keys corresponding to the input object refs. + """ + return _external_storage.spill_objects(object_refs) + + +def restore_spilled_objects(keys: List[bytes]): + """Restore objects from the external storage. Objects are specified + by their keys. + + Args: + keys: A list of bytes corresponding to the spilled objects. + """ + _external_storage.restore_spilled_objects(keys) diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 0fdf3036f..5c8917fc9 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -162,6 +162,7 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil: cdef extern from "src/ray/protobuf/common.pb.h" nogil: cdef CWorkerType WORKER_TYPE_WORKER "ray::WorkerType::WORKER" cdef CWorkerType WORKER_TYPE_DRIVER "ray::WorkerType::DRIVER" + cdef CWorkerType WORKER_TYPE_IO_WORKER "ray::WorkerType::IO_WORKER" cdef extern from "src/ray/protobuf/common.pb.h" nogil: cdef CTaskType TASK_TYPE_NORMAL_TASK "ray::TaskType::NORMAL_TASK" diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 120b8a40f..871e5d0c3 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -165,7 +165,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: shared_ptr[CBuffer] *data) CRayStatus Seal(const CObjectID &object_id, c_bool pin_object) 
CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms, - c_vector[shared_ptr[CRayObject]] *results) + c_vector[shared_ptr[CRayObject]] *results, + c_bool plasma_objects_only) CRayStatus Contains(const CObjectID &object_id, c_bool *has_object) CRayStatus Wait(const c_vector[CObjectID] &object_ids, int num_objects, int64_t timeout_ms, c_vector[c_bool] *results) @@ -192,6 +193,9 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CRayStatus SetResource(const c_string &resource_name, const double capacity, const CClientID &client_Id) + CRayStatus ForceSpillObjects(const c_vector[CObjectID] &object_ids) + CRayStatus ForceRestoreSpilledObjects( + const c_vector[CObjectID] &object_ids) cdef cppclass CCoreWorkerOptions "ray::CoreWorkerOptions": CWorkerType worker_type @@ -220,6 +224,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: ) task_execution_callback (CRayStatus() nogil) check_signals (void() nogil) gc_collect + (c_vector[c_string](const c_vector[CObjectID]&) nogil) spill_objects + (void(const c_vector[c_string]&) nogil) restore_spilled_objects (void(c_string *stack_out) nogil) get_lang_stack c_bool ref_counting_enabled c_bool is_local_mode diff --git a/python/ray/node.py b/python/ray/node.py index b56efe498..aa4ab55d4 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -718,7 +718,8 @@ class Node: socket_to_use=self.socket, head_node=self.head, start_initial_python_workers_for_first_job=self._ray_params. 
- start_initial_python_workers_for_first_job) + start_initial_python_workers_for_first_job, + object_spilling_config=self._ray_params.object_spilling_config) assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info] diff --git a/python/ray/parameter.py b/python/ray/parameter.py index 07e36db26..b7eebdc26 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -145,7 +145,8 @@ class RayParams: enable_object_reconstruction=False, metrics_agent_port=None, metrics_export_port=None, - lru_evict=False): + lru_evict=False, + object_spilling_config=None): self.object_ref_seed = object_ref_seed self.redis_address = redis_address self.num_cpus = num_cpus @@ -190,6 +191,7 @@ class RayParams: self._internal_config = _internal_config self._lru_evict = lru_evict self._enable_object_reconstruction = enable_object_reconstruction + self.object_spilling_config = object_spilling_config self._check_usage() # Set the internal config options for LRU eviction. diff --git a/python/ray/services.py b/python/ray/services.py index f5cc4d426..715cc3093 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1289,7 +1289,8 @@ def start_raylet(redis_address, fate_share=None, socket_to_use=None, head_node=False, - start_initial_python_workers_for_first_job=False): + start_initial_python_workers_for_first_job=False, + object_spilling_config=None): """Start a raylet, which is a combined local scheduler and object manager. 
Args: @@ -1398,6 +1399,10 @@ def start_raylet(redis_address, if load_code_from_local: start_worker_command += ["--load-code-from-local"] + if object_spilling_config: + start_worker_command.append( + f"--object-spilling-config={json.dumps(object_spilling_config)}") + command = [ RAYLET_EXECUTABLE, "--raylet_socket_name={}".format(raylet_name), diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 50f7be593..b1572019b 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -48,6 +48,7 @@ py_test_module_list( "test_stress_sharded.py", "test_unreconstructable_errors.py", "test_tensorflow.py", + "test_object_spilling.py", ], size = "medium", extra_srcs = SRCS, diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py new file mode 100644 index 000000000..cce0320ef --- /dev/null +++ b/python/ray/tests/test_object_spilling.py @@ -0,0 +1,140 @@ +import json +import random +import time + +import numpy as np +import pytest +import ray + + +def test_spill_objects_manually(shutdown_only): + # Limit our object store to 75 MiB of memory. + ray.init( + object_store_memory=75 * 1024 * 1024, + object_spilling_config={ + "type": "filesystem", + "params": { + "directory_path": "/tmp" + } + }, + _internal_config=json.dumps({ + "object_store_full_max_retries": 0, + "max_io_workers": 4, + })) + arr = np.random.rand(1024 * 1024) # 8 MB data + replay_buffer = [] + pinned_objects = set() + spilled_objects = set() + + # Create objects of more than 200 MiB. + for _ in range(25): + ref = None + while ref is None: + try: + ref = ray.put(arr) + replay_buffer.append(ref) + pinned_objects.add(ref) + except ray.exceptions.ObjectStoreFullError: + ref_to_spill = pinned_objects.pop() + ray.experimental.force_spill_objects([ref_to_spill]) + spilled_objects.add(ref_to_spill) + + # Spill 2 more objects so we will always have enough space for + # restoring objects back. 
+ refs_to_spill = (pinned_objects.pop(), pinned_objects.pop()) + ray.experimental.force_spill_objects(refs_to_spill) + spilled_objects.update(refs_to_spill) + + # randomly sample objects + for _ in range(100): + ref = random.choice(replay_buffer) + if ref in spilled_objects: + ray.experimental.force_restore_spilled_objects([ref]) + sample = ray.get(ref) + assert np.array_equal(sample, arr) + + +def test_spill_objects_manually_from_workers(shutdown_only): + # Limit our object store to 100 MiB of memory. + ray.init( + object_store_memory=100 * 1024 * 1024, + object_spilling_config={ + "type": "filesystem", + "params": { + "directory_path": "/tmp" + } + }, + _internal_config=json.dumps({ + "object_store_full_max_retries": 0, + "max_io_workers": 4, + })) + + @ray.remote + def _worker(): + arr = np.random.rand(100 * 1024) + ref = ray.put(arr) + ray.experimental.force_spill_objects([ref]) + ray.experimental.force_restore_spilled_objects([ref]) + assert np.array_equal(ray.get(ref), arr) + + ray.get([_worker.remote() for _ in range(50)]) + + +def test_spill_objects_manually_with_workers(shutdown_only): + # Limit our object store to 100 MiB of memory. + ray.init( + object_store_memory=100 * 1024 * 1024, + object_spilling_config={ + "type": "filesystem", + "params": { + "directory_path": "/tmp" + } + }, + _internal_config=json.dumps({ + "object_store_full_max_retries": 0, + "max_io_workers": 4, + })) + arrays = [np.random.rand(100 * 1024) for _ in range(50)] + objects = [ray.put(arr) for arr in arrays] + + @ray.remote + def _worker(object_refs): + ray.experimental.force_spill_objects(object_refs) + + ray.get([_worker.remote([o]) for o in objects]) + + for restored, arr in zip(ray.get(objects), arrays): + assert np.array_equal(restored, arr) + + +@pytest.mark.skip(reason="have not been fully implemented") +def test_spill_objects_automatically(shutdown_only): + # Limit our object store to 75 MiB of memory. 
+ ray.init( + object_store_memory=75 * 1024 * 1024, + _internal_config=json.dumps({ + "max_io_workers": 4, + "object_store_full_max_retries": 2, + "object_store_full_initial_delay_ms": 10, + "auto_object_spilling": True, + })) + arr = np.random.rand(1024 * 1024) # 8 MB data + replay_buffer = [] + + # Wait raylet for starting an IO worker. + time.sleep(1) + + # Create objects of more than 800 MiB. + for _ in range(100): + ref = None + while ref is None: + ref = ray.put(arr) + replay_buffer.append(ref) + + print("-----------------------------------") + + # randomly sample objects + for _ in range(1000): + ref = random.choice(replay_buffer) + sample = ray.get(ref, timeout=0) + assert np.array_equal(sample, arr) diff --git a/python/ray/worker.py b/python/ray/worker.py index ee0e073f2..f3fa074e8 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -53,6 +53,7 @@ from ray.utils import (_random_string, check_oversized_pickle, is_cython, SCRIPT_MODE = 0 WORKER_MODE = 1 LOCAL_MODE = 2 +IO_WORKER_MODE = 3 ERROR_KEY_PREFIX = b"Error:" @@ -513,7 +514,8 @@ def init(address=None, _internal_config=None, lru_evict=False, enable_object_reconstruction=False, - _metrics_export_port=None): + _metrics_export_port=None, + object_spilling_config=None): """ Connect to an existing Ray cluster or start one and connect to it. @@ -642,6 +644,8 @@ def init(address=None, _metrics_export_port(int): Port number Ray exposes system metrics through a Prometheus endpoint. It is currently under active development, and the API is subject to change. + object_spilling_config (dict): The configuration dict for object + spilling I/O workers. Returns: Address information about the started processes. 
@@ -737,7 +741,8 @@ def init(address=None, _internal_config=_internal_config, lru_evict=lru_evict, enable_object_reconstruction=enable_object_reconstruction, - metrics_export_port=_metrics_export_port) + metrics_export_port=_metrics_export_port, + object_spilling_config=object_spilling_config) # Start the Ray processes. We set shutdown_at_exit=False because we # shutdown the node in the ray.shutdown call that happens in the atexit # handler. We still spawn a reaper process in case the atexit handler @@ -1206,7 +1211,7 @@ def connect(node, worker.redis_client = node.create_redis_client() # Initialize some fields. - if mode is WORKER_MODE: + if mode in (WORKER_MODE, IO_WORKER_MODE): # We should not specify the job_id if it's `WORKER_MODE`. assert job_id is None job_id = JobID.nil() @@ -1260,7 +1265,7 @@ def connect(node, import __main__ as main driver_name = (main.__file__ if hasattr(main, "__file__") else "INTERACTIVE MODE") - elif mode == WORKER_MODE: + elif mode == WORKER_MODE or mode == IO_WORKER_MODE: # Check the RedirectOutput key in Redis and based on its value redirect # worker output and error to their own files. # This key is set in services.py when Redis is started. 
@@ -1295,8 +1300,7 @@ def connect(node, job_config = ray.job_config.JobConfig() serialized_job_config = job_config.serialize() worker.core_worker = ray._raylet.CoreWorker( - (mode == SCRIPT_MODE or mode == LOCAL_MODE), - node.plasma_store_socket_name, node.raylet_socket_name, job_id, + mode, node.plasma_store_socket_name, node.raylet_socket_name, job_id, gcs_options, node.get_logs_dir_path(), node.node_ip_address, node.node_manager_port, node.raylet_ip_address, (mode == LOCAL_MODE), driver_name, log_stdout_file_path, log_stderr_file_path, diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index c489d48a5..6eb8ef3b6 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -1,5 +1,6 @@ import argparse import json +import time import ray import ray.actor @@ -80,17 +81,47 @@ parser.add_argument( default=False, action="store_true", help="True if cloudpickle should be used for serialization.") +parser.add_argument( + "--worker-type", + required=False, + type=str, + default="WORKER", + help="Specify the type of the worker process") parser.add_argument( "--metrics-agent-port", required=True, type=int, help="the port of the node's metric agent.") +parser.add_argument( + "--object-spilling-config", + required=False, + type=str, + default="", + help="The configuration of object spilling. Only used by I/O workers.") if __name__ == "__main__": args = parser.parse_args() ray.utils.setup_logger(args.logging_level, args.logging_format) + if args.worker_type == "WORKER": + mode = ray.WORKER_MODE + elif args.worker_type == "IO_WORKER": + mode = ray.IO_WORKER_MODE + else: + raise ValueError("Unknown worker type: " + args.worker_type) + + # NOTE(suquark): We must initialize the external storage before we + # connect to raylet. Otherwise we may receive requests before the + # external storage is initialized. 
+ if mode == ray.IO_WORKER_MODE: + from ray import external_storage + if args.object_spilling_config: + object_spilling_config = json.loads(args.object_spilling_config) + else: + object_spilling_config = {} + external_storage.setup_external_storage(object_spilling_config) + internal_config = {} if args.config_list is not None: config_list = args.config_list.split(",") @@ -125,5 +156,14 @@ if __name__ == "__main__": spawn_reaper=False, connect_only=True) ray.worker._global_node = node - ray.worker.connect(node, mode=ray.WORKER_MODE) - ray.worker.global_worker.main_loop() + + ray.worker.connect(node, mode=mode) + if mode == ray.WORKER_MODE: + ray.worker.global_worker.main_loop() + elif mode == ray.IO_WORKER_MODE: + # It is handled by another thread in the C++ core worker. + # We just need to keep the worker alive. + while True: + time.sleep(100000) + else: + raise ValueError(f"Unexcepted worker mode: {mode}") diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 928fb98a4..6b3629710 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -343,3 +343,6 @@ RAY_CONFIG(bool, ownership_based_object_directory_enabled, false) // The interval where metrics are exported in milliseconds. RAY_CONFIG(uint64_t, metrics_report_interval_ms, 10000) + +/// The maximum number of I/O worker that raylet starts. +RAY_CONFIG(int, max_io_workers, 1) diff --git a/src/ray/core_worker/common.cc b/src/ray/core_worker/common.cc index eb30dd04a..7470807d2 100644 --- a/src/ray/core_worker/common.cc +++ b/src/ray/core_worker/common.cc @@ -17,10 +17,13 @@ namespace ray { std::string WorkerTypeString(WorkerType type) { + // TODO(suquark): Use proto3 utils to get the string. 
if (type == WorkerType::DRIVER) { return "driver"; } else if (type == WorkerType::WORKER) { return "worker"; + } else if (type == WorkerType::IO_WORKER) { + return "io_worker"; } RAY_CHECK(false); return ""; diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index edd98cda3..6300cbda9 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -305,9 +305,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ std::unordered_map internal_config; local_raylet_client_ = std::shared_ptr(new raylet::RayletClient( io_service_, std::move(grpc_client), options_.raylet_socket, GetWorkerID(), - (options_.worker_type == ray::WorkerType::WORKER), - worker_context_.GetCurrentJobID(), options_.language, options_.node_ip_address, - &local_raylet_id, &assigned_port, &internal_config, + options_.worker_type, worker_context_.GetCurrentJobID(), options_.language, + options_.node_ip_address, &local_raylet_id, &assigned_port, &internal_config, options_.serialized_job_config)); connected_ = true; @@ -326,6 +325,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ core_worker_server_->Run(); // Tell the raylet the port that we are listening on. + // NOTE: This also marks the worker as available in Raylet. RAY_CHECK_OK(local_raylet_client_->AnnounceWorkerPort(core_worker_server_->GetPort())); // Set our own address. 
@@ -378,8 +378,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ options_.store_socket, local_raylet_client_, reference_counter_, options_.check_signals, /*evict_if_full=*/RayConfig::instance().object_pinning_enabled(), - boost::bind(&CoreWorker::TriggerGlobalGC, this), - boost::bind(&CoreWorker::CurrentCallSite, this))); + /*on_store_full=*/boost::bind(&CoreWorker::TriggerGlobalGC, this), + /*get_current_call_site=*/boost::bind(&CoreWorker::CurrentCallSite, this))); memory_store_.reset(new CoreWorkerMemoryStore( [this](const RayObject &object, const ObjectID &object_id) { PutObjectIntoPlasma(object, object_id); @@ -914,7 +914,8 @@ Status CoreWorker::Seal(const ObjectID &object_id, bool pin_object, } Status CoreWorker::Get(const std::vector &ids, const int64_t timeout_ms, - std::vector> *results) { + std::vector> *results, + bool plasma_objects_only) { results->resize(ids.size(), nullptr); absl::flat_hash_set plasma_object_ids; @@ -924,20 +925,24 @@ Status CoreWorker::Get(const std::vector &ids, const int64_t timeout_m absl::flat_hash_map> result_map; auto start_time = current_time_ms(); - if (!memory_object_ids.empty()) { - RAY_RETURN_NOT_OK(memory_store_->Get(memory_object_ids, timeout_ms, worker_context_, - &result_map, &got_exception)); - } - - // Erase any objects that were promoted to plasma from the results. These get - // requests will be retried at the plasma store. - for (auto it = result_map.begin(); it != result_map.end();) { - auto current = it++; - if (current->second->IsInPlasmaError()) { - RAY_LOG(DEBUG) << current->first << " in plasma, doing fetch-and-get"; - plasma_object_ids.insert(current->first); - result_map.erase(current); + if (!plasma_objects_only) { + if (!memory_object_ids.empty()) { + RAY_RETURN_NOT_OK(memory_store_->Get(memory_object_ids, timeout_ms, worker_context_, + &result_map, &got_exception)); } + + // Erase any objects that were promoted to plasma from the results. 
These get + // requests will be retried at the plasma store. + for (auto it = result_map.begin(); it != result_map.end();) { + auto current = it++; + if (current->second->IsInPlasmaError()) { + RAY_LOG(DEBUG) << current->first << " in plasma, doing fetch-and-get"; + plasma_object_ids.insert(current->first); + result_map.erase(current); + } + } + } else { + plasma_object_ids = std::move(memory_object_ids); } if (!got_exception) { @@ -1169,6 +1174,14 @@ Status CoreWorker::SetResource(const std::string &resource_name, const double ca return local_raylet_client_->SetResource(resource_name, capacity, client_id); } +Status CoreWorker::ForceSpillObjects(const std::vector &object_ids) { + return local_raylet_client_->ForceSpillObjects(object_ids); +} + +Status CoreWorker::ForceRestoreSpilledObjects(const std::vector &object_ids) { + return local_raylet_client_->ForceRestoreSpilledObjects(object_ids); +} + std::unordered_map AddPlacementGroupConstraint( const std::unordered_map &resources, PlacementGroupID placement_group_id, int64_t bundle_index) { @@ -2103,6 +2116,44 @@ void CoreWorker::HandleLocalGC(const rpc::LocalGCRequest &request, } } +void CoreWorker::HandleSpillObjects(const rpc::SpillObjectsRequest &request, + rpc::SpillObjectsReply *reply, + rpc::SendReplyCallback send_reply_callback) { + if (options_.spill_objects != nullptr) { + std::vector object_ids_to_spill; + object_ids_to_spill.reserve(request.object_ids_to_spill_size()); + for (const auto &id_binary : request.object_ids_to_spill()) { + object_ids_to_spill.push_back(ObjectID::FromBinary(id_binary)); + } + std::vector object_urls = options_.spill_objects(object_ids_to_spill); + for (size_t i = 0; i < object_urls.size(); i++) { + reply->add_spilled_objects_url(std::move(object_urls[i])); + } + send_reply_callback(Status::OK(), nullptr, nullptr); + } else { + send_reply_callback(Status::NotImplemented("Spill objects callback not defined"), + nullptr, nullptr); + } +} + +void 
CoreWorker::HandleRestoreSpilledObjects( + const rpc::RestoreSpilledObjectsRequest &request, + rpc::RestoreSpilledObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) { + if (options_.restore_spilled_objects != nullptr) { + std::vector spilled_objects_url; + spilled_objects_url.reserve(request.spilled_objects_url_size()); + for (const auto &url : request.spilled_objects_url()) { + spilled_objects_url.push_back(url); + } + options_.restore_spilled_objects(spilled_objects_url); + send_reply_callback(Status::OK(), nullptr, nullptr); + } else { + send_reply_callback( + Status::NotImplemented("Restore spilled objects callback not defined"), nullptr, + nullptr); + } +} + void CoreWorker::YieldCurrentFiber(FiberEvent &event) { RAY_CHECK(worker_context_.CurrentActorIsAsync()); boost::this_fiber::yield(); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index c1eb621bb..31211d867 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -105,6 +105,10 @@ struct CoreWorkerOptions { /// runtime. This is required to free distributed references that may otherwise /// be held up in garbage objects. std::function gc_collect; + /// Application-language callback to spill objects to external storage. + std::function(const std::vector &)> spill_objects; + /// Application-language callback to restore objects from external storage. + std::function &)> restore_spilled_objects; /// Language worker callback to get the current call stack. std::function get_lang_stack; // Function that tries to interrupt the currently running Python thread. @@ -490,9 +494,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param[in] ids IDs of the objects to get. /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. /// \param[out] results Result list of objects data. + /// \param[in] plasma_objects_only Only get objects from Plasma Store. /// \return Status. 
Status Get(const std::vector &ids, const int64_t timeout_ms, - std::vector> *results); + std::vector> *results, + bool plasma_objects_only = false); /// Return whether or not the object store contains the given object. /// @@ -580,6 +586,16 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { Status SetResource(const std::string &resource_name, const double capacity, const ClientID &client_id); + /// Force spilling objects to external storage. + /// \param[in] object_ids The objects to be spilled. + /// \return Status + Status ForceSpillObjects(const std::vector &object_ids); + + /// Restore objects from external storage. + /// \param[in] object_ids The objects to be restored. + /// \return Status + Status ForceRestoreSpilledObjects(const std::vector &object_ids); + /// Submit a normal task. /// /// \param[in] function The remote function to execute. @@ -809,6 +825,16 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { void HandleLocalGC(const rpc::LocalGCRequest &request, rpc::LocalGCReply *reply, rpc::SendReplyCallback send_reply_callback) override; + // Spill objects to external storage. + void HandleSpillObjects(const rpc::SpillObjectsRequest &request, + rpc::SpillObjectsReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + + // Restore objects from external storage. + void HandleRestoreSpilledObjects(const rpc::RestoreSpilledObjectsRequest &request, + rpc::RestoreSpilledObjectsReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + /// /// Public methods related to async actor call. This should only be used when /// the actor is (1) direct actor and (2) using asyncio mode. 
diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc index 26a2f7dba..e1acf26f5 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc @@ -225,6 +225,8 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize( task_execution_callback, // task_execution_callback nullptr, // check_signals gc_collect, // gc_collect + nullptr, // spill_objects + nullptr, // restore_spilled_objects nullptr, // get_lang_stack nullptr, // kill_main true, // ref_counting_enabled diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 0c2e2a942..af87dd5ef 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -158,6 +158,8 @@ class CoreWorkerTest : public ::testing::Test { nullptr, // task_execution_callback nullptr, // check_signals nullptr, // gc_collect + nullptr, // spill_objects + nullptr, // restore_spilled_objects nullptr, // get_lang_stack nullptr, // kill_main true, // ref_counting_enabled diff --git a/src/ray/core_worker/test/mock_worker.cc b/src/ray/core_worker/test/mock_worker.cc index cf4cc26c2..d705d97e5 100644 --- a/src/ray/core_worker/test/mock_worker.cc +++ b/src/ray/core_worker/test/mock_worker.cc @@ -54,6 +54,8 @@ class MockWorker { _7), // task_execution_callback nullptr, // check_signals nullptr, // gc_collect + nullptr, // spill_objects + nullptr, // restore_spilled_objects nullptr, // get_lang_stack nullptr, // kill_main true, // ref_counting_enabled diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 8745b3a57..261fd3f44 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -29,6 +29,7 @@ enum Language { enum WorkerType { WORKER = 0; DRIVER = 1; + IO_WORKER = 2; } // Type of a task. 
diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 80f7ce30e..007c97b70 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -294,6 +294,24 @@ message PlasmaObjectReadyRequest { message PlasmaObjectReadyReply { } +message SpillObjectsRequest { + // The IDs of objects to be spilled. + repeated bytes object_ids_to_spill = 1; +} + +message SpillObjectsReply { + // The URLs of spilled objects. + repeated string spilled_objects_url = 1; +} + +message RestoreSpilledObjectsRequest { + // The URLs of spilled objects. + repeated string spilled_objects_url = 1; +} + +message RestoreSpilledObjectsReply { +} + service CoreWorkerService { // Push a task directly to this worker from another. rpc PushTask(PushTaskRequest) returns (PushTaskReply); @@ -332,6 +350,11 @@ service CoreWorkerService { rpc WaitForRefRemoved(WaitForRefRemovedRequest) returns (WaitForRefRemovedReply); // Trigger local GC on the worker. rpc LocalGC(LocalGCRequest) returns (LocalGCReply); + // Spill objects to external storage. Caller: raylet; callee: I/O worker. + rpc SpillObjects(SpillObjectsRequest) returns (SpillObjectsReply); + // Restore spilled objects from external storage. Caller: raylet; callee: I/O worker. + rpc RestoreSpilledObjects(RestoreSpilledObjectsRequest) + returns (RestoreSpilledObjectsReply); // Notification from raylet that an object ID is available in local plasma. rpc PlasmaObjectReady(PlasmaObjectReadyRequest) returns (PlasmaObjectReadyReply); } diff --git a/src/ray/raylet/format/node_manager.fbs b/src/ray/raylet/format/node_manager.fbs index c5457804f..34cda671c 100644 --- a/src/ray/raylet/format/node_manager.fbs +++ b/src/ray/raylet/format/node_manager.fbs @@ -82,6 +82,12 @@ enum MessageType:int { SetResourceRequest, // Subscribe to Plasma updates SubscribePlasmaReady, + // Manually spill objects to external storage. 
+ ForceSpillObjectsRequest, + ForceSpillObjectsReply, + // Manually restore objects from external storage. + ForceRestoreSpilledObjectsRequest, + ForceRestoreSpilledObjectsReply, } table TaskExecutionSpecification { @@ -126,8 +132,9 @@ table ResourceIdSetInfos { // This struct is used to register a new worker with the raylet. // It is shipped as part of raylet_connect. table RegisterClientRequest { - // True if the client is a worker and false if the client is a driver. - is_worker: bool; + // Type of the worker. + // TODO(suquark): Use `WorkerType` in `common.proto`. + worker_type: int; // The ID of the worker or driver. worker_id: string; // The process ID of this worker. @@ -292,3 +299,19 @@ table SubscribePlasmaReady { // ObjectID to wait for object_id: string; } + +table ForceSpillObjectsRequest { + // List of object IDs to be spilled to external storage. + object_ids: [string]; +} + +table ForceSpillObjectsReply { +} + +table ForceRestoreSpilledObjectsRequest { + // List of object IDs to be restored from external storage. + object_ids: [string]; +} + +table ForceRestoreSpilledObjectsReply { +} diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index f5602dffa..6e9458658 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -507,6 +507,89 @@ void NodeManager::DoLocalGC() { } } +void NodeManager::SpillObjects(const std::vector &objects_ids_to_spill, + std::function callback) { + std::vector objects_ids; + for (const auto &id : objects_ids_to_spill) { + // Do not spill already spilled objects. 
+ if (spilled_objects_.count(id) == 0) { + objects_ids.push_back(id); + } + } + if (objects_ids.empty()) { + if (callback) { + callback(Status::OK()); + } + return; + } + worker_pool_.PopIOWorker( + [this, objects_ids, callback](std::shared_ptr io_worker) { + RAY_LOG(DEBUG) << "Sending object spilling request"; + rpc::SpillObjectsRequest request; + for (const auto &object_id : objects_ids) { + request.add_object_ids_to_spill(object_id.Binary()); + } + io_worker->rpc_client()->SpillObjects( + request, [this, objects_ids, callback, io_worker]( + const ray::Status &status, const rpc::SpillObjectsReply &r) { + worker_pool_.PushIOWorker(io_worker); + if (!status.ok()) { + RAY_LOG(ERROR) << "Failed to send object spilling request: " + << status.ToString(); + } else { + RAY_CHECK(static_cast(r.spilled_objects_url_size()) == + objects_ids.size()); + for (size_t i = 0; i < objects_ids.size(); ++i) { + const ObjectID &object_id = objects_ids[i]; + const std::string &object_url = r.spilled_objects_url(i); + // TODO(suquark): write to object directory. 
+ spilled_objects_[object_id] = object_url; + auto search = pinned_objects_.find(object_id); + if (search != pinned_objects_.end()) { + pinned_objects_.erase(search); + } else { + RAY_LOG(ERROR) + << "The spilled object " << object_id.Hex() << " is not pinned."; + } + } + } + if (callback) { + callback(status); + } + }); + }); +} + +void NodeManager::RestoreSpilledObjects( + const std::vector &object_ids, + std::function callback) { + std::vector object_urls; + object_urls.reserve(object_ids.size()); + for (const auto &object_id : object_ids) { + object_urls.push_back(spilled_objects_[object_id]); + } + worker_pool_.PopIOWorker([this, object_urls, + callback](std::shared_ptr io_worker) { + RAY_LOG(DEBUG) << "Sending restore spilled object request"; + rpc::RestoreSpilledObjectsRequest request; + for (const auto &url : object_urls) { + request.add_spilled_objects_url(std::move(url)); + } + io_worker->rpc_client()->RestoreSpilledObjects( + request, [this, callback, io_worker](const ray::Status &status, + const rpc::RestoreSpilledObjectsReply &r) { + worker_pool_.PushIOWorker(io_worker); + if (!status.ok()) { + RAY_LOG(ERROR) << "Failed to send restore spilled object request: " + << status.ToString(); + } + if (callback) { + callback(status); + } + }); + }); +} + // TODO(edoakes): this function is problematic because it both sends warnings spuriously // under normal conditions and sometimes doesn't send a warning under actual deadlock // conditions. 
The current logic is to push a warning when: all running tasks are @@ -1144,7 +1227,41 @@ void NodeManager::ProcessClientMessage(const std::shared_ptr & case protocol::MessageType::SubscribePlasmaReady: { ProcessSubscribePlasmaReady(client, message_data); } break; - + case protocol::MessageType::ForceSpillObjectsRequest: { + auto message = flatbuffers::GetRoot(message_data); + std::vector object_ids = from_flatbuf(*message->object_ids()); + SpillObjects(object_ids, [this, client](const ray::Status &status) { + flatbuffers::FlatBufferBuilder fbb; + flatbuffers::Offset reply = + protocol::CreateForceSpillObjectsReply(fbb); + fbb.Finish(reply); + auto reply_status = client->WriteMessage( + static_cast(protocol::MessageType::ForceSpillObjectsReply), + fbb.GetSize(), fbb.GetBufferPointer()); + if (!reply_status.ok()) { + // We failed to write to the client, so disconnect the client. + ProcessDisconnectClientMessage(client); + } + }); + } break; + case protocol::MessageType::ForceRestoreSpilledObjectsRequest: { + auto message = + flatbuffers::GetRoot(message_data); + std::vector object_ids = from_flatbuf(*message->object_ids()); + RestoreSpilledObjects(object_ids, [this, client](const ray::Status &status) { + flatbuffers::FlatBufferBuilder fbb; + flatbuffers::Offset reply = + protocol::CreateForceRestoreSpilledObjectsReply(fbb); + fbb.Finish(reply); + auto reply_status = client->WriteMessage( + static_cast(protocol::MessageType::ForceRestoreSpilledObjectsReply), + fbb.GetSize(), fbb.GetBufferPointer()); + if (!reply_status.ok()) { + // We failed to write to the client, so disconnect the client. 
+ ProcessDisconnectClientMessage(client); + } + }); + } break; default: RAY_LOG(FATAL) << "Received unexpected message type " << message_type; } @@ -1162,8 +1279,10 @@ void NodeManager::ProcessRegisterClientRequestMessage( WorkerID worker_id = from_flatbuf(*message->worker_id()); pid_t pid = message->worker_pid(); std::string worker_ip_address = string_from_flatbuf(*message->ip_address()); + // TODO(suquark): Use `WorkerType` in `common.proto` without type converting. + rpc::WorkerType worker_type = static_cast(message->worker_type()); auto worker = std::dynamic_pointer_cast(std::make_shared( - worker_id, language, worker_ip_address, client, client_call_manager_)); + worker_id, language, worker_type, worker_ip_address, client, client_call_manager_)); auto send_reply_callback = [this, client](int assigned_port) { flatbuffers::FlatBufferBuilder fbb; @@ -1187,7 +1306,8 @@ void NodeManager::ProcessRegisterClientRequestMessage( }); }; - if (message->is_worker()) { + if (worker_type == rpc::WorkerType::WORKER || + worker_type == rpc::WorkerType::IO_WORKER) { // Register the new worker. RAY_UNUSED(worker_pool_.RegisterWorker(worker, pid, send_reply_callback)); } else { @@ -1238,6 +1358,13 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr void NodeManager::HandleWorkerAvailable(const std::shared_ptr &worker) { RAY_CHECK(worker); + + if (worker->GetWorkerType() == rpc::WorkerType::IO_WORKER) { + // Return the worker to the idle pool. + worker_pool_.PushIOWorker(worker); + return; + } + bool worker_idle = true; // If the worker was assigned a task, mark it as finished. 
@@ -1400,15 +1527,23 @@ void NodeManager::ProcessFetchOrReconstructMessage( const auto refs = FlatbufferToObjectReference(*message->object_ids(), *message->owner_addresses()); if (message->fetch_only()) { + std::vector spilled_object_ids; for (const auto &ref : refs) { ObjectID object_id = ObjectID::FromBinary(ref.object_id()); // If only a fetch is required, then do not subscribe to the // dependencies to the task dependency manager. if (!task_dependency_manager_.CheckObjectLocal(object_id)) { - // Fetch the object if it's not already local. - RAY_CHECK_OK(object_manager_.Pull(object_id, ref.owner_address())); + if (spilled_objects_.count(object_id) > 0) { + spilled_object_ids.push_back(object_id); + } else { + // Fetch the object if it's not already local. + RAY_CHECK_OK(object_manager_.Pull(object_id, ref.owner_address())); + } } } + if (spilled_object_ids.size() > 0) { + RestoreSpilledObjects(spilled_object_ids); + } } else { // The values are needed. Add all requested objects to the list to // subscribe to in the task dependency manager. These objects will be diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index f8b8a6797..15192640e 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -614,6 +614,16 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// Trigger local GC on each worker of this raylet. void DoLocalGC(); + /// Spill objects to external storage. + /// \param objects_ids_to_spill The objects to be spilled. + void SpillObjects(const std::vector &objects_ids_to_spill, + std::function callback = nullptr); + + /// Restore spilled objects from external storage. + /// \param object_ids Objects to be restored. + void RestoreSpilledObjects(const std::vector &object_ids, + std::function callback = nullptr); + /// Push an error to the driver if this node is full of actors and so we are /// unable to schedule new tasks or actors at all. 
void WarnResourceDeadlock(); @@ -699,7 +709,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// A mapping from actor ID to registration information about that actor /// (including which node manager owns it). std::unordered_map actor_registry_; - + /// A mapping from ObjectIDs to external object URLs for spilled objects. + /// TODO(suquark): Move it into object directory. + absl::flat_hash_map spilled_objects_; /// This map stores actor ID to the ID of the checkpoint that will be used to /// restore the actor. std::unordered_map checkpoint_id_to_restore_; diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index 6133c0dae..27dbd5b24 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -62,6 +62,8 @@ class MockWorker : public WorkerInterface { WorkerID WorkerId() const { return worker_id_; } + rpc::WorkerType GetWorkerType() const { return rpc::WorkerType::WORKER; } + int Port() const { return port_; } void SetOwnerAddress(const rpc::Address &address) { address_ = address; } diff --git a/src/ray/raylet/worker.cc b/src/ray/raylet/worker.cc index c37685f1a..d062f49ff 100644 --- a/src/ray/raylet/worker.cc +++ b/src/ray/raylet/worker.cc @@ -27,11 +27,12 @@ namespace raylet { /// A constructor responsible for initializing the state of a worker. 
Worker::Worker(const WorkerID &worker_id, const Language &language, - const std::string &ip_address, + rpc::WorkerType worker_type, const std::string &ip_address, std::shared_ptr connection, rpc::ClientCallManager &client_call_manager) : worker_id_(worker_id), language_(language), + worker_type_(worker_type), ip_address_(ip_address), assigned_port_(-1), port_(-1), @@ -41,6 +42,8 @@ Worker::Worker(const WorkerID &worker_id, const Language &language, client_call_manager_(client_call_manager), is_detached_actor_(false) {} +rpc::WorkerType Worker::GetWorkerType() const { return worker_type_; } + void Worker::MarkDead() { dead_ = true; } bool Worker::IsDead() const { return dead_; } diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index cb845fbaf..61d811d4f 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -37,6 +37,7 @@ class WorkerInterface { public: /// A destructor responsible for freeing all worker state. virtual ~WorkerInterface() {} + virtual rpc::WorkerType GetWorkerType() const = 0; virtual void MarkDead() = 0; virtual bool IsDead() const = 0; virtual void MarkBlocked() = 0; @@ -117,11 +118,12 @@ class Worker : public WorkerInterface { public: /// A constructor that initializes a worker object. /// NOTE: You MUST manually set the worker process. - Worker(const WorkerID &worker_id, const Language &language, + Worker(const WorkerID &worker_id, const Language &language, rpc::WorkerType worker_type, const std::string &ip_address, std::shared_ptr connection, rpc::ClientCallManager &client_call_manager); /// A destructor responsible for freeing all worker state. ~Worker() {} + rpc::WorkerType GetWorkerType() const; void MarkDead(); bool IsDead() const; void MarkBlocked(); @@ -215,6 +217,8 @@ class Worker : public WorkerInterface { Process proc_; /// The language type of this worker. Language language_; + /// The type of the worker. + rpc::WorkerType worker_type_; /// IP address of this worker. 
std::string ip_address_; /// Port assigned to this worker by the raylet. If this is 0, the actual diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index c8ebdc0cc..e07ffb41d 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -131,7 +131,7 @@ void WorkerPool::Start(int num_workers) { int num_worker_processes = static_cast( std::ceil(static_cast(num_workers) / state.num_workers_per_process)); for (int i = 0; i < num_worker_processes; i++) { - StartWorkerProcess(entry.first, JobID::Nil()); + StartWorkerProcess(entry.first, ray::rpc::WorkerType::WORKER, JobID::Nil()); } } } @@ -165,7 +165,9 @@ uint32_t WorkerPool::Size(const Language &language) const { } } -Process WorkerPool::StartWorkerProcess(const Language &language, const JobID &job_id, +Process WorkerPool::StartWorkerProcess(const Language &language, + const rpc::WorkerType worker_type, + const JobID &job_id, std::vector dynamic_options) { rpc::JobConfig *job_config = nullptr; if (RayConfig::instance().enable_multi_tenancy()) { @@ -186,6 +188,8 @@ Process WorkerPool::StartWorkerProcess(const Language &language, const JobID &jo for (auto &entry : state.starting_worker_processes) { starting_workers += entry.second; } + + // Here we consider both task workers and I/O workers. if (starting_workers >= maximum_startup_concurrency_) { // Workers have been started, but not registered. Force start disabled -- returning. 
RAY_LOG(DEBUG) << "Worker not started, " << starting_workers @@ -271,6 +275,13 @@ Process WorkerPool::StartWorkerProcess(const Language &language, const JobID &jo RAY_CHECK(worker_raylet_config_placeholder_found) << "The " << kWorkerRayletConfigPlaceholder << " placeholder is not found in worker command."; + } else if (language == Language::PYTHON) { + RAY_CHECK(worker_type == rpc::WorkerType::WORKER || + worker_type == rpc::WorkerType::IO_WORKER); + if (worker_type == rpc::WorkerType::IO_WORKER) { + // Without "--worker-type", by default the worker type is rpc::WorkerType::WORKER. + worker_command_args.push_back("--worker-type=" + rpc::WorkerType_Name(worker_type)); + } } std::map env; @@ -286,19 +297,23 @@ Process WorkerPool::StartWorkerProcess(const Language &language, const JobID &jo } RAY_LOG(DEBUG) << "Started worker process of " << workers_to_start << " worker(s) with pid " << proc.GetId(); - MonitorStartingWorkerProcess(proc, language); + MonitorStartingWorkerProcess(proc, language, worker_type); state.starting_worker_processes.emplace(proc, workers_to_start); + if (worker_type == rpc::WorkerType::IO_WORKER) { + state.num_starting_io_workers++; + } return proc; } void WorkerPool::MonitorStartingWorkerProcess(const Process &proc, - const Language &language) { + const Language &language, + const rpc::WorkerType worker_type) { auto timer = std::make_shared( *io_service_, boost::posix_time::seconds( RayConfig::instance().worker_register_timeout_seconds())); // Capture timer in lambda to copy it once, so that it can avoid destructing timer. timer->async_wait( - [timer, language, proc, this](const boost::system::error_code e) -> void { + [timer, language, proc, worker_type, this](const boost::system::error_code e) { // check the error code. 
auto &state = this->GetStateForLanguage(language); // Since this process times out to start, remove it from starting_worker_processes @@ -308,6 +323,12 @@ void WorkerPool::MonitorStartingWorkerProcess(const Process &proc, RAY_LOG(INFO) << "Some workers of the worker process(" << proc.GetId() << ") have not registered to raylet within timeout."; state.starting_worker_processes.erase(it); + if (worker_type == rpc::WorkerType::IO_WORKER) { + // Mark the I/O worker as failed. + state.num_starting_io_workers--; + } + // We may have places to start more workers now. + TryStartIOWorkers(language, state); starting_worker_timeout_callback_(); } }); @@ -397,16 +418,23 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr &worker status = Status::Invalid("Unknown worker"); } else { RAY_RETURN_NOT_OK(GetNextFreePort(&port)); - RAY_LOG(DEBUG) << "Registering worker with pid " << pid << ", port: " << port; + RAY_LOG(DEBUG) << "Registering worker with pid " << pid << ", port: " << port + << ", worker_type: " << rpc::WorkerType_Name(worker->GetWorkerType()); worker->SetAssignedPort(port); worker->SetProcess(it->first); it->second--; if (it->second == 0) { state.starting_worker_processes.erase(it); + // We may have slots to start more workers now. + TryStartIOWorkers(worker->GetLanguage(), state); } RAY_CHECK(worker->GetProcess().GetId() == pid); state.registered_workers.insert(worker); + if (worker->GetWorkerType() == rpc::WorkerType::IO_WORKER) { + state.registered_io_workers.insert(worker); + state.num_starting_io_workers--; + } if (RayConfig::instance().enable_multi_tenancy()) { auto dedicated_workers_it = state.worker_pids_to_assigned_jobs.find(pid); @@ -468,7 +496,7 @@ Status WorkerPool::RegisterDriver(const std::shared_ptr &driver delay_callback = true; // Start initial Python workers for the first job. 
for (int i = 0; i < num_initial_python_workers_for_first_job_; i++) { - StartWorkerProcess(Language::PYTHON, job_id); + StartWorkerProcess(Language::PYTHON, rpc::WorkerType::WORKER, job_id); } } } @@ -509,12 +537,39 @@ std::shared_ptr WorkerPool::GetRegisteredDriver( return nullptr; } +void WorkerPool::PushIOWorker(const std::shared_ptr &worker) { + auto &state = GetStateForLanguage(worker->GetLanguage()); + RAY_CHECK(worker->GetWorkerType() == rpc::WorkerType::IO_WORKER); + RAY_LOG(DEBUG) << "Pushing an IO worker to the worker pool."; + if (state.pending_io_tasks.empty()) { + state.idle_io_workers.push(worker); + } else { + auto callback = state.pending_io_tasks.front(); + state.pending_io_tasks.pop(); + callback(worker); + } +} + +void WorkerPool::PopIOWorker( + std::function)> callback) { + auto &state = GetStateForLanguage(Language::PYTHON); + if (state.idle_io_workers.empty()) { + // We must fill the pending task first, because 'TryStartIOWorkers' will + // start I/O workers according to the number of pending tasks. + state.pending_io_tasks.push(callback); + TryStartIOWorkers(Language::PYTHON, state); + } else { + auto io_worker = state.idle_io_workers.front(); + state.idle_io_workers.pop(); + callback(io_worker); + } +} + void WorkerPool::PushWorker(const std::shared_ptr &worker) { // Since the worker is now idle, unset its assigned task ID. RAY_CHECK(worker->GetAssignedTaskId().IsNil()) << "Idle workers cannot have an assigned task ID"; auto &state = GetStateForLanguage(worker->GetLanguage()); - auto it = state.dedicated_workers_to_tasks.find(worker->GetProcess()); if (it != state.dedicated_workers_to_tasks.end()) { // The worker is used for the actor creation task with dynamic options. @@ -553,8 +608,8 @@ std::shared_ptr WorkerPool::PopWorker( } else if (!HasPendingWorkerForTask(task_spec.GetLanguage(), task_spec.TaskId())) { // We are not pending a registration from a worker for this task, // so start a new worker process for this task. 
- proc = StartWorkerProcess(task_spec.GetLanguage(), task_spec.JobId(), - task_spec.DynamicWorkerOptions()); + proc = StartWorkerProcess(task_spec.GetLanguage(), rpc::WorkerType::WORKER, + task_spec.JobId(), task_spec.DynamicWorkerOptions()); if (proc.IsValid()) { state.dedicated_workers_to_tasks[proc] = task_spec.TaskId(); state.tasks_to_dedicated_workers[task_spec.TaskId()] = proc; @@ -569,7 +624,8 @@ std::shared_ptr WorkerPool::PopWorker( } else { // There are no more non-actor workers available to execute this task. // Start a new worker process. - proc = StartWorkerProcess(task_spec.GetLanguage(), JobID::Nil()); + proc = StartWorkerProcess(task_spec.GetLanguage(), rpc::WorkerType::WORKER, + JobID::Nil()); } } else { // Find an available worker which is already assigned to this job. @@ -584,7 +640,8 @@ std::shared_ptr WorkerPool::PopWorker( if (worker == nullptr) { // There are no more non-actor workers available to execute this task. // Start a new worker process. - proc = StartWorkerProcess(task_spec.GetLanguage(), task_spec.JobId()); + proc = StartWorkerProcess(task_spec.GetLanguage(), rpc::WorkerType::WORKER, + task_spec.JobId()); } } } else { @@ -715,6 +772,32 @@ bool WorkerPool::HasPendingWorkerForTask(const Language &language, return it != state.tasks_to_dedicated_workers.end(); } +void WorkerPool::TryStartIOWorkers(const Language &language, State &state) { + if (language != Language::PYTHON) { + return; + } + int available_io_workers_num = + state.num_starting_io_workers + state.registered_io_workers.size(); + int max_workers_to_start = + RayConfig::instance().max_io_workers() - available_io_workers_num; + // Compare first to prevent unsigned underflow. 
+ if (state.pending_io_tasks.size() > state.idle_io_workers.size()) { + int expected_workers_num = + state.pending_io_tasks.size() - state.idle_io_workers.size(); + if (expected_workers_num > max_workers_to_start) { + expected_workers_num = max_workers_to_start; + } + for (; expected_workers_num > 0; expected_workers_num--) { + Process proc = StartWorkerProcess(ray::Language::PYTHON, + ray::rpc::WorkerType::IO_WORKER, JobID::Nil()); + if (!proc.IsValid()) { + // We may hit the maximum worker start up concurrency limit. Stop. + return; + } + } + } +} + std::string WorkerPool::DebugString() const { std::stringstream result; result << "WorkerPool:"; diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 59338db30..62988752c 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -162,6 +162,19 @@ class WorkerPool : public WorkerPoolInterface { /// \param The driver to disconnect. The driver must be registered. void DisconnectDriver(const std::shared_ptr &driver); + /// Add an idle I/O worker to the pool. + /// + /// \param worker The idle I/O worker to add. + void PushIOWorker(const std::shared_ptr &worker); + + /// Pop an idle I/O worker from the pool and trigger a callback when + /// an I/O worker is available. + /// The caller is responsible for pushing the worker back onto the + /// pool once the worker has completed its work. + /// + /// \param callback The callback that returns an available I/O worker. + void PopIOWorker(std::function)> callback); + /// Add an idle worker to the pool. /// /// \param The idle worker to add. @@ -229,11 +242,13 @@ class WorkerPool : public WorkerPoolInterface { /// any workers. /// /// \param language Which language this worker process should be. + /// \param worker_type The type of the worker. /// \param job_id The ID of the job to which the started worker process belongs. /// \param dynamic_options The dynamic options that we should add for worker command. 
/// \return The id of the process that we started if it's positive, /// otherwise it means we didn't start a process. - Process StartWorkerProcess(const Language &language, const JobID &job_id, + Process StartWorkerProcess(const Language &language, const rpc::WorkerType worker_type, + const JobID &job_id, std::vector dynamic_options = {}); /// The implementation of how to start a new worker process with command arguments. @@ -263,6 +278,15 @@ class WorkerPool : public WorkerPoolInterface { std::unordered_set> idle; /// The pool of idle actor workers. std::unordered_map> idle_actor; + /// The pool of idle I/O workers. + std::queue> idle_io_workers; + /// The queue of pending I/O tasks. + std::queue)>> pending_io_tasks; + /// All I/O workers that have registered and are still connected, including both + /// idle and executing. + std::unordered_set> registered_io_workers; + /// Number of starting I/O workers. + int num_starting_io_workers = 0; /// All workers that have registered and are still connected, including both /// idle and executing. std::unordered_set> registered_workers; @@ -306,7 +330,8 @@ class WorkerPool : public WorkerPoolInterface { /// (due to worker process crash or any other reasons), remove them /// from `starting_worker_processes`. Otherwise if we'll mistakenly /// think there are unregistered workers, and won't start new workers. - void MonitorStartingWorkerProcess(const Process &proc, const Language &language); + void MonitorStartingWorkerProcess(const Process &proc, const Language &language, + const rpc::WorkerType worker_type); /// Get the next unallocated port in the free ports list. If a port range isn't /// configured, returns 0. @@ -320,6 +345,13 @@ class WorkerPool : public WorkerPoolInterface { /// \param[in] port The port to mark as free. void MarkPortAsFree(int port); + /// Try start all I/O workers waiting to be started. + /// \param language The language of the I/O worker. Currently only Python I/O + /// workers are effective. 
+ /// \param state The state including the number of I/O workers waiting to be + /// started. + void TryStartIOWorkers(const Language &language, State &state); + /// For Process class for managing subprocesses (e.g. reaping zombies). boost::asio::io_service *io_service_; /// The maximum number of worker processes that can be started concurrently. diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 1cf777821..2d799f257 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -119,7 +119,8 @@ class WorkerPoolTest : public ::testing::TestWithParam { ClientConnection::Create(client_handler, message_handler, std::move(socket), "worker", {}, error_message_type_); std::shared_ptr worker_ = std::make_shared( - WorkerID::FromRandom(), language, "127.0.0.1", client, client_call_manager_); + WorkerID::FromRandom(), language, rpc::WorkerType::WORKER, "127.0.0.1", client, + client_call_manager_); std::shared_ptr worker = std::dynamic_pointer_cast(worker_); worker->AssignJobId(job_id); @@ -155,7 +156,7 @@ class WorkerPoolTest : public ::testing::TestWithParam { static_cast(desired_initial_worker_process_count)); Process last_started_worker_process; for (int i = 0; i < desired_initial_worker_process_count; i++) { - worker_pool_->StartWorkerProcess(language, JOB_ID); + worker_pool_->StartWorkerProcess(language, rpc::WorkerType::WORKER, JOB_ID); ASSERT_TRUE(worker_pool_->NumWorkerProcessesStarting() <= expected_worker_process_count); Process prev = worker_pool_->LastStartedWorkerProcess(); @@ -233,7 +234,8 @@ TEST_P(WorkerPoolTest, CompareWorkerProcessObjects) { } TEST_P(WorkerPoolTest, HandleWorkerRegistration) { - Process proc = worker_pool_->StartWorkerProcess(Language::JAVA, JOB_ID); + Process proc = + worker_pool_->StartWorkerProcess(Language::JAVA, rpc::WorkerType::WORKER, JOB_ID); std::vector> workers; for (int i = 0; i < NUM_WORKERS_PER_PROCESS_JAVA; i++) { workers.push_back(CreateWorker(Process(), 
Language::JAVA)); @@ -361,7 +363,7 @@ TEST_P(WorkerPoolTest, StartWorkerWithDynamicOptionsCommand) { TaskSpecification task_spec = ExampleTaskSpec( ActorID::Nil(), Language::JAVA, JOB_ID, ActorID::Of(JOB_ID, TaskID::ForDriverTask(JOB_ID), 1), {"test_op_0", "test_op_1"}); - worker_pool_->StartWorkerProcess(Language::JAVA, JOB_ID, + worker_pool_->StartWorkerProcess(Language::JAVA, rpc::WorkerType::WORKER, JOB_ID, task_spec.DynamicWorkerOptions()); const auto real_command = worker_pool_->GetWorkerCommand(worker_pool_->LastStartedWorkerProcess()); diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index e9ff24a0c..545569280 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -80,9 +80,9 @@ raylet::RayletClient::RayletClient( raylet::RayletClient::RayletClient( boost::asio::io_service &io_service, std::shared_ptr grpc_client, - const std::string &raylet_socket, const WorkerID &worker_id, bool is_worker, - const JobID &job_id, const Language &language, const std::string &ip_address, - ClientID *raylet_id, int *port, + const std::string &raylet_socket, const WorkerID &worker_id, + rpc::WorkerType worker_type, const JobID &job_id, const Language &language, + const std::string &ip_address, ClientID *raylet_id, int *port, std::unordered_map *internal_config, const std::string &job_config) : grpc_client_(std::move(grpc_client)), @@ -94,9 +94,11 @@ raylet::RayletClient::RayletClient( new raylet::RayletConnection(io_service, raylet_socket, -1, -1)); flatbuffers::FlatBufferBuilder fbb; + // TODO(suquark): Use `WorkerType` in `common.proto` without converting to int. 
auto message = protocol::CreateRegisterClientRequest( - fbb, is_worker, to_flatbuf(fbb, worker_id), getpid(), to_flatbuf(fbb, job_id), - language, fbb.CreateString(ip_address), /*port=*/0, fbb.CreateString(job_config_)); + fbb, static_cast(worker_type), to_flatbuf(fbb, worker_id), getpid(), + to_flatbuf(fbb, job_id), language, fbb.CreateString(ip_address), /*port=*/0, + fbb.CreateString(job_config_)); fbb.Finish(message); // Register the process ID with the raylet. // NOTE(swang): If raylet exits and we are registered as a worker, we will get killed. @@ -308,6 +310,38 @@ void raylet::RayletClient::RequestWorkerLease( grpc_client_->RequestWorkerLease(request, callback); } +/// Spill objects to external storage. +/// \param object_ids The IDs of objects to be spilled. +Status raylet::RayletClient::ForceSpillObjects(const std::vector &object_ids) { + flatbuffers::FlatBufferBuilder fbb; + auto message = + protocol::CreateForceSpillObjectsRequest(fbb, to_flatbuf(fbb, object_ids)); + fbb.Finish(message); + std::vector reply; + RAY_RETURN_NOT_OK(conn_->AtomicRequestReply(MessageType::ForceSpillObjectsRequest, + MessageType::ForceSpillObjectsReply, &reply, + &fbb)); + RAY_UNUSED(flatbuffers::GetRoot(reply.data())); + return Status::OK(); +} + +/// Restore spilled objects from external storage. +/// \param object_ids The IDs of objects to be restored. 
+Status raylet::RayletClient::ForceRestoreSpilledObjects( + const std::vector &object_ids) { + flatbuffers::FlatBufferBuilder fbb; + auto message = + protocol::CreateForceRestoreSpilledObjectsRequest(fbb, to_flatbuf(fbb, object_ids)); + fbb.Finish(message); + std::vector reply; + RAY_RETURN_NOT_OK(conn_->AtomicRequestReply( + MessageType::ForceRestoreSpilledObjectsRequest, + MessageType::ForceRestoreSpilledObjectsReply, &reply, &fbb)); + RAY_UNUSED( + flatbuffers::GetRoot(reply.data())); + return Status::OK(); +} + Status raylet::RayletClient::ReturnWorker(int worker_port, const WorkerID &worker_id, bool disconnect_worker) { rpc::ReturnWorkerRequest request; diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h index 78421f43a..ddf80c6ea 100644 --- a/src/ray/raylet_client/raylet_client.h +++ b/src/ray/raylet_client/raylet_client.h @@ -23,6 +23,7 @@ #include "ray/common/status.h" #include "ray/common/task/task_spec.h" #include "ray/rpc/node_manager/node_manager_client.h" +#include "src/ray/protobuf/common.pb.h" #include "src/ray/protobuf/gcs.pb.h" using ray::ActorCheckpointID; @@ -162,7 +163,7 @@ class RayletClient : public PinObjectsInterface, /// \param grpc_client gRPC client to the raylet. /// \param raylet_socket The name of the socket to use to connect to the raylet. /// \param worker_id A unique ID to represent the worker. - /// \param is_worker Whether this client is a worker. If it is a worker, an + /// \param worker_type The type of the worker. If it is a certain worker type, an /// additional message will be sent to register as one. /// \param job_id The ID of the driver. This is non-nil if the client is a driver. /// \param language Language of the worker. 
@@ -175,7 +176,7 @@ class RayletClient : public PinObjectsInterface, RayletClient(boost::asio::io_service &io_service, std::shared_ptr grpc_client, const std::string &raylet_socket, const WorkerID &worker_id, - bool is_worker, const JobID &job_id, const Language &language, + rpc::WorkerType worker_type, const JobID &job_id, const Language &language, const std::string &ip_address, ClientID *raylet_id, int *port, std::unordered_map *internal_config, const std::string &job_config); @@ -318,6 +319,16 @@ class RayletClient : public PinObjectsInterface, ray::Status SetResource(const std::string &resource_name, const double capacity, const ray::ClientID &client_Id); + /// Spill objects to external storage. + /// \param object_ids The IDs of objects to be spilled. + /// \return ray::Status + ray::Status ForceSpillObjects(const std::vector &object_ids); + + /// Restore spilled objects from external storage. + /// \param object_ids The IDs of objects to be restored. + /// \return ray::Status + ray::Status ForceRestoreSpilledObjects(const std::vector &object_ids); + /// Implements WorkerLeaseInterface. 
void RequestWorkerLease( const ray::TaskSpecification &resource_spec, diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index f30046fa5..cfb68df93 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -175,6 +175,13 @@ class CoreWorkerClientInterface { const ClientCallback &callback) { } + virtual void SpillObjects(const SpillObjectsRequest &request, + const ClientCallback &callback) {} + + virtual void RestoreSpilledObjects( + const RestoreSpilledObjectsRequest &request, + const ClientCallback &callback) {} + virtual void PlasmaObjectReady(const PlasmaObjectReadyRequest &request, const ClientCallback &callback) { } @@ -231,6 +238,10 @@ class CoreWorkerClient : public std::enable_shared_from_this, VOID_RPC_CLIENT_METHOD(CoreWorkerService, WaitForRefRemoved, grpc_client_, override) + VOID_RPC_CLIENT_METHOD(CoreWorkerService, SpillObjects, grpc_client_, override) + + VOID_RPC_CLIENT_METHOD(CoreWorkerService, RestoreSpilledObjects, grpc_client_, override) + VOID_RPC_CLIENT_METHOD(CoreWorkerService, PlasmaObjectReady, grpc_client_, override) void PushActorTask(std::unique_ptr request, bool skip_queue, diff --git a/src/ray/rpc/worker/core_worker_server.h b/src/ray/rpc/worker/core_worker_server.h index cf0c0e519..4926e047f 100644 --- a/src/ray/rpc/worker/core_worker_server.h +++ b/src/ray/rpc/worker/core_worker_server.h @@ -41,6 +41,8 @@ namespace rpc { RPC_SERVICE_HANDLER(CoreWorkerService, RemoteCancelTask) \ RPC_SERVICE_HANDLER(CoreWorkerService, GetCoreWorkerStats) \ RPC_SERVICE_HANDLER(CoreWorkerService, LocalGC) \ + RPC_SERVICE_HANDLER(CoreWorkerService, SpillObjects) \ + RPC_SERVICE_HANDLER(CoreWorkerService, RestoreSpilledObjects) \ RPC_SERVICE_HANDLER(CoreWorkerService, PlasmaObjectReady) #define RAY_CORE_WORKER_DECLARE_RPC_HANDLERS \ @@ -58,6 +60,8 @@ namespace rpc { DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(RemoteCancelTask) \ 
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(GetCoreWorkerStats) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(LocalGC) \ + DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(SpillObjects) \ + DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(RestoreSpilledObjects) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PlasmaObjectReady) /// Interface of the `CoreWorkerServiceHandler`, see `src/ray/protobuf/core_worker.proto`.