mirror of
https://github.com/wassname/ray.git
synced 2026-06-30 05:25:28 +08:00
Allow more fields for object metadata (#12484)
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
package io.ray.runtime.object;
|
||||
|
||||
import com.google.common.primitives.Bytes;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import io.ray.api.id.ObjectId;
|
||||
import io.ray.runtime.actor.NativeActorHandle;
|
||||
@@ -64,21 +65,21 @@ public class ObjectSerializer {
|
||||
|
||||
if (meta != null && meta.length > 0) {
|
||||
// If meta is not null, deserialize the object from meta.
|
||||
if (Arrays.equals(meta, OBJECT_METADATA_TYPE_RAW)) {
|
||||
if (Bytes.indexOf(meta, OBJECT_METADATA_TYPE_RAW) == 0) {
|
||||
if (objectType == ByteBuffer.class) {
|
||||
return ByteBuffer.wrap(data);
|
||||
}
|
||||
return data;
|
||||
} else if (Arrays.equals(meta, OBJECT_METADATA_TYPE_CROSS_LANGUAGE) ||
|
||||
Arrays.equals(meta, OBJECT_METADATA_TYPE_JAVA)) {
|
||||
} else if (Bytes.indexOf(meta, OBJECT_METADATA_TYPE_CROSS_LANGUAGE) == 0 ||
|
||||
Bytes.indexOf(meta, OBJECT_METADATA_TYPE_JAVA) == 0) {
|
||||
return Serializer.decode(data, objectType);
|
||||
} else if (Arrays.equals(meta, WORKER_EXCEPTION_META)) {
|
||||
} else if (Bytes.indexOf(meta, WORKER_EXCEPTION_META) == 0) {
|
||||
return new RayWorkerException();
|
||||
} else if (Arrays.equals(meta, ACTOR_EXCEPTION_META)) {
|
||||
} else if (Bytes.indexOf(meta, ACTOR_EXCEPTION_META) == 0) {
|
||||
return new RayActorException(IdUtil.getActorIdFromObjectId(objectId));
|
||||
} else if (Arrays.equals(meta, UNRECONSTRUCTABLE_EXCEPTION_META)) {
|
||||
} else if (Bytes.indexOf(meta, UNRECONSTRUCTABLE_EXCEPTION_META) == 0) {
|
||||
return new UnreconstructableException(objectId);
|
||||
} else if (Arrays.equals(meta, TASK_EXECUTION_EXCEPTION_META)) {
|
||||
} else if (Bytes.indexOf(meta, TASK_EXECUTION_EXCEPTION_META) == 0) {
|
||||
// Serialization logic of task execution exception: an instance of
|
||||
// `io.ray.runtime.exception.RayTaskException`
|
||||
// -> a `RayException` protobuf message
|
||||
@@ -94,10 +95,10 @@ public class ObjectSerializer {
|
||||
"Can't deserialize RayTaskException object: " + objectId
|
||||
.toString());
|
||||
}
|
||||
} else if (Arrays.equals(meta, OBJECT_METADATA_TYPE_ACTOR_HANDLE)) {
|
||||
} else if (Bytes.indexOf(meta, OBJECT_METADATA_TYPE_ACTOR_HANDLE) == 0) {
|
||||
byte[] serialized = Serializer.decode(data, byte[].class);
|
||||
return NativeActorHandle.fromBytes(serialized);
|
||||
} else if (Arrays.equals(meta, OBJECT_METADATA_TYPE_PYTHON)) {
|
||||
} else if (Bytes.indexOf(meta, OBJECT_METADATA_TYPE_PYTHON) == 0) {
|
||||
throw new IllegalArgumentException("Can't deserialize Python object: " + objectId
|
||||
.toString());
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.ray.runtime.task;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.primitives.Bytes;
|
||||
import io.ray.api.ObjectRef;
|
||||
import io.ray.api.Ray;
|
||||
import io.ray.api.id.ObjectId;
|
||||
@@ -50,9 +51,12 @@ public class ArgumentsBuilder {
|
||||
value = ObjectSerializer.serialize(arg);
|
||||
if (language != Language.JAVA) {
|
||||
boolean isCrossData =
|
||||
Arrays.equals(value.metadata, ObjectSerializer.OBJECT_METADATA_TYPE_CROSS_LANGUAGE) ||
|
||||
Arrays.equals(value.metadata, ObjectSerializer.OBJECT_METADATA_TYPE_RAW) ||
|
||||
Arrays.equals(value.metadata, ObjectSerializer.OBJECT_METADATA_TYPE_ACTOR_HANDLE);
|
||||
Bytes.indexOf(value.metadata,
|
||||
ObjectSerializer.OBJECT_METADATA_TYPE_CROSS_LANGUAGE) == 0 ||
|
||||
Bytes.indexOf(value.metadata,
|
||||
ObjectSerializer.OBJECT_METADATA_TYPE_RAW) == 0 ||
|
||||
Bytes.indexOf(value.metadata,
|
||||
ObjectSerializer.OBJECT_METADATA_TYPE_ACTOR_HANDLE) == 0;
|
||||
if (!isCrossData) {
|
||||
throw new IllegalArgumentException(String.format("Can't transfer %s data to %s",
|
||||
Arrays.toString(value.metadata), language.getValueDescriptor().getName()));
|
||||
|
||||
@@ -292,13 +292,14 @@ cdef prepare_args(
|
||||
else:
|
||||
serialized_arg = worker.get_serialization_context().serialize(arg)
|
||||
metadata = serialized_arg.metadata
|
||||
metadata_fields = metadata.split(b",")
|
||||
if language != Language.PYTHON:
|
||||
if metadata not in [
|
||||
if metadata_fields[0] not in [
|
||||
ray_constants.OBJECT_METADATA_TYPE_CROSS_LANGUAGE,
|
||||
ray_constants.OBJECT_METADATA_TYPE_RAW,
|
||||
ray_constants.OBJECT_METADATA_TYPE_ACTOR_HANDLE]:
|
||||
raise Exception("Can't transfer {} data to {}".format(
|
||||
metadata, language))
|
||||
metadata_fields[0], language))
|
||||
size = serialized_arg.total_bytes
|
||||
|
||||
# TODO(edoakes): any objects containing ObjectRefs are spilled to
|
||||
|
||||
@@ -178,6 +178,11 @@ WORKER_PROCESS_TYPE_RESTORE_WORKER = (
|
||||
|
||||
LOG_MONITOR_MAX_OPEN_FILES = 200
|
||||
|
||||
# The object metadata field uses the following format: It is a comma
|
||||
# separated list of fields. The first field is mandatory and is the
|
||||
# type of the object (see types below) or an integer, which is interpreted
|
||||
# as an error value.
|
||||
|
||||
# A constant used as object metadata to indicate the object is cross language.
|
||||
OBJECT_METADATA_TYPE_CROSS_LANGUAGE = b"XLANG"
|
||||
# A constant used as object metadata to indicate the object is python specific.
|
||||
|
||||
@@ -219,10 +219,10 @@ class SerializationContext:
|
||||
raise DeserializationError()
|
||||
return obj
|
||||
|
||||
def _deserialize_msgpack_data(self, data, metadata):
|
||||
def _deserialize_msgpack_data(self, data, metadata_fields):
|
||||
msgpack_data, pickle5_data = split_buffer(data)
|
||||
|
||||
if metadata == ray_constants.OBJECT_METADATA_TYPE_PYTHON:
|
||||
if metadata_fields[0] == ray_constants.OBJECT_METADATA_TYPE_PYTHON:
|
||||
python_objects = self._deserialize_pickle5_data(pickle5_data)
|
||||
else:
|
||||
python_objects = []
|
||||
@@ -240,23 +240,25 @@ class SerializationContext:
|
||||
|
||||
def _deserialize_object(self, data, metadata, object_ref):
|
||||
if metadata:
|
||||
if metadata in [
|
||||
metadata_fields = metadata.split(b",")
|
||||
if metadata_fields[0] in [
|
||||
ray_constants.OBJECT_METADATA_TYPE_CROSS_LANGUAGE,
|
||||
ray_constants.OBJECT_METADATA_TYPE_PYTHON
|
||||
]:
|
||||
return self._deserialize_msgpack_data(data, metadata)
|
||||
return self._deserialize_msgpack_data(data, metadata_fields)
|
||||
# Check if the object should be returned as raw bytes.
|
||||
if metadata == ray_constants.OBJECT_METADATA_TYPE_RAW:
|
||||
if metadata_fields[0] == ray_constants.OBJECT_METADATA_TYPE_RAW:
|
||||
if data is None:
|
||||
return b""
|
||||
return data.to_pybytes()
|
||||
elif metadata == ray_constants.OBJECT_METADATA_TYPE_ACTOR_HANDLE:
|
||||
obj = self._deserialize_msgpack_data(data, metadata)
|
||||
elif metadata_fields[
|
||||
0] == ray_constants.OBJECT_METADATA_TYPE_ACTOR_HANDLE:
|
||||
obj = self._deserialize_msgpack_data(data, metadata_fields)
|
||||
return actor_handle_deserializer(obj)
|
||||
# Otherwise, return an exception object based on
|
||||
# the error type.
|
||||
try:
|
||||
error_type = int(metadata)
|
||||
error_type = int(metadata_fields[0])
|
||||
except Exception:
|
||||
raise Exception(f"Can't deserialize object: {object_ref}, "
|
||||
f"metadata: {metadata}")
|
||||
@@ -265,7 +267,7 @@ class SerializationContext:
|
||||
# TODO (kfstorm): exception serialization should be language
|
||||
# independent.
|
||||
if error_type == ErrorType.Value("TASK_EXECUTION_EXCEPTION"):
|
||||
obj = self._deserialize_msgpack_data(data, metadata)
|
||||
obj = self._deserialize_msgpack_data(data, metadata_fields)
|
||||
return RayError.from_bytes(obj)
|
||||
elif error_type == ErrorType.Value("WORKER_DIED"):
|
||||
return WorkerCrashedError()
|
||||
|
||||
Reference in New Issue
Block a user