diff --git a/BUILD.bazel b/BUILD.bazel index 7aafbb259..d25da120e 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1075,6 +1075,7 @@ pyx_library( ), deps = [ "//:core_worker_lib", + "//:ray_util", "//:raylet_lib", "//:serialization_cc_proto", "//:src/ray/ray_exported_symbols.lds", diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 9874d36c2..ecd967738 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -13,12 +13,6 @@ if "pickle5" in sys.modules: "requires a specific version of pickle5 (which is " "packaged along with Ray).") -if "OMP_NUM_THREADS" not in os.environ: - logger.debug("[ray] Forcing OMP_NUM_THREADS=1 to avoid performance " - "degradation with many workers (issue #6998). You can " - "override this by explicitly setting OMP_NUM_THREADS.") - os.environ["OMP_NUM_THREADS"] = "1" - # Add the directory containing pickle5 to the Python path so that we find the # pickle5 version packaged with ray and not a pre-existing pickle5. pickle5_path = os.path.join( diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 641ff45cc..300286729 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -112,7 +112,7 @@ include "includes/libcoreworker.pxi" logger = logging.getLogger(__name__) -MEMCOPY_THREADS = 12 +MEMCOPY_THREADS = 6 def set_internal_config(dict options): diff --git a/python/ray/includes/serialization.pxi b/python/ray/includes/serialization.pxi index ce5fca653..a5e08d8f7 100644 --- a/python/ray/includes/serialization.pxi +++ b/python/ray/includes/serialization.pxi @@ -10,7 +10,7 @@ DEF kMajorBufferSize = 2048 DEF kMemcopyDefaultBlocksize = 64 DEF kMemcopyDefaultThreshold = 1024 * 1024 -cdef extern from "arrow/util/memory.h" namespace "arrow::internal" nogil: +cdef extern from "ray/util/memory.h" namespace "ray" nogil: void parallel_memcopy(uint8_t* dst, const uint8_t* src, int64_t nbytes, uintptr_t block_size, int num_threads) diff --git a/python/ray/worker.py b/python/ray/worker.py index 
namespace ray {

/// Bitwise-AND the numeric value of a pointer with a mask.
///
/// \param address Pointer whose numeric value is masked.
/// \param bits Mask applied to the address.
/// \return The masked address as a mutable byte pointer. Constness of the
/// input is dropped, so callers must not write through the result unless the
/// underlying memory is writable.
uint8_t *pointer_logical_and(const uint8_t *address, uintptr_t bits) {
  uintptr_t value = reinterpret_cast<uintptr_t>(address);
  return reinterpret_cast<uint8_t *>(value & bits);
}

/// Copy `nbytes` bytes from `src` to `dst`, splitting the block-aligned middle
/// of the source range evenly across `num_threads` threads.
///
/// The source range is viewed as
///   | prefix | num_threads * chunk_size | suffix |
/// where the middle part starts on a `block_size` boundary and `chunk_size` is
/// a multiple of `block_size`. Worker threads copy one chunk each while the
/// calling thread copies the unaligned prefix and the suffix.
///
/// The copy degrades to a single `memcpy` on the calling thread when
/// parallelism cannot be applied safely: `num_threads <= 1`, `block_size` is
/// zero or not a power of two (the alignment mask would be meaningless), or
/// the range holds fewer aligned blocks than threads.
///
/// \param dst Destination buffer; must not overlap `src`.
/// \param src Source buffer.
/// \param nbytes Number of bytes to copy. Values <= 0 copy nothing.
/// \param block_size Alignment granularity in bytes (power of two).
/// \param num_threads Number of worker threads to use for the aligned part.
void parallel_memcopy(uint8_t *dst, const uint8_t *src, int64_t nbytes,
                      uintptr_t block_size, int num_threads) {
  if (nbytes <= 0) {
    // A negative count would be converted to a huge size_t inside memcpy.
    return;
  }
  const size_t total_bytes = static_cast<size_t>(nbytes);

  const bool block_size_is_pow2 =
      block_size != 0 && (block_size & (block_size - 1)) == 0;
  if (num_threads <= 1 || !block_size_is_pow2 ||
      static_cast<uint64_t>(nbytes) < block_size) {
    // Also guarantees that `src + block_size - 1` below stays inside the
    // source buffer.
    std::memcpy(dst, src, total_bytes);
    return;
  }

  const uintptr_t mask = ~(block_size - 1);
  // First block boundary at or after src, last block boundary at or before
  // the end of the range.
  const uint8_t *left = pointer_logical_and(src + (block_size - 1), mask);
  const uint8_t *right = pointer_logical_and(src + nbytes, mask);

  // All arithmetic is done in signed 64-bit so that an empty aligned region
  // cannot wrap around to a huge unsigned block count.
  const int64_t block = static_cast<int64_t>(block_size);
  const int64_t num_blocks = right > left ? (right - left) / block : 0;
  const int64_t chunk_size = (num_blocks / num_threads) * block;
  if (chunk_size == 0) {
    // Not even one block per thread: spawning threads would only add overhead
    // (and, without this guard, prefix could exceed nbytes).
    std::memcpy(dst, src, total_bytes);
    return;
  }

  const int64_t prefix = left - src;
  const int64_t parallel_bytes = chunk_size * num_threads;
  const int64_t suffix = nbytes - prefix - parallel_bytes;

  std::vector<std::thread> workers;
  workers.reserve(static_cast<size_t>(num_threads));

  // Start all threads first and handle leftovers while threads run.
  for (int i = 0; i < num_threads; ++i) {
    uint8_t *chunk_dst = dst + prefix + i * chunk_size;
    const uint8_t *chunk_src = left + i * chunk_size;
    const size_t chunk_bytes = static_cast<size_t>(chunk_size);
    try {
      workers.emplace_back(
          [chunk_dst, chunk_src, chunk_bytes] {
            std::memcpy(chunk_dst, chunk_src, chunk_bytes);
          });
    } catch (const std::system_error &) {
      // The thread could not be started. Copy this chunk inline rather than
      // unwinding past already-running (joinable) threads, which would call
      // std::terminate.
      std::memcpy(chunk_dst, chunk_src, chunk_bytes);
    }
  }

  std::memcpy(dst, src, static_cast<size_t>(prefix));
  std::memcpy(dst + prefix + parallel_bytes, src + prefix + parallel_bytes,
              static_cast<size_t>(suffix));

  for (auto &worker : workers) {
    worker.join();
  }
}

}  // namespace ray