[xray] Revert dynamic chunk size optimization for ObjectManager. (#2557)

* Revert dynamic chunk size optimization.

* fix mac build issues.
This commit is contained in:
Melih Elibol
2018-08-05 05:09:37 -04:00
committed by Philipp Moritz
parent 914a433e3f
commit 34d3a46f48
5 changed files with 19 additions and 44 deletions
+9 -20
View File
@@ -3,8 +3,8 @@
namespace ray {
ObjectBufferPool::ObjectBufferPool(const std::string &store_socket_name,
uint64_t chunk_size, int max_chunks, int release_delay)
: default_chunk_size_(chunk_size), max_chunks_(static_cast<uint64_t>(max_chunks)) {
uint64_t chunk_size, int release_delay)
: default_chunk_size_(chunk_size) {
store_socket_name_ = store_socket_name;
ARROW_CHECK_OK(store_client_.Connect(store_socket_name_.c_str(), "", release_delay));
}
@@ -24,24 +24,14 @@ ObjectBufferPool::~ObjectBufferPool() {
ARROW_CHECK_OK(store_client_.Disconnect());
}
uint64_t ObjectBufferPool::GetChunkSize(uint64_t data_size) {
// If the number of chunks generated by the default chunk size exceeds the number of
// send threads, then use a chunk size such that the number of chunks is exactly
// the number of send threads.
if (data_size / default_chunk_size_ >= max_chunks_) {
return (data_size + default_chunk_size_ - 1) / max_chunks_;
}
return default_chunk_size_;
}
uint64_t ObjectBufferPool::GetNumChunks(uint64_t data_size) {
uint64_t chunk_size = GetChunkSize(data_size);
return (data_size + chunk_size - 1) / chunk_size;
return (data_size + default_chunk_size_ - 1) / default_chunk_size_;
}
uint64_t ObjectBufferPool::GetBufferLength(uint64_t chunk_index, uint64_t data_size) {
uint64_t chunk_size = GetChunkSize(data_size);
return (chunk_index + 1) * chunk_size > data_size ? data_size % chunk_size : chunk_size;
return (chunk_index + 1) * default_chunk_size_ > data_size
? data_size % default_chunk_size_
: default_chunk_size_;
}
std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> ObjectBufferPool::GetChunk(
@@ -179,18 +169,17 @@ void ObjectBufferPool::AbortCreate(const ObjectID &object_id) {
std::vector<ObjectBufferPool::ChunkInfo> ObjectBufferPool::BuildChunks(
const ObjectID &object_id, uint8_t *data, uint64_t data_size) {
uint64_t chunk_size = GetChunkSize(data_size);
uint64_t space_remaining = data_size;
std::vector<ChunkInfo> chunks;
int64_t position = 0;
while (space_remaining) {
position = data_size - space_remaining;
if (space_remaining < chunk_size) {
if (space_remaining < default_chunk_size_) {
chunks.emplace_back(chunks.size(), data + position, space_remaining);
space_remaining = 0;
} else {
chunks.emplace_back(chunks.size(), data + position, chunk_size);
space_remaining -= chunk_size;
chunks.emplace_back(chunks.size(), data + position, default_chunk_size_);
space_remaining -= default_chunk_size_;
}
}
return chunks;
+1 -6
View File
@@ -43,7 +43,7 @@ class ObjectBufferPool {
/// \param release_delay The number of release calls before objects are released
/// from the store client (FIFO).
ObjectBufferPool(const std::string &store_socket_name, const uint64_t chunk_size,
int max_chunks, const int release_delay);
const int release_delay);
~ObjectBufferPool();
@@ -124,9 +124,6 @@ class ObjectBufferPool {
void SealChunk(const ObjectID &object_id, uint64_t chunk_index);
private:
/// Gets the chunk size based on data size.
uint64_t GetChunkSize(uint64_t data_size);
/// Abort the create operation associated with an object. This destroys the buffer
/// state, including create operations in progress for all chunks of the object.
void AbortCreate(const ObjectID &object_id);
@@ -180,8 +177,6 @@ class ObjectBufferPool {
std::mutex pool_mutex_;
/// Determines the maximum chunk size to be transferred by a single thread.
const uint64_t default_chunk_size_;
/// The maximum number of chunks allowed.
const uint64_t max_chunks_;
/// The state of a buffer that's currently being used.
std::unordered_map<ray::ObjectID, GetBufferState> get_buffer_state_;
/// The state of a buffer that's currently being used.
-2
View File
@@ -17,7 +17,6 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
// release_delay of 2 * config_.max_sends is to ensure the pool does not release
// an object prematurely whenever we reach the maximum number of sends.
buffer_pool_(config_.store_socket_name, config_.object_chunk_size,
config_.max_sends,
/*release_delay=*/2 * config_.max_sends),
send_work_(send_service_),
receive_work_(receive_service_),
@@ -41,7 +40,6 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
// release_delay of 2 * config_.max_sends is to ensure the pool does not release
// an object prematurely whenever we reach the maximum number of sends.
buffer_pool_(config_.store_socket_name, config_.object_chunk_size,
config_.max_sends,
/*release_delay=*/2 * config_.max_sends),
send_work_(send_service_),
receive_work_(receive_service_),
@@ -121,8 +121,10 @@ class TestObjectManagerBase : public ::testing::Test {
store_id_2 = StartStore(UniqueID::from_random().hex());
uint pull_timeout_ms = 1;
int max_sends = 2;
int max_receives = 2;
int max_sends_a = 2;
int max_receives_a = 2;
int max_sends_b = 3;
int max_receives_b = 3;
uint64_t object_chunk_size = static_cast<uint64_t>(std::pow(10, 3));
int push_timeout_ms = 10000;
@@ -131,8 +133,8 @@ class TestObjectManagerBase : public ::testing::Test {
ObjectManagerConfig om_config_1;
om_config_1.store_socket_name = store_id_1;
om_config_1.pull_timeout_ms = pull_timeout_ms;
om_config_1.max_sends = max_sends;
om_config_1.max_receives = max_receives;
om_config_1.max_sends = max_sends_a;
om_config_1.max_receives = max_receives_a;
om_config_1.object_chunk_size = object_chunk_size;
om_config_1.push_timeout_ms = push_timeout_ms;
server1.reset(new MockServer(main_service, om_config_1, gcs_client_1));
@@ -142,8 +144,8 @@ class TestObjectManagerBase : public ::testing::Test {
ObjectManagerConfig om_config_2;
om_config_2.store_socket_name = store_id_2;
om_config_2.pull_timeout_ms = pull_timeout_ms;
om_config_2.max_sends = max_sends;
om_config_2.max_receives = max_receives;
om_config_2.max_sends = max_sends_b;
om_config_2.max_receives = max_receives_b;
om_config_2.object_chunk_size = object_chunk_size;
om_config_2.push_timeout_ms = push_timeout_ms;
server2.reset(new MockServer(main_service, om_config_2, gcs_client_2));
@@ -437,16 +437,7 @@ class TestObjectManager : public TestObjectManagerBase {
}));
}
void TestWaitComplete() { TestBufferPool(); }
void TestBufferPool() {
// Ensure the number of chunks generated do not exceed the number of send threads.
for (uint64_t i = object_chunk_size / 2; i < 10 * object_chunk_size; ++i) {
uint64_t num_chunks = server1->object_manager_.buffer_pool_.GetNumChunks(i);
ASSERT_LE(num_chunks, max_sends);
}
main_service.stop();
}
void TestWaitComplete() { main_service.stop(); }
void TestConnections() {
RAY_LOG(DEBUG) << "\n"