mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-30 14:40:25 +08:00
DEP: Remove legacy writer classes
This commit is contained in:
@@ -3,20 +3,11 @@ from abc import (
|
||||
abstractmethod,
|
||||
)
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from pandas.tseries.tools import normalize_date
|
||||
from six import with_metaclass, string_types
|
||||
from six import with_metaclass
|
||||
import sqlalchemy as sa
|
||||
|
||||
from zipline.errors import (
|
||||
ConsumeAssetMetaDataError,
|
||||
InvalidAssetType,
|
||||
SidAssignmentError,
|
||||
)
|
||||
from zipline.assets import (
|
||||
Asset, Equity, Future
|
||||
)
|
||||
from zipline.errors import SidAssignmentError
|
||||
|
||||
ASSET_FIELDS = frozenset({
|
||||
'sid',
|
||||
@@ -763,399 +754,3 @@ class AssetDBWriterFromDataFrame(AssetDBWriter):
|
||||
)
|
||||
|
||||
return equities_data, futures_data, exchanges_data, root_symbols_data
|
||||
|
||||
|
||||
class AssetDBWriterLegacy(AssetDBWriter):
|
||||
"""
|
||||
Overwrites some of the functionality of AssetDBWriter.
|
||||
Used for backward compatibility. Will be deprecated.
|
||||
|
||||
Methods
|
||||
-------
|
||||
write_all(db_conn, fuzzy_char=None, allow_sid_assignment=True,
|
||||
constraints=False)
|
||||
Write the data supplied at initialization to the database.
|
||||
write_block(identifier, **kwargs)
|
||||
Inserts the given metadata kwargs to the entry for the given
|
||||
sid. Matching fields in the existing entry will be overwritten.
|
||||
Will be deprecated in future versions of zipline.
|
||||
init_db(db_conn, constraints=False)
|
||||
Create the SQLite tables (called by write_all).
|
||||
load_data(equities, futures, exchanges, root_symbols)
|
||||
Returns data in standard format.
|
||||
consume_identifiers(self, db_conn, fuzzy_char=None,
|
||||
allow_sid_assignment=True,
|
||||
constraints=False)
|
||||
Consume the identifiers supplied at initialization.
|
||||
Will be deprecated in future versions of zipline.
|
||||
"""
|
||||
|
||||
def __init__(self, data):
|
||||
|
||||
self._data = data
|
||||
self.metadata_cache = {}
|
||||
|
||||
def write_all(self,
|
||||
db_conn,
|
||||
fuzzy_char=None,
|
||||
allow_sid_assignment=True,
|
||||
constraints=False):
|
||||
"""Top-level entry point for writing a new asset db.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
db_conn: sqlite3.Connection
|
||||
A connection to our SQLite database.
|
||||
fuzzy_char: string
|
||||
A string to be used in fuzzy matching.
|
||||
allow_sid_assignment: boolean
|
||||
If True, allow the writer to assign sids where necessary.
|
||||
constraints: boolean
|
||||
If True, add SQL constraints to tables.
|
||||
"""
|
||||
|
||||
self.conn = db_conn
|
||||
|
||||
self.fuzzy_char = fuzzy_char
|
||||
self.allow_sid_assignment = allow_sid_assignment
|
||||
|
||||
# This flag controls if the AssetDBWriter is allowed to generate its
|
||||
# own sids. If False, metadata that does not contain a sid will raise
|
||||
# an exception when building assets.
|
||||
if allow_sid_assignment:
|
||||
self.end_date_to_assign = normalize_date(
|
||||
pd.Timestamp('now', tz='UTC'))
|
||||
|
||||
# Create SQL tables.
|
||||
self.init_db(self.conn, constraints)
|
||||
|
||||
# Write to SQL tables.
|
||||
for sid, metadata in self.load_data(self._data):
|
||||
self.write_block(sid, **metadata)
|
||||
|
||||
def write_block(self, identifier, **kwargs):
|
||||
"""
|
||||
Inserts the given metadata kwargs to the entry for the given
|
||||
sid. Matching fields in the existing entry will be overwritten.
|
||||
Will be deprecated in future versions of zipline.
|
||||
"""
|
||||
|
||||
if identifier in self.metadata_cache:
|
||||
# Multiple pass insertion no longer supported.
|
||||
# This could and probably should raise an Exception, but is
|
||||
# currently just a short-circuit for compatibility with existing
|
||||
# testing structure in the test_algorithm module which creates
|
||||
# multiple sources which all insert redundant metadata.
|
||||
return
|
||||
|
||||
entry = {}
|
||||
|
||||
for key, value in kwargs.items():
|
||||
# Do not accept invalid fields
|
||||
if key not in ASSET_FIELDS:
|
||||
continue
|
||||
# Do not accept Nones
|
||||
if value is None:
|
||||
continue
|
||||
# Do not accept empty strings
|
||||
if value == '':
|
||||
continue
|
||||
# Do not accept NaNs from dataframes
|
||||
if isinstance(value, float) and np.isnan(value):
|
||||
continue
|
||||
entry[key] = value
|
||||
|
||||
# Check if the sid is declared
|
||||
try:
|
||||
entry['sid']
|
||||
except KeyError:
|
||||
# If the sid is not a sid, assign one
|
||||
if hasattr(identifier, '__int__'):
|
||||
entry['sid'] = identifier.__int__()
|
||||
else:
|
||||
if self.allow_sid_assignment:
|
||||
# Assign the sid the value of its insertion order.
|
||||
# This assumes that we are assigning values to all assets.
|
||||
entry['sid'] = len(self.metadata_cache)
|
||||
else:
|
||||
raise SidAssignmentError(identifier=identifier)
|
||||
|
||||
# If the file_name is in the kwargs, it will be used as the symbol
|
||||
try:
|
||||
entry['symbol'] = entry.pop('file_name')
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
# If the identifier coming in was a string and there is no defined
|
||||
# symbol yet, set the symbol to the incoming identifier
|
||||
try:
|
||||
entry['symbol']
|
||||
pass
|
||||
except KeyError:
|
||||
if isinstance(identifier, string_types):
|
||||
entry['symbol'] = identifier
|
||||
|
||||
# If the company_name is in the kwargs, it may be the asset_name
|
||||
try:
|
||||
company_name = entry.pop('company_name')
|
||||
try:
|
||||
entry['asset_name']
|
||||
except KeyError:
|
||||
entry['asset_name'] = company_name
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
# If dates are given as nanos, pop them
|
||||
try:
|
||||
entry['start_date'] = entry.pop('start_date_nano')
|
||||
except KeyError:
|
||||
pass
|
||||
try:
|
||||
entry['end_date'] = entry.pop('end_date_nano')
|
||||
except KeyError:
|
||||
pass
|
||||
try:
|
||||
entry['notice_date'] = entry.pop('notice_date_nano')
|
||||
except KeyError:
|
||||
pass
|
||||
try:
|
||||
entry['expiration_date'] = entry.pop('expiration_date_nano')
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
# Process dates to Timestamps
|
||||
try:
|
||||
entry['start_date'] = pd.Timestamp(entry['start_date'], tz='UTC')
|
||||
except KeyError:
|
||||
# Set a default start_date of the EPOCH, so that all date queries
|
||||
# work when a start date is not provided.
|
||||
entry['start_date'] = pd.Timestamp(0, tz='UTC')
|
||||
try:
|
||||
# Set a default end_date of 'now', so that all date queries
|
||||
# work when a end date is not provided.
|
||||
entry['end_date'] = pd.Timestamp(entry['end_date'], tz='UTC')
|
||||
except KeyError:
|
||||
entry['end_date'] = self.end_date_to_assign
|
||||
try:
|
||||
entry['notice_date'] = pd.Timestamp(entry['notice_date'],
|
||||
tz='UTC')
|
||||
except KeyError:
|
||||
pass
|
||||
try:
|
||||
entry['expiration_date'] = pd.Timestamp(entry['expiration_date'],
|
||||
tz='UTC')
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
# Build an Asset of the appropriate type, default to Equity
|
||||
asset_type = entry.pop('asset_type', 'equity')
|
||||
if asset_type.lower() == 'equity':
|
||||
try:
|
||||
fuzzy = entry['symbol'].replace(self.fuzzy_char, '') \
|
||||
if self.fuzzy_char else None
|
||||
except KeyError:
|
||||
fuzzy = None
|
||||
asset = Equity(**entry)
|
||||
c = self.conn.cursor()
|
||||
t = (asset.sid,
|
||||
asset.symbol,
|
||||
asset.asset_name,
|
||||
asset.start_date.value if asset.start_date else None,
|
||||
asset.end_date.value if asset.end_date else None,
|
||||
asset.first_traded.value if asset.first_traded else None,
|
||||
asset.exchange,
|
||||
fuzzy)
|
||||
c.execute("""
|
||||
INSERT INTO equities (
|
||||
sid,
|
||||
symbol,
|
||||
asset_name,
|
||||
start_date,
|
||||
end_date,
|
||||
first_traded,
|
||||
exchange,
|
||||
fuzzy)
|
||||
VALUES(?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", t)
|
||||
|
||||
t = (asset.sid,
|
||||
'equity')
|
||||
c.execute("""
|
||||
INSERT INTO asset_router (
|
||||
sid, asset_type)
|
||||
VALUES(?, ?)
|
||||
""", t)
|
||||
|
||||
elif asset_type.lower() == 'future':
|
||||
asset = Future(**entry)
|
||||
c = self.conn.cursor()
|
||||
t = (asset.sid,
|
||||
asset.symbol,
|
||||
asset.asset_name,
|
||||
asset.start_date.value if asset.start_date else None,
|
||||
asset.end_date.value if asset.end_date else None,
|
||||
asset.first_traded.value if asset.first_traded else None,
|
||||
asset.exchange,
|
||||
asset.root_symbol,
|
||||
asset.notice_date.value if asset.notice_date else None,
|
||||
asset.expiration_date.value
|
||||
if asset.expiration_date else None,
|
||||
asset.contract_multiplier)
|
||||
c.execute("""
|
||||
INSERT INTO futures_contracts(
|
||||
sid,
|
||||
symbol,
|
||||
asset_name,
|
||||
start_date,
|
||||
end_date,
|
||||
first_traded,
|
||||
exchange,
|
||||
root_symbol,
|
||||
notice_date,
|
||||
expiration_date,
|
||||
contract_multiplier)
|
||||
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", t)
|
||||
|
||||
t = (asset.sid,
|
||||
'future')
|
||||
c.execute("""
|
||||
INSERT INTO asset_router (
|
||||
sid,
|
||||
asset_type)
|
||||
VALUES(?, ?)
|
||||
""", t)
|
||||
else:
|
||||
raise InvalidAssetType(asset_type=asset_type)
|
||||
|
||||
self.metadata_cache[identifier] = entry
|
||||
|
||||
self.conn.commit()
|
||||
|
||||
def consume_identifiers(self, db_conn, fuzzy_char=None,
|
||||
allow_sid_assignment=True, constraints=False):
|
||||
"""
|
||||
Consumes the given identifiers in to the metadata cache of this
|
||||
AssetDBWriter, and adds to database.
|
||||
Will be deprecated in future versions of zipline.
|
||||
"""
|
||||
|
||||
self.conn = db_conn
|
||||
self.fuzzy_char = fuzzy_char
|
||||
self.allow_sid_assignment = allow_sid_assignment
|
||||
|
||||
# This flag controls if the AssetDBWriter is allowed to generate its
|
||||
# own sids. If False, metadata that does not contain a sid will raiset
|
||||
# an exception when building assets.
|
||||
if allow_sid_assignment:
|
||||
self.end_date_to_assign = normalize_date(
|
||||
pd.Timestamp('now', tz='UTC'))
|
||||
|
||||
# Create SQL tables
|
||||
self.init_db(self.conn, constraints)
|
||||
|
||||
for identifier in self._data:
|
||||
# Handle case where full Assets are passed in
|
||||
# For example, in the creation of a DataFrameSource, the source's
|
||||
# 'sid' args may be full Assets
|
||||
if isinstance(identifier, Asset):
|
||||
sid = identifier.sid
|
||||
metadata = identifier.to_dict()
|
||||
metadata['asset_type'] = identifier.__class__.__name__
|
||||
self.write_block(sid, **metadata)
|
||||
else:
|
||||
self.write_block(identifier)
|
||||
|
||||
|
||||
class NullAssetDBWriterLegacy(AssetDBWriterLegacy):
|
||||
"""
|
||||
An implementation of AssetDBWriterLegacy for use
|
||||
when no data is initially specified.
|
||||
"""
|
||||
|
||||
def load_data(self, __):
|
||||
for i in iter(()):
|
||||
yield
|
||||
|
||||
|
||||
class AssetDBWriterLegacyFromList(AssetDBWriterLegacy):
|
||||
"""
|
||||
Returns a generator yielding entries from sid_list.
|
||||
"""
|
||||
def load_data(self, sid_list):
|
||||
|
||||
for i in sid_list:
|
||||
yield i
|
||||
|
||||
|
||||
class AssetDBWriterLegacyFromDictionary(AssetDBWriterLegacy):
|
||||
""" An implementation of AssetDBWriter for use
|
||||
with dictionaries.
|
||||
|
||||
Expects a dictionary to be passed to load_data
|
||||
with the following format:
|
||||
|
||||
{id_0: {start_date : ...}, id_1: {start_data: ...}, ...}
|
||||
"""
|
||||
|
||||
def load_data(self, dict_):
|
||||
"""
|
||||
Returns a generator yielding pairs of (identifier, metadata)
|
||||
"""
|
||||
for identifier, metadata in dict_.items():
|
||||
yield identifier, metadata
|
||||
|
||||
|
||||
class AssetDBWriterLegacyFromDataFrame(AssetDBWriterLegacy):
|
||||
""" An implementation of AssetDBWriter for use
|
||||
with pandas DataFrames.
|
||||
|
||||
Expects dataframe to be passed to load_data
|
||||
to have the following structure:
|
||||
* column names must be the metadata fields
|
||||
* index must be the different asset identifiers
|
||||
* array contents should be the metadata value
|
||||
"""
|
||||
|
||||
def load_data(self, dataframe):
|
||||
"""
|
||||
Returns a generator yielding pairs of (identifier, metadata)
|
||||
"""
|
||||
for identifier, row in dataframe.iterrows():
|
||||
yield identifier, row.to_dict()
|
||||
|
||||
|
||||
class AssetDBWriterLegacyFromReadable(AssetDBWriterLegacy):
|
||||
""" An implementation of AssetDBWriter for use
|
||||
with objects with a 'read' property.
|
||||
|
||||
The object's read method must return rows
|
||||
containing at least one of 'sid' or 'symbol' along
|
||||
with the other metadata fields.
|
||||
"""
|
||||
|
||||
def load_data(self, readable):
|
||||
"""
|
||||
Returns a generator yielding pairs of (identifier, metadata)
|
||||
"""
|
||||
for row in readable.read():
|
||||
id_metadata = {}
|
||||
for field in ASSET_FIELDS:
|
||||
try:
|
||||
row_value = row[field]
|
||||
# Avoid passing placeholder strings
|
||||
if row_value and (row_value != 'None'):
|
||||
id_metadata[field] = row[field]
|
||||
except KeyError:
|
||||
continue
|
||||
except IndexError:
|
||||
continue
|
||||
if 'sid' in id_metadata:
|
||||
identifier = id_metadata['sid']
|
||||
del id_metadata['sid']
|
||||
elif 'symbol' in id_metadata:
|
||||
identifier = id_metadata['symbol']
|
||||
del id_metadata['symbol']
|
||||
else:
|
||||
raise ConsumeAssetMetaDataError(obj=row)
|
||||
yield identifier, id_metadata
|
||||
|
||||
Reference in New Issue
Block a user