diff --git a/BUILD.bazel b/BUILD.bazel index bc9e6bcd8..710c1b9c3 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -37,8 +37,38 @@ cc_proto_library( deps = ["node_manager_proto"], ) +proto_library( + name = "object_manager_proto", + srcs = ["src/ray/protobuf/object_manager.proto"], +) + +cc_proto_library( + name = "object_manager_cc_proto", + deps = ["object_manager_proto"], +) + # === End of protobuf definitions === +# === Begin of rpc definitions === + +# grpc common lib +cc_library( + name = "grpc_common_lib", + srcs = glob([ + "src/ray/rpc/*.cc", + ]), + hdrs = glob([ + "src/ray/rpc/*.h", + ]), + copts = COPTS, + deps = [ + ":ray_common", + "@boost//:asio", + "@com_github_grpc_grpc//:grpc++", + "@com_google_protobuf//:protobuf", + ], +) + # Node manager gRPC lib. cc_grpc_library( name = "node_manager_cc_grpc", @@ -50,14 +80,12 @@ cc_grpc_library( # Node manager server and client. cc_library( name = "node_manager_rpc", - srcs = glob([ - "src/ray/rpc/*.cc", - ]), hdrs = glob([ - "src/ray/rpc/*.h", + "src/ray/rpc/node_manager/*.h", ]), copts = COPTS, deps = [ + ":grpc_common_lib", ":node_manager_cc_grpc", ":ray_common", "@boost//:asio", @@ -65,6 +93,32 @@ cc_library( ], ) +# Object manager gRPC lib. +cc_grpc_library( + name = "object_manager_cc_grpc", + srcs = [":object_manager_proto"], + grpc_only = True, + deps = [":object_manager_cc_proto"], +) + +# Object manager server and client. 
+cc_library( + name = "object_manager_rpc", + hdrs = glob([ + "src/ray/rpc/object_manager/*.h", + ]), + copts = COPTS, + deps = [ + ":grpc_common_lib", + ":object_manager_cc_grpc", + ":ray_common", + "@boost//:asio", + "@com_github_grpc_grpc//:grpc++", + ], +) + +# === End of rpc definitions === + cc_binary( name = "raylet", srcs = ["src/ray/raylet/main.cc"], @@ -320,6 +374,7 @@ cc_library( deps = [ ":gcs", ":object_manager_fbs", + ":object_manager_rpc", ":ray_common", ":ray_util", "@boost//:asio", diff --git a/python/ray/tests/perf_integration_tests/test_perf_integration.py b/python/ray/tests/perf_integration_tests/test_perf_integration.py index 393f191e1..b4225dec1 100644 --- a/python/ray/tests/perf_integration_tests/test_perf_integration.py +++ b/python/ray/tests/perf_integration_tests/test_perf_integration.py @@ -72,3 +72,31 @@ def test_task_forward(benchmark, num_tasks): # Warm up ray.get([f.remote() for _ in range(100)]) benchmark(benchmark_task_forward, f, num_tasks) + + +def benchmark_transfer_object(actor, object_ids): + ray.get(actor.f.remote(object_ids)) + + +@pytest.mark.benchmark +@pytest.mark.parametrize("object_number, data_size", + [(10000, 500), (10000, 5000), (1000, 500), + (1000, 5000)]) +def test_transfer_performance(benchmark, ray_start_cluster_head, object_number, + data_size): + cluster = ray_start_cluster_head + cluster.add_node(resources={"my_resource": 1}, object_store_memory=10**9) + + @ray.remote(resources={"my_resource": 1}) + class ObjectActor: + def f(self, object_ids): + ray.get(object_ids) + + # setup remote actor + actor = ObjectActor.remote() + actor.f.remote([]) + + data = bytes(1) * data_size + object_ids = [ray.put(data) for _ in range(object_number)] + + benchmark(benchmark_transfer_object, actor, object_ids) diff --git a/src/ray/object_manager/format/object_manager.fbs b/src/ray/object_manager/format/object_manager.fbs index 73c28b7be..c928d69e6 100644 --- a/src/ray/object_manager/format/object_manager.fbs +++ 
b/src/ray/object_manager/format/object_manager.fbs @@ -24,44 +24,3 @@ table ObjectInfo { // Specifies if this object was deleted or added. is_deletion: bool; } - -enum MessageType:int { - ConnectClient = 1, - DisconnectClient, - PushRequest, - PullRequest, - FreeRequest -} - -table PushRequestMessage { - // The push ID to allow the receiver to differentiate different push attempts - // from the same sender. - push_id: string; - // The object ID being transferred. - object_id: string; - // The index of the chunk being transferred. - chunk_index: ulong; - // The total size of the object + metadata. - data_size: ulong; - // The metadata size. - metadata_size: ulong; -} - -table PullRequestMessage { - // ID of the requesting client. - client_id: string; - // Requested ObjectID. - object_id: string; -} - -table ConnectClientMessage { - // ID of the connecting client. - client_id: string; - // Whether this is a transfer connection. - is_transfer: bool; -} - -table FreeRequestMessage { - // List of IDs to be deleted. 
- object_ids: [string]; -} diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index ee2a2319c..2c8430d50 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -108,6 +108,9 @@ std::pair ObjectBufferPool::Cr create_buffer_state_.emplace( std::piecewise_construct, std::forward_as_tuple(object_id), std::forward_as_tuple(BuildChunks(object_id, mutable_data, data_size))); + RAY_LOG(DEBUG) << "Created object " << object_id + << " in plasma store, number of chunks: " << num_chunks + << ", chunk index: " << chunk_index; RAY_CHECK(create_buffer_state_[object_id].chunk_info.size() == num_chunks); } if (create_buffer_state_[object_id].chunk_state[chunk_index] != @@ -154,6 +157,8 @@ void ObjectBufferPool::SealChunk(const ObjectID &object_id, const uint64_t chunk RAY_ARROW_CHECK_OK(store_client_.Seal(plasma_id)); RAY_ARROW_CHECK_OK(store_client_.Release(plasma_id)); create_buffer_state_.erase(object_id); + RAY_LOG(DEBUG) << "Have received all chunks for object " << object_id + << ", last chunk index: " << chunk_index; } } diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 964cee605..e25ba99c7 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -16,12 +16,13 @@ ObjectManager::ObjectManager(asio::io_service &main_service, object_directory_(std::move(object_directory)), store_notification_(main_service, config_.store_socket_name), buffer_pool_(config_.store_socket_name, config_.object_chunk_size), - send_work_(send_service_), - receive_work_(receive_service_), + rpc_work_(rpc_service_), connection_pool_(), - gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()) { - RAY_CHECK(config_.max_sends > 0); - RAY_CHECK(config_.max_receives > 0); + gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()), + 
object_manager_server_("object_manager", config_.object_manager_port), + object_manager_service_(rpc_service_, *this), + client_call_manager_(main_service) { + RAY_CHECK(config_.rpc_service_threads_number > 0); client_id_ = object_directory_->GetLocalClientID(); main_service_ = &main_service; store_notification_.SubscribeObjAdded( @@ -30,35 +31,32 @@ ObjectManager::ObjectManager(asio::io_service &main_service, }); store_notification_.SubscribeObjDeleted( [this](const ObjectID &oid) { NotifyDirectoryObjectDeleted(oid); }); - StartIOService(); + + // Start object manager rpc server and send & receive request threads + StartRpcService(); } -ObjectManager::~ObjectManager() { StopIOService(); } +ObjectManager::~ObjectManager() { StopRpcService(); } void ObjectManager::RegisterGcs() { object_directory_->RegisterBackend(); } -void ObjectManager::StartIOService() { - for (int i = 0; i < config_.max_sends; ++i) { - send_threads_.emplace_back(std::thread(&ObjectManager::RunSendService, this)); - } - for (int i = 0; i < config_.max_receives; ++i) { - receive_threads_.emplace_back(std::thread(&ObjectManager::RunReceiveService, this)); +void ObjectManager::RunRpcService() { rpc_service_.run(); } + +void ObjectManager::StartRpcService() { + rpc_threads_.resize(config_.rpc_service_threads_number); + for (int i = 0; i < config_.rpc_service_threads_number; i++) { + rpc_threads_[i] = std::thread(&ObjectManager::RunRpcService, this); } + object_manager_server_.RegisterService(object_manager_service_); + object_manager_server_.Run(); } -void ObjectManager::RunSendService() { send_service_.run(); } - -void ObjectManager::RunReceiveService() { receive_service_.run(); } - -void ObjectManager::StopIOService() { - send_service_.stop(); - for (int i = 0; i < config_.max_sends; ++i) { - send_threads_[i].join(); - } - receive_service_.stop(); - for (int i = 0; i < config_.max_receives; ++i) { - receive_threads_[i].join(); +void ObjectManager::StopRpcService() { + rpc_service_.stop(); + for 
(int i = 0; i < config_.rpc_service_threads_number; i++) { + rpc_threads_[i].join(); } + object_manager_server_.Shutdown(); } void ObjectManager::HandleObjectAdded( @@ -201,8 +199,18 @@ void ObjectManager::TryPull(const ObjectID &object_id) { RAY_LOG(DEBUG) << "Sending pull request from " << client_id_ << " to " << client_id << " of object " << object_id; - // Try pulling from the client. - PullEstablishConnection(object_id, client_id); + + auto rpc_client = GetRpcClient(client_id); + if (rpc_client) { + // Try pulling from the client. + rpc_service_.post([this, object_id, client_id, rpc_client]() { + SendPullRequest(object_id, client_id, rpc_client); + }); + } else { + RAY_LOG(ERROR) << "Couldn't send pull request from " << client_id_ << " to " + << client_id << " of object " << object_id + << " , setup rpc connection failed."; + } // If there are more clients to try, try them in succession, with a timeout // in between each try. @@ -238,51 +246,20 @@ void ObjectManager::TryPull(const ObjectID &object_id) { } }; -void ObjectManager::PullEstablishConnection(const ObjectID &object_id, - const ClientID &client_id) { - // Acquire a message connection and send pull request. - ray::Status status; - std::shared_ptr conn; - // TODO(hme): There is no cap on the number of pull request connections. - connection_pool_.GetSender(ConnectionPool::ConnectionType::MESSAGE, client_id, &conn); +void ObjectManager::SendPullRequest( + const ObjectID &object_id, const ClientID &client_id, + std::shared_ptr rpc_client) { + rpc::PullRequest pull_request; + pull_request.set_object_id(object_id.Binary()); + pull_request.set_client_id(client_id_.Binary()); - // Try to create a new connection to the remote object manager if one doesn't - // already exist. 
- if (conn == nullptr) { - RemoteConnectionInfo connection_info(client_id); - object_directory_->LookupRemoteConnectionInfo(connection_info); - if (connection_info.Connected()) { - conn = CreateSenderConnection(ConnectionPool::ConnectionType::MESSAGE, - connection_info); - } else { - RAY_LOG(ERROR) << "Failed to establish connection with remote object manager."; + rpc_client->Pull(pull_request, [object_id, client_id](const Status &status, + const rpc::PullReply &reply) { + if (!status.ok()) { + RAY_LOG(WARNING) << "Send pull " << object_id << " request to client " << client_id + << " failed due to" << status.message(); } - } - - if (conn != nullptr) { - PullSendRequest(object_id, conn); - connection_pool_.ReleaseSender(ConnectionPool::ConnectionType::MESSAGE, conn); - } -} - -void ObjectManager::PullSendRequest(const ObjectID &object_id, - std::shared_ptr &conn) { - // TODO(rkn): This would be a natural place to record a profile event - // indicating that a pull request was sent. - - flatbuffers::FlatBufferBuilder fbb; - auto message = object_manager_protocol::CreatePullRequestMessage( - fbb, fbb.CreateString(client_id_.Binary()), fbb.CreateString(object_id.Binary())); - fbb.Finish(message); - conn->WriteMessageAsync( - static_cast(object_manager_protocol::MessageType::PullRequest), - fbb.GetSize(), fbb.GetBufferPointer(), [this, conn](ray::Status status) { - if (!status.ok()) { - RAY_CHECK(status.IsIOError()) - << "Failed to contact remote object manager during Pull"; - connection_pool_.RemoveSender(conn); - } - }); + }); } void ObjectManager::HandlePushTaskTimeout(const ObjectID &object_id, @@ -318,6 +295,8 @@ void ObjectManager::HandleSendFinished(const ObjectID &object_id, profile_event.set_extra_data("[\"" + object_id.Hex() + "\",\"" + client_id.Hex() + "\"," + std::to_string(chunk_index) + ",\"" + status.ToString() + "\"]"); + + std::lock_guard lock(profile_mutex_); profile_events_.push_back(profile_event); } @@ -335,9 +314,12 @@ void 
ObjectManager::HandleReceiveFinished(const ObjectID &object_id, profile_event.set_end_time(end_time); // Encode the object ID, client ID, chunk index, and status as a json list, // which will be parsed by the reader of the profile table. + profile_event.set_extra_data("[\"" + object_id.Hex() + "\",\"" + client_id.Hex() + "\"," + std::to_string(chunk_index) + ",\"" + status.ToString() + "\"]"); + + std::lock_guard lock(profile_mutex_); profile_events_.push_back(profile_event); } @@ -399,35 +381,29 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) { } } - RemoteConnectionInfo connection_info(client_id); - object_directory_->LookupRemoteConnectionInfo(connection_info); - if (connection_info.Connected()) { + auto rpc_client = GetRpcClient(client_id); + if (rpc_client) { const object_manager::protocol::ObjectInfoT &object_info = local_objects_[object_id].object_info; uint64_t data_size = static_cast(object_info.data_size + object_info.metadata_size); uint64_t metadata_size = static_cast(object_info.metadata_size); uint64_t num_chunks = buffer_pool_.GetNumChunks(data_size); + + RAY_LOG(DEBUG) << "Sending object chunks of " << object_id << " to client " + << client_id << ", number of chunks: " << num_chunks + << ", total data size: " << data_size; + UniqueID push_id = UniqueID::FromRandom(); for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) { - send_service_.post([this, push_id, client_id, object_id, data_size, metadata_size, - chunk_index, connection_info]() { - double start_time = current_sys_time_seconds(); - // NOTE: When this callback executes, it's possible that the object - // will have already been evicted. It's also possible that the - // object could be in the process of being transferred to this - // object manager from another object manager. 
- ray::Status status = - ExecuteSendObject(push_id, client_id, object_id, data_size, metadata_size, - chunk_index, connection_info); - double end_time = current_sys_time_seconds(); - - // Notify the main thread that we have finished sending the chunk. - main_service_->post( - [this, object_id, client_id, chunk_index, start_time, end_time, status]() { - HandleSendFinished(object_id, client_id, chunk_index, start_time, end_time, - status); - }); + rpc_service_.post([this, push_id, object_id, client_id, data_size, metadata_size, + chunk_index, rpc_client]() { + auto st = SendObjectChunk(push_id, object_id, client_id, data_size, metadata_size, + chunk_index, rpc_client); + if (!st.ok()) { + RAY_LOG(WARNING) << "Send object " << object_id << " chunk failed due to " + << st.message() << ", chunk index " << chunk_index; + } }); } } else { @@ -437,37 +413,21 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) { } } -ray::Status ObjectManager::ExecuteSendObject( - const UniqueID &push_id, const ClientID &client_id, const ObjectID &object_id, +ray::Status ObjectManager::SendObjectChunk( + const UniqueID &push_id, const ObjectID &object_id, const ClientID &client_id, uint64_t data_size, uint64_t metadata_size, uint64_t chunk_index, - const RemoteConnectionInfo &connection_info) { - RAY_LOG(DEBUG) << "ExecuteSendObject on " << client_id_ << " to " << client_id - << " of object " << object_id << " chunk " << chunk_index; - ray::Status status; - std::shared_ptr conn; - connection_pool_.GetSender(ConnectionPool::ConnectionType::TRANSFER, client_id, &conn); - if (conn == nullptr) { - conn = - CreateSenderConnection(ConnectionPool::ConnectionType::TRANSFER, connection_info); - } + std::shared_ptr rpc_client) { + double start_time = current_sys_time_seconds(); + rpc::PushRequest push_request; + // Set request header + push_request.set_push_id(push_id.Binary()); + push_request.set_object_id(object_id.Binary()); + 
push_request.set_client_id(client_id_.Binary()); + push_request.set_data_size(data_size); + push_request.set_metadata_size(metadata_size); + push_request.set_chunk_index(chunk_index); - if (conn != nullptr) { - status = SendObjectHeaders(push_id, object_id, data_size, metadata_size, chunk_index, - conn); - if (!status.ok()) { - RAY_CHECK(status.IsIOError()) - << "Failed to contact remote object manager during Push"; - connection_pool_.RemoveSender(conn); - } - } - return status; -} - -ray::Status ObjectManager::SendObjectHeaders(const UniqueID &push_id, - const ObjectID &object_id, - uint64_t data_size, uint64_t metadata_size, - uint64_t chunk_index, - std::shared_ptr &conn) { + // Get data std::pair chunk_status = buffer_pool_.GetChunk(object_id, data_size, metadata_size, chunk_index); ObjectBufferPool::ChunkInfo chunk_info = chunk_status.first; @@ -481,36 +441,30 @@ ray::Status ObjectManager::SendObjectHeaders(const UniqueID &push_id, RAY_RETURN_NOT_OK(status); } - // Create buffer. - flatbuffers::FlatBufferBuilder fbb; - auto message = object_manager_protocol::CreatePushRequestMessage( - fbb, to_flatbuf(fbb, push_id), to_flatbuf(fbb, object_id), chunk_index, data_size, - metadata_size); - fbb.Finish(message); - status = conn->WriteMessage( - static_cast(object_manager_protocol::MessageType::PushRequest), - fbb.GetSize(), fbb.GetBufferPointer()); - if (!status.ok()) { - return status; - } - return SendObjectData(object_id, chunk_info, conn); -} + std::string buffer; + buffer.resize(chunk_info.buffer_length); + buffer.assign(chunk_info.data, chunk_info.data + chunk_info.buffer_length); + push_request.set_data(std::move(buffer)); -ray::Status ObjectManager::SendObjectData(const ObjectID &object_id, - const ObjectBufferPool::ChunkInfo &chunk_info, - std::shared_ptr &conn) { - boost::system::error_code error; - std::vector buffer; - buffer.push_back(asio::buffer(chunk_info.data, chunk_info.buffer_length)); - Status status = conn->WriteBuffer(buffer); + // record the 
time cost between send chunk and receive reply + rpc::ClientCallback callback = [this, start_time, object_id, client_id, + chunk_index]( + const Status &status, + const rpc::PushReply &reply) { + // TODO: Just print warning here, should we try to resend this chunk? + if (!status.ok()) { + RAY_LOG(WARNING) << "Send object " << object_id << " chunk to client " << client_id + << " failed due to" << status.message() + << ", chunk index: " << chunk_index; + } + double end_time = current_sys_time_seconds(); + HandleSendFinished(object_id, client_id, chunk_index, start_time, end_time, status); + }; + rpc_client->Push(push_request, callback); // Do this regardless of whether it failed or succeeded. buffer_pool_.ReleaseGetChunk(object_id, chunk_info.chunk_index); - - if (status.ok()) { - connection_pool_.ReleaseSender(ConnectionPool::ConnectionType::TRANSFER, conn); - } - return status; + return Status::OK(); } void ObjectManager::CancelPull(const ObjectID &object_id) { @@ -708,140 +662,37 @@ void ObjectManager::WaitComplete(const UniqueID &wait_id) { << " remaining " << remaining.size(); } -std::shared_ptr ObjectManager::CreateSenderConnection( - ConnectionPool::ConnectionType type, RemoteConnectionInfo info) { - std::shared_ptr conn = - SenderConnection::Create(*main_service_, info.client_id, info.ip, info.port); - if (conn == nullptr) { - RAY_LOG(ERROR) << "Failed to connect to remote object manager."; - } else { - // Register the new connection. - connection_pool_.RegisterSender(type, info.client_id, conn); - // Prepare client connection info buffer - flatbuffers::FlatBufferBuilder fbb; - bool is_transfer = (type == ConnectionPool::ConnectionType::TRANSFER); - auto message = object_manager_protocol::CreateConnectClientMessage( - fbb, to_flatbuf(fbb, client_id_), is_transfer); - fbb.Finish(message); - // Send synchronously. - // TODO(swang): Make this a WriteMessageAsync. 
- RAY_CHECK_OK(conn->WriteMessage( - static_cast(object_manager_protocol::MessageType::ConnectClient), - fbb.GetSize(), fbb.GetBufferPointer())); - } - return conn; -} +/// Implementation of ObjectManagerServiceHandler +void ObjectManager::HandlePushRequest(const rpc::PushRequest &request, + rpc::PushReply *reply, + rpc::RequestDoneCallback done_callback) { + ObjectID object_id = ObjectID::FromBinary(request.object_id()); + ClientID client_id = ClientID::FromBinary(request.client_id()); -void ObjectManager::ProcessNewClient(TcpClientConnection &conn) { - conn.ProcessMessages(); -} - -void ObjectManager::ProcessClientMessage(std::shared_ptr &conn, - int64_t message_type, const uint8_t *message) { - const auto message_type_value = - static_cast(message_type); - RAY_LOG(DEBUG) << "[ObjectManager] Message " - << object_manager_protocol::EnumNameMessageType(message_type_value) - << "(" << message_type << ") from object manager"; - switch (message_type_value) { - case object_manager_protocol::MessageType::PushRequest: { - ReceivePushRequest(conn, message); - break; - } - case object_manager_protocol::MessageType::PullRequest: { - ReceivePullRequest(conn, message); - break; - } - case object_manager_protocol::MessageType::ConnectClient: { - ConnectClient(conn, message); - break; - } - case object_manager_protocol::MessageType::FreeRequest: { - ReceiveFreeRequest(conn, message); - break; - } - case object_manager_protocol::MessageType::DisconnectClient: { - DisconnectClient(conn, message); - break; - } - default: { RAY_LOG(FATAL) << "invalid request " << message_type; } - } -} - -void ObjectManager::ConnectClient(std::shared_ptr &conn, - const uint8_t *message) { - // TODO: trash connection on failure. 
- auto info = - flatbuffers::GetRoot(message); - ClientID client_id = ClientID::FromBinary(info->client_id()->str()); - bool is_transfer = info->is_transfer(); - conn->SetClientID(client_id); - if (is_transfer) { - connection_pool_.RegisterReceiver(ConnectionPool::ConnectionType::TRANSFER, client_id, - conn); - } else { - connection_pool_.RegisterReceiver(ConnectionPool::ConnectionType::MESSAGE, client_id, - conn); - } - conn->ProcessMessages(); -} - -void ObjectManager::DisconnectClient(std::shared_ptr &conn, - const uint8_t *message) { - connection_pool_.RemoveReceiver(conn); - - // We don't need to clean up unfulfilled_push_requests_ because the - // unfulfilled push timers will fire and clean it up. -} - -void ObjectManager::ReceivePullRequest(std::shared_ptr &conn, - const uint8_t *message) { - // Serialize and push object to requesting client. - auto pr = flatbuffers::GetRoot(message); - ObjectID object_id = ObjectID::FromBinary(pr->object_id()->str()); - ClientID client_id = ClientID::FromBinary(pr->client_id()->str()); - - rpc::ProfileTableData::ProfileEvent profile_event; - profile_event.set_event_type("receive_pull_request"); - profile_event.set_start_time(current_sys_time_seconds()); - profile_event.set_end_time(profile_event.start_time()); - profile_event.set_extra_data("[\"" + object_id.Hex() + "\",\"" + client_id.Hex() + - "\"]"); - profile_events_.push_back(profile_event); - - Push(object_id, client_id); - conn->ProcessMessages(); -} - -void ObjectManager::ReceivePushRequest(std::shared_ptr &conn, - const uint8_t *message) { // Serialize. 
- auto object_header = - flatbuffers::GetRoot(message); - const ObjectID object_id = ObjectID::FromBinary(object_header->object_id()->str()); - uint64_t chunk_index = object_header->chunk_index(); - uint64_t data_size = object_header->data_size(); - uint64_t metadata_size = object_header->metadata_size(); - receive_service_.post([this, object_id, data_size, metadata_size, chunk_index, conn]() { - double start_time = current_sys_time_seconds(); - const ClientID client_id = conn->GetClientId(); - auto status = ExecuteReceiveObject(client_id, object_id, data_size, metadata_size, - chunk_index, *conn); - double end_time = current_sys_time_seconds(); - // Notify the main thread that we have finished receiving the object. - main_service_->post( - [this, object_id, client_id, chunk_index, start_time, end_time, status]() { - HandleReceiveFinished(object_id, client_id, chunk_index, start_time, end_time, - status); - }); - }); + uint64_t chunk_index = request.chunk_index(); + uint64_t metadata_size = request.metadata_size(); + uint64_t data_size = request.data_size(); + const std::string &data = request.data(); + + double start_time = current_sys_time_seconds(); + auto status = ReceiveObjectChunk(client_id, object_id, data_size, metadata_size, + chunk_index, data); + double end_time = current_sys_time_seconds(); + + HandleReceiveFinished(object_id, client_id, chunk_index, start_time, end_time, status); + done_callback(status); } -ray::Status ObjectManager::ExecuteReceiveObject( - const ClientID &client_id, const ObjectID &object_id, uint64_t data_size, - uint64_t metadata_size, uint64_t chunk_index, TcpClientConnection &conn) { - RAY_LOG(DEBUG) << "ExecuteReceiveObject on " << client_id_ << " from " << client_id - << " of object " << object_id << " chunk " << chunk_index; +ray::Status ObjectManager::ReceiveObjectChunk(const ClientID &client_id, + const ObjectID &object_id, + uint64_t data_size, uint64_t metadata_size, + uint64_t chunk_index, + const std::string &data) { + 
RAY_LOG(DEBUG) << "ReceiveObjectChunk on " << client_id_ << " from " << client_id + << " of object " << object_id << " chunk index: " << chunk_index + << ", chunk data size: " << data.size() + << ", object size: " << data_size; std::pair chunk_status = buffer_pool_.CreateChunk(object_id, data_size, metadata_size, chunk_index); @@ -849,94 +700,108 @@ ray::Status ObjectManager::ExecuteReceiveObject( ObjectBufferPool::ChunkInfo chunk_info = chunk_status.first; if (chunk_status.second.ok()) { // Avoid handling this chunk if it's already being handled by another process. - std::vector buffer; - buffer.push_back(asio::buffer(chunk_info.data, chunk_info.buffer_length)); - status = conn.ReadBuffer(buffer); - if (status.ok()) { - buffer_pool_.SealChunk(object_id, chunk_index); - } else { - // We may have not have read out the correct data, so abort this chunk. - buffer_pool_.AbortCreateChunk(object_id, chunk_index); - // TODO(hme): This chunk failed, so create a pull request for this chunk. - } + std::memcpy(chunk_info.data, data.data(), chunk_info.buffer_length); + buffer_pool_.SealChunk(object_id, chunk_index); } else { - RAY_LOG(DEBUG) << "ExecuteReceiveObject failed: " << chunk_status.second.message(); - // Read object into empty buffer. - uint64_t buffer_length = buffer_pool_.GetBufferLength(chunk_index, data_size); - std::vector mutable_vec; - mutable_vec.resize(buffer_length); - std::vector buffer; - buffer.push_back(asio::buffer(mutable_vec, buffer_length)); - status = conn.ReadBuffer(buffer); + RAY_LOG(WARNING) << "ReceiveObjectChunk index " << chunk_index << " of object " + << object_id << " failed: " << chunk_status.second.message(); // TODO(hme): If the object isn't local, create a pull request for this chunk. 
} - - RAY_LOG(DEBUG) << "ExecuteReceiveObject completed on " << client_id_ << " from " - << client_id << " of object " << object_id << " chunk " << chunk_index - << " at " << current_sys_time_ms(); - if (status.ok()) { - // We successfully read the buffer, so we are ready to receive the next - // message. - conn.ProcessMessages(); - } else { - // Close the connection by skipping the call to ProcessMessages. - RAY_LOG(ERROR) << "Failed to ExecuteReceiveObject from remote object manager, error: " - << status; - } - return status; } -void ObjectManager::ReceiveFreeRequest(std::shared_ptr &conn, - const uint8_t *message) { - auto free_request = - flatbuffers::GetRoot(message); - std::vector object_ids = from_flatbuf(*free_request->object_ids()); - // This RPC should come from another Object Manager. - // Keep this request local. - bool local_only = true; - FreeObjects(object_ids, local_only); - conn->ProcessMessages(); +void ObjectManager::HandlePullRequest(const rpc::PullRequest &request, + rpc::PullReply *reply, + rpc::RequestDoneCallback done_callback) { + ObjectID object_id = ObjectID::FromBinary(request.object_id()); + ClientID client_id = ClientID::FromBinary(request.client_id()); + RAY_LOG(DEBUG) << "Received pull request from client " << client_id << " for object [" + << object_id << "]."; + + rpc::ProfileTableData::ProfileEvent profile_event; + profile_event.set_event_type("receive_pull_request"); + profile_event.set_start_time(current_sys_time_seconds()); + profile_event.set_end_time(profile_event.start_time()); + profile_event.set_extra_data("[\"" + object_id.Hex() + "\",\"" + client_id.Hex() + + "\"]"); + { + std::lock_guard lock(profile_mutex_); + profile_events_.emplace_back(profile_event); + } + + main_service_->post([this, object_id, client_id]() { Push(object_id, client_id); }); + done_callback(Status::OK()); +} + +void ObjectManager::HandleFreeObjectsRequest(const rpc::FreeObjectsRequest &request, + rpc::FreeObjectsReply *reply, + 
rpc::RequestDoneCallback done_callback) { + std::vector object_ids; + for (const auto &e : request.object_ids()) { + object_ids.emplace_back(ObjectID::FromBinary(e)); + } + FreeObjects(object_ids, /* local_only */ true); + done_callback(Status::OK()); } void ObjectManager::FreeObjects(const std::vector &object_ids, bool local_only) { buffer_pool_.FreeObjects(object_ids); if (!local_only) { - SpreadFreeObjectRequest(object_ids); + const auto remote_connections = object_directory_->LookupAllRemoteConnections(); + std::vector> rpc_clients; + for (const auto &connection_info : remote_connections) { + auto rpc_client = GetRpcClient(connection_info.client_id); + if (rpc_client != nullptr) { + rpc_clients.push_back(rpc_client); + } + } + rpc_service_.post([this, object_ids, rpc_clients]() { + SpreadFreeObjectsRequest(object_ids, rpc_clients); + }); } } -void ObjectManager::SpreadFreeObjectRequest(const std::vector &object_ids) { +void ObjectManager::SpreadFreeObjectsRequest( + const std::vector &object_ids, + const std::vector> &rpc_clients) { // This code path should be called from node manager. 
- flatbuffers::FlatBufferBuilder fbb; - flatbuffers::Offset request = - object_manager_protocol::CreateFreeRequestMessage(fbb, to_flatbuf(fbb, object_ids)); - fbb.Finish(request); - - const auto remote_connections = object_directory_->LookupAllRemoteConnections(); - for (const auto &connection_info : remote_connections) { - std::shared_ptr conn; - connection_pool_.GetSender(ConnectionPool::ConnectionType::MESSAGE, - connection_info.client_id, &conn); - if (conn == nullptr) { - conn = CreateSenderConnection(ConnectionPool::ConnectionType::MESSAGE, - connection_info); - } - - if (conn != nullptr) { - conn->WriteMessageAsync( - static_cast(object_manager_protocol::MessageType::FreeRequest), - fbb.GetSize(), fbb.GetBufferPointer(), [this, conn](ray::Status status) { - if (!status.ok()) { - RAY_CHECK(status.IsIOError()) - << "Failed to contact remote object manager during Free"; - connection_pool_.RemoveSender(conn); - } - }); - connection_pool_.ReleaseSender(ConnectionPool::ConnectionType::MESSAGE, conn); - } + rpc::FreeObjectsRequest free_objects_request; + for (const auto &e : object_ids) { + free_objects_request.add_object_ids(e.Binary()); } + + for (auto &rpc_client : rpc_clients) { + rpc_client->FreeObjects(free_objects_request, [](const Status &status, + const rpc::FreeObjectsReply &reply) { + if (!status.ok()) { + RAY_LOG(WARNING) << "Send free objects request failed due to" << status.message(); + } + }); + } +} + +std::shared_ptr ObjectManager::GetRpcClient( + const ClientID &client_id) { + auto it = remote_object_manager_clients_.find(client_id); + if (it == remote_object_manager_clients_.end()) { + RemoteConnectionInfo connection_info(client_id); + object_directory_->LookupRemoteConnectionInfo(connection_info); + if (!connection_info.Connected()) { + return nullptr; + } + auto object_manager_client = std::make_shared( + connection_info.ip, connection_info.port, client_call_manager_); + + RAY_LOG(DEBUG) << "Get rpc client, address: " << connection_info.ip + << 
", port: " << connection_info.port + << ", local port: " << GetServerPort(); + + it = remote_object_manager_clients_ + .emplace(client_id, std::move(object_manager_client)) + .first; + } + return it->second; } rpc::ProfileTableData ObjectManager::GetAndResetProfilingInfo() { @@ -944,12 +809,14 @@ rpc::ProfileTableData ObjectManager::GetAndResetProfilingInfo() { profile_info.set_component_type("object_manager"); profile_info.set_component_id(client_id_.Binary()); - for (auto const &profile_event : profile_events_) { - profile_info.add_profile_events()->CopyFrom(profile_event); + { + std::lock_guard lock(profile_mutex_); + for (auto const &profile_event : profile_events_) { + profile_info.add_profile_events()->CopyFrom(profile_event); + } + profile_events_.clear(); } - profile_events_.clear(); - return profile_info; } diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index 6664dd0a9..f40fd43b1 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -25,6 +26,8 @@ #include "ray/object_manager/object_directory.h" #include "ray/object_manager/object_manager_client_connection.h" #include "ray/object_manager/object_store_notification_manager.h" +#include "ray/rpc/object_manager/object_manager_client.h" +#include "ray/rpc/object_manager/object_manager_server.h" namespace ray { @@ -36,10 +39,6 @@ struct ObjectManagerConfig { /// The time in milliseconds to wait before retrying a pull /// that fails due to client id lookup. uint pull_timeout_ms; - /// Maximum number of sends allowed. - int max_sends; - /// Maximum number of receives allowed. - int max_receives; /// Object chunk size, in bytes uint64_t object_chunk_size; /// The store socket name. @@ -49,6 +48,9 @@ struct ObjectManagerConfig { /// Negative: waiting infinitely. /// 0: giving up retrying immediately. 
int push_timeout_ms; + /// Number of threads of rpc service + /// Send and receive request in these threads + int rpc_service_threads_number; }; struct LocalObjectInfo { @@ -67,7 +69,81 @@ class ObjectManagerInterface { }; // TODO(hme): Add success/failure callbacks for push and pull. -class ObjectManager : public ObjectManagerInterface { +class ObjectManager : public ObjectManagerInterface, + public rpc::ObjectManagerServiceHandler { + public: + /// Implementation of object manager service + + /// Handle push request from remote object manager + /// + /// Push request will contain the object which is specified by pull request + /// the object will be transferred by a sequence of chunks. + /// + /// \param request Push request including the object chunk data + /// \param reply Reply to the sender + /// \param done_callback Callback of the request + void HandlePushRequest(const rpc::PushRequest &request, rpc::PushReply *reply, + rpc::RequestDoneCallback done_callback) override; + + /// Handle pull request from remote object manager + /// + /// \param request Pull request + /// \param reply Reply + /// \param done_callback Callback of request + void HandlePullRequest(const rpc::PullRequest &request, rpc::PullReply *reply, + rpc::RequestDoneCallback done_callback) override; + + /// Handle free objects request + /// + /// \param request Free objects request + /// \param reply Reply + /// \param done_callback + void HandleFreeObjectsRequest(const rpc::FreeObjectsRequest &request, + rpc::FreeObjectsReply *reply, + rpc::RequestDoneCallback done_callback) override; + + /// Send object to remote object manager + /// + /// Object will be transferred as a sequence of chunks, small object(defined in config) + /// contains only one chunk + /// \param push_id Unique push id to indicate this push request + /// \param object_id Object id + /// \param data_size Data size + /// \param metadata_size Metadata size + /// \param chunk_index Chunk index of this object chunk, start with 0 +
/// \param rpc_client Rpc client used to send message to remote object manager + ray::Status SendObjectChunk(const UniqueID &push_id, const ObjectID &object_id, + const ClientID &client_id, uint64_t data_size, + uint64_t metadata_size, uint64_t chunk_index, + std::shared_ptr rpc_client); + + /// Receive object chunk from remote object manager, small object may contain one chunk + /// + /// \param client_id Client id of remote object manager which sends this chunk + /// \param object_id Object id + /// \param data_size Data size + /// \param metadata_size Metadata size + /// \param chunk_index Chunk index + /// \param data Chunk data + ray::Status ReceiveObjectChunk(const ClientID &client_id, const ObjectID &object_id, + uint64_t data_size, uint64_t metadata_size, + uint64_t chunk_index, const std::string &data); + + /// Send pull request + /// + /// \param object_id Object id + /// \param client_id Remote server client id + void SendPullRequest(const ObjectID &object_id, const ClientID &client_id, + std::shared_ptr rpc_client); + + /// Get the rpc client according to the client ID + /// + /// \param client_id Remote client id, will send rpc request to it + std::shared_ptr GetRpcClient(const ClientID &client_id); + + /// Get the port of the object manager rpc server. + int GetServerPort() const { return object_manager_server_.GetPort(); } + public: /// Takes user-defined ObjectDirectoryInterface implementation. /// When this constructor is used, the ObjectManager assumes ownership of @@ -244,13 +320,14 @@ class ObjectManager : public ObjectManagerInterface { /// Spread the Free request to all objects managers. /// /// \param object_ids the The list of ObjectIDs to be deleted. - void SpreadFreeObjectRequest(const std::vector &object_ids); + void SpreadFreeObjectsRequest( + const std::vector &object_ids, + const std::vector> &rpc_clients); - /// Handle starting, running, and stopping asio io_service. 
- void StartIOService(); - void RunSendService(); - void RunReceiveService(); - void StopIOService(); + /// Handle starting, running, and stopping asio rpc_service. + void StartRpcService(); + void RunRpcService(); + void StopRpcService(); /// Handle an object being added to this node. This adds the object to the /// directory, pushes the object to other nodes if necessary, and cancels any @@ -274,9 +351,6 @@ class ObjectManager : public ObjectManagerInterface { void PullSendRequest(const ObjectID &object_id, std::shared_ptr &conn); - std::shared_ptr CreateSenderConnection( - ConnectionPool::ConnectionType type, RemoteConnectionInfo info); - /// This is used to notify the main thread that the sending of a chunk has /// completed. /// @@ -309,49 +383,15 @@ class ObjectManager : public ObjectManagerInterface { uint64_t chunk_index, double start_time_us, double end_time_us, ray::Status status); - /// Begin executing a send. - /// Executes on send_service_ thread pool. - ray::Status ExecuteSendObject(const UniqueID &push_id, const ClientID &client_id, - const ObjectID &object_id, uint64_t data_size, - uint64_t metadata_size, uint64_t chunk_index, - const RemoteConnectionInfo &connection_info); - - /// This method synchronously sends the object id and object size - /// to the remote object manager. - /// Executes on send_service_ thread pool. - ray::Status SendObjectHeaders(const UniqueID &push_id, const ObjectID &object_id, - uint64_t data_size, uint64_t metadata_size, - uint64_t chunk_index, - std::shared_ptr &conn); - - /// This method initiates the actual object transfer. - /// Executes on send_service_ thread pool. - ray::Status SendObjectData(const ObjectID &object_id, - const ObjectBufferPool::ChunkInfo &chunk_info, - std::shared_ptr &conn); - - /// Invoked when a remote object manager pushes an object to this object manager. - /// This will invoke the object receive on the receive_service_ thread pool. 
- void ReceivePushRequest(std::shared_ptr &conn, - const uint8_t *message); - /// Execute a receive on the receive_service_ thread pool. ray::Status ExecuteReceiveObject(const ClientID &client_id, const ObjectID &object_id, uint64_t data_size, uint64_t metadata_size, uint64_t chunk_index, TcpClientConnection &conn); - /// Handles receiving a pull request message. - void ReceivePullRequest(std::shared_ptr &conn, - const uint8_t *message); /// Handles freeing objects request. void ReceiveFreeRequest(std::shared_ptr &conn, const uint8_t *message); - /// Handles connect message of a new client connection. - void ConnectClient(std::shared_ptr &conn, const uint8_t *message); - /// Handles disconnect message of an existing client connection. - void DisconnectClient(std::shared_ptr &conn, - const uint8_t *message); /// Handle Push task timeout. void HandlePushTaskTimeout(const ObjectID &object_id, const ClientID &client_id); @@ -361,28 +401,19 @@ class ObjectManager : public ObjectManagerInterface { ObjectStoreNotificationManager store_notification_; ObjectBufferPool buffer_pool_; - /// This runs on a thread pool dedicated to sending objects. - boost::asio::io_service send_service_; - /// This runs on a thread pool dedicated to receiving objects. - boost::asio::io_service receive_service_; - /// Weak reference to main service. We ensure this object is destroyed before /// main_service_ is stopped. boost::asio::io_service *main_service_; - /// Used to create "work" for send_service_. - /// Without this, if send_service_ has no more sends to process, it will stop. - boost::asio::io_service::work send_work_; - /// Used to create "work" for receive_service_. - /// Without this, if receive_service_ has no more receives to process, it will stop. - boost::asio::io_service::work receive_work_; + /// Multi-thread asio service, deal with all outgoing and incoming RPC request. 
+ boost::asio::io_service rpc_service_; - /// Runs the send service, which handle - /// all outgoing object transfers. - std::vector send_threads_; - /// Runs the receive service, which handle - /// all incoming object transfers. - std::vector receive_threads_; + /// Keep rpc service running when no task in rpc service. + boost::asio::io_service::work rpc_work_; + + /// The thread pool used for running `rpc_service`. + /// Data copy operations during request are done in this thread pool. + std::vector rpc_threads_; /// Connection pool for reusing outgoing connections to remote object managers. ConnectionPool connection_pool_; @@ -414,8 +445,25 @@ class ObjectManager : public ObjectManagerInterface { /// table in the GCS. std::vector profile_events_; + /// mutex lock used to protect profile_events_, profile_events_ is used in main thread + /// and rpc thread. + std::mutex profile_mutex_; + /// Internally maintained random number generator. std::mt19937_64 gen_; + + /// The gRPC server. + rpc::GrpcServer object_manager_server_; + + /// The gRPC service. + rpc::ObjectManagerGrpcService object_manager_service_; + + /// The client call manager used to deal with reply. + rpc::ClientCallManager client_call_manager_; + + /// clientID - object manager gRPC client.
+ std::unordered_map> + remote_object_manager_clients_; }; } // namespace ray diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc index 2d5292842..3b142e55c 100644 --- a/src/ray/object_manager/test/object_manager_stress_test.cc +++ b/src/ray/object_manager/test/object_manager_stress_test.cc @@ -33,15 +33,11 @@ class MockServer { MockServer(boost::asio::io_service &main_service, const ObjectManagerConfig &object_manager_config, std::shared_ptr gcs_client) - : object_manager_acceptor_( - main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)), - object_manager_socket_(main_service), + : config_(object_manager_config), gcs_client_(gcs_client), object_manager_(main_service, object_manager_config, std::make_shared(main_service, gcs_client_)) { RAY_CHECK_OK(RegisterGcs(main_service)); - // Start listening for clients. - DoAcceptObjectManager(); } ~MockServer() { RAY_CHECK_OK(gcs_client_->client_table().Disconnect()); } @@ -50,45 +46,20 @@ class MockServer { ray::Status RegisterGcs(boost::asio::io_service &io_service) { RAY_RETURN_NOT_OK(gcs_client_->Attach(io_service)); - boost::asio::ip::tcp::endpoint endpoint = object_manager_acceptor_.local_endpoint(); - std::string ip = endpoint.address().to_string(); - unsigned short object_manager_port = endpoint.port(); - + auto object_manager_port = config_.object_manager_port; ClientTableData client_info = gcs_client_->client_table().GetLocalClient(); - client_info.set_node_manager_address(ip); + client_info.set_node_manager_address("127.0.0.1"); client_info.set_node_manager_port(object_manager_port); client_info.set_object_manager_port(object_manager_port); + ray::Status status = gcs_client_->client_table().Connect(client_info); object_manager_.RegisterGcs(); return status; } - void DoAcceptObjectManager() { - object_manager_acceptor_.async_accept( - object_manager_socket_, boost::bind(&MockServer::HandleAcceptObjectManager, 
this, - boost::asio::placeholders::error)); - } - - void HandleAcceptObjectManager(const boost::system::error_code &error) { - ClientHandler client_handler = - [this](TcpClientConnection &client) { object_manager_.ProcessNewClient(client); }; - MessageHandler message_handler = - [this](std::shared_ptr client, int64_t message_type, - const uint8_t *message) { - object_manager_.ProcessClientMessage(client, message_type, message); - }; - // Accept a new local client and dispatch it to the node manager. - auto new_connection = TcpClientConnection::Create( - client_handler, message_handler, std::move(object_manager_socket_), - "object manager", {}, - static_cast(object_manager::protocol::MessageType::DisconnectClient)); - DoAcceptObjectManager(); - } - friend class StressTestObjectManager; - boost::asio::ip::tcp::acceptor object_manager_acceptor_; - boost::asio::ip::tcp::socket object_manager_socket_; + ObjectManagerConfig config_; std::shared_ptr gcs_client_; ObjectManager object_manager_; }; @@ -127,10 +98,6 @@ class TestObjectManagerBase : public ::testing::Test { store_id_2 = StartStore(UniqueID::FromRandom().Hex()); uint pull_timeout_ms = 1000; - int max_sends_a = 2; - int max_receives_a = 2; - int max_sends_b = 3; - int max_receives_b = 3; uint64_t object_chunk_size = static_cast(std::pow(10, 3)); int push_timeout_ms = 10000; @@ -140,10 +107,10 @@ class TestObjectManagerBase : public ::testing::Test { ObjectManagerConfig om_config_1; om_config_1.store_socket_name = store_id_1; om_config_1.pull_timeout_ms = pull_timeout_ms; - om_config_1.max_sends = max_sends_a; - om_config_1.max_receives = max_receives_a; om_config_1.object_chunk_size = object_chunk_size; om_config_1.push_timeout_ms = push_timeout_ms; + om_config_1.object_manager_port = 12345; + om_config_1.rpc_service_threads_number = 3; server1.reset(new MockServer(main_service, om_config_1, gcs_client_1)); // start second server @@ -152,10 +119,10 @@ class TestObjectManagerBase : public ::testing::Test { 
ObjectManagerConfig om_config_2; om_config_2.store_socket_name = store_id_2; om_config_2.pull_timeout_ms = pull_timeout_ms; - om_config_2.max_sends = max_sends_b; - om_config_2.max_receives = max_receives_b; om_config_2.object_chunk_size = object_chunk_size; om_config_2.push_timeout_ms = push_timeout_ms; + om_config_2.object_manager_port = 23456; + om_config_2.rpc_service_threads_number = 3; server2.reset(new MockServer(main_service, om_config_2, gcs_client_2)); // connect to stores. diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc index 45b80a267..0e0af1ad2 100644 --- a/src/ray/object_manager/test/object_manager_test.cc +++ b/src/ray/object_manager/test/object_manager_test.cc @@ -27,15 +27,11 @@ class MockServer { MockServer(boost::asio::io_service &main_service, const ObjectManagerConfig &object_manager_config, std::shared_ptr gcs_client) - : object_manager_acceptor_( - main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)), - object_manager_socket_(main_service), + : config_(object_manager_config), gcs_client_(gcs_client), object_manager_(main_service, object_manager_config, std::make_shared(main_service, gcs_client_)) { RAY_CHECK_OK(RegisterGcs(main_service)); - // Start listening for clients. 
- DoAcceptObjectManager(); } ~MockServer() { RAY_CHECK_OK(gcs_client_->client_table().Disconnect()); } @@ -44,45 +40,20 @@ class MockServer { ray::Status RegisterGcs(boost::asio::io_service &io_service) { RAY_RETURN_NOT_OK(gcs_client_->Attach(io_service)); - boost::asio::ip::tcp::endpoint endpoint = object_manager_acceptor_.local_endpoint(); - std::string ip = endpoint.address().to_string(); - unsigned short object_manager_port = endpoint.port(); - + auto object_manager_port = config_.object_manager_port; ClientTableData client_info = gcs_client_->client_table().GetLocalClient(); - client_info.set_node_manager_address(ip); + client_info.set_node_manager_address("127.0.0.1"); client_info.set_node_manager_port(object_manager_port); client_info.set_object_manager_port(object_manager_port); + ray::Status status = gcs_client_->client_table().Connect(client_info); object_manager_.RegisterGcs(); return status; } - void DoAcceptObjectManager() { - object_manager_acceptor_.async_accept( - object_manager_socket_, boost::bind(&MockServer::HandleAcceptObjectManager, this, - boost::asio::placeholders::error)); - } - - void HandleAcceptObjectManager(const boost::system::error_code &error) { - ClientHandler client_handler = - [this](TcpClientConnection &client) { object_manager_.ProcessNewClient(client); }; - MessageHandler message_handler = - [this](std::shared_ptr client, int64_t message_type, - const uint8_t *message) { - object_manager_.ProcessClientMessage(client, message_type, message); - }; - // Accept a new local client and dispatch it to the node manager. 
- auto new_connection = TcpClientConnection::Create( - client_handler, message_handler, std::move(object_manager_socket_), - "object manager", {}, - static_cast(object_manager::protocol::MessageType::DisconnectClient)); - DoAcceptObjectManager(); - } - friend class TestObjectManager; - boost::asio::ip::tcp::acceptor object_manager_acceptor_; - boost::asio::ip::tcp::socket object_manager_socket_; + ObjectManagerConfig config_; std::shared_ptr gcs_client_; ObjectManager object_manager_; }; @@ -128,10 +99,10 @@ class TestObjectManagerBase : public ::testing::Test { ObjectManagerConfig om_config_1; om_config_1.store_socket_name = store_id_1; om_config_1.pull_timeout_ms = pull_timeout_ms; - om_config_1.max_sends = max_sends; - om_config_1.max_receives = max_receives; om_config_1.object_chunk_size = object_chunk_size; om_config_1.push_timeout_ms = push_timeout_ms; + om_config_1.object_manager_port = 12345; + om_config_1.rpc_service_threads_number = 3; server1.reset(new MockServer(main_service, om_config_1, gcs_client_1)); // start second server @@ -140,10 +111,10 @@ class TestObjectManagerBase : public ::testing::Test { ObjectManagerConfig om_config_2; om_config_2.store_socket_name = store_id_2; om_config_2.pull_timeout_ms = pull_timeout_ms; - om_config_2.max_sends = max_sends; - om_config_2.max_receives = max_receives; om_config_2.object_chunk_size = object_chunk_size; om_config_2.push_timeout_ms = push_timeout_ms; + om_config_2.object_manager_port = 23456; + om_config_2.rpc_service_threads_number = 3; server2.reset(new MockServer(main_service, om_config_2, gcs_client_2)); // connect to stores. 
@@ -201,8 +172,6 @@ class TestObjectManagerBase : public ::testing::Test { uint push_timeout_ms; - int max_sends = 2; - int max_receives = 2; uint64_t object_chunk_size = static_cast(std::pow(10, 3)); }; @@ -384,6 +353,7 @@ class TestObjectManager : public TestObjectManagerBase { num_objects += 1; object_ids.push_back(ObjectID::FromRandom()); } + boost::posix_time::ptime start_time = boost::posix_time::second_clock::local_time(); RAY_CHECK_OK(server1->object_manager_.Wait( object_ids, timeout_ms, required_objects, false, diff --git a/src/ray/protobuf/object_manager.proto b/src/ray/protobuf/object_manager.proto new file mode 100644 index 000000000..90dfea915 --- /dev/null +++ b/src/ray/protobuf/object_manager.proto @@ -0,0 +1,49 @@ +syntax = "proto3"; + +package ray.rpc; + +message PushRequest { + // The push ID to allow the receiver to differentiate different push attempts + // from the same sender. + bytes push_id = 1; + // The object ID being transferred. + bytes object_id = 2; + // The client ID of client sending this object + bytes client_id = 3; + // The index of the chunk being transferred. + uint32 chunk_index = 4; + // The data_size include object_size and metadata_size + uint64 data_size = 5; + // The metadata size. + uint64 metadata_size = 6; + // The chunk data + bytes data = 7; +} + +message PullRequest { + // ID of the requesting client. + bytes client_id = 1; + // Requested ObjectID. 
+ bytes object_id = 2; +} + +message FreeObjectsRequest { + repeated bytes object_ids = 1; +} + +// Reply for request +message PushReply { +} +message PullReply { +} +message FreeObjectsReply { +} + +service ObjectManagerService { + // Push service used to send object chunks + rpc Push(PushRequest) returns (PushReply); + // Try to pull object from remote object manager + rpc Pull(PullRequest) returns (PullReply); + // Tell remote object manager to free some objects + rpc FreeObjects(FreeObjectsRequest) returns (FreeObjectsReply); +} diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index eca282a53..5c0680e02 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -146,14 +146,13 @@ int main(int argc, char *argv[]) { RayConfig::instance().object_manager_push_timeout_ms(); int num_cpus = static_cast(static_resource_conf["CPU"]); - object_manager_config.max_sends = std::max(1, num_cpus / 4); - object_manager_config.max_receives = std::max(1, num_cpus / 4); + object_manager_config.rpc_service_threads_number = std::max(2, num_cpus / 2); object_manager_config.object_chunk_size = RayConfig::instance().object_manager_default_chunk_size(); RAY_LOG(DEBUG) << "Starting object manager with configuration: \n" - << "max_sends = " << object_manager_config.max_sends << "\n" - << "max_receives = " << object_manager_config.max_receives << "\n" + << "rpc_service_threads_number = " + << object_manager_config.rpc_service_threads_number << "object_chunk_size = " << object_manager_config.object_chunk_size; // Initialize the node manager. 
diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 464e0c917..bea7fb97b 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -5,8 +5,8 @@ // clang-format off #include "ray/rpc/client_call.h" -#include "ray/rpc/node_manager_server.h" -#include "ray/rpc/node_manager_client.h" +#include "ray/rpc/node_manager/node_manager_server.h" +#include "ray/rpc/node_manager/node_manager_client.h" #include "ray/raylet/task.h" #include "ray/object_manager/object_manager.h" #include "ray/common/client_connection.h" diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc index cbf9b2521..871dc5049 100644 --- a/src/ray/raylet/raylet.cc +++ b/src/ray/raylet/raylet.cc @@ -33,10 +33,6 @@ static const std::vector node_manager_message_enum = GenerateEnumNames(ray::protocol::EnumNamesMessageType(), static_cast(ray::protocol::MessageType::MIN), static_cast(ray::protocol::MessageType::MAX)); -static const std::vector object_manager_message_enum = - GenerateEnumNames(ray::object_manager::protocol::EnumNamesMessageType(), - static_cast(ray::object_manager::protocol::MessageType::MIN), - static_cast(ray::object_manager::protocol::MessageType::MAX)); } // namespace namespace ray { @@ -56,15 +52,9 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_ object_directory_), socket_name_(socket_name), acceptor_(main_service, boost::asio::local::stream_protocol::endpoint(socket_name)), - socket_(main_service), - object_manager_acceptor_( - main_service, - boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), - object_manager_config.object_manager_port)), - object_manager_socket_(main_service) { + socket_(main_service) { // Start listening for clients. 
DoAccept(); - DoAcceptObjectManager(); RAY_CHECK_OK(RegisterGcs( node_ip_address, socket_name_, object_manager_config.store_socket_name, @@ -94,7 +84,7 @@ ray::Status Raylet::RegisterGcs(const std::string &node_ip_address, client_info.set_node_manager_address(node_ip_address); client_info.set_raylet_socket_name(raylet_socket_name); client_info.set_object_store_socket_name(object_store_socket_name); - client_info.set_object_manager_port(object_manager_acceptor_.local_endpoint().port()); + client_info.set_object_manager_port(object_manager_.GetServerPort()); client_info.set_node_manager_port(node_manager_.GetServerPort()); // Add resource information. for (const auto &resource_pair : node_manager_config.resource_config.GetResourceMap()) { @@ -115,28 +105,6 @@ ray::Status Raylet::RegisterGcs(const std::string &node_ip_address, return Status::OK(); } -void Raylet::DoAcceptObjectManager() { - object_manager_acceptor_.async_accept( - object_manager_socket_, boost::bind(&Raylet::HandleAcceptObjectManager, this, - boost::asio::placeholders::error)); -} - -void Raylet::HandleAcceptObjectManager(const boost::system::error_code &error) { - ClientHandler client_handler = - [this](TcpClientConnection &client) { object_manager_.ProcessNewClient(client); }; - MessageHandler message_handler = - [this](std::shared_ptr client, int64_t message_type, - const uint8_t *message) { - object_manager_.ProcessClientMessage(client, message_type, message); - }; - // Accept a new TCP client and dispatch it to the node manager. 
- auto new_connection = TcpClientConnection::Create( - client_handler, message_handler, std::move(object_manager_socket_), - "object manager", object_manager_message_enum, - static_cast(object_manager::protocol::MessageType::DisconnectClient)); - DoAcceptObjectManager(); -} - void Raylet::DoAccept() { acceptor_.async_accept(socket_, boost::bind(&Raylet::HandleAccept, this, boost::asio::placeholders::error)); diff --git a/src/ray/raylet/raylet.h b/src/ray/raylet/raylet.h index 9367a5054..24826c6e6 100644 --- a/src/ray/raylet/raylet.h +++ b/src/ray/raylet/raylet.h @@ -61,10 +61,6 @@ class Raylet { void DoAccept(); /// Handle an accepted client connection. void HandleAccept(const boost::system::error_code &error); - /// Accept a tcp client connection. - void DoAcceptObjectManager(); - /// Handle an accepted tcp client connection. - void HandleAcceptObjectManager(const boost::system::error_code &error); friend class TestObjectManagerIntegration; @@ -84,10 +80,6 @@ class Raylet { boost::asio::local::stream_protocol::acceptor acceptor_; /// The socket to listen on for new clients. boost::asio::local::stream_protocol::socket socket_; - /// An acceptor for new object manager tcp clients. - boost::asio::ip::tcp::acceptor object_manager_acceptor_; - /// The socket to listen on for new object manager tcp clients. - boost::asio::ip::tcp::socket object_manager_socket_; }; } // namespace raylet diff --git a/src/ray/rpc/client_call.h b/src/ray/rpc/client_call.h index 8bb175ed4..a134c05a6 100644 --- a/src/ray/rpc/client_call.h +++ b/src/ray/rpc/client_call.h @@ -49,7 +49,11 @@ using ClientCallback = std::function class ClientCallImpl : public ClientCall { public: - void OnReplyReceived() override { callback_(GrpcStatusToRayStatus(status_), reply_); } + void OnReplyReceived() override { + if (callback_ != nullptr) { + callback_(GrpcStatusToRayStatus(status_), reply_); + } + } private: /// Constructor. 
diff --git a/src/ray/rpc/grpc_server.cc b/src/ray/rpc/grpc_server.cc index 3b1a13ec7..08d064304 100644 --- a/src/ray/rpc/grpc_server.cc +++ b/src/ray/rpc/grpc_server.cc @@ -1,4 +1,5 @@ -#include "ray/rpc/grpc_server.h" + +#include "src/ray/rpc/grpc_server.h" #include namespace ray { @@ -11,6 +12,9 @@ void GrpcServer::Run() { // TODO(hchen): Add options for authentication. builder.AddListeningPort(server_address, grpc::InsecureServerCredentials(), &port_); // Register all the services to this server. + if (services_.size() == 0) { + RAY_LOG(WARNING) << "No service is found when start grpc server."; + } for (auto &entry : services_) { builder.RegisterService(&entry.get()); } @@ -19,7 +23,7 @@ void GrpcServer::Run() { cq_ = builder.AddCompletionQueue(); // Build and start server. server_ = builder.BuildAndStart(); - RAY_LOG(DEBUG) << name_ << " server started, listening on port " << port_ << "."; + RAY_LOG(INFO) << name_ << " server started, listening on port " << port_ << "."; // Create calls for all the server call factories. for (auto &entry : server_call_factories_and_concurrencies_) { @@ -29,8 +33,7 @@ void GrpcServer::Run() { } } // Start a thread that polls incoming requests. - std::thread polling_thread(&GrpcServer::PollEventsFromCompletionQueue, this); - polling_thread.detach(); + polling_thread_ = std::thread(&GrpcServer::PollEventsFromCompletionQueue, this); } void GrpcServer::RegisterService(GrpcService &service) { @@ -59,7 +62,7 @@ void GrpcServer::PollEventsFromCompletionQueue() { break; case ServerCallState::SENDING_REPLY: // The reply has been sent, this call can be deleted now. - // This event is triggered by `ServerCallImpl::SendReply`. + // This event is triggered by `ServerCallImpl::Finish`. 
delete_call = true; break; default: diff --git a/src/ray/rpc/grpc_server.h b/src/ray/rpc/grpc_server.h index 584da6565..01db49421 100644 --- a/src/ray/rpc/grpc_server.h +++ b/src/ray/rpc/grpc_server.h @@ -30,17 +30,28 @@ class GrpcServer { /// \param[in] name Name of this server, used for logging and debugging purpose. /// \param[in] port The port to bind this server to. If it's 0, a random available port /// will be chosen. - GrpcServer(const std::string &name, const uint32_t port) : name_(name), port_(port) {} + /// \param[in] main_service The main event loop, to which service handler functions + /// will be posted. + GrpcServer(const std::string &name, const uint32_t port) + : name_(name), port_(port), is_closed_(false) {} /// Destruct this gRPC server. - ~GrpcServer() { - server_->Shutdown(); - cq_->Shutdown(); - } + ~GrpcServer() { Shutdown(); } /// Initialize and run this server. void Run(); + // Shutdown this server + void Shutdown() { + if (!is_closed_) { + server_->Shutdown(); + cq_->Shutdown(); + polling_thread_.join(); + is_closed_ = true; + RAY_LOG(DEBUG) << "gRPC server of " << name_ << " shutdown."; + } + } + /// Get the port of this gRPC server. int GetPort() const { return port_; } @@ -71,6 +82,10 @@ class GrpcServer { std::unique_ptr cq_; /// The `Server` object. std::unique_ptr server_; + /// The polling thread used to check the completion queue + std::thread polling_thread_; + /// Flag indicates whether this server has closed + bool is_closed_; }; /// Base class that represents an abstract gRPC service. 
diff --git a/src/ray/rpc/node_manager_client.h b/src/ray/rpc/node_manager/node_manager_client.h similarity index 100% rename from src/ray/rpc/node_manager_client.h rename to src/ray/rpc/node_manager/node_manager_client.h diff --git a/src/ray/rpc/node_manager_server.h b/src/ray/rpc/node_manager/node_manager_server.h similarity index 97% rename from src/ray/rpc/node_manager_server.h rename to src/ray/rpc/node_manager/node_manager_server.h index d05f268c6..4505d1017 100644 --- a/src/ray/rpc/node_manager_server.h +++ b/src/ray/rpc/node_manager/node_manager_server.h @@ -14,8 +14,8 @@ namespace rpc { class NodeManagerServiceHandler { public: /// Handle a `ForwardTask` request. - /// The implementation can handle this request asynchronously. When hanling is done, the - /// `done_callback` should be called. + /// The implementation can handle this request asynchronously. When handling is done, + /// the `done_callback` should be called. /// /// \param[in] request The request message. /// \param[out] reply The reply message. diff --git a/src/ray/rpc/object_manager/object_manager_client.h b/src/ray/rpc/object_manager/object_manager_client.h new file mode 100644 index 000000000..f37a081e6 --- /dev/null +++ b/src/ray/rpc/object_manager/object_manager_client.h @@ -0,0 +1,74 @@ +#ifndef RAY_RPC_OBJECT_MANAGER_CLIENT_H +#define RAY_RPC_OBJECT_MANAGER_CLIENT_H + +#include + +#include + +#include "ray/common/status.h" +#include "ray/util/logging.h" +#include "src/ray/protobuf/object_manager.grpc.pb.h" +#include "src/ray/protobuf/object_manager.pb.h" +#include "src/ray/rpc/client_call.h" + +namespace ray { +namespace rpc { + +/// Client used for communicating with a remote object manager server. +class ObjectManagerClient { + public: + /// Constructor. + /// + /// \param[in] address Address of the object manager server. + /// \param[in] port Port of the object manager server. + /// \param[in] client_call_manager The `ClientCallManager` used for managing requests.
+ ObjectManagerClient(const std::string &address, const int port, + ClientCallManager &client_call_manager) + : client_call_manager_(client_call_manager) { + std::shared_ptr channel = grpc::CreateChannel( + address + ":" + std::to_string(port), grpc::InsecureChannelCredentials()); + stub_ = ObjectManagerService::NewStub(channel); + }; + + /// Push object to remote object manager + /// + /// \param request The request message. + /// \param callback The callback function that handles reply from server + void Push(const PushRequest &request, const ClientCallback &callback) { + client_call_manager_.CreateCall( + *stub_, &ObjectManagerService::Stub::PrepareAsyncPush, request, callback); + } + + /// Pull object from remote object manager + /// + /// \param request The request message + /// \param callback The callback function that handles reply from server + void Pull(const PullRequest &request, const ClientCallback &callback) { + client_call_manager_.CreateCall( + *stub_, &ObjectManagerService::Stub::PrepareAsyncPull, request, callback); + } + + /// Tell remote object manager to free objects + /// + /// \param request The request message + /// \param callback The callback function that handles reply + void FreeObjects(const FreeObjectsRequest &request, + const ClientCallback &callback) { + client_call_manager_ + .CreateCall( + *stub_, &ObjectManagerService::Stub::PrepareAsyncFreeObjects, request, + callback); + } + + private: + /// The gRPC-generated stub. + std::unique_ptr stub_; + + /// The `ClientCallManager` used for managing requests. 
+ ClientCallManager &client_call_manager_; +}; + +} // namespace rpc +} // namespace ray + +#endif // RAY_RPC_OBJECT_MANAGER_CLIENT_H diff --git a/src/ray/rpc/object_manager/object_manager_server.h b/src/ray/rpc/object_manager/object_manager_server.h new file mode 100644 index 000000000..dbabc11a9 --- /dev/null +++ b/src/ray/rpc/object_manager/object_manager_server.h @@ -0,0 +1,92 @@ +#ifndef RAY_RPC_OBJECT_MANAGER_SERVER_H +#define RAY_RPC_OBJECT_MANAGER_SERVER_H + +#include "src/ray/rpc/grpc_server.h" +#include "src/ray/rpc/server_call.h" + +#include "src/ray/protobuf/object_manager.grpc.pb.h" +#include "src/ray/protobuf/object_manager.pb.h" + +namespace ray { +namespace rpc { + +/// Handler interface for `ObjectManagerGrpcService` requests, check interface in +/// `src/ray/protobuf/object_manager.proto`. +class ObjectManagerServiceHandler { + public: + /// Handle a `Push` request. + /// The implementation can handle this request asynchronously. When handling is done, + /// the `done_callback` should be called. + /// + /// \param[in] request The request message. + /// \param[out] reply The reply message. + /// \param[in] done_callback The callback to be called when the request is done. + virtual void HandlePushRequest(const PushRequest &request, PushReply *reply, + RequestDoneCallback done_callback) = 0; + /// Handle a `Pull` request. Parameters mirror `HandlePushRequest`. + virtual void HandlePullRequest(const PullRequest &request, PullReply *reply, + RequestDoneCallback done_callback) = 0; + /// Handle a `FreeObjects` request. Parameters mirror `HandlePushRequest`. + virtual void HandleFreeObjectsRequest(const FreeObjectsRequest &request, + FreeObjectsReply *reply, + RequestDoneCallback done_callback) = 0; +}; + +/// The `GrpcService` for `ObjectManagerService`. +class ObjectManagerGrpcService : public GrpcService { + public: + /// Construct an `ObjectManagerGrpcService`. + /// + /// \param[in] io_service See `GrpcService`. + /// \param[in] service_handler The service handler that actually handles the requests. 
+ ObjectManagerGrpcService(boost::asio::io_service &io_service, + ObjectManagerServiceHandler &service_handler) + : GrpcService(io_service), service_handler_(service_handler){}; + + protected: + grpc::Service &GetGrpcService() override { return service_; } + + void InitServerCallFactories( + const std::unique_ptr &cq, + std::vector, int>> + *server_call_factories_and_concurrencies) override { + // Initialize the factory for `Push` requests. + std::unique_ptr push_call_factory( + new ServerCallFactoryImpl( + service_, &ObjectManagerService::AsyncService::RequestPush, service_handler_, + &ObjectManagerServiceHandler::HandlePushRequest, cq, main_service_)); + server_call_factories_and_concurrencies->emplace_back(std::move(push_call_factory), + 50); + + // Initialize the factory for `Pull` requests. + std::unique_ptr pull_call_factory( + new ServerCallFactoryImpl( + service_, &ObjectManagerService::AsyncService::RequestPull, service_handler_, + &ObjectManagerServiceHandler::HandlePullRequest, cq, main_service_)); + server_call_factories_and_concurrencies->emplace_back(std::move(pull_call_factory), + 50); + + // Initialize the factory for `FreeObjects` requests. + std::unique_ptr free_objects_call_factory( + new ServerCallFactoryImpl( + service_, &ObjectManagerService::AsyncService::RequestFreeObjects, + service_handler_, &ObjectManagerServiceHandler::HandleFreeObjectsRequest, cq, + main_service_)); + server_call_factories_and_concurrencies->emplace_back( + std::move(free_objects_call_factory), 2); + } + + private: + /// The gRPC async service object. + ObjectManagerService::AsyncService service_; + /// The service handler that actually handles the requests. 
+ ObjectManagerServiceHandler &service_handler_; +}; + +} // namespace rpc +} // namespace ray + +#endif // RAY_RPC_OBJECT_MANAGER_SERVER_H diff --git a/src/ray/rpc/server_call.h b/src/ray/rpc/server_call.h index f091ebbed..263250e73 100644 --- a/src/ray/rpc/server_call.h +++ b/src/ray/rpc/server_call.h @@ -59,6 +59,10 @@ class ServerCall { /// Get the factory that created this `ServerCall`. virtual const ServerCallFactory &GetFactory() const = 0; + /// Finish the `ServerCall`. + virtual void Finish(Status status) = 0; + + /// Virtual destructor to make sure subclasses are destructed properly. virtual ~ServerCall() = default; }; @@ -113,7 +117,14 @@ class ServerCallImpl : public ServerCall { void SetState(const ServerCallState &new_state) override { state_ = new_state; } void HandleRequest() override { - io_service_.post([this] { HandleRequestImpl(); }); + if (!io_service_.stopped()) { + io_service_.post([this] { HandleRequestImpl(); }); + } else { + // The io_service handling rpc calls has stopped, so we must finish the call here + // to send the reply and remove it from the cq. + RAY_LOG(DEBUG) << "Handle service has been closed."; + Finish(Status::Invalid("HandleServiceClosed")); + } } void HandleRequestImpl() { @@ -123,19 +134,19 @@ class ServerCallImpl : public ServerCall { // When the handler is done with the // request, tell gRPC to finish this // request. - SendReply(status); + Finish(status); }); } const ServerCallFactory &GetFactory() const override { return factory_; } - private: /// Tell gRPC to finish this request. - void SendReply(Status status) { + void Finish(Status status) override { state_ = ServerCallState::SENDING_REPLY; response_writer_.Finish(reply_, RayStatusToGrpcStatus(status), this); } + private: /// State of this call. ServerCallState state_;