diff --git a/python/ray/function_manager.py b/python/ray/function_manager.py index e3b212312..793e5118f 100644 --- a/python/ray/function_manager.py +++ b/python/ray/function_manager.py @@ -158,36 +158,37 @@ class FunctionActorManager: def fetch_and_register_remote_function(self, key): """Import a remote function.""" (job_id_str, function_id_str, function_name, serialized_function, - num_return_vals, module, resources, - max_calls) = self._worker.redis_client.hmget(key, [ - "job_id", "function_id", "function_name", "function", - "num_return_vals", "module", "resources", "max_calls" - ]) + module, max_calls) = self._worker.redis_client.hmget( + key, [ + "job_id", "function_id", "function_name", "function", + "module", "max_calls" + ]) function_id = ray.FunctionID(function_id_str) job_id = ray.JobID(job_id_str) function_name = decode(function_name) max_calls = int(max_calls) module = decode(module) - # This is a placeholder in case the function can't be unpickled. This - # will be overwritten if the function is successfully registered. - def f(*args, **kwargs): - raise RuntimeError("This function was not imported properly.") - # This function is called by ImportThread. This operation needs to be # atomic. Otherwise, there is race condition. Another thread may use # the temporary function above before the real function is ready. with self.lock: - self._function_execution_info[job_id][function_id] = ( - FunctionExecutionInfo( - function=f, - function_name=function_name, - max_calls=max_calls)) self._num_task_executions[job_id][function_id] = 0 try: function = pickle.loads(serialized_function) except Exception: + + def f(*args, **kwargs): + raise RuntimeError( + "This function was not imported properly.") + + # Use a placeholder method when function pickled failed + self._function_execution_info[job_id][function_id] = ( + FunctionExecutionInfo( + function=f, + function_name=function_name, + max_calls=max_calls)) # If an exception was thrown when the remote function was # imported, we record the traceback and notify the scheduler # of the failure. @@ -257,7 +258,7 @@ class FunctionActorManager: assert not function_descriptor.is_actor_method() function_id = function_descriptor.function_id if (job_id in self._function_execution_info - and function_id in self._function_execution_info[function_id]): + and function_id in self._function_execution_info[job_id]): return module_name, function_name = ( function_descriptor.module_name,