Merge pull request #1282 from quantopian/fix-data-tests-discovery

TST: fix bundle test discovery
This commit is contained in:
Lotanna Ezenwa
2016-07-25 18:36:38 -04:00
committed by GitHub
11 changed files with 113 additions and 53 deletions
View File
+1 -1
View File
@@ -303,7 +303,7 @@ class BundleCoreTestCase(WithInstanceTmpDir, ZiplineTestCase):
output_dir):
_wrote_to.append(output_dir)
_wrote_to.clear()
_wrote_to[:] = []
self.ingest('bundle', environ=self.environ)
assert_equal(len(_wrote_to), 1, msg='ingest was called more than once')
ingestions = self._list_bundle()
+9 -4
View File
@@ -1,3 +1,5 @@
from __future__ import division
import numpy as np
import pandas as pd
from six.moves.urllib.parse import urlparse, parse_qs
@@ -5,7 +7,7 @@ from toolz import flip, identity
from toolz.curried import merge_with, operator as op
from zipline.data.bundles.core import _make_bundle_core
from zipline.data.bundles import yahoo_equities, load
from zipline.data.bundles import yahoo_equities
from zipline.lib.adjustment import Float64Multiply
from zipline.testing import test_resource_path, tmp_dir, read_compressed
from zipline.testing.fixtures import WithResponses, ZiplineTestCase
@@ -30,7 +32,9 @@ class YahooBundleTestCase(WithResponses, ZiplineTestCase):
(cls.bundles,
cls.register,
cls.unregister,
cls.ingest) = map(staticmethod, _make_bundle_core())
cls.ingest,
cls.load,
cls.clean) = map(staticmethod, _make_bundle_core())
def _expected_data(self):
sids = 0, 1, 2
@@ -165,8 +169,8 @@ class YahooBundleTestCase(WithResponses, ZiplineTestCase):
'ZIPLINE_ROOT': zipline_root,
}
self.ingest('bundle', environ=environ)
bundle = load('bundle', environ=environ)
self.ingest('bundle', environ=environ, show_progress=False)
bundle = self.load('bundle', environ=environ)
sids = 0, 1, 2
equities = bundle.asset_finder.retrieve_all(sids)
@@ -199,4 +203,5 @@ class YahooBundleTestCase(WithResponses, ZiplineTestCase):
adjustments,
expected,
msg=column,
decimal=4,
)
+18
View File
@@ -12,6 +12,24 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
if os.name == 'nt':
# we need to be able to write to our temp directoy on windows so we
# create a subdir in %TMP% that has write access and use that as %TMP%
def _():
import atexit
import tempfile
tempfile.tempdir = tempdir = tempfile.mkdtemp()
@atexit.register
def cleanup_tempdir():
import shutil
shutil.rmtree(tempdir)
_()
del _
# This is *not* a place to dump arbitrary classes/modules for convenience,
-3
View File
@@ -346,7 +346,6 @@ class AssetDBWriter(object):
--------
zipline.assets.asset_finder
"""
with self.engine.begin() as txn:
# Create SQL tables if they do not exist.
metadata = self.init_db(txn)
@@ -358,7 +357,6 @@ class AssetDBWriter(object):
exchanges if exchanges is not None else pd.DataFrame(),
root_symbols if root_symbols is not None else pd.DataFrame(),
)
# Write the data to SQL.
self._write_df_to_table(
metadata.tables['futures_exchanges'],
@@ -468,7 +466,6 @@ class AssetDBWriter(object):
check_version_info(version_info, ASSET_DB_VERSION)
else:
write_version_info(version_info, ASSET_DB_VERSION)
return metadata
def _normalize_equities(self, equities):
+1 -1
View File
@@ -212,7 +212,7 @@ cpdef _read_bcolz_data(ctable_t table,
carray[first_row:last_row + 1]
else:
continue
if column_name in {'open', 'high', 'low', 'close'}:
where_nan = (outbuf == 0)
outbuf_as_float = outbuf.astype(float64) * .001
+49 -26
View File
@@ -22,7 +22,6 @@ from ..minute_bars import (
from zipline.assets import AssetDBWriter, AssetFinder, ASSET_DB_VERSION
from zipline.utils.cache import (
dataframe_cache,
working_file,
working_dir,
)
from zipline.utils.compat import mappingproxy
@@ -38,39 +37,59 @@ open_and_closes = nyse_cal.schedule
def asset_db_path(bundle_name, timestr, environ=None):
return pth.data_path(
[bundle_name, timestr, 'assets-%d.sqlite' % ASSET_DB_VERSION],
asset_db_relative(bundle_name, timestr, environ),
environ=environ,
)
def minute_equity_path(bundle_name, timestr, environ=None):
return pth.data_path(
[bundle_name, timestr, 'minute_equities.bcolz'],
minute_equity_relative(bundle_name, timestr, environ),
environ=environ,
)
def daily_equity_path(bundle_name, timestr, environ=None):
return pth.data_path(
[bundle_name, timestr, 'daily_equities.bcolz'],
daily_equity_relative(bundle_name, timestr, environ),
environ=environ,
)
def adjustment_db_path(bundle_name, timestr, environ=None):
return pth.data_path(
[bundle_name, timestr, 'adjustments.sqlite'],
adjustment_db_relative(bundle_name, timestr, environ),
environ=environ,
)
def cache_path(bundle_name, environ=None):
return pth.data_path(
[bundle_name, '.cache'],
cache_relative(bundle_name, environ),
environ=environ,
)
def adjustment_db_relative(bundle_name, timestr, environ=None):
return bundle_name, timestr, 'adjustments.sqlite'
def cache_relative(bundle_name, timestr, environ=None):
return bundle_name, '.cache'
def daily_equity_relative(bundle_name, timestr, environ=None):
return bundle_name, timestr, 'daily_equities.bcolz'
def minute_equity_relative(bundle_name, timestr, environ=None):
return bundle_name, timestr, 'minute_equities.bcolz'
def asset_db_relative(bundle_name, timestr, environ=None):
return bundle_name, timestr, 'assets-%d.sqlite' % ASSET_DB_VERSION
def to_bundle_ingest_dirname(ts):
"""Convert a pandas Timestamp into the name of the directory for the
ingestion.
@@ -321,16 +340,19 @@ def _make_bundle_core():
cachepath = cache_path(name, environ=environ)
pth.ensure_directory(pth.data_path([name, timestr], environ=environ))
pth.ensure_directory(cachepath)
with dataframe_cache(cachepath, clean_on_failure=False) as cache, \
ExitStack() as stack:
# we use `cleanup_on_failure=False` so that we don't purge the
# cache directory if the load fails in the middle
if bundle.create_writers:
daily_bars_path = stack.enter_context(working_dir(
daily_equity_path(name, timestr, environ=environ),
)).path
wd = stack.enter_context(working_dir(
pth.data_path([], environ=environ))
)
daily_bars_path = wd.ensure_dir(
*daily_equity_relative(
name, timestr, environ=environ,
)
)
daily_bar_writer = BcolzDailyBarWriter(
daily_bars_path,
nyse_cal,
@@ -341,35 +363,37 @@ def _make_bundle_core():
# when we create the SQLiteAdjustmentWriter below. The
# SQLiteAdjustmentWriter needs to open the daily ctables so
# that it can compute the adjustment ratios for the dividends.
daily_bar_writer.write(())
minute_bar_writer = BcolzMinuteBarWriter(
bundle.calendar[0],
stack.enter_context(working_dir(
minute_equity_path(name, timestr, environ=environ),
)).path,
wd.ensure_dir(*minute_equity_relative(
name, timestr, environ=environ)
),
bundle.opens,
bundle.closes,
minutes_per_day=bundle.minutes_per_day,
)
asset_db_writer = AssetDBWriter(
stack.enter_context(working_file(
asset_db_path(name, timestr, environ=environ),
)).path,
wd.getpath(*asset_db_relative(
name, timestr, environ=environ,
))
)
adjustment_db_writer = SQLiteAdjustmentWriter(
stack.enter_context(working_file(
adjustment_db_path(name, timestr, environ=environ),
)).path,
BcolzDailyBarReader(daily_bars_path),
bundle.calendar,
overwrite=True,
adjustment_db_writer = stack.enter_context(
SQLiteAdjustmentWriter(
wd.getpath(*adjustment_db_relative(
name, timestr, environ=environ)),
BcolzDailyBarReader(daily_bars_path),
bundle.calendar,
overwrite=True,
)
)
else:
daily_bar_writer = None
minute_bar_writer = None
asset_db_writer = None
adjustment_db_writer = None
bundle.ingest(
environ,
asset_db_writer,
@@ -539,5 +563,4 @@ def _make_bundle_core():
return BundleCore(bundles, register, unregister, ingest, load, clean)
bundles, register, unregister, ingest, load, clean = _make_bundle_core()
+4 -1
View File
@@ -258,7 +258,10 @@ def gen_symbol_data(api_key,
_update_splits(splits, asset_id, raw_data)
_update_dividends(dividends, asset_id, raw_data)
raw_data = raw_data.reindex(calendar, copy=False).fillna(0.0)
raw_data = raw_data.reindex(
calendar.tz_localize(None),
copy=False,
).fillna(0.0)
yield asset_id, raw_data
if should_sleep:
+1 -1
View File
@@ -119,7 +119,7 @@ def yahoo_equities(symbols, start=None, end=None):
yield sid, df
sid += 1
daily_bar_writer.write(_pricing_iter(), show_progress=True)
daily_bar_writer.write(_pricing_iter(), show_progress=show_progress)
symbol_map = pd.Series(metadata.symbol.index, metadata.symbol)
asset_db_writer.write(equities=metadata)
+16 -4
View File
@@ -15,7 +15,6 @@ from abc import ABCMeta, abstractmethod, abstractproperty
from errno import ENOENT
from functools import partial
from os import remove
from os.path import exists
import sqlite3
import warnings
@@ -211,8 +210,15 @@ class BcolzDailyBarWriter(object):
def __init__(self, filename, calendar, start_session, end_session):
self._filename = filename
assert calendar.is_session(start_session), "Start session is invalid!"
assert calendar.is_session(end_session), "End session is invalid!"
if start_session != end_session:
if not calendar.is_session(start_session):
raise ValueError(
"Start session %s is invalid!" % start_session
)
if not calendar.is_session(end_session):
raise ValueError(
"End session %s is invalid!" % end_session
)
self._start_session = start_session
self._end_session = end_session
@@ -875,7 +881,7 @@ class SQLiteAdjustmentWriter(object):
if isinstance(conn_or_path, sqlite3.Connection):
self.conn = conn_or_path
elif isinstance(conn_or_path, str):
if overwrite and exists(conn_or_path):
if overwrite:
try:
remove(conn_or_path)
except OSError as e:
@@ -1083,6 +1089,12 @@ class SQLiteAdjustmentWriter(object):
dividend_ratios = self.calc_dividend_ratios(dividends)
self.write_frame('dividends', dividend_ratios)
def __enter__(self):
return self
def __exit__(self, *exc_info):
self.close()
def write(self,
splits=None,
mergers=None,
+14 -12
View File
@@ -5,7 +5,8 @@ from collections import namedtuple, MutableMapping
import errno
import os
import pickle
from shutil import rmtree, copyfile, copytree
from distutils import dir_util
from shutil import rmtree, move
from tempfile import mkdtemp, NamedTemporaryFile
import pandas as pd
@@ -272,11 +273,11 @@ class working_file(object):
Notes
-----
The file is moved on __exit__ if there are no exceptions.
``working_file`` uses :func:`shutil.copyfile` to move the actual files,
meaning it has as strong of guarantees as :func:`shutil.copyfile`.
``working_file`` uses :func:`shutil.move` to move the actual files,
meaning it has as strong of guarantees as :func:`shutil.move`.
"""
def __init__(self, final_path, *args, **kwargs):
self._tmpfile = NamedTemporaryFile(*args, **kwargs)
self._tmpfile = NamedTemporaryFile(delete=False, *args, **kwargs)
self._final_path = final_path
@property
@@ -289,7 +290,8 @@ class working_file(object):
def _commit(self):
"""Sync the temporary file to the final path.
"""
copyfile(self.name, self._final_path)
self._tmpfile.close()
move(self._name, self._final_path)
def __getattr__(self, attr):
return getattr(self._tmpfile, attr)
@@ -299,9 +301,9 @@ class working_file(object):
return self
def __exit__(self, *exc_info):
self._tmpfile.__exit__(*exc_info)
if exc_info[0] is None:
self._commit()
self._tmpfile.__exit__(*exc_info)
class working_dir(object):
@@ -318,15 +320,15 @@ class working_dir(object):
Notes
-----
The file is moved on __exit__ if there are no exceptions.
``working_dir`` uses :func:`shutil.copytree` to move the actual files,
meaning it has as strong of guarantees as :func:`shutil.copytree`.
``working_dir`` uses :func:`dir_util.copy_tree` to move the actual files,
meaning it has as strong of guarantees as :func:`dir_util.copy_tree`.
"""
def __init__(self, final_path, *args, **kwargs):
self.path = mkdtemp()
self._final_path = final_path
def mkdir(self, *path_parts):
"""Create a subdirectory of the working directory.
def ensure_dir(self, *path_parts):
"""Ensures a subdirectory of the working directory.
Parameters
----------
@@ -334,7 +336,7 @@ class working_dir(object):
The parts of the path after the working directory.
"""
path = self.getpath(*path_parts)
os.mkdir(path)
ensure_directory(path)
return path
def getpath(self, *path_parts):
@@ -350,7 +352,7 @@ class working_dir(object):
def _commit(self):
"""Sync the temporary directory to the final path.
"""
copytree(self.path, self._final_path)
dir_util.copy_tree(self.path, self._final_path)
def __enter__(self):
return self