WIP: Integration of generic bundle and five minute bars

This commit is contained in:
Conner Fromknecht
2017-07-19 11:38:32 -07:00
parent e28459b19f
commit 3df274431e
22 changed files with 671 additions and 344 deletions
+9 -1
View File
@@ -290,6 +290,13 @@ def catalyst_magic(line, cell=None):
show_default=True,
help='The data bundle to ingest.',
)
@click.option(
'-c',
'--compile-locally',
is_flag=True,
default=False,
help='Download dataset from source and compile bundle locally.',
)
@click.option(
'--assets-version',
type=int,
@@ -301,7 +308,7 @@ def catalyst_magic(line, cell=None):
default=True,
help='Print progress information to the terminal.'
)
def ingest(bundle, assets_version, show_progress):
def ingest(bundle, compile_locally, assets_version, show_progress):
"""Ingest the data for the given bundle.
"""
bundles_module.ingest(
@@ -310,6 +317,7 @@ def ingest(bundle, assets_version, show_progress):
pd.Timestamp.utcnow(),
assets_version,
show_progress,
compile_locally,
)
+21 -3
View File
@@ -308,7 +308,10 @@ class TradingAlgorithm(object):
self.asset_finder = self.trading_environment.asset_finder
# Initialize Pipeline API data.
self.init_engine(kwargs.pop('get_pipeline_loader', None))
self.init_engine(
kwargs.pop('get_pipeline_loader', None),
self.sim_params.data_frequency,
)
self._pipelines = {}
# Create an always-expired cache so that we compute the first time data
# is requested.
@@ -422,16 +425,31 @@ class TradingAlgorithm(object):
self.restrictions = NoRestrictions()
def init_engine(self, get_loader):
def init_engine(self, get_loader, data_frequency):
"""
Construct and store a PipelineEngine from loader.
If get_loader is None, constructs an ExplodingPipelineEngine
"""
if get_loader is not None:
if data_frequency == 'daily':
all_dates = self.trading_calendar.all_sessions
elif data_frequency == '5-minute':
all_dates = self.trading_calendar.all_five_minutes
elif data_frequency == 'minute':
all_dates = self.trading_calendar.all_minutes
else:
raise ValueError(
'Cannot initialize engine with '
'data frequency: {}'.format(data_frequency)
)
print 'first_dates:', all_dates[:10]
print 'last_dates:', all_dates[:-10]
self.engine = SimplePipelineEngine(
get_loader,
self.trading_calendar.all_sessions,
all_dates,
self.asset_finder,
)
else:
+79
View File
@@ -35,6 +35,17 @@ def minute_value(ndarray[long_t, ndim=1] market_opens,
return market_opens[q] + r
@cython.cdivision(True)
def five_minute_value(ndarray[long_t, ndim=1] market_opens,
Py_ssize_t pos,
short five_minutes_per_day):
cdef short q, r
q = cython.cdiv(pos, five_minutes_per_day)
r = cython.cmod(pos, five_minutes_per_day)
return market_opens[q] + r
def find_position_of_minute(ndarray[long_t, ndim=1] market_opens,
ndarray[long_t, ndim=1] market_closes,
long_t minute_val,
@@ -88,6 +99,26 @@ def find_position_of_minute(ndarray[long_t, ndim=1] market_opens,
return (market_open_loc * minutes_per_day) + delta
def find_position_of_five_minute(ndarray[long_t, ndim=1] market_opens,
ndarray[long_t, ndim=1] market_closes,
long_t five_minute_val,
short five_minutes_per_day,
bool forward_fill):
cdef Py_ssize_t market_open_loc, market_open, delta
market_open_loc = \
searchsorted(market_opens, five_minute_val, side='right') - 1
market_open = market_opens[market_open_loc]
market_close = market_closes[market_open_loc]
if not forward_fill and ((five_minute_val - market_open) >= five_minutes_per_day):
raise ValueError("Given five minutes is not between an open and a close")
delta = int_min(five_minute_val - market_open, market_close - market_open)
return (market_open_loc * five_minutes_per_day) + delta
def find_last_traded_position_internal(
ndarray[long_t, ndim=1] market_opens,
ndarray[long_t, ndim=1] market_closes,
@@ -157,3 +188,51 @@ def find_last_traded_position_internal(
# we've gone to the beginning of this asset's range, and still haven't
# found a trade event
return -1
def find_last_traded_five_minute_position_internal(
ndarray[long_t, ndim=1] market_opens,
ndarray[long_t, ndim=1] market_closes,
long_t end_five_minute,
long_t start_five_minute,
volumes,
short five_minutes_per_day):
cdef Py_ssize_t minute_pos, current_minute, q
five_minute_pos = int_min(
find_position_of_five_minute(
market_opens,
market_closes,
end_five_minute,
five_minutes_per_day,
True,
),
len(volumes) - 1,
)
while five_minute_pos >= 0:
current_five_minute = five_minute_value(
market_opens, five_minute_pos, five_minutes_per_day
)
q = cython.cdiv(five_minute_pos, five_minutes_per_day)
if current_five_minute > market_closes[q]:
five_minute_pos = find_position_of_five_minute(
market_opens,
market_closes,
market_closes[q],
five_minutes_per_day,
False,
)
continue
if current_five_minute < start_five_minute:
return -1
if volumes[five_minute_pos] != 0:
return five_minute_pos
five_minute_pos -= 1
# we've gone to the beginning of this asset's range, and still haven't
# found a trade event
return -1
+148 -127
View File
@@ -33,7 +33,17 @@ from catalyst.utils.memoize import lazyval
logbook.StderrHandler().push_application()
log = logbook.Logger(__name__)
DEFAULT_RETRIES = 5
class BaseBundle(object):
def __init__(self, asset_filter=[]):
self._asset_filter = asset_filter
self._reset()
def _reset(self):
self._splits = []
self._dividends = []
@lazyval
def name(self):
raise NotImplementedError()
@@ -50,6 +60,10 @@ class BaseBundle(object):
def minutes_per_day(self):
raise NotImplementedError()
@lazyval
def five_minutes_per_day(self):
raise NotImplementedError()
@lazyval
def frequencies(self):
raise NotImplementedError()
@@ -101,6 +115,7 @@ class BaseBundle(object):
environ,
asset_db_writer,
minute_bar_writer,
five_minute_bar_writer,
daily_bar_writer,
adjustment_writer,
calendar,
@@ -108,110 +123,117 @@ class BaseBundle(object):
end_session,
cache,
show_progress,
is_compile,
output_dir):
api_key = environ.get('CATALYST_API_KEY')
retries = environ.get('CATALYST_DOWNLOAD_ATTEMPTS', 5)
use_local = environ.get('CATALYST_INGEST_LOCAL', 0) > 0
try:
api_key = environ.get('CATALYST_API_KEY')
retries = environ.get('CATALYST_DOWNLOAD_ATTEMPTS', 5)
if use_local:
# User has instructed local curation and ingestion of bundle.
# Fetch raw metadata for all symbols.
raw_metadata = self._fetch_metadata_frame(
api_key,
cache=cache,
retries=retries,
environ=environ,
show_progress=show_progress,
)
# Compile daily symbol data if bundle supports daily mode and
# persist the dataset to disk.
symbol_map = raw_metadata.symbol
if 'daily' in self.frequencies:
daily_bar_writer.write(
self._fetch_symbol_iter(
api_key,
cache,
symbol_map,
calendar,
start_session,
end_session,
'daily',
retries,
),
assets=raw_metadata.index,
if is_compile:
# User has instructed local compilation and ingestion of bundle.
# Fetch raw metadata for all symbols.
raw_metadata = self._fetch_metadata_frame(
api_key,
cache=cache,
retries=retries,
environ=environ,
show_progress=show_progress,
)
# Post-process metadata using cached symbol frames, and write to
# disk. This metadata must be written before any attempt to write
# either minute or 5-minute data.
metadata = self._post_process_metadata(
raw_metadata,
cache,
show_progress=show_progress,
)
asset_db_writer.write(metadata)
# Compile daily symbol data if bundle supports daily mode and
# persist the dataset to disk.
symbol_map = raw_metadata.symbol
if 'daily' in self.frequencies:
daily_bar_writer.write(
self._fetch_symbol_iter(
api_key,
cache,
symbol_map,
calendar,
start_session,
end_session,
'daily',
retries,
),
assets=raw_metadata.index,
show_progress=show_progress,
)
# Compile 5-minute symbol data if bundle supports 5-minute mode and
# persist the dataset to disk.
if '5-minute' in self.frequencies:
#TODO(cfromknecht) replace with five_minute_bar_writer
minute_bar_writer.write(
self._fetch_symbol_iter(
api_key,
cache,
symbol_map,
calendar,
start_session,
end_session,
'5-minute',
retries,
),
# Post-process metadata using cached symbol frames, and write to
# disk. This metadata must be written before any attempt to write
# either minute or 5-minute data.
metadata = self._post_process_metadata(
raw_metadata,
cache,
show_progress=show_progress,
)
asset_db_writer.write(metadata)
# Compile minute symbol data if bundle supports minute mode and
# persist the dataset to disk.
if 'minute' in self.frequencies:
minute_bar_writer.write(
self._fetch_symbol_iter(
api_key,
cache,
symbol_map,
calendar,
start_session,
end_session,
'minute',
retries,
),
show_progress=show_progress,
)
# Compile 5-minute symbol data if bundle supports 5-minute mode and
# persist the dataset to disk.
if '5-minute' in self.frequencies:
five_minute_bar_writer.write(
self._fetch_symbol_iter(
api_key,
cache,
symbol_map,
calendar,
start_session,
end_session,
'5-minute',
retries,
),
length=len(symbol_map),
show_progress=show_progress,
)
# For legacy purposes, this call is required to ensure the database
# contains an appropriately initialized file structure. We don't
# forsee a usecase for adjustments at this time, but may later
# choose to expose this functionality in the future.
if len(self.splits) > 0 or len(self.dividends) > 0:
# Compile minute symbol data if bundle supports minute mode and
# persist the dataset to disk.
if 'minute' in self.frequencies:
minute_bar_writer.write(
self._fetch_symbol_iter(
api_key,
cache,
symbol_map,
calendar,
start_session,
end_session,
'minute',
retries,
),
show_progress=show_progress,
)
# For legacy purposes, this call is required to ensure the database
# contains an appropriately initialized file structure. We don't
# forsee a usecase for adjustments at this time, but may later
# choose to expose this functionality in the future.
adjustment_writer.write(
splits=(
pd.concat(self.splits, ignore_index=True)
#if self.splits is not None \
#and len(self.splits) > 0 else
#None
if len(self.splits) > 0 else
None
),
dividends=(
pd.concat(self.dividends, ignore_index=True)
#if self.dividends is not None \
#and len(dividends) > 0 else
#None
if len(self.dividends) > 0 else
None
),
)
else:
# Otherwise, user has instructed to download and untar bundle
# directly from the bundles `tar_url`.
self._download_and_untar(show_progress, output_dir)
except Exception as e:
log.exception(
' Failed to ingest {name}:\n{msg}'.format(
name=self.name,
msg=str(e),
)
)
else:
# Otherwise, user has instructed to download and untar bundle
# directly from the bundles `tar_url`.
self._download_and_untar(show_progress, output_dir)
self._reset()
def _download_and_untar(self, show_progress, output_dir):
# Download bundle conditioned on whether the user would like progress
@@ -233,7 +255,7 @@ class BaseBundle(object):
def _fetch_metadata_frame(self,
api_key,
cache,
retries=5,
retries=DEFAULT_RETRIES,
environ=None,
show_progress=False):
@@ -257,7 +279,7 @@ class BaseBundle(object):
for page_number in count(1):
# Attempt to load metadata page from cache. If it does not exist,
# poll the API upto `retries` times in order to get raw DataFrame.
key = 'metadata-page-{pn}'.format(pn=page_number)
key = 'metadata-page-{pn}.frame'.format(pn=page_number)
try:
raw = cache[key]
except KeyError:
@@ -289,6 +311,10 @@ class BaseBundle(object):
# Empty DataFrame signals completion.
break
# Apply selective asset filtering, useful for benchmark
# ingestion.
raw = raw[raw.symbol.isin(self._asset_filter)]
# Update cached value for key.
cache[key] = raw
@@ -317,8 +343,9 @@ class BaseBundle(object):
# Attempt to load data from disk, the cache should have an entry
# for each symbol at this point of the execution. If one does
# not exist, we should fail.
key = '{sym}.daily.frame'.format(sym=symbol)
try:
raw_data = cache[symbol]
raw_data = cache[key]
except KeyError:
raise ValueError(
'Unable to find cached data for symbol: {0}'.format(symbol)
@@ -398,8 +425,9 @@ class BaseBundle(object):
retries):
# Attempt to load pre-existing symbol data from cache.
key = '{sym}.{freq}.frame'.format(sym=symbol, freq=data_frequency)
try:
raw_data = cache[symbol]
raw_data = cache[key]
except KeyError:
raw_data = None
@@ -418,6 +446,36 @@ class BaseBundle(object):
# proceed to next symbol directly since no API call was required.
return raw_data, should_sleep
# If we arrive here, we must have attempted an API call.
# Setting this flag tells the iterator to pause before starting
# the next asset, that we don't exceed the data source's rate
# limit.
should_sleep = True
raw_data = self._fetch_symbol_frame(
api_key,
symbol,
calendar,
start_session,
end_session,
data_frequency,
retries=retries,
)
# Cache latest symbol data.
cache[key] = raw_data
return raw_data, should_sleep
def _fetch_symbol_frame(self,
api_key,
symbol,
calendar,
start_session,
end_session,
data_frequency,
retries=DEFAULT_RETRIES):
# Data for symbol is old enough to attempt an update or is not
# present in the cache. Fetch raw data for a single symbol
# with requested intervals and frequency. Retry as necessary.
@@ -432,6 +490,7 @@ class BaseBundle(object):
data_frequency,
)
raw_data.index = pd.to_datetime(raw_data.index, utc=True)
raw_data.index = raw_data.index.tz_localize('UTC')
# Filter incoming data to fit start and end sessions.
raw_data = raw_data[
@@ -443,15 +502,7 @@ class BaseBundle(object):
# previous frame is probably an incomplete.
raw_data = raw_data[~raw_data.index.duplicated(keep='last')]
# Cache latest symbol data.
cache[symbol] = raw_data
# If we arrive here, we must have attempted an API call.
# This flag tells the iterator to pause before starting the next
# asset, that we don't exceed the data source's rate limit.
should_sleep = True
return raw_data, should_sleep
return raw_data
except Exception as e:
log.exception(
@@ -468,35 +519,5 @@ class BaseBundle(object):
)
def _write_symbol_for_freq(self,
pricing_iter,
data_frequency,
daily_bar_writer,
minute_bar_writer,
assets,
show_progress=False):
if data_frequency == 'daily':
daily_bar_writer.write(
pricing_iter,
assets=assets,
show_progress=show_progress,
)
elif data_frequency == '5-minute':
# TODO(cfromknecht) replace with five minute writer
minute_bar_writer.write(
pricing_iter,
show_progress=show_progress,
)
elif data_frequency == 'minute':
minute_bar_writer.write(
pricing_iter,
show_progress=show_progress,
)
else:
raise ValueError(
'Unsupported data-frequency: {0}'.format(data_frequency)
)
def _dtypes_to_cols(dtypes):
return [name for name, _ in dtypes]
+7 -8
View File
@@ -38,9 +38,6 @@ class BasePricingBundle(BaseBundle):
]
class BaseCryptoPricingBundle(BasePricingBundle):
def __init__(self):
super(BasePricingBundle, self).__init__()
@lazyval
def calendar_name(self):
return 'OPEN'
@@ -49,6 +46,10 @@ class BaseCryptoPricingBundle(BasePricingBundle):
def minutes_per_day(self):
return 1440
@lazyval
def five_minutes_per_day(self):
return 288
@property
def splits(self):
return []
@@ -58,11 +59,6 @@ class BaseCryptoPricingBundle(BasePricingBundle):
return []
class BaseEquityPricingBundle(BasePricingBundle):
def __init__(self):
super(BasePricingBundle, self).__init__()
self._splits = []
self._dividends = []
@lazyval
def calendar_name(self):
return 'NYSE'
@@ -71,6 +67,9 @@ class BaseEquityPricingBundle(BasePricingBundle):
def minutes_per_day(self):
return 390
@lazyval
def five_minutes_per_day(self):
return 78
@property
def splits(self):
+55 -21
View File
@@ -17,6 +17,10 @@ from ..us_equity_pricing import (
SQLiteAdjustmentReader,
SQLiteAdjustmentWriter,
)
from ..five_minute_bars import (
BcolzFiveMinuteBarReader,
BcolzFiveMinuteBarWriter,
)
from ..minute_bars import (
BcolzMinuteBarReader,
BcolzMinuteBarWriter,
@@ -44,16 +48,21 @@ def asset_db_path(bundle_name, timestr, environ=None, db_version=None):
)
def minute_equity_path(bundle_name, timestr, environ=None):
def minute_path(bundle_name, timestr, environ=None):
return pth.data_path(
minute_equity_relative(bundle_name, timestr, environ),
minute_relative(bundle_name, timestr, environ),
environ=environ,
)
def daily_equity_path(bundle_name, timestr, environ=None):
def five_minute_path(bundle_name, timestr, environ=None):
return pth.data_path(
daily_equity_relative(bundle_name, timestr, environ),
five_minute_relative(bundle_name, timestr, environ),
environ=environ,
)
def daily_path(bundle_name, timestr, environ=None):
return pth.data_path(
daily_relative(bundle_name, timestr, environ),
environ=environ,
)
@@ -80,12 +89,14 @@ def cache_relative(bundle_name, timestr, environ=None):
return bundle_name, '.cache'
def daily_equity_relative(bundle_name, timestr, environ=None):
return bundle_name, timestr, 'daily_equities.bcolz'
def daily_relative(bundle_name, timestr, environ=None):
return bundle_name, timestr, 'daily.bcolz'
def five_minute_relative(bundle_name, timestr, environ=None):
return bundle_name, timestr, 'five_minute.bcolz'
def minute_equity_relative(bundle_name, timestr, environ=None):
return bundle_name, timestr, 'minute_equities.bcolz'
def minute_relative(bundle_name, timestr, environ=None):
return bundle_name, timestr, 'minute.bcolz'
def asset_db_relative(bundle_name, timestr, environ=None, db_version=None):
@@ -195,13 +206,14 @@ RegisteredBundle = namedtuple(
'start_session',
'end_session',
'minutes_per_day',
'five_minutes_per_day',
'ingest',
'create_writers']
)
BundleData = namedtuple(
'BundleData',
'asset_finder equity_minute_bar_reader equity_daily_bar_reader '
'asset_finder minute_bar_reader five_minute_bar_reader daily_bar_reader '
'adjustment_reader',
)
@@ -281,15 +293,17 @@ def _make_bundle_core():
bundles = mappingproxy(_bundles)
def register_bundle(bundle_cls,
asset_filter=None,
start_session=None,
end_session=None,
create_writers=True):
bundle = bundle_cls()
bundle = bundle_cls(asset_filter=asset_filter)
return register(
bundle.name,
bundle.ingest,
calendar_name=bundle.calendar_name,
minutes_per_day=bundle.minutes_per_day,
five_minutes_per_day=bundle.five_minutes_per_day,
start_session=start_session,
end_session=end_session,
create_writers=create_writers,
@@ -298,10 +312,11 @@ def _make_bundle_core():
@curry
def register(name,
f,
calendar_name='NYSE',
calendar_name='OPEN',
start_session=None,
end_session=None,
minutes_per_day=390,
minutes_per_day=1440,
five_minutes_per_day=288,
create_writers=True):
"""Register a data bundle ingest function.
@@ -382,6 +397,7 @@ def _make_bundle_core():
start_session=start_session,
end_session=end_session,
minutes_per_day=minutes_per_day,
five_minutes_per_day=five_minutes_per_day,
ingest=f,
create_writers=create_writers,
)
@@ -413,7 +429,8 @@ def _make_bundle_core():
environ=os.environ,
timestamp=None,
assets_versions=(),
show_progress=False):
show_progress=False,
is_compile=False):
"""Ingest data for a given bundle.
Parameters
@@ -463,7 +480,7 @@ def _make_bundle_core():
pth.data_path([], environ=environ))
)
daily_bars_path = wd.ensure_dir(
*daily_equity_relative(
*daily_relative(
name, timestr, environ=environ,
)
)
@@ -477,10 +494,20 @@ def _make_bundle_core():
# when we create the SQLiteAdjustmentWriter below. The
# SQLiteAdjustmentWriter needs to open the daily ctables so
# that it can compute the adjustment ratios for the dividends.
daily_bar_writer.write(())
five_minute_bar_writer = BcolzFiveMinuteBarWriter(
wd.ensure_dir(*five_minute_relative(
name, timestr, environ=environ)
),
calendar,
start_session,
end_session,
five_minutes_per_day=bundle.five_minutes_per_day,
)
minute_bar_writer = BcolzMinuteBarWriter(
wd.ensure_dir(*minute_equity_relative(
wd.ensure_dir(*minute_relative(
name, timestr, environ=environ)
),
calendar,
@@ -488,6 +515,7 @@ def _make_bundle_core():
end_session,
minutes_per_day=bundle.minutes_per_day,
)
assets_db_path = wd.getpath(*asset_db_relative(
name, timestr, environ=environ,
))
@@ -504,6 +532,7 @@ def _make_bundle_core():
)
else:
daily_bar_writer = None
five_minute_bar_writer = None
minute_bar_writer = None
asset_db_writer = None
adjustment_db_writer = None
@@ -515,6 +544,7 @@ def _make_bundle_core():
environ,
asset_db_writer,
minute_bar_writer,
five_minute_bar_writer,
daily_bar_writer,
adjustment_db_writer,
calendar,
@@ -522,6 +552,7 @@ def _make_bundle_core():
end_session,
cache,
show_progress,
is_compile,
pth.data_path([name, timestr], environ=environ),
)
@@ -597,11 +628,14 @@ def _make_bundle_core():
asset_finder=AssetFinder(
asset_db_path(name, timestr, environ=environ),
),
equity_minute_bar_reader=BcolzMinuteBarReader(
minute_equity_path(name, timestr, environ=environ),
minute_bar_reader=BcolzMinuteBarReader(
minute_path(name, timestr, environ=environ),
),
equity_daily_bar_reader=BcolzDailyBarReader(
daily_equity_path(name, timestr, environ=environ),
five_minute_bar_reader=BcolzFiveMinuteBarReader(
five_minute_path(name, timestr, environ=environ),
),
daily_bar_reader=BcolzDailyBarReader(
daily_path(name, timestr, environ=environ),
),
adjustment_reader=SQLiteAdjustmentReader(
adjustment_db_path(name, timestr, environ=environ),
+4 -4
View File
@@ -36,6 +36,7 @@ class PoloniexBundle(BaseCryptoPricingBundle):
def frequencies(self):
return set((
'daily',
'5-minute',
))
@lazyval
@@ -72,8 +73,8 @@ class PoloniexBundle(BaseCryptoPricingBundle):
return raw
def post_process_symbol_metadata(self, asset_id, sym_md, sym_data):
start_date = sym_data.index[0].tz_localize(None)
end_date = sym_data.index[-1].tz_localize(None)
start_date = sym_data.index[0]
end_date = sym_data.index[-1]
ac_date = end_date + pd.Timedelta(days=1)
return (
@@ -100,7 +101,6 @@ class PoloniexBundle(BaseCryptoPricingBundle):
),
orient='records',
)
raw.set_index('date', inplace=True)
scale = 1000.0
@@ -155,4 +155,4 @@ class PoloniexBundle(BaseCryptoPricingBundle):
query=urlencode(query_params),
)
register_bundle(PoloniexBundle)
register_bundle(PoloniexBundle, ['USDT_BTC'])
+77 -22
View File
@@ -42,6 +42,7 @@ from catalyst.assets.roll_finder import (
)
from catalyst.data.dispatch_bar_reader import (
AssetDispatchMinuteBarReader,
AssetDispatchFiveMinuteBarReader,
AssetDispatchSessionBarReader
)
from catalyst.data.resample import (
@@ -114,12 +115,16 @@ class DataPortal(object):
The calendar instance used to provide minute->session information.
first_trading_day : pd.Timestamp
The first trading day for the simulation.
equity_daily_reader : BcolzDailyBarReader, optional
daily_reader : BcolzDailyBarReader, optional
The daily bar reader for equities. This will be used to service
daily data backtests or daily history calls in a minute backetest.
If a daily bar reader is not provided but a minute bar reader is,
the minutes will be rolled up to serve the daily requests.
equity_minute_reader : BcolzMinuteBarReader, optional
five_minute_reader : BcolzFiveMinuteBarReader, optional
The five minute bar reader for equities. This will be used to service
5-minute data backtests or five-minute history calls. This can be used
to serve daily calls if no daily bar reader is provided.
minute_reader : BcolzMinuteBarReader, optional
The minute bar reader for equities. This will be used to service
minute data backtests or minute history calls. This can be used
to serve daily calls if no daily bar reader is provided.
@@ -144,8 +149,9 @@ class DataPortal(object):
asset_finder,
trading_calendar,
first_trading_day,
equity_daily_reader=None,
equity_minute_reader=None,
daily_reader=None,
five_minute_reader=None,
minute_reader=None,
future_daily_reader=None,
future_minute_reader=None,
adjustment_reader=None,
@@ -180,7 +186,7 @@ class DataPortal(object):
# Infer the last session from the provided readers.
last_sessions = [
reader.last_available_dt
for reader in [equity_daily_reader, future_daily_reader]
for reader in [daily_reader, future_daily_reader]
if reader is not None
]
if last_sessions:
@@ -194,7 +200,11 @@ class DataPortal(object):
# Infer the last minute from the provided readers.
last_minutes = [
reader.last_available_dt
for reader in [equity_minute_reader, future_minute_reader]
for reader in [
minute_reader,
five_minute_reader,
future_minute_reader,
]
if reader is not None
]
if last_minutes:
@@ -202,10 +212,12 @@ class DataPortal(object):
else:
self._last_available_minute = None
aligned_equity_minute_reader = self._ensure_reader_aligned(
equity_minute_reader)
aligned_equity_session_reader = self._ensure_reader_aligned(
equity_daily_reader)
aligned_minute_reader = self._ensure_reader_aligned(
minute_reader)
aligned_five_minute_reader = self._ensure_reader_aligned(
five_minute_reader)
aligned_session_reader = self._ensure_reader_aligned(
daily_reader)
aligned_future_minute_reader = self._ensure_reader_aligned(
future_minute_reader)
aligned_future_session_reader = self._ensure_reader_aligned(
@@ -217,12 +229,15 @@ class DataPortal(object):
}
aligned_minute_readers = {}
aligned_five_minute_readers = {}
aligned_session_readers = {}
if aligned_equity_minute_reader is not None:
aligned_minute_readers[Equity] = aligned_equity_minute_reader
if aligned_equity_session_reader is not None:
aligned_session_readers[Equity] = aligned_equity_session_reader
if aligned_minute_reader is not None:
aligned_minute_readers[Equity] = aligned_minute_reader
if aligned_five_minute_reader is not None:
aligned_five_minute_readers[Equity] = aligned_five_minute_reader
if aligned_session_reader is not None:
aligned_session_readers[Equity] = aligned_session_reader
if aligned_future_minute_reader is not None:
aligned_minute_readers[Future] = aligned_future_minute_reader
@@ -252,6 +267,13 @@ class DataPortal(object):
self._last_available_minute,
)
_dispatch_five_minute_reader = AssetDispatchFiveMinuteBarReader(
self.trading_calendar,
self.asset_finder,
aligned_five_minute_readers,
self._last_available_minute,
)
_dispatch_session_reader = AssetDispatchSessionBarReader(
self.trading_calendar,
self.asset_finder,
@@ -261,6 +283,7 @@ class DataPortal(object):
self._pricing_readers = {
'minute': _dispatch_minute_reader,
'5-minute': _dispatch_five_minute_reader,
'daily': _dispatch_session_reader,
}
@@ -514,15 +537,17 @@ class DataPortal(object):
)
else:
if field == "last_traded":
return self.get_last_traded_dt(asset, dt, 'minute')
return self.get_last_traded_dt(asset, dt, data_frequency)
elif field == "price":
return self._get_minute_spot_value(
asset, "close", dt, ffill=True,
return self._get_minutely_spot_value(
asset, "close", dt, data_frequency, ffill=True,
)
elif field == "contract":
return self._get_current_contract(asset, dt)
else:
return self._get_minute_spot_value(asset, field, dt)
return self._get_minutely_spot_value(
asset, field, dt, data_frequency,
)
if assets_is_scalar:
return get_single_asset_value(assets)
@@ -648,8 +673,14 @@ class DataPortal(object):
return spot_value
def _get_minute_spot_value(self, asset, column, dt, ffill=False):
reader = self._get_pricing_reader('minute')
def _get_minutely_spot_value(self,
asset,
column,
dt,
data_frequency,
ffill=False):
reader = self._get_pricing_reader(data_frequency)
if ffill:
# If forward filling, we want the last minute with values (up to
@@ -680,8 +711,32 @@ class DataPortal(object):
# the value we found came from a different day, so we have to adjust
# the data if there are any adjustments on that day barrier
return self.get_adjusted_value(
asset, column, query_dt,
dt, "minute", spot_value=result
asset,
column,
query_dt,
dt,
data_frequency,
spot_value=result
)
def _get_five_minute_spot_value(self, asset, column, dt, ffill=False):
return self._get_minutely_spot_value(
asset,
column,
dt,
ffill,
'5-minute',
)
def _get_minute_spot_value(self, asset, column, dt, ffill=False):
return self._get_minutely_spot_value(
asset,
column,
dt,
ffill,
'minute',
)
def _get_daily_spot_value(self, asset, column, dt):
+5 -1
View File
@@ -130,13 +130,17 @@ class AssetDispatchBarReader(with_metaclass(ABCMeta)):
return results
class AssetDispatchMinuteBarReader(AssetDispatchBarReader):
def _dt_window_size(self, start_dt, end_dt):
return len(self.trading_calendar.minutes_in_range(start_dt, end_dt))
class AssetDispatchFiveMinuteBarReader(AssetDispatchBarReader):
def _dt_window_size(self, start_dt, end_dt):
return len(self.trading_calendar.five_minutes_in_range(start_dt, end_dt))
class AssetDispatchSessionBarReader(AssetDispatchBarReader):
def _dt_window_size(self, start_dt, end_dt):
+53 -42
View File
@@ -35,9 +35,9 @@ from six import with_metaclass
from toolz import keymap, valmap
from catalyst.data._minute_bar_internal import (
minute_value,
find_position_of_minute,
find_last_traded_position_internal
five_minute_value,
find_position_of_five_minute,
find_last_traded_five_minute_position_internal,
)
from catalyst.gens.sim_engine import NANOS_IN_MINUTE
@@ -48,15 +48,16 @@ from catalyst.data.us_equity_pricing import (
check_uint64_safe,
)
from catalyst.utils.calendars import get_calendar
from catalyst.utils.cli import maybe_show_progress
from catalyst.utils.cli import (
item_show_count,
maybe_show_progress,
)
from catalyst.utils.memoize import lazyval
logger = logbook.Logger('FiveMinuteBars')
OPEN_FIVE_MINUTES_PER_DAY = 288
US_EQUITIES_MINUTES_PER_DAY = 390
DEFAULT_EXPECTEDLEN = US_EQUITIES_MINUTES_PER_DAY * 252 * 15
DEFAULT_EXPECTEDLEN_CRYPTO = OPEN_FIVE_MINUTES_PER_DAY * 366 * 15
OHLC_RATIO = 1000000
@@ -66,6 +67,8 @@ OHLCV = frozenset(['open', 'high', 'low', 'close', 'volume'])
UINT64_MAX = iinfo(uint64).max
NANOS_IN_FIVE_MINUTES = 5 * NANOS_IN_MINUTE
class BcolzFiveMinuteOverlappingData(Exception):
pass
@@ -83,7 +86,7 @@ class FiveMinuteBarReader(BarReader):
def _calc_five_minute_index(market_opens, five_minutes_per_day):
five_minutes = np.zeros(len(market_opens) * five_minutes_per_day,
dtype='datetime64[ns]')
deltas = np.arange(0, five_minutes_per_day, dtype='timedelta64[m]')
deltas = 5 * np.arange(0, five_minutes_per_day, dtype='timedelta64[m]')
for i, market_open in enumerate(market_opens):
start = market_open.asm8
five_minute_values = start + deltas
@@ -211,7 +214,7 @@ class BcolzFiveMinuteBarMetadata(object):
"""
FORMAT_VERSION = 3
METADATA_FILENAME = 'metadata.json'
METADATA_FILENAME = 'five-minute-metadata.json'
@classmethod
def metadata_path(cls, rootdir):
@@ -268,7 +271,7 @@ class BcolzFiveMinuteBarMetadata(object):
calendar,
start_session,
end_session,
minutes_per_day,
five_minutes_per_day,
version=version,
)
@@ -345,7 +348,7 @@ class BcolzFiveMinuteBarMetadata(object):
'version': self.version,
'ohlc_ratio': self.default_ohlc_ratio,
'ohlc_ratios_per_sid': self.ohlc_ratios_per_sid,
'minutes_per_day': self.five_minutes_per_day,
'five_minutes_per_day': self.five_minutes_per_day,
'calendar_name': self.calendar.name,
'start_session': str(self.start_session.date()),
'end_session': str(self.end_session.date()),
@@ -461,7 +464,7 @@ class BcolzFiveMinuteBarWriter(object):
five_minutes_per_day,
default_ohlc_ratio=OHLC_RATIO,
ohlc_ratios_per_sid=None,
expectedlen=DEFAULT_EXPECTED_CRYPTO_LEN,
expectedlen=DEFAULT_EXPECTEDLEN_CRYPTO,
write_metadata=True):
self._rootdir = rootdir
@@ -477,11 +480,11 @@ class BcolzFiveMinuteBarWriter(object):
self._default_ohlc_ratio = default_ohlc_ratio
self._ohlc_ratios_per_sid = ohlc_ratios_per_sid
self._minute_index = _calc_minute_index(
self._schedule.market_open, self._minutes_per_day)
self._five_minute_index = _calc_five_minute_index(
self._schedule.market_open, self._five_minutes_per_day)
if write_metadata:
metadata = BcolzMinuteBarMetadata(
metadata = BcolzFiveMinuteBarMetadata(
self._default_ohlc_ratio,
self._ohlc_ratios_per_sid,
self._calendar,
@@ -678,7 +681,11 @@ class BcolzFiveMinuteBarWriter(object):
for k, v in kwargs.items():
table.attrs[k] = v
def write(self, data, show_progress=False, invalid_data_behavior='warn'):
def write(self,
data,
length=None,
show_progress=False,
invalid_data_behavior='warn'):
"""Write a stream of minute data.
Parameters
@@ -698,14 +705,15 @@ class BcolzFiveMinuteBarWriter(object):
show_progress : bool, optional
Whether or not to show a progress bar while writing.
"""
ctx = maybe_show_progress(
with maybe_show_progress(
data,
length=length,
show_percent=False,
show_progress=show_progress,
item_show_func=lambda e: e if e is None else str(e[0]),
label="Merging minute equity files:",
)
write_sid = self.write_sid
with ctx as it:
item_show_func=item_show_count(length),
label='Compiling five-minute data',
) as it:
write_sid = self.write_sid
for e in it:
write_sid(*e, invalid_data_behavior=invalid_data_behavior)
@@ -807,10 +815,13 @@ class BcolzFiveMinuteBarWriter(object):
# Get the number of minutes already recorded in this sid's ctable
num_rec_mins = table.size
all_minutes = self._minute_index
all_minutes = self._five_minute_index
# Get the latest minute we wish to write to the ctable
last_minute_to_write = pd.Timestamp(dts[-1], tz='UTC')
#print 'all_minutes[-1]:', all_minutes[num_rec_mins-1]
#print 'last_minute_to_write:', last_minute_to_write
# In the event that we've already written some minutely data to the
# ctable, guard against overwriting that data.
if num_rec_mins > 0:
@@ -864,7 +875,7 @@ class BcolzFiveMinuteBarWriter(object):
day_ix = self._session_labels.get_loc(day)
# Add one to the 0-indexed day_ix to get the number of days.
num_days = day_ix + 1
return num_days * self._minutes_per_day
return num_days * self._five_minutes_per_day
def truncate(self, date):
"""Truncate data beyond this date in all ctables."""
@@ -1002,7 +1013,7 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader):
market_closes = self._market_closes.values.astype('datetime64[m]')
minutes_per_day = (market_closes - market_opens).astype(np.int64) / 5
early_indices = np.where(
minutes_per_day != self._minutes_per_day - 1)[0]
minutes_per_day != self._five_minutes_per_day - 1)[0]
early_opens = self._market_opens[early_indices]
early_closes = self._market_closes[early_indices]
minutes = [(market_open, early_close)
@@ -1030,9 +1041,9 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader):
"""
itree = IntervalTree()
for market_open, early_close in self._minutes_to_exclude():
start_pos = self._find_position_of_minute(early_close) + 1
start_pos = self._find_position_of_five_minute(early_close) + 1
end_pos = (
self._find_position_of_minute(market_open)
self._find_position_of_five_minute(market_open)
+
self._five_minutes_per_day
-
@@ -1121,7 +1132,7 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader):
minute_pos = self._last_get_value_dt_position
else:
try:
minute_pos = self._find_position_of_minute(dt)
minute_pos = self._find_position_of_five_minute(dt)
except ValueError:
raise NoDataOnDate()
@@ -1143,15 +1154,15 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader):
return value
def get_last_traded_dt(self, asset, dt):
minute_pos = self._find_last_traded_position(asset, dt)
minute_pos = self._find_last_traded_five_minute_position(asset, dt)
if minute_pos == -1:
return pd.NaT
return self._pos_to_minute(minute_pos)
def _find_last_traded_position(self, asset, dt):
def _find_last_traded_five_minute_position(self, asset, dt):
volumes = self._open_minute_file('volume', asset)
start_date_minute = asset.start_date.value / NANOS_IN_MINUTE
dt_minute = dt.value / NANOS_IN_MINUTE
start_date_minute = asset.start_date.value / NANOS_IN_FIVE_MINUTE
dt_minute = dt.value / NANOS_IN_FIVE_MINUTE
try:
# if we know of a dt before which this asset has no volume,
@@ -1163,13 +1174,13 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader):
if dt_minute < earliest_dt_to_search:
return -1
pos = find_last_traded_position_internal(
pos = find_last_traded_five_minute_position_internal(
self._market_open_values,
self._market_close_values,
dt_minute,
earliest_dt_to_search,
volumes,
self._minutes_per_day,
self._five_minutes_per_day,
)
if pos == -1:
@@ -1186,15 +1197,15 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader):
return pos
def _pos_to_minute(self, pos):
minute_epoch = minute_value(
minute_epoch = five_minute_value(
self._market_open_values,
pos,
self._minutes_per_day
self._five_minutes_per_day
)
return pd.Timestamp(minute_epoch, tz='UTC', unit="m")
def _find_position_of_minute(self, minute_dt):
def _find_position_of_five_minute(self, minute_dt):
"""
Internal method that returns the position of the given minute in the
list of every trading minute since market open of the first trading
@@ -1213,11 +1224,11 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader):
int: The position of the given minute in the list of all trading
minutes since market open on the first trading day.
"""
return find_position_of_minute(
return find_position_of_five_minute(
self._market_open_values,
self._market_close_values,
minute_dt.value / NANOS_IN_MINUTE,
self._minutes_per_day,
minute_dt.value / NANOS_IN_FIVE_MINUTE,
self._five_minutes_per_day,
False,
)
@@ -1241,8 +1252,8 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader):
(minutes in range, sids) with a dtype of float64, containing the
values for the respective field over start and end dt range.
"""
start_idx = self._find_position_of_minute(start_dt)
end_idx = self._find_position_of_minute(end_dt)
start_idx = self._find_position_of_five_minute(start_dt)
end_idx = self._find_position_of_five_minute(end_dt)
num_minutes = (end_idx - start_idx + 1)
@@ -1261,7 +1272,7 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader):
if field != 'volume':
out = np.full(shape, np.nan)
else:
out = np.zeros(shape, dtype=int64)
out = np.zeros(shape, dtype=uint64)
for i, sid in enumerate(sids):
carray = self._open_minute_file(field, sid)
+21 -87
View File
@@ -33,7 +33,7 @@ from ..utils.paths import (
)
from ..utils.deprecate import deprecated
from catalyst.curate.poloniex import PoloniexCurator
from catalyst.data.bundles.poloniex import PoloniexBundle
from catalyst.utils.calendars import get_calendar
@@ -232,11 +232,14 @@ def load_market_data(trading_day=None, trading_days=None, bm_symbol='SPY',
treasury_curves = tc[tc.index.slice_indexer(first_date, last_date)]
return benchmark_returns, treasury_curves
def ensure_crypto_benchmark_data(symbol, first_date, last_date, now,
trading_day, environ=None):
def ensure_crypto_benchmark_data(symbol,
first_date,
last_date,
now,
trading_day,
environ=None):
filename = get_benchmark_filename(symbol)
source_filename = '/var/tmp/catalyst/data/poloniex/crypto_prices-{0}.csv'.\
format(symbol)
logger.info(
('Loading benchmark data for {symbol!r} '
@@ -269,92 +272,23 @@ def ensure_crypto_benchmark_data(symbol, first_date, last_date, now,
last_date=last_date
)
def dateparse(time_in_secs):
return datetime.datetime.fromtimestamp(float(time_in_secs), pytz.utc)
def compute_daily_bars(five_min_bars, schedule):
# filter and copy the entry at the beginning of each session
daily_bars = five_min_bars[
five_min_bars.index.isin(schedule)
].copy()
day_offset = pd.Timedelta(days=1)
# iterate through session starts doing:
# 1. filter five_min_bars to get all entries in one day
# 2. compute daily bar entry
# 3. record in rid-th row of daily_bars
for rid, start_date in enumerate(daily_bars.index):
# compute beginning of next session
end_date = start_date + day_offset
# filter for entries session entries
day_data = five_min_bars[
(five_min_bars.index >= start_date) &
(five_min_bars.index < end_date)
]
# compute and record daily bar
daily_bars.iloc[rid] = (
day_data.open.iloc[0], # first open price
day_data.high.max(), # max of high prices
day_data.low.min(), # min of low prices
day_data.close.iloc[-1], # last close prices
day_data.volume.sum(), # sum of all volumes
)
# scale to allow trading 10-ths of a coin
scale = 10.0
daily_bars.loc[:, 'open'] /= scale
daily_bars.loc[:, 'high'] /= scale
daily_bars.loc[:, 'low'] /= scale
daily_bars.loc[:, 'close'] /= scale
daily_bars.loc[:, 'volume'] *= scale
return daily_bars
five_min_bars = None
# Load benchmark symbol from Poloniex API
try:
# load five minute bars from csv cache
five_min_bars = pd.read_csv(
source_filename,
names=['date', 'open', 'high', 'low', 'close', 'volume'],
index_col=[0],
parse_dates=True,
date_parser=dateparse,
bundle = PoloniexBundle()
bench_raw = bundle._fetch_symbol_frame(
None,
symbol,
get_calendar(bundle.calendar_name),
first_date,
last_date,
'daily',
)
five_min_bars.index = pd.to_datetime(five_min_bars.index, utc=True, unit='s')
except (OSError, IOError):
# Otherwise load from Poloniex API
try:
pc = PoloniexCurator()
pc.append_data_single_pair(symbol)
five_min_bars = pc.to_dataframe(
time.mktime(first_date.timetuple()),
time.mktime(last_date.timetuple()),
currencyPair=symbol,
)
except (OSError, IOError, HTTPError):
logger.exception('Failed to new crypto benchmark returns')
raise
# compute daily bars for open calendar
open_calendar = get_calendar('OPEN')
daily_bars = compute_daily_bars(
five_min_bars,
open_calendar.all_sessions,
)
# filter daily bars to include first_date and last_date
daily_bars = daily_bars[
(daily_bars.index >= (first_date - trading_day)) &
(daily_bars.index <= last_date)
]
except (OSError, IOError, HTTPError):
logger.exception('Failed to fetch new crypto benchmark returns')
raise
# select close column and compute percent change between days
daily_close = daily_bars[['close']]
daily_close = bench_raw[['close']]
daily_close = daily_close.pct_change(1).iloc[1:]
try:
+1 -1
View File
@@ -229,7 +229,7 @@ class BcolzDailyBarWriter(object):
@property
def progress_bar_message(self):
return 'Compiling daily pricing data'
return 'Compiling daily data'
def write(self,
data,
+7 -1
View File
@@ -25,7 +25,7 @@ from catalyst.api import (
def initialize(context):
context.ASSET_NAME = 'USDT_ETH'
context.ASSET_NAME = 'USDT_BTC'
context.TARGET_HODL_RATIO = 0.8
context.RESERVE_RATIO = 1.0 - context.TARGET_HODL_RATIO
@@ -37,7 +37,13 @@ def initialize(context):
context.is_buying = True
context.asset = symbol(context.ASSET_NAME)
context.i = 0
def handle_data(context, data):
context.i += 1
print 'i:', context.i
starting_cash = context.portfolio.starting_cash
target_hodl_value = context.TARGET_HODL_RATIO * starting_cash
reserve_value = context.RESERVE_RATIO * starting_cash
+1 -1
View File
@@ -52,7 +52,7 @@ def initialize(context):
schedule_function(
rebalance,
date_rules.every_day(),
time_rules=times_rules.every_minute(),
)
+15
View File
@@ -111,6 +111,21 @@ class PerformanceTracker(object):
self.treasury_curves,
self.trading_calendar
)
elif self.emission_rate == '5-minute':
self.all_benchmark_returns = pd.Series(
index=pd.date_range(
self.sim_params.first_open,
self.sim_params.last_close,
freq='5min'
),
)
self.cumulative_risk_metrics = \
risk.RiskMetricsCumulative(
self.sim_params,
self.treasury_curves,
self.trading_calendar,
create_first_day_stats=True,
)
elif self.emission_rate == 'minute':
self.all_benchmark_returns = pd.Series(index=pd.date_range(
self.sim_params.first_open, self.sim_params.last_close,
+1 -1
View File
@@ -131,7 +131,7 @@ cdef class FiveMinuteSimulationClock(MinuteSimulationClock):
for session_idx, session_nano in enumerate(self.sessions_nanos):
five_minutes_nanos = np.arange(
self.market_opens_nanos[session_idx],
self.market_closes_nanos[session_idx] + _nanos_in_five_minutes,
self.market_closes_nanos[session_idx],
_nanos_in_five_minutes
)
five_minutes_by_session[session_nano] = pd.to_datetime(
@@ -33,13 +33,30 @@ class CryptoPricingLoader(PipelineLoader):
Delegates loading of baselines and adjustments.
"""
def __init__(self, raw_price_loader, dataset):
self.raw_price_loader = raw_price_loader
self._columns = dataset.columns
def __init__(self, bundle, data_frequency, dataset):
cal = get_calendar('OPEN')
self._all_sessions = cal.all_sessions
if data_frequency == 'daily':
reader = bundle.daily_bar_reader
all_sessions = cal.all_sessions
elif data_frequency == '5-minute':
reader = bundle.five_minute_bar_reader
all_sessions = cal.all_five_minutes
elif daily_bar_reader == 'minute':
reader = bundle.minute_bar_reader
all_sessions = cal.all_minutes
else:
raise ValueError(
'Invalid data frequency: {}'.format(data_frequency)
)
self.raw_price_loader = reader
self._columns = dataset.columns
self._all_sessions = all_sessions
@classmethod
def from_files(cls, pricing_path):
@@ -89,6 +106,13 @@ class CryptoPricingLoader(PipelineLoader):
def _shift_dates(dates, start_date, end_date, shift):
print 'dates.head:\n', dates[:10]
print 'dates.tail:\n', dates[:-10]
print 'start_date:', start_date
print 'end_date:', end_date
print 'shift:', shift
try:
start = dates.get_loc(start_date)
except KeyError:
@@ -36,15 +36,34 @@ class USEquityPricingLoader(PipelineLoader):
Delegates loading of baselines and adjustments.
"""
def __init__(self, raw_price_loader, adjustments_loader, dataset):
self.raw_price_loader = raw_price_loader
self.adjustments_loader = adjustments_loader
def __init__(self, bundle, data_frequency, dataset):
if data_frequency == 'daily':
reader = bundle.daily_bar_reader
elif data_frequency == '5-minute':
reader = bundle.five_minute_bar_reader
elif daily_bar_reader == 'minute':
reader = bundle.minute_bar_reader
else:
raise ValueError(
'Invalid data frequency: {}'.format(data_frequency)
)
cal = reader.trading_calendar or get_calendar('NYSE')
if data_frequency == 'daily':
all_sessions = cal.all_sessions
elif data_frequency == '5-minute':
reader = bundle.five_minute_bar_reader
all_sessions = cal.all_five_minutes
elif daily_bar_reader == 'minute':
reader = bundle.minute_bar_reader
all_sessions = cal.all_minutes
self.raw_price_loader = reader
self.adjustments_loader = bundle.adjustments_loader
self._columns = dataset.columns
cal = self.raw_price_loader.trading_calendar or \
get_calendar("NYSE")
self._all_sessions = cal.all_sessions
self._all_sessions = all_sessions
@classmethod
def from_files(cls, pricing_path, adjustments_path):
+34
View File
@@ -51,7 +51,10 @@ class BenchmarkSource(object):
elif benchmark_returns is not None:
daily_series = benchmark_returns[sessions[0]:sessions[-1]]
print 'BENCHMARK_RETURNS'
if self.emission_rate == "minute":
print 'BENCHMARK_RETURNS minute'
# we need to take the env's benchmark returns, which are daily,
# and resample them to minute
minutes = trading_calendar.minutes_for_sessions_in_range(
@@ -65,7 +68,22 @@ class BenchmarkSource(object):
)
self._precalculated_series = minute_series
elif self.emission_rate == '5-minute':
print 'BENCHMARK_RETURNS 5-minute'
five_minutes = \
trading_calendar.five_minutes_for_sessions_in_range(
sessions[0],
sessions[-1],
)
five_minute_series = daily_series.reindex(
index=five_minutes,
method='ffill',
)
self._precalculated_series = five_minute_series
else:
print 'BENCHMARK_RETURNS daily'
self._precalculated_series = daily_series
else:
raise Exception("Must provide either benchmark_asset or "
@@ -155,8 +173,24 @@ class BenchmarkSource(object):
ffill=True
)[asset]
return benchmark_series.pct_change()[1:]
elif self.emission_rate == '5-minute':
five_minutes = trading_calendar.five_minutes_for_sessions_in_range(
self.sessions[0], self.sessions[-1]
)
benchmark_series = data_portal.get_history_window(
[asset],
five_minutes[-1],
bar_count=len(five_minutes) + 1,
frequency='5m',
field='price',
data_frequency=self.emission_rate,
ffill=True,
)[asset]
return benchmark_series.pct_change()[1:]
else:
print '----------------------------------------'
start_date = asset.start_date
if start_date < trading_days[0]:
# get the window of close prices for benchmark_asset from the
+67 -4
View File
@@ -117,6 +117,9 @@ class TradingCalendar(with_metaclass(ABCMeta)):
self._trading_minutes_nanos = self.all_minutes.values.\
astype(np.int64)
self._trading_five_minutes_nanos = self.all_five_minutes.values.\
astype(np.int64)
self.first_trading_session = _all_days[0]
self.last_trading_session = _all_days[-1]
@@ -179,6 +182,18 @@ class TradingCalendar(with_metaclass(ABCMeta)):
"""
return int(self._minutes_per_session[start_session:end_session].sum())
@lazyval
def _five_minutes_per_session(self):
diff = self.schedule.market_close - self.schedule.market_open
diff = diff.astype('timedelta64[m]')
return (diff + 1) // 5
def five_minutes_count_for_sessions_in_range(self,
start_session,
end_session):
five_mins = self._five_minutes_per_session[start_session:end_session]
return int(five_mins.sum())
@property
def regular_holidays(self):
"""
@@ -371,6 +386,10 @@ class TradingCalendar(with_metaclass(ABCMeta)):
idx = next_divider_idx(self._trading_minutes_nanos, dt.value)
return self.all_minutes[idx]
def next_five_minute(self, dt):
idx = next_divider_idx(self._trading_five_minutes_nanos, dt.values)
return self.all_five_mintutes[idx]
def previous_minute(self, dt):
"""
Given a dt, return the previous exchange minute.
@@ -465,6 +484,12 @@ class TradingCalendar(with_metaclass(ABCMeta)):
end_minute=self.schedule.at[session_label, 'market_close'],
)
def five_minutes_for_session(self, session_label):
return self.five_minutes_in_range(
start_five_minute=self.schedule.at[session_label, 'market_open'],
end_five_minute=self.schedule.at[session_label, 'market_close'],
)
def minutes_window(self, start_dt, count):
start_dt_nanos = start_dt.value
all_minutes_nanos = self._trading_minutes_nanos
@@ -566,6 +591,20 @@ class TradingCalendar(with_metaclass(ABCMeta)):
return abs(end_idx - start_idx)
def five_minutes_in_range(self, start_five_minute, end_five_minute):
start_idx = searchsorted(self._trading_five_minutes_nanos,
start_five_minute.value)
end_idx = searchsorted(self._trading_five_minutes_nanos,
end_five_minute.value)
if end_five_minute.value == self._trading_five_minutes_nanos[end_idx]:
# if the end minute is a market minute, increase by 1
end_idx += 1
return self.all_five_minutes[start_idx:end_idx]
def minutes_in_range(self, start_minute, end_minute):
"""
Given start and end minutes, return all the calendar minutes
@@ -623,6 +662,15 @@ class TradingCalendar(with_metaclass(ABCMeta)):
return self.minutes_in_range(first_minute, last_minute)
def five_minutes_for_sessions_in_range(self,
start_session_label,
end_session_label):
first_minute, _ = self.open_and_close_for_session(start_session_label)
_, last_minute = self.open_and_close_for_session(end_session_label)
return self.five_minutes_in_range(first_minute, last_minute)
def open_and_close_for_session(self, session_label):
"""
Returns a tuple of timestamps of the open and close of the session
@@ -690,8 +738,7 @@ class TradingCalendar(with_metaclass(ABCMeta)):
def execution_time_from_close(self, close_dates):
return close_dates
@lazyval
def all_minutes(self):
def _all_minutes_with_interval(self, interval):
"""
Returns a DatetimeIndex representing all the minutes in this calendar.
"""
@@ -703,8 +750,10 @@ class TradingCalendar(with_metaclass(ABCMeta)):
deltas = closes_in_ns - opens_in_ns
nanos_in_interval = interval * NANOS_IN_MINUTE
# + 1 because we want 390 days per standard day, not 389
daily_sizes = (deltas / NANOS_IN_MINUTE) + 1
daily_sizes = (deltas / nanos_in_interval) + 1
num_minutes = np.sum(daily_sizes).astype(np.int64)
# One allocation for the entire thing. This assumes that each day
@@ -721,13 +770,27 @@ class TradingCalendar(with_metaclass(ABCMeta)):
np.arange(
opens_in_ns[day_idx],
closes_in_ns[day_idx] + NANOS_IN_MINUTE,
NANOS_IN_MINUTE
nanos_in_interval
)
idx += size_int
return DatetimeIndex(all_minutes).tz_localize("UTC")
@lazyval
def all_five_minutes(self):
"""
Returns a DatetimeIndex representing all the five minutes in this calendar.
"""
return self._all_minutes_with_interval(5)
@lazyval
def all_minutes(self):
"""
Returns a DatetimeIndex representing all the minutes in this calendar.
"""
return self._all_minutes_with_interval(1)
@preprocess(dt=coerce(pd.Timestamp, attrgetter('value')))
def minute_to_session_label(self, dt, direction="next"):
"""
+1
View File
@@ -17,6 +17,7 @@ def item_show_count(total=None):
def item_show_func(item, _it=iter(count())):
if item is not None:
starting = False
return maybe_show_total(next(_it))
return 'DONE'
+10 -8
View File
@@ -156,15 +156,15 @@ def _run(handle_data,
environ=environ,
)
first_trading_day =\
bundle_data.equity_minute_bar_reader.first_trading_day
first_trading_day = bundle_data.minute_bar_reader.first_trading_day
data = DataPortal(
env.asset_finder,
open_calendar,
first_trading_day=first_trading_day,
equity_minute_reader=bundle_data.equity_minute_bar_reader,
equity_daily_reader=bundle_data.equity_daily_bar_reader,
minute_reader=bundle_data.minute_bar_reader,
five_minute_reader=bundle_data.five_minute_bar_reader,
daily_reader=bundle_data.daily_bar_reader,
adjustment_reader=bundle_data.adjustment_reader,
)
@@ -179,13 +179,14 @@ def _run(handle_data,
if b == 'poloniex':
return CryptoPricingLoader(
bundle_data.equity_daily_bar_reader,
bundle_data,
data_frequency,
CryptoPricing,
)
elif b == 'quantopian-quandl':
elif b == 'quandl':
return USEquityPricingLoader(
bundle_data.equity_daily_bar_reader,
bundle_data.adjustment_reader,
bundle_data,
data_frequency,
USEquityPricing,
)
raise ValueError(
@@ -216,6 +217,7 @@ def _run(handle_data,
end=end,
capital_base=capital_base,
data_frequency=data_frequency,
emission_rate=data_frequency,
),
**{
'initialize': initialize,