diff --git a/src/ray/gcs/redis_context.cc b/src/ray/gcs/redis_context.cc index 42c295e12..c8825e266 100644 --- a/src/ray/gcs/redis_context.cc +++ b/src/ray/gcs/redis_context.cc @@ -303,11 +303,14 @@ Status RedisContext::RunAsync(const std::string &command, const UniqueID &id, Status RedisContext::SubscribeAsync(const ClientID &client_id, const TablePubsub pubsub_channel, - const RedisCallback &redisCallback) { + const RedisCallback &redisCallback, + int64_t *out_callback_index) { RAY_CHECK(pubsub_channel != TablePubsub::NO_PUBLISH) << "Client requested subscribe on a table that does not support pubsub"; int64_t callback_index = RedisCallbackManager::instance().add(redisCallback); + RAY_CHECK(out_callback_index != nullptr); + *out_callback_index = callback_index; int status = 0; if (client_id.is_nil()) { // Subscribe to all messages. diff --git a/src/ray/gcs/redis_context.h b/src/ray/gcs/redis_context.h index 86460a004..a9f988afe 100644 --- a/src/ray/gcs/redis_context.h +++ b/src/ray/gcs/redis_context.h @@ -57,7 +57,7 @@ class RedisContext { /// Run an operation on some table key. /// /// \param command The command to run. This must match a registered Ray Redis - /// command. These are strings of the format "RAY.TABLE_*". + /// command. These are strings of the format "RAY.TABLE_*". /// \param id The table key to run the operation at. /// \param data The data to add to the table key, if any. /// \param length The length of the data to be added, if data is provided. @@ -65,15 +65,23 @@ class RedisContext { /// \param pubsub_channel /// \param redisCallback The Redis callback function. /// \param log_length The RAY.TABLE_APPEND command takes in an optional index - /// at which the data must be appended. For all other commands, set to - /// -1 for unused. If set, then data must be provided. + /// at which the data must be appended. For all other commands, set to + /// -1 for unused. If set, then data must be provided. + /// \return Status. Status RunAsync(const std::string &command, const UniqueID &id, const uint8_t *data, int64_t length, const TablePrefix prefix, const TablePubsub pubsub_channel, RedisCallback redisCallback, int log_length = -1); + /// Subscribe to a specific Pub-Sub channel. + /// + /// \param client_id The client ID that subscribe this message. + /// \param pubsub_channel The Pub-Sub channel to subscribe to. + /// \param redisCallback The callback function that the notification calls. + /// \param out_callback_index The output pointer to callback index. + /// \return Status. Status SubscribeAsync(const ClientID &client_id, const TablePubsub pubsub_channel, - const RedisCallback &redisCallback); + const RedisCallback &redisCallback, int64_t *out_callback_index); redisAsyncContext *async_context() { return async_context_; } redisAsyncContext *subscribe_context() { return subscribe_context_; }; diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index 54f30aa28..8b21d9e59 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -136,8 +136,8 @@ Status Log::Subscribe(const JobID &job_id, const ClientID &client_id, // more subscription messages. return false; }; - subscribe_callback_index_ = 1; - return context_->SubscribeAsync(client_id, pubsub_channel_, std::move(callback)); + return context_->SubscribeAsync(client_id, pubsub_channel_, std::move(callback), + &subscribe_callback_index_); } template