From 53d3b0855b9527d2ee1ff9cbbd3902b85e9cf0ea Mon Sep 17 00:00:00 2001 From: Scott Sanderson Date: Mon, 7 Mar 2016 18:14:03 -0500 Subject: [PATCH] ENH: Add support for Classifiers. Classifiers are computations that represent grouping keys. They can be used in conjuction with normalization functions like ``zscore`` or ``demean`` to perform normalizations over subsets of a dataset. Notable changes: - Added ``demean()`` and ``zscore()`` methods to ``Factor``. - Added a classifier versions of ``Latest`` and ``CustomTermMixin``. The .latest attribute of int64 dataset columns no produces a classifier by default. - Added ``Everything``, a classifier that maps all data to the same value. - Added ``zipline.lib.normalize``, which implements a naive, pure-Python grouped normalize function. This will likely be moved to Cython in a subsequent PR. --- tests/pipeline/base.py | 18 + tests/pipeline/test_factor.py | 152 ++++++++- tests/pipeline/test_term.py | 6 +- tests/test_doctests.py | 5 +- zipline/lib/normalize.py | 45 +++ zipline/pipeline/__init__.py | 2 +- zipline/pipeline/classifiers/__init__.py | 9 + zipline/pipeline/classifiers/classifier.py | 54 +++ zipline/pipeline/classifiers/latest.py | 29 ++ zipline/pipeline/data/dataset.py | 15 +- zipline/pipeline/factors/factor.py | 365 +++++++++++++++++++-- zipline/pipeline/filters/filter.py | 2 +- zipline/pipeline/mixins.py | 4 + zipline/pipeline/term.py | 2 + 14 files changed, 674 insertions(+), 34 deletions(-) create mode 100644 zipline/lib/normalize.py create mode 100644 zipline/pipeline/classifiers/__init__.py create mode 100644 zipline/pipeline/classifiers/classifier.py create mode 100644 zipline/pipeline/classifiers/latest.py diff --git a/tests/pipeline/base.py b/tests/pipeline/base.py index f07c00b7..414091c0 100644 --- a/tests/pipeline/base.py +++ b/tests/pipeline/base.py @@ -147,6 +147,24 @@ class BasePipelineTestCase(TestCase): """ return arange(prod(shape), dtype=dtype).reshape(shape) + @with_default_shape + def randn_data(self, seed, shape): + """ + Build a block of testing data from a seeded RandomState. + """ + return np.random.RandomState(seed).randn(*shape) + + @with_default_shape + def eye_mask(self, shape): + """ + Build a mask using np.eye. + """ + return ~np.eye(*shape, dtype=bool) + + @with_default_shape + def ones_mask(self, shape): + return np.ones(shape, dtype=bool) + class EventLoaderCommonMixin(object): @abc.abstractproperty diff --git a/tests/pipeline/test_factor.py b/tests/pipeline/test_factor.py index 0722008d..770ff620 100644 --- a/tests/pipeline/test_factor.py +++ b/tests/pipeline/test_factor.py @@ -5,19 +5,24 @@ from itertools import product from nose_parameterized import parameterized from numpy import ( + apply_along_axis, arange, array, datetime64, empty, eye, nan, + nanmean, + nanstd, ones, + where, ) from numpy.random import randn, seed from zipline.errors import UnknownRankMethod from zipline.lib.rank import masked_rankdata_2d -from zipline.pipeline import Factor, Filter, TermGraph +from zipline.lib.normalize import naive_grouped_rowwise_apply as grouped_apply +from zipline.pipeline import Classifier, Factor, Filter, TermGraph from zipline.pipeline.factors import ( Returns, RSI, @@ -43,6 +48,20 @@ class F(Factor): window_length = 0 +class C(Classifier): + dtype = int64_dtype + missing_value = -1 + inputs = () + window_length = 0 + + +class OtherC(Classifier): + dtype = int64_dtype + missing_value = -1 + inputs = () + window_length = 0 + + class Mask(Filter): inputs = () window_length = 0 @@ -403,3 +422,134 @@ class FactorTestCase(BasePipelineTestCase): ) check_arrays(float_result, datetime_result) + + @parameter_space( + seed_value=range(1, 2), + normalizer_name_and_func=[ + ('demean', lambda row: row - nanmean(row)), + ('zscore', lambda row: (row - nanmean(row)) / nanstd(row)), + ], + add_nulls_to_factor=(False, True,) + ) + def test_normalizations(self, + seed_value, + normalizer_name_and_func, + add_nulls_to_factor): + + name, func = normalizer_name_and_func + + shape = (7, 7) + + # All Trues. + nomask = self.ones_mask(shape=shape) + # Falses on main diagonal. + eyemask = self.eye_mask(shape=shape) + # Falses on other diagonal. + eyemask_T = eyemask.T + # Falses on both diagonals. + xmask = eyemask & eyemask_T + + # Block of random data. + factor_data = self.randn_data(seed=seed_value, shape=shape) + if add_nulls_to_factor: + factor_data = where(eyemask, factor_data, nan) + + # Cycles of 0, 1, 2, 0, 1, 2, ... + classifier_data = ( + (self.arange_data(shape=shape, dtype=int) + seed_value) % 3 + ) + # With -1s on main diagonal. + classifier_data_eyenulls = where(eyemask, classifier_data, -1) + # With -1s on opposite diagonal. + classifier_data_eyenulls_T = where(eyemask_T, classifier_data, -1) + # With -1s on both diagonals. + classifier_data_xnulls = where(xmask, classifier_data, -1) + + f = self.f + c = C() + c_with_nulls = OtherC() + m = Mask() + method = getattr(f, name) + terms = { + 'vanilla': method(), + 'masked': method(mask=m), + 'grouped': method(groupby=c), + 'grouped_with_nulls': method(groupby=c_with_nulls), + 'both': method(mask=m, groupby=c), + 'both_with_nulls': method(mask=m, groupby=c_with_nulls), + } + + expected = { + 'vanilla': apply_along_axis(func, 1, factor_data,), + 'masked': where( + eyemask, + grouped_apply(factor_data, eyemask, func), + nan, + ), + 'grouped': grouped_apply( + factor_data, + classifier_data, + func, + ), + # If the classifier has nulls, we should get NaNs in the + # corresponding locations in the output. + 'grouped_with_nulls': where( + eyemask_T, + grouped_apply(factor_data, classifier_data_eyenulls_T, func), + nan, + ), + # Passing a mask with a classifier should behave as though the + # classifier had nulls where the mask was False. + 'both': where( + eyemask, + grouped_apply( + factor_data, + classifier_data_eyenulls, + func, + ), + nan, + ), + 'both_with_nulls': where( + xmask, + grouped_apply( + factor_data, + classifier_data_xnulls, + func, + ), + nan, + ) + } + + graph = TermGraph(terms) + results = self.run_graph( + graph, + initial_workspace={ + f: factor_data, + c: classifier_data, + c_with_nulls: classifier_data_eyenulls_T, + Mask(): eyemask, + }, + mask=self.build_mask(nomask), + ) + + for key in expected: + check_arrays(expected[key], results[key]) + + @parameter_space(normalizer=['demean', 'zscore']) + def test_cant_normalize_non_float(self, normalizer): + class DateFactor(Factor): + dtype = datetime64ns_dtype + inputs = () + window_length = 0 + + d = DateFactor() + with self.assertRaises(TypeError) as e: + getattr(d, normalizer)() + + errmsg = str(e.exception) + expected = ( + "{normalizer}() is only defined on Factors of dtype float64," + " but it was called on a Factor of dtype datetime64[ns]." + ).format(normalizer=normalizer) + + self.assertEqual(errmsg, expected) diff --git a/tests/pipeline/test_term.py b/tests/pipeline/test_term.py index ef9b8392..d8820e55 100644 --- a/tests/pipeline/test_term.py +++ b/tests/pipeline/test_term.py @@ -13,7 +13,7 @@ from zipline.errors import ( UnsupportedDType, WindowLengthNotSpecified, ) -from zipline.pipeline import Factor, Filter, TermGraph +from zipline.pipeline import Classifier, Factor, Filter, TermGraph from zipline.pipeline.data import Column, DataSet from zipline.pipeline.data.testing import TestingDataSet from zipline.pipeline.term import AssetExists, NotSpecified @@ -344,10 +344,12 @@ class ObjectIdentityTestCase(TestCase): SomeFactor(dtype=complex128_dtype) def test_latest_on_different_dtypes(self): - factor_dtypes = (int64_dtype, float64_dtype, datetime64ns_dtype) + factor_dtypes = (float64_dtype, datetime64ns_dtype) for column in TestingDataSet.columns: if column.dtype == bool_dtype: self.assertIsInstance(column.latest, Filter) + elif column.dtype == int64_dtype: + self.assertIsInstance(column.latest, Classifier) elif column.dtype in factor_dtypes: self.assertIsInstance(column.latest, Factor) else: diff --git a/tests/test_doctests.py b/tests/test_doctests.py index 46890127..0492485b 100644 --- a/tests/test_doctests.py +++ b/tests/test_doctests.py @@ -4,7 +4,7 @@ import doctest from unittest import TestCase from zipline import testing -from zipline.lib import adjustment +from zipline.lib import adjustment, normalize from zipline.pipeline import ( engine, expression, @@ -86,3 +86,6 @@ class DoctestTestCase(TestCase): def test_functional_docs(self): self._check_docs(functional) + + def test_normalize_docs(self): + self._check_docs(normalize) diff --git a/zipline/lib/normalize.py b/zipline/lib/normalize.py new file mode 100644 index 00000000..72ac7ec6 --- /dev/null +++ b/zipline/lib/normalize.py @@ -0,0 +1,45 @@ +import numpy as np + + +def naive_grouped_rowwise_apply(data, group_labels, func, out=None): + """ + Simple implementation of grouped row-wise function application. + + Parameters + ---------- + data : ndarray[ndim=2] + Input array over which to apply a grouped function. + group_labels : ndarray[ndim=2, dtype=int64] + Labels to use to bucket inputs from array. + Should be the same shape as array. + func : function[ndarray[ndim=1]] -> function[ndarray[ndim=1]] + Function to apply to pieces of each row in array. + out : ndarray, optional + Array into which to write output. If not supplied, a new array of the + same shape as ``data`` is allocated and returned. + + Example + ------- + >>> data = np.array([[1., 2., 3.], + ... [2., 3., 4.], + ... [5., 6., 7.]]) + >>> labels = np.array([[0, 0, 1], + ... [0, 1, 0], + ... [1, 0, 2]]) + >>> naive_grouped_rowwise_apply(data, labels, lambda row: row - row.min()) + array([[ 0., 1., 0.], + [ 0., 0., 2.], + [ 0., 0., 0.]]) + >>> naive_grouped_rowwise_apply(data, labels, lambda row: row / row.sum()) + array([[ 0.33333333, 0.66666667, 1. ], + [ 0.33333333, 1. , 0.66666667], + [ 1. , 1. , 1. ]]) + """ + if out is None: + out = np.empty_like(data) + + for (row, label_row, out_row) in zip(data, group_labels, out): + for label in np.unique(label_row): + locs = (label_row == label) + out_row[locs] = func(row[locs]) + return out diff --git a/zipline/pipeline/__init__.py b/zipline/pipeline/__init__.py index d9ca1918..89f0a33b 100644 --- a/zipline/pipeline/__init__.py +++ b/zipline/pipeline/__init__.py @@ -1,7 +1,7 @@ from __future__ import print_function from zipline.assets import AssetFinder -from .classifier import Classifier +from .classifiers import Classifier from .engine import SimplePipelineEngine from .factors import Factor, CustomFactor from .filters import Filter diff --git a/zipline/pipeline/classifiers/__init__.py b/zipline/pipeline/classifiers/__init__.py new file mode 100644 index 00000000..1bd9e900 --- /dev/null +++ b/zipline/pipeline/classifiers/__init__.py @@ -0,0 +1,9 @@ +from .classifier import Classifier, CustomClassifier, Everything +from latest import Latest + +__all__ = [ + 'Classifier', + 'CustomClassifier', + 'Everything', + 'Latest', +] diff --git a/zipline/pipeline/classifiers/classifier.py b/zipline/pipeline/classifiers/classifier.py new file mode 100644 index 00000000..514dbf6d --- /dev/null +++ b/zipline/pipeline/classifiers/classifier.py @@ -0,0 +1,54 @@ +""" +classifier.py +""" +from numpy import zeros, where + +from zipline.errors import UnsupportedDataType +from zipline.pipeline.term import ComputableTerm +from zipline.utils.numpy_utils import int64_dtype + +from ..mixins import CustomTermMixin, PositiveWindowLengthMixin + + +class Classifier(ComputableTerm): + + def _validate(self): + # Run superclass validation first so that we handle `dtype not passed` + # before this. + retval = super(Classifier, self)._validate() + # TODO: Support strings here. + if self.dtype != int64_dtype: + raise UnsupportedDataType( + typename=type(self).__name__, + dtype=self.dtype + ) + return retval + + +class Everything(Classifier): + """ + A trivial classifier that classifies everything the same. + """ + dtype = int64_dtype + window_length = 0 + inputs = () + missing_value = -1 + + def _compute(self, arrays, dates, assets, mask): + return where( + mask, + zeros(shape=mask.shape, dtype=int64_dtype), + self.missing_value, + ) + + +class CustomClassifier(PositiveWindowLengthMixin, CustomTermMixin, Classifier): + """ + Base class for user-defined Classifiers. + + See Also + -------- + zipline.pipeline.CustomFactor + zipline.pipeline.CustomFilter + """ + pass diff --git a/zipline/pipeline/classifiers/latest.py b/zipline/pipeline/classifiers/latest.py new file mode 100644 index 00000000..efc1dd55 --- /dev/null +++ b/zipline/pipeline/classifiers/latest.py @@ -0,0 +1,29 @@ +""" +Classifier that produces the most most recently-known value of a +integer-valued column. +""" +from zipline.utils.numpy_utils import int64_dtype + +from .classifier import CustomClassifier +from ..mixins import SingleInputMixin + + +class Latest(SingleInputMixin, CustomClassifier): + """ + Filter producing the most recently-known value of `inputs[0]` on each day. + """ + window_length = 1 + + def compute(self, today, assets, out, data): + out[:] = data[-1] + + def _validate(self): + if self.inputs[0].dtype != int64_dtype: + raise TypeError( + "{name} expected an input of dtype int64, " + "but got {not_bool} instead.".format( + name=type(self).__name__, + not_bool=self.inputs[0].dtype, + ) + ) + super(Latest, self)._validate() diff --git a/zipline/pipeline/data/dataset.py b/zipline/pipeline/data/dataset.py index 43513fc0..980156a1 100644 --- a/zipline/pipeline/data/dataset.py +++ b/zipline/pipeline/data/dataset.py @@ -16,6 +16,7 @@ from zipline.pipeline.term import ( from zipline.utils.input_validation import ensure_dtype from zipline.utils.numpy_utils import ( bool_dtype, + int64_dtype, NoDefaultMissingValue, ) from zipline.utils.preprocess import preprocess @@ -93,16 +94,20 @@ class BoundColumn(LoadableTerm): A column of data that's been concretely bound to a particular dataset. Instances of this class are dynamically created upon access to attributes - of DataSets. + of DataSets (for example, USEquityPricing.close is an instance of this + class). Attributes ---------- dtype : numpy.dtype The dtype of data produced when this column is loaded. latest : zipline.pipeline.data.Factor or zipline.pipeline.data.Filter - A Filter/Factor computing the most recently known value of this column - on each date. Produces a Filter if self.dtype == ``np.bool_``, - otherwise produces a Factor. + A Filter, Factor, or Classifier computing the most recently known value + of this column on each date. + + Produces a Filter if self.dtype == ``np.bool_``. + Produces a Classifier if self.dtype == ``np.int64`` + Otherwise produces a Factor. dataset : zipline.pipeline.data.DataSet The dataset to which this column is bound. name : str @@ -162,6 +167,8 @@ class BoundColumn(LoadableTerm): def latest(self): if self.dtype == bool_dtype: from zipline.pipeline.filters import Latest + elif self.dtype == int64_dtype: + from zipline.pipeline.classifiers import Latest else: from zipline.pipeline.factors import Latest return Latest( diff --git a/zipline/pipeline/factors/factor.py b/zipline/pipeline/factors/factor.py index d438d7fe..8ce4106f 100644 --- a/zipline/pipeline/factors/factor.py +++ b/zipline/pipeline/factors/factor.py @@ -5,20 +5,27 @@ from functools import wraps from operator import attrgetter from numbers import Number -from numpy import inf +from numpy import inf, where, nanstd from toolz import curry from zipline.errors import ( UnknownRankMethod, UnsupportedDataType, ) +from zipline.lib.normalize import naive_grouped_rowwise_apply from zipline.lib.rank import masked_rankdata_2d +from zipline.pipeline.classifiers import Classifier, Everything from zipline.pipeline.mixins import ( CustomTermMixin, PositiveWindowLengthMixin, SingleInputMixin, ) -from zipline.pipeline.term import ComputableTerm, NotSpecified, Term +from zipline.pipeline.term import ( + ComputableTerm, + NotSpecified, + NotSpecifiedType, + Term, +) from zipline.pipeline.expression import ( BadBinaryOperator, COMPARISONS, @@ -31,11 +38,13 @@ from zipline.pipeline.expression import ( unary_op_name, ) from zipline.pipeline.filters import ( + Filter, NumExprFilter, PercentileFilter, NullFilter, ) -from zipline.utils.control_flow import nullctx +from zipline.utils.input_validation import expect_types +from zipline.utils.math_utils import nanmean from zipline.utils.numpy_utils import ( bool_dtype, coerce_to_dtype, @@ -43,6 +52,7 @@ from zipline.utils.numpy_utils import ( float64_dtype, int64_dtype, ) +from zipline.utils.preprocess import preprocess _RANK_METHODS = frozenset(['average', 'min', 'max', 'dense', 'ordinal']) @@ -319,26 +329,67 @@ def function_application(func): return mathfunc -def if_not_float64_tell_caller_to_use_isnull(f): +def restrict_to_dtype(dtype, message_template): """ - Factor method decorator that checks if self.dtype if float64. + A factory for decorators that restricting Factor methods to only be + callable on Factors with a specific dtype. - If the factor instance is of another dtype, this raises a TypeError - directing the user to `isnull` or `notnull` instead. + This is conceptually similar to + zipline.utils.input_validation.expect_dtypes, but provides more flexibility + for providing error messages that are specifically targeting Factor + methods. + + Parameters + ---------- + dtype : numpy.dtype + The dtype on which the decorated method may be called. + message_template : str + A template for the error message to be raised. + `message_template.format` will be called with keyword arguments + `method_name`, `expected_dtype`, and `received_dtype`. + + Usage + ----- + @restrict_to_dtype( + dtype=float64_dtype, + message_template=( + "{method_name}() was called on a factor of dtype {received_dtype}." + "{method_name}() requires factors of dtype{expected_dtype}." + + ), + ) + def some_factor_method(self, ...): + self.stuff_that_requires_being_float64(...) """ - @wraps(f) - def wrapped_method(self): - if self.dtype != float64_dtype: + def processor(factor_method, _, factor_instance): + factor_dtype = factor_instance.dtype + if factor_dtype != dtype: raise TypeError( - "{meth}() was called on a factor of dtype {dtype}.\n" - "{meth}() is only defined for dtype float64." - "To filter missing data, use isnull() or notnull().".format( - meth=f.__name__, - dtype=self.dtype, - ), + message_template.format( + method_name=factor_method.__name__, + expected_dtype=dtype.name, + received_dtype=factor_dtype, + ) ) - return f(self) - return wrapped_method + return factor_instance + return preprocess(self=processor) + +if_not_float64_tell_caller_to_use_isnull = restrict_to_dtype( + dtype=float64_dtype, + message_template=( + "{method_name}() was called on a factor of dtype {received_dtype}.\n" + "{method_name}() is only defined for dtype {expected_dtype}." + "To filter missing data, use isnull() or notnull()." + ) +) + +float64_only = restrict_to_dtype( + dtype=float64_dtype, + message_template=( + "{method_name}() is only defined on Factors of dtype {expected_dtype}," + " but it was called on a Factor of dtype {received_dtype}." + ) +) FACTOR_DTYPES = frozenset([datetime64ns_dtype, float64_dtype, int64_dtype]) @@ -395,6 +446,190 @@ class Factor(ComputableTerm): ) return retval + @expect_types( + mask=(Filter, NotSpecifiedType), + groupby=(Classifier, NotSpecifiedType), + ) + @float64_only + def demean(self, mask=NotSpecified, groupby=NotSpecified): + """ + Construct a Factor that computes ``self`` and subtracts the mean from + row of the result. + + If ``mask`` is supplied, ignore values where ``mask`` returns False + when computing row means, and output NaN anywhere the mask is False. + + If ``groupby`` is supplied, compute by partitioning each row based on + the values produced by ``groupby``, de-meaning the partitioned arrays, + and stitching the sub-results back together. + + Parameters + ---------- + mask : zipline.pipeline.Filter, optional + A Filter defining values to ignore when computing means. + groupby : zipline.pipeline.Classifier, optional + A classifier defining partitions over which to compute means. + + Example + ------- + Let ``f`` be a Factor which would produce the following output:: + + AAPL MSFT MCD BK + 2017-03-13 1.0 2.0 3.0 4.0 + 2017-03-14 1.5 2.5 3.5 1.0 + 2017-03-15 2.0 3.0 4.0 1.5 + 2017-03-16 2.5 3.5 1.0 2.0 + + Let ``c`` be a Classifier producing the following output:: + + AAPL MSFT MCD BK + 2017-03-13 1 1 2 2 + 2017-03-14 1 1 2 2 + 2017-03-15 1 1 2 2 + 2017-03-16 1 1 2 2 + + Let ``m`` be a Filter producing the following output:: + + AAPL MSFT MCD BK + 2017-03-13 False True True True + 2017-03-14 True False True True + 2017-03-15 True True False True + 2017-03-16 True True True False + + Then ``f.demean()`` will subtract the mean from each row produced by + ``f``. + + :: + + AAPL MSFT MCD BK + 2017-03-13 -1.500 -0.500 0.500 1.500 + 2017-03-14 -0.625 0.375 1.375 -1.125 + 2017-03-15 -0.625 0.375 1.375 -1.125 + 2017-03-16 0.250 1.250 -1.250 -0.250 + + ``f.demean(mask=m)`` will subtract the mean from each row, but means + will be calculated ignoring values on the diagonal, and NaNs will + written to the diagonal in the output. Diagonal values are ignored + because they are the locations where the mask ``m`` produced False. + + :: + + AAPL MSFT MCD BK + 2017-03-13 NaN -1.000 0.000 1.000 + 2017-03-14 -0.500 NaN 1.500 -1.000 + 2017-03-15 -0.166 0.833 NaN -0.666 + 2017-03-16 0.166 1.166 -1.333 NaN + + ``f.demean(groupby=c)`` will subtract the group-mean of AAPL/MSFT and + MCD/BK from their respective entries. The AAPL/MSFT are grouped + together because both assets always produce 1 in the output of the + classifier ``c``. Similarly, MCD/BK are grouped together because they + always produce 2. + + :: + + AAPL MSFT MCD BK + 2017-03-13 -0.500 0.500 -0.500 0.500 + 2017-03-14 -0.500 0.500 1.250 -1.250 + 2017-03-15 -0.500 0.500 1.250 -1.250 + 2017-03-16 -0.500 0.500 -0.500 0.500 + + ``f.demean(mask=m, groupby=c)`` will also subtract the group-mean of + AAPL/MSFT and MCD/BK, but means will be calculated ignoring values on + the diagonal , and NaNs will be written to the diagonal in the output. + + :: + + AAPL MSFT MCD BK + 2017-03-13 NaN 0.000 -0.500 0.500 + 2017-03-14 0.000 NaN 1.250 -1.250 + 2017-03-15 -0.500 0.500 NaN 0.000 + 2017-03-16 -0.500 0.500 0.000 NaN + + Notes + ----- + Mean is sensitive to the magnitudes of outliers. When working with + factor that can potentially produce large outliers, it is often useful + to use the ``mask`` parameter to discard values at the extremes of the + distribution:: + + >>> base = MyFactor(...) + >>> normalized = base.demean(mask=base.percentile_between(1, 99)) + + ``demean()`` is only supported on Factors of dtype float64. + + See Also + -------- + :meth:`pandas.DataFrame.groupby` + """ + return GroupedRowTransform( + transform=lambda row: row - nanmean(row), + factor=self, + mask=mask, + groupby=groupby, + ) + + @expect_types( + mask=(Filter, NotSpecifiedType), + groupby=(Classifier, NotSpecifiedType), + ) + @float64_only + def zscore(self, mask=NotSpecified, groupby=NotSpecified): + """ + Construct a Factor that Z-Scores each day's results. + + The Z-Score of a row is defined as:: + + (row - row.mean()) / row.stddev() + + If ``mask`` is supplied, ignore values where ``mask`` returns False + when computing row means and standard deviations, and output NaN + anywhere the mask is False. + + If ``groupby`` is supplied, compute by partitioning each row based on + the values produced by ``groupby``, z-scoring the partitioned arrays, + and stitching the sub-results back together. + + Parameters + ---------- + mask : zipline.pipeline.Filter, optional + A Filter defining values to ignore when Z-Scoring. + groupby : zipline.pipeline.Classifier, optional + A classifier defining partitions over which to compute Z-Scores. + + Returns + ------- + zscored : zipline.pipeline.Factor + A Factor producing that z-scores the output of self. + + Notes + ----- + Mean and standard deviation are sensitive to the magnitudes of + outliers. When working with factor that can potentially produce large + outliers, it is often useful to use the ``mask`` parameter to discard + values at the extremes of the distribution:: + + >>> base = MyFactor(...) + >>> normalized = base.zscore(mask=base.percentile_between(1, 99)) + + ``zscore()`` is only supported on Factors of dtype float64. + + Example + ------- + See :meth:`~zipline.pipeline.factors.Factor.demean` for an in-depth + example of the semantics for ``mask`` and ``groupby``. + + See Also + -------- + :meth:`pandas.DataFrame.groupby` + """ + return GroupedRowTransform( + transform=lambda row: (row - nanmean(row)) / nanstd(row), + factor=self, + mask=mask, + groupby=groupby, + ) + def rank(self, method='ordinal', ascending=True, mask=NotSpecified): """ Construct a new Factor representing the sorted rank of each column @@ -431,9 +666,8 @@ class Factor(ComputableTerm): See Also -------- - scipy.stats.rankdata - zipline.lib.rank.masked_rankdata_2d - zipline.pipeline.factors.factor.Rank + :func:`scipy.stats.rankdata` + :class:`zipline.pipeline.factors.factor.Rank` """ return Rank(self, method=method, ascending=ascending, mask=mask) @@ -592,6 +826,90 @@ class NumExprFactor(NumericalExpression, Factor): pass +class GroupedRowTransform(Factor): + """ + A Factor that transforms an input factor by applying a row-wise + shape-preserving transformation on classifier-defined groups of that + Factor. + + This is most often useful for normalization operators like ``zscore`` or + ``demean``. + + Parameters + ---------- + transform : function[ndarray[ndim=1] -> ndarray[ndim=1]] + Function to apply over each row group. + factor : zipline.pipeline.Factor + The factor providing baseline data to transform. + mask : zipline.pipeline.Filter + Mask of entries to ignore when calculating transforms. + groupby : zipline.pipeline.Classifier + Classifier partitioning ``factor`` into groups to use when calculating + means. + + Notes + ----- + Users should rarely construct instances of this factor directly. Instead, + they should construct instances via factor normalization methods like + ``zscore`` and ``demean``. + + See Also + -------- + zipline.pipeline.factors.Factor.zscore + zipline.pipeline.factors.Factor.demean + """ + window_length = 0 + + def __new__(cls, transform, factor, mask, groupby): + + if mask is NotSpecified: + mask = factor.mask + else: + mask = mask & factor.mask + + if groupby is NotSpecified: + groupby = Everything(mask=mask) + + return super(GroupedRowTransform, cls).__new__( + GroupedRowTransform, + transform=transform, + inputs=(factor, groupby), + missing_value=factor.missing_value, + mask=mask, + dtype=factor.dtype, + ) + + def _init(self, transform, *args, **kwargs): + self._transform = transform + return super(GroupedRowTransform, self)._init(*args, **kwargs) + + @classmethod + def static_identity(cls, transform, *args, **kwargs): + return ( + super(GroupedRowTransform, cls).static_identity(*args, **kwargs), + transform, + ) + + def _compute(self, arrays, dates, assets, mask): + data = arrays[0] + null_group_value = self.inputs[1].missing_value + group_labels = where( + mask, + arrays[1], + null_group_value, + ) + + return where( + group_labels != null_group_value, + naive_grouped_rowwise_apply( + data=data, + group_labels=group_labels, + func=self._transform, + ), + self.missing_value, + ) + + class Rank(SingleInputMixin, Factor): """ A Factor representing the row-wise rank data of another Factor. @@ -607,8 +925,8 @@ class Rank(SingleInputMixin, Factor): See Also -------- - scipy.stats.rankdata : Underlying ranking algorithm. - zipline.factors.Factor.rank : Method-style interface to same functionality. + :func:`scipy.stats.rankdata` + :class:`Factor.rank` Notes ----- @@ -778,4 +1096,3 @@ class CustomFactor(PositiveWindowLengthMixin, CustomTermMixin, Factor): median_low15 = MedianValue([USEquityPricing.low], window_length=15) ''' dtype = float64_dtype - ctx = nullctx() diff --git a/zipline/pipeline/filters/filter.py b/zipline/pipeline/filters/filter.py index 197f60c8..5d72811f 100644 --- a/zipline/pipeline/filters/filter.py +++ b/zipline/pipeline/filters/filter.py @@ -182,7 +182,7 @@ class NullFilter(SingleInputMixin, Filter): Parameters ---------- - factor : zipline.pipeline.factor.Factor + factor : zipline.pipeline.Factor The factor to compare against its missing_value. """ window_length = 0 diff --git a/zipline/pipeline/mixins.py b/zipline/pipeline/mixins.py index 12491acd..11e5cbcf 100644 --- a/zipline/pipeline/mixins.py +++ b/zipline/pipeline/mixins.py @@ -2,6 +2,8 @@ Mixins classes for use with Filters and Factors. """ from numpy import full_like + +from zipline.utils.control_flow import nullctx from zipline.errors import WindowLengthNotPositive from .term import NotSpecified @@ -43,6 +45,8 @@ class CustomTermMixin(object): Used by CustomFactor, CustomFilter, CustomClassifier, etc. """ + ctx = nullctx() + def __new__(cls, inputs=NotSpecified, window_length=NotSpecified, diff --git a/zipline/pipeline/term.py b/zipline/pipeline/term.py index 426885fa..b424bafc 100644 --- a/zipline/pipeline/term.py +++ b/zipline/pipeline/term.py @@ -28,6 +28,8 @@ NotSpecified = sentinel( 'Singleton sentinel value used for Term defaults.', ) +NotSpecifiedType = type(NotSpecified) + class Term(with_metaclass(ABCMeta, object)): """