Propagate errors from importing actors. (#309)

* Propagate errors from importing actors.

* Fix bug.
This commit is contained in:
Robert Nishihara
2017-02-22 15:15:45 -08:00
committed by Philipp Moritz
parent a6bf16f6a9
commit 54238c4ad0
2 changed files with 84 additions and 6 deletions
+28 -6
View File
@@ -8,6 +8,7 @@ import json
import numpy as np
import photon
import random
import traceback
import ray.pickling as pickling
import ray.worker
@@ -48,17 +49,37 @@ def get_actor_method_function_id(attr):
def fetch_and_register_actor(key, worker):
"""Import an actor."""
driver_id, actor_id_str, actor_name, module, pickled_class, assigned_gpu_ids = \
worker.redis_client.hmget(key, ["driver_id", "actor_id", "name", "module", "class", "gpu_ids"])
driver_id, actor_id_str, actor_name, module, pickled_class, assigned_gpu_ids, actor_method_names = \
worker.redis_client.hmget(key, ["driver_id", "actor_id", "name", "module", "class", "gpu_ids", "actor_method_names"])
actor_id = photon.ObjectID(actor_id_str)
actor_name = actor_name.decode("ascii")
module = module.decode("ascii")
actor_method_names = json.loads(actor_method_names.decode("ascii"))
global gpu_ids
gpu_ids = json.loads(assigned_gpu_ids.decode("ascii"))
# Create a temporary actor with some temporary methods so that if the actor
# fails to be unpickled, the temporary actor can be used (just to produce
# error messages and to prevent the driver from hanging).
class TemporaryActor(object):
pass
worker.actors[actor_id_str] = TemporaryActor()
def temporary_actor_method(*xs):
raise Exception("The actor with name {} failed to be imported, and so "
"cannot execute this method".format(actor_name))
for actor_method_name in actor_method_names:
function_id = get_actor_method_function_id(actor_method_name).id()
worker.functions[driver_id][function_id] = (actor_method_name, temporary_actor_method)
try:
unpickled_class = pickling.loads(pickled_class)
except:
raise NotImplemented("TODO(pcm)")
except Exception:
# If an exception was thrown when the actor was imported, we record the
# traceback and notify the scheduler of the failure.
traceback_str = ray.worker.format_error_message(traceback.format_exc())
# Log the error message.
worker.push_error_to_driver(driver_id, "register_actor", traceback_str,
data={"actor_id": actor_id.id()})
else:
# TODO(pcm): Why is the below line necessary?
unpickled_class.__module__ = module
@@ -150,7 +171,8 @@ def export_actor(actor_id, Class, actor_method_names, num_cpus, num_gpus, worker
"name": Class.__name__,
"module": Class.__module__,
"class": pickled_class,
"gpu_ids": json.dumps(gpu_ids)}
"gpu_ids": json.dumps(gpu_ids),
"actor_method_names": json.dumps(list(actor_method_names))}
worker.redis_client.hmset(key, d)
worker.redis_client.rpush("Exports", key)
@@ -178,7 +200,7 @@ def actor(*args, **kwargs):
def __init__(self, *args, **kwargs):
self._ray_actor_id = random_actor_id()
self._ray_actor_methods = {k: v for (k, v) in inspect.getmembers(Class, predicate=(lambda x: inspect.isfunction(x) or inspect.ismethod(x)))}
export_actor(self._ray_actor_id, Class, self._ray_actor_methods, num_cpus, num_gpus, ray.worker.global_worker)
export_actor(self._ray_actor_id, Class, self._ray_actor_methods.keys(), num_cpus, num_gpus, ray.worker.global_worker)
# Call __init__ as a remote function.
if "__init__" in self._ray_actor_methods.keys():
actor_method_call(self._ray_actor_id, "__init__", *args, **kwargs)