diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index 78317acff..482b26a1e 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import json import pickle import redis @@ -332,3 +333,31 @@ class GlobalState(object): ip_filename_file[ip_addr][filename] = file_str return ip_filename_file + + def task_profiles(self): + """Fetch and return a list of task profiles. + + Returns: + A tuple of two elements. The first element is a dictionary mapping the + task ID of a task to a list of the profiling information for all of the + executions of that task. The second element is a list of profiling + information for tasks where the events have no task ID. + """ + event_names = self.redis_client.keys("event_log*") + results = dict() + events = [] + for i in range(len(event_names)): + event_list = self.redis_client.lrange(event_names[i], 0, -1) + for event in event_list: + event_dict = json.loads(event.decode("ascii")) + task_id = "" + for element in event_dict: + if "task_id" in element[3]: + task_id = element[3]["task_id"] + if task_id != "": + if task_id not in results: + results[task_id] = [] + results[task_id].append(event_dict) + else: + events.append(event_dict) + return results, events diff --git a/test/runtest.py b/test/runtest.py index e3de3b7fe..bf4223c0a 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -1565,6 +1565,50 @@ class GlobalStateAPI(unittest.TestCase): time.sleep(0.1) self.assertEqual(found_message, True) + + ray.worker.cleanup() + + def testTaskProfileAPI(self): + ray.init(redirect_output=True) + + @ray.remote + def f(): + return 1 + + num_calls = 5 + [f.remote() for _ in range(num_calls)] + + # Make sure the event log has the correct number of events. + start_time = time.time() + while time.time() - start_time < 10: + profiles, events = ray.global_state.task_profiles() + if len(profiles) == num_calls: + break + time.sleep(0.1) + self.assertEqual(len(profiles), num_calls) + self.assertEqual(len(events), 0) + + # Make sure that each entry is properly formatted. + for task_id in profiles: + events_list = profiles[task_id] + # Make sure that the task was not executed more than once. + self.assertEqual(len(events_list), 1) + events = events_list[0] + for event in events: + found_exec = False + found_store = False + found_get = False + for event in events: + if event[1] == "ray:task:execute": + found_exec = True + if event[1] == "ray:task:get_arguments": + found_get = True + if event[1] == "ray:task:store_outputs": + found_store = True + self.assertTrue(found_exec) + self.assertTrue(found_store) + self.assertTrue(found_get) + ray.worker.cleanup()