Abstract plasma store creation request queue (#12039)

This commit is contained in:
Stephanie Wang
2020-11-16 20:09:15 -05:00
committed by GitHub
parent 9f5986ee58
commit c49554fb7a
8 changed files with 292 additions and 53 deletions
+15
View File
@@ -280,6 +280,7 @@ cc_library(
cc_library(
name = "plasma_store_server_lib",
srcs = [
"src/ray/object_manager/plasma/create_request_queue.cc",
"src/ray/object_manager/plasma/dlmalloc.cc",
"src/ray/object_manager/plasma/eviction_policy.cc",
"src/ray/object_manager/plasma/external_store.cc",
@@ -290,6 +291,7 @@ cc_library(
],
hdrs = [
"src/ray/object_manager/common.h",
"src/ray/object_manager/plasma/create_request_queue.h",
"src/ray/object_manager/plasma/eviction_policy.h",
"src/ray/object_manager/plasma/external_store.h",
"src/ray/object_manager/plasma/plasma_allocator.h",
@@ -819,6 +821,19 @@ cc_test(
],
)
# Unit test target for the plasma CreateRequestQueue; builds
# create_request_queue_test.cc against the plasma store server library.
cc_test(
name = "create_request_queue_test",
srcs = [
"src/ray/object_manager/test/create_request_queue_test.cc",
],
copts = COPTS,
deps = [
":plasma_store_server_lib",
"@com_google_googletest//:gtest",
"@com_google_googletest//:gtest_main",
],
)
cc_test(
name = "reconstruction_policy_test",
srcs = ["src/ray/raylet/reconstruction_policy_test.cc"],
+1 -1
View File
@@ -888,7 +888,7 @@ cdef class CoreWorker:
# can't track their lifecycle, so we don't pin the object
# in this case.
check_status(CCoreWorkerProcess.GetCoreWorker().Seal(
c_object_id, pin_object=object_ref is None))
c_object_id, pin_object=False))
def put_serialized_object(self, serialized_object,
ObjectRef object_ref=None,
+6 -1
View File
@@ -16,8 +16,13 @@ class Client;
using PlasmaStoreMessageHandler =
std::function<ray::Status(std::shared_ptr<Client>, flatbuf::MessageType, const std::vector<uint8_t>&)>;
/// Minimal polymorphic base for plasma store clients. It declares no
/// operations of its own; the virtual destructor allows derived client types
/// to be destroyed through a ClientInterface pointer.
class ClientInterface {
 public:
  virtual ~ClientInterface() = default;
};
/// Contains all information that is associated with a Plasma store client.
class Client : public ray::ClientConnection {
class Client : public ray::ClientConnection, public ClientInterface {
public:
static std::shared_ptr<Client> Create(
PlasmaStoreMessageHandler message_handler, ray::local_stream_socket &&socket);
@@ -0,0 +1,54 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/object_manager/plasma/create_request_queue.h"
#include <stdlib.h>
#include <memory>
#include "ray/object_manager/plasma/common.h"
#include "ray/util/asio_util.h"
#include "ray/util/util.h"
namespace plasma {
/// Append a creation request to the tail of the queue. The callback is only
/// stored here; it is invoked later from ProcessRequests().
void CreateRequestQueue::AddRequest(const std::shared_ptr<ClientInterface> &client,
                                    const CreateObjectCallback &request_callback) {
  queue_.emplace_back(client, request_callback);
}
/// Run queued request callbacks in FIFO order, starting from the head.
///
/// A request whose callback reports a transient "object store full" status
/// stays at the head of the queue and that status is returned immediately, so
/// nothing behind it is attempted. Any other status removes the request from
/// the queue and processing continues with the next one.
///
/// \return The transient-full status of the head request if it could not be
/// serviced, otherwise OK once the queue has been drained.
Status CreateRequestQueue::ProcessRequests() {
  while (!queue_.empty()) {
    Status outcome = queue_.front().second();
    if (outcome.IsTransientObjectStoreFull()) {
      // Keep the request queued so that it is the first one retried.
      return outcome;
    }
    queue_.pop_front();
  }
  return Status::OK();
}
void CreateRequestQueue::RemoveDisconnectedClientRequests(const std::shared_ptr<ClientInterface> &client) {
for (auto it = queue_.begin(); it != queue_.end(); ) {
if (it->first == client) {
it = queue_.erase(it);
} else {
it++;
}
}
}
} // namespace plasma
@@ -0,0 +1,73 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <functional>
#include <list>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "ray/common/status.h"
#include "ray/object_manager/plasma/common.h"
#include "ray/object_manager/plasma/connection.h"
namespace plasma {
using ray::Status;
using CreateObjectCallback = std::function<Status()>;
class CreateRequestQueue {
public:
CreateRequestQueue() {}
/// Add a request to the queue.
///
/// The request may not get tried immediately if the head of the queue is not
/// serviceable.
///
/// \param client The client that sent the request.
void AddRequest(const std::shared_ptr<ClientInterface> &client, const CreateObjectCallback &request_callback);
/// Process requests in the queue.
///
/// This will try to process as many requests in the queue as possible, in
/// FIFO order. If the first request is not serviceable, this will break and
/// the caller should try again later.
///
/// \return Bad status for the first request in the queue if it failed to be
/// serviced, or OK if all requests were fulfilled.
Status ProcessRequests();
/// Remove all requests that were made by a client that is now disconnected.
///
/// \param client The client that was disconnected.
void RemoveDisconnectedClientRequests(const std::shared_ptr<ClientInterface> &client);
private:
/// Queue of object creation requests to respond to. Requests will be placed
/// on this queue if the object store does not have enough room at the time
/// that the client made the creation request, but space may be made through
/// object spilling. Once the raylet notifies us that objects have been
/// spilled, we will attempt to process these requests again and respond to
/// the client if successful or out of memory. If more objects must be
/// spilled, the request will be replaced at the head of the queue.
/// TODO(swang): We should also queue objects here even if there is no room
/// in the object store. Then, the client does not need to poll on an
/// OutOfMemory error and we can just respond to them once there is enough
/// space made, or after a timeout.
std::list<std::pair<const std::shared_ptr<ClientInterface>, const CreateObjectCallback>> queue_;
};
} // namespace plasma
+43 -27
View File
@@ -49,6 +49,7 @@
#include "ray/object_manager/plasma/malloc.h"
#include "ray/object_manager/plasma/plasma_allocator.h"
#include "ray/object_manager/plasma/protocol.h"
#include "ray/util/asio_util.h"
#include "ray/util/util.h"
#ifdef PLASMA_CUDA
@@ -61,6 +62,18 @@ using arrow::cuda::CudaDeviceManager;
namespace fb = plasma::flatbuf;
namespace {
/// Read the object ID out of a serialized PlasmaCreateRequest message.
///
/// \param message The raw bytes of the create request.
/// \return The object ID stored in the request.
ray::ObjectID GetCreateRequestObjectId(const std::vector<uint8_t> &message) {
  // The buffer is only read; the mutable pointer type mirrors what the
  // original code handed to the flatbuffer helpers.
  auto *data = const_cast<uint8_t *>(message.data());
  const auto *create_request = flatbuffers::GetRoot<fb::PlasmaCreateRequest>(data);
  // NOTE(review): the buffer is validated only through RAY_DCHECK, which is
  // presumably a debug-only check - confirm this is acceptable here.
  RAY_DCHECK(plasma::VerifyFlatbuffer(create_request, data, message.size()));
  return ray::ObjectID::FromBinary(create_request->object_id()->str());
}
}
namespace plasma {
struct GetRequest {
@@ -113,7 +126,8 @@ PlasmaStore::PlasmaStore(boost::asio::io_service &main_service, std::string dire
socket_(main_service),
eviction_policy_(&store_info_, PlasmaAllocator::GetFootprintLimit()),
external_store_(external_store),
spill_objects_callback_(spill_objects_callback) {
spill_objects_callback_(spill_objects_callback),
create_request_queue_() {
store_info_.directory = directory;
store_info_.hugepages_enabled = hugepages_enabled;
#ifdef PLASMA_CUDA
@@ -278,6 +292,12 @@ Status PlasmaStore::HandleCreateObjectRequest(const std::shared_ptr<Client> &cli
if (error_code == PlasmaError::TransientOutOfMemory) {
RAY_LOG(DEBUG) << "Create object " << object_id << " failed, waiting for object spill";
status = Status::TransientObjectStoreFull("Object store full, queueing creation request");
} else if (error_code == PlasmaError::OutOfMemory) {
RAY_LOG(ERROR) << "Not enough memory to create the object " << object_id
<< ", data_size=" << data_size
<< ", metadata_size=" << metadata_size
<< ", will send a reply of PlasmaError::OutOfMemory";
RAY_RETURN_NOT_OK(SendCreateReply(client, object_id, &object, error_code, /*mmap_size=*/0));
} else {
int64_t mmap_size = 0;
if (error_code == PlasmaError::OK && device_num == 0) {
@@ -321,12 +341,6 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id,
pointer =
AllocateMemory(total_size, evict_if_full, &fd, &map_size, &offset, client, true, &error);
if (!pointer) {
if (error == PlasmaError::OutOfMemory) {
RAY_LOG(ERROR) << "Not enough memory to create the object " << object_id.Hex()
<< ", data_size=" << data_size
<< ", metadata_size=" << metadata_size
<< ", will send a reply of PlasmaError::OutOfMemory";
}
return error;
}
} else {
@@ -624,6 +638,7 @@ int PlasmaStore::RemoveFromClientObjectIds(const ObjectID& object_id,
// If no more clients are using this object, notify the eviction policy
// that the object is no longer being used.
if (entry->ref_count == 0) {
RAY_LOG(DEBUG) << "Releasing object no longer in use " << object_id;
if (deletion_cache_.count(object_id) == 0) {
// Tell the eviction policy that this object is no longer being used.
eviction_policy_.EndObjectAccess(object_id);
@@ -859,13 +874,7 @@ void PlasmaStore::DisconnectClient(const std::shared_ptr<Client> &client) {
notification_clients_.erase(client);
}
for (auto it = create_request_queue_.begin(); it != create_request_queue_.end(); ) {
if (it->first == client) {
it = create_request_queue_.erase(it);
} else {
it++;
}
}
create_request_queue_.RemoveDisconnectedClientRequests(client);
}
/// Send notifications about sealed objects to the subscribers. This is called
@@ -963,7 +972,10 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr<Client> &client,
// Process the different types of requests.
switch (type) {
case fb::MessageType::PlasmaCreateRequest: {
create_request_queue_.push_back({client, message});
RAY_LOG(DEBUG) << "Received create request for object " << GetCreateRequestObjectId(message);
create_request_queue_.AddRequest(client, [this, client, message]() {
return HandleCreateObjectRequest(client, message);
});
ProcessCreateRequests();
} break;
case fb::MessageType::PlasmaAbortRequest: {
@@ -1059,18 +1071,22 @@ void PlasmaStore::DoAccept() {
}
void PlasmaStore::ProcessCreateRequests() {
for (auto request_it = create_request_queue_.begin();
request_it != create_request_queue_.end(); ) {
auto status = HandleCreateObjectRequest(request_it->first, request_it->second);
if (status.IsTransientObjectStoreFull()) {
// The object store is still full.
// NOTE(swang): There could be other requests behind this one that are
// actually serviceable. This may be inefficient, but eventually this
// request will get served and unblock the following requests, once
// enough objects have been spilled.
break;
}
request_it = create_request_queue_.erase(request_it);
// Only try to process requests if the timer is not set. If the timer is set,
// that means that the first request is currently not serviceable because
// there is not enough memory. In that case, we should wait for the timer to
// expire before trying any requests again.
if (create_timer_) {
return;
}
auto status = create_request_queue_.ProcessRequests();
if (status.IsTransientObjectStoreFull()) {
// Try to process requests later, after space has been made.
create_timer_ = execute_after(io_context_, [this]() {
RAY_LOG(DEBUG) << "OOM timer finished, retrying create requests";
create_timer_ = nullptr;
ProcessCreateRequests();
}, delay_on_transient_oom_ms_);
}
}
+21 -24
View File
@@ -24,12 +24,14 @@
#include <unordered_set>
#include <vector>
#include "ray/common/ray_config.h"
#include "ray/common/status.h"
#include "ray/object_manager/common.h"
#include "ray/object_manager/format/object_manager_generated.h"
#include "ray/object_manager/notification/object_store_notification_manager.h"
#include "ray/object_manager/plasma/common.h"
#include "ray/object_manager/plasma/connection.h"
#include "ray/object_manager/plasma/create_request_queue.h"
#include "ray/object_manager/plasma/external_store.h"
#include "ray/object_manager/plasma/plasma.h"
#include "ray/object_manager/plasma/protocol.h"
@@ -99,11 +101,14 @@ class PlasmaStore {
/// - PlasmaError::TransientOutOfMemory, if the store is temporarily out of
/// memory but there may be space soon to create the object. In this
/// case, the client should not call plasma_release.
PlasmaError CreateObject(const ObjectID& object_id, const NodeID& owner_raylet_id,
const std::string& owner_ip_address, int owner_port,
const WorkerID& owner_worker_id, bool evict_if_full,
int64_t data_size, int64_t metadata_size, int device_num,
const std::shared_ptr<Client> &client, PlasmaObject* result);
PlasmaError CreateObject(const ObjectID& object_id,
const NodeID& owner_raylet_id,
const std::string& owner_ip_address,
int owner_port, const WorkerID& owner_worker_id,
bool evict_if_full, int64_t data_size,
int64_t metadata_size, int device_num,
const std::shared_ptr<Client> &client,
PlasmaObject* result);
/// Abort a created but unsealed object. If the client is not the
/// creator, then the abort will fail.
@@ -200,12 +205,6 @@ class PlasmaStore {
}
/// Process queued requests to create an object.
///
/// The queue is processed FIFO.
///
/// \param num_bytes_space A lower bound on the number of bytes of space that
/// have been made newly available, since the last time this method was
/// called.
void ProcessCreateRequests();
private:
@@ -284,19 +283,17 @@ class PlasmaStore {
/// complete.
ray::SpillObjectsCallback spill_objects_callback_;
/// Queue of object creation requests to respond to. Requests will be placed
/// on this queue if the object store does not have enough room at the time
/// that the client made the creation request, but space may be made through
/// object spilling. Once the raylet notifies us that objects have been
/// spilled, we will attempt to process these requests again and respond to
/// the client if successful or out of memory. If more objects must be
/// spilled, the request will be replaced at the head of the queue.
/// TODO(swang): We should also queue objects here even if there is no room
/// in the object store. Then, the client does not need to poll on an
/// OutOfMemory error and we can just respond to them once there is enough
/// space made, or after a timeout.
std::list<std::pair<const std::shared_ptr<Client>,
const std::vector<uint8_t>>> create_request_queue_;
/// The amount of time to wait before retrying a creation request after a
/// transient OOM error.
const uint32_t delay_on_transient_oom_ms_ = 10;
/// A timer that is set when the first request in the queue is not
/// serviceable because there is not enough memory. The request will be
/// retried when this timer expires.
std::shared_ptr<boost::asio::deadline_timer> create_timer_;
/// Queue of object creation requests.
CreateRequestQueue create_request_queue_;
};
} // namespace plasma
@@ -0,0 +1,79 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/object_manager/plasma/create_request_queue.h"
#include "gtest/gtest.h"
#include "ray/common/status.h"
namespace plasma {
/// Stand-in client for the tests below. It adds nothing to ClientInterface and
/// only serves as the client attached to queued requests.
class MockClient : public ClientInterface {
 public:
  MockClient() = default;
};
// A single request that succeeds: it must not run when it is added, only when
// the queue is processed.
TEST(CreateRequestQueueTest, TestSimple) {
  CreateRequestQueue queue;
  auto client = std::make_shared<MockClient>();

  bool created = false;
  queue.AddRequest(client, [&created]() {
    created = true;
    return Status();
  });
  // Adding the request must not invoke the callback.
  ASSERT_FALSE(created);

  const Status result = queue.ProcessRequests();
  ASSERT_TRUE(result.ok());
  ASSERT_TRUE(created);
}
// A head request that reports a transient OOM must block the request queued
// behind it until the head request succeeds.
TEST(CreateRequestQueueTest, TestTransientOom) {
  CreateRequestQueue queue;
  auto client = std::make_shared<MockClient>();

  int num_created = 0;
  Status head_status = Status::TransientObjectStoreFull("");

  // Head request: reports whatever head_status currently holds and only
  // counts as created once that status is OK.
  queue.AddRequest(client, [&]() {
    if (head_status.ok()) {
      ++num_created;
    }
    return head_status;
  });
  // Second request: always succeeds, but is stuck behind the head request.
  queue.AddRequest(client, [&]() {
    ++num_created;
    return Status();
  });

  // While the head request is transiently OOM, every attempt reports that
  // status and neither request gets created.
  for (int attempt = 0; attempt < 3; ++attempt) {
    ASSERT_TRUE(queue.ProcessRequests().IsTransientObjectStoreFull());
    ASSERT_EQ(num_created, 0);
  }

  // Once the head request returns OK, the request behind it is served too.
  head_status = Status();
  ASSERT_TRUE(queue.ProcessRequests().ok());
  ASSERT_EQ(num_created, 2);
}
} // namespace plasma
/// Test entry point: hands the command line to GoogleTest and returns the
/// result of running all registered tests.
int main(int argc, char **argv) {
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}