WIP: Five Minute bars and FiveMinuteSimulationClock

This commit is contained in:
Conner Fromknecht
2017-07-11 13:20:19 -07:00
parent aeb6c01272
commit 826ad061f6
10 changed files with 231 additions and 128 deletions
+2 -2
View File
@@ -1,4 +1,4 @@
Dear Zipline Maintainers,
Dear Catalyst Maintainers,
Before I tell you about my issue, let me describe my environment:
@@ -7,7 +7,7 @@ Before I tell you about my issue, let me describe my environment:
* Operating System: (Windows Version or `$ uname --all`)
* Python Version: `$ python --version`
* Python Bitness: `$ python -c 'import math, sys;print(int(math.log(sys.maxsize + 1, 2) + 1))'`
* How did you install Zipline: (`pip`, `conda`, or `other (please explain)`)
* How did you install Catalyst: (`pip`, `conda`, or `other (please explain)`)
* Python packages: `$ pip freeze` or `$ conda list`
Now that you know a little about me, let me tell you about the issue I am
+1 -1
View File
@@ -123,7 +123,7 @@ def ipython_only(option):
)
@click.option(
'--data-frequency',
type=click.Choice({'daily', 'minute'}),
type=click.Choice({'daily', '5-minute', 'minute'}),
default='daily',
show_default=True,
help='The data frequency of the simulation.',
+41 -16
View File
@@ -133,7 +133,10 @@ from catalyst.utils.security_list import SecurityList
import catalyst.protocol
from catalyst.sources.requests_csv import PandasRequestsCSV
from catalyst.gens.sim_engine import MinuteSimulationClock
from catalyst.gens.sim_engine import (
MinuteSimulationClock,
FiveMinuteSimulationClock,
)
from catalyst.sources.benchmark_source import BenchmarkSource
from catalyst.catalyst_warnings import ZiplineDeprecationWarning
@@ -170,7 +173,7 @@ class TradingAlgorithm(object):
algo_filename : str, optional
The filename for the algoscript. This will be used in exception
tracebacks. default: '<string>'.
data_frequency : {'daily', 'minute'}, optional
data_frequency : {'daily', '5-minute', 'minute'}, optional
The duration of the bars.
instant_fill : bool, optional
Whether to fill orders immediately or on next bar. default: False
@@ -223,7 +226,7 @@ class TradingAlgorithm(object):
script : str
Algoscript that contains initialize and
handle_data function definition.
data_frequency : {'daily', 'minute'}
data_frequency : {'daily', '5-minute', 'minute'}
The duration of the bars.
capital_base : float <default: 1.0e5>
How much capital to start with.
@@ -449,7 +452,7 @@ class TradingAlgorithm(object):
self._in_before_trading_start = True
with handle_non_market_minutes(data) if \
self.data_frequency == "minute" else ExitStack():
self.data_frequency in ('minute', '5-minute') else ExitStack():
self._before_trading_start(self, data)
self._in_before_trading_start = False
@@ -505,10 +508,11 @@ class TradingAlgorithm(object):
market_closes = trading_o_and_c['market_close']
minutely_emission = False
if self.sim_params.data_frequency == 'minute':
if self.sim_params.data_frequency in set(('minute', '5-minute')):
market_opens = trading_o_and_c['market_open']
minutely_emission = self.sim_params.emission_rate == "minute"
minutely_emission = self.sim_params.emission_rate in \
set(('minute', '5-minute'))
else:
# in daily mode, we want to have one bar per session, timestamped
# as the last minute of the session.
@@ -528,10 +532,19 @@ class TradingAlgorithm(object):
# FIXME generalize these values
before_trading_start_minutes = days_at_time(
self.sim_params.sessions,
time(8, 45),
"US/Eastern"
time(0, 0),
'UTC',
)
if self.sim_params.data_frequency == '5-minute':
return FiveMinuteSimulationClock(
self.sim_params.sessions,
execution_opens,
execution_closes,
before_trading_start_minutes,
minute_emission=minutely_emission,
)
return MinuteSimulationClock(
self.sim_params.sessions,
execution_opens,
@@ -660,8 +673,11 @@ class TradingAlgorithm(object):
# Assume data is daily if timestamp times are
# standardized, otherwise assume minute bars.
times = data.major_axis.time
if np.all(times == times[0]):
time_count = times.nunique()
if time_count == 1:
self.sim_params.data_frequency = 'daily'
elif time_count == 288:
self.sim_params.data_frequency = '5-minute'
else:
self.sim_params.data_frequency = 'minute'
@@ -683,6 +699,8 @@ class TradingAlgorithm(object):
if self.sim_params.data_frequency == 'daily':
equity_reader_arg = 'equity_daily_reader'
elif self.sim_params.data_frequency == '5-minute':
equity_daily_reader = 'equity_5_minute_reader'
elif self.sim_params.data_frequency == 'minute':
equity_reader_arg = 'equity_minute_reader'
equity_reader = PanelBarReader(
@@ -926,9 +944,9 @@ class TradingAlgorithm(object):
The arena from the simulation parameters. This will normally
be ``'backtest'`` but some systems may use this distinguish
live trading from backtesting.
data_frequency : {'daily', 'minute'}
data_frequency : {'daily', '5-minute', 'minute'}
data_frequency tells the algorithm if it is running with
daily data or minute data.
daily, minute, or five-minute mode.
start : datetime
The start date for the simulation.
end : datetime
@@ -1102,12 +1120,19 @@ class TradingAlgorithm(object):
'date_rule. You should use keyword argument '
'time_rule= when calling schedule_function without '
'specifying a date_rule', stacklevel=3)
freq = self.sim_params.data_frequency
date_rule = date_rule or date_rules.every_day()
time_rule = ((time_rule or time_rules.every_minute())
if self.sim_params.data_frequency == 'minute' else
# If we are in daily mode the time_rule is ignored.
time_rules.every_minute())
if freq is 'daily':
# ignore time rule in daily mode
time_rule = time_rules.every_minute()
else:
# use provided time rule or default to every minute or 5 minutes
# based on desired data frequency.
time_rule = time_rule or (time_rules.every_5_minutes()
if freq is '5-minute' else
time_rules.every_minute())
# Check the type of the algorithm's schedule before pulling calendar
# Note that the ExchangeTradingSchedule is currently the only
@@ -1782,7 +1807,7 @@ class TradingAlgorithm(object):
@data_frequency.setter
def data_frequency(self, value):
assert value in ('daily', 'minute')
assert value in ('daily', '5-minute', 'minute')
self.sim_params.data_frequency = value
@api_method
+107 -73
View File
@@ -117,24 +117,8 @@ class AbstractBundle(object):
symbol_map = raw_metadata.symbol
daily_bar_writer.write(
self._fetch_symbol_iter(
api_key,
cache,
symbol_map,
calendar,
start_session,
end_session,
'daily',
retries,
),
assets=raw_metadata.index,
show_progress=show_progress,
)
"""
for data_frequency in self.frequencies:
self._write_symbol_for_freq(
if 'daily' in self.frequencies:
daily_bar_writer.write(
self._fetch_symbol_iter(
api_key,
cache,
@@ -142,22 +126,31 @@ class AbstractBundle(object):
calendar,
start_session,
end_session,
data_frequency,
'daily',
retries,
),
data_frequency,
daily_bar_writer,
minute_bar_writer,
assets=raw_metadata.index,
show_progress=show_progress,
)
"""
metadata = self._post_process_metadata(raw_metadata, cache)
asset_db_writer.write(metadata)
if '5-minute' in self.frequencies:
minute_bar_writer.write(
self._fetch_symbol_iter(
api_key,
cache,
symbol_map,
calendar,
start_session,
end_session,
'5-minute',
retries,
),
show_progress=show_progress,
)
adjustment_writer.write()
else:
self._download(show_progress, output_dir)
@@ -188,7 +181,7 @@ class AbstractBundle(object):
raw_iter = self._fetch_metadata_iter(api_key, cache, retries, environ)
def item_show_func(_, _it=iter(count())):
return 'Downloading metadata page: {0}'.format(next(_it))
return 'Downloading metadata: {0}'.format('.' * next(_it))
with maybe_show_progress(
raw_iter,
@@ -263,65 +256,108 @@ class AbstractBundle(object):
calendar,
start_session,
end_session,
frequency,
data_frequency,
retries):
for asset_id, symbol in symbol_map.iteritems():
# Record start time of iteration, compare at end of iteration to
# adhere to the data source's rate limit policy.
start_time = pd.Timestamp.utcnow()
try:
raw_data = cache[symbol]
except KeyError:
raw_data = None
if raw_data is not None and not raw_data.empty:
last = raw_data.index[-1].tz_localize('UTC')
else:
last = start_session
next_start_time = last + pd.Timedelta(minutes=5)
if start_time > next_start_time:
raw_diff = self.fetch_raw_symbol_frame(
api_key,
symbol,
last,
end_session,
frequency,
)
raw_diff = raw_diff[
(raw_diff.index >= last) &
(raw_diff.index <= end_session)
]
# Fetch new data if cached data is absent or stale, otherwise
# returns the cached data unaltered. The `should_sleep` flag
# indicates that an API call was attempted, and that we should
# ensure we aren't exceeding our rate limit before proceeding to the
# next symbol. If the raw_data is updated, it is cached before being
# returned.
raw_data, should_sleep = self._maybe_update_symbol_frame(
start_time,
api_key,
cache,
symbol,
start_session,
end_session,
data_frequency,
)
raw_data = cache[symbol] = (
raw_data.append(raw_diff)
if raw_data is not None else
raw_diff
)
raw_data = raw_data[~raw_data.index.duplicated(keep='last')]
should_sleep = True
else:
should_sleep = False
"""
sessions = calendar.sessions_in_range(start_session, end_session)
print 'raw_data before:\n', raw_data.head()
raw_data = raw_data.reindex(
sessions,
copy=False,
).fillna(0.0)
print 'raw_data after:\n', raw_data.head()
"""
# TODO(cfromknecht) further data validation?
# Pass asset_id and symbol data to writer.
yield asset_id, raw_data
# If an API call was made during this iteration and the time to
# reach this point was less than the inter-request `wait_time`,
# sleep until after enough time has elapsed to prevent getting rate
# limited.
if should_sleep:
remaining = pd.Timestamp.utcnow() - start_time + self.wait_time
if remaining.value > 0:
sleep(remaining.value / 10**9)
def _maybe_update_symbol_frame(self,
start_time,
api_key,
cache,
symbol,
start_session,
end_session,
data_frequency):
try:
raw_data = cache[symbol]
except KeyError:
raw_data = None
# Select the most recent date in cached dataset if it exists,
# otherwise use the provided `start_session`.
last = (
raw_data.index[-1].tz_localize('UTC')
if raw_data is not None and not raw_data.empty else
start_session
)
# Determine time at which cached data will be considered stale.
cache_expiration = last + pd.Timedelta(minutes=5)
if start_time <= cache_expiration:
# Data is fresh enough to reuse, no need to update. Iterator can
# proceed to next symbol directly since no API call was required.
should_sleep = False
else:
# Data for symbol is old enough to attempt an update or is not
# present in the cache. Fetch raw data for a single symbol
# with requested intervals and frequency.
raw_diff = self.fetch_raw_symbol_frame(
api_key,
symbol,
last,
end_session,
data_frequency,
)
# Filter incoming data to minimize overlap.
raw_diff = raw_diff[
(raw_diff.index >= last) &
(raw_diff.index <= end_session)
]
# Append incoming data to cached data if it exists,
# otherwise treat incoming data as the entire raw dataset.
raw_data = cache[symbol] = (
raw_data.append(raw_diff)
if raw_data is not None else
raw_diff
)
# Filter out any duplicates entries, keep last one as previous
# one was probably an incomplete frame.
raw_data = raw_data[~raw_data.index.duplicated(keep='last')]
# If we arrive here, we must have attempted an API call.
# This flag tells the iterator to pause before starting the next
# asset, that we don't exceed the data source's rate limit.
should_sleep = True
return raw_data, should_sleep
def _write_symbol_for_freq(self,
pricing_iter,
data_frequency,
@@ -338,13 +374,11 @@ class AbstractBundle(object):
elif data_frequency == '5-minute':
minute_bar_writer.write(
pricing_iter,
assets=assets,
show_progress=show_progress,
)
elif data_frequency == 'minute':
minute_bar_writer.write(
pricing_iter,
assets=assets,
show_progress=show_progress,
)
else:
+44 -33
View File
@@ -24,6 +24,10 @@ from bcolz import ctable
from intervaltree import IntervalTree
import logbook
import numpy as np
from numpy import (
iinfo,
uint64,
)
import pandas as pd
from pandas import HDFStore
import tables
@@ -39,23 +43,28 @@ from catalyst.data._minute_bar_internal import (
from catalyst.gens.sim_engine import NANOS_IN_MINUTE
from catalyst.data.bar_reader import BarReader, NoDataOnDate
from catalyst.data.us_equity_pricing import check_uint32_safe
from catalyst.data.us_equity_pricing import (
winsorise_uint64,
check_uint64_safe,
)
from catalyst.utils.calendars import get_calendar
from catalyst.utils.cli import maybe_show_progress
from catalyst.utils.memoize import lazyval
logger = logbook.Logger('FiveMinuteBars')
CRYPTO_ASSETS_FIVE_MINUTES_PER_DAY = 288
OPEN_FIVE_MINUTES_PER_DAY = 288
US_EQUITIES_MINUTES_PER_DAY = 390
FUTURES_MINUTES_PER_DAY = 1440
DEFAULT_EXPECTEDLEN = US_EQUITIES_MINUTES_PER_DAY * 252 * 15
DEFAULT_EXPECTED_CRYPTO_LEN = CRYPTO_ASSETS_FIVE_MINUTES_PER_DAY * 366 * 15
DEFAULT_EXPECTEDLEN_CRYPTO = OPEN_FIVE_MINUTES_PER_DAY * 366 * 15
OHLC_RATIO = 1000
OHLC_RATIO = 1000000
OHLC = frozenset(['open', 'high', 'low', 'close'])
OHLCV = frozenset(['open', 'high', 'low', 'close', 'volume'])
UINT64_MAX = iinfo(uint64).max
class BcolzFiveMinuteOverlappingData(Exception):
pass
@@ -68,7 +77,7 @@ class BcolzFiveMinuteWriterColumnMismatch(Exception):
class FiveMinuteBarReader(BarReader):
@property
def data_frequency(self):
return "five-minute"
return "5-minute"
def _calc_five_minute_index(market_opens, five_minutes_per_day):
@@ -116,19 +125,19 @@ def _sid_subdir_path(sid):
def convert_cols(cols, scale_factor, sid, invalid_data_behavior):
"""Adapt OHLCV columns into uint32 columns.
"""Adapt OHLCV columns into uint64 columns.
Parameters
----------
cols : dict
A dict mapping each column name (open, high, low, close, volume)
to a float column to convert to uint32.
to a float column to convert to uint64.
scale_factor : int
Factor to use to scale float values before converting to uint32.
Factor to use to scale float values before converting to uint64.
sid : int
Sid of the relevant asset, for logging.
invalid_data_behavior : str
Specifies behavior when data cannot be converted to uint32.
Specifies behavior when data cannot be converted to uint64.
If 'raise', raises an exception.
If 'warn', logs a warning and filters out incompatible values.
If 'ignore', silently filters out incompatible values.
@@ -137,6 +146,7 @@ def convert_cols(cols, scale_factor, sid, invalid_data_behavior):
scaled_highs = np.nan_to_num(cols['high']) * scale_factor
scaled_lows = np.nan_to_num(cols['low']) * scale_factor
scaled_closes = np.nan_to_num(cols['close']) * scale_factor
volumes = np.nan_to_num(cols['volume'])
exclude_mask = np.zeros_like(scaled_opens, dtype=bool)
@@ -145,11 +155,12 @@ def convert_cols(cols, scale_factor, sid, invalid_data_behavior):
('high', scaled_highs),
('low', scaled_lows),
('close', scaled_closes),
('volume', volumes),
]:
max_val = scaled_col.max()
try:
check_uint32_safe(max_val, col_name)
check_uint64_safe(max_val, col_name)
except ValueError:
if invalid_data_behavior == 'raise':
raise
@@ -157,20 +168,20 @@ def convert_cols(cols, scale_factor, sid, invalid_data_behavior):
if invalid_data_behavior == 'warn':
logger.warn(
'Values for sid={}, col={} contain some too large for '
'uint32 (max={}), filtering them out',
'uint64 (max={}), filtering them out',
sid, col_name, max_val,
)
# We want to exclude all rows that have an unsafe value in
# this column.
exclude_mask &= (scaled_col >= np.iinfo(np.uint32).max)
exclude_mask &= (scaled_col >= iinfo(uint64).max)
# Convert all cols to uint32.
opens = scaled_opens.astype(np.uint32)
highs = scaled_highs.astype(np.uint32)
lows = scaled_lows.astype(np.uint32)
closes = scaled_closes.astype(np.uint32)
volumes = cols['volume'].astype(np.uint32)
# Convert all cols to uint64.
opens = scaled_opens.astype(uint64)
highs = scaled_highs.astype(uint64)
lows = scaled_lows.astype(uint64)
closes = scaled_closes.astype(uint64)
volumes = volumes.astype(uint64)
# Exclude rows with unsafe values by setting to zero.
opens[exclude_mask] = 0
@@ -290,7 +301,7 @@ class BcolzFiveMinuteBarMetadata(object):
ohlc_ratio : int
The default ratio by which to multiply the pricing data to
convert the floats from floats to an integer to fit within
the np.uint32. If ohlc_ratios_per_sid is None or does not
the np.uint64. If ohlc_ratios_per_sid is None or does not
contain a mapping for a given sid, this ratio is used.
ohlc_ratios_per_sid : dict
A dict mapping each sid in the output to the factor by
@@ -374,13 +385,13 @@ class BcolzFiveMinuteBarWriter(object):
The last trading session in the data set.
default_ohlc_ratio : int, optional
The default ratio by which to multiply the pricing data to
convert from floats to integers that fit within np.uint32. If
convert from floats to integers that fit within np.uint64. If
ohlc_ratios_per_sid is None or does not contain a mapping for a
given sid, this ratio is used. Default is OHLC_RATIO (1000000).
ohlc_ratios_per_sid : dict, optional
A dict mapping each sid in the output to the ratio by which to
multiply the pricing data to convert the floats from floats to
an integer to fit within the np.uint32.
an integer to fit within the np.uint64.
expectedlen : int, optional
The expected length of the dataset, used when creating the initial
bcolz ctable.
@@ -405,9 +416,9 @@ class BcolzFiveMinuteBarWriter(object):
The open, high, low, and close columns are integers which are 1000 times
the quoted price, so that the data can be represented and stored as an
np.uint32, supporting market prices quoted up to the thousands place.
np.uint64, supporting market prices quoted up to the thousands place.
volume is a np.uint32 with no mutation of the tens place.
volume is a np.uint64 with no mutation of the tens place.
The 'index' for each individual asset are a repeating period of minutes of
length `minutes_per_day` starting from each market open.
@@ -575,7 +586,7 @@ class BcolzFiveMinuteBarWriter(object):
if not os.path.exists(sid_containing_dirname):
# Other sids may have already created the containing directory.
os.makedirs(sid_containing_dirname)
initial_array = np.empty(0, np.uint32)
initial_array = np.empty(0, np.uint64)
table = ctable(
rootdir=path,
columns=[
@@ -612,7 +623,7 @@ class BcolzFiveMinuteBarWriter(object):
five_minute_offset = len(table) % self._five_minutes_per_day
num_to_prepend = numdays * self._five_minutes_per_day - five_minute_offset
prepend_array = np.zeros(num_to_prepend, np.uint32)
prepend_array = np.zeros(num_to_prepend, np.uint64)
# Fill all OHLCV with zeros.
table.append([prepend_array] * 5)
table.flush()
@@ -817,11 +828,11 @@ class BcolzFiveMinuteBarWriter(object):
minutes_count = all_minutes_in_window.size
open_col = np.zeros(minutes_count, dtype=np.uint32)
high_col = np.zeros(minutes_count, dtype=np.uint32)
low_col = np.zeros(minutes_count, dtype=np.uint32)
close_col = np.zeros(minutes_count, dtype=np.uint32)
vol_col = np.zeros(minutes_count, dtype=np.uint32)
open_col = np.zeros(minutes_count, dtype=uint64)
high_col = np.zeros(minutes_count, dtype=uint64)
low_col = np.zeros(minutes_count, dtype=uint64)
close_col = np.zeros(minutes_count, dtype=uint64)
vol_col = np.zeros(minutes_count, dtype=uint64)
dt_ixs = np.searchsorted(all_minutes_in_window.values,
dts.astype('datetime64[ns]'))
@@ -1250,7 +1261,7 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader):
if field != 'volume':
out = np.full(shape, np.nan)
else:
out = np.zeros(shape, dtype=np.uint32)
out = np.zeros(shape, dtype=int64)
for i, sid in enumerate(sids):
carray = self._open_minute_file(field, sid)
+9 -1
View File
@@ -117,7 +117,15 @@ UINT64_MAX = iinfo(uint64).max
def check_uint32_safe(value, colname):
if value >= UINT32_MAX:
raise ValueError(
"Value %s from column '%s' is too large" % (value, colname)
"Value %s from column '%s' is too large "
"for uint32" % (value, colname)
)
def check_uint64_safe(value, colname):
    """Raise ``ValueError`` if ``value`` is at or above ``UINT64_MAX``.

    Parameters
    ----------
    value : number
        The value to test against the module-level ``UINT64_MAX`` bound.
    colname : str
        Column name interpolated into the error message.

    Raises
    ------
    ValueError
        When ``value >= UINT64_MAX``.
    """
    # Negating the original comparison (rather than writing `value < MAX`)
    # keeps the outcome identical for values that compare False both ways.
    within_bound = not (value >= UINT64_MAX)
    if within_bound:
        return

    message = (
        "Value %s from column '%s' is too large "
        "for uint64" % (value, colname)
    )
    raise ValueError(message)
+23
View File
@@ -20,7 +20,9 @@ cimport cython
from cpython cimport bool
cdef np.int64_t _nanos_in_minute = 60000000000
cdef np.int64_t _nanos_in_five_minutes = 5 * _nanos_in_minute
NANOS_IN_MINUTE = _nanos_in_minute
NANOS_IN_FIVE_MINUTES = _nanos_in_five_minutes
cpdef enum:
BAR = 0
@@ -115,3 +117,24 @@ cdef class MinuteSimulationClock:
yield minute, BAR
if minute_emission:
yield minute, MINUTE_END
cdef class FiveMinuteSimulationClock(MinuteSimulationClock):
    # Variant of MinuteSimulationClock that overrides how the per-session
    # timestamps are generated, stepping by five minutes' worth of
    # nanoseconds (_nanos_in_five_minutes).

    @cython.boundscheck(False)
    @cython.wraparound(False)
    cdef dict calc_minutes_by_session(self):
        # Build a dict keyed by each session's nanosecond value whose
        # entries are the five-minute timestamps for that session.
        cdef dict bars_by_session
        cdef int idx
        cdef np.int64_t session_key
        cdef np.ndarray[np.int64_t, ndim=1] bar_nanos

        bars_by_session = {}
        for idx, session_key in enumerate(self.sessions_nanos):
            # np.arange excludes its stop value, so extending the stop one
            # step past the close keeps a bar that lands exactly on the
            # close.
            bar_nanos = np.arange(
                self.market_opens_nanos[idx],
                self.market_closes_nanos[idx] + _nanos_in_five_minutes,
                _nanos_in_five_minutes
            )
            # Convert the nanosecond offsets to UTC datetimes.
            bars_by_session[session_key] = pd.to_datetime(
                bar_nanos, utc=True, box=True
            )

        return bars_by_session
+2 -1
View File
@@ -34,6 +34,7 @@ class AlgorithmSimulator(object):
EMISSION_TO_PERF_KEY_MAP = {
'minute': 'minute_perf',
'5-minute': '5_minute_perf',
'daily': 'daily_perf'
}
@@ -201,7 +202,7 @@ class AlgorithmSimulator(object):
stack.enter_context(self.processor)
stack.enter_context(ZiplineAPI(self.algo))
if algo.data_frequency == 'minute':
if algo.data_frequency in set(('minute', '5-minute')):
def execute_order_cancellation_policy():
algo.blotter.execute_cancel_policy(SESSION_END)
+1 -1
View File
@@ -25,7 +25,7 @@ _default_calendar_factories = {
'us_futures': QuantopianUSFuturesCalendar,
}
_default_calendar_aliases = {
'CATX': 'OPEN',
'POLO': 'OPEN',
'NASDAQ': 'NYSE',
'BATS': 'NYSE',
'CBOT': 'CME',
+1
View File
@@ -602,6 +602,7 @@ class date_rules(object):
class time_rules(object):
market_open = AfterOpen
market_close = BeforeClose
every_5_minutes = Always
every_minute = Always