From 3d473600a87aa258486276f17a0e8ea784db0c04 Mon Sep 17 00:00:00 2001 From: "Siyuan (Ryans) Zhuang" Date: Tue, 9 Jun 2020 10:10:49 -0700 Subject: [PATCH] [Core] Use Ray ObjectID in Plasma (#8852) * Use Ray ObjectIDs instead * remove unused code --- BUILD.bazel | 14 +++- src/ray/common/buffer.h | 6 +- src/ray/common/constants.h | 3 - src/ray/common/id.cc | 22 +---- src/ray/common/id.h | 11 --- .../store_provider/plasma_store_provider.cc | 18 ++--- src/ray/object_manager/object_buffer_pool.cc | 27 +++---- src/ray/object_manager/object_manager.cc | 3 +- .../object_store_notification_manager.cc | 12 ++- .../object_store_notification_manager.h | 2 - src/ray/object_manager/plasma/client.cc | 2 +- src/ray/object_manager/plasma/common.cc | 80 ------------------- src/ray/object_manager/plasma/common.h | 31 +------ ...org_apache_arrow_plasma_PlasmaClientJNI.cc | 2 +- src/ray/object_manager/plasma/protocol.cc | 70 ++++++++-------- src/ray/object_manager/plasma/store.cc | 14 ++-- .../test/object_manager_stress_test.cc | 8 +- .../test/object_manager_test.cc | 4 +- src/ray/raylet/node_manager.cc | 25 +++--- src/ray/raylet/node_manager.h | 3 +- .../raylet/object_manager_integration_test.cc | 4 +- 21 files changed, 101 insertions(+), 260 deletions(-) diff --git a/BUILD.bazel b/BUILD.bazel index 9c87666b8..fe69d9cad 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -295,14 +295,19 @@ cc_library( "@bazel_tools//src/conditions:windows": PROPAGATED_WINDOWS_DEFINES, "//conditions:default": [], }), + includes = [ + "src", + ], linkopts = PLASMA_LINKOPTS, - strip_include_prefix = "src", deps = [ ":common_fbs", ":plasma_fbs", ":platform_shims", + ":ray_common", + ":ray_util", "@arrow", "@com_github_google_glog//:glog", + "@msgpack", ], ) @@ -389,8 +394,10 @@ cc_library( "src/ray/thirdparty/dlmalloc.c", ], copts = PLASMA_COPTS, + includes = [ + "src", + ], linkopts = PLASMA_LINKOPTS, - strip_include_prefix = "src", deps = [ ":ae", ":plasma_client", @@ -459,8 +466,8 @@ cc_library( ":common_cc_proto", ":gcs_cc_proto", ":node_manager_fbs", - ":plasma_client", ":ray_util", + "@arrow", "@boost//:asio", "@com_github_grpc_grpc//:grpc++", "@com_google_absl//absl/container:flat_hash_map", @@ -1270,7 +1277,6 @@ cc_library( ], visibility = ["//visibility:public"], deps = [ - ":plasma_client", ":sha256", "@boost//:asio", "@com_github_google_glog//:glog", diff --git a/src/ray/common/buffer.h b/src/ray/common/buffer.h index 94c630855..3ccda8fc1 100644 --- a/src/ray/common/buffer.h +++ b/src/ray/common/buffer.h @@ -18,12 +18,8 @@ #include #include +#include "arrow/buffer.h" #include "ray/common/status.h" -#include "ray/object_manager/plasma/client.h" - -namespace arrow { -class Buffer; -} namespace ray { diff --git a/src/ray/common/constants.h b/src/ray/common/constants.h index 096857e32..5b3d27a1b 100644 --- a/src/ray/common/constants.h +++ b/src/ray/common/constants.h @@ -21,9 +21,6 @@ /// Length of Ray full-length IDs in bytes. constexpr size_t kUniqueIDSize = 20; -/// Length of plasma ID in bytes. -constexpr size_t kPlasmaIdSize = 20; - /// An ObjectID's bytes are split into the task ID itself and the index of the /// object's creation. This is the maximum width of the object index in bits. constexpr int kObjectIdIndexSize = 32; diff --git a/src/ray/common/id.cc b/src/ray/common/id.cc index 22939a21e..c3487f4b1 100644 --- a/src/ray/common/id.cc +++ b/src/ray/common/id.cc @@ -108,32 +108,12 @@ WorkerID ComputeDriverIdFromJob(const JobID &job_id) { std::string(reinterpret_cast(data.data()), data.size())); } -ObjectID ObjectID::FromPlasmaIdBinary(const std::string &from) { - RAY_CHECK(from.size() == kPlasmaIdSize); - return ObjectID::FromBinary(from.substr(0, ObjectID::kLength)); -} - -plasma::UniqueID ObjectID::ToPlasmaId() const { - static_assert(ObjectID::kLength <= kPlasmaIdSize, - "Currently length of ObjectID must be shorter than plasma's."); - - plasma::UniqueID result; - std::memcpy(result.mutable_data(), Data(), ObjectID::Size()); - std::fill_n(result.mutable_data() + ObjectID::Size(), kPlasmaIdSize - ObjectID::kLength, - 0xFF); - return result; -} - -ObjectID::ObjectID(const plasma::UniqueID &from) { - RAY_CHECK(from.size() <= static_cast(ObjectID::Size())) << "Out of size."; - std::memcpy(this->MutableData(), from.data(), ObjectID::Size()); -} - ObjectIDFlagsType ObjectID::GetFlags() const { ObjectIDFlagsType flags; std::memcpy(&flags, id_ + TaskID::kLength, sizeof(flags)); return flags; } + bool ObjectID::CreatedByTask() const { return ::ray::CreatedByTask(this->GetFlags()); } bool ObjectID::IsPutObject() const { diff --git a/src/ray/common/id.h b/src/ray/common/id.h index ab222f3c9..71047ab6f 100644 --- a/src/ray/common/id.h +++ b/src/ray/common/id.h @@ -26,7 +26,6 @@ #include #include "ray/common/constants.h" -#include "ray/object_manager/plasma/common.h" #include "ray/util/logging.h" #include "ray/util/util.h" #include "ray/util/visibility.h" @@ -274,16 +273,6 @@ class ObjectID : public BaseID { static size_t Size() { return kLength; } - /// Generate ObjectID by the given binary string of a plasma id. - /// - /// \param from The binary string of the given plasma id. - /// \return The ObjectID converted from a binary string of the plasma id. - static ObjectID FromPlasmaIdBinary(const std::string &from); - - plasma::ObjectID ToPlasmaId() const; - - ObjectID(const plasma::UniqueID &from); - /// Get the index of this object in the task that created it. /// /// \return The index of object creation according to the task that created diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index 0fc6792a7..23a1cc045 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -92,7 +92,7 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr &meta std::shared_ptr arrow_buffer; { std::lock_guard guard(store_client_mutex_); - plasma_status = store_client_.Create(object_id.ToPlasmaId(), data_size, + plasma_status = store_client_.Create(object_id, data_size, metadata ? metadata->Data() : nullptr, metadata ? metadata->Size() : 0, &arrow_buffer, /*device_num=*/0, evict_if_full); @@ -136,19 +136,17 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr &meta } Status CoreWorkerPlasmaStoreProvider::Seal(const ObjectID &object_id) { - auto plasma_id = object_id.ToPlasmaId(); { std::lock_guard guard(store_client_mutex_); - RAY_ARROW_RETURN_NOT_OK(store_client_.Seal(plasma_id)); + RAY_ARROW_RETURN_NOT_OK(store_client_.Seal(object_id)); } return Status::OK(); } Status CoreWorkerPlasmaStoreProvider::Release(const ObjectID &object_id) { - auto plasma_id = object_id.ToPlasmaId(); { std::lock_guard guard(store_client_mutex_); - RAY_ARROW_RETURN_NOT_OK(store_client_.Release(plasma_id)); + RAY_ARROW_RETURN_NOT_OK(store_client_.Release(object_id)); } return Status::OK(); } @@ -161,16 +159,10 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore( RAY_RETURN_NOT_OK(raylet_client_->FetchOrReconstruct( batch_ids, fetch_only, /*mark_worker_blocked*/ !in_direct_call, task_id)); - std::vector plasma_batch_ids; - plasma_batch_ids.reserve(batch_ids.size()); - for (size_t i = 0; i < batch_ids.size(); i++) { - plasma_batch_ids.push_back(batch_ids[i].ToPlasmaId()); - } std::vector plasma_results; { std::lock_guard guard(store_client_mutex_); - RAY_ARROW_RETURN_NOT_OK( - store_client_.Get(plasma_batch_ids, timeout_ms, &plasma_results)); + RAY_ARROW_RETURN_NOT_OK(store_client_.Get(batch_ids, timeout_ms, &plasma_results)); } // Add successfully retrieved objects to the result map and remove them from @@ -319,7 +311,7 @@ Status CoreWorkerPlasmaStoreProvider::Get( Status CoreWorkerPlasmaStoreProvider::Contains(const ObjectID &object_id, bool *has_object) { std::lock_guard guard(store_client_mutex_); - RAY_ARROW_RETURN_NOT_OK(store_client_.Contains(object_id.ToPlasmaId(), has_object)); + RAY_ARROW_RETURN_NOT_OK(store_client_.Contains(object_id, has_object)); return Status::OK(); } diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index 06c16adb0..717cca08a 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -57,8 +57,7 @@ std::pair ObjectBufferPool::Ge std::lock_guard lock(pool_mutex_); if (get_buffer_state_.count(object_id) == 0) { plasma::ObjectBuffer object_buffer; - plasma::ObjectID plasma_id = object_id.ToPlasmaId(); - RAY_ARROW_CHECK_OK(store_client_.Get(&plasma_id, 1, 0, &object_buffer)); + RAY_ARROW_CHECK_OK(store_client_.Get(&object_id, 1, 0, &object_buffer)); if (object_buffer.data == nullptr) { RAY_LOG(ERROR) << "Failed to get object"; return std::pair( @@ -86,14 +85,14 @@ void ObjectBufferPool::ReleaseGetChunk(const ObjectID &object_id, uint64_t chunk GetBufferState &buffer_state = get_buffer_state_[object_id]; buffer_state.references--; if (buffer_state.references == 0) { - RAY_ARROW_CHECK_OK(store_client_.Release(object_id.ToPlasmaId())); + RAY_ARROW_CHECK_OK(store_client_.Release(object_id)); get_buffer_state_.erase(object_id); } } void ObjectBufferPool::AbortGet(const ObjectID &object_id) { std::lock_guard lock(pool_mutex_); - RAY_ARROW_CHECK_OK(store_client_.Release(object_id.ToPlasmaId())); + RAY_ARROW_CHECK_OK(store_client_.Release(object_id)); get_buffer_state_.erase(object_id); } @@ -102,12 +101,11 @@ std::pair ObjectBufferPool::Cr uint64_t chunk_index) { std::lock_guard lock(pool_mutex_); if (create_buffer_state_.count(object_id) == 0) { - const plasma::ObjectID plasma_id = object_id.ToPlasmaId(); int64_t object_size = data_size - metadata_size; // Try to create shared buffer. std::shared_ptr data; arrow::Status s = - store_client_.Create(plasma_id, object_size, NULL, metadata_size, &data); + store_client_.Create(object_id, object_size, NULL, metadata_size, &data); std::vector buffer; if (!s.ok()) { // Create failed. The object may already exist locally. If something else went @@ -167,9 +165,8 @@ void ObjectBufferPool::SealChunk(const ObjectID &object_id, const uint64_t chunk create_buffer_state_[object_id].chunk_state[chunk_index] = CreateChunkState::SEALED; create_buffer_state_[object_id].num_seals_remaining--; if (create_buffer_state_[object_id].num_seals_remaining == 0) { - const plasma::ObjectID plasma_id = object_id.ToPlasmaId(); - RAY_ARROW_CHECK_OK(store_client_.Seal(plasma_id)); - RAY_ARROW_CHECK_OK(store_client_.Release(plasma_id)); + RAY_ARROW_CHECK_OK(store_client_.Seal(object_id)); + RAY_ARROW_CHECK_OK(store_client_.Release(object_id)); create_buffer_state_.erase(object_id); RAY_LOG(DEBUG) << "Have received all chunks for object " << object_id << ", last chunk index: " << chunk_index; @@ -177,9 +174,8 @@ void ObjectBufferPool::SealChunk(const ObjectID &object_id, const uint64_t chunk } void ObjectBufferPool::AbortCreate(const ObjectID &object_id) { - const plasma::ObjectID plasma_id = object_id.ToPlasmaId(); - RAY_ARROW_CHECK_OK(store_client_.Release(plasma_id)); - RAY_ARROW_CHECK_OK(store_client_.Abort(plasma_id)); + RAY_ARROW_CHECK_OK(store_client_.Release(object_id)); + RAY_ARROW_CHECK_OK(store_client_.Abort(object_id)); create_buffer_state_.erase(object_id); } @@ -202,13 +198,8 @@ std::vector ObjectBufferPool::BuildChunks( } void ObjectBufferPool::FreeObjects(const std::vector &object_ids) { - std::vector plasma_ids; - plasma_ids.reserve(object_ids.size()); - for (const auto &id : object_ids) { - plasma_ids.push_back(id.ToPlasmaId()); - } std::lock_guard lock(pool_mutex_); - RAY_ARROW_CHECK_OK(store_client_.Delete(plasma_ids)); + RAY_ARROW_CHECK_OK(store_client_.Delete(object_ids)); } std::string ObjectBufferPool::DebugString() const { diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 2b990b3b0..4e7aff038 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "ray/object_manager/object_manager.h" + #include "ray/common/common_protocol.h" #include "ray/stats/stats.h" #include "ray/util/util.h" @@ -74,7 +75,7 @@ void ObjectManager::StopRpcService() { void ObjectManager::HandleObjectAdded( const object_manager::protocol::ObjectInfoT &object_info) { // Notify the object directory that the object has been added to this node. - ObjectID object_id = ObjectID::FromPlasmaIdBinary(object_info.object_id); + ObjectID object_id = ObjectID::FromBinary(object_info.object_id); RAY_LOG(DEBUG) << "Object added " << object_id; RAY_CHECK(local_objects_.count(object_id) == 0); local_objects_[object_id].object_info = object_info; diff --git a/src/ray/object_manager/object_store_notification_manager.cc b/src/ray/object_manager/object_store_notification_manager.cc index 79301e108..9d612b4c1 100644 --- a/src/ray/object_manager/object_store_notification_manager.cc +++ b/src/ray/object_manager/object_store_notification_manager.cc @@ -12,17 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include -#include +#include "ray/object_manager/object_store_notification_manager.h" #include #include #include - -#include "ray/common/status.h" +#include +#include #include "ray/common/common_protocol.h" -#include "ray/object_manager/object_store_notification_manager.h" +#include "ray/common/status.h" #include "ray/util/util.h" #ifdef _WIN32 @@ -141,8 +140,7 @@ void ObjectStoreNotificationManager::ProcessStoreNotification( notification_.data()); for (size_t i = 0; i < object_notification->object_info()->size(); ++i) { auto object_info = object_notification->object_info()->Get(i); - const ObjectID object_id = - ObjectID::FromPlasmaIdBinary(object_info->object_id()->str()); + const ObjectID object_id = ObjectID::FromBinary(object_info->object_id()->str()); if (object_info->is_deletion()) { ProcessStoreRemove(object_id); } else { diff --git a/src/ray/object_manager/object_store_notification_manager.h b/src/ray/object_manager/object_store_notification_manager.h index 11e351dcc..af9ec010c 100644 --- a/src/ray/object_manager/object_store_notification_manager.h +++ b/src/ray/object_manager/object_store_notification_manager.h @@ -23,8 +23,6 @@ #include #include -#include "ray/object_manager/plasma/client.h" - #include "ray/common/client_connection.h" #include "ray/common/id.h" #include "ray/common/status.h" diff --git a/src/ray/object_manager/plasma/client.cc b/src/ray/object_manager/plasma/client.cc index 2ae38e57c..6a0d6d3e3 100644 --- a/src/ray/object_manager/plasma/client.cc +++ b/src/ray/object_manager/plasma/client.cc @@ -1069,7 +1069,7 @@ Status PlasmaClient::Impl::DecodeNotifications(const uint8_t* buffer, for (size_t i = 0; i < object_info->object_info()->size(); ++i) { auto info = object_info->object_info()->Get(i); - ObjectID id = ObjectID::from_binary(info->object_id()->str()); + ObjectID id = ObjectID::FromBinary(info->object_id()->str()); object_ids->push_back(id); if (info->is_deletion()) { data_sizes->push_back(-1); diff --git a/src/ray/object_manager/plasma/common.cc b/src/ray/object_manager/plasma/common.cc index 7dd45f01f..84602507f 100644 --- a/src/ray/object_manager/plasma/common.cc +++ b/src/ray/object_manager/plasma/common.cc @@ -110,86 +110,6 @@ bool IsPlasmaStoreFull(const arrow::Status& status) { return IsPlasmaStatus(status, PlasmaErrorCode::PlasmaStoreFull); } -UniqueID UniqueID::from_binary(const std::string& binary) { - UniqueID id; - std::memcpy(&id, binary.data(), sizeof(id)); - return id; -} - -const uint8_t* UniqueID::data() const { return id_; } - -uint8_t* UniqueID::mutable_data() { return id_; } - -std::string UniqueID::binary() const { - return std::string(reinterpret_cast(id_), kUniqueIDSize); -} - -std::string UniqueID::hex() const { - constexpr char hex[] = "0123456789abcdef"; - std::string result; - for (int i = 0; i < kUniqueIDSize; i++) { - unsigned int val = id_[i]; - result.push_back(hex[val >> 4]); - result.push_back(hex[val & 0xf]); - } - return result; -} - -// This code is from https://sites.google.com/site/murmurhash/ -// and is public domain. -uint64_t MurmurHash64A(const void* key, int len, unsigned int seed) { - const uint64_t m = 0xc6a4a7935bd1e995; - const int r = 47; - - uint64_t h = seed ^ (len * m); - - const uint64_t* data = reinterpret_cast(key); - const uint64_t* end = data + (len / 8); - - while (data != end) { - uint64_t k = arrow::util::SafeLoad(data++); - - k *= m; - k ^= k >> r; - k *= m; - - h ^= k; - h *= m; - } - - const unsigned char* data2 = reinterpret_cast(data); - - switch (len & 7) { - case 7: - h ^= uint64_t(data2[6]) << 48; // fall through - case 6: - h ^= uint64_t(data2[5]) << 40; // fall through - case 5: - h ^= uint64_t(data2[4]) << 32; // fall through - case 4: - h ^= uint64_t(data2[3]) << 24; // fall through - case 3: - h ^= uint64_t(data2[2]) << 16; // fall through - case 2: - h ^= uint64_t(data2[1]) << 8; // fall through - case 1: - h ^= uint64_t(data2[0]); - h *= m; - } - - h ^= h >> r; - h *= m; - h ^= h >> r; - - return h; -} - -size_t UniqueID::hash() const { return MurmurHash64A(&id_[0], kUniqueIDSize, 0); } - -bool UniqueID::operator==(const UniqueID& rhs) const { - return std::memcmp(data(), rhs.data(), kUniqueIDSize) == 0; -} - const PlasmaStoreInfo* plasma_config; } // namespace plasma diff --git a/src/ray/object_manager/plasma/common.h b/src/ray/object_manager/plasma/common.h index d77b177ae..c72a139c6 100644 --- a/src/ray/object_manager/plasma/common.h +++ b/src/ray/object_manager/plasma/common.h @@ -24,6 +24,7 @@ #include #include +#include "ray/common/id.h" #include "ray/object_manager/plasma/compat.h" #include "arrow/status.h" @@ -33,6 +34,8 @@ namespace plasma { +using ray::ObjectID; + enum class ObjectLocation : int32_t { Local, Remote, Nonexistent }; enum class PlasmaErrorCode : int8_t { @@ -52,27 +55,6 @@ ARROW_EXPORT bool IsPlasmaObjectAlreadySealed(const arrow::Status& status); /// Return true iff the status indicates the Plasma store reached its capacity limit. ARROW_EXPORT bool IsPlasmaStoreFull(const arrow::Status& status); -constexpr int64_t kUniqueIDSize = 20; - -class ARROW_EXPORT UniqueID { - public: - static UniqueID from_binary(const std::string& binary); - bool operator==(const UniqueID& rhs) const; - const uint8_t* data() const; - uint8_t* mutable_data(); - std::string binary() const; - std::string hex() const; - size_t hash() const; - static int64_t size() { return kUniqueIDSize; } - - private: - uint8_t id_[kUniqueIDSize]; -}; - -static_assert(std::is_pod::value, "UniqueID must be plain old data"); - -typedef UniqueID ObjectID; - /// Size of object hash digests. constexpr int64_t kDigestSize = sizeof(uint64_t); @@ -142,11 +124,4 @@ struct PlasmaStoreInfo; extern const PlasmaStoreInfo* plasma_config; } // namespace plasma -namespace std { -template <> -struct hash<::plasma::UniqueID> { - size_t operator()(const ::plasma::UniqueID& id) const { return id.hash(); } -}; -} // namespace std - #endif // PLASMA_COMMON_H diff --git a/src/ray/object_manager/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc b/src/ray/object_manager/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc index 3ff34069c..d2bc0c277 100644 --- a/src/ray/object_manager/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc +++ b/src/ray/object_manager/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc @@ -114,7 +114,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_create( if (plasma::IsPlasmaObjectExists(s)) { jclass exceptionClass = env->FindClass("org/apache/arrow/plasma/exceptions/DuplicateObjectException"); - env->ThrowNew(exceptionClass, oid.hex().c_str()); + env->ThrowNew(exceptionClass, oid.Hex().c_str()); return nullptr; } if (plasma::IsPlasmaStoreFull(s)) { diff --git a/src/ray/object_manager/plasma/protocol.cc b/src/ray/object_manager/plasma/protocol.cc index 982627bd3..94c79ebb9 100644 --- a/src/ray/object_manager/plasma/protocol.cc +++ b/src/ray/object_manager/plasma/protocol.cc @@ -48,7 +48,7 @@ ToFlatbuffer(flatbuffers::FlatBufferBuilder* fbb, const ObjectID* object_ids, int64_t num_objects) { std::vector> results; for (int64_t i = 0; i < num_objects; i++) { - results.push_back(fbb->CreateString(object_ids[i].binary())); + results.push_back(fbb->CreateString(object_ids[i].Binary())); } return fbb->CreateVector(arrow::util::MakeNonNull(results.data()), results.size()); } @@ -187,7 +187,7 @@ Status SendCreateRequest(int sock, ObjectID object_id, bool evict_if_full, int64_t data_size, int64_t metadata_size, int device_num) { flatbuffers::FlatBufferBuilder fbb; auto message = - fb::CreatePlasmaCreateRequest(fbb, fbb.CreateString(object_id.binary()), + fb::CreatePlasmaCreateRequest(fbb, fbb.CreateString(object_id.Binary()), evict_if_full, data_size, metadata_size, device_num); return PlasmaSend(sock, MessageType::PlasmaCreateRequest, &fbb, message); } @@ -201,7 +201,7 @@ Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id, *evict_if_full = message->evict_if_full(); *data_size = message->data_size(); *metadata_size = message->metadata_size(); - *object_id = ObjectID::from_binary(message->object_id()->str()); + *object_id = ObjectID::FromBinary(message->object_id()->str()); *device_num = message->device_num(); return Status::OK(); } @@ -212,7 +212,7 @@ Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, PlasmaObjectSpec plasma_object(object->store_fd, object->data_offset, object->data_size, object->metadata_offset, object->metadata_size, object->device_num); - auto object_string = fbb.CreateString(object_id.binary()); + auto object_string = fbb.CreateString(object_id.Binary()); #ifdef PLASMA_CUDA flatbuffers::Offset ipc_handle; if (object->device_num != 0) { @@ -244,7 +244,7 @@ Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id, DCHECK(data); auto message = flatbuffers::GetRoot(data); DCHECK(VerifyFlatbuffer(message, data, size)); - *object_id = ObjectID::from_binary(message->object_id()->str()); + *object_id = ObjectID::FromBinary(message->object_id()->str()); object->store_fd = message->plasma_object()->segment_index(); object->data_offset = message->plasma_object()->data_offset(); object->data_size = message->plasma_object()->data_size(); @@ -271,7 +271,7 @@ Status SendCreateAndSealRequest(int sock, const ObjectID& object_id, bool evict_ flatbuffers::FlatBufferBuilder fbb; auto digest_string = fbb.CreateString(reinterpret_cast(digest), kDigestSize); auto message = fb::CreatePlasmaCreateAndSealRequest( - fbb, fbb.CreateString(object_id.binary()), evict_if_full, fbb.CreateString(data), + fbb, fbb.CreateString(object_id.Binary()), evict_if_full, fbb.CreateString(data), fbb.CreateString(metadata), digest_string); return PlasmaSend(sock, MessageType::PlasmaCreateAndSealRequest, &fbb, message); } @@ -283,7 +283,7 @@ Status ReadCreateAndSealRequest(uint8_t* data, size_t size, ObjectID* object_id, auto message = flatbuffers::GetRoot(data); DCHECK(VerifyFlatbuffer(message, data, size)); - *object_id = ObjectID::from_binary(message->object_id()->str()); + *object_id = ObjectID::FromBinary(message->object_id()->str()); *evict_if_full = message->evict_if_full(); *object_data = message->data()->str(); *metadata = message->metadata()->str(); @@ -320,7 +320,7 @@ Status ReadCreateAndSealBatchRequest(uint8_t* data, size_t size, *evict_if_full = message->evict_if_full(); ConvertToVector(message->object_ids(), object_ids, [](const flatbuffers::String& element) { - return ObjectID::from_binary(element.str()); + return ObjectID::FromBinary(element.str()); }); ConvertToVector(message->data(), object_data, @@ -364,7 +364,7 @@ Status ReadCreateAndSealBatchReply(uint8_t* data, size_t size) { Status SendAbortRequest(int sock, ObjectID object_id) { flatbuffers::FlatBufferBuilder fbb; - auto message = fb::CreatePlasmaAbortRequest(fbb, fbb.CreateString(object_id.binary())); + auto message = fb::CreatePlasmaAbortRequest(fbb, fbb.CreateString(object_id.Binary())); return PlasmaSend(sock, MessageType::PlasmaAbortRequest, &fbb, message); } @@ -372,13 +372,13 @@ Status ReadAbortRequest(uint8_t* data, size_t size, ObjectID* object_id) { DCHECK(data); auto message = flatbuffers::GetRoot(data); DCHECK(VerifyFlatbuffer(message, data, size)); - *object_id = ObjectID::from_binary(message->object_id()->str()); + *object_id = ObjectID::FromBinary(message->object_id()->str()); return Status::OK(); } Status SendAbortReply(int sock, ObjectID object_id) { flatbuffers::FlatBufferBuilder fbb; - auto message = fb::CreatePlasmaAbortReply(fbb, fbb.CreateString(object_id.binary())); + auto message = fb::CreatePlasmaAbortReply(fbb, fbb.CreateString(object_id.Binary())); return PlasmaSend(sock, MessageType::PlasmaAbortReply, &fbb, message); } @@ -386,7 +386,7 @@ Status ReadAbortReply(uint8_t* data, size_t size, ObjectID* object_id) { DCHECK(data); auto message = flatbuffers::GetRoot(data); DCHECK(VerifyFlatbuffer(message, data, size)); - *object_id = ObjectID::from_binary(message->object_id()->str()); + *object_id = ObjectID::FromBinary(message->object_id()->str()); return Status::OK(); } @@ -394,7 +394,7 @@ Status ReadAbortReply(uint8_t* data, size_t size, ObjectID* object_id) { Status SendSealRequest(int sock, ObjectID object_id, const std::string& digest) { flatbuffers::FlatBufferBuilder fbb; - auto message = fb::CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.binary()), + auto message = fb::CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.Binary()), fbb.CreateString(digest)); return PlasmaSend(sock, MessageType::PlasmaSealRequest, &fbb, message); } @@ -404,7 +404,7 @@ Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id, DCHECK(data); auto message = flatbuffers::GetRoot(data); DCHECK(VerifyFlatbuffer(message, data, size)); - *object_id = ObjectID::from_binary(message->object_id()->str()); + *object_id = ObjectID::FromBinary(message->object_id()->str()); ARROW_CHECK_EQ(message->digest()->size(), kDigestSize); digest->assign(message->digest()->data(), kDigestSize); return Status::OK(); @@ -413,7 +413,7 @@ Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id, Status SendSealReply(int sock, ObjectID object_id, PlasmaError error) { flatbuffers::FlatBufferBuilder fbb; auto message = - fb::CreatePlasmaSealReply(fbb, fbb.CreateString(object_id.binary()), error); + fb::CreatePlasmaSealReply(fbb, fbb.CreateString(object_id.Binary()), error); return PlasmaSend(sock, MessageType::PlasmaSealReply, &fbb, message); } @@ -421,7 +421,7 @@ Status ReadSealReply(uint8_t* data, size_t size, ObjectID* object_id) { DCHECK(data); auto message = flatbuffers::GetRoot(data); DCHECK(VerifyFlatbuffer(message, data, size)); - *object_id = ObjectID::from_binary(message->object_id()->str()); + *object_id = ObjectID::FromBinary(message->object_id()->str()); return PlasmaErrorStatus(message->error()); } @@ -430,7 +430,7 @@ Status ReadSealReply(uint8_t* data, size_t size, ObjectID* object_id) { Status SendReleaseRequest(int sock, ObjectID object_id) { flatbuffers::FlatBufferBuilder fbb; auto message = - fb::CreatePlasmaReleaseRequest(fbb, fbb.CreateString(object_id.binary())); + fb::CreatePlasmaReleaseRequest(fbb, fbb.CreateString(object_id.Binary())); return PlasmaSend(sock, MessageType::PlasmaReleaseRequest, &fbb, message); } @@ -438,14 +438,14 @@ Status ReadReleaseRequest(uint8_t* data, size_t size, ObjectID* object_id) { DCHECK(data); auto message = flatbuffers::GetRoot(data); DCHECK(VerifyFlatbuffer(message, data, size)); - *object_id = ObjectID::from_binary(message->object_id()->str()); + *object_id = ObjectID::FromBinary(message->object_id()->str()); return Status::OK(); } Status SendReleaseReply(int sock, ObjectID object_id, PlasmaError error) { flatbuffers::FlatBufferBuilder fbb; auto message = - fb::CreatePlasmaReleaseReply(fbb, fbb.CreateString(object_id.binary()), error); + fb::CreatePlasmaReleaseReply(fbb, fbb.CreateString(object_id.Binary()), error); return PlasmaSend(sock, MessageType::PlasmaReleaseReply, &fbb, message); } @@ -453,7 +453,7 @@ Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id) { DCHECK(data); auto message = flatbuffers::GetRoot(data); DCHECK(VerifyFlatbuffer(message, data, size)); - *object_id = ObjectID::from_binary(message->object_id()->str()); + *object_id = ObjectID::FromBinary(message->object_id()->str()); return PlasmaErrorStatus(message->error()); } @@ -475,7 +475,7 @@ Status ReadDeleteRequest(uint8_t* data, size_t size, std::vector* obje auto message = flatbuffers::GetRoot(data); DCHECK(VerifyFlatbuffer(message, data, size)); ToVector(*message, object_ids, [](const PlasmaDeleteRequest& request, int i) { - return ObjectID::from_binary(request.object_ids()->Get(i)->str()); + return ObjectID::FromBinary(request.object_ids()->Get(i)->str()); }); return Status::OK(); } @@ -503,7 +503,7 @@ Status ReadDeleteReply(uint8_t* data, size_t size, std::vector* object auto message = flatbuffers::GetRoot(data); DCHECK(VerifyFlatbuffer(message, data, size)); ToVector(*message, object_ids, [](const PlasmaDeleteReply& request, int i) { - return ObjectID::from_binary(request.object_ids()->Get(i)->str()); + return ObjectID::FromBinary(request.object_ids()->Get(i)->str()); }); ToVector(*message, errors, [](const PlasmaDeleteReply& request, int i) { return static_cast(request.errors()->data()[i]); @@ -516,7 +516,7 @@ Status ReadDeleteReply(uint8_t* data, size_t size, std::vector* object Status SendContainsRequest(int sock, ObjectID object_id) { flatbuffers::FlatBufferBuilder fbb; auto message = - fb::CreatePlasmaContainsRequest(fbb, fbb.CreateString(object_id.binary())); + fb::CreatePlasmaContainsRequest(fbb, fbb.CreateString(object_id.Binary())); return PlasmaSend(sock, MessageType::PlasmaContainsRequest, &fbb, message); } @@ -524,13 +524,13 @@ Status ReadContainsRequest(uint8_t* data, size_t size, ObjectID* object_id) { DCHECK(data); auto message = flatbuffers::GetRoot(data); DCHECK(VerifyFlatbuffer(message, data, size)); - *object_id = ObjectID::from_binary(message->object_id()->str()); + *object_id = ObjectID::FromBinary(message->object_id()->str()); return Status::OK(); } Status SendContainsReply(int sock, ObjectID object_id, bool has_object) { flatbuffers::FlatBufferBuilder fbb; - auto message = fb::CreatePlasmaContainsReply(fbb, fbb.CreateString(object_id.binary()), + auto message = fb::CreatePlasmaContainsReply(fbb, fbb.CreateString(object_id.Binary()), has_object); return PlasmaSend(sock, MessageType::PlasmaContainsReply, &fbb, message); } @@ -540,7 +540,7 @@ Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id, DCHECK(data); auto message = flatbuffers::GetRoot(data); DCHECK(VerifyFlatbuffer(message, data, size)); - *object_id = ObjectID::from_binary(message->object_id()->str()); + *object_id = ObjectID::FromBinary(message->object_id()->str()); *has_object = message->has_object(); return Status::OK(); } @@ -563,7 +563,7 @@ Status SendListReply(int sock, const ObjectTable& objects) { ? fbb.CreateString("") : fbb.CreateString(reinterpret_cast(entry.second->digest), kDigestSize); - auto info = fb::CreateObjectInfo(fbb, fbb.CreateString(entry.first.binary()), + auto info = fb::CreateObjectInfo(fbb, fbb.CreateString(entry.first.Binary()), entry.second->data_size, entry.second->metadata_size, entry.second->ref_count, entry.second->create_time, entry.second->construct_duration, digest); @@ -580,7 +580,7 @@ Status ReadListReply(uint8_t* data, size_t size, ObjectTable* objects) { auto message = flatbuffers::GetRoot(data); DCHECK(VerifyFlatbuffer(message, data, size)); for (auto const& object : *message->objects()) { - ObjectID object_id = ObjectID::from_binary(object->object_id()->str()); + ObjectID object_id = ObjectID::FromBinary(object->object_id()->str()); auto entry = std::unique_ptr(new ObjectTableEntry()); entry->data_size = object->data_size(); entry->metadata_size = object->metadata_size(); @@ -665,7 +665,7 @@ Status ReadGetRequest(uint8_t* data, size_t size, std::vector& object_ DCHECK(VerifyFlatbuffer(message, data, size)); for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) { auto object_id = message->object_ids()->Get(i)->str(); - object_ids.push_back(ObjectID::from_binary(object_id)); + object_ids.push_back(ObjectID::FromBinary(object_id)); } *timeout_ms = message->timeout_ms(); return Status::OK(); @@ -712,7 +712,7 @@ Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[], #endif DCHECK(VerifyFlatbuffer(message, data, size)); for (uoffset_t i = 0; i < num_objects; ++i) { - object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str()); + object_ids[i] = ObjectID::FromBinary(message->object_ids()->Get(i)->str()); } for (uoffset_t i = 0; i < num_objects; ++i) { const PlasmaObjectSpec* object = message->plasma_objects()->Get(i); @@ -753,7 +753,7 @@ Status SendDataRequest(int sock, ObjectID object_id, const char* address, int po flatbuffers::FlatBufferBuilder fbb; auto addr = fbb.CreateString(address, strlen(address)); auto message = - fb::CreatePlasmaDataRequest(fbb, fbb.CreateString(object_id.binary()), addr, port); + fb::CreatePlasmaDataRequest(fbb, fbb.CreateString(object_id.Binary()), addr, port); return PlasmaSend(sock, MessageType::PlasmaDataRequest, &fbb, message); } @@ -763,7 +763,7 @@ Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** a auto message = flatbuffers::GetRoot(data); DCHECK(VerifyFlatbuffer(message, data, size)); DCHECK(message->object_id()->size() == sizeof(ObjectID)); - *object_id = ObjectID::from_binary(message->object_id()->str()); + *object_id = ObjectID::FromBinary(message->object_id()->str()); #ifdef _WIN32 *address = _strdup(message->address()->c_str()); #else @@ -776,7 +776,7 @@ Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** a Status SendDataReply(int sock, ObjectID object_id, int64_t object_size, int64_t metadata_size) { flatbuffers::FlatBufferBuilder fbb; - auto message = fb::CreatePlasmaDataReply(fbb, fbb.CreateString(object_id.binary()), + auto message = fb::CreatePlasmaDataReply(fbb, fbb.CreateString(object_id.Binary()), object_size, metadata_size); return PlasmaSend(sock, MessageType::PlasmaDataReply, &fbb, message); } @@ -786,7 +786,7 @@ Status ReadDataReply(uint8_t* data, size_t size, ObjectID* object_id, DCHECK(data); auto message = flatbuffers::GetRoot(data); DCHECK(VerifyFlatbuffer(message, data, size)); - *object_id = ObjectID::from_binary(message->object_id()->str()); + *object_id = ObjectID::FromBinary(message->object_id()->str()); *object_size = static_cast(message->object_size()); *metadata_size = static_cast(message->metadata_size()); return Status::OK(); @@ -810,7 +810,7 @@ Status ReadRefreshLRURequest(uint8_t* data, size_t size, DCHECK(VerifyFlatbuffer(message, data, size)); for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) { auto object_id = message->object_ids()->Get(i)->str(); - object_ids->push_back(ObjectID::from_binary(object_id)); + object_ids->push_back(ObjectID::FromBinary(object_id)); } return Status::OK(); } diff --git a/src/ray/object_manager/plasma/store.cc b/src/ray/object_manager/plasma/store.cc index 77baa81c8..72d4f6d52 100644 --- a/src/ray/object_manager/plasma/store.cc +++ b/src/ray/object_manager/plasma/store.cc @@ -221,7 +221,7 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, bool evict_if_f int64_t data_size, int64_t metadata_size, int device_num, Client* client, PlasmaObject* result) { - ARROW_LOG(DEBUG) << "creating object " << object_id.hex(); + ARROW_LOG(DEBUG) << "creating object " << object_id.Hex(); auto entry = GetObjectTableEntry(&store_info_, object_id); if (entry != nullptr) { @@ -240,7 +240,7 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, bool evict_if_f pointer = AllocateMemory(total_size, evict_if_full, &fd, &map_size, &offset, client, true); if (!pointer) { - ARROW_LOG(ERROR) << "Not enough memory to create the object " << object_id.hex() + ARROW_LOG(ERROR) << "Not enough memory to create the object " << object_id.Hex() << ", data_size=" << data_size << ", metadata_size=" << metadata_size << ", will send a reply of PlasmaError::OutOfMemory"; @@ -603,7 +603,7 @@ void PlasmaStore::SealObjects(const std::vector& object_ids, // Set object construction duration. entry->construct_duration = std::time(nullptr) - entry->create_time; - object_info.object_id = object_ids[i].binary(); + object_info.object_id = object_ids[i].Binary(); object_info.data_size = entry->data_size; object_info.metadata_size = entry->metadata_size; object_info.digest = digests[i]; @@ -663,7 +663,7 @@ PlasmaError PlasmaStore::DeleteObject(ObjectID& object_id) { EraseFromObjectTable(object_id); // Inform all subscribers that the object has been deleted. fb::ObjectInfoT notification; - notification.object_id = object_id.binary(); + notification.object_id = object_id.Binary(); notification.is_deletion = true; PushNotification(¬ification); @@ -678,7 +678,7 @@ void PlasmaStore::EvictObjects(const std::vector& object_ids) { std::vector> evicted_object_data; std::vector evicted_entries; for (const auto& object_id : object_ids) { - ARROW_LOG(DEBUG) << "evicting object " << object_id.hex(); + ARROW_LOG(DEBUG) << "evicting object " << object_id.Hex(); auto entry = GetObjectTableEntry(&store_info_, object_id); // TODO(rkn): This should probably not fail, but should instead throw an // error. Maybe we should also support deleting objects that have been @@ -702,7 +702,7 @@ void PlasmaStore::EvictObjects(const std::vector& object_ids) { EraseFromObjectTable(object_id); // Inform all subscribers that the object has been deleted. fb::ObjectInfoT notification; - notification.object_id = object_id.binary(); + notification.object_id = object_id.Binary(); notification.is_deletion = true; PushNotification(¬ification); } @@ -910,7 +910,7 @@ void PlasmaStore::SubscribeToUpdates(Client* client) { for (const auto& entry : store_info_.objects) { if (entry.second->state == ObjectState::PLASMA_SEALED) { ObjectInfoT info; - info.object_id = entry.first.binary(); + info.object_id = entry.first.Binary(); info.data_size = entry.second->data_size; info.metadata_size = entry.second->metadata_size; info.digest = diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc index 26a1783a1..b4c735ead 100644 --- a/src/ray/object_manager/test/object_manager_stress_test.cc +++ b/src/ray/object_manager/test/object_manager_stress_test.cc @@ -150,8 +150,8 @@ class TestObjectManagerBase : public ::testing::Test { int64_t metadata_size = sizeof(metadata); std::shared_ptr data; RAY_ARROW_CHECK_OK( - client.Create(object_id.ToPlasmaId(), data_size, metadata, metadata_size, &data)); - RAY_ARROW_CHECK_OK(client.Seal(object_id.ToPlasmaId())); + client.Create(object_id, data_size, metadata, metadata_size, &data)); + RAY_ARROW_CHECK_OK(client.Seal(object_id)); return object_id; } @@ -270,7 +270,7 @@ class StressTestObjectManager : public TestObjectManagerBase { plasma::ObjectBuffer GetObject(plasma::PlasmaClient &client, ObjectID &object_id) { plasma::ObjectBuffer object_buffer; - plasma::ObjectID plasma_id = object_id.ToPlasmaId(); + plasma::ObjectID plasma_id = object_id; RAY_ARROW_CHECK_OK(client.Get(&plasma_id, 1, 0, &object_buffer)); return object_buffer; } @@ -278,7 +278,7 @@ class StressTestObjectManager : public TestObjectManagerBase { static unsigned char *GetDigest(plasma::PlasmaClient &client, ObjectID &object_id) { const int64_t size = sizeof(uint64_t); static unsigned char digest_1[size]; - RAY_ARROW_CHECK_OK(client.Hash(object_id.ToPlasmaId(), &digest_1[0])); + RAY_ARROW_CHECK_OK(client.Hash(object_id, &digest_1[0])); return digest_1; } diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc index 3c45a3902..f7f74a76e 100644 --- a/src/ray/object_manager/test/object_manager_test.cc +++ b/src/ray/object_manager/test/object_manager_test.cc @@ -149,8 +149,8 @@ class TestObjectManagerBase : public ::testing::Test { int64_t metadata_size = sizeof(metadata); std::shared_ptr data; RAY_ARROW_CHECK_OK( - client.Create(object_id.ToPlasmaId(), data_size, metadata, metadata_size, &data)); - RAY_ARROW_CHECK_OK(client.Seal(object_id.ToPlasmaId())); + client.Create(object_id, data_size, metadata, metadata_size, &data)); + RAY_ARROW_CHECK_OK(client.Seal(object_id)); return object_id; } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 55f968f1d..441a76df1 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -171,7 +171,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, RAY_CHECK_OK(object_manager_.SubscribeObjAdded( [this](const object_manager::protocol::ObjectInfoT &object_info) { - ObjectID object_id = ObjectID::FromPlasmaIdBinary(object_info.object_id); + ObjectID object_id = ObjectID::FromBinary(object_info.object_id); HandleObjectLocal(object_id); })); RAY_CHECK_OK(object_manager_.SubscribeObjDeleted( @@ -2086,9 +2086,9 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ num_returns -= 1; } // Determine which IDs should be marked as failed. - std::vector objects_to_fail; + std::vector objects_to_fail; for (int64_t i = 0; i < num_returns; i++) { - objects_to_fail.push_back(spec.ReturnId(i).ToPlasmaId()); + objects_to_fail.push_back(spec.ReturnId(i)); } const JobID job_id = task.GetTaskSpecification().JobId(); MarkObjectsAsFailed(error_type, objects_to_fail, job_id); @@ -2102,7 +2102,7 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ } void NodeManager::MarkObjectsAsFailed(const ErrorType &error_type, - const std::vector objects_to_fail, + const std::vector objects_to_fail, const JobID &job_id) { const std::string meta = std::to_string(static_cast(error_type)); for (const auto &object_id : objects_to_fail) { @@ -2887,8 +2887,8 @@ void NodeManager::HandleTaskReconstruction(const TaskID &task_id, << "by the redis LRU configuration. Consider increasing the memory " "allocation via " << "ray.init(redis_max_memory=)."; - MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, - {required_object_id.ToPlasmaId()}, JobID::Nil()); + MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, {required_object_id}, + JobID::Nil()); } })); } @@ -2925,8 +2925,7 @@ void NodeManager::ResubmitTask(const Task &task, const ObjectID &required_object gcs::CreateErrorTableData(type, error_message.str(), current_time_ms(), task.GetTaskSpecification().JobId()); RAY_CHECK_OK(gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr)); - MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, - {required_object_id.ToPlasmaId()}, + MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, {required_object_id}, task.GetTaskSpecification().JobId()); return; } @@ -3286,7 +3285,7 @@ void NodeManager::ProcessSubscribePlasmaReady( ray::Status NodeManager::SetupPlasmaSubscription() { return object_manager_.SubscribeObjAdded( [this](const object_manager::protocol::ObjectInfoT &object_info) { - ObjectID object_id = ObjectID::FromPlasmaIdBinary(object_info.object_id); + ObjectID object_id = ObjectID::FromBinary(object_info.object_id); auto waiting_workers = absl::flat_hash_set>(); { absl::MutexLock guard(&plasma_object_notification_lock_); @@ -3396,10 +3395,10 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request, // the returned buffer. // NOTE: the caller must ensure that the objects already exist in plasma before // sending a PinObjectIDs request. - std::vector plasma_ids; - plasma_ids.reserve(request.object_ids_size()); + std::vector object_ids; + object_ids.reserve(request.object_ids_size()); for (const auto &object_id_binary : request.object_ids()) { - plasma_ids.push_back(plasma::ObjectID::from_binary(object_id_binary)); + object_ids.push_back(ObjectID::FromBinary(object_id_binary)); } std::vector plasma_results; // TODO(swang): This `Get` has a timeout of 0, so the plasma store will not @@ -3407,7 +3406,7 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request, // heavy load, then this request can still block the NodeManager event loop // since we must wait for the plasma store's reply. We should consider using // an `AsyncGet` instead. - if (!store_client_.Get(plasma_ids, /*timeout_ms=*/0, &plasma_results).ok()) { + if (!store_client_.Get(object_ids, /*timeout_ms=*/0, &plasma_results).ok()) { RAY_LOG(WARNING) << "Failed to get objects to be pinned from object store."; send_reply_callback(Status::Invalid("Failed to get objects."), nullptr, nullptr); return; diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 558df2bee..68d37f70d 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -231,8 +231,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// \param object_ids The object ids to store error messages into. /// \param job_id The optional job to push errors to if the writes fail. void MarkObjectsAsFailed(const ErrorType &error_type, - const std::vector object_ids, - const JobID &job_id); + const std::vector object_ids, const JobID &job_id); /// This is similar to TreatTaskAsFailed, but it will only mark the task as /// failed if at least one of the task's return values is lost. A return /// value is lost if it has been created before, but no longer exists on any diff --git a/src/ray/raylet/object_manager_integration_test.cc b/src/ray/raylet/object_manager_integration_test.cc index 707724fe2..9a4888959 100644 --- a/src/ray/raylet/object_manager_integration_test.cc +++ b/src/ray/raylet/object_manager_integration_test.cc @@ -112,8 +112,8 @@ class TestObjectManagerBase : public ::testing::Test { int64_t metadata_size = sizeof(metadata); std::shared_ptr data; RAY_ARROW_CHECK_OK( - client.Create(object_id.ToPlasmaId(), data_size, metadata, metadata_size, &data)); - RAY_ARROW_CHECK_OK(client.Seal(object_id.ToPlasmaId())); + client.Create(object_id, data_size, metadata, metadata_size, &data)); + RAY_ARROW_CHECK_OK(client.Seal(object_id)); return object_id; }