diff --git a/catalyst/data/bundles/base.py b/catalyst/data/bundles/base.py index b72cc7a6..613f5051 100644 --- a/catalyst/data/bundles/base.py +++ b/catalyst/data/bundles/base.py @@ -107,6 +107,8 @@ class BaseBundle(object): use_local = environ.get('CATALYST_INGEST_LOCAL', 0) > 0 if use_local: + # User has instructed local curation and ingestion of bundle. + # Fetch raw metadata for all symbols. raw_metadata = self._fetch_metadata_frame( api_key, cache=cache, @@ -115,8 +117,9 @@ class BaseBundle(object): show_progress=show_progress, ) + # Compile daily symbol data if bundle supports daily mode and + # persist the dataset to disk. symbol_map = raw_metadata.symbol - if 'daily' in self.frequencies: daily_bar_writer.write( self._fetch_symbol_iter( @@ -133,6 +136,9 @@ class BaseBundle(object): show_progress=show_progress, ) + # Post-process metadata using cached symbol frames, and write to + # disk. This metadata must be written before any attempt to write + # either minute or 5-minute data. metadata = self._post_process_metadata( raw_metadata, cache, @@ -140,7 +146,10 @@ class BaseBundle(object): ) asset_db_writer.write(metadata) + # Compile 5-minute symbol data if bundle supports 5-minute mode and + # persist the dataset to disk. if '5-minute' in self.frequencies: + #TODO(cfromknecht) replace with five_minute_bar_writer minute_bar_writer.write( self._fetch_symbol_iter( api_key, @@ -155,25 +164,49 @@ class BaseBundle(object): show_progress=show_progress, ) + # Compile minute symbol data if bundle supports minute mode and + # persist the dataset to disk. + if 'minute' in self.frequencies: + minute_bar_writer.write( + self._fetch_symbol_iter( + api_key, + cache, + symbol_map, + calendar, + start_session, + end_session, + 'minute', + retries, + ), + show_progress=show_progress, + ) + + # For legacy purposes, this call is required to ensure the database + # contains an appropriately initialized file structure. 
We don't + # foresee a use case for adjustments at this time, but may later + # choose to expose this functionality in the future. adjustment_writer.write() else: - self._download(show_progress, output_dir) + # Otherwise, user has instructed to download and untar bundle + # directly from the bundle's `tar_url`. + self._download_and_untar(show_progress, output_dir) - def _download(self, show_progress, output_dir): + def _download_and_untar(self, show_progress, output_dir): + # Download bundle conditioned on whether the user would like progress + # information to be displayed in the CLI. if show_progress: data = bundles.download_with_progress( self.tar_url, chunk_size=bundles.ONE_MEGABYTE, - label='Downloading bundle: {name}'.format(name=self.name), + label='Downloading {name} bundle'.format(name=self.name), ) else: data = bundles.download_without_progress(self.tar_url) + # File transfer has completed; untar the bundle to the appropriate + # data directory. with tarfile.open('r', fileobj=data) as tar: - if show_progress: - print 'Writing data to {dir}'.format(dir=output_dir) tar.extractall(output_dir) - def _fetch_metadata_frame(self, api_key, @@ -182,8 +215,10 @@ class BaseBundle(object): environ=None, show_progress=False): + # Set up raw metadata iterator to fetch pages if necessary. raw_iter = self._fetch_metadata_iter(api_key, cache, retries, environ) + # Concatenate all frames in the iterator into a single metadata frame. 
with maybe_show_progress( raw_iter, show_progress, @@ -196,41 +231,10 @@ class BaseBundle(object): return metadata - def _post_process_metadata(self, metadata, cache, show_progress=False): - final_metadata = pd.DataFrame( - columns=self.md_column_names, - index=metadata.index, - ) - - with maybe_show_progress( - metadata.symbol.iteritems(), - show_progress, - label='Post-processing symbol metadata', - item_show_func=item_show_count(len(metadata)), - length=len(metadata), - show_percent=False, - ) as symbols_map: - for asset_id, symbol in symbols_map: - try: - raw_data = cache[symbol] - except KeyError: - raise ValueError( - 'Unable to find cached data for symbol: {0}'.format(symbol) - ) - - final_symbol_metadata = self.post_process_symbol_metadata( - metadata.iloc[asset_id], - raw_data, - ) - - final_metadata.iloc[asset_id] = final_symbol_metadata - - final_metadata['exchange'] = self.exchange - - return final_metadata - def _fetch_metadata_iter(self, api_key, cache, retries, environ): for page_number in count(1): + # Attempt to load metadata page from cache. If it does not exist, + # poll the API up to `retries` times in order to get raw DataFrame. key = 'metadata-page-{pn}'.format(pn=page_number) try: raw = cache[key] @@ -253,14 +257,58 @@ class BaseBundle(object): if raw.empty: - # empty DataFrame signals completion + # Empty DataFrame signals completion. break - # update cached value for key + # Update cached value for key. cache[key] = raw + # Yield the metadata frame to the caller. yield raw + def _post_process_metadata(self, metadata, cache, show_progress=False): + # Create empty data frame using target metadata column names and index. + final_metadata = pd.DataFrame( + columns=self.md_column_names, + index=metadata.index, + ) + + # Iterate over the available symbols, loading the asset's raw symbol + # data from the cache. The final metadata is computed and recorded in + # the appropriate row depending on the asset's id. 
+ with maybe_show_progress( + metadata.symbol.iteritems(), + show_progress, + label='Post-processing symbol metadata', + item_show_func=item_show_count(len(metadata)), + length=len(metadata), + show_percent=False, + ) as symbols_map: + for asset_id, symbol in symbols_map: + # Attempt to load data from disk, the cache should have an entry + # for each symbol at this point of the execution. If one does + # not exist, we should fail. + try: + raw_data = cache[symbol] + except KeyError: + raise ValueError( + 'Unable to find cached data for symbol: {0}'.format(symbol) + ) + + # Perform and require post-processing of metadata. + final_symbol_metadata = self.post_process_symbol_metadata( + metadata.iloc[asset_id], + raw_data, + ) + + # Record symbol's final metadata. + final_metadata.iloc[asset_id] = final_symbol_metadata + + # Register all assets with the bundle's default exchange. + final_metadata['exchange'] = self.exchange + + return final_metadata + def _fetch_symbol_iter(self, api_key, cache, @@ -314,6 +362,8 @@ class BaseBundle(object): start_session, end_session, data_frequency): + + # Attempt to load pre-existing symbol data from cache. 
try: raw_data = cache[symbol] except KeyError: @@ -377,6 +427,7 @@ class BaseBundle(object): minute_bar_writer, assets, show_progress=False): + if data_frequency == 'daily': daily_bar_writer.write( pricing_iter, @@ -384,6 +435,7 @@ class BaseBundle(object): show_progress=show_progress, ) elif data_frequency == '5-minute': + # TODO(cfromknecht) replace with five minute writer minute_bar_writer.write( pricing_iter, show_progress=show_progress, diff --git a/catalyst/data/bundles/core.py b/catalyst/data/bundles/core.py index 5bd4cf8a..6b0abd86 100644 --- a/catalyst/data/bundles/core.py +++ b/catalyst/data/bundles/core.py @@ -33,6 +33,7 @@ from catalyst.utils.input_validation import ensure_timestamp, optionally import catalyst.utils.paths as pth from catalyst.utils.preprocess import preprocess from catalyst.utils.calendars import get_calendar +from catalyst.utils.cli import maybe_show_progress ONE_MEGABYTE = 1024 * 1024 @@ -158,7 +159,9 @@ def download_with_progress(url, chunk_size, **progress_kwargs): total_size = int(resp.headers['content-length']) data = BytesIO() - with click.progressbar(length=total_size, **progress_kwargs) as pbar: + + progress_kwargs['length'] = total_size + with maybe_show_progress(None, True, **progress_kwargs) as pbar: for chunk in resp.iter_content(chunk_size=chunk_size): data.write(chunk) pbar.update(len(chunk))