[Core] Object spilling prototype (#9818)

This commit is contained in:
Siyuan (Ryans) Zhuang
2020-08-14 15:39:10 -07:00
committed by GitHub
parent 36e626e95d
commit 17ca1d8ff4
36 changed files with 1026 additions and 95 deletions
+5 -21
View File
@@ -78,27 +78,11 @@ from ray.profiling import profile # noqa: E402
from ray.state import (jobs, nodes, actors, objects, timeline,
object_transfer_timeline, cluster_resources,
available_resources) # noqa: E402
from ray.worker import (
LOCAL_MODE,
SCRIPT_MODE,
WORKER_MODE,
cancel,
connect,
disconnect,
get,
get_actor,
get_gpu_ids,
get_resource_ids,
get_webui_url,
init,
is_initialized,
put,
kill,
register_custom_serializer,
remote,
shutdown,
show_in_webui,
wait,
from ray.worker import ( # noqa: F401
LOCAL_MODE, SCRIPT_MODE, WORKER_MODE, IO_WORKER_MODE, cancel, connect,
disconnect, get, get_actor, get_gpu_ids, get_resource_ids, get_webui_url,
init, is_initialized, put, kill, register_custom_serializer, remote,
shutdown, show_in_webui, wait,
) # noqa: E402
import ray.internal # noqa: E402
import ray.projects # noqa: E402
+118 -7
View File
@@ -66,6 +66,7 @@ from ray.includes.common cimport (
TASK_TYPE_ACTOR_TASK,
WORKER_TYPE_WORKER,
WORKER_TYPE_DRIVER,
WORKER_TYPE_IO_WORKER,
PLACEMENT_STRATEGY_PACK,
PLACEMENT_STRATEGY_SPREAD,
)
@@ -90,6 +91,7 @@ from ray.includes.ray_config cimport RayConfig
from ray.includes.global_state_accessor cimport CGlobalStateAccessor
import ray
from ray import external_storage
from ray.async_compat import (
sync_to_async, get_new_event_loop)
import ray.memory_monitor as memory_monitor
@@ -590,6 +592,49 @@ cdef void gc_collect() nogil:
num_freed, end - start))
cdef c_vector[c_string] spill_objects_handler(
const c_vector[CObjectID]& object_ids_to_spill) nogil:
cdef c_vector[c_string] return_urls
with gil:
object_refs = VectorToObjectRefs(object_ids_to_spill)
try:
urls = external_storage.spill_objects(object_refs)
for url in urls:
return_urls.push_back(url)
except Exception:
exception_str = (
"An unexpected internal error occurred while the IO worker "
"was spilling objects.")
logger.exception(exception_str)
ray.utils.push_error_to_driver(
ray.worker.global_worker,
"io_worker_spill_objects_error",
traceback.format_exc() + exception_str,
job_id=None)
return return_urls
cdef void restore_spilled_objects_handler(
const c_vector[c_string]& object_urls) nogil:
with gil:
urls = []
size = object_urls.size()
for i in range(size):
urls.append(object_urls[i])
try:
external_storage.restore_spilled_objects(urls)
except Exception:
exception_str = (
"An unexpected internal error occurred while the IO worker "
"was restoring spilled objects.")
logger.exception(exception_str)
ray.utils.push_error_to_driver(
ray.worker.global_worker,
"io_worker_retore_spilled_objects_error",
traceback.format_exc() + exception_str,
job_id=None)
# This function introduces ~2-7us of overhead per call (i.e., it can be called
# up to hundreds of thousands of times per second).
cdef void get_py_stack(c_string* stack_out) nogil:
@@ -650,17 +695,25 @@ cdef void terminate_asyncio_thread() nogil:
cdef class CoreWorker:
def __cinit__(self, is_driver, store_socket, raylet_socket,
def __cinit__(self, worker_type, store_socket, raylet_socket,
JobID job_id, GcsClientOptions gcs_options, log_dir,
node_ip_address, node_manager_port, raylet_ip_address,
local_mode, driver_name, stdout_file, stderr_file,
serialized_job_config, metrics_agent_port):
self.is_driver = is_driver
self.is_local_mode = local_mode
cdef CCoreWorkerOptions options = CCoreWorkerOptions()
options.worker_type = (
WORKER_TYPE_DRIVER if is_driver else WORKER_TYPE_WORKER)
if worker_type in (ray.LOCAL_MODE, ray.SCRIPT_MODE):
self.is_driver = True
options.worker_type = WORKER_TYPE_DRIVER
elif worker_type == ray.WORKER_MODE:
self.is_driver = False
options.worker_type = WORKER_TYPE_WORKER
elif worker_type == ray.IO_WORKER_MODE:
self.is_driver = False
options.worker_type = WORKER_TYPE_IO_WORKER
else:
raise ValueError(f"Unknown worker type: {worker_type}")
options.language = LANGUAGE_PYTHON
options.store_socket = store_socket.encode("ascii")
options.raylet_socket = raylet_socket.encode("ascii")
@@ -678,6 +731,8 @@ cdef class CoreWorker:
options.task_execution_callback = task_execution_handler
options.check_signals = check_signals
options.gc_collect = gc_collect
options.spill_objects = spill_objects_handler
options.restore_spilled_objects = restore_spilled_objects_handler
options.get_lang_stack = get_py_stack
options.ref_counting_enabled = True
options.is_local_mode = local_mode
@@ -725,15 +780,15 @@ cdef class CoreWorker:
return self.plasma_event_handler
def get_objects(self, object_refs, TaskID current_task_id,
int64_t timeout_ms=-1):
int64_t timeout_ms=-1, plasma_objects_only=False):
cdef:
c_vector[shared_ptr[CRayObject]] results
CTaskID c_task_id = current_task_id.native()
c_vector[CObjectID] c_object_ids = ObjectRefsToVector(object_refs)
c_bool _plasma_objects_only = plasma_objects_only
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().Get(
c_object_ids, timeout_ms, &results))
c_object_ids, timeout_ms, &results, _plasma_objects_only))
return RayObjectsToDataMetadataPairs(results)
@@ -771,6 +826,48 @@ cdef class CoreWorker:
# and deal with it here.
return data.get() == NULL
def put_file_like_object(
self, metadata, data_size, file_like, ObjectRef object_ref=None):
"""Directly create a new Plasma Store object from a file like
object. This avoids extra memory copy.
Args:
metadata (bytes): The metadata of the object.
data_size (int): The size of the data buffer.
file_like: A python file object that provides the `readinto`
interface.
object_ref: The new ObjectRef.
"""
cdef:
CObjectID c_object_id
shared_ptr[CBuffer] data_buf
shared_ptr[CBuffer] metadata_buf
int64_t put_threshold
c_bool put_small_object_in_memory_store
c_vector[CObjectID] c_object_id_vector
# TODO(suquark): This method does not support put objects to
# in memory store currently.
metadata_buf = string_to_buffer(metadata)
object_already_exists = self._create_put_buffer(
metadata_buf, data_size, object_ref,
ObjectRefsToVector([]),
&c_object_id, &data_buf)
if object_already_exists:
logger.debug("Object already exists in 'put_file_like_object'.")
return
data = Buffer.make(data_buf)
view = memoryview(data)
index = 0
while index < data_size:
bytes_read = file_like.readinto(view[index:])
index += bytes_read
with nogil:
# Using custom object refs is not supported because we
# can't track their lifecycle, so we don't pin the object
# in this case.
check_status(CCoreWorkerProcess.GetCoreWorker().Seal(
c_object_id, pin_object=object_ref is None))
def put_serialized_object(self, serialized_object,
ObjectRef object_ref=None,
c_bool pin_object=True):
@@ -1342,6 +1439,20 @@ cdef class CoreWorker:
resource_name.encode("ascii"), capacity,
CClientID.FromBinary(client_id.binary()))
def force_spill_objects(self, object_refs):
cdef c_vector[CObjectID] object_ids
object_ids = ObjectRefsToVector(object_refs)
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker()
.ForceSpillObjects(object_ids))
def force_restore_spilled_objects(self, object_refs):
cdef c_vector[CObjectID] object_ids
object_ids = ObjectRefsToVector(object_refs)
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker()
.ForceRestoreSpilledObjects(object_ids))
cdef void async_set_result(shared_ptr[CRayObject] obj,
CObjectID object_ref,
void *future) with gil:
+3
View File
@@ -1,10 +1,13 @@
from .api import get, wait
from .dynamic_resources import set_resource
from .object_spilling import force_spill_objects, force_restore_spilled_objects
from .placement_group import (
placement_group, )
__all__ = [
"get",
"wait",
"set_resource",
"force_spill_objects",
"force_restore_spilled_objects",
"placement_group",
]
@@ -0,0 +1,35 @@
import ray
def force_spill_objects(object_refs):
"""Force spilling objects to external storage.
Args:
object_refs: Object refs of the objects to be
spilled.
"""
core_worker = ray.worker.global_worker.core_worker
# Make sure that the values are object refs.
for object_ref in object_refs:
if not isinstance(object_ref, ray.ObjectRef):
raise TypeError(
f"Attempting to call `force_spill_objects` on the "
f"value {object_ref}, which is not an ray.ObjectRef.")
return core_worker.force_spill_objects(object_refs)
def force_restore_spilled_objects(object_refs):
"""Force restoring objects from external storage.
Args:
object_refs: Object refs of the objects to be
restored.
"""
core_worker = ray.worker.global_worker.core_worker
# Make sure that the values are object refs.
for object_ref in object_refs:
if not isinstance(object_ref, ray.ObjectRef):
raise TypeError(
f"Attempting to call `force_restore_spilled_objects` on the "
f"value {object_ref}, which is not an ray.ObjectRef.")
return core_worker.force_restore_spilled_objects(object_refs)
+129
View File
@@ -0,0 +1,129 @@
import abc
import os
from typing import List
import ray
class ExternalStorage(metaclass=abc.ABCMeta):
"""The base class for external storage.
This class provides some useful functions for zero-copy object
put/get from plasma store. Also it specifies the interface for
object spilling.
"""
def _get_objects_from_store(self, object_refs):
worker = ray.worker.global_worker
ray_object_pairs = worker.core_worker.get_objects(
object_refs,
worker.current_task_id,
timeout_ms=0,
plasma_objects_only=True)
return ray_object_pairs
def _put_object_to_store(self, metadata, data_size, file_like, object_ref):
worker = ray.worker.global_worker
worker.core_worker.put_file_like_object(metadata, data_size, file_like,
object_ref)
@abc.abstractmethod
def spill_objects(self, object_refs):
"""Spill objects to the external storage. Objects are specified
by their object refs.
Args:
object_refs: The list of the refs of the objects to be spilled.
Returns:
A list of keys corresponding to the input object refs.
"""
@abc.abstractmethod
def restore_spilled_objects(self, keys: List[bytes]):
"""Spill objects to the external storage. Objects are specified
by their object refs.
Args:
keys: A list of bytes corresponding to the spilled objects.
"""
class NullStorage(ExternalStorage):
"""The class that represents an uninitialized external storage."""
def spill_objects(self, object_refs):
raise NotImplementedError("External storage is not initialized")
def restore_spilled_objects(self, keys):
raise NotImplementedError("External storage is not initialized")
class FileSystemStorage(ExternalStorage):
"""The class for filesystem-like external storage."""
def __init__(self, directory_path):
self.directory_path = directory_path
self.prefix = "ray_spilled_object_"
def spill_objects(self, object_refs):
keys = []
ray_object_pairs = self._get_objects_from_store(object_refs)
for ref, (buf, metadata) in zip(object_refs, ray_object_pairs):
filename = self.prefix + ref.hex()
with open(os.path.join(self.directory_path, filename), "wb") as f:
metadata_len = len(metadata)
buf_len = len(buf)
f.write(metadata_len.to_bytes(8, byteorder="little"))
f.write(buf_len.to_bytes(8, byteorder="little"))
f.write(metadata)
f.write(memoryview(buf))
keys.append(filename.encode())
return keys
def restore_spilled_objects(self, keys):
for k in keys:
filename = k.decode()
ref = ray.ObjectRef(bytes.fromhex(filename[len(self.prefix):]))
with open(os.path.join(self.directory_path, filename), "rb") as f:
metadata_len = int.from_bytes(f.read(8), byteorder="little")
buf_len = int.from_bytes(f.read(8), byteorder="little")
metadata = f.read(metadata_len)
# read remaining data to our buffer
self._put_object_to_store(metadata, buf_len, f, ref)
_external_storage = NullStorage()
def setup_external_storage(config):
"""Setup the external storage according to the config."""
global _external_storage
if config:
storage_type = config["type"]
if storage_type == "filesystem":
_external_storage = FileSystemStorage(**config["params"])
else:
raise ValueError(f"Unknown external storage type: {storage_type}")
else:
_external_storage = NullStorage()
def spill_objects(object_refs):
"""Spill objects to the external storage. Objects are specified
by their object refs.
Args:
object_refs: The list of the refs of the objects to be spilled.
Returns:
A list of keys corresponding to the input object refs.
"""
return _external_storage.spill_objects(object_refs)
def restore_spilled_objects(keys: List[bytes]):
"""Spill objects to the external storage. Objects are specified
by their object refs.
Args:
keys: A list of bytes corresponding to the spilled objects.
"""
_external_storage.restore_spilled_objects(keys)
+1
View File
@@ -162,6 +162,7 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil:
cdef extern from "src/ray/protobuf/common.pb.h" nogil:
cdef CWorkerType WORKER_TYPE_WORKER "ray::WorkerType::WORKER"
cdef CWorkerType WORKER_TYPE_DRIVER "ray::WorkerType::DRIVER"
cdef CWorkerType WORKER_TYPE_IO_WORKER "ray::WorkerType::IO_WORKER"
cdef extern from "src/ray/protobuf/common.pb.h" nogil:
cdef CTaskType TASK_TYPE_NORMAL_TASK "ray::TaskType::NORMAL_TASK"
+7 -1
View File
@@ -165,7 +165,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
shared_ptr[CBuffer] *data)
CRayStatus Seal(const CObjectID &object_id, c_bool pin_object)
CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms,
c_vector[shared_ptr[CRayObject]] *results)
c_vector[shared_ptr[CRayObject]] *results,
c_bool plasma_objects_only)
CRayStatus Contains(const CObjectID &object_id, c_bool *has_object)
CRayStatus Wait(const c_vector[CObjectID] &object_ids, int num_objects,
int64_t timeout_ms, c_vector[c_bool] *results)
@@ -192,6 +193,9 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CRayStatus SetResource(const c_string &resource_name,
const double capacity,
const CClientID &client_Id)
CRayStatus ForceSpillObjects(const c_vector[CObjectID] &object_ids)
CRayStatus ForceRestoreSpilledObjects(
const c_vector[CObjectID] &object_ids)
cdef cppclass CCoreWorkerOptions "ray::CoreWorkerOptions":
CWorkerType worker_type
@@ -220,6 +224,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
) task_execution_callback
(CRayStatus() nogil) check_signals
(void() nogil) gc_collect
(c_vector[c_string](const c_vector[CObjectID]&) nogil) spill_objects
(void(const c_vector[c_string]&) nogil) restore_spilled_objects
(void(c_string *stack_out) nogil) get_lang_stack
c_bool ref_counting_enabled
c_bool is_local_mode
+2 -1
View File
@@ -718,7 +718,8 @@ class Node:
socket_to_use=self.socket,
head_node=self.head,
start_initial_python_workers_for_first_job=self._ray_params.
start_initial_python_workers_for_first_job)
start_initial_python_workers_for_first_job,
object_spilling_config=self._ray_params.object_spilling_config)
assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info]
+3 -1
View File
@@ -145,7 +145,8 @@ class RayParams:
enable_object_reconstruction=False,
metrics_agent_port=None,
metrics_export_port=None,
lru_evict=False):
lru_evict=False,
object_spilling_config=None):
self.object_ref_seed = object_ref_seed
self.redis_address = redis_address
self.num_cpus = num_cpus
@@ -190,6 +191,7 @@ class RayParams:
self._internal_config = _internal_config
self._lru_evict = lru_evict
self._enable_object_reconstruction = enable_object_reconstruction
self.object_spilling_config = object_spilling_config
self._check_usage()
# Set the internal config options for LRU eviction.
+6 -1
View File
@@ -1289,7 +1289,8 @@ def start_raylet(redis_address,
fate_share=None,
socket_to_use=None,
head_node=False,
start_initial_python_workers_for_first_job=False):
start_initial_python_workers_for_first_job=False,
object_spilling_config=None):
"""Start a raylet, which is a combined local scheduler and object manager.
Args:
@@ -1398,6 +1399,10 @@ def start_raylet(redis_address,
if load_code_from_local:
start_worker_command += ["--load-code-from-local"]
if object_spilling_config:
start_worker_command.append(
f"--object-spilling-config={json.dumps(object_spilling_config)}")
command = [
RAYLET_EXECUTABLE,
"--raylet_socket_name={}".format(raylet_name),
+1
View File
@@ -48,6 +48,7 @@ py_test_module_list(
"test_stress_sharded.py",
"test_unreconstructable_errors.py",
"test_tensorflow.py",
"test_object_spilling.py",
],
size = "medium",
extra_srcs = SRCS,
+140
View File
@@ -0,0 +1,140 @@
import json
import random
import time
import numpy as np
import pytest
import ray
def test_spill_objects_manually(shutdown_only):
# Limit our object store to 75 MiB of memory.
ray.init(
object_store_memory=75 * 1024 * 1024,
object_spilling_config={
"type": "filesystem",
"params": {
"directory_path": "/tmp"
}
},
_internal_config=json.dumps({
"object_store_full_max_retries": 0,
"max_io_workers": 4,
}))
arr = np.random.rand(1024 * 1024) # 8 MB data
replay_buffer = []
pinned_objects = set()
spilled_objects = set()
# Create objects of more than 200 MiB.
for _ in range(25):
ref = None
while ref is None:
try:
ref = ray.put(arr)
replay_buffer.append(ref)
pinned_objects.add(ref)
except ray.exceptions.ObjectStoreFullError:
ref_to_spill = pinned_objects.pop()
ray.experimental.force_spill_objects([ref_to_spill])
spilled_objects.add(ref_to_spill)
# Spill 2 more objects so we will always have enough space for
# restoring objects back.
refs_to_spill = (pinned_objects.pop(), pinned_objects.pop())
ray.experimental.force_spill_objects(refs_to_spill)
spilled_objects.update(refs_to_spill)
# randomly sample objects
for _ in range(100):
ref = random.choice(replay_buffer)
if ref in spilled_objects:
ray.experimental.force_restore_spilled_objects([ref])
sample = ray.get(ref)
assert np.array_equal(sample, arr)
def test_spill_objects_manually_from_workers(shutdown_only):
# Limit our object store to 100 MiB of memory.
ray.init(
object_store_memory=100 * 1024 * 1024,
object_spilling_config={
"type": "filesystem",
"params": {
"directory_path": "/tmp"
}
},
_internal_config=json.dumps({
"object_store_full_max_retries": 0,
"max_io_workers": 4,
}))
@ray.remote
def _worker():
arr = np.random.rand(100 * 1024)
ref = ray.put(arr)
ray.experimental.force_spill_objects([ref])
ray.experimental.force_restore_spilled_objects([ref])
assert np.array_equal(ray.get(ref), arr)
ray.get([_worker.remote() for _ in range(50)])
def test_spill_objects_manually_with_workers(shutdown_only):
# Limit our object store to 75 MiB of memory.
ray.init(
object_store_memory=100 * 1024 * 1024,
object_spilling_config={
"type": "filesystem",
"params": {
"directory_path": "/tmp"
}
},
_internal_config=json.dumps({
"object_store_full_max_retries": 0,
"max_io_workers": 4,
}))
arrays = [np.random.rand(100 * 1024) for _ in range(50)]
objects = [ray.put(arr) for arr in arrays]
@ray.remote
def _worker(object_refs):
ray.experimental.force_spill_objects(object_refs)
ray.get([_worker.remote([o]) for o in objects])
for restored, arr in zip(ray.get(objects), arrays):
assert np.array_equal(restored, arr)
@pytest.mark.skip(reason="have not been fully implemented")
def test_spill_objects_automatically(shutdown_only):
# Limit our object store to 75 MiB of memory.
ray.init(
object_store_memory=75 * 1024 * 1024,
_internal_config=json.dumps({
"max_io_workers": 4,
"object_store_full_max_retries": 2,
"object_store_full_initial_delay_ms": 10,
"auto_object_spilling": True,
}))
arr = np.random.rand(1024 * 1024) # 8 MB data
replay_buffer = []
# Wait raylet for starting an IO worker.
time.sleep(1)
# Create objects of more than 800 MiB.
for _ in range(100):
ref = None
while ref is None:
ref = ray.put(arr)
replay_buffer.append(ref)
print("-----------------------------------")
# randomly sample objects
for _ in range(1000):
ref = random.choice(replay_buffer)
sample = ray.get(ref, timeout=0)
assert np.array_equal(sample, arr)
+10 -6
View File
@@ -53,6 +53,7 @@ from ray.utils import (_random_string, check_oversized_pickle, is_cython,
SCRIPT_MODE = 0
WORKER_MODE = 1
LOCAL_MODE = 2
IO_WORKER_MODE = 3
ERROR_KEY_PREFIX = b"Error:"
@@ -513,7 +514,8 @@ def init(address=None,
_internal_config=None,
lru_evict=False,
enable_object_reconstruction=False,
_metrics_export_port=None):
_metrics_export_port=None,
object_spilling_config=None):
"""
Connect to an existing Ray cluster or start one and connect to it.
@@ -642,6 +644,8 @@ def init(address=None,
_metrics_export_port(int): Port number Ray exposes system metrics
through a Prometheus endpoint. It is currently under active
development, and the API is subject to change.
object_spilling_config (str): The configuration json string for object
spilling I/O worker.
Returns:
Address information about the started processes.
@@ -737,7 +741,8 @@ def init(address=None,
_internal_config=_internal_config,
lru_evict=lru_evict,
enable_object_reconstruction=enable_object_reconstruction,
metrics_export_port=_metrics_export_port)
metrics_export_port=_metrics_export_port,
object_spilling_config=object_spilling_config)
# Start the Ray processes. We set shutdown_at_exit=False because we
# shutdown the node in the ray.shutdown call that happens in the atexit
# handler. We still spawn a reaper process in case the atexit handler
@@ -1206,7 +1211,7 @@ def connect(node,
worker.redis_client = node.create_redis_client()
# Initialize some fields.
if mode is WORKER_MODE:
if mode in (WORKER_MODE, IO_WORKER_MODE):
# We should not specify the job_id if it's `WORKER_MODE`.
assert job_id is None
job_id = JobID.nil()
@@ -1260,7 +1265,7 @@ def connect(node,
import __main__ as main
driver_name = (main.__file__
if hasattr(main, "__file__") else "INTERACTIVE MODE")
elif mode == WORKER_MODE:
elif mode == WORKER_MODE or mode == IO_WORKER_MODE:
# Check the RedirectOutput key in Redis and based on its value redirect
# worker output and error to their own files.
# This key is set in services.py when Redis is started.
@@ -1295,8 +1300,7 @@ def connect(node,
job_config = ray.job_config.JobConfig()
serialized_job_config = job_config.serialize()
worker.core_worker = ray._raylet.CoreWorker(
(mode == SCRIPT_MODE or mode == LOCAL_MODE),
node.plasma_store_socket_name, node.raylet_socket_name, job_id,
mode, node.plasma_store_socket_name, node.raylet_socket_name, job_id,
gcs_options, node.get_logs_dir_path(), node.node_ip_address,
node.node_manager_port, node.raylet_ip_address, (mode == LOCAL_MODE),
driver_name, log_stdout_file_path, log_stderr_file_path,
+42 -2
View File
@@ -1,5 +1,6 @@
import argparse
import json
import time
import ray
import ray.actor
@@ -80,17 +81,47 @@ parser.add_argument(
default=False,
action="store_true",
help="True if cloudpickle should be used for serialization.")
parser.add_argument(
"--worker-type",
required=False,
type=str,
default="WORKER",
help="Specify the type of the worker process")
parser.add_argument(
"--metrics-agent-port",
required=True,
type=int,
help="the port of the node's metric agent.")
parser.add_argument(
"--object-spilling-config",
required=False,
type=str,
default="",
help="The configuration of object spilling. Only used by I/O workers.")
if __name__ == "__main__":
args = parser.parse_args()
ray.utils.setup_logger(args.logging_level, args.logging_format)
if args.worker_type == "WORKER":
mode = ray.WORKER_MODE
elif args.worker_type == "IO_WORKER":
mode = ray.IO_WORKER_MODE
else:
raise ValueError("Unknown worker type: " + args.worker_type)
# NOTE(suquark): We must initialize the external storage before we
# connect to raylet. Otherwise we may receive requests before the
# external storage is intialized.
if mode == ray.IO_WORKER_MODE:
from ray import external_storage
if args.object_spilling_config:
object_spilling_config = json.loads(args.object_spilling_config)
else:
object_spilling_config = {}
external_storage.setup_external_storage(object_spilling_config)
internal_config = {}
if args.config_list is not None:
config_list = args.config_list.split(",")
@@ -125,5 +156,14 @@ if __name__ == "__main__":
spawn_reaper=False,
connect_only=True)
ray.worker._global_node = node
ray.worker.connect(node, mode=ray.WORKER_MODE)
ray.worker.global_worker.main_loop()
ray.worker.connect(node, mode=mode)
if mode == ray.WORKER_MODE:
ray.worker.global_worker.main_loop()
elif mode == ray.IO_WORKER_MODE:
# It is handled by another thread in the C++ core worker.
# We just need to keep the worker alive.
while True:
time.sleep(100000)
else:
raise ValueError(f"Unexcepted worker mode: {mode}")
+3
View File
@@ -343,3 +343,6 @@ RAY_CONFIG(bool, ownership_based_object_directory_enabled, false)
// The interval where metrics are exported in milliseconds.
RAY_CONFIG(uint64_t, metrics_report_interval_ms, 10000)
/// The maximum number of I/O worker that raylet starts.
RAY_CONFIG(int, max_io_workers, 1)
+3
View File
@@ -17,10 +17,13 @@
namespace ray {
std::string WorkerTypeString(WorkerType type) {
// TODO(suquark): Use proto3 utils to get the string.
if (type == WorkerType::DRIVER) {
return "driver";
} else if (type == WorkerType::WORKER) {
return "worker";
} else if (type == WorkerType::IO_WORKER) {
return "io_worker";
}
RAY_CHECK(false);
return "";
+70 -19
View File
@@ -305,9 +305,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
std::unordered_map<std::string, std::string> internal_config;
local_raylet_client_ = std::shared_ptr<raylet::RayletClient>(new raylet::RayletClient(
io_service_, std::move(grpc_client), options_.raylet_socket, GetWorkerID(),
(options_.worker_type == ray::WorkerType::WORKER),
worker_context_.GetCurrentJobID(), options_.language, options_.node_ip_address,
&local_raylet_id, &assigned_port, &internal_config,
options_.worker_type, worker_context_.GetCurrentJobID(), options_.language,
options_.node_ip_address, &local_raylet_id, &assigned_port, &internal_config,
options_.serialized_job_config));
connected_ = true;
@@ -326,6 +325,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
core_worker_server_->Run();
// Tell the raylet the port that we are listening on.
// NOTE: This also marks the worker as available in Raylet.
RAY_CHECK_OK(local_raylet_client_->AnnounceWorkerPort(core_worker_server_->GetPort()));
// Set our own address.
@@ -378,8 +378,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
options_.store_socket, local_raylet_client_, reference_counter_,
options_.check_signals,
/*evict_if_full=*/RayConfig::instance().object_pinning_enabled(),
boost::bind(&CoreWorker::TriggerGlobalGC, this),
boost::bind(&CoreWorker::CurrentCallSite, this)));
/*on_store_full=*/boost::bind(&CoreWorker::TriggerGlobalGC, this),
/*get_current_call_site=*/boost::bind(&CoreWorker::CurrentCallSite, this)));
memory_store_.reset(new CoreWorkerMemoryStore(
[this](const RayObject &object, const ObjectID &object_id) {
PutObjectIntoPlasma(object, object_id);
@@ -914,7 +914,8 @@ Status CoreWorker::Seal(const ObjectID &object_id, bool pin_object,
}
Status CoreWorker::Get(const std::vector<ObjectID> &ids, const int64_t timeout_ms,
std::vector<std::shared_ptr<RayObject>> *results) {
std::vector<std::shared_ptr<RayObject>> *results,
bool plasma_objects_only) {
results->resize(ids.size(), nullptr);
absl::flat_hash_set<ObjectID> plasma_object_ids;
@@ -924,20 +925,24 @@ Status CoreWorker::Get(const std::vector<ObjectID> &ids, const int64_t timeout_m
absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> result_map;
auto start_time = current_time_ms();
if (!memory_object_ids.empty()) {
RAY_RETURN_NOT_OK(memory_store_->Get(memory_object_ids, timeout_ms, worker_context_,
&result_map, &got_exception));
}
// Erase any objects that were promoted to plasma from the results. These get
// requests will be retried at the plasma store.
for (auto it = result_map.begin(); it != result_map.end();) {
auto current = it++;
if (current->second->IsInPlasmaError()) {
RAY_LOG(DEBUG) << current->first << " in plasma, doing fetch-and-get";
plasma_object_ids.insert(current->first);
result_map.erase(current);
if (!plasma_objects_only) {
if (!memory_object_ids.empty()) {
RAY_RETURN_NOT_OK(memory_store_->Get(memory_object_ids, timeout_ms, worker_context_,
&result_map, &got_exception));
}
// Erase any objects that were promoted to plasma from the results. These get
// requests will be retried at the plasma store.
for (auto it = result_map.begin(); it != result_map.end();) {
auto current = it++;
if (current->second->IsInPlasmaError()) {
RAY_LOG(DEBUG) << current->first << " in plasma, doing fetch-and-get";
plasma_object_ids.insert(current->first);
result_map.erase(current);
}
}
} else {
plasma_object_ids = std::move(memory_object_ids);
}
if (!got_exception) {
@@ -1169,6 +1174,14 @@ Status CoreWorker::SetResource(const std::string &resource_name, const double ca
return local_raylet_client_->SetResource(resource_name, capacity, client_id);
}
Status CoreWorker::ForceSpillObjects(const std::vector<ObjectID> &object_ids) {
return local_raylet_client_->ForceSpillObjects(object_ids);
}
Status CoreWorker::ForceRestoreSpilledObjects(const std::vector<ObjectID> &object_ids) {
return local_raylet_client_->ForceRestoreSpilledObjects(object_ids);
}
std::unordered_map<std::string, double> AddPlacementGroupConstraint(
const std::unordered_map<std::string, double> &resources,
PlacementGroupID placement_group_id, int64_t bundle_index) {
@@ -2103,6 +2116,44 @@ void CoreWorker::HandleLocalGC(const rpc::LocalGCRequest &request,
}
}
void CoreWorker::HandleSpillObjects(const rpc::SpillObjectsRequest &request,
rpc::SpillObjectsReply *reply,
rpc::SendReplyCallback send_reply_callback) {
if (options_.spill_objects != nullptr) {
std::vector<ObjectID> object_ids_to_spill;
object_ids_to_spill.reserve(request.object_ids_to_spill_size());
for (const auto &id_binary : request.object_ids_to_spill()) {
object_ids_to_spill.push_back(ObjectID::FromBinary(id_binary));
}
std::vector<std::string> object_urls = options_.spill_objects(object_ids_to_spill);
for (size_t i = 0; i < object_urls.size(); i++) {
reply->add_spilled_objects_url(std::move(object_urls[i]));
}
send_reply_callback(Status::OK(), nullptr, nullptr);
} else {
send_reply_callback(Status::NotImplemented("Spill objects callback not defined"),
nullptr, nullptr);
}
}
void CoreWorker::HandleRestoreSpilledObjects(
const rpc::RestoreSpilledObjectsRequest &request,
rpc::RestoreSpilledObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) {
if (options_.restore_spilled_objects != nullptr) {
std::vector<std::string> spilled_objects_url;
spilled_objects_url.reserve(request.spilled_objects_url_size());
for (const auto &url : request.spilled_objects_url()) {
spilled_objects_url.push_back(url);
}
options_.restore_spilled_objects(spilled_objects_url);
send_reply_callback(Status::OK(), nullptr, nullptr);
} else {
send_reply_callback(
Status::NotImplemented("Restore spilled objects callback not defined"), nullptr,
nullptr);
}
}
void CoreWorker::YieldCurrentFiber(FiberEvent &event) {
RAY_CHECK(worker_context_.CurrentActorIsAsync());
boost::this_fiber::yield();
+27 -1
View File
@@ -105,6 +105,10 @@ struct CoreWorkerOptions {
/// runtime. This is required to free distributed references that may otherwise
/// be held up in garbage objects.
std::function<void()> gc_collect;
/// Application-language callback to spill objects to external storage.
std::function<std::vector<std::string>(const std::vector<ObjectID> &)> spill_objects;
/// Application-language callback to restore objects from external storage.
std::function<void(const std::vector<std::string> &)> restore_spilled_objects;
/// Language worker callback to get the current call stack.
std::function<void(std::string *)> get_lang_stack;
// Function that tries to interrupt the currently running Python thread.
@@ -490,9 +494,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \param[in] ids IDs of the objects to get.
/// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative.
/// \param[out] results Result list of objects data.
/// \param[in] plasma_objects_only Only get objects from Plasma Store.
/// \return Status.
Status Get(const std::vector<ObjectID> &ids, const int64_t timeout_ms,
std::vector<std::shared_ptr<RayObject>> *results);
std::vector<std::shared_ptr<RayObject>> *results,
bool plasma_objects_only = false);
/// Return whether or not the object store contains the given object.
///
@@ -580,6 +586,16 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
Status SetResource(const std::string &resource_name, const double capacity,
const ClientID &client_id);
/// Force spilling objects to external storage.
/// \param[in] object_ids The objects to be spilled.
/// \return Status
Status ForceSpillObjects(const std::vector<ObjectID> &object_ids);
/// Restore objects from external storage.
/// \param[in] object_ids The objects to be restored.
/// \return Status
Status ForceRestoreSpilledObjects(const std::vector<ObjectID> &object_ids);
/// Submit a normal task.
///
/// \param[in] function The remote function to execute.
@@ -809,6 +825,16 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
void HandleLocalGC(const rpc::LocalGCRequest &request, rpc::LocalGCReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
// Spill objects to external storage.
void HandleSpillObjects(const rpc::SpillObjectsRequest &request,
rpc::SpillObjectsReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
// Restore objects from external storage.
void HandleRestoreSpilledObjects(const rpc::RestoreSpilledObjectsRequest &request,
rpc::RestoreSpilledObjectsReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
///
/// Public methods related to async actor call. This should only be used when
/// the actor is (1) direct actor and (2) using asyncio mode.
@@ -225,6 +225,8 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(
task_execution_callback, // task_execution_callback
nullptr, // check_signals
gc_collect, // gc_collect
nullptr, // spill_objects
nullptr, // restore_spilled_objects
nullptr, // get_lang_stack
nullptr, // kill_main
true, // ref_counting_enabled
@@ -158,6 +158,8 @@ class CoreWorkerTest : public ::testing::Test {
nullptr, // task_execution_callback
nullptr, // check_signals
nullptr, // gc_collect
nullptr, // spill_objects
nullptr, // restore_spilled_objects
nullptr, // get_lang_stack
nullptr, // kill_main
true, // ref_counting_enabled
+2
View File
@@ -54,6 +54,8 @@ class MockWorker {
_7), // task_execution_callback
nullptr, // check_signals
nullptr, // gc_collect
nullptr, // spill_objects
nullptr, // restore_spilled_objects
nullptr, // get_lang_stack
nullptr, // kill_main
true, // ref_counting_enabled
+1
View File
@@ -29,6 +29,7 @@ enum Language {
enum WorkerType {
WORKER = 0;
DRIVER = 1;
IO_WORKER = 2;
}
// Type of a task.
+23
View File
@@ -294,6 +294,24 @@ message PlasmaObjectReadyRequest {
message PlasmaObjectReadyReply {
}
message SpillObjectsRequest {
// The IDs of objects to be spilled.
repeated bytes object_ids_to_spill = 1;
}
message SpillObjectsReply {
// The URLs of spilled objects.
repeated string spilled_objects_url = 1;
}
message RestoreSpilledObjectsRequest {
// The URLs of spilled objects.
repeated string spilled_objects_url = 1;
}
message RestoreSpilledObjectsReply {
}
service CoreWorkerService {
// Push a task directly to this worker from another.
rpc PushTask(PushTaskRequest) returns (PushTaskReply);
@@ -332,6 +350,11 @@ service CoreWorkerService {
rpc WaitForRefRemoved(WaitForRefRemovedRequest) returns (WaitForRefRemovedReply);
// Trigger local GC on the worker.
rpc LocalGC(LocalGCRequest) returns (LocalGCReply);
// Spill objects to external storage. Caller: raylet; callee: I/O worker.
rpc SpillObjects(SpillObjectsRequest) returns (SpillObjectsReply);
// Restore spilled objects from external storage. Caller: raylet; callee: I/O worker.
rpc RestoreSpilledObjects(RestoreSpilledObjectsRequest)
returns (RestoreSpilledObjectsReply);
// Notification from raylet that an object ID is available in local plasma.
rpc PlasmaObjectReady(PlasmaObjectReadyRequest) returns (PlasmaObjectReadyReply);
}
+25 -2
View File
@@ -82,6 +82,12 @@ enum MessageType:int {
SetResourceRequest,
// Subscribe to Plasma updates
SubscribePlasmaReady,
// Manually spill objects to external storage.
ForceSpillObjectsRequest,
ForceSpillObjectsReply,
// Manually restore objects from external storage.
ForceRestoreSpilledObjectsRequest,
ForceRestoreSpilledObjectsReply,
}
table TaskExecutionSpecification {
@@ -126,8 +132,9 @@ table ResourceIdSetInfos {
// This struct is used to register a new worker with the raylet.
// It is shipped as part of raylet_connect.
table RegisterClientRequest {
// True if the client is a worker and false if the client is a driver.
is_worker: bool;
// Type of the worker.
// TODO(suquark): Use `WorkerType` in `common.proto`.
worker_type: int;
// The ID of the worker or driver.
worker_id: string;
// The process ID of this worker.
@@ -292,3 +299,19 @@ table SubscribePlasmaReady {
// ObjectID to wait for
object_id: string;
}
table ForceSpillObjectsRequest {
// List of object IDs to be spilled to external storage.
object_ids: [string];
}
table ForceSpillObjectsReply {
}
table ForceRestoreSpilledObjectsRequest {
// List of object IDs to be restored from external storage.
object_ids: [string];
}
table ForceRestoreSpilledObjectsReply {
}
+140 -5
View File
@@ -507,6 +507,89 @@ void NodeManager::DoLocalGC() {
}
}
void NodeManager::SpillObjects(const std::vector<ObjectID> &objects_ids_to_spill,
std::function<void(const ray::Status &)> callback) {
std::vector<ObjectID> objects_ids;
for (const auto &id : objects_ids_to_spill) {
// Do not spill already spilled objects.
if (spilled_objects_.count(id) == 0) {
objects_ids.push_back(id);
}
}
if (objects_ids.empty()) {
if (callback) {
callback(Status::OK());
}
return;
}
worker_pool_.PopIOWorker(
[this, objects_ids, callback](std::shared_ptr<WorkerInterface> io_worker) {
RAY_LOG(DEBUG) << "Sending object spilling request";
rpc::SpillObjectsRequest request;
for (const auto &object_id : objects_ids) {
request.add_object_ids_to_spill(object_id.Binary());
}
io_worker->rpc_client()->SpillObjects(
request, [this, objects_ids, callback, io_worker](
const ray::Status &status, const rpc::SpillObjectsReply &r) {
worker_pool_.PushIOWorker(io_worker);
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to send object spilling request: "
<< status.ToString();
} else {
RAY_CHECK(static_cast<size_t>(r.spilled_objects_url_size()) ==
objects_ids.size());
for (size_t i = 0; i < objects_ids.size(); ++i) {
const ObjectID &object_id = objects_ids[i];
const std::string &object_url = r.spilled_objects_url(i);
// TODO(suquark): write to object directory.
spilled_objects_[object_id] = object_url;
auto search = pinned_objects_.find(object_id);
if (search != pinned_objects_.end()) {
pinned_objects_.erase(search);
} else {
RAY_LOG(ERROR)
<< "The spilled object " << object_id.Hex() << " is not pinned.";
}
}
}
if (callback) {
callback(status);
}
});
});
}
void NodeManager::RestoreSpilledObjects(
const std::vector<ObjectID> &object_ids,
std::function<void(const ray::Status &)> callback) {
std::vector<std::string> object_urls;
object_urls.reserve(object_ids.size());
for (const auto &object_id : object_ids) {
object_urls.push_back(spilled_objects_[object_id]);
}
worker_pool_.PopIOWorker([this, object_urls,
callback](std::shared_ptr<WorkerInterface> io_worker) {
RAY_LOG(DEBUG) << "Sending restore spilled object request";
rpc::RestoreSpilledObjectsRequest request;
for (const auto &url : object_urls) {
request.add_spilled_objects_url(std::move(url));
}
io_worker->rpc_client()->RestoreSpilledObjects(
request, [this, callback, io_worker](const ray::Status &status,
const rpc::RestoreSpilledObjectsReply &r) {
worker_pool_.PushIOWorker(io_worker);
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to send restore spilled object request: "
<< status.ToString();
}
if (callback) {
callback(status);
}
});
});
}
// TODO(edoakes): this function is problematic because it both sends warnings spuriously
// under normal conditions and sometimes doesn't send a warning under actual deadlock
// conditions. The current logic is to push a warning when: all running tasks are
@@ -1144,7 +1227,41 @@ void NodeManager::ProcessClientMessage(const std::shared_ptr<ClientConnection> &
case protocol::MessageType::SubscribePlasmaReady: {
ProcessSubscribePlasmaReady(client, message_data);
} break;
case protocol::MessageType::ForceSpillObjectsRequest: {
auto message = flatbuffers::GetRoot<protocol::ForceSpillObjectsRequest>(message_data);
std::vector<ObjectID> object_ids = from_flatbuf<ObjectID>(*message->object_ids());
SpillObjects(object_ids, [this, client](const ray::Status &status) {
flatbuffers::FlatBufferBuilder fbb;
flatbuffers::Offset<protocol::ForceSpillObjectsReply> reply =
protocol::CreateForceSpillObjectsReply(fbb);
fbb.Finish(reply);
auto reply_status = client->WriteMessage(
static_cast<int64_t>(protocol::MessageType::ForceSpillObjectsReply),
fbb.GetSize(), fbb.GetBufferPointer());
if (!reply_status.ok()) {
// We failed to write to the client, so disconnect the client.
ProcessDisconnectClientMessage(client);
}
});
} break;
case protocol::MessageType::ForceRestoreSpilledObjectsRequest: {
auto message =
flatbuffers::GetRoot<protocol::ForceRestoreSpilledObjectsRequest>(message_data);
std::vector<ObjectID> object_ids = from_flatbuf<ObjectID>(*message->object_ids());
RestoreSpilledObjects(object_ids, [this, client](const ray::Status &status) {
flatbuffers::FlatBufferBuilder fbb;
flatbuffers::Offset<protocol::ForceRestoreSpilledObjectsReply> reply =
protocol::CreateForceRestoreSpilledObjectsReply(fbb);
fbb.Finish(reply);
auto reply_status = client->WriteMessage(
static_cast<int64_t>(protocol::MessageType::ForceRestoreSpilledObjectsReply),
fbb.GetSize(), fbb.GetBufferPointer());
if (!reply_status.ok()) {
// We failed to write to the client, so disconnect the client.
ProcessDisconnectClientMessage(client);
}
});
} break;
default:
RAY_LOG(FATAL) << "Received unexpected message type " << message_type;
}
@@ -1162,8 +1279,10 @@ void NodeManager::ProcessRegisterClientRequestMessage(
WorkerID worker_id = from_flatbuf<WorkerID>(*message->worker_id());
pid_t pid = message->worker_pid();
std::string worker_ip_address = string_from_flatbuf(*message->ip_address());
// TODO(suquark): Use `WorkerType` in `common.proto` without type converting.
rpc::WorkerType worker_type = static_cast<rpc::WorkerType>(message->worker_type());
auto worker = std::dynamic_pointer_cast<WorkerInterface>(std::make_shared<Worker>(
worker_id, language, worker_ip_address, client, client_call_manager_));
worker_id, language, worker_type, worker_ip_address, client, client_call_manager_));
auto send_reply_callback = [this, client](int assigned_port) {
flatbuffers::FlatBufferBuilder fbb;
@@ -1187,7 +1306,8 @@ void NodeManager::ProcessRegisterClientRequestMessage(
});
};
if (message->is_worker()) {
if (worker_type == rpc::WorkerType::WORKER ||
worker_type == rpc::WorkerType::IO_WORKER) {
// Register the new worker.
RAY_UNUSED(worker_pool_.RegisterWorker(worker, pid, send_reply_callback));
} else {
@@ -1238,6 +1358,13 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr<ClientConnection>
void NodeManager::HandleWorkerAvailable(const std::shared_ptr<WorkerInterface> &worker) {
RAY_CHECK(worker);
if (worker->GetWorkerType() == rpc::WorkerType::IO_WORKER) {
// Return the worker to the idle pool.
worker_pool_.PushIOWorker(worker);
return;
}
bool worker_idle = true;
// If the worker was assigned a task, mark it as finished.
@@ -1400,15 +1527,23 @@ void NodeManager::ProcessFetchOrReconstructMessage(
const auto refs =
FlatbufferToObjectReference(*message->object_ids(), *message->owner_addresses());
if (message->fetch_only()) {
std::vector<ObjectID> spilled_object_ids;
for (const auto &ref : refs) {
ObjectID object_id = ObjectID::FromBinary(ref.object_id());
// If only a fetch is required, then do not subscribe to the
// dependencies to the task dependency manager.
if (!task_dependency_manager_.CheckObjectLocal(object_id)) {
// Fetch the object if it's not already local.
RAY_CHECK_OK(object_manager_.Pull(object_id, ref.owner_address()));
if (spilled_objects_.count(object_id) > 0) {
spilled_object_ids.push_back(object_id);
} else {
// Fetch the object if it's not already local.
RAY_CHECK_OK(object_manager_.Pull(object_id, ref.owner_address()));
}
}
}
if (spilled_object_ids.size() > 0) {
RestoreSpilledObjects(spilled_object_ids);
}
} else {
// The values are needed. Add all requested objects to the list to
// subscribe to in the task dependency manager. These objects will be
+13 -1
View File
@@ -614,6 +614,16 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// Trigger local GC on each worker of this raylet.
void DoLocalGC();
/// Spill objects to external storage.
/// \param objects_ids_to_spill The objects to be spilled.
void SpillObjects(const std::vector<ObjectID> &objects_ids_to_spill,
std::function<void(const ray::Status &)> callback = nullptr);
/// Restore spilled objects from external storage.
/// \param object_ids Objects to be restored.
void RestoreSpilledObjects(const std::vector<ObjectID> &object_ids,
std::function<void(const ray::Status &)> callback = nullptr);
/// Push an error to the driver if this node is full of actors and so we are
/// unable to schedule new tasks or actors at all.
void WarnResourceDeadlock();
@@ -699,7 +709,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// A mapping from actor ID to registration information about that actor
/// (including which node manager owns it).
std::unordered_map<ActorID, ActorRegistration> actor_registry_;
/// A mapping from ObjectIDs to external object URLs for spilled objects.
/// TODO(suquark): Move it into object directory.
absl::flat_hash_map<ObjectID, std::string> spilled_objects_;
/// This map stores actor ID to the ID of the checkpoint that will be used to
/// restore the actor.
std::unordered_map<ActorID, ActorCheckpointID> checkpoint_id_to_restore_;
@@ -62,6 +62,8 @@ class MockWorker : public WorkerInterface {
WorkerID WorkerId() const { return worker_id_; }
rpc::WorkerType GetWorkerType() const { return rpc::WorkerType::WORKER; }
int Port() const { return port_; }
void SetOwnerAddress(const rpc::Address &address) { address_ = address; }
+4 -1
View File
@@ -27,11 +27,12 @@ namespace raylet {
/// A constructor responsible for initializing the state of a worker.
Worker::Worker(const WorkerID &worker_id, const Language &language,
const std::string &ip_address,
rpc::WorkerType worker_type, const std::string &ip_address,
std::shared_ptr<ClientConnection> connection,
rpc::ClientCallManager &client_call_manager)
: worker_id_(worker_id),
language_(language),
worker_type_(worker_type),
ip_address_(ip_address),
assigned_port_(-1),
port_(-1),
@@ -41,6 +42,8 @@ Worker::Worker(const WorkerID &worker_id, const Language &language,
client_call_manager_(client_call_manager),
is_detached_actor_(false) {}
rpc::WorkerType Worker::GetWorkerType() const { return worker_type_; }
void Worker::MarkDead() { dead_ = true; }
bool Worker::IsDead() const { return dead_; }
+5 -1
View File
@@ -37,6 +37,7 @@ class WorkerInterface {
public:
/// A destructor responsible for freeing all worker state.
virtual ~WorkerInterface() {}
virtual rpc::WorkerType GetWorkerType() const = 0;
virtual void MarkDead() = 0;
virtual bool IsDead() const = 0;
virtual void MarkBlocked() = 0;
@@ -117,11 +118,12 @@ class Worker : public WorkerInterface {
public:
/// A constructor that initializes a worker object.
/// NOTE: You MUST manually set the worker process.
Worker(const WorkerID &worker_id, const Language &language,
Worker(const WorkerID &worker_id, const Language &language, rpc::WorkerType worker_type,
const std::string &ip_address, std::shared_ptr<ClientConnection> connection,
rpc::ClientCallManager &client_call_manager);
/// A destructor responsible for freeing all worker state.
~Worker() {}
rpc::WorkerType GetWorkerType() const;
void MarkDead();
bool IsDead() const;
void MarkBlocked();
@@ -215,6 +217,8 @@ class Worker : public WorkerInterface {
Process proc_;
/// The language type of this worker.
Language language_;
/// The type of the worker.
rpc::WorkerType worker_type_;
/// IP address of this worker.
std::string ip_address_;
/// Port assigned to this worker by the raylet. If this is 0, the actual
+95 -12
View File
@@ -131,7 +131,7 @@ void WorkerPool::Start(int num_workers) {
int num_worker_processes = static_cast<int>(
std::ceil(static_cast<double>(num_workers) / state.num_workers_per_process));
for (int i = 0; i < num_worker_processes; i++) {
StartWorkerProcess(entry.first, JobID::Nil());
StartWorkerProcess(entry.first, ray::rpc::WorkerType::WORKER, JobID::Nil());
}
}
}
@@ -165,7 +165,9 @@ uint32_t WorkerPool::Size(const Language &language) const {
}
}
Process WorkerPool::StartWorkerProcess(const Language &language, const JobID &job_id,
Process WorkerPool::StartWorkerProcess(const Language &language,
const rpc::WorkerType worker_type,
const JobID &job_id,
std::vector<std::string> dynamic_options) {
rpc::JobConfig *job_config = nullptr;
if (RayConfig::instance().enable_multi_tenancy()) {
@@ -186,6 +188,8 @@ Process WorkerPool::StartWorkerProcess(const Language &language, const JobID &jo
for (auto &entry : state.starting_worker_processes) {
starting_workers += entry.second;
}
// Here we consider both task workers and I/O workers.
if (starting_workers >= maximum_startup_concurrency_) {
// Workers have been started, but not registered. Force start disabled -- returning.
RAY_LOG(DEBUG) << "Worker not started, " << starting_workers
@@ -271,6 +275,13 @@ Process WorkerPool::StartWorkerProcess(const Language &language, const JobID &jo
RAY_CHECK(worker_raylet_config_placeholder_found)
<< "The " << kWorkerRayletConfigPlaceholder
<< " placeholder is not found in worker command.";
} else if (language == Language::PYTHON) {
RAY_CHECK(worker_type == rpc::WorkerType::WORKER ||
worker_type == rpc::WorkerType::IO_WORKER);
if (worker_type == rpc::WorkerType::IO_WORKER) {
// Without "--worker-type", by default the worker type is rpc::WorkerType::WORKER.
worker_command_args.push_back("--worker-type=" + rpc::WorkerType_Name(worker_type));
}
}
std::map<std::string, std::string> env;
@@ -286,19 +297,23 @@ Process WorkerPool::StartWorkerProcess(const Language &language, const JobID &jo
}
RAY_LOG(DEBUG) << "Started worker process of " << workers_to_start
<< " worker(s) with pid " << proc.GetId();
MonitorStartingWorkerProcess(proc, language);
MonitorStartingWorkerProcess(proc, language, worker_type);
state.starting_worker_processes.emplace(proc, workers_to_start);
if (worker_type == rpc::WorkerType::IO_WORKER) {
state.num_starting_io_workers++;
}
return proc;
}
void WorkerPool::MonitorStartingWorkerProcess(const Process &proc,
const Language &language) {
const Language &language,
const rpc::WorkerType worker_type) {
auto timer = std::make_shared<boost::asio::deadline_timer>(
*io_service_, boost::posix_time::seconds(
RayConfig::instance().worker_register_timeout_seconds()));
// Capture timer in lambda to copy it once, so that it can avoid destructing timer.
timer->async_wait(
[timer, language, proc, this](const boost::system::error_code e) -> void {
[timer, language, proc, worker_type, this](const boost::system::error_code e) {
// check the error code.
auto &state = this->GetStateForLanguage(language);
// Since this process times out to start, remove it from starting_worker_processes
@@ -308,6 +323,12 @@ void WorkerPool::MonitorStartingWorkerProcess(const Process &proc,
RAY_LOG(INFO) << "Some workers of the worker process(" << proc.GetId()
<< ") have not registered to raylet within timeout.";
state.starting_worker_processes.erase(it);
if (worker_type == rpc::WorkerType::IO_WORKER) {
// Mark the I/O worker as failed.
state.num_starting_io_workers--;
}
// We may have places to start more workers now.
TryStartIOWorkers(language, state);
starting_worker_timeout_callback_();
}
});
@@ -397,16 +418,23 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr<WorkerInterface> &worker
status = Status::Invalid("Unknown worker");
} else {
RAY_RETURN_NOT_OK(GetNextFreePort(&port));
RAY_LOG(DEBUG) << "Registering worker with pid " << pid << ", port: " << port;
RAY_LOG(DEBUG) << "Registering worker with pid " << pid << ", port: " << port
<< ", worker_type: " << rpc::WorkerType_Name(worker->GetWorkerType());
worker->SetAssignedPort(port);
worker->SetProcess(it->first);
it->second--;
if (it->second == 0) {
state.starting_worker_processes.erase(it);
// We may have slots to start more workers now.
TryStartIOWorkers(worker->GetLanguage(), state);
}
RAY_CHECK(worker->GetProcess().GetId() == pid);
state.registered_workers.insert(worker);
if (worker->GetWorkerType() == rpc::WorkerType::IO_WORKER) {
state.registered_io_workers.insert(worker);
state.num_starting_io_workers--;
}
if (RayConfig::instance().enable_multi_tenancy()) {
auto dedicated_workers_it = state.worker_pids_to_assigned_jobs.find(pid);
@@ -468,7 +496,7 @@ Status WorkerPool::RegisterDriver(const std::shared_ptr<WorkerInterface> &driver
delay_callback = true;
// Start initial Python workers for the first job.
for (int i = 0; i < num_initial_python_workers_for_first_job_; i++) {
StartWorkerProcess(Language::PYTHON, job_id);
StartWorkerProcess(Language::PYTHON, rpc::WorkerType::WORKER, job_id);
}
}
}
@@ -509,12 +537,39 @@ std::shared_ptr<WorkerInterface> WorkerPool::GetRegisteredDriver(
return nullptr;
}
void WorkerPool::PushIOWorker(const std::shared_ptr<WorkerInterface> &worker) {
auto &state = GetStateForLanguage(worker->GetLanguage());
RAY_CHECK(worker->GetWorkerType() == rpc::WorkerType::IO_WORKER);
RAY_LOG(DEBUG) << "Pushing an IO worker to the worker pool.";
if (state.pending_io_tasks.empty()) {
state.idle_io_workers.push(worker);
} else {
auto callback = state.pending_io_tasks.front();
state.pending_io_tasks.pop();
callback(worker);
}
}
void WorkerPool::PopIOWorker(
std::function<void(std::shared_ptr<WorkerInterface>)> callback) {
auto &state = GetStateForLanguage(Language::PYTHON);
if (state.idle_io_workers.empty()) {
// We must fill the pending task first, because 'TryStartIOWorkers' will
// start I/O workers according to the number of pending tasks.
state.pending_io_tasks.push(callback);
TryStartIOWorkers(Language::PYTHON, state);
} else {
auto io_worker = state.idle_io_workers.front();
state.idle_io_workers.pop();
callback(io_worker);
}
}
void WorkerPool::PushWorker(const std::shared_ptr<WorkerInterface> &worker) {
// Since the worker is now idle, unset its assigned task ID.
RAY_CHECK(worker->GetAssignedTaskId().IsNil())
<< "Idle workers cannot have an assigned task ID";
auto &state = GetStateForLanguage(worker->GetLanguage());
auto it = state.dedicated_workers_to_tasks.find(worker->GetProcess());
if (it != state.dedicated_workers_to_tasks.end()) {
// The worker is used for the actor creation task with dynamic options.
@@ -553,8 +608,8 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
} else if (!HasPendingWorkerForTask(task_spec.GetLanguage(), task_spec.TaskId())) {
// We are not pending a registration from a worker for this task,
// so start a new worker process for this task.
proc = StartWorkerProcess(task_spec.GetLanguage(), task_spec.JobId(),
task_spec.DynamicWorkerOptions());
proc = StartWorkerProcess(task_spec.GetLanguage(), rpc::WorkerType::WORKER,
task_spec.JobId(), task_spec.DynamicWorkerOptions());
if (proc.IsValid()) {
state.dedicated_workers_to_tasks[proc] = task_spec.TaskId();
state.tasks_to_dedicated_workers[task_spec.TaskId()] = proc;
@@ -569,7 +624,8 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
} else {
// There are no more non-actor workers available to execute this task.
// Start a new worker process.
proc = StartWorkerProcess(task_spec.GetLanguage(), JobID::Nil());
proc = StartWorkerProcess(task_spec.GetLanguage(), rpc::WorkerType::WORKER,
JobID::Nil());
}
} else {
// Find an available worker which is already assigned to this job.
@@ -584,7 +640,8 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
if (worker == nullptr) {
// There are no more non-actor workers available to execute this task.
// Start a new worker process.
proc = StartWorkerProcess(task_spec.GetLanguage(), task_spec.JobId());
proc = StartWorkerProcess(task_spec.GetLanguage(), rpc::WorkerType::WORKER,
task_spec.JobId());
}
}
} else {
@@ -715,6 +772,32 @@ bool WorkerPool::HasPendingWorkerForTask(const Language &language,
return it != state.tasks_to_dedicated_workers.end();
}
void WorkerPool::TryStartIOWorkers(const Language &language, State &state) {
if (language != Language::PYTHON) {
return;
}
int available_io_workers_num =
state.num_starting_io_workers + state.registered_io_workers.size();
int max_workers_to_start =
RayConfig::instance().max_io_workers() - available_io_workers_num;
// Compare first to prevent unsigned underflow.
if (state.pending_io_tasks.size() > state.idle_io_workers.size()) {
int expected_workers_num =
state.pending_io_tasks.size() - state.idle_io_workers.size();
if (expected_workers_num > max_workers_to_start) {
expected_workers_num = max_workers_to_start;
}
for (; expected_workers_num > 0; expected_workers_num--) {
Process proc = StartWorkerProcess(ray::Language::PYTHON,
ray::rpc::WorkerType::IO_WORKER, JobID::Nil());
if (!proc.IsValid()) {
// We may hit the maximum worker start up concurrency limit. Stop.
return;
}
}
}
}
std::string WorkerPool::DebugString() const {
std::stringstream result;
result << "WorkerPool:";
+34 -2
View File
@@ -162,6 +162,19 @@ class WorkerPool : public WorkerPoolInterface {
/// \param The driver to disconnect. The driver must be registered.
void DisconnectDriver(const std::shared_ptr<WorkerInterface> &driver);
/// Add an idle I/O worker to the pool.
///
/// \param worker The idle I/O worker to add.
void PushIOWorker(const std::shared_ptr<WorkerInterface> &worker);
/// Pop an idle I/O worker from the pool and trigger a callback when
/// an I/O worker is available.
/// The caller is responsible for pushing the worker back onto the
/// pool once the worker has completed its work.
///
/// \param callback The callback that returns an available I/O worker.
void PopIOWorker(std::function<void(std::shared_ptr<WorkerInterface>)> callback);
/// Add an idle worker to the pool.
///
/// \param The idle worker to add.
@@ -229,11 +242,13 @@ class WorkerPool : public WorkerPoolInterface {
/// any workers.
///
/// \param language Which language this worker process should be.
/// \param worker_type The type of the worker.
/// \param job_id The ID of the job to which the started worker process belongs.
/// \param dynamic_options The dynamic options that we should add for worker command.
/// \return The id of the process that we started if it's positive,
/// otherwise it means we didn't start a process.
Process StartWorkerProcess(const Language &language, const JobID &job_id,
Process StartWorkerProcess(const Language &language, const rpc::WorkerType worker_type,
const JobID &job_id,
std::vector<std::string> dynamic_options = {});
/// The implementation of how to start a new worker process with command arguments.
@@ -263,6 +278,15 @@ class WorkerPool : public WorkerPoolInterface {
std::unordered_set<std::shared_ptr<WorkerInterface>> idle;
/// The pool of idle actor workers.
std::unordered_map<ActorID, std::shared_ptr<WorkerInterface>> idle_actor;
/// The pool of idle I/O workers.
std::queue<std::shared_ptr<WorkerInterface>> idle_io_workers;
/// The queue of pending I/O tasks.
std::queue<std::function<void(std::shared_ptr<WorkerInterface>)>> pending_io_tasks;
/// All I/O workers that have registered and are still connected, including both
/// idle and executing.
std::unordered_set<std::shared_ptr<WorkerInterface>> registered_io_workers;
/// Number of starting I/O workers.
int num_starting_io_workers = 0;
/// All workers that have registered and are still connected, including both
/// idle and executing.
std::unordered_set<std::shared_ptr<WorkerInterface>> registered_workers;
@@ -306,7 +330,8 @@ class WorkerPool : public WorkerPoolInterface {
/// (due to worker process crash or any other reasons), remove them
/// from `starting_worker_processes`. Otherwise if we'll mistakenly
/// think there are unregistered workers, and won't start new workers.
void MonitorStartingWorkerProcess(const Process &proc, const Language &language);
void MonitorStartingWorkerProcess(const Process &proc, const Language &language,
const rpc::WorkerType worker_type);
/// Get the next unallocated port in the free ports list. If a port range isn't
/// configured, returns 0.
@@ -320,6 +345,13 @@ class WorkerPool : public WorkerPoolInterface {
/// \param[in] port The port to mark as free.
void MarkPortAsFree(int port);
/// Try start all I/O workers waiting to be started.
/// \param language The language of the I/O worker. Currently only Python I/O
/// workers are effective.
/// \param state The state including the number of I/O workers waiting to be
/// started.
void TryStartIOWorkers(const Language &language, State &state);
/// For Process class for managing subprocesses (e.g. reaping zombies).
boost::asio::io_service *io_service_;
/// The maximum number of worker processes that can be started concurrently.
+6 -4
View File
@@ -119,7 +119,8 @@ class WorkerPoolTest : public ::testing::TestWithParam<bool> {
ClientConnection::Create(client_handler, message_handler, std::move(socket),
"worker", {}, error_message_type_);
std::shared_ptr<Worker> worker_ = std::make_shared<Worker>(
WorkerID::FromRandom(), language, "127.0.0.1", client, client_call_manager_);
WorkerID::FromRandom(), language, rpc::WorkerType::WORKER, "127.0.0.1", client,
client_call_manager_);
std::shared_ptr<WorkerInterface> worker =
std::dynamic_pointer_cast<WorkerInterface>(worker_);
worker->AssignJobId(job_id);
@@ -155,7 +156,7 @@ class WorkerPoolTest : public ::testing::TestWithParam<bool> {
static_cast<int>(desired_initial_worker_process_count));
Process last_started_worker_process;
for (int i = 0; i < desired_initial_worker_process_count; i++) {
worker_pool_->StartWorkerProcess(language, JOB_ID);
worker_pool_->StartWorkerProcess(language, rpc::WorkerType::WORKER, JOB_ID);
ASSERT_TRUE(worker_pool_->NumWorkerProcessesStarting() <=
expected_worker_process_count);
Process prev = worker_pool_->LastStartedWorkerProcess();
@@ -233,7 +234,8 @@ TEST_P(WorkerPoolTest, CompareWorkerProcessObjects) {
}
TEST_P(WorkerPoolTest, HandleWorkerRegistration) {
Process proc = worker_pool_->StartWorkerProcess(Language::JAVA, JOB_ID);
Process proc =
worker_pool_->StartWorkerProcess(Language::JAVA, rpc::WorkerType::WORKER, JOB_ID);
std::vector<std::shared_ptr<WorkerInterface>> workers;
for (int i = 0; i < NUM_WORKERS_PER_PROCESS_JAVA; i++) {
workers.push_back(CreateWorker(Process(), Language::JAVA));
@@ -361,7 +363,7 @@ TEST_P(WorkerPoolTest, StartWorkerWithDynamicOptionsCommand) {
TaskSpecification task_spec = ExampleTaskSpec(
ActorID::Nil(), Language::JAVA, JOB_ID,
ActorID::Of(JOB_ID, TaskID::ForDriverTask(JOB_ID), 1), {"test_op_0", "test_op_1"});
worker_pool_->StartWorkerProcess(Language::JAVA, JOB_ID,
worker_pool_->StartWorkerProcess(Language::JAVA, rpc::WorkerType::WORKER, JOB_ID,
task_spec.DynamicWorkerOptions());
const auto real_command =
worker_pool_->GetWorkerCommand(worker_pool_->LastStartedWorkerProcess());
+39 -5
View File
@@ -80,9 +80,9 @@ raylet::RayletClient::RayletClient(
raylet::RayletClient::RayletClient(
boost::asio::io_service &io_service,
std::shared_ptr<ray::rpc::NodeManagerWorkerClient> grpc_client,
const std::string &raylet_socket, const WorkerID &worker_id, bool is_worker,
const JobID &job_id, const Language &language, const std::string &ip_address,
ClientID *raylet_id, int *port,
const std::string &raylet_socket, const WorkerID &worker_id,
rpc::WorkerType worker_type, const JobID &job_id, const Language &language,
const std::string &ip_address, ClientID *raylet_id, int *port,
std::unordered_map<std::string, std::string> *internal_config,
const std::string &job_config)
: grpc_client_(std::move(grpc_client)),
@@ -94,9 +94,11 @@ raylet::RayletClient::RayletClient(
new raylet::RayletConnection(io_service, raylet_socket, -1, -1));
flatbuffers::FlatBufferBuilder fbb;
// TODO(suquark): Use `WorkerType` in `common.proto` without converting to int.
auto message = protocol::CreateRegisterClientRequest(
fbb, is_worker, to_flatbuf(fbb, worker_id), getpid(), to_flatbuf(fbb, job_id),
language, fbb.CreateString(ip_address), /*port=*/0, fbb.CreateString(job_config_));
fbb, static_cast<int>(worker_type), to_flatbuf(fbb, worker_id), getpid(),
to_flatbuf(fbb, job_id), language, fbb.CreateString(ip_address), /*port=*/0,
fbb.CreateString(job_config_));
fbb.Finish(message);
// Register the process ID with the raylet.
// NOTE(swang): If raylet exits and we are registered as a worker, we will get killed.
@@ -308,6 +310,38 @@ void raylet::RayletClient::RequestWorkerLease(
grpc_client_->RequestWorkerLease(request, callback);
}
/// Spill objects to external storage.
/// \param object_ids The IDs of objects to be spilled.
Status raylet::RayletClient::ForceSpillObjects(const std::vector<ObjectID> &object_ids) {
flatbuffers::FlatBufferBuilder fbb;
auto message =
protocol::CreateForceSpillObjectsRequest(fbb, to_flatbuf(fbb, object_ids));
fbb.Finish(message);
std::vector<uint8_t> reply;
RAY_RETURN_NOT_OK(conn_->AtomicRequestReply(MessageType::ForceSpillObjectsRequest,
MessageType::ForceSpillObjectsReply, &reply,
&fbb));
RAY_UNUSED(flatbuffers::GetRoot<protocol::ForceSpillObjectsReply>(reply.data()));
return Status::OK();
}
/// Restore spilled objects from external storage.
/// \param object_ids The IDs of objects to be restored.
Status raylet::RayletClient::ForceRestoreSpilledObjects(
const std::vector<ObjectID> &object_ids) {
flatbuffers::FlatBufferBuilder fbb;
auto message =
protocol::CreateForceRestoreSpilledObjectsRequest(fbb, to_flatbuf(fbb, object_ids));
fbb.Finish(message);
std::vector<uint8_t> reply;
RAY_RETURN_NOT_OK(conn_->AtomicRequestReply(
MessageType::ForceRestoreSpilledObjectsRequest,
MessageType::ForceRestoreSpilledObjectsReply, &reply, &fbb));
RAY_UNUSED(
flatbuffers::GetRoot<protocol::ForceRestoreSpilledObjectsReply>(reply.data()));
return Status::OK();
}
Status raylet::RayletClient::ReturnWorker(int worker_port, const WorkerID &worker_id,
bool disconnect_worker) {
rpc::ReturnWorkerRequest request;
+13 -2
View File
@@ -23,6 +23,7 @@
#include "ray/common/status.h"
#include "ray/common/task/task_spec.h"
#include "ray/rpc/node_manager/node_manager_client.h"
#include "src/ray/protobuf/common.pb.h"
#include "src/ray/protobuf/gcs.pb.h"
using ray::ActorCheckpointID;
@@ -162,7 +163,7 @@ class RayletClient : public PinObjectsInterface,
/// \param grpc_client gRPC client to the raylet.
/// \param raylet_socket The name of the socket to use to connect to the raylet.
/// \param worker_id A unique ID to represent the worker.
/// \param is_worker Whether this client is a worker. If it is a worker, an
/// \param worker_type The type of the worker. If it is a certain worker type, an
/// additional message will be sent to register as one.
/// \param job_id The ID of the driver. This is non-nil if the client is a driver.
/// \param language Language of the worker.
@@ -175,7 +176,7 @@ class RayletClient : public PinObjectsInterface,
RayletClient(boost::asio::io_service &io_service,
std::shared_ptr<ray::rpc::NodeManagerWorkerClient> grpc_client,
const std::string &raylet_socket, const WorkerID &worker_id,
bool is_worker, const JobID &job_id, const Language &language,
rpc::WorkerType worker_type, const JobID &job_id, const Language &language,
const std::string &ip_address, ClientID *raylet_id, int *port,
std::unordered_map<std::string, std::string> *internal_config,
const std::string &job_config);
@@ -318,6 +319,16 @@ class RayletClient : public PinObjectsInterface,
ray::Status SetResource(const std::string &resource_name, const double capacity,
const ray::ClientID &client_Id);
/// Spill objects to external storage.
/// \param object_ids The IDs of objects to be spilled.
/// \return ray::Status
ray::Status ForceSpillObjects(const std::vector<ObjectID> &object_ids);
/// Restore spilled objects from external storage.
/// \param object_ids The IDs of objects to be restored.
/// \return ray::Status
ray::Status ForceRestoreSpilledObjects(const std::vector<ObjectID> &object_ids);
/// Implements WorkerLeaseInterface.
void RequestWorkerLease(
const ray::TaskSpecification &resource_spec,
+11
View File
@@ -175,6 +175,13 @@ class CoreWorkerClientInterface {
const ClientCallback<WaitForRefRemovedReply> &callback) {
}
virtual void SpillObjects(const SpillObjectsRequest &request,
const ClientCallback<SpillObjectsReply> &callback) {}
virtual void RestoreSpilledObjects(
const RestoreSpilledObjectsRequest &request,
const ClientCallback<RestoreSpilledObjectsReply> &callback) {}
virtual void PlasmaObjectReady(const PlasmaObjectReadyRequest &request,
const ClientCallback<PlasmaObjectReadyReply> &callback) {
}
@@ -231,6 +238,10 @@ class CoreWorkerClient : public std::enable_shared_from_this<CoreWorkerClient>,
VOID_RPC_CLIENT_METHOD(CoreWorkerService, WaitForRefRemoved, grpc_client_, override)
VOID_RPC_CLIENT_METHOD(CoreWorkerService, SpillObjects, grpc_client_, override)
VOID_RPC_CLIENT_METHOD(CoreWorkerService, RestoreSpilledObjects, grpc_client_, override)
VOID_RPC_CLIENT_METHOD(CoreWorkerService, PlasmaObjectReady, grpc_client_, override)
void PushActorTask(std::unique_ptr<PushTaskRequest> request, bool skip_queue,
+4
View File
@@ -41,6 +41,8 @@ namespace rpc {
RPC_SERVICE_HANDLER(CoreWorkerService, RemoteCancelTask) \
RPC_SERVICE_HANDLER(CoreWorkerService, GetCoreWorkerStats) \
RPC_SERVICE_HANDLER(CoreWorkerService, LocalGC) \
RPC_SERVICE_HANDLER(CoreWorkerService, SpillObjects) \
RPC_SERVICE_HANDLER(CoreWorkerService, RestoreSpilledObjects) \
RPC_SERVICE_HANDLER(CoreWorkerService, PlasmaObjectReady)
#define RAY_CORE_WORKER_DECLARE_RPC_HANDLERS \
@@ -58,6 +60,8 @@ namespace rpc {
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(RemoteCancelTask) \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(GetCoreWorkerStats) \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(LocalGC) \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(SpillObjects) \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(RestoreSpilledObjects) \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PlasmaObjectReady)
/// Interface of the `CoreWorkerServiceHandler`, see `src/ray/protobuf/core_worker.proto`.