From ca58632815261bd2b669d8e7f77c1fc78d948862 Mon Sep 17 00:00:00 2001 From: Eddie Hebert Date: Mon, 27 Jun 2016 09:51:27 -0400 Subject: [PATCH] MAINT: Remove DataSource and derived classes. The `DataSource` class and other classes derived from it are no longer used. Instead `DataPortal` and various `MinuteBarReader` and `DailyBarReaders` should be used. --- tests/test_algorithm.py | 74 +----------- zipline/sources/__init__.py | 7 -- zipline/sources/data_frame_source.py | 172 --------------------------- zipline/sources/data_source.py | 68 ----------- zipline/sources/simulated.py | 158 ------------------------ 5 files changed, 2 insertions(+), 477 deletions(-) delete mode 100644 zipline/sources/data_frame_source.py delete mode 100644 zipline/sources/data_source.py delete mode 100644 zipline/sources/simulated.py diff --git a/tests/test_algorithm.py b/tests/test_algorithm.py index 09d21887..1a08f2ca 100644 --- a/tests/test_algorithm.py +++ b/tests/test_algorithm.py @@ -17,7 +17,7 @@ import datetime from datetime import timedelta from textwrap import dedent import warnings -from unittest import TestCase, skip +from unittest import skip from copy import deepcopy import logbook @@ -74,8 +74,7 @@ from zipline.api import ( from zipline.finance.commission import PerShare from zipline.finance.execution import LimitOrder from zipline.finance.order import ORDER_STATUS -from zipline.finance.trading import TradingEnvironment, SimulationParameters -from zipline.sources import DataPanelSource +from zipline.finance.trading import SimulationParameters from zipline.testing import ( FakeDataPortal, create_daily_df_for_asset, @@ -3327,75 +3326,6 @@ class TestOrderCancelation(WithDataPortal, self.assertFalse(log_catcher.has_warnings) -@skip("fix in Q2") -class TestRemoveData(TestCase): - """ - tests if futures data is removed after max(expiration_date, end_date) - """ - def setUp(self): - self.env = env = TradingEnvironment() - start_date = pd.Timestamp('2015-01-02', tz='UTC') - start_ix = env.trading_days.get_loc(start_date) - days = env.trading_days - - metadata = { - 0: { - 'symbol': 'X', - 'start_date': env.trading_days[start_ix + 2], - 'expiration_date': env.trading_days[start_ix + 5], - 'end_date': env.trading_days[start_ix + 6], - }, - 1: { - 'symbol': 'Y', - 'start_date': env.trading_days[start_ix + 4], - 'expiration_date': env.trading_days[start_ix + 7], - 'end_date': env.trading_days[start_ix + 8], - } - } - - env.write_data(futures_data=metadata) - assetX, assetY = env.asset_finder.retrieve_all([0, 1]) - - index_x = days[days.slice_indexer(assetX.start_date, assetX.end_date)] - data_x = pd.DataFrame([[1, 100], [2, 100], [3, 100], [4, 100], - [5, 100]], - index=index_x, columns=['price', 'volume']) - - index_y = days[days.slice_indexer(assetY.start_date, assetY.end_date)] - data_y = pd.DataFrame([[6, 100], [7, 100], [8, 100], [9, 100], - [10, 100]], - index=index_y, columns=['price', 'volume']) - - self.trade_data = pd.Panel({0: data_x, 1: data_y}) - self.live_asset_counts = [] - assets = env.asset_finder.retrieve_all([0, 1]) - for day in self.trade_data.major_axis: - count = 0 - for asset in assets: - # We shouldn't see assets on their expiration dates. - if asset.start_date <= day <= asset.end_date: - count += 1 - self.live_asset_counts.append(count) - - def test_remove_data(self): - source = DataPanelSource(self.trade_data) - - def initialize(context): - context.data_lengths = [] - - def handle_data(context, data): - context.data_lengths.append(len(data)) - - algo = TradingAlgorithm( - initialize=initialize, - handle_data=handle_data, - env=self.env, - ) - - algo.run(source) - self.assertEqual(algo.data_lengths, self.live_asset_counts) - - class TestEquityAutoClose(WithTmpDir, WithTradingSchedule, ZiplineTestCase): """ Tests if delisted equities are properly removed from a portfolio holding diff --git a/zipline/sources/__init__.py b/zipline/sources/__init__.py index 343cce84..2d7ded60 100644 --- a/zipline/sources/__init__.py +++ b/zipline/sources/__init__.py @@ -1,12 +1,5 @@ -from .data_source import DataSource -from .data_frame_source import DataFrameSource, DataPanelSource from .test_source import SpecificEquityTrades -from .simulated import RandomWalkSource __all__ = [ - 'DataSource', - 'DataFrameSource', - 'DataPanelSource', 'SpecificEquityTrades', - 'RandomWalkSource' ] diff --git a/zipline/sources/data_frame_source.py b/zipline/sources/data_frame_source.py deleted file mode 100644 index eab04a61..00000000 --- a/zipline/sources/data_frame_source.py +++ /dev/null @@ -1,172 +0,0 @@ -# -# Copyright 2015 Quantopian, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Tools to generate data sources. -""" -import numpy as np -import pandas as pd - -from zipline.gens.utils import hash_args - -from zipline.sources.data_source import DataSource - - -class DataFrameSource(DataSource): - """ - Data source that yields from a pandas DataFrame. - - :Axis layout: - * columns : sids - * index : datetime - - :Note: - Bars where the price is nan are filtered out. - """ - - def __init__(self, data, **kwargs): - assert isinstance(data.index, pd.tseries.index.DatetimeIndex) - # Only accept integer SIDs as the items of the DataFrame - assert isinstance(data.columns, pd.Int64Index) - # TODO is ffilling correct/necessary? - # Forward fill prices - self.data = data.fillna(method='ffill') - # Unpack config dictionary with default values. - self.start = kwargs.get('start', self.data.index[0]) - self.end = kwargs.get('end', self.data.index[-1]) - self.sids = self.data.columns - - # Hash_value for downstream sorting. - self.arg_string = hash_args(data, **kwargs) - - self._raw_data = None - - self.started_sids = set() - - @property - def mapping(self): - return { - 'dt': (lambda x: x, 'dt'), - 'sid': (lambda x: x, 'sid'), - 'price': (float, 'price'), - 'volume': (int, 'volume'), - } - - @property - def instance_hash(self): - return self.arg_string - - def raw_data_gen(self): - for dt, series in self.data.iterrows(): - for sid, price in series.iteritems(): - # Skip SIDs that can not be forward filled - if np.isnan(price) and \ - sid not in self.started_sids: - continue - self.started_sids.add(sid) - - event = { - 'dt': dt, - 'sid': sid, - 'price': price, - # Just chose something large - # if no volume available. - 'volume': 1e9, - } - yield event - - @property - def raw_data(self): - if not self._raw_data: - self._raw_data = self.raw_data_gen() - return self._raw_data - - -class DataPanelSource(DataSource): - """ - Data source that yields from a pandas Panel. - - :Axis layout: - * items : sids - * major_axis : datetime - * minor_axis : price, volume, ... - - :Note: - Bars where the price is nan are filtered out. - """ - - def __init__(self, data, **kwargs): - assert isinstance(data.major_axis, pd.tseries.index.DatetimeIndex) - # Only accept integer SIDs as the items of the Panel - assert isinstance(data.items, pd.Int64Index) - # TODO is ffilling correct/necessary? - # forward fill with volumes of 0 - self.data = data.fillna(value={'volume': 0}) - # Unpack config dictionary with default values. - self.start = kwargs.get('start', self.data.major_axis[0]) - self.end = kwargs.get('end', self.data.major_axis[-1]) - self.sids = self.data.items - - # Hash_value for downstream sorting. - self.arg_string = hash_args(data, **kwargs) - - self._raw_data = None - - self.started_sids = set() - - @property - def mapping(self): - mapping = { - 'dt': (lambda x: x, 'dt'), - 'sid': (lambda x: x, 'sid'), - 'price': (float, 'price'), - 'volume': (int, 'volume'), - } - - # Add additional fields. - for field_name in self.data.minor_axis: - if field_name in ['price', 'volume', 'dt', 'sid']: - continue - mapping[field_name] = (lambda x: x, field_name) - - return mapping - - @property - def instance_hash(self): - return self.arg_string - - def raw_data_gen(self): - for dt in self.data.major_axis: - df = self.data.major_xs(dt) - for sid, series in df.iteritems(): - # Skip SIDs that can not be forward filled - if np.isnan(series['price']): - continue - self.started_sids.add(sid) - - event = { - 'dt': dt, - 'sid': sid, - } - for field_name, value in series.iteritems(): - event[field_name] = value - - yield event - - @property - def raw_data(self): - if not self._raw_data: - self._raw_data = self.raw_data_gen() - return self._raw_data diff --git a/zipline/sources/data_source.py b/zipline/sources/data_source.py deleted file mode 100644 index de91793a..00000000 --- a/zipline/sources/data_source.py +++ /dev/null @@ -1,68 +0,0 @@ -from abc import ( - ABCMeta, - abstractproperty -) - -from six import with_metaclass - -from zipline.protocol import DATASOURCE_TYPE -from zipline.protocol import Event - - -class DataSource(with_metaclass(ABCMeta)): - - @property - def event_type(self): - return DATASOURCE_TYPE.TRADE - - @property - def mapping(self): - """ - Mappings of the form: - target_key: (mapping_function, source_key) - """ - return {} - - @abstractproperty - def raw_data(self): - """ - An iterator that yields the raw datasource, - in chronological order of data, one event at a time. - """ - NotImplemented - - @abstractproperty - def instance_hash(self): - """ - A hash that represents the unique args to the source. - """ - pass - - def get_hash(self): - return self.__class__.__name__ + "-" + self.instance_hash - - def apply_mapping(self, raw_row): - """ - Override this to hand craft conversion of row. - """ - row = {} - row.update({'type': self.event_type}) - row.update({target: mapping_func(raw_row[source_key]) - for target, (mapping_func, source_key) - in self.mapping.items()}) - row.update({'source_id': self.get_hash()}) - return row - - @property - def mapped_data(self): - for row in self.raw_data: - yield Event(self.apply_mapping(row)) - - def __iter__(self): - return self - - def next(self): - return self.mapped_data.next() - - def __next__(self): - return next(self.mapped_data) diff --git a/zipline/sources/simulated.py b/zipline/sources/simulated.py deleted file mode 100644 index 272f5c78..00000000 --- a/zipline/sources/simulated.py +++ /dev/null @@ -1,158 +0,0 @@ -# -# Copyright 2014 Quantopian, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from copy import copy -import six - -import numpy as np -from datetime import timedelta -import pandas as pd - -from zipline.sources.data_source import DataSource -from zipline.gens.utils import hash_args -from zipline.utils.calendars import default_nyse_schedule - - -class RandomWalkSource(DataSource): - """RandomWalkSource that emits events with prices that follow a - random walk. Will generate valid datetimes that match market hours - of the supplied calendar and can generate emit events with - user-defined frequencies (e.g. minutely). - - """ - VALID_FREQS = frozenset(('daily', 'minute')) - - def __init__(self, start_prices=None, freq='minute', start=None, - end=None, drift=0.1, sd=0.1, - trading_schedule=default_nyse_schedule): - """ - :Arguments: - start_prices : dict - sid -> starting price. - Default: {0: 100, 1: 500} - freq : str - Emits events according to freq. - Can be 'daily' or 'minute' - start : datetime - Start dt to emit events. - end : datetime - End dt until to which emit events. - drift: float - Constant drift of the price series. - sd: float - Standard deviation of the price series. - trading_schedule : TradingSchedule object - TradingSchedule to use. - See zipline.utils for different choices. - - :Example: - # Assumes you have instantiated your Algorithm - # as myalgo. - myalgo = MyAlgo() - source = RandomWalkSource() - myalgo.run(source) - - """ - # Hash_value for downstream sorting. - self.arg_string = hash_args(start_prices, freq, start, end, - trading_schedule.__name__) - - if freq not in self.VALID_FREQS: - raise ValueError('%s not in %s' % (freq, self.VALID_FREQS)) - - self.freq = freq - if start_prices is None: - self.start_prices = {0: 100, - 1: 500} - else: - self.start_prices = start_prices - - self.trading_schedule = trading_schedule - if start is None: - self.start = trading_schedule.first_execution_day - else: - self.start = start - if end is None: - self.end = trading_schedule.last_execution_day - else: - self.end = end - - self.drift = drift - self.sd = sd - - self.sids = self.start_prices.keys() - - self.open_and_closes = \ - trading_schedule.schedule[self.start:self.end] - - self._raw_data = None - - @property - def instance_hash(self): - return self.arg_string - - @property - def mapping(self): - return { - 'dt': (lambda x: x, 'dt'), - 'sid': (lambda x: x, 'sid'), - 'price': (float, 'price'), - 'volume': (int, 'volume'), - 'open_price': (float, 'open_price'), - 'high': (float, 'high'), - 'low': (float, 'low'), - } - - def _gen_next_step(self, x): - x += np.random.randn() * self.sd + self.drift - return max(x, 0.1) - - def _gen_events(self, cur_prices, current_dt): - for sid, price in six.iteritems(cur_prices): - cur_prices[sid] = self._gen_next_step(cur_prices[sid]) - - event = { - 'dt': current_dt, - 'sid': sid, - 'price': cur_prices[sid], - 'volume': np.random.randint(1e5, 1e6), - 'open_price': cur_prices[sid], - 'high': cur_prices[sid] + .1, - 'low': cur_prices[sid] - .1, - } - - yield event - - def raw_data_gen(self): - cur_prices = copy(self.start_prices) - for _, (open_dt, close_dt) in self.open_and_closes.iterrows(): - current_dt = copy(open_dt) - if self.freq == 'minute': - # Emit minutely trade signals from open to close - while current_dt <= close_dt: - for event in self._gen_events(cur_prices, current_dt): - yield event - current_dt += timedelta(minutes=1) - elif self.freq == 'daily': - # Emit one signal per day at close - for event in self._gen_events( - cur_prices, pd.tslib.normalize_date(close_dt)): - yield event - - @property - def raw_data(self): - if not self._raw_data: - self._raw_data = self.raw_data_gen() - return self._raw_data