From f85ad50e604335cf455ac85543cf19027333c634 Mon Sep 17 00:00:00 2001 From: Eddie Hebert Date: Mon, 19 Nov 2012 14:39:03 -0500 Subject: [PATCH 1/2] Breaks the sources module into pieces. Clearing the way for adding in a DataSource class within the sources module. --- zipline/sources/__init__.py | 7 ++ zipline/sources/data_frame_source.py | 84 +++++++++++++++++++ .../{sources.py => sources/test_source.py} | 63 +------------- 3 files changed, 92 insertions(+), 62 deletions(-) create mode 100644 zipline/sources/__init__.py create mode 100644 zipline/sources/data_frame_source.py rename zipline/{sources.py => sources/test_source.py} (74%) diff --git a/zipline/sources/__init__.py b/zipline/sources/__init__.py new file mode 100644 index 00000000..e22a4c1e --- /dev/null +++ b/zipline/sources/__init__.py @@ -0,0 +1,7 @@ +from zipline.sources.data_frame_source import DataFrameSource +from zipline.sources.test_source import SpecificEquityTrades + +__all__ = [ + 'DataFrameSource', + 'SpecificEquityTrades' +] diff --git a/zipline/sources/data_frame_source.py b/zipline/sources/data_frame_source.py new file mode 100644 index 00000000..e9897aae --- /dev/null +++ b/zipline/sources/data_frame_source.py @@ -0,0 +1,84 @@ +# +# Copyright 2012 Quantopian, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tools to generate data sources. +""" +from copy import copy +from itertools import ifilter + +import pandas as pd + +from zipline.gens.utils import hash_args + +from zipline.protocol import DATASOURCE_TYPE +from zipline.utils import ndict + +from zipline.sources.test_source import SpecificEquityTrades + + +class DataFrameSource(SpecificEquityTrades): + """ + Yields all events in event_list that match the given sid_filter. + If no event_list is specified, generates an internal stream of events + to filter. Returns all events if filter is None. + + Configuration options: + + count : integer representing number of trades + sids : list of values representing simulated internal sids + start : start date + delta : timedelta between internal events + filter : filter to remove the sids + """ + + def __init__(self, data, **kwargs): + assert isinstance(data.index, pd.tseries.index.DatetimeIndex) + + self.data = data + # Unpack config dictionary with default values. + self.count = kwargs.get('count', len(data)) + self.sids = kwargs.get('sids', data.columns) + self.start = kwargs.get('start', data.index[0]) + self.end = kwargs.get('end', data.index[-1]) + self.delta = kwargs.get('delta', data.index[1] - data.index[0]) + + # Hash_value for downstream sorting. + self.arg_string = hash_args(data, **kwargs) + + self.generator = self.create_fresh_generator() + + def create_fresh_generator(self): + def _generator(df=self.data): + for dt, series in df.iterrows(): + if (dt < self.start) or (dt > self.end): + continue + event = { + 'dt': dt, + 'source_id': self.get_hash(), + 'type': DATASOURCE_TYPE.TRADE + } + + for sid, price in series.iterkv(): + event = copy(event) + event['sid'] = sid + event['price'] = price + event['volume'] = 1000 + + yield ndict(event) + + # Return the filtered event stream. + drop_sids = lambda x: x.sid in self.sids + return ifilter(drop_sids, _generator()) diff --git a/zipline/sources.py b/zipline/sources/test_source.py similarity index 74% rename from zipline/sources.py rename to zipline/sources/test_source.py index 7d13f1a1..2525d524 100644 --- a/zipline/sources.py +++ b/zipline/sources/test_source.py @@ -14,22 +14,16 @@ # limitations under the License. """ -Tools to generate data sources. +A source to be used in testing. """ -__all__ = ['DataFrameSource', 'SpecificEquityTrades'] - import random import pytz from itertools import cycle, ifilter, izip from datetime import datetime, timedelta -import pandas as pd -from copy import copy import numpy as np -from zipline.protocol import DATASOURCE_TYPE -from zipline.utils import ndict from zipline.gens.utils import hash_args, create_trade @@ -194,58 +188,3 @@ class SpecificEquityTrades(object): # Return the filtered event stream. return filtered - - -class DataFrameSource(SpecificEquityTrades): - """ - Yields all events in event_list that match the given sid_filter. - If no event_list is specified, generates an internal stream of events - to filter. Returns all events if filter is None. - - Configuration options: - - count : integer representing number of trades - sids : list of values representing simulated internal sids - start : start date - delta : timedelta between internal events - filter : filter to remove the sids - """ - - def __init__(self, data, **kwargs): - assert isinstance(data.index, pd.tseries.index.DatetimeIndex) - - self.data = data - # Unpack config dictionary with default values. - self.count = kwargs.get('count', len(data)) - self.sids = kwargs.get('sids', data.columns) - self.start = kwargs.get('start', data.index[0]) - self.end = kwargs.get('end', data.index[-1]) - self.delta = kwargs.get('delta', data.index[1] - data.index[0]) - - # Hash_value for downstream sorting. - self.arg_string = hash_args(data, **kwargs) - - self.generator = self.create_fresh_generator() - - def create_fresh_generator(self): - def _generator(df=self.data): - for dt, series in df.iterrows(): - if (dt < self.start) or (dt > self.end): - continue - event = { - 'dt': dt, - 'source_id': self.get_hash(), - 'type': DATASOURCE_TYPE.TRADE - } - - for sid, price in series.iterkv(): - event = copy(event) - event['sid'] = sid - event['price'] = price - event['volume'] = 1000 - - yield ndict(event) - - # Return the filtered event stream. - drop_sids = lambda x: x.sid in self.sids - return ifilter(drop_sids, _generator()) From 5a95122f21524d7056da52507712d044c73838db Mon Sep 17 00:00:00 2001 From: Eddie Hebert Date: Mon, 19 Nov 2012 17:29:08 -0500 Subject: [PATCH 2/2] Converts data frame source over to using DataSource base object. --- zipline/sources/data_frame_source.py | 61 +++++++++++++------------- zipline/sources/data_source.py | 64 ++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 31 deletions(-) create mode 100644 zipline/sources/data_source.py diff --git a/zipline/sources/data_frame_source.py b/zipline/sources/data_frame_source.py index e9897aae..f9c43c07 100644 --- a/zipline/sources/data_frame_source.py +++ b/zipline/sources/data_frame_source.py @@ -16,20 +16,14 @@ """ Tools to generate data sources. """ -from copy import copy -from itertools import ifilter - import pandas as pd from zipline.gens.utils import hash_args -from zipline.protocol import DATASOURCE_TYPE -from zipline.utils import ndict - -from zipline.sources.test_source import SpecificEquityTrades +from zipline.sources.data_source import DataSource -class DataFrameSource(SpecificEquityTrades): +class DataFrameSource(DataSource): """ Yields all events in event_list that match the given sid_filter. If no event_list is specified, generates an internal stream of events @@ -37,7 +31,6 @@ class DataFrameSource(SpecificEquityTrades): Configuration options: - count : integer representing number of trades sids : list of values representing simulated internal sids start : start date delta : timedelta between internal events @@ -49,36 +42,42 @@ class DataFrameSource(SpecificEquityTrades): self.data = data # Unpack config dictionary with default values. - self.count = kwargs.get('count', len(data)) self.sids = kwargs.get('sids', data.columns) self.start = kwargs.get('start', data.index[0]) self.end = kwargs.get('end', data.index[-1]) - self.delta = kwargs.get('delta', data.index[1] - data.index[0]) # Hash_value for downstream sorting. self.arg_string = hash_args(data, **kwargs) - self.generator = self.create_fresh_generator() + self._raw_data = None - def create_fresh_generator(self): - def _generator(df=self.data): - for dt, series in df.iterrows(): - if (dt < self.start) or (dt > self.end): - continue - event = { - 'dt': dt, - 'source_id': self.get_hash(), - 'type': DATASOURCE_TYPE.TRADE - } + @property + def mapping(self): + return { + 'dt': (lambda x: x, 'dt'), + 'sid': (lambda x: x, 'sid'), + 'price': (float, 'price'), + 'volume': (int, 'volume'), + } - for sid, price in series.iterkv(): - event = copy(event) - event['sid'] = sid - event['price'] = price - event['volume'] = 1000 + @property + def instance_hash(self): + return self.arg_string - yield ndict(event) + def raw_data_gen(self): + for dt, series in self.data.iterrows(): + for sid, price in series.iterkv(): + if sid in self.sids: + event = { + 'dt': dt, + 'sid': sid, + 'price': price, + 'volume': 1000, + } + yield event - # Return the filtered event stream. - drop_sids = lambda x: x.sid in self.sids - return ifilter(drop_sids, _generator()) + @property + def raw_data(self): + if not self._raw_data: + self._raw_data = self.raw_data_gen() + return self._raw_data diff --git a/zipline/sources/data_source.py b/zipline/sources/data_source.py new file mode 100644 index 00000000..b5c8750e --- /dev/null +++ b/zipline/sources/data_source.py @@ -0,0 +1,64 @@ +from abc import ( + ABCMeta, + abstractproperty +) + +from zipline.protocol import DATASOURCE_TYPE +from zipline.utils.protocol_utils import ndict + + +class DataSource(object): + + __metaclass__ = ABCMeta + + @property + def event_type(self): + return DATASOURCE_TYPE.TRADE + + @property + def mapping(self): + """ + Mappings of the form: + target_key: (mapping_function, source_key) + """ + return {} + + @abstractproperty + def raw_data(self): + """ + An iterator that yields the raw datasource, + in chronological order of data, one event at a time. + """ + NotImplemented + + @abstractproperty + def instance_hash(self): + """ + A hash that represents the unique args to the source. + """ + pass + + def get_hash(self): + return self.__class__.__name__ + "-" + self.instance_hash + + def apply_mapping(self, raw_row): + """ + Override this to hand craft conversion of row. + """ + row = {target: mapping_func(raw_row[source_key]) + for target, (mapping_func, source_key) + in self.mapping.items()} + row.update({'source_id': self.get_hash()}) + row.update({'type': self.event_type}) + return row + + @property + def mapped_data(self): + for row in self.raw_data: + yield ndict(self.apply_mapping(row)) + + def __iter__(self): + return self + + def next(self): + return self.mapped_data.next()