From 3333e1d6b9ba4ecdb103934dc74d92551f5056ff Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Wed, 15 Mar 2017 20:32:23 -0700 Subject: [PATCH] Fix bug in parsing of tasks in monitor. (#372) --- python/ray/monitor.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 0355e40b6..a563cd1ea 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -14,6 +14,7 @@ from ray.services import get_port # Import flatbuffer bindings. from ray.core.generated.SubscribeToDBClientTableReply import SubscribeToDBClientTableReply +from ray.core.generated.TaskReply import TaskReply # These variables must be kept in sync with the C codebase. # common/common.h @@ -109,16 +110,24 @@ class Monitor(object): self.local_schedulers. """ task_ids = self.redis.scan_iter(match="{prefix}*".format(prefix=TASK_PREFIX)) + num_tasks_updated = 0 for task_id in task_ids: task_id = task_id[len(TASK_PREFIX):] response = self.redis.execute_command("RAY.TASK_TABLE_GET", task_id) - if response[1] not in self.local_schedulers: + # Parse the serialized task object. + task_object = TaskReply.GetRootAsTaskReply(response, 0) + local_scheduler_id = task_object.LocalSchedulerId() + # See if the corresponding local scheduler is alive. + if local_scheduler_id not in self.local_schedulers: + num_tasks_updated += 1 ok = self.redis.execute_command("RAY.TASK_TABLE_UPDATE", task_id, TASK_STATUS_LOST, NIL_ID) if ok != b"OK": log.warn("Failed to update lost task for dead scheduler.") + if num_tasks_updated > 0: + log.warn("Marked {} tasks as lost.".format(num_tasks_updated)) def scan_db_client_table(self): """Scan the database client table for the current clients.