mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-03 12:17:50 +08:00
+51
-16
@@ -9,14 +9,24 @@ import sys
|
||||
import uuid
|
||||
import time
|
||||
import socket
|
||||
import gevent
|
||||
import humanhash
|
||||
|
||||
# pyzmq
|
||||
import zmq
|
||||
# gevent_zeromq
|
||||
import gevent_zeromq
|
||||
# zmq_ctypes
|
||||
#import zmq_ctypes
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
import zipline.util as qutil
|
||||
from zipline.gpoll import _Poller as GeventPoller
|
||||
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE, \
|
||||
COMPONENT_FAILURE, BACKTEST_STATE
|
||||
|
||||
|
||||
class Component(object):
|
||||
"""
|
||||
Base class for components. Defines the the base messaging
|
||||
@@ -61,8 +71,8 @@ class Component(object):
|
||||
self.zmq = None
|
||||
self.context = None
|
||||
self.addresses = None
|
||||
|
||||
self.out_socket = None
|
||||
self.gevent_needed = False
|
||||
self.killed = False
|
||||
self.controller = None
|
||||
self.heartbeat_timeout = 2000
|
||||
@@ -126,27 +136,54 @@ class Component(object):
|
||||
def do_work(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def init_zmq(self, flavor):
|
||||
"""
|
||||
ZMQ in all flavors. Have it your way.
|
||||
|
||||
mp - Distinct contexts | pyzmq
|
||||
thread - Same context | pyzmq
|
||||
green - Same context | gevent_zeromq
|
||||
pypy - Same context | zmq_ctypes
|
||||
|
||||
"""
|
||||
|
||||
if flavor == 'mp':
|
||||
self.zmq = zmq
|
||||
self.context = self.zmq.Context()
|
||||
self.zmq_poller = self.zmq.Poller
|
||||
return
|
||||
if flavor == 'thread':
|
||||
self.zmq = zmq
|
||||
self.context = self.zmq.Context.instance()
|
||||
self.zmq_poller = self.zmq.Poller
|
||||
return
|
||||
if flavor == 'green':
|
||||
self.zmq = gevent_zeromq.zmq
|
||||
self.context = self.zmq.Context.instance()
|
||||
self.zmq_poller = GeventPoller
|
||||
return
|
||||
if flavor == 'pypy':
|
||||
self.zmq = zmq
|
||||
self.context = self.zmq.Context.instance()
|
||||
self.zmq_poller = self.zmq.Poller
|
||||
return
|
||||
|
||||
raise Exception("Unknown ZeroMQ Flavor")
|
||||
|
||||
def _run(self):
|
||||
self.start_tic = time.time()
|
||||
|
||||
self.done = False # TODO: use state flag
|
||||
self.sockets = []
|
||||
|
||||
if self.gevent_needed:
|
||||
qutil.LOGGER.info("Loading gevent specific zmq for {id}".format(id=self.get_id))
|
||||
import gevent_zeromq
|
||||
self.zmq = gevent_zeromq.zmq
|
||||
else:
|
||||
import zmq
|
||||
self.zmq = zmq
|
||||
|
||||
self.context = self.zmq.Context.instance()
|
||||
self.init_zmq(self.zmq_flavor)
|
||||
|
||||
self.setup_poller()
|
||||
|
||||
self.open()
|
||||
self.setup_sync()
|
||||
self.setup_control()
|
||||
|
||||
self.loop()
|
||||
self.shutdown()
|
||||
|
||||
@@ -303,7 +340,9 @@ class Component(object):
|
||||
handling sockets.
|
||||
"""
|
||||
|
||||
self.poll = self.zmq.Poller()
|
||||
# Initializes the poller class specified by the flavor of
|
||||
# ZeroMQ. Either zmq.Poller or gpoll.Poller .
|
||||
self.poll = self.zmq_poller()
|
||||
|
||||
def receive_sync_ack(self):
|
||||
"""
|
||||
@@ -405,11 +444,7 @@ class Component(object):
|
||||
self.sync_socket.connect(self.addresses['sync_address'])
|
||||
#self.sync_socket.setsockopt(self.zmq.LINGER,0)
|
||||
|
||||
# Explictly a different poller for obvious reasons.
|
||||
# I'm not fond of having this poller init'd as a side
|
||||
# effect of a method call. Still thinking about where to
|
||||
# put it at the moment though...
|
||||
self.sync_poller = self.zmq.Poller()
|
||||
self.sync_poller = self.zmq_poller()
|
||||
self.sync_poller.register(self.sync_socket, self.zmq.POLLIN)
|
||||
|
||||
self.sockets.append(self.sync_socket)
|
||||
|
||||
@@ -0,0 +1,99 @@
|
||||
"""
|
||||
This is somewhat legally ambigious, since it technically
|
||||
hasn't been merged in gevent_zeromq but given that the
|
||||
author issued it as a Pull Request on a MIT project,
|
||||
indicates that its probably fine to use. ~Steve
|
||||
"""
|
||||
|
||||
import zmq
|
||||
from zmq import *
|
||||
|
||||
from zmq.core.poll import Poller as _original_Poller
|
||||
|
||||
import gevent
|
||||
from gevent import select
|
||||
from gevent_zeromq.core import _Socket
|
||||
|
||||
def patch_poller(self):
|
||||
zmq.Poller = _Poller
|
||||
|
||||
class _Poller(_original_Poller):
|
||||
"""
|
||||
Replacement for :class:`zmq.core.Poller`
|
||||
|
||||
Ensures that the greened Poller below is used in calls
|
||||
to :meth:`zmq.core.Poller.poll`.
|
||||
"""
|
||||
|
||||
def _get_descriptors(self):
|
||||
"""
|
||||
Returns three elements tuple with socket descriptors ready for
|
||||
gevent.select
|
||||
"""
|
||||
rlist = []
|
||||
wlist = []
|
||||
xlist = []
|
||||
|
||||
for socket, flags in self.sockets.items():
|
||||
if isinstance(socket, _Socket):
|
||||
fd = socket.getsockopt(FD)
|
||||
elif isinstance(socket, int):
|
||||
fd = socket
|
||||
elif hasattr(socket, 'fileno'):
|
||||
try:
|
||||
fd = int(socket.fileno())
|
||||
except:
|
||||
raise ValueError('fileno() must return an valid integer fd')
|
||||
else:
|
||||
raise TypeError("Socket must be a 0MQ socket, an integer fd or \
|
||||
have a fileno() method: %r" % socket)
|
||||
|
||||
if flags & POLLIN: rlist.append(fd)
|
||||
if flags & POLLOUT: wlist.append(fd)
|
||||
if flags & POLLERR: xlist.append(fd)
|
||||
|
||||
return (rlist, wlist, xlist)
|
||||
|
||||
def poll(self, timeout=-1):
|
||||
"""Overridden method to ensure that the green version of Poller is used
|
||||
|
||||
Behaves the same as :meth:`zmq.core.Poller.poll`
|
||||
"""
|
||||
|
||||
if timeout is None:
|
||||
timeout = -1
|
||||
|
||||
timeout = int(timeout)
|
||||
if timeout < 0:
|
||||
timeout = -1
|
||||
|
||||
rlist = None
|
||||
wlist = None
|
||||
xlist = None
|
||||
|
||||
if timeout > 0:
|
||||
tout = gevent.Timeout.start_new(timeout/1000.0)
|
||||
|
||||
try:
|
||||
# Loop until timeout or events available
|
||||
while True:
|
||||
events = super(_Poller, self).poll(0)
|
||||
if events or timeout == 0:
|
||||
return events
|
||||
|
||||
# wait for activity on sockets in a green way
|
||||
if not rlist and not wlist and not xlist:
|
||||
rlist, wlist, xlist = self._get_descriptors()
|
||||
|
||||
try:
|
||||
select.select(rlist, wlist, xlist)
|
||||
except gevent.select.error, ex:
|
||||
raise ZMQError(*ex.args)
|
||||
|
||||
except gevent.Timeout, t:
|
||||
if t is not tout:
|
||||
raise
|
||||
return []
|
||||
finally:
|
||||
if timeout > 0:
|
||||
tout.cancel()
|
||||
+3
-2
@@ -309,7 +309,8 @@ class SimulatedTrading(object):
|
||||
self.sim_context.join()
|
||||
|
||||
def shutdown(self):
|
||||
self.allocator.reaquire(*self.leased_sockets)
|
||||
pass
|
||||
#self.allocator.reaquire(*self.leased_sockets)
|
||||
|
||||
#--------------------------------
|
||||
# Component property accessors
|
||||
@@ -333,4 +334,4 @@ class ZiplineException(Exception):
|
||||
return "Unexpected exception {line}: {msg}".format(
|
||||
line=self.name,
|
||||
msg=self.message
|
||||
)
|
||||
)
|
||||
|
||||
@@ -16,15 +16,17 @@ class ComponentHost(Component):
|
||||
start, and then wait for all components to be finished.
|
||||
"""
|
||||
|
||||
def __init__(self, addresses, gevent_needed=False):
|
||||
def __init__(self, addresses):
|
||||
Component.__init__(self)
|
||||
self.addresses = addresses
|
||||
self.gevent_needed = gevent_needed
|
||||
self.running = False
|
||||
|
||||
self.init()
|
||||
|
||||
def init(self):
|
||||
assert hasattr(self, 'zmq_flavor'), \
|
||||
""" You must specify a flavor of ZeroMQ for all
|
||||
ComponentHost subclasses. """
|
||||
|
||||
# Component Registry, keyed by get_id
|
||||
# ----------------------
|
||||
@@ -67,10 +69,12 @@ class ComponentHost(Component):
|
||||
assert isinstance(components, list)
|
||||
for component in components:
|
||||
|
||||
component.gevent_needed = self.gevent_needed
|
||||
component.addresses = self.addresses
|
||||
component.controller = self.controller
|
||||
|
||||
# Hosts share their zmq flavor with hosted components
|
||||
component.zmq_flavor = self.zmq_flavor
|
||||
|
||||
self._components[component.guid] = component
|
||||
self.components[component.get_id] = component
|
||||
self.sync_register[component.get_id] = datetime.datetime.utcnow()
|
||||
@@ -97,13 +101,8 @@ class ComponentHost(Component):
|
||||
self.sync_socket = self.context.socket(self.zmq.REP)
|
||||
self.sync_socket.bind(self.addresses['sync_address'])
|
||||
|
||||
# There is a namespace collision between three classes
|
||||
# which use the self.poller property to mean different
|
||||
# things.
|
||||
# =====================================================
|
||||
self.sync_poller = self.zmq.Poller()
|
||||
self.sync_poller = self.zmq_poller()
|
||||
self.sync_poller.register(self.sync_socket, self.zmq.POLLIN)
|
||||
# =====================================================
|
||||
|
||||
self.sockets.append(self.sync_socket)
|
||||
|
||||
|
||||
@@ -26,9 +26,11 @@ class AddressAllocator(object):
|
||||
def reaquire(self, *conn):
|
||||
pass
|
||||
|
||||
#
|
||||
|
||||
class Simulator(ComponentHost):
|
||||
|
||||
zmq_flavor = 'thread'
|
||||
|
||||
def __init__(self, addresses):
|
||||
ComponentHost.__init__(self, addresses)
|
||||
self.subthreads = []
|
||||
|
||||
@@ -1,7 +1,85 @@
|
||||
"""
|
||||
Misc ZeroMQ utilities.
|
||||
"""
|
||||
import gevent
|
||||
import msgpack
|
||||
from gevent_zeromq import zmq
|
||||
|
||||
from contextlib import closing
|
||||
|
||||
class ZmqDone(Exception):
|
||||
|
||||
def __init__(self, socket, frame):
|
||||
self.ident = socket.identity
|
||||
self.frame = str(frame)
|
||||
|
||||
def __str__(self):
|
||||
return 'Socket ( %s ) finished with frame ( %s )' % \
|
||||
( self.ident, self.frame )
|
||||
|
||||
class zs(object):
|
||||
"""
|
||||
A wrapper for the *very* common pattern of reading from a
|
||||
upstream socket until you get a DONE or EXCEPTION frame.
|
||||
|
||||
# Eliminates all the boilerplate serialization logic
|
||||
# and error handling cases into 3 lines.
|
||||
|
||||
halts = (ERROR_FRAME, CLOSE_FRAME)
|
||||
stream = zs(socket, halts)
|
||||
|
||||
stream.on_error(YouFailAtFailing)
|
||||
|
||||
for msg in stream:
|
||||
print msg
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, socket, halts, srl=msgpack):
|
||||
self._socket = socket
|
||||
self.exc_case = halts[0]
|
||||
self.done_case = halts[1]
|
||||
|
||||
self.loads = srl.loads
|
||||
self.halt_method = 'exception'
|
||||
self.exception = ZmqDone
|
||||
self.function = None
|
||||
|
||||
def __iter__(self):
|
||||
self.last = msg = self.loads(self._socket.recv())
|
||||
|
||||
if msg == self.exc_case:
|
||||
return self.halt()
|
||||
|
||||
if msg == self.done_case:
|
||||
raise StopIteration
|
||||
|
||||
yield msg
|
||||
|
||||
def last(self):
|
||||
return self.last
|
||||
|
||||
def halt(self):
|
||||
if self.halt_method == 'exception':
|
||||
raise self.exception
|
||||
elif self.halt_method == 'function':
|
||||
return self.function()
|
||||
|
||||
def on_error(self, callee):
|
||||
|
||||
if isinstance(callee, Exception):
|
||||
self.halt_method = 'exception'
|
||||
self.exception = callee
|
||||
else:
|
||||
self.halt_method = 'function'
|
||||
self.function = callee
|
||||
|
||||
def ZmqConsole(sock_typ, socket_addr, sock_conn=None, context=None):
|
||||
"""
|
||||
A utility to drop into a ZeroMQ pdb console and inspect
|
||||
messages as they come through. If you just want to pipe to
|
||||
stdout, don't use this.
|
||||
"""
|
||||
|
||||
context = context or zmq.Context.instance()
|
||||
socket = context.socket(zmq.PULL)
|
||||
|
||||
Reference in New Issue
Block a user