ENH: Adds quantopian-quandl bundle as new default.

This data bundle will use the quantopian mirror of the quandl WIKI data
instead of downloading from quandl directly. This dramatically improves
the speed because we do not pay the rate limiting for quandl and we can
send the data in the format zipline expects.
This commit is contained in:
Joe Jevnik
2016-05-03 23:35:49 -04:00
parent 8756bf2c91
commit 89542e33bd
11 changed files with 622 additions and 261 deletions
+3 -3
View File
@@ -127,7 +127,7 @@ on OSX):
--capital-base FLOAT The starting capital for the simulation.
[default: 10000000.0]
-b, --bundle BUNDLE-NAME The data bundle to use for the simulation.
[default: quandl]
[default: quantopian-quandl]
--bundle-timestamp TIMESTAMP The date to lookup data on or before.
[default: <current-time>]
-s, --start DATE The start date of the simulation.
@@ -140,8 +140,8 @@ on OSX):
As you can see there are a couple of flags that specify where to find your
algorithm (``-f``) as well as parameters specifying which data to use,
defaulting to the :ref:`quandl-data-bundle`. There are also arguments for the
date range to run the algorithm over (``--start`` and ``--end``). Finally,
defaulting to the :ref:`quantopian-quandl-mirror`. There are also arguments for
the date range to run the algorithm over (``--start`` and ``--end``). Finally,
you'll want to save the performance metrics of your algorithm so that you can
analyze how it performed. This is done via the ``--output`` flag and will cause
it to write the performance ``DataFrame`` in the pickle Python file format.
+30 -10
View File
@@ -16,10 +16,11 @@ could involve downloading and processing a lot of data. This can be run with:
.. code-block:: bash
$ python -m zipline ingest <bundle>
$ python -m zipline ingest [-b <bundle>]
where ``<bundle>`` is the name of the bundle to ingest.
where ``<bundle>`` is the name of the bundle to ingest, defaulting to
:ref:`quantipian-quandl <quantopian-quandl-mirror>`.
Old Data
~~~~~~~~
@@ -39,16 +40,16 @@ For example:
.. code-block:: bash
# clean everything older than <date>
$ python -m zipline clean <bundle> --before <date>
$ python -m zipline clean [-b <bundle>] --before <date>
# clean everything newer than <date>
$ python -m zipline clean <bundle> --after <date>
$ python -m zipline clean [-b <bundle>] --after <date>
# keep everything in the range of [before, after] and delete the rest
$ python -m zipline clean <bundle> --before <date> --after <after>
$ python -m zipline clean [-b <bundle>] --before <date> --after <after>
# clean all but the last <int> runs
$ python -m zipline clean <bundle> --keep-last <int>
$ python -m zipline clean [-b <bundle>] --keep-last <int>
Running Backtests with Data Bundles
@@ -77,11 +78,9 @@ Default Data Bundles
Quandl WIKI Bundle
``````````````````
By default zipline comes with the ``quandl`` data bundle which uses quandl's
`WIKI dataset <https://www.quandl.com/data/WIKI>`_. The quandl data bundle
includes daily pricing data, splits, cash dividends, and asset metadata. This is
the bundle that ``run`` will use by default if no other bundle is specified. To
includes daily pricing data, splits, cash dividends, and asset metadata. To
ingest this data bundle we recommend creating an account on quandl.com to get an
API key to be able to make more API requests per day. Once we have an API key we
may run:
@@ -104,6 +103,15 @@ attempt 5 times.
equity.
.. _quantopian-quandl-mirror:
Quantopian Quandl WIKI Mirror
'''''''''''''''''''''''''''''
Quantopian provides a mirror of the quandl WIKI dataset with the data in the
formats that zipline expects. This is available under the name:
``quantopian-quandl`` and is the default bundle for zipline.
Yahoo Bundle Factories
``````````````````````
@@ -166,7 +174,8 @@ The signature of the ingest function should be:
adjustment_writer,
calendar,
cache,
show_progress)
show_progress,
output_dir)
``environ``
```````````
@@ -275,3 +284,14 @@ the total needed, or how far into some data conversion the ingest function
is. One tool that may help with implementing ``show_progress`` for a loop is
:class:`~zipline.utils.cli.maybe_show_progress`. This argument should always be
forwarded to ``minute_bar_writer.write`` and ``daily_bar_writer.write``.
``output_dir``
``````````````
``output_dir`` is a string representing the file path where all the data will be
written. This will be some subdirectory of ``$ZIPLINE_ROOT`` and will contain
the time of the start of the current ingestion. This can be used to directly
move resources here if for some reason your ingest function can produce it's own
outputs without the writers. For example, the ``quantopian:quandl`` bundle uses
this to directly untar the bundle into the ``output_dir``.
+246 -13
View File
@@ -1,9 +1,12 @@
import os
from nose_parameterized import parameterized
import pandas as pd
from toolz import valmap
import toolz.curried.operator as op
from zipline.assets.synthetic import make_simple_equity_info
from zipline.data.bundles import load
from zipline.data.bundles import UnknownBundle
from zipline.data.bundles.core import _make_bundle_core
from zipline.lib.adjustment import Float64Multiply
from zipline.pipeline.loaders.synthetic import (
@@ -12,30 +15,39 @@ from zipline.pipeline.loaders.synthetic import (
)
from zipline.testing import (
subtest,
tmp_dir,
str_to_seconds,
tmp_trading_env,
)
from zipline.testing.fixtures import ZiplineTestCase
from zipline.testing.fixtures import WithInstanceTmpDir, ZiplineTestCase
from zipline.testing.predicates import (
assert_equal,
assert_false,
assert_in,
assert_is,
assert_is_instance,
assert_is_none,
assert_raises,
assert_true,
)
from zipline.utils.cache import dataframe_cache
from zipline.utils.functional import apply
from zipline.utils.tradingcalendar import trading_days
import zipline.utils.paths as pth
class BundleCoreTestCase(ZiplineTestCase):
_1_ns = pd.Timedelta(1, unit='ns')
class BundleCoreTestCase(WithInstanceTmpDir, ZiplineTestCase):
def init_instance_fixtures(self):
super(BundleCoreTestCase, self).init_instance_fixtures()
(self.bundles,
self.register,
self.unregister,
self.ingest) = _make_bundle_core()
self.ingest,
self.load,
self.clean) = _make_bundle_core()
self.environ = {'ZIPLINE_ROOT': self.instance_tmpdir.path}
def test_register_decorator(self):
@apply
@@ -75,17 +87,35 @@ class BundleCoreTestCase(ZiplineTestCase):
assert_false(self.bundles)
def test_register_no_create(self):
called = [False]
@self.register('bundle', create_writers=False)
def bundle_ingest(environ,
asset_db_writer,
minute_bar_writer,
daily_bar_writer,
adjustment_writer,
calendar,
cache,
show_progress,
output_dir):
assert_is_none(asset_db_writer)
assert_is_none(minute_bar_writer)
assert_is_none(daily_bar_writer)
assert_is_none(adjustment_writer)
called[0] = True
self.ingest('bundle', self.environ)
assert_true(called[0])
def test_ingest(self):
zipline_root = self.enter_instance_context(tmp_dir()).path
env = self.enter_instance_context(tmp_trading_env())
start = pd.Timestamp('2014-01-06', tz='utc')
end = pd.Timestamp('2014-01-10', tz='utc')
calendar = trading_days[trading_days.slice_indexer(start, end)]
minutes = env.minutes_for_days_in_range(calendar[0], calendar[-1])
outer_environ = {
'ZIPLINE_ROOT': zipline_root,
}
sids = tuple(range(3))
equities = make_simple_equity_info(
@@ -122,8 +152,9 @@ class BundleCoreTestCase(ZiplineTestCase):
adjustment_writer,
calendar,
cache,
show_progress):
assert_is(environ, outer_environ)
show_progress,
output_dir):
assert_is(environ, self.environ)
asset_db_writer.write(equities=equities)
minute_bar_writer.write(minute_bar_data)
@@ -134,8 +165,8 @@ class BundleCoreTestCase(ZiplineTestCase):
assert_is_instance(cache, dataframe_cache)
assert_is_instance(show_progress, bool)
self.ingest('bundle', environ=outer_environ)
bundle = load('bundle', environ=outer_environ)
self.ingest('bundle', environ=self.environ)
bundle = self.load('bundle', environ=self.environ)
assert_equal(set(bundle.asset_finder.sids), set(sids))
@@ -216,3 +247,205 @@ class BundleCoreTestCase(ZiplineTestCase):
},
msg='volume',
)
@parameterized.expand([('clean',), ('load',)])
def test_bundle_doesnt_exist(self, fnname):
with assert_raises(UnknownBundle) as e:
getattr(self, fnname)('ayy', environ=self.environ)
assert_equal(e.exception.name, 'ayy')
def test_load_no_data(self):
# register but do not ingest data
self.register('bundle', lambda *args: None)
ts = pd.Timestamp('2014')
with assert_raises(ValueError) as e:
self.load('bundle', timestamp=ts, environ=self.environ)
assert_in(
"no data for bundle 'bundle' on or before %s" % ts,
str(e.exception),
)
def _list_bundle(self):
return {
os.path.join(pth.data_path(['bundle', d], environ=self.environ))
for d in os.listdir(
pth.data_path(['bundle'], environ=self.environ),
)
}
def _empty_ingest(self, _wrote_to=[]):
"""Run the nth empty ingest.
Returns
-------
wrote_to : str
The timestr of the bundle written.
"""
if not self.bundles:
@self.register('bundle',
calendar=pd.DatetimeIndex([pd.Timestamp('2014')]))
def _(environ,
asset_db_writer,
minute_bar_writer,
daily_bar_writer,
adjustment_writer,
calendar,
cache,
show_progress,
output_dir):
_wrote_to.append(output_dir)
_wrote_to.clear()
self.ingest('bundle', environ=self.environ)
assert_equal(len(_wrote_to), 1, msg='ingest was called more than once')
ingestions = self._list_bundle()
assert_in(
_wrote_to[0],
ingestions,
msg='output_dir was not in the bundle directory',
)
return _wrote_to[0]
def test_clean_keep_last(self):
first = self._empty_ingest()
assert_equal(
self.clean('bundle', keep_last=1, environ=self.environ),
set(),
)
assert_equal(
self._list_bundle(),
{first},
msg='directory should not have changed',
)
second = self._empty_ingest()
assert_equal(
self._list_bundle(),
{first, second},
msg='two ingestions are not present',
)
assert_equal(
self.clean('bundle', keep_last=1, environ=self.environ),
{first},
)
assert_equal(
self._list_bundle(),
{second},
msg='first ingestion was not removed with keep_last=2',
)
third = self._empty_ingest()
fourth = self._empty_ingest()
fifth = self._empty_ingest()
assert_equal(
self._list_bundle(),
{second, third, fourth, fifth},
msg='larger set of ingestions did not happen correctly',
)
assert_equal(
self.clean('bundle', keep_last=2, environ=self.environ),
{second, third},
)
assert_equal(
self._list_bundle(),
{fourth, fifth},
msg='keep_last=2 did not remove the correct number of ingestions',
)
@staticmethod
def _ts_of_run(run):
return pd.Timestamp(int(run.rsplit(os.path.sep, 1)[-1]))
def test_clean_before_after(self):
first = self._empty_ingest()
assert_equal(
self.clean(
'bundle',
before=self._ts_of_run(first),
environ=self.environ,
),
set(),
)
assert_equal(
self._list_bundle(),
{first},
msg='directory should not have changed (before)',
)
assert_equal(
self.clean(
'bundle',
after=self._ts_of_run(first),
environ=self.environ,
),
set(),
)
assert_equal(
self._list_bundle(),
{first},
msg='directory should not have changed (after)',
)
assert_equal(
self.clean(
'bundle',
before=self._ts_of_run(first) + _1_ns,
environ=self.environ,
),
{first},
)
assert_equal(
self._list_bundle(),
set(),
msg='directory now be empty (before)',
)
second = self._empty_ingest()
assert_equal(
self.clean(
'bundle',
after=self._ts_of_run(second) - _1_ns,
environ=self.environ,
),
{second},
)
assert_equal(
self._list_bundle(),
set(),
msg='directory now be empty (after)',
)
third = self._empty_ingest()
fourth = self._empty_ingest()
fifth = self._empty_ingest()
sixth = self._empty_ingest()
assert_equal(
self._list_bundle(),
{third, fourth, fifth, sixth},
msg='larger set of ingestions did no happen correctly',
)
assert_equal(
self.clean(
'bundle',
before=self._ts_of_run(fourth),
after=self._ts_of_run(fifth),
environ=self.environ,
),
{third, sixth},
)
assert_equal(
self._list_bundle(),
{fourth, fifth},
msg='did not strip first and last directories',
)
+5 -3
View File
@@ -12,9 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This code is based on a unittest written by John Salvatier:
# https://github.com/pymc-devs/pymc/blob/pymc3/tests/test_examples.py
from functools import partial
import tarfile
import matplotlib
@@ -22,6 +20,7 @@ from nose_parameterized import parameterized
import pandas as pd
from zipline import examples, run_algorithm
from zipline.data.bundles import register, unregister
from zipline.testing import test_resource_path
from zipline.testing.fixtures import WithTmpDir, ZiplineTestCase
from zipline.testing.predicates import assert_equal
@@ -76,6 +75,9 @@ class ExamplesTests(WithTmpDir, ZiplineTestCase):
def init_class_fixtures(cls):
super(ExamplesTests, cls).init_class_fixtures()
register('test', lambda *args: None)
cls.add_class_callback(partial(unregister, 'test'))
with tarfile.open(test_resource_path('example_data.tar.gz')) as tar:
tar.extractall(cls.tmpdir.path)
+22 -9
View File
@@ -1,4 +1,3 @@
import datetime
import os
from functools import wraps
@@ -137,7 +136,7 @@ def ipython_only(option):
@click.option(
'-b',
'--bundle',
default='quandl',
default='quantopian-quandl',
metavar='BUNDLE-NAME',
show_default=True,
help='The data bundle to use for the simulation.',
@@ -275,26 +274,40 @@ def zipline_magic(line, cell=None):
@cli.command()
@click.argument('BUNDLE-NAME')
@click.option(
'-b',
'--bundle',
default='quantopian-quandl',
metavar='BUNDLE-NAME',
show_default=True,
help='The data bundle to ingest.',
)
@click.option(
'--show-progress/--no-show-progress',
is_flag=True,
default=True,
help='Print progress information to the terminal.'
)
def ingest(bundle_name, show_progress):
def ingest(bundle, show_progress):
"""Ingest the data for the given bundle.
"""
bundles.ingest(
bundle_name,
bundle,
os.environ,
datetime.date.today(),
pd.Timestamp.utcnow(),
show_progress,
)
@cli.command()
@click.argument('BUNDLE-NAME')
@click.option(
'-b',
'--bundle',
default='quantopian-quandl',
metavar='BUNDLE-NAME',
show_default=True,
help='The data bundle to clean.',
)
@click.option(
'-b',
'--before',
@@ -317,11 +330,11 @@ def ingest(bundle_name, show_progress):
help='Clear all but the last N downloads.'
' This may not be passed with -b / --before or -a / --after',
)
def clean(bundle_name, before, after, keep_last):
def clean(bundle, before, after, keep_last):
"""Clean up data downloaded with the ingest command.
"""
bundles.clean(
bundle_name,
bundle,
before,
after,
keep_last,
+3
View File
@@ -1,4 +1,6 @@
from . import quandl # noqa
from .core import (
UnknownBundle,
bundles,
clean,
ingest,
@@ -10,6 +12,7 @@ from .yahoo import yahoo_equities
__all__ = [
'UnknownBundle',
'bundles',
'clean',
'ingest',
+247 -214
View File
@@ -4,6 +4,7 @@ import os
import shutil
import warnings
from contextlib2 import ExitStack
import click
import pandas as pd
from toolz import curry, complement, compose
@@ -59,16 +60,26 @@ def adjustment_db_path(bundle_name, timestr, environ=None):
)
def cache_path(bundle_name, timestr, environ=None):
def cache_path(bundle_name, environ=None):
return pth.data_path(
[bundle_name, timestr, '.cache'],
[bundle_name, '.cache'],
environ=environ,
)
_BundlePayload = namedtuple(
'_BundlePayload',
'calendar opens closes minutes_per_day ingest',
'calendar opens closes minutes_per_day ingest create_writers',
)
BundleData = namedtuple(
'BundleData',
'asset_finder minute_bar_reader daily_bar_reader adjustment_reader',
)
BundleCore = namedtuple(
'BundleCore',
'bundles register unregister ingest load clean',
)
@@ -87,6 +98,33 @@ class UnknownBundle(click.ClickException, LookupError):
return self.message
class BadClean(click.ClickException, ValueError):
"""Exception indicating that an invalid argument set was passed to
``clean``.
Parameters
----------
before, after, keep_last : any
The bad arguments to ``clean``.
See Also
--------
clean
"""
def __init__(self, before, after, keep_last):
super(BadClean, self).__init__(
'Cannot pass a combination of `before` and `after` with'
'`keep_last`. Got: before=%r, after=%r, keep_n=%r\n' % (
before,
after,
keep_last,
),
)
def __str__(self):
return self.message
def _make_bundle_core():
"""Create a family of data bundle functions that read from the same
bundle mapping.
@@ -99,8 +137,12 @@ def _make_bundle_core():
The function which registers new bundles in the ``bundles`` mapping.
unregister : callable
The function which deregisters bundles from the ``bundles`` mapping.
ingest_bundle : callable
ingest : callable
The function which downloads and write data for a given data bundle.
load : callable
The function which loads the ingested bundles back into memory.
clean : callable
The function which cleans up data written with ``ingest``.
"""
_bundles = {} # the registered bundles
# Expose _bundles through a proxy so that users cannot mutate this
@@ -114,7 +156,8 @@ def _make_bundle_core():
calendar=trading_days,
opens=open_and_closes['market_open'],
closes=open_and_closes['market_close'],
minutes_per_day=390):
minutes_per_day=390,
create_writers=True):
"""Register a data bundle ingest function.
Parameters
@@ -154,6 +197,10 @@ def _make_bundle_core():
NYSE calendar.
minutes_per_day : int, optional
The number of minutes in each normal trading day.
create_writers : bool, optional
Should the ingest machinery create the writers for the ingest
function. This can be disabled as an optimization for cases where
they are not needed, like the ``quantopian-quandl`` bundle.
Notes
-----
@@ -180,6 +227,7 @@ def _make_bundle_core():
closes,
minutes_per_day,
f,
create_writers,
)
return f
@@ -208,7 +256,7 @@ def _make_bundle_core():
def ingest(name,
environ=os.environ,
timestamp=None,
show_progress=True):
show_progress=False):
"""Ingest data for a given bundle.
Parameters
@@ -232,238 +280,223 @@ def _make_bundle_core():
timestamp = pd.Timestamp.utcnow()
timestamp = timestamp.tz_convert('utc').tz_localize(None)
timestr = str(timestamp.value)
cachepath = cache_path(name, timestr, environ=environ)
cachepath = cache_path(name, environ=environ)
pth.ensure_directory(pth.data_path([name, timestr], environ=environ))
pth.ensure_directory(cachepath)
with dataframe_cache(cachepath, clean_on_failure=False) as cache, \
working_dir(
daily_equity_path(name, timestr, environ=environ),
) as daily_bars_dir, \
working_dir(
minute_equity_path(name, timestr, environ=environ),
) as minute_bars_dir, \
working_file(
asset_db_path(name, timestr, environ=environ),
) as asset_db_file, \
working_file(
adjustment_db_path(name, timestr, environ=environ),
) as adjustment_db_file:
ExitStack() as stack:
# we use `cleanup_on_failure=False` so that we don't purge the
# cache directory if the load fails in the middle
daily_bar_writer = BcolzDailyBarWriter(
daily_bars_dir.name,
bundle.calendar,
)
# Do an empty write to ensure that the daily ctables exist
# when we create the SQLiteAdjustmentWriter below. The
# SQLiteAdjustmentWriter needs to open the daily ctables so that
# it can compute the adjustment ratios for the dividends.
daily_bar_writer.write(())
bundle.ingest(
environ,
AssetDBWriter(asset_db_file.name),
BcolzMinuteBarWriter(
if bundle.create_writers:
daily_bars_path = stack.enter_context(working_dir(
daily_equity_path(name, timestr, environ=environ),
)).path
daily_bar_writer = BcolzDailyBarWriter(
daily_bars_path,
bundle.calendar,
)
# Do an empty write to ensure that the daily ctables exist
# when we create the SQLiteAdjustmentWriter below. The
# SQLiteAdjustmentWriter needs to open the daily ctables so
# that it can compute the adjustment ratios for the dividends.
daily_bar_writer.write(())
minute_bar_writer = BcolzMinuteBarWriter(
bundle.calendar[0],
minute_bars_dir.name,
stack.enter_context(working_dir(
minute_equity_path(name, timestr, environ=environ),
)).path,
bundle.opens,
bundle.closes,
minutes_per_day=bundle.minutes_per_day,
),
daily_bar_writer,
SQLiteAdjustmentWriter(
adjustment_db_file.name,
BcolzDailyBarReader(daily_bars_dir.name),
)
asset_db_writer = AssetDBWriter(
stack.enter_context(working_file(
asset_db_path(name, timestr, environ=environ),
)).path,
)
adjustment_db_writer = SQLiteAdjustmentWriter(
stack.enter_context(working_file(
adjustment_db_path(name, timestr, environ=environ),
)).path,
BcolzDailyBarReader(daily_bars_path),
bundle.calendar,
overwrite=True,
),
)
else:
daily_bar_writer = None
minute_bar_writer = None
asset_db_writer = None
adjustment_db_writer = None
bundle.ingest(
environ,
asset_db_writer,
minute_bar_writer,
daily_bar_writer,
adjustment_db_writer,
bundle.calendar,
cache,
show_progress,
pth.data_path([name, timestr], environ=environ),
)
return bundles, register, unregister, ingest
def most_recent_data(bundle_name, timestamp, environ=None):
"""Get the path to the most recent data after ``date``for the
given bundle.
Parameters
----------
bundle_name : str
The name of the bundle to lookup.
timestamp : datetime
The timestamp to begin searching on or before.
environ : dict, optional
An environment dict to forward to zipline_root.
"""
if bundle_name not in bundles:
raise UnknownBundle(bundle_name)
bundles, register, unregister, ingest = _make_bundle_core()
try:
candidates = os.listdir(
pth.data_path([bundle_name], environ=environ),
)
return pth.data_path(
[bundle_name,
max(
filter(complement(pth.hidden), candidates),
key=compose(pd.Timestamp, int),
)],
environ=environ,
)
except (ValueError, OSError) as e:
if getattr(e, 'errno', ~errno.ENOENT) != errno.ENOENT:
raise
raise ValueError(
'no data for bundle %r on or before %s' % (
bundle_name,
timestamp,
),
)
BundleData = namedtuple(
'BundleData',
'asset_finder minute_bar_reader daily_bar_reader adjustment_reader',
)
def load(name, environ=os.environ, timestamp=None):
"""Loads a previously ingested bundle.
Parameters
----------
name : str
The name of the bundle.
environ : mapping, optional
The environment variables. Defaults of os.environ.
timestamp : datetime, optional
The timestamp of the data to lookup.
Defaults to the current time.
def most_recent_data(bundle_name, timestamp, environ=None):
"""Get the path to the most recent data after ``date``for the given bundle.
Parameters
----------
bundle_name : str
The name of the bundle to lookup.
timestamp : datetime
The timestamp to begin searching on or before.
environ : dict, optional
An environment dict to forward to zipline_root.
"""
try:
candidates = os.listdir(pth.data_path([bundle_name], environ=environ))
return pth.data_path(
[bundle_name,
max(
filter(complement(pth.hidden), candidates),
key=compose(pd.Timestamp, int),
)],
environ=environ,
)
except ValueError:
raise ValueError(
'no data for bundle %r on or before %s' % (
bundle_name,
timestamp,
Returns
-------
bundle_data : BundleData
The raw data readers for this bundle.
"""
if timestamp is None:
timestamp = pd.Timestamp.utcnow()
timestr = most_recent_data(name, timestamp, environ=environ)
return BundleData(
asset_finder=AssetFinder(
asset_db_path(name, timestr, environ=environ),
),
minute_bar_reader=BcolzMinuteBarReader(
minute_equity_path(name, timestr, environ=environ),
),
daily_bar_reader=BcolzDailyBarReader(
daily_equity_path(name, timestr, environ=environ),
),
adjustment_reader=SQLiteAdjustmentReader(
adjustment_db_path(name, timestr, environ=environ),
),
)
except OSError as e:
if e.errno != errno.ENOENT:
raise
raise UnknownBundle(bundle_name)
def load(name, environ=os.environ, timestamp=None):
"""Loads a previously ingested bundle.
Parameters
----------
name : str
The name of the bundle.
environ : mapping, optional
The environment variables. Defaults of os.environ.
timestamp : datetime, optional
The timestamp of the data to lookup.
Defaults to the current time.
Returns
-------
bundle_data : BundleData
The raw data readers for this bundle.
"""
if timestamp is None:
timestamp = pd.Timestamp.utcnow()
timestr = most_recent_data(name, timestamp, environ=environ)
return BundleData(
asset_finder=AssetFinder(
asset_db_path(name, timestr, environ=environ),
),
minute_bar_reader=BcolzMinuteBarReader(
minute_equity_path(name, timestr, environ=environ),
),
daily_bar_reader=BcolzDailyBarReader(
daily_equity_path(name, timestr, environ=environ),
),
adjustment_reader=SQLiteAdjustmentReader(
adjustment_db_path(name, timestr, environ=environ),
),
@preprocess(
before=optionally(ensure_timestamp),
after=optionally(ensure_timestamp),
)
def clean(name,
before=None,
after=None,
keep_last=None,
environ=os.environ):
"""Clean up data that was created with ``ingest`` or
``$ python -m zipline ingest``
Parameters
----------
name : str
The name of the bundle to remove data for.
before : datetime, optional
Remove data ingested before this date.
This argument is mutually exclusive with: keep_last
after : datetime, optional
Remove data ingested after this date.
This argument is mutually exclusive with: keep_last
keep_last : int, optional
Remove all but the last ``keep_last`` ingestions.
This argument is mutually exclusive with:
before
after
environ : mapping, optional
The environment variables. Defaults of os.environ.
Returns
-------
cleaned : set[str]
The names of the runs that were removed.
Raises
------
BadClean
Raised when ``before`` and or ``after`` are passed with
``keep_last``. This is a subclass of ``ValueError``.
"""
try:
all_runs = sorted(
filter(
complement(pth.hidden),
os.listdir(pth.data_path([name], environ=environ)),
),
key=compose(pd.Timestamp, int),
)
except OSError as e:
if e.errno != errno.ENOENT:
raise
raise UnknownBundle(name)
if ((before is not None or after is not None) and
keep_last is not None):
raise BadClean(before, after, keep_last)
if keep_last is None:
def should_clean(name):
dt = pd.Timestamp(int(name))
return (
(before is not None and dt < before) or
(after is not None and dt > after)
)
else:
last_n_dts = set(all_runs[-keep_last:])
def should_clean(name):
return name not in last_n_dts
cleaned = set()
for run in all_runs:
if should_clean(run):
path = pth.data_path([name, run], environ=environ)
shutil.rmtree(path)
cleaned.add(path)
return cleaned
return BundleCore(bundles, register, unregister, ingest, load, clean)
class BadClean(click.ClickException, ValueError):
"""Exception indicating that an invalid argument set was passed to
``clean``.
Parameters
----------
before, after, keep_last : any
The bad arguments to ``clean``.
See Also
--------
clean
"""
def __init__(self, before, after, keep_last):
super(BadClean, self).__init__(
'Cannot pass a combination of `before` and `after` with'
'`keep_last`. Got: before=%r, after=%r, keep_n=%r\n' % (
before,
after,
keep_last,
),
)
def __str__(self):
return self.message
@preprocess(
before=optionally(ensure_timestamp),
after=optionally(ensure_timestamp),
)
def clean(name, before=None, after=None, keep_last=None, environ=os.environ):
"""Clean up data that was created with ``ingest`` or
``$ python -m zipline ingest``
Parameters
----------
name : str
The name of the bundle to remove data for.
before : datetime, optional
Remove data ingested before this date.
This argument is mutually exclusive with: keep_last
after : datetime, optional
Remove data ingested after this date.
This argument is mutually exclusive with: keep_last
keep_last : int, optional
Remove all but the last ``keep_last`` ingestions.
This argument is mutually exclusive with:
before
after
Returns
-------
cleaned : set[str]
The names of the runs that were removed.
Raises
------
BadClean
Raised when ``before`` and or ``after`` are passed with ``keep_last``.
This is a subclass of ``ValueError``.
"""
try:
all_runs = sorted(
pd.Timestamp(f)
for f in os.listdir(pth.data_path([name], environ=environ))
if not pth.hidden(f)
)
except OSError as e:
if e.errno != errno.ENOENT:
raise
raise UnknownBundle(name)
if (before is not None or after is not None) and keep_last is not None:
raise BadClean(before, after, keep_last)
if keep_last is None:
def in_last_n(dt):
return False
else:
last_n_dts = set(all_runs[:keep_last])
def in_last_n(dt):
return dt in last_n_dts
def should_clean(name):
dt = pd.Timestamp(name)
return (
(
(before is not None and dt < before) or
(after is not None and dt > after)
) and
not in_last_n(dt)
)
cleaned = set()
for run in all_runs:
if should_clean(run):
shutil.rmdir(run)
cleaned.add(run)
return cleaned
bundles, register, unregister, ingest, load, clean = _make_bundle_core()
+30 -3
View File
@@ -1,15 +1,19 @@
"""
Module for building a complete daily dataset from Quandl's WIKI dataset.
"""
from contextlib import closing
from io import BytesIO
from itertools import count
import tarfile
from time import time, sleep
from logbook import Logger
import pandas as pd
from six.moves.urllib.parse import urlencode
from six.moves.urllib.request import urlopen
from . import core as bundles
from zipline.utils.cli import maybe_show_progress
from zipline.data import bundles
log = Logger(__name__)
seconds_per_call = (pd.Timedelta('10 minutes') / 2000).total_seconds()
@@ -260,12 +264,13 @@ def gen_symbol_data(api_key,
@bundles.register('quandl')
def quandl_bundle(environ,
asset_db_writer,
minute_bar_writer, # unused
minute_bar_writer,
daily_bar_writer,
adjustment_writer,
calendar,
cache,
show_progress):
show_progress,
output_dir):
"""Build a zipline data bundle from the Quandl WIKI dataset.
"""
api_key = environ.get('QUANDL_API_KEY')
@@ -291,8 +296,30 @@ def quandl_bundle(environ,
dividends,
environ.get('QUANDL_DOWNLOAD_ATTEMPTS', 5),
),
show_progress=show_progress,
)
adjustment_writer.write(
splits=pd.concat(splits, ignore_index=True),
dividends=pd.concat(dividends, ignore_index=True),
)
QUANTOPIAN_QUANDL_URL = (
'https://s3.amazonaws.com/quantopian-public-zipline-data/quandl'
)
@bundles.register('quantopian-quandl', create_writers=False)
def quantopian_quandl_bundle(environ,
asset_db_writer,
minute_bar_writer,
daily_bar_writer,
adjustment_writer,
calendar,
cache,
show_progress,
output_dir):
# use closing for py2 compat
with closing(urlopen(QUANTOPIAN_QUANDL_URL)) as f, \
tarfile.open('r', fileobj=BytesIO(f.read())) as tar:
tar.extractall(output_dir)
+1
View File
@@ -62,6 +62,7 @@ def yahoo_equities(symbols, start=None, end=None):
calendar,
cache,
show_progress,
output_dir,
# pass these as defaults to make them 'nonlocal' in py2
start=start,
end=end):
+32 -3
View File
@@ -280,6 +280,13 @@ class working_file(object):
self._tmpfile = NamedTemporaryFile(*args, **kwargs)
self._final_path = final_path
@property
def path(self):
"""Alias for ``name`` to be consistent with
:class:`~zipline.utils.cache.working_dir`.
"""
return self._tmpfile.name
def _commit(self):
"""Sync the temporary file to the final path.
"""
@@ -316,13 +323,35 @@ class working_dir(object):
meaning it has as strong of guarantees as :func:`shutil.copytree`.
"""
def __init__(self, final_path, *args, **kwargs):
self.name = mkdtemp()
self.path = mkdtemp()
self._final_path = final_path
def mkdir(self, *path_parts):
"""Create a subdirectory of the working directory.
Parameters
----------
path_parts : iterable[str]
The parts of the path after the working directory.
"""
path = self.getpath(*path_parts)
os.mkdir(path)
return path
def getpath(self, *path_parts):
"""Get a path relative to the working directory.
Parameters
----------
path_parts : iterable[str]
The parts of the path after the working directory.
"""
return os.path.join(self.path, *path_parts)
def _commit(self):
"""Sync the temporary directory to the final path.
"""
copytree(self.name, self._final_path)
copytree(self.path, self._final_path)
def __enter__(self):
return self
@@ -330,4 +359,4 @@ class working_dir(object):
def __exit__(self, *exc_info):
if exc_info[0] is None:
self._commit()
rmtree(self.name)
rmtree(self.path)
+3 -3
View File
@@ -263,7 +263,7 @@ def run_algorithm(start,
``bundle_timestamp``
bundle : str, optional
The name of the data bundle to use to load the data to run the backtest
with. This defaults to 'quandl'.
with. This defaults to 'quantopian-quandl'.
This argument is mutually exclusive with ``data``.
bundle_timestamp : datetime, optional
The datetime to lookup the bundle data for. This defaults to the
@@ -299,8 +299,8 @@ def run_algorithm(start,
'bundle': bundle,
})
if not non_none_data:
# if neither data nor bundle are passed use 'quandl'
bundle = 'quandl'
# if neither data nor bundle are passed use 'quantopian-quandl'
bundle = 'quantopian-quandl'
if len(non_none_data) != 1:
raise ValueError(