From ba28dddf6f59ba14a6966602c6587e190239bc91 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Tue, 3 Jul 2018 17:32:44 -0700 Subject: [PATCH] Make xray object table credis-managed and hence flushable. (#2338) * monitor.py: issue flushes to data shard * ResultTableAdd & ObjectTableAdd: add credis-managed versions * Fix return codes * Credis-manage xray object table & associated ray.table_append cmd * Fix incorrect return code from TableAppend_DoWrite() * Revert "ResultTableAdd & ObjectTableAdd: add credis-managed versions" This reverts commit 628c2ea190df4c861dda0c284fab7ca6faa1ea24. * Address comments * Lint: fix indent * Address comment --- python/ray/monitor.py | 28 ++-- src/common/redis_module/chain_module.h | 5 + src/common/redis_module/ray_redis_module.cc | 143 ++++++++++++-------- src/ray/gcs/client.cc | 2 +- src/ray/gcs/tables.cc | 49 +++++-- src/ray/gcs/tables.h | 7 + 6 files changed, 158 insertions(+), 76 deletions(-) diff --git a/python/ray/monitor.py b/python/ray/monitor.py index a4705314a..5bca2e402 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -111,14 +111,24 @@ class Monitor(object): self.issue_gcs_flushes = "RAY_USE_NEW_GCS" in os.environ self.gcs_flush_policy = None if self.issue_gcs_flushes: - # For now, we take the primary redis server to issue flushes, - # because task table entries are stored there under this flag. - try: - self.redis.execute_command("HEAD.FLUSH 0") - except redis.exceptions.ResponseError as e: - log.info("Turning off flushing due to exception: {}".format( - str(e))) + # Data is stored under the first data shard, so we issue flushes to + # that redis server. + addr_port = self.redis.lrange("RedisShards", 0, -1) + if len(addr_port) > 1: + log.warning("TODO: if launching > 1 redis shard, flushing " + "needs to touch shards in parallel.") self.issue_gcs_flushes = False + else: + addr_port = addr_port[0].split(b":") + self.redis_shard = redis.StrictRedis( + host=addr_port[0], port=addr_port[1]) + try: + self.redis_shard.execute_command("HEAD.FLUSH 0") + except redis.exceptions.ResponseError as e: + log.info( + "Turning off flushing due to exception: {}".format( + str(e))) + self.issue_gcs_flushes = False def subscribe(self, channel): """Subscribe to the given channel. @@ -562,11 +572,11 @@ class Monitor(object): return self.gcs_flush_policy = pickle.loads(serialized) - if not self.gcs_flush_policy.should_flush(self.redis): + if not self.gcs_flush_policy.should_flush(self.redis_shard): return max_entries_to_flush = self.gcs_flush_policy.num_entries_to_flush() - num_flushed = self.redis.execute_command( + num_flushed = self.redis_shard.execute_command( "HEAD.FLUSH {}".format(max_entries_to_flush)) log.info('num_flushed {}'.format(num_flushed)) diff --git a/src/common/redis_module/chain_module.h b/src/common/redis_module/chain_module.h index c26eabf4f..c713be2f2 100644 --- a/src/common/redis_module/chain_module.h +++ b/src/common/redis_module/chain_module.h @@ -50,6 +50,11 @@ class RedisChainModule { // Runs "node_func" on every node in the chain; after the tail node has run it // too, finalizes the mutation by running "tail_func". + // + // If node_func() returns non-zero, it is treated as an error and the entire + // update will terminate early, without running subsequent node_func() and the + // final tail_func(). + // // TODO(zongheng): currently only supports 1-node chain. int ChainReplicate(RedisModuleCtx *ctx, RedisModuleString **argv, diff --git a/src/common/redis_module/ray_redis_module.cc b/src/common/redis_module/ray_redis_module.cc index 3314665b5..28b9caf25 100644 --- a/src/common/redis_module/ray_redis_module.cc +++ b/src/common/redis_module/ray_redis_module.cc @@ -654,6 +654,75 @@ int ChainTableAdd_RedisCommand(RedisModuleCtx *ctx, } #endif +int TableAppend_DoWrite(RedisModuleCtx *ctx, + RedisModuleString **argv, + int argc, + RedisModuleString **mutated_key_str) { + if (argc < 5 || argc > 6) { + return RedisModule_WrongArity(ctx); + } + + RedisModuleString *prefix_str = argv[1]; + RedisModuleString *id = argv[3]; + RedisModuleString *data = argv[4]; + RedisModuleString *index_str = nullptr; + if (argc == 6) { + index_str = argv[5]; + } + + // Set the keys in the table. + RedisModuleKey *key = + OpenPrefixedKey(ctx, prefix_str, id, REDISMODULE_READ | REDISMODULE_WRITE, + mutated_key_str); + // Determine the index at which the data should be appended. If no index is + // requested, then is the current length of the log. + size_t index = RedisModule_ValueLength(key); + if (index_str != nullptr) { + // Parse the requested index. + long long requested_index; + RAY_CHECK(RedisModule_StringToLongLong(index_str, &requested_index) == + REDISMODULE_OK); + RAY_CHECK(requested_index >= 0); + index = static_cast(requested_index); + } + // Only perform the append if the requested index matches the current length + // of the log, or if no index was requested. + if (index == RedisModule_ValueLength(key)) { + // The requested index matches the current length of the log or no index + // was requested. Perform the append. + int flags = REDISMODULE_ZADD_NX; + RedisModule_ZsetAdd(key, index, data, &flags); + // Check that we actually add a new entry during the append. This is only + // necessary since we implement the log with a sorted set, so all entries + // must be unique, or else we will have gaps in the log. + RAY_CHECK(flags == REDISMODULE_ZADD_ADDED) << "Appended a duplicate entry"; + return REDISMODULE_OK; + } else { + // The requested index did not match the current length of the log. Return + // an error message as a string. + static const char *reply = "ERR entry exists"; + RedisModule_ReplyWithStringBuffer(ctx, reply, strlen(reply)); + return REDISMODULE_ERR; + } +} + +int TableAppend_DoPublish(RedisModuleCtx *ctx, + RedisModuleString **argv, + int /*argc*/) { + RedisModuleString *pubsub_channel_str = argv[2]; + RedisModuleString *id = argv[3]; + RedisModuleString *data = argv[4]; + // Publish a message on the requested pubsub channel if necessary. + TablePubsub pubsub_channel = ParseTablePubsub(pubsub_channel_str); + if (pubsub_channel != TablePubsub::NO_PUBLISH) { + // All other pubsub channels write the data back directly onto the + // channel. + return PublishTableAdd(ctx, pubsub_channel_str, id, data); + } else { + return RedisModule_ReplyWithSimpleString(ctx, "OK"); + } +} + /// Append an entry to the log stored at a key. Publishes a notification about /// the update to all subscribers, if a pubsub channel is provided. /// @@ -678,62 +747,25 @@ int TableAppend_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModule_AutoMemory(ctx); - - if (argc < 5 || argc > 6) { - return RedisModule_WrongArity(ctx); - } - - RedisModuleString *prefix_str = argv[1]; - RedisModuleString *pubsub_channel_str = argv[2]; - RedisModuleString *id = argv[3]; - RedisModuleString *data = argv[4]; - RedisModuleString *index_str = nullptr; - if (argc == 6) { - index_str = argv[5]; - } - - // Set the keys in the table. - RedisModuleKey *key = OpenPrefixedKey(ctx, prefix_str, id, - REDISMODULE_READ | REDISMODULE_WRITE); - // Determine the index at which the data should be appended. If no index is - // requested, then is the current length of the log. - size_t index = RedisModule_ValueLength(key); - if (index_str != nullptr) { - // Parse the requested index. - long long requested_index; - RAY_CHECK(RedisModule_StringToLongLong(index_str, &requested_index) == - REDISMODULE_OK); - RAY_CHECK(requested_index >= 0); - index = static_cast(requested_index); - } - // Only perform the append if the requested index matches the current length - // of the log, or if no index was requested. - if (index == RedisModule_ValueLength(key)) { - // The requested index matches the current length of the log or no index - // was requested. Perform the append. - int flags = REDISMODULE_ZADD_NX; - RedisModule_ZsetAdd(key, index, data, &flags); - // Check that we actually add a new entry during the append. This is only - // necessary since we implement the log with a sorted set, so all entries - // must be unique, or else we will have gaps in the log. - RAY_CHECK(flags == REDISMODULE_ZADD_ADDED) << "Appended a duplicate entry"; - // Publish a message on the requested pubsub channel if necessary. - TablePubsub pubsub_channel = ParseTablePubsub(pubsub_channel_str); - if (pubsub_channel != TablePubsub::NO_PUBLISH) { - // All other pubsub channels write the data back directly onto the - // channel. - return PublishTableAdd(ctx, pubsub_channel_str, id, data); - } else { - return RedisModule_ReplyWithSimpleString(ctx, "OK"); - } - } else { - // The requested index did not match the current length of the log. Return - // an error message as a string. - const char *reply = "ERR entry exists"; - return RedisModule_ReplyWithStringBuffer(ctx, reply, strlen(reply)); + const int status = TableAppend_DoWrite(ctx, argv, argc, + /*mutated_key_str=*/nullptr); + if (status) { + return status; } + return TableAppend_DoPublish(ctx, argv, argc); } +#if RAY_USE_NEW_GCS +int ChainTableAppend_RedisCommand(RedisModuleCtx *ctx, + RedisModuleString **argv, + int argc) { + RedisModule_AutoMemory(ctx); + return module.ChainReplicate(ctx, argv, argc, + /*node_func=*/TableAppend_DoWrite, + /*tail_func=*/TableAppend_DoPublish); +} +#endif + /// A helper function to create and finish a GcsTableEntry, based on the /// current value or values at the given key. void TableEntryToFlatbuf(RedisModuleKey *table_key, @@ -1833,6 +1865,11 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, 0, 0) == REDISMODULE_ERR) { return REDISMODULE_ERR; } + if (RedisModule_CreateCommand(ctx, "ray.chain.table_append", + ChainTableAppend_RedisCommand, "write pubsub", + 0, 0, 0) == REDISMODULE_ERR) { + return REDISMODULE_ERR; + } #endif return REDISMODULE_OK; diff --git a/src/ray/gcs/client.cc b/src/ray/gcs/client.cc index 3d06407c5..1700eac5f 100644 --- a/src/ray/gcs/client.cc +++ b/src/ray/gcs/client.cc @@ -10,7 +10,7 @@ AsyncGcsClient::AsyncGcsClient(const ClientID &client_id, CommandType command_ty context_ = std::make_shared(); primary_context_ = std::make_shared(); client_table_.reset(new ClientTable(primary_context_, this, client_id)); - object_table_.reset(new ObjectTable(context_, this)); + object_table_.reset(new ObjectTable(context_, this, command_type)); actor_table_.reset(new ActorTable(context_, this)); task_table_.reset(new TaskTable(context_, this, command_type)); raylet_task_table_.reset(new raylet::TaskTable(context_, this, command_type)); diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index fa2e9b7b2..a499a30aa 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -3,6 +3,34 @@ #include "common_protocol.h" #include "ray/gcs/client.h" +namespace { + +static const std::string kTableAppendCommand = "RAY.TABLE_APPEND"; +static const std::string kChainTableAppendCommand = "RAY.CHAIN.TABLE_APPEND"; + +static const std::string kTableAddCommand = "RAY.TABLE_ADD"; +static const std::string kChainTableAddCommand = "RAY.CHAIN.TABLE_ADD"; + +std::string GetLogAppendCommand(const ray::gcs::CommandType command_type) { + if (command_type == ray::gcs::CommandType::kRegular) { + return kTableAppendCommand; + } else { + RAY_CHECK(command_type == ray::gcs::CommandType::kChain); + return kChainTableAppendCommand; + } +} + +std::string GetTableAddCommand(const ray::gcs::CommandType command_type) { + if (command_type == ray::gcs::CommandType::kRegular) { + return kTableAddCommand; + } else { + RAY_CHECK(command_type == ray::gcs::CommandType::kChain); + return kChainTableAddCommand; + } +} + +} // namespace + namespace ray { namespace gcs { @@ -19,8 +47,9 @@ Status Log::Append(const JobID &job_id, const ID &id, flatbuffers::FlatBufferBuilder fbb; fbb.ForceDefaults(true); fbb.Finish(Data::Pack(fbb, dataT.get())); - return context_->RunAsync("RAY.TABLE_APPEND", id, fbb.GetBufferPointer(), fbb.GetSize(), - prefix_, pubsub_channel_, std::move(callback)); + return context_->RunAsync(GetLogAppendCommand(command_type_), id, + fbb.GetBufferPointer(), fbb.GetSize(), prefix_, + pubsub_channel_, std::move(callback)); } template @@ -42,8 +71,9 @@ Status Log::AppendAt(const JobID &job_id, const ID &id, flatbuffers::FlatBufferBuilder fbb; fbb.ForceDefaults(true); fbb.Finish(Data::Pack(fbb, dataT.get())); - return context_->RunAsync("RAY.TABLE_APPEND", id, fbb.GetBufferPointer(), fbb.GetSize(), - prefix_, pubsub_channel_, std::move(callback), log_length); + return context_->RunAsync(GetLogAppendCommand(command_type_), id, + fbb.GetBufferPointer(), fbb.GetSize(), prefix_, + pubsub_channel_, std::move(callback), log_length); } template @@ -140,15 +170,8 @@ Status Table::Add(const JobID &job_id, const ID &id, flatbuffers::FlatBufferBuilder fbb; fbb.ForceDefaults(true); fbb.Finish(Data::Pack(fbb, dataT.get())); - if (command_type_ == CommandType::kRegular) { - return context_->RunAsync("RAY.TABLE_ADD", id, fbb.GetBufferPointer(), fbb.GetSize(), - prefix_, pubsub_channel_, std::move(callback)); - } else { - RAY_CHECK(command_type_ == CommandType::kChain); - return context_->RunAsync("RAY.CHAIN.TABLE_ADD", id, fbb.GetBufferPointer(), - fbb.GetSize(), prefix_, pubsub_channel_, - std::move(callback)); - } + return context_->RunAsync(GetTableAddCommand(command_type_), id, fbb.GetBufferPointer(), + fbb.GetSize(), prefix_, pubsub_channel_, std::move(callback)); } template diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index d4b0c540d..e2e719de0 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -276,6 +276,13 @@ class ObjectTable : public Log { pubsub_channel_ = TablePubsub::OBJECT; prefix_ = TablePrefix::OBJECT; }; + + ObjectTable(const std::shared_ptr &context, AsyncGcsClient *client, + gcs::CommandType command_type) + : ObjectTable(context, client) { + command_type_ = command_type; + }; + virtual ~ObjectTable(){}; };