added support for any component to relay exceptions through monitor.

This commit is contained in:
fawce
2012-08-01 14:56:17 -04:00
10 changed files with 192 additions and 167 deletions
+21 -26
View File
@@ -58,7 +58,6 @@ class ExceptionTestCase(TestCase):
self.assertTrue(payload['date'])
del payload['date']
check(self, payload, INITIALIZE_TB)
self.assertTrue(zipline.sim.ready())
self.assertFalse(zipline.sim.exception)
@@ -85,7 +84,6 @@ class ExceptionTestCase(TestCase):
self.assertTrue(payload['date'])
del payload['date']
check(self, payload, HANDLE_DATA_TB)
self.assertTrue(zipline.sim.ready())
self.assertFalse(zipline.sim.exception)
@@ -109,7 +107,6 @@ class ExceptionTestCase(TestCase):
self.assertTrue(payload['date'])
del payload['date']
check(self, payload, ZERO_DIV_TB)
self.assertTrue(zipline.sim.ready())
self.assertFalse(zipline.sim.exception)
@@ -123,20 +120,20 @@ class ExceptionTestCase(TestCase):
INITIALIZE_TB =\
{'message': 'Algo exception in initialize',
'name': 'Exception',
'stack': [{'filename': '/zipline/core/component.py', 'line': 'self._run()', 'lineno': 204, 'method': 'run'},
{'filename': '/zipline/core/component.py', 'line': 'self.loop()', 'lineno': 195, 'method': '_run'},
{'filename': '/zipline/core/component.py', 'line': 'self.do_work()', 'lineno': 235, 'method': 'loop'},
'stack': [{'filename': '/zipline/core/component.py', 'line': 'self._run()', 'lineno': 210, 'method': 'run'},
{'filename': '/zipline/core/component.py', 'line': 'self.loop()', 'lineno': 201, 'method': '_run'},
{'filename': '/zipline/core/component.py', 'line': 'self.do_work()', 'lineno': 241, 'method': 'loop'},
{'filename': '/zipline/components/tradesimulation.py',
'line': 'self.initialize_algo()',
'lineno': 97,
'lineno': 91,
'method': 'do_work'},
{'filename': '/zipline/components/tradesimulation.py',
'line': 'self.do_op(self.algorithm.initialize)',
'lineno': 80,
'lineno': 74,
'method': 'initialize_algo'},
{'filename': '/zipline/components/tradesimulation.py',
'line': 'callable_op(*args, **kwargs)',
'lineno': 210,
'lineno': 194,
'method': 'do_op'},
{'filename': '/zipline/test_algorithms.py',
'line': 'raise Exception("Algo exception in initialize")',
@@ -144,54 +141,52 @@ INITIALIZE_TB =\
'method': 'initialize'}]}
HANDLE_DATA_TB =\
{
'message': 'Algo exception in handle_data',
{'message': 'Algo exception in handle_data',
'name': 'Exception',
'stack': [{'filename': '/zipline/core/component.py', 'line': 'self._run()', 'lineno': 204, 'method': 'run'},
{'filename': '/zipline/core/component.py', 'line': 'self.loop()', 'lineno': 195, 'method': '_run'},
{'filename': '/zipline/core/component.py', 'line': 'self.do_work()', 'lineno': 235, 'method': 'loop'},
'stack': [{'filename': '/zipline/core/component.py', 'line': 'self._run()', 'lineno': 210, 'method': 'run'},
{'filename': '/zipline/core/component.py', 'line': 'self.loop()', 'lineno': 201, 'method': '_run'},
{'filename': '/zipline/core/component.py', 'line': 'self.do_work()', 'lineno': 241, 'method': 'loop'},
{'filename': '/zipline/components/tradesimulation.py',
'line': 'self.process_event(event)',
'lineno': 116,
'lineno': 110,
'method': 'do_work'},
{'filename': '/zipline/components/tradesimulation.py',
'line': 'self.run_algorithm()',
'lineno': 164,
'lineno': 158,
'method': 'process_event'},
{'filename': '/zipline/components/tradesimulation.py',
'line': 'self.do_op(self.algorithm.handle_data, data)',
'lineno': 186,
'lineno': 180,
'method': 'run_algorithm'},
{'filename': '/zipline/components/tradesimulation.py',
'line': 'callable_op(*args, **kwargs)',
'lineno': 210,
'lineno': 194,
'method': 'do_op'},
{'filename': '/zipline/test_algorithms.py',
'line': 'raise Exception("Algo exception in handle_data")',
'lineno': 187,
'method': 'handle_data'}]}
ZERO_DIV_TB= \
{'message': 'integer division or modulo by zero',
'name': 'ZeroDivisionError',
'stack': [{'filename': '/zipline/core/component.py', 'line': 'self._run()', 'lineno': 204, 'method': 'run'},
{'filename': '/zipline/core/component.py', 'line': 'self.loop()', 'lineno': 195, 'method': '_run'},
{'filename': '/zipline/core/component.py', 'line': 'self.do_work()', 'lineno': 235, 'method': 'loop'},
'stack': [{'filename': '/zipline/core/component.py', 'line': 'self._run()', 'lineno': 210, 'method': 'run'},
{'filename': '/zipline/core/component.py', 'line': 'self.loop()', 'lineno': 201, 'method': '_run'},
{'filename': '/zipline/core/component.py', 'line': 'self.do_work()', 'lineno': 241, 'method': 'loop'},
{'filename': '/zipline/components/tradesimulation.py',
'line': 'self.process_event(event)',
'lineno': 116,
'lineno': 110,
'method': 'do_work'},
{'filename': '/zipline/components/tradesimulation.py',
'line': 'self.run_algorithm()',
'lineno': 164,
'lineno': 158,
'method': 'process_event'},
{'filename': '/zipline/components/tradesimulation.py',
'line': 'self.do_op(self.algorithm.handle_data, data)',
'lineno': 186,
'lineno': 180,
'method': 'run_algorithm'},
{'filename': '/zipline/components/tradesimulation.py',
'line': 'callable_op(*args, **kwargs)',
'lineno': 210,
'lineno': 194,
'method': 'do_op'},
{'filename': '/zipline/test_algorithms.py', 'line': '5/0', 'lineno': 218, 'method': 'handle_data'}]}
+12 -28
View File
@@ -12,7 +12,7 @@ from zipline.utils.protocol_utils import ndict
from zipline.utils.log_utils import ZeroMQLogHandler, stdout_only_pipe
from logbook import Logger, NestedSetup, Processor, queues
from logbook import Logger, NestedSetup, Processor
log = logbook.Logger('TradeSimulation')
@@ -20,7 +20,7 @@ log = logbook.Logger('TradeSimulation')
class TradeSimulationClient(Component):
def init(self, trading_environment, sim_style, results_socket):
def init(self, trading_environment, sim_style, results_socket, algorithm):
self.received_count = 0
self.prev_dt = None
self.event_queue = None
@@ -29,13 +29,20 @@ class TradeSimulationClient(Component):
self.trading_environment = trading_environment
self.current_dt = trading_environment.period_start
self.last_iteration_dur = datetime.timedelta(seconds=0)
self.algorithm = None
self.algorithm = algorithm
self.algorithm.set_order(self.order)
self.max_wait = datetime.timedelta(seconds=60)
self.last_msg_dt = datetime.datetime.utcnow()
self.txn_sim = TransactionSimulator(sim_style)
self.txn_sim = TransactionSimulator(
open_orders={},
style=sim_style
)
self.event_data = ndict()
self.perf = perf.PerformanceTracker(self.trading_environment)
self.perf = perf.PerformanceTracker(
self.trading_environment,
self.algorithm.get_sid_filter()
)
self.zmq_out = None
self.results_socket = results_socket
self.algo_initialized = False
@@ -44,19 +51,6 @@ class TradeSimulationClient(Component):
def get_id(self):
return str(zp.FINANCE_COMPONENT.TRADING_CLIENT)
def set_algorithm(self, algorithm):
"""
:param algorithm: must implement the algorithm protocol. See
:py:mod:`zipline.test.algorithm`
"""
self.algorithm = algorithm
# register the client's order method with the algorithm
self.algorithm.set_order(self.order)
# we need to provide the performance tracker with the
# sids referenced in the algorithm, so portfolio can
# initialize with all possible sids.
self.perf.set_sids(self.algorithm.get_sid_filter())
def open(self):
self.result_feed = self.connect_result()
if self.results_socket:
@@ -185,16 +179,6 @@ class TradeSimulationClient(Component):
# LOG_EXTRA_FIELDS in zipline/protocol.py
self.do_op(self.algorithm.handle_data, data)
def exception_callback(self, exc_type, exc_value, exc_traceback):
if self.results_socket:
log.info("Sending exception frame")
msg = zp.EXCEPTION_FRAME(
exc_traceback,
exc_type.__name__,
exc_value.message
)
self.out_socket.send(msg)
def do_op(self, callable_op, *args, **kwargs):
""" Wrap a callable operation with the zmq logbook
handler if it exits."""
+20 -14
View File
@@ -17,8 +17,14 @@ import zmq
from zipline.core.monitor import PARAMETERS
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE, \
COMPONENT_FAILURE, CONTROL_FRAME, CONTROL_UNFRAME
from zipline.protocol import (
CONTROL_PROTOCOL,
COMPONENT_STATE,
COMPONENT_FAILURE,
CONTROL_FRAME,
CONTROL_UNFRAME,
EXCEPTION_FRAME
)
log = logbook.Logger('Component')
@@ -491,7 +497,6 @@ class Component(object):
self._exception = exc
exc_type, exc_value, exc_traceback = sys.exc_info()
trace = ''.join(traceback.format_exception(exc_type, exc_value, exc_traceback))
# if a downstream component fails, this component may try
# sending when there are zero connections to the socket,
@@ -506,14 +511,19 @@ class Component(object):
# sys.stdout.write(trace)
log.exception("Unexpected error in run for {id}.".format(id=self.get_id))
self.relay_exception(exc_type, exc_value, exc_traceback)
if hasattr(self, 'control_out') and self.control_out:
try:
log.info('{id} sending exception to controller'.format(id=self.get_id))
log.info('{id} sending exception to controller'\
.format(id=self.get_id))
msg = EXCEPTION_FRAME(
exc_traceback,
exc_type.__name__,
exc_value.message
)
exception_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.EXCEPTION,
trace
msg
)
self.control_out.send(exception_frame, self.zmq.NOBLOCK)
# The controller should relay the exception back
@@ -525,16 +535,12 @@ class Component(object):
self.heartbeat(timeout=1000)
log.warn("{id} never heard back from monitor."\
.format(id=self.get_id))
except KillSignal:
log.info("{id} received confirmation from controller"\
.format(id=self.get_id))
except:
log.exception("Exception waiting for controller reply")
def relay_exception(self, exc_type, exc_value, exc_traceback):
if hasattr(self, 'exception_callback') and self.exception_callback:
log.info('{id} making exception callback'.format(id=self.get_id))
self.exception_callback(exc_type, exc_value, exc_traceback)
def signal_done(self):
"""
Notify down stream components that we're done.
+45 -34
View File
@@ -9,8 +9,13 @@ from signal import SIGHUP, SIGINT
from collections import OrderedDict, Counter
from zipline.protocol import CONTROL_PROTOCOL, CONTROL_FRAME, \
CONTROL_UNFRAME, CONTROL_STATES, INVALID_CONTROL_FRAME \
from zipline.protocol import (
CONTROL_PROTOCOL,
CONTROL_FRAME,
CONTROL_UNFRAME,
CONTROL_STATES,
INVALID_CONTROL_FRAME
)
from zipline.utils.protocol_utils import ndict
@@ -248,6 +253,11 @@ class Controller(object):
self.router.bind(self.route_socket)
self.router.setsockopt(zmq.LINGER, 0)
# -- Exception Out --
# ===================
self.ex_out = self.context.socket(self.zmq.PUSH)
self.ex_out.connect(self.exception_socket)
poller = self.zmq.Poller()
poller.register(self.router, self.zmq.POLLIN)
#poller.register(self.cancel, self.zmq.POLLIN)
@@ -379,6 +389,7 @@ class Controller(object):
"""
if not self.send_sighup:
log.warning("Skipping SIGINT")
return
ppid = os.getpid()
log.warning("Sending SIGINT")
os.kill(ppid, SIGINT)
@@ -417,8 +428,7 @@ class Controller(object):
log.info('New component %r' % component)
for component in bad:
self.fail(component)
self.timed_out(component)
for component in missing:
@@ -465,22 +475,16 @@ class Controller(object):
# Epic Fail Handling
# ------------------
def fail_universal(self):
# TODO: this requires higher order functionality
log.error('Component missed heartbeat, Monitor shutting down system')
self.kill()
def fail(self, component):
def timed_out(self, component):
if self.state is CONTROL_STATES.TERMINATE:
return
universal = self.fail_universal
fail_handlers = { }
if component in (self.topology - self.finished) or self.freeform:
log.warning('Component "%s" missed heartbeat' % component)
self.tracked.remove(component)
fail_handlers.get(component, universal)()
# we treat a time out as a severe failure, and
# conduct a rapid shutdown
self.kill()
# -------------------
# Completion Handling
@@ -494,26 +498,16 @@ class Controller(object):
# --------------
# Error Handling
# --------------
def exception_universal(self):
"""
Shutdown the system on failure.
"""
log.error('System in exception state, shutting down')
def exception(self, component, exception_data):
self.error_replay[(component, time.time())] = exception_data
log.error('Component in exception state: %s. Shutting down system and sending exception data to listeners.'\
% component)
# Send the exception message out to listeners.
self.ex_out.send(exception_data)
# An exception in one component is treated as a hard
# failure, and we conduct a rapid shutdown.
self.kill()
def exception(self, component, failure):
universal = self.exception_universal
exception_handlers = { }
if component in self.topology or self.freeform:
self.error_replay[(component, time.time())] = failure
log.error('Component in exception state: %s' % component)
exception_handlers.get(component, universal)()
else:
raise UnknownChatter(component)
# -----------------
# Protocol Handling
# -----------------
@@ -560,7 +554,19 @@ class Controller(object):
# A component is telling us it failed, and how
if id is CONTROL_PROTOCOL.EXCEPTION:
self.exception(identity, status)
# status should be a msgpack emitted from
# EXCEPTION_FRAME
try:
exception_data = status
self.exception(identity, exception_data)
except:
# if an exception occurs when we try to handle
# the exception, signal the parent that we need
# to go down
# TODO: should we attempt to call self.exception?
log.exception("Unexpected exception sending exception data")
self.kill()
return
# A component is telling us its done with work and won't
@@ -615,6 +621,8 @@ class Controller(object):
log.info('Component Log for -- %s --:\n%s' % (component, error))
def kill(self):
"""Aggressively exit the whole zipline.
"""
if self.state is CONTROL_STATES.TERMINATE:
return
@@ -622,6 +630,9 @@ class Controller(object):
self.send_hardkill()
self.state = CONTROL_STATES.TERMINATE
self.alive = False
# send burrito an interrupt, instructing it to kill all
# child processes assocated with this zipline.
time.sleep(3)
self.signal_interrupt()
def shutdown(self):
+9 -6
View File
@@ -144,7 +144,7 @@ class PerformanceTracker(object):
"""
def __init__(self, trading_environment):
def __init__(self, trading_environment, sid_list):
self.trading_environment = trading_environment
self.trading_day = datetime.timedelta(hours = 6, minutes = 30)
@@ -164,7 +164,6 @@ class PerformanceTracker(object):
self.txn_count = 0
self.event_count = 0
self.last_dict = None
self.order_log = []
self.exceeded_max_loss = False
self.results_socket = None
@@ -198,9 +197,14 @@ class PerformanceTracker(object):
keep_transactions = True
)
def set_sids(self, sid_list):
for sid in sid_list:
self.cumulative_performance.positions[sid] = Position(sid)
self.todays_performance.positions[sid] = Position(sid)
def update(self, event):
event.perf_message = self.process_event()
event.portfolio = self.get_portfolio
return event
def get_portfolio(self):
return self.cumulative_performance.as_portfolio()
@@ -238,8 +242,6 @@ class PerformanceTracker(object):
'cumulative_risk_metrics' : self.cumulative_risk_metrics.to_dict()
}
def log_order(self, order):
self.order_log.append(order)
def process_event(self, event):
@@ -288,6 +290,8 @@ class PerformanceTracker(object):
# calculate progress of test
self.progress = self.day_count / self.total_days
# TODO!!!!
# Output results
if self.results_socket:
msg = zp.PERF_FRAME(self.to_dict())
@@ -584,7 +588,6 @@ class PerformancePeriod(object):
return positions
#
def get_positions_list(self):
positions = []
for sid, pos in self.positions.iteritems():
+9 -26
View File
@@ -10,9 +10,8 @@ log = logbook.Logger('Transaction Simulator')
class TransactionSimulator(object):
def __init__(self, style=SIMULATION_STYLE.PARTIAL_VOLUME):
self.open_orders = {}
self.order_count = 0
def __init__(self, open_orders, style=SIMULATION_STYLE.PARTIAL_VOLUME):
self.open_orders = open_orders
self.txn_count = 0
self.trade_window = datetime.timedelta(seconds=30)
self.orderTTL = datetime.timedelta(days=1)
@@ -27,28 +26,12 @@ class TransactionSimulator(object):
elif style == SIMULATION_STYLE.NOOP:
self.apply_trade_to_open_orders = self.simulate_noop
def add_open_order(self, event):
# Orders are captured in a buffer by sid. No calculations are done here.
# Amount is explicitly converted to an int.
# Orders of amount zero are ignored.
self.order_count += 1
event.amount = int(event.amount)
if event.amount == 0:
log = "requested to trade zero shares of {sid}".format(
sid=event.sid
)
log.debug(log)
return
if not self.open_orders.has_key(event.sid):
self.open_orders[event.sid] = []
# set the filled property to zero
event.filled = 0
self.open_orders[event.sid].append(event)
def update(self, event):
event.txn = None
if event.type == zp.DATASOURCE_TYPE.TRADE:
event.txn = self.apply_trade_to_open_orders(event)
return event
def simulate_buy_all(self, event):
txn = self.create_transaction(
event.sid,
@@ -81,7 +64,7 @@ class TransactionSimulator(object):
txn = self.create_transaction(
event.sid,
amount,
event.price + 0.10,
event.price + 0.10, # Magic constant?
event.dt,
direction
)
+38
View File
@@ -0,0 +1,38 @@
from zipline.gens.composites import
if __name__ == "__main__":
filter = [1,2,3,4]
#Set up source a. One hour between events.
args_a = tuple()
kwargs_a = {'sids' : [1,2,3,4],
'start' : datetime(2012,6,6,0),
'delta' : timedelta(minutes = ),
'filter' : filter
}
#Set up source b. One day between events.
args_b = tuple()
kwargs_b = {'sids' : [1,2,3,4],
'start' : datetime(2012,6,6,0),
'delta' : timedelta(days = 1),
'filter' : filter
}
#Set up source c. One minute between events.
args_c = tuple()
kwargs_c = {'sids' : [1,2,3,4],
'start' : datetime(2012,6,6,0),
'delta' : timedelta(minutes = 1),
'filter' : filter
}
sources = (SpecificEquityTrades,) * 4
source_args = (args_a, args_b, args_c, args_d)
source_kwargs = (kwargs_a, kwargs_b, kwargs_c, kwargs_d)
# Generate our expected source_ids.
zip_args = zip(source_args, source_kwargs)
expected_ids = ["SpecificEquityTrades" + hash_args(*args, **kwargs)
for args, kwargs in zip_args]
# Pipe our sources into sort.
sort_out = date_sorted_sources(sources, source_args, source_kwargs)
+24 -27
View File
@@ -13,7 +13,7 @@ def trade_simulation_client(stream_in, algo, environment, sim_style):
our algorithm's simulated universe. Results are fed to the user's
algorithm, which directly inserts transactions into the
TransactionSimulator's order book.
TransactionSimulator maintains a dictionary from sids to the
unfulfilled orders placed by the user's algorithm. As trade
events arrive, if the algorithm has open orders against the
@@ -37,7 +37,7 @@ def trade_simulation_client(stream_in, algo, environment, sim_style):
overwritten so that only the most recent snapshot of the universe
is sent to the algo.
"""
#============
# Algo Setup
#============
@@ -46,10 +46,10 @@ def trade_simulation_client(stream_in, algo, environment, sim_style):
# reference it from within the user's algorithm.
sids = algo.get_sid_filter()
open_orders = {}
for sid in sids:
open_orders[sids] = []
# Closure to pass into the user's algo to allow placing orders
# into the txn_sim's dict of open orders.
def order(self, sid, amount):
@@ -70,51 +70,48 @@ def trade_simulation_client(stream_in, algo, environment, sim_style):
return
open_orders[sid].append(event)
# Set the algo's order method.
algo.set_order(order)
# Provide a logbook logging interface to user code.
algo.set_logger(Logger("Algolog"))
# Call user-defined initialize method before we process any
# events.
algo.initialize()
# Pipe the in stream into the transaction simulator.
# Creates a TRANSACTION field on the event containing transaction
# information if we filled any pending orders on the event's sid.
# TRANSACTION is None if we didn't fill any orders.
with_txns = stateful_transform(stream_in,
TransactionSimulator,
open_orders,
style = sim_style)
with_txns = stateful_transform(
stream_in,
TransactionSimulator,
open_orders,
style = sim_style
)
# Pipe the events with transactions to perf. This will remove the
# TRANSACTION field added by TransactionSimulator and replace it with
# a portfolio object to be passed to the user's algorithm. Also adds
# a PERF_MESSAGE field which is usually none, but contains an update
# message once per day.
with_portfolio_and_perf_msg = stateful_transform(stream_with_txns,
PerformanceTracker,
trading_environment,
sids)
with_portfolio_and_perf_msg = stateful_transform(
stream_with_txns,
PerformanceTracker,
trading_environment,
sids
)
# Batch the event stream by dt to be processed by the user's algo.
# Will also set the PERF_MESSAGE field if the batch contains a perf
# message.
batches = batcher(with_portfolio_and_perf_msg)
for batch in batches:
algo.handle_data(batch.data)
if batch.perf_message:
yield perf_message
+5 -3
View File
@@ -2,15 +2,17 @@ import zmq
import zipline.protocol as zp
def gen_from_zmq(poller, unframe):
def gen_from_zmq(poller, unframe, namestring):
"""
A generator that takes an initialized zmq poller and yields
messages from the poller until it gets a zp.CONTROL_PROTOCOL.DONE.
"""
while True:
message = poller.recv()
if message = zp.CONTROL_PROTOCOL.DONE:
yield "DONE"
# Done protocol should now be a message type so that
# done messages can also have source_ids.
if message.type == zp.CONTROL_PROTOCOL.DONE:
yield done_message(message.source_id)
break
else:
yield unframe(message)
+9 -3
View File
@@ -132,9 +132,15 @@ class SimulatedTrading(object):
}
self.con = Controller(
# pub socket
sockets[5],
# route socket
sockets[6],
self.send_sighup
# exception socket to match tradesimclient's result
# socket, because we want to relay exceptions to the
# same listener
config['results_socket'],
send_sighup=self.send_sighup
)
self.started = False
@@ -146,7 +152,8 @@ class SimulatedTrading(object):
self.trading_client = TradeSimulationClient(
self.trading_environment,
self.sim_style,
config['results_socket']
config['results_socket'],
self.algorithm
)
self.add_client(self.trading_client)
@@ -158,7 +165,6 @@ class SimulatedTrading(object):
self.sim.register_controller( self.con )
self.trading_client.set_algorithm(self.algorithm)
@staticmethod
def create_test_zipline(**config):