mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-04 13:20:24 +08:00
Cleaned up OOP on transforms.
This commit is contained in:
@@ -11,10 +11,10 @@ LOGGER = logging.getLogger('ZiplineLogger')
|
||||
|
||||
class Feed(Component):
|
||||
"""
|
||||
Connects to N PULL sockets, publishing all messages received to a PUB
|
||||
socket. Published messages are guaranteed to be in chronological order
|
||||
based on message property dt. Expects to be instantiated in one execution
|
||||
context (thread, process, etc) and run in another.
|
||||
Connects to N PULL sockets, publishing all messages received to a
|
||||
PUB socket. Published messages are guaranteed to be in chronological
|
||||
order based on message property dt. Expects to be instantiated in
|
||||
one execution context (thread, process, etc) and run in another.
|
||||
"""
|
||||
|
||||
def init(self):
|
||||
@@ -198,7 +198,7 @@ class Feed(Component):
|
||||
|
||||
def __len__(self):
|
||||
"""
|
||||
Buffer's length is same as internal map holding separate
|
||||
sorted arrays of events keyed by source id.
|
||||
Buffer's length is same as internal map holding separate sorted
|
||||
arrays of events keyed by source id.
|
||||
"""
|
||||
return len(self.data_buffer)
|
||||
|
||||
@@ -1,35 +1,18 @@
|
||||
import zipline.protocol as zp
|
||||
from zipline.transforms import BaseTransform
|
||||
|
||||
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \
|
||||
COMPONENT_STATE, CONTROL_FRAME, CONTROL_UNFRAME
|
||||
from zipline.transforms import BaseTransform
|
||||
from zipline.protocol import FEED_FRAME, TRANSFORM_TYPE
|
||||
|
||||
class PassthroughTransform(BaseTransform):
|
||||
"""
|
||||
A bypass transform which is also an identity transform::
|
||||
|
||||
+-------+
|
||||
+---| f |--->
|
||||
+-------+
|
||||
+------id------->
|
||||
|
||||
A bypass transform passes data through unchanged.
|
||||
"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
BaseTransform.__init__(self, "PASSTHROUGH")
|
||||
self.init(**kwargs)
|
||||
|
||||
def init(self, **kwargs):
|
||||
pass
|
||||
|
||||
@property
|
||||
def get_type(self):
|
||||
return COMPONENT_TYPE.CONDUIT
|
||||
def init(self):
|
||||
self.state = { 'name': 'PASSTHROUGH' }
|
||||
|
||||
#TODO, could save some cycles by skipping the _UNFRAME call
|
||||
# and just setting value to original msg string.
|
||||
def transform(self, event):
|
||||
return {
|
||||
'name' : zp.TRANSFORM_TYPE.PASSTHROUGH,
|
||||
'value' : zp.FEED_FRAME(event)
|
||||
'name' : TRANSFORM_TYPE.PASSTHROUGH,
|
||||
'value' : FEED_FRAME(event)
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ class ComponentHost(Component):
|
||||
self.addresses = addresses
|
||||
self.running = False
|
||||
|
||||
# Component Registry, keyed by get_id
|
||||
# Component Registry, keyed by unique string
|
||||
# ----------------------
|
||||
self.components = {}
|
||||
# ----------------------
|
||||
@@ -109,8 +109,6 @@ class ComponentHost(Component):
|
||||
DEPRECATED, left in for compatability for now.
|
||||
"""
|
||||
|
||||
cur_time = datetime.datetime.utcnow()
|
||||
|
||||
if len(self.components) == 0:
|
||||
LOGGER.info("Component register is empty.")
|
||||
return False
|
||||
|
||||
@@ -1,9 +1,17 @@
|
||||
"""
|
||||
Provides data handlers that can push messages to a zipline.core.DataFeed
|
||||
|
||||
::
|
||||
DataSource
|
||||
|
|
||||
TradeDataSource
|
||||
/ \
|
||||
RandomEquityTrades SpecificEquityTrades
|
||||
|
||||
"""
|
||||
import datetime
|
||||
import random
|
||||
import pytz
|
||||
import random
|
||||
import datetime
|
||||
from mock import Mock
|
||||
|
||||
from zipline.components import DataSource
|
||||
|
||||
@@ -21,15 +21,6 @@ class BaseTransform(Component):
|
||||
method to create a new derived value from the combined feed.
|
||||
"""
|
||||
|
||||
def __init__(self, name, **kwargs):
|
||||
Component.__init__(self)
|
||||
|
||||
self.state = {
|
||||
'name': name
|
||||
}
|
||||
|
||||
self.init(**kwargs)
|
||||
|
||||
def init(self):
|
||||
pass
|
||||
|
||||
@@ -124,10 +115,10 @@ class BaseTransform(Component):
|
||||
|
||||
{name:"name of new transform", value: "value of new field"}
|
||||
|
||||
Transforms run in parallel and results are merged into a single map, so
|
||||
transform names must be unique. Best practice is to use the self.state
|
||||
object initialized from the transform configuration, and only set the
|
||||
transformed value::
|
||||
Transforms run in parallel and results are merged into a
|
||||
single map, so transform names must be unique. Best practice
|
||||
is to use the self.state object initialized from the transform
|
||||
configuration, and only set the transformed value::
|
||||
|
||||
self.state['value'] = transformed_value
|
||||
"""
|
||||
|
||||
@@ -11,9 +11,14 @@ from operator import attrgetter
|
||||
from datetime import datetime, timedelta
|
||||
import zipline.finance.risk as risk
|
||||
import zipline.protocol as zp
|
||||
|
||||
from zipline.finance.sources import SpecificEquityTrades, RandomEquityTrades
|
||||
from zipline.finance.trading import TradingEnvironment
|
||||
|
||||
# TODO
|
||||
def data_locator():
|
||||
pass
|
||||
|
||||
def load_market_data():
|
||||
fp_bm = open("./tests/benchmark.msgpack", "rb")
|
||||
bm_list = msgpack.loads(fp_bm.read())
|
||||
|
||||
Reference in New Issue
Block a user