From cf73ccddae5178df3f9fbacdaa4b5744b9daf749 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sun, 29 Nov 2020 21:50:18 -0800 Subject: [PATCH] Allow more fields for object metadata (#12484) --- .../ray/runtime/object/ObjectSerializer.java | 19 +++++++++--------- .../io/ray/runtime/task/ArgumentsBuilder.java | 10 +++++++--- python/ray/_raylet.pyx | 5 +++-- python/ray/ray_constants.py | 5 +++++ python/ray/serialization.py | 20 ++++++++++--------- 5 files changed, 36 insertions(+), 23 deletions(-) diff --git a/java/runtime/src/main/java/io/ray/runtime/object/ObjectSerializer.java b/java/runtime/src/main/java/io/ray/runtime/object/ObjectSerializer.java index f26b20b68..e72f393d6 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/ObjectSerializer.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/ObjectSerializer.java @@ -1,5 +1,6 @@ package io.ray.runtime.object; +import com.google.common.primitives.Bytes; import com.google.protobuf.InvalidProtocolBufferException; import io.ray.api.id.ObjectId; import io.ray.runtime.actor.NativeActorHandle; @@ -64,21 +65,21 @@ public class ObjectSerializer { if (meta != null && meta.length > 0) { // If meta is not null, deserialize the object from meta. - if (Arrays.equals(meta, OBJECT_METADATA_TYPE_RAW)) { + if (Bytes.indexOf(meta, OBJECT_METADATA_TYPE_RAW) == 0) { if (objectType == ByteBuffer.class) { return ByteBuffer.wrap(data); } return data; - } else if (Arrays.equals(meta, OBJECT_METADATA_TYPE_CROSS_LANGUAGE) || - Arrays.equals(meta, OBJECT_METADATA_TYPE_JAVA)) { + } else if (Bytes.indexOf(meta, OBJECT_METADATA_TYPE_CROSS_LANGUAGE) == 0 || + Bytes.indexOf(meta, OBJECT_METADATA_TYPE_JAVA) == 0) { return Serializer.decode(data, objectType); - } else if (Arrays.equals(meta, WORKER_EXCEPTION_META)) { + } else if (Bytes.indexOf(meta, WORKER_EXCEPTION_META) == 0) { return new RayWorkerException(); - } else if (Arrays.equals(meta, ACTOR_EXCEPTION_META)) { + } else if (Bytes.indexOf(meta, ACTOR_EXCEPTION_META) == 0) { return new RayActorException(IdUtil.getActorIdFromObjectId(objectId)); - } else if (Arrays.equals(meta, UNRECONSTRUCTABLE_EXCEPTION_META)) { + } else if (Bytes.indexOf(meta, UNRECONSTRUCTABLE_EXCEPTION_META) == 0) { return new UnreconstructableException(objectId); - } else if (Arrays.equals(meta, TASK_EXECUTION_EXCEPTION_META)) { + } else if (Bytes.indexOf(meta, TASK_EXECUTION_EXCEPTION_META) == 0) { // Serialization logic of task execution exception: an instance of // `io.ray.runtime.exception.RayTaskException` // -> a `RayException` protobuf message @@ -94,10 +95,10 @@ public class ObjectSerializer { "Can't deserialize RayTaskException object: " + objectId .toString()); } - } else if (Arrays.equals(meta, OBJECT_METADATA_TYPE_ACTOR_HANDLE)) { + } else if (Bytes.indexOf(meta, OBJECT_METADATA_TYPE_ACTOR_HANDLE) == 0) { byte[] serialized = Serializer.decode(data, byte[].class); return NativeActorHandle.fromBytes(serialized); - } else if (Arrays.equals(meta, OBJECT_METADATA_TYPE_PYTHON)) { + } else if (Bytes.indexOf(meta, OBJECT_METADATA_TYPE_PYTHON) == 0) { throw new IllegalArgumentException("Can't deserialize Python object: " + objectId .toString()); } diff --git a/java/runtime/src/main/java/io/ray/runtime/task/ArgumentsBuilder.java b/java/runtime/src/main/java/io/ray/runtime/task/ArgumentsBuilder.java index 6c90b552e..1e7d2adea 100644 --- a/java/runtime/src/main/java/io/ray/runtime/task/ArgumentsBuilder.java +++ b/java/runtime/src/main/java/io/ray/runtime/task/ArgumentsBuilder.java @@ -1,6 +1,7 @@ package io.ray.runtime.task; import com.google.common.base.Preconditions; +import com.google.common.primitives.Bytes; import io.ray.api.ObjectRef; import io.ray.api.Ray; import io.ray.api.id.ObjectId; @@ -50,9 +51,12 @@ public class ArgumentsBuilder { value = ObjectSerializer.serialize(arg); if (language != Language.JAVA) { boolean isCrossData = - Arrays.equals(value.metadata, ObjectSerializer.OBJECT_METADATA_TYPE_CROSS_LANGUAGE) || - Arrays.equals(value.metadata, ObjectSerializer.OBJECT_METADATA_TYPE_RAW) || - Arrays.equals(value.metadata, ObjectSerializer.OBJECT_METADATA_TYPE_ACTOR_HANDLE); + Bytes.indexOf(value.metadata, + ObjectSerializer.OBJECT_METADATA_TYPE_CROSS_LANGUAGE) == 0 || + Bytes.indexOf(value.metadata, + ObjectSerializer.OBJECT_METADATA_TYPE_RAW) == 0 || + Bytes.indexOf(value.metadata, + ObjectSerializer.OBJECT_METADATA_TYPE_ACTOR_HANDLE) == 0; if (!isCrossData) { throw new IllegalArgumentException(String.format("Can't transfer %s data to %s", Arrays.toString(value.metadata), language.getValueDescriptor().getName())); diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index b210b243e..67cfeac61 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -292,13 +292,14 @@ cdef prepare_args( else: serialized_arg = worker.get_serialization_context().serialize(arg) metadata = serialized_arg.metadata + metadata_fields = metadata.split(b",") if language != Language.PYTHON: - if metadata not in [ + if metadata_fields[0] not in [ ray_constants.OBJECT_METADATA_TYPE_CROSS_LANGUAGE, ray_constants.OBJECT_METADATA_TYPE_RAW, ray_constants.OBJECT_METADATA_TYPE_ACTOR_HANDLE]: raise Exception("Can't transfer {} data to {}".format( - metadata, language)) + metadata_fields[0], language)) size = serialized_arg.total_bytes # TODO(edoakes): any objects containing ObjectRefs are spilled to diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index 9e39b3e7a..f0c7fce69 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -178,6 +178,11 @@ WORKER_PROCESS_TYPE_RESTORE_WORKER = ( LOG_MONITOR_MAX_OPEN_FILES = 200 +# The object metadata field uses the following format: It is a comma +# separated list of fields. The first field is mandatory and is the +# type of the object (see types below) or an integer, which is interpreted +# as an error value. + # A constant used as object metadata to indicate the object is cross language. OBJECT_METADATA_TYPE_CROSS_LANGUAGE = b"XLANG" # A constant used as object metadata to indicate the object is python specific. diff --git a/python/ray/serialization.py b/python/ray/serialization.py index f85b07afa..dc9a2c40e 100644 --- a/python/ray/serialization.py +++ b/python/ray/serialization.py @@ -219,10 +219,10 @@ class SerializationContext: raise DeserializationError() return obj - def _deserialize_msgpack_data(self, data, metadata): + def _deserialize_msgpack_data(self, data, metadata_fields): msgpack_data, pickle5_data = split_buffer(data) - if metadata == ray_constants.OBJECT_METADATA_TYPE_PYTHON: + if metadata_fields[0] == ray_constants.OBJECT_METADATA_TYPE_PYTHON: python_objects = self._deserialize_pickle5_data(pickle5_data) else: python_objects = [] @@ -240,23 +240,25 @@ class SerializationContext: def _deserialize_object(self, data, metadata, object_ref): if metadata: - if metadata in [ + metadata_fields = metadata.split(b",") + if metadata_fields[0] in [ ray_constants.OBJECT_METADATA_TYPE_CROSS_LANGUAGE, ray_constants.OBJECT_METADATA_TYPE_PYTHON ]: - return self._deserialize_msgpack_data(data, metadata) + return self._deserialize_msgpack_data(data, metadata_fields) # Check if the object should be returned as raw bytes. - if metadata == ray_constants.OBJECT_METADATA_TYPE_RAW: + if metadata_fields[0] == ray_constants.OBJECT_METADATA_TYPE_RAW: if data is None: return b"" return data.to_pybytes() - elif metadata == ray_constants.OBJECT_METADATA_TYPE_ACTOR_HANDLE: - obj = self._deserialize_msgpack_data(data, metadata) + elif metadata_fields[ + 0] == ray_constants.OBJECT_METADATA_TYPE_ACTOR_HANDLE: + obj = self._deserialize_msgpack_data(data, metadata_fields) return actor_handle_deserializer(obj) # Otherwise, return an exception object based on # the error type. try: - error_type = int(metadata) + error_type = int(metadata_fields[0]) except Exception: raise Exception(f"Can't deserialize object: {object_ref}, " f"metadata: {metadata}") @@ -265,7 +267,7 @@ class SerializationContext: # TODO (kfstorm): exception serialization should be language # independent. if error_type == ErrorType.Value("TASK_EXECUTION_EXCEPTION"): - obj = self._deserialize_msgpack_data(data, metadata) + obj = self._deserialize_msgpack_data(data, metadata_fields) return RayError.from_bytes(obj) elif error_type == ErrorType.Value("WORKER_DIED"): return WorkerCrashedError()