From c49554fb7ae62f2b03ebfadb8e6f99e0ccbfaa46 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Mon, 16 Nov 2020 20:09:15 -0500 Subject: [PATCH] Abstract plasma store creation request queue (#12039) --- BUILD.bazel | 15 ++++ python/ray/_raylet.pyx | 2 +- src/ray/object_manager/plasma/connection.h | 7 +- .../plasma/create_request_queue.cc | 54 +++++++++++++ .../plasma/create_request_queue.h | 73 +++++++++++++++++ src/ray/object_manager/plasma/store.cc | 70 +++++++++------- src/ray/object_manager/plasma/store.h | 45 +++++------ .../test/create_request_queue_test.cc | 79 +++++++++++++++++++ 8 files changed, 292 insertions(+), 53 deletions(-) create mode 100644 src/ray/object_manager/plasma/create_request_queue.cc create mode 100644 src/ray/object_manager/plasma/create_request_queue.h create mode 100644 src/ray/object_manager/test/create_request_queue_test.cc diff --git a/BUILD.bazel b/BUILD.bazel index 9f5d256f5..2ea04ad60 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -280,6 +280,7 @@ cc_library( cc_library( name = "plasma_store_server_lib", srcs = [ + "src/ray/object_manager/plasma/create_request_queue.cc", "src/ray/object_manager/plasma/dlmalloc.cc", "src/ray/object_manager/plasma/eviction_policy.cc", "src/ray/object_manager/plasma/external_store.cc", @@ -290,6 +291,7 @@ cc_library( ], hdrs = [ "src/ray/object_manager/common.h", + "src/ray/object_manager/plasma/create_request_queue.h", "src/ray/object_manager/plasma/eviction_policy.h", "src/ray/object_manager/plasma/external_store.h", "src/ray/object_manager/plasma/plasma_allocator.h", @@ -819,6 +821,19 @@ cc_test( ], ) +cc_test( + name = "create_request_queue_test", + srcs = [ + "src/ray/object_manager/test/create_request_queue_test.cc", + ], + copts = COPTS, + deps = [ + ":plasma_store_server_lib", + "@com_google_googletest//:gtest", + "@com_google_googletest//:gtest_main", + ], +) + cc_test( name = "reconstruction_policy_test", srcs = ["src/ray/raylet/reconstruction_policy_test.cc"], diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 7f02dec9f..4561d9dc4 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -888,7 +888,7 @@ cdef class CoreWorker: # can't track their lifecycle, so we don't pin the object # in this case. check_status(CCoreWorkerProcess.GetCoreWorker().Seal( - c_object_id, pin_object=object_ref is None)) + c_object_id, pin_object=False)) def put_serialized_object(self, serialized_object, ObjectRef object_ref=None, diff --git a/src/ray/object_manager/plasma/connection.h b/src/ray/object_manager/plasma/connection.h index cda267deb..aae683fb9 100644 --- a/src/ray/object_manager/plasma/connection.h +++ b/src/ray/object_manager/plasma/connection.h @@ -16,8 +16,13 @@ class Client; using PlasmaStoreMessageHandler = std::function, flatbuf::MessageType, const std::vector&)>; +class ClientInterface { + public: + virtual ~ClientInterface() {} +}; + /// Contains all information that is associated with a Plasma store client. -class Client : public ray::ClientConnection { +class Client : public ray::ClientConnection, public ClientInterface { public: static std::shared_ptr Create( PlasmaStoreMessageHandler message_handler, ray::local_stream_socket &&socket); diff --git a/src/ray/object_manager/plasma/create_request_queue.cc b/src/ray/object_manager/plasma/create_request_queue.cc new file mode 100644 index 000000000..c730bcec1 --- /dev/null +++ b/src/ray/object_manager/plasma/create_request_queue.cc @@ -0,0 +1,54 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/object_manager/plasma/create_request_queue.h" + +#include + +#include + +#include "ray/object_manager/plasma/common.h" +#include "ray/util/asio_util.h" +#include "ray/util/util.h" + +namespace plasma { + +void CreateRequestQueue::AddRequest(const std::shared_ptr &client, const CreateObjectCallback &request_callback) { + queue_.push_back({client, request_callback}); +} + +Status CreateRequestQueue::ProcessRequests() { + for (auto request_it = queue_.begin(); + request_it != queue_.end(); ) { + auto status = request_it->second(); + if (status.IsTransientObjectStoreFull()) { + return status; + } + request_it = queue_.erase(request_it); + } + return Status::OK(); +} + + +void CreateRequestQueue::RemoveDisconnectedClientRequests(const std::shared_ptr &client) { + for (auto it = queue_.begin(); it != queue_.end(); ) { + if (it->first == client) { + it = queue_.erase(it); + } else { + it++; + } + } +} + +} // namespace plasma diff --git a/src/ray/object_manager/plasma/create_request_queue.h b/src/ray/object_manager/plasma/create_request_queue.h new file mode 100644 index 000000000..f283d8cc7 --- /dev/null +++ b/src/ray/object_manager/plasma/create_request_queue.h @@ -0,0 +1,73 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +#include "ray/common/status.h" +#include "ray/object_manager/plasma/common.h" +#include "ray/object_manager/plasma/connection.h" + +namespace plasma { + +using ray::Status; + +using CreateObjectCallback = std::function; + +class CreateRequestQueue { + public: + CreateRequestQueue() {} + + /// Add a request to the queue. + /// + /// The request may not get tried immediately if the head of the queue is not + /// serviceable. + /// + /// \param client The client that sent the request. + void AddRequest(const std::shared_ptr &client, const CreateObjectCallback &request_callback); + + /// Process requests in the queue. + /// + /// This will try to process as many requests in the queue as possible, in + /// FIFO order. If the first request is not serviceable, this will break and + /// the caller should try again later. + /// + /// \return Bad status for the first request in the queue if it failed to be + /// serviced, or OK if all requests were fulfilled. + Status ProcessRequests(); + + /// Remove all requests that were made by a client that is now disconnected. + /// + /// \param client The client that was disconnected. + void RemoveDisconnectedClientRequests(const std::shared_ptr &client); + + private: + /// Queue of object creation requests to respond to. Requests will be placed + /// on this queue if the object store does not have enough room at the time + /// that the client made the creation request, but space may be made through + /// object spilling. Once the raylet notifies us that objects have been + /// spilled, we will attempt to process these requests again and respond to + /// the client if successful or out of memory. If more objects must be + /// spilled, the request will be replaced at the head of the queue. + /// TODO(swang): We should also queue objects here even if there is no room + /// in the object store. Then, the client does not need to poll on an + /// OutOfMemory error and we can just respond to them once there is enough + /// space made, or after a timeout. + std::list, const CreateObjectCallback>> queue_; +}; + +} // namespace plasma diff --git a/src/ray/object_manager/plasma/store.cc b/src/ray/object_manager/plasma/store.cc index 4e421299f..e9c9f5f05 100644 --- a/src/ray/object_manager/plasma/store.cc +++ b/src/ray/object_manager/plasma/store.cc @@ -49,6 +49,7 @@ #include "ray/object_manager/plasma/malloc.h" #include "ray/object_manager/plasma/plasma_allocator.h" #include "ray/object_manager/plasma/protocol.h" +#include "ray/util/asio_util.h" #include "ray/util/util.h" #ifdef PLASMA_CUDA @@ -61,6 +62,18 @@ using arrow::cuda::CudaDeviceManager; namespace fb = plasma::flatbuf; +namespace { + +ray::ObjectID GetCreateRequestObjectId(const std::vector &message) { + uint8_t* input = (uint8_t*)message.data(); + size_t input_size = message.size(); + auto request = flatbuffers::GetRoot(input); + RAY_DCHECK(plasma::VerifyFlatbuffer(request, input, input_size)); + return ray::ObjectID::FromBinary(request->object_id()->str()); +} + +} + namespace plasma { struct GetRequest { @@ -113,7 +126,8 @@ PlasmaStore::PlasmaStore(boost::asio::io_service &main_service, std::string dire socket_(main_service), eviction_policy_(&store_info_, PlasmaAllocator::GetFootprintLimit()), external_store_(external_store), - spill_objects_callback_(spill_objects_callback) { + spill_objects_callback_(spill_objects_callback), + create_request_queue_() { store_info_.directory = directory; store_info_.hugepages_enabled = hugepages_enabled; #ifdef PLASMA_CUDA @@ -278,6 +292,12 @@ Status PlasmaStore::HandleCreateObjectRequest(const std::shared_ptr &cli if (error_code == PlasmaError::TransientOutOfMemory) { RAY_LOG(DEBUG) << "Create object " << object_id << " failed, waiting for object spill"; status = Status::TransientObjectStoreFull("Object store full, queueing creation request"); + } else if (error_code == PlasmaError::OutOfMemory) { + RAY_LOG(ERROR) << "Not enough memory to create the object " << object_id + << ", data_size=" << data_size + << ", metadata_size=" << metadata_size + << ", will send a reply of PlasmaError::OutOfMemory"; + RAY_RETURN_NOT_OK(SendCreateReply(client, object_id, &object, error_code, /*mmap_size=*/0)); } else { int64_t mmap_size = 0; if (error_code == PlasmaError::OK && device_num == 0) { @@ -321,12 +341,6 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, pointer = AllocateMemory(total_size, evict_if_full, &fd, &map_size, &offset, client, true, &error); if (!pointer) { - if (error == PlasmaError::OutOfMemory) { - RAY_LOG(ERROR) << "Not enough memory to create the object " << object_id.Hex() - << ", data_size=" << data_size - << ", metadata_size=" << metadata_size - << ", will send a reply of PlasmaError::OutOfMemory"; - } return error; } } else { @@ -624,6 +638,7 @@ int PlasmaStore::RemoveFromClientObjectIds(const ObjectID& object_id, // If no more clients are using this object, notify the eviction policy // that the object is no longer being used. if (entry->ref_count == 0) { + RAY_LOG(DEBUG) << "Releasing object no longer in use " << object_id; if (deletion_cache_.count(object_id) == 0) { // Tell the eviction policy that this object is no longer being used. eviction_policy_.EndObjectAccess(object_id); @@ -859,13 +874,7 @@ void PlasmaStore::DisconnectClient(const std::shared_ptr &client) { notification_clients_.erase(client); } - for (auto it = create_request_queue_.begin(); it != create_request_queue_.end(); ) { - if (it->first == client) { - it = create_request_queue_.erase(it); - } else { - it++; - } - } + create_request_queue_.RemoveDisconnectedClientRequests(client); } /// Send notifications about sealed objects to the subscribers. This is called @@ -963,7 +972,10 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr &client, // Process the different types of requests. switch (type) { case fb::MessageType::PlasmaCreateRequest: { - create_request_queue_.push_back({client, message}); + RAY_LOG(DEBUG) << "Received create request for object " << GetCreateRequestObjectId(message); + create_request_queue_.AddRequest(client, [this, client, message]() { + return HandleCreateObjectRequest(client, message); + }); ProcessCreateRequests(); } break; case fb::MessageType::PlasmaAbortRequest: { @@ -1059,18 +1071,22 @@ void PlasmaStore::DoAccept() { } void PlasmaStore::ProcessCreateRequests() { - for (auto request_it = create_request_queue_.begin(); - request_it != create_request_queue_.end(); ) { - auto status = HandleCreateObjectRequest(request_it->first, request_it->second); - if (status.IsTransientObjectStoreFull()) { - // The object store is still full. - // NOTE(swang): There could be other requests behind this one that are - // actually serviceable. This may be inefficient, but eventually this - // request will get served and unblock the following requests, once - // enough objects have been spilled. - break; - } - request_it = create_request_queue_.erase(request_it); + // Only try to process requests if the timer is not set. If the timer is set, + // that means that the first request is currently not serviceable because + // there is not enough memory. In that case, we should wait for the timer to + // expire before trying any requests again. + if (create_timer_) { + return; + } + + auto status = create_request_queue_.ProcessRequests(); + if (status.IsTransientObjectStoreFull()) { + // Try to process requests later, after space has been made. + create_timer_ = execute_after(io_context_, [this]() { + RAY_LOG(DEBUG) << "OOM timer finished, retrying create requests"; + create_timer_ = nullptr; + ProcessCreateRequests(); + }, delay_on_transient_oom_ms_); } } diff --git a/src/ray/object_manager/plasma/store.h b/src/ray/object_manager/plasma/store.h index 3bc6c3d79..4ab24ef91 100644 --- a/src/ray/object_manager/plasma/store.h +++ b/src/ray/object_manager/plasma/store.h @@ -24,12 +24,14 @@ #include #include +#include "ray/common/ray_config.h" #include "ray/common/status.h" #include "ray/object_manager/common.h" #include "ray/object_manager/format/object_manager_generated.h" #include "ray/object_manager/notification/object_store_notification_manager.h" #include "ray/object_manager/plasma/common.h" #include "ray/object_manager/plasma/connection.h" +#include "ray/object_manager/plasma/create_request_queue.h" #include "ray/object_manager/plasma/external_store.h" #include "ray/object_manager/plasma/plasma.h" #include "ray/object_manager/plasma/protocol.h" @@ -99,11 +101,14 @@ class PlasmaStore { /// - PlasmaError::TransientOutOfMemory, if the store is temporarily out of /// memory but there may be space soon to create the object. In this /// case, the client should not call plasma_release. - PlasmaError CreateObject(const ObjectID& object_id, const NodeID& owner_raylet_id, - const std::string& owner_ip_address, int owner_port, - const WorkerID& owner_worker_id, bool evict_if_full, - int64_t data_size, int64_t metadata_size, int device_num, - const std::shared_ptr &client, PlasmaObject* result); + PlasmaError CreateObject(const ObjectID& object_id, + const NodeID& owner_raylet_id, + const std::string& owner_ip_address, + int owner_port, const WorkerID& owner_worker_id, + bool evict_if_full, int64_t data_size, + int64_t metadata_size, int device_num, + const std::shared_ptr &client, + PlasmaObject* result); /// Abort a created but unsealed object. If the client is not the /// creator, then the abort will fail. @@ -200,12 +205,6 @@ class PlasmaStore { } /// Process queued requests to create an object. - /// - /// The queue is processed FIFO. - /// - /// \param num_bytes_space A lower bound on the number of bytes of space that - /// have been made newly available, since the last time this method was - /// called. void ProcessCreateRequests(); private: @@ -284,19 +283,17 @@ class PlasmaStore { /// complete. ray::SpillObjectsCallback spill_objects_callback_; - /// Queue of object creation requests to respond to. Requests will be placed - /// on this queue if the object store does not have enough room at the time - /// that the client made the creation request, but space may be made through - /// object spilling. Once the raylet notifies us that objects have been - /// spilled, we will attempt to process these requests again and respond to - /// the client if successful or out of memory. If more objects must be - /// spilled, the request will be replaced at the head of the queue. - /// TODO(swang): We should also queue objects here even if there is no room - /// in the object store. Then, the client does not need to poll on an - /// OutOfMemory error and we can just respond to them once there is enough - /// space made, or after a timeout. - std::list, - const std::vector>> create_request_queue_; + /// The amount of time to wait before retrying a creation request after a + /// transient OOM error. + const uint32_t delay_on_transient_oom_ms_ = 10; + + /// A timer that is set when the first request in the queue is not + /// serviceable because there is not enough memory. The request will be + /// retried when this timer expires. + std::shared_ptr create_timer_; + + /// Queue of object creation requests. + CreateRequestQueue create_request_queue_; }; } // namespace plasma diff --git a/src/ray/object_manager/test/create_request_queue_test.cc b/src/ray/object_manager/test/create_request_queue_test.cc new file mode 100644 index 000000000..5355fe4f7 --- /dev/null +++ b/src/ray/object_manager/test/create_request_queue_test.cc @@ -0,0 +1,79 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/object_manager/plasma/create_request_queue.h" + +#include "gtest/gtest.h" +#include "ray/common/status.h" + +namespace plasma { + +class MockClient : public ClientInterface { + public: + MockClient() {} +}; + +TEST(CreateRequestQueueTest, TestSimple) { + CreateRequestQueue queue; + + bool created = false; + auto request = [&]() { + created = true; + return Status(); + }; + auto client = std::make_shared(); + queue.AddRequest(client, request); + ASSERT_FALSE(created); + ASSERT_TRUE(queue.ProcessRequests().ok()); + ASSERT_TRUE(created); +} + +TEST(CreateRequestQueueTest, TestTransientOom) { + CreateRequestQueue queue; + + int num_created = 0; + Status return_status = Status::TransientObjectStoreFull(""); + auto oom_request = [&]() { + if (return_status.ok()) { + num_created++; + } + return return_status; + }; + auto blocked_request = [&]() { + num_created++; + return Status(); + }; + + auto client = std::make_shared(); + queue.AddRequest(client, oom_request); + queue.AddRequest(client, blocked_request); + + // Transient OOM should not use up any retries. + for (int i = 0; i < 3; i++) { + ASSERT_TRUE(queue.ProcessRequests().IsTransientObjectStoreFull()); + ASSERT_EQ(num_created, 0); + } + + // Return OK for the first request. The second request should also be served. + return_status = Status(); + ASSERT_TRUE(queue.ProcessRequests().ok()); + ASSERT_EQ(num_created, 2); +} + +} // namespace plasma + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}