diff --git a/docs/index.rst b/docs/index.rst index fa1afbf1..281835b2 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -6,7 +6,7 @@ Contents: .. toctree:: - :maxdepth: 2 + :maxdepth: 4 notes.rst modules.rst diff --git a/zipline/component.py b/zipline/component.py index f7b42512..782653f0 100644 --- a/zipline/component.py +++ b/zipline/component.py @@ -1,6 +1,9 @@ """ Commonly used messaging components. + +Contains the base class for all components. """ + import os import uuid import socket @@ -10,28 +13,46 @@ import zipline.util as qutil from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE class Component(object): + """ + Base class for components. Defines the the base messaging + interface between components. + + :param addresses: a dict of name_string -> zmq port address strings. + Must have the following entries + + :param sync_address: socket address used for synchronizing the start of + all workers, heartbeating, and exit notification + will be used in REP/REQ sockets. Bind is always on + the REP side. + + :param data_address: socket address used for data sources to stream + their records. Will be used in PUSH/PULL sockets + between data sources and a ParallelBuffer (aka + the Feed). Bind will always be on the PULL side + (we always have N producers and 1 consumer) + + :param feed_address: socket address used to publish consolidated feed + from serialization of data sources + will be used in PUB/SUB sockets between Feed and + Transforms. Bind is always on the PUB side. + + :param merge_address: socket address used to publish transformed + values. will be used in PUSH/PULL from many + transforms to one MergedParallelBuffer (aka the + Merge). Bind will always be on the PULL side (we + always have N producers and 1 consumer) + + :param result_address: socket address used to publish merged data + source feed and transforms to clients will be + used in PUB/SUB from one Merge to one or many + clients. Bind is always on the PUB side. + + bind/connect methods will return the correct socket type for each + address. + + """ def __init__(self): - """ - :addresses: a dict of name_string -> zmq port address strings. Must have the following entries:: - - - sync_address: socket address used for synchronizing the start of all workers, heartbeating, and exit notification - will be used in REP/REQ sockets. Bind is always on the REP side. - - data_address: socket address used for data sources to stream their records. - will be used in PUSH/PULL sockets between data sources and a ParallelBuffer (aka the Feed). Bind - will always be on the PULL side (we always have N producers and 1 consumer) - - feed_address: socket address used to publish consolidated feed from serialization of data sources - will be used in PUB/SUB sockets between Feed and Transforms. Bind is always on the PUB side. - - merge_address: socket address used to publish transformed values. - will be used in PUSH/PULL from many transforms to one MergedParallelBuffer (aka the Merge). Bind - will always be on the PULL side (we always have N producers and 1 consumer) - - result_address: socket address used to publish merged data source feed and transforms to clients - will be used in PUB/SUB from one Merge to one or many clients. Bind is always on the PUB side. - - Bind/Connect methods will return the correct socket type for each address. Any sockets on which recv is expected to be called - will also return a Poller. - - """ self.zmq = None self.context = None self.addresses = None @@ -73,14 +94,18 @@ class Component(object): def destroy(self): """ + Clean shutdown. + Tear down after normal operation. """ pass def kill(self): """ + Unclean shutdown. + Tear down ( fast ) as a mode of failure in the - simulation. + simulation or on service halt. """ raise NotImplementedError diff --git a/zipline/monitor.py b/zipline/monitor.py index 4d208c0d..a0cd0b08 100644 --- a/zipline/monitor.py +++ b/zipline/monitor.py @@ -2,7 +2,7 @@ import zmq class Controller(object): """ - A N to N messaging system for inter component communication. + A N to M messaging system for inter component communication. Ostensibly a broker of sorts. Putting messages to the broker is durable, if the broker goes down messages will queue up until the HWM and then go out when the new broker comes up. @@ -17,9 +17,9 @@ class Controller(object): :param pull_socket: Socket to subscribe to for republication of published messages. The endpoint for - :func message_sender:. + :func message_sender: . :param pub_socket: Socket to publish messages, the starting - point of :func message_listener:. + point of :func message_listener: . :param logging: Logging interface for tracking broker state Defaults to None @@ -43,12 +43,12 @@ class Controller(object): """ - polling = False debug = False def __init__(self, pull_socket, pub_socket, logging = None): self._ctx = None + polling = False self.associated = [] @@ -78,10 +78,10 @@ class Controller(object): else: self._ctx = context - if debug: - return self._poll_fast() # the c loop - else: - return self._poll() # use a python loop + #if not debug: + #return self._poll_fast() # the c loop + #else: + return self._poll() # use a python loop def _poll_fast(self): """ @@ -90,6 +90,9 @@ class Controller(object): self.pull = self._ctx.socket(zmq.PULL) self.pub = self._ctx.socket(zmq.PUB) + self.pull.bind(self.pull_socket) + self.pub.bind(self.pub_socket) + zmq.device(zmq.FORWARDER, self.pull, self.pub) def _poll(self): diff --git a/zipline/protocol.py b/zipline/protocol.py index d1d21728..b13c5ae3 100644 --- a/zipline/protocol.py +++ b/zipline/protocol.py @@ -78,7 +78,7 @@ class namedict(object): -- or -- foo['BAR'] - For more complex strcuts use collections.namedtuple: + For more complex structs use collections.namedtuple: """ def __init__(self, dct): diff --git a/zipline/serial.py b/zipline/serial.py index 518dfe47..b0eba920 100644 --- a/zipline/serial.py +++ b/zipline/serial.py @@ -31,7 +31,7 @@ def recv_zipped_pickle(socket, flags=0, protocol=-1): """ z = socket.recv(flags) p = zlib.uncompress(z) - return pickle.loads(p) + return pickle.loads(p, protocol=protocol) # HDF5, Numpy Byte Strings, Pandas arrays should use # blosc and reconstruct the Python container from the byte string