mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-01 18:37:40 +08:00
MAINT: Move equity data formats out of loader.
Put the logic for reading and writing the equity price and adjustment data into a module located in data, making it distinct from the pipeline loader usage of the formats. This prepares for both incoming changes of how adjustments are written, (which includes using the bcolz daily reader as an input), as well as eventually providing the readers to a DataPortal object.
This commit is contained in:
@@ -64,12 +64,12 @@ ext_modules = LazyCythonizingList([
|
||||
('zipline.lib.adjustment', ['zipline/lib/adjustment.pyx']),
|
||||
('zipline.lib.rank', ['zipline/lib/rank.pyx']),
|
||||
(
|
||||
'zipline.pipeline.loaders._equities',
|
||||
['zipline/pipeline/loaders/_equities.pyx'],
|
||||
'zipline.data._equities',
|
||||
['zipline/data/_equities.pyx'],
|
||||
),
|
||||
(
|
||||
'zipline.pipeline.loaders._adjustments',
|
||||
['zipline/pipeline/loaders/_adjustments.pyx'],
|
||||
'zipline.data._adjustments',
|
||||
['zipline/data/_adjustments.pyx'],
|
||||
),
|
||||
])
|
||||
|
||||
|
||||
@@ -0,0 +1,268 @@
|
||||
#
|
||||
# Copyright 2015 Quantopian, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from unittest import TestCase
|
||||
|
||||
from nose_parameterized import parameterized
|
||||
from numpy import (
|
||||
arange,
|
||||
datetime64,
|
||||
)
|
||||
from numpy.testing import (
|
||||
assert_array_equal,
|
||||
)
|
||||
from pandas import (
|
||||
DataFrame,
|
||||
DatetimeIndex,
|
||||
Timestamp,
|
||||
)
|
||||
from pandas.util.testing import assert_index_equal
|
||||
from testfixtures import TempDirectory
|
||||
|
||||
from zipline.pipeline.loaders.synthetic import (
|
||||
SyntheticDailyBarWriter,
|
||||
)
|
||||
from zipline.data.us_equity_pricing import (
|
||||
BcolzDailyBarReader,
|
||||
)
|
||||
from zipline.finance.trading import TradingEnvironment
|
||||
from zipline.pipeline.data import USEquityPricing
|
||||
from zipline.utils.test_utils import (
|
||||
seconds_to_timestamp,
|
||||
)
|
||||
|
||||
TEST_CALENDAR_START = Timestamp('2015-06-01', tz='UTC')
|
||||
TEST_CALENDAR_STOP = Timestamp('2015-06-30', tz='UTC')
|
||||
|
||||
TEST_QUERY_START = Timestamp('2015-06-10', tz='UTC')
|
||||
TEST_QUERY_STOP = Timestamp('2015-06-19', tz='UTC')
|
||||
|
||||
# One asset for each of the cases enumerated in load_raw_arrays_from_bcolz.
|
||||
EQUITY_INFO = DataFrame(
|
||||
[
|
||||
# 1) The equity's trades start and end before query.
|
||||
{'start_date': '2015-06-01', 'end_date': '2015-06-05'},
|
||||
# 2) The equity's trades start and end after query.
|
||||
{'start_date': '2015-06-22', 'end_date': '2015-06-30'},
|
||||
# 3) The equity's data covers all dates in range.
|
||||
{'start_date': '2015-06-02', 'end_date': '2015-06-30'},
|
||||
# 4) The equity's trades start before the query start, but stop
|
||||
# before the query end.
|
||||
{'start_date': '2015-06-01', 'end_date': '2015-06-15'},
|
||||
# 5) The equity's trades start and end during the query.
|
||||
{'start_date': '2015-06-12', 'end_date': '2015-06-18'},
|
||||
# 6) The equity's trades start during the query, but extend through
|
||||
# the whole query.
|
||||
{'start_date': '2015-06-15', 'end_date': '2015-06-25'},
|
||||
],
|
||||
index=arange(1, 7),
|
||||
columns=['start_date', 'end_date'],
|
||||
).astype(datetime64)
|
||||
|
||||
TEST_QUERY_ASSETS = EQUITY_INFO.index
|
||||
|
||||
|
||||
class BcolzDailyBarTestCase(TestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
all_trading_days = TradingEnvironment().trading_days
|
||||
cls.trading_days = all_trading_days[
|
||||
all_trading_days.get_loc(TEST_CALENDAR_START):
|
||||
all_trading_days.get_loc(TEST_CALENDAR_STOP) + 1
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
|
||||
self.asset_info = EQUITY_INFO
|
||||
self.writer = SyntheticDailyBarWriter(
|
||||
self.asset_info,
|
||||
self.trading_days,
|
||||
)
|
||||
|
||||
self.dir_ = TempDirectory()
|
||||
self.dir_.create()
|
||||
self.dest = self.dir_.getpath('daily_equity_pricing.bcolz')
|
||||
|
||||
def tearDown(self):
|
||||
self.dir_.cleanup()
|
||||
|
||||
@property
|
||||
def assets(self):
|
||||
return self.asset_info.index
|
||||
|
||||
def trading_days_between(self, start, end):
|
||||
return self.trading_days[self.trading_days.slice_indexer(start, end)]
|
||||
|
||||
def asset_start(self, asset_id):
|
||||
return self.writer.asset_start(asset_id)
|
||||
|
||||
def asset_end(self, asset_id):
|
||||
return self.writer.asset_end(asset_id)
|
||||
|
||||
def dates_for_asset(self, asset_id):
|
||||
start, end = self.asset_start(asset_id), self.asset_end(asset_id)
|
||||
return self.trading_days_between(start, end)
|
||||
|
||||
def test_write_ohlcv_content(self):
|
||||
result = self.writer.write(self.dest, self.trading_days, self.assets)
|
||||
for column in SyntheticDailyBarWriter.OHLCV:
|
||||
idx = 0
|
||||
data = result[column][:]
|
||||
multiplier = 1 if column == 'volume' else 1000
|
||||
for asset_id in self.assets:
|
||||
for date in self.dates_for_asset(asset_id):
|
||||
self.assertEqual(
|
||||
SyntheticDailyBarWriter.expected_value(
|
||||
asset_id,
|
||||
date,
|
||||
column
|
||||
) * multiplier,
|
||||
data[idx],
|
||||
)
|
||||
idx += 1
|
||||
self.assertEqual(idx, len(data))
|
||||
|
||||
def test_write_day_and_id(self):
|
||||
result = self.writer.write(self.dest, self.trading_days, self.assets)
|
||||
idx = 0
|
||||
ids = result['id']
|
||||
days = result['day']
|
||||
for asset_id in self.assets:
|
||||
for date in self.dates_for_asset(asset_id):
|
||||
self.assertEqual(ids[idx], asset_id)
|
||||
self.assertEqual(date, seconds_to_timestamp(days[idx]))
|
||||
idx += 1
|
||||
|
||||
def test_write_attrs(self):
|
||||
result = self.writer.write(self.dest, self.trading_days, self.assets)
|
||||
expected_first_row = {
|
||||
'1': 0,
|
||||
'2': 5, # Asset 1 has 5 trading days.
|
||||
'3': 12, # Asset 2 has 7 trading days.
|
||||
'4': 33, # Asset 3 has 21 trading days.
|
||||
'5': 44, # Asset 4 has 11 trading days.
|
||||
'6': 49, # Asset 5 has 5 trading days.
|
||||
}
|
||||
expected_last_row = {
|
||||
'1': 4,
|
||||
'2': 11,
|
||||
'3': 32,
|
||||
'4': 43,
|
||||
'5': 48,
|
||||
'6': 57, # Asset 6 has 9 trading days.
|
||||
}
|
||||
expected_calendar_offset = {
|
||||
'1': 0, # Starts on 6-01, 1st trading day of month.
|
||||
'2': 15, # Starts on 6-22, 16th trading day of month.
|
||||
'3': 1, # Starts on 6-02, 2nd trading day of month.
|
||||
'4': 0, # Starts on 6-01, 1st trading day of month.
|
||||
'5': 9, # Starts on 6-12, 10th trading day of month.
|
||||
'6': 10, # Starts on 6-15, 11th trading day of month.
|
||||
}
|
||||
self.assertEqual(result.attrs['first_row'], expected_first_row)
|
||||
self.assertEqual(result.attrs['last_row'], expected_last_row)
|
||||
self.assertEqual(
|
||||
result.attrs['calendar_offset'],
|
||||
expected_calendar_offset,
|
||||
)
|
||||
assert_index_equal(
|
||||
self.trading_days,
|
||||
DatetimeIndex(result.attrs['calendar'], tz='UTC'),
|
||||
)
|
||||
|
||||
def _check_read_results(self, columns, assets, start_date, end_date):
|
||||
table = self.writer.write(self.dest, self.trading_days, self.assets)
|
||||
reader = BcolzDailyBarReader(table)
|
||||
results = reader.load_raw_arrays(columns, start_date, end_date, assets)
|
||||
dates = self.trading_days_between(start_date, end_date)
|
||||
for column, result in zip(columns, results):
|
||||
assert_array_equal(
|
||||
result,
|
||||
self.writer.expected_values_2d(
|
||||
dates,
|
||||
assets,
|
||||
column.name,
|
||||
)
|
||||
)
|
||||
|
||||
@parameterized.expand([
|
||||
([USEquityPricing.open],),
|
||||
([USEquityPricing.close, USEquityPricing.volume],),
|
||||
([USEquityPricing.volume, USEquityPricing.high, USEquityPricing.low],),
|
||||
(USEquityPricing.columns,),
|
||||
])
|
||||
def test_read(self, columns):
|
||||
self._check_read_results(
|
||||
columns,
|
||||
self.assets,
|
||||
TEST_QUERY_START,
|
||||
TEST_QUERY_STOP,
|
||||
)
|
||||
|
||||
def test_start_on_asset_start(self):
|
||||
"""
|
||||
Test loading with queries that starts on the first day of each asset's
|
||||
lifetime.
|
||||
"""
|
||||
columns = [USEquityPricing.high, USEquityPricing.volume]
|
||||
for asset in self.assets:
|
||||
self._check_read_results(
|
||||
columns,
|
||||
self.assets,
|
||||
start_date=self.asset_start(asset),
|
||||
end_date=self.trading_days[-1],
|
||||
)
|
||||
|
||||
def test_start_on_asset_end(self):
|
||||
"""
|
||||
Test loading with queries that start on the last day of each asset's
|
||||
lifetime.
|
||||
"""
|
||||
columns = [USEquityPricing.close, USEquityPricing.volume]
|
||||
for asset in self.assets:
|
||||
self._check_read_results(
|
||||
columns,
|
||||
self.assets,
|
||||
start_date=self.asset_end(asset),
|
||||
end_date=self.trading_days[-1],
|
||||
)
|
||||
|
||||
def test_end_on_asset_start(self):
|
||||
"""
|
||||
Test loading with queries that end on the first day of each asset's
|
||||
lifetime.
|
||||
"""
|
||||
columns = [USEquityPricing.close, USEquityPricing.volume]
|
||||
for asset in self.assets:
|
||||
self._check_read_results(
|
||||
columns,
|
||||
self.assets,
|
||||
start_date=self.trading_days[0],
|
||||
end_date=self.asset_start(asset),
|
||||
)
|
||||
|
||||
def test_end_on_asset_end(self):
|
||||
"""
|
||||
Test loading with queries that end on the last day of each asset's
|
||||
lifetime.
|
||||
"""
|
||||
columns = [USEquityPricing.close, USEquityPricing.volume]
|
||||
for asset in self.assets:
|
||||
self._check_read_results(
|
||||
columns,
|
||||
self.assets,
|
||||
start_date=self.trading_days[0],
|
||||
end_date=self.asset_end(asset),
|
||||
)
|
||||
@@ -30,12 +30,12 @@ from zipline.pipeline.loaders.synthetic import (
|
||||
NullAdjustmentReader,
|
||||
SyntheticDailyBarWriter,
|
||||
)
|
||||
from zipline.data.us_equity_pricing import BcolzDailyBarReader
|
||||
from zipline.finance.trading import TradingEnvironment
|
||||
from zipline.pipeline import Pipeline
|
||||
from zipline.pipeline.data import USEquityPricing
|
||||
from zipline.pipeline.loaders.frame import DataFrameLoader, MULTIPLY
|
||||
from zipline.pipeline.loaders.equity_pricing_loader import (
|
||||
BcolzDailyBarReader,
|
||||
USEquityPricingLoader,
|
||||
)
|
||||
from zipline.pipeline.engine import SimplePipelineEngine
|
||||
|
||||
@@ -40,16 +40,18 @@ from zipline.errors import (
|
||||
PipelineOutputDuringInitialize,
|
||||
NoSuchPipeline,
|
||||
)
|
||||
from zipline.data.us_equity_pricing import (
|
||||
BcolzDailyBarReader,
|
||||
DailyBarWriterFromCSVs,
|
||||
SQLiteAdjustmentWriter,
|
||||
SQLiteAdjustmentReader,
|
||||
)
|
||||
from zipline.finance import trading
|
||||
from zipline.pipeline import Pipeline
|
||||
from zipline.pipeline.factors import VWAP
|
||||
from zipline.pipeline.data import USEquityPricing
|
||||
from zipline.pipeline.loaders.frame import DataFrameLoader, MULTIPLY
|
||||
from zipline.pipeline.loaders.equity_pricing_loader import (
|
||||
BcolzDailyBarReader,
|
||||
DailyBarWriterFromCSVs,
|
||||
SQLiteAdjustmentReader,
|
||||
SQLiteAdjustmentWriter,
|
||||
USEquityPricingLoader,
|
||||
)
|
||||
from zipline.utils.test_utils import (
|
||||
|
||||
@@ -17,7 +17,6 @@ Tests for USEquityPricingLoader and related classes.
|
||||
"""
|
||||
from unittest import TestCase
|
||||
|
||||
from nose_parameterized import parameterized
|
||||
from numpy import (
|
||||
arange,
|
||||
datetime64,
|
||||
@@ -32,11 +31,9 @@ from numpy.testing import (
|
||||
from pandas import (
|
||||
concat,
|
||||
DataFrame,
|
||||
DatetimeIndex,
|
||||
Int64Index,
|
||||
Timestamp,
|
||||
)
|
||||
from pandas.util.testing import assert_index_equal
|
||||
from testfixtures import TempDirectory
|
||||
|
||||
from zipline.lib.adjustment import Float64Multiply
|
||||
@@ -44,12 +41,15 @@ from zipline.pipeline.loaders.synthetic import (
|
||||
NullAdjustmentReader,
|
||||
SyntheticDailyBarWriter,
|
||||
)
|
||||
from zipline.pipeline.loaders.equity_pricing_loader import (
|
||||
from zipline.data.us_equity_pricing import (
|
||||
BcolzDailyBarReader,
|
||||
SQLiteAdjustmentReader,
|
||||
SQLiteAdjustmentWriter,
|
||||
)
|
||||
from zipline.pipeline.loaders.equity_pricing_loader import (
|
||||
USEquityPricingLoader,
|
||||
)
|
||||
|
||||
from zipline.errors import WindowLengthTooLong
|
||||
from zipline.finance.trading import TradingEnvironment
|
||||
from zipline.pipeline.data import USEquityPricing
|
||||
@@ -97,201 +97,6 @@ EQUITY_INFO = DataFrame(
|
||||
TEST_QUERY_ASSETS = EQUITY_INFO.index
|
||||
|
||||
|
||||
class BcolzDailyBarTestCase(TestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
all_trading_days = TradingEnvironment().trading_days
|
||||
cls.trading_days = all_trading_days[
|
||||
all_trading_days.get_loc(TEST_CALENDAR_START):
|
||||
all_trading_days.get_loc(TEST_CALENDAR_STOP) + 1
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
|
||||
self.asset_info = EQUITY_INFO
|
||||
self.writer = SyntheticDailyBarWriter(
|
||||
self.asset_info,
|
||||
self.trading_days,
|
||||
)
|
||||
|
||||
self.dir_ = TempDirectory()
|
||||
self.dir_.create()
|
||||
self.dest = self.dir_.getpath('daily_equity_pricing.bcolz')
|
||||
|
||||
def tearDown(self):
|
||||
self.dir_.cleanup()
|
||||
|
||||
@property
|
||||
def assets(self):
|
||||
return self.asset_info.index
|
||||
|
||||
def trading_days_between(self, start, end):
|
||||
return self.trading_days[self.trading_days.slice_indexer(start, end)]
|
||||
|
||||
def asset_start(self, asset_id):
|
||||
return self.writer.asset_start(asset_id)
|
||||
|
||||
def asset_end(self, asset_id):
|
||||
return self.writer.asset_end(asset_id)
|
||||
|
||||
def dates_for_asset(self, asset_id):
|
||||
start, end = self.asset_start(asset_id), self.asset_end(asset_id)
|
||||
return self.trading_days_between(start, end)
|
||||
|
||||
def test_write_ohlcv_content(self):
|
||||
result = self.writer.write(self.dest, self.trading_days, self.assets)
|
||||
for column in SyntheticDailyBarWriter.OHLCV:
|
||||
idx = 0
|
||||
data = result[column][:]
|
||||
multiplier = 1 if column == 'volume' else 1000
|
||||
for asset_id in self.assets:
|
||||
for date in self.dates_for_asset(asset_id):
|
||||
self.assertEqual(
|
||||
SyntheticDailyBarWriter.expected_value(
|
||||
asset_id,
|
||||
date,
|
||||
column
|
||||
) * multiplier,
|
||||
data[idx],
|
||||
)
|
||||
idx += 1
|
||||
self.assertEqual(idx, len(data))
|
||||
|
||||
def test_write_day_and_id(self):
|
||||
result = self.writer.write(self.dest, self.trading_days, self.assets)
|
||||
idx = 0
|
||||
ids = result['id']
|
||||
days = result['day']
|
||||
for asset_id in self.assets:
|
||||
for date in self.dates_for_asset(asset_id):
|
||||
self.assertEqual(ids[idx], asset_id)
|
||||
self.assertEqual(date, seconds_to_timestamp(days[idx]))
|
||||
idx += 1
|
||||
|
||||
def test_write_attrs(self):
|
||||
result = self.writer.write(self.dest, self.trading_days, self.assets)
|
||||
expected_first_row = {
|
||||
'1': 0,
|
||||
'2': 5, # Asset 1 has 5 trading days.
|
||||
'3': 12, # Asset 2 has 7 trading days.
|
||||
'4': 33, # Asset 3 has 21 trading days.
|
||||
'5': 44, # Asset 4 has 11 trading days.
|
||||
'6': 49, # Asset 5 has 5 trading days.
|
||||
}
|
||||
expected_last_row = {
|
||||
'1': 4,
|
||||
'2': 11,
|
||||
'3': 32,
|
||||
'4': 43,
|
||||
'5': 48,
|
||||
'6': 57, # Asset 6 has 9 trading days.
|
||||
}
|
||||
expected_calendar_offset = {
|
||||
'1': 0, # Starts on 6-01, 1st trading day of month.
|
||||
'2': 15, # Starts on 6-22, 16th trading day of month.
|
||||
'3': 1, # Starts on 6-02, 2nd trading day of month.
|
||||
'4': 0, # Starts on 6-01, 1st trading day of month.
|
||||
'5': 9, # Starts on 6-12, 10th trading day of month.
|
||||
'6': 10, # Starts on 6-15, 11th trading day of month.
|
||||
}
|
||||
self.assertEqual(result.attrs['first_row'], expected_first_row)
|
||||
self.assertEqual(result.attrs['last_row'], expected_last_row)
|
||||
self.assertEqual(
|
||||
result.attrs['calendar_offset'],
|
||||
expected_calendar_offset,
|
||||
)
|
||||
assert_index_equal(
|
||||
self.trading_days,
|
||||
DatetimeIndex(result.attrs['calendar'], tz='UTC'),
|
||||
)
|
||||
|
||||
def _check_read_results(self, columns, assets, start_date, end_date):
|
||||
table = self.writer.write(self.dest, self.trading_days, self.assets)
|
||||
reader = BcolzDailyBarReader(table)
|
||||
results = reader.load_raw_arrays(columns, start_date, end_date, assets)
|
||||
dates = self.trading_days_between(start_date, end_date)
|
||||
for column, result in zip(columns, results):
|
||||
assert_array_equal(
|
||||
result,
|
||||
self.writer.expected_values_2d(
|
||||
dates,
|
||||
assets,
|
||||
column.name,
|
||||
)
|
||||
)
|
||||
|
||||
@parameterized.expand([
|
||||
([USEquityPricing.open],),
|
||||
([USEquityPricing.close, USEquityPricing.volume],),
|
||||
([USEquityPricing.volume, USEquityPricing.high, USEquityPricing.low],),
|
||||
(USEquityPricing.columns,),
|
||||
])
|
||||
def test_read(self, columns):
|
||||
self._check_read_results(
|
||||
columns,
|
||||
self.assets,
|
||||
TEST_QUERY_START,
|
||||
TEST_QUERY_STOP,
|
||||
)
|
||||
|
||||
def test_start_on_asset_start(self):
|
||||
"""
|
||||
Test loading with queries that starts on the first day of each asset's
|
||||
lifetime.
|
||||
"""
|
||||
columns = [USEquityPricing.high, USEquityPricing.volume]
|
||||
for asset in self.assets:
|
||||
self._check_read_results(
|
||||
columns,
|
||||
self.assets,
|
||||
start_date=self.asset_start(asset),
|
||||
end_date=self.trading_days[-1],
|
||||
)
|
||||
|
||||
def test_start_on_asset_end(self):
|
||||
"""
|
||||
Test loading with queries that start on the last day of each asset's
|
||||
lifetime.
|
||||
"""
|
||||
columns = [USEquityPricing.close, USEquityPricing.volume]
|
||||
for asset in self.assets:
|
||||
self._check_read_results(
|
||||
columns,
|
||||
self.assets,
|
||||
start_date=self.asset_end(asset),
|
||||
end_date=self.trading_days[-1],
|
||||
)
|
||||
|
||||
def test_end_on_asset_start(self):
|
||||
"""
|
||||
Test loading with queries that end on the first day of each asset's
|
||||
lifetime.
|
||||
"""
|
||||
columns = [USEquityPricing.close, USEquityPricing.volume]
|
||||
for asset in self.assets:
|
||||
self._check_read_results(
|
||||
columns,
|
||||
self.assets,
|
||||
start_date=self.trading_days[0],
|
||||
end_date=self.asset_start(asset),
|
||||
)
|
||||
|
||||
def test_end_on_asset_end(self):
|
||||
"""
|
||||
Test loading with queries that end on the last day of each asset's
|
||||
lifetime.
|
||||
"""
|
||||
columns = [USEquityPricing.close, USEquityPricing.volume]
|
||||
for asset in self.assets:
|
||||
self._check_read_results(
|
||||
columns,
|
||||
self.assets,
|
||||
start_date=self.trading_days[0],
|
||||
end_date=self.asset_end(asset),
|
||||
)
|
||||
|
||||
|
||||
# ADJUSTMENTS use the following scheme to indicate information about the value
|
||||
# upon inspection.
|
||||
#
|
||||
|
||||
@@ -0,0 +1,560 @@
|
||||
# Copyright 2015 Quantopian, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from abc import (
|
||||
ABCMeta,
|
||||
abstractmethod,
|
||||
)
|
||||
from contextlib import contextmanager
|
||||
from errno import ENOENT
|
||||
from os import remove
|
||||
from os.path import exists
|
||||
import sqlite3
|
||||
|
||||
from bcolz import (
|
||||
carray,
|
||||
ctable,
|
||||
)
|
||||
from click import progressbar
|
||||
from numpy import (
|
||||
array,
|
||||
float64,
|
||||
floating,
|
||||
full,
|
||||
iinfo,
|
||||
integer,
|
||||
issubdtype,
|
||||
uint32,
|
||||
)
|
||||
from pandas import (
|
||||
DatetimeIndex,
|
||||
read_csv,
|
||||
Timestamp,
|
||||
)
|
||||
from six import (
|
||||
iteritems,
|
||||
string_types,
|
||||
with_metaclass,
|
||||
)
|
||||
|
||||
from ._equities import _compute_row_slices, _read_bcolz_data
|
||||
from ._adjustments import load_adjustments_from_sqlite
|
||||
|
||||
OHLC = frozenset(['open', 'high', 'low', 'close'])
|
||||
US_EQUITY_PRICING_BCOLZ_COLUMNS = [
|
||||
'open', 'high', 'low', 'close', 'volume', 'day', 'id'
|
||||
]
|
||||
DAILY_US_EQUITY_PRICING_DEFAULT_FILENAME = 'daily_us_equity_pricing.bcolz'
|
||||
SQLITE_ADJUSTMENT_COLUMNS = frozenset(['effective_date', 'ratio', 'sid'])
|
||||
SQLITE_ADJUSTMENT_COLUMN_DTYPES = {
|
||||
'effective_date': integer,
|
||||
'ratio': floating,
|
||||
'sid': integer,
|
||||
}
|
||||
SQLITE_ADJUSTMENT_TABLENAMES = frozenset(['splits', 'dividends', 'mergers'])
|
||||
|
||||
UINT32_MAX = iinfo(uint32).max
|
||||
|
||||
|
||||
class BcolzDailyBarWriter(with_metaclass(ABCMeta)):
|
||||
"""
|
||||
Class capable of writing daily OHLCV data to disk in a format that can be
|
||||
read efficiently by BcolzDailyOHLCVReader.
|
||||
|
||||
See Also
|
||||
--------
|
||||
BcolzDailyBarReader : Consumer of the data written by this class.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def gen_tables(self, assets):
|
||||
"""
|
||||
Return an iterator of pairs of (asset_id, bcolz.ctable).
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abstractmethod
|
||||
def to_uint32(self, array, colname):
|
||||
"""
|
||||
Convert raw column values produced by gen_tables into uint32 values.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
array : np.array
|
||||
An array of raw values.
|
||||
colname : str, {'open', 'high', 'low', 'close', 'volume', 'day'}
|
||||
The name of the column being loaded.
|
||||
|
||||
For output being read by the default BcolzOHLCVReader, data should be
|
||||
stored in the following manner:
|
||||
|
||||
- Pricing columns (Open, High, Low, Close) should be stored as 1000 *
|
||||
as-traded dollar value.
|
||||
- Volume should be the as-traded volume.
|
||||
- Dates should be stored as seconds since midnight UTC, Jan 1, 1970.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def write(self, filename, calendar, assets, show_progress=False):
|
||||
"""
|
||||
Parameters
|
||||
----------
|
||||
filename : str
|
||||
The location at which we should write our output.
|
||||
calendar : pandas.DatetimeIndex
|
||||
Calendar to use to compute asset calendar offsets.
|
||||
assets : pandas.Int64Index
|
||||
The assets for which to write data.
|
||||
show_progress : bool
|
||||
Whether or not to show a progress bar while writing.
|
||||
|
||||
Returns
|
||||
-------
|
||||
table : bcolz.ctable
|
||||
The newly-written table.
|
||||
"""
|
||||
_iterator = self.gen_tables(assets)
|
||||
if show_progress:
|
||||
pbar = progressbar(
|
||||
_iterator,
|
||||
length=len(assets),
|
||||
item_show_func=lambda i: i if i is None else str(i[0]),
|
||||
label="Merging asset files:",
|
||||
)
|
||||
with pbar as pbar_iterator:
|
||||
return self._write_internal(filename, calendar, pbar_iterator)
|
||||
return self._write_internal(filename, calendar, _iterator)
|
||||
|
||||
def _write_internal(self, filename, calendar, iterator):
|
||||
"""
|
||||
Internal implementation of write.
|
||||
|
||||
`iterator` should be an iterator yielding pairs of (asset, ctable).
|
||||
"""
|
||||
total_rows = 0
|
||||
first_row = {}
|
||||
last_row = {}
|
||||
calendar_offset = {}
|
||||
|
||||
# Maps column name -> output carray.
|
||||
columns = {
|
||||
k: carray(array([], dtype=uint32))
|
||||
for k in US_EQUITY_PRICING_BCOLZ_COLUMNS
|
||||
}
|
||||
|
||||
for asset_id, table in iterator:
|
||||
nrows = len(table)
|
||||
for column_name in columns:
|
||||
if column_name == 'id':
|
||||
# We know what the content of this column is, so don't
|
||||
# bother reading it.
|
||||
columns['id'].append(full((nrows,), asset_id))
|
||||
continue
|
||||
columns[column_name].append(
|
||||
self.to_uint32(table[column_name][:], column_name)
|
||||
)
|
||||
|
||||
# Bcolz doesn't support ints as keys in `attrs`, so convert
|
||||
# assets to strings for use as attr keys.
|
||||
asset_key = str(asset_id)
|
||||
|
||||
# Calculate the index into the array of the first and last row
|
||||
# for this asset. This allows us to efficiently load single
|
||||
# assets when querying the data back out of the table.
|
||||
first_row[asset_key] = total_rows
|
||||
last_row[asset_key] = total_rows + nrows - 1
|
||||
total_rows += nrows
|
||||
|
||||
# Calculate the number of trading days between the first date
|
||||
# in the stored data and the first date of **this** asset. This
|
||||
# offset used for output alignment by the reader.
|
||||
|
||||
# HACK: Index with a list so that we get back an array we can pass
|
||||
# to self.to_uint32. We could try to extract this in the loop
|
||||
# above, but that makes the logic a lot messier.
|
||||
asset_first_day = self.to_uint32(table['day'][[0]], 'day')[0]
|
||||
calendar_offset[asset_key] = calendar.get_loc(
|
||||
Timestamp(asset_first_day, unit='s', tz='UTC'),
|
||||
)
|
||||
|
||||
# This writes the table to disk.
|
||||
full_table = ctable(
|
||||
columns=[
|
||||
columns[colname]
|
||||
for colname in US_EQUITY_PRICING_BCOLZ_COLUMNS
|
||||
],
|
||||
names=US_EQUITY_PRICING_BCOLZ_COLUMNS,
|
||||
rootdir=filename,
|
||||
mode='w',
|
||||
)
|
||||
full_table.attrs['first_row'] = first_row
|
||||
full_table.attrs['last_row'] = last_row
|
||||
full_table.attrs['calendar_offset'] = calendar_offset
|
||||
full_table.attrs['calendar'] = calendar.asi8.tolist()
|
||||
return full_table
|
||||
|
||||
|
||||
class DailyBarWriterFromCSVs(BcolzDailyBarWriter):
|
||||
"""
|
||||
BcolzDailyBarWriter constructed from a map from csvs to assets.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
asset_map : dict
|
||||
A map from asset_id -> path to csv with data for that asset.
|
||||
|
||||
CSVs should have the following columns:
|
||||
day : datetime64
|
||||
open : float64
|
||||
high : float64
|
||||
low : float64
|
||||
close : float64
|
||||
volume : int64
|
||||
"""
|
||||
_csv_dtypes = {
|
||||
'open': float64,
|
||||
'high': float64,
|
||||
'low': float64,
|
||||
'close': float64,
|
||||
'volume': float64,
|
||||
}
|
||||
|
||||
def __init__(self, asset_map):
|
||||
self._asset_map = asset_map
|
||||
|
||||
def gen_tables(self, assets):
|
||||
"""
|
||||
Read CSVs as DataFrames from our asset map.
|
||||
"""
|
||||
dtypes = self._csv_dtypes
|
||||
for asset in assets:
|
||||
path = self._asset_map.get(asset)
|
||||
if path is None:
|
||||
raise KeyError("No path supplied for asset %s" % asset)
|
||||
data = read_csv(path, parse_dates=['day'], dtype=dtypes)
|
||||
yield asset, ctable.fromdataframe(data)
|
||||
|
||||
def to_uint32(self, array, colname):
|
||||
arrmax = array.max()
|
||||
if colname in OHLC:
|
||||
self.check_uint_safe(arrmax * 1000, colname)
|
||||
return (array * 1000).astype(uint32)
|
||||
elif colname == 'volume':
|
||||
self.check_uint_safe(arrmax, colname)
|
||||
return array.astype(uint32)
|
||||
elif colname == 'day':
|
||||
nanos_per_second = (1000 * 1000 * 1000)
|
||||
self.check_uint_safe(arrmax.view(int) / nanos_per_second, colname)
|
||||
return (array.view(int) / nanos_per_second).astype(uint32)
|
||||
|
||||
@staticmethod
|
||||
def check_uint_safe(value, colname):
|
||||
if value >= UINT32_MAX:
|
||||
raise ValueError(
|
||||
"Value %s from column '%s' is too large" % (value, colname)
|
||||
)
|
||||
|
||||
|
||||
class BcolzDailyBarReader(object):
|
||||
"""
|
||||
Reader for raw pricing data written by BcolzDailyOHLCVWriter.
|
||||
|
||||
A Bcolz CTable is comprised of Columns and Attributes.
|
||||
|
||||
Columns
|
||||
-------
|
||||
The table with which this loader interacts contains the following columns:
|
||||
|
||||
['open', 'high', 'low', 'close', 'volume', 'day', 'id'].
|
||||
|
||||
The data in these columns is interpreted as follows:
|
||||
|
||||
- Price columns ('open', 'high', 'low', 'close') are interpreted as 1000 *
|
||||
as-traded dollar value.
|
||||
- Volume is interpreted as as-traded volume.
|
||||
- Day is interpreted as seconds since midnight UTC, Jan 1, 1970.
|
||||
- Id is the asset id of the row.
|
||||
|
||||
The data in each column is grouped by asset and then sorted by day within
|
||||
each asset block.
|
||||
|
||||
The table is built to represent a long time range of data, e.g. ten years
|
||||
of equity data, so the lengths of each asset block is not equal to each
|
||||
other. The blocks are clipped to the known start and end date of each asset
|
||||
to cut down on the number of empty values that would need to be included to
|
||||
make a regular/cubic dataset.
|
||||
|
||||
When read across the open, high, low, close, and volume with the same
|
||||
index should represent the same asset and day.
|
||||
|
||||
Attributes
|
||||
----------
|
||||
The table with which this loader interacts contains the following
|
||||
attributes:
|
||||
|
||||
first_row : dict
|
||||
Map from asset_id -> index of first row in the dataset with that id.
|
||||
last_row : dict
|
||||
Map from asset_id -> index of last row in the dataset with that id.
|
||||
calendar_offset : dict
|
||||
Map from asset_id -> calendar index of first row.
|
||||
calendar : list[int64]
|
||||
Calendar used to compute offsets, in asi8 format (ns since EPOCH).
|
||||
|
||||
We use first_row and last_row together to quickly find ranges of rows to
|
||||
load when reading an asset's data into memory.
|
||||
|
||||
We use calendar_offset and calendar to orient loaded blocks within a
|
||||
range of queried dates.
|
||||
"""
|
||||
def __init__(self, table):
|
||||
if isinstance(table, string_types):
|
||||
table = ctable(rootdir=table, mode='r')
|
||||
|
||||
self._table = table
|
||||
self._calendar = DatetimeIndex(table.attrs['calendar'], tz='UTC')
|
||||
self._first_rows = {
|
||||
int(asset_id): start_index
|
||||
for asset_id, start_index in iteritems(table.attrs['first_row'])
|
||||
}
|
||||
self._last_rows = {
|
||||
int(asset_id): end_index
|
||||
for asset_id, end_index in iteritems(table.attrs['last_row'])
|
||||
}
|
||||
self._calendar_offsets = {
|
||||
int(id_): offset
|
||||
for id_, offset in iteritems(table.attrs['calendar_offset'])
|
||||
}
|
||||
|
||||
def _compute_slices(self, start_idx, end_idx, assets):
|
||||
"""
|
||||
Compute the raw row indices to load for each asset on a query for the
|
||||
given dates after applying a shift.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
start_idx : int
|
||||
Index of first date for which we want data.
|
||||
end_idx : int
|
||||
Index of last date for which we want data.
|
||||
assets : pandas.Int64Index
|
||||
Assets for which we want to compute row indices
|
||||
|
||||
Returns
|
||||
-------
|
||||
A 3-tuple of (first_rows, last_rows, offsets):
|
||||
first_rows : np.array[intp]
|
||||
Array with length == len(assets) containing the index of the first
|
||||
row to load for each asset in `assets`.
|
||||
last_rows : np.array[intp]
|
||||
Array with length == len(assets) containing the index of the last
|
||||
row to load for each asset in `assets`.
|
||||
offset : np.array[intp]
|
||||
Array with length == (len(asset) containing the index in a buffer
|
||||
of length `dates` corresponding to the first row of each asset.
|
||||
|
||||
The value of offset[i] will be 0 if asset[i] existed at the start
|
||||
of a query. Otherwise, offset[i] will be equal to the number of
|
||||
entries in `dates` for which the asset did not yet exist.
|
||||
"""
|
||||
# The core implementation of the logic here is implemented in Cython
|
||||
# for efficiency.
|
||||
return _compute_row_slices(
|
||||
self._first_rows,
|
||||
self._last_rows,
|
||||
self._calendar_offsets,
|
||||
start_idx,
|
||||
end_idx,
|
||||
assets,
|
||||
)
|
||||
|
||||
def load_raw_arrays(self, columns, start_date, end_date, assets):
|
||||
# Assumes that the given dates are actually in calendar.
|
||||
start_idx = self._calendar.get_loc(start_date)
|
||||
end_idx = self._calendar.get_loc(end_date)
|
||||
first_rows, last_rows, offsets = self._compute_slices(
|
||||
start_idx,
|
||||
end_idx,
|
||||
assets,
|
||||
)
|
||||
return _read_bcolz_data(
|
||||
self._table,
|
||||
(end_idx - start_idx + 1, len(assets)),
|
||||
[column.name for column in columns],
|
||||
first_rows,
|
||||
last_rows,
|
||||
offsets,
|
||||
)
|
||||
|
||||
|
||||
class SQLiteAdjustmentWriter(object):
|
||||
"""
|
||||
Writer for data to be read by SQLiteAdjustmentWriter
|
||||
|
||||
Parameters
|
||||
----------
|
||||
conn_or_path : str or sqlite3.Connection
|
||||
A handle to the target sqlite database.
|
||||
overwrite : bool, optional, default=False
|
||||
If True and conn_or_path is a string, remove any existing files at the
|
||||
given path before connecting.
|
||||
|
||||
See Also
|
||||
--------
|
||||
SQLiteAdjustmentReader
|
||||
"""
|
||||
|
||||
def __init__(self, conn_or_path, overwrite=False):
|
||||
if isinstance(conn_or_path, sqlite3.Connection):
|
||||
self.conn = conn_or_path
|
||||
elif isinstance(conn_or_path, str):
|
||||
if overwrite and exists(conn_or_path):
|
||||
try:
|
||||
remove(conn_or_path)
|
||||
except OSError as e:
|
||||
if e.errno != ENOENT:
|
||||
raise
|
||||
self.conn = sqlite3.connect(conn_or_path)
|
||||
else:
|
||||
raise TypeError("Unknown connection type %s" % type(conn_or_path))
|
||||
|
||||
def write_frame(self, tablename, frame):
|
||||
if frozenset(frame.columns) != SQLITE_ADJUSTMENT_COLUMNS:
|
||||
raise ValueError(
|
||||
"Unexpected frame columns:\n"
|
||||
"Expected Columns: %s\n"
|
||||
"Received Columns: %s" % (
|
||||
SQLITE_ADJUSTMENT_COLUMNS,
|
||||
frame.columns.tolist(),
|
||||
)
|
||||
)
|
||||
elif tablename not in SQLITE_ADJUSTMENT_TABLENAMES:
|
||||
raise ValueError(
|
||||
"Adjustment table %s not in %s" % (
|
||||
tablename, SQLITE_ADJUSTMENT_TABLENAMES
|
||||
)
|
||||
)
|
||||
|
||||
expected_dtypes = SQLITE_ADJUSTMENT_COLUMN_DTYPES
|
||||
actual_dtypes = frame.dtypes
|
||||
for colname, expected in iteritems(expected_dtypes):
|
||||
actual = actual_dtypes[colname]
|
||||
if not issubdtype(actual, expected):
|
||||
raise TypeError(
|
||||
"Expected data of type {expected} for column '{colname}', "
|
||||
"but got {actual}.".format(
|
||||
expected=expected,
|
||||
colname=colname,
|
||||
actual=actual,
|
||||
)
|
||||
)
|
||||
return frame.to_sql(tablename, self.conn)
|
||||
|
||||
def write(self, splits, mergers, dividends):
|
||||
"""
|
||||
Writes data to a SQLite file to be read by SQLiteAdjustmentReader.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
splits : pandas.DataFrame
|
||||
Dataframe containing split data.
|
||||
mergers : pandas.DataFrame
|
||||
DataFrame containing merger data.
|
||||
dividends : pandas.DataFrame
|
||||
DataFrame containing dividend data.
|
||||
|
||||
Notes
|
||||
-----
|
||||
DataFrame input (`splits`, `mergers`, and `dividends`) should all have
|
||||
the following columns:
|
||||
|
||||
effective_date : int
|
||||
The date, represented as seconds since Unix epoch, on which the
|
||||
adjustment should be applied.
|
||||
ratio : float
|
||||
A value to apply to all data earlier than the effective date.
|
||||
sid : int
|
||||
The asset id associated with this adjustment.
|
||||
|
||||
The ratio column is interpreted as follows:
|
||||
- For all adjustment types, multiply price fields ('open', 'high',
|
||||
'low', and 'close') by the ratio.
|
||||
- For **splits only**, **divide** volume by the adjustment ratio.
|
||||
|
||||
Dividend ratios should be calculated as
|
||||
1.0 - (dividend_value / "close on day prior to dividend ex_date").
|
||||
|
||||
Returns
|
||||
-------
|
||||
None
|
||||
|
||||
See Also
|
||||
--------
|
||||
SQLiteAdjustmentReader : Consumer for the data written by this class
|
||||
"""
|
||||
self.write_frame('splits', splits)
|
||||
self.write_frame('mergers', mergers)
|
||||
self.write_frame('dividends', dividends)
|
||||
self.conn.execute(
|
||||
"CREATE INDEX splits_sids "
|
||||
"ON splits(sid)"
|
||||
)
|
||||
self.conn.execute(
|
||||
"CREATE INDEX splits_effective_date "
|
||||
"ON splits(effective_date)"
|
||||
)
|
||||
self.conn.execute(
|
||||
"CREATE INDEX mergers_sids "
|
||||
"ON mergers(sid)"
|
||||
)
|
||||
self.conn.execute(
|
||||
"CREATE INDEX mergers_effective_date "
|
||||
"ON mergers(effective_date)"
|
||||
)
|
||||
self.conn.execute(
|
||||
"CREATE INDEX dividends_sid "
|
||||
"ON dividends(sid)"
|
||||
)
|
||||
self.conn.execute(
|
||||
"CREATE INDEX dividends_effective_date "
|
||||
"ON dividends(effective_date)"
|
||||
)
|
||||
|
||||
def close(self):
|
||||
self.conn.close()
|
||||
|
||||
|
||||
class SQLiteAdjustmentReader(object):
|
||||
"""
|
||||
Loads adjustments based on corporate actions from a SQLite database.
|
||||
|
||||
Expects data written in the format output by `SQLiteAdjustmentWriter`.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
conn : str or sqlite3.Connection
|
||||
Connection from which to load data.
|
||||
"""
|
||||
|
||||
def __init__(self, conn):
|
||||
if isinstance(conn, str):
|
||||
conn = sqlite3.connect(conn)
|
||||
self.conn = conn
|
||||
|
||||
def load_adjustments(self, columns, dates, assets):
|
||||
return load_adjustments_from_sqlite(
|
||||
self.conn,
|
||||
[column.name for column in columns],
|
||||
dates,
|
||||
assets,
|
||||
)
|
||||
@@ -11,41 +11,10 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from abc import (
|
||||
ABCMeta,
|
||||
abstractmethod,
|
||||
)
|
||||
from contextlib import contextmanager
|
||||
from errno import ENOENT
|
||||
from os import remove
|
||||
from os.path import exists
|
||||
import sqlite3
|
||||
|
||||
from bcolz import (
|
||||
carray,
|
||||
ctable,
|
||||
)
|
||||
from click import progressbar
|
||||
from numpy import (
|
||||
array,
|
||||
float64,
|
||||
floating,
|
||||
full,
|
||||
iinfo,
|
||||
integer,
|
||||
issubdtype,
|
||||
uint32,
|
||||
)
|
||||
from pandas import (
|
||||
DatetimeIndex,
|
||||
read_csv,
|
||||
Timestamp,
|
||||
)
|
||||
from six import (
|
||||
iteritems,
|
||||
string_types,
|
||||
with_metaclass,
|
||||
)
|
||||
|
||||
from zipline.lib.adjusted_array import (
|
||||
adjusted_array,
|
||||
@@ -53,524 +22,10 @@ from zipline.lib.adjusted_array import (
|
||||
from zipline.errors import NoFurtherDataError
|
||||
|
||||
from .base import PipelineLoader
|
||||
from ._equities import _compute_row_slices, _read_bcolz_data
|
||||
from ._adjustments import load_adjustments_from_sqlite
|
||||
|
||||
OHLC = frozenset(['open', 'high', 'low', 'close'])
|
||||
US_EQUITY_PRICING_BCOLZ_COLUMNS = [
|
||||
'open', 'high', 'low', 'close', 'volume', 'day', 'id'
|
||||
]
|
||||
DAILY_US_EQUITY_PRICING_DEFAULT_FILENAME = 'daily_us_equity_pricing.bcolz'
|
||||
SQLITE_ADJUSTMENT_COLUMNS = frozenset(['effective_date', 'ratio', 'sid'])
|
||||
SQLITE_ADJUSTMENT_COLUMN_DTYPES = {
|
||||
'effective_date': integer,
|
||||
'ratio': floating,
|
||||
'sid': integer,
|
||||
}
|
||||
SQLITE_ADJUSTMENT_TABLENAMES = frozenset(['splits', 'dividends', 'mergers'])
|
||||
|
||||
UINT32_MAX = iinfo(uint32).max
|
||||
|
||||
|
||||
@contextmanager
|
||||
def passthrough(obj):
|
||||
yield obj
|
||||
|
||||
|
||||
class BcolzDailyBarWriter(with_metaclass(ABCMeta)):
|
||||
"""
|
||||
Class capable of writing daily OHLCV data to disk in a format that can be
|
||||
read efficiently by BcolzDailyOHLCVReader.
|
||||
|
||||
See Also
|
||||
--------
|
||||
BcolzDailyBarReader : Consumer of the data written by this class.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def gen_tables(self, assets):
|
||||
"""
|
||||
Return an iterator of pairs of (asset_id, bcolz.ctable).
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abstractmethod
|
||||
def to_uint32(self, array, colname):
|
||||
"""
|
||||
Convert raw column values produced by gen_tables into uint32 values.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
array : np.array
|
||||
An array of raw values.
|
||||
colname : str, {'open', 'high', 'low', 'close', 'volume', 'day'}
|
||||
The name of the column being loaded.
|
||||
|
||||
For output being read by the default BcolzOHLCVReader, data should be
|
||||
stored in the following manner:
|
||||
|
||||
- Pricing columns (Open, High, Low, Close) should be stored as 1000 *
|
||||
as-traded dollar value.
|
||||
- Volume should be the as-traded volume.
|
||||
- Dates should be stored as seconds since midnight UTC, Jan 1, 1970.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def write(self, filename, calendar, assets, show_progress=False):
|
||||
"""
|
||||
Parameters
|
||||
----------
|
||||
filename : str
|
||||
The location at which we should write our output.
|
||||
calendar : pandas.DatetimeIndex
|
||||
Calendar to use to compute asset calendar offsets.
|
||||
assets : pandas.Int64Index
|
||||
The assets for which to write data.
|
||||
show_progress : bool
|
||||
Whether or not to show a progress bar while writing.
|
||||
|
||||
Returns
|
||||
-------
|
||||
table : bcolz.ctable
|
||||
The newly-written table.
|
||||
"""
|
||||
_iterator = self.gen_tables(assets)
|
||||
if show_progress:
|
||||
pbar = progressbar(
|
||||
_iterator,
|
||||
length=len(assets),
|
||||
item_show_func=lambda i: i if i is None else str(i[0]),
|
||||
label="Merging asset files:",
|
||||
)
|
||||
with pbar as pbar_iterator:
|
||||
return self._write_internal(filename, calendar, pbar_iterator)
|
||||
return self._write_internal(filename, calendar, _iterator)
|
||||
|
||||
def _write_internal(self, filename, calendar, iterator):
|
||||
"""
|
||||
Internal implementation of write.
|
||||
|
||||
`iterator` should be an iterator yielding pairs of (asset, ctable).
|
||||
"""
|
||||
total_rows = 0
|
||||
first_row = {}
|
||||
last_row = {}
|
||||
calendar_offset = {}
|
||||
|
||||
# Maps column name -> output carray.
|
||||
columns = {
|
||||
k: carray(array([], dtype=uint32))
|
||||
for k in US_EQUITY_PRICING_BCOLZ_COLUMNS
|
||||
}
|
||||
|
||||
for asset_id, table in iterator:
|
||||
nrows = len(table)
|
||||
for column_name in columns:
|
||||
if column_name == 'id':
|
||||
# We know what the content of this column is, so don't
|
||||
# bother reading it.
|
||||
columns['id'].append(full((nrows,), asset_id))
|
||||
continue
|
||||
columns[column_name].append(
|
||||
self.to_uint32(table[column_name][:], column_name)
|
||||
)
|
||||
|
||||
# Bcolz doesn't support ints as keys in `attrs`, so convert
|
||||
# assets to strings for use as attr keys.
|
||||
asset_key = str(asset_id)
|
||||
|
||||
# Calculate the index into the array of the first and last row
|
||||
# for this asset. This allows us to efficiently load single
|
||||
# assets when querying the data back out of the table.
|
||||
first_row[asset_key] = total_rows
|
||||
last_row[asset_key] = total_rows + nrows - 1
|
||||
total_rows += nrows
|
||||
|
||||
# Calculate the number of trading days between the first date
|
||||
# in the stored data and the first date of **this** asset. This
|
||||
# offset used for output alignment by the reader.
|
||||
|
||||
# HACK: Index with a list so that we get back an array we can pass
|
||||
# to self.to_uint32. We could try to extract this in the loop
|
||||
# above, but that makes the logic a lot messier.
|
||||
asset_first_day = self.to_uint32(table['day'][[0]], 'day')[0]
|
||||
calendar_offset[asset_key] = calendar.get_loc(
|
||||
Timestamp(asset_first_day, unit='s', tz='UTC'),
|
||||
)
|
||||
|
||||
# This writes the table to disk.
|
||||
full_table = ctable(
|
||||
columns=[
|
||||
columns[colname]
|
||||
for colname in US_EQUITY_PRICING_BCOLZ_COLUMNS
|
||||
],
|
||||
names=US_EQUITY_PRICING_BCOLZ_COLUMNS,
|
||||
rootdir=filename,
|
||||
mode='w',
|
||||
)
|
||||
full_table.attrs['first_row'] = first_row
|
||||
full_table.attrs['last_row'] = last_row
|
||||
full_table.attrs['calendar_offset'] = calendar_offset
|
||||
full_table.attrs['calendar'] = calendar.asi8.tolist()
|
||||
return full_table
|
||||
|
||||
|
||||
class DailyBarWriterFromCSVs(BcolzDailyBarWriter):
|
||||
"""
|
||||
BcolzDailyBarWriter constructed from a map from csvs to assets.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
asset_map : dict
|
||||
A map from asset_id -> path to csv with data for that asset.
|
||||
|
||||
CSVs should have the following columns:
|
||||
day : datetime64
|
||||
open : float64
|
||||
high : float64
|
||||
low : float64
|
||||
close : float64
|
||||
volume : int64
|
||||
"""
|
||||
_csv_dtypes = {
|
||||
'open': float64,
|
||||
'high': float64,
|
||||
'low': float64,
|
||||
'close': float64,
|
||||
'volume': float64,
|
||||
}
|
||||
|
||||
def __init__(self, asset_map):
|
||||
self._asset_map = asset_map
|
||||
|
||||
def gen_tables(self, assets):
|
||||
"""
|
||||
Read CSVs as DataFrames from our asset map.
|
||||
"""
|
||||
dtypes = self._csv_dtypes
|
||||
for asset in assets:
|
||||
path = self._asset_map.get(asset)
|
||||
if path is None:
|
||||
raise KeyError("No path supplied for asset %s" % asset)
|
||||
data = read_csv(path, parse_dates=['day'], dtype=dtypes)
|
||||
yield asset, ctable.fromdataframe(data)
|
||||
|
||||
def to_uint32(self, array, colname):
|
||||
arrmax = array.max()
|
||||
if colname in OHLC:
|
||||
self.check_uint_safe(arrmax * 1000, colname)
|
||||
return (array * 1000).astype(uint32)
|
||||
elif colname == 'volume':
|
||||
self.check_uint_safe(arrmax, colname)
|
||||
return array.astype(uint32)
|
||||
elif colname == 'day':
|
||||
nanos_per_second = (1000 * 1000 * 1000)
|
||||
self.check_uint_safe(arrmax.view(int) / nanos_per_second, colname)
|
||||
return (array.view(int) / nanos_per_second).astype(uint32)
|
||||
|
||||
@staticmethod
|
||||
def check_uint_safe(value, colname):
|
||||
if value >= UINT32_MAX:
|
||||
raise ValueError(
|
||||
"Value %s from column '%s' is too large" % (value, colname)
|
||||
)
|
||||
|
||||
|
||||
class BcolzDailyBarReader(object):
|
||||
"""
|
||||
Reader for raw pricing data written by BcolzDailyOHLCVWriter.
|
||||
|
||||
A Bcolz CTable is comprised of Columns and Attributes.
|
||||
|
||||
Columns
|
||||
-------
|
||||
The table with which this loader interacts contains the following columns:
|
||||
|
||||
['open', 'high', 'low', 'close', 'volume', 'day', 'id'].
|
||||
|
||||
The data in these columns is interpreted as follows:
|
||||
|
||||
- Price columns ('open', 'high', 'low', 'close') are interpreted as 1000 *
|
||||
as-traded dollar value.
|
||||
- Volume is interpreted as as-traded volume.
|
||||
- Day is interpreted as seconds since midnight UTC, Jan 1, 1970.
|
||||
- Id is the asset id of the row.
|
||||
|
||||
The data in each column is grouped by asset and then sorted by day within
|
||||
each asset block.
|
||||
|
||||
The table is built to represent a long time range of data, e.g. ten years
|
||||
of equity data, so the lengths of each asset block is not equal to each
|
||||
other. The blocks are clipped to the known start and end date of each asset
|
||||
to cut down on the number of empty values that would need to be included to
|
||||
make a regular/cubic dataset.
|
||||
|
||||
When read across the open, high, low, close, and volume with the same
|
||||
index should represent the same asset and day.
|
||||
|
||||
Attributes
|
||||
----------
|
||||
The table with which this loader interacts contains the following
|
||||
attributes:
|
||||
|
||||
first_row : dict
|
||||
Map from asset_id -> index of first row in the dataset with that id.
|
||||
last_row : dict
|
||||
Map from asset_id -> index of last row in the dataset with that id.
|
||||
calendar_offset : dict
|
||||
Map from asset_id -> calendar index of first row.
|
||||
calendar : list[int64]
|
||||
Calendar used to compute offsets, in asi8 format (ns since EPOCH).
|
||||
|
||||
We use first_row and last_row together to quickly find ranges of rows to
|
||||
load when reading an asset's data into memory.
|
||||
|
||||
We use calendar_offset and calendar to orient loaded blocks within a
|
||||
range of queried dates.
|
||||
"""
|
||||
def __init__(self, table):
|
||||
if isinstance(table, string_types):
|
||||
table = ctable(rootdir=table, mode='r')
|
||||
|
||||
self._table = table
|
||||
self._calendar = DatetimeIndex(table.attrs['calendar'], tz='UTC')
|
||||
self._first_rows = {
|
||||
int(asset_id): start_index
|
||||
for asset_id, start_index in iteritems(table.attrs['first_row'])
|
||||
}
|
||||
self._last_rows = {
|
||||
int(asset_id): end_index
|
||||
for asset_id, end_index in iteritems(table.attrs['last_row'])
|
||||
}
|
||||
self._calendar_offsets = {
|
||||
int(id_): offset
|
||||
for id_, offset in iteritems(table.attrs['calendar_offset'])
|
||||
}
|
||||
|
||||
def _compute_slices(self, start_idx, end_idx, assets):
|
||||
"""
|
||||
Compute the raw row indices to load for each asset on a query for the
|
||||
given dates after applying a shift.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
start_idx : int
|
||||
Index of first date for which we want data.
|
||||
end_idx : int
|
||||
Index of last date for which we want data.
|
||||
assets : pandas.Int64Index
|
||||
Assets for which we want to compute row indices
|
||||
|
||||
Returns
|
||||
-------
|
||||
A 3-tuple of (first_rows, last_rows, offsets):
|
||||
first_rows : np.array[intp]
|
||||
Array with length == len(assets) containing the index of the first
|
||||
row to load for each asset in `assets`.
|
||||
last_rows : np.array[intp]
|
||||
Array with length == len(assets) containing the index of the last
|
||||
row to load for each asset in `assets`.
|
||||
offset : np.array[intp]
|
||||
Array with length == (len(asset) containing the index in a buffer
|
||||
of length `dates` corresponding to the first row of each asset.
|
||||
|
||||
The value of offset[i] will be 0 if asset[i] existed at the start
|
||||
of a query. Otherwise, offset[i] will be equal to the number of
|
||||
entries in `dates` for which the asset did not yet exist.
|
||||
"""
|
||||
# The core implementation of the logic here is implemented in Cython
|
||||
# for efficiency.
|
||||
return _compute_row_slices(
|
||||
self._first_rows,
|
||||
self._last_rows,
|
||||
self._calendar_offsets,
|
||||
start_idx,
|
||||
end_idx,
|
||||
assets,
|
||||
)
|
||||
|
||||
def load_raw_arrays(self, columns, start_date, end_date, assets):
|
||||
# Assumes that the given dates are actually in calendar.
|
||||
start_idx = self._calendar.get_loc(start_date)
|
||||
end_idx = self._calendar.get_loc(end_date)
|
||||
first_rows, last_rows, offsets = self._compute_slices(
|
||||
start_idx,
|
||||
end_idx,
|
||||
assets,
|
||||
)
|
||||
return _read_bcolz_data(
|
||||
self._table,
|
||||
(end_idx - start_idx + 1, len(assets)),
|
||||
[column.name for column in columns],
|
||||
first_rows,
|
||||
last_rows,
|
||||
offsets,
|
||||
)
|
||||
|
||||
|
||||
class SQLiteAdjustmentWriter(object):
|
||||
"""
|
||||
Writer for data to be read by SQLiteAdjustmentWriter
|
||||
|
||||
Parameters
|
||||
----------
|
||||
conn_or_path : str or sqlite3.Connection
|
||||
A handle to the target sqlite database.
|
||||
overwrite : bool, optional, default=False
|
||||
If True and conn_or_path is a string, remove any existing files at the
|
||||
given path before connecting.
|
||||
|
||||
See Also
|
||||
--------
|
||||
SQLiteAdjustmentReader
|
||||
"""
|
||||
|
||||
def __init__(self, conn_or_path, overwrite=False):
|
||||
if isinstance(conn_or_path, sqlite3.Connection):
|
||||
self.conn = conn_or_path
|
||||
elif isinstance(conn_or_path, str):
|
||||
if overwrite and exists(conn_or_path):
|
||||
try:
|
||||
remove(conn_or_path)
|
||||
except OSError as e:
|
||||
if e.errno != ENOENT:
|
||||
raise
|
||||
self.conn = sqlite3.connect(conn_or_path)
|
||||
else:
|
||||
raise TypeError("Unknown connection type %s" % type(conn_or_path))
|
||||
|
||||
def write_frame(self, tablename, frame):
|
||||
if frozenset(frame.columns) != SQLITE_ADJUSTMENT_COLUMNS:
|
||||
raise ValueError(
|
||||
"Unexpected frame columns:\n"
|
||||
"Expected Columns: %s\n"
|
||||
"Received Columns: %s" % (
|
||||
SQLITE_ADJUSTMENT_COLUMNS,
|
||||
frame.columns.tolist(),
|
||||
)
|
||||
)
|
||||
elif tablename not in SQLITE_ADJUSTMENT_TABLENAMES:
|
||||
raise ValueError(
|
||||
"Adjustment table %s not in %s" % (
|
||||
tablename, SQLITE_ADJUSTMENT_TABLENAMES
|
||||
)
|
||||
)
|
||||
|
||||
expected_dtypes = SQLITE_ADJUSTMENT_COLUMN_DTYPES
|
||||
actual_dtypes = frame.dtypes
|
||||
for colname, expected in iteritems(expected_dtypes):
|
||||
actual = actual_dtypes[colname]
|
||||
if not issubdtype(actual, expected):
|
||||
raise TypeError(
|
||||
"Expected data of type {expected} for column '{colname}', "
|
||||
"but got {actual}.".format(
|
||||
expected=expected,
|
||||
colname=colname,
|
||||
actual=actual,
|
||||
)
|
||||
)
|
||||
return frame.to_sql(tablename, self.conn)
|
||||
|
||||
def write(self, splits, mergers, dividends):
|
||||
"""
|
||||
Writes data to a SQLite file to be read by SQLiteAdjustmentReader.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
splits : pandas.DataFrame
|
||||
Dataframe containing split data.
|
||||
mergers : pandas.DataFrame
|
||||
DataFrame containing merger data.
|
||||
dividends : pandas.DataFrame
|
||||
DataFrame containing dividend data.
|
||||
|
||||
Notes
|
||||
-----
|
||||
DataFrame input (`splits`, `mergers`, and `dividends`) should all have
|
||||
the following columns:
|
||||
|
||||
effective_date : int
|
||||
The date, represented as seconds since Unix epoch, on which the
|
||||
adjustment should be applied.
|
||||
ratio : float
|
||||
A value to apply to all data earlier than the effective date.
|
||||
sid : int
|
||||
The asset id associated with this adjustment.
|
||||
|
||||
The ratio column is interpreted as follows:
|
||||
- For all adjustment types, multiply price fields ('open', 'high',
|
||||
'low', and 'close') by the ratio.
|
||||
- For **splits only**, **divide** volume by the adjustment ratio.
|
||||
|
||||
Dividend ratios should be calculated as
|
||||
1.0 - (dividend_value / "close on day prior to dividend ex_date").
|
||||
|
||||
Returns
|
||||
-------
|
||||
None
|
||||
|
||||
See Also
|
||||
--------
|
||||
SQLiteAdjustmentReader : Consumer for the data written by this class
|
||||
"""
|
||||
self.write_frame('splits', splits)
|
||||
self.write_frame('mergers', mergers)
|
||||
self.write_frame('dividends', dividends)
|
||||
self.conn.execute(
|
||||
"CREATE INDEX splits_sids "
|
||||
"ON splits(sid)"
|
||||
)
|
||||
self.conn.execute(
|
||||
"CREATE INDEX splits_effective_date "
|
||||
"ON splits(effective_date)"
|
||||
)
|
||||
self.conn.execute(
|
||||
"CREATE INDEX mergers_sids "
|
||||
"ON mergers(sid)"
|
||||
)
|
||||
self.conn.execute(
|
||||
"CREATE INDEX mergers_effective_date "
|
||||
"ON mergers(effective_date)"
|
||||
)
|
||||
self.conn.execute(
|
||||
"CREATE INDEX dividends_sid "
|
||||
"ON dividends(sid)"
|
||||
)
|
||||
self.conn.execute(
|
||||
"CREATE INDEX dividends_effective_date "
|
||||
"ON dividends(effective_date)"
|
||||
)
|
||||
|
||||
def close(self):
|
||||
self.conn.close()
|
||||
|
||||
|
||||
class SQLiteAdjustmentReader(object):
|
||||
"""
|
||||
Loads adjustments based on corporate actions from a SQLite database.
|
||||
|
||||
Expects data written in the format output by `SQLiteAdjustmentWriter`.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
conn : str or sqlite3.Connection
|
||||
Connection from which to load data.
|
||||
"""
|
||||
|
||||
def __init__(self, conn):
|
||||
if isinstance(conn, str):
|
||||
conn = sqlite3.connect(conn)
|
||||
self.conn = conn
|
||||
|
||||
def load_adjustments(self, columns, dates, assets):
|
||||
return load_adjustments_from_sqlite(
|
||||
self.conn,
|
||||
[column.name for column in columns],
|
||||
dates,
|
||||
assets,
|
||||
)
|
||||
|
||||
|
||||
class USEquityPricingLoader(PipelineLoader):
|
||||
"""
|
||||
PipelineLoader for US Equity Pricing data
|
||||
|
||||
@@ -17,7 +17,7 @@ from sqlite3 import connect as sqlite3_connect
|
||||
|
||||
from .base import PipelineLoader
|
||||
from .frame import DataFrameLoader
|
||||
from .equity_pricing_loader import (
|
||||
from zipline.data.us_equity_pricing import (
|
||||
BcolzDailyBarWriter,
|
||||
SQLiteAdjustmentReader,
|
||||
SQLiteAdjustmentWriter,
|
||||
|
||||
Reference in New Issue
Block a user