From 08e988aee504028c0c685d3a45bcce65bc1c8af3 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Mon, 15 May 2017 01:19:44 -0700 Subject: [PATCH] Modernize plasma store (C to C++ changes). (#546) --- python/ray/plasma/plasma.py | 1 + src/common/common.h | 3 - src/common/io.cc | 30 ++ src/common/io.h | 18 + src/plasma/CMakeLists.txt | 2 + src/plasma/eviction_policy.cc | 176 +++------- src/plasma/eviction_policy.h | 234 ++++++------- src/plasma/plasma.cc | 2 +- src/plasma/plasma.h | 15 +- src/plasma/plasma_events.cc | 74 ++++ src/plasma/plasma_events.h | 96 +++++ src/plasma/plasma_manager.h | 1 + src/plasma/plasma_store.cc | 635 +++++++++++++--------------------- src/plasma/plasma_store.h | 245 ++++++++----- 14 files changed, 794 insertions(+), 738 deletions(-) create mode 100644 src/plasma/plasma_events.cc create mode 100644 src/plasma/plasma_events.h diff --git a/python/ray/plasma/plasma.py b/python/ray/plasma/plasma.py index 847f6ce2c..cdee371b8 100644 --- a/python/ray/plasma/plasma.py +++ b/python/ray/plasma/plasma.py @@ -341,6 +341,7 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", + "--leak-check-heuristics=stdstring", "--error-exitcode=1"] + command, stdout=stdout_file, stderr=stderr_file) time.sleep(1.0) diff --git a/src/common/common.h b/src/common/common.h index ca68692fa..9560123d7 100644 --- a/src/common/common.h +++ b/src/common/common.h @@ -14,7 +14,6 @@ #include #endif -#include "utarray.h" #ifdef __cplusplus #include extern "C" { @@ -145,8 +144,6 @@ extern "C" { typedef struct { unsigned char id[UNIQUE_ID_SIZE]; } UniqueID; -extern const UT_icd object_id_icd; - extern const UniqueID NIL_ID; /* Generate a globally unique ID. */ diff --git a/src/common/io.cc b/src/common/io.cc index 25944ee26..b75fdcf37 100644 --- a/src/common/io.cc +++ b/src/common/io.cc @@ -384,6 +384,36 @@ disconnected: return 0; } +int64_t read_vector(int fd, int64_t *type, std::vector &buffer) { + int64_t version; + int closed = read_bytes(fd, (uint8_t *) &version, sizeof(version)); + if (closed) { + goto disconnected; + } + CHECK(version == RAY_PROTOCOL_VERSION); + int64_t length; + closed = read_bytes(fd, (uint8_t *) type, sizeof(*type)); + if (closed) { + goto disconnected; + } + closed = read_bytes(fd, (uint8_t *) &length, sizeof(length)); + if (closed) { + goto disconnected; + } + if (length > buffer.size()) { + buffer.resize(length); + } + closed = read_bytes(fd, buffer.data(), length); + if (closed) { + goto disconnected; + } + return length; +disconnected: + /* Handle the case in which the socket is closed. */ + *type = DISCONNECT_CLIENT; + return 0; +} + void write_log_message(int fd, const char *message) { /* Account for the \0 at the end of the string. */ write_message(fd, LOG_MESSAGE, strlen(message) + 1, (uint8_t *) message); diff --git a/src/common/io.h b/src/common/io.h index cf9f16aa0..a6a77e3c5 100644 --- a/src/common/io.h +++ b/src/common/io.h @@ -4,6 +4,7 @@ #include #include +#include #include "utarray.h" #define RAY_PROTOCOL_VERSION 0x0000000000000000 @@ -187,6 +188,23 @@ uint8_t *read_message_async(event_loop *loop, int sock); */ int64_t read_buffer(int fd, int64_t *type, UT_array *buffer); +/** + * Read a sequence of bytes written by write_message from a file descriptor. + * This does not allocate space for the message if the provided buffer is + * large enough and can therefore often avoid allocations. + * + * @param fd The file descriptor to read from. It can be non-blocking. + * @param type The type of the message that is read will be written at this + * address. If there was an error while reading, this will be + * DISCONNECT_CLIENT. + * @param buffer The array the message will be written to. If it is not + * large enough to hold the message, it will be enlarged by read_vector. + * @return Number of bytes of the message that were read. This size does not + * include the bytes used to encode the type and length. If there was + * an error while reading, this will be 0. + */ +int64_t read_vector(int fd, int64_t *type, std::vector &buffer); + /** * Write a null-terminated string to a file descriptor. */ diff --git a/src/plasma/CMakeLists.txt b/src/plasma/CMakeLists.txt index a35af93f6..706345407 100644 --- a/src/plasma/CMakeLists.txt +++ b/src/plasma/CMakeLists.txt @@ -70,6 +70,7 @@ set_source_files_properties(thirdparty/dlmalloc.c PROPERTIES COMPILE_FLAGS -Wno- add_executable(plasma_store plasma_store.cc plasma.cc + plasma_events.cc plasma_protocol.cc eviction_policy.cc fling.c @@ -82,6 +83,7 @@ target_link_libraries(plasma_store common ${FLATBUFFERS_STATIC_LIB}) add_library(plasma_lib STATIC plasma_client.cc plasma.cc + plasma_events.cc plasma_protocol.cc fling.c thirdparty/xxhash.c) diff --git a/src/plasma/eviction_policy.cc b/src/plasma/eviction_policy.cc index f1c1acb0b..cd1c3d27f 100644 --- a/src/plasma/eviction_policy.cc +++ b/src/plasma/eviction_policy.cc @@ -1,156 +1,94 @@ #include "eviction_policy.h" -#include -#include - -class LRUCache { - private: - /** A doubly-linked list containing the items in the cache and - * their sizes in LRU order. */ - typedef std::list> ItemList; - ItemList item_list_; - /** A hash table mapping the object ID of an object in the cache to its - * location in the doubly linked list item_list_. */ - std::unordered_map item_map_; - - public: - LRUCache(){}; - - void add(const ObjectID &key, int64_t size) { - auto it = item_map_.find(key); - CHECK(it == item_map_.end()); - item_list_.push_front(std::make_pair(key, size)); - item_map_.insert(std::make_pair(key, item_list_.begin())); - } - - void remove(const ObjectID &key) { - auto it = item_map_.find(key); - CHECK(it != item_map_.end()); - item_list_.erase(it->second); - item_map_.erase(it); - } - - int64_t choose_objects_to_evict(int64_t num_bytes_required, - std::vector &objects_to_evict) { - int64_t bytes_evicted = 0; - auto it = item_list_.end(); - while (bytes_evicted < num_bytes_required && it != item_list_.begin()) { - it--; - objects_to_evict.push_back(it->first); - bytes_evicted += it->second; - } - return bytes_evicted; - } -}; - -/** The part of the Plasma state that is maintained by the eviction policy. */ -struct EvictionState { - /** The amount of memory (in bytes) currently being used. */ - int64_t memory_used; - /** Datastructure for the LRU cache. */ - LRUCache cache; -}; - -EvictionState *EvictionState_init() { - EvictionState *s = new EvictionState(); - s->memory_used = 0; - return s; +void LRUCache::add(const ObjectID &key, int64_t size) { + auto it = item_map_.find(key); + CHECK(it == item_map_.end()); + /* Note that it is important to use a list so the iterators stay valid. */ + item_list_.emplace_front(key, size); + item_map_.emplace(key, item_list_.begin()); } -void EvictionState_free(EvictionState *s) { - delete s; +void LRUCache::remove(const ObjectID &key) { + auto it = item_map_.find(key); + CHECK(it != item_map_.end()); + item_list_.erase(it->second); + item_map_.erase(it); } -int64_t EvictionState_choose_objects_to_evict( - EvictionState *eviction_state, - PlasmaStoreInfo *plasma_store_info, +int64_t LRUCache::choose_objects_to_evict( int64_t num_bytes_required, - int64_t *num_objects_to_evict, - ObjectID **objects_to_evict) { - std::vector objs_to_evict; - int64_t bytes_evicted = eviction_state->cache.choose_objects_to_evict( - num_bytes_required, objs_to_evict); - /* Update the LRU cache. */ - for (auto &object_id : objs_to_evict) { - eviction_state->cache.remove(object_id); + std::vector &objects_to_evict) { + int64_t bytes_evicted = 0; + auto it = item_list_.end(); + while (bytes_evicted < num_bytes_required && it != item_list_.begin()) { + it--; + objects_to_evict.push_back(it->first); + bytes_evicted += it->second; } - /* Construct the return values. */ - *num_objects_to_evict = objs_to_evict.size(); - if (objs_to_evict.size() == 0) { - *objects_to_evict = NULL; - } else { - int64_t result_size = objs_to_evict.size() * sizeof(ObjectID); - *objects_to_evict = (ObjectID *) malloc(result_size); - memcpy(*objects_to_evict, objs_to_evict.data(), result_size); - } - /* Update the number of bytes used. */ - eviction_state->memory_used -= bytes_evicted; return bytes_evicted; } -void EvictionState_object_created(EvictionState *eviction_state, - PlasmaStoreInfo *plasma_store_info, - ObjectID object_id) { - ObjectTableEntry *entry = plasma_store_info->objects[object_id]; - eviction_state->cache.add(object_id, - entry->info.data_size + entry->info.metadata_size); +EvictionPolicy::EvictionPolicy(PlasmaStoreInfo *store_info) + : memory_used_(0), store_info_(store_info) {} + +int64_t EvictionPolicy::choose_objects_to_evict( + int64_t num_bytes_required, + std::vector &objects_to_evict) { + int64_t bytes_evicted = + cache_.choose_objects_to_evict(num_bytes_required, objects_to_evict); + /* Update the LRU cache. */ + for (auto &object_id : objects_to_evict) { + cache_.remove(object_id); + } + /* Update the number of bytes used. */ + memory_used_ -= bytes_evicted; + return bytes_evicted; } -bool EvictionState_require_space(EvictionState *eviction_state, - PlasmaStoreInfo *plasma_store_info, - int64_t size, - int64_t *num_objects_to_evict, - ObjectID **objects_to_evict) { +void EvictionPolicy::object_created(ObjectID object_id) { + auto entry = store_info_->objects[object_id].get(); + cache_.add(object_id, entry->info.data_size + entry->info.metadata_size); +} + +bool EvictionPolicy::require_space(int64_t size, + std::vector &objects_to_evict) { /* Check if there is enough space to create the object. */ - int64_t required_space = - eviction_state->memory_used + size - plasma_store_info->memory_capacity; + int64_t required_space = memory_used_ + size - store_info_->memory_capacity; int64_t num_bytes_evicted; if (required_space > 0) { /* Try to free up at least as much space as we need right now but ideally * up to 20% of the total capacity. */ - int64_t space_to_free = MAX(size, plasma_store_info->memory_capacity / 5); + int64_t space_to_free = MAX(size, store_info_->memory_capacity / 5); LOG_DEBUG("not enough space to create this object, so evicting objects"); /* Choose some objects to evict, and update the return pointers. */ - num_bytes_evicted = EvictionState_choose_objects_to_evict( - eviction_state, plasma_store_info, space_to_free, num_objects_to_evict, - objects_to_evict); + num_bytes_evicted = + choose_objects_to_evict(space_to_free, objects_to_evict); LOG_INFO( "There is not enough space to create this object, so evicting " - "%" PRId64 " objects to free up %" PRId64 " bytes.", - *num_objects_to_evict, num_bytes_evicted); + "%zu objects to free up %" PRId64 " bytes.", + objects_to_evict.size(), num_bytes_evicted); } else { num_bytes_evicted = 0; - *num_objects_to_evict = 0; - *objects_to_evict = NULL; } if (num_bytes_evicted >= required_space) { /* We only increment the space used if there is enough space to create the * object. */ - eviction_state->memory_used += size; + memory_used_ += size; } return num_bytes_evicted >= required_space; } -void EvictionState_begin_object_access(EvictionState *eviction_state, - PlasmaStoreInfo *plasma_store_info, - ObjectID object_id, - int64_t *num_objects_to_evict, - ObjectID **objects_to_evict) { +void EvictionPolicy::begin_object_access( + ObjectID object_id, + std::vector &objects_to_evict) { /* If the object is in the LRU cache, remove it. */ - eviction_state->cache.remove(object_id); - *num_objects_to_evict = 0; - *objects_to_evict = NULL; + cache_.remove(object_id); } -void EvictionState_end_object_access(EvictionState *eviction_state, - PlasmaStoreInfo *plasma_store_info, - ObjectID object_id, - int64_t *num_objects_to_evict, - ObjectID **objects_to_evict) { - ObjectTableEntry *entry = plasma_store_info->objects[object_id]; +void EvictionPolicy::end_object_access( + ObjectID object_id, + std::vector &objects_to_evict) { + auto entry = store_info_->objects[object_id].get(); /* Add the object to the LRU cache.*/ - eviction_state->cache.add(object_id, - entry->info.data_size + entry->info.metadata_size); - *num_objects_to_evict = 0; - *objects_to_evict = NULL; + cache_.add(object_id, entry->info.data_size + entry->info.metadata_size); } diff --git a/src/plasma/eviction_policy.h b/src/plasma/eviction_policy.h index 63cfcbc06..4ff75f8b7 100644 --- a/src/plasma/eviction_policy.h +++ b/src/plasma/eviction_policy.h @@ -1,6 +1,9 @@ #ifndef EVICTION_POLICY_H #define EVICTION_POLICY_H +#include +#include + #include "plasma.h" /* ==== The eviction policy ==== @@ -10,140 +13,115 @@ * Plasma store. */ -/** Internal state of the eviction policy. */ -typedef struct EvictionState EvictionState; +class LRUCache { + private: + /** A doubly-linked list containing the items in the cache and + * their sizes in LRU order. */ + typedef std::list> ItemList; + ItemList item_list_; + /** A hash table mapping the object ID of an object in the cache to its + * location in the doubly linked list item_list_. */ + std::unordered_map item_map_; -/** - * Initialize the eviction policy state. - * - * @param system_memory The amount of memory that can be used by the Plasma - * store. - * @return The internal state of the eviction policy. - */ -EvictionState *EvictionState_init(void); + public: + LRUCache(){}; -/** - * Free the eviction policy state. - * - * @param state The state managed by the eviction policy. - * @return Void. - */ -void EvictionState_free(EvictionState *state); + void add(const ObjectID &key, int64_t size); -/** - * This method will be called whenever an object is first created in order to - * add it to the LRU cache. This is done so that the first time, the Plasma - * store calls begin_object_access, we can remove the object from the LRU cache. - * - * @param eviction_state The state managed by the eviction policy. - * @param plasma_store_info Information about the Plasma store that is exposed - * to the eviction policy. - * @param obj_id The object ID of the object that was created. - * @return Void. - */ -void EvictionState_object_created(EvictionState *eviction_state, - PlasmaStoreInfo *plasma_store_info, - ObjectID obj_id); + void remove(const ObjectID &key); -/** - * This method will be called when the Plasma store needs more space, perhaps to - * create a new object. If the required amount of space cannot be freed up, then - * a fatal error will be thrown. When this method is called, the eviction policy - * will assume that the objects chosen to be evicted will in fact be evicted - * from the Plasma store by the caller. - * - * @param eviction_state The state managed by the eviction policy. - * @param plasma_store_info Information about the Plasma store that is exposed - * to the eviction policy. - * @param size The size in bytes of the new object, including both data and - * metadata. - * @param num_objects_to_evict The number of objects that are chosen will be - * stored at this address. - * @param objects_to_evict An array of the object IDs that were chosen will be - * stored at this address. If the number of objects chosen is greater - * than 0, then the caller needs to free that array. If it equals 0, then - * the array will be NULL. - * @return True if enough space can be freed and false otherwise. - */ -bool EvictionState_require_space(EvictionState *eviction_state, - PlasmaStoreInfo *plasma_store_info, - int64_t size, - int64_t *num_objects_to_evict, - ObjectID **objects_to_evict); + int64_t choose_objects_to_evict(int64_t num_bytes_required, + std::vector &objects_to_evict); +}; -/** - * This method will be called whenever an unused object in the Plasma store - * starts to be used. When this method is called, the eviction policy will - * assume that the objects chosen to be evicted will in fact be evicted from the - * Plasma store by the caller. - * - * @param eviction_state The state managed by the eviction policy. - * @param plasma_store_info Information about the Plasma store that is exposed - * to the eviction policy. - * @param obj_id The ID of the object that is now being used. - * @param num_objects_to_evict The number of objects that are chosen will be - * stored at this address. - * @param objects_to_evict An array of the object IDs that were chosen will be - * stored at this address. If the number of objects chosen is greater - * than 0, then the caller needs to free that array. If it equals 0, then - * the array will be NULL. - * @return Void. - */ -void EvictionState_begin_object_access(EvictionState *eviction_state, - PlasmaStoreInfo *plasma_store_info, - ObjectID obj_id, - int64_t *num_objects_to_evict, - ObjectID **objects_to_evict); +/** The eviction policy. */ +class EvictionPolicy { + public: + /** + * Construct an eviction policy. + * + * @param store_info Information about the Plasma store that is exposed + * to the eviction policy. + */ + EvictionPolicy(PlasmaStoreInfo *store_info); -/** - * This method will be called whenever an object in the Plasma store that was - * being used is no longer being used. When this method is called, the eviction - * policy will assume that the objects chosen to be evicted will in fact be - * evicted from the Plasma store by the caller. - * - * @param eviction_state The state managed by the eviction policy. - * @param plasma_store_info Information about the Plasma store that is exposed - * to the eviction policy. - * @param obj_id The ID of the object that is no longer being used. - * @param num_objects_to_evict The number of objects that are chosen will be - * stored at this address. - * @param objects_to_evict An array of the object IDs that were chosen will be - * stored at this address. If the number of objects chosen is greater - * than 0, then the caller needs to free that array. If it equals 0, then - * the array will be NULL. - * @return Void. - */ -void EvictionState_end_object_access(EvictionState *eviction_state, - PlasmaStoreInfo *plasma_store_info, - ObjectID obj_id, - int64_t *num_objects_to_evict, - ObjectID **objects_to_evict); + /** + * This method will be called whenever an object is first created in order to + * add it to the LRU cache. This is done so that the first time, the Plasma + * store calls begin_object_access, we can remove the object from the LRU + * cache. + * + * @param object_id The object ID of the object that was created. + * @return Void. + */ + void object_created(ObjectID object_id); -/** - * Choose some objects to evict from the Plasma store. When this method is - * called, the eviction policy will assume that the objects chosen to be evicted - * will in fact be evicted from the Plasma store by the caller. - * - * @note This method is not part of the API. It is exposed in the header file - * only for testing. - * - * @param eviction_state The state managed by the eviction policy. - * @param plasma_store_info Information about the Plasma store that is exposed - * to the eviction policy. - * @param num_bytes_required The number of bytes of space to try to free up. - * @param num_objects_to_evict The number of objects that are chosen will be - * stored at this address. - * @param objects_to_evict An array of the object IDs that were chosen will be - * stored at this address. If the number of objects chosen is greater - * than 0, then the caller needs to free that array. If it equals 0, then - * the array will be NULL. - * @return The total number of bytes of space chosen to be evicted. - */ -int64_t EvictionState_choose_objects_to_evict( - EvictionState *eviction_state, - PlasmaStoreInfo *plasma_store_info, - int64_t num_bytes_required, - int64_t *num_objects_to_evict, - ObjectID **objects_to_evict); + /** + * This method will be called when the Plasma store needs more space, perhaps + * to create a new object. If the required amount of space cannot be freed up, + * then a fatal error will be thrown. When this method is called, the eviction + * policy will assume that the objects chosen to be evicted will in fact be + * evicted from the Plasma store by the caller. + * + * @param size The size in bytes of the new object, including both data and + * metadata. + * @param objects_to_evict The object IDs that were chosen for eviction will + * be stored into this vector. + * @return True if enough space can be freed and false otherwise. + */ + bool require_space(int64_t size, std::vector &objects_to_evict); + + /** + * This method will be called whenever an unused object in the Plasma store + * starts to be used. When this method is called, the eviction policy will + * assume that the objects chosen to be evicted will in fact be evicted from + * the Plasma store by the caller. + * + * @param object_id The ID of the object that is now being used. + * @param objects_to_evict The object IDs that were chosen for eviction will + * be stored into this vector. + * @return Void. + */ + void begin_object_access(ObjectID object_id, + std::vector &objects_to_evict); + + /** + * This method will be called whenever an object in the Plasma store that was + * being used is no longer being used. When this method is called, the + * eviction policy will assume that the objects chosen to be evicted will in + * fact be evicted from the Plasma store by the caller. + * + * @param object_id The ID of the object that is no longer being used. + * @param objects_to_evict The object IDs that were chosen for eviction will + * be stored into this vector. + * @return Void. + */ + void end_object_access(ObjectID object_id, + std::vector &objects_to_evict); + + /** + * Choose some objects to evict from the Plasma store. When this method is + * called, the eviction policy will assume that the objects chosen to be + * evicted will in fact be evicted from the Plasma store by the caller. + * + * @note This method is not part of the API. It is exposed in the header file + * only for testing. + * + * @param num_bytes_required The number of bytes of space to try to free up. + * @param objects_to_evict The object IDs that were chosen for eviction will + * be stored into this vector. + * @return The total number of bytes of space chosen to be evicted. + */ + int64_t choose_objects_to_evict(int64_t num_bytes_required, + std::vector &objects_to_evict); + + private: + /** Pointer to the plasma store info. */ + PlasmaStoreInfo *store_info_; + /** The amount of memory (in bytes) currently being used. */ + int64_t memory_used_; + /** Datastructure for the LRU cache. */ + LRUCache cache_; +}; #endif /* EVICTION_POLICY_H */ diff --git a/src/plasma/plasma.cc b/src/plasma/plasma.cc index 2b3243694..a420ab423 100644 --- a/src/plasma/plasma.cc +++ b/src/plasma/plasma.cc @@ -47,5 +47,5 @@ ObjectTableEntry *get_object_table_entry(PlasmaStoreInfo *store_info, if (it == store_info->objects.end()) { return NULL; } - return it->second; + return it->second.get(); } diff --git a/src/plasma/plasma.h b/src/plasma/plasma.h index e404436e2..4247fd5b4 100644 --- a/src/plasma/plasma.h +++ b/src/plasma/plasma.h @@ -10,15 +10,13 @@ #include /* pid_t */ #include +#include #include "common.h" #include "format/common_generated.h" #include -#include "utarray.h" -#include "uthash.h" - /** Allocation granularity used in plasma for object allocation. */ #define BLOCK_SIZE 64 @@ -108,12 +106,10 @@ struct ObjectTableEntry { int64_t map_size; /** Offset from the base of the mmap. */ ptrdiff_t offset; - /** Handle for the uthash table. */ - UT_hash_handle handle; /** Pointer to the object data. Needed to free the object. */ uint8_t *pointer; - /** An array of the clients that are currently using this object. */ - std::vector clients; + /** Set of clients currently using this object. */ + std::unordered_set clients; /** The state of the object, e.g., whether it is open or sealed. */ object_state state; /** The digest of the object. Used to see if two objects are the same. */ @@ -123,7 +119,10 @@ struct ObjectTableEntry { /** The plasma store information that is exposed to the eviction policy. */ struct PlasmaStoreInfo { /** Objects that are in the Plasma store. */ - std::unordered_map objects; + std::unordered_map, + UniqueIDHasher> + objects; /** The amount of memory (in bytes) that we allow to be allocated in the * store. */ int64_t memory_capacity; diff --git a/src/plasma/plasma_events.cc b/src/plasma/plasma_events.cc new file mode 100644 index 000000000..0ce3c5de3 --- /dev/null +++ b/src/plasma/plasma_events.cc @@ -0,0 +1,74 @@ +#include "plasma_events.h" + +#include + +void EventLoop::file_event_callback(aeEventLoop *loop, + int fd, + void *context, + int events) { + FileCallback *callback = reinterpret_cast(context); + (*callback)(events); +} + +int EventLoop::timer_event_callback(aeEventLoop *loop, + long long timer_id, + void *context) { + TimerCallback *callback = reinterpret_cast(context); + return (*callback)(timer_id); +} + +constexpr int kInitialEventLoopSize = 1024; + +EventLoop::EventLoop() { + loop_ = aeCreateEventLoop(kInitialEventLoopSize); +} + +bool EventLoop::add_file_event(int fd, int events, FileCallback callback) { + if (file_callbacks_.find(fd) != file_callbacks_.end()) { + return false; + } + auto data = std::unique_ptr(new FileCallback(callback)); + void *context = reinterpret_cast(data.get()); + /* Try to add the file descriptor. */ + int err = aeCreateFileEvent(loop_, fd, events, EventLoop::file_event_callback, + context); + /* If it cannot be added, increase the size of the event loop. */ + if (err == AE_ERR && errno == ERANGE) { + err = aeResizeSetSize(loop_, 3 * aeGetSetSize(loop_) / 2); + if (err != AE_OK) { + return false; + } + err = aeCreateFileEvent(loop_, fd, events, EventLoop::file_event_callback, + context); + } + /* In any case, test if there were errors. */ + if (err == AE_OK) { + file_callbacks_.emplace(fd, std::move(data)); + return true; + } + return false; +} + +void EventLoop::remove_file_event(int fd) { + aeDeleteFileEvent(loop_, fd, AE_READABLE | AE_WRITABLE); + file_callbacks_.erase(fd); +} + +void EventLoop::run() { + aeMain(loop_); +} + +int64_t EventLoop::add_timer(int64_t timeout, TimerCallback callback) { + auto data = std::unique_ptr(new TimerCallback(callback)); + void *context = reinterpret_cast(data.get()); + int64_t timer_id = aeCreateTimeEvent( + loop_, timeout, EventLoop::timer_event_callback, context, NULL); + timer_callbacks_.emplace(timer_id, std::move(data)); + return timer_id; +} + +int EventLoop::remove_timer(int64_t timer_id) { + int err = aeDeleteTimeEvent(loop_, timer_id); + timer_callbacks_.erase(timer_id); + return err; +} diff --git a/src/plasma/plasma_events.h b/src/plasma/plasma_events.h new file mode 100644 index 000000000..099a6e0ef --- /dev/null +++ b/src/plasma/plasma_events.h @@ -0,0 +1,96 @@ +#ifndef PLASMA_EVENTS +#define PLASMA_EVENTS + +#include +#include +#include + +extern "C" { +#include "ae/ae.h" +} + +/** Constant specifying that the timer is done and it will be removed. */ +constexpr int kEventLoopTimerDone = AE_NOMORE; + +/** Read event on the file descriptor. */ +constexpr int kEventLoopRead = AE_READABLE; + +/** Write event on the file descriptor. */ +constexpr int kEventLoopWrite = AE_WRITABLE; + +class EventLoop { + public: + /* Signature of the handler that will be called when there is a new event + * on the file descriptor that this handler has been registered for. + * + * The arguments are the event flags (read or write). + */ + typedef std::function FileCallback; + + /* This handler will be called when a timer times out. The timer id is + * passed as an argument. The return is the number of milliseconds the timer + * shall be reset to or kEventLoopTimerDone if the timer shall not be + * triggered again. + */ + typedef std::function TimerCallback; + + EventLoop(); + + /** + * Add a new file event handler to the event loop. + * + * @param fd The file descriptor we are listening to. + * @param events The flags for events we are listening to (read or write). + * @param callback The callback that will be called when the event happens. + * @return Returns true if the event handler was added successfully. + */ + bool add_file_event(int fd, int events, FileCallback callback); + + /** + * Remove a file event handler from the event loop. + * + * @param fd The file descriptor of the event handler. + * @return Void. + */ + void remove_file_event(int fd); + + /** Register a handler that will be called after a time slice of + * "timeout" milliseconds. + * + * @param timeout The timeout in milliseconds. + * @param callback The callback for the timeout. + * @return The ID of the newly created timer. + */ + int64_t add_timer(int64_t timeout, TimerCallback callback); + + /** + * Remove a timer handler from the event loop. + * + * @param timer_id The ID of the timer that is to be removed. + * @return The ae.c error code. TODO(pcm): needs to be standardized + */ + int remove_timer(int64_t timer_id); + + /** + * Run the event loop. + * + * @return Void. + */ + void run(); + + private: + static void file_event_callback(aeEventLoop *loop, + int fd, + void *context, + int events); + + static int timer_event_callback(aeEventLoop *loop, + long long timer_id, + void *context); + + aeEventLoop *loop_; + std::unordered_map> file_callbacks_; + std::unordered_map> timer_callbacks_; +}; + +#endif /* PLASMA_EVENTS */ diff --git a/src/plasma/plasma_manager.h b/src/plasma/plasma_manager.h index 177a0babc..f28224570 100644 --- a/src/plasma/plasma_manager.h +++ b/src/plasma/plasma_manager.h @@ -2,6 +2,7 @@ #define PLASMA_MANAGER_H #include +#include "uthash.h" #include "utarray.h" #ifndef RAY_NUM_RETRIES diff --git a/src/plasma/plasma_store.cc b/src/plasma/plasma_store.cc index 431205d0e..62fa93583 100644 --- a/src/plasma/plasma_store.cc +++ b/src/plasma/plasma_store.cc @@ -30,15 +30,11 @@ #include #include -#include "common.h" -#include "event_loop.h" -#include "eviction_policy.h" +#include "plasma_store.h" + #include "format/common_generated.h" #include "io.h" #include "malloc.h" -#include "plasma_protocol.h" -#include "plasma_store.h" -#include "plasma.h" extern "C" { #include "fling.h" @@ -48,26 +44,10 @@ void dlfree(void *); size_t dlmalloc_set_footprint_limit(size_t bytes); } -/** Contains all information that is associated with a Plasma store client. */ -struct Client { - Client(int sock, PlasmaStoreState *plasma_state); - - /** The socket used to communicate with the client. */ - int sock; - /** A pointer to the global plasma state. */ - PlasmaStoreState *plasma_state; -}; - -struct NotificationQueue { - /** The object notifications for clients. We notify the client about the - * objects in the order that the objects were sealed or deleted. */ - std::deque object_notifications; -}; - struct GetRequest { - GetRequest(Client *client, int num_object_ids, ObjectID object_ids[]); + GetRequest(Client *client, const std::vector &object_ids); - /** The client connection that called get. */ + /** The client that called get. */ Client *client; /** The ID of the timer that will time out and cause this wait to return to * the client if it hasn't already returned. */ @@ -84,72 +64,28 @@ struct GetRequest { int64_t num_satisfied; }; -struct PlasmaStoreState { - PlasmaStoreState(event_loop *loop, int64_t system_memory); - - /* Event loop of the plasma store. */ - event_loop *loop; - /** A hash table mapping object IDs to a vector of the get requests that are - * waiting for the object to arrive. */ - std::unordered_map, UniqueIDHasher> - object_get_requests; - - /** The pending notifications that have not been sent to subscribers because - * the socket send buffers were full. This is a hash table from client file - * descriptor to an array of object_ids to send to that client. - * TODO(pcm): Consider putting this into the Client data structure and - * reorganize the code slightly. */ - std::unordered_map pending_notifications; - /** The plasma store information, including the object tables, that is exposed - * to the eviction policy. */ - PlasmaStoreInfo *plasma_store_info; - /** The state that is managed by the eviction policy. */ - EvictionState *eviction_state; - /** Input buffer. This is allocated only once to avoid mallocs for every - * call to process_message. */ - UT_array *input_buffer; - /** Buffer that holds memory for serializing plasma protocol messages. */ - protocol_builder *builder; -}; - -PlasmaStoreState *g_state; - -UT_icd byte_icd = {sizeof(uint8_t), NULL, NULL, NULL}; - -Client::Client(int sock, PlasmaStoreState *plasma_state) - : sock(sock), plasma_state(plasma_state) {} - -GetRequest::GetRequest(Client *client, - int num_object_ids, - ObjectID object_ids[]) +GetRequest::GetRequest(Client *client, const std::vector &object_ids) : client(client), timer(-1), - object_ids(object_ids, object_ids + num_object_ids), - objects(num_object_ids), + object_ids(object_ids.begin(), object_ids.end()), + objects(object_ids.size()), num_satisfied(0) { - std::unordered_set unique_ids( - object_ids, object_ids + num_object_ids); + std::unordered_set unique_ids(object_ids.begin(), + object_ids.end()); num_objects_to_wait_for = unique_ids.size(); } -PlasmaStoreState::PlasmaStoreState(event_loop *loop, int64_t system_memory) - : loop(loop), - plasma_store_info(new PlasmaStoreInfo()), - eviction_state(EvictionState_init()), - builder(make_protocol_builder()) { - this->plasma_store_info->memory_capacity = system_memory; +Client::Client(int fd) : fd(fd) {} - utarray_new(this->input_buffer, &byte_icd); +PlasmaStore::PlasmaStore(EventLoop *loop, int64_t system_memory) + : loop_(loop), + eviction_policy_(&store_info_), + builder_(make_protocol_builder()) { + store_info_.memory_capacity = system_memory; } -void PlasmaStoreState_free(PlasmaStoreState *state) { - /* Here we only clean up objects that need to be cleaned - * up to make the valgrind warnings go away. Objects that - * are still reachable are not cleaned up. */ - for (const auto &it : state->plasma_store_info->objects) { - delete it.second; - } - for (const auto &element : state->pending_notifications) { +PlasmaStore::~PlasmaStore() { + for (const auto &element : pending_notifications_) { auto object_notifications = element.second.object_notifications; for (int i = 0; i < object_notifications.size(); ++i) { uint8_t *notification = (uint8_t *) object_notifications.at(i); @@ -157,47 +93,37 @@ void PlasmaStoreState_free(PlasmaStoreState *state) { free(data); } } + free_protocol_builder(builder_); } -void push_notification(PlasmaStoreState *state, - ObjectInfoT *object_notification); - /* If this client is not already using the object, add the client to the * object's list of clients, otherwise do nothing. */ -void add_client_to_object_clients(ObjectTableEntry *entry, - Client *client_info) { +void PlasmaStore::add_client_to_object_clients(ObjectTableEntry *entry, + Client *client) { /* Check if this client is already using the object. */ - for (int i = 0; i < entry->clients.size(); ++i) { - if (entry->clients[i] == client_info) { - return; - } + if (entry->clients.find(client) != entry->clients.end()) { + return; } /* If there are no other clients using this object, notify the eviction policy * that the object is being used. */ if (entry->clients.size() == 0) { /* Tell the eviction policy that this object is being used. */ - int64_t num_objects_to_evict; - ObjectID *objects_to_evict; - EvictionState_begin_object_access( - client_info->plasma_state->eviction_state, - client_info->plasma_state->plasma_store_info, entry->object_id, - &num_objects_to_evict, &objects_to_evict); - remove_objects(client_info->plasma_state, num_objects_to_evict, - objects_to_evict); + std::vector objects_to_evict; + eviction_policy_.begin_object_access(entry->object_id, objects_to_evict); + delete_objects(objects_to_evict); } /* Add the client pointer to the list of clients using this object. */ - entry->clients.push_back(client_info); + entry->clients.insert(client); } /* Create a new object buffer in the hash table. */ -int create_object(Client *client_context, - ObjectID obj_id, - int64_t data_size, - int64_t metadata_size, - PlasmaObject *result) { +int PlasmaStore::create_object(ObjectID object_id, + int64_t data_size, + int64_t metadata_size, + Client *client, + PlasmaObject *result) { LOG_DEBUG("creating object"); /* TODO(pcm): add ObjectID here */ - PlasmaStoreState *plasma_state = client_context->plasma_state; - if (plasma_state->plasma_store_info->objects.count(obj_id) != 0) { + if (store_info_.objects.count(object_id) != 0) { /* There is already an object with the same ID in the Plasma Store, so * ignore this requst. */ return PlasmaError_ObjectExists; @@ -216,12 +142,10 @@ int create_object(Client *client_context, if (pointer == NULL) { /* Tell the eviction policy how much space we need to create this object. */ - int64_t num_objects_to_evict; - ObjectID *objects_to_evict; - bool success = EvictionState_require_space( - plasma_state->eviction_state, plasma_state->plasma_store_info, - data_size + metadata_size, &num_objects_to_evict, &objects_to_evict); - remove_objects(plasma_state, num_objects_to_evict, objects_to_evict); + std::vector objects_to_evict; + bool success = eviction_policy_.require_space(data_size + metadata_size, + objects_to_evict); + delete_objects(objects_to_evict); /* Return an error to the client if not enough space could be freed to * create the object. */ if (!success) { @@ -235,9 +159,10 @@ int create_object(Client *client_context, get_malloc_mapinfo(pointer, &fd, &map_size, &offset); assert(fd != -1); - ObjectTableEntry *entry = new ObjectTableEntry(); - entry->object_id = obj_id; - entry->info.object_id = std::string((char *) &obj_id.id[0], sizeof(obj_id)); + auto entry = std::unique_ptr(new ObjectTableEntry()); + entry->object_id = object_id; + entry->info.object_id = + std::string((char *) &object_id.id[0], sizeof(object_id)); entry->info.data_size = data_size; entry->info.metadata_size = metadata_size; entry->pointer = pointer; @@ -246,7 +171,8 @@ int create_object(Client *client_context, entry->map_size = map_size; entry->offset = offset; entry->state = PLASMA_CREATED; - plasma_state->plasma_store_info->objects[obj_id] = entry; + + store_info_.objects[object_id] = std::move(entry); result->handle.store_fd = fd; result->handle.mmap_size = map_size; result->data_offset = offset; @@ -256,39 +182,12 @@ int create_object(Client *client_context, /* Notify the eviction policy that this object was created. This must be done * immediately before the call to add_client_to_object_clients so that the * eviction policy does not have an opportunity to evict the object. */ - EvictionState_object_created(plasma_state->eviction_state, - plasma_state->plasma_store_info, obj_id); + eviction_policy_.object_created(object_id); /* Record that this client is using this object. */ - add_client_to_object_clients(entry, client_context); + add_client_to_object_clients(store_info_.objects[object_id].get(), client); return PlasmaError_OK; } -void add_get_request_for_object(PlasmaStoreState *store_state, - ObjectID object_id, - GetRequest *get_req) { - store_state->object_get_requests[object_id].push_back(get_req); -} - -void remove_get_request_for_object(PlasmaStoreState *store_state, - ObjectID object_id, - GetRequest *get_req) { - std::vector &get_requests = - store_state->object_get_requests[object_id]; - for (auto it = get_requests.begin(); it != get_requests.end(); ++it) { - if (*it == get_req) { - get_requests.erase(it); - break; - } - } -} - -void remove_get_request(PlasmaStoreState *store_state, GetRequest *get_req) { - if (get_req->timer != -1) { - CHECK(event_loop_remove_timer(store_state->loop, get_req->timer) == AE_OK); - } - delete get_req; -} - void PlasmaObject_init(PlasmaObject *object, ObjectTableEntry *entry) { DCHECK(object != NULL); DCHECK(entry != NULL); @@ -301,12 +200,12 @@ void PlasmaObject_init(PlasmaObject *object, ObjectTableEntry *entry) { object->metadata_size = entry->info.metadata_size; } -void return_from_get(PlasmaStoreState *store_state, GetRequest *get_req) { +void PlasmaStore::return_from_get(GetRequest *get_req) { /* Send the get reply to the client. */ - int status = plasma_send_GetReply(get_req->client->sock, store_state->builder, + int status = plasma_send_GetReply(get_req->client->fd, builder_, &get_req->object_ids[0], get_req->objects, get_req->object_ids.size()); - warn_if_sigpipe(status, get_req->client->sock); + warn_if_sigpipe(status, get_req->client->fd); /* If we successfully sent the get reply message to the client, then also send * the file descriptors. */ if (status >= 0) { @@ -316,7 +215,7 @@ void return_from_get(PlasmaStoreState *store_state, GetRequest *get_req) { /* We use the data size to indicate whether the object is present or not. */ if (object.data_size != -1) { - int error_code = send_fd(get_req->client->sock, object.handle.store_fd); + int error_code = send_fd(get_req->client->fd, object.handle.store_fd); /* If we failed to send the file descriptor, loop until we have sent it * successfully. TODO(rkn): This is problematic for two reasons. First * of all, sending the file descriptor should just succeed without any @@ -326,10 +225,10 @@ void return_from_get(PlasmaStoreState *store_state, GetRequest *get_req) { while (error_code < 0) { if (errno == EMSGSIZE) { LOG_WARN("Failed to send file descriptor, retrying."); - error_code = send_fd(get_req->client->sock, object.handle.store_fd); + error_code = send_fd(get_req->client->fd, object.handle.store_fd); continue; } - warn_if_sigpipe(error_code, get_req->client->sock); + warn_if_sigpipe(error_code, get_req->client->fd); break; } } @@ -340,25 +239,30 @@ void return_from_get(PlasmaStoreState *store_state, GetRequest *get_req) { * tables if it is present there. It should only be present there if the get * request timed out. */ for (ObjectID &object_id : get_req->object_ids) { - remove_get_request_for_object(store_state, object_id, get_req); + auto &get_requests = object_get_requests_[object_id]; + /* Erase get_req from the vector. */ + auto it = std::find(get_requests.begin(), get_requests.end(), get_req); + if (it != get_requests.end()) { + get_requests.erase(it); + } } /* Remove the get request. */ - remove_get_request(store_state, get_req); + if (get_req->timer != -1) { + CHECK(loop_->remove_timer(get_req->timer) == AE_OK); + } + delete get_req; } -void update_object_get_requests(PlasmaStoreState *store_state, - ObjectID obj_id) { - std::vector &get_requests = - store_state->object_get_requests[obj_id]; +void PlasmaStore::update_object_get_requests(ObjectID object_id) { + std::vector &get_requests = object_get_requests_[object_id]; int index = 0; int num_requests = get_requests.size(); for (int i = 0; i < num_requests; ++i) { GetRequest *get_req = get_requests[index]; - ObjectTableEntry *entry = - get_object_table_entry(store_state->plasma_store_info, obj_id); + auto entry = get_object_table_entry(&store_info_, object_id); CHECK(entry != NULL); - PlasmaObject_init(&get_req->objects[obj_id], entry); + PlasmaObject_init(&get_req->objects[object_id], entry); get_req->num_satisfied += 1; /* Record the fact that this client will be using this object and will * be responsible for releasing this object. */ @@ -366,7 +270,7 @@ void update_object_get_requests(PlasmaStoreState *store_state, /* If this get request is done, reply to the client. */ if (get_req->num_satisfied == get_req->num_objects_to_wait_for) { - return_from_get(store_state, get_req); + return_from_get(get_req); } else { /* The call to return_from_get will remove the current element in the * array, so we only increment the counter in the else branch. */ @@ -377,46 +281,33 @@ void update_object_get_requests(PlasmaStoreState *store_state, DCHECK(index == get_requests.size()); /* Remove the array of get requests for this object, since no one should be * waiting for this object anymore. */ - store_state->object_get_requests.erase(obj_id); + object_get_requests_.erase(object_id); } -int get_timeout_handler(event_loop *loop, timer_id id, void *context) { - GetRequest *get_req = (GetRequest *) context; - return_from_get(get_req->client->plasma_state, get_req); - return EVENT_LOOP_TIMER_DONE; -} - -void process_get_request(Client *client_context, - int num_object_ids, - ObjectID object_ids[], - uint64_t timeout_ms) { - PlasmaStoreState *plasma_state = client_context->plasma_state; - +void PlasmaStore::process_get_request(Client *client, + const std::vector &object_ids, + uint64_t timeout_ms) { /* Create a get request for this object. */ - GetRequest *get_req = - new GetRequest(client_context, num_object_ids, object_ids); - - for (int i = 0; i < num_object_ids; ++i) { - ObjectID obj_id = object_ids[i]; + GetRequest *get_req = new GetRequest(client, object_ids); + for (auto object_id : object_ids) { /* Check if this object is already present locally. If so, record that the * object is being used and mark it as accounted for. */ - ObjectTableEntry *entry = - get_object_table_entry(plasma_state->plasma_store_info, obj_id); + auto entry = get_object_table_entry(&store_info_, object_id); if (entry && entry->state == PLASMA_SEALED) { /* Update the get request to take into account the present object. */ - PlasmaObject_init(&get_req->objects[obj_id], entry); + PlasmaObject_init(&get_req->objects[object_id], entry); get_req->num_satisfied += 1; /* If necessary, record that this client is using this object. In the case * where entry == NULL, this will be called from seal_object. */ - add_client_to_object_clients(entry, client_context); + add_client_to_object_clients(entry, client); } else { /* Add a placeholder plasma object to the get request to indicate that the * object is not present. This will be parsed by the client. We set the * data size to -1 to indicate that the object is not present. */ - get_req->objects[obj_id].data_size = -1; + get_req->objects[object_id].data_size = -1; /* Add the get request to the relevant data structures. */ - add_get_request_for_object(plasma_state, obj_id, get_req); + object_get_requests_[object_id].push_back(get_req); } } @@ -424,69 +315,57 @@ void process_get_request(Client *client_context, * the client. */ if (get_req->num_satisfied == get_req->num_objects_to_wait_for || timeout_ms == 0) { - return_from_get(plasma_state, get_req); + return_from_get(get_req); } else if (timeout_ms != -1) { /* Set a timer that will cause the get request to return to the client. Note * that a timeout of -1 is used to indicate that no timer should be set. */ - get_req->timer = event_loop_add_timer(plasma_state->loop, timeout_ms, - get_timeout_handler, get_req); + get_req->timer = + loop_->add_timer(timeout_ms, [this, get_req](int64_t timer_id) { + return_from_get(get_req); + return kEventLoopTimerDone; + }); } } -int remove_client_from_object_clients(ObjectTableEntry *entry, - Client *client_info) { - /* Find the location of the client in the array. */ - for (int i = 0; i < entry->clients.size(); ++i) { - if (entry->clients[i] == client_info) { - /* Remove the client from the array. */ - entry->clients.erase(entry->clients.begin() + i); - /* If no more clients are using this object, notify the eviction policy - * that the object is no longer being used. */ - if (entry->clients.size() == 0) { - /* Tell the eviction policy that this object is no longer being used. */ - int64_t num_objects_to_evict; - ObjectID *objects_to_evict; - EvictionState_end_object_access( - client_info->plasma_state->eviction_state, - client_info->plasma_state->plasma_store_info, entry->object_id, - &num_objects_to_evict, &objects_to_evict); - remove_objects(client_info->plasma_state, num_objects_to_evict, - objects_to_evict); - } - /* Return 1 to indicate that the client was removed. */ - return 1; +int PlasmaStore::remove_client_from_object_clients(ObjectTableEntry *entry, + Client *client) { + auto it = entry->clients.find(client); + if (it != entry->clients.end()) { + entry->clients.erase(it); + /* If no more clients are using this object, notify the eviction policy + * that the object is no longer being used. */ + if (entry->clients.size() == 0) { + /* Tell the eviction policy that this object is no longer being used. */ + std::vector objects_to_evict; + eviction_policy_.end_object_access(entry->object_id, objects_to_evict); + delete_objects(objects_to_evict); } + /* Return 1 to indicate that the client was removed. */ + return 1; + } else { + /* Return 0 to indicate that the client was not removed. */ + return 0; } - /* Return 0 to indicate that the client was not removed. */ - return 0; } -void release_object(Client *client_context, ObjectID object_id) { - PlasmaStoreState *plasma_state = client_context->plasma_state; - ObjectTableEntry *entry = - get_object_table_entry(plasma_state->plasma_store_info, object_id); +void PlasmaStore::release_object(ObjectID object_id, Client *client) { + auto entry = get_object_table_entry(&store_info_, object_id); CHECK(entry != NULL); /* Remove the client from the object's array of clients. */ - CHECK(remove_client_from_object_clients(entry, client_context) == 1); + CHECK(remove_client_from_object_clients(entry, client) == 1); } /* Check if an object is present. */ -int contains_object(Client *client_context, ObjectID object_id) { - PlasmaStoreState *plasma_state = client_context->plasma_state; - ObjectTableEntry *entry = - get_object_table_entry(plasma_state->plasma_store_info, object_id); +int PlasmaStore::contains_object(ObjectID object_id) { + auto entry = get_object_table_entry(&store_info_, object_id); return entry && (entry->state == PLASMA_SEALED) ? OBJECT_FOUND : OBJECT_NOT_FOUND; } /* Seal an object that has been created in the hash table. */ -void seal_object(Client *client_context, - ObjectID object_id, - unsigned char digest[]) { +void PlasmaStore::seal_object(ObjectID object_id, unsigned char digest[]) { LOG_DEBUG("sealing object"); // TODO(pcm): add ObjectID here - PlasmaStoreState *plasma_state = client_context->plasma_state; - ObjectTableEntry *entry = - get_object_table_entry(plasma_state->plasma_store_info, object_id); + auto entry = get_object_table_entry(&store_info_, object_id); CHECK(entry != NULL); CHECK(entry->state == PLASMA_CREATED); /* Set the state of object to SEALED. */ @@ -494,69 +373,72 @@ void seal_object(Client *client_context, /* Set the object digest. */ entry->info.digest = std::string((char *) &digest[0], DIGEST_SIZE); /* Inform all subscribers that a new object has been sealed. */ - push_notification(plasma_state, &entry->info); + push_notification(&entry->info); /* Update all get requests that involve this object. */ - update_object_get_requests(plasma_state, object_id); + update_object_get_requests(object_id); } -/* Delete an object that has been created in the hash table. This should only - * be called on objects that are returned by the eviction policy to evict. */ -void delete_object(PlasmaStoreState *plasma_state, ObjectID object_id) { - LOG_DEBUG("deleting object"); - ObjectTableEntry *entry = - get_object_table_entry(plasma_state->plasma_store_info, object_id); - /* TODO(rkn): This should probably not fail, but should instead throw an - * error. Maybe we should also support deleting objects that have been created - * but not sealed. */ - CHECKM(entry != NULL, "To delete an object it must be in the object table."); - CHECKM(entry->state == PLASMA_SEALED, - "To delete an object it must have been sealed."); - CHECKM(entry->clients.size() == 0, - "To delete an object, there must be no clients currently using it."); - uint8_t *pointer = entry->pointer; - dlfree(pointer); - plasma_state->plasma_store_info->objects.erase(object_id); - delete entry; - /* Inform all subscribers that the object has been deleted. */ - ObjectInfoT notification; - notification.object_id = - std::string((char *) &object_id.id[0], sizeof(object_id)); - notification.is_deletion = true; - push_notification(plasma_state, ¬ification); -} - -void remove_objects(PlasmaStoreState *plasma_state, - int64_t num_objects_to_evict, - ObjectID *objects_to_evict) { - if (num_objects_to_evict > 0) { - for (int i = 0; i < num_objects_to_evict; ++i) { - delete_object(plasma_state, objects_to_evict[i]); - } - /* Free the array of objects to evict. This array was originally allocated - * by the eviction policy. */ - free(objects_to_evict); +void PlasmaStore::delete_objects(const std::vector &object_ids) { + for (const auto &object_id : object_ids) { + LOG_DEBUG("deleting object"); + auto entry = get_object_table_entry(&store_info_, object_id); + /* TODO(rkn): This should probably not fail, but should instead throw an + * error. Maybe we should also support deleting objects that have been + * created but not sealed. */ + CHECKM(entry != NULL, + "To delete an object it must be in the object table."); + CHECKM(entry->state == PLASMA_SEALED, + "To delete an object it must have been sealed."); + CHECKM(entry->clients.size() == 0, + "To delete an object, there must be no clients currently using it."); + dlfree(entry->pointer); + store_info_.objects.erase(object_id); + /* Inform all subscribers that the object has been deleted. */ + ObjectInfoT notification; + notification.object_id = + std::string((char *) &object_id.id[0], sizeof(object_id)); + notification.is_deletion = true; + push_notification(¬ification); } } -void push_notification(PlasmaStoreState *plasma_state, - ObjectInfoT *object_info) { - for (auto &element : plasma_state->pending_notifications) { - uint8_t *notification = create_object_info_buffer(object_info); - element.second.object_notifications.push_back(notification); - send_notifications(plasma_state->loop, element.first, plasma_state, 0); - /* The notification gets freed in send_notifications when the notification - * is sent over the socket. */ - } +void PlasmaStore::connect_client(int listener_sock) { + int client_fd = accept_client(listener_sock); + /* This is freed in disconnect_client. */ + Client *client = new Client(client_fd); + /* Add a callback to handle events on this socket. + * TODO(pcm): Check return value. */ + loop_->add_file_event(client_fd, kEventLoopRead, [this, client](int events) { + process_message(client); + }); + LOG_DEBUG("New connection with fd %d", client_fd); } -/* Send more notifications to a subscriber. */ -void send_notifications(event_loop *loop, - int client_sock, - void *context, - int events) { - PlasmaStoreState *plasma_state = (PlasmaStoreState *) context; - auto it = plasma_state->pending_notifications.find(client_sock); +void PlasmaStore::disconnect_client(Client *client) { + loop_->remove_file_event(client->fd); + /* If this client was using any objects, remove it from the appropriate + * lists. */ + for (const auto &entry : store_info_.objects) { + remove_client_from_object_clients(entry.second.get(), client); + } + /* Note, the store may still attempt to send a message to the disconnected + * client (for example, when an object ID that the client was waiting for + * is ready). In these cases, the attempt to send the message will fail, but + * the store should just ignore the failure. */ + delete client; +} + +/** + * Send notifications about sealed objects to the subscribers. This is called + * in seal_object. If the socket's send buffer is full, the notification will be + * buffered, and this will be called again when the send buffer has room. + * + * @param client The client to send the notification to. + * @return Void. + */ +void PlasmaStore::send_notifications(int client_fd) { + auto it = pending_notifications_.find(client_fd); int num_processed = 0; bool closed = false; @@ -568,7 +450,7 @@ void send_notifications(event_loop *loop, int64_t size = *((int64_t *) notification); /* Attempt to send a notification about this object ID. */ - int nbytes = send(client_sock, notification, sizeof(int64_t) + size, 0); + int nbytes = send(client_fd, notification, sizeof(int64_t) + size, 0); if (nbytes >= 0) { CHECK(nbytes == sizeof(int64_t) + size); } else if (nbytes == -1 && @@ -579,12 +461,15 @@ void send_notifications(event_loop *loop, /* Add a callback to the event loop to send queued notifications whenever * there is room in the socket's send buffer. Callbacks can be added * more than once here and will be overwritten. The callback is removed - * at the end of the method. */ - event_loop_add_file(plasma_state->loop, client_sock, EVENT_LOOP_WRITE, - send_notifications, plasma_state); + * at the end of the method. + * TODO(pcm): Introduce status codes and check in case the file descriptor + * is added twice.*/ + loop_->add_file_event( + client_fd, kEventLoopWrite, + [this, client_fd](int events) { send_notifications(client_fd); }); break; } else { - LOG_WARN("Failed to send notification to client on fd %d", client_sock); + LOG_WARN("Failed to send notification to client on fd %d", client_fd); if (errno == EPIPE) { closed = true; break; @@ -602,171 +487,138 @@ void send_notifications(event_loop *loop, /* Stop sending notifications if the pipe was broken. */ if (closed) { - close(client_sock); - plasma_state->pending_notifications.erase(client_sock); + close(client_fd); + pending_notifications_.erase(client_fd); } /* If we have sent all notifications, remove the fd from the event loop. */ if (it->second.object_notifications.empty()) { - event_loop_remove_file(loop, client_sock); + loop_->remove_file_event(client_fd); + } +} + +void PlasmaStore::push_notification(ObjectInfoT *object_info) { + for (auto &element : pending_notifications_) { + uint8_t *notification = create_object_info_buffer(object_info); + element.second.object_notifications.push_back(notification); + send_notifications(element.first); + /* The notification gets freed in send_notifications when the notification + * is sent over the socket. */ } } /* Subscribe to notifications about sealed objects. */ -void subscribe_to_updates(Client *client_context, int conn) { +void PlasmaStore::subscribe_to_updates(Client *client) { LOG_DEBUG("subscribing to updates"); - PlasmaStoreState *plasma_state = client_context->plasma_state; /* TODO(rkn): The store could block here if the client doesn't send a file * descriptor. */ - int fd = recv_fd(conn); + int fd = recv_fd(client->fd); if (fd < 0) { /* This may mean that the client died before sending the file descriptor. */ - LOG_WARN("Failed to receive file descriptor from client on fd %d.", conn); + LOG_WARN("Failed to receive file descriptor from client on fd %d.", + client->fd); return; } /* Create a new array to buffer notifications that can't be sent to the * subscriber yet because the socket send buffer is full. TODO(rkn): the queue * never gets freed. */ - NotificationQueue &queue = plasma_state->pending_notifications[fd]; + NotificationQueue &queue = pending_notifications_[fd]; /* Push notifications to the new subscriber about existing objects. */ - for (const auto &entry : plasma_state->plasma_store_info->objects) { - push_notification(plasma_state, &entry.second->info); + for (const auto &entry : store_info_.objects) { + push_notification(&entry.second->info); } - send_notifications(plasma_state->loop, fd, plasma_state, 0); + send_notifications(fd); } -void process_message(event_loop *loop, - int client_sock, - void *context, - int events) { - Client *client_context = (Client *) context; - PlasmaStoreState *state = client_context->plasma_state; +void PlasmaStore::process_message(Client *client) { int64_t type; - read_buffer(client_sock, &type, state->input_buffer); + read_vector(client->fd, &type, input_buffer_); - uint8_t *input = (uint8_t *) utarray_front(state->input_buffer); - ObjectID object_ids[1]; - int64_t num_objects; - PlasmaObject objects[1]; - memset(&objects[0], 0, sizeof(objects)); + uint8_t *input = input_buffer_.data(); + ObjectID object_id; + PlasmaObject object; + /* TODO(pcm): Get rid of the following. */ + memset(&object, 0, sizeof(object)); /* Process the different types of requests. */ switch (type) { case MessageType_PlasmaCreateRequest: { int64_t data_size; int64_t metadata_size; - plasma_read_CreateRequest(input, &object_ids[0], &data_size, - &metadata_size); - int error_code = create_object(client_context, object_ids[0], data_size, - metadata_size, &objects[0]); - warn_if_sigpipe( - plasma_send_CreateReply(client_sock, state->builder, object_ids[0], - &objects[0], error_code), - client_sock); + plasma_read_CreateRequest(input, &object_id, &data_size, &metadata_size); + int error_code = + create_object(object_id, data_size, metadata_size, client, &object); + warn_if_sigpipe(plasma_send_CreateReply(client->fd, builder_, object_id, + &object, error_code), + client->fd); if (error_code == PlasmaError_OK) { - warn_if_sigpipe(send_fd(client_sock, objects[0].handle.store_fd), - client_sock); + warn_if_sigpipe(send_fd(client->fd, object.handle.store_fd), client->fd); } } break; case MessageType_PlasmaGetRequest: { - num_objects = plasma_read_GetRequest_num_objects(input); - ObjectID *object_ids_to_get = - (ObjectID *) malloc(num_objects * sizeof(ObjectID)); + int num_objects = plasma_read_GetRequest_num_objects(input); + std::vector object_ids_to_get(num_objects); int64_t timeout_ms; - plasma_read_GetRequest(input, object_ids_to_get, &timeout_ms, num_objects); - /* TODO(pcm): The array object_ids_to_get could be reused in - * process_get_request. */ - process_get_request(client_context, num_objects, object_ids_to_get, - timeout_ms); - free(object_ids_to_get); + plasma_read_GetRequest(input, object_ids_to_get.data(), &timeout_ms, + num_objects); + process_get_request(client, object_ids_to_get, timeout_ms); } break; case MessageType_PlasmaReleaseRequest: - plasma_read_ReleaseRequest(input, &object_ids[0]); - release_object(client_context, object_ids[0]); + plasma_read_ReleaseRequest(input, &object_id); + release_object(object_id, client); break; case MessageType_PlasmaContainsRequest: - plasma_read_ContainsRequest(input, &object_ids[0]); - if (contains_object(client_context, object_ids[0]) == OBJECT_FOUND) { - warn_if_sigpipe(plasma_send_ContainsReply(client_sock, state->builder, - object_ids[0], 1), - client_sock); + plasma_read_ContainsRequest(input, &object_id); + if (contains_object(object_id) == OBJECT_FOUND) { + warn_if_sigpipe( + plasma_send_ContainsReply(client->fd, builder_, object_id, 1), + client->fd); } else { - warn_if_sigpipe(plasma_send_ContainsReply(client_sock, state->builder, - object_ids[0], 0), - client_sock); + warn_if_sigpipe( + plasma_send_ContainsReply(client->fd, builder_, object_id, 0), + client->fd); } break; case MessageType_PlasmaSealRequest: { unsigned char digest[DIGEST_SIZE]; - plasma_read_SealRequest(input, &object_ids[0], &digest[0]); - seal_object(client_context, object_ids[0], &digest[0]); + plasma_read_SealRequest(input, &object_id, &digest[0]); + seal_object(object_id, &digest[0]); } break; case MessageType_PlasmaEvictRequest: { /* This code path should only be used for testing. */ int64_t num_bytes; plasma_read_EvictRequest(input, &num_bytes); - int64_t num_objects_to_evict; - ObjectID *objects_to_evict; - int64_t num_bytes_evicted = EvictionState_choose_objects_to_evict( - client_context->plasma_state->eviction_state, - client_context->plasma_state->plasma_store_info, num_bytes, - &num_objects_to_evict, &objects_to_evict); - remove_objects(client_context->plasma_state, num_objects_to_evict, - objects_to_evict); + std::vector objects_to_evict; + int64_t num_bytes_evicted = + eviction_policy_.choose_objects_to_evict(num_bytes, objects_to_evict); + delete_objects(objects_to_evict); warn_if_sigpipe( - plasma_send_EvictReply(client_sock, state->builder, num_bytes_evicted), - client_sock); + plasma_send_EvictReply(client->fd, builder_, num_bytes_evicted), + client->fd); } break; case MessageType_PlasmaSubscribeRequest: - subscribe_to_updates(client_context, client_sock); + subscribe_to_updates(client); break; case MessageType_PlasmaConnectRequest: { - warn_if_sigpipe( - plasma_send_ConnectReply(client_sock, state->builder, - state->plasma_store_info->memory_capacity), - client_sock); - } break; - case DISCONNECT_CLIENT: { - LOG_INFO("Disconnecting client on fd %d", client_sock); - event_loop_remove_file(loop, client_sock); - /* If this client was using any objects, remove it from the appropriate - * lists. */ - PlasmaStoreState *plasma_state = client_context->plasma_state; - for (const auto &entry : plasma_state->plasma_store_info->objects) { - remove_client_from_object_clients(entry.second, client_context); - } - /* Note, the store may still attempt to send a message to the disconnected - * client (for example, when an object ID that the client was waiting for - * is ready). In these cases, the attempt to send the message will fail, but - * the store should just ignore the failure. */ + warn_if_sigpipe(plasma_send_ConnectReply(client->fd, builder_, + store_info_.memory_capacity), + client->fd); } break; + case DISCONNECT_CLIENT: + LOG_INFO("Disconnecting client on fd %d", client->fd); + disconnect_client(client); + break; default: /* This code should be unreachable. */ CHECK(0); } } -void new_client_connection(event_loop *loop, - int listener_sock, - void *context, - int events) { - PlasmaStoreState *plasma_state = (PlasmaStoreState *) context; - int new_socket = accept_client(listener_sock); - /* Create a new client object. This will also be used as the context to use - * for events on this client's socket. TODO(rkn): free this somewhere. */ - Client *client_context = new Client(new_socket, plasma_state); - /* Add a callback to handle events on this socket. */ - event_loop_add_file(loop, new_socket, EVENT_LOOP_READ, process_message, - client_context); - LOG_DEBUG("new connection with fd %d", new_socket); -} - /* Report "success" to valgrind. */ void signal_handler(int signal) { if (signal == SIGTERM) { - PlasmaStoreState_free(g_state); exit(0); } } @@ -776,14 +628,15 @@ void start_server(char *socket_name, int64_t system_memory) { * to a client that has already died, the store could die. */ signal(SIGPIPE, SIG_IGN); /* Create the event loop. */ - event_loop *loop = event_loop_create(); - PlasmaStoreState *state = new PlasmaStoreState(loop, system_memory); + EventLoop loop; + PlasmaStore store(&loop, system_memory); int socket = bind_ipc_sock(socket_name, true); CHECK(socket >= 0); - event_loop_add_file(loop, socket, EVENT_LOOP_READ, new_client_connection, - state); - g_state = state; - event_loop_run(loop); + /* TODO(pcm): Check return value. */ + loop.add_file_event(socket, kEventLoopRead, [&store, socket](int events) { + store.connect_client(socket); + }); + loop.run(); } int main(int argc, char *argv[]) { diff --git a/src/plasma/plasma_store.h b/src/plasma/plasma_store.h index cd1717649..a74873121 100644 --- a/src/plasma/plasma_store.h +++ b/src/plasma/plasma_store.h @@ -1,103 +1,172 @@ #ifndef PLASMA_STORE_H #define PLASMA_STORE_H +#include "common.h" +#include "eviction_policy.h" #include "plasma.h" +#include "plasma_events.h" +#include "plasma_protocol.h" -typedef struct Client Client; +class GetRequest; -typedef struct PlasmaStoreState PlasmaStoreState; +struct NotificationQueue { + /** The object notifications for clients. We notify the client about the + * objects in the order that the objects were sealed or deleted. */ + std::deque object_notifications; +}; -/** - * Create a new object. The client must do a call to release_object to tell the - * store when it is done with the object. - * - * @param client_context The context of the client making this request. - * @param object_id Object ID of the object to be created. - * @param data_size Size in bytes of the object to be created. - * @param metadata_size Size in bytes of the object metadata. - * @return One of the following error codes: - * - PlasmaError_OK, if the object was created successfully. - * - PlasmaError_ObjectExists, if an object with this ID is already - * present in the store. In this case, the client should not call - * plasma_release. - * - PlasmaError_OutOfMemory, if the store is out of memory and cannot - * create the object. In this case, the client should not call - * plasma_release. - */ -int create_object(Client *client_context, - ObjectID object_id, - int64_t data_size, - int64_t metadata_size, - PlasmaObject *result); +/** Contains all information that is associated with a Plasma store client. */ +struct Client { + Client(int fd); -/** - * Get an object. This method assumes that we currently have or will eventually - * have this object sealed. If the object has not yet been sealed, the client - * that requested the object will be notified when it is sealed. - * - * For each call to get_object, the client must do a call to release_object to - * tell the store when it is done with the object. - * - * @param client_context The context of the client making this request. - * @param conn The client connection that requests the object. - * @param object_id Object ID of the object to be gotten. - * @return The status of the object (object_status in plasma.h). - */ -int get_object(Client *client_context, - int conn, - ObjectID object_id, - PlasmaObject *result); + /** The file descriptor used to communicate with the client. */ + int fd; +}; -/** - * Record the fact that a particular client is no longer using an object. - * - * @param client_context The context of the client making this request. - * @param object_id The object ID of the object that is being released. - * @param Void. - */ -void release_object(Client *client_context, ObjectID object_id); +class PlasmaStore { + public: + PlasmaStore(EventLoop *loop, int64_t system_memory); -/** - * Seal an object. The object is now immutable and can be accessed with get. - * - * @param client_context The context of the client making this request. - * @param object_id Object ID of the object to be sealed. - * @param digest The digest of the object. This is used to tell if two objects - * with the same object ID are the same. - * @return Void. - */ -void seal_object(Client *client_context, - ObjectID object_id, - unsigned char digest[]); + ~PlasmaStore(); -/** - * Check if the plasma store contains an object: - * - * @param client_context The context of the client making this request. - * @param object_id Object ID that will be checked. - * @return OBJECT_FOUND if the object is in the store, OBJECT_NOT_FOUND if not - */ -int contains_object(Client *client_context, ObjectID object_id); + /** + * Create a new object. The client must do a call to release_object to tell + * the store when it is done with the object. + * + * @param object_id Object ID of the object to be created. + * @param data_size Size in bytes of the object to be created. + * @param metadata_size Size in bytes of the object metadata. + * @return One of the following error codes: + * - PlasmaError_OK, if the object was created successfully. + * - PlasmaError_ObjectExists, if an object with this ID is already + * present in the store. In this case, the client should not call + * plasma_release. + * - PlasmaError_OutOfMemory, if the store is out of memory and cannot + * create the object. In this case, the client should not call + * plasma_release. + */ + int create_object(ObjectID object_id, + int64_t data_size, + int64_t metadata_size, + Client *client, + PlasmaObject *result); -/** - * Send notifications about sealed objects to the subscribers. This is called - * in seal_object. If the socket's send buffer is full, the notification will be - * buffered, and this will be called again when the send buffer has room. - * - * @param loop The Plasma store event loop. - * @param client_sock The socket of the client to send the notification to. - * @param plasma_state The plasma store global state. - * @param events This is needed for this function to have the signature of a - callback. - * @return Void. - */ -void send_notifications(event_loop *loop, - int client_sock, - void *plasma_state, - int events); + /** + * Delete objects that have been created in the hash table. This should only + * be called on objects that are returned by the eviction policy to evict. + * + * @param object_ids Object IDs of the objects to be deleted. + * @return Void. + */ + void delete_objects(const std::vector &object_ids); -void remove_objects(PlasmaStoreState *plasma_state, - int64_t num_objects_to_evict, - ObjectID *objects_to_evict); + /** + * Process a get request from a client. This method assumes that we will + * eventually have these objects sealed. If one of the objects has not yet + * been sealed, the client that requested the object will be notified when it + * is sealed. + * + * For each object, the client must do a call to release_object to tell the + * store when it is done with the object. + * + * @param client The client making this request. + * @param object_ids Object IDs of the objects to be gotten. + * @param timeout_ms The timeout for the get request in milliseconds. + * @return Void. + */ + void process_get_request(Client *client, + const std::vector &object_ids, + uint64_t timeout_ms); + + /** + * Seal an object. The object is now immutable and can be accessed with get. + * + * @param object_id Object ID of the object to be sealed. + * @param digest The digest of the object. This is used to tell if two objects + * with the same object ID are the same. + * @return Void. + */ + void seal_object(ObjectID object_id, unsigned char digest[]); + + /** + * Check if the plasma store contains an object: + * + * @param object_id Object ID that will be checked. + * @return OBJECT_FOUND if the object is in the store, OBJECT_NOT_FOUND if not + */ + int contains_object(ObjectID object_id); + + /** + * Record the fact that a particular client is no longer using an object. + * + * @param object_id The object ID of the object that is being released. + * @param client The client making this request. + * @param Void. + */ + void release_object(ObjectID object_id, Client *client); + + /** + * Subscribe a file descriptor to updates about new sealed objects. + * + * @param client The client making this request. + * @return Void. + */ + void subscribe_to_updates(Client *client); + + /** + * Connect a new client to the PlasmaStore. + * + * @param listener_sock The socket that is listening to incoming connections. + * @return Void. + */ + void connect_client(int listener_sock); + + /** + * Disconnect a client from the PlasmaStore. + * + * @param client The client that is disconnected. + * @return Void. + */ + void disconnect_client(Client *client); + + void send_notifications(int client_fd); + + void process_message(Client *client); + + private: + void push_notification(ObjectInfoT *object_notification); + + void add_client_to_object_clients(ObjectTableEntry *entry, Client *client); + + void return_from_get(GetRequest *get_req); + + void update_object_get_requests(ObjectID object_id); + + int remove_client_from_object_clients(ObjectTableEntry *entry, + Client *client); + + /* Event loop of the plasma store. */ + EventLoop *loop_; + /** The plasma store information, including the object tables, that is exposed + * to the eviction policy. */ + PlasmaStoreInfo store_info_; + /** The state that is managed by the eviction policy. */ + EvictionPolicy eviction_policy_; + /** Input buffer. This is allocated only once to avoid mallocs for every + * call to process_message. */ + std::vector input_buffer_; + /** Buffer that holds memory for serializing plasma protocol messages. */ + protocol_builder *builder_; + /** A hash table mapping object IDs to a vector of the get requests that are + * waiting for the object to arrive. */ + std::unordered_map, UniqueIDHasher> + object_get_requests_; + /** The pending notifications that have not been sent to subscribers because + * the socket send buffers were full. This is a hash table from client file + * descriptor to an array of object_ids to send to that client. + * TODO(pcm): Consider putting this into the Client data structure and + * reorganize the code slightly. */ + std::unordered_map pending_notifications_; +}; #endif /* PLASMA_STORE_H */