Merge pull request #1588 from quantopian/randc-built-in-factors

ENH: Add MACD, MA, and AnnVol as built in factors
This commit is contained in:
Scott Sanderson
2016-11-28 16:18:58 -05:00
committed by GitHub
3 changed files with 335 additions and 21 deletions
+180 -1
View File
@@ -5,6 +5,7 @@ from six.moves import range
import numpy as np
import pandas as pd
import talib
from numpy.random import RandomState
from zipline.lib.adjusted_array import AdjustedArray
from zipline.pipeline.data import USEquityPricing
@@ -16,11 +17,12 @@ from zipline.pipeline.factors import (
LinearWeightedMovingAverage,
RateOfChangePercentage,
TrueRange,
MovingAverageConvergenceDivergenceSignal,
AnnualizedVolatility,
)
from zipline.testing import parameter_space
from zipline.testing.fixtures import ZiplineTestCase
from zipline.testing.predicates import assert_equal
from .base import BasePipelineTestCase
@@ -403,3 +405,180 @@ class TestTrueRange(ZiplineTestCase):
tr.compute(today, assets, out, highs, lows, closes)
assert_equal(out, np.full((3,), 2.))
class MovingAverageConvergenceDivergenceTestCase(ZiplineTestCase):
    """
    Tests for the ``MovingAverageConvergenceDivergenceSignal`` factor.
    """

    def expected_ewma(self, data_df, window):
        """
        Build the reference EWMA frame that factor output is checked against.

        Each output value is the last entry of a pandas ``ewm(span=window)``
        mean evaluated over only the trailing ``window`` rows of ``data_df``.
        """
        # Approach (and warning) taken from `test_engine.py`:
        # XXX: Nesting an ewma (itself a rolling-window function) inside a
        # rolling-apply is a comically inefficient way to get a windowed
        # EWMA, so don't use it outside of testing. It is done this way
        # because only ``window`` rows may be looked at for any one value.
        def trailing_ewma(sub):
            return pd.DataFrame(sub).ewm(span=window).mean().values[-1]

        return data_df.rolling(window).apply(trailing_ewma)

    @parameter_space(seed=range(5))
    def test_MACD_window_length_generation(self, seed):
        rng = RandomState(seed)

        # Each period is drawn strictly above the previous one, so that
        # signal_period < fast_period < slow_period always holds.
        signal_period = rng.randint(1, 90)
        fast_period = rng.randint(signal_period + 1, signal_period + 100)
        slow_period = rng.randint(fast_period + 1, fast_period + 100)

        macd_signal = MovingAverageConvergenceDivergenceSignal(
            fast_period=fast_period,
            slow_period=slow_period,
            signal_period=signal_period,
        )

        expected_length = slow_period + signal_period - 1
        assert_equal(macd_signal.window_length, expected_length)

    def test_bad_inputs(self):
        template = (
            "MACDSignal() expected a value greater than or equal to 1"
            " for argument %r, but got 0 instead."
        )

        # Every period argument must reject 0 with the same message shape.
        for argname in ('fast_period', 'slow_period', 'signal_period'):
            with self.assertRaises(ValueError) as e:
                MovingAverageConvergenceDivergenceSignal(**{argname: 0})
            self.assertEqual(template % argname, str(e.exception))

        # The slow period has to be strictly larger than the fast period.
        with self.assertRaises(ValueError) as e:
            MovingAverageConvergenceDivergenceSignal(
                fast_period=5,
                slow_period=4,
            )

        expected = (
            "'slow_period' must be greater than 'fast_period', but got\n"
            "slow_period=4, fast_period=5"
        )
        self.assertEqual(expected, str(e.exception))

    @parameter_space(
        seed=range(2),
        fast_period=[3, 5],
        slow_period=[8, 10],
        signal_period=[3, 9],
        __fail_fast=True,
    )
    def test_moving_average_convergence_divergence(self,
                                                   seed,
                                                   fast_period,
                                                   slow_period,
                                                   signal_period):
        nassets = 3
        rng = RandomState(seed)

        factor = MovingAverageConvergenceDivergenceSignal(
            fast_period=fast_period,
            slow_period=slow_period,
            signal_period=signal_period,
        )

        # Inputs for a single, direct ``compute`` call.
        dt = pd.Timestamp('2016', tz='utc')
        sids = pd.Index(np.arange(nassets))
        result = np.empty(shape=(nassets,), dtype=np.float64)
        closes = rng.rand(factor.window_length, nassets)

        factor.compute(
            dt,
            sids,
            result,
            closes,
            fast_period,
            slow_period,
            signal_period,
        )

        # Reference computation: EWMA of (fast EWMA - slow EWMA) via pandas.
        closes_df = pd.DataFrame(closes)
        fast_frame = self.expected_ewma(closes_df, fast_period)
        slow_frame = self.expected_ewma(closes_df, slow_period)
        signal_frame = self.expected_ewma(
            fast_frame - slow_frame,
            signal_period,
        )

        # Everything but the last row should be NaN.
        leading_rows = signal_frame.iloc[:-1]
        self.assertTrue(leading_rows.isnull().all().all())

        # A single compute call should match the final row of the frame
        # calculated with pandas.
        np.testing.assert_almost_equal(
            result,
            signal_frame.values[-1],
            decimal=8
        )
class AnnualizedVolatilityTestCase(ZiplineTestCase):
    """
    Test Annualized Volatility
    """

    def test_simple_volatility(self):
        """
        Simple test for uniform returns should generate 0 volatility
        """
        nassets = 3
        ann_vol = AnnualizedVolatility()
        today = pd.Timestamp('2016', tz='utc')
        assets = np.arange(nassets, dtype=np.float64)

        # Every return is identical, so the standard deviation is zero.
        returns = np.full((ann_vol.window_length, nassets),
                          0.004,
                          dtype=np.float64)
        out = np.empty(shape=(nassets,), dtype=np.float64)

        ann_vol.compute(today, assets, out, returns, 252)

        expected_vol = np.zeros(nassets)
        np.testing.assert_almost_equal(
            out,
            expected_vol,
            decimal=8
        )

    def test_volatility(self):
        """
        Check volatility results against values calculated manually
        """
        nassets = 3
        ann_vol = AnnualizedVolatility()
        today = pd.Timestamp('2016', tz='utc')
        assets = np.arange(nassets, dtype=np.float64)

        # Use a locally seeded generator rather than the global numpy RNG so
        # that the inputs (and therefore any failure) are reproducible.
        rng = np.random.RandomState(42)
        returns = rng.normal(loc=0.001,
                             scale=0.01,
                             size=(ann_vol.window_length, nassets))
        out = np.empty(shape=(nassets,), dtype=np.float64)

        ann_vol.compute(today, assets, out, returns, 252)

        # Population variance per asset (divide by N), scaled by the same
        # annualization factor that was passed to ``compute``.
        mean = np.mean(returns, axis=0)
        annualized_variance = ((returns - mean) ** 2).sum(axis=0) / \
            returns.shape[0] * 252
        expected_vol = np.sqrt(annualized_variance)

        np.testing.assert_almost_equal(
            out,
            expected_vol,
            decimal=8
        )
+6
View File
@@ -14,6 +14,7 @@ from .statistical import (
RollingSpearmanOfReturns,
)
from .technical import (
AnnualizedVolatility,
Aroon,
AverageDollarVolume,
BollingerBands,
@@ -24,7 +25,9 @@ from .technical import (
FastStochasticOscillator,
IchimokuKinkoHyo,
LinearWeightedMovingAverage,
MACDSignal,
MaxDrawdown,
MovingAverageConvergenceDivergenceSignal,
RateOfChangePercentage,
Returns,
RSI,
@@ -35,6 +38,7 @@ from .technical import (
)
__all__ = [
'AnnualizedVolatility',
'Aroon',
'AverageDollarVolume',
'BollingerBands',
@@ -50,7 +54,9 @@ __all__ = [
'IchimokuKinkoHyo',
'Latest',
'LinearWeightedMovingAverage',
'MACDSignal',
'MaxDrawdown',
'MovingAverageConvergenceDivergenceSignal',
'RateOfChangePercentage',
'RecarrayField',
'Returns',
+149 -20
View File
@@ -26,8 +26,7 @@ from numexpr import evaluate
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.mixins import SingleInputMixin
from zipline.utils.numpy_utils import ignore_nanwarnings
from zipline.utils.input_validation import expect_types
from zipline.utils.input_validation import expect_bounded, expect_types
from zipline.utils.math_utils import (
nanargmax,
nanargmin,
@@ -37,6 +36,11 @@ from zipline.utils.math_utils import (
nansum,
nanmin,
)
from zipline.utils.numpy_utils import (
float64_dtype,
ignore_nanwarnings,
rolling_window,
)
from .factor import CustomFactor
@@ -160,6 +164,28 @@ class AverageDollarVolume(CustomFactor):
out[:] = nansum(close * volume, axis=0) / len(close)
def exponential_weights(length, decay_rate):
    """
    Build a weight vector for an exponentially-weighted statistic.

    The resulting ndarray is of the form::

        [decay_rate ** (length + 1), ..., decay_rate ** 3, decay_rate ** 2]

    i.e. the exponents run from ``length + 1`` for the first entry down to
    ``2`` for the last entry.

    Parameters
    ----------
    length : int
        The length of the desired weight vector.
    decay_rate : float
        The rate at which entries in the weight vector increase or decrease.

    Returns
    -------
    weights : ndarray[float64]
    """
    # Exponents count down from ``length + 1`` to ``2`` inclusive, giving
    # exactly ``length`` entries.
    exponents = arange(length + 1, 1, -1)
    bases = full(length, decay_rate, float64_dtype)
    return bases ** exponents
class _ExponentialWeightedFactor(SingleInputMixin, CustomFactor):
"""
Base class for factors implementing exponential-weighted operations.
@@ -191,14 +217,6 @@ class _ExponentialWeightedFactor(SingleInputMixin, CustomFactor):
"""
params = ('decay_rate',)
@staticmethod
def weights(length, decay_rate):
"""
Return weighting vector for an exponential moving statistic on `length`
rows with a decay rate of `decay_rate`.
"""
return full(length, decay_rate, float) ** arange(length + 1, 1, -1)
@classmethod
@expect_types(span=Number)
def from_span(cls, inputs, window_length, span, **kwargs):
@@ -368,7 +386,7 @@ class ExponentialWeightedMovingAverage(_ExponentialWeightedFactor):
out[:] = average(
data,
axis=0,
weights=self.weights(len(data), decay_rate),
weights=exponential_weights(len(data), decay_rate),
)
@@ -386,13 +404,13 @@ class LinearWeightedMovingAverage(CustomFactor, SingleInputMixin):
ctx = ignore_nanwarnings()
def compute(self, today, assets, out, data):
num_days = data.shape[0]
ndays = data.shape[0]
# Initialize weights array
weights = arange(1, num_days + 1, dtype=float).reshape(num_days, 1)
weights = arange(1, ndays + 1, dtype=float64_dtype).reshape(ndays, 1)
# Compute normalizer
normalizer = (num_days * (num_days + 1)) / 2
normalizer = (ndays * (ndays + 1)) / 2
# Weight the data
weighted_data = data * weights
@@ -433,7 +451,7 @@ class ExponentialWeightedMovingStdDev(_ExponentialWeightedFactor):
"""
def compute(self, today, assets, out, data, decay_rate):
weights = self.weights(len(data), decay_rate)
weights = exponential_weights(len(data), decay_rate)
mean = average(data, axis=0, weights=weights)
variance = average((data - mean) ** 2, axis=0, weights=weights)
@@ -445,11 +463,6 @@ class ExponentialWeightedMovingStdDev(_ExponentialWeightedFactor):
out[:] = sqrt(variance * bias_correction)
# Convenience aliases.
EWMA = ExponentialWeightedMovingAverage
EWMSTD = ExponentialWeightedMovingStdDev
class BollingerBands(CustomFactor):
"""
Bollinger Bands technical indicator.
@@ -683,3 +696,119 @@ class TrueRange(CustomFactor):
)),
2
)
class MovingAverageConvergenceDivergenceSignal(CustomFactor):
    """
    Moving Average Convergence/Divergence (MACD) Signal line
    https://en.wikipedia.org/wiki/MACD

    A technical indicator originally developed by Gerald Appel in the late
    1970's. MACD shows the relationship between two moving averages and
    reveals changes in the strength, direction, momentum, and duration of a
    trend in a stock's price.

    **Default Inputs:** :data:`zipline.pipeline.data.USEquityPricing.close`

    Parameters
    ----------
    fast_period : int > 0, optional
        The window length for the "fast" EWMA. Default is 12.
    slow_period : int > 0, > fast_period, optional
        The window length for the "slow" EWMA. Default is 26.
    signal_period : int > 0, optional
        The window length for the signal line. Default is 9.

    Notes
    -----
    Unlike most pipeline expressions, this factor does not accept a
    ``window_length`` parameter. ``window_length`` is inferred from
    ``slow_period`` and ``signal_period`` as
    ``slow_period + signal_period - 1``.
    """
    inputs = (USEquityPricing.close,)
    # We don't use the default form of `params` here because we want to
    # dynamically calculate `window_length` from the period lengths in our
    # __new__.
    params = ('fast_period', 'slow_period', 'signal_period')

    @expect_bounded(
        __funcname='MACDSignal',
        fast_period=(1, None),  # These must all be >= 1.
        slow_period=(1, None),
        signal_period=(1, None),
    )
    def __new__(cls,
                fast_period=12,
                slow_period=26,
                signal_period=9,
                *args,
                **kwargs):
        """
        Validate the period lengths and derive ``window_length`` from them.

        Raises
        ------
        ValueError
            If ``slow_period`` is not strictly greater than ``fast_period``.
        """
        if slow_period <= fast_period:
            raise ValueError(
                "'slow_period' must be greater than 'fast_period', but got\n"
                "slow_period={slow}, fast_period={fast}".format(
                    slow=slow_period,
                    fast=fast_period,
                )
            )

        return super(MovingAverageConvergenceDivergenceSignal, cls).__new__(
            cls,
            fast_period=fast_period,
            slow_period=slow_period,
            signal_period=signal_period,
            window_length=slow_period + signal_period - 1,
            *args, **kwargs
        )

    def _ewma(self, data, length):
        """
        Exponentially-weighted average of ``data`` along axis 1.

        The decay rate is derived from ``length`` treated as a span:
        ``decay_rate = 1 - 2 / (1 + length)``.
        """
        if length == 1:
            # A span of 1 gives a decay rate of exactly 0, so every weight
            # built from it is 0 and a weighted average cannot be normalized.
            # The EWMA over a span of 1 is just the latest observation, so
            # return that directly.
            return data[:, -1]

        decay_rate = 1.0 - (2.0 / (1.0 + length))
        return average(
            data,
            axis=1,
            weights=exponential_weights(length, decay_rate)
        )

    def compute(self, today, assets, out, close, fast_period, slow_period,
                signal_period):
        # NOTE(review): this relies on ``rolling_window(close, n)`` yielding
        # one length-``n`` window (along axis 1) per trailing position of
        # ``close`` -- confirm against the helper's definition.
        slow_EWMA = self._ewma(
            rolling_window(close, slow_period),
            slow_period
        )
        # Only the last ``signal_period`` fast windows are kept so that the
        # fast series lines up with the slow series before subtracting.
        fast_EWMA = self._ewma(
            rolling_window(close, fast_period)[-signal_period:],
            fast_period
        )
        macd = fast_EWMA - slow_EWMA
        # Transpose so that the MACD history of each asset lies along axis 1,
        # which is the axis ``_ewma`` averages over.
        out[:] = self._ewma(macd.T, signal_period)
class AnnualizedVolatility(CustomFactor):
    """
    Volatility
    https://en.wikipedia.org/wiki/Volatility_(finance)

    The degree of variation of a series over time, measured as the standard
    deviation of daily returns and scaled to a yearly figure.

    **Default Inputs:**
    :data:`zipline.pipeline.factors.Returns(window_length=2)`

    Parameters
    ----------
    annualization_factor : float, optional
        The number of time units per year. Default is 252, the number of NYSE
        trading days in a normal year.
    """
    inputs = [Returns(window_length=2)]
    params = {'annualization_factor': 252.0}
    window_length = 252

    def compute(self, today, assets, out, returns, annualization_factor):
        # Standard deviation scales with the square root of the number of
        # periods, hence the square root of the annualization factor.
        scale = annualization_factor ** .5
        out[:] = nanstd(returns, axis=0) * scale
# Convenience aliases: short names bound to the same class objects as the
# long-form factor names on the right-hand side.
EWMA = ExponentialWeightedMovingAverage
EWMSTD = ExponentialWeightedMovingStdDev
MACDSignal = MovingAverageConvergenceDivergenceSignal