From d8af3fb92ea6e5f898df25681c817c849f989713 Mon Sep 17 00:00:00 2001 From: Jean Bredeche Date: Wed, 20 Jul 2016 09:08:46 -0400 Subject: [PATCH] ENH: Augment `data.can_trade` to check whether the asset's exchange is currently open. --- tests/test_bar_data.py | 63 ++++++++++++++++++----- zipline/_protocol.pyx | 34 ++++++++----- zipline/assets/_assets.pyx | 65 ++++++++++++++++++------ zipline/data/daily_history_aggregator.py | 25 +++++---- zipline/data/data_portal.py | 4 +- 5 files changed, 136 insertions(+), 55 deletions(-) diff --git a/tests/test_bar_data.py b/tests/test_bar_data.py index f3d253cb..b5edbac2 100644 --- a/tests/test_bar_data.py +++ b/tests/test_bar_data.py @@ -438,7 +438,7 @@ class TestMinuteBarData(WithBarDataChecks, bd.current(self.HILARIOUSLY_ILLIQUID_ASSET, "volume") ) - def test_can_trade_at_midnight(self): + def test_can_trade_during_non_market_hours(self): # make sure that if we use `can_trade` at midnight, we don't pretend # we're in the previous day's last minute the_day_after = self.trading_calendar.next_session_label( @@ -453,19 +453,39 @@ class TestMinuteBarData(WithBarDataChecks, with handle_non_market_minutes(bar_data): self.assertFalse(bar_data.can_trade(asset)) - # but make sure it works when the assets are alive + # NYSE is closed at midnight, so even if the asset is alive, can_trade + # should return False bar_data2 = BarData( self.data_portal, lambda: self.equity_minute_bar_days[1], "minute", ) for asset in [self.ASSET1, self.HILARIOUSLY_ILLIQUID_ASSET]: - self.assertTrue(bar_data2.can_trade(asset)) + self.assertFalse(bar_data2.can_trade(asset)) with handle_non_market_minutes(bar_data2): - self.assertTrue(bar_data2.can_trade(asset)) + self.assertFalse(bar_data2.can_trade(asset)) - def test_is_stale_at_midnight(self): + def test_can_trade_exchange_closed(self): + session = self.equity_minute_bar_days[1] + session_open, session_close = \ + self.trading_calendar.open_and_close_for_session(session) + + one_minute = pd.Timedelta(minutes=1) + + minutes_to_check = [ + (session_open - one_minute, False), + (session_open, True), + (session_close - one_minute, True), + (session_close, True), + (session_close + one_minute, False) + ] + + for info in minutes_to_check: + bar_data = BarData(self.data_portal, lambda: info[0], "minute") + self.assertEqual(info[1], bar_data.can_trade(self.ASSET1)) + + def test_is_stale_during_non_market_hours(self): bar_data = BarData( self.data_portal, lambda: self.equity_minute_bar_days[1], @@ -644,13 +664,20 @@ class TestDailyBarData(WithBarDataChecks, ) cls.ASSETS = [cls.ASSET1, cls.ASSET2] + def get_last_minute_of_session(self, session_label): + return self.trading_calendar.open_and_close_for_session( + session_label + )[1] + def test_day_before_assets_trading(self): # use the day before self.bcolz_daily_bar_days[0] - day = self.trading_calendar.previous_session_label( - self.equity_daily_bar_days[0] + minute = self.get_last_minute_of_session( + self.trading_calendar.previous_session_label( + self.equity_daily_bar_days[0] + ) ) - bar_data = BarData(self.data_portal, lambda: day, "daily") + bar_data = BarData(self.data_portal, lambda: minute, "daily") self.check_internal_consistency(bar_data) self.assertFalse(bar_data.can_trade(self.ASSET1)) @@ -674,7 +701,9 @@ class TestDailyBarData(WithBarDataChecks, # on self.equity_daily_bar_days[0], only asset1 has data bar_data = BarData( self.data_portal, - lambda: self.equity_daily_bar_days[0], + lambda: self.get_last_minute_of_session( + self.equity_daily_bar_days[0] + ), "daily", ) self.check_internal_consistency(bar_data) @@ -709,7 +738,9 @@ class TestDailyBarData(WithBarDataChecks, def test_fully_active_day(self): bar_data = BarData( self.data_portal, - lambda: self.equity_daily_bar_days[1], + lambda: self.get_last_minute_of_session( + self.equity_daily_bar_days[1] + ), "daily", ) self.check_internal_consistency(bar_data) @@ -733,7 +764,9 @@ class TestDailyBarData(WithBarDataChecks, def test_last_active_day(self): bar_data = BarData( self.data_portal, - lambda: self.equity_daily_bar_days[-1], + lambda: self.get_last_minute_of_session( + self.equity_daily_bar_days[-1] + ), "daily", ) self.check_internal_consistency(bar_data) @@ -751,11 +784,13 @@ class TestDailyBarData(WithBarDataChecks, def test_after_assets_dead(self): # both assets end on self.day[-1], so let's try the next day - next_day = self.trading_calendar.next_session_label( - self.equity_daily_bar_days[-1] + minute = self.get_last_minute_of_session( + self.trading_calendar.next_session_label( + self.equity_daily_bar_days[-1] + ) ) - bar_data = BarData(self.data_portal, lambda: next_day, "daily") + bar_data = BarData(self.data_portal, lambda: minute, "daily") self.check_internal_consistency(bar_data) for asset in self.ASSETS: diff --git a/zipline/_protocol.pyx b/zipline/_protocol.pyx index 33d0df5b..5851d7c0 100644 --- a/zipline/_protocol.pyx +++ b/zipline/_protocol.pyx @@ -426,9 +426,11 @@ cdef class BarData: @check_parameters(('assets',), (Asset,)) def can_trade(self, assets): """ - For the given asset or iterable of assets, returns true if the asset - is alive at the current simulation time and there is a known last - price. + For the given asset or iterable of assets, returns true if all of the + following are true: + - the asset is alive at the current simulation time + - the asset's exchange is open at the current simulation time + - there is a known last price for the asset. Parameters ---------- @@ -460,15 +462,21 @@ cdef class BarData: }) cdef bool _can_trade_for_asset(self, asset, dt, adjusted_dt, data_portal): - if asset._is_alive(dt, False): - # is there a last price? - return not np.isnan( - data_portal.get_spot_value( - asset, "price", adjusted_dt, self.data_frequency - ) - ) + session_label = normalize_date(dt) # FIXME + if not asset._is_alive_for_session(session_label): + # asset isn't alive + return False - return False + if not asset._asset_exchange_open(dt): + # exchange isn't open + return False + + # is there a last price? + return not np.isnan( + data_portal.get_spot_value( + asset, "price", adjusted_dt, self.data_frequency + ) + ) @check_parameters(('assets',), (Asset,)) def is_stale(self, assets): @@ -511,7 +519,9 @@ cdef class BarData: }) cdef bool _is_stale_for_asset(self, asset, dt, adjusted_dt, data_portal): - if not asset._is_alive(dt, False): + session_label = normalize_date(dt) # FIXME + + if not asset._is_alive_for_session(session_label): return False current_volume = data_portal.get_spot_value( diff --git a/zipline/assets/_assets.pyx b/zipline/assets/_assets.pyx index fb636c1d..311418f2 100644 --- a/zipline/assets/_assets.pyx +++ b/zipline/assets/_assets.pyx @@ -34,6 +34,8 @@ from numpy cimport int64_t import warnings cimport numpy as np +from zipline.utils.calendars import get_calendar + # IMPORTANT NOTE: You must change this template if you change # Asset.__reduce__, or else we'll attempt to unpickle an old version of this @@ -42,6 +44,16 @@ from pandas.tslib import normalize_date CACHE_FILE_TEMPLATE = '/tmp/.%s-%s.v6.cache' +FUTURE_EXCHANGE_MAPPING = { + "CFE": "CFE", + "CBOT": "CME", + "CME": "CME", + "COMEX": "CME", + "NYMEX": "CME", + "ICEUS": "ICE", + "NYFE": "ICE" +} + cdef class Asset: cdef readonly int sid @@ -89,6 +101,8 @@ cdef class Asset: self.first_traded = first_traded self.auto_close_date = auto_close_date + + def __int__(self): return self.sid @@ -187,37 +201,58 @@ cdef class Asset: """ return cls(**dict_) - def _is_alive(self, dt, bool normalized): + def _is_alive_for_session(self, session_label): """ Returns whether the asset is alive at the given dt. Parameters ---------- - dt: pd.Timestamp - The desired timestamp. - - normalized: boolean - Whether the date has already been normalized. If not, we need - to first normalize the date before doing the alive check. If the - date is already normalized, this method runs up to 80% faster. + session_label: pd.Timestamp + The desired session label to check. (midnight UTC) Returns ------- boolean: whether the asset is alive at the given dt. """ - cdef int64_t dt_value cdef int64_t ref_start cdef int64_t ref_end - if not normalized: - dt_value = normalize_date(dt).value - else: - dt_value = dt.value - ref_start = self.start_date.value ref_end = self.end_date.value - return ref_start <= dt_value <= ref_end + return ref_start <= session_label.value <= ref_end + + def _asset_exchange_open(self, dt_minute): + """ + Parameters + ---------- + dt_minute: pd.Timestamp (UTC, tz-aware) + The minute to check. + + Returns + ------- + boolean: whether the asset's exchange is open at the given minute. + """ + calendar = self._exchange_trading_calendar_for_asset() + return calendar.is_open_on_minute(dt_minute) + + def _exchange_trading_calendar_for_asset(self): + """ + Get the calendar for this asset's exchange. + + Raises KeyError if the asset's exchange calendar cannot be found. + + Returns + ------- + The asset's exchange's trading calendar. + """ + if isinstance(self, Equity): + # FIXME: probably too Quantopian-specific + return get_calendar("NYSE") + else: + asset_exchange_str = self.exchange + calendar_str = FUTURE_EXCHANGE_MAPPING[asset_exchange_str] + return get_calendar(calendar_str) cdef class Equity(Asset): diff --git a/zipline/data/daily_history_aggregator.py b/zipline/data/daily_history_aggregator.py index cb108004..7536e2d2 100644 --- a/zipline/data/daily_history_aggregator.py +++ b/zipline/data/daily_history_aggregator.py @@ -15,8 +15,6 @@ import numpy as np import pandas as pd -from pandas.tslib import normalize_date - class DailyHistoryAggregator(object): """ @@ -32,9 +30,10 @@ class DailyHistoryAggregator(object): """ - def __init__(self, market_opens, minute_reader): + def __init__(self, market_opens, minute_reader, trading_calendar): self._market_opens = market_opens self._minute_reader = minute_reader + self._trading_calendar = trading_calendar # The caches are structured as (date, market_open, entries), where # entries is a dict of asset -> (last_visited_dt, value) @@ -97,10 +96,10 @@ class DailyHistoryAggregator(object): market_open, prev_dt, dt_value, entries = self._prelude(dt, 'open') opens = [] - normalized_date = normalize_date(dt) + session_label = self._trading_calendar.minute_to_session_label(dt) for asset in assets: - if not asset._is_alive(normalized_date, True): + if not asset._is_alive_for_session(session_label): opens.append(np.NaN) continue @@ -166,10 +165,10 @@ class DailyHistoryAggregator(object): market_open, prev_dt, dt_value, entries = self._prelude(dt, 'high') highs = [] - normalized_date = normalize_date(dt) + session_label = self._trading_calendar.minute_to_session_label(dt) for asset in assets: - if not asset._is_alive(normalized_date, True): + if not asset._is_alive_for_session(session_label): highs.append(np.NaN) continue @@ -235,10 +234,10 @@ class DailyHistoryAggregator(object): market_open, prev_dt, dt_value, entries = self._prelude(dt, 'low') lows = [] - normalized_date = normalize_date(dt) + session_label = self._trading_calendar.minute_to_session_label(dt) for asset in assets: - if not asset._is_alive(normalized_date, True): + if not asset._is_alive_for_session(session_label): lows.append(np.NaN) continue @@ -305,10 +304,10 @@ class DailyHistoryAggregator(object): market_open, prev_dt, dt_value, entries = self._prelude(dt, 'close') closes = [] - normalized_dt = normalize_date(dt) + session_label = self._trading_calendar.minute_to_session_label(dt) for asset in assets: - if not asset._is_alive(normalized_dt, True): + if not asset._is_alive_for_session(session_label): closes.append(np.NaN) continue @@ -365,10 +364,10 @@ class DailyHistoryAggregator(object): market_open, prev_dt, dt_value, entries = self._prelude(dt, 'volume') volumes = [] - normalized_date = normalize_date(dt) + session_label = self._trading_calendar.minute_to_session_label(dt) for asset in assets: - if not asset._is_alive(normalized_date, True): + if not asset._is_alive_for_session(session_label): volumes.append(0) continue diff --git a/zipline/data/data_portal.py b/zipline/data/data_portal.py index 80b13988..54f26d50 100644 --- a/zipline/data/data_portal.py +++ b/zipline/data/data_portal.py @@ -150,7 +150,9 @@ class DataPortal(object): if self._equity_minute_reader is not None: self._equity_daily_aggregator = DailyHistoryAggregator( self.trading_calendar.schedule.market_open, - self._equity_minute_reader) + self._equity_minute_reader, + self.trading_calendar + ) self._equity_minute_history_loader = USEquityMinuteHistoryLoader( self.trading_calendar, self._equity_minute_reader,