From ef4eafbbb876c5739ed66b4db84237fbb7a7a072 Mon Sep 17 00:00:00 2001 From: Joe Jevnik Date: Wed, 15 Jun 2016 19:13:39 -0400 Subject: [PATCH 1/6] TST: fix bundle test discovery --- tests/data/bundles/__init__.py | 0 tests/data/bundles/test_yahoo.py | 8 +++++--- 2 files changed, 5 insertions(+), 3 deletions(-) create mode 100644 tests/data/bundles/__init__.py diff --git a/tests/data/bundles/__init__.py b/tests/data/bundles/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/data/bundles/test_yahoo.py b/tests/data/bundles/test_yahoo.py index 8c41d9d3..f2054e09 100644 --- a/tests/data/bundles/test_yahoo.py +++ b/tests/data/bundles/test_yahoo.py @@ -5,7 +5,7 @@ from toolz import flip, identity from toolz.curried import merge_with, operator as op from zipline.data.bundles.core import _make_bundle_core -from zipline.data.bundles import yahoo_equities, load +from zipline.data.bundles import yahoo_equities from zipline.lib.adjustment import Float64Multiply from zipline.testing import test_resource_path, tmp_dir, read_compressed from zipline.testing.fixtures import WithResponses, ZiplineTestCase @@ -30,7 +30,9 @@ class YahooBundleTestCase(WithResponses, ZiplineTestCase): (cls.bundles, cls.register, cls.unregister, - cls.ingest) = map(staticmethod, _make_bundle_core()) + cls.ingest, + cls.load, + cls.clean) = map(staticmethod, _make_bundle_core()) def _expected_data(self): sids = 0, 1, 2 @@ -166,7 +168,7 @@ class YahooBundleTestCase(WithResponses, ZiplineTestCase): } self.ingest('bundle', environ=environ) - bundle = load('bundle', environ=environ) + bundle = self.load('bundle', environ=environ) sids = 0, 1, 2 equities = bundle.asset_finder.retrieve_all(sids) From e8728c0cd41ccb09fb3c631e12e95d492f79c38d Mon Sep 17 00:00:00 2001 From: Joe Jevnik Date: Wed, 15 Jun 2016 19:52:32 -0400 Subject: [PATCH 2/6] TST: fix data tests --- tests/data/bundles/test_core.py | 2 +- tests/data/bundles/test_yahoo.py | 3 +++ zipline/data/_equities.pyx | 2 +- zipline/data/bundles/core.py | 8 +++++--- zipline/data/bundles/quandl.py | 5 ++++- zipline/data/us_equity_pricing.py | 3 +-- 6 files changed, 15 insertions(+), 8 deletions(-) diff --git a/tests/data/bundles/test_core.py b/tests/data/bundles/test_core.py index 1a0b2484..f79b3ef8 100644 --- a/tests/data/bundles/test_core.py +++ b/tests/data/bundles/test_core.py @@ -303,7 +303,7 @@ class BundleCoreTestCase(WithInstanceTmpDir, ZiplineTestCase): output_dir): _wrote_to.append(output_dir) - _wrote_to.clear() + _wrote_to[:] = [] self.ingest('bundle', environ=self.environ) assert_equal(len(_wrote_to), 1, msg='ingest was called more than once') ingestions = self._list_bundle() diff --git a/tests/data/bundles/test_yahoo.py b/tests/data/bundles/test_yahoo.py index f2054e09..80937cd6 100644 --- a/tests/data/bundles/test_yahoo.py +++ b/tests/data/bundles/test_yahoo.py @@ -1,3 +1,5 @@ +from __future__ import division + import numpy as np import pandas as pd from six.moves.urllib.parse import urlparse, parse_qs @@ -201,4 +203,5 @@ class YahooBundleTestCase(WithResponses, ZiplineTestCase): adjustments, expected, msg=column, + decimal=4, ) diff --git a/zipline/data/_equities.pyx b/zipline/data/_equities.pyx index caa8ae69..5b80aada 100644 --- a/zipline/data/_equities.pyx +++ b/zipline/data/_equities.pyx @@ -212,7 +212,7 @@ cpdef _read_bcolz_data(ctable_t table, carray[first_row:last_row + 1] else: continue - + if column_name in {'open', 'high', 'low', 'close'}: where_nan = (outbuf == 0) outbuf_as_float = outbuf.astype(float64) * .001 diff --git a/zipline/data/bundles/core.py b/zipline/data/bundles/core.py index 873d9ca3..a0765f54 100644 --- a/zipline/data/bundles/core.py +++ b/zipline/data/bundles/core.py @@ -356,10 +356,12 @@ def _make_bundle_core(): asset_db_path(name, timestr, environ=environ), )).path, ) + wf = stack.enter_context(working_file( + adjustment_db_path(name, timestr, environ=environ), + )) + wf.close() # we need to close the file to delete it on windows adjustment_db_writer = SQLiteAdjustmentWriter( - stack.enter_context(working_file( - adjustment_db_path(name, timestr, environ=environ), - )).path, + wf.path, BcolzDailyBarReader(daily_bars_path), bundle.calendar, overwrite=True, diff --git a/zipline/data/bundles/quandl.py b/zipline/data/bundles/quandl.py index dcb08d88..ba0526f5 100644 --- a/zipline/data/bundles/quandl.py +++ b/zipline/data/bundles/quandl.py @@ -258,7 +258,10 @@ def gen_symbol_data(api_key, _update_splits(splits, asset_id, raw_data) _update_dividends(dividends, asset_id, raw_data) - raw_data = raw_data.reindex(calendar, copy=False).fillna(0.0) + raw_data = raw_data.reindex( + calendar.tz_localize(None), + copy=False, + ).fillna(0.0) yield asset_id, raw_data if should_sleep: diff --git a/zipline/data/us_equity_pricing.py b/zipline/data/us_equity_pricing.py index 1cc76bc3..cd94512b 100644 --- a/zipline/data/us_equity_pricing.py +++ b/zipline/data/us_equity_pricing.py @@ -15,7 +15,6 @@ from abc import ABCMeta, abstractmethod, abstractproperty from errno import ENOENT from functools import partial from os import remove -from os.path import exists import sqlite3 import warnings @@ -875,7 +874,7 @@ class SQLiteAdjustmentWriter(object): if isinstance(conn_or_path, sqlite3.Connection): self.conn = conn_or_path elif isinstance(conn_or_path, str): - if overwrite and exists(conn_or_path): + if overwrite: try: remove(conn_or_path) except OSError as e: From 25ed414a66d5c6d71871c80148bbe6c5bbf49570 Mon Sep 17 00:00:00 2001 From: Joe Jevnik Date: Mon, 20 Jun 2016 14:17:09 -0400 Subject: [PATCH 3/6] MAINT: use new tempdir for windows --- zipline/__init__.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/zipline/__init__.py b/zipline/__init__.py index 5dfe287b..a314ec72 100644 --- a/zipline/__init__.py +++ b/zipline/__init__.py @@ -12,6 +12,24 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import os + + +if os.name == 'nt': + # we need to be able to write to our temp directoy on windows so we + # create a subdir in %TMP% that has write access and use that as %TMP% + def _(): + import atexit + import tempfile + + tempfile.tempdir = tempdir = tempfile.mkdtemp() + + @atexit.register + def cleanup_tempdir(): + import shutil + shutil.rmtree(tempdir) + _() + del _ # This is *not* a place to dump arbitrary classes/modules for convenience, From ec0ecfc1b9cc96ffa62206b07fd9890186ab2cad Mon Sep 17 00:00:00 2001 From: Joe Jevnik Date: Mon, 20 Jun 2016 14:49:51 -0400 Subject: [PATCH 4/6] TST: don't use showprogress in tests --- tests/data/bundles/test_yahoo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/data/bundles/test_yahoo.py b/tests/data/bundles/test_yahoo.py index 80937cd6..deae36dc 100644 --- a/tests/data/bundles/test_yahoo.py +++ b/tests/data/bundles/test_yahoo.py @@ -169,7 +169,7 @@ class YahooBundleTestCase(WithResponses, ZiplineTestCase): 'ZIPLINE_ROOT': zipline_root, } - self.ingest('bundle', environ=environ) + self.ingest('bundle', environ=environ, show_progress=False) bundle = self.load('bundle', environ=environ) sids = 0, 1, 2 From b2ee6c6f84bb92b7b7d11516fb9f5522ac2753ab Mon Sep 17 00:00:00 2001 From: Joe Jevnik Date: Mon, 20 Jun 2016 15:20:02 -0400 Subject: [PATCH 5/6] BUG: forward show_progress in yahoo --- zipline/data/bundles/yahoo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zipline/data/bundles/yahoo.py b/zipline/data/bundles/yahoo.py index a0d13fa6..8a500253 100644 --- a/zipline/data/bundles/yahoo.py +++ b/zipline/data/bundles/yahoo.py @@ -119,7 +119,7 @@ def yahoo_equities(symbols, start=None, end=None): yield sid, df sid += 1 - daily_bar_writer.write(_pricing_iter(), show_progress=True) + daily_bar_writer.write(_pricing_iter(), show_progress=show_progress) symbol_map = pd.Series(metadata.symbol.index, metadata.symbol) asset_db_writer.write(equities=metadata) From f43941933f3f2cc2c537ce81a14d9f1913065404 Mon Sep 17 00:00:00 2001 From: LotannaEzenwa Date: Mon, 11 Jul 2016 14:32:53 -0400 Subject: [PATCH 6/6] ENH: Makes working_dir work on Windows. Updates bundle core --- zipline/assets/asset_writer.py | 3 -- zipline/data/bundles/core.py | 77 ++++++++++++++++++++----------- zipline/data/us_equity_pricing.py | 17 ++++++- zipline/utils/cache.py | 26 ++++++----- 4 files changed, 78 insertions(+), 45 deletions(-) diff --git a/zipline/assets/asset_writer.py b/zipline/assets/asset_writer.py index dfc8566b..d11df1c0 100644 --- a/zipline/assets/asset_writer.py +++ b/zipline/assets/asset_writer.py @@ -346,7 +346,6 @@ class AssetDBWriter(object): -------- zipline.assets.asset_finder """ - with self.engine.begin() as txn: # Create SQL tables if they do not exist. metadata = self.init_db(txn) @@ -358,7 +357,6 @@ class AssetDBWriter(object): exchanges if exchanges is not None else pd.DataFrame(), root_symbols if root_symbols is not None else pd.DataFrame(), ) - # Write the data to SQL. self._write_df_to_table( metadata.tables['futures_exchanges'], @@ -468,7 +466,6 @@ class AssetDBWriter(object): check_version_info(version_info, ASSET_DB_VERSION) else: write_version_info(version_info, ASSET_DB_VERSION) - return metadata def _normalize_equities(self, equities): diff --git a/zipline/data/bundles/core.py b/zipline/data/bundles/core.py index a0765f54..89064577 100644 --- a/zipline/data/bundles/core.py +++ b/zipline/data/bundles/core.py @@ -22,7 +22,6 @@ from ..minute_bars import ( from zipline.assets import AssetDBWriter, AssetFinder, ASSET_DB_VERSION from zipline.utils.cache import ( dataframe_cache, - working_file, working_dir, ) from zipline.utils.compat import mappingproxy @@ -38,39 +37,59 @@ open_and_closes = nyse_cal.schedule def asset_db_path(bundle_name, timestr, environ=None): return pth.data_path( - [bundle_name, timestr, 'assets-%d.sqlite' % ASSET_DB_VERSION], + asset_db_relative(bundle_name, timestr, environ), environ=environ, ) def minute_equity_path(bundle_name, timestr, environ=None): return pth.data_path( - [bundle_name, timestr, 'minute_equities.bcolz'], + minute_equity_relative(bundle_name, timestr, environ), environ=environ, ) def daily_equity_path(bundle_name, timestr, environ=None): return pth.data_path( - [bundle_name, timestr, 'daily_equities.bcolz'], + daily_equity_relative(bundle_name, timestr, environ), environ=environ, ) def adjustment_db_path(bundle_name, timestr, environ=None): return pth.data_path( - [bundle_name, timestr, 'adjustments.sqlite'], + adjustment_db_relative(bundle_name, timestr, environ), environ=environ, ) def cache_path(bundle_name, environ=None): return pth.data_path( - [bundle_name, '.cache'], + cache_relative(bundle_name, environ), environ=environ, ) +def adjustment_db_relative(bundle_name, timestr, environ=None): + return bundle_name, timestr, 'adjustments.sqlite' + + +def cache_relative(bundle_name, timestr, environ=None): + return bundle_name, '.cache' + + +def daily_equity_relative(bundle_name, timestr, environ=None): + return bundle_name, timestr, 'daily_equities.bcolz' + + +def minute_equity_relative(bundle_name, timestr, environ=None): + return bundle_name, timestr, 'minute_equities.bcolz' + + +def asset_db_relative(bundle_name, timestr, environ=None): + return bundle_name, timestr, 'assets-%d.sqlite' % ASSET_DB_VERSION + + def to_bundle_ingest_dirname(ts): """Convert a pandas Timestamp into the name of the directory for the ingestion. @@ -321,16 +340,19 @@ def _make_bundle_core(): cachepath = cache_path(name, environ=environ) pth.ensure_directory(pth.data_path([name, timestr], environ=environ)) pth.ensure_directory(cachepath) - with dataframe_cache(cachepath, clean_on_failure=False) as cache, \ ExitStack() as stack: # we use `cleanup_on_failure=False` so that we don't purge the # cache directory if the load fails in the middle - if bundle.create_writers: - daily_bars_path = stack.enter_context(working_dir( - daily_equity_path(name, timestr, environ=environ), - )).path + wd = stack.enter_context(working_dir( + pth.data_path([], environ=environ)) + ) + daily_bars_path = wd.ensure_dir( + *daily_equity_relative( + name, timestr, environ=environ, + ) + ) daily_bar_writer = BcolzDailyBarWriter( daily_bars_path, nyse_cal, @@ -341,37 +363,37 @@ def _make_bundle_core(): # when we create the SQLiteAdjustmentWriter below. The # SQLiteAdjustmentWriter needs to open the daily ctables so # that it can compute the adjustment ratios for the dividends. + daily_bar_writer.write(()) minute_bar_writer = BcolzMinuteBarWriter( bundle.calendar[0], - stack.enter_context(working_dir( - minute_equity_path(name, timestr, environ=environ), - )).path, + wd.ensure_dir(*minute_equity_relative( + name, timestr, environ=environ) + ), bundle.opens, bundle.closes, minutes_per_day=bundle.minutes_per_day, ) asset_db_writer = AssetDBWriter( - stack.enter_context(working_file( - asset_db_path(name, timestr, environ=environ), - )).path, + wd.getpath(*asset_db_relative( + name, timestr, environ=environ, + )) ) - wf = stack.enter_context(working_file( - adjustment_db_path(name, timestr, environ=environ), - )) - wf.close() # we need to close the file to delete it on windows - adjustment_db_writer = SQLiteAdjustmentWriter( - wf.path, - BcolzDailyBarReader(daily_bars_path), - bundle.calendar, - overwrite=True, + + adjustment_db_writer = stack.enter_context( + SQLiteAdjustmentWriter( + wd.getpath(*adjustment_db_relative( + name, timestr, environ=environ)), + BcolzDailyBarReader(daily_bars_path), + bundle.calendar, + overwrite=True, + ) ) else: daily_bar_writer = None minute_bar_writer = None asset_db_writer = None adjustment_db_writer = None - bundle.ingest( environ, asset_db_writer, @@ -541,5 +563,4 @@ def _make_bundle_core(): return BundleCore(bundles, register, unregister, ingest, load, clean) - bundles, register, unregister, ingest, load, clean = _make_bundle_core() diff --git a/zipline/data/us_equity_pricing.py b/zipline/data/us_equity_pricing.py index cd94512b..a6d92504 100644 --- a/zipline/data/us_equity_pricing.py +++ b/zipline/data/us_equity_pricing.py @@ -210,8 +210,15 @@ class BcolzDailyBarWriter(object): def __init__(self, filename, calendar, start_session, end_session): self._filename = filename - assert calendar.is_session(start_session), "Start session is invalid!" - assert calendar.is_session(end_session), "End session is invalid!" + if start_session != end_session: + if not calendar.is_session(start_session): + raise ValueError( + "Start session %s is invalid!" % start_session + ) + if not calendar.is_session(end_session): + raise ValueError( + "End session %s is invalid!" % end_session + ) self._start_session = start_session self._end_session = end_session @@ -1082,6 +1089,12 @@ class SQLiteAdjustmentWriter(object): dividend_ratios = self.calc_dividend_ratios(dividends) self.write_frame('dividends', dividend_ratios) + def __enter__(self): + return self + + def __exit__(self, *exc_info): + self.close() + def write(self, splits=None, mergers=None, diff --git a/zipline/utils/cache.py b/zipline/utils/cache.py index d2a9b14e..21aee7d4 100644 --- a/zipline/utils/cache.py +++ b/zipline/utils/cache.py @@ -5,7 +5,8 @@ from collections import namedtuple, MutableMapping import errno import os import pickle -from shutil import rmtree, copyfile, copytree +from distutils import dir_util +from shutil import rmtree, move from tempfile import mkdtemp, NamedTemporaryFile import pandas as pd @@ -272,11 +273,11 @@ class working_file(object): Notes ----- The file is moved on __exit__ if there are no exceptions. - ``working_file`` uses :func:`shutil.copyfile` to move the actual files, - meaning it has as strong of guarantees as :func:`shutil.copyfile`. + ``working_file`` uses :func:`shutil.move` to move the actual files, + meaning it has as strong of guarantees as :func:`shutil.move`. """ def __init__(self, final_path, *args, **kwargs): - self._tmpfile = NamedTemporaryFile(*args, **kwargs) + self._tmpfile = NamedTemporaryFile(delete=False, *args, **kwargs) self._final_path = final_path @property @@ -289,7 +290,8 @@ class working_file(object): def _commit(self): """Sync the temporary file to the final path. """ - copyfile(self.name, self._final_path) + self._tmpfile.close() + move(self._name, self._final_path) def __getattr__(self, attr): return getattr(self._tmpfile, attr) @@ -299,9 +301,9 @@ class working_file(object): return self def __exit__(self, *exc_info): + self._tmpfile.__exit__(*exc_info) if exc_info[0] is None: self._commit() - self._tmpfile.__exit__(*exc_info) class working_dir(object): @@ -318,15 +320,15 @@ class working_dir(object): Notes ----- The file is moved on __exit__ if there are no exceptions. - ``working_dir`` uses :func:`shutil.copytree` to move the actual files, - meaning it has as strong of guarantees as :func:`shutil.copytree`. + ``working_dir`` uses :func:`dir_util.copy_tree` to move the actual files, + meaning it has as strong of guarantees as :func:`dir_util.copy_tree`. """ def __init__(self, final_path, *args, **kwargs): self.path = mkdtemp() self._final_path = final_path - def mkdir(self, *path_parts): - """Create a subdirectory of the working directory. + def ensure_dir(self, *path_parts): + """Ensures a subdirectory of the working directory. Parameters ---------- @@ -334,7 +336,7 @@ class working_dir(object): The parts of the path after the working directory. """ path = self.getpath(*path_parts) - os.mkdir(path) + ensure_directory(path) return path def getpath(self, *path_parts): @@ -350,7 +352,7 @@ class working_dir(object): def _commit(self): """Sync the temporary directory to the final path. """ - copytree(self.path, self._final_path) + dir_util.copy_tree(self.path, self._final_path) def __enter__(self): return self