From 7134b4c1b196c5e58890802e558777cb575c7257 Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Sat, 24 Mar 2012 11:56:18 -0400 Subject: [PATCH 1/3] Support for different flavors of ZeroMQ. --- zipline/component.py | 52 +++++++++++++++++++++----- zipline/lines.py | 5 ++- zipline/messaging.py | 15 ++++---- zipline/simulator.py | 4 +- zipline/zmq_utils.py | 87 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 142 insertions(+), 21 deletions(-) diff --git a/zipline/component.py b/zipline/component.py index 119138e3..d33ae44b 100644 --- a/zipline/component.py +++ b/zipline/component.py @@ -9,8 +9,16 @@ import sys import uuid import time import socket +import gevent import humanhash +# pyzmq +import zmq +# gevent_zeromq +import gevent_zeromq +# zmq_ctypes +#import zmq_ctypes + from datetime import datetime import zipline.util as qutil @@ -61,8 +69,8 @@ class Component(object): self.zmq = None self.context = None self.addresses = None + self.out_socket = None - self.gevent_needed = False self.killed = False self.controller = None self.heartbeat_timeout = 2000 @@ -126,27 +134,51 @@ class Component(object): def do_work(self): raise NotImplementedError + def init_zmq(self, flavor): + """ + ZMQ in all flavors. Have it your way. + + mp - Distinct contexts | pyzmq + thread - Same context | pyzmq + green - Same context | gevent_zeromq + pypy - Same context | zmq_ctypes + + """ + + if flavor == 'mp': + self.zmq = zmq + self.context = self.zmq.Context() + return + if flavor == 'thread': + self.zmq = zmq + self.context = self.zmq.Context.instance() + return + if flavor == 'green': + self.zmq = gevent_zeromq.zmq + self.context = self.zmq.Context.instance() + return + if flavor == 'pypy': + self.zmq = zmq + self.context = self.zmq.Context.instance() + return + + import pdb; pdb.set_trace() + raise Exception("Unknown ZeroMQ Flavor") + def _run(self): self.start_tic = time.time() self.done = False # TODO: use state flag self.sockets = [] - if self.gevent_needed: - qutil.LOGGER.info("Loading gevent specific zmq for {id}".format(id=self.get_id)) - import gevent_zeromq - self.zmq = gevent_zeromq.zmq - else: - import zmq - self.zmq = zmq - - self.context = self.zmq.Context.instance() + self.init_zmq(self.zmq_flavor) self.setup_poller() self.open() self.setup_sync() self.setup_control() + self.loop() self.shutdown() diff --git a/zipline/lines.py b/zipline/lines.py index 211a4f39..50f3b787 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -309,7 +309,8 @@ class SimulatedTrading(object): self.sim_context.join() def shutdown(self): - self.allocator.reaquire(*self.leased_sockets) + pass + #self.allocator.reaquire(*self.leased_sockets) #-------------------------------- # Component property accessors @@ -333,4 +334,4 @@ class ZiplineException(Exception): return "Unexpected exception {line}: {msg}".format( line=self.name, msg=self.message - ) \ No newline at end of file + ) diff --git a/zipline/messaging.py b/zipline/messaging.py index 12724023..1b7ffbdf 100644 --- a/zipline/messaging.py +++ b/zipline/messaging.py @@ -16,15 +16,17 @@ class ComponentHost(Component): start, and then wait for all components to be finished. """ - def __init__(self, addresses, gevent_needed=False): + def __init__(self, addresses): Component.__init__(self) self.addresses = addresses - self.gevent_needed = gevent_needed self.running = False self.init() def init(self): + assert hasattr(self, 'zmq_flavor'), \ + """ You must specify a flavor of ZeroMQ for all + ComponentHost subclasses. """ # Component Registry, keyed by get_id # ---------------------- @@ -67,10 +69,12 @@ class ComponentHost(Component): assert isinstance(components, list) for component in components: - component.gevent_needed = self.gevent_needed component.addresses = self.addresses component.controller = self.controller + # Hosts share their zmq flavor with hosted components + component.zmq_flavor = self.zmq_flavor + self._components[component.guid] = component self.components[component.get_id] = component self.sync_register[component.get_id] = datetime.datetime.utcnow() @@ -97,13 +101,8 @@ class ComponentHost(Component): self.sync_socket = self.context.socket(self.zmq.REP) self.sync_socket.bind(self.addresses['sync_address']) - # There is a namespace collision between three classes - # which use the self.poller property to mean different - # things. - # ===================================================== self.sync_poller = self.zmq.Poller() self.sync_poller.register(self.sync_socket, self.zmq.POLLIN) - # ===================================================== self.sockets.append(self.sync_socket) diff --git a/zipline/simulator.py b/zipline/simulator.py index 99185c7c..8d6480b5 100644 --- a/zipline/simulator.py +++ b/zipline/simulator.py @@ -26,9 +26,11 @@ class AddressAllocator(object): def reaquire(self, *conn): pass -# + class Simulator(ComponentHost): + zmq_flavor = 'thread' + def __init__(self, addresses): ComponentHost.__init__(self, addresses) self.subthreads = [] diff --git a/zipline/zmq_utils.py b/zipline/zmq_utils.py index e13a5e34..eeb04411 100644 --- a/zipline/zmq_utils.py +++ b/zipline/zmq_utils.py @@ -1,7 +1,94 @@ +""" +Misc ZeroMQ utilities. +""" import gevent from gevent_zeromq import zmq +from contextlib import closing + +class ZmqTimeout(object): + def __init__(self, socket): + self._socket = socket + + def __enter__(self): + pass + + def __exit__(self): + self._socket.close() + +class ZmqDone(object): + + def __init__(self, socket, frame): + self.ident = socket.identity + self.frame = str(frame) + + def __str__(self): + return 'Socket ( %s ) finished with frame ( %s )' % \ + ( self.ident, self.frame ) + +class zs(object): + """ + A wrapper for the *very* common pattern of reading from a + upstream socket until you get a DONE or EXCEPTION frame. + + # Eliminates all the boilerplate serialization logic + # and error handling cases into 3 lines. + + halts = (ERROR_FRAME, CLOSE_FRAME) + stream = zs(socket, halts) + + stream.on_error(YouFailAtFailing) + + for msg in stream: + print msg + + """ + + def __init__(self, socket, halts, srl=msgpack): + self._socket = socket + self.exc_case = halts[0] + self.done_case = halts[1] + + self.loads = srl.loads + self.halt_method = 'exception' + self.exception = ZmqDone + self.function = None + + def __iter__(self): + self.last = msg = self.loads(self._socket.recv()) + + if msg == self.exc_case: + return self.halt() + + if msg == self.done_case: + raise StopIteration + + yield msg + + def last(self): + return self.last + + def halt(self): + if self.halt_method == 'exception': + raise self.exception + elif self.halt_method == 'function': + return self.function() + + def on_error(self, callee): + + if isinstance(callee, Exception): + self.halt_method = 'exception' + self.exception = callee + else: + self.halt_method = 'function' + self.function = callee + def ZmqConsole(sock_typ, socket_addr, sock_conn=None, context=None): + """ + A utility to drop into a ZeroMQ pdb console and inspect + messages as they come through. If you just want to pipe to + stdout, don't use this. + """ context = context or zmq.Context.instance() socket = context.socket(zmq.PULL) From efcda0f585122e7b9f6de0890a2a7322d07fad6b Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Sat, 24 Mar 2012 12:35:06 -0400 Subject: [PATCH 2/3] Gevent driven zmq poller, per sage advice of benoitc. --- zipline/component.py | 17 ++++---- zipline/gpoll.py | 99 ++++++++++++++++++++++++++++++++++++++++++++ zipline/messaging.py | 2 +- 3 files changed, 110 insertions(+), 8 deletions(-) create mode 100644 zipline/gpoll.py diff --git a/zipline/component.py b/zipline/component.py index d33ae44b..cfdc8781 100644 --- a/zipline/component.py +++ b/zipline/component.py @@ -22,9 +22,11 @@ import gevent_zeromq from datetime import datetime import zipline.util as qutil +from zipline.gpoll import _Poller as GeventPoller from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE, \ COMPONENT_FAILURE, BACKTEST_STATE + class Component(object): """ Base class for components. Defines the the base messaging @@ -148,21 +150,24 @@ class Component(object): if flavor == 'mp': self.zmq = zmq self.context = self.zmq.Context() + self.zmq_poller = self.zmq.Poller return if flavor == 'thread': self.zmq = zmq self.context = self.zmq.Context.instance() + self.zmq_poller = self.zmq.Poller return if flavor == 'green': self.zmq = gevent_zeromq.zmq self.context = self.zmq.Context.instance() + self.zmq_poller = GeventPoller return if flavor == 'pypy': self.zmq = zmq self.context = self.zmq.Context.instance() + self.zmq_poller = self.zmq.Poller return - import pdb; pdb.set_trace() raise Exception("Unknown ZeroMQ Flavor") def _run(self): @@ -335,7 +340,9 @@ class Component(object): handling sockets. """ - self.poll = self.zmq.Poller() + # Initializes the poller class specified by the flavor of + # ZeroMQ. Either zmq.Poller or gpoll.Poller . + self.poll = self.zmq_poller() def receive_sync_ack(self): """ @@ -437,11 +444,7 @@ class Component(object): self.sync_socket.connect(self.addresses['sync_address']) #self.sync_socket.setsockopt(self.zmq.LINGER,0) - # Explictly a different poller for obvious reasons. - # I'm not fond of having this poller init'd as a side - # effect of a method call. Still thinking about where to - # put it at the moment though... - self.sync_poller = self.zmq.Poller() + self.sync_poller = self.zmq_poller() self.sync_poller.register(self.sync_socket, self.zmq.POLLIN) self.sockets.append(self.sync_socket) diff --git a/zipline/gpoll.py b/zipline/gpoll.py new file mode 100644 index 00000000..8c18d4cf --- /dev/null +++ b/zipline/gpoll.py @@ -0,0 +1,99 @@ +""" +This is somewhat legally ambigious, since it technically +hasn't been merged in gevent_zeromq but given that the +author issued it as a Pull Request on a MIT project, +indicates that its probably fine to use. ~Steve +""" + +import zmq +from zmq import * + +from zmq.core.poll import Poller as _original_Poller + +import gevent +from gevent import select +from gevent_zeromq.core import _Socket + +def patch_poller(self): + zmq.Poller = _Poller + +class _Poller(_original_Poller): + """ + Replacement for :class:`zmq.core.Poller` + + Ensures that the greened Poller below is used in calls + to :meth:`zmq.core.Poller.poll`. + """ + + def _get_descriptors(self): + """ + Returns three elements tuple with socket descriptors ready for + gevent.select + """ + rlist = [] + wlist = [] + xlist = [] + + for socket, flags in self.sockets.items(): + if isinstance(socket, _Socket): + fd = socket.getsockopt(FD) + elif isinstance(socket, int): + fd = socket + elif hasattr(socket, 'fileno'): + try: + fd = int(socket.fileno()) + except: + raise ValueError('fileno() must return an valid integer fd') + else: + raise TypeError("Socket must be a 0MQ socket, an integer fd or \ + have a fileno() method: %r" % socket) + + if flags & POLLIN: rlist.append(fd) + if flags & POLLOUT: wlist.append(fd) + if flags & POLLERR: xlist.append(fd) + + return (rlist, wlist, xlist) + + def poll(self, timeout=-1): + """Overridden method to ensure that the green version of Poller is used + + Behaves the same as :meth:`zmq.core.Poller.poll` + """ + + if timeout is None: + timeout = -1 + + timeout = int(timeout) + if timeout < 0: + timeout = -1 + + rlist = None + wlist = None + xlist = None + + if timeout > 0: + tout = gevent.Timeout.start_new(timeout/1000.0) + + try: + # Loop until timeout or events available + while True: + events = super(_Poller, self).poll(0) + if events or timeout == 0: + return events + + # wait for activity on sockets in a green way + if not rlist and not wlist and not xlist: + rlist, wlist, xlist = self._get_descriptors() + + try: + select.select(rlist, wlist, xlist) + except gevent.select.error, ex: + raise ZMQError(*ex.args) + + except gevent.Timeout, t: + if t is not tout: + raise + return [] + finally: + if timeout > 0: + tout.cancel() diff --git a/zipline/messaging.py b/zipline/messaging.py index 1b7ffbdf..5e6ff570 100644 --- a/zipline/messaging.py +++ b/zipline/messaging.py @@ -101,7 +101,7 @@ class ComponentHost(Component): self.sync_socket = self.context.socket(self.zmq.REP) self.sync_socket.bind(self.addresses['sync_address']) - self.sync_poller = self.zmq.Poller() + self.sync_poller = self.zmq_poller() self.sync_poller.register(self.sync_socket, self.zmq.POLLIN) self.sockets.append(self.sync_socket) From d6f2f757c811edac2eb83035917ac4d374cd4bbf Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Mon, 26 Mar 2012 13:37:34 -0400 Subject: [PATCH 3/3] Tweak zmq utils. --- zipline/zmq_utils.py | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/zipline/zmq_utils.py b/zipline/zmq_utils.py index eeb04411..e32931dc 100644 --- a/zipline/zmq_utils.py +++ b/zipline/zmq_utils.py @@ -2,21 +2,12 @@ Misc ZeroMQ utilities. """ import gevent +import msgpack from gevent_zeromq import zmq from contextlib import closing -class ZmqTimeout(object): - def __init__(self, socket): - self._socket = socket - - def __enter__(self): - pass - - def __exit__(self): - self._socket.close() - -class ZmqDone(object): +class ZmqDone(Exception): def __init__(self, socket, frame): self.ident = socket.identity