Merge branch 'rc-0.1.dev7'

This commit is contained in:
Victor Grau Serrat
2017-08-14 14:57:29 -04:00
30 changed files with 1650 additions and 630 deletions
+2 -2
View File
@@ -1,4 +1,4 @@
Dear Zipline Maintainers,
Dear Catalyst Maintainers,
Before I tell you about my issue, let me describe my environment:
@@ -7,7 +7,7 @@ Before I tell you about my issue, let me describe my environment:
* Operating System: (Windows Version or `$ uname --all`)
* Python Version: `$ python --version`
* Python Bitness: `$ python -c 'import math, sys;print(int(math.log(sys.maxsize + 1, 2) + 1))'`
* How did you install Zipline: (`pip`, `conda`, or `other (please explain)`)
* How did you install Catalyst: (`pip`, `conda`, or `other (please explain)`)
* Python packages: `$ pip freeze` or `$ conda list`
Now that you know a little about me, let me tell you about the issue I am
+1 -1
View File
@@ -1 +1 @@
All the documentation for `Catalyst <https://github.com/enigmampc/catalyst>`_ can be found in the `catalyst-docs wiki <https://github.com/enigmampc/catalyst-docs/wiki>`_.
All the documentation for `Catalyst <https://github.com/enigmampc/catalyst>`_ can be found in the `catalyst-docs wiki <https://github.com/enigmampc/catalyst-docs/wiki>`_.
+10 -2
View File
@@ -123,7 +123,7 @@ def ipython_only(option):
)
@click.option(
'--data-frequency',
type=click.Choice({'daily', 'minute'}),
type=click.Choice({'daily', '5-minute', 'minute'}),
default='daily',
show_default=True,
help='The data frequency of the simulation.',
@@ -290,6 +290,13 @@ def catalyst_magic(line, cell=None):
show_default=True,
help='The data bundle to ingest.',
)
@click.option(
'-c',
'--compile-locally',
is_flag=True,
default=False,
help='Download dataset from source and compile bundle locally.',
)
@click.option(
'--assets-version',
type=int,
@@ -301,7 +308,7 @@ def catalyst_magic(line, cell=None):
default=True,
help='Print progress information to the terminal.'
)
def ingest(bundle, assets_version, show_progress):
def ingest(bundle, compile_locally, assets_version, show_progress):
"""Ingest the data for the given bundle.
"""
bundles_module.ingest(
@@ -310,6 +317,7 @@ def ingest(bundle, assets_version, show_progress):
pd.Timestamp.utcnow(),
assets_version,
show_progress,
compile_locally,
)
+59 -19
View File
@@ -133,7 +133,10 @@ from catalyst.utils.security_list import SecurityList
import catalyst.protocol
from catalyst.sources.requests_csv import PandasRequestsCSV
from catalyst.gens.sim_engine import MinuteSimulationClock
from catalyst.gens.sim_engine import (
MinuteSimulationClock,
FiveMinuteSimulationClock,
)
from catalyst.sources.benchmark_source import BenchmarkSource
from catalyst.catalyst_warnings import ZiplineDeprecationWarning
@@ -170,7 +173,7 @@ class TradingAlgorithm(object):
algo_filename : str, optional
The filename for the algoscript. This will be used in exception
tracebacks. default: '<string>'.
data_frequency : {'daily', 'minute'}, optional
data_frequency : {'daily', '5-minute', 'minute'}, optional
The duration of the bars.
instant_fill : bool, optional
Whether to fill orders immediately or on next bar. default: False
@@ -223,7 +226,7 @@ class TradingAlgorithm(object):
script : str
Algoscript that contains initialize and
handle_data function definition.
data_frequency : {'daily', 'minute'}
data_frequency : {'daily', '5-minute', 'minute'}
The duration of the bars.
capital_base : float <default: 1.0e5>
How much capital to start with.
@@ -305,7 +308,10 @@ class TradingAlgorithm(object):
self.asset_finder = self.trading_environment.asset_finder
# Initialize Pipeline API data.
self.init_engine(kwargs.pop('get_pipeline_loader', None))
self.init_engine(
kwargs.pop('get_pipeline_loader', None),
self.sim_params.data_frequency,
)
self._pipelines = {}
# Create an always-expired cache so that we compute the first time data
# is requested.
@@ -419,16 +425,28 @@ class TradingAlgorithm(object):
self.restrictions = NoRestrictions()
def init_engine(self, get_loader):
def init_engine(self, get_loader, data_frequency):
"""
Construct and store a PipelineEngine from loader.
If get_loader is None, constructs an ExplodingPipelineEngine
"""
if get_loader is not None:
if data_frequency == 'daily':
all_dates = self.trading_calendar.all_sessions
elif data_frequency == '5-minute':
all_dates = self.trading_calendar.all_five_minutes
elif data_frequency == 'minute':
all_dates = self.trading_calendar.all_minutes
else:
raise ValueError(
'Cannot initialize engine with '
'data frequency: {}'.format(data_frequency)
)
self.engine = SimplePipelineEngine(
get_loader,
self.trading_calendar.all_sessions,
all_dates,
self.asset_finder,
)
else:
@@ -449,7 +467,7 @@ class TradingAlgorithm(object):
self._in_before_trading_start = True
with handle_non_market_minutes(data) if \
self.data_frequency == "minute" else ExitStack():
self.data_frequency in ('minute', '5-minute') else ExitStack():
self._before_trading_start(self, data)
self._in_before_trading_start = False
@@ -505,10 +523,11 @@ class TradingAlgorithm(object):
market_closes = trading_o_and_c['market_close']
minutely_emission = False
if self.sim_params.data_frequency == 'minute':
if self.sim_params.data_frequency in set(('minute', '5-minute')):
market_opens = trading_o_and_c['market_open']
minutely_emission = self.sim_params.emission_rate == "minute"
minutely_emission = self.sim_params.emission_rate in \
set(('minute', '5-minute'))
else:
# in daily mode, we want to have one bar per session, timestamped
# as the last minute of the session.
@@ -528,10 +547,19 @@ class TradingAlgorithm(object):
# FIXME generalize these values
before_trading_start_minutes = days_at_time(
self.sim_params.sessions,
time(8, 45),
"US/Eastern"
time(0, 0),
'UTC',
)
if self.sim_params.data_frequency == '5-minute':
return FiveMinuteSimulationClock(
self.sim_params.sessions,
execution_opens,
execution_closes,
before_trading_start_minutes,
minute_emission=minutely_emission,
)
return MinuteSimulationClock(
self.sim_params.sessions,
execution_opens,
@@ -660,8 +688,11 @@ class TradingAlgorithm(object):
# Assume data is daily if timestamp times are
# standardized, otherwise assume minute bars.
times = data.major_axis.time
if np.all(times == times[0]):
time_count = times.nunique()
if time_count == 1:
self.sim_params.data_frequency = 'daily'
elif time_count == 288:
self.sim_params.data_frequency = '5-minute'
else:
self.sim_params.data_frequency = 'minute'
@@ -683,6 +714,8 @@ class TradingAlgorithm(object):
if self.sim_params.data_frequency == 'daily':
equity_reader_arg = 'equity_daily_reader'
elif self.sim_params.data_frequency == '5-minute':
equity_daily_reader = 'equity_5_minute_reader'
elif self.sim_params.data_frequency == 'minute':
equity_reader_arg = 'equity_minute_reader'
equity_reader = PanelBarReader(
@@ -926,9 +959,9 @@ class TradingAlgorithm(object):
The arena from the simulation parameters. This will normally
be ``'backtest'`` but some systems may use this distinguish
live trading from backtesting.
data_frequency : {'daily', 'minute'}
data_frequency : {'daily', '5-minute', 'minute'}
data_frequency tells the algorithm if it is running with
daily data or minute data.
daily, minute, or five-minute mode.
start : datetime
The start date for the simulation.
end : datetime
@@ -1102,12 +1135,19 @@ class TradingAlgorithm(object):
'date_rule. You should use keyword argument '
'time_rule= when calling schedule_function without '
'specifying a date_rule', stacklevel=3)
freq = self.sim_params.data_frequency
date_rule = date_rule or date_rules.every_day()
time_rule = ((time_rule or time_rules.every_minute())
if self.sim_params.data_frequency == 'minute' else
# If we are in daily mode the time_rule is ignored.
time_rules.every_minute())
if freq is 'daily':
# ignore time rule in daily mode
time_rule = time_rules.every_minute()
else:
# use provided time rule or default to every minute or 5 minutes
# based on desired data frequency.
time_rule = time_rule or (time_rules.every_5_minutes()
if freq is '5-minute' else
time_rules.every_minute())
# Check the type of the algorithm's schedule before pulling calendar
# Note that the ExchangeTradingSchedule is currently the only
@@ -1782,7 +1822,7 @@ class TradingAlgorithm(object):
@data_frequency.setter
def data_frequency(self, value):
assert value in ('daily', 'minute')
assert value in ('daily', '5-minute', 'minute')
self.sim_params.data_frequency = value
@api_method
+79
View File
@@ -35,6 +35,17 @@ def minute_value(ndarray[long_t, ndim=1] market_opens,
return market_opens[q] + r
@cython.cdivision(True)
def five_minute_value(ndarray[long_t, ndim=1] market_opens,
Py_ssize_t pos,
short five_minutes_per_day):
cdef short q, r
q = cython.cdiv(pos, five_minutes_per_day)
r = cython.cmod(pos, five_minutes_per_day)
return market_opens[q] + r
def find_position_of_minute(ndarray[long_t, ndim=1] market_opens,
ndarray[long_t, ndim=1] market_closes,
long_t minute_val,
@@ -88,6 +99,26 @@ def find_position_of_minute(ndarray[long_t, ndim=1] market_opens,
return (market_open_loc * minutes_per_day) + delta
def find_position_of_five_minute(ndarray[long_t, ndim=1] market_opens,
ndarray[long_t, ndim=1] market_closes,
long_t five_minute_val,
short five_minutes_per_day,
bool forward_fill):
cdef Py_ssize_t market_open_loc, market_open, delta
market_open_loc = \
searchsorted(market_opens, five_minute_val, side='right') - 1
market_open = market_opens[market_open_loc]
market_close = market_closes[market_open_loc]
if not forward_fill and ((five_minute_val - market_open) >= five_minutes_per_day):
raise ValueError("Given five minutes is not between an open and a close")
delta = int_min(five_minute_val - market_open, market_close - market_open)
return (market_open_loc * five_minutes_per_day) + delta
def find_last_traded_position_internal(
ndarray[long_t, ndim=1] market_opens,
ndarray[long_t, ndim=1] market_closes,
@@ -157,3 +188,51 @@ def find_last_traded_position_internal(
# we've gone to the beginning of this asset's range, and still haven't
# found a trade event
return -1
def find_last_traded_five_minute_position_internal(
ndarray[long_t, ndim=1] market_opens,
ndarray[long_t, ndim=1] market_closes,
long_t end_five_minute,
long_t start_five_minute,
volumes,
short five_minutes_per_day):
cdef Py_ssize_t minute_pos, current_minute, q
five_minute_pos = int_min(
find_position_of_five_minute(
market_opens,
market_closes,
end_five_minute,
five_minutes_per_day,
True,
),
len(volumes) - 1,
)
while five_minute_pos >= 0:
current_five_minute = five_minute_value(
market_opens, five_minute_pos, five_minutes_per_day
)
q = cython.cdiv(five_minute_pos, five_minutes_per_day)
if current_five_minute > market_closes[q]:
five_minute_pos = find_position_of_five_minute(
market_opens,
market_closes,
market_closes[q],
five_minutes_per_day,
False,
)
continue
if current_five_minute < start_five_minute:
return -1
if volumes[five_minute_pos] != 0:
return five_minute_pos
five_minute_pos -= 1
# we've gone to the beginning of this asset's range, and still haven't
# found a trade event
return -1
+524
View File
@@ -0,0 +1,524 @@
#
# Copyright 2017 Enigma MPC, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from itertools import count
import tarfile
from time import time, sleep
from abc import abstractmethod, abstractproperty
import logbook
import pandas as pd
from . import core as bundles
from catalyst.utils.cli import (
item_show_count,
maybe_show_progress
)
from catalyst.utils.memoize import lazyval
logbook.StderrHandler().push_application()
log = logbook.Logger(__name__)
DEFAULT_RETRIES = 5
class BaseBundle(object):
def __init__(self, asset_filter=[]):
self._asset_filter = asset_filter
self._reset()
def _reset(self):
self._splits = []
self._dividends = []
@lazyval
def name(self):
raise NotImplementedError()
@lazyval
def exchange(self):
raise NotImplementedError()
@lazyval
def calendar_name(self):
raise NotImplementedError()
@lazyval
def minutes_per_day(self):
raise NotImplementedError()
@lazyval
def five_minutes_per_day(self):
raise NotImplementedError()
@lazyval
def frequencies(self):
raise NotImplementedError()
@lazyval
def md_column_names(self):
return _dtypes_to_cols(self.md_dtypes)
@lazyval
def md_dtypes(self):
raise NotImplementedError()
@lazyval
def column_names(self):
return _dtypes_to_cols(self.dtypes)
@lazyval
def dtypes(self):
raise NotImplementedError()
@lazyval
def tar_url(self):
raise NotImplementedError()
@lazyval
def wait_time(self):
raise NotImplementedError()
@abstractproperty
def splits(self):
raise NotImplementedError()
@abstractproperty
def dividends(self):
raise NotImplementedError()
@abstractmethod
def fetch_raw_metadata_frame(self, api_key, page_number):
raise NotImplementedError()
def post_process_symbol_metadata(self, metadata, data):
return metadata
@abstractmethod
def fetch_raw_symbol_frame(self, api_key, symbol, start_date, end_date):
raise NotImplementedError()
def ingest(self,
environ,
asset_db_writer,
minute_bar_writer,
five_minute_bar_writer,
daily_bar_writer,
adjustment_writer,
calendar,
start_session,
end_session,
cache,
show_progress,
is_compile,
output_dir):
try:
api_key = environ.get('CATALYST_API_KEY')
retries = environ.get('CATALYST_DOWNLOAD_ATTEMPTS', 5)
if is_compile:
# User has instructed local compilation and ingestion of bundle.
# Fetch raw metadata for all symbols.
raw_metadata = self._fetch_metadata_frame(
api_key,
cache=cache,
retries=retries,
environ=environ,
show_progress=show_progress,
)
# Compile daily symbol data if bundle supports daily mode and
# persist the dataset to disk.
symbol_map = raw_metadata.symbol
if 'daily' in self.frequencies:
daily_bar_writer.write(
self._fetch_symbol_iter(
api_key,
cache,
symbol_map,
calendar,
start_session,
end_session,
'daily',
retries,
),
assets=raw_metadata.index,
show_progress=show_progress,
)
# Post-process metadata using cached symbol frames, and write to
# disk. This metadata must be written before any attempt to write
# either minute or 5-minute data.
metadata = self._post_process_metadata(
raw_metadata,
cache,
show_progress=show_progress,
)
asset_db_writer.write(metadata)
# Compile 5-minute symbol data if bundle supports 5-minute mode and
# persist the dataset to disk.
'''
if '5-minute' in self.frequencies:
five_minute_bar_writer.write(
self._fetch_symbol_iter(
api_key,
cache,
symbol_map,
calendar,
start_session,
end_session,
'5-minute',
retries,
),
length=len(symbol_map),
show_progress=show_progress,
)
'''
# Compile minute symbol data if bundle supports minute mode and
# persist the dataset to disk.
if 'minute' in self.frequencies:
minute_bar_writer.write(
self._fetch_symbol_iter(
api_key,
cache,
symbol_map,
calendar,
start_session,
end_session,
'minute',
retries,
),
show_progress=show_progress,
)
# For legacy purposes, this call is required to ensure the database
# contains an appropriately initialized file structure. We don't
# forsee a usecase for adjustments at this time, but may later
# choose to expose this functionality in the future.
adjustment_writer.write(
splits=(
pd.concat(self.splits, ignore_index=True)
if len(self.splits) > 0 else
None
),
dividends=(
pd.concat(self.dividends, ignore_index=True)
if len(self.dividends) > 0 else
None
),
)
else:
# Otherwise, user has instructed to download and untar bundle
# directly from the bundles `tar_url`.
self._download_and_untar(show_progress, output_dir)
except Exception as e:
log.exception(
' Failed to ingest {name}:\n{msg}'.format(
name=self.name,
msg=str(e),
)
)
else:
self._reset()
def _download_and_untar(self, show_progress, output_dir):
# Download bundle conditioned on whether the user would like progress
# information to be displayed in the CLI.
if show_progress:
data = bundles.download_with_progress(
self.tar_url,
chunk_size=bundles.ONE_MEGABYTE,
label='Downloading {name} bundle'.format(name=self.name),
)
else:
data = bundles.download_without_progress(self.tar_url)
# File transfer has completed, untar the bundle to the appropriate
# data directory.
with tarfile.open('r', fileobj=data) as tar:
tar.extractall(output_dir)
def _fetch_metadata_frame(self,
api_key,
cache,
retries=DEFAULT_RETRIES,
environ=None,
show_progress=False):
# Setup raw metadata iterator to fetch pages if necessary.
raw_iter = self._fetch_metadata_iter(api_key, cache, retries, environ)
# Concatenate all frame in iterator to compute a single metadata frame.
with maybe_show_progress(
raw_iter,
show_progress,
label='Fetching symbol metadata',
item_show_func=item_show_count(),
length=3,
show_percent=False,
) as blocks:
metadata = pd.concat(blocks, ignore_index=True)
return metadata
def _fetch_metadata_iter(self, api_key, cache, retries, environ):
for page_number in count(1):
# Attempt to load metadata page from cache. If it does not exist,
# poll the API upto `retries` times in order to get raw DataFrame.
key = 'metadata-page-{pn}.frame'.format(pn=page_number)
try:
raw = cache[key]
except KeyError:
for _ in range(retries):
try:
raw = self.fetch_raw_metadata_frame(
api_key,
page_number,
)
break
except ValueError as e:
raw = pd.DataFrame([])
break
except Exception as e:
log.exception(
'Failed to load metadata from {}. '
'Retrying.'.format(self.name)
)
else:
raise ValueError(
'Failed to download metadata page {} after {} '
'attempts.'.format(page_number, retries)
)
if raw.empty:
# Empty DataFrame signals completion.
break
# Apply selective asset filtering, useful for benchmark
# ingestion.
if self._asset_filter:
raw = raw[raw.symbol.isin(self._asset_filter)]
# Update cached value for key.
cache[key] = raw
# Return metadata frame to application.
yield raw
def _post_process_metadata(self, metadata, cache, show_progress=False):
# Create empty data frame using target metadata column names and dtypes
final_metadata = pd.DataFrame(
columns=self.md_column_names,
index=metadata.index,
)
# Iterate over the available symbols, loading the asset's raw symbol
# data from the cache. The final metadata is computed and recorded in
# the appropriate row depending on the asset's id.
with maybe_show_progress(
metadata.symbol.iteritems(),
show_progress,
label='Post-processing symbol metadata',
item_show_func=item_show_count(len(metadata)),
length=len(metadata),
show_percent=False,
) as symbols_map:
for asset_id, symbol in symbols_map:
# Attempt to load data from disk, the cache should have an entry
# for each symbol at this point of the execution. If one does
# not exist, we should fail.
key = '{sym}.daily.frame'.format(sym=symbol)
try:
raw_data = cache[key]
except KeyError:
raise ValueError(
'Unable to find cached data for symbol: {0}'.format(symbol)
)
# Perform and require post-processing of metadata.
final_symbol_metadata = self.post_process_symbol_metadata(
asset_id,
metadata.iloc[asset_id],
raw_data,
)
# Record symbol's final metadata.
final_metadata.iloc[asset_id] = final_symbol_metadata
# Register all assets with the bundle's default exchange.
final_metadata['exchange'] = self.exchange
return final_metadata
def _fetch_symbol_iter(self,
api_key,
cache,
symbol_map,
calendar,
start_session,
end_session,
data_frequency,
retries):
for asset_id, symbol in symbol_map.iteritems():
# Record start time of iteration, compare at end of iteration to
# adhere to the datas source's rate limit policy.
start_time = pd.Timestamp.utcnow()
# Fetch new data if cached data is absent or stale, otherwise
# returns the cached data unaltered. The `should_sleep` flag
# indicates that an API call was attempted, and that we should be
# ensure aren't exceeding our rate limit before proceeding to the
# next symbol. If the raw_data is updated, it is cached before being
# returned.
raw_data, should_sleep = self._maybe_update_symbol_frame(
start_time,
api_key,
cache,
symbol,
calendar,
start_session,
end_session,
data_frequency,
retries,
)
# TODO(cfromknecht) further data validation?
# Pass asset_id and symbol data to writer.
yield asset_id, raw_data
# If an API call was made during this iteration and the time to
# reach this point was less than the inter-request `wait_time`,
# sleep until after enough time has elapsed to prevent getting rate
# limited.
if should_sleep:
remaining = pd.Timestamp.utcnow() - start_time + self.wait_time
if remaining.value > 0:
sleep(remaining.value / 10**9)
def _maybe_update_symbol_frame(self,
start_time,
api_key,
cache,
symbol,
calendar,
start_session,
end_session,
data_frequency,
retries):
# Attempt to load pre-existing symbol data from cache.
key = '{sym}.{freq}.frame'.format(sym=symbol, freq=data_frequency)
try:
raw_data = cache[key]
except KeyError:
raw_data = None
# Select the most recent date in cached dataset if it exists,
# otherwise use the provided `start_session`.
last = start_session
if raw_data is not None and len(raw_data) > 0:
last = raw_data.index[-1].tz_localize('UTC')
should_sleep = False
# Determine time at which cached data will be considered stale.
cache_expiration = last + pd.Timedelta(days=2)
if start_time <= cache_expiration and raw_data is not None:
# Data is fresh enough to reuse, no need to update. Iterator can
# proceed to next symbol directly since no API call was required.
return raw_data, should_sleep
# If we arrive here, we must have attempted an API call.
# Setting this flag tells the iterator to pause before starting
# the next asset, that we don't exceed the data source's rate
# limit.
should_sleep = True
raw_data = self._fetch_symbol_frame(
api_key,
symbol,
calendar,
start_session,
end_session,
data_frequency,
retries=retries,
)
# Cache latest symbol data.
cache[key] = raw_data
return raw_data, should_sleep
def _fetch_symbol_frame(self,
api_key,
symbol,
calendar,
start_session,
end_session,
data_frequency,
retries=DEFAULT_RETRIES):
# Data for symbol is old enough to attempt an update or is not
# present in the cache. Fetch raw data for a single symbol
# with requested intervals and frequency. Retry as necessary.
for _ in range(retries):
try:
raw_data = self.fetch_raw_symbol_frame(
api_key,
symbol,
calendar,
start_session,
end_session,
data_frequency,
)
raw_data.index = pd.to_datetime(raw_data.index, utc=True)
raw_data.index = raw_data.index.tz_localize('UTC')
# Filter incoming data to fit start and end sessions.
raw_data = raw_data[
(raw_data.index >= start_session) &
(raw_data.index <= end_session)
]
# Filter out any duplicates entries, keep last one, since
# previous frame is probably an incomplete.
raw_data = raw_data[~raw_data.index.duplicated(keep='last')]
return raw_data
except Exception as e:
log.exception(
'Exception raised fetching {name} data. Retrying.'
.format(name=self.name)
)
else:
raise ValueError(
'Failed to download data for symbol {sym} '
'after {n} attempts.'.format(
sym=symbol,
n=retries,
)
)
def _dtypes_to_cols(dtypes):
return [name for name, _ in dtypes]
+80
View File
@@ -0,0 +1,80 @@
#
# Copyright 2017 Enigma MPC, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from catalyst.data.bundles.base import BaseBundle
from catalyst.utils.memoize import lazyval
class BasePricingBundle(BaseBundle):
@lazyval
def md_dtypes(self):
return [
('symbol', 'object'),
('start_date', 'datetime64[ns]'),
('end_date', 'datetime64[ns]'),
('ac_date', 'datetime64[ns]'),
]
@lazyval
def dtypes(self):
return [
('date', 'datetime64[ns]'),
('open', 'float64'),
('high', 'float64'),
('low', 'float64'),
('close', 'float64'),
('volume', 'float64'),
]
class BaseCryptoPricingBundle(BasePricingBundle):
@lazyval
def calendar_name(self):
return 'OPEN'
@lazyval
def minutes_per_day(self):
return 1440
@lazyval
def five_minutes_per_day(self):
return 288
@property
def splits(self):
return []
@property
def dividends(self):
return []
class BaseEquityPricingBundle(BasePricingBundle):
@lazyval
def calendar_name(self):
return 'NYSE'
@lazyval
def minutes_per_day(self):
return 390
@lazyval
def five_minutes_per_day(self):
return 78
@property
def splits(self):
return self._splits
@property
def dividends(self):
return self._dividends
+84 -22
View File
@@ -17,6 +17,10 @@ from ..us_equity_pricing import (
SQLiteAdjustmentReader,
SQLiteAdjustmentWriter,
)
from ..five_minute_bars import (
BcolzFiveMinuteBarReader,
BcolzFiveMinuteBarWriter,
)
from ..minute_bars import (
BcolzMinuteBarReader,
BcolzMinuteBarWriter,
@@ -33,6 +37,7 @@ from catalyst.utils.input_validation import ensure_timestamp, optionally
import catalyst.utils.paths as pth
from catalyst.utils.preprocess import preprocess
from catalyst.utils.calendars import get_calendar
from catalyst.utils.cli import maybe_show_progress
ONE_MEGABYTE = 1024 * 1024
@@ -43,16 +48,21 @@ def asset_db_path(bundle_name, timestr, environ=None, db_version=None):
)
def minute_equity_path(bundle_name, timestr, environ=None):
def minute_path(bundle_name, timestr, environ=None):
return pth.data_path(
minute_equity_relative(bundle_name, timestr, environ),
minute_relative(bundle_name, timestr, environ),
environ=environ,
)
def daily_equity_path(bundle_name, timestr, environ=None):
def five_minute_path(bundle_name, timestr, environ=None):
return pth.data_path(
daily_equity_relative(bundle_name, timestr, environ),
five_minute_relative(bundle_name, timestr, environ),
environ=environ,
)
def daily_path(bundle_name, timestr, environ=None):
return pth.data_path(
daily_relative(bundle_name, timestr, environ),
environ=environ,
)
@@ -79,11 +89,13 @@ def cache_relative(bundle_name, timestr, environ=None):
return bundle_name, '.cache'
def daily_equity_relative(bundle_name, timestr, environ=None):
def daily_relative(bundle_name, timestr, environ=None):
return bundle_name, timestr, 'daily_equities.bcolz'
def five_minute_relative(bundle_name, timestr, environ=None):
return bundle_name, timestr, 'five_minute.bcolz'
def minute_equity_relative(bundle_name, timestr, environ=None):
def minute_relative(bundle_name, timestr, environ=None):
return bundle_name, timestr, 'minute_equities.bcolz'
@@ -158,7 +170,9 @@ def download_with_progress(url, chunk_size, **progress_kwargs):
total_size = int(resp.headers['content-length'])
data = BytesIO()
with click.progressbar(length=total_size, **progress_kwargs) as pbar:
progress_kwargs['length'] = total_size
with maybe_show_progress(None, True, **progress_kwargs) as pbar:
for chunk in resp.iter_content(chunk_size=chunk_size):
data.write(chunk)
pbar.update(len(chunk))
@@ -192,19 +206,20 @@ RegisteredBundle = namedtuple(
'start_session',
'end_session',
'minutes_per_day',
'five_minutes_per_day',
'ingest',
'create_writers']
)
BundleData = namedtuple(
'BundleData',
'asset_finder equity_minute_bar_reader equity_daily_bar_reader '
'asset_finder minute_bar_reader five_minute_bar_reader daily_bar_reader '
'adjustment_reader',
)
BundleCore = namedtuple(
'BundleCore',
'bundles register unregister ingest load clean',
'bundles register_bundle register unregister ingest load clean',
)
@@ -258,6 +273,8 @@ def _make_bundle_core():
-------
bundles : mappingproxy
The mapping of bundles to bundle payloads.
register_bundle : Bundle
A bundle instance to add to the ``bundles`` mapping.
register : callable
The function which registers new bundles in the ``bundles`` mapping.
unregister : callable
@@ -275,13 +292,31 @@ def _make_bundle_core():
# warn when trampling another bundle.
bundles = mappingproxy(_bundles)
def register_bundle(bundle_cls,
asset_filter=None,
start_session=None,
end_session=None,
create_writers=True):
bundle = bundle_cls(asset_filter=asset_filter)
return register(
bundle.name,
bundle.ingest,
calendar_name=bundle.calendar_name,
minutes_per_day=bundle.minutes_per_day,
five_minutes_per_day=bundle.five_minutes_per_day,
start_session=start_session,
end_session=end_session,
create_writers=create_writers,
)
@curry
def register(name,
f,
calendar_name='NYSE',
calendar_name='OPEN',
start_session=None,
end_session=None,
minutes_per_day=390,
minutes_per_day=1440,
five_minutes_per_day=288,
create_writers=True):
"""Register a data bundle ingest function.
@@ -362,6 +397,7 @@ def _make_bundle_core():
start_session=start_session,
end_session=end_session,
minutes_per_day=minutes_per_day,
five_minutes_per_day=five_minutes_per_day,
ingest=f,
create_writers=create_writers,
)
@@ -393,7 +429,8 @@ def _make_bundle_core():
environ=os.environ,
timestamp=None,
assets_versions=(),
show_progress=False):
show_progress=False,
is_compile=False):
"""Ingest data for a given bundle.
Parameters
@@ -443,7 +480,7 @@ def _make_bundle_core():
pth.data_path([], environ=environ))
)
daily_bars_path = wd.ensure_dir(
*daily_equity_relative(
*daily_relative(
name, timestr, environ=environ,
)
)
@@ -457,10 +494,20 @@ def _make_bundle_core():
# when we create the SQLiteAdjustmentWriter below. The
# SQLiteAdjustmentWriter needs to open the daily ctables so
# that it can compute the adjustment ratios for the dividends.
daily_bar_writer.write(())
five_minute_bar_writer = BcolzFiveMinuteBarWriter(
wd.ensure_dir(*five_minute_relative(
name, timestr, environ=environ)
),
calendar,
start_session,
end_session,
five_minutes_per_day=bundle.five_minutes_per_day,
)
minute_bar_writer = BcolzMinuteBarWriter(
wd.ensure_dir(*minute_equity_relative(
wd.ensure_dir(*minute_relative(
name, timestr, environ=environ)
),
calendar,
@@ -468,6 +515,7 @@ def _make_bundle_core():
end_session,
minutes_per_day=bundle.minutes_per_day,
)
assets_db_path = wd.getpath(*asset_db_relative(
name, timestr, environ=environ,
))
@@ -484,6 +532,7 @@ def _make_bundle_core():
)
else:
daily_bar_writer = None
five_minute_bar_writer = None
minute_bar_writer = None
asset_db_writer = None
adjustment_db_writer = None
@@ -495,6 +544,7 @@ def _make_bundle_core():
environ,
asset_db_writer,
minute_bar_writer,
five_minute_bar_writer,
daily_bar_writer,
adjustment_db_writer,
calendar,
@@ -502,6 +552,7 @@ def _make_bundle_core():
end_session,
cache,
show_progress,
is_compile,
pth.data_path([name, timestr], environ=environ),
)
@@ -577,11 +628,14 @@ def _make_bundle_core():
asset_finder=AssetFinder(
asset_db_path(name, timestr, environ=environ),
),
equity_minute_bar_reader=BcolzMinuteBarReader(
minute_equity_path(name, timestr, environ=environ),
minute_bar_reader=BcolzMinuteBarReader(
minute_path(name, timestr, environ=environ),
),
equity_daily_bar_reader=BcolzDailyBarReader(
daily_equity_path(name, timestr, environ=environ),
five_minute_bar_reader=BcolzFiveMinuteBarReader(
five_minute_path(name, timestr, environ=environ),
),
daily_bar_reader=BcolzDailyBarReader(
daily_path(name, timestr, environ=environ),
),
adjustment_reader=SQLiteAdjustmentReader(
adjustment_db_path(name, timestr, environ=environ),
@@ -670,7 +724,15 @@ def _make_bundle_core():
return cleaned
return BundleCore(bundles, register, unregister, ingest, load, clean)
return BundleCore(
bundles,
register_bundle,
register,
unregister,
ingest,
load,
clean,
)
bundles, register, unregister, ingest, load, clean = _make_bundle_core()
bundles, register_bundle, register, unregister, ingest, load, clean = _make_bundle_core()
+162 -33
View File
@@ -1,38 +1,167 @@
from io import BytesIO
import tarfile
#
# Copyright 2017 Enigma MPC, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from . import core as bundles
from datetime import datetime
POLONIEX_BUNDLE_URL = (
'https://www.dropbox.com/s/9naqffawnq8o4r2/poloniex-bundle.tar?dl=1'
)
import pandas as pd
@bundles.register(
'poloniex',
create_writers=False,
calendar_name='OPEN',
minutes_per_day=1440)
def quantopian_quandl_bundle(environ,
asset_db_writer,
minute_bar_writer,
daily_bar_writer,
adjustment_writer,
calendar,
start_session,
end_session,
cache,
show_progress,
output_dir):
if show_progress:
data = bundles.download_with_progress(
POLONIEX_BUNDLE_URL,
chunk_size=bundles.ONE_MEGABYTE,
label="Downloading Bundle: poloniex",
from six.moves.urllib.parse import urlencode
from catalyst.data.bundles.core import register_bundle
from catalyst.data.bundles.base_pricing import BaseCryptoPricingBundle
from catalyst.utils.memoize import lazyval
class PoloniexBundle(BaseCryptoPricingBundle):
@lazyval
def name(self):
return 'poloniex'
@lazyval
def exchange(self):
return 'POLO'
@lazyval
def frequencies(self):
return set((
'daily',
#'5-minute',
))
@lazyval
def tar_url(self):
return (
'https://www.dropbox.com/s/9naqffawnq8o4r2/'
'poloniex-bundle.tar?dl=1'
)
else:
data = bundles.download_without_progress(POLONIEX_BUNDLE_URL)
with tarfile.open('r', fileobj=data) as tar:
if show_progress:
print("Writing data to %s." % output_dir)
tar.extractall(output_dir)
@lazyval
def wait_time(self):
return pd.Timedelta(milliseconds=170)
def fetch_raw_metadata_frame(self, api_key, page_number):
if page_number > 1:
return pd.DataFrame([])
raw = pd.read_json(
self._format_metadata_url(
api_key,
page_number,
),
orient='index',
)
raw = raw.sort_index().reset_index()
raw.rename(
columns={'index':'symbol'},
inplace=True,
)
raw = raw[raw['isFrozen'] == 0]
return raw
def post_process_symbol_metadata(self, asset_id, sym_md, sym_data):
start_date = sym_data.index[0]
end_date = sym_data.index[-1]
ac_date = end_date + pd.Timedelta(days=1)
return (
sym_md.symbol,
start_date,
end_date,
ac_date,
)
def fetch_raw_symbol_frame(self,
api_key,
symbol,
calendar,
start_date,
end_date,
frequency):
raw = pd.read_json(
self._format_data_url(
api_key,
symbol,
start_date,
end_date,
frequency,
),
orient='records',
)
raw.set_index('date', inplace=True)
scale = 1
raw.loc[:, 'open'] /= scale
raw.loc[:, 'high'] /= scale
raw.loc[:, 'low'] /= scale
raw.loc[:, 'close'] /= scale
raw.loc[:, 'volume'] *= scale
return raw
'''
HELPER METHODS
'''
def _format_metadata_url(self, api_key, page_number):
query_params = [
('command', 'returnTicker'),
]
return self._format_polo_query(query_params)
def _format_data_url(self,
api_key,
symbol,
start_date,
end_date,
data_frequency):
period_map = {
'daily': 86400,
# '5-minute': 300,
}
try:
period = period_map[data_frequency]
except KeyError:
return None
query_params = [
('command', 'returnChartData'),
('currencyPair', symbol),
('start', start_date.value / 10**9),
('end', end_date.value / 10**9),
('period', period),
]
return self._format_polo_query(query_params)
def _format_polo_query(self, query_params):
return 'https://poloniex.com/public?{query}'.format(
query=urlencode(query_params),
)
'''
As a second parameter, you can pass an array of currency pairs
that will be processed as an asset_filter to only process that
subset of assets in the bundle, such as:
register_bundle(PoloniexBundle, ['USDT_BTC',])
For a production environment make sure to use (to bundle all pairs):
register_bundle(PoloniexBundle)
'''
register_bundle(PoloniexBundle)
+182 -317
View File
@@ -1,3 +1,28 @@
#
# Copyright 2017 Enigma MPC, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
import pandas as pd
from six.moves.urllib.parse import urlencode
from catalyst.data.bundles.core import register_bundle
from catalyst.data.bundles.base_pricing import BaseEquityPricingBundle
from catalyst.utils.memoize import lazyval
"""
Module for building a complete daily dataset from Quandl's WIKI dataset.
"""
@@ -17,350 +42,190 @@ from . import core as bundles
log = Logger(__name__)
seconds_per_call = (pd.Timedelta('10 minutes') / 2000).total_seconds()
# Invalid symbols that quandl has had in its metadata:
excluded_symbols = frozenset({'TEST123456789'})
class QuandlBundle(BaseEquityPricingBundle):
@lazyval
def name(self):
return 'quandl'
def _fetch_raw_metadata(api_key, cache, retries, environ):
"""Generator that yields each page of data from the metadata endpoint
as a dataframe.
"""
for page_number in count(1):
key = 'metadata-page-%d' % page_number
try:
raw = cache[key]
except KeyError:
for _ in range(retries):
try:
raw = pd.read_csv(
format_metadata_url(api_key, page_number),
date_parser=pd.tseries.tools.to_datetime,
parse_dates=[
'oldest_available_date',
'newest_available_date',
],
dtypes={
'dataset_code': 'int',
'name': 'str',
'oldest_available_date': 'str',
'newest_available_date': 'str',
},
usecols=[
'dataset_code',
'name',
'oldest_available_date',
'newest_available_date',
],
)
break
except ValueError:
# when we are past the last page we will get a value
# error because there will be no columns
raw = pd.DataFrame([])
break
except Exception:
pass
else:
raise ValueError(
'Failed to download metadata page %d after %d'
' attempts.' % (page_number, retries),
)
@lazyval
def exchange(self):
return 'QUANDL'
cache[key] = raw
@lazyval
def frequencies(self):
return set(('daily',))
if raw.empty:
# use the empty dataframe to signal completion
break
yield raw
@lazyval
def tar_url(self):
return 'https://s3.amazonaws.com/quantopian-public-zipline-data/quandl'
@lazyval
def wait_time(self):
return pd.Timedelta(milliseconds=300)
def fetch_symbol_metadata_frame(api_key,
cache,
retries=5,
environ=None,
show_progress=False):
"""
Download Quandl symbol metadata.
@lazyval
def _excluded_symbols(self):
"""
Invalid symbols that quandl has had in its metadata:
"""
return frozenset({'TEST123456789'})
Parameters
----------
api_key : str
The quandl api key to use. If this is None then no api key will be
sent.
cache : DataFrameCache
The cache to use for persisting the intermediate data.
retries : int, optional
The number of times to retry each request before failing.
environ : mapping[str -> str], optional
The environment to use to find the catalyst home. By default this
is ``os.environ``.
show_progress : bool, optional
Show a progress bar for the download of this data.
Returns
-------
metadata_frame : pd.DataFrame
A dataframe with the following columns:
symbol: the asset's symbol
name: the full name of the asset
start_date: the first date of data for this asset
end_date: the last date of data for this asset
auto_close_date: end_date + one day
exchange: the exchange for the asset; this is always 'quandl'
The index of the dataframe will be used for symbol->sid mappings but
otherwise does not have specific meaning.
"""
raw_iter = _fetch_raw_metadata(api_key, cache, retries, environ)
def item_show_func(_, _it=iter(count())):
'Downloading page: %d' % next(_it)
with maybe_show_progress(raw_iter,
show_progress,
item_show_func=item_show_func,
label='Downloading WIKI metadata: ') as blocks:
data = pd.concat(blocks, ignore_index=True).rename(columns={
'dataset_code': 'symbol',
'name': 'asset_name',
'oldest_available_date': 'start_date',
'newest_available_date': 'end_date',
}).sort_values('symbol')
data = data[~data.symbol.isin(excluded_symbols)]
# cut out all the other stuff in the name column
# we need to escape the paren because it is actually splitting on a regex
data.asset_name = data.asset_name.str.split(r' \(', 1).str.get(0)
data['exchange'] = 'QUANDL'
data['start_date'] = data['start_date'].astype(datetime)
data['end_date'] = data['end_date'].astype(datetime)
data['auto_close_date'] = data['end_date'] + pd.Timedelta(days=1)
return data
def format_metadata_url(api_key, page_number):
"""Build the query RL for the quandl WIKI metadata.
"""
query_params = [
('per_page', '100'),
('sort_by', 'id'),
('page', str(page_number)),
('database_code', 'WIKI'),
]
if api_key is not None:
query_params = [('api_key', api_key)] + query_params
return (
'https://www.quandl.com/api/v3/datasets.csv?' + urlencode(query_params)
)
def format_wiki_url(api_key, symbol, start_date, end_date):
"""
Build a query URL for a quandl WIKI dataset.
"""
query_params = [
('start_date', start_date.strftime('%Y-%m-%d')),
('end_date', end_date.strftime('%Y-%m-%d')),
('order', 'asc'),
]
if api_key is not None:
query_params = [('api_key', api_key)] + query_params
return (
"https://www.quandl.com/api/v3/datasets/WIKI/"
"{symbol}.csv?{query}".format(
symbol=symbol,
query=urlencode(query_params),
)
)
def fetch_single_equity(api_key,
symbol,
start_date,
end_date,
retries=5):
"""
Download data for a single equity.
"""
for _ in range(retries):
try:
return pd.read_csv(
format_wiki_url(api_key, symbol, start_date, end_date),
parse_dates=['Date'],
index_col='Date',
usecols=[
'Open',
'High',
'Low',
'Close',
'Volume',
'Date',
'Ex-Dividend',
'Split Ratio',
],
na_values=['NA'],
).rename(columns={
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
'Date': 'date',
'Ex-Dividend': 'ex_dividend',
'Split Ratio': 'split_ratio',
})
except Exception:
log.exception("Exception raised reading Quandl data. Retrying.")
else:
raise ValueError(
"Failed to download data for %r after %d attempts." % (
symbol, retries
)
def fetch_raw_metadata_frame(self, api_key, page_number):
raw = pd.read_csv(
self._format_metadata_url(api_key, page_number),
date_parser=pd.tseries.tools.to_datetime,
parse_dates=[
'oldest_available_date',
'newest_available_date',
],
dtype={
'dataset_code': 'str',
'name': 'str',
'oldest_available_date': 'str',
'newest_available_date': 'str',
},
usecols=[
'dataset_code',
'name',
'oldest_available_date',
'newest_available_date',
],
).rename(
columns={
'dataset_code': 'symbol',
'name': 'asset_name',
'oldest_available_date': 'start_date',
'newest_available_date': 'end_date',
},
)
raw['start_date'] = raw['start_date'].astype(datetime)
raw['end_date'] = raw['end_date'].astype(datetime)
raw['ac_date'] = raw['end_date'] + pd.Timedelta(days=1)
def _update_splits(splits, asset_id, raw_data):
split_ratios = raw_data.split_ratio
df = pd.DataFrame({'ratio': 1 / split_ratios[split_ratios != 1]})
df.index.name = 'effective_date'
df.reset_index(inplace=True)
df['sid'] = asset_id
splits.append(df)
# Filter out invalid symbols
raw = raw[~raw.symbol.isin(self._excluded_symbols)]
# cut out all the other stuff in the name column
# we need to escape the paren because it is actually splitting on a regex
raw.asset_name = raw.asset_name.str.split(r' \(', 1).str.get(0)
def _update_dividends(dividends, asset_id, raw_data):
divs = raw_data.ex_dividend
df = pd.DataFrame({'amount': divs[divs != 0]})
df.index.name = 'ex_date'
df.reset_index(inplace=True)
df['sid'] = asset_id
# we do not have this data in the WIKI dataset
df['record_date'] = df['declared_date'] = df['pay_date'] = pd.NaT
dividends.append(df)
return raw
def gen_symbol_data(api_key,
cache,
symbol_map,
calendar,
start_session,
end_session,
splits,
dividends,
retries):
for asset_id, symbol in symbol_map.iteritems():
start_time = time()
try:
# see if we have this data cached.
raw_data = cache[symbol]
should_sleep = False
except KeyError:
# we need to fetch the data and then write it to our cache
raw_data = cache[symbol] = fetch_single_equity(
def fetch_raw_symbol_frame(self,
api_key,
symbol,
calendar,
start_session,
end_session,
data_frequency):
raw_data = pd.read_csv(
self._format_wiki_url(
api_key,
symbol,
start_date=start_session,
end_date=end_session,
)
should_sleep = True
_update_splits(splits, asset_id, raw_data)
_update_dividends(dividends, asset_id, raw_data)
start_session,
end_session,
data_frequency,
),
parse_dates=['Date'],
index_col='Date',
usecols=[
'Open',
'High',
'Low',
'Close',
'Volume',
'Date',
'Ex-Dividend',
'Split Ratio',
],
na_values=['NA'],
).rename(columns={
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
'Date': 'date',
'Ex-Dividend': 'ex_dividend',
'Split Ratio': 'split_ratio',
})
sessions = calendar.sessions_in_range(start_session, end_session)
raw_data = raw_data.reindex(
return raw_data.reindex(
sessions.tz_localize(None),
copy=False,
).fillna(0.0)
yield asset_id, raw_data
if should_sleep:
remaining = seconds_per_call - time() - start_time
if remaining > 0:
sleep(remaining)
def post_process_symbol_metadata(self, asset_id, sym_md, sym_data):
self._update_splits(asset_id, sym_data)
self._update_dividends(asset_id, sym_data)
return sym_md
def _update_splits(self, asset_id, raw_data):
split_ratios = raw_data.split_ratio
df = pd.DataFrame({'ratio': 1 / split_ratios[split_ratios != 1]})
df.index.name = 'effective_date'
df.reset_index(inplace=True)
df['sid'] = asset_id
self.splits.append(df)
@bundles.register('quandl')
def quandl_bundle(environ,
asset_db_writer,
minute_bar_writer,
daily_bar_writer,
adjustment_writer,
calendar,
start_session,
end_session,
cache,
show_progress,
output_dir):
"""Build a catalyst data bundle from the Quandl WIKI dataset.
"""
api_key = environ.get('QUANDL_API_KEY')
metadata = fetch_symbol_metadata_frame(
api_key,
cache=cache,
show_progress=show_progress,
)
symbol_map = metadata.symbol
# data we will collect in `gen_symbol_data`
splits = []
dividends = []
asset_db_writer.write(metadata)
daily_bar_writer.write(
gen_symbol_data(
api_key,
cache,
symbol_map,
calendar,
start_session,
end_session,
splits,
dividends,
environ.get('QUANDL_DOWNLOAD_ATTEMPTS', 5),
),
assets=metadata.index,
show_progress=show_progress,
)
adjustment_writer.write(
splits=pd.concat(splits, ignore_index=True),
dividends=pd.concat(dividends, ignore_index=True),
)
def _update_dividends(self, asset_id, raw_data):
divs = raw_data.ex_dividend
df = pd.DataFrame({'amount': divs[divs != 0]})
df.index.name = 'ex_date'
df.reset_index(inplace=True)
df['sid'] = asset_id
# we do not have this data in the WIKI dataset
df['record_date'] = df['declared_date'] = df['pay_date'] = pd.NaT
self.dividends.append(df)
QUANTOPIAN_QUANDL_URL = (
'https://s3.amazonaws.com/quantopian-public-zipline-data/quandl'
)
def _format_metadata_url(self, api_key, page_number):
"""Build the query RL for the quandl WIKI metadata.
"""
query_params = [
('per_page', '100'),
('sort_by', 'id'),
('page', str(page_number)),
('database_code', 'WIKI'),
]
if api_key is not None:
query_params = [('api_key', api_key)] + query_params
@bundles.register('quantopian-quandl', create_writers=False)
def quantopian_quandl_bundle(environ,
asset_db_writer,
minute_bar_writer,
daily_bar_writer,
adjustment_writer,
calendar,
start_session,
end_session,
cache,
show_progress,
output_dir):
if show_progress:
data = bundles.download_with_progress(
QUANTOPIAN_QUANDL_URL,
chunk_size=bundles.ONE_MEGABYTE,
label="Downloading Bundle: quantopian-quandl",
return (
'https://www.quandl.com/api/v3/datasets.csv?' + urlencode(query_params)
)
else:
data = bundles.download_without_progress(QUANTOPIAN_QUANDL_URL)
with tarfile.open('r', fileobj=data) as tar:
if show_progress:
print("Writing data to %s." % output_dir)
tar.extractall(output_dir)
register_calendar_alias("QUANDL", "NYSE")
def _format_wiki_url(self,
api_key,
symbol,
start_date,
end_date,
data_frequency):
"""
Build a query URL for a quandl WIKI dataset.
"""
query_params = [
('start_date', start_date.strftime('%Y-%m-%d')),
('end_date', end_date.strftime('%Y-%m-%d')),
('order', 'asc'),
]
if api_key is not None:
query_params = [('api_key', api_key)] + query_params
return (
"https://www.quandl.com/api/v3/datasets/WIKI/"
"{symbol}.csv?{query}".format(
symbol=symbol,
query=urlencode(query_params),
)
)
register_calendar_alias('QUANDL', 'NYSE')
register_bundle(QuandlBundle)
+77 -22
View File
@@ -42,6 +42,7 @@ from catalyst.assets.roll_finder import (
)
from catalyst.data.dispatch_bar_reader import (
AssetDispatchMinuteBarReader,
AssetDispatchFiveMinuteBarReader,
AssetDispatchSessionBarReader
)
from catalyst.data.resample import (
@@ -114,12 +115,16 @@ class DataPortal(object):
The calendar instance used to provide minute->session information.
first_trading_day : pd.Timestamp
The first trading day for the simulation.
equity_daily_reader : BcolzDailyBarReader, optional
daily_reader : BcolzDailyBarReader, optional
The daily bar reader for equities. This will be used to service
daily data backtests or daily history calls in a minute backetest.
If a daily bar reader is not provided but a minute bar reader is,
the minutes will be rolled up to serve the daily requests.
equity_minute_reader : BcolzMinuteBarReader, optional
five_minute_reader : BcolzFiveMinuteBarReader, optional
The five minute bar reader for equities. This will be used to service
5-minute data backtests or five-minute history calls. This can be used
to serve daily calls if no daily bar reader is provided.
minute_reader : BcolzMinuteBarReader, optional
The minute bar reader for equities. This will be used to service
minute data backtests or minute history calls. This can be used
to serve daily calls if no daily bar reader is provided.
@@ -144,8 +149,9 @@ class DataPortal(object):
asset_finder,
trading_calendar,
first_trading_day,
equity_daily_reader=None,
equity_minute_reader=None,
daily_reader=None,
five_minute_reader=None,
minute_reader=None,
future_daily_reader=None,
future_minute_reader=None,
adjustment_reader=None,
@@ -180,7 +186,7 @@ class DataPortal(object):
# Infer the last session from the provided readers.
last_sessions = [
reader.last_available_dt
for reader in [equity_daily_reader, future_daily_reader]
for reader in [daily_reader, future_daily_reader]
if reader is not None
]
if last_sessions:
@@ -194,7 +200,11 @@ class DataPortal(object):
# Infer the last minute from the provided readers.
last_minutes = [
reader.last_available_dt
for reader in [equity_minute_reader, future_minute_reader]
for reader in [
minute_reader,
five_minute_reader,
future_minute_reader,
]
if reader is not None
]
if last_minutes:
@@ -202,10 +212,12 @@ class DataPortal(object):
else:
self._last_available_minute = None
aligned_equity_minute_reader = self._ensure_reader_aligned(
equity_minute_reader)
aligned_equity_session_reader = self._ensure_reader_aligned(
equity_daily_reader)
aligned_minute_reader = self._ensure_reader_aligned(
minute_reader)
aligned_five_minute_reader = self._ensure_reader_aligned(
five_minute_reader)
aligned_session_reader = self._ensure_reader_aligned(
daily_reader)
aligned_future_minute_reader = self._ensure_reader_aligned(
future_minute_reader)
aligned_future_session_reader = self._ensure_reader_aligned(
@@ -217,12 +229,15 @@ class DataPortal(object):
}
aligned_minute_readers = {}
aligned_five_minute_readers = {}
aligned_session_readers = {}
if aligned_equity_minute_reader is not None:
aligned_minute_readers[Equity] = aligned_equity_minute_reader
if aligned_equity_session_reader is not None:
aligned_session_readers[Equity] = aligned_equity_session_reader
if aligned_minute_reader is not None:
aligned_minute_readers[Equity] = aligned_minute_reader
if aligned_five_minute_reader is not None:
aligned_five_minute_readers[Equity] = aligned_five_minute_reader
if aligned_session_reader is not None:
aligned_session_readers[Equity] = aligned_session_reader
if aligned_future_minute_reader is not None:
aligned_minute_readers[Future] = aligned_future_minute_reader
@@ -252,6 +267,13 @@ class DataPortal(object):
self._last_available_minute,
)
_dispatch_five_minute_reader = AssetDispatchFiveMinuteBarReader(
self.trading_calendar,
self.asset_finder,
aligned_five_minute_readers,
self._last_available_minute,
)
_dispatch_session_reader = AssetDispatchSessionBarReader(
self.trading_calendar,
self.asset_finder,
@@ -261,6 +283,7 @@ class DataPortal(object):
self._pricing_readers = {
'minute': _dispatch_minute_reader,
'5-minute': _dispatch_five_minute_reader,
'daily': _dispatch_session_reader,
}
@@ -514,15 +537,17 @@ class DataPortal(object):
)
else:
if field == "last_traded":
return self.get_last_traded_dt(asset, dt, 'minute')
return self.get_last_traded_dt(asset, dt, data_frequency)
elif field == "price":
return self._get_minute_spot_value(
asset, "close", dt, ffill=True,
return self._get_minutely_spot_value(
asset, "close", dt, data_frequency, ffill=True,
)
elif field == "contract":
return self._get_current_contract(asset, dt)
else:
return self._get_minute_spot_value(asset, field, dt)
return self._get_minutely_spot_value(
asset, field, dt, data_frequency,
)
if assets_is_scalar:
return get_single_asset_value(assets)
@@ -648,8 +673,14 @@ class DataPortal(object):
return spot_value
def _get_minute_spot_value(self, asset, column, dt, ffill=False):
reader = self._get_pricing_reader('minute')
def _get_minutely_spot_value(self,
asset,
column,
dt,
data_frequency,
ffill=False):
reader = self._get_pricing_reader(data_frequency)
if ffill:
# If forward filling, we want the last minute with values (up to
@@ -680,8 +711,32 @@ class DataPortal(object):
# the value we found came from a different day, so we have to adjust
# the data if there are any adjustments on that day barrier
return self.get_adjusted_value(
asset, column, query_dt,
dt, "minute", spot_value=result
asset,
column,
query_dt,
dt,
data_frequency,
spot_value=result
)
def _get_five_minute_spot_value(self, asset, column, dt, ffill=False):
return self._get_minutely_spot_value(
asset,
column,
dt,
ffill,
'5-minute',
)
def _get_minute_spot_value(self, asset, column, dt, ffill=False):
return self._get_minutely_spot_value(
asset,
column,
dt,
ffill,
'minute',
)
def _get_daily_spot_value(self, asset, column, dt):
+5 -1
View File
@@ -130,13 +130,17 @@ class AssetDispatchBarReader(with_metaclass(ABCMeta)):
return results
class AssetDispatchMinuteBarReader(AssetDispatchBarReader):
def _dt_window_size(self, start_dt, end_dt):
return len(self.trading_calendar.minutes_in_range(start_dt, end_dt))
class AssetDispatchFiveMinuteBarReader(AssetDispatchBarReader):
def _dt_window_size(self, start_dt, end_dt):
return len(self.trading_calendar.five_minutes_in_range(start_dt, end_dt))
class AssetDispatchSessionBarReader(AssetDispatchBarReader):
def _dt_window_size(self, start_dt, end_dt):
+96 -74
View File
@@ -24,6 +24,10 @@ from bcolz import ctable
from intervaltree import IntervalTree
import logbook
import numpy as np
from numpy import (
iinfo,
uint64,
)
import pandas as pd
from pandas import HDFStore
import tables
@@ -31,31 +35,39 @@ from six import with_metaclass
from toolz import keymap, valmap
from catalyst.data._minute_bar_internal import (
minute_value,
find_position_of_minute,
find_last_traded_position_internal
five_minute_value,
find_position_of_five_minute,
find_last_traded_five_minute_position_internal,
)
from catalyst.gens.sim_engine import NANOS_IN_MINUTE
from catalyst.data.bar_reader import BarReader, NoDataOnDate
from catalyst.data.us_equity_pricing import check_uint32_safe
from catalyst.data.us_equity_pricing import (
winsorise_uint64,
check_uint64_safe,
)
from catalyst.utils.calendars import get_calendar
from catalyst.utils.cli import maybe_show_progress
from catalyst.utils.cli import (
item_show_count,
maybe_show_progress,
)
from catalyst.utils.memoize import lazyval
logger = logbook.Logger('FiveMinuteBars')
CRYPTO_ASSETS_FIVE_MINUTES_PER_DAY = 288
US_EQUITIES_MINUTES_PER_DAY = 390
FUTURES_MINUTES_PER_DAY = 1440
OPEN_FIVE_MINUTES_PER_DAY = 288
DEFAULT_EXPECTEDLEN = US_EQUITIES_MINUTES_PER_DAY * 252 * 15
DEFAULT_EXPECTED_CRYPTO_LEN = CRYPTO_ASSETS_FIVE_MINUTES_PER_DAY * 366 * 15
DEFAULT_EXPECTEDLEN_CRYPTO = OPEN_FIVE_MINUTES_PER_DAY * 366 * 15
OHLC_RATIO = 1000
OHLC_RATIO = 1000000
OHLC = frozenset(['open', 'high', 'low', 'close'])
OHLCV = frozenset(['open', 'high', 'low', 'close', 'volume'])
UINT64_MAX = iinfo(uint64).max
NANOS_IN_FIVE_MINUTES = 5 * NANOS_IN_MINUTE
class BcolzFiveMinuteOverlappingData(Exception):
pass
@@ -68,13 +80,13 @@ class BcolzFiveMinuteWriterColumnMismatch(Exception):
class FiveMinuteBarReader(BarReader):
@property
def data_frequency(self):
return "five-minute"
return "5-minute"
def _calc_five_minute_index(market_opens, five_minutes_per_day):
five_minutes = np.zeros(len(market_opens) * five_minutes_per_day,
dtype='datetime64[ns]')
deltas = np.arange(0, five_minutes_per_day, dtype='timedelta64[m]')
deltas = 5 * np.arange(0, five_minutes_per_day, dtype='timedelta64[m]')
for i, market_open in enumerate(market_opens):
start = market_open.asm8
five_minute_values = start + deltas
@@ -116,19 +128,19 @@ def _sid_subdir_path(sid):
def convert_cols(cols, scale_factor, sid, invalid_data_behavior):
"""Adapt OHLCV columns into uint32 columns.
"""Adapt OHLCV columns into uint64 columns.
Parameters
----------
cols : dict
A dict mapping each column name (open, high, low, close, volume)
to a float column to convert to uint32.
to a float column to convert to uint64.
scale_factor : int
Factor to use to scale float values before converting to uint32.
Factor to use to scale float values before converting to uint64.
sid : int
Sid of the relevant asset, for logging.
invalid_data_behavior : str
Specifies behavior when data cannot be converted to uint32.
Specifies behavior when data cannot be converted to uint64.
If 'raise', raises an exception.
If 'warn', logs a warning and filters out incompatible values.
If 'ignore', silently filters out incompatible values.
@@ -137,6 +149,7 @@ def convert_cols(cols, scale_factor, sid, invalid_data_behavior):
scaled_highs = np.nan_to_num(cols['high']) * scale_factor
scaled_lows = np.nan_to_num(cols['low']) * scale_factor
scaled_closes = np.nan_to_num(cols['close']) * scale_factor
volumes = np.nan_to_num(cols['volume'])
exclude_mask = np.zeros_like(scaled_opens, dtype=bool)
@@ -145,11 +158,12 @@ def convert_cols(cols, scale_factor, sid, invalid_data_behavior):
('high', scaled_highs),
('low', scaled_lows),
('close', scaled_closes),
('volume', volumes),
]:
max_val = scaled_col.max()
try:
check_uint32_safe(max_val, col_name)
check_uint64_safe(max_val, col_name)
except ValueError:
if invalid_data_behavior == 'raise':
raise
@@ -157,20 +171,20 @@ def convert_cols(cols, scale_factor, sid, invalid_data_behavior):
if invalid_data_behavior == 'warn':
logger.warn(
'Values for sid={}, col={} contain some too large for '
'uint32 (max={}), filtering them out',
'uint64 (max={}), filtering them out',
sid, col_name, max_val,
)
# We want to exclude all rows that have an unsafe value in
# this column.
exclude_mask &= (scaled_col >= np.iinfo(np.uint32).max)
exclude_mask &= (scaled_col >= iinfo(uint64).max)
# Convert all cols to uint32.
opens = scaled_opens.astype(np.uint32)
highs = scaled_highs.astype(np.uint32)
lows = scaled_lows.astype(np.uint32)
closes = scaled_closes.astype(np.uint32)
volumes = cols['volume'].astype(np.uint32)
# Convert all cols to uint64.
opens = scaled_opens.astype(uint64)
highs = scaled_highs.astype(uint64)
lows = scaled_lows.astype(uint64)
closes = scaled_closes.astype(uint64)
volumes = volumes.astype(uint64)
# Exclude rows with unsafe values by setting to zero.
opens[exclude_mask] = 0
@@ -200,7 +214,7 @@ class BcolzFiveMinuteBarMetadata(object):
"""
FORMAT_VERSION = 3
METADATA_FILENAME = 'metadata.json'
METADATA_FILENAME = 'five-minute-metadata.json'
@classmethod
def metadata_path(cls, rootdir):
@@ -257,7 +271,7 @@ class BcolzFiveMinuteBarMetadata(object):
calendar,
start_session,
end_session,
minutes_per_day,
five_minutes_per_day,
version=version,
)
@@ -290,7 +304,7 @@ class BcolzFiveMinuteBarMetadata(object):
ohlc_ratio : int
The default ratio by which to multiply the pricing data to
convert the floats from floats to an integer to fit within
the np.uint32. If ohlc_ratios_per_sid is None or does not
the np.uint64. If ohlc_ratios_per_sid is None or does not
contain a mapping for a given sid, this ratio is used.
ohlc_ratios_per_sid : dict
A dict mapping each sid in the output to the factor by
@@ -334,7 +348,7 @@ class BcolzFiveMinuteBarMetadata(object):
'version': self.version,
'ohlc_ratio': self.default_ohlc_ratio,
'ohlc_ratios_per_sid': self.ohlc_ratios_per_sid,
'minutes_per_day': self.five_minutes_per_day,
'five_minutes_per_day': self.five_minutes_per_day,
'calendar_name': self.calendar.name,
'start_session': str(self.start_session.date()),
'end_session': str(self.end_session.date()),
@@ -374,13 +388,13 @@ class BcolzFiveMinuteBarWriter(object):
The last trading session in the data set.
default_ohlc_ratio : int, optional
The default ratio by which to multiply the pricing data to
convert from floats to integers that fit within np.uint32. If
convert from floats to integers that fit within np.uint64. If
ohlc_ratios_per_sid is None or does not contain a mapping for a
given sid, this ratio is used. Default is OHLC_RATIO (1000).
ohlc_ratios_per_sid : dict, optional
A dict mapping each sid in the output to the ratio by which to
multiply the pricing data to convert the floats from floats to
an integer to fit within the np.uint32.
an integer to fit within the np.uint64.
expectedlen : int, optional
The expected length of the dataset, used when creating the initial
bcolz ctable.
@@ -405,9 +419,9 @@ class BcolzFiveMinuteBarWriter(object):
The open, high, low, and close columns are integers which are 1000 times
the quoted price, so that the data can represented and stored as an
np.uint32, supporting market prices quoted up to the thousands place.
np.uint64, supporting market prices quoted up to the thousands place.
volume is a np.uint32 with no mutation of the tens place.
volume is a np.uint64 with no mutation of the tens place.
The 'index' for each individual asset are a repeating period of minutes of
length `minutes_per_day` starting from each market open.
@@ -450,7 +464,7 @@ class BcolzFiveMinuteBarWriter(object):
five_minutes_per_day,
default_ohlc_ratio=OHLC_RATIO,
ohlc_ratios_per_sid=None,
expectedlen=DEFAULT_EXPECTED_CRYPTO_LEN,
expectedlen=DEFAULT_EXPECTEDLEN_CRYPTO,
write_metadata=True):
self._rootdir = rootdir
@@ -466,11 +480,11 @@ class BcolzFiveMinuteBarWriter(object):
self._default_ohlc_ratio = default_ohlc_ratio
self._ohlc_ratios_per_sid = ohlc_ratios_per_sid
self._minute_index = _calc_minute_index(
self._schedule.market_open, self._minutes_per_day)
self._five_minute_index = _calc_five_minute_index(
self._schedule.market_open, self._five_minutes_per_day)
if write_metadata:
metadata = BcolzMinuteBarMetadata(
metadata = BcolzFiveMinuteBarMetadata(
self._default_ohlc_ratio,
self._ohlc_ratios_per_sid,
self._calendar,
@@ -575,7 +589,7 @@ class BcolzFiveMinuteBarWriter(object):
if not os.path.exists(sid_containing_dirname):
# Other sids may have already created the containing directory.
os.makedirs(sid_containing_dirname)
initial_array = np.empty(0, np.uint32)
initial_array = np.empty(0, np.uint64)
table = ctable(
rootdir=path,
columns=[
@@ -612,7 +626,7 @@ class BcolzFiveMinuteBarWriter(object):
five_minute_offset = len(table) % self._five_minutes_per_day
num_to_prepend = numdays * self._five_minutes_per_day - five_minute_offset
prepend_array = np.zeros(num_to_prepend, np.uint32)
prepend_array = np.zeros(num_to_prepend, np.uint64)
# Fill all OHLCV with zeros.
table.append([prepend_array] * 5)
table.flush()
@@ -667,7 +681,11 @@ class BcolzFiveMinuteBarWriter(object):
for k, v in kwargs.items():
table.attrs[k] = v
def write(self, data, show_progress=False, invalid_data_behavior='warn'):
def write(self,
data,
length=None,
show_progress=False,
invalid_data_behavior='warn'):
"""Write a stream of minute data.
Parameters
@@ -687,14 +705,15 @@ class BcolzFiveMinuteBarWriter(object):
show_progress : bool, optional
Whether or not to show a progress bar while writing.
"""
ctx = maybe_show_progress(
with maybe_show_progress(
data,
length=length,
show_percent=False,
show_progress=show_progress,
item_show_func=lambda e: e if e is None else str(e[0]),
label="Merging minute equity files:",
)
write_sid = self.write_sid
with ctx as it:
item_show_func=item_show_count(length),
label='Compiling five-minute data',
) as it:
write_sid = self.write_sid
for e in it:
write_sid(*e, invalid_data_behavior=invalid_data_behavior)
@@ -796,10 +815,13 @@ class BcolzFiveMinuteBarWriter(object):
# Get the number of minutes already recorded in this sid's ctable
num_rec_mins = table.size
all_minutes = self._minute_index
all_minutes = self._five_minute_index
# Get the latest minute we wish to write to the ctable
last_minute_to_write = pd.Timestamp(dts[-1], tz='UTC')
#print 'all_minutes[-1]:', all_minutes[num_rec_mins-1]
#print 'last_minute_to_write:', last_minute_to_write
# In the event that we've already written some minutely data to the
# ctable, guard against overwriting that data.
if num_rec_mins > 0:
@@ -817,11 +839,11 @@ class BcolzFiveMinuteBarWriter(object):
minutes_count = all_minutes_in_window.size
open_col = np.zeros(minutes_count, dtype=np.uint32)
high_col = np.zeros(minutes_count, dtype=np.uint32)
low_col = np.zeros(minutes_count, dtype=np.uint32)
close_col = np.zeros(minutes_count, dtype=np.uint32)
vol_col = np.zeros(minutes_count, dtype=np.uint32)
open_col = np.zeros(minutes_count, dtype=uint64)
high_col = np.zeros(minutes_count, dtype=uint64)
low_col = np.zeros(minutes_count, dtype=uint64)
close_col = np.zeros(minutes_count, dtype=uint64)
vol_col = np.zeros(minutes_count, dtype=uint64)
dt_ixs = np.searchsorted(all_minutes_in_window.values,
dts.astype('datetime64[ns]'))
@@ -853,7 +875,7 @@ class BcolzFiveMinuteBarWriter(object):
day_ix = self._session_labels.get_loc(day)
# Add one to the 0-indexed day_ix to get the number of days.
num_days = day_ix + 1
return num_days * self._minutes_per_day
return num_days * self._five_minutes_per_day
def truncate(self, date):
"""Truncate data beyond this date in all ctables."""
@@ -991,7 +1013,7 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader):
market_closes = self._market_closes.values.astype('datetime64[m]')
minutes_per_day = (market_closes - market_opens).astype(np.int64) / 5
early_indices = np.where(
minutes_per_day != self._minutes_per_day - 1)[0]
minutes_per_day != self._five_minutes_per_day - 1)[0]
early_opens = self._market_opens[early_indices]
early_closes = self._market_closes[early_indices]
minutes = [(market_open, early_close)
@@ -1019,9 +1041,9 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader):
"""
itree = IntervalTree()
for market_open, early_close in self._minutes_to_exclude():
start_pos = self._find_position_of_minute(early_close) + 1
start_pos = self._find_position_of_five_minute(early_close) + 1
end_pos = (
self._find_position_of_minute(market_open)
self._find_position_of_five_minute(market_open)
+
self._five_minutes_per_day
-
@@ -1110,7 +1132,7 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader):
minute_pos = self._last_get_value_dt_position
else:
try:
minute_pos = self._find_position_of_minute(dt)
minute_pos = self._find_position_of_five_minute(dt)
except ValueError:
raise NoDataOnDate()
@@ -1132,15 +1154,15 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader):
return value
def get_last_traded_dt(self, asset, dt):
minute_pos = self._find_last_traded_position(asset, dt)
minute_pos = self._find_last_traded_five_minute_position(asset, dt)
if minute_pos == -1:
return pd.NaT
return self._pos_to_minute(minute_pos)
def _find_last_traded_position(self, asset, dt):
def _find_last_traded_five_minute_position(self, asset, dt):
volumes = self._open_minute_file('volume', asset)
start_date_minute = asset.start_date.value / NANOS_IN_MINUTE
dt_minute = dt.value / NANOS_IN_MINUTE
start_date_minute = asset.start_date.value / NANOS_IN_FIVE_MINUTE
dt_minute = dt.value / NANOS_IN_FIVE_MINUTE
try:
# if we know of a dt before which this asset has no volume,
@@ -1152,13 +1174,13 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader):
if dt_minute < earliest_dt_to_search:
return -1
pos = find_last_traded_position_internal(
pos = find_last_traded_five_minute_position_internal(
self._market_open_values,
self._market_close_values,
dt_minute,
earliest_dt_to_search,
volumes,
self._minutes_per_day,
self._five_minutes_per_day,
)
if pos == -1:
@@ -1175,15 +1197,15 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader):
return pos
def _pos_to_minute(self, pos):
minute_epoch = minute_value(
minute_epoch = five_minute_value(
self._market_open_values,
pos,
self._minutes_per_day
self._five_minutes_per_day
)
return pd.Timestamp(minute_epoch, tz='UTC', unit="m")
def _find_position_of_minute(self, minute_dt):
def _find_position_of_five_minute(self, minute_dt):
"""
Internal method that returns the position of the given minute in the
list of every trading minute since market open of the first trading
@@ -1202,11 +1224,11 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader):
int: The position of the given minute in the list of all trading
minutes since market open on the first trading day.
"""
return find_position_of_minute(
return find_position_of_five_minute(
self._market_open_values,
self._market_close_values,
minute_dt.value / NANOS_IN_MINUTE,
self._minutes_per_day,
minute_dt.value / NANOS_IN_FIVE_MINUTE,
self._five_minutes_per_day,
False,
)
@@ -1230,8 +1252,8 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader):
(minutes in range, sids) with a dtype of float64, containing the
values for the respective field over start and end dt range.
"""
start_idx = self._find_position_of_minute(start_dt)
end_idx = self._find_position_of_minute(end_dt)
start_idx = self._find_position_of_five_minute(start_dt)
end_idx = self._find_position_of_five_minute(end_dt)
num_minutes = (end_idx - start_idx + 1)
@@ -1250,7 +1272,7 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader):
if field != 'volume':
out = np.full(shape, np.nan)
else:
out = np.zeros(shape, dtype=np.uint32)
out = np.zeros(shape, dtype=uint64)
for i, sid in enumerate(sids):
carray = self._open_minute_file(field, sid)
+34 -99
View File
@@ -33,7 +33,7 @@ from ..utils.paths import (
)
from ..utils.deprecate import deprecated
from catalyst.curate.poloniex import PoloniexCurator
from catalyst.data.bundles.poloniex import PoloniexBundle
from catalyst.utils.calendars import get_calendar
@@ -93,8 +93,8 @@ def has_data_for_dates(series_or_df, first_date, last_date):
dts = series_or_df.index
if not isinstance(dts, pd.DatetimeIndex):
raise TypeError("Expected a DatetimeIndex, but got %s." % type(dts))
first, last = dts[[0, -1]]
return (first <= first_date) and (last >= last_date)
first, last = dts[[0, -1]].tz_localize(None)
return (first <= first_date.tz_localize(None)) and (last >= last_date.tz_localize(None))
def load_crypto_market_data(trading_day=None,
trading_days=None,
@@ -134,17 +134,19 @@ def load_crypto_market_data(trading_day=None,
trading_day,
environ,
)
# Override first_date for treasury data since we have it for many more years
# and is independent of crypto data
first_date_treasury = pd.Timestamp('1990-01-01', tz='UTC')
tc = ensure_treasury_data(
bm_symbol,
first_date,
first_date_treasury,
last_date,
now,
environ,
)
benchmark_returns = br[br.index.slice_indexer(first_date, last_date)]
treasury_curves = tc[tc.index.slice_indexer(first_date, last_date)]
treasury_curves = tc[tc.index.slice_indexer(first_date_treasury, last_date)]
return benchmark_returns, treasury_curves
def load_market_data(trading_day=None, trading_days=None, bm_symbol='SPY',
@@ -232,11 +234,15 @@ def load_market_data(trading_day=None, trading_days=None, bm_symbol='SPY',
treasury_curves = tc[tc.index.slice_indexer(first_date, last_date)]
return benchmark_returns, treasury_curves
def ensure_crypto_benchmark_data(symbol, first_date, last_date, now,
trading_day, environ=None):
def ensure_crypto_benchmark_data(symbol,
first_date,
last_date,
now,
trading_day,
environ=None):
filename = get_benchmark_filename(symbol)
source_filename = '/var/tmp/catalyst/data/poloniex/crypto_prices-{0}.csv'.\
format(symbol)
logger.info(
('Loading benchmark data for {symbol!r} '
@@ -269,92 +275,23 @@ def ensure_crypto_benchmark_data(symbol, first_date, last_date, now,
last_date=last_date
)
def dateparse(time_in_secs):
return datetime.datetime.fromtimestamp(float(time_in_secs), pytz.utc)
def compute_daily_bars(five_min_bars, schedule):
# filter and copy the entry at the beginning of each session
daily_bars = five_min_bars[
five_min_bars.index.isin(schedule)
].copy()
day_offset = pd.Timedelta(days=1)
# iterate through session starts doing:
# 1. filter five_min_bars to get all entries in one day
# 2. compute daily bar entry
# 3. record in rid-th row of daily_bars
for rid, start_date in enumerate(daily_bars.index):
# compute beginning of next session
end_date = start_date + day_offset
# filter for entries session entries
day_data = five_min_bars[
(five_min_bars.index >= start_date) &
(five_min_bars.index < end_date)
]
# compute and record daily bar
daily_bars.iloc[rid] = (
day_data.open.iloc[0], # first open price
day_data.high.max(), # max of high prices
day_data.low.min(), # min of low prices
day_data.close.iloc[-1], # last close prices
day_data.volume.sum(), # sum of all volumes
)
# scale to allow trading 10-ths of a coin
scale = 10.0
daily_bars.loc[:, 'open'] /= scale
daily_bars.loc[:, 'high'] /= scale
daily_bars.loc[:, 'low'] /= scale
daily_bars.loc[:, 'close'] /= scale
daily_bars.loc[:, 'volume'] *= scale
return daily_bars
five_min_bars = None
# Load benchmark symbol from Poloniex API
try:
# load five minute bars from csv cache
five_min_bars = pd.read_csv(
source_filename,
names=['date', 'open', 'high', 'low', 'close', 'volume'],
index_col=[0],
parse_dates=True,
date_parser=dateparse,
bundle = PoloniexBundle()
bench_raw = bundle._fetch_symbol_frame(
None,
symbol,
get_calendar(bundle.calendar_name),
first_date - trading_day,
last_date,
'daily',
)
five_min_bars.index = pd.to_datetime(five_min_bars.index, utc=True, unit='s')
except (OSError, IOError):
# Otherwise load from Poloniex API
try:
pc = PoloniexCurator()
pc.append_data_single_pair(symbol)
five_min_bars = pc.to_dataframe(
time.mktime(first_date.timetuple()),
time.mktime(last_date.timetuple()),
currencyPair=symbol,
)
except (OSError, IOError, HTTPError):
logger.exception('Failed to new crypto benchmark returns')
raise
# compute daily bars for open calendar
open_calendar = get_calendar('OPEN')
daily_bars = compute_daily_bars(
five_min_bars,
open_calendar.all_sessions,
)
# filter daily bars to include first_date and last_date
daily_bars = daily_bars[
(daily_bars.index >= (first_date - trading_day)) &
(daily_bars.index <= last_date)
]
except (OSError, IOError, HTTPError):
logger.exception('Failed to fetch new crypto benchmark returns')
raise
# select close column and compute percent change between days
daily_close = daily_bars[['close']]
daily_close = bench_raw[['close']]
daily_close = daily_close.pct_change(1).iloc[1:]
try:
@@ -430,6 +367,7 @@ def ensure_benchmark_data(symbol, first_date, last_date, now, trading_day,
logger.warn("Still don't have expected data after redownload!")
return data
def ensure_benchmark_data(symbol, first_date, last_date, now, trading_day,
environ=None):
"""
@@ -544,11 +482,6 @@ def ensure_treasury_data(symbol, first_date, last_date, now, environ=None):
def _load_cached_data(filename, first_date, last_date, now, resource_name,
environ=None):
if resource_name == 'benchmark':
from_csv = pd.Series.from_csv
else:
from_csv = pd.DataFrame.from_csv
# Path for the cache.
path = get_data_filepath(filename, environ)
@@ -556,8 +489,10 @@ def _load_cached_data(filename, first_date, last_date, now, resource_name,
# yet, so don't try to read from 'path'.
if os.path.exists(path):
try:
data = from_csv(path)
data.index = pd.to_datetime(data.index).tz_localize('UTC')
data = pd.DataFrame.from_csv(path)
if data.empty:
raise ValueError("File is empty.")
data.index = pd.to_datetime(data.index, infer_datetime_format=True, errors='coerce' ).tz_localize('UTC')
if has_data_for_dates(data, first_date, last_date):
return data
+18 -8
View File
@@ -73,7 +73,10 @@ from catalyst.utils.sqlite_utils import (
coerce_string_to_conn,
)
from catalyst.utils.memoize import lazyval
from catalyst.utils.cli import maybe_show_progress
from catalyst.utils.cli import (
item_show_count,
maybe_show_progress,
)
from ._equities import _compute_row_slices, _read_bcolz_data
from ._adjustments import load_adjustments_from_sqlite
@@ -117,7 +120,15 @@ UINT64_MAX = iinfo(uint64).max
def check_uint32_safe(value, colname):
if value >= UINT32_MAX:
raise ValueError(
"Value %s from column '%s' is too large" % (value, colname)
"Value %s from column '%s' is too large "
"for uint32" % (value, colname)
)
def check_uint64_safe(value, colname):
if value >= UINT64_MAX:
raise ValueError(
"Value %s from column '%s' is too large "
"for uint64" % (value, colname)
)
@@ -218,10 +229,7 @@ class BcolzDailyBarWriter(object):
@property
def progress_bar_message(self):
return "Merging daily equity files:"
def progress_bar_item_show_func(self, value):
return value if value is None else str(value[0])
return 'Compiling daily data'
def write(self,
data,
@@ -249,15 +257,17 @@ class BcolzDailyBarWriter(object):
table : bcolz.ctable
The newly-written table.
"""
total = None if assets is None else len(assets)
ctx = maybe_show_progress(
(
(sid, self.to_ctable(df, invalid_data_behavior))
for sid, df in data
),
show_progress=show_progress,
item_show_func=self.progress_bar_item_show_func,
label=self.progress_bar_message,
length=len(assets) if assets is not None else None,
item_show_func=item_show_count(total),
length=total,
show_percent=False,
)
with ctx as it:
return self._write_internal(it, assets)
+7 -1
View File
@@ -25,7 +25,7 @@ from catalyst.api import (
def initialize(context):
context.ASSET_NAME = 'USDT_ETH'
context.ASSET_NAME = 'USDT_BTC'
context.TARGET_HODL_RATIO = 0.8
context.RESERVE_RATIO = 1.0 - context.TARGET_HODL_RATIO
@@ -37,7 +37,13 @@ def initialize(context):
context.is_buying = True
context.asset = symbol(context.ASSET_NAME)
context.i = 0
def handle_data(context, data):
context.i += 1
print 'i:', context.i
starting_cash = context.portfolio.starting_cash
target_hodl_value = context.TARGET_HODL_RATIO * starting_cash
reserve_value = context.RESERVE_RATIO * starting_cash
+1 -1
View File
@@ -52,7 +52,7 @@ def initialize(context):
schedule_function(
rebalance,
date_rules.every_day(),
time_rules=times_rules.every_minute(),
)
+15
View File
@@ -111,6 +111,21 @@ class PerformanceTracker(object):
self.treasury_curves,
self.trading_calendar
)
elif self.emission_rate == '5-minute':
self.all_benchmark_returns = pd.Series(
index=pd.date_range(
self.sim_params.first_open,
self.sim_params.last_close,
freq='5min'
),
)
self.cumulative_risk_metrics = \
risk.RiskMetricsCumulative(
self.sim_params,
self.treasury_curves,
self.trading_calendar,
create_first_day_stats=True,
)
elif self.emission_rate == 'minute':
self.all_benchmark_returns = pd.Series(index=pd.date_range(
self.sim_params.first_open, self.sim_params.last_close,
+1 -1
View File
@@ -158,7 +158,7 @@ def choose_treasury(select_treasury, treasury_curves, start_session,
)
break
if search_day:
if search_day and trading_calendar.name != 'OPEN': # Supress warning for 'OPEN' calendar
if (search_dist is None or search_dist > 1) and \
search_days[0] <= end_session <= search_days[-1]:
message = "No rate within 1 trading day of end date = \
+23
View File
@@ -20,7 +20,9 @@ cimport cython
from cpython cimport bool
cdef np.int64_t _nanos_in_minute = 60000000000
cdef np.int64_t _nanos_in_five_minutes = 5 * _nanos_in_minute
NANOS_IN_MINUTE = _nanos_in_minute
NANOS_IN_FIVE_MINUTES = _nanos_in_five_minutes
cpdef enum:
BAR = 0
@@ -115,3 +117,24 @@ cdef class MinuteSimulationClock:
yield minute, BAR
if minute_emission:
yield minute, MINUTE_END
cdef class FiveMinuteSimulationClock(MinuteSimulationClock):
@cython.boundscheck(False)
@cython.wraparound(False)
cdef dict calc_minutes_by_session(self):
cdef dict five_minutes_by_session
cdef int session_idx
cdef np.int64_t session_nano
cdef np.ndarray[np.int64_t, ndim=1] five_minutes_nanos
five_minutes_by_session = {}
for session_idx, session_nano in enumerate(self.sessions_nanos):
five_minutes_nanos = np.arange(
self.market_opens_nanos[session_idx],
self.market_closes_nanos[session_idx],
_nanos_in_five_minutes
)
five_minutes_by_session[session_nano] = pd.to_datetime(
five_minutes_nanos, utc=True, box=True
)
return five_minutes_by_session
+2 -1
View File
@@ -34,6 +34,7 @@ class AlgorithmSimulator(object):
EMISSION_TO_PERF_KEY_MAP = {
'minute': 'minute_perf',
'5-minute': '5_minute_perf',
'daily': 'daily_perf'
}
@@ -201,7 +202,7 @@ class AlgorithmSimulator(object):
stack.enter_context(self.processor)
stack.enter_context(ZiplineAPI(self.algo))
if algo.data_frequency == 'minute':
if algo.data_frequency in set(('minute', '5-minute')):
def execute_order_cancellation_policy():
algo.blotter.execute_cancel_policy(SESSION_END)
@@ -33,13 +33,30 @@ class CryptoPricingLoader(PipelineLoader):
Delegates loading of baselines and adjustments.
"""
def __init__(self, raw_price_loader, dataset):
self.raw_price_loader = raw_price_loader
self._columns = dataset.columns
def __init__(self, bundle, data_frequency, dataset):
cal = get_calendar('OPEN')
self._all_sessions = cal.all_sessions
if data_frequency == 'daily':
reader = bundle.daily_bar_reader
all_sessions = cal.all_sessions
elif data_frequency == '5-minute':
reader = bundle.five_minute_bar_reader
all_sessions = cal.all_five_minutes
elif data_frequency == 'minute':
reader = bundle.minute_bar_reader
all_sessions = cal.all_minutes
else:
raise ValueError(
'Invalid data frequency: {}'.format(data_frequency)
)
self.raw_price_loader = reader
self._columns = dataset.columns
self._all_sessions = all_sessions
@classmethod
def from_files(cls, pricing_path):
@@ -89,6 +106,7 @@ class CryptoPricingLoader(PipelineLoader):
def _shift_dates(dates, start_date, end_date, shift):
try:
start = dates.get_loc(start_date)
except KeyError:
@@ -36,15 +36,34 @@ class USEquityPricingLoader(PipelineLoader):
Delegates loading of baselines and adjustments.
"""
def __init__(self, raw_price_loader, adjustments_loader, dataset):
self.raw_price_loader = raw_price_loader
self.adjustments_loader = adjustments_loader
def __init__(self, bundle, data_frequency, dataset):
if data_frequency == 'daily':
reader = bundle.daily_bar_reader
elif data_frequency == '5-minute':
reader = bundle.five_minute_bar_reader
elif daily_bar_reader == 'minute':
reader = bundle.minute_bar_reader
else:
raise ValueError(
'Invalid data frequency: {}'.format(data_frequency)
)
cal = reader.trading_calendar or get_calendar('NYSE')
if data_frequency == 'daily':
all_sessions = cal.all_sessions
elif data_frequency == '5-minute':
reader = bundle.five_minute_bar_reader
all_sessions = cal.all_five_minutes
elif daily_bar_reader == 'minute':
reader = bundle.minute_bar_reader
all_sessions = cal.all_minutes
self.raw_price_loader = reader
self.adjustments_loader = bundle.adjustments_loader
self._columns = dataset.columns
cal = self.raw_price_loader.trading_calendar or \
get_calendar("NYSE")
self._all_sessions = cal.all_sessions
self._all_sessions = all_sessions
@classmethod
def from_files(cls, pricing_path, adjustments_path):
+28
View File
@@ -65,6 +65,19 @@ class BenchmarkSource(object):
)
self._precalculated_series = minute_series
elif self.emission_rate == '5-minute':
five_minutes = \
trading_calendar.five_minutes_for_sessions_in_range(
sessions[0],
sessions[-1],
)
five_minute_series = daily_series.reindex(
index=five_minutes,
method='ffill',
)
self._precalculated_series = five_minute_series
else:
self._precalculated_series = daily_series
else:
@@ -155,6 +168,21 @@ class BenchmarkSource(object):
ffill=True
)[asset]
return benchmark_series.pct_change()[1:]
elif self.emission_rate == '5-minute':
five_minutes = trading_calendar.five_minutes_for_sessions_in_range(
self.sessions[0], self.sessions[-1]
)
benchmark_series = data_portal.get_history_window(
[asset],
five_minutes[-1],
bar_count=len(five_minutes) + 1,
frequency='5m',
field='price',
data_frequency=self.emission_rate,
ffill=True,
)[asset]
return benchmark_series.pct_change()[1:]
else:
start_date = asset.start_date
+1 -1
View File
@@ -25,7 +25,7 @@ _default_calendar_factories = {
'us_futures': QuantopianUSFuturesCalendar,
}
_default_calendar_aliases = {
'CATX': 'OPEN',
'POLO': 'OPEN',
'NASDAQ': 'NYSE',
'BATS': 'NYSE',
'CBOT': 'CME',
@@ -1,6 +1,7 @@
from datetime import time
from pytz import timezone
from pandas import Timestamp
from pandas.tseries.offsets import DateOffset
from catalyst.utils.memoize import lazyval
@@ -28,3 +29,6 @@ class OpenExchangeCalendar(TradingCalendar):
@lazyval
def day(self):
return DateOffset(days=1)
def __init__(self, *args, **kwargs):
super(OpenExchangeCalendar, self).__init__(start=Timestamp('2015-03-01', tz='UTC'), **kwargs)
+67 -4
View File
@@ -117,6 +117,9 @@ class TradingCalendar(with_metaclass(ABCMeta)):
self._trading_minutes_nanos = self.all_minutes.values.\
astype(np.int64)
self._trading_five_minutes_nanos = self.all_five_minutes.values.\
astype(np.int64)
self.first_trading_session = _all_days[0]
self.last_trading_session = _all_days[-1]
@@ -179,6 +182,18 @@ class TradingCalendar(with_metaclass(ABCMeta)):
"""
return int(self._minutes_per_session[start_session:end_session].sum())
@lazyval
def _five_minutes_per_session(self):
diff = self.schedule.market_close - self.schedule.market_open
diff = diff.astype('timedelta64[m]')
return (diff + 1) // 5
def five_minutes_count_for_sessions_in_range(self,
start_session,
end_session):
five_mins = self._five_minutes_per_session[start_session:end_session]
return int(five_mins.sum())
@property
def regular_holidays(self):
"""
@@ -371,6 +386,10 @@ class TradingCalendar(with_metaclass(ABCMeta)):
idx = next_divider_idx(self._trading_minutes_nanos, dt.value)
return self.all_minutes[idx]
def next_five_minute(self, dt):
idx = next_divider_idx(self._trading_five_minutes_nanos, dt.values)
return self.all_five_mintutes[idx]
def previous_minute(self, dt):
"""
Given a dt, return the previous exchange minute.
@@ -465,6 +484,12 @@ class TradingCalendar(with_metaclass(ABCMeta)):
end_minute=self.schedule.at[session_label, 'market_close'],
)
def five_minutes_for_session(self, session_label):
return self.five_minutes_in_range(
start_five_minute=self.schedule.at[session_label, 'market_open'],
end_five_minute=self.schedule.at[session_label, 'market_close'],
)
def minutes_window(self, start_dt, count):
start_dt_nanos = start_dt.value
all_minutes_nanos = self._trading_minutes_nanos
@@ -566,6 +591,20 @@ class TradingCalendar(with_metaclass(ABCMeta)):
return abs(end_idx - start_idx)
def five_minutes_in_range(self, start_five_minute, end_five_minute):
start_idx = searchsorted(self._trading_five_minutes_nanos,
start_five_minute.value)
end_idx = searchsorted(self._trading_five_minutes_nanos,
end_five_minute.value)
if end_five_minute.value == self._trading_five_minutes_nanos[end_idx]:
# if the end minute is a market minute, increase by 1
end_idx += 1
return self.all_five_minutes[start_idx:end_idx]
def minutes_in_range(self, start_minute, end_minute):
"""
Given start and end minutes, return all the calendar minutes
@@ -623,6 +662,15 @@ class TradingCalendar(with_metaclass(ABCMeta)):
return self.minutes_in_range(first_minute, last_minute)
def five_minutes_for_sessions_in_range(self,
start_session_label,
end_session_label):
first_minute, _ = self.open_and_close_for_session(start_session_label)
_, last_minute = self.open_and_close_for_session(end_session_label)
return self.five_minutes_in_range(first_minute, last_minute)
def open_and_close_for_session(self, session_label):
"""
Returns a tuple of timestamps of the open and close of the session
@@ -690,8 +738,7 @@ class TradingCalendar(with_metaclass(ABCMeta)):
def execution_time_from_close(self, close_dates):
return close_dates
@lazyval
def all_minutes(self):
def _all_minutes_with_interval(self, interval):
"""
Returns a DatetimeIndex representing all the minutes in this calendar.
"""
@@ -703,8 +750,10 @@ class TradingCalendar(with_metaclass(ABCMeta)):
deltas = closes_in_ns - opens_in_ns
nanos_in_interval = interval * NANOS_IN_MINUTE
# + 1 because we want 390 days per standard day, not 389
daily_sizes = (deltas / NANOS_IN_MINUTE) + 1
daily_sizes = (deltas / nanos_in_interval) + 1
num_minutes = np.sum(daily_sizes).astype(np.int64)
# One allocation for the entire thing. This assumes that each day
@@ -721,13 +770,27 @@ class TradingCalendar(with_metaclass(ABCMeta)):
np.arange(
opens_in_ns[day_idx],
closes_in_ns[day_idx] + NANOS_IN_MINUTE,
NANOS_IN_MINUTE
nanos_in_interval
)
idx += size_int
return DatetimeIndex(all_minutes).tz_localize("UTC")
@lazyval
def all_five_minutes(self):
"""
Returns a DatetimeIndex representing all the five minutes in this calendar.
"""
return self._all_minutes_with_interval(5)
@lazyval
def all_minutes(self):
"""
Returns a DatetimeIndex representing all the minutes in this calendar.
"""
return self._all_minutes_with_interval(1)
@preprocess(dt=coerce(pd.Timestamp, attrgetter('value')))
def minute_to_session_label(self, dt, direction="next"):
"""
+28 -1
View File
@@ -1,10 +1,34 @@
from itertools import count
import click
import pandas as pd
from .context_tricks import CallbackManager
DEFAULT_BAR_TEMPLATE = ' [%(bar)s] %(label)s: %(info)s'
DEFAULT_EMPTY_CHAR = ' '
DEFAULT_FILL_CHAR = '='
def maybe_show_progress(it, show_progress, **kwargs):
def item_show_count(total=None):
def maybe_show_total(index):
if total is not None:
return '{0}/{1}'.format(index, total)
return str(index)
def item_show_func(item, _it=iter(count())):
if item is not None:
starting = False
return maybe_show_total(next(_it))
return 'DONE'
return item_show_func
def maybe_show_progress(it,
show_progress,
empty_char=DEFAULT_EMPTY_CHAR,
fill_char=DEFAULT_FILL_CHAR,
bar_template=DEFAULT_BAR_TEMPLATE,
**kwargs):
"""Optionally show a progress bar for the given iterator.
Parameters
@@ -30,6 +54,9 @@ def maybe_show_progress(it, show_progress, **kwargs):
...
"""
if show_progress:
kwargs['bar_template'] = bar_template
kwargs['empty_char'] = empty_char
kwargs['fill_char'] = fill_char
return click.progressbar(it, **kwargs)
# context manager that just return `it` when we enter it
+1
View File
@@ -602,6 +602,7 @@ class date_rules(object):
class time_rules(object):
market_open = AfterOpen
market_close = BeforeClose
every_5_minutes = Always
every_minute = Always
+10 -8
View File
@@ -156,15 +156,15 @@ def _run(handle_data,
environ=environ,
)
first_trading_day =\
bundle_data.equity_minute_bar_reader.first_trading_day
first_trading_day = bundle_data.minute_bar_reader.first_trading_day
data = DataPortal(
env.asset_finder,
open_calendar,
first_trading_day=first_trading_day,
equity_minute_reader=bundle_data.equity_minute_bar_reader,
equity_daily_reader=bundle_data.equity_daily_bar_reader,
minute_reader=bundle_data.minute_bar_reader,
five_minute_reader=bundle_data.five_minute_bar_reader,
daily_reader=bundle_data.daily_bar_reader,
adjustment_reader=bundle_data.adjustment_reader,
)
@@ -179,13 +179,14 @@ def _run(handle_data,
if b == 'poloniex':
return CryptoPricingLoader(
bundle_data.equity_daily_bar_reader,
bundle_data,
data_frequency,
CryptoPricing,
)
elif b == 'quantopian-quandl':
elif b == 'quandl':
return USEquityPricingLoader(
bundle_data.equity_daily_bar_reader,
bundle_data.adjustment_reader,
bundle_data,
data_frequency,
USEquityPricing,
)
raise ValueError(
@@ -216,6 +217,7 @@ def _run(handle_data,
end=end,
capital_base=capital_base,
data_frequency=data_frequency,
emission_rate=data_frequency,
),
**{
'initialize': initialize,