From 41724399aea083af575c44946bd5925f3e4133a0 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Tue, 21 Jun 2016 13:28:08 -0700 Subject: [PATCH] print task statuses in shell (#132) --- lib/python/ray/services.py | 4 ++-- lib/python/ray/worker.py | 29 ++++++++++++++++++++++++++++- requirements.txt | 1 + src/raylib.cc | 2 +- 4 files changed, 32 insertions(+), 4 deletions(-) diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index 828fe3801..899902182 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -104,7 +104,7 @@ def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None ray.connect(scheduler_address, objstore_address, address(node_ip_address, new_worker_port())) time.sleep(0.5) -def start_singlenode_cluster(return_drivers=False, num_objstores=1, num_workers_per_objstore=0, worker_path=None): +def start_singlenode_cluster(return_drivers=False, num_objstores=1, num_workers_per_objstore=0, worker_path=None, print_task_info=False): global drivers if num_workers_per_objstore > 0 and worker_path is None: raise Exception("Attempting to start a cluster with {} workers per object store, but `worker_path` is None.".format(num_workers_per_objstore)) @@ -134,5 +134,5 @@ def start_singlenode_cluster(return_drivers=False, num_objstores=1, num_workers_ time.sleep(0.5) return driver_workers else: - ray.connect(scheduler_address, objstore_addresses[0], address(IP_ADDRESS, new_worker_port())) + ray.connect(scheduler_address, objstore_addresses[0], address(IP_ADDRESS, new_worker_port()), print_task_info=print_task_info) time.sleep(0.5) diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index b49430810..2801146ed 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -6,6 +6,7 @@ import typing import funcsigs import numpy as np import pynumbuf +import colorama import ray from ray.config import LOG_DIRECTORY, LOG_TIMESTAMP @@ -18,6 +19,10 @@ class Worker(object): self.functions = {} self.handle = None + def set_print_task_info(self, print_task_info): + self.print_task_info = print_task_info + colorama.init() + def put_object(self, objref, value): """Put `value` in the local object store with objref `objref`. This assumes that the value for `objref` has not yet been placed in the local object store.""" if pynumbuf.serializable(value): @@ -52,11 +57,28 @@ class Worker(object): """Tell the scheduler to schedule the execution of the function with name `func_name` with arguments `args`. Retrieve object references for the outputs of the function from the scheduler and immediately return them.""" task_capsule = serialization.serialize_task(self.handle, func_name, args) objrefs = ray.lib.submit_task(self.handle, task_capsule) + if self.print_task_info: + print_task_info(ray.lib.task_info(self.handle)) return objrefs # We make `global_worker` a global variable so that there is one worker per worker process. global_worker = Worker() +# This is a helper method. It should not be called by users. +def print_task_info(task_data): + num_tasks_succeeded = task_data["num_succeeded"] + num_tasks_in_progress = len(task_data["running_tasks"]) + num_tasks_failed = len(task_data["failed_tasks"]) + info_strings = [] + if num_tasks_succeeded > 0: + info_strings.append("{}{} task{} succeeded{}".format(colorama.Fore.BLUE, num_tasks_succeeded, "s" if num_tasks_succeeded > 1 else "", colorama.Fore.RESET)) + if num_tasks_in_progress > 0: + info_strings.append("{}{} task{} in progress{}".format(colorama.Fore.GREEN, num_tasks_in_progress, "s" if num_tasks_in_progress > 1 else "", colorama.Fore.RESET)) + if num_tasks_failed > 0: + info_strings.append("{}{} task{} failed{}".format(colorama.Fore.RED, num_tasks_failed, "s" if num_tasks_failed > 1 else "", colorama.Fore.RESET)) + if len(info_strings) > 0: + print ", ".join(info_strings) + def scheduler_info(worker=global_worker): return ray.lib.scheduler_info(worker.handle); @@ -74,7 +96,7 @@ def register_module(module, recursive=False, worker=global_worker): # elif recursive and isinstance(val, ModuleType): # register_module(val, recursive, worker) -def connect(scheduler_addr, objstore_addr, worker_addr, worker=global_worker): +def connect(scheduler_addr, objstore_addr, worker_addr, worker=global_worker, print_task_info=False): if hasattr(worker, "handle"): del worker.handle worker.handle = ray.lib.create_worker(scheduler_addr, objstore_addr, worker_addr) @@ -82,17 +104,22 @@ def connect(scheduler_addr, objstore_addr, worker_addr, worker=global_worker): log_basename = os.path.join(LOG_DIRECTORY, (LOG_TIMESTAMP + "-worker-{}").format(datetime.datetime.now(), worker_addr)) logging.basicConfig(level=logging.DEBUG, format=FORMAT, filename=log_basename + ".log") ray.lib.set_log_config(log_basename + "-c++.log") + worker.set_print_task_info(print_task_info) def disconnect(worker=global_worker): ray.lib.disconnect(worker.handle) def pull(objref, worker=global_worker): ray.lib.request_object(worker.handle, objref) + if worker.print_task_info: + print_task_info(ray.lib.task_info(worker.handle)) return worker.get_object(objref) def push(value, worker=global_worker): objref = ray.lib.get_objref(worker.handle) worker.put_object(objref, value) + if worker.print_task_info: + print_task_info(ray.lib.task_info(worker.handle)) return objref def main_loop(worker=global_worker): diff --git a/requirements.txt b/requirements.txt index a2bc4a333..0890a7dcb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ subprocess32 boto3 botocore Pillow +colorama diff --git a/src/raylib.cc b/src/raylib.cc index 5a9e76800..732ad34eb 100644 --- a/src/raylib.cc +++ b/src/raylib.cc @@ -840,7 +840,7 @@ PyObject* task_info(PyObject* self, PyObject* args) { PyObject* dict = PyDict_New(); set_dict_item_and_transfer_ownership(dict, PyString_FromString("failed_tasks"), failed_tasks_list); set_dict_item_and_transfer_ownership(dict, PyString_FromString("running_tasks"), running_tasks_list); - set_dict_item_and_transfer_ownership(dict, PyString_FromString("number_succeeded"), PyInt_FromLong(reply.num_succeeded())); + set_dict_item_and_transfer_ownership(dict, PyString_FromString("num_succeeded"), PyInt_FromLong(reply.num_succeeded())); return dict; }