diff --git a/python/ray/actor.py b/python/ray/actor.py index 78d3398bc..796650383 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -8,6 +8,7 @@ import json import numpy as np import photon import random +import traceback import ray.pickling as pickling import ray.worker @@ -48,17 +49,37 @@ def get_actor_method_function_id(attr): def fetch_and_register_actor(key, worker): """Import an actor.""" - driver_id, actor_id_str, actor_name, module, pickled_class, assigned_gpu_ids = \ - worker.redis_client.hmget(key, ["driver_id", "actor_id", "name", "module", "class", "gpu_ids"]) + driver_id, actor_id_str, actor_name, module, pickled_class, assigned_gpu_ids, actor_method_names = \ + worker.redis_client.hmget(key, ["driver_id", "actor_id", "name", "module", "class", "gpu_ids", "actor_method_names"]) actor_id = photon.ObjectID(actor_id_str) actor_name = actor_name.decode("ascii") module = module.decode("ascii") + actor_method_names = json.loads(actor_method_names.decode("ascii")) global gpu_ids gpu_ids = json.loads(assigned_gpu_ids.decode("ascii")) + + # Create a temporary actor with some temporary methods so that if the actor + # fails to be unpickled, the temporary actor can be used (just to produce + # error messages and to prevent the driver from hanging). + class TemporaryActor(object): + pass + worker.actors[actor_id_str] = TemporaryActor() + def temporary_actor_method(*xs): + raise Exception("The actor with name {} failed to be imported, and so " + "cannot execute this method".format(actor_name)) + for actor_method_name in actor_method_names: + function_id = get_actor_method_function_id(actor_method_name).id() + worker.functions[driver_id][function_id] = (actor_method_name, temporary_actor_method) + try: unpickled_class = pickling.loads(pickled_class) - except: - raise NotImplemented("TODO(pcm)") + except Exception: + # If an exception was thrown when the actor was imported, we record the + # traceback and notify the scheduler of the failure. 
+ traceback_str = ray.worker.format_error_message(traceback.format_exc()) + # Log the error message. + worker.push_error_to_driver(driver_id, "register_actor", traceback_str, + data={"actor_id": actor_id.id()}) else: # TODO(pcm): Why is the below line necessary? unpickled_class.__module__ = module @@ -150,7 +171,8 @@ def export_actor(actor_id, Class, actor_method_names, num_cpus, num_gpus, worker "name": Class.__name__, "module": Class.__module__, "class": pickled_class, - "gpu_ids": json.dumps(gpu_ids)} + "gpu_ids": json.dumps(gpu_ids), + "actor_method_names": json.dumps(list(actor_method_names))} worker.redis_client.hmset(key, d) worker.redis_client.rpush("Exports", key) @@ -178,7 +200,7 @@ def actor(*args, **kwargs): def __init__(self, *args, **kwargs): self._ray_actor_id = random_actor_id() self._ray_actor_methods = {k: v for (k, v) in inspect.getmembers(Class, predicate=(lambda x: inspect.isfunction(x) or inspect.ismethod(x)))} - export_actor(self._ray_actor_id, Class, self._ray_actor_methods, num_cpus, num_gpus, ray.worker.global_worker) + export_actor(self._ray_actor_id, Class, self._ray_actor_methods.keys(), num_cpus, num_gpus, ray.worker.global_worker) # Call __init__ as a remote function. if "__init__" in self._ray_actor_methods.keys(): actor_method_call(self._ray_actor_id, "__init__", *args, **kwargs) diff --git a/test/failure_test.py b/test/failure_test.py index 07db58336..e504072ca 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -178,6 +178,62 @@ def temporary_helper_function(): ray.worker.cleanup() + def testFailImportingActor(self): + ray.init(num_workers=2, driver_mode=ray.SILENT_MODE) + + # Create the contents of a temporary Python file. + temporary_python_file = """ +def temporary_helper_function(): + return 1 +""" + + f = tempfile.NamedTemporaryFile(suffix=".py") + f.write(temporary_python_file.encode("ascii")) + f.flush() + directory = os.path.dirname(f.name) + # Get the module name and strip ".py" from the end. 
+ module_name = os.path.basename(f.name)[:-3] + sys.path.append(directory) + module = __import__(module_name) + + # Define an actor that closes over this temporary module. This should fail + # when it is unpickled. + @ray.actor + class Foo(object): + def __init__(self): + self.x = module.temporary_python_file() + def get_val(self): + return 1 + + # There should be no errors yet. + self.assertEqual(len(ray.error_info()), 0) + + # Create an actor. + foo = Foo() + + # Wait for the error to arrive. + wait_for_errors(b"register_actor", 1) + self.assertIn(b"No module named", ray.error_info()[0][b"message"]) + + # Wait for the error from when the __init__ tries to run. + wait_for_errors(b"task", 1) + self.assertIn(b"failed to be imported, and so cannot execute this method", ray.error_info()[1][b"message"]) + + # Check that if we try to get the method's result it throws an exception and + # does not hang. + with self.assertRaises(Exception): + ray.get(foo.get_val()) + + # Wait for the error from the call to get_val. + wait_for_errors(b"task", 2) + self.assertIn(b"failed to be imported, and so cannot execute this method", ray.error_info()[2][b"message"]) + + f.close() + + # Clean up the junk we added to sys.path. + sys.path.pop(-1) + ray.worker.cleanup() + class ActorTest(unittest.TestCase): def testFailedActorInit(self):