diff --git a/bazel/ray_deps_setup.bzl b/bazel/ray_deps_setup.bzl index f8692b41c..ab359718d 100644 --- a/bazel/ray_deps_setup.bzl +++ b/bazel/ray_deps_setup.bzl @@ -166,8 +166,8 @@ def ray_deps_setup(): auto_http_archive( name = "plasma", build_file = True, - url = "https://github.com/apache/arrow/archive/66b05abc267661172286b47b9246ad55f1581555.tar.gz", - sha256 = "fb0227005116f64dca4b19b451ae793e9a2591c019136b70424ebe3d4f5334fe", + url = "https://github.com/apache/arrow/archive/af45b9212156980f55c399e2e88b4e19b4bb8ec1.tar.gz", + sha256 = "2f0aaa50053792aa274b402f2530e63c1542085021cfef83beee9281412c12f6", patches = [ "//thirdparty/patches:arrow-headers-unused.patch", "//thirdparty/patches:arrow-windows-export.patch", diff --git a/python/ray/cluster_utils.py b/python/ray/cluster_utils.py index f5a4aba41..e34482190 100644 --- a/python/ray/cluster_utils.py +++ b/python/ray/cluster_utils.py @@ -1,3 +1,4 @@ +import json import logging import time @@ -77,6 +78,9 @@ class Cluster: "num_gpus": 0, "object_store_memory": 150 * 1024 * 1024, # 150 MiB } + if "_internal_config" in node_args: + node_args["_internal_config"] = json.loads( + node_args["_internal_config"]) ray_params = ray.parameter.RayParams(**node_args) ray_params.update_if_absent(**default_kwargs) if self.head_node is None: diff --git a/python/ray/exceptions.py b/python/ray/exceptions.py index 441c6e103..c636f2e8a 100644 --- a/python/ray/exceptions.py +++ b/python/ray/exceptions.py @@ -129,7 +129,17 @@ class ObjectStoreFullError(RayError): This is raised if the attempt to store the object fails because the object store is full even after multiple retries. """ - pass + + def __str__(self): + return super(ObjectStoreFullError, self).__str__() + ( + "\n" + "The local object store is full of objects that are still in scope" + " and cannot be evicted. Try increasing the object store memory " + "available with ray.init(object_store_memory=). " + "You can also try setting an option to fallback to LRU eviction " + "when the object store is full by calling " + "ray.init(lru_evict=True). See also: " + "https://ray.readthedocs.io/en/latest/memory-management.html.") class UnreconstructableError(RayError): diff --git a/python/ray/node.py b/python/ray/node.py index b42d16eaf..100bf7dbe 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -2,7 +2,6 @@ import atexit import collections import datetime import errno -import json import os import logging import signal @@ -91,8 +90,7 @@ class Node: self._resource_spec = None self._ray_params = ray_params self._redis_address = ray_params.redis_address - self._config = (json.loads(ray_params._internal_config) - if ray_params._internal_config else None) + self._config = ray_params._internal_config if head: redis_client = None diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index e169e1b21..344af40d8 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -939,6 +939,47 @@ def test_fill_object_store_exception(shutdown_only): ray.put(np.zeros(10**8 + 2, dtype=np.uint8)) +def test_fill_object_store_lru_fallback(shutdown_only): + ray.init(num_cpus=2, object_store_memory=10**8, lru_evict=True) + + @ray.remote + def expensive_task(): + return np.zeros((10**8) // 2, dtype=np.uint8) + + oids = [] + for _ in range(3): + oid = expensive_task.remote() + ray.get(oid) + oids.append(oid) + + @ray.remote + class LargeMemoryActor: + def some_expensive_task(self): + return np.zeros(10**8 // 2, dtype=np.uint8) + + def test(self): + return 1 + + actor = LargeMemoryActor.remote() + for _ in range(3): + oid = actor.some_expensive_task.remote() + ray.get(oid) + oids.append(oid) + # Make sure actor does not die + ray.get(actor.test.remote()) + + for _ in range(3): + oid = ray.put(np.zeros(10**8 // 2, dtype=np.uint8)) + ray.get(oid) + oids.append(oid) + + # NOTE: Needed to unset the config set by the lru_evict flag, for Travis. + ray._raylet.set_internal_config({ + "object_pinning_enabled": 1, + "object_store_full_max_retries": 5, + }) + + @pytest.mark.parametrize( "ray_start_cluster", [{ "num_nodes": 1, diff --git a/python/ray/tests/test_reference_counting.py b/python/ray/tests/test_reference_counting.py index 08b52dcea..6d73e8a99 100644 --- a/python/ray/tests/test_reference_counting.py +++ b/python/ray/tests/test_reference_counting.py @@ -23,7 +23,8 @@ logger = logging.getLogger(__name__) def one_worker_100MiB(request): config = json.dumps({ "distributed_ref_counting_enabled": 1, - "object_store_full_max_retries": 1, + "object_store_full_max_retries": 3, + "object_store_full_initial_delay_ms": 100, }) yield ray.init( num_cpus=1, @@ -707,40 +708,44 @@ def test_recursively_pass_returned_object_id(one_worker_100MiB, use_ray_put): @ray.remote def return_an_id(): - return [ - put_object( - np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) - ] + return put_object( + np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) @ray.remote def recursive(ref, signal, max_depth, depth=0): - ray.get(ref[0]) + inner_id = ray.get(ref[0]) if depth == max_depth: - return ray.get(signal.wait.remote()) + ray.get(signal.wait.remote()) + return inner_id else: return recursive.remote(ref, signal, max_depth, depth + 1) max_depth = 5 outer_oid = return_an_id.remote() - inner_oid_bytes = ray.get(outer_oid)[0].binary() signal = SignalActor.remote() head_oid = recursive.remote([outer_oid], signal, max_depth) # Remove the local reference. - del outer_oid - tail_oid = head_oid + outer_oid = head_oid for _ in range(max_depth): - tail_oid = ray.get(tail_oid) + outer_oid = ray.get(outer_oid) - # Check that the remote reference pins the object. - _fill_object_store_and_get(inner_oid_bytes) + # Fill the object store. + _fill_object_store_and_get(outer_oid, succeed=False) # Fulfill the dependency, causing the tail task to finish. ray.get(signal.send.remote()) - ray.get(tail_oid) + + # Check that the remote reference pins the object. + inner_oid = ray.get(outer_oid) + _fill_object_store_and_get(inner_oid) + inner_oid_bytes = inner_oid.binary() # Reference should be gone, check that returned ID gets evicted. + del head_oid + del outer_oid + del inner_oid _fill_object_store_and_get(inner_oid_bytes, succeed=False) diff --git a/python/ray/worker.py b/python/ray/worker.py index 5ba7bc13d..7231e381b 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -550,7 +550,8 @@ def init(address=None, temp_dir=None, load_code_from_local=False, use_pickle=True, - _internal_config=None): + _internal_config=None, + lru_evict=False): """Connect to an existing Ray cluster or start one and connect to it. This method handles two cases. Either a Ray cluster already exists and we @@ -646,6 +647,12 @@ def init(address=None, use_pickle: Deprecated. _internal_config (str): JSON configuration for overriding RayConfig defaults. For testing purposes ONLY. + lru_evict (bool): If True, when an object store is full, it will evict + objects in LRU order to make more space and when under memory + pressure, ray.UnreconstructableError may be thrown. If False, then + reference counting will be used to decide which objects are safe to + evict and when under memory pressure, ray.ObjectStoreFullError may + be thrown. Returns: Address information about the started processes. @@ -695,6 +702,18 @@ def init(address=None, if node_ip_address is not None: node_ip_address = services.address_to_ip(node_ip_address) + _internal_config = (json.loads(_internal_config) + if _internal_config else {}) + # Set the internal config options for LRU eviction. + if lru_evict: + # Turn off object pinning. + if _internal_config.get("object_pinning_enabled", False): + raise Exception( + "Object pinning cannot be enabled if using LRU eviction.") + _internal_config["object_pinning_enabled"] = False + _internal_config["object_store_full_max_retries"] = -1 + _internal_config["free_objects_period_milliseconds"] = 1000 + global _global_node if driver_mode == LOCAL_MODE: # If starting Ray in LOCAL_MODE, don't start any other processes. @@ -779,8 +798,9 @@ def init(address=None, raise ValueError("When connecting to an existing cluster, " "raylet_socket_name must not be provided.") if _internal_config is not None: - raise ValueError("When connecting to an existing cluster, " - "_internal_config must not be provided.") + logger.warning( + "When connecting to an existing cluster, " + "_internal_config must match the cluster's _internal_config.") # In this case, we only need to connect the node. ray_params = ray.parameter.RayParams( @@ -789,7 +809,8 @@ def init(address=None, redis_password=redis_password, object_id_seed=object_id_seed, temp_dir=temp_dir, - load_code_from_local=load_code_from_local) + load_code_from_local=load_code_from_local, + _internal_config=_internal_config) _global_node = ray.node.Node( ray_params, head=False, @@ -804,8 +825,7 @@ def init(address=None, worker=global_worker, driver_object_store_memory=driver_object_store_memory, job_id=job_id, - internal_config=json.loads(_internal_config) - if _internal_config else {}) + internal_config=_internal_config) for hook in _post_init_hooks: hook() diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index ee249bd49..66986183a 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -187,6 +187,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, plasma_store_provider_.reset(new CoreWorkerPlasmaStoreProvider( store_socket, local_raylet_client_, check_signals_, + /*evict_if_full=*/RayConfig::instance().object_pinning_enabled(), boost::bind(&CoreWorker::TriggerGlobalGC, this))); memory_store_.reset(new CoreWorkerMemoryStore( [this](const RayObject &obj, const ObjectID &obj_id) { diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index d8f914126..7b8481326 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -24,10 +24,12 @@ namespace ray { CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider( const std::string &store_socket, const std::shared_ptr raylet_client, - std::function check_signals, std::function on_store_full) - : raylet_client_(raylet_client) { - check_signals_ = check_signals; - on_store_full_ = on_store_full; + std::function check_signals, bool evict_if_full, + std::function on_store_full) + : raylet_client_(raylet_client), + check_signals_(check_signals), + evict_if_full_(evict_if_full), + on_store_full_(on_store_full) { RAY_ARROW_CHECK_OK(store_client_.Connect(store_socket)); } @@ -75,15 +77,20 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr &meta uint32_t delay = RayConfig::instance().object_store_full_initial_delay_ms(); Status status; bool should_retry = true; + // If we cannot retry, then always evict on the first attempt. + bool evict_if_full = max_retries == 0 ? true : evict_if_full_; while (should_retry) { should_retry = false; arrow::Status plasma_status; std::shared_ptr arrow_buffer; { std::lock_guard guard(store_client_mutex_); - plasma_status = store_client_.Create( - object_id.ToPlasmaId(), data_size, metadata ? metadata->Data() : nullptr, - metadata ? metadata->Size() : 0, &arrow_buffer); + plasma_status = store_client_.Create(object_id.ToPlasmaId(), data_size, + metadata ? metadata->Data() : nullptr, + metadata ? metadata->Size() : 0, &arrow_buffer, + /*device_num=*/0, evict_if_full); + // Always try to evict after the first attempt. + evict_if_full = true; } if (plasma::IsPlasmaStoreFull(plasma_status)) { std::ostringstream message; @@ -101,8 +108,8 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr &meta retries += 1; should_retry = true; } else { - RAY_LOG(ERROR) << "Failed to put object " << object_id << " after " << max_retries - << " attempts. Plasma store status:\n" + RAY_LOG(ERROR) << "Failed to put object " << object_id << " after " + << (max_retries + 1) << " attempts. Plasma store status:\n" << MemoryUsageString(); } } else if (plasma::IsPlasmaObjectExists(plasma_status)) { diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index c5b8f7a34..74045065a 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -35,7 +35,7 @@ class CoreWorkerPlasmaStoreProvider { public: CoreWorkerPlasmaStoreProvider(const std::string &store_socket, const std::shared_ptr raylet_client, - std::function check_signals, + std::function check_signals, bool evict_if_full, std::function on_store_full = nullptr); ~CoreWorkerPlasmaStoreProvider(); @@ -134,6 +134,7 @@ class CoreWorkerPlasmaStoreProvider { plasma::PlasmaClient store_client_; std::mutex store_client_mutex_; std::function check_signals_; + const bool evict_if_full_; std::function on_store_full_; };