mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-02 03:48:58 +08:00
Date bookkeeping fixes in perf and risk
Issues appeared when we were close to the end of our historical data. Yielding DONE event with both perf and risk messages now
This commit is contained in:
@@ -39,7 +39,8 @@ class TestTransformAlgorithm(TestCase):
|
||||
)
|
||||
self.source = SpecificEquityTrades(event_list=trade_history)
|
||||
|
||||
self.df_source, self.df = factory.create_test_df_source()
|
||||
self.df_source, self.df = \
|
||||
factory.create_test_df_source(self.trading_environment)
|
||||
|
||||
def test_source_as_input(self):
|
||||
algo = TestRegisterTransformAlgorithm(sids=[133])
|
||||
|
||||
+51
-19
@@ -18,10 +18,14 @@ import copy
|
||||
import random
|
||||
import datetime
|
||||
import pytz
|
||||
import itertools
|
||||
from operator import attrgetter
|
||||
|
||||
import zipline.utils.factory as factory
|
||||
import zipline.finance.performance as perf
|
||||
from zipline.utils.protocol_utils import ndict
|
||||
from zipline.gens.sort import date_sort
|
||||
from zipline.protocol import DATASOURCE_TYPE
|
||||
|
||||
from zipline.finance.trading import TradingEnvironment
|
||||
|
||||
@@ -539,7 +543,8 @@ shares in position"
|
||||
price_list,
|
||||
volume,
|
||||
trade_time_increment,
|
||||
self.trading_environment
|
||||
self.trading_environment,
|
||||
source_id="factory1"
|
||||
)
|
||||
|
||||
sid2 = 134
|
||||
@@ -550,13 +555,18 @@ shares in position"
|
||||
price2_list,
|
||||
volume,
|
||||
trade_time_increment,
|
||||
self.trading_environment
|
||||
self.trading_environment,
|
||||
source_id="factory2"
|
||||
)
|
||||
|
||||
trade_history.extend(trade_history2)
|
||||
|
||||
self.trading_environment.period_start = trade_history[0].dt
|
||||
self.trading_environment.period_end = trade_history[-1].dt
|
||||
self.trading_environment.first_open = \
|
||||
self.trading_environment.calculate_first_open()
|
||||
self.trading_environment.last_close = \
|
||||
self.trading_environment.calculate_last_close()
|
||||
self.trading_environment.capital_base = 1000.0
|
||||
self.trading_environment.frame_index = [
|
||||
'sid',
|
||||
@@ -568,21 +578,26 @@ shares in position"
|
||||
self.trading_environment
|
||||
)
|
||||
|
||||
for event in trade_history:
|
||||
#create a transaction for all but
|
||||
#first trade in each sid, to simulate None transaction
|
||||
if(event.dt != self.trading_environment.period_start):
|
||||
txn = ndict({
|
||||
'sid': event.sid,
|
||||
'amount': -25,
|
||||
'dt': event.dt,
|
||||
'price': 10.0,
|
||||
'commission': 0.50
|
||||
})
|
||||
else:
|
||||
txn = None
|
||||
event['TRANSACTION'] = txn
|
||||
perf_tracker.process_event(event)
|
||||
# date_sort requires 'DONE' messages from each source
|
||||
events = itertools.chain(trade_history,
|
||||
[ndict({
|
||||
'source_id': 'factory1',
|
||||
'dt': 'DONE',
|
||||
'type': DATASOURCE_TYPE.TRADE
|
||||
}),
|
||||
ndict({
|
||||
'source_id': 'factory2',
|
||||
'dt': 'DONE',
|
||||
'type': DATASOURCE_TYPE.TRADE
|
||||
})])
|
||||
events = date_sort(events, ('factory1', 'factory2'))
|
||||
events = itertools.chain(events,
|
||||
[ndict({'dt': 'DONE'})])
|
||||
|
||||
events = [self.event_with_txn(event) for event in events]
|
||||
|
||||
list(perf_tracker.transform(
|
||||
itertools.groupby(events, attrgetter('dt'))))
|
||||
|
||||
#we skip two trades, to test case of None transaction
|
||||
txn_count = len(trade_history) - 2
|
||||
@@ -592,6 +607,23 @@ shares in position"
|
||||
expected_size = txn_count / 2 * -25
|
||||
self.assertEqual(cumulative_pos.amount, expected_size)
|
||||
|
||||
self.assertEqual(perf_tracker.period_end.
|
||||
replace(hour=0, minute=0, second=0),
|
||||
self.assertEqual(perf_tracker.last_close,
|
||||
perf_tracker.cumulative_risk_metrics.end_date)
|
||||
|
||||
def event_with_txn(self, event):
|
||||
#create a transaction for all but
|
||||
#first trade in each sid, to simulate None transaction
|
||||
if event.dt != self.trading_environment.period_start \
|
||||
and event.dt != 'DONE':
|
||||
txn = ndict({
|
||||
'sid': event.sid,
|
||||
'amount': -25,
|
||||
'dt': event.dt,
|
||||
'price': 10.0,
|
||||
'commission': 0.50
|
||||
})
|
||||
else:
|
||||
txn = None
|
||||
event['TRANSACTION'] = txn
|
||||
|
||||
return event
|
||||
|
||||
@@ -87,10 +87,10 @@ class RiskCompareIterativeToBatch(unittest.TestCase):
|
||||
#assert that when original raises exception, same
|
||||
#exception is raised by risk_metrics_refactor
|
||||
np.testing.assert_raises(
|
||||
type(e), risk_metrics_refactor.update, ret)
|
||||
type(e), risk_metrics_refactor.update, todays_date, ret)
|
||||
continue
|
||||
|
||||
risk_metrics_refactor.update(ret)
|
||||
risk_metrics_refactor.update(todays_date, ret)
|
||||
|
||||
self.assertEqual(
|
||||
risk_metrics_original.start_date,
|
||||
|
||||
@@ -159,11 +159,11 @@ class PerformanceTracker(object):
|
||||
|
||||
self.trading_environment = trading_environment
|
||||
self.trading_day = datetime.timedelta(hours=6, minutes=30)
|
||||
self.calendar_day = datetime.timedelta(hours=24)
|
||||
self.started_at = datetime.datetime.utcnow().replace(tzinfo=pytz.utc)
|
||||
|
||||
self.period_start = self.trading_environment.period_start
|
||||
self.period_end = self.trading_environment.period_end
|
||||
self.last_close = self.trading_environment.last_close
|
||||
self.market_open = self.trading_environment.first_open
|
||||
self.market_close = self.market_open + self.trading_day
|
||||
self.progress = 0.0
|
||||
@@ -211,17 +211,23 @@ class PerformanceTracker(object):
|
||||
Main generator work loop.
|
||||
"""
|
||||
for date, snapshot in stream_in:
|
||||
yield date, [self._transform_event(event) for event in snapshot]
|
||||
new_snapshot = []
|
||||
|
||||
def _transform_event(self, event):
|
||||
if event.dt == "DONE":
|
||||
event.perf_message = self.handle_simulation_end()
|
||||
else:
|
||||
event.perf_message = self.process_event(event)
|
||||
event.portfolio = self.get_portfolio()
|
||||
for event in snapshot:
|
||||
if date != "DONE":
|
||||
event.perf_message = self.process_event(event)
|
||||
event.portfolio = self.get_portfolio()
|
||||
else:
|
||||
# the stream will end on the last trading day, but will
|
||||
# not trigger an end of day, so we trigger the final
|
||||
# market close here
|
||||
event.perf_message = self.handle_market_close()
|
||||
event.risk_message = self.handle_simulation_end()
|
||||
|
||||
del event['TRANSACTION']
|
||||
return event
|
||||
del event['TRANSACTION']
|
||||
new_snapshot.append(event)
|
||||
|
||||
yield date, new_snapshot
|
||||
|
||||
def get_portfolio(self):
|
||||
return self.cumulative_performance.as_portfolio()
|
||||
@@ -249,7 +255,7 @@ class PerformanceTracker(object):
|
||||
assert isinstance(event, ndict)
|
||||
self.event_count += 1
|
||||
|
||||
if(event.dt >= self.market_close):
|
||||
if(event.dt > self.market_close):
|
||||
message = self.handle_market_close()
|
||||
|
||||
if event.TRANSACTION:
|
||||
@@ -279,6 +285,7 @@ class PerformanceTracker(object):
|
||||
|
||||
#update risk metrics for cumulative performance
|
||||
self.cumulative_risk_metrics.update(
|
||||
self.market_close,
|
||||
self.todays_performance.returns)
|
||||
|
||||
# increment the day counter before we move markers forward.
|
||||
@@ -290,15 +297,23 @@ class PerformanceTracker(object):
|
||||
# browser.
|
||||
daily_update = self.to_dict()
|
||||
|
||||
# On the last day of the test, don't create tomorrow's performance
|
||||
# period. We may not be able to find the next trading day if we're
|
||||
# at the end of our historical data
|
||||
if self.market_close >= self.last_close:
|
||||
return daily_update
|
||||
|
||||
#move the market day markers forward
|
||||
self.market_open = self.market_open + self.calendar_day
|
||||
|
||||
while not self.trading_environment.is_trading_day(self.market_open):
|
||||
if self.market_open > self.trading_environment.trading_days[-1]:
|
||||
raise Exception(
|
||||
"Attempt to backtest beyond available history.")
|
||||
self.market_open = self.market_open + self.calendar_day
|
||||
next_open = self.trading_environment.next_trading_day(self.market_open)
|
||||
if next_open is None:
|
||||
raise Exception(
|
||||
"Attempt to backtest beyond available history. \
|
||||
Last successful date: %s" % self.market_open)
|
||||
|
||||
# next_open is a midnight date, but we want the time too
|
||||
self.market_open = next_open.replace(hour=self.market_open.hour,
|
||||
minute=self.market_open.minute,
|
||||
second=self.market_open.second)
|
||||
self.market_close = self.market_open + self.trading_day
|
||||
|
||||
# Roll over positions to current day.
|
||||
@@ -323,10 +338,8 @@ class PerformanceTracker(object):
|
||||
log.info(log_msg.format(n=self.day_count, m=self.total_days))
|
||||
log.info("first open: {d}".format(
|
||||
d=self.trading_environment.first_open))
|
||||
|
||||
# the stream will end on the last trading day, but will not trigger
|
||||
# an end of day, so we trigger the final market close here.
|
||||
self.handle_market_close()
|
||||
log.info("last close: {d}".format(
|
||||
d=self.trading_environment.last_close))
|
||||
|
||||
self.risk_report = risk.RiskReport(
|
||||
self.returns,
|
||||
|
||||
+2
-13
@@ -423,7 +423,7 @@ class RiskMetricsIterative(RiskMetricsBase):
|
||||
if x.date >= self.start_date
|
||||
]
|
||||
|
||||
def update(self, returns_in_period):
|
||||
def update(self, market_close, returns_in_period):
|
||||
if self.trading_environment.is_trading_day(self.end_date):
|
||||
self.algorithm_returns.append(returns_in_period)
|
||||
self.benchmark_returns.append(
|
||||
@@ -431,18 +431,7 @@ class RiskMetricsIterative(RiskMetricsBase):
|
||||
self.trading_days += 1
|
||||
self.update_compounded_log_returns()
|
||||
|
||||
next_trading_day = \
|
||||
self.trading_environment.next_trading_day(self.end_date)
|
||||
|
||||
if next_trading_day:
|
||||
self.end_date = next_trading_day
|
||||
else:
|
||||
message = "No trading data on or after {dt}. Check \
|
||||
that date doesn't exceed benchmark history range."
|
||||
message = message.format(dt=self.end_date)
|
||||
raise Exception(message)
|
||||
|
||||
self.end_date = self.end_date.replace(hour=0, minute=0, second=0)
|
||||
self.end_date = market_close
|
||||
|
||||
self.algorithm_period_returns.append(
|
||||
self.calculate_period_returns(self.algorithm_returns))
|
||||
|
||||
@@ -209,6 +209,7 @@ class AlgorithmSimulator(object):
|
||||
if date == 'DONE':
|
||||
for event in snapshot:
|
||||
yield event.perf_message
|
||||
yield event.risk_message
|
||||
raise StopIteration
|
||||
|
||||
# We're still in the warmup period. Use the event to
|
||||
|
||||
@@ -127,12 +127,13 @@ def get_next_trading_dt(current, interval, trading_calendar):
|
||||
return next
|
||||
|
||||
|
||||
def create_trade_history(sid, prices, amounts, interval, trading_calendar):
|
||||
def create_trade_history(sid, prices, amounts, interval, trading_calendar,
|
||||
source_id="test_factory"):
|
||||
trades = []
|
||||
current = trading_calendar.first_open
|
||||
|
||||
for price, amount in zip(prices, amounts):
|
||||
trade = create_trade(sid, price, amount, current)
|
||||
trade = create_trade(sid, price, amount, current, source_id)
|
||||
trades.append(trade)
|
||||
current = get_next_trading_dt(current, interval, trading_calendar)
|
||||
|
||||
@@ -272,9 +273,13 @@ def create_trade_source(sids, trade_count,
|
||||
return source
|
||||
|
||||
|
||||
def create_test_df_source():
|
||||
start = pd.datetime(1990, 1, 3, 0, 0, 0, 0, pytz.utc)
|
||||
end = pd.datetime(1990, 1, 8, 0, 0, 0, 0, pytz.utc)
|
||||
def create_test_df_source(trading_calendar=None):
|
||||
start = trading_calendar.first_open \
|
||||
if trading_calendar else pd.datetime(1990, 1, 3, 0, 0, 0, 0, pytz.utc)
|
||||
|
||||
end = trading_calendar.last_close \
|
||||
if trading_calendar else pd.datetime(1990, 1, 8, 0, 0, 0, 0, pytz.utc)
|
||||
|
||||
index = pd.DatetimeIndex(start=start, end=end, freq=pd.datetools.day)
|
||||
x = np.arange(0, len(index))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user