Documention tweaks.

This commit is contained in:
Stephen Diehl
2012-02-27 12:14:45 -05:00
parent 40f6c17be7
commit 0de735e816
5 changed files with 60 additions and 32 deletions
+1 -1
View File
@@ -6,7 +6,7 @@
Contents:
.. toctree::
:maxdepth: 2
:maxdepth: 4
notes.rst
modules.rst
+46 -21
View File
@@ -1,6 +1,9 @@
"""
Commonly used messaging components.
Contains the base class for all components.
"""
import os
import uuid
import socket
@@ -10,28 +13,46 @@ import zipline.util as qutil
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE
class Component(object):
"""
Base class for components. Defines the the base messaging
interface between components.
:param addresses: a dict of name_string -> zmq port address strings.
Must have the following entries
:param sync_address: socket address used for synchronizing the start of
all workers, heartbeating, and exit notification
will be used in REP/REQ sockets. Bind is always on
the REP side.
:param data_address: socket address used for data sources to stream
their records. Will be used in PUSH/PULL sockets
between data sources and a ParallelBuffer (aka
the Feed). Bind will always be on the PULL side
(we always have N producers and 1 consumer)
:param feed_address: socket address used to publish consolidated feed
from serialization of data sources
will be used in PUB/SUB sockets between Feed and
Transforms. Bind is always on the PUB side.
:param merge_address: socket address used to publish transformed
values. will be used in PUSH/PULL from many
transforms to one MergedParallelBuffer (aka the
Merge). Bind will always be on the PULL side (we
always have N producers and 1 consumer)
:param result_address: socket address used to publish merged data
source feed and transforms to clients will be
used in PUB/SUB from one Merge to one or many
clients. Bind is always on the PUB side.
bind/connect methods will return the correct socket type for each
address.
"""
def __init__(self):
"""
:addresses: a dict of name_string -> zmq port address strings. Must have the following entries::
- sync_address: socket address used for synchronizing the start of all workers, heartbeating, and exit notification
will be used in REP/REQ sockets. Bind is always on the REP side.
- data_address: socket address used for data sources to stream their records.
will be used in PUSH/PULL sockets between data sources and a ParallelBuffer (aka the Feed). Bind
will always be on the PULL side (we always have N producers and 1 consumer)
- feed_address: socket address used to publish consolidated feed from serialization of data sources
will be used in PUB/SUB sockets between Feed and Transforms. Bind is always on the PUB side.
- merge_address: socket address used to publish transformed values.
will be used in PUSH/PULL from many transforms to one MergedParallelBuffer (aka the Merge). Bind
will always be on the PULL side (we always have N producers and 1 consumer)
- result_address: socket address used to publish merged data source feed and transforms to clients
will be used in PUB/SUB from one Merge to one or many clients. Bind is always on the PUB side.
Bind/Connect methods will return the correct socket type for each address. Any sockets on which recv is expected to be called
will also return a Poller.
"""
self.zmq = None
self.context = None
self.addresses = None
@@ -73,14 +94,18 @@ class Component(object):
def destroy(self):
"""
Clean shutdown.
Tear down after normal operation.
"""
pass
def kill(self):
"""
Unclean shutdown.
Tear down ( fast ) as a mode of failure in the
simulation.
simulation or on service halt.
"""
raise NotImplementedError
+11 -8
View File
@@ -2,7 +2,7 @@ import zmq
class Controller(object):
"""
A N to N messaging system for inter component communication.
A N to M messaging system for inter component communication.
Ostensibly a broker of sorts. Putting messages to the broker
is durable, if the broker goes down messages will queue up
until the HWM and then go out when the new broker comes up.
@@ -17,9 +17,9 @@ class Controller(object):
:param pull_socket: Socket to subscribe to for republication of
published messages. The endpoint for
:func message_sender:.
:func message_sender: .
:param pub_socket: Socket to publish messages, the starting
point of :func message_listener:.
point of :func message_listener: .
:param logging: Logging interface for tracking broker state
Defaults to None
@@ -43,12 +43,12 @@ class Controller(object):
"""
polling = False
debug = False
def __init__(self, pull_socket, pub_socket, logging = None):
self._ctx = None
polling = False
self.associated = []
@@ -78,10 +78,10 @@ class Controller(object):
else:
self._ctx = context
if debug:
return self._poll_fast() # the c loop
else:
return self._poll() # use a python loop
#if not debug:
#return self._poll_fast() # the c loop
#else:
return self._poll() # use a python loop
def _poll_fast(self):
"""
@@ -90,6 +90,9 @@ class Controller(object):
self.pull = self._ctx.socket(zmq.PULL)
self.pub = self._ctx.socket(zmq.PUB)
self.pull.bind(self.pull_socket)
self.pub.bind(self.pub_socket)
zmq.device(zmq.FORWARDER, self.pull, self.pub)
def _poll(self):
+1 -1
View File
@@ -78,7 +78,7 @@ class namedict(object):
-- or --
foo['BAR']
For more complex strcuts use collections.namedtuple:
For more complex structs use collections.namedtuple:
"""
def __init__(self, dct):
+1 -1
View File
@@ -31,7 +31,7 @@ def recv_zipped_pickle(socket, flags=0, protocol=-1):
"""
z = socket.recv(flags)
p = zlib.uncompress(z)
return pickle.loads(p)
return pickle.loads(p, protocol=protocol)
# HDF5, Numpy Byte Strings, Pandas arrays should use
# blosc and reconstruct the Python container from the byte string