[GCS]Fix MGetValues Command to send is too large bug (#10877)

This commit is contained in:
fangfengbin
2020-09-19 12:22:20 +08:00
committed by GitHub
parent 6a227ae501
commit 890fa6704f
3 changed files with 65 additions and 45 deletions
+2 -2
View File
@@ -205,8 +205,8 @@ RAY_CONFIG(int, num_workers_per_process_cpp, 1)
/// Maximum number of ids in one batch to send to GCS to delete keys.
RAY_CONFIG(uint32_t, maximum_gcs_deletion_batch_size, 1000)
/// Maximum number of items in one batch to scan from GCS storage.
RAY_CONFIG(uint32_t, maximum_gcs_scan_batch_size, 1000)
/// Maximum number of items in one batch to scan/get/delete from GCS storage.
RAY_CONFIG(uint32_t, maximum_gcs_storage_operation_batch_size, 1000)
/// When getting objects from object store, print a warning every this number of attempts.
RAY_CONFIG(uint32_t, object_store_get_warn_per_num_attempts, 50)
+57 -40
View File
@@ -224,38 +224,53 @@ Status RedisStoreClient::DoPut(const std::string &key, const std::string &data,
Status RedisStoreClient::DeleteByKeys(const std::vector<std::string> &keys,
const StatusCallback &callback) {
// The `DEL` command for each shard.
auto del_commands_by_shards = GenCommandsByShards(redis_client_, "DEL", keys);
int total_count = 0;
auto del_commands_by_shards =
GenCommandsByShards(redis_client_, "DEL", keys, &total_count);
auto finished_count = std::make_shared<int>(0);
int size = del_commands_by_shards.size();
for (auto &item : del_commands_by_shards) {
auto delete_callback = [finished_count, size,
callback](const std::shared_ptr<CallbackReply> &reply) {
++(*finished_count);
if (*finished_count == size) {
if (callback) {
callback(Status::OK());
for (auto &command_list : del_commands_by_shards) {
for (auto &command : command_list.second) {
auto delete_callback = [finished_count, total_count,
callback](const std::shared_ptr<CallbackReply> &reply) {
++(*finished_count);
if (*finished_count == total_count) {
if (callback) {
callback(Status::OK());
}
}
}
};
RAY_CHECK_OK(item.first->RunArgvAsync(item.second, delete_callback));
};
RAY_CHECK_OK(command_list.first->RunArgvAsync(command, delete_callback));
}
}
return Status::OK();
}
std::unordered_map<RedisContext *, std::vector<std::string>>
std::unordered_map<RedisContext *, std::list<std::vector<std::string>>>
RedisStoreClient::GenCommandsByShards(const std::shared_ptr<RedisClient> &redis_client,
const std::string &command,
const std::vector<std::string> &keys) {
std::unordered_map<RedisContext *, std::vector<std::string>> commands_by_shards;
const std::vector<std::string> &keys, int *count) {
std::unordered_map<RedisContext *, std::list<std::vector<std::string>>>
commands_by_shards;
for (auto &key : keys) {
auto shard_context = redis_client->GetShardContext(key).get();
auto it = commands_by_shards.find(shard_context);
if (it == commands_by_shards.end()) {
commands_by_shards[shard_context].push_back(command);
commands_by_shards[shard_context].push_back(key);
auto key_vector = commands_by_shards[shard_context].emplace(
commands_by_shards[shard_context].begin(), std::vector<std::string>());
key_vector->push_back(command);
key_vector->push_back(key);
(*count)++;
} else {
it->second.push_back(key);
// If the last batch is full, add a new batch.
if (it->second.back().size() - 1 ==
RayConfig::instance().maximum_gcs_storage_operation_batch_size()) {
it->second.emplace_back(std::vector<std::string>());
it->second.back().push_back(command);
(*count)++;
}
it->second.back().push_back(key);
}
}
return commands_by_shards;
@@ -309,30 +324,32 @@ Status RedisStoreClient::MGetValues(
const std::vector<std::string> &keys,
const ItemCallback<std::unordered_map<std::string, std::string>> &callback) {
// The `MGET` command for each shard.
auto mget_commands_by_shards = GenCommandsByShards(redis_client, "MGET", keys);
int total_count = 0;
auto mget_commands_by_shards =
GenCommandsByShards(redis_client, "MGET", keys, &total_count);
auto finished_count = std::make_shared<int>(0);
int size = mget_commands_by_shards.size();
for (auto &item : mget_commands_by_shards) {
auto mget_keys = std::move(item.second);
auto mget_callback = [table_name, finished_count, size, mget_keys,
callback](const std::shared_ptr<CallbackReply> &reply) {
std::unordered_map<std::string, std::string> key_value_map;
if (!reply->IsNil()) {
auto value = reply->ReadAsStringArray();
// The 0 th element of mget_keys is "MGET", so we start from the 1 th element.
for (int index = 0; index < (int)value.size(); ++index) {
key_value_map[GetKeyFromRedisKey(mget_keys[index + 1], table_name)] =
value[index];
auto key_value_map = std::make_shared<std::unordered_map<std::string, std::string>>();
for (auto &command_list : mget_commands_by_shards) {
for (auto &command : command_list.second) {
auto mget_keys = std::move(command);
auto mget_callback = [table_name, finished_count, total_count, mget_keys, callback,
key_value_map](const std::shared_ptr<CallbackReply> &reply) {
if (!reply->IsNil()) {
auto value = reply->ReadAsStringArray();
// The 0 th element of mget_keys is "MGET", so we start from the 1 th element.
for (int index = 0; index < (int)value.size(); ++index) {
(*key_value_map)[GetKeyFromRedisKey(mget_keys[index + 1], table_name)] =
value[index];
}
}
}
++(*finished_count);
if (*finished_count == size) {
callback(key_value_map);
}
};
RAY_CHECK_OK(item.first->RunArgvAsync(mget_keys, mget_callback));
++(*finished_count);
if (*finished_count == total_count) {
callback(*key_value_map);
}
};
RAY_CHECK_OK(command_list.first->RunArgvAsync(mget_keys, mget_callback));
}
}
return Status::OK();
}
@@ -377,7 +394,7 @@ void RedisStoreClient::RedisScanner::Scan(std::string match_pattern,
return;
}
size_t batch_count = RayConfig::instance().maximum_gcs_scan_batch_size();
size_t batch_count = RayConfig::instance().maximum_gcs_storage_operation_batch_size();
for (const auto &item : shard_to_cursor_) {
++pending_request_count_;
@@ -112,9 +112,12 @@ class RedisStoreClient : public StoreClient {
Status DeleteByKeys(const std::vector<std::string> &keys,
const StatusCallback &callback);
static std::unordered_map<RedisContext *, std::vector<std::string>> GenCommandsByShards(
const std::shared_ptr<RedisClient> &redis_client, const std::string &command,
const std::vector<std::string> &keys);
/// The return value is a map, whose key is the shard and the value is a list of batch
/// operations.
static std::unordered_map<RedisContext *, std::list<std::vector<std::string>>>
GenCommandsByShards(const std::shared_ptr<RedisClient> &redis_client,
const std::string &command, const std::vector<std::string> &keys,
int *count);
/// The separator is used when building redis key.
static std::string table_separator_;