[core] Option to fallback to LRU on OutOfMemory (#7410)

* Add a test for LRU fallback

* Update error message

* Upgrade arrow to master

* Integrate with arrow

* Revert "Bazel mirrors (#7385)"

This reverts commit 44aded5272.

* Don't LRU evict

* Revert "Revert "Bazel mirrors (#7385)""

This reverts commit b6359fea78d1bd3925452ca88ac71e0c9e5c7dd3.

* Add lru_evict flag

* fix internal config

* Fix

* upgrade arrow

* debug

* Set free period in config for lru_evict, override max retries to fix
test

* Fix test?

* fix test

* Revert "debug"

This reverts commit 98f01c63a267f38218f5047b1866e4c1c8280017.

* fix exception str

* Fix ref count test

* Shorten travis test?
This commit is contained in:
Stephanie Wang
2020-03-14 11:28:43 -07:00
committed by GitHub
parent 52cf77f5a9
commit 53549314c5
10 changed files with 123 additions and 36 deletions
+2 -2
View File
@@ -166,8 +166,8 @@ def ray_deps_setup():
auto_http_archive(
name = "plasma",
build_file = True,
url = "https://github.com/apache/arrow/archive/66b05abc267661172286b47b9246ad55f1581555.tar.gz",
sha256 = "fb0227005116f64dca4b19b451ae793e9a2591c019136b70424ebe3d4f5334fe",
url = "https://github.com/apache/arrow/archive/af45b9212156980f55c399e2e88b4e19b4bb8ec1.tar.gz",
sha256 = "2f0aaa50053792aa274b402f2530e63c1542085021cfef83beee9281412c12f6",
patches = [
"//thirdparty/patches:arrow-headers-unused.patch",
"//thirdparty/patches:arrow-windows-export.patch",
+4
View File
@@ -1,3 +1,4 @@
import json
import logging
import time
@@ -77,6 +78,9 @@ class Cluster:
"num_gpus": 0,
"object_store_memory": 150 * 1024 * 1024, # 150 MiB
}
if "_internal_config" in node_args:
node_args["_internal_config"] = json.loads(
node_args["_internal_config"])
ray_params = ray.parameter.RayParams(**node_args)
ray_params.update_if_absent(**default_kwargs)
if self.head_node is None:
+11 -1
View File
@@ -129,7 +129,17 @@ class ObjectStoreFullError(RayError):
This is raised if the attempt to store the object fails
because the object store is full even after multiple retries.
"""
pass
def __str__(self):
return super(ObjectStoreFullError, self).__str__() + (
"\n"
"The local object store is full of objects that are still in scope"
" and cannot be evicted. Try increasing the object store memory "
"available with ray.init(object_store_memory=<bytes>). "
"You can also try setting an option to fallback to LRU eviction "
"when the object store is full by calling "
"ray.init(lru_evict=True). See also: "
"https://ray.readthedocs.io/en/latest/memory-management.html.")
class UnreconstructableError(RayError):
+1 -3
View File
@@ -2,7 +2,6 @@ import atexit
import collections
import datetime
import errno
import json
import os
import logging
import signal
@@ -91,8 +90,7 @@ class Node:
self._resource_spec = None
self._ray_params = ray_params
self._redis_address = ray_params.redis_address
self._config = (json.loads(ray_params._internal_config)
if ray_params._internal_config else None)
self._config = ray_params._internal_config
if head:
redis_client = None
+41
View File
@@ -939,6 +939,47 @@ def test_fill_object_store_exception(shutdown_only):
ray.put(np.zeros(10**8 + 2, dtype=np.uint8))
def test_fill_object_store_lru_fallback(shutdown_only):
ray.init(num_cpus=2, object_store_memory=10**8, lru_evict=True)
@ray.remote
def expensive_task():
return np.zeros((10**8) // 2, dtype=np.uint8)
oids = []
for _ in range(3):
oid = expensive_task.remote()
ray.get(oid)
oids.append(oid)
@ray.remote
class LargeMemoryActor:
def some_expensive_task(self):
return np.zeros(10**8 // 2, dtype=np.uint8)
def test(self):
return 1
actor = LargeMemoryActor.remote()
for _ in range(3):
oid = actor.some_expensive_task.remote()
ray.get(oid)
oids.append(oid)
# Make sure actor does not die
ray.get(actor.test.remote())
for _ in range(3):
oid = ray.put(np.zeros(10**8 // 2, dtype=np.uint8))
ray.get(oid)
oids.append(oid)
# NOTE: Needed to unset the config set by the lru_evict flag, for Travis.
ray._raylet.set_internal_config({
"object_pinning_enabled": 1,
"object_store_full_max_retries": 5,
})
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_nodes": 1,
+19 -14
View File
@@ -23,7 +23,8 @@ logger = logging.getLogger(__name__)
def one_worker_100MiB(request):
config = json.dumps({
"distributed_ref_counting_enabled": 1,
"object_store_full_max_retries": 1,
"object_store_full_max_retries": 3,
"object_store_full_initial_delay_ms": 100,
})
yield ray.init(
num_cpus=1,
@@ -707,40 +708,44 @@ def test_recursively_pass_returned_object_id(one_worker_100MiB, use_ray_put):
@ray.remote
def return_an_id():
return [
put_object(
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
]
return put_object(
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
@ray.remote
def recursive(ref, signal, max_depth, depth=0):
ray.get(ref[0])
inner_id = ray.get(ref[0])
if depth == max_depth:
return ray.get(signal.wait.remote())
ray.get(signal.wait.remote())
return inner_id
else:
return recursive.remote(ref, signal, max_depth, depth + 1)
max_depth = 5
outer_oid = return_an_id.remote()
inner_oid_bytes = ray.get(outer_oid)[0].binary()
signal = SignalActor.remote()
head_oid = recursive.remote([outer_oid], signal, max_depth)
# Remove the local reference.
del outer_oid
tail_oid = head_oid
outer_oid = head_oid
for _ in range(max_depth):
tail_oid = ray.get(tail_oid)
outer_oid = ray.get(outer_oid)
# Check that the remote reference pins the object.
_fill_object_store_and_get(inner_oid_bytes)
# Fill the object store.
_fill_object_store_and_get(outer_oid, succeed=False)
# Fulfill the dependency, causing the tail task to finish.
ray.get(signal.send.remote())
ray.get(tail_oid)
# Check that the remote reference pins the object.
inner_oid = ray.get(outer_oid)
_fill_object_store_and_get(inner_oid)
inner_oid_bytes = inner_oid.binary()
# Reference should be gone, check that returned ID gets evicted.
del head_oid
del outer_oid
del inner_oid
_fill_object_store_and_get(inner_oid_bytes, succeed=False)
+26 -6
View File
@@ -550,7 +550,8 @@ def init(address=None,
temp_dir=None,
load_code_from_local=False,
use_pickle=True,
_internal_config=None):
_internal_config=None,
lru_evict=False):
"""Connect to an existing Ray cluster or start one and connect to it.
This method handles two cases. Either a Ray cluster already exists and we
@@ -646,6 +647,12 @@ def init(address=None,
use_pickle: Deprecated.
_internal_config (str): JSON configuration for overriding
RayConfig defaults. For testing purposes ONLY.
lru_evict (bool): If True, when an object store is full, it will evict
objects in LRU order to make more space and when under memory
pressure, ray.UnreconstructableError may be thrown. If False, then
reference counting will be used to decide which objects are safe to
evict and when under memory pressure, ray.ObjectStoreFullError may
be thrown.
Returns:
Address information about the started processes.
@@ -695,6 +702,18 @@ def init(address=None,
if node_ip_address is not None:
node_ip_address = services.address_to_ip(node_ip_address)
_internal_config = (json.loads(_internal_config)
if _internal_config else {})
# Set the internal config options for LRU eviction.
if lru_evict:
# Turn off object pinning.
if _internal_config.get("object_pinning_enabled", False):
raise Exception(
"Object pinning cannot be enabled if using LRU eviction.")
_internal_config["object_pinning_enabled"] = False
_internal_config["object_store_full_max_retries"] = -1
_internal_config["free_objects_period_milliseconds"] = 1000
global _global_node
if driver_mode == LOCAL_MODE:
# If starting Ray in LOCAL_MODE, don't start any other processes.
@@ -779,8 +798,9 @@ def init(address=None,
raise ValueError("When connecting to an existing cluster, "
"raylet_socket_name must not be provided.")
if _internal_config is not None:
raise ValueError("When connecting to an existing cluster, "
"_internal_config must not be provided.")
logger.warning(
"When connecting to an existing cluster, "
"_internal_config must match the cluster's _internal_config.")
# In this case, we only need to connect the node.
ray_params = ray.parameter.RayParams(
@@ -789,7 +809,8 @@ def init(address=None,
redis_password=redis_password,
object_id_seed=object_id_seed,
temp_dir=temp_dir,
load_code_from_local=load_code_from_local)
load_code_from_local=load_code_from_local,
_internal_config=_internal_config)
_global_node = ray.node.Node(
ray_params,
head=False,
@@ -804,8 +825,7 @@ def init(address=None,
worker=global_worker,
driver_object_store_memory=driver_object_store_memory,
job_id=job_id,
internal_config=json.loads(_internal_config)
if _internal_config else {})
internal_config=_internal_config)
for hook in _post_init_hooks:
hook()
+1
View File
@@ -187,6 +187,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
plasma_store_provider_.reset(new CoreWorkerPlasmaStoreProvider(
store_socket, local_raylet_client_, check_signals_,
/*evict_if_full=*/RayConfig::instance().object_pinning_enabled(),
boost::bind(&CoreWorker::TriggerGlobalGC, this)));
memory_store_.reset(new CoreWorkerMemoryStore(
[this](const RayObject &obj, const ObjectID &obj_id) {
@@ -24,10 +24,12 @@ namespace ray {
CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider(
const std::string &store_socket,
const std::shared_ptr<raylet::RayletClient> raylet_client,
std::function<Status()> check_signals, std::function<void()> on_store_full)
: raylet_client_(raylet_client) {
check_signals_ = check_signals;
on_store_full_ = on_store_full;
std::function<Status()> check_signals, bool evict_if_full,
std::function<void()> on_store_full)
: raylet_client_(raylet_client),
check_signals_(check_signals),
evict_if_full_(evict_if_full),
on_store_full_(on_store_full) {
RAY_ARROW_CHECK_OK(store_client_.Connect(store_socket));
}
@@ -75,15 +77,20 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr<Buffer> &meta
uint32_t delay = RayConfig::instance().object_store_full_initial_delay_ms();
Status status;
bool should_retry = true;
// If we cannot retry, then always evict on the first attempt.
bool evict_if_full = max_retries == 0 ? true : evict_if_full_;
while (should_retry) {
should_retry = false;
arrow::Status plasma_status;
std::shared_ptr<arrow::Buffer> arrow_buffer;
{
std::lock_guard<std::mutex> guard(store_client_mutex_);
plasma_status = store_client_.Create(
object_id.ToPlasmaId(), data_size, metadata ? metadata->Data() : nullptr,
metadata ? metadata->Size() : 0, &arrow_buffer);
plasma_status = store_client_.Create(object_id.ToPlasmaId(), data_size,
metadata ? metadata->Data() : nullptr,
metadata ? metadata->Size() : 0, &arrow_buffer,
/*device_num=*/0, evict_if_full);
// Always try to evict after the first attempt.
evict_if_full = true;
}
if (plasma::IsPlasmaStoreFull(plasma_status)) {
std::ostringstream message;
@@ -101,8 +108,8 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr<Buffer> &meta
retries += 1;
should_retry = true;
} else {
RAY_LOG(ERROR) << "Failed to put object " << object_id << " after " << max_retries
<< " attempts. Plasma store status:\n"
RAY_LOG(ERROR) << "Failed to put object " << object_id << " after "
<< (max_retries + 1) << " attempts. Plasma store status:\n"
<< MemoryUsageString();
}
} else if (plasma::IsPlasmaObjectExists(plasma_status)) {
@@ -35,7 +35,7 @@ class CoreWorkerPlasmaStoreProvider {
public:
CoreWorkerPlasmaStoreProvider(const std::string &store_socket,
const std::shared_ptr<raylet::RayletClient> raylet_client,
std::function<Status()> check_signals,
std::function<Status()> check_signals, bool evict_if_full,
std::function<void()> on_store_full = nullptr);
~CoreWorkerPlasmaStoreProvider();
@@ -134,6 +134,7 @@ class CoreWorkerPlasmaStoreProvider {
plasma::PlasmaClient store_client_;
std::mutex store_client_mutex_;
std::function<Status()> check_signals_;
const bool evict_if_full_;
std::function<void()> on_store_full_;
};