diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index 7f7c17b34..78317acff 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -304,3 +304,31 @@ class GlobalState(object): node_info[node_ip_address].append(client_info_parsed) return node_info + + def log_files(self): + """Fetch and return a dictionary of log file names to outputs. + + Returns: + IP address to log file name to log file contents mappings. + """ + relevant_files = self.redis_client.keys("LOGFILE*") + + ip_filename_file = dict() + + for filename in relevant_files: + filename = filename.decode("ascii") + filename_components = filename.split(":") + ip_addr = filename_components[1] + + file = self.redis_client.lrange(filename, 0, -1) + file_str = [] + for x in file: + y = x.decode("ascii") + file_str.append(y) + + if ip_addr not in ip_filename_file: + ip_filename_file[ip_addr] = dict() + + ip_filename_file[ip_addr][filename] = file_str + + return ip_filename_file diff --git a/python/ray/log_monitor.py b/python/ray/log_monitor.py index db9321ea1..92abe454e 100644 --- a/python/ray/log_monitor.py +++ b/python/ray/log_monitor.py @@ -48,7 +48,16 @@ class LogMonitor(object): for log_filename in self.log_files: if log_filename in self.log_file_handles: # Get any updates to the file. - new_lines = self.log_file_handles[log_filename].readlines() + new_lines = [] + while True: + current_position = self.log_file_handles[log_filename].tell() + next_line = self.log_file_handles[log_filename].readline() + if next_line != "": + new_lines.append(next_line) + else: + self.log_file_handles[log_filename].seek(current_position) + break + # If there are any new lines, cache them and also push them to Redis. if len(new_lines) > 0: self.log_files[log_filename] += new_lines @@ -58,7 +67,7 @@ class LogMonitor(object): else: try: self.log_file_handles[log_filename] = open(log_filename, "r") - except FileNotFoundError: + except IOError: print("Warning: The file {} was not found.".format(log_filename)) def run(self): diff --git a/test/runtest.py b/test/runtest.py index 87060a3c7..6e1209343 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -1407,6 +1407,9 @@ class GlobalStateAPI(unittest.TestCase): with self.assertRaises(Exception): ray.global_state.function_table() + with self.assertRaises(Exception): + ray.global_state.log_files() + ray.init() self.assertEqual(ray.global_state.object_table(), dict()) @@ -1511,6 +1514,37 @@ class GlobalStateAPI(unittest.TestCase): ray.worker.cleanup() + def testLogFileAPI(self): + ray.init(redirect_output=True) + + message = "unique message" + + @ray.remote + def f(): + print(message) + # The call to sys.stdout.flush() seems to be necessary when using the + # system Python 2.7 on Ubuntu. + sys.stdout.flush() + + ray.get(f.remote()) + + # Make sure that the message appears in the log files. + start_time = time.time() + found_message = False + while time.time() - start_time < 10: + log_files = ray.global_state.log_files() + for ip, innerdict in log_files.items(): + for filename, contents in innerdict.items(): + contents_str = "".join(contents) + if message in contents_str: + found_message = True + if found_message: + break + time.sleep(0.1) + + self.assertEqual(found_message, True) + ray.worker.cleanup() + if __name__ == "__main__": unittest.main(verbosity=2)