diff --git a/tests/resources/example_data.tar.gz b/tests/resources/example_data.tar.gz index 9b6c6d8c..86c6b178 100644 Binary files a/tests/resources/example_data.tar.gz and b/tests/resources/example_data.tar.gz differ diff --git a/tests/test_assets.py b/tests/test_assets.py index 68c0e3d5..ec317435 100644 --- a/tests/test_assets.py +++ b/tests/test_assets.py @@ -62,10 +62,14 @@ from zipline.errors import ( EquitiesNotFound, FutureContractsNotFound, MultipleSymbolsFound, + MultipleValuesFoundForField, + MultipleValuesFoundForSid, + NoValueForSid, AssetDBVersionError, SidsNotFound, SymbolNotFound, AssetDBImpossibleDowngrade, + ValueNotFoundForField, ) from zipline.testing import ( all_subindices, @@ -966,6 +970,217 @@ class AssetFinderTestCase(WithTradingCalendars, ZiplineTestCase): )) self.assertEqual({0, 1, 2}, set(self.asset_finder.sids)) + def test_lookup_by_supplementary_field(self): + equities = pd.DataFrame.from_records( + [ + { + 'sid': 0, + 'symbol': 'A', + 'start_date': pd.Timestamp('2013-1-1', tz='UTC'), + 'end_date': pd.Timestamp('2014-1-1', tz='UTC'), + 'exchange': 'TEST', + }, + { + 'sid': 1, + 'symbol': 'B', + 'start_date': pd.Timestamp('2013-1-1', tz='UTC'), + 'end_date': pd.Timestamp('2014-1-1', tz='UTC'), + 'exchange': 'TEST', + }, + { + 'sid': 2, + 'symbol': 'C', + 'start_date': pd.Timestamp('2013-7-1', tz='UTC'), + 'end_date': pd.Timestamp('2014-1-1', tz='UTC'), + 'exchange': 'TEST', + }, + ] + ) + + equity_supplementary_mappings = pd.DataFrame.from_records( + [ + { + 'sid': 0, + 'field': 'ALT_ID', + 'value': '100000000', + 'start_date': pd.Timestamp('2013-1-1', tz='UTC'), + 'end_date': pd.Timestamp('2013-6-28', tz='UTC'), + }, + { + 'sid': 1, + 'field': 'ALT_ID', + 'value': '100000001', + 'start_date': pd.Timestamp('2013-1-1', tz='UTC'), + 'end_date': pd.Timestamp('2014-1-1', tz='UTC'), + }, + { + 'sid': 0, + 'field': 'ALT_ID', + 'value': '100000002', + 'start_date': pd.Timestamp('2013-7-1', tz='UTC'), + 'end_date': pd.Timestamp('2014-1-1', 
tz='UTC'), + }, + { + 'sid': 2, + 'field': 'ALT_ID', + 'value': '100000000', + 'start_date': pd.Timestamp('2013-7-1', tz='UTC'), + 'end_date': pd.Timestamp('2014-1-1', tz='UTC'), + }, + ] + ) + + self.write_assets( + equities=equities, + equity_supplementary_mappings=equity_supplementary_mappings, + ) + + af = self.asset_finder + + # Before sid 0 has changed ALT_ID. + dt = pd.Timestamp('2013-6-28', tz='UTC') + + asset_0 = af.lookup_by_supplementary_field('ALT_ID', '100000000', dt) + self.assertEqual(asset_0.sid, 0) + + asset_1 = af.lookup_by_supplementary_field('ALT_ID', '100000001', dt) + self.assertEqual(asset_1.sid, 1) + + # We don't know about this ALT_ID yet. + with self.assertRaisesRegexp( + ValueNotFoundForField, + "Value '{}' was not found for field '{}'.".format( + '100000002', + 'ALT_ID', + ) + ): + af.lookup_by_supplementary_field('ALT_ID', '100000002', dt) + + # After all assets have ended. + dt = pd.Timestamp('2014-01-02', tz='UTC') + + asset_2 = af.lookup_by_supplementary_field('ALT_ID', '100000000', dt) + self.assertEqual(asset_2.sid, 2) + + asset_1 = af.lookup_by_supplementary_field('ALT_ID', '100000001', dt) + self.assertEqual(asset_1.sid, 1) + + asset_0 = af.lookup_by_supplementary_field('ALT_ID', '100000002', dt) + self.assertEqual(asset_0.sid, 0) + + # At this point both sids 0 and 2 have held this value, so an + # as_of_date is required. + expected_in_repr = ( + "Multiple occurrences of the value '{}' found for field '{}'." 
+ ).format('100000000', 'ALT_ID') + + with self.assertRaisesRegexp( + MultipleValuesFoundForField, + expected_in_repr, + ): + af.lookup_by_supplementary_field('ALT_ID', '100000000', None) + + def test_get_supplementary_field(self): + equities = pd.DataFrame.from_records( + [ + { + 'sid': 0, + 'symbol': 'A', + 'start_date': pd.Timestamp('2013-1-1', tz='UTC'), + 'end_date': pd.Timestamp('2014-1-1', tz='UTC'), + 'exchange': 'TEST', + }, + { + 'sid': 1, + 'symbol': 'B', + 'start_date': pd.Timestamp('2013-1-1', tz='UTC'), + 'end_date': pd.Timestamp('2014-1-1', tz='UTC'), + 'exchange': 'TEST', + }, + { + 'sid': 2, + 'symbol': 'C', + 'start_date': pd.Timestamp('2013-7-1', tz='UTC'), + 'end_date': pd.Timestamp('2014-1-1', tz='UTC'), + 'exchange': 'TEST', + }, + ] + ) + + equity_supplementary_mappings = pd.DataFrame.from_records( + [ + { + 'sid': 0, + 'field': 'ALT_ID', + 'value': '100000000', + 'start_date': pd.Timestamp('2013-1-1', tz='UTC'), + 'end_date': pd.Timestamp('2013-6-28', tz='UTC'), + }, + { + 'sid': 1, + 'field': 'ALT_ID', + 'value': '100000001', + 'start_date': pd.Timestamp('2013-1-1', tz='UTC'), + 'end_date': pd.Timestamp('2014-1-1', tz='UTC'), + }, + { + 'sid': 0, + 'field': 'ALT_ID', + 'value': '100000002', + 'start_date': pd.Timestamp('2013-7-1', tz='UTC'), + 'end_date': pd.Timestamp('2014-1-1', tz='UTC'), + }, + { + 'sid': 2, + 'field': 'ALT_ID', + 'value': '100000000', + 'start_date': pd.Timestamp('2013-7-1', tz='UTC'), + 'end_date': pd.Timestamp('2014-1-1', tz='UTC'), + }, + ] + ) + + self.write_assets( + equities=equities, + equity_supplementary_mappings=equity_supplementary_mappings, + ) + finder = self.asset_finder + + # Before sid 0 has changed ALT_ID and sid 2 has started. + dt = pd.Timestamp('2013-6-28', tz='UTC') + + for sid, expected in [(0, '100000000'), (1, '100000001')]: + self.assertEqual( + finder.get_supplementary_field(sid, 'ALT_ID', dt), + expected, + ) + + # Since sid 2 has not yet started, we don't know about its + # ALT_ID. 
+ with self.assertRaisesRegexp( + NoValueForSid, + "No '{}' value found for sid '{}'.".format('ALT_ID', 2), + ): + finder.get_supplementary_field(2, 'ALT_ID', dt) + + # After all assets have ended. + dt = pd.Timestamp('2014-01-02', tz='UTC') + + for sid, expected in [ + (0, '100000002'), (1, '100000001'), (2, '100000000'), + ]: + self.assertEqual( + finder.get_supplementary_field(sid, 'ALT_ID', dt), + expected, + ) + + # Sid 0 has historically held two values for ALT_ID by this dt. + with self.assertRaisesRegexp( + MultipleValuesFoundForSid, + "Multiple '{}' values found for sid '{}'.".format('ALT_ID', 0), + ): + finder.get_supplementary_field(0, 'ALT_ID', None) + def test_group_by_type(self): equities = make_simple_equity_info( range(5), diff --git a/zipline/assets/asset_db_migrations.py b/zipline/assets/asset_db_migrations.py index 2b1431c5..78eaf4a4 100644 --- a/zipline/assets/asset_db_migrations.py +++ b/zipline/assets/asset_db_migrations.py @@ -309,3 +309,8 @@ def _downgrade_v5(op): 'equities', ['fuzzy_symbol'], ) + + +@downgrades(6) +def _downgrade_v6(op): + op.drop_table('equity_supplementary_mappings') diff --git a/zipline/assets/asset_db_schema.py b/zipline/assets/asset_db_schema.py index ea544797..9c9c98c2 100644 --- a/zipline/assets/asset_db_schema.py +++ b/zipline/assets/asset_db_schema.py @@ -6,7 +6,7 @@ import sqlalchemy as sa # assets database # NOTE: When upgrading this remember to add a downgrade in: # .asset_db_migrations -ASSET_DB_VERSION = 5 +ASSET_DB_VERSION = 6 # A frozenset of the names of all tables in the assets db # NOTE: When modifying this schema, update the ASSET_DB_VERSION value @@ -14,6 +14,7 @@ asset_db_table_names = frozenset({ 'asset_router', 'equities', 'equity_symbol_mappings', + 'equity_supplementary_mappings', 'futures_contracts', 'futures_exchanges', 'futures_root_symbols', @@ -84,6 +85,22 @@ equity_symbol_mappings = sa.Table( ), ) +equity_supplementary_mappings = sa.Table( + 'equity_supplementary_mappings', + metadata, + 
sa.Column( + 'sid', + sa.Integer, + sa.ForeignKey(equities.c.sid), + nullable=False, + primary_key=True + ), + sa.Column('field', sa.Text, nullable=False, primary_key=True), + sa.Column('start_date', sa.Integer, nullable=False, primary_key=True), + sa.Column('end_date', sa.Integer, nullable=False), + sa.Column('value', sa.Text, nullable=False), +) + futures_exchanges = sa.Table( 'futures_exchanges', metadata, diff --git a/zipline/assets/asset_writer.py b/zipline/assets/asset_writer.py index c3d4b455..ec3c79de 100644 --- a/zipline/assets/asset_writer.py +++ b/zipline/assets/asset_writer.py @@ -28,6 +28,7 @@ from zipline.assets.asset_db_schema import ( asset_router, equities as equities_table, equity_symbol_mappings, + equity_supplementary_mappings as equity_supplementary_mappings_table, futures_contracts as futures_contracts_table, futures_exchanges, futures_root_symbols, @@ -47,6 +48,7 @@ AssetData = namedtuple( 'futures', 'exchanges', 'root_symbols', + 'equity_supplementary_mappings', ), ) @@ -102,6 +104,16 @@ _root_symbols_defaults = { 'exchange': None, } +# Default values for the equity_supplementary_mappings DataFrame +_equity_supplementary_mappings_defaults = { + 'sid': None, + 'value': None, + 'field': None, + 'start_date': 0, + 'end_date': 2 ** 62 - 1, +} + + # Fuzzy symbol delimiters that may break up a company symbol and share class _delimited_symbol_delimiters_regex = re.compile(r'[./\-_]') _delimited_symbol_default_triggers = frozenset({np.nan, None, ''}) @@ -355,6 +367,7 @@ class AssetDBWriter(object): futures=None, exchanges=None, root_symbols=None, + equity_supplementary_mappings=None, chunk_size=DEFAULT_CHUNK_SIZE): """Write asset metadata to a sqlite database. @@ -379,7 +392,7 @@ class AssetDBWriter(object): The exchange where this asset is traded. The index of this dataframe should contain the sids. - futures : pd.Dataframe, optional + futures : pd.DataFrame, optional The future contract metadata. 
The columns for this dataframe are: symbol : str @@ -410,7 +423,7 @@ class AssetDBWriter(object): multiplier: float The amount of the underlying asset represented by this contract. - exchanges : pd.Dataframe, optional + exchanges : pd.DataFrame, optional The exchanges where assets can be traded. The columns of this dataframe are: @@ -418,7 +431,7 @@ class AssetDBWriter(object): The name of the exchange. timezone : str The timezone of the exchange. - root_symbols : pd.Dataframe, optional + root_symbols : pd.DataFrame, optional The root symbols for the futures contracts. The columns for this dataframe are: @@ -432,6 +445,8 @@ class AssetDBWriter(object): A short description of this root symbol. exchange : str The exchange where this root symbol is traded. + equity_supplementary_mappings : pd.DataFrame, optional + Additional mappings from values of arbitrary type to assets. chunk_size : int, optional The amount of rows to write to the SQLite table at once. This defaults to the default number of bind params in sqlite. @@ -452,6 +467,11 @@ class AssetDBWriter(object): futures if futures is not None else pd.DataFrame(), exchanges if exchanges is not None else pd.DataFrame(), root_symbols if root_symbols is not None else pd.DataFrame(), + ( + equity_supplementary_mappings + if equity_supplementary_mappings is not None + else pd.DataFrame() + ), ) # Write the data to SQL. 
self._write_df_to_table( @@ -466,6 +486,13 @@ class AssetDBWriter(object): conn, chunk_size, ) + self._write_df_to_table( + equity_supplementary_mappings_table, + data.equity_supplementary_mappings, + conn, + chunk_size, + idx=False, + ) self._write_assets( 'future', data.futures, @@ -480,10 +507,19 @@ class AssetDBWriter(object): mapping_data=data.equities_mappings, ) - def _write_df_to_table(self, tbl, df, txn, chunk_size, idx_label=None): + def _write_df_to_table( + self, + tbl, + df, + txn, + chunk_size, + idx=True, + idx_label=None, + ): df.to_sql( tbl.name, txn.connection, + index=idx, index_label=( idx_label if idx_label is not None else @@ -639,7 +675,25 @@ class AssetDBWriter(object): return futures_output - def _load_data(self, equities, futures, exchanges, root_symbols): + def _normalize_equity_supplementary_mappings(self, mappings): + mappings_output = _generate_output_dataframe( + data_subset=mappings, + defaults=_equity_supplementary_mappings_defaults, + ) + + for col in ('start_date', 'end_date'): + mappings_output[col] = _dt_to_epoch_ns(mappings_output[col]) + + return mappings_output + + def _load_data( + self, + equities, + futures, + exchanges, + root_symbols, + equity_supplementary_mappings, + ): """ Returns a standard set of pandas.DataFrames: equities, futures, exchanges, root_symbols @@ -657,6 +711,12 @@ class AssetDBWriter(object): equities_output, equities_mappings = self._normalize_equities(equities) futures_output = self._normalize_futures(futures) + equity_supplementary_mappings_output = ( + self._normalize_equity_supplementary_mappings( + equity_supplementary_mappings, + ) + ) + exchanges_output = _generate_output_dataframe( data_subset=exchanges, defaults=_exchanges_defaults, @@ -673,4 +733,5 @@ class AssetDBWriter(object): futures=futures_output, exchanges=exchanges_output, root_symbols=root_symbols_output, + equity_supplementary_mappings=equity_supplementary_mappings_output, ) diff --git a/zipline/assets/assets.py 
b/zipline/assets/assets.py index 179a2fd9..2ffb49b2 100644 --- a/zipline/assets/assets.py +++ b/zipline/assets/assets.py @@ -43,6 +43,10 @@ from zipline.errors import ( FutureContractsNotFound, MapAssetIdentifierIndexError, MultipleSymbolsFound, + MultipleValuesFoundForField, + MultipleValuesFoundForSid, + NoValueForSid, + ValueNotFoundForField, SidsNotFound, SymbolNotFound, ) @@ -90,7 +94,67 @@ _asset_timestamp_fields = frozenset({ 'auto_close_date', }) -SymbolOwnership = namedtuple('SymbolOwnership', 'start end sid symbol') +OwnershipPeriod = namedtuple('OwnershipPeriod', 'start end sid value') + + +def merge_ownership_periods(mappings): + """ + Given a dict of mappings where the values are lists of + OwnershipPeriod objects, returns a dict with the same structure with + new OwnershipPeriod objects adjusted so that the periods have no + gaps. + + Orders the periods chronologically, and pushes forward the end date + of each period to match the start date of the following period. The + end date of the last period is pushed forward to the max Timestamp. + """ + return valmap( + lambda v: tuple( + OwnershipPeriod( + a.start, + b.start, + a.sid, + a.value, + ) for a, b in sliding_window( + 2, + concatv( + sorted(v), + # concat with a fake ownership object to make the last + # end date be max timestamp + [OwnershipPeriod( + pd.Timestamp.max.tz_localize('utc'), + None, + None, + None, + )], + ), + ) + ), + mappings, + ) + + +def build_ownership_map(table, key_from_row, value_from_row): + """ + Builds a dict mapping keys to tuples of OwnershipPeriods, from a db table. 
+ """ + rows = sa.select(table.c).execute().fetchall() + + mappings = {} + for row in rows: + mappings.setdefault( + key_from_row(row), + [], + ).append( + OwnershipPeriod( + pd.Timestamp(row.start_date, unit='ns', tz='utc'), + pd.Timestamp(row.end_date, unit='ns', tz='utc'), + row.sid, + value_from_row(row), + ), + ) + + return merge_ownership_periods(mappings) @curry @@ -259,46 +323,12 @@ class AssetFinder(object): @lazyval def symbol_ownership_map(self): - rows = sa.select(self.equity_symbol_mappings.c).execute().fetchall() - - mappings = {} - for row in rows: - mappings.setdefault( - (row.company_symbol, row.share_class_symbol), - [], - ).append( - SymbolOwnership( - pd.Timestamp(row.start_date, unit='ns', tz='utc'), - pd.Timestamp(row.end_date, unit='ns', tz='utc'), - row.sid, - row.symbol, - ), - ) - - return valmap( - lambda v: tuple( - SymbolOwnership( - a.start, - b.start, - a.sid, - a.symbol, - ) for a, b in sliding_window( - 2, - concatv( - sorted(v), - # concat with a fake ownership object to make the last - # end date be max timestamp - [SymbolOwnership( - pd.Timestamp.max.tz_localize('utc'), - None, - None, - None, - )], - ), - ) + return build_ownership_map( + table=self.equity_symbol_mappings, + key_from_row=( + lambda row: (row.company_symbol, row.share_class_symbol) ), - mappings, - factory=lambda: mappings, + value_from_row=lambda row: row.symbol, ) @lazyval @@ -313,6 +343,22 @@ class AssetFinder(object): fuzzy_owners.sort() return fuzzy_mappings + @lazyval + def equity_supplementary_map(self): + return build_ownership_map( + table=self.equity_supplementary_mappings, + key_from_row=lambda row: (row.field, row.value), + value_from_row=lambda row: row.value, + ) + + @lazyval + def equity_supplementary_map_by_sid(self): + return build_ownership_map( + table=self.equity_supplementary_mappings, + key_from_row=lambda row: (row.field, row.sid), + value_from_row=lambda row: row.value, + ) + def lookup_asset_types(self, sids): """ Retrieve asset types for 
a list of sids. @@ -809,6 +855,99 @@ class AssetFinder(object): raise SymbolNotFound(symbol=symbol) return self.retrieve_asset(data['sid']) + def lookup_by_supplementary_field(self, field_name, value, as_of_date): + try: + owners = self.equity_supplementary_map[ + field_name, + value, + ] + assert owners, 'empty owners list for %r' % ((field_name, value),) + except KeyError: + # no equity has ever held this value + raise ValueNotFoundForField(field=field_name, value=value) + + if not as_of_date: + if len(owners) > 1: + # more than one equity has held this value, this is ambiguous + # without the date + raise MultipleValuesFoundForField( + field=field_name, + value=value, + options=set(map( + compose(self.retrieve_asset, attrgetter('sid')), + owners, + )), + ) + # exactly one equity has ever held this value, we may resolve + # without the date + return self.retrieve_asset(owners[0].sid) + + for start, end, sid, _ in owners: + if start <= as_of_date < end: + # find the equity that owned it on the given asof date + return self.retrieve_asset(sid) + + # no equity held the value on the given asof date + raise ValueNotFoundForField(field=field_name, value=value) + + def get_supplementary_field( + self, + sid, + field_name, + as_of_date, + ): + """Get the value of a supplementary field for an asset. + + Parameters + ---------- + sid : int + The sid of the asset to query. + field_name : str + Name of the supplementary field. + as_of_date : pd.Timestamp, None + The last known value on this date is returned. If None, a + value is returned only if we've only ever had one value for + this sid. If None and we've had multiple values, + MultipleValuesFoundForSid is raised. + + Raises + ------ + NoValueForSid + If we have no values for this asset, or no value was known + on this as_of_date. + MultipleValuesFoundForSid + If we have had multiple values for this asset over time, and + None was passed for as_of_date. 
+ """ + try: + periods = self.equity_supplementary_map_by_sid[ + field_name, + sid, + ] + assert periods, 'empty periods list for %r' % ((field_name, sid),) + except KeyError: + raise NoValueForSid(field=field_name, sid=sid) + + if not as_of_date: + if len(periods) > 1: + # This equity has held more than one value, this is ambiguous + # without the date + raise MultipleValuesFoundForSid( + field=field_name, + sid=sid, + options={p.value for p in periods}, + ) + # this equity has only ever held this value, we may resolve + # without the date + return periods[0].value + + for start, end, _, value in periods: + if start <= as_of_date < end: + return value + + # Could not find a value for this sid on the as_of_date. + raise NoValueForSid(field=field_name, sid=sid) + @weak_lru_cache(100) def _get_future_sids_for_root_symbol(self, root_symbol, as_of_date_ns): fc_cols = self.futures_contracts.c diff --git a/zipline/errors.py b/zipline/errors.py index 3944ce3e..656f554e 100644 --- a/zipline/errors.py +++ b/zipline/errors.py @@ -304,6 +304,55 @@ Root symbol '{root_symbol}' was not found. """.strip() +class ValueNotFoundForField(ZiplineError): + """ + Raised when a lookup_by_supplementary_field() call contains a + value that does not exist for the specified mapping type. + """ + msg = """ +Value '{value}' was not found for field '{field}'. +""".strip() + + +class MultipleValuesFoundForField(ZiplineError): + """ + Raised when a lookup_by_supplementary_field() call contains a + value that changed over time for the specified field and is + thus not resolvable without additional information provided via + as_of_date. + """ + msg = """ +Multiple occurrences of the value '{value}' found for field '{field}'. +Use the 'as_of_date' argument to specify when the lookup should be valid. 
+ +Possible options: {options} + """.strip() + + +class NoValueForSid(ZiplineError): + """ + Raised when a get_supplementary_field() call contains a sid that + does not have a value for the specified mapping type. + """ + msg = """ +No '{field}' value found for sid '{sid}'. +""".strip() + + +class MultipleValuesFoundForSid(ZiplineError): + """ + Raised when a get_supplementary_field() call contains a sid whose value + changed over time for the specified field and is thus not resolvable + without additional information provided via as_of_date. + """ + msg = """ +Multiple '{field}' values found for sid '{sid}'. Use the 'as_of_date' argument +to specify when the lookup should be valid. + +Possible options: {options} +""".strip() + + +class SidsNotFound(ZiplineError): """ Raised when a retrieve_asset() or retrieve_all() call contains a