mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-04 23:52:57 +08:00
Merge pull request #978 from quantopian/allow-empty-writes
ENH: Add padding method to minute bars writer.
This commit is contained in:
@@ -23,6 +23,7 @@ from pandas import (
|
||||
DataFrame,
|
||||
DatetimeIndex,
|
||||
Timestamp,
|
||||
NaT
|
||||
)
|
||||
from testfixtures import TempDirectory
|
||||
|
||||
@@ -399,3 +400,49 @@ class BcolzMinuteBarTestCase(TestCase):
|
||||
volume_price = self.reader.get_value(sid, minute, 'volume')
|
||||
|
||||
self.assertEquals(200.0, volume_price)
|
||||
|
||||
def test_pad_data(self):
|
||||
"""
|
||||
Test writing empty data.
|
||||
"""
|
||||
sid = 1
|
||||
last_date = self.writer.last_date_in_output_for_sid(sid)
|
||||
self.assertIs(last_date, NaT)
|
||||
|
||||
self.writer.pad(sid, TEST_CALENDAR_START)
|
||||
|
||||
last_date = self.writer.last_date_in_output_for_sid(sid)
|
||||
self.assertEqual(last_date, TEST_CALENDAR_START)
|
||||
|
||||
freq = self.market_opens.index.freq
|
||||
minute = self.market_opens[TEST_CALENDAR_START + freq]
|
||||
data = DataFrame(
|
||||
data={
|
||||
'open': [15.0],
|
||||
'high': [17.0],
|
||||
'low': [11.0],
|
||||
'close': [15.0],
|
||||
'volume': [100.0]
|
||||
},
|
||||
index=[minute])
|
||||
self.writer.write(sid, data)
|
||||
|
||||
open_price = self.reader.get_value(sid, minute, 'open')
|
||||
|
||||
self.assertEquals(15.0, open_price)
|
||||
|
||||
high_price = self.reader.get_value(sid, minute, 'high')
|
||||
|
||||
self.assertEquals(17.0, high_price)
|
||||
|
||||
low_price = self.reader.get_value(sid, minute, 'low')
|
||||
|
||||
self.assertEquals(11.0, low_price)
|
||||
|
||||
close_price = self.reader.get_value(sid, minute, 'close')
|
||||
|
||||
self.assertEquals(15.0, close_price)
|
||||
|
||||
volume_price = self.reader.get_value(sid, minute, 'volume')
|
||||
|
||||
self.assertEquals(100.0, volume_price)
|
||||
|
||||
+58
-29
@@ -282,6 +282,8 @@ class BcolzMinuteBarWriter(object):
|
||||
given sid.
|
||||
"""
|
||||
sizes_path = "{0}/close/meta/sizes".format(self.sidpath(sid))
|
||||
if not os.path.exists(sizes_path):
|
||||
return pd.NaT
|
||||
with open(sizes_path, mode='r') as f:
|
||||
sizes = f.read()
|
||||
data = json.loads(sizes)
|
||||
@@ -344,6 +346,48 @@ class BcolzMinuteBarWriter(object):
|
||||
table.append([prepend_array] * 5)
|
||||
table.flush()
|
||||
|
||||
def pad(self, sid, date):
|
||||
"""
|
||||
Fill sid container with empty data through the specified date.
|
||||
|
||||
e.g. if the date is two days after the last date in the sid's existing
|
||||
output, 2 x `minute_per_day` worth of zeros will be added to the
|
||||
output.
|
||||
|
||||
Parameters:
|
||||
-----------
|
||||
sid : int
|
||||
The asset identifier for the data being written.
|
||||
date : datetime-like
|
||||
The date used to calculate how many slots to be pad.
|
||||
The padding is done through the date, i.e. after the padding is
|
||||
done the `last_date_in_output_for_sid` will be equal to `date`
|
||||
"""
|
||||
table = self._ensure_ctable(sid)
|
||||
|
||||
last_date = self.last_date_in_output_for_sid(sid)
|
||||
|
||||
tds = self._trading_days
|
||||
|
||||
if date <= last_date or date < tds[0]:
|
||||
# No need to pad.
|
||||
return
|
||||
|
||||
if last_date == pd.NaT:
|
||||
# If there is no data, determine how many days to add so that
|
||||
# desired days are written to the correct slots.
|
||||
days_to_zerofill = tds[tds.slice_indexer(end=date)]
|
||||
else:
|
||||
days_to_zerofill = tds[tds.slice_indexer(
|
||||
start=last_date + tds.freq,
|
||||
end=date)]
|
||||
|
||||
self._zerofill(table, len(days_to_zerofill))
|
||||
|
||||
new_last_date = self.last_date_in_output_for_sid(sid)
|
||||
assert new_last_date == date, "new_last_date={0} != date={1}".format(
|
||||
new_last_date, date)
|
||||
|
||||
def write(self, sid, df):
|
||||
"""
|
||||
Write the OHLCV data for the given sid.
|
||||
@@ -373,40 +417,25 @@ class BcolzMinuteBarWriter(object):
|
||||
"""
|
||||
table = self._ensure_ctable(sid)
|
||||
|
||||
last_date = self.last_date_in_output_for_sid(sid)
|
||||
tds = self._trading_days
|
||||
days = tds[tds.slice_indexer(start=normalize_date(df.index[0]),
|
||||
end=normalize_date(df.index[-1]))]
|
||||
input_first_day = days[0]
|
||||
input_first_day = normalize_date(df.index[0])
|
||||
input_last_day = normalize_date(df.index[-1])
|
||||
|
||||
if last_date is pd.NaT:
|
||||
# If there is no data, determine how many days to add so that
|
||||
# desired days are written to the correct slots.
|
||||
days_to_zerofill = tds[tds.slice_indexer(end=input_first_day)]
|
||||
# Chop off the input first day.
|
||||
days_to_zerofill = days_to_zerofill[:-1]
|
||||
else:
|
||||
next_date = last_date + 1
|
||||
if next_date < input_first_day:
|
||||
# If last_date and input_first_day are not adjacent need to
|
||||
# fill in between.
|
||||
days_to_zerofill = tds[tds.slice_indexer(
|
||||
start=last_date + 1,
|
||||
end=input_first_day)]
|
||||
# Chop off the input first day.
|
||||
days_to_zerofill = days_to_zerofill[:-1]
|
||||
elif next_date > input_first_day:
|
||||
raise BcolzMinuteOverlappingData(dedent("""
|
||||
window start={0} is before expected write date={1} for
|
||||
sid={2}""".strip()).format(days[0], input_first_day, sid))
|
||||
else:
|
||||
days_to_zerofill = None
|
||||
last_date = self.last_date_in_output_for_sid(sid)
|
||||
|
||||
if days_to_zerofill is not None and len(days_to_zerofill):
|
||||
self._zerofill(table, len(days_to_zerofill))
|
||||
if last_date >= input_first_day:
|
||||
raise BcolzMinuteOverlappingData(dedent("""
|
||||
Data with last_date={0} already includes input start={1} for
|
||||
sid={2}""".strip()).format(last_date, input_first_day, sid))
|
||||
|
||||
day_before_input = input_first_day - tds.freq
|
||||
|
||||
self.pad(sid, day_before_input)
|
||||
table = self._ensure_ctable(sid)
|
||||
|
||||
days_to_write = tds[tds.slice_indexer(start=input_first_day,
|
||||
end=days[-1])]
|
||||
end=input_last_day)]
|
||||
|
||||
minutes_count = len(days_to_write) * self._minutes_per_day
|
||||
|
||||
all_minutes = self._minute_index
|
||||
|
||||
Reference in New Issue
Block a user