check that we are connected before calling ray commands (#303)

This commit is contained in:
Robert Nishihara
2016-07-27 11:20:37 -07:00
committed by Philipp Moritz
parent a97574d471
commit 975bfe89ad
3 changed files with 21 additions and 5 deletions
+17 -1
View File
@@ -437,6 +437,15 @@ reinitialize these variables after they are used so that changes to their state
made by one task do not affect other tasks.
"""
def check_connected(worker=global_worker):
"""Check if the worker is connected.
Raises:
Exception: An exception is raised if the worker is not connected.
"""
if worker.handle is None:
raise Exception("This command cannot be called before a Ray cluster has been started. You can start one with 'ray.services.start_ray_local(num_workers=1)'.")
def print_failed_task(task_status):
"""Print information about failed tasks.
@@ -479,6 +488,7 @@ def print_task_info(task_data, mode):
def scheduler_info(worker=global_worker):
"""Return information about the state of the scheduler."""
check_connected(worker)
return ray.lib.scheduler_info(worker.handle)
def visualize_computation_graph(file_path=None, view=False, worker=global_worker):
@@ -499,7 +509,7 @@ def visualize_computation_graph(file_path=None, view=False, worker=global_worker
>>> z = da.dot(x, y)
>>> ray.visualize_computation_graph(view=True)
"""
check_connected(worker)
if file_path is None:
file_path = ray.config.get_log_file_path("computation-graph.pdf")
@@ -519,6 +529,7 @@ def visualize_computation_graph(file_path=None, view=False, worker=global_worker
def task_info(worker=global_worker):
"""Return information about failed tasks."""
check_connected(worker)
return ray.lib.task_info(worker.handle)
def register_module(module, worker=global_worker):
@@ -530,6 +541,7 @@ def register_module(module, worker=global_worker):
args:
module (module): The module of functions to register.
"""
check_connected(worker)
logging.info("registering functions in module {}.".format(module.__name__))
for name in dir(module):
val = getattr(module, name)
@@ -574,6 +586,7 @@ def disconnect(worker=global_worker):
# Reset the list of cached remote functions so that if more remote functions
# are defined and then connect is called again, the remote functions will be
# exported. This is mostly relevant for the tests.
worker.handle = None
worker.cached_remote_functions = []
reusables._cached_reusables = []
@@ -591,6 +604,7 @@ def get(objref, worker=global_worker):
Returns:
A Python object
"""
check_connected(worker)
if worker.mode == ray.PYTHON_MODE:
return objref # In ray.PYTHON_MODE, ray.get is the identity operation (the input will actually be a value not an objref)
ray.lib.request_object(worker.handle, objref)
@@ -610,6 +624,7 @@ def put(value, worker=global_worker):
Returns:
The object reference assigned to this value.
"""
check_connected(worker)
if worker.mode == ray.PYTHON_MODE:
return value # In ray.PYTHON_MODE, ray.put is the identity operation
objref = ray.lib.get_objref(worker.handle)
@@ -780,6 +795,7 @@ def remote(arg_types, return_types, worker=global_worker):
def remote_decorator(func):
def func_call(*args, **kwargs):
"""This gets run immediately when a worker calls a remote function."""
check_connected()
args = list(args)
args.extend([kwargs[keyword] if kwargs.has_key(keyword) else default for keyword, default in keyword_defaults[len(args):]]) # fill in the remaining arguments
if _mode() == ray.PYTHON_MODE: