mirror of
https://github.com/wassname/ray.git
synced 2026-07-02 02:59:52 +08:00
Allow clients to subscribe to notifications about sealed objects. (#40)
* Create libplasma_client.a in Makefile. * Implement plasma subscribe. * Fixes and tests. * Buffer notifications in the Plasma store when the socket send buffer is full. * Fix formatting. * Turn off -Werror in Makefile. * Fixes. * Fix formatting.
This commit is contained in:
committed by
Philipp Moritz
parent
1adafee6d3
commit
5ad8e145ae
+28
-3
@@ -1,17 +1,19 @@
|
||||
import os
|
||||
import socket
|
||||
import ctypes
|
||||
import time
|
||||
|
||||
Addr = ctypes.c_ubyte * 4
|
||||
|
||||
ID = ctypes.c_ubyte * 20
|
||||
PLASMA_ID_SIZE = 20
|
||||
ID = ctypes.c_ubyte * PLASMA_ID_SIZE
|
||||
|
||||
class PlasmaID(ctypes.Structure):
|
||||
_fields_ = [("plasma_id", ID)]
|
||||
|
||||
def make_plasma_id(string):
|
||||
if len(string) != 20:
|
||||
raise Exception("PlasmaIDs must be 20 characters long")
|
||||
if len(string) != PLASMA_ID_SIZE:
|
||||
raise Exception("PlasmaIDs must be {} characters long".format(PLASMA_ID_SIZE))
|
||||
object_id = map(ord, string)
|
||||
return PlasmaID(plasma_id=ID(*object_id))
|
||||
|
||||
@@ -46,6 +48,7 @@ class PlasmaClient(object):
|
||||
self.client.plasma_contains.restype = None
|
||||
self.client.plasma_seal.restype = None
|
||||
self.client.plasma_delete.restype = None
|
||||
self.client.plasma_subscribe.restype = ctypes.c_int
|
||||
|
||||
self.buffer_from_memory = ctypes.pythonapi.PyBuffer_FromMemory
|
||||
self.buffer_from_memory.argtypes = [ctypes.c_void_p, ctypes.c_int64]
|
||||
@@ -161,3 +164,25 @@ class PlasmaClient(object):
|
||||
if self.manager_conn == -1:
|
||||
raise Exception("Not connected to the plasma manager socket")
|
||||
self.client.plasma_transfer(self.manager_conn, addr, port, make_plasma_id(object_id))
|
||||
|
||||
def subscribe(self):
|
||||
"""Subscribe to notifications about sealed objects."""
|
||||
fd = self.client.plasma_subscribe(self.store_conn)
|
||||
self.notification_sock = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
# Make the socket non-blocking.
|
||||
self.notification_sock.setblocking(0)
|
||||
|
||||
def get_next_notification(self):
|
||||
"""Get the next notification from the notification socket."""
|
||||
if not self.notification_sock:
|
||||
raise Exception("To get notifications, first call subscribe.")
|
||||
# Loop until we've read PLASMA_ID_SIZE bytes from the socket.
|
||||
while True:
|
||||
try:
|
||||
message_data = self.notification_sock.recv(PLASMA_ID_SIZE)
|
||||
except socket.error:
|
||||
time.sleep(0.001)
|
||||
else:
|
||||
assert len(message_data) == PLASMA_ID_SIZE
|
||||
break
|
||||
return message_data
|
||||
|
||||
Reference in New Issue
Block a user