From f48f88861d8dfe8907328775990774e6cfd01963 Mon Sep 17 00:00:00 2001 From: Jean Bredeche Date: Thu, 19 Nov 2015 12:08:27 -0500 Subject: [PATCH] BUG: chunk sqlite queries into groups of 999. --- zipline/assets/assets.py | 47 +++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/zipline/assets/assets.py b/zipline/assets/assets.py index 43d79024..712c24de 100644 --- a/zipline/assets/assets.py +++ b/zipline/assets/assets.py @@ -21,7 +21,7 @@ import numpy as np import pandas as pd from pandas import isnull from six import with_metaclass, string_types, viewkeys -from six.moves import map as imap +from six.moves import map as imap, range import sqlalchemy as sa from zipline.errors import ( @@ -41,6 +41,7 @@ from zipline.assets.asset_writer import ( check_version_info, ASSET_DB_VERSION, asset_db_table_names, + SQLITE_MAX_VARIABLE_NUMBER ) from zipline.utils.control_flow import invert @@ -146,7 +147,9 @@ class AssetFinder(object): types : dict[sid -> str or None] Asset types for the provided sids. """ - found, missing = {}, set() + found = {} + missing = set() + for sid in sids: try: found[sid] = self._asset_type_cache[sid] @@ -157,18 +160,26 @@ class AssetFinder(object): return found router_cols = self.asset_router.c - query = sa.select((router_cols.sid, router_cols.asset_type)).where( - self.asset_router.c.sid.in_(map(int, missing)) - ) - for sid, type_ in query.execute().fetchall(): - missing.remove(sid) - found[sid] = self._asset_type_cache[sid] = type_ - for sid in missing: - found[sid] = self._asset_type_cache[sid] = None + for assets in self._group_into_chunks(missing): + query = sa.select((router_cols.sid, router_cols.asset_type)).where( + self.asset_router.c.sid.in_(map(int, assets)) + ) + for sid, type_ in query.execute().fetchall(): + missing.remove(sid) + found[sid] = self._asset_type_cache[sid] = type_ + + for sid in missing: + found[sid] = self._asset_type_cache[sid] = None return found + @staticmethod + def _group_into_chunks(items, chunk_size=SQLITE_MAX_VARIABLE_NUMBER): + items = list(items) + return [items[x:x+chunk_size] + for x in range(0, len(items), chunk_size)] + def group_by_type(self, sids): """ Group a list of sids by asset type. @@ -343,14 +354,16 @@ class AssetFinder(object): return {} cache = self._asset_cache - hits = {} - # Load misses from the db. - query = self._select_assets_by_sid(asset_tbl, sids) - for row in imap(dict, query.execute().fetchall()): - asset = asset_type(**_convert_asset_timestamp_fields(row)) - sid = asset.sid - hits[sid] = cache[sid] = asset + + for assets in self._group_into_chunks(sids): + # Load misses from the db. + query = self._select_assets_by_sid(asset_tbl, assets) + + for row in imap(dict, query.execute().fetchall()): + asset = asset_type(**_convert_asset_timestamp_fields(row)) + sid = asset.sid + hits[sid] = cache[sid] = asset # If we get here, it means something in our code thought that a # particular sid was an equity/future and called this function with a