properly shutdown worker service thread (#367)

This commit is contained in:
Robert Nishihara
2016-08-10 16:53:24 -07:00
committed by Philipp Moritz
parent c01ef95d04
commit fbc49410ec
3 changed files with 24 additions and 9 deletions
+14 -6
View File
@@ -544,7 +544,7 @@ def check_connected(worker=global_worker):
Raises:
Exception: An exception is raised if the worker is not connected.
"""
if worker.handle is None:
if worker.handle is None and worker.mode != raylib.PYTHON_MODE:
raise Exception("This command cannot be called before a Ray cluster has been started. You can start one with 'ray.init(start_ray_local=True, num_workers=1)'.")
def print_failed_task(task_status):
@@ -635,7 +635,10 @@ def init(start_ray_local=False, num_workers=None, num_objstores=None, scheduler_
Exception: An exception is raised if an inappropriate combination of
arguments is passed in.
"""
if start_ray_local:
if driver_mode == raylib.PYTHON_MODE:
# If starting Ray in PYTHON_MODE, don't start any other processes.
pass
elif start_ray_local:
# In this case, we launch a scheduler, a new object store, and some workers,
# and we connect to them.
if (scheduler_address is not None) or (node_ip_address is not None):
@@ -669,7 +672,7 @@ def cleanup(worker=global_worker):
services.cleanup() in the tests because we need to start and stop many
clusters in the tests, but the import and exit only happen once.
"""
disconnect()
disconnect(worker)
worker.set_mode(None)
services.cleanup()
@@ -688,8 +691,13 @@ def connect(node_ip_address, scheduler_address, objstore_address=None, is_driver
mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, PYTHON_MODE,
and SILENT_MODE.
"""
if hasattr(worker, "handle"):
del worker.handle
assert worker.handle is None, "When connect is called, worker.handle should be None."
# If running Ray in PYTHON_MODE, there is no need to create call create_worker
# or to start the worker service.
if mode == raylib.PYTHON_MODE:
worker.mode = raylib.PYTHON_MODE
return
worker.scheduler_address = scheduler_address
worker.handle, worker.worker_address = raylib.create_worker(node_ip_address, scheduler_address, objstore_address if objstore_address is not None else "", is_driver)
worker.set_mode(mode)
@@ -723,10 +731,10 @@ def disconnect(worker=global_worker):
"""Disconnect this worker from the scheduler and object store."""
if worker.handle is not None:
raylib.disconnect(worker.handle)
worker.handle = None
# Reset the list of cached remote functions so that if more remote functions
# are defined and then connect is called again, the remote functions will be
# exported. This is mostly relevant for the tests.
worker.handle = None
worker.cached_remote_functions = []
reusables._cached_reusables = []
+9 -3
View File
@@ -385,8 +385,11 @@ void Worker::notify_task_completed() {
void Worker::disconnect() {
connected_ = false;
// TODO(rkn): This probably isn't the right way to clean up the thread.
worker_server_thread_->detach();
// Shut down the worker service. This will cause the call to server->Wait() to
// return.
server_ptr_->Shutdown();
// Wait for the thread that launched the worker service to return.
worker_server_thread_->join();
}
// TODO(rkn): Should we be using pointers or references? And should they be const?
@@ -441,6 +444,7 @@ void Worker::start_worker_service(Mode mode) {
builder.AddListeningPort(std::string("0.0.0.0:") + port, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
server_ptr_ = server.get();
RAY_LOG(RAY_INFO, "worker server listening on " << service_address);
// If this is part of a worker process (and not a driver process), then tell
// the scheduler that it is ready to start receiving tasks.
@@ -451,7 +455,9 @@ void Worker::start_worker_service(Mode mode) {
AckReply reply;
scheduler_stub_->ReadyForNewTask(&context, request, &reply);
}
// Wait for work and process work, this does not return.
// Wait for work and process work. This method does not return until
// Shutdown is called from a different thread.
server->Wait();
RAY_LOG(RAY_INFO, "Worker service thread returning.")
}));
}
+1
View File
@@ -114,6 +114,7 @@ class Worker {
const size_t CHUNK_SIZE = 8 * 1024;
std::unique_ptr<Scheduler::Stub> scheduler_stub_;
std::unique_ptr<std::thread> worker_server_thread_;
Server* server_ptr_;
MessageQueue<WorkerMessage*> receive_queue_;
bip::managed_shared_memory segment_;
WorkerId workerid_;