mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-30 14:40:25 +08:00
ENH: Update AssetDBWriterFromList to consume identifiers
Allow consumption of a list of identifiers, including assigning sids if necessary.
This commit is contained in:
+161
-34
@@ -98,6 +98,7 @@ class AssetDBWriter(with_metaclass(ABCMeta)):
|
||||
def write_all(self,
|
||||
engine,
|
||||
fuzzy_char=None,
|
||||
allow_sid_assignment=True,
|
||||
constraints=True):
|
||||
""" Write pre-supplied data to SQLite.
|
||||
|
||||
@@ -113,6 +114,7 @@ class AssetDBWriter(with_metaclass(ABCMeta)):
|
||||
If True, create SQL ForeignKey and Index constraints.
|
||||
|
||||
"""
|
||||
self.allow_sid_assignment = allow_sid_assignment
|
||||
# Create SQL tables
|
||||
self.init_db(engine, constraints)
|
||||
# Get the data to add to SQL
|
||||
@@ -144,10 +146,10 @@ class AssetDBWriter(with_metaclass(ABCMeta)):
|
||||
{'index': 'sid'},
|
||||
1,
|
||||
).to_dict('records')
|
||||
self.futures_contracts.insert().values(recs).execute(bind=bind)
|
||||
self.asset_router.insert().values([
|
||||
(rec['index'], 'future') for rec in recs
|
||||
]).execute(bind=bind)
|
||||
if recs:
|
||||
self.futures_contracts.insert().values(recs).execute(bind=bind)
|
||||
ar_recs = [(rec['sid'], 'future') for rec in recs]
|
||||
self.asset_router.insert().values(ar_recs).execute(bind=bind)
|
||||
|
||||
def _write_equities(self, equities, fuzzy_char, bind=None):
|
||||
# Apply fuzzy matching.
|
||||
@@ -158,10 +160,10 @@ class AssetDBWriter(with_metaclass(ABCMeta)):
|
||||
{'index': 'sid'},
|
||||
1,
|
||||
).to_dict('records')
|
||||
self.equities.insert().values(recs).execute(bind=bind)
|
||||
self.asset_router.insert().values([
|
||||
(rec['sid'], 'equity') for rec in recs
|
||||
]).execute(bind=bind)
|
||||
if recs:
|
||||
self.equities.insert().values(recs).execute(bind=bind)
|
||||
ar_recs = [(rec['sid'], 'equity') for rec in recs]
|
||||
self.asset_router.insert().values(ar_recs).execute(bind=bind)
|
||||
|
||||
def init_db(self, engine, constraints=True):
|
||||
"""Connect to database and create tables.
|
||||
@@ -299,43 +301,168 @@ class AssetDBWriterFromList(AssetDBWriter):
|
||||
|
||||
def load_data(self):
|
||||
|
||||
equities_data = pd.DataFrame(index=self._equities)
|
||||
# 0) Instantiate empty dictionaries
|
||||
_equities, _futures, _exchanges, _root_symbols = {}, {}, {}, {}
|
||||
|
||||
futures_data = pd.DataFrame(index=self._futures)
|
||||
# 1) Populate dictionaries
|
||||
id_counter = 0
|
||||
for output, data in [(_equities, self._equities),
|
||||
(_futures, self._futures), ]:
|
||||
for identifier in data:
|
||||
if hasattr(identifier, '__int__'):
|
||||
output[identifier.__int__()] = {}
|
||||
else:
|
||||
if self.allow_sid_assignment:
|
||||
output[id_counter] = {'symbol': identifier}
|
||||
id_counter += 1
|
||||
else:
|
||||
# A non-integer identifier cannot be used directly as a sid, and sid
# assignment is disabled here, so fail loudly instead of silently
# dropping the identifier.
raise SidAssignmentError(identifier=identifier)
|
||||
|
||||
exchange_data = pd.DataFrame(index=self._exchanges)
|
||||
exchange_counter = 0
|
||||
for identifier in self._exchanges:
|
||||
if hasattr(identifier, '__int__'):
|
||||
_exchanges[identifier.__int__()] = {}
|
||||
else:
|
||||
_exchanges[exchange_counter] = {'exchange': identifier}
|
||||
exchange_counter += 1
|
||||
|
||||
root_symbol_data = pd.DataFrame(index=self._root_symbols)
|
||||
root_symbol_counter = 0
|
||||
for identifier in self._root_symbols:
|
||||
if hasattr(identifier, '__int__'):
|
||||
_root_symbols[identifier.__int__()] = {}
|
||||
else:
|
||||
_root_symbols[root_symbol_counter] = \
|
||||
{'root_symbol': identifier}
|
||||
root_symbol_counter += 1
|
||||
|
||||
# Assume the keys are the exchange_ids
|
||||
exchange_cols = ['exchange', 'timezone']
|
||||
exchanges = pd.DataFrame(columns=exchange_cols)
|
||||
# ******** Generate equities data ********
|
||||
equities_defaults = {
|
||||
'symbol': None,
|
||||
'asset_name': None,
|
||||
'start_date': 0,
|
||||
'end_date': None,
|
||||
'first_traded': None,
|
||||
'exchange': None,
|
||||
'fuzzy': None,
|
||||
}
|
||||
equities_cols = {'symbol', 'asset_name', 'start_date',
|
||||
'end_date', 'first_traded', 'exchange', 'fuzzy'}
|
||||
equities_data = pd.DataFrame.from_dict(_equities, orient='index')
|
||||
cols = set(equities_data.columns)
|
||||
|
||||
# Assume the keys are the root_symbol_ids
|
||||
root_symbols_cols = ['root_symbol', 'sector',
|
||||
'description', 'exchange_id']
|
||||
root_symbols = pd.DataFrame(columns=root_symbols_cols)
|
||||
# Drop columns with unrecognised headers.
|
||||
equities_data.drop(cols - (cols & equities_cols), axis=1, inplace=True)
|
||||
|
||||
# Assume the keys are the sids
|
||||
futures_cols = ['symbol', 'root_symbol', 'asset_name',
|
||||
# Get those columns which we need but
|
||||
# for which no data has been supplied.
|
||||
need = equities_cols - set(equities_data.columns)
|
||||
|
||||
# Combine the user-supplied data with our required columns.
|
||||
equities_data = pd.concat(
|
||||
(equities_data, pd.DataFrame(
|
||||
self.dict_subset(equities_defaults, need),
|
||||
equities_data.index,
|
||||
)),
|
||||
axis=1,
|
||||
copy=False
|
||||
)
|
||||
|
||||
# ******** Generate futures data ********
|
||||
futures_defaults = {
|
||||
'symbol': None,
|
||||
'root_symbol': None,
|
||||
'asset_name': None,
|
||||
'start_date': 0,
|
||||
'end_date': None,
|
||||
'first_traded': None,
|
||||
'exchange': None,
|
||||
'notice_date': None,
|
||||
'expiration_date': None,
|
||||
'contract_multiplier': 1,
|
||||
}
|
||||
futures_cols = {'symbol', 'root_symbol', 'asset_name',
|
||||
'start_date', 'end_date', 'first_traded', 'exchange',
|
||||
'notice_date', 'expiration_date',
|
||||
'contract_multiplier']
|
||||
futures = pd.DataFrame(columns=futures_cols)
|
||||
'contract_multiplier'}
|
||||
futures_data = pd.DataFrame.from_dict(_futures, orient='index')
|
||||
cols = set(futures_data.columns)
|
||||
|
||||
# Assume the keys are the sids
|
||||
equities_cols = ['symbol', 'asset_name', 'start_date',
|
||||
'end_date', 'first_traded', 'exchange', 'fuzzy']
|
||||
equities = pd.DataFrame(columns=equities_cols)
|
||||
# Drop columns with unrecognised headers.
|
||||
futures_data.drop(cols - (cols & futures_cols), axis=1, inplace=True)
|
||||
|
||||
# Append any data the user has provided.
|
||||
exchanges = exchanges.append(exchange_data, verify_integrity=True)
|
||||
root_symbols = root_symbols.append(root_symbol_data,
|
||||
verify_integrity=True)
|
||||
futures = futures.append(futures_data, verify_integrity=True)
|
||||
equities = equities.append(equities_data, verify_integrity=True)
|
||||
# Get those columns which we need but
|
||||
# for which no data has been supplied.
|
||||
need = futures_cols - set(futures_data.columns)
|
||||
|
||||
return equities, futures, exchanges, root_symbols
|
||||
# Combine the user-supplied data with our required columns.
|
||||
futures_data = pd.concat(
|
||||
(futures_data, pd.DataFrame(
|
||||
self.dict_subset(futures_defaults, need),
|
||||
futures_data.index,
|
||||
)),
|
||||
axis=1,
|
||||
copy=False
|
||||
)
|
||||
|
||||
# ******** Generate exchanges data ********
|
||||
exchanges_defaults = {
|
||||
'exchange': None,
|
||||
'timezone': None,
|
||||
}
|
||||
exchanges_cols = {'exchange', 'timezone', }
|
||||
exchanges_data = pd.DataFrame.from_dict(_exchanges, orient='index')
|
||||
cols = set(exchanges_data.columns)
|
||||
|
||||
# Drop columns with unrecognised headers.
|
||||
exchanges_data.drop(cols - (cols & exchanges_cols), axis=1,
|
||||
inplace=True)
|
||||
|
||||
# Get those columns which we need but
|
||||
# for which no data has been supplied.
|
||||
need = exchanges_cols - set(exchanges_data.columns)
|
||||
|
||||
# Combine the user-supplied data with our required columns.
|
||||
exchanges_data = pd.concat(
|
||||
(exchanges_data, pd.DataFrame(
|
||||
self.dict_subset(exchanges_defaults, need),
|
||||
exchanges_data.index,
|
||||
)),
|
||||
axis=1,
|
||||
copy=False
|
||||
)
|
||||
|
||||
# ******** Generate root symbols data ********
|
||||
root_symbols_defaults = {
|
||||
'root_symbol': None,
|
||||
'sector': None,
|
||||
'description': None,
|
||||
'exchange_id': None,
|
||||
}
|
||||
root_symbols_cols = {'root_symbol', 'sector',
|
||||
'description', 'exchange_id'}
|
||||
root_symbols_data = pd.DataFrame.from_dict(_root_symbols,
|
||||
orient='index')
|
||||
cols = set(root_symbols_data.columns)
|
||||
|
||||
# Drop columns with unrecognised headers.
|
||||
root_symbols_data.drop(cols - (cols & root_symbols_cols), axis=1,
|
||||
inplace=True)
|
||||
|
||||
# Get those columns which we need but
|
||||
# for which no data has been supplied.
|
||||
need = root_symbols_cols - set(root_symbols_data.columns)
|
||||
|
||||
# Combine the user-supplied data with our required columns.
|
||||
root_symbols_data = pd.concat(
|
||||
(root_symbols_data, pd.DataFrame(
|
||||
self.dict_subset(root_symbols_defaults, need),
|
||||
root_symbols_data.index,
|
||||
)),
|
||||
axis=1,
|
||||
copy=False
|
||||
)
|
||||
|
||||
return equities_data, futures_data, exchanges_data, root_symbols_data
|
||||
|
||||
|
||||
class AssetDBWriterFromDictionary(AssetDBWriter):
|
||||
|
||||
Reference in New Issue
Block a user