diff --git a/tests/data/bundles/__init__.py b/tests/data/bundles/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/data/bundles/test_core.py b/tests/data/bundles/test_core.py index 1a0b2484..f79b3ef8 100644 --- a/tests/data/bundles/test_core.py +++ b/tests/data/bundles/test_core.py @@ -303,7 +303,7 @@ class BundleCoreTestCase(WithInstanceTmpDir, ZiplineTestCase): output_dir): _wrote_to.append(output_dir) - _wrote_to.clear() + _wrote_to[:] = [] self.ingest('bundle', environ=self.environ) assert_equal(len(_wrote_to), 1, msg='ingest was called more than once') ingestions = self._list_bundle() diff --git a/tests/data/bundles/test_yahoo.py b/tests/data/bundles/test_yahoo.py index 8c41d9d3..deae36dc 100644 --- a/tests/data/bundles/test_yahoo.py +++ b/tests/data/bundles/test_yahoo.py @@ -1,3 +1,5 @@ +from __future__ import division + import numpy as np import pandas as pd from six.moves.urllib.parse import urlparse, parse_qs @@ -5,7 +7,7 @@ from toolz import flip, identity from toolz.curried import merge_with, operator as op from zipline.data.bundles.core import _make_bundle_core -from zipline.data.bundles import yahoo_equities, load +from zipline.data.bundles import yahoo_equities from zipline.lib.adjustment import Float64Multiply from zipline.testing import test_resource_path, tmp_dir, read_compressed from zipline.testing.fixtures import WithResponses, ZiplineTestCase @@ -30,7 +32,9 @@ class YahooBundleTestCase(WithResponses, ZiplineTestCase): (cls.bundles, cls.register, cls.unregister, - cls.ingest) = map(staticmethod, _make_bundle_core()) + cls.ingest, + cls.load, + cls.clean) = map(staticmethod, _make_bundle_core()) def _expected_data(self): sids = 0, 1, 2 @@ -165,8 +169,8 @@ class YahooBundleTestCase(WithResponses, ZiplineTestCase): 'ZIPLINE_ROOT': zipline_root, } - self.ingest('bundle', environ=environ) - bundle = load('bundle', environ=environ) + self.ingest('bundle', environ=environ, show_progress=False) + bundle = self.load('bundle', 
environ=environ) sids = 0, 1, 2 equities = bundle.asset_finder.retrieve_all(sids) @@ -199,4 +203,5 @@ class YahooBundleTestCase(WithResponses, ZiplineTestCase): adjustments, expected, msg=column, + decimal=4, ) diff --git a/zipline/__init__.py b/zipline/__init__.py index 5dfe287b..a314ec72 100644 --- a/zipline/__init__.py +++ b/zipline/__init__.py @@ -12,6 +12,24 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import os + + +if os.name == 'nt': + # we need to be able to write to our temp directory on windows so we + # create a subdir in %TMP% that has write access and use that as %TMP% + def _(): + import atexit + import tempfile + + tempfile.tempdir = tempdir = tempfile.mkdtemp() + + @atexit.register + def cleanup_tempdir(): + import shutil + shutil.rmtree(tempdir) + _() + del _ # This is *not* a place to dump arbitrary classes/modules for convenience, diff --git a/zipline/assets/asset_writer.py b/zipline/assets/asset_writer.py index dfc8566b..d11df1c0 100644 --- a/zipline/assets/asset_writer.py +++ b/zipline/assets/asset_writer.py @@ -346,7 +346,6 @@ class AssetDBWriter(object): -------- zipline.assets.asset_finder """ - with self.engine.begin() as txn: # Create SQL tables if they do not exist. metadata = self.init_db(txn) @@ -358,7 +357,6 @@ class AssetDBWriter(object): exchanges if exchanges is not None else pd.DataFrame(), root_symbols if root_symbols is not None else pd.DataFrame(), ) - # Write the data to SQL. 
self._write_df_to_table( metadata.tables['futures_exchanges'], @@ -468,7 +466,6 @@ class AssetDBWriter(object): check_version_info(version_info, ASSET_DB_VERSION) else: write_version_info(version_info, ASSET_DB_VERSION) - return metadata def _normalize_equities(self, equities): diff --git a/zipline/data/_equities.pyx b/zipline/data/_equities.pyx index caa8ae69..5b80aada 100644 --- a/zipline/data/_equities.pyx +++ b/zipline/data/_equities.pyx @@ -212,7 +212,7 @@ cpdef _read_bcolz_data(ctable_t table, carray[first_row:last_row + 1] else: continue - + if column_name in {'open', 'high', 'low', 'close'}: where_nan = (outbuf == 0) outbuf_as_float = outbuf.astype(float64) * .001 diff --git a/zipline/data/bundles/core.py b/zipline/data/bundles/core.py index 873d9ca3..89064577 100644 --- a/zipline/data/bundles/core.py +++ b/zipline/data/bundles/core.py @@ -22,7 +22,6 @@ from ..minute_bars import ( from zipline.assets import AssetDBWriter, AssetFinder, ASSET_DB_VERSION from zipline.utils.cache import ( dataframe_cache, - working_file, working_dir, ) from zipline.utils.compat import mappingproxy @@ -38,39 +37,59 @@ open_and_closes = nyse_cal.schedule def asset_db_path(bundle_name, timestr, environ=None): return pth.data_path( - [bundle_name, timestr, 'assets-%d.sqlite' % ASSET_DB_VERSION], + asset_db_relative(bundle_name, timestr, environ), environ=environ, ) def minute_equity_path(bundle_name, timestr, environ=None): return pth.data_path( - [bundle_name, timestr, 'minute_equities.bcolz'], + minute_equity_relative(bundle_name, timestr, environ), environ=environ, ) def daily_equity_path(bundle_name, timestr, environ=None): return pth.data_path( - [bundle_name, timestr, 'daily_equities.bcolz'], + daily_equity_relative(bundle_name, timestr, environ), environ=environ, ) def adjustment_db_path(bundle_name, timestr, environ=None): return pth.data_path( - [bundle_name, timestr, 'adjustments.sqlite'], + adjustment_db_relative(bundle_name, timestr, environ), environ=environ, ) def 
cache_path(bundle_name, environ=None): return pth.data_path( - [bundle_name, '.cache'], + cache_relative(bundle_name, environ), environ=environ, ) +def adjustment_db_relative(bundle_name, timestr, environ=None): + return bundle_name, timestr, 'adjustments.sqlite' + + +def cache_relative(bundle_name, timestr, environ=None): + return bundle_name, '.cache' + + +def daily_equity_relative(bundle_name, timestr, environ=None): + return bundle_name, timestr, 'daily_equities.bcolz' + + +def minute_equity_relative(bundle_name, timestr, environ=None): + return bundle_name, timestr, 'minute_equities.bcolz' + + +def asset_db_relative(bundle_name, timestr, environ=None): + return bundle_name, timestr, 'assets-%d.sqlite' % ASSET_DB_VERSION + + def to_bundle_ingest_dirname(ts): """Convert a pandas Timestamp into the name of the directory for the ingestion. @@ -321,16 +340,19 @@ def _make_bundle_core(): cachepath = cache_path(name, environ=environ) pth.ensure_directory(pth.data_path([name, timestr], environ=environ)) pth.ensure_directory(cachepath) - with dataframe_cache(cachepath, clean_on_failure=False) as cache, \ ExitStack() as stack: # we use `cleanup_on_failure=False` so that we don't purge the # cache directory if the load fails in the middle - if bundle.create_writers: - daily_bars_path = stack.enter_context(working_dir( - daily_equity_path(name, timestr, environ=environ), - )).path + wd = stack.enter_context(working_dir( + pth.data_path([], environ=environ)) + ) + daily_bars_path = wd.ensure_dir( + *daily_equity_relative( + name, timestr, environ=environ, + ) + ) daily_bar_writer = BcolzDailyBarWriter( daily_bars_path, nyse_cal, @@ -341,35 +363,37 @@ def _make_bundle_core(): # when we create the SQLiteAdjustmentWriter below. The # SQLiteAdjustmentWriter needs to open the daily ctables so # that it can compute the adjustment ratios for the dividends. 
+ daily_bar_writer.write(()) minute_bar_writer = BcolzMinuteBarWriter( bundle.calendar[0], - stack.enter_context(working_dir( - minute_equity_path(name, timestr, environ=environ), - )).path, + wd.ensure_dir(*minute_equity_relative( + name, timestr, environ=environ) + ), bundle.opens, bundle.closes, minutes_per_day=bundle.minutes_per_day, ) asset_db_writer = AssetDBWriter( - stack.enter_context(working_file( - asset_db_path(name, timestr, environ=environ), - )).path, + wd.getpath(*asset_db_relative( + name, timestr, environ=environ, + )) ) - adjustment_db_writer = SQLiteAdjustmentWriter( - stack.enter_context(working_file( - adjustment_db_path(name, timestr, environ=environ), - )).path, - BcolzDailyBarReader(daily_bars_path), - bundle.calendar, - overwrite=True, + + adjustment_db_writer = stack.enter_context( + SQLiteAdjustmentWriter( + wd.getpath(*adjustment_db_relative( + name, timestr, environ=environ)), + BcolzDailyBarReader(daily_bars_path), + bundle.calendar, + overwrite=True, + ) ) else: daily_bar_writer = None minute_bar_writer = None asset_db_writer = None adjustment_db_writer = None - bundle.ingest( environ, asset_db_writer, @@ -539,5 +563,4 @@ def _make_bundle_core(): return BundleCore(bundles, register, unregister, ingest, load, clean) - bundles, register, unregister, ingest, load, clean = _make_bundle_core() diff --git a/zipline/data/bundles/quandl.py b/zipline/data/bundles/quandl.py index dcb08d88..ba0526f5 100644 --- a/zipline/data/bundles/quandl.py +++ b/zipline/data/bundles/quandl.py @@ -258,7 +258,10 @@ def gen_symbol_data(api_key, _update_splits(splits, asset_id, raw_data) _update_dividends(dividends, asset_id, raw_data) - raw_data = raw_data.reindex(calendar, copy=False).fillna(0.0) + raw_data = raw_data.reindex( + calendar.tz_localize(None), + copy=False, + ).fillna(0.0) yield asset_id, raw_data if should_sleep: diff --git a/zipline/data/bundles/yahoo.py b/zipline/data/bundles/yahoo.py index a0d13fa6..8a500253 100644 --- 
a/zipline/data/bundles/yahoo.py +++ b/zipline/data/bundles/yahoo.py @@ -119,7 +119,7 @@ def yahoo_equities(symbols, start=None, end=None): yield sid, df sid += 1 - daily_bar_writer.write(_pricing_iter(), show_progress=True) + daily_bar_writer.write(_pricing_iter(), show_progress=show_progress) symbol_map = pd.Series(metadata.symbol.index, metadata.symbol) asset_db_writer.write(equities=metadata) diff --git a/zipline/data/us_equity_pricing.py b/zipline/data/us_equity_pricing.py index 1cc76bc3..a6d92504 100644 --- a/zipline/data/us_equity_pricing.py +++ b/zipline/data/us_equity_pricing.py @@ -15,7 +15,6 @@ from abc import ABCMeta, abstractmethod, abstractproperty from errno import ENOENT from functools import partial from os import remove -from os.path import exists import sqlite3 import warnings @@ -211,8 +210,15 @@ class BcolzDailyBarWriter(object): def __init__(self, filename, calendar, start_session, end_session): self._filename = filename - assert calendar.is_session(start_session), "Start session is invalid!" - assert calendar.is_session(end_session), "End session is invalid!" + if start_session != end_session: + if not calendar.is_session(start_session): + raise ValueError( + "Start session %s is invalid!" % start_session + ) + if not calendar.is_session(end_session): + raise ValueError( + "End session %s is invalid!" 
% end_session + ) self._start_session = start_session self._end_session = end_session @@ -875,7 +881,7 @@ class SQLiteAdjustmentWriter(object): if isinstance(conn_or_path, sqlite3.Connection): self.conn = conn_or_path elif isinstance(conn_or_path, str): - if overwrite and exists(conn_or_path): + if overwrite: try: remove(conn_or_path) except OSError as e: @@ -1083,6 +1089,12 @@ class SQLiteAdjustmentWriter(object): dividend_ratios = self.calc_dividend_ratios(dividends) self.write_frame('dividends', dividend_ratios) + def __enter__(self): + return self + + def __exit__(self, *exc_info): + self.close() + def write(self, splits=None, mergers=None, diff --git a/zipline/utils/cache.py b/zipline/utils/cache.py index d2a9b14e..21aee7d4 100644 --- a/zipline/utils/cache.py +++ b/zipline/utils/cache.py @@ -5,7 +5,8 @@ from collections import namedtuple, MutableMapping import errno import os import pickle -from shutil import rmtree, copyfile, copytree +from distutils import dir_util +from shutil import rmtree, move from tempfile import mkdtemp, NamedTemporaryFile import pandas as pd @@ -272,11 +273,11 @@ class working_file(object): Notes ----- The file is moved on __exit__ if there are no exceptions. - ``working_file`` uses :func:`shutil.copyfile` to move the actual files, - meaning it has as strong of guarantees as :func:`shutil.copyfile`. + ``working_file`` uses :func:`shutil.move` to move the actual files, + meaning it has as strong of guarantees as :func:`shutil.move`. """ def __init__(self, final_path, *args, **kwargs): - self._tmpfile = NamedTemporaryFile(*args, **kwargs) + self._tmpfile = NamedTemporaryFile(delete=False, *args, **kwargs) self._final_path = final_path @property @@ -289,7 +290,8 @@ class working_file(object): def _commit(self): """Sync the temporary file to the final path. 
""" - copyfile(self.name, self._final_path) + self._tmpfile.close() + move(self.name, self._final_path) def __getattr__(self, attr): return getattr(self._tmpfile, attr) @@ -299,9 +301,9 @@ class working_file(object): return self def __exit__(self, *exc_info): + self._tmpfile.__exit__(*exc_info) if exc_info[0] is None: self._commit() - self._tmpfile.__exit__(*exc_info) class working_dir(object): @@ -318,15 +320,15 @@ class working_dir(object): Notes ----- The file is moved on __exit__ if there are no exceptions. - ``working_dir`` uses :func:`shutil.copytree` to move the actual files, - meaning it has as strong of guarantees as :func:`shutil.copytree`. + ``working_dir`` uses :func:`dir_util.copy_tree` to move the actual files, + meaning it has as strong of guarantees as :func:`dir_util.copy_tree`. """ def __init__(self, final_path, *args, **kwargs): self.path = mkdtemp() self._final_path = final_path - def mkdir(self, *path_parts): - """Create a subdirectory of the working directory. + def ensure_dir(self, *path_parts): + """Ensures a subdirectory of the working directory. Parameters ---------- @@ -334,7 +336,7 @@ class working_dir(object): The parts of the path after the working directory. """ path = self.getpath(*path_parts) - os.mkdir(path) + ensure_directory(path) return path def getpath(self, *path_parts): @@ -350,7 +352,7 @@ class working_dir(object): def _commit(self): """Sync the temporary directory to the final path. """ - copytree(self.path, self._final_path) + dir_util.copy_tree(self.path, self._final_path) def __enter__(self): return self