mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-30 14:40:25 +08:00
MAINT: Use raw SQL driver rather than pandas to_sql method
Allows us to create an SQL transaction to avoid errors from race conditions.
This commit is contained in:
@@ -123,32 +123,66 @@ class AssetDBWriter(with_metaclass(ABCMeta)):
|
||||
# Get the data to add to SQL
|
||||
equities, futures, exchanges, root_symbols = self.load_data()
|
||||
|
||||
# Write to the SQL tables
|
||||
c = db_conn.cursor()
|
||||
# Write to the SQL tables, using the raw SQL driver instead
|
||||
# of the pandas.DataFrame.to_sql method, as the former
|
||||
# allows us to create an SQL transaction
|
||||
c.execute('BEGIN')
|
||||
# Everything between here and the db_conn.commit()
|
||||
# will be part of of the same SQL transaction.
|
||||
self._write_exchanges(exchanges, db_conn)
|
||||
self._write_root_symbols(root_symbols, db_conn)
|
||||
self._write_futures(futures, db_conn)
|
||||
self._write_equities(equities, db_conn)
|
||||
db_conn.commit()
|
||||
|
||||
def _write_exchanges(self, exchanges, db_conn):
|
||||
|
||||
exchanges.to_sql('futures_exchanges', db_conn, if_exists='replace',
|
||||
index=True, index_label='exchange_id')
|
||||
data = [tuple(x) for x in exchanges.to_records()]
|
||||
c = db_conn.cursor()
|
||||
# The OR IGNORE syntax means we do not insert data
|
||||
# which would violate an SQL constraint.
|
||||
c.executemany("""
|
||||
INSERT OR IGNORE INTO futures_exchanges
|
||||
('exchange_id', 'exchange', 'timezone')
|
||||
VALUES (?, ?, ?)
|
||||
""", data)
|
||||
|
||||
def _write_root_symbols(self, root_symbols, db_conn):
|
||||
|
||||
root_symbols.to_sql('futures_root_symbols', db_conn,
|
||||
if_exists='replace', index=True,
|
||||
index_label='root_symbol_id')
|
||||
data = [tuple(x) for x in root_symbols.to_records()]
|
||||
|
||||
c = db_conn.cursor()
|
||||
c.executemany("""
|
||||
INSERT OR IGNORE INTO futures_root_symbols
|
||||
('root_symbol_id', 'root_symbol', 'sector',
|
||||
'description', 'exchange_id')
|
||||
VALUES(?, ?, ?, ?, ?)
|
||||
""", data)
|
||||
|
||||
def _write_futures(self, futures, db_conn):
|
||||
|
||||
futures.to_sql('futures_contracts', db_conn, if_exists='append',
|
||||
index=True, index_label='sid')
|
||||
data = [tuple(x) for x in futures.to_records()]
|
||||
|
||||
c = db_conn.cursor()
|
||||
c.executemany("""
|
||||
INSERT OR IGNORE INTO futures_contracts
|
||||
('sid', 'symbol', 'root_symbol', 'asset_name',
|
||||
'start_date', 'end_date', 'first_traded', 'exchange',
|
||||
'notice_date', 'expiration_date', 'contract_multiplier')
|
||||
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", data)
|
||||
|
||||
def _write_equities(self, equities, db_conn):
|
||||
|
||||
equities.to_sql('equities', db_conn, if_exists='append',
|
||||
index=True, index_label='sid')
|
||||
data = [tuple(x) for x in equities.to_records()]
|
||||
c = db_conn.cursor()
|
||||
c.executemany("""
|
||||
INSERT OR IGNORE INTO equities
|
||||
('sid', 'symbol', 'asset_name', 'start_date',
|
||||
'end_date', 'first_traded', 'exchange', 'fuzzy')
|
||||
VALUES(?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", data)
|
||||
|
||||
def init_db(self,
|
||||
db_conn,
|
||||
|
||||
Reference in New Issue
Block a user