mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-30 04:17:29 +08:00
Merge pull request #25 from quantopian/use-data-source-for-data-frame
Use data source for data frame
This commit is contained in:
@@ -0,0 +1,7 @@
|
||||
from zipline.sources.data_frame_source import DataFrameSource
|
||||
from zipline.sources.test_source import SpecificEquityTrades
|
||||
|
||||
__all__ = [
|
||||
'DataFrameSource',
|
||||
'SpecificEquityTrades'
|
||||
]
|
||||
@@ -0,0 +1,83 @@
|
||||
#
|
||||
# Copyright 2012 Quantopian, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Tools to generate data sources.
|
||||
"""
|
||||
import pandas as pd
|
||||
|
||||
from zipline.gens.utils import hash_args
|
||||
|
||||
from zipline.sources.data_source import DataSource
|
||||
|
||||
|
||||
class DataFrameSource(DataSource):
|
||||
"""
|
||||
Yields all events in event_list that match the given sid_filter.
|
||||
If no event_list is specified, generates an internal stream of events
|
||||
to filter. Returns all events if filter is None.
|
||||
|
||||
Configuration options:
|
||||
|
||||
sids : list of values representing simulated internal sids
|
||||
start : start date
|
||||
delta : timedelta between internal events
|
||||
filter : filter to remove the sids
|
||||
"""
|
||||
|
||||
def __init__(self, data, **kwargs):
|
||||
assert isinstance(data.index, pd.tseries.index.DatetimeIndex)
|
||||
|
||||
self.data = data
|
||||
# Unpack config dictionary with default values.
|
||||
self.sids = kwargs.get('sids', data.columns)
|
||||
self.start = kwargs.get('start', data.index[0])
|
||||
self.end = kwargs.get('end', data.index[-1])
|
||||
|
||||
# Hash_value for downstream sorting.
|
||||
self.arg_string = hash_args(data, **kwargs)
|
||||
|
||||
self._raw_data = None
|
||||
|
||||
@property
|
||||
def mapping(self):
|
||||
return {
|
||||
'dt': (lambda x: x, 'dt'),
|
||||
'sid': (lambda x: x, 'sid'),
|
||||
'price': (float, 'price'),
|
||||
'volume': (int, 'volume'),
|
||||
}
|
||||
|
||||
@property
|
||||
def instance_hash(self):
|
||||
return self.arg_string
|
||||
|
||||
def raw_data_gen(self):
|
||||
for dt, series in self.data.iterrows():
|
||||
for sid, price in series.iterkv():
|
||||
if sid in self.sids:
|
||||
event = {
|
||||
'dt': dt,
|
||||
'sid': sid,
|
||||
'price': price,
|
||||
'volume': 1000,
|
||||
}
|
||||
yield event
|
||||
|
||||
@property
|
||||
def raw_data(self):
|
||||
if not self._raw_data:
|
||||
self._raw_data = self.raw_data_gen()
|
||||
return self._raw_data
|
||||
@@ -0,0 +1,64 @@
|
||||
from abc import (
|
||||
ABCMeta,
|
||||
abstractproperty
|
||||
)
|
||||
|
||||
from zipline.protocol import DATASOURCE_TYPE
|
||||
from zipline.utils.protocol_utils import ndict
|
||||
|
||||
|
||||
class DataSource(object):
|
||||
|
||||
__metaclass__ = ABCMeta
|
||||
|
||||
@property
|
||||
def event_type(self):
|
||||
return DATASOURCE_TYPE.TRADE
|
||||
|
||||
@property
|
||||
def mapping(self):
|
||||
"""
|
||||
Mappings of the form:
|
||||
target_key: (mapping_function, source_key)
|
||||
"""
|
||||
return {}
|
||||
|
||||
@abstractproperty
|
||||
def raw_data(self):
|
||||
"""
|
||||
An iterator that yields the raw datasource,
|
||||
in chronological order of data, one event at a time.
|
||||
"""
|
||||
NotImplemented
|
||||
|
||||
@abstractproperty
|
||||
def instance_hash(self):
|
||||
"""
|
||||
A hash that represents the unique args to the source.
|
||||
"""
|
||||
pass
|
||||
|
||||
def get_hash(self):
|
||||
return self.__class__.__name__ + "-" + self.instance_hash
|
||||
|
||||
def apply_mapping(self, raw_row):
|
||||
"""
|
||||
Override this to hand craft conversion of row.
|
||||
"""
|
||||
row = {target: mapping_func(raw_row[source_key])
|
||||
for target, (mapping_func, source_key)
|
||||
in self.mapping.items()}
|
||||
row.update({'source_id': self.get_hash()})
|
||||
row.update({'type': self.event_type})
|
||||
return row
|
||||
|
||||
@property
|
||||
def mapped_data(self):
|
||||
for row in self.raw_data:
|
||||
yield ndict(self.apply_mapping(row))
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def next(self):
|
||||
return self.mapped_data.next()
|
||||
@@ -14,22 +14,16 @@
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Tools to generate data sources.
|
||||
A source to be used in testing.
|
||||
"""
|
||||
|
||||
__all__ = ['DataFrameSource', 'SpecificEquityTrades']
|
||||
|
||||
import random
|
||||
import pytz
|
||||
|
||||
from itertools import cycle, ifilter, izip
|
||||
from datetime import datetime, timedelta
|
||||
import pandas as pd
|
||||
from copy import copy
|
||||
import numpy as np
|
||||
|
||||
from zipline.protocol import DATASOURCE_TYPE
|
||||
from zipline.utils import ndict
|
||||
from zipline.gens.utils import hash_args, create_trade
|
||||
|
||||
|
||||
@@ -194,58 +188,3 @@ class SpecificEquityTrades(object):
|
||||
|
||||
# Return the filtered event stream.
|
||||
return filtered
|
||||
|
||||
|
||||
class DataFrameSource(SpecificEquityTrades):
|
||||
"""
|
||||
Yields all events in event_list that match the given sid_filter.
|
||||
If no event_list is specified, generates an internal stream of events
|
||||
to filter. Returns all events if filter is None.
|
||||
|
||||
Configuration options:
|
||||
|
||||
count : integer representing number of trades
|
||||
sids : list of values representing simulated internal sids
|
||||
start : start date
|
||||
delta : timedelta between internal events
|
||||
filter : filter to remove the sids
|
||||
"""
|
||||
|
||||
def __init__(self, data, **kwargs):
|
||||
assert isinstance(data.index, pd.tseries.index.DatetimeIndex)
|
||||
|
||||
self.data = data
|
||||
# Unpack config dictionary with default values.
|
||||
self.count = kwargs.get('count', len(data))
|
||||
self.sids = kwargs.get('sids', data.columns)
|
||||
self.start = kwargs.get('start', data.index[0])
|
||||
self.end = kwargs.get('end', data.index[-1])
|
||||
self.delta = kwargs.get('delta', data.index[1] - data.index[0])
|
||||
|
||||
# Hash_value for downstream sorting.
|
||||
self.arg_string = hash_args(data, **kwargs)
|
||||
|
||||
self.generator = self.create_fresh_generator()
|
||||
|
||||
def create_fresh_generator(self):
|
||||
def _generator(df=self.data):
|
||||
for dt, series in df.iterrows():
|
||||
if (dt < self.start) or (dt > self.end):
|
||||
continue
|
||||
event = {
|
||||
'dt': dt,
|
||||
'source_id': self.get_hash(),
|
||||
'type': DATASOURCE_TYPE.TRADE
|
||||
}
|
||||
|
||||
for sid, price in series.iterkv():
|
||||
event = copy(event)
|
||||
event['sid'] = sid
|
||||
event['price'] = price
|
||||
event['volume'] = 1000
|
||||
|
||||
yield ndict(event)
|
||||
|
||||
# Return the filtered event stream.
|
||||
drop_sids = lambda x: x.sid in self.sids
|
||||
return ifilter(drop_sids, _generator())
|
||||
Reference in New Issue
Block a user