mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 21:12:15 +08:00
Several small fixes for function_manager (#7685)
This commit is contained in:
@@ -158,36 +158,37 @@ class FunctionActorManager:
|
||||
def fetch_and_register_remote_function(self, key):
|
||||
"""Import a remote function."""
|
||||
(job_id_str, function_id_str, function_name, serialized_function,
|
||||
num_return_vals, module, resources,
|
||||
max_calls) = self._worker.redis_client.hmget(key, [
|
||||
"job_id", "function_id", "function_name", "function",
|
||||
"num_return_vals", "module", "resources", "max_calls"
|
||||
])
|
||||
module, max_calls) = self._worker.redis_client.hmget(
|
||||
key, [
|
||||
"job_id", "function_id", "function_name", "function",
|
||||
"module", "max_calls"
|
||||
])
|
||||
function_id = ray.FunctionID(function_id_str)
|
||||
job_id = ray.JobID(job_id_str)
|
||||
function_name = decode(function_name)
|
||||
max_calls = int(max_calls)
|
||||
module = decode(module)
|
||||
|
||||
# This is a placeholder in case the function can't be unpickled. This
|
||||
# will be overwritten if the function is successfully registered.
|
||||
def f(*args, **kwargs):
|
||||
raise RuntimeError("This function was not imported properly.")
|
||||
|
||||
# This function is called by ImportThread. This operation needs to be
|
||||
# atomic. Otherwise, there is race condition. Another thread may use
|
||||
# the temporary function above before the real function is ready.
|
||||
with self.lock:
|
||||
self._function_execution_info[job_id][function_id] = (
|
||||
FunctionExecutionInfo(
|
||||
function=f,
|
||||
function_name=function_name,
|
||||
max_calls=max_calls))
|
||||
self._num_task_executions[job_id][function_id] = 0
|
||||
|
||||
try:
|
||||
function = pickle.loads(serialized_function)
|
||||
except Exception:
|
||||
|
||||
def f(*args, **kwargs):
|
||||
raise RuntimeError(
|
||||
"This function was not imported properly.")
|
||||
|
||||
# Use a placeholder method when function pickled failed
|
||||
self._function_execution_info[job_id][function_id] = (
|
||||
FunctionExecutionInfo(
|
||||
function=f,
|
||||
function_name=function_name,
|
||||
max_calls=max_calls))
|
||||
# If an exception was thrown when the remote function was
|
||||
# imported, we record the traceback and notify the scheduler
|
||||
# of the failure.
|
||||
@@ -257,7 +258,7 @@ class FunctionActorManager:
|
||||
assert not function_descriptor.is_actor_method()
|
||||
function_id = function_descriptor.function_id
|
||||
if (job_id in self._function_execution_info
|
||||
and function_id in self._function_execution_info[function_id]):
|
||||
and function_id in self._function_execution_info[job_id]):
|
||||
return
|
||||
module_name, function_name = (
|
||||
function_descriptor.module_name,
|
||||
|
||||
Reference in New Issue
Block a user