diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index ce21a88ec..abea24000 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -54,6 +54,7 @@ from ray.includes.common cimport ( CTaskType, CPlacementStrategy, CRayFunction, + CWorkerType, move, LANGUAGE_CPP, LANGUAGE_JAVA, @@ -631,6 +632,46 @@ cdef void restore_spilled_objects_handler( job_id=None) +cdef void delete_spilled_objects_handler( + const c_vector[c_string]& object_urls, + CWorkerType worker_type) nogil: + with gil: + urls = [] + size = object_urls.size() + for i in range(size): + urls.append(object_urls[i]) + try: + # Get proctitle. + if worker_type == WORKER_TYPE_SPILL_WORKER: + original_proctitle = ( + ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER_IDLE) + proctitle = ( + ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER_DELETE) + elif worker_type == WORKER_TYPE_RESTORE_WORKER: + original_proctitle = ( + ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER_IDLE) + proctitle = ( + ray_constants.WORKER_PROCESS_TYPE_RESTORE_WORKER_DELETE) + else: + assert False, ("This line shouldn't be reachable.") + + # Delete objects. + with ray.worker._changeproctitle( + proctitle, + original_proctitle): + external_storage.delete_spilled_objects(urls) + except Exception: + exception_str = ( + "An unexpected internal error occurred while the IO worker " + "was deleting spilled objects.") + logger.exception(exception_str) + ray.utils.push_error_to_driver( + ray.worker.global_worker, + "delete_spilled_objects_error", + traceback.format_exc() + exception_str, + job_id=None) + + # This function introduces ~2-7us of overhead per call (i.e., it can be called # up to hundreds of thousands of times per second). 
cdef void get_py_stack(c_string* stack_out) nogil: @@ -739,6 +780,7 @@ cdef class CoreWorker: options.gc_collect = gc_collect options.spill_objects = spill_objects_handler options.restore_spilled_objects = restore_spilled_objects_handler + options.delete_spilled_objects = delete_spilled_objects_handler options.get_lang_stack = get_py_stack options.ref_counting_enabled = True options.is_local_mode = local_mode @@ -1473,6 +1515,10 @@ cdef class CoreWorker: def force_spill_objects(self, object_refs): cdef c_vector[CObjectID] object_ids object_ids = ObjectRefsToVector(object_refs) + assert not RayConfig.instance().automatic_object_deletion_enabled(), ( + "Automatic object deletion is not supported for " + "force_spill_objects yet. Please set " + "automatic_object_deletion_enabled: False in Ray's system config.") with nogil: check_status(CCoreWorkerProcess.GetCoreWorker() .SpillObjects(object_ids)) diff --git a/python/ray/external_storage.py b/python/ray/external_storage.py index 8011f023c..726065277 100644 --- a/python/ray/external_storage.py +++ b/python/ray/external_storage.py @@ -165,6 +165,14 @@ class ExternalStorage(metaclass=abc.ABCMeta): url_with_offset_list: List of url_with_offset. """ + @abc.abstractmethod + def delete_spilled_objects(self, urls: List[str]): + """Delete objects that are spilled to the external storage. + + Args: + urls: URLs that store spilled object files. + """ + class NullStorage(ExternalStorage): """The class that represents an uninitialized external storage.""" @@ -175,6 +183,9 @@ class NullStorage(ExternalStorage): def restore_spilled_objects(self, object_refs, url_with_offset_list): raise NotImplementedError("External storage is not initialized") + def delete_spilled_objects(self, urls: List[str]): + raise NotImplementedError("External storage is not initialized") + class FileSystemStorage(ExternalStorage): """The class for filesystem-like external storage. 
@@ -221,6 +232,11 @@ class FileSystemStorage(ExternalStorage): # read remaining data to our buffer self._put_object_to_store(metadata, buf_len, f, object_ref) + def delete_spilled_objects(self, urls: List[str]): + for url in urls: + filename = parse_url_with_offset(url.decode()).base_url + os.remove(os.path.join(self.directory_path, filename)) + class ExternalStorageSmartOpenImpl(ExternalStorage): """The external storage class implemented by smart_open. @@ -303,6 +319,9 @@ class ExternalStorageSmartOpenImpl(ExternalStorage): # read remaining data to our buffer self._put_object_to_store(metadata, buf_len, f, object_ref) + def delete_spilled_objects(self, urls: List[str]): + pass + _external_storage = NullStorage() @@ -350,3 +369,12 @@ def restore_spilled_objects(object_refs: List[ObjectRef], """ _external_storage.restore_spilled_objects(object_refs, url_with_offset_list) + + +def delete_spilled_objects(urls: List[str]): + """Delete objects that are spilled to the external storage. + + Args: + urls: URLs that store spilled object files. 
+ """ + _external_storage.delete_spilled_objects(urls) diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 7276ce43f..c7647ad49 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -231,6 +231,9 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: (void( const c_vector[CObjectID] &, const c_vector[c_string] &) nogil) restore_spilled_objects + (void( + const c_vector[c_string]&, + CWorkerType) nogil) delete_spilled_objects (void(c_string *stack_out) nogil) get_lang_stack c_bool ref_counting_enabled c_bool is_local_mode diff --git a/python/ray/includes/ray_config.pxd b/python/ray/includes/ray_config.pxd index bf48176f8..1a1122136 100644 --- a/python/ray/includes/ray_config.pxd +++ b/python/ray/includes/ray_config.pxd @@ -66,3 +66,5 @@ cdef extern from "ray/common/ray_config.h" nogil: uint64_t metrics_report_interval_ms() const c_bool enable_timeline() const + + c_bool automatic_object_deletion_enabled() const diff --git a/python/ray/includes/ray_config.pxi b/python/ray/includes/ray_config.pxi index e50af1a3c..7db833cbc 100644 --- a/python/ray/includes/ray_config.pxi +++ b/python/ray/includes/ray_config.pxi @@ -115,3 +115,7 @@ cdef class Config: @staticmethod def enable_timeline(): return RayConfig.instance().enable_timeline() + + @staticmethod + def automatic_object_deletion_enabled(): + return RayConfig.instance().automatic_object_deletion_enabled() diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index a7e08112b..12e429ff5 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -187,6 +187,10 @@ WORKER_PROCESS_TYPE_SPILL_WORKER = ( f"ray::SPILL_{WORKER_PROCESS_TYPE_SPILL_WORKER_NAME}") WORKER_PROCESS_TYPE_RESTORE_WORKER = ( f"ray::RESTORE_{WORKER_PROCESS_TYPE_RESTORE_WORKER_NAME}") +WORKER_PROCESS_TYPE_SPILL_WORKER_DELETE = ( + f"ray::DELETE_{WORKER_PROCESS_TYPE_SPILL_WORKER_NAME}") +WORKER_PROCESS_TYPE_RESTORE_WORKER_DELETE = 
( + f"ray::DELETE_{WORKER_PROCESS_TYPE_RESTORE_WORKER_NAME}") LOG_MONITOR_MAX_OPEN_FILES = 200 diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py index 883a960ab..7a0a76c9d 100644 --- a/python/ray/tests/test_object_spilling.py +++ b/python/ray/tests/test_object_spilling.py @@ -1,5 +1,6 @@ import copy import json +import os import random import platform import sys @@ -11,6 +12,7 @@ import psutil import ray from ray.external_storage import (create_url_with_offset, parse_url_with_offset) +from ray.test_utils import wait_for_condition bucket_name = "object-spilling-test" spill_local_path = "/tmp/spill" @@ -57,6 +59,7 @@ def test_sample_benchmark(object_spilling_config, shutdown_only): "object_store_full_max_retries": 0, "max_io_workers": max_io_workers, "object_spilling_config": object_spilling_config, + "automatic_object_deletion_enabled": False, }) arr = np.random.rand(object_size) replay_buffer = [] @@ -134,6 +137,7 @@ def test_spill_objects_manually(object_spilling_config, shutdown_only): "max_io_workers": 4, "object_spilling_config": object_spilling_config, "min_spilling_size": 0, + "automatic_object_deletion_enabled": False, }) arr = np.random.rand(1024 * 1024) # 8 MB data replay_buffer = [] @@ -195,6 +199,7 @@ def test_spill_objects_manually_from_workers(object_spilling_config, "max_io_workers": 4, "object_spilling_config": object_spilling_config, "min_spilling_size": 0, + "automatic_object_deletion_enabled": False, }) @ray.remote @@ -226,6 +231,7 @@ def test_spill_objects_manually_with_workers(object_spilling_config, "max_io_workers": 4, "object_spilling_config": object_spilling_config, "min_spilling_size": 0, + "automatic_object_deletion_enabled": False, }) arrays = [np.random.rand(100 * 1024) for _ in range(50)] objects = [ray.put(arr) for arr in arrays] @@ -396,6 +402,246 @@ def test_spill_deadlock(object_spilling_config, shutdown_only): @pytest.mark.skipif( platform.system() == "Windows", reason="Failing on 
Windows.") +def test_delete_objects(tmp_path, shutdown_only): + # Limit our object store to 75 MiB of memory. + temp_folder = tmp_path / "spill" + temp_folder.mkdir() + ray.init( + object_store_memory=75 * 1024 * 1024, + _system_config={ + "max_io_workers": 4, + "automatic_object_spilling_enabled": True, + "object_store_full_max_retries": 4, + "object_store_full_initial_delay_ms": 100, + "object_spilling_config": json.dumps({ + "type": "filesystem", + "params": { + "directory_path": str(temp_folder) + } + }), + }) + arr = np.random.rand(1024 * 1024) # 8 MB data + replay_buffer = [] + + for _ in range(80): + ref = None + while ref is None: + ref = ray.put(arr) + replay_buffer.append(ref) + + print("-----------------------------------") + + def is_dir_empty(): + num_files = 0 + for path in temp_folder.iterdir(): + num_files += 1 + return num_files == 0 + + del replay_buffer + del ref + wait_for_condition(is_dir_empty) + + +@pytest.mark.skipif( + platform.system() == "Windows", reason="Failing on Windows.") +def test_delete_objects_delete_while_creating(tmp_path, shutdown_only): + # Limit our object store to 75 MiB of memory. + temp_folder = tmp_path / "spill" + temp_folder.mkdir() + ray.init( + object_store_memory=75 * 1024 * 1024, + _system_config={ + "max_io_workers": 4, + "automatic_object_spilling_enabled": True, + "object_store_full_max_retries": 4, + "object_store_full_initial_delay_ms": 100, + "object_spilling_config": json.dumps({ + "type": "filesystem", + "params": { + "directory_path": str(temp_folder) + } + }), + }) + arr = np.random.rand(1024 * 1024) # 8 MB data + replay_buffer = [] + + for _ in range(80): + ref = None + while ref is None: + ref = ray.put(arr) + replay_buffer.append(ref) + # Remove the replay buffer with 60% probability. + if random.randint(0, 9) < 6: + replay_buffer.pop() + + # Do random sampling. 
+ for _ in range(200): + ref = random.choice(replay_buffer) + sample = ray.get(ref, timeout=0) + assert np.array_equal(sample, arr) + + def is_dir_empty(): + num_files = 0 + for path in temp_folder.iterdir(): + num_files += 1 + return num_files == 0 + + # After all, make sure all objects are killed without race condition. + del replay_buffer + del ref + wait_for_condition(is_dir_empty, timeout=1000) + + +@pytest.mark.skipif( + platform.system() == "Windows", reason="Failing on Windows.") +def test_delete_objects_on_worker_failure(tmp_path, shutdown_only): + # Limit our object store to 75 MiB of memory. + temp_folder = tmp_path / "spill" + temp_folder.mkdir() + ray.init( + object_store_memory=75 * 1024 * 1024, + _system_config={ + "max_io_workers": 4, + "automatic_object_spilling_enabled": True, + "object_store_full_max_retries": 4, + "object_store_full_initial_delay_ms": 100, + "object_spilling_config": json.dumps({ + "type": "filesystem", + "params": { + "directory_path": str(temp_folder) + } + }), + "min_spilling_size": 0, + }) + + arr = np.random.rand(1024 * 1024) # 8 MB data + + @ray.remote + class Actor: + def __init__(self): + self.replay_buffer = [] + + def get_pid(self): + return os.getpid() + + def create_objects(self): + for _ in range(80): + ref = None + while ref is None: + ref = ray.put(arr) + self.replay_buffer.append(ref) + # Remove the replay buffer with 60% probability. + if random.randint(0, 9) < 6: + self.replay_buffer.pop() + + # Do random sampling. 
+ for _ in range(200): + ref = random.choice(self.replay_buffer) + sample = ray.get(ref, timeout=0) + assert np.array_equal(sample, arr) + + a = Actor.remote() + actor_pid = ray.get(a.get_pid.remote()) + ray.get(a.create_objects.remote()) + os.kill(actor_pid, 9) + + def wait_until_actor_dead(): + try: + ray.get(a.get_pid.remote()) + except ray.exceptions.RayActorError: + return True + return False + + wait_for_condition(wait_until_actor_dead) + + def is_dir_empty(): + num_files = 0 + for path in temp_folder.iterdir(): + num_files += 1 + return num_files == 0 + + # After all, make sure all objects are deleted upon worker failures. + wait_for_condition(is_dir_empty, timeout=1000) + + +@pytest.mark.skipif( + platform.system() == "Windows", reason="Failing on Windows.") +def test_delete_objects_multi_node(tmp_path, ray_start_cluster): + # Limit our object store to 75 MiB of memory. + temp_folder = tmp_path / "spill" + temp_folder.mkdir() + cluster = ray_start_cluster + # Head node. + cluster.add_node( + num_cpus=1, + object_store_memory=75 * 1024 * 1024, + _system_config={ + "max_io_workers": 2, + "automatic_object_spilling_enabled": True, + "object_store_full_max_retries": 4, + "object_store_full_initial_delay_ms": 100, + "object_spilling_config": json.dumps({ + "type": "filesystem", + "params": { + "directory_path": str(temp_folder) + } + }), + }) + # Add 2 worker nodes. + for _ in range(2): + cluster.add_node(num_cpus=1, object_store_memory=75 * 1024 * 1024) + ray.init(address=cluster.address) + + arr = np.random.rand(1024 * 1024) # 8 MB data + + @ray.remote(num_cpus=1) + class Actor: + def __init__(self): + self.replay_buffer = [] + + def ping(self): + return + + def create_objects(self): + for _ in range(80): + ref = None + while ref is None: + ref = ray.put(arr) + self.replay_buffer.append(ref) + # Remove the replay buffer with 60% probability. + if random.randint(0, 9) < 6: + self.replay_buffer.pop() + + # Do random sampling. 
+ for _ in range(200): + ref = random.choice(self.replay_buffer) + sample = ray.get(ref, timeout=0) + assert np.array_equal(sample, arr) + + actors = [Actor.remote() for _ in range(3)] + ray.get([actor.create_objects.remote() for actor in actors]) + + def wait_until_actor_dead(actor): + try: + ray.get(actor.ping.remote()) + except ray.exceptions.RayActorError: + return True + return False + + def is_dir_empty(): + num_files = 0 + for path in temp_folder.iterdir(): + num_files += 1 + return num_files == 0 + + # Kill actors to remove all references. + for actor in actors: + ray.kill(actor) + wait_for_condition(lambda: wait_until_actor_dead(actor)) + # The multi node deletion should work. + wait_for_condition(is_dir_empty) + + def test_fusion_objects(tmp_path, shutdown_only): # Limit our object store to 75 MiB of memory. temp_folder = tmp_path / "spill" diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index f74e8323e..11770be3a 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -351,3 +351,8 @@ RAY_CONFIG(int, max_io_workers, 1) /// The minimum object size that can be spilled by each spill operation. 100 MB by /// default. This value is not recommended to set beyond --object-store-memory. RAY_CONFIG(int64_t, min_spilling_size, 100 * 1024 * 1024) + +/// Whether to enable automatic object deletion when refs are gone out of scope. +/// When it is true, manual (force) spilling is not available. +/// TODO(sang): Fix it. 
+RAY_CONFIG(bool, automatic_object_deletion_enabled, true) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 109440a5c..96ffaf944 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2358,6 +2358,24 @@ void CoreWorker::HandleRestoreSpilledObjects( } } +void CoreWorker::HandleDeleteSpilledObjects( + const rpc::DeleteSpilledObjectsRequest &request, + rpc::DeleteSpilledObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) { + if (options_.delete_spilled_objects != nullptr) { + std::vector spilled_objects_url; + spilled_objects_url.reserve(request.spilled_objects_url_size()); + for (const auto &url : request.spilled_objects_url()) { + spilled_objects_url.push_back(url); + } + options_.delete_spilled_objects(spilled_objects_url, worker_context_.GetWorkerType()); + send_reply_callback(Status::OK(), nullptr, nullptr); + } else { + send_reply_callback( + Status::NotImplemented("Delete spilled objects callback not defined"), nullptr, + nullptr); + } +} + void CoreWorker::HandleExit(const rpc::ExitRequest &request, rpc::ExitReply *reply, rpc::SendReplyCallback send_reply_callback) { send_reply_callback(Status::OK(), nullptr, nullptr); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 057060ea8..e419adfd1 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -81,6 +81,7 @@ struct CoreWorkerOptions { gc_collect(nullptr), spill_objects(nullptr), restore_spilled_objects(nullptr), + delete_spilled_objects(nullptr), get_lang_stack(nullptr), kill_main(nullptr), ref_counting_enabled(false), @@ -140,6 +141,9 @@ struct CoreWorkerOptions { /// Application-language callback to restore objects from external storage. std::function &, const std::vector &)> restore_spilled_objects; + /// Application-language callback to delete objects from external storage. 
+ std::function &, rpc::WorkerType)> + delete_spilled_objects; /// Language worker callback to get the current call stack. std::function get_lang_stack; // Function that tries to interrupt the currently running Python thread. @@ -874,6 +878,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { rpc::RestoreSpilledObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) override; + // Delete objects from external storage. + void HandleDeleteSpilledObjects(const rpc::DeleteSpilledObjectsRequest &request, + rpc::DeleteSpilledObjectsReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + // Make the this worker exit. void HandleExit(const rpc::ExitRequest &request, rpc::ExitReply *reply, rpc::SendReplyCallback send_reply_callback) override; diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index b5463b2e8..c95a58106 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -317,6 +317,14 @@ message RestoreSpilledObjectsRequest { message RestoreSpilledObjectsReply { } +message DeleteSpilledObjectsRequest { + // The URLs of spilled objects. + repeated string spilled_objects_url = 1; +} + +message DeleteSpilledObjectsReply { +} + message ExitRequest { } @@ -366,6 +374,9 @@ service CoreWorkerService { // Restore spilled objects from external storage. Caller: raylet; callee: I/O worker. rpc RestoreSpilledObjects(RestoreSpilledObjectsRequest) returns (RestoreSpilledObjectsReply); + // Delete spilled objects from external storage. Caller: raylet; callee: I/O worker. + rpc DeleteSpilledObjects(DeleteSpilledObjectsRequest) + returns (DeleteSpilledObjectsReply); // Notification from raylet that an object ID is available in local plasma. rpc PlasmaObjectReady(PlasmaObjectReadyRequest) returns (PlasmaObjectReadyReply); // Request for a worker to exit. 
diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc index 0440276b2..8c9a1ec37 100644 --- a/src/ray/raylet/local_object_manager.cc +++ b/src/ray/raylet/local_object_manager.cc @@ -13,6 +13,8 @@ // limitations under the License. #include "ray/raylet/local_object_manager.h" +#include "ray/util/asio_util.h" +#include "ray/util/util.h" namespace ray { @@ -21,6 +23,7 @@ namespace raylet { void LocalObjectManager::PinObjects(const std::vector &object_ids, std::vector> &&objects) { absl::MutexLock lock(&mutex_); + RAY_CHECK(object_pinning_enabled_); for (size_t i = 0; i < object_ids.size(); i++) { const auto &object_id = object_ids[i]; auto &object = objects[i]; @@ -57,9 +60,17 @@ void LocalObjectManager::WaitForObjectFree(const rpc::Address &owner_address, } void LocalObjectManager::ReleaseFreedObject(const ObjectID &object_id) { - { + // object_pinning_enabled_ flag is off when the --lru-evict flag is on. + if (object_pinning_enabled_) { absl::MutexLock lock(&mutex_); RAY_LOG(DEBUG) << "Unpinning object " << object_id; + // The object should be in one of these states: pinned, spilling, or spilled. + RAY_CHECK((pinned_objects_.count(object_id) > 0) || + (spilled_objects_url_.count(object_id) > 0) || + (objects_pending_spill_.count(object_id) > 0)); + if (automatic_object_deletion_enabled_) { + spilled_object_pending_delete_.push(object_id); + } pinned_objects_.erase(object_id); } @@ -79,6 +90,10 @@ void LocalObjectManager::FlushFreeObjects() { on_objects_freed_(objects_to_free_); objects_to_free_.clear(); } + if (object_pinning_enabled_ && automatic_object_deletion_enabled_) { + // Deletion wouldn't work when the object pinning is not enabled. 
+ ProcessSpilledObjectsDeleteQueue(free_objects_batch_size_); + } last_free_objects_at_ms_ = current_time_ms(); } @@ -92,6 +107,7 @@ void LocalObjectManager::FlushFreeObjectsIfNeeded(int64_t now_ms) { int64_t LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill, int64_t min_bytes_to_spill) { RAY_CHECK(num_bytes_to_spill >= min_bytes_to_spill); + if (RayConfig::instance().object_spilling_config().empty() || !RayConfig::instance().automatic_object_spilling_enabled()) { return min_bytes_to_spill; @@ -180,8 +196,8 @@ void LocalObjectManager::SpillObjectsInternal( io_worker->rpc_client()->SpillObjects( request, [this, objects_to_spill, callback, io_worker]( const ray::Status &status, const rpc::SpillObjectsReply &r) { - io_worker_pool_.PushSpillWorker(io_worker); absl::MutexLock lock(&mutex_); + io_worker_pool_.PushSpillWorker(io_worker); if (!status.ok()) { for (const auto &object_id : objects_to_spill) { auto it = objects_pending_spill_.find(object_id); @@ -217,7 +233,8 @@ void LocalObjectManager::AddSpilledUrls( // be retrieved by other raylets. RAY_CHECK_OK(object_info_accessor_.AsyncAddSpilledUrl( object_id, object_url, - [this, object_id, callback, num_remaining, num_bytes_spilled](Status status) { + [this, object_id, object_url, callback, num_remaining, + num_bytes_spilled](Status status) { RAY_CHECK_OK(status); absl::MutexLock lock(&mutex_); // Unpin the object. @@ -227,6 +244,21 @@ void LocalObjectManager::AddSpilledUrls( *num_bytes_spilled += it->second->GetSize(); objects_pending_spill_.erase(it); + // Update the object_id -> url_ref_count to use it for deletion later. + // We need to track the references here because a single file can contain + // multiple objects, and we shouldn't delete the file until + // all the objects are gone out of scope. + // object_url is equivalent to url_with_offset. 
+ auto parsed_url = ParseURL(object_url); + const auto base_url_it = parsed_url->find("url"); + RAY_CHECK(base_url_it != parsed_url->end()); + if (!url_ref_count_.contains(base_url_it->second)) { + url_ref_count_[base_url_it->second] = 1; + } else { + url_ref_count_[base_url_it->second] += 1; + } + spilled_objects_url_.emplace(object_id, object_url); + (*num_remaining)--; if (*num_remaining == 0 && callback) { callback(status); @@ -265,6 +297,73 @@ void LocalObjectManager::AsyncRestoreSpilledObject( }); } +void LocalObjectManager::ProcessSpilledObjectsDeleteQueue(uint32_t max_batch_size) { + absl::MutexLock lock(&mutex_); + std::vector object_urls_to_delete; + + // Process up to batch size of objects to delete. + while (!spilled_object_pending_delete_.empty() && + object_urls_to_delete.size() < max_batch_size) { + auto &object_id = spilled_object_pending_delete_.front(); + // If the object is still spilling, do nothing. This will block other entries to be + // processed, but it should be fine because the spilling will be eventually done, and + // deleting objects is a low-priority task. + // This will instead enable simpler logic after this block. + if (objects_pending_spill_.contains(object_id)) { + break; + } + + // Object id is either spilled or not spilled at this point. + const auto spilled_objects_url_it = spilled_objects_url_.find(object_id); + if (spilled_objects_url_it != spilled_objects_url_.end()) { + // If the object was spilled, see if we can delete it. We should first check the ref + // count. + std::string &object_url = spilled_objects_url_it->second; + // Note that here, we need to parse the object url to obtain the base_url. 
+ auto parsed_url = ParseURL(object_url); + const auto base_url_it = parsed_url->find("url"); + RAY_CHECK(base_url_it != parsed_url->end()); + const auto &url_ref_count_it = url_ref_count_.find(base_url_it->second); + RAY_CHECK(url_ref_count_it != url_ref_count_.end()) + << "url_ref_count_ should exist when spilled_objects_url_ exists. Please " + "submit a Github issue if you see this error."; + url_ref_count_it->second -= 1; + + // If there's no more refs, delete the object. + if (url_ref_count_it->second == 0) { + url_ref_count_.erase(url_ref_count_it); + object_urls_to_delete.emplace_back(object_url); + } + spilled_objects_url_.erase(spilled_objects_url_it); + } + spilled_object_pending_delete_.pop(); + } + if (object_urls_to_delete.size() > 0) { + DeleteSpilledObjects(object_urls_to_delete); + } +} + +void LocalObjectManager::DeleteSpilledObjects(std::vector &urls_to_delete) { + io_worker_pool_.PopDeleteWorker( + [this, urls_to_delete](std::shared_ptr io_worker) { + RAY_LOG(DEBUG) << "Sending delete spilled object request. Length: " + << urls_to_delete.size(); + rpc::DeleteSpilledObjectsRequest request; + for (const auto &url : urls_to_delete) { + request.add_spilled_objects_url(url); + } + io_worker->rpc_client()->DeleteSpilledObjects( + request, [this, io_worker](const ray::Status &status, + const rpc::DeleteSpilledObjectsReply &reply) { + io_worker_pool_.PushDeleteWorker(io_worker); + if (!status.ok()) { + RAY_LOG(ERROR) << "Failed to send delete spilled object request: " + << status.ToString(); + } + }); + }); +} + }; // namespace raylet }; // namespace ray diff --git a/src/ray/raylet/local_object_manager.h b/src/ray/raylet/local_object_manager.h index f645062ae..b615af346 100644 --- a/src/ray/raylet/local_object_manager.h +++ b/src/ray/raylet/local_object_manager.h @@ -33,10 +33,12 @@ namespace raylet { /// have been freed, and objects that have been spilled. 
class LocalObjectManager { public: - LocalObjectManager(size_t free_objects_batch_size, int64_t free_objects_period_ms, + LocalObjectManager(boost::asio::io_service &io_context, size_t free_objects_batch_size, + int64_t free_objects_period_ms, IOWorkerPoolInterface &io_worker_pool, gcs::ObjectInfoAccessor &object_info_accessor, rpc::CoreWorkerClientPool &owner_client_pool, + bool object_pinning_enabled, bool automatic_object_deletion_enabled, std::function &)> on_objects_freed, SpaceReleasedCallback on_objects_spilled) : free_objects_period_ms_(free_objects_period_ms), @@ -44,6 +46,8 @@ class LocalObjectManager { io_worker_pool_(io_worker_pool), object_info_accessor_(object_info_accessor), owner_client_pool_(owner_client_pool), + object_pinning_enabled_(object_pinning_enabled), + automatic_object_deletion_enabled_(automatic_object_deletion_enabled), on_objects_freed_(on_objects_freed), on_objects_spilled_(on_objects_spilled), last_free_objects_at_ms_(current_time_ms()) {} @@ -102,6 +106,16 @@ class LocalObjectManager { /// Try to clear any objects that have been freed. void FlushFreeObjectsIfNeeded(int64_t now_ms); + /// Judge if objects are deletable from pending_delete_queue and delete them if + /// necessary. + /// TODO(sang): We currently only use 1 IO worker per call to this method because + /// deletion is a low-priority task. But we can potentially support more workers to be + /// used at once. + /// + /// \param max_batch_size Maximum number of objects that can be deleted by one + /// invocation. + void ProcessSpilledObjectsDeleteQueue(uint32_t max_batch_size); + private: /// Internal helper method for spilling objects. void SpillObjectsInternal(const std::vector &objects_ids, @@ -121,6 +135,11 @@ class LocalObjectManager { const rpc::SpillObjectsReply &worker_reply, std::function callback); + /// Delete spilled objects stored in given urls. + /// + /// \param urls_to_delete List of urls to delete from external storages. 
+ void DeleteSpilledObjects(std::vector &urls_to_delete); + /// The period between attempts to eagerly evict objects from plasma. const int64_t free_objects_period_ms_; @@ -137,6 +156,12 @@ class LocalObjectManager { /// this node. rpc::CoreWorkerClientPool &owner_client_pool_; + /// Whether to enable pinning for plasma objects. + bool object_pinning_enabled_; + + /// Whether to enable automatic deletion when refs are gone out of scope. + bool automatic_object_deletion_enabled_; + /// A callback to call when an object has been freed. std::function &)> on_objects_freed_; @@ -171,6 +196,24 @@ class LocalObjectManager { /// This class is accessed by both the raylet and plasma store threads. The /// mutex protects private members that relate to object spilling. mutable absl::Mutex mutex_; + + /// + /// Fields below are used to delete spilled objects. + /// + + /// A queue of object ids whose spilled copies need to be deleted. + /// We don't delete objects from external storage as soon as they go out of scope + /// because those objects could still be in the middle of spilling. + std::queue spilled_object_pending_delete_ GUARDED_BY(mutex_); + + /// Mapping from object id to url_with_offsets. We cannot reuse pinned_objects_ because + /// pinned_objects_ entries are deleted when spilling happens. + absl::flat_hash_map spilled_objects_url_ GUARDED_BY(mutex_); + + /// Base URL -> ref_count. It is used because there could be multiple objects + /// within a single spilled file. We need to ref count to avoid deleting the file + /// before all objects within that file are out of scope. 
+ absl::flat_hash_map url_ref_count_ GUARDED_BY(mutex_); }; }; // namespace raylet diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index dfac0ee85..a8e5ad5ca 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -225,6 +225,8 @@ int main(int argc, char *argv[]) { RayConfig::instance().fair_queueing_enabled(); node_manager_config.object_pinning_enabled = RayConfig::instance().object_pinning_enabled(); + node_manager_config.automatic_object_deletion_enabled = + RayConfig::instance().automatic_object_deletion_enabled(); node_manager_config.store_socket_name = store_socket_name; node_manager_config.temp_dir = temp_dir; node_manager_config.session_dir = session_dir; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 680393d49..9b77a6e38 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -157,9 +157,12 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self agent_manager_service_(io_service, *agent_manager_service_handler_), client_call_manager_(io_service), worker_rpc_pool_(client_call_manager_), - local_object_manager_(RayConfig::instance().free_objects_batch_size(), + local_object_manager_(io_service_, RayConfig::instance().free_objects_batch_size(), RayConfig::instance().free_objects_period_milliseconds(), worker_pool_, gcs_client_->Objects(), worker_rpc_pool_, + /* object_pinning_enabled */ config.object_pinning_enabled, + /* automatic_object_deletion_enabled */ + config.automatic_object_deletion_enabled, [this](const std::vector &object_ids) { object_manager_.FreeObjects(object_ids, /*local_only=*/false); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index c1ef24a08..ccc1fb1ae 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -91,6 +91,8 @@ struct NodeManagerConfig { bool fair_queueing_enabled; /// Whether to enable pinning for plasma objects. 
bool object_pinning_enabled; + /// Whether to enable automatic object deletion for object spilling. + bool automatic_object_deletion_enabled; /// The store socket name. std::string store_socket_name; /// The path to the ray temp dir. diff --git a/src/ray/raylet/test/local_object_manager_test.cc b/src/ray/raylet/test/local_object_manager_test.cc index 4311c613b..d6001dc94 100644 --- a/src/ray/raylet/test/local_object_manager_test.cc +++ b/src/ray/raylet/test/local_object_manager_test.cc @@ -85,7 +85,35 @@ class MockIOWorkerClient : public rpc::CoreWorkerClientInterface { callback(Status(), reply); } + void DeleteSpilledObjects( + const rpc::DeleteSpilledObjectsRequest &request, + const rpc::ClientCallback &callback) override { + rpc::DeleteSpilledObjectsReply reply; + delete_requests.push_back(request); + delete_callbacks.push_back(callback); + } + + /// Return the number of deleted urls. + int ReplyDeleteSpilledObjects(Status status = Status::OK()) { + if (delete_callbacks.size() == 0) { + return 0; + } + + auto callback = delete_callbacks.front(); + auto reply = rpc::DeleteSpilledObjectsReply(); + callback(status, reply); + + auto &request = delete_requests.front(); + int deleted_urls_size = request.spilled_objects_url_size(); + delete_callbacks.pop_front(); + delete_requests.pop_front(); + + return deleted_urls_size; + } + std::list> callbacks; + std::list> delete_callbacks; + std::list delete_requests; }; class MockIOWorker : public MockWorker { @@ -105,6 +133,8 @@ class MockIOWorkerPool : public IOWorkerPoolInterface { MOCK_METHOD1(PushRestoreWorker, void(const std::shared_ptr &worker)); + MOCK_METHOD1(PushDeleteWorker, void(const std::shared_ptr &worker)); + void PopSpillWorker( std::function)> callback) override { callback(io_worker); @@ -115,6 +145,11 @@ class MockIOWorkerPool : public IOWorkerPoolInterface { callback(io_worker); } + void PopDeleteWorker( + std::function)> callback) override { + callback(io_worker); + } + std::shared_ptr io_worker_client = 
std::make_shared(); std::shared_ptr io_worker = @@ -197,8 +232,10 @@ class LocalObjectManagerTest : public ::testing::Test { LocalObjectManagerTest() : owner_client(std::make_shared()), client_pool([&](const rpc::Address &addr) { return owner_client; }), - manager(free_objects_batch_size, + manager(io_service_, free_objects_batch_size, /*free_objects_period_ms=*/1000, worker_pool, object_table, client_pool, + /*object_pinning_enabled=*/true, + /*automatic_object_delete_enabled=*/true, [&](const std::vector &object_ids) { for (const auto &object_id : object_ids) { freed.insert(object_id); @@ -209,6 +246,12 @@ class LocalObjectManagerTest : public ::testing::Test { RayConfig::instance().initialize({{"object_spilling_config", "mock_config"}}); } + std::string BuildURL(const std::string url, int offset = 0, int num_objects = 1) { + return url + "?" + "num_objects=" + std::to_string(num_objects) + + "&offset=" + std::to_string(offset); + } + + boost::asio::io_service io_service_; size_t free_objects_batch_size = 3; std::shared_ptr owner_client; rpc::CoreWorkerClientPool client_pool; @@ -292,7 +335,7 @@ TEST_F(LocalObjectManagerTest, TestExplicitSpill) { EXPECT_CALL(worker_pool, PushSpillWorker(_)); std::vector urls; for (size_t i = 0; i < object_ids.size(); i++) { - urls.push_back("url" + std::to_string(i)); + urls.push_back(BuildURL("url" + std::to_string(i))); } ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); for (size_t i = 0; i < object_ids.size(); i++) { @@ -342,7 +385,7 @@ TEST_F(LocalObjectManagerTest, TestDuplicateSpill) { std::vector urls; for (size_t i = 0; i < object_ids.size(); i++) { - urls.push_back("url" + std::to_string(i)); + urls.push_back(BuildURL("url" + std::to_string(i))); } EXPECT_CALL(worker_pool, PushSpillWorker(_)); ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); @@ -394,7 +437,7 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) { // global object directory. 
std::vector urls; for (size_t i = 0; i < object_ids.size() / 2 + 1; i++) { - urls.push_back("url" + std::to_string(i)); + urls.push_back(BuildURL("url" + std::to_string(i))); } EXPECT_CALL(worker_pool, PushSpillWorker(_)); // Objects should get freed even though we didn't wait for the owner's notice @@ -450,7 +493,7 @@ TEST_F(LocalObjectManagerTest, TestSpillError) { ASSERT_TRUE(status.ok()); num_times_fired++; }); - std::string url = "url"; + std::string url = BuildURL("url"); EXPECT_CALL(worker_pool, PushSpillWorker(_)); ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({url})); ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); @@ -460,6 +503,250 @@ TEST_F(LocalObjectManagerTest, TestSpillError) { ASSERT_TRUE(num_callbacks_fired > 0); } +TEST_F(LocalObjectManagerTest, TestDeleteNoSpilledObjects) { + // Make sure the delete queue won't delete any object when there are no spilled objects. + rpc::Address owner_address; + owner_address.set_worker_id(WorkerID::FromRandom().Binary()); + std::vector object_ids; + std::vector> objects; + + // Make sure when there is no spilled object, nothing is deleted. 
+ for (size_t i = 0; i < free_objects_batch_size; i++) { + ObjectID object_id = ObjectID::FromRandom(); + object_ids.push_back(object_id); + auto data_buffer = std::make_shared(0, object_id, unpins); + std::unique_ptr object( + new RayObject(data_buffer, nullptr, std::vector())); + objects.push_back(std::move(object)); + } + manager.PinObjects(object_ids, std::move(objects)); + manager.WaitForObjectFree(owner_address, object_ids); + + for (size_t i = 0; i < free_objects_batch_size; i++) { + ASSERT_TRUE(freed.empty()); + ASSERT_TRUE(owner_client->ReplyObjectEviction()); + } + + manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30); + int deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects(); + ASSERT_EQ(deleted_urls_size, 0); +} + +TEST_F(LocalObjectManagerTest, TestDeleteSpilledObjects) { + // Make sure spilled objects are deleted when the delete queue is processed. + rpc::Address owner_address; + owner_address.set_worker_id(WorkerID::FromRandom().Binary()); + std::vector object_ids; + std::vector> objects; + + for (size_t i = 0; i < free_objects_batch_size; i++) { + ObjectID object_id = ObjectID::FromRandom(); + object_ids.push_back(object_id); + auto data_buffer = std::make_shared(0, object_id, unpins); + std::unique_ptr object( + new RayObject(data_buffer, nullptr, std::vector())); + objects.push_back(std::move(object)); + } + manager.PinObjects(object_ids, std::move(objects)); + manager.WaitForObjectFree(owner_address, object_ids); + + // 2 Objects are spilled out of 3. 
+ std::vector object_ids_to_spill; + int spilled_urls_size = free_objects_batch_size - 1; + for (int i = 0; i < spilled_urls_size; i++) { + object_ids_to_spill.push_back(object_ids[i]); + } + manager.SpillObjects(object_ids_to_spill, + [&](const Status &status) mutable { ASSERT_TRUE(status.ok()); }); + std::vector urls; + for (size_t i = 0; i < object_ids_to_spill.size(); i++) { + urls.push_back(BuildURL("url" + std::to_string(i))); + } + ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); + for (size_t i = 0; i < object_ids_to_spill.size(); i++) { + ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + } + + // All objects are out of scope now. + for (size_t i = 0; i < free_objects_batch_size; i++) { + ASSERT_TRUE(owner_client->ReplyObjectEviction()); + } + + // // Make sure all spilled objects are deleted. + manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30); + int deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects(); + ASSERT_EQ(deleted_urls_size, object_ids_to_spill.size()); +} + +TEST_F(LocalObjectManagerTest, TestDeleteURLRefCount) { + // Make sure an url is deleted only when every object stored in that url is deleted + // (when ref_count == 0). + rpc::Address owner_address; + owner_address.set_worker_id(WorkerID::FromRandom().Binary()); + std::vector object_ids; + std::vector> objects; + + // Objects are pinned. + for (size_t i = 0; i < free_objects_batch_size; i++) { + ObjectID object_id = ObjectID::FromRandom(); + object_ids.push_back(object_id); + auto data_buffer = std::make_shared(0, object_id, unpins); + std::unique_ptr object( + new RayObject(data_buffer, nullptr, std::vector())); + objects.push_back(std::move(object)); + } + manager.PinObjects(object_ids, std::move(objects)); + manager.WaitForObjectFree(owner_address, object_ids); + + // Every object is spilled. 
+ std::vector object_ids_to_spill; + int spilled_urls_size = free_objects_batch_size; + for (int i = 0; i < spilled_urls_size; i++) { + object_ids_to_spill.push_back(object_ids[i]); + } + manager.SpillObjects(object_ids_to_spill, + [&](const Status &status) mutable { ASSERT_TRUE(status.ok()); }); + std::vector urls; + // Note every object has the same url. It means all objects are fused. + for (size_t i = 0; i < object_ids_to_spill.size(); i++) { + // Simulate the situation where there's a single file that contains multiple objects. + urls.push_back(BuildURL("unified_url", + /*offset=*/i, + /*num_objects*/ object_ids_to_spill.size())); + } + ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); + for (size_t i = 0; i < object_ids_to_spill.size(); i++) { + ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + } + + // Everything is evicted except the last object. In this case, ref count is still > 0. + for (size_t i = 0; i < free_objects_batch_size - 1; i++) { + ASSERT_TRUE(owner_client->ReplyObjectEviction()); + } + manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30); + int deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects(); + // Nothing is deleted yet because the ref count is > 0. + ASSERT_EQ(deleted_urls_size, 0); + + // The last reference is deleted. + ASSERT_TRUE(owner_client->ReplyObjectEviction()); + manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30); + deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects(); + // Now the object is deleted. + ASSERT_EQ(deleted_urls_size, 1); +} + +TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) { + // Make sure the object delete queue is blocked when there are spilling objects. + rpc::Address owner_address; + owner_address.set_worker_id(WorkerID::FromRandom().Binary()); + std::vector object_ids; + std::vector> objects; + + // Objects are pinned. 
+ for (size_t i = 0; i < free_objects_batch_size; i++) { + ObjectID object_id = ObjectID::FromRandom(); + object_ids.push_back(object_id); + auto data_buffer = std::make_shared(0, object_id, unpins); + std::unique_ptr object( + new RayObject(data_buffer, nullptr, std::vector())); + objects.push_back(std::move(object)); + } + manager.PinObjects(object_ids, std::move(objects)); + manager.WaitForObjectFree(owner_address, object_ids); + + // Objects are spilled. + std::vector object_ids_to_spill; + int spilled_urls_size = free_objects_batch_size; + for (int i = 0; i < spilled_urls_size; i++) { + object_ids_to_spill.push_back(object_ids[i]); + } + manager.SpillObjects(object_ids_to_spill, + [&](const Status &status) mutable { ASSERT_TRUE(status.ok()); }); + + std::vector urls; + // Only 1 object's spilling is done. Everything else is still spilling. + for (size_t i = 0; i < object_ids_to_spill.size(); i++) { + urls.push_back(BuildURL("url" + std::to_string(i))); + } + ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); + for (size_t i = 0; i < 1; i++) { + ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + } + // Every object has gone out of scope. + for (size_t i = 0; i < free_objects_batch_size; i++) { + ASSERT_TRUE(owner_client->ReplyObjectEviction()); + } + // // Now, deletion queue would process only the first object. Everything else won't be + // deleted although it is out of scope because they are still spilling. + manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30); + int deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects(); + // Only the first entry that is already spilled will be deleted. + ASSERT_EQ(deleted_urls_size, 1); + + // Now spilling is completely done. 
+ std::vector new_urls; + for (size_t i = 1; i < object_ids_to_spill.size(); i++) { + new_urls.push_back(BuildURL("url" + std::to_string(i))); + } + for (size_t i = 1; i < object_ids_to_spill.size(); i++) { + ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + } + + // Every object is now deleted. + manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30); + deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects(); + ASSERT_EQ(deleted_urls_size, object_ids_to_spill.size() - 1); +} + +TEST_F(LocalObjectManagerTest, TestDeleteMaxObjects) { + // Make sure the deletion queue can only process up to X entries. + rpc::Address owner_address; + owner_address.set_worker_id(WorkerID::FromRandom().Binary()); + std::vector object_ids; + std::vector> objects; + + // Pin one more object than free_objects_batch_size. + for (size_t i = 0; i < free_objects_batch_size + 1; i++) { + ObjectID object_id = ObjectID::FromRandom(); + object_ids.push_back(object_id); + auto data_buffer = std::make_shared(0, object_id, unpins); + std::unique_ptr object( + new RayObject(data_buffer, nullptr, std::vector())); + objects.push_back(std::move(object)); + } + manager.PinObjects(object_ids, std::move(objects)); + manager.WaitForObjectFree(owner_address, object_ids); + + std::vector object_ids_to_spill; + int spilled_urls_size = free_objects_batch_size; + for (int i = 0; i < spilled_urls_size; i++) { + object_ids_to_spill.push_back(object_ids[i]); + } + + // All the entries are spilled.
+ manager.SpillObjects(object_ids_to_spill, + [&](const Status &status) mutable { ASSERT_TRUE(status.ok()); }); + std::vector urls; + for (size_t i = 0; i < object_ids_to_spill.size(); i++) { + urls.push_back(BuildURL("url" + std::to_string(i))); + } + ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); + for (size_t i = 0; i < object_ids_to_spill.size(); i++) { + ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + } + + // Every reference has gone out of scope. + for (size_t i = 0; i < free_objects_batch_size; i++) { + ASSERT_TRUE(owner_client->ReplyObjectEviction()); + } + + // The spilled objects should be deleted as number of spilled urls exceeds the batch + // size. + int deleted_urls_size = worker_pool.io_worker_client->ReplyDeleteSpilledObjects(); + ASSERT_EQ(deleted_urls_size, free_objects_batch_size); +} + TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSizeNumBytesToSpillHigherThanMinBytesToSpill) { /// Test the case SpillObjectsOfSize(num_bytes_to_spill, min_bytes_to_spill @@ -494,7 +781,7 @@ TEST_F(LocalObjectManagerTest, // Make sure the spilling is done properly. 
 std::vector urls; for (size_t i = 0; i < object_ids.size(); i++) { - urls.push_back("url" + std::to_string(i)); + urls.push_back(BuildURL("url" + std::to_string(i))); } EXPECT_CALL(worker_pool, PushSpillWorker(_)); ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 1a1f1271e..bc649ac02 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -656,6 +656,29 @@ void WorkerPool::PopIOWorkerInternal( } } +void WorkerPool::PushDeleteWorker(const std::shared_ptr &worker) { + RAY_CHECK(IsIOWorkerType(worker->GetWorkerType())); + if (worker->GetWorkerType() == rpc::WorkerType::RESTORE_WORKER) { + PushRestoreWorker(worker); + } else { + PushSpillWorker(worker); + } +} + +void WorkerPool::PopDeleteWorker( + std::function)> callback) { + auto &state = GetStateForLanguage(Language::PYTHON); + // Pop from whichever I/O pool (spill or restore) has more idle workers; on a tie the restore pool is used. + size_t num_spill_idle_workers = state.spill_io_worker_state.idle_io_workers.size(); + size_t num_restore_idle_workers = state.restore_io_worker_state.idle_io_workers.size(); + + if (num_restore_idle_workers < num_spill_idle_workers) { + PopSpillWorker(callback); + } else { + PopRestoreWorker(callback); + } +} + void WorkerPool::PushWorker(const std::shared_ptr &worker) { // Since the worker is now idle, unset its assigned task ID.
 RAY_CHECK(worker->GetAssignedTaskId().IsNil()) diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 2ea65468b..78ae85564 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -72,6 +72,11 @@ class IOWorkerPoolInterface { virtual void PopRestoreWorker( std::function)> callback) = 0; + virtual void PushDeleteWorker(const std::shared_ptr &worker) = 0; + + virtual void PopDeleteWorker( + std::function)> callback) = 0; + virtual ~IOWorkerPoolInterface(){}; }; @@ -215,6 +220,22 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// \param callback The callback that returns an available restore I/O worker. void PopRestoreWorker(std::function)> callback); + /// Add an idle delete I/O worker to the pool. + /// + /// NOTE: There's currently no concept of delete workers or delete worker pools. + /// When deleting objects, it shares the workers within restore or spill worker pools. + /// This method is just a higher level abstraction to hide that implementation detail. + /// + /// \param worker The idle I/O worker. It could be either a spill or a restore I/O worker. + void PushDeleteWorker(const std::shared_ptr &worker); + + /// Pop an idle delete I/O worker from the pool and trigger a callback when + /// a delete I/O worker is available. + /// NOTE: There's currently no concept of delete workers or delete worker pools. + /// This method pops a worker from whichever of the spill or restore pools has more + /// idle workers. \param callback The callback invoked with the popped I/O worker. + void PopDeleteWorker(std::function)> callback); + /// Add an idle worker to the pool. /// /// \param The idle worker to add.
diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index ddf6b2dad..833be632b 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -732,6 +732,44 @@ TEST_P(WorkerPoolTest, MaxSpillRestoreWorkersIntegrationTest) { ASSERT_EQ(worker_pool_->GetProcessSize(), 2 * MAX_IO_WORKER_SIZE); } +TEST_P(WorkerPoolTest, DeleteWorkerPushPop) { + /// Make sure PopDeleteWorker always pops an I/O worker from the pool that has more idle + /// workers. + // 2 spill workers and 1 restore worker. + std::unordered_set> spill_workers; + spill_workers.insert(CreateSpillWorker(Process::CreateNewDummy())); + spill_workers.insert(CreateSpillWorker(Process::CreateNewDummy())); + + std::unordered_set> restore_workers; + restore_workers.insert(CreateRestoreWorker(Process::CreateNewDummy())); + + for (const auto &worker : spill_workers) { + worker_pool_->PushSpillWorker(worker); + } + for (const auto &worker : restore_workers) { + worker_pool_->PushRestoreWorker(worker); + } + + // PopDeleteWorker should pop a spill worker in this case. + worker_pool_->PopDeleteWorker([this](std::shared_ptr worker) { + ASSERT_EQ(worker->GetWorkerType(), rpc::WorkerType::SPILL_WORKER); + worker_pool_->PushDeleteWorker(worker); + }); + + // Add 2 more restore workers. Now we have 2 spill workers and 3 restore workers. + for (int i = 0; i < 2; i++) { + auto restore_worker = CreateRestoreWorker(Process::CreateNewDummy()); + restore_workers.insert(restore_worker); + worker_pool_->PushRestoreWorker(restore_worker); + } + + // PopDeleteWorker should pop a restore worker in this case.
+ worker_pool_->PopDeleteWorker([this](std::shared_ptr worker) { + ASSERT_EQ(worker->GetWorkerType(), rpc::WorkerType::RESTORE_WORKER); + worker_pool_->PushDeleteWorker(worker); + }); +} + INSTANTIATE_TEST_CASE_P(WorkerPoolMultiTenancyTest, WorkerPoolTest, ::testing::Values(true, false)); diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index 3d0e599bd..a014a1776 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -182,6 +182,10 @@ class CoreWorkerClientInterface { const RestoreSpilledObjectsRequest &request, const ClientCallback &callback) {} + virtual void DeleteSpilledObjects( + const DeleteSpilledObjectsRequest &request, + const ClientCallback &callback) {} + virtual void PlasmaObjectReady(const PlasmaObjectReadyRequest &request, const ClientCallback &callback) { } @@ -245,6 +249,8 @@ class CoreWorkerClient : public std::enable_shared_from_this, VOID_RPC_CLIENT_METHOD(CoreWorkerService, RestoreSpilledObjects, grpc_client_, override) + VOID_RPC_CLIENT_METHOD(CoreWorkerService, DeleteSpilledObjects, grpc_client_, override) + VOID_RPC_CLIENT_METHOD(CoreWorkerService, PlasmaObjectReady, grpc_client_, override) VOID_RPC_CLIENT_METHOD(CoreWorkerService, Exit, grpc_client_, override) diff --git a/src/ray/rpc/worker/core_worker_server.h b/src/ray/rpc/worker/core_worker_server.h index 88ccde3c2..8f9d236e0 100644 --- a/src/ray/rpc/worker/core_worker_server.h +++ b/src/ray/rpc/worker/core_worker_server.h @@ -43,6 +43,7 @@ namespace rpc { RPC_SERVICE_HANDLER(CoreWorkerService, LocalGC) \ RPC_SERVICE_HANDLER(CoreWorkerService, SpillObjects) \ RPC_SERVICE_HANDLER(CoreWorkerService, RestoreSpilledObjects) \ + RPC_SERVICE_HANDLER(CoreWorkerService, DeleteSpilledObjects) \ RPC_SERVICE_HANDLER(CoreWorkerService, PlasmaObjectReady) \ RPC_SERVICE_HANDLER(CoreWorkerService, Exit) @@ -63,6 +64,7 @@ namespace rpc { DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(LocalGC) \ 
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(SpillObjects) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(RestoreSpilledObjects) \ + DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(DeleteSpilledObjects) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PlasmaObjectReady) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(Exit) diff --git a/src/ray/util/util.cc b/src/ray/util/util.cc index bf13fbbd5..63ab4e9b9 100644 --- a/src/ray/util/util.cc +++ b/src/ray/util/util.cc @@ -327,3 +327,38 @@ std::string CreateCommandLine(const std::vector &args, } return result; } + +std::shared_ptr> ParseURL(std::string url) { + auto result = std::make_shared>(); + std::string delimiter = "?"; + size_t pos = 0; + pos = url.find(delimiter); + if (pos == std::string::npos) { + return result; + } + + const std::string base_url = url.substr(0, pos); + result->emplace("url", base_url); + url.erase(0, pos + delimiter.length()); + const std::string query_delimeter = "&"; + + auto parse_key_value_with_equal_delimter = [](std::string key_value) { + // Parse the query key value pair. 
+ const std::string key_value_delimter = "="; + size_t key_value_pos = 0; + key_value_pos = key_value.find(key_value_delimter); + const std::string key = key_value.substr(0, key_value_pos); + return std::make_pair(key, key_value.substr(key.size() + 1)); + }; + + while ((pos = url.find(query_delimeter)) != std::string::npos) { + std::string token = url.substr(0, pos); + auto key_value_pair = parse_key_value_with_equal_delimter(token); + result->emplace(key_value_pair.first, key_value_pair.second); + url.erase(0, pos + delimiter.length()); + } + std::string token = url.substr(0, pos); + auto key_value_pair = parse_key_value_with_equal_delimter(token); + result->emplace(key_value_pair.first, key_value_pair.second); + return result; +} diff --git a/src/ray/util/util.h b/src/ray/util/util.h index 5b678d4f2..763119861 100644 --- a/src/ray/util/util.h +++ b/src/ray/util/util.h @@ -116,6 +116,16 @@ std::string EndpointToUrl( boost::asio::generic::basic_endpoint ParseUrlEndpoint(const std::string &endpoint, int default_port = 0); +/// Parse the url and return a pair of base_url and query string map. +/// EX) http://abc?num_objects=9&offset=8388878 +/// will be returned as +/// { +/// url: http://abc, +/// num_objects: 9, +/// offset: 8388878 +/// } +std::shared_ptr> ParseURL(std::string url); + class InitShutdownRAII { public: /// Type of the Shutdown function. 
diff --git a/src/ray/util/util_test.cc b/src/ray/util/util_test.cc index 19bcb5776..ea3081c82 100644 --- a/src/ray/util/util_test.cc +++ b/src/ray/util/util_test.cc @@ -5,6 +5,7 @@ #include #include "gtest/gtest.h" +#include "ray/util/logging.h" static const char *argv0 = NULL; @@ -87,6 +88,15 @@ TEST(UtilTest, ParseCommandLineTest) { ASSERT_EQ(ParseCommandLine(R"(x' a \b')", win32), ArgList({R"(x')", R"(a)", R"(\b')"})); } +TEST(UtilTest, ParseURLTest) { + const std::string url = "http://abc?num_objects=9&offset=8388878&size=8388878"; + auto parsed_url = *ParseURL(url); + ASSERT_EQ(parsed_url["url"], "http://abc"); + ASSERT_EQ(parsed_url["num_objects"], "9"); + ASSERT_EQ(parsed_url["offset"], "8388878"); + ASSERT_EQ(parsed_url["size"], "8388878"); +} + TEST(UtilTest, CreateCommandLineTest) { typedef std::vector ArgList; CommandLineSyntax posix = CommandLineSyntax::POSIX, win32 = CommandLineSyntax::Windows,