From 7bb4e754b04d0549669ee7d9062e6cfb470d12a1 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Thu, 9 Aug 2012 16:21:49 -0400 Subject: [PATCH 01/46] added logs --- zipline/gens/sort.py | 8 +++++++- zipline/gens/transform.py | 7 ++++++- zipline/protocol.py | 6 ++++++ 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/zipline/gens/sort.py b/zipline/gens/sort.py index 9755da74..c76db032 100644 --- a/zipline/gens/sort.py +++ b/zipline/gens/sort.py @@ -1,12 +1,16 @@ """ -Generator version of Feed. +Sorting generator. """ +import logbook + from collections import deque from zipline import ndict from zipline.gens.utils import \ assert_datasource_unframe_protocol, \ assert_sort_protocol +log = logbook.Logger('Sorting') + def date_sort(stream_in, source_ids): """ A generator that takes a generator and a list of source_ids. We @@ -23,6 +27,7 @@ def date_sort(stream_in, source_ids): sources[id] = deque() # Process incoming streams. + log.info('Sorting first message') for message in stream_in: # Incoming messages should be the output of DATASOURCE_UNFRAME. assert_datasource_unframe_protocol(message), \ @@ -46,6 +51,7 @@ def date_sort(stream_in, source_ids): assert len(queue) == 1, "Bad queue in date_sort on exit: %s" % queue assert queue[0].dt == "DONE", \ "Bad last message in date_sort on exit: %s" % queue + log.info('Successfully finished Sorting') def ready(sources): """ diff --git a/zipline/gens/transform.py b/zipline/gens/transform.py index 651c337d..babf4e3d 100644 --- a/zipline/gens/transform.py +++ b/zipline/gens/transform.py @@ -3,6 +3,7 @@ Generator versions of transforms. """ import types import pytz +import logbook from copy import deepcopy from datetime import datetime, timedelta @@ -15,6 +16,8 @@ from zipline.utils.tradingcalendar import trading_days_between from zipline.gens.utils import assert_sort_unframe_protocol, \ assert_transform_protocol, hash_args +log = logbook.Logger('Transform') + class Passthrough(object): FORWARDER = True """ @@ -72,6 +75,7 @@ class StatefulTransform(object): # Create the string associated with this generator's output. self.namestring = tnfm_class.__name__ + hash_args(*args, **kwargs) + log.info('StatefulTransform [%s] initialized' % self.namestring) def get_hash(self): return self.namestring @@ -82,7 +86,7 @@ class StatefulTransform(object): def _gen(self, stream_in): # IMPORTANT: Messages may contain pointers that are shared with # other streams, so we only manipulate copies. - + log.info('Running StatefulTransform [%s]' % self.get_hash()) for message in stream_in: # allow upstream generators to yield None to avoid @@ -143,6 +147,7 @@ class StatefulTransform(object): out_message.dt = message_copy.dt yield out_message + log.info('Finished StatefulTransform [%s]' % self.get_hash()) class EventWindow: """ Abstract base class for transform classes that calculate iterative diff --git a/zipline/protocol.py b/zipline/protocol.py index dd46bd60..fce16403 100644 --- a/zipline/protocol.py +++ b/zipline/protocol.py @@ -654,7 +654,13 @@ def tuple_to_date(date_tuple): dt = dt.replace(microsecond = micros, tzinfo = pytz.utc) return dt +# Datasource type should completely determine the other fields of a +# message with its type. DATASOURCE_TYPE = Enum( + 'AS_TRADED_EQUITY', + 'MERGE', + 'SPLIT', + 'DIVIDEND', 'TRADE', 'EMPTY', 'DONE' From 4bfbaa8c26853f54f794ffb8fed0dee3bbd6bae0 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Fri, 10 Aug 2012 01:18:55 -0400 Subject: [PATCH 02/46] minor protocol change --- zipline/gens/transform.py | 3 ++- zipline/protocol.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/zipline/gens/transform.py b/zipline/gens/transform.py index babf4e3d..60d0be85 100644 --- a/zipline/gens/transform.py +++ b/zipline/gens/transform.py @@ -105,7 +105,8 @@ class StatefulTransform(object): # FORWARDER flag means we want to keep all original # values, plus append tnfm_id and tnfm_value. Used for # preserving the original event fields when our output - # will be fed into a merge. + # will be fed into a merge. Currently only Passthrough + # uses this flag. if self.forward_all: out_message = message_copy out_message.tnfm_id = self.namestring diff --git a/zipline/protocol.py b/zipline/protocol.py index fce16403..19ee8c80 100644 --- a/zipline/protocol.py +++ b/zipline/protocol.py @@ -658,7 +658,7 @@ def tuple_to_date(date_tuple): # message with its type. DATASOURCE_TYPE = Enum( 'AS_TRADED_EQUITY', - 'MERGE', + 'MERGER', 'SPLIT', 'DIVIDEND', 'TRADE', From 0506baeae59f0272532f90dc7cca3a2d36a64be4 Mon Sep 17 00:00:00 2001 From: fawce Date: Fri, 10 Aug 2012 20:59:53 -0400 Subject: [PATCH 03/46] fixed integration tests --- zipline/gens/tradesimulation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index 5e56b4f0..a049a5b8 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -87,7 +87,7 @@ class TradeSimulationClient(object): self.sids ) with_portfolio = perf_tracker.transform(with_filled_orders) - + # Pass the messages from perf along with the trading client's # state into the algorithm for simulation. We provide the # trading client so that the algorithm can place new orders @@ -97,7 +97,7 @@ class TradeSimulationClient(object): ordering_client.state, self.algo, ) - + # The algorithm will yield a daily_results message (as # calculated by the performance tracker) at the end of each # day. It will also yield a risk report at the end of the From 500e6a9f2478a2c1bfc7a26480f7b334bab0334a Mon Sep 17 00:00:00 2001 From: fawce Date: Sun, 12 Aug 2012 00:48:00 -0400 Subject: [PATCH 04/46] fixed #563 --- zipline/finance/risk.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/zipline/finance/risk.py b/zipline/finance/risk.py index 8e52ad3d..5c42cf69 100644 --- a/zipline/finance/risk.py +++ b/zipline/finance/risk.py @@ -307,16 +307,15 @@ class RiskMetrics(): for i in xrange(7): if(self.treasury_curves.has_key(self.end_date + i * one_day)): curve = self.treasury_curves[self.end_date + i * one_day] - break - if curve: - self.treasury_curve = curve - rate = self.treasury_curve[self.treasury_duration] - #1month note data begins in 8/2001, so we can use 3month instead. - if rate == None and self.treasury_duration == '1month': - rate = self.treasury_curve['3month'] - if rate != None: - return rate * (td.days + 1) / 365 + if curve: + self.treasury_curve = curve + rate = self.treasury_curve[self.treasury_duration] + #1month note data begins in 8/2001, so we can use 3month instead. + if rate == None and self.treasury_duration == '1month': + rate = self.treasury_curve['3month'] + if rate != None: + return rate * (td.days + 1) / 365 message = "no rate for end date = {dt} and term = {term}. Check \ that date doesn't exceed treasury history range." From 19f00e867b4646ae648f8bdaddde500f49a4f8fc Mon Sep 17 00:00:00 2001 From: fawce Date: Sun, 12 Aug 2012 00:53:23 -0400 Subject: [PATCH 05/46] indentation bug --- zipline/finance/risk.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/zipline/finance/risk.py b/zipline/finance/risk.py index 5c42cf69..66825eba 100644 --- a/zipline/finance/risk.py +++ b/zipline/finance/risk.py @@ -307,23 +307,22 @@ class RiskMetrics(): for i in xrange(7): if(self.treasury_curves.has_key(self.end_date + i * one_day)): curve = self.treasury_curves[self.end_date + i * one_day] - - if curve: self.treasury_curve = curve rate = self.treasury_curve[self.treasury_duration] #1month note data begins in 8/2001, so we can use 3month instead. if rate == None and self.treasury_duration == '1month': rate = self.treasury_curve['3month'] + if rate != None: return rate * (td.days + 1) / 365 - message = "no rate for end date = {dt} and term = {term}. Check \ - that date doesn't exceed treasury history range." - message = message.format( - dt=self.end_date, - term=self.treasury_duration - ) - raise Exception(message) + message = "no rate for end date = {dt} and term = {term}. Check \ + that date doesn't exceed treasury history range." + message = message.format( + dt=self.end_date, + term=self.treasury_duration + ) + raise Exception(message) From f87025446e3ac8f752d0d3d939293ad0de8cde73 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Mon, 13 Aug 2012 10:14:56 -0400 Subject: [PATCH 06/46] fix DONE bug --- zipline/gens/tradesimulation.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index a049a5b8..e64ba86a 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -221,12 +221,17 @@ class AlgorithmSimulator(object): if event.dt == "DONE": if self.this_snapshot_dt: - # stop iteration happened - # mid-snapshot, so we have a universe - # snapshot that is not yet processed - # by the algorithm. + # StopIteration happened mid-snapshot, so we + # have a universe snapshot that is not yet + # processed by the algorithm. self.simulate_current_snapshot() - break + + # Break out of the loop, causing us to raise + # StopIteration This needs to be outside the check + # on self.this_snapshot_dt or else getting a DONE + # immediately after a snapshot finishes will cause + # type errors. + break # This should only happen for the first event we run. if self.simulation_dt == None: From f819ed67ec160945856647cad12da087ee19af2c Mon Sep 17 00:00:00 2001 From: fawce Date: Mon, 13 Aug 2012 17:05:14 -0400 Subject: [PATCH 07/46] bugfixes --- zipline/gens/tradegens.py | 50 +++++++++++++++++++++++++++------------ zipline/lines.py | 15 ++++++++---- zipline/utils/factory.py | 15 +++++++----- 3 files changed, 54 insertions(+), 26 deletions(-) diff --git a/zipline/gens/tradegens.py b/zipline/gens/tradegens.py index 2e8f6bea..801d98f4 100644 --- a/zipline/gens/tradegens.py +++ b/zipline/gens/tradegens.py @@ -5,18 +5,22 @@ and zipline development import random import pytz -from itertools import chain, cycle, ifilter, izip +from itertools import chain, cycle, ifilter, izip, repeat from datetime import datetime, timedelta from zipline.gens.utils import hash_args, create_trade def date_gen(start = datetime(2006, 6, 6, 12, tzinfo=pytz.utc), delta = timedelta(minutes = 1), - count = 100): + count = 100, + repeats = None): """ Utility to generate a stream of dates. """ - return (start + (i * delta) for i in xrange(count)) + if repeats: + return (start + (i * delta) for i in xrange(count) for n in xrange(repeats)) + else: + return (start + (i * delta) for i in xrange(count)) def mock_prices(count, rand = False): """ @@ -74,6 +78,7 @@ class SpecificEquityTrades(object): self.sids = kwargs.get('sids', [1, 2]) self.start = kwargs.get('start', datetime(2008, 6, 6, 15, tzinfo = pytz.utc)) self.delta = kwargs.get('delta', timedelta(minutes = 1)) + self.concurrent = kwargs.get('concurrent', False) # Default to None for event_list and filter. self.event_list = kwargs.get('event_list') @@ -103,20 +108,35 @@ class SpecificEquityTrades(object): # Set up iterators for each expected field. else: - dates = date_gen(count=self.count, - start=self.start, - delta=self.delta - ) - prices = mock_prices(self.count) - volumes = mock_volumes(self.count) - sids = cycle(self.sids) + if self.concurrent: + # in this context the count is the number of + # trades per sid, not the total. + dates = date_gen( + count=self.count, + start=self.start, + delta=self.delta, + repeats=len(self.sids), + ) - # Combine the iterators into a single iterator of arguments - arg_gen = izip(sids, prices, volumes, dates) - # Convert argument packages into events. - unfiltered = (create_trade(*args, source_id = self.get_hash()) - for args in arg_gen) + else: + + dates = date_gen( + count=self.count, + start=self.start, + delta=self.delta + ) + + prices = mock_prices(self.count) + volumes = mock_volumes(self.count) + + sids = cycle(self.sids) + # Combine the iterators into a single iterator of arguments + arg_gen = izip(sids, prices, volumes, dates) + + # Convert argument packages into events. + unfiltered = (create_trade(*args, source_id = self.get_hash()) + for args in arg_gen) # If we specified a sid filter, filter out elements that don't # match the filter. diff --git a/zipline/lines.py b/zipline/lines.py index 9bc8b3ac..1c3a558f 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -269,7 +269,12 @@ class SimulatedTrading(object): of StatefulTransform objects. """ assert isinstance(config, dict) - sid = config['sid'] + sid_list = config.get('sid_list') + if not sid_list: + sid = config.get('sid') + sid_list = [sid] + + concurrent_trades = config.get('concurrent_trades', False) #-------------------- # Trading Environment @@ -307,17 +312,17 @@ class SimulatedTrading(object): #------------------- # Trade Source #------------------- - sids = [sid] - #------------------- if config.has_key('trade_source'): trade_source = config['trade_source'] else: trade_source = factory.create_daily_trade_source( - sids, + sid_list, trade_count, - trading_environment + trading_environment, + concurrent=concurrent_trades ) + #------------------- # Transforms #------------------- diff --git a/zipline/utils/factory.py b/zipline/utils/factory.py index e3d92443..cf2168fb 100644 --- a/zipline/utils/factory.py +++ b/zipline/utils/factory.py @@ -174,7 +174,7 @@ def create_random_trade_source(sid, trade_count, trading_environment): return source -def create_daily_trade_source(sids, trade_count, trading_environment): +def create_daily_trade_source(sids, trade_count, trading_environment, concurrent=False): """ creates trade_count trades for each sid in sids list. @@ -189,11 +189,12 @@ def create_daily_trade_source(sids, trade_count, trading_environment): sids, trade_count, timedelta(days=1), - trading_environment + trading_environment, + concurrent=concurrent ) -def create_minutely_trade_source(sids, trade_count, trading_environment): +def create_minutely_trade_source(sids, trade_count, trading_environment, concurrent=False): """ creates trade_count trades for each sid in sids list. @@ -208,10 +209,11 @@ def create_minutely_trade_source(sids, trade_count, trading_environment): sids, trade_count, timedelta(minutes=1), - trading_environment + trading_environment, + concurrent=concurrent ) -def create_trade_source(sids, trade_count, trade_time_increment, trading_environment): +def create_trade_source(sids, trade_count, trade_time_increment, trading_environment, concurrent=False): args = tuple() kwargs = { @@ -219,7 +221,8 @@ def create_trade_source(sids, trade_count, trade_time_increment, trading_environ 'sids' : sids, 'start' : trading_environment.first_open, 'delta' : trade_time_increment, - 'filter' : sids + 'filter' : sids, + 'concurrent' : concurrent } source = SpecificEquityTrades(*args, **kwargs) From b323c6c8f5e75b47e50f0f25916f296cc90ef5a2 Mon Sep 17 00:00:00 2001 From: fawce Date: Mon, 13 Aug 2012 17:18:00 -0400 Subject: [PATCH 08/46] fixed indentation error --- zipline/gens/tradegens.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/zipline/gens/tradegens.py b/zipline/gens/tradegens.py index 801d98f4..8855b868 100644 --- a/zipline/gens/tradegens.py +++ b/zipline/gens/tradegens.py @@ -127,15 +127,15 @@ class SpecificEquityTrades(object): delta=self.delta ) - prices = mock_prices(self.count) - volumes = mock_volumes(self.count) + prices = mock_prices(self.count) + volumes = mock_volumes(self.count) - sids = cycle(self.sids) - # Combine the iterators into a single iterator of arguments - arg_gen = izip(sids, prices, volumes, dates) + sids = cycle(self.sids) + # Combine the iterators into a single iterator of arguments + arg_gen = izip(sids, prices, volumes, dates) - # Convert argument packages into events. - unfiltered = (create_trade(*args, source_id = self.get_hash()) + # Convert argument packages into events. + unfiltered = (create_trade(*args, source_id = self.get_hash()) for args in arg_gen) # If we specified a sid filter, filter out elements that don't From 8a131027a6bbeac08bbbb312744f8e1d5f2a4e7a Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Tue, 14 Aug 2012 10:58:13 -0400 Subject: [PATCH 09/46] whitespace Conflicts: zipline/gens/tradegens.py --- zipline/gens/tradegens.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zipline/gens/tradegens.py b/zipline/gens/tradegens.py index 8855b868..e099091d 100644 --- a/zipline/gens/tradegens.py +++ b/zipline/gens/tradegens.py @@ -136,7 +136,7 @@ class SpecificEquityTrades(object): # Convert argument packages into events. unfiltered = (create_trade(*args, source_id = self.get_hash()) - for args in arg_gen) + for args in arg_gen) # If we specified a sid filter, filter out elements that don't # match the filter. From b5054293da3bb8e104615f839c55b88b32783450 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Tue, 14 Aug 2012 12:24:57 -0400 Subject: [PATCH 10/46] change protocol to match new datasources --- zipline/gens/utils.py | 5 +++++ zipline/protocol.py | 16 +++++++++++++--- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/zipline/gens/utils.py b/zipline/gens/utils.py index 1ac85df6..b8ee6ac4 100644 --- a/zipline/gens/utils.py +++ b/zipline/gens/utils.py @@ -67,12 +67,17 @@ def hash_args(*args, **kwargs): return hasher.hexdigest() def create_trade(sid, price, amount, datetime, source_id = "test_factory"): + row = ndict({ 'source_id' : source_id, 'type' : DATASOURCE_TYPE.TRADE, 'sid' : sid, 'dt' : datetime, 'price' : price, + 'close' : price, + 'open' : price, + 'low' : price * .95, + 'high' : price * 1.05, 'volume' : amount }) return row diff --git a/zipline/protocol.py b/zipline/protocol.py index 19ee8c80..a2cf50c3 100644 --- a/zipline/protocol.py +++ b/zipline/protocol.py @@ -132,6 +132,7 @@ from utils.date_utils import EPOCH, UN_EPOCH, epoch_now # ----------------------- PRODUCTION_PREFIXES = ['PERF', 'RISK', 'EXCEPTION','CANCEL','DONE', 'LOG'] +PRICE_FIELDS = ['price', 'open', 'close', 'high', 'low'] INVALID_CONTROL_FRAME = FrameExceptionFactory('CONTROL') @@ -428,21 +429,26 @@ def TRADE_FRAME(event): assert isinstance(event, ndict) assert event.type == DATASOURCE_TYPE.TRADE assert isinstance(event.sid, int) - assert isinstance(event.price, numbers.Real) + for field in PRICE_FIELDS: + assert isinstance(event[field], numbers.Real) assert isinstance(event.volume, numbers.Integral) PACK_DATE(event) return msgpack.dumps(tuple([ event.sid, event.price, + event.open, + event.close, + event.high, + event.low, event.volume, event.dt, - event.type, + event.type ])) def TRADE_UNFRAME(msg): try: packed = msgpack.loads(msg) - sid, price, volume, dt, source_type = packed + sid, price, open, close, high, low, volume, dt, source_type = packed assert isinstance(sid, int) assert isinstance(price, numbers.Real) @@ -450,6 +456,10 @@ def TRADE_UNFRAME(msg): rval = ndict({ 'sid' : sid, 'price' : price, + 'open' : open, + 'close' : close, + 'high' : high, + 'low' : low, 'volume' : volume, 'dt' : dt, 'type' : source_type From 7ce3667bc2401a420e7aaa45fee970f52c025c37 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Tue, 14 Aug 2012 15:28:02 -0400 Subject: [PATCH 11/46] hard coded one-day warmup --- zipline/finance/trading.py | 20 ++++++++++++++++++++ zipline/gens/tradesimulation.py | 18 +++++++++++++++++- 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index 1d82bc66..2fa99d9b 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -184,6 +184,8 @@ class TradingEnvironment(object): self.first_open = self.calculate_first_open() self.last_close = self.calculate_last_close() + self.prior_day_open = self.calculate_prior_day_open() + def calculate_first_open(self): """ Finds the first trading day on or after self.period_start. @@ -197,6 +199,24 @@ class TradingEnvironment(object): first_open = self.set_NYSE_time(first_open, 9, 30) return first_open + def calculate_prior_day_open(self): + """ + Finds the first trading day open that falls at least a day + before period_start. + """ + one_day = datetime.timedelta(days=1) + first_open = self.period_start - one_day + + if first_open <= self.trading_days[0]: + log.warn("Cannot calculate prior day open.") + return self.period_start + + while not self.is_trading_day(first_open): + first_open = first_open - one_day + + first_open = self.set_NYSE_time(first_open, 9, 30) + return first_open + def calculate_last_close(self): """ Finds the last trading day on or before self.period_end diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index e64ba86a..ba474f3b 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -55,6 +55,9 @@ class TradeSimulationClient(object): self.style = sim_style self.algo_sim = None + self.warmup_start = self.environment.prior_day_open + self.algo_start = self.environment.first_open + def get_hash(self): """ There should only ever be one TSC in the system. @@ -96,6 +99,7 @@ class TradeSimulationClient(object): with_portfolio, ordering_client.state, self.algo, + self.algo_start ) # The algorithm will yield a daily_results message (as @@ -107,7 +111,7 @@ class TradeSimulationClient(object): class AlgorithmSimulator(object): - def __init__(self, stream_in, order_book, algo): + def __init__(self, stream_in, order_book, algo, algo_start): self.stream_in = stream_in @@ -121,6 +125,7 @@ class AlgorithmSimulator(object): self.algo = algo self.sids = algo.get_sid_filter() + self.algo_start = algo_start # Monkey patch the user algorithm to place orders in the # TransactionSimulator's order book. @@ -212,6 +217,17 @@ class AlgorithmSimulator(object): self.algo.initialize() for event in self.stream_in: + + # We're still in the warmup period. Use the event to + # update our universe, but don't start a snapshot or + # pass anything to handle_data. Discard any + # perf messages. + if event.dt < self.algo_start: + self.update_universe(event) + if event.perf_message: + log.info("Discarding perf message because we're in warmup.") + continue + # Yield any perf messages received to be relayed back to # the browser. From 473dafcfb6e7d6c0862f4f1f1b97283c16b66bc0 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Tue, 14 Aug 2012 16:31:00 -0400 Subject: [PATCH 12/46] fix DONE bug when all events are in the warmup period --- zipline/gens/tradesimulation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index ba474f3b..22afa34f 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -222,7 +222,7 @@ class AlgorithmSimulator(object): # update our universe, but don't start a snapshot or # pass anything to handle_data. Discard any # perf messages. - if event.dt < self.algo_start: + if event.dt != 'DONE' and event.dt < self.algo_start: self.update_universe(event) if event.perf_message: log.info("Discarding perf message because we're in warmup.") From 76ff8e93eb3c259a2273e07849207dd0dadba5ca Mon Sep 17 00:00:00 2001 From: Eddie Hebert Date: Tue, 14 Aug 2012 16:50:24 -0400 Subject: [PATCH 13/46] Adds source id when specifying events. So that when we add events for testing, we populate source_ids. --- zipline/gens/tradegens.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/zipline/gens/tradegens.py b/zipline/gens/tradegens.py index e099091d..0842b3bd 100644 --- a/zipline/gens/tradegens.py +++ b/zipline/gens/tradegens.py @@ -101,10 +101,16 @@ class SpecificEquityTrades(object): def get_hash(self): return self.__class__.__name__ + "-" + self.arg_string + def update_source_id(self, gen): + for event in gen: + event.source_id = self.get_hash() + yield event + def create_fresh_generator(self): if self.event_list: - unfiltered = (event for event in self.event_list) + event_gen = (event for event in self.event_list) + unfiltered = self.update_source_id(event_gen) # Set up iterators for each expected field. else: From 7f41a9435e8a9e02c9e466a418b98bb4158dd268 Mon Sep 17 00:00:00 2001 From: fawce Date: Tue, 14 Aug 2012 22:46:58 -0400 Subject: [PATCH 14/46] new tests and support for logging primitives (bugfix) --- tests/test_logger.py | 40 +++++++++++++++++++++++++++++++++++++ zipline/protocol.py | 13 +++++++----- zipline/utils/log_utils.py | 1 + zipline/utils/test_utils.py | 7 ++++++- 4 files changed, 55 insertions(+), 6 deletions(-) diff --git a/tests/test_logger.py b/tests/test_logger.py index 9c7eb685..5a9b8e31 100644 --- a/tests/test_logger.py +++ b/tests/test_logger.py @@ -1,7 +1,15 @@ import logging +import logbook import uuid +import zmq + +from zipline import ndict from zipline.utils.logger import configure_logging, tail +from zipline.utils.log_utils import ZeroMQLogHandler + +from zipline.utils.test_utils import create_receiver, drain_receiver + from unittest2 import TestCase @@ -20,3 +28,35 @@ class LoggerTestCase(TestCase): last_line = tail(logfile, window=1) logged_msg = last_line.split(" - ")[1] self.assertEqual(test_msg, logged_msg) + + + def test_zmq_handler(self): + socket_addr = 'tcp://127.0.0.1:10000' + ctx = zmq.Context() + socket_push = ctx.socket(zmq.PUSH) + socket_push.connect(socket_addr) + recv = create_receiver(socket_addr, ctx) + zmq_out = ZeroMQLogHandler( + socket = socket_push, + filter = lambda r, h: r.channel in ['test zmq logger'], + context=ctx, + #bubble=False + ) + + log = logbook.Logger('test zmq logger') + x = ndict({}) + x.a = 1 + ex = example(133) + with zmq_out.threadbound(): + log.info(ex.num) + + + output, _ = drain_receiver(recv, count=1) + self.assertEqual(output[-1]['prefix'], 'LOG') + self.assertTrue(isinstance(output[-1]['payload']['msg'], basestring)) + + +class example(object): + + def __init__(self, num): + self.num = num diff --git a/zipline/protocol.py b/zipline/protocol.py index a2cf50c3..f9c1326b 100644 --- a/zipline/protocol.py +++ b/zipline/protocol.py @@ -437,8 +437,8 @@ def TRADE_FRAME(event): event.sid, event.price, event.open, - event.close, - event.high, + event.close, + event.high, event.low, event.volume, event.dt, @@ -665,11 +665,11 @@ def tuple_to_date(date_tuple): return dt # Datasource type should completely determine the other fields of a -# message with its type. +# message with its type. DATASOURCE_TYPE = Enum( - 'AS_TRADED_EQUITY', + 'AS_TRADED_EQUITY', 'MERGER', - 'SPLIT', + 'SPLIT', 'DIVIDEND', 'TRADE', 'EMPTY', @@ -736,6 +736,9 @@ def LOG_FRAME(payload): assert payload.has_key('msg'),\ "LOG_FRAME with no message" + # truncation will only work with strings and msgpack will + # preserve primitives. + payload['msg'] = str(payload['msg']) return BT_UPDATE_FRAME('LOG', payload) diff --git a/zipline/utils/log_utils.py b/zipline/utils/log_utils.py index f9fbc57c..ea1abf18 100644 --- a/zipline/utils/log_utils.py +++ b/zipline/utils/log_utils.py @@ -89,6 +89,7 @@ class ZeroMQLogHandler(Handler): def __init__(self, socket=None, level=NOTSET, filter=None, bubble=False, context=None, fds = LOG_FIELDS, extra_fds = LOG_EXTRA_FIELDS): Handler.__init__(self, level, filter, bubble) + try: import zmq except ImportError: diff --git a/zipline/utils/test_utils.py b/zipline/utils/test_utils.py index 3a9858a3..2ebbae73 100644 --- a/zipline/utils/test_utils.py +++ b/zipline/utils/test_utils.py @@ -91,11 +91,13 @@ def create_receiver(socket_addr, ctx): return receiver -def drain_receiver(receiver): +def drain_receiver(receiver, count=None): output = [] transaction_count = 0 + msg_counter = 0 while True: msg = receiver.recv() + msg_counter += 1 update = zp.BT_UPDATE_UNFRAME(msg) output.append(update) if update['prefix'] == 'PERF': @@ -106,6 +108,9 @@ def drain_receiver(receiver): elif update['prefix'] == 'DONE': break + if count and msg_counter >= count: + break + receiver.close() del receiver From c4cb7d41f2cd3869d8b21f6de183270d5a0458f1 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Wed, 15 Aug 2012 12:15:41 -0400 Subject: [PATCH 15/46] added tests for stddev --- tests/test_transforms.py | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/tests/test_transforms.py b/tests/test_transforms.py index 491df1b5..69ab0ac1 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -15,6 +15,7 @@ from zipline.gens.tradegens import SpecificEquityTrades from zipline.gens.transform import StatefulTransform, EventWindow from zipline.gens.vwap import VWAP from zipline.gens.mavg import MovingAverage +from zipline.gens.stddev import MovingStandardDev from zipline.gens.returns import Returns import zipline.utils.factory as factory @@ -70,6 +71,7 @@ class EventWindowTestCase(TestCase): delta = timedelta(minutes = 5), days = None ) + now = utcnow() # 15 dates, increasing in 1 minute increments. @@ -99,6 +101,7 @@ class EventWindowTestCase(TestCase): delta = None, days = 1 ) + dates = ([self.pre_open]*3) dates += ([self.mid_day]*3) dates += ([self.post_close]*3) @@ -239,11 +242,12 @@ class FinanceTransformsTestCase(TestCase): fields = ['price', 'volume'], delta = timedelta(days = 2), ) + transformed = list(mavg.transform(self.source)) # Output values. tnfm_prices = [message.tnfm_value.price for message in transformed] tnfm_volumes = [message.tnfm_value.volume for message in transformed] - + # "Hand-calculated" values expected_prices = [ ((10.0) / 1.0), @@ -264,3 +268,29 @@ class FinanceTransformsTestCase(TestCase): assert tnfm_prices == expected_prices assert tnfm_volumes == expected_volumes + + def test_moving_stddev(self): + + trade_history = factory.create_trade_history( + 133, + [10.0, 15.0, 13.0, 12.0], + [100, 100, 100, 100], + timedelta(days=1), + self.trading_environment + ) + + stddev = StatefulTransform( + MovingStandardDev, + market_aware = False, + delta = timedelta(days = 2), + ) + self.source = SpecificEquityTrades(event_list=trade_history) + + transformed = list(stddev.transform(self.source)) + + vals = [message.tnfm_value for message in transformed] + + assert vals == [0.0, 2.5, 1.0, 0.5] + + + From 496851ef72bcce6e17407f6d9e802b4673a9177b Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Wed, 15 Aug 2012 12:16:38 -0400 Subject: [PATCH 16/46] added stddev transform --- zipline/gens/stddev.py | 100 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 zipline/gens/stddev.py diff --git a/zipline/gens/stddev.py b/zipline/gens/stddev.py new file mode 100644 index 00000000..144f3ace --- /dev/null +++ b/zipline/gens/stddev.py @@ -0,0 +1,100 @@ +from numbers import Number +from datetime import datetime, timedelta +from collections import defaultdict +from math import sqrt + +from zipline import ndict +from zipline.gens.transform import EventWindow + +class MovingStandardDev(object): + """ + Class that maintains a dicitonary from sids to + MovingStandardDevWindows. For each sid, we maintain a the + standard deviation of all events falling within the specified + window. + """ + + def __init__(self, market_aware, days = None, delta = None): + + self.market_aware = market_aware + + self.delta = delta + self.days = days + + # Market-aware mode only works with full-day windows. + if self.market_aware: + assert self.days and not self.delta,\ + "Market-aware mode only works with full-day windows." + + # Non-market-aware mode requires a timedelta. + else: + assert self.delta and not self.days, \ + "Non-market-aware mode requires a timedelta." + + # No way to pass arguments to the defaultdict factory, so we + # need to define a method to generate the correct EventWindows. + self.sid_windows = defaultdict(self.create_window) + + def create_window(self): + """ + Factory method for self.sid_windows. + """ + return MovingStandardDevWindow( + self.market_aware, + self.days, + self.delta + ) + + def update(self, event): + """ + Update the event window for this event's sid. Return an ndict + from tracked fields to moving averages. + """ + # This will create a new EventWindow if this is the first + # message for this sid. + window = self.sid_windows[event.sid] + window.update(event) + return window.get_stddev() + +class MovingStandardDevWindow(EventWindow): + """ + Iteratively calculates standard deviation for a particular sid + over a given time window. The expected functionality of this + class is to be instantiated inside a MovingStandardDev. + """ + + def __init__(self, market_aware, days, delta): + + # Call the superclass constructor to set up base EventWindow + # infrastructure. + EventWindow.__init__(self, market_aware, days, delta) + + self.sum = 0.0 + self.sum_sqr = 0.0 + + def handle_add(self, event): + assert event.has_key('price') + assert isinstance(event.price, Number) + + self.sum += event.price + self.sum_sqr += event.price ** 2 + + def handle_remove(self, event): + assert event.has_key('price') + assert isinstance(event.price, Number) + + self.sum -= event.price + self.sum_sqr -= event.price ** 2 + + def get_stddev(self): + + # Stddev is 0 if we have only one event. len(self) is + # provided by EventWindow superclass. + if len(self) == 1: + return 0.0 + + else: + average = self.sum /len(self.ticks) + variance = (self.sum_sqr - self.sum*average) / len(self) + stddev = sqrt(variance) + return stddev From 9537067138a9849e28b1be87ad157d7e002b95b4 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Wed, 15 Aug 2012 13:31:15 -0400 Subject: [PATCH 17/46] fix comment in EventWindow --- zipline/gens/transform.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/zipline/gens/transform.py b/zipline/gens/transform.py index 60d0be85..51475f45 100644 --- a/zipline/gens/transform.py +++ b/zipline/gens/transform.py @@ -159,8 +159,9 @@ class EventWindow: from the window. Subclass these methods along with init(*args, **kwargs) to calculate metrics over the window. - The market_aware flag is used to toggle whether the eventwindow - calculates + If the market_aware flag is True, the EventWindow drops old events + based on the number of elapsed trading days between newest and oldest. + Otherwise old events are dropped based on a raw timedelta. See zipline/gens/mavg.py and zipline/gens/vwap.py for example implementations of moving average and volume-weighted average From 1659cfff454fe3eff646479fa15747868b2923c2 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Wed, 15 Aug 2012 16:16:36 -0400 Subject: [PATCH 18/46] fixed bessel correction in stddev --- tests/test_transforms.py | 28 +++++++++++++++++++++++----- zipline/gens/stddev.py | 14 +++++++------- 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/tests/test_transforms.py b/tests/test_transforms.py index 69ab0ac1..e515d725 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -1,4 +1,5 @@ import pytz +import numpy from datetime import timedelta, datetime from collections import defaultdict @@ -270,27 +271,44 @@ class FinanceTransformsTestCase(TestCase): assert tnfm_volumes == expected_volumes def test_moving_stddev(self): - trade_history = factory.create_trade_history( 133, [10.0, 15.0, 13.0, 12.0], [100, 100, 100, 100], - timedelta(days=1), + timedelta(hours = 1), self.trading_environment ) stddev = StatefulTransform( MovingStandardDev, market_aware = False, - delta = timedelta(days = 2), + delta = timedelta(minutes = 150), ) self.source = SpecificEquityTrades(event_list=trade_history) transformed = list(stddev.transform(self.source)) - + vals = [message.tnfm_value for message in transformed] + + expected = [ + None, + numpy.std([10.0, 15.0], ddof = 1), + numpy.std([10.0, 15.0, 13.0], ddof = 1), + numpy.std([15.0, 13.0, 12.0], ddof = 1), + ] - assert vals == [0.0, 2.5, 1.0, 0.5] + # numpy has odd rounding behavior, cf. + # http://docs.scipy.org/doc/numpy/reference/generated/numpy.std.html + for v1, v2 in zip(vals, expected): + + if v1 == None: + assert v2 == None + continue + assert round(v1, 5) == round(v2, 5) + + + + diff --git a/zipline/gens/stddev.py b/zipline/gens/stddev.py index 144f3ace..1f46429a 100644 --- a/zipline/gens/stddev.py +++ b/zipline/gens/stddev.py @@ -88,13 +88,13 @@ class MovingStandardDevWindow(EventWindow): def get_stddev(self): - # Stddev is 0 if we have only one event. len(self) is - # provided by EventWindow superclass. - if len(self) == 1: - return 0.0 + # Sample standard deviation is undefined for a single event or + # no events. + if len(self) <= 1: + return None else: - average = self.sum /len(self.ticks) - variance = (self.sum_sqr - self.sum*average) / len(self) - stddev = sqrt(variance) + average = self.sum /len(self) + s_squared = (self.sum_sqr - self.sum*average) / (len(self) - 1) + stddev = sqrt(s_squared) return stddev From c911dca4d248e6c250340d4a6d1b5bcb10adcb0d Mon Sep 17 00:00:00 2001 From: fawce Date: Thu, 16 Aug 2012 13:01:22 -0400 Subject: [PATCH 19/46] added a kwarg to run in blocking --- zipline/utils/test_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zipline/utils/test_utils.py b/zipline/utils/test_utils.py index 2ebbae73..99175c63 100644 --- a/zipline/utils/test_utils.py +++ b/zipline/utils/test_utils.py @@ -117,8 +117,8 @@ def drain_receiver(receiver, count=None): return output, transaction_count -def assert_single_position(test, zipline): - output, transaction_count = drain_zipline(test, zipline) +def assert_single_position(test, zipline, blocking=False): + output, transaction_count = drain_zipline(test, zipline, p_blocking=blocking) test.assertEqual( test.zipline_test_config['order_count'], From 5a70c7464aaf40ebbfd59eae350840c15f77a868 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Fri, 17 Aug 2012 11:07:12 -0400 Subject: [PATCH 20/46] update the comments in tradesim --- zipline/gens/tradesimulation.py | 39 +++++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index 22afa34f..13d60194 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -1,3 +1,5 @@ +import signal + from logbook import Logger, Processor from datetime import datetime, timedelta @@ -13,10 +15,19 @@ from zipline.gens.utils import hash_args log = Logger('Trade Simulation') +class AlgoTimeoutException(Exception): + def __init__(self): + pass + +def handle_init_timeout(signum, frame): + log.error("Algorithm timed out during initialize.") + raise + + class TradeSimulationClient(object): """ - Generator that takes the expected output of a merge, a user - algorithm, a trading environment, and a simulator style as + Generator-style class that takes the expected output of a merge, a + user algorithm, a trading environment, and a simulator style as arguments. Pipes the merge stream through a TransactionSimulator and a PerformanceTracker, which keep track of the current state of our algorithm's simulated universe. Results are fed to the user's @@ -24,7 +35,7 @@ class TradeSimulationClient(object): TransactionSimulator's order book. TransactionSimulator maintains a dictionary from sids to the - unfulfilled orders placed by the user's algorithm. As trade + as-yet unfilled orders placed by the user's algorithm. As trade events arrive, if the algorithm has open orders against the trade's sid, the simulator will fill orders up to 25% of market cap. Applied transactions are added to a txn field on the event @@ -40,9 +51,9 @@ class TradeSimulationClient(object): performance report, which is appended to event's perf_report field. - Fully processed events are run through a batcher generator, which - batches together events with the same dt field into a single event - to be fed to the algo. The portfolio object is repeatedly + Fully processed events are fed to AlgorithmSimulator, which + batches together events with the same dt field into a single + snapshot to be fed to the algo. The portfolio object is repeatedly overwritten so that only the most recent snapshot of the universe is sent to the algo. """ @@ -54,13 +65,14 @@ class TradeSimulationClient(object): self.environment = environment self.style = sim_style self.algo_sim = None - + self.warmup_start = self.environment.prior_day_open self.algo_start = self.environment.first_open def get_hash(self): """ - There should only ever be one TSC in the system. + There should only ever be one TSC in the system, so + we don't bother passing args into the hash. """ return self.__class__.__name__ + hash_args() @@ -92,9 +104,9 @@ class TradeSimulationClient(object): with_portfolio = perf_tracker.transform(with_filled_orders) # Pass the messages from perf along with the trading client's - # state into the algorithm for simulation. We provide the - # trading client so that the algorithm can place new orders - # into the client's order book. + # state into the algorithm for simulation. We provide a + # pointer to the ordering client's internal state so that the + # algorithm can place new orders into the client's order book. self.algo_sim = AlgorithmSimulator( with_portfolio, ordering_client.state, @@ -273,7 +285,9 @@ class AlgorithmSimulator(object): def update_current_snapshot(self, event): """ - Update our current snapshot of the universe. Call handle_data if + Update our current snapshot of the universe. If event.dt doesn't + match our current snapshot's dt, we simulate the current snapshot + before processing the event. """ # The new event matches our snapshot dt. Just update the # universe and move on. @@ -326,3 +340,4 @@ class AlgorithmSimulator(object): # Update our knowledge of this event's sid for field in event.keys(): self.universe[event.sid][field] = event[field] + From 7696393179a42a0b804e12fad0b6f3aad7bea6e4 Mon Sep 17 00:00:00 2001 From: fawce Date: Fri, 17 Aug 2012 18:36:05 -0400 Subject: [PATCH 21/46] removed spammy log lines. --- zipline/gens/sort.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/zipline/gens/sort.py b/zipline/gens/sort.py index c76db032..d8ebd173 100644 --- a/zipline/gens/sort.py +++ b/zipline/gens/sort.py @@ -27,12 +27,11 @@ def date_sort(stream_in, source_ids): sources[id] = deque() # Process incoming streams. - log.info('Sorting first message') for message in stream_in: # Incoming messages should be the output of DATASOURCE_UNFRAME. assert_datasource_unframe_protocol(message), \ "Bad message in date_sort: %s" % message - + # Only allow messages from sources we expect. assert message.source_id in sources, "Unexpected source: %s" % message @@ -45,13 +44,12 @@ def date_sort(stream_in, source_ids): message = pop_oldest(sources) assert_sort_protocol(message) yield message - + # We should have only a done message left in each queue. for queue in sources.itervalues(): assert len(queue) == 1, "Bad queue in date_sort on exit: %s" % queue assert queue[0].dt == "DONE", \ "Bad last message in date_sort on exit: %s" % queue - log.info('Successfully finished Sorting') def ready(sources): """ From a68a48b62e6bb31534a15a29eb17eae65ac0c23c Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Sat, 18 Aug 2012 17:07:20 -0400 Subject: [PATCH 22/46] removed deprecated test and refactored tradesimulation time compression logic --- tests/test_finance.py | 30 ----- zipline/gens/tradesimulation.py | 210 +++++++++++++++----------------- 2 files changed, 98 insertions(+), 142 deletions(-) diff --git a/tests/test_finance.py b/tests/test_finance.py index e5f26240..94a98da9 100644 --- a/tests/test_finance.py +++ b/tests/test_finance.py @@ -121,36 +121,6 @@ class FinanceTestCase(TestCase): zipline = SimulatedTrading.create_test_zipline(**self.zipline_test_config) assert_single_position(self, zipline) - #@timed(DEFAULT_TIMEOUT) - def test_sid_filter(self): - # Ensure the algorithm's filter prevents events from arriving. - # create a test algorithm whose filter will not match any of the - # trade events sourced inside the zipline. - order_amount = 100 - order_count = 100 - no_match_sid = 222 - test_algo = TestAlgorithm( - no_match_sid, - order_amount, - order_count - ) - - self.zipline_test_config['trade_count'] = 200 - self.zipline_test_config['algorithm'] = test_algo - - zipline = SimulatedTrading.create_test_zipline( - **self.zipline_test_config - ) - output, transaction_count = drain_zipline(self, zipline) - - #check that the algorithm received no events - self.assertEqual( - 0, - transaction_count, - "The algorithm should not receive any events due to filtering." - ) - - # TODO: write tests for short sales # TODO: write a test to do massive buying or shorting. diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index 13d60194..6a5e88b2 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -1,11 +1,12 @@ -import signal from logbook import Logger, Processor from datetime import datetime, timedelta from numbers import Integral +from itertools import groupby from zipline import ndict +from zipline.utils import heartbeat from zipline.gens.transform import StatefulTransform from zipline.finance.trading import TransactionSimulator @@ -16,13 +17,7 @@ from zipline.gens.utils import hash_args log = Logger('Trade Simulation') class AlgoTimeoutException(Exception): - def __init__(self): - pass - -def handle_init_timeout(signum, frame): - log.error("Algorithm timed out during initialize.") - raise - + pass class TradeSimulationClient(object): """ @@ -118,12 +113,17 @@ class TradeSimulationClient(object): # calculated by the performance tracker) at the end of each # day. It will also yield a risk report at the end of the # simulation. + for message in self.algo_sim: yield message class AlgorithmSimulator(object): - def __init__(self, stream_in, order_book, algo, algo_start): + def __init__(self, + stream_in, + order_book, + algo, + algo_start): self.stream_in = stream_in @@ -140,15 +140,15 @@ class AlgorithmSimulator(object): self.algo_start = algo_start # Monkey patch the user algorithm to place orders in the - # TransactionSimulator's order book. + # TransactionSimulator's order book and use our logger. self.algo.set_order(self.order) - self.algo.set_logger(Logger("AlgoLog")) - + self.algolog = Logger("AlgoLog") + self.algo.set_logger(self.algolog) # ============== # Snapshot Setup # ============== - + # The algorithm's universe as of our most recent event. self.universe = ndict() @@ -159,7 +159,7 @@ class AlgorithmSimulator(object): # We don't have a datetime for the current snapshot until we # receive a message. self.simulation_dt = None - self.this_snapshot_dt = None + self.snapshot_dt = None # ============= # Logging Setup @@ -168,7 +168,7 @@ class AlgorithmSimulator(object): # Processor function for injecting the algo_dt into # user prints/logs. def inject_algo_dt(record): - record.extra['algo_dt'] = self.this_snapshot_dt + record.extra['algo_dt'] = self.snapshot_dt self.processor = Processor(inject_algo_dt) # This is a class, which is instantiated later @@ -225,110 +225,65 @@ class AlgorithmSimulator(object): # snapshot time to any log record generated. with self.processor.threadbound(), self.stdout_capture(Logger('Print'),''): + + #Set an alarm to go off if initialize takes more than 5 seconds. + signal.signal(signal.SIGALRM, self.handle_init_timeout) + signal.alarm(5) # Call the user's initialize method. self.algo.initialize() + # Deactivate the alarm. + signal.alarm(0) + signal.signal(signal.SIGALRM, signal.SIG_DFL) - for event in self.stream_in: + # Group together events with the same dt field. This depends on the + # events already being sorted. + for date, snapshot in groupby(self.stream_in, lambda e: e.dt): + + # Set the simulation date to be the first event we see. + # This should only occur once, at the start of the test. + if self.simulation_dt == None: + self.simulation_dt = date + + # Done message has the risk report, so we yield before exiting. + if date == 'DONE': + for event in snapshot: + yield event.perf_message # We're still in the warmup period. Use the event to - # update our universe, but don't start a snapshot or - # pass anything to handle_data. Discard any - # perf messages. - if event.dt != 'DONE' and event.dt < self.algo_start: - self.update_universe(event) - if event.perf_message: - log.info("Discarding perf message because we're in warmup.") - continue + # update our universe, but don't yield any perf messages, + # and don't send a snapshot to handle_data. + elif date < self.algo_start: + for event in snapshot: + del event['perf_message'] + self.update_universe(event) - # Yield any perf messages received to be relayed back to - # the browser. - - if event.perf_message: - yield event.perf_message - del event['perf_message'] - - if event.dt == "DONE": - if self.this_snapshot_dt: - # StopIteration happened mid-snapshot, so we - # have a universe snapshot that is not yet - # processed by the algorithm. - self.simulate_current_snapshot() - - # Break out of the loop, causing us to raise - # StopIteration This needs to be outside the check - # on self.this_snapshot_dt or else getting a DONE - # immediately after a snapshot finishes will cause - # type errors. - break - - # This should only happen for the first event we run. - if self.simulation_dt == None: - self.simulation_dt = event.dt - - # ====================== - # Time Compression Logic - # ====================== - - if self.this_snapshot_dt != None: - self.update_current_snapshot(event) - - # The algorithm has been missing events because it took - # too long processing. Update the universe with data from - # this event, then check if enough time has passed that we - # can start a new snapshot. + # The algo has taken so long to process events that + # its simulated time is later than the event time. + # Update the universe and yield any perf messages + # encountered, but don't call handle_data. + elif date < self.simulation_dt: + for event in snapshot: + # Only yield if we have something interesting to say. + if event.perf_message != None: + yield event.perf_message + # Delete the message before updating so we don't send it + # to the user. + del event['perf_message'] + self.update_universe(event) + + # Regular snapshot. Update the universe and send a snapshot + # to handle data. else: - self.update_universe(event) - if event.dt >= self.simulation_dt: - self.this_snapshot_dt = event.dt - - - - def update_current_snapshot(self, event): - """ - Update our current snapshot of the universe. If event.dt doesn't - match our current snapshot's dt, we simulate the current snapshot - before processing the event. - """ - # The new event matches our snapshot dt. Just update the - # universe and move on. - if event.dt == self.this_snapshot_dt: - self.update_universe(event) - - # The new event does not match our snapshot. - else: - self.simulate_current_snapshot() - - # Once we've finished simulating the old snapshot, - # we can update the universe with the new event. - self.update_universe(event) - - # The current event is later than the simulation time, - # which means the algorithm finished quickly enough to - # receive the new event. Start a new snapshot with this - # event's dt. - if event.dt >= self.simulation_dt: - self.this_snapshot_dt = event.dt - - # The algorithm spent enough time processing that it - # missed the new event. Wait to start a new snapshot until - # the events catch up to the algo's simulated dt. - else: - self.this_snapshot_dt = None - - def simulate_current_snapshot(self): - """ - Run the user's algo against our current snapshot and update the algo's - simulated time. - """ - start_tic = datetime.now() - self.algo.handle_data(self.universe) - stop_tic = datetime.now() - - # How long did you take? - delta = stop_tic - start_tic - - # Update the simulation time. - self.simulation_dt = self.this_snapshot_dt + delta + for event in snapshot: + # Only yield if we have something interesting to say. + if event.perf_message != None: + yield event.perf_message + del event['perf_message'] + + self.update_universe(event) + + # Send the current state of the universe to the user's algo. + self.simulate_snapshot(date) def update_universe(self, event): """ @@ -341,3 +296,34 @@ class AlgorithmSimulator(object): for field in event.keys(): self.universe[event.sid][field] = event[field] + # Ping every 10 seconds. Timeout after 9 pings. + @heartbeat(10, 9, self.handle_simulation_ping) + def simulate_snapshot(self, date): + """ + Run the user's algo against our current snapshot and update + the algo's simulated time. + """ + start_tic = datetime.now() + + self.algo.handle_data(self.universe) + stop_tic = datetime.now() + + # How long did you take? + delta = stop_tic - start_tic + + # Update the simulation time. + self.simulation_dt = date + delta + + def handle_init_timeout(self, signum, frame): + """ + Handler method for initialize timeout. + """ + log.error("Algorithm timed out during initialize.") + raise AlgoTimeoutException("More than 5 seconds in initialize.") + + def handle_simulation_ping(self, frame): + """ + Frame handler for decorated simulate_snapshot method. + """ + + From 58b027798a771178cad45be77f5afba8d4c5d84f Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Sat, 18 Aug 2012 17:08:41 -0400 Subject: [PATCH 23/46] added timeout and heartbeat decorators --- zipline/utils/timeout.py | 102 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 zipline/utils/timeout.py diff --git a/zipline/utils/timeout.py b/zipline/utils/timeout.py new file mode 100644 index 00000000..02b0903d --- /dev/null +++ b/zipline/utils/timeout.py @@ -0,0 +1,102 @@ +import signal + +from pprint import pprint as pp +from numbers import Number +from logbook import Logger + +class Timeout(Exception): + + def __init__(self, frame): + self.frame = frame + + +class timeout(object): + """ + Decorator to make a function raise TimeoutException if it spends + more than a specified number of seconds executing. + """ + + def __init__(self, seconds): + self.seconds = seconds + assert isinstance(seconds, Number), "Failed to specify a timeout." + assert seconds > 0, "Timeout must be greater than 0" + + def handler(self, signum, frame): + raise Timeout(frame) + + def __call__(self, fn): + + def wrapped(*args, **kwargs): + # Set the alarm. + signal.signal(signal.SIGALRM, self.handler) + signal.alarm(self.seconds) + try: + outval = fn(*args, **kwargs) + + # Deactivate the alarm once we're done so that the + # decorator doesn't have unexpected side-effects later. + # Note that this will still raise TimeoutException if the + # call to fn takes too long. + finally: + signal.alarm(0) + signal.signal(signal.SIGALRM, signal.SIG_DFL) + + # Return the value of fn if it finished before the alarm. This + # won't execute if the Timeout was raised. + return outval + return wrapped + +class heartbeat(object): + """ + Decorator to perform pseudo-heartbeat checks on a single-threaded + function. Calls frame_handler on the current stack frame of the + decorated function every ``interval`` seconds. After ``max_interval`` + intervals, raises MaxHeartBeats + """ + + def __init__(self, interval, max_intervals, frame_handler=None): + self.count = 0 + self.interval = interval + self.max_intervals = max_intervals + self.frame_handler = frame_handler + + def handler(self, signum, frame): + self.count += 1 + if self.frame_handler: + self.frame_handler(frame) + + if self.count > self.max_intervals: + raise Timeout(frame) + + def __call__(self, fn): + def wrapped(*args, **kwargs): + # Set a timer to call our handler every N seconds. + signal.signal(signal.SIGALRM, self.handler) + signal.setitimer(signal.ITIMER_REAL, self.interval, self.interval) + try: + outval = fn(*args, **kwargs) + + # Deactivate the timer once we're done so that the + # decorator doesn't have unexpected side-effects later. + finally: + signal.setitimer(signal.ITIMER_REAL, 0, 0) + signal.signal(signal.SIGALRM, signal.SIG_DFL) + + # Return the value of fn if it finished without tripping + # an exception. This won't execute if the Timeout or any + # other exception was raised by self.handle. + return outval + return wrapped + +if __name__ == "__main__": + import time + + def pframe_g(frame): + print frame.f_globals + + @heartbeat(1, 10, pframe_g) + def foo(): + for i in xrange(10000): + time.sleep(.1) + print i + foo() From 18a1c88145d0d1e472c02918c275c6325b40633b Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Sat, 18 Aug 2012 21:10:28 -0400 Subject: [PATCH 24/46] fix import signal --- zipline/gens/tradesimulation.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index 6a5e88b2..14669195 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -1,4 +1,4 @@ - +import signal from logbook import Logger, Processor from datetime import datetime, timedelta @@ -6,7 +6,7 @@ from numbers import Integral from itertools import groupby from zipline import ndict -from zipline.utils import heartbeat +from zipline.utils.timeout import heartbeat from zipline.gens.transform import StatefulTransform from zipline.finance.trading import TransactionSimulator @@ -225,7 +225,7 @@ class AlgorithmSimulator(object): # snapshot time to any log record generated. with self.processor.threadbound(), self.stdout_capture(Logger('Print'),''): - + #Set an alarm to go off if initialize takes more than 5 seconds. signal.signal(signal.SIGALRM, self.handle_init_timeout) signal.alarm(5) @@ -297,7 +297,7 @@ class AlgorithmSimulator(object): self.universe[event.sid][field] = event[field] # Ping every 10 seconds. Timeout after 9 pings. - @heartbeat(10, 9, self.handle_simulation_ping) + # @heartbeat(10, 9, self.handle_simulation_ping) def simulate_snapshot(self, date): """ Run the user's algo against our current snapshot and update @@ -325,5 +325,5 @@ class AlgorithmSimulator(object): """ Frame handler for decorated simulate_snapshot method. """ - + print 'foo' From 0ef89ee7049cbf37081ffbd254dcc18ca73d1047 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Sat, 18 Aug 2012 22:04:21 -0400 Subject: [PATCH 25/46] fix algo_dt logging --- zipline/gens/tradesimulation.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index 14669195..8784a4fc 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -303,6 +303,10 @@ class AlgorithmSimulator(object): Run the user's algo against our current snapshot and update the algo's simulated time. """ + # Needs to be set so that we inject the proper date into algo + # log/print lines. + self.snapshot_dt = date + start_tic = datetime.now() self.algo.handle_data(self.universe) From b5cb08ceedbfdeb4c91dde66cc729cfae3609314 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Sun, 19 Aug 2012 02:21:43 -0400 Subject: [PATCH 26/46] heartbeat and timeout usable as context managers --- zipline/utils/timeout.py | 97 +++++++++++++++++++++++++--------------- 1 file changed, 60 insertions(+), 37 deletions(-) diff --git a/zipline/utils/timeout.py b/zipline/utils/timeout.py index 02b0903d..ae844558 100644 --- a/zipline/utils/timeout.py +++ b/zipline/utils/timeout.py @@ -1,84 +1,109 @@ import signal +from functools import wraps + from pprint import pprint as pp from numbers import Number from logbook import Logger class Timeout(Exception): - def __init__(self, frame): + def __init__(self, frame, message=''): self.frame = frame + self.message = message - class timeout(object): """ - Decorator to make a function raise TimeoutException if it spends - more than a specified number of seconds executing. + Utility to make a function raise TimeoutException if it spends + more than a specified number of seconds executing. Can be used + as a decorator to apply a static timeout to a function, or as + a context manager to dynamically add a timeout to a code block. """ - def __init__(self, seconds): + def __init__(self, seconds, message=''): self.seconds = seconds + self.message = message assert isinstance(seconds, Number), "Failed to specify a timeout." assert seconds > 0, "Timeout must be greater than 0" def handler(self, signum, frame): - raise Timeout(frame) + raise Timeout(frame, self.message) def __call__(self, fn): - - def wrapped(*args, **kwargs): + + @wraps(fn) + def call_fn_with_timeout(*args, **kwargs): # Set the alarm. signal.signal(signal.SIGALRM, self.handler) - signal.alarm(self.seconds) + signal.setitimer(signal.ITIMER_REAL, self.seconds, 0) try: outval = fn(*args, **kwargs) # Deactivate the alarm once we're done so that the # decorator doesn't have unexpected side-effects later. - # Note that this will still raise TimeoutException if the + # Note that this will still raise Timeout if the # call to fn takes too long. finally: - signal.alarm(0) + signal.setitimer(signal.ITIMER_REAL, 0, 0) signal.signal(signal.SIGALRM, signal.SIG_DFL) # Return the value of fn if it finished before the alarm. This # won't execute if the Timeout was raised. return outval - return wrapped + return call_fn_with_timeout + + def __enter__(self): + # Set the alarm on entrance. + signal.signal(signal.SIGALRM, self.handler) + signal.setitimer(signal.ITIMER_REAL, self.seconds, 0) + + def __exit__(self, type, value, traceback): + # Deactivate the alarm on exit. This will re-raise + # any exceptions raised inside the with block. + signal.signal(signal.SIGALRM, self.handler) + signal.setitimer(signal.ITIMER_REAL, self.seconds, 0) class heartbeat(object): """ - Decorator to perform pseudo-heartbeat checks on a single-threaded + Utility to perform pseudo-heartbeat checks on a single-threaded function. Calls frame_handler on the current stack frame of the - decorated function every ``interval`` seconds. After ``max_interval`` - intervals, raises MaxHeartBeats + wrapped function every ``interval`` seconds. After ``max_interval`` + intervals, raises Timeout. Can be used either as a decorator or + a context manager. """ + def __init__(self, + interval, + max_intervals, + frame_handler=None, + timeout_message=''): - def __init__(self, interval, max_intervals, frame_handler=None): - self.count = 0 self.interval = interval self.max_intervals = max_intervals self.frame_handler = frame_handler + self.timeout_message = timeout_message + self.count = 0 def handler(self, signum, frame): self.count += 1 if self.frame_handler: - self.frame_handler(frame) + self.frame_handler(self.count, frame) - if self.count > self.max_intervals: - raise Timeout(frame) + if self.count >= self.max_intervals: + raise Timeout(frame, self.timeout_message) def __call__(self, fn): - def wrapped(*args, **kwargs): - # Set a timer to call our handler every N seconds. + + @wraps(fn) + def call_fn_with_heartbeat(*args, **kwargs): + # Set a timer to call our handler every ``interval`` seconds. signal.signal(signal.SIGALRM, self.handler) signal.setitimer(signal.ITIMER_REAL, self.interval, self.interval) try: outval = fn(*args, **kwargs) - # Deactivate the timer once we're done so that the - # decorator doesn't have unexpected side-effects later. finally: + # Deactivate the timer once we're done so that the + # decorator doesn't have unexpected side-effects later. signal.setitimer(signal.ITIMER_REAL, 0, 0) signal.signal(signal.SIGALRM, signal.SIG_DFL) @@ -86,17 +111,15 @@ class heartbeat(object): # an exception. This won't execute if the Timeout or any # other exception was raised by self.handle. return outval - return wrapped - -if __name__ == "__main__": - import time + return call_fn_with_heartbeat - def pframe_g(frame): - print frame.f_globals - - @heartbeat(1, 10, pframe_g) - def foo(): - for i in xrange(10000): - time.sleep(.1) - print i - foo() + def __enter__(self): + # Set a timer to call our handler every N seconds. + signal.signal(signal.SIGALRM, self.handler) + signal.setitimer(signal.ITIMER_REAL, self.interval, self.interval) + + def __exit__(self, type, value, traceback): + # Turn off the timer on exit. This will re-raise any exception raised + # during execution of the with-block + signal.setitimer(signal.ITIMER_REAL, 0, 0) + signal.signal(signal.SIGALRM, signal.SIG_DFL) From 69ac68af2ee2d0010f2dbab22f7413752466da9c Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Sun, 19 Aug 2012 02:22:16 -0400 Subject: [PATCH 27/46] heartbeating for handle_data --- tests/test_exception_handling.py | 35 +++++++++++++++++++- zipline/gens/tradesimulation.py | 57 +++++++++++++++----------------- zipline/test_algorithms.py | 50 ++++++++++++++++++++++++++++ 3 files changed, 110 insertions(+), 32 deletions(-) diff --git a/tests/test_exception_handling.py b/tests/test_exception_handling.py index d1561837..ac6339bc 100644 --- a/tests/test_exception_handling.py +++ b/tests/test_exception_handling.py @@ -3,7 +3,8 @@ import zmq from unittest2 import TestCase from collections import defaultdict -from zipline.test_algorithms import ExceptionAlgorithm, DivByZeroAlgorithm +from zipline.test_algorithms import ExceptionAlgorithm, DivByZeroAlgorithm, \ + InitializeTimeoutAlgorithm, TooMuchProcessingAlgorithm from zipline.finance.trading import SIMULATION_STYLE from zipline.core.devsimulator import AddressAllocator from zipline.lines import SimulatedTrading @@ -143,3 +144,35 @@ class ExceptionTestCase(TestCase): # make sure our path shortening is working self.assertEqual(payload['stack'][0]['filename'], '/zipline/lines.py') self.assertEqual(payload['stack'][-1]['filename'], '/zipline/test_algorithms.py') + + def test_initialize_timeout(self): + + self.zipline_test_config['algorithm'] = \ + InitializeTimeoutAlgorithm( + self.zipline_test_config['sid'] + ) + + zipline = SimulatedTrading.create_test_zipline( + **self.zipline_test_config + ) + output, _ = drain_zipline(self, zipline) + self.assertEqual(output[-1]['prefix'], 'EXCEPTION') + payload = output[-1]['payload'] + self.assertEqual(payload['name'],'Timeout') + self.assertEqual(payload['message'], 'Call to initialize timed out') + +# def test_heartbeat(self): + +# self.zipline_test_config['algorithm'] = \ +# TooMuchProcessingAlgorithm( +# self.zipline_test_config['sid'] +# ) +# zipline = SimulatedTrading.create_test_zipline( +# **self.zipline_test_config +# ) +# output, _ = drain_zipline(self, zipline) +# self.assertEqual(output[-1]['prefix'], 'EXCEPTION') +# payload = output[-1]['payload'] +# self.assertEqual(payload['name'],'Timeout') +# self.assertEqual(payload['message'], 'Too much time spent in handle_data call') + diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index 8784a4fc..811f99ac 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -6,7 +6,7 @@ from numbers import Integral from itertools import groupby from zipline import ndict -from zipline.utils.timeout import heartbeat +from zipline.utils.timeout import timeout, heartbeat, Timeout from zipline.gens.transform import StatefulTransform from zipline.finance.trading import TransactionSimulator @@ -16,8 +16,10 @@ from zipline.gens.utils import hash_args log = Logger('Trade Simulation') -class AlgoTimeoutException(Exception): - pass +# TODO: make these arguments rather than global constants +INIT_TIMEOUT = 5 +HEARTBEAT_INTERVAL = 1 # seconds +MAX_HEARTBEAT_INTERVALS = 15 class TradeSimulationClient(object): """ @@ -145,6 +147,21 @@ class AlgorithmSimulator(object): self.algolog = Logger("AlgoLog") self.algo.set_logger(self.algolog) + # Handler for heartbeats during calls to handle_data. + def log_heartbeats(beat_count, stackframe): + t = beat_count * HEARTBEAT_INTERVAL + warning = "handle_data has been processing for %i seconds" %t + self.algolog.warn(warning) + + # Context manager that calls log_heartbeats every HEARTBEAT_INTERVAL + # seconds, raising an exception after MAX_HEARTBEATS + self.heartbeat_monitor = heartbeat( + HEARTBEAT_INTERVAL, + MAX_HEARTBEAT_INTERVALS, + frame_handler=log_heartbeats, + timeout_message="Too much time spent in handle_data call" + ) + # ============== # Snapshot Setup # ============== @@ -223,17 +240,11 @@ class AlgorithmSimulator(object): # Capture any output of this generator to stdout and pipe it # to a logbook interface. Also inject the current algo # snapshot time to any log record generated. - with self.processor.threadbound(), self.stdout_capture(Logger('Print'),''): - #Set an alarm to go off if initialize takes more than 5 seconds. - signal.signal(signal.SIGALRM, self.handle_init_timeout) - signal.alarm(5) - # Call the user's initialize method. - self.algo.initialize() - # Deactivate the alarm. - signal.alarm(0) - signal.signal(signal.SIGALRM, signal.SIG_DFL) + # Call user's initialize method with a timeout. + with timeout(INIT_TIMEOUT, message="Call to initialize timed out"): + self.algo.initialize() # Group together events with the same dt field. This depends on the # events already being sorted. @@ -243,7 +254,7 @@ class AlgorithmSimulator(object): # This should only occur once, at the start of the test. if self.simulation_dt == None: self.simulation_dt = date - + # Done message has the risk report, so we yield before exiting. if date == 'DONE': for event in snapshot: @@ -296,8 +307,6 @@ class AlgorithmSimulator(object): for field in event.keys(): self.universe[event.sid][field] = event[field] - # Ping every 10 seconds. Timeout after 9 pings. - # @heartbeat(10, 9, self.handle_simulation_ping) def simulate_snapshot(self, date): """ Run the user's algo against our current snapshot and update @@ -308,8 +317,8 @@ class AlgorithmSimulator(object): self.snapshot_dt = date start_tic = datetime.now() - - self.algo.handle_data(self.universe) + with self.heartbeat_monitor: + self.algo.handle_data(self.universe) stop_tic = datetime.now() # How long did you take? @@ -317,17 +326,3 @@ class AlgorithmSimulator(object): # Update the simulation time. self.simulation_dt = date + delta - - def handle_init_timeout(self, signum, frame): - """ - Handler method for initialize timeout. - """ - log.error("Algorithm timed out during initialize.") - raise AlgoTimeoutException("More than 5 seconds in initialize.") - - def handle_simulation_ping(self, frame): - """ - Frame handler for decorated simulate_snapshot method. - """ - print 'foo' - diff --git a/zipline/test_algorithms.py b/zipline/test_algorithms.py index a7881fa8..f6bdfd4d 100644 --- a/zipline/test_algorithms.py +++ b/zipline/test_algorithms.py @@ -221,6 +221,56 @@ class DivByZeroAlgorithm(): def get_sid_filter(self): return [self.sid] +class InitializeTimeoutAlgorithm(): + def __init__(self, sid): + self.sid = sid + self.incr = 0 + + def initialize(self): + import time + from zipline.gens.tradesimulation import INIT_TIMEOUT + time.sleep(INIT_TIMEOUT + 1) + + def set_order(self, order_callable): + pass + + def set_logger(self, logger): + pass + + def set_portfolio(self, portfolio): + pass + + def handle_data(self, data): + pass + + def get_sid_filter(self): + return [self.sid] + +class TooMuchProcessingAlgorithm(): + def __init__(self, sid): + self.sid = sid + + def initialize(self): + pass + + def set_order(self, order_callable): + pass + + def set_logger(self, logger): + pass + + def set_portfolio(self, portfolio): + pass + + def handle_data(self, data): + # Unless we're running on some sort of + # supercomputer this will hit timeout. + for i in xrange(100000000): + self.foo = i + + def get_sid_filter(self): + return [self.sid] + class TimeoutAlgorithm(): def __init__(self, sid): From 8b67d6a45c4fb4a17ed6b42713c4aca968e0b0f3 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Sun, 19 Aug 2012 15:00:09 -0400 Subject: [PATCH 28/46] fix __exit__ for timeout --- zipline/utils/timeout.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zipline/utils/timeout.py b/zipline/utils/timeout.py index ae844558..0310a208 100644 --- a/zipline/utils/timeout.py +++ b/zipline/utils/timeout.py @@ -61,7 +61,7 @@ class timeout(object): # Deactivate the alarm on exit. This will re-raise # any exceptions raised inside the with block. signal.signal(signal.SIGALRM, self.handler) - signal.setitimer(signal.ITIMER_REAL, self.seconds, 0) + signal.setitimer(signal.ITIMER_REAL, 0, 0) class heartbeat(object): """ From ece5d45c95090e14351d3afbd1b362b65955dd4c Mon Sep 17 00:00:00 2001 From: fawce Date: Sun, 19 Aug 2012 23:34:52 -0400 Subject: [PATCH 29/46] bumping the timeout to 90s --- zipline/gens/tradesimulation.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index 811f99ac..806f1b36 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -18,8 +18,8 @@ log = Logger('Trade Simulation') # TODO: make these arguments rather than global constants INIT_TIMEOUT = 5 -HEARTBEAT_INTERVAL = 1 # seconds -MAX_HEARTBEAT_INTERVALS = 15 +HEARTBEAT_INTERVAL = 10 # seconds +MAX_HEARTBEAT_INTERVALS = 9 #count class TradeSimulationClient(object): """ @@ -62,7 +62,7 @@ class TradeSimulationClient(object): self.environment = environment self.style = sim_style self.algo_sim = None - + self.warmup_start = self.environment.prior_day_open self.algo_start = self.environment.first_open @@ -121,10 +121,10 @@ class TradeSimulationClient(object): class AlgorithmSimulator(object): - def __init__(self, - stream_in, - order_book, - algo, + def __init__(self, + stream_in, + order_book, + algo, algo_start): self.stream_in = stream_in @@ -156,8 +156,8 @@ class AlgorithmSimulator(object): # Context manager that calls log_heartbeats every HEARTBEAT_INTERVAL # seconds, raising an exception after MAX_HEARTBEATS self.heartbeat_monitor = heartbeat( - HEARTBEAT_INTERVAL, - MAX_HEARTBEAT_INTERVALS, + HEARTBEAT_INTERVAL, + MAX_HEARTBEAT_INTERVALS, frame_handler=log_heartbeats, timeout_message="Too much time spent in handle_data call" ) @@ -165,7 +165,7 @@ class AlgorithmSimulator(object): # ============== # Snapshot Setup # ============== - + # The algorithm's universe as of our most recent event. self.universe = ndict() @@ -249,12 +249,12 @@ class AlgorithmSimulator(object): # Group together events with the same dt field. This depends on the # events already being sorted. for date, snapshot in groupby(self.stream_in, lambda e: e.dt): - + # Set the simulation date to be the first event we see. # This should only occur once, at the start of the test. if self.simulation_dt == None: self.simulation_dt = date - + # Done message has the risk report, so we yield before exiting. if date == 'DONE': for event in snapshot: @@ -281,7 +281,7 @@ class AlgorithmSimulator(object): # to the user. del event['perf_message'] self.update_universe(event) - + # Regular snapshot. Update the universe and send a snapshot # to handle data. else: @@ -290,9 +290,9 @@ class AlgorithmSimulator(object): if event.perf_message != None: yield event.perf_message del event['perf_message'] - + self.update_universe(event) - + # Send the current state of the universe to the user's algo. self.simulate_snapshot(date) @@ -320,7 +320,7 @@ class AlgorithmSimulator(object): with self.heartbeat_monitor: self.algo.handle_data(self.universe) stop_tic = datetime.now() - + # How long did you take? delta = stop_tic - start_tic From 8fed50130c18a7ee19eb7428350ff038cab538b6 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Mon, 20 Aug 2012 18:04:30 -0400 Subject: [PATCH 30/46] reset heartbeat count on enter/exit --- zipline/utils/timeout.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/zipline/utils/timeout.py b/zipline/utils/timeout.py index 0310a208..2985222e 100644 --- a/zipline/utils/timeout.py +++ b/zipline/utils/timeout.py @@ -106,6 +106,7 @@ class heartbeat(object): # decorator doesn't have unexpected side-effects later. signal.setitimer(signal.ITIMER_REAL, 0, 0) signal.signal(signal.SIGALRM, signal.SIG_DFL) + self.count = 0 # Return the value of fn if it finished without tripping # an exception. This won't execute if the Timeout or any @@ -115,11 +116,13 @@ class heartbeat(object): def __enter__(self): # Set a timer to call our handler every N seconds. + self.count = 0 signal.signal(signal.SIGALRM, self.handler) signal.setitimer(signal.ITIMER_REAL, self.interval, self.interval) def __exit__(self, type, value, traceback): # Turn off the timer on exit. This will re-raise any exception raised # during execution of the with-block + self.count = 0 signal.setitimer(signal.ITIMER_REAL, 0, 0) signal.signal(signal.SIGALRM, signal.SIG_DFL) From 57f1c6056cea2856b45f1434e8d7e4a916f5ee77 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Mon, 20 Aug 2012 18:24:17 -0400 Subject: [PATCH 31/46] unit test for heartbeating --- tests/test_exception_handling.py | 39 +++++++++++++++++++++----------- zipline/gens/tradesimulation.py | 5 ++-- zipline/test_algorithms.py | 2 +- 3 files changed, 30 insertions(+), 16 deletions(-) diff --git a/tests/test_exception_handling.py b/tests/test_exception_handling.py index ac6339bc..f16111ee 100644 --- a/tests/test_exception_handling.py +++ b/tests/test_exception_handling.py @@ -9,6 +9,8 @@ from zipline.finance.trading import SIMULATION_STYLE from zipline.core.devsimulator import AddressAllocator from zipline.lines import SimulatedTrading from zipline.gens.transform import StatefulTransform +from zipline.gens.tradesimulation import HEARTBEAT_INTERVAL, \ + MAX_HEARTBEAT_INTERVALS from zipline.utils.test_utils import \ drain_zipline, \ @@ -161,18 +163,29 @@ class ExceptionTestCase(TestCase): self.assertEqual(payload['name'],'Timeout') self.assertEqual(payload['message'], 'Call to initialize timed out') -# def test_heartbeat(self): + def test_heartbeat(self): -# self.zipline_test_config['algorithm'] = \ -# TooMuchProcessingAlgorithm( -# self.zipline_test_config['sid'] -# ) -# zipline = SimulatedTrading.create_test_zipline( -# **self.zipline_test_config -# ) -# output, _ = drain_zipline(self, zipline) -# self.assertEqual(output[-1]['prefix'], 'EXCEPTION') -# payload = output[-1]['payload'] -# self.assertEqual(payload['name'],'Timeout') -# self.assertEqual(payload['message'], 'Too much time spent in handle_data call') + self.zipline_test_config['algorithm'] = \ + TooMuchProcessingAlgorithm( + self.zipline_test_config['sid'] + ) + zipline = SimulatedTrading.create_test_zipline( + **self.zipline_test_config + ) + output, _ = drain_zipline(self, zipline) + + # There should be a message for each hearbeat, plus a message + # for the final timeout. + assert len(output) == MAX_HEARTBEAT_INTERVALS + 1 + + # Assert that everything but the last message is a heartbeat log. + for message in output[0:-1]: + assert message['prefix'] == 'LOG' + assert message['payload']['func_name'] == 'log_heartbeats' + + # Assert that the last message is a timeout exception. + self.assertEqual(output[-1]['prefix'], 'EXCEPTION') + payload = output[-1]['payload'] + self.assertEqual(payload['name'],'Timeout') + self.assertEqual(payload['message'], 'Too much time spent in handle_data call') diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index 811f99ac..02a9e03c 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -61,7 +61,6 @@ class TradeSimulationClient(object): self.sids = algo.get_sid_filter() self.environment = environment self.style = sim_style - self.algo_sim = None self.warmup_start = self.environment.prior_day_open self.algo_start = self.environment.first_open @@ -150,8 +149,10 @@ class AlgorithmSimulator(object): # Handler for heartbeats during calls to handle_data. def log_heartbeats(beat_count, stackframe): t = beat_count * HEARTBEAT_INTERVAL - warning = "handle_data has been processing for %i seconds" %t + warning = "handle_data has been processing for %i seconds" %t + # Log the warning to our logs as well as the user's log output. self.algolog.warn(warning) + log.warn(warning) # Context manager that calls log_heartbeats every HEARTBEAT_INTERVAL # seconds, raising an exception after MAX_HEARTBEATS diff --git a/zipline/test_algorithms.py b/zipline/test_algorithms.py index f6bdfd4d..eb883469 100644 --- a/zipline/test_algorithms.py +++ b/zipline/test_algorithms.py @@ -265,7 +265,7 @@ class TooMuchProcessingAlgorithm(): def handle_data(self, data): # Unless we're running on some sort of # supercomputer this will hit timeout. - for i in xrange(100000000): + for i in xrange(1000000000): self.foo = i def get_sid_filter(self): From 0bd41c8db228d7cea031ce6fcc455e878a424c7f Mon Sep 17 00:00:00 2001 From: fawce Date: Mon, 20 Aug 2012 19:56:27 -0400 Subject: [PATCH 32/46] tweaked --- zipline/gens/tradesimulation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index 806f1b36..3fdcc565 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -18,8 +18,8 @@ log = Logger('Trade Simulation') # TODO: make these arguments rather than global constants INIT_TIMEOUT = 5 -HEARTBEAT_INTERVAL = 10 # seconds -MAX_HEARTBEAT_INTERVALS = 9 #count +HEARTBEAT_INTERVAL = 1 # seconds +MAX_HEARTBEAT_INTERVALS = 15 #count class TradeSimulationClient(object): """ From ceb1061d87da9042199de1ef8b36c579cf4f9a32 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Tue, 21 Aug 2012 14:39:32 -0400 Subject: [PATCH 33/46] properly break out of loop when we hit max drawdown --- zipline/gens/tradesimulation.py | 1 + 1 file changed, 1 insertion(+) diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index 3fdcc565..8c650dbc 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -259,6 +259,7 @@ class AlgorithmSimulator(object): if date == 'DONE': for event in snapshot: yield event.perf_message + break # We're still in the warmup period. Use the event to # update our universe, but don't yield any perf messages, From 1479adf519608800b7e85fc0ac1ca30817a11f08 Mon Sep 17 00:00:00 2001 From: fawce Date: Tue, 21 Aug 2012 19:26:46 +0000 Subject: [PATCH 34/46] fixed infinite looking loop in risk reporting, thanks to groupby --- zipline/finance/performance.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index 9fee208b..98ac8daf 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -166,6 +166,7 @@ class PerformanceTracker(object): self.event_count = 0 self.last_dict = None self.exceeded_max_loss = False + self.no_more_updates = False self.results_socket = None self.results_addr = None @@ -203,9 +204,12 @@ class PerformanceTracker(object): self.todays_performance.positions[sid] = Position(sid) def update(self, event): - if event.dt == "DONE": + if self.no_more_updates: + return zp.ndict({'dt':0}) + elif event.dt == "DONE": event.perf_message = self.handle_simulation_end() del event['TRANSACTION'] + self.no_more_updates = True return event elif self.exceeded_max_loss: # in case of max_loss, signal to downstream @@ -213,6 +217,7 @@ class PerformanceTracker(object): event.dt = "DONE" event.perf_message = self.handle_simulation_end() del event['TRANSACTION'] + self.no_more_updates = True return event else: event.perf_message = self.process_event(event) From 88e0afc7a036dc514a53b32451d8a9947742ca90 Mon Sep 17 00:00:00 2001 From: Jonathan Kamens Date: Mon, 20 Aug 2012 16:55:37 -0400 Subject: [PATCH 35/46] Require zipline tests to end with a DONE packet --- zipline/utils/test_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/zipline/utils/test_utils.py b/zipline/utils/test_utils.py index 99175c63..ae804426 100644 --- a/zipline/utils/test_utils.py +++ b/zipline/utils/test_utils.py @@ -119,6 +119,7 @@ def drain_receiver(receiver, count=None): def assert_single_position(test, zipline, blocking=False): output, transaction_count = drain_zipline(test, zipline, p_blocking=blocking) + test.assertEqual(output[-1]['prefix'], 'DONE') test.assertEqual( test.zipline_test_config['order_count'], From a2376cb87adebc719a045888da2063395d956bc0 Mon Sep 17 00:00:00 2001 From: Jonathan Kamens Date: Mon, 20 Aug 2012 20:56:40 -0400 Subject: [PATCH 36/46] test_finance.py no longer needs drain_zipline --- tests/test_finance.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_finance.py b/tests/test_finance.py index 94a98da9..5562b671 100644 --- a/tests/test_finance.py +++ b/tests/test_finance.py @@ -20,7 +20,6 @@ from zipline.finance.performance import PerformanceTracker from zipline.utils.protocol_utils import ndict from zipline.finance.trading import TransactionSimulator from zipline.utils.test_utils import \ - drain_zipline, \ setup_logger, \ teardown_logger,\ assert_single_position From a7bb4b53d85ee79a6609418fa5a1a65880f8a3ad Mon Sep 17 00:00:00 2001 From: Jonathan Kamens Date: Tue, 21 Aug 2012 13:53:03 -0400 Subject: [PATCH 37/46] Close the results socket to ensure messages are sent --- zipline/lines.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zipline/lines.py b/zipline/lines.py index 1c3a558f..d31c47d6 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -172,6 +172,8 @@ class SimulatedTrading(object): def close(self): log.info("Closing Simulation: {id}".format(id=self.sim_id)) + if self.results_socket: + self.results_socket.close() if self.proc and self.send_sighup: ppid = os.getppid() if self.success: From e43ded840b8d620f4e67efe0440b11f19c1a0ed8 Mon Sep 17 00:00:00 2001 From: Jonathan Kamens Date: Tue, 21 Aug 2012 13:53:59 -0400 Subject: [PATCH 38/46] Add delayed_signals class. --- tests/test_delayed_signals.py | 53 ++++++++++++++++++++++++++++++++ zipline/utils/delayed_signals.py | 42 +++++++++++++++++++++++++ 2 files changed, 95 insertions(+) create mode 100644 tests/test_delayed_signals.py create mode 100644 zipline/utils/delayed_signals.py diff --git a/tests/test_delayed_signals.py b/tests/test_delayed_signals.py new file mode 100644 index 00000000..d840bb0a --- /dev/null +++ b/tests/test_delayed_signals.py @@ -0,0 +1,53 @@ +import os +from signal import signal, SIGHUP, SIGINT +import time +from types import FrameType +import unittest + +from zipline.utils.delayed_signals import delayed_signals + +class DelayedSignals(unittest.TestCase): + def handler(self, signum, frame): + print "Got signal " + str(signum) + self.got[signum] = time.time() + self.assertTrue(isinstance(frame, FrameType)) + + def setUp(self): + signal(SIGHUP, self.handler) + signal(SIGINT, self.handler) + + def reset(self): + self.got = {} + + def test_delayed_signals(self): + self.reset() + with delayed_signals([SIGHUP]): + os.kill(os.getpid(), SIGHUP) + time.sleep(2) + self.assertTrue(self.got[SIGHUP]) + self.assertTrue(time.time() - self.got[SIGHUP] < 2) + + def test_immediate_signals(self): + self.reset() + os.kill(os.getpid(), SIGHUP) + time.sleep(2) + self.assertTrue(self.got[SIGHUP]) + self.assertTrue(time.time() - self.got[SIGHUP] > 1) + + def test_multiple_signals(self): + self.reset() + with delayed_signals([SIGHUP, SIGINT]): + os.kill(os.getpid(), SIGINT) + self.assertFalse(SIGHUP in self.got) + self.assertTrue(SIGINT in self.got) + + @delayed_signals([SIGHUP]) + def kill_and_sleep(self): + os.kill(os.getpid(), SIGHUP) + time.sleep(2) + + def test_decorator(self): + self.reset() + self.kill_and_sleep() + self.assertTrue(SIGHUP in self.got) + self.assertTrue(time.time() - self.got[SIGHUP] < 2) diff --git a/zipline/utils/delayed_signals.py b/zipline/utils/delayed_signals.py new file mode 100644 index 00000000..1341fa3e --- /dev/null +++ b/zipline/utils/delayed_signals.py @@ -0,0 +1,42 @@ +from functools import wraps +from signal import signal + +class delayed_signals(object): + """ + Utility to temporary intercept one or more signals while a function or code + block is executed, restore their signal handlers at the end of execution, + and invoke them if the signals were in fact received during execution. + + Can be used either as a decorator or a context manager. + + Pass in an iterable of signals to intercept. + """ + + def handler(self, signum, frame=None): + self.got.append([self.trapped.index(signum), frame]) + + def __init__(self, signals): + self.trapped = signals + self.orig_handlers = [] + self.got = [] + + def __enter__(self): + for sig in self.trapped: + self.orig_handlers.append(signal(sig, self.handler)) + + def __exit__(self, time, value, traceback): + for i in xrange(len(self.trapped)): + signal(self.trapped[i], self.orig_handlers[i]) + for intercepted in self.got: + i = intercepted[0] + signum = self.trapped[i] + frame = intercepted[1] + self.orig_handlers[i](signum, frame) + + def __call__(self, fn): + @wraps(fn) + def call_fn(*args, **kwargs): + with self: + outval = fn(*args, **kwargs) + return outval + return call_fn From edb7e9bbf5216ef9b0384245665f2ac3fd11debf Mon Sep 17 00:00:00 2001 From: Jonathan Kamens Date: Tue, 21 Aug 2012 16:02:33 -0400 Subject: [PATCH 39/46] Refactor delayed_signals class for readability --- zipline/utils/delayed_signals.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/zipline/utils/delayed_signals.py b/zipline/utils/delayed_signals.py index 1341fa3e..9ca8c811 100644 --- a/zipline/utils/delayed_signals.py +++ b/zipline/utils/delayed_signals.py @@ -13,25 +13,23 @@ class delayed_signals(object): """ def handler(self, signum, frame=None): - self.got.append([self.trapped.index(signum), frame]) + self.got.append({'signum': signum, 'frame': frame}) def __init__(self, signals): - self.trapped = signals - self.orig_handlers = [] + self.signals = signals + self.handlers = {} self.got = [] def __enter__(self): - for sig in self.trapped: - self.orig_handlers.append(signal(sig, self.handler)) + for signum in self.signals: + # signal() returns the old signal handler + self.handlers[signum] = signal(signum, self.handler) def __exit__(self, time, value, traceback): - for i in xrange(len(self.trapped)): - signal(self.trapped[i], self.orig_handlers[i]) - for intercepted in self.got: - i = intercepted[0] - signum = self.trapped[i] - frame = intercepted[1] - self.orig_handlers[i](signum, frame) + for signum, handler in self.handlers.items(): + signal(signum, handler) + for signum, frame in ((i['signum'], i['frame']) for i in self.got): + self.handlers[signum](signum, frame) def __call__(self, fn): @wraps(fn) From ad980e64852bee526bd29ad717545ebb0a4b7604 Mon Sep 17 00:00:00 2001 From: Jonathan Kamens Date: Tue, 21 Aug 2012 22:29:51 -0400 Subject: [PATCH 40/46] Close test log handler at end of test to fix file descriptor leak --- zipline/utils/test_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/zipline/utils/test_utils.py b/zipline/utils/test_utils.py index ae804426..3a9a0906 100644 --- a/zipline/utils/test_utils.py +++ b/zipline/utils/test_utils.py @@ -15,6 +15,7 @@ def setup_logger(test, path='/var/log/zipline/zipline.log'): def teardown_logger(test): test.log_handler.pop_application() + test.log_handler.close() def check_list(test, a, b, label): test.assertTrue(isinstance(a, (list, blist.blist))) From 784bb3673b6a3a1697d535b6486e664f151ad087 Mon Sep 17 00:00:00 2001 From: Eddie Hebert Date: Wed, 22 Aug 2012 09:51:29 -0400 Subject: [PATCH 41/46] Removes unused dev folder. - cli.py: Only contains a TODO: statement - topos.py: Appears to be a utility for debugging zeromq topologies. Which is unneeded in the current zeromq-less architecture. --- dev/cli.py | 1 - dev/topos.py | 189 --------------------------------------------------- 2 files changed, 190 deletions(-) delete mode 100644 dev/cli.py delete mode 100644 dev/topos.py diff --git a/dev/cli.py b/dev/cli.py deleted file mode 100644 index 3fd8a4b3..00000000 --- a/dev/cli.py +++ /dev/null @@ -1 +0,0 @@ -# TODO: move qexec console here diff --git a/dev/topos.py b/dev/topos.py deleted file mode 100644 index 23e37ff7..00000000 --- a/dev/topos.py +++ /dev/null @@ -1,189 +0,0 @@ -import uuid -import copy -import atexit -import pickle - -from datetime import datetime -from collections import defaultdict - -from UserDict import DictMixin - -class Snapshot(object, DictMixin): - """ - A snapshot in time of a history container. - """ - - def __init__(self, state, version, ts): - self.version = version - self.timestamp = ts - self._state = state - - def keys(self): - return self._state.keys() - - def values(self): - return self._state.values() - - def items(self): - return self._state.items() - - def __getitem__(self, key): - return self._state.__getitem__(key) - - def has_key(self, key): - return self._state.has_key(key) - - def copy(self): - return copy.copy(self._state) - -class History(object, DictMixin): - """ - A duck-typed dictionary that tracks its time evolution. - - Worth noting this not a particuarly high-performance - data structure due to the copious amount of copying going on. - """ - - def __init__(self, default=None): - if default: - initial = defaultdict(default) - else: - initial = {} - - self.version = 0 - self.changeset = [('CREATE', None)] - self.current = Snapshot(initial, version=self.version, ts=datetime.now()) - self._history = [self.current] - - def items(self, version=-1): - return self._history[version].items() - - def keys(self, version=-1): - return self._history[version].keys() - - def rollback(self, version): - pass - - def event(self, tup): - self.changeset.append(tup) - - def __getitem__(self, key, version=-1): - return self._history[version].__getitem__(key) - - def __setitem__(self, key, val): - if self.current.has_key(key): - self.changeset.append(('CHANGE', key)) - else: - self.changeset.append(('ADD', key)) - - state = self.current.copy() - state[key] = val - - self.version += 1 - self.current = Snapshot(state, self.version, datetime.now()) - self._history.append(self.current) - - def __delitem__(self, key): - self.changeset.append(('REMOVE', key)) - - state = self.current.copy() - del state[key] - - self.version += 1 - self.current = Snapshot(state, self.version, datetime.now()) - self._history.append(self.current) - - def history(self): - for change in self.changeset: - print change - - def __repr__(self): - return ':'.join(['historical', self.current._state.__repr__()]) - -SocketHistory = History() -ContextHistory = History() - -def patch_zmq(_zmq=None): - """ - Monkey patch zeromq to allow for socket tracking. - """ - if _zmq: - zmq = _zmq - else: - import zmq - - _Context = zmq.Context - _Socket = zmq.Socket - - class TrackedSocket(zmq.Socket): - - def __init__(self, context, socket_type): - self.context = context - self.uuid = str(uuid.uuid4()) - SocketHistory[self.uuid] = self - _Socket.__init__(self, context, socket_type) - - def connect(self, address): - SocketHistory.event(('CONNECT', self.uuid, address)) - _Socket.connect(self, address) - - def bind(self, address): - SocketHistory.event(('BIND', self.uuid, address)) - _Socket.bind(self, address) - - def close(self, *args, **kwargs): - del SocketHistory[self.uuid] - _Socket.close(self, *args, **kwargs) - - def setsockopt(self, option, optval): - if option == zmq.IDENTITY: - old = SocketHistory[self.uuid] - SocketHistory[optval] = old - del SocketHistory[self.uuid] - self.uuid = optval - - _Socket.setsockopt(self, option, optval) - - class TrackedContext(zmq.Context): - - def __init__(self, *args, **kwargs): - self.sockets = {} - _Context.__init__(self, *args, **kwargs) - self.uuid = str(uuid.uuid4()) - ContextHistory[self.uuid] = self - - def socket(self, socket_type): - sock = TrackedSocket(self, socket_type) - ContextHistory.event(('EMBED', self.uuid, sock.uuid)) - self.sockets[sock.uuid] = sock - return sock - - def name(self, name): - """ - Name the context. Is a superset of the vanilla pyzmq - API. - """ - old = ContextHistory[self.context.uuid] - ContextHistory[name] = old - del ContextHistory[self.context.uuid] - self.uuid = name - - def term(self, *args, **kwargs): - for uid, sock in self.sockets.iteritems(): - if not sock.closed: - del SocketHistory[sock.uuid] - del ContextHistory[self.uuid] - _Context.term(self, *args, **kwargs) - - def destroy(self, *args, **kwargs): - ContextHistory.event(('DESTROY', self.uuid)) - _Context.destroy(self, *args, **kwargs) - - zmq.Context = TrackedContext - zmq.Socket = TrackedSocket - return TrackedContext, TrackedSocket - -def track_to_file(f): - def write_track(): - pickle.dump(SocketHistory.changeset, file(f, 'wb+')) - atexit.register(write_track) From bf1247e00935a7c85fa05b77874d4c8787892c6a Mon Sep 17 00:00:00 2001 From: Eddie Hebert Date: Wed, 22 Aug 2012 12:57:29 -0400 Subject: [PATCH 42/46] Removes unused imports. --- zipline/lines.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/zipline/lines.py b/zipline/lines.py index d31c47d6..4e5f2cfb 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -68,16 +68,15 @@ from setproctitle import setproctitle from zipline.test_algorithms import TestAlgorithm from zipline.finance.trading import SIMULATION_STYLE -from zipline.utils.log_utils import ZeroMQLogHandler, stdout_only_pipe +from zipline.utils.log_utils import ZeroMQLogHandler from zipline.utils import factory -from zipline.test_algorithms import TestAlgorithm - -from zipline.gens.composites import \ - date_sorted_sources, merged_transforms, sequential_transforms -from zipline.gens.transform import Passthrough, StatefulTransform +from zipline.gens.composites import ( + date_sorted_sources, + sequential_transforms +) from zipline.gens.tradesimulation import TradeSimulationClient as tsc -from logbook import Logger, NestedSetup, Processor +from logbook import Logger import zipline.protocol as zp From 90cf794952a5093c78f9ef21f37797c10a503139 Mon Sep 17 00:00:00 2001 From: fawce Date: Wed, 22 Aug 2012 16:33:52 -0400 Subject: [PATCH 43/46] added statsmodels and moved pandas forward --- etc/requirements_sci.txt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/etc/requirements_sci.txt b/etc/requirements_sci.txt index b74b3e57..5128eb20 100644 --- a/etc/requirements_sci.txt +++ b/etc/requirements_sci.txt @@ -4,7 +4,7 @@ python-dateutil==1.5 # Core scientific python numpy>=1.6.1 -pandas>=0.7.0rc1 +pandas=0.8.0 scipy>=0.10.0 matplotlib==1.1.0 @@ -12,8 +12,8 @@ matplotlib==1.1.0 numexpr==2.0.1 Cython==0.15.1 -#tables>=2.3.1 -#scikits.statsmodels>=0.3.1 +statsmodels==0.5.0 +patsy==0.1.0 # ZeroMQ pyzmq==2.1.11 From e6be7d49f21f2d2d07e8bc1fbaaf5d0c476869bd Mon Sep 17 00:00:00 2001 From: fawce Date: Wed, 22 Aug 2012 17:03:28 -0400 Subject: [PATCH 44/46] updated sci requirements to include statsmodels and latest pandas --- etc/requirements_sci.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/etc/requirements_sci.txt b/etc/requirements_sci.txt index 5128eb20..3c72a01d 100644 --- a/etc/requirements_sci.txt +++ b/etc/requirements_sci.txt @@ -15,5 +15,6 @@ Cython==0.15.1 statsmodels==0.5.0 patsy==0.1.0 + # ZeroMQ pyzmq==2.1.11 From 25c33b70c77bc45f0e5be4f190af17f53d0dd4a7 Mon Sep 17 00:00:00 2001 From: fawce Date: Wed, 22 Aug 2012 17:21:32 -0400 Subject: [PATCH 45/46] fixed pandas version specifier, reversed order of patsy --- etc/requirements_sci.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/etc/requirements_sci.txt b/etc/requirements_sci.txt index 3c72a01d..e9ab5111 100644 --- a/etc/requirements_sci.txt +++ b/etc/requirements_sci.txt @@ -4,7 +4,7 @@ python-dateutil==1.5 # Core scientific python numpy>=1.6.1 -pandas=0.8.0 +pandas==0.8.0 scipy>=0.10.0 matplotlib==1.1.0 @@ -12,8 +12,8 @@ matplotlib==1.1.0 numexpr==2.0.1 Cython==0.15.1 -statsmodels==0.5.0 patsy==0.1.0 +statsmodels==0.5.0 # ZeroMQ From 88c31e7a60a716f288a2a1c1875b24c9c16702b7 Mon Sep 17 00:00:00 2001 From: fawce Date: Wed, 22 Aug 2012 19:28:16 -0400 Subject: [PATCH 46/46] pip freeze reports statsmodels==0.5.0 ... actual version? 0.5.0-tutorial-beta --- etc/requirements_sci.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etc/requirements_sci.txt b/etc/requirements_sci.txt index e9ab5111..a0104713 100644 --- a/etc/requirements_sci.txt +++ b/etc/requirements_sci.txt @@ -13,7 +13,7 @@ matplotlib==1.1.0 numexpr==2.0.1 Cython==0.15.1 patsy==0.1.0 -statsmodels==0.5.0 +statsmodels==0.5.0-tutorial-beta # ZeroMQ