diff --git a/catalyst/__main__.py b/catalyst/__main__.py index f57a52b4..fce7fa55 100644 --- a/catalyst/__main__.py +++ b/catalyst/__main__.py @@ -307,7 +307,7 @@ def catalyst_magic(line, cell=None): '%s%%catalyst' % ((cell or '') and '%'), # don't use system exit and propogate errors to the caller standalone_mode=False, - ) + ) except SystemExit as e: # https://github.com/mitsuhiko/click/pull/533 # even in standalone_mode=False `--help` really wants to kill us ;_; @@ -471,27 +471,38 @@ def live(ctx, type=Date(tz='utc', as_timestamp=True), help='The end date of the data range. (default: today)', ) +@click.option( + '--include-symbols', + default=None, + help='A list of symbols to ingest (optional comma separated list)', +) +@click.option( + '--exclude-symbols', + default=None, + help='A list of symbols to exclude from the ingestion ' + '(optional comma separated list)', +) @click.option( '--show-progress/--no-show-progress', default=True, help='Print progress information to the terminal.' ) def ingest_exchange(exchange_name, data_frequency, start, end, - show_progress): + include_symbols, exclude_symbols, show_progress): """ Ingest data for the given exchange. 
""" + exchange_bundle = ExchangeBundle(exchange_name) + click.echo('ingesting exchange bundle {}'.format(exchange_name)) - exchange_bundle = ExchangeBundle( - exchange_name=exchange_name, + exchange_bundle.ingest( data_frequency=data_frequency, - include_symbols=None, - exclude_symbols=None, + include_symbols=include_symbols, + exclude_symbols=exclude_symbols, start=start, end=end, show_progress=show_progress ) - exchange_bundle.ingest() @main.command() diff --git a/catalyst/exchange/bundle_utils.py b/catalyst/exchange/bundle_utils.py index eca43e6c..e4d2d12e 100644 --- a/catalyst/exchange/bundle_utils.py +++ b/catalyst/exchange/bundle_utils.py @@ -1,14 +1,9 @@ import datetime -from logging import Logger, DEBUG import os -from dateutil.relativedelta import relativedelta -import pandas as pd +from logging import Logger -from catalyst import get_calendar -from catalyst.data.minute_bars import BcolzMinuteBarWriter -from catalyst.data.us_equity_pricing import BcolzDailyBarWriter -from catalyst.exchange.exchange_utils import get_exchange_folder -from catalyst.utils.paths import data_root, ensure_directory +from catalyst.data.bundles import from_bundle_ingest_dirname +from catalyst.utils.paths import data_path log = Logger('test_exchange_bundle') @@ -95,3 +90,33 @@ def fetch_candles_chunk(exchange, assets, data_frequency, end_dt, bar_count): end_dt=end_dt ) return candles + +def find_most_recent_time(bundle_name): + """ + Find most recent "time folder" for a given bundle. + + :param bundle_name: + The name of the targeted bundle. + + :return folder: + The name of the time folder. 
+ """ + try: + bundle_folders = os.listdir( + data_path([bundle_name]), + ) + except OSError: + return None + + most_recent_bundle = dict() + for folder in bundle_folders: + date = from_bundle_ingest_dirname(folder) + if not most_recent_bundle or date > \ + most_recent_bundle[most_recent_bundle.keys()[0]]: + most_recent_bundle = dict() + most_recent_bundle[folder] = date + + if most_recent_bundle: + return most_recent_bundle.keys()[0] + else: + return None diff --git a/catalyst/exchange/data_portal_exchange.py b/catalyst/exchange/data_portal_exchange.py index d2191920..6d124b9f 100644 --- a/catalyst/exchange/data_portal_exchange.py +++ b/catalyst/exchange/data_portal_exchange.py @@ -12,24 +12,19 @@ # limitations under the License. import abc -import os from time import sleep import pandas as pd from catalyst.assets._assets import TradingPair from logbook import Logger -from catalyst.data.bundles.core import from_bundle_ingest_dirname, \ - minute_path, daily_path from catalyst.data.data_portal import DataPortal -from catalyst.data.minute_bars import BcolzMinuteBarReader -from catalyst.data.us_equity_pricing import BcolzDailyBarReader +from catalyst.exchange.exchange_bundle import ExchangeBundle from catalyst.exchange.exchange_errors import ( ExchangeRequestError, ExchangeBarDataError, - BundleNotFoundError, PricingDataBeforeTradingError, + PricingDataBeforeTradingError, PricingDataNotLoadedError, InvalidHistoryFrequencyError) -from catalyst.utils.paths import data_path log = Logger('DataPortalExchange') @@ -259,34 +254,14 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase): def __init__(self, *args, **kwargs): super(DataPortalExchangeBacktest, self).__init__(*args, **kwargs) - self.daily_bar_readers = dict() - self.minute_bar_readers = dict() + self.exchange_bundles = dict() self.history_loaders = dict() self.minute_history_loaders = dict() + for exchange_name in self.exchanges: - name = 'exchange_{}'.format(exchange_name) - time_folder = \ - 
DataPortalExchangeBacktest.find_most_recent_time(name) - - if time_folder is None: - raise BundleNotFoundError(exchange=exchange_name) - - try: - self.daily_bar_readers[exchange_name] = \ - BcolzDailyBarReader( - daily_path(name, time_folder), - ) - except IOError: - self.daily_bar_readers[exchange_name] = None - - try: - self.minute_bar_readers[exchange_name] = \ - BcolzMinuteBarReader( - minute_path(name, time_folder), - ) - except IOError: - self.minute_bar_readers[exchange_name] = None + self.exchange_bundles[exchange_name] = \ + ExchangeBundle(exchange_name) def _get_first_trading_day(self, assets): first_date = None @@ -295,62 +270,6 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase): first_date = asset.start_date return first_date - @staticmethod - def find_most_recent_time(bundle_name): - """ - Find most recent "time folder" for a given bundle. - - :param bundle_name: - The name of the targeted bundle. - - :return folder: - The name of the time folder. - """ - try: - bundle_folders = os.listdir( - data_path([bundle_name]), - ) - except OSError: - return None - - most_recent_bundle = dict() - for folder in bundle_folders: - date = from_bundle_ingest_dirname(folder) - if not most_recent_bundle or date > \ - most_recent_bundle[most_recent_bundle.keys()[0]]: - most_recent_bundle = dict() - most_recent_bundle[folder] = date - - if most_recent_bundle: - return most_recent_bundle.keys()[0] - else: - return None - - def _get_reader(self, data_frequency, exchange_name): - """ - Pick from a collection of readers based of exchange name and frequency. - - :param data_frequency: - The reader frequency: minute, daily. - - :param exchange_name: - The exchange name. - - :return reader: - A reader object. 
- """ - if data_frequency == 'minute': - reader = self.minute_bar_readers[exchange_name] - elif data_frequency == 'daily': - reader = self.daily_bar_readers[exchange_name] - else: - raise InvalidHistoryFrequencyError(frequency=data_frequency) - - if reader is None: - raise ValueError('reader not found') - - return reader - def get_exchange_history_window(self, exchange, assets, @@ -360,7 +279,9 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase): field, data_frequency, ffill=True): - reader = self._get_reader(data_frequency, exchange.name) + bundle = self.exchange_bundles[exchange.name] + + reader = bundle.get_reader(data_frequency) if data_frequency == 'minute': dts = self.trading_calendar.minutes_window( end_dt, -bar_count @@ -416,7 +337,8 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase): def get_exchange_spot_value(self, exchange, assets, field, dt, data_frequency): - reader = self._get_reader(data_frequency, exchange.name) + bundle = self.exchange_bundles[exchange.name] + reader = bundle.get_reader(data_frequency) self.ensure_after_first_day(dt, assets) diff --git a/catalyst/exchange/exchange_bundle.py b/catalyst/exchange/exchange_bundle.py index 890a7bbd..a63aca45 100644 --- a/catalyst/exchange/exchange_bundle.py +++ b/catalyst/exchange/exchange_bundle.py @@ -26,19 +26,12 @@ log = Logger('exchange_bundle') class ExchangeBundle: - def __init__(self, exchange_name, data_frequency, include_symbols=None, - exclude_symbols=None, start=None, end=None, - show_progress=True, environ=os.environ): + def __init__(self, exchange_name, ): self.exchange = get_exchange(exchange_name) - self.data_frequency = data_frequency - self.assets = self.get_assets(include_symbols, exclude_symbols) - self.start, self.end = self.get_adj_dates(start, end) - self.environ = environ - self.show_progress = show_progress self.minutes_per_day = 1440 self.default_ohlc_ratio = 1000000 - self._writer = None - self._reader = None + self._writers = dict() + self._readers = dict() 
def get_assets(self, include_symbols, exclude_symbols): # TODO: filter exclude symbols assets @@ -50,14 +43,14 @@ class ExchangeBundle: else: return self.exchange.get_assets() - def get_adj_dates(self, start, end): + def get_adj_dates(self, start, end, assets): now = pd.Timestamp.utcnow() if end > now: log.info('adjusting the end date to now {}'.format(now)) end = now earliest_trade = None - for asset in self.assets: + for asset in assets: if earliest_trade is None or earliest_trade > asset.start_date: earliest_trade = asset.start_date @@ -73,80 +66,90 @@ class ExchangeBundle: return start, end - @property - def reader(self): - if self._reader is not None: - return self._reader + def get_reader(self, data_frequency): + """ + Get a data reader object, either a new object or from cache + + :return: BcolzMinuteBarReader or BcolzDailyBarReader + """ + if data_frequency in self._readers: + return self._readers[data_frequency] root = get_exchange_folder(self.exchange.name) input_dir = BUNDLE_NAME_TEMPLATE.format( root=root, - frequency=self.data_frequency + frequency=data_frequency ) - if self.data_frequency == 'minute': + if data_frequency == 'minute': try: - self._reader = BcolzMinuteBarReader(input_dir) + self._readers[data_frequency] = BcolzMinuteBarReader(input_dir) except IOError: log.debug('no reader data found in {}'.format(input_dir)) - elif self.data_frequency == 'daily': + elif data_frequency == 'daily': try: - self._reader = BcolzDailyBarReader(input_dir) + self._readers[data_frequency] = BcolzDailyBarReader(input_dir) except IOError: log.debug('no reader data found in {}'.format(input_dir)) else: raise ValueError( - 'invalid frequency {}'.format(self.data_frequency) + 'invalid frequency {}'.format(data_frequency) ) - return self._reader + return self._readers[data_frequency] - @property - def writer(self): - if self._writer is not None: - return self._writer + def get_writer(self, data_frequency, start, end): + """ + Get a data writer object, either a new
object or from cache + + :return: BcolzMinuteBarWriter or BcolzDailyBarWriter + """ + key = (data_frequency, start, end) + if key in self._writers: + return self._writers[key] open_calendar = get_calendar('OPEN') root = get_exchange_folder(self.exchange.name) output_dir = BUNDLE_NAME_TEMPLATE.format( root=root, - frequency=self.data_frequency + frequency=data_frequency ) ensure_directory(output_dir) - if self.data_frequency == 'minute': + if data_frequency == 'minute': if len(os.listdir(output_dir)) > 0: - self._writer = BcolzMinuteBarWriter.open(output_dir, self.end) + self._writers[key] = \ + BcolzMinuteBarWriter.open(output_dir, end) else: - self._writer = BcolzMinuteBarWriter( + self._writers[key] = BcolzMinuteBarWriter( rootdir=output_dir, calendar=open_calendar, minutes_per_day=self.minutes_per_day, - start_session=self.start, - end_session=self.end, + start_session=start, + end_session=end, write_metadata=True, default_ohlc_ratio=self.default_ohlc_ratio ) - elif self.data_frequency == 'daily': + elif data_frequency == 'daily': if len(os.listdir(output_dir)) > 0: - self._writer = BcolzDailyBarWriter.open(output_dir, self.end) + self._writers[key] = BcolzDailyBarWriter.open(output_dir, end) else: - self._writer = BcolzDailyBarWriter( + self._writers[key] = BcolzDailyBarWriter( filename=output_dir, calendar=open_calendar, - start_session=self.start, - end_session=self.end + start_session=start, + end_session=end ) else: raise ValueError( - 'invalid frequency {}'.format(self.data_frequency) + 'invalid frequency {}'.format(data_frequency) ) - return self._writer + return self._writers[key] - def filter_existing_assets(self, assets, start, end): + def filter_existing_assets(self, assets, start, end, data_frequency): """ For each asset, get the close on the start and end dates of the chunk. If the data exists, the chunk ingestion is complete. 
@@ -161,20 +164,19 @@ class ExchangeBundle: :return: list[TradingPair] The assets missing from the bundle """ + reader = self.get_reader(data_frequency) missing_assets = [] for asset in assets: has_data = True - if has_data and self.reader is not None: + if has_data and reader is not None: try: - start_close = self.reader.get_value( - asset.sid, start, 'close') + start_close = reader.get_value(asset.sid, start, 'close') if np.isnan(start_close): has_data = False else: - end_close = self.reader.get_value( - asset.sid, end, 'close') + end_close = reader.get_value(asset.sid, end, 'close') if np.isnan(end_close): has_data = False @@ -190,36 +192,148 @@ class ExchangeBundle: return missing_assets - def ingest(self): + def ingest_chunk(self, chunk, previous_candle, data_frequency, assets, + writer): + chunk_end = chunk['end'] + chunk_start = chunk_end - timedelta(minutes=chunk['bar_count']) + + chunk_assets = [] + for asset in assets: + if asset.start_date <= chunk_end: + chunk_assets.append(asset) + + missing_assets = self.filter_existing_assets( + assets=chunk_assets, + start=chunk_start, + end=chunk_end, + data_frequency=data_frequency + ) + + if len(missing_assets) == 0: + log.debug('the data chunk already exists') + return + + # TODO: ensure correct behavior for assets starting in the chunk + candles = fetch_candles_chunk( + exchange=self.exchange, + assets=missing_assets, + data_frequency=data_frequency, + end_dt=chunk_end, + bar_count=chunk['bar_count'] + ) + + num_candles = 0 + data = [] + for asset in candles: + asset_candles = candles[asset] + if not asset_candles: + log.debug( + 'no data: {symbols} on {exchange}, date {end}'.format( + symbols=missing_assets, + exchange=self.exchange.name, + end=chunk_end + ) + ) + continue + + all_dates = [] + all_candles = [] + date = chunk_start + while date <= chunk_end: + + previous = previous_candle[asset] \ + if asset in previous_candle else None + + candle = next((candle for candle in asset_candles \ + if 
candle['last_traded'] == date), + previous) + + if candle is not None: + all_dates.append(date) + all_candles.append(candle) + + previous_candle[asset] = candle + + date += timedelta(minutes=1) + + df = pd.DataFrame(all_candles, index=all_dates) + if not df.empty: + df.sort_index(inplace=True) + + sid = asset.sid + num_candles += len(df.values) + + data.append((sid, df)) + + try: + log.debug( + 'writing {num_candles} candles from {start} to {end}'.format( + num_candles=num_candles, + start=chunk_start, + end=chunk_end + ) + ) + + for pair in data: + log.debug('data for sid {}\n{}\n{}'.format( + pair[0], pair[1].head(2), pair[1].tail(2))) + + writer.write( + data=data, + show_progress=False, + invalid_data_behavior='raise' + ) + except BcolzMinuteOverlappingData as e: + log.warn('chunk already exists {}: {}'.format(chunk, e)) + + def ingest(self, data_frequency, include_symbols=None, + exclude_symbols=None, start=None, end=None, + show_progress=True, environ=os.environ): + """ + Ingest the bundle + + :param data_frequency: + :param include_symbols: + :param exclude_symbols: + :param start: + :param end: + :param show_progress: + :param environ: + :return: + """ + + assets = self.get_assets(include_symbols, exclude_symbols) + start, end = self.get_adj_dates(start, end, assets) + symbols = [] log.debug( 'ingesting trading pairs {symbols} on exchange {exchange} ' 'from {start} to {end}'.format( symbols=symbols, exchange=self.exchange.name, - start=self.start, - end=self.end + start=start, + end=end ) ) - delta = self.end - self.start - if self.data_frequency == 'minute': + delta = end - start + if data_frequency == 'minute': delta_periods = delta.total_seconds() / 60 - frequency = '1m' - elif self.data_frequency == 'daily': + elif data_frequency == 'daily': delta_periods = delta.total_seconds() / 60 / 60 / 24 - frequency = '1d' else: raise ValueError('frequency not supported') + writer = self.get_writer(data_frequency, start, end) + if delta_periods > 
self.exchange.num_candles_limit: bar_count = self.exchange.num_candles_limit chunks = [] - last_chunk_date = self.end.floor('1 min') - while last_chunk_date > self.start + timedelta(minutes=bar_count): + last_chunk_date = end.floor('1 min') + while last_chunk_date > start + timedelta(minutes=bar_count): # TODO: account for the partial last bar chunk = dict(end=last_chunk_date, bar_count=bar_count) chunks.append(chunk) @@ -231,102 +345,22 @@ class ExchangeBundle: chunks.reverse() else: - chunks = [dict(end=self.end, bar_count=delta_periods)] + chunks = [dict(end=end, bar_count=delta_periods)] with maybe_show_progress( chunks, - self.show_progress, + show_progress, label='Fetching {exchange} {frequency} candles: '.format( exchange=self.exchange.name, - frequency=self.data_frequency + frequency=data_frequency )) as it: previous_candle = dict() for chunk in it: - chunk_end = chunk['end'] - chunk_start = chunk_end - timedelta(minutes=chunk['bar_count']) - - chunk_assets = [] - for asset in self.assets: - if asset.start_date <= chunk_end: - chunk_assets.append(asset) - - missing_assets = self.filter_existing_assets( - chunk_assets, chunk_start, chunk_end) - - if len(missing_assets) == 0: - log.debug('the data chunk already exists') - continue - - # TODO: ensure correct behavior for assets starting in the chunk - candles = fetch_candles_chunk( - exchange=self.exchange, - assets=missing_assets, - data_frequency=frequency, - end_dt=chunk_end, - bar_count=chunk['bar_count'] + self.ingest_chunk( + chunk=chunk, + previous_candle=previous_candle, + data_frequency=data_frequency, + assets=assets, + writer=writer ) - - num_candles = 0 - data = [] - for asset in candles: - asset_candles = candles[asset] - if not asset_candles: - log.debug( - 'no data: {symbols} on {exchange}, date {end}'.format( - symbols=missing_assets, - exchange=self.exchange.name, - end=chunk_end - ) - ) - continue - - all_dates = [] - all_candles = [] - date = chunk_start - while date <= chunk_end: - - 
previous = previous_candle[asset] \ - if asset in previous_candle else None - - candle = next((candle for candle in asset_candles \ - if candle['last_traded'] == date), - previous) - - if candle is not None: - all_dates.append(date) - all_candles.append(candle) - - previous_candle[asset] = candle - - date += timedelta(minutes=1) - - df = pd.DataFrame(all_candles, index=all_dates) - if not df.empty: - df.sort_index(inplace=True) - - sid = asset.sid - num_candles += len(df.values) - - data.append((sid, df)) - - try: - log.debug( - 'writing {num_candles} candles from {start} to {end}'.format( - num_candles=num_candles, - start=chunk_start, - end=chunk_end - ) - ) - - for pair in data: - log.debug('data for sid {}\n{}\n{}'.format( - pair[0], pair[1].head(2), pair[1].tail(2))) - - self.writer.write( - data=data, - show_progress=False, - invalid_data_behavior='raise' - ) - except BcolzMinuteOverlappingData as e: - log.warn('chunk already exists {}: {}'.format(chunk, e)) diff --git a/tests/exchange/test_bundle.py b/tests/exchange/test_bundle.py index a7337e94..76696c3e 100644 --- a/tests/exchange/test_bundle.py +++ b/tests/exchange/test_bundle.py @@ -14,9 +14,10 @@ class ExchangeBundleTestCase: start = pd.to_datetime('2017-09-01', utc=True) end = pd.Timestamp.utcnow() + exchange_bundle = ExchangeBundle(exchange_name) + log.info('ingesting exchange bundle {}'.format(exchange_name)) - exchange_bundle = ExchangeBundle( - exchange_name=exchange_name, + exchange_bundle.ingest( data_frequency='minute', include_symbols='neo_btc', exclude_symbols=None, @@ -24,5 +25,4 @@ class ExchangeBundleTestCase: end=end, show_progress=True ) - exchange_bundle.ingest() pass diff --git a/tests/exchange/test_data_portal.py b/tests/exchange/test_data_portal.py index 8d8f90e6..80ed5531 100644 --- a/tests/exchange/test_data_portal.py +++ b/tests/exchange/test_data_portal.py @@ -90,6 +90,8 @@ class ExchangeDataPortalTestCase: '1m', 'close', 'minute') + + log.info('found history window: 
{}'.format(data)) pass def test_get_spot_value_backtest(self): @@ -102,5 +104,5 @@ class ExchangeDataPortalTestCase: date = pd.to_datetime('2017-09-10', utc=True) value = self.data_portal_backtest.get_spot_value( assets, 'close', date, 'minute') + log.info('found spot value {}'.format(value)) pass -