diff --git a/python/ray/includes/serialization.pxi b/python/ray/includes/serialization.pxi index 091edc5a3..c86414da0 100644 --- a/python/ray/includes/serialization.pxi +++ b/python/ray/includes/serialization.pxi @@ -1,7 +1,5 @@ -from libc.stdlib cimport malloc, free from libc.string cimport memcpy from libc.stdint cimport uintptr_t, uint64_t, INT32_MAX -from cython.parallel cimport prange # This is the default alignment value for len(buffer) < 2048. DEF kMinorBufferAlign = 8 @@ -12,6 +10,9 @@ DEF kMajorBufferSize = 2048 DEF kMemcopyDefaultBlocksize = 64 DEF kMemcopyDefaultThreshold = 1024 * 1024 +cdef extern from "arrow/util/memory.h" namespace "arrow::internal" nogil: + void parallel_memcopy(uint8_t* dst, const uint8_t* src, int64_t nbytes, + uintptr_t block_size, int num_threads) cdef extern from "google/protobuf/repeated_field.h" nogil: cdef cppclass RepeatedField[Element]: @@ -67,10 +68,6 @@ cdef int64_t padded_length_u64(uint64_t offset, uint64_t alignment): return ((offset + alignment - 1) // alignment) * alignment -cdef uint8_t* pointer_logical_and(const uint8_t *address, uintptr_t bits): - return (( address) & bits) - - cdef class SubBuffer: cdef: void *buf @@ -141,41 +138,6 @@ cdef class SubBuffer: return self.size -cdef void parallel_memcopy(uint8_t *dst, const uint8_t *src, int64_t nbytes, - uintptr_t block_size, int num_threads): - cdef: - uint8_t *left = pointer_logical_and(src + block_size - 1, - ~(block_size - 1)) - uint8_t *right = pointer_logical_and(src + nbytes, ~(block_size - 1)) - int64_t num_blocks = (right - left) // block_size - size_t chunk_size - int64_t prefix, suffix - int i - - # Update right address - right = right - (num_blocks % num_threads) * block_size - - # Now we divide these blocks between available threads. The remainder is - # handled separately. - chunk_size = (right - left) // num_threads - prefix = left - src - suffix = src + nbytes - right - # Now the data layout is - # | prefix | k * num_threads * block_size | suffix |. - # - # We have chunk_size = k * block_size, therefore the data layout is - # | prefix | num_threads * chunk_size | suffix |. - # Each thread gets a "chunk" of k blocks. - - for i in prange(num_threads, nogil=True, num_threads=num_threads): - memcpy(dst + prefix + i * chunk_size, left + i * chunk_size, - chunk_size) - if i == 0: - memcpy(dst, src, prefix) - if i == num_threads - 1: - memcpy(dst + prefix + num_threads * chunk_size, right, suffix) - - # See 'serialization.proto' for the memory layout in the Plasma buffer. def unpack_pickle5_buffers(Buffer buf): cdef: diff --git a/python/ray/worker.py b/python/ray/worker.py index fae9d77df..bd04c346c 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -434,9 +434,9 @@ class Worker(object): value, object_id, memcopy_threads=self.memcopy_threads) else: writer = Pickle5Writer() - meta = pickle.dumps( + inband = pickle.dumps( value, protocol=5, buffer_callback=writer.buffer_callback) - self.core_worker.put_pickle5_buffers(object_id, meta, writer, + self.core_worker.put_pickle5_buffers(object_id, inband, writer, self.memcopy_threads) except pyarrow.plasma.PlasmaObjectExists: # The object already exists in the object store, so there is no