# # Copyright 2015 Quantopian, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from sys import maxsize import re from nose_parameterized import parameterized from numpy import ( arange, datetime64, nan, ) from numpy.testing import ( assert_array_equal, ) from pandas import ( DataFrame, Timestamp, ) from pandas.util.testing import assert_index_equal from catalyst.data.us_equity_pricing import ( BcolzDailyBarReader, BcolzDailyBarWriter, NoDataBeforeDate, NoDataAfterDate, ) from catalyst.pipeline.loaders.synthetic import ( OHLCV, asset_start, asset_end, expected_bar_value, expected_bar_values_2d, make_bar_data, ) from catalyst.testing import seconds_to_timestamp from catalyst.testing.fixtures import ( WithAssetFinder, WithBcolzEquityDailyBarReader, WithTmpDir, WithTradingCalendars, ZiplineTestCase, ) from catalyst.utils.calendars import get_calendar TEST_CALENDAR_START = Timestamp('2015-06-01', tz='UTC') TEST_CALENDAR_STOP = Timestamp('2015-06-30', tz='UTC') TEST_QUERY_START = Timestamp('2015-06-10', tz='UTC') TEST_QUERY_STOP = Timestamp('2015-06-19', tz='UTC') # One asset for each of the cases enumerated in load_raw_arrays_from_bcolz. EQUITY_INFO = DataFrame( [ # 1) The equity's trades start and end before query. {'start_date': '2015-06-01', 'end_date': '2015-06-05'}, # 2) The equity's trades start and end after query. {'start_date': '2015-06-22', 'end_date': '2015-06-30'}, # 3) The equity's data covers all dates in range. {'start_date': '2015-06-02', 'end_date': '2015-06-30'}, # 4) The equity's trades start before the query start, but stop # before the query end. {'start_date': '2015-06-01', 'end_date': '2015-06-15'}, # 5) The equity's trades start and end during the query. {'start_date': '2015-06-12', 'end_date': '2015-06-18'}, # 6) The equity's trades start during the query, but extend through # the whole query. {'start_date': '2015-06-15', 'end_date': '2015-06-25'}, ], index=arange(1, 7), columns=['start_date', 'end_date'], ).astype(datetime64) EQUITY_INFO['symbol'] = [chr(ord('A') + n) for n in range(len(EQUITY_INFO))] TEST_QUERY_ASSETS = EQUITY_INFO.index class BcolzDailyBarTestCase(WithBcolzEquityDailyBarReader, ZiplineTestCase): EQUITY_DAILY_BAR_START_DATE = TEST_CALENDAR_START EQUITY_DAILY_BAR_END_DATE = TEST_CALENDAR_STOP @classmethod def make_equity_info(cls): return EQUITY_INFO @classmethod def make_equity_daily_bar_data(cls): return make_bar_data( EQUITY_INFO, cls.equity_daily_bar_days, ) @classmethod def init_class_fixtures(cls): super(BcolzDailyBarTestCase, cls).init_class_fixtures() cls.sessions = cls.trading_calendar.sessions_in_range( cls.trading_calendar.minute_to_session_label(TEST_CALENDAR_START), cls.trading_calendar.minute_to_session_label(TEST_CALENDAR_STOP) ) @property def assets(self): return EQUITY_INFO.index def trading_days_between(self, start, end): return self.sessions[self.sessions.slice_indexer(start, end)] def asset_start(self, asset_id): return asset_start(EQUITY_INFO, asset_id) def asset_end(self, asset_id): return asset_end(EQUITY_INFO, asset_id) def dates_for_asset(self, asset_id): start, end = self.asset_start(asset_id), self.asset_end(asset_id) return self.trading_days_between(start, end) def test_write_ohlcv_content(self): result = self.bcolz_daily_bar_ctable for column in OHLCV: idx = 0 data = result[column][:] multiplier = 1 if column == 'volume' else 1000 for asset_id in self.assets: for date in self.dates_for_asset(asset_id): self.assertEqual( expected_bar_value( asset_id, date, column ) * multiplier, data[idx], ) idx += 1 self.assertEqual(idx, len(data)) def test_write_day_and_id(self): result = self.bcolz_daily_bar_ctable idx = 0 ids = result['id'] days = result['day'] for asset_id in self.assets: for date in self.dates_for_asset(asset_id): self.assertEqual(ids[idx], asset_id) self.assertEqual(date, seconds_to_timestamp(days[idx])) idx += 1 def test_write_attrs(self): result = self.bcolz_daily_bar_ctable expected_first_row = { '1': 0, '2': 5, # Asset 1 has 5 trading days. '3': 12, # Asset 2 has 7 trading days. '4': 33, # Asset 3 has 21 trading days. '5': 44, # Asset 4 has 11 trading days. '6': 49, # Asset 5 has 5 trading days. } expected_last_row = { '1': 4, '2': 11, '3': 32, '4': 43, '5': 48, '6': 57, # Asset 6 has 9 trading days. } expected_calendar_offset = { '1': 0, # Starts on 6-01, 1st trading day of month. '2': 15, # Starts on 6-22, 16th trading day of month. '3': 1, # Starts on 6-02, 2nd trading day of month. '4': 0, # Starts on 6-01, 1st trading day of month. '5': 9, # Starts on 6-12, 10th trading day of month. '6': 10, # Starts on 6-15, 11th trading day of month. } self.assertEqual(result.attrs['first_row'], expected_first_row) self.assertEqual(result.attrs['last_row'], expected_last_row) self.assertEqual( result.attrs['calendar_offset'], expected_calendar_offset, ) cal = get_calendar(result.attrs['calendar_name']) first_session = Timestamp(result.attrs['start_session_ns'], tz='UTC') end_session = Timestamp(result.attrs['end_session_ns'], tz='UTC') sessions = cal.sessions_in_range(first_session, end_session) assert_index_equal( self.sessions, sessions ) def test_read_first_trading_day(self): self.assertEqual( self.bcolz_equity_daily_bar_reader.first_trading_day, self.sessions[0], ) def _check_read_results(self, columns, assets, start_date, end_date): results = self.bcolz_equity_daily_bar_reader.load_raw_arrays( columns, start_date, end_date, assets, ) dates = self.trading_days_between(start_date, end_date) for column, result in zip(columns, results): assert_array_equal( result, expected_bar_values_2d( dates, EQUITY_INFO, column, ) ) @parameterized.expand([ (['open'],), (['close', 'volume'],), (['volume', 'high', 'low'],), (['open', 'high', 'low', 'close', 'volume'],), ]) def test_read(self, columns): self._check_read_results( columns, self.assets, TEST_QUERY_START, TEST_QUERY_STOP, ) def test_start_on_asset_start(self): """ Test loading with queries that starts on the first day of each asset's lifetime. """ columns = ['high', 'volume'] for asset in self.assets: self._check_read_results( columns, self.assets, start_date=self.asset_start(asset), end_date=self.sessions[-1], ) def test_start_on_asset_end(self): """ Test loading with queries that start on the last day of each asset's lifetime. """ columns = ['close', 'volume'] for asset in self.assets: self._check_read_results( columns, self.assets, start_date=self.asset_end(asset), end_date=self.sessions[-1], ) def test_end_on_asset_start(self): """ Test loading with queries that end on the first day of each asset's lifetime. """ columns = ['close', 'volume'] for asset in self.assets: self._check_read_results( columns, self.assets, start_date=self.sessions[0], end_date=self.asset_start(asset), ) def test_end_on_asset_end(self): """ Test loading with queries that end on the last day of each asset's lifetime. """ columns = ['close', 'volume'] for asset in self.assets: self._check_read_results( columns, self.assets, start_date=self.sessions[0], end_date=self.asset_end(asset), ) def test_unadjusted_get_value(self): reader = self.bcolz_equity_daily_bar_reader # At beginning price = reader.get_value(1, Timestamp('2015-06-01', tz='UTC'), 'close') # Synthetic writes price for date. self.assertEqual(108630.0, price) # Middle price = reader.get_value(1, Timestamp('2015-06-02', tz='UTC'), 'close') self.assertEqual(108631.0, price) # End price = reader.get_value(1, Timestamp('2015-06-05', tz='UTC'), 'close') self.assertEqual(108634.0, price) # Another sid at beginning. price = reader.get_value(2, Timestamp('2015-06-22', tz='UTC'), 'close') self.assertEqual(208651.0, price) # Ensure that volume does not have float adjustment applied. volume = reader.get_value(1, Timestamp('2015-06-02', tz='UTC'), 'volume') self.assertEqual(109631, volume) def test_unadjusted_get_value_no_data(self): table = self.bcolz_daily_bar_ctable reader = BcolzDailyBarReader(table) # before with self.assertRaises(NoDataBeforeDate): reader.get_value(2, Timestamp('2015-06-08', tz='UTC'), 'close') # after with self.assertRaises(NoDataAfterDate): reader.get_value(4, Timestamp('2015-06-16', tz='UTC'), 'close') def test_unadjusted_get_value_empty_value(self): reader = self.bcolz_equity_daily_bar_reader # A sid, day and corresponding index into which to overwrite a zero. zero_sid = 1 zero_day = Timestamp('2015-06-02', tz='UTC') zero_ix = reader.sid_day_index(zero_sid, zero_day) old = reader._spot_col('close')[zero_ix] try: # Write a zero into the synthetic pricing data at the day and sid, # so that a read should now return -1. # This a little hacky, in lieu of changing the synthetic data set. reader._spot_col('close')[zero_ix] = 0 close = reader.get_value(zero_sid, zero_day, 'close') assert_array_equal(nan, close) finally: reader._spot_col('close')[zero_ix] = old class BcolzDailyBarAlwaysReadAllTestCase(BcolzDailyBarTestCase): """ Force tests defined in BcolzDailyBarTestCase to always read the entire column into memory before selecting desired asset data, when invoking `load_raw_array`. """ BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD = 0 class BcolzDailyBarNeverReadAllTestCase(BcolzDailyBarTestCase): """ Force tests defined in BcolzDailyBarTestCase to never read the entire column into memory before selecting desired asset data, when invoking `load_raw_array`. """ BCOLZ_DAILY_BAR_READ_ALL_THRESHOLD = maxsize class BcolzDailyBarWriterMissingDataTestCase(WithAssetFinder, WithTmpDir, WithTradingCalendars, ZiplineTestCase): # Sid 3 is active from 2015-06-02 to 2015-06-30. MISSING_DATA_SID = 3 # Leave out data for a day in the middle of the query range. MISSING_DATA_DAY = Timestamp('2015-06-15', tz='UTC') @classmethod def make_equity_info(cls): return EQUITY_INFO.loc[EQUITY_INFO.index == cls.MISSING_DATA_SID] def test_missing_values_assertion(self): sessions = self.trading_calendar.sessions_in_range( TEST_CALENDAR_START, TEST_CALENDAR_STOP, ) sessions_with_gap = sessions[sessions != self.MISSING_DATA_DAY] bar_data = make_bar_data(self.make_equity_info(), sessions_with_gap) writer = BcolzDailyBarWriter( self.tmpdir.path, self.trading_calendar, sessions[0], sessions[-1], ) # There are 21 sessions between the start and end date for this # asset, and we excluded one. expected_msg = re.escape( "Got 20 rows for daily bars table with first day=2015-06-02, last " "day=2015-06-30, expected 21 rows.\n" "Missing sessions: " "[Timestamp('2015-06-15 00:00:00+0000', tz='UTC')]\n" "Extra sessions: []" ) with self.assertRaisesRegexp(AssertionError, expected_msg): writer.write(bar_data)