diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index 7160f9ffe..e8d30adfc 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -11,7 +11,12 @@ import ray.config as config _services_env = os.environ.copy() _services_env["PATH"] = os.pathsep.join([os.path.dirname(os.path.abspath(__file__)), _services_env["PATH"]]) +# all_processes is a list of the scheduler, object store, and worker processes +# that have been started by this services module if Ray is being used in local +# mode. all_processes = [] +# drivers is a list of the worker objects corresponding to drivers if +# start_services_local is run with return_drivers=True. drivers = [] IP_ADDRESS = "127.0.0.1" @@ -79,7 +84,7 @@ def cleanup(): atexit.register(cleanup) -def start_scheduler(scheduler_address, local=True): +def start_scheduler(scheduler_address, local): """ This method starts a scheduler process. @@ -92,7 +97,7 @@ def start_scheduler(scheduler_address, local=True): if local: all_processes.append((p, scheduler_address)) -def start_objstore(scheduler_address, objstore_address, local=True): +def start_objstore(scheduler_address, objstore_address, local): """ This method starts an object store process. @@ -106,7 +111,7 @@ def start_objstore(scheduler_address, objstore_address, local=True): if local: all_processes.append((p, objstore_address)) -def start_worker(worker_path, scheduler_address, objstore_address, worker_address, local=True): +def start_worker(worker_path, scheduler_address, objstore_address, worker_address, local): """ This method starts a worker process. diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index 5fc653da8..df67760bd 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -213,7 +213,8 @@ def connect(scheduler_address, objstore_address, worker_address, is_driver=False ray.lib.set_log_config(config.get_log_file_path("-".join(["worker", worker_address, "c++"]) + ".log")) def disconnect(worker=global_worker): - ray.lib.disconnect(worker.handle) + if worker.handle is not None: + ray.lib.disconnect(worker.handle) def get(objref, worker=global_worker): """ diff --git a/scripts/cluster.py b/scripts/cluster.py index f6a0bf13d..c92bb86f2 100644 --- a/scripts/cluster.py +++ b/scripts/cluster.py @@ -82,7 +82,7 @@ def _start_ray(node_ip_addresses, username, key_file, num_workers_per_node, work start_scheduler_command = """ cd "{}"; source ../setup-env.sh; - python -c "import ray; ray.services.start_scheduler(\\\"{}:10001\\\")" > start_scheduler.out 2> start_scheduler.err < /dev/null & + python -c "import ray; ray.services.start_scheduler(\\\"{}:10001\\\", local=False)" > start_scheduler.out 2> start_scheduler.err < /dev/null & """.format(scripts_directory, node_ip_addresses[0]) run_command_over_ssh(node_ip_addresses[0], username, key_file, start_scheduler_command)