ENH: make BcolzMinuteBarWriter.write take iterable

Updates the BcolzMinuteBarWriter.write API to allow users to pass their
data as a stream instead of requiring that they loop over their data
externally. This matches the API presented by BcolzDailyBarWriter.
This commit is contained in:
Joe Jevnik
2016-04-20 15:42:27 -04:00
parent e73ce0bf2b
commit efac476976
12 changed files with 172 additions and 170 deletions
+4 -3
View File
@@ -20,9 +20,10 @@ Enhancements
* Made the data loading classes have more consistent interfaces. This includes
the equity bar writers, adjustment writer, and asset db writer. The new
interface is that the resource to be written to is passed at construction time
and the data to write is provided later to the `write` method as a
dataframe. This model allows us to pass these writer objects around as a
resource for other classes and functions to consume (:issue:`1109`).
and the data to write is provided later to the `write` method as
dataframes or some iterator of dataframes. This model allows us to pass these
writer objects around as a resource for other classes and functions to
consume (:issue:`1109` and :issue:`1149`).
* Added masking to :class:`zipline.pipeline.CustomFactor`.
Custom factors can now be passed a Filter upon instantiation. This tells the
+16 -16
View File
@@ -100,7 +100,7 @@ class BcolzMinuteBarTestCase(TestCase):
'volume': [50.0]
},
index=[minute])
self.writer.write(sid, data)
self.writer.write_sid(sid, data)
open_price = self.reader.get_value(sid, minute, 'open')
@@ -135,7 +135,7 @@ class BcolzMinuteBarTestCase(TestCase):
'volume': [50.0, 51.0]
},
index=[minute_0, minute_1])
self.writer.write(sid, data)
self.writer.write_sid(sid, data)
open_price = self.reader.get_value(sid, minute_0, 'open')
@@ -190,7 +190,7 @@ class BcolzMinuteBarTestCase(TestCase):
'volume': [50.0]
},
index=[minute])
self.writer.write(sid, data)
self.writer.write_sid(sid, data)
open_price = self.reader.get_value(sid, minute, 'open')
@@ -224,7 +224,7 @@ class BcolzMinuteBarTestCase(TestCase):
'volume': [0]
},
index=[minute])
self.writer.write(sid, data)
self.writer.write_sid(sid, data)
open_price = self.reader.get_value(sid, minute, 'open')
@@ -267,7 +267,7 @@ class BcolzMinuteBarTestCase(TestCase):
'volume': [50.0, 51.0]
},
index=minutes)
self.writer.write(sid, data)
self.writer.write_sid(sid, data)
minute = minutes[0]
@@ -325,10 +325,10 @@ class BcolzMinuteBarTestCase(TestCase):
'volume': [50.0]
},
index=[minute])
self.writer.write(sid, data)
self.writer.write_sid(sid, data)
with self.assertRaises(BcolzMinuteOverlappingData):
self.writer.write(sid, data)
self.writer.write_sid(sid, data)
def test_write_multiple_sids(self):
"""
@@ -361,7 +361,7 @@ class BcolzMinuteBarTestCase(TestCase):
'volume': [100.0]
},
index=[minute])
self.writer.write(sids[0], data)
self.writer.write_sid(sids[0], data)
data = DataFrame(
data={
@@ -372,7 +372,7 @@ class BcolzMinuteBarTestCase(TestCase):
'volume': [200.0]
},
index=[minute])
self.writer.write(sids[1], data)
self.writer.write_sid(sids[1], data)
sid = sids[0]
@@ -442,7 +442,7 @@ class BcolzMinuteBarTestCase(TestCase):
'volume': [100.0]
},
index=[minute])
self.writer.write(sid, data)
self.writer.write_sid(sid, data)
open_price = self.reader.get_value(sid, minute, 'open')
@@ -489,7 +489,7 @@ class BcolzMinuteBarTestCase(TestCase):
'volume': full(9, 0),
},
index=[minutes])
self.writer.write(sid, data)
self.writer.write_sid(sid, data)
fields = ['open', 'high', 'low', 'close', 'volume']
@@ -531,7 +531,7 @@ class BcolzMinuteBarTestCase(TestCase):
'volume': full(9, 0),
},
index=[minutes])
self.writer.write(sid, data)
self.writer.write_sid(sid, data)
fields = ['open', 'high', 'low', 'close', 'volume']
@@ -630,7 +630,7 @@ class BcolzMinuteBarTestCase(TestCase):
'volume': [1000, 0, 1001]
},
index=minutes)
self.writer.write(sids[0], data_1)
self.writer.write_sid(sids[0], data_1)
data_2 = DataFrame(
data={
@@ -641,7 +641,7 @@ class BcolzMinuteBarTestCase(TestCase):
'volume': [2000, 0, 2001]
},
index=minutes)
self.writer.write(sids[1], data_2)
self.writer.write_sid(sids[1], data_2)
reader = BcolzMinuteBarReader(self.dest)
@@ -681,7 +681,7 @@ class BcolzMinuteBarTestCase(TestCase):
'volume': [1000, 1001, 1002],
},
index=minutes)
self.writer.write(sids[0], data_1)
self.writer.write_sid(sids[0], data_1)
data_2 = DataFrame(
data={
@@ -692,7 +692,7 @@ class BcolzMinuteBarTestCase(TestCase):
'volume': [2000, 2001, 2002],
},
index=minutes)
self.writer.write(sids[1], data_2)
self.writer.write_sid(sids[1], data_2)
reader = BcolzMinuteBarReader(self.dest)
+18 -20
View File
@@ -58,18 +58,16 @@ class SlippageTestCase(WithSimParams, WithDataPortal, ZiplineTestCase):
@classmethod
def make_minute_bar_data(cls):
return {
133: pd.DataFrame(
{
'open': [3.0, 3.0, 3.5, 4.0, 3.5],
'high': [3.15, 3.15, 3.15, 3.15, 3.15],
'low': [2.85, 2.85, 2.85, 2.85, 2.85],
'close': [3.0, 3.5, 4.0, 3.5, 3.0],
'volume': [2000, 2000, 2000, 2000, 2000],
},
index=cls.minutes,
),
}
yield 133, pd.DataFrame(
{
'open': [3.0, 3.0, 3.5, 4.0, 3.5],
'high': [3.15, 3.15, 3.15, 3.15, 3.15],
'low': [2.85, 2.85, 2.85, 2.85, 2.85],
'close': [3.0, 3.5, 4.0, 3.5, 3.0],
'volume': [2000, 2000, 2000, 2000, 2000],
},
index=cls.minutes,
)
@classmethod
def init_class_fixtures(cls):
@@ -77,8 +75,8 @@ class SlippageTestCase(WithSimParams, WithDataPortal, ZiplineTestCase):
cls.ASSET133 = cls.env.asset_finder.retrieve_asset(133)
def test_volume_share_slippage(self):
assets = {
133: pd.DataFrame(
assets = (
(133, pd.DataFrame(
{
'open': [3.00],
'high': [3.15],
@@ -87,8 +85,8 @@ class SlippageTestCase(WithSimParams, WithDataPortal, ZiplineTestCase):
'volume': [200],
},
index=[self.minutes[0]],
),
}
)),
)
days = pd.date_range(
start=normalize_date(self.minutes[0]),
end=normalize_date(self.minutes[-1])
@@ -465,8 +463,8 @@ class SlippageTestCase(WithSimParams, WithDataPortal, ZiplineTestCase):
data['sid'] = self.ASSET133
order = Order(**data)
assets = {
133: pd.DataFrame(
assets = (
(133, pd.DataFrame(
{
'open': [event_data['open']],
'high': [event_data['high']],
@@ -475,8 +473,8 @@ class SlippageTestCase(WithSimParams, WithDataPortal, ZiplineTestCase):
'volume': [event_data['volume']],
},
index=[pd.Timestamp('2006-01-05 14:31', tz='UTC')],
),
}
)),
)
days = pd.date_range(
start=normalize_date(self.minutes[0]),
end=normalize_date(self.minutes[-1])
+23 -31
View File
@@ -29,7 +29,6 @@ from testfixtures import TempDirectory
import numpy as np
import pandas as pd
import pytz
from toolz import merge
from zipline import TradingAlgorithm
from zipline.api import FixedSlippage
@@ -1039,25 +1038,20 @@ class TestBeforeTradingStart(WithDataPortal,
index=asset_minutes,
)
split_data.iloc[780:] = split_data.iloc[780:] / 2.0
return merge(
{
sid: create_minute_df_for_asset(
cls.env,
cls.data_start,
cls.sim_params.period_end,
)
for sid in (1, 8554)
},
{
2: create_minute_df_for_asset(
cls.env,
cls.data_start,
cls.sim_params.period_end,
50,
),
cls.SPLIT_ASSET_SID: split_data,
},
for sid in (1, 8554):
yield sid, create_minute_df_for_asset(
cls.env,
cls.data_start,
cls.sim_params.period_end,
)
yield 2, create_minute_df_for_asset(
cls.env,
cls.data_start,
cls.sim_params.period_end,
50,
)
yield cls.SPLIT_ASSET_SID, split_data
@classmethod
def make_splits_data(cls):
@@ -2552,18 +2546,16 @@ class TestOrderCancelation(WithDataPortal,
minutes_arr = np.arange(1, 1 + minutes_count)
# normal test data, but volume is pinned at 1 share per minute
return {
1: pd.DataFrame(
{
'open': minutes_arr + 1,
'high': minutes_arr + 2,
'low': minutes_arr - 1,
'close': minutes_arr,
'volume': np.full(minutes_count, 1),
},
index=asset_minutes,
),
}
yield 1, pd.DataFrame(
{
'open': minutes_arr + 1,
'high': minutes_arr + 2,
'low': minutes_arr - 1,
'close': minutes_arr,
'volume': np.full(minutes_count, 1),
},
index=asset_minutes,
)
@classmethod
def make_daily_bar_data(cls):
+2 -4
View File
@@ -122,14 +122,12 @@ class TestAPIShim(WithDataPortal, WithSimParams, ZiplineTestCase):
@classmethod
def make_minute_bar_data(cls):
return {
sid: create_minute_df_for_asset(
for sid in cls.sids:
yield sid, create_minute_df_for_asset(
cls.env,
cls.SIM_PARAMS_START,
cls.SIM_PARAMS_END,
)
for sid in cls.sids
}
@classmethod
def make_daily_bar_data(cls):
+20 -27
View File
@@ -15,7 +15,6 @@
from nose_parameterized import parameterized
import numpy as np
import pandas as pd
from toolz import merge
from zipline._protocol import handle_non_market_minutes
from zipline.protocol import BarData
@@ -109,32 +108,26 @@ class TestMinuteBarData(WithBarDataChecks,
# asset2 has trades every 10 minutes
# split_asset trades every minute
# illiquid_split_asset trades every 10 minutes
return merge(
{
sid: create_minute_df_for_asset(
cls.env,
cls.bcolz_minute_bar_days[0],
cls.bcolz_minute_bar_days[-1],
)
for sid in (1, cls.SPLIT_ASSET_SID)
},
{
sid: create_minute_df_for_asset(
cls.env,
cls.bcolz_minute_bar_days[0],
cls.bcolz_minute_bar_days[-1],
10,
)
for sid in (2, cls.ILLIQUID_SPLIT_ASSET_SID)
},
{
cls.HILARIOUSLY_ILLIQUID_ASSET_SID: create_minute_df_for_asset(
cls.env,
cls.bcolz_minute_bar_days[0],
cls.bcolz_minute_bar_days[-1],
50,
)
},
for sid in (1, cls.SPLIT_ASSET_SID):
yield sid, create_minute_df_for_asset(
cls.env,
cls.bcolz_minute_bar_days[0],
cls.bcolz_minute_bar_days[-1],
)
for sid in (2, cls.ILLIQUID_SPLIT_ASSET_SID):
yield sid, create_minute_df_for_asset(
cls.env,
cls.bcolz_minute_bar_days[0],
cls.bcolz_minute_bar_days[-1],
10,
)
yield cls.HILARIOUSLY_ILLIQUID_ASSET_SID, create_minute_df_for_asset(
cls.env,
cls.bcolz_minute_bar_days[0],
cls.bcolz_minute_bar_days[-1],
50,
)
@classmethod
+2 -1
View File
@@ -23,6 +23,7 @@ from nose.tools import timed
import numpy as np
import pandas as pd
import pytz
from six import iteritems
from six.moves import range
from testfixtures import TempDirectory
@@ -219,7 +220,7 @@ class FinanceTestCase(WithLogger,
env,
env.days_in_range(minutes[0], minutes[-1]),
tempdir.path,
assets
iteritems(assets),
)
equity_minute_reader = BcolzMinuteBarReader(tempdir.path)
+44 -48
View File
@@ -2,12 +2,12 @@ from textwrap import dedent
from numbers import Real
import pandas as pd
from nose_parameterized import parameterized
import numpy as np
from numpy import nan
from numpy.testing import assert_almost_equal
from nose_parameterized import parameterized
import pandas as pd
from six import iteritems
from zipline import TradingAlgorithm
from zipline._protocol import handle_non_market_minutes
@@ -473,7 +473,7 @@ class MinuteEquityHistoryTestCase(WithHistory, ZiplineTestCase):
start_val=2,
interval=10,
)
return data
return iteritems(data)
def test_history_in_initialize(self):
algo_text = dedent(
@@ -986,24 +986,22 @@ class DailyEquityHistoryTestCase(WithHistory, ZiplineTestCase):
def make_minute_bar_data(cls):
asset1 = cls.asset_finder.retrieve_asset(1)
asset2 = cls.asset_finder.retrieve_asset(2)
return {
asset1.sid: create_minute_df_for_asset(
cls.env,
asset1.start_date,
asset1.end_date,
start_val=2,
),
asset2.sid: create_minute_df_for_asset(
cls.env,
asset2.start_date,
cls.env.previous_trading_day(asset2.end_date),
start_val=2,
minute_blacklist=[
pd.Timestamp('2015-01-08 14:31', tz='UTC'),
pd.Timestamp('2015-01-08 21:00', tz='UTC'),
],
),
}
yield asset1.sid, create_minute_df_for_asset(
cls.env,
asset1.start_date,
asset1.end_date,
start_val=2,
)
yield asset2.sid, create_minute_df_for_asset(
cls.env,
asset2.start_date,
cls.env.previous_trading_day(asset2.end_date),
start_val=2,
minute_blacklist=[
pd.Timestamp('2015-01-08 14:31', tz='UTC'),
pd.Timestamp('2015-01-08 21:00', tz='UTC'),
],
)
@classmethod
def create_df_for_asset(cls, start_day, end_day, interval=1,
@@ -1548,32 +1546,30 @@ class MinuteToDailyAggregationTestCase(WithBcolzMinuteBarReader,
@classmethod
def make_minute_bar_data(cls):
return {
# sid data is created so that at least one high is lower than a
# previous high, and the inverse for low
1: pd.DataFrame(
{
'open': [nan, 103.50, 102.50, 104.50, 101.50, nan],
'high': [nan, 103.90, 102.90, 104.90, 101.90, nan],
'low': [nan, 103.10, 102.10, 104.10, 101.10, nan],
'close': [nan, 103.30, 102.30, 104.30, 101.30, nan],
'volume': [0, 1003, 1002, 1004, 1001, 0]
},
index=cls.minutes,
),
# sid 2 is included to provide data on different bars than sid 1,
# as well as illiquidity mid-day
2: pd.DataFrame(
{
'open': [201.50, nan, 204.50, nan, 200.50, 202.50],
'high': [201.90, nan, 204.90, nan, 200.90, 202.90],
'low': [201.10, nan, 204.10, nan, 200.10, 202.10],
'close': [201.30, nan, 203.50, nan, 200.30, 202.30],
'volume': [2001, 0, 2004, 0, 2000, 2002],
},
index=cls.minutes,
),
}
# sid data is created so that at least one high is lower than a
# previous high, and the inverse for low
yield 1, pd.DataFrame(
{
'open': [nan, 103.50, 102.50, 104.50, 101.50, nan],
'high': [nan, 103.90, 102.90, 104.90, 101.90, nan],
'low': [nan, 103.10, 102.10, 104.10, 101.10, nan],
'close': [nan, 103.30, 102.30, 104.30, 101.30, nan],
'volume': [0, 1003, 1002, 1004, 1001, 0]
},
index=cls.minutes,
)
# sid 2 is included to provide data on different bars than sid 1,
# as well as illiquidity mid-day
yield 2, pd.DataFrame(
{
'open': [201.50, nan, 204.50, nan, 200.50, 202.50],
'high': [201.90, nan, 204.90, nan, 200.90, 202.90],
'low': [201.10, nan, 204.10, nan, 200.10, 202.10],
'close': [201.30, nan, 203.50, nan, 200.30, 202.30],
'volume': [2001, 0, 2004, 0, 2000, 2002],
},
index=cls.minutes,
)
expected_values = {
1: pd.DataFrame(
+31 -1
View File
@@ -29,6 +29,7 @@ from zipline.data._minute_bar_internal import (
)
from zipline.gens.sim_engine import NANOS_IN_MINUTE
from zipline.utils.cli import maybe_show_progress
from zipline.utils.memoize import lazyval
US_EQUITIES_MINUTES_PER_DAY = 390
@@ -441,7 +442,36 @@ class BcolzMinuteBarWriter(object):
assert new_last_date == date, "new_last_date={0} != date={1}".format(
new_last_date, date)
def write(self, sid, df):
def write(self, data, show_progress=False):
    """Write a stream of per-sid minute bar frames.

    Every element of ``data`` is forwarded to ``write_sid``.

    Parameters
    ----------
    data : iterable[(int, pd.DataFrame)]
        The data to write. Each element should be a tuple of sid, data
        where data has the following format:
          columns : ('open', 'high', 'low', 'close', 'volume')
          open : float64
          high : float64
          low : float64
          close : float64
          volume : float64|int64
          index : DatetimeIndex of market minutes.
    show_progress : bool, optional
        Whether or not to show a progress bar while writing.
    """
    def show_sid(item):
        # ``None`` is passed through untouched; any other element is
        # labelled by its first field (the sid).
        return item if item is None else str(item[0])

    progress = maybe_show_progress(
        data,
        show_progress=show_progress,
        item_show_func=show_sid,
        label="Merging minute equity files:",
    )
    # Bind the per-sid writer once, before entering the progress context.
    write_one = self.write_sid
    with progress as pairs:
        for pair in pairs:
            # Each element is unpacked positionally into write_sid.
            write_one(*pair)
def write_sid(self, sid, df):
"""
Write the OHLCV data for the given sid.
If there is no bcolz ctable yet created for the sid, create it.
+3 -3
View File
@@ -209,7 +209,7 @@ class BcolzDailyBarWriter(object):
@property
def progress_bar_message(self):
return "Merging asset files:"
return "Merging daily equity files:"
def progress_bar_item_show_func(self, value):
return value if value is None else str(value[0])
@@ -229,9 +229,9 @@ class BcolzDailyBarWriter(object):
The assets that should be in ``data``. If this is provided
we will check ``data`` against the assets and provide better
progress information.
show_progress : bool
show_progress : bool, optional
Whether or not to show a progress bar while writing.
invalid_data_behavior : {'warn', 'raise', 'ignore'}
invalid_data_behavior : {'warn', 'raise', 'ignore'}, optional
What to do when data is encountered that is outside the range of
a uint32.
+6 -11
View File
@@ -349,7 +349,7 @@ def make_trade_data_for_asset_info(dates,
)
if writer:
writer.write(sid, df)
writer.write_sid(sid, df)
trade_data[sid] = df
@@ -424,8 +424,8 @@ def write_minute_data(env, tempdir, minutes, sids):
def create_minute_bar_data(minutes, sids):
length = len(minutes)
return {
sid: pd.DataFrame(
for sid_idx, sid in enumerate(sids):
yield sid, pd.DataFrame(
{
'open': np.arange(length) + 10 + sid_idx,
'high': np.arange(length) + 15 + sid_idx,
@@ -435,8 +435,6 @@ def create_minute_bar_data(minutes, sids):
},
index=minutes,
)
for sid_idx, sid in enumerate(sids)
}
def create_daily_bar_data(trading_days, sids):
@@ -492,20 +490,17 @@ def create_data_portal(env, tempdir, sim_params, sids, adjustment_reader=None):
)
def write_bcolz_minute_data(env, days, path, df_dict):
def write_bcolz_minute_data(env, days, path, data):
market_opens = env.open_and_closes.market_open.loc[days]
market_closes = env.open_and_closes.market_close.loc[days]
writer = BcolzMinuteBarWriter(
BcolzMinuteBarWriter(
days[0],
path,
market_opens,
market_closes,
US_EQUITIES_MINUTES_PER_DAY
)
for sid, df in iteritems(df_dict):
writer.write(sid, df)
).write(data)
def create_minute_df_for_asset(env,
+3 -5
View File
@@ -6,7 +6,7 @@ from contextlib2 import ExitStack
from logbook import NullHandler, Logger
from nose_parameterized import parameterized
from pandas.util.testing import assert_series_equal
from six import with_metaclass, iteritems
from six import with_metaclass
from toolz import flip
import numpy as np
import pandas as pd
@@ -681,7 +681,7 @@ class WithBcolzMinuteBarReader(WithTradingEnvironment, WithTmpDir):
Methods
-------
make_minute_bar_data() -> dict[int -> pd.DataFrame]
make_minute_bar_data() -> iterable[(int, pd.DataFrame)]
A class method that returns an iterable of (sid, dataframe) pairs
which will be written to the bcolz files that the class's
``BcolzMinuteBarReader`` will read from. By default this creates
@@ -734,9 +734,7 @@ class WithBcolzMinuteBarReader(WithTradingEnvironment, WithTmpDir):
cls.env.open_and_closes.market_close.loc[days],
US_EQUITIES_MINUTES_PER_DAY
)
cls.bcolz_minute_bar_data = cls.make_minute_bar_data()
for sid, df in iteritems(cls.bcolz_minute_bar_data):
writer.write(sid, df)
writer.write(cls.make_minute_bar_data())
cls.bcolz_minute_bar_reader = BcolzMinuteBarReader(p)