Keep `task_driver_id` set for the lifetime of an actor worker.

* function_manager.py: record the driver ID when the actor class is
  exported, and assert that `task_driver_id` is not NIL at that point.
* utils.py: add a module-level logger and log every error that is pushed
  to the driver.
* worker.py: only reset `task_driver_id` between tasks on non-actor
  workers, and use `is_nil()` instead of comparing against NIL_ACTOR_ID.

diff --git a/python/ray/function_manager.py b/python/ray/function_manager.py
index 659f18dec..f89bfcf3e 100644
--- a/python/ray/function_manager.py
+++ b/python/ray/function_manager.py
@@ -509,20 +509,27 @@ class FunctionActorManager(object):
         """
-        # We set the driver ID here because it may not have been available when
-        # the actor class was defined.
-        actor_class_info["driver_id"] = self._worker.task_driver_id.id()
         self._worker.redis_client.hmset(key, actor_class_info)
         self._worker.redis_client.rpush("Exports", key)
 
     def export_actor_class(self, Class, actor_method_names,
                            checkpoint_interval):
         function_descriptor = FunctionDescriptor.from_class(Class)
-        key = (b"ActorClass:" + self._worker.task_driver_id.id() + b":" +
+        # `task_driver_id` shouldn't be NIL, unless:
+        # 1) This worker isn't an actor;
+        # 2) And a previous task started a background thread, which didn't
+        #    finish before the task finished, and still uses Ray API
+        #    after that.
+        assert not self._worker.task_driver_id.is_nil(), (
+            "You might have started a background thread in a non-actor task, "
+            "please make sure the thread finishes before the task finishes.")
+        driver_id = self._worker.task_driver_id
+        key = (b"ActorClass:" + driver_id.id() + b":" +
                function_descriptor.function_id.id())
         actor_class_info = {
             "class_name": Class.__name__,
             "module": Class.__module__,
             "class": pickle.dumps(Class),
             "checkpoint_interval": checkpoint_interval,
+            "driver_id": driver_id.id(),
             "actor_method_names": json.dumps(list(actor_method_names))
         }
 
diff --git a/python/ray/utils.py b/python/ray/utils.py
index cb3c33ecc..8c61607a0 100644
--- a/python/ray/utils.py
+++ b/python/ray/utils.py
@@ -6,6 +6,7 @@ import binascii
 import functools
 import hashlib
 import inspect
+import logging
 import numpy as np
 import os
 import subprocess
@@ -18,6 +19,8 @@ import ray.gcs_utils
 import ray.raylet
 import ray.ray_constants as ray_constants
 
+logger = logging.getLogger(__name__)
+
 
 def _random_string():
     id_hash = hashlib.sha1()
@@ -69,6 +72,8 @@ def push_error_to_driver(worker,
     if driver_id is None:
         driver_id = ray_constants.NIL_JOB_ID.id()
     data = {} if data is None else data
+    logger.error("Pushing error to driver, type: %s, message: %s.",
+                 error_type, message)
     worker.raylet_client.push_error(
         ray.ObjectID(driver_id), error_type, message, time.time())
 
diff --git a/python/ray/worker.py b/python/ray/worker.py
index d8590f4a0..bd0da2ed7 100644
--- a/python/ray/worker.py
+++ b/python/ray/worker.py
@@ -618,7 +618,8 @@ class Worker(object):
                 task_index = self.task_index
                 self.task_index += 1
                 # The parent task must be set for the submitted task.
-                assert not self.current_task_id.is_nil()
+                if self.actor_id == NIL_ACTOR_ID:
+                    assert not self.current_task_id.is_nil()
                 # Submit the task to local scheduler.
                 function_descriptor_list = (
                     function_descriptor.get_function_descriptor_list())
@@ -766,23 +767,30 @@ class Worker(object):
         use the outputs of this task).
         """
         with self.state_lock:
-            assert self.task_driver_id.is_nil()
             assert self.current_task_id.is_nil()
             assert self.task_index == 0
             assert self.put_index == 1
+            if task.actor_id().is_nil():
+                # If this worker is not an actor, check that `task_driver_id`
+                # was reset when the worker finished the previous task.
+                assert self.task_driver_id.is_nil()
+                # Set the driver ID of the current running task. This is
+                # needed so that if the task throws an exception, we propagate
+                # the error message to the correct driver.
+                self.task_driver_id = task.driver_id()
+            else:
+                # If this worker is an actor, task_driver_id wasn't reset.
+                # Check that current task's driver ID equals the previous one.
+                assert self.task_driver_id == task.driver_id()
 
-            # The ID of the driver that this task belongs to. This is needed so
-            # that if the task throws an exception, we propagate the error
-            # message to the correct driver.
-            self.task_driver_id = task.driver_id()
             self.current_task_id = task.task_id()
 
         function_descriptor = FunctionDescriptor.from_bytes_list(
             task.function_descriptor_list())
         args = task.arguments()
         return_object_ids = task.returns()
-        if (task.actor_id().id() != NIL_ACTOR_ID
-                or task.actor_creation_id().id() != NIL_ACTOR_ID):
+        if (not task.actor_id().is_nil()
+                or not task.actor_creation_id().is_nil()):
             dummy_return_id = return_object_ids.pop()
         function_executor = function_execution_info.function
         function_name = function_execution_info.function_name
@@ -809,11 +817,11 @@ class Worker(object):
         # Execute the task.
         try:
             with profiling.profile("task:execute", worker=self):
-                if (task.actor_id().id() == NIL_ACTOR_ID
-                        and task.actor_creation_id().id() == NIL_ACTOR_ID):
+                if (task.actor_id().is_nil()
+                        and task.actor_creation_id().is_nil()):
                     outputs = function_executor(*arguments)
                 else:
-                    if task.actor_id().id() != NIL_ACTOR_ID:
+                    if not task.actor_id().is_nil():
                         key = task.actor_id().id()
                     else:
                         key = task.actor_creation_id().id()
@@ -822,7 +830,7 @@ class Worker(object):
         except Exception as e:
             # Determine whether the exception occured during a task, not an
             # actor method.
-            task_exception = task.actor_id().id() == NIL_ACTOR_ID
+            task_exception = task.actor_id().is_nil()
             traceback_str = ray.utils.format_error_message(
                 traceback.format_exc(), task_exception=task_exception)
             self._handle_process_task_failure(
@@ -881,7 +889,7 @@ class Worker(object):
 
         # TODO(rkn): It would be preferable for actor creation tasks to share
         # more of the code path with regular task execution.
-        if (task.actor_creation_id() != ray.ObjectID(NIL_ACTOR_ID)):
+        if not task.actor_creation_id().is_nil():
             assert self.actor_id == NIL_ACTOR_ID
             self.actor_id = task.actor_creation_id().id()
             self.function_actor_manager.load_actor(driver_id,
@@ -901,8 +909,8 @@ class Worker(object):
                 "name": function_name,
                 "task_id": task.task_id().hex()
             }
-            if task.actor_id().id() == NIL_ACTOR_ID:
-                if (task.actor_creation_id() == ray.ObjectID(NIL_ACTOR_ID)):
+            if task.actor_id().is_nil():
+                if task.actor_creation_id().is_nil():
                     title = "ray_worker:{}()".format(function_name)
                     next_title = "ray_worker"
                 else:
@@ -920,7 +928,9 @@ class Worker(object):
                     self._process_task(task, execution_info)
                 # Reset the state fields so the next task can run.
                 with self.state_lock:
-                    self.task_driver_id = ray.ObjectID(NIL_ID)
+                    if self.actor_id == NIL_ACTOR_ID:
+                        # We will keep task_driver_id unchanged for actor.
+                        self.task_driver_id = ray.ObjectID(NIL_ID)
                     self.current_task_id = ray.ObjectID(NIL_ID)
                     self.task_index = 0
                     self.put_index = 1