Use gRPC to handle communication and data transmission between object manager (#4996)

This commit is contained in:
Joey Jiang
2019-06-28 10:56:34 +08:00
committed by Hao Chen
parent 62e4b591e3
commit d6bbbdef35
21 changed files with 714 additions and 608 deletions
+59 -4
View File
@@ -37,8 +37,38 @@ cc_proto_library(
deps = ["node_manager_proto"],
)
proto_library(
name = "object_manager_proto",
srcs = ["src/ray/protobuf/object_manager.proto"],
)
cc_proto_library(
name = "object_manager_cc_proto",
deps = ["object_manager_proto"],
)
# === End of protobuf definitions ===
# === Begin of rpc definitions ===
# grpc common lib
cc_library(
name = "grpc_common_lib",
srcs = glob([
"src/ray/rpc/*.cc",
]),
hdrs = glob([
"src/ray/rpc/*.h",
]),
copts = COPTS,
deps = [
":ray_common",
"@boost//:asio",
"@com_github_grpc_grpc//:grpc++",
"@com_google_protobuf//:protobuf",
],
)
# Node manager gRPC lib.
cc_grpc_library(
name = "node_manager_cc_grpc",
@@ -50,14 +80,12 @@ cc_grpc_library(
# Node manager server and client.
cc_library(
name = "node_manager_rpc",
srcs = glob([
"src/ray/rpc/*.cc",
]),
hdrs = glob([
"src/ray/rpc/*.h",
"src/ray/rpc/node_manager/*.h",
]),
copts = COPTS,
deps = [
":grpc_common_lib",
":node_manager_cc_grpc",
":ray_common",
"@boost//:asio",
@@ -65,6 +93,32 @@ cc_library(
],
)
# Object manager gRPC lib.
cc_grpc_library(
name = "object_manager_cc_grpc",
srcs = [":object_manager_proto"],
grpc_only = True,
deps = [":object_manager_cc_proto"],
)
# Object manager server and client.
cc_library(
name = "object_manager_rpc",
hdrs = glob([
"src/ray/rpc/object_manager/*.h",
]),
copts = COPTS,
deps = [
":grpc_common_lib",
":object_manager_cc_grpc",
":ray_common",
"@boost//:asio",
"@com_github_grpc_grpc//:grpc++",
],
)
# === End of rpc definitions ===
cc_binary(
name = "raylet",
srcs = ["src/ray/raylet/main.cc"],
@@ -320,6 +374,7 @@ cc_library(
deps = [
":gcs",
":object_manager_fbs",
":object_manager_rpc",
":ray_common",
":ray_util",
"@boost//:asio",
@@ -72,3 +72,31 @@ def test_task_forward(benchmark, num_tasks):
# Warm up
ray.get([f.remote() for _ in range(100)])
benchmark(benchmark_task_forward, f, num_tasks)
def benchmark_transfer_object(actor, object_ids):
ray.get(actor.f.remote(object_ids))
@pytest.mark.benchmark
@pytest.mark.parametrize("object_number, data_size",
[(10000, 500), (10000, 5000), (1000, 500),
(1000, 5000)])
def test_transfer_performance(benchmark, ray_start_cluster_head, object_number,
data_size):
cluster = ray_start_cluster_head
cluster.add_node(resources={"my_resource": 1}, object_store_memory=10**9)
@ray.remote(resources={"my_resource": 1})
class ObjectActor:
def f(self, object_ids):
ray.get(object_ids)
# setup remote actor
actor = ObjectActor.remote()
actor.f.remote([])
data = bytes(1) * data_size
object_ids = [ray.put(data) for _ in range(object_number)]
benchmark(benchmark_transfer_object, actor, object_ids)
@@ -24,44 +24,3 @@ table ObjectInfo {
// Specifies if this object was deleted or added.
is_deletion: bool;
}
enum MessageType:int {
ConnectClient = 1,
DisconnectClient,
PushRequest,
PullRequest,
FreeRequest
}
table PushRequestMessage {
// The push ID to allow the receiver to differentiate different push attempts
// from the same sender.
push_id: string;
// The object ID being transferred.
object_id: string;
// The index of the chunk being transferred.
chunk_index: ulong;
// The total size of the object + metadata.
data_size: ulong;
// The metadata size.
metadata_size: ulong;
}
table PullRequestMessage {
// ID of the requesting client.
client_id: string;
// Requested ObjectID.
object_id: string;
}
table ConnectClientMessage {
// ID of the connecting client.
client_id: string;
// Whether this is a transfer connection.
is_transfer: bool;
}
table FreeRequestMessage {
// List of IDs to be deleted.
object_ids: [string];
}
@@ -108,6 +108,9 @@ std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> ObjectBufferPool::Cr
create_buffer_state_.emplace(
std::piecewise_construct, std::forward_as_tuple(object_id),
std::forward_as_tuple(BuildChunks(object_id, mutable_data, data_size)));
RAY_LOG(DEBUG) << "Created object " << object_id
<< " in plasma store, number of chunks: " << num_chunks
<< ", chunk index: " << chunk_index;
RAY_CHECK(create_buffer_state_[object_id].chunk_info.size() == num_chunks);
}
if (create_buffer_state_[object_id].chunk_state[chunk_index] !=
@@ -154,6 +157,8 @@ void ObjectBufferPool::SealChunk(const ObjectID &object_id, const uint64_t chunk
RAY_ARROW_CHECK_OK(store_client_.Seal(plasma_id));
RAY_ARROW_CHECK_OK(store_client_.Release(plasma_id));
create_buffer_state_.erase(object_id);
RAY_LOG(DEBUG) << "Have received all chunks for object " << object_id
<< ", last chunk index: " << chunk_index;
}
}
+218 -351
View File
@@ -16,12 +16,13 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
object_directory_(std::move(object_directory)),
store_notification_(main_service, config_.store_socket_name),
buffer_pool_(config_.store_socket_name, config_.object_chunk_size),
send_work_(send_service_),
receive_work_(receive_service_),
rpc_work_(rpc_service_),
connection_pool_(),
gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()) {
RAY_CHECK(config_.max_sends > 0);
RAY_CHECK(config_.max_receives > 0);
gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()),
object_manager_server_("object_manager", config_.object_manager_port),
object_manager_service_(rpc_service_, *this),
client_call_manager_(main_service) {
RAY_CHECK(config_.rpc_service_threads_number > 0);
client_id_ = object_directory_->GetLocalClientID();
main_service_ = &main_service;
store_notification_.SubscribeObjAdded(
@@ -30,35 +31,32 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
});
store_notification_.SubscribeObjDeleted(
[this](const ObjectID &oid) { NotifyDirectoryObjectDeleted(oid); });
StartIOService();
// Start object manager rpc server and send & receive request threads
StartRpcService();
}
ObjectManager::~ObjectManager() { StopIOService(); }
ObjectManager::~ObjectManager() { StopRpcService(); }
void ObjectManager::RegisterGcs() { object_directory_->RegisterBackend(); }
void ObjectManager::StartIOService() {
for (int i = 0; i < config_.max_sends; ++i) {
send_threads_.emplace_back(std::thread(&ObjectManager::RunSendService, this));
}
for (int i = 0; i < config_.max_receives; ++i) {
receive_threads_.emplace_back(std::thread(&ObjectManager::RunReceiveService, this));
void ObjectManager::RunRpcService() { rpc_service_.run(); }
void ObjectManager::StartRpcService() {
rpc_threads_.resize(config_.rpc_service_threads_number);
for (int i = 0; i < config_.rpc_service_threads_number; i++) {
rpc_threads_[i] = std::thread(&ObjectManager::RunRpcService, this);
}
object_manager_server_.RegisterService(object_manager_service_);
object_manager_server_.Run();
}
void ObjectManager::RunSendService() { send_service_.run(); }
void ObjectManager::RunReceiveService() { receive_service_.run(); }
void ObjectManager::StopIOService() {
send_service_.stop();
for (int i = 0; i < config_.max_sends; ++i) {
send_threads_[i].join();
}
receive_service_.stop();
for (int i = 0; i < config_.max_receives; ++i) {
receive_threads_[i].join();
void ObjectManager::StopRpcService() {
rpc_service_.stop();
for (int i = 0; i < config_.rpc_service_threads_number; i++) {
rpc_threads_[i].join();
}
object_manager_server_.Shutdown();
}
void ObjectManager::HandleObjectAdded(
@@ -201,8 +199,18 @@ void ObjectManager::TryPull(const ObjectID &object_id) {
RAY_LOG(DEBUG) << "Sending pull request from " << client_id_ << " to " << client_id
<< " of object " << object_id;
// Try pulling from the client.
PullEstablishConnection(object_id, client_id);
auto rpc_client = GetRpcClient(client_id);
if (rpc_client) {
// Try pulling from the client.
rpc_service_.post([this, object_id, client_id, rpc_client]() {
SendPullRequest(object_id, client_id, rpc_client);
});
} else {
RAY_LOG(ERROR) << "Couldn't send pull request from " << client_id_ << " to "
<< client_id << " of object " << object_id
<< " , setup rpc connection failed.";
}
// If there are more clients to try, try them in succession, with a timeout
// in between each try.
@@ -238,51 +246,20 @@ void ObjectManager::TryPull(const ObjectID &object_id) {
}
};
void ObjectManager::PullEstablishConnection(const ObjectID &object_id,
const ClientID &client_id) {
// Acquire a message connection and send pull request.
ray::Status status;
std::shared_ptr<SenderConnection> conn;
// TODO(hme): There is no cap on the number of pull request connections.
connection_pool_.GetSender(ConnectionPool::ConnectionType::MESSAGE, client_id, &conn);
void ObjectManager::SendPullRequest(
const ObjectID &object_id, const ClientID &client_id,
std::shared_ptr<rpc::ObjectManagerClient> rpc_client) {
rpc::PullRequest pull_request;
pull_request.set_object_id(object_id.Binary());
pull_request.set_client_id(client_id_.Binary());
// Try to create a new connection to the remote object manager if one doesn't
// already exist.
if (conn == nullptr) {
RemoteConnectionInfo connection_info(client_id);
object_directory_->LookupRemoteConnectionInfo(connection_info);
if (connection_info.Connected()) {
conn = CreateSenderConnection(ConnectionPool::ConnectionType::MESSAGE,
connection_info);
} else {
RAY_LOG(ERROR) << "Failed to establish connection with remote object manager.";
rpc_client->Pull(pull_request, [object_id, client_id](const Status &status,
const rpc::PullReply &reply) {
if (!status.ok()) {
RAY_LOG(WARNING) << "Send pull " << object_id << " request to client " << client_id
<< " failed due to" << status.message();
}
}
if (conn != nullptr) {
PullSendRequest(object_id, conn);
connection_pool_.ReleaseSender(ConnectionPool::ConnectionType::MESSAGE, conn);
}
}
void ObjectManager::PullSendRequest(const ObjectID &object_id,
std::shared_ptr<SenderConnection> &conn) {
// TODO(rkn): This would be a natural place to record a profile event
// indicating that a pull request was sent.
flatbuffers::FlatBufferBuilder fbb;
auto message = object_manager_protocol::CreatePullRequestMessage(
fbb, fbb.CreateString(client_id_.Binary()), fbb.CreateString(object_id.Binary()));
fbb.Finish(message);
conn->WriteMessageAsync(
static_cast<int64_t>(object_manager_protocol::MessageType::PullRequest),
fbb.GetSize(), fbb.GetBufferPointer(), [this, conn](ray::Status status) {
if (!status.ok()) {
RAY_CHECK(status.IsIOError())
<< "Failed to contact remote object manager during Pull";
connection_pool_.RemoveSender(conn);
}
});
});
}
void ObjectManager::HandlePushTaskTimeout(const ObjectID &object_id,
@@ -318,6 +295,8 @@ void ObjectManager::HandleSendFinished(const ObjectID &object_id,
profile_event.set_extra_data("[\"" + object_id.Hex() + "\",\"" + client_id.Hex() +
"\"," + std::to_string(chunk_index) + ",\"" +
status.ToString() + "\"]");
std::lock_guard<std::mutex> lock(profile_mutex_);
profile_events_.push_back(profile_event);
}
@@ -335,9 +314,12 @@ void ObjectManager::HandleReceiveFinished(const ObjectID &object_id,
profile_event.set_end_time(end_time);
// Encode the object ID, client ID, chunk index, and status as a json list,
// which will be parsed by the reader of the profile table.
profile_event.set_extra_data("[\"" + object_id.Hex() + "\",\"" + client_id.Hex() +
"\"," + std::to_string(chunk_index) + ",\"" +
status.ToString() + "\"]");
std::lock_guard<std::mutex> lock(profile_mutex_);
profile_events_.push_back(profile_event);
}
@@ -399,35 +381,29 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) {
}
}
RemoteConnectionInfo connection_info(client_id);
object_directory_->LookupRemoteConnectionInfo(connection_info);
if (connection_info.Connected()) {
auto rpc_client = GetRpcClient(client_id);
if (rpc_client) {
const object_manager::protocol::ObjectInfoT &object_info =
local_objects_[object_id].object_info;
uint64_t data_size =
static_cast<uint64_t>(object_info.data_size + object_info.metadata_size);
uint64_t metadata_size = static_cast<uint64_t>(object_info.metadata_size);
uint64_t num_chunks = buffer_pool_.GetNumChunks(data_size);
RAY_LOG(DEBUG) << "Sending object chunks of " << object_id << " to client "
<< client_id << ", number of chunks: " << num_chunks
<< ", total data size: " << data_size;
UniqueID push_id = UniqueID::FromRandom();
for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) {
send_service_.post([this, push_id, client_id, object_id, data_size, metadata_size,
chunk_index, connection_info]() {
double start_time = current_sys_time_seconds();
// NOTE: When this callback executes, it's possible that the object
// will have already been evicted. It's also possible that the
// object could be in the process of being transferred to this
// object manager from another object manager.
ray::Status status =
ExecuteSendObject(push_id, client_id, object_id, data_size, metadata_size,
chunk_index, connection_info);
double end_time = current_sys_time_seconds();
// Notify the main thread that we have finished sending the chunk.
main_service_->post(
[this, object_id, client_id, chunk_index, start_time, end_time, status]() {
HandleSendFinished(object_id, client_id, chunk_index, start_time, end_time,
status);
});
rpc_service_.post([this, push_id, object_id, client_id, data_size, metadata_size,
chunk_index, rpc_client]() {
auto st = SendObjectChunk(push_id, object_id, client_id, data_size, metadata_size,
chunk_index, rpc_client);
if (!st.ok()) {
RAY_LOG(WARNING) << "Send object " << object_id << " chunk failed due to "
<< st.message() << ", chunk index " << chunk_index;
}
});
}
} else {
@@ -437,37 +413,21 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) {
}
}
ray::Status ObjectManager::ExecuteSendObject(
const UniqueID &push_id, const ClientID &client_id, const ObjectID &object_id,
ray::Status ObjectManager::SendObjectChunk(
const UniqueID &push_id, const ObjectID &object_id, const ClientID &client_id,
uint64_t data_size, uint64_t metadata_size, uint64_t chunk_index,
const RemoteConnectionInfo &connection_info) {
RAY_LOG(DEBUG) << "ExecuteSendObject on " << client_id_ << " to " << client_id
<< " of object " << object_id << " chunk " << chunk_index;
ray::Status status;
std::shared_ptr<SenderConnection> conn;
connection_pool_.GetSender(ConnectionPool::ConnectionType::TRANSFER, client_id, &conn);
if (conn == nullptr) {
conn =
CreateSenderConnection(ConnectionPool::ConnectionType::TRANSFER, connection_info);
}
std::shared_ptr<rpc::ObjectManagerClient> rpc_client) {
double start_time = current_sys_time_seconds();
rpc::PushRequest push_request;
// Set request header
push_request.set_push_id(push_id.Binary());
push_request.set_object_id(object_id.Binary());
push_request.set_client_id(client_id_.Binary());
push_request.set_data_size(data_size);
push_request.set_metadata_size(metadata_size);
push_request.set_chunk_index(chunk_index);
if (conn != nullptr) {
status = SendObjectHeaders(push_id, object_id, data_size, metadata_size, chunk_index,
conn);
if (!status.ok()) {
RAY_CHECK(status.IsIOError())
<< "Failed to contact remote object manager during Push";
connection_pool_.RemoveSender(conn);
}
}
return status;
}
ray::Status ObjectManager::SendObjectHeaders(const UniqueID &push_id,
const ObjectID &object_id,
uint64_t data_size, uint64_t metadata_size,
uint64_t chunk_index,
std::shared_ptr<SenderConnection> &conn) {
// Get data
std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> chunk_status =
buffer_pool_.GetChunk(object_id, data_size, metadata_size, chunk_index);
ObjectBufferPool::ChunkInfo chunk_info = chunk_status.first;
@@ -481,36 +441,30 @@ ray::Status ObjectManager::SendObjectHeaders(const UniqueID &push_id,
RAY_RETURN_NOT_OK(status);
}
// Create buffer.
flatbuffers::FlatBufferBuilder fbb;
auto message = object_manager_protocol::CreatePushRequestMessage(
fbb, to_flatbuf(fbb, push_id), to_flatbuf(fbb, object_id), chunk_index, data_size,
metadata_size);
fbb.Finish(message);
status = conn->WriteMessage(
static_cast<int64_t>(object_manager_protocol::MessageType::PushRequest),
fbb.GetSize(), fbb.GetBufferPointer());
if (!status.ok()) {
return status;
}
return SendObjectData(object_id, chunk_info, conn);
}
std::string buffer;
buffer.resize(chunk_info.buffer_length);
buffer.assign(chunk_info.data, chunk_info.data + chunk_info.buffer_length);
push_request.set_data(std::move(buffer));
ray::Status ObjectManager::SendObjectData(const ObjectID &object_id,
const ObjectBufferPool::ChunkInfo &chunk_info,
std::shared_ptr<SenderConnection> &conn) {
boost::system::error_code error;
std::vector<asio::const_buffer> buffer;
buffer.push_back(asio::buffer(chunk_info.data, chunk_info.buffer_length));
Status status = conn->WriteBuffer(buffer);
// record the time cost between send chunk and receive reply
rpc::ClientCallback<rpc::PushReply> callback = [this, start_time, object_id, client_id,
chunk_index](
const Status &status,
const rpc::PushReply &reply) {
// TODO: Just print warning here, should we try to resend this chunk?
if (!status.ok()) {
RAY_LOG(WARNING) << "Send object " << object_id << " chunk to client " << client_id
<< " failed due to" << status.message()
<< ", chunk index: " << chunk_index;
}
double end_time = current_sys_time_seconds();
HandleSendFinished(object_id, client_id, chunk_index, start_time, end_time, status);
};
rpc_client->Push(push_request, callback);
// Do this regardless of whether it failed or succeeded.
buffer_pool_.ReleaseGetChunk(object_id, chunk_info.chunk_index);
if (status.ok()) {
connection_pool_.ReleaseSender(ConnectionPool::ConnectionType::TRANSFER, conn);
}
return status;
return Status::OK();
}
void ObjectManager::CancelPull(const ObjectID &object_id) {
@@ -708,140 +662,37 @@ void ObjectManager::WaitComplete(const UniqueID &wait_id) {
<< " remaining " << remaining.size();
}
std::shared_ptr<SenderConnection> ObjectManager::CreateSenderConnection(
ConnectionPool::ConnectionType type, RemoteConnectionInfo info) {
std::shared_ptr<SenderConnection> conn =
SenderConnection::Create(*main_service_, info.client_id, info.ip, info.port);
if (conn == nullptr) {
RAY_LOG(ERROR) << "Failed to connect to remote object manager.";
} else {
// Register the new connection.
connection_pool_.RegisterSender(type, info.client_id, conn);
// Prepare client connection info buffer
flatbuffers::FlatBufferBuilder fbb;
bool is_transfer = (type == ConnectionPool::ConnectionType::TRANSFER);
auto message = object_manager_protocol::CreateConnectClientMessage(
fbb, to_flatbuf(fbb, client_id_), is_transfer);
fbb.Finish(message);
// Send synchronously.
// TODO(swang): Make this a WriteMessageAsync.
RAY_CHECK_OK(conn->WriteMessage(
static_cast<int64_t>(object_manager_protocol::MessageType::ConnectClient),
fbb.GetSize(), fbb.GetBufferPointer()));
}
return conn;
}
/// Implementation of ObjectManagerServiceHandler
void ObjectManager::HandlePushRequest(const rpc::PushRequest &request,
rpc::PushReply *reply,
rpc::RequestDoneCallback done_callback) {
ObjectID object_id = ObjectID::FromBinary(request.object_id());
ClientID client_id = ClientID::FromBinary(request.client_id());
void ObjectManager::ProcessNewClient(TcpClientConnection &conn) {
conn.ProcessMessages();
}
void ObjectManager::ProcessClientMessage(std::shared_ptr<TcpClientConnection> &conn,
int64_t message_type, const uint8_t *message) {
const auto message_type_value =
static_cast<object_manager_protocol::MessageType>(message_type);
RAY_LOG(DEBUG) << "[ObjectManager] Message "
<< object_manager_protocol::EnumNameMessageType(message_type_value)
<< "(" << message_type << ") from object manager";
switch (message_type_value) {
case object_manager_protocol::MessageType::PushRequest: {
ReceivePushRequest(conn, message);
break;
}
case object_manager_protocol::MessageType::PullRequest: {
ReceivePullRequest(conn, message);
break;
}
case object_manager_protocol::MessageType::ConnectClient: {
ConnectClient(conn, message);
break;
}
case object_manager_protocol::MessageType::FreeRequest: {
ReceiveFreeRequest(conn, message);
break;
}
case object_manager_protocol::MessageType::DisconnectClient: {
DisconnectClient(conn, message);
break;
}
default: { RAY_LOG(FATAL) << "invalid request " << message_type; }
}
}
void ObjectManager::ConnectClient(std::shared_ptr<TcpClientConnection> &conn,
const uint8_t *message) {
// TODO: trash connection on failure.
auto info =
flatbuffers::GetRoot<object_manager_protocol::ConnectClientMessage>(message);
ClientID client_id = ClientID::FromBinary(info->client_id()->str());
bool is_transfer = info->is_transfer();
conn->SetClientID(client_id);
if (is_transfer) {
connection_pool_.RegisterReceiver(ConnectionPool::ConnectionType::TRANSFER, client_id,
conn);
} else {
connection_pool_.RegisterReceiver(ConnectionPool::ConnectionType::MESSAGE, client_id,
conn);
}
conn->ProcessMessages();
}
void ObjectManager::DisconnectClient(std::shared_ptr<TcpClientConnection> &conn,
const uint8_t *message) {
connection_pool_.RemoveReceiver(conn);
// We don't need to clean up unfulfilled_push_requests_ because the
// unfulfilled push timers will fire and clean it up.
}
void ObjectManager::ReceivePullRequest(std::shared_ptr<TcpClientConnection> &conn,
const uint8_t *message) {
// Serialize and push object to requesting client.
auto pr = flatbuffers::GetRoot<object_manager_protocol::PullRequestMessage>(message);
ObjectID object_id = ObjectID::FromBinary(pr->object_id()->str());
ClientID client_id = ClientID::FromBinary(pr->client_id()->str());
rpc::ProfileTableData::ProfileEvent profile_event;
profile_event.set_event_type("receive_pull_request");
profile_event.set_start_time(current_sys_time_seconds());
profile_event.set_end_time(profile_event.start_time());
profile_event.set_extra_data("[\"" + object_id.Hex() + "\",\"" + client_id.Hex() +
"\"]");
profile_events_.push_back(profile_event);
Push(object_id, client_id);
conn->ProcessMessages();
}
void ObjectManager::ReceivePushRequest(std::shared_ptr<TcpClientConnection> &conn,
const uint8_t *message) {
// Serialize.
auto object_header =
flatbuffers::GetRoot<object_manager_protocol::PushRequestMessage>(message);
const ObjectID object_id = ObjectID::FromBinary(object_header->object_id()->str());
uint64_t chunk_index = object_header->chunk_index();
uint64_t data_size = object_header->data_size();
uint64_t metadata_size = object_header->metadata_size();
receive_service_.post([this, object_id, data_size, metadata_size, chunk_index, conn]() {
double start_time = current_sys_time_seconds();
const ClientID client_id = conn->GetClientId();
auto status = ExecuteReceiveObject(client_id, object_id, data_size, metadata_size,
chunk_index, *conn);
double end_time = current_sys_time_seconds();
// Notify the main thread that we have finished receiving the object.
main_service_->post(
[this, object_id, client_id, chunk_index, start_time, end_time, status]() {
HandleReceiveFinished(object_id, client_id, chunk_index, start_time, end_time,
status);
});
});
uint64_t chunk_index = request.chunk_index();
uint64_t metadata_size = request.metadata_size();
uint64_t data_size = request.data_size();
const std::string &data = request.data();
double start_time = current_sys_time_seconds();
auto status = ReceiveObjectChunk(client_id, object_id, data_size, metadata_size,
chunk_index, data);
double end_time = current_sys_time_seconds();
HandleReceiveFinished(object_id, client_id, chunk_index, start_time, end_time, status);
done_callback(status);
}
ray::Status ObjectManager::ExecuteReceiveObject(
const ClientID &client_id, const ObjectID &object_id, uint64_t data_size,
uint64_t metadata_size, uint64_t chunk_index, TcpClientConnection &conn) {
RAY_LOG(DEBUG) << "ExecuteReceiveObject on " << client_id_ << " from " << client_id
<< " of object " << object_id << " chunk " << chunk_index;
ray::Status ObjectManager::ReceiveObjectChunk(const ClientID &client_id,
const ObjectID &object_id,
uint64_t data_size, uint64_t metadata_size,
uint64_t chunk_index,
const std::string &data) {
RAY_LOG(DEBUG) << "ReceiveObjectChunk on " << client_id_ << " from " << client_id
<< " of object " << object_id << " chunk index: " << chunk_index
<< ", chunk data size: " << data.size()
<< ", object size: " << data_size;
std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> chunk_status =
buffer_pool_.CreateChunk(object_id, data_size, metadata_size, chunk_index);
@@ -849,94 +700,108 @@ ray::Status ObjectManager::ExecuteReceiveObject(
ObjectBufferPool::ChunkInfo chunk_info = chunk_status.first;
if (chunk_status.second.ok()) {
// Avoid handling this chunk if it's already being handled by another process.
std::vector<boost::asio::mutable_buffer> buffer;
buffer.push_back(asio::buffer(chunk_info.data, chunk_info.buffer_length));
status = conn.ReadBuffer(buffer);
if (status.ok()) {
buffer_pool_.SealChunk(object_id, chunk_index);
} else {
// We may have not have read out the correct data, so abort this chunk.
buffer_pool_.AbortCreateChunk(object_id, chunk_index);
// TODO(hme): This chunk failed, so create a pull request for this chunk.
}
std::memcpy(chunk_info.data, data.data(), chunk_info.buffer_length);
buffer_pool_.SealChunk(object_id, chunk_index);
} else {
RAY_LOG(DEBUG) << "ExecuteReceiveObject failed: " << chunk_status.second.message();
// Read object into empty buffer.
uint64_t buffer_length = buffer_pool_.GetBufferLength(chunk_index, data_size);
std::vector<uint8_t> mutable_vec;
mutable_vec.resize(buffer_length);
std::vector<boost::asio::mutable_buffer> buffer;
buffer.push_back(asio::buffer(mutable_vec, buffer_length));
status = conn.ReadBuffer(buffer);
RAY_LOG(WARNING) << "ReceiveObjectChunk index " << chunk_index << " of object "
<< object_id << " failed: " << chunk_status.second.message();
// TODO(hme): If the object isn't local, create a pull request for this chunk.
}
RAY_LOG(DEBUG) << "ExecuteReceiveObject completed on " << client_id_ << " from "
<< client_id << " of object " << object_id << " chunk " << chunk_index
<< " at " << current_sys_time_ms();
if (status.ok()) {
// We successfully read the buffer, so we are ready to receive the next
// message.
conn.ProcessMessages();
} else {
// Close the connection by skipping the call to ProcessMessages.
RAY_LOG(ERROR) << "Failed to ExecuteReceiveObject from remote object manager, error: "
<< status;
}
return status;
}
void ObjectManager::ReceiveFreeRequest(std::shared_ptr<TcpClientConnection> &conn,
const uint8_t *message) {
auto free_request =
flatbuffers::GetRoot<object_manager_protocol::FreeRequestMessage>(message);
std::vector<ObjectID> object_ids = from_flatbuf<ObjectID>(*free_request->object_ids());
// This RPC should come from another Object Manager.
// Keep this request local.
bool local_only = true;
FreeObjects(object_ids, local_only);
conn->ProcessMessages();
void ObjectManager::HandlePullRequest(const rpc::PullRequest &request,
rpc::PullReply *reply,
rpc::RequestDoneCallback done_callback) {
ObjectID object_id = ObjectID::FromBinary(request.object_id());
ClientID client_id = ClientID::FromBinary(request.client_id());
RAY_LOG(DEBUG) << "Received pull request from client " << client_id << " for object ["
<< object_id << "].";
rpc::ProfileTableData::ProfileEvent profile_event;
profile_event.set_event_type("receive_pull_request");
profile_event.set_start_time(current_sys_time_seconds());
profile_event.set_end_time(profile_event.start_time());
profile_event.set_extra_data("[\"" + object_id.Hex() + "\",\"" + client_id.Hex() +
"\"]");
{
std::lock_guard<std::mutex> lock(profile_mutex_);
profile_events_.emplace_back(profile_event);
}
main_service_->post([this, object_id, client_id]() { Push(object_id, client_id); });
done_callback(Status::OK());
}
void ObjectManager::HandleFreeObjectsRequest(const rpc::FreeObjectsRequest &request,
rpc::FreeObjectsReply *reply,
rpc::RequestDoneCallback done_callback) {
std::vector<ObjectID> object_ids;
for (const auto &e : request.object_ids()) {
object_ids.emplace_back(ObjectID::FromBinary(e));
}
FreeObjects(object_ids, /* local_only */ true);
done_callback(Status::OK());
}
void ObjectManager::FreeObjects(const std::vector<ObjectID> &object_ids,
bool local_only) {
buffer_pool_.FreeObjects(object_ids);
if (!local_only) {
SpreadFreeObjectRequest(object_ids);
const auto remote_connections = object_directory_->LookupAllRemoteConnections();
std::vector<std::shared_ptr<rpc::ObjectManagerClient>> rpc_clients;
for (const auto &connection_info : remote_connections) {
auto rpc_client = GetRpcClient(connection_info.client_id);
if (rpc_client != nullptr) {
rpc_clients.push_back(rpc_client);
}
}
rpc_service_.post([this, object_ids, rpc_clients]() {
SpreadFreeObjectsRequest(object_ids, rpc_clients);
});
}
}
void ObjectManager::SpreadFreeObjectRequest(const std::vector<ObjectID> &object_ids) {
void ObjectManager::SpreadFreeObjectsRequest(
const std::vector<ObjectID> &object_ids,
const std::vector<std::shared_ptr<rpc::ObjectManagerClient>> &rpc_clients) {
// This code path should be called from node manager.
flatbuffers::FlatBufferBuilder fbb;
flatbuffers::Offset<object_manager_protocol::FreeRequestMessage> request =
object_manager_protocol::CreateFreeRequestMessage(fbb, to_flatbuf(fbb, object_ids));
fbb.Finish(request);
const auto remote_connections = object_directory_->LookupAllRemoteConnections();
for (const auto &connection_info : remote_connections) {
std::shared_ptr<SenderConnection> conn;
connection_pool_.GetSender(ConnectionPool::ConnectionType::MESSAGE,
connection_info.client_id, &conn);
if (conn == nullptr) {
conn = CreateSenderConnection(ConnectionPool::ConnectionType::MESSAGE,
connection_info);
}
if (conn != nullptr) {
conn->WriteMessageAsync(
static_cast<int64_t>(object_manager_protocol::MessageType::FreeRequest),
fbb.GetSize(), fbb.GetBufferPointer(), [this, conn](ray::Status status) {
if (!status.ok()) {
RAY_CHECK(status.IsIOError())
<< "Failed to contact remote object manager during Free";
connection_pool_.RemoveSender(conn);
}
});
connection_pool_.ReleaseSender(ConnectionPool::ConnectionType::MESSAGE, conn);
}
rpc::FreeObjectsRequest free_objects_request;
for (const auto &e : object_ids) {
free_objects_request.add_object_ids(e.Binary());
}
for (auto &rpc_client : rpc_clients) {
rpc_client->FreeObjects(free_objects_request, [](const Status &status,
const rpc::FreeObjectsReply &reply) {
if (!status.ok()) {
RAY_LOG(WARNING) << "Send free objects request failed due to" << status.message();
}
});
}
}
std::shared_ptr<rpc::ObjectManagerClient> ObjectManager::GetRpcClient(
const ClientID &client_id) {
auto it = remote_object_manager_clients_.find(client_id);
if (it == remote_object_manager_clients_.end()) {
RemoteConnectionInfo connection_info(client_id);
object_directory_->LookupRemoteConnectionInfo(connection_info);
if (!connection_info.Connected()) {
return nullptr;
}
auto object_manager_client = std::make_shared<rpc::ObjectManagerClient>(
connection_info.ip, connection_info.port, client_call_manager_);
RAY_LOG(DEBUG) << "Get rpc client, address: " << connection_info.ip
<< ", port: " << connection_info.port
<< ", local port: " << GetServerPort();
it = remote_object_manager_clients_
.emplace(client_id, std::move(object_manager_client))
.first;
}
return it->second;
}
rpc::ProfileTableData ObjectManager::GetAndResetProfilingInfo() {
@@ -944,12 +809,14 @@ rpc::ProfileTableData ObjectManager::GetAndResetProfilingInfo() {
profile_info.set_component_type("object_manager");
profile_info.set_component_id(client_id_.Binary());
for (auto const &profile_event : profile_events_) {
profile_info.add_profile_events()->CopyFrom(profile_event);
{
std::lock_guard<std::mutex> lock(profile_mutex_);
for (auto const &profile_event : profile_events_) {
profile_info.add_profile_events()->CopyFrom(profile_event);
}
profile_events_.clear();
}
profile_events_.clear();
return profile_info;
}
+113 -65
View File
@@ -6,6 +6,7 @@
#include <deque>
#include <map>
#include <memory>
#include <mutex>
#include <random>
#include <thread>
@@ -25,6 +26,8 @@
#include "ray/object_manager/object_directory.h"
#include "ray/object_manager/object_manager_client_connection.h"
#include "ray/object_manager/object_store_notification_manager.h"
#include "ray/rpc/object_manager/object_manager_client.h"
#include "ray/rpc/object_manager/object_manager_server.h"
namespace ray {
@@ -36,10 +39,6 @@ struct ObjectManagerConfig {
/// The time in milliseconds to wait before retrying a pull
/// that fails due to client id lookup.
uint pull_timeout_ms;
/// Maximum number of sends allowed.
int max_sends;
/// Maximum number of receives allowed.
int max_receives;
/// Object chunk size, in bytes
uint64_t object_chunk_size;
/// The store socket name.
@@ -49,6 +48,9 @@ struct ObjectManagerConfig {
/// Negative: waiting infinitely.
/// 0: giving up retrying immediately.
int push_timeout_ms;
/// Number of threads of rpc service
/// Send and receive request in these threads
int rpc_service_threads_number;
};
struct LocalObjectInfo {
@@ -67,7 +69,81 @@ class ObjectManagerInterface {
};
// TODO(hme): Add success/failure callbacks for push and pull.
class ObjectManager : public ObjectManagerInterface {
class ObjectManager : public ObjectManagerInterface,
public rpc::ObjectManagerServiceHandler {
public:
/// Implementation of object manager service
/// Handle push request from remote object manager
///
/// Push request will contain the object which is specified by pull request
/// the object will be transfered by a sequence of chunks.
///
/// \param request Push request including the object chunk data
/// \param reply Reply to the sender
/// \param done_callback Callback of the request
void HandlePushRequest(const rpc::PushRequest &request, rpc::PushReply *reply,
rpc::RequestDoneCallback done_callback) override;
/// Handle pull request from remote object manager
///
/// \param request Pull request
/// \param reply Reply
/// \param done_callback Callback of request
void HandlePullRequest(const rpc::PullRequest &request, rpc::PullReply *reply,
rpc::RequestDoneCallback done_callback) override;
/// Handle free objects request
///
/// \param request Free objects request
/// \param reply Reply
/// \param done_callback
void HandleFreeObjectsRequest(const rpc::FreeObjectsRequest &request,
rpc::FreeObjectsReply *reply,
rpc::RequestDoneCallback done_callback) override;
/// Send object to remote object manager
///
/// Object will be transfered as a sequence of chunks, small object(defined in config)
/// contains only one chunk
/// \param push_id Unique push id to indicate this push request
/// \param object_id Object id
/// \param data_size Data size
/// \param metadata_size Metadata size
/// \param chunk_index Chunk index of this object chunk, start with 0
/// \param rpc_client Rpc client used to send message to remote object manager
ray::Status SendObjectChunk(const UniqueID &push_id, const ObjectID &object_id,
const ClientID &client_id, uint64_t data_size,
uint64_t metadata_size, uint64_t chunk_index,
std::shared_ptr<rpc::ObjectManagerClient> rpc_client);
/// Receive object chunk from remote object manager, small object may contain one chunk
///
/// \param client_id Client id of remote object manager which sends this chunk
/// \param object_id Object id
/// \param data_size Data size
/// \param metadata_size Metadata size
/// \param chunk_index Chunk index
/// \param data Chunk data
ray::Status ReceiveObjectChunk(const ClientID &client_id, const ObjectID &object_id,
uint64_t data_size, uint64_t metadata_size,
uint64_t chunk_index, const std::string &data);
/// Send pull request
///
/// \param object_id Object id
/// \param client_id Remote server client id
void SendPullRequest(const ObjectID &object_id, const ClientID &client_id,
std::shared_ptr<rpc::ObjectManagerClient> rpc_client);
/// Get the rpc client according to the client ID
///
/// \param client_id Remote client id, will send rpc request to it
std::shared_ptr<rpc::ObjectManagerClient> GetRpcClient(const ClientID &client_id);
/// Get the port of the object manager rpc server.
int GetServerPort() const { return object_manager_server_.GetPort(); }
public:
/// Takes user-defined ObjectDirectoryInterface implementation.
/// When this constructor is used, the ObjectManager assumes ownership of
@@ -244,13 +320,14 @@ class ObjectManager : public ObjectManagerInterface {
/// Spread the Free request to all objects managers.
///
/// \param object_ids the The list of ObjectIDs to be deleted.
void SpreadFreeObjectRequest(const std::vector<ObjectID> &object_ids);
void SpreadFreeObjectsRequest(
const std::vector<ObjectID> &object_ids,
const std::vector<std::shared_ptr<rpc::ObjectManagerClient>> &rpc_clients);
/// Handle starting, running, and stopping asio io_service.
void StartIOService();
void RunSendService();
void RunReceiveService();
void StopIOService();
/// Handle starting, running, and stopping asio rpc_service.
void StartRpcService();
void RunRpcService();
void StopRpcService();
/// Handle an object being added to this node. This adds the object to the
/// directory, pushes the object to other nodes if necessary, and cancels any
@@ -274,9 +351,6 @@ class ObjectManager : public ObjectManagerInterface {
void PullSendRequest(const ObjectID &object_id,
std::shared_ptr<SenderConnection> &conn);
std::shared_ptr<SenderConnection> CreateSenderConnection(
ConnectionPool::ConnectionType type, RemoteConnectionInfo info);
/// This is used to notify the main thread that the sending of a chunk has
/// completed.
///
@@ -309,49 +383,15 @@ class ObjectManager : public ObjectManagerInterface {
uint64_t chunk_index, double start_time_us,
double end_time_us, ray::Status status);
/// Begin executing a send.
/// Executes on send_service_ thread pool.
ray::Status ExecuteSendObject(const UniqueID &push_id, const ClientID &client_id,
const ObjectID &object_id, uint64_t data_size,
uint64_t metadata_size, uint64_t chunk_index,
const RemoteConnectionInfo &connection_info);
/// This method synchronously sends the object id and object size
/// to the remote object manager.
/// Executes on send_service_ thread pool.
ray::Status SendObjectHeaders(const UniqueID &push_id, const ObjectID &object_id,
uint64_t data_size, uint64_t metadata_size,
uint64_t chunk_index,
std::shared_ptr<SenderConnection> &conn);
/// This method initiates the actual object transfer.
/// Executes on send_service_ thread pool.
ray::Status SendObjectData(const ObjectID &object_id,
const ObjectBufferPool::ChunkInfo &chunk_info,
std::shared_ptr<SenderConnection> &conn);
/// Invoked when a remote object manager pushes an object to this object manager.
/// This will invoke the object receive on the receive_service_ thread pool.
void ReceivePushRequest(std::shared_ptr<TcpClientConnection> &conn,
const uint8_t *message);
/// Execute a receive on the receive_service_ thread pool.
ray::Status ExecuteReceiveObject(const ClientID &client_id, const ObjectID &object_id,
uint64_t data_size, uint64_t metadata_size,
uint64_t chunk_index, TcpClientConnection &conn);
/// Handles receiving a pull request message.
void ReceivePullRequest(std::shared_ptr<TcpClientConnection> &conn,
const uint8_t *message);
/// Handles freeing objects request.
void ReceiveFreeRequest(std::shared_ptr<TcpClientConnection> &conn,
const uint8_t *message);
/// Handles connect message of a new client connection.
void ConnectClient(std::shared_ptr<TcpClientConnection> &conn, const uint8_t *message);
/// Handles disconnect message of an existing client connection.
void DisconnectClient(std::shared_ptr<TcpClientConnection> &conn,
const uint8_t *message);
/// Handle Push task timeout.
void HandlePushTaskTimeout(const ObjectID &object_id, const ClientID &client_id);
@@ -361,28 +401,19 @@ class ObjectManager : public ObjectManagerInterface {
ObjectStoreNotificationManager store_notification_;
ObjectBufferPool buffer_pool_;
/// This runs on a thread pool dedicated to sending objects.
boost::asio::io_service send_service_;
/// This runs on a thread pool dedicated to receiving objects.
boost::asio::io_service receive_service_;
/// Weak reference to main service. We ensure this object is destroyed before
/// main_service_ is stopped.
boost::asio::io_service *main_service_;
/// Used to create "work" for send_service_.
/// Without this, if send_service_ has no more sends to process, it will stop.
boost::asio::io_service::work send_work_;
/// Used to create "work" for receive_service_.
/// Without this, if receive_service_ has no more receives to process, it will stop.
boost::asio::io_service::work receive_work_;
/// Multi-thread asio service, deal with all outgoing and incoming RPC request.
boost::asio::io_service rpc_service_;
/// Runs the send service, which handle
/// all outgoing object transfers.
std::vector<std::thread> send_threads_;
/// Runs the receive service, which handle
/// all incoming object transfers.
std::vector<std::thread> receive_threads_;
/// Keep rpc service running when no task in rpc service.
boost::asio::io_service::work rpc_work_;
/// The thread pool used for running `rpc_service`.
/// Data copy operations during request are done in this thread pool.
std::vector<std::thread> rpc_threads_;
/// Connection pool for reusing outgoing connections to remote object managers.
ConnectionPool connection_pool_;
@@ -414,8 +445,25 @@ class ObjectManager : public ObjectManagerInterface {
/// table in the GCS.
std::vector<rpc::ProfileTableData::ProfileEvent> profile_events_;
/// mutex lock used to protect profile_events_, profile_events_ is used in main thread
/// and rpc thread.
std::mutex profile_mutex_;
/// Internally maintained random number generator.
std::mt19937_64 gen_;
/// The gPRC server.
rpc::GrpcServer object_manager_server_;
/// The gRPC service.
rpc::ObjectManagerGrpcService object_manager_service_;
/// The client call manager used to deal with reply.
rpc::ClientCallManager client_call_manager_;
/// clientID - object manager gRPC client.
std::unordered_map<ClientID, std::shared_ptr<rpc::ObjectManagerClient>>
remote_object_manager_clients_;
};
} // namespace ray
@@ -33,15 +33,11 @@ class MockServer {
MockServer(boost::asio::io_service &main_service,
const ObjectManagerConfig &object_manager_config,
std::shared_ptr<gcs::AsyncGcsClient> gcs_client)
: object_manager_acceptor_(
main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)),
object_manager_socket_(main_service),
: config_(object_manager_config),
gcs_client_(gcs_client),
object_manager_(main_service, object_manager_config,
std::make_shared<ObjectDirectory>(main_service, gcs_client_)) {
RAY_CHECK_OK(RegisterGcs(main_service));
// Start listening for clients.
DoAcceptObjectManager();
}
~MockServer() { RAY_CHECK_OK(gcs_client_->client_table().Disconnect()); }
@@ -50,45 +46,20 @@ class MockServer {
ray::Status RegisterGcs(boost::asio::io_service &io_service) {
RAY_RETURN_NOT_OK(gcs_client_->Attach(io_service));
boost::asio::ip::tcp::endpoint endpoint = object_manager_acceptor_.local_endpoint();
std::string ip = endpoint.address().to_string();
unsigned short object_manager_port = endpoint.port();
auto object_manager_port = config_.object_manager_port;
ClientTableData client_info = gcs_client_->client_table().GetLocalClient();
client_info.set_node_manager_address(ip);
client_info.set_node_manager_address("127.0.0.1");
client_info.set_node_manager_port(object_manager_port);
client_info.set_object_manager_port(object_manager_port);
ray::Status status = gcs_client_->client_table().Connect(client_info);
object_manager_.RegisterGcs();
return status;
}
void DoAcceptObjectManager() {
object_manager_acceptor_.async_accept(
object_manager_socket_, boost::bind(&MockServer::HandleAcceptObjectManager, this,
boost::asio::placeholders::error));
}
void HandleAcceptObjectManager(const boost::system::error_code &error) {
ClientHandler<boost::asio::ip::tcp> client_handler =
[this](TcpClientConnection &client) { object_manager_.ProcessNewClient(client); };
MessageHandler<boost::asio::ip::tcp> message_handler =
[this](std::shared_ptr<TcpClientConnection> client, int64_t message_type,
const uint8_t *message) {
object_manager_.ProcessClientMessage(client, message_type, message);
};
// Accept a new local client and dispatch it to the node manager.
auto new_connection = TcpClientConnection::Create(
client_handler, message_handler, std::move(object_manager_socket_),
"object manager", {},
static_cast<int64_t>(object_manager::protocol::MessageType::DisconnectClient));
DoAcceptObjectManager();
}
friend class StressTestObjectManager;
boost::asio::ip::tcp::acceptor object_manager_acceptor_;
boost::asio::ip::tcp::socket object_manager_socket_;
ObjectManagerConfig config_;
std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
ObjectManager object_manager_;
};
@@ -127,10 +98,6 @@ class TestObjectManagerBase : public ::testing::Test {
store_id_2 = StartStore(UniqueID::FromRandom().Hex());
uint pull_timeout_ms = 1000;
int max_sends_a = 2;
int max_receives_a = 2;
int max_sends_b = 3;
int max_receives_b = 3;
uint64_t object_chunk_size = static_cast<uint64_t>(std::pow(10, 3));
int push_timeout_ms = 10000;
@@ -140,10 +107,10 @@ class TestObjectManagerBase : public ::testing::Test {
ObjectManagerConfig om_config_1;
om_config_1.store_socket_name = store_id_1;
om_config_1.pull_timeout_ms = pull_timeout_ms;
om_config_1.max_sends = max_sends_a;
om_config_1.max_receives = max_receives_a;
om_config_1.object_chunk_size = object_chunk_size;
om_config_1.push_timeout_ms = push_timeout_ms;
om_config_1.object_manager_port = 12345;
om_config_1.rpc_service_threads_number = 3;
server1.reset(new MockServer(main_service, om_config_1, gcs_client_1));
// start second server
@@ -152,10 +119,10 @@ class TestObjectManagerBase : public ::testing::Test {
ObjectManagerConfig om_config_2;
om_config_2.store_socket_name = store_id_2;
om_config_2.pull_timeout_ms = pull_timeout_ms;
om_config_2.max_sends = max_sends_b;
om_config_2.max_receives = max_receives_b;
om_config_2.object_chunk_size = object_chunk_size;
om_config_2.push_timeout_ms = push_timeout_ms;
om_config_2.object_manager_port = 23456;
om_config_2.rpc_service_threads_number = 3;
server2.reset(new MockServer(main_service, om_config_2, gcs_client_2));
// connect to stores.
@@ -27,15 +27,11 @@ class MockServer {
MockServer(boost::asio::io_service &main_service,
const ObjectManagerConfig &object_manager_config,
std::shared_ptr<gcs::AsyncGcsClient> gcs_client)
: object_manager_acceptor_(
main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)),
object_manager_socket_(main_service),
: config_(object_manager_config),
gcs_client_(gcs_client),
object_manager_(main_service, object_manager_config,
std::make_shared<ObjectDirectory>(main_service, gcs_client_)) {
RAY_CHECK_OK(RegisterGcs(main_service));
// Start listening for clients.
DoAcceptObjectManager();
}
~MockServer() { RAY_CHECK_OK(gcs_client_->client_table().Disconnect()); }
@@ -44,45 +40,20 @@ class MockServer {
ray::Status RegisterGcs(boost::asio::io_service &io_service) {
RAY_RETURN_NOT_OK(gcs_client_->Attach(io_service));
boost::asio::ip::tcp::endpoint endpoint = object_manager_acceptor_.local_endpoint();
std::string ip = endpoint.address().to_string();
unsigned short object_manager_port = endpoint.port();
auto object_manager_port = config_.object_manager_port;
ClientTableData client_info = gcs_client_->client_table().GetLocalClient();
client_info.set_node_manager_address(ip);
client_info.set_node_manager_address("127.0.0.1");
client_info.set_node_manager_port(object_manager_port);
client_info.set_object_manager_port(object_manager_port);
ray::Status status = gcs_client_->client_table().Connect(client_info);
object_manager_.RegisterGcs();
return status;
}
void DoAcceptObjectManager() {
object_manager_acceptor_.async_accept(
object_manager_socket_, boost::bind(&MockServer::HandleAcceptObjectManager, this,
boost::asio::placeholders::error));
}
void HandleAcceptObjectManager(const boost::system::error_code &error) {
ClientHandler<boost::asio::ip::tcp> client_handler =
[this](TcpClientConnection &client) { object_manager_.ProcessNewClient(client); };
MessageHandler<boost::asio::ip::tcp> message_handler =
[this](std::shared_ptr<TcpClientConnection> client, int64_t message_type,
const uint8_t *message) {
object_manager_.ProcessClientMessage(client, message_type, message);
};
// Accept a new local client and dispatch it to the node manager.
auto new_connection = TcpClientConnection::Create(
client_handler, message_handler, std::move(object_manager_socket_),
"object manager", {},
static_cast<int64_t>(object_manager::protocol::MessageType::DisconnectClient));
DoAcceptObjectManager();
}
friend class TestObjectManager;
boost::asio::ip::tcp::acceptor object_manager_acceptor_;
boost::asio::ip::tcp::socket object_manager_socket_;
ObjectManagerConfig config_;
std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
ObjectManager object_manager_;
};
@@ -128,10 +99,10 @@ class TestObjectManagerBase : public ::testing::Test {
ObjectManagerConfig om_config_1;
om_config_1.store_socket_name = store_id_1;
om_config_1.pull_timeout_ms = pull_timeout_ms;
om_config_1.max_sends = max_sends;
om_config_1.max_receives = max_receives;
om_config_1.object_chunk_size = object_chunk_size;
om_config_1.push_timeout_ms = push_timeout_ms;
om_config_1.object_manager_port = 12345;
om_config_1.rpc_service_threads_number = 3;
server1.reset(new MockServer(main_service, om_config_1, gcs_client_1));
// start second server
@@ -140,10 +111,10 @@ class TestObjectManagerBase : public ::testing::Test {
ObjectManagerConfig om_config_2;
om_config_2.store_socket_name = store_id_2;
om_config_2.pull_timeout_ms = pull_timeout_ms;
om_config_2.max_sends = max_sends;
om_config_2.max_receives = max_receives;
om_config_2.object_chunk_size = object_chunk_size;
om_config_2.push_timeout_ms = push_timeout_ms;
om_config_2.object_manager_port = 23456;
om_config_2.rpc_service_threads_number = 3;
server2.reset(new MockServer(main_service, om_config_2, gcs_client_2));
// connect to stores.
@@ -201,8 +172,6 @@ class TestObjectManagerBase : public ::testing::Test {
uint push_timeout_ms;
int max_sends = 2;
int max_receives = 2;
uint64_t object_chunk_size = static_cast<uint64_t>(std::pow(10, 3));
};
@@ -384,6 +353,7 @@ class TestObjectManager : public TestObjectManagerBase {
num_objects += 1;
object_ids.push_back(ObjectID::FromRandom());
}
boost::posix_time::ptime start_time = boost::posix_time::second_clock::local_time();
RAY_CHECK_OK(server1->object_manager_.Wait(
object_ids, timeout_ms, required_objects, false,
+49
View File
@@ -0,0 +1,49 @@
syntax = "proto3";
package ray.rpc;
message PushRequest {
// The push ID to allow the receiver to differentiate different push attempts
// from the same sender.
bytes push_id = 1;
// The object ID being transferred.
bytes object_id = 2;
// The client ID of client sending this object
bytes client_id = 3;
// The index of the chunk being transferred.
uint32 chunk_index = 4;
// The data_size include object_size and metadata_size
uint64 data_size = 5;
// The metadata size.
uint64 metadata_size = 6;
// The chunk data
bytes data = 7;
}
message PullRequest {
// ID of the requesting client.
bytes client_id = 1;
// Requested ObjectID.
bytes object_id = 2;
}
message FreeObjectsRequest {
repeated bytes object_ids = 1;
}
// Reply for request
message PushReply {
}
message PullReply {
}
message FreeObjectsReply {
}
service ObjectManagerService {
// Push service used to send object chunks
rpc Push(PushRequest) returns (PushReply);
// Try to pull object from remote object manager
rpc Pull(PullRequest) returns (PullReply);
// Tell remote object manager to free some objects
rpc FreeObjects(FreeObjectsRequest) returns (FreeObjectsReply);
}
+3 -4
View File
@@ -146,14 +146,13 @@ int main(int argc, char *argv[]) {
RayConfig::instance().object_manager_push_timeout_ms();
int num_cpus = static_cast<int>(static_resource_conf["CPU"]);
object_manager_config.max_sends = std::max(1, num_cpus / 4);
object_manager_config.max_receives = std::max(1, num_cpus / 4);
object_manager_config.rpc_service_threads_number = std::max(2, num_cpus / 2);
object_manager_config.object_chunk_size =
RayConfig::instance().object_manager_default_chunk_size();
RAY_LOG(DEBUG) << "Starting object manager with configuration: \n"
<< "max_sends = " << object_manager_config.max_sends << "\n"
<< "max_receives = " << object_manager_config.max_receives << "\n"
<< "rpc_service_threads_number = "
<< object_manager_config.rpc_service_threads_number
<< "object_chunk_size = " << object_manager_config.object_chunk_size;
// Initialize the node manager.
+2 -2
View File
@@ -5,8 +5,8 @@
// clang-format off
#include "ray/rpc/client_call.h"
#include "ray/rpc/node_manager_server.h"
#include "ray/rpc/node_manager_client.h"
#include "ray/rpc/node_manager/node_manager_server.h"
#include "ray/rpc/node_manager/node_manager_client.h"
#include "ray/raylet/task.h"
#include "ray/object_manager/object_manager.h"
#include "ray/common/client_connection.h"
+2 -34
View File
@@ -33,10 +33,6 @@ static const std::vector<std::string> node_manager_message_enum =
GenerateEnumNames(ray::protocol::EnumNamesMessageType(),
static_cast<int>(ray::protocol::MessageType::MIN),
static_cast<int>(ray::protocol::MessageType::MAX));
static const std::vector<std::string> object_manager_message_enum =
GenerateEnumNames(ray::object_manager::protocol::EnumNamesMessageType(),
static_cast<int>(ray::object_manager::protocol::MessageType::MIN),
static_cast<int>(ray::object_manager::protocol::MessageType::MAX));
} // namespace
namespace ray {
@@ -56,15 +52,9 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_
object_directory_),
socket_name_(socket_name),
acceptor_(main_service, boost::asio::local::stream_protocol::endpoint(socket_name)),
socket_(main_service),
object_manager_acceptor_(
main_service,
boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(),
object_manager_config.object_manager_port)),
object_manager_socket_(main_service) {
socket_(main_service) {
// Start listening for clients.
DoAccept();
DoAcceptObjectManager();
RAY_CHECK_OK(RegisterGcs(
node_ip_address, socket_name_, object_manager_config.store_socket_name,
@@ -94,7 +84,7 @@ ray::Status Raylet::RegisterGcs(const std::string &node_ip_address,
client_info.set_node_manager_address(node_ip_address);
client_info.set_raylet_socket_name(raylet_socket_name);
client_info.set_object_store_socket_name(object_store_socket_name);
client_info.set_object_manager_port(object_manager_acceptor_.local_endpoint().port());
client_info.set_object_manager_port(object_manager_.GetServerPort());
client_info.set_node_manager_port(node_manager_.GetServerPort());
// Add resource information.
for (const auto &resource_pair : node_manager_config.resource_config.GetResourceMap()) {
@@ -115,28 +105,6 @@ ray::Status Raylet::RegisterGcs(const std::string &node_ip_address,
return Status::OK();
}
void Raylet::DoAcceptObjectManager() {
object_manager_acceptor_.async_accept(
object_manager_socket_, boost::bind(&Raylet::HandleAcceptObjectManager, this,
boost::asio::placeholders::error));
}
void Raylet::HandleAcceptObjectManager(const boost::system::error_code &error) {
ClientHandler<boost::asio::ip::tcp> client_handler =
[this](TcpClientConnection &client) { object_manager_.ProcessNewClient(client); };
MessageHandler<boost::asio::ip::tcp> message_handler =
[this](std::shared_ptr<TcpClientConnection> client, int64_t message_type,
const uint8_t *message) {
object_manager_.ProcessClientMessage(client, message_type, message);
};
// Accept a new TCP client and dispatch it to the node manager.
auto new_connection = TcpClientConnection::Create(
client_handler, message_handler, std::move(object_manager_socket_),
"object manager", object_manager_message_enum,
static_cast<int64_t>(object_manager::protocol::MessageType::DisconnectClient));
DoAcceptObjectManager();
}
void Raylet::DoAccept() {
acceptor_.async_accept(socket_, boost::bind(&Raylet::HandleAccept, this,
boost::asio::placeholders::error));
-8
View File
@@ -61,10 +61,6 @@ class Raylet {
void DoAccept();
/// Handle an accepted client connection.
void HandleAccept(const boost::system::error_code &error);
/// Accept a tcp client connection.
void DoAcceptObjectManager();
/// Handle an accepted tcp client connection.
void HandleAcceptObjectManager(const boost::system::error_code &error);
friend class TestObjectManagerIntegration;
@@ -84,10 +80,6 @@ class Raylet {
boost::asio::local::stream_protocol::acceptor acceptor_;
/// The socket to listen on for new clients.
boost::asio::local::stream_protocol::socket socket_;
/// An acceptor for new object manager tcp clients.
boost::asio::ip::tcp::acceptor object_manager_acceptor_;
/// The socket to listen on for new object manager tcp clients.
boost::asio::ip::tcp::socket object_manager_socket_;
};
} // namespace raylet
+5 -1
View File
@@ -49,7 +49,11 @@ using ClientCallback = std::function<void(const Status &status, const Reply &rep
template <class Reply>
class ClientCallImpl : public ClientCall {
public:
void OnReplyReceived() override { callback_(GrpcStatusToRayStatus(status_), reply_); }
void OnReplyReceived() override {
if (callback_ != nullptr) {
callback_(GrpcStatusToRayStatus(status_), reply_);
}
}
private:
/// Constructor.
+8 -5
View File
@@ -1,4 +1,5 @@
#include "ray/rpc/grpc_server.h"
#include "src/ray/rpc/grpc_server.h"
#include <grpcpp/impl/service_type.h>
namespace ray {
@@ -11,6 +12,9 @@ void GrpcServer::Run() {
// TODO(hchen): Add options for authentication.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials(), &port_);
// Register all the services to this server.
if (services_.size() == 0) {
RAY_LOG(WARNING) << "No service is found when start grpc server.";
}
for (auto &entry : services_) {
builder.RegisterService(&entry.get());
}
@@ -19,7 +23,7 @@ void GrpcServer::Run() {
cq_ = builder.AddCompletionQueue();
// Build and start server.
server_ = builder.BuildAndStart();
RAY_LOG(DEBUG) << name_ << " server started, listening on port " << port_ << ".";
RAY_LOG(INFO) << name_ << " server started, listening on port " << port_ << ".";
// Create calls for all the server call factories.
for (auto &entry : server_call_factories_and_concurrencies_) {
@@ -29,8 +33,7 @@ void GrpcServer::Run() {
}
}
// Start a thread that polls incoming requests.
std::thread polling_thread(&GrpcServer::PollEventsFromCompletionQueue, this);
polling_thread.detach();
polling_thread_ = std::thread(&GrpcServer::PollEventsFromCompletionQueue, this);
}
void GrpcServer::RegisterService(GrpcService &service) {
@@ -59,7 +62,7 @@ void GrpcServer::PollEventsFromCompletionQueue() {
break;
case ServerCallState::SENDING_REPLY:
// The reply has been sent, this call can be deleted now.
// This event is triggered by `ServerCallImpl::SendReply`.
// This event is triggered by `ServerCallImpl::Finish`.
delete_call = true;
break;
default:
+20 -5
View File
@@ -30,17 +30,28 @@ class GrpcServer {
/// \param[in] name Name of this server, used for logging and debugging purpose.
/// \param[in] port The port to bind this server to. If it's 0, a random available port
/// will be chosen.
GrpcServer(const std::string &name, const uint32_t port) : name_(name), port_(port) {}
/// \param[in] main_service The main event loop, to which service handler functions
/// will be posted.
GrpcServer(const std::string &name, const uint32_t port)
: name_(name), port_(port), is_closed_(false) {}
/// Destruct this gRPC server.
~GrpcServer() {
server_->Shutdown();
cq_->Shutdown();
}
~GrpcServer() { Shutdown(); }
/// Initialize and run this server.
void Run();
// Shutdown this server
void Shutdown() {
if (!is_closed_) {
server_->Shutdown();
cq_->Shutdown();
polling_thread_.join();
is_closed_ = true;
RAY_LOG(DEBUG) << "gRPC server of " << name_ << " shutdown.";
}
}
/// Get the port of this gRPC server.
int GetPort() const { return port_; }
@@ -71,6 +82,10 @@ class GrpcServer {
std::unique_ptr<grpc::ServerCompletionQueue> cq_;
/// The `Server` object.
std::unique_ptr<grpc::Server> server_;
/// The polling thread used to check the completion queue
std::thread polling_thread_;
/// Flag indicates whether this server has closed
bool is_closed_;
};
/// Base class that represents an abstract gRPC service.
@@ -14,8 +14,8 @@ namespace rpc {
class NodeManagerServiceHandler {
public:
/// Handle a `ForwardTask` request.
/// The implementation can handle this request asynchronously. When hanling is done, the
/// `done_callback` should be called.
/// The implementation can handle this request asynchronously. When handling is done,
/// the `done_callback` should be called.
///
/// \param[in] request The request message.
/// \param[out] reply The reply message.
@@ -0,0 +1,74 @@
#ifndef RAY_RPC_OBJECT_MANAGER_CLIENT_H
#define RAY_RPC_OBJECT_MANAGER_CLIENT_H
#include <thread>
#include <grpcpp/grpcpp.h>
#include "ray/common/status.h"
#include "ray/util/logging.h"
#include "src/ray/protobuf/object_manager.grpc.pb.h"
#include "src/ray/protobuf/object_manager.pb.h"
#include "src/ray/rpc/client_call.h"
namespace ray {
namespace rpc {
/// Client used for communicating with a remote node manager server.
class ObjectManagerClient {
public:
/// Constructor.
///
/// \param[in] address Address of the node manager server.
/// \param[in] port Port of the node manager server.
/// \param[in] client_call_manager The `ClientCallManager` used for managing requests.
ObjectManagerClient(const std::string &address, const int port,
ClientCallManager &client_call_manager)
: client_call_manager_(client_call_manager) {
std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel(
address + ":" + std::to_string(port), grpc::InsecureChannelCredentials());
stub_ = ObjectManagerService::NewStub(channel);
};
/// Push object to remote object manager
///
/// \param request The request message.
/// \param callback The callback function that handles reply from server
void Push(const PushRequest &request, const ClientCallback<PushReply> &callback) {
client_call_manager_.CreateCall<ObjectManagerService, PushRequest, PushReply>(
*stub_, &ObjectManagerService::Stub::PrepareAsyncPush, request, callback);
}
/// Pull object from remote object manager
///
/// \param request The request message
/// \param callback The callback function that handles reply from server
void Pull(const PullRequest &request, const ClientCallback<PullReply> &callback) {
client_call_manager_.CreateCall<ObjectManagerService, PullRequest, PullReply>(
*stub_, &ObjectManagerService::Stub::PrepareAsyncPull, request, callback);
}
/// Tell remote object manager to free objects
///
/// \param request The request message
/// \param callback The callback function that handles reply
void FreeObjects(const FreeObjectsRequest &request,
const ClientCallback<FreeObjectsReply> &callback) {
client_call_manager_
.CreateCall<ObjectManagerService, FreeObjectsRequest, FreeObjectsReply>(
*stub_, &ObjectManagerService::Stub::PrepareAsyncFreeObjects, request,
callback);
}
private:
/// The gRPC-generated stub.
std::unique_ptr<ObjectManagerService::Stub> stub_;
/// The `ClientCallManager` used for managing requests.
ClientCallManager &client_call_manager_;
};
} // namespace rpc
} // namespace ray
#endif // RAY_RPC_OBJECT_MANAGER_CLIENT_H
@@ -0,0 +1,92 @@
#ifndef RAY_RPC_OBJECT_MANAGER_SERVER_H
#define RAY_RPC_OBJECT_MANAGER_SERVER_H
#include "src/ray/rpc/grpc_server.h"
#include "src/ray/rpc/server_call.h"
#include "src/ray/protobuf/object_manager.grpc.pb.h"
#include "src/ray/protobuf/object_manager.pb.h"
namespace ray {
namespace rpc {
/// Implementations of the `ObjectManagerGrpcService`, check interface in
/// `src/ray/protobuf/object_manager.proto`.
class ObjectManagerServiceHandler {
public:
/// Handle a `Push` request.
/// The implementation can handle this request asynchronously. When handling is done,
/// the `done_callback` should be called.
///
/// \param[in] request The request message.
/// \param[out] reply The reply message.
/// \param[in] done_callback The callback to be called when the request is done.
virtual void HandlePushRequest(const PushRequest &request, PushReply *reply,
RequestDoneCallback done_callback) = 0;
/// Handle a `Pull` request
virtual void HandlePullRequest(const PullRequest &request, PullReply *reply,
RequestDoneCallback done_callback) = 0;
/// Handle a `FreeObjects` request
virtual void HandleFreeObjectsRequest(const FreeObjectsRequest &request,
FreeObjectsReply *reply,
RequestDoneCallback done_callback) = 0;
};
/// The `GrpcService` for `ObjectManagerGrpcService`.
class ObjectManagerGrpcService : public GrpcService {
public:
/// Construct a `ObjectManagerGrpcService`.
///
/// \param[in] port See `GrpcService`.
/// \param[in] handler The service handler that actually handle the requests.
ObjectManagerGrpcService(boost::asio::io_service &io_service,
ObjectManagerServiceHandler &service_handler)
: GrpcService(io_service), service_handler_(service_handler){};
protected:
grpc::Service &GetGrpcService() override { return service_; }
void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
// Initialize the factory for `Push` requests.
std::unique_ptr<ServerCallFactory> push_call_factory(
new ServerCallFactoryImpl<ObjectManagerService, ObjectManagerServiceHandler,
PushRequest, PushReply>(
service_, &ObjectManagerService::AsyncService::RequestPush, service_handler_,
&ObjectManagerServiceHandler::HandlePushRequest, cq, main_service_));
server_call_factories_and_concurrencies->emplace_back(std::move(push_call_factory),
50);
// Initialize the factory for `Pull` requests.
std::unique_ptr<ServerCallFactory> pull_call_factory(
new ServerCallFactoryImpl<ObjectManagerService, ObjectManagerServiceHandler,
PullRequest, PullReply>(
service_, &ObjectManagerService::AsyncService::RequestPull, service_handler_,
&ObjectManagerServiceHandler::HandlePullRequest, cq, main_service_));
server_call_factories_and_concurrencies->emplace_back(std::move(pull_call_factory),
50);
// Initialize the factory for `FreeObjects` requests.
std::unique_ptr<ServerCallFactory> free_objects_call_factory(
new ServerCallFactoryImpl<ObjectManagerService, ObjectManagerServiceHandler,
FreeObjectsRequest, FreeObjectsReply>(
service_, &ObjectManagerService::AsyncService::RequestFreeObjects,
service_handler_, &ObjectManagerServiceHandler::HandleFreeObjectsRequest, cq,
main_service_));
server_call_factories_and_concurrencies->emplace_back(
std::move(free_objects_call_factory), 2);
}
private:
/// The grpc async service object.
ObjectManagerService::AsyncService service_;
/// The service handler that actually handle the requests.
ObjectManagerServiceHandler &service_handler_;
};
} // namespace rpc
} // namespace ray
#endif
+15 -4
View File
@@ -59,6 +59,10 @@ class ServerCall {
/// Get the factory that created this `ServerCall`.
virtual const ServerCallFactory &GetFactory() const = 0;
/// Finish the `ServerCall`.
virtual void Finish(Status status) = 0;
/// Virtual destruct function to make sure subclass would destruct properly.
virtual ~ServerCall() = default;
};
@@ -113,7 +117,14 @@ class ServerCallImpl : public ServerCall {
void SetState(const ServerCallState &new_state) override { state_ = new_state; }
void HandleRequest() override {
io_service_.post([this] { HandleRequestImpl(); });
if (!io_service_.stopped()) {
io_service_.post([this] { HandleRequestImpl(); });
} else {
// Handle service for rpc call has stopped, we must handle the call here
// to send reply and remove it from cq
RAY_LOG(DEBUG) << "Handle service has been closed.";
Finish(Status::Invalid("HandleServiceClosed"));
}
}
void HandleRequestImpl() {
@@ -123,19 +134,19 @@ class ServerCallImpl : public ServerCall {
// When the handler is done with the
// request, tell gRPC to finish this
// request.
SendReply(status);
Finish(status);
});
}
const ServerCallFactory &GetFactory() const override { return factory_; }
private:
/// Tell gRPC to finish this request.
void SendReply(Status status) {
void Finish(Status status) override {
state_ = ServerCallState::SENDING_REPLY;
response_writer_.Finish(reply_, RayStatusToGrpcStatus(status), this);
}
private:
/// State of this call.
ServerCallState state_;