mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 23:25:24 +08:00
Fix worker exit cleanup (#6450)
* working but ugly * comments * proper but hanging in grpc server destructor * grpc server shutdown deadline * fix disconnect * lint * shutdown_only in test * replace shutdown
This commit is contained in:
@@ -688,6 +688,8 @@ cdef execute_task(
|
||||
# If we've reached the max number of executions for this worker, exit.
|
||||
task_counter = manager.get_task_counter(job_id, function_descriptor)
|
||||
if task_counter == execution_info.max_calls:
|
||||
# Intentionally disconnect so the raylet doesn't print an error.
|
||||
# TODO(edoakes): we should handle max_calls in the core worker.
|
||||
worker.core_worker.disconnect()
|
||||
sys.exit(0)
|
||||
|
||||
@@ -737,11 +739,6 @@ cdef CRayStatus check_signals() nogil:
|
||||
return CRayStatus.OK()
|
||||
|
||||
|
||||
cdef void exit_handler() nogil:
|
||||
with gil:
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
cdef shared_ptr[CBuffer] string_to_buffer(c_string& c_str):
|
||||
cdef shared_ptr[CBuffer] empty_metadata
|
||||
if c_str.size() == 0:
|
||||
@@ -787,7 +784,7 @@ cdef class CoreWorker:
|
||||
raylet_socket.encode("ascii"), job_id.native(),
|
||||
gcs_options.native()[0], log_dir.encode("utf-8"),
|
||||
node_ip_address.encode("utf-8"), node_manager_port,
|
||||
task_execution_handler, check_signals, exit_handler, True))
|
||||
task_execution_handler, check_signals, True))
|
||||
|
||||
def disconnect(self):
|
||||
self.destory_event_loop_if_exists()
|
||||
|
||||
@@ -76,7 +76,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
|
||||
const c_vector[CObjectID] &return_ids,
|
||||
c_vector[shared_ptr[CRayObject]] *returns) nogil,
|
||||
CRayStatus() nogil,
|
||||
void () nogil,
|
||||
c_bool ref_counting_enabled)
|
||||
void Disconnect()
|
||||
CWorkerType &GetWorkerType()
|
||||
|
||||
@@ -37,13 +37,12 @@ def test_conn_cluster():
|
||||
"temp_dir must not be provided.")
|
||||
|
||||
|
||||
def test_tempdir():
|
||||
def test_tempdir(shutdown_only):
|
||||
shutil.rmtree("/tmp/ray", ignore_errors=True)
|
||||
ray.init(temp_dir="/tmp/i_am_a_temp_dir")
|
||||
assert os.path.exists(
|
||||
"/tmp/i_am_a_temp_dir"), "Specified temp dir not found."
|
||||
assert not os.path.exists("/tmp/ray"), "Default temp dir should not exist."
|
||||
ray.shutdown()
|
||||
shutil.rmtree("/tmp/i_am_a_temp_dir", ignore_errors=True)
|
||||
|
||||
|
||||
@@ -57,7 +56,7 @@ def test_tempdir_commandline():
|
||||
shutil.rmtree("/tmp/i_am_a_temp_dir2", ignore_errors=True)
|
||||
|
||||
|
||||
def test_raylet_socket_name():
|
||||
def test_raylet_socket_name(shutdown_only):
|
||||
ray.init(raylet_socket_name="/tmp/i_am_a_temp_socket")
|
||||
assert os.path.exists(
|
||||
"/tmp/i_am_a_temp_socket"), "Specified socket path not found."
|
||||
@@ -77,7 +76,7 @@ def test_raylet_socket_name():
|
||||
pass # It could have been removed by Ray.
|
||||
|
||||
|
||||
def test_temp_plasma_store_socket():
|
||||
def test_temp_plasma_store_socket(shutdown_only):
|
||||
ray.init(plasma_store_socket_name="/tmp/i_am_a_temp_socket")
|
||||
assert os.path.exists(
|
||||
"/tmp/i_am_a_temp_socket"), "Specified socket path not found."
|
||||
@@ -97,7 +96,7 @@ def test_temp_plasma_store_socket():
|
||||
pass # It could have been removed by Ray.
|
||||
|
||||
|
||||
def test_raylet_tempfiles():
|
||||
def test_raylet_tempfiles(shutdown_only):
|
||||
ray.init(num_cpus=0)
|
||||
node = ray.worker._global_node
|
||||
top_levels = set(os.listdir(node.get_session_dir_path()))
|
||||
@@ -132,15 +131,13 @@ def test_raylet_tempfiles():
|
||||
|
||||
socket_files = set(os.listdir(node.get_sockets_dir_path()))
|
||||
assert socket_files == {"plasma_store", "raylet"}
|
||||
ray.shutdown()
|
||||
|
||||
|
||||
def test_tempdir_privilege():
|
||||
def test_tempdir_privilege(shutdown_only):
|
||||
os.chmod("/tmp/ray", 0o000)
|
||||
ray.init(num_cpus=1)
|
||||
session_dir = ray.worker._global_node.get_session_dir_path()
|
||||
assert os.path.exists(session_dir), "Specified socket path not found."
|
||||
ray.shutdown()
|
||||
|
||||
|
||||
def test_session_dir_uniqueness():
|
||||
|
||||
@@ -431,6 +431,7 @@ class Worker(object):
|
||||
|
||||
signal.signal(signal.SIGTERM, sigterm_handler)
|
||||
self.core_worker.run_task_loop()
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def get_gpu_ids():
|
||||
@@ -834,6 +835,12 @@ def shutdown(exiting_interpreter=False):
|
||||
|
||||
disconnect(exiting_interpreter)
|
||||
|
||||
# We need to destruct the core worker here because after this function,
|
||||
# we will tear down any processes spawned by ray.init() and the background
|
||||
# IO thread in the core worker doesn't currently handle that gracefully.
|
||||
if hasattr(global_worker, "core_worker"):
|
||||
del global_worker.core_worker
|
||||
|
||||
# Disconnect global state from GCS.
|
||||
ray.state.state.disconnect()
|
||||
|
||||
@@ -843,7 +850,7 @@ def shutdown(exiting_interpreter=False):
|
||||
_global_node.kill_all_processes(check_alive=False, allow_graceful=True)
|
||||
_global_node = None
|
||||
|
||||
# TODO(rkn): Instead of manually reseting some of the worker fields, we
|
||||
# TODO(rkn): Instead of manually resetting some of the worker fields, we
|
||||
# should simply set "global_worker" to equal "None" or something like that.
|
||||
global_worker.set_mode(None)
|
||||
global_worker._post_get_hooks = []
|
||||
@@ -1332,12 +1339,6 @@ def disconnect(exiting_interpreter=False):
|
||||
worker.cached_functions_to_run = []
|
||||
worker.serialization_context_map.clear()
|
||||
|
||||
# We need to destruct the core worker here because after this function,
|
||||
# we will tear down any processes spawned by ray.init() and the background
|
||||
# threads in the core worker don't currently handle that gracefully.
|
||||
if hasattr(worker, "core_worker"):
|
||||
del worker.core_worker
|
||||
|
||||
|
||||
@contextmanager
|
||||
def _changeproctitle(title, next_title):
|
||||
|
||||
Reference in New Issue
Block a user