From 2fb7d7846f18e7005560cce493e4dd13642a309a Mon Sep 17 00:00:00 2001
From: Si-Yuan
Date: Thu, 3 Oct 2019 09:20:26 -0700
Subject: [PATCH] Initial implementation of Cython pickle5 support (#5725)

---
 BUILD.bazel                           |  11 +
 python/ray/_raylet.pyx                |  40 +++
 python/ray/includes/serialization.pxi | 413 ++++++++++++++++++++++++++
 python/ray/ray_constants.py           |   1 +
 python/ray/worker.py                  |  30 +--
 src/ray/protobuf/serialization.proto  |  82 +++++++
 6 files changed, 557 insertions(+), 20 deletions(-)
 create mode 100644 python/ray/includes/serialization.pxi
 create mode 100644 src/ray/protobuf/serialization.proto

diff --git a/BUILD.bazel b/BUILD.bazel
index 87231f0e9..ed0af09ae 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -98,6 +98,16 @@ cc_proto_library(
     deps = ["direct_actor_proto"],
 )
 
+proto_library(
+    name = "serialization_proto",
+    srcs = ["src/ray/protobuf/serialization.proto"],
+)
+
+cc_proto_library(
+    name = "serialization_cc_proto",
+    deps = ["serialization_proto"],
+)
+
 # === End of protobuf definitions ===
 
 # === Begin of rpc definitions ===
@@ -735,6 +745,7 @@ pyx_library(
     deps = [
         "//:core_worker_lib",
         "//:raylet_lib",
+        "//:serialization_cc_proto",
     ],
 )
 
diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx
index f55a207dd..c28b293b8 100644
--- a/python/ray/_raylet.pyx
+++ b/python/ray/_raylet.pyx
@@ -58,6 +58,7 @@ from ray.ray_constants import (
     DEFAULT_PUT_OBJECT_DELAY,
     DEFAULT_PUT_OBJECT_RETRIES,
     RAW_BUFFER_METADATA,
+    PICKLE5_BUFFER_METADATA,
 )
 
 # pyarrow cannot be imported until after _raylet finishes initializing
@@ -75,6 +76,7 @@ include "includes/ray_config.pxi"
 include "includes/task.pxi"
 include "includes/buffer.pxi"
 include "includes/common.pxi"
+include "includes/serialization.pxi"
 include "includes/libcoreworker.pxi"
 
 
@@ -474,6 +476,44 @@ cdef class CoreWorker:
         with nogil:
             check_status(self.core_worker.get().Objects().Seal(c_object_id))
 
+    def put_pickle5_buffers(self, ObjectID object_id, c_string inband,
+                            Pickle5Writer writer,
+                            int memcopy_threads):
+        """Create and seal an object holding pickle5-serialized data.
+
+        Args:
+            object_id: The ID to create the object under.
+            inband: The in-band pickle data.
+            writer: The Pickle5Writer that collected the out-of-band buffers.
+            memcopy_threads: Number of threads used to copy large buffers.
+        """
+        cdef:
+            shared_ptr[CBuffer] data
+            c_string metadata_str = PICKLE5_BUFFER_METADATA
+            shared_ptr[CBuffer] metadata = dynamic_pointer_cast[
+                CBuffer, LocalMemoryBuffer](
+                    make_shared[LocalMemoryBuffer](
+                        <uint8_t*>(metadata_str.data()), metadata_str.size()))
+            CObjectID c_object_id = object_id.native()
+            size_t data_size
+
+        data_size = writer.get_total_bytes(inband)
+
+        with nogil:
+            check_status(self.core_worker.get().Objects().Create(
+                        metadata, data_size, c_object_id, &data))
+
+        # If data is nullptr, that means the ObjectID already existed,
+        # which we ignore.
+        # TODO(edoakes): this is hacky, we should return the error instead
+        # and deal with it here.
+        if not data:
+            return
+
+        writer.write_to(inband, data, memcopy_threads)
+        with nogil:
+            check_status(self.core_worker.get().Objects().Seal(c_object_id))
+
     def wait(self, object_ids, int num_returns, int64_t timeout_milliseconds,
              TaskID current_task_id):
         cdef:
diff --git a/python/ray/includes/serialization.pxi b/python/ray/includes/serialization.pxi
new file mode 100644
index 000000000..091edc5a3
--- /dev/null
+++ b/python/ray/includes/serialization.pxi
@@ -0,0 +1,413 @@
+from libc.stdlib cimport malloc, free
+from libc.string cimport memcpy
+from libc.stdint cimport uintptr_t, uint64_t, INT32_MAX
+from cython.parallel cimport prange
+
+# This is the default alignment value for len(buffer) < 2048.
+DEF kMinorBufferAlign = 8
+# This is the default alignment value for len(buffer) >= 2048.
+# Some projects like Arrow use it for possible SIMD acceleration.
+DEF kMajorBufferAlign = 64
+# Buffers of at least this many bytes get the major alignment.
+DEF kMajorBufferSize = 2048
+# Block size (in bytes) handed to 'parallel_memcopy'.
+DEF kMemcopyDefaultBlocksize = 64
+# Buffers larger than this many bytes are copied with 'parallel_memcopy'
+# when more than one memcopy thread is requested.
+DEF kMemcopyDefaultThreshold = 1024 * 1024
+
+
+# Minimal declarations of the generated protobuf classes
+# (see 'serialization.proto') used by the reader and writer below.
+cdef extern from "google/protobuf/repeated_field.h" nogil:
+    cdef cppclass RepeatedField[Element]:
+        const Element* data() const
+
+cdef extern from "ray/protobuf/serialization.pb.h" nogil:
+    cdef cppclass CPythonBuffer "ray::serialization::PythonBuffer":
+        void set_address(uint64_t value)
+        uint64_t address() const
+        void set_length(int64_t value)
+        int64_t length() const
+        void set_itemsize(int64_t value)
+        int64_t itemsize()
+        void set_ndim(int32_t value)
+        int32_t ndim()
+        void set_readonly(c_bool value)
+        c_bool readonly()
+        void set_format(const c_string& value)
+        const c_string &format()
+        c_string* release_format()
+        void add_shape(int64_t value)
+        int64_t shape(int index)
+        const RepeatedField[int64_t] &shape() const
+        int shape_size()
+        void add_strides(int64_t value)
+        int64_t strides(int index)
+        const RepeatedField[int64_t] &strides() const
+        int strides_size()
+
+    cdef cppclass CPythonObject "ray::serialization::PythonObject":
+        uint64_t inband_data_offset() const
+        void set_inband_data_offset(uint64_t value)
+        uint64_t inband_data_size() const
+        void set_inband_data_size(uint64_t value)
+        uint64_t raw_buffers_offset() const
+        void set_raw_buffers_offset(uint64_t value)
+        uint64_t raw_buffers_size() const
+        void set_raw_buffers_size(uint64_t value)
+        CPythonBuffer* add_buffer()
+        CPythonBuffer& buffer(int index) const
+        int buffer_size() const
+        size_t ByteSizeLong() const
+        int GetCachedSize() const
+        uint8_t *SerializeWithCachedSizesToArray(uint8_t *target)
+        c_bool ParseFromArray(void* data, int size)
+
+
+cdef int64_t padded_length(int64_t offset, int64_t alignment):
+    """Round 'offset' up to the nearest multiple of 'alignment'."""
+    return ((offset + alignment - 1) // alignment) * alignment
+
+
+cdef int64_t padded_length_u64(uint64_t offset, uint64_t alignment):
+    """Same as 'padded_length', but takes unsigned 64-bit arguments."""
+    return ((offset + alignment - 1) // alignment) * alignment
+
+
+cdef uint8_t* pointer_logical_and(const uint8_t *address, uintptr_t bits):
+    """Return 'address' with its bits masked by 'bits' (bitwise AND)."""
+    return <uint8_t*>((<uintptr_t>address) & bits)
+
+
+cdef class SubBuffer:
+    """A buffer-protocol view over a region of memory owned by a 'Buffer'.
+
+    The fields mirror those of 'Py_buffer' and are filled in by
+    'unpack_pickle5_buffers'. The parent 'Buffer' is stored on the instance
+    so that it stays referenced for as long as this view does.
+    """
+    cdef:
+        void *buf
+        Py_ssize_t len
+        int readonly
+        c_string _format
+        int ndim
+        c_vector[Py_ssize_t] _shape
+        c_vector[Py_ssize_t] _strides
+        Py_ssize_t *suboffsets
+        Py_ssize_t itemsize
+        void *internal
+        object buffer
+
+    def __cinit__(self, Buffer buffer):
+        # Increase ref count.
+        self.buffer = buffer
+        self.suboffsets = NULL
+        self.internal = NULL
+
+    def __len__(self):
+        """Return the number of items (byte length // itemsize)."""
+        return self.len // self.itemsize
+
+    @property
+    def nbytes(self):
+        """
+        The buffer size in bytes.
+        """
+        return self.len
+
+    def tobytes(self):
+        """
+        Return this buffer as a Python bytes object. Memory is copied.
+        """
+        return PyBytes_FromStringAndSize(
+            <const char*>self.buf, self.len)
+
+    def __getbuffer__(self, Py_buffer* buffer, int flags):
+        """Export the stored view fields through the buffer protocol."""
+        buffer.readonly = self.readonly
+        buffer.buf = self.buf
+        buffer.format = <char *>self._format.c_str()
+        buffer.internal = self.internal
+        buffer.itemsize = self.itemsize
+        buffer.len = self.len
+        buffer.ndim = self.ndim
+        buffer.obj = self  # This is important for GC.
+        buffer.shape = self._shape.data()
+        buffer.strides = self._strides.data()
+        buffer.suboffsets = self.suboffsets
+
+    # Legacy (Python 2) buffer protocol. The byte size is 'self.len'; this
+    # class has no 'size' attribute.
+    def __getsegcount__(self, Py_ssize_t *len_out):
+        if len_out != NULL:
+            len_out[0] = self.len
+        return 1
+
+    def __getreadbuffer__(self, Py_ssize_t idx, void ** p):
+        if idx != 0:
+            raise SystemError("accessing non-existent buffer segment")
+        if p != NULL:
+            p[0] = self.buf
+        return self.len
+
+    def __getwritebuffer__(self, Py_ssize_t idx, void ** p):
+        if idx != 0:
+            raise SystemError("accessing non-existent buffer segment")
+        if p != NULL:
+            p[0] = self.buf
+        return self.len
+
+
+cdef void parallel_memcopy(uint8_t *dst, const uint8_t *src, int64_t nbytes,
+                           uintptr_t block_size, int num_threads):
+    """Copy 'nbytes' bytes from 'src' to 'dst' using 'num_threads' threads.
+
+    The source range is split into an unaligned prefix, 'num_threads' equally
+    sized chunks that start on a 'block_size' boundary, and a suffix. The
+    mask arithmetic assumes that 'block_size' is a power of two.
+    """
+    cdef:
+        uint8_t *left = pointer_logical_and(src + block_size - 1,
+                                            ~(block_size - 1))
+        uint8_t *right = pointer_logical_and(src + nbytes, ~(block_size - 1))
+        int64_t num_blocks = (right - left) // block_size
+        size_t chunk_size
+        int64_t prefix, suffix
+        int i
+
+    # Update right address
+    right = right - (num_blocks % num_threads) * block_size
+
+    # Now we divide these blocks between available threads. The remainder is
+    # handled separately.
+    chunk_size = (right - left) // num_threads
+    prefix = left - src
+    suffix = src + nbytes - right
+    # Now the data layout is
+    # | prefix | k * num_threads * block_size | suffix |.
+    #
+    # We have chunk_size = k * block_size, therefore the data layout is
+    # | prefix | num_threads * chunk_size | suffix |.
+    # Each thread gets a "chunk" of k blocks.
+
+    for i in prange(num_threads, nogil=True, num_threads=num_threads):
+        memcpy(dst + prefix + i * chunk_size, left + i * chunk_size,
+               chunk_size)
+        if i == 0:
+            memcpy(dst, src, prefix)
+        if i == num_threads - 1:
+            memcpy(dst + prefix + num_threads * chunk_size, right, suffix)
+
+
+# See 'serialization.proto' for the memory layout in the Plasma buffer.
+def unpack_pickle5_buffers(Buffer buf):
+    """Split a pickle5 object stored in 'buf' into its parts.
+
+    Args:
+        buf: The 'Buffer' holding the serialized object.
+
+    Returns:
+        A tuple '(inband_data, pickled_buffers)': the in-band pickle bytes
+        and a list of 'SubBuffer' views into 'buf', one per out-of-band
+        buffer. The views are not copies of the buffer data.
+
+    Raises:
+        ValueError: If the header or the protobuf metadata is invalid.
+    """
+    cdef:
+        shared_ptr[CBuffer] _buffer = buf.buffer
+        const uint8_t *data = buf.buffer.get().Data()
+        size_t size = _buffer.get().Size()
+        CPythonObject python_object
+        CPythonBuffer *buffer_meta
+        c_string inband_data
+        int64_t protobuf_offset
+        int64_t protobuf_size
+        int32_t i
+        const uint8_t *buffers_segment
+    if size < 2 * sizeof(int64_t):
+        raise ValueError("The buffer is too small to hold the header. "
+                         "Maybe the buffer has been corrupted.")
+    # Header: two int64 values, the protobuf offset and the protobuf size.
+    protobuf_offset = (<int64_t*>data)[0]
+    if protobuf_offset < 0:
+        raise ValueError("The protobuf data offset should be positive. "
+                         "Got negative instead. "
+                         "Maybe the buffer has been corrupted.")
+    protobuf_size = (<int64_t*>data)[1]
+    if protobuf_size > INT32_MAX or protobuf_size < 0:
+        raise ValueError("Incorrect protobuf size. "
+                         "Maybe the buffer has been corrupted.")
+    if <uint64_t>protobuf_offset + <uint64_t>protobuf_size > size:
+        raise ValueError("The protobuf data exceeds the buffer. "
+                         "Maybe the buffer has been corrupted.")
+    if not python_object.ParseFromArray(
+            data + protobuf_offset, protobuf_size):
+        raise ValueError("Protobuf object is corrupted.")
+    inband_data.append(<char*>(data + python_object.inband_data_offset()),
+                       python_object.inband_data_size())
+    buffers_segment = data + python_object.raw_buffers_offset()
+    pickled_buffers = []
+    # Now read buffer meta
+    for i in range(python_object.buffer_size()):
+        buffer_meta = &python_object.buffer(i)
+        buffer = SubBuffer(buf)
+        buffer.buf = <void*>(buffers_segment + buffer_meta.address())
+        buffer.len = buffer_meta.length()
+        buffer.itemsize = buffer_meta.itemsize()
+        buffer.readonly = buffer_meta.readonly()
+        buffer.ndim = buffer_meta.ndim()
+        buffer._format = buffer_meta.format()
+        buffer._shape.assign(
+            buffer_meta.shape().data(),
+            buffer_meta.shape().data() + buffer_meta.ndim())
+        buffer._strides.assign(
+            buffer_meta.strides().data(),
+            buffer_meta.strides().data() + buffer_meta.ndim())
+        buffer.internal = NULL
+        buffer.suboffsets = NULL
+        pickled_buffers.append(buffer)
+    return inband_data, pickled_buffers
+
+
+cdef class Pickle5Writer:
+    """Collect out-of-band pickle5 buffers and write them to a 'CBuffer'.
+
+    Usage: pass 'buffer_callback' to 'pickle.dumps(..., protocol=5)', call
+    'get_total_bytes()' to size the destination, then 'write_to()' it.
+    See 'serialization.proto' for the memory layout.
+    """
+    cdef:
+        CPythonObject python_object
+        c_vector[Py_buffer] buffers
+        # Address of end of the current buffer, relative to the
+        # begin offset of our buffers.
+        uint64_t _curr_buffer_addr
+        uint64_t _protobuf_offset
+        int64_t _total_bytes
+
+    def __cinit__(self):
+        self._curr_buffer_addr = 0
+        self._total_bytes = -1
+
+    def __dealloc__(self):
+        cdef size_t i
+        # Views that 'write_to()' never released (for example because the
+        # object already existed in the store) would keep their exporters
+        # pinned. Releasing an already released view does nothing.
+        for i in range(self.buffers.size()):
+            cpython.PyBuffer_Release(&self.buffers[i])
+
+    def buffer_callback(self, pickle_buffer):
+        """Record one out-of-band buffer produced by 'pickle.dumps'.
+
+        The view on 'pickle_buffer' is kept until 'write_to()' copies it.
+        """
+        cdef:
+            Py_buffer view
+            int32_t i
+            CPythonBuffer* buffer
+        # Acquire the view before adding the metadata entry, so a failure
+        # here cannot leave an entry without a matching view.
+        cpython.PyObject_GetBuffer(pickle_buffer, &view,
+                                   cpython.PyBUF_FULL_RO)
+        buffer = self.python_object.add_buffer()
+        buffer.set_length(view.len)
+        buffer.set_ndim(view.ndim)
+        buffer.set_readonly(view.readonly)
+        buffer.set_itemsize(view.itemsize)
+        if view.format:
+            buffer.set_format(view.format)
+        if view.shape:
+            for i in range(view.ndim):
+                buffer.add_shape(view.shape[i])
+        if view.strides:
+            for i in range(view.ndim):
+                buffer.add_strides(view.strides[i])
+
+        # Increase buffer address.
+        if view.len < kMajorBufferSize:
+            self._curr_buffer_addr = padded_length(
+                self._curr_buffer_addr, kMinorBufferAlign)
+        else:
+            self._curr_buffer_addr = padded_length(
+                self._curr_buffer_addr, kMajorBufferAlign)
+        buffer.set_address(self._curr_buffer_addr)
+        self._curr_buffer_addr += view.len
+        self.buffers.push_back(view)
+
+    def get_total_bytes(self, const c_string &inband):
+        """Finalize the metadata and return the total serialized size.
+
+        Must be called before 'write_to()' and after every
+        'buffer_callback()' call, because the protobuf size is cached here.
+        """
+        cdef:
+            size_t protobuf_bytes = 0
+            uint64_t inband_data_offset = sizeof(int64_t) * 2
+            uint64_t raw_buffers_offset = padded_length_u64(
+                inband_data_offset + inband.length(), kMajorBufferAlign)
+        self.python_object.set_inband_data_offset(inband_data_offset)
+        self.python_object.set_inband_data_size(inband.length())
+        self.python_object.set_raw_buffers_offset(raw_buffers_offset)
+        self.python_object.set_raw_buffers_size(self._curr_buffer_addr)
+        # Since calculating the output size is expensive, we will
+        # reuse the cached size.
+        # So we MUST NOT change 'python_object' afterwards.
+        # This is because protobuf could change the output size
+        # according to different values.
+        protobuf_bytes = self.python_object.ByteSizeLong()
+        if protobuf_bytes > INT32_MAX:
+            raise ValueError("Total buffer metadata size is bigger than %d. "
+                             "Consider reduce the number of buffers "
+                             "(number of numpy arrays, etc)." % INT32_MAX)
+        self._protobuf_offset = padded_length_u64(
+            raw_buffers_offset + self._curr_buffer_addr, kMinorBufferAlign)
+        self._total_bytes = self._protobuf_offset + protobuf_bytes
+        return self._total_bytes
+
+    cdef void write_to(self, const c_string &inband, shared_ptr[CBuffer] data,
+                       int memcopy_threads) except *:
+        """Write the header, metadata, inband data and buffers into 'data'.
+
+        'data' must hold at least 'get_total_bytes(inband)' bytes. Buffers
+        larger than 'kMemcopyDefaultThreshold' are copied in parallel when
+        'memcopy_threads > 1'. Every buffer view is released after its copy.
+
+        Declared 'except *' so that the ValueError below reaches the caller.
+        """
+        cdef uint8_t *ptr = data.get().Data()
+        cdef int32_t protobuf_size
+        cdef uint64_t buffer_addr
+        cdef uint64_t buffer_len
+        cdef int i
+        if self._total_bytes < 0:
+            raise ValueError("Must call 'get_total_bytes()' first "
+                             "to get the actual size")
+        # Write protobuf size for deserialization.
+        protobuf_size = self.python_object.GetCachedSize()
+        (<int64_t*>ptr)[0] = self._protobuf_offset
+        (<int64_t*>ptr)[1] = protobuf_size
+        # Write protobuf data.
+        self.python_object.SerializeWithCachedSizesToArray(
+            ptr + self._protobuf_offset)
+        # Write inband data.
+        memcpy(ptr + self.python_object.inband_data_offset(),
+               inband.data(), inband.length())
+        # Write buffer data.
+        ptr += self.python_object.raw_buffers_offset()
+        for i in range(self.python_object.buffer_size()):
+            buffer_addr = self.python_object.buffer(i).address()
+            buffer_len = self.python_object.buffer(i).length()
+            if (memcopy_threads > 1 and
+                    buffer_len > kMemcopyDefaultThreshold):
+                parallel_memcopy(ptr + buffer_addr,
+                                 <const uint8_t*>self.buffers[i].buf,
+                                 buffer_len,
+                                 kMemcopyDefaultBlocksize, memcopy_threads)
+            else:
+                memcpy(ptr + buffer_addr, self.buffers[i].buf, buffer_len)
+            # We must release the buffer, or we could experience memory leaks.
+            cpython.PyBuffer_Release(&self.buffers[i])
diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py
index c61cfed2e..50d4b5036 100644
--- a/python/ray/ray_constants.py
+++ b/python/ray/ray_constants.py
@@ -177,5 +177,6 @@ LOG_MONITOR_MAX_OPEN_FILES = 200
 
 # A constant used as object metadata to indicate the object is raw binary.
 RAW_BUFFER_METADATA = b"RAW"
+PICKLE5_BUFFER_METADATA = b"PICKLE5"
 
 AUTOSCALER_RESOURCE_REQUEST_CHANNEL = b"autoscaler_resource_request"
diff --git a/python/ray/worker.py b/python/ray/worker.py
index 2cbd18825..fae9d77df 100644
--- a/python/ray/worker.py
+++ b/python/ray/worker.py
@@ -50,6 +50,7 @@ from ray import (
 )
 from ray import import_thread
 from ray import profiling
+from ray._raylet import Pickle5Writer, unpack_pickle5_buffers
 
 from ray.gcs_utils import ErrorType
 from ray.exceptions import (
@@ -89,9 +90,6 @@ try:
 except ImportError:
     setproctitle = None
 
-if USE_NEW_SERIALIZER:
-    from _pickle import PickleBuffer
-
 
 class ActorCheckpointInfo(object):
     """Information used to maintain actor checkpoints."""
@@ -435,18 +433,11 @@ class Worker(object):
                 self.core_worker.put_raw_buffer(
                     value, object_id, memcopy_threads=self.memcopy_threads)
             else:
-                buffers = []
+                writer = Pickle5Writer()
                 meta = pickle.dumps(
-                    value, protocol=5, buffer_callback=buffers.append)
-                # TODO(suquark): This could involve more copies.
-                # Should implement zero-copy for PickleBuffer.
-                buffers = [b.raw().tobytes() for b in buffers]
-                value = (meta, buffers)
-
-                self.core_worker.put_serialized_object(
-                    pyarrow.serialize(value),
-                    object_id,
-                    memcopy_threads=self.memcopy_threads)
+                    value, protocol=5, buffer_callback=writer.buffer_callback)
+                self.core_worker.put_pickle5_buffers(object_id, meta, writer,
+                                                     self.memcopy_threads)
         except pyarrow.plasma.PlasmaObjectExists:
             # The object already exists in the object store, so there is no
             # need to add it again. TODO(rkn): We need to compare hashes
@@ -522,6 +513,10 @@ class Worker(object):
     def _deserialize_object_from_arrow(self, data, metadata, object_id,
                                        serialization_context):
         if metadata:
+            if (USE_NEW_SERIALIZER
+                    and metadata == ray_constants.PICKLE5_BUFFER_METADATA):
+                in_band, buffers = unpack_pickle5_buffers(data)
+                return pickle.loads(in_band, buffers=buffers)
             # Check if the object should be returned as raw bytes.
             if metadata == ray_constants.RAW_BUFFER_METADATA:
                 if data is None:
@@ -540,12 +535,7 @@ class Worker(object):
                 assert False, "Unrecognized error type " + str(error_type)
         elif data:
             # If data is not empty, deserialize the object.
-            if USE_NEW_SERIALIZER:
-                r, buffers = pyarrow.deserialize(data, serialization_context)
-                buffers = [PickleBuffer(b) for b in buffers]
-                return pickle.loads(r, buffers=buffers)
-            else:
-                return pyarrow.deserialize(data, serialization_context)
+            return pyarrow.deserialize(data, serialization_context)
         else:
             # Object isn't available in plasma.
             return plasma.ObjectNotAvailable
diff --git a/src/ray/protobuf/serialization.proto b/src/ray/protobuf/serialization.proto
new file mode 100644
index 000000000..2dbd35e8c
--- /dev/null
+++ b/src/ray/protobuf/serialization.proto
@@ -0,0 +1,82 @@
+syntax = "proto3";
+
+package ray.serialization;
+
+// This is the protocol for python object serialization with pickle5.
+//
+// ## About Pickle 5 Protocol
+// Pickle5 will create two things during serialization:
+//   1. Inband data. This is the framed pickle data for most objects.
+//   2. Buffers. They are python buffers referring to the internal data of objects.
+//      They contain metadata of the buffer and a native pointer.
+//      Thus they provide an interface for zero-copy serialization.
+//
+// ## Protobuf object
+// A PythonObject protobuf object will be created for each python object.
+// Unfortunately, a protobuf object has a 2GB size limit and cannot support zero-copy,
+// so we have to put inband data and raw buffer contents outside. Thus PythonObject
+// will only store buffer metadata, the offset and size of inband data, and the
+// offset and size of the raw buffers section.
+//
+// ## Python object serialization memory layout
+// This section describes the memory layout in the Plasma store buffer.
+// Unfortunately, no frame info is included in protobuf data, so we have to specify
+// the length and offset of PythonObject ourselves.
+// ---------------------
+// i64 offset(PythonObject):
+//   Offset of the PythonObject relative to the start of this buffer.
+// i64 len(PythonObject):
+//   Length of the PythonObject.
+// inband_data | pad(64)
+//   Inband data, padded to a 64-byte boundary for the alignment of buffers.
+// buffers | pad(8)
+//   Raw data of buffers, padded to an 8-byte boundary for the alignment of PythonObject.
+// PythonObject
+//   PythonObject is stored at the end because its size is variable.
+// ---------------------
+
+// The message for metadata of python buffer objects.
+message PythonBuffer {
+  // The offset of the buffer relative to the beginning of the raw buffers section.
+  // The offset of that section is stored in 'PythonObject'.
+  uint64 address = 1;
+  // The length of the buffer in bytes.
+  // It should be equal to 'product(*shape) * itemsize'.
+  // 'int64' represents 'Py_ssize_t' of the corresponding python interface.
+  int64 length = 2;
+  // The size of every element in the buffer.
+  // 'int64' represents 'Py_ssize_t' of the corresponding python interface.
+  int64 itemsize = 3;
+  // The number of dimensions of the object (for example, the number of tensor axes).
+  int32 ndim = 4;
+  // Readonly flag for this object.
+  bool readonly = 5;
+  // The format string for every item. This is optional.
+  // If this is NULL, "B" (unsigned bytes) is assumed.
+  string format = 6;
+  // The shape of the object per dimension. This is NULL when ndim == 0.
+  // The length of the shape should be equal to 'ndim'.
+  // 'int64' represents 'Py_ssize_t' of the corresponding python interface.
+  repeated int64 shape = 7;
+  // The stride of the object per dimension. This is NULL when ndim == 0.
+  // The length of the strides should be equal to 'ndim'.
+  // 'int64' represents 'Py_ssize_t' of the corresponding python interface.
+  repeated int64 strides = 8;
+  // 'suboffsets' is ignored since it is required to be NULL by the pickle5 protocol.
+}
+
+// The message for a pickle5 serialized python object.
+message PythonObject {
+  // The offset of the inband data section relative to the beginning of the Plasma buffer.
+  uint64 inband_data_offset = 1;
+  // The size of the inband data section in bytes.
+  uint64 inband_data_size = 2;
+  // The offset of the raw buffers section relative to the beginning of the Plasma buffer.
+  uint64 raw_buffers_offset = 3;
+  // The size of the raw buffers section in bytes. It is not used in deserialization
+  // because we already have the length and address of every buffer. However, it could
+  // be useful for debugging or future adjustment, so we just keep it.
+  uint64 raw_buffers_size = 4;
+  // The metadata of the python buffer objects, one entry per buffer.
+  repeated PythonBuffer buffer = 5;
+}