mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 03:02:56 +08:00
Fix segfault for task arguments passed by value (#6214)
* Fix null data * rename
This commit is contained in:
+14
-13
@@ -425,49 +425,50 @@ cdef deserialize_args(
|
||||
const c_vector[shared_ptr[CRayObject]] &c_args,
|
||||
const c_vector[CObjectID] &arg_reference_ids):
|
||||
cdef:
|
||||
c_vector[shared_ptr[CRayObject]] by_reference_objects
|
||||
c_vector[shared_ptr[CRayObject]] objects_to_deserialize
|
||||
|
||||
if c_args.size() == 0:
|
||||
return [], {}
|
||||
|
||||
args = []
|
||||
by_reference_ids = []
|
||||
by_reference_indices = []
|
||||
ids_to_deserialize = []
|
||||
id_indices = []
|
||||
for i in range(c_args.size()):
|
||||
# Passed by value.
|
||||
if arg_reference_ids[i].IsNil():
|
||||
data = Buffer.make(c_args[i].get().GetData())
|
||||
if (c_args[i].get().HasMetadata()
|
||||
and Buffer.make(
|
||||
c_args[i].get().GetMetadata()).to_pybytes()
|
||||
== RAW_BUFFER_METADATA):
|
||||
data = Buffer.make(c_args[i].get().GetData())
|
||||
args.append(data)
|
||||
elif (c_args[i].get().HasMetadata() and Buffer.make(
|
||||
c_args[i].get().GetMetadata()).to_pybytes()
|
||||
== PICKLE_BUFFER_METADATA):
|
||||
# This is a pickled "simple python value" argument.
|
||||
data = Buffer.make(c_args[i].get().GetData())
|
||||
args.append(pickle.loads(data.to_pybytes()))
|
||||
else:
|
||||
# This is a Ray object inlined by the direct task submitter.
|
||||
by_reference_ids.append(
|
||||
ids_to_deserialize.append(
|
||||
ObjectID(arg_reference_ids[i].Binary()))
|
||||
by_reference_indices.append(i)
|
||||
by_reference_objects.push_back(c_args[i])
|
||||
id_indices.append(i)
|
||||
objects_to_deserialize.push_back(c_args[i])
|
||||
args.append(None)
|
||||
# Passed by reference.
|
||||
else:
|
||||
by_reference_ids.append(
|
||||
ids_to_deserialize.append(
|
||||
ObjectID(arg_reference_ids[i].Binary()))
|
||||
by_reference_indices.append(i)
|
||||
by_reference_objects.push_back(c_args[i])
|
||||
id_indices.append(i)
|
||||
objects_to_deserialize.push_back(c_args[i])
|
||||
args.append(None)
|
||||
|
||||
data_metadata_pairs = RayObjectsToDataMetadataPairs(
|
||||
by_reference_objects)
|
||||
objects_to_deserialize)
|
||||
for i, arg in enumerate(
|
||||
ray.worker.global_worker.deserialize_objects(
|
||||
data_metadata_pairs, by_reference_ids)):
|
||||
args[by_reference_indices[i]] = arg
|
||||
data_metadata_pairs, ids_to_deserialize)):
|
||||
args[id_indices[i]] = arg
|
||||
|
||||
for arg in args:
|
||||
if isinstance(arg, RayError):
|
||||
|
||||
Reference in New Issue
Block a user