diff --git a/nosetests.xml b/nosetests.xml
index 3a2d4edb..7769f599 100644
--- a/nosetests.xml
+++ b/nosetests.xml
@@ -1 +1 @@
-
\ No newline at end of file
+
\ No newline at end of file
diff --git a/qsim/transforms/base.py b/qsim/transforms/base.py
index 013b704b..c44a6357 100644
--- a/qsim/transforms/base.py
+++ b/qsim/transforms/base.py
@@ -1,6 +1,23 @@
+"""
+Transforms
+==========
+
+Transforms provide re-useable components for stream processing. All
+Transforms expect to receive data events from qsim.simulator.feed.DataFeed
+asynchronously via zeromq. Each transform is designed to run in independent
+process space, independently of all other transforms, to allow for parallel
+computation.
+
+Each transform must maintain the state necessary to calculate the transform of
+each new feed events.
+
+To simplify the consumption of feed and transform data events, this module
+also provides the TransformsMerge class. TransformsMerge initializes as set of
+transforms and subscribes to their output. Each feed event is then combined with
+all the transforms of that event into a single new message.
+
+"""
import zmq
-import logging
-import datetime
import json
import copy
import multiprocessing
@@ -8,7 +25,8 @@ import qsim.util as qutil
import qsim.simulator.config as config
class Transform(object):
- """Parent class for feed transforms. Subclass and override transform method to create a new derived value from the combined feed."""
+ """Parent class for feed transforms. Subclass and override transform
+ method to create a new derived value from the combined feed."""
def __init__(self, feed, config_dict, result_address):
"""
@@ -19,15 +37,13 @@ class Transform(object):
server - if True, transform will bind to the result address (and act as a server), if False it will connect. The
the last transform in a series should be server=True so that clients can connect.
"""
- self.logger = qutil.logger
+
self.feed = feed
- self.feed_address = feed.feed_address
self.result_address = result_address
self.config = config.Config(config_dict)
- self.name = self.config.get_string('name')
- self.sync = FeedSync(feed, self.name)
+ self.sync = FeedSync(feed, self.state['name'])
self.state = {}
- self.state['name'] = self.name
+ self.state['name'] = self.config.name
self.received_count = 0
self.sent_count = 0
@@ -39,10 +55,10 @@ class Transform(object):
def open(self):
self.context = zmq.Context()
- self.logger.info("starting {name} transform".format(name = self.name))
+ qutil.logger.info("starting {name} transform".format(name = self.state['name']))
#create the feed SUB.
self.feed_socket = self.context.socket(zmq.SUB)
- self.feed_socket.connect(self.feed_address)
+ self.feed_socket.connect(self.feed.feed_address)
self.feed_socket.setsockopt(zmq.SUBSCRIBE,'')
#create the result PUSH
@@ -50,25 +66,25 @@ class Transform(object):
self.result_socket.connect(self.result_address)
def process_all(self):
- self.logger.info("starting {name} event loop".format(name = self.name))
+ qutil.logger.info("starting {name} event loop".format(name = self.state['name']))
self.sync.confirm()
while True:
message = self.feed_socket.recv()
if(message == "DONE"):
- self.logger.info("{name} received the Done message from the feed".format(name=self.name))
+ qutil.logger.info("{name} received the Done message from the feed".format(name=self.state['name']))
self.result_socket.send("DONE")
break;
self.received_count += 1
event = json.loads(message)
cur_state = self.transform(event)
cur_state['dt'] = event['dt']
- cur_state['name'] = self.name
+ cur_state['name'] = self.state['name']
self.result_socket.send(json.dumps(cur_state))
self.sent_count += 1
def close(self):
- self.logger.info("Transform {name} recieved {r} and sent {s}".format(name=self.name, r=self.received_count, s=self.sent_count))
+ qutil.logger.info("Transform {name} recieved {r} and sent {s}".format(name=self.state['name'], r=self.received_count, s=self.sent_count))
self.feed_socket.close()
self.result_socket.close()
@@ -121,10 +137,10 @@ class MergedTransformsFeed(Transform):
def open(self):
self.context = zmq.Context()
- self.logger.info("starting {name} transform".format(name = self.name))
+ qutil.logger.info("starting {name} transform".format(name = self.state['name']))
#create the feed SUB.
self.feed_socket = self.context.socket(zmq.SUB)
- self.feed_socket.connect(self.feed_address)
+ self.feed_socket.connect(self.feed.feed_address)
self.feed_socket.setsockopt(zmq.SUBSCRIBE,'')
#create the result PUSH
@@ -142,7 +158,7 @@ class MergedTransformsFeed(Transform):
self.poller.register(self.transform_socket, zmq.POLLIN)
for name, transform in self.transforms.iteritems():
- self.logger.info("starting {name}".format(name=name))
+ qutil.logger.info("starting {name}".format(name=name))
proc = multiprocessing.Process(target=transform.run)
proc.start()
@@ -161,7 +177,7 @@ class MergedTransformsFeed(Transform):
if self.feed_socket in socks and socks[self.feed_socket] == zmq.POLLIN:
message = self.feed_socket.recv()
if(message == "DONE"):
- self.logger.info("finished receiving feed to merge")
+ qutil.logger.info("finished receiving feed to merge")
done_count += 1
else:
self.received_count += 1
@@ -171,7 +187,7 @@ class MergedTransformsFeed(Transform):
if self.transform_socket in socks and socks[self.transform_socket] == zmq.POLLIN:
t_message = self.transform_socket.recv()
if(t_message == "DONE"):
- self.logger.info("finished receiving a transform to merge")
+ qutil.logger.info("finished receiving a transform to merge")
done_count += 1
else:
self.received_count += 1
@@ -183,13 +199,13 @@ class MergedTransformsFeed(Transform):
self.data_buffer.send_next()
- self.logger.info("Transform {name} received {r} and sent {s}".format(name=self.name, r=self.data_buffer.received_count, s=self.data_buffer.sent_count))
- self.logger.info("about to drain {n} messages from merger's buffer".format(n=self.data_buffer.pending_messages()))
+ qutil.logger.info("Transform {name} received {r} and sent {s}".format(name=self.state['name'], r=self.data_buffer.received_count, s=self.data_buffer.sent_count))
+ qutil.logger.info("about to drain {n} messages from merger's buffer".format(n=self.data_buffer.pending_messages()))
#drain any remaining messages in the buffer
self.data_buffer.drain()
#signal to client that we're done
self.result_socket.send("DONE")
- self.logger.info("Transform {name} received {r} and sent {s}".format(name=self.name, r=self.data_buffer.received_count, s=self.data_buffer.sent_count))
+ qutil.logger.info("Transform {name} received {r} and sent {s}".format(name=self.state['name'], r=self.data_buffer.received_count, s=self.data_buffer.sent_count))