diff --git a/python/ray/exceptions.py b/python/ray/exceptions.py index 1d1dfcd97..2159b5a1a 100644 --- a/python/ray/exceptions.py +++ b/python/ray/exceptions.py @@ -105,8 +105,14 @@ class UnreconstructableError(RayError): self.object_id = object_id def __str__(self): - return ("Object {} is lost (either evicted or explicitly deleted) and " - + "cannot be reconstructed.").format(self.object_id.hex()) + return ( + "Object {} is lost (either LRU evicted or deleted by user) and " + "cannot be reconstructed. Try increasing the object store " + "memory available with ray.init(object_store_memory=) " + "or setting object store limits with " + "ray.remote(object_store_memory=). See also: {}".format( + self.object_id.hex(), + "https://ray.readthedocs.io/en/latest/memory-management.html")) RAY_EXCEPTION_TYPES = [ diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index fe98e0baa..e6d37eea8 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -113,6 +113,7 @@ WORKER_DIED_PUSH_ERROR = "worker_died" WORKER_POOL_LARGE_ERROR = "worker_pool_large" PUT_RECONSTRUCTION_PUSH_ERROR = "put_reconstruction" INFEASIBLE_TASK_ERROR = "infeasible_task" +RESOURCE_DEADLOCK_ERROR = "resource_deadlock" REMOVED_NODE_ERROR = "node_removed" MONITOR_DIED_ERROR = "monitor_died" LOG_MONITOR_DIED_ERROR = "log_monitor_died" diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 79bd2fff6..e586902b8 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -527,6 +527,27 @@ def test_export_large_objects(ray_start_regular): wait_for_errors(ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR, 2) +def test_warning_for_resource_deadlock(shutdown_only): + # Check that we get warning messages for infeasible tasks. + ray.init(num_cpus=1) + + @ray.remote(num_cpus=1) + class Foo(object): + def f(self): + return 0 + + @ray.remote + def f(): + # Creating both actors is not possible. + actors = [Foo.remote() for _ in range(2)] + for a in actors: + ray.get(a.f.remote()) + + # Run in a task to check we handle the blocked task case correctly + f.remote() + wait_for_errors(ray_constants.RESOURCE_DEADLOCK_ERROR, 1, timeout=30) + + def test_warning_for_infeasible_tasks(ray_start_regular): # Check that we get warning messages for infeasible tasks. diff --git a/python/ray/worker.py b/python/ray/worker.py index 31a8930f7..7ba86c7d2 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -398,13 +398,20 @@ class Worker(object): break except pyarrow.plasma.PlasmaStoreFull as plasma_exc: if attempt: - logger.debug( - "Waiting {} secs for plasma to drain.".format(delay)) + logger.warning("Waiting {} seconds for space to free up " + "in the object store.".format(delay)) time.sleep(delay) delay *= 2 else: + self.dump_object_store_memory_usage() raise plasma_exc + def dump_object_store_memory_usage(self): + """Prints object store debug string to stdout.""" + msg = "\n" + self.plasma_client.debug_string() + msg = msg.replace("\n", "\nplasma: ") + logger.warning("Local object store memory usage:\n{}\n".format(msg)) + def _try_store_and_register(self, object_id, value): """Wraps `store_and_register` with cases for existence and pickling. @@ -1007,14 +1014,13 @@ class Worker(object): self.plasma_client.set_client_options(client_name, object_store_memory) except pyarrow._plasma.PlasmaStoreFull: + self.dump_object_store_memory_usage() raise memory_monitor.RayOutOfMemoryError( "Failed to set object_store_memory={} for {}. The " "plasma store may have insufficient memory remaining " "to satisfy this limit (30% of object store memory is " - "permanently reserved for shared usage). The current " - "object store memory status is:\n\n{}".format( - object_store_memory, client_name, - self.plasma_client.debug_string())) + "permanently reserved for shared usage).".format( + object_store_memory, client_name)) def _handle_process_task_failure(self, function_descriptor, return_object_ids, error, backtrace): @@ -1788,7 +1794,7 @@ def listen_error_messages_raylet(worker, task_error_queue, threads_stopped): # Delay it a bit to see if we can suppress it task_error_queue.put((error_message, time.time())) else: - logger.error(error_message) + logger.warn(error_message) except (OSError, redis.exceptions.ConnectionError) as e: logger.error("listen_error_messages_raylet: {}".format(e)) finally: @@ -2329,6 +2335,8 @@ def get(object_ids): for i, value in enumerate(values): if isinstance(value, RayError): last_task_error_raise_time = time.time() + if isinstance(value, ray.exceptions.UnreconstructableError): + worker.dump_object_store_memory_usage() raise value # Run post processors. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index bb8bce61e..d74be41c6 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -336,6 +336,7 @@ void NodeManager::Heartbeat() { static_cast(now_ms - last_debug_dump_at_ms_) > debug_dump_period_) { DumpDebugState(); RecordMetrics(); + WarnResourceDeadlock(); last_debug_dump_at_ms_ = now_ms; } @@ -347,6 +348,69 @@ void NodeManager::Heartbeat() { }); } +void NodeManager::WarnResourceDeadlock() { + // Check if any progress is being made on this raylet. + for (const auto &task : local_queues_.GetTasks(TaskState::RUNNING)) { + // Ignore blocked tasks. + if (local_queues_.GetBlockedTaskIds().count(task.GetTaskSpecification().TaskId())) { + continue; + } + // Progress is being made, don't warn. + resource_deadlock_warned_ = false; + return; + } + + // suppress duplicates warning messages + if (resource_deadlock_warned_) { + return; + } + + // The node is full of actors and no progress has been made for some time. + // If there are any pending tasks, build a warning. + std::ostringstream error_message; + ray::Task exemplar; + bool should_warn = false; + int pending_actor_creations = 0; + int pending_tasks = 0; + + // See if any tasks are blocked trying to acquire resources. + for (const auto &task : local_queues_.GetTasks(TaskState::READY)) { + const TaskSpecification &spec = task.GetTaskSpecification(); + if (spec.IsActorCreationTask()) { + pending_actor_creations += 1; + } else { + pending_tasks += 1; + } + if (!should_warn) { + exemplar = task; + should_warn = true; + } + } + + // Push an warning to the driver that a task is blocked trying to acquire resources. + if (should_warn) { + const auto &my_client_id = gcs_client_->client_table().GetLocalClientId(); + SchedulingResources &local_resources = cluster_resource_map_[my_client_id]; + error_message + << "The actor or task with ID " << exemplar.GetTaskSpecification().TaskId() + << " is pending and cannot currently be scheduled. It requires " + << exemplar.GetTaskSpecification().GetRequiredResources().ToString() + << " for execution and " + << exemplar.GetTaskSpecification().GetRequiredPlacementResources().ToString() + << " for placement, but this node only has remaining " + << local_resources.GetAvailableResources().ToString() << ". In total there are " + << pending_tasks << " pending tasks and " << pending_actor_creations + << " pending actors on this node. " + << "This is likely due to all cluster resources being claimed by actors. " + << "To resolve the issue, consider creating fewer actors or increase the " + << "resources available to this Ray cluster."; + RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver( + exemplar.GetTaskSpecification().JobId(), "resource_deadlock", error_message.str(), + current_time_ms())); + resource_deadlock_warned_ = true; + } +} + void NodeManager::GetObjectManagerProfileInfo() { int64_t start_time_ms = current_time_ms(); @@ -1330,12 +1394,15 @@ void NodeManager::ScheduleTasks( std::string type = "infeasible_task"; std::ostringstream error_message; error_message - << "The task with ID " << task.GetTaskSpecification().TaskId() - << " is infeasible and cannot currently be executed. It requires " + << "The actor or task with ID " << task.GetTaskSpecification().TaskId() + << " is infeasible and cannot currently be scheduled. It requires " << task.GetTaskSpecification().GetRequiredResources().ToString() << " for execution and " << task.GetTaskSpecification().GetRequiredPlacementResources().ToString() - << " for placement. Check the client table to view node resources."; + << " for placement, however there are no nodes in the cluster that can " + << "provide the requested resources. To resolve this issue, consider " + << "reducing the resource requests of this task or add nodes that " + << "can fit the task."; RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver( task.GetTaskSpecification().JobId(), type, error_message.str(), current_time_ms())); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index c063d9182..931853134 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -492,6 +492,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler { rpc::ForwardTaskReply *reply, rpc::SendReplyCallback send_reply_callback) override; + /// Push an error to the driver if this node is full of actors and so we are + /// unable to schedule new tasks or actors at all. + void WarnResourceDeadlock(); + // GCS client ID for this node. ClientID client_id_; boost::asio::io_service &io_service_; @@ -510,6 +514,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler { std::chrono::milliseconds heartbeat_period_; /// The period between debug state dumps. int64_t debug_dump_period_; + /// Whether we have printed out a resource deadlock warning. + bool resource_deadlock_warned_ = false; /// The path to the ray temp dir. std::string temp_dir_; /// The timer used to get profiling information from the object manager and