Add basic LRU eviction for the plasma store. (#26)

* Basic functionality for LRU eviction.

* Test eviction.

* Factor out eviction policy.

* Move delete_object into eviction policy.

* Replace array of released objects with an LRU cache (hash table + doubly linked list).

* Finish rebase on master.

* Move actual object deletion away from eviction policy and into plasma store.

* Small fixes.

* Fixes.

* Make remove_object_from_lru_cache always remove the object.

* Minor formatting and comments.

* Pass in allowed memory as argument to Plasma store.

* Small fix.
This commit is contained in:
Robert Nishihara
2016-11-05 21:34:11 -07:00
committed by Philipp Moritz
parent 90a2aa4bf7
commit efe8a295ea
13 changed files with 614 additions and 78 deletions
+2 -2
View File
@@ -31,13 +31,13 @@ if [[ $platform == "linux" ]]; then
# These commands must be kept in sync with the installation instructions.
sudo apt-get update
sudo apt-get install -y git cmake build-essential autoconf curl libtool python-dev python-numpy python-pip libboost-all-dev unzip
sudo pip install funcsigs colorama redis
sudo pip install funcsigs colorama psutil redis
sudo pip install --upgrade git+git://github.com/cloudpipe/cloudpickle.git@0d225a4695f1f65ae1cbb2e0bbc145e10167cce4 # We use the latest version of cloudpickle because it can serialize named tuples.
elif [[ $platform == "macosx" ]]; then
# These commands must be kept in sync with the installation instructions.
brew install git cmake automake autoconf libtool boost
sudo easy_install pip
sudo pip install numpy funcsigs colorama redis --ignore-installed six
sudo pip install numpy funcsigs colorama psutil redis --ignore-installed six
sudo pip install --upgrade git+git://github.com/cloudpipe/cloudpickle.git@0d225a4695f1f65ae1cbb2e0bbc145e10167cce4 # We use the latest version of cloudpickle because it can serialize named tuples.
fi
+8 -4
View File
@@ -1,12 +1,13 @@
from __future__ import print_function
import psutil
import os
import sys
import time
import random
import signal
import sys
import subprocess
import string
import random
import time
# Ray modules
import config
@@ -90,13 +91,16 @@ def start_objstore(node_ip_address, redis_address, cleanup=True):
this process will be killed by serices.cleanup() when the Python process
that imported services exits.
"""
# Let the object store use a fraction of the system memory.
system_memory = psutil.virtual_memory().total
plasma_store_memory = int(system_memory * 0.75)
plasma_store_filepath = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../plasma/build/plasma_store")
if RUN_PLASMA_STORE_PROFILER:
plasma_store_prefix = ["valgrind", "--tool=callgrind", plasma_store_filepath]
else:
plasma_store_prefix = [plasma_store_filepath]
store_name = "/tmp/ray_plasma_store{}".format(random_name())
p1 = subprocess.Popen(plasma_store_prefix + ["-s", store_name])
p1 = subprocess.Popen(plasma_store_prefix + ["-s", store_name, "-m", str(plasma_store_memory)])
manager_name = "/tmp/ray_plasma_manager{}".format(random_name())
p2, manager_port = plasma.start_plasma_manager(store_name, manager_name, redis_address, run_profiler=RUN_PLASMA_MANAGER_PROFILER)
+2 -1
View File
@@ -13,6 +13,7 @@ import photon
import plasma
USE_VALGRIND = False
PLASMA_STORE_MEMORY = 1000000000
class TestPhotonClient(unittest.TestCase):
@@ -23,7 +24,7 @@ class TestPhotonClient(unittest.TestCase):
# Start Plasma.
plasma_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../plasma/build/plasma_store")
plasma_socket = "/tmp/plasma_store{}".format(random.randint(0, 10000))
self.p2 = subprocess.Popen([plasma_executable, "-s", plasma_socket])
self.p2 = subprocess.Popen([plasma_executable, "-s", plasma_socket, "-m", str(PLASMA_STORE_MEMORY)])
time.sleep(0.1)
self.plasma_client = plasma.PlasmaClient(plasma_socket)
scheduler_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../build/photon_scheduler")
+2 -2
View File
@@ -16,8 +16,8 @@ clean:
$(BUILD)/manager_tests: test/manager_tests.c plasma.h plasma_client.h plasma_client.c plasma_manager.h plasma_manager.c fling.h fling.c common
$(CC) $(CFLAGS) $(TEST_CFLAGS) -o $@ test/manager_tests.c plasma_manager.c plasma_client.c fling.c ../common/build/libcommon.a ../common/thirdparty/hiredis/libhiredis.a
$(BUILD)/plasma_store: plasma_store.c plasma.h fling.h fling.c malloc.c malloc.h thirdparty/dlmalloc.c common
$(CC) $(CFLAGS) plasma_store.c fling.c malloc.c ../common/build/libcommon.a -o $(BUILD)/plasma_store
$(BUILD)/plasma_store: plasma_store.c plasma.h eviction_policy.c fling.h fling.c malloc.c malloc.h thirdparty/dlmalloc.c common
$(CC) $(CFLAGS) plasma_store.c eviction_policy.c fling.c malloc.c ../common/build/libcommon.a -o $(BUILD)/plasma_store
$(BUILD)/plasma_manager: plasma_manager.c plasma.h plasma_client.c fling.h fling.c common
$(CC) $(CFLAGS) plasma_manager.c plasma_client.c fling.c ../common/build/libcommon.a ../common/thirdparty/hiredis/libhiredis.a -o $(BUILD)/plasma_manager
+210
View File
@@ -0,0 +1,210 @@
#include "eviction_policy.h"
#include "utlist.h"
/** An element representing a released object in a doubly-linked list. This is
* used to implement an LRU cache. */
typedef struct released_object {
/** The object_id of the released object. */
object_id object_id;
/** Needed for the doubly-linked list macros. */
struct released_object *prev;
/** Needed for the doubly-linked list macros. */
struct released_object *next;
} released_object;
/** This type is used to define a hash table mapping the object ID of a released
* object to its location in the doubly-linked list of released objects. */
typedef struct {
/** Object ID of this object. */
object_id object_id;
/** A pointer to the corresponding entry for this object in the doubly-linked
* list of released objects. */
released_object *released_object;
/** Handle for the uthash table. */
UT_hash_handle handle;
} released_object_entry;
/** The part of the Plasma state that is maintained by the eviction policy. */
struct eviction_state {
/** The amount of memory (in bytes) that we allow to be allocated in the
* store. */
int64_t memory_capacity;
/** The amount of memory (in bytes) currently being used. */
int64_t memory_used;
/** A doubly-linked list of the released objects in order from least recently
* released to most recently released. */
released_object *released_objects;
/** A hash table mapping the object ID of a released object to its location in
* the doubly linked list of released objects. */
released_object_entry *released_object_table;
};
/* This is used to define the array of object IDs used to define the
* released_objects type. */
UT_icd released_objects_entry_icd = {sizeof(object_id), NULL, NULL, NULL};
eviction_state *make_eviction_state(int64_t system_memory) {
eviction_state *state = malloc(sizeof(eviction_state));
/* Find the amount of available memory on the machine. */
state->memory_capacity = system_memory;
state->memory_used = 0;
state->released_objects = NULL;
state->released_object_table = NULL;
return state;
}
void free_eviction_state(eviction_state *s) {
/* Delete each element in the doubly-linked list. */
released_object *element, *temp;
DL_FOREACH_SAFE(s->released_objects, element, temp) {
DL_DELETE(s->released_objects, element);
free(element);
}
/* Delete each element in the hash table. */
released_object_entry *current_entry, *temp_entry;
HASH_ITER(handle, s->released_object_table, current_entry, temp_entry) {
HASH_DELETE(handle, s->released_object_table, current_entry);
free(current_entry);
}
/* Free the eviction state. */
free(s);
}
void add_object_to_lru_cache(eviction_state *eviction_state,
object_id object_id) {
/* Add the object ID to the doubly-linked list. */
released_object *linked_list_entry = malloc(sizeof(released_object));
linked_list_entry->object_id = object_id;
DL_APPEND(eviction_state->released_objects, linked_list_entry);
/* Check that the object ID is not already in the hash table. */
released_object_entry *hash_table_entry;
HASH_FIND(handle, eviction_state->released_object_table, &object_id,
sizeof(object_id), hash_table_entry);
CHECK(hash_table_entry == NULL);
/* Add the object ID to the hash table. */
hash_table_entry = malloc(sizeof(released_object_entry));
hash_table_entry->object_id = object_id;
hash_table_entry->released_object = linked_list_entry;
HASH_ADD(handle, eviction_state->released_object_table, object_id,
sizeof(object_id), hash_table_entry);
}
void remove_object_from_lru_cache(eviction_state *eviction_state,
object_id object_id) {
/* Check that the object ID is in the hash table. */
released_object_entry *hash_table_entry;
HASH_FIND(handle, eviction_state->released_object_table, &object_id,
sizeof(object_id), hash_table_entry);
/* Only remove the object ID if it is in the LRU cache. */
CHECK(hash_table_entry != NULL);
/* Remove the object ID from the doubly-linked list. */
DL_DELETE(eviction_state->released_objects,
hash_table_entry->released_object);
/* Free the entry from the doubly-linked list. */
free(hash_table_entry->released_object);
/* Remove the object ID from the hash table. */
HASH_DELETE(handle, eviction_state->released_object_table, hash_table_entry);
/* Free the entry from the hash table. */
free(hash_table_entry);
}
int64_t choose_objects_to_evict(eviction_state *eviction_state,
plasma_store_info *plasma_store_info,
int64_t num_bytes_required,
int64_t *num_objects_to_evict,
object_id **objects_to_evict) {
int64_t num_objects = 0;
int64_t num_bytes = 0;
/* Figure out how many objects need to be evicted in order to recover a
* sufficient number of bytes. */
released_object *element, *temp;
DL_FOREACH_SAFE(eviction_state->released_objects, element, temp) {
if (num_bytes >= num_bytes_required) {
break;
}
/* Find the object table entry for this object. */
object_table_entry *entry;
HASH_FIND(handle, plasma_store_info->objects, &element->object_id,
sizeof(object_id), entry);
/* Update the cumulative bytes and the number of objects so far. */
num_bytes += (entry->info.data_size + entry->info.metadata_size);
num_objects += 1;
}
/* Construct the return values. */
*num_objects_to_evict = num_objects;
if (num_objects == 0) {
*objects_to_evict = NULL;
} else {
*objects_to_evict = (object_id *) malloc(num_objects * sizeof(object_id));
int counter = 0;
DL_FOREACH_SAFE(eviction_state->released_objects, element, temp) {
if (counter == num_objects) {
break;
}
(*objects_to_evict)[counter] = element->object_id;
/* Update the LRU cache. */
remove_object_from_lru_cache(eviction_state, element->object_id);
counter += 1;
}
}
/* Update the number used. */
eviction_state->memory_used -= num_bytes;
return num_bytes;
}
void object_created(eviction_state *eviction_state,
plasma_store_info *plasma_store_info,
object_id obj_id) {
add_object_to_lru_cache(eviction_state, obj_id);
}
void require_space(eviction_state *eviction_state,
plasma_store_info *plasma_store_info,
int64_t size,
int64_t *num_objects_to_evict,
object_id **objects_to_evict) {
/* Check if there is enough space to create the object. */
int64_t required_space =
eviction_state->memory_used + size - eviction_state->memory_capacity;
if (required_space > 0) {
/* Try to free up as much free space as we need right now. */
LOG_DEBUG("not enough space to create this object, so evicting objects");
/* Choose some objects to evict, and update the return pointers. */
int64_t num_bytes_evicted = choose_objects_to_evict(
eviction_state, plasma_store_info, required_space, num_objects_to_evict,
objects_to_evict);
printf("Evicted %" PRId64 " bytes.\n", num_bytes_evicted);
LOG_INFO(
"There is not enough space to create this object, so evicting "
"%" PRId64 " objects to free up %" PRId64 " bytes.\n",
*num_objects_to_evict, num_bytes_evicted);
CHECK(num_bytes_evicted >= required_space);
} else {
*num_objects_to_evict = 0;
*objects_to_evict = NULL;
}
eviction_state->memory_used += size;
}
void begin_object_access(eviction_state *eviction_state,
plasma_store_info *plasma_store_info,
object_id obj_id,
int64_t *num_objects_to_evict,
object_id **objects_to_evict) {
/* If the object is in the LRU cache, remove it. */
remove_object_from_lru_cache(eviction_state, obj_id);
*num_objects_to_evict = 0;
*objects_to_evict = NULL;
}
void end_object_access(eviction_state *eviction_state,
plasma_store_info *plasma_store_info,
object_id obj_id,
int64_t *num_objects_to_evict,
object_id **objects_to_evict) {
/* Add the object to the LRU cache.*/
add_object_to_lru_cache(eviction_state, obj_id);
*num_objects_to_evict = 0;
*objects_to_evict = NULL;
}
+148
View File
@@ -0,0 +1,148 @@
#ifndef EVICTION_POLICY_H
#define EVICTION_POLICY_H
#include "plasma.h"
/* ==== The eviction policy ====
*
* This file contains declaration for all functions and data structures that
* need to be provided if you want to implement a new eviction algorithm for the
* Plasma store.
*/
/** Internal state of the eviction policy. */
typedef struct eviction_state eviction_state;
/**
* Initialize the eviction policy state.
*
* @param system_memory The amount of memory that can be used by the Plasma
* store.
* @return The internal state of the eviction policy.
*/
eviction_state *make_eviction_state(int64_t system_memory);
/**
* Free the eviction policy state.
*
* @param state The state managed by the eviction policy.
* @return Void.
*/
void free_eviction_state(eviction_state *state);
/**
* This method will be called whenever an object is first created in order to
* add it to the LRU cache. This is done so that the first time, the Plasma
* store calls begin_object_access, we can remove the object from the LRU cache.
*
* @param eviction_state The state managed by the eviction policy.
* @param plasma_store_info Information about the Plasma store that is exposed
* to the eviction policy.
* @param obj_id The object ID of the object that was created.
* @return Void.
*/
void object_created(eviction_state *eviction_state,
plasma_store_info *plasma_store_info,
object_id obj_id);
/**
* This method will be called when the Plasma store needs more space, perhaps to
* create a new object. If the required amount of space cannot be freed up, then
* a fatal error will be thrown. When this method is called, the eviction policy
* will assume that the objects chosen to be evicted will in fact be evicted
* from the Plasma store by the caller.
*
* @param eviction_state The state managed by the eviction policy.
* @param plasma_store_info Information about the Plasma store that is exposed
* to the eviction policy.
* @param size The size in bytes of the new object, including both data and
* metadata.
* @param num_objects_to_evict The number of objects that are chosen will be
* stored at this address.
* @param objects_to_evict An array of the object IDs that were chosen will be
* stored at this address. If the number of objects chosen is greater
* than 0, then the caller needs to free that array. If it equals 0, then
* the array will be NULL.
* @return Void.
*/
void require_space(eviction_state *eviction_state,
plasma_store_info *plasma_store_info,
int64_t size,
int64_t *num_objects_to_evict,
object_id **objects_to_evict);
/**
* This method will be called whenever an unused object in the Plasma store
* starts to be used. When this method is called, the eviction policy will
* assume that the objects chosen to be evicted will in fact be evicted from the
* Plasma store by the caller.
*
* @param eviction_state The state managed by the eviction policy.
* @param plasma_store_info Information about the Plasma store that is exposed
* to the eviction policy.
* @param obj_id The ID of the object that is now being used.
* @param num_objects_to_evict The number of objects that are chosen will be
* stored at this address.
* @param objects_to_evict An array of the object IDs that were chosen will be
* stored at this address. If the number of objects chosen is greater
* than 0, then the caller needs to free that array. If it equals 0, then
* the array will be NULL.
* @return Void.
*/
void begin_object_access(eviction_state *eviction_state,
plasma_store_info *plasma_store_info,
object_id obj_id,
int64_t *num_objects_to_evict,
object_id **objects_to_evict);
/**
* This method will be called whenever an object in the Plasma store that was
* being used is no longer being used. When this method is called, the eviction
* policy will assume that the objects chosen to be evicted will in fact be
* evicted from the Plasma store by the caller.
*
* @param eviction_state The state managed by the eviction policy.
* @param plasma_store_info Information about the Plasma store that is exposed
* to the eviction policy.
* @param obj_id The ID of the object that is no longer being used.
* @param num_objects_to_evict The number of objects that are chosen will be
* stored at this address.
* @param objects_to_evict An array of the object IDs that were chosen will be
* stored at this address. If the number of objects chosen is greater
* than 0, then the caller needs to free that array. If it equals 0, then
* the array will be NULL.
* @return Void.
*/
void end_object_access(eviction_state *eviction_state,
plasma_store_info *plasma_store_info,
object_id obj_id,
int64_t *num_objects_to_evict,
object_id **objects_to_evict);
/**
* Choose some objects to evict from the Plasma store. When this method is
* called, the eviction policy will assume that the objects chosen to be evicted
* will in fact be evicted from the Plasma store by the caller.
*
* @note This method is not part of the API. It is exposed in the header file
* only for testing.
*
* @param eviction_state The state managed by the eviction policy.
* @param plasma_store_info Information about the Plasma store that is exposed
* to the eviction policy.
* @param num_bytes_required The number of bytes of space to try to free up.
* @param num_objects_to_evict The number of objects that are chosen will be
* stored at this address.
* @param objects_to_evict An array of the object IDs that were chosen will be
* stored at this address. If the number of objects chosen is greater
* than 0, then the caller needs to free that array. If it equals 0, then
* the array will be NULL.
* @return The total number of bytes of space chosen to be evicted.
*/
int64_t choose_objects_to_evict(eviction_state *eviction_state,
plasma_store_info *plasma_store_info,
int64_t num_bytes_required,
int64_t *num_objects_to_evict,
object_id **objects_to_evict);
#endif /* EVICTION_POLICY_H */
+11
View File
@@ -205,6 +205,17 @@ class PlasmaClient(object):
"""
self.client.plasma_delete(self.plasma_conn, make_plasma_id(object_id))
def evict(self, num_bytes):
"""Evict some objects until to recover some bytes.
Recover at least num_bytes bytes if possible.
Args:
num_bytes (int): The number of bytes to attempt to recover.
"""
num_bytes_evicted = self.client.plasma_evict(self.plasma_conn, num_bytes)
return num_bytes_evicted
def transfer(self, addr, port, object_id):
"""Transfer local object with id object_id to another plasma instance
+38
View File
@@ -9,6 +9,9 @@
#include "common.h"
#include "utarray.h"
#include "uthash.h"
typedef struct {
int64_t data_size;
int64_t metadata_size;
@@ -56,6 +59,8 @@ enum plasma_message_type {
PLASMA_SEAL,
/** Delete an object. */
PLASMA_DELETE,
/** Evict objects from the store. */
PLASMA_EVICT,
/** Subscribe to notifications about sealed objects. */
PLASMA_SUBSCRIBE,
/** Request transfer to another store. */
@@ -83,6 +88,8 @@ typedef struct {
/** In a transfer request, this is the port of the Plasma Manager to transfer
* the object to. */
int port;
/** A number of bytes. This is used for eviction requests. */
int64_t num_bytes;
/** The number of object IDs that will be included in this request. */
int num_object_ids;
/** The IDs of the objects that the request is about. */
@@ -97,6 +104,8 @@ typedef struct {
* present and 0 otherwise. Used for plasma_contains and
* plasma_fetch. */
int has_object;
/** A number of bytes. This is used for replies to eviction requests. */
int64_t num_bytes;
/** Number of object IDs a wait is returning. */
int num_objects_returned;
/** The number of object IDs that will be included in this reply. */
@@ -105,4 +114,33 @@ typedef struct {
object_id object_ids[1];
} plasma_reply;
/** This type is used by the Plasma store. It is here because it is exposed to
* the eviction policy. */
typedef struct {
/** Object id of this object. */
object_id object_id;
/** Object info like size, creation time and owner. */
plasma_object_info info;
/** Memory mapped file containing the object. */
int fd;
/** Size of the underlying map. */
int64_t map_size;
/** Offset from the base of the mmap. */
ptrdiff_t offset;
/** Handle for the uthash table. */
UT_hash_handle handle;
/** Pointer to the object data. Needed to free the object. */
uint8_t *pointer;
/** An array of the clients that are currently using this object. */
UT_array *clients;
/** The state of the object, e.g., whether it is open or sealed. */
object_state state;
} object_table_entry;
/** The plasma store information that is exposed to the eviction policy. */
typedef struct {
/** Objects that are in the Plasma store. */
object_table_entry *objects;
} plasma_store_info;
#endif
+12
View File
@@ -286,6 +286,18 @@ void plasma_delete(plasma_connection *conn, object_id object_id) {
plasma_send_request(conn->store_conn, PLASMA_DELETE, &req);
}
int64_t plasma_evict(plasma_connection *conn, int64_t num_bytes) {
/* Send a request to the store to evict objects. */
plasma_request req = {.num_bytes = num_bytes};
plasma_send_request(conn->store_conn, PLASMA_EVICT, &req);
/* Wait for a response with the number of bytes actually evicted. */
plasma_reply reply;
int r = read(conn->store_conn, &reply, sizeof(plasma_reply));
CHECKM(r != -1, "read error");
CHECKM(r != 0, "connection disconnected");
return reply.num_bytes;
}
int plasma_subscribe(plasma_connection *conn) {
int fd[2];
/* Create a non-blocking socket pair. This will only be used to send
+10
View File
@@ -177,6 +177,16 @@ void plasma_seal(plasma_connection *conn, object_id object_id);
*/
void plasma_delete(plasma_connection *conn, object_id object_id);
/**
* Delete objects until we have freed up num_bytes bytes or there are no more
* released objects that can be deleted.
*
* @param conn The object containing the connection state.
* @param num_bytes The number of bytes to try to free up.
* @return The total number of bytes of space retrieved.
*/
int64_t plasma_evict(plasma_connection *conn, int64_t num_bytes);
/**
* Fetch objects from remote plasma stores that have the
* objects stored.
+125 -57
View File
@@ -25,6 +25,7 @@
#include "common.h"
#include "event_loop.h"
#include "eviction_policy.h"
#include "io.h"
#include "uthash.h"
#include "utarray.h"
@@ -46,27 +47,6 @@ void plasma_send_reply(int fd, plasma_reply *reply) {
}
}
typedef struct {
/* Object id of this object. */
object_id object_id;
/* Object info like size, creation time and owner. */
plasma_object_info info;
/* Memory mapped file containing the object. */
int fd;
/* Size of the underlying map. */
int64_t map_size;
/* Offset from the base of the mmap. */
ptrdiff_t offset;
/* Handle for the uthash table. */
UT_hash_handle handle;
/* Pointer to the object data. Needed to free the object. */
uint8_t *pointer;
/* An array of the clients that are currently using this object. */
UT_array *clients;
/* The state of the object, e.g., whether it is open or sealed. */
object_state state;
} object_table_entry;
typedef struct {
/* Object id of this object. */
object_id object_id;
@@ -76,7 +56,7 @@ typedef struct {
UT_hash_handle handle;
} object_notify_entry;
/** Contains all information that is associated with a client. */
/** Contains all information that is associated with a Plasma store client. */
struct client {
/** The socket used to communicate with the client. */
int sock;
@@ -105,22 +85,29 @@ typedef struct {
struct plasma_store_state {
/* Event loop of the plasma store. */
event_loop *loop;
/* A hash table of all the objects in the store. */
object_table_entry *objects;
/* Objects that processes are waiting for. */
object_notify_entry *objects_notify;
/** The pending notifications that have not been sent to subscribers because
* the socket send buffers were full. This is a hash table from client file
* descriptor to an array of object_ids to send to that client. */
notification_queue *pending_notifications;
/** The plasma store information, including the object tables, that is exposed
* to the eviction policy. */
plasma_store_info *plasma_store_info;
/** The state that is managed by the eviction policy. */
eviction_state *eviction_state;
};
plasma_store_state *init_plasma_store(event_loop *loop) {
plasma_store_state *init_plasma_store(event_loop *loop, int64_t system_memory) {
plasma_store_state *state = malloc(sizeof(plasma_store_state));
state->loop = loop;
state->objects = NULL;
state->objects_notify = NULL;
state->pending_notifications = NULL;
/* Initialize the plasma store info. */
state->plasma_store_info = malloc(sizeof(plasma_store_info));
state->plasma_store_info->objects = NULL;
/* Initialize the eviction state. */
state->eviction_state = make_eviction_state(system_memory);
return state;
}
@@ -135,25 +122,44 @@ void add_client_to_object_clients(object_table_entry *entry,
return;
}
}
/* If there are no other clients using this object, notify the eviction policy
* that the object is being used. */
if (utarray_len(entry->clients) == 0) {
/* Tell the eviction policy that this object is being used. */
int64_t num_objects_to_evict;
object_id *objects_to_evict;
begin_object_access(client_info->plasma_state->eviction_state,
client_info->plasma_state->plasma_store_info,
entry->object_id, &num_objects_to_evict,
&objects_to_evict);
remove_objects(client_info->plasma_state, num_objects_to_evict,
objects_to_evict);
}
/* Add the client pointer to the list of clients using this object. */
utarray_push_back(entry->clients, &client_info);
}
/* Create a new object buffer in the hash table. */
void create_object(client *client_context,
object_id object_id,
object_id obj_id,
int64_t data_size,
int64_t metadata_size,
plasma_object *result) {
LOG_DEBUG("creating object"); /* TODO(pcm): add object_id here */
plasma_store_state *plasma_state = client_context->plasma_state;
object_table_entry *entry;
/* TODO(swang): Return these error to the client instead of exiting. */
HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id),
entry);
HASH_FIND(handle, plasma_state->plasma_store_info->objects, &obj_id,
sizeof(object_id), entry);
CHECKM(entry == NULL, "Cannot create object twice.");
/* Tell the eviction policy how much space we need to create this object. */
int64_t num_objects_to_evict;
object_id *objects_to_evict;
require_space(plasma_state->eviction_state, plasma_state->plasma_store_info,
data_size + metadata_size, &num_objects_to_evict,
&objects_to_evict);
remove_objects(plasma_state, num_objects_to_evict, objects_to_evict);
/* Allocate space for the new object */
uint8_t *pointer = dlmalloc(data_size + metadata_size);
int fd;
int64_t map_size;
@@ -162,7 +168,7 @@ void create_object(client *client_context,
assert(fd != -1);
entry = malloc(sizeof(object_table_entry));
memcpy(&entry->object_id, &object_id, sizeof(object_id));
memcpy(&entry->object_id, &obj_id, sizeof(object_id));
entry->info.data_size = data_size;
entry->info.metadata_size = metadata_size;
entry->pointer = pointer;
@@ -172,13 +178,19 @@ void create_object(client *client_context,
entry->offset = offset;
entry->state = OPEN;
utarray_new(entry->clients, &client_icd);
HASH_ADD(handle, plasma_state->objects, object_id, sizeof(object_id), entry);
HASH_ADD(handle, plasma_state->plasma_store_info->objects, object_id,
sizeof(object_id), entry);
result->handle.store_fd = fd;
result->handle.mmap_size = map_size;
result->data_offset = offset;
result->metadata_offset = offset + data_size;
result->data_size = data_size;
result->metadata_size = metadata_size;
/* Notify the eviction policy that this object was created. This must be done
* immediately before the call to add_client_to_object_clients so that the
* eviction policy does not have an opportunity to evict the object. */
object_created(plasma_state->eviction_state, plasma_state->plasma_store_info,
obj_id);
/* Record that this client is using this object. */
add_client_to_object_clients(entry, client_context);
}
@@ -190,8 +202,8 @@ int get_object(client *client_context,
plasma_object *result) {
plasma_store_state *plasma_state = client_context->plasma_state;
object_table_entry *entry;
HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id),
entry);
HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id,
sizeof(object_id), entry);
if (entry && entry->state == SEALED) {
result->handle.store_fd = entry->fd;
result->handle.mmap_size = entry->map_size;
@@ -229,6 +241,19 @@ int remove_client_from_object_clients(object_table_entry *entry,
if (*c == client_info) {
/* Remove the client from the array. */
utarray_erase(entry->clients, i, 1);
/* If no more clients are using this object, notify the eviction policy
* that the object is no longer being used. */
if (utarray_len(entry->clients) == 0) {
/* Tell the eviction policy that this object is no longer being used. */
int64_t num_objects_to_evict;
object_id *objects_to_evict;
end_object_access(client_info->plasma_state->eviction_state,
client_info->plasma_state->plasma_store_info,
entry->object_id, &num_objects_to_evict,
&objects_to_evict);
remove_objects(client_info->plasma_state, num_objects_to_evict,
objects_to_evict);
}
/* Return 1 to indicate that the client was removed. */
return 1;
}
@@ -240,8 +265,8 @@ int remove_client_from_object_clients(object_table_entry *entry,
void release_object(client *client_context, object_id object_id) {
plasma_store_state *plasma_state = client_context->plasma_state;
object_table_entry *entry;
HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id),
entry);
HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id,
sizeof(object_id), entry);
CHECK(entry != NULL);
/* Remove the client from the object's array of clients. */
CHECK(remove_client_from_object_clients(entry, client_context) == 1);
@@ -251,8 +276,8 @@ void release_object(client *client_context, object_id object_id) {
int contains_object(client *client_context, object_id object_id) {
plasma_store_state *plasma_state = client_context->plasma_state;
object_table_entry *entry;
HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id),
entry);
HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id,
sizeof(object_id), entry);
return entry && (entry->state == SEALED) ? OBJECT_FOUND : OBJECT_NOT_FOUND;
}
@@ -261,9 +286,10 @@ void seal_object(client *client_context, object_id object_id) {
LOG_DEBUG("sealing object"); // TODO(pcm): add object_id here
plasma_store_state *plasma_state = client_context->plasma_state;
object_table_entry *entry;
HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id),
entry);
CHECK(entry != NULL && entry->state == OPEN);
HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id,
sizeof(object_id), entry);
CHECK(entry != NULL);
CHECK(entry->state == OPEN);
/* Set the state of object to SEALED. */
entry->state = SEALED;
/* Inform all subscribers that a new object has been sealed. */
@@ -302,28 +328,41 @@ void seal_object(client *client_context, object_id object_id) {
}
}
/* Delete an object that has been created in the hash table. */
void delete_object(client *client_context, object_id object_id) {
LOG_DEBUG("deleting object"); // TODO(rkn): add object_id here
plasma_store_state *plasma_state = client_context->plasma_state;
/* Delete an object that has been created in the hash table. This should only
* be called on objects that are returned by the eviction policy to evict. */
void delete_object(plasma_store_state *plasma_state, object_id object_id) {
LOG_DEBUG("deleting object");
object_table_entry *entry;
HASH_FIND(handle, plasma_state->objects, &object_id, sizeof(object_id),
entry);
HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id,
sizeof(object_id), entry);
/* TODO(rkn): This should probably not fail, but should instead throw an
* error. Maybe we should also support deleting objects that have been created
* but not sealed. */
CHECKM(entry != NULL, "To delete an object it must have been created.");
CHECKM(entry != NULL, "To delete an object it must be in the object table.");
CHECKM(entry->state == SEALED,
"To delete an object it must have been sealed.");
CHECKM(utarray_len(entry->clients) == 0,
"To delete an object, there must be no clients currently using it.");
uint8_t *pointer = entry->pointer;
HASH_DELETE(handle, plasma_state->objects, entry);
HASH_DELETE(handle, plasma_state->plasma_store_info->objects, entry);
dlfree(pointer);
utarray_free(entry->clients);
free(entry);
}
void remove_objects(plasma_store_state *plasma_state,
int64_t num_objects_to_evict,
object_id *objects_to_evict) {
if (num_objects_to_evict > 0) {
for (int i = 0; i < num_objects_to_evict; ++i) {
delete_object(plasma_state, objects_to_evict[i]);
}
/* Free the array of objects to evict. This array was originally allocated
* by the eviction policy. */
free(objects_to_evict);
}
}
/* Send more notifications to a subscriber. */
void send_notifications(event_loop *loop,
int client_sock,
@@ -373,7 +412,7 @@ void subscribe_to_updates(client *client_context, int conn) {
plasma_store_state *plasma_state = client_context->plasma_state;
char dummy;
int fd = recv_fd(conn, &dummy, 1);
CHECKM(HASH_CNT(handle, plasma_state->objects) == 0,
CHECKM(HASH_CNT(handle, plasma_state->plasma_store_info->objects) == 0,
"plasma_subscribe should be called before any objects are created.");
/* Create a new array to buffer notifications that can't be sent to the
* subscriber yet because the socket send buffer is full. TODO(rkn): the queue
@@ -425,8 +464,23 @@ void process_message(event_loop *loop,
seal_object(client_context, req->object_ids[0]);
break;
case PLASMA_DELETE:
delete_object(client_context, req->object_ids[0]);
/* TODO(rkn): In the future, we can use this method to give hints to the
* eviction policy about when an object will no longer be needed. */
break;
case PLASMA_EVICT: {
/* This code path should only be used for testing. */
int64_t num_objects_to_evict;
object_id *objects_to_evict;
int64_t num_bytes_evicted = choose_objects_to_evict(
client_context->plasma_state->eviction_state,
client_context->plasma_state->plasma_store_info, req->num_bytes,
&num_objects_to_evict, &objects_to_evict);
remove_objects(client_context->plasma_state, num_objects_to_evict,
objects_to_evict);
reply.num_bytes = num_bytes_evicted;
plasma_send_reply(client_sock, &reply);
break;
}
case PLASMA_SUBSCRIBE:
subscribe_to_updates(client_context, client_sock);
break;
@@ -437,7 +491,8 @@ void process_message(event_loop *loop,
* lists. */
plasma_store_state *plasma_state = client_context->plasma_state;
object_table_entry *entry, *temp_entry;
HASH_ITER(handle, plasma_state->objects, entry, temp_entry) {
HASH_ITER(handle, plasma_state->plasma_store_info->objects, entry,
temp_entry) {
remove_client_from_object_clients(entry, client_context);
}
} break;
@@ -473,9 +528,9 @@ void signal_handler(int signal) {
}
}
void start_server(char *socket_name) {
void start_server(char *socket_name, int64_t system_memory) {
event_loop *loop = event_loop_create();
plasma_store_state *state = init_plasma_store(loop);
plasma_store_state *state = init_plasma_store(loop, system_memory);
int socket = bind_ipc_sock(socket_name, true);
CHECK(socket >= 0);
event_loop_add_file(loop, socket, EVENT_LOOP_READ, new_client_connection,
@@ -486,12 +541,21 @@ void start_server(char *socket_name) {
int main(int argc, char *argv[]) {
signal(SIGTERM, signal_handler);
char *socket_name = NULL;
int64_t system_memory = -1;
int c;
while ((c = getopt(argc, argv, "s:")) != -1) {
while ((c = getopt(argc, argv, "s:m:")) != -1) {
switch (c) {
case 's':
socket_name = optarg;
break;
case 'm': {
char extra;
int scanned = sscanf(optarg, "%" SCNd64 "%c", &system_memory, &extra);
CHECK(scanned == 1);
LOG_INFO("Allowing the Plasma store to use up to %.2fGB of memory.",
((double) system_memory) / 1000000000);
break;
}
default:
exit(-1);
}
@@ -500,6 +564,10 @@ int main(int argc, char *argv[]) {
LOG_ERR("please specify socket for incoming connections with -s switch");
exit(-1);
}
if (system_memory == -1) {
LOG_ERR("please specify the amount of system memory with -m switch");
exit(-1);
}
LOG_DEBUG("starting server listening on %s", socket_name);
start_server(socket_name);
start_server(socket_name, system_memory);
}
+4 -9
View File
@@ -68,15 +68,6 @@ void seal_object(client *client_context, object_id object_id);
*/
int contains_object(client *client_context, object_id object_id);
/**
* Delete an object from the plasma store:
*
* @param client_context The context of the client making this request.
* @param object_id Object ID of the object to be deleted.
* @return Void.
*/
void delete_object(client *client_context, object_id object_id);
/**
* Send notifications about sealed objects to the subscribers. This is called
* in seal_object. If the socket's send buffer is full, the notification will be
@@ -94,4 +85,8 @@ void send_notifications(event_loop *loop,
void *plasma_state,
int events);
void remove_objects(plasma_store_state *plasma_state,
int64_t num_objects_to_evict,
object_id *objects_to_evict);
#endif /* PLASMA_STORE_H */
+42 -3
View File
@@ -15,6 +15,7 @@ import threading
import plasma
USE_VALGRIND = False
PLASMA_STORE_MEMORY = 1000000000
def random_object_id():
return "".join([chr(random.randint(0, 255)) for _ in range(plasma.PLASMA_ID_SIZE)])
@@ -64,7 +65,7 @@ class TestPlasmaClient(unittest.TestCase):
# Start Plasma.
plasma_store_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../build/plasma_store")
store_name = "/tmp/store{}".format(random.randint(0, 10000))
command = [plasma_store_executable, "-s", store_name]
command = [plasma_store_executable, "-s", store_name, "-m", str(PLASMA_STORE_MEMORY)]
if USE_VALGRIND:
self.p = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + command)
time.sleep(2.0)
@@ -197,6 +198,44 @@ class TestPlasmaClient(unittest.TestCase):
memory_buffer[0] = chr(0)
self.assertRaises(Exception, illegal_assignment)
def test_evict(self):
object_id1 = random_object_id()
b1 = self.plasma_client.create(object_id1, 1000)
self.plasma_client.seal(object_id1)
del b1
self.assertEqual(self.plasma_client.evict(1), 1000)
object_id2 = random_object_id()
object_id3 = random_object_id()
b2 = self.plasma_client.create(object_id2, 999)
b3 = self.plasma_client.create(object_id3, 998)
del b3
self.plasma_client.seal(object_id3)
self.assertEqual(self.plasma_client.evict(1000), 998)
object_id4 = random_object_id()
b4 = self.plasma_client.create(object_id4, 997)
self.plasma_client.seal(object_id4)
del b4
self.plasma_client.seal(object_id2)
del b2
self.assertEqual(self.plasma_client.evict(1), 997)
self.assertEqual(self.plasma_client.evict(1), 999)
object_id5 = random_object_id()
object_id6 = random_object_id()
object_id7 = random_object_id()
b5 = self.plasma_client.create(object_id5, 996)
b6 = self.plasma_client.create(object_id6, 995)
b7 = self.plasma_client.create(object_id7, 994)
self.plasma_client.seal(object_id5)
self.plasma_client.seal(object_id6)
self.plasma_client.seal(object_id7)
del b5
del b6
del b7
self.assertEqual(self.plasma_client.evict(2000), 996 + 995 + 994)
def test_subscribe(self):
# Subscribe to notifications from the Plasma Store.
sock = self.plasma_client.subscribe()
@@ -220,8 +259,8 @@ class TestPlasmaManager(unittest.TestCase):
store_name2 = "/tmp/store{}".format(random.randint(0, 10000))
manager_name1 = "/tmp/manager{}".format(random.randint(0, 10000))
manager_name2 = "/tmp/manager{}".format(random.randint(0, 10000))
plasma_store_command1 = [plasma_store_executable, "-s", store_name1]
plasma_store_command2 = [plasma_store_executable, "-s", store_name2]
plasma_store_command1 = [plasma_store_executable, "-s", store_name1, "-m", str(PLASMA_STORE_MEMORY)]
plasma_store_command2 = [plasma_store_executable, "-s", store_name2, "-m", str(PLASMA_STORE_MEMORY)]
if USE_VALGRIND:
self.p2 = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + plasma_store_command1)