exception handling code revised. gevent, pypy, and threadsim vestiges removed.

devel flag removed.
This commit is contained in:
fawce
2012-07-26 16:22:13 -04:00
parent 1f3a2cc3dc
commit abf9c8efa5
13 changed files with 233 additions and 321 deletions
-1
View File
@@ -1,5 +1,4 @@
import logging
from gevent_zeromq import zmq
import zipline.protocol as zp
from zipline.core.component import Component
+83 -21
View File
@@ -2,14 +2,17 @@ import zmq
from unittest2 import TestCase
from collections import defaultdict
from logbook.compat import LoggingHandler
from zipline.test_algorithms import ExceptionAlgorithm
from zipline.test_algorithms import ExceptionAlgorithm, NoopAlgorithm
from zipline.finance.trading import SIMULATION_STYLE
from zipline.core.devsimulator import AddressAllocator
from zipline.lines import SimulatedTrading
from zipline.utils.test_utils import drain_zipline, check
from zipline.utils.test_utils import \
drain_zipline, \
check, \
setup_logger, \
teardown_logger
DEFAULT_TIMEOUT = 15 # seconds
EXTENDED_TIMEOUT = 90
@@ -30,67 +33,126 @@ class ExceptionTestCase(TestCase):
'simulation_style' : SIMULATION_STYLE.FIXED_SLIPPAGE
}
self.ctx = zmq.Context()
self.log_handler = LoggingHandler()
self.log_handler.push_application()
setup_logger(self)
def tearDown(self):
self.log_handler.pop_application()
self.ctx.term()
teardown_logger(self)
def test_exception_in_init(self):
# Simulation
# ----------
self.zipline_test_config['algorithm'] = \
ExceptionAlgorithm(
'initialize',
self.zipline_test_config['sid']
)
zipline = SimulatedTrading.create_test_zipline(
**self.zipline_test_config
)
output, _ = drain_zipline(self, zipline)
self.assertEqual(len(output), 1)
self.assertEqual(output[-1]['prefix'], 'EXCEPTION')
payload = output[-1]['payload']
#check(self, payload, INITIALIZE_TB)
self.assertTrue(zipline.sim.ready())
self.assertFalse(zipline.sim.exception)
def test_exception_in_handle_data(self):
# Simulation
# ----------
self.zipline_test_config['algorithm'] = \
ExceptionAlgorithm('initialize')
ExceptionAlgorithm(
'handle_data',
self.zipline_test_config['sid']
)
zipline = SimulatedTrading.create_test_zipline(
**self.zipline_test_config
)
output, _ = drain_zipline(self, zipline)
self.assertEqual(len(output), 1)
self.assertEqual(output[-1]['prefix'], 'EXCEPTION')
payload = output[-1]['payload']
check(self, payload, INITIALIZE_STACK_TB)
#check(self, payload, HANDLE_DATA_TB)
import nose.tools; nose.tools.set_trace()
self.assertTrue(zipline.sim.ready())
self.assertFalse(zipline.sim.exception)
# TODO:
# - exception protocol to use prefix/payload as EXCEPT,
# and the stack trace
# - test exception in handle_data
# - define more zipline failure modes: exception in other
# components, exception in Monitor, etc. write tests
# for those scenarios.
INITIALIZE_STACK_TB =\
INITIALIZE_TB =\
[{'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/core/component.py',
'line': 'self._run()',
'lineno': 229,
'method': 'run'},
{'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/core/component.py',
'line': 'self.open()',
'lineno': 208,
'line': 'self.loop()',
'lineno': 220,
'method': '_run'},
{'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/core/component.py',
'line': 'self.do_work()',
'lineno': 257,
'method': 'loop'},
{'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/components/tradesimulation.py',
'line': 'self.initialize_algo()',
'lineno': 73,
'method': 'open'},
'lineno': 100,
'method': 'do_work'},
{'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/components/tradesimulation.py',
'line': 'self.do_op(self.algorithm.initialize)',
'lineno': 83,
'method': 'initialize_algo'},
{'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/components/tradesimulation.py',
'line': 'callable_op(*args, **kwargs)',
'lineno': 205,
'lineno': 208,
'method': 'do_op'},
{'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/test_algorithms.py',
'line': 'raise Exception("Algo exception in initialize")',
'lineno': 161,
'lineno': 162,
'method': 'initialize'}]
HANDLE_DATA_TB =\
[{'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/core/component.py',
'line': 'self._run()',
'lineno': 229,
'method': 'run'},
{'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/core/component.py',
'line': 'self.loop()',
'lineno': 220,
'method': '_run'},
{'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/core/component.py',
'line': 'self.do_work()',
'lineno': 257,
'method': 'loop'},
{'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/components/tradesimulation.py',
'line': 'self.process_event(event)',
'lineno': 119,
'method': 'do_work'},
{'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/components/tradesimulation.py',
'line': 'self.run_algorithm()',
'lineno': 167,
'method': 'process_event'},
{'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/components/tradesimulation.py',
'line': 'self.do_op(self.algorithm.handle_data, data)',
'lineno': 189,
'method': 'run_algorithm'},
{'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/components/tradesimulation.py',
'line': 'callable_op(*args, **kwargs)',
'lineno': 208,
'method': 'do_op'},
{'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/test_algorithms.py',
'line': 'raise Exception("Algo exception in handle_data")',
'lineno': 183,
'method': 'handle_data'}]
+9 -6
View File
@@ -21,8 +21,13 @@ from zipline.lines import SimulatedTrading
from zipline.finance.performance import PerformanceTracker
from zipline.utils.protocol_utils import ndict
from zipline.finance.trading import TransactionSimulator, SIMULATION_STYLE
from zipline.utils.test_utils import assert_single_position,\
drain_zipline
from zipline.utils.test_utils import \
drain_zipline, \
check, \
setup_logger, \
teardown_logger,\
assert_single_position
DEFAULT_TIMEOUT = 15 # seconds
EXTENDED_TIMEOUT = 90
@@ -42,11 +47,10 @@ class FinanceTestCase(TestCase):
}
self.ctx = zmq.Context()
self.log_handler = LoggingHandler()
self.log_handler.push_application()
setup_logger(self)
def tearDown(self):
self.log_handler.pop_application()
teardown_logger(self)
@timed(DEFAULT_TIMEOUT)
def test_factory_daily(self):
@@ -143,7 +147,6 @@ class FinanceTestCase(TestCase):
zipline = SimulatedTrading.create_test_zipline(
**self.zipline_test_config
)
output, transaction_count = drain_zipline(self, zipline)
self.assertTrue(zipline.sim.ready())
-16
View File
@@ -1,4 +1,3 @@
import gevent
from logbook.compat import LoggingHandler
from unittest2 import TestCase, skip
@@ -25,18 +24,3 @@ class TestMonitor(TestCase):
con = Controller(pub_socket, route_socket, )
con.manage([ 'a', 'b', 'c', 'd' ])
@skip
def test_poll(self):
from mock_zmq import zmq_synthetic
pub_socket = 'tcp://127.0.0.1:5000'
route_socket = 'tcp://127.0.0.1:5001'
cancel_socket = 'tcp://127.0.0.1:5002'
con = Controller(pub_socket, route_socket, cancel_socket)
con.manage([ 'a', 'b', 'c', 'd' ])
con.zmq = zmq_synthetic
con.zmq_flavor = 'green'
con.period = 0.00001
gevent.spawn(con.run).join(timeout=con.period)
+8 -18
View File
@@ -83,26 +83,16 @@ class Aggregate(Component):
self.drain()
self.signal_done()
else:
try:
event = self.unframe(message)
except zp.INVALID_DATASOURCE_FRAME as exc:
# Error deserializing
return self.signal_exception(exc)
event = self.unframe(message)
self.append(event)
try:
self.append(event)
if self.is_full():
event = self.next()
if self.is_full() or self.draining:
event = self.next()
if event:
self.send(event)
else:
pass
except zp.INVALID_DATASOURCE_FRAME as exc:
# Invalid message
return self.signal_exception(exc)
if event:
self.send(event)
else:
pass
# -------------
# Flow Control
+5 -4
View File
@@ -38,6 +38,7 @@ class TradeSimulationClient(Component):
self.perf = perf.PerformanceTracker(self.trading_environment)
self.zmq_out = None
self.results_socket = results_socket
self.algo_initialized = False
@property
def get_id(self):
@@ -56,9 +57,6 @@ class TradeSimulationClient(Component):
# initialize with all possible sids.
self.perf.set_sids(self.algorithm.get_sid_filter())
# N.B. Initialize is now called from open, because we
# need to have a socket open for logging.
def open(self):
self.result_feed = self.connect_result()
if self.results_socket:
@@ -70,7 +68,6 @@ class TradeSimulationClient(Component):
self.setup_logging(sock)
self.perf.publish_to(sock)
self.initialize_algo()
def initialize_algo(self):
""" Setup loggers for algorithm and run algorithm's own
@@ -81,6 +78,7 @@ class TradeSimulationClient(Component):
self.algorithm.set_logger(self.algo_log)
self.do_op(self.algorithm.initialize)
self.algo_initialized = True
def setup_logging(self, socket = None):
sock = socket or self.results_socket
@@ -95,6 +93,8 @@ class TradeSimulationClient(Component):
self.stdout_capture = stdout_only_pipe
def do_work(self):
if not self.algo_initialized:
self.initialize_algo()
# see if the poller has results for the result_feed
if self.socks.get(self.result_feed) == self.zmq.POLLIN:
@@ -187,6 +187,7 @@ class TradeSimulationClient(Component):
def exception_callback(self, exc_type, exc_value, exc_traceback):
if self.results_socket:
log.info("Sending exception frame")
msg = zp.EXCEPTION_FRAME(exc_traceback)
self.out_socket.send(msg)
+69 -63
View File
@@ -14,14 +14,9 @@ from setproctitle import setproctitle
# pyzmq
import zmq
# gevent_zeromq
import gevent_zeromq
# zmq_ctypes
#import zmq_ctypes
from zipline.core.monitor import PARAMETERS
from zipline.utils.gpoll import _Poller as GeventPoller
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE, \
COMPONENT_FAILURE, CONTROL_FRAME, CONTROL_UNFRAME
@@ -29,6 +24,10 @@ log = logbook.Logger('Component')
from zipline.exceptions import ComponentNoInit
class KillSignal(Exception):
def __init__(self):
pass
class Component(object):
"""
@@ -154,35 +153,13 @@ class Component(object):
def do_work(self):
raise NotImplementedError
def init_zmq(self, flavor):
"""
ZMQ in all flavors. Have it your way.
mp - Distinct contexts | pyzmq
green - Same context | gevent_zeromq
pypy - Same context | zmq_ctypes
"""
if flavor == 'mp':
self.zmq = zmq
self.context = self.zmq.Context()
self.zmq_poller = self.zmq.Poller
# The the process title so you can watch it in top
setproctitle(self.__class__.__name__)
return
if flavor == 'green':
self.zmq = gevent_zeromq.zmq
self.context = self.zmq.Context.instance()
self.zmq_poller = GeventPoller
return
if flavor == 'pypy':
self.zmq = zmq
self.context = self.zmq.Context.instance()
self.zmq_poller = self.zmq.Poller
return
raise Exception("Unknown ZeroMQ Flavor")
def init_zmq(self):
self.zmq = zmq
self.context = self.zmq.Context()
self.zmq_poller = self.zmq.Poller
# The the process title so you can watch it in top
setproctitle(self.__class__.__name__)
return
def _run(self):
"""
@@ -200,7 +177,7 @@ class Component(object):
self.done = False # TODO: use state flag
self.sockets = []
self.init_zmq(self.zmq_flavor)
self.init_zmq()
self.setup_poller()
@@ -209,8 +186,6 @@ class Component(object):
self.signal_ready()
self.lock_ready()
self.wait_ready()
# -----------------------
# YOU SHALL NOT PASS!!!!!
@@ -228,14 +203,17 @@ class Component(object):
try:
self._run()
except Exception as exc:
exc_info = sys.exc_info()
self.signal_exception(exc)
if not isinstance(exc, KillSignal):
self.signal_exception(exc)
else:
# if we get a kill signal, forcibly close all the
# sockets.
# exc_info = sys.exc_info()
# self.relay_exception(exc_info[0], exc_info[1], exc_info[2])
self.teardown_sockets()
# Reraise the exception
raise exc_info[0], exc_info[1], exc_info[2]
finally:
self.shutdown()
self.teardown_sockets()
log.info("Exiting %r" % self)
def working(self):
@@ -311,7 +289,6 @@ class Component(object):
# controller that we're done.
elif event == CONTROL_PROTOCOL.SHUTDOWN:
self.signal_done()
self.shutdown()
# =========
# Hard Kill
@@ -336,7 +313,7 @@ class Component(object):
# Echo back the heartbeat identifier to tell the
# controller that this component is still alive and
# doing work
self.control_out.send(heartbeat_frame)
self.control_out.send(heartbeat_frame, self.zmq.NOBLOCK)
self.last_ping = pre_pong
elif self.last_ping and \
time.time() - self.last_ping > PARAMETERS.MAX_COMPONENT_WAIT:
@@ -353,6 +330,7 @@ class Component(object):
Close all zmq sockets safely. This is universal, no matter where
this is running it will need the sockets closed.
"""
log.warn("{id} closing all sockets".format(id=self.get_id))
#close all the sockets
for sock in self.sockets:
sock.close()
@@ -364,6 +342,7 @@ class Component(object):
Tear down after normal operation.
"""
if self.on_done:
log.warn("{id} calling done.".format(id=self.get_id))
self.on_done()
def kill(self):
@@ -373,7 +352,8 @@ class Component(object):
Tear down ( fast ) as a mode of failure in the simulation or on
service halt.
"""
sys.exit(1)
# sys.exit(1)
raise KillSignal()
# ----------------------
# Internal Maintenance
@@ -404,7 +384,7 @@ class Component(object):
start_wait = time.time()
while self.waiting:
socks = dict(self.poll.poll(100))
socks = dict(self.poll.poll(0))
assert self.control_in, \
'Component does not have a control_in socket'
@@ -444,9 +424,7 @@ class Component(object):
# data that are done during a clean shutdown. Inform the
# controller that we're done.
elif event == CONTROL_PROTOCOL.SHUTDOWN:
self.signal_done()
self.shutdown()
break
# =========
@@ -492,11 +470,13 @@ class Component(object):
def signal_exception(self, exc=None, scope=None):
"""
This is a *very* important error tracking handler.
All exceptions inside any component should boil back to
this handler.
Will inform the system that the component has failed and how it
has failed.
"""
if scope == 'algo':
self.error_state = COMPONENT_FAILURE.ALGOEXCEPT
else:
@@ -512,19 +492,47 @@ class Component(object):
exc_type, exc_value, exc_traceback = sys.exc_info()
trace = ''.join(traceback.format_exception(exc_type, exc_value, exc_traceback))
sys.stdout.write(trace)
# if a downstream component fails, this component may try
# sending when there are zero connections to the socket,
# which will raise ZMQError(EAGAIN). So, it doesn't make
# sense to relay this exception to Monitor and the rest
# of the zipline.
if isinstance(exc, zmq.ZMQError) and exc.errno == zmq.EAGAIN:
log.warn("{id} raised a ZMQError(EAGAIN) not relaying"\
.format(id=self.get_id))
return
if hasattr(self, 'exception_callback') and self.exception_callback:
self.exception_callback(exc_type, exc_value, exc_traceback)
# sys.stdout.write(trace)
log.exception("Unexpected error in run for {id}.".format(id=self.get_id))
self.relay_exception(exc_type, exc_value, exc_traceback)
if hasattr(self, 'control_out') and self.control_out:
exception_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.EXCEPTION,
trace
)
self.control_out.send(exception_frame)
try:
log.info('{id} sending exception to controller'.format(id=self.get_id))
exception_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.EXCEPTION,
trace
)
self.control_out.send(exception_frame, self.zmq.NOBLOCK)
# The controller should relay the exception back
# to all zipline components. Wait here until the
# notice arrives, and we can assume other zipline
# components have broken out of their message
# loops.
for i in xrange(100):
self.heartbeat(timeout=1000)
log.warn("{id} Never heard back from monitor."\
.format(id=self.get_id))
except:
log.exception("Exception waiting for controller reply")
def relay_exception(self, exc_type, exc_value, exc_traceback):
if hasattr(self, 'exception_callback') and self.exception_callback:
log.info('{id} making exception callback'.format(id=self.get_id))
self.exception_callback(exc_type, exc_value, exc_traceback)
#LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id))
def signal_done(self):
"""
@@ -532,6 +540,8 @@ class Component(object):
"""
self.state_flag = COMPONENT_STATE.DONE
# notify internal work loop that we're done
self.done = True # TODO: use state flag
if hasattr(self, 'out_socket') and self.out_socket:
msg = zmq.Message(str(CONTROL_PROTOCOL.DONE))
@@ -554,8 +564,7 @@ class Component(object):
# last heartbeat, and wait an unusually long time.
self.heartbeat(timeout=5000)
# notify internal work look that we're done
self.done = True # TODO: use state flag
# -----------
@@ -567,9 +576,6 @@ class Component(object):
Setup the poller used for multiplexing the incoming data
handling sockets.
"""
# Initializes the poller class specified by the flavor of
# ZeroMQ. Either zmq.Poller or gpoll.Poller .
self.poll = self.zmq_poller()
def bind_data(self):
+36 -73
View File
@@ -1,17 +1,15 @@
import inspect
import os
import zmq
import sys
import time
import gevent
import itertools
import logbook
import gevent_zeromq
from setproctitle import setproctitle
from signal import SIGHUP, SIGINT
from collections import OrderedDict, Counter
from zipline.utils.gpoll import _Poller as GeventPoller
from zipline.protocol import CONTROL_PROTOCOL, CONTROL_FRAME, \
CONTROL_UNFRAME, CONTROL_STATES, INVALID_CONTROL_FRAME \
@@ -71,9 +69,8 @@ class Controller(object):
debug = True
period = PARAMETERS.GENERATIONAL_PERIOD
def __init__(self, pub_socket, route_socket, devel=True):
def __init__(self, pub_socket, route_socket):
self.devel = devel
self.nosignals = False
self.context = None
self.zmq = None
@@ -100,34 +97,17 @@ class Controller(object):
self.missed_beats = Counter()
log.warn("Running Controller in development mode, will ONLY synchronize start.")
# if we are inside a test, we want to skip signalling
# back to the parent process.
self.inside_test = 'nose' in inspect.stack()[-1][1]
def init_zmq(self, flavor):
assert self.zmq_flavor in ['thread', 'mp', 'green']
if flavor == 'mp':
self.zmq = zmq
self.context = self.zmq.Context()
self.zmq_poller = self.zmq.Poller
if self.devel:
log.warning("USING DEVELOPMENT MODE IN MP CONTEXT NOT RECOMMENDED")
return
if flavor == 'thread':
self.zmq = zmq
self.context = self.zmq.Context.instance()
self.zmq_poller = self.zmq.Poller
return
if flavor == 'green':
self.zmq = gevent_zeromq.zmq
self.context = self.zmq.Context.instance()
self.zmq_poller = GeventPoller
return
if flavor == 'pypy':
self.zmq = zmq
self.context = self.zmq.Context.instance()
self.zmq_poller = self.zmq.Poller
return
def init_zmq(self):
self.zmq = zmq
self.context = self.zmq.Context()
self.zmq_poller = self.zmq.Poller
return
def manage(self, topology):
"""
@@ -170,8 +150,8 @@ class Controller(object):
# -----------------------
# The last breathe of the interpreter will assume that we've
# failed unless we specify otherwise.
if not self.devel:
sys.exitfunc = self.signal_interrupt
log.info('registering exit function')
sys.exitfunc = self.signal_interrupt
# We overload this if ( and only if ) the topology exits
# cleanly. This prevents failure modes where the monitor
# dies.
@@ -349,31 +329,17 @@ class Controller(object):
if complete:
self.send_go()
# If we're running in development stop here
# because our responsibilites are over. The
# zipline will either run to completion or die,
# monitor doesn't care anymore because its all
# threads.
if self.devel:
log.warn("Shutting down Controller because in devel mode")
#sys.exitfunc = lambda: None
self.shutdown(soft=True)
log.info('Heartbeat (%s, %s)' % (done, complete))
# ================
# Exit Strategies
# ================
if self.zmq_flavor == 'green':
gevent.sleep(0)
# Will also fall out of loop when done, if using
# non-freeform topology
if done:
log.info('Entire topology exited cleanly')
self.shutdown(soft=True)
self.shutdown()
# Noop exit func
#sys.exitfunc = lambda: None
@@ -391,12 +357,12 @@ class Controller(object):
we're good. The topology exited cleanly and we can prove
it.
"""
if not self.nosignals:
ppid = os.getppid()
log.warning("Sending SIGHUP")
os.kill(ppid, SIGHUP)
else:
log.warning("Would SIGHUP here, but disabled")
if self.inside_test:
log.warning("Skipping SIGHUP because we're in a nosetest")
return
ppid = os.getppid()
log.warning("Sending SIGHUP")
os.kill(ppid, SIGHUP)
def signal_interrupt(self):
"""
@@ -404,11 +370,8 @@ class Controller(object):
interpreter exits. If the monitor dies the system is
considered a failure.
"""
if not self.nosignals:
ppid = os.getpid()
os.kill(ppid, SIGINT)
else:
log.warning("Would SIGINT here, but disabled")
ppid = os.getpid()
os.kill(ppid, SIGINT)
def beat(self):
"""
@@ -495,7 +458,7 @@ class Controller(object):
def fail_universal(self):
# TODO: this requires higher order functionality
log.error('System in exception state, shutting down')
self.shutdown(soft=True)
self.shutdown()
def fail(self, component):
if self.state is CONTROL_STATES.TERMINATE:
@@ -527,7 +490,7 @@ class Controller(object):
Shutdown the system on failure.
"""
log.error('System in exception state, shutting down')
self.shutdown(hard=True, soft=False)
self.kill()
def exception(self, component, failure):
universal = self.exception_universal
@@ -536,7 +499,6 @@ class Controller(object):
if component in self.topology or self.freeform:
self.error_replay[(component, time.time())] = failure
log.error('Component in exception state: %s' % component)
log.error(str(failure))
exception_handlers.get(component, universal)()
else:
@@ -642,21 +604,22 @@ class Controller(object):
for (component, time), error in self.error_replay.iteritems():
log.info('Component Log for -- %s --:\n%s' % (component, error))
def shutdown(self, hard=False, soft=True):
def kill(self):
if self.state is CONTROL_STATES.TERMINATE:
return
assert hard or soft, """ Must specify kill hard or soft """
log.info('Hard Shutdown')
self.send_hardkill()
self.state = CONTROL_STATES.TERMINATE
self.alive = False
def shutdown(self):
if self.state is CONTROL_STATES.TERMINATE:
return
log.info('Soft Shutdown')
self.send_softkill()
self.state = CONTROL_STATES.TERMINATE
self.alive = False
if hard and not self.devel:
self.state = CONTROL_STATES.TERMINATE
log.info('Hard Shutdown')
self.send_hardkill()
if soft and not self.devel:
self.state = CONTROL_STATES.TERMINATE
log.info('Soft Shutdown')
self.send_softkill()
-10
View File
@@ -115,7 +115,6 @@ class SimulatedTrading(object):
self.trading_environment = config['trading_environment']
self.sim_style = config.get('simulation_style')
self.devel = config.get('devel', False)
self.leased_sockets = []
self.sim_context = None
@@ -134,7 +133,6 @@ class SimulatedTrading(object):
self.con = Controller(
sockets[5],
sockets[6],
devel = self.devel
)
self.started = False
@@ -255,19 +253,11 @@ class SimulatedTrading(object):
'allocator' : allocator,
'simulation_style' : simulation_style,
'results_socket' : results_socket,
'devel' : config.get('devel', False)
})
#-------------------
zipline.add_source(trade_source)
# Save us from needless debugging
inside_test = 'nose' in inspect.stack()[-1][1]
if False and inside_test and not config.get('devel', False):
assert False, """
You need to run the SimulatedTrading inside a test with devel=True
"""
return zipline
def add_source(self, source):
+10 -8
View File
@@ -145,7 +145,7 @@ class NoopAlgorithm(object):
pass
def get_sid_filter(self):
return None
return []
class ExceptionAlgorithm(object):
"""
@@ -153,8 +153,9 @@ class ExceptionAlgorithm(object):
constructor.
"""
def __init__(self, throw_from):
def __init__(self, throw_from, sid):
self.throw_from = throw_from
self.sid = sid
def initialize(self):
if self.throw_from == "initialize":
@@ -187,12 +188,12 @@ class ExceptionAlgorithm(object):
if self.throw_from == "get_sid_filter":
raise Exception("Algo exception in get_sid_filter")
else:
return [1]
return [self.sid]
class TestPrintAlgorithm():
def __init__(self):
pass
def __init__(self, sid):
self.sid = sid
def initialize(self):
print "Initializing..."
@@ -211,12 +212,13 @@ class TestPrintAlgorithm():
pass
def get_sid_filter(self):
return [1]
return [self.sid]
class TestLoggingAlgorithm():
def __init__(self):
def __init__(self, sid):
self.log = None
self.sid = sid
def initialize(self):
self.log.info("Initializing...")
@@ -234,4 +236,4 @@ class TestLoggingAlgorithm():
self.log.info("Handling Data...")
def get_sid_filter(self):
return [1]
return [self.sid]
-99
View File
@@ -1,99 +0,0 @@
"""
This is somewhat legally ambigious, since it technically
hasn't been merged in gevent_zeromq but given that the
author issued it as a Pull Request on a MIT project,
indicates that its probably fine to use. ~Steve
"""
import zmq
from zmq import *
from zmq.core.poll import Poller as _original_Poller
import gevent
from gevent import select
from gevent_zeromq.core import _Socket
def patch_poller(self):
zmq.Poller = _Poller
class _Poller(_original_Poller):
"""
Replacement for :class:`zmq.core.Poller`
Ensures that the greened Poller below is used in calls
to :meth:`zmq.core.Poller.poll`.
"""
def _get_descriptors(self):
"""
Returns three elements tuple with socket descriptors ready for
gevent.select
"""
rlist = []
wlist = []
xlist = []
for socket, flags in self.sockets.items():
if isinstance(socket, _Socket):
fd = socket.getsockopt(FD)
elif isinstance(socket, int):
fd = socket
elif hasattr(socket, 'fileno'):
try:
fd = int(socket.fileno())
except:
raise ValueError('fileno() must return an valid integer fd')
else:
raise TypeError("Socket must be a 0MQ socket, an integer fd or \
have a fileno() method: %r" % socket)
if flags & POLLIN: rlist.append(fd)
if flags & POLLOUT: wlist.append(fd)
if flags & POLLERR: xlist.append(fd)
return (rlist, wlist, xlist)
def poll(self, timeout=-1):
"""Overridden method to ensure that the green version of Poller is used
Behaves the same as :meth:`zmq.core.Poller.poll`
"""
if timeout is None:
timeout = -1
timeout = int(timeout)
if timeout < 0:
timeout = -1
rlist = None
wlist = None
xlist = None
if timeout > 0:
tout = gevent.Timeout.start_new(timeout/1000.0)
try:
# Loop until timeout or events available
while True:
events = super(_Poller, self).poll(0)
if events or timeout == 0:
return events
# wait for activity on sockets in a green way
if not rlist and not wlist and not xlist:
rlist, wlist, xlist = self._get_descriptors()
try:
select.select(rlist, wlist, xlist)
except gevent.select.error, ex:
raise ZMQError(*ex.args)
except gevent.Timeout, t:
if t is not tout:
raise
return []
finally:
if timeout > 0:
tout.cancel()
+12 -1
View File
@@ -1,10 +1,18 @@
import zmq
import time
import zipline.protocol as zp
from datetime import datetime
import blist
from zipline.utils.date_utils import EPOCH
from itertools import izip
from logbook import FileHandler
def setup_logger(test, path='/var/log/zipline/zipline.log'):
test.log_handler = FileHandler(path)
test.log_handler.push_application()
def teardown_logger(test):
test.log_handler.pop_application()
def check_list(test, a, b, label):
test.assertTrue(isinstance(a, (list, blist.blist)))
@@ -59,7 +67,9 @@ def drain_zipline(test, zipline):
"need to specify a socket address for logs/perf/risk"
test.receiver = test.ctx.socket(zmq.PULL)
test.receiver.bind(test.zipline_test_config['results_socket'])
# Bind and connect are asynch, so allow time for bind before
# starting the zipline (TSC connects internally).
time.sleep(1)
# start the simulation
zipline.simulate(blocking=False)
@@ -78,6 +88,7 @@ def drain_zipline(test, zipline):
elif update['prefix'] == 'EXCEPTION':
break
test.receiver.close()
del test.receiver
# some processes will exit after the message stream is
+1 -1
View File
@@ -1,5 +1,5 @@
"""
Misc ZeroMQ utilities.
Misc ZeroMQ experimental tools
"""
import gevent
import msgpack