mirror of
https://github.com/wassname/ray.git
synced 2026-06-30 06:46:40 +08:00
Consodiate CI Python tests and fix bug about multiple ray.init (#4195)
This commit is contained in:
committed by
Robert Nishihara
parent
9c48cc27aa
commit
6f1a29ad3f
@@ -57,9 +57,11 @@ class RemoteFunction(object):
|
||||
self._function_signature = ray.signature.extract_signature(
|
||||
self._function)
|
||||
|
||||
# # Export the function.
|
||||
# Export the function.
|
||||
worker = ray.worker.get_global_worker()
|
||||
worker.function_actor_manager.export(self)
|
||||
# In which session this function was exported last time.
|
||||
self._last_export_session = worker._session_index
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
raise Exception("Remote functions cannot be called directly. Instead "
|
||||
@@ -97,6 +99,13 @@ class RemoteFunction(object):
|
||||
"""An experimental alternate way to submit remote functions."""
|
||||
worker = ray.worker.get_global_worker()
|
||||
worker.check_connected()
|
||||
|
||||
if self._last_export_session < worker._session_index:
|
||||
# If this function was exported in a previous session, we need to
|
||||
# export this function again, because current GCS doesn't have it.
|
||||
self._last_export_session = worker._session_index
|
||||
worker.function_actor_manager.export(self)
|
||||
|
||||
kwargs = {} if kwargs is None else kwargs
|
||||
args = ray.signature.extend_args(self._function_signature, args,
|
||||
kwargs)
|
||||
|
||||
@@ -13,6 +13,11 @@ import warnings
|
||||
import ray
|
||||
from ray.tests.cluster_utils import Cluster
|
||||
|
||||
# TODO(yuhguo): This test file requires a lot of CPU/memory, and
|
||||
# better be put in Jenkins. However, it fails frequently in Jenkins, but
|
||||
# works well in Travis. We should consider moving it back to Jenkins once
|
||||
# we figure out the reason.
|
||||
|
||||
if (multiprocessing.cpu_count() < 40
|
||||
or ray.utils.get_system_memory() < 50 * 10**9):
|
||||
warnings.warn("This test must be run on large machines.")
|
||||
|
||||
@@ -161,6 +161,9 @@ class Worker(object):
|
||||
# This event is checked regularly by all of the threads so that they
|
||||
# know when to exit.
|
||||
self.threads_stopped = threading.Event()
|
||||
# Index of the current session. This number will
|
||||
# increment every time when `ray.shutdown` is called.
|
||||
self._session_index = 0
|
||||
|
||||
@property
|
||||
def task_context(self):
|
||||
@@ -2064,6 +2067,7 @@ def disconnect():
|
||||
if hasattr(worker, "logger_thread"):
|
||||
worker.logger_thread.join()
|
||||
worker.threads_stopped.clear()
|
||||
worker._session_index += 1
|
||||
|
||||
worker.connected = False
|
||||
worker.cached_functions_to_run = []
|
||||
|
||||
Reference in New Issue
Block a user