diff --git a/python/ray/worker.py b/python/ray/worker.py
index 575965fb9..3b3b02d70 100644
--- a/python/ray/worker.py
+++ b/python/ray/worker.py
@@ -1203,12 +1203,19 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, a
import __main__ as main
driver_info = {"node_ip_address": worker.node_ip_address,
"driver_id": worker.worker_id,
- "start_time": time.time()}
+ "start_time": time.time(),
+ "plasma_store_socket": info["store_socket_name"],
+ "plasma_manager_socket": info["manager_socket_name"],
+ "local_scheduler_socket": info["local_scheduler_socket_name"]}
driver_info["name"] = main.__file__ if hasattr(main, "__file__") else "INTERACTIVE MODE"
worker.redis_client.hmset(b"Drivers:" + worker.worker_id, driver_info)
elif mode == WORKER_MODE:
# Register the worker with Redis.
- worker.redis_client.hmset(b"Workers:" + worker.worker_id, {"node_ip_address": worker.node_ip_address})
+ worker.redis_client.hmset(b"Workers:" + worker.worker_id,
+ {"node_ip_address": worker.node_ip_address,
+ "plasma_store_socket": info["store_socket_name"],
+ "plasma_manager_socket": info["manager_socket_name"],
+ "local_scheduler_socket": info["local_scheduler_socket_name"]})
else:
raise Exception("This code should be unreachable.")
# If this is a driver, set the current task ID, the task driver ID, and set
diff --git a/webui/backend/ray_ui.py b/webui/backend/ray_ui.py
index 852bb7f19..dd1241723 100644
--- a/webui/backend/ray_ui.py
+++ b/webui/backend/ray_ui.py
@@ -2,7 +2,7 @@ import aioredis
import argparse
import asyncio
import binascii
-from collections import defaultdict
+import collections
import datetime
import json
import numpy as np
@@ -43,7 +43,10 @@ def key_to_hex_identifiers(key):
task_id = hex_identifier(key[offset:(offset + IDENTIFIER_LENGTH)])
return worker_id, task_id
-worker_ids = []
+async def hgetall_as_dict(redis_conn, key):
+ fields = await redis_conn.execute("hgetall", key)
+ return {fields[2 * i]: fields[2 * i + 1] for i in range(len(fields) // 2)}
+
# Cache information about the local schedulers.
local_schedulers = {}
@@ -66,8 +69,10 @@ def duration_to_string(duration):
duration_str = "{0:0.1f} minutes".format(duration / 60)
elif duration > 1:
duration_str = "{0:0.1f} seconds".format(duration)
+ elif duration > 0.001:
+ duration_str = "{0:0.1f} milliseconds".format(duration * 1000)
else:
- duration_str = "{} milliseconds".format(int(duration * 1000))
+ duration_str = "{} microseconds".format(int(duration * 1000000))
return duration_str
async def handle_get_statistics(websocket, redis_conn):
@@ -79,8 +84,7 @@ async def handle_get_statistics(websocket, redis_conn):
client_keys = await redis_conn.execute("keys", "CL:*")
clients = []
for client_key in client_keys:
- client_fields = await redis_conn.execute("hgetall", client_key)
- client_fields = {client_fields[2 * i]: client_fields[2 * i + 1] for i in range(len(client_fields) // 2)}
+ client_fields = await hgetall_as_dict(redis_conn, client_key)
clients.append(client_fields)
ip_addresses = list(set([client[b"node_ip_address"].decode("ascii") for client in clients if client[b"client_type"] == b"photon"]))
num_nodes = len(ip_addresses)
@@ -94,8 +98,7 @@ async def handle_get_drivers(websocket, redis_conn):
keys = await redis_conn.execute("keys", "Drivers:*")
drivers = []
for key in keys:
- driver_fields = await redis_conn.execute("hgetall", key)
- driver_fields = {driver_fields[2 * i]: driver_fields[2 * i + 1] for i in range(len(driver_fields) // 2)}
+ driver_fields = await hgetall_as_dict(redis_conn, key)
driver_info = {"node ip address": driver_fields[b"node_ip_address"].decode("ascii"),
"name": driver_fields[b"name"].decode("ascii")}
@@ -122,7 +125,21 @@ async def handle_get_drivers(websocket, redis_conn):
reply = sorted(drivers, key=(lambda driver: driver["start time"]))[::-1]
await websocket.send(json.dumps(reply))
+# Cache of per-node information, keyed by node IP address. Each value is a
+# dict whose "workers" list holds the hex IDs of the workers on that node.
+node_info = collections.OrderedDict()
+# Cache of per-worker information, keyed by hex worker ID. Each value is the
+# dict of fields read from that worker's "Workers:*" hash in Redis.
+worker_info = collections.OrderedDict()
+
async def handle_get_recent_tasks(websocket, redis_conn, num_tasks):
+ # First update the cache of worker information.
+ worker_keys = await redis_conn.execute("keys", "Workers:*")
+ for key in worker_keys:
+ worker_id = hex_identifier(key[len("Workers:"):])
+ if worker_id not in worker_info:
+ worker_info[worker_id] = await hgetall_as_dict(redis_conn, key)
+ node_ip_address = worker_info[worker_id][b"node_ip_address"].decode("ascii")
+ if node_ip_address not in node_info:
+ node_info[node_ip_address] = {"workers": []}
+ node_info[node_ip_address]["workers"].append(worker_id)
+
keys = await redis_conn.execute("keys", "event_log:*")
if len(keys) == 0:
# There are no tasks, so send a message to the client saying so.
@@ -142,13 +159,21 @@ async def handle_get_recent_tasks(websocket, redis_conn, num_tasks):
min_time = time_cutoff - (max_time - time_cutoff) * 0.1
max_time = max_time + (max_time - time_cutoff) * 0.1
- task_data = []
+ worker_ids = list(worker_info.keys())
+ node_ip_addresses = list(node_info.keys())
+
+ num_tasks = 0
+ task_data = [{"task_data": [],
+ "num_workers": len(node_info[node_ip_address]["workers"])} for node_ip_address in node_ip_addresses]
for i in range(len(keys)):
worker_id, task_id = key_to_hex_identifiers(keys[i])
data = contents[i]
if worker_id not in worker_ids:
- worker_ids.append(worker_id)
- worker_index = worker_ids.index(worker_id)
+ # This case should be extremely rare.
+ raise Exception("A worker ID was not present in the list of worker IDs.")
+ node_ip_address = worker_info[worker_id][b"node_ip_address"].decode("ascii")
+ worker_index = node_info[node_ip_address]["workers"].index(worker_id)
+ node_index = node_ip_addresses.index(node_ip_address)
task_times = [timestamp for (timestamp, task, kind, info) in data if task == "ray:task"]
if task_times[1] <= time_cutoff:
@@ -157,14 +182,21 @@ async def handle_get_recent_tasks(websocket, redis_conn, num_tasks):
task_get_arguments_times = [timestamp for (timestamp, task, kind, info) in data if task == "ray:task:get_arguments"]
task_execute_times = [timestamp for (timestamp, task, kind, info) in data if task == "ray:task:execute"]
task_store_outputs_times = [timestamp for (timestamp, task, kind, info) in data if task == "ray:task:store_outputs"]
- task_data.append({"task": task_times,
- "get_arguments": task_get_arguments_times,
- "execute": task_execute_times,
- "store_outputs": task_store_outputs_times,
- "worker_index": worker_index})
+ task_data[node_index]["task_data"].append(
+ {"task": task_times,
+ "get_arguments": task_get_arguments_times,
+ "execute": task_execute_times,
+ "store_outputs": task_store_outputs_times,
+ "worker_index": worker_index,
+ "node_ip_address": node_ip_address,
+ "task_formatted_time": duration_to_string(task_times[1] - task_times[0]),
+ "get_arguments_formatted_time": duration_to_string(task_get_arguments_times[1] - task_get_arguments_times[0]),
+ "execute_formatted_time": duration_to_string(task_execute_times[1] - task_execute_times[0]),
+ "store_outputs_formatted_time": duration_to_string(task_store_outputs_times[1] - task_store_outputs_times[0])})
+ num_tasks += 1
reply = {"min_time": min_time,
"max_time": max_time,
- "num_tasks": len(task_data),
+ "num_tasks": num_tasks,
"task_data": task_data}
await websocket.send(json.dumps(reply))
@@ -189,8 +221,7 @@ async def send_heartbeats(websocket, redis_conn):
client_keys = await redis_conn.execute("keys", "CL:*")
clients = []
for client_key in client_keys:
- client_fields = await redis_conn.execute("hgetall", client_key)
- client_fields = {client_fields[2 * i]: client_fields[2 * i + 1] for i in range(len(client_fields) // 2)}
+ client_fields = await hgetall_as_dict(redis_conn, client_key)
if client_fields[b"client_type"] == b"photon":
local_scheduler_id = hex_identifier(client_fields[b"ray_client_id"])
local_schedulers[local_scheduler_id] = {"node_ip_address": client_fields[b"node_ip_address"].decode("ascii"),
@@ -275,7 +306,7 @@ async def serve_requests(websocket, path):
"node_id": hex_identifier(content[3])})
await websocket.send(json.dumps(result))
elif command["command"] == "get-timeline":
- tasks = defaultdict(list)
+ tasks = collections.defaultdict(list)
for key in await redis_conn.execute("keys", "event_log:*"):
worker_id, task_id = key_to_hex_identifiers(key)
content = await redis_conn.execute("lrange", key, "0", "-1")
diff --git a/webui/src/ray-recent-tasks.html b/webui/src/ray-recent-tasks.html
index 10a86ff87..7ae100f89 100644
--- a/webui/src/ray-recent-tasks.html
+++ b/webui/src/ray-recent-tasks.html
@@ -23,18 +23,15 @@
-
+
+