diff --git a/python/ray/dashboard/client/src/api.ts b/python/ray/dashboard/client/src/api.ts index 8c7d63e57..0403f98d4 100644 --- a/python/ray/dashboard/client/src/api.ts +++ b/python/ray/dashboard/client/src/api.ts @@ -103,7 +103,7 @@ export interface RayletInfoResponse { actorTitle: string; averageTaskExecutionSpeed: number; children: RayletInfoResponse["actors"]; - currentTaskFuncDesc: string[]; + // currentTaskFuncDesc: string[]; ipAddress: string; isDirectCall: boolean; jobId: string; @@ -124,8 +124,10 @@ export interface RayletInfoResponse { } | { actorId: string; + actorTitle: string; requiredResources: { [key: string]: number }; state: -1; + invalidStateType?: 'infeasibleActor' | 'pendingActor'; }; }; } diff --git a/python/ray/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx b/python/ray/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx index 6af5cefc8..a56be86a0 100644 --- a/python/ray/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx +++ b/python/ray/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx @@ -13,6 +13,7 @@ import { } from "../../../api"; import Actors from "./Actors"; import Collapse from "@material-ui/core/Collapse"; +import orange from '@material-ui/core/colors/orange'; const styles = (theme: Theme) => createStyles({ @@ -34,9 +35,12 @@ const styles = (theme: Theme) => cursor: "pointer" } }, - infeasible: { + invalidStateTypeInfeasible: { color: theme.palette.error.main }, + invalidStateTypePendingActor: { + color: orange[500] + }, information: { fontSize: "0.875rem" }, @@ -161,11 +165,11 @@ class Actor extends React.Component, State> { { label: "UsedLocalObjectMemory", value: actor.usedObjectStoreMemory.toLocaleString() - }, - { - label: "Task", - value: actor.currentTaskFuncDesc.join(".") } + // { + // label: "Task", + // value: actor.currentTaskFuncDesc.join(".") + // } ] : [ { @@ -285,8 +289,16 @@ class Actor extends React.Component, State> { ) )} + ) : actor.invalidStateType === 'infeasibleActor' 
? ( + + {actor.actorTitle} is infeasible. + (Infeasible actor means an actor cannot be created because + Ray cluster cannot satisfy resources requirement). + ) : ( - Infeasible actor + + {actor.actorTitle} is pending until resources are available. + )} diff --git a/python/ray/dashboard/dashboard.py b/python/ray/dashboard/dashboard.py index c85efa868..4fcf9d457 100644 --- a/python/ray/dashboard/dashboard.py +++ b/python/ray/dashboard/dashboard.py @@ -201,8 +201,13 @@ class Dashboard(object): } infeasible_tasks = sum( (data.get("infeasibleTasks", []) for data in D.values()), []) + # ready_tasks are used to render tasks that are not schedulable + # due to resource limitations. + # (e.g., Actor requires 2 GPUs but there is only 1 gpu available). + ready_tasks = sum( + (data.get("readyTasks", []) for data in D.values()), []) actor_tree = self.node_stats.get_actor_tree( - workers_info_by_node, infeasible_tasks) + workers_info_by_node, infeasible_tasks, ready_tasks) for address, data in D.items(): # process view data measures_dicts = {} @@ -418,7 +423,8 @@ class NodeStats(threading.Thread): "error_counts": self.calculate_error_counts(), } - def get_actor_tree(self, workers_info_by_node, infeasible_tasks) -> Dict: + def get_actor_tree(self, workers_info_by_node, infeasible_tasks, + ready_tasks) -> Dict: now = time.time() # construct flattened actor tree flattened_tree = {"root": {"children": {}}} @@ -449,17 +455,27 @@ class NodeStats(threading.Thread): actor_info["nodeId"] = node_id actor_info["pid"] = worker_info["pid"] - for infeasible_task in infeasible_tasks: + def _update_flatten_tree(task, task_spec_type, invalid_state_type): actor_id = ray.utils.binary_to_hex( - b64decode( - infeasible_task["actorCreationTaskSpec"]["actorId"])) - caller_addr = (infeasible_task["callerAddress"]["ipAddress"], - str(infeasible_task["callerAddress"]["port"])) + b64decode(task[task_spec_type]["actorId"])) + caller_addr = (task["callerAddress"]["ipAddress"], + 
def wait_until_succeeded_without_exception(func,
                                           exceptions,
                                           *args,
                                           timeout_ms=1000,
                                           retry_interval_ms=100):
    """Retry a function until it completes without an expected exception.

    `func(*args)` is always attempted at least once. If it raises one of
    `exceptions`, it is retried every `retry_interval_ms` until it either
    succeeds or `timeout_ms` has elapsed. Any exception that is not listed
    in `exceptions` is not handled here and propagates to the caller.

    Args:
        func: A function to run.
        exceptions(tuple): Exceptions that are supposed to occur and that
            should trigger a retry.
        args: Positional arguments to pass to func.
        timeout_ms: Maximum time to keep retrying, in milliseconds.
        retry_interval_ms: Time to wait between attempts, in milliseconds.

    Return:
        True if func completed without raising within the timeout.
        False if the timeout expired first, or if `exceptions` is not
        a tuple.
    """
    if not isinstance(exceptions, tuple):
        print("exceptions arguments should be given as a tuple")
        return False

    # Use a monotonic clock so that wall-clock adjustments (NTP, DST)
    # cannot shorten or extend the timeout.
    deadline = time.monotonic() + timeout_ms / 1000.0
    retry_interval_s = retry_interval_ms / 1000.0
    while True:
        try:
            func(*args)
            return True
        except exceptions:
            remaining_s = deadline - time.monotonic()
            if remaining_s <= 0:
                # Timed out: report failure right away instead of sleeping
                # for one more retry interval first.
                return False
            # Never sleep past the deadline; the last attempt happens
            # right when the timeout expires.
            time.sleep(min(retry_interval_s, remaining_s))
def test_raylet_infeasible_tasks(shutdown_only):
    """An actor asking for more GPUs than the cluster has is infeasible.

    The cluster is started with 3 GPUs while the actor requires 5, so the
    web UI's /api/raylet_info endpoint is expected to list exactly one
    actor, in state -1 and tagged as "infeasibleActor".
    """
    addresses = ray.init(num_gpus=3)

    @ray.remote(num_gpus=5)
    class ActorRequiringGPU:
        def __init__(self):
            pass

    ActorRequiringGPU.remote()

    def test_infeasible_actor(ray_addresses):
        base_url = ray_addresses["webui_url"].replace(
            "localhost", "http://127.0.0.1")
        response = requests.get(base_url + "/api/raylet_info")
        actors = response.json()["result"]["actors"]
        assert len(actors) == 1

        entry = actors.popitem()[1]
        assert entry["state"] == -1
        assert entry["invalidStateType"] == "infeasibleActor"

    # Assertion and connection failures are retried until the timeout;
    # anything else propagates out of the helper.
    retryable = (AssertionError, requests.exceptions.ConnectionError)
    succeeded = wait_until_succeeded_without_exception(
        test_infeasible_actor,
        retryable,
        addresses,
        timeout_ms=30000,
        retry_interval_ms=1000)
    assert succeeded is True


def test_raylet_pending_tasks(shutdown_only):
    """One of four 1-GPU child actors is pending on a 3-GPU cluster.

    A parent actor spawns four actors that each need one GPU. The web UI's
    /api/raylet_info endpoint is expected to show the parent with four
    children, exactly one of which is tagged as "pendingActor".
    """
    # num_cpus is set explicitly: otherwise the test can break on machines
    # that have fewer cores than the number of spawned actors.
    addresses = ray.init(num_gpus=3, num_cpus=4)

    @ray.remote(num_gpus=1)
    class ActorRequiringGPU:
        def __init__(self):
            pass

    @ray.remote
    class ParentActor:
        def __init__(self):
            # Store the four child handles on the parent actor.
            self.a = [ActorRequiringGPU.remote() for _ in range(4)]

    ParentActor.remote()

    def test_pending_actor(ray_addresses):
        base_url = ray_addresses["webui_url"].replace(
            "localhost", "http://127.0.0.1")
        response = requests.get(base_url + "/api/raylet_info")
        actors = response.json()["result"]["actors"]
        assert len(actors) == 1
        parent_entry = actors.popitem()[1]

        # The parent must report all 4 spawned actors as children.
        children = parent_entry["children"]
        assert len(children) == 4

        # 4 GPU actors were requested but only 3 GPUs exist, so exactly
        # one child should be waiting for resources.
        pending_children = [
            child for child in children.values()
            if child.get("invalidStateType") == "pendingActor"
        ]
        assert len(pending_children) == 1

    retryable = (AssertionError, requests.exceptions.ConnectionError)
    succeeded = wait_until_succeeded_without_exception(
        test_pending_actor,
        retryable,
        addresses,
        timeout_ms=30000,
        retry_interval_ms=1000)
    assert succeeded is True
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index f3c642dcc..fed1a7827 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -3132,9 +3132,24 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &request, worker_stats->set_pid(driver->Pid()); worker_stats->set_is_driver(true); } + // NOTE(sang): Currently reporting only infeasible/ready ActorCreationTask + // because Ray dashboard only renders actorCreationTask as of Feb 3 2020. + // TODO(sang): Support dashboard for non-ActorCreationTask. for (const auto task : local_queues_.GetTasks(TaskState::INFEASIBLE)) { - auto infeasible_task = reply->add_infeasible_tasks(); - infeasible_task->ParseFromString(task.GetTaskSpecification().Serialize()); + if (task.GetTaskSpecification().IsActorCreationTask()) { + auto infeasible_task = reply->add_infeasible_tasks(); + infeasible_task->ParseFromString(task.GetTaskSpecification().Serialize()); + } + } + // Report tasks that are not scheduled because + // resources are occupied by other actors/tasks. + // NOTE(sang): This solution is a workaround. It can be replaced by creating a new state + // like PENDING_UNTIL_RESOURCE_AVAILABLE. + for (const auto task : local_queues_.GetTasks(TaskState::READY)) { + if (task.GetTaskSpecification().IsActorCreationTask()) { + auto ready_task = reply->add_ready_tasks(); + ready_task->ParseFromString(task.GetTaskSpecification().Serialize()); + } } // Ensure we never report an empty set of metrics. if (!recorded_metrics_) {