Crypto integration + multiple pipeline bundles

This commit is contained in:
Conner Fromknecht
2017-06-18 19:13:30 -07:00
parent 0840ea1bef
commit be2fe35583
22 changed files with 1250 additions and 94 deletions
+3
View File
@@ -71,4 +71,7 @@ zipline.iml
# PyCharm custom settings
.idea
# Pickle files
*.pickle
TAGS
+4 -2
View File
@@ -289,7 +289,7 @@ class TradingAlgorithm(object):
# If a schedule has been provided, pop it. Otherwise, use NYSE.
self.trading_calendar = kwargs.pop(
'trading_calendar',
get_calendar("NYSE")
get_calendar('NYSE')
)
self.sim_params = kwargs.pop('sim_params', None)
@@ -1114,6 +1114,8 @@ class TradingAlgorithm(object):
# TradingSchedule class, so this is unlikely to be hit
if calendar is None:
cal = self.trading_calendar
elif calendar is calendars.CRYPTO_ASSETS:
cal = get_environment('OPEN')
elif calendar is calendars.US_EQUITIES:
cal = get_calendar('NYSE')
elif calendar is calendars.US_FUTURES:
@@ -1122,7 +1124,7 @@ class TradingAlgorithm(object):
raise ScheduleFunctionInvalidCalendar(
given_calendar=calendar,
allowed_calendars=(
'[calendars.US_EQUITIES, calendars.US_FUTURES]'
'[calendars.CRYPTO_ASSETS, calendars.US_EQUITIES, calendars.US_FUTURES]'
),
)
+2
View File
@@ -1,5 +1,6 @@
# These imports are necessary to force module-scope register calls to happen.
from . import quandl # noqa
from . import catalyst
from .core import (
UnknownBundle,
bundles,
@@ -15,6 +16,7 @@ from .core import (
from .yahoo import yahoo_equities
__all__ = [
'UnknownBundle',
'bundles',
+44
View File
@@ -0,0 +1,44 @@
import tarfile
from .quandl import (
ONE_MEGABYTE,
download_with_progress,
download_without_progress,
)
from . import core as bundles
CATALYST_URL = (
'https://s3.amazonaws.com/quantopian-public-zipline-data/quandl'
)
@bundles.register(
'catalyst',
calendar_name='NYSE',
minutes_per_day=390,
create_writers=False,
)
def catalyst_bundle(environ,
asset_db_writer,
minute_bar_writer,
daily_bar_writer,
adjustment_writer,
calendar,
start_session,
end_session,
cache,
show_progress,
output_dir):
if show_progress:
data = download_with_progress(
CATALYST_URL,
chunk_size=ONE_MEGABYTE,
label="Downloading Bundle: catalyst",
)
else:
data = download_without_progress(CATALYST_URL)
with tarfile.open('r', fileobj=data) as tar:
if show_progress:
print("Writing data to %s." % output_dir)
tar.extractall(output_dir)
+2
View File
@@ -154,6 +154,8 @@ class DataPortal(object):
minute_history_prefetch_length=_DEF_M_HIST_PREFETCH,
daily_history_prefetch_length=_DEF_D_HIST_PREFETCH):
print 'trading_calendar: {0}'.format(trading_calendar)
self.trading_calendar = trading_calendar
self.asset_finder = asset_finder
+7 -4
View File
@@ -12,15 +12,18 @@ from zipline.api import (
schedule_function,
)
from zipline.pipeline import Pipeline
from zipline.pipeline.factors import RSI
from zipline.pipeline.factors.crypto import RSI as cRSI
from zipline.pipeline.factors.equity import RSI as eRSI
def make_pipeline():
rsi = RSI()
crsi = cRSI()
ersi = eRSI()
return Pipeline(
columns={
'longs': rsi.top(3),
'shorts': rsi.bottom(3),
'longs': crsi.top(3),
'shorts': crsi.bottom(3),
'equity': ersi.top(3),
},
)
+2
View File
@@ -1,4 +1,5 @@
from .equity_pricing import USEquityPricing
from .crypto_pricing import CryptoPricing
from .dataset import DataSet, Column, BoundColumn
__all__ = [
@@ -6,4 +7,5 @@ __all__ = [
'Column',
'DataSet',
'USEquityPricing',
'CryptoPricing',
]
+15
View File
@@ -0,0 +1,15 @@
from zipline.utils.numpy_utils import float64_dtype
from .dataset import Column, DataSet
class CryptoPricing(DataSet):
"""
Dataset representing daily trading prices and volumes of crypto-assets.
"""
open = Column(float64_dtype)
high = Column(float64_dtype)
low = Column(float64_dtype)
close = Column(float64_dtype)
volume = Column(float64_dtype)
+5 -7
View File
@@ -1,6 +1,4 @@
"""
Dataset representing OHLCV data.
"""
from zipline.utils.numpy_utils import float64_dtype
from .dataset import Column, DataSet
@@ -10,8 +8,8 @@ class USEquityPricing(DataSet):
"""
Dataset representing daily trading prices and volumes.
"""
open = Column(float64_dtype)
high = Column(float64_dtype)
low = Column(float64_dtype)
close = Column(float64_dtype)
open = Column(float64_dtype)
high = Column(float64_dtype)
low = Column(float64_dtype)
close = Column(float64_dtype)
volume = Column(float64_dtype)
-44
View File
@@ -13,59 +13,15 @@ from .statistical import (
RollingPearsonOfReturns,
RollingSpearmanOfReturns,
)
from .technical import (
AnnualizedVolatility,
Aroon,
AverageDollarVolume,
BollingerBands,
EWMA,
EWMSTD,
ExponentialWeightedMovingAverage,
ExponentialWeightedMovingStdDev,
FastStochasticOscillator,
IchimokuKinkoHyo,
LinearWeightedMovingAverage,
MACDSignal,
MaxDrawdown,
MovingAverageConvergenceDivergenceSignal,
RateOfChangePercentage,
Returns,
RSI,
SimpleMovingAverage,
TrueRange,
VWAP,
WeightedAverageValue,
)
__all__ = [
'AnnualizedVolatility',
'Aroon',
'AverageDollarVolume',
'BollingerBands',
'BusinessDaysSincePreviousEvent',
'BusinessDaysUntilNextEvent',
'CustomFactor',
'EWMA',
'EWMSTD',
'ExponentialWeightedMovingAverage',
'ExponentialWeightedMovingStdDev',
'Factor',
'FastStochasticOscillator',
'IchimokuKinkoHyo',
'Latest',
'LinearWeightedMovingAverage',
'MACDSignal',
'MaxDrawdown',
'MovingAverageConvergenceDivergenceSignal',
'RateOfChangePercentage',
'RecarrayField',
'Returns',
'RollingLinearRegressionOfReturns',
'RollingPearsonOfReturns',
'RollingSpearmanOfReturns',
'RSI',
'SimpleMovingAverage',
'TrueRange',
'VWAP',
'WeightedAverageValue',
]
@@ -0,0 +1,47 @@
from .technical import (
AnnualizedVolatility,
Aroon,
AverageDollarVolume,
BollingerBands,
EWMA,
EWMSTD,
ExponentialWeightedMovingAverage,
ExponentialWeightedMovingStdDev,
FastStochasticOscillator,
IchimokuKinkoHyo,
LinearWeightedMovingAverage,
MACDSignal,
MaxDrawdown,
MovingAverageConvergenceDivergenceSignal,
RateOfChangePercentage,
Returns,
RSI,
SimpleMovingAverage,
TrueRange,
VWAP,
WeightedAverageValue,
)
__all__ = [
'AnnualizedVolatility',
'Aroon',
'AverageDollarVolume',
'BollingerBands',
'EWMA',
'EWMSTD',
'ExponentialWeightedMovingAverage',
'ExponentialWeightedMovingStdDev',
'FastStochasticOscillator',
'IchimokuKinkoHyo',
'LinearWeightedMovingAverage',
'MACDSignal',
'MaxDrawdown',
'MovingAverageConvergenceDivergenceSignal',
'RateOfChangePercentage',
'Returns',
'RSI',
'SimpleMovingAverage',
'TrueRange',
'VWAP',
'WeightedAverageValue',
]
@@ -0,0 +1,812 @@
"""
Technical Analysis Factors
--------------------------
"""
from __future__ import division
from numbers import Number
from numpy import (
abs,
arange,
average,
clip,
diff,
dstack,
exp,
fmax,
full,
inf,
isnan,
log,
NINF,
sqrt,
sum as np_sum,
)
from numexpr import evaluate
from zipline.pipeline.data import CryptoPricing
from zipline.pipeline.mixins import SingleInputMixin
from zipline.utils.input_validation import expect_bounded, expect_types
from zipline.utils.math_utils import (
nanargmax,
nanargmin,
nanmax,
nanmean,
nanstd,
nansum,
nanmin,
)
from zipline.utils.numpy_utils import (
float64_dtype,
ignore_nanwarnings,
rolling_window,
)
from ..factor import CustomFactor
class Returns(CustomFactor):
"""
Calculates the percent change in close price over the given window_length.
**Default Inputs**: [CryptoPricing.close]
"""
inputs = [CryptoPricing.close]
window_safe = True
def _validate(self):
super(Returns, self)._validate()
if self.window_length < 2:
raise ValueError(
"'Returns' expected a window length of at least 2, but was "
"given {window_length}. For daily returns, use a window "
"length of 2.".format(window_length=self.window_length)
)
def compute(self, today, assets, out, close):
out[:] = (close[-1] - close[0]) / close[0]
class RSI(CustomFactor, SingleInputMixin):
"""
Relative Strength Index
**Default Inputs**: [CryptoPricing.close]
**Default Window Length**: 15
"""
window_length = 15
inputs = (CryptoPricing.close,)
def compute(self, today, assets, out, closes):
diffs = diff(closes, axis=0)
ups = nanmean(clip(diffs, 0, inf), axis=0)
downs = abs(nanmean(clip(diffs, -inf, 0), axis=0))
return evaluate(
"100 - (100 / (1 + (ups / downs)))",
local_dict={'ups': ups, 'downs': downs},
global_dict={},
out=out,
)
class SimpleMovingAverage(CustomFactor, SingleInputMixin):
"""
Average Value of an arbitrary column
**Default Inputs**: None
**Default Window Length**: None
"""
# numpy's nan functions throw warnings when passed an array containing only
# nans, but they still returns the desired value (nan), so we ignore the
# warning.
ctx = ignore_nanwarnings()
def compute(self, today, assets, out, data):
out[:] = nanmean(data, axis=0)
class WeightedAverageValue(CustomFactor):
"""
Helper for VWAP-like computations.
**Default Inputs:** None
**Default Window Length:** None
"""
def compute(self, today, assets, out, base, weight):
out[:] = nansum(base * weight, axis=0) / nansum(weight, axis=0)
class VWAP(WeightedAverageValue):
"""
Volume Weighted Average Price
**Default Inputs:** [CryptoPricing.close, CryptoPricing.volume]
**Default Window Length:** None
"""
inputs = (CryptoPricing.close, CryptoPricing.volume)
class MaxDrawdown(CustomFactor, SingleInputMixin):
"""
Max Drawdown
**Default Inputs:** None
**Default Window Length:** None
"""
ctx = ignore_nanwarnings()
def compute(self, today, assets, out, data):
drawdowns = fmax.accumulate(data, axis=0) - data
drawdowns[isnan(drawdowns)] = NINF
drawdown_ends = nanargmax(drawdowns, axis=0)
# TODO: Accelerate this loop in Cython or Numba.
for i, end in enumerate(drawdown_ends):
peak = nanmax(data[:end + 1, i])
out[i] = (peak - data[end, i]) / data[end, i]
class AverageDollarVolume(CustomFactor):
"""
Average Daily Dollar Volume
**Default Inputs:** [CryptoPricing.close, CryptoPricing.volume]
**Default Window Length:** None
"""
inputs = [CryptoPricing.close, CryptoPricing.volume]
def compute(self, today, assets, out, close, volume):
out[:] = nansum(close * volume, axis=0) / len(close)
def exponential_weights(length, decay_rate):
"""
Build a weight vector for an exponentially-weighted statistic.
The resulting ndarray is of the form::
[decay_rate ** length, ..., decay_rate ** 2, decay_rate]
Parameters
----------
length : int
The length of the desired weight vector.
decay_rate : float
The rate at which entries in the weight vector increase or decrease.
Returns
-------
weights : ndarray[float64]
"""
return full(length, decay_rate, float64_dtype) ** arange(length + 1, 1, -1)
class _ExponentialWeightedFactor(SingleInputMixin, CustomFactor):
"""
Base class for factors implementing exponential-weighted operations.
**Default Inputs:** None
**Default Window Length:** None
Parameters
----------
inputs : length-1 list or tuple of BoundColumn
The expression over which to compute the average.
window_length : int > 0
Length of the lookback window over which to compute the average.
decay_rate : float, 0 < decay_rate <= 1
Weighting factor by which to discount past observations.
When calculating historical averages, rows are multiplied by the
sequence::
decay_rate, decay_rate ** 2, decay_rate ** 3, ...
Methods
-------
weights
from_span
from_halflife
from_center_of_mass
"""
params = ('decay_rate',)
@classmethod
@expect_types(span=Number)
def from_span(cls, inputs, window_length, span, **kwargs):
"""
Convenience constructor for passing `decay_rate` in terms of `span`.
Forwards `decay_rate` as `1 - (2.0 / (1 + span))`. This provides the
behavior equivalent to passing `span` to pandas.ewma.
Examples
--------
.. code-block:: python
# Equivalent to:
# my_ewma = EWMA(
# inputs=[CryptoPricing.close],
# window_length=30,
# decay_rate=(1 - (2.0 / (1 + 15.0))),
# )
my_ewma = EWMA.from_span(
inputs=[CryptoPricing.close],
window_length=30,
span=15,
)
Notes
-----
This classmethod is provided by both
:class:`ExponentialWeightedMovingAverage` and
:class:`ExponentialWeightedMovingStdDev`.
"""
if span <= 1:
raise ValueError(
"`span` must be a positive number. %s was passed." % span
)
decay_rate = (1.0 - (2.0 / (1.0 + span)))
assert 0.0 < decay_rate <= 1.0
return cls(
inputs=inputs,
window_length=window_length,
decay_rate=decay_rate,
**kwargs
)
@classmethod
@expect_types(halflife=Number)
def from_halflife(cls, inputs, window_length, halflife, **kwargs):
"""
Convenience constructor for passing ``decay_rate`` in terms of half
life.
Forwards ``decay_rate`` as ``exp(log(.5) / halflife)``. This provides
the behavior equivalent to passing `halflife` to pandas.ewma.
Examples
--------
.. code-block:: python
# Equivalent to:
# my_ewma = EWMA(
# inputs=[CryptoPricing.close],
# window_length=30,
# decay_rate=np.exp(np.log(0.5) / 15),
# )
my_ewma = EWMA.from_halflife(
inputs=[CryptoPricing.close],
window_length=30,
halflife=15,
)
Notes
-----
This classmethod is provided by both
:class:`ExponentialWeightedMovingAverage` and
:class:`ExponentialWeightedMovingStdDev`.
"""
if halflife <= 0:
raise ValueError(
"`span` must be a positive number. %s was passed." % halflife
)
decay_rate = exp(log(.5) / halflife)
assert 0.0 < decay_rate <= 1.0
return cls(
inputs=inputs,
window_length=window_length,
decay_rate=decay_rate,
**kwargs
)
@classmethod
def from_center_of_mass(cls,
inputs,
window_length,
center_of_mass,
**kwargs):
"""
Convenience constructor for passing `decay_rate` in terms of center of
mass.
Forwards `decay_rate` as `1 - (1 / 1 + center_of_mass)`. This provides
behavior equivalent to passing `center_of_mass` to pandas.ewma.
Examples
--------
.. code-block:: python
# Equivalent to:
# my_ewma = EWMA(
# inputs=[CryptoPricing.close],
# window_length=30,
# decay_rate=(1 - (1 / 15.0)),
# )
my_ewma = EWMA.from_center_of_mass(
inputs=[CryptoPricing.close],
window_length=30,
center_of_mass=15,
)
Notes
-----
This classmethod is provided by both
:class:`ExponentialWeightedMovingAverage` and
:class:`ExponentialWeightedMovingStdDev`.
"""
return cls(
inputs=inputs,
window_length=window_length,
decay_rate=(1.0 - (1.0 / (1.0 + center_of_mass))),
**kwargs
)
class ExponentialWeightedMovingAverage(_ExponentialWeightedFactor):
"""
Exponentially Weighted Moving Average
**Default Inputs:** None
**Default Window Length:** None
Parameters
----------
inputs : length-1 list/tuple of BoundColumn
The expression over which to compute the average.
window_length : int > 0
Length of the lookback window over which to compute the average.
decay_rate : float, 0 < decay_rate <= 1
Weighting factor by which to discount past observations.
When calculating historical averages, rows are multiplied by the
sequence::
decay_rate, decay_rate ** 2, decay_rate ** 3, ...
Notes
-----
- This class can also be imported under the name ``EWMA``.
See Also
--------
:func:`pandas.ewma`
"""
def compute(self, today, assets, out, data, decay_rate):
out[:] = average(
data,
axis=0,
weights=exponential_weights(len(data), decay_rate),
)
class LinearWeightedMovingAverage(CustomFactor, SingleInputMixin):
"""
Weighted Average Value of an arbitrary column
**Default Inputs**: None
**Default Window Length**: None
"""
# numpy's nan functions throw warnings when passed an array containing only
# nans, but they still returns the desired value (nan), so we ignore the
# warning.
ctx = ignore_nanwarnings()
def compute(self, today, assets, out, data):
ndays = data.shape[0]
# Initialize weights array
weights = arange(1, ndays + 1, dtype=float64_dtype).reshape(ndays, 1)
# Compute normalizer
normalizer = (ndays * (ndays + 1)) / 2
# Weight the data
weighted_data = data * weights
# Compute weighted averages
out[:] = nansum(weighted_data, axis=0) / normalizer
class ExponentialWeightedMovingStdDev(_ExponentialWeightedFactor):
"""
Exponentially Weighted Moving Standard Deviation
**Default Inputs:** None
**Default Window Length:** None
Parameters
----------
inputs : length-1 list/tuple of BoundColumn
The expression over which to compute the average.
window_length : int > 0
Length of the lookback window over which to compute the average.
decay_rate : float, 0 < decay_rate <= 1
Weighting factor by which to discount past observations.
When calculating historical averages, rows are multiplied by the
sequence::
decay_rate, decay_rate ** 2, decay_rate ** 3, ...
Notes
-----
- This class can also be imported under the name ``EWMSTD``.
See Also
--------
:func:`pandas.ewmstd`
"""
def compute(self, today, assets, out, data, decay_rate):
weights = exponential_weights(len(data), decay_rate)
mean = average(data, axis=0, weights=weights)
variance = average((data - mean) ** 2, axis=0, weights=weights)
squared_weight_sum = (np_sum(weights) ** 2)
bias_correction = (
squared_weight_sum / (squared_weight_sum - np_sum(weights ** 2))
)
out[:] = sqrt(variance * bias_correction)
class BollingerBands(CustomFactor):
"""
Bollinger Bands technical indicator.
https://en.wikipedia.org/wiki/Bollinger_Bands
**Default Inputs:** :data:`zipline.pipeline.data.CryptoPricing.close`
Parameters
----------
inputs : length-1 iterable[BoundColumn]
The expression over which to compute bollinger bands.
window_length : int > 0
Length of the lookback window over which to compute the bollinger
bands.
k : float
The number of standard deviations to add or subtract to create the
upper and lower bands.
"""
params = ('k',)
inputs = (CryptoPricing.close,)
outputs = 'lower', 'middle', 'upper'
def compute(self, today, assets, out, close, k):
difference = k * nanstd(close, axis=0)
out.middle = middle = nanmean(close, axis=0)
out.upper = middle + difference
out.lower = middle - difference
class Aroon(CustomFactor):
"""
Aroon technical indicator.
https://www.fidelity.com/learning-center/trading-investing/technical-analysis/technical-indicator-guide/aroon-indicator # noqa
**Defaults Inputs:** CryptoPricing.low, CryptoPricing.high
Parameters
----------
window_length : int > 0
Length of the lookback window over which to compute the Aroon
indicator.
"""
inputs = (CryptoPricing.low, CryptoPricing.high)
outputs = ('down', 'up')
def compute(self, today, assets, out, lows, highs):
wl = self.window_length
high_date_index = nanargmax(highs, axis=0)
low_date_index = nanargmin(lows, axis=0)
evaluate(
'(100 * high_date_index) / (wl - 1)',
local_dict={
'high_date_index': high_date_index,
'wl': wl,
},
out=out.up,
)
evaluate(
'(100 * low_date_index) / (wl - 1)',
local_dict={
'low_date_index': low_date_index,
'wl': wl,
},
out=out.down,
)
class FastStochasticOscillator(CustomFactor):
"""
Fast Stochastic Oscillator Indicator [%K, Momentum Indicator]
https://wiki.timetotrade.eu/Stochastic
This stochastic is considered volatile, and varies a lot when used in
market analysis. It is recommended to use the slow stochastic oscillator
or a moving average of the %K [%D].
**Default Inputs:** :data: `zipline.pipeline.data.CryptoPricing.close`
:data: `zipline.pipeline.data.CryptoPricing.low`
:data: `zipline.pipeline.data.CryptoPricing.high`
**Default Window Length:** 14
Returns
-------
out: %K oscillator
"""
inputs = (CryptoPricing.close, CryptoPricing.low, CryptoPricing.high)
window_safe = True
window_length = 14
def compute(self, today, assets, out, closes, lows, highs):
highest_highs = nanmax(highs, axis=0)
lowest_lows = nanmin(lows, axis=0)
today_closes = closes[-1]
evaluate(
'((tc - ll) / (hh - ll)) * 100',
local_dict={
'tc': today_closes,
'll': lowest_lows,
'hh': highest_highs,
},
global_dict={},
out=out,
)
class IchimokuKinkoHyo(CustomFactor):
"""Compute the various metrics for the Ichimoku Kinko Hyo (Ichimoku Cloud).
http://stockcharts.com/school/doku.php?id=chart_school:technical_indicators:ichimoku_cloud # noqa
**Default Inputs:** :data:`zipline.pipeline.data.CryptoPricing.high`
:data:`zipline.pipeline.data.CryptoPricing.low`
:data:`zipline.pipeline.data.CryptoPricing.close`
**Default Window Length:** 52
Parameters
----------
window_length : int > 0
The length the the window for the senkou span b.
tenkan_sen_length : int >= 0, <= window_length
The length of the window for the tenkan-sen.
kijun_sen_length : int >= 0, <= window_length
The length of the window for the kijou-sen.
chikou_span_length : int >= 0, <= window_length
The lag for the chikou span.
"""
params = {
'tenkan_sen_length': 9,
'kijun_sen_length': 26,
'chikou_span_length': 26,
}
inputs = (CryptoPricing.high, CryptoPricing.low, CryptoPricing.close)
outputs = (
'tenkan_sen',
'kijun_sen',
'senkou_span_a',
'senkou_span_b',
'chikou_span',
)
window_length = 52
def _validate(self):
super(IchimokuKinkoHyo, self)._validate()
for k, v in self.params.items():
if v > self.window_length:
raise ValueError(
'%s must be <= the window_length: %s > %s' % (
k, v, self.window_length,
),
)
def compute(self,
today,
assets,
out,
high,
low,
close,
tenkan_sen_length,
kijun_sen_length,
chikou_span_length):
out.tenkan_sen = tenkan_sen = (
high[-tenkan_sen_length:].max(axis=0) +
low[-tenkan_sen_length:].min(axis=0)
) / 2
out.kijun_sen = kijun_sen = (
high[-kijun_sen_length:].max(axis=0) +
low[-kijun_sen_length:].min(axis=0)
) / 2
out.senkou_span_a = (tenkan_sen + kijun_sen) / 2
out.senkou_span_b = (high.max(axis=0) + low.min(axis=0)) / 2
out.chikou_span = close[chikou_span_length]
class RateOfChangePercentage(CustomFactor):
"""
Rate of change Percentage
ROC measures the percentage change in price from one period to the next.
The ROC calculation compares the current price with the price `n`
periods ago.
Formula for calculation: ((price - prevPrice) / prevPrice) * 100
price - the current price
prevPrice - the price n days ago, equals window length
"""
def compute(self, today, assets, out, close):
today_close = close[-1]
prev_close = close[0]
evaluate('((tc - pc) / pc) * 100',
local_dict={
'tc': today_close,
'pc': prev_close
},
global_dict={},
out=out,
)
class TrueRange(CustomFactor):
"""
True Range
A technical indicator originally developed by J. Welles Wilder, Jr.
Indicates the true degree of daily price change in an underlying.
**Default Inputs:** :data:`zipline.pipeline.data.CryptoPricing.high`
:data:`zipline.pipeline.data.CryptoPricing.low`
:data:`zipline.pipeline.data.CryptoPricing.close`
**Default Window Length:** 2
"""
inputs = (
CryptoPricing.high,
CryptoPricing.low,
CryptoPricing.close,
)
window_length = 2
def compute(self, today, assets, out, highs, lows, closes):
high_to_low = highs[1:] - lows[1:]
high_to_prev_close = abs(highs[1:] - closes[:-1])
low_to_prev_close = abs(lows[1:] - closes[:-1])
out[:] = nanmax(
dstack((
high_to_low,
high_to_prev_close,
low_to_prev_close,
)),
2
)
class MovingAverageConvergenceDivergenceSignal(CustomFactor):
"""
Moving Average Convergence/Divergence (MACD) Signal line
https://en.wikipedia.org/wiki/MACD
A technical indicator originally developed by Gerald Appel in the late
1970's. MACD shows the relationship between two moving averages and
reveals changes in the strength, direction, momentum, and duration of a
trend in a stock's price.
**Default Inputs:** :data:`zipline.pipeline.data.CryptoPricing.close`
Parameters
----------
fast_period : int > 0, optional
The window length for the "fast" EWMA. Default is 12.
slow_period : int > 0, > fast_period, optional
The window length for the "slow" EWMA. Default is 26.
signal_period : int > 0, < fast_period, optional
The window length for the signal line. Default is 9.
Notes
-----
Unlike most pipeline expressions, this factor does not accept a
``window_length`` parameter. ``window_length`` is inferred from
``slow_period`` and ``signal_period``.
"""
inputs = (CryptoPricing.close,)
# We don't use the default form of `params` here because we want to
# dynamically calculate `window_length` from the period lengths in our
# __new__.
params = ('fast_period', 'slow_period', 'signal_period')
@expect_bounded(
__funcname='MACDSignal',
fast_period=(1, None), # These must all be >= 1.
slow_period=(1, None),
signal_period=(1, None),
)
def __new__(cls,
fast_period=12,
slow_period=26,
signal_period=9,
*args,
**kwargs):
if slow_period <= fast_period:
raise ValueError(
"'slow_period' must be greater than 'fast_period', but got\n"
"slow_period={slow}, fast_period={fast}".format(
slow=slow_period,
fast=fast_period,
)
)
return super(MovingAverageConvergenceDivergenceSignal, cls).__new__(
cls,
fast_period=fast_period,
slow_period=slow_period,
signal_period=signal_period,
window_length=slow_period + signal_period - 1,
*args, **kwargs
)
def _ewma(self, data, length):
decay_rate = 1.0 - (2.0 / (1.0 + length))
return average(
data,
axis=1,
weights=exponential_weights(length, decay_rate)
)
def compute(self, today, assets, out, close, fast_period, slow_period,
signal_period):
slow_EWMA = self._ewma(
rolling_window(close, slow_period),
slow_period
)
fast_EWMA = self._ewma(
rolling_window(close, fast_period)[-signal_period:],
fast_period
)
macd = fast_EWMA - slow_EWMA
out[:] = self._ewma(macd.T, signal_period)
class AnnualizedVolatility(CustomFactor):
"""
Volatility. The degree of variation of a series over time as measured by
the standard deviation of daily returns.
https://en.wikipedia.org/wiki/Volatility_(finance)
**Default Inputs:** :data:`zipline.pipeline.factors.Returns(window_length=2)` # noqa
Parameters
----------
annualization_factor : float, optional
The number of time units per year. Defaults is 252, the number of NYSE
trading days in a normal year.
"""
inputs = [Returns(window_length=2)]
params = {'annualization_factor': 252.0}
window_length = 252
def compute(self, today, assets, out, returns, annualization_factor):
out[:] = nanstd(returns, axis=0) * (annualization_factor ** .5)
# Convenience aliases.
EWMA = ExponentialWeightedMovingAverage
EWMSTD = ExponentialWeightedMovingStdDev
MACDSignal = MovingAverageConvergenceDivergenceSignal
@@ -0,0 +1,47 @@
from .technical import (
AnnualizedVolatility,
Aroon,
AverageDollarVolume,
BollingerBands,
EWMA,
EWMSTD,
ExponentialWeightedMovingAverage,
ExponentialWeightedMovingStdDev,
FastStochasticOscillator,
IchimokuKinkoHyo,
LinearWeightedMovingAverage,
MACDSignal,
MaxDrawdown,
MovingAverageConvergenceDivergenceSignal,
RateOfChangePercentage,
Returns,
RSI,
SimpleMovingAverage,
TrueRange,
VWAP,
WeightedAverageValue,
)
__all__ = [
'AnnualizedVolatility',
'Aroon',
'AverageDollarVolume',
'BollingerBands',
'EWMA',
'EWMSTD',
'ExponentialWeightedMovingAverage',
'ExponentialWeightedMovingStdDev',
'FastStochasticOscillator',
'IchimokuKinkoHyo',
'LinearWeightedMovingAverage',
'MACDSignal',
'MaxDrawdown',
'MovingAverageConvergenceDivergenceSignal',
'RateOfChangePercentage',
'Returns',
'RSI',
'SimpleMovingAverage',
'TrueRange',
'VWAP',
'WeightedAverageValue',
]
@@ -41,7 +41,7 @@ from zipline.utils.numpy_utils import (
ignore_nanwarnings,
rolling_window,
)
from .factor import CustomFactor
from ..factor import CustomFactor
class Returns(CustomFactor):
+2
View File
@@ -1,5 +1,7 @@
from .equity_pricing_loader import USEquityPricingLoader
from .crypto_pricing_loader import CryptoPricingLoader
__all__ = [
'USEquityPricingLoader',
'CryptoPricingLoader',
]
+4
View File
@@ -19,3 +19,7 @@ class PipelineLoader(with_metaclass(ABCMeta)):
@abstractmethod
def load_adjusted_array(self, columns, dates, assets, mask):
pass
@abstractmethod
def columns(self):
pass
@@ -0,0 +1,136 @@
# Copyright 2015 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from numpy import (
iinfo,
uint32,
)
from zipline.data.us_equity_pricing import (
BcolzDailyBarReader,
SQLiteAdjustmentReader,
)
from zipline.lib.adjusted_array import AdjustedArray
from zipline.errors import NoFurtherDataError
from zipline.utils.calendars import get_calendar
from .base import PipelineLoader
UINT32_MAX = iinfo(uint32).max
class CryptoPricingLoader(PipelineLoader):
"""
PipelineLoader for Crypto Pricing data
Delegates loading of baselines and adjustments.
"""
def __init__(self, raw_price_loader, dataset):
self.raw_price_loader = raw_price_loader
self._columns = dataset.columns
cal = get_calendar('NYSE')
self._all_sessions = cal.all_sessions
@classmethod
def from_files(cls, pricing_path):
"""
Create a loader from a bcolz equity pricing dir and a SQLite
adjustments path.
Parameters
----------
pricing_path : str
Path to a bcolz directory written by a BcolzDailyBarWriter.
"""
return cls(
BcolzDailyBarReader(pricing_path),
)
def load_adjusted_array(self, columns, dates, assets, mask):
# load_adjusted_array is called with dates on which the user's algo
# will be shown data, which means we need to return the data that would
# be known at the start of each date. We assume that the latest data
# known on day N is the data from day (N - 1), so we shift all query
# dates back by a day.
start_date, end_date = _shift_dates(
self._all_sessions, dates[0], dates[-1], shift=1,
)
colnames = [c.name for c in columns]
raw_arrays = self.raw_price_loader.load_raw_arrays(
colnames,
start_date,
end_date,
assets,
)
out = {}
for c, c_raw in zip(columns, raw_arrays):
out[c] = AdjustedArray(
c_raw.astype(c.dtype),
mask,
{},
c.missing_value,
)
return out
@property
def columns(self):
return self._columns
def _shift_dates(dates, start_date, end_date, shift):
try:
start = dates.get_loc(start_date)
except KeyError:
if start_date < dates[0]:
raise NoFurtherDataError(
msg=(
"Pipeline Query requested data starting on {query_start}, "
"but first known date is {calendar_start}"
).format(
query_start=str(start_date),
calendar_start=str(dates[0]),
)
)
else:
raise ValueError("Query start %s not in calendar" % start_date)
# Make sure that shifting doesn't push us out of the calendar.
if start < shift:
raise NoFurtherDataError(
msg=(
"Pipeline Query requested data from {shift}"
" days before {query_start}, but first known date is only "
"{start} days earlier."
).format(shift=shift, query_start=start_date, start=start),
)
try:
end = dates.get_loc(end_date)
except KeyError:
if end_date > dates[-1]:
raise NoFurtherDataError(
msg=(
"Pipeline Query requesting data up to {query_end}, "
"but last known date is {calendar_end}"
).format(
query_end=end_date,
calendar_end=dates[-1],
)
)
else:
raise ValueError("Query end %s not in calendar" % end_date)
return dates[start - shift], dates[end - shift]
@@ -36,9 +36,10 @@ class USEquityPricingLoader(PipelineLoader):
Delegates loading of baselines and adjustments.
"""
def __init__(self, raw_price_loader, adjustments_loader):
def __init__(self, raw_price_loader, adjustments_loader, dataset):
self.raw_price_loader = raw_price_loader
self.adjustments_loader = adjustments_loader
self._columns = dataset.columns
cal = self.raw_price_loader.trading_calendar or \
get_calendar("NYSE")
@@ -94,6 +95,10 @@ class USEquityPricingLoader(PipelineLoader):
c.missing_value,
)
return out
@property
def columns(self):
return self._columns
def _shift_dates(dates, start_date, end_date, shift):
@@ -3,6 +3,7 @@ from zipline.errors import (
CyclicCalendarAlias,
InvalidCalendarName,
)
from zipline.utils.calendars.exchange_calendar_open import OpenExchangeCalendar
from zipline.utils.calendars.exchange_calendar_cfe import CFEExchangeCalendar
from zipline.utils.calendars.exchange_calendar_ice import ICEExchangeCalendar
from zipline.utils.calendars.exchange_calendar_nyse import NYSEExchangeCalendar
@@ -15,6 +16,7 @@ from zipline.utils.calendars.us_futures_calendar import (
)
_default_calendar_factories = {
'OPEN': OpenExchangeCalendar,
'NYSE': NYSEExchangeCalendar,
'CME': CMEExchangeCalendar,
'ICE': ICEExchangeCalendar,
@@ -25,6 +27,7 @@ _default_calendar_factories = {
'us_futures': QuantopianUSFuturesCalendar,
}
_default_calendar_aliases = {
'CATX': 'OPEN',
'NASDAQ': 'NYSE',
'BATS': 'NYSE',
'CBOT': 'CME',
@@ -72,6 +75,8 @@ class TradingCalendarDispatcher(object):
"""
canonical_name = self.resolve_alias(name)
print 'get_calendar(\'{name}\')'.format(name=canonical_name)
try:
return self._calendars[canonical_name]
except KeyError:
@@ -0,0 +1,29 @@
from datetime import time
from pytz import timezone
from .trading_calendar import TradingCalendar
from zipline.utils.memoize import lazyval
class OpenExchangeCalendar(TradingCalendar):
@property
def name(self):
return 'OPEN'
@property
def tz(self):
return timezone('US/Eastern')
@property
def open_time(self):
return time(0)
@property
def close_time(self):
return time(23, 59)
@lazyval
def day(self):
return 'D'
+3 -2
View File
@@ -606,8 +606,9 @@ class time_rules(object):
class calendars(object):
US_EQUITIES = sentinel('US_EQUITIES')
US_FUTURES = sentinel('US_FUTURES')
CRYPTO_ASSETS = sentinel('CRYPTO_ASSETS')
US_EQUITIES = sentinel('US_EQUITIES')
US_FUTURES = sentinel('US_FUTURES')
def make_eventrule(date_rule, time_rule, cal, half_days=True):
+74 -33
View File
@@ -18,8 +18,8 @@ from zipline.algorithm import TradingAlgorithm
from zipline.data.bundles.core import load
from zipline.data.data_portal import DataPortal
from zipline.finance.trading import TradingEnvironment
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.loaders import USEquityPricingLoader
from zipline.pipeline.data import USEquityPricing, CryptoPricing
from zipline.pipeline.loaders import USEquityPricingLoader, CryptoPricingLoader
from zipline.utils.calendars import get_calendar
from zipline.utils.factory import create_simulation_parameters
import zipline.utils.paths as pth
@@ -113,44 +113,85 @@ def _run(handle_data,
click.echo(algotext)
if bundle is not None:
bundle_data = load(
bundle,
environ,
bundle_timestamp,
)
bundles = bundle.split(',')
print 'bundles: {0}'.format(bundles)
prefix, connstr = re.split(
r'sqlite:///',
str(bundle_data.asset_finder.engine.url),
maxsplit=1,
)
if prefix:
raise ValueError(
"invalid url %r, must begin with 'sqlite:///'" %
str(bundle_data.asset_finder.engine.url),
def get_trading_env_and_data(bundles):
env = data = None
b = 'catalyst'
if len(bundles) == 0:
return env, data
elif len(bundles) == 1:
b = bundles[0]
bundle_data = load(
b,
environ,
bundle_timestamp,
)
prefix, connstr = re.split(
r'sqlite:///',
str(bundle_data.asset_finder.engine.url),
maxsplit=1,
)
if prefix:
raise ValueError(
"invalid url %r, must begin with 'sqlite:///'" %
str(bundle_data.asset_finder.engine.url),
)
env = TradingEnvironment(asset_db_path=connstr, environ=environ)
first_trading_day =\
bundle_data.equity_minute_bar_reader.first_trading_day
data = DataPortal(
env.asset_finder,
get_calendar('NYSE'),
first_trading_day=first_trading_day,
equity_minute_reader=bundle_data.equity_minute_bar_reader,
equity_daily_reader=bundle_data.equity_daily_bar_reader,
adjustment_reader=bundle_data.adjustment_reader,
)
env = TradingEnvironment(asset_db_path=connstr, environ=environ)
first_trading_day =\
bundle_data.equity_minute_bar_reader.first_trading_day
data = DataPortal(
env.asset_finder, get_calendar("NYSE"),
first_trading_day=first_trading_day,
equity_minute_reader=bundle_data.equity_minute_bar_reader,
equity_daily_reader=bundle_data.equity_daily_bar_reader,
adjustment_reader=bundle_data.adjustment_reader,
)
pipeline_loader = USEquityPricingLoader(
bundle_data.equity_daily_bar_reader,
bundle_data.adjustment_reader,
)
return env, data
def choose_loader(column):
if column in USEquityPricing.columns:
return pipeline_loader
def get_loader_for_bundle(b):
bundle_data = load(
b,
environ,
bundle_timestamp,
)
if b == 'catalyst':
print 'creating crypto pricing loader...'
return CryptoPricingLoader(
bundle_data.equity_daily_bar_reader,
CryptoPricing,
)
elif b == 'quantopian-quandl':
return USEquityPricingLoader(
bundle_data.equity_daily_bar_reader,
bundle_data.adjustment_reader,
USEquityPricing,
)
raise ValueError(
"No PipelineLoader registered for column %s." % column
)
loaders = [get_loader_for_bundle(b) for b in bundles]
env, data = get_trading_env_and_data(bundles)
def choose_loader(column):
print 'finding pricing loader...'
for loader in loaders:
if column in loader.columns:
return loader
raise ValueError(
"No PipelineLoader registered for column %s." % column
)
print 'env: {0}'.format(env)
else:
env = TradingEnvironment(environ=environ)
choose_loader = None