From c50f1966e0ef65b8ee1a47a6b24fb3c8dcd2b4b0 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Thu, 5 Jul 2018 13:39:07 -0700 Subject: [PATCH] Publish a notification for empty keys in the GCS (#2347) * Publish an empty notification for empty keys * Add failure callback to Table::Subscribe, add unit test for new behavior --- src/common/redis_module/ray_redis_module.cc | 24 +++++++---- src/ray/gcs/client_test.cc | 46 ++++++++++++++------- src/ray/gcs/tables.cc | 14 +++++-- src/ray/gcs/tables.h | 20 ++++++++- src/ray/raylet/monitor.cc | 4 +- src/ray/raylet/node_manager.cc | 5 ++- 6 files changed, 82 insertions(+), 31 deletions(-) diff --git a/src/common/redis_module/ray_redis_module.cc b/src/common/redis_module/ray_redis_module.cc index e7edb1c0a..d594f74ef 100644 --- a/src/common/redis_module/ray_redis_module.cc +++ b/src/common/redis_module/ray_redis_module.cc @@ -800,6 +800,13 @@ void TableEntryToFlatbuf(RedisModuleKey *table_key, fbb.CreateVector(data)); fbb.Finish(message); } break; + case REDISMODULE_KEYTYPE_EMPTY: { + auto message = CreateGcsTableEntry( + fbb, RedisStringToFlatbuf(fbb, entry_id), + fbb.CreateVector( + std::vector>())); + fbb.Finish(message); + } break; default: RAY_LOG(FATAL) << "Invalid Redis type during lookup: " << key_type; } @@ -889,15 +896,14 @@ int TableRequestNotifications_RedisCommand(RedisModuleCtx *ctx, // Lookup the current value at the key. RedisModuleKey *table_key = OpenPrefixedKey(ctx, prefix_str, id, REDISMODULE_READ); - if (table_key != nullptr) { - // Publish the current value at the key to the client that is requesting - // notifications. - flatbuffers::FlatBufferBuilder fbb; - TableEntryToFlatbuf(table_key, id, fbb); - RedisModule_Call(ctx, "PUBLISH", "sb", client_channel, - reinterpret_cast(fbb.GetBufferPointer()), - fbb.GetSize()); - } + // Publish the current value at the key to the client that is requesting + // notifications. An empty notification will be published if the key is + // empty. 
+ flatbuffers::FlatBufferBuilder fbb; + TableEntryToFlatbuf(table_key, id, fbb); + RedisModule_Call(ctx, "PUBLISH", "sb", client_channel, + reinterpret_cast(fbb.GetBufferPointer()), + fbb.GetSize()); return RedisModule_ReplyWithNull(ctx); } diff --git a/src/ray/gcs/client_test.cc b/src/ray/gcs/client_test.cc index 05c3acf5e..715edc797 100644 --- a/src/ray/gcs/client_test.cc +++ b/src/ray/gcs/client_test.cc @@ -388,6 +388,12 @@ void TestTableSubscribeAll(const JobID &job_id, } }; + // The failure callback should not be called if we are subscribing to + // notifications for all keys. + auto failure_callback = [](gcs::AsyncGcsClient *client, const UniqueID &id) { + RAY_CHECK(false); + }; + // Callback for subscription success. We are guaranteed to receive // notifications after this is called. auto subscribe_callback = [job_id, task_id, task_specs](gcs::AsyncGcsClient *client) { @@ -403,7 +409,8 @@ void TestTableSubscribeAll(const JobID &job_id, // subscribed, we will write the key several times and check that we get // notified for each. RAY_CHECK_OK(client->raylet_task_table().Subscribe( - job_id, ClientID::nil(), notification_callback, subscribe_callback)); + job_id, ClientID::nil(), notification_callback, failure_callback, + subscribe_callback)); // Run the event loop. The loop will only stop if the registered subscription // callback is called (or an assertion failure). test->Start(); @@ -451,7 +458,7 @@ void TestLogSubscribeAll(const JobID &job_id, } }; - // Subscribe to all task table notifications. Once we have successfully + // Subscribe to all object table notifications. Once we have successfully // subscribed, we will append to the key several times and check that we get // notified for each. RAY_CHECK_OK(client->object_table().Subscribe( @@ -479,16 +486,10 @@ void TestTableSubscribeId(const JobID &job_id, // Add a table entry. 
TaskID task_id1 = TaskID::from_random(); std::vector task_specs1 = {"abc", "def", "ghi"}; - auto data1 = std::make_shared(); - data1->task_specification = task_specs1[0]; - RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id1, data1, nullptr)); // Add a table entry at a second key. TaskID task_id2 = TaskID::from_random(); std::vector task_specs2 = {"jkl", "mno", "pqr"}; - auto data2 = std::make_shared(); - data2->task_specification = task_specs2[0]; - RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id2, data2, nullptr)); // The callback for a notification from the table. This should only be // received for keys that we requested notifications for. @@ -504,6 +505,16 @@ void TestTableSubscribeId(const JobID &job_id, } }; + // The failure callback should be called once since both keys start as empty. + bool failure_notification_received = false; + auto failure_callback = [task_id2, &failure_notification_received]( + gcs::AsyncGcsClient *client, const UniqueID &id) { + ASSERT_EQ(id, task_id2); + // The failure notification should be the first notification received. + ASSERT_EQ(test->NumCallbacks(), 0); + failure_notification_received = true; + }; + // The callback for subscription success. Once we've subscribed, request // notifications for only one of the keys, then write to both keys. auto subscribe_callback = [job_id, task_id1, task_id2, task_specs1, @@ -513,14 +524,12 @@ void TestTableSubscribeId(const JobID &job_id, job_id, task_id2, client->client_table().GetLocalClientId())); // Write both keys. We should only receive notifications for the key that // we requested them for. 
- auto remaining = std::vector(++task_specs1.begin(), task_specs1.end()); - for (const auto &task_spec : remaining) { + for (const auto &task_spec : task_specs1) { auto data = std::make_shared(); data->task_specification = task_spec; RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id1, data, nullptr)); } - remaining = std::vector(++task_specs2.begin(), task_specs2.end()); - for (const auto &task_spec : remaining) { + for (const auto &task_spec : task_specs2) { auto data = std::make_shared(); data->task_specification = task_spec; RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id2, data, nullptr)); @@ -531,10 +540,13 @@ void TestTableSubscribeId(const JobID &job_id, // receive notifications for specific keys. RAY_CHECK_OK(client->raylet_task_table().Subscribe( job_id, client->client_table().GetLocalClientId(), notification_callback, - subscribe_callback)); + failure_callback, subscribe_callback)); // Run the event loop. The loop will only stop if the registered subscription // callback is called for the requested key. test->Start(); + // Check that the failure callback was called since the key was initially + // empty. + ASSERT_TRUE(failure_notification_received); // Check that we received one notification callback for each write to the // requested key. ASSERT_EQ(test->NumCallbacks(), task_specs2.size()); @@ -635,6 +647,12 @@ void TestTableSubscribeCancel(const JobID &job_id, data->task_specification = task_specs[0]; RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, nullptr)); + // The failure callback should not be called since all keys are non-empty + // when notifications are requested. + auto failure_callback = [](gcs::AsyncGcsClient *client, const UniqueID &id) { + RAY_CHECK(false); + }; + // The callback for a notification from the table. This should only be // received for keys that we requested notifications for. 
auto notification_callback = [task_id, task_specs]( @@ -680,7 +698,7 @@ void TestTableSubscribeCancel(const JobID &job_id, // receive notifications for specific keys. RAY_CHECK_OK(client->raylet_task_table().Subscribe( job_id, client->client_table().GetLocalClientId(), notification_callback, - subscribe_callback)); + failure_callback, subscribe_callback)); // Run the event loop. The loop will only stop if the registered subscription // callback is called for the requested key. test->Start(); diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index d5ed28699..c570f6930 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -196,12 +196,20 @@ Status Table::Lookup(const JobID &job_id, const ID &id, const Callback template Status Table::Subscribe(const JobID &job_id, const ClientID &client_id, const Callback &subscribe, + const FailureCallback &failure, const SubscriptionCallback &done) { return Log::Subscribe( job_id, client_id, - [subscribe](AsyncGcsClient *client, const ID &id, const std::vector &data) { - RAY_CHECK(data.size() == 1); - subscribe(client, id, data[0]); + [subscribe, failure](AsyncGcsClient *client, const ID &id, + const std::vector &data) { + RAY_CHECK(data.empty() || data.size() == 1); + if (data.size() == 1) { + subscribe(client, id, data[0]); + } else { + if (failure != nullptr) { + failure(client, id); + } + } }, done); } diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index b10efe40b..fc2d9a194 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -260,8 +260,26 @@ class Table : private Log, Status Lookup(const JobID &job_id, const ID &id, const Callback &lookup, const FailureCallback &failure); + /// Subscribe to any Add operations to this table. The caller may choose to + /// subscribe to all Adds, or to subscribe only to keys that it requests + /// notifications for. This may only be called once per Table instance. + /// + /// \param job_id The ID of the job (= driver). 
+ /// \param client_id The client to receive messages for. If this is nil, then a + /// message for each Add to the table will be received. Else, only + /// messages for the given client will be received. In the latter + /// case, the client may request notifications on specific keys in the + /// table via `RequestNotifications`. + /// \param subscribe Callback that is called on each received message that + /// contains an entry. It is not called when there was no data at the key. + /// \param failure Callback that is called if the key is empty at the time + /// that notifications are requested. + /// \param done Callback that is called when subscription is complete and we + /// are ready to receive messages. + /// \return Status Status Subscribe(const JobID &job_id, const ClientID &client_id, - const Callback &subscribe, const SubscriptionCallback &done); + const Callback &subscribe, const FailureCallback &failure, + const SubscriptionCallback &done); protected: using Log::context_; diff --git a/src/ray/raylet/monitor.cc b/src/ray/raylet/monitor.cc index 52e45b1d7..dec692a75 100644 --- a/src/ray/raylet/monitor.cc +++ b/src/ray/raylet/monitor.cc @@ -31,8 +31,8 @@ void Monitor::Start() { const HeartbeatTableDataT &heartbeat_data) { HandleHeartbeat(id); }; - RAY_CHECK_OK(gcs_client_.heartbeat_table().Subscribe(UniqueID::nil(), UniqueID::nil(), - heartbeat_callback, nullptr)); + RAY_CHECK_OK(gcs_client_.heartbeat_table().Subscribe( + UniqueID::nil(), UniqueID::nil(), heartbeat_callback, nullptr, nullptr)); Tick(); } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index d82b9553a..b9eb500a5 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -126,7 +126,7 @@ ray::Status NodeManager::RegisterGcs() { }; RAY_RETURN_NOT_OK(gcs_client_->raylet_task_table().Subscribe( JobID::nil(), gcs_client_->client_table().GetLocalClientId(), - task_committed_callback, nullptr)); + task_committed_callback, nullptr, 
nullptr)); // Register a callback for actor creation notifications. auto actor_creation_callback = [this]( @@ -149,7 +149,8 @@ ray::Status NodeManager::RegisterGcs() { HeartbeatAdded(client, id, heartbeat_data); }; RAY_RETURN_NOT_OK(gcs_client_->heartbeat_table().Subscribe( - UniqueID::nil(), UniqueID::nil(), heartbeat_added, [](gcs::AsyncGcsClient *client) { + UniqueID::nil(), UniqueID::nil(), heartbeat_added, nullptr, + [](gcs::AsyncGcsClient *client) { RAY_LOG(DEBUG) << "heartbeat table subscription done callback called."; }));