From efac47697661e8b60d9ccda4d35b156f1eddc141 Mon Sep 17 00:00:00 2001 From: Joe Jevnik Date: Wed, 20 Apr 2016 15:42:27 -0400 Subject: [PATCH] ENH: make BcolzMinuteBarWriter.write take iterable Updates the BcolzMinuteBarWriter.write api to allow users to pass their data as a stream instead of requiring that they loop over their data externally. This matches the API presented by BcolzDailyBarWriter. --- docs/source/whatsnew/1.0.0.txt | 7 ++- tests/data/test_minute_bars.py | 32 +++++------ tests/finance/test_slippage.py | 38 ++++++------- tests/test_algorithm.py | 54 ++++++++---------- tests/test_api_shim.py | 6 +- tests/test_bar_data.py | 47 +++++++--------- tests/test_finance.py | 3 +- tests/test_history.py | 92 +++++++++++++++---------------- zipline/data/minute_bars.py | 32 ++++++++++- zipline/data/us_equity_pricing.py | 6 +- zipline/testing/core.py | 17 ++---- zipline/testing/fixtures.py | 8 +-- 12 files changed, 172 insertions(+), 170 deletions(-) diff --git a/docs/source/whatsnew/1.0.0.txt b/docs/source/whatsnew/1.0.0.txt index 12faa317..5d9b581b 100644 --- a/docs/source/whatsnew/1.0.0.txt +++ b/docs/source/whatsnew/1.0.0.txt @@ -20,9 +20,10 @@ Enhancements * Made the data loading classes have more consistent interfaces. This includes the equity bar writers, adjustment writer, and asset db writer. The new interface is that the resource to be written to is passed at construction time - and the data to write is provided later to the `write` method as a - dataframe. This model allows us to pass these writer objects around as a - resource for other classes and functions to consume (:issue:`1109`). + and the data to write is provided later to the `write` method as + dataframes or some iterator of dataframes. This model allows us to pass these + writer objects around as a resource for other classes and functions to + consume (:issue:`1109` and :issue:`1149`). * Added masking to :class:`zipline.pipeline.CustomFactor`. 
Custom factors can now be passed a Filter upon instantiation. This tells the diff --git a/tests/data/test_minute_bars.py b/tests/data/test_minute_bars.py index d99c0fc4..9abdffbb 100644 --- a/tests/data/test_minute_bars.py +++ b/tests/data/test_minute_bars.py @@ -100,7 +100,7 @@ class BcolzMinuteBarTestCase(TestCase): 'volume': [50.0] }, index=[minute]) - self.writer.write(sid, data) + self.writer.write_sid(sid, data) open_price = self.reader.get_value(sid, minute, 'open') @@ -135,7 +135,7 @@ class BcolzMinuteBarTestCase(TestCase): 'volume': [50.0, 51.0] }, index=[minute_0, minute_1]) - self.writer.write(sid, data) + self.writer.write_sid(sid, data) open_price = self.reader.get_value(sid, minute_0, 'open') @@ -190,7 +190,7 @@ class BcolzMinuteBarTestCase(TestCase): 'volume': [50.0] }, index=[minute]) - self.writer.write(sid, data) + self.writer.write_sid(sid, data) open_price = self.reader.get_value(sid, minute, 'open') @@ -224,7 +224,7 @@ class BcolzMinuteBarTestCase(TestCase): 'volume': [0] }, index=[minute]) - self.writer.write(sid, data) + self.writer.write_sid(sid, data) open_price = self.reader.get_value(sid, minute, 'open') @@ -267,7 +267,7 @@ class BcolzMinuteBarTestCase(TestCase): 'volume': [50.0, 51.0] }, index=minutes) - self.writer.write(sid, data) + self.writer.write_sid(sid, data) minute = minutes[0] @@ -325,10 +325,10 @@ class BcolzMinuteBarTestCase(TestCase): 'volume': [50.0] }, index=[minute]) - self.writer.write(sid, data) + self.writer.write_sid(sid, data) with self.assertRaises(BcolzMinuteOverlappingData): - self.writer.write(sid, data) + self.writer.write_sid(sid, data) def test_write_multiple_sids(self): """ @@ -361,7 +361,7 @@ class BcolzMinuteBarTestCase(TestCase): 'volume': [100.0] }, index=[minute]) - self.writer.write(sids[0], data) + self.writer.write_sid(sids[0], data) data = DataFrame( data={ @@ -372,7 +372,7 @@ class BcolzMinuteBarTestCase(TestCase): 'volume': [200.0] }, index=[minute]) - self.writer.write(sids[1], data) + 
self.writer.write_sid(sids[1], data) sid = sids[0] @@ -442,7 +442,7 @@ class BcolzMinuteBarTestCase(TestCase): 'volume': [100.0] }, index=[minute]) - self.writer.write(sid, data) + self.writer.write_sid(sid, data) open_price = self.reader.get_value(sid, minute, 'open') @@ -489,7 +489,7 @@ class BcolzMinuteBarTestCase(TestCase): 'volume': full(9, 0), }, index=[minutes]) - self.writer.write(sid, data) + self.writer.write_sid(sid, data) fields = ['open', 'high', 'low', 'close', 'volume'] @@ -531,7 +531,7 @@ class BcolzMinuteBarTestCase(TestCase): 'volume': full(9, 0), }, index=[minutes]) - self.writer.write(sid, data) + self.writer.write_sid(sid, data) fields = ['open', 'high', 'low', 'close', 'volume'] @@ -630,7 +630,7 @@ class BcolzMinuteBarTestCase(TestCase): 'volume': [1000, 0, 1001] }, index=minutes) - self.writer.write(sids[0], data_1) + self.writer.write_sid(sids[0], data_1) data_2 = DataFrame( data={ @@ -641,7 +641,7 @@ class BcolzMinuteBarTestCase(TestCase): 'volume': [2000, 0, 2001] }, index=minutes) - self.writer.write(sids[1], data_2) + self.writer.write_sid(sids[1], data_2) reader = BcolzMinuteBarReader(self.dest) @@ -681,7 +681,7 @@ class BcolzMinuteBarTestCase(TestCase): 'volume': [1000, 1001, 1002], }, index=minutes) - self.writer.write(sids[0], data_1) + self.writer.write_sid(sids[0], data_1) data_2 = DataFrame( data={ @@ -692,7 +692,7 @@ class BcolzMinuteBarTestCase(TestCase): 'volume': [2000, 2001, 2002], }, index=minutes) - self.writer.write(sids[1], data_2) + self.writer.write_sid(sids[1], data_2) reader = BcolzMinuteBarReader(self.dest) diff --git a/tests/finance/test_slippage.py b/tests/finance/test_slippage.py index 67f8a37c..61720f69 100644 --- a/tests/finance/test_slippage.py +++ b/tests/finance/test_slippage.py @@ -58,18 +58,16 @@ class SlippageTestCase(WithSimParams, WithDataPortal, ZiplineTestCase): @classmethod def make_minute_bar_data(cls): - return { - 133: pd.DataFrame( - { - 'open': [3.0, 3.0, 3.5, 4.0, 3.5], - 'high': [3.15, 3.15, 
3.15, 3.15, 3.15], - 'low': [2.85, 2.85, 2.85, 2.85, 2.85], - 'close': [3.0, 3.5, 4.0, 3.5, 3.0], - 'volume': [2000, 2000, 2000, 2000, 2000], - }, - index=cls.minutes, - ), - } + yield 133, pd.DataFrame( + { + 'open': [3.0, 3.0, 3.5, 4.0, 3.5], + 'high': [3.15, 3.15, 3.15, 3.15, 3.15], + 'low': [2.85, 2.85, 2.85, 2.85, 2.85], + 'close': [3.0, 3.5, 4.0, 3.5, 3.0], + 'volume': [2000, 2000, 2000, 2000, 2000], + }, + index=cls.minutes, + ) @classmethod def init_class_fixtures(cls): @@ -77,8 +75,8 @@ class SlippageTestCase(WithSimParams, WithDataPortal, ZiplineTestCase): cls.ASSET133 = cls.env.asset_finder.retrieve_asset(133) def test_volume_share_slippage(self): - assets = { - 133: pd.DataFrame( + assets = ( + (133, pd.DataFrame( { 'open': [3.00], 'high': [3.15], @@ -87,8 +85,8 @@ class SlippageTestCase(WithSimParams, WithDataPortal, ZiplineTestCase): 'volume': [200], }, index=[self.minutes[0]], - ), - } + )), + ) days = pd.date_range( start=normalize_date(self.minutes[0]), end=normalize_date(self.minutes[-1]) @@ -465,8 +463,8 @@ class SlippageTestCase(WithSimParams, WithDataPortal, ZiplineTestCase): data['sid'] = self.ASSET133 order = Order(**data) - assets = { - 133: pd.DataFrame( + assets = ( + (133, pd.DataFrame( { 'open': [event_data['open']], 'high': [event_data['high']], @@ -475,8 +473,8 @@ class SlippageTestCase(WithSimParams, WithDataPortal, ZiplineTestCase): 'volume': [event_data['volume']], }, index=[pd.Timestamp('2006-01-05 14:31', tz='UTC')], - ), - } + )), + ) days = pd.date_range( start=normalize_date(self.minutes[0]), end=normalize_date(self.minutes[-1]) diff --git a/tests/test_algorithm.py b/tests/test_algorithm.py index c74a346d..fbfda6ab 100644 --- a/tests/test_algorithm.py +++ b/tests/test_algorithm.py @@ -29,7 +29,6 @@ from testfixtures import TempDirectory import numpy as np import pandas as pd import pytz -from toolz import merge from zipline import TradingAlgorithm from zipline.api import FixedSlippage @@ -1039,25 +1038,20 @@ class 
TestBeforeTradingStart(WithDataPortal, index=asset_minutes, ) split_data.iloc[780:] = split_data.iloc[780:] / 2.0 - return merge( - { - sid: create_minute_df_for_asset( - cls.env, - cls.data_start, - cls.sim_params.period_end, - ) - for sid in (1, 8554) - }, - { - 2: create_minute_df_for_asset( - cls.env, - cls.data_start, - cls.sim_params.period_end, - 50, - ), - cls.SPLIT_ASSET_SID: split_data, - }, + for sid in (1, 8554): + yield sid, create_minute_df_for_asset( + cls.env, + cls.data_start, + cls.sim_params.period_end, + ) + + yield 2, create_minute_df_for_asset( + cls.env, + cls.data_start, + cls.sim_params.period_end, + 50, ) + yield cls.SPLIT_ASSET_SID, split_data @classmethod def make_splits_data(cls): @@ -2552,18 +2546,16 @@ class TestOrderCancelation(WithDataPortal, minutes_arr = np.arange(1, 1 + minutes_count) # normal test data, but volume is pinned at 1 share per minute - return { - 1: pd.DataFrame( - { - 'open': minutes_arr + 1, - 'high': minutes_arr + 2, - 'low': minutes_arr - 1, - 'close': minutes_arr, - 'volume': np.full(minutes_count, 1), - }, - index=asset_minutes, - ), - } + yield 1, pd.DataFrame( + { + 'open': minutes_arr + 1, + 'high': minutes_arr + 2, + 'low': minutes_arr - 1, + 'close': minutes_arr, + 'volume': np.full(minutes_count, 1), + }, + index=asset_minutes, + ) @classmethod def make_daily_bar_data(cls): diff --git a/tests/test_api_shim.py b/tests/test_api_shim.py index 09957090..5ff1c85c 100644 --- a/tests/test_api_shim.py +++ b/tests/test_api_shim.py @@ -122,14 +122,12 @@ class TestAPIShim(WithDataPortal, WithSimParams, ZiplineTestCase): @classmethod def make_minute_bar_data(cls): - return { - sid: create_minute_df_for_asset( + for sid in cls.sids: + yield sid, create_minute_df_for_asset( cls.env, cls.SIM_PARAMS_START, cls.SIM_PARAMS_END, ) - for sid in cls.sids - } @classmethod def make_daily_bar_data(cls): diff --git a/tests/test_bar_data.py b/tests/test_bar_data.py index d1dc2c2d..761530ff 100644 --- a/tests/test_bar_data.py +++ 
b/tests/test_bar_data.py @@ -15,7 +15,6 @@ from nose_parameterized import parameterized import numpy as np import pandas as pd -from toolz import merge from zipline._protocol import handle_non_market_minutes from zipline.protocol import BarData @@ -109,32 +108,26 @@ class TestMinuteBarData(WithBarDataChecks, # asset2 has trades every 10 minutes # split_asset trades every minute # illiquid_split_asset trades every 10 minutes - return merge( - { - sid: create_minute_df_for_asset( - cls.env, - cls.bcolz_minute_bar_days[0], - cls.bcolz_minute_bar_days[-1], - ) - for sid in (1, cls.SPLIT_ASSET_SID) - }, - { - sid: create_minute_df_for_asset( - cls.env, - cls.bcolz_minute_bar_days[0], - cls.bcolz_minute_bar_days[-1], - 10, - ) - for sid in (2, cls.ILLIQUID_SPLIT_ASSET_SID) - }, - { - cls.HILARIOUSLY_ILLIQUID_ASSET_SID: create_minute_df_for_asset( - cls.env, - cls.bcolz_minute_bar_days[0], - cls.bcolz_minute_bar_days[-1], - 50, - ) - }, + for sid in (1, cls.SPLIT_ASSET_SID): + yield sid, create_minute_df_for_asset( + cls.env, + cls.bcolz_minute_bar_days[0], + cls.bcolz_minute_bar_days[-1], + ) + + for sid in (2, cls.ILLIQUID_SPLIT_ASSET_SID): + yield sid, create_minute_df_for_asset( + cls.env, + cls.bcolz_minute_bar_days[0], + cls.bcolz_minute_bar_days[-1], + 10, + ) + + yield cls.HILARIOUSLY_ILLIQUID_ASSET_SID, create_minute_df_for_asset( + cls.env, + cls.bcolz_minute_bar_days[0], + cls.bcolz_minute_bar_days[-1], + 50, ) @classmethod diff --git a/tests/test_finance.py b/tests/test_finance.py index 0f20f015..fc32934c 100644 --- a/tests/test_finance.py +++ b/tests/test_finance.py @@ -23,6 +23,7 @@ from nose.tools import timed import numpy as np import pandas as pd import pytz +from six import iteritems from six.moves import range from testfixtures import TempDirectory @@ -219,7 +220,7 @@ class FinanceTestCase(WithLogger, env, env.days_in_range(minutes[0], minutes[-1]), tempdir.path, - assets + iteritems(assets), ) equity_minute_reader = BcolzMinuteBarReader(tempdir.path) 
diff --git a/tests/test_history.py b/tests/test_history.py index 0aec4913..ae4c1c61 100644 --- a/tests/test_history.py +++ b/tests/test_history.py @@ -2,12 +2,12 @@ from textwrap import dedent from numbers import Real -import pandas as pd +from nose_parameterized import parameterized import numpy as np from numpy import nan from numpy.testing import assert_almost_equal - -from nose_parameterized import parameterized +import pandas as pd +from six import iteritems from zipline import TradingAlgorithm from zipline._protocol import handle_non_market_minutes @@ -473,7 +473,7 @@ class MinuteEquityHistoryTestCase(WithHistory, ZiplineTestCase): start_val=2, interval=10, ) - return data + return iteritems(data) def test_history_in_initialize(self): algo_text = dedent( @@ -986,24 +986,22 @@ class DailyEquityHistoryTestCase(WithHistory, ZiplineTestCase): def make_minute_bar_data(cls): asset1 = cls.asset_finder.retrieve_asset(1) asset2 = cls.asset_finder.retrieve_asset(2) - return { - asset1.sid: create_minute_df_for_asset( - cls.env, - asset1.start_date, - asset1.end_date, - start_val=2, - ), - asset2.sid: create_minute_df_for_asset( - cls.env, - asset2.start_date, - cls.env.previous_trading_day(asset2.end_date), - start_val=2, - minute_blacklist=[ - pd.Timestamp('2015-01-08 14:31', tz='UTC'), - pd.Timestamp('2015-01-08 21:00', tz='UTC'), - ], - ), - } + yield asset1.sid, create_minute_df_for_asset( + cls.env, + asset1.start_date, + asset1.end_date, + start_val=2, + ) + yield asset2.sid, create_minute_df_for_asset( + cls.env, + asset2.start_date, + cls.env.previous_trading_day(asset2.end_date), + start_val=2, + minute_blacklist=[ + pd.Timestamp('2015-01-08 14:31', tz='UTC'), + pd.Timestamp('2015-01-08 21:00', tz='UTC'), + ], + ) @classmethod def create_df_for_asset(cls, start_day, end_day, interval=1, @@ -1548,32 +1546,30 @@ class MinuteToDailyAggregationTestCase(WithBcolzMinuteBarReader, @classmethod def make_minute_bar_data(cls): - return { - # sid data is created so that 
at least one high is lower than a - # previous high, and the inverse for low - 1: pd.DataFrame( - { - 'open': [nan, 103.50, 102.50, 104.50, 101.50, nan], - 'high': [nan, 103.90, 102.90, 104.90, 101.90, nan], - 'low': [nan, 103.10, 102.10, 104.10, 101.10, nan], - 'close': [nan, 103.30, 102.30, 104.30, 101.30, nan], - 'volume': [0, 1003, 1002, 1004, 1001, 0] - }, - index=cls.minutes, - ), - # sid 2 is included to provide data on different bars than sid 1, - # as will as illiquidty mid-day - 2: pd.DataFrame( - { - 'open': [201.50, nan, 204.50, nan, 200.50, 202.50], - 'high': [201.90, nan, 204.90, nan, 200.90, 202.90], - 'low': [201.10, nan, 204.10, nan, 200.10, 202.10], - 'close': [201.30, nan, 203.50, nan, 200.30, 202.30], - 'volume': [2001, 0, 2004, 0, 2000, 2002], - }, - index=cls.minutes, - ), - } + # sid data is created so that at least one high is lower than a + # previous high, and the inverse for low + yield 1, pd.DataFrame( + { + 'open': [nan, 103.50, 102.50, 104.50, 101.50, nan], + 'high': [nan, 103.90, 102.90, 104.90, 101.90, nan], + 'low': [nan, 103.10, 102.10, 104.10, 101.10, nan], + 'close': [nan, 103.30, 102.30, 104.30, 101.30, nan], + 'volume': [0, 1003, 1002, 1004, 1001, 0] + }, + index=cls.minutes, + ) + # sid 2 is included to provide data on different bars than sid 1, + # as well as illiquidity mid-day + yield 2, pd.DataFrame( + { + 'open': [201.50, nan, 204.50, nan, 200.50, 202.50], + 'high': [201.90, nan, 204.90, nan, 200.90, 202.90], + 'low': [201.10, nan, 204.10, nan, 200.10, 202.10], + 'close': [201.30, nan, 203.50, nan, 200.30, 202.30], + 'volume': [2001, 0, 2004, 0, 2000, 2002], + }, + index=cls.minutes, + ) expected_values = { 1: pd.DataFrame( diff --git a/zipline/data/minute_bars.py b/zipline/data/minute_bars.py index dca46e10..35693cfb 100644 --- a/zipline/data/minute_bars.py +++ b/zipline/data/minute_bars.py @@ -29,6 +29,7 @@ from zipline.data._minute_bar_internal import ( ) from zipline.gens.sim_engine import NANOS_IN_MINUTE +from
zipline.utils.cli import maybe_show_progress from zipline.utils.memoize import lazyval US_EQUITIES_MINUTES_PER_DAY = 390 @@ -441,7 +442,36 @@ class BcolzMinuteBarWriter(object): assert new_last_date == date, "new_last_date={0} != date={1}".format( new_last_date, date) - def write(self, sid, df): + def write(self, data, show_progress=False): + """Write a stream of minute data. + + Parameters + ---------- + data : iterable[(int, pd.DataFrame)] + The data to write. Each element should be a tuple of sid, data + where data has the following format: + columns : ('open', 'high', 'low', 'close', 'volume') + open : float64 + high : float64 + low : float64 + close : float64 + volume : float64|int64 + index : DatetimeIndex of market minutes. + show_progress : bool, optional + Whether or not to show a progress bar while writing. + """ + ctx = maybe_show_progress( + data, + show_progress=show_progress, + item_show_func=lambda e: e if e is None else str(e[0]), + label="Merging minute equity files:", + ) + write_sid = self.write_sid + with ctx as it: + for e in it: + write_sid(*e) + + def write_sid(self, sid, df): """ Write the OHLCV data for the given sid. If there is no bcolz ctable yet created for the sid, create it. diff --git a/zipline/data/us_equity_pricing.py b/zipline/data/us_equity_pricing.py index d54e209b..ba02dc15 100644 --- a/zipline/data/us_equity_pricing.py +++ b/zipline/data/us_equity_pricing.py @@ -209,7 +209,7 @@ class BcolzDailyBarWriter(object): @property def progress_bar_message(self): - return "Merging asset files:" + return "Merging daily equity files:" def progress_bar_item_show_func(self, value): return value if value is None else str(value[0]) @@ -229,9 +229,9 @@ class BcolzDailyBarWriter(object): The assets that should be in ``data``. If this is provided we will check ``data`` against the assets and provide better progress information. - show_progress : bool + show_progress : bool, optional Whether or not to show a progress bar while writing. 
- invalid_data_behavior : {'warn', 'raise', 'ignore'} + invalid_data_behavior : {'warn', 'raise', 'ignore'}, optional What to do when data is encountered that is outside the range of a uint32. diff --git a/zipline/testing/core.py b/zipline/testing/core.py index a3dcae6e..6d7a4a05 100644 --- a/zipline/testing/core.py +++ b/zipline/testing/core.py @@ -349,7 +349,7 @@ def make_trade_data_for_asset_info(dates, ) if writer: - writer.write(sid, df) + writer.write_sid(sid, df) trade_data[sid] = df @@ -424,8 +424,8 @@ def write_minute_data(env, tempdir, minutes, sids): def create_minute_bar_data(minutes, sids): length = len(minutes) - return { - sid: pd.DataFrame( + for sid_idx, sid in enumerate(sids): + yield sid, pd.DataFrame( { 'open': np.arange(length) + 10 + sid_idx, 'high': np.arange(length) + 15 + sid_idx, @@ -435,8 +435,6 @@ def create_minute_bar_data(minutes, sids): }, index=minutes, ) - for sid_idx, sid in enumerate(sids) - } def create_daily_bar_data(trading_days, sids): @@ -492,20 +490,17 @@ def create_data_portal(env, tempdir, sim_params, sids, adjustment_reader=None): ) -def write_bcolz_minute_data(env, days, path, df_dict): +def write_bcolz_minute_data(env, days, path, data): market_opens = env.open_and_closes.market_open.loc[days] market_closes = env.open_and_closes.market_close.loc[days] - writer = BcolzMinuteBarWriter( + BcolzMinuteBarWriter( days[0], path, market_opens, market_closes, US_EQUITIES_MINUTES_PER_DAY - ) - - for sid, df in iteritems(df_dict): - writer.write(sid, df) + ).write(data) def create_minute_df_for_asset(env, diff --git a/zipline/testing/fixtures.py b/zipline/testing/fixtures.py index d1547489..ff82042c 100644 --- a/zipline/testing/fixtures.py +++ b/zipline/testing/fixtures.py @@ -6,7 +6,7 @@ from contextlib2 import ExitStack from logbook import NullHandler, Logger from nose_parameterized import parameterized from pandas.util.testing import assert_series_equal -from six import with_metaclass, iteritems +from six import with_metaclass 
from toolz import flip import numpy as np import pandas as pd @@ -681,7 +681,7 @@ class WithBcolzMinuteBarReader(WithTradingEnvironment, WithTmpDir): Methods ------- - make_minute_bar_data() -> dict[int -> pd.DataFrame] - A class method that returns a dict mapping sid to dataframe + make_minute_bar_data() -> iterable[(int, pd.DataFrame)] + A class method that returns an iterable of (sid, dataframe) pairs which will be written to the bcolz files that the class's ``BcolzMinuteBarReader`` will read from. By default this creates @@ -734,9 +734,7 @@ class WithBcolzMinuteBarReader(WithTradingEnvironment, WithTmpDir): cls.env.open_and_closes.market_close.loc[days], US_EQUITIES_MINUTES_PER_DAY ) - cls.bcolz_minute_bar_data = cls.make_minute_bar_data() - for sid, df in iteritems(cls.bcolz_minute_bar_data): - writer.write(sid, df) + writer.write(cls.make_minute_bar_data()) cls.bcolz_minute_bar_reader = BcolzMinuteBarReader(p)