Convert the ObjectTable implementation to a Log (#1779)

* TABLE_APPEND call

* Convert callbacks back to taking in a string...

* GCS returns flatbuffers, define Log class

* Cleanups

* Modify client table to use the Log interface

* Fix bug where we replied twice from redis

* Fixes

* lint

* Compile and test raylet TaskTable

* Modify GCS tables to handle unique_ptrs from nested flatbuffers

* Add raylet::TaskTable unit tests to replace ObjectTable ones

* Convert ObjectTable to a log

* Convert ObjectTable tests to the Log
This commit is contained in:
Stephanie Wang
2018-03-26 20:36:48 -07:00
committed by Philipp Moritz
parent 1ab0d0ea69
commit 51fdbe3867
5 changed files with 479 additions and 210 deletions
+446 -179
View File
@@ -42,12 +42,12 @@ class TestGcs : public ::testing::Test {
virtual void Stop() = 0;
int64_t NumCallbacks() const { return num_callbacks_; }
uint64_t NumCallbacks() const { return num_callbacks_; }
void IncrementNumCallbacks() { num_callbacks_++; }
protected:
int64_t num_callbacks_;
uint64_t num_callbacks_;
std::shared_ptr<gcs::AsyncGcsClient> client_;
JobID job_id_;
};
@@ -91,44 +91,32 @@ class TestGcsWithAsio : public TestGcs {
boost::asio::io_service::work work_;
};
void ObjectAdded(gcs::AsyncGcsClient *client, const UniqueID &id,
const ObjectTableDataT &data) {
ASSERT_EQ(data.managers, std::vector<std::string>({"A", "B"}));
}
void Lookup(gcs::AsyncGcsClient *client, const UniqueID &id,
const ObjectTableDataT &data) {
// Check that the object entry was added.
ASSERT_EQ(data.managers, std::vector<std::string>({"A", "B"}));
test->Stop();
}
void LookupFailed(gcs::AsyncGcsClient *client, const UniqueID &id) {
// Object entry failed.
RAY_CHECK(false);
test->Stop();
}
void TestTableLookup(const JobID &job_id, std::shared_ptr<gcs::AsyncGcsClient> client) {
TaskID task_id = TaskID::from_random();
auto data = std::make_shared<protocol::TaskT>();
data->task_specification = "123";
auto add_callback = [data](gcs::AsyncGcsClient *client, const UniqueID &id,
const protocol::TaskT &d) {
ASSERT_EQ(data->task_specification, d.task_specification);
// Check that we added the correct task.
auto add_callback = [task_id, data](gcs::AsyncGcsClient *client, const UniqueID &id,
const std::shared_ptr<protocol::TaskT> d) {
ASSERT_EQ(id, task_id);
ASSERT_EQ(data->task_specification, d->task_specification);
};
auto lookup_callback = [data](gcs::AsyncGcsClient *client, const UniqueID &id,
const protocol::TaskT &d) {
// Check that the lookup returns the added task.
auto lookup_callback = [task_id, data](gcs::AsyncGcsClient *client, const TaskID &id,
const protocol::TaskT &d) {
ASSERT_EQ(id, task_id);
ASSERT_EQ(data->task_specification, d.task_specification);
test->Stop();
};
// Check that the lookup does not return an empty entry.
auto failure_callback = [](gcs::AsyncGcsClient *client, const UniqueID &id) {
RAY_CHECK(false);
};
// Add the task, then do a lookup.
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, add_callback));
RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, lookup_callback,
failure_callback));
@@ -147,32 +135,89 @@ TEST_F(TestGcsWithAsio, TestTableLookup) {
TestTableLookup(job_id_, client_);
}
void TestLookupFailure(const JobID &job_id, std::shared_ptr<gcs::AsyncGcsClient> client) {
auto object_id = ObjectID::from_random();
// Looking up an empty object ID should call the failure callback.
auto failure_callback = [](gcs::AsyncGcsClient *client, const UniqueID &id) {
void TestLogLookup(const JobID &job_id, std::shared_ptr<gcs::AsyncGcsClient> client) {
// Append some entries to the log at an object ID.
ObjectID object_id = ObjectID::from_random();
std::vector<std::string> managers = {"abc", "def", "ghi"};
for (auto &manager : managers) {
auto data = std::make_shared<ObjectTableDataT>();
data->manager = manager;
// Check that we added the correct object entries.
auto add_callback = [object_id, data](gcs::AsyncGcsClient *client, const UniqueID &id,
const std::shared_ptr<ObjectTableDataT> d) {
ASSERT_EQ(id, object_id);
ASSERT_EQ(data->manager, d->manager);
};
RAY_CHECK_OK(client->object_table().Append(job_id, object_id, data, add_callback));
}
// Check that lookup returns the added object entries.
auto lookup_callback = [object_id, managers](
gcs::AsyncGcsClient *client, const ObjectID &id,
const std::vector<ObjectTableDataT> &data) {
ASSERT_EQ(id, object_id);
for (const auto &entry : data) {
ASSERT_EQ(entry.manager, managers[test->NumCallbacks()]);
test->IncrementNumCallbacks();
}
if (test->NumCallbacks() == managers.size()) {
test->Stop();
}
};
// Do a lookup at the object ID.
RAY_CHECK_OK(client->object_table().Lookup(job_id, object_id, lookup_callback));
// Run the event loop. The loop will only stop if the Lookup callback is
// called (or an assertion failure).
test->Start();
ASSERT_EQ(test->NumCallbacks(), managers.size());
}
TEST_F(TestGcsWithAe, TestLogLookup) {
test = this;
TestLogLookup(job_id_, client_);
}
TEST_F(TestGcsWithAsio, TestLogLookup) {
test = this;
TestLogLookup(job_id_, client_);
}
void TestTableLookupFailure(const JobID &job_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
TaskID task_id = TaskID::from_random();
// Check that the lookup does not return data.
auto lookup_callback = [](gcs::AsyncGcsClient *client, const UniqueID &id,
const protocol::TaskT &d) { RAY_CHECK(false); };
// Check that the lookup returns an empty entry.
auto failure_callback = [task_id](gcs::AsyncGcsClient *client, const UniqueID &id) {
ASSERT_EQ(id, task_id);
test->Stop();
};
RAY_CHECK_OK(
client->object_table().Lookup(job_id, object_id, nullptr, failure_callback));
// Lookup the task. We have not done any writes, so the key should be empty.
RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, lookup_callback,
failure_callback));
// Run the event loop. The loop will only stop if the failure callback is
// called.
// called (or an assertion failure).
test->Start();
}
TEST_F(TestGcsWithAe, TestLookupFailure) {
TEST_F(TestGcsWithAe, TestTableLookupFailure) {
test = this;
TestLookupFailure(job_id_, client_);
TestTableLookupFailure(job_id_, client_);
}
TEST_F(TestGcsWithAsio, TestLookupFailure) {
TEST_F(TestGcsWithAsio, TestTableLookupFailure) {
test = this;
TestLookupFailure(job_id_, client_);
TestTableLookupFailure(job_id_, client_);
}
void TaskAdded(gcs::AsyncGcsClient *client, const TaskID &id,
const TaskTableDataT &data) {
ASSERT_EQ(data.scheduling_state, SchedulingState_SCHEDULED);
const std::shared_ptr<TaskTableDataT> data) {
ASSERT_EQ(data->scheduling_state, SchedulingState_SCHEDULED);
}
void TaskLookup(gcs::AsyncGcsClient *client, const TaskID &id,
@@ -233,191 +278,413 @@ TEST_F(TestGcsWithAsio, TestTaskTable) {
TestTaskTable(job_id_, client_);
}
void TestSubscribeAll(const JobID &job_id, std::shared_ptr<gcs::AsyncGcsClient> client) {
ObjectID object_id = ObjectID::from_random();
void TestTableSubscribeAll(const JobID &job_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
TaskID task_id = TaskID::from_random();
std::vector<std::string> task_specs = {"abc", "def", "ghi"};
// Callback for a notification.
auto notification_callback = [object_id](
gcs::AsyncGcsClient *client, const UniqueID &id, const ObjectTableDataT &data) {
ASSERT_EQ(id, object_id);
// Check that the object entry was added.
ASSERT_EQ(data.managers, std::vector<std::string>({"A", "B"}));
auto notification_callback = [task_id, task_specs](
gcs::AsyncGcsClient *client, const UniqueID &id, const protocol::TaskT &data) {
ASSERT_EQ(id, task_id);
// Check that we get notifications in the same order as the writes.
ASSERT_EQ(data.task_specification, task_specs[test->NumCallbacks()]);
test->IncrementNumCallbacks();
test->Stop();
if (test->NumCallbacks() == task_specs.size()) {
test->Stop();
}
};
// Callback for subscription success. This should only be called once.
auto subscribe_callback = [job_id, object_id](gcs::AsyncGcsClient *client) {
test->IncrementNumCallbacks();
// We have subscribed. Add an object table entry.
auto data = std::make_shared<ObjectTableDataT>();
data->managers.push_back("A");
data->managers.push_back("B");
RAY_CHECK_OK(client->object_table().Add(job_id, object_id, data, &ObjectAdded));
// Callback for subscription success. We are guaranteed to receive
// notifications after this is called.
auto subscribe_callback = [job_id, task_id, task_specs](gcs::AsyncGcsClient *client) {
// We have subscribed. Do the writes to the table.
for (const auto &task_spec : task_specs) {
auto data = std::make_shared<protocol::TaskT>();
data->task_specification = task_spec;
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, nullptr));
}
};
// Subscribe to all object table notifications. Once we have successfully
// subscribed, we will add an object and check that we get notified of the
// operation.
// Subscribe to all task table notifications. Once we have successfully
// subscribed, we will write the key several times and check that we get
// notified for each.
RAY_CHECK_OK(client->raylet_task_table().Subscribe(
job_id, ClientID::nil(), notification_callback, subscribe_callback));
// Run the event loop. The loop will only stop if the registered subscription
// callback is called (or an assertion failure).
test->Start();
// Check that we received one notification callback for each write.
ASSERT_EQ(test->NumCallbacks(), task_specs.size());
}
TEST_F(TestGcsWithAe, TestTableSubscribeAll) {
test = this;
TestTableSubscribeAll(job_id_, client_);
}
TEST_F(TestGcsWithAsio, TestTableSubscribeAll) {
test = this;
TestTableSubscribeAll(job_id_, client_);
}
void TestLogSubscribeAll(const JobID &job_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
std::vector<std::string> managers = {"abc", "def", "ghi"};
std::vector<ObjectID> object_ids;
for (size_t i = 0; i < managers.size(); i++) {
object_ids.push_back(ObjectID::from_random());
}
// Callback for a notification.
auto notification_callback = [object_ids, managers](
gcs::AsyncGcsClient *client, const UniqueID &id,
const std::vector<ObjectTableDataT> data) {
ASSERT_EQ(id, object_ids[test->NumCallbacks()]);
// Check that we get notifications in the same order as the writes.
for (const auto &entry : data) {
ASSERT_EQ(entry.manager, managers[test->NumCallbacks()]);
test->IncrementNumCallbacks();
}
if (test->NumCallbacks() == managers.size()) {
test->Stop();
}
};
// Callback for subscription success. We are guaranteed to receive
// notifications after this is called.
auto subscribe_callback = [job_id, object_ids, managers](gcs::AsyncGcsClient *client) {
// We have subscribed. Do the writes to the table.
for (size_t i = 0; i < object_ids.size(); i++) {
auto data = std::make_shared<ObjectTableDataT>();
data->manager = managers[i];
RAY_CHECK_OK(client->object_table().Append(job_id, object_ids[i], data, nullptr));
}
};
// Subscribe to all task table notifications. Once we have successfully
// subscribed, we will append to the key several times and check that we get
// notified for each.
RAY_CHECK_OK(client->object_table().Subscribe(
job_id, ClientID::nil(), notification_callback, subscribe_callback));
// Run the event loop. The loop will only stop if the registered subscription
// callback is called (or an assertion failure).
test->Start();
// Check that we received one callback for subscription success and one for
// the Add notification.
// Check that we received one notification callback for each write.
ASSERT_EQ(test->NumCallbacks(), managers.size());
}
TEST_F(TestGcsWithAe, TestLogSubscribeAll) {
test = this;
TestLogSubscribeAll(job_id_, client_);
}
TEST_F(TestGcsWithAsio, TestLogSubscribeAll) {
test = this;
TestLogSubscribeAll(job_id_, client_);
}
void TestTableSubscribeId(const JobID &job_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
// Add a table entry.
TaskID task_id1 = TaskID::from_random();
std::vector<std::string> task_specs1 = {"abc", "def", "ghi"};
auto data1 = std::make_shared<protocol::TaskT>();
data1->task_specification = task_specs1[0];
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id1, data1, nullptr));
// Add a table entry at a second key.
TaskID task_id2 = TaskID::from_random();
std::vector<std::string> task_specs2 = {"jkl", "mno", "pqr"};
auto data2 = std::make_shared<protocol::TaskT>();
data2->task_specification = task_specs2[0];
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id2, data2, nullptr));
// The callback for a notification from the table. This should only be
// received for keys that we requested notifications for.
auto notification_callback = [task_id2, task_specs2](
gcs::AsyncGcsClient *client, const TaskID &id, const protocol::TaskT &data) {
// Check that we only get notifications for the requested key.
ASSERT_EQ(id, task_id2);
// Check that we get notifications in the same order as the writes.
ASSERT_EQ(data.task_specification, task_specs2[test->NumCallbacks()]);
test->IncrementNumCallbacks();
if (test->NumCallbacks() == task_specs2.size()) {
test->Stop();
}
};
// The callback for subscription success. Once we've subscribed, request
// notifications for only one of the keys, then write to both keys.
auto subscribe_callback = [job_id, task_id1, task_id2, task_specs1,
task_specs2](gcs::AsyncGcsClient *client) {
// Request notifications for one of the keys.
RAY_CHECK_OK(client->raylet_task_table().RequestNotifications(
job_id, task_id2, client->client_table().GetLocalClientId()));
// Write both keys. We should only receive notifications for the key that
// we requested them for.
auto remaining = std::vector<std::string>(++task_specs1.begin(), task_specs1.end());
for (const auto &task_spec : remaining) {
auto data = std::make_shared<protocol::TaskT>();
data->task_specification = task_spec;
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id1, data, nullptr));
}
remaining = std::vector<std::string>(++task_specs2.begin(), task_specs2.end());
for (const auto &task_spec : remaining) {
auto data = std::make_shared<protocol::TaskT>();
data->task_specification = task_spec;
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id2, data, nullptr));
}
};
// Subscribe to notifications for this client. This allows us to request and
// receive notifications for specific keys.
RAY_CHECK_OK(client->raylet_task_table().Subscribe(
job_id, client->client_table().GetLocalClientId(), notification_callback,
subscribe_callback));
// Run the event loop. The loop will only stop if the registered subscription
// callback is called for the requested key.
test->Start();
// Check that we received one notification callback for each write to the
// requested key.
ASSERT_EQ(test->NumCallbacks(), task_specs2.size());
}
TEST_F(TestGcsWithAe, TestTableSubscribeId) {
test = this;
TestTableSubscribeId(job_id_, client_);
}
TEST_F(TestGcsWithAsio, TestTableSubscribeId) {
test = this;
TestTableSubscribeId(job_id_, client_);
}
void TestLogSubscribeId(const JobID &job_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
// Add a log entry.
ObjectID object_id1 = ObjectID::from_random();
std::vector<std::string> managers1 = {"abc", "def", "ghi"};
auto data1 = std::make_shared<ObjectTableDataT>();
data1->manager = managers1[0];
RAY_CHECK_OK(client->object_table().Append(job_id, object_id1, data1, nullptr));
// Add a log entry at a second key.
ObjectID object_id2 = ObjectID::from_random();
std::vector<std::string> managers2 = {"jkl", "mno", "pqr"};
auto data2 = std::make_shared<ObjectTableDataT>();
data2->manager = managers2[0];
RAY_CHECK_OK(client->object_table().Append(job_id, object_id2, data2, nullptr));
// The callback for a notification from the table. This should only be
// received for keys that we requested notifications for.
auto notification_callback = [object_id2, managers2](
gcs::AsyncGcsClient *client, const ObjectID &id,
const std::vector<ObjectTableDataT> &data) {
// Check that we only get notifications for the requested key.
ASSERT_EQ(id, object_id2);
// Check that we get notifications in the same order as the writes.
for (const auto &entry : data) {
ASSERT_EQ(entry.manager, managers2[test->NumCallbacks()]);
test->IncrementNumCallbacks();
}
if (test->NumCallbacks() == managers2.size()) {
test->Stop();
}
};
// The callback for subscription success. Once we've subscribed, request
// notifications for only one of the keys, then write to both keys.
auto subscribe_callback = [job_id, object_id1, object_id2, managers1,
managers2](gcs::AsyncGcsClient *client) {
// Request notifications for one of the keys.
RAY_CHECK_OK(client->object_table().RequestNotifications(
job_id, object_id2, client->client_table().GetLocalClientId()));
// Write both keys. We should only receive notifications for the key that
// we requested them for.
auto remaining = std::vector<std::string>(++managers1.begin(), managers1.end());
for (const auto &manager : remaining) {
auto data = std::make_shared<ObjectTableDataT>();
data->manager = manager;
RAY_CHECK_OK(client->object_table().Append(job_id, object_id1, data, nullptr));
}
remaining = std::vector<std::string>(++managers2.begin(), managers2.end());
for (const auto &manager : remaining) {
auto data = std::make_shared<ObjectTableDataT>();
data->manager = manager;
RAY_CHECK_OK(client->object_table().Append(job_id, object_id2, data, nullptr));
}
};
// Subscribe to notifications for this client. This allows us to request and
// receive notifications for specific keys.
RAY_CHECK_OK(
client->object_table().Subscribe(job_id, client->client_table().GetLocalClientId(),
notification_callback, subscribe_callback));
// Run the event loop. The loop will only stop if the registered subscription
// callback is called for the requested key.
test->Start();
// Check that we received one notification callback for each write to the
// requested key.
ASSERT_EQ(test->NumCallbacks(), managers2.size());
}
TEST_F(TestGcsWithAe, TestLogSubscribeId) {
test = this;
TestLogSubscribeId(job_id_, client_);
}
TEST_F(TestGcsWithAsio, TestLogSubscribeId) {
test = this;
TestLogSubscribeId(job_id_, client_);
}
void TestTableSubscribeCancel(const JobID &job_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
// Add a table entry.
TaskID task_id = TaskID::from_random();
std::vector<std::string> task_specs = {"jkl", "mno", "pqr"};
auto data = std::make_shared<protocol::TaskT>();
data->task_specification = task_specs[0];
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, nullptr));
// The callback for a notification from the table. This should only be
// received for keys that we requested notifications for.
auto notification_callback = [task_id, task_specs](
gcs::AsyncGcsClient *client, const TaskID &id, const protocol::TaskT &data) {
ASSERT_EQ(id, task_id);
// Check that we only get notifications for the first and last writes,
// since notifications are canceled in between.
if (test->NumCallbacks() == 0) {
ASSERT_EQ(data.task_specification, task_specs.front());
} else {
ASSERT_EQ(data.task_specification, task_specs.back());
}
test->IncrementNumCallbacks();
if (test->NumCallbacks() == 2) {
test->Stop();
}
};
// The callback for a notification from the table. This should only be
// received for keys that we requested notifications for.
auto subscribe_callback = [job_id, task_id, task_specs](gcs::AsyncGcsClient *client) {
// Request notifications, then cancel immediately. We should receive a
// notification for the current value at the key.
RAY_CHECK_OK(client->raylet_task_table().RequestNotifications(
job_id, task_id, client->client_table().GetLocalClientId()));
RAY_CHECK_OK(client->raylet_task_table().CancelNotifications(
job_id, task_id, client->client_table().GetLocalClientId()));
// Write to the key. Since we canceled notifications, we should not receive
// a notification for these writes.
auto remaining = std::vector<std::string>(++task_specs.begin(), task_specs.end());
for (const auto &task_spec : remaining) {
auto data = std::make_shared<protocol::TaskT>();
data->task_specification = task_spec;
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, nullptr));
}
// Request notifications again. We should receive a notification for the
// current value at the key.
RAY_CHECK_OK(client->raylet_task_table().RequestNotifications(
job_id, task_id, client->client_table().GetLocalClientId()));
};
// Subscribe to notifications for this client. This allows us to request and
// receive notifications for specific keys.
RAY_CHECK_OK(client->raylet_task_table().Subscribe(
job_id, client->client_table().GetLocalClientId(), notification_callback,
subscribe_callback));
// Run the event loop. The loop will only stop if the registered subscription
// callback is called for the requested key.
test->Start();
// Check that we received a notification callback for the first and least
// writes to the key, since notifications are canceled in between.
ASSERT_EQ(test->NumCallbacks(), 2);
}
TEST_F(TestGcsWithAe, TestSubscribeAll) {
TEST_F(TestGcsWithAe, TestTableSubscribeCancel) {
test = this;
TestSubscribeAll(job_id_, client_);
TestTableSubscribeCancel(job_id_, client_);
}
TEST_F(TestGcsWithAsio, TestSubscribeAll) {
TEST_F(TestGcsWithAsio, TestTableSubscribeCancel) {
test = this;
TestSubscribeAll(job_id_, client_);
TestTableSubscribeCancel(job_id_, client_);
}
void TestSubscribeId(const JobID &job_id, std::shared_ptr<gcs::AsyncGcsClient> client) {
// Add an object table entry.
ObjectID object_id1 = ObjectID::from_random();
auto data1 = std::make_shared<ObjectTableDataT>();
data1->managers.push_back("A");
data1->managers.push_back("B");
RAY_CHECK_OK(client->object_table().Add(job_id, object_id1, data1, nullptr));
// Add a second object table entry.
ObjectID object_id2 = ObjectID::from_random();
auto data2 = std::make_shared<ObjectTableDataT>();
data2->managers.push_back("C");
RAY_CHECK_OK(client->object_table().Add(job_id, object_id2, data2, nullptr));
// The callback for subscription success. Once we've subscribed, request
// notifications for the second object that was added.
auto subscribe_callback = [job_id, object_id2](gcs::AsyncGcsClient *client) {
test->IncrementNumCallbacks();
// Request notifications for the second object. Since we already added the
// entry to the table, we should receive an initial notification for its
// current value.
RAY_CHECK_OK(client->object_table().RequestNotifications(
job_id, object_id2, client->client_table().GetLocalClientId()));
// Overwrite the entry for the object. We should receive a second
// notification for its new value.
auto data = std::make_shared<ObjectTableDataT>();
data->managers.push_back("C");
data->managers.push_back("D");
RAY_CHECK_OK(client->object_table().Add(job_id, object_id2, data, nullptr));
};
void TestLogSubscribeCancel(const JobID &job_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
// Add a log entry.
ObjectID object_id = ObjectID::from_random();
std::vector<std::string> managers = {"jkl", "mno", "pqr"};
auto data = std::make_shared<ObjectTableDataT>();
data->manager = managers[0];
RAY_CHECK_OK(client->object_table().Append(job_id, object_id, data, nullptr));
// The callback for a notification from the object table. This should only be
// received for the object that we requested notifications for.
auto notification_callback = [data2, object_id2](
gcs::AsyncGcsClient *client, const UniqueID &id, const ObjectTableDataT &data) {
ASSERT_EQ(id, object_id2);
// Check that we got a notification for the correct object.
ASSERT_EQ(data.managers.front(), "C");
test->IncrementNumCallbacks();
// Stop the loop once we've received notifications for both writes to the
// object key.
if (test->NumCallbacks() == 3) {
auto notification_callback = [object_id, managers](
gcs::AsyncGcsClient *client, const ObjectID &id,
const std::vector<ObjectTableDataT> &data) {
ASSERT_EQ(id, object_id);
// Check that we get a duplicate notification for the first write. We get a
// duplicate notification because the log is append-only and notifications
// are canceled after the first write, then requested again.
auto managers_copy = managers;
managers_copy.insert(managers_copy.begin(), managers_copy.front());
for (const auto &entry : data) {
ASSERT_EQ(entry.manager, managers_copy[test->NumCallbacks()]);
test->IncrementNumCallbacks();
}
if (test->NumCallbacks() == managers_copy.size()) {
test->Stop();
}
};
RAY_CHECK_OK(
client->object_table().Subscribe(job_id, client->client_table().GetLocalClientId(),
notification_callback, subscribe_callback));
// Run the event loop. The loop will only stop if the registered subscription
// callback is called for both writes to the object key.
test->Start();
// Check that we received one callback for subscription success and two
// callbacks for the Add notifications.
ASSERT_EQ(test->NumCallbacks(), 3);
}
TEST_F(TestGcsWithAe, TestSubscribeId) {
test = this;
TestSubscribeId(job_id_, client_);
}
TEST_F(TestGcsWithAsio, TestSubscribeId) {
test = this;
TestSubscribeId(job_id_, client_);
}
void TestSubscribeCancel(const JobID &job_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
// Write the object table once.
ObjectID object_id = ObjectID::from_random();
auto data = std::make_shared<ObjectTableDataT>();
data->managers.push_back("A");
RAY_CHECK_OK(client->object_table().Add(job_id, object_id, data, nullptr));
// The callback for subscription success. Once we've subscribed, request
// notifications for the second object that was added.
auto subscribe_callback = [job_id, object_id](gcs::AsyncGcsClient *client) {
test->IncrementNumCallbacks();
// Request notifications for the object. We should receive a notification
// for the current value at the key.
RAY_CHECK_OK(client->object_table().RequestNotifications(
job_id, object_id, client->client_table().GetLocalClientId()));
// Cancel notifications.
RAY_CHECK_OK(client->object_table().CancelNotifications(
job_id, object_id, client->client_table().GetLocalClientId()));
// Write the object table entry twice. Since we canceled notifications, we
// should not get notifications for either of these writes.
auto data = std::make_shared<ObjectTableDataT>();
data->managers.push_back("B");
RAY_CHECK_OK(client->object_table().Add(job_id, object_id, data, nullptr));
data = std::make_shared<ObjectTableDataT>();
data->managers.push_back("C");
RAY_CHECK_OK(client->object_table().Add(job_id, object_id, data, nullptr));
// Request notifications for the object again. We should only receive a
// The callback for a notification from the table. This should only be
// received for keys that we requested notifications for.
auto subscribe_callback = [job_id, object_id, managers](gcs::AsyncGcsClient *client) {
// Request notifications, then cancel immediately. We should receive a
// notification for the current value at the key.
RAY_CHECK_OK(client->object_table().RequestNotifications(
job_id, object_id, client->client_table().GetLocalClientId()));
RAY_CHECK_OK(client->object_table().CancelNotifications(
job_id, object_id, client->client_table().GetLocalClientId()));
// Append to the key. Since we canceled notifications, we should not
// receive a notification for these writes.
auto remaining = std::vector<std::string>(++managers.begin(), managers.end());
for (const auto &manager : remaining) {
auto data = std::make_shared<ObjectTableDataT>();
data->manager = manager;
RAY_CHECK_OK(client->object_table().Append(job_id, object_id, data, nullptr));
}
// Request notifications again. We should receive a notification for the
// current values at the key.
RAY_CHECK_OK(client->object_table().RequestNotifications(
job_id, object_id, client->client_table().GetLocalClientId()));
};
// The callback for a notification from the object table.
auto notification_callback = [object_id](
gcs::AsyncGcsClient *client, const UniqueID &id, const ObjectTableDataT &data) {
ASSERT_EQ(id, object_id);
// Check that we only receive notifications for the key when we have
// requested notifications for it. We should not get a notification for the
// entry that began with "B" since we canceled notifications then.
if (test->NumCallbacks() == 1) {
ASSERT_EQ(data.managers.front(), "A");
} else {
ASSERT_EQ(data.managers.front(), "C");
}
test->IncrementNumCallbacks();
if (test->NumCallbacks() == 3) {
test->Stop();
}
};
// Subscribe to notifications for this client. This allows us to request and
// receive notifications for specific keys.
RAY_CHECK_OK(
client->object_table().Subscribe(job_id, client->client_table().GetLocalClientId(),
notification_callback, subscribe_callback));
// Run the event loop. The loop will only stop if the registered subscription
// callback is called (or an assertion failure).
// callback is called for the requested key.
test->Start();
// Check that we received one callback for subscription success and two
// callbacks for the Add notifications.
ASSERT_EQ(test->NumCallbacks(), 3);
// Check that we received a notification callback for the first append to the
// key, then a notification for all of the appends, because we cancel
// notifications in between.
ASSERT_EQ(test->NumCallbacks(), managers.size() + 1);
}
TEST_F(TestGcsWithAe, TestSubscribeCancel) {
TEST_F(TestGcsWithAe, TestLogSubscribeCancel) {
test = this;
TestSubscribeCancel(job_id_, client_);
TestLogSubscribeCancel(job_id_, client_);
}
TEST_F(TestGcsWithAsio, TestSubscribeCancel) {
TEST_F(TestGcsWithAsio, TestLogSubscribeCancel) {
test = this;
TestSubscribeCancel(job_id_, client_);
TestLogSubscribeCancel(job_id_, client_);
}
void ClientTableNotification(gcs::AsyncGcsClient *client, const ClientID &client_id,
+2 -4
View File
@@ -35,11 +35,9 @@ table FunctionTableData {
}
table ObjectTableData {
task_id: string;
object_size: long;
is_put: bool;
never_created: bool;
managers: [string];
manager: string;
is_eviction: bool;
}
enum SchedulingState:int {
+18 -19
View File
@@ -9,13 +9,13 @@ namespace gcs {
template <typename ID, typename Data>
Status Log<ID, Data>::Append(const JobID &job_id, const ID &id,
std::shared_ptr<DataT> data, const Callback &done) {
std::shared_ptr<DataT> data, const WriteCallback &done) {
auto d = std::shared_ptr<CallbackData>(
new CallbackData({id, data, done, nullptr, this, client_}));
new CallbackData({id, data, nullptr, nullptr, this, client_}));
int64_t callback_index =
RedisCallbackManager::instance().add([d](const std::string &data) {
if (d->callback != nullptr) {
(d->callback)(d->client, d->id, {*d->data});
RedisCallbackManager::instance().add([d, done](const std::string &data) {
if (done != nullptr) {
(done)(d->client, d->id, d->data);
}
return true;
});
@@ -120,13 +120,13 @@ Status Log<ID, Data>::CancelNotifications(const JobID &job_id, const ID &id,
template <typename ID, typename Data>
Status Table<ID, Data>::Add(const JobID &job_id, const ID &id,
std::shared_ptr<DataT> data, const Callback &done) {
std::shared_ptr<DataT> data, const WriteCallback &done) {
auto d = std::shared_ptr<CallbackData>(
new CallbackData({id, data, done, nullptr, this, client_}));
new CallbackData({id, data, nullptr, nullptr, this, client_}));
int64_t callback_index =
RedisCallbackManager::instance().add([d](const std::string &data) {
if (d->callback != nullptr) {
(d->callback)(d->client, d->id, *d->data);
RedisCallbackManager::instance().add([d, done](const std::string &data) {
if (done != nullptr) {
(done)(d->client, d->id, d->data);
}
return true;
});
@@ -232,8 +232,9 @@ void ClientTable::HandleNotification(AsyncGcsClient *client,
}
}
void ClientTable::HandleConnected(AsyncGcsClient *client, const ClientTableDataT &data) {
auto connected_client_id = ClientID::from_binary(data.client_id);
void ClientTable::HandleConnected(AsyncGcsClient *client,
const std::shared_ptr<ClientTableDataT> data) {
auto connected_client_id = ClientID::from_binary(data->client_id);
RAY_CHECK(client_id_ == connected_client_id) << connected_client_id.hex() << " "
<< client_id_.hex();
}
@@ -259,10 +260,9 @@ Status ClientTable::Connect() {
// Callback to handle our own successful connection once we've added
// ourselves.
auto add_callback = [this](AsyncGcsClient *client, const UniqueID &log_key,
const std::vector<ClientTableDataT> &data) {
std::shared_ptr<ClientTableDataT> data) {
RAY_CHECK(log_key == client_log_key_);
RAY_CHECK(data.size() == 1);
HandleConnected(client, data[0]);
HandleConnected(client, data);
};
// Callback to add ourselves once we've successfully subscribed.
auto subscription_callback = [this, data, add_callback](AsyncGcsClient *c) {
@@ -282,9 +282,8 @@ Status ClientTable::Disconnect() {
auto data = std::make_shared<ClientTableDataT>(local_client_);
data->is_insertion = true;
auto add_callback = [this](AsyncGcsClient *client, const ClientID &id,
const std::vector<ClientTableDataT> &data) {
RAY_CHECK(data.size() == 1);
HandleConnected(client, data[0]);
std::shared_ptr<ClientTableDataT> data) {
HandleConnected(client, data);
RAY_CHECK_OK(CancelNotifications(JobID::nil(), client_log_key_, id));
};
RAY_RETURN_NOT_OK(Append(JobID::nil(), client_log_key_, data, add_callback));
@@ -306,9 +305,9 @@ const ClientTableDataT &ClientTable::GetClient(const ClientID &client_id) {
}
template class Log<ObjectID, ObjectTableData>;
template class Log<TaskID, ray::protocol::Task>;
template class Table<TaskID, ray::protocol::Task>;
template class Table<TaskID, TaskTableData>;
template class Table<ObjectID, ObjectTableData>;
} // namespace gcs
+10 -5
View File
@@ -41,6 +41,9 @@ class Log {
using DataT = typename Data::NativeTableType;
using Callback = std::function<void(AsyncGcsClient *client, const ID &id,
const std::vector<DataT> &data)>;
/// The callback to call when a write to a key succeeds.
using WriteCallback = std::function<void(AsyncGcsClient *client, const ID &id,
std::shared_ptr<DataT> data)>;
/// The callback to call when a SUBSCRIBE call completes and we are ready to
/// request and receive notifications.
using SubscriptionCallback = std::function<void(AsyncGcsClient *client)>;
@@ -72,7 +75,7 @@ class Log {
/// GCS.
/// \return Status
Status Append(const JobID &job_id, const ID &id, std::shared_ptr<DataT> data,
const Callback &done);
const WriteCallback &done);
/// Lookup the log values at a key asynchronously.
///
@@ -158,6 +161,7 @@ class Table : private Log<ID, Data> {
using DataT = typename Log<ID, Data>::DataT;
using Callback =
std::function<void(AsyncGcsClient *client, const ID &id, const DataT &data)>;
using WriteCallback = typename Log<ID, Data>::WriteCallback;
/// The callback to call when a Lookup call returns an empty entry.
using FailureCallback = std::function<void(AsyncGcsClient *client, const ID &id)>;
/// The callback to call when a Subscribe call completes and we are ready to
@@ -190,7 +194,7 @@ class Table : private Log<ID, Data> {
/// GCS.
/// \return Status
Status Add(const JobID &job_id, const ID &id, std::shared_ptr<DataT> data,
const Callback &done);
const WriteCallback &done);
/// Lookup an entry asynchronously.
///
@@ -214,10 +218,10 @@ class Table : private Log<ID, Data> {
using Log<ID, Data>::prefix_;
};
class ObjectTable : public Table<ObjectID, ObjectTableData> {
class ObjectTable : public Log<ObjectID, ObjectTableData> {
public:
ObjectTable(const std::shared_ptr<RedisContext> &context, AsyncGcsClient *client)
: Table(context, client) {
: Log(context, client) {
pubsub_channel_ = TablePubsub_OBJECT;
prefix_ = TablePrefix_OBJECT;
};
@@ -409,7 +413,8 @@ class ClientTable : private Log<UniqueID, ClientTableData> {
/// Handle a client table notification.
void HandleNotification(AsyncGcsClient *client, const ClientTableDataT &notifications);
/// Handle this client's successful connection to the GCS.
void HandleConnected(AsyncGcsClient *client, const ClientTableDataT &notifications);
void HandleConnected(AsyncGcsClient *client,
const std::shared_ptr<ClientTableDataT> client_data);
/// The key at which the log of client information is stored. This key must
/// be kept the same across all instances of the ClientTable, so that all
+3 -3
View File
@@ -44,9 +44,9 @@ Status TaskTableAdd(AsyncGcsClient *gcs_client, Task *task) {
TaskSpec *spec = execution_spec.Spec();
auto data = MakeTaskTableData(execution_spec, Task_local_scheduler(task),
static_cast<SchedulingState>(Task_state(task)));
return gcs_client->task_table().Add(
ray::JobID::nil(), TaskSpec_task_id(spec), data,
[](gcs::AsyncGcsClient *client, const TaskID &id, const TaskTableDataT &data) {});
return gcs_client->task_table().Add(ray::JobID::nil(), TaskSpec_task_id(spec), data,
[](gcs::AsyncGcsClient *client, const TaskID &id,
std::shared_ptr<TaskTableDataT> data) {});
}
// TODO(pcm): This is a helper method that should go away once we get rid of