mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 04:07:01 +08:00
[Core] Show_webui segfault fix. (#8323)
This commit is contained in:
+14
-2
@@ -634,6 +634,12 @@ cdef shared_ptr[CBuffer] string_to_buffer(c_string& c_str):
|
||||
<uint8_t*>(c_str.data()), c_str.size(), True))
|
||||
|
||||
|
||||
cdef void terminate_asyncio_thread() nogil:
|
||||
with gil:
|
||||
core_worker = ray.worker.global_worker.core_worker
|
||||
core_worker.destroy_event_loop_if_exists()
|
||||
|
||||
|
||||
cdef class CoreWorker:
|
||||
|
||||
def __cinit__(self, is_driver, store_socket, raylet_socket,
|
||||
@@ -667,6 +673,7 @@ cdef class CoreWorker:
|
||||
options.is_local_mode = local_mode
|
||||
options.num_workers = 1
|
||||
options.kill_main = kill_main_task
|
||||
options.terminate_asyncio_thread = terminate_asyncio_thread
|
||||
|
||||
CCoreWorkerProcess.Initialize(options)
|
||||
|
||||
@@ -1171,7 +1178,8 @@ cdef class CoreWorker:
|
||||
|
||||
if self.async_thread is None:
|
||||
self.async_thread = threading.Thread(
|
||||
target=lambda: self.async_event_loop.run_forever()
|
||||
target=lambda: self.async_event_loop.run_forever(),
|
||||
name="AsyncIO Thread"
|
||||
)
|
||||
# Making the thread a daemon causes it to exit
|
||||
# when the main thread exits.
|
||||
@@ -1192,8 +1200,12 @@ cdef class CoreWorker:
|
||||
.YieldCurrentFiber(event))
|
||||
return future.result()
|
||||
|
||||
def destory_event_loop_if_exists(self):
|
||||
def destroy_event_loop_if_exists(self):
|
||||
if self.async_event_loop is not None:
|
||||
# We should stop the monitor first because otherwise,
|
||||
# loop.stop() will continue forever as monitor
|
||||
# main loop will not be terminated.
|
||||
self.async_event_loop.monitor_state.kill()
|
||||
self.async_event_loop.stop()
|
||||
if self.async_thread is not None:
|
||||
self.async_thread.join()
|
||||
|
||||
@@ -107,7 +107,8 @@ class AsyncMonitorState:
|
||||
self.names_lock = threading.Lock()
|
||||
|
||||
self.sleep_time = 1.0
|
||||
asyncio.ensure_future(self.monitor(), loop=loop)
|
||||
self.monitor_loop_future = asyncio.ensure_future(
|
||||
self.monitor(), loop=loop)
|
||||
|
||||
async def monitor(self):
|
||||
while True:
|
||||
@@ -131,3 +132,11 @@ class AsyncMonitorState:
|
||||
with self.names_lock:
|
||||
names = list(self.names.values())
|
||||
return names
|
||||
|
||||
def kill(self):
|
||||
"""Kill the monitor's loop
|
||||
|
||||
This should be called in order to clean an event loop
|
||||
that this monitor is running.
|
||||
"""
|
||||
self.monitor_loop_future.cancel()
|
||||
|
||||
@@ -217,6 +217,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
|
||||
int num_workers
|
||||
(c_bool() nogil) kill_main
|
||||
CCoreWorkerOptions()
|
||||
(void() nogil) terminate_asyncio_thread
|
||||
|
||||
cdef cppclass CCoreWorkerProcess "ray::CoreWorkerProcess":
|
||||
@staticmethod
|
||||
|
||||
@@ -92,6 +92,7 @@ def timeit(name, fn, multiplier=1):
|
||||
def main():
|
||||
print("Tip: set TESTS_TO_RUN='pattern' to run a subset of benchmarks")
|
||||
ray.init()
|
||||
|
||||
value = ray.put(0)
|
||||
arr = np.zeros(100 * 1024 * 1024, dtype=np.int64)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user