diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.cc b/cpp/src/ray/runtime/abstract_ray_runtime.cc index ce8ebee56..732cf4bd7 100644 --- a/cpp/src/ray/runtime/abstract_ray_runtime.cc +++ b/cpp/src/ray/runtime/abstract_ray_runtime.cc @@ -34,7 +34,7 @@ void AbstractRayRuntime::Put(std::shared_ptr data, ObjectID AbstractRayRuntime::Put(std::shared_ptr data) { ObjectID object_id = - ObjectID::ForPut(worker_->GetCurrentTaskID(), worker_->GetNextPutIndex()); + ObjectID::FromIndex(worker_->GetCurrentTaskID(), worker_->GetNextPutIndex()); Put(data, object_id); return object_id; } diff --git a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc index 280a642c6..6603dc1b6 100644 --- a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc +++ b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc @@ -43,7 +43,7 @@ ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskTy const TaskID actor_creation_task_id = TaskID::ForActorCreationTask(invocation.actor_id); const ObjectID actor_creation_dummy_object_id = - ObjectID::ForTaskReturn(actor_creation_task_id, 1); + ObjectID::FromIndex(actor_creation_task_id, 1); builder.SetActorTaskSpec(invocation.actor_id, actor_creation_dummy_object_id, ObjectID(), invocation.actor_counter); } else { diff --git a/python/ray/includes/unique_ids.pxd b/python/ray/includes/unique_ids.pxd index 22cbacac8..a33505e90 100644 --- a/python/ray/includes/unique_ids.pxd +++ b/python/ray/includes/unique_ids.pxd @@ -139,11 +139,7 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil: const CObjectID Nil() @staticmethod - CObjectID ForPut(const CTaskID &task_id, int64_t index, - int64_t transport_type) - - @staticmethod - CObjectID ForTaskReturn(const CTaskID &task_id, int64_t index) + CObjectID FromIndex(const CTaskID &task_id, int64_t index) @staticmethod size_t Size() diff --git a/python/ray/includes/unique_ids.pxi b/python/ray/includes/unique_ids.pxi index 1fdc0e863..5c9901040 100644 --- a/python/ray/includes/unique_ids.pxi +++ b/python/ray/includes/unique_ids.pxi @@ -221,7 +221,7 @@ cdef class JobID(BaseID): @classmethod def from_int(cls, value): - assert value < 65536, "Maximum JobID integer is 65535." + assert value < 2**32, "Maximum JobID integer is 2**32 - 1." return cls(CJobID.FromInt(value).Binary()) @classmethod diff --git a/src/ray/common/id.cc b/src/ray/common/id.cc index 330225a41..38af8aec3 100644 --- a/src/ray/common/id.cc +++ b/src/ray/common/id.cc @@ -52,46 +52,6 @@ std::string GenerateUniqueBytes(const JobID &job_id, const TaskID &parent_task_i return std::string(buff, buff + length); } -namespace { - -/// The bit offset of the flag `CreatedByTask` in a flags bytes. -constexpr uint8_t kCreatedByTaskBitsOffset = 15; - -/// The bit offset of the flag `ObjectType` in a flags bytes. -constexpr uint8_t kObjectTypeBitsOffset = 14; - -/// The mask that is used to mask the flag `CreatedByTask`. -constexpr ObjectIDFlagsType kCreatedByTaskFlagBitMask = 0x1 << kCreatedByTaskBitsOffset; - -/// The mask that is used to mask a bit to indicates the type of this object. -/// So it can represent for 2 types. -constexpr ObjectIDFlagsType kObjectTypeFlagBitMask = 0x1 << kObjectTypeBitsOffset; - -/// The implementations of helper functions. -inline void SetCreatedByTaskFlag(bool created_by_task, ObjectIDFlagsType *flags) { - const ObjectIDFlagsType object_type_bits = - static_cast(created_by_task) << kCreatedByTaskBitsOffset; - *flags = (*flags | object_type_bits); -} - -inline void SetObjectTypeFlag(ObjectType object_type, ObjectIDFlagsType *flags) { - const ObjectIDFlagsType object_type_bits = static_cast(object_type) - << kObjectTypeBitsOffset; - *flags = (*flags | object_type_bits); -} - -inline bool CreatedByTask(ObjectIDFlagsType flags) { - return ((flags & kCreatedByTaskFlagBitMask) >> kCreatedByTaskBitsOffset) != 0x0; -} - -inline ObjectType GetObjectType(ObjectIDFlagsType flags) { - const ObjectIDFlagsType object_type = - (flags & kObjectTypeFlagBitMask) >> kObjectTypeBitsOffset; - return static_cast(object_type); -} - -} // namespace - template void FillNil(T *data) { RAY_CHECK(data != nullptr); @@ -108,22 +68,6 @@ WorkerID ComputeDriverIdFromJob(const JobID &job_id) { std::string(reinterpret_cast(data.data()), data.size())); } -ObjectIDFlagsType ObjectID::GetFlags() const { - ObjectIDFlagsType flags; - std::memcpy(&flags, id_ + TaskID::kLength, sizeof(flags)); - return flags; -} - -bool ObjectID::CreatedByTask() const { return ::ray::CreatedByTask(this->GetFlags()); } - -bool ObjectID::IsPutObject() const { - return ::ray::GetObjectType(this->GetFlags()) == ObjectType::PUT_OBJECT; -} - -bool ObjectID::IsReturnObject() const { - return ::ray::GetObjectType(this->GetFlags()) == ObjectType::RETURN_OBJECT; -} - // This code is from https://sites.google.com/site/murmurhash/ // and is public domain. uint64_t MurmurHash64A(const void *key, int len, unsigned int seed) { @@ -256,65 +200,41 @@ TaskID ObjectID::TaskId() const { std::string(reinterpret_cast(id_), TaskID::Size())); } -ObjectID ObjectID::ForPut(const TaskID &task_id, ObjectIDIndexType put_index) { - RAY_CHECK(put_index >= 1 && put_index <= kMaxObjectIndex) << "index=" << put_index; - - ObjectIDFlagsType flags = 0x0000; - SetCreatedByTaskFlag(true, &flags); - SetObjectTypeFlag(ObjectType::PUT_OBJECT, &flags); - - return GenerateObjectId(task_id.Binary(), flags, put_index); -} - ObjectIDIndexType ObjectID::ObjectIndex() const { ObjectIDIndexType index; - std::memcpy(&index, id_ + TaskID::kLength + kFlagsBytesLength, sizeof(index)); + std::memcpy(&index, id_ + TaskID::kLength, sizeof(index)); return index; } -ObjectID ObjectID::ForTaskReturn(const TaskID &task_id, ObjectIDIndexType return_index) { - RAY_CHECK(return_index >= 1 && return_index <= kMaxObjectIndex) - << "index=" << return_index; +ObjectID ObjectID::FromIndex(const TaskID &task_id, ObjectIDIndexType index) { + RAY_CHECK(index >= 1 && index <= kMaxObjectIndex) << "index=" << index; - ObjectIDFlagsType flags = 0x0000; - SetCreatedByTaskFlag(true, &flags); - SetObjectTypeFlag(ObjectType::RETURN_OBJECT, &flags); - - return GenerateObjectId(task_id.Binary(), flags, return_index); + return GenerateObjectId(task_id.Binary(), index); } ObjectID ObjectID::FromRandom() { - ObjectIDFlagsType flags = 0x0000; - SetCreatedByTaskFlag(false, &flags); - // No need to set transport type for a random object id. - // No need to assign put_index/return_index bytes. std::vector task_id_bytes(TaskID::kLength, 0x0); FillRandom(&task_id_bytes); - return GenerateObjectId( - std::string(reinterpret_cast(task_id_bytes.data()), - task_id_bytes.size()), - flags); + return GenerateObjectId(std::string( + reinterpret_cast(task_id_bytes.data()), task_id_bytes.size())); } ObjectID ObjectID::ForActorHandle(const ActorID &actor_id) { - return ObjectID::ForTaskReturn(TaskID::ForActorCreationTask(actor_id), - /*return_index=*/1); + return ObjectID::FromIndex(TaskID::ForActorCreationTask(actor_id), + /*return_index=*/1); } ObjectID ObjectID::GenerateObjectId(const std::string &task_id_binary, - ObjectIDFlagsType flags, ObjectIDIndexType object_index) { RAY_CHECK(task_id_binary.size() == TaskID::Size()); ObjectID ret; std::memcpy(ret.id_, task_id_binary.c_str(), TaskID::kLength); - std::memcpy(ret.id_ + TaskID::kLength, &flags, sizeof(flags)); - std::memcpy(ret.id_ + TaskID::kLength + kFlagsBytesLength, &object_index, - sizeof(object_index)); + std::memcpy(ret.id_ + TaskID::kLength, &object_index, sizeof(object_index)); return ret; } -JobID JobID::FromInt(uint16_t value) { +JobID JobID::FromInt(uint32_t value) { std::vector data(JobID::Size(), 0); std::memcpy(data.data(), &value, JobID::Size()); return JobID::FromBinary( diff --git a/src/ray/common/id.h b/src/ray/common/id.h index 391162196..53be38a4c 100644 --- a/src/ray/common/id.h +++ b/src/ray/common/id.h @@ -42,16 +42,6 @@ class JobID; /// A helper function that get the `DriverID` of the given job. WorkerID ComputeDriverIdFromJob(const JobID &job_id); -/// The type of this object. `PUT_OBJECT` indicates this object -/// is generated through `ray.put` during the task's execution. -/// And `RETURN_OBJECT` indicates this object is the return value -/// of a task. -enum class ObjectType : uint8_t { - PUT_OBJECT = 0x0, - RETURN_OBJECT = 0x1, -}; - -using ObjectIDFlagsType = uint16_t; using ObjectIDIndexType = uint32_t; // Declaration. uint64_t MurmurHash64A(const void *key, int len, unsigned int seed); @@ -113,9 +103,9 @@ class UniqueID : public BaseID { class JobID : public BaseID { public: - static constexpr int64_t kLength = 2; + static constexpr int64_t kLength = 4; - static JobID FromInt(uint16_t value); + static JobID FromInt(uint32_t value); static size_t Size() { return kLength; } @@ -250,15 +240,12 @@ class ObjectID : public BaseID { private: static constexpr size_t kIndexBytesLength = sizeof(ObjectIDIndexType); - static constexpr size_t kFlagsBytesLength = sizeof(ObjectIDFlagsType); - public: /// The maximum number of objects that can be returned or put by a task. static constexpr int64_t kMaxObjectIndex = ((int64_t)1 << kObjectIdIndexSize) - 1; /// The length of ObjectID in bytes. - static constexpr size_t kLength = - kIndexBytesLength + kFlagsBytesLength + TaskID::kLength; + static constexpr size_t kLength = kIndexBytesLength + TaskID::kLength; ObjectID() : BaseID() {} @@ -282,36 +269,14 @@ class ObjectID : public BaseID { /// \return The task ID of the task that created this object. TaskID TaskId() const; - /// Whether this object is created by a task. - /// - /// \return True if this object is created by a task, otherwise false. - bool CreatedByTask() const; - - /// Whether this object was created through `ray.put`. - /// - /// \return True if this object was created through `ray.put`. - bool IsPutObject() const; - - /// Whether this object was created as a return object of a task. - /// - /// \return True if this object is a return value of a task. - bool IsReturnObject() const; - - /// Compute the object ID of an object put by the task. + /// Compute the object ID of an object created by a task, either via an object put + /// within the task or by being a task return object. /// /// \param task_id The task ID of the task that created the object. - /// \param index What index of the object put in the task. + /// \param index The index of the object created by the task. /// /// \return The computed object ID. - static ObjectID ForPut(const TaskID &task_id, ObjectIDIndexType put_index); - - /// Compute the object ID of an object returned by the task. - /// - /// \param task_id The task ID of the task that created the object. - /// \param return_index What index of the object returned by in the task. - /// - /// \return The computed object ID. - static ObjectID ForTaskReturn(const TaskID &task_id, ObjectIDIndexType return_index); + static ObjectID FromIndex(const TaskID &task_id, ObjectIDIndexType index); /// Create an object id randomly. /// @@ -334,12 +299,8 @@ class ObjectID : public BaseID { private: /// A helper method to generate an ObjectID. static ObjectID GenerateObjectId(const std::string &task_id_binary, - ObjectIDFlagsType flags, ObjectIDIndexType object_index = 0); - /// Get the flags out of this object id. - ObjectIDFlagsType GetFlags() const; - private: uint8_t id_[kLength]; }; diff --git a/src/ray/common/id_test.cc b/src/ray/common/id_test.cc index dbff2638e..926e6fbd1 100644 --- a/src/ray/common/id_test.cc +++ b/src/ray/common/id_test.cc @@ -18,32 +18,19 @@ namespace ray { -void TestReturnObjectId(const TaskID &task_id, int64_t return_index) { - // Round trip test for computing the object ID for a task's return value, - // then computing the task ID that created the object. - ObjectID return_id = ObjectID::ForTaskReturn(task_id, return_index); - ASSERT_TRUE(return_id.CreatedByTask()); - ASSERT_TRUE(return_id.IsReturnObject()); - ASSERT_FALSE(return_id.IsPutObject()); - ASSERT_EQ(return_id.TaskId(), task_id); - ASSERT_EQ(return_id.ObjectIndex(), return_index); -} - -void TestPutObjectId(const TaskID &task_id, int64_t put_index) { - // Round trip test for computing the object ID for a task's put value, then - // computing the task ID that created the object. - ObjectID put_id = ObjectID::ForPut(task_id, put_index); - ASSERT_TRUE(put_id.CreatedByTask()); - ASSERT_FALSE(put_id.IsReturnObject()); - ASSERT_TRUE(put_id.IsPutObject()); - ASSERT_EQ(put_id.TaskId(), task_id); - ASSERT_EQ(put_id.ObjectIndex(), put_index); +void TestFromIndexObjectId(const TaskID &task_id, int64_t index) { + // Round trip test for computing the object ID for an object created by a task, either + // via an object put or by being a return object for the task. + ObjectID obj_id = ObjectID::FromIndex(task_id, index); + ASSERT_EQ(obj_id.TaskId(), task_id); + ASSERT_EQ(obj_id.ObjectIndex(), index); } void TestRandomObjectId() { // Round trip test for computing the object ID from random. const ObjectID random_object_id = ObjectID::FromRandom(); - ASSERT_FALSE(random_object_id.CreatedByTask()); + ASSERT_FALSE(random_object_id.TaskId().IsNil()); + ASSERT_EQ(random_object_id.ObjectIndex(), 0); } const static JobID kDefaultJobId = JobID::FromInt(199); @@ -83,17 +70,10 @@ TEST(ObjectIDTest, TestObjectID) { TaskID::ForActorTask(kDefaultJobId, kDefaultDriverTaskId, 1, default_actor_id); { - // test for put - TestPutObjectId(default_task_id, 1); - TestPutObjectId(default_task_id, 2); - TestPutObjectId(default_task_id, ObjectID::kMaxObjectIndex); - } - - { - // test for return - TestReturnObjectId(default_task_id, 1); - TestReturnObjectId(default_task_id, 2); - TestReturnObjectId(default_task_id, ObjectID::kMaxObjectIndex); + // test from index + TestFromIndexObjectId(default_task_id, 1); + TestFromIndexObjectId(default_task_id, 2); + TestFromIndexObjectId(default_task_id, ObjectID::kMaxObjectIndex); } { diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index ee8785713..e9fa7ab4e 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -116,7 +116,7 @@ size_t TaskSpecification::NumArgs() const { return message_->args_size(); } size_t TaskSpecification::NumReturns() const { return message_->num_returns(); } ObjectID TaskSpecification::ReturnId(size_t return_index) const { - return ObjectID::ForTaskReturn(TaskId(), return_index + 1); + return ObjectID::FromIndex(TaskId(), return_index + 1); } bool TaskSpecification::ArgByRef(size_t arg_index) const { diff --git a/src/ray/core_worker/actor_handle.cc b/src/ray/core_worker/actor_handle.cc index d419a5da2..c59516080 100644 --- a/src/ray/core_worker/actor_handle.cc +++ b/src/ray/core_worker/actor_handle.cc @@ -86,7 +86,7 @@ void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder, const ObjectID new_ // Build actor task spec. const TaskID actor_creation_task_id = TaskID::ForActorCreationTask(GetActorID()); const ObjectID actor_creation_dummy_object_id = - ObjectID::ForTaskReturn(actor_creation_task_id, /*index=*/1); + ObjectID::FromIndex(actor_creation_task_id, /*index=*/1); builder.SetActorTaskSpec(GetActorID(), actor_creation_dummy_object_id, /*previous_actor_task_dummy_object_id=*/actor_cursor_, task_counter_++); diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index c0b568199..c11d7f35d 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -19,11 +19,26 @@ namespace ray { /// per-thread context for core worker. struct WorkerThreadContext { WorkerThreadContext() - : current_task_id_(TaskID::ForFakeTask()), task_index_(0), put_index_(0) {} + : current_task_id_(TaskID::ForFakeTask()), task_index_(0), put_counter_(0) {} int GetNextTaskIndex() { return ++task_index_; } - int GetNextPutIndex() { return ++put_index_; } + /// Returns the next put object index. The index starts at the number of + /// return values for the current task in order to keep the put indices from + /// conflicting with return object indices. 1 <= idx <= NumReturns() is reserved for + /// return objects, while idx > NumReturns is available for put objects. + int GetNextPutIndex() { + // If current_task_ is nullptr, we assume that we're in the event loop thread and + // are executing async tasks; in this case, we're using a fake, random task ID + // for put objects, so there's no risk of creating put object IDs that conflict with + // return object IDs (none of the latter are created). The put counter will never + // reset and will therefore continue to increment for the lifetime of the event + // loop thread (ResetCurrentTask and SetCurrenTask will never be called in the + // thread), so there's no risk of conflicting put object IDs, either. + // See https://github.com/ray-project/ray/issues/10324 for further details. + auto num_returns = current_task_ != nullptr ? current_task_->NumReturns() : 0; + return num_returns + ++put_counter_; + } const TaskID &GetCurrentTaskID() const { return current_task_id_; } @@ -35,15 +50,15 @@ struct WorkerThreadContext { void SetCurrentTask(const TaskSpecification &task_spec) { RAY_CHECK(task_index_ == 0); - RAY_CHECK(put_index_ == 0); + RAY_CHECK(put_counter_ == 0); SetCurrentTaskId(task_spec.TaskId()); current_task_ = std::make_shared(task_spec); } - void ResetCurrentTask(const TaskSpecification &task_spec) { + void ResetCurrentTask() { SetCurrentTaskId(TaskID::Nil()); task_index_ = 0; - put_index_ = 0; + put_counter_ = 0; } private: @@ -56,8 +71,9 @@ struct WorkerThreadContext { /// Number of tasks that have been submitted from current task. int task_index_; - /// Number of objects that have been put from current task. - int put_index_; + /// A running counter for the number of object puts carried out in the current task. + /// Used to calculate the object index for put object ObjectIDs. + int put_counter_; }; thread_local std::unique_ptr WorkerContext::thread_context_ = @@ -121,7 +137,7 @@ void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) { } void WorkerContext::ResetCurrentTask(const TaskSpecification &task_spec) { - GetThreadContext().ResetCurrentTask(task_spec); + GetThreadContext().ResetCurrentTask(); if (task_spec.IsNormalTask()) { SetCurrentJobId(JobID::Nil()); } diff --git a/src/ray/core_worker/context.h b/src/ray/core_worker/context.h index b377c6e12..3baf6c61a 100644 --- a/src/ray/core_worker/context.h +++ b/src/ray/core_worker/context.h @@ -69,6 +69,7 @@ class WorkerContext { int GetNextTaskIndex(); + // Returns the next put object index; used to calculate ObjectIDs for puts. int GetNextPutIndex(); protected: diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 5d489f8af..425f967ab 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -51,7 +51,7 @@ void BuildCommonTaskSpec( // Compute return IDs. return_ids->resize(num_returns); for (size_t i = 0; i < num_returns; i++) { - (*return_ids)[i] = ObjectID::ForTaskReturn(task_id, i + 1); + (*return_ids)[i] = ObjectID::FromIndex(task_id, i + 1); } } @@ -811,8 +811,8 @@ Status CoreWorker::SetClientOptions(std::string name, int64_t limit_bytes) { Status CoreWorker::Put(const RayObject &object, const std::vector &contained_object_ids, ObjectID *object_id) { - *object_id = ObjectID::ForPut(worker_context_.GetCurrentTaskID(), - worker_context_.GetNextPutIndex()); + *object_id = ObjectID::FromIndex(worker_context_.GetCurrentTaskID(), + worker_context_.GetNextPutIndex()); reference_counter_->AddOwnedObject( *object_id, contained_object_ids, rpc_address_, CurrentCallSite(), object.GetSize(), /*is_reconstructable=*/false, ClientID::FromBinary(rpc_address_.raylet_id())); @@ -858,8 +858,8 @@ Status CoreWorker::Put(const RayObject &object, Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t data_size, const std::vector &contained_object_ids, ObjectID *object_id, std::shared_ptr *data) { - *object_id = ObjectID::ForPut(worker_context_.GetCurrentTaskID(), - worker_context_.GetNextPutIndex()); + *object_id = ObjectID::FromIndex(worker_context_.GetCurrentTaskID(), + worker_context_.GetNextPutIndex()); if (options_.is_local_mode || (RayConfig::instance().put_small_object_in_memory_store() && static_cast(data_size) < @@ -1436,8 +1436,7 @@ void CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &fun } Status CoreWorker::CancelTask(const ObjectID &object_id, bool force_kill) { - if (!object_id.CreatedByTask() || - actor_manager_->CheckActorHandleExists(object_id.TaskId().ActorId())) { + if (actor_manager_->CheckActorHandleExists(object_id.TaskId().ActorId())) { return Status::Invalid("Actor task cancellation is not supported."); } rpc::Address obj_addr; diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 2c288efe5..9ed0ce0e0 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -442,7 +442,7 @@ void TaskManager::MarkPendingTaskFailed(const TaskID &task_id, << ", error_type: " << ErrorType_Name(error_type); int64_t num_returns = spec.NumReturns(); for (int i = 0; i < num_returns; i++) { - const auto object_id = ObjectID::ForTaskReturn(task_id, /*index=*/i + 1); + const auto object_id = ObjectID::FromIndex(task_id, /*index=*/i + 1); RAY_UNUSED(in_memory_store_->Put(RayObject(error_type), object_id)); } } diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index f28c5c486..fdae29a66 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -27,6 +27,7 @@ #include "hiredis/hiredis.h" #include "ray/common/buffer.h" #include "ray/common/ray_object.h" +#include "ray/common/task/task_spec.h" #include "ray/common/test_util.h" #include "ray/core_worker/context.h" #include "ray/core_worker/store_provider/memory_store/memory_store.h" @@ -314,7 +315,6 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso driver.SubmitActorTask(actor_id, func, args, options, &return_ids); ASSERT_EQ(return_ids.size(), 1); - ASSERT_TRUE(return_ids[0].IsReturnObject()); std::vector> results; RAY_CHECK_OK(driver.Get(return_ids, -1, &results)); @@ -630,6 +630,17 @@ TEST_F(ZeroNodeTest, TestWorkerContext) { // Verify that these fields are thread-local. ASSERT_EQ(context.GetNextTaskIndex(), 3); ASSERT_EQ(context.GetNextPutIndex(), 3); + + TaskSpecification task_spec; + size_t num_returns = 3; + task_spec.GetMutableMessage().set_num_returns(num_returns); + context.ResetCurrentTask(task_spec); + context.SetCurrentTask(task_spec); + ASSERT_EQ(context.GetCurrentTaskID(), task_spec.TaskId()); + + // Verify that put index doesn't confict with the return object range. + ASSERT_EQ(context.GetNextPutIndex(), num_returns + 1); + ASSERT_EQ(context.GetNextPutIndex(), num_returns + 2); } TEST_F(ZeroNodeTest, TestActorHandle) { diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index c05d5b71f..c921301a1 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -350,7 +350,7 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( if (objects_valid) { for (size_t i = 0; i < return_objects.size(); i++) { auto return_object = reply->add_return_objects(); - ObjectID id = ObjectID::ForTaskReturn(task_spec.TaskId(), /*index=*/i + 1); + ObjectID id = ObjectID::FromIndex(task_spec.TaskId(), /*index=*/i + 1); return_object->set_object_id(id.Binary()); // The object is nullptr if it already existed in the object store. diff --git a/src/ray/design_docs/id_specification.md b/src/ray/design_docs/id_specification.md index c7b760644..d308cb1ed 100644 --- a/src/ray/design_docs/id_specification.md +++ b/src/ray/design_docs/id_specification.md @@ -3,36 +3,36 @@ Ray ID Specification ``` high bits low bits -<-------------------------------------------------------------------------------------------- +<------------------------------------------------------------------------------------------ - 2B - +-----------------+ - | unique bytes | JobID 2B - +-----------------+ + 4B + +-----------------+ + | unique bytes | JobID 4B + +-----------------+ - 4B 2B - +-----------------+-----------------+ - | unique bytes | JobID | ActorID 6B - +-----------------+-----------------+ + 4B 4B + +-----------------+-----------------+ + | unique bytes | JobID | ActorID 8B + +-----------------+-----------------+ - 8B 6B - +---------------------------+-----------------------------------+ - | unique bytes | ActorID | TaskID 14B - +---------------------------+-----------------------------------+ + 8B 8B + +-----------------------------------+-----------------------------------+ + | unique bytes | ActorID | TaskID 16B + +-----------------------------------+-----------------------------------+ - 4B 2B 14B -+---------------------------+---------------------------------------------------------------+ -| index bytes |flags bytes| TaskID | ObjectID 20B -+---------------------------+---------------------------------------------------------------+ + 4B 16B ++-----------------+-----------------------------------------------------------------------+ +| index bytes | TaskID | ObjectID 20B ++-----------------+-----------------------------------------------------------------------+ ``` -#### JobID (2 bytes) -`JobID` is generated by `GCS` to ensure uniqueness. Its length is 2 bytes. +#### JobID (4 bytes) +`JobID` is generated by `GCS` to ensure uniqueness. Its length is 4 bytes. -#### ActorID (6 bytes) +#### ActorID (8 bytes) An `ActorID` contains two parts: 1) 4 unique bytes, and 2) its `JobID`. -#### TaskID (14 bytes) +#### TaskID (16 bytes) A `TaskID` contains two parts: 1) 8 unique bytes, and 2) its `ActorID`. If the task is a normal task or a driver task, the part 2 is its dummy actor id. @@ -49,22 +49,8 @@ Note: Dummy actor id is an `ActorID` whose unique part is nil. ``` #### ObjectID (20 bytes) -An `ObjectID` contains 3 parts: -- `index bytes`: 4 bytes to indicate the index of the object. -- `flags bytes`: 2 bytes to indicate the flags of this object. We have 3 flags now: `created_by_task`, `object_type` and `transport_type`. -- `TaskID`: 14 bytes to indicate the ID of the task to which this object belongs. - -**flags bytes format** -``` - 1b 1b 14b -+-------------------------------------------------------+ -| (1) | (2) | (4)unused | -+-------------------------------------------------------+ -``` -- The (1) `created_by_task` part is one bit to indicate whether this `ObjectID` is generated (put or returned) from a task. - -- The (2) `object_type` part is one bit to indicate the type of this object, whether a `PUT_OBJECT` or a `RETURN_OBJECT`. - - `PUT_OBJECT` indicates this object is generated through `ray.put` during the task's execution. - - `RETURN_OBJECT` indicates this object is the return value of a task. - -- There are 14 bits unused in `flags bytes`. +An `ObjectID` contains 2 parts: +- `index bytes`: 4 bytes to indicate the index of the object within its creator task. + 1 <= idx <= num_return_objects is reserved for the task's return objects, while + idx > num_return_objects is available for the task's put objects. +- `TaskID`: 16 bytes to indicate the ID of the task to which this object belongs. diff --git a/src/ray/raylet/reconstruction_policy_test.cc b/src/ray/raylet/reconstruction_policy_test.cc index 279dca5a7..e56fa52ce 100644 --- a/src/ray/raylet/reconstruction_policy_test.cc +++ b/src/ray/raylet/reconstruction_policy_test.cc @@ -279,7 +279,7 @@ class ReconstructionPolicyTest : public ::testing::Test { TEST_F(ReconstructionPolicyTest, TestReconstructionSimple) { TaskID task_id = ForNormalTask(); - ObjectID object_id = ObjectID::ForTaskReturn(task_id, /*index=*/1); + ObjectID object_id = ObjectID::FromIndex(task_id, /*index=*/1); // Listen for an object. reconstruction_policy_->ListenAndMaybeReconstruct(object_id, rpc::Address()); @@ -297,7 +297,7 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionSimple) { TEST_F(ReconstructionPolicyTest, TestReconstructionEvicted) { TaskID task_id = ForNormalTask(); - ObjectID object_id = ObjectID::ForTaskReturn(task_id, /*index=*/1); + ObjectID object_id = ObjectID::FromIndex(task_id, /*index=*/1); mock_object_directory_->SetObjectLocations(object_id, {ClientID::FromRandom()}); // Listen for both objects. @@ -320,7 +320,7 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionEvicted) { TEST_F(ReconstructionPolicyTest, TestReconstructionObjectLost) { TaskID task_id = ForNormalTask(); - ObjectID object_id = ObjectID::ForTaskReturn(task_id, /*index=*/1); + ObjectID object_id = ObjectID::FromIndex(task_id, /*index=*/1); ClientID client_id = ClientID::FromRandom(); mock_object_directory_->SetObjectLocations(object_id, {client_id}); @@ -344,8 +344,8 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionObjectLost) { TEST_F(ReconstructionPolicyTest, TestDuplicateReconstruction) { // Create two object IDs produced by the same task. TaskID task_id = ForNormalTask(); - ObjectID object_id1 = ObjectID::ForTaskReturn(task_id, /*index=*/1); - ObjectID object_id2 = ObjectID::ForTaskReturn(task_id, /*index=*/2); + ObjectID object_id1 = ObjectID::FromIndex(task_id, /*index=*/1); + ObjectID object_id2 = ObjectID::FromIndex(task_id, /*index=*/2); // Listen for both objects. reconstruction_policy_->ListenAndMaybeReconstruct(object_id1, rpc::Address()); @@ -364,7 +364,7 @@ TEST_F(ReconstructionPolicyTest, TestDuplicateReconstruction) { TEST_F(ReconstructionPolicyTest, TestReconstructionSuppressed) { TaskID task_id = ForNormalTask(); - ObjectID object_id = ObjectID::ForTaskReturn(task_id, /*index=*/1); + ObjectID object_id = ObjectID::FromIndex(task_id, /*index=*/1); // Run the test for much longer than the reconstruction timeout. int64_t test_period = 2 * reconstruction_timeout_ms_; @@ -391,7 +391,7 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionSuppressed) { TEST_F(ReconstructionPolicyTest, TestReconstructionContinuallySuppressed) { TaskID task_id = ForNormalTask(); - ObjectID object_id = ObjectID::ForTaskReturn(task_id, /*index=*/1); + ObjectID object_id = ObjectID::FromIndex(task_id, /*index=*/1); // Listen for an object. reconstruction_policy_->ListenAndMaybeReconstruct(object_id, rpc::Address()); @@ -419,7 +419,7 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionContinuallySuppressed) { TEST_F(ReconstructionPolicyTest, TestReconstructionCanceled) { TaskID task_id = ForNormalTask(); - ObjectID object_id = ObjectID::ForTaskReturn(task_id, /*index=*/1); + ObjectID object_id = ObjectID::FromIndex(task_id, /*index=*/1); // Listen for an object. reconstruction_policy_->ListenAndMaybeReconstruct(object_id, rpc::Address()); @@ -445,7 +445,7 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionCanceled) { TEST_F(ReconstructionPolicyTest, TestSimultaneousReconstructionSuppressed) { TaskID task_id = ForNormalTask(); - ObjectID object_id = ObjectID::ForTaskReturn(task_id, /*index=*/1); + ObjectID object_id = ObjectID::FromIndex(task_id, /*index=*/1); // Log a reconstruction attempt to simulate a different node attempting the // reconstruction first. This should suppress this node's first attempt at diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index 47df75096..57d329ca1 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -270,7 +270,7 @@ Task CreateTask(const std::unordered_map &required_resource required_resources, {}, PlacementGroupID::Nil()); for (int i = 0; i < num_args; i++) { - ObjectID put_id = ObjectID::ForPut(TaskID::Nil(), /*index=*/i + 1); + ObjectID put_id = ObjectID::FromIndex(TaskID::Nil(), /*index=*/i + 1); spec_builder.AddArg(TaskArgByReference(put_id, rpc::Address())); } diff --git a/src/ray/raylet/task_dependency_manager_test.cc b/src/ray/raylet/task_dependency_manager_test.cc index e043f6422..27129ae50 100644 --- a/src/ray/raylet/task_dependency_manager_test.cc +++ b/src/ray/raylet/task_dependency_manager_test.cc @@ -261,7 +261,8 @@ TEST_F(TaskDependencyManagerTest, TestTaskChain) { TEST_F(TaskDependencyManagerTest, TestDependentPut) { // Create a task with 3 arguments. auto task1 = ExampleTask({}, 0); - ObjectID put_id = ObjectID::ForPut(task1.GetTaskSpecification().TaskId(), /*index=*/1); + ObjectID put_id = + ObjectID::FromIndex(task1.GetTaskSpecification().TaskId(), /*index=*/1); auto task2 = ExampleTask({put_id}, 0); // No objects have been registered in the task dependency manager, so the put