diff --git a/install-dependencies.sh b/install-dependencies.sh index 0fcf5f28c..7e814d7b3 100755 --- a/install-dependencies.sh +++ b/install-dependencies.sh @@ -31,13 +31,13 @@ if [[ $platform == "linux" ]]; then # These commands must be kept in sync with the installation instructions. sudo apt-get update sudo apt-get install -y git cmake build-essential autoconf curl libtool python-dev python-numpy python-pip libboost-all-dev unzip - sudo pip install funcsigs colorama redis + sudo pip install funcsigs colorama psutil redis sudo pip install --upgrade git+git://github.com/cloudpipe/cloudpickle.git@0d225a4695f1f65ae1cbb2e0bbc145e10167cce4 # We use the latest version of cloudpickle because it can serialize named tuples. elif [[ $platform == "macosx" ]]; then # These commands must be kept in sync with the installation instructions. brew install git cmake automake autoconf libtool boost sudo easy_install pip - sudo pip install numpy funcsigs colorama redis --ignore-installed six + sudo pip install numpy funcsigs colorama psutil redis --ignore-installed six sudo pip install --upgrade git+git://github.com/cloudpipe/cloudpickle.git@0d225a4695f1f65ae1cbb2e0bbc145e10167cce4 # We use the latest version of cloudpickle because it can serialize named tuples. fi diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index d10da3150..c265d476b 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -1,12 +1,13 @@ from __future__ import print_function +import psutil import os -import sys -import time +import random import signal +import sys import subprocess import string -import random +import time # Ray modules import config @@ -90,13 +91,16 @@ def start_objstore(node_ip_address, redis_address, cleanup=True): this process will be killed by serices.cleanup() when the Python process that imported services exits. """ + # Let the object store use a fraction of the system memory. + system_memory = psutil.virtual_memory().total + plasma_store_memory = int(system_memory * 0.75) plasma_store_filepath = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../plasma/build/plasma_store") if RUN_PLASMA_STORE_PROFILER: plasma_store_prefix = ["valgrind", "--tool=callgrind", plasma_store_filepath] else: plasma_store_prefix = [plasma_store_filepath] store_name = "/tmp/ray_plasma_store{}".format(random_name()) - p1 = subprocess.Popen(plasma_store_prefix + ["-s", store_name]) + p1 = subprocess.Popen(plasma_store_prefix + ["-s", store_name, "-m", str(plasma_store_memory)]) manager_name = "/tmp/ray_plasma_manager{}".format(random_name()) p2, manager_port = plasma.start_plasma_manager(store_name, manager_name, redis_address, run_profiler=RUN_PLASMA_MANAGER_PROFILER) diff --git a/src/photon/test/test.py b/src/photon/test/test.py index c02ab17b3..8bcbb0807 100644 --- a/src/photon/test/test.py +++ b/src/photon/test/test.py @@ -13,6 +13,7 @@ import photon import plasma USE_VALGRIND = False +PLASMA_STORE_MEMORY = 1000000000 class TestPhotonClient(unittest.TestCase): @@ -23,7 +24,7 @@ class TestPhotonClient(unittest.TestCase): # Start Plasma. plasma_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../plasma/build/plasma_store") plasma_socket = "/tmp/plasma_store{}".format(random.randint(0, 10000)) - self.p2 = subprocess.Popen([plasma_executable, "-s", plasma_socket]) + self.p2 = subprocess.Popen([plasma_executable, "-s", plasma_socket, "-m", str(PLASMA_STORE_MEMORY)]) time.sleep(0.1) self.plasma_client = plasma.PlasmaClient(plasma_socket) scheduler_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../build/photon_scheduler") diff --git a/src/plasma/Makefile b/src/plasma/Makefile index 5a23523dc..3c68e1d03 100644 --- a/src/plasma/Makefile +++ b/src/plasma/Makefile @@ -16,8 +16,8 @@ clean: $(BUILD)/manager_tests: test/manager_tests.c plasma.h plasma_client.h plasma_client.c plasma_manager.h plasma_manager.c fling.h fling.c common $(CC) $(CFLAGS) $(TEST_CFLAGS) -o $@ test/manager_tests.c plasma_manager.c plasma_client.c fling.c ../common/build/libcommon.a ../common/thirdparty/hiredis/libhiredis.a -$(BUILD)/plasma_store: plasma_store.c plasma.h fling.h fling.c malloc.c malloc.h thirdparty/dlmalloc.c common - $(CC) $(CFLAGS) plasma_store.c fling.c malloc.c ../common/build/libcommon.a -o $(BUILD)/plasma_store +$(BUILD)/plasma_store: plasma_store.c plasma.h eviction_policy.c fling.h fling.c malloc.c malloc.h thirdparty/dlmalloc.c common + $(CC) $(CFLAGS) plasma_store.c eviction_policy.c fling.c malloc.c ../common/build/libcommon.a -o $(BUILD)/plasma_store $(BUILD)/plasma_manager: plasma_manager.c plasma.h plasma_client.c fling.h fling.c common $(CC) $(CFLAGS) plasma_manager.c plasma_client.c fling.c ../common/build/libcommon.a ../common/thirdparty/hiredis/libhiredis.a -o $(BUILD)/plasma_manager diff --git a/src/plasma/eviction_policy.c b/src/plasma/eviction_policy.c new file mode 100644 index 000000000..562bbb372 --- /dev/null +++ b/src/plasma/eviction_policy.c @@ -0,0 +1,210 @@ +#include "eviction_policy.h" + +#include "utlist.h" + +/** An element representing a released object in a doubly-linked list. This is + * used to implement an LRU cache. */ +typedef struct released_object { + /** The object_id of the released object. */ + object_id object_id; + /** Needed for the doubly-linked list macros. */ + struct released_object *prev; + /** Needed for the doubly-linked list macros. */ + struct released_object *next; +} released_object; + +/** This type is used to define a hash table mapping the object ID of a released + * object to its location in the doubly-linked list of released objects. */ +typedef struct { + /** Object ID of this object. */ + object_id object_id; + /** A pointer to the corresponding entry for this object in the doubly-linked + * list of released objects. */ + released_object *released_object; + /** Handle for the uthash table. */ + UT_hash_handle handle; +} released_object_entry; + +/** The part of the Plasma state that is maintained by the eviction policy. */ +struct eviction_state { + /** The amount of memory (in bytes) that we allow to be allocated in the + * store. */ + int64_t memory_capacity; + /** The amount of memory (in bytes) currently being used. */ + int64_t memory_used; + /** A doubly-linked list of the released objects in order from least recently + * released to most recently released. */ + released_object *released_objects; + /** A hash table mapping the object ID of a released object to its location in + * the doubly linked list of released objects. */ + released_object_entry *released_object_table; +}; + +/* This is used to define the array of object IDs used to define the + * released_objects type. */ +UT_icd released_objects_entry_icd = {sizeof(object_id), NULL, NULL, NULL}; + +eviction_state *make_eviction_state(int64_t system_memory) { + eviction_state *state = malloc(sizeof(eviction_state)); + /* Find the amount of available memory on the machine. */ + state->memory_capacity = system_memory; + state->memory_used = 0; + state->released_objects = NULL; + state->released_object_table = NULL; + return state; +} + +void free_eviction_state(eviction_state *s) { + /* Delete each element in the doubly-linked list. */ + released_object *element, *temp; + DL_FOREACH_SAFE(s->released_objects, element, temp) { + DL_DELETE(s->released_objects, element); + free(element); + } + /* Delete each element in the hash table. */ + released_object_entry *current_entry, *temp_entry; + HASH_ITER(handle, s->released_object_table, current_entry, temp_entry) { + HASH_DELETE(handle, s->released_object_table, current_entry); + free(current_entry); + } + /* Free the eviction state. */ + free(s); +} + +void add_object_to_lru_cache(eviction_state *eviction_state, + object_id object_id) { + /* Add the object ID to the doubly-linked list. */ + released_object *linked_list_entry = malloc(sizeof(released_object)); + linked_list_entry->object_id = object_id; + DL_APPEND(eviction_state->released_objects, linked_list_entry); + /* Check that the object ID is not already in the hash table. */ + released_object_entry *hash_table_entry; + HASH_FIND(handle, eviction_state->released_object_table, &object_id, + sizeof(object_id), hash_table_entry); + CHECK(hash_table_entry == NULL); + /* Add the object ID to the hash table. */ + hash_table_entry = malloc(sizeof(released_object_entry)); + hash_table_entry->object_id = object_id; + hash_table_entry->released_object = linked_list_entry; + HASH_ADD(handle, eviction_state->released_object_table, object_id, + sizeof(object_id), hash_table_entry); +} + +void remove_object_from_lru_cache(eviction_state *eviction_state, + object_id object_id) { + /* Check that the object ID is in the hash table. */ + released_object_entry *hash_table_entry; + HASH_FIND(handle, eviction_state->released_object_table, &object_id, + sizeof(object_id), hash_table_entry); + /* Only remove the object ID if it is in the LRU cache. */ + CHECK(hash_table_entry != NULL); + /* Remove the object ID from the doubly-linked list. */ + DL_DELETE(eviction_state->released_objects, + hash_table_entry->released_object); + /* Free the entry from the doubly-linked list. */ + free(hash_table_entry->released_object); + /* Remove the object ID from the hash table. */ + HASH_DELETE(handle, eviction_state->released_object_table, hash_table_entry); + /* Free the entry from the hash table. */ + free(hash_table_entry); +} + +int64_t choose_objects_to_evict(eviction_state *eviction_state, + plasma_store_info *plasma_store_info, + int64_t num_bytes_required, + int64_t *num_objects_to_evict, + object_id **objects_to_evict) { + int64_t num_objects = 0; + int64_t num_bytes = 0; + /* Figure out how many objects need to be evicted in order to recover a + * sufficient number of bytes. */ + released_object *element, *temp; + DL_FOREACH_SAFE(eviction_state->released_objects, element, temp) { + if (num_bytes >= num_bytes_required) { + break; + } + /* Find the object table entry for this object. */ + object_table_entry *entry; + HASH_FIND(handle, plasma_store_info->objects, &element->object_id, + sizeof(object_id), entry); + /* Update the cumulative bytes and the number of objects so far. */ + num_bytes += (entry->info.data_size + entry->info.metadata_size); + num_objects += 1; + } + /* Construct the return values. */ + *num_objects_to_evict = num_objects; + if (num_objects == 0) { + *objects_to_evict = NULL; + } else { + *objects_to_evict = (object_id *) malloc(num_objects * sizeof(object_id)); + int counter = 0; + DL_FOREACH_SAFE(eviction_state->released_objects, element, temp) { + if (counter == num_objects) { + break; + } + (*objects_to_evict)[counter] = element->object_id; + /* Update the LRU cache. */ + remove_object_from_lru_cache(eviction_state, element->object_id); + counter += 1; + } + } + /* Update the number used. */ + eviction_state->memory_used -= num_bytes; + return num_bytes; +} + +void object_created(eviction_state *eviction_state, + plasma_store_info *plasma_store_info, + object_id obj_id) { + add_object_to_lru_cache(eviction_state, obj_id); +} + +void require_space(eviction_state *eviction_state, + plasma_store_info *plasma_store_info, + int64_t size, + int64_t *num_objects_to_evict, + object_id **objects_to_evict) { + /* Check if there is enough space to create the object. */ + int64_t required_space = + eviction_state->memory_used + size - eviction_state->memory_capacity; + if (required_space > 0) { + /* Try to free up as much free space as we need right now. */ + LOG_DEBUG("not enough space to create this object, so evicting objects"); + /* Choose some objects to evict, and update the return pointers. */ + int64_t num_bytes_evicted = choose_objects_to_evict( + eviction_state, plasma_store_info, required_space, num_objects_to_evict, + objects_to_evict); + printf("Evicted %" PRId64 " bytes.\n", num_bytes_evicted); + LOG_INFO( + "There is not enough space to create this object, so evicting " + "%" PRId64 " objects to free up %" PRId64 " bytes.\n", + *num_objects_to_evict, num_bytes_evicted); + CHECK(num_bytes_evicted >= required_space); + } else { + *num_objects_to_evict = 0; + *objects_to_evict = NULL; + } + eviction_state->memory_used += size; +} + +void begin_object_access(eviction_state *eviction_state, + plasma_store_info *plasma_store_info, + object_id obj_id, + int64_t *num_objects_to_evict, + object_id **objects_to_evict) { + /* If the object is in the LRU cache, remove it. */ + remove_object_from_lru_cache(eviction_state, obj_id); + *num_objects_to_evict = 0; + *objects_to_evict = NULL; +} + +void end_object_access(eviction_state *eviction_state, + plasma_store_info *plasma_store_info, + object_id obj_id, + int64_t *num_objects_to_evict, + object_id **objects_to_evict) { + /* Add the object to the LRU cache.*/ + add_object_to_lru_cache(eviction_state, obj_id); + *num_objects_to_evict = 0; + *objects_to_evict = NULL; +} diff --git a/src/plasma/eviction_policy.h b/src/plasma/eviction_policy.h new file mode 100644 index 000000000..88412c80f --- /dev/null +++ b/src/plasma/eviction_policy.h @@ -0,0 +1,148 @@ +#ifndef EVICTION_POLICY_H +#define EVICTION_POLICY_H + +#include "plasma.h" + +/* ==== The eviction policy ==== + * + * This file contains declaration for all functions and data structures that + * need to be provided if you want to implement a new eviction algorithm for the + * Plasma store. + */ + +/** Internal state of the eviction policy. */ +typedef struct eviction_state eviction_state; + +/** + * Initialize the eviction policy state. + * + * @param system_memory The amount of memory that can be used by the Plasma + * store. + * @return The internal state of the eviction policy. + */ +eviction_state *make_eviction_state(int64_t system_memory); + +/** + * Free the eviction policy state. + * + * @param state The state managed by the eviction policy. + * @return Void. + */ +void free_eviction_state(eviction_state *state); + +/** + * This method will be called whenever an object is first created in order to + * add it to the LRU cache. This is done so that the first time, the Plasma + * store calls begin_object_access, we can remove the object from the LRU cache. + * + * @param eviction_state The state managed by the eviction policy. + * @param plasma_store_info Information about the Plasma store that is exposed + * to the eviction policy. + * @param obj_id The object ID of the object that was created. + * @return Void. + */ +void object_created(eviction_state *eviction_state, + plasma_store_info *plasma_store_info, + object_id obj_id); + +/** + * This method will be called when the Plasma store needs more space, perhaps to + * create a new object. If the required amount of space cannot be freed up, then + * a fatal error will be thrown. When this method is called, the eviction policy + * will assume that the objects chosen to be evicted will in fact be evicted + * from the Plasma store by the caller. + * + * @param eviction_state The state managed by the eviction policy. + * @param plasma_store_info Information about the Plasma store that is exposed + * to the eviction policy. + * @param size The size in bytes of the new object, including both data and + * metadata. + * @param num_objects_to_evict The number of objects that are chosen will be + * stored at this address. + * @param objects_to_evict An array of the object IDs that were chosen will be + * stored at this address. If the number of objects chosen is greater + * than 0, then the caller needs to free that array. If it equals 0, then + * the array will be NULL. + * @return Void. + */ +void require_space(eviction_state *eviction_state, + plasma_store_info *plasma_store_info, + int64_t size, + int64_t *num_objects_to_evict, + object_id **objects_to_evict); + +/** + * This method will be called whenever an unused object in the Plasma store + * starts to be used. When this method is called, the eviction policy will + * assume that the objects chosen to be evicted will in fact be evicted from the + * Plasma store by the caller. + * + * @param eviction_state The state managed by the eviction policy. + * @param plasma_store_info Information about the Plasma store that is exposed + * to the eviction policy. + * @param obj_id The ID of the object that is now being used. + * @param num_objects_to_evict The number of objects that are chosen will be + * stored at this address. + * @param objects_to_evict An array of the object IDs that were chosen will be + * stored at this address. If the number of objects chosen is greater + * than 0, then the caller needs to free that array. If it equals 0, then + * the array will be NULL. + * @return Void. + */ +void begin_object_access(eviction_state *eviction_state, + plasma_store_info *plasma_store_info, + object_id obj_id, + int64_t *num_objects_to_evict, + object_id **objects_to_evict); + +/** + * This method will be called whenever an object in the Plasma store that was + * being used is no longer being used. When this method is called, the eviction + * policy will assume that the objects chosen to be evicted will in fact be + * evicted from the Plasma store by the caller. + * + * @param eviction_state The state managed by the eviction policy. + * @param plasma_store_info Information about the Plasma store that is exposed + * to the eviction policy. + * @param obj_id The ID of the object that is no longer being used. + * @param num_objects_to_evict The number of objects that are chosen will be + * stored at this address. + * @param objects_to_evict An array of the object IDs that were chosen will be + * stored at this address. If the number of objects chosen is greater + * than 0, then the caller needs to free that array. If it equals 0, then + * the array will be NULL. + * @return Void. + */ +void end_object_access(eviction_state *eviction_state, + plasma_store_info *plasma_store_info, + object_id obj_id, + int64_t *num_objects_to_evict, + object_id **objects_to_evict); + +/** + * Choose some objects to evict from the Plasma store. When this method is + * called, the eviction policy will assume that the objects chosen to be evicted + * will in fact be evicted from the Plasma store by the caller. + * + * @note This method is not part of the API. It is exposed in the header file + * only for testing. + * + * @param eviction_state The state managed by the eviction policy. + * @param plasma_store_info Information about the Plasma store that is exposed + * to the eviction policy. + * @param num_bytes_required The number of bytes of space to try to free up. + * @param num_objects_to_evict The number of objects that are chosen will be + * stored at this address. + * @param objects_to_evict An array of the object IDs that were chosen will be + * stored at this address. If the number of objects chosen is greater + * than 0, then the caller needs to free that array. If it equals 0, then + * the array will be NULL. + * @return The total number of bytes of space chosen to be evicted. + */ +int64_t choose_objects_to_evict(eviction_state *eviction_state, + plasma_store_info *plasma_store_info, + int64_t num_bytes_required, + int64_t *num_objects_to_evict, + object_id **objects_to_evict); + +#endif /* EVICTION_POLICY_H */ diff --git a/src/plasma/lib/python/plasma.py b/src/plasma/lib/python/plasma.py index 3a57514b1..0e7de23d4 100644 --- a/src/plasma/lib/python/plasma.py +++ b/src/plasma/lib/python/plasma.py @@ -205,6 +205,17 @@ class PlasmaClient(object): """ self.client.plasma_delete(self.plasma_conn, make_plasma_id(object_id)) + def evict(self, num_bytes): + """Evict some objects until to recover some bytes. + + Recover at least num_bytes bytes if possible. + + Args: + num_bytes (int): The number of bytes to attempt to recover. + """ + num_bytes_evicted = self.client.plasma_evict(self.plasma_conn, num_bytes) + return num_bytes_evicted + def transfer(self, addr, port, object_id): """Transfer local object with id object_id to another plasma instance diff --git a/src/plasma/plasma.h b/src/plasma/plasma.h index 6bbba736d..e452dab31 100644 --- a/src/plasma/plasma.h +++ b/src/plasma/plasma.h @@ -9,6 +9,9 @@ #include "common.h" +#include "utarray.h" +#include "uthash.h" + typedef struct { int64_t data_size; int64_t metadata_size; @@ -56,6 +59,8 @@ enum plasma_message_type { PLASMA_SEAL, /** Delete an object. */ PLASMA_DELETE, + /** Evict objects from the store. */ + PLASMA_EVICT, /** Subscribe to notifications about sealed objects. */ PLASMA_SUBSCRIBE, /** Request transfer to another store. */ @@ -83,6 +88,8 @@ typedef struct { /** In a transfer request, this is the port of the Plasma Manager to transfer * the object to. */ int port; + /** A number of bytes. This is used for eviction requests. */ + int64_t num_bytes; /** The number of object IDs that will be included in this request. */ int num_object_ids; /** The IDs of the objects that the request is about. */ @@ -97,6 +104,8 @@ typedef struct { * present and 0 otherwise. Used for plasma_contains and * plasma_fetch. */ int has_object; + /** A number of bytes. This is used for replies to eviction requests. */ + int64_t num_bytes; /** Number of object IDs a wait is returning. */ int num_objects_returned; /** The number of object IDs that will be included in this reply. */ @@ -105,4 +114,33 @@ typedef struct { object_id object_ids[1]; } plasma_reply; +/** This type is used by the Plasma store. It is here because it is exposed to + * the eviction policy. */ +typedef struct { + /** Object id of this object. */ + object_id object_id; + /** Object info like size, creation time and owner. */ + plasma_object_info info; + /** Memory mapped file containing the object. */ + int fd; + /** Size of the underlying map. */ + int64_t map_size; + /** Offset from the base of the mmap. */ + ptrdiff_t offset; + /** Handle for the uthash table. */ + UT_hash_handle handle; + /** Pointer to the object data. Needed to free the object. */ + uint8_t *pointer; + /** An array of the clients that are currently using this object. */ + UT_array *clients; + /** The state of the object, e.g., whether it is open or sealed. */ + object_state state; +} object_table_entry; + +/** The plasma store information that is exposed to the eviction policy. */ +typedef struct { + /** Objects that are in the Plasma store. */ + object_table_entry *objects; +} plasma_store_info; + #endif diff --git a/src/plasma/plasma_client.c b/src/plasma/plasma_client.c index 68b4203b8..ea6ddda5e 100644 --- a/src/plasma/plasma_client.c +++ b/src/plasma/plasma_client.c @@ -286,6 +286,18 @@ void plasma_delete(plasma_connection *conn, object_id object_id) { plasma_send_request(conn->store_conn, PLASMA_DELETE, &req); } +int64_t plasma_evict(plasma_connection *conn, int64_t num_bytes) { + /* Send a request to the store to evict objects. */ + plasma_request req = {.num_bytes = num_bytes}; + plasma_send_request(conn->store_conn, PLASMA_EVICT, &req); + /* Wait for a response with the number of bytes actually evicted. */ + plasma_reply reply; + int r = read(conn->store_conn, &reply, sizeof(plasma_reply)); + CHECKM(r != -1, "read error"); + CHECKM(r != 0, "connection disconnected"); + return reply.num_bytes; +} + int plasma_subscribe(plasma_connection *conn) { int fd[2]; /* Create a non-blocking socket pair. This will only be used to send diff --git a/src/plasma/plasma_client.h b/src/plasma/plasma_client.h index edae26fd5..08e2d5dc2 100644 --- a/src/plasma/plasma_client.h +++ b/src/plasma/plasma_client.h @@ -177,6 +177,16 @@ void plasma_seal(plasma_connection *conn, object_id object_id); */ void plasma_delete(plasma_connection *conn, object_id object_id); +/** + * Delete objects until we have freed up num_bytes bytes or there are no more + * released objects that can be deleted. + * + * @param conn The object containing the connection state. + * @param num_bytes The number of bytes to try to free up. + * @return The total number of bytes of space retrieved. + */ +int64_t plasma_evict(plasma_connection *conn, int64_t num_bytes); + /** * Fetch objects from remote plasma stores that have the * objects stored. diff --git a/src/plasma/plasma_store.c b/src/plasma/plasma_store.c index 2508ed4ae..a92b60a55 100644 --- a/src/plasma/plasma_store.c +++ b/src/plasma/plasma_store.c @@ -25,6 +25,7 @@ #include "common.h" #include "event_loop.h" +#include "eviction_policy.h" #include "io.h" #include "uthash.h" #include "utarray.h" @@ -46,27 +47,6 @@ void plasma_send_reply(int fd, plasma_reply *reply) { } } -typedef struct { - /* Object id of this object. */ - object_id object_id; - /* Object info like size, creation time and owner. */ - plasma_object_info info; - /* Memory mapped file containing the object. */ - int fd; - /* Size of the underlying map. */ - int64_t map_size; - /* Offset from the base of the mmap. */ - ptrdiff_t offset; - /* Handle for the uthash table. */ - UT_hash_handle handle; - /* Pointer to the object data. Needed to free the object. */ - uint8_t *pointer; - /* An array of the clients that are currently using this object. */ - UT_array *clients; - /* The state of the object, e.g., whether it is open or sealed. */ - object_state state; -} object_table_entry; - typedef struct { /* Object id of this object. */ object_id object_id; @@ -76,7 +56,7 @@ typedef struct { UT_hash_handle handle; } object_notify_entry; -/** Contains all information that is associated with a client. */ +/** Contains all information that is associated with a Plasma store client. */ struct client { /** The socket used to communicate with the client. */ int sock; @@ -105,22 +85,29 @@ typedef struct { struct plasma_store_state { /* Event loop of the plasma store. */ event_loop *loop; - /* A hash table of all the objects in the store. */ - object_table_entry *objects; /* Objects that processes are waiting for. */ object_notify_entry *objects_notify; /** The pending notifications that have not been sent to subscribers because * the socket send buffers were full. This is a hash table from client file * descriptor to an array of object_ids to send to that client. */ notification_queue *pending_notifications; + /** The plasma store information, including the object tables, that is exposed + * to the eviction policy. */ + plasma_store_info *plasma_store_info; + /** The state that is managed by the eviction policy. */ + eviction_state *eviction_state; }; -plasma_store_state *init_plasma_store(event_loop *loop) { +plasma_store_state *init_plasma_store(event_loop *loop, int64_t system_memory) { plasma_store_state *state = malloc(sizeof(plasma_store_state)); state->loop = loop; - state->objects = NULL; state->objects_notify = NULL; state->pending_notifications = NULL; + /* Initialize the plasma store info. */ + state->plasma_store_info = malloc(sizeof(plasma_store_info)); + state->plasma_store_info->objects = NULL; + /* Initialize the eviction state. */ + state->eviction_state = make_eviction_state(system_memory); return state; } @@ -135,25 +122,44 @@ void add_client_to_object_clients(object_table_entry *entry, return; } } + /* If there are no other clients using this object, notify the eviction policy + * that the object is being used. */ + if (utarray_len(entry->clients) == 0) { + /* Tell the eviction policy that this object is being used. */ + int64_t num_objects_to_evict; + object_id *objects_to_evict; + begin_object_access(client_info->plasma_state->eviction_state, + client_info->plasma_state->plasma_store_info, + entry->object_id, &num_objects_to_evict, + &objects_to_evict); + remove_objects(client_info->plasma_state, num_objects_to_evict, + objects_to_evict); + } /* Add the client pointer to the list of clients using this object. */ utarray_push_back(entry->clients, &client_info); } /* Create a new object buffer in the hash table. */ void create_object(client *client_context, - object_id object_id, + object_id obj_id, int64_t data_size, int64_t metadata_size, plasma_object *result) { LOG_DEBUG("creating object"); /* TODO(pcm): add object_id here */ plasma_store_state *plasma_state = client_context->plasma_state; - object_table_entry *entry; /* TODO(swang): Return these error to the client instead of exiting. */ - HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id), - entry); + HASH_FIND(handle, plasma_state->plasma_store_info->objects, &obj_id, + sizeof(object_id), entry); CHECKM(entry == NULL, "Cannot create object twice."); - + /* Tell the eviction policy how much space we need to create this object. */ + int64_t num_objects_to_evict; + object_id *objects_to_evict; + require_space(plasma_state->eviction_state, plasma_state->plasma_store_info, + data_size + metadata_size, &num_objects_to_evict, + &objects_to_evict); + remove_objects(plasma_state, num_objects_to_evict, objects_to_evict); + /* Allocate space for the new object */ uint8_t *pointer = dlmalloc(data_size + metadata_size); int fd; int64_t map_size; @@ -162,7 +168,7 @@ void create_object(client *client_context, assert(fd != -1); entry = malloc(sizeof(object_table_entry)); - memcpy(&entry->object_id, &object_id, sizeof(object_id)); + memcpy(&entry->object_id, &obj_id, sizeof(object_id)); entry->info.data_size = data_size; entry->info.metadata_size = metadata_size; entry->pointer = pointer; @@ -172,13 +178,19 @@ void create_object(client *client_context, entry->offset = offset; entry->state = OPEN; utarray_new(entry->clients, &client_icd); - HASH_ADD(handle, plasma_state->objects, object_id, sizeof(object_id), entry); + HASH_ADD(handle, plasma_state->plasma_store_info->objects, object_id, + sizeof(object_id), entry); result->handle.store_fd = fd; result->handle.mmap_size = map_size; result->data_offset = offset; result->metadata_offset = offset + data_size; result->data_size = data_size; result->metadata_size = metadata_size; + /* Notify the eviction policy that this object was created. This must be done + * immediately before the call to add_client_to_object_clients so that the + * eviction policy does not have an opportunity to evict the object. */ + object_created(plasma_state->eviction_state, plasma_state->plasma_store_info, + obj_id); /* Record that this client is using this object. */ add_client_to_object_clients(entry, client_context); } @@ -190,8 +202,8 @@ int get_object(client *client_context, plasma_object *result) { plasma_store_state *plasma_state = client_context->plasma_state; object_table_entry *entry; - HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id), - entry); + HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id, + sizeof(object_id), entry); if (entry && entry->state == SEALED) { result->handle.store_fd = entry->fd; result->handle.mmap_size = entry->map_size; @@ -229,6 +241,19 @@ int remove_client_from_object_clients(object_table_entry *entry, if (*c == client_info) { /* Remove the client from the array. */ utarray_erase(entry->clients, i, 1); + /* If no more clients are using this object, notify the eviction policy + * that the object is no longer being used. */ + if (utarray_len(entry->clients) == 0) { + /* Tell the eviction policy that this object is no longer being used. */ + int64_t num_objects_to_evict; + object_id *objects_to_evict; + end_object_access(client_info->plasma_state->eviction_state, + client_info->plasma_state->plasma_store_info, + entry->object_id, &num_objects_to_evict, + &objects_to_evict); + remove_objects(client_info->plasma_state, num_objects_to_evict, + objects_to_evict); + } /* Return 1 to indicate that the client was removed. */ return 1; } @@ -240,8 +265,8 @@ int remove_client_from_object_clients(object_table_entry *entry, void release_object(client *client_context, object_id object_id) { plasma_store_state *plasma_state = client_context->plasma_state; object_table_entry *entry; - HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id), - entry); + HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id, + sizeof(object_id), entry); CHECK(entry != NULL); /* Remove the client from the object's array of clients. */ CHECK(remove_client_from_object_clients(entry, client_context) == 1); @@ -251,8 +276,8 @@ void release_object(client *client_context, object_id object_id) { int contains_object(client *client_context, object_id object_id) { plasma_store_state *plasma_state = client_context->plasma_state; object_table_entry *entry; - HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id), - entry); + HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id, + sizeof(object_id), entry); return entry && (entry->state == SEALED) ? OBJECT_FOUND : OBJECT_NOT_FOUND; } @@ -261,9 +286,10 @@ void seal_object(client *client_context, object_id object_id) { LOG_DEBUG("sealing object"); // TODO(pcm): add object_id here plasma_store_state *plasma_state = client_context->plasma_state; object_table_entry *entry; - HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id), - entry); - CHECK(entry != NULL && entry->state == OPEN); + HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id, + sizeof(object_id), entry); + CHECK(entry != NULL); + CHECK(entry->state == OPEN); /* Set the state of object to SEALED. */ entry->state = SEALED; /* Inform all subscribers that a new object has been sealed. */ @@ -302,28 +328,41 @@ void seal_object(client *client_context, object_id object_id) { } } -/* Delete an object that has been created in the hash table. */ -void delete_object(client *client_context, object_id object_id) { - LOG_DEBUG("deleting object"); // TODO(rkn): add object_id here - plasma_store_state *plasma_state = client_context->plasma_state; +/* Delete an object that has been created in the hash table. This should only + * be called on objects that are returned by the eviction policy to evict. */ +void delete_object(plasma_store_state *plasma_state, object_id object_id) { + LOG_DEBUG("deleting object"); object_table_entry *entry; - HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id), - entry); + HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id, + sizeof(object_id), entry); /* TODO(rkn): This should probably not fail, but should instead throw an * error. Maybe we should also support deleting objects that have been created * but not sealed. */ - CHECKM(entry != NULL, "To delete an object it must have been created."); + CHECKM(entry != NULL, "To delete an object it must be in the object table."); CHECKM(entry->state == SEALED, "To delete an object it must have been sealed."); CHECKM(utarray_len(entry->clients) == 0, "To delete an object, there must be no clients currently using it."); uint8_t *pointer = entry->pointer; - HASH_DELETE(handle, plasma_state->objects, entry); + HASH_DELETE(handle, plasma_state->plasma_store_info->objects, entry); dlfree(pointer); utarray_free(entry->clients); free(entry); } +void remove_objects(plasma_store_state *plasma_state, + int64_t num_objects_to_evict, + object_id *objects_to_evict) { + if (num_objects_to_evict > 0) { + for (int i = 0; i < num_objects_to_evict; ++i) { + delete_object(plasma_state, objects_to_evict[i]); + } + /* Free the array of objects to evict. This array was originally allocated + * by the eviction policy. */ + free(objects_to_evict); + } +} + /* Send more notifications to a subscriber. */ void send_notifications(event_loop *loop, int client_sock, @@ -373,7 +412,7 @@ void subscribe_to_updates(client *client_context, int conn) { plasma_store_state *plasma_state = client_context->plasma_state; char dummy; int fd = recv_fd(conn, &dummy, 1); - CHECKM(HASH_CNT(handle, plasma_state->objects) == 0, + CHECKM(HASH_CNT(handle, plasma_state->plasma_store_info->objects) == 0, "plasma_subscribe should be called before any objects are created."); /* Create a new array to buffer notifications that can't be sent to the * subscriber yet because the socket send buffer is full. TODO(rkn): the queue @@ -425,8 +464,23 @@ void process_message(event_loop *loop, seal_object(client_context, req->object_ids[0]); break; case PLASMA_DELETE: - delete_object(client_context, req->object_ids[0]); + /* TODO(rkn): In the future, we can use this method to give hints to the + * eviction policy about when an object will no longer be needed. */ break; + case PLASMA_EVICT: { + /* This code path should only be used for testing. */ + int64_t num_objects_to_evict; + object_id *objects_to_evict; + int64_t num_bytes_evicted = choose_objects_to_evict( + client_context->plasma_state->eviction_state, + client_context->plasma_state->plasma_store_info, req->num_bytes, + &num_objects_to_evict, &objects_to_evict); + remove_objects(client_context->plasma_state, num_objects_to_evict, + objects_to_evict); + reply.num_bytes = num_bytes_evicted; + plasma_send_reply(client_sock, &reply); + break; + } case PLASMA_SUBSCRIBE: subscribe_to_updates(client_context, client_sock); break; @@ -437,7 +491,8 @@ void process_message(event_loop *loop, * lists. */ plasma_store_state *plasma_state = client_context->plasma_state; object_table_entry *entry, *temp_entry; - HASH_ITER(handle, plasma_state->objects, entry, temp_entry) { + HASH_ITER(handle, plasma_state->plasma_store_info->objects, entry, + temp_entry) { remove_client_from_object_clients(entry, client_context); } } break; @@ -473,9 +528,9 @@ void signal_handler(int signal) { } } -void start_server(char *socket_name) { +void start_server(char *socket_name, int64_t system_memory) { event_loop *loop = event_loop_create(); - plasma_store_state *state = init_plasma_store(loop); + plasma_store_state *state = init_plasma_store(loop, system_memory); int socket = bind_ipc_sock(socket_name, true); CHECK(socket >= 0); event_loop_add_file(loop, socket, EVENT_LOOP_READ, new_client_connection, @@ -486,12 +541,21 @@ void start_server(char *socket_name) { int main(int argc, char *argv[]) { signal(SIGTERM, signal_handler); char *socket_name = NULL; + int64_t system_memory = -1; int c; - while ((c = getopt(argc, argv, "s:")) != -1) { + while ((c = getopt(argc, argv, "s:m:")) != -1) { switch (c) { case 's': socket_name = optarg; break; + case 'm': { + char extra; + int scanned = sscanf(optarg, "%" SCNd64 "%c", &system_memory, &extra); + CHECK(scanned == 1); + LOG_INFO("Allowing the Plasma store to use up to %.2fGB of memory.", + ((double) system_memory) / 1000000000); + break; + } default: exit(-1); } @@ -500,6 +564,10 @@ int main(int argc, char *argv[]) { LOG_ERR("please specify socket for incoming connections with -s switch"); exit(-1); } + if (system_memory == -1) { + LOG_ERR("please specify the amount of system memory with -m switch"); + exit(-1); + } LOG_DEBUG("starting server listening on %s", socket_name); - start_server(socket_name); + start_server(socket_name, system_memory); } diff --git a/src/plasma/plasma_store.h b/src/plasma/plasma_store.h index 9caa967ab..235c6c171 100644 --- a/src/plasma/plasma_store.h +++ b/src/plasma/plasma_store.h @@ -68,15 +68,6 @@ void seal_object(client *client_context, object_id object_id); */ int contains_object(client *client_context, object_id object_id); -/** - * Delete an object from the plasma store: - * - * @param client_context The context of the client making this request. - * @param object_id Object ID of the object to be deleted. - * @return Void. - */ -void delete_object(client *client_context, object_id object_id); - /** * Send notifications about sealed objects to the subscribers. This is called * in seal_object. If the socket's send buffer is full, the notification will be @@ -94,4 +85,8 @@ void send_notifications(event_loop *loop, void *plasma_state, int events); +void remove_objects(plasma_store_state *plasma_state, + int64_t num_objects_to_evict, + object_id *objects_to_evict); + #endif /* PLASMA_STORE_H */ diff --git a/src/plasma/test/test.py b/src/plasma/test/test.py index caeaa79fe..7a7bf61b7 100644 --- a/src/plasma/test/test.py +++ b/src/plasma/test/test.py @@ -15,6 +15,7 @@ import threading import plasma USE_VALGRIND = False +PLASMA_STORE_MEMORY = 1000000000 def random_object_id(): return "".join([chr(random.randint(0, 255)) for _ in range(plasma.PLASMA_ID_SIZE)]) @@ -64,7 +65,7 @@ class TestPlasmaClient(unittest.TestCase): # Start Plasma. plasma_store_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../build/plasma_store") store_name = "/tmp/store{}".format(random.randint(0, 10000)) - command = [plasma_store_executable, "-s", store_name] + command = [plasma_store_executable, "-s", store_name, "-m", str(PLASMA_STORE_MEMORY)] if USE_VALGRIND: self.p = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command) time.sleep(2.0) @@ -197,6 +198,44 @@ class TestPlasmaClient(unittest.TestCase): memory_buffer[0] = chr(0) self.assertRaises(Exception, illegal_assignment) + def test_evict(self): + object_id1 = random_object_id() + b1 = self.plasma_client.create(object_id1, 1000) + self.plasma_client.seal(object_id1) + del b1 + self.assertEqual(self.plasma_client.evict(1), 1000) + + object_id2 = random_object_id() + object_id3 = random_object_id() + b2 = self.plasma_client.create(object_id2, 999) + b3 = self.plasma_client.create(object_id3, 998) + del b3 + self.plasma_client.seal(object_id3) + self.assertEqual(self.plasma_client.evict(1000), 998) + + object_id4 = random_object_id() + b4 = self.plasma_client.create(object_id4, 997) + self.plasma_client.seal(object_id4) + del b4 + self.plasma_client.seal(object_id2) + del b2 + self.assertEqual(self.plasma_client.evict(1), 997) + self.assertEqual(self.plasma_client.evict(1), 999) + + object_id5 = random_object_id() + object_id6 = random_object_id() + object_id7 = random_object_id() + b5 = self.plasma_client.create(object_id5, 996) + b6 = self.plasma_client.create(object_id6, 995) + b7 = self.plasma_client.create(object_id7, 994) + self.plasma_client.seal(object_id5) + self.plasma_client.seal(object_id6) + self.plasma_client.seal(object_id7) + del b5 + del b6 + del b7 + self.assertEqual(self.plasma_client.evict(2000), 996 + 995 + 994) + def test_subscribe(self): # Subscribe to notifications from the Plasma Store. sock = self.plasma_client.subscribe() @@ -220,8 +259,8 @@ class TestPlasmaManager(unittest.TestCase): store_name2 = "/tmp/store{}".format(random.randint(0, 10000)) manager_name1 = "/tmp/manager{}".format(random.randint(0, 10000)) manager_name2 = "/tmp/manager{}".format(random.randint(0, 10000)) - plasma_store_command1 = [plasma_store_executable, "-s", store_name1] - plasma_store_command2 = [plasma_store_executable, "-s", store_name2] + plasma_store_command1 = [plasma_store_executable, "-s", store_name1, "-m", str(PLASMA_STORE_MEMORY)] + plasma_store_command2 = [plasma_store_executable, "-s", store_name2, "-m", str(PLASMA_STORE_MEMORY)] if USE_VALGRIND: self.p2 = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + plasma_store_command1)