Refactor ID Serial 2: change all ID functions to CamelCase (#4896)

This commit is contained in:
Yuhong Guo
2019-05-31 11:31:19 +08:00
committed by Hao Chen
parent 4e0be8b450
commit 1f0809e2b4
41 changed files with 506 additions and 509 deletions
@@ -161,7 +161,7 @@ public class IdUtil {
}
/**
* This method is the same as `hash()` method of `ID` class in ray/src/ray/id.h
* This method is the same as `Hash()` method of `ID` class in ray/src/ray/id.h
*/
private static long murmurHash64A(byte[] data, int length, int seed) {
final long m = 0xc6a4a7935bd1e995L;
+7 -7
View File
@@ -80,7 +80,7 @@ cdef c_vector[CObjectID] ObjectIDsToVector(object_ids):
cdef VectorToObjectIDs(c_vector[CObjectID] object_ids):
result = []
for i in range(object_ids.size()):
result.append(ObjectID(object_ids[i].binary()))
result.append(ObjectID(object_ids[i].Binary()))
return result
@@ -88,11 +88,11 @@ def compute_put_id(TaskID task_id, int64_t put_index):
if put_index < 1 or put_index > kMaxTaskPuts:
raise ValueError("The range of 'put_index' should be [1, %d]"
% kMaxTaskPuts)
return ObjectID(CObjectID.for_put(task_id.native(), put_index).binary())
return ObjectID(CObjectID.ForPut(task_id.native(), put_index).Binary())
def compute_task_id(ObjectID object_id):
return TaskID(object_id.native().task_id().binary())
return TaskID(object_id.native().TaskId().Binary())
cdef c_bool is_simple_value(value, int *num_elements_contained):
@@ -362,7 +362,7 @@ cdef class RayletClient:
with nogil:
check_status(self.client.get().PrepareActorCheckpoint(
c_actor_id, checkpoint_id))
return ActorCheckpointID(checkpoint_id.binary())
return ActorCheckpointID(checkpoint_id.Binary())
def notify_actor_resumed_from_checkpoint(self, ActorID actor_id,
ActorCheckpointID checkpoint_id):
@@ -370,7 +370,7 @@ cdef class RayletClient:
actor_id.native(), checkpoint_id.native()))
def set_resource(self, basestring resource_name, double capacity, ClientID client_id):
self.client.get().SetResource(resource_name.encode("ascii"), capacity, CClientID.from_binary(client_id.binary()))
self.client.get().SetResource(resource_name.encode("ascii"), capacity, CClientID.FromBinary(client_id.binary()))
@property
def language(self):
@@ -378,11 +378,11 @@ cdef class RayletClient:
@property
def client_id(self):
return ClientID(self.client.get().GetClientID().binary())
return ClientID(self.client.get().GetClientID().Binary())
@property
def driver_id(self):
return DriverID(self.client.get().GetDriverID().binary())
return DriverID(self.client.get().GetDriverID().Binary())
@property
def is_worker(self):
+8 -8
View File
@@ -124,15 +124,15 @@ cdef class Task:
def driver_id(self):
"""Return the driver ID for this task."""
return DriverID(self.task_spec.get().DriverId().binary())
return DriverID(self.task_spec.get().DriverId().Binary())
def task_id(self):
"""Return the task ID for this task."""
return TaskID(self.task_spec.get().TaskId().binary())
return TaskID(self.task_spec.get().TaskId().Binary())
def parent_task_id(self):
"""Return the task ID of the parent task."""
return TaskID(self.task_spec.get().ParentTaskId().binary())
return TaskID(self.task_spec.get().ParentTaskId().Binary())
def parent_counter(self):
"""Return the parent counter of this task."""
@@ -162,7 +162,7 @@ cdef class Task:
if count > 0:
assert count == 1
arg_list.append(
ObjectID(task_spec.ArgId(i, 0).binary()))
ObjectID(task_spec.ArgId(i, 0).Binary()))
else:
serialized_str = (
task_spec.ArgVal(i)[:task_spec.ArgValLength(i)])
@@ -178,7 +178,7 @@ cdef class Task:
cdef CTaskSpecification *task_spec = self.task_spec.get()
return_id_list = []
for i in range(task_spec.NumReturns()):
return_id_list.append(ObjectID(task_spec.ReturnId(i).binary()))
return_id_list.append(ObjectID(task_spec.ReturnId(i).Binary()))
return return_id_list
def required_resources(self):
@@ -207,16 +207,16 @@ cdef class Task:
def actor_creation_id(self):
"""Return the actor creation ID for the task."""
return ActorID(self.task_spec.get().ActorCreationId().binary())
return ActorID(self.task_spec.get().ActorCreationId().Binary())
def actor_creation_dummy_object_id(self):
"""Return the actor creation dummy object ID for the task."""
return ObjectID(
self.task_spec.get().ActorCreationDummyObjectId().binary())
self.task_spec.get().ActorCreationDummyObjectId().Binary())
def actor_id(self):
"""Return the actor ID for this task."""
return ActorID(self.task_spec.get().ActorId().binary())
return ActorID(self.task_spec.get().ActorId().Binary())
def actor_counter(self):
"""Return the actor counter for this task."""
+30 -30
View File
@@ -8,116 +8,116 @@ cdef extern from "ray/id.h" namespace "ray" nogil:
T from_random()
@staticmethod
T from_binary(const c_string &binary)
T FromBinary(const c_string &binary)
@staticmethod
const T nil()
const T Nil()
@staticmethod
size_t size()
size_t Size()
size_t hash() const
c_bool is_nil() const
size_t Hash() const
c_bool IsNil() const
c_bool operator==(const CBaseID &rhs) const
c_bool operator!=(const CBaseID &rhs) const
const uint8_t *data() const;
c_string binary() const;
c_string hex() const;
c_string Binary() const;
c_string Hex() const;
cdef cppclass CUniqueID "ray::UniqueID"(CBaseID):
CUniqueID()
@staticmethod
size_t size()
size_t Size()
@staticmethod
CUniqueID from_random()
@staticmethod
CUniqueID from_binary(const c_string &binary)
CUniqueID FromBinary(const c_string &binary)
@staticmethod
const CUniqueID nil()
const CUniqueID Nil()
@staticmethod
size_t size()
size_t Size()
cdef cppclass CActorCheckpointID "ray::ActorCheckpointID"(CUniqueID):
@staticmethod
CActorCheckpointID from_binary(const c_string &binary)
CActorCheckpointID FromBinary(const c_string &binary)
cdef cppclass CActorClassID "ray::ActorClassID"(CUniqueID):
@staticmethod
CActorClassID from_binary(const c_string &binary)
CActorClassID FromBinary(const c_string &binary)
cdef cppclass CActorID "ray::ActorID"(CUniqueID):
@staticmethod
CActorID from_binary(const c_string &binary)
CActorID FromBinary(const c_string &binary)
cdef cppclass CActorHandleID "ray::ActorHandleID"(CUniqueID):
@staticmethod
CActorHandleID from_binary(const c_string &binary)
CActorHandleID FromBinary(const c_string &binary)
cdef cppclass CClientID "ray::ClientID"(CUniqueID):
@staticmethod
CClientID from_binary(const c_string &binary)
CClientID FromBinary(const c_string &binary)
cdef cppclass CConfigID "ray::ConfigID"(CUniqueID):
@staticmethod
CConfigID from_binary(const c_string &binary)
CConfigID FromBinary(const c_string &binary)
cdef cppclass CFunctionID "ray::FunctionID"(CUniqueID):
@staticmethod
CFunctionID from_binary(const c_string &binary)
CFunctionID FromBinary(const c_string &binary)
cdef cppclass CDriverID "ray::DriverID"(CUniqueID):
@staticmethod
CDriverID from_binary(const c_string &binary)
CDriverID FromBinary(const c_string &binary)
cdef cppclass CTaskID "ray::TaskID"(CBaseID[CTaskID]):
@staticmethod
CTaskID from_binary(const c_string &binary)
CTaskID FromBinary(const c_string &binary)
@staticmethod
const CTaskID nil()
const CTaskID Nil()
@staticmethod
size_t size()
size_t Size()
cdef cppclass CObjectID" ray::ObjectID"(CBaseID[CObjectID]):
@staticmethod
CObjectID from_binary(const c_string &binary)
CObjectID FromBinary(const c_string &binary)
@staticmethod
const CObjectID nil()
const CObjectID Nil()
@staticmethod
CObjectID for_put(const CTaskID &task_id, int64_t index);
CObjectID ForPut(const CTaskID &task_id, int64_t index);
@staticmethod
CObjectID for_task_return(const CTaskID &task_id, int64_t index);
CObjectID ForTaskReturn(const CTaskID &task_id, int64_t index);
@staticmethod
size_t size()
size_t Size()
c_bool is_put()
int64_t object_index() const
int64_t ObjectIndex() const
CTaskID task_id() const
CTaskID TaskId() const
cdef cppclass CWorkerID "ray::WorkerID"(CUniqueID):
@staticmethod
CWorkerID from_binary(const c_string &binary)
CWorkerID FromBinary(const c_string &binary)
+33 -33
View File
@@ -97,7 +97,7 @@ cdef class UniqueID(BaseID):
def __init__(self, id):
check_id(id)
self.data = CUniqueID.from_binary(id)
self.data = CUniqueID.FromBinary(id)
@classmethod
def from_binary(cls, id_bytes):
@@ -107,27 +107,27 @@ cdef class UniqueID(BaseID):
@classmethod
def nil(cls):
return cls(CUniqueID.nil().binary())
return cls(CUniqueID.Nil().Binary())
@classmethod
def from_random(cls):
return cls(os.urandom(CUniqueID.size()))
return cls(os.urandom(CUniqueID.Size()))
def size(self):
return CUniqueID.size()
return CUniqueID.Size()
def binary(self):
return self.data.binary()
return self.data.Binary()
def hex(self):
return decode(self.data.hex())
return decode(self.data.Hex())
def is_nil(self):
return self.data.is_nil()
return self.data.IsNil()
cdef size_t hash(self):
return self.data.hash()
return self.data.Hash()
cdef class ObjectID(BaseID):
@@ -135,78 +135,78 @@ cdef class ObjectID(BaseID):
def __init__(self, id):
check_id(id)
self.data = CObjectID.from_binary(<c_string>id)
self.data = CObjectID.FromBinary(<c_string>id)
cdef CObjectID native(self):
return <CObjectID>self.data
def size(self):
return CObjectID.size()
return CObjectID.Size()
def binary(self):
return self.data.binary()
return self.data.Binary()
def hex(self):
return decode(self.data.hex())
return decode(self.data.Hex())
def is_nil(self):
return self.data.is_nil()
return self.data.IsNil()
cdef size_t hash(self):
return self.data.hash()
return self.data.Hash()
@classmethod
def nil(cls):
return cls(CObjectID.nil().binary())
return cls(CObjectID.Nil().Binary())
@classmethod
def from_random(cls):
return cls(os.urandom(CObjectID.size()))
return cls(os.urandom(CObjectID.Size()))
cdef class TaskID(BaseID):
cdef CTaskID data
def __init__(self, id):
check_id(id, CTaskID.size())
self.data = CTaskID.from_binary(<c_string>id)
check_id(id, CTaskID.Size())
self.data = CTaskID.FromBinary(<c_string>id)
cdef CTaskID native(self):
return <CTaskID>self.data
def size(self):
return CTaskID.size()
return CTaskID.Size()
def binary(self):
return self.data.binary()
return self.data.Binary()
def hex(self):
return decode(self.data.hex())
return decode(self.data.Hex())
def is_nil(self):
return self.data.is_nil()
return self.data.IsNil()
cdef size_t hash(self):
return self.data.hash()
return self.data.Hash()
@classmethod
def nil(cls):
return cls(CTaskID.nil().binary())
return cls(CTaskID.Nil().Binary())
@classmethod
def size(cla):
return CTaskID.size()
return CTaskID.Size()
@classmethod
def from_random(cls):
return cls(os.urandom(CTaskID.size()))
return cls(os.urandom(CTaskID.Size()))
cdef class ClientID(UniqueID):
def __init__(self, id):
check_id(id)
self.data = CClientID.from_binary(<c_string>id)
self.data = CClientID.FromBinary(<c_string>id)
cdef CClientID native(self):
return <CClientID>self.data
@@ -216,7 +216,7 @@ cdef class DriverID(UniqueID):
def __init__(self, id):
check_id(id)
self.data = CDriverID.from_binary(<c_string>id)
self.data = CDriverID.FromBinary(<c_string>id)
cdef CDriverID native(self):
return <CDriverID>self.data
@@ -226,7 +226,7 @@ cdef class ActorID(UniqueID):
def __init__(self, id):
check_id(id)
self.data = CActorID.from_binary(<c_string>id)
self.data = CActorID.FromBinary(<c_string>id)
cdef CActorID native(self):
return <CActorID>self.data
@@ -236,7 +236,7 @@ cdef class ActorHandleID(UniqueID):
def __init__(self, id):
check_id(id)
self.data = CActorHandleID.from_binary(<c_string>id)
self.data = CActorHandleID.FromBinary(<c_string>id)
cdef CActorHandleID native(self):
return <CActorHandleID>self.data
@@ -246,7 +246,7 @@ cdef class ActorCheckpointID(UniqueID):
def __init__(self, id):
check_id(id)
self.data = CActorCheckpointID.from_binary(<c_string>id)
self.data = CActorCheckpointID.FromBinary(<c_string>id)
cdef CActorCheckpointID native(self):
return <CActorCheckpointID>self.data
@@ -256,7 +256,7 @@ cdef class FunctionID(UniqueID):
def __init__(self, id):
check_id(id)
self.data = CFunctionID.from_binary(<c_string>id)
self.data = CFunctionID.FromBinary(<c_string>id)
cdef CFunctionID native(self):
return <CFunctionID>self.data
@@ -266,7 +266,7 @@ cdef class ActorClassID(UniqueID):
def __init__(self, id):
check_id(id)
self.data = CActorClassID.from_binary(<c_string>id)
self.data = CActorClassID.FromBinary(<c_string>id)
cdef CActorClassID native(self):
return <CActorClassID>self.data
+2 -2
View File
@@ -231,7 +231,7 @@ ClientConnection<T>::ClientConnection(
const std::string &debug_label,
const std::vector<std::string> &message_type_enum_names, int64_t error_message_type)
: ServerConnection<T>(std::move(socket)),
client_id_(ClientID::nil()),
client_id_(ClientID::Nil()),
message_handler_(message_handler),
debug_label_(debug_label),
message_type_enum_names_(message_type_enum_names),
@@ -307,7 +307,7 @@ bool ClientConnection<T>::CheckRayCookie() {
ss << ", remote endpoint info: " << remote_endpoint_info;
}
if (!client_id_.is_nil()) {
if (!client_id_.IsNil()) {
// This is from a known client, which indicates a bug.
RAY_LOG(FATAL) << ss.str();
} else {
+6 -6
View File
@@ -104,13 +104,13 @@ string_vec_to_flatbuf(flatbuffers::FlatBufferBuilder &fbb,
template <typename ID>
flatbuffers::Offset<flatbuffers::String> to_flatbuf(flatbuffers::FlatBufferBuilder &fbb,
ID id) {
return fbb.CreateString(reinterpret_cast<const char *>(id.data()), id.size());
return fbb.CreateString(reinterpret_cast<const char *>(id.Data()), id.Size());
}
template <typename ID>
ID from_flatbuf(const flatbuffers::String &string) {
RAY_CHECK(string.size() == ID::size());
return ID::from_binary(string.str());
RAY_CHECK(string.size() == ID::Size());
return ID::FromBinary(string.str());
}
template <typename ID>
@@ -127,14 +127,14 @@ template <typename ID>
const std::vector<ID> ids_from_flatbuf(const flatbuffers::String &string) {
const auto &ids = string_from_flatbuf(string);
std::vector<ID> ret;
size_t id_size = ID::size();
size_t id_size = ID::Size();
RAY_CHECK(ids.size() % id_size == 0);
auto count = ids.size() / id_size;
for (size_t i = 0; i < count; ++i) {
auto pos = static_cast<size_t>(id_size * i);
const auto &id = ids.substr(pos, id_size);
ret.push_back(ID::from_binary(id));
ret.push_back(ID::FromBinary(id));
}
return ret;
@@ -145,7 +145,7 @@ flatbuffers::Offset<flatbuffers::String> ids_to_flatbuf(
flatbuffers::FlatBufferBuilder &fbb, const std::vector<ID> &ids) {
std::string result;
for (const auto &id : ids) {
result += id.binary();
result += id.Binary();
}
return fbb.CreateString(result);
+4 -4
View File
@@ -146,19 +146,19 @@ AsyncGcsClient::AsyncGcsClient(const std::string &address, int port,
AsyncGcsClient::AsyncGcsClient(const std::string &address, int port,
CommandType command_type)
: AsyncGcsClient(address, port, ClientID::from_random(), command_type) {}
: AsyncGcsClient(address, port, ClientID::FromRandom(), command_type) {}
AsyncGcsClient::AsyncGcsClient(const std::string &address, int port,
CommandType command_type, bool is_test_client)
: AsyncGcsClient(address, port, ClientID::from_random(), command_type,
: AsyncGcsClient(address, port, ClientID::FromRandom(), command_type,
is_test_client) {}
AsyncGcsClient::AsyncGcsClient(const std::string &address, int port,
const std::string &password = "")
: AsyncGcsClient(address, port, ClientID::from_random(), false, password) {}
: AsyncGcsClient(address, port, ClientID::FromRandom(), false, password) {}
AsyncGcsClient::AsyncGcsClient(const std::string &address, int port, bool is_test_client)
: AsyncGcsClient(address, port, ClientID::from_random(), is_test_client) {}
: AsyncGcsClient(address, port, ClientID::FromRandom(), is_test_client) {}
Status AsyncGcsClient::Attach(boost::asio::io_service &io_service) {
// Take care of sharding contexts.
+31 -31
View File
@@ -29,7 +29,7 @@ class TestGcs : public ::testing::Test {
TestGcs(CommandType command_type) : num_callbacks_(0), command_type_(command_type) {
client_ = std::make_shared<gcs::AsyncGcsClient>("127.0.0.1", 6379, command_type_,
/*is_test_client=*/true);
driver_id_ = DriverID::from_random();
driver_id_ = DriverID::FromRandom();
}
virtual ~TestGcs() {
@@ -84,7 +84,7 @@ class TestGcsWithChainAsio : public TestGcsWithAsio {
void TestTableLookup(const DriverID &driver_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
TaskID task_id = TaskID::from_random();
TaskID task_id = TaskID::FromRandom();
auto data = std::make_shared<protocol::TaskT>();
data->task_specification = "123";
@@ -133,7 +133,7 @@ TEST_MACRO(TestGcsWithChainAsio, TestTableLookup);
void TestLogLookup(const DriverID &driver_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
// Append some entries to the log at an object ID.
TaskID task_id = TaskID::from_random();
TaskID task_id = TaskID::FromRandom();
std::vector<std::string> node_manager_ids = {"abc", "def", "ghi"};
for (auto &node_manager_id : node_manager_ids) {
auto data = std::make_shared<TaskReconstructionDataT>();
@@ -178,7 +178,7 @@ TEST_F(TestGcsWithAsio, TestLogLookup) {
void TestTableLookupFailure(const DriverID &driver_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
TaskID task_id = TaskID::from_random();
TaskID task_id = TaskID::FromRandom();
// Check that the lookup does not return data.
auto lookup_callback = [](gcs::AsyncGcsClient *client, const TaskID &id,
@@ -205,7 +205,7 @@ TEST_MACRO(TestGcsWithChainAsio, TestTableLookupFailure);
void TestLogAppendAt(const DriverID &driver_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
TaskID task_id = TaskID::from_random();
TaskID task_id = TaskID::FromRandom();
std::vector<std::string> node_manager_ids = {"A", "B"};
std::vector<std::shared_ptr<TaskReconstructionDataT>> data_log;
for (const auto &node_manager_id : node_manager_ids) {
@@ -265,7 +265,7 @@ TEST_F(TestGcsWithAsio, TestLogAppendAt) {
void TestSet(const DriverID &driver_id, std::shared_ptr<gcs::AsyncGcsClient> client) {
// Add some entries to the set at an object ID.
ObjectID object_id = ObjectID::from_random();
ObjectID object_id = ObjectID::FromRandom();
std::vector<std::string> managers = {"abc", "def", "ghi"};
for (auto &manager : managers) {
auto data = std::make_shared<ObjectTableDataT>();
@@ -335,7 +335,7 @@ void TestDeleteKeysFromLog(
std::vector<TaskID> ids;
TaskID task_id;
for (auto &data : data_vector) {
task_id = TaskID::from_random();
task_id = TaskID::FromRandom();
ids.push_back(task_id);
// Check that we added the correct object entries.
auto add_callback = [task_id, data](gcs::AsyncGcsClient *client, const TaskID &id,
@@ -383,7 +383,7 @@ void TestDeleteKeysFromTable(const DriverID &driver_id,
std::vector<TaskID> ids;
TaskID task_id;
for (auto &data : data_vector) {
task_id = TaskID::from_random();
task_id = TaskID::FromRandom();
ids.push_back(task_id);
// Check that we added the correct object entries.
auto add_callback = [task_id, data](gcs::AsyncGcsClient *client, const TaskID &id,
@@ -431,7 +431,7 @@ void TestDeleteKeysFromSet(const DriverID &driver_id,
std::vector<ObjectID> ids;
ObjectID object_id;
for (auto &data : data_vector) {
object_id = ObjectID::from_random();
object_id = ObjectID::FromRandom();
ids.push_back(object_id);
// Check that we added the correct object entries.
auto add_callback = [object_id, data](gcs::AsyncGcsClient *client, const ObjectID &id,
@@ -477,7 +477,7 @@ void TestDeleteKeys(const DriverID &driver_id,
auto AppendTaskReconstructionData = [&task_reconstruction_vector](size_t add_count) {
for (size_t i = 0; i < add_count; ++i) {
auto data = std::make_shared<TaskReconstructionDataT>();
data->node_manager_id = ObjectID::from_random().hex();
data->node_manager_id = ObjectID::FromRandom().Hex();
task_reconstruction_vector.push_back(data);
}
};
@@ -506,7 +506,7 @@ void TestDeleteKeys(const DriverID &driver_id,
auto AppendTaskData = [&task_vector](size_t add_count) {
for (size_t i = 0; i < add_count; ++i) {
auto task_data = std::make_shared<protocol::TaskT>();
task_data->task_specification = ObjectID::from_random().hex();
task_data->task_specification = ObjectID::FromRandom().Hex();
task_vector.push_back(task_data);
}
};
@@ -532,7 +532,7 @@ void TestDeleteKeys(const DriverID &driver_id,
auto AppendObjectData = [&object_vector](size_t add_count) {
for (size_t i = 0; i < add_count; ++i) {
auto data = std::make_shared<ObjectTableDataT>();
data->manager = ObjectID::from_random().hex();
data->manager = ObjectID::FromRandom().Hex();
object_vector.push_back(data);
}
};
@@ -603,7 +603,7 @@ void TestLogSubscribeAll(const DriverID &driver_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
std::vector<DriverID> driver_ids;
for (int i = 0; i < 3; i++) {
driver_ids.emplace_back(DriverID::from_random());
driver_ids.emplace_back(DriverID::FromRandom());
}
// Callback for a notification.
auto notification_callback = [driver_ids](gcs::AsyncGcsClient *client,
@@ -612,7 +612,7 @@ void TestLogSubscribeAll(const DriverID &driver_id,
ASSERT_EQ(id, driver_ids[test->NumCallbacks()]);
// Check that we get notifications in the same order as the writes.
for (const auto &entry : data) {
ASSERT_EQ(entry.driver_id, driver_ids[test->NumCallbacks()].binary());
ASSERT_EQ(entry.driver_id, driver_ids[test->NumCallbacks()].Binary());
test->IncrementNumCallbacks();
}
if (test->NumCallbacks() == driver_ids.size()) {
@@ -633,7 +633,7 @@ void TestLogSubscribeAll(const DriverID &driver_id,
// subscribed, we will append to the key several times and check that we get
// notified for each.
RAY_CHECK_OK(client->driver_table().Subscribe(
driver_id, ClientID::nil(), notification_callback, subscribe_callback));
driver_id, ClientID::Nil(), notification_callback, subscribe_callback));
// Run the event loop. The loop will only stop if the registered subscription
// callback is called (or an assertion failure).
@@ -651,7 +651,7 @@ void TestSetSubscribeAll(const DriverID &driver_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
std::vector<ObjectID> object_ids;
for (int i = 0; i < 3; i++) {
object_ids.emplace_back(ObjectID::from_random());
object_ids.emplace_back(ObjectID::FromRandom());
}
std::vector<std::string> managers = {"abc", "def", "ghi"};
@@ -711,7 +711,7 @@ void TestSetSubscribeAll(const DriverID &driver_id,
// subscribed, we will append to the key several times and check that we get
// notified for each.
RAY_CHECK_OK(client->object_table().Subscribe(
driver_id, ClientID::nil(), notification_callback, subscribe_callback));
driver_id, ClientID::Nil(), notification_callback, subscribe_callback));
// Run the event loop. The loop will only stop if the registered subscription
// callback is called (or an assertion failure).
@@ -728,11 +728,11 @@ TEST_F(TestGcsWithAsio, TestSetSubscribeAll) {
void TestTableSubscribeId(const DriverID &driver_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
// Add a table entry.
TaskID task_id1 = TaskID::from_random();
TaskID task_id1 = TaskID::FromRandom();
std::vector<std::string> task_specs1 = {"abc", "def", "ghi"};
// Add a table entry at a second key.
TaskID task_id2 = TaskID::from_random();
TaskID task_id2 = TaskID::FromRandom();
std::vector<std::string> task_specs2 = {"jkl", "mno", "pqr"};
// The callback for a notification from the table. This should only be
@@ -804,14 +804,14 @@ TEST_MACRO(TestGcsWithChainAsio, TestTableSubscribeId);
void TestLogSubscribeId(const DriverID &driver_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
// Add a log entry.
DriverID driver_id1 = DriverID::from_random();
DriverID driver_id1 = DriverID::FromRandom();
std::vector<std::string> driver_ids1 = {"abc", "def", "ghi"};
auto data1 = std::make_shared<DriverTableDataT>();
data1->driver_id = driver_ids1[0];
RAY_CHECK_OK(client->driver_table().Append(driver_id, driver_id1, data1, nullptr));
// Add a log entry at a second key.
DriverID driver_id2 = DriverID::from_random();
DriverID driver_id2 = DriverID::FromRandom();
std::vector<std::string> driver_ids2 = {"jkl", "mno", "pqr"};
auto data2 = std::make_shared<DriverTableDataT>();
data2->driver_id = driver_ids2[0];
@@ -878,14 +878,14 @@ TEST_F(TestGcsWithAsio, TestLogSubscribeId) {
void TestSetSubscribeId(const DriverID &driver_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
// Add a set entry.
ObjectID object_id1 = ObjectID::from_random();
ObjectID object_id1 = ObjectID::FromRandom();
std::vector<std::string> managers1 = {"abc", "def", "ghi"};
auto data1 = std::make_shared<ObjectTableDataT>();
data1->manager = managers1[0];
RAY_CHECK_OK(client->object_table().Add(driver_id, object_id1, data1, nullptr));
// Add a set entry at a second key.
ObjectID object_id2 = ObjectID::from_random();
ObjectID object_id2 = ObjectID::FromRandom();
std::vector<std::string> managers2 = {"jkl", "mno", "pqr"};
auto data2 = std::make_shared<ObjectTableDataT>();
data2->manager = managers2[0];
@@ -954,7 +954,7 @@ TEST_F(TestGcsWithAsio, TestSetSubscribeId) {
void TestTableSubscribeCancel(const DriverID &driver_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
// Add a table entry.
TaskID task_id = TaskID::from_random();
TaskID task_id = TaskID::FromRandom();
std::vector<std::string> task_specs = {"jkl", "mno", "pqr"};
auto data = std::make_shared<protocol::TaskT>();
data->task_specification = task_specs[0];
@@ -1029,7 +1029,7 @@ TEST_MACRO(TestGcsWithChainAsio, TestTableSubscribeCancel);
void TestLogSubscribeCancel(const DriverID &driver_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
// Add a log entry.
DriverID random_driver_id = DriverID::from_random();
DriverID random_driver_id = DriverID::FromRandom();
std::vector<std::string> driver_ids = {"jkl", "mno", "pqr"};
auto data = std::make_shared<DriverTableDataT>();
data->driver_id = driver_ids[0];
@@ -1102,7 +1102,7 @@ TEST_F(TestGcsWithAsio, TestLogSubscribeCancel) {
void TestSetSubscribeCancel(const DriverID &driver_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
// Add a set entry.
ObjectID object_id = ObjectID::from_random();
ObjectID object_id = ObjectID::FromRandom();
std::vector<std::string> managers = {"jkl", "mno", "pqr"};
auto data = std::make_shared<ObjectTableDataT>();
data->manager = managers[0];
@@ -1186,13 +1186,13 @@ void ClientTableNotification(gcs::AsyncGcsClient *client, const ClientID &client
const ClientTableDataT &data, bool is_insertion) {
ClientID added_id = client->client_table().GetLocalClientId();
ASSERT_EQ(client_id, added_id);
ASSERT_EQ(ClientID::from_binary(data.client_id), added_id);
ASSERT_EQ(ClientID::from_binary(data.client_id), added_id);
ASSERT_EQ(ClientID::FromBinary(data.client_id), added_id);
ASSERT_EQ(ClientID::FromBinary(data.client_id), added_id);
ASSERT_EQ(data.entry_type == EntryType::INSERTION, is_insertion);
ClientTableDataT cached_client;
client->client_table().GetClient(added_id, cached_client);
ASSERT_EQ(ClientID::from_binary(cached_client.client_id), added_id);
ASSERT_EQ(ClientID::FromBinary(cached_client.client_id), added_id);
ASSERT_EQ(cached_client.entry_type == EntryType::INSERTION, is_insertion);
}
@@ -1290,13 +1290,13 @@ void TestClientTableMarkDisconnected(const DriverID &driver_id,
// Connect to the client table to start receiving notifications.
RAY_CHECK_OK(client->client_table().Connect(local_client_info));
// Mark a different client as dead.
ClientID dead_client_id = ClientID::from_random();
ClientID dead_client_id = ClientID::FromRandom();
RAY_CHECK_OK(client->client_table().MarkDisconnected(dead_client_id));
// Make sure we only get a notification for the removal of the client we
// marked as dead.
client->client_table().RegisterClientRemovedCallback([dead_client_id](
gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) {
ASSERT_EQ(ClientID::from_binary(data.client_id), dead_client_id);
ASSERT_EQ(ClientID::FromBinary(data.client_id), dead_client_id);
test->Stop();
});
test->Start();
+2 -2
View File
@@ -273,7 +273,7 @@ Status RedisContext::SubscribeAsync(const ClientID &client_id,
RAY_CHECK(out_callback_index != nullptr);
*out_callback_index = callback_index;
int status = 0;
if (client_id.is_nil()) {
if (client_id.IsNil()) {
// Subscribe to all messages.
std::string redis_command = "SUBSCRIBE %d";
status = redisAsyncCommand(
@@ -285,7 +285,7 @@ Status RedisContext::SubscribeAsync(const ClientID &client_id,
status = redisAsyncCommand(
subscribe_context_, reinterpret_cast<redisCallbackFn *>(&GlobalRedisCallback),
reinterpret_cast<void *>(callback_index), redis_command.c_str(), pubsub_channel,
client_id.data(), client_id.size());
client_id.Data(), client_id.Size());
}
if (status == REDIS_ERR) {
+3 -3
View File
@@ -168,7 +168,7 @@ Status RedisContext::RunAsync(const std::string &command, const ID &id,
int status = redisAsyncCommand(
async_context_, reinterpret_cast<redisCallbackFn *>(&GlobalRedisCallback),
reinterpret_cast<void *>(callback_index), redis_command.c_str(), prefix,
pubsub_channel, id.data(), id.size(), data, length, log_length);
pubsub_channel, id.Data(), id.Size(), data, length, log_length);
if (status == REDIS_ERR) {
return Status::RedisError(std::string(async_context_->errstr));
}
@@ -177,7 +177,7 @@ Status RedisContext::RunAsync(const std::string &command, const ID &id,
int status = redisAsyncCommand(
async_context_, reinterpret_cast<redisCallbackFn *>(&GlobalRedisCallback),
reinterpret_cast<void *>(callback_index), redis_command.c_str(), prefix,
pubsub_channel, id.data(), id.size(), data, length);
pubsub_channel, id.Data(), id.Size(), data, length);
if (status == REDIS_ERR) {
return Status::RedisError(std::string(async_context_->errstr));
}
@@ -188,7 +188,7 @@ Status RedisContext::RunAsync(const std::string &command, const ID &id,
int status = redisAsyncCommand(
async_context_, reinterpret_cast<redisCallbackFn *>(&GlobalRedisCallback),
reinterpret_cast<void *>(callback_index), redis_command.c_str(), prefix,
pubsub_channel, id.data(), id.size());
pubsub_channel, id.Data(), id.Size());
if (status == REDIS_ERR) {
return Status::RedisError(std::string(async_context_->errstr));
}
+1 -1
View File
@@ -648,7 +648,7 @@ static Status DeleteKeyHelper(RedisModuleCtx *ctx, RedisModuleString *prefix_str
const char *redis_string_str = RedisModule_StringPtrLen(id_data, &redis_string_size);
auto id_binary = std::string(redis_string_str, redis_string_size);
ostream << "Undesired type for RAY.TableDelete: " << key_type
<< " id:" << ray::UniqueID::from_binary(id_binary);
<< " id:" << ray::UniqueID::FromBinary(id_binary);
RAY_LOG(ERROR) << ostream.str();
return Status::RedisError(ostream.str());
}
+29 -29
View File
@@ -172,7 +172,7 @@ Status Log<ID, Data>::RequestNotifications(const DriverID &driver_id, const ID &
RAY_CHECK(subscribe_callback_index_ >= 0)
<< "Client requested notifications on a key before Subscribe completed";
return GetRedisContext(id)->RunAsync("RAY.TABLE_REQUEST_NOTIFICATIONS", id,
client_id.data(), client_id.size(), prefix_,
client_id.Data(), client_id.Size(), prefix_,
pubsub_channel_, nullptr);
}
@@ -182,7 +182,7 @@ Status Log<ID, Data>::CancelNotifications(const DriverID &driver_id, const ID &i
RAY_CHECK(subscribe_callback_index_ >= 0)
<< "Client canceled notifications on a key before Subscribe completed";
return GetRedisContext(id)->RunAsync("RAY.TABLE_CANCEL_NOTIFICATIONS", id,
client_id.data(), client_id.size(), prefix_,
client_id.Data(), client_id.Size(), prefix_,
pubsub_channel_, nullptr);
}
@@ -193,16 +193,16 @@ void Log<ID, Data>::Delete(const DriverID &driver_id, const std::vector<ID> &ids
}
std::unordered_map<RedisContext *, std::ostringstream> sharded_data;
for (const auto &id : ids) {
sharded_data[GetRedisContext(id).get()] << id.binary();
sharded_data[GetRedisContext(id).get()] << id.Binary();
}
// Breaking really large deletion commands into batches of smaller size.
const size_t batch_size =
RayConfig::instance().maximum_gcs_deletion_batch_size() * ID::size();
RayConfig::instance().maximum_gcs_deletion_batch_size() * ID::Size();
for (const auto &pair : sharded_data) {
std::string current_data = pair.second.str();
for (size_t cur = 0; cur < pair.second.str().size(); cur += batch_size) {
size_t data_field_size = std::min(batch_size, current_data.size() - cur);
uint16_t id_count = data_field_size / ID::size();
uint16_t id_count = data_field_size / ID::Size();
// Send data contains id count and all the id data.
std::string send_data(data_field_size + sizeof(id_count), 0);
uint8_t *buffer = reinterpret_cast<uint8_t *>(&send_data[0]);
@@ -212,7 +212,7 @@ void Log<ID, Data>::Delete(const DriverID &driver_id, const std::vector<ID> &ids
data_field_size, buffer + sizeof(uint16_t)));
RAY_IGNORE_EXPR(
pair.first->RunAsync("RAY.TABLE_DELETE", UniqueID::nil(),
pair.first->RunAsync("RAY.TABLE_DELETE", UniqueID::Nil(),
reinterpret_cast<const uint8_t *>(send_data.c_str()),
send_data.size(), prefix_, pubsub_channel_,
/*redisCallback=*/nullptr));
@@ -342,7 +342,7 @@ std::string Set<ID, Data>::DebugString() const {
Status ErrorTable::PushErrorToDriver(const DriverID &driver_id, const std::string &type,
const std::string &error_message, double timestamp) {
auto data = std::make_shared<ErrorTableDataT>();
data->driver_id = driver_id.binary();
data->driver_id = driver_id.Binary();
data->type = type;
data->error_message = error_message;
data->timestamp = timestamp;
@@ -359,7 +359,7 @@ Status ProfileTable::AddProfileEventBatch(const ProfileTableData &profile_events
// call "Pack" and undo the "UnPack".
profile_events.UnPackTo(data.get());
return Append(DriverID::nil(), UniqueID::from_random(), data,
return Append(DriverID::Nil(), UniqueID::FromRandom(), data,
/*done_callback=*/nullptr);
}
@@ -369,7 +369,7 @@ std::string ProfileTable::DebugString() const {
Status DriverTable::AppendDriverData(const DriverID &driver_id, bool is_dead) {
auto data = std::make_shared<DriverTableDataT>();
data->driver_id = driver_id.binary();
data->driver_id = driver_id.Binary();
data->is_dead = is_dead;
return Append(DriverID(driver_id), driver_id, data, /*done_callback=*/nullptr);
}
@@ -378,7 +378,7 @@ void ClientTable::RegisterClientAddedCallback(const ClientTableCallback &callbac
client_added_callback_ = callback;
// Call the callback for any added clients that are cached.
for (const auto &entry : client_cache_) {
if (!entry.first.is_nil() && (entry.second.entry_type == EntryType::INSERTION)) {
if (!entry.first.IsNil() && (entry.second.entry_type == EntryType::INSERTION)) {
client_added_callback_(client_, entry.first, entry.second);
}
}
@@ -388,7 +388,7 @@ void ClientTable::RegisterClientRemovedCallback(const ClientTableCallback &callb
client_removed_callback_ = callback;
// Call the callback for any removed clients that are cached.
for (const auto &entry : client_cache_) {
if (!entry.first.is_nil() && entry.second.entry_type == EntryType::DELETION) {
if (!entry.first.IsNil() && entry.second.entry_type == EntryType::DELETION) {
client_removed_callback_(client_, entry.first, entry.second);
}
}
@@ -399,7 +399,7 @@ void ClientTable::RegisterResourceCreateUpdatedCallback(
resource_createupdated_callback_ = callback;
// Call the callback for any clients that are cached.
for (const auto &entry : client_cache_) {
if (!entry.first.is_nil() &&
if (!entry.first.IsNil() &&
(entry.second.entry_type == EntryType::RES_CREATEUPDATE)) {
resource_createupdated_callback_(client_, entry.first, entry.second);
}
@@ -410,7 +410,7 @@ void ClientTable::RegisterResourceDeletedCallback(const ClientTableCallback &cal
resource_deleted_callback_ = callback;
// Call the callback for any clients that are cached.
for (const auto &entry : client_cache_) {
if (!entry.first.is_nil() && entry.second.entry_type == EntryType::RES_DELETE) {
if (!entry.first.IsNil() && entry.second.entry_type == EntryType::RES_DELETE) {
resource_deleted_callback_(client_, entry.first, entry.second);
}
}
@@ -418,7 +418,7 @@ void ClientTable::RegisterResourceDeletedCallback(const ClientTableCallback &cal
void ClientTable::HandleNotification(AsyncGcsClient *client,
const ClientTableDataT &data) {
ClientID client_id = ClientID::from_binary(data.client_id);
ClientID client_id = ClientID::FromBinary(data.client_id);
// It's possible to get duplicate notifications from the client table, so
// check whether this notification is new.
auto entry = client_cache_.find(client_id);
@@ -524,7 +524,7 @@ void ClientTable::HandleNotification(AsyncGcsClient *client,
}
void ClientTable::HandleConnected(AsyncGcsClient *client, const ClientTableDataT &data) {
auto connected_client_id = ClientID::from_binary(data.client_id);
auto connected_client_id = ClientID::FromBinary(data.client_id);
RAY_CHECK(client_id_ == connected_client_id) << connected_client_id << " "
<< client_id_;
}
@@ -583,13 +583,13 @@ Status ClientTable::Connect(const ClientTableDataT &local_client) {
// Callback to request notifications from the client table once we've
// successfully subscribed.
auto subscription_callback = [this](AsyncGcsClient *c) {
RAY_CHECK_OK(RequestNotifications(DriverID::nil(), client_log_key_, client_id_));
RAY_CHECK_OK(RequestNotifications(DriverID::Nil(), client_log_key_, client_id_));
};
// Subscribe to the client table.
RAY_CHECK_OK(Subscribe(DriverID::nil(), client_id_, notification_callback,
RAY_CHECK_OK(Subscribe(DriverID::Nil(), client_id_, notification_callback,
subscription_callback));
};
return Append(DriverID::nil(), client_log_key_, data, add_callback);
return Append(DriverID::Nil(), client_log_key_, data, add_callback);
}
Status ClientTable::Disconnect(const DisconnectCallback &callback) {
@@ -598,12 +598,12 @@ Status ClientTable::Disconnect(const DisconnectCallback &callback) {
auto add_callback = [this, callback](AsyncGcsClient *client, const ClientID &id,
const ClientTableDataT &data) {
HandleConnected(client, data);
RAY_CHECK_OK(CancelNotifications(DriverID::nil(), client_log_key_, id));
RAY_CHECK_OK(CancelNotifications(DriverID::Nil(), client_log_key_, id));
if (callback != nullptr) {
callback();
}
};
RAY_RETURN_NOT_OK(Append(DriverID::nil(), client_log_key_, data, add_callback));
RAY_RETURN_NOT_OK(Append(DriverID::Nil(), client_log_key_, data, add_callback));
// We successfully added the deletion entry. Mark ourselves as disconnected.
disconnected_ = true;
return Status::OK();
@@ -611,19 +611,19 @@ Status ClientTable::Disconnect(const DisconnectCallback &callback) {
ray::Status ClientTable::MarkDisconnected(const ClientID &dead_client_id) {
auto data = std::make_shared<ClientTableDataT>();
data->client_id = dead_client_id.binary();
data->client_id = dead_client_id.Binary();
data->entry_type = EntryType::DELETION;
return Append(DriverID::nil(), client_log_key_, data, nullptr);
return Append(DriverID::Nil(), client_log_key_, data, nullptr);
}
void ClientTable::GetClient(const ClientID &client_id,
ClientTableDataT &client_info) const {
RAY_CHECK(!client_id.is_nil());
RAY_CHECK(!client_id.IsNil());
auto entry = client_cache_.find(client_id);
if (entry != client_cache_.end()) {
client_info = entry->second;
} else {
client_info.client_id = ClientID::nil().binary();
client_info.client_id = ClientID::Nil().Binary();
}
}
@@ -633,7 +633,7 @@ const std::unordered_map<ClientID, ClientTableDataT> &ClientTable::GetAllClients
Status ClientTable::Lookup(const Callback &lookup) {
RAY_CHECK(lookup != nullptr);
return Log::Lookup(DriverID::nil(), client_log_key_, lookup);
return Log::Lookup(DriverID::Nil(), client_log_key_, lookup);
}
std::string ClientTable::DebugString() const {
@@ -653,12 +653,12 @@ Status ActorCheckpointIdTable::AddCheckpointId(const DriverID &driver_id,
std::shared_ptr<ActorCheckpointIdDataT> copy =
std::make_shared<ActorCheckpointIdDataT>(data);
copy->timestamps.push_back(current_sys_time_ms());
copy->checkpoint_ids += checkpoint_id.binary();
copy->checkpoint_ids += checkpoint_id.Binary();
auto num_to_keep = RayConfig::instance().num_actor_checkpoints_to_keep();
while (copy->timestamps.size() > num_to_keep) {
// Delete the checkpoint from actor checkpoint table.
const auto &checkpoint_id =
ActorCheckpointID::from_binary(copy->checkpoint_ids.substr(0, kUniqueIDSize));
ActorCheckpointID::FromBinary(copy->checkpoint_ids.substr(0, kUniqueIDSize));
RAY_LOG(DEBUG) << "Deleting checkpoint " << checkpoint_id << " for actor "
<< actor_id;
copy->timestamps.erase(copy->timestamps.begin());
@@ -671,9 +671,9 @@ Status ActorCheckpointIdTable::AddCheckpointId(const DriverID &driver_id,
ray::gcs::AsyncGcsClient *client, const UniqueID &id) {
std::shared_ptr<ActorCheckpointIdDataT> data =
std::make_shared<ActorCheckpointIdDataT>();
data->actor_id = id.binary();
data->actor_id = id.Binary();
data->timestamps.push_back(current_sys_time_ms());
data->checkpoint_ids = checkpoint_id.binary();
data->checkpoint_ids = checkpoint_id.Binary();
RAY_CHECK_OK(Add(driver_id, actor_id, data, nullptr));
};
return Lookup(driver_id, actor_id, lookup_callback, failure_callback);
+2 -2
View File
@@ -559,7 +559,7 @@ class TaskLeaseTable : public Table<TaskID, TaskLeaseData> {
// TODO(swang): Use a common helper function to format the key instead of
// hardcoding it to match the Redis module.
std::vector<std::string> args = {"PEXPIRE",
EnumNameTablePrefix(prefix_) + id.binary(),
EnumNameTablePrefix(prefix_) + id.Binary(),
std::to_string(data->timeout)};
return GetRedisContext(id)->RunArgvAsync(args);
@@ -695,7 +695,7 @@ class ClientTable : public Log<ClientID, ClientTableData> {
prefix_ = TablePrefix::CLIENT;
// Set the local client's ID.
local_client_.client_id = client_id.binary();
local_client_.client_id = client_id.Binary();
};
/// Connect as a client to the GCS. This registers us in the client table
+19 -19
View File
@@ -26,14 +26,14 @@ std::mt19937 RandomlySeededMersenneTwister() {
uint64_t MurmurHash64A(const void *key, int len, unsigned int seed);
plasma::UniqueID ObjectID::to_plasma_id() const {
plasma::UniqueID ObjectID::ToPlasmaId() const {
plasma::UniqueID result;
std::memcpy(result.mutable_data(), data(), kUniqueIDSize);
std::memcpy(result.mutable_data(), Data(), kUniqueIDSize);
return result;
}
ObjectID::ObjectID(const plasma::UniqueID &from) {
std::memcpy(this->mutable_data(), from.data(), kUniqueIDSize);
std::memcpy(this->MutableData(), from.data(), kUniqueIDSize);
}
// This code is from https://sites.google.com/site/murmurhash/
@@ -86,29 +86,29 @@ uint64_t MurmurHash64A(const void *key, int len, unsigned int seed) {
}
TaskID TaskID::GetDriverTaskID(const DriverID &driver_id) {
std::string driver_id_str = driver_id.binary();
driver_id_str.resize(size());
return TaskID::from_binary(driver_id_str);
std::string driver_id_str = driver_id.Binary();
driver_id_str.resize(Size());
return TaskID::FromBinary(driver_id_str);
}
TaskID ObjectID::task_id() const {
return TaskID::from_binary(
std::string(reinterpret_cast<const char *>(id_), TaskID::size()));
TaskID ObjectID::TaskId() const {
return TaskID::FromBinary(
std::string(reinterpret_cast<const char *>(id_), TaskID::Size()));
}
ObjectID ObjectID::for_put(const TaskID &task_id, int64_t put_index) {
ObjectID ObjectID::ForPut(const TaskID &task_id, int64_t put_index) {
RAY_CHECK(put_index >= 1 && put_index <= kMaxTaskPuts) << "index=" << put_index;
ObjectID object_id;
std::memcpy(object_id.id_, task_id.binary().c_str(), task_id.size());
std::memcpy(object_id.id_, task_id.Binary().c_str(), task_id.Size());
object_id.index_ = -put_index;
return object_id;
}
ObjectID ObjectID::for_task_return(const TaskID &task_id, int64_t return_index) {
ObjectID ObjectID::ForTaskReturn(const TaskID &task_id, int64_t return_index) {
RAY_CHECK(return_index >= 1 && return_index <= kMaxTaskReturns) << "index="
<< return_index;
ObjectID object_id;
std::memcpy(object_id.id_, task_id.binary().c_str(), task_id.size());
std::memcpy(object_id.id_, task_id.Binary().c_str(), task_id.Size());
object_id.index_ = return_index;
return object_id;
}
@@ -118,23 +118,23 @@ const TaskID GenerateTaskId(const DriverID &driver_id, const TaskID &parent_task
// Compute hashes.
SHA256_CTX ctx;
sha256_init(&ctx);
sha256_update(&ctx, reinterpret_cast<const BYTE *>(driver_id.data()), driver_id.size());
sha256_update(&ctx, reinterpret_cast<const BYTE *>(parent_task_id.data()),
parent_task_id.size());
sha256_update(&ctx, reinterpret_cast<const BYTE *>(driver_id.Data()), driver_id.Size());
sha256_update(&ctx, reinterpret_cast<const BYTE *>(parent_task_id.Data()),
parent_task_id.Size());
sha256_update(&ctx, (const BYTE *)&parent_task_counter, sizeof(parent_task_counter));
// Compute the final task ID from the hash.
BYTE buff[DIGEST_SIZE];
sha256_final(&ctx, buff);
return TaskID::from_binary(std::string(buff, buff + TaskID::size()));
return TaskID::FromBinary(std::string(buff, buff + TaskID::Size()));
}
#define ID_OSTREAM_OPERATOR(id_type) \
std::ostream &operator<<(std::ostream &os, const id_type &id) { \
if (id.is_nil()) { \
if (id.IsNil()) { \
os << "NIL_ID"; \
} else { \
os << id.hex(); \
os << id.Hex(); \
} \
return os; \
}
+59 -59
View File
@@ -31,26 +31,26 @@ template <typename T>
class BaseID {
public:
BaseID();
static T from_random();
static T from_binary(const std::string &binary);
static const T &nil();
static size_t size() { return T::size(); }
static T FromRandom();
static T FromBinary(const std::string &binary);
static const T &Nil();
static size_t Size() { return T::Size(); }
size_t hash() const;
bool is_nil() const;
size_t Hash() const;
bool IsNil() const;
bool operator==(const BaseID &rhs) const;
bool operator!=(const BaseID &rhs) const;
const uint8_t *data() const;
std::string binary() const;
std::string hex() const;
const uint8_t *Data() const;
std::string Binary() const;
std::string Hex() const;
protected:
BaseID(const std::string &binary) {
std::memcpy(const_cast<uint8_t *>(this->data()), binary.data(), T::size());
std::memcpy(const_cast<uint8_t *>(this->Data()), binary.data(), T::Size());
}
// All IDs are immutable for hash evaluations. mutable_data is only allow to use
// All IDs are immutable for hash evaluations. MutableData is only allow to use
// in construction time, so this function is protected.
uint8_t *mutable_data();
uint8_t *MutableData();
// For lazy evaluation, be careful to have one Id contained in another.
// This hash code will be duplicated.
mutable size_t hash_ = 0;
@@ -59,7 +59,7 @@ class BaseID {
class UniqueID : public BaseID<UniqueID> {
public:
UniqueID() : BaseID(){};
static size_t size() { return kUniqueIDSize; }
static size_t Size() { return kUniqueIDSize; }
protected:
UniqueID(const std::string &binary);
@@ -71,7 +71,7 @@ class UniqueID : public BaseID<UniqueID> {
class TaskID : public BaseID<TaskID> {
public:
TaskID() : BaseID() {}
static size_t size() { return kTaskIDSize; }
static size_t Size() { return kTaskIDSize; }
static TaskID GetDriverTaskID(const DriverID &driver_id);
private:
@@ -81,8 +81,8 @@ class TaskID : public BaseID<TaskID> {
class ObjectID : public BaseID<ObjectID> {
public:
ObjectID() : BaseID() {}
static size_t size() { return kUniqueIDSize; }
plasma::ObjectID to_plasma_id() const;
static size_t Size() { return kUniqueIDSize; }
plasma::ObjectID ToPlasmaId() const;
ObjectID(const plasma::UniqueID &from);
/// Get the index of this object in the task that created it.
@@ -90,26 +90,26 @@ class ObjectID : public BaseID<ObjectID> {
/// \return The index of object creation according to the task that created
/// this object. This is positive if the task returned the object and negative
/// if created by a put.
int32_t object_index() const { return index_; }
int32_t ObjectIndex() const { return index_; }
/// Compute the task ID of the task that created the object.
///
/// \return The task ID of the task that created this object.
TaskID task_id() const;
TaskID TaskId() const;
/// Compute the object ID of an object put by the task.
///
/// \param task_id The task ID of the task that created the object.
/// \param index What index of the object put in the task.
/// \return The computed object ID.
static ObjectID for_put(const TaskID &task_id, int64_t put_index);
static ObjectID ForPut(const TaskID &task_id, int64_t put_index);
/// Compute the object ID of an object returned by the task.
///
/// \param task_id The task ID of the task that created the object.
/// \param return_index What index of the object returned by in the task.
/// \return The computed object ID.
static ObjectID for_task_return(const TaskID &task_id, int64_t return_index);
static ObjectID ForTaskReturn(const TaskID &task_id, int64_t return_index);
private:
uint8_t id_[kTaskIDSize];
@@ -125,22 +125,22 @@ std::ostream &operator<<(std::ostream &os, const UniqueID &id);
std::ostream &operator<<(std::ostream &os, const TaskID &id);
std::ostream &operator<<(std::ostream &os, const ObjectID &id);
#define DEFINE_UNIQUE_ID(type) \
class RAY_EXPORT type : public UniqueID { \
public: \
explicit type(const UniqueID &from) { \
std::memcpy(&id_, from.data(), kUniqueIDSize); \
} \
type() : UniqueID() {} \
static type from_random() { return type(UniqueID::from_random()); } \
static type from_binary(const std::string &binary) { return type(binary); } \
static type nil() { return type(UniqueID::nil()); } \
static size_t size() { return kUniqueIDSize; } \
\
private: \
explicit type(const std::string &binary) { \
std::memcpy(&id_, binary.data(), kUniqueIDSize); \
} \
#define DEFINE_UNIQUE_ID(type) \
class RAY_EXPORT type : public UniqueID { \
public: \
explicit type(const UniqueID &from) { \
std::memcpy(&id_, from.Data(), kUniqueIDSize); \
} \
type() : UniqueID() {} \
static type FromRandom() { return type(UniqueID::FromRandom()); } \
static type FromBinary(const std::string &binary) { return type(binary); } \
static type Nil() { return type(UniqueID::Nil()); } \
static size_t Size() { return kUniqueIDSize; } \
\
private: \
explicit type(const std::string &binary) { \
std::memcpy(&id_, binary.data(), kUniqueIDSize); \
} \
};
#include "id_def.h"
@@ -163,12 +163,12 @@ template <typename T>
BaseID<T>::BaseID() {
// Using const_cast to directly change data is dangerous. The cached
// hash may not be changed. This is used in construction time.
std::fill_n(this->mutable_data(), T::size(), 0xff);
std::fill_n(this->MutableData(), T::Size(), 0xff);
}
template <typename T>
T BaseID<T>::from_random() {
std::string data(T::size(), 0);
T BaseID<T>::FromRandom() {
std::string data(T::Size(), 0);
// NOTE(pcm): The right way to do this is to have one std::mt19937 per
// thread (using the thread_local keyword), but that's not supported on
// older versions of macOS (see https://stackoverflow.com/a/29929949)
@@ -176,44 +176,44 @@ T BaseID<T>::from_random() {
std::lock_guard<std::mutex> lock(random_engine_mutex);
static std::mt19937 generator = RandomlySeededMersenneTwister();
std::uniform_int_distribution<uint32_t> dist(0, std::numeric_limits<uint8_t>::max());
for (int i = 0; i < T::size(); i++) {
for (int i = 0; i < T::Size(); i++) {
data[i] = static_cast<uint8_t>(dist(generator));
}
return T::from_binary(data);
return T::FromBinary(data);
}
template <typename T>
T BaseID<T>::from_binary(const std::string &binary) {
T t = T::nil();
std::memcpy(t.mutable_data(), binary.data(), T::size());
T BaseID<T>::FromBinary(const std::string &binary) {
T t = T::Nil();
std::memcpy(t.MutableData(), binary.data(), T::Size());
return t;
}
template <typename T>
const T &BaseID<T>::nil() {
const T &BaseID<T>::Nil() {
static const T nil_id;
return nil_id;
}
template <typename T>
bool BaseID<T>::is_nil() const {
static T nil_id = T::nil();
bool BaseID<T>::IsNil() const {
static T nil_id = T::Nil();
return *this == nil_id;
}
template <typename T>
size_t BaseID<T>::hash() const {
size_t BaseID<T>::Hash() const {
// Note(ashione): hash code lazy calculation(it's invoked every time if hash code is
// default value 0)
if (!hash_) {
hash_ = MurmurHash64A(data(), T::size(), 0);
hash_ = MurmurHash64A(Data(), T::Size(), 0);
}
return hash_;
}
template <typename T>
bool BaseID<T>::operator==(const BaseID &rhs) const {
return std::memcmp(data(), rhs.data(), T::size()) == 0;
return std::memcmp(Data(), rhs.Data(), T::Size()) == 0;
}
template <typename T>
@@ -222,26 +222,26 @@ bool BaseID<T>::operator!=(const BaseID &rhs) const {
}
template <typename T>
uint8_t *BaseID<T>::mutable_data() {
uint8_t *BaseID<T>::MutableData() {
return reinterpret_cast<uint8_t *>(this) + sizeof(hash_);
}
template <typename T>
const uint8_t *BaseID<T>::data() const {
const uint8_t *BaseID<T>::Data() const {
return reinterpret_cast<const uint8_t *>(this) + sizeof(hash_);
}
template <typename T>
std::string BaseID<T>::binary() const {
return std::string(reinterpret_cast<const char *>(data()), T::size());
std::string BaseID<T>::Binary() const {
return std::string(reinterpret_cast<const char *>(Data()), T::Size());
}
template <typename T>
std::string BaseID<T>::hex() const {
std::string BaseID<T>::Hex() const {
constexpr char hex[] = "0123456789abcdef";
const uint8_t *id = data();
const uint8_t *id = Data();
std::string result;
for (int i = 0; i < T::size(); i++) {
for (int i = 0; i < T::Size(); i++) {
unsigned int val = id[i];
result.push_back(hex[val >> 4]);
result.push_back(hex[val & 0xf]);
@@ -256,11 +256,11 @@ namespace std {
#define DEFINE_UNIQUE_ID(type) \
template <> \
struct hash<::ray::type> { \
size_t operator()(const ::ray::type &id) const { return id.hash(); } \
size_t operator()(const ::ray::type &id) const { return id.Hash(); } \
}; \
template <> \
struct hash<const ::ray::type> { \
size_t operator()(const ::ray::type &id) const { return id.hash(); } \
size_t operator()(const ::ray::type &id) const { return id.Hash(); } \
};
DEFINE_UNIQUE_ID(UniqueID);
+7 -7
View File
@@ -43,7 +43,7 @@ std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> ObjectBufferPool::Ge
std::lock_guard<std::mutex> lock(pool_mutex_);
if (get_buffer_state_.count(object_id) == 0) {
plasma::ObjectBuffer object_buffer;
plasma::ObjectID plasma_id = object_id.to_plasma_id();
plasma::ObjectID plasma_id = object_id.ToPlasmaId();
RAY_ARROW_CHECK_OK(store_client_.Get(&plasma_id, 1, 0, &object_buffer));
if (object_buffer.data == nullptr) {
RAY_LOG(ERROR) << "Failed to get object";
@@ -72,14 +72,14 @@ void ObjectBufferPool::ReleaseGetChunk(const ObjectID &object_id, uint64_t chunk
GetBufferState &buffer_state = get_buffer_state_[object_id];
buffer_state.references--;
if (buffer_state.references == 0) {
RAY_ARROW_CHECK_OK(store_client_.Release(object_id.to_plasma_id()));
RAY_ARROW_CHECK_OK(store_client_.Release(object_id.ToPlasmaId()));
get_buffer_state_.erase(object_id);
}
}
void ObjectBufferPool::AbortGet(const ObjectID &object_id) {
std::lock_guard<std::mutex> lock(pool_mutex_);
RAY_ARROW_CHECK_OK(store_client_.Release(object_id.to_plasma_id()));
RAY_ARROW_CHECK_OK(store_client_.Release(object_id.ToPlasmaId()));
get_buffer_state_.erase(object_id);
}
@@ -88,7 +88,7 @@ std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> ObjectBufferPool::Cr
uint64_t chunk_index) {
std::lock_guard<std::mutex> lock(pool_mutex_);
if (create_buffer_state_.count(object_id) == 0) {
const plasma::ObjectID plasma_id = object_id.to_plasma_id();
const plasma::ObjectID plasma_id = object_id.ToPlasmaId();
int64_t object_size = data_size - metadata_size;
// Try to create shared buffer.
std::shared_ptr<Buffer> data;
@@ -150,7 +150,7 @@ void ObjectBufferPool::SealChunk(const ObjectID &object_id, const uint64_t chunk
create_buffer_state_[object_id].chunk_state[chunk_index] = CreateChunkState::SEALED;
create_buffer_state_[object_id].num_seals_remaining--;
if (create_buffer_state_[object_id].num_seals_remaining == 0) {
const plasma::ObjectID plasma_id = object_id.to_plasma_id();
const plasma::ObjectID plasma_id = object_id.ToPlasmaId();
RAY_ARROW_CHECK_OK(store_client_.Seal(plasma_id));
RAY_ARROW_CHECK_OK(store_client_.Release(plasma_id));
create_buffer_state_.erase(object_id);
@@ -158,7 +158,7 @@ void ObjectBufferPool::SealChunk(const ObjectID &object_id, const uint64_t chunk
}
void ObjectBufferPool::AbortCreate(const ObjectID &object_id) {
const plasma::ObjectID plasma_id = object_id.to_plasma_id();
const plasma::ObjectID plasma_id = object_id.ToPlasmaId();
RAY_ARROW_CHECK_OK(store_client_.Release(plasma_id));
RAY_ARROW_CHECK_OK(store_client_.Abort(plasma_id));
create_buffer_state_.erase(object_id);
@@ -186,7 +186,7 @@ void ObjectBufferPool::FreeObjects(const std::vector<ObjectID> &object_ids) {
std::vector<plasma::ObjectID> plasma_ids;
plasma_ids.reserve(object_ids.size());
for (const auto &id : object_ids) {
plasma_ids.push_back(id.to_plasma_id());
plasma_ids.push_back(id.ToPlasmaId());
}
std::lock_guard<std::mutex> lock(pool_mutex_);
RAY_ARROW_CHECK_OK(store_client_.Delete(plasma_ids));
+11 -11
View File
@@ -19,7 +19,7 @@ void UpdateObjectLocations(const GcsTableNotificationMode notification_mode,
// with GcsTableNotificationMode, we can determine whether the update mode is
// addition or deletion.
for (const auto &object_table_data : location_updates) {
ClientID client_id = ClientID::from_binary(object_table_data.manager);
ClientID client_id = ClientID::FromBinary(object_table_data.manager);
if (notification_mode != GcsTableNotificationMode::REMOVE) {
client_ids->insert(client_id);
} else {
@@ -71,7 +71,7 @@ void ObjectDirectory::RegisterBackend() {
}
};
RAY_CHECK_OK(gcs_client_->object_table().Subscribe(
DriverID::nil(), gcs_client_->client_table().GetLocalClientId(),
DriverID::Nil(), gcs_client_->client_table().GetLocalClientId(),
object_notification_callback, nullptr));
}
@@ -81,10 +81,10 @@ ray::Status ObjectDirectory::ReportObjectAdded(
RAY_LOG(DEBUG) << "Reporting object added to GCS " << object_id;
// Append the addition entry to the object table.
auto data = std::make_shared<ObjectTableDataT>();
data->manager = client_id.binary();
data->manager = client_id.Binary();
data->object_size = object_info.data_size;
ray::Status status =
gcs_client_->object_table().Add(DriverID::nil(), object_id, data, nullptr);
gcs_client_->object_table().Add(DriverID::Nil(), object_id, data, nullptr);
return status;
}
@@ -94,10 +94,10 @@ ray::Status ObjectDirectory::ReportObjectRemoved(
RAY_LOG(DEBUG) << "Reporting object removed to GCS " << object_id;
// Append the eviction entry to the object table.
auto data = std::make_shared<ObjectTableDataT>();
data->manager = client_id.binary();
data->manager = client_id.Binary();
data->object_size = object_info.data_size;
ray::Status status =
gcs_client_->object_table().Remove(DriverID::nil(), object_id, data, nullptr);
gcs_client_->object_table().Remove(DriverID::Nil(), object_id, data, nullptr);
return status;
};
@@ -105,8 +105,8 @@ void ObjectDirectory::LookupRemoteConnectionInfo(
RemoteConnectionInfo &connection_info) const {
ClientTableDataT client_data;
gcs_client_->client_table().GetClient(connection_info.client_id, client_data);
ClientID result_client_id = ClientID::from_binary(client_data.client_id);
if (!result_client_id.is_nil()) {
ClientID result_client_id = ClientID::FromBinary(client_data.client_id);
if (!result_client_id.IsNil()) {
RAY_CHECK(result_client_id == connection_info.client_id);
if (client_data.entry_type == EntryType::INSERTION) {
connection_info.ip = client_data.node_manager_address;
@@ -157,7 +157,7 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i
if (it == listeners_.end()) {
it = listeners_.emplace(object_id, LocationListenerState()).first;
status = gcs_client_->object_table().RequestNotifications(
DriverID::nil(), object_id, gcs_client_->client_table().GetLocalClientId());
DriverID::Nil(), object_id, gcs_client_->client_table().GetLocalClientId());
}
auto &listener_state = it->second;
// TODO(hme): Make this fatal after implementing Pull suppression.
@@ -185,7 +185,7 @@ ray::Status ObjectDirectory::UnsubscribeObjectLocations(const UniqueID &callback
entry->second.callbacks.erase(callback_id);
if (entry->second.callbacks.empty()) {
status = gcs_client_->object_table().CancelNotifications(
DriverID::nil(), object_id, gcs_client_->client_table().GetLocalClientId());
DriverID::Nil(), object_id, gcs_client_->client_table().GetLocalClientId());
listeners_.erase(entry);
}
return status;
@@ -208,7 +208,7 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id,
// SubscribeObjectLocations call, so look up the object's locations
// directly from the GCS.
status = gcs_client_->object_table().Lookup(
DriverID::nil(), object_id,
DriverID::Nil(), object_id,
[this, callback](gcs::AsyncGcsClient *client, const ObjectID &object_id,
const std::vector<ObjectTableDataT> &location_updates) {
// Build the set of current locations based on the entries in the log.
+12 -12
View File
@@ -64,7 +64,7 @@ void ObjectManager::StopIOService() {
void ObjectManager::HandleObjectAdded(
const object_manager::protocol::ObjectInfoT &object_info) {
// Notify the object directory that the object has been added to this node.
ObjectID object_id = ObjectID::from_binary(object_info.object_id);
ObjectID object_id = ObjectID::FromBinary(object_info.object_id);
RAY_LOG(DEBUG) << "Object added " << object_id;
RAY_CHECK(local_objects_.count(object_id) == 0);
local_objects_[object_id].object_info = object_info;
@@ -272,7 +272,7 @@ void ObjectManager::PullSendRequest(const ObjectID &object_id,
flatbuffers::FlatBufferBuilder fbb;
auto message = object_manager_protocol::CreatePullRequestMessage(
fbb, fbb.CreateString(client_id_.binary()), fbb.CreateString(object_id.binary()));
fbb, fbb.CreateString(client_id_.Binary()), fbb.CreateString(object_id.Binary()));
fbb.Finish(message);
conn->WriteMessageAsync(
static_cast<int64_t>(object_manager_protocol::MessageType::PullRequest),
@@ -315,7 +315,7 @@ void ObjectManager::HandleSendFinished(const ObjectID &object_id,
profile_event.end_time = end_time;
// Encode the object ID, client ID, chunk index, and status as a json list,
// which will be parsed by the reader of the profile table.
profile_event.extra_data = "[\"" + object_id.hex() + "\",\"" + client_id.hex() + "\"," +
profile_event.extra_data = "[\"" + object_id.Hex() + "\",\"" + client_id.Hex() + "\"," +
std::to_string(chunk_index) + ",\"" + status.ToString() +
"\"]";
profile_events_.push_back(profile_event);
@@ -335,7 +335,7 @@ void ObjectManager::HandleReceiveFinished(const ObjectID &object_id,
profile_event.end_time = end_time;
// Encode the object ID, client ID, chunk index, and status as a json list,
// which will be parsed by the reader of the profile table.
profile_event.extra_data = "[\"" + object_id.hex() + "\",\"" + client_id.hex() + "\"," +
profile_event.extra_data = "[\"" + object_id.Hex() + "\",\"" + client_id.Hex() + "\"," +
std::to_string(chunk_index) + ",\"" + status.ToString() +
"\"]";
profile_events_.push_back(profile_event);
@@ -408,7 +408,7 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) {
static_cast<uint64_t>(object_info.data_size + object_info.metadata_size);
uint64_t metadata_size = static_cast<uint64_t>(object_info.metadata_size);
uint64_t num_chunks = buffer_pool_.GetNumChunks(data_size);
UniqueID push_id = UniqueID::from_random();
UniqueID push_id = UniqueID::FromRandom();
for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) {
send_service_.post([this, push_id, client_id, object_id, data_size, metadata_size,
chunk_index, connection_info]() {
@@ -527,7 +527,7 @@ void ObjectManager::CancelPull(const ObjectID &object_id) {
ray::Status ObjectManager::Wait(const std::vector<ObjectID> &object_ids,
int64_t timeout_ms, uint64_t num_required_objects,
bool wait_local, const WaitCallback &callback) {
UniqueID wait_id = UniqueID::from_random();
UniqueID wait_id = UniqueID::FromRandom();
RAY_LOG(DEBUG) << "Wait request " << wait_id << " on " << client_id_;
RAY_RETURN_NOT_OK(AddWaitRequest(wait_id, object_ids, timeout_ms, num_required_objects,
wait_local, callback));
@@ -773,7 +773,7 @@ void ObjectManager::ConnectClient(std::shared_ptr<TcpClientConnection> &conn,
// TODO: trash connection on failure.
auto info =
flatbuffers::GetRoot<object_manager_protocol::ConnectClientMessage>(message);
ClientID client_id = ClientID::from_binary(info->client_id()->str());
ClientID client_id = ClientID::FromBinary(info->client_id()->str());
bool is_transfer = info->is_transfer();
conn->SetClientID(client_id);
if (is_transfer) {
@@ -798,14 +798,14 @@ void ObjectManager::ReceivePullRequest(std::shared_ptr<TcpClientConnection> &con
const uint8_t *message) {
// Serialize and push object to requesting client.
auto pr = flatbuffers::GetRoot<object_manager_protocol::PullRequestMessage>(message);
ObjectID object_id = ObjectID::from_binary(pr->object_id()->str());
ClientID client_id = ClientID::from_binary(pr->client_id()->str());
ObjectID object_id = ObjectID::FromBinary(pr->object_id()->str());
ClientID client_id = ClientID::FromBinary(pr->client_id()->str());
ProfileEventT profile_event;
profile_event.event_type = "receive_pull_request";
profile_event.start_time = current_sys_time_seconds();
profile_event.end_time = profile_event.start_time;
profile_event.extra_data = "[\"" + object_id.hex() + "\",\"" + client_id.hex() + "\"]";
profile_event.extra_data = "[\"" + object_id.Hex() + "\",\"" + client_id.Hex() + "\"]";
profile_events_.push_back(profile_event);
Push(object_id, client_id);
@@ -817,7 +817,7 @@ void ObjectManager::ReceivePushRequest(std::shared_ptr<TcpClientConnection> &con
// Serialize.
auto object_header =
flatbuffers::GetRoot<object_manager_protocol::PushRequestMessage>(message);
const ObjectID object_id = ObjectID::from_binary(object_header->object_id()->str());
const ObjectID object_id = ObjectID::FromBinary(object_header->object_id()->str());
uint64_t chunk_index = object_header->chunk_index();
uint64_t data_size = object_header->data_size();
uint64_t metadata_size = object_header->metadata_size();
@@ -941,7 +941,7 @@ void ObjectManager::SpreadFreeObjectRequest(const std::vector<ObjectID> &object_
ProfileTableDataT ObjectManager::GetAndResetProfilingInfo() {
ProfileTableDataT profile_info;
profile_info.component_type = "object_manager";
profile_info.component_id = client_id_.binary();
profile_info.component_id = client_id_.Binary();
for (auto const &profile_event : profile_events_) {
profile_info.profile_events.emplace_back(new ProfileEventT(profile_event));
+1 -1
View File
@@ -394,7 +394,7 @@ class ObjectManager : public ObjectManagerInterface {
/// This is used as the callback identifier in Pull for
/// SubscribeObjectLocations. We only need one identifier because we never need to
/// subscribe multiple times to the same object during Pull.
UniqueID object_directory_pull_callback_id_ = UniqueID::from_random();
UniqueID object_directory_pull_callback_id_ = UniqueID::FromRandom();
/// A set of active wait requests.
std::unordered_map<UniqueID, WaitState> active_wait_requests_;
@@ -121,8 +121,8 @@ class TestObjectManagerBase : public ::testing::Test {
flushall_redis();
// start store
store_id_1 = StartStore(UniqueID::from_random().hex());
store_id_2 = StartStore(UniqueID::from_random().hex());
store_id_1 = StartStore(UniqueID::FromRandom().Hex());
store_id_2 = StartStore(UniqueID::FromRandom().Hex());
uint pull_timeout_ms = 1000;
int max_sends_a = 2;
@@ -174,14 +174,14 @@ class TestObjectManagerBase : public ::testing::Test {
}
ObjectID WriteDataToClient(plasma::PlasmaClient &client, int64_t data_size) {
ObjectID object_id = ObjectID::from_random();
ObjectID object_id = ObjectID::FromRandom();
RAY_LOG(DEBUG) << "ObjectID Created: " << object_id;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data;
RAY_ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata,
metadata_size, &data));
RAY_ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id()));
RAY_ARROW_CHECK_OK(
client.Create(object_id.ToPlasmaId(), data_size, metadata, metadata_size, &data));
RAY_ARROW_CHECK_OK(client.Seal(object_id.ToPlasmaId()));
return object_id;
}
@@ -242,7 +242,7 @@ class StressTestObjectManager : public TestObjectManagerBase {
client_id_2 = gcs_client_2->client_table().GetLocalClientId();
gcs_client_1->client_table().RegisterClientAddedCallback([this](
gcs::AsyncGcsClient *client, const ClientID &id, const ClientTableDataT &data) {
ClientID parsed_id = ClientID::from_binary(data.client_id);
ClientID parsed_id = ClientID::FromBinary(data.client_id);
if (parsed_id == client_id_1 || parsed_id == client_id_2) {
num_connected_clients += 1;
}
@@ -262,7 +262,7 @@ class StressTestObjectManager : public TestObjectManagerBase {
ray::Status status = ray::Status::OK();
status = server1->object_manager_.SubscribeObjAdded(
[this](const object_manager::protocol::ObjectInfoT &object_info) {
object_added_handler_1(ObjectID::from_binary(object_info.object_id));
object_added_handler_1(ObjectID::FromBinary(object_info.object_id));
if (v1.size() == num_expected_objects && v1.size() == v2.size()) {
TransferTestComplete();
}
@@ -270,7 +270,7 @@ class StressTestObjectManager : public TestObjectManagerBase {
RAY_CHECK_OK(status);
status = server2->object_manager_.SubscribeObjAdded(
[this](const object_manager::protocol::ObjectInfoT &object_info) {
object_added_handler_2(ObjectID::from_binary(object_info.object_id));
object_added_handler_2(ObjectID::FromBinary(object_info.object_id));
if (v2.size() == num_expected_objects && v1.size() == v2.size()) {
TransferTestComplete();
}
@@ -290,7 +290,7 @@ class StressTestObjectManager : public TestObjectManagerBase {
plasma::ObjectBuffer GetObject(plasma::PlasmaClient &client, ObjectID &object_id) {
plasma::ObjectBuffer object_buffer;
plasma::ObjectID plasma_id = object_id.to_plasma_id();
plasma::ObjectID plasma_id = object_id.ToPlasmaId();
RAY_ARROW_CHECK_OK(client.Get(&plasma_id, 1, 0, &object_buffer));
return object_buffer;
}
@@ -298,7 +298,7 @@ class StressTestObjectManager : public TestObjectManagerBase {
static unsigned char *GetDigest(plasma::PlasmaClient &client, ObjectID &object_id) {
const int64_t size = sizeof(uint64_t);
static unsigned char digest_1[size];
RAY_ARROW_CHECK_OK(client.Hash(object_id.to_plasma_id(), &digest_1[0]));
RAY_ARROW_CHECK_OK(client.Hash(object_id.ToPlasmaId(), &digest_1[0]));
return digest_1;
}
@@ -439,12 +439,12 @@ class StressTestObjectManager : public TestObjectManagerBase {
<< "\n";
ClientTableDataT data;
gcs_client_1->client_table().GetClient(client_id_1, data);
RAY_LOG(DEBUG) << "ClientID=" << ClientID::from_binary(data.client_id) << "\n"
RAY_LOG(DEBUG) << "ClientID=" << ClientID::FromBinary(data.client_id) << "\n"
<< "ClientIp=" << data.node_manager_address << "\n"
<< "ClientPort=" << data.node_manager_port;
ClientTableDataT data2;
gcs_client_1->client_table().GetClient(client_id_2, data2);
RAY_LOG(DEBUG) << "ClientID=" << ClientID::from_binary(data2.client_id) << "\n"
RAY_LOG(DEBUG) << "ClientID=" << ClientID::FromBinary(data2.client_id) << "\n"
<< "ClientIp=" << data2.node_manager_address << "\n"
<< "ClientPort=" << data2.node_manager_port;
}
@@ -114,8 +114,8 @@ class TestObjectManagerBase : public ::testing::Test {
flushall_redis();
// start store
store_id_1 = StartStore(UniqueID::from_random().hex());
store_id_2 = StartStore(UniqueID::from_random().hex());
store_id_1 = StartStore(UniqueID::FromRandom().Hex());
store_id_2 = StartStore(UniqueID::FromRandom().Hex());
uint pull_timeout_ms = 1;
push_timeout_ms = 1000;
@@ -162,7 +162,7 @@ class TestObjectManagerBase : public ::testing::Test {
}
ObjectID WriteDataToClient(plasma::PlasmaClient &client, int64_t data_size) {
return WriteDataToClient(client, data_size, ObjectID::from_random());
return WriteDataToClient(client, data_size, ObjectID::FromRandom());
}
ObjectID WriteDataToClient(plasma::PlasmaClient &client, int64_t data_size,
@@ -171,9 +171,9 @@ class TestObjectManagerBase : public ::testing::Test {
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data;
RAY_ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata,
metadata_size, &data));
RAY_ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id()));
RAY_ARROW_CHECK_OK(
client.Create(object_id.ToPlasmaId(), data_size, metadata, metadata_size, &data));
RAY_ARROW_CHECK_OK(client.Seal(object_id.ToPlasmaId()));
return object_id;
}
@@ -221,7 +221,7 @@ class TestObjectManager : public TestObjectManagerBase {
client_id_2 = gcs_client_2->client_table().GetLocalClientId();
gcs_client_1->client_table().RegisterClientAddedCallback([this](
gcs::AsyncGcsClient *client, const ClientID &id, const ClientTableDataT &data) {
ClientID parsed_id = ClientID::from_binary(data.client_id);
ClientID parsed_id = ClientID::FromBinary(data.client_id);
if (parsed_id == client_id_1 || parsed_id == client_id_2) {
num_connected_clients += 1;
}
@@ -240,13 +240,13 @@ class TestObjectManager : public TestObjectManagerBase {
ray::Status status = ray::Status::OK();
status = server1->object_manager_.SubscribeObjAdded(
[this](const object_manager::protocol::ObjectInfoT &object_info) {
object_added_handler_1(ObjectID::from_binary(object_info.object_id));
object_added_handler_1(ObjectID::FromBinary(object_info.object_id));
NotificationTestCompleteIfSatisfied();
});
RAY_CHECK_OK(status);
status = server2->object_manager_.SubscribeObjAdded(
[this](const object_manager::protocol::ObjectInfoT &object_info) {
object_added_handler_2(ObjectID::from_binary(object_info.object_id));
object_added_handler_2(ObjectID::FromBinary(object_info.object_id));
NotificationTestCompleteIfSatisfied();
});
RAY_CHECK_OK(status);
@@ -254,11 +254,11 @@ class TestObjectManager : public TestObjectManagerBase {
uint data_size = 1000000;
// dummy_id is not local. The push function will timeout.
ObjectID dummy_id = ObjectID::from_random();
ObjectID dummy_id = ObjectID::FromRandom();
server1->object_manager_.Push(dummy_id,
gcs_client_2->client_table().GetLocalClientId());
created_object_id1 = ObjectID::from_random();
created_object_id1 = ObjectID::FromRandom();
WriteDataToClient(client1, data_size, created_object_id1);
// Server1 holds Object1 so this Push call will success.
server1->object_manager_.Push(created_object_id1,
@@ -268,7 +268,7 @@ class TestObjectManager : public TestObjectManagerBase {
timer.reset(new boost::asio::deadline_timer(main_service));
auto period = boost::posix_time::milliseconds(push_timeout_ms + 10);
timer->expires_from_now(period);
created_object_id2 = ObjectID::from_random();
created_object_id2 = ObjectID::FromRandom();
timer->async_wait([this, data_size](const boost::system::error_code &error) {
WriteDataToClient(client2, data_size, created_object_id2);
});
@@ -288,7 +288,7 @@ class TestObjectManager : public TestObjectManagerBase {
// object.
ObjectID object_1 = WriteDataToClient(client2, data_size);
ObjectID object_2 = WriteDataToClient(client2, data_size);
UniqueID sub_id = ray::UniqueID::from_random();
UniqueID sub_id = ray::UniqueID::FromRandom();
RAY_CHECK_OK(server1->object_manager_.object_directory_->SubscribeObjectLocations(
sub_id, object_1, [this, sub_id, object_1, object_2](
@@ -307,7 +307,7 @@ class TestObjectManager : public TestObjectManagerBase {
std::vector<ObjectID> object_ids = {object_1, object_2};
boost::posix_time::ptime start_time = boost::posix_time::second_clock::local_time();
UniqueID wait_id = UniqueID::from_random();
UniqueID wait_id = UniqueID::FromRandom();
RAY_CHECK_OK(server1->object_manager_.AddWaitRequest(
wait_id, object_ids, timeout_ms, required_objects, false,
@@ -378,7 +378,7 @@ class TestObjectManager : public TestObjectManagerBase {
}
if (include_nonexistent) {
num_objects += 1;
object_ids.push_back(ObjectID::from_random());
object_ids.push_back(ObjectID::FromRandom());
}
boost::posix_time::ptime start_time = boost::posix_time::second_clock::local_time();
RAY_CHECK_OK(server1->object_manager_.Wait(
@@ -457,17 +457,17 @@ class TestObjectManager : public TestObjectManagerBase {
<< "\n";
ClientTableDataT data;
gcs_client_1->client_table().GetClient(client_id_1, data);
RAY_LOG(DEBUG) << (ClientID::from_binary(data.client_id).is_nil());
RAY_LOG(DEBUG) << "Server 1 ClientID=" << ClientID::from_binary(data.client_id);
RAY_LOG(DEBUG) << (ClientID::FromBinary(data.client_id).IsNil());
RAY_LOG(DEBUG) << "Server 1 ClientID=" << ClientID::FromBinary(data.client_id);
RAY_LOG(DEBUG) << "Server 1 ClientIp=" << data.node_manager_address;
RAY_LOG(DEBUG) << "Server 1 ClientPort=" << data.node_manager_port;
ASSERT_EQ(client_id_1, ClientID::from_binary(data.client_id));
ASSERT_EQ(client_id_1, ClientID::FromBinary(data.client_id));
ClientTableDataT data2;
gcs_client_1->client_table().GetClient(client_id_2, data2);
RAY_LOG(DEBUG) << "Server 2 ClientID=" << ClientID::from_binary(data2.client_id);
RAY_LOG(DEBUG) << "Server 2 ClientID=" << ClientID::FromBinary(data2.client_id);
RAY_LOG(DEBUG) << "Server 2 ClientIp=" << data2.node_manager_address;
RAY_LOG(DEBUG) << "Server 2 ClientPort=" << data2.node_manager_port;
ASSERT_EQ(client_id_2, ClientID::from_binary(data2.client_id));
ASSERT_EQ(client_id_2, ClientID::FromBinary(data2.client_id));
}
};
+13 -13
View File
@@ -14,28 +14,28 @@ ActorRegistration::ActorRegistration(const ActorTableDataT &actor_table_data)
ActorRegistration::ActorRegistration(const ActorTableDataT &actor_table_data,
const ActorCheckpointDataT &checkpoint_data)
: actor_table_data_(actor_table_data),
execution_dependency_(ObjectID::from_binary(checkpoint_data.execution_dependency)) {
execution_dependency_(ObjectID::FromBinary(checkpoint_data.execution_dependency)) {
// Restore `frontier_`.
for (size_t i = 0; i < checkpoint_data.handle_ids.size(); i++) {
auto handle_id = ActorHandleID::from_binary(checkpoint_data.handle_ids[i]);
auto handle_id = ActorHandleID::FromBinary(checkpoint_data.handle_ids[i]);
auto &frontier_entry = frontier_[handle_id];
frontier_entry.task_counter = checkpoint_data.task_counters[i];
frontier_entry.execution_dependency =
ObjectID::from_binary(checkpoint_data.frontier_dependencies[i]);
ObjectID::FromBinary(checkpoint_data.frontier_dependencies[i]);
}
// Restore `dummy_objects_`.
for (size_t i = 0; i < checkpoint_data.unreleased_dummy_objects.size(); i++) {
auto dummy = ObjectID::from_binary(checkpoint_data.unreleased_dummy_objects[i]);
auto dummy = ObjectID::FromBinary(checkpoint_data.unreleased_dummy_objects[i]);
dummy_objects_[dummy] = checkpoint_data.num_dummy_object_dependencies[i];
}
}
const ClientID ActorRegistration::GetNodeManagerId() const {
return ClientID::from_binary(actor_table_data_.node_manager_id);
return ClientID::FromBinary(actor_table_data_.node_manager_id);
}
const ObjectID ActorRegistration::GetActorCreationDependency() const {
return ObjectID::from_binary(actor_table_data_.actor_creation_dummy_object_id);
return ObjectID::FromBinary(actor_table_data_.actor_creation_dummy_object_id);
}
const ObjectID ActorRegistration::GetExecutionDependency() const {
@@ -43,7 +43,7 @@ const ObjectID ActorRegistration::GetExecutionDependency() const {
}
const DriverID ActorRegistration::GetDriverId() const {
return DriverID::from_binary(actor_table_data_.driver_id);
return DriverID::FromBinary(actor_table_data_.driver_id);
}
const int64_t ActorRegistration::GetMaxReconstructions() const {
@@ -65,7 +65,7 @@ ObjectID ActorRegistration::ExtendFrontier(const ActorHandleID &handle_id,
// Release the reference to the previous cursor for this
// actor handle, if there was one.
ObjectID object_to_release;
if (!frontier_entry.execution_dependency.is_nil()) {
if (!frontier_entry.execution_dependency.IsNil()) {
auto it = dummy_objects_.find(frontier_entry.execution_dependency);
RAY_CHECK(it != dummy_objects_.end());
it->second--;
@@ -110,16 +110,16 @@ std::shared_ptr<ActorCheckpointDataT> ActorRegistration::GenerateCheckpointData(
// Use actor's current state to generate checkpoint data.
auto checkpoint_data = std::make_shared<ActorCheckpointDataT>();
checkpoint_data->actor_id = actor_id.binary();
checkpoint_data->execution_dependency = copy.GetExecutionDependency().binary();
checkpoint_data->actor_id = actor_id.Binary();
checkpoint_data->execution_dependency = copy.GetExecutionDependency().Binary();
for (const auto &frontier : copy.GetFrontier()) {
checkpoint_data->handle_ids.push_back(frontier.first.binary());
checkpoint_data->handle_ids.push_back(frontier.first.Binary());
checkpoint_data->task_counters.push_back(frontier.second.task_counter);
checkpoint_data->frontier_dependencies.push_back(
frontier.second.execution_dependency.binary());
frontier.second.execution_dependency.Binary());
}
for (const auto &entry : copy.GetDummyObjects()) {
checkpoint_data->unreleased_dummy_objects.push_back(entry.first.binary());
checkpoint_data->unreleased_dummy_objects.push_back(entry.first.Binary());
checkpoint_data->num_dummy_object_dependencies.push_back(entry.second);
}
return checkpoint_data;
+1 -1
View File
@@ -180,7 +180,7 @@ TEST_F(ClientConnectionTest, ProcessBadMessage) {
"reader", {}, error_message_type_);
// If client ID is set, bad message would crash the test.
// reader->SetClientID(UniqueID::from_random());
// reader->SetClientID(UniqueID::FromRandom());
// Intentionally write a message with incorrect cookie.
// Verify it won't crash as long as client ID is not set.
@@ -12,10 +12,10 @@ class UniqueIdFromJByteArray {
const ID &GetId() const { return id; }
UniqueIdFromJByteArray(JNIEnv *env, const jbyteArray &bytes) {
std::string id_str(ID::size(), 0);
env->GetByteArrayRegion(bytes, 0, ID::size(),
std::string id_str(ID::Size(), 0);
env->GetByteArrayRegion(bytes, 0, ID::Size(),
reinterpret_cast<jbyte *>(&id_str.front()));
id = ID::from_binary(id_str);
id = ID::FromBinary(id_str);
}
private:
@@ -231,12 +231,12 @@ Java_org_ray_runtime_raylet_RayletClientImpl_nativeGenerateTaskId(
TaskID task_id =
ray::GenerateTaskId(driver_id.GetId(), parent_task_id.GetId(), parent_task_counter);
jbyteArray result = env->NewByteArray(task_id.size());
jbyteArray result = env->NewByteArray(task_id.Size());
if (nullptr == result) {
return nullptr;
}
env->SetByteArrayRegion(result, 0, task_id.size(),
reinterpret_cast<const jbyte *>(task_id.data()));
env->SetByteArrayRegion(result, 0, task_id.Size(),
reinterpret_cast<const jbyte *>(task_id.Data()));
return result;
}
@@ -280,9 +280,9 @@ Java_org_ray_runtime_raylet_RayletClientImpl_nativePrepareCheckpoint(JNIEnv *env
if (ThrowRayExceptionIfNotOK(env, status)) {
return nullptr;
}
jbyteArray result = env->NewByteArray(checkpoint_id.size());
env->SetByteArrayRegion(result, 0, checkpoint_id.size(),
reinterpret_cast<const jbyte *>(checkpoint_id.data()));
jbyteArray result = env->NewByteArray(checkpoint_id.Size());
env->SetByteArrayRegion(result, 0, checkpoint_id.Size(),
reinterpret_cast<const jbyte *>(checkpoint_id.Data()));
return result;
}
+4 -4
View File
@@ -48,7 +48,7 @@ void LineageEntry::ComputeParentTaskIds() {
parent_task_ids_.clear();
// A task's parents are the tasks that created its arguments.
for (const auto &dependency : task_.GetDependencies()) {
parent_task_ids_.insert(dependency.task_id());
parent_task_ids_.insert(dependency.TaskId());
}
}
@@ -296,7 +296,7 @@ bool LineageCache::RemoveWaitingTask(const TaskID &task_id) {
}
void LineageCache::MarkTaskAsForwarded(const TaskID &task_id, const ClientID &node_id) {
RAY_CHECK(!node_id.is_nil());
RAY_CHECK(!node_id.IsNil());
lineage_.GetEntryMutable(task_id)->MarkExplicitlyForwarded(node_id);
}
@@ -374,7 +374,7 @@ bool LineageCache::SubscribeTask(const TaskID &task_id) {
if (unsubscribed) {
// Request notifications for the task if we haven't already requested
// notifications for it.
RAY_CHECK_OK(task_pubsub_.RequestNotifications(DriverID::nil(), task_id, client_id_));
RAY_CHECK_OK(task_pubsub_.RequestNotifications(DriverID::Nil(), task_id, client_id_));
}
// Return whether we were previously unsubscribed to this task and are now
// subscribed.
@@ -387,7 +387,7 @@ bool LineageCache::UnsubscribeTask(const TaskID &task_id) {
if (subscribed) {
// Cancel notifications for the task if we previously requested
// notifications for it.
RAY_CHECK_OK(task_pubsub_.CancelNotifications(DriverID::nil(), task_id, client_id_));
RAY_CHECK_OK(task_pubsub_.CancelNotifications(DriverID::Nil(), task_id, client_id_));
subscribed_tasks_.erase(it);
}
// Return whether we were previously subscribed to this task and are now
+16 -16
View File
@@ -43,7 +43,7 @@ class MockGcs : public gcs::TableInterface<TaskID, protocol::Task>,
notification_callback_(client, task_id, data);
}
};
return Add(DriverID::nil(), task_id, task_data, callback);
return Add(DriverID::Nil(), task_id, task_data, callback);
}
Status RequestNotifications(const DriverID &driver_id, const TaskID &task_id,
@@ -91,7 +91,7 @@ class LineageCacheTest : public ::testing::Test {
LineageCacheTest()
: max_lineage_size_(10),
mock_gcs_(),
lineage_cache_(ClientID::from_random(), mock_gcs_, mock_gcs_, max_lineage_size_) {
lineage_cache_(ClientID::FromRandom(), mock_gcs_, mock_gcs_, max_lineage_size_) {
mock_gcs_.Subscribe([this](ray::gcs::AsyncGcsClient *client, const TaskID &task_id,
const ray::protocol::TaskT &data) {
lineage_cache_.HandleEntryCommitted(task_id);
@@ -113,7 +113,7 @@ static inline Task ExampleTask(const std::vector<ObjectID> &arguments,
task_arguments.emplace_back(std::make_shared<TaskArgumentByReference>(references));
}
std::vector<std::string> function_descriptor(3);
auto spec = TaskSpecification(DriverID::nil(), TaskID::from_random(), 0, task_arguments,
auto spec = TaskSpecification(DriverID::Nil(), TaskID::FromRandom(), 0, task_arguments,
num_returns, required_resources, Language::PYTHON,
function_descriptor);
auto execution_spec = TaskExecutionSpecification(std::vector<ObjectID>());
@@ -160,7 +160,7 @@ TEST_F(LineageCacheTest, TestGetUncommittedLineageOrDie) {
// Get the uncommitted lineage for the last task (the leaf) of one of the chains.
auto uncommitted_lineage =
lineage_cache_.GetUncommittedLineageOrDie(task_ids1.back(), ClientID::nil());
lineage_cache_.GetUncommittedLineageOrDie(task_ids1.back(), ClientID::Nil());
// Check that the uncommitted lineage is exactly equal to the first chain of tasks.
ASSERT_EQ(task_ids1.size(), uncommitted_lineage.GetEntries().size());
for (auto &task_id : task_ids1) {
@@ -181,7 +181,7 @@ TEST_F(LineageCacheTest, TestGetUncommittedLineageOrDie) {
// Get the uncommitted lineage for the inserted task.
uncommitted_lineage = lineage_cache_.GetUncommittedLineageOrDie(
combined_task_ids.back(), ClientID::nil());
combined_task_ids.back(), ClientID::Nil());
// Check that the uncommitted lineage is exactly equal to the entire set of
// tasks inserted so far.
ASSERT_EQ(combined_task_ids.size(), uncommitted_lineage.GetEntries().size());
@@ -200,8 +200,8 @@ TEST_F(LineageCacheTest, TestMarkTaskAsForwarded) {
task_ids.push_back(task.GetTaskSpecification().TaskId());
}
auto node_id = ClientID::from_random();
auto node_id2 = ClientID::from_random();
auto node_id = ClientID::FromRandom();
auto node_id2 = ClientID::FromRandom();
auto forwarded_task_id = task_ids[task_ids.size() - 2];
auto remaining_task_id = task_ids[task_ids.size() - 1];
lineage_cache_.MarkTaskAsForwarded(forwarded_task_id, node_id);
@@ -285,7 +285,7 @@ TEST_F(LineageCacheTest, TestEvictChain) {
mock_gcs_.Flush();
ASSERT_EQ(lineage_cache_
.GetUncommittedLineageOrDie(tasks.back().GetTaskSpecification().TaskId(),
ClientID::nil())
ClientID::Nil())
.GetEntries()
.size(),
tasks.size());
@@ -298,7 +298,7 @@ TEST_F(LineageCacheTest, TestEvictChain) {
mock_gcs_.Flush();
ASSERT_EQ(lineage_cache_
.GetUncommittedLineageOrDie(tasks.back().GetTaskSpecification().TaskId(),
ClientID::nil())
ClientID::Nil())
.GetEntries()
.size(),
tasks.size());
@@ -335,7 +335,7 @@ TEST_F(LineageCacheTest, TestEvictManyParents) {
ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), total_tasks);
ASSERT_EQ(lineage_cache_
.GetUncommittedLineageOrDie(child_task.GetTaskSpecification().TaskId(),
ClientID::nil())
ClientID::Nil())
.GetEntries()
.size(),
total_tasks);
@@ -351,7 +351,7 @@ TEST_F(LineageCacheTest, TestEvictManyParents) {
ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), total_tasks);
ASSERT_EQ(lineage_cache_
.GetUncommittedLineageOrDie(
child_task.GetTaskSpecification().TaskId(), ClientID::nil())
child_task.GetTaskSpecification().TaskId(), ClientID::Nil())
.GetEntries()
.size(),
total_tasks);
@@ -376,7 +376,7 @@ TEST_F(LineageCacheTest, TestForwardTasksRoundTrip) {
const auto task_id = it->GetTaskSpecification().TaskId();
// Simulate removing the task and forwarding it to another node.
auto uncommitted_lineage =
lineage_cache_.GetUncommittedLineageOrDie(task_id, ClientID::nil());
lineage_cache_.GetUncommittedLineageOrDie(task_id, ClientID::Nil());
ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id));
// Simulate receiving the task again. Make sure we can add the task back.
flatbuffers::FlatBufferBuilder fbb;
@@ -400,7 +400,7 @@ TEST_F(LineageCacheTest, TestForwardTask) {
tasks.erase(it);
auto task_id_to_remove = forwarded_task.GetTaskSpecification().TaskId();
auto uncommitted_lineage =
lineage_cache_.GetUncommittedLineageOrDie(task_id_to_remove, ClientID::nil());
lineage_cache_.GetUncommittedLineageOrDie(task_id_to_remove, ClientID::Nil());
ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id_to_remove));
ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), 3);
@@ -450,7 +450,7 @@ TEST_F(LineageCacheTest, TestEviction) {
// uncommitted lineage.
const auto last_task_id = tasks.back().GetTaskSpecification().TaskId();
auto uncommitted_lineage =
lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::nil());
lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::Nil());
ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size);
// Simulate executing the first task on a remote node and adding it to the
@@ -484,7 +484,7 @@ TEST_F(LineageCacheTest, TestEviction) {
// All tasks have now been flushed. Check that enough lineage has been
// evicted that the uncommitted lineage is now less than the maximum size.
uncommitted_lineage =
lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::nil());
lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::Nil());
ASSERT_TRUE(uncommitted_lineage.GetEntries().size() < max_lineage_size_);
// The remaining task should have no uncommitted lineage.
ASSERT_EQ(uncommitted_lineage.GetEntries().size(), 1);
@@ -510,7 +510,7 @@ TEST_F(LineageCacheTest, TestOutOfOrderEviction) {
// uncommitted lineage.
const auto last_task_id = tasks.back().GetTaskSpecification().TaskId();
auto uncommitted_lineage =
lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::nil());
lineage_cache_.GetUncommittedLineageOrDie(last_task_id, ClientID::Nil());
ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size);
ASSERT_EQ(lineage_cache_.GetLineage().GetEntries().size(), lineage_size);
+4 -4
View File
@@ -35,7 +35,7 @@ void Monitor::Start() {
HandleHeartbeat(id, heartbeat_data);
};
RAY_CHECK_OK(gcs_client_.heartbeat_table().Subscribe(
DriverID::nil(), ClientID::nil(), heartbeat_callback, nullptr, nullptr));
DriverID::Nil(), ClientID::Nil(), heartbeat_callback, nullptr, nullptr));
Tick();
}
@@ -52,7 +52,7 @@ void Monitor::Tick() {
const std::vector<ClientTableDataT> &all_data) {
bool marked = false;
for (const auto &data : all_data) {
if (client_id.binary() == data.client_id &&
if (client_id.Binary() == data.client_id &&
data.entry_type == EntryType::DELETION) {
// The node has been marked dead by itself.
marked = true;
@@ -70,7 +70,7 @@ void Monitor::Tick() {
<< " has missed too many heartbeats from it.";
// We use the nil DriverID to broadcast the message to all drivers.
RAY_CHECK_OK(gcs_client_.error_table().PushErrorToDriver(
DriverID::nil(), type, error_message.str(), current_time_ms()));
DriverID::Nil(), type, error_message.str(), current_time_ms()));
}
};
RAY_CHECK_OK(gcs_client_.client_table().Lookup(lookup_callback));
@@ -89,7 +89,7 @@ void Monitor::Tick() {
batch->batch.push_back(std::unique_ptr<HeartbeatTableDataT>(
new HeartbeatTableDataT(heartbeat.second)));
}
RAY_CHECK_OK(gcs_client_.heartbeat_batch_table().Add(DriverID::nil(), ClientID::nil(),
RAY_CHECK_OK(gcs_client_.heartbeat_batch_table().Add(DriverID::Nil(), ClientID::Nil(),
batch, nullptr));
heartbeat_buffer_.clear();
}
+47 -47
View File
@@ -110,7 +110,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
RAY_CHECK_OK(object_manager_.SubscribeObjAdded(
[this](const object_manager::protocol::ObjectInfoT &object_info) {
ObjectID object_id = ObjectID::from_binary(object_info.object_id);
ObjectID object_id = ObjectID::FromBinary(object_info.object_id);
HandleObjectLocal(object_id);
}));
RAY_CHECK_OK(object_manager_.SubscribeObjDeleted(
@@ -131,13 +131,13 @@ ray::Status NodeManager::RegisterGcs() {
lineage_cache_.HandleEntryCommitted(task_id);
};
RAY_RETURN_NOT_OK(gcs_client_->raylet_task_table().Subscribe(
DriverID::nil(), gcs_client_->client_table().GetLocalClientId(),
DriverID::Nil(), gcs_client_->client_table().GetLocalClientId(),
task_committed_callback, nullptr, nullptr));
const auto task_lease_notification_callback = [this](gcs::AsyncGcsClient *client,
const TaskID &task_id,
const TaskLeaseDataT &task_lease) {
const ClientID node_manager_id = ClientID::from_binary(task_lease.node_manager_id);
const ClientID node_manager_id = ClientID::FromBinary(task_lease.node_manager_id);
if (gcs_client_->client_table().IsRemoved(node_manager_id)) {
// The node manager that added the task lease is already removed. The
// lease is considered inactive.
@@ -155,7 +155,7 @@ ray::Status NodeManager::RegisterGcs() {
reconstruction_policy_.HandleTaskLeaseNotification(task_id, 0);
};
RAY_RETURN_NOT_OK(gcs_client_->task_lease_table().Subscribe(
DriverID::nil(), gcs_client_->client_table().GetLocalClientId(),
DriverID::Nil(), gcs_client_->client_table().GetLocalClientId(),
task_lease_notification_callback, task_lease_empty_callback, nullptr));
// Register a callback to handle actor notifications.
@@ -170,7 +170,7 @@ ray::Status NodeManager::RegisterGcs() {
};
RAY_RETURN_NOT_OK(gcs_client_->actor_table().Subscribe(
DriverID::nil(), ClientID::nil(), actor_notification_callback, nullptr));
DriverID::Nil(), ClientID::Nil(), actor_notification_callback, nullptr));
// Register a callback on the client table for new clients.
auto node_manager_client_added = [this](gcs::AsyncGcsClient *client, const UniqueID &id,
@@ -208,7 +208,7 @@ ray::Status NodeManager::RegisterGcs() {
HeartbeatBatchAdded(heartbeat_batch);
};
RAY_RETURN_NOT_OK(gcs_client_->heartbeat_batch_table().Subscribe(
DriverID::nil(), ClientID::nil(), heartbeat_batch_added,
DriverID::Nil(), ClientID::Nil(), heartbeat_batch_added,
/*subscribe_callback=*/nullptr,
/*done_callback=*/nullptr));
@@ -219,7 +219,7 @@ ray::Status NodeManager::RegisterGcs() {
HandleDriverTableUpdate(client_id, driver_data);
};
RAY_RETURN_NOT_OK(gcs_client_->driver_table().Subscribe(
DriverID::nil(), ClientID::nil(), driver_table_handler, nullptr));
DriverID::Nil(), ClientID::Nil(), driver_table_handler, nullptr));
// Start sending heartbeats to the GCS.
last_heartbeat_at_ms_ = current_time_ms();
@@ -253,10 +253,10 @@ void NodeManager::KillWorker(std::shared_ptr<Worker> worker) {
void NodeManager::HandleDriverTableUpdate(
const DriverID &id, const std::vector<DriverTableDataT> &driver_data) {
for (const auto &entry : driver_data) {
RAY_LOG(DEBUG) << "HandleDriverTableUpdate " << UniqueID::from_binary(entry.driver_id)
RAY_LOG(DEBUG) << "HandleDriverTableUpdate " << UniqueID::FromBinary(entry.driver_id)
<< " " << entry.is_dead;
if (entry.is_dead) {
auto driver_id = DriverID::from_binary(entry.driver_id);
auto driver_id = DriverID::FromBinary(entry.driver_id);
auto workers = worker_pool_.GetWorkersRunningTasksForDriver(driver_id);
// Kill all the workers. The actual cleanup for these workers is done
@@ -291,7 +291,7 @@ void NodeManager::Heartbeat() {
auto heartbeat_data = std::make_shared<HeartbeatTableDataT>();
const auto &my_client_id = gcs_client_->client_table().GetLocalClientId();
SchedulingResources &local_resources = cluster_resource_map_[my_client_id];
heartbeat_data->client_id = my_client_id.binary();
heartbeat_data->client_id = my_client_id.Binary();
// TODO(atumanov): modify the heartbeat table protocol to use the ResourceSet directly.
// TODO(atumanov): implement a ResourceSet const_iterator.
for (const auto &resource_pair :
@@ -311,7 +311,7 @@ void NodeManager::Heartbeat() {
}
ray::Status status = heartbeat_table.Add(
DriverID::nil(), gcs_client_->client_table().GetLocalClientId(), heartbeat_data,
DriverID::Nil(), gcs_client_->client_table().GetLocalClientId(), heartbeat_data,
/*success_callback=*/nullptr);
RAY_CHECK_OK_PREPEND(status, "Heartbeat failed");
@@ -359,7 +359,7 @@ void NodeManager::GetObjectManagerProfileInfo() {
}
void NodeManager::ClientAdded(const ClientTableDataT &client_data) {
const ClientID client_id = ClientID::from_binary(client_data.client_id);
const ClientID client_id = ClientID::FromBinary(client_data.client_id);
RAY_LOG(DEBUG) << "[ClientAdded] Received callback from client id " << client_id;
if (client_id == gcs_client_->client_table().GetLocalClientId()) {
@@ -393,7 +393,7 @@ void NodeManager::ClientAdded(const ClientTableDataT &client_data) {
<< ". This may be since the node was recently removed.";
// We use the nil DriverID to broadcast the message to all drivers.
RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver(
DriverID::nil(), type, error_message.str(), current_time_ms()));
DriverID::Nil(), type, error_message.str(), current_time_ms()));
return;
}
@@ -432,7 +432,7 @@ ray::Status NodeManager::ConnectRemoteNodeManager(const ClientID &client_id,
void NodeManager::ClientRemoved(const ClientTableDataT &client_data) {
// TODO(swang): If we receive a notification for our own death, clean up and
// exit immediately.
const ClientID client_id = ClientID::from_binary(client_data.client_id);
const ClientID client_id = ClientID::FromBinary(client_data.client_id);
RAY_LOG(DEBUG) << "[ClientRemoved] Received callback from client id " << client_id;
RAY_CHECK(client_id != gcs_client_->client_table().GetLocalClientId())
@@ -478,7 +478,7 @@ void NodeManager::ClientRemoved(const ClientTableDataT &client_data) {
}
void NodeManager::ResourceCreateUpdated(const ClientTableDataT &client_data) {
const ClientID client_id = ClientID::from_binary(client_data.client_id);
const ClientID client_id = ClientID::FromBinary(client_data.client_id);
const ClientID &local_client_id = gcs_client_->client_table().GetLocalClientId();
RAY_LOG(DEBUG) << "[ResourceCreateUpdated] received callback from client id "
@@ -514,7 +514,7 @@ void NodeManager::ResourceCreateUpdated(const ClientTableDataT &client_data) {
}
void NodeManager::ResourceDeleted(const ClientTableDataT &client_data) {
const ClientID client_id = ClientID::from_binary(client_data.client_id);
const ClientID client_id = ClientID::FromBinary(client_data.client_id);
const ClientID &local_client_id = gcs_client_->client_table().GetLocalClientId();
ResourceSet new_res_set(client_data.resources_total_label,
@@ -608,7 +608,7 @@ void NodeManager::HeartbeatBatchAdded(const HeartbeatBatchTableDataT &heartbeat_
const ClientID &local_client_id = gcs_client_->client_table().GetLocalClientId();
// Update load information provided by each heartbeat.
for (const auto &heartbeat_data : heartbeat_batch.batch) {
const ClientID &client_id = ClientID::from_binary(heartbeat_data->client_id);
const ClientID &client_id = ClientID::FromBinary(heartbeat_data->client_id);
if (client_id == local_client_id) {
// Skip heartbeats from self.
continue;
@@ -638,12 +638,12 @@ void NodeManager::PublishActorStateTransition(
const ActorTableDataT &data) {
auto redis_context = client->primary_context();
if (data.state == ActorState::DEAD || data.state == ActorState::RECONSTRUCTING) {
std::vector<std::string> args = {"XADD", id.hex(), "*", "signal",
std::vector<std::string> args = {"XADD", id.Hex(), "*", "signal",
"ACTOR_DIED_SIGNAL"};
RAY_CHECK_OK(redis_context->RunArgvAsync(args));
}
};
RAY_CHECK_OK(gcs_client_->actor_table().AppendAt(DriverID::nil(), actor_id,
RAY_CHECK_OK(gcs_client_->actor_table().AppendAt(DriverID::Nil(), actor_id,
actor_notification, success_callback,
failure_callback, log_length));
}
@@ -852,9 +852,9 @@ void NodeManager::ProcessClientMessage(
// Clean up their creating tasks from GCS.
std::vector<TaskID> creating_task_ids;
for (const auto &object_id : object_ids) {
creating_task_ids.push_back(object_id.task_id());
creating_task_ids.push_back(object_id.TaskId());
}
gcs_client_->raylet_task_table().Delete(DriverID::nil(), creating_task_ids);
gcs_client_->raylet_task_table().Delete(DriverID::Nil(), creating_task_ids);
}
} break;
case protocol::MessageType::PrepareActorCheckpointRequest: {
@@ -945,7 +945,7 @@ void NodeManager::ProcessGetTaskMessage(
std::shared_ptr<Worker> worker = worker_pool_.GetRegisteredWorker(client);
RAY_CHECK(worker);
// If the worker was assigned a task, mark it as finished.
if (!worker->GetAssignedTaskId().is_nil()) {
if (!worker->GetAssignedTaskId().IsNil()) {
FinishAssignedTask(*worker);
}
// Return the worker to the idle pool.
@@ -1003,7 +1003,7 @@ void NodeManager::ProcessDisconnectClientMessage(
}
const ActorID &actor_id = worker->GetActorId();
if (!actor_id.is_nil()) {
if (!actor_id.IsNil()) {
// If the worker was an actor, update actor state, reconstruct the actor if needed,
// and clean up actor's tasks if the actor is permanently dead.
HandleDisconnectedActor(actor_id, true, intentional_disconnect);
@@ -1012,10 +1012,10 @@ void NodeManager::ProcessDisconnectClientMessage(
const TaskID &task_id = worker->GetAssignedTaskId();
// If the worker was running a task, clean up the task and push an error to
// the driver, unless the worker is already dead.
if (!task_id.is_nil() && !worker->IsDead()) {
if (!task_id.IsNil() && !worker->IsDead()) {
// If the worker was an actor, the task was already cleaned up in
// `HandleDisconnectedActor`.
if (actor_id.is_nil()) {
if (actor_id.IsNil()) {
const Task &task = local_queues_.RemoveTask(task_id);
TreatTaskAsFailed(task, ErrorType::WORKER_DIED);
}
@@ -1062,7 +1062,7 @@ void NodeManager::ProcessDisconnectClientMessage(
gcs_client_->driver_table().AppendDriverData(DriverID(client->GetClientId()),
/*is_dead=*/true));
auto driver_id = worker->GetAssignedTaskId();
RAY_CHECK(!driver_id.is_nil());
RAY_CHECK(!driver_id.IsNil());
local_queues_.RemoveDriverTaskId(driver_id);
worker_pool_.DisconnectDriver(worker);
@@ -1197,13 +1197,13 @@ void NodeManager::ProcessPrepareActorCheckpointRequest(
const auto task_id = worker->GetAssignedTaskId();
const Task &task = local_queues_.GetTaskOfState(task_id, TaskState::RUNNING);
// Generate checkpoint id and data.
ActorCheckpointID checkpoint_id = ActorCheckpointID::from_random();
ActorCheckpointID checkpoint_id = ActorCheckpointID::FromRandom();
auto checkpoint_data =
actor_entry->second.GenerateCheckpointData(actor_entry->first, task);
// Write checkpoint data to GCS.
RAY_CHECK_OK(gcs_client_->actor_checkpoint_table().Add(
DriverID::nil(), checkpoint_id, checkpoint_data,
DriverID::Nil(), checkpoint_id, checkpoint_data,
[worker, actor_id, this](ray::gcs::AsyncGcsClient *client,
const ActorCheckpointID &checkpoint_id,
const ActorCheckpointDataT &data) {
@@ -1212,7 +1212,7 @@ void NodeManager::ProcessPrepareActorCheckpointRequest(
// Save this actor-to-checkpoint mapping, and remove old checkpoints associated
// with this actor.
RAY_CHECK_OK(gcs_client_->actor_checkpoint_id_table().AddCheckpointId(
DriverID::nil(), actor_id, checkpoint_id));
DriverID::Nil(), actor_id, checkpoint_id));
// Send reply to worker.
flatbuffers::FlatBufferBuilder fbb;
auto reply = ray::protocol::CreatePrepareActorCheckpointReply(
@@ -1293,7 +1293,7 @@ void NodeManager::ProcessSetResourceRequest(
ClientID client_id = from_flatbuf<ClientID>(*message->client_id());
// If the python arg was null, set client_id to the local client
if (client_id.is_nil()) {
if (client_id.IsNil()) {
client_id = gcs_client_->client_table().GetLocalClientId();
}
@@ -1331,7 +1331,7 @@ void NodeManager::ProcessSetResourceRequest(
auto data_shared_ptr = std::make_shared<ClientTableDataT>(data);
auto client_table = gcs_client_->client_table();
RAY_CHECK_OK(gcs_client_->client_table().Append(
DriverID::nil(), client_table.client_log_key_, data_shared_ptr, nullptr));
DriverID::Nil(), client_table.client_log_key_, data_shared_ptr, nullptr));
}
void NodeManager::ScheduleTasks(
@@ -1450,7 +1450,7 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ
}
const std::string meta = std::to_string(static_cast<int>(error_type));
for (int64_t i = 0; i < num_returns; i++) {
const auto object_id = spec.ReturnId(i).to_plasma_id();
const auto object_id = spec.ReturnId(i).ToPlasmaId();
arrow::Status status = store_client_.CreateAndSeal(object_id, "", meta);
if (!status.ok() && !status.IsPlasmaObjectExists()) {
// If we failed to save the error code, log a warning and push an error message
@@ -1605,7 +1605,7 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag
HandleActorStateTransition(actor_id, ActorRegistration(data.back()));
}
};
RAY_CHECK_OK(gcs_client_->actor_table().Lookup(DriverID::nil(), spec.ActorId(),
RAY_CHECK_OK(gcs_client_->actor_table().Lookup(DriverID::Nil(), spec.ActorId(),
lookup_callback));
actor_creation_dummy_object = spec.ActorCreationDummyObjectId();
} else {
@@ -1796,7 +1796,7 @@ bool NodeManager::AssignTask(const Task &task) {
const std::string warning_message = worker_pool_.WarningAboutSize();
if (warning_message != "") {
RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver(
DriverID::nil(), "worker_pool_large", warning_message, current_time_ms()));
DriverID::Nil(), "worker_pool_large", warning_message, current_time_ms()));
}
}
// We couldn't assign this task, as no worker available.
@@ -1875,7 +1875,7 @@ bool NodeManager::AssignTask(const Task &task) {
// The execution dependency is initialized to the actor creation task's
// return value, and is subsequently updated to the assigned tasks'
// return values, so it should never be nil.
RAY_CHECK(!execution_dependency.is_nil());
RAY_CHECK(!execution_dependency.IsNil());
// Update the task's execution dependencies to reflect the actual
// execution order, to support deterministic reconstruction.
// NOTE(swang): The update of an actor task's execution dependencies is
@@ -1946,11 +1946,11 @@ void NodeManager::FinishAssignedTask(Worker &worker) {
task_dependency_manager_.TaskCanceled(task_id);
// Unset the worker's assigned task.
worker.AssignTaskId(TaskID::nil());
worker.AssignTaskId(TaskID::Nil());
// Unset the worker's assigned driver Id if this is not an actor.
if (!task.GetTaskSpecification().IsActorCreationTask() &&
!task.GetTaskSpecification().IsActorTask()) {
worker.AssignDriverId(DriverID::nil());
worker.AssignDriverId(DriverID::Nil());
}
}
@@ -1966,10 +1966,10 @@ ActorTableDataT NodeManager::CreateActorTableDataFromCreationTask(const Task &ta
if (actor_entry == actor_registry_.end()) {
// Set all of the static fields for the actor. These fields will not
// change even if the actor fails or is reconstructed.
new_actor_data.actor_id = actor_id.binary();
new_actor_data.actor_id = actor_id.Binary();
new_actor_data.actor_creation_dummy_object_id =
task.GetTaskSpecification().ActorDummyObject().binary();
new_actor_data.driver_id = task.GetTaskSpecification().DriverId().binary();
task.GetTaskSpecification().ActorDummyObject().Binary();
new_actor_data.driver_id = task.GetTaskSpecification().DriverId().Binary();
new_actor_data.max_reconstructions =
task.GetTaskSpecification().MaxActorReconstructions();
// This is the first time that the actor has been created, so the number
@@ -1990,7 +1990,7 @@ ActorTableDataT NodeManager::CreateActorTableDataFromCreationTask(const Task &ta
// Set the new fields for the actor's state to indicate that the actor is
// now alive on this node manager.
new_actor_data.node_manager_id =
gcs_client_->client_table().GetLocalClientId().binary();
gcs_client_->client_table().GetLocalClientId().Binary();
new_actor_data.state = ActorState::ALIVE;
return new_actor_data;
}
@@ -2001,7 +2001,7 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) {
bool resumed_from_checkpoint = false;
if (task.GetTaskSpecification().IsActorCreationTask()) {
actor_id = task.GetTaskSpecification().ActorCreationId();
actor_handle_id = ActorHandleID::nil();
actor_handle_id = ActorHandleID::Nil();
if (checkpoint_id_to_restore_.count(actor_id) > 0) {
resumed_from_checkpoint = true;
}
@@ -2024,7 +2024,7 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) {
RAY_LOG(DEBUG) << "Looking up checkpoint " << checkpoint_id << " for actor "
<< actor_id;
RAY_CHECK_OK(gcs_client_->actor_checkpoint_table().Lookup(
DriverID::nil(), checkpoint_id,
DriverID::Nil(), checkpoint_id,
[this, actor_id, new_actor_data](ray::gcs::AsyncGcsClient *client,
const UniqueID &checkpoint_id,
const ActorCheckpointDataT &checkpoint_data) {
@@ -2074,7 +2074,7 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) {
const auto dummy_object = task.GetTaskSpecification().ActorDummyObject();
const ObjectID object_to_release =
actor_entry->second.ExtendFrontier(actor_handle_id, dummy_object);
if (!object_to_release.is_nil()) {
if (!object_to_release.IsNil()) {
// If there were no new actor handles created, then no other actor task
// will depend on this execution dependency, so it safe to release.
HandleObjectMissing(object_to_release);
@@ -2094,7 +2094,7 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) {
void NodeManager::HandleTaskReconstruction(const TaskID &task_id) {
// Retrieve the task spec in order to re-execute the task.
RAY_CHECK_OK(gcs_client_->raylet_task_table().Lookup(
DriverID::nil(), task_id,
DriverID::Nil(), task_id,
/*success_callback=*/
[this](ray::gcs::AsyncGcsClient *client, const TaskID &task_id,
const ray::protocol::TaskT &task_data) {
@@ -2380,7 +2380,7 @@ std::string NodeManager::DebugString() const {
result << "\nInitialConfigResources: " << initial_config_.resource_config.ToString();
result << "\nClusterResources:";
for (auto &pair : cluster_resource_map_) {
result << "\n" << pair.first.hex() << ": " << pair.second.DebugString();
result << "\n" << pair.first.Hex() << ": " << pair.second.DebugString();
}
result << "\n" << object_manager_.DebugString();
result << "\n" << gcs_client_->DebugString();
@@ -2399,7 +2399,7 @@ std::string NodeManager::DebugString() const {
result << "\nRemoteConnections:";
for (auto &pair : remote_server_connections_) {
result << "\n" << pair.first.hex() << ": " << pair.second->DebugString();
result << "\n" << pair.first.Hex() << ": " << pair.second->DebugString();
}
result << "\nDebugString() time ms: " << (current_time_ms() - now_ms);
return result.str();
@@ -99,14 +99,14 @@ class TestObjectManagerBase : public ::testing::Test {
}
ObjectID WriteDataToClient(plasma::PlasmaClient &client, int64_t data_size) {
ObjectID object_id = ObjectID::from_random();
ObjectID object_id = ObjectID::FromRandom();
RAY_LOG(DEBUG) << "ObjectID Created: " << object_id;
uint8_t metadata[] = {5};
int64_t metadata_size = sizeof(metadata);
std::shared_ptr<Buffer> data;
RAY_ARROW_CHECK_OK(client.Create(object_id.to_plasma_id(), data_size, metadata,
metadata_size, &data));
RAY_ARROW_CHECK_OK(client.Seal(object_id.to_plasma_id()));
RAY_ARROW_CHECK_OK(
client.Create(object_id.ToPlasmaId(), data_size, metadata, metadata_size, &data));
RAY_ARROW_CHECK_OK(client.Seal(object_id.ToPlasmaId()));
return object_id;
}
@@ -138,7 +138,7 @@ class TestObjectManagerIntegration : public TestObjectManagerBase {
client_id_2 = gcs_client_2->client_table().GetLocalClientId();
gcs_client_1->client_table().RegisterClientAddedCallback([this](
gcs::AsyncGcsClient *client, const ClientID &id, const ClientTableDataT &data) {
ClientID parsed_id = ClientID::from_binary(data.client_id);
ClientID parsed_id = ClientID::FromBinary(data.client_id);
if (parsed_id == client_id_1 || parsed_id == client_id_2) {
num_connected_clients += 1;
}
@@ -158,7 +158,7 @@ class TestObjectManagerIntegration : public TestObjectManagerBase {
ray::Status status = ray::Status::OK();
status = server1->object_manager_.SubscribeObjAdded(
[this](const object_manager::protocol::ObjectInfoT &object_info) {
v1.push_back(ObjectID::from_binary(object_info.object_id));
v1.push_back(ObjectID::FromBinary(object_info.object_id));
if (v1.size() == num_expected_objects && v1.size() == v2.size()) {
TestPushComplete();
}
@@ -166,7 +166,7 @@ class TestObjectManagerIntegration : public TestObjectManagerBase {
RAY_CHECK_OK(status);
status = server2->object_manager_.SubscribeObjAdded(
[this](const object_manager::protocol::ObjectInfoT &object_info) {
v2.push_back(ObjectID::from_binary(object_info.object_id));
v2.push_back(ObjectID::FromBinary(object_info.object_id));
if (v2.size() == num_expected_objects && v1.size() == v2.size()) {
TestPushComplete();
}
@@ -208,13 +208,13 @@ class TestObjectManagerIntegration : public TestObjectManagerBase {
<< "\n";
ClientTableDataT data;
gcs_client_2->client_table().GetClient(client_id_1, data);
RAY_LOG(INFO) << (ClientID::from_binary(data.client_id).is_nil());
RAY_LOG(INFO) << "ClientID=" << ClientID::from_binary(data.client_id);
RAY_LOG(INFO) << (ClientID::FromBinary(data.client_id).IsNil());
RAY_LOG(INFO) << "ClientID=" << ClientID::FromBinary(data.client_id);
RAY_LOG(INFO) << "ClientIp=" << data.node_manager_address;
RAY_LOG(INFO) << "ClientPort=" << data.node_manager_port;
ClientTableDataT data2;
gcs_client_1->client_table().GetClient(client_id_2, data2);
RAY_LOG(INFO) << "ClientID=" << ClientID::from_binary(data2.client_id);
RAY_LOG(INFO) << "ClientID=" << ClientID::FromBinary(data2.client_id);
RAY_LOG(INFO) << "ClientIp=" << data2.node_manager_address;
RAY_LOG(INFO) << "ClientPort=" << data2.node_manager_port;
}
+3 -3
View File
@@ -312,12 +312,12 @@ ray::Status RayletClient::Wait(const std::vector<ObjectID> &object_ids, int num_
auto reply_message = flatbuffers::GetRoot<ray::protocol::WaitReply>(reply.get());
auto found = reply_message->found();
for (uint i = 0; i < found->size(); i++) {
ObjectID object_id = ObjectID::from_binary(found->Get(i)->str());
ObjectID object_id = ObjectID::FromBinary(found->Get(i)->str());
result->first.push_back(object_id);
}
auto remaining = reply_message->remaining();
for (uint i = 0; i < remaining->size(); i++) {
ObjectID object_id = ObjectID::from_binary(remaining->Get(i)->str());
ObjectID object_id = ObjectID::FromBinary(remaining->Get(i)->str());
result->second.push_back(object_id);
}
return ray::Status::OK();
@@ -373,7 +373,7 @@ ray::Status RayletClient::PrepareActorCheckpoint(const ActorID &actor_id,
if (!status.ok()) return status;
auto reply_message =
flatbuffers::GetRoot<ray::protocol::PrepareActorCheckpointReply>(reply.get());
checkpoint_id = ActorCheckpointID::from_binary(reply_message->checkpoint_id()->str());
checkpoint_id = ActorCheckpointID::FromBinary(reply_message->checkpoint_id()->str());
return ray::Status::OK();
}
+6 -6
View File
@@ -52,7 +52,7 @@ void ReconstructionPolicy::SetTaskTimeout(
// required by the task are no longer needed soon after. If the
// task is still required after this initial period, then we now
// subscribe to task lease notifications.
RAY_CHECK_OK(task_lease_pubsub_.RequestNotifications(DriverID::nil(), task_id,
RAY_CHECK_OK(task_lease_pubsub_.RequestNotifications(DriverID::Nil(), task_id,
client_id_));
it->second.subscribed = true;
}
@@ -108,9 +108,9 @@ void ReconstructionPolicy::AttemptReconstruction(const TaskID &task_id,
// an entry for this reconstruction.
auto reconstruction_entry = std::make_shared<TaskReconstructionDataT>();
reconstruction_entry->num_reconstructions = reconstruction_attempt;
reconstruction_entry->node_manager_id = client_id_.binary();
reconstruction_entry->node_manager_id = client_id_.Binary();
RAY_CHECK_OK(task_reconstruction_log_.AppendAt(
DriverID::nil(), task_id, reconstruction_entry,
DriverID::Nil(), task_id, reconstruction_entry,
/*success_callback=*/
[this](gcs::AsyncGcsClient *client, const TaskID &task_id,
const TaskReconstructionDataT &data) {
@@ -171,7 +171,7 @@ void ReconstructionPolicy::HandleTaskLeaseNotification(const TaskID &task_id,
}
void ReconstructionPolicy::ListenAndMaybeReconstruct(const ObjectID &object_id) {
TaskID task_id = object_id.task_id();
TaskID task_id = object_id.TaskId();
auto it = listening_tasks_.find(task_id);
// Add this object to the list of objects created by the same task.
if (it == listening_tasks_.end()) {
@@ -185,7 +185,7 @@ void ReconstructionPolicy::ListenAndMaybeReconstruct(const ObjectID &object_id)
}
void ReconstructionPolicy::Cancel(const ObjectID &object_id) {
TaskID task_id = object_id.task_id();
TaskID task_id = object_id.TaskId();
auto it = listening_tasks_.find(task_id);
if (it == listening_tasks_.end()) {
// We already stopped listening for this task.
@@ -199,7 +199,7 @@ void ReconstructionPolicy::Cancel(const ObjectID &object_id) {
// Cancel notifications for the task lease if we were subscribed to them.
if (it->second.subscribed) {
RAY_CHECK_OK(
task_lease_pubsub_.CancelNotifications(DriverID::nil(), task_id, client_id_));
task_lease_pubsub_.CancelNotifications(DriverID::Nil(), task_id, client_id_));
}
listening_tasks_.erase(it);
}
+26 -26
View File
@@ -154,7 +154,7 @@ class ReconstructionPolicyTest : public ::testing::Test {
reconstruction_policy_(std::make_shared<ReconstructionPolicy>(
io_service_,
[this](const TaskID &task_id) { TriggerReconstruction(task_id); },
reconstruction_timeout_ms_, ClientID::from_random(), mock_gcs_,
reconstruction_timeout_ms_, ClientID::FromRandom(), mock_gcs_,
mock_object_directory_, mock_gcs_)),
timer_canceled_(false) {
mock_gcs_.Subscribe(
@@ -223,8 +223,8 @@ class ReconstructionPolicyTest : public ::testing::Test {
};
TEST_F(ReconstructionPolicyTest, TestReconstructionSimple) {
TaskID task_id = TaskID::from_random();
ObjectID object_id = ObjectID::for_task_return(task_id, 1);
TaskID task_id = TaskID::FromRandom();
ObjectID object_id = ObjectID::ForTaskReturn(task_id, 1);
// Listen for an object.
reconstruction_policy_->ListenAndMaybeReconstruct(object_id);
@@ -241,9 +241,9 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionSimple) {
}
TEST_F(ReconstructionPolicyTest, TestReconstructionEvicted) {
TaskID task_id = TaskID::from_random();
ObjectID object_id = ObjectID::for_task_return(task_id, 1);
mock_object_directory_->SetObjectLocations(object_id, {ClientID::from_random()});
TaskID task_id = TaskID::FromRandom();
ObjectID object_id = ObjectID::ForTaskReturn(task_id, 1);
mock_object_directory_->SetObjectLocations(object_id, {ClientID::FromRandom()});
// Listen for both objects.
reconstruction_policy_->ListenAndMaybeReconstruct(object_id);
@@ -264,9 +264,9 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionEvicted) {
}
TEST_F(ReconstructionPolicyTest, TestReconstructionObjectLost) {
TaskID task_id = TaskID::from_random();
ObjectID object_id = ObjectID::for_task_return(task_id, 1);
ClientID client_id = ClientID::from_random();
TaskID task_id = TaskID::FromRandom();
ObjectID object_id = ObjectID::ForTaskReturn(task_id, 1);
ClientID client_id = ClientID::FromRandom();
mock_object_directory_->SetObjectLocations(object_id, {client_id});
// Listen for both objects.
@@ -288,9 +288,9 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionObjectLost) {
TEST_F(ReconstructionPolicyTest, TestDuplicateReconstruction) {
// Create two object IDs produced by the same task.
TaskID task_id = TaskID::from_random();
ObjectID object_id1 = ObjectID::for_task_return(task_id, 1);
ObjectID object_id2 = ObjectID::for_task_return(task_id, 2);
TaskID task_id = TaskID::FromRandom();
ObjectID object_id1 = ObjectID::ForTaskReturn(task_id, 1);
ObjectID object_id2 = ObjectID::ForTaskReturn(task_id, 2);
// Listen for both objects.
reconstruction_policy_->ListenAndMaybeReconstruct(object_id1);
@@ -308,17 +308,17 @@ TEST_F(ReconstructionPolicyTest, TestDuplicateReconstruction) {
}
TEST_F(ReconstructionPolicyTest, TestReconstructionSuppressed) {
TaskID task_id = TaskID::from_random();
ObjectID object_id = ObjectID::for_task_return(task_id, 1);
TaskID task_id = TaskID::FromRandom();
ObjectID object_id = ObjectID::ForTaskReturn(task_id, 1);
// Run the test for much longer than the reconstruction timeout.
int64_t test_period = 2 * reconstruction_timeout_ms_;
// Acquire the task lease for a period longer than the test period.
auto task_lease_data = std::make_shared<TaskLeaseDataT>();
task_lease_data->node_manager_id = ClientID::from_random().binary();
task_lease_data->node_manager_id = ClientID::FromRandom().Binary();
task_lease_data->acquired_at = current_sys_time_ms();
task_lease_data->timeout = 2 * test_period;
mock_gcs_.Add(DriverID::nil(), task_id, task_lease_data);
mock_gcs_.Add(DriverID::Nil(), task_id, task_lease_data);
// Listen for an object.
reconstruction_policy_->ListenAndMaybeReconstruct(object_id);
@@ -334,18 +334,18 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionSuppressed) {
}
TEST_F(ReconstructionPolicyTest, TestReconstructionContinuallySuppressed) {
TaskID task_id = TaskID::from_random();
ObjectID object_id = ObjectID::for_task_return(task_id, 1);
TaskID task_id = TaskID::FromRandom();
ObjectID object_id = ObjectID::ForTaskReturn(task_id, 1);
// Listen for an object.
reconstruction_policy_->ListenAndMaybeReconstruct(object_id);
// Send the reconstruction manager heartbeats about the object.
SetPeriodicTimer(reconstruction_timeout_ms_ / 2, [this, task_id]() {
auto task_lease_data = std::make_shared<TaskLeaseDataT>();
task_lease_data->node_manager_id = ClientID::from_random().binary();
task_lease_data->node_manager_id = ClientID::FromRandom().Binary();
task_lease_data->acquired_at = current_sys_time_ms();
task_lease_data->timeout = reconstruction_timeout_ms_;
mock_gcs_.Add(DriverID::nil(), task_id, task_lease_data);
mock_gcs_.Add(DriverID::Nil(), task_id, task_lease_data);
});
// Run the test for much longer than the reconstruction timeout.
Run(reconstruction_timeout_ms_ * 2);
@@ -361,8 +361,8 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionContinuallySuppressed) {
}
TEST_F(ReconstructionPolicyTest, TestReconstructionCanceled) {
TaskID task_id = TaskID::from_random();
ObjectID object_id = ObjectID::for_task_return(task_id, 1);
TaskID task_id = TaskID::FromRandom();
ObjectID object_id = ObjectID::ForTaskReturn(task_id, 1);
// Listen for an object.
reconstruction_policy_->ListenAndMaybeReconstruct(object_id);
@@ -387,17 +387,17 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionCanceled) {
}
TEST_F(ReconstructionPolicyTest, TestSimultaneousReconstructionSuppressed) {
TaskID task_id = TaskID::from_random();
ObjectID object_id = ObjectID::for_task_return(task_id, 1);
TaskID task_id = TaskID::FromRandom();
ObjectID object_id = ObjectID::ForTaskReturn(task_id, 1);
// Log a reconstruction attempt to simulate a different node attempting the
// reconstruction first. This should suppress this node's first attempt at
// reconstruction.
auto task_reconstruction_data = std::make_shared<TaskReconstructionDataT>();
task_reconstruction_data->node_manager_id = ClientID::from_random().binary();
task_reconstruction_data->node_manager_id = ClientID::FromRandom().Binary();
task_reconstruction_data->num_reconstructions = 0;
RAY_CHECK_OK(
mock_gcs_.AppendAt(DriverID::nil(), task_id, task_reconstruction_data, nullptr,
mock_gcs_.AppendAt(DriverID::Nil(), task_id, task_reconstruction_data, nullptr,
/*failure_callback=*/
[](ray::gcs::AsyncGcsClient *client, const TaskID &task_id,
const TaskReconstructionDataT &data) { ASSERT_TRUE(false); },
+8 -8
View File
@@ -24,7 +24,7 @@ bool TaskDependencyManager::CheckObjectLocal(const ObjectID &object_id) const {
}
bool TaskDependencyManager::CheckObjectRequired(const ObjectID &object_id) const {
const TaskID task_id = object_id.task_id();
const TaskID task_id = object_id.TaskId();
auto task_entry = required_tasks_.find(task_id);
// If there are no subscribed tasks that are dependent on the object, then do
// nothing.
@@ -82,7 +82,7 @@ std::vector<TaskID> TaskDependencyManager::HandleObjectLocal(
// Find any tasks that are dependent on the newly available object.
std::vector<TaskID> ready_task_ids;
auto creating_task_entry = required_tasks_.find(object_id.task_id());
auto creating_task_entry = required_tasks_.find(object_id.TaskId());
if (creating_task_entry != required_tasks_.end()) {
auto object_entry = creating_task_entry->second.find(object_id);
if (object_entry != creating_task_entry->second.end()) {
@@ -113,7 +113,7 @@ std::vector<TaskID> TaskDependencyManager::HandleObjectMissing(
// Find any tasks that are dependent on the missing object.
std::vector<TaskID> waiting_task_ids;
TaskID creating_task_id = object_id.task_id();
TaskID creating_task_id = object_id.TaskId();
auto creating_task_entry = required_tasks_.find(creating_task_id);
if (creating_task_entry != required_tasks_.end()) {
auto object_entry = creating_task_entry->second.find(object_id);
@@ -149,7 +149,7 @@ bool TaskDependencyManager::SubscribeDependencies(
auto inserted = task_entry.object_dependencies.insert(object_id);
if (inserted.second) {
// Get the ID of the task that creates the dependency.
TaskID creating_task_id = object_id.task_id();
TaskID creating_task_id = object_id.TaskId();
// Determine whether the dependency can be fulfilled by the local node.
if (local_objects_.count(object_id) == 0) {
// The object is not local.
@@ -186,7 +186,7 @@ bool TaskDependencyManager::UnsubscribeDependencies(const TaskID &task_id) {
// Remove the task from the list of tasks that are dependent on this
// object.
// Get the ID of the task that creates the dependency.
TaskID creating_task_id = object_id.task_id();
TaskID creating_task_id = object_id.TaskId();
auto creating_task_entry = required_tasks_.find(creating_task_id);
std::vector<TaskID> &dependent_tasks = creating_task_entry->second[object_id];
auto it = std::find(dependent_tasks.begin(), dependent_tasks.end(), task_id);
@@ -262,10 +262,10 @@ void TaskDependencyManager::AcquireTaskLease(const TaskID &task_id) {
}
auto task_lease_data = std::make_shared<TaskLeaseDataT>();
task_lease_data->node_manager_id = client_id_.hex();
task_lease_data->node_manager_id = client_id_.Hex();
task_lease_data->acquired_at = current_sys_time_ms();
task_lease_data->timeout = it->second.lease_period;
RAY_CHECK_OK(task_lease_table_.Add(DriverID::nil(), task_id, task_lease_data, nullptr));
RAY_CHECK_OK(task_lease_table_.Add(DriverID::Nil(), task_id, task_lease_data, nullptr));
auto period = boost::posix_time::milliseconds(it->second.lease_period / 2);
it->second.lease_timer->expires_from_now(period);
@@ -324,7 +324,7 @@ void TaskDependencyManager::RemoveTasksAndRelatedObjects(
// Cancel all of the objects that were required by the removed tasks.
for (const auto &object_id : required_objects) {
TaskID creating_task_id = object_id.task_id();
TaskID creating_task_id = object_id.TaskId();
required_tasks_.erase(creating_task_id);
HandleRemoteDependencyCanceled(object_id);
}
+11 -11
View File
@@ -43,7 +43,7 @@ class TaskDependencyManagerTest : public ::testing::Test {
gcs_mock_(),
initial_lease_period_ms_(100),
task_dependency_manager_(object_manager_mock_, reconstruction_policy_mock_,
io_service_, ClientID::nil(), initial_lease_period_ms_,
io_service_, ClientID::Nil(), initial_lease_period_ms_,
gcs_mock_) {}
void Run(uint64_t timeout_ms) {
@@ -75,7 +75,7 @@ static inline Task ExampleTask(const std::vector<ObjectID> &arguments,
task_arguments.emplace_back(std::make_shared<TaskArgumentByReference>(references));
}
std::vector<std::string> function_descriptor(3);
auto spec = TaskSpecification(DriverID::nil(), TaskID::from_random(), 0, task_arguments,
auto spec = TaskSpecification(DriverID::Nil(), TaskID::FromRandom(), 0, task_arguments,
num_returns, required_resources, Language::PYTHON,
function_descriptor);
auto execution_spec = TaskExecutionSpecification(std::vector<ObjectID>());
@@ -105,9 +105,9 @@ TEST_F(TaskDependencyManagerTest, TestSimpleTask) {
int num_arguments = 3;
std::vector<ObjectID> arguments;
for (int i = 0; i < num_arguments; i++) {
arguments.push_back(ObjectID::from_random());
arguments.push_back(ObjectID::FromRandom());
}
TaskID task_id = TaskID::from_random();
TaskID task_id = TaskID::FromRandom();
// No objects have been registered in the task dependency manager, so all
// arguments should be remote.
for (const auto &argument_id : arguments) {
@@ -139,12 +139,12 @@ TEST_F(TaskDependencyManagerTest, TestSimpleTask) {
TEST_F(TaskDependencyManagerTest, TestDuplicateSubscribe) {
// Create a task with 3 arguments.
TaskID task_id = TaskID::from_random();
TaskID task_id = TaskID::FromRandom();
int num_arguments = 3;
std::vector<ObjectID> arguments;
for (int i = 0; i < num_arguments; i++) {
// Add the new argument to the list of dependencies to subscribe to.
ObjectID argument_id = ObjectID::from_random();
ObjectID argument_id = ObjectID::FromRandom();
arguments.push_back(argument_id);
// Subscribe to the task's dependencies. All arguments except the last are
// duplicates of previous subscription calls. Each argument should only be
@@ -176,7 +176,7 @@ TEST_F(TaskDependencyManagerTest, TestDuplicateSubscribe) {
TEST_F(TaskDependencyManagerTest, TestMultipleTasks) {
// Create 3 tasks that are dependent on the same object.
ObjectID argument_id = ObjectID::from_random();
ObjectID argument_id = ObjectID::FromRandom();
std::vector<TaskID> dependent_tasks;
int num_dependent_tasks = 3;
// The object should only be requested from the object manager once for all
@@ -184,7 +184,7 @@ TEST_F(TaskDependencyManagerTest, TestMultipleTasks) {
EXPECT_CALL(object_manager_mock_, Pull(argument_id));
EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(argument_id));
for (int i = 0; i < num_dependent_tasks; i++) {
TaskID task_id = TaskID::from_random();
TaskID task_id = TaskID::FromRandom();
dependent_tasks.push_back(task_id);
// Subscribe to each of the task's dependencies.
bool ready = task_dependency_manager_.SubscribeDependencies(task_id, {argument_id});
@@ -266,7 +266,7 @@ TEST_F(TaskDependencyManagerTest, TestTaskChain) {
TEST_F(TaskDependencyManagerTest, TestDependentPut) {
// Create a task with 3 arguments.
auto task1 = ExampleTask({}, 0);
ObjectID put_id = ObjectID::for_put(task1.GetTaskSpecification().TaskId(), 1);
ObjectID put_id = ObjectID::ForPut(task1.GetTaskSpecification().TaskId(), 1);
auto task2 = ExampleTask({put_id}, 0);
// No objects have been registered in the task dependency manager, so the put
@@ -326,9 +326,9 @@ TEST_F(TaskDependencyManagerTest, TestEviction) {
int num_arguments = 3;
std::vector<ObjectID> arguments;
for (int i = 0; i < num_arguments; i++) {
arguments.push_back(ObjectID::from_random());
arguments.push_back(ObjectID::FromRandom());
}
TaskID task_id = TaskID::from_random();
TaskID task_id = TaskID::FromRandom();
// No objects have been registered in the task dependency manager, so all
// arguments should be remote.
for (const auto &argument_id : arguments) {
+2 -2
View File
@@ -25,7 +25,7 @@ TaskExecutionSpecification::ToFlatbuffer(flatbuffers::FlatBufferBuilder &fbb) co
std::vector<ObjectID> TaskExecutionSpecification::ExecutionDependencies() const {
std::vector<ObjectID> dependencies;
for (const auto &dependency : execution_spec_.dependencies) {
dependencies.push_back(ObjectID::from_binary(dependency));
dependencies.push_back(ObjectID::FromBinary(dependency));
}
return dependencies;
}
@@ -34,7 +34,7 @@ void TaskExecutionSpecification::SetExecutionDependencies(
const std::vector<ObjectID> &dependencies) {
execution_spec_.dependencies.clear();
for (const auto &dependency : dependencies) {
execution_spec_.dependencies.push_back(dependency.binary());
execution_spec_.dependencies.push_back(dependency.Binary());
}
}
+5 -8
View File
@@ -65,8 +65,8 @@ TaskSpecification::TaskSpecification(
const std::vector<std::shared_ptr<TaskArgument>> &task_arguments, int64_t num_returns,
const std::unordered_map<std::string, double> &required_resources,
const Language &language, const std::vector<std::string> &function_descriptor)
: TaskSpecification(driver_id, parent_task_id, parent_counter, ActorID::nil(),
ObjectID::nil(), 0, ActorID::nil(), ActorHandleID::nil(), -1, {},
: TaskSpecification(driver_id, parent_task_id, parent_counter, ActorID::Nil(),
ObjectID::Nil(), 0, ActorID::Nil(), ActorHandleID::Nil(), -1, {},
task_arguments, num_returns, required_resources,
std::unordered_map<std::string, double>(), language,
function_descriptor) {}
@@ -165,8 +165,7 @@ int64_t TaskSpecification::NumReturns() const {
}
ObjectID TaskSpecification::ReturnId(int64_t return_index) const {
auto message = flatbuffers::GetRoot<TaskInfo>(spec_.data());
return ObjectID::for_task_return(TaskId(), return_index + 1);
return ObjectID::ForTaskReturn(TaskId(), return_index + 1);
}
bool TaskSpecification::ArgByRef(int64_t arg_index) const {
@@ -215,11 +214,9 @@ Language TaskSpecification::GetLanguage() const {
return message->language();
}
bool TaskSpecification::IsActorCreationTask() const {
return !ActorCreationId().is_nil();
}
bool TaskSpecification::IsActorCreationTask() const { return !ActorCreationId().IsNil(); }
bool TaskSpecification::IsActorTask() const { return !ActorId().is_nil(); }
bool TaskSpecification::IsActorTask() const { return !ActorId().IsNil(); }
ActorID TaskSpecification::ActorCreationId() const {
auto message = flatbuffers::GetRoot<TaskInfo>(spec_.data());
+28 -28
View File
@@ -10,21 +10,21 @@ namespace raylet {
void TestTaskReturnId(const TaskID &task_id, int64_t return_index) {
// Round trip test for computing the object ID for a task's return value,
// then computing the task ID that created the object.
ObjectID return_id = ObjectID::for_task_return(task_id, return_index);
ASSERT_EQ(return_id.task_id(), task_id);
ASSERT_EQ(return_id.object_index(), return_index);
ObjectID return_id = ObjectID::ForTaskReturn(task_id, return_index);
ASSERT_EQ(return_id.TaskId(), task_id);
ASSERT_EQ(return_id.ObjectIndex(), return_index);
}
void TestTaskPutId(const TaskID &task_id, int64_t put_index) {
// Round trip test for computing the object ID for a task's put value, then
// computing the task ID that created the object.
ObjectID put_id = ObjectID::for_put(task_id, put_index);
ASSERT_EQ(put_id.task_id(), task_id);
ASSERT_EQ(put_id.object_index(), -1 * put_index);
ObjectID put_id = ObjectID::ForPut(task_id, put_index);
ASSERT_EQ(put_id.TaskId(), task_id);
ASSERT_EQ(put_id.ObjectIndex(), -1 * put_index);
}
TEST(TaskSpecTest, TestTaskReturnIds) {
TaskID task_id = TaskID::from_random();
TaskID task_id = TaskID::FromRandom();
// Check that we can compute between a task ID and the object IDs of its
// return values and puts.
@@ -37,25 +37,25 @@ TEST(TaskSpecTest, TestTaskReturnIds) {
}
TEST(IdPropertyTest, TestIdProperty) {
TaskID task_id = TaskID::from_random();
ASSERT_EQ(task_id, TaskID::from_binary(task_id.binary()));
ObjectID object_id = ObjectID::from_random();
ASSERT_EQ(object_id, ObjectID::from_binary(object_id.binary()));
TaskID task_id = TaskID::FromRandom();
ASSERT_EQ(task_id, TaskID::FromBinary(task_id.Binary()));
ObjectID object_id = ObjectID::FromRandom();
ASSERT_EQ(object_id, ObjectID::FromBinary(object_id.Binary()));
ASSERT_TRUE(TaskID().is_nil());
ASSERT_TRUE(TaskID::nil().is_nil());
ASSERT_TRUE(ObjectID().is_nil());
ASSERT_TRUE(ObjectID::nil().is_nil());
ASSERT_TRUE(TaskID().IsNil());
ASSERT_TRUE(TaskID::Nil().IsNil());
ASSERT_TRUE(ObjectID().IsNil());
ASSERT_TRUE(ObjectID::Nil().IsNil());
}
TEST(TaskSpecTest, TaskInfoSize) {
std::vector<ObjectID> references = {ObjectID::from_random(), ObjectID::from_random()};
std::vector<ObjectID> references = {ObjectID::FromRandom(), ObjectID::FromRandom()};
auto arguments_1 = std::make_shared<TaskArgumentByReference>(references);
std::string one_arg("This is an value argument.");
auto arguments_2 = std::make_shared<TaskArgumentByValue>(
reinterpret_cast<const uint8_t *>(one_arg.c_str()), one_arg.size());
std::vector<std::shared_ptr<TaskArgument>> task_arguments({arguments_1, arguments_2});
auto task_id = TaskID::from_random();
auto task_id = TaskID::FromRandom();
{
flatbuffers::FlatBufferBuilder fbb;
std::vector<flatbuffers::Offset<Arg>> arguments;
@@ -64,10 +64,10 @@ TEST(TaskSpecTest, TaskInfoSize) {
}
// General task.
auto spec = CreateTaskInfo(
fbb, to_flatbuf(fbb, DriverID::from_random()), to_flatbuf(fbb, task_id),
to_flatbuf(fbb, TaskID::from_random()), 0, to_flatbuf(fbb, ActorID::nil()),
to_flatbuf(fbb, ObjectID::nil()), 0, to_flatbuf(fbb, ActorID::nil()),
to_flatbuf(fbb, ActorHandleID::nil()), 0,
fbb, to_flatbuf(fbb, DriverID::FromRandom()), to_flatbuf(fbb, task_id),
to_flatbuf(fbb, TaskID::FromRandom()), 0, to_flatbuf(fbb, ActorID::Nil()),
to_flatbuf(fbb, ObjectID::Nil()), 0, to_flatbuf(fbb, ActorID::Nil()),
to_flatbuf(fbb, ActorHandleID::Nil()), 0,
ids_to_flatbuf(fbb, std::vector<ObjectID>()), fbb.CreateVector(arguments), 1,
map_to_flatbuf(fbb, {}), map_to_flatbuf(fbb, {}), Language::PYTHON,
string_vec_to_flatbuf(fbb, {"PackageName", "ClassName", "FunctionName"}));
@@ -83,13 +83,13 @@ TEST(TaskSpecTest, TaskInfoSize) {
}
// General task.
auto spec = CreateTaskInfo(
fbb, to_flatbuf(fbb, DriverID::from_random()), to_flatbuf(fbb, task_id),
to_flatbuf(fbb, TaskID::from_random()), 10,
to_flatbuf(fbb, ActorID::from_random()), to_flatbuf(fbb, ObjectID::from_random()),
10000000, to_flatbuf(fbb, ActorID::from_random()),
to_flatbuf(fbb, ActorHandleID::from_random()), 20,
ids_to_flatbuf(fbb, std::vector<ObjectID>(
{ObjectID::from_random(), ObjectID::from_random()})),
fbb, to_flatbuf(fbb, DriverID::FromRandom()), to_flatbuf(fbb, task_id),
to_flatbuf(fbb, TaskID::FromRandom()), 10, to_flatbuf(fbb, ActorID::FromRandom()),
to_flatbuf(fbb, ObjectID::FromRandom()), 10000000,
to_flatbuf(fbb, ActorID::FromRandom()),
to_flatbuf(fbb, ActorHandleID::FromRandom()), 20,
ids_to_flatbuf(
fbb, std::vector<ObjectID>({ObjectID::FromRandom(), ObjectID::FromRandom()})),
fbb.CreateVector(arguments), 2, map_to_flatbuf(fbb, {}), map_to_flatbuf(fbb, {}),
Language::PYTHON,
string_vec_to_flatbuf(fbb, {"PackageName", "ClassName", "FunctionName"}));
+2 -2
View File
@@ -57,9 +57,9 @@ void Worker::AssignDriverId(const DriverID &driver_id) {
const DriverID &Worker::GetAssignedDriverId() const { return assigned_driver_id_; }
void Worker::AssignActorId(const ActorID &actor_id) {
RAY_CHECK(actor_id_.is_nil())
RAY_CHECK(actor_id_.IsNil())
<< "A worker that is already an actor cannot be assigned an actor ID again.";
RAY_CHECK(!actor_id.is_nil());
RAY_CHECK(!actor_id.IsNil());
actor_id_ = actor_id;
}
+4 -4
View File
@@ -172,7 +172,7 @@ void WorkerPool::RegisterWorker(const std::shared_ptr<Worker> &worker) {
}
void WorkerPool::RegisterDriver(const std::shared_ptr<Worker> &driver) {
RAY_CHECK(!driver->GetAssignedTaskId().is_nil());
RAY_CHECK(!driver->GetAssignedTaskId().IsNil());
auto &state = GetStateForLanguage(driver->GetLanguage());
state.registered_drivers.insert(std::move(driver));
}
@@ -201,11 +201,11 @@ std::shared_ptr<Worker> WorkerPool::GetRegisteredDriver(
void WorkerPool::PushWorker(const std::shared_ptr<Worker> &worker) {
// Since the worker is now idle, unset its assigned task ID.
RAY_CHECK(worker->GetAssignedTaskId().is_nil())
RAY_CHECK(worker->GetAssignedTaskId().IsNil())
<< "Idle workers cannot have an assigned task ID";
auto &state = GetStateForLanguage(worker->GetLanguage());
// Add the worker to the idle pool.
if (worker->GetActorId().is_nil()) {
if (worker->GetActorId().IsNil()) {
state.idle.insert(std::move(worker));
} else {
state.idle_actor[worker->GetActorId()] = std::move(worker);
@@ -216,7 +216,7 @@ std::shared_ptr<Worker> WorkerPool::PopWorker(const TaskSpecification &task_spec
auto &state = GetStateForLanguage(task_spec.GetLanguage());
const auto &actor_id = task_spec.ActorId();
std::shared_ptr<Worker> worker = nullptr;
if (actor_id.is_nil()) {
if (actor_id.IsNil()) {
if (!state.idle.empty()) {
worker = std::move(*state.idle.begin());
state.idle.erase(state.idle.begin());
+6 -6
View File
@@ -72,11 +72,11 @@ class WorkerPoolTest : public ::testing::Test {
};
static inline TaskSpecification ExampleTaskSpec(
const ActorID actor_id = ActorID::nil(),
const ActorID actor_id = ActorID::Nil(),
const Language &language = Language::PYTHON) {
std::vector<std::string> function_descriptor(3);
return TaskSpecification(DriverID::nil(), TaskID::nil(), 0, ActorID::nil(),
ObjectID::nil(), 0, actor_id, ActorHandleID::nil(), 0, {}, {},
return TaskSpecification(DriverID::Nil(), TaskID::Nil(), 0, ActorID::Nil(),
ObjectID::Nil(), 0, actor_id, ActorHandleID::Nil(), 0, {}, {},
0, {}, {}, language, function_descriptor);
}
@@ -155,7 +155,7 @@ TEST_F(WorkerPoolTest, PopActorWorker) {
// Assign an actor ID to the worker.
const auto task_spec = ExampleTaskSpec();
auto actor = worker_pool_.PopWorker(task_spec);
auto actor_id = ActorID::from_random();
auto actor_id = ActorID::FromRandom();
actor->AssignActorId(actor_id);
worker_pool_.PushWorker(actor);
@@ -173,10 +173,10 @@ TEST_F(WorkerPoolTest, PopWorkersOfMultipleLanguages) {
auto py_worker = CreateWorker(1234, Language::PYTHON);
worker_pool_.PushWorker(py_worker);
// Check that no worker will be popped if the given task is a Java task
const auto java_task_spec = ExampleTaskSpec(ActorID::nil(), Language::JAVA);
const auto java_task_spec = ExampleTaskSpec(ActorID::Nil(), Language::JAVA);
ASSERT_EQ(worker_pool_.PopWorker(java_task_spec), nullptr);
// Check that the worker can be popped if the given task is a Python task
const auto py_task_spec = ExampleTaskSpec(ActorID::nil(), Language::PYTHON);
const auto py_task_spec = ExampleTaskSpec(ActorID::Nil(), Language::PYTHON);
ASSERT_NE(worker_pool_.PopWorker(py_task_spec), nullptr);
// Create a Java Worker, and add it to the pool