From 095f2dd65bf89e89c1313bc8c39deead60231d63 Mon Sep 17 00:00:00 2001 From: Richard Frank Date: Wed, 12 Dec 2012 14:01:10 -0500 Subject: [PATCH] Date bookkeeping fixes in perf and risk Issues appeared when we were close to the end of our historical data. Yielding DONE event with both perf and risk messages now --- tests/test_algorithm.py | 3 +- tests/test_perf_tracking.py | 70 ++++++++++++++++------ tests/test_risk_compare_batch_iterative.py | 4 +- zipline/finance/performance.py | 57 +++++++++++------- zipline/finance/risk.py | 15 +---- zipline/gens/tradesimulation.py | 1 + zipline/utils/factory.py | 15 +++-- 7 files changed, 103 insertions(+), 62 deletions(-) diff --git a/tests/test_algorithm.py b/tests/test_algorithm.py index a8513424..f028c9a7 100644 --- a/tests/test_algorithm.py +++ b/tests/test_algorithm.py @@ -39,7 +39,8 @@ class TestTransformAlgorithm(TestCase): ) self.source = SpecificEquityTrades(event_list=trade_history) - self.df_source, self.df = factory.create_test_df_source() + self.df_source, self.df = \ + factory.create_test_df_source(self.trading_environment) def test_source_as_input(self): algo = TestRegisterTransformAlgorithm(sids=[133]) diff --git a/tests/test_perf_tracking.py b/tests/test_perf_tracking.py index be1ee502..85f32175 100644 --- a/tests/test_perf_tracking.py +++ b/tests/test_perf_tracking.py @@ -18,10 +18,14 @@ import copy import random import datetime import pytz +import itertools +from operator import attrgetter import zipline.utils.factory as factory import zipline.finance.performance as perf from zipline.utils.protocol_utils import ndict +from zipline.gens.sort import date_sort +from zipline.protocol import DATASOURCE_TYPE from zipline.finance.trading import TradingEnvironment @@ -539,7 +543,8 @@ shares in position" price_list, volume, trade_time_increment, - self.trading_environment + self.trading_environment, + source_id="factory1" ) sid2 = 134 @@ -550,13 +555,18 @@ shares in position" price2_list, volume, trade_time_increment, - self.trading_environment + self.trading_environment, + source_id="factory2" ) trade_history.extend(trade_history2) self.trading_environment.period_start = trade_history[0].dt self.trading_environment.period_end = trade_history[-1].dt + self.trading_environment.first_open = \ + self.trading_environment.calculate_first_open() + self.trading_environment.last_close = \ + self.trading_environment.calculate_last_close() self.trading_environment.capital_base = 1000.0 self.trading_environment.frame_index = [ 'sid', @@ -568,21 +578,26 @@ shares in position" self.trading_environment ) - for event in trade_history: - #create a transaction for all but - #first trade in each sid, to simulate None transaction - if(event.dt != self.trading_environment.period_start): - txn = ndict({ - 'sid': event.sid, - 'amount': -25, - 'dt': event.dt, - 'price': 10.0, - 'commission': 0.50 - }) - else: - txn = None - event['TRANSACTION'] = txn - perf_tracker.process_event(event) + # date_sort requires 'DONE' messages from each source + events = itertools.chain(trade_history, + [ndict({ + 'source_id': 'factory1', + 'dt': 'DONE', + 'type': DATASOURCE_TYPE.TRADE + }), + ndict({ + 'source_id': 'factory2', + 'dt': 'DONE', + 'type': DATASOURCE_TYPE.TRADE + })]) + events = date_sort(events, ('factory1', 'factory2')) + events = itertools.chain(events, + [ndict({'dt': 'DONE'})]) + + events = [self.event_with_txn(event) for event in events] + + list(perf_tracker.transform( + itertools.groupby(events, attrgetter('dt')))) #we skip two trades, to test case of None transaction txn_count = len(trade_history) - 2 @@ -592,6 +607,23 @@ shares in position" expected_size = txn_count / 2 * -25 self.assertEqual(cumulative_pos.amount, expected_size) - self.assertEqual(perf_tracker.period_end. - replace(hour=0, minute=0, second=0), + self.assertEqual(perf_tracker.last_close, perf_tracker.cumulative_risk_metrics.end_date) + + def event_with_txn(self, event): + #create a transaction for all but + #first trade in each sid, to simulate None transaction + if event.dt != self.trading_environment.period_start \ + and event.dt != 'DONE': + txn = ndict({ + 'sid': event.sid, + 'amount': -25, + 'dt': event.dt, + 'price': 10.0, + 'commission': 0.50 + }) + else: + txn = None + event['TRANSACTION'] = txn + + return event diff --git a/tests/test_risk_compare_batch_iterative.py b/tests/test_risk_compare_batch_iterative.py index a64ac59f..4eec911e 100644 --- a/tests/test_risk_compare_batch_iterative.py +++ b/tests/test_risk_compare_batch_iterative.py @@ -87,10 +87,10 @@ class RiskCompareIterativeToBatch(unittest.TestCase): #assert that when original raises exception, same #exception is raised by risk_metrics_refactor np.testing.assert_raises( - type(e), risk_metrics_refactor.update, ret) + type(e), risk_metrics_refactor.update, todays_date, ret) continue - risk_metrics_refactor.update(ret) + risk_metrics_refactor.update(todays_date, ret) self.assertEqual( risk_metrics_original.start_date, diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index 532d508f..4a0c35ff 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -159,11 +159,11 @@ class PerformanceTracker(object): self.trading_environment = trading_environment self.trading_day = datetime.timedelta(hours=6, minutes=30) - self.calendar_day = datetime.timedelta(hours=24) self.started_at = datetime.datetime.utcnow().replace(tzinfo=pytz.utc) self.period_start = self.trading_environment.period_start self.period_end = self.trading_environment.period_end + self.last_close = self.trading_environment.last_close self.market_open = self.trading_environment.first_open self.market_close = self.market_open + self.trading_day self.progress = 0.0 @@ -211,17 +211,23 @@ class PerformanceTracker(object): Main generator work loop. """ for date, snapshot in stream_in: - yield date, [self._transform_event(event) for event in snapshot] + new_snapshot = [] - def _transform_event(self, event): - if event.dt == "DONE": - event.perf_message = self.handle_simulation_end() - else: - event.perf_message = self.process_event(event) - event.portfolio = self.get_portfolio() + for event in snapshot: + if date != "DONE": + event.perf_message = self.process_event(event) + event.portfolio = self.get_portfolio() + else: + # the stream will end on the last trading day, but will + # not trigger an end of day, so we trigger the final + # market close here + event.perf_message = self.handle_market_close() + event.risk_message = self.handle_simulation_end() - del event['TRANSACTION'] - return event + del event['TRANSACTION'] + new_snapshot.append(event) + + yield date, new_snapshot def get_portfolio(self): return self.cumulative_performance.as_portfolio() @@ -249,7 +255,7 @@ class PerformanceTracker(object): assert isinstance(event, ndict) self.event_count += 1 - if(event.dt >= self.market_close): + if(event.dt > self.market_close): message = self.handle_market_close() if event.TRANSACTION: @@ -279,6 +285,7 @@ class PerformanceTracker(object): #update risk metrics for cumulative performance self.cumulative_risk_metrics.update( + self.market_close, self.todays_performance.returns) # increment the day counter before we move markers forward. @@ -290,15 +297,23 @@ class PerformanceTracker(object): # browser. daily_update = self.to_dict() + # On the last day of the test, don't create tomorrow's performance + # period. We may not be able to find the next trading day if we're + # at the end of our historical data + if self.market_close >= self.last_close: + return daily_update + #move the market day markers forward - self.market_open = self.market_open + self.calendar_day - - while not self.trading_environment.is_trading_day(self.market_open): - if self.market_open > self.trading_environment.trading_days[-1]: - raise Exception( - "Attempt to backtest beyond available history.") - self.market_open = self.market_open + self.calendar_day + next_open = self.trading_environment.next_trading_day(self.market_open) + if next_open is None: + raise Exception( + "Attempt to backtest beyond available history. \ +Last successful date: %s" % self.market_open) + # next_open is a midnight date, but we want the time too + self.market_open = next_open.replace(hour=self.market_open.hour, + minute=self.market_open.minute, + second=self.market_open.second) self.market_close = self.market_open + self.trading_day # Roll over positions to current day. @@ -323,10 +338,8 @@ class PerformanceTracker(object): log.info(log_msg.format(n=self.day_count, m=self.total_days)) log.info("first open: {d}".format( d=self.trading_environment.first_open)) - - # the stream will end on the last trading day, but will not trigger - # an end of day, so we trigger the final market close here. - self.handle_market_close() + log.info("last close: {d}".format( + d=self.trading_environment.last_close)) self.risk_report = risk.RiskReport( self.returns, diff --git a/zipline/finance/risk.py b/zipline/finance/risk.py index 842cfb56..78463889 100644 --- a/zipline/finance/risk.py +++ b/zipline/finance/risk.py @@ -423,7 +423,7 @@ class RiskMetricsIterative(RiskMetricsBase): if x.date >= self.start_date ] - def update(self, returns_in_period): + def update(self, market_close, returns_in_period): if self.trading_environment.is_trading_day(self.end_date): self.algorithm_returns.append(returns_in_period) self.benchmark_returns.append( @@ -431,18 +431,7 @@ class RiskMetricsIterative(RiskMetricsBase): self.trading_days += 1 self.update_compounded_log_returns() - next_trading_day = \ - self.trading_environment.next_trading_day(self.end_date) - - if next_trading_day: - self.end_date = next_trading_day - else: - message = "No trading data on or after {dt}. Check \ -that date doesn't exceed benchmark history range." - message = message.format(dt=self.end_date) - raise Exception(message) - - self.end_date = self.end_date.replace(hour=0, minute=0, second=0) + self.end_date = market_close self.algorithm_period_returns.append( self.calculate_period_returns(self.algorithm_returns)) diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index 5a70746c..4aab868d 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -209,6 +209,7 @@ class AlgorithmSimulator(object): if date == 'DONE': for event in snapshot: yield event.perf_message + yield event.risk_message raise StopIteration # We're still in the warmup period. Use the event to diff --git a/zipline/utils/factory.py b/zipline/utils/factory.py index c3f6af47..c203c914 100644 --- a/zipline/utils/factory.py +++ b/zipline/utils/factory.py @@ -127,12 +127,13 @@ def get_next_trading_dt(current, interval, trading_calendar): return next -def create_trade_history(sid, prices, amounts, interval, trading_calendar): +def create_trade_history(sid, prices, amounts, interval, trading_calendar, + source_id="test_factory"): trades = [] current = trading_calendar.first_open for price, amount in zip(prices, amounts): - trade = create_trade(sid, price, amount, current) + trade = create_trade(sid, price, amount, current, source_id) trades.append(trade) current = get_next_trading_dt(current, interval, trading_calendar) @@ -272,9 +273,13 @@ def create_trade_source(sids, trade_count, return source -def create_test_df_source(): - start = pd.datetime(1990, 1, 3, 0, 0, 0, 0, pytz.utc) - end = pd.datetime(1990, 1, 8, 0, 0, 0, 0, pytz.utc) +def create_test_df_source(trading_calendar=None): + start = trading_calendar.first_open \ + if trading_calendar else pd.datetime(1990, 1, 3, 0, 0, 0, 0, pytz.utc) + + end = trading_calendar.last_close \ + if trading_calendar else pd.datetime(1990, 1, 8, 0, 0, 0, 0, pytz.utc) + index = pd.DatetimeIndex(start=start, end=end, freq=pd.datetools.day) x = np.arange(0, len(index))