diff --git a/setup.py b/setup.py index 519d4918..0ee21c9d 100644 --- a/setup.py +++ b/setup.py @@ -64,12 +64,12 @@ ext_modules = LazyCythonizingList([ ('zipline.lib.adjustment', ['zipline/lib/adjustment.pyx']), ('zipline.lib.rank', ['zipline/lib/rank.pyx']), ( - 'zipline.pipeline.loaders._equities', - ['zipline/pipeline/loaders/_equities.pyx'], + 'zipline.data._equities', + ['zipline/data/_equities.pyx'], ), ( - 'zipline.pipeline.loaders._adjustments', - ['zipline/pipeline/loaders/_adjustments.pyx'], + 'zipline.data._adjustments', + ['zipline/data/_adjustments.pyx'], ), ]) diff --git a/tests/data/__init__.py b/tests/data/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/data/test_us_equity_pricing.py b/tests/data/test_us_equity_pricing.py new file mode 100644 index 00000000..f0a418c5 --- /dev/null +++ b/tests/data/test_us_equity_pricing.py @@ -0,0 +1,268 @@ +# +# Copyright 2015 Quantopian, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from unittest import TestCase + +from nose_parameterized import parameterized +from numpy import ( + arange, + datetime64, +) +from numpy.testing import ( + assert_array_equal, +) +from pandas import ( + DataFrame, + DatetimeIndex, + Timestamp, +) +from pandas.util.testing import assert_index_equal +from testfixtures import TempDirectory + +from zipline.pipeline.loaders.synthetic import ( + SyntheticDailyBarWriter, +) +from zipline.data.us_equity_pricing import ( + BcolzDailyBarReader, +) +from zipline.finance.trading import TradingEnvironment +from zipline.pipeline.data import USEquityPricing +from zipline.utils.test_utils import ( + seconds_to_timestamp, +) + +TEST_CALENDAR_START = Timestamp('2015-06-01', tz='UTC') +TEST_CALENDAR_STOP = Timestamp('2015-06-30', tz='UTC') + +TEST_QUERY_START = Timestamp('2015-06-10', tz='UTC') +TEST_QUERY_STOP = Timestamp('2015-06-19', tz='UTC') + +# One asset for each of the cases enumerated in load_raw_arrays_from_bcolz. +EQUITY_INFO = DataFrame( + [ + # 1) The equity's trades start and end before query. + {'start_date': '2015-06-01', 'end_date': '2015-06-05'}, + # 2) The equity's trades start and end after query. + {'start_date': '2015-06-22', 'end_date': '2015-06-30'}, + # 3) The equity's data covers all dates in range. + {'start_date': '2015-06-02', 'end_date': '2015-06-30'}, + # 4) The equity's trades start before the query start, but stop + # before the query end. + {'start_date': '2015-06-01', 'end_date': '2015-06-15'}, + # 5) The equity's trades start and end during the query. + {'start_date': '2015-06-12', 'end_date': '2015-06-18'}, + # 6) The equity's trades start during the query, but extend through + # the whole query. + {'start_date': '2015-06-15', 'end_date': '2015-06-25'}, + ], + index=arange(1, 7), + columns=['start_date', 'end_date'], +).astype(datetime64) + +TEST_QUERY_ASSETS = EQUITY_INFO.index + + +class BcolzDailyBarTestCase(TestCase): + + @classmethod + def setUpClass(cls): + all_trading_days = TradingEnvironment().trading_days + cls.trading_days = all_trading_days[ + all_trading_days.get_loc(TEST_CALENDAR_START): + all_trading_days.get_loc(TEST_CALENDAR_STOP) + 1 + ] + + def setUp(self): + + self.asset_info = EQUITY_INFO + self.writer = SyntheticDailyBarWriter( + self.asset_info, + self.trading_days, + ) + + self.dir_ = TempDirectory() + self.dir_.create() + self.dest = self.dir_.getpath('daily_equity_pricing.bcolz') + + def tearDown(self): + self.dir_.cleanup() + + @property + def assets(self): + return self.asset_info.index + + def trading_days_between(self, start, end): + return self.trading_days[self.trading_days.slice_indexer(start, end)] + + def asset_start(self, asset_id): + return self.writer.asset_start(asset_id) + + def asset_end(self, asset_id): + return self.writer.asset_end(asset_id) + + def dates_for_asset(self, asset_id): + start, end = self.asset_start(asset_id), self.asset_end(asset_id) + return self.trading_days_between(start, end) + + def test_write_ohlcv_content(self): + result = self.writer.write(self.dest, self.trading_days, self.assets) + for column in SyntheticDailyBarWriter.OHLCV: + idx = 0 + data = result[column][:] + multiplier = 1 if column == 'volume' else 1000 + for asset_id in self.assets: + for date in self.dates_for_asset(asset_id): + self.assertEqual( + SyntheticDailyBarWriter.expected_value( + asset_id, + date, + column + ) * multiplier, + data[idx], + ) + idx += 1 + self.assertEqual(idx, len(data)) + + def test_write_day_and_id(self): + result = self.writer.write(self.dest, self.trading_days, self.assets) + idx = 0 + ids = result['id'] + days = result['day'] + for asset_id in self.assets: + for date in self.dates_for_asset(asset_id): + self.assertEqual(ids[idx], asset_id) + self.assertEqual(date, seconds_to_timestamp(days[idx])) + idx += 1 + + def test_write_attrs(self): + result = self.writer.write(self.dest, self.trading_days, self.assets) + expected_first_row = { + '1': 0, + '2': 5, # Asset 1 has 5 trading days. + '3': 12, # Asset 2 has 7 trading days. + '4': 33, # Asset 3 has 21 trading days. + '5': 44, # Asset 4 has 11 trading days. + '6': 49, # Asset 5 has 5 trading days. + } + expected_last_row = { + '1': 4, + '2': 11, + '3': 32, + '4': 43, + '5': 48, + '6': 57, # Asset 6 has 9 trading days. + } + expected_calendar_offset = { + '1': 0, # Starts on 6-01, 1st trading day of month. + '2': 15, # Starts on 6-22, 16th trading day of month. + '3': 1, # Starts on 6-02, 2nd trading day of month. + '4': 0, # Starts on 6-01, 1st trading day of month. + '5': 9, # Starts on 6-12, 10th trading day of month. + '6': 10, # Starts on 6-15, 11th trading day of month. + } + self.assertEqual(result.attrs['first_row'], expected_first_row) + self.assertEqual(result.attrs['last_row'], expected_last_row) + self.assertEqual( + result.attrs['calendar_offset'], + expected_calendar_offset, + ) + assert_index_equal( + self.trading_days, + DatetimeIndex(result.attrs['calendar'], tz='UTC'), + ) + + def _check_read_results(self, columns, assets, start_date, end_date): + table = self.writer.write(self.dest, self.trading_days, self.assets) + reader = BcolzDailyBarReader(table) + results = reader.load_raw_arrays(columns, start_date, end_date, assets) + dates = self.trading_days_between(start_date, end_date) + for column, result in zip(columns, results): + assert_array_equal( + result, + self.writer.expected_values_2d( + dates, + assets, + column.name, + ) + ) + + @parameterized.expand([ + ([USEquityPricing.open],), + ([USEquityPricing.close, USEquityPricing.volume],), + ([USEquityPricing.volume, USEquityPricing.high, USEquityPricing.low],), + (USEquityPricing.columns,), + ]) + def test_read(self, columns): + self._check_read_results( + columns, + self.assets, + TEST_QUERY_START, + TEST_QUERY_STOP, + ) + + def test_start_on_asset_start(self): + """ + Test loading with queries that starts on the first day of each asset's + lifetime. + """ + columns = [USEquityPricing.high, USEquityPricing.volume] + for asset in self.assets: + self._check_read_results( + columns, + self.assets, + start_date=self.asset_start(asset), + end_date=self.trading_days[-1], + ) + + def test_start_on_asset_end(self): + """ + Test loading with queries that start on the last day of each asset's + lifetime. + """ + columns = [USEquityPricing.close, USEquityPricing.volume] + for asset in self.assets: + self._check_read_results( + columns, + self.assets, + start_date=self.asset_end(asset), + end_date=self.trading_days[-1], + ) + + def test_end_on_asset_start(self): + """ + Test loading with queries that end on the first day of each asset's + lifetime. + """ + columns = [USEquityPricing.close, USEquityPricing.volume] + for asset in self.assets: + self._check_read_results( + columns, + self.assets, + start_date=self.trading_days[0], + end_date=self.asset_start(asset), + ) + + def test_end_on_asset_end(self): + """ + Test loading with queries that end on the last day of each asset's + lifetime. + """ + columns = [USEquityPricing.close, USEquityPricing.volume] + for asset in self.assets: + self._check_read_results( + columns, + self.assets, + start_date=self.trading_days[0], + end_date=self.asset_end(asset), + ) diff --git a/tests/pipeline/test_engine.py b/tests/pipeline/test_engine.py index 65d200bd..db89e26a 100644 --- a/tests/pipeline/test_engine.py +++ b/tests/pipeline/test_engine.py @@ -30,12 +30,12 @@ from zipline.pipeline.loaders.synthetic import ( NullAdjustmentReader, SyntheticDailyBarWriter, ) +from zipline.data.us_equity_pricing import BcolzDailyBarReader from zipline.finance.trading import TradingEnvironment from zipline.pipeline import Pipeline from zipline.pipeline.data import USEquityPricing from zipline.pipeline.loaders.frame import DataFrameLoader, MULTIPLY from zipline.pipeline.loaders.equity_pricing_loader import ( - BcolzDailyBarReader, USEquityPricingLoader, ) from zipline.pipeline.engine import SimplePipelineEngine diff --git a/tests/pipeline/test_pipeline_algo.py b/tests/pipeline/test_pipeline_algo.py index 9742f40a..40fef5f5 100644 --- a/tests/pipeline/test_pipeline_algo.py +++ b/tests/pipeline/test_pipeline_algo.py @@ -40,16 +40,18 @@ from zipline.errors import ( PipelineOutputDuringInitialize, NoSuchPipeline, ) +from zipline.data.us_equity_pricing import ( + BcolzDailyBarReader, + DailyBarWriterFromCSVs, + SQLiteAdjustmentWriter, + SQLiteAdjustmentReader, +) from zipline.finance import trading from zipline.pipeline import Pipeline from zipline.pipeline.factors import VWAP from zipline.pipeline.data import USEquityPricing from zipline.pipeline.loaders.frame import DataFrameLoader, MULTIPLY from zipline.pipeline.loaders.equity_pricing_loader import ( - BcolzDailyBarReader, - DailyBarWriterFromCSVs, - SQLiteAdjustmentReader, - SQLiteAdjustmentWriter, USEquityPricingLoader, ) from zipline.utils.test_utils import ( diff --git a/tests/pipeline/test_us_equity_pricing_loader.py b/tests/pipeline/test_us_equity_pricing_loader.py index c4fb9d79..7a768d2b 100644 --- a/tests/pipeline/test_us_equity_pricing_loader.py +++ b/tests/pipeline/test_us_equity_pricing_loader.py @@ -17,7 +17,6 @@ Tests for USEquityPricingLoader and related classes. """ from unittest import TestCase -from nose_parameterized import parameterized from numpy import ( arange, datetime64, @@ -32,11 +31,9 @@ from numpy.testing import ( from pandas import ( concat, DataFrame, - DatetimeIndex, Int64Index, Timestamp, ) -from pandas.util.testing import assert_index_equal from testfixtures import TempDirectory from zipline.lib.adjustment import Float64Multiply @@ -44,12 +41,15 @@ from zipline.pipeline.loaders.synthetic import ( NullAdjustmentReader, SyntheticDailyBarWriter, ) -from zipline.pipeline.loaders.equity_pricing_loader import ( +from zipline.data.us_equity_pricing import ( BcolzDailyBarReader, SQLiteAdjustmentReader, SQLiteAdjustmentWriter, +) +from zipline.pipeline.loaders.equity_pricing_loader import ( USEquityPricingLoader, ) + from zipline.errors import WindowLengthTooLong from zipline.finance.trading import TradingEnvironment from zipline.pipeline.data import USEquityPricing @@ -97,201 +97,6 @@ EQUITY_INFO = DataFrame( TEST_QUERY_ASSETS = EQUITY_INFO.index -class BcolzDailyBarTestCase(TestCase): - - @classmethod - def setUpClass(cls): - all_trading_days = TradingEnvironment().trading_days - cls.trading_days = all_trading_days[ - all_trading_days.get_loc(TEST_CALENDAR_START): - all_trading_days.get_loc(TEST_CALENDAR_STOP) + 1 - ] - - def setUp(self): - - self.asset_info = EQUITY_INFO - self.writer = SyntheticDailyBarWriter( - self.asset_info, - self.trading_days, - ) - - self.dir_ = TempDirectory() - self.dir_.create() - self.dest = self.dir_.getpath('daily_equity_pricing.bcolz') - - def tearDown(self): - self.dir_.cleanup() - - @property - def assets(self): - return self.asset_info.index - - def trading_days_between(self, start, end): - return self.trading_days[self.trading_days.slice_indexer(start, end)] - - def asset_start(self, asset_id): - return self.writer.asset_start(asset_id) - - def asset_end(self, asset_id): - return self.writer.asset_end(asset_id) - - def dates_for_asset(self, asset_id): - start, end = self.asset_start(asset_id), self.asset_end(asset_id) - return self.trading_days_between(start, end) - - def test_write_ohlcv_content(self): - result = self.writer.write(self.dest, self.trading_days, self.assets) - for column in SyntheticDailyBarWriter.OHLCV: - idx = 0 - data = result[column][:] - multiplier = 1 if column == 'volume' else 1000 - for asset_id in self.assets: - for date in self.dates_for_asset(asset_id): - self.assertEqual( - SyntheticDailyBarWriter.expected_value( - asset_id, - date, - column - ) * multiplier, - data[idx], - ) - idx += 1 - self.assertEqual(idx, len(data)) - - def test_write_day_and_id(self): - result = self.writer.write(self.dest, self.trading_days, self.assets) - idx = 0 - ids = result['id'] - days = result['day'] - for asset_id in self.assets: - for date in self.dates_for_asset(asset_id): - self.assertEqual(ids[idx], asset_id) - self.assertEqual(date, seconds_to_timestamp(days[idx])) - idx += 1 - - def test_write_attrs(self): - result = self.writer.write(self.dest, self.trading_days, self.assets) - expected_first_row = { - '1': 0, - '2': 5, # Asset 1 has 5 trading days. - '3': 12, # Asset 2 has 7 trading days. - '4': 33, # Asset 3 has 21 trading days. - '5': 44, # Asset 4 has 11 trading days. - '6': 49, # Asset 5 has 5 trading days. - } - expected_last_row = { - '1': 4, - '2': 11, - '3': 32, - '4': 43, - '5': 48, - '6': 57, # Asset 6 has 9 trading days. - } - expected_calendar_offset = { - '1': 0, # Starts on 6-01, 1st trading day of month. - '2': 15, # Starts on 6-22, 16th trading day of month. - '3': 1, # Starts on 6-02, 2nd trading day of month. - '4': 0, # Starts on 6-01, 1st trading day of month. - '5': 9, # Starts on 6-12, 10th trading day of month. - '6': 10, # Starts on 6-15, 11th trading day of month. - } - self.assertEqual(result.attrs['first_row'], expected_first_row) - self.assertEqual(result.attrs['last_row'], expected_last_row) - self.assertEqual( - result.attrs['calendar_offset'], - expected_calendar_offset, - ) - assert_index_equal( - self.trading_days, - DatetimeIndex(result.attrs['calendar'], tz='UTC'), - ) - - def _check_read_results(self, columns, assets, start_date, end_date): - table = self.writer.write(self.dest, self.trading_days, self.assets) - reader = BcolzDailyBarReader(table) - results = reader.load_raw_arrays(columns, start_date, end_date, assets) - dates = self.trading_days_between(start_date, end_date) - for column, result in zip(columns, results): - assert_array_equal( - result, - self.writer.expected_values_2d( - dates, - assets, - column.name, - ) - ) - - @parameterized.expand([ - ([USEquityPricing.open],), - ([USEquityPricing.close, USEquityPricing.volume],), - ([USEquityPricing.volume, USEquityPricing.high, USEquityPricing.low],), - (USEquityPricing.columns,), - ]) - def test_read(self, columns): - self._check_read_results( - columns, - self.assets, - TEST_QUERY_START, - TEST_QUERY_STOP, - ) - - def test_start_on_asset_start(self): - """ - Test loading with queries that starts on the first day of each asset's - lifetime. - """ - columns = [USEquityPricing.high, USEquityPricing.volume] - for asset in self.assets: - self._check_read_results( - columns, - self.assets, - start_date=self.asset_start(asset), - end_date=self.trading_days[-1], - ) - - def test_start_on_asset_end(self): - """ - Test loading with queries that start on the last day of each asset's - lifetime. - """ - columns = [USEquityPricing.close, USEquityPricing.volume] - for asset in self.assets: - self._check_read_results( - columns, - self.assets, - start_date=self.asset_end(asset), - end_date=self.trading_days[-1], - ) - - def test_end_on_asset_start(self): - """ - Test loading with queries that end on the first day of each asset's - lifetime. - """ - columns = [USEquityPricing.close, USEquityPricing.volume] - for asset in self.assets: - self._check_read_results( - columns, - self.assets, - start_date=self.trading_days[0], - end_date=self.asset_start(asset), - ) - - def test_end_on_asset_end(self): - """ - Test loading with queries that end on the last day of each asset's - lifetime. - """ - columns = [USEquityPricing.close, USEquityPricing.volume] - for asset in self.assets: - self._check_read_results( - columns, - self.assets, - start_date=self.trading_days[0], - end_date=self.asset_end(asset), - ) - - # ADJUSTMENTS use the following scheme to indicate information about the value # upon inspection. # diff --git a/zipline/pipeline/loaders/_adjustments.pyx b/zipline/data/_adjustments.pyx similarity index 100% rename from zipline/pipeline/loaders/_adjustments.pyx rename to zipline/data/_adjustments.pyx diff --git a/zipline/pipeline/loaders/_equities.pyx b/zipline/data/_equities.pyx similarity index 100% rename from zipline/pipeline/loaders/_equities.pyx rename to zipline/data/_equities.pyx diff --git a/zipline/data/us_equity_pricing.py b/zipline/data/us_equity_pricing.py new file mode 100644 index 00000000..b31a9021 --- /dev/null +++ b/zipline/data/us_equity_pricing.py @@ -0,0 +1,560 @@ +# Copyright 2015 Quantopian, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from abc import ( + ABCMeta, + abstractmethod, +) +from contextlib import contextmanager +from errno import ENOENT +from os import remove +from os.path import exists +import sqlite3 + +from bcolz import ( + carray, + ctable, +) +from click import progressbar +from numpy import ( + array, + float64, + floating, + full, + iinfo, + integer, + issubdtype, + uint32, +) +from pandas import ( + DatetimeIndex, + read_csv, + Timestamp, +) +from six import ( + iteritems, + string_types, + with_metaclass, +) + +from ._equities import _compute_row_slices, _read_bcolz_data +from ._adjustments import load_adjustments_from_sqlite + +OHLC = frozenset(['open', 'high', 'low', 'close']) +US_EQUITY_PRICING_BCOLZ_COLUMNS = [ + 'open', 'high', 'low', 'close', 'volume', 'day', 'id' +] +DAILY_US_EQUITY_PRICING_DEFAULT_FILENAME = 'daily_us_equity_pricing.bcolz' +SQLITE_ADJUSTMENT_COLUMNS = frozenset(['effective_date', 'ratio', 'sid']) +SQLITE_ADJUSTMENT_COLUMN_DTYPES = { + 'effective_date': integer, + 'ratio': floating, + 'sid': integer, +} +SQLITE_ADJUSTMENT_TABLENAMES = frozenset(['splits', 'dividends', 'mergers']) + +UINT32_MAX = iinfo(uint32).max + + +class BcolzDailyBarWriter(with_metaclass(ABCMeta)): + """ + Class capable of writing daily OHLCV data to disk in a format that can be + read efficiently by BcolzDailyOHLCVReader. + + See Also + -------- + BcolzDailyBarReader : Consumer of the data written by this class. + """ + + @abstractmethod + def gen_tables(self, assets): + """ + Return an iterator of pairs of (asset_id, bcolz.ctable). + """ + raise NotImplementedError() + + @abstractmethod + def to_uint32(self, array, colname): + """ + Convert raw column values produced by gen_tables into uint32 values. + + Parameters + ---------- + array : np.array + An array of raw values. + colname : str, {'open', 'high', 'low', 'close', 'volume', 'day'} + The name of the column being loaded. + + For output being read by the default BcolzOHLCVReader, data should be + stored in the following manner: + + - Pricing columns (Open, High, Low, Close) should be stored as 1000 * + as-traded dollar value. + - Volume should be the as-traded volume. + - Dates should be stored as seconds since midnight UTC, Jan 1, 1970. + """ + raise NotImplementedError() + + def write(self, filename, calendar, assets, show_progress=False): + """ + Parameters + ---------- + filename : str + The location at which we should write our output. + calendar : pandas.DatetimeIndex + Calendar to use to compute asset calendar offsets. + assets : pandas.Int64Index + The assets for which to write data. + show_progress : bool + Whether or not to show a progress bar while writing. + + Returns + ------- + table : bcolz.ctable + The newly-written table. + """ + _iterator = self.gen_tables(assets) + if show_progress: + pbar = progressbar( + _iterator, + length=len(assets), + item_show_func=lambda i: i if i is None else str(i[0]), + label="Merging asset files:", + ) + with pbar as pbar_iterator: + return self._write_internal(filename, calendar, pbar_iterator) + return self._write_internal(filename, calendar, _iterator) + + def _write_internal(self, filename, calendar, iterator): + """ + Internal implementation of write. + + `iterator` should be an iterator yielding pairs of (asset, ctable). + """ + total_rows = 0 + first_row = {} + last_row = {} + calendar_offset = {} + + # Maps column name -> output carray. + columns = { + k: carray(array([], dtype=uint32)) + for k in US_EQUITY_PRICING_BCOLZ_COLUMNS + } + + for asset_id, table in iterator: + nrows = len(table) + for column_name in columns: + if column_name == 'id': + # We know what the content of this column is, so don't + # bother reading it. + columns['id'].append(full((nrows,), asset_id)) + continue + columns[column_name].append( + self.to_uint32(table[column_name][:], column_name) + ) + + # Bcolz doesn't support ints as keys in `attrs`, so convert + # assets to strings for use as attr keys. + asset_key = str(asset_id) + + # Calculate the index into the array of the first and last row + # for this asset. This allows us to efficiently load single + # assets when querying the data back out of the table. + first_row[asset_key] = total_rows + last_row[asset_key] = total_rows + nrows - 1 + total_rows += nrows + + # Calculate the number of trading days between the first date + # in the stored data and the first date of **this** asset. This + # offset used for output alignment by the reader. + + # HACK: Index with a list so that we get back an array we can pass + # to self.to_uint32. We could try to extract this in the loop + # above, but that makes the logic a lot messier. + asset_first_day = self.to_uint32(table['day'][[0]], 'day')[0] + calendar_offset[asset_key] = calendar.get_loc( + Timestamp(asset_first_day, unit='s', tz='UTC'), + ) + + # This writes the table to disk. + full_table = ctable( + columns=[ + columns[colname] + for colname in US_EQUITY_PRICING_BCOLZ_COLUMNS + ], + names=US_EQUITY_PRICING_BCOLZ_COLUMNS, + rootdir=filename, + mode='w', + ) + full_table.attrs['first_row'] = first_row + full_table.attrs['last_row'] = last_row + full_table.attrs['calendar_offset'] = calendar_offset + full_table.attrs['calendar'] = calendar.asi8.tolist() + return full_table + + +class DailyBarWriterFromCSVs(BcolzDailyBarWriter): + """ + BcolzDailyBarWriter constructed from a map from csvs to assets. + + Parameters + ---------- + asset_map : dict + A map from asset_id -> path to csv with data for that asset. + + CSVs should have the following columns: + day : datetime64 + open : float64 + high : float64 + low : float64 + close : float64 + volume : int64 + """ + _csv_dtypes = { + 'open': float64, + 'high': float64, + 'low': float64, + 'close': float64, + 'volume': float64, + } + + def __init__(self, asset_map): + self._asset_map = asset_map + + def gen_tables(self, assets): + """ + Read CSVs as DataFrames from our asset map. + """ + dtypes = self._csv_dtypes + for asset in assets: + path = self._asset_map.get(asset) + if path is None: + raise KeyError("No path supplied for asset %s" % asset) + data = read_csv(path, parse_dates=['day'], dtype=dtypes) + yield asset, ctable.fromdataframe(data) + + def to_uint32(self, array, colname): + arrmax = array.max() + if colname in OHLC: + self.check_uint_safe(arrmax * 1000, colname) + return (array * 1000).astype(uint32) + elif colname == 'volume': + self.check_uint_safe(arrmax, colname) + return array.astype(uint32) + elif colname == 'day': + nanos_per_second = (1000 * 1000 * 1000) + self.check_uint_safe(arrmax.view(int) / nanos_per_second, colname) + return (array.view(int) / nanos_per_second).astype(uint32) + + @staticmethod + def check_uint_safe(value, colname): + if value >= UINT32_MAX: + raise ValueError( + "Value %s from column '%s' is too large" % (value, colname) + ) + + +class BcolzDailyBarReader(object): + """ + Reader for raw pricing data written by BcolzDailyOHLCVWriter. + + A Bcolz CTable is comprised of Columns and Attributes. + + Columns + ------- + The table with which this loader interacts contains the following columns: + + ['open', 'high', 'low', 'close', 'volume', 'day', 'id']. + + The data in these columns is interpreted as follows: + + - Price columns ('open', 'high', 'low', 'close') are interpreted as 1000 * + as-traded dollar value. + - Volume is interpreted as as-traded volume. + - Day is interpreted as seconds since midnight UTC, Jan 1, 1970. + - Id is the asset id of the row. + + The data in each column is grouped by asset and then sorted by day within + each asset block. + + The table is built to represent a long time range of data, e.g. ten years + of equity data, so the lengths of each asset block is not equal to each + other. The blocks are clipped to the known start and end date of each asset + to cut down on the number of empty values that would need to be included to + make a regular/cubic dataset. + + When read across the open, high, low, close, and volume with the same + index should represent the same asset and day. + + Attributes + ---------- + The table with which this loader interacts contains the following + attributes: + + first_row : dict + Map from asset_id -> index of first row in the dataset with that id. + last_row : dict + Map from asset_id -> index of last row in the dataset with that id. + calendar_offset : dict + Map from asset_id -> calendar index of first row. + calendar : list[int64] + Calendar used to compute offsets, in asi8 format (ns since EPOCH). + + We use first_row and last_row together to quickly find ranges of rows to + load when reading an asset's data into memory. + + We use calendar_offset and calendar to orient loaded blocks within a + range of queried dates. + """ + def __init__(self, table): + if isinstance(table, string_types): + table = ctable(rootdir=table, mode='r') + + self._table = table + self._calendar = DatetimeIndex(table.attrs['calendar'], tz='UTC') + self._first_rows = { + int(asset_id): start_index + for asset_id, start_index in iteritems(table.attrs['first_row']) + } + self._last_rows = { + int(asset_id): end_index + for asset_id, end_index in iteritems(table.attrs['last_row']) + } + self._calendar_offsets = { + int(id_): offset + for id_, offset in iteritems(table.attrs['calendar_offset']) + } + + def _compute_slices(self, start_idx, end_idx, assets): + """ + Compute the raw row indices to load for each asset on a query for the + given dates after applying a shift. + + Parameters + ---------- + start_idx : int + Index of first date for which we want data. + end_idx : int + Index of last date for which we want data. + assets : pandas.Int64Index + Assets for which we want to compute row indices + + Returns + ------- + A 3-tuple of (first_rows, last_rows, offsets): + first_rows : np.array[intp] + Array with length == len(assets) containing the index of the first + row to load for each asset in `assets`. + last_rows : np.array[intp] + Array with length == len(assets) containing the index of the last + row to load for each asset in `assets`. + offset : np.array[intp] + Array with length == (len(asset) containing the index in a buffer + of length `dates` corresponding to the first row of each asset. + + The value of offset[i] will be 0 if asset[i] existed at the start + of a query. Otherwise, offset[i] will be equal to the number of + entries in `dates` for which the asset did not yet exist. + """ + # The core implementation of the logic here is implemented in Cython + # for efficiency. + return _compute_row_slices( + self._first_rows, + self._last_rows, + self._calendar_offsets, + start_idx, + end_idx, + assets, + ) + + def load_raw_arrays(self, columns, start_date, end_date, assets): + # Assumes that the given dates are actually in calendar. + start_idx = self._calendar.get_loc(start_date) + end_idx = self._calendar.get_loc(end_date) + first_rows, last_rows, offsets = self._compute_slices( + start_idx, + end_idx, + assets, + ) + return _read_bcolz_data( + self._table, + (end_idx - start_idx + 1, len(assets)), + [column.name for column in columns], + first_rows, + last_rows, + offsets, + ) + + +class SQLiteAdjustmentWriter(object): + """ + Writer for data to be read by SQLiteAdjustmentWriter + + Parameters + ---------- + conn_or_path : str or sqlite3.Connection + A handle to the target sqlite database. + overwrite : bool, optional, default=False + If True and conn_or_path is a string, remove any existing files at the + given path before connecting. + + See Also + -------- + SQLiteAdjustmentReader + """ + + def __init__(self, conn_or_path, overwrite=False): + if isinstance(conn_or_path, sqlite3.Connection): + self.conn = conn_or_path + elif isinstance(conn_or_path, str): + if overwrite and exists(conn_or_path): + try: + remove(conn_or_path) + except OSError as e: + if e.errno != ENOENT: + raise + self.conn = sqlite3.connect(conn_or_path) + else: + raise TypeError("Unknown connection type %s" % type(conn_or_path)) + + def write_frame(self, tablename, frame): + if frozenset(frame.columns) != SQLITE_ADJUSTMENT_COLUMNS: + raise ValueError( + "Unexpected frame columns:\n" + "Expected Columns: %s\n" + "Received Columns: %s" % ( + SQLITE_ADJUSTMENT_COLUMNS, + frame.columns.tolist(), + ) + ) + elif tablename not in SQLITE_ADJUSTMENT_TABLENAMES: + raise ValueError( + "Adjustment table %s not in %s" % ( + tablename, SQLITE_ADJUSTMENT_TABLENAMES + ) + ) + + expected_dtypes = SQLITE_ADJUSTMENT_COLUMN_DTYPES + actual_dtypes = frame.dtypes + for colname, expected in iteritems(expected_dtypes): + actual = actual_dtypes[colname] + if not issubdtype(actual, expected): + raise TypeError( + "Expected data of type {expected} for column '{colname}', " + "but got {actual}.".format( + expected=expected, + colname=colname, + actual=actual, + ) + ) + return frame.to_sql(tablename, self.conn) + + def write(self, splits, mergers, dividends): + """ + Writes data to a SQLite file to be read by SQLiteAdjustmentReader. + + Parameters + ---------- + splits : pandas.DataFrame + Dataframe containing split data. + mergers : pandas.DataFrame + DataFrame containing merger data. + dividends : pandas.DataFrame + DataFrame containing dividend data. + + Notes + ----- + DataFrame input (`splits`, `mergers`, and `dividends`) should all have + the following columns: + + effective_date : int + The date, represented as seconds since Unix epoch, on which the + adjustment should be applied. + ratio : float + A value to apply to all data earlier than the effective date. + sid : int + The asset id associated with this adjustment. + + The ratio column is interpreted as follows: + - For all adjustment types, multiply price fields ('open', 'high', + 'low', and 'close') by the ratio. + - For **splits only**, **divide** volume by the adjustment ratio. + + Dividend ratios should be calculated as + 1.0 - (dividend_value / "close on day prior to dividend ex_date"). + + Returns + ------- + None + + See Also + -------- + SQLiteAdjustmentReader : Consumer for the data written by this class + """ + self.write_frame('splits', splits) + self.write_frame('mergers', mergers) + self.write_frame('dividends', dividends) + self.conn.execute( + "CREATE INDEX splits_sids " + "ON splits(sid)" + ) + self.conn.execute( + "CREATE INDEX splits_effective_date " + "ON splits(effective_date)" + ) + self.conn.execute( + "CREATE INDEX mergers_sids " + "ON mergers(sid)" + ) + self.conn.execute( + "CREATE INDEX mergers_effective_date " + "ON mergers(effective_date)" + ) + self.conn.execute( + "CREATE INDEX dividends_sid " + "ON dividends(sid)" + ) + self.conn.execute( + "CREATE INDEX dividends_effective_date " + "ON dividends(effective_date)" + ) + + def close(self): + self.conn.close() + + +class SQLiteAdjustmentReader(object): + """ + Loads adjustments based on corporate actions from a SQLite database. + + Expects data written in the format output by `SQLiteAdjustmentWriter`. + + Parameters + ---------- + conn : str or sqlite3.Connection + Connection from which to load data. + """ + + def __init__(self, conn): + if isinstance(conn, str): + conn = sqlite3.connect(conn) + self.conn = conn + + def load_adjustments(self, columns, dates, assets): + return load_adjustments_from_sqlite( + self.conn, + [column.name for column in columns], + dates, + assets, + ) diff --git a/zipline/pipeline/loaders/equity_pricing_loader.py b/zipline/pipeline/loaders/equity_pricing_loader.py index ba93b2ba..aeb16fa8 100644 --- a/zipline/pipeline/loaders/equity_pricing_loader.py +++ b/zipline/pipeline/loaders/equity_pricing_loader.py @@ -11,41 +11,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from abc import ( - ABCMeta, - abstractmethod, -) -from contextlib import contextmanager -from errno import ENOENT -from os import remove -from os.path import exists -import sqlite3 - -from bcolz import ( - carray, - ctable, -) -from click import progressbar from numpy import ( - array, - float64, - floating, - full, iinfo, - integer, - issubdtype, uint32, ) -from pandas import ( - DatetimeIndex, - read_csv, - Timestamp, -) -from six import ( - iteritems, - string_types, - with_metaclass, -) from zipline.lib.adjusted_array import ( adjusted_array, @@ -53,524 +22,10 @@ from zipline.lib.adjusted_array import ( from zipline.errors import NoFurtherDataError from .base import PipelineLoader -from ._equities import _compute_row_slices, _read_bcolz_data -from ._adjustments import load_adjustments_from_sqlite - -OHLC = frozenset(['open', 'high', 'low', 'close']) -US_EQUITY_PRICING_BCOLZ_COLUMNS = [ - 'open', 'high', 'low', 'close', 'volume', 'day', 'id' -] -DAILY_US_EQUITY_PRICING_DEFAULT_FILENAME = 'daily_us_equity_pricing.bcolz' -SQLITE_ADJUSTMENT_COLUMNS = frozenset(['effective_date', 'ratio', 'sid']) -SQLITE_ADJUSTMENT_COLUMN_DTYPES = { - 'effective_date': integer, - 'ratio': floating, - 'sid': integer, -} -SQLITE_ADJUSTMENT_TABLENAMES = frozenset(['splits', 'dividends', 'mergers']) UINT32_MAX = iinfo(uint32).max -@contextmanager -def passthrough(obj): - yield obj - - -class BcolzDailyBarWriter(with_metaclass(ABCMeta)): - """ - Class capable of writing daily OHLCV data to disk in a format that can be - read efficiently by BcolzDailyOHLCVReader. - - See Also - -------- - BcolzDailyBarReader : Consumer of the data written by this class. - """ - - @abstractmethod - def gen_tables(self, assets): - """ - Return an iterator of pairs of (asset_id, bcolz.ctable). - """ - raise NotImplementedError() - - @abstractmethod - def to_uint32(self, array, colname): - """ - Convert raw column values produced by gen_tables into uint32 values. - - Parameters - ---------- - array : np.array - An array of raw values. - colname : str, {'open', 'high', 'low', 'close', 'volume', 'day'} - The name of the column being loaded. - - For output being read by the default BcolzOHLCVReader, data should be - stored in the following manner: - - - Pricing columns (Open, High, Low, Close) should be stored as 1000 * - as-traded dollar value. - - Volume should be the as-traded volume. - - Dates should be stored as seconds since midnight UTC, Jan 1, 1970. - """ - raise NotImplementedError() - - def write(self, filename, calendar, assets, show_progress=False): - """ - Parameters - ---------- - filename : str - The location at which we should write our output. - calendar : pandas.DatetimeIndex - Calendar to use to compute asset calendar offsets. - assets : pandas.Int64Index - The assets for which to write data. - show_progress : bool - Whether or not to show a progress bar while writing. - - Returns - ------- - table : bcolz.ctable - The newly-written table. - """ - _iterator = self.gen_tables(assets) - if show_progress: - pbar = progressbar( - _iterator, - length=len(assets), - item_show_func=lambda i: i if i is None else str(i[0]), - label="Merging asset files:", - ) - with pbar as pbar_iterator: - return self._write_internal(filename, calendar, pbar_iterator) - return self._write_internal(filename, calendar, _iterator) - - def _write_internal(self, filename, calendar, iterator): - """ - Internal implementation of write. - - `iterator` should be an iterator yielding pairs of (asset, ctable). - """ - total_rows = 0 - first_row = {} - last_row = {} - calendar_offset = {} - - # Maps column name -> output carray. - columns = { - k: carray(array([], dtype=uint32)) - for k in US_EQUITY_PRICING_BCOLZ_COLUMNS - } - - for asset_id, table in iterator: - nrows = len(table) - for column_name in columns: - if column_name == 'id': - # We know what the content of this column is, so don't - # bother reading it. - columns['id'].append(full((nrows,), asset_id)) - continue - columns[column_name].append( - self.to_uint32(table[column_name][:], column_name) - ) - - # Bcolz doesn't support ints as keys in `attrs`, so convert - # assets to strings for use as attr keys. - asset_key = str(asset_id) - - # Calculate the index into the array of the first and last row - # for this asset. This allows us to efficiently load single - # assets when querying the data back out of the table. - first_row[asset_key] = total_rows - last_row[asset_key] = total_rows + nrows - 1 - total_rows += nrows - - # Calculate the number of trading days between the first date - # in the stored data and the first date of **this** asset. This - # offset used for output alignment by the reader. - - # HACK: Index with a list so that we get back an array we can pass - # to self.to_uint32. We could try to extract this in the loop - # above, but that makes the logic a lot messier. - asset_first_day = self.to_uint32(table['day'][[0]], 'day')[0] - calendar_offset[asset_key] = calendar.get_loc( - Timestamp(asset_first_day, unit='s', tz='UTC'), - ) - - # This writes the table to disk. - full_table = ctable( - columns=[ - columns[colname] - for colname in US_EQUITY_PRICING_BCOLZ_COLUMNS - ], - names=US_EQUITY_PRICING_BCOLZ_COLUMNS, - rootdir=filename, - mode='w', - ) - full_table.attrs['first_row'] = first_row - full_table.attrs['last_row'] = last_row - full_table.attrs['calendar_offset'] = calendar_offset - full_table.attrs['calendar'] = calendar.asi8.tolist() - return full_table - - -class DailyBarWriterFromCSVs(BcolzDailyBarWriter): - """ - BcolzDailyBarWriter constructed from a map from csvs to assets. - - Parameters - ---------- - asset_map : dict - A map from asset_id -> path to csv with data for that asset. - - CSVs should have the following columns: - day : datetime64 - open : float64 - high : float64 - low : float64 - close : float64 - volume : int64 - """ - _csv_dtypes = { - 'open': float64, - 'high': float64, - 'low': float64, - 'close': float64, - 'volume': float64, - } - - def __init__(self, asset_map): - self._asset_map = asset_map - - def gen_tables(self, assets): - """ - Read CSVs as DataFrames from our asset map. - """ - dtypes = self._csv_dtypes - for asset in assets: - path = self._asset_map.get(asset) - if path is None: - raise KeyError("No path supplied for asset %s" % asset) - data = read_csv(path, parse_dates=['day'], dtype=dtypes) - yield asset, ctable.fromdataframe(data) - - def to_uint32(self, array, colname): - arrmax = array.max() - if colname in OHLC: - self.check_uint_safe(arrmax * 1000, colname) - return (array * 1000).astype(uint32) - elif colname == 'volume': - self.check_uint_safe(arrmax, colname) - return array.astype(uint32) - elif colname == 'day': - nanos_per_second = (1000 * 1000 * 1000) - self.check_uint_safe(arrmax.view(int) / nanos_per_second, colname) - return (array.view(int) / nanos_per_second).astype(uint32) - - @staticmethod - def check_uint_safe(value, colname): - if value >= UINT32_MAX: - raise ValueError( - "Value %s from column '%s' is too large" % (value, colname) - ) - - -class BcolzDailyBarReader(object): - """ - Reader for raw pricing data written by BcolzDailyOHLCVWriter. - - A Bcolz CTable is comprised of Columns and Attributes. - - Columns - ------- - The table with which this loader interacts contains the following columns: - - ['open', 'high', 'low', 'close', 'volume', 'day', 'id']. - - The data in these columns is interpreted as follows: - - - Price columns ('open', 'high', 'low', 'close') are interpreted as 1000 * - as-traded dollar value. - - Volume is interpreted as as-traded volume. - - Day is interpreted as seconds since midnight UTC, Jan 1, 1970. - - Id is the asset id of the row. - - The data in each column is grouped by asset and then sorted by day within - each asset block. - - The table is built to represent a long time range of data, e.g. ten years - of equity data, so the lengths of each asset block is not equal to each - other. The blocks are clipped to the known start and end date of each asset - to cut down on the number of empty values that would need to be included to - make a regular/cubic dataset. - - When read across the open, high, low, close, and volume with the same - index should represent the same asset and day. - - Attributes - ---------- - The table with which this loader interacts contains the following - attributes: - - first_row : dict - Map from asset_id -> index of first row in the dataset with that id. - last_row : dict - Map from asset_id -> index of last row in the dataset with that id. - calendar_offset : dict - Map from asset_id -> calendar index of first row. - calendar : list[int64] - Calendar used to compute offsets, in asi8 format (ns since EPOCH). - - We use first_row and last_row together to quickly find ranges of rows to - load when reading an asset's data into memory. - - We use calendar_offset and calendar to orient loaded blocks within a - range of queried dates. - """ - def __init__(self, table): - if isinstance(table, string_types): - table = ctable(rootdir=table, mode='r') - - self._table = table - self._calendar = DatetimeIndex(table.attrs['calendar'], tz='UTC') - self._first_rows = { - int(asset_id): start_index - for asset_id, start_index in iteritems(table.attrs['first_row']) - } - self._last_rows = { - int(asset_id): end_index - for asset_id, end_index in iteritems(table.attrs['last_row']) - } - self._calendar_offsets = { - int(id_): offset - for id_, offset in iteritems(table.attrs['calendar_offset']) - } - - def _compute_slices(self, start_idx, end_idx, assets): - """ - Compute the raw row indices to load for each asset on a query for the - given dates after applying a shift. - - Parameters - ---------- - start_idx : int - Index of first date for which we want data. - end_idx : int - Index of last date for which we want data. - assets : pandas.Int64Index - Assets for which we want to compute row indices - - Returns - ------- - A 3-tuple of (first_rows, last_rows, offsets): - first_rows : np.array[intp] - Array with length == len(assets) containing the index of the first - row to load for each asset in `assets`. - last_rows : np.array[intp] - Array with length == len(assets) containing the index of the last - row to load for each asset in `assets`. - offset : np.array[intp] - Array with length == (len(asset) containing the index in a buffer - of length `dates` corresponding to the first row of each asset. - - The value of offset[i] will be 0 if asset[i] existed at the start - of a query. Otherwise, offset[i] will be equal to the number of - entries in `dates` for which the asset did not yet exist. - """ - # The core implementation of the logic here is implemented in Cython - # for efficiency. - return _compute_row_slices( - self._first_rows, - self._last_rows, - self._calendar_offsets, - start_idx, - end_idx, - assets, - ) - - def load_raw_arrays(self, columns, start_date, end_date, assets): - # Assumes that the given dates are actually in calendar. - start_idx = self._calendar.get_loc(start_date) - end_idx = self._calendar.get_loc(end_date) - first_rows, last_rows, offsets = self._compute_slices( - start_idx, - end_idx, - assets, - ) - return _read_bcolz_data( - self._table, - (end_idx - start_idx + 1, len(assets)), - [column.name for column in columns], - first_rows, - last_rows, - offsets, - ) - - -class SQLiteAdjustmentWriter(object): - """ - Writer for data to be read by SQLiteAdjustmentWriter - - Parameters - ---------- - conn_or_path : str or sqlite3.Connection - A handle to the target sqlite database. - overwrite : bool, optional, default=False - If True and conn_or_path is a string, remove any existing files at the - given path before connecting. - - See Also - -------- - SQLiteAdjustmentReader - """ - - def __init__(self, conn_or_path, overwrite=False): - if isinstance(conn_or_path, sqlite3.Connection): - self.conn = conn_or_path - elif isinstance(conn_or_path, str): - if overwrite and exists(conn_or_path): - try: - remove(conn_or_path) - except OSError as e: - if e.errno != ENOENT: - raise - self.conn = sqlite3.connect(conn_or_path) - else: - raise TypeError("Unknown connection type %s" % type(conn_or_path)) - - def write_frame(self, tablename, frame): - if frozenset(frame.columns) != SQLITE_ADJUSTMENT_COLUMNS: - raise ValueError( - "Unexpected frame columns:\n" - "Expected Columns: %s\n" - "Received Columns: %s" % ( - SQLITE_ADJUSTMENT_COLUMNS, - frame.columns.tolist(), - ) - ) - elif tablename not in SQLITE_ADJUSTMENT_TABLENAMES: - raise ValueError( - "Adjustment table %s not in %s" % ( - tablename, SQLITE_ADJUSTMENT_TABLENAMES - ) - ) - - expected_dtypes = SQLITE_ADJUSTMENT_COLUMN_DTYPES - actual_dtypes = frame.dtypes - for colname, expected in iteritems(expected_dtypes): - actual = actual_dtypes[colname] - if not issubdtype(actual, expected): - raise TypeError( - "Expected data of type {expected} for column '{colname}', " - "but got {actual}.".format( - expected=expected, - colname=colname, - actual=actual, - ) - ) - return frame.to_sql(tablename, self.conn) - - def write(self, splits, mergers, dividends): - """ - Writes data to a SQLite file to be read by SQLiteAdjustmentReader. - - Parameters - ---------- - splits : pandas.DataFrame - Dataframe containing split data. - mergers : pandas.DataFrame - DataFrame containing merger data. - dividends : pandas.DataFrame - DataFrame containing dividend data. - - Notes - ----- - DataFrame input (`splits`, `mergers`, and `dividends`) should all have - the following columns: - - effective_date : int - The date, represented as seconds since Unix epoch, on which the - adjustment should be applied. - ratio : float - A value to apply to all data earlier than the effective date. - sid : int - The asset id associated with this adjustment. - - The ratio column is interpreted as follows: - - For all adjustment types, multiply price fields ('open', 'high', - 'low', and 'close') by the ratio. - - For **splits only**, **divide** volume by the adjustment ratio. - - Dividend ratios should be calculated as - 1.0 - (dividend_value / "close on day prior to dividend ex_date"). - - Returns - ------- - None - - See Also - -------- - SQLiteAdjustmentReader : Consumer for the data written by this class - """ - self.write_frame('splits', splits) - self.write_frame('mergers', mergers) - self.write_frame('dividends', dividends) - self.conn.execute( - "CREATE INDEX splits_sids " - "ON splits(sid)" - ) - self.conn.execute( - "CREATE INDEX splits_effective_date " - "ON splits(effective_date)" - ) - self.conn.execute( - "CREATE INDEX mergers_sids " - "ON mergers(sid)" - ) - self.conn.execute( - "CREATE INDEX mergers_effective_date " - "ON mergers(effective_date)" - ) - self.conn.execute( - "CREATE INDEX dividends_sid " - "ON dividends(sid)" - ) - self.conn.execute( - "CREATE INDEX dividends_effective_date " - "ON dividends(effective_date)" - ) - - def close(self): - self.conn.close() - - -class SQLiteAdjustmentReader(object): - """ - Loads adjustments based on corporate actions from a SQLite database. - - Expects data written in the format output by `SQLiteAdjustmentWriter`. - - Parameters - ---------- - conn : str or sqlite3.Connection - Connection from which to load data. - """ - - def __init__(self, conn): - if isinstance(conn, str): - conn = sqlite3.connect(conn) - self.conn = conn - - def load_adjustments(self, columns, dates, assets): - return load_adjustments_from_sqlite( - self.conn, - [column.name for column in columns], - dates, - assets, - ) - - class USEquityPricingLoader(PipelineLoader): """ PipelineLoader for US Equity Pricing data diff --git a/zipline/pipeline/loaders/synthetic.py b/zipline/pipeline/loaders/synthetic.py index 15bdeb37..2aff90de 100644 --- a/zipline/pipeline/loaders/synthetic.py +++ b/zipline/pipeline/loaders/synthetic.py @@ -17,7 +17,7 @@ from sqlite3 import connect as sqlite3_connect from .base import PipelineLoader from .frame import DataFrameLoader -from .equity_pricing_loader import ( +from zipline.data.us_equity_pricing import ( BcolzDailyBarWriter, SQLiteAdjustmentReader, SQLiteAdjustmentWriter,