[Serialization] Fix buffer alignment issues (#11888)

* fix buffer alignment issues

* remove unused fields

* aligned memory allocation

* windows compat

* license. fix compiler warnings

* fix compilation error

* reinterpret_cast
This commit is contained in:
Siyuan (Ryans) Zhuang
2020-11-10 23:44:16 -08:00
committed by GitHub
parent 1979ea9c0a
commit b8dda0e3d0
8 changed files with 238 additions and 70 deletions
-1
View File
@@ -54,7 +54,6 @@ from ray.includes.common cimport (
CTaskType,
CPlacementStrategy,
CRayFunction,
LocalMemoryBuffer,
move,
LANGUAGE_CPP,
LANGUAGE_JAVA,
+45 -42
View File
@@ -48,12 +48,8 @@ cdef extern from "src/ray/protobuf/serialization.pb.h" nogil:
int strides_size()
cdef cppclass CPythonObject "ray::serialization::PythonObject":
uint64_t inband_data_offset() const
void set_inband_data_offset(uint64_t value)
uint64_t inband_data_size() const
void set_inband_data_size(uint64_t value)
uint64_t raw_buffers_offset() const
void set_raw_buffers_offset(uint64_t value)
uint64_t raw_buffers_size() const
void set_raw_buffers_size(uint64_t value)
CPythonBuffer* add_buffer()
@@ -69,8 +65,9 @@ cdef int64_t padded_length(int64_t offset, int64_t alignment):
return ((offset + alignment - 1) // alignment) * alignment
cdef int64_t padded_length_u64(uint64_t offset, uint64_t alignment):
    # Round 'offset' up to the nearest multiple of 'alignment'.
    # The arithmetic is done on unsigned 64-bit values; the result is
    # converted to int64_t on return.
    cdef uint64_t num_units = (offset + alignment - 1) // alignment
    return num_units * alignment
cdef uint8_t* aligned_address(uint8_t* addr, uint64_t alignment) nogil:
    # Return the first address >= 'addr' that is a multiple of 'alignment'.
    # The pointer is converted to an unsigned integer so that the
    # round-up can be done with plain integer arithmetic.
    cdef uintptr_t raw = <uintptr_t>addr
    cdef uintptr_t rounded = ((raw + alignment - 1) // alignment) * alignment
    return <uint8_t*>rounded
cdef class SubBuffer:
@@ -204,36 +201,41 @@ def split_buffer(Buffer buf):
bufferview[kMessagePackOffset + msgpack_bytes_length:])
# See 'serialization.proto' for the memory layout in the Plasma buffer.
# Note [Pickle5 serialization layout & alignment]
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# To ensure efficient data access, our serializer enforces alignment
# when writing data to a buffer. See 'serialization.proto' for
# the detailed memory layout and alignment.
@cython.boundscheck(False)
@cython.wraparound(False)
def unpack_pickle5_buffers(uint8_t[:] bufferview):
cdef:
const uint8_t *data = &bufferview[0]
size_t size = len(bufferview)
CPythonObject python_object
CPythonBuffer *buffer_meta
int64_t protobuf_offset
int inband_offset = sizeof(int64_t) * 2
int64_t inband_size
int64_t protobuf_size
int32_t i
const uint8_t *buffers_segment
protobuf_offset = (<int64_t*>data)[0]
if protobuf_offset < 0:
raise ValueError("The protobuf data offset should be positive."
inband_size = (<int64_t*>data)[0]
if inband_size < 0:
raise ValueError("The inband data size should be positive."
"Got negative instead. "
"Maybe the buffer has been corrupted.")
protobuf_size = (<int64_t*>data)[1]
if protobuf_size > INT32_MAX or protobuf_size < 0:
raise ValueError("Incorrect protobuf size. "
"Maybe the buffer has been corrupted.")
inband_data = bufferview[inband_offset:inband_offset + inband_size]
if not python_object.ParseFromArray(
data + protobuf_offset, <int32_t>protobuf_size):
data + inband_offset + inband_size, <int32_t>protobuf_size):
raise ValueError("Protobuf object is corrupted.")
inband_data_offset = python_object.inband_data_offset()
inband_data = bufferview[
inband_data_offset:
inband_data_offset + python_object.inband_data_size()]
buffers_segment = data + python_object.raw_buffers_offset()
buffers_segment = aligned_address(
<uint8_t*>data + inband_offset + inband_size + protobuf_size,
kMajorBufferAlign)
pickled_buffers = []
# Now read buffer meta
for i in range(python_object.buffer_size()):
@@ -313,51 +315,52 @@ cdef class Pickle5Writer:
cdef:
size_t protobuf_bytes = 0
uint64_t inband_data_offset = sizeof(int64_t) * 2
uint64_t raw_buffers_offset = padded_length_u64(
inband_data_offset + len(inband), kMajorBufferAlign)
self.python_object.set_inband_data_offset(inband_data_offset)
self.python_object.set_inband_data_size(len(inband))
self.python_object.set_raw_buffers_offset(raw_buffers_offset)
self.python_object.set_raw_buffers_size(self._curr_buffer_addr)
# Since calculating the output size is expensive, we will
# reuse the cached size.
# So we MUST NOT change 'python_object' afterwards.
# This is because protobuf could change the output size
# according to different values.
# However, protobuf could change the output size according to
# different values, so we MUST NOT change 'python_object' afterwards.
protobuf_bytes = self.python_object.ByteSizeLong()
if protobuf_bytes > INT32_MAX:
raise ValueError("Total buffer metadata size is bigger than %d. "
"Consider reduce the number of buffers "
"(number of numpy arrays, etc)." % INT32_MAX)
self._protobuf_offset = padded_length_u64(
raw_buffers_offset + self._curr_buffer_addr, kMinorBufferAlign)
self._protobuf_offset = inband_data_offset + len(inband)
self._total_bytes = self._protobuf_offset + protobuf_bytes
if self._curr_buffer_addr > 0:
# reserve 'kMajorBufferAlign' bytes for possible buffer alignment
self._total_bytes += kMajorBufferAlign + self._curr_buffer_addr
return self._total_bytes
@cython.boundscheck(False)
@cython.wraparound(False)
cdef void write_to(self, const uint8_t[:] inband, uint8_t[:] data,
int memcopy_threads) nogil:
cdef uint8_t *ptr = &data[0]
cdef int32_t protobuf_size
cdef uint64_t buffer_addr
cdef uint64_t buffer_len
cdef int i
cdef:
uint8_t *ptr = &data[0]
uint64_t buffer_addr
uint64_t buffer_len
int i
int64_t protobuf_size = self.python_object.GetCachedSize()
if self._total_bytes < 0:
raise ValueError("Must call 'get_total_bytes()' first "
"to get the actual size")
# Write protobuf size for deserialization.
protobuf_size = self.python_object.GetCachedSize()
(<int64_t*>ptr)[0] = self._protobuf_offset
# Write inband data & protobuf size for deserialization.
(<int64_t*>ptr)[0] = len(inband)
(<int64_t*>ptr)[1] = protobuf_size
# Write protobuf data.
self.python_object.SerializeWithCachedSizesToArray(
ptr + self._protobuf_offset)
# Write inband data.
memcpy(ptr + self.python_object.inband_data_offset(),
&inband[0], len(inband))
# Write buffer data.
ptr += self.python_object.raw_buffers_offset()
ptr += sizeof(int64_t) * 2
memcpy(ptr, &inband[0], len(inband))
# Write protobuf data.
ptr += len(inband)
self.python_object.SerializeWithCachedSizesToArray(ptr)
ptr += protobuf_size
if self._curr_buffer_addr <= 0:
# End of serialization. Writing more stuff will corrupt the memory.
return
# aligned to 64 bytes
ptr = aligned_address(ptr, kMajorBufferAlign)
for i in range(self.python_object.buffer_size()):
buffer_addr = self.python_object.buffer(i).address()
buffer_len = self.python_object.buffer(i).length()
+25
View File
@@ -543,6 +543,31 @@ def test_reducer_override_no_reference_cycle(ray_start_shared_local_modes):
assert new_obj() is None
def test_buffer_alignment():
    """Check the address alignment of numpy arrays round-tripped via Ray."""
    small_align = 8

    # Deserialized large numpy arrays should be 64-byte aligned.
    big = np.random.normal(size=(10, 20, 30))
    restored_big = ray.get(ray.put(big))
    assert restored_big.ctypes.data % 64 == 0

    # Unlike PyArrow, Ray aligns small numpy arrays to 8
    # bytes to be memory efficient.
    flat_arrays = [np.random.normal(size=n) for n in range(100)]
    for restored in ray.get(ray.put(flat_arrays)):
        assert restored.ctypes.data % small_align == 0

    # Arrays of shape (1,) * ndim, for ndim from 0 up to 19.
    ranked_arrays = [
        np.random.normal(size=ndim * (1, )) for ndim in range(20)
    ]
    for restored in ray.get(ray.put(ranked_arrays)):
        assert restored.ctypes.data % small_align == 0

    # Sub-views: take the 1:3 slice along every axis of a (5,) * ndim array.
    cubes = [np.random.normal(size=ndim * (5, )) for ndim in range(1, 8)]
    views = [
        cube[(pos + 1) * (slice(1, 3), )] for pos, cube in enumerate(cubes)
    ]
    for restored in ray.get(ray.put(views)):
        assert restored.ctypes.data % small_align == 0
if __name__ == "__main__":
    # Running this file directly hands it to pytest in verbose mode and
    # uses pytest's result as the process exit status.
    import pytest

    exit_status = pytest.main(["-v", __file__])
    sys.exit(exit_status)