From aeb6c01272a448ef1258c7814dcfb5ba7ef86477 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 11 Jul 2017 04:10:28 -0700 Subject: [PATCH] WIP: abstract bundle class for generalizing data curation Started PoloniexBundle to adapt current ingestion logic to new structure --- catalyst/data/bundles/__init__.py | 2 +- catalyst/data/bundles/bundle.py | 356 +++++++++++++++++++++++ catalyst/data/bundles/core.py | 31 +- catalyst/data/bundles/poloniex.py | 33 +-- catalyst/data/bundles/poloniex_bundle.py | 187 ++++++++++++ 5 files changed, 588 insertions(+), 21 deletions(-) create mode 100644 catalyst/data/bundles/bundle.py create mode 100644 catalyst/data/bundles/poloniex_bundle.py diff --git a/catalyst/data/bundles/__init__.py b/catalyst/data/bundles/__init__.py index 17343b9c..9ba8e11c 100644 --- a/catalyst/data/bundles/__init__.py +++ b/catalyst/data/bundles/__init__.py @@ -1,6 +1,6 @@ # These imports are necessary to force module-scope register calls to happen. from . import quandl # noqa -from . import poloniex +from . import poloniex_bundle from .core import ( UnknownBundle, bundles, diff --git a/catalyst/data/bundles/bundle.py b/catalyst/data/bundles/bundle.py new file mode 100644 index 00000000..0ab965b7 --- /dev/null +++ b/catalyst/data/bundles/bundle.py @@ -0,0 +1,356 @@ +# +# Copyright 2017 Enigma MPC, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from itertools import count +import tarfile +from time import time, sleep + +from abc import abstractmethod +import logbook +import pandas as pd + +from . import core as bundles + +from catalyst.utils.cli import maybe_show_progress +from catalyst.utils.memoize import lazyval + +logbook.StderrHandler().push_application() +log = logbook.Logger(__name__) + +class AbstractBundle(object): + def __init__(self): + pass + + @lazyval + def name(self): + raise NotImplementedError() + + @lazyval + def exchange(self): + raise NotImplementedError() + + @lazyval + def calendar_name(self): + raise NotImplementedError() + + @lazyval + def minutes_per_day(self): + raise NotImplementedError() + + @lazyval + def frequencies(self): + raise NotImplementedError() + + @lazyval + def md_column_names(self): + return _dtypes_to_cols(self.md_dtypes) + + @lazyval + def md_dtypes(self): + raise NotImplementedError() + + @lazyval + def column_names(self): + return _dtypes_to_cols(self.dtypes) + + @lazyval + def dtypes(self): + raise NotImplementedError() + + @lazyval + def tar_url(self): + raise NotImplementedError() + + @lazyval + def wait_time(self): + raise NotImplementedError() + + @abstractmethod + def fetch_raw_metadata_frame(self, api_key, page_number): + raise NotImplementedError() + + def post_process_symbol_metadata(self, metadata, data): + return metadata + + @abstractmethod + def fetch_raw_symbol_frame(self, api_key, symbol, start_date, end_date): + raise NotImplementedError() + + def ingest(self, + environ, + asset_db_writer, + minute_bar_writer, + daily_bar_writer, + adjustment_writer, + calendar, + start_session, + end_session, + cache, + show_progress, + output_dir): + + api_key = environ.get('CATALYST_API_KEY') + retries = environ.get('CATALYST_DOWNLOAD_ATTEMPTS', 5) + use_local = environ.get('CATALYST_INGEST_LOCAL', 0) > 0 + + if use_local: + raw_metadata = self._fetch_metadata_frame( + api_key, + cache=cache, + retries=retries, + environ=environ, + show_progress=show_progress, + ) + + symbol_map = raw_metadata.symbol + + daily_bar_writer.write( + self._fetch_symbol_iter( + api_key, + cache, + symbol_map, + calendar, + start_session, + end_session, + 'daily', + retries, + ), + assets=raw_metadata.index, + show_progress=show_progress, + ) + + """ + for data_frequency in self.frequencies: + self._write_symbol_for_freq( + self._fetch_symbol_iter( + api_key, + cache, + symbol_map, + calendar, + start_session, + end_session, + data_frequency, + retries, + ), + data_frequency, + daily_bar_writer, + minute_bar_writer, + assets=raw_metadata.index, + show_progress=show_progress, + ) + """ + + + + metadata = self._post_process_metadata(raw_metadata, cache) + asset_db_writer.write(metadata) + + adjustment_writer.write() + else: + self._download(show_progress, output_dir) + + def _download(self, show_progress, output_dir): + if show_progress: + data = bundles.download_with_progress( + self.tar_url, + chunk_size=bundles.ONE_MEGABYTE, + label='Downloading bundle: {name}'.format(name=self.name), + ) + else: + data = bundles.download_without_progress(self.tar_url) + + with tarfile.open('r', fileobj=data) as tar: + if show_progress: + print 'Writing data to {dir}'.format(dir=output_dir) + tar.extractall(output_dir) + + + def _fetch_metadata_frame(self, + api_key, + cache, + retries=5, + environ=None, + show_progress=False): + + raw_iter = self._fetch_metadata_iter(api_key, cache, retries, environ) + + def item_show_func(_, _it=iter(count())): + return 'Downloading metadata page: {0}'.format(next(_it)) + + with maybe_show_progress( + raw_iter, + show_progress, + item_show_func=item_show_func, + label='Fetching {bundle} metadata:'.format(bundle=self.name), + ) as blocks: + metadata = pd.concat(blocks, ignore_index=True) + + return metadata + + def _post_process_metadata(self, metadata, cache): + final_metadata = pd.DataFrame( + columns=self.md_column_names, + index=metadata.index, + ) + + for asset_id, symbol in metadata.symbol.iteritems(): + try: + raw_data = cache[symbol] + except KeyError: + raise ValueError( + 'Unable to find cached data for symbol: {0}'.format(symbol) + ) + + final_symbol_metadata = self.post_process_symbol_metadata( + metadata.iloc[asset_id], + raw_data, + ) + + final_metadata.iloc[asset_id] = final_symbol_metadata + + final_metadata['exchange'] = self.exchange + + return final_metadata + + def _fetch_metadata_iter(self, api_key, cache, retries, environ): + for page_number in count(1): + key = 'metadata-page-{pn}'.format(pn=page_number) + try: + raw = cache[key] + except KeyError: + for _ in range(retries): + try: + raw = self.fetch_raw_metadata_frame( + api_key, + page_number, + ) + break + except ValueError: + raw = pd.DataFrame([]) + break + else: + raise ValueError( + 'Failed to download metadata page %d after %d ' + 'attempts.'.format(page_number, retries), + ) + + # update cached value for key + cache[key] = raw + + if raw.empty: + # empty DataFrame signals completion + break + + yield raw + + def _fetch_symbol_iter(self, + api_key, + cache, + symbol_map, + calendar, + start_session, + end_session, + frequency, + retries): + + for asset_id, symbol in symbol_map.iteritems(): + start_time = pd.Timestamp.utcnow() + try: + raw_data = cache[symbol] + except KeyError: + raw_data = None + + if raw_data is not None and not raw_data.empty: + last = raw_data.index[-1].tz_localize('UTC') + else: + last = start_session + + next_start_time = last + pd.Timedelta(minutes=5) + if start_time > next_start_time: + raw_diff = self.fetch_raw_symbol_frame( + api_key, + symbol, + last, + end_session, + frequency, + ) + raw_diff = raw_diff[ + (raw_diff.index >= last) & + (raw_diff.index <= end_session) + ] + + raw_data = cache[symbol] = ( + raw_data.append(raw_diff) + if raw_data is not None else + raw_diff + ) + + raw_data = raw_data[~raw_data.index.duplicated(keep='last')] + + should_sleep = True + else: + should_sleep = False + + """ + sessions = calendar.sessions_in_range(start_session, end_session) + + print 'raw_data before:\n', raw_data.head() + raw_data = raw_data.reindex( + sessions, + copy=False, + ).fillna(0.0) + print 'raw_data after:\n', raw_data.head() + """ + + yield asset_id, raw_data + + if should_sleep: + remaining = pd.Timestamp.utcnow() - start_time + self.wait_time + if remaining.value > 0: + sleep(remaining.value / 10**9) + + def _write_symbol_for_freq(self, + pricing_iter, + data_frequency, + daily_bar_writer, + minute_bar_writer, + assets, + show_progress=False): + if data_frequency == 'daily': + daily_bar_writer.write( + pricing_iter, + assets=assets, + show_progress=show_progress, + ) + elif data_frequency == '5-minute': + minute_bar_writer.write( + pricing_iter, + assets=assets, + show_progress=show_progress, + ) + elif data_frequency == 'minute': + minute_bar_writer.write( + pricing_iter, + assets=assets, + show_progress=show_progress, + ) + else: + raise ValueError( + 'Unsupported data-frequency: {0}'.format(data_frequency) + ) + +def _dtypes_to_cols(dtypes): + return [name for name, _ in dtypes] diff --git a/catalyst/data/bundles/core.py b/catalyst/data/bundles/core.py index c3009f1d..5bd4cf8a 100644 --- a/catalyst/data/bundles/core.py +++ b/catalyst/data/bundles/core.py @@ -204,7 +204,7 @@ BundleData = namedtuple( BundleCore = namedtuple( 'BundleCore', - 'bundles register unregister ingest load clean', + 'bundles register_bundle register unregister ingest load clean', ) @@ -258,6 +258,8 @@ def _make_bundle_core(): ------- bundles : mappingproxy The mapping of bundles to bundle payloads. + register_bundle : Bundle + A bundle instance to add to the ``bundles`` mapping. register : callable The function which registers new bundles in the ``bundles`` mapping. unregister : callable @@ -275,6 +277,21 @@ def _make_bundle_core(): # warn when trampling another bundle. bundles = mappingproxy(_bundles) + def register_bundle(bundle_cls, + start_session=None, + end_session=None, + create_writers=True): + bundle = bundle_cls() + return register( + bundle.name, + bundle.ingest, + calendar_name=bundle.calendar_name, + minutes_per_day=bundle.minutes_per_day, + start_session=start_session, + end_session=end_session, + create_writers=create_writers, + ) + @curry def register(name, f, @@ -670,7 +687,15 @@ def _make_bundle_core(): return cleaned - return BundleCore(bundles, register, unregister, ingest, load, clean) + return BundleCore( + bundles, + register_bundle, + register, + unregister, + ingest, + load, + clean, + ) -bundles, register, unregister, ingest, load, clean = _make_bundle_core() +bundles, register_bundle, register, unregister, ingest, load, clean = _make_bundle_core() diff --git a/catalyst/data/bundles/poloniex.py b/catalyst/data/bundles/poloniex.py index 3303a7a8..a3906025 100644 --- a/catalyst/data/bundles/poloniex.py +++ b/catalyst/data/bundles/poloniex.py @@ -1,4 +1,3 @@ -from io import BytesIO import tarfile from . import core as bundles @@ -7,22 +6,22 @@ POLONIEX_BUNDLE_URL = ( 'https://www.dropbox.com/s/9naqffawnq8o4r2/poloniex-bundle.tar?dl=1' ) -@bundles.register( - 'poloniex', - create_writers=False, - calendar_name='OPEN', - minutes_per_day=1440) -def quantopian_quandl_bundle(environ, - asset_db_writer, - minute_bar_writer, - daily_bar_writer, - adjustment_writer, - calendar, - start_session, - end_session, - cache, - show_progress, - output_dir): +#@bundles.register( +# 'poloniex', +# create_writers=False, +# calendar_name='OPEN', +# minutes_per_day=1440) +def poloniex_bundle(environ, + asset_db_writer, + minute_bar_writer, + daily_bar_writer, + adjustment_writer, + calendar, + start_session, + end_session, + cache, + show_progress, + output_dir): if show_progress: data = bundles.download_with_progress( POLONIEX_BUNDLE_URL, diff --git a/catalyst/data/bundles/poloniex_bundle.py b/catalyst/data/bundles/poloniex_bundle.py new file mode 100644 index 00000000..87ac00e1 --- /dev/null +++ b/catalyst/data/bundles/poloniex_bundle.py @@ -0,0 +1,187 @@ +# +# Copyright 2017 Enigma MPC, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime + +import pandas as pd + +from six.moves.urllib.parse import urlencode + +from catalyst.data.bundles.core import register_bundle +from catalyst.data.bundles.bundle import AbstractBundle +from catalyst.utils.memoize import lazyval + +class PoloniexBundle(AbstractBundle): + def __init__(self): + super(self.__class__, self).__init__() + + @lazyval + def name(self): + return 'poloniex' + + @lazyval + def exchange(self): + return 'POLO' + + @lazyval + def calendar_name(self): + return 'OPEN' + + @lazyval + def minutes_per_day(self): + return 1440 + + @lazyval + def frequencies(self): + return set(( + 'daily', + )) + + @lazyval + def md_dtypes(self): + return [ + ('symbol', 'object'), + ('start_date', 'datetime64[ns]'), + ('end_date', 'datetime64[ns]'), + ('ac_date', 'datetime64[ns]'), + ] + + @lazyval + def dtypes(self): + return [ + ('date', 'datetime64[ns]'), + ('open', 'float64'), + ('high', 'float64'), + ('low', 'float64'), + ('close', 'float64'), + ('volume', 'float64'), + ] + + @lazyval + def tar_url(self): + return ( + 'https://www.dropbox.com/s/9naqffawnq8o4r2/' + 'poloniex-bundle.tar?dl=1' + ) + + @lazyval + def wait_time(self): + return pd.Timedelta(milliseconds=170) + + def fetch_raw_metadata_frame(self, api_key, page_number): + if page_number > 1: + return pd.DataFrame([]) + + raw = pd.read_json( + self._format_metadata_url( + api_key, + page_number, + ), + orient='index', + ) + + raw = raw.sort_index().reset_index() + raw.rename( + columns={'index':'symbol'}, + inplace=True, + ) + + return raw + + def post_process_symbol_metadata(self, metadata, data): + start_date = data.index[0].tz_localize(None) + end_date = data.index[-1].tz_localize(None) + ac_date = end_date + pd.Timedelta(days=1) + + return ( + metadata.symbol, + start_date, + end_date, + ac_date, + ) + + def fetch_raw_symbol_frame(self, + api_key, + symbol, + start_date, + end_date, + frequency): + raw = pd.read_json( + self._format_data_url( + api_key, + symbol, + start_date, + end_date, + frequency, + ), + orient='records', + ) + + raw.set_index('date', inplace=True) + + scale = 1000.0 + raw.loc[:, 'open'] /= scale + raw.loc[:, 'high'] /= scale + raw.loc[:, 'low'] /= scale + raw.loc[:, 'close'] /= scale + raw.loc[:, 'volume'] *= scale + + return raw + + ''' + HELPER METHODS + ''' + + def _format_metadata_url(self, api_key, page_number): + query_params = [ + ('command', 'returnTicker'), + ] + + return self._format_polo_query(query_params) + + + def _format_data_url(self, + api_key, + symbol, + start_date, + end_date, + data_frequency): + period_map = { + 'daily': 86400, + '5-minute': 300, + 'minute': 60, + } + + try: + period = period_map[data_frequency] + except KeyError: + return None + + query_params = [ + ('command', 'returnChartData'), + ('currencyPair', symbol), + ('start', start_date.value / 10**9), + ('end', end_date.value / 10**9), + ('period', period), + ] + + return self._format_polo_query(query_params) + + def _format_polo_query(self, query_params): + return 'https://poloniex.com/public?{query}'.format( + query=urlencode(query_params), + ) + +register_bundle(PoloniexBundle)