diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 9118845f0..71a083e13 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -688,6 +688,8 @@ cdef execute_task( # If we've reached the max number of executions for this worker, exit. task_counter = manager.get_task_counter(job_id, function_descriptor) if task_counter == execution_info.max_calls: + # Intentionally disconnect so the raylet doesn't print an error. + # TODO(edoakes): we should handle max_calls in the core worker. worker.core_worker.disconnect() sys.exit(0) @@ -737,11 +739,6 @@ cdef CRayStatus check_signals() nogil: return CRayStatus.OK() -cdef void exit_handler() nogil: - with gil: - sys.exit(0) - - cdef shared_ptr[CBuffer] string_to_buffer(c_string& c_str): cdef shared_ptr[CBuffer] empty_metadata if c_str.size() == 0: @@ -787,7 +784,7 @@ cdef class CoreWorker: raylet_socket.encode("ascii"), job_id.native(), gcs_options.native()[0], log_dir.encode("utf-8"), node_ip_address.encode("utf-8"), node_manager_port, - task_execution_handler, check_signals, exit_handler, True)) + task_execution_handler, check_signals, True)) def disconnect(self): self.destory_event_loop_if_exists() diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index af3f73e76..1ab49cf86 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -76,7 +76,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const c_vector[CObjectID] &return_ids, c_vector[shared_ptr[CRayObject]] *returns) nogil, CRayStatus() nogil, - void () nogil, c_bool ref_counting_enabled) void Disconnect() CWorkerType &GetWorkerType() diff --git a/python/ray/tests/test_tempfile.py b/python/ray/tests/test_tempfile.py index 21de4e5d6..a9f917340 100644 --- a/python/ray/tests/test_tempfile.py +++ b/python/ray/tests/test_tempfile.py @@ -37,13 +37,12 @@ def test_conn_cluster(): "temp_dir must not be provided.") -def test_tempdir(): +def test_tempdir(shutdown_only): shutil.rmtree("/tmp/ray", ignore_errors=True) ray.init(temp_dir="/tmp/i_am_a_temp_dir") assert os.path.exists( "/tmp/i_am_a_temp_dir"), "Specified temp dir not found." assert not os.path.exists("/tmp/ray"), "Default temp dir should not exist." - ray.shutdown() shutil.rmtree("/tmp/i_am_a_temp_dir", ignore_errors=True) @@ -57,7 +56,7 @@ def test_tempdir_commandline(): shutil.rmtree("/tmp/i_am_a_temp_dir2", ignore_errors=True) -def test_raylet_socket_name(): +def test_raylet_socket_name(shutdown_only): ray.init(raylet_socket_name="/tmp/i_am_a_temp_socket") assert os.path.exists( "/tmp/i_am_a_temp_socket"), "Specified socket path not found." @@ -77,7 +76,7 @@ def test_raylet_socket_name(): pass # It could have been removed by Ray. -def test_temp_plasma_store_socket(): +def test_temp_plasma_store_socket(shutdown_only): ray.init(plasma_store_socket_name="/tmp/i_am_a_temp_socket") assert os.path.exists( "/tmp/i_am_a_temp_socket"), "Specified socket path not found." @@ -97,7 +96,7 @@ def test_temp_plasma_store_socket(): pass # It could have been removed by Ray. -def test_raylet_tempfiles(): +def test_raylet_tempfiles(shutdown_only): ray.init(num_cpus=0) node = ray.worker._global_node top_levels = set(os.listdir(node.get_session_dir_path())) @@ -132,15 +131,13 @@ def test_raylet_tempfiles(): socket_files = set(os.listdir(node.get_sockets_dir_path())) assert socket_files == {"plasma_store", "raylet"} - ray.shutdown() -def test_tempdir_privilege(): +def test_tempdir_privilege(shutdown_only): os.chmod("/tmp/ray", 0o000) ray.init(num_cpus=1) session_dir = ray.worker._global_node.get_session_dir_path() assert os.path.exists(session_dir), "Specified socket path not found." - ray.shutdown() def test_session_dir_uniqueness(): diff --git a/python/ray/worker.py b/python/ray/worker.py index 79942fb3f..87859afe9 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -431,6 +431,7 @@ class Worker(object): signal.signal(signal.SIGTERM, sigterm_handler) self.core_worker.run_task_loop() + sys.exit(0) def get_gpu_ids(): @@ -834,6 +835,12 @@ def shutdown(exiting_interpreter=False): disconnect(exiting_interpreter) + # We need to destruct the core worker here because after this function, + # we will tear down any processes spawned by ray.init() and the background + # IO thread in the core worker doesn't currently handle that gracefully. + if hasattr(global_worker, "core_worker"): + del global_worker.core_worker + # Disconnect global state from GCS. ray.state.state.disconnect() @@ -843,7 +850,7 @@ def shutdown(exiting_interpreter=False): _global_node.kill_all_processes(check_alive=False, allow_graceful=True) _global_node = None - # TODO(rkn): Instead of manually reseting some of the worker fields, we + # TODO(rkn): Instead of manually resetting some of the worker fields, we # should simply set "global_worker" to equal "None" or something like that. global_worker.set_mode(None) global_worker._post_get_hooks = [] @@ -1332,12 +1339,6 @@ def disconnect(exiting_interpreter=False): worker.cached_functions_to_run = [] worker.serialization_context_map.clear() - # We need to destruct the core worker here because after this function, - # we will tear down any processes spawned by ray.init() and the background - # threads in the core worker don't currently handle that gracefully. - if hasattr(worker, "core_worker"): - del worker.core_worker - @contextmanager def _changeproctitle(title, next_title): diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index d2bbd28a3..b58010c04 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -70,9 +70,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, const std::string &log_dir, const std::string &node_ip_address, int node_manager_port, const TaskExecutionCallback &task_execution_callback, - std::function check_signals, - const std::function exit_handler, - bool ref_counting_enabled) + std::function check_signals, bool ref_counting_enabled) : worker_type_(worker_type), language_(language), log_dir_(log_dir), @@ -119,13 +117,13 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, RAY_CHECK(task_execution_callback_ != nullptr); auto execute_task = std::bind(&CoreWorker::ExecuteTask, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); + auto exit = std::bind(&CoreWorker::Shutdown, this); raylet_task_receiver_ = std::unique_ptr(new CoreWorkerRayletTaskReceiver( - worker_context_.GetWorkerID(), local_raylet_client_, execute_task, - exit_handler)); + worker_context_.GetWorkerID(), local_raylet_client_, execute_task, exit)); direct_task_receiver_ = std::unique_ptr(new CoreWorkerDirectTaskReceiver( - worker_context_, task_execution_service_, execute_task, exit_handler)); + worker_context_, task_execution_service_, execute_task, exit)); } // Start RPC server after all the task receivers are properly initialized. @@ -239,26 +237,25 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, } CoreWorker::~CoreWorker() { - Shutdown(); + io_service_.stop(); io_thread_.join(); + if (log_dir_ != "") { + RayLog::ShutDownRayLog(); + } } void CoreWorker::Shutdown() { - if (!shutdown_) { - shutdown_ = true; - io_service_.stop(); - if (worker_type_ == WorkerType::WORKER) { - task_execution_service_.stop(); - } - if (log_dir_ != "") { - RayLog::ShutDownRayLog(); - } + io_service_.stop(); + if (worker_type_ == WorkerType::WORKER) { + task_execution_service_.stop(); } } void CoreWorker::Disconnect() { io_service_.stop(); - gcs_client_->Disconnect(); + if (gcs_client_) { + gcs_client_->Disconnect(); + } if (local_raylet_client_) { RAY_IGNORE_EXPR(local_raylet_client_->Disconnect()); } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index edb4c078a..127412db3 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -67,8 +67,6 @@ class CoreWorker { /// \param[in] check_signals Language worker function to check for signals and handle /// them. If the function returns anything but StatusOK, any long-running /// operations in the core worker will short circuit and return that status. - /// \param[in] exit_handler Language worker function to orderly shutdown the worker. - /// We guarantee this will be run on the main thread of the worker. /// \param[in] ref_counting_enabled Whether to enable object ref counting. /// /// NOTE(zhijunfu): the constructor would throw if a failure happens. @@ -78,7 +76,6 @@ class CoreWorker { const std::string &log_dir, const std::string &node_ip_address, int node_manager_port, const TaskExecutionCallback &task_execution_callback, std::function check_signals = nullptr, - std::function exit_handler = nullptr, bool ref_counting_enabled = false); ~CoreWorker(); diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index 9e8e0ae5d..2831288be 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -431,7 +431,10 @@ class CoreWorkerDirectTaskReceiver { ~CoreWorkerDirectTaskReceiver() { fiber_shutdown_event_.Notify(); - fiber_runner_thread_.join(); + // Only join the fiber thread if it was spawned in the first place. + if (fiber_runner_thread_.joinable()) { + fiber_runner_thread_.join(); + } } /// Initialize this receiver. This must be called prior to use. diff --git a/src/ray/rpc/grpc_server.h b/src/ray/rpc/grpc_server.h index bd28ae3c1..9e5fb60b8 100644 --- a/src/ray/rpc/grpc_server.h +++ b/src/ray/rpc/grpc_server.h @@ -1,12 +1,12 @@ #ifndef RAY_RPC_GRPC_SERVER_H #define RAY_RPC_GRPC_SERVER_H +#include + +#include #include #include -#include -#include - #include "ray/common/status.h" #include "ray/rpc/server_call.h" @@ -42,7 +42,9 @@ class GrpcServer { // Shutdown this server void Shutdown() { if (!is_closed_) { - server_->Shutdown(); + // Shutdown the server with an immediate deadline. + // TODO(edoakes): do we want to do this in all cases? + server_->Shutdown(gpr_now(GPR_CLOCK_REALTIME)); for (const auto &cq : cqs_) { cq->Shutdown(); }