From be2fe3558325d3416fa2e650257d856e13feb3b7 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Sun, 18 Jun 2017 19:13:30 -0700 Subject: [PATCH] Crypto integration + multiple pipeline bundles --- .gitignore | 3 + zipline/algorithm.py | 6 +- zipline/data/bundles/__init__.py | 2 + zipline/data/bundles/catalyst.py | 44 + zipline/data/data_portal.py | 2 + zipline/examples/momentum_pipeline.py | 11 +- zipline/pipeline/data/__init__.py | 2 + zipline/pipeline/data/crypto_pricing.py | 15 + zipline/pipeline/data/equity_pricing.py | 12 +- zipline/pipeline/factors/__init__.py | 44 - zipline/pipeline/factors/crypto/__init__.py | 47 + zipline/pipeline/factors/crypto/technical.py | 812 ++++++++++++++++++ zipline/pipeline/factors/equity/__init__.py | 47 + .../factors/{ => equity}/technical.py | 2 +- zipline/pipeline/loaders/__init__.py | 2 + zipline/pipeline/loaders/base.py | 4 + .../pipeline/loaders/crypto_pricing_loader.py | 136 +++ .../pipeline/loaders/equity_pricing_loader.py | 7 +- zipline/utils/calendars/calendar_utils.py | 5 + .../utils/calendars/exchange_calendar_open.py | 29 + zipline/utils/events.py | 5 +- zipline/utils/run_algo.py | 107 ++- 22 files changed, 1250 insertions(+), 94 deletions(-) create mode 100644 zipline/data/bundles/catalyst.py create mode 100644 zipline/pipeline/data/crypto_pricing.py create mode 100644 zipline/pipeline/factors/crypto/__init__.py create mode 100644 zipline/pipeline/factors/crypto/technical.py create mode 100644 zipline/pipeline/factors/equity/__init__.py rename zipline/pipeline/factors/{ => equity}/technical.py (99%) create mode 100644 zipline/pipeline/loaders/crypto_pricing_loader.py create mode 100644 zipline/utils/calendars/exchange_calendar_open.py diff --git a/.gitignore b/.gitignore index 8ca672cf..af955fbd 100644 --- a/.gitignore +++ b/.gitignore @@ -71,4 +71,7 @@ zipline.iml # PyCharm custom settings .idea +# Pickle files +*.pickle + TAGS diff --git a/zipline/algorithm.py b/zipline/algorithm.py index 44f2a1df..afe809b2 
100644 --- a/zipline/algorithm.py +++ b/zipline/algorithm.py @@ -289,7 +289,7 @@ class TradingAlgorithm(object): # If a schedule has been provided, pop it. Otherwise, use NYSE. self.trading_calendar = kwargs.pop( 'trading_calendar', - get_calendar("NYSE") + get_calendar('NYSE') ) self.sim_params = kwargs.pop('sim_params', None) @@ -1114,6 +1114,8 @@ class TradingAlgorithm(object): # TradingSchedule class, so this is unlikely to be hit if calendar is None: cal = self.trading_calendar + elif calendar is calendars.CRYPTO_ASSETS: + cal = get_calendar('OPEN') elif calendar is calendars.US_EQUITIES: cal = get_calendar('NYSE') elif calendar is calendars.US_FUTURES: @@ -1122,7 +1124,7 @@ class TradingAlgorithm(object): raise ScheduleFunctionInvalidCalendar( given_calendar=calendar, allowed_calendars=( - '[calendars.US_EQUITIES, calendars.US_FUTURES]' + '[calendars.CRYPTO_ASSETS, calendars.US_EQUITIES, calendars.US_FUTURES]' ), ) diff --git a/zipline/data/bundles/__init__.py b/zipline/data/bundles/__init__.py index 40746c6c..c75bae5a 100644 --- a/zipline/data/bundles/__init__.py +++ b/zipline/data/bundles/__init__.py @@ -1,5 +1,6 @@ # These imports are necessary to force module-scope register calls to happen. from . import quandl # noqa +from . import catalyst # noqa from .core import ( UnknownBundle, bundles, @@ -15,6 +16,7 @@ from .core import ( from .yahoo import yahoo_equities + __all__ = [ 'UnknownBundle', 'bundles', diff --git a/zipline/data/bundles/catalyst.py b/zipline/data/bundles/catalyst.py new file mode 100644 index 00000000..f7b2ca2a --- /dev/null +++ b/zipline/data/bundles/catalyst.py @@ -0,0 +1,44 @@ +import tarfile + +from .quandl import ( + ONE_MEGABYTE, + download_with_progress, + download_without_progress, +) + +from . 
import core as bundles + +CATALYST_URL = ( + 'https://s3.amazonaws.com/quantopian-public-zipline-data/quandl' +) + +@bundles.register( + 'catalyst', + calendar_name='NYSE', + minutes_per_day=390, + create_writers=False, +) +def catalyst_bundle(environ, + asset_db_writer, + minute_bar_writer, + daily_bar_writer, + adjustment_writer, + calendar, + start_session, + end_session, + cache, + show_progress, + output_dir): + if show_progress: + data = download_with_progress( + CATALYST_URL, + chunk_size=ONE_MEGABYTE, + label="Downloading Bundle: catalyst", + ) + else: + data = download_without_progress(CATALYST_URL) + + with tarfile.open('r', fileobj=data) as tar: + if show_progress: + print("Writing data to %s." % output_dir) + tar.extractall(output_dir) diff --git a/zipline/data/data_portal.py b/zipline/data/data_portal.py index 65d614d8..7d1ec286 100644 --- a/zipline/data/data_portal.py +++ b/zipline/data/data_portal.py @@ -154,6 +154,8 @@ class DataPortal(object): minute_history_prefetch_length=_DEF_M_HIST_PREFETCH, daily_history_prefetch_length=_DEF_D_HIST_PREFETCH): + print('trading_calendar: {0}'.format(trading_calendar)) + self.trading_calendar = trading_calendar self.asset_finder = asset_finder diff --git a/zipline/examples/momentum_pipeline.py b/zipline/examples/momentum_pipeline.py index 73e8da03..a6a3d005 100644 --- a/zipline/examples/momentum_pipeline.py +++ b/zipline/examples/momentum_pipeline.py @@ -12,15 +12,18 @@ from zipline.api import ( schedule_function, ) from zipline.pipeline import Pipeline -from zipline.pipeline.factors import RSI +from zipline.pipeline.factors.crypto import RSI as cRSI +from zipline.pipeline.factors.equity import RSI as eRSI def make_pipeline(): - rsi = RSI() + crsi = cRSI() + ersi = eRSI() return Pipeline( columns={ - 'longs': rsi.top(3), - 'shorts': rsi.bottom(3), + 'longs': crsi.top(3), + 'shorts': crsi.bottom(3), + 'equity': ersi.top(3), }, ) diff --git a/zipline/pipeline/data/__init__.py b/zipline/pipeline/data/__init__.py 
index b4066453..2d03d8b2 100644 --- a/zipline/pipeline/data/__init__.py +++ b/zipline/pipeline/data/__init__.py @@ -1,4 +1,5 @@ from .equity_pricing import USEquityPricing +from .crypto_pricing import CryptoPricing from .dataset import DataSet, Column, BoundColumn __all__ = [ @@ -6,4 +7,5 @@ __all__ = [ 'Column', 'DataSet', 'USEquityPricing', + 'CryptoPricing', ] diff --git a/zipline/pipeline/data/crypto_pricing.py b/zipline/pipeline/data/crypto_pricing.py new file mode 100644 index 00000000..1e97d08f --- /dev/null +++ b/zipline/pipeline/data/crypto_pricing.py @@ -0,0 +1,15 @@ + +from zipline.utils.numpy_utils import float64_dtype + +from .dataset import Column, DataSet + + +class CryptoPricing(DataSet): + """ + Dataset representing daily trading prices and volumes of crypto-assets. + """ + open = Column(float64_dtype) + high = Column(float64_dtype) + low = Column(float64_dtype) + close = Column(float64_dtype) + volume = Column(float64_dtype) diff --git a/zipline/pipeline/data/equity_pricing.py b/zipline/pipeline/data/equity_pricing.py index aacb0e95..e81489d9 100644 --- a/zipline/pipeline/data/equity_pricing.py +++ b/zipline/pipeline/data/equity_pricing.py @@ -1,6 +1,4 @@ -""" -Dataset representing OHLCV data. -""" + from zipline.utils.numpy_utils import float64_dtype from .dataset import Column, DataSet @@ -10,8 +8,8 @@ class USEquityPricing(DataSet): """ Dataset representing daily trading prices and volumes. 
""" - open = Column(float64_dtype) - high = Column(float64_dtype) - low = Column(float64_dtype) - close = Column(float64_dtype) + open = Column(float64_dtype) + high = Column(float64_dtype) + low = Column(float64_dtype) + close = Column(float64_dtype) volume = Column(float64_dtype) diff --git a/zipline/pipeline/factors/__init__.py b/zipline/pipeline/factors/__init__.py index 0cd539c8..57ae0518 100644 --- a/zipline/pipeline/factors/__init__.py +++ b/zipline/pipeline/factors/__init__.py @@ -13,59 +13,15 @@ from .statistical import ( RollingPearsonOfReturns, RollingSpearmanOfReturns, ) -from .technical import ( - AnnualizedVolatility, - Aroon, - AverageDollarVolume, - BollingerBands, - EWMA, - EWMSTD, - ExponentialWeightedMovingAverage, - ExponentialWeightedMovingStdDev, - FastStochasticOscillator, - IchimokuKinkoHyo, - LinearWeightedMovingAverage, - MACDSignal, - MaxDrawdown, - MovingAverageConvergenceDivergenceSignal, - RateOfChangePercentage, - Returns, - RSI, - SimpleMovingAverage, - TrueRange, - VWAP, - WeightedAverageValue, -) __all__ = [ - 'AnnualizedVolatility', - 'Aroon', - 'AverageDollarVolume', - 'BollingerBands', 'BusinessDaysSincePreviousEvent', 'BusinessDaysUntilNextEvent', 'CustomFactor', - 'EWMA', - 'EWMSTD', - 'ExponentialWeightedMovingAverage', - 'ExponentialWeightedMovingStdDev', 'Factor', - 'FastStochasticOscillator', - 'IchimokuKinkoHyo', 'Latest', - 'LinearWeightedMovingAverage', - 'MACDSignal', - 'MaxDrawdown', - 'MovingAverageConvergenceDivergenceSignal', - 'RateOfChangePercentage', 'RecarrayField', - 'Returns', 'RollingLinearRegressionOfReturns', 'RollingPearsonOfReturns', 'RollingSpearmanOfReturns', - 'RSI', - 'SimpleMovingAverage', - 'TrueRange', - 'VWAP', - 'WeightedAverageValue', ] diff --git a/zipline/pipeline/factors/crypto/__init__.py b/zipline/pipeline/factors/crypto/__init__.py new file mode 100644 index 00000000..aa16cfbe --- /dev/null +++ b/zipline/pipeline/factors/crypto/__init__.py @@ -0,0 +1,47 @@ +from .technical import ( + 
AnnualizedVolatility, + Aroon, + AverageDollarVolume, + BollingerBands, + EWMA, + EWMSTD, + ExponentialWeightedMovingAverage, + ExponentialWeightedMovingStdDev, + FastStochasticOscillator, + IchimokuKinkoHyo, + LinearWeightedMovingAverage, + MACDSignal, + MaxDrawdown, + MovingAverageConvergenceDivergenceSignal, + RateOfChangePercentage, + Returns, + RSI, + SimpleMovingAverage, + TrueRange, + VWAP, + WeightedAverageValue, +) + +__all__ = [ + 'AnnualizedVolatility', + 'Aroon', + 'AverageDollarVolume', + 'BollingerBands', + 'EWMA', + 'EWMSTD', + 'ExponentialWeightedMovingAverage', + 'ExponentialWeightedMovingStdDev', + 'FastStochasticOscillator', + 'IchimokuKinkoHyo', + 'LinearWeightedMovingAverage', + 'MACDSignal', + 'MaxDrawdown', + 'MovingAverageConvergenceDivergenceSignal', + 'RateOfChangePercentage', + 'Returns', + 'RSI', + 'SimpleMovingAverage', + 'TrueRange', + 'VWAP', + 'WeightedAverageValue', +] diff --git a/zipline/pipeline/factors/crypto/technical.py b/zipline/pipeline/factors/crypto/technical.py new file mode 100644 index 00000000..7bd754e2 --- /dev/null +++ b/zipline/pipeline/factors/crypto/technical.py @@ -0,0 +1,812 @@ +""" +Technical Analysis Factors +-------------------------- +""" +from __future__ import division + +from numbers import Number +from numpy import ( + abs, + arange, + average, + clip, + diff, + dstack, + exp, + fmax, + full, + inf, + isnan, + log, + NINF, + sqrt, + sum as np_sum, +) +from numexpr import evaluate + +from zipline.pipeline.data import CryptoPricing +from zipline.pipeline.mixins import SingleInputMixin +from zipline.utils.input_validation import expect_bounded, expect_types +from zipline.utils.math_utils import ( + nanargmax, + nanargmin, + nanmax, + nanmean, + nanstd, + nansum, + nanmin, +) +from zipline.utils.numpy_utils import ( + float64_dtype, + ignore_nanwarnings, + rolling_window, +) +from ..factor import CustomFactor + + +class Returns(CustomFactor): + """ + Calculates the percent change in close price over the 
given window_length. + + **Default Inputs**: [CryptoPricing.close] + """ + inputs = [CryptoPricing.close] + window_safe = True + + def _validate(self): + super(Returns, self)._validate() + if self.window_length < 2: + raise ValueError( + "'Returns' expected a window length of at least 2, but was " + "given {window_length}. For daily returns, use a window " + "length of 2.".format(window_length=self.window_length) + ) + + def compute(self, today, assets, out, close): + out[:] = (close[-1] - close[0]) / close[0] + + +class RSI(CustomFactor, SingleInputMixin): + """ + Relative Strength Index + + **Default Inputs**: [CryptoPricing.close] + + **Default Window Length**: 15 + """ + window_length = 15 + inputs = (CryptoPricing.close,) + + def compute(self, today, assets, out, closes): + diffs = diff(closes, axis=0) + ups = nanmean(clip(diffs, 0, inf), axis=0) + downs = abs(nanmean(clip(diffs, -inf, 0), axis=0)) + return evaluate( + "100 - (100 / (1 + (ups / downs)))", + local_dict={'ups': ups, 'downs': downs}, + global_dict={}, + out=out, + ) + + +class SimpleMovingAverage(CustomFactor, SingleInputMixin): + """ + Average Value of an arbitrary column + + **Default Inputs**: None + + **Default Window Length**: None + """ + # numpy's nan functions throw warnings when passed an array containing only + # nans, but they still returns the desired value (nan), so we ignore the + # warning. + ctx = ignore_nanwarnings() + + def compute(self, today, assets, out, data): + out[:] = nanmean(data, axis=0) + + +class WeightedAverageValue(CustomFactor): + """ + Helper for VWAP-like computations. 
+ + **Default Inputs:** None + + **Default Window Length:** None + """ + def compute(self, today, assets, out, base, weight): + out[:] = nansum(base * weight, axis=0) / nansum(weight, axis=0) + + +class VWAP(WeightedAverageValue): + """ + Volume Weighted Average Price + + **Default Inputs:** [CryptoPricing.close, CryptoPricing.volume] + + **Default Window Length:** None + """ + inputs = (CryptoPricing.close, CryptoPricing.volume) + + +class MaxDrawdown(CustomFactor, SingleInputMixin): + """ + Max Drawdown + + **Default Inputs:** None + + **Default Window Length:** None + """ + ctx = ignore_nanwarnings() + + def compute(self, today, assets, out, data): + drawdowns = fmax.accumulate(data, axis=0) - data + drawdowns[isnan(drawdowns)] = NINF + drawdown_ends = nanargmax(drawdowns, axis=0) + + # TODO: Accelerate this loop in Cython or Numba. + for i, end in enumerate(drawdown_ends): + peak = nanmax(data[:end + 1, i]) + out[i] = (peak - data[end, i]) / data[end, i] + + +class AverageDollarVolume(CustomFactor): + """ + Average Daily Dollar Volume + + **Default Inputs:** [CryptoPricing.close, CryptoPricing.volume] + + **Default Window Length:** None + """ + inputs = [CryptoPricing.close, CryptoPricing.volume] + + def compute(self, today, assets, out, close, volume): + out[:] = nansum(close * volume, axis=0) / len(close) + + +def exponential_weights(length, decay_rate): + """ + Build a weight vector for an exponentially-weighted statistic. + + The resulting ndarray is of the form:: + + [decay_rate ** length, ..., decay_rate ** 2, decay_rate] + + Parameters + ---------- + length : int + The length of the desired weight vector. + decay_rate : float + The rate at which entries in the weight vector increase or decrease. 
+ + Returns + ------- + weights : ndarray[float64] + """ + return full(length, decay_rate, float64_dtype) ** arange(length + 1, 1, -1) + + +class _ExponentialWeightedFactor(SingleInputMixin, CustomFactor): + """ + Base class for factors implementing exponential-weighted operations. + + **Default Inputs:** None + + **Default Window Length:** None + + Parameters + ---------- + inputs : length-1 list or tuple of BoundColumn + The expression over which to compute the average. + window_length : int > 0 + Length of the lookback window over which to compute the average. + decay_rate : float, 0 < decay_rate <= 1 + Weighting factor by which to discount past observations. + + When calculating historical averages, rows are multiplied by the + sequence:: + + decay_rate, decay_rate ** 2, decay_rate ** 3, ... + + Methods + ------- + weights + from_span + from_halflife + from_center_of_mass + """ + params = ('decay_rate',) + + @classmethod + @expect_types(span=Number) + def from_span(cls, inputs, window_length, span, **kwargs): + """ + Convenience constructor for passing `decay_rate` in terms of `span`. + + Forwards `decay_rate` as `1 - (2.0 / (1 + span))`. This provides the + behavior equivalent to passing `span` to pandas.ewma. + + Examples + -------- + .. code-block:: python + + # Equivalent to: + # my_ewma = EWMA( + # inputs=[CryptoPricing.close], + # window_length=30, + # decay_rate=(1 - (2.0 / (1 + 15.0))), + # ) + my_ewma = EWMA.from_span( + inputs=[CryptoPricing.close], + window_length=30, + span=15, + ) + + Notes + ----- + This classmethod is provided by both + :class:`ExponentialWeightedMovingAverage` and + :class:`ExponentialWeightedMovingStdDev`. + """ + if span <= 1: + raise ValueError( + "`span` must be a positive number. %s was passed." 
% span + ) + + decay_rate = (1.0 - (2.0 / (1.0 + span))) + assert 0.0 < decay_rate <= 1.0 + + return cls( + inputs=inputs, + window_length=window_length, + decay_rate=decay_rate, + **kwargs + ) + + @classmethod + @expect_types(halflife=Number) + def from_halflife(cls, inputs, window_length, halflife, **kwargs): + """ + Convenience constructor for passing ``decay_rate`` in terms of half + life. + + Forwards ``decay_rate`` as ``exp(log(.5) / halflife)``. This provides + the behavior equivalent to passing `halflife` to pandas.ewma. + + Examples + -------- + .. code-block:: python + + # Equivalent to: + # my_ewma = EWMA( + # inputs=[CryptoPricing.close], + # window_length=30, + # decay_rate=np.exp(np.log(0.5) / 15), + # ) + my_ewma = EWMA.from_halflife( + inputs=[CryptoPricing.close], + window_length=30, + halflife=15, + ) + + Notes + ----- + This classmethod is provided by both + :class:`ExponentialWeightedMovingAverage` and + :class:`ExponentialWeightedMovingStdDev`. + """ + if halflife <= 0: + raise ValueError( + "`span` must be a positive number. %s was passed." % halflife + ) + decay_rate = exp(log(.5) / halflife) + assert 0.0 < decay_rate <= 1.0 + + return cls( + inputs=inputs, + window_length=window_length, + decay_rate=decay_rate, + **kwargs + ) + + @classmethod + def from_center_of_mass(cls, + inputs, + window_length, + center_of_mass, + **kwargs): + """ + Convenience constructor for passing `decay_rate` in terms of center of + mass. + + Forwards `decay_rate` as `1 - (1 / 1 + center_of_mass)`. This provides + behavior equivalent to passing `center_of_mass` to pandas.ewma. + + Examples + -------- + .. 
code-block:: python + + # Equivalent to: + # my_ewma = EWMA( + # inputs=[CryptoPricing.close], + # window_length=30, + # decay_rate=(1 - (1 / 15.0)), + # ) + my_ewma = EWMA.from_center_of_mass( + inputs=[CryptoPricing.close], + window_length=30, + center_of_mass=15, + ) + + Notes + ----- + This classmethod is provided by both + :class:`ExponentialWeightedMovingAverage` and + :class:`ExponentialWeightedMovingStdDev`. + """ + return cls( + inputs=inputs, + window_length=window_length, + decay_rate=(1.0 - (1.0 / (1.0 + center_of_mass))), + **kwargs + ) + + +class ExponentialWeightedMovingAverage(_ExponentialWeightedFactor): + """ + Exponentially Weighted Moving Average + + **Default Inputs:** None + + **Default Window Length:** None + + Parameters + ---------- + inputs : length-1 list/tuple of BoundColumn + The expression over which to compute the average. + window_length : int > 0 + Length of the lookback window over which to compute the average. + decay_rate : float, 0 < decay_rate <= 1 + Weighting factor by which to discount past observations. + + When calculating historical averages, rows are multiplied by the + sequence:: + + decay_rate, decay_rate ** 2, decay_rate ** 3, ... + + Notes + ----- + - This class can also be imported under the name ``EWMA``. + + See Also + -------- + :func:`pandas.ewma` + """ + def compute(self, today, assets, out, data, decay_rate): + out[:] = average( + data, + axis=0, + weights=exponential_weights(len(data), decay_rate), + ) + + +class LinearWeightedMovingAverage(CustomFactor, SingleInputMixin): + """ + Weighted Average Value of an arbitrary column + + **Default Inputs**: None + + **Default Window Length**: None + """ + # numpy's nan functions throw warnings when passed an array containing only + # nans, but they still returns the desired value (nan), so we ignore the + # warning. 
+ ctx = ignore_nanwarnings() + + def compute(self, today, assets, out, data): + ndays = data.shape[0] + + # Initialize weights array + weights = arange(1, ndays + 1, dtype=float64_dtype).reshape(ndays, 1) + + # Compute normalizer + normalizer = (ndays * (ndays + 1)) / 2 + + # Weight the data + weighted_data = data * weights + + # Compute weighted averages + out[:] = nansum(weighted_data, axis=0) / normalizer + + +class ExponentialWeightedMovingStdDev(_ExponentialWeightedFactor): + """ + Exponentially Weighted Moving Standard Deviation + + **Default Inputs:** None + + **Default Window Length:** None + + Parameters + ---------- + inputs : length-1 list/tuple of BoundColumn + The expression over which to compute the average. + window_length : int > 0 + Length of the lookback window over which to compute the average. + decay_rate : float, 0 < decay_rate <= 1 + Weighting factor by which to discount past observations. + + When calculating historical averages, rows are multiplied by the + sequence:: + + decay_rate, decay_rate ** 2, decay_rate ** 3, ... + + Notes + ----- + - This class can also be imported under the name ``EWMSTD``. + + See Also + -------- + :func:`pandas.ewmstd` + """ + + def compute(self, today, assets, out, data, decay_rate): + weights = exponential_weights(len(data), decay_rate) + + mean = average(data, axis=0, weights=weights) + variance = average((data - mean) ** 2, axis=0, weights=weights) + + squared_weight_sum = (np_sum(weights) ** 2) + bias_correction = ( + squared_weight_sum / (squared_weight_sum - np_sum(weights ** 2)) + ) + out[:] = sqrt(variance * bias_correction) + + +class BollingerBands(CustomFactor): + """ + Bollinger Bands technical indicator. + https://en.wikipedia.org/wiki/Bollinger_Bands + + **Default Inputs:** :data:`zipline.pipeline.data.CryptoPricing.close` + + Parameters + ---------- + inputs : length-1 iterable[BoundColumn] + The expression over which to compute bollinger bands. 
+ window_length : int > 0 + Length of the lookback window over which to compute the bollinger + bands. + k : float + The number of standard deviations to add or subtract to create the + upper and lower bands. + """ + params = ('k',) + inputs = (CryptoPricing.close,) + outputs = 'lower', 'middle', 'upper' + + def compute(self, today, assets, out, close, k): + difference = k * nanstd(close, axis=0) + out.middle = middle = nanmean(close, axis=0) + out.upper = middle + difference + out.lower = middle - difference + + +class Aroon(CustomFactor): + """ + Aroon technical indicator. + https://www.fidelity.com/learning-center/trading-investing/technical-analysis/technical-indicator-guide/aroon-indicator # noqa + + **Defaults Inputs:** CryptoPricing.low, CryptoPricing.high + + Parameters + ---------- + window_length : int > 0 + Length of the lookback window over which to compute the Aroon + indicator. + """ + + inputs = (CryptoPricing.low, CryptoPricing.high) + outputs = ('down', 'up') + + def compute(self, today, assets, out, lows, highs): + wl = self.window_length + high_date_index = nanargmax(highs, axis=0) + low_date_index = nanargmin(lows, axis=0) + evaluate( + '(100 * high_date_index) / (wl - 1)', + local_dict={ + 'high_date_index': high_date_index, + 'wl': wl, + }, + out=out.up, + ) + evaluate( + '(100 * low_date_index) / (wl - 1)', + local_dict={ + 'low_date_index': low_date_index, + 'wl': wl, + }, + out=out.down, + ) + + +class FastStochasticOscillator(CustomFactor): + """ + Fast Stochastic Oscillator Indicator [%K, Momentum Indicator] + https://wiki.timetotrade.eu/Stochastic + + This stochastic is considered volatile, and varies a lot when used in + market analysis. It is recommended to use the slow stochastic oscillator + or a moving average of the %K [%D]. 
+ + **Default Inputs:** :data: `zipline.pipeline.data.CryptoPricing.close` + :data: `zipline.pipeline.data.CryptoPricing.low` + :data: `zipline.pipeline.data.CryptoPricing.high` + + **Default Window Length:** 14 + + Returns + ------- + out: %K oscillator + """ + inputs = (CryptoPricing.close, CryptoPricing.low, CryptoPricing.high) + window_safe = True + window_length = 14 + + def compute(self, today, assets, out, closes, lows, highs): + + highest_highs = nanmax(highs, axis=0) + lowest_lows = nanmin(lows, axis=0) + today_closes = closes[-1] + + evaluate( + '((tc - ll) / (hh - ll)) * 100', + local_dict={ + 'tc': today_closes, + 'll': lowest_lows, + 'hh': highest_highs, + }, + global_dict={}, + out=out, + ) + + +class IchimokuKinkoHyo(CustomFactor): + """Compute the various metrics for the Ichimoku Kinko Hyo (Ichimoku Cloud). + http://stockcharts.com/school/doku.php?id=chart_school:technical_indicators:ichimoku_cloud # noqa + + **Default Inputs:** :data:`zipline.pipeline.data.CryptoPricing.high` + :data:`zipline.pipeline.data.CryptoPricing.low` + :data:`zipline.pipeline.data.CryptoPricing.close` + **Default Window Length:** 52 + + Parameters + ---------- + window_length : int > 0 + The length of the window for the senkou span b. + tenkan_sen_length : int >= 0, <= window_length + The length of the window for the tenkan-sen. + kijun_sen_length : int >= 0, <= window_length + The length of the window for the kijun-sen. + chikou_span_length : int >= 0, <= window_length + The lag for the chikou span. 
+ """ + + params = { + 'tenkan_sen_length': 9, + 'kijun_sen_length': 26, + 'chikou_span_length': 26, + } + inputs = (CryptoPricing.high, CryptoPricing.low, CryptoPricing.close) + outputs = ( + 'tenkan_sen', + 'kijun_sen', + 'senkou_span_a', + 'senkou_span_b', + 'chikou_span', + ) + window_length = 52 + + def _validate(self): + super(IchimokuKinkoHyo, self)._validate() + for k, v in self.params.items(): + if v > self.window_length: + raise ValueError( + '%s must be <= the window_length: %s > %s' % ( + k, v, self.window_length, + ), + ) + + def compute(self, + today, + assets, + out, + high, + low, + close, + tenkan_sen_length, + kijun_sen_length, + chikou_span_length): + + out.tenkan_sen = tenkan_sen = ( + high[-tenkan_sen_length:].max(axis=0) + + low[-tenkan_sen_length:].min(axis=0) + ) / 2 + out.kijun_sen = kijun_sen = ( + high[-kijun_sen_length:].max(axis=0) + + low[-kijun_sen_length:].min(axis=0) + ) / 2 + out.senkou_span_a = (tenkan_sen + kijun_sen) / 2 + out.senkou_span_b = (high.max(axis=0) + low.min(axis=0)) / 2 + out.chikou_span = close[chikou_span_length] + + +class RateOfChangePercentage(CustomFactor): + """ + Rate of change Percentage + ROC measures the percentage change in price from one period to the next. + The ROC calculation compares the current price with the price `n` + periods ago. + Formula for calculation: ((price - prevPrice) / prevPrice) * 100 + price - the current price + prevPrice - the price n days ago, equals window length + """ + def compute(self, today, assets, out, close): + today_close = close[-1] + prev_close = close[0] + evaluate('((tc - pc) / pc) * 100', + local_dict={ + 'tc': today_close, + 'pc': prev_close + }, + global_dict={}, + out=out, + ) + + +class TrueRange(CustomFactor): + """ + True Range + + A technical indicator originally developed by J. Welles Wilder, Jr. + Indicates the true degree of daily price change in an underlying. 
+ + **Default Inputs:** :data:`zipline.pipeline.data.CryptoPricing.high` + :data:`zipline.pipeline.data.CryptoPricing.low` + :data:`zipline.pipeline.data.CryptoPricing.close` + **Default Window Length:** 2 + """ + inputs = ( + CryptoPricing.high, + CryptoPricing.low, + CryptoPricing.close, + ) + window_length = 2 + + def compute(self, today, assets, out, highs, lows, closes): + high_to_low = highs[1:] - lows[1:] + high_to_prev_close = abs(highs[1:] - closes[:-1]) + low_to_prev_close = abs(lows[1:] - closes[:-1]) + out[:] = nanmax( + dstack(( + high_to_low, + high_to_prev_close, + low_to_prev_close, + )), + 2 + ) + + +class MovingAverageConvergenceDivergenceSignal(CustomFactor): + """ + Moving Average Convergence/Divergence (MACD) Signal line + https://en.wikipedia.org/wiki/MACD + + A technical indicator originally developed by Gerald Appel in the late + 1970's. MACD shows the relationship between two moving averages and + reveals changes in the strength, direction, momentum, and duration of a + trend in a stock's price. + + **Default Inputs:** :data:`zipline.pipeline.data.CryptoPricing.close` + + Parameters + ---------- + fast_period : int > 0, optional + The window length for the "fast" EWMA. Default is 12. + slow_period : int > 0, > fast_period, optional + The window length for the "slow" EWMA. Default is 26. + signal_period : int > 0, < fast_period, optional + The window length for the signal line. Default is 9. + + Notes + ----- + Unlike most pipeline expressions, this factor does not accept a + ``window_length`` parameter. ``window_length`` is inferred from + ``slow_period`` and ``signal_period``. + """ + inputs = (CryptoPricing.close,) + # We don't use the default form of `params` here because we want to + # dynamically calculate `window_length` from the period lengths in our + # __new__. + params = ('fast_period', 'slow_period', 'signal_period') + + @expect_bounded( + __funcname='MACDSignal', + fast_period=(1, None), # These must all be >= 1. 
+ slow_period=(1, None), + signal_period=(1, None), + ) + def __new__(cls, + fast_period=12, + slow_period=26, + signal_period=9, + *args, + **kwargs): + + if slow_period <= fast_period: + raise ValueError( + "'slow_period' must be greater than 'fast_period', but got\n" + "slow_period={slow}, fast_period={fast}".format( + slow=slow_period, + fast=fast_period, + ) + ) + + return super(MovingAverageConvergenceDivergenceSignal, cls).__new__( + cls, + fast_period=fast_period, + slow_period=slow_period, + signal_period=signal_period, + window_length=slow_period + signal_period - 1, + *args, **kwargs + ) + + def _ewma(self, data, length): + decay_rate = 1.0 - (2.0 / (1.0 + length)) + return average( + data, + axis=1, + weights=exponential_weights(length, decay_rate) + ) + + def compute(self, today, assets, out, close, fast_period, slow_period, + signal_period): + slow_EWMA = self._ewma( + rolling_window(close, slow_period), + slow_period + ) + fast_EWMA = self._ewma( + rolling_window(close, fast_period)[-signal_period:], + fast_period + ) + macd = fast_EWMA - slow_EWMA + out[:] = self._ewma(macd.T, signal_period) + + +class AnnualizedVolatility(CustomFactor): + """ + Volatility. The degree of variation of a series over time as measured by + the standard deviation of daily returns. + https://en.wikipedia.org/wiki/Volatility_(finance) + + **Default Inputs:** :data:`zipline.pipeline.factors.Returns(window_length=2)` # noqa + + Parameters + ---------- + annualization_factor : float, optional + The number of time units per year. Defaults is 252, the number of NYSE + trading days in a normal year. + """ + inputs = [Returns(window_length=2)] + params = {'annualization_factor': 252.0} + window_length = 252 + + def compute(self, today, assets, out, returns, annualization_factor): + out[:] = nanstd(returns, axis=0) * (annualization_factor ** .5) + + +# Convenience aliases. 
+EWMA = ExponentialWeightedMovingAverage +EWMSTD = ExponentialWeightedMovingStdDev +MACDSignal = MovingAverageConvergenceDivergenceSignal diff --git a/zipline/pipeline/factors/equity/__init__.py b/zipline/pipeline/factors/equity/__init__.py new file mode 100644 index 00000000..aa16cfbe --- /dev/null +++ b/zipline/pipeline/factors/equity/__init__.py @@ -0,0 +1,47 @@ +from .technical import ( + AnnualizedVolatility, + Aroon, + AverageDollarVolume, + BollingerBands, + EWMA, + EWMSTD, + ExponentialWeightedMovingAverage, + ExponentialWeightedMovingStdDev, + FastStochasticOscillator, + IchimokuKinkoHyo, + LinearWeightedMovingAverage, + MACDSignal, + MaxDrawdown, + MovingAverageConvergenceDivergenceSignal, + RateOfChangePercentage, + Returns, + RSI, + SimpleMovingAverage, + TrueRange, + VWAP, + WeightedAverageValue, +) + +__all__ = [ + 'AnnualizedVolatility', + 'Aroon', + 'AverageDollarVolume', + 'BollingerBands', + 'EWMA', + 'EWMSTD', + 'ExponentialWeightedMovingAverage', + 'ExponentialWeightedMovingStdDev', + 'FastStochasticOscillator', + 'IchimokuKinkoHyo', + 'LinearWeightedMovingAverage', + 'MACDSignal', + 'MaxDrawdown', + 'MovingAverageConvergenceDivergenceSignal', + 'RateOfChangePercentage', + 'Returns', + 'RSI', + 'SimpleMovingAverage', + 'TrueRange', + 'VWAP', + 'WeightedAverageValue', +] diff --git a/zipline/pipeline/factors/technical.py b/zipline/pipeline/factors/equity/technical.py similarity index 99% rename from zipline/pipeline/factors/technical.py rename to zipline/pipeline/factors/equity/technical.py index ac81943d..71512f18 100644 --- a/zipline/pipeline/factors/technical.py +++ b/zipline/pipeline/factors/equity/technical.py @@ -41,7 +41,7 @@ from zipline.utils.numpy_utils import ( ignore_nanwarnings, rolling_window, ) -from .factor import CustomFactor +from ..factor import CustomFactor class Returns(CustomFactor): diff --git a/zipline/pipeline/loaders/__init__.py b/zipline/pipeline/loaders/__init__.py index 0e94c6c0..5a7fc731 100644 --- 
a/zipline/pipeline/loaders/__init__.py +++ b/zipline/pipeline/loaders/__init__.py @@ -1,5 +1,7 @@ from .equity_pricing_loader import USEquityPricingLoader +from .crypto_pricing_loader import CryptoPricingLoader __all__ = [ 'USEquityPricingLoader', + 'CryptoPricingLoader', ] diff --git a/zipline/pipeline/loaders/base.py b/zipline/pipeline/loaders/base.py index 0f5dcedb..da11a05f 100644 --- a/zipline/pipeline/loaders/base.py +++ b/zipline/pipeline/loaders/base.py @@ -19,3 +19,7 @@ class PipelineLoader(with_metaclass(ABCMeta)): @abstractmethod def load_adjusted_array(self, columns, dates, assets, mask): pass + + @abstractmethod + def columns(self): + pass diff --git a/zipline/pipeline/loaders/crypto_pricing_loader.py b/zipline/pipeline/loaders/crypto_pricing_loader.py new file mode 100644 index 00000000..4a019543 --- /dev/null +++ b/zipline/pipeline/loaders/crypto_pricing_loader.py @@ -0,0 +1,136 @@ +# Copyright 2015 Quantopian, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
from numpy import (
    iinfo,
    uint32,
)

from zipline.data.us_equity_pricing import (
    BcolzDailyBarReader,
    SQLiteAdjustmentReader,
)
from zipline.lib.adjusted_array import AdjustedArray
from zipline.errors import NoFurtherDataError
from zipline.utils.calendars import get_calendar

from .base import PipelineLoader

UINT32_MAX = iinfo(uint32).max


class CryptoPricingLoader(PipelineLoader):
    """
    PipelineLoader for crypto pricing data.

    Delegates loading of baselines to ``raw_price_loader``. No adjustments
    are applied: every column is wrapped in an ``AdjustedArray`` with an
    empty adjustments mapping.

    Parameters
    ----------
    raw_price_loader : object
        Reader exposing ``load_raw_arrays(colnames, start, end, assets)``.
    dataset : object
        Dataset whose ``columns`` attribute lists the columns this loader
        serves (exposed again through the ``columns`` property).
    """

    def __init__(self, raw_price_loader, dataset):
        self.raw_price_loader = raw_price_loader
        self._columns = dataset.columns

        # Prefer the calendar the bars were written with, mirroring
        # USEquityPricingLoader; fall back to NYSE (the previous hard-coded
        # choice) when the reader does not carry one.
        cal = (
            getattr(raw_price_loader, 'trading_calendar', None) or
            get_calendar('NYSE')
        )

        self._all_sessions = cal.all_sessions

    @classmethod
    def from_files(cls, pricing_path, dataset=None):
        """
        Create a loader from a bcolz pricing directory.

        Parameters
        ----------
        pricing_path : str
            Path to a bcolz directory written by a BcolzDailyBarWriter.
        dataset : object, optional
            Dataset describing the served columns. Defaults to
            ``zipline.pipeline.data.CryptoPricing``.

        Returns
        -------
        CryptoPricingLoader
        """
        if dataset is None:
            # Imported lazily so this module does not depend on the dataset
            # package at import time. Previously no dataset was forwarded at
            # all, so this constructor always failed with a TypeError.
            from zipline.pipeline.data import CryptoPricing
            dataset = CryptoPricing

        return cls(
            BcolzDailyBarReader(pricing_path),
            dataset,
        )

    def load_adjusted_array(self, columns, dates, assets, mask):
        """
        Load raw data for ``columns`` and wrap it in AdjustedArrays.

        Returns
        -------
        dict
            Maps each requested column to an ``AdjustedArray`` built from the
            raw values cast to ``column.dtype``, with ``mask``, no
            adjustments, and ``column.missing_value``.
        """
        # load_adjusted_array is called with dates on which the user's algo
        # will be shown data, which means we need to return the data that would
        # be known at the start of each date.  We assume that the latest data
        # known on day N is the data from day (N - 1), so we shift all query
        # dates back by a day.
        start_date, end_date = _shift_dates(
            self._all_sessions, dates[0], dates[-1], shift=1,
        )
        colnames = [c.name for c in columns]
        raw_arrays = self.raw_price_loader.load_raw_arrays(
            colnames,
            start_date,
            end_date,
            assets,
        )

        out = {}
        for c, c_raw in zip(columns, raw_arrays):
            out[c] = AdjustedArray(
                c_raw.astype(c.dtype),
                mask,
                {},  # no adjustments are applied to this data
                c.missing_value,
            )
        return out

    @property
    def columns(self):
        """Columns of the dataset this loader was constructed with."""
        return self._columns


def _shift_dates(dates, start_date, end_date, shift):
    """
    Shift a ``[start_date, end_date]`` query back by ``shift`` sessions.

    Parameters
    ----------
    dates : index-like
        Ordered session index supporting ``get_loc`` and integer indexing.
    start_date, end_date
        Query bounds; both must be members of ``dates``.
    shift : int
        Number of positions to move both bounds towards the past.

    Returns
    -------
    tuple
        ``(dates[start - shift], dates[end - shift])``.

    Raises
    ------
    NoFurtherDataError
        If a bound lies outside the range covered by ``dates`` or the shift
        would move the start before the first known date.
    ValueError
        If a bound lies inside the covered range but is not in ``dates``.
    """
    try:
        start = dates.get_loc(start_date)
    except KeyError:
        if start_date < dates[0]:
            raise NoFurtherDataError(
                msg=(
                    "Pipeline Query requested data starting on {query_start}, "
                    "but first known date is {calendar_start}"
                ).format(
                    query_start=str(start_date),
                    calendar_start=str(dates[0]),
                )
            )
        else:
            raise ValueError("Query start %s not in calendar" % start_date)

    # Make sure that shifting doesn't push us out of the calendar.
    if start < shift:
        raise NoFurtherDataError(
            msg=(
                "Pipeline Query requested data from {shift}"
                " days before {query_start}, but first known date is only "
                "{start} days earlier."
            ).format(shift=shift, query_start=start_date, start=start),
        )

    try:
        end = dates.get_loc(end_date)
    except KeyError:
        if end_date > dates[-1]:
            raise NoFurtherDataError(
                msg=(
                    "Pipeline Query requesting data up to {query_end}, "
                    "but last known date is {calendar_end}"
                ).format(
                    query_end=end_date,
                    calendar_end=dates[-1],
                )
            )
        else:
            raise ValueError("Query end %s not in calendar" % end_date)
    return dates[start - shift], dates[end - shift]
from datetime import time
from pandas.tseries.offsets import CustomBusinessDay
from pytz import timezone

from .trading_calendar import TradingCalendar

from zipline.utils.memoize import lazyval


class OpenExchangeCalendar(TradingCalendar):
    """
    Calendar for a market that is open every day of the week.

    Sessions open at 00:00 and close at 23:59 in the calendar's timezone.
    """

    @property
    def name(self):
        return 'OPEN'

    @property
    def tz(self):
        # NOTE(review): an always-open market is kept in US/Eastern here;
        # confirm whether UTC was intended.
        return timezone('US/Eastern')

    @property
    def open_time(self):
        return time(0)

    @property
    def close_time(self):
        return time(23, 59)

    @lazyval
    def day(self):
        # Return a real date offset rather than the frequency string 'D'.
        # The offset is still accepted wherever a frequency is expected, and
        # it also supports ``Timestamp +/- calendar.day`` arithmetic, which a
        # plain string does not. All seven weekdays count as sessions.
        return CustomBusinessDay(weekmask='Mon Tue Wed Thu Fri Sat Sun')
# Bundle-loading section of ``_run``: builds ``env``, ``data`` and
# ``choose_loader`` from a comma-separated list of bundle names.
if bundle is not None:
    bundles = bundle.split(',')

    def get_trading_env_and_data(bundles):
        """
        Build the TradingEnvironment and DataPortal for the run.

        With exactly one bundle that bundle is used; with several, the
        'catalyst' bundle supplies the environment and data portal.

        Returns
        -------
        tuple
            ``(env, data)``; both are None when ``bundles`` is empty.
        """
        env = data = None

        b = 'catalyst'
        if not bundles:
            return env, data
        elif len(bundles) == 1:
            b = bundles[0]

        bundle_data = load(
            b,
            environ,
            bundle_timestamp,
        )

        prefix, connstr = re.split(
            r'sqlite:///',
            str(bundle_data.asset_finder.engine.url),
            maxsplit=1,
        )
        if prefix:
            raise ValueError(
                "invalid url %r, must begin with 'sqlite:///'" %
                str(bundle_data.asset_finder.engine.url),
            )
        env = TradingEnvironment(asset_db_path=connstr, environ=environ)
        first_trading_day =\
            bundle_data.equity_minute_bar_reader.first_trading_day
        data = DataPortal(
            env.asset_finder,
            get_calendar('NYSE'),
            first_trading_day=first_trading_day,
            equity_minute_reader=bundle_data.equity_minute_bar_reader,
            equity_daily_reader=bundle_data.equity_daily_bar_reader,
            adjustment_reader=bundle_data.adjustment_reader,
        )

        return env, data

    def get_loader_for_bundle(b):
        """
        Return the pipeline loader that serves bundle ``b``.

        Raises
        ------
        ValueError
            If no loader is known for ``b``.
        """
        bundle_data = load(
            b,
            environ,
            bundle_timestamp,
        )

        if b == 'catalyst':
            return CryptoPricingLoader(
                bundle_data.equity_daily_bar_reader,
                CryptoPricing,
            )
        elif b == 'quantopian-quandl':
            return USEquityPricingLoader(
                bundle_data.equity_daily_bar_reader,
                bundle_data.adjustment_reader,
                USEquityPricing,
            )
        # The message used to interpolate ``column``, which is not defined
        # in this scope, so an unknown bundle surfaced as a NameError.
        raise ValueError(
            "No PipelineLoader registered for bundle %s." % b
        )

    loaders = [get_loader_for_bundle(b) for b in bundles]
    env, data = get_trading_env_and_data(bundles)

    def choose_loader(column):
        """Return the first loader whose ``columns`` contain ``column``."""
        for loader in loaders:
            if column in loader.columns:
                return loader
        raise ValueError(
            "No PipelineLoader registered for column %s." % column
        )
else:
    env = TradingEnvironment(environ=environ)
    choose_loader = None