diff --git a/python/ray/serialization.py b/python/ray/serialization.py
index c5d2f7293..39663e4e4 100644
--- a/python/ray/serialization.py
+++ b/python/ray/serialization.py
@@ -298,7 +298,10 @@ class SerializationContext(object):
             except pyarrow.DeserializationCallbackError:
                 raise DeserializationError()
         else:
-            # Object isn't available in plasma.
+            # Object isn't available in plasma. This should never be returned
+            # to the user. We should only reach this line if this object was
+            # deserialized as part of a list, and another object in the list
+            # throws an exception.
             return plasma.ObjectNotAvailable
 
     def _store_and_register_pyarrow(self, value, depth=100):
diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py
index 9d55133bb..d7ec7912a 100644
--- a/python/ray/tests/test_actor.py
+++ b/python/ray/tests/test_actor.py
@@ -2213,7 +2213,8 @@ def test_actor_eviction(ray_start_object_store_memory):
     num_evicted, num_success = 0, 0
     for obj in objects:
         try:
-            ray.get(obj)
+            val = ray.get(obj)
+            assert isinstance(val, np.ndarray), val
             num_success += 1
         except ray.exceptions.UnreconstructableError:
             num_evicted += 1
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index bab5e5fbc..6653b51d6 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -351,35 +351,37 @@ Status CoreWorker::Get(const std::vector<ObjectID> &ids, const int64_t timeout_m
                                          worker_context_, &result_map, &got_exception));
   }
 
-  // If any of the objects have been promoted to plasma, then we retry their
-  // gets at the provider plasma. Once we get the objects from plasma, we flip
-  // the transport type again and return them for the original direct call ids.
-  absl::flat_hash_set<ObjectID> promoted_plasma_ids;
-  for (const auto &pair : result_map) {
-    if (pair.second->IsInPlasmaError()) {
-      promoted_plasma_ids.insert(pair.first.WithTransportType(TaskTransportType::RAYLET));
-    }
-  }
-  if (!promoted_plasma_ids.empty()) {
-    int64_t local_timeout_ms = timeout_ms;
-    if (timeout_ms >= 0) {
-      local_timeout_ms = std::max(static_cast<int64_t>(0),
-                                  timeout_ms - (current_time_ms() - start_time));
-    }
-    RAY_RETURN_NOT_OK(plasma_store_provider_->Get(promoted_plasma_ids, local_timeout_ms,
-                                                  worker_context_.GetCurrentTaskID(),
-                                                  &result_map, &got_exception));
-    for (const auto &id : promoted_plasma_ids) {
-      auto it = result_map.find(id);
-      if (it == result_map.end()) {
-        result_map.erase(id.WithTransportType(TaskTransportType::DIRECT));
-      } else {
-        result_map[id.WithTransportType(TaskTransportType::DIRECT)] = it->second;
-      }
-      result_map.erase(id);
-    }
+  if (!got_exception) {
+    // If any of the objects have been promoted to plasma, then we retry their
+    // gets at the provider plasma. Once we get the objects from plasma, we flip
+    // the transport type again and return them for the original direct call ids.
+    absl::flat_hash_set<ObjectID> promoted_plasma_ids;
     for (const auto &pair : result_map) {
-      RAY_CHECK(!pair.second->IsInPlasmaError());
+      if (pair.second->IsInPlasmaError()) {
+        RAY_LOG(DEBUG) << pair.first << " in plasma, doing fetch-and-get";
+        promoted_plasma_ids.insert(
+            pair.first.WithTransportType(TaskTransportType::RAYLET));
+      }
+    }
+    if (!promoted_plasma_ids.empty()) {
+      int64_t local_timeout_ms = timeout_ms;
+      if (timeout_ms >= 0) {
+        local_timeout_ms = std::max(static_cast<int64_t>(0),
+                                    timeout_ms - (current_time_ms() - start_time));
+      }
+      RAY_LOG(DEBUG) << "Plasma GET timeout " << local_timeout_ms;
+      RAY_RETURN_NOT_OK(plasma_store_provider_->Get(promoted_plasma_ids, local_timeout_ms,
+                                                    worker_context_.GetCurrentTaskID(),
+                                                    &result_map, &got_exception));
+      for (const auto &id : promoted_plasma_ids) {
+        auto it = result_map.find(id);
+        if (it == result_map.end()) {
+          result_map.erase(id.WithTransportType(TaskTransportType::DIRECT));
+        } else {
+          result_map[id.WithTransportType(TaskTransportType::DIRECT)] = it->second;
+        }
+        result_map.erase(id);
+      }
     }
   }
 
@@ -387,11 +389,27 @@ Status CoreWorker::Get(const std::vector<ObjectID> &ids, const int64_t timeout_m
   // this ensures that entries `results` have exactly the same order as
   // they are in `ids`. When there are duplicate object ids, all the entries
   // for the same id are filled in.
+  bool missing_result = false;
+  bool will_throw_exception = false;
   for (size_t i = 0; i < ids.size(); i++) {
-    if (result_map.find(ids[i]) != result_map.end()) {
-      (*results)[i] = result_map[ids[i]];
+    auto pair = result_map.find(ids[i]);
+    if (pair != result_map.end()) {
+      (*results)[i] = pair->second;
+      RAY_CHECK(!pair->second->IsInPlasmaError());
+      if (pair->second->IsException()) {
+        // The language bindings should throw an exception if they see this
+        // object.
+        will_throw_exception = true;
+      }
+    } else {
+      missing_result = true;
     }
   }
+  // If no timeout was set (timeout_ms < 0) and none of the results will throw
+  // an exception, then check that we fetched all results before returning.
+  if (timeout_ms < 0 && !will_throw_exception) {
+    RAY_CHECK(!missing_result);
+  }
 
   return Status::OK();
 }
diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.cc b/src/ray/core_worker/store_provider/memory_store/memory_store.cc
index e8525f788..575d03de2 100644
--- a/src/ray/core_worker/store_provider/memory_store/memory_store.cc
+++ b/src/ray/core_worker/store_provider/memory_store/memory_store.cc
@@ -312,6 +312,52 @@ Status CoreWorkerMemoryStore::Get(const std::vector<ObjectID> &object_ids,
   }
 }
 
+Status CoreWorkerMemoryStore::Get(
+    const absl::flat_hash_set<ObjectID> &object_ids, int64_t timeout_ms,
+    const WorkerContext &ctx,
+    absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> *results,
+    bool *got_exception) {
+  const std::vector<ObjectID> id_vector(object_ids.begin(), object_ids.end());
+  std::vector<std::shared_ptr<RayObject>> result_objects;
+  RAY_RETURN_NOT_OK(
+      Get(id_vector, id_vector.size(), timeout_ms, ctx, true, &result_objects));
+
+  for (size_t i = 0; i < id_vector.size(); i++) {
+    if (result_objects[i] != nullptr) {
+      (*results)[id_vector[i]] = result_objects[i];
+      if (result_objects[i]->IsException() && !result_objects[i]->IsInPlasmaError()) {
+        // Can return early if an object value contains an exception.
+        // InPlasmaError does not count as an exception because the object
+        // value should then be found in plasma.
+        *got_exception = true;
+      }
+    }
+  }
+  return Status::OK();
+}
+
+Status CoreWorkerMemoryStore::Wait(const absl::flat_hash_set<ObjectID> &object_ids,
+                                   int num_objects, int64_t timeout_ms,
+                                   const WorkerContext &ctx,
+                                   absl::flat_hash_set<ObjectID> *ready) {
+  std::vector<ObjectID> id_vector(object_ids.begin(), object_ids.end());
+  std::vector<std::shared_ptr<RayObject>> result_objects;
+  RAY_CHECK(object_ids.size() == id_vector.size());
+  auto status = Get(id_vector, num_objects, timeout_ms, ctx, false, &result_objects);
+  // Ignore TimedOut statuses since we return ready objects explicitly.
+  if (!status.IsTimedOut()) {
+    RAY_RETURN_NOT_OK(status);
+  }
+
+  for (size_t i = 0; i < id_vector.size(); i++) {
+    if (result_objects[i] != nullptr) {
+      ready->insert(id_vector[i]);
+    }
+  }
+
+  return Status::OK();
+}
+
 void CoreWorkerMemoryStore::Delete(const absl::flat_hash_set<ObjectID> &object_ids) {
   absl::MutexLock lock(&mu_);
   for (const auto &object_id : object_ids) {
diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.h b/src/ray/core_worker/store_provider/memory_store/memory_store.h
index ab6fe592a..71c1bed03 100644
--- a/src/ray/core_worker/store_provider/memory_store/memory_store.h
+++ b/src/ray/core_worker/store_provider/memory_store/memory_store.h
@@ -57,44 +57,12 @@ class CoreWorkerMemoryStore {
   Status Get(const absl::flat_hash_set<ObjectID> &object_ids, int64_t timeout_ms,
              const WorkerContext &ctx,
              absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> *results,
-             bool *got_exception) {
-    const std::vector<ObjectID> id_vector(object_ids.begin(), object_ids.end());
-    std::vector<std::shared_ptr<RayObject>> result_objects;
-    RAY_RETURN_NOT_OK(
-        Get(id_vector, id_vector.size(), timeout_ms, ctx, true, &result_objects));
-
-    for (size_t i = 0; i < id_vector.size(); i++) {
-      if (result_objects[i] != nullptr) {
-        (*results)[id_vector[i]] = result_objects[i];
-        if (result_objects[i]->IsException()) {
-          *got_exception = true;
-        }
-      }
-    }
-    return Status::OK();
-  }
+             bool *got_exception);
 
   /// Convenience wrapper around Get() that stores ready objects in a given result set.
   Status Wait(const absl::flat_hash_set<ObjectID> &object_ids, int num_objects,
               int64_t timeout_ms, const WorkerContext &ctx,
-              absl::flat_hash_set<ObjectID> *ready) {
-    std::vector<ObjectID> id_vector(object_ids.begin(), object_ids.end());
-    std::vector<std::shared_ptr<RayObject>> result_objects;
-    RAY_CHECK(object_ids.size() == id_vector.size());
-    auto status = Get(id_vector, num_objects, timeout_ms, ctx, false, &result_objects);
-    // Ignore TimedOut statuses since we return ready objects explicitly.
-    if (!status.IsTimedOut()) {
-      RAY_RETURN_NOT_OK(status);
-    }
-
-    for (size_t i = 0; i < id_vector.size(); i++) {
-      if (result_objects[i] != nullptr) {
-        ready->insert(id_vector[i]);
-      }
-    }
-
-    return Status::OK();
-  }
+              absl::flat_hash_set<ObjectID> *ready);
 
   /// Asynchronously get an object from the object store. The object will not be removed
   /// from storage after GetAsync (TODO(ekl): integrate this with object GC).
diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc
index 9df34f040..611856990 100644
--- a/src/ray/core_worker/store_provider/plasma_store_provider.cc
+++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc
@@ -117,6 +117,7 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore(
       (*results)[object_id] = result_object;
       remaining.erase(object_id);
       if (result_object->IsException()) {
+        RAY_CHECK(!result_object->IsInPlasmaError());
         *got_exception = true;
       }
     }