diff --git a/Makefile b/Makefile index a91249059..c36d99e02 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ CC = gcc CFLAGS = -g -Wall --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -I. -Icommon -Icommon/thirdparty BUILD = build -all: $(BUILD)/plasma_store $(BUILD)/plasma_manager $(BUILD)/plasma_client.so $(BUILD)/example +all: $(BUILD)/plasma_store $(BUILD)/plasma_manager $(BUILD)/plasma_client.so $(BUILD)/example $(BUILD)/libplasma_client.a debug: FORCE debug: CFLAGS += -DDEBUG=1 @@ -21,6 +21,9 @@ $(BUILD)/plasma_manager: src/plasma_manager.c src/plasma.h src/plasma_client.c s $(BUILD)/plasma_client.so: src/plasma_client.c src/fling.h src/fling.c common $(CC) $(CFLAGS) src/plasma_client.c src/fling.c common/build/libcommon.a -fPIC -shared -o $(BUILD)/plasma_client.so +$(BUILD)/libplasma_client.a: src/plasma_client.o src/fling.o + ar rcs $@ $^ + $(BUILD)/example: src/plasma_client.c src/plasma.h src/example.c src/fling.h src/fling.c common $(CC) $(CFLAGS) src/plasma_client.c src/example.c src/fling.c common/build/libcommon.a -o $(BUILD)/example diff --git a/lib/python/plasma.py b/lib/python/plasma.py index 37fa8f72c..347715118 100644 --- a/lib/python/plasma.py +++ b/lib/python/plasma.py @@ -1,17 +1,19 @@ import os import socket import ctypes +import time Addr = ctypes.c_ubyte * 4 -ID = ctypes.c_ubyte * 20 +PLASMA_ID_SIZE = 20 +ID = ctypes.c_ubyte * PLASMA_ID_SIZE class PlasmaID(ctypes.Structure): _fields_ = [("plasma_id", ID)] def make_plasma_id(string): - if len(string) != 20: - raise Exception("PlasmaIDs must be 20 characters long") + if len(string) != PLASMA_ID_SIZE: + raise Exception("PlasmaIDs must be {} characters long".format(PLASMA_ID_SIZE)) object_id = map(ord, string) return PlasmaID(plasma_id=ID(*object_id)) @@ -46,6 +48,7 @@ class PlasmaClient(object): self.client.plasma_contains.restype = None self.client.plasma_seal.restype = None self.client.plasma_delete.restype = None + self.client.plasma_subscribe.restype = ctypes.c_int self.buffer_from_memory = ctypes.pythonapi.PyBuffer_FromMemory self.buffer_from_memory.argtypes = [ctypes.c_void_p, ctypes.c_int64] @@ -161,3 +164,25 @@ class PlasmaClient(object): if self.manager_conn == -1: raise Exception("Not connected to the plasma manager socket") self.client.plasma_transfer(self.manager_conn, addr, port, make_plasma_id(object_id)) + + def subscribe(self): + """Subscribe to notifications about sealed objects.""" + fd = self.client.plasma_subscribe(self.store_conn) + self.notification_sock = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM) + # Make the socket non-blocking. + self.notification_sock.setblocking(0) + + def get_next_notification(self): + """Get the next notification from the notification socket.""" + if not self.notification_sock: + raise Exception("To get notifications, first call subscribe.") + # Loop until we've read PLASMA_ID_SIZE bytes from the socket. + while True: + try: + message_data = self.notification_sock.recv(PLASMA_ID_SIZE) + except socket.error: + time.sleep(0.001) + else: + assert len(message_data) == PLASMA_ID_SIZE + break + return message_data diff --git a/src/plasma.h b/src/plasma.h index 6a39deb6c..8b6852f65 100644 --- a/src/plasma.h +++ b/src/plasma.h @@ -52,6 +52,8 @@ enum plasma_message_type { PLASMA_SEAL, /** Delete an object. */ PLASMA_DELETE, + /** Subscribe to notifications about sealed objects. */ + PLASMA_SUBSCRIBE, /** Request transfer to another store. */ PLASMA_TRANSFER, /** Header for sending data. */ diff --git a/src/plasma_client.c b/src/plasma_client.c index 43c448e40..6c4ee4f1d 100644 --- a/src/plasma_client.c +++ b/src/plasma_client.c @@ -1,6 +1,7 @@ /* PLASMA CLIENT: Client library for using the plasma store and manager */ #include +#include #include #include #include @@ -151,6 +152,25 @@ void plasma_delete(plasma_store_conn *conn, object_id object_id) { plasma_send_request(conn->conn, PLASMA_DELETE, &req); } +int plasma_subscribe(plasma_store_conn *conn) { + int fd[2]; + /* Create a non-blocking socket pair. This will only be used to send + * notifications from the Plasma store to the client. */ + socketpair(AF_UNIX, SOCK_STREAM, 0, fd); + /* Make the socket non-blocking. */ + int flags = fcntl(fd[1], F_GETFL, 0); + CHECK(fcntl(fd[1], F_SETFL, flags | O_NONBLOCK) == 0); + /* Tell the Plasma store about the subscription. */ + plasma_request req = {}; + plasma_send_request(conn->conn, PLASMA_SUBSCRIBE, &req); + /* Send the file descriptor that the Plasma store should use to push + * notifications about sealed objects to this client. */ + send_fd(conn->conn, fd[1], NULL, 0); + /* Return the file descriptor that the client should use to read notifications + * about sealed objects. */ + return fd[0]; +} + plasma_store_conn *plasma_store_connect(const char *socket_name) { assert(socket_name); /* Try to connect to the Plasma store. If unsuccessful, retry several times. diff --git a/src/plasma_client.h b/src/plasma_client.h index 44af5a1f0..36ecb1061 100644 --- a/src/plasma_client.h +++ b/src/plasma_client.h @@ -1,6 +1,8 @@ #ifndef PLASMA_CLIENT_H #define PLASMA_CLIENT_H +#include "plasma.h" + typedef struct plasma_store_conn plasma_store_conn; /** @@ -124,4 +126,15 @@ void plasma_seal(plasma_store_conn *conn, object_id object_id); */ void plasma_delete(plasma_store_conn *conn, object_id object_id); +/** + * Subscribe to notifications when objects are sealed in the object store. + * Whenever an object is sealed, a message will be written to the client socket + * that is returned by this method. + * + * @param conn The object containing the connection state. + * @return The file descriptor that the client should use to read notifications + from the object store about sealed objects. + */ +int plasma_subscribe(plasma_store_conn *conn); + #endif diff --git a/src/plasma_manager.c b/src/plasma_manager.c index 921d328ee..c7bdc7627 100644 --- a/src/plasma_manager.c +++ b/src/plasma_manager.c @@ -5,6 +5,7 @@ * transfering an object to another object store comes in, it ships the data * using a new connection to the target object manager. */ +#include #include #include #include @@ -12,7 +13,6 @@ #include #include #include -#include #include #include #include @@ -319,13 +319,10 @@ void start_server(const char *store_socket_name, name.sin_family = AF_INET; name.sin_port = htons(port); name.sin_addr.s_addr = htonl(INADDR_ANY); + /* Make the socket non-blocking. */ + int flags = fcntl(sock, F_GETFL, 0); + CHECK(fcntl(sock, F_SETFL, flags | O_NONBLOCK) == 0); int on = 1; - /* TODO(pcm): http://stackoverflow.com/q/1150635 */ - if (ioctl(sock, FIONBIO, (char *) &on) < 0) { - LOG_ERR("ioctl failed"); - close(sock); - exit(-1); - } setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); if (bind(sock, (struct sockaddr *) &name, sizeof(name)) < 0) { LOG_ERR("could not bind socket"); diff --git a/src/plasma_store.c b/src/plasma_store.c index ebb3bac0b..9419d9dd2 100644 --- a/src/plasma_store.c +++ b/src/plasma_store.c @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -71,6 +72,20 @@ typedef struct { UT_hash_handle handle; } object_notify_entry; +/* This is used to define the array of object IDs used to define the + * notification_queue type. */ +UT_icd object_id_icd = {sizeof(object_id), NULL, NULL, NULL}; + +typedef struct { + /** Client file descriptor. This is used as a key for the hash table. */ + int subscriber_fd; + /** The object IDs to notify the client about. We notify the client about the + * IDs in the order that the objects were sealed. */ + UT_array *object_ids; + /** Handle for the uthash table. */ + UT_hash_handle hh; +} notification_queue; + struct plasma_store_state { /* Event loop of the plasma store. */ event_loop *loop; @@ -81,6 +96,10 @@ struct plasma_store_state { object_table_entry *sealed_objects; /* Objects that processes are waiting for. */ object_notify_entry *objects_notify; + /** The pending notifications that have not been sent to subscribers because + * the socket send buffers were full. This is a hash table from client file + * descriptor to an array of object_ids to send to that client. */ + notification_queue *pending_notifications; }; plasma_store_state *init_plasma_store(event_loop *loop) { @@ -89,6 +108,7 @@ plasma_store_state *init_plasma_store(event_loop *loop) { state->open_objects = NULL; state->sealed_objects = NULL; state->objects_notify = NULL; + state->pending_notifications = NULL; return state; } @@ -182,7 +202,15 @@ void seal_object(plasma_store_state *s, } HASH_DELETE(handle, s->open_objects, entry); HASH_ADD(handle, s->sealed_objects, object_id, sizeof(object_id), entry); - /* Inform processes that the object is ready now. */ + + /* Inform all subscribers that a new object has been sealed. */ + notification_queue *queue, *temp_queue; + HASH_ITER(hh, s->pending_notifications, queue, temp_queue) { + utarray_push_back(queue->object_ids, &object_id); + send_notifications(s->loop, queue->subscriber_fd, s, 0); + } + + /* Inform processes getting this object that the object is ready now. */ object_notify_entry *notify_entry; HASH_FIND(handle, s->objects_notify, &object_id, sizeof(object_id), notify_entry); @@ -216,6 +244,62 @@ void delete_object(plasma_store_state *s, object_id object_id) { free(entry); } +/* Send more notifications to a subscriber. */ +void send_notifications(event_loop *loop, + int client_sock, + void *context, + int events) { + plasma_store_state *s = context; + + notification_queue *queue; + HASH_FIND_INT(s->pending_notifications, &client_sock, queue); + CHECK(queue != NULL); + + int num_processed = 0; + /* Loop over the array of pending notifications and send as many of them as + * possible. */ + for (object_id *obj_id = (object_id *) utarray_front(queue->object_ids); + obj_id != NULL; + obj_id = (object_id *) utarray_next(queue->object_ids, obj_id)) { + /* Attempt to send a notification about this object ID. */ + int nbytes = send(client_sock, obj_id, sizeof(object_id), 0); + if (nbytes >= 0) { + CHECK(nbytes == sizeof(object_id)); + } else if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + LOG_DEBUG( + "The socket's send buffer is full, so we are caching this " + "notification and will send it later."); + break; + } else { + CHECKM(0, "This code should be unreachable."); + } + num_processed += 1; + } + /* Remove the sent notifications from the array. */ + utarray_erase(queue->object_ids, 0, num_processed); +} + +/* Subscribe to notifications about sealed objects. */ +void subscribe_to_updates(plasma_store_state *s, int conn) { + LOG_DEBUG("subscribing to updates"); + int fd = recv_fd(conn, NULL, 0); + CHECKM(HASH_CNT(handle, s->open_objects) == 0, + "plasma_subscribe should be called before any objects are created."); + CHECKM(HASH_CNT(handle, s->sealed_objects) == 0, + "plasma_subscribe should be called before any objects are created."); + /* Create a new array to buffer notifications that can't be sent to the + * subscriber yet because the socket send buffer is full. TODO(rkn): the queue + * never gets freed. */ + notification_queue *queue = + (notification_queue *) malloc(sizeof(notification_queue)); + queue->subscriber_fd = fd; + utarray_new(queue->object_ids, &object_id_icd); + HASH_ADD_INT(s->pending_notifications, subscriber_fd, queue); + /* Add a callback to the event loop to send queued notifications whenever + * there is room in the socket's send buffer. */ + event_loop_add_file(s->loop, fd, EVENT_LOOP_WRITE, send_notifications, s); +} + void process_message(event_loop *loop, int client_sock, void *context, @@ -263,6 +347,9 @@ void process_message(event_loop *loop, case PLASMA_DELETE: delete_object(s, req->object_id); break; + case PLASMA_SUBSCRIBE: + subscribe_to_updates(s, client_sock); + break; case DISCONNECT_CLIENT: { LOG_DEBUG("Disconnecting client on fd %d", client_sock); event_loop_remove_file(loop, client_sock); diff --git a/src/plasma_store.h b/src/plasma_store.h index 0f4bfd82a..dd5e963fb 100644 --- a/src/plasma_store.h +++ b/src/plasma_store.h @@ -65,4 +65,21 @@ int contains_object(plasma_store_state *s, object_id object_id); */ void delete_object(plasma_store_state *s, object_id object_id); +/** + * Send notifications about sealed objects to the subscribers. This is called + * in seal_object. If the socket's send buffer is full, the notification will be + * buffered, and this will be called again when the send buffer has room. + * + * @param loop The Plasma store event loop. + * @param client_sock The file descriptor to send the notification to. + * @param context The plasma store global state. + * @param events This is needed for this function to have the signature of a + callback. + * @return Void. + */ +void send_notifications(event_loop *loop, + int client_sock, + void *context, + int events); + #endif /* PLASMA_STORE_H */ diff --git a/test/test.py b/test/test.py index 75c072c65..4e08ed67e 100644 --- a/test/test.py +++ b/test/test.py @@ -3,6 +3,7 @@ from __future__ import print_function import os import signal import socket +import struct import subprocess import sys import unittest @@ -15,7 +16,7 @@ import plasma USE_VALGRIND = False def random_object_id(): - return "".join([chr(random.randint(0, 255)) for _ in range(20)]) + return "".join([chr(random.randint(0, 255)) for _ in range(plasma.PLASMA_ID_SIZE)]) def generate_metadata(length): metadata = length * ["\x00"] @@ -181,6 +182,20 @@ class TestPlasmaClient(unittest.TestCase): memory_buffer[0] = chr(0) self.assertRaises(Exception, illegal_assignment) + def test_subscribe(self): + # Subscribe to notifications from the Plasma Store. + sock = self.plasma_client.subscribe() + for i in [1, 10, 100, 1000, 10000, 100000]: + object_ids = [random_object_id() for _ in range(i)] + for object_id in object_ids: + # Create an object and seal it to trigger a notification. + self.plasma_client.create(object_id, 1000) + self.plasma_client.seal(object_id) + # Check that we received notifications for all of the objects. + for object_id in object_ids: + message_data = self.plasma_client.get_next_notification() + self.assertEqual(object_id, message_data) + class TestPlasmaManager(unittest.TestCase): def setUp(self):