diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index 6e453ec2f..0a6f5d86b 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -3,8 +3,8 @@ namespace ray { ObjectBufferPool::ObjectBufferPool(const std::string &store_socket_name, - uint64_t chunk_size, int max_chunks, int release_delay) - : default_chunk_size_(chunk_size), max_chunks_(static_cast(max_chunks)) { + uint64_t chunk_size, int release_delay) + : default_chunk_size_(chunk_size) { store_socket_name_ = store_socket_name; ARROW_CHECK_OK(store_client_.Connect(store_socket_name_.c_str(), "", release_delay)); } @@ -24,24 +24,14 @@ ObjectBufferPool::~ObjectBufferPool() { ARROW_CHECK_OK(store_client_.Disconnect()); } -uint64_t ObjectBufferPool::GetChunkSize(uint64_t data_size) { - // If the number of chunks generated by the default chunk size exceeds the number of - // send threads, then use a chunk size such that the number of chunks is exactly - // the number of send threads. - if (data_size / default_chunk_size_ >= max_chunks_) { - return (data_size + default_chunk_size_ - 1) / max_chunks_; - } - return default_chunk_size_; -} - uint64_t ObjectBufferPool::GetNumChunks(uint64_t data_size) { - uint64_t chunk_size = GetChunkSize(data_size); - return (data_size + chunk_size - 1) / chunk_size; + return (data_size + default_chunk_size_ - 1) / default_chunk_size_; } uint64_t ObjectBufferPool::GetBufferLength(uint64_t chunk_index, uint64_t data_size) { - uint64_t chunk_size = GetChunkSize(data_size); - return (chunk_index + 1) * chunk_size > data_size ? data_size % chunk_size : chunk_size; + return (chunk_index + 1) * default_chunk_size_ > data_size + ? data_size % default_chunk_size_ + : default_chunk_size_; } std::pair ObjectBufferPool::GetChunk( @@ -179,18 +169,17 @@ void ObjectBufferPool::AbortCreate(const ObjectID &object_id) { std::vector ObjectBufferPool::BuildChunks( const ObjectID &object_id, uint8_t *data, uint64_t data_size) { - uint64_t chunk_size = GetChunkSize(data_size); uint64_t space_remaining = data_size; std::vector chunks; int64_t position = 0; while (space_remaining) { position = data_size - space_remaining; - if (space_remaining < chunk_size) { + if (space_remaining < default_chunk_size_) { chunks.emplace_back(chunks.size(), data + position, space_remaining); space_remaining = 0; } else { - chunks.emplace_back(chunks.size(), data + position, chunk_size); - space_remaining -= chunk_size; + chunks.emplace_back(chunks.size(), data + position, default_chunk_size_); + space_remaining -= default_chunk_size_; } } return chunks; diff --git a/src/ray/object_manager/object_buffer_pool.h b/src/ray/object_manager/object_buffer_pool.h index 2a1e02a39..3f15b2023 100644 --- a/src/ray/object_manager/object_buffer_pool.h +++ b/src/ray/object_manager/object_buffer_pool.h @@ -43,7 +43,7 @@ class ObjectBufferPool { /// \param release_delay The number of release calls before objects are released /// from the store client (FIFO). ObjectBufferPool(const std::string &store_socket_name, const uint64_t chunk_size, - int max_chunks, const int release_delay); + const int release_delay); ~ObjectBufferPool(); @@ -124,9 +124,6 @@ class ObjectBufferPool { void SealChunk(const ObjectID &object_id, uint64_t chunk_index); private: - /// Gets the chunk size based on data size. - uint64_t GetChunkSize(uint64_t data_size); - /// Abort the create operation associated with an object. This destroys the buffer /// state, including create operations in progress for all chunks of the object. void AbortCreate(const ObjectID &object_id); @@ -180,8 +177,6 @@ class ObjectBufferPool { std::mutex pool_mutex_; /// Determines the maximum chunk size to be transferred by a single thread. const uint64_t default_chunk_size_; - /// The maximum number of chunks allowed. - const uint64_t max_chunks_; /// The state of a buffer that's currently being used. std::unordered_map get_buffer_state_; /// The state of a buffer that's currently being used. diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 493461f49..413d25e46 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -17,7 +17,6 @@ ObjectManager::ObjectManager(asio::io_service &main_service, // release_delay of 2 * config_.max_sends is to ensure the pool does not release // an object prematurely whenever we reach the maximum number of sends. buffer_pool_(config_.store_socket_name, config_.object_chunk_size, - config_.max_sends, /*release_delay=*/2 * config_.max_sends), send_work_(send_service_), receive_work_(receive_service_), @@ -41,7 +40,6 @@ ObjectManager::ObjectManager(asio::io_service &main_service, // release_delay of 2 * config_.max_sends is to ensure the pool does not release // an object prematurely whenever we reach the maximum number of sends. buffer_pool_(config_.store_socket_name, config_.object_chunk_size, - config_.max_sends, /*release_delay=*/2 * config_.max_sends), send_work_(send_service_), receive_work_(receive_service_), diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc index 9e6e91bc0..76d9f2ae4 100644 --- a/src/ray/object_manager/test/object_manager_stress_test.cc +++ b/src/ray/object_manager/test/object_manager_stress_test.cc @@ -121,8 +121,10 @@ class TestObjectManagerBase : public ::testing::Test { store_id_2 = StartStore(UniqueID::from_random().hex()); uint pull_timeout_ms = 1; - int max_sends = 2; - int max_receives = 2; + int max_sends_a = 2; + int max_receives_a = 2; + int max_sends_b = 3; + int max_receives_b = 3; uint64_t object_chunk_size = static_cast(std::pow(10, 3)); int push_timeout_ms = 10000; @@ -131,8 +133,8 @@ class TestObjectManagerBase : public ::testing::Test { ObjectManagerConfig om_config_1; om_config_1.store_socket_name = store_id_1; om_config_1.pull_timeout_ms = pull_timeout_ms; - om_config_1.max_sends = max_sends; - om_config_1.max_receives = max_receives; + om_config_1.max_sends = max_sends_a; + om_config_1.max_receives = max_receives_a; om_config_1.object_chunk_size = object_chunk_size; om_config_1.push_timeout_ms = push_timeout_ms; server1.reset(new MockServer(main_service, om_config_1, gcs_client_1)); @@ -142,8 +144,8 @@ class TestObjectManagerBase : public ::testing::Test { ObjectManagerConfig om_config_2; om_config_2.store_socket_name = store_id_2; om_config_2.pull_timeout_ms = pull_timeout_ms; - om_config_2.max_sends = max_sends; - om_config_2.max_receives = max_receives; + om_config_2.max_sends = max_sends_b; + om_config_2.max_receives = max_receives_b; om_config_2.object_chunk_size = object_chunk_size; om_config_2.push_timeout_ms = push_timeout_ms; server2.reset(new MockServer(main_service, om_config_2, gcs_client_2)); diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc index c411c00ab..09e0263c0 100644 --- a/src/ray/object_manager/test/object_manager_test.cc +++ b/src/ray/object_manager/test/object_manager_test.cc @@ -437,16 +437,7 @@ class TestObjectManager : public TestObjectManagerBase { })); } - void TestWaitComplete() { TestBufferPool(); } - - void TestBufferPool() { - // Ensure the number of chunks generated do not exceed the number of send threads. - for (uint64_t i = object_chunk_size / 2; i < 10 * object_chunk_size; ++i) { - uint64_t num_chunks = server1->object_manager_.buffer_pool_.GetNumChunks(i); - ASSERT_LE(num_chunks, max_sends); - } - main_service.stop(); - } + void TestWaitComplete() { main_service.stop(); } void TestConnections() { RAY_LOG(DEBUG) << "\n"