diff --git a/src/ray/CMakeLists.txt b/src/ray/CMakeLists.txt index 53edaf7c8..fb3114816 100644 --- a/src/ray/CMakeLists.txt +++ b/src/ray/CMakeLists.txt @@ -42,7 +42,6 @@ set(RAY_SRCS object_manager/object_buffer_pool.cc object_manager/object_store_notification_manager.cc object_manager/object_directory.cc - object_manager/transfer_queue.cc object_manager/object_manager.cc raylet/monitor.cc raylet/mock_gcs_client.cc diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 232114906..cb857fec4 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -7,7 +7,6 @@ namespace object_manager_protocol = ray::object_manager::protocol; namespace ray { ObjectManager::ObjectManager(asio::io_service &main_service, - std::unique_ptr object_manager_service, const ObjectManagerConfig &config, std::shared_ptr gcs_client) // TODO(hme): Eliminate knowledge of GCS. @@ -19,13 +18,9 @@ ObjectManager::ObjectManager(asio::io_service &main_service, // an object prematurely whenever we reach the maximum number of sends. buffer_pool_(config_.store_socket_name, config_.object_chunk_size, /*release_delay=*/2 * config_.max_sends), - object_manager_service_(std::move(object_manager_service)), - work_(*object_manager_service_), - connection_pool_(), - transfer_queue_(), - num_transfers_send_(0), - num_transfers_receive_(0), - num_threads_(config_.max_sends + config_.max_receives) { + send_work_(send_service_), + receive_work_(receive_service_), + connection_pool_() { RAY_CHECK(config_.max_sends > 0); RAY_CHECK(config_.max_receives > 0); main_service_ = &main_service; @@ -37,7 +32,6 @@ ObjectManager::ObjectManager(asio::io_service &main_service, } ObjectManager::ObjectManager(asio::io_service &main_service, - std::unique_ptr object_manager_service, const ObjectManagerConfig &config, std::unique_ptr od) : config_(config), @@ -47,13 +41,9 @@ ObjectManager::ObjectManager(asio::io_service &main_service, // an object prematurely whenever we reach the maximum number of sends. buffer_pool_(config_.store_socket_name, config_.object_chunk_size, /*release_delay=*/2 * config_.max_sends), - object_manager_service_(std::move(object_manager_service)), - work_(*object_manager_service_), - connection_pool_(), - transfer_queue_(), - num_transfers_send_(0), - num_transfers_receive_(0), - num_threads_(config_.max_sends + config_.max_receives) { + send_work_(send_service_), + receive_work_(receive_service_), + connection_pool_() { RAY_CHECK(config_.max_sends > 0); RAY_CHECK(config_.max_receives > 0); // TODO(hme) Client ID is never set with this constructor. @@ -68,17 +58,26 @@ ObjectManager::ObjectManager(asio::io_service &main_service, ObjectManager::~ObjectManager() { StopIOService(); } void ObjectManager::StartIOService() { - for (int i = 0; i < num_threads_; ++i) { - io_threads_.emplace_back(std::thread(&ObjectManager::IOServiceLoop, this)); + for (int i = 0; i < config_.max_sends; ++i) { + send_threads_.emplace_back(std::thread(&ObjectManager::RunSendService, this)); + } + for (int i = 0; i < config_.max_receives; ++i) { + receive_threads_.emplace_back(std::thread(&ObjectManager::RunReceiveService, this)); } } -void ObjectManager::IOServiceLoop() { object_manager_service_->run(); } +void ObjectManager::RunSendService() { send_service_.run(); } + +void ObjectManager::RunReceiveService() { receive_service_.run(); } void ObjectManager::StopIOService() { - object_manager_service_->stop(); - for (int i = 0; i < num_threads_; ++i) { - io_threads_[i].join(); + send_service_.stop(); + for (int i = 0; i < config_.max_sends; ++i) { + send_threads_[i].join(); + } + receive_service_.stop(); + for (int i = 0; i < config_.max_receives; ++i) { + receive_threads_[i].join(); } } @@ -107,9 +106,7 @@ ray::Status ObjectManager::SubscribeObjDeleted( } ray::Status ObjectManager::Pull(const ObjectID &object_id) { - main_service_->dispatch( - [this, object_id]() { RAY_CHECK_OK(PullGetLocations(object_id)); }); - return Status::OK(); + return PullGetLocations(object_id); } void ObjectManager::SchedulePull(const ObjectID &object_id, int wait_ms) { @@ -118,8 +115,7 @@ void ObjectManager::SchedulePull(const ObjectID &object_id, int wait_ms) { pull_requests_[object_id]->async_wait( [this, object_id](const boost::system::error_code &error_code) { pull_requests_.erase(object_id); - main_service_->dispatch( - [this, object_id]() { RAY_CHECK_OK(PullGetLocations(object_id)); }); + RAY_CHECK_OK(PullGetLocations(object_id)); }); } @@ -137,7 +133,6 @@ void ObjectManager::GetLocationsSuccess(const std::vector &client const ray::ObjectID &object_id) { RAY_CHECK(!client_ids.empty()); ClientID client_id = client_ids.front(); - pull_requests_.erase(object_id); ray::Status status_code = Pull(object_id, client_id); } @@ -146,10 +141,7 @@ void ObjectManager::GetLocationsFailed(const ObjectID &object_id) { } ray::Status ObjectManager::Pull(const ObjectID &object_id, const ClientID &client_id) { - main_service_->dispatch([this, object_id, client_id]() { - RAY_CHECK_OK(PullEstablishConnection(object_id, client_id)); - }); - return Status::OK(); + return PullEstablishConnection(object_id, client_id); }; ray::Status ObjectManager::PullEstablishConnection(const ObjectID &object_id, @@ -212,95 +204,34 @@ ray::Status ObjectManager::Push(const ObjectID &object_id, const ClientID &clien return ray::Status::OK(); } - main_service_->dispatch([this, object_id, client_id]() { - // TODO(hme): Cache this data in ObjectDirectory. - // Okay for now since the GCS client caches this data. - Status status = object_directory_->GetInformation( - client_id, - [this, object_id, client_id](const RemoteConnectionInfo &info) { - ObjectInfoT object_info = local_objects_[object_id]; - uint64_t data_size = - static_cast(object_info.data_size + object_info.metadata_size); - uint64_t metadata_size = static_cast(object_info.metadata_size); - uint64_t num_chunks = buffer_pool_.GetNumChunks(data_size); - for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) { - transfer_queue_.QueueSend(client_id, object_id, data_size, metadata_size, - chunk_index, info); - } - RAY_CHECK_OK(DequeueTransfers()); - }, - [](const Status &status) { - // Push is best effort, so do nothing here. - }); - RAY_CHECK_OK(status); - }); - return ray::Status::OK(); -} - -ray::Status ObjectManager::DequeueTransfers() { - ray::Status status = ray::Status::OK(); - // Dequeue sends. - while (true) { - int num_transfers_send = std::atomic_fetch_add(&num_transfers_send_, 1); - if (num_transfers_send < config_.max_sends) { - TransferQueue::SendRequest req; - bool exists = transfer_queue_.DequeueSendIfPresent(&req); - if (exists) { - object_manager_service_->dispatch([this, req]() { - RAY_LOG(DEBUG) << "DequeueSend " << client_id_ << " " << req.object_id << " " - << num_transfers_send_ << "/" << config_.max_sends; - RAY_CHECK_OK(ExecuteSendObject(req.client_id, req.object_id, req.data_size, - req.metadata_size, req.chunk_index, - req.connection_info)); - }); - } else { - std::atomic_fetch_sub(&num_transfers_send_, 1); - break; - } - } else { - std::atomic_fetch_sub(&num_transfers_send_, 1); - break; - } - } - // Dequeue receives. - while (true) { - int num_transfers_receive = std::atomic_fetch_add(&num_transfers_receive_, 1); - if (num_transfers_receive < config_.max_receives) { - TransferQueue::ReceiveRequest req; - bool exists = transfer_queue_.DequeueReceiveIfPresent(&req); - if (exists) { - object_manager_service_->dispatch([this, req]() { - RAY_LOG(DEBUG) << "DequeueReceive " << client_id_ << " " << req.object_id << " " - << num_transfers_receive_ << "/" << config_.max_receives; - RAY_CHECK_OK(ExecuteReceiveObject(req.client_id, req.object_id, req.data_size, - req.metadata_size, req.chunk_index, - req.conn)); - }); - } else { - std::atomic_fetch_sub(&num_transfers_receive_, 1); - break; - } - } else { - std::atomic_fetch_sub(&num_transfers_receive_, 1); - break; - } - } + // TODO(hme): Cache this data in ObjectDirectory. + // Okay for now since the GCS client caches this data. + Status status = object_directory_->GetInformation( + client_id, + [this, object_id, client_id](const RemoteConnectionInfo &info) { + ObjectInfoT object_info = local_objects_[object_id]; + uint64_t data_size = + static_cast(object_info.data_size + object_info.metadata_size); + uint64_t metadata_size = static_cast(object_info.metadata_size); + uint64_t num_chunks = buffer_pool_.GetNumChunks(data_size); + for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) { + send_service_.post([this, client_id, object_id, data_size, metadata_size, + chunk_index, info]() { + ExecuteSendObject(client_id, object_id, data_size, metadata_size, chunk_index, + info); + }); + } + }, + [](const Status &status) { + // Push is best effort, so do nothing here. + }); return status; } -ray::Status ObjectManager::TransferCompleted(TransferQueue::TransferType type) { - if (type == TransferQueue::TransferType::SEND) { - std::atomic_fetch_sub(&num_transfers_send_, 1); - } else { - std::atomic_fetch_sub(&num_transfers_receive_, 1); - } - return DequeueTransfers(); -}; - -ray::Status ObjectManager::ExecuteSendObject( - const ClientID &client_id, const ObjectID &object_id, uint64_t data_size, - uint64_t metadata_size, uint64_t chunk_index, - const RemoteConnectionInfo &connection_info) { +void ObjectManager::ExecuteSendObject(const ClientID &client_id, + const ObjectID &object_id, uint64_t data_size, + uint64_t metadata_size, uint64_t chunk_index, + const RemoteConnectionInfo &connection_info) { RAY_LOG(DEBUG) << "ExecuteSendObject " << client_id << " " << object_id << " " << chunk_index; ray::Status status; @@ -314,7 +245,7 @@ ray::Status ObjectManager::ExecuteSendObject( conn); } status = SendObjectHeaders(object_id, data_size, metadata_size, chunk_index, conn); - return Status::OK(); + RAY_CHECK_OK(status); } ray::Status ObjectManager::SendObjectHeaders(const ObjectID &object_id, @@ -325,14 +256,11 @@ ray::Status ObjectManager::SendObjectHeaders(const ObjectID &object_id, buffer_pool_.GetChunk(object_id, data_size, metadata_size, chunk_index); ObjectBufferPool::ChunkInfo chunk_info = chunk_status.first; - if (!chunk_status.second.ok()) { - // This is the first thread to invoke GetChunk => Get failed on the - // plasma client. - // No reference is acquired for this chunk, so no need to release the chunk. - // TODO(hme): Retry send here? If so, store RemoteConnectionInfo in SenderConnection. - RAY_CHECK_OK(TransferCompleted(TransferQueue::TransferType::SEND)); - return chunk_status.second; - } + // If status is not okay, then return immediately because + // plasma_client.Get failed. + // No reference is acquired for this chunk, so no need to release the chunk. + RAY_RETURN_NOT_OK(chunk_status.second); + // Create buffer. flatbuffers::FlatBufferBuilder fbb; // TODO(hme): use to_flatbuf @@ -349,7 +277,6 @@ ray::Status ObjectManager::SendObjectHeaders(const ObjectID &object_id, ray::Status ObjectManager::SendObjectData(const ObjectID &object_id, const ObjectBufferPool::ChunkInfo &chunk_info, std::shared_ptr conn) { - // TransferQueue::SendContext context = transfer_queue_.GetContext(context_id); boost::system::error_code ec; std::vector buffer; buffer.push_back(asio::buffer(chunk_info.data, chunk_info.buffer_length)); @@ -367,8 +294,7 @@ ray::Status ObjectManager::SendObjectData(const ObjectID &object_id, RAY_CHECK_OK( connection_pool_.ReleaseSender(ConnectionPool::ConnectionType::TRANSFER, conn)); RAY_LOG(DEBUG) << "SendCompleted " << client_id_ << " " << object_id << " " - << num_transfers_send_ << "/" << config_.max_sends; - RAY_CHECK_OK(TransferCompleted(TransferQueue::TransferType::SEND)); + << config_.max_sends; return status; } @@ -387,8 +313,8 @@ ray::Status ObjectManager::Wait(const std::vector &object_ids, std::shared_ptr ObjectManager::CreateSenderConnection( ConnectionPool::ConnectionType type, RemoteConnectionInfo info) { - std::shared_ptr conn = SenderConnection::Create( - *object_manager_service_, info.client_id, info.ip, info.port); + std::shared_ptr conn = + SenderConnection::Create(*main_service_, info.client_id, info.ip, info.port); // Prepare client connection info buffer flatbuffers::FlatBufferBuilder fbb; bool is_transfer = (type == ConnectionPool::ConnectionType::TRANSFER); @@ -472,17 +398,16 @@ void ObjectManager::ReceivePushRequest(std::shared_ptr conn uint64_t chunk_index = object_header->chunk_index(); uint64_t data_size = object_header->data_size(); uint64_t metadata_size = object_header->metadata_size(); - transfer_queue_.QueueReceive(conn->GetClientID(), object_id, data_size, metadata_size, - chunk_index, conn); - RAY_LOG(DEBUG) << "ReceivePushRequest " << conn->GetClientID() << " " << object_id - << " " << chunk_index; - RAY_CHECK_OK(DequeueTransfers()); + receive_service_.post([this, object_id, data_size, metadata_size, chunk_index, conn]() { + ExecuteReceiveObject(conn->GetClientID(), object_id, data_size, metadata_size, + chunk_index, conn); + }); } -ray::Status ObjectManager::ExecuteReceiveObject( - const ClientID &client_id, const ObjectID &object_id, uint64_t data_size, - uint64_t metadata_size, uint64_t chunk_index, - std::shared_ptr conn) { +void ObjectManager::ExecuteReceiveObject(const ClientID &client_id, + const ObjectID &object_id, uint64_t data_size, + uint64_t metadata_size, uint64_t chunk_index, + std::shared_ptr conn) { RAY_LOG(DEBUG) << "ExecuteReceiveObject " << client_id << " " << object_id << " " << chunk_index; @@ -518,9 +443,7 @@ ray::Status ObjectManager::ExecuteReceiveObject( } conn->ProcessMessages(); RAY_LOG(DEBUG) << "ReceiveCompleted " << client_id_ << " " << object_id << " " - << num_transfers_receive_ << "/" << config_.max_receives; - RAY_CHECK_OK(TransferCompleted(TransferQueue::TransferType::RECEIVE)); - return Status::OK(); + << "/" << config_.max_receives; } } // namespace ray diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index a7db6c683..1dad93f60 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -26,7 +26,6 @@ #include "ray/object_manager/object_directory.h" #include "ray/object_manager/object_manager_client_connection.h" #include "ray/object_manager/object_store_notification_manager.h" -#include "ray/object_manager/transfer_queue.h" namespace ray { @@ -50,11 +49,9 @@ class ObjectManager { /// Implicitly instantiates Ray implementation of ObjectDirectory. /// /// \param main_service The main asio io_service. - /// \param object_manager_service The asio io_service tied to the object manager. /// \param config ObjectManager configuration. /// \param gcs_client A client connection to the Ray GCS. explicit ObjectManager(boost::asio::io_service &main_service, - std::unique_ptr object_manager_service, const ObjectManagerConfig &config, std::shared_ptr gcs_client); @@ -63,11 +60,9 @@ class ObjectManager { /// the given ObjectDirectory instance. /// /// \param main_service The main asio io_service. - /// \param object_manager_service The asio io_service tied to the object manager. /// \param config ObjectManager configuration. /// \param od An object implementing the object directory interface. explicit ObjectManager(boost::asio::io_service &main_service, - std::unique_ptr object_manager_service, const ObjectManagerConfig &config, std::unique_ptr od); @@ -157,20 +152,28 @@ class ObjectManager { ObjectStoreNotificationManager store_notification_; ObjectBufferPool buffer_pool_; - /// An io service for creating connections to other object managers. - /// This runs on a thread pool. - std::unique_ptr object_manager_service_; + /// This runs on a thread pool dedicated to sending objects. + boost::asio::io_service send_service_; + /// This runs on a thread pool dedicated to receiving objects. + boost::asio::io_service receive_service_; + /// Weak reference to main service. We ensure this object is destroyed before /// main_service_ is stopped. boost::asio::io_service *main_service_; - /// Used to create "work" for an io service, so when it's run, it doesn't exit. - boost::asio::io_service::work work_; + /// Used to create "work" for send_service_. + /// Without this, if send_service_ has no more sends to process, it will stop. + boost::asio::io_service::work send_work_; + /// Used to create "work" for receive_service_. + /// Without this, if receive_service_ has no more receives to process, it will stop. + boost::asio::io_service::work receive_work_; - /// Thread pool for executing asynchronous handlers. - /// These run the object_manager_service_, which handle - /// all incoming and outgoing object transfers. - std::vector io_threads_; + /// Runs the send service, which handle + /// all outgoing object transfers. + std::vector send_threads_; + /// Runs the receive service, which handle + /// all incoming object transfers. + std::vector receive_threads_; /// Connection pool for reusing outgoing connections to remote object managers. ConnectionPool connection_pool_; @@ -180,26 +183,13 @@ class ObjectManager { UniqueIDHasher> pull_requests_; - /// Allows control of concurrent object transfers. This is a global queue, - /// allowing for concurrent transfers with many object managers as well as - /// concurrent transfers, including both sends and receives, with a single - /// remote object manager. - TransferQueue transfer_queue_; - - /// Variables to track number of concurrent sends and receives. - std::atomic num_transfers_send_; - std::atomic num_transfers_receive_; - - /// Size of thread pool. This is the sum of - /// config_.max_sends and config_.max_receives - const int num_threads_; - /// Cache of locally available objects. std::unordered_map local_objects_; /// Handle starting, running, and stopping asio io_service. void StartIOService(); - void IOServiceLoop(); + void RunSendService(); + void RunReceiveService(); void StopIOService(); /// Register object add with directory. @@ -239,47 +229,36 @@ class ObjectManager { ray::Status PullSendRequest(const ObjectID &object_id, std::shared_ptr conn); - /// Starts as many queued sends and receives as possible without exceeding - /// config_.max_sends and config_.max_receives, respectively. - /// Executes on object_manager_service_ thread pool. - ray::Status DequeueTransfers(); - std::shared_ptr CreateSenderConnection( ConnectionPool::ConnectionType type, RemoteConnectionInfo info); - /// Invoked when a transfer is completed. Invokes DequeueTransfers after - /// updating variables that track concurrent transfers. - /// Executes on object_manager_service_ thread pool. - ray::Status TransferCompleted(TransferQueue::TransferType type); - /// Begin executing a send. - /// Executes on object_manager_service_ thread pool. - ray::Status ExecuteSendObject(const ClientID &client_id, const ObjectID &object_id, - uint64_t data_size, uint64_t metadata_size, - uint64_t chunk_index, - const RemoteConnectionInfo &connection_info); + /// Executes on send_service_ thread pool. + void ExecuteSendObject(const ClientID &client_id, const ObjectID &object_id, + uint64_t data_size, uint64_t metadata_size, uint64_t chunk_index, + const RemoteConnectionInfo &connection_info); /// This method synchronously sends the object id and object size /// to the remote object manager. - /// Executes on object_manager_service_ thread pool. + /// Executes on send_service_ thread pool. ray::Status SendObjectHeaders(const ObjectID &object_id, uint64_t data_size, uint64_t metadata_size, uint64_t chunk_index, std::shared_ptr conn); /// This method initiates the actual object transfer. - /// Executes on object_manager_service_ thread pool. + /// Executes on send_service_ thread pool. ray::Status SendObjectData(const ObjectID &object_id, const ObjectBufferPool::ChunkInfo &chunk_info, std::shared_ptr conn); /// Invoked when a remote object manager pushes an object to this object manager. - /// This will queue the receive. + /// This will invoke the object receive on the receive_service_ thread pool. void ReceivePushRequest(std::shared_ptr conn, const uint8_t *message); - /// Execute a receive that was in the queue. - ray::Status ExecuteReceiveObject(const ClientID &client_id, const ObjectID &object_id, - uint64_t data_size, uint64_t metadata_size, - uint64_t chunk_index, - std::shared_ptr conn); + /// Execute a receive on the receive_service_ thread pool. + void ExecuteReceiveObject(const ClientID &client_id, const ObjectID &object_id, + uint64_t data_size, uint64_t metadata_size, + uint64_t chunk_index, + std::shared_ptr conn); /// Handles receiving a pull request message. void ReceivePullRequest(std::shared_ptr &conn, diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc index 006c78d7e..8019725e4 100644 --- a/src/ray/object_manager/test/object_manager_stress_test.cc +++ b/src/ray/object_manager/test/object_manager_stress_test.cc @@ -27,15 +27,13 @@ int64_t current_time_ms() { class MockServer { public: MockServer(boost::asio::io_service &main_service, - std::unique_ptr object_manager_service, const ObjectManagerConfig &object_manager_config, std::shared_ptr gcs_client) : object_manager_acceptor_( main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)), object_manager_socket_(main_service), gcs_client_(gcs_client), - object_manager_(main_service, std::move(object_manager_service), - object_manager_config, gcs_client) { + object_manager_(main_service, object_manager_config, gcs_client) { RAY_CHECK_OK(RegisterGcs(main_service)); // Start listening for clients. DoAcceptObjectManager(); @@ -118,9 +116,6 @@ class TestObjectManagerBase : public ::testing::Test { void SetUp() { flushall_redis(); - object_manager_service_1.reset(new boost::asio::io_service()); - object_manager_service_2.reset(new boost::asio::io_service()); - // start store store_id_1 = StartStore(UniqueID::from_random().hex()); store_id_2 = StartStore(UniqueID::from_random().hex()); @@ -138,8 +133,7 @@ class TestObjectManagerBase : public ::testing::Test { om_config_1.max_sends = max_sends; om_config_1.max_receives = max_receives; om_config_1.object_chunk_size = object_chunk_size; - server1.reset(new MockServer(main_service, std::move(object_manager_service_1), - om_config_1, gcs_client_1)); + server1.reset(new MockServer(main_service, om_config_1, gcs_client_1)); // start second server gcs_client_2 = std::shared_ptr(new gcs::AsyncGcsClient()); @@ -149,8 +143,7 @@ class TestObjectManagerBase : public ::testing::Test { om_config_2.max_sends = max_sends; om_config_2.max_receives = max_receives; om_config_2.object_chunk_size = object_chunk_size; - server2.reset(new MockServer(main_service, std::move(object_manager_service_2), - om_config_2, gcs_client_2)); + server2.reset(new MockServer(main_service, om_config_2, gcs_client_2)); // connect to stores. ARROW_CHECK_OK(client1.Connect(store_id_1, "", PLASMA_DEFAULT_RELEASE_DELAY)); @@ -188,8 +181,6 @@ class TestObjectManagerBase : public ::testing::Test { protected: std::thread p; boost::asio::io_service main_service; - std::unique_ptr object_manager_service_1; - std::unique_ptr object_manager_service_2; std::shared_ptr gcs_client_1; std::shared_ptr gcs_client_2; std::unique_ptr server1; diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc index ca6759664..259c3ea82 100644 --- a/src/ray/object_manager/test/object_manager_test.cc +++ b/src/ray/object_manager/test/object_manager_test.cc @@ -18,15 +18,13 @@ std::string store_executable; class MockServer { public: MockServer(boost::asio::io_service &main_service, - std::unique_ptr object_manager_service, const ObjectManagerConfig &object_manager_config, std::shared_ptr gcs_client) : object_manager_acceptor_( main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)), object_manager_socket_(main_service), gcs_client_(gcs_client), - object_manager_(main_service, std::move(object_manager_service), - object_manager_config, gcs_client) { + object_manager_(main_service, object_manager_config, gcs_client) { RAY_CHECK_OK(RegisterGcs(main_service)); // Start listening for clients. DoAcceptObjectManager(); @@ -108,9 +106,6 @@ class TestObjectManager : public ::testing::Test { void SetUp() { flushall_redis(); - object_manager_service_1.reset(new boost::asio::io_service()); - object_manager_service_2.reset(new boost::asio::io_service()); - // start store store_id_1 = StartStore(UniqueID::from_random().hex()); store_id_2 = StartStore(UniqueID::from_random().hex()); @@ -128,8 +123,7 @@ class TestObjectManager : public ::testing::Test { om_config_1.max_sends = max_sends; om_config_1.max_receives = max_receives; om_config_1.object_chunk_size = object_chunk_size; - server1.reset(new MockServer(main_service, std::move(object_manager_service_1), - om_config_1, gcs_client_1)); + server1.reset(new MockServer(main_service, om_config_1, gcs_client_1)); // start second server gcs_client_2 = std::shared_ptr(new gcs::AsyncGcsClient()); @@ -139,8 +133,7 @@ class TestObjectManager : public ::testing::Test { om_config_2.max_sends = max_sends; om_config_2.max_receives = max_receives; om_config_2.object_chunk_size = object_chunk_size; - server2.reset(new MockServer(main_service, std::move(object_manager_service_2), - om_config_2, gcs_client_2)); + server2.reset(new MockServer(main_service, om_config_2, gcs_client_2)); // connect to stores. ARROW_CHECK_OK(client1.Connect(store_id_1, "", PLASMA_DEFAULT_RELEASE_DELAY)); @@ -178,8 +171,6 @@ class TestObjectManager : public ::testing::Test { protected: std::thread p; boost::asio::io_service main_service; - std::unique_ptr object_manager_service_1; - std::unique_ptr object_manager_service_2; std::shared_ptr gcs_client_1; std::shared_ptr gcs_client_2; std::unique_ptr server1; diff --git a/src/ray/object_manager/transfer_queue.cc b/src/ray/object_manager/transfer_queue.cc deleted file mode 100644 index b8e4ce01f..000000000 --- a/src/ray/object_manager/transfer_queue.cc +++ /dev/null @@ -1,53 +0,0 @@ -#include "ray/object_manager/transfer_queue.h" - -namespace ray { - -void TransferQueue::QueueSend(const ClientID &client_id, const ObjectID &object_id, - uint64_t data_size, uint64_t metadata_size, - uint64_t chunk_index, const RemoteConnectionInfo &info) { - std::lock_guard guard(send_mutex_); - SendRequest req = {client_id, object_id, data_size, metadata_size, chunk_index, info}; - // TODO(hme): Use a set to speed this up. - if (std::find(send_queue_.begin(), send_queue_.end(), req) != send_queue_.end()) { - // already queued. - return; - } - send_queue_.push_back(req); -} - -void TransferQueue::QueueReceive(const ClientID &client_id, const ObjectID &object_id, - uint64_t data_size, uint64_t metadata_size, - uint64_t chunk_index, - std::shared_ptr conn) { - std::lock_guard guard(receive_mutex_); - ReceiveRequest req = {client_id, object_id, data_size, - metadata_size, chunk_index, conn}; - if (std::find(receive_queue_.begin(), receive_queue_.end(), req) != - receive_queue_.end()) { - // already queued. - return; - } - receive_queue_.push_back(req); -} - -bool TransferQueue::DequeueSendIfPresent(TransferQueue::SendRequest *send_ptr) { - std::lock_guard guard(send_mutex_); - if (send_queue_.empty()) { - return false; - } - *send_ptr = send_queue_.front(); - send_queue_.pop_front(); - return true; -} - -bool TransferQueue::DequeueReceiveIfPresent(TransferQueue::ReceiveRequest *receive_ptr) { - std::lock_guard guard(receive_mutex_); - if (receive_queue_.empty()) { - return false; - } - *receive_ptr = receive_queue_.front(); - receive_queue_.pop_front(); - return true; -} - -} // namespace ray diff --git a/src/ray/object_manager/transfer_queue.h b/src/ray/object_manager/transfer_queue.h deleted file mode 100644 index c8a6db1e0..000000000 --- a/src/ray/object_manager/transfer_queue.h +++ /dev/null @@ -1,117 +0,0 @@ -#ifndef RAY_OBJECT_MANAGER_TRANSFER_QUEUE_H -#define RAY_OBJECT_MANAGER_TRANSFER_QUEUE_H - -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -#include "ray/id.h" -#include "ray/status.h" - -#include "ray/object_manager/format/object_manager_generated.h" -#include "ray/object_manager/object_directory.h" -#include "ray/object_manager/object_manager_client_connection.h" - -namespace ray { - -class TransferQueue { - public: - enum TransferType { SEND = 1, RECEIVE }; - - /// The structure used in the send queue. - struct SendRequest { - ClientID client_id; - ObjectID object_id; - uint64_t data_size; - uint64_t metadata_size; - uint64_t chunk_index; - RemoteConnectionInfo connection_info; - bool operator==(const SendRequest &rhs) const { - return client_id == rhs.client_id && object_id == rhs.object_id && - chunk_index == rhs.chunk_index; - } - }; - - /// The structure used in the receive queue. - struct ReceiveRequest { - ClientID client_id; - ObjectID object_id; - uint64_t data_size; - uint64_t metadata_size; - uint64_t chunk_index; - std::shared_ptr conn; - bool operator==(const ReceiveRequest &rhs) const { - return client_id == rhs.client_id && object_id == rhs.object_id && - chunk_index == rhs.chunk_index; - ; - } - }; - - TransferQueue() = default; - - /// Queues a send. - /// - /// \param client_id The ClientID to which the object needs to be sent. - /// \param object_id The ObjectID of the object to be sent. - /// \param data_size The actual object size + the metadata size. - /// \param metadata_size The size of the object's metadata. - /// \param chunk_index The chunk index, which corresponds to the chunk of object_id that - /// is queued for transfer. - /// \param info Connection information to the remote node, which is required if a new - /// connection needs to be established. - void QueueSend(const ClientID &client_id, const ObjectID &object_id, uint64_t data_size, - uint64_t metadata_size, uint64_t chunk_index, - const RemoteConnectionInfo &info); - - /// If send_queue_ is not empty, removes a SendRequest from send_queue_ and assigns - /// it to send_ptr. The queue is FIFO. - /// - /// \param send_ptr A pointer to an empty SendRequest. - /// \return A bool indicating whether the queue was empty at the time this method - /// was invoked. - bool DequeueSendIfPresent(TransferQueue::SendRequest *send_ptr); - - /// Queues a receive. - /// - /// \param client_id The ClientID from which the object is being received. - /// \param object_id The ObjectID of the object to be received. - /// \param data_size The actual object size + the metadata size. - /// \param metadata_size The size of the object's metadata. - /// \param chunk_index The chunk index, which corresponds to the chunk of object_id that - /// is queued for transfer. - /// \param conn Connection to the remote object manager that's sending data. - void QueueReceive(const ClientID &client_id, const ObjectID &object_id, - uint64_t data_size, uint64_t metadata_size, uint64_t chunk_index, - std::shared_ptr conn); - - /// If receive_queue_ is not empty, removes a ReceiveRequest from receive_queue_ and - /// assigns it to receive_ptr. The queue is FIFO. - /// - /// \param receive_ptr A pointer to an empty ReceiveRequest. - /// \return A bool indicating whether the queue was empty at the time this method - /// was invoked. - bool DequeueReceiveIfPresent(TransferQueue::ReceiveRequest *receive_ptr); - - /// This object cannot be copied for thread-safety. - RAY_DISALLOW_COPY_AND_ASSIGN(TransferQueue); - - private: - /// Locks access to send_queue_. - std::mutex send_mutex_; - /// Locks access to receive_queue_. - std::mutex receive_mutex_; - std::deque send_queue_; - std::deque receive_queue_; -}; - -} // namespace ray - -#endif // RAY_OBJECT_MANAGER_TRANSFER_QUEUE_H diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 7c027e5de..77a2d708c 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -63,13 +63,10 @@ int main(int argc, char *argv[]) { // Initialize the node manager. boost::asio::io_service main_service; - std::unique_ptr object_manager_service; - object_manager_service.reset(new boost::asio::io_service()); - ray::raylet::Raylet server(main_service, std::move(object_manager_service), - raylet_socket_name, node_ip_address, redis_address, - redis_port, node_manager_config, object_manager_config, - gcs_client); + ray::raylet::Raylet server(main_service, raylet_socket_name, node_ip_address, + redis_address, redis_port, node_manager_config, + object_manager_config, gcs_client); // Destroy the Raylet on a SIGTERM. The pointer to main_service is // guaranteed to be valid since this function will run the event loop diff --git a/src/ray/raylet/object_manager_integration_test.cc b/src/ray/raylet/object_manager_integration_test.cc index 19495d5f0..059af7fec 100644 --- a/src/ray/raylet/object_manager_integration_test.cc +++ b/src/ray/raylet/object_manager_integration_test.cc @@ -47,9 +47,6 @@ class TestObjectManagerBase : public ::testing::Test { }; void SetUp() { - object_manager_service_1.reset(new boost::asio::io_service()); - object_manager_service_2.reset(new boost::asio::io_service()); - // start store std::string store_sock_1 = StartStore("1"); std::string store_sock_2 = StartStore("2"); @@ -59,18 +56,16 @@ class TestObjectManagerBase : public ::testing::Test { ObjectManagerConfig om_config_1; om_config_1.store_socket_name = store_sock_1; server1.reset(new ray::raylet::Raylet( - main_service, std::move(object_manager_service_1), "raylet_1", "0.0.0.0", - "127.0.0.1", 6379, GetNodeManagerConfig("raylet_1", store_sock_1), om_config_1, - gcs_client_1)); + main_service, "raylet_1", "0.0.0.0", "127.0.0.1", 6379, + GetNodeManagerConfig("raylet_1", store_sock_1), om_config_1, gcs_client_1)); // start second server gcs_client_2 = std::shared_ptr(new gcs::AsyncGcsClient()); ObjectManagerConfig om_config_2; om_config_2.store_socket_name = store_sock_2; server2.reset(new ray::raylet::Raylet( - main_service, std::move(object_manager_service_2), "raylet_2", "0.0.0.0", - "127.0.0.1", 6379, GetNodeManagerConfig("raylet_2", store_sock_2), om_config_2, - gcs_client_2)); + main_service, "raylet_2", "0.0.0.0", "127.0.0.1", 6379, + GetNodeManagerConfig("raylet_2", store_sock_2), om_config_2, gcs_client_2)); // connect to stores. ARROW_CHECK_OK(client1.Connect(store_sock_1, "", PLASMA_DEFAULT_RELEASE_DELAY)); @@ -110,8 +105,6 @@ class TestObjectManagerBase : public ::testing::Test { protected: std::thread p; boost::asio::io_service main_service; - std::unique_ptr object_manager_service_1; - std::unique_ptr object_manager_service_2; std::shared_ptr gcs_client_1; std::shared_ptr gcs_client_2; std::unique_ptr server1; diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc index a6e001cfe..21ad56541 100644 --- a/src/ray/raylet/raylet.cc +++ b/src/ray/raylet/raylet.cc @@ -11,16 +11,13 @@ namespace ray { namespace raylet { -Raylet::Raylet(boost::asio::io_service &main_service, - std::unique_ptr object_manager_service, - const std::string &socket_name, const std::string &node_ip_address, - const std::string &redis_address, int redis_port, - const NodeManagerConfig &node_manager_config, +Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_name, + const std::string &node_ip_address, const std::string &redis_address, + int redis_port, const NodeManagerConfig &node_manager_config, const ObjectManagerConfig &object_manager_config, std::shared_ptr gcs_client) : gcs_client_(gcs_client), - object_manager_(main_service, std::move(object_manager_service), - object_manager_config, gcs_client), + object_manager_(main_service, object_manager_config, gcs_client), node_manager_(main_service, node_manager_config, object_manager_, gcs_client_), socket_name_(socket_name), acceptor_(main_service, boost::asio::local::stream_protocol::endpoint(socket_name)), diff --git a/src/ray/raylet/raylet.h b/src/ray/raylet/raylet.h index bea246583..be634616b 100644 --- a/src/ray/raylet/raylet.h +++ b/src/ray/raylet/raylet.h @@ -34,11 +34,9 @@ class Raylet { /// \param object_manager_config Configuration to initialize the object /// manager. /// \param gcs_client A client connection to the GCS. - Raylet(boost::asio::io_service &main_service, - std::unique_ptr object_manager_service, - const std::string &socket_name, const std::string &node_ip_address, - const std::string &redis_address, int redis_port, - const NodeManagerConfig &node_manager_config, + Raylet(boost::asio::io_service &main_service, const std::string &socket_name, + const std::string &node_ip_address, const std::string &redis_address, + int redis_port, const NodeManagerConfig &node_manager_config, const ObjectManagerConfig &object_manager_config, std::shared_ptr gcs_client);