ENH: Emit minute benchmarks, refactor event loop, fix order timing.

Note, in a somewhat Catch-22 situation, the refactoring of
the simulation loop and streaming of minutely benchmarks enable
each other, since the benchmarks fall over because of doubled
date groupings, which is fixed by the change of the simulation loop,
but the simulation loop depends on using the benchmarks as a clock,
so pull the two changes in together, though the two changes do not
work independently of each other.
This commit is contained in:
Eddie Hebert
2013-04-25 17:18:58 -04:00
5 changed files with 230 additions and 184 deletions
+57
View File
@@ -0,0 +1,57 @@
#
# Copyright 2013 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import datetime
import pytz
from zipline.finance.trading import SimulationParameters
from zipline.finance import risk
class TestMinuteRisk(unittest.TestCase):
def setUp(self):
start_date = datetime.datetime(
year=2006,
month=1,
day=3,
hour=0,
minute=0,
tzinfo=pytz.utc)
end_date = datetime.datetime(
year=2006, month=1, day=3, tzinfo=pytz.utc)
self.sim_params = SimulationParameters(
period_start=start_date,
period_end=end_date
)
self.sim_params.emission_rate = 'minute'
def test_minute_risk(self):
risk_metrics = risk.RiskMetricsIterative(self.sim_params)
first_dt = self.sim_params.first_open
second_dt = self.sim_params.first_open + datetime.timedelta(minutes=1)
risk_metrics.update(first_dt, 1.0, 2.0)
self.assertEquals(1, len(risk_metrics.alpha))
risk_metrics.update(second_dt, 3.0, 4.0)
self.assertEquals(2, len(risk_metrics.alpha))
+66 -58
View File
@@ -22,7 +22,6 @@ from nose_parameterized import parameterized
import datetime
import pytz
import itertools
from operator import attrgetter
import zipline.utils.factory as factory
import zipline.finance.performance as perf
@@ -63,23 +62,26 @@ def calculate_results(host, events):
perf_tracker = perf.PerformanceTracker(host.sim_params)
all_events = (msg[1] for msg in heapq.merge(
all_events = heapq.merge(
((event.dt, event) for event in events),
((event.dt, event) for event in host.benchmark_events)))
((event.dt, event) for event in host.benchmark_events))
transformed_events = list(perf_tracker.transform(
itertools.groupby(all_events, attrgetter('dt'))))
#flatten the list of events
filtered_events = [(date, filt_event) for (date, filt_event)
in all_events if date <= events[-1].dt]
filtered_events.sort(key=lambda x: x[0])
grouped_events = itertools.groupby(filtered_events, lambda x: x[0])
results = []
for te in transformed_events:
for event in te[1]:
for message in event.perf_messages:
results.append(message)
perf_messages, risk = perf_tracker.handle_simulation_end()
results.append(perf_messages[0])
bm_updated = False
for date, group in grouped_events:
for _, event in group:
perf_tracker.process_event(event)
if event.type == DATASOURCE_TYPE.BENCHMARK:
bm_updated = True
if bm_updated:
msg = perf_tracker.handle_market_close()
results.append(msg)
bm_updated = False
return results
@@ -239,9 +241,9 @@ class TestDividendPerformance(unittest.TestCase):
)
buy_txn = create_txn(1, 10.0, 100, events[1].dt)
events.insert(2, buy_txn)
events.insert(1, buy_txn)
sell_txn = create_txn(1, 10.0, -100, events[3].dt)
events.insert(4, sell_txn)
events.insert(3, sell_txn)
events.insert(1, dividend)
results = calculate_results(self, events)
@@ -267,12 +269,16 @@ class TestDividendPerformance(unittest.TestCase):
self.sim_params
)
pay_date = self.sim_params.first_open
# find pay date that is much later.
for i in xrange(30):
pay_date = factory.get_next_trading_dt(pay_date, oneday)
dividend = factory.create_dividend(
1,
10.00,
events[0].dt,
events[1].dt,
events[-1].dt + 10 * oneday
pay_date
)
buy_txn = create_txn(1, 10.0, 100, events[1].dt)
@@ -308,9 +314,11 @@ class TestDividendPerformance(unittest.TestCase):
dividend = factory.create_dividend(
1,
10.00,
# declare at open of test
events[0].dt,
events[1].dt,
events[2].dt
# ex_date same as trade 2
events[2].dt,
events[3].dt
)
txn = create_txn(1, 10.0, -100, events[1].dt)
@@ -321,14 +329,14 @@ class TestDividendPerformance(unittest.TestCase):
self.assertEqual(len(results), 5)
cumulative_returns = \
[event['cumulative_perf']['returns'] for event in results]
self.assertEqual(cumulative_returns, [0.0, 0.0, -0.1, -0.1, -0.1])
self.assertEqual(cumulative_returns, [0.0, 0.0, 0.0, -0.1, -0.1])
daily_returns = [event['daily_perf']['returns'] for event in results]
self.assertEqual(daily_returns, [0.0, 0.0, -0.1, 0.0, 0.0])
self.assertEqual(daily_returns, [0.0, 0.0, 0.0, -0.1, 0.0])
cash_flows = [event['daily_perf']['capital_used'] for event in results]
self.assertEqual(cash_flows, [1000, 0, -1000, 0, 0])
self.assertEqual(cash_flows, [0, 1000, 0, -1000, 0])
cumulative_cash_flows = \
[event['cumulative_perf']['capital_used'] for event in results]
self.assertEqual(cumulative_cash_flows, [1000, 1000, 0, 0, 0])
self.assertEqual(cumulative_cash_flows, [0, 1000, 1000, 0, 0])
def test_no_position_receives_no_dividend(self):
#post some trades in the market
@@ -349,24 +357,7 @@ class TestDividendPerformance(unittest.TestCase):
)
events.insert(1, dividend)
perf_tracker = perf.PerformanceTracker(self.sim_params)
all_events = (msg[1] for msg in heapq.merge(
((event.dt, event) for event in events),
((event.dt, event) for event in self.benchmark_events)))
transformed_events = list(perf_tracker.transform(
itertools.groupby(all_events, attrgetter('dt'))))
#flatten the list of events
results = []
for te in transformed_events:
for event in te[1]:
for message in event.perf_messages:
results.append(message)
perf_messages, risk = perf_tracker.handle_simulation_end()
results.append(perf_messages[0])
results = calculate_results(self, events)
self.assertEqual(len(results), 5)
cumulative_returns = \
@@ -972,19 +963,18 @@ class TestPerformanceTracker(unittest.TestCase):
((event.dt, event) for event in events),
((event.dt, event) for event in benchmark_events)))
# Extract events with transactions to use for verification.
perf_messages = \
[m for date, snapshot in
perf_tracker.transform(
itertools.groupby(all_events, attrgetter('dt')))
for e in snapshot
for m in e.perf_messages]
filtered_events = [filt_event for filt_event
in all_events if event.dt <= end_dt]
filtered_events.sort(key=lambda x: x.dt)
grouped_events = itertools.groupby(filtered_events, lambda x: x.dt)
perf_messages = []
end_perf_messages, risk_message = perf_tracker.handle_simulation_end()
for date, group in grouped_events:
for event in group:
perf_tracker.process_event(event)
msg = perf_tracker.handle_market_close()
perf_messages.append(msg)
perf_messages.extend(end_perf_messages)
#we skip two trades, to test case of None transaction
self.assertEqual(perf_tracker.txn_count, len(txns))
self.assertEqual(perf_tracker.txn_count, len(orders))
@@ -1047,26 +1037,44 @@ class TestPerformanceTracker(unittest.TestCase):
dt=foo_event_1.dt,
price=10.0,
commission=0.50)
benchmark_event_1 = Event({
'dt': start_dt,
'returns': 1.0,
'type': DATASOURCE_TYPE.BENCHMARK
})
foo_event_2 = factory.create_trade(
'foo', 11.0, 20, start_dt + datetime.timedelta(minutes=1))
bar_event_2 = factory.create_trade(
'bar', 11.0, 20, start_dt + datetime.timedelta(minutes=1))
benchmark_event_2 = Event({
'dt': start_dt + datetime.timedelta(minutes=1),
'returns': 2.0,
'type': DATASOURCE_TYPE.BENCHMARK
})
events = [
foo_event_1,
order_event_1,
benchmark_event_1,
txn_event_1,
bar_event_1,
foo_event_2,
bar_event_2
benchmark_event_2,
bar_event_2,
]
messages = {date: snapshot[-1].perf_messages[0] for date, snapshot in
tracker.transform(
itertools.groupby(
events,
operator.attrgetter('dt')))}
grouped_events = itertools.groupby(
events, operator.attrgetter('dt'))
messages = {}
for date, group in grouped_events:
tracker.set_date(date)
for event in group:
tracker.process_event(event)
tracker.handle_minute_close(date)
msg = tracker.to_dict()
messages[date] = msg
self.assertEquals(2, len(messages))
+23 -67
View File
@@ -166,9 +166,13 @@ class PerformanceTracker(object):
risk.RiskMetricsIterative(self.sim_params)
self.emission_rate = sim_params.emission_rate
# Temporarily hold these here as we work on streaming benchmarks.
self.all_benchmark_returns = pd.Series(
index=trading.environment.trading_days)
if self.emission_rate == 'daily':
self.all_benchmark_returns = pd.Series(
index=trading.environment.trading_days)
elif self.emission_rate == 'minute':
self.all_benchmark_returns = pd.Series(index=pd.date_range(
self.sim_params.first_open, self.sim_params.last_close,
freq='Min'))
# this performance period will span the entire simulation.
self.cumulative_performance = PerformancePeriod(
@@ -217,39 +221,10 @@ class PerformanceTracker(object):
elif self.emission_rate == 'daily':
return self.day_count / self.total_days
def transform(self, stream_in):
"""
Main generator work loop.
"""
for date, snapshot in stream_in:
new_snapshot = []
if self.emission_rate == 'daily':
for event in snapshot:
messages = self.process_event(event)
if messages is not None:
event.perf_messages = messages
event.portfolio = self.get_portfolio()
new_snapshot.append(event)
elif self.emission_rate == 'minute':
self.saved_dt = date
self.todays_performance.period_close = self.saved_dt
for event in snapshot:
self.process_event(event)
if event.type == zp.DATASOURCE_TYPE.TRADE:
event.perf_messages = []
event.portfolio = None
new_snapshot.append(event)
if new_snapshot:
new_snapshot[-1].perf_messages = [self.to_dict()]
new_snapshot[-1].portfolio = self.get_portfolio()
if new_snapshot:
yield date, new_snapshot
def set_date(self, date):
if self.emission_rate == 'minute':
self.saved_dt = date
self.todays_performance.period_close = self.saved_dt
def get_portfolio(self):
return self.cumulative_performance.as_portfolio()
@@ -279,6 +254,8 @@ class PerformanceTracker(object):
# its own configuration down the line.
# Naming as intraday to make clear that these results are
# being updated per minute
_dict['intraday_risk_metrics'] = \
self.cumulative_risk_metrics.to_dict()
_dict['intraday_perf'] = self.todays_performance.to_dict(
self.saved_dt)
@@ -286,25 +263,14 @@ class PerformanceTracker(object):
def process_event(self, event):
messages = None
self.event_count += 1
if event.type == zp.DATASOURCE_TYPE.TRADE:
messages = []
# This switch could also be handled by an inheritance
# with a DailyPerformanceTracker and a MinutePerformanceTracker
if self.emission_rate == 'daily':
while (event.dt > self.market_close and
event.dt < self.last_close):
messages.append(self.handle_market_close())
#update last sale
self.cumulative_performance.update_last_sale(event)
self.todays_performance.update_last_sale(event)
elif event.type == zp.DATASOURCE_TYPE.TRANSACTION:
# Trade simulation always follows a transaction with the
# TRADE event that was used to simulate it, so we don't
# check for end of day rollover messages here.
@@ -313,26 +279,17 @@ class PerformanceTracker(object):
event
)
self.todays_performance.execute_transaction(event)
# Transactions are consumed by performance, and not
# relayed to the next element in the generator chain.
messages = None
elif event.type == zp.DATASOURCE_TYPE.DIVIDEND:
self.cumulative_performance.add_dividend(event)
self.todays_performance.add_dividend(event)
# Dividends are consumed by performance, and not
# relayed to the next element in the generator chain.
messages = None
elif event.type == zp.DATASOURCE_TYPE.ORDER:
self.cumulative_performance.record_order(event)
self.todays_performance.record_order(event)
messages = None
elif event.type == zp.DATASOURCE_TYPE.CUSTOM:
# we just want to relay this event unchanged.
messages = []
return messages
pass
elif event.type == zp.DATASOURCE_TYPE.BENCHMARK:
self.all_benchmark_returns[event.dt] = event.returns
@@ -340,7 +297,14 @@ class PerformanceTracker(object):
self.cumulative_performance.calculate_performance()
self.todays_performance.calculate_performance()
return messages
def handle_minute_close(self, dt):
#update risk metrics for cumulative performance
algorithm_returns = pd.Series({dt: self.todays_performance.returns})
benchmark_returns = pd.Series({dt: self.all_benchmark_returns[dt]})
self.cumulative_risk_metrics.update(dt,
algorithm_returns,
benchmark_returns)
def handle_market_close(self):
# add the return results from today to the list of DailyReturn objects.
@@ -405,14 +369,6 @@ class PerformanceTracker(object):
When the simulation is complete, run the full period risk report
and send it out on the results socket.
"""
# the stream will end on the last trading day, but will
# not trigger an end of day, so we trigger the final
# market close(s) here
perf_messages = []
while self.last_close > self.market_close:
perf_messages.append(self.handle_market_close())
perf_messages.append(self.handle_market_close())
log_msg = "Simulated {n} trading days out of {m}."
log.info(log_msg.format(n=int(self.day_count), m=self.total_days))
@@ -424,7 +380,7 @@ class PerformanceTracker(object):
self.risk_report = risk.RiskReport(self.returns, self.sim_params)
risk_dict = self.risk_report.to_dict()
return perf_messages, risk_dict
return risk_dict
class Position(object):
+25 -5
View File
@@ -537,8 +537,20 @@ class RiskMetricsIterative(RiskMetricsBase):
self.trading_days = all_trading_days[mask]
self.algorithm_returns_cont = pd.Series(index=self.trading_days)
self.benchmark_returns_cont = pd.Series(index=self.trading_days)
self.sim_params = sim_params
if sim_params.emission_rate == 'daily':
self.algorithm_returns_cont = pd.Series(index=self.trading_days)
self.benchmark_returns_cont = pd.Series(index=self.trading_days)
elif sim_params.emission_rate == 'minute':
self.algorithm_returns_cont = pd.Series(index=pd.date_range(
sim_params.first_open, sim_params.last_close,
freq="Min"))
self.benchmark_returns_cont = pd.Series(index=pd.date_range(
sim_params.first_open, sim_params.last_close,
freq="Min"))
self.algorithm_returns = None
self.benchmark_returns = None
@@ -629,9 +641,6 @@ algorithm_returns ({algo_count}) in range {start} : {end} on {dt}"
'treasury_period_return': self.treasury_period_return,
'algorithm_period_return': self.algorithm_period_returns[-1],
'benchmark_period_return': self.benchmark_period_returns[-1],
'sharpe': self.sharpe[-1],
'sortino': self.sortino[-1],
'information': self.information[-1],
'beta': self.beta[-1],
'alpha': self.alpha[-1],
'excess_return': self.excess_returns[-1],
@@ -639,6 +648,17 @@ algorithm_returns ({algo_count}) in range {start} : {end} on {dt}"
'period_label': period_label
}
if self.sim_params.emission_rate == 'daily':
# Some risk metrics only make sense in a context of daily
# risk calculations.
rval['sharpe'] = self.sharpe[-1]
rval['sortino'] = self.sortino[-1]
rval['information'] = self.information[-1]
elif self.sim_params.emission_rate == 'minute':
rval['sharpe'] = 0.0
rval['sortino'] = 0.0
rval['information'] = 0.0
# check if a field in rval is nan, and replace it with
# None.
def check_entry(key, value):
+59 -54
View File
@@ -68,11 +68,6 @@ class Blotter(object):
Main generator work loop.
"""
for date, snapshot in stream_in:
# relay any orders placed in prior snapshot
# handling and reset the internal holding pen
if self.new_orders:
yield date, self.new_orders
self.new_orders = []
results = []
for event in snapshot:
@@ -85,6 +80,9 @@ class Blotter(object):
yield date, results
def process_trade(self, trade_event):
if trade_event.type != DATASOURCE_TYPE.TRADE:
return [], []
if zp_math.tolerant_equals(trade_event.volume, 0):
# there are zero volume trade_events bc some stocks trade
# less frequently than once per minute.
@@ -103,7 +101,8 @@ class Blotter(object):
txns = self.transact(trade_event, current_orders)
for txn in txns:
self.orders[txn.order_id].filled += txn.amount
# mark the date of the order to match the txn
# mark the date of the order to match the transaction
# that is filling it.
self.orders[txn.order_id].dt = txn.dt
modified_orders = [order for order
@@ -262,23 +261,10 @@ class TradeSimulationClient(object):
"""
Main generator work loop.
"""
# Simulate filling any open orders made by the previous run of
# the user's algorithm. Fills the Transaction field on any
# event that results in a filled order.
with_filled_orders = self.blotter.transform(stream_in)
# Pipe the events with transactions to perf. This will remove
# the TRANSACTION field added by TransactionSimulator and replace it
# with a portfolio field to be passed to the user's
# algorithm. Also adds a perf_messages field which is usually
# empty, but contains update messages once per day.
with_portfolio = self.perf_tracker.transform(with_filled_orders)
# Pass the messages from perf to the user's algorithm for simulation.
# Events are batched by dt so that the algo handles all events for a
# given timestamp at one one go.
performance_messages = self.algo_sim.transform(with_portfolio)
performance_messages = self.algo_sim.transform(stream_in)
# The algorithm will yield a daily_results message (as
# calculated by the performance tracker) at the end of each
@@ -407,41 +393,57 @@ class AlgorithmSimulator(object):
# snapshot time to any log record generated.
with self.processor.threadbound():
updated = False
bm_updated = False
for date, snapshot in stream:
# We're still in the warmup period. Use the event to
self.perf_tracker.set_date(date)
# If we're still in the warmup period. Use the event to
# update our universe, but don't yield any perf messages,
# and don't send a snapshot to handle_data.
if date < self.algo_start:
for event in snapshot:
del event['perf_messages']
self.update_universe(event)
if event.type in (DATASOURCE_TYPE.TRADE,
DATASOURCE_TYPE.CUSTOM):
self.update_universe(event)
self.perf_tracker.process_event(event)
# Regular snapshot. Update the universe and send a snapshot
# to handle data.
else:
for event in snapshot:
for perf_message in event.perf_messages:
# append current values of recorded vars
# to emitted message
perf_message[self.perf_key]['recorded_vars'] =\
self.algo.recorded_vars
yield perf_message
del event['perf_messages']
self.update_universe(event)
for event in snapshot:
if event.type in (DATASOURCE_TYPE.TRADE,
DATASOURCE_TYPE.CUSTOM):
self.update_universe(event)
updated = True
if event.type == DATASOURCE_TYPE.BENCHMARK:
bm_updated = True
txns, orders = self.blotter.process_trade(event)
for data in chain([event], txns, orders):
self.perf_tracker.process_event(data)
# Update our portfolio.
self.algo.set_portfolio(self.perf_tracker.get_portfolio())
# Send the current state of the universe
# to the user's algo.
self.simulate_snapshot(date)
if updated:
self.simulate_snapshot(date)
updated = False
perf_messages, risk_message = \
self.perf_tracker.handle_simulation_end()
# run orders placed in the algorithm call
# above through perf tracker before emitting
# the perf packet, so that the perf includes
# placed orders
for order in self.blotter.new_orders:
self.perf_tracker.process_event(order)
self.blotter.new_orders = []
if self.perf_tracker.emission_rate == 'daily':
for message in perf_messages:
message[self.perf_key]['recorded_vars'] =\
self.algo.recorded_vars
yield message
# The benchmark is our internal clock. When it
# updates, we need to emit a performance message.
if bm_updated:
bm_updated = False
yield self.get_message(date)
risk_message = self.perf_tracker.handle_simulation_end()
# When emitting minutely, it is still useful to have a final
# packet with the entire days performance rolled up.
@@ -455,20 +457,24 @@ class AlgorithmSimulator(object):
yield risk_message
def get_message(self, date):
rvars = self.algo.recorded_vars
if self.perf_tracker.emission_rate == 'daily':
perf_message = \
self.perf_tracker.handle_market_close()
perf_message['daily_perf']['recorded_vars'] = rvars
return perf_message
elif self.perf_tracker.emission_rate == 'minute':
self.perf_tracker.handle_minute_close(date)
perf_message = self.perf_tracker.to_dict()
perf_message['intraday_perf']['recorded_vars'] = rvars
return perf_message
def update_universe(self, event):
"""
Update the universe with new event information.
"""
# Update our portfolio.
self.algo.set_portfolio(event.portfolio)
# the portfolio is modified by each event passed into the
# performance tracker (prices and amounts can change).
# Performance tracker sends back an up-to-date portfolio
# with each event. However, we provide the portfolio to
# the algorithm via a setter method, rather than as part
# of the event data sent to handle_data. To avoid
# confusion, we remove it from the event here.
del event.portfolio
# Update our knowledge of this event's sid
sid_data = self.universe[event.sid]
sid_data.__dict__.update(event.__dict__)
@@ -482,7 +488,6 @@ class AlgorithmSimulator(object):
# log/print lines.
self.snapshot_dt = date
self.algo.set_datetime(self.snapshot_dt)
self.algo.handle_data(self.universe)
# Update the simulation time.
self.simulation_dt = date
self.algo.handle_data(self.universe)