From 4ad79ca963015c14ef202100c7db6ed5bd29d884 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Mon, 15 Feb 2021 14:24:53 -0800 Subject: [PATCH] [Object Spilling] Remove LRU eviction (#13977) * done. * formatting. * done. * done. --- python/ray/node.py | 4 - python/ray/parameter.py | 19 +--- python/ray/tests/test_actor_failures.py | 22 ++-- python/ray/tests/test_advanced_3.py | 5 +- python/ray/tests/test_basic_2.py | 6 +- python/ray/tests/test_failure.py | 50 --------- python/ray/tests/test_reference_counting.py | 4 +- python/ray/worker.py | 9 -- src/ray/common/ray_config_def.h | 14 +-- .../plasma/create_request_queue.cc | 11 +- .../plasma/create_request_queue.h | 19 +--- src/ray/object_manager/plasma/store.cc | 56 +++++----- src/ray/object_manager/plasma/store.h | 17 ++- .../test/create_request_queue_test.cc | 59 +++------- src/ray/raylet/local_object_manager.cc | 28 +++-- src/ray/raylet/local_object_manager.h | 6 +- src/ray/raylet/main.cc | 2 - src/ray/raylet/node_manager.cc | 102 ++++++++---------- src/ray/raylet/node_manager.h | 4 - .../raylet/test/local_object_manager_test.cc | 1 - 20 files changed, 128 insertions(+), 310 deletions(-) diff --git a/python/ray/node.py b/python/ray/node.py index cd2dc2250..05f3383a5 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -120,10 +120,6 @@ class Node: raise ValueError( "Internal config parameters can only be set on the head node.") - if ray_params._lru_evict: - assert (connect_only or - head), "LRU Evict can only be passed into the head node." - self._raylet_ip_address = raylet_ip_address ray_params.update_if_absent( diff --git a/python/ray/parameter.py b/python/ray/parameter.py index 043cc258c..bdeec7627 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -102,7 +102,6 @@ class RayParams: _system_config (dict): Configuration for overriding RayConfig defaults. Used to set system configuration and for experimental Ray core feature flags. 
- lru_evict (bool): Enable LRU eviction if space is needed. enable_object_reconstruction (bool): Enable plasma reconstruction on failure. start_initial_python_workers_for_first_job (bool): If true, start @@ -199,30 +198,22 @@ class RayParams: self.start_initial_python_workers_for_first_job = ( start_initial_python_workers_for_first_job) self._system_config = _system_config or {} - self._lru_evict = lru_evict self._enable_object_reconstruction = enable_object_reconstruction self._check_usage() # Set the internal config options for LRU eviction. if lru_evict: - # Turn off object pinning. - if self._system_config is None: - self._system_config = dict() - if self._system_config.get("object_pinning_enabled", False): - raise Exception( - "Object pinning cannot be enabled if using LRU eviction.") - self._system_config["object_pinning_enabled"] = False - self._system_config["free_objects_period_milliseconds"] = 1000 + raise DeprecationWarning( + "The lru_evict flag is deprecated as Ray natively " + "supports object spilling. Please read " + "https://docs.ray.io/en/master/memory-management.html#object-spilling " # noqa + "for more details.") # Set the internal config options for object reconstruction. if enable_object_reconstruction: # Turn off object pinning. 
if self._system_config is None: self._system_config = dict() - if lru_evict: - raise Exception( - "Object reconstruction cannot be enabled if using LRU " - "eviction.") print(self._system_config) self._system_config["lineage_pinning_enabled"] = True self._system_config["free_objects_period_milliseconds"] = -1 diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py index ff9c9fd45..677b0e0fc 100644 --- a/python/ray/tests/test_actor_failures.py +++ b/python/ray/tests/test_actor_failures.py @@ -32,10 +32,9 @@ def ray_init_with_task_retry_delay(): @pytest.mark.parametrize( "ray_start_regular", [{ "object_store_memory": 150 * 1024 * 1024, - "_lru_evict": True, }], indirect=True) -def test_actor_eviction(ray_start_regular): +def test_actor_spilled(ray_start_regular): object_store_memory = 150 * 1024 * 1024 @ray.remote @@ -58,19 +57,14 @@ def test_actor_eviction(ray_start_regular): ray.get(obj) # Get each object again. At this point, the earlier objects should have - # been evicted. - num_evicted, num_success = 0, 0 + # been spilled. + num_success = 0 for obj in objects: - try: - val = ray.get(obj) - assert isinstance(val, np.ndarray), val - num_success += 1 - except ray.exceptions.ObjectLostError: - num_evicted += 1 - # Some objects should have been evicted, and some should still be in the - # object store. - assert num_evicted > 0 - assert num_success > 0 + val = ray.get(obj) + assert isinstance(val, np.ndarray), val + num_success += 1 + # All of objects should've been spilled, so all of them should succeed. 
+ assert num_success == len(objects) @pytest.mark.skipif(sys.platform == "win32", reason="Very flaky on Windows.") diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index f9c736689..5a2b57e2c 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -344,10 +344,7 @@ def test_initialized_local_mode(shutdown_only_with_initialization_check): def test_wait_reconstruction(shutdown_only): - ray.init( - num_cpus=1, - object_store_memory=int(10**8), - _system_config={"object_pinning_enabled": 0}) + ray.init(num_cpus=1, object_store_memory=int(10**8)) @ray.remote def f(): diff --git a/python/ray/tests/test_basic_2.py b/python/ray/tests/test_basic_2.py index b71c63fbf..21fabc4ba 100644 --- a/python/ray/tests/test_basic_2.py +++ b/python/ray/tests/test_basic_2.py @@ -342,7 +342,7 @@ def test_call_chain(ray_start_cluster): @pytest.mark.skipif(client_test_enabled(), reason="message size") def test_system_config_when_connecting(ray_start_cluster): - config = {"object_pinning_enabled": 0, "object_timeout_milliseconds": 200} + config = {"object_timeout_milliseconds": 200} cluster = ray.cluster_utils.Cluster() cluster.add_node( _system_config=config, object_store_memory=100 * 1024 * 1024) @@ -360,9 +360,7 @@ def test_system_config_when_connecting(ray_start_cluster): put_ref = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8)) del put_ref - # This would not raise an exception if object pinning was enabled. 
- with pytest.raises(ray.exceptions.ObjectLostError): - ray.get(obj_ref) + ray.get(obj_ref) def test_get_multiple(ray_start_regular_shared): diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index b28ebe1ae..724033c19 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -1120,56 +1120,6 @@ def test_fill_object_store_exception(shutdown_only): ray.put(np.zeros(10**8 + 2, dtype=np.uint8)) -def test_fill_object_store_lru_fallback(shutdown_only): - config = { - "free_objects_batch_size": 1, - } - ray.init( - num_cpus=2, - object_store_memory=10**8, - _lru_evict=True, - _system_config=config) - - @ray.remote - def expensive_task(): - return np.zeros((10**8) // 2, dtype=np.uint8) - - # Check that objects out of scope are cleaned up quickly. - ray.get(expensive_task.remote()) - start = time.time() - for _ in range(3): - ray.get(expensive_task.remote()) - end = time.time() - assert end - start < 3 - - obj_refs = [] - for _ in range(3): - obj_ref = expensive_task.remote() - ray.get(obj_ref) - obj_refs.append(obj_ref) - - @ray.remote - class LargeMemoryActor: - def some_expensive_task(self): - return np.zeros(10**8 // 2, dtype=np.uint8) - - def test(self): - return 1 - - actor = LargeMemoryActor.remote() - for _ in range(3): - obj_ref = actor.some_expensive_task.remote() - ray.get(obj_ref) - obj_refs.append(obj_ref) - # Make sure actor does not die - ray.get(actor.test.remote()) - - for _ in range(3): - obj_ref = ray.put(np.zeros(10**8 // 2, dtype=np.uint8)) - ray.get(obj_ref) - obj_refs.append(obj_ref) - - @pytest.mark.parametrize( "ray_start_cluster", [{ "num_nodes": 1, diff --git a/python/ray/tests/test_reference_counting.py b/python/ray/tests/test_reference_counting.py index 9fcd3c25f..0c0f3010a 100644 --- a/python/ray/tests/test_reference_counting.py +++ b/python/ray/tests/test_reference_counting.py @@ -245,9 +245,7 @@ def test_pending_task_dependency_pinning(one_worker_100MiB): def 
test_feature_flag(shutdown_only): - ray.init( - object_store_memory=100 * 1024 * 1024, - _system_config={"object_pinning_enabled": 0}) + ray.init(object_store_memory=100 * 1024 * 1024) @ray.remote def f(array): diff --git a/python/ray/worker.py b/python/ray/worker.py index 5ca73860a..7239b80a9 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -601,12 +601,6 @@ def init( directory for the Ray process. Defaults to an OS-specific conventional location, e.g., "/tmp/ray". _java_worker_options: Overwrite the options to start Java workers. - _lru_evict (bool): If True, when an object store is full, it will evict - objects in LRU order to make more space and when under memory - pressure, ray.ObjectLostError may be thrown. If False, then - reference counting will be used to decide which objects are safe - to evict and when under memory pressure, ray.ObjectStoreFullError - may be thrown. _metrics_export_port(int): Port number Ray exposes system metrics through a Prometheus endpoint. It is currently under active development, and the API is subject to change. @@ -744,9 +738,6 @@ def init( if _system_config is not None and len(_system_config) != 0: raise ValueError("When connecting to an existing cluster, " "_system_config must not be provided.") - if _lru_evict: - raise ValueError("When connecting to an existing cluster, " - "_lru_evict must not be provided.") if _enable_object_reconstruction: raise ValueError( "When connecting to an existing cluster, " diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index f109bbd59..3bcb15546 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -57,10 +57,6 @@ RAY_CONFIG(int64_t, debug_dump_period_milliseconds, 10000) /// type of task from starving other types (see issue #3664). RAY_CONFIG(bool, fair_queueing_enabled, true) -/// Whether to enable object pinning for plasma objects. When this is -/// enabled, objects in scope in the cluster will not be LRU evicted. 
-RAY_CONFIG(bool, object_pinning_enabled, true) - /// Whether to enable distributed reference counting for objects. When this is /// enabled, an object's ref count will include any references held by other /// processes, such as when an ObjectID is serialized and passed as an argument @@ -70,11 +66,9 @@ RAY_CONFIG(bool, object_pinning_enabled, true) /// information: /// 1. Local Python references to the ObjectID. /// 2. Pending tasks submitted by the local process that depend on the object. -/// If both this flag and object_pinning_enabled are turned on, then an object +/// If this flag is turned on, then an object /// will not be LRU evicted until it is out of scope in ALL processes in the /// cluster and all objects that contain it are also out of scope. If this flag -/// is off and object_pinning_enabled is turned on, then an object will not be -/// LRU evicted until it is out of scope on the CREATOR of the ObjectID. +/// cluster and all objects that contain it are also out of scope. RAY_CONFIG(bool, distributed_ref_counting_enabled, true) /// Whether to record the creation sites of object references. This adds more @@ -82,7 +76,7 @@ RAY_CONFIG(bool, distributed_ref_counting_enabled, true) /// creating object references. RAY_CONFIG(bool, record_ref_creation_sites, true) -/// If object_pinning_enabled is on, then objects that have been unpinned are +/// Objects that have been unpinned are /// added to a local cache. When the cache is flushed, all objects in the cache /// will be eagerly evicted in a batch by freeing all plasma copies in the /// cluster. If set, then this is the duration between attempts to flush the @@ -96,7 +90,7 @@ RAY_CONFIG(bool, record_ref_creation_sites, true) /// raylet_heartbeat_period_milliseconds. RAY_CONFIG(int64_t, free_objects_period_milliseconds, 1000) -/// If object_pinning_enabled is on, then objects that have been unpinned are +/// Objects that have been unpinned are /// added to a local cache.
When the cache is flushed, all objects in the cache /// will be eagerly evicted in a batch by freeing all plasma copies in the /// cluster. This is the maximum number of objects in the local cache before it diff --git a/src/ray/object_manager/plasma/create_request_queue.cc b/src/ray/object_manager/plasma/create_request_queue.cc index ddb9b0891..e8f45581b 100644 --- a/src/ray/object_manager/plasma/create_request_queue.cc +++ b/src/ray/object_manager/plasma/create_request_queue.cc @@ -81,16 +81,7 @@ std::pair CreateRequestQueue::TryRequestImmediately( } bool CreateRequestQueue::ProcessRequest(std::unique_ptr &request) { - // TODO(sang): Delete this logic when lru evict is removed. - bool evict_if_full = evict_if_full_; - if (oom_start_time_ns_ != -1) { - // If the first attempt fails, we set the evict_if_full true. - // We need this logic because if lru_evict flag is on, this is false because we - // shouldn't evict objects in the first attempt. - evict_if_full = true; - } - request->error = - request->create_callback(/*evict_if_full=*/evict_if_full, &request->result); + request->error = request->create_callback(&request->result); return request->error != PlasmaError::OutOfMemory; } diff --git a/src/ray/object_manager/plasma/create_request_queue.h b/src/ray/object_manager/plasma/create_request_queue.h index d2ac288bd..d22ac292b 100644 --- a/src/ray/object_manager/plasma/create_request_queue.h +++ b/src/ray/object_manager/plasma/create_request_queue.h @@ -31,22 +31,16 @@ namespace plasma { class CreateRequestQueue { public: - using CreateObjectCallback = - std::function; + using CreateObjectCallback = std::function; - CreateRequestQueue(bool evict_if_full, int64_t oom_grace_period_s, + CreateRequestQueue(int64_t oom_grace_period_s, ray::SpillObjectsCallback spill_objects_callback, std::function trigger_global_gc, std::function get_time) - : evict_if_full_(evict_if_full), - oom_grace_period_ns_(oom_grace_period_s * 1e9), + : oom_grace_period_ns_(oom_grace_period_s * 
1e9), spill_objects_callback_(spill_objects_callback), trigger_global_gc_(trigger_global_gc), - get_time_(get_time) { - RAY_LOG(DEBUG) << "Starting plasma::CreateRequestQueue with OOM grace period " - << oom_grace_period_ns_ << ", evict if full? " - << (evict_if_full_ ? 1 : 0); - } + get_time_(get_time) {} /// Add a request to the queue. The caller should use the returned request ID /// to later get the result of the request. @@ -151,11 +145,6 @@ class CreateRequestQueue { /// a request by retrying. Start at 1 because 0 means "do not retry". uint64_t next_req_id_ = 1; - /// On the first attempt to create an object, whether to evict from the - /// object store to make space. If the first attempt fails, then we will - /// always try to evict. - const bool evict_if_full_; - /// Grace period until we throw the OOM error to the application. /// -1 means grace period is infinite. const int64_t oom_grace_period_ns_; diff --git a/src/ray/object_manager/plasma/store.cc b/src/ray/object_manager/plasma/store.cc index 920ced48e..642d84204 100644 --- a/src/ray/object_manager/plasma/store.cc +++ b/src/ray/object_manager/plasma/store.cc @@ -129,7 +129,6 @@ PlasmaStore::PlasmaStore(boost::asio::io_service &main_service, std::string dire usage_log_interval_ns_(RayConfig::instance().object_store_usage_log_interval_s() * 1e9), create_request_queue_( - /*evict_if_full=*/RayConfig::instance().object_pinning_enabled(), /*oom_grace_period_s=*/RayConfig::instance().oom_grace_period_s(), spill_objects_callback, object_store_full_callback, /*get_time=*/ @@ -173,21 +172,19 @@ void PlasmaStore::AddToClientObjectIds(const ObjectID &object_id, ObjectTableEnt } // Allocate memory -uint8_t *PlasmaStore::AllocateMemory(size_t size, bool evict_if_full, MEMFD_TYPE *fd, - int64_t *map_size, ptrdiff_t *offset, +uint8_t *PlasmaStore::AllocateMemory(size_t size, MEMFD_TYPE *fd, int64_t *map_size, + ptrdiff_t *offset, const std::shared_ptr &client, bool is_create, PlasmaError *error) { // First free up 
space from the client's LRU queue if quota enforcement is on. - if (evict_if_full) { - std::vector client_objects_to_evict; - bool quota_ok = eviction_policy_.EnforcePerClientQuota(client.get(), size, is_create, - &client_objects_to_evict); - if (!quota_ok) { - *error = PlasmaError::OutOfMemory; - return nullptr; - } - EvictObjects(client_objects_to_evict); + std::vector client_objects_to_evict; + bool quota_ok = eviction_policy_.EnforcePerClientQuota(client.get(), size, is_create, + &client_objects_to_evict); + if (!quota_ok) { + *error = PlasmaError::OutOfMemory; + return nullptr; } + EvictObjects(client_objects_to_evict); // Try to evict objects until there is enough space. uint8_t *pointer = nullptr; @@ -200,7 +197,7 @@ uint8_t *PlasmaStore::AllocateMemory(size_t size, bool evict_if_full, MEMFD_TYPE // it is not guaranteed that the corresponding pointer in the client will be // 64-byte aligned, but in practice it often will be. pointer = reinterpret_cast(PlasmaAllocator::Memalign(kBlockSize, size)); - if (pointer || !evict_if_full) { + if (pointer) { // If we manage to allocate the memory, return the pointer. If we cannot // allocate the space, but we are also not allowed to evict anything to // make more space, return an error to the client. 
@@ -236,7 +233,6 @@ uint8_t *PlasmaStore::AllocateMemory(size_t size, bool evict_if_full, MEMFD_TYPE PlasmaError PlasmaStore::HandleCreateObjectRequest(const std::shared_ptr &client, const std::vector &message, - bool evict_if_full, PlasmaObject *object) { uint8_t *input = (uint8_t *)message.data(); size_t input_size = message.size(); @@ -252,9 +248,9 @@ PlasmaError PlasmaStore::HandleCreateObjectRequest(const std::shared_ptr ReadCreateRequest(input, input_size, &object_id, &owner_raylet_id, &owner_ip_address, &owner_port, &owner_worker_id, &data_size, &metadata_size, &device_num); - auto error = CreateObject(object_id, owner_raylet_id, owner_ip_address, owner_port, - owner_worker_id, evict_if_full, data_size, metadata_size, - device_num, client, object); + auto error = + CreateObject(object_id, owner_raylet_id, owner_ip_address, owner_port, + owner_worker_id, data_size, metadata_size, device_num, client, object); if (error == PlasmaError::OutOfMemory) { RAY_LOG(DEBUG) << "Not enough memory to create the object " << object_id << ", data_size=" << data_size << ", metadata_size=" << metadata_size; @@ -262,11 +258,13 @@ PlasmaError PlasmaStore::HandleCreateObjectRequest(const std::shared_ptr return error; } -PlasmaError PlasmaStore::CreateObject( - const ObjectID &object_id, const NodeID &owner_raylet_id, - const std::string &owner_ip_address, int owner_port, const WorkerID &owner_worker_id, - bool evict_if_full, int64_t data_size, int64_t metadata_size, int device_num, - const std::shared_ptr &client, PlasmaObject *result) { +PlasmaError PlasmaStore::CreateObject(const ObjectID &object_id, + const NodeID &owner_raylet_id, + const std::string &owner_ip_address, int owner_port, + const WorkerID &owner_worker_id, int64_t data_size, + int64_t metadata_size, int device_num, + const std::shared_ptr &client, + PlasmaObject *result) { RAY_LOG(DEBUG) << "creating object " << object_id.Hex() << " size " << data_size; auto entry = GetObjectTableEntry(&store_info_, object_id); 
@@ -284,8 +282,7 @@ PlasmaError PlasmaStore::CreateObject( if (device_num == 0) { PlasmaError error = PlasmaError::OK; - pointer = AllocateMemory(total_size, evict_if_full, &fd, &map_size, &offset, client, - true, &error); + pointer = AllocateMemory(total_size, &fd, &map_size, &offset, client, true, &error); if (!pointer) { return error; } @@ -491,9 +488,9 @@ void PlasmaStore::ProcessGetRequest(const std::shared_ptr &client, RAY_CHECK(!entry->pointer); PlasmaError error = PlasmaError::OK; - entry->pointer = AllocateMemory(entry->data_size + entry->metadata_size, - /*evict=*/true, &entry->fd, &entry->map_size, - &entry->offset, client, false, &error); + entry->pointer = + AllocateMemory(entry->data_size + entry->metadata_size, &entry->fd, + &entry->map_size, &entry->offset, client, false, &error); if (entry->pointer) { // TODO(suquark): Not sure if this old behavior is still compatible // with our current object spilling mechanics. @@ -865,9 +862,8 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr &client, const auto &object_id = GetCreateRequestObjectId(message); const auto &request = flatbuffers::GetRoot(input); - auto handle_create = [this, client, message](bool evict_if_full, - PlasmaObject *result) { - return HandleCreateObjectRequest(client, message, evict_if_full, result); + auto handle_create = [this, client, message](PlasmaObject *result) { + return HandleCreateObjectRequest(client, message, result); }; if (request->try_immediately()) { diff --git a/src/ray/object_manager/plasma/store.h b/src/ray/object_manager/plasma/store.h index eedcb526d..c6561bf65 100644 --- a/src/ray/object_manager/plasma/store.h +++ b/src/ray/object_manager/plasma/store.h @@ -77,10 +77,6 @@ class PlasmaStore { /// \param owner_ip_address IP address of the object's owner. /// \param owner_port Port of the object's owner. /// \param owner_worker_id Worker ID of the object's owner. 
- /// \param evict_if_full If this is true, then when the object store is full, - /// try to evict objects that are not currently referenced before - /// creating the object. Else, do not evict any objects and - /// immediately return an PlasmaError::OutOfMemory. /// \param data_size Size in bytes of the object to be created. /// \param metadata_size Size in bytes of the object metadata. /// \param device_num The number of the device where the object is being @@ -100,8 +96,8 @@ class PlasmaStore { /// plasma_release. PlasmaError CreateObject(const ObjectID &object_id, const NodeID &owner_raylet_id, const std::string &owner_ip_address, int owner_port, - const WorkerID &owner_worker_id, bool evict_if_full, - int64_t data_size, int64_t metadata_size, int device_num, + const WorkerID &owner_worker_id, int64_t data_size, + int64_t metadata_size, int device_num, const std::shared_ptr &client, PlasmaObject *result); /// Abort a created but unsealed object. If the client is not the @@ -224,7 +220,7 @@ class PlasmaStore { private: PlasmaError HandleCreateObjectRequest(const std::shared_ptr &client, const std::vector &message, - bool evict_if_full, PlasmaObject *object); + PlasmaObject *object); void ReplyToCreateClient(const std::shared_ptr &client, const ObjectID &object_id, uint64_t req_id); @@ -255,10 +251,9 @@ class PlasmaStore { void EraseFromObjectTable(const ObjectID &object_id); - uint8_t *AllocateMemory(size_t size, bool evict_if_full, MEMFD_TYPE *fd, - int64_t *map_size, ptrdiff_t *offset, - const std::shared_ptr &client, bool is_create, - PlasmaError *error); + uint8_t *AllocateMemory(size_t size, MEMFD_TYPE *fd, int64_t *map_size, + ptrdiff_t *offset, const std::shared_ptr &client, + bool is_create, PlasmaError *error); // Start listening for clients. 
void DoAccept(); diff --git a/src/ray/object_manager/test/create_request_queue_test.cc b/src/ray/object_manager/test/create_request_queue_test.cc index ec75e0043..5b107c71a 100644 --- a/src/ray/object_manager/test/create_request_queue_test.cc +++ b/src/ray/object_manager/test/create_request_queue_test.cc @@ -49,7 +49,6 @@ class CreateRequestQueueTest : public ::testing::Test { : oom_grace_period_s_(1), current_time_ns_(0), queue_( - /*evict_if_full=*/true, /*oom_grace_period_s=*/oom_grace_period_s_, /*spill_object_callback=*/[&]() { return false; }, /*on_global_gc=*/[&]() { num_global_gc_++; }, @@ -69,7 +68,7 @@ class CreateRequestQueueTest : public ::testing::Test { }; TEST_F(CreateRequestQueueTest, TestSimple) { - auto request = [&](bool evict_if_full, PlasmaObject *result) { + auto request = [&](PlasmaObject *result) { result->data_size = 1234; return PlasmaError::OK; }; @@ -105,10 +104,8 @@ TEST_F(CreateRequestQueueTest, TestSimple) { } TEST_F(CreateRequestQueueTest, TestOom) { - auto oom_request = [&](bool evict_if_full, PlasmaObject *result) { - return PlasmaError::OutOfMemory; - }; - auto blocked_request = [&](bool evict_if_full, PlasmaObject *result) { + auto oom_request = [&](PlasmaObject *result) { return PlasmaError::OutOfMemory; }; + auto blocked_request = [&](PlasmaObject *result) { result->data_size = 1234; return PlasmaError::OK; }; @@ -141,17 +138,14 @@ TEST(CreateRequestQueueParameterTest, TestOomInfiniteRetry) { int num_global_gc_ = 0; int64_t current_time_ns; CreateRequestQueue queue( - /*evict_if_full=*/true, /*oom_grace_period_s=*/100, // Spilling is failing. 
/*spill_object_callback=*/[&]() { return false; }, /*on_global_gc=*/[&]() { num_global_gc_++; }, /*get_time=*/[&]() { return current_time_ns; }); - auto oom_request = [&](bool evict_if_full, PlasmaObject *result) { - return PlasmaError::OutOfMemory; - }; - auto blocked_request = [&](bool evict_if_full, PlasmaObject *result) { + auto oom_request = [&](PlasmaObject *result) { return PlasmaError::OutOfMemory; }; + auto blocked_request = [&](PlasmaObject *result) { result->data_size = 1234; return PlasmaError::OK; }; @@ -174,20 +168,19 @@ TEST(CreateRequestQueueParameterTest, TestOomInfiniteRetry) { TEST_F(CreateRequestQueueTest, TestTransientOom) { CreateRequestQueue queue( - /*evict_if_full=*/true, /*oom_grace_period_s=*/oom_grace_period_s_, /*spill_object_callback=*/[&]() { return true; }, /*on_global_gc=*/[&]() { num_global_gc_++; }, /*get_time=*/[&]() { return current_time_ns_; }); auto return_status = PlasmaError::OutOfMemory; - auto oom_request = [&](bool evict_if_full, PlasmaObject *result) { + auto oom_request = [&](PlasmaObject *result) { if (return_status == PlasmaError::OK) { result->data_size = 1234; } return return_status; }; - auto blocked_request = [&](bool evict_if_full, PlasmaObject *result) { + auto blocked_request = [&](PlasmaObject *result) { result->data_size = 1234; return PlasmaError::OK; }; @@ -220,20 +213,19 @@ TEST_F(CreateRequestQueueTest, TestTransientOom) { TEST_F(CreateRequestQueueTest, TestTransientOomThenOom) { bool is_spilling_possible = true; CreateRequestQueue queue( - /*evict_if_full=*/true, /*oom_grace_period_s=*/oom_grace_period_s_, /*spill_object_callback=*/[&]() { return is_spilling_possible; }, /*on_global_gc=*/[&]() { num_global_gc_++; }, /*get_time=*/[&]() { return current_time_ns_; }); auto return_status = PlasmaError::OutOfMemory; - auto oom_request = [&](bool evict_if_full, PlasmaObject *result) { + auto oom_request = [&](PlasmaObject *result) { if (return_status == PlasmaError::OK) { result->data_size = 1234; } return 
return_status; }; - auto blocked_request = [&](bool evict_if_full, PlasmaObject *result) { + auto blocked_request = [&](PlasmaObject *result) { result->data_size = 1234; return PlasmaError::OK; }; @@ -271,38 +263,15 @@ TEST_F(CreateRequestQueueTest, TestTransientOomThenOom) { AssertNoLeaks(); } -TEST_F(CreateRequestQueueTest, TestEvictIfFull) { - auto oom_request = [&](bool evict_if_full, PlasmaObject *result) { - RAY_CHECK(evict_if_full); - return PlasmaError::OutOfMemory; - }; - - auto client = std::make_shared(); - static_cast(queue_.AddRequest(ObjectID::Nil(), client, oom_request)); - ASSERT_TRUE(queue_.ProcessRequests().IsObjectStoreFull()); - ASSERT_TRUE(queue_.ProcessRequests().IsObjectStoreFull()); -} - TEST(CreateRequestQueueParameterTest, TestNoEvictIfFull) { int64_t current_time_ns = 0; CreateRequestQueue queue( - /*evict_if_full=*/false, /*oom_grace_period_s=*/1, /*spill_object_callback=*/[&]() { return false; }, /*on_global_gc=*/[&]() {}, /*get_time=*/[&]() { return current_time_ns; }); - bool first_try = true; - - auto oom_request = [&](bool evict_if_full, PlasmaObject *result) { - if (first_try) { - RAY_CHECK(!evict_if_full); - first_try = false; - } else { - RAY_CHECK(evict_if_full); - } - return PlasmaError::OutOfMemory; - }; + auto oom_request = [&](PlasmaObject *result) { return PlasmaError::OutOfMemory; }; auto client = std::make_shared(); static_cast(queue.AddRequest(ObjectID::Nil(), client, oom_request)); @@ -312,7 +281,7 @@ TEST(CreateRequestQueueParameterTest, TestNoEvictIfFull) { } TEST_F(CreateRequestQueueTest, TestClientDisconnected) { - auto request = [&](bool evict_if_full, PlasmaObject *result) { + auto request = [&](PlasmaObject *result) { result->data_size = 1234; return PlasmaError::OK; }; @@ -341,7 +310,7 @@ TEST_F(CreateRequestQueueTest, TestClientDisconnected) { } TEST_F(CreateRequestQueueTest, TestTryRequestImmediately) { - auto request = [&](bool evict_if_full, PlasmaObject *result) { + auto request = [&](PlasmaObject *result) 
{ result->data_size = 1234; return PlasmaError::OK; }; @@ -366,9 +335,7 @@ TEST_F(CreateRequestQueueTest, TestTryRequestImmediately) { // Queue is empty, but request would block. Check that we do not attempt to // retry the request. - auto oom_request = [&](bool evict_if_full, PlasmaObject *result) { - return PlasmaError::OutOfMemory; - }; + auto oom_request = [&](PlasmaObject *result) { return PlasmaError::OutOfMemory; }; result = queue_.TryRequestImmediately(ObjectID::Nil(), client, oom_request); ASSERT_EQ(result.first.data_size, 0); ASSERT_EQ(result.second, PlasmaError::OutOfMemory); diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc index 3ee7de57c..d37576a48 100644 --- a/src/ray/raylet/local_object_manager.cc +++ b/src/ray/raylet/local_object_manager.cc @@ -23,7 +23,6 @@ namespace raylet { void LocalObjectManager::PinObjects(const std::vector &object_ids, std::vector> &&objects, const rpc::Address &owner_address) { - RAY_CHECK(object_pinning_enabled_); for (size_t i = 0; i < object_ids.size(); i++) { const auto &object_id = object_ids[i]; auto &object = objects[i]; @@ -61,20 +60,17 @@ void LocalObjectManager::WaitForObjectFree(const rpc::Address &owner_address, } void LocalObjectManager::ReleaseFreedObject(const ObjectID &object_id) { - // object_pinning_enabled_ flag is off when the --lru-evict flag is on. - if (object_pinning_enabled_) { - RAY_LOG(DEBUG) << "Unpinning object " << object_id; - // The object should be in one of these stats. pinned, spilling, or spilled. 
- RAY_CHECK((pinned_objects_.count(object_id) > 0) || - (spilled_objects_url_.count(object_id) > 0) || - (objects_pending_spill_.count(object_id) > 0)); - if (automatic_object_deletion_enabled_) { - spilled_object_pending_delete_.push(object_id); - } - if (pinned_objects_.count(object_id)) { - pinned_objects_size_ -= pinned_objects_[object_id].first->GetSize(); - pinned_objects_.erase(object_id); - } + RAY_LOG(DEBUG) << "Unpinning object " << object_id; + // The object should be in one of these states: pinned, spilling, or spilled. + RAY_CHECK((pinned_objects_.count(object_id) > 0) || + (spilled_objects_url_.count(object_id) > 0) || + (objects_pending_spill_.count(object_id) > 0)); + if (automatic_object_deletion_enabled_) { + spilled_object_pending_delete_.push(object_id); + } + if (pinned_objects_.count(object_id)) { + pinned_objects_size_ -= pinned_objects_[object_id].first->GetSize(); + pinned_objects_.erase(object_id); } // Try to evict all copies of the object from the cluster. @@ -93,7 +89,7 @@ void LocalObjectManager::FlushFreeObjects() { on_objects_freed_(objects_to_free_); objects_to_free_.clear(); } - if (object_pinning_enabled_ && automatic_object_deletion_enabled_) { + if (automatic_object_deletion_enabled_) { // Deletion wouldn't work when the object pinning is not enabled.
ProcessSpilledObjectsDeleteQueue(free_objects_batch_size_); } diff --git a/src/ray/raylet/local_object_manager.h b/src/ray/raylet/local_object_manager.h index 267edabd9..285060ab5 100644 --- a/src/ray/raylet/local_object_manager.h +++ b/src/ray/raylet/local_object_manager.h @@ -41,7 +41,7 @@ class LocalObjectManager { const NodeID &node_id, size_t free_objects_batch_size, int64_t free_objects_period_ms, IOWorkerPoolInterface &io_worker_pool, gcs::ObjectInfoAccessor &object_info_accessor, - rpc::CoreWorkerClientPool &owner_client_pool, bool object_pinning_enabled, + rpc::CoreWorkerClientPool &owner_client_pool, bool automatic_object_deletion_enabled, int max_io_workers, int64_t min_spilling_size, bool is_external_storage_type_fs, std::function &)> on_objects_freed, @@ -54,7 +54,6 @@ class LocalObjectManager { io_worker_pool_(io_worker_pool), object_info_accessor_(object_info_accessor), owner_client_pool_(owner_client_pool), - object_pinning_enabled_(object_pinning_enabled), automatic_object_deletion_enabled_(automatic_object_deletion_enabled), on_objects_freed_(on_objects_freed), last_free_objects_at_ms_(current_time_ms()), @@ -203,9 +202,6 @@ class LocalObjectManager { /// this node. rpc::CoreWorkerClientPool &owner_client_pool_; - /// Whether to enable pinning for plasma objects. - bool object_pinning_enabled_; - /// Whether to enable automatic deletion when refs are gone out of scope. 
bool automatic_object_deletion_enabled_; diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 1d47f23b3..729c400fe 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -205,8 +205,6 @@ int main(int argc, char *argv[]) { RayConfig::instance().metrics_report_interval_ms() / 2; node_manager_config.fair_queueing_enabled = RayConfig::instance().fair_queueing_enabled(); - node_manager_config.object_pinning_enabled = - RayConfig::instance().object_pinning_enabled(); node_manager_config.automatic_object_deletion_enabled = RayConfig::instance().automatic_object_deletion_enabled(); node_manager_config.store_socket_name = store_socket_name; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 2287fd3e8..4eb3941dd 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -130,7 +130,6 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self std::chrono::milliseconds(config.report_resources_period_ms)), debug_dump_period_(config.debug_dump_period_ms), fair_queueing_enabled_(config.fair_queueing_enabled), - object_pinning_enabled_(config.object_pinning_enabled), temp_dir_(config.temp_dir), object_manager_profile_timer_(io_service), initial_config_(config), @@ -162,7 +161,6 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self self_node_id_, RayConfig::instance().free_objects_batch_size(), RayConfig::instance().free_objects_period_milliseconds(), worker_pool_, gcs_client_->Objects(), worker_rpc_pool_, - /* object_pinning_enabled */ config.object_pinning_enabled, /* automatic_object_deletion_enabled */ config.automatic_object_deletion_enabled, /*max_io_workers*/ config.max_io_workers, @@ -2069,52 +2067,42 @@ void NodeManager::HandleTaskReconstruction(const TaskID &task_id, rpc::Address owner_addr; bool has_owner = dependency_manager_.GetOwnerAddress(required_object_id, &owner_addr); if (has_owner) { - if 
(!RayConfig::instance().object_pinning_enabled()) { - // LRU eviction is enabled. The object may still be in scope, but we - // weren't able to fetch the value within the timeout, so the value has - // most likely been evicted. Mark the object as unreachable. - rpc::ObjectReference ref; - ref.set_object_id(required_object_id.Binary()); - ref.mutable_owner_address()->CopyFrom(owner_addr); - MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, {ref}, JobID::Nil()); - } else { - RAY_LOG(DEBUG) << "Required object " << required_object_id - << " fetch timed out, asking owner " - << WorkerID::FromBinary(owner_addr.worker_id()); - // The owner's address exists. Poll the owner to check if the object is - // still in scope. If not, mark the object as failed. - // TODO(swang): If the owner has died, we could also mark the object as - // failed as soon as we hear about the owner's failure from the GCS, - // avoiding the raylet's reconstruction timeout. - auto client = std::unique_ptr( - new rpc::CoreWorkerClient(owner_addr, client_call_manager_)); + RAY_LOG(DEBUG) << "Required object " << required_object_id + << " fetch timed out, asking owner " + << WorkerID::FromBinary(owner_addr.worker_id()); + // The owner's address exists. Poll the owner to check if the object is + // still in scope. If not, mark the object as failed. + // TODO(swang): If the owner has died, we could also mark the object as + // failed as soon as we hear about the owner's failure from the GCS, + // avoiding the raylet's reconstruction timeout. 
+ auto client = std::unique_ptr( + new rpc::CoreWorkerClient(owner_addr, client_call_manager_)); - rpc::GetObjectStatusRequest request; - request.set_object_id(required_object_id.Binary()); - request.set_owner_worker_id(owner_addr.worker_id()); - client->GetObjectStatus(request, [this, required_object_id, owner_addr]( - Status status, - const rpc::GetObjectStatusReply &reply) { - if (!status.ok() || reply.status() == rpc::GetObjectStatusReply::OUT_OF_SCOPE || - reply.status() == rpc::GetObjectStatusReply::FREED) { - // The owner is gone, or the owner replied that the object has - // gone out of scope (this is an edge case in the distributed ref - // counting protocol where a borrower dies before it can notify - // the owner of another borrower), or the object value has been - // freed. Store an error in the local plasma store so that an - // exception will be thrown when the worker tries to get the - // value. - rpc::ObjectReference ref; - ref.set_object_id(required_object_id.Binary()); - ref.mutable_owner_address()->CopyFrom(owner_addr); - MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, {ref}, JobID::Nil()); - } - // Do nothing if the owner replied that the object is available. The - // object manager will continue trying to fetch the object, and this - // handler will get triggered again if the object is still - // unavailable after another timeout. 
- }); - } + rpc::GetObjectStatusRequest request; + request.set_object_id(required_object_id.Binary()); + request.set_owner_worker_id(owner_addr.worker_id()); + client->GetObjectStatus( + request, [this, required_object_id, owner_addr]( + Status status, const rpc::GetObjectStatusReply &reply) { + if (!status.ok() || reply.status() == rpc::GetObjectStatusReply::OUT_OF_SCOPE || + reply.status() == rpc::GetObjectStatusReply::FREED) { + // The owner is gone, or the owner replied that the object has + // gone out of scope (this is an edge case in the distributed ref + // counting protocol where a borrower dies before it can notify + // the owner of another borrower), or the object value has been + // freed. Store an error in the local plasma store so that an + // exception will be thrown when the worker tries to get the + // value. + rpc::ObjectReference ref; + ref.set_object_id(required_object_id.Binary()); + ref.mutable_owner_address()->CopyFrom(owner_addr); + MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, {ref}, JobID::Nil()); + } + // Do nothing if the owner replied that the object is available. The + // object manager will continue trying to fetch the object, and this + // handler will get triggered again if the object is still + // unavailable after another timeout. + }); } else { RAY_LOG(WARNING) << "Ray cannot get the value of ObjectIDs that are generated " @@ -2416,18 +2404,16 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request, for (const auto &object_id_binary : request.object_ids()) { object_ids.push_back(ObjectID::FromBinary(object_id_binary)); } - if (object_pinning_enabled_) { - std::vector> results; - if (!GetObjectsFromPlasma(object_ids, &results)) { - RAY_LOG(WARNING) - << "Failed to get objects that should have been in the object store. These " - "objects may have been evicted while there are still references in scope."; - // TODO(suquark): Maybe "Status::ObjectNotFound" is more accurate here. 
- send_reply_callback(Status::Invalid("Failed to get objects."), nullptr, nullptr); - return; - } - local_object_manager_.PinObjects(object_ids, std::move(results), owner_address); + std::vector> results; + if (!GetObjectsFromPlasma(object_ids, &results)) { + RAY_LOG(WARNING) + << "Failed to get objects that should have been in the object store. These " + "objects may have been evicted while there are still references in scope."; + // TODO(suquark): Maybe "Status::ObjectNotFound" is more accurate here. + send_reply_callback(Status::Invalid("Failed to get objects."), nullptr, nullptr); + return; } + local_object_manager_.PinObjects(object_ids, std::move(results), owner_address); // Wait for the object to be freed by the owner, which keeps the ref count. local_object_manager_.WaitForObjectFree(owner_address, object_ids); send_reply_callback(Status::OK(), nullptr, nullptr); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 606dc3ac6..d08195509 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -93,8 +93,6 @@ struct NodeManagerConfig { uint64_t debug_dump_period_ms; /// Whether to enable fair queueing between task classes in raylet. bool fair_queueing_enabled; - /// Whether to enable pinning for plasma objects. - bool object_pinning_enabled; /// Whether to enable automatic object deletion for object spilling. bool automatic_object_deletion_enabled; /// The store socket name. @@ -801,8 +799,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler, int64_t debug_dump_period_; /// Whether to enable fair queueing between task classes in raylet. bool fair_queueing_enabled_; - /// Whether to enable pinning for plasma objects. - bool object_pinning_enabled_; /// Incremented each time we encounter a potential resource deadlock condition. /// This is reset to zero when the condition is cleared. 
int resource_deadlock_warned_ = 0; diff --git a/src/ray/raylet/test/local_object_manager_test.cc b/src/ray/raylet/test/local_object_manager_test.cc index d056928c0..148ed6514 100644 --- a/src/ray/raylet/test/local_object_manager_test.cc +++ b/src/ray/raylet/test/local_object_manager_test.cc @@ -280,7 +280,6 @@ class LocalObjectManagerTest : public ::testing::Test { manager_node_id_(NodeID::FromRandom()), manager(manager_node_id_, free_objects_batch_size, /*free_objects_period_ms=*/1000, worker_pool, object_table, client_pool, - /*object_pinning_enabled=*/true, /*automatic_object_delete_enabled=*/true, /*max_io_workers=*/2, /*min_spilling_size=*/0,