From ee2da0cf45ac70225d9196b4fad7f63a45dcfa55 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 9 Nov 2020 18:01:47 -0800 Subject: [PATCH] [Core] PushManager for reliable broadcast (#11869) --- BUILD.bazel | 12 ++ python/ray/tests/test_object_manager.py | 5 +- src/ray/common/ray_config_def.h | 7 +- src/ray/object_manager/object_manager.cc | 112 +++++++++++++----- src/ray/object_manager/object_manager.h | 86 +++++++++++++- .../object_manager/test/push_manager_test.cc | 74 ++++++++++++ src/ray/raylet/main.cc | 2 + 7 files changed, 259 insertions(+), 39 deletions(-) create mode 100644 src/ray/object_manager/test/push_manager_test.cc diff --git a/BUILD.bazel b/BUILD.bazel index 6a224cafe..e55b44dcf 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -807,6 +807,18 @@ cc_test( ], ) +cc_test( + name = "push_manager_test", + srcs = [ + "src/ray/object_manager/test/push_manager_test.cc", + ], + copts = COPTS, + deps = [ + ":raylet_lib", + "@com_google_googletest//:gtest_main", + ], +) + cc_test( name = "reconstruction_policy_test", srcs = ["src/ray/raylet/reconstruction_policy_test.cc"], diff --git a/python/ray/tests/test_object_manager.py b/python/ray/tests/test_object_manager.py index e4b3de543..b7252519b 100644 --- a/python/ray/tests/test_object_manager.py +++ b/python/ray/tests/test_object_manager.py @@ -157,9 +157,8 @@ def test_actor_broadcast(ray_start_cluster_with_resource): # Make sure that each object was transferred a reasonable number of times. 
for x_id in object_refs: relevant_events = [ - event for event in transfer_events - if event["cat"] == "transfer_send" - and event["args"][0] == x_id.hex() and event["args"][2] == 1 + event for event in transfer_events if + event["cat"] == "transfer_send" and event["args"][0] == x_id.hex() ] # NOTE: Each event currently appears twice because we duplicate the diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index a2fd66228..7105f51da 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -193,7 +193,12 @@ RAY_CONFIG(int, object_manager_repeated_push_delay_ms, 60000) /// In the object manager, no single thread is permitted to transfer more /// data than what is specified by the chunk size unless the number of object /// chunks exceeds the number of available sending threads. -RAY_CONFIG(uint64_t, object_manager_default_chunk_size, 1000000) +/// NOTE(ekl): this has been raised to lower broadcast overheads. +RAY_CONFIG(uint64_t, object_manager_default_chunk_size, 5 * 1024 * 1024) + +/// The maximum number of outbound bytes to allow to be outstanding. This avoids +/// excessive memory usage during object broadcast to many receivers. 
+RAY_CONFIG(uint64_t, object_manager_max_bytes_in_flight, 2ULL * 1024 * 1024 * 1024) /// Number of workers per Python worker process RAY_CONFIG(int, num_workers_per_process_python, 1) diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 209eb4106..64d8ca94f 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -70,6 +70,10 @@ ObjectManager::ObjectManager(asio::io_service &main_service, const NodeID &self_ RAY_CHECK(config_.rpc_service_threads_number > 0); main_service_ = &main_service; + push_manager_.reset(new PushManager(/* max_chunks_in_flight= */ std::max( + static_cast<int64_t>(1L), + static_cast<int64_t>(config_.max_bytes_in_flight / config_.object_chunk_size)))); + if (plasma::plasma_store_runner) { store_notification_ = std::make_shared(main_service); plasma::plasma_store_runner->SetNotificationListener(store_notification_); @@ -400,6 +404,56 @@ void ObjectManager::HandleReceiveFinished(const ObjectID &object_id, profile_events_.push_back(profile_event); } +void PushManager::StartPush(const UniqueID &push_id, int64_t num_chunks, + std::function<void(int64_t)> send_chunk_fn) { + RAY_LOG(DEBUG) << "Start push for " << push_id << ", num chunks " << num_chunks; + RAY_CHECK(num_chunks > 0); + push_info_[push_id] = std::make_pair(num_chunks, send_chunk_fn); + next_chunk_id_[push_id] = 0; + chunks_remaining_ += num_chunks; + ScheduleRemainingPushes(); + RAY_CHECK(push_info_.size() == next_chunk_id_.size()); +} + +void PushManager::OnChunkComplete() { + chunks_in_flight_ -= 1; + ScheduleRemainingPushes(); +} + +void PushManager::ScheduleRemainingPushes() { + // Loop over all active pushes for approximate round-robin prioritization. + // TODO(ekl) this isn't the best implementation of round robin, we should + // consider tracking the number of chunks active per-push and balancing those.
+ std::vector<std::pair<std::function<void(int64_t)>, int64_t>> pending_sends; + while (chunks_in_flight_ < max_chunks_in_flight_ && push_info_.size() > 0) { + // Loop over each active push and try to send another chunk. + auto it = push_info_.begin(); + while (it != push_info_.end() && chunks_in_flight_ < max_chunks_in_flight_) { + auto push_id = it->first; + auto max_chunks = it->second.first; + const int64_t chunk_id = next_chunk_id_[push_id]; + // Queue the send; it runs after bookkeeping so sync completions re-enter safely. + pending_sends.emplace_back(it->second.second, chunk_id); + chunks_in_flight_ += 1; + chunks_remaining_ -= 1; + RAY_LOG(DEBUG) << "Sending chunk " << chunk_id << " of " + << max_chunks << " for push " << push_id << ", chunks in flight " + << NumChunksInFlight() << " / " << max_chunks_in_flight_ + << " max, remaining chunks: " << NumChunksRemaining(); + // It is the last chunk and we don't need to track it any more. + if (++next_chunk_id_[push_id] >= max_chunks) { + next_chunk_id_.erase(push_id); + push_info_.erase(it++); + RAY_LOG(DEBUG) << "Push for " << push_id + << " completed, remaining: " << NumPushesInFlight(); + } else { + it++; + } + } + } + for (auto &send : pending_sends) send.first(send.second); +} + void ObjectManager::Push(const ObjectID &object_id, const NodeID &client_id) { RAY_LOG(DEBUG) << "Push on " << self_node_id_ << " to " << client_id << " of object " << object_id; @@ -478,17 +532,11 @@ void ObjectManager::Push(const ObjectID &object_id, const NodeID &client_id) { << ", total data size: " << data_size; UniqueID push_id = UniqueID::FromRandom(); - for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) { - rpc_service_.post([this, push_id, object_id, owner_address, client_id, data_size, - metadata_size, chunk_index, rpc_client]() { - auto st = SendObjectChunk(push_id, object_id, owner_address, client_id, data_size, - metadata_size, chunk_index, rpc_client); - if (!st.ok()) { - RAY_LOG(WARNING) << "Send object " << object_id << " chunk failed due to " - << st.message() << ", chunk index " << chunk_index; - } - }); - } + push_manager_->StartPush(push_id, num_chunks, [=](int64_t chunk_id) {
SendObjectChunk(push_id, object_id, owner_address, client_id, data_size, + metadata_size, chunk_id, rpc_client, + [=](const Status &status) { push_manager_->OnChunkComplete(); }); + }); } else { // Push is best effort, so do nothing here. RAY_LOG(ERROR) @@ -496,10 +544,12 @@ void ObjectManager::Push(const ObjectID &object_id, const NodeID &client_id) { } } -ray::Status ObjectManager::SendObjectChunk( - const UniqueID &push_id, const ObjectID &object_id, const rpc::Address &owner_address, - const NodeID &client_id, uint64_t data_size, uint64_t metadata_size, - uint64_t chunk_index, std::shared_ptr rpc_client) { +void ObjectManager::SendObjectChunk(const UniqueID &push_id, const ObjectID &object_id, + const rpc::Address &owner_address, + const NodeID &client_id, uint64_t data_size, + uint64_t metadata_size, uint64_t chunk_index, + std::shared_ptr rpc_client, + std::function on_complete) { double start_time = absl::GetCurrentTimeNanos() / 1e9; rpc::PushRequest push_request; // Set request header @@ -522,30 +572,31 @@ ray::Status ObjectManager::SendObjectChunk( if (!chunk_status.second.ok()) { RAY_LOG(WARNING) << "Attempting to push object " << object_id << " which is not local. It may have been evicted."; - RAY_RETURN_NOT_OK(status); + on_complete(status); + return; } push_request.set_data(chunk_info.data, chunk_info.buffer_length); // record the time cost between send chunk and receive reply - rpc::ClientCallback callback = [this, start_time, object_id, client_id, - chunk_index]( - const Status &status, - const rpc::PushReply &reply) { - // TODO: Just print warning here, should we try to resend this chunk? 
- if (!status.ok()) { - RAY_LOG(WARNING) << "Send object " << object_id << " chunk to client " << client_id - << " failed due to" << status.message() - << ", chunk index: " << chunk_index; - } - double end_time = absl::GetCurrentTimeNanos() / 1e9; - HandleSendFinished(object_id, client_id, chunk_index, start_time, end_time, status); - }; + rpc::ClientCallback callback = + [this, start_time, object_id, client_id, chunk_index, owner_address, rpc_client, + on_complete](const Status &status, const rpc::PushReply &reply) { + // TODO: Just print warning here, should we try to resend this chunk? + if (!status.ok()) { + RAY_LOG(WARNING) << "Send object " << object_id << " chunk to client " + << client_id << " failed due to" << status.message() + << ", chunk index: " << chunk_index; + } + double end_time = absl::GetCurrentTimeNanos() / 1e9; + HandleSendFinished(object_id, client_id, chunk_index, start_time, end_time, + status); + on_complete(status); + }; rpc_client->Push(push_request, callback); // Do this regardless of whether it failed or succeeded. 
buffer_pool_.ReleaseGetChunk(object_id, chunk_info.chunk_index); - return Status::OK(); } void ObjectManager::CancelPull(const ObjectID &object_id) { @@ -920,6 +971,7 @@ std::string ObjectManager::DebugString() const { result << "\n- num unfulfilled push requests: " << unfulfilled_push_requests_.size(); result << "\n- num pull requests: " << pull_requests_.size(); result << "\n- num buffered profile events: " << profile_events_.size(); + result << "\n" << push_manager_->DebugString(); result << "\n" << object_directory_->DebugString(); result << "\n" << store_notification_->DebugString(); result << "\n" << buffer_pool_.DebugString(); diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index e716c6624..cba152b58 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -53,6 +53,8 @@ struct ObjectManagerConfig { unsigned int pull_timeout_ms; /// Object chunk size, in bytes uint64_t object_chunk_size; + /// Max object push bytes in flight. + uint64_t max_bytes_in_flight; /// The store socket name. std::string store_socket_name; /// The time in milliseconds to wait until a Push request @@ -97,6 +99,75 @@ class ObjectManagerInterface { virtual ~ObjectManagerInterface(){}; }; +class PushManager { + public: + /// Manages rate limiting of outbound object pushes. + + /// Create a push manager. + /// + /// \param max_chunks_in_flight Max number of chunks allowed to be in flight + /// from this PushManager (this raylet). + explicit PushManager(int64_t max_chunks_in_flight) + : max_chunks_in_flight_(max_chunks_in_flight) { + RAY_CHECK(max_chunks_in_flight_ > 0) << max_chunks_in_flight_; + }; + + /// Start pushing an object subject to max chunks in flight limit. + /// + /// \param push_id Unique identifier for this push. + /// \param num_chunks The total number of chunks to send. + /// \param send_chunk_fn This function will be called with args 0...{num_chunks-1}.
+ /// The caller promises to call PushManager::OnChunkComplete() + /// once a call to send_chunk_fn finishes. + void StartPush(const UniqueID &push_id, int64_t num_chunks, + std::function<void(int64_t)> send_chunk_fn); + + /// Called every time a chunk completes to trigger additional sends. + /// TODO(ekl) maybe we should cancel the entire push on error. + void OnChunkComplete(); + + /// Return the number of chunks currently in flight. For testing only. + int64_t NumChunksInFlight() const { return chunks_in_flight_; }; + + /// Return the number of chunks remaining. For testing only. + int64_t NumChunksRemaining() const { return chunks_remaining_; }; + + /// Return the number of pushes currently in flight. For testing only. + int64_t NumPushesInFlight() const { return push_info_.size(); }; + + std::string DebugString() const { + std::stringstream result; + result << "PushManager:"; + result << "\n- num pushes in flight: " << NumPushesInFlight(); + result << "\n- num chunks in flight: " << NumChunksInFlight(); + result << "\n- num chunks remaining: " << NumChunksRemaining(); + result << "\n- max chunks allowed: " << max_chunks_in_flight_; + return result.str(); + } + + private: + /// Called on completion events to trigger additional pushes. + void ScheduleRemainingPushes(); + + /// Info about the pushed object: (num_chunks total, chunk_send_fn). + typedef std::pair<int64_t, std::function<void(int64_t)>> PushInfo; + + /// Max number of chunks in flight allowed. + const int64_t max_chunks_in_flight_; + + /// Running count of chunks remaining to send. + int64_t chunks_remaining_ = 0; + + /// Running count of chunks in flight, used to limit progress of push_info_. + int64_t chunks_in_flight_ = 0; + + /// Tracks all pushes that still have chunks left to send. + absl::flat_hash_map<UniqueID, PushInfo> push_info_; + + /// Next chunk index to send for each push in push_info_. + absl::flat_hash_map<UniqueID, int64_t> next_chunk_id_; +}; + // TODO(hme): Add success/failure callbacks for push and pull.
class ObjectManager : public ObjectManagerInterface, public rpc::ObjectManagerServiceHandler { @@ -141,15 +212,17 @@ class ObjectManager : public ObjectManagerInterface, /// \param push_id Unique push id to indicate this push request /// \param object_id Object id /// \param owner_address The address of the object's owner + /// \param client_id The id of the receiver. /// \param data_size Data size /// \param metadata_size Metadata size /// \param chunk_index Chunk index of this object chunk, start with 0 /// \param rpc_client Rpc client used to send message to remote object manager - ray::Status SendObjectChunk(const UniqueID &push_id, const ObjectID &object_id, - const rpc::Address &owner_address, const NodeID &client_id, - uint64_t data_size, uint64_t metadata_size, - uint64_t chunk_index, - std::shared_ptr rpc_client); + /// \param on_complete Callback to run on completion. + void SendObjectChunk(const UniqueID &push_id, const ObjectID &object_id, + const rpc::Address &owner_address, const NodeID &client_id, + uint64_t data_size, uint64_t metadata_size, uint64_t chunk_index, + std::shared_ptr rpc_client, + std::function on_complete); /// Receive object chunk from remote object manager, small object may contain one chunk /// @@ -474,6 +547,9 @@ class ObjectManager : public ObjectManagerInterface, const RestoreSpilledObjectCallback restore_spilled_object_; + /// Object push manager. + std::unique_ptr push_manager_; + /// Running sum of the amount of memory used in the object store. int64_t used_memory_ = 0; }; diff --git a/src/ray/object_manager/test/push_manager_test.cc b/src/ray/object_manager/test/push_manager_test.cc new file mode 100644 index 000000000..82ae167ba --- /dev/null +++ b/src/ray/object_manager/test/push_manager_test.cc @@ -0,0 +1,74 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/object_manager/object_manager.h" + +#include "gtest/gtest.h" +#include "ray/common/test_util.h" + +namespace ray { + +TEST(TestPushManager, TestSingleTransfer) { + std::vector<int> results; + results.resize(10);  // Sized, not just reserved: the callback writes by index. + UniqueID push_id = UniqueID::FromRandom(); + PushManager pm(5); + pm.StartPush(push_id, 10, [&](int64_t chunk_id) { results[chunk_id] = 1; }); + ASSERT_EQ(pm.NumChunksInFlight(), 5); + ASSERT_EQ(pm.NumChunksRemaining(), 5); + ASSERT_EQ(pm.NumPushesInFlight(), 1); + for (int i = 0; i < 10; i++) { + pm.OnChunkComplete(); + } + ASSERT_EQ(pm.NumChunksInFlight(), 0); + ASSERT_EQ(pm.NumChunksRemaining(), 0); + ASSERT_EQ(pm.NumPushesInFlight(), 0); + for (int i = 0; i < 10; i++) { + ASSERT_EQ(results[i], 1); + } +} + +TEST(TestPushManager, TestMultipleTransfers) { + std::vector<int> results1; + results1.resize(10);  // Sized, not just reserved: the callbacks write by index. + std::vector<int> results2; + results2.resize(10); + UniqueID push1 = UniqueID::FromRandom(); + UniqueID push2 = UniqueID::FromRandom(); + PushManager pm(5); + pm.StartPush(push1, 10, [&](int64_t chunk_id) { results1[chunk_id] = 1; }); + pm.StartPush(push2, 10, [&](int64_t chunk_id) { results2[chunk_id] = 2; }); + ASSERT_EQ(pm.NumChunksInFlight(), 5); + ASSERT_EQ(pm.NumChunksRemaining(), 15); + ASSERT_EQ(pm.NumPushesInFlight(), 2); + for (int i = 0; i < 20; i++) { + pm.OnChunkComplete(); + } + ASSERT_EQ(pm.NumChunksInFlight(), 0); + ASSERT_EQ(pm.NumChunksRemaining(), 0); + ASSERT_EQ(pm.NumPushesInFlight(), 0); + for (int i = 0; i < 10; i++) { + ASSERT_EQ(results1[i], 1); + } + for (int i = 0; i < 10; i++) { +
ASSERT_EQ(results2[i], 2); + } +} + +} // namespace ray + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index a20843612..cb0d38a5d 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -236,6 +236,8 @@ int main(int argc, char *argv[]) { object_manager_config.push_timeout_ms = RayConfig::instance().object_manager_push_timeout_ms(); object_manager_config.object_store_memory = object_store_memory; + object_manager_config.max_bytes_in_flight = + RayConfig::instance().object_manager_max_bytes_in_flight(); object_manager_config.plasma_directory = plasma_directory; object_manager_config.huge_pages = huge_pages;