Merge pull request #866 from quantopian/chunk-queries

BUG: chunk sqlite queries
This commit is contained in:
Jean Bredeche
2015-11-20 14:07:37 -05:00
+30 -17
View File
@@ -21,7 +21,7 @@ import numpy as np
import pandas as pd
from pandas import isnull
from six import with_metaclass, string_types, viewkeys
from six.moves import map as imap
from six.moves import map as imap, range
import sqlalchemy as sa
from zipline.errors import (
@@ -41,6 +41,7 @@ from zipline.assets.asset_writer import (
check_version_info,
ASSET_DB_VERSION,
asset_db_table_names,
SQLITE_MAX_VARIABLE_NUMBER
)
from zipline.utils.control_flow import invert
@@ -146,7 +147,9 @@ class AssetFinder(object):
types : dict[sid -> str or None]
Asset types for the provided sids.
"""
found, missing = {}, set()
found = {}
missing = set()
for sid in sids:
try:
found[sid] = self._asset_type_cache[sid]
@@ -157,18 +160,26 @@ class AssetFinder(object):
return found
router_cols = self.asset_router.c
query = sa.select((router_cols.sid, router_cols.asset_type)).where(
self.asset_router.c.sid.in_(map(int, missing))
)
for sid, type_ in query.execute().fetchall():
missing.remove(sid)
found[sid] = self._asset_type_cache[sid] = type_
for sid in missing:
found[sid] = self._asset_type_cache[sid] = None
for assets in self._group_into_chunks(missing):
query = sa.select((router_cols.sid, router_cols.asset_type)).where(
self.asset_router.c.sid.in_(map(int, assets))
)
for sid, type_ in query.execute().fetchall():
missing.remove(sid)
found[sid] = self._asset_type_cache[sid] = type_
for sid in missing:
found[sid] = self._asset_type_cache[sid] = None
return found
@staticmethod
def _group_into_chunks(items, chunk_size=SQLITE_MAX_VARIABLE_NUMBER):
items = list(items)
return [items[x:x+chunk_size]
for x in range(0, len(items), chunk_size)]
def group_by_type(self, sids):
"""
Group a list of sids by asset type.
@@ -343,14 +354,16 @@ class AssetFinder(object):
return {}
cache = self._asset_cache
hits = {}
# Load misses from the db.
query = self._select_assets_by_sid(asset_tbl, sids)
for row in imap(dict, query.execute().fetchall()):
asset = asset_type(**_convert_asset_timestamp_fields(row))
sid = asset.sid
hits[sid] = cache[sid] = asset
for assets in self._group_into_chunks(sids):
# Load misses from the db.
query = self._select_assets_by_sid(asset_tbl, assets)
for row in imap(dict, query.execute().fetchall()):
asset = asset_type(**_convert_asset_timestamp_fields(row))
sid = asset.sid
hits[sid] = cache[sid] = asset
# If we get here, it means something in our code thought that a
# particular sid was an equity/future and called this function with a