From fbc49410ec477a80acbfe0d8961c06bb634520dc Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Wed, 10 Aug 2016 16:53:24 -0700 Subject: [PATCH] properly shutdown worker service thread (#367) --- lib/python/ray/worker.py | 20 ++++++++++++++------ src/worker.cc | 12 +++++++++--- src/worker.h | 1 + 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index 9beed015e..b3523d16d 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -544,7 +544,7 @@ def check_connected(worker=global_worker): Raises: Exception: An exception is raised if the worker is not connected. """ - if worker.handle is None: + if worker.handle is None and worker.mode != raylib.PYTHON_MODE: raise Exception("This command cannot be called before a Ray cluster has been started. You can start one with 'ray.init(start_ray_local=True, num_workers=1)'.") def print_failed_task(task_status): @@ -635,7 +635,10 @@ def init(start_ray_local=False, num_workers=None, num_objstores=None, scheduler_ Exception: An exception is raised if an inappropriate combination of arguments is passed in. """ - if start_ray_local: + if driver_mode == raylib.PYTHON_MODE: + # If starting Ray in PYTHON_MODE, don't start any other processes. + pass + elif start_ray_local: # In this case, we launch a scheduler, a new object store, and some workers, # and we connect to them. if (scheduler_address is not None) or (node_ip_address is not None): @@ -669,7 +672,7 @@ def cleanup(worker=global_worker): services.cleanup() in the tests because we need to start and stop many clusters in the tests, but the import and exit only happen once. """ - disconnect() + disconnect(worker) worker.set_mode(None) services.cleanup() @@ -688,8 +691,13 @@ def connect(node_ip_address, scheduler_address, objstore_address=None, is_driver mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, and SILENT_MODE. """ - if hasattr(worker, "handle"): - del worker.handle + assert worker.handle is None, "When connect is called, worker.handle should be None." + # If running Ray in PYTHON_MODE, there is no need to create call create_worker + # or to start the worker service. + if mode == raylib.PYTHON_MODE: + worker.mode = raylib.PYTHON_MODE + return + worker.scheduler_address = scheduler_address worker.handle, worker.worker_address = raylib.create_worker(node_ip_address, scheduler_address, objstore_address if objstore_address is not None else "", is_driver) worker.set_mode(mode) @@ -723,10 +731,10 @@ def disconnect(worker=global_worker): """Disconnect this worker from the scheduler and object store.""" if worker.handle is not None: raylib.disconnect(worker.handle) + worker.handle = None # Reset the list of cached remote functions so that if more remote functions # are defined and then connect is called again, the remote functions will be # exported. This is mostly relevant for the tests. - worker.handle = None worker.cached_remote_functions = [] reusables._cached_reusables = [] diff --git a/src/worker.cc b/src/worker.cc index de43ece59..bff9a07f2 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -385,8 +385,11 @@ void Worker::notify_task_completed() { void Worker::disconnect() { connected_ = false; - // TODO(rkn): This probably isn't the right way to clean up the thread. - worker_server_thread_->detach(); + // Shut down the worker service. This will cause the call to server->Wait() to + // return. + server_ptr_->Shutdown(); + // Wait for the thread that launched the worker service to return. + worker_server_thread_->join(); } // TODO(rkn): Should we be using pointers or references? And should they be const? @@ -441,6 +444,7 @@ void Worker::start_worker_service(Mode mode) { builder.AddListeningPort(std::string("0.0.0.0:") + port, grpc::InsecureServerCredentials()); builder.RegisterService(&service); std::unique_ptr server(builder.BuildAndStart()); + server_ptr_ = server.get(); RAY_LOG(RAY_INFO, "worker server listening on " << service_address); // If this is part of a worker process (and not a driver process), then tell // the scheduler that it is ready to start receiving tasks. @@ -451,7 +455,9 @@ void Worker::start_worker_service(Mode mode) { AckReply reply; scheduler_stub_->ReadyForNewTask(&context, request, &reply); } - // Wait for work and process work, this does not return. + // Wait for work and process work. This method does not return until + // Shutdown is called from a different thread. server->Wait(); + RAY_LOG(RAY_INFO, "Worker service thread returning.") })); } diff --git a/src/worker.h b/src/worker.h index b2920f7ee..cc84ead30 100644 --- a/src/worker.h +++ b/src/worker.h @@ -114,6 +114,7 @@ class Worker { const size_t CHUNK_SIZE = 8 * 1024; std::unique_ptr scheduler_stub_; std::unique_ptr worker_server_thread_; + Server* server_ptr_; MessageQueue receive_queue_; bip::managed_shared_memory segment_; WorkerId workerid_;