Let the worker know about remote functions that failed to unpickle. (#175)

* Let the worker know about remote functions that failed to unpickle.

* Cleanup.
This commit is contained in:
Robert Nishihara
2017-01-03 18:41:03 -08:00
committed by Philipp Moritz
parent b1e76e582e
commit 509685d240
2 changed files with 18 additions and 7 deletions
+10 -4
View File
@@ -909,6 +909,16 @@ def fetch_and_register_remote_function(key, worker=global_worker):
num_return_vals = int(num_return_vals)
module = module.decode("ascii")
function_export_counter = int(function_export_counter)
worker.function_names[function_id.id()] = function_name
worker.num_return_vals[function_id.id()] = num_return_vals
worker.function_export_counters[function_id.id()] = function_export_counter
# This is a placeholder in case the function can't be unpickled. This will be
# overwritten if the function is unpickled successfully.
def f():
raise Exception("This function was not imported properly.")
worker.functions[function_id.id()] = remote(num_return_vals=num_return_vals, function_id=function_id)(lambda *xs: f())
try:
function = pickling.loads(serialized_function)
except:
@@ -924,11 +934,7 @@ def fetch_and_register_remote_function(key, worker=global_worker):
else:
# TODO(rkn): Why is the below line necessary?
function.__module__ = module
function_name = "{}.{}".format(function.__module__, function.__name__)
worker.functions[function_id.id()] = remote(num_return_vals=num_return_vals, function_id=function_id)(function)
worker.function_names[function_id.id()] = function_name
worker.num_return_vals[function_id.id()] = num_return_vals
worker.function_export_counters[function_id.id()] = function_export_counter
# Add the function to the function table.
worker.redis_client.rpush("FunctionTable:{}".format(function_id.id()), worker.worker_id)