Clean up Wait() in the core worker (#5628)

This commit is contained in:
Edward Oakes
2019-09-04 21:31:34 -07:00
committed by Philipp Moritz
parent bb5609afb3
commit f38bb288e2
8 changed files with 135 additions and 130 deletions
+53 -58
View File
@@ -125,90 +125,85 @@ Status CoreWorkerObjectInterface::Wait(const std::vector<ObjectID> &ids, int num
(*results).resize(ids.size(), false);
if (num_objects <= 0 || num_objects > static_cast<int>(ids.size())) {
return Status::Invalid("num_objects value is not valid");
return Status::Invalid(
"Number of objects to wait for must be between 1 and the number of ids.");
}
EnumUnorderedMap<StoreProviderType, std::unordered_set<ObjectID>>
object_ids_per_store_provider;
GroupObjectIdsByStoreProvider(ids, &object_ids_per_store_provider);
size_t total_count = 0;
for (const auto &entry : object_ids_per_store_provider) {
total_count += entry.second.size();
}
if (total_count != ids.size()) {
return Status::Invalid("Duplicate object IDs not supported in wait.");
}
// TODO(edoakes): this logic is not ideal, and will have to be addressed
// before we enable direct actor calls in the Python code. If we are waiting
// on a list of objects mixed between multiple store providers, we could
// easily end up in the situation where we're blocked waiting on one store
// provider while another actually has enough objects ready to fulfill
// 'num_objects'. This is partially addressed by trying them all once with
// a timeout of 0, but that does not address the situation where objects
// become available on the second store provider while waiting on the first.
std::unordered_set<ObjectID> ready;
// Wait from all the store providers with timeout set to 0. This is to avoid the case
// where we might use up the entire timeout on trying to get objects from one store
// provider before even trying another (which might have all of the objects available).
RAY_RETURN_NOT_OK(WaitFromMultipleStoreProviders(ids, object_ids_per_store_provider,
/* timeout_ms= */ 0, &num_objects,
results));
RAY_RETURN_NOT_OK(WaitFromMultipleStoreProviders(object_ids_per_store_provider,
/*timeout_ms=*/0, &num_objects,
&ready));
if (num_objects > 0) {
// Wait from all the store providers with the specified timeout
// if the required number of objects haven't been ready yet.
RAY_RETURN_NOT_OK(WaitFromMultipleStoreProviders(ids, object_ids_per_store_provider,
/* timeout_ms= */ timeout_ms,
&num_objects, results));
RAY_RETURN_NOT_OK(WaitFromMultipleStoreProviders(object_ids_per_store_provider,
/*timeout_ms=*/timeout_ms,
&num_objects, &ready));
}
for (size_t i = 0; i < ids.size(); i++) {
if (ready.find(ids[i]) != ready.end()) {
(*results)[i] = true;
}
}
return Status::OK();
}
Status CoreWorkerObjectInterface::WaitFromMultipleStoreProviders(
const std::vector<ObjectID> &ids,
const EnumUnorderedMap<StoreProviderType, std::unordered_set<ObjectID>>
&ids_per_provider,
int64_t timeout_ms, int *num_objects, std::vector<bool> *results) {
std::unordered_map<ObjectID, int> object_counts;
for (const auto &entry : ids) {
auto iter = object_counts.find(entry);
if (iter == object_counts.end()) {
object_counts.emplace(entry, 1);
} else {
iter->second++;
EnumUnorderedMap<StoreProviderType, std::unordered_set<ObjectID>> &ids_per_provider,
int64_t timeout_ms, int *num_objects, std::unordered_set<ObjectID> *ready) {
int64_t remaining_timeout_ms = timeout_ms;
for (auto &provider_entry : ids_per_provider) {
if (*num_objects <= 0) {
break;
}
int64_t start_time = current_time_ms();
int required_objects =
std::min(static_cast<int>(provider_entry.second.size()), *num_objects);
std::unordered_set<ObjectID> provider_ready;
RAY_RETURN_NOT_OK(store_providers_[provider_entry.first]->Wait(
provider_entry.second, required_objects, remaining_timeout_ms,
worker_context_.GetCurrentTaskID(), &provider_ready));
// Update num_objects and remove the ready objects from the list so they don't get
// double-counted.
*num_objects -= provider_ready.size();
for (const ObjectID &ready_id : provider_ready) {
ready->insert(ready_id);
provider_entry.second.erase(ready_id);
}
}
auto remaining_timeout_ms = timeout_ms;
for (const auto &entry : ids_per_provider) {
std::unordered_set<ObjectID> objects;
auto start_time = current_time_ms();
int required_objects = std::min(static_cast<int>(entry.second.size()), *num_objects);
RAY_RETURN_NOT_OK(WaitFromStoreProvider(entry.first, entry.second, required_objects,
remaining_timeout_ms, &objects));
if (remaining_timeout_ms > 0) {
int64_t duration = current_time_ms() - start_time;
remaining_timeout_ms =
std::max(static_cast<int64_t>(0), remaining_timeout_ms - duration);
}
for (const auto &entry : objects) {
*num_objects -= object_counts[entry];
}
for (size_t i = 0; i < ids.size(); i++) {
if (objects.count(ids[i]) > 0) {
(*results)[i] = true;
}
}
if (*num_objects <= 0) {
break;
}
}
return Status::OK();
};
Status CoreWorkerObjectInterface::WaitFromStoreProvider(
StoreProviderType type, const std::unordered_set<ObjectID> &object_ids,
int num_objects, int64_t timeout_ms, std::unordered_set<ObjectID> *results) {
std::vector<ObjectID> ids(object_ids.begin(), object_ids.end());
if (!ids.empty()) {
std::vector<bool> objects;
RAY_RETURN_NOT_OK(store_providers_[type]->Wait(
ids, num_objects, timeout_ms, worker_context_.GetCurrentTaskID(), &objects));
RAY_CHECK(ids.size() == objects.size());
for (size_t i = 0; i < objects.size(); i++) {
if (objects[i]) {
(*results).insert(ids[i]);
}
}
}
return Status::OK();
+5 -20
View File
@@ -71,33 +71,18 @@ class CoreWorkerObjectInterface {
bool delete_creating_tasks);
private:
/// Helper function to get a list of objects from different store providers.
/// Helper function to get a set of objects from different store providers.
///
/// \param[in] object_ids IDs of the objects to get.
/// \param[in] ids_per_provider A map from store provider type to the set of
// object ids for that store provider.
/// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's -1.
/// \param[in/out] num_objects Number of objects that should appear before returning.
/// \param[out] results A bitset that indicates each object has appeared or not.
/// Should be updated as objects are added to the ready set.
/// \param[in/out] results A set that holds objects that are ready.
/// \return Status.
Status WaitFromMultipleStoreProviders(
const std::vector<ObjectID> &object_ids,
const EnumUnorderedMap<StoreProviderType, std::unordered_set<ObjectID>>
&ids_per_provider,
int64_t timeout_ms, int *num_objects, std::vector<bool> *results);
/// Helper function to wait a list of objects from a specific store provider.
///
/// \param[in] type The type of store provider to use.
/// \param[in] object_ids IDs of the objects to wait for.
/// \param[in] num_objects Number of objects that should appear before returning.
/// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative.
/// \param[out] results A bitset that indicates each object has appeared or not.
/// \return Status.
Status WaitFromStoreProvider(StoreProviderType type,
const std::unordered_set<ObjectID> &object_ids,
int num_objects, int64_t timeout_ms,
std::unordered_set<ObjectID> *results);
EnumUnorderedMap<StoreProviderType, std::unordered_set<ObjectID>> &ids_per_provider,
int64_t timeout_ms, int *num_objects, std::unordered_set<ObjectID> *results);
/// Create a new store provider for the specified type on demand.
std::unique_ptr<CoreWorkerStoreProvider> CreateStoreProvider(
@@ -44,18 +44,20 @@ Status CoreWorkerMemoryStoreProvider::Get(
return Status::OK();
}
Status CoreWorkerMemoryStoreProvider::Wait(const std::vector<ObjectID> &object_ids,
Status CoreWorkerMemoryStoreProvider::Wait(const std::unordered_set<ObjectID> &object_ids,
int num_objects, int64_t timeout_ms,
const TaskID &task_id,
std::vector<bool> *results) {
(*results).resize(object_ids.size(), false);
std::unordered_set<ObjectID> *ready) {
std::vector<ObjectID> id_vector(object_ids.begin(), object_ids.end());
std::vector<std::shared_ptr<RayObject>> result_objects;
RAY_CHECK(object_ids.size() == id_vector.size());
RAY_RETURN_NOT_OK(
store_->Get(object_ids, num_objects, timeout_ms, false, &result_objects));
RAY_CHECK(result_objects.size() == object_ids.size());
for (size_t i = 0; i < object_ids.size(); i++) {
(*results)[i] = (result_objects[i] != nullptr);
store_->Get(id_vector, num_objects, timeout_ms, false, &result_objects));
for (size_t i = 0; i < id_vector.size(); i++) {
if (result_objects[i] != nullptr) {
ready->insert(id_vector[i]);
}
}
return Status::OK();
@@ -29,9 +29,9 @@ class CoreWorkerMemoryStoreProvider : public CoreWorkerStoreProvider {
/// See `CoreWorkerStoreProvider::Wait` for semantics.
/// Note that `num_objects` must equal to number of items in `object_ids`.
Status Wait(const std::vector<ObjectID> &object_ids, int num_objects,
Status Wait(const std::unordered_set<ObjectID> &object_ids, int num_objects,
int64_t timeout_ms, const TaskID &task_id,
std::vector<bool> *results) override;
std::unordered_set<ObjectID> *ready) override;
/// See `CoreWorkerStoreProvider::Delete` for semantics.
/// Note that `local_only` must be true, and `delete_creating_tasks` must be false here.
@@ -150,27 +150,20 @@ Status CoreWorkerPlasmaStoreProvider::Get(
return raylet_client_->NotifyUnblocked(task_id);
}
Status CoreWorkerPlasmaStoreProvider::Wait(const std::vector<ObjectID> &object_ids,
Status CoreWorkerPlasmaStoreProvider::Wait(const std::unordered_set<ObjectID> &object_ids,
int num_objects, int64_t timeout_ms,
const TaskID &task_id,
std::vector<bool> *results) {
std::unordered_set<ObjectID> *ready) {
WaitResultPair result_pair;
auto status = raylet_client_->Wait(object_ids, num_objects, timeout_ms, false, task_id,
&result_pair);
std::unordered_set<ObjectID> ready_ids;
std::vector<ObjectID> id_vector(object_ids.begin(), object_ids.end());
RAY_RETURN_NOT_OK(raylet_client_->Wait(id_vector, num_objects, timeout_ms, false,
task_id, &result_pair));
for (const auto &entry : result_pair.first) {
ready_ids.insert(entry);
ready->insert(entry);
}
// TODO(zhijunfu): change RayletClient::Wait() to return a bit set, so that we don't
// need
// to do this translation.
(*results).resize(object_ids.size());
for (size_t i = 0; i < object_ids.size(); i++) {
(*results)[i] = ready_ids.count(object_ids[i]) > 0;
}
return status;
return Status::OK();
}
Status CoreWorkerPlasmaStoreProvider::Delete(const std::vector<ObjectID> &object_ids,
@@ -29,9 +29,9 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider {
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results) override;
/// See `CoreWorkerStoreProvider::Wait` for semantics.
Status Wait(const std::vector<ObjectID> &object_ids, int num_objects,
Status Wait(const std::unordered_set<ObjectID> &object_ids, int num_objects,
int64_t timeout_ms, const TaskID &task_id,
std::vector<bool> *results) override;
std::unordered_set<ObjectID> *ready) override;
/// See `CoreWorkerStoreProvider::Delete` for semantics.
Status Delete(const std::vector<ObjectID> &object_ids, bool local_only = true,
@@ -88,17 +88,19 @@ class CoreWorkerStoreProvider {
const TaskID &task_id,
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results) = 0;
/// Wait for a list of objects to appear in the object store.
/// Wait for a list of objects to appear in the object store. Objects that appear will
/// be added to the ready set.
///
/// \param[in] object_ids IDs of the objects to wait for.
/// \param[in] num_objects Number of objects that should appear before returning.
/// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative.
/// \param[in] task_id ID for the current task.
/// \param[out] results A bitset that indicates each object has appeared or not.
/// \param[out] ready IDs of objects that have appeared. Wait will only add to this
/// set, not clear or remove from it, so the caller can pass in a non-empty set.
/// \return Status.
virtual Status Wait(const std::vector<ObjectID> &object_ids, int num_objects,
virtual Status Wait(const std::unordered_set<ObjectID> &object_ids, int num_objects,
int64_t timeout_ms, const TaskID &task_id,
std::vector<bool> *results) = 0;
std::unordered_set<ObjectID> *ready) = 0;
/// Delete a list of objects from the object store.
///
+49 -21
View File
@@ -494,27 +494,21 @@ void CoreWorkerTest::TestStoreProvider(StoreProviderType type) {
RAY_CHECK_OK(provider.Put(buffers[i], ids[i]));
}
// Test Wait() with duplicate object ids.
std::vector<ObjectID> ids_with_duplicate;
ids_with_duplicate.insert(ids_with_duplicate.end(), ids.begin(), ids.end());
// add the same ids again to test `Get` with duplicate object ids.
ids_with_duplicate.insert(ids_with_duplicate.end(), ids.begin(), ids.end());
std::unordered_set<ObjectID> wait_ids(ids.begin(), ids.end());
std::unordered_set<ObjectID> wait_results;
std::vector<ObjectID> wait_ids(ids_with_duplicate);
ObjectID non_existent_id = ObjectID::FromRandom();
wait_ids.push_back(non_existent_id);
ObjectID nonexistent_id = ObjectID::FromRandom();
wait_ids.insert(nonexistent_id);
RAY_CHECK_OK(
provider.Wait(wait_ids, ids.size() + 1, 100, RandomTaskId(), &wait_results));
ASSERT_EQ(wait_results.size(), ids.size());
ASSERT_TRUE(wait_results.count(nonexistent_id) == 0);
std::vector<bool> wait_results;
RAY_CHECK_OK(provider.Wait(wait_ids, 5, 100, RandomTaskId(), &wait_results));
ASSERT_EQ(wait_results.size(), 5);
ASSERT_EQ(wait_results, std::vector<bool>({true, true, true, true, false}));
// Test Wait() with duplicate object ids, and the required `num_objects`
// is less than size of `wait_ids`.
// Test Wait() where the required `num_objects` is less than size of `wait_ids`.
wait_results.clear();
RAY_CHECK_OK(provider.Wait(wait_ids, 4, -1, RandomTaskId(), &wait_results));
ASSERT_EQ(wait_results.size(), 5);
ASSERT_EQ(wait_results, std::vector<bool>({true, true, true, true, false}));
RAY_CHECK_OK(provider.Wait(wait_ids, ids.size(), -1, RandomTaskId(), &wait_results));
ASSERT_EQ(wait_results.size(), ids.size());
ASSERT_TRUE(wait_results.count(nonexistent_id) == 0);
// Test Get().
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> results;
@@ -545,8 +539,11 @@ void CoreWorkerTest::TestStoreProvider(StoreProviderType type) {
ASSERT_EQ(results.size(), 0);
// Test Wait() with objects which will become ready later.
std::vector<ObjectID> ready_ids(buffers.size());
std::vector<ObjectID> unready_ids(buffers.size());
for (size_t i = 0; i < unready_ids.size(); i++) {
ready_ids[i] = ObjectID::FromRandom();
RAY_CHECK_OK(provider.Put(buffers[i], ready_ids[i]));
unready_ids[i] = ObjectID::FromRandom();
}
@@ -560,12 +557,43 @@ void CoreWorkerTest::TestStoreProvider(StoreProviderType type) {
std::thread async_thread(thread_func);
// wait for the objects to appear.
wait_ids.clear();
wait_ids.insert(ready_ids.begin(), ready_ids.end());
wait_ids.insert(unready_ids.begin(), unready_ids.end());
wait_results.clear();
// Check that only the ready ids are returned when timeout ends before thread runs.
RAY_CHECK_OK(
provider.Wait(unready_ids, unready_ids.size(), -1, RandomTaskId(), &wait_results));
// wait for the thread to finish.
provider.Wait(wait_ids, ready_ids.size() + 1, 100, RandomTaskId(), &wait_results));
ASSERT_EQ(ready_ids.size(), wait_results.size());
for (const auto &ready_id : ready_ids) {
ASSERT_TRUE(wait_results.find(ready_id) != wait_results.end());
}
for (const auto &unready_id : unready_ids) {
ASSERT_TRUE(wait_results.find(unready_id) == wait_results.end());
}
wait_results.clear();
// Check that enough objects are returned after the thread inserts at least one object.
RAY_CHECK_OK(
provider.Wait(wait_ids, ready_ids.size() + 1, 5000, RandomTaskId(), &wait_results));
ASSERT_TRUE(wait_results.size() >= ready_ids.size() + 1);
for (const auto &ready_id : ready_ids) {
ASSERT_TRUE(wait_results.find(ready_id) != wait_results.end());
}
wait_results.clear();
// Check that all objects are returned after the thread completes.
async_thread.join();
RAY_CHECK_OK(
provider.Wait(wait_ids, wait_ids.size(), -1, RandomTaskId(), &wait_results));
ASSERT_EQ(wait_results.size(), ready_ids.size() + unready_ids.size());
for (const auto &ready_id : ready_ids) {
ASSERT_TRUE(wait_results.find(ready_id) != wait_results.end());
}
for (const auto &unready_id : unready_ids) {
ASSERT_TRUE(wait_results.find(unready_id) != wait_results.end());
}
}
class ZeroNodeTest : public CoreWorkerTest {