diff --git a/etc/requirements.txt b/etc/requirements.txt index 7125a63f..5c00af0a 100644 --- a/etc/requirements.txt +++ b/etc/requirements.txt @@ -47,7 +47,7 @@ bcolz==0.10.0 click==4.0.0 # FUNctional programming utilities -toolz==0.7.2 +toolz==0.7.4 # Asset writer and finder sqlalchemy==1.0.8 diff --git a/tests/pipeline/base.py b/tests/pipeline/base.py index 06ddae44..17556087 100644 --- a/tests/pipeline/base.py +++ b/tests/pipeline/base.py @@ -93,7 +93,7 @@ class BasePipelineTestCase(TestCase): Mapping from termname -> computed result. """ engine = SimplePipelineEngine( - ExplodingObject(), + lambda column: ExplodingObject(), self.__calendar, self.__finder, ) diff --git a/tests/pipeline/test_engine.py b/tests/pipeline/test_engine.py index db89e26a..eda1d237 100644 --- a/tests/pipeline/test_engine.py +++ b/tests/pipeline/test_engine.py @@ -2,6 +2,7 @@ Tests for SimplePipelineEngine """ from __future__ import division +from collections import OrderedDict from unittest import TestCase from itertools import product @@ -11,6 +12,8 @@ from numpy import ( nan, tile, zeros, + float32, + concatenate, ) from pandas import ( DataFrame, @@ -21,19 +24,20 @@ from pandas import ( Series, Timestamp, ) +from pandas.compat.chainmap import ChainMap from pandas.util.testing import assert_frame_equal +from six import iteritems, itervalues from testfixtures import TempDirectory from zipline.pipeline.loaders.synthetic import ( ConstantLoader, - MultiColumnLoader, NullAdjustmentReader, SyntheticDailyBarWriter, ) from zipline.data.us_equity_pricing import BcolzDailyBarReader from zipline.finance.trading import TradingEnvironment from zipline.pipeline import Pipeline -from zipline.pipeline.data import USEquityPricing +from zipline.pipeline.data import USEquityPricing, DataSet, Column from zipline.pipeline.loaders.frame import DataFrameLoader, MULTIPLY from zipline.pipeline.loaders.equity_pricing_loader import ( USEquityPricingLoader, @@ -85,6 +89,49 @@ def assert_multi_index_is_product(testcase, index, *levels): testcase.assertEqual(set(index), set(product(*levels))) +class ColumnArgs(tuple): + """A tuple of Columns that defines equivalence based on the order of the + columns' DataSets, instead of the columns themselves. This is used when + comparing the columns passed to a loader's load_adjusted_array method, + since we want to assert that they are ordered by DataSet. + """ + def __new__(cls, *cols): + return super(ColumnArgs, cls).__new__(cls, cols) + + @classmethod + def sorted_by_ds(cls, *cols): + return cls(*sorted(cols, key=lambda col: col.dataset)) + + def by_ds(self): + return tuple(col.dataset for col in self) + + def __eq__(self, other): + return set(self) == set(other) and self.by_ds() == other.by_ds() + + def __hash__(self): + return hash(frozenset(self)) + + +class RecordingConstantLoader(ConstantLoader): + def __init__(self, *args, **kwargs): + super(RecordingConstantLoader, self).__init__(*args, **kwargs) + + self.load_calls = [] + + def load_adjusted_array(self, columns, dates, assets, mask): + self.load_calls.append(ColumnArgs(*columns)) + + return super(RecordingConstantLoader, self).load_adjusted_array( + columns, dates, assets, mask, + ) + + +class RollingSumSum(CustomFactor): + def compute(self, today, assets, out, *inputs): + assert len(self.inputs) == len(inputs) + out[:] = sum(inputs).sum(axis=0) + + class ConstantInputTestCase(TestCase): def setUp(self): @@ -97,7 +144,7 @@ class ConstantInputTestCase(TestCase): USEquityPricing.high: 4, } self.assets = [1, 2, 3] - self.dates = date_range('2014-01-01', '2014-02-01', freq='D', tz='UTC') + self.dates = date_range('2014-01', '2014-03', freq='D', tz='UTC') self.loader = ConstantLoader( constants=self.constants, dates=self.dates, @@ -115,7 +162,9 @@ class ConstantInputTestCase(TestCase): def test_bad_dates(self): loader = self.loader - engine = SimplePipelineEngine(loader, self.dates, self.asset_finder) + engine = SimplePipelineEngine( + lambda column: loader, self.dates, self.asset_finder, + ) p = Pipeline() @@ -129,7 +178,9 @@ class ConstantInputTestCase(TestCase): loader = self.loader finder = self.asset_finder assets = array(self.assets) - engine = SimplePipelineEngine(loader, self.dates, self.asset_finder) + engine = SimplePipelineEngine( + lambda column: loader, self.dates, self.asset_finder, + ) num_dates = 5 dates = self.dates[10:10 + num_dates] @@ -152,7 +203,9 @@ class ConstantInputTestCase(TestCase): loader = self.loader finder = self.asset_finder assets = self.assets - engine = SimplePipelineEngine(loader, self.dates, self.asset_finder) + engine = SimplePipelineEngine( + lambda column: loader, self.dates, self.asset_finder, + ) result_shape = (num_dates, num_assets) = (5, len(assets)) dates = self.dates[10:10 + num_dates] @@ -185,7 +238,9 @@ class ConstantInputTestCase(TestCase): loader = self.loader finder = self.asset_finder assets = self.assets - engine = SimplePipelineEngine(loader, self.dates, self.asset_finder) + engine = SimplePipelineEngine( + lambda column: loader, self.dates, self.asset_finder, + ) shape = num_dates, num_assets = (5, len(assets)) dates = self.dates[10:10 + num_dates] @@ -228,7 +283,9 @@ class ConstantInputTestCase(TestCase): def test_numeric_factor(self): constants = self.constants loader = self.loader - engine = SimplePipelineEngine(loader, self.dates, self.asset_finder) + engine = SimplePipelineEngine( + lambda column: loader, self.dates, self.asset_finder, + ) num_dates = 5 dates = self.dates[10:10 + num_dates] high, low = USEquityPricing.high, USEquityPricing.low @@ -271,6 +328,147 @@ class ConstantInputTestCase(TestCase): DataFrame(expected_avg, index=dates, columns=self.assets), ) + def test_rolling_and_nonrolling(self): + open_ = USEquityPricing.open + close = USEquityPricing.close + volume = USEquityPricing.volume + + # Test for thirty days up to the last day that we think all + # the assets existed. + dates_to_test = self.dates[-30:] + + constants = {open_: 1, close: 2, volume: 3} + loader = ConstantLoader( + constants=constants, + dates=self.dates, + assets=self.assets, + ) + engine = SimplePipelineEngine( + lambda column: loader, self.dates, self.asset_finder, + ) + + sumdiff = RollingSumDifference() + + result = engine.run_pipeline( + Pipeline( + columns={ + 'sumdiff': sumdiff, + 'open': open_.latest, + 'close': close.latest, + 'volume': volume.latest, + }, + ), + dates_to_test[0], + dates_to_test[-1] + ) + self.assertIsNotNone(result) + self.assertEqual( + {'sumdiff', 'open', 'close', 'volume'}, + set(result.columns) + ) + + result_index = self.assets * len(dates_to_test) + result_shape = (len(result_index),) + check_arrays( + result['sumdiff'], + Series(index=result_index, data=full(result_shape, -3)), + ) + + for name, const in [('open', 1), ('close', 2), ('volume', 3)]: + check_arrays( + result[name], + Series(index=result_index, data=full(result_shape, const)), + ) + + def test_loader_given_multiple_columns(self): + + class Loader1DataSet1(DataSet): + col1 = Column(float32) + col2 = Column(float32) + + class Loader1DataSet2(DataSet): + col1 = Column(float32) + col2 = Column(float32) + + class Loader2DataSet(DataSet): + col1 = Column(float32) + col2 = Column(float32) + + constants1 = {Loader1DataSet1.col1: 1, + Loader1DataSet1.col2: 2, + Loader1DataSet2.col1: 3, + Loader1DataSet2.col2: 4} + loader1 = RecordingConstantLoader(constants=constants1, + dates=self.dates, + assets=self.assets) + constants2 = {Loader2DataSet.col1: 5, + Loader2DataSet.col2: 6} + loader2 = RecordingConstantLoader(constants=constants2, + dates=self.dates, + assets=self.assets) + + engine = SimplePipelineEngine( + lambda column: + loader2 if column.dataset == Loader2DataSet else loader1, + self.dates, self.asset_finder, + ) + + pipe_col1 = RollingSumSum(inputs=[Loader1DataSet1.col1, + Loader1DataSet2.col1, + Loader2DataSet.col1], + window_length=2) + + pipe_col2 = RollingSumSum(inputs=[Loader1DataSet1.col2, + Loader1DataSet2.col2, + Loader2DataSet.col2], + window_length=3) + + pipe_col3 = RollingSumSum(inputs=[Loader2DataSet.col1], + window_length=3) + + columns = OrderedDict([ + ('pipe_col1', pipe_col1), + ('pipe_col2', pipe_col2), + ('pipe_col3', pipe_col3), + ]) + result = engine.run_pipeline( + Pipeline(columns=columns), + self.dates[2], # index is >= the largest window length - 1 + self.dates[-1] + ) + min_window = min(pip_col.window_length + for pip_col in itervalues(columns)) + col_to_val = ChainMap(constants1, constants2) + vals = {name: (sum(col_to_val[col] for col in pipe_col.inputs) + * pipe_col.window_length) + for name, pipe_col in iteritems(columns)} + + index = MultiIndex.from_product([self.dates[2:], self.assets]) + expected = DataFrame( + data={col: + concatenate(( + full((columns[col].window_length - min_window) + * index.levshape[1], + nan), + full((index.levshape[0] + - (columns[col].window_length - min_window)) + * index.levshape[1], + val))) + for col, val in iteritems(vals)}, + index=index, + columns=columns) + + assert_frame_equal(result, expected) + + self.assertEqual(set(loader1.load_calls), + {ColumnArgs.sorted_by_ds(Loader1DataSet1.col1, + Loader1DataSet2.col1), + ColumnArgs.sorted_by_ds(Loader1DataSet1.col2, + Loader1DataSet2.col2)}) + self.assertEqual(set(loader2.load_calls), + {ColumnArgs.sorted_by_ds(Loader2DataSet.col1, + Loader2DataSet.col2)}) + class FrameInputTestCase(TestCase): @@ -353,9 +551,12 @@ class FrameInputTestCase(TestCase): high_base.iloc[:apply_idxs[2], 1] /= 5.0 high_loader = DataFrameLoader(high, high_base, adjustments) - loader = MultiColumnLoader({low: low_loader, high: high_loader}) - engine = SimplePipelineEngine(loader, self.dates, self.asset_finder) + engine = SimplePipelineEngine( + {low: low_loader, high: high_loader}.__getitem__, + self.dates, + self.asset_finder, + ) for window_length in range(1, 4): low_mavg = SimpleMovingAverage( @@ -465,7 +666,7 @@ class SyntheticBcolzTestCase(TestCase): def test_SMA(self): engine = SimplePipelineEngine( - self.pipeline_loader, + lambda column: self.pipeline_loader, self.env.trading_days, self.finder, ) @@ -517,7 +718,7 @@ class SyntheticBcolzTestCase(TestCase): # or zero, but verifying we correctly handle those corner cases is # valuable. engine = SimplePipelineEngine( - self.pipeline_loader, + lambda column: self.pipeline_loader, self.env.trading_days, self.finder, ) @@ -552,69 +753,3 @@ class SyntheticBcolzTestCase(TestCase): result = results['drawdown'].unstack() assert_frame_equal(expected, result) - - -class MultiColumnLoaderTestCase(TestCase): - def setUp(self): - self.assets = [1, 2, 3] - self.dates = date_range('2014-01', '2014-03', freq='D', tz='UTC') - - asset_info = make_simple_asset_info( - self.assets, - start_date=self.dates[0], - end_date=self.dates[-1], - ) - env = TradingEnvironment() - env.write_data(equities_df=asset_info) - self.asset_finder = env.asset_finder - - def test_engine_with_multicolumn_loader(self): - open_ = USEquityPricing.open - close = USEquityPricing.close - volume = USEquityPricing.volume - - # Test for thirty days up to the second to last day that we think all - # the assets existed. If we test the last day of our calendar, no - # assets will be in our output, because their end dates are all - dates_to_test = self.dates[-32:-2] - - constants = {open_: 1, close: 2, volume: 3} - loader = ConstantLoader( - constants=constants, - dates=self.dates, - assets=self.assets, - ) - engine = SimplePipelineEngine(loader, self.dates, self.asset_finder) - - sumdiff = RollingSumDifference() - - result = engine.run_pipeline( - Pipeline( - columns={ - 'sumdiff': sumdiff, - 'open': open_.latest, - 'close': close.latest, - 'volume': volume.latest, - }, - ), - dates_to_test[0], - dates_to_test[-1] - ) - self.assertIsNotNone(result) - self.assertEqual( - {'sumdiff', 'open', 'close', 'volume'}, - set(result.columns) - ) - - result_index = self.assets * len(dates_to_test) - result_shape = (len(result_index),) - check_arrays( - result['sumdiff'], - Series(index=result_index, data=full(result_shape, -3)), - ) - - for name, const in [('open', 1), ('close', 2), ('volume', 3)]: - check_arrays( - result[name], - Series(index=result_index, data=full(result_shape, const)), - ) diff --git a/tests/pipeline/test_pipeline_algo.py b/tests/pipeline/test_pipeline_algo.py index 40fef5f5..f947d25b 100644 --- a/tests/pipeline/test_pipeline_algo.py +++ b/tests/pipeline/test_pipeline_algo.py @@ -182,7 +182,7 @@ class ClosesOnly(TestCase): initialize=initialize, handle_data=late_attach, data_frequency='daily', - pipeline_loader=self.pipeline_loader, + get_pipeline_loader=lambda column: self.pipeline_loader, start=self.first_asset_start - trading_day, end=self.last_asset_end + trading_day, env=self.env, @@ -199,7 +199,7 @@ class ClosesOnly(TestCase): before_trading_start=late_attach, handle_data=barf, data_frequency='daily', - pipeline_loader=self.pipeline_loader, + get_pipeline_loader=lambda column: self.pipeline_loader, start=self.first_asset_start - trading_day, end=self.last_asset_end + trading_day, env=self.env, @@ -228,7 +228,7 @@ class ClosesOnly(TestCase): handle_data=handle_data, before_trading_start=before_trading_start, data_frequency='daily', - pipeline_loader=self.pipeline_loader, + get_pipeline_loader=lambda column: self.pipeline_loader, start=self.first_asset_start - trading_day, end=self.last_asset_end + trading_day, env=self.env, @@ -256,7 +256,7 @@ class ClosesOnly(TestCase): handle_data=handle_data, before_trading_start=before_trading_start, data_frequency='daily', - pipeline_loader=self.pipeline_loader, + get_pipeline_loader=lambda column: self.pipeline_loader, start=self.first_asset_start - trading_day, end=self.last_asset_end + trading_day, env=self.env, @@ -294,7 +294,7 @@ class ClosesOnly(TestCase): handle_data=handle_data, before_trading_start=before_trading_start, data_frequency='daily', - pipeline_loader=self.pipeline_loader, + get_pipeline_loader=lambda column: self.pipeline_loader, start=self.first_asset_start - trading_day, end=self.last_asset_end + trading_day, env=self.env, @@ -524,7 +524,7 @@ class PipelineAlgorithmTestCase(TestCase): handle_data=handle_data, before_trading_start=before_trading_start, data_frequency='daily', - pipeline_loader=self.pipeline_loader, + get_pipeline_loader=lambda column: self.pipeline_loader, start=self.dates[max(window_lengths)], end=self.dates[-1], env=self.env, diff --git a/tests/pipeline/test_term.py b/tests/pipeline/test_term.py index 9fc9fac8..ffa2df7b 100644 --- a/tests/pipeline/test_term.py +++ b/tests/pipeline/test_term.py @@ -82,11 +82,14 @@ def to_dict(l): class DependencyResolutionTestCase(TestCase): - def setup(self): - pass + def check_dependency_order(self, ordered_terms): + seen = set() - def teardown(self): - pass + for term in ordered_terms: + for dep in term.dependencies: + self.assertIn(dep, seen) + + seen.add(term) def test_single_factor(self): """ @@ -97,12 +100,12 @@ class DependencyResolutionTestCase(TestCase): resolution_order = list(graph.ordered()) self.assertEqual(len(resolution_order), 4) - self.assertIs(resolution_order[0], AssetExists()) - self.assertEqual( - set([resolution_order[1], resolution_order[2]]), - set([SomeDataSet.foo, SomeDataSet.bar]), - ) - self.assertEqual(resolution_order[-1], SomeFactor()) + self.check_dependency_order(resolution_order) + self.assertIn(AssetExists(), resolution_order) + self.assertIn(SomeDataSet.foo, resolution_order) + self.assertIn(SomeDataSet.bar, resolution_order) + self.assertIn(SomeFactor(), resolution_order) + self.assertEqual(graph.node[SomeDataSet.foo]['extra_rows'], 4) self.assertEqual(graph.node[SomeDataSet.bar]['extra_rows'], 4) @@ -121,18 +124,14 @@ class DependencyResolutionTestCase(TestCase): # SomeFactor, its inputs, and AssetExists() self.assertEqual(len(resolution_order), 4) - - self.assertIs(resolution_order[0], AssetExists()) + self.check_dependency_order(resolution_order) + self.assertIn(AssetExists(), resolution_order) self.assertEqual(graph.extra_rows[AssetExists()], 4) - self.assertEqual( - set([resolution_order[1], resolution_order[2]]), - set([bar, buzz]), - ) - self.assertEqual( - resolution_order[-1], - SomeFactor([bar, buzz], window_length=5), - ) + self.assertIn(bar, resolution_order) + self.assertIn(buzz, resolution_order) + self.assertIn(SomeFactor([bar, buzz], window_length=5), + resolution_order) self.assertEqual(graph.extra_rows[bar], 4) self.assertEqual(graph.extra_rows[buzz], 4) @@ -148,20 +147,8 @@ class DependencyResolutionTestCase(TestCase): # bar should only appear once. self.assertEqual(len(resolution_order), 6) - indices = { - term: resolution_order.index(term) - for term in resolution_order - } - - self.assertEqual(indices[AssetExists()], 0) - - # Verify that f1's dependencies will be computed before f1. - self.assertLess(indices[SomeDataSet.foo], indices[f1]) - self.assertLess(indices[SomeDataSet.bar], indices[f1]) - - # Verify that f2's dependencies will be computed before f2. - self.assertLess(indices[SomeDataSet.bar], indices[f2]) - self.assertLess(indices[SomeDataSet.buzz], indices[f2]) + self.assertEqual(len(set(resolution_order)), 6) + self.check_dependency_order(resolution_order) def test_disallow_recursive_lookback(self): diff --git a/zipline/algorithm.py b/zipline/algorithm.py index c6e967a3..9c43d4df 100644 --- a/zipline/algorithm.py +++ b/zipline/algorithm.py @@ -232,7 +232,7 @@ class TradingAlgorithm(object): self.asset_finder = self.trading_environment.asset_finder # Initialize Pipeline API data. - self.init_engine(kwargs.pop('pipeline_loader', None)) + self.init_engine(kwargs.pop('get_pipeline_loader', None)) self._pipelines = {} # Create an always-expired cache so that we compute the first time data # is requested. @@ -323,15 +323,15 @@ class TradingAlgorithm(object): self.initialize_args = args self.initialize_kwargs = kwargs - def init_engine(self, loader): + def init_engine(self, get_loader): """ Construct and store a PipelineEngine from loader. - If loader is None, constructs a NoOpPipelineEngine. + If get_loader is None, constructs a NoOpPipelineEngine. """ - if loader is not None: + if get_loader is not None: self.engine = SimplePipelineEngine( - loader, + get_loader, self.trading_environment.trading_days, self.asset_finder, ) diff --git a/zipline/pipeline/classifier.py b/zipline/pipeline/classifier.py index aa1263ad..16219d58 100644 --- a/zipline/pipeline/classifier.py +++ b/zipline/pipeline/classifier.py @@ -2,8 +2,8 @@ classifier.py """ -from zipline.pipeline.term import Term +from zipline.pipeline.term import CompositeTerm -class Classifier(Term): +class Classifier(CompositeTerm): pass diff --git a/zipline/pipeline/data/dataset.py b/zipline/pipeline/data/dataset.py index 916c352d..fe8be41a 100644 --- a/zipline/pipeline/data/dataset.py +++ b/zipline/pipeline/data/dataset.py @@ -1,12 +1,13 @@ """ dataset.py """ +from functools import total_ordering from six import ( iteritems, with_metaclass, ) -from zipline.pipeline.term import Term +from zipline.pipeline.term import Term, AssetExists from zipline.pipeline.factors import Latest @@ -29,12 +30,13 @@ class BoundColumn(Term): """ A Column of data that's been concretely bound to a particular dataset. """ + mask = AssetExists() + extra_input_rows = 0 + inputs = () def __new__(cls, dtype, dataset, name): return super(BoundColumn, cls).__new__( cls, - inputs=(), - window_length=0, domain=dataset.domain, dtype=dtype, dataset=dataset, @@ -86,6 +88,7 @@ class BoundColumn(Term): return self.qualname +@total_ordering class DataSetMeta(type): """ Metaclass for DataSets @@ -109,6 +112,9 @@ class DataSetMeta(type): def columns(self): return self._columns + def __lt__(self, other): + return id(self) < id(other) + class DataSet(with_metaclass(DataSetMeta)): domain = None diff --git a/zipline/pipeline/engine.py b/zipline/pipeline/engine.py index d11746f0..2e4f2e0f 100644 --- a/zipline/pipeline/engine.py +++ b/zipline/pipeline/engine.py @@ -13,12 +13,13 @@ from six import ( ) from six.moves import zip_longest from numpy import array - from pandas import ( DataFrame, date_range, MultiIndex, ) +from toolz import groupby, juxt +from toolz.curried.operator import getitem from zipline.lib.adjusted_array import ensure_ndarray from zipline.errors import NoFurtherDataError @@ -82,8 +83,9 @@ class SimplePipelineEngine(object): Parameters ---------- - loader : PipelineLoader - A loader to use to retrieve raw data for atomic terms. + get_loader : callable + A function that is given an atomic term and returns a PipelineLoader + to use to retrieve raw data for that term. calendar : DatetimeIndex Array of dates to consider as trading days when computing a range between a fixed start and end. @@ -92,15 +94,15 @@ class SimplePipelineEngine(object): which assets are in the top-level universe at any point in time. """ __slots__ = [ - '_loader', + '_get_loader', '_calendar', '_finder', '_root_mask_term', '__weakref__', ] - def __init__(self, loader, calendar, asset_finder): - self._loader = loader + def __init__(self, get_loader, calendar, asset_finder): + self._get_loader = get_loader self._calendar = calendar self._finder = asset_finder self._root_mask_term = AssetExists() @@ -240,7 +242,8 @@ class SimplePipelineEngine(object): offset = graph.extra_rows[mask] - graph.extra_rows[term] return workspace[mask][offset:], dates[offset:] - def _inputs_for_term(self, term, workspace, graph): + @staticmethod + def _inputs_for_term(term, workspace, graph): """ Compute inputs for the given term. @@ -273,6 +276,15 @@ class SimplePipelineEngine(object): out.append(input_data) return out + def get_loader(self, term): + # AssetExists is one of the atomic terms in the graph, so we look up + # a loader here when grouping by loader, but since it's already in the + # workspace, we don't actually use that group. + if term is AssetExists(): + return None + + return self._get_loader(term) + def compute_chunk(self, graph, dates, assets, initial_workspace): """ Compute the Pipeline terms in the graph for the requested start and end @@ -297,11 +309,16 @@ class SimplePipelineEngine(object): Dictionary mapping requested results to outputs. """ self._validate_compute_chunk_params(dates, assets, initial_workspace) - loader = self._loader + get_loader = self.get_loader # Copy the supplied initial workspace so we don't mutate it in place. workspace = initial_workspace.copy() + # If atomic terms share the same loader and extra_rows, load them all + # together. + atomic_group_key = juxt(get_loader, getitem(graph.extra_rows)) + atomic_groups = groupby(atomic_group_key, graph.atomic_terms) + for term in graph.ordered(): # `term` may have been supplied in `initial_workspace`, and in the # future we may pre-compute atomic terms coming from the same @@ -315,10 +332,13 @@ class SimplePipelineEngine(object): mask, mask_dates = self._mask_and_dates_for_term( term, workspace, graph, dates ) + if term.atomic: - # FUTURE OPTIMIZATION: Scan the resolution order for terms in - # the same dataset and load them here as well. - to_load = [term] + to_load = sorted( + atomic_groups[atomic_group_key(term)], + key=lambda t: t.dataset + ) + loader = get_loader(term) loaded = loader.load_adjusted_array( to_load, mask_dates, assets, mask, ) diff --git a/zipline/pipeline/expression.py b/zipline/pipeline/expression.py index 8ec5200e..b4d100b0 100644 --- a/zipline/pipeline/expression.py +++ b/zipline/pipeline/expression.py @@ -12,7 +12,7 @@ from numpy import ( find_common_type, ) -from zipline.pipeline.term import Term, NotSpecified +from zipline.pipeline.term import Term, NotSpecified, CompositeTerm _VARIABLE_NAME_RE = re.compile("^(x_)([0-9]+)$") @@ -154,7 +154,7 @@ def is_comparison(op): return op in COMPARISONS -class NumericalExpression(Term): +class NumericalExpression(CompositeTerm): """ Term binding to a numexpr expression. diff --git a/zipline/pipeline/factors/factor.py b/zipline/pipeline/factors/factor.py index bc05fcb2..4c22d1ef 100644 --- a/zipline/pipeline/factors/factor.py +++ b/zipline/pipeline/factors/factor.py @@ -21,7 +21,7 @@ from zipline.pipeline.term import ( NotSpecified, RequiredWindowLengthMixin, SingleInputMixin, - Term, + CompositeTerm, ) from zipline.pipeline.expression import ( BadBinaryOperator, @@ -184,7 +184,7 @@ def function_application(func): return mathfunc -class Factor(Term): +class Factor(CompositeTerm): """ Pipeline API expression producing numerically-valued outputs. """ @@ -450,7 +450,7 @@ class CustomFactor(RequiredWindowLengthMixin, CustomTermMixin, Factor): class-level attribute named `inputs`. window_length : int, optional Number of rows of rows to pass for each input. If this - argument is passed to the CustomFactor constructor, we look for a + argument is not passed to the CustomFactor constructor, we look for a class-level attribute named `window_length`. Notes diff --git a/zipline/pipeline/filters/filter.py b/zipline/pipeline/filters/filter.py index 43575750..9869e910 100644 --- a/zipline/pipeline/filters/filter.py +++ b/zipline/pipeline/filters/filter.py @@ -15,7 +15,7 @@ from zipline.errors import ( ) from zipline.pipeline.term import ( SingleInputMixin, - Term, + CompositeTerm, ) from zipline.pipeline.expression import ( BadBinaryOperator, @@ -83,7 +83,7 @@ def binary_operator(op): return binary_operator -class Filter(Term): +class Filter(CompositeTerm): """ Pipeline API expression producing boolean-valued outputs. """ diff --git a/zipline/pipeline/graph.py b/zipline/pipeline/graph.py index 428b5424..af4a8326 100644 --- a/zipline/pipeline/graph.py +++ b/zipline/pipeline/graph.py @@ -102,15 +102,9 @@ class TermGraph(DiGraph): zipline.pipeline.engine.SimplePipelineEngine._inputs_for_term zipline.pipeline.engine.SimplePipelineEngine._mask_and_dates_for_term """ - out = {} - for term in self: - extra_input_rows = term.extra_input_rows - for input_ in term.inputs: - out[term, input_] = self.extra_rows[input_] - extra_input_rows - mask = term.mask - if term.mask is not None: - out[term, mask] = self.extra_rows[mask] - extra_input_rows - return out + return {(term, dep): self.extra_rows[dep] - term.extra_input_rows + for term in self + for dep in term.dependencies} @lazyval def extra_rows(self): @@ -168,6 +162,10 @@ class TermGraph(DiGraph): """ return iter(self._ordered) + @lazyval + def atomic_terms(self): + return tuple(term for term in self if term.atomic) + def _add_to_graph(self, term, parents, extra_rows): """ Add `term` and all its inputs to the graph. @@ -191,7 +189,7 @@ class TermGraph(DiGraph): dependency_extra_rows = extra_rows + term.extra_input_rows # Recursively add dependencies. - for dependency in term.inputs: + for dependency in term.dependencies: self._add_to_graph( dependency, parents, @@ -199,12 +197,6 @@ class TermGraph(DiGraph): ) self.add_edge(dependency, term) - # Add term's mask, which is really just a specially-enumerated input. - mask = term.mask - if mask is not None: - self._add_to_graph(mask, parents, extra_rows=dependency_extra_rows) - self.add_edge(mask, term) - parents.remove(term) def _ensure_extra_rows(self, term, N): diff --git a/zipline/pipeline/loaders/base.py b/zipline/pipeline/loaders/base.py index 5e8b84b7..0f5dcedb 100644 --- a/zipline/pipeline/loaders/base.py +++ b/zipline/pipeline/loaders/base.py @@ -17,5 +17,5 @@ class PipelineLoader(with_metaclass(ABCMeta)): TODO: DOCUMENT THIS MORE! """ @abstractmethod - def load_adjusted_array(self, columns, mask): + def load_adjusted_array(self, columns, dates, assets, mask): pass diff --git a/zipline/pipeline/loaders/synthetic.py b/zipline/pipeline/loaders/synthetic.py index 2aff90de..92c2a6e4 100644 --- a/zipline/pipeline/loaders/synthetic.py +++ b/zipline/pipeline/loaders/synthetic.py @@ -32,33 +32,7 @@ def nanos_to_seconds(nanos): return nanos / (1000 * 1000 * 1000) -class MultiColumnLoader(PipelineLoader): - """ - PipelineLoader that can delegate to sub-loaders. - - Parameters - ---------- - loaders : dict - Dictionary mapping columns -> loader - """ - def __init__(self, loaders): - self._loaders = loaders - - def load_adjusted_array(self, columns, dates, assets, mask): - """ - Load by delegating to sub-loaders. - """ - out = [] - for col in columns: - try: - loader = self._loaders[col] - except KeyError: - raise ValueError("Couldn't find loader for %s" % col) - out.extend(loader.load_adjusted_array([col], dates, assets, mask)) - return out - - -class ConstantLoader(MultiColumnLoader): +class ConstantLoader(PipelineLoader): """ Synthetic PipelineLoader that returns a constant value for each column. @@ -91,7 +65,20 @@ class ConstantLoader(MultiColumnLoader): adjustments=None, ) - super(ConstantLoader, self).__init__(loaders) + self._loaders = loaders + + def load_adjusted_array(self, columns, dates, assets, mask): + """ + Load by delegating to sub-loaders. + """ + out = [] + for col in columns: + try: + loader = self._loaders[col] + except KeyError: + raise ValueError("Couldn't find loader for %s" % col) + out.extend(loader.load_adjusted_array([col], dates, assets, mask)) + return out class SyntheticDailyBarWriter(BcolzDailyBarWriter): diff --git a/zipline/pipeline/term.py b/zipline/pipeline/term.py index 56855253..31301053 100644 --- a/zipline/pipeline/term.py +++ b/zipline/pipeline/term.py @@ -1,9 +1,11 @@ """ Base class for Filters, Factors and Classifiers """ +from abc import ABCMeta, abstractproperty from weakref import WeakValueDictionary from numpy import bool_, full, nan +from six import with_metaclass from zipline.errors import ( DTypeNotSpecified, @@ -38,23 +40,17 @@ class NotSpecified(object): return self -class Term(object): +class Term(with_metaclass(ABCMeta, object)): """ Base class for terms in a Pipeline API compute graph. """ # These are NotSpecified because a subclass is required to provide them. - inputs = NotSpecified - window_length = NotSpecified dtype = NotSpecified - mask = NotSpecified domain = NotSpecified _term_cache = WeakValueDictionary() def __new__(cls, - inputs=NotSpecified, - mask=NotSpecified, - window_length=NotSpecified, domain=NotSpecified, dtype=NotSpecified, *args, @@ -72,23 +68,6 @@ class Term(object): # Class-level attributes can be used to provide defaults for Term # subclasses. - if inputs is NotSpecified: - inputs = cls.inputs - # Having inputs = NotSpecified is an error, but we handle it later - # in self._validate rather than here. - if inputs is not NotSpecified: - # Allow users to specify lists as class-level defaults, but - # normalize to a tuple so that inputs is hashable. - inputs = tuple(inputs) - - if mask is NotSpecified: - mask = cls.mask - if mask is NotSpecified: - mask = AssetExists() - - if window_length is NotSpecified: - window_length = cls.window_length - if domain is NotSpecified: domain = cls.domain @@ -96,9 +75,6 @@ class Term(object): dtype = cls.dtype identity = cls.static_identity( - inputs=inputs, - mask=mask, - window_length=window_length, domain=domain, dtype=dtype, *args, **kwargs @@ -109,9 +85,6 @@ class Term(object): except KeyError: new_instance = cls._term_cache[identity] = \ super(Term, cls).__new__(cls)._init( - inputs=inputs, - mask=mask, - window_length=window_length, domain=domain, dtype=dtype, *args, **kwargs @@ -134,10 +107,7 @@ class Term(object): """ pass - def _init(self, inputs, mask, window_length, domain, dtype): - self.inputs = inputs - self.mask = mask - self.window_length = window_length + def _init(self, domain, dtype): self.domain = domain self.dtype = dtype @@ -145,7 +115,7 @@ class Term(object): return self @classmethod - def static_identity(cls, inputs, mask, window_length, domain, dtype): + def static_identity(cls, domain, dtype): """ Return the identity of the Term that would be constructed from the given arguments. @@ -157,79 +127,64 @@ class Term(object): This is a classmethod so that it can be called from Term.__new__ to determine whether to produce a new instance. """ - return (cls, inputs, mask, window_length, domain, dtype) + return (cls, domain, dtype) def _validate(self): """ Assert that this term is well-formed. This should be called exactly once, at the end of Term._init(). """ - if self.inputs is NotSpecified: - raise TermInputsNotSpecified(termname=type(self).__name__) - if self.window_length is NotSpecified: - raise WindowLengthNotSpecified(termname=type(self).__name__) if self.dtype is NotSpecified: raise DTypeNotSpecified(termname=type(self).__name__) - if self.mask is NotSpecified and not self.atomic: - # This isn't user error, this is a bug in our code. - raise AssertionError("{term} has no mask".format(term=self)) - if self.window_length: - for child in self.inputs: - if not child.atomic: - raise InputTermNotAtomic(parent=self, child=child) - - @lazyval - def atomic(self): + @abstractproperty + def inputs(self): """ - Whether or not this term has dependencies. - - If term.atomic is truthy, it should have dataset and dtype attributes. - """ - return len(self.inputs) == 0 - - @lazyval - def windowed(self): - """ - Whether or not this term represents a trailing window computation. - - If term.windowed is truthy, its compute_from_windows method will be - called with instances of AdjustedArray as inputs. - - If term.windowed is falsey, its compute_from_baseline will be called - with instances of np.ndarray as inputs. - """ - return ( - self.window_length is not NotSpecified - and self.window_length > 0 - ) - - @lazyval - def extra_input_rows(self): - """ - The number of extra rows needed for each of our inputs to compute this - term. - """ - return max(0, self.window_length - 1) - - def _compute(self, inputs, dates, assets, mask): - """ - Subclasses should implement this to perform actual computation. - - This is `_compute` rather than just `compute` because `compute` is - reserved for user-supplied functions in CustomFactor. + A tuple of other Terms that this Term requires for computation. """ raise NotImplementedError() + @abstractproperty + def mask(self): + """ + A 2D Filter representing asset/date pairs to include while + computing this Term. (True means include; False means exclude.) + """ + raise NotImplementedError() + + @lazyval + def dependencies(self): + return self.inputs + (self.mask,) + + @lazyval + def atomic(self): + return not any(dep for dep in self.dependencies + if dep is not AssetExists()) + + +class AssetExists(Term): + """ + Pseudo-filter describing whether or not an asset existed on a given day. + This is the default mask for all terms that haven't been passed a mask + explicitly. + + This is morally a Filter, in the sense that it produces a boolean value for + every asset on every date. We don't subclass Filter, however, because + `AssetExists` is computed directly by the PipelineEngine. + + See Also + -------- + zipline.assets.AssetFinder.lifetimes + """ + dtype = bool_ + dataset = None + extra_input_rows = 0 + inputs = () + dependencies = () + mask = None + def __repr__(self): - return ( - "{type}({inputs}, window_length={window_length})" - ).format( - type=type(self).__name__, - inputs=self.inputs, - window_length=self.window_length, - mask=self.mask, - ) + return "AssetExists()" # TODO: Move mixins to a separate file? @@ -307,28 +262,107 @@ class CustomTermMixin(object): return out -class AssetExists(Term): - """ - Pseudo-filter describing whether or not an asset existed on a given day. - This is the default mask for all terms that haven't been passed a mask - explicitly. +class CompositeTerm(Term): + inputs = NotSpecified + window_length = NotSpecified + mask = NotSpecified - This is morally a Filter, in the sense that it produces a boolean value for - every asset on every date. We don't subclass Filter, however, because - `AssetExists` is computed directly by the PipelineEngine. + def __new__(cls, inputs=NotSpecified, window_length=NotSpecified, + mask=NotSpecified, *args, **kwargs): - See Also - -------- - zipline.assets.AssetFinder.lifetimes - """ - inputs = () - dtype = bool_ - window_length = 0 - mask = None + if inputs is NotSpecified: + inputs = cls.inputs + # Having inputs = NotSpecified is an error, but we handle it later + # in self._validate rather than here. + if inputs is not NotSpecified: + # Allow users to specify lists as class-level defaults, but + # normalize to a tuple so that inputs is hashable. + inputs = tuple(inputs) - def _compute(self, *args, **kwargs): - # TODO: Consider moving the bulk of the logic from - # SimplePipelineEngine._compute_root_mask here. - raise NotImplementedError( - "Direct computation of AssetExists is not supported!" + if mask is NotSpecified: + mask = cls.mask + if mask is NotSpecified: + mask = AssetExists() + + if window_length is NotSpecified: + window_length = cls.window_length + + return super(CompositeTerm, cls).__new__(cls, inputs=inputs, mask=mask, + window_length=window_length, + *args, **kwargs) + + def _init(self, inputs, window_length, mask, *args, **kwargs): + self.inputs = inputs + self.window_length = window_length + self.mask = mask + return super(CompositeTerm, self)._init(*args, **kwargs) + + @classmethod + def static_identity(cls, inputs, window_length, mask, *args, **kwargs): + return ( + super(CompositeTerm, cls).static_identity(*args, **kwargs), + inputs, + window_length, + mask, + ) + + def _validate(self): + """ + Assert that this term is well-formed. This should be called exactly + once, at the end of Term._init(). + """ + if self.inputs is NotSpecified: + raise TermInputsNotSpecified(termname=type(self).__name__) + if self.window_length is NotSpecified: + raise WindowLengthNotSpecified(termname=type(self).__name__) + if self.mask is NotSpecified: + # This isn't user error, this is a bug in our code. + raise AssertionError("{term} has no mask".format(term=self)) + + if self.window_length: + for child in self.inputs: + if not child.atomic: + raise InputTermNotAtomic(parent=self, child=child) + + return super(CompositeTerm, self)._validate() + + def _compute(self, inputs, dates, assets, mask): + """ + Subclasses should implement this to perform actual computation. + This is `_compute` rather than just `compute` because `compute` is + reserved for user-supplied functions in CustomFactor. + """ + raise NotImplementedError() + + @lazyval + def windowed(self): + """ + Whether or not this term represents a trailing window computation. + + If term.windowed is truthy, its compute_from_windows method will be + called with instances of AdjustedArray as inputs. + + If term.windowed is falsey, its compute_from_baseline will be called + with instances of np.ndarray as inputs. + """ + return ( + self.window_length is not NotSpecified + and self.window_length > 0 + ) + + @lazyval + def extra_input_rows(self): + """ + The number of extra rows needed for each of our inputs to compute this + term. + """ + return max(0, self.window_length - 1) + + def __repr__(self): + return ( + "{type}({inputs}, window_length={window_length})" + ).format( + type=type(self).__name__, + inputs=self.inputs, + window_length=self.window_length, ) diff --git a/zipline/pipeline/visualize.py b/zipline/pipeline/visualize.py index 03297a27..44ffe530 100644 --- a/zipline/pipeline/visualize.py +++ b/zipline/pipeline/visualize.py @@ -92,7 +92,7 @@ def _render(g, out, format_, include_asset_exists=False): graph_attrs = {'rankdir': 'TB', 'splines': 'ortho'} cluster_attrs = {'style': 'filled', 'color': 'lightgoldenrod1'} - in_nodes = list(node for node in g if node.atomic) + in_nodes = g.atomic_terms out_nodes = list(g.outputs.values()) f = BytesIO()