From 9c00616cdc4274bd38f4c516688a44f6a63dc3e9 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sat, 27 Jul 2019 01:20:13 -0700 Subject: [PATCH] Retry and exception for hang on memory store full (#5143) --- bazel/ray_deps_setup.bzl | 2 +- build.sh | 2 +- python/ray/ray_constants.py | 6 +++ python/ray/tests/test_failure.py | 46 +++++++++++++++++++++++ python/ray/worker.py | 64 ++++++++++++++++++++++---------- 5 files changed, 98 insertions(+), 22 deletions(-) diff --git a/bazel/ray_deps_setup.bzl b/bazel/ray_deps_setup.bzl index 25309a6fe..0eddcdbae 100644 --- a/bazel/ray_deps_setup.bzl +++ b/bazel/ray_deps_setup.bzl @@ -67,7 +67,7 @@ def ray_deps_setup(): new_git_repository( name = "plasma", build_file = "@//bazel:BUILD.plasma", - commit = "d0d9ecec33413f7ef6c7f91448a802666ad5f871", + commit = "f976629a54f5518f6285a311c45c5957281b1ee7", remote = "https://github.com/apache/arrow", ) diff --git a/build.sh b/build.sh index b7d97aeb5..64d42467f 100755 --- a/build.sh +++ b/build.sh @@ -102,7 +102,7 @@ pushd "$BUILD_DIR" # the commit listed in the command. $PYTHON_EXECUTABLE -m pip install \ --target="$ROOT_DIR/python/ray/pyarrow_files" pyarrow==0.14.0.RAY \ - --find-links https://s3-us-west-2.amazonaws.com/arrow-wheels/f86340a3b597502bacc801b17ab03c89d31aa561/index.html + --find-links https://s3-us-west-2.amazonaws.com/arrow-wheels/50f14adecbb83228599a2dc57859e4ecbe054b92/index.html export PYTHON_BIN_PATH="$PYTHON_EXECUTABLE" if [ "$RAY_BUILD_JAVA" == "YES" ]; then diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index 6bc744009..a432c934b 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -17,6 +17,12 @@ ID_SIZE = 20 # The default maximum number of bytes to allocate to the object store unless # overridden by the user. DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES = 20 * 10**9 +# The default number of retries to call `put` when the object store is full. +DEFAULT_PUT_OBJECT_RETRIES = 5 +# The default seconds for delay between calls to retry `put` when +# the object store is full. This delay is exponentially doubled up to +# DEFAULT_PUT_OBJECT_RETRIES times. +DEFAULT_PUT_OBJECT_DELAY = 1 # The smallest cap on the memory used by the object store that we allow. OBJECT_STORE_MINIMUM_MEMORY_BYTES = 10**7 # The default maximum number of bytes that the non-primary Redis shards are diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 8bf6f9317..d2a89bde5 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -4,6 +4,7 @@ from __future__ import print_function import json import os +import pyarrow.plasma as plasma import pytest import sys import tempfile @@ -719,3 +720,48 @@ def test_connect_with_disconnected_node(shutdown_only): # There is no connection error to a dead node. info = relevant_errors(ray_constants.RAYLET_CONNECTION_ERROR) assert len(info) == 0 + + +@pytest.mark.parametrize( + "ray_start_cluster_head", [{ + "num_cpus": 5, + "object_store_memory": 10**7 + }], + indirect=True) +@pytest.mark.parametrize("num_actors", [1, 2, 5]) +def test_parallel_actor_fill_plasma_retry(ray_start_cluster_head, num_actors): + @ray.remote + class LargeMemoryActor(object): + def some_expensive_task(self): + return np.zeros(10**7 // 2, dtype=np.uint8) + + actors = [LargeMemoryActor.remote() for _ in range(num_actors)] + for _ in range(10): + pending = [a.some_expensive_task.remote() for a in actors] + while pending: + [done], pending = ray.wait(pending, num_returns=1) + + +@pytest.mark.parametrize( + "ray_start_cluster_head", [{ + "num_cpus": 2, + "object_store_memory": 10**7 + }], + indirect=True) +def test_fill_plasma_exception(ray_start_cluster_head): + @ray.remote + class LargeMemoryActor(object): + def some_expensive_task(self): + return np.zeros(10**7 + 2, dtype=np.uint8) + + def test(self): + return 1 + + actor = LargeMemoryActor.remote() + with pytest.raises(ray.exceptions.RayTaskError): + ray.get(actor.some_expensive_task.remote()) + # Make sure actor does not die + ray.get(actor.test.remote()) + + with pytest.raises(plasma.PlasmaStoreFull): + ray.put(np.zeros(10**7 + 2, dtype=np.uint8)) diff --git a/python/ray/worker.py b/python/ray/worker.py index 57dd263e7..91117aca1 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -362,10 +362,14 @@ class Worker(object): logger.warning(warning_message) def put_object(self, object_id, value): - """Put value in the local object store with object id objectid. + """Put value in the local object store with object id `objectid`. - This assumes that the value for objectid has not yet been placed in the - local object store. + This assumes that the value for `objectid` has not yet been placed in + the local object store. If the plasma store is full, the worker will + automatically retry up to DEFAULT_PUT_OBJECT_RETRIES times. Each + retry will delay for an exponentially doubling amount of time, + starting with DEFAULT_PUT_OBJECT_DELAY. After this, exception + will be raised. Args: object_id (object_id.ObjectID): The object ID of the value to be @@ -373,10 +377,9 @@ class Worker(object): value: The value to put in the object store. Raises: - Exception: An exception is raised if the attempt to store the - object fails. This can happen if there is already an object - with the same ID in the object store or if the object store is - full. + plasma.PlasmaStoreFull: This is raised if the attempt to store the + object fails because the object store is full even after + multiple retries. """ # Make sure that the value is not an object ID. if isinstance(value, ObjectID): @@ -387,27 +390,48 @@ class Worker(object): "do this, you can wrap the ray.ObjectID in a list and " "call 'put' on it (or return it).") - # Serialize and put the object in the object store. + delay = ray_constants.DEFAULT_PUT_OBJECT_DELAY + for attempt in reversed( + range(ray_constants.DEFAULT_PUT_OBJECT_RETRIES)): + try: + self._try_store_and_register(object_id, value) + break + except pyarrow.plasma.PlasmaStoreFull as plasma_exc: + if attempt: + logger.debug( + "Waiting {} secs for plasma to drain.".format(delay)) + time.sleep(delay) + delay *= 2 + else: + raise plasma_exc + + def _try_store_and_register(self, object_id, value): + """Wraps `store_and_register` with cases for existence and pickling. + + Args: + object_id (object_id.ObjectID): The object ID of the value to be + put. + value: The value to put in the object store. + """ try: self.store_and_register(object_id, value) except pyarrow.plasma.PlasmaObjectExists: # The object already exists in the object store, so there is no - # need to add it again. TODO(rkn): We need to compare the hashes + # need to add it again. TODO(rkn): We need to compare hashes # and make sure that the objects are in fact the same. We also - # should return an error code to the caller instead of printing a + # should return an error code to caller instead of printing a # message. - logger.info( - "The object with ID {} already exists in the object store.". - format(object_id)) + logger.info("The object with ID {} already exists " + "in the object store.".format(object_id)) except TypeError: - # This error can happen because one of the members of the object - # may not be serializable for cloudpickle. So we need these extra - # fallbacks here to start from the beginning. Hopefully the object - # could have a `__reduce__` method. + # TypeError can happen because one of the members of the object + # may not be serializable for cloudpickle. So we need + # these extra fallbacks here to start from the beginning. + # Hopefully the object could have a `__reduce__` method. register_custom_serializer(type(value), use_pickle=True) - warning_message = ( - "WARNING: Serializing the class {} failed, " - "so are are falling back to cloudpickle.".format(type(value))) + warning_message = ("WARNING: Serializing the class {} failed, " + "falling back to cloudpickle.".format( + type(value))) logger.warning(warning_message) self.store_and_register(object_id, value)