From 5780ec1b62c73742ec9168944fb94a49253fc2db Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Sun, 10 Nov 2019 23:12:59 -0800 Subject: [PATCH] Refresh ObjectIDs in raylet for stopgap GC (#6109) --- doc/source/memory-management.rst | 2 +- python/ray/tests/test_garbage_collection.py | 71 +++++++++++++++++++ python/ray/tests/test_memory_limits.py | 6 +- .../tests/test_unreconstructable_errors.py | 6 +- src/ray/raylet/node_manager.cc | 11 +++ 5 files changed, 93 insertions(+), 3 deletions(-) create mode 100644 python/ray/tests/test_garbage_collection.py diff --git a/doc/source/memory-management.rst b/doc/source/memory-management.rst index 82f54e279..de3503c1d 100644 --- a/doc/source/memory-management.rst +++ b/doc/source/memory-management.rst @@ -16,7 +16,7 @@ Ray system memory: this is memory used internally by Ray Application memory: this is memory used by your application - **Worker heap**: memory used by your application (e.g., in Python code or TensorFlow), best measured as the *resident set size (RSS)* of your application minus its *shared memory usage (SHR)* in commands such as ``top``. - - **Object store memory**: memory used when your application creates objects in the objects store via ``ray.put`` and when returning values from remote functions. Objects are LRU evicted when the store is full. There is an object store server running on each node. + - **Object store memory**: memory used when your application creates objects in the objects store via ``ray.put`` and when returning values from remote functions. Objects are LRU evicted when the store is full, prioritizing objects that are no longer in scope on the driver or any worker. There is an object store server running on each node. - **Object store shared memory**: memory used when your application reads objects via ``ray.get``. Note that if an object is already present on the node, this does not cause additional allocations. This allows large objects to be efficiently shared among many actors and tasks. By default, Ray will cap the memory used by Redis at ``min(30% of node memory, 10GiB)``, and object store at ``min(10% of node memory, 20GiB)``, leaving half of the remaining memory on the node available for use by worker heap. You can also manually configure this by setting ``redis_max_memory=`` and ``object_store_memory=`` on Ray init. diff --git a/python/ray/tests/test_garbage_collection.py b/python/ray/tests/test_garbage_collection.py new file mode 100644 index 000000000..6bc485d45 --- /dev/null +++ b/python/ray/tests/test_garbage_collection.py @@ -0,0 +1,71 @@ +# coding: utf-8 +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np +import time +import logging + +import ray +import ray.tests.cluster_utils +import ray.tests.utils + +logger = logging.getLogger(__name__) + + +def test_basic_gc(shutdown_only): + ray.init(object_store_memory=100 * 1024 * 1024, use_pickle=True) + + @ray.remote + def shuffle(input): + return np.random.shuffle(input) + + @ray.remote + class Actor(object): + def __init__(self): + # Hold a long-lived reference to a ray.put object. This should not + # be garbage collected while the actor is alive. + self.large_object = ray.put( + np.zeros(25 * 1024 * 1024, dtype=np.uint8), weakref=True) + + def get_large_object(self): + return ray.get(self.large_object) + + actor = Actor.remote() + + # Fill up the object store with short-lived objects. These should be + # evicted before the long-lived object whose reference is held by + # the actor. + for batch in range(10): + intermediate_result = shuffle.remote( + np.zeros(10 * 1024 * 1024, dtype=np.uint8)) + ray.get(intermediate_result) + + # The ray.get below would fail with only LRU eviction, as the object + # that was ray.put by the actor would have been evicted. + ray.get(actor.get_large_object.remote()) + + +def test_pending_task_dependency(shutdown_only): + ray.init(object_store_memory=100 * 1024 * 1024, use_pickle=True) + + @ray.remote + def pending(input1, input2): + return + + @ray.remote + def slow(): + time.sleep(5) + + # The object that is ray.put here will go out of scope immediately, so if + # pending task dependencies aren't considered, it will be evicted before + # the ray.get below due to the subsequent ray.puts that fill up the object + # store. + np_array = np.zeros(40 * 1024 * 1024, dtype=np.uint8) + oid = pending.remote(ray.put(np_array), slow.remote()) + + for _ in range(2): + ray.put(np_array) + + ray.get(oid) diff --git a/python/ray/tests/test_memory_limits.py b/python/ray/tests/test_memory_limits.py index f4a32a37f..47ccfad09 100644 --- a/python/ray/tests/test_memory_limits.py +++ b/python/ray/tests/test_memory_limits.py @@ -66,10 +66,14 @@ class TestMemoryLimits(unittest.TestCase): z = ray.put("hi", weakref=True) a = LightActor._remote(object_store_memory=a_quota) b = GreedyActor._remote(object_store_memory=b_quota) + oids = [z] for _ in range(5): r_a = a.sample.remote() for _ in range(20): - ray.get(b.sample.remote()) + new_oid = b.sample.remote() + oids.append(new_oid) + ray.get(new_oid) + oids.append(r_a) ray.get(r_a) ray.get(z) except Exception as e: diff --git a/python/ray/tests/test_unreconstructable_errors.py b/python/ray/tests/test_unreconstructable_errors.py index 88d2b429d..7438c11c1 100644 --- a/python/ray/tests/test_unreconstructable_errors.py +++ b/python/ray/tests/test_unreconstructable_errors.py @@ -33,8 +33,12 @@ class TestUnreconstructableErrors(unittest.TestCase): x_id = f.remote(None) ray.get(x_id) + # Hold references to the ray.put objects so they aren't LRU'd. + oids = [] for _ in range(400): - ray.get([f.remote(np.zeros(10000)) for _ in range(50)]) + new_oids = [f.remote(np.zeros(10000)) for _ in range(50)] + oids.extend(new_oids) + ray.get(new_oids) self.assertRaises(ray.exceptions.UnreconstructableError, lambda: ray.get(x_id)) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 88791a793..485fb28be 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -691,7 +691,18 @@ void NodeManager::HeartbeatBatchAdded(const HeartbeatBatchTableData &heartbeat_b } HeartbeatAdded(client_id, heartbeat_data); } + RAY_LOG(DEBUG) << "Total active object IDs received: " << active_object_ids.size(); + + // Refresh the active object IDs in plasma to prevent them from being evicted. + std::vector plasma_ids; + plasma_ids.reserve(active_object_ids.size()); + for (const ObjectID &object_id : active_object_ids) { + plasma_ids.push_back(object_id.ToPlasmaId()); + } + if (!store_client_.Refresh(plasma_ids).ok()) { + RAY_LOG(WARNING) << "Failed to refresh active object IDs in plasma."; + } } void NodeManager::HandleActorStateTransition(const ActorID &actor_id,