From 1c593ce541b31d45bcb04a01874e9da1cfeda72d Mon Sep 17 00:00:00 2001 From: fawce Date: Wed, 8 Feb 2012 20:56:36 -0500 Subject: [PATCH] de-linting --- nosetests.xml | 2 +- qsim/transforms/base.py | 60 ++++++++++++++++++++++++++--------------- 2 files changed, 39 insertions(+), 23 deletions(-) diff --git a/nosetests.xml b/nosetests.xml index 3a2d4edb..7769f599 100644 --- a/nosetests.xml +++ b/nosetests.xml @@ -1 +1 @@ - \ No newline at end of file + \ No newline at end of file diff --git a/qsim/transforms/base.py b/qsim/transforms/base.py index 013b704b..c44a6357 100644 --- a/qsim/transforms/base.py +++ b/qsim/transforms/base.py @@ -1,6 +1,23 @@ +""" +Transforms +========== + +Transforms provide re-useable components for stream processing. All +Transforms expect to receive data events from qsim.simulator.feed.DataFeed +asynchronously via zeromq. Each transform is designed to run in independent +process space, independently of all other transforms, to allow for parallel +computation. + +Each transform must maintain the state necessary to calculate the transform of +each new feed events. + +To simplify the consumption of feed and transform data events, this module +also provides the TransformsMerge class. TransformsMerge initializes as set of +transforms and subscribes to their output. Each feed event is then combined with +all the transforms of that event into a single new message. + +""" import zmq -import logging -import datetime import json import copy import multiprocessing @@ -8,7 +25,8 @@ import qsim.util as qutil import qsim.simulator.config as config class Transform(object): - """Parent class for feed transforms. Subclass and override transform method to create a new derived value from the combined feed.""" + """Parent class for feed transforms. Subclass and override transform + method to create a new derived value from the combined feed.""" def __init__(self, feed, config_dict, result_address): """ @@ -19,15 +37,13 @@ class Transform(object): server - if True, transform will bind to the result address (and act as a server), if False it will connect. The the last transform in a series should be server=True so that clients can connect. """ - self.logger = qutil.logger + self.feed = feed - self.feed_address = feed.feed_address self.result_address = result_address self.config = config.Config(config_dict) - self.name = self.config.get_string('name') - self.sync = FeedSync(feed, self.name) + self.sync = FeedSync(feed, self.state['name']) self.state = {} - self.state['name'] = self.name + self.state['name'] = self.config.name self.received_count = 0 self.sent_count = 0 @@ -39,10 +55,10 @@ class Transform(object): def open(self): self.context = zmq.Context() - self.logger.info("starting {name} transform".format(name = self.name)) + qutil.logger.info("starting {name} transform".format(name = self.state['name'])) #create the feed SUB. self.feed_socket = self.context.socket(zmq.SUB) - self.feed_socket.connect(self.feed_address) + self.feed_socket.connect(self.feed.feed_address) self.feed_socket.setsockopt(zmq.SUBSCRIBE,'') #create the result PUSH @@ -50,25 +66,25 @@ class Transform(object): self.result_socket.connect(self.result_address) def process_all(self): - self.logger.info("starting {name} event loop".format(name = self.name)) + qutil.logger.info("starting {name} event loop".format(name = self.state['name'])) self.sync.confirm() while True: message = self.feed_socket.recv() if(message == "DONE"): - self.logger.info("{name} received the Done message from the feed".format(name=self.name)) + qutil.logger.info("{name} received the Done message from the feed".format(name=self.state['name'])) self.result_socket.send("DONE") break; self.received_count += 1 event = json.loads(message) cur_state = self.transform(event) cur_state['dt'] = event['dt'] - cur_state['name'] = self.name + cur_state['name'] = self.state['name'] self.result_socket.send(json.dumps(cur_state)) self.sent_count += 1 def close(self): - self.logger.info("Transform {name} recieved {r} and sent {s}".format(name=self.name, r=self.received_count, s=self.sent_count)) + qutil.logger.info("Transform {name} recieved {r} and sent {s}".format(name=self.state['name'], r=self.received_count, s=self.sent_count)) self.feed_socket.close() self.result_socket.close() @@ -121,10 +137,10 @@ class MergedTransformsFeed(Transform): def open(self): self.context = zmq.Context() - self.logger.info("starting {name} transform".format(name = self.name)) + qutil.logger.info("starting {name} transform".format(name = self.state['name'])) #create the feed SUB. self.feed_socket = self.context.socket(zmq.SUB) - self.feed_socket.connect(self.feed_address) + self.feed_socket.connect(self.feed.feed_address) self.feed_socket.setsockopt(zmq.SUBSCRIBE,'') #create the result PUSH @@ -142,7 +158,7 @@ class MergedTransformsFeed(Transform): self.poller.register(self.transform_socket, zmq.POLLIN) for name, transform in self.transforms.iteritems(): - self.logger.info("starting {name}".format(name=name)) + qutil.logger.info("starting {name}".format(name=name)) proc = multiprocessing.Process(target=transform.run) proc.start() @@ -161,7 +177,7 @@ class MergedTransformsFeed(Transform): if self.feed_socket in socks and socks[self.feed_socket] == zmq.POLLIN: message = self.feed_socket.recv() if(message == "DONE"): - self.logger.info("finished receiving feed to merge") + qutil.logger.info("finished receiving feed to merge") done_count += 1 else: self.received_count += 1 @@ -171,7 +187,7 @@ class MergedTransformsFeed(Transform): if self.transform_socket in socks and socks[self.transform_socket] == zmq.POLLIN: t_message = self.transform_socket.recv() if(t_message == "DONE"): - self.logger.info("finished receiving a transform to merge") + qutil.logger.info("finished receiving a transform to merge") done_count += 1 else: self.received_count += 1 @@ -183,13 +199,13 @@ class MergedTransformsFeed(Transform): self.data_buffer.send_next() - self.logger.info("Transform {name} received {r} and sent {s}".format(name=self.name, r=self.data_buffer.received_count, s=self.data_buffer.sent_count)) - self.logger.info("about to drain {n} messages from merger's buffer".format(n=self.data_buffer.pending_messages())) + qutil.logger.info("Transform {name} received {r} and sent {s}".format(name=self.state['name'], r=self.data_buffer.received_count, s=self.data_buffer.sent_count)) + qutil.logger.info("about to drain {n} messages from merger's buffer".format(n=self.data_buffer.pending_messages())) #drain any remaining messages in the buffer self.data_buffer.drain() #signal to client that we're done self.result_socket.send("DONE") - self.logger.info("Transform {name} received {r} and sent {s}".format(name=self.name, r=self.data_buffer.received_count, s=self.data_buffer.sent_count)) + qutil.logger.info("Transform {name} received {r} and sent {s}".format(name=self.state['name'], r=self.data_buffer.received_count, s=self.data_buffer.sent_count))