From 890fa6704ff1e705e76de8dbfe46d8f55fe0dcf5 Mon Sep 17 00:00:00 2001 From: fangfengbin <869218239a@zju.edu.cn> Date: Sat, 19 Sep 2020 12:22:20 +0800 Subject: [PATCH] [GCS]Fix MGetValues Command to send is too large bug (#10877) --- src/ray/common/ray_config_def.h | 4 +- .../gcs/store_client/redis_store_client.cc | 97 +++++++++++-------- src/ray/gcs/store_client/redis_store_client.h | 9 +- 3 files changed, 65 insertions(+), 45 deletions(-) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 73e9e0ded..f7281ba04 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -205,8 +205,8 @@ RAY_CONFIG(int, num_workers_per_process_cpp, 1) /// Maximum number of ids in one batch to send to GCS to delete keys. RAY_CONFIG(uint32_t, maximum_gcs_deletion_batch_size, 1000) -/// Maximum number of items in one batch to scan from GCS storage. -RAY_CONFIG(uint32_t, maximum_gcs_scan_batch_size, 1000) +/// Maximum number of items in one batch to scan/get/delete from GCS storage. +RAY_CONFIG(uint32_t, maximum_gcs_storage_operation_batch_size, 1000) /// When getting objects from object store, print a warning every this number of attempts. RAY_CONFIG(uint32_t, object_store_get_warn_per_num_attempts, 50) diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc index 8f971e124..26a8776d5 100644 --- a/src/ray/gcs/store_client/redis_store_client.cc +++ b/src/ray/gcs/store_client/redis_store_client.cc @@ -224,38 +224,53 @@ Status RedisStoreClient::DoPut(const std::string &key, const std::string &data, Status RedisStoreClient::DeleteByKeys(const std::vector &keys, const StatusCallback &callback) { // The `DEL` command for each shard. - auto del_commands_by_shards = GenCommandsByShards(redis_client_, "DEL", keys); + int total_count = 0; + auto del_commands_by_shards = + GenCommandsByShards(redis_client_, "DEL", keys, &total_count); auto finished_count = std::make_shared(0); - int size = del_commands_by_shards.size(); - for (auto &item : del_commands_by_shards) { - auto delete_callback = [finished_count, size, - callback](const std::shared_ptr &reply) { - ++(*finished_count); - if (*finished_count == size) { - if (callback) { - callback(Status::OK()); + + for (auto &command_list : del_commands_by_shards) { + for (auto &command : command_list.second) { + auto delete_callback = [finished_count, total_count, + callback](const std::shared_ptr &reply) { + ++(*finished_count); + if (*finished_count == total_count) { + if (callback) { + callback(Status::OK()); + } } - } - }; - RAY_CHECK_OK(item.first->RunArgvAsync(item.second, delete_callback)); + }; + RAY_CHECK_OK(command_list.first->RunArgvAsync(command, delete_callback)); + } } return Status::OK(); } -std::unordered_map> +std::unordered_map>> RedisStoreClient::GenCommandsByShards(const std::shared_ptr &redis_client, const std::string &command, - const std::vector &keys) { - std::unordered_map> commands_by_shards; + const std::vector &keys, int *count) { + std::unordered_map>> + commands_by_shards; for (auto &key : keys) { auto shard_context = redis_client->GetShardContext(key).get(); auto it = commands_by_shards.find(shard_context); if (it == commands_by_shards.end()) { - commands_by_shards[shard_context].push_back(command); - commands_by_shards[shard_context].push_back(key); + auto key_vector = commands_by_shards[shard_context].emplace( + commands_by_shards[shard_context].begin(), std::vector()); + key_vector->push_back(command); + key_vector->push_back(key); + (*count)++; } else { - it->second.push_back(key); + // If the last batch is full, add a new batch. + if (it->second.back().size() - 1 == + RayConfig::instance().maximum_gcs_storage_operation_batch_size()) { + it->second.emplace_back(std::vector()); + it->second.back().push_back(command); + (*count)++; + } + it->second.back().push_back(key); } } return commands_by_shards; @@ -309,30 +324,32 @@ Status RedisStoreClient::MGetValues( const std::vector &keys, const ItemCallback> &callback) { // The `MGET` command for each shard. - auto mget_commands_by_shards = GenCommandsByShards(redis_client, "MGET", keys); - + int total_count = 0; + auto mget_commands_by_shards = + GenCommandsByShards(redis_client, "MGET", keys, &total_count); auto finished_count = std::make_shared(0); - int size = mget_commands_by_shards.size(); - for (auto &item : mget_commands_by_shards) { - auto mget_keys = std::move(item.second); - auto mget_callback = [table_name, finished_count, size, mget_keys, - callback](const std::shared_ptr &reply) { - std::unordered_map key_value_map; - if (!reply->IsNil()) { - auto value = reply->ReadAsStringArray(); - // The 0 th element of mget_keys is "MGET", so we start from the 1 th element. - for (int index = 0; index < (int)value.size(); ++index) { - key_value_map[GetKeyFromRedisKey(mget_keys[index + 1], table_name)] = - value[index]; + auto key_value_map = std::make_shared>(); + for (auto &command_list : mget_commands_by_shards) { + for (auto &command : command_list.second) { + auto mget_keys = std::move(command); + auto mget_callback = [table_name, finished_count, total_count, mget_keys, callback, + key_value_map](const std::shared_ptr &reply) { + if (!reply->IsNil()) { + auto value = reply->ReadAsStringArray(); + // The 0 th element of mget_keys is "MGET", so we start from the 1 th element. + for (int index = 0; index < (int)value.size(); ++index) { + (*key_value_map)[GetKeyFromRedisKey(mget_keys[index + 1], table_name)] = + value[index]; + } } - } - ++(*finished_count); - if (*finished_count == size) { - callback(key_value_map); - } - }; - RAY_CHECK_OK(item.first->RunArgvAsync(mget_keys, mget_callback)); + ++(*finished_count); + if (*finished_count == total_count) { + callback(*key_value_map); + } + }; + RAY_CHECK_OK(command_list.first->RunArgvAsync(mget_keys, mget_callback)); + } } return Status::OK(); } @@ -377,7 +394,7 @@ void RedisStoreClient::RedisScanner::Scan(std::string match_pattern, return; } - size_t batch_count = RayConfig::instance().maximum_gcs_scan_batch_size(); + size_t batch_count = RayConfig::instance().maximum_gcs_storage_operation_batch_size(); for (const auto &item : shard_to_cursor_) { ++pending_request_count_; diff --git a/src/ray/gcs/store_client/redis_store_client.h b/src/ray/gcs/store_client/redis_store_client.h index 14d77b1b6..2fb673d99 100644 --- a/src/ray/gcs/store_client/redis_store_client.h +++ b/src/ray/gcs/store_client/redis_store_client.h @@ -112,9 +112,12 @@ class RedisStoreClient : public StoreClient { Status DeleteByKeys(const std::vector &keys, const StatusCallback &callback); - static std::unordered_map> GenCommandsByShards( - const std::shared_ptr &redis_client, const std::string &command, - const std::vector &keys); + /// The return value is a map, whose key is the shard and the value is a list of batch + /// operations. + static std::unordered_map>> + GenCommandsByShards(const std::shared_ptr &redis_client, + const std::string &command, const std::vector &keys, + int *count); /// The separator is used when building redis key. static std::string table_separator_;