From e631827a9fa3e8cbddfb857cc0d5b5f70dc3190f Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Wed, 6 May 2020 09:45:07 -0700 Subject: [PATCH] [Core] Show_webui segfault fix. (#8323) --- python/ray/_raylet.pyx | 16 ++++++++++++++-- python/ray/async_compat.py | 11 ++++++++++- python/ray/includes/libcoreworker.pxd | 1 + python/ray/ray_perf.py | 1 + src/ray/core_worker/core_worker.cc | 7 +++++++ src/ray/core_worker/core_worker.h | 2 ++ 6 files changed, 35 insertions(+), 3 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 930e45ecb..7173860ec 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -634,6 +634,12 @@ cdef shared_ptr[CBuffer] string_to_buffer(c_string& c_str): (c_str.data()), c_str.size(), True)) +cdef void terminate_asyncio_thread() nogil: + with gil: + core_worker = ray.worker.global_worker.core_worker + core_worker.destroy_event_loop_if_exists() + + cdef class CoreWorker: def __cinit__(self, is_driver, store_socket, raylet_socket, @@ -667,6 +673,7 @@ cdef class CoreWorker: options.is_local_mode = local_mode options.num_workers = 1 options.kill_main = kill_main_task + options.terminate_asyncio_thread = terminate_asyncio_thread CCoreWorkerProcess.Initialize(options) @@ -1171,7 +1178,8 @@ cdef class CoreWorker: if self.async_thread is None: self.async_thread = threading.Thread( - target=lambda: self.async_event_loop.run_forever() + target=lambda: self.async_event_loop.run_forever(), + name="AsyncIO Thread" ) # Making the thread a daemon causes it to exit # when the main thread exits. @@ -1192,8 +1200,12 @@ cdef class CoreWorker: .YieldCurrentFiber(event)) return future.result() - def destory_event_loop_if_exists(self): + def destroy_event_loop_if_exists(self): if self.async_event_loop is not None: + # We should stop the monitor first because otherwise, + # loop.stop() will continue forever as monitor + # main loop will not be terminated. + self.async_event_loop.monitor_state.kill() self.async_event_loop.stop() if self.async_thread is not None: self.async_thread.join() diff --git a/python/ray/async_compat.py b/python/ray/async_compat.py index df103f5df..4bfe7bffa 100644 --- a/python/ray/async_compat.py +++ b/python/ray/async_compat.py @@ -107,7 +107,8 @@ class AsyncMonitorState: self.names_lock = threading.Lock() self.sleep_time = 1.0 - asyncio.ensure_future(self.monitor(), loop=loop) + self.monitor_loop_future = asyncio.ensure_future( + self.monitor(), loop=loop) async def monitor(self): while True: @@ -131,3 +132,11 @@ class AsyncMonitorState: with self.names_lock: names = list(self.names.values()) return names + + def kill(self): + """Kill the monitor's loop + + This should be called in order to clean an event loop + that this monitor is running. + """ + self.monitor_loop_future.cancel() diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 6524139e0..a2698ee51 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -217,6 +217,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: int num_workers (c_bool() nogil) kill_main CCoreWorkerOptions() + (void() nogil) terminate_asyncio_thread cdef cppclass CCoreWorkerProcess "ray::CoreWorkerProcess": @staticmethod diff --git a/python/ray/ray_perf.py b/python/ray/ray_perf.py index eae62e396..55c3c100c 100644 --- a/python/ray/ray_perf.py +++ b/python/ray/ray_perf.py @@ -92,6 +92,7 @@ def timeit(name, fn, multiplier=1): def main(): print("Tip: set TESTS_TO_RUN='pattern' to run a subset of benchmarks") ray.init() + value = ray.put(0) arr = np.zeros(100 * 1024 * 1024, dtype=np.int64) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index a3e823a11..7739ee867 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -590,6 +590,13 @@ void CoreWorker::WaitForShutdown() { } if (options_.worker_type == WorkerType::WORKER) { RAY_CHECK(task_execution_service_.stopped()); + // Asyncio coroutines could still run after CoreWorker is removed because it is + // running in a different thread. This can cause segfault because coroutines try to + // access CoreWorker methods that are already garbage collected. We should complete + // all coroutines before shutting down in order to prevent this. + if (worker_context_.CurrentActorIsAsync()) { + options_.terminate_asyncio_thread(); + } } } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 27c374ccf..5b6ea601b 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -114,6 +114,8 @@ struct CoreWorkerOptions { bool is_local_mode; /// The number of workers to be started in the current process. int num_workers; + /// The function to destroy asyncio event and loops. + std::function terminate_asyncio_thread; }; /// Lifecycle management of one or more `CoreWorker` instances in a process.