diff --git a/python/ray/runtime_context.py b/python/ray/runtime_context.py index 8cdb06481..318824ed3 100644 --- a/python/ray/runtime_context.py +++ b/python/ray/runtime_context.py @@ -11,28 +11,106 @@ class RuntimeContext(object): assert worker is not None self.worker = worker + def get(self): + """Get a dictionary of the current_context. + + For fields that are not available (for example actor id inside a task) + won't be included in the field. + + Returns: + dict: Dictionary of the current context. + """ + context = { + "job_id": self.job_id, + "node_id": self.node_id, + "task_id": self.task_id, + "actor_id": self.actor_id + } + # Remove fields that are None. + return { + key: value + for key, value in context.items() if value is not None + } + @property - def current_job_id(self): + def job_id(self): """Get current job ID for this worker or driver. + Job ID is the id of your Ray drivers that create tasks or actors. + Returns: If called by a driver, this returns the job ID. If called in a task, return the job ID of the associated driver. """ - return self.worker.current_job_id + job_id = self.worker.current_job_id + assert not job_id.is_nil() + return job_id @property - def current_actor_id(self): - """Get the current actor ID in this worker. + def node_id(self): + """Get current node ID for this worker or driver. + + Node ID is the id of a node that your driver, task, or actor runs. Returns: - The current driver id in this worker. + a node id for this worker or driver. + """ + node_id = self.worker.current_node_id + assert not node_id.is_nil() + return node_id + + @property + def task_id(self): + """Get current task ID for this worker or driver. + + Task ID is the id of a Ray task. + This shouldn't be used in a driver process. + + Example: + + >>> @ray.remote + >>> class Actor: + >>> def ready(self): + >>> return True + >>> + >>> @ray.remote + >>> def f(): + >>> return True + >>> + >>> # All the below code will generate different task ids. + >>> # Task ids are available for actor creation. + >>> a = Actor.remote() + >>> # Task ids are available for actor tasks. + >>> a.ready.remote() + >>> # Task ids are available for normal tasks. + >>> f.remote() + + Returns: + The current worker's task id. None if there's no task id. """ # only worker mode has actor_id assert self.worker.mode == ray.worker.WORKER_MODE, ( f"This method is only available when the process is a\ worker. Current mode: {self.worker.mode}") - return self.worker.actor_id + task_id = self.worker.current_task_id + return task_id if not task_id.is_nil() else None + + @property + def actor_id(self): + """Get the current actor ID in this worker. + + ID of the actor of the current process. + This shouldn't be used in a driver process. + + Returns: + The current actor id in this worker. None if there's no actor id. + """ + # only worker mode has actor_id + assert self.worker.mode == ray.worker.WORKER_MODE, ( + f"This method is only available when the process is a\ + worker. Current mode: {self.worker.mode}") + actor_id = self.worker.actor_id + return actor_id if not actor_id.is_nil() else None @property def was_current_actor_reconstructed(self): @@ -41,8 +119,9 @@ class RuntimeContext(object): Returns: Whether this actor has been ever restarted. """ - # TODO: this method should not be called in a normal task. - actor_info = ray.state.actors(self.current_actor_id.hex()) + assert not self.actor_id.is_nil(), ( + "This method should't be called inside Ray tasks.") + actor_info = ray.state.actors(self.actor_id.hex()) return actor_info and actor_info["NumRestarts"] != 0 @property diff --git a/python/ray/tests/test_runtime_context.py b/python/ray/tests/test_runtime_context.py index b71516b1c..026357628 100644 --- a/python/ray/tests/test_runtime_context.py +++ b/python/ray/tests/test_runtime_context.py @@ -5,7 +5,7 @@ import time import sys -def test_was_current_actor_reconstructed(): +def test_was_current_actor_reconstructed(shutdown_only): ray.init() @ray.remote(max_restarts=10) @@ -51,24 +51,30 @@ def test_was_current_actor_reconstructed(): assert ray.get(a.get_was_reconstructed.remote()) is True assert ray.get(a.update_was_reconstructed.remote()) is True - ray.shutdown() - - -def test_runtime_context_interface(): - ray.init() - @ray.remote(max_restarts=10) class A(object): def current_job_id(self): - return ray.get_runtime_context().current_job_id + return ray.get_runtime_context().job_id def current_actor_id(self): - return ray.get_runtime_context().current_actor_id + return ray.get_runtime_context().actor_id + + @ray.remote + def f(): + assert ray.get_runtime_context().actor_id is None + assert ray.get_runtime_context().task_id is not None + assert ray.get_runtime_context().node_id is not None + assert ray.get_runtime_context().job_id is not None + context = ray.get_runtime_context().get() + assert "actor_id" not in context + assert context["task_id"] == ray.get_runtime_context().task_id + assert context["node_id"] == ray.get_runtime_context().node_id + assert context["job_id"] == ray.get_runtime_context().job_id a = A.remote() assert ray.get(a.current_job_id.remote()) is not None assert ray.get(a.current_actor_id.remote()) is not None - ray.shutdown() + ray.get(f.remote()) if __name__ == "__main__": diff --git a/python/ray/worker.py b/python/ray/worker.py index 9b365f445..d955fd813 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -155,6 +155,10 @@ class Worker: def current_task_id(self): return self.core_worker.get_current_task_id() + @property + def current_node_id(self): + return self.core_worker.get_current_node_id() + @property def placement_group_id(self): return self.core_worker.get_placement_group_id()