split up transform package into several modules

This commit is contained in:
fawce
2012-02-08 21:53:31 -05:00
parent cf2b3f1be0
commit 22be49c594
7 changed files with 280 additions and 257 deletions
+1 -1
View File
@@ -1 +1 @@
<?xml version="1.0" encoding="UTF-8"?><testsuite name="nosetests" tests="1" errors="0" failures="0" skip="0"><testcase classname="qsim.test.test_messaging.MessagingTestCase" name="test_client" time="0.374" /></testsuite>
<?xml version="1.0" encoding="UTF-8"?><testsuite name="nosetests" tests="2" errors="0" failures="0" skip="0"><testcase classname="qsim.test.test_messaging.MessagingTestCase" name="test_client" time="0.606" /><testcase classname="qsim.test.test_messaging.MessagingTestCase" name="test_moving_average_to_client" time="161.303" /></testsuite>
+3 -3
View File
@@ -5,7 +5,7 @@ import tornado
import multiprocessing
from qsim.simulator.feed import DataFeed
from qsim.transforms.base import MergedTransformsFeed
from qsim.transforms.merge import MergedTransformsFeed
from qsim.transforms.technical import MovingAverage
import qsim.util as qutil
@@ -40,8 +40,8 @@ class MessagingTestCase(unittest.TestCase):
self.assertEqual(self.total_data_count, client.received_count, "The client should have received ({n}) the same number of messages as the feed sent ({m}).".format(n=client.received_count, m=self.total_data_count))
def dtest_moving_average_to_client(self):
mavg = MovingAverage(self.feed, self.config['transforms'][0])
def test_moving_average_to_client(self):
mavg = MovingAverage(self.feed, self.config['transforms'][0], result_address="tcp://127.0.0.1:20202")
mavg_proc = multiprocessing.Process(target=mavg.run)
mavg_proc.start()
-247
View File
@@ -1,247 +0,0 @@
"""
Transforms
==========
Transforms provide re-useable components for stream processing. All
Transforms expect to receive data events from qsim.simulator.feed.DataFeed
asynchronously via zeromq. Each transform is designed to run in independent
process space, independently of all other transforms, to allow for parallel
computation.
Each transform must maintain the state necessary to calculate the transform of
each new feed event.
To simplify the consumption of feed and transform data events, this module
also provides the MergedTransformsFeed class. MergedTransformsFeed initializes a set of
transforms and subscribes to their output. Each feed event is then combined with
all the transforms of that event into a single new message.
"""
import zmq
import json
import copy
import multiprocessing
import qsim.util as qutil
import qsim.simulator.config as config
class Transform(object):
    """Parent class for feed transforms.

    Subclass and override ``transform`` to derive a new value from each
    feed event. ``run`` drives the whole lifecycle: ``open`` the zmq
    sockets, ``process_all`` feed messages until "DONE" arrives, then
    ``close`` everything.
    """

    def __init__(self, feed, config_dict, result_address):
        """
        feed - data feed object; ``feed.feed_address`` is the zmq address the
            SUB socket connects to, and the feed is also handed to
            qutil.FeedSync together with this transform's name.
        config_dict - dict that can be wrapped in a config.Config object; it
            must provide at least an entry for 'name'.
        result_address - zmq socket address, the transform will CONNECT a PUSH
            socket and send messages until the feed socket receives "DONE".
        """
        self.feed = feed
        self.result_address = result_address
        self.config = config.Config(config_dict)
        self.state = {}
        self.state['name'] = self.config.name
        # FeedSync is reached through qutil: this module only imports
        # qsim.util as qutil and defines no FeedSync of its own.
        self.sync = qutil.FeedSync(feed, self.state['name'])
        self.received_count = 0
        self.sent_count = 0

    def run(self):
        """Top level execution entry point for the transform:

        - connects to the feed socket to subscribe to events
        - connects to the result socket to PUSH transforms
        - processes all messages received from the feed, until the DONE
          message is received, pushing one transformed message per event
        - sends DONE to the result socket, closes all sockets and the context
        """
        self.open()
        self.process_all()
        self.close()

    def open(self):
        """
        Establishes zmq connections.
        """
        self.context = zmq.Context()
        qutil.logger.info("starting {name} transform".format(name = self.state['name']))

        # create the feed SUB; the empty prefix subscribes to every message.
        self.feed_socket = self.context.socket(zmq.SUB)
        self.feed_socket.connect(self.feed.feed_address)
        self.feed_socket.setsockopt(zmq.SUBSCRIBE,'')

        # create the result PUSH
        self.result_socket = self.context.socket(zmq.PUSH)
        self.result_socket.connect(self.result_address)

    def process_all(self):
        """
        Loops until feed's DONE message is received:
            - receive an event from the data feed
            - call transform (subclass' method) on event
            - send the transformed event
        """
        qutil.logger.info("starting {name} event loop".format(name = self.state['name']))
        # confirm with the feed sync object before the first recv.
        self.sync.confirm()

        while True:
            message = self.feed_socket.recv()
            if message == "DONE":
                qutil.logger.info("{name} received the Done message from the feed".format(name=self.state['name']))
                # forward the terminator so the consumer knows we are finished.
                self.result_socket.send("DONE")
                break

            self.received_count += 1
            event = json.loads(message)
            cur_state = self.transform(event)
            # tag the result with the event's dt and this transform's name.
            cur_state['dt'] = event['dt']
            cur_state['name'] = self.state['name']
            self.result_socket.send(json.dumps(cur_state))
            self.sent_count += 1

    def close(self):
        """
        Shut down zmq resources.
        """
        qutil.logger.info("Transform {name} recieved {r} and sent {s}".format(name=self.state['name'], r=self.received_count, s=self.sent_count))
        self.feed_socket.close()
        self.result_socket.close()
        self.context.term()

    def transform(self, event):
        """Return the transformed value for ``event`` as a map.

        The map should look like
        {name:"name of new transform", value: "value of new field"}.
        Transforms run in parallel and results are merged into a single map,
        so transform names must be unique. Best practice is to use the
        self.state object initialized from the transform configuration, and
        only set the transformed value:

            self.state['value'] = transformed_value

        This base implementation returns a new empty dict.
        """
        return {}
class MergedTransformsFeed(Transform):
    """Merge data feed and array of transform feeds into a single result vector.

    SUB from feed
    PULL from child transforms
    PUSH merged message to client
    """

    def __init__(self, feed, props):
        """
        feed - data feed object, passed through to Transform.__init__ and to
            every child transform.
        props - config dict; must have an entry for 'transforms': array of
            dicts, which are converted to configs.
        """
        Transform.__init__(self, feed, props, "tcp://127.0.0.1:20202")
        self.transform_address = "tcp://127.0.0.1:{port}".format(port=10104)
        self.transform_socket = None
        self.create_transforms(self.config.transforms)

    def create_transforms(self, configs):
        """
        Create transforms based on configs, set each transform's result address to
        this object's transform_address, so that all transformed events will be delivered
        to this object.

        Only configs whose 'class' entry is 'MovingAverage' are instantiated;
        any other class name is skipped.
        """
        # Imported locally: qsim.transforms.technical imports the base module,
        # so importing it at module level here would be circular.
        from qsim.transforms.technical import MovingAverage

        self.transforms = {}
        for props in configs:
            class_name = props['class']
            if class_name == 'MovingAverage':
                mavg = MovingAverage(self.feed, props, self.transform_address)
                # Transform stores the configured name on .config / .state,
                # it never sets a plain .name attribute.
                self.transforms[mavg.config.name] = mavg

        keys = list(self.transforms.keys())
        keys.append("feed") #for the raw feed
        self.data_buffer = qutil.MergedParallelBuffer(keys)

        self.buffers = {}
        for name in self.transforms:
            self.buffers[name] = []

    def open(self):
        """Establish zmq context, feed socket, result socket for client, and transform
        socket to receive transformed events. Create and launch transforms. Will confirm
        ready with the DataFeed at the conclusion."""
        self.context = zmq.Context()
        qutil.logger.info("starting {name} transform".format(name = self.state['name']))

        # create the feed SUB; the empty prefix subscribes to every message.
        self.feed_socket = self.context.socket(zmq.SUB)
        self.feed_socket.connect(self.feed.feed_address)
        self.feed_socket.setsockopt(zmq.SUBSCRIBE,'')

        # create the result PUSH; unlike Transform.open this side BINDs.
        self.result_socket = self.context.socket(zmq.PUSH)
        self.result_socket.bind(self.result_address)

        # create the transform PULL that the child transforms connect to.
        self.transform_socket = self.context.socket(zmq.PULL)
        self.transform_socket.bind(self.transform_address)

        self.data_buffer.out_socket = self.result_socket

        # Initialize poll set
        self.poller = zmq.Poller()
        self.poller.register(self.feed_socket, zmq.POLLIN)
        self.poller.register(self.transform_socket, zmq.POLLIN)

        # each child transform runs in its own process.
        for name, transform in self.transforms.iteritems():
            qutil.logger.info("starting {name}".format(name=name))
            proc = multiprocessing.Process(target=transform.run)
            proc.start()

        self.sync.confirm()

    def close(self):
        """
        Close all zmq sockets and context.
        """
        self.transform_socket.close()
        Transform.close(self)

    def process_all(self):
        """
        Uses a Poller to receive messages from all transforms and the feed.
        All transforms corresponding to the same event are merged with each other
        and the original feed event into a single message. That message is then
        sent to the result socket.
        """
        done_count = 0
        while True:
            socks = dict(self.poller.poll())

            if socks.get(self.feed_socket) == zmq.POLLIN:
                message = self.feed_socket.recv()
                if message == "DONE":
                    qutil.logger.info("finished receiving feed to merge")
                    done_count += 1
                else:
                    self.received_count += 1
                    event = json.loads(message)
                    self.data_buffer.append("feed",event)

            if socks.get(self.transform_socket) == zmq.POLLIN:
                t_message = self.transform_socket.recv()
                if t_message == "DONE":
                    qutil.logger.info("finished receiving a transform to merge")
                    done_count += 1
                else:
                    self.received_count += 1
                    t_event = json.loads(t_message)
                    self.data_buffer.append(t_event['name'], t_event)

            # NOTE(review): presumably len(data_buffer) is the number of
            # merged sources (feed + transforms) - confirm against
            # MergedParallelBuffer.
            if done_count >= len(self.data_buffer):
                break #done!

            self.data_buffer.send_next()

        qutil.logger.info("Transform {name} received {r} and sent {s}".format(name=self.state['name'], r=self.data_buffer.received_count, s=self.data_buffer.sent_count))
        qutil.logger.info("about to drain {n} messages from merger's buffer".format(n=self.data_buffer.pending_messages()))

        #drain any remaining messages in the buffer
        self.data_buffer.drain()

        #signal to client that we're done
        self.result_socket.send("DONE")
        qutil.logger.info("Transform {name} received {r} and sent {s}".format(name=self.state['name'], r=self.data_buffer.received_count, s=self.data_buffer.sent_count))
+121
View File
@@ -0,0 +1,121 @@
"""
Transforms
==========
Transforms provide re-useable components for stream processing. All
Transforms expect to receive data events from qsim.simulator.feed.DataFeed
asynchronously via zeromq. Each transform is designed to run in independent
process space, independently of all other transforms, to allow for parallel
computation.
Each transform must maintain the state necessary to calculate the transform of
each new feed event.
To simplify the consumption of feed and transform data events, the sibling
merge module provides the MergedTransformsFeed class. MergedTransformsFeed initializes a set of
transforms and subscribes to their output. Each feed event is then combined with
all the transforms of that event into a single new message.
"""
import zmq
import json
import qsim.util as qutil
import qsim.simulator.config as config
class BaseTransform(object):
    """Parent class for feed transforms.

    Subclass and override ``transform`` to derive a new value from each
    feed event. ``run`` drives the whole lifecycle: ``open`` the zmq
    sockets, ``process_all`` feed messages until "DONE" arrives, then
    ``close`` everything.
    """

    def __init__(self, feed, config_dict, result_address):
        """
        feed - data feed object; ``feed.feed_address`` is the zmq address the
            SUB socket connects to, and the feed is also handed to
            qutil.FeedSync together with this transform's name.
        config_dict - dict that can be wrapped in a config.Config object; it
            must provide at least an entry for 'name'.
        result_address - zmq socket address, the transform will CONNECT a PUSH
            socket and send messages until the feed socket receives "DONE".
        """
        self.feed = feed
        self.result_address = result_address
        self.config = config.Config(config_dict)
        self.state = {}
        self.state['name'] = self.config.name
        self.sync = qutil.FeedSync(feed, self.state['name'])
        self.received_count = 0
        self.sent_count = 0
        # zmq resources are only created in open(); declaring all of them
        # here lets close() run safely even if open() was never called.
        self.context = None
        self.feed_socket = None
        self.result_socket = None

    def run(self):
        """Top level execution entry point for the transform:

        - connects to the feed socket to subscribe to events
        - connects to the result socket to PUSH transforms
        - processes all messages received from the feed, until the DONE
          message is received, pushing one transformed message per event
        - sends DONE to the result socket, closes all sockets and the context
        """
        self.open()
        self.process_all()
        self.close()

    def open(self):
        """
        Establishes zmq connections.
        """
        self.context = zmq.Context()
        qutil.logger.info("starting {name} transform".
                          format(name = self.state['name']))

        # create the feed SUB; the empty prefix subscribes to every message.
        self.feed_socket = self.context.socket(zmq.SUB)
        self.feed_socket.connect(self.feed.feed_address)
        self.feed_socket.setsockopt(zmq.SUBSCRIBE,'')

        # create the result PUSH
        self.result_socket = self.context.socket(zmq.PUSH)
        self.result_socket.connect(self.result_address)

    def process_all(self):
        """
        Loops until feed's DONE message is received:
            - receive an event from the data feed
            - call transform (subclass' method) on event
            - send the transformed event
        """
        qutil.logger.info("starting {name} event loop".format(name = self.state['name']))
        # confirm with the feed sync object before the first recv.
        self.sync.confirm()

        while True:
            message = self.feed_socket.recv()
            if message == "DONE":
                qutil.logger.info("{name} received the Done message from the feed".format(name=self.state['name']))
                # forward the terminator so the consumer knows we are finished.
                self.result_socket.send("DONE")
                break

            self.received_count += 1
            event = json.loads(message)
            cur_state = self.transform(event)
            # tag the result with the event's dt and this transform's name.
            cur_state['dt'] = event['dt']
            cur_state['name'] = self.state['name']
            self.result_socket.send(json.dumps(cur_state))
            self.sent_count += 1

    def close(self):
        """
        Shut down zmq resources.

        Only resources that were actually created are released, so calling
        this before open() logs the counters and returns without raising.
        """
        qutil.logger.info("Transform {name} recieved {r} and sent {s}".format(name=self.state['name'], r=self.received_count, s=self.sent_count))
        if self.feed_socket is not None:
            self.feed_socket.close()
        if self.result_socket is not None:
            self.result_socket.close()
        if self.context is not None:
            self.context.term()

    def transform(self, event):
        """Return the transformed value for ``event`` as a map.

        The map should look like
        {name:"name of new transform", value: "value of new field"}.
        Transforms run in parallel and results are merged into a single map,
        so transform names must be unique. Best practice is to use the
        self.state object initialized from the transform configuration, and
        only set the transformed value:

            self.state['value'] = transformed_value

        This base implementation returns a new empty dict.
        """
        return {}
+134
View File
@@ -0,0 +1,134 @@
import copy
import json
import multiprocessing
import zmq
import technical as ta
from core import BaseTransform
import qsim.util as qutil
class MergedTransformsFeed(BaseTransform):
    """Merge data feed and array of transform feeds into a single result vector.

    SUB from feed
    PULL from child transforms
    PUSH merged message to client
    """

    # Defaults reproduce the addresses this class has always used.
    DEFAULT_RESULT_ADDRESS = "tcp://127.0.0.1:20202"
    DEFAULT_TRANSFORM_ADDRESS = "tcp://127.0.0.1:10104"

    def __init__(self, feed, props,
                 result_address=DEFAULT_RESULT_ADDRESS,
                 transform_address=DEFAULT_TRANSFORM_ADDRESS):
        """
        feed - data feed object, passed through to BaseTransform.__init__ and
            to every child transform.
        props - config dict; must have an entry for 'transforms': array of
            dicts, which are converted to configs.
        result_address - zmq address this object BINDs its PUSH socket to;
            merged messages are sent there.
        transform_address - zmq address this object BINDs its PULL socket to;
            it is handed to every child transform as its result address.
        """
        BaseTransform.__init__(self, feed, props, result_address)
        self.transform_address = transform_address
        self.transform_socket = None
        self.create_transforms(self.config.transforms)

    def create_transforms(self, configs):
        """
        Create transforms based on configs, set each transform's result address to
        this object's transform_address, so that all transformed events will be delivered
        to this object.

        Only configs whose 'class' entry is 'MovingAverage' are instantiated;
        any other class name is skipped.
        """
        self.transforms = {}
        for props in configs:
            class_name = props['class']
            if class_name == 'MovingAverage':
                mavg = ta.MovingAverage(self.feed, props, self.transform_address)
                self.transforms[mavg.config.name] = mavg

        # list() yields an appendable list whether keys() returns a list or
        # a view.
        keys = list(self.transforms.keys())
        keys.append("feed") #for the raw feed
        self.data_buffer = qutil.MergedParallelBuffer(keys)

        self.buffers = {}
        for name in self.transforms:
            self.buffers[name] = []

    def open(self):
        """Establish zmq context, feed socket, result socket for client, and transform
        socket to receive transformed events. Create and launch transforms. Will confirm
        ready with the DataFeed at the conclusion."""
        self.context = zmq.Context()
        qutil.logger.info("starting {name} transform".format(name = self.state['name']))

        # create the feed SUB; the empty prefix subscribes to every message.
        self.feed_socket = self.context.socket(zmq.SUB)
        self.feed_socket.connect(self.feed.feed_address)
        self.feed_socket.setsockopt(zmq.SUBSCRIBE,'')

        # create the result PUSH; unlike BaseTransform.open this side BINDs.
        self.result_socket = self.context.socket(zmq.PUSH)
        self.result_socket.bind(self.result_address)

        # create the transform PULL that the child transforms connect to.
        self.transform_socket = self.context.socket(zmq.PULL)
        self.transform_socket.bind(self.transform_address)

        self.data_buffer.out_socket = self.result_socket

        # Initialize poll set
        self.poller = zmq.Poller()
        self.poller.register(self.feed_socket, zmq.POLLIN)
        self.poller.register(self.transform_socket, zmq.POLLIN)

        # each child transform runs in its own process.
        for name, transform in self.transforms.iteritems():
            qutil.logger.info("starting {name}".format(name=name))
            proc = multiprocessing.Process(target=transform.run)
            proc.start()

        self.sync.confirm()

    def close(self):
        """
        Close all zmq sockets and context.
        """
        # transform_socket stays None until open() has created it.
        if self.transform_socket is not None:
            self.transform_socket.close()
        BaseTransform.close(self)

    def _log_buffer_counts(self):
        """Log how many messages the merge buffer has received and sent."""
        qutil.logger.info("Transform {name} received {r} and sent {s}".format(name=self.state['name'], r=self.data_buffer.received_count, s=self.data_buffer.sent_count))

    def process_all(self):
        """
        Uses a Poller to receive messages from all transforms and the feed.
        All transforms corresponding to the same event are merged with each other
        and the original feed event into a single message. That message is then
        sent to the result socket.
        """
        done_count = 0
        while True:
            socks = dict(self.poller.poll())

            if socks.get(self.feed_socket) == zmq.POLLIN:
                message = self.feed_socket.recv()
                if message == "DONE":
                    qutil.logger.info("finished receiving feed to merge")
                    done_count += 1
                else:
                    self.received_count += 1
                    event = json.loads(message)
                    self.data_buffer.append("feed",event)

            if socks.get(self.transform_socket) == zmq.POLLIN:
                t_message = self.transform_socket.recv()
                if t_message == "DONE":
                    qutil.logger.info("finished receiving a transform to merge")
                    done_count += 1
                else:
                    self.received_count += 1
                    t_event = json.loads(t_message)
                    self.data_buffer.append(t_event['name'], t_event)

            # NOTE(review): presumably len(data_buffer) is the number of
            # merged sources (feed + transforms) - confirm against
            # MergedParallelBuffer.
            if done_count >= len(self.data_buffer):
                break #done!

            self.data_buffer.send_next()

        self._log_buffer_counts()
        qutil.logger.info("about to drain {n} messages from merger's buffer".format(n=self.data_buffer.pending_messages()))

        #drain any remaining messages in the buffer
        self.data_buffer.drain()

        #signal to client that we're done
        self.result_socket.send("DONE")
        self._log_buffer_counts()
+18 -3
View File
@@ -1,9 +1,22 @@
import qsim.transforms.base as base
"""
Transformations for common technical indicators.
TODO: add MACD transform
TODO: add trailing stop
class MovingAverage(base.Transform):
"""
import datetime
import qsim.util as qutil
from core import BaseTransform
class MovingAverage(BaseTransform):
"""
Calculate an unweighted moving average for props['sid'] security
TODO: add sid filter.
"""
def __init__(self, feed, props, result_address):
base.Transform.__init__(self, feed, props, result_address)
BaseTransform.__init__(self, feed, props, result_address)
self.events = []
self.window = datetime.timedelta(days = self.config.get_integer('days'),
@@ -18,6 +31,8 @@ class MovingAverage(base.Transform):
def transform(self, event):
"""Update the moving average with the latest data point."""
self.events.append(event)
#filter the event list to the window length.
+3 -3
View File
@@ -6,11 +6,11 @@ with-coverage=1
cover-package=qsim
#cover-erase=1
cover-html=1
cover-html-dir=cover
cover-html-dir=docs/_build/html/cover
# Drop into debugger on failure
pdb=0
pdb-failures=0
#pdb=0
#pdb-failures=0
# For Jenkins
with-coverage=1