Retry and exception for hang on memory store full (#5143)

This commit is contained in:
Richard Liaw
2019-07-27 01:20:13 -07:00
committed by GitHub
parent 5e15b36d6e
commit 9c00616cdc
5 changed files with 98 additions and 22 deletions
+1 -1
View File
@@ -67,7 +67,7 @@ def ray_deps_setup():
new_git_repository(
name = "plasma",
build_file = "@//bazel:BUILD.plasma",
commit = "d0d9ecec33413f7ef6c7f91448a802666ad5f871",
commit = "f976629a54f5518f6285a311c45c5957281b1ee7",
remote = "https://github.com/apache/arrow",
)
+1 -1
View File
@@ -102,7 +102,7 @@ pushd "$BUILD_DIR"
# the commit listed in the command.
$PYTHON_EXECUTABLE -m pip install \
--target="$ROOT_DIR/python/ray/pyarrow_files" pyarrow==0.14.0.RAY \
--find-links https://s3-us-west-2.amazonaws.com/arrow-wheels/f86340a3b597502bacc801b17ab03c89d31aa561/index.html
--find-links https://s3-us-west-2.amazonaws.com/arrow-wheels/50f14adecbb83228599a2dc57859e4ecbe054b92/index.html
export PYTHON_BIN_PATH="$PYTHON_EXECUTABLE"
if [ "$RAY_BUILD_JAVA" == "YES" ]; then
+6
View File
@@ -17,6 +17,12 @@ ID_SIZE = 20
# The default maximum number of bytes to allocate to the object store unless
# overridden by the user.
DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES = 20 * 10**9
# The default number of attempts made by `put` when the object store is full
# (the first try counts as one attempt).
DEFAULT_PUT_OBJECT_RETRIES = 5
# The default delay, in seconds, before the first retry of `put` when the
# object store is full. The delay doubles after every failed attempt, so it is
# applied at most DEFAULT_PUT_OBJECT_RETRIES - 1 times.
DEFAULT_PUT_OBJECT_DELAY = 1
# The smallest cap on the memory used by the object store that we allow.
OBJECT_STORE_MINIMUM_MEMORY_BYTES = 10**7
# The default maximum number of bytes that the non-primary Redis shards are
+46
View File
@@ -4,6 +4,7 @@ from __future__ import print_function
import json
import os
import pyarrow.plasma as plasma
import pytest
import sys
import tempfile
@@ -719,3 +720,48 @@ def test_connect_with_disconnected_node(shutdown_only):
# There is no connection error to a dead node.
info = relevant_errors(ray_constants.RAYLET_CONNECTION_ERROR)
assert len(info) == 0
@pytest.mark.parametrize(
    "ray_start_cluster_head",
    [{"num_cpus": 5, "object_store_memory": 10**7}],
    indirect=True)
@pytest.mark.parametrize("num_actors", [1, 2, 5])
def test_parallel_actor_fill_plasma_retry(ray_start_cluster_head, num_actors):
    """Drive several actors that each return an array half the store's size.

    The cluster head is started with a 10**7-byte object store. For ten
    rounds, every actor is asked for a (10**7 // 2)-byte uint8 array and the
    results are collected one at a time with `ray.wait`. The test passes if
    every round drains without an error being raised.
    """

    @ray.remote
    class LargeMemoryActor(object):
        def some_expensive_task(self):
            # Half of the configured object store capacity.
            return np.zeros(10**7 // 2, dtype=np.uint8)

    actors = [LargeMemoryActor.remote() for _ in range(num_actors)]
    for _round in range(10):
        outstanding = [
            actor.some_expensive_task.remote() for actor in actors
        ]
        # Pull results one by one; unpacking into a one-element list checks
        # that exactly one object ID is reported ready per call.
        while outstanding:
            [finished], outstanding = ray.wait(outstanding, num_returns=1)
@pytest.mark.parametrize(
    "ray_start_cluster_head",
    [{"num_cpus": 2, "object_store_memory": 10**7}],
    indirect=True)
def test_fill_plasma_exception(ray_start_cluster_head):
    """Check that an object larger than the store raises instead of hanging.

    The object store is capped at 10**7 bytes and the arrays used here are
    10**7 + 2 bytes. Returning one from an actor task must surface as a
    `RayTaskError` to the caller while the actor stays usable, and putting
    one directly from the driver must raise `plasma.PlasmaStoreFull`.
    """

    @ray.remote
    class LargeMemoryActor(object):
        def some_expensive_task(self):
            # Two bytes larger than the whole object store.
            return np.zeros(10**7 + 2, dtype=np.uint8)

        def test(self):
            return 1

    actor = LargeMemoryActor.remote()

    with pytest.raises(ray.exceptions.RayTaskError):
        ray.get(actor.some_expensive_task.remote())

    # The failed task must not have killed the actor: a follow-up call
    # still has to return normally.
    ray.get(actor.test.remote())

    oversized = np.zeros(10**7 + 2, dtype=np.uint8)
    with pytest.raises(plasma.PlasmaStoreFull):
        ray.put(oversized)
+44 -20
View File
@@ -362,10 +362,14 @@ class Worker(object):
logger.warning(warning_message)
def put_object(self, object_id, value):
"""Put value in the local object store with object id objectid.
"""Put value in the local object store with object id `object_id`.
This assumes that the value for objectid has not yet been placed in the
local object store.
This assumes that the value for `object_id` has not yet been placed in
the local object store. If the plasma store is full, the worker will
automatically retry, making up to DEFAULT_PUT_OBJECT_RETRIES attempts in
total. The delay before each retry doubles, starting with
DEFAULT_PUT_OBJECT_DELAY seconds. If the final attempt also fails, the
exception is raised.
Args:
object_id (object_id.ObjectID): The object ID of the value to be
@@ -373,10 +377,9 @@ class Worker(object):
value: The value to put in the object store.
Raises:
Exception: An exception is raised if the attempt to store the
object fails. This can happen if there is already an object
with the same ID in the object store or if the object store is
full.
plasma.PlasmaStoreFull: This is raised if the attempt to store the
object fails because the object store is full even after
multiple retries.
"""
# Make sure that the value is not an object ID.
if isinstance(value, ObjectID):
@@ -387,27 +390,48 @@ class Worker(object):
"do this, you can wrap the ray.ObjectID in a list and "
"call 'put' on it (or return it).")
# Serialize and put the object in the object store.
delay = ray_constants.DEFAULT_PUT_OBJECT_DELAY
for attempt in reversed(
range(ray_constants.DEFAULT_PUT_OBJECT_RETRIES)):
try:
self._try_store_and_register(object_id, value)
break
except pyarrow.plasma.PlasmaStoreFull as plasma_exc:
if attempt:
logger.debug(
"Waiting {} secs for plasma to drain.".format(delay))
time.sleep(delay)
delay *= 2
else:
raise plasma_exc
def _try_store_and_register(self, object_id, value):
    """Wraps `store_and_register` with cases for existence and pickling.

    Two failure modes of `store_and_register` are handled here:

    * `pyarrow.plasma.PlasmaObjectExists`: logged at info level and
      otherwise ignored.
    * `TypeError`: the type of `value` is registered for pickle-based
      serialization and the store is attempted exactly once more.

    Any other exception propagates to the caller unchanged. That includes
    `pyarrow.plasma.PlasmaStoreFull`, which `put_object` catches in order
    to retry.

    Args:
        object_id (object_id.ObjectID): The object ID of the value to be
            put.
        value: The value to put in the object store.
    """
    try:
        self.store_and_register(object_id, value)
    except pyarrow.plasma.PlasmaObjectExists:
        # The object already exists in the object store, so there is no
        # need to add it again. TODO(rkn): We need to compare hashes
        # and make sure that the objects are in fact the same. We also
        # should return an error code to caller instead of printing a
        # message.
        logger.info("The object with ID {} already exists "
                    "in the object store.".format(object_id))
    except TypeError:
        # TypeError can happen because one of the members of the object
        # may not be serializable for cloudpickle. So we need
        # these extra fallbacks here to start from the beginning.
        # Hopefully the object could have a `__reduce__` method.
        register_custom_serializer(type(value), use_pickle=True)
        warning_message = ("WARNING: Serializing the class {} failed, "
                           "falling back to cloudpickle.".format(
                               type(value)))
        logger.warning(warning_message)
        # Second and last attempt; an exception raised here is not
        # handled by this method.
        self.store_and_register(object_id, value)