From abf9c8efa5c3bb4bded640f2ce5bb0d75d4edb47 Mon Sep 17 00:00:00 2001 From: fawce Date: Thu, 26 Jul 2012 16:22:13 -0400 Subject: [PATCH] exception handling code revised. gevent, pypy, and threadsim vestiges removed. devel flag removed. --- tests/client.py | 1 - tests/test_exception_handling.py | 104 ++++++++++++++++---- tests/test_finance.py | 15 +-- tests/test_monitor.py | 16 ---- zipline/components/aggregator.py | 26 ++--- zipline/components/tradesimulation.py | 9 +- zipline/core/component.py | 132 ++++++++++++++------------ zipline/core/monitor.py | 109 +++++++-------------- zipline/lines.py | 10 -- zipline/test_algorithms.py | 18 ++-- zipline/utils/gpoll.py | 99 ------------------- zipline/utils/test_utils.py | 13 ++- zipline/utils/zmq_utils.py | 2 +- 13 files changed, 233 insertions(+), 321 deletions(-) delete mode 100644 zipline/utils/gpoll.py diff --git a/tests/client.py b/tests/client.py index c2c07560..3ceede6f 100644 --- a/tests/client.py +++ b/tests/client.py @@ -1,5 +1,4 @@ import logging -from gevent_zeromq import zmq import zipline.protocol as zp from zipline.core.component import Component diff --git a/tests/test_exception_handling.py b/tests/test_exception_handling.py index 3ade4526..6d3a3ff8 100644 --- a/tests/test_exception_handling.py +++ b/tests/test_exception_handling.py @@ -2,14 +2,17 @@ import zmq from unittest2 import TestCase from collections import defaultdict -from logbook.compat import LoggingHandler -from zipline.test_algorithms import ExceptionAlgorithm +from zipline.test_algorithms import ExceptionAlgorithm, NoopAlgorithm from zipline.finance.trading import SIMULATION_STYLE from zipline.core.devsimulator import AddressAllocator from zipline.lines import SimulatedTrading -from zipline.utils.test_utils import drain_zipline, check +from zipline.utils.test_utils import \ + drain_zipline, \ + check, \ + setup_logger, \ + teardown_logger DEFAULT_TIMEOUT = 15 # seconds EXTENDED_TIMEOUT = 90 @@ -30,67 +33,126 @@ class ExceptionTestCase(TestCase): 'simulation_style' : SIMULATION_STYLE.FIXED_SLIPPAGE } self.ctx = zmq.Context() - - self.log_handler = LoggingHandler() - self.log_handler.push_application() + setup_logger(self) def tearDown(self): - self.log_handler.pop_application() + self.ctx.term() + teardown_logger(self) def test_exception_in_init(self): + # Simulation + # ---------- + self.zipline_test_config['algorithm'] = \ + ExceptionAlgorithm( + 'initialize', + self.zipline_test_config['sid'] + ) + + zipline = SimulatedTrading.create_test_zipline( + **self.zipline_test_config + ) + output, _ = drain_zipline(self, zipline) + self.assertEqual(len(output), 1) + self.assertEqual(output[-1]['prefix'], 'EXCEPTION') + payload = output[-1]['payload'] + #check(self, payload, INITIALIZE_TB) + + self.assertTrue(zipline.sim.ready()) + self.assertFalse(zipline.sim.exception) + + + def test_exception_in_handle_data(self): # Simulation # ---------- - self.zipline_test_config['algorithm'] = \ - ExceptionAlgorithm('initialize') + ExceptionAlgorithm( + 'handle_data', + self.zipline_test_config['sid'] + ) zipline = SimulatedTrading.create_test_zipline( **self.zipline_test_config ) output, _ = drain_zipline(self, zipline) + self.assertEqual(len(output), 1) self.assertEqual(output[-1]['prefix'], 'EXCEPTION') payload = output[-1]['payload'] - check(self, payload, INITIALIZE_STACK_TB) + #check(self, payload, HANDLE_DATA_TB) - import nose.tools; nose.tools.set_trace() self.assertTrue(zipline.sim.ready()) self.assertFalse(zipline.sim.exception) + # TODO: - # - exception protocol to use prefix/payload as EXCEPT, - # and the stack trace - # - test exception in handle_data # - define more zipline failure modes: exception in other # components, exception in Monitor, etc. write tests # for those scenarios. -INITIALIZE_STACK_TB =\ +INITIALIZE_TB =\ [{'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/core/component.py', 'line': 'self._run()', 'lineno': 229, 'method': 'run'}, {'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/core/component.py', - 'line': 'self.open()', - 'lineno': 208, + 'line': 'self.loop()', + 'lineno': 220, 'method': '_run'}, + {'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/core/component.py', + 'line': 'self.do_work()', + 'lineno': 257, + 'method': 'loop'}, {'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/components/tradesimulation.py', 'line': 'self.initialize_algo()', - 'lineno': 73, - 'method': 'open'}, + 'lineno': 100, + 'method': 'do_work'}, {'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/components/tradesimulation.py', 'line': 'self.do_op(self.algorithm.initialize)', 'lineno': 83, 'method': 'initialize_algo'}, {'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/components/tradesimulation.py', 'line': 'callable_op(*args, **kwargs)', - 'lineno': 205, + 'lineno': 208, 'method': 'do_op'}, {'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/test_algorithms.py', 'line': 'raise Exception("Algo exception in initialize")', - 'lineno': 161, + 'lineno': 162, 'method': 'initialize'}] + +HANDLE_DATA_TB =\ +[{'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/core/component.py', + 'line': 'self._run()', + 'lineno': 229, + 'method': 'run'}, + {'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/core/component.py', + 'line': 'self.loop()', + 'lineno': 220, + 'method': '_run'}, + {'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/core/component.py', + 'line': 'self.do_work()', + 'lineno': 257, + 'method': 'loop'}, + {'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/components/tradesimulation.py', + 'line': 'self.process_event(event)', + 'lineno': 119, + 'method': 'do_work'}, + {'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/components/tradesimulation.py', + 'line': 'self.run_algorithm()', + 'lineno': 167, + 'method': 'process_event'}, + {'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/components/tradesimulation.py', + 'line': 'self.do_op(self.algorithm.handle_data, data)', + 'lineno': 189, + 'method': 'run_algorithm'}, + {'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/components/tradesimulation.py', + 'line': 'callable_op(*args, **kwargs)', + 'lineno': 208, + 'method': 'do_op'}, + {'file': '/Users/fawce/projects/qexec/zipline_repo/zipline/test_algorithms.py', + 'line': 'raise Exception("Algo exception in handle_data")', + 'lineno': 183, + 'method': 'handle_data'}] diff --git a/tests/test_finance.py b/tests/test_finance.py index db77f3ec..0badbdd6 100644 --- a/tests/test_finance.py +++ b/tests/test_finance.py @@ -21,8 +21,13 @@ from zipline.lines import SimulatedTrading from zipline.finance.performance import PerformanceTracker from zipline.utils.protocol_utils import ndict from zipline.finance.trading import TransactionSimulator, SIMULATION_STYLE -from zipline.utils.test_utils import assert_single_position,\ - drain_zipline +from zipline.utils.test_utils import \ + drain_zipline, \ + check, \ + setup_logger, \ + teardown_logger,\ + assert_single_position + DEFAULT_TIMEOUT = 15 # seconds EXTENDED_TIMEOUT = 90 @@ -42,11 +47,10 @@ class FinanceTestCase(TestCase): } self.ctx = zmq.Context() - self.log_handler = LoggingHandler() - self.log_handler.push_application() + setup_logger(self) def tearDown(self): - self.log_handler.pop_application() + teardown_logger(self) @timed(DEFAULT_TIMEOUT) def test_factory_daily(self): @@ -143,7 +147,6 @@ class FinanceTestCase(TestCase): zipline = SimulatedTrading.create_test_zipline( **self.zipline_test_config ) - output, transaction_count = drain_zipline(self, zipline) self.assertTrue(zipline.sim.ready()) diff --git a/tests/test_monitor.py b/tests/test_monitor.py index 8b670356..be381135 100644 --- a/tests/test_monitor.py +++ b/tests/test_monitor.py @@ -1,4 +1,3 @@ -import gevent from logbook.compat import LoggingHandler from unittest2 import TestCase, skip @@ -25,18 +24,3 @@ class TestMonitor(TestCase): con = Controller(pub_socket, route_socket, ) con.manage([ 'a', 'b', 'c', 'd' ]) - - @skip - def test_poll(self): - from mock_zmq import zmq_synthetic - pub_socket = 'tcp://127.0.0.1:5000' - route_socket = 'tcp://127.0.0.1:5001' - cancel_socket = 'tcp://127.0.0.1:5002' - - con = Controller(pub_socket, route_socket, cancel_socket) - con.manage([ 'a', 'b', 'c', 'd' ]) - con.zmq = zmq_synthetic - con.zmq_flavor = 'green' - - con.period = 0.00001 - gevent.spawn(con.run).join(timeout=con.period) diff --git a/zipline/components/aggregator.py b/zipline/components/aggregator.py index 6eb99fe9..80d6a087 100644 --- a/zipline/components/aggregator.py +++ b/zipline/components/aggregator.py @@ -83,26 +83,16 @@ class Aggregate(Component): self.drain() self.signal_done() else: - try: - event = self.unframe(message) - except zp.INVALID_DATASOURCE_FRAME as exc: - # Error deserializing - return self.signal_exception(exc) + event = self.unframe(message) + self.append(event) - try: - self.append(event) + if self.is_full(): + event = self.next() - if self.is_full() or self.draining: - event = self.next() - - if event: - self.send(event) - else: - pass - - except zp.INVALID_DATASOURCE_FRAME as exc: - # Invalid message - return self.signal_exception(exc) + if event: + self.send(event) + else: + pass # ------------- # Flow Control diff --git a/zipline/components/tradesimulation.py b/zipline/components/tradesimulation.py index 4141bf91..60202854 100644 --- a/zipline/components/tradesimulation.py +++ b/zipline/components/tradesimulation.py @@ -38,6 +38,7 @@ class TradeSimulationClient(Component): self.perf = perf.PerformanceTracker(self.trading_environment) self.zmq_out = None self.results_socket = results_socket + self.algo_initialized = False @property def get_id(self): @@ -56,9 +57,6 @@ class TradeSimulationClient(Component): # initialize with all possible sids. self.perf.set_sids(self.algorithm.get_sid_filter()) - # N.B. Initialize is now called from open, because we - # need to have a socket open for logging. - def open(self): self.result_feed = self.connect_result() if self.results_socket: @@ -70,7 +68,6 @@ class TradeSimulationClient(Component): self.setup_logging(sock) self.perf.publish_to(sock) - self.initialize_algo() def initialize_algo(self): """ Setup loggers for algorithm and run algorithm's own @@ -81,6 +78,7 @@ class TradeSimulationClient(Component): self.algorithm.set_logger(self.algo_log) self.do_op(self.algorithm.initialize) + self.algo_initialized = True def setup_logging(self, socket = None): sock = socket or self.results_socket @@ -95,6 +93,8 @@ class TradeSimulationClient(Component): self.stdout_capture = stdout_only_pipe def do_work(self): + if not self.algo_initialized: + self.initialize_algo() # see if the poller has results for the result_feed if self.socks.get(self.result_feed) == self.zmq.POLLIN: @@ -187,6 +187,7 @@ class TradeSimulationClient(Component): def exception_callback(self, exc_type, exc_value, exc_traceback): if self.results_socket: + log.info("Sending exception frame") msg = zp.EXCEPTION_FRAME(exc_traceback) self.out_socket.send(msg) diff --git a/zipline/core/component.py b/zipline/core/component.py index 51b439c5..8a399cb2 100644 --- a/zipline/core/component.py +++ b/zipline/core/component.py @@ -14,14 +14,9 @@ from setproctitle import setproctitle # pyzmq import zmq -# gevent_zeromq -import gevent_zeromq -# zmq_ctypes -#import zmq_ctypes from zipline.core.monitor import PARAMETERS -from zipline.utils.gpoll import _Poller as GeventPoller from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE, \ COMPONENT_FAILURE, CONTROL_FRAME, CONTROL_UNFRAME @@ -29,6 +24,10 @@ log = logbook.Logger('Component') from zipline.exceptions import ComponentNoInit +class KillSignal(Exception): + def __init__(self): + pass + class Component(object): """ @@ -154,35 +153,13 @@ class Component(object): def do_work(self): raise NotImplementedError - def init_zmq(self, flavor): - """ - ZMQ in all flavors. Have it your way. - - mp - Distinct contexts | pyzmq - green - Same context | gevent_zeromq - pypy - Same context | zmq_ctypes - - """ - - if flavor == 'mp': - self.zmq = zmq - self.context = self.zmq.Context() - self.zmq_poller = self.zmq.Poller - # The the process title so you can watch it in top - setproctitle(self.__class__.__name__) - return - if flavor == 'green': - self.zmq = gevent_zeromq.zmq - self.context = self.zmq.Context.instance() - self.zmq_poller = GeventPoller - return - if flavor == 'pypy': - self.zmq = zmq - self.context = self.zmq.Context.instance() - self.zmq_poller = self.zmq.Poller - return - - raise Exception("Unknown ZeroMQ Flavor") + def init_zmq(self): + self.zmq = zmq + self.context = self.zmq.Context() + self.zmq_poller = self.zmq.Poller + # The the process title so you can watch it in top + setproctitle(self.__class__.__name__) + return def _run(self): """ @@ -200,7 +177,7 @@ class Component(object): self.done = False # TODO: use state flag self.sockets = [] - self.init_zmq(self.zmq_flavor) + self.init_zmq() self.setup_poller() @@ -209,8 +186,6 @@ class Component(object): self.signal_ready() self.lock_ready() - - self.wait_ready() # ----------------------- # YOU SHALL NOT PASS!!!!! @@ -228,14 +203,17 @@ class Component(object): try: self._run() except Exception as exc: - exc_info = sys.exc_info() - self.signal_exception(exc) + if not isinstance(exc, KillSignal): + self.signal_exception(exc) + else: + # if we get a kill signal, forcibly close all the + # sockets. + # exc_info = sys.exc_info() + # self.relay_exception(exc_info[0], exc_info[1], exc_info[2]) + self.teardown_sockets() - # Reraise the exception - raise exc_info[0], exc_info[1], exc_info[2] finally: self.shutdown() - self.teardown_sockets() log.info("Exiting %r" % self) def working(self): @@ -311,7 +289,6 @@ class Component(object): # controller that we're done. elif event == CONTROL_PROTOCOL.SHUTDOWN: self.signal_done() - self.shutdown() # ========= # Hard Kill @@ -336,7 +313,7 @@ class Component(object): # Echo back the heartbeat identifier to tell the # controller that this component is still alive and # doing work - self.control_out.send(heartbeat_frame) + self.control_out.send(heartbeat_frame, self.zmq.NOBLOCK) self.last_ping = pre_pong elif self.last_ping and \ time.time() - self.last_ping > PARAMETERS.MAX_COMPONENT_WAIT: @@ -353,6 +330,7 @@ class Component(object): Close all zmq sockets safely. This is universal, no matter where this is running it will need the sockets closed. """ + log.warn("{id} closing all sockets".format(id=self.get_id)) #close all the sockets for sock in self.sockets: sock.close() @@ -364,6 +342,7 @@ class Component(object): Tear down after normal operation. """ if self.on_done: + log.warn("{id} calling done.".format(id=self.get_id)) self.on_done() def kill(self): @@ -373,7 +352,8 @@ class Component(object): Tear down ( fast ) as a mode of failure in the simulation or on service halt. """ - sys.exit(1) + # sys.exit(1) + raise KillSignal() # ---------------------- # Internal Maintenance @@ -404,7 +384,7 @@ class Component(object): start_wait = time.time() while self.waiting: - socks = dict(self.poll.poll(100)) + socks = dict(self.poll.poll(0)) assert self.control_in, \ 'Component does not have a control_in socket' @@ -444,9 +424,7 @@ class Component(object): # data that are done during a clean shutdown. Inform the # controller that we're done. elif event == CONTROL_PROTOCOL.SHUTDOWN: - self.signal_done() - self.shutdown() break # ========= @@ -492,11 +470,13 @@ class Component(object): def signal_exception(self, exc=None, scope=None): """ - This is a *very* important error tracking handler. + All exceptions inside any component should boil back to + this handler. Will inform the system that the component has failed and how it has failed. """ + if scope == 'algo': self.error_state = COMPONENT_FAILURE.ALGOEXCEPT else: @@ -512,19 +492,47 @@ class Component(object): exc_type, exc_value, exc_traceback = sys.exc_info() trace = ''.join(traceback.format_exception(exc_type, exc_value, exc_traceback)) - sys.stdout.write(trace) + # if a downstream component fails, this component may try + # sending when there are zero connections to the socket, + # which will raise ZMQError(EAGAIN). So, it doesn't make + # sense to relay this exception to Monitor and the rest + # of the zipline. + if isinstance(exc, zmq.ZMQError) and exc.errno == zmq.EAGAIN: + log.warn("{id} raised a ZMQError(EAGAIN) not relaying"\ + .format(id=self.get_id)) + return - if hasattr(self, 'exception_callback') and self.exception_callback: - self.exception_callback(exc_type, exc_value, exc_traceback) + # sys.stdout.write(trace) + log.exception("Unexpected error in run for {id}.".format(id=self.get_id)) + + self.relay_exception(exc_type, exc_value, exc_traceback) if hasattr(self, 'control_out') and self.control_out: - exception_frame = CONTROL_FRAME( - CONTROL_PROTOCOL.EXCEPTION, - trace - ) - self.control_out.send(exception_frame) + try: + log.info('{id} sending exception to controller'.format(id=self.get_id)) + exception_frame = CONTROL_FRAME( + CONTROL_PROTOCOL.EXCEPTION, + trace + ) + self.control_out.send(exception_frame, self.zmq.NOBLOCK) + # The controller should relay the exception back + # to all zipline components. Wait here until the + # notice arrives, and we can assume other zipline + # components have broken out of their message + # loops. + for i in xrange(100): + self.heartbeat(timeout=1000) + log.warn("{id} Never heard back from monitor."\ + .format(id=self.get_id)) + except: + log.exception("Exception waiting for controller reply") + + def relay_exception(self, exc_type, exc_value, exc_traceback): + if hasattr(self, 'exception_callback') and self.exception_callback: + log.info('{id} making exception callback'.format(id=self.get_id)) + self.exception_callback(exc_type, exc_value, exc_traceback) + - #LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id)) def signal_done(self): """ @@ -532,6 +540,8 @@ class Component(object): """ self.state_flag = COMPONENT_STATE.DONE + # notify internal work loop that we're done + self.done = True # TODO: use state flag if hasattr(self, 'out_socket') and self.out_socket: msg = zmq.Message(str(CONTROL_PROTOCOL.DONE)) @@ -554,8 +564,7 @@ class Component(object): # last heartbeat, and wait an unusually long time. self.heartbeat(timeout=5000) - # notify internal work look that we're done - self.done = True # TODO: use state flag + # ----------- @@ -567,9 +576,6 @@ class Component(object): Setup the poller used for multiplexing the incoming data handling sockets. """ - - # Initializes the poller class specified by the flavor of - # ZeroMQ. Either zmq.Poller or gpoll.Poller . self.poll = self.zmq_poller() def bind_data(self): diff --git a/zipline/core/monitor.py b/zipline/core/monitor.py index 94866f0b..599565f4 100644 --- a/zipline/core/monitor.py +++ b/zipline/core/monitor.py @@ -1,17 +1,15 @@ +import inspect import os import zmq import sys import time -import gevent import itertools import logbook -import gevent_zeromq from setproctitle import setproctitle from signal import SIGHUP, SIGINT from collections import OrderedDict, Counter -from zipline.utils.gpoll import _Poller as GeventPoller from zipline.protocol import CONTROL_PROTOCOL, CONTROL_FRAME, \ CONTROL_UNFRAME, CONTROL_STATES, INVALID_CONTROL_FRAME \ @@ -71,9 +69,8 @@ class Controller(object): debug = True period = PARAMETERS.GENERATIONAL_PERIOD - def __init__(self, pub_socket, route_socket, devel=True): + def __init__(self, pub_socket, route_socket): - self.devel = devel self.nosignals = False self.context = None self.zmq = None @@ -100,34 +97,17 @@ class Controller(object): self.missed_beats = Counter() - log.warn("Running Controller in development mode, will ONLY synchronize start.") + # if we are inside a test, we want to skip signalling + # back to the parent process. + self.inside_test = 'nose' in inspect.stack()[-1][1] - def init_zmq(self, flavor): - assert self.zmq_flavor in ['thread', 'mp', 'green'] - if flavor == 'mp': - self.zmq = zmq - self.context = self.zmq.Context() - self.zmq_poller = self.zmq.Poller - if self.devel: - log.warning("USING DEVELOPMENT MODE IN MP CONTEXT NOT RECOMMENDED") - return - if flavor == 'thread': - self.zmq = zmq - self.context = self.zmq.Context.instance() - self.zmq_poller = self.zmq.Poller - return - if flavor == 'green': - self.zmq = gevent_zeromq.zmq - self.context = self.zmq.Context.instance() - self.zmq_poller = GeventPoller - return - if flavor == 'pypy': - self.zmq = zmq - self.context = self.zmq.Context.instance() - self.zmq_poller = self.zmq.Poller - return + def init_zmq(self): + self.zmq = zmq + self.context = self.zmq.Context() + self.zmq_poller = self.zmq.Poller + return def manage(self, topology): """ @@ -170,8 +150,8 @@ class Controller(object): # ----------------------- # The last breathe of the interpreter will assume that we've # failed unless we specify otherwise. - if not self.devel: - sys.exitfunc = self.signal_interrupt + log.info('registering exit function') + sys.exitfunc = self.signal_interrupt # We overload this if ( and only if ) the topology exits # cleanly. This prevents failure modes where the monitor # dies. @@ -349,31 +329,17 @@ class Controller(object): if complete: self.send_go() - # If we're running in development stop here - # because our responsibilites are over. The - # zipline will either run to completion or die, - # monitor doesn't care anymore because its all - # threads. - - if self.devel: - log.warn("Shutting down Controller because in devel mode") - #sys.exitfunc = lambda: None - self.shutdown(soft=True) - log.info('Heartbeat (%s, %s)' % (done, complete)) # ================ # Exit Strategies # ================ - if self.zmq_flavor == 'green': - gevent.sleep(0) - # Will also fall out of loop when done, if using # non-freeform topology if done: log.info('Entire topology exited cleanly') - self.shutdown(soft=True) + self.shutdown() # Noop exit func #sys.exitfunc = lambda: None @@ -391,12 +357,12 @@ class Controller(object): we're good. The topology exited cleanly and we can prove it. """ - if not self.nosignals: - ppid = os.getppid() - log.warning("Sending SIGHUP") - os.kill(ppid, SIGHUP) - else: - log.warning("Would SIGHUP here, but disabled") + if self.inside_test: + log.warning("Skipping SIGHUP because we're in a nosetest") + return + ppid = os.getppid() + log.warning("Sending SIGHUP") + os.kill(ppid, SIGHUP) def signal_interrupt(self): """ @@ -404,11 +370,8 @@ class Controller(object): interpreter exits. If the monitor dies the system is considered a failure. """ - if not self.nosignals: - ppid = os.getpid() - os.kill(ppid, SIGINT) - else: - log.warning("Would SIGINT here, but disabled") + ppid = os.getpid() + os.kill(ppid, SIGINT) def beat(self): """ @@ -495,7 +458,7 @@ class Controller(object): def fail_universal(self): # TODO: this requires higher order functionality log.error('System in exception state, shutting down') - self.shutdown(soft=True) + self.shutdown() def fail(self, component): if self.state is CONTROL_STATES.TERMINATE: @@ -527,7 +490,7 @@ class Controller(object): Shutdown the system on failure. """ log.error('System in exception state, shutting down') - self.shutdown(hard=True, soft=False) + self.kill() def exception(self, component, failure): universal = self.exception_universal @@ -536,7 +499,6 @@ class Controller(object): if component in self.topology or self.freeform: self.error_replay[(component, time.time())] = failure log.error('Component in exception state: %s' % component) - log.error(str(failure)) exception_handlers.get(component, universal)() else: @@ -642,21 +604,22 @@ class Controller(object): for (component, time), error in self.error_replay.iteritems(): log.info('Component Log for -- %s --:\n%s' % (component, error)) - def shutdown(self, hard=False, soft=True): + def kill(self): + if self.state is CONTROL_STATES.TERMINATE: + return - assert hard or soft, """ Must specify kill hard or soft """ + log.info('Hard Shutdown') + self.send_hardkill() + self.state = CONTROL_STATES.TERMINATE + self.alive = False + + + def shutdown(self): if self.state is CONTROL_STATES.TERMINATE: return + log.info('Soft Shutdown') + self.send_softkill() + self.state = CONTROL_STATES.TERMINATE self.alive = False - - if hard and not self.devel: - self.state = CONTROL_STATES.TERMINATE - log.info('Hard Shutdown') - self.send_hardkill() - - if soft and not self.devel: - self.state = CONTROL_STATES.TERMINATE - log.info('Soft Shutdown') - self.send_softkill() diff --git a/zipline/lines.py b/zipline/lines.py index 99f69e80..c20c1e46 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -115,7 +115,6 @@ class SimulatedTrading(object): self.trading_environment = config['trading_environment'] self.sim_style = config.get('simulation_style') - self.devel = config.get('devel', False) self.leased_sockets = [] self.sim_context = None @@ -134,7 +133,6 @@ class SimulatedTrading(object): self.con = Controller( sockets[5], sockets[6], - devel = self.devel ) self.started = False @@ -255,19 +253,11 @@ class SimulatedTrading(object): 'allocator' : allocator, 'simulation_style' : simulation_style, 'results_socket' : results_socket, - 'devel' : config.get('devel', False) }) #------------------- zipline.add_source(trade_source) - # Save us from needless debugging - inside_test = 'nose' in inspect.stack()[-1][1] - if False and inside_test and not config.get('devel', False): - assert False, """ - You need to run the SimulatedTrading inside a test with devel=True - """ - return zipline def add_source(self, source): diff --git a/zipline/test_algorithms.py b/zipline/test_algorithms.py index 7011fa53..1755bc53 100644 --- a/zipline/test_algorithms.py +++ b/zipline/test_algorithms.py @@ -145,7 +145,7 @@ class NoopAlgorithm(object): pass def get_sid_filter(self): - return None + return [] class ExceptionAlgorithm(object): """ @@ -153,8 +153,9 @@ class ExceptionAlgorithm(object): constructor. """ - def __init__(self, throw_from): + def __init__(self, throw_from, sid): self.throw_from = throw_from + self.sid = sid def initialize(self): if self.throw_from == "initialize": @@ -187,12 +188,12 @@ class ExceptionAlgorithm(object): if self.throw_from == "get_sid_filter": raise Exception("Algo exception in get_sid_filter") else: - return [1] + return [self.sid] class TestPrintAlgorithm(): - def __init__(self): - pass + def __init__(self, sid): + self.sid = sid def initialize(self): print "Initializing..." @@ -211,12 +212,13 @@ class TestPrintAlgorithm(): pass def get_sid_filter(self): - return [1] + return [self.sid] class TestLoggingAlgorithm(): - def __init__(self): + def __init__(self, sid): self.log = None + self.sid = sid def initialize(self): self.log.info("Initializing...") @@ -234,4 +236,4 @@ class TestLoggingAlgorithm(): self.log.info("Handling Data...") def get_sid_filter(self): - return [1] + return [self.sid] diff --git a/zipline/utils/gpoll.py b/zipline/utils/gpoll.py deleted file mode 100644 index 8c18d4cf..00000000 --- a/zipline/utils/gpoll.py +++ /dev/null @@ -1,99 +0,0 @@ -""" -This is somewhat legally ambigious, since it technically -hasn't been merged in gevent_zeromq but given that the -author issued it as a Pull Request on a MIT project, -indicates that its probably fine to use. ~Steve -""" - -import zmq -from zmq import * - -from zmq.core.poll import Poller as _original_Poller - -import gevent -from gevent import select -from gevent_zeromq.core import _Socket - -def patch_poller(self): - zmq.Poller = _Poller - -class _Poller(_original_Poller): - """ - Replacement for :class:`zmq.core.Poller` - - Ensures that the greened Poller below is used in calls - to :meth:`zmq.core.Poller.poll`. - """ - - def _get_descriptors(self): - """ - Returns three elements tuple with socket descriptors ready for - gevent.select - """ - rlist = [] - wlist = [] - xlist = [] - - for socket, flags in self.sockets.items(): - if isinstance(socket, _Socket): - fd = socket.getsockopt(FD) - elif isinstance(socket, int): - fd = socket - elif hasattr(socket, 'fileno'): - try: - fd = int(socket.fileno()) - except: - raise ValueError('fileno() must return an valid integer fd') - else: - raise TypeError("Socket must be a 0MQ socket, an integer fd or \ - have a fileno() method: %r" % socket) - - if flags & POLLIN: rlist.append(fd) - if flags & POLLOUT: wlist.append(fd) - if flags & POLLERR: xlist.append(fd) - - return (rlist, wlist, xlist) - - def poll(self, timeout=-1): - """Overridden method to ensure that the green version of Poller is used - - Behaves the same as :meth:`zmq.core.Poller.poll` - """ - - if timeout is None: - timeout = -1 - - timeout = int(timeout) - if timeout < 0: - timeout = -1 - - rlist = None - wlist = None - xlist = None - - if timeout > 0: - tout = gevent.Timeout.start_new(timeout/1000.0) - - try: - # Loop until timeout or events available - while True: - events = super(_Poller, self).poll(0) - if events or timeout == 0: - return events - - # wait for activity on sockets in a green way - if not rlist and not wlist and not xlist: - rlist, wlist, xlist = self._get_descriptors() - - try: - select.select(rlist, wlist, xlist) - except gevent.select.error, ex: - raise ZMQError(*ex.args) - - except gevent.Timeout, t: - if t is not tout: - raise - return [] - finally: - if timeout > 0: - tout.cancel() diff --git a/zipline/utils/test_utils.py b/zipline/utils/test_utils.py index db2e19bb..8d0d93c2 100644 --- a/zipline/utils/test_utils.py +++ b/zipline/utils/test_utils.py @@ -1,10 +1,18 @@ import zmq +import time import zipline.protocol as zp from datetime import datetime import blist from zipline.utils.date_utils import EPOCH from itertools import izip +from logbook import FileHandler +def setup_logger(test, path='/var/log/zipline/zipline.log'): + test.log_handler = FileHandler(path) + test.log_handler.push_application() + +def teardown_logger(test): + test.log_handler.pop_application() def check_list(test, a, b, label): test.assertTrue(isinstance(a, (list, blist.blist))) @@ -59,7 +67,9 @@ def drain_zipline(test, zipline): "need to specify a socket address for logs/perf/risk" test.receiver = test.ctx.socket(zmq.PULL) test.receiver.bind(test.zipline_test_config['results_socket']) - + # Bind and connect are asynch, so allow time for bind before + # starting the zipline (TSC connects internally). + time.sleep(1) # start the simulation zipline.simulate(blocking=False) @@ -78,6 +88,7 @@ def drain_zipline(test, zipline): elif update['prefix'] == 'EXCEPTION': break + test.receiver.close() del test.receiver # some processes will exit after the message stream is diff --git a/zipline/utils/zmq_utils.py b/zipline/utils/zmq_utils.py index 5e0a529f..49177ee4 100644 --- a/zipline/utils/zmq_utils.py +++ b/zipline/utils/zmq_utils.py @@ -1,5 +1,5 @@ """ -Misc ZeroMQ utilities. +Misc ZeroMQ experimental tools """ import gevent import msgpack