From c494ac64ca9c8704ad4d38b38ef17a493bdf23e2 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Wed, 19 Jul 2017 11:38:32 -0700 Subject: [PATCH] WIP: Integration of generic bundle and five minute bars --- catalyst/__main__.py | 10 +- catalyst/algorithm.py | 24 +- catalyst/data/_minute_bar_internal.pyx | 79 +++++ catalyst/data/bundles/base.py | 275 ++++++++++-------- catalyst/data/bundles/base_pricing.py | 15 +- catalyst/data/bundles/core.py | 76 +++-- catalyst/data/bundles/poloniex.py | 8 +- catalyst/data/data_portal.py | 99 +++++-- catalyst/data/dispatch_bar_reader.py | 6 +- catalyst/data/five_minute_bars.py | 95 +++--- catalyst/data/loader.py | 108 ++----- catalyst/data/us_equity_pricing.py | 2 +- catalyst/examples/buy_and_hodl.py | 8 +- catalyst/examples/dual_vwap.py | 2 +- catalyst/finance/performance/tracker.py | 15 + catalyst/gens/sim_engine.pyx | 2 +- .../pipeline/loaders/crypto_pricing_loader.py | 32 +- .../pipeline/loaders/equity_pricing_loader.py | 35 ++- catalyst/sources/benchmark_source.py | 34 +++ catalyst/utils/calendars/trading_calendar.py | 71 ++++- catalyst/utils/cli.py | 1 + catalyst/utils/run_algo.py | 18 +- 22 files changed, 671 insertions(+), 344 deletions(-) diff --git a/catalyst/__main__.py b/catalyst/__main__.py index 7ee6edf6..7dfe91b1 100644 --- a/catalyst/__main__.py +++ b/catalyst/__main__.py @@ -290,6 +290,13 @@ def catalyst_magic(line, cell=None): show_default=True, help='The data bundle to ingest.', ) +@click.option( + '-c', + '--compile-locally', + is_flag=True, + default=False, + help='Download dataset from source and compile bundle locally.', +) @click.option( '--assets-version', type=int, @@ -301,7 +308,7 @@ def catalyst_magic(line, cell=None): default=True, help='Print progress information to the terminal.' ) -def ingest(bundle, assets_version, show_progress): +def ingest(bundle, compile_locally, assets_version, show_progress): """Ingest the data for the given bundle. 
""" bundles_module.ingest( @@ -310,6 +317,7 @@ def ingest(bundle, assets_version, show_progress): pd.Timestamp.utcnow(), assets_version, show_progress, + compile_locally, ) diff --git a/catalyst/algorithm.py b/catalyst/algorithm.py index 8fe12d34..139a2395 100644 --- a/catalyst/algorithm.py +++ b/catalyst/algorithm.py @@ -308,7 +308,10 @@ class TradingAlgorithm(object): self.asset_finder = self.trading_environment.asset_finder # Initialize Pipeline API data. - self.init_engine(kwargs.pop('get_pipeline_loader', None)) + self.init_engine( + kwargs.pop('get_pipeline_loader', None), + self.sim_params.data_frequency, + ) self._pipelines = {} # Create an always-expired cache so that we compute the first time data # is requested. @@ -422,16 +425,31 @@ class TradingAlgorithm(object): self.restrictions = NoRestrictions() - def init_engine(self, get_loader): + def init_engine(self, get_loader, data_frequency): """ Construct and store a PipelineEngine from loader. If get_loader is None, constructs an ExplodingPipelineEngine """ if get_loader is not None: + if data_frequency == 'daily': + all_dates = self.trading_calendar.all_sessions + elif data_frequency == '5-minute': + all_dates = self.trading_calendar.all_five_minutes + elif data_frequency == 'minute': + all_dates = self.trading_calendar.all_minutes + else: + raise ValueError( + 'Cannot initialize engine with ' + 'data frequency: {}'.format(data_frequency) + ) + + print 'first_dates:', all_dates[:10] + print 'last_dates:', all_dates[:-10] + self.engine = SimplePipelineEngine( get_loader, - self.trading_calendar.all_sessions, + all_dates, self.asset_finder, ) else: diff --git a/catalyst/data/_minute_bar_internal.pyx b/catalyst/data/_minute_bar_internal.pyx index 61818ae1..9ebb0841 100644 --- a/catalyst/data/_minute_bar_internal.pyx +++ b/catalyst/data/_minute_bar_internal.pyx @@ -35,6 +35,17 @@ def minute_value(ndarray[long_t, ndim=1] market_opens, return market_opens[q] + r +@cython.cdivision(True) +def 
five_minute_value(ndarray[long_t, ndim=1] market_opens, + Py_ssize_t pos, + short five_minutes_per_day): + + cdef short q, r + q = cython.cdiv(pos, five_minutes_per_day) + r = cython.cmod(pos, five_minutes_per_day) + + return market_opens[q] + r + def find_position_of_minute(ndarray[long_t, ndim=1] market_opens, ndarray[long_t, ndim=1] market_closes, long_t minute_val, @@ -88,6 +99,26 @@ def find_position_of_minute(ndarray[long_t, ndim=1] market_opens, return (market_open_loc * minutes_per_day) + delta +def find_position_of_five_minute(ndarray[long_t, ndim=1] market_opens, + ndarray[long_t, ndim=1] market_closes, + long_t five_minute_val, + short five_minutes_per_day, + bool forward_fill): + + cdef Py_ssize_t market_open_loc, market_open, delta + + market_open_loc = \ + searchsorted(market_opens, five_minute_val, side='right') - 1 + market_open = market_opens[market_open_loc] + market_close = market_closes[market_open_loc] + + if not forward_fill and ((five_minute_val - market_open) >= five_minutes_per_day): + raise ValueError("Given five minutes is not between an open and a close") + + delta = int_min(five_minute_val - market_open, market_close - market_open) + + return (market_open_loc * five_minutes_per_day) + delta + def find_last_traded_position_internal( ndarray[long_t, ndim=1] market_opens, ndarray[long_t, ndim=1] market_closes, @@ -157,3 +188,51 @@ def find_last_traded_position_internal( # we've gone to the beginning of this asset's range, and still haven't # found a trade event return -1 + +def find_last_traded_five_minute_position_internal( + ndarray[long_t, ndim=1] market_opens, + ndarray[long_t, ndim=1] market_closes, + long_t end_five_minute, + long_t start_five_minute, + volumes, + short five_minutes_per_day): + cdef Py_ssize_t minute_pos, current_minute, q + + five_minute_pos = int_min( + find_position_of_five_minute( + market_opens, + market_closes, + end_five_minute, + five_minutes_per_day, + True, + ), + len(volumes) - 1, + ) + + while 
five_minute_pos >= 0: + current_five_minute = five_minute_value( + market_opens, five_minute_pos, five_minutes_per_day + ) + + q = cython.cdiv(five_minute_pos, five_minutes_per_day) + if current_five_minute > market_closes[q]: + five_minute_pos = find_position_of_five_minute( + market_opens, + market_closes, + market_closes[q], + five_minutes_per_day, + False, + ) + continue + + if current_five_minute < start_five_minute: + return -1 + + if volumes[five_minute_pos] != 0: + return five_minute_pos + + five_minute_pos -= 1 + + # we've gone to the beginning of this asset's range, and still haven't + # found a trade event + return -1 diff --git a/catalyst/data/bundles/base.py b/catalyst/data/bundles/base.py index c8a6370c..55895723 100644 --- a/catalyst/data/bundles/base.py +++ b/catalyst/data/bundles/base.py @@ -33,7 +33,17 @@ from catalyst.utils.memoize import lazyval logbook.StderrHandler().push_application() log = logbook.Logger(__name__) +DEFAULT_RETRIES = 5 + class BaseBundle(object): + def __init__(self, asset_filter=[]): + self._asset_filter = asset_filter + self._reset() + + def _reset(self): + self._splits = [] + self._dividends = [] + @lazyval def name(self): raise NotImplementedError() @@ -50,6 +60,10 @@ class BaseBundle(object): def minutes_per_day(self): raise NotImplementedError() + @lazyval + def five_minutes_per_day(self): + raise NotImplementedError() + @lazyval def frequencies(self): raise NotImplementedError() @@ -101,6 +115,7 @@ class BaseBundle(object): environ, asset_db_writer, minute_bar_writer, + five_minute_bar_writer, daily_bar_writer, adjustment_writer, calendar, @@ -108,110 +123,117 @@ class BaseBundle(object): end_session, cache, show_progress, + is_compile, output_dir): - api_key = environ.get('CATALYST_API_KEY') - retries = environ.get('CATALYST_DOWNLOAD_ATTEMPTS', 5) - use_local = environ.get('CATALYST_INGEST_LOCAL', 0) > 0 + try: + api_key = environ.get('CATALYST_API_KEY') + retries = environ.get('CATALYST_DOWNLOAD_ATTEMPTS', 5) - if 
use_local: - # User has instructed local curation and ingestion of bundle. - # Fetch raw metadata for all symbols. - raw_metadata = self._fetch_metadata_frame( - api_key, - cache=cache, - retries=retries, - environ=environ, - show_progress=show_progress, - ) - - # Compile daily symbol data if bundle supports daily mode and - # persist the dataset to disk. - symbol_map = raw_metadata.symbol - if 'daily' in self.frequencies: - daily_bar_writer.write( - self._fetch_symbol_iter( - api_key, - cache, - symbol_map, - calendar, - start_session, - end_session, - 'daily', - retries, - ), - assets=raw_metadata.index, + if is_compile: + # User has instructed local compilation and ingestion of bundle. + # Fetch raw metadata for all symbols. + raw_metadata = self._fetch_metadata_frame( + api_key, + cache=cache, + retries=retries, + environ=environ, show_progress=show_progress, ) - # Post-process metadata using cached symbol frames, and write to - # disk. This metadata must be written before any attempt to write - # either minute or 5-minute data. - metadata = self._post_process_metadata( - raw_metadata, - cache, - show_progress=show_progress, - ) - asset_db_writer.write(metadata) + # Compile daily symbol data if bundle supports daily mode and + # persist the dataset to disk. + symbol_map = raw_metadata.symbol + if 'daily' in self.frequencies: + daily_bar_writer.write( + self._fetch_symbol_iter( + api_key, + cache, + symbol_map, + calendar, + start_session, + end_session, + 'daily', + retries, + ), + assets=raw_metadata.index, + show_progress=show_progress, + ) - # Compile 5-minute symbol data if bundle supports 5-minute mode and - # persist the dataset to disk. 
- if '5-minute' in self.frequencies: - #TODO(cfromknecht) replace with five_minute_bar_writer - minute_bar_writer.write( - self._fetch_symbol_iter( - api_key, - cache, - symbol_map, - calendar, - start_session, - end_session, - '5-minute', - retries, - ), + # Post-process metadata using cached symbol frames, and write to + # disk. This metadata must be written before any attempt to write + # either minute or 5-minute data. + metadata = self._post_process_metadata( + raw_metadata, + cache, show_progress=show_progress, ) + asset_db_writer.write(metadata) - # Compile minute symbol data if bundle supports minute mode and - # persist the dataset to disk. - if 'minute' in self.frequencies: - minute_bar_writer.write( - self._fetch_symbol_iter( - api_key, - cache, - symbol_map, - calendar, - start_session, - end_session, - 'minute', - retries, - ), - show_progress=show_progress, - ) + # Compile 5-minute symbol data if bundle supports 5-minute mode and + # persist the dataset to disk. + if '5-minute' in self.frequencies: + five_minute_bar_writer.write( + self._fetch_symbol_iter( + api_key, + cache, + symbol_map, + calendar, + start_session, + end_session, + '5-minute', + retries, + ), + length=len(symbol_map), + show_progress=show_progress, + ) - # For legacy purposes, this call is required to ensure the database - # contains an appropriately initialized file structure. We don't - # forsee a usecase for adjustments at this time, but may later - # choose to expose this functionality in the future. - if len(self.splits) > 0 or len(self.dividends) > 0: + # Compile minute symbol data if bundle supports minute mode and + # persist the dataset to disk. 
+ if 'minute' in self.frequencies: + minute_bar_writer.write( + self._fetch_symbol_iter( + api_key, + cache, + symbol_map, + calendar, + start_session, + end_session, + 'minute', + retries, + ), + show_progress=show_progress, + ) + + # For legacy purposes, this call is required to ensure the database + # contains an appropriately initialized file structure. We don't + # forsee a usecase for adjustments at this time, but may later + # choose to expose this functionality in the future. adjustment_writer.write( splits=( pd.concat(self.splits, ignore_index=True) - #if self.splits is not None \ - #and len(self.splits) > 0 else - #None + if len(self.splits) > 0 else + None ), dividends=( pd.concat(self.dividends, ignore_index=True) - #if self.dividends is not None \ - #and len(dividends) > 0 else - #None + if len(self.dividends) > 0 else + None ), ) + else: + # Otherwise, user has instructed to download and untar bundle + # directly from the bundles `tar_url`. + self._download_and_untar(show_progress, output_dir) + except Exception as e: + log.exception( + ' Failed to ingest {name}:\n{msg}'.format( + name=self.name, + msg=str(e), + ) + ) else: - # Otherwise, user has instructed to download and untar bundle - # directly from the bundles `tar_url`. - self._download_and_untar(show_progress, output_dir) + self._reset() def _download_and_untar(self, show_progress, output_dir): # Download bundle conditioned on whether the user would like progress @@ -233,7 +255,7 @@ class BaseBundle(object): def _fetch_metadata_frame(self, api_key, cache, - retries=5, + retries=DEFAULT_RETRIES, environ=None, show_progress=False): @@ -257,7 +279,7 @@ class BaseBundle(object): for page_number in count(1): # Attempt to load metadata page from cache. If it does not exist, # poll the API upto `retries` times in order to get raw DataFrame. 
- key = 'metadata-page-{pn}'.format(pn=page_number) + key = 'metadata-page-{pn}.frame'.format(pn=page_number) try: raw = cache[key] except KeyError: @@ -289,6 +311,10 @@ class BaseBundle(object): # Empty DataFrame signals completion. break + # Apply selective asset filtering, useful for benchmark + # ingestion. + raw = raw[raw.symbol.isin(self._asset_filter)] + # Update cached value for key. cache[key] = raw @@ -317,8 +343,9 @@ class BaseBundle(object): # Attempt to load data from disk, the cache should have an entry # for each symbol at this point of the execution. If one does # not exist, we should fail. + key = '{sym}.daily.frame'.format(sym=symbol) try: - raw_data = cache[symbol] + raw_data = cache[key] except KeyError: raise ValueError( 'Unable to find cached data for symbol: {0}'.format(symbol) @@ -398,8 +425,9 @@ class BaseBundle(object): retries): # Attempt to load pre-existing symbol data from cache. + key = '{sym}.{freq}.frame'.format(sym=symbol, freq=data_frequency) try: - raw_data = cache[symbol] + raw_data = cache[key] except KeyError: raw_data = None @@ -418,6 +446,36 @@ class BaseBundle(object): # proceed to next symbol directly since no API call was required. return raw_data, should_sleep + # If we arrive here, we must have attempted an API call. + # Setting this flag tells the iterator to pause before starting + # the next asset, that we don't exceed the data source's rate + # limit. + should_sleep = True + + raw_data = self._fetch_symbol_frame( + api_key, + symbol, + calendar, + start_session, + end_session, + data_frequency, + retries=retries, + ) + + # Cache latest symbol data. + cache[key] = raw_data + + return raw_data, should_sleep + + def _fetch_symbol_frame(self, + api_key, + symbol, + calendar, + start_session, + end_session, + data_frequency, + retries=DEFAULT_RETRIES): + # Data for symbol is old enough to attempt an update or is not # present in the cache. Fetch raw data for a single symbol # with requested intervals and frequency. 
Retry as necessary. @@ -432,6 +490,7 @@ class BaseBundle(object): data_frequency, ) raw_data.index = pd.to_datetime(raw_data.index, utc=True) + raw_data.index = raw_data.index.tz_localize('UTC') # Filter incoming data to fit start and end sessions. raw_data = raw_data[ @@ -443,15 +502,7 @@ class BaseBundle(object): # previous frame is probably an incomplete. raw_data = raw_data[~raw_data.index.duplicated(keep='last')] - # Cache latest symbol data. - cache[symbol] = raw_data - - # If we arrive here, we must have attempted an API call. - # This flag tells the iterator to pause before starting the next - # asset, that we don't exceed the data source's rate limit. - should_sleep = True - - return raw_data, should_sleep + return raw_data except Exception as e: log.exception( @@ -468,35 +519,5 @@ class BaseBundle(object): ) - def _write_symbol_for_freq(self, - pricing_iter, - data_frequency, - daily_bar_writer, - minute_bar_writer, - assets, - show_progress=False): - - if data_frequency == 'daily': - daily_bar_writer.write( - pricing_iter, - assets=assets, - show_progress=show_progress, - ) - elif data_frequency == '5-minute': - # TODO(cfromknecht) replace with five minute writer - minute_bar_writer.write( - pricing_iter, - show_progress=show_progress, - ) - elif data_frequency == 'minute': - minute_bar_writer.write( - pricing_iter, - show_progress=show_progress, - ) - else: - raise ValueError( - 'Unsupported data-frequency: {0}'.format(data_frequency) - ) - def _dtypes_to_cols(dtypes): return [name for name, _ in dtypes] diff --git a/catalyst/data/bundles/base_pricing.py b/catalyst/data/bundles/base_pricing.py index 86d1f042..a0abd51a 100644 --- a/catalyst/data/bundles/base_pricing.py +++ b/catalyst/data/bundles/base_pricing.py @@ -38,9 +38,6 @@ class BasePricingBundle(BaseBundle): ] class BaseCryptoPricingBundle(BasePricingBundle): - def __init__(self): - super(BasePricingBundle, self).__init__() - @lazyval def calendar_name(self): return 'OPEN' @@ -49,6 +46,10 @@ 
class BaseCryptoPricingBundle(BasePricingBundle): def minutes_per_day(self): return 1440 + @lazyval + def five_minutes_per_day(self): + return 288 + @property def splits(self): return [] @@ -58,11 +59,6 @@ class BaseCryptoPricingBundle(BasePricingBundle): return [] class BaseEquityPricingBundle(BasePricingBundle): - def __init__(self): - super(BasePricingBundle, self).__init__() - self._splits = [] - self._dividends = [] - @lazyval def calendar_name(self): return 'NYSE' @@ -71,6 +67,9 @@ class BaseEquityPricingBundle(BasePricingBundle): def minutes_per_day(self): return 390 + @lazyval + def five_minutes_per_day(self): + return 78 @property def splits(self): diff --git a/catalyst/data/bundles/core.py b/catalyst/data/bundles/core.py index 6b0abd86..152f7110 100644 --- a/catalyst/data/bundles/core.py +++ b/catalyst/data/bundles/core.py @@ -17,6 +17,10 @@ from ..us_equity_pricing import ( SQLiteAdjustmentReader, SQLiteAdjustmentWriter, ) +from ..five_minute_bars import ( + BcolzFiveMinuteBarReader, + BcolzFiveMinuteBarWriter, +) from ..minute_bars import ( BcolzMinuteBarReader, BcolzMinuteBarWriter, @@ -44,16 +48,21 @@ def asset_db_path(bundle_name, timestr, environ=None, db_version=None): ) -def minute_equity_path(bundle_name, timestr, environ=None): +def minute_path(bundle_name, timestr, environ=None): return pth.data_path( - minute_equity_relative(bundle_name, timestr, environ), + minute_relative(bundle_name, timestr, environ), environ=environ, ) - -def daily_equity_path(bundle_name, timestr, environ=None): +def five_minute_path(bundle_name, timestr, environ=None): return pth.data_path( - daily_equity_relative(bundle_name, timestr, environ), + five_minute_relative(bundle_name, timestr, environ), + environ=environ, + ) + +def daily_path(bundle_name, timestr, environ=None): + return pth.data_path( + daily_relative(bundle_name, timestr, environ), environ=environ, ) @@ -80,12 +89,14 @@ def cache_relative(bundle_name, timestr, environ=None): return bundle_name, '.cache' 
-def daily_equity_relative(bundle_name, timestr, environ=None): - return bundle_name, timestr, 'daily_equities.bcolz' +def daily_relative(bundle_name, timestr, environ=None): + return bundle_name, timestr, 'daily.bcolz' +def five_minute_relative(bundle_name, timestr, environ=None): + return bundle_name, timestr, 'five_minute.bcolz' -def minute_equity_relative(bundle_name, timestr, environ=None): - return bundle_name, timestr, 'minute_equities.bcolz' +def minute_relative(bundle_name, timestr, environ=None): + return bundle_name, timestr, 'minute.bcolz' def asset_db_relative(bundle_name, timestr, environ=None, db_version=None): @@ -195,13 +206,14 @@ RegisteredBundle = namedtuple( 'start_session', 'end_session', 'minutes_per_day', + 'five_minutes_per_day', 'ingest', 'create_writers'] ) BundleData = namedtuple( 'BundleData', - 'asset_finder equity_minute_bar_reader equity_daily_bar_reader ' + 'asset_finder minute_bar_reader five_minute_bar_reader daily_bar_reader ' 'adjustment_reader', ) @@ -281,15 +293,17 @@ def _make_bundle_core(): bundles = mappingproxy(_bundles) def register_bundle(bundle_cls, + asset_filter=None, start_session=None, end_session=None, create_writers=True): - bundle = bundle_cls() + bundle = bundle_cls(asset_filter=asset_filter) return register( bundle.name, bundle.ingest, calendar_name=bundle.calendar_name, minutes_per_day=bundle.minutes_per_day, + five_minutes_per_day=bundle.five_minutes_per_day, start_session=start_session, end_session=end_session, create_writers=create_writers, @@ -298,10 +312,11 @@ def _make_bundle_core(): @curry def register(name, f, - calendar_name='NYSE', + calendar_name='OPEN', start_session=None, end_session=None, - minutes_per_day=390, + minutes_per_day=1440, + five_minutes_per_day=288, create_writers=True): """Register a data bundle ingest function. 
@@ -382,6 +397,7 @@ def _make_bundle_core(): start_session=start_session, end_session=end_session, minutes_per_day=minutes_per_day, + five_minutes_per_day=five_minutes_per_day, ingest=f, create_writers=create_writers, ) @@ -413,7 +429,8 @@ def _make_bundle_core(): environ=os.environ, timestamp=None, assets_versions=(), - show_progress=False): + show_progress=False, + is_compile=False): """Ingest data for a given bundle. Parameters @@ -463,7 +480,7 @@ def _make_bundle_core(): pth.data_path([], environ=environ)) ) daily_bars_path = wd.ensure_dir( - *daily_equity_relative( + *daily_relative( name, timestr, environ=environ, ) ) @@ -477,10 +494,20 @@ def _make_bundle_core(): # when we create the SQLiteAdjustmentWriter below. The # SQLiteAdjustmentWriter needs to open the daily ctables so # that it can compute the adjustment ratios for the dividends. - daily_bar_writer.write(()) + + five_minute_bar_writer = BcolzFiveMinuteBarWriter( + wd.ensure_dir(*five_minute_relative( + name, timestr, environ=environ) + ), + calendar, + start_session, + end_session, + five_minutes_per_day=bundle.five_minutes_per_day, + ) + minute_bar_writer = BcolzMinuteBarWriter( - wd.ensure_dir(*minute_equity_relative( + wd.ensure_dir(*minute_relative( name, timestr, environ=environ) ), calendar, @@ -488,6 +515,7 @@ def _make_bundle_core(): end_session, minutes_per_day=bundle.minutes_per_day, ) + assets_db_path = wd.getpath(*asset_db_relative( name, timestr, environ=environ, )) @@ -504,6 +532,7 @@ def _make_bundle_core(): ) else: daily_bar_writer = None + five_minute_bar_writer = None minute_bar_writer = None asset_db_writer = None adjustment_db_writer = None @@ -515,6 +544,7 @@ def _make_bundle_core(): environ, asset_db_writer, minute_bar_writer, + five_minute_bar_writer, daily_bar_writer, adjustment_db_writer, calendar, @@ -522,6 +552,7 @@ def _make_bundle_core(): end_session, cache, show_progress, + is_compile, pth.data_path([name, timestr], environ=environ), ) @@ -597,11 +628,14 @@ def 
_make_bundle_core(): asset_finder=AssetFinder( asset_db_path(name, timestr, environ=environ), ), - equity_minute_bar_reader=BcolzMinuteBarReader( - minute_equity_path(name, timestr, environ=environ), + minute_bar_reader=BcolzMinuteBarReader( + minute_path(name, timestr, environ=environ), ), - equity_daily_bar_reader=BcolzDailyBarReader( - daily_equity_path(name, timestr, environ=environ), + five_minute_bar_reader=BcolzFiveMinuteBarReader( + five_minute_path(name, timestr, environ=environ), + ), + daily_bar_reader=BcolzDailyBarReader( + daily_path(name, timestr, environ=environ), ), adjustment_reader=SQLiteAdjustmentReader( adjustment_db_path(name, timestr, environ=environ), diff --git a/catalyst/data/bundles/poloniex.py b/catalyst/data/bundles/poloniex.py index 298ba03a..111bdac0 100644 --- a/catalyst/data/bundles/poloniex.py +++ b/catalyst/data/bundles/poloniex.py @@ -36,6 +36,7 @@ class PoloniexBundle(BaseCryptoPricingBundle): def frequencies(self): return set(( 'daily', + '5-minute', )) @lazyval @@ -72,8 +73,8 @@ class PoloniexBundle(BaseCryptoPricingBundle): return raw def post_process_symbol_metadata(self, asset_id, sym_md, sym_data): - start_date = sym_data.index[0].tz_localize(None) - end_date = sym_data.index[-1].tz_localize(None) + start_date = sym_data.index[0] + end_date = sym_data.index[-1] ac_date = end_date + pd.Timedelta(days=1) return ( @@ -100,7 +101,6 @@ class PoloniexBundle(BaseCryptoPricingBundle): ), orient='records', ) - raw.set_index('date', inplace=True) scale = 1000.0 @@ -155,4 +155,4 @@ class PoloniexBundle(BaseCryptoPricingBundle): query=urlencode(query_params), ) -register_bundle(PoloniexBundle) +register_bundle(PoloniexBundle, ['USDT_BTC']) diff --git a/catalyst/data/data_portal.py b/catalyst/data/data_portal.py index 9049f337..69aa166e 100644 --- a/catalyst/data/data_portal.py +++ b/catalyst/data/data_portal.py @@ -42,6 +42,7 @@ from catalyst.assets.roll_finder import ( ) from catalyst.data.dispatch_bar_reader import ( 
AssetDispatchMinuteBarReader, + AssetDispatchFiveMinuteBarReader, AssetDispatchSessionBarReader ) from catalyst.data.resample import ( @@ -114,12 +115,16 @@ class DataPortal(object): The calendar instance used to provide minute->session information. first_trading_day : pd.Timestamp The first trading day for the simulation. - equity_daily_reader : BcolzDailyBarReader, optional + daily_reader : BcolzDailyBarReader, optional The daily bar reader for equities. This will be used to service daily data backtests or daily history calls in a minute backetest. If a daily bar reader is not provided but a minute bar reader is, the minutes will be rolled up to serve the daily requests. - equity_minute_reader : BcolzMinuteBarReader, optional + five_minute_reader : BcolzFiveMinuteBarReader, optional + The five minute bar reader for equities. This will be used to service + 5-minute data backtests or five-minute history calls. This can be used + to serve daily calls if no daily bar reader is provided. + minute_reader : BcolzMinuteBarReader, optional The minute bar reader for equities. This will be used to service minute data backtests or minute history calls. This can be used to serve daily calls if no daily bar reader is provided. @@ -144,8 +149,9 @@ class DataPortal(object): asset_finder, trading_calendar, first_trading_day, - equity_daily_reader=None, - equity_minute_reader=None, + daily_reader=None, + five_minute_reader=None, + minute_reader=None, future_daily_reader=None, future_minute_reader=None, adjustment_reader=None, @@ -180,7 +186,7 @@ class DataPortal(object): # Infer the last session from the provided readers. last_sessions = [ reader.last_available_dt - for reader in [equity_daily_reader, future_daily_reader] + for reader in [daily_reader, future_daily_reader] if reader is not None ] if last_sessions: @@ -194,7 +200,11 @@ class DataPortal(object): # Infer the last minute from the provided readers. 
last_minutes = [ reader.last_available_dt - for reader in [equity_minute_reader, future_minute_reader] + for reader in [ + minute_reader, + five_minute_reader, + future_minute_reader, + ] if reader is not None ] if last_minutes: @@ -202,10 +212,12 @@ class DataPortal(object): else: self._last_available_minute = None - aligned_equity_minute_reader = self._ensure_reader_aligned( - equity_minute_reader) - aligned_equity_session_reader = self._ensure_reader_aligned( - equity_daily_reader) + aligned_minute_reader = self._ensure_reader_aligned( + minute_reader) + aligned_five_minute_reader = self._ensure_reader_aligned( + five_minute_reader) + aligned_session_reader = self._ensure_reader_aligned( + daily_reader) aligned_future_minute_reader = self._ensure_reader_aligned( future_minute_reader) aligned_future_session_reader = self._ensure_reader_aligned( @@ -217,12 +229,15 @@ class DataPortal(object): } aligned_minute_readers = {} + aligned_five_minute_readers = {} aligned_session_readers = {} - if aligned_equity_minute_reader is not None: - aligned_minute_readers[Equity] = aligned_equity_minute_reader - if aligned_equity_session_reader is not None: - aligned_session_readers[Equity] = aligned_equity_session_reader + if aligned_minute_reader is not None: + aligned_minute_readers[Equity] = aligned_minute_reader + if aligned_five_minute_reader is not None: + aligned_five_minute_readers[Equity] = aligned_five_minute_reader + if aligned_session_reader is not None: + aligned_session_readers[Equity] = aligned_session_reader if aligned_future_minute_reader is not None: aligned_minute_readers[Future] = aligned_future_minute_reader @@ -252,6 +267,13 @@ class DataPortal(object): self._last_available_minute, ) + _dispatch_five_minute_reader = AssetDispatchFiveMinuteBarReader( + self.trading_calendar, + self.asset_finder, + aligned_five_minute_readers, + self._last_available_minute, + ) + _dispatch_session_reader = AssetDispatchSessionBarReader( self.trading_calendar, 
self.asset_finder, @@ -261,6 +283,7 @@ class DataPortal(object): self._pricing_readers = { 'minute': _dispatch_minute_reader, + '5-minute': _dispatch_five_minute_reader, 'daily': _dispatch_session_reader, } @@ -514,15 +537,17 @@ class DataPortal(object): ) else: if field == "last_traded": - return self.get_last_traded_dt(asset, dt, 'minute') + return self.get_last_traded_dt(asset, dt, data_frequency) elif field == "price": - return self._get_minute_spot_value( - asset, "close", dt, ffill=True, + return self._get_minutely_spot_value( + asset, "close", dt, data_frequency, ffill=True, ) elif field == "contract": return self._get_current_contract(asset, dt) else: - return self._get_minute_spot_value(asset, field, dt) + return self._get_minutely_spot_value( + asset, field, dt, data_frequency, + ) if assets_is_scalar: return get_single_asset_value(assets) @@ -648,8 +673,14 @@ class DataPortal(object): return spot_value - def _get_minute_spot_value(self, asset, column, dt, ffill=False): - reader = self._get_pricing_reader('minute') + def _get_minutely_spot_value(self, + asset, + column, + dt, + data_frequency, + ffill=False): + + reader = self._get_pricing_reader(data_frequency) if ffill: # If forward filling, we want the last minute with values (up to @@ -680,8 +711,32 @@ class DataPortal(object): # the value we found came from a different day, so we have to adjust # the data if there are any adjustments on that day barrier return self.get_adjusted_value( - asset, column, query_dt, - dt, "minute", spot_value=result + asset, + column, + query_dt, + dt, + data_frequency, + spot_value=result + ) + + + def _get_five_minute_spot_value(self, asset, column, dt, ffill=False): + return self._get_minutely_spot_value( + asset, + column, + dt, + ffill, + '5-minute', + ) + + + def _get_minute_spot_value(self, asset, column, dt, ffill=False): + return self._get_minutely_spot_value( + asset, + column, + dt, + ffill, + 'minute', ) def _get_daily_spot_value(self, asset, column, dt): diff 
--git a/catalyst/data/dispatch_bar_reader.py b/catalyst/data/dispatch_bar_reader.py index 42770e55..e545eef0 100644 --- a/catalyst/data/dispatch_bar_reader.py +++ b/catalyst/data/dispatch_bar_reader.py @@ -130,13 +130,17 @@ class AssetDispatchBarReader(with_metaclass(ABCMeta)): return results - class AssetDispatchMinuteBarReader(AssetDispatchBarReader): def _dt_window_size(self, start_dt, end_dt): return len(self.trading_calendar.minutes_in_range(start_dt, end_dt)) +class AssetDispatchFiveMinuteBarReader(AssetDispatchBarReader): + + def _dt_window_size(self, start_dt, end_dt): + return len(self.trading_calendar.five_minutes_in_range(start_dt, end_dt)) + class AssetDispatchSessionBarReader(AssetDispatchBarReader): def _dt_window_size(self, start_dt, end_dt): diff --git a/catalyst/data/five_minute_bars.py b/catalyst/data/five_minute_bars.py index aa592740..9021dc0a 100644 --- a/catalyst/data/five_minute_bars.py +++ b/catalyst/data/five_minute_bars.py @@ -35,9 +35,9 @@ from six import with_metaclass from toolz import keymap, valmap from catalyst.data._minute_bar_internal import ( - minute_value, - find_position_of_minute, - find_last_traded_position_internal + five_minute_value, + find_position_of_five_minute, + find_last_traded_five_minute_position_internal, ) from catalyst.gens.sim_engine import NANOS_IN_MINUTE @@ -48,15 +48,16 @@ from catalyst.data.us_equity_pricing import ( check_uint64_safe, ) from catalyst.utils.calendars import get_calendar -from catalyst.utils.cli import maybe_show_progress +from catalyst.utils.cli import ( + item_show_count, + maybe_show_progress, +) from catalyst.utils.memoize import lazyval logger = logbook.Logger('FiveMinuteBars') OPEN_FIVE_MINUTES_PER_DAY = 288 -US_EQUITIES_MINUTES_PER_DAY = 390 -DEFAULT_EXPECTEDLEN = US_EQUITIES_MINUTES_PER_DAY * 252 * 15 DEFAULT_EXPECTEDLEN_CRYPTO = OPEN_FIVE_MINUTES_PER_DAY * 366 * 15 OHLC_RATIO = 1000000 @@ -66,6 +67,8 @@ OHLCV = frozenset(['open', 'high', 'low', 'close', 'volume']) UINT64_MAX = 
iinfo(uint64).max +NANOS_IN_FIVE_MINUTES = 5 * NANOS_IN_MINUTE + class BcolzFiveMinuteOverlappingData(Exception): pass @@ -83,7 +86,7 @@ class FiveMinuteBarReader(BarReader): def _calc_five_minute_index(market_opens, five_minutes_per_day): five_minutes = np.zeros(len(market_opens) * five_minutes_per_day, dtype='datetime64[ns]') - deltas = np.arange(0, five_minutes_per_day, dtype='timedelta64[m]') + deltas = 5 * np.arange(0, five_minutes_per_day, dtype='timedelta64[m]') for i, market_open in enumerate(market_opens): start = market_open.asm8 five_minute_values = start + deltas @@ -211,7 +214,7 @@ class BcolzFiveMinuteBarMetadata(object): """ FORMAT_VERSION = 3 - METADATA_FILENAME = 'metadata.json' + METADATA_FILENAME = 'five-minute-metadata.json' @classmethod def metadata_path(cls, rootdir): @@ -268,7 +271,7 @@ class BcolzFiveMinuteBarMetadata(object): calendar, start_session, end_session, - minutes_per_day, + five_minutes_per_day, version=version, ) @@ -345,7 +348,7 @@ class BcolzFiveMinuteBarMetadata(object): 'version': self.version, 'ohlc_ratio': self.default_ohlc_ratio, 'ohlc_ratios_per_sid': self.ohlc_ratios_per_sid, - 'minutes_per_day': self.five_minutes_per_day, + 'five_minutes_per_day': self.five_minutes_per_day, 'calendar_name': self.calendar.name, 'start_session': str(self.start_session.date()), 'end_session': str(self.end_session.date()), @@ -461,7 +464,7 @@ class BcolzFiveMinuteBarWriter(object): five_minutes_per_day, default_ohlc_ratio=OHLC_RATIO, ohlc_ratios_per_sid=None, - expectedlen=DEFAULT_EXPECTED_CRYPTO_LEN, + expectedlen=DEFAULT_EXPECTEDLEN_CRYPTO, write_metadata=True): self._rootdir = rootdir @@ -477,11 +480,11 @@ class BcolzFiveMinuteBarWriter(object): self._default_ohlc_ratio = default_ohlc_ratio self._ohlc_ratios_per_sid = ohlc_ratios_per_sid - self._minute_index = _calc_minute_index( - self._schedule.market_open, self._minutes_per_day) + self._five_minute_index = _calc_five_minute_index( + self._schedule.market_open, 
self._five_minutes_per_day) if write_metadata: - metadata = BcolzMinuteBarMetadata( + metadata = BcolzFiveMinuteBarMetadata( self._default_ohlc_ratio, self._ohlc_ratios_per_sid, self._calendar, @@ -678,7 +681,11 @@ class BcolzFiveMinuteBarWriter(object): for k, v in kwargs.items(): table.attrs[k] = v - def write(self, data, show_progress=False, invalid_data_behavior='warn'): + def write(self, + data, + length=None, + show_progress=False, + invalid_data_behavior='warn'): """Write a stream of minute data. Parameters @@ -698,14 +705,15 @@ class BcolzFiveMinuteBarWriter(object): show_progress : bool, optional Whether or not to show a progress bar while writing. """ - ctx = maybe_show_progress( + with maybe_show_progress( data, + length=length, + show_percent=False, show_progress=show_progress, - item_show_func=lambda e: e if e is None else str(e[0]), - label="Merging minute equity files:", - ) - write_sid = self.write_sid - with ctx as it: + item_show_func=item_show_count(length), + label='Compiling five-minute data', + ) as it: + write_sid = self.write_sid for e in it: write_sid(*e, invalid_data_behavior=invalid_data_behavior) @@ -807,10 +815,13 @@ class BcolzFiveMinuteBarWriter(object): # Get the number of minutes already recorded in this sid's ctable num_rec_mins = table.size - all_minutes = self._minute_index + all_minutes = self._five_minute_index # Get the latest minute we wish to write to the ctable last_minute_to_write = pd.Timestamp(dts[-1], tz='UTC') + #print 'all_minutes[-1]:', all_minutes[num_rec_mins-1] + #print 'last_minute_to_write:', last_minute_to_write + # In the event that we've already written some minutely data to the # ctable, guard against overwriting that data. if num_rec_mins > 0: @@ -864,7 +875,7 @@ class BcolzFiveMinuteBarWriter(object): day_ix = self._session_labels.get_loc(day) # Add one to the 0-indexed day_ix to get the number of days. 
num_days = day_ix + 1 - return num_days * self._minutes_per_day + return num_days * self._five_minutes_per_day def truncate(self, date): """Truncate data beyond this date in all ctables.""" @@ -1002,7 +1013,7 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader): market_closes = self._market_closes.values.astype('datetime64[m]') minutes_per_day = (market_closes - market_opens).astype(np.int64) / 5 early_indices = np.where( - minutes_per_day != self._minutes_per_day - 1)[0] + minutes_per_day != self._five_minutes_per_day - 1)[0] early_opens = self._market_opens[early_indices] early_closes = self._market_closes[early_indices] minutes = [(market_open, early_close) @@ -1030,9 +1041,9 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader): """ itree = IntervalTree() for market_open, early_close in self._minutes_to_exclude(): - start_pos = self._find_position_of_minute(early_close) + 1 + start_pos = self._find_position_of_five_minute(early_close) + 1 end_pos = ( - self._find_position_of_minute(market_open) + self._find_position_of_five_minute(market_open) + self._five_minutes_per_day - @@ -1121,7 +1132,7 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader): minute_pos = self._last_get_value_dt_position else: try: - minute_pos = self._find_position_of_minute(dt) + minute_pos = self._find_position_of_five_minute(dt) except ValueError: raise NoDataOnDate() @@ -1143,15 +1154,15 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader): return value def get_last_traded_dt(self, asset, dt): - minute_pos = self._find_last_traded_position(asset, dt) + minute_pos = self._find_last_traded_five_minute_position(asset, dt) if minute_pos == -1: return pd.NaT return self._pos_to_minute(minute_pos) - def _find_last_traded_position(self, asset, dt): + def _find_last_traded_five_minute_position(self, asset, dt): volumes = self._open_minute_file('volume', asset) - start_date_minute = asset.start_date.value / NANOS_IN_MINUTE - dt_minute = dt.value / NANOS_IN_MINUTE + start_date_minute = 
asset.start_date.value / NANOS_IN_FIVE_MINUTES + dt_minute = dt.value / NANOS_IN_FIVE_MINUTES try: # if we know of a dt before which this asset has no volume, @@ -1163,13 +1174,13 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader): if dt_minute < earliest_dt_to_search: return -1 - pos = find_last_traded_position_internal( + pos = find_last_traded_five_minute_position_internal( self._market_open_values, self._market_close_values, dt_minute, earliest_dt_to_search, volumes, - self._minutes_per_day, + self._five_minutes_per_day, ) if pos == -1: @@ -1186,15 +1197,15 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader): return pos def _pos_to_minute(self, pos): - minute_epoch = minute_value( + minute_epoch = five_minute_value( self._market_open_values, pos, - self._minutes_per_day + self._five_minutes_per_day ) return pd.Timestamp(minute_epoch, tz='UTC', unit="m") - def _find_position_of_minute(self, minute_dt): + def _find_position_of_five_minute(self, minute_dt): """ Internal method that returns the position of the given minute in the list of every trading minute since market open of the first trading @@ -1213,11 +1224,11 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader): int: The position of the given minute in the list of all trading minutes since market open on the first trading day. """ - return find_position_of_minute( + return find_position_of_five_minute( self._market_open_values, self._market_close_values, - minute_dt.value / NANOS_IN_MINUTE, - self._minutes_per_day, + minute_dt.value / NANOS_IN_FIVE_MINUTES, + self._five_minutes_per_day, False, ) @@ -1241,8 +1252,8 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader): (minutes in range, sids) with a dtype of float64, containing the values for the respective field over start and end dt range.
""" - start_idx = self._find_position_of_minute(start_dt) - end_idx = self._find_position_of_minute(end_dt) + start_idx = self._find_position_of_five_minute(start_dt) + end_idx = self._find_position_of_five_minute(end_dt) num_minutes = (end_idx - start_idx + 1) @@ -1261,7 +1272,7 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader): if field != 'volume': out = np.full(shape, np.nan) else: - out = np.zeros(shape, dtype=int64) + out = np.zeros(shape, dtype=uint64) for i, sid in enumerate(sids): carray = self._open_minute_file(field, sid) diff --git a/catalyst/data/loader.py b/catalyst/data/loader.py index 0ba94e89..4a8426e7 100644 --- a/catalyst/data/loader.py +++ b/catalyst/data/loader.py @@ -33,7 +33,7 @@ from ..utils.paths import ( ) from ..utils.deprecate import deprecated -from catalyst.curate.poloniex import PoloniexCurator +from catalyst.data.bundles.poloniex import PoloniexBundle from catalyst.utils.calendars import get_calendar @@ -232,11 +232,14 @@ def load_market_data(trading_day=None, trading_days=None, bm_symbol='SPY', treasury_curves = tc[tc.index.slice_indexer(first_date, last_date)] return benchmark_returns, treasury_curves -def ensure_crypto_benchmark_data(symbol, first_date, last_date, now, - trading_day, environ=None): +def ensure_crypto_benchmark_data(symbol, + first_date, + last_date, + now, + trading_day, + environ=None): + filename = get_benchmark_filename(symbol) - source_filename = '/var/tmp/catalyst/data/poloniex/crypto_prices-{0}.csv'.\ - format(symbol) logger.info( ('Loading benchmark data for {symbol!r} ' @@ -269,92 +272,23 @@ def ensure_crypto_benchmark_data(symbol, first_date, last_date, now, last_date=last_date ) - def dateparse(time_in_secs): - return datetime.datetime.fromtimestamp(float(time_in_secs), pytz.utc) - - def compute_daily_bars(five_min_bars, schedule): - # filter and copy the entry at the beginning of each session - daily_bars = five_min_bars[ - five_min_bars.index.isin(schedule) - ].copy() - - day_offset = 
pd.Timedelta(days=1) - - # iterate through session starts doing: - # 1. filter five_min_bars to get all entries in one day - # 2. compute daily bar entry - # 3. record in rid-th row of daily_bars - for rid, start_date in enumerate(daily_bars.index): - # compute beginning of next session - end_date = start_date + day_offset - - # filter for entries session entries - day_data = five_min_bars[ - (five_min_bars.index >= start_date) & - (five_min_bars.index < end_date) - ] - - # compute and record daily bar - daily_bars.iloc[rid] = ( - day_data.open.iloc[0], # first open price - day_data.high.max(), # max of high prices - day_data.low.min(), # min of low prices - day_data.close.iloc[-1], # last close prices - day_data.volume.sum(), # sum of all volumes - ) - - # scale to allow trading 10-ths of a coin - scale = 10.0 - daily_bars.loc[:, 'open'] /= scale - daily_bars.loc[:, 'high'] /= scale - daily_bars.loc[:, 'low'] /= scale - daily_bars.loc[:, 'close'] /= scale - daily_bars.loc[:, 'volume'] *= scale - - return daily_bars - - - five_min_bars = None + # Load benchmark symbol from Poloniex API try: - # load five minute bars from csv cache - five_min_bars = pd.read_csv( - source_filename, - names=['date', 'open', 'high', 'low', 'close', 'volume'], - index_col=[0], - parse_dates=True, - date_parser=dateparse, + bundle = PoloniexBundle() + bench_raw = bundle._fetch_symbol_frame( + None, + symbol, + get_calendar(bundle.calendar_name), + first_date, + last_date, + 'daily', ) - five_min_bars.index = pd.to_datetime(five_min_bars.index, utc=True, unit='s') - except (OSError, IOError): - # Otherwise load from Poloniex API - try: - pc = PoloniexCurator() - pc.append_data_single_pair(symbol) - - five_min_bars = pc.to_dataframe( - time.mktime(first_date.timetuple()), - time.mktime(last_date.timetuple()), - currencyPair=symbol, - ) - except (OSError, IOError, HTTPError): - logger.exception('Failed to new crypto benchmark returns') - raise - - # compute daily bars for open calendar - 
open_calendar = get_calendar('OPEN') - daily_bars = compute_daily_bars( - five_min_bars, - open_calendar.all_sessions, - ) - - # filter daily bars to include first_date and last_date - daily_bars = daily_bars[ - (daily_bars.index >= (first_date - trading_day)) & - (daily_bars.index <= last_date) - ] + except (OSError, IOError, HTTPError): + logger.exception('Failed to fetch new crypto benchmark returns') + raise # select close column and compute percent change between days - daily_close = daily_bars[['close']] + daily_close = bench_raw[['close']] daily_close = daily_close.pct_change(1).iloc[1:] try: diff --git a/catalyst/data/us_equity_pricing.py b/catalyst/data/us_equity_pricing.py index b2244218..03d01a4e 100644 --- a/catalyst/data/us_equity_pricing.py +++ b/catalyst/data/us_equity_pricing.py @@ -229,7 +229,7 @@ class BcolzDailyBarWriter(object): @property def progress_bar_message(self): - return 'Compiling daily pricing data' + return 'Compiling daily data' def write(self, data, diff --git a/catalyst/examples/buy_and_hodl.py b/catalyst/examples/buy_and_hodl.py index c3b832f1..57b7f19e 100644 --- a/catalyst/examples/buy_and_hodl.py +++ b/catalyst/examples/buy_and_hodl.py @@ -25,7 +25,7 @@ from catalyst.api import ( def initialize(context): - context.ASSET_NAME = 'USDT_ETH' + context.ASSET_NAME = 'USDT_BTC' context.TARGET_HODL_RATIO = 0.8 context.RESERVE_RATIO = 1.0 - context.TARGET_HODL_RATIO @@ -37,7 +37,13 @@ def initialize(context): context.is_buying = True context.asset = symbol(context.ASSET_NAME) + context.i = 0 + def handle_data(context, data): + context.i += 1 + + print 'i:', context.i + starting_cash = context.portfolio.starting_cash target_hodl_value = context.TARGET_HODL_RATIO * starting_cash reserve_value = context.RESERVE_RATIO * starting_cash diff --git a/catalyst/examples/dual_vwap.py b/catalyst/examples/dual_vwap.py index 81155ba1..52c1789d 100644 --- a/catalyst/examples/dual_vwap.py +++ b/catalyst/examples/dual_vwap.py @@ -52,7 +52,7 @@ def 
initialize(context): schedule_function( rebalance, - date_rules.every_day(), + time_rule=time_rules.every_minute(), ) diff --git a/catalyst/finance/performance/tracker.py b/catalyst/finance/performance/tracker.py index 6b84431e..810f2c32 100644 --- a/catalyst/finance/performance/tracker.py +++ b/catalyst/finance/performance/tracker.py @@ -111,6 +111,21 @@ class PerformanceTracker(object): self.treasury_curves, self.trading_calendar ) + elif self.emission_rate == '5-minute': + self.all_benchmark_returns = pd.Series( + index=pd.date_range( + self.sim_params.first_open, + self.sim_params.last_close, + freq='5min' + ), + ) + self.cumulative_risk_metrics = \ + risk.RiskMetricsCumulative( + self.sim_params, + self.treasury_curves, + self.trading_calendar, + create_first_day_stats=True, + ) elif self.emission_rate == 'minute': self.all_benchmark_returns = pd.Series(index=pd.date_range( self.sim_params.first_open, self.sim_params.last_close, diff --git a/catalyst/gens/sim_engine.pyx b/catalyst/gens/sim_engine.pyx index 3eddda84..a318f292 100644 --- a/catalyst/gens/sim_engine.pyx +++ b/catalyst/gens/sim_engine.pyx @@ -131,7 +131,7 @@ cdef class FiveMinuteSimulationClock(MinuteSimulationClock): for session_idx, session_nano in enumerate(self.sessions_nanos): five_minutes_nanos = np.arange( self.market_opens_nanos[session_idx], - self.market_closes_nanos[session_idx] + _nanos_in_five_minutes, + self.market_closes_nanos[session_idx], _nanos_in_five_minutes ) five_minutes_by_session[session_nano] = pd.to_datetime( diff --git a/catalyst/pipeline/loaders/crypto_pricing_loader.py b/catalyst/pipeline/loaders/crypto_pricing_loader.py index f8676cce..a3ec1114 100644 --- a/catalyst/pipeline/loaders/crypto_pricing_loader.py +++ b/catalyst/pipeline/loaders/crypto_pricing_loader.py @@ -33,13 +33,30 @@ class CryptoPricingLoader(PipelineLoader): Delegates loading of baselines and adjustments.
""" - def __init__(self, raw_price_loader, dataset): - self.raw_price_loader = raw_price_loader - self._columns = dataset.columns + def __init__(self, bundle, data_frequency, dataset): cal = get_calendar('OPEN') - self._all_sessions = cal.all_sessions + if data_frequency == 'daily': + reader = bundle.daily_bar_reader + all_sessions = cal.all_sessions + + elif data_frequency == '5-minute': + reader = bundle.five_minute_bar_reader + all_sessions = cal.all_five_minutes + + elif data_frequency == 'minute': + reader = bundle.minute_bar_reader + all_sessions = cal.all_minutes + + else: + raise ValueError( + 'Invalid data frequency: {}'.format(data_frequency) + ) + + self.raw_price_loader = reader + self._columns = dataset.columns + self._all_sessions = all_sessions @classmethod def from_files(cls, pricing_path): @@ -89,6 +106,13 @@ class CryptoPricingLoader(PipelineLoader): def _shift_dates(dates, start_date, end_date, shift): + print 'dates.head:\n', dates[:10] + print 'dates.tail:\n', dates[-10:] + + print 'start_date:', start_date + print 'end_date:', end_date + print 'shift:', shift + try: start = dates.get_loc(start_date) except KeyError: diff --git a/catalyst/pipeline/loaders/equity_pricing_loader.py b/catalyst/pipeline/loaders/equity_pricing_loader.py index 99c8ae00..4d9ec055 100644 --- a/catalyst/pipeline/loaders/equity_pricing_loader.py +++ b/catalyst/pipeline/loaders/equity_pricing_loader.py @@ -36,15 +36,34 @@ class USEquityPricingLoader(PipelineLoader): Delegates loading of baselines and adjustments.
""" - def __init__(self, raw_price_loader, adjustments_loader, dataset): - self.raw_price_loader = raw_price_loader - self.adjustments_loader = adjustments_loader + def __init__(self, bundle, data_frequency, dataset): + + if data_frequency == 'daily': + reader = bundle.daily_bar_reader + elif data_frequency == '5-minute': + reader = bundle.five_minute_bar_reader + elif data_frequency == 'minute': + reader = bundle.minute_bar_reader + else: + raise ValueError( + 'Invalid data frequency: {}'.format(data_frequency) + ) + + cal = reader.trading_calendar or get_calendar('NYSE') + + if data_frequency == 'daily': + all_sessions = cal.all_sessions + elif data_frequency == '5-minute': + reader = bundle.five_minute_bar_reader + all_sessions = cal.all_five_minutes + elif data_frequency == 'minute': + reader = bundle.minute_bar_reader + all_sessions = cal.all_minutes + + self.raw_price_loader = reader + self.adjustments_loader = bundle.adjustment_reader self._columns = dataset.columns - - cal = self.raw_price_loader.trading_calendar or \ - get_calendar("NYSE") - - self._all_sessions = cal.all_sessions + self._all_sessions = all_sessions @classmethod def from_files(cls, pricing_path, adjustments_path): diff --git a/catalyst/sources/benchmark_source.py b/catalyst/sources/benchmark_source.py index 05d5c601..b5a95eee 100644 --- a/catalyst/sources/benchmark_source.py +++ b/catalyst/sources/benchmark_source.py @@ -51,7 +51,10 @@ class BenchmarkSource(object): elif benchmark_returns is not None: daily_series = benchmark_returns[sessions[0]:sessions[-1]] + print 'BENCHMARK_RETURNS' + if self.emission_rate == "minute": + print 'BENCHMARK_RETURNS minute' # we need to take the env's benchmark returns, which are daily, # and resample them to minute minutes = trading_calendar.minutes_for_sessions_in_range( @@ -65,7 +68,22 @@ class BenchmarkSource(object): ) self._precalculated_series = minute_series + elif self.emission_rate == '5-minute': + print 'BENCHMARK_RETURNS 5-minute' +
five_minutes = \ + trading_calendar.five_minutes_for_sessions_in_range( + sessions[0], + sessions[-1], + ) + + five_minute_series = daily_series.reindex( + index=five_minutes, + method='ffill', + ) + + self._precalculated_series = five_minute_series else: + print 'BENCHMARK_RETURNS daily' self._precalculated_series = daily_series else: raise Exception("Must provide either benchmark_asset or " @@ -155,8 +173,24 @@ class BenchmarkSource(object): ffill=True )[asset] + return benchmark_series.pct_change()[1:] + elif self.emission_rate == '5-minute': + five_minutes = trading_calendar.five_minutes_for_sessions_in_range( + self.sessions[0], self.sessions[-1] + ) + benchmark_series = data_portal.get_history_window( + [asset], + five_minutes[-1], + bar_count=len(five_minutes) + 1, + frequency='5m', + field='price', + data_frequency=self.emission_rate, + ffill=True, + )[asset] + return benchmark_series.pct_change()[1:] else: + print '----------------------------------------' start_date = asset.start_date if start_date < trading_days[0]: # get the window of close prices for benchmark_asset from the diff --git a/catalyst/utils/calendars/trading_calendar.py b/catalyst/utils/calendars/trading_calendar.py index 29a16f97..a1489067 100644 --- a/catalyst/utils/calendars/trading_calendar.py +++ b/catalyst/utils/calendars/trading_calendar.py @@ -117,6 +117,9 @@ class TradingCalendar(with_metaclass(ABCMeta)): self._trading_minutes_nanos = self.all_minutes.values.\ astype(np.int64) + + self._trading_five_minutes_nanos = self.all_five_minutes.values.\ + astype(np.int64) self.first_trading_session = _all_days[0] self.last_trading_session = _all_days[-1] @@ -179,6 +182,18 @@ class TradingCalendar(with_metaclass(ABCMeta)): """ return int(self._minutes_per_session[start_session:end_session].sum()) + @lazyval + def _five_minutes_per_session(self): + diff = self.schedule.market_close - self.schedule.market_open + diff = diff.astype('timedelta64[m]') + return (diff + 1) // 5 + + def 
five_minutes_count_for_sessions_in_range(self, + start_session, + end_session): + five_mins = self._five_minutes_per_session[start_session:end_session] + return int(five_mins.sum()) + @property def regular_holidays(self): """ @@ -371,6 +386,10 @@ class TradingCalendar(with_metaclass(ABCMeta)): idx = next_divider_idx(self._trading_minutes_nanos, dt.value) return self.all_minutes[idx] + def next_five_minute(self, dt): + idx = next_divider_idx(self._trading_five_minutes_nanos, dt.value) + return self.all_five_minutes[idx] + def previous_minute(self, dt): """ Given a dt, return the previous exchange minute. @@ -465,6 +484,12 @@ class TradingCalendar(with_metaclass(ABCMeta)): end_minute=self.schedule.at[session_label, 'market_close'], ) + def five_minutes_for_session(self, session_label): + return self.five_minutes_in_range( + start_five_minute=self.schedule.at[session_label, 'market_open'], + end_five_minute=self.schedule.at[session_label, 'market_close'], + ) + def minutes_window(self, start_dt, count): start_dt_nanos = start_dt.value all_minutes_nanos = self._trading_minutes_nanos @@ -566,6 +591,20 @@ class TradingCalendar(with_metaclass(ABCMeta)): return abs(end_idx - start_idx) + def five_minutes_in_range(self, start_five_minute, end_five_minute): + start_idx = searchsorted(self._trading_five_minutes_nanos, + start_five_minute.value) + + end_idx = searchsorted(self._trading_five_minutes_nanos, + end_five_minute.value) + + if end_five_minute.value == self._trading_five_minutes_nanos[end_idx]: + # if the end minute is a market minute, increase by 1 + end_idx += 1 + + return self.all_five_minutes[start_idx:end_idx] + + def minutes_in_range(self, start_minute, end_minute): """ Given start and end minutes, return all the calendar minutes @@ -623,6 +662,15 @@ class TradingCalendar(with_metaclass(ABCMeta)): return self.minutes_in_range(first_minute, last_minute) + def five_minutes_for_sessions_in_range(self, + start_session_label, + end_session_label): + + first_minute,
_ = self.open_and_close_for_session(start_session_label) + _, last_minute = self.open_and_close_for_session(end_session_label) + + return self.five_minutes_in_range(first_minute, last_minute) + def open_and_close_for_session(self, session_label): """ Returns a tuple of timestamps of the open and close of the session @@ -690,8 +738,7 @@ class TradingCalendar(with_metaclass(ABCMeta)): def execution_time_from_close(self, close_dates): return close_dates - @lazyval - def all_minutes(self): + def _all_minutes_with_interval(self, interval): """ Returns a DatetimeIndex representing all the minutes in this calendar. """ @@ -703,8 +750,10 @@ class TradingCalendar(with_metaclass(ABCMeta)): deltas = closes_in_ns - opens_in_ns + nanos_in_interval = interval * NANOS_IN_MINUTE + # + 1 because we want 390 days per standard day, not 389 - daily_sizes = (deltas / NANOS_IN_MINUTE) + 1 + daily_sizes = (deltas / nanos_in_interval) + 1 num_minutes = np.sum(daily_sizes).astype(np.int64) # One allocation for the entire thing. This assumes that each day @@ -721,13 +770,27 @@ class TradingCalendar(with_metaclass(ABCMeta)): np.arange( opens_in_ns[day_idx], closes_in_ns[day_idx] + NANOS_IN_MINUTE, - NANOS_IN_MINUTE + nanos_in_interval ) idx += size_int return DatetimeIndex(all_minutes).tz_localize("UTC") + @lazyval + def all_five_minutes(self): + """ + Returns a DatetimeIndex representing all the five minutes in this calendar. + """ + return self._all_minutes_with_interval(5) + + @lazyval + def all_minutes(self): + """ + Returns a DatetimeIndex representing all the minutes in this calendar. 
+ """ + return self._all_minutes_with_interval(1) + @preprocess(dt=coerce(pd.Timestamp, attrgetter('value'))) def minute_to_session_label(self, dt, direction="next"): """ diff --git a/catalyst/utils/cli.py b/catalyst/utils/cli.py index 6e604838..51e519e0 100644 --- a/catalyst/utils/cli.py +++ b/catalyst/utils/cli.py @@ -17,6 +17,7 @@ def item_show_count(total=None): def item_show_func(item, _it=iter(count())): if item is not None: + starting = False return maybe_show_total(next(_it)) return 'DONE' diff --git a/catalyst/utils/run_algo.py b/catalyst/utils/run_algo.py index d4b75207..d63c314a 100644 --- a/catalyst/utils/run_algo.py +++ b/catalyst/utils/run_algo.py @@ -156,15 +156,15 @@ def _run(handle_data, environ=environ, ) - first_trading_day =\ - bundle_data.equity_minute_bar_reader.first_trading_day + first_trading_day = bundle_data.minute_bar_reader.first_trading_day data = DataPortal( env.asset_finder, open_calendar, first_trading_day=first_trading_day, - equity_minute_reader=bundle_data.equity_minute_bar_reader, - equity_daily_reader=bundle_data.equity_daily_bar_reader, + minute_reader=bundle_data.minute_bar_reader, + five_minute_reader=bundle_data.five_minute_bar_reader, + daily_reader=bundle_data.daily_bar_reader, adjustment_reader=bundle_data.adjustment_reader, ) @@ -179,13 +179,14 @@ def _run(handle_data, if b == 'poloniex': return CryptoPricingLoader( - bundle_data.equity_daily_bar_reader, + bundle_data, + data_frequency, CryptoPricing, ) - elif b == 'quantopian-quandl': + elif b == 'quandl': return USEquityPricingLoader( - bundle_data.equity_daily_bar_reader, - bundle_data.adjustment_reader, + bundle_data, + data_frequency, USEquityPricing, ) raise ValueError( @@ -216,6 +217,7 @@ def _run(handle_data, end=end, capital_base=capital_base, data_frequency=data_frequency, + emission_rate=data_frequency, ), **{ 'initialize': initialize,