Merged Victor's hack for the minute writer precision

This commit is contained in:
fredfortier
2017-09-21 16:36:10 -04:00
parent 7335810cc2
commit 2f8768bb06
+53 -53
View File
@@ -39,7 +39,7 @@ from catalyst.data._minute_bar_internal import (
from catalyst.gens.sim_engine import NANOS_IN_MINUTE
from catalyst.data.bar_reader import BarReader, NoDataOnDate
from catalyst.data.us_equity_pricing import check_uint32_safe
from catalyst.data.us_equity_pricing import check_uint64_safe
from catalyst.utils.calendars import get_calendar
from catalyst.utils.cli import maybe_show_progress
from catalyst.utils.memoize import lazyval
@@ -52,7 +52,7 @@ FUTURES_MINUTES_PER_DAY = 1440
DEFAULT_EXPECTEDLEN = US_EQUITIES_MINUTES_PER_DAY * 252 * 15
OHLC_RATIO = 1000
OHLC_RATIO = 100000000
class BcolzMinuteOverlappingData(Exception):
@@ -114,15 +114,15 @@ def _sid_subdir_path(sid):
def convert_cols(cols, scale_factor, sid, invalid_data_behavior):
"""Adapt OHLCV columns into uint32 columns.
"""Adapt OHLCV columns into uint64 columns.
Parameters
----------
cols : dict
A dict mapping each column name (open, high, low, close, volume)
to a float column to convert to uint32.
to a float column to convert to uint64.
scale_factor : int
Factor to use to scale float values before converting to uint32.
Factor to use to scale float values before converting to uint64.
sid : int
Sid of the relevant asset, for logging.
invalid_data_behavior : str
@@ -135,6 +135,7 @@ def convert_cols(cols, scale_factor, sid, invalid_data_behavior):
scaled_highs = np.nan_to_num(cols['high']) * scale_factor
scaled_lows = np.nan_to_num(cols['low']) * scale_factor
scaled_closes = np.nan_to_num(cols['close']) * scale_factor
scaled_volumes = np.nan_to_num(cols['volume']) * scale_factor
exclude_mask = np.zeros_like(scaled_opens, dtype=bool)
@@ -143,11 +144,12 @@ def convert_cols(cols, scale_factor, sid, invalid_data_behavior):
('high', scaled_highs),
('low', scaled_lows),
('close', scaled_closes),
('volume', scaled_volumes),
]:
max_val = scaled_col.max()
try:
check_uint32_safe(max_val, col_name)
check_uint64_safe(max_val, col_name)
except ValueError:
if invalid_data_behavior == 'raise':
raise
@@ -155,20 +157,20 @@ def convert_cols(cols, scale_factor, sid, invalid_data_behavior):
if invalid_data_behavior == 'warn':
logger.warn(
'Values for sid={}, col={} contain some too large for '
'uint32 (max={}), filtering them out',
'uint64 (max={}), filtering them out',
sid, col_name, max_val,
)
# We want to exclude all rows that have an unsafe value in
# this column.
exclude_mask &= (scaled_col >= np.iinfo(np.uint32).max)
exclude_mask &= (scaled_col >= np.iinfo(np.uint64).max)
# Convert all cols to uint32.
opens = scaled_opens.astype(np.uint32)
highs = scaled_highs.astype(np.uint32)
lows = scaled_lows.astype(np.uint32)
closes = scaled_closes.astype(np.uint32)
volumes = cols['volume'].astype(np.uint32)
opens = scaled_opens.astype(np.uint64)
highs = scaled_highs.astype(np.uint64)
lows = scaled_lows.astype(np.uint64)
closes = scaled_closes.astype(np.uint64)
volumes = scaled_volumes.astype(np.uint64)
# Exclude rows with unsafe values by setting to zero.
opens[exclude_mask] = 0
@@ -260,14 +262,14 @@ class BcolzMinuteBarMetadata(object):
)
def __init__(
self,
default_ohlc_ratio,
ohlc_ratios_per_sid,
calendar,
start_session,
end_session,
minutes_per_day,
version=FORMAT_VERSION,
self,
default_ohlc_ratio,
ohlc_ratios_per_sid,
calendar,
start_session,
end_session,
minutes_per_day,
version=FORMAT_VERSION,
):
self.calendar = calendar
self.start_session = start_session
@@ -288,7 +290,7 @@ class BcolzMinuteBarMetadata(object):
ohlc_ratio : int
The default ratio by which to multiply the pricing data to
convert the floats from floats to an integer to fit within
the np.uint32. If ohlc_ratios_per_sid is None or does not
the np.uint64. If ohlc_ratios_per_sid is None or does not
contain a mapping for a given sid, this ratio is used.
ohlc_ratios_per_sid : dict
A dict mapping each sid in the output to the factor by
@@ -340,10 +342,10 @@ class BcolzMinuteBarMetadata(object):
'first_trading_day': str(self.start_session.date()),
'market_opens': (
market_opens.values.astype('datetime64[m]').
astype(np.int64).tolist()),
astype(np.int64).tolist()),
'market_closes': (
market_closes.values.astype('datetime64[m]').
astype(np.int64).tolist()),
astype(np.int64).tolist()),
}
with open(self.metadata_path(rootdir), 'w+') as fp:
json.dump(metadata, fp)
@@ -372,13 +374,13 @@ class BcolzMinuteBarWriter(object):
The last trading session in the data set.
default_ohlc_ratio : int, optional
The default ratio by which to multiply the pricing data to
convert from floats to integers that fit within np.uint32. If
convert from floats to integers that fit within np.uint64. If
ohlc_ratios_per_sid is None or does not contain a mapping for a
given sid, this ratio is used. Default is OHLC_RATIO (1000).
given sid, this ratio is used. Default is OHLC_RATIO (10^8).
ohlc_ratios_per_sid : dict, optional
A dict mapping each sid in the output to the ratio by which to
multiply the pricing data to convert the floats from floats to
an integer to fit within the np.uint32.
an integer to fit within the np.uint64.
expectedlen : int, optional
The expected length of the dataset, used when creating the initial
bcolz ctable.
@@ -401,11 +403,9 @@ class BcolzMinuteBarWriter(object):
Each individual asset's data is stored as a bcolz table with a column for
each pricing field: (open, high, low, close, volume)
The open, high, low, and close columns are integers which are 1000 times
The open, high, low, close and volume columns are integers which are 10^8 times
the quoted price, so that the data can represented and stored as an
np.uint32, supporting market prices quoted up to the thousands place.
volume is a np.uint32 with no mutation of the tens place.
np.uint64, supporting market prices quoted up to the 1/10^8-th place.
The 'index' for each individual asset are a repeating period of minutes of
length `minutes_per_day` starting from each market open.
@@ -573,7 +573,7 @@ class BcolzMinuteBarWriter(object):
if not os.path.exists(sid_containing_dirname):
# Other sids may have already created the containing directory.
os.makedirs(sid_containing_dirname)
initial_array = np.empty(0, np.uint32)
initial_array = np.empty(0, np.uint64)
table = ctable(
rootdir=path,
columns=[
@@ -610,7 +610,7 @@ class BcolzMinuteBarWriter(object):
minute_offset = len(table) % self._minutes_per_day
num_to_prepend = numdays * self._minutes_per_day - minute_offset
prepend_array = np.zeros(num_to_prepend, np.uint32)
prepend_array = np.zeros(num_to_prepend, np.uint64)
# Fill all OHLCV with zeros.
table.append([prepend_array] * 5)
table.flush()
@@ -815,11 +815,11 @@ class BcolzMinuteBarWriter(object):
minutes_count = all_minutes_in_window.size
open_col = np.zeros(minutes_count, dtype=np.uint32)
high_col = np.zeros(minutes_count, dtype=np.uint32)
low_col = np.zeros(minutes_count, dtype=np.uint32)
close_col = np.zeros(minutes_count, dtype=np.uint32)
vol_col = np.zeros(minutes_count, dtype=np.uint32)
open_col = np.zeros(minutes_count, dtype=np.uint64)
high_col = np.zeros(minutes_count, dtype=np.uint64)
low_col = np.zeros(minutes_count, dtype=np.uint64)
close_col = np.zeros(minutes_count, dtype=np.uint64)
vol_col = np.zeros(minutes_count, dtype=np.uint64)
dt_ixs = np.searchsorted(all_minutes_in_window.values,
dts.astype('datetime64[ns]'))
@@ -914,10 +914,10 @@ class BcolzMinuteBarReader(MinuteBarReader):
)
self._schedule = self.calendar.schedule[slicer]
self._market_opens = self._schedule.market_open
self._market_open_values = self._market_opens.values.\
self._market_open_values = self._market_opens.values. \
astype('datetime64[m]').astype(np.int64)
self._market_closes = self._schedule.market_close
self._market_close_values = self._market_closes.values.\
self._market_close_values = self._market_closes.values. \
astype('datetime64[m]').astype(np.int64)
self._default_ohlc_inverse = 1.0 / metadata.default_ohlc_ratio
@@ -1125,8 +1125,8 @@ class BcolzMinuteBarReader(MinuteBarReader):
else:
return np.nan
if field != 'volume':
value *= self._ohlc_ratio_inverse_for_sid(sid)
#if field != 'volume':
value *= self._ohlc_ratio_inverse_for_sid(sid)
return value
def get_last_traded_dt(self, asset, dt):
@@ -1206,7 +1206,7 @@ class BcolzMinuteBarReader(MinuteBarReader):
minute_dt.value / NANOS_IN_MINUTE,
self._minutes_per_day,
False,
)
)
def load_raw_arrays(self, fields, start_dt, end_dt, sids):
"""
@@ -1248,7 +1248,7 @@ class BcolzMinuteBarReader(MinuteBarReader):
if field != 'volume':
out = np.full(shape, np.nan)
else:
out = np.zeros(shape, dtype=np.uint32)
out = np.zeros(shape, dtype=np.uint64)
for i, sid in enumerate(sids):
carray = self._open_minute_file(field, sid)
@@ -1256,17 +1256,17 @@ class BcolzMinuteBarReader(MinuteBarReader):
if indices_to_exclude is not None:
for excl_start, excl_stop in indices_to_exclude[::-1]:
excl_slice = np.s_[
excl_start - start_idx:excl_stop - start_idx + 1]
excl_start - start_idx:excl_stop - start_idx + 1]
values = np.delete(values, excl_slice)
where = values != 0
# first slice down to len(where) because we might not have
# written data for all the minutes requested
if field != 'volume':
out[:len(where), i][where] = (
values[where] * self._ohlc_ratio_inverse_for_sid(sid))
else:
out[:len(where), i][where] = values[where]
#if field != 'volume':
out[:len(where), i][where] = (
values[where] * self._ohlc_ratio_inverse_for_sid(sid))
#else:
# out[:len(where), i][where] = values[where]
results.append(out)
return results
@@ -1319,9 +1319,9 @@ class H5MinuteBarUpdateWriter(object):
def __init__(self, path, complevel=None, complib=None):
self._complevel = complevel if complevel \
is not None else self._COMPLEVEL
is not None else self._COMPLEVEL
self._complib = complib if complib \
is not None else self._COMPLIB
is not None else self._COMPLIB
self._path = path
def write(self, frames):
@@ -1358,4 +1358,4 @@ class H5MinuteBarUpdateReader(MinuteBarUpdateReader):
def read(self, dts, sids):
panel = self._panel[sids, dts, :]
return panel.iteritems()
return panel.iteritems()