From 58dc70f90e1ebe7decb93cbba0429c761076b4a4 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Fri, 20 Mar 2020 15:45:29 -0500 Subject: [PATCH] [minor] Remove get_global_worker(), RuntimeContext (#7638) --- python/ray/__init__.py | 1 - python/ray/_raylet.pyx | 2 +- python/ray/actor.py | 14 ++++----- python/ray/exceptions.py | 5 ++++ python/ray/experimental/internal_kv.py | 6 ++-- python/ray/internal/internal_api.py | 4 +-- python/ray/remote_function.py | 2 +- python/ray/runtime_context.py | 30 ------------------- python/ray/serialization.py | 6 ++-- python/ray/tests/test_advanced_3.py | 4 +-- python/ray/worker.py | 22 +++++--------- .../python/tests/test_direct_transfer.py | 6 ++-- 12 files changed, 33 insertions(+), 69 deletions(-) delete mode 100644 python/ray/runtime_context.py diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 9dc3bd39e..0016465fd 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -96,7 +96,6 @@ import ray.projects # noqa: E402 # some functions in the worker. import ray.actor # noqa: F401 from ray.actor import method # noqa: E402 -from ray.runtime_context import _get_runtime_context # noqa: E402 from ray.cross_language import java_function, java_actor_class # noqa: E402 from ray import util # noqa: E402 diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index ca7197a10..7b5ce944c 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -953,7 +953,7 @@ cdef class CoreWorker: CObjectID c_outer_object_id = (outer_object_id.native() if outer_object_id else CObjectID.Nil()) - worker = ray.worker.get_global_worker() + worker = ray.worker.global_worker worker.check_connected() manager = worker.function_actor_manager c_actor_id = self.core_worker.get().DeserializeAndRegisterActorHandle( diff --git a/python/ray/actor.py b/python/ray/actor.py index e380dcc9f..d615b5651 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -463,7 +463,7 @@ class ActorClass: if max_concurrency < 1: raise ValueError("max_concurrency must be >= 1") - worker = ray.worker.get_global_worker() + worker = ray.worker.global_worker if worker.mode is None: raise RuntimeError("Actors cannot be created before ray.init() " "has been called.") @@ -655,7 +655,7 @@ class ActorHandle: def __del__(self): # Mark that this actor handle has gone out of scope. Once all actor # handles are out of scope, the actor will exit. - worker = ray.worker.get_global_worker() + worker = ray.worker.global_worker if worker.connected and hasattr(worker, "core_worker"): worker.core_worker.remove_actor_handle_reference( self._ray_actor_id) @@ -682,7 +682,7 @@ class ActorHandle: object_ids: A list of object IDs returned by the remote actor method. """ - worker = ray.worker.get_global_worker() + worker = ray.worker.global_worker args = args or [] kwargs = kwargs or {} @@ -776,7 +776,7 @@ class ActorHandle: Returns: A dictionary of the information needed to reconstruct the object. """ - worker = ray.worker.get_global_worker() + worker = ray.worker.global_worker worker.check_connected() if hasattr(worker, "core_worker"): @@ -809,7 +809,7 @@ class ActorHandle: the actor handle. """ - worker = ray.worker.get_global_worker() + worker = ray.worker.global_worker worker.check_connected() if hasattr(worker, "core_worker"): @@ -859,7 +859,7 @@ def modify_class(cls): __ray_actor_class__ = cls # The original actor class def __ray_terminate__(self): - worker = ray.worker.get_global_worker() + worker = ray.worker.global_worker if worker.mode != ray.LOCAL_MODE: ray.actor.exit_actor() @@ -937,8 +937,6 @@ def exit_actor(): raise TypeError("exit_actor called on a non-actor worker.") -ray.worker.global_worker.make_actor = make_actor - CheckpointContext = namedtuple( "CheckpointContext", [ diff --git a/python/ray/exceptions.py b/python/ray/exceptions.py index c636f2e8a..a3db708fd 100644 --- a/python/ray/exceptions.py +++ b/python/ray/exceptions.py @@ -11,6 +11,11 @@ class RayError(Exception): pass +class RayConnectionError(RayError): + """Raised when ray is not yet connected but needs to be.""" + pass + + class RayTaskError(RayError): """Indicates that a task threw an exception during execution. diff --git a/python/ray/experimental/internal_kv.py b/python/ray/experimental/internal_kv.py index df670547d..138071f50 100644 --- a/python/ray/experimental/internal_kv.py +++ b/python/ray/experimental/internal_kv.py @@ -4,14 +4,14 @@ _local = {} # dict for local mode def _internal_kv_initialized(): - worker = ray.worker.get_global_worker() + worker = ray.worker.global_worker return hasattr(worker, "mode") and worker.mode is not None def _internal_kv_get(key): """Fetch the value of a binary key.""" - worker = ray.worker.get_global_worker() + worker = ray.worker.global_worker if worker.mode == ray.worker.LOCAL_MODE: return _local.get(key) @@ -27,7 +27,7 @@ def _internal_kv_put(key, value, overwrite=False): already_exists (bool): whether the value already exists. """ - worker = ray.worker.get_global_worker() + worker = ray.worker.global_worker if worker.mode == ray.worker.LOCAL_MODE: exists = key in _local if not exists or overwrite: diff --git a/python/ray/internal/internal_api.py b/python/ray/internal/internal_api.py index 14d196095..013fb880b 100644 --- a/python/ray/internal/internal_api.py +++ b/python/ray/internal/internal_api.py @@ -7,7 +7,7 @@ __all__ = ["free", "global_gc"] def global_gc(): """Trigger gc.collect() on all workers in the cluster.""" - worker = ray.worker.get_global_worker() + worker = ray.worker.global_worker worker.core_worker.global_gc() @@ -54,7 +54,7 @@ def free(object_ids, local_only=False, delete_creating_tasks=False): delete_creating_tasks (bool): Whether also delete the object creating tasks. """ - worker = ray.worker.get_global_worker() + worker = ray.worker.global_worker if isinstance(object_ids, ray.ObjectID): object_ids = [object_ids] diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index 948fa305b..5a250ddeb 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -150,7 +150,7 @@ class RemoteFunction: resources=None, max_retries=None): """Submit the remote function for execution.""" - worker = ray.worker.get_global_worker() + worker = ray.worker.global_worker worker.check_connected() # If this function was not exported in this session and job, we need to diff --git a/python/ray/runtime_context.py b/python/ray/runtime_context.py deleted file mode 100644 index 66ee92c2c..000000000 --- a/python/ray/runtime_context.py +++ /dev/null @@ -1,30 +0,0 @@ -import ray.worker - - -class RuntimeContext: - """A class used for getting runtime context.""" - - def __init__(self, worker=None): - self.worker = worker - - @property - def current_driver_id(self): - """Get current driver ID for this worker or driver. - - Returns: - If called by a driver, this returns the driver ID. If called in - a task, return the driver ID of the associated driver. - """ - assert self.worker is not None - return self.worker.current_job_id - - -_runtime_context = None - - -def _get_runtime_context(): - global _runtime_context - if _runtime_context is None: - _runtime_context = RuntimeContext(ray.worker.get_global_worker()) - - return _runtime_context diff --git a/python/ray/serialization.py b/python/ray/serialization.py index e07b2ad17..03ded8540 100644 --- a/python/ray/serialization.py +++ b/python/ray/serialization.py @@ -164,7 +164,7 @@ class SerializationContext: # TODO(swang): Remove this check. Otherwise, we will not be able to # handle serialized plasma IDs correctly. if obj.is_direct_call_type(): - worker = ray.worker.get_global_worker() + worker = ray.worker.global_worker worker.check_connected() obj, owner_id, owner_address = ( worker.core_worker.serialize_and_promote_object_id(obj)) @@ -184,7 +184,7 @@ class SerializationContext: # somewhere, which causes an error. context = ray.worker.global_worker.get_serialization_context() if owner_id: - worker = ray.worker.get_global_worker() + worker = ray.worker.global_worker worker.check_connected() # UniqueIDs are serialized as # (class name, (unique bytes,)). @@ -250,7 +250,7 @@ class SerializationContext: # cloudpickle directly or captured in a remote function/actor), # then pin the object for the lifetime of this worker by adding # a local reference that won't ever be removed. - ray.worker.get_global_worker().core_worker.add_object_id_reference( + ray.worker.global_worker.core_worker.add_object_id_reference( object_id) def _deserialize_pickle5_data(self, data): diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index 3b4bdd23b..5bca918e1 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -299,12 +299,12 @@ def test_specific_job_id(): ray.init(num_cpus=1, job_id=dummy_driver_id) # in driver - assert dummy_driver_id == ray._get_runtime_context().current_driver_id + assert dummy_driver_id == ray.worker.global_worker.current_job_id # in worker @ray.remote def f(): - return ray._get_runtime_context().current_driver_id + return ray.worker.global_worker.current_job_id assert dummy_driver_id == ray.get(f.remote()) diff --git a/python/ray/worker.py b/python/ray/worker.py index eef923a34..3dba695fe 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -42,6 +42,7 @@ from ray import import_thread from ray import profiling from ray.exceptions import ( + RayConnectionError, RayError, RayTaskError, ObjectStoreFullError, @@ -108,7 +109,6 @@ class Worker: self.mode = None self.cached_functions_to_run = [] self.actor_init_error = None - self.make_actor = None self.actors = {} # Information used to maintain actor checkpoints. self.actor_checkpoint_info = {} @@ -496,10 +496,6 @@ _global_node = None """ray.node.Node: The global node object that is created by ray.init().""" -class RayConnectionError(Exception): - pass - - def print_failed_task(task_status): """Print information about failed tasks. @@ -886,6 +882,7 @@ def shutdown(exiting_interpreter=False): atexit.register(shutdown, True) +# TODO(edoakes): this should only be set in the driver. def sigterm_handler(signum, frame): sys.exit(signal.SIGTERM) @@ -1675,7 +1672,8 @@ def kill(actor): raise ValueError("ray.kill() only supported for actors. " "Got: {}.".format(type(actor))) - worker = ray.worker.get_global_worker() + worker = ray.worker.global_worker + worker.check_connected() worker.core_worker.kill_actor(actor._ray_actor_id, False) @@ -1690,10 +1688,6 @@ def _mode(worker=global_worker): return worker.mode -def get_global_worker(): - return global_worker - - def make_decorator(num_return_vals=None, num_cpus=None, num_gpus=None, @@ -1725,9 +1719,9 @@ def make_decorator(num_return_vals=None, raise TypeError("The keyword 'max_calls' is not " "allowed for actors.") - return worker.make_actor(function_or_class, num_cpus, num_gpus, - memory, object_store_memory, resources, - max_reconstructions) + return ray.actor.make_actor(function_or_class, num_cpus, num_gpus, + memory, object_store_memory, resources, + max_reconstructions) raise TypeError("The @ray.remote decorator must be applied to " "either a function or to a class.") @@ -1815,7 +1809,7 @@ def remote(*args, **kwargs): work and then shut down. If you want to kill them immediately, you can also call ``ray.kill(actor)``. """ - worker = get_global_worker() + worker = global_worker if len(args) == 1 and len(kwargs) == 0 and callable(args[0]): # This is the case where the decorator is just @ray.remote. diff --git a/streaming/python/tests/test_direct_transfer.py b/streaming/python/tests/test_direct_transfer.py index 57805cbfd..dffdb0554 100644 --- a/streaming/python/tests/test_direct_transfer.py +++ b/streaming/python/tests/test_direct_transfer.py @@ -33,8 +33,7 @@ class Worker: def init_writer(self, output_channel, reader_actor): conf = { - Config.TASK_JOB_ID: ray.runtime_context._get_runtime_context() - .current_driver_id, + Config.TASK_JOB_ID: ray.worker.global_worker.current_job_id, Config.CHANNEL_TYPE: Config.NATIVE_CHANNEL } self.writer = transfer.DataWriter([output_channel], @@ -43,8 +42,7 @@ class Worker: def init_reader(self, input_channel, writer_actor): conf = { - Config.TASK_JOB_ID: ray.runtime_context._get_runtime_context() - .current_driver_id, + Config.TASK_JOB_ID: ray.worker.global_worker.current_job_id, Config.CHANNEL_TYPE: Config.NATIVE_CHANNEL } self.reader = transfer.DataReader([input_channel],