fetching objects in parallel in _get_arguments_for_execution (#4775)

This commit is contained in:
Akshat Gokhale
2019-06-02 12:05:48 +05:30
committed by Robert Nishihara
parent 665d081fe9
commit d86ee8c83e
+16 -7
View File
@@ -782,18 +782,27 @@ class Worker(object):
RayError: This exception is raised if a task that
created one of the arguments failed.
"""
arguments = []
arguments = [None] * len(serialized_args)
object_ids = []
object_indices = []
for (i, arg) in enumerate(serialized_args):
if isinstance(arg, ObjectID):
# get the object from the local object store
argument = self.get_object([arg])[0]
if isinstance(argument, RayError):
raise argument
object_ids.append(arg)
object_indices.append(i)
else:
# pass the argument by value
argument = arg
arguments[i] = arg
# Get the objects from the local object store.
if len(object_ids) > 0:
values = self.get_object(object_ids)
for i, value in enumerate(values):
if isinstance(value, RayError):
raise value
else:
arguments[object_indices[i]] = value
arguments.append(argument)
return arguments
def _store_outputs_in_object_store(self, object_ids, outputs):