Further documentation of BaseBundle class and UI improvements

This commit is contained in:
Conner Fromknecht
2017-07-13 15:59:50 -07:00
parent f0050f2e2f
commit bfffdc681f
2 changed files with 98 additions and 43 deletions
+94 -42
View File
@@ -107,6 +107,8 @@ class BaseBundle(object):
use_local = environ.get('CATALYST_INGEST_LOCAL', 0) > 0
if use_local:
# User has instructed local curation and ingestion of bundle.
# Fetch raw metadata for all symbols.
raw_metadata = self._fetch_metadata_frame(
api_key,
cache=cache,
@@ -115,8 +117,9 @@ class BaseBundle(object):
show_progress=show_progress,
)
# Compile daily symbol data if bundle supports daily mode and
# persist the dataset to disk.
symbol_map = raw_metadata.symbol
if 'daily' in self.frequencies:
daily_bar_writer.write(
self._fetch_symbol_iter(
@@ -133,6 +136,9 @@ class BaseBundle(object):
show_progress=show_progress,
)
# Post-process metadata using cached symbol frames, and write to
# disk. This metadata must be written before any attempt to write
# either minute or 5-minute data.
metadata = self._post_process_metadata(
raw_metadata,
cache,
@@ -140,7 +146,10 @@ class BaseBundle(object):
)
asset_db_writer.write(metadata)
# Compile 5-minute symbol data if bundle supports 5-minute mode and
# persist the dataset to disk.
if '5-minute' in self.frequencies:
#TODO(cfromknecht) replace with five_minute_bar_writer
minute_bar_writer.write(
self._fetch_symbol_iter(
api_key,
@@ -155,25 +164,49 @@ class BaseBundle(object):
show_progress=show_progress,
)
# Compile minute symbol data if bundle supports minute mode and
# persist the dataset to disk.
if 'minute' in self.frequencies:
minute_bar_writer.write(
self._fetch_symbol_iter(
api_key,
cache,
symbol_map,
calendar,
start_session,
end_session,
'minute',
retries,
),
show_progress=show_progress,
)
# For legacy purposes, this call is required to ensure the database
# contains an appropriately initialized file structure. We don't
# forsee a usecase for adjustments at this time, but may later
# choose to expose this functionality in the future.
adjustment_writer.write()
else:
self._download(show_progress, output_dir)
# Otherwise, user has instructed to download and untar bundle
# directly from the bundles `tar_url`.
self._download_and_untar(show_progress, output_dir)
def _download(self, show_progress, output_dir):
def _download_and_untar(self, show_progress, output_dir):
# Download bundle conditioned on whether the user would like progress
# information to be displayed in the CLI.
if show_progress:
data = bundles.download_with_progress(
self.tar_url,
chunk_size=bundles.ONE_MEGABYTE,
label='Downloading bundle: {name}'.format(name=self.name),
label='Downloading {name} bundle'.format(name=self.name),
)
else:
data = bundles.download_without_progress(self.tar_url)
# File transfer has completed, untar the bundle to the appropriate
# data directory.
with tarfile.open('r', fileobj=data) as tar:
if show_progress:
print 'Writing data to {dir}'.format(dir=output_dir)
tar.extractall(output_dir)
def _fetch_metadata_frame(self,
api_key,
@@ -182,8 +215,10 @@ class BaseBundle(object):
environ=None,
show_progress=False):
# Setup raw metadata iterator to fetch pages if necessary.
raw_iter = self._fetch_metadata_iter(api_key, cache, retries, environ)
# Concatenate all frame in iterator to compute a single metadata frame.
with maybe_show_progress(
raw_iter,
show_progress,
@@ -196,41 +231,10 @@ class BaseBundle(object):
return metadata
def _post_process_metadata(self, metadata, cache, show_progress=False):
final_metadata = pd.DataFrame(
columns=self.md_column_names,
index=metadata.index,
)
with maybe_show_progress(
metadata.symbol.iteritems(),
show_progress,
label='Post-processing symbol metadata',
item_show_func=item_show_count(len(metadata)),
length=len(metadata),
show_percent=False,
) as symbols_map:
for asset_id, symbol in symbols_map:
try:
raw_data = cache[symbol]
except KeyError:
raise ValueError(
'Unable to find cached data for symbol: {0}'.format(symbol)
)
final_symbol_metadata = self.post_process_symbol_metadata(
metadata.iloc[asset_id],
raw_data,
)
final_metadata.iloc[asset_id] = final_symbol_metadata
final_metadata['exchange'] = self.exchange
return final_metadata
def _fetch_metadata_iter(self, api_key, cache, retries, environ):
for page_number in count(1):
# Attempt to load metadata page from cache. If it does not exist,
# poll the API upto `retries` times in order to get raw DataFrame.
key = 'metadata-page-{pn}'.format(pn=page_number)
try:
raw = cache[key]
@@ -253,14 +257,58 @@ class BaseBundle(object):
if raw.empty:
# empty DataFrame signals completion
# Empty DataFrame signals completion.
break
# update cached value for key
# Update cached value for key.
cache[key] = raw
# Return metadata frame to application.
yield raw
def _post_process_metadata(self, metadata, cache, show_progress=False):
# Create empty data frame using target metadata column names and dtypes
final_metadata = pd.DataFrame(
columns=self.md_column_names,
index=metadata.index,
)
# Iterate over the available symbols, loading the asset's raw symbol
# data from the cache. The final metadata is computed and recorded in
# the appropriate row depending on the asset's id.
with maybe_show_progress(
metadata.symbol.iteritems(),
show_progress,
label='Post-processing symbol metadata',
item_show_func=item_show_count(len(metadata)),
length=len(metadata),
show_percent=False,
) as symbols_map:
for asset_id, symbol in symbols_map:
# Attempt to load data from disk, the cache should have an entry
# for each symbol at this point of the execution. If one does
# not exist, we should fail.
try:
raw_data = cache[symbol]
except KeyError:
raise ValueError(
'Unable to find cached data for symbol: {0}'.format(symbol)
)
# Perform and require post-processing of metadata.
final_symbol_metadata = self.post_process_symbol_metadata(
metadata.iloc[asset_id],
raw_data,
)
# Record symbol's final metadata.
final_metadata.iloc[asset_id] = final_symbol_metadata
# Register all assets with the bundle's default exchange.
final_metadata['exchange'] = self.exchange
return final_metadata
def _fetch_symbol_iter(self,
api_key,
cache,
@@ -314,6 +362,8 @@ class BaseBundle(object):
start_session,
end_session,
data_frequency):
# Attempt to load pre-existing symbol data from cache.
try:
raw_data = cache[symbol]
except KeyError:
@@ -377,6 +427,7 @@ class BaseBundle(object):
minute_bar_writer,
assets,
show_progress=False):
if data_frequency == 'daily':
daily_bar_writer.write(
pricing_iter,
@@ -384,6 +435,7 @@ class BaseBundle(object):
show_progress=show_progress,
)
elif data_frequency == '5-minute':
# TODO(cfromknecht) replace with five minute writer
minute_bar_writer.write(
pricing_iter,
show_progress=show_progress,
+4 -1
View File
@@ -33,6 +33,7 @@ from catalyst.utils.input_validation import ensure_timestamp, optionally
import catalyst.utils.paths as pth
from catalyst.utils.preprocess import preprocess
from catalyst.utils.calendars import get_calendar
from catalyst.utils.cli import maybe_show_progress
ONE_MEGABYTE = 1024 * 1024
@@ -158,7 +159,9 @@ def download_with_progress(url, chunk_size, **progress_kwargs):
total_size = int(resp.headers['content-length'])
data = BytesIO()
with click.progressbar(length=total_size, **progress_kwargs) as pbar:
progress_kwargs['length'] = total_size
with maybe_show_progress(None, True, **progress_kwargs) as pbar:
for chunk in resp.iter_content(chunk_size=chunk_size):
data.write(chunk)
pbar.update(len(chunk))