Warn on resource deadlock; improve object store error messages (#5555)

* wip

* wip

* wip

* wip

* wip

* add impl

* second

* warn once
This commit is contained in:
Eric Liang
2019-08-30 16:45:54 -07:00
committed by GitHub
parent bea43c85b1
commit 3e70daba74
6 changed files with 121 additions and 12 deletions
+8 -2
View File
@@ -105,8 +105,14 @@ class UnreconstructableError(RayError):
self.object_id = object_id
def __str__(self):
return ("Object {} is lost (either evicted or explicitly deleted) and "
+ "cannot be reconstructed.").format(self.object_id.hex())
return (
"Object {} is lost (either LRU evicted or deleted by user) and "
"cannot be reconstructed. Try increasing the object store "
"memory available with ray.init(object_store_memory=<bytes>) "
"or setting object store limits with "
"ray.remote(object_store_memory=<bytes>). See also: {}".format(
self.object_id.hex(),
"https://ray.readthedocs.io/en/latest/memory-management.html"))
RAY_EXCEPTION_TYPES = [
+1
View File
@@ -113,6 +113,7 @@ WORKER_DIED_PUSH_ERROR = "worker_died"
WORKER_POOL_LARGE_ERROR = "worker_pool_large"
PUT_RECONSTRUCTION_PUSH_ERROR = "put_reconstruction"
INFEASIBLE_TASK_ERROR = "infeasible_task"
RESOURCE_DEADLOCK_ERROR = "resource_deadlock"
REMOVED_NODE_ERROR = "node_removed"
MONITOR_DIED_ERROR = "monitor_died"
LOG_MONITOR_DIED_ERROR = "log_monitor_died"
+21
View File
@@ -527,6 +527,27 @@ def test_export_large_objects(ray_start_regular):
wait_for_errors(ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR, 2)
def test_warning_for_resource_deadlock(shutdown_only):
# Check that we get warning messages for infeasible tasks.
ray.init(num_cpus=1)
@ray.remote(num_cpus=1)
class Foo(object):
def f(self):
return 0
@ray.remote
def f():
# Creating both actors is not possible.
actors = [Foo.remote() for _ in range(2)]
for a in actors:
ray.get(a.f.remote())
# Run in a task to check we handle the blocked task case correctly
f.remote()
wait_for_errors(ray_constants.RESOURCE_DEADLOCK_ERROR, 1, timeout=30)
def test_warning_for_infeasible_tasks(ray_start_regular):
# Check that we get warning messages for infeasible tasks.
+15 -7
View File
@@ -398,13 +398,20 @@ class Worker(object):
break
except pyarrow.plasma.PlasmaStoreFull as plasma_exc:
if attempt:
logger.debug(
"Waiting {} secs for plasma to drain.".format(delay))
logger.warning("Waiting {} seconds for space to free up "
"in the object store.".format(delay))
time.sleep(delay)
delay *= 2
else:
self.dump_object_store_memory_usage()
raise plasma_exc
def dump_object_store_memory_usage(self):
"""Prints object store debug string to stdout."""
msg = "\n" + self.plasma_client.debug_string()
msg = msg.replace("\n", "\nplasma: ")
logger.warning("Local object store memory usage:\n{}\n".format(msg))
def _try_store_and_register(self, object_id, value):
"""Wraps `store_and_register` with cases for existence and pickling.
@@ -1007,14 +1014,13 @@ class Worker(object):
self.plasma_client.set_client_options(client_name,
object_store_memory)
except pyarrow._plasma.PlasmaStoreFull:
self.dump_object_store_memory_usage()
raise memory_monitor.RayOutOfMemoryError(
"Failed to set object_store_memory={} for {}. The "
"plasma store may have insufficient memory remaining "
"to satisfy this limit (30% of object store memory is "
"permanently reserved for shared usage). The current "
"object store memory status is:\n\n{}".format(
object_store_memory, client_name,
self.plasma_client.debug_string()))
"permanently reserved for shared usage).".format(
object_store_memory, client_name))
def _handle_process_task_failure(self, function_descriptor,
return_object_ids, error, backtrace):
@@ -1788,7 +1794,7 @@ def listen_error_messages_raylet(worker, task_error_queue, threads_stopped):
# Delay it a bit to see if we can suppress it
task_error_queue.put((error_message, time.time()))
else:
logger.error(error_message)
logger.warn(error_message)
except (OSError, redis.exceptions.ConnectionError) as e:
logger.error("listen_error_messages_raylet: {}".format(e))
finally:
@@ -2329,6 +2335,8 @@ def get(object_ids):
for i, value in enumerate(values):
if isinstance(value, RayError):
last_task_error_raise_time = time.time()
if isinstance(value, ray.exceptions.UnreconstructableError):
worker.dump_object_store_memory_usage()
raise value
# Run post processors.