ENH: Adds support for supplementary asset mappings (#1612)

* ENH: Adds support for supplementary asset mappings

- Adds a supplementary_mappings table to the assets.db, to hold point-
  in-time mappings of sids to arbitrary categories of values, e.g.
  alternative identifiers. This bumps ASSET_DB_VERSION.

- Adds supplementary_map and supplementary_map_by_sid to AssetFinder,
  caches of the underlying table that are fully populated on first
  access, which map the supplementary values to sids and vice versa,
  respectively.

- Adds lookup_by_supplementary_mapping method, which fronts
  supplementary_map to query for the asset last known to have held a
  value at a given dt.

- Add get_supplementary_field method, which fronts
  supplementary_map_by_sid to query for the last known value held by an
  asset at a given dt.
This commit is contained in:
Andrew Daniels
2016-12-16 15:20:53 -05:00
committed by GitHub
parent de7c32b22c
commit b501ecc736
7 changed files with 532 additions and 46 deletions
Binary file not shown.
+215
View File
@@ -62,10 +62,14 @@ from zipline.errors import (
EquitiesNotFound,
FutureContractsNotFound,
MultipleSymbolsFound,
MultipleValuesFoundForField,
MultipleValuesFoundForSid,
NoValueForSid,
AssetDBVersionError,
SidsNotFound,
SymbolNotFound,
AssetDBImpossibleDowngrade,
ValueNotFoundForField,
)
from zipline.testing import (
all_subindices,
@@ -966,6 +970,217 @@ class AssetFinderTestCase(WithTradingCalendars, ZiplineTestCase):
))
self.assertEqual({0, 1, 2}, set(self.asset_finder.sids))
def test_lookup_by_supplementary_field(self):
equities = pd.DataFrame.from_records(
[
{
'sid': 0,
'symbol': 'A',
'start_date': pd.Timestamp('2013-1-1', tz='UTC'),
'end_date': pd.Timestamp('2014-1-1', tz='UTC'),
'exchange': 'TEST',
},
{
'sid': 1,
'symbol': 'B',
'start_date': pd.Timestamp('2013-1-1', tz='UTC'),
'end_date': pd.Timestamp('2014-1-1', tz='UTC'),
'exchange': 'TEST',
},
{
'sid': 2,
'symbol': 'C',
'start_date': pd.Timestamp('2013-7-1', tz='UTC'),
'end_date': pd.Timestamp('2014-1-1', tz='UTC'),
'exchange': 'TEST',
},
]
)
equity_supplementary_mappings = pd.DataFrame.from_records(
[
{
'sid': 0,
'field': 'ALT_ID',
'value': '100000000',
'start_date': pd.Timestamp('2013-1-1', tz='UTC'),
'end_date': pd.Timestamp('2013-6-28', tz='UTC'),
},
{
'sid': 1,
'field': 'ALT_ID',
'value': '100000001',
'start_date': pd.Timestamp('2013-1-1', tz='UTC'),
'end_date': pd.Timestamp('2014-1-1', tz='UTC'),
},
{
'sid': 0,
'field': 'ALT_ID',
'value': '100000002',
'start_date': pd.Timestamp('2013-7-1', tz='UTC'),
'end_date': pd.Timestamp('2014-1-1', tz='UTC'),
},
{
'sid': 2,
'field': 'ALT_ID',
'value': '100000000',
'start_date': pd.Timestamp('2013-7-1', tz='UTC'),
'end_date': pd.Timestamp('2014-1-1', tz='UTC'),
},
]
)
self.write_assets(
equities=equities,
equity_supplementary_mappings=equity_supplementary_mappings,
)
af = self.asset_finder
# Before sid 0 has changed ALT_ID.
dt = pd.Timestamp('2013-6-28', tz='UTC')
asset_0 = af.lookup_by_supplementary_field('ALT_ID', '100000000', dt)
self.assertEqual(asset_0.sid, 0)
asset_1 = af.lookup_by_supplementary_field('ALT_ID', '100000001', dt)
self.assertEqual(asset_1.sid, 1)
# We don't know about this ALT_ID yet.
with self.assertRaisesRegexp(
ValueNotFoundForField,
"Value '{}' was not found for field '{}'.".format(
'100000002',
'ALT_ID',
)
):
af.lookup_by_supplementary_field('ALT_ID', '100000002', dt)
# After all assets have ended.
dt = pd.Timestamp('2014-01-02', tz='UTC')
asset_2 = af.lookup_by_supplementary_field('ALT_ID', '100000000', dt)
self.assertEqual(asset_2.sid, 2)
asset_1 = af.lookup_by_supplementary_field('ALT_ID', '100000001', dt)
self.assertEqual(asset_1.sid, 1)
asset_0 = af.lookup_by_supplementary_field('ALT_ID', '100000002', dt)
self.assertEqual(asset_0.sid, 0)
# At this point both sids 0 and 2 have held this value, so an
# as_of_date is required.
expected_in_repr = (
"Multiple occurrences of the value '{}' found for field '{}'."
).format('100000000', 'ALT_ID')
with self.assertRaisesRegexp(
MultipleValuesFoundForField,
expected_in_repr,
):
af.lookup_by_supplementary_field('ALT_ID', '100000000', None)
def test_get_supplementary_field(self):
equities = pd.DataFrame.from_records(
[
{
'sid': 0,
'symbol': 'A',
'start_date': pd.Timestamp('2013-1-1', tz='UTC'),
'end_date': pd.Timestamp('2014-1-1', tz='UTC'),
'exchange': 'TEST',
},
{
'sid': 1,
'symbol': 'B',
'start_date': pd.Timestamp('2013-1-1', tz='UTC'),
'end_date': pd.Timestamp('2014-1-1', tz='UTC'),
'exchange': 'TEST',
},
{
'sid': 2,
'symbol': 'C',
'start_date': pd.Timestamp('2013-7-1', tz='UTC'),
'end_date': pd.Timestamp('2014-1-1', tz='UTC'),
'exchange': 'TEST',
},
]
)
equity_supplementary_mappings = pd.DataFrame.from_records(
[
{
'sid': 0,
'field': 'ALT_ID',
'value': '100000000',
'start_date': pd.Timestamp('2013-1-1', tz='UTC'),
'end_date': pd.Timestamp('2013-6-28', tz='UTC'),
},
{
'sid': 1,
'field': 'ALT_ID',
'value': '100000001',
'start_date': pd.Timestamp('2013-1-1', tz='UTC'),
'end_date': pd.Timestamp('2014-1-1', tz='UTC'),
},
{
'sid': 0,
'field': 'ALT_ID',
'value': '100000002',
'start_date': pd.Timestamp('2013-7-1', tz='UTC'),
'end_date': pd.Timestamp('2014-1-1', tz='UTC'),
},
{
'sid': 2,
'field': 'ALT_ID',
'value': '100000000',
'start_date': pd.Timestamp('2013-7-1', tz='UTC'),
'end_date': pd.Timestamp('2014-1-1', tz='UTC'),
},
]
)
self.write_assets(
equities=equities,
equity_supplementary_mappings=equity_supplementary_mappings,
)
finder = self.asset_finder
# Before sid 0 has changed ALT_ID and sid 2 has started.
dt = pd.Timestamp('2013-6-28', tz='UTC')
for sid, expected in [(0, '100000000'), (1, '100000001')]:
self.assertEqual(
finder.get_supplementary_field(sid, 'ALT_ID', dt),
expected,
)
# Since sid 2 has not yet started, we don't know about its
# ALT_ID.
with self.assertRaisesRegexp(
NoValueForSid,
"No '{}' value found for sid '{}'.".format('ALT_ID', 2),
):
finder.get_supplementary_field(2, 'ALT_ID', dt),
# After all assets have ended.
dt = pd.Timestamp('2014-01-02', tz='UTC')
for sid, expected in [
(0, '100000002'), (1, '100000001'), (2, '100000000'),
]:
self.assertEqual(
finder.get_supplementary_field(sid, 'ALT_ID', dt),
expected,
)
# Sid 0 has historically held two values for ALT_ID by this dt.
with self.assertRaisesRegexp(
MultipleValuesFoundForSid,
"Multiple '{}' values found for sid '{}'.".format('ALT_ID', 0),
):
finder.get_supplementary_field(0, 'ALT_ID', None),
def test_group_by_type(self):
equities = make_simple_equity_info(
range(5),
+5
View File
@@ -309,3 +309,8 @@ def _downgrade_v5(op):
'equities',
['fuzzy_symbol'],
)
@downgrades(6)
def _downgrade_v6(op):
op.drop_table('equity_supplementary_mappings')
+18 -1
View File
@@ -6,7 +6,7 @@ import sqlalchemy as sa
# assets database
# NOTE: When upgrading this remember to add a downgrade in:
# .asset_db_migrations
ASSET_DB_VERSION = 5
ASSET_DB_VERSION = 6
# A frozenset of the names of all tables in the assets db
# NOTE: When modifying this schema, update the ASSET_DB_VERSION value
@@ -14,6 +14,7 @@ asset_db_table_names = frozenset({
'asset_router',
'equities',
'equity_symbol_mappings',
'equity_supplementary_mappings',
'futures_contracts',
'futures_exchanges',
'futures_root_symbols',
@@ -84,6 +85,22 @@ equity_symbol_mappings = sa.Table(
),
)
equity_supplementary_mappings = sa.Table(
'equity_supplementary_mappings',
metadata,
sa.Column(
'sid',
sa.Integer,
sa.ForeignKey(equities.c.sid),
nullable=False,
primary_key=True
),
sa.Column('field', sa.Text, nullable=False, primary_key=True),
sa.Column('start_date', sa.Integer, nullable=False, primary_key=True),
sa.Column('end_date', sa.Integer, nullable=False),
sa.Column('value', sa.Text, nullable=False),
)
futures_exchanges = sa.Table(
'futures_exchanges',
metadata,
+66 -5
View File
@@ -28,6 +28,7 @@ from zipline.assets.asset_db_schema import (
asset_router,
equities as equities_table,
equity_symbol_mappings,
equity_supplementary_mappings as equity_supplementary_mappings_table,
futures_contracts as futures_contracts_table,
futures_exchanges,
futures_root_symbols,
@@ -47,6 +48,7 @@ AssetData = namedtuple(
'futures',
'exchanges',
'root_symbols',
'equity_supplementary_mappings',
),
)
@@ -102,6 +104,16 @@ _root_symbols_defaults = {
'exchange': None,
}
# Default values for the equity_supplementary_mappings DataFrame
_equity_supplementary_mappings_defaults = {
'sid': None,
'value': None,
'field': None,
'start_date': 0,
'end_date': 2 ** 62 - 1,
}
# Fuzzy symbol delimiters that may break up a company symbol and share class
_delimited_symbol_delimiters_regex = re.compile(r'[./\-_]')
_delimited_symbol_default_triggers = frozenset({np.nan, None, ''})
@@ -355,6 +367,7 @@ class AssetDBWriter(object):
futures=None,
exchanges=None,
root_symbols=None,
equity_supplementary_mappings=None,
chunk_size=DEFAULT_CHUNK_SIZE):
"""Write asset metadata to a sqlite database.
@@ -379,7 +392,7 @@ class AssetDBWriter(object):
The exchange where this asset is traded.
The index of this dataframe should contain the sids.
futures : pd.Dataframe, optional
futures : pd.DataFrame, optional
The future contract metadata. The columns for this dataframe are:
symbol : str
@@ -410,7 +423,7 @@ class AssetDBWriter(object):
multiplier: float
The amount of the underlying asset represented by this
contract.
exchanges : pd.Dataframe, optional
exchanges : pd.DataFrame, optional
The exchanges where assets can be traded. The columns of this
dataframe are:
@@ -418,7 +431,7 @@ class AssetDBWriter(object):
The name of the exchange.
timezone : str
The timezone of the exchange.
root_symbols : pd.Dataframe, optional
root_symbols : pd.DataFrame, optional
The root symbols for the futures contracts. The columns for this
dataframe are:
@@ -432,6 +445,8 @@ class AssetDBWriter(object):
A short description of this root symbol.
exchange : str
The exchange where this root symbol is traded.
equity_supplementary_mappings : pd.DataFrame, optional
Additional mappings from values of abitrary type to assets.
chunk_size : int, optional
The amount of rows to write to the SQLite table at once.
This defaults to the default number of bind params in sqlite.
@@ -452,6 +467,11 @@ class AssetDBWriter(object):
futures if futures is not None else pd.DataFrame(),
exchanges if exchanges is not None else pd.DataFrame(),
root_symbols if root_symbols is not None else pd.DataFrame(),
(
equity_supplementary_mappings
if equity_supplementary_mappings is not None
else pd.DataFrame()
),
)
# Write the data to SQL.
self._write_df_to_table(
@@ -466,6 +486,13 @@ class AssetDBWriter(object):
conn,
chunk_size,
)
self._write_df_to_table(
equity_supplementary_mappings_table,
data.equity_supplementary_mappings,
conn,
chunk_size,
idx=False,
)
self._write_assets(
'future',
data.futures,
@@ -480,10 +507,19 @@ class AssetDBWriter(object):
mapping_data=data.equities_mappings,
)
def _write_df_to_table(self, tbl, df, txn, chunk_size, idx_label=None):
def _write_df_to_table(
self,
tbl,
df,
txn,
chunk_size,
idx=True,
idx_label=None,
):
df.to_sql(
tbl.name,
txn.connection,
index=idx,
index_label=(
idx_label
if idx_label is not None else
@@ -639,7 +675,25 @@ class AssetDBWriter(object):
return futures_output
def _load_data(self, equities, futures, exchanges, root_symbols):
def _normalize_equity_supplementary_mappings(self, mappings):
mappings_output = _generate_output_dataframe(
data_subset=mappings,
defaults=_equity_supplementary_mappings_defaults,
)
for col in ('start_date', 'end_date'):
mappings_output[col] = _dt_to_epoch_ns(mappings_output[col])
return mappings_output
def _load_data(
self,
equities,
futures,
exchanges,
root_symbols,
equity_supplementary_mappings,
):
"""
Returns a standard set of pandas.DataFrames:
equities, futures, exchanges, root_symbols
@@ -657,6 +711,12 @@ class AssetDBWriter(object):
equities_output, equities_mappings = self._normalize_equities(equities)
futures_output = self._normalize_futures(futures)
equity_supplementary_mappings_output = (
self._normalize_equity_supplementary_mappings(
equity_supplementary_mappings,
)
)
exchanges_output = _generate_output_dataframe(
data_subset=exchanges,
defaults=_exchanges_defaults,
@@ -673,4 +733,5 @@ class AssetDBWriter(object):
futures=futures_output,
exchanges=exchanges_output,
root_symbols=root_symbols_output,
equity_supplementary_mappings=equity_supplementary_mappings_output,
)
+179 -40
View File
@@ -43,6 +43,10 @@ from zipline.errors import (
FutureContractsNotFound,
MapAssetIdentifierIndexError,
MultipleSymbolsFound,
MultipleValuesFoundForField,
MultipleValuesFoundForSid,
NoValueForSid,
ValueNotFoundForField,
SidsNotFound,
SymbolNotFound,
)
@@ -90,7 +94,67 @@ _asset_timestamp_fields = frozenset({
'auto_close_date',
})
SymbolOwnership = namedtuple('SymbolOwnership', 'start end sid symbol')
OwnershipPeriod = namedtuple('OwnershipPeriod', 'start end sid value')
def merge_ownership_periods(mappings):
"""
Given a dict of mappings where the values are lists of
OwnershipPeriod objects, returns a dict with the same structure with
new OwnershipPeriod objects adjusted so that the periods have no
gaps.
Orders the periods chronologically, and pushes forward the end date
of each period to match the start date of the following period. The
end date of the last period pushed forward to the max Timestamp.
"""
return valmap(
lambda v: tuple(
OwnershipPeriod(
a.start,
b.start,
a.sid,
a.value,
) for a, b in sliding_window(
2,
concatv(
sorted(v),
# concat with a fake ownership object to make the last
# end date be max timestamp
[OwnershipPeriod(
pd.Timestamp.max.tz_localize('utc'),
None,
None,
None,
)],
),
)
),
mappings,
)
def build_ownership_map(table, key_from_row, value_from_row):
"""
Builds a dict mapping to lists of OwnershipPeriods, from a db table.
"""
rows = sa.select(table.c).execute().fetchall()
mappings = {}
for row in rows:
mappings.setdefault(
key_from_row(row),
[],
).append(
OwnershipPeriod(
pd.Timestamp(row.start_date, unit='ns', tz='utc'),
pd.Timestamp(row.end_date, unit='ns', tz='utc'),
row.sid,
value_from_row(row),
),
)
return merge_ownership_periods(mappings)
@curry
@@ -259,46 +323,12 @@ class AssetFinder(object):
@lazyval
def symbol_ownership_map(self):
rows = sa.select(self.equity_symbol_mappings.c).execute().fetchall()
mappings = {}
for row in rows:
mappings.setdefault(
(row.company_symbol, row.share_class_symbol),
[],
).append(
SymbolOwnership(
pd.Timestamp(row.start_date, unit='ns', tz='utc'),
pd.Timestamp(row.end_date, unit='ns', tz='utc'),
row.sid,
row.symbol,
),
)
return valmap(
lambda v: tuple(
SymbolOwnership(
a.start,
b.start,
a.sid,
a.symbol,
) for a, b in sliding_window(
2,
concatv(
sorted(v),
# concat with a fake ownership object to make the last
# end date be max timestamp
[SymbolOwnership(
pd.Timestamp.max.tz_localize('utc'),
None,
None,
None,
)],
),
)
return build_ownership_map(
table=self.equity_symbol_mappings,
key_from_row=(
lambda row: (row.company_symbol, row.share_class_symbol)
),
mappings,
factory=lambda: mappings,
value_from_row=lambda row: row.symbol,
)
@lazyval
@@ -313,6 +343,22 @@ class AssetFinder(object):
fuzzy_owners.sort()
return fuzzy_mappings
@lazyval
def equity_supplementary_map(self):
return build_ownership_map(
table=self.equity_supplementary_mappings,
key_from_row=lambda row: (row.field, row.value),
value_from_row=lambda row: row.value,
)
@lazyval
def equity_supplementary_map_by_sid(self):
return build_ownership_map(
table=self.equity_supplementary_mappings,
key_from_row=lambda row: (row.field, row.sid),
value_from_row=lambda row: row.value,
)
def lookup_asset_types(self, sids):
"""
Retrieve asset types for a list of sids.
@@ -809,6 +855,99 @@ class AssetFinder(object):
raise SymbolNotFound(symbol=symbol)
return self.retrieve_asset(data['sid'])
def lookup_by_supplementary_field(self, field_name, value, as_of_date):
try:
owners = self.equity_supplementary_map[
field_name,
value,
]
assert owners, 'empty owners list for %r' % (field_name, value)
except KeyError:
# no equity has ever held this value
raise ValueNotFoundForField(field=field_name, value=value)
if not as_of_date:
if len(owners) > 1:
# more than one equity has held this value, this is ambigious
# without the date
raise MultipleValuesFoundForField(
field=field_name,
value=value,
options=set(map(
compose(self.retrieve_asset, attrgetter('sid')),
owners,
)),
)
# exactly one equity has ever held this value, we may resolve
# without the date
return self.retrieve_asset(owners[0].sid)
for start, end, sid, _ in owners:
if start <= as_of_date < end:
# find the equity that owned it on the given asof date
return self.retrieve_asset(sid)
# no equity held the value on the given asof date
raise ValueNotFoundForField(field=field_name, value=value)
def get_supplementary_field(
self,
sid,
field_name,
as_of_date,
):
"""Get the value of a supplementary field for an asset.
Parameters
----------
sid : int
The sid of the asset to query.
field_name : str
Name of the supplementary field.
as_of_date : pd.Timestamp, None
The last known value on this date is returned. If None, a
value is returned only if we've only ever had one value for
this sid. If None and we've had multiple values,
MultipleValuesFoundForSid is raised.
Raises
------
NoValueForSid
If we have no values for this asset, or no values was known
on this as_of_date.
MultipleValuesFoundForSid
If we have had multiple values for this asset over time, and
None was passed for as_of_date.
"""
try:
periods = self.equity_supplementary_map_by_sid[
field_name,
sid,
]
assert periods, 'empty periods list for %r' % (field_name, sid)
except KeyError:
raise NoValueForSid(field=field_name, sid=sid)
if not as_of_date:
if len(periods) > 1:
# This equity has held more than one value, this is ambigious
# without the date
raise MultipleValuesFoundForSid(
field=field_name,
sid=sid,
options={p.value for p in periods},
)
# this equity has only ever held this value, we may resolve
# without the date
return periods[0].value
for start, end, _, value in periods:
if start <= as_of_date < end:
return value
# Could not find a value for this sid on the as_of_date.
raise NoValueForSid(field=field_name, sid=sid)
@weak_lru_cache(100)
def _get_future_sids_for_root_symbol(self, root_symbol, as_of_date_ns):
fc_cols = self.futures_contracts.c
+49
View File
@@ -304,6 +304,55 @@ Root symbol '{root_symbol}' was not found.
""".strip()
class ValueNotFoundForField(ZiplineError):
"""
Raised when a lookup_by_supplementary_mapping() call contains a
value does not exist for the specified mapping type.
"""
msg = """
Value '{value}' was not found for field '{field}'.
""".strip()
class MultipleValuesFoundForField(ZiplineError):
"""
Raised when a lookup_by_supplementary_mapping() call contains a
value that changed over time for the specified field and is
thus not resolvable without additional information provided via
as_of_date.
"""
msg = """
Multiple occurrences of the value '{value}' found for field '{field}'.
Use the as_of_date' argument to specify when the lookup should be valid.
Possible options: {options}
""".strip()
class NoValueForSid(ZiplineError):
"""
Raised when a get_supplementary_field() call contains a sid that
does not have a value for the specified mapping type.
"""
msg = """
No '{field}' value found for sid '{sid}'.
""".strip()
class MultipleValuesFoundForSid(ZiplineError):
"""
Raised when a get_supplementary_field() call contains a value that
changed over time for the specified field and is thus not resolvable
without additional information provided via as_of_date.
"""
msg = """
Multiple '{field}' values found for sid '{sid}'. Use the as_of_date' argument
to specify when the lookup should be valid.
Possible options: {options}
""".strip()
class SidsNotFound(ZiplineError):
"""
Raised when a retrieve_asset() or retrieve_all() call contains a