mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-29 15:58:09 +08:00
Merge pull request #1198 from quantopian/modify-minutes-writer
Allow BcolzMinuteBarWriter to append to most recent day
This commit is contained in:
@@ -331,6 +331,48 @@ class BcolzMinuteBarTestCase(TestCase):
|
||||
with self.assertRaises(BcolzMinuteOverlappingData):
|
||||
self.writer.write_sid(sid, data)
|
||||
|
||||
def test_append_to_same_day(self):
    """
    Test writing data with the same date as existing data in our file.

    Writes one bar at the market open recorded for TEST_CALENDAR_START,
    then a second bar one minute later for the same sid, and checks that
    every field of the second bar reads back with the value written.
    """
    sid = 1

    first_minute = self.market_opens[TEST_CALENDAR_START]
    # Seed bar; only the second write is asserted on below.
    data = DataFrame(
        data={
            'open': [10.0],
            'high': [20.0],
            'low': [30.0],
            'close': [40.0],
            'volume': [50.0]
        },
        index=[first_minute])
    self.writer.write_sid(sid, data)

    # Write data in the same day as the previous minute
    second_minute = first_minute + Timedelta(minutes=1)
    new_data = DataFrame(
        data={
            'open': [5.0],
            'high': [10.0],
            'low': [3.0],
            'close': [7.0],
            'volume': [10.0]
        },
        index=[second_minute])
    self.writer.write_sid(sid, new_data)

    # Read each field back in the same order it was checked before.
    # assertEqual is used because the assertEquals alias is deprecated
    # and no longer exists in recent Python releases.
    expected_second_bar = (
        ('open', 5.0),
        ('high', 10.0),
        ('low', 3.0),
        ('close', 7.0),
        ('volume', 10.0),
    )
    for field, expected in expected_second_bar:
        actual = self.reader.get_value(sid, second_minute, field)
        self.assertEqual(expected, actual)
|
||||
|
||||
def test_write_multiple_sids(self):
|
||||
"""
|
||||
Test writing multiple sids.
|
||||
@@ -806,3 +848,22 @@ class BcolzMinuteBarTestCase(TestCase):
|
||||
Timestamp('2015-11-30 21:01:00', tz='UTC'),
|
||||
'open'),
|
||||
600)
|
||||
|
||||
def test_set_sid_attrs(self):
    """Confirm that we can set the attributes of a sid's file correctly.

    Writes a small set of attributes through the writer and checks that the
    reader returns an equal value for each attribute name.
    """
    sid = 1
    # NOTE(review): end_day precedes start_day. That is harmless here since
    # the values are only written and read back, never interpreted.
    start_day = Timestamp('2015-11-27', tz='UTC')
    end_day = Timestamp('2015-06-02', tz='UTC')
    # Timestamp.value is in nanoseconds. Floor division keeps the epoch
    # seconds as integers on Python 3 as well as Python 2 (true division
    # would produce floats on Python 3).
    attrs = {
        'start_day': start_day.value // int(1e9),
        'end_day': end_day.value // int(1e9),
        'factor': 100,
    }

    # Write the attributes
    self.writer.set_sid_attrs(sid, **attrs)
    # Read the attributes
    for k, v in attrs.items():
        self.assertEqual(self.reader.get_sid_attr(sid, k), v)
|
||||
|
||||
+41
-14
@@ -441,6 +441,13 @@ class BcolzMinuteBarWriter(object):
|
||||
assert new_last_date == date, "new_last_date={0} != date={1}".format(
|
||||
new_last_date, date)
|
||||
|
||||
def set_sid_attrs(self, sid, **kwargs):
    """Write all the supplied kwargs as attributes of the sid's file.

    The sid's table is obtained once via ``_ensure_ctable`` and each keyword
    argument is then stored under its own name in that table's ``attrs``.
    Calling with no keyword arguments stores nothing.
    """
    sid_table = self._ensure_ctable(sid)
    for attr_name in kwargs:
        sid_table.attrs[attr_name] = kwargs[attr_name]
|
||||
|
||||
def write(self, data, show_progress=False):
|
||||
"""Write a stream of minute data.
|
||||
|
||||
@@ -559,29 +566,37 @@ class BcolzMinuteBarWriter(object):
|
||||
tds = self._trading_days
|
||||
input_first_day = pd.Timestamp(dts[0].astype('datetime64[D]'),
|
||||
tz='UTC')
|
||||
input_last_day = pd.Timestamp(dts[-1].astype('datetime64[D]'),
|
||||
tz='UTC')
|
||||
|
||||
last_date = self.last_date_in_output_for_sid(sid)
|
||||
|
||||
if last_date >= input_first_day:
|
||||
raise BcolzMinuteOverlappingData(dedent("""
|
||||
Data with last_date={0} already includes input start={1} for
|
||||
sid={2}""".strip()).format(last_date, input_first_day, sid))
|
||||
|
||||
day_before_input = input_first_day - tds.freq
|
||||
|
||||
self.pad(sid, day_before_input)
|
||||
table = self._ensure_ctable(sid)
|
||||
|
||||
days_to_write = tds[tds.slice_indexer(start=input_first_day,
|
||||
end=input_last_day)]
|
||||
|
||||
minutes_count = len(days_to_write) * self._minutes_per_day
|
||||
# Get the number of minutes already recorded in this sid's ctable
|
||||
num_rec_mins = table.size
|
||||
|
||||
all_minutes = self._minute_index
|
||||
indexer = all_minutes.slice_indexer(start=days_to_write[0])
|
||||
all_minutes_in_window = all_minutes[indexer]
|
||||
# Get the latest minute we wish to write to the ctable
|
||||
last_minute_to_write = dts[-1]
|
||||
|
||||
# In the event that we've already written some minutely data to the
|
||||
# ctable, guard against overwriting that data.
|
||||
if num_rec_mins > 0:
|
||||
last_recorded_minute = np.datetime64(all_minutes[num_rec_mins - 1])
|
||||
if last_minute_to_write <= last_recorded_minute:
|
||||
raise BcolzMinuteOverlappingData(dedent("""
|
||||
Data with last_date={0} already includes input start={1} for
|
||||
sid={2}""".strip()).format(last_date, input_first_day, sid))
|
||||
|
||||
latest_min_count = all_minutes.get_loc(last_minute_to_write)
|
||||
|
||||
# Get all the minutes we wish to write (all market minutes after the
|
||||
# latest currently written, up to and including last_minute_to_write)
|
||||
all_minutes_in_window = all_minutes[num_rec_mins:latest_min_count + 1]
|
||||
|
||||
minutes_count = all_minutes_in_window.size
|
||||
|
||||
open_col = np.zeros(minutes_count, dtype=np.uint32)
|
||||
high_col = np.zeros(minutes_count, dtype=np.uint32)
|
||||
@@ -756,6 +771,15 @@ class BcolzMinuteBarReader(object):
|
||||
|
||||
return carray
|
||||
|
||||
def get_sid_attr(self, sid, name):
    """Return the attribute ``name`` stored for ``sid``, or None if absent.

    The sid's directory is located under the reader's root directory and
    its bcolz attrs are opened with mode 'r'. A missing attribute name
    (KeyError) is reported as None instead of raising.
    """
    relative_dir = _sid_subdir_path(sid)
    stored_attrs = bcolz.attrs.attrs(
        os.path.join(self._rootdir, relative_dir), 'r')
    try:
        value = stored_attrs[name]
    except KeyError:
        value = None
    return value
|
||||
|
||||
def get_value(self, sid, dt, field):
|
||||
"""
|
||||
Retrieve the pricing info for the given sid, dt, and field.
|
||||
@@ -791,7 +815,10 @@ class BcolzMinuteBarReader(object):
|
||||
self._last_get_value_dt_value = dt.value
|
||||
self._last_get_value_dt_position = minute_pos
|
||||
|
||||
value = self._open_minute_file(field, sid)[minute_pos]
|
||||
try:
|
||||
value = self._open_minute_file(field, sid)[minute_pos]
|
||||
except IndexError:
|
||||
value = 0
|
||||
if value == 0:
|
||||
if field == 'volume':
|
||||
return 0
|
||||
|
||||
Reference in New Issue
Block a user