Modernize plasma store (C to C++ changes). (#546)

This commit is contained in:
Philipp Moritz
2017-05-15 01:19:44 -07:00
committed by Robert Nishihara
parent e2e9e4ce6f
commit 08e988aee5
14 changed files with 794 additions and 738 deletions
+1
View File
@@ -341,6 +341,7 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
"--track-origins=yes",
"--leak-check=full",
"--show-leak-kinds=all",
"--leak-check-heuristics=stdstring",
"--error-exitcode=1"] + command,
stdout=stdout_file, stderr=stderr_file)
time.sleep(1.0)
-3
View File
@@ -14,7 +14,6 @@
#include <execinfo.h>
#endif
#include "utarray.h"
#ifdef __cplusplus
#include <functional>
extern "C" {
@@ -145,8 +144,6 @@ extern "C" {
typedef struct { unsigned char id[UNIQUE_ID_SIZE]; } UniqueID;
extern const UT_icd object_id_icd;
extern const UniqueID NIL_ID;
/* Generate a globally unique ID. */
+30
View File
@@ -384,6 +384,36 @@ disconnected:
return 0;
}
int64_t read_vector(int fd, int64_t *type, std::vector<uint8_t> &buffer) {
int64_t version;
int closed = read_bytes(fd, (uint8_t *) &version, sizeof(version));
if (closed) {
goto disconnected;
}
CHECK(version == RAY_PROTOCOL_VERSION);
int64_t length;
closed = read_bytes(fd, (uint8_t *) type, sizeof(*type));
if (closed) {
goto disconnected;
}
closed = read_bytes(fd, (uint8_t *) &length, sizeof(length));
if (closed) {
goto disconnected;
}
if (length > buffer.size()) {
buffer.resize(length);
}
closed = read_bytes(fd, buffer.data(), length);
if (closed) {
goto disconnected;
}
return length;
disconnected:
/* Handle the case in which the socket is closed. */
*type = DISCONNECT_CLIENT;
return 0;
}
void write_log_message(int fd, const char *message) {
/* Account for the \0 at the end of the string. */
write_message(fd, LOG_MESSAGE, strlen(message) + 1, (uint8_t *) message);
+18
View File
@@ -4,6 +4,7 @@
#include <stdbool.h>
#include <stdint.h>
#include <vector>
#include "utarray.h"
#define RAY_PROTOCOL_VERSION 0x0000000000000000
@@ -187,6 +188,23 @@ uint8_t *read_message_async(event_loop *loop, int sock);
*/
int64_t read_buffer(int fd, int64_t *type, UT_array *buffer);
/**
* Read a sequence of bytes written by write_message from a file descriptor.
* This does not allocate space for the message if the provided buffer is
* large enough and can therefore often avoid allocations.
*
* @param fd The file descriptor to read from. It can be non-blocking.
* @param type The type of the message that is read will be written at this
* address. If there was an error while reading, this will be
* DISCONNECT_CLIENT.
* @param buffer The array the message will be written to. If it is not
* large enough to hold the message, it will be enlarged by read_vector.
* @return Number of bytes of the message that were read. This size does not
* include the bytes used to encode the type and length. If there was
* an error while reading, this will be 0.
*/
int64_t read_vector(int fd, int64_t *type, std::vector<uint8_t> &buffer);
/**
* Write a null-terminated string to a file descriptor.
*/
+2
View File
@@ -70,6 +70,7 @@ set_source_files_properties(thirdparty/dlmalloc.c PROPERTIES COMPILE_FLAGS -Wno-
add_executable(plasma_store
plasma_store.cc
plasma.cc
plasma_events.cc
plasma_protocol.cc
eviction_policy.cc
fling.c
@@ -82,6 +83,7 @@ target_link_libraries(plasma_store common ${FLATBUFFERS_STATIC_LIB})
add_library(plasma_lib STATIC
plasma_client.cc
plasma.cc
plasma_events.cc
plasma_protocol.cc
fling.c
thirdparty/xxhash.c)
+57 -119
View File
@@ -1,156 +1,94 @@
#include "eviction_policy.h"
#include <list>
#include <unordered_map>
class LRUCache {
private:
/** A doubly-linked list containing the items in the cache and
* their sizes in LRU order. */
typedef std::list<std::pair<ObjectID, int64_t>> ItemList;
ItemList item_list_;
/** A hash table mapping the object ID of an object in the cache to its
* location in the doubly linked list item_list_. */
std::unordered_map<ObjectID, ItemList::iterator, UniqueIDHasher> item_map_;
public:
LRUCache(){};
void add(const ObjectID &key, int64_t size) {
auto it = item_map_.find(key);
CHECK(it == item_map_.end());
item_list_.push_front(std::make_pair(key, size));
item_map_.insert(std::make_pair(key, item_list_.begin()));
}
void remove(const ObjectID &key) {
auto it = item_map_.find(key);
CHECK(it != item_map_.end());
item_list_.erase(it->second);
item_map_.erase(it);
}
int64_t choose_objects_to_evict(int64_t num_bytes_required,
std::vector<ObjectID> &objects_to_evict) {
int64_t bytes_evicted = 0;
auto it = item_list_.end();
while (bytes_evicted < num_bytes_required && it != item_list_.begin()) {
it--;
objects_to_evict.push_back(it->first);
bytes_evicted += it->second;
}
return bytes_evicted;
}
};
/** The part of the Plasma state that is maintained by the eviction policy. */
struct EvictionState {
/** The amount of memory (in bytes) currently being used. */
int64_t memory_used;
/** Datastructure for the LRU cache. */
LRUCache cache;
};
EvictionState *EvictionState_init() {
EvictionState *s = new EvictionState();
s->memory_used = 0;
return s;
void LRUCache::add(const ObjectID &key, int64_t size) {
auto it = item_map_.find(key);
CHECK(it == item_map_.end());
/* Note that it is important to use a list so the iterators stay valid. */
item_list_.emplace_front(key, size);
item_map_.emplace(key, item_list_.begin());
}
void EvictionState_free(EvictionState *s) {
delete s;
void LRUCache::remove(const ObjectID &key) {
auto it = item_map_.find(key);
CHECK(it != item_map_.end());
item_list_.erase(it->second);
item_map_.erase(it);
}
int64_t EvictionState_choose_objects_to_evict(
EvictionState *eviction_state,
PlasmaStoreInfo *plasma_store_info,
int64_t LRUCache::choose_objects_to_evict(
int64_t num_bytes_required,
int64_t *num_objects_to_evict,
ObjectID **objects_to_evict) {
std::vector<ObjectID> objs_to_evict;
int64_t bytes_evicted = eviction_state->cache.choose_objects_to_evict(
num_bytes_required, objs_to_evict);
/* Update the LRU cache. */
for (auto &object_id : objs_to_evict) {
eviction_state->cache.remove(object_id);
std::vector<ObjectID> &objects_to_evict) {
int64_t bytes_evicted = 0;
auto it = item_list_.end();
while (bytes_evicted < num_bytes_required && it != item_list_.begin()) {
it--;
objects_to_evict.push_back(it->first);
bytes_evicted += it->second;
}
/* Construct the return values. */
*num_objects_to_evict = objs_to_evict.size();
if (objs_to_evict.size() == 0) {
*objects_to_evict = NULL;
} else {
int64_t result_size = objs_to_evict.size() * sizeof(ObjectID);
*objects_to_evict = (ObjectID *) malloc(result_size);
memcpy(*objects_to_evict, objs_to_evict.data(), result_size);
}
/* Update the number of bytes used. */
eviction_state->memory_used -= bytes_evicted;
return bytes_evicted;
}
void EvictionState_object_created(EvictionState *eviction_state,
PlasmaStoreInfo *plasma_store_info,
ObjectID object_id) {
ObjectTableEntry *entry = plasma_store_info->objects[object_id];
eviction_state->cache.add(object_id,
entry->info.data_size + entry->info.metadata_size);
EvictionPolicy::EvictionPolicy(PlasmaStoreInfo *store_info)
: memory_used_(0), store_info_(store_info) {}
int64_t EvictionPolicy::choose_objects_to_evict(
int64_t num_bytes_required,
std::vector<ObjectID> &objects_to_evict) {
int64_t bytes_evicted =
cache_.choose_objects_to_evict(num_bytes_required, objects_to_evict);
/* Update the LRU cache. */
for (auto &object_id : objects_to_evict) {
cache_.remove(object_id);
}
/* Update the number of bytes used. */
memory_used_ -= bytes_evicted;
return bytes_evicted;
}
bool EvictionState_require_space(EvictionState *eviction_state,
PlasmaStoreInfo *plasma_store_info,
int64_t size,
int64_t *num_objects_to_evict,
ObjectID **objects_to_evict) {
void EvictionPolicy::object_created(ObjectID object_id) {
auto entry = store_info_->objects[object_id].get();
cache_.add(object_id, entry->info.data_size + entry->info.metadata_size);
}
bool EvictionPolicy::require_space(int64_t size,
std::vector<ObjectID> &objects_to_evict) {
/* Check if there is enough space to create the object. */
int64_t required_space =
eviction_state->memory_used + size - plasma_store_info->memory_capacity;
int64_t required_space = memory_used_ + size - store_info_->memory_capacity;
int64_t num_bytes_evicted;
if (required_space > 0) {
/* Try to free up at least as much space as we need right now but ideally
* up to 20% of the total capacity. */
int64_t space_to_free = MAX(size, plasma_store_info->memory_capacity / 5);
int64_t space_to_free = MAX(size, store_info_->memory_capacity / 5);
LOG_DEBUG("not enough space to create this object, so evicting objects");
/* Choose some objects to evict, and update the return pointers. */
num_bytes_evicted = EvictionState_choose_objects_to_evict(
eviction_state, plasma_store_info, space_to_free, num_objects_to_evict,
objects_to_evict);
num_bytes_evicted =
choose_objects_to_evict(space_to_free, objects_to_evict);
LOG_INFO(
"There is not enough space to create this object, so evicting "
"%" PRId64 " objects to free up %" PRId64 " bytes.",
*num_objects_to_evict, num_bytes_evicted);
"%zu objects to free up %" PRId64 " bytes.",
objects_to_evict.size(), num_bytes_evicted);
} else {
num_bytes_evicted = 0;
*num_objects_to_evict = 0;
*objects_to_evict = NULL;
}
if (num_bytes_evicted >= required_space) {
/* We only increment the space used if there is enough space to create the
* object. */
eviction_state->memory_used += size;
memory_used_ += size;
}
return num_bytes_evicted >= required_space;
}
void EvictionState_begin_object_access(EvictionState *eviction_state,
PlasmaStoreInfo *plasma_store_info,
ObjectID object_id,
int64_t *num_objects_to_evict,
ObjectID **objects_to_evict) {
void EvictionPolicy::begin_object_access(
ObjectID object_id,
std::vector<ObjectID> &objects_to_evict) {
/* If the object is in the LRU cache, remove it. */
eviction_state->cache.remove(object_id);
*num_objects_to_evict = 0;
*objects_to_evict = NULL;
cache_.remove(object_id);
}
void EvictionState_end_object_access(EvictionState *eviction_state,
PlasmaStoreInfo *plasma_store_info,
ObjectID object_id,
int64_t *num_objects_to_evict,
ObjectID **objects_to_evict) {
ObjectTableEntry *entry = plasma_store_info->objects[object_id];
void EvictionPolicy::end_object_access(
ObjectID object_id,
std::vector<ObjectID> &objects_to_evict) {
auto entry = store_info_->objects[object_id].get();
/* Add the object to the LRU cache.*/
eviction_state->cache.add(object_id,
entry->info.data_size + entry->info.metadata_size);
*num_objects_to_evict = 0;
*objects_to_evict = NULL;
cache_.add(object_id, entry->info.data_size + entry->info.metadata_size);
}
+106 -128
View File
@@ -1,6 +1,9 @@
#ifndef EVICTION_POLICY_H
#define EVICTION_POLICY_H
#include <list>
#include <unordered_map>
#include "plasma.h"
/* ==== The eviction policy ====
@@ -10,140 +13,115 @@
* Plasma store.
*/
/** Internal state of the eviction policy. */
typedef struct EvictionState EvictionState;
class LRUCache {
private:
/** A doubly-linked list containing the items in the cache and
* their sizes in LRU order. */
typedef std::list<std::pair<ObjectID, int64_t>> ItemList;
ItemList item_list_;
/** A hash table mapping the object ID of an object in the cache to its
* location in the doubly linked list item_list_. */
std::unordered_map<ObjectID, ItemList::iterator, UniqueIDHasher> item_map_;
/**
* Initialize the eviction policy state.
*
* @param system_memory The amount of memory that can be used by the Plasma
* store.
* @return The internal state of the eviction policy.
*/
EvictionState *EvictionState_init(void);
public:
LRUCache(){};
/**
* Free the eviction policy state.
*
* @param state The state managed by the eviction policy.
* @return Void.
*/
void EvictionState_free(EvictionState *state);
void add(const ObjectID &key, int64_t size);
/**
* This method will be called whenever an object is first created in order to
* add it to the LRU cache. This is done so that the first time, the Plasma
* store calls begin_object_access, we can remove the object from the LRU cache.
*
* @param eviction_state The state managed by the eviction policy.
* @param plasma_store_info Information about the Plasma store that is exposed
* to the eviction policy.
* @param obj_id The object ID of the object that was created.
* @return Void.
*/
void EvictionState_object_created(EvictionState *eviction_state,
PlasmaStoreInfo *plasma_store_info,
ObjectID obj_id);
void remove(const ObjectID &key);
/**
* This method will be called when the Plasma store needs more space, perhaps to
* create a new object. If the required amount of space cannot be freed up, then
* a fatal error will be thrown. When this method is called, the eviction policy
* will assume that the objects chosen to be evicted will in fact be evicted
* from the Plasma store by the caller.
*
* @param eviction_state The state managed by the eviction policy.
* @param plasma_store_info Information about the Plasma store that is exposed
* to the eviction policy.
* @param size The size in bytes of the new object, including both data and
* metadata.
* @param num_objects_to_evict The number of objects that are chosen will be
* stored at this address.
* @param objects_to_evict An array of the object IDs that were chosen will be
* stored at this address. If the number of objects chosen is greater
* than 0, then the caller needs to free that array. If it equals 0, then
* the array will be NULL.
* @return True if enough space can be freed and false otherwise.
*/
bool EvictionState_require_space(EvictionState *eviction_state,
PlasmaStoreInfo *plasma_store_info,
int64_t size,
int64_t *num_objects_to_evict,
ObjectID **objects_to_evict);
int64_t choose_objects_to_evict(int64_t num_bytes_required,
std::vector<ObjectID> &objects_to_evict);
};
/**
* This method will be called whenever an unused object in the Plasma store
* starts to be used. When this method is called, the eviction policy will
* assume that the objects chosen to be evicted will in fact be evicted from the
* Plasma store by the caller.
*
* @param eviction_state The state managed by the eviction policy.
* @param plasma_store_info Information about the Plasma store that is exposed
* to the eviction policy.
* @param obj_id The ID of the object that is now being used.
* @param num_objects_to_evict The number of objects that are chosen will be
* stored at this address.
* @param objects_to_evict An array of the object IDs that were chosen will be
* stored at this address. If the number of objects chosen is greater
* than 0, then the caller needs to free that array. If it equals 0, then
* the array will be NULL.
* @return Void.
*/
void EvictionState_begin_object_access(EvictionState *eviction_state,
PlasmaStoreInfo *plasma_store_info,
ObjectID obj_id,
int64_t *num_objects_to_evict,
ObjectID **objects_to_evict);
/** The eviction policy. */
class EvictionPolicy {
public:
/**
* Construct an eviction policy.
*
* @param store_info Information about the Plasma store that is exposed
* to the eviction policy.
*/
EvictionPolicy(PlasmaStoreInfo *store_info);
/**
* This method will be called whenever an object in the Plasma store that was
* being used is no longer being used. When this method is called, the eviction
* policy will assume that the objects chosen to be evicted will in fact be
* evicted from the Plasma store by the caller.
*
* @param eviction_state The state managed by the eviction policy.
* @param plasma_store_info Information about the Plasma store that is exposed
* to the eviction policy.
* @param obj_id The ID of the object that is no longer being used.
* @param num_objects_to_evict The number of objects that are chosen will be
* stored at this address.
* @param objects_to_evict An array of the object IDs that were chosen will be
* stored at this address. If the number of objects chosen is greater
* than 0, then the caller needs to free that array. If it equals 0, then
* the array will be NULL.
* @return Void.
*/
void EvictionState_end_object_access(EvictionState *eviction_state,
PlasmaStoreInfo *plasma_store_info,
ObjectID obj_id,
int64_t *num_objects_to_evict,
ObjectID **objects_to_evict);
/**
* This method will be called whenever an object is first created in order to
* add it to the LRU cache. This is done so that the first time, the Plasma
* store calls begin_object_access, we can remove the object from the LRU
* cache.
*
* @param object_id The object ID of the object that was created.
* @return Void.
*/
void object_created(ObjectID object_id);
/**
* Choose some objects to evict from the Plasma store. When this method is
* called, the eviction policy will assume that the objects chosen to be evicted
* will in fact be evicted from the Plasma store by the caller.
*
* @note This method is not part of the API. It is exposed in the header file
* only for testing.
*
* @param eviction_state The state managed by the eviction policy.
* @param plasma_store_info Information about the Plasma store that is exposed
* to the eviction policy.
* @param num_bytes_required The number of bytes of space to try to free up.
* @param num_objects_to_evict The number of objects that are chosen will be
* stored at this address.
* @param objects_to_evict An array of the object IDs that were chosen will be
* stored at this address. If the number of objects chosen is greater
* than 0, then the caller needs to free that array. If it equals 0, then
* the array will be NULL.
* @return The total number of bytes of space chosen to be evicted.
*/
int64_t EvictionState_choose_objects_to_evict(
EvictionState *eviction_state,
PlasmaStoreInfo *plasma_store_info,
int64_t num_bytes_required,
int64_t *num_objects_to_evict,
ObjectID **objects_to_evict);
/**
* This method will be called when the Plasma store needs more space, perhaps
* to create a new object. If the required amount of space cannot be freed up,
* then a fatal error will be thrown. When this method is called, the eviction
* policy will assume that the objects chosen to be evicted will in fact be
* evicted from the Plasma store by the caller.
*
* @param size The size in bytes of the new object, including both data and
* metadata.
* @param objects_to_evict The object IDs that were chosen for eviction will
* be stored into this vector.
* @return True if enough space can be freed and false otherwise.
*/
bool require_space(int64_t size, std::vector<ObjectID> &objects_to_evict);
/**
* This method will be called whenever an unused object in the Plasma store
* starts to be used. When this method is called, the eviction policy will
* assume that the objects chosen to be evicted will in fact be evicted from
* the Plasma store by the caller.
*
* @param object_id The ID of the object that is now being used.
* @param objects_to_evict The object IDs that were chosen for eviction will
* be stored into this vector.
* @return Void.
*/
void begin_object_access(ObjectID object_id,
std::vector<ObjectID> &objects_to_evict);
/**
* This method will be called whenever an object in the Plasma store that was
* being used is no longer being used. When this method is called, the
* eviction policy will assume that the objects chosen to be evicted will in
* fact be evicted from the Plasma store by the caller.
*
* @param object_id The ID of the object that is no longer being used.
* @param objects_to_evict The object IDs that were chosen for eviction will
* be stored into this vector.
* @return Void.
*/
void end_object_access(ObjectID object_id,
std::vector<ObjectID> &objects_to_evict);
/**
* Choose some objects to evict from the Plasma store. When this method is
* called, the eviction policy will assume that the objects chosen to be
* evicted will in fact be evicted from the Plasma store by the caller.
*
* @note This method is not part of the API. It is exposed in the header file
* only for testing.
*
* @param num_bytes_required The number of bytes of space to try to free up.
* @param objects_to_evict The object IDs that were chosen for eviction will
* be stored into this vector.
* @return The total number of bytes of space chosen to be evicted.
*/
int64_t choose_objects_to_evict(int64_t num_bytes_required,
std::vector<ObjectID> &objects_to_evict);
private:
/** Pointer to the plasma store info. */
PlasmaStoreInfo *store_info_;
/** The amount of memory (in bytes) currently being used. */
int64_t memory_used_;
/** Datastructure for the LRU cache. */
LRUCache cache_;
};
#endif /* EVICTION_POLICY_H */
+1 -1
View File
@@ -47,5 +47,5 @@ ObjectTableEntry *get_object_table_entry(PlasmaStoreInfo *store_info,
if (it == store_info->objects.end()) {
return NULL;
}
return it->second;
return it->second.get();
}
+7 -8
View File
@@ -10,15 +10,13 @@
#include <unistd.h> /* pid_t */
#include <unordered_map>
#include <unordered_set>
#include "common.h"
#include "format/common_generated.h"
#include <inttypes.h>
#include "utarray.h"
#include "uthash.h"
/** Allocation granularity used in plasma for object allocation. */
#define BLOCK_SIZE 64
@@ -108,12 +106,10 @@ struct ObjectTableEntry {
int64_t map_size;
/** Offset from the base of the mmap. */
ptrdiff_t offset;
/** Handle for the uthash table. */
UT_hash_handle handle;
/** Pointer to the object data. Needed to free the object. */
uint8_t *pointer;
/** An array of the clients that are currently using this object. */
std::vector<Client *> clients;
/** Set of clients currently using this object. */
std::unordered_set<Client *> clients;
/** The state of the object, e.g., whether it is open or sealed. */
object_state state;
/** The digest of the object. Used to see if two objects are the same. */
@@ -123,7 +119,10 @@ struct ObjectTableEntry {
/** The plasma store information that is exposed to the eviction policy. */
struct PlasmaStoreInfo {
/** Objects that are in the Plasma store. */
std::unordered_map<ObjectID, ObjectTableEntry *, UniqueIDHasher> objects;
std::unordered_map<ObjectID,
std::unique_ptr<ObjectTableEntry>,
UniqueIDHasher>
objects;
/** The amount of memory (in bytes) that we allow to be allocated in the
* store. */
int64_t memory_capacity;
+74
View File
@@ -0,0 +1,74 @@
#include "plasma_events.h"
#include <errno.h>
void EventLoop::file_event_callback(aeEventLoop *loop,
int fd,
void *context,
int events) {
FileCallback *callback = reinterpret_cast<FileCallback *>(context);
(*callback)(events);
}
int EventLoop::timer_event_callback(aeEventLoop *loop,
long long timer_id,
void *context) {
TimerCallback *callback = reinterpret_cast<TimerCallback *>(context);
return (*callback)(timer_id);
}
constexpr int kInitialEventLoopSize = 1024;
EventLoop::EventLoop() {
loop_ = aeCreateEventLoop(kInitialEventLoopSize);
}
bool EventLoop::add_file_event(int fd, int events, FileCallback callback) {
if (file_callbacks_.find(fd) != file_callbacks_.end()) {
return false;
}
auto data = std::unique_ptr<FileCallback>(new FileCallback(callback));
void *context = reinterpret_cast<void *>(data.get());
/* Try to add the file descriptor. */
int err = aeCreateFileEvent(loop_, fd, events, EventLoop::file_event_callback,
context);
/* If it cannot be added, increase the size of the event loop. */
if (err == AE_ERR && errno == ERANGE) {
err = aeResizeSetSize(loop_, 3 * aeGetSetSize(loop_) / 2);
if (err != AE_OK) {
return false;
}
err = aeCreateFileEvent(loop_, fd, events, EventLoop::file_event_callback,
context);
}
/* In any case, test if there were errors. */
if (err == AE_OK) {
file_callbacks_.emplace(fd, std::move(data));
return true;
}
return false;
}
void EventLoop::remove_file_event(int fd) {
aeDeleteFileEvent(loop_, fd, AE_READABLE | AE_WRITABLE);
file_callbacks_.erase(fd);
}
void EventLoop::run() {
aeMain(loop_);
}
int64_t EventLoop::add_timer(int64_t timeout, TimerCallback callback) {
auto data = std::unique_ptr<TimerCallback>(new TimerCallback(callback));
void *context = reinterpret_cast<void *>(data.get());
int64_t timer_id = aeCreateTimeEvent(
loop_, timeout, EventLoop::timer_event_callback, context, NULL);
timer_callbacks_.emplace(timer_id, std::move(data));
return timer_id;
}
int EventLoop::remove_timer(int64_t timer_id) {
int err = aeDeleteTimeEvent(loop_, timer_id);
timer_callbacks_.erase(timer_id);
return err;
}
+96
View File
@@ -0,0 +1,96 @@
#ifndef PLASMA_EVENTS
#define PLASMA_EVENTS
#include <functional>
#include <memory>
#include <unordered_map>
extern "C" {
#include "ae/ae.h"
}
/** Constant specifying that the timer is done and it will be removed. */
constexpr int kEventLoopTimerDone = AE_NOMORE;
/** Read event on the file descriptor. */
constexpr int kEventLoopRead = AE_READABLE;
/** Write event on the file descriptor. */
constexpr int kEventLoopWrite = AE_WRITABLE;
class EventLoop {
public:
/* Signature of the handler that will be called when there is a new event
* on the file descriptor that this handler has been registered for.
*
* The arguments are the event flags (read or write).
*/
typedef std::function<void(int)> FileCallback;
/* This handler will be called when a timer times out. The timer id is
* passed as an argument. The return is the number of milliseconds the timer
* shall be reset to or kEventLoopTimerDone if the timer shall not be
* triggered again.
*/
typedef std::function<int(int64_t)> TimerCallback;
EventLoop();
/**
* Add a new file event handler to the event loop.
*
* @param fd The file descriptor we are listening to.
* @param events The flags for events we are listening to (read or write).
* @param callback The callback that will be called when the event happens.
* @return Returns true if the event handler was added successfully.
*/
bool add_file_event(int fd, int events, FileCallback callback);
/**
* Remove a file event handler from the event loop.
*
* @param fd The file descriptor of the event handler.
* @return Void.
*/
void remove_file_event(int fd);
/** Register a handler that will be called after a time slice of
* "timeout" milliseconds.
*
* @param timeout The timeout in milliseconds.
* @param callback The callback for the timeout.
* @return The ID of the newly created timer.
*/
int64_t add_timer(int64_t timeout, TimerCallback callback);
/**
* Remove a timer handler from the event loop.
*
* @param timer_id The ID of the timer that is to be removed.
* @return The ae.c error code. TODO(pcm): needs to be standardized
*/
int remove_timer(int64_t timer_id);
/**
* Run the event loop.
*
* @return Void.
*/
void run();
private:
static void file_event_callback(aeEventLoop *loop,
int fd,
void *context,
int events);
static int timer_event_callback(aeEventLoop *loop,
long long timer_id,
void *context);
aeEventLoop *loop_;
std::unordered_map<int, std::unique_ptr<FileCallback>> file_callbacks_;
std::unordered_map<int64_t, std::unique_ptr<TimerCallback>> timer_callbacks_;
};
#endif /* PLASMA_EVENTS */
+1
View File
@@ -2,6 +2,7 @@
#define PLASMA_MANAGER_H
#include <poll.h>
#include "uthash.h"
#include "utarray.h"
#ifndef RAY_NUM_RETRIES
+244 -391
View File
@@ -30,15 +30,11 @@
#include <unordered_set>
#include <vector>
#include "common.h"
#include "event_loop.h"
#include "eviction_policy.h"
#include "plasma_store.h"
#include "format/common_generated.h"
#include "io.h"
#include "malloc.h"
#include "plasma_protocol.h"
#include "plasma_store.h"
#include "plasma.h"
extern "C" {
#include "fling.h"
@@ -48,26 +44,10 @@ void dlfree(void *);
size_t dlmalloc_set_footprint_limit(size_t bytes);
}
/** Contains all information that is associated with a Plasma store client. */
struct Client {
Client(int sock, PlasmaStoreState *plasma_state);
/** The socket used to communicate with the client. */
int sock;
/** A pointer to the global plasma state. */
PlasmaStoreState *plasma_state;
};
struct NotificationQueue {
/** The object notifications for clients. We notify the client about the
* objects in the order that the objects were sealed or deleted. */
std::deque<uint8_t *> object_notifications;
};
struct GetRequest {
GetRequest(Client *client, int num_object_ids, ObjectID object_ids[]);
GetRequest(Client *client, const std::vector<ObjectID> &object_ids);
/** The client connection that called get. */
/** The client that called get. */
Client *client;
/** The ID of the timer that will time out and cause this wait to return to
* the client if it hasn't already returned. */
@@ -84,72 +64,28 @@ struct GetRequest {
int64_t num_satisfied;
};
struct PlasmaStoreState {
PlasmaStoreState(event_loop *loop, int64_t system_memory);
/* Event loop of the plasma store. */
event_loop *loop;
/** A hash table mapping object IDs to a vector of the get requests that are
* waiting for the object to arrive. */
std::unordered_map<ObjectID, std::vector<GetRequest *>, UniqueIDHasher>
object_get_requests;
/** The pending notifications that have not been sent to subscribers because
* the socket send buffers were full. This is a hash table from client file
* descriptor to an array of object_ids to send to that client.
* TODO(pcm): Consider putting this into the Client data structure and
* reorganize the code slightly. */
std::unordered_map<int, NotificationQueue> pending_notifications;
/** The plasma store information, including the object tables, that is exposed
* to the eviction policy. */
PlasmaStoreInfo *plasma_store_info;
/** The state that is managed by the eviction policy. */
EvictionState *eviction_state;
/** Input buffer. This is allocated only once to avoid mallocs for every
* call to process_message. */
UT_array *input_buffer;
/** Buffer that holds memory for serializing plasma protocol messages. */
protocol_builder *builder;
};
PlasmaStoreState *g_state;
UT_icd byte_icd = {sizeof(uint8_t), NULL, NULL, NULL};
Client::Client(int sock, PlasmaStoreState *plasma_state)
: sock(sock), plasma_state(plasma_state) {}
GetRequest::GetRequest(Client *client,
int num_object_ids,
ObjectID object_ids[])
GetRequest::GetRequest(Client *client, const std::vector<ObjectID> &object_ids)
: client(client),
timer(-1),
object_ids(object_ids, object_ids + num_object_ids),
objects(num_object_ids),
object_ids(object_ids.begin(), object_ids.end()),
objects(object_ids.size()),
num_satisfied(0) {
std::unordered_set<ObjectID, UniqueIDHasher> unique_ids(
object_ids, object_ids + num_object_ids);
std::unordered_set<ObjectID, UniqueIDHasher> unique_ids(object_ids.begin(),
object_ids.end());
num_objects_to_wait_for = unique_ids.size();
}
PlasmaStoreState::PlasmaStoreState(event_loop *loop, int64_t system_memory)
: loop(loop),
plasma_store_info(new PlasmaStoreInfo()),
eviction_state(EvictionState_init()),
builder(make_protocol_builder()) {
this->plasma_store_info->memory_capacity = system_memory;
Client::Client(int fd) : fd(fd) {}
utarray_new(this->input_buffer, &byte_icd);
PlasmaStore::PlasmaStore(EventLoop *loop, int64_t system_memory)
: loop_(loop),
eviction_policy_(&store_info_),
builder_(make_protocol_builder()) {
store_info_.memory_capacity = system_memory;
}
void PlasmaStoreState_free(PlasmaStoreState *state) {
/* Here we only clean up objects that need to be cleaned
* up to make the valgrind warnings go away. Objects that
* are still reachable are not cleaned up. */
for (const auto &it : state->plasma_store_info->objects) {
delete it.second;
}
for (const auto &element : state->pending_notifications) {
PlasmaStore::~PlasmaStore() {
for (const auto &element : pending_notifications_) {
auto object_notifications = element.second.object_notifications;
for (int i = 0; i < object_notifications.size(); ++i) {
uint8_t *notification = (uint8_t *) object_notifications.at(i);
@@ -157,47 +93,37 @@ void PlasmaStoreState_free(PlasmaStoreState *state) {
free(data);
}
}
free_protocol_builder(builder_);
}
void push_notification(PlasmaStoreState *state,
ObjectInfoT *object_notification);
/* If this client is not already using the object, add the client to the
* object's list of clients, otherwise do nothing. */
void add_client_to_object_clients(ObjectTableEntry *entry,
Client *client_info) {
void PlasmaStore::add_client_to_object_clients(ObjectTableEntry *entry,
Client *client) {
/* Check if this client is already using the object. */
for (int i = 0; i < entry->clients.size(); ++i) {
if (entry->clients[i] == client_info) {
return;
}
if (entry->clients.find(client) != entry->clients.end()) {
return;
}
/* If there are no other clients using this object, notify the eviction policy
* that the object is being used. */
if (entry->clients.size() == 0) {
/* Tell the eviction policy that this object is being used. */
int64_t num_objects_to_evict;
ObjectID *objects_to_evict;
EvictionState_begin_object_access(
client_info->plasma_state->eviction_state,
client_info->plasma_state->plasma_store_info, entry->object_id,
&num_objects_to_evict, &objects_to_evict);
remove_objects(client_info->plasma_state, num_objects_to_evict,
objects_to_evict);
std::vector<ObjectID> objects_to_evict;
eviction_policy_.begin_object_access(entry->object_id, objects_to_evict);
delete_objects(objects_to_evict);
}
/* Add the client pointer to the list of clients using this object. */
entry->clients.push_back(client_info);
entry->clients.insert(client);
}
/* Create a new object buffer in the hash table. */
int create_object(Client *client_context,
ObjectID obj_id,
int64_t data_size,
int64_t metadata_size,
PlasmaObject *result) {
int PlasmaStore::create_object(ObjectID object_id,
int64_t data_size,
int64_t metadata_size,
Client *client,
PlasmaObject *result) {
LOG_DEBUG("creating object"); /* TODO(pcm): add ObjectID here */
PlasmaStoreState *plasma_state = client_context->plasma_state;
if (plasma_state->plasma_store_info->objects.count(obj_id) != 0) {
if (store_info_.objects.count(object_id) != 0) {
/* There is already an object with the same ID in the Plasma Store, so
* ignore this requst. */
return PlasmaError_ObjectExists;
@@ -216,12 +142,10 @@ int create_object(Client *client_context,
if (pointer == NULL) {
/* Tell the eviction policy how much space we need to create this object.
*/
int64_t num_objects_to_evict;
ObjectID *objects_to_evict;
bool success = EvictionState_require_space(
plasma_state->eviction_state, plasma_state->plasma_store_info,
data_size + metadata_size, &num_objects_to_evict, &objects_to_evict);
remove_objects(plasma_state, num_objects_to_evict, objects_to_evict);
std::vector<ObjectID> objects_to_evict;
bool success = eviction_policy_.require_space(data_size + metadata_size,
objects_to_evict);
delete_objects(objects_to_evict);
/* Return an error to the client if not enough space could be freed to
* create the object. */
if (!success) {
@@ -235,9 +159,10 @@ int create_object(Client *client_context,
get_malloc_mapinfo(pointer, &fd, &map_size, &offset);
assert(fd != -1);
ObjectTableEntry *entry = new ObjectTableEntry();
entry->object_id = obj_id;
entry->info.object_id = std::string((char *) &obj_id.id[0], sizeof(obj_id));
auto entry = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
entry->object_id = object_id;
entry->info.object_id =
std::string((char *) &object_id.id[0], sizeof(object_id));
entry->info.data_size = data_size;
entry->info.metadata_size = metadata_size;
entry->pointer = pointer;
@@ -246,7 +171,8 @@ int create_object(Client *client_context,
entry->map_size = map_size;
entry->offset = offset;
entry->state = PLASMA_CREATED;
plasma_state->plasma_store_info->objects[obj_id] = entry;
store_info_.objects[object_id] = std::move(entry);
result->handle.store_fd = fd;
result->handle.mmap_size = map_size;
result->data_offset = offset;
@@ -256,39 +182,12 @@ int create_object(Client *client_context,
/* Notify the eviction policy that this object was created. This must be done
* immediately before the call to add_client_to_object_clients so that the
* eviction policy does not have an opportunity to evict the object. */
EvictionState_object_created(plasma_state->eviction_state,
plasma_state->plasma_store_info, obj_id);
eviction_policy_.object_created(object_id);
/* Record that this client is using this object. */
add_client_to_object_clients(entry, client_context);
add_client_to_object_clients(store_info_.objects[object_id].get(), client);
return PlasmaError_OK;
}
void add_get_request_for_object(PlasmaStoreState *store_state,
ObjectID object_id,
GetRequest *get_req) {
store_state->object_get_requests[object_id].push_back(get_req);
}
void remove_get_request_for_object(PlasmaStoreState *store_state,
ObjectID object_id,
GetRequest *get_req) {
std::vector<GetRequest *> &get_requests =
store_state->object_get_requests[object_id];
for (auto it = get_requests.begin(); it != get_requests.end(); ++it) {
if (*it == get_req) {
get_requests.erase(it);
break;
}
}
}
void remove_get_request(PlasmaStoreState *store_state, GetRequest *get_req) {
if (get_req->timer != -1) {
CHECK(event_loop_remove_timer(store_state->loop, get_req->timer) == AE_OK);
}
delete get_req;
}
void PlasmaObject_init(PlasmaObject *object, ObjectTableEntry *entry) {
DCHECK(object != NULL);
DCHECK(entry != NULL);
@@ -301,12 +200,12 @@ void PlasmaObject_init(PlasmaObject *object, ObjectTableEntry *entry) {
object->metadata_size = entry->info.metadata_size;
}
void return_from_get(PlasmaStoreState *store_state, GetRequest *get_req) {
void PlasmaStore::return_from_get(GetRequest *get_req) {
/* Send the get reply to the client. */
int status = plasma_send_GetReply(get_req->client->sock, store_state->builder,
int status = plasma_send_GetReply(get_req->client->fd, builder_,
&get_req->object_ids[0], get_req->objects,
get_req->object_ids.size());
warn_if_sigpipe(status, get_req->client->sock);
warn_if_sigpipe(status, get_req->client->fd);
/* If we successfully sent the get reply message to the client, then also send
* the file descriptors. */
if (status >= 0) {
@@ -316,7 +215,7 @@ void return_from_get(PlasmaStoreState *store_state, GetRequest *get_req) {
/* We use the data size to indicate whether the object is present or not.
*/
if (object.data_size != -1) {
int error_code = send_fd(get_req->client->sock, object.handle.store_fd);
int error_code = send_fd(get_req->client->fd, object.handle.store_fd);
/* If we failed to send the file descriptor, loop until we have sent it
* successfully. TODO(rkn): This is problematic for two reasons. First
* of all, sending the file descriptor should just succeed without any
@@ -326,10 +225,10 @@ void return_from_get(PlasmaStoreState *store_state, GetRequest *get_req) {
while (error_code < 0) {
if (errno == EMSGSIZE) {
LOG_WARN("Failed to send file descriptor, retrying.");
error_code = send_fd(get_req->client->sock, object.handle.store_fd);
error_code = send_fd(get_req->client->fd, object.handle.store_fd);
continue;
}
warn_if_sigpipe(error_code, get_req->client->sock);
warn_if_sigpipe(error_code, get_req->client->fd);
break;
}
}
@@ -340,25 +239,30 @@ void return_from_get(PlasmaStoreState *store_state, GetRequest *get_req) {
* tables if it is present there. It should only be present there if the get
* request timed out. */
for (ObjectID &object_id : get_req->object_ids) {
remove_get_request_for_object(store_state, object_id, get_req);
auto &get_requests = object_get_requests_[object_id];
/* Erase get_req from the vector. */
auto it = std::find(get_requests.begin(), get_requests.end(), get_req);
if (it != get_requests.end()) {
get_requests.erase(it);
}
}
/* Remove the get request. */
remove_get_request(store_state, get_req);
if (get_req->timer != -1) {
CHECK(loop_->remove_timer(get_req->timer) == AE_OK);
}
delete get_req;
}
void update_object_get_requests(PlasmaStoreState *store_state,
ObjectID obj_id) {
std::vector<GetRequest *> &get_requests =
store_state->object_get_requests[obj_id];
void PlasmaStore::update_object_get_requests(ObjectID object_id) {
std::vector<GetRequest *> &get_requests = object_get_requests_[object_id];
int index = 0;
int num_requests = get_requests.size();
for (int i = 0; i < num_requests; ++i) {
GetRequest *get_req = get_requests[index];
ObjectTableEntry *entry =
get_object_table_entry(store_state->plasma_store_info, obj_id);
auto entry = get_object_table_entry(&store_info_, object_id);
CHECK(entry != NULL);
PlasmaObject_init(&get_req->objects[obj_id], entry);
PlasmaObject_init(&get_req->objects[object_id], entry);
get_req->num_satisfied += 1;
/* Record the fact that this client will be using this object and will
* be responsible for releasing this object. */
@@ -366,7 +270,7 @@ void update_object_get_requests(PlasmaStoreState *store_state,
/* If this get request is done, reply to the client. */
if (get_req->num_satisfied == get_req->num_objects_to_wait_for) {
return_from_get(store_state, get_req);
return_from_get(get_req);
} else {
/* The call to return_from_get will remove the current element in the
* array, so we only increment the counter in the else branch. */
@@ -377,46 +281,33 @@ void update_object_get_requests(PlasmaStoreState *store_state,
DCHECK(index == get_requests.size());
/* Remove the array of get requests for this object, since no one should be
* waiting for this object anymore. */
store_state->object_get_requests.erase(obj_id);
object_get_requests_.erase(object_id);
}
int get_timeout_handler(event_loop *loop, timer_id id, void *context) {
GetRequest *get_req = (GetRequest *) context;
return_from_get(get_req->client->plasma_state, get_req);
return EVENT_LOOP_TIMER_DONE;
}
void process_get_request(Client *client_context,
int num_object_ids,
ObjectID object_ids[],
uint64_t timeout_ms) {
PlasmaStoreState *plasma_state = client_context->plasma_state;
void PlasmaStore::process_get_request(Client *client,
const std::vector<ObjectID> &object_ids,
uint64_t timeout_ms) {
/* Create a get request for this object. */
GetRequest *get_req =
new GetRequest(client_context, num_object_ids, object_ids);
for (int i = 0; i < num_object_ids; ++i) {
ObjectID obj_id = object_ids[i];
GetRequest *get_req = new GetRequest(client, object_ids);
for (auto object_id : object_ids) {
/* Check if this object is already present locally. If so, record that the
* object is being used and mark it as accounted for. */
ObjectTableEntry *entry =
get_object_table_entry(plasma_state->plasma_store_info, obj_id);
auto entry = get_object_table_entry(&store_info_, object_id);
if (entry && entry->state == PLASMA_SEALED) {
/* Update the get request to take into account the present object. */
PlasmaObject_init(&get_req->objects[obj_id], entry);
PlasmaObject_init(&get_req->objects[object_id], entry);
get_req->num_satisfied += 1;
/* If necessary, record that this client is using this object. In the case
* where entry == NULL, this will be called from seal_object. */
add_client_to_object_clients(entry, client_context);
add_client_to_object_clients(entry, client);
} else {
/* Add a placeholder plasma object to the get request to indicate that the
* object is not present. This will be parsed by the client. We set the
* data size to -1 to indicate that the object is not present. */
get_req->objects[obj_id].data_size = -1;
get_req->objects[object_id].data_size = -1;
/* Add the get request to the relevant data structures. */
add_get_request_for_object(plasma_state, obj_id, get_req);
object_get_requests_[object_id].push_back(get_req);
}
}
@@ -424,69 +315,57 @@ void process_get_request(Client *client_context,
* the client. */
if (get_req->num_satisfied == get_req->num_objects_to_wait_for ||
timeout_ms == 0) {
return_from_get(plasma_state, get_req);
return_from_get(get_req);
} else if (timeout_ms != -1) {
/* Set a timer that will cause the get request to return to the client. Note
* that a timeout of -1 is used to indicate that no timer should be set. */
get_req->timer = event_loop_add_timer(plasma_state->loop, timeout_ms,
get_timeout_handler, get_req);
get_req->timer =
loop_->add_timer(timeout_ms, [this, get_req](int64_t timer_id) {
return_from_get(get_req);
return kEventLoopTimerDone;
});
}
}
int remove_client_from_object_clients(ObjectTableEntry *entry,
Client *client_info) {
/* Find the location of the client in the array. */
for (int i = 0; i < entry->clients.size(); ++i) {
if (entry->clients[i] == client_info) {
/* Remove the client from the array. */
entry->clients.erase(entry->clients.begin() + i);
/* If no more clients are using this object, notify the eviction policy
* that the object is no longer being used. */
if (entry->clients.size() == 0) {
/* Tell the eviction policy that this object is no longer being used. */
int64_t num_objects_to_evict;
ObjectID *objects_to_evict;
EvictionState_end_object_access(
client_info->plasma_state->eviction_state,
client_info->plasma_state->plasma_store_info, entry->object_id,
&num_objects_to_evict, &objects_to_evict);
remove_objects(client_info->plasma_state, num_objects_to_evict,
objects_to_evict);
}
/* Return 1 to indicate that the client was removed. */
return 1;
int PlasmaStore::remove_client_from_object_clients(ObjectTableEntry *entry,
Client *client) {
auto it = entry->clients.find(client);
if (it != entry->clients.end()) {
entry->clients.erase(it);
/* If no more clients are using this object, notify the eviction policy
* that the object is no longer being used. */
if (entry->clients.size() == 0) {
/* Tell the eviction policy that this object is no longer being used. */
std::vector<ObjectID> objects_to_evict;
eviction_policy_.end_object_access(entry->object_id, objects_to_evict);
delete_objects(objects_to_evict);
}
/* Return 1 to indicate that the client was removed. */
return 1;
} else {
/* Return 0 to indicate that the client was not removed. */
return 0;
}
/* Return 0 to indicate that the client was not removed. */
return 0;
}
void release_object(Client *client_context, ObjectID object_id) {
PlasmaStoreState *plasma_state = client_context->plasma_state;
ObjectTableEntry *entry =
get_object_table_entry(plasma_state->plasma_store_info, object_id);
void PlasmaStore::release_object(ObjectID object_id, Client *client) {
auto entry = get_object_table_entry(&store_info_, object_id);
CHECK(entry != NULL);
/* Remove the client from the object's array of clients. */
CHECK(remove_client_from_object_clients(entry, client_context) == 1);
CHECK(remove_client_from_object_clients(entry, client) == 1);
}
/* Check if an object is present. */
int contains_object(Client *client_context, ObjectID object_id) {
PlasmaStoreState *plasma_state = client_context->plasma_state;
ObjectTableEntry *entry =
get_object_table_entry(plasma_state->plasma_store_info, object_id);
int PlasmaStore::contains_object(ObjectID object_id) {
auto entry = get_object_table_entry(&store_info_, object_id);
return entry && (entry->state == PLASMA_SEALED) ? OBJECT_FOUND
: OBJECT_NOT_FOUND;
}
/* Seal an object that has been created in the hash table. */
void seal_object(Client *client_context,
ObjectID object_id,
unsigned char digest[]) {
void PlasmaStore::seal_object(ObjectID object_id, unsigned char digest[]) {
LOG_DEBUG("sealing object"); // TODO(pcm): add ObjectID here
PlasmaStoreState *plasma_state = client_context->plasma_state;
ObjectTableEntry *entry =
get_object_table_entry(plasma_state->plasma_store_info, object_id);
auto entry = get_object_table_entry(&store_info_, object_id);
CHECK(entry != NULL);
CHECK(entry->state == PLASMA_CREATED);
/* Set the state of object to SEALED. */
@@ -494,69 +373,72 @@ void seal_object(Client *client_context,
/* Set the object digest. */
entry->info.digest = std::string((char *) &digest[0], DIGEST_SIZE);
/* Inform all subscribers that a new object has been sealed. */
push_notification(plasma_state, &entry->info);
push_notification(&entry->info);
/* Update all get requests that involve this object. */
update_object_get_requests(plasma_state, object_id);
update_object_get_requests(object_id);
}
/* Delete an object that has been created in the hash table. This should only
* be called on objects that are returned by the eviction policy to evict. */
void delete_object(PlasmaStoreState *plasma_state, ObjectID object_id) {
LOG_DEBUG("deleting object");
ObjectTableEntry *entry =
get_object_table_entry(plasma_state->plasma_store_info, object_id);
/* TODO(rkn): This should probably not fail, but should instead throw an
* error. Maybe we should also support deleting objects that have been created
* but not sealed. */
CHECKM(entry != NULL, "To delete an object it must be in the object table.");
CHECKM(entry->state == PLASMA_SEALED,
"To delete an object it must have been sealed.");
CHECKM(entry->clients.size() == 0,
"To delete an object, there must be no clients currently using it.");
uint8_t *pointer = entry->pointer;
dlfree(pointer);
plasma_state->plasma_store_info->objects.erase(object_id);
delete entry;
/* Inform all subscribers that the object has been deleted. */
ObjectInfoT notification;
notification.object_id =
std::string((char *) &object_id.id[0], sizeof(object_id));
notification.is_deletion = true;
push_notification(plasma_state, &notification);
}
void remove_objects(PlasmaStoreState *plasma_state,
int64_t num_objects_to_evict,
ObjectID *objects_to_evict) {
if (num_objects_to_evict > 0) {
for (int i = 0; i < num_objects_to_evict; ++i) {
delete_object(plasma_state, objects_to_evict[i]);
}
/* Free the array of objects to evict. This array was originally allocated
* by the eviction policy. */
free(objects_to_evict);
void PlasmaStore::delete_objects(const std::vector<ObjectID> &object_ids) {
for (const auto &object_id : object_ids) {
LOG_DEBUG("deleting object");
auto entry = get_object_table_entry(&store_info_, object_id);
/* TODO(rkn): This should probably not fail, but should instead throw an
* error. Maybe we should also support deleting objects that have been
* created but not sealed. */
CHECKM(entry != NULL,
"To delete an object it must be in the object table.");
CHECKM(entry->state == PLASMA_SEALED,
"To delete an object it must have been sealed.");
CHECKM(entry->clients.size() == 0,
"To delete an object, there must be no clients currently using it.");
dlfree(entry->pointer);
store_info_.objects.erase(object_id);
/* Inform all subscribers that the object has been deleted. */
ObjectInfoT notification;
notification.object_id =
std::string((char *) &object_id.id[0], sizeof(object_id));
notification.is_deletion = true;
push_notification(&notification);
}
}
void push_notification(PlasmaStoreState *plasma_state,
ObjectInfoT *object_info) {
for (auto &element : plasma_state->pending_notifications) {
uint8_t *notification = create_object_info_buffer(object_info);
element.second.object_notifications.push_back(notification);
send_notifications(plasma_state->loop, element.first, plasma_state, 0);
/* The notification gets freed in send_notifications when the notification
* is sent over the socket. */
}
void PlasmaStore::connect_client(int listener_sock) {
int client_fd = accept_client(listener_sock);
/* This is freed in disconnect_client. */
Client *client = new Client(client_fd);
/* Add a callback to handle events on this socket.
* TODO(pcm): Check return value. */
loop_->add_file_event(client_fd, kEventLoopRead, [this, client](int events) {
process_message(client);
});
LOG_DEBUG("New connection with fd %d", client_fd);
}
/* Send more notifications to a subscriber. */
void send_notifications(event_loop *loop,
int client_sock,
void *context,
int events) {
PlasmaStoreState *plasma_state = (PlasmaStoreState *) context;
auto it = plasma_state->pending_notifications.find(client_sock);
void PlasmaStore::disconnect_client(Client *client) {
loop_->remove_file_event(client->fd);
/* If this client was using any objects, remove it from the appropriate
* lists. */
for (const auto &entry : store_info_.objects) {
remove_client_from_object_clients(entry.second.get(), client);
}
/* Note, the store may still attempt to send a message to the disconnected
* client (for example, when an object ID that the client was waiting for
* is ready). In these cases, the attempt to send the message will fail, but
* the store should just ignore the failure. */
delete client;
}
/**
* Send notifications about sealed objects to the subscribers. This is called
* in seal_object. If the socket's send buffer is full, the notification will be
* buffered, and this will be called again when the send buffer has room.
*
* @param client The client to send the notification to.
* @return Void.
*/
void PlasmaStore::send_notifications(int client_fd) {
auto it = pending_notifications_.find(client_fd);
int num_processed = 0;
bool closed = false;
@@ -568,7 +450,7 @@ void send_notifications(event_loop *loop,
int64_t size = *((int64_t *) notification);
/* Attempt to send a notification about this object ID. */
int nbytes = send(client_sock, notification, sizeof(int64_t) + size, 0);
int nbytes = send(client_fd, notification, sizeof(int64_t) + size, 0);
if (nbytes >= 0) {
CHECK(nbytes == sizeof(int64_t) + size);
} else if (nbytes == -1 &&
@@ -579,12 +461,15 @@ void send_notifications(event_loop *loop,
/* Add a callback to the event loop to send queued notifications whenever
* there is room in the socket's send buffer. Callbacks can be added
* more than once here and will be overwritten. The callback is removed
* at the end of the method. */
event_loop_add_file(plasma_state->loop, client_sock, EVENT_LOOP_WRITE,
send_notifications, plasma_state);
* at the end of the method.
* TODO(pcm): Introduce status codes and check in case the file descriptor
* is added twice.*/
loop_->add_file_event(
client_fd, kEventLoopWrite,
[this, client_fd](int events) { send_notifications(client_fd); });
break;
} else {
LOG_WARN("Failed to send notification to client on fd %d", client_sock);
LOG_WARN("Failed to send notification to client on fd %d", client_fd);
if (errno == EPIPE) {
closed = true;
break;
@@ -602,171 +487,138 @@ void send_notifications(event_loop *loop,
/* Stop sending notifications if the pipe was broken. */
if (closed) {
close(client_sock);
plasma_state->pending_notifications.erase(client_sock);
close(client_fd);
pending_notifications_.erase(client_fd);
}
/* If we have sent all notifications, remove the fd from the event loop. */
if (it->second.object_notifications.empty()) {
event_loop_remove_file(loop, client_sock);
loop_->remove_file_event(client_fd);
}
}
void PlasmaStore::push_notification(ObjectInfoT *object_info) {
for (auto &element : pending_notifications_) {
uint8_t *notification = create_object_info_buffer(object_info);
element.second.object_notifications.push_back(notification);
send_notifications(element.first);
/* The notification gets freed in send_notifications when the notification
* is sent over the socket. */
}
}
/* Subscribe to notifications about sealed objects. */
void subscribe_to_updates(Client *client_context, int conn) {
void PlasmaStore::subscribe_to_updates(Client *client) {
LOG_DEBUG("subscribing to updates");
PlasmaStoreState *plasma_state = client_context->plasma_state;
/* TODO(rkn): The store could block here if the client doesn't send a file
* descriptor. */
int fd = recv_fd(conn);
int fd = recv_fd(client->fd);
if (fd < 0) {
/* This may mean that the client died before sending the file descriptor. */
LOG_WARN("Failed to receive file descriptor from client on fd %d.", conn);
LOG_WARN("Failed to receive file descriptor from client on fd %d.",
client->fd);
return;
}
/* Create a new array to buffer notifications that can't be sent to the
* subscriber yet because the socket send buffer is full. TODO(rkn): the queue
* never gets freed. */
NotificationQueue &queue = plasma_state->pending_notifications[fd];
NotificationQueue &queue = pending_notifications_[fd];
/* Push notifications to the new subscriber about existing objects. */
for (const auto &entry : plasma_state->plasma_store_info->objects) {
push_notification(plasma_state, &entry.second->info);
for (const auto &entry : store_info_.objects) {
push_notification(&entry.second->info);
}
send_notifications(plasma_state->loop, fd, plasma_state, 0);
send_notifications(fd);
}
void process_message(event_loop *loop,
int client_sock,
void *context,
int events) {
Client *client_context = (Client *) context;
PlasmaStoreState *state = client_context->plasma_state;
void PlasmaStore::process_message(Client *client) {
int64_t type;
read_buffer(client_sock, &type, state->input_buffer);
read_vector(client->fd, &type, input_buffer_);
uint8_t *input = (uint8_t *) utarray_front(state->input_buffer);
ObjectID object_ids[1];
int64_t num_objects;
PlasmaObject objects[1];
memset(&objects[0], 0, sizeof(objects));
uint8_t *input = input_buffer_.data();
ObjectID object_id;
PlasmaObject object;
/* TODO(pcm): Get rid of the following. */
memset(&object, 0, sizeof(object));
/* Process the different types of requests. */
switch (type) {
case MessageType_PlasmaCreateRequest: {
int64_t data_size;
int64_t metadata_size;
plasma_read_CreateRequest(input, &object_ids[0], &data_size,
&metadata_size);
int error_code = create_object(client_context, object_ids[0], data_size,
metadata_size, &objects[0]);
warn_if_sigpipe(
plasma_send_CreateReply(client_sock, state->builder, object_ids[0],
&objects[0], error_code),
client_sock);
plasma_read_CreateRequest(input, &object_id, &data_size, &metadata_size);
int error_code =
create_object(object_id, data_size, metadata_size, client, &object);
warn_if_sigpipe(plasma_send_CreateReply(client->fd, builder_, object_id,
&object, error_code),
client->fd);
if (error_code == PlasmaError_OK) {
warn_if_sigpipe(send_fd(client_sock, objects[0].handle.store_fd),
client_sock);
warn_if_sigpipe(send_fd(client->fd, object.handle.store_fd), client->fd);
}
} break;
case MessageType_PlasmaGetRequest: {
num_objects = plasma_read_GetRequest_num_objects(input);
ObjectID *object_ids_to_get =
(ObjectID *) malloc(num_objects * sizeof(ObjectID));
int num_objects = plasma_read_GetRequest_num_objects(input);
std::vector<ObjectID> object_ids_to_get(num_objects);
int64_t timeout_ms;
plasma_read_GetRequest(input, object_ids_to_get, &timeout_ms, num_objects);
/* TODO(pcm): The array object_ids_to_get could be reused in
* process_get_request. */
process_get_request(client_context, num_objects, object_ids_to_get,
timeout_ms);
free(object_ids_to_get);
plasma_read_GetRequest(input, object_ids_to_get.data(), &timeout_ms,
num_objects);
process_get_request(client, object_ids_to_get, timeout_ms);
} break;
case MessageType_PlasmaReleaseRequest:
plasma_read_ReleaseRequest(input, &object_ids[0]);
release_object(client_context, object_ids[0]);
plasma_read_ReleaseRequest(input, &object_id);
release_object(object_id, client);
break;
case MessageType_PlasmaContainsRequest:
plasma_read_ContainsRequest(input, &object_ids[0]);
if (contains_object(client_context, object_ids[0]) == OBJECT_FOUND) {
warn_if_sigpipe(plasma_send_ContainsReply(client_sock, state->builder,
object_ids[0], 1),
client_sock);
plasma_read_ContainsRequest(input, &object_id);
if (contains_object(object_id) == OBJECT_FOUND) {
warn_if_sigpipe(
plasma_send_ContainsReply(client->fd, builder_, object_id, 1),
client->fd);
} else {
warn_if_sigpipe(plasma_send_ContainsReply(client_sock, state->builder,
object_ids[0], 0),
client_sock);
warn_if_sigpipe(
plasma_send_ContainsReply(client->fd, builder_, object_id, 0),
client->fd);
}
break;
case MessageType_PlasmaSealRequest: {
unsigned char digest[DIGEST_SIZE];
plasma_read_SealRequest(input, &object_ids[0], &digest[0]);
seal_object(client_context, object_ids[0], &digest[0]);
plasma_read_SealRequest(input, &object_id, &digest[0]);
seal_object(object_id, &digest[0]);
} break;
case MessageType_PlasmaEvictRequest: {
/* This code path should only be used for testing. */
int64_t num_bytes;
plasma_read_EvictRequest(input, &num_bytes);
int64_t num_objects_to_evict;
ObjectID *objects_to_evict;
int64_t num_bytes_evicted = EvictionState_choose_objects_to_evict(
client_context->plasma_state->eviction_state,
client_context->plasma_state->plasma_store_info, num_bytes,
&num_objects_to_evict, &objects_to_evict);
remove_objects(client_context->plasma_state, num_objects_to_evict,
objects_to_evict);
std::vector<ObjectID> objects_to_evict;
int64_t num_bytes_evicted =
eviction_policy_.choose_objects_to_evict(num_bytes, objects_to_evict);
delete_objects(objects_to_evict);
warn_if_sigpipe(
plasma_send_EvictReply(client_sock, state->builder, num_bytes_evicted),
client_sock);
plasma_send_EvictReply(client->fd, builder_, num_bytes_evicted),
client->fd);
} break;
case MessageType_PlasmaSubscribeRequest:
subscribe_to_updates(client_context, client_sock);
subscribe_to_updates(client);
break;
case MessageType_PlasmaConnectRequest: {
warn_if_sigpipe(
plasma_send_ConnectReply(client_sock, state->builder,
state->plasma_store_info->memory_capacity),
client_sock);
} break;
case DISCONNECT_CLIENT: {
LOG_INFO("Disconnecting client on fd %d", client_sock);
event_loop_remove_file(loop, client_sock);
/* If this client was using any objects, remove it from the appropriate
* lists. */
PlasmaStoreState *plasma_state = client_context->plasma_state;
for (const auto &entry : plasma_state->plasma_store_info->objects) {
remove_client_from_object_clients(entry.second, client_context);
}
/* Note, the store may still attempt to send a message to the disconnected
* client (for example, when an object ID that the client was waiting for
* is ready). In these cases, the attempt to send the message will fail, but
* the store should just ignore the failure. */
warn_if_sigpipe(plasma_send_ConnectReply(client->fd, builder_,
store_info_.memory_capacity),
client->fd);
} break;
case DISCONNECT_CLIENT:
LOG_INFO("Disconnecting client on fd %d", client->fd);
disconnect_client(client);
break;
default:
/* This code should be unreachable. */
CHECK(0);
}
}
void new_client_connection(event_loop *loop,
int listener_sock,
void *context,
int events) {
PlasmaStoreState *plasma_state = (PlasmaStoreState *) context;
int new_socket = accept_client(listener_sock);
/* Create a new client object. This will also be used as the context to use
* for events on this client's socket. TODO(rkn): free this somewhere. */
Client *client_context = new Client(new_socket, plasma_state);
/* Add a callback to handle events on this socket. */
event_loop_add_file(loop, new_socket, EVENT_LOOP_READ, process_message,
client_context);
LOG_DEBUG("new connection with fd %d", new_socket);
}
/* Report "success" to valgrind. */
void signal_handler(int signal) {
if (signal == SIGTERM) {
PlasmaStoreState_free(g_state);
exit(0);
}
}
@@ -776,14 +628,15 @@ void start_server(char *socket_name, int64_t system_memory) {
* to a client that has already died, the store could die. */
signal(SIGPIPE, SIG_IGN);
/* Create the event loop. */
event_loop *loop = event_loop_create();
PlasmaStoreState *state = new PlasmaStoreState(loop, system_memory);
EventLoop loop;
PlasmaStore store(&loop, system_memory);
int socket = bind_ipc_sock(socket_name, true);
CHECK(socket >= 0);
event_loop_add_file(loop, socket, EVENT_LOOP_READ, new_client_connection,
state);
g_state = state;
event_loop_run(loop);
/* TODO(pcm): Check return value. */
loop.add_file_event(socket, kEventLoopRead, [&store, socket](int events) {
store.connect_client(socket);
});
loop.run();
}
int main(int argc, char *argv[]) {
+157 -88
View File
@@ -1,103 +1,172 @@
#ifndef PLASMA_STORE_H
#define PLASMA_STORE_H
#include "common.h"
#include "eviction_policy.h"
#include "plasma.h"
#include "plasma_events.h"
#include "plasma_protocol.h"
typedef struct Client Client;
class GetRequest;
typedef struct PlasmaStoreState PlasmaStoreState;
struct NotificationQueue {
/** The object notifications for clients. We notify the client about the
* objects in the order that the objects were sealed or deleted. */
std::deque<uint8_t *> object_notifications;
};
/**
* Create a new object. The client must do a call to release_object to tell the
* store when it is done with the object.
*
* @param client_context The context of the client making this request.
* @param object_id Object ID of the object to be created.
* @param data_size Size in bytes of the object to be created.
* @param metadata_size Size in bytes of the object metadata.
* @return One of the following error codes:
* - PlasmaError_OK, if the object was created successfully.
* - PlasmaError_ObjectExists, if an object with this ID is already
* present in the store. In this case, the client should not call
* plasma_release.
* - PlasmaError_OutOfMemory, if the store is out of memory and cannot
* create the object. In this case, the client should not call
* plasma_release.
*/
int create_object(Client *client_context,
ObjectID object_id,
int64_t data_size,
int64_t metadata_size,
PlasmaObject *result);
/** Contains all information that is associated with a Plasma store client. */
struct Client {
Client(int fd);
/**
* Get an object. This method assumes that we currently have or will eventually
* have this object sealed. If the object has not yet been sealed, the client
* that requested the object will be notified when it is sealed.
*
* For each call to get_object, the client must do a call to release_object to
* tell the store when it is done with the object.
*
* @param client_context The context of the client making this request.
* @param conn The client connection that requests the object.
* @param object_id Object ID of the object to be gotten.
* @return The status of the object (object_status in plasma.h).
*/
int get_object(Client *client_context,
int conn,
ObjectID object_id,
PlasmaObject *result);
/** The file descriptor used to communicate with the client. */
int fd;
};
/**
* Record the fact that a particular client is no longer using an object.
*
* @param client_context The context of the client making this request.
* @param object_id The object ID of the object that is being released.
* @param Void.
*/
void release_object(Client *client_context, ObjectID object_id);
class PlasmaStore {
public:
PlasmaStore(EventLoop *loop, int64_t system_memory);
/**
* Seal an object. The object is now immutable and can be accessed with get.
*
* @param client_context The context of the client making this request.
* @param object_id Object ID of the object to be sealed.
* @param digest The digest of the object. This is used to tell if two objects
* with the same object ID are the same.
* @return Void.
*/
void seal_object(Client *client_context,
ObjectID object_id,
unsigned char digest[]);
~PlasmaStore();
/**
* Check if the plasma store contains an object:
*
* @param client_context The context of the client making this request.
* @param object_id Object ID that will be checked.
* @return OBJECT_FOUND if the object is in the store, OBJECT_NOT_FOUND if not
*/
int contains_object(Client *client_context, ObjectID object_id);
/**
* Create a new object. The client must do a call to release_object to tell
* the store when it is done with the object.
*
* @param object_id Object ID of the object to be created.
* @param data_size Size in bytes of the object to be created.
* @param metadata_size Size in bytes of the object metadata.
* @return One of the following error codes:
* - PlasmaError_OK, if the object was created successfully.
* - PlasmaError_ObjectExists, if an object with this ID is already
* present in the store. In this case, the client should not call
* plasma_release.
* - PlasmaError_OutOfMemory, if the store is out of memory and cannot
* create the object. In this case, the client should not call
* plasma_release.
*/
int create_object(ObjectID object_id,
int64_t data_size,
int64_t metadata_size,
Client *client,
PlasmaObject *result);
/**
* Send notifications about sealed objects to the subscribers. This is called
* in seal_object. If the socket's send buffer is full, the notification will be
* buffered, and this will be called again when the send buffer has room.
*
* @param loop The Plasma store event loop.
* @param client_sock The socket of the client to send the notification to.
* @param plasma_state The plasma store global state.
* @param events This is needed for this function to have the signature of a
callback.
* @return Void.
*/
void send_notifications(event_loop *loop,
int client_sock,
void *plasma_state,
int events);
/**
* Delete objects that have been created in the hash table. This should only
* be called on objects that are returned by the eviction policy to evict.
*
* @param object_ids Object IDs of the objects to be deleted.
* @return Void.
*/
void delete_objects(const std::vector<ObjectID> &object_ids);
void remove_objects(PlasmaStoreState *plasma_state,
int64_t num_objects_to_evict,
ObjectID *objects_to_evict);
/**
* Process a get request from a client. This method assumes that we will
* eventually have these objects sealed. If one of the objects has not yet
* been sealed, the client that requested the object will be notified when it
* is sealed.
*
* For each object, the client must do a call to release_object to tell the
* store when it is done with the object.
*
* @param client The client making this request.
* @param object_ids Object IDs of the objects to be gotten.
* @param timeout_ms The timeout for the get request in milliseconds.
* @return Void.
*/
void process_get_request(Client *client,
const std::vector<ObjectID> &object_ids,
uint64_t timeout_ms);
/**
* Seal an object. The object is now immutable and can be accessed with get.
*
* @param object_id Object ID of the object to be sealed.
* @param digest The digest of the object. This is used to tell if two objects
* with the same object ID are the same.
* @return Void.
*/
void seal_object(ObjectID object_id, unsigned char digest[]);
/**
* Check if the plasma store contains an object:
*
* @param object_id Object ID that will be checked.
* @return OBJECT_FOUND if the object is in the store, OBJECT_NOT_FOUND if not
*/
int contains_object(ObjectID object_id);
/**
* Record the fact that a particular client is no longer using an object.
*
* @param object_id The object ID of the object that is being released.
* @param client The client making this request.
* @param Void.
*/
void release_object(ObjectID object_id, Client *client);
/**
* Subscribe a file descriptor to updates about new sealed objects.
*
* @param client The client making this request.
* @return Void.
*/
void subscribe_to_updates(Client *client);
/**
* Connect a new client to the PlasmaStore.
*
* @param listener_sock The socket that is listening to incoming connections.
* @return Void.
*/
void connect_client(int listener_sock);
/**
* Disconnect a client from the PlasmaStore.
*
* @param client The client that is disconnected.
* @return Void.
*/
void disconnect_client(Client *client);
void send_notifications(int client_fd);
void process_message(Client *client);
private:
void push_notification(ObjectInfoT *object_notification);
void add_client_to_object_clients(ObjectTableEntry *entry, Client *client);
void return_from_get(GetRequest *get_req);
void update_object_get_requests(ObjectID object_id);
int remove_client_from_object_clients(ObjectTableEntry *entry,
Client *client);
/* Event loop of the plasma store. */
EventLoop *loop_;
/** The plasma store information, including the object tables, that is exposed
* to the eviction policy. */
PlasmaStoreInfo store_info_;
/** The state that is managed by the eviction policy. */
EvictionPolicy eviction_policy_;
/** Input buffer. This is allocated only once to avoid mallocs for every
* call to process_message. */
std::vector<uint8_t> input_buffer_;
/** Buffer that holds memory for serializing plasma protocol messages. */
protocol_builder *builder_;
/** A hash table mapping object IDs to a vector of the get requests that are
* waiting for the object to arrive. */
std::unordered_map<ObjectID, std::vector<GetRequest *>, UniqueIDHasher>
object_get_requests_;
/** The pending notifications that have not been sent to subscribers because
* the socket send buffers were full. This is a hash table from client file
* descriptor to an array of object_ids to send to that client.
* TODO(pcm): Consider putting this into the Client data structure and
* reorganize the code slightly. */
std::unordered_map<int, NotificationQueue> pending_notifications_;
};
#endif /* PLASMA_STORE_H */