diff --git a/tests/test_minute_risk.py b/tests/test_minute_risk.py new file mode 100644 index 00000000..9fa02a43 --- /dev/null +++ b/tests/test_minute_risk.py @@ -0,0 +1,57 @@ +# +# Copyright 2013 Quantopian, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import datetime +import pytz + +from zipline.finance.trading import SimulationParameters +from zipline.finance import risk + + +class TestMinuteRisk(unittest.TestCase): + + def setUp(self): + + start_date = datetime.datetime( + year=2006, + month=1, + day=3, + hour=0, + minute=0, + tzinfo=pytz.utc) + end_date = datetime.datetime( + year=2006, month=1, day=3, tzinfo=pytz.utc) + + self.sim_params = SimulationParameters( + period_start=start_date, + period_end=end_date + ) + self.sim_params.emission_rate = 'minute' + + def test_minute_risk(self): + + risk_metrics = risk.RiskMetricsIterative(self.sim_params) + + first_dt = self.sim_params.first_open + second_dt = self.sim_params.first_open + datetime.timedelta(minutes=1) + + risk_metrics.update(first_dt, 1.0, 2.0) + + self.assertEquals(1, len(risk_metrics.alpha)) + + risk_metrics.update(second_dt, 3.0, 4.0) + + self.assertEquals(2, len(risk_metrics.alpha)) diff --git a/tests/test_perf_tracking.py b/tests/test_perf_tracking.py index 3dfbe5ac..22a4d070 100644 --- a/tests/test_perf_tracking.py +++ b/tests/test_perf_tracking.py @@ -22,7 +22,6 @@ from nose_parameterized import parameterized import datetime import pytz import itertools -from operator import attrgetter import zipline.utils.factory as factory import zipline.finance.performance as perf @@ -63,23 +62,26 @@ def calculate_results(host, events): perf_tracker = perf.PerformanceTracker(host.sim_params) - all_events = (msg[1] for msg in heapq.merge( + all_events = heapq.merge( ((event.dt, event) for event in events), - ((event.dt, event) for event in host.benchmark_events))) + ((event.dt, event) for event in host.benchmark_events)) - transformed_events = list(perf_tracker.transform( - itertools.groupby(all_events, attrgetter('dt')))) - - #flatten the list of events + filtered_events = [(date, filt_event) for (date, filt_event) + in all_events if date <= events[-1].dt] + filtered_events.sort(key=lambda x: x[0]) + grouped_events = itertools.groupby(filtered_events, lambda x: x[0]) results = [] - for te in transformed_events: - for event in te[1]: - for message in event.perf_messages: - results.append(message) - - perf_messages, risk = perf_tracker.handle_simulation_end() - results.append(perf_messages[0]) + bm_updated = False + for date, group in grouped_events: + for _, event in group: + perf_tracker.process_event(event) + if event.type == DATASOURCE_TYPE.BENCHMARK: + bm_updated = True + if bm_updated: + msg = perf_tracker.handle_market_close() + results.append(msg) + bm_updated = False return results @@ -239,9 +241,9 @@ class TestDividendPerformance(unittest.TestCase): ) buy_txn = create_txn(1, 10.0, 100, events[1].dt) - events.insert(2, buy_txn) + events.insert(1, buy_txn) sell_txn = create_txn(1, 10.0, -100, events[3].dt) - events.insert(4, sell_txn) + events.insert(3, sell_txn) events.insert(1, dividend) results = calculate_results(self, events) @@ -267,12 +269,16 @@ class TestDividendPerformance(unittest.TestCase): self.sim_params ) + pay_date = self.sim_params.first_open + # find pay date that is much later. + for i in xrange(30): + pay_date = factory.get_next_trading_dt(pay_date, oneday) dividend = factory.create_dividend( 1, 10.00, events[0].dt, events[1].dt, - events[-1].dt + 10 * oneday + pay_date ) buy_txn = create_txn(1, 10.0, 100, events[1].dt) @@ -308,9 +314,11 @@ class TestDividendPerformance(unittest.TestCase): dividend = factory.create_dividend( 1, 10.00, + # declare at open of test events[0].dt, - events[1].dt, - events[2].dt + # ex_date same as trade 2 + events[2].dt, + events[3].dt ) txn = create_txn(1, 10.0, -100, events[1].dt) @@ -321,14 +329,14 @@ class TestDividendPerformance(unittest.TestCase): self.assertEqual(len(results), 5) cumulative_returns = \ [event['cumulative_perf']['returns'] for event in results] - self.assertEqual(cumulative_returns, [0.0, 0.0, -0.1, -0.1, -0.1]) + self.assertEqual(cumulative_returns, [0.0, 0.0, 0.0, -0.1, -0.1]) daily_returns = [event['daily_perf']['returns'] for event in results] - self.assertEqual(daily_returns, [0.0, 0.0, -0.1, 0.0, 0.0]) + self.assertEqual(daily_returns, [0.0, 0.0, 0.0, -0.1, 0.0]) cash_flows = [event['daily_perf']['capital_used'] for event in results] - self.assertEqual(cash_flows, [1000, 0, -1000, 0, 0]) + self.assertEqual(cash_flows, [0, 1000, 0, -1000, 0]) cumulative_cash_flows = \ [event['cumulative_perf']['capital_used'] for event in results] - self.assertEqual(cumulative_cash_flows, [1000, 1000, 0, 0, 0]) + self.assertEqual(cumulative_cash_flows, [0, 1000, 1000, 0, 0]) def test_no_position_receives_no_dividend(self): #post some trades in the market @@ -349,24 +357,7 @@ class TestDividendPerformance(unittest.TestCase): ) events.insert(1, dividend) - perf_tracker = perf.PerformanceTracker(self.sim_params) - - all_events = (msg[1] for msg in heapq.merge( - ((event.dt, event) for event in events), - ((event.dt, event) for event in self.benchmark_events))) - - transformed_events = list(perf_tracker.transform( - itertools.groupby(all_events, attrgetter('dt')))) - - #flatten the list of events - results = [] - for te in transformed_events: - for event in te[1]: - for message in event.perf_messages: - results.append(message) - - perf_messages, risk = perf_tracker.handle_simulation_end() - results.append(perf_messages[0]) + results = calculate_results(self, events) self.assertEqual(len(results), 5) cumulative_returns = \ @@ -972,19 +963,18 @@ class TestPerformanceTracker(unittest.TestCase): ((event.dt, event) for event in events), ((event.dt, event) for event in benchmark_events))) - # Extract events with transactions to use for verification. - perf_messages = \ - [m for date, snapshot in - perf_tracker.transform( - itertools.groupby(all_events, attrgetter('dt'))) - for e in snapshot - for m in e.perf_messages] + filtered_events = [filt_event for filt_event + in all_events if event.dt <= end_dt] + filtered_events.sort(key=lambda x: x.dt) + grouped_events = itertools.groupby(filtered_events, lambda x: x.dt) + perf_messages = [] - end_perf_messages, risk_message = perf_tracker.handle_simulation_end() + for date, group in grouped_events: + for event in group: + perf_tracker.process_event(event) + msg = perf_tracker.handle_market_close() + perf_messages.append(msg) - perf_messages.extend(end_perf_messages) - - #we skip two trades, to test case of None transaction self.assertEqual(perf_tracker.txn_count, len(txns)) self.assertEqual(perf_tracker.txn_count, len(orders)) @@ -1047,26 +1037,44 @@ class TestPerformanceTracker(unittest.TestCase): dt=foo_event_1.dt, price=10.0, commission=0.50) + benchmark_event_1 = Event({ + 'dt': start_dt, + 'returns': 1.0, + 'type': DATASOURCE_TYPE.BENCHMARK + }) foo_event_2 = factory.create_trade( 'foo', 11.0, 20, start_dt + datetime.timedelta(minutes=1)) bar_event_2 = factory.create_trade( 'bar', 11.0, 20, start_dt + datetime.timedelta(minutes=1)) + benchmark_event_2 = Event({ + 'dt': start_dt + datetime.timedelta(minutes=1), + 'returns': 2.0, + 'type': DATASOURCE_TYPE.BENCHMARK + }) events = [ foo_event_1, order_event_1, + benchmark_event_1, txn_event_1, bar_event_1, foo_event_2, - bar_event_2 + benchmark_event_2, + bar_event_2, ] - messages = {date: snapshot[-1].perf_messages[0] for date, snapshot in - tracker.transform( - itertools.groupby( - events, - operator.attrgetter('dt')))} + grouped_events = itertools.groupby( + events, operator.attrgetter('dt')) + + messages = {} + for date, group in grouped_events: + tracker.set_date(date) + for event in group: + tracker.process_event(event) + tracker.handle_minute_close(date) + msg = tracker.to_dict() + messages[date] = msg self.assertEquals(2, len(messages)) diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index 1fdfe4ad..b1a12042 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -166,9 +166,13 @@ class PerformanceTracker(object): risk.RiskMetricsIterative(self.sim_params) self.emission_rate = sim_params.emission_rate - # Temporarily hold these here as we work on streaming benchmarks. - self.all_benchmark_returns = pd.Series( - index=trading.environment.trading_days) + if self.emission_rate == 'daily': + self.all_benchmark_returns = pd.Series( + index=trading.environment.trading_days) + elif self.emission_rate == 'minute': + self.all_benchmark_returns = pd.Series(index=pd.date_range( + self.sim_params.first_open, self.sim_params.last_close, + freq='Min')) # this performance period will span the entire simulation. self.cumulative_performance = PerformancePeriod( @@ -217,39 +221,10 @@ class PerformanceTracker(object): elif self.emission_rate == 'daily': return self.day_count / self.total_days - def transform(self, stream_in): - """ - Main generator work loop. - """ - for date, snapshot in stream_in: - new_snapshot = [] - - if self.emission_rate == 'daily': - for event in snapshot: - messages = self.process_event(event) - if messages is not None: - event.perf_messages = messages - event.portfolio = self.get_portfolio() - - new_snapshot.append(event) - - elif self.emission_rate == 'minute': - self.saved_dt = date - self.todays_performance.period_close = self.saved_dt - - for event in snapshot: - self.process_event(event) - if event.type == zp.DATASOURCE_TYPE.TRADE: - event.perf_messages = [] - event.portfolio = None - new_snapshot.append(event) - - if new_snapshot: - new_snapshot[-1].perf_messages = [self.to_dict()] - new_snapshot[-1].portfolio = self.get_portfolio() - - if new_snapshot: - yield date, new_snapshot + def set_date(self, date): + if self.emission_rate == 'minute': + self.saved_dt = date + self.todays_performance.period_close = self.saved_dt def get_portfolio(self): return self.cumulative_performance.as_portfolio() @@ -279,6 +254,8 @@ class PerformanceTracker(object): # its own configuration down the line. # Naming as intraday to make clear that these results are # being updated per minute + _dict['intraday_risk_metrics'] = \ + self.cumulative_risk_metrics.to_dict() _dict['intraday_perf'] = self.todays_performance.to_dict( self.saved_dt) @@ -286,25 +263,14 @@ class PerformanceTracker(object): def process_event(self, event): - messages = None self.event_count += 1 if event.type == zp.DATASOURCE_TYPE.TRADE: - messages = [] - - # This switch could also be handled by an inheritance - # with a DailyPerformanceTracker and a MinutePerformanceTracker - if self.emission_rate == 'daily': - while (event.dt > self.market_close and - event.dt < self.last_close): - messages.append(self.handle_market_close()) - #update last sale self.cumulative_performance.update_last_sale(event) self.todays_performance.update_last_sale(event) elif event.type == zp.DATASOURCE_TYPE.TRANSACTION: - # Trade simulation always follows a transaction with the # TRADE event that was used to simulate it, so we don't # check for end of day rollover messages here. @@ -313,26 +279,17 @@ class PerformanceTracker(object): event ) self.todays_performance.execute_transaction(event) - # Transactions are consumed by performance, and not - # relayed to the next element in the generator chain. - messages = None elif event.type == zp.DATASOURCE_TYPE.DIVIDEND: self.cumulative_performance.add_dividend(event) self.todays_performance.add_dividend(event) - # Dividends are consumed by performance, and not - # relayed to the next element in the generator chain. - messages = None elif event.type == zp.DATASOURCE_TYPE.ORDER: self.cumulative_performance.record_order(event) self.todays_performance.record_order(event) - messages = None elif event.type == zp.DATASOURCE_TYPE.CUSTOM: - # we just want to relay this event unchanged. - messages = [] - return messages + pass elif event.type == zp.DATASOURCE_TYPE.BENCHMARK: self.all_benchmark_returns[event.dt] = event.returns @@ -340,7 +297,14 @@ class PerformanceTracker(object): self.cumulative_performance.calculate_performance() self.todays_performance.calculate_performance() - return messages + def handle_minute_close(self, dt): + #update risk metrics for cumulative performance + algorithm_returns = pd.Series({dt: self.todays_performance.returns}) + benchmark_returns = pd.Series({dt: self.all_benchmark_returns[dt]}) + + self.cumulative_risk_metrics.update(dt, + algorithm_returns, + benchmark_returns) def handle_market_close(self): # add the return results from today to the list of DailyReturn objects. @@ -405,14 +369,6 @@ class PerformanceTracker(object): When the simulation is complete, run the full period risk report and send it out on the results socket. """ - # the stream will end on the last trading day, but will - # not trigger an end of day, so we trigger the final - # market close(s) here - perf_messages = [] - while self.last_close > self.market_close: - perf_messages.append(self.handle_market_close()) - - perf_messages.append(self.handle_market_close()) log_msg = "Simulated {n} trading days out of {m}." log.info(log_msg.format(n=int(self.day_count), m=self.total_days)) @@ -424,7 +380,7 @@ class PerformanceTracker(object): self.risk_report = risk.RiskReport(self.returns, self.sim_params) risk_dict = self.risk_report.to_dict() - return perf_messages, risk_dict + return risk_dict class Position(object): diff --git a/zipline/finance/risk.py b/zipline/finance/risk.py index 023d7b97..b61385c2 100644 --- a/zipline/finance/risk.py +++ b/zipline/finance/risk.py @@ -537,8 +537,20 @@ class RiskMetricsIterative(RiskMetricsBase): self.trading_days = all_trading_days[mask] - self.algorithm_returns_cont = pd.Series(index=self.trading_days) - self.benchmark_returns_cont = pd.Series(index=self.trading_days) + self.sim_params = sim_params + + if sim_params.emission_rate == 'daily': + self.algorithm_returns_cont = pd.Series(index=self.trading_days) + self.benchmark_returns_cont = pd.Series(index=self.trading_days) + + elif sim_params.emission_rate == 'minute': + + self.algorithm_returns_cont = pd.Series(index=pd.date_range( + sim_params.first_open, sim_params.last_close, + freq="Min")) + self.benchmark_returns_cont = pd.Series(index=pd.date_range( + sim_params.first_open, sim_params.last_close, + freq="Min")) self.algorithm_returns = None self.benchmark_returns = None @@ -629,9 +641,6 @@ algorithm_returns ({algo_count}) in range {start} : {end} on {dt}" 'treasury_period_return': self.treasury_period_return, 'algorithm_period_return': self.algorithm_period_returns[-1], 'benchmark_period_return': self.benchmark_period_returns[-1], - 'sharpe': self.sharpe[-1], - 'sortino': self.sortino[-1], - 'information': self.information[-1], 'beta': self.beta[-1], 'alpha': self.alpha[-1], 'excess_return': self.excess_returns[-1], @@ -639,6 +648,17 @@ algorithm_returns ({algo_count}) in range {start} : {end} on {dt}" 'period_label': period_label } + if self.sim_params.emission_rate == 'daily': + # Some risk metrics only make sense in a context of daily + # risk calculations. + rval['sharpe'] = self.sharpe[-1] + rval['sortino'] = self.sortino[-1] + rval['information'] = self.information[-1] + elif self.sim_params.emission_rate == 'minute': + rval['sharpe'] = 0.0 + rval['sortino'] = 0.0 + rval['information'] = 0.0 + # check if a field in rval is nan, and replace it with # None. def check_entry(key, value): diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index ee52981d..613a9b35 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -68,11 +68,6 @@ class Blotter(object): Main generator work loop. """ for date, snapshot in stream_in: - # relay any orders placed in prior snapshot - # handling and reset the internal holding pen - if self.new_orders: - yield date, self.new_orders - self.new_orders = [] results = [] for event in snapshot: @@ -85,6 +80,9 @@ class Blotter(object): yield date, results def process_trade(self, trade_event): + if trade_event.type != DATASOURCE_TYPE.TRADE: + return [], [] + if zp_math.tolerant_equals(trade_event.volume, 0): # there are zero volume trade_events bc some stocks trade # less frequently than once per minute. @@ -103,7 +101,8 @@ class Blotter(object): txns = self.transact(trade_event, current_orders) for txn in txns: self.orders[txn.order_id].filled += txn.amount - # mark the date of the order to match the txn + # mark the date of the order to match the transaction + # that is filling it. self.orders[txn.order_id].dt = txn.dt modified_orders = [order for order @@ -262,23 +261,10 @@ class TradeSimulationClient(object): """ Main generator work loop. """ - - # Simulate filling any open orders made by the previous run of - # the user's algorithm. Fills the Transaction field on any - # event that results in a filled order. - with_filled_orders = self.blotter.transform(stream_in) - - # Pipe the events with transactions to perf. This will remove - # the TRANSACTION field added by TransactionSimulator and replace it - # with a portfolio field to be passed to the user's - # algorithm. Also adds a perf_messages field which is usually - # empty, but contains update messages once per day. - with_portfolio = self.perf_tracker.transform(with_filled_orders) - # Pass the messages from perf to the user's algorithm for simulation. # Events are batched by dt so that the algo handles all events for a # given timestamp at one one go. - performance_messages = self.algo_sim.transform(with_portfolio) + performance_messages = self.algo_sim.transform(stream_in) # The algorithm will yield a daily_results message (as # calculated by the performance tracker) at the end of each @@ -407,41 +393,57 @@ class AlgorithmSimulator(object): # snapshot time to any log record generated. with self.processor.threadbound(): + updated = False + bm_updated = False for date, snapshot in stream: - # We're still in the warmup period. Use the event to + self.perf_tracker.set_date(date) + # If we're still in the warmup period. Use the event to # update our universe, but don't yield any perf messages, # and don't send a snapshot to handle_data. if date < self.algo_start: for event in snapshot: - del event['perf_messages'] - self.update_universe(event) + if event.type in (DATASOURCE_TYPE.TRADE, + DATASOURCE_TYPE.CUSTOM): + self.update_universe(event) + self.perf_tracker.process_event(event) - # Regular snapshot. Update the universe and send a snapshot - # to handle data. else: - for event in snapshot: - for perf_message in event.perf_messages: - # append current values of recorded vars - # to emitted message - perf_message[self.perf_key]['recorded_vars'] =\ - self.algo.recorded_vars - yield perf_message - del event['perf_messages'] - self.update_universe(event) + for event in snapshot: + if event.type in (DATASOURCE_TYPE.TRADE, + DATASOURCE_TYPE.CUSTOM): + self.update_universe(event) + updated = True + if event.type == DATASOURCE_TYPE.BENCHMARK: + bm_updated = True + txns, orders = self.blotter.process_trade(event) + for data in chain([event], txns, orders): + self.perf_tracker.process_event(data) + + # Update our portfolio. + self.algo.set_portfolio(self.perf_tracker.get_portfolio()) # Send the current state of the universe # to the user's algo. - self.simulate_snapshot(date) + if updated: + self.simulate_snapshot(date) + updated = False - perf_messages, risk_message = \ - self.perf_tracker.handle_simulation_end() + # run orders placed in the algorithm call + # above through perf tracker before emitting + # the perf packet, so that the perf includes + # placed orders + for order in self.blotter.new_orders: + self.perf_tracker.process_event(order) + self.blotter.new_orders = [] - if self.perf_tracker.emission_rate == 'daily': - for message in perf_messages: - message[self.perf_key]['recorded_vars'] =\ - self.algo.recorded_vars - yield message + # The benchmark is our internal clock. When it + # updates, we need to emit a performance message. + if bm_updated: + bm_updated = False + yield self.get_message(date) + + risk_message = self.perf_tracker.handle_simulation_end() # When emitting minutely, it is still useful to have a final # packet with the entire days performance rolled up. @@ -455,20 +457,24 @@ class AlgorithmSimulator(object): yield risk_message + def get_message(self, date): + rvars = self.algo.recorded_vars + if self.perf_tracker.emission_rate == 'daily': + perf_message = \ + self.perf_tracker.handle_market_close() + perf_message['daily_perf']['recorded_vars'] = rvars + return perf_message + + elif self.perf_tracker.emission_rate == 'minute': + self.perf_tracker.handle_minute_close(date) + perf_message = self.perf_tracker.to_dict() + perf_message['intraday_perf']['recorded_vars'] = rvars + return perf_message + def update_universe(self, event): """ Update the universe with new event information. """ - # Update our portfolio. - self.algo.set_portfolio(event.portfolio) - # the portfolio is modified by each event passed into the - # performance tracker (prices and amounts can change). - # Performance tracker sends back an up-to-date portfolio - # with each event. However, we provide the portfolio to - # the algorithm via a setter method, rather than as part - # of the event data sent to handle_data. To avoid - # confusion, we remove it from the event here. - del event.portfolio # Update our knowledge of this event's sid sid_data = self.universe[event.sid] sid_data.__dict__.update(event.__dict__) @@ -482,7 +488,6 @@ class AlgorithmSimulator(object): # log/print lines. self.snapshot_dt = date self.algo.set_datetime(self.snapshot_dt) - self.algo.handle_data(self.universe) - # Update the simulation time. self.simulation_dt = date + self.algo.handle_data(self.universe)