From 22be49c594e9947e11399b2bac09310467045842 Mon Sep 17 00:00:00 2001 From: fawce Date: Wed, 8 Feb 2012 21:53:31 -0500 Subject: [PATCH] split up transform package into several modules --- nosetests.xml | 2 +- qsim/test/test_messaging.py | 6 +- qsim/transforms/base.py | 247 ----------------------------------- qsim/transforms/core.py | 121 +++++++++++++++++ qsim/transforms/merge.py | 134 +++++++++++++++++++ qsim/transforms/technical.py | 21 ++- setup.cfg | 6 +- 7 files changed, 280 insertions(+), 257 deletions(-) delete mode 100644 qsim/transforms/base.py create mode 100644 qsim/transforms/core.py create mode 100644 qsim/transforms/merge.py diff --git a/nosetests.xml b/nosetests.xml index 7769f599..a13d8ef0 100644 --- a/nosetests.xml +++ b/nosetests.xml @@ -1 +1 @@ - \ No newline at end of file + \ No newline at end of file diff --git a/qsim/test/test_messaging.py b/qsim/test/test_messaging.py index 2fe6787c..92888607 100644 --- a/qsim/test/test_messaging.py +++ b/qsim/test/test_messaging.py @@ -5,7 +5,7 @@ import tornado import multiprocessing from qsim.simulator.feed import DataFeed -from qsim.transforms.base import MergedTransformsFeed +from qsim.transforms.merge import MergedTransformsFeed from qsim.transforms.technical import MovingAverage import qsim.util as qutil @@ -40,8 +40,8 @@ class MessagingTestCase(unittest.TestCase): self.assertEqual(self.total_data_count, client.received_count, "The client should have received ({n}) the same number of messages as the feed sent ({m}).".format(n=client.received_count, m=self.total_data_count)) - def dtest_moving_average_to_client(self): - mavg = MovingAverage(self.feed, self.config['transforms'][0]) + def test_moving_average_to_client(self): + mavg = MovingAverage(self.feed, self.config['transforms'][0], result_address="tcp://127.0.0.1:20202") mavg_proc = multiprocessing.Process(target=mavg.run) mavg_proc.start() diff --git a/qsim/transforms/base.py b/qsim/transforms/base.py deleted file mode 100644 index de20902b..00000000 --- a/qsim/transforms/base.py +++ /dev/null @@ -1,247 +0,0 @@ -""" -Transforms -========== - -Transforms provide re-useable components for stream processing. All -Transforms expect to receive data events from qsim.simulator.feed.DataFeed -asynchronously via zeromq. Each transform is designed to run in independent -process space, independently of all other transforms, to allow for parallel -computation. - -Each transform must maintain the state necessary to calculate the transform of -each new feed events. - -To simplify the consumption of feed and transform data events, this module -also provides the TransformsMerge class. TransformsMerge initializes as set of -transforms and subscribes to their output. Each feed event is then combined with -all the transforms of that event into a single new message. - -""" -import zmq -import json -import copy -import multiprocessing -import qsim.util as qutil -import qsim.simulator.config as config - -class Transform(object): - """Parent class for feed transforms. Subclass and override transform - method to create a new derived value from the combined feed.""" - - def __init__(self, feed, config_dict, result_address): - """ - feed_address - zmq socket address, Transform will CONNECT a PULL socket and receive messages until "DONE" is received. - result_address - zmq socket address, Transform will CONNECT a PUSH socket and send messaes until feed_socket receives "DONE" - sync_address - zmq socket address, Transform will CONNECT a REQ socket and send/receive one message before entering feed loop - config - must be a dict that can be wrapped in a config.Config object with at least an entry for 'name':string value - server - if True, transform will bind to the result address (and act as a server), if False it will connect. The - the last transform in a series should be server=True so that clients can connect. - """ - - self.feed = feed - self.result_address = result_address - self.config = config.Config(config_dict) - self.state = {} - self.state['name'] = self.config.name - self.sync = FeedSync(feed, self.state['name']) - self.received_count = 0 - self.sent_count = 0 - - def run(self): - """Top level execution entry point for the transform:: - - - connects to the feed socket to subscribe to events - - connets to the result socket (most oftened bound by a TransformsMerge) to PUSH transforms - - processes all messages received from feed, until DONE message received - - pushes all transforms - - sends DONE to result socket, closes all sockets and context""" - self.open() - self.process_all() - self.close() - - def open(self): - """ - Establishes zmq connections. - """ - self.context = zmq.Context() - - qutil.logger.info("starting {name} transform".format(name = self.state['name'])) - #create the feed SUB. - self.feed_socket = self.context.socket(zmq.SUB) - self.feed_socket.connect(self.feed.feed_address) - self.feed_socket.setsockopt(zmq.SUBSCRIBE,'') - - #create the result PUSH - self.result_socket = self.context.socket(zmq.PUSH) - self.result_socket.connect(self.result_address) - - def process_all(self): - """ - Loops until feed's DONE message is received: - - receive an event from the data feed - - call transform (subclass' method) on event - - send the transformed event - """ - qutil.logger.info("starting {name} event loop".format(name = self.state['name'])) - self.sync.confirm() - - while True: - message = self.feed_socket.recv() - if(message == "DONE"): - qutil.logger.info("{name} received the Done message from the feed".format(name=self.state['name'])) - self.result_socket.send("DONE") - break; - self.received_count += 1 - event = json.loads(message) - cur_state = self.transform(event) - cur_state['dt'] = event['dt'] - cur_state['name'] = self.state['name'] - self.result_socket.send(json.dumps(cur_state)) - self.sent_count += 1 - - def close(self): - """ - Shut down zmq resources. - """ - qutil.logger.info("Transform {name} recieved {r} and sent {s}".format(name=self.state['name'], r=self.received_count, s=self.sent_count)) - - self.feed_socket.close() - self.result_socket.close() - self.context.term() - - def transform(self, event): - """ Must return the transformed value as a map with {name:"name of new transform", value: "value of new field"} - Transforms run in parallel and results are merged into a single map, so transform names must be unique. - Best practice is to use the self.state object initialized from the transform configuration, and only set the - transformed value: - self.state['value'] = transformed_value - """ - return {} - - -class MergedTransformsFeed(Transform): - """ Merge data feed and array of transform feeds into a single result vector. - PULL from feed - PULL from child transforms - PUSH merged message to client - - """ - - def __init__(self, feed, props): - """ - config - must have an entry for 'transforms':array of dicts, which are - convertedto configs. - """ - Transform.__init__(self, feed, props, "tcp://127.0.0.1:20202") - self.transform_address = "tcp://127.0.0.1:{port}".format(port=10104) - self.transform_socket = None - self.create_transforms(self.config.transforms) - - - def create_transforms(self, configs): - """ - Create transforms based on configs, set each transform's result address to - this object's transform_address, so that all transformed events will be delivered - to this object. - """ - self.transforms = {} - for props in configs: - class_name = props['class'] - if(class_name == 'MovingAverage'): - mavg = MovingAverage(self.feed, props, self.transform_address) - self.transforms[mavg.name] = mavg - - keys = copy.copy(self.transforms.keys()) - keys.append("feed") #for the raw feed - self.data_buffer = MergedParallelBuffer(keys) - - self.buffers = {} - for name, transform in self.transforms.iteritems(): - self.buffers[name] = [] - - def open(self): - """Establish zmq context, feed socket, result socket for client, and transform - socket to receive transformed events. Create and launch transforms. Will confirm - ready with the DataFeed at the conclusion.""" - self.context = zmq.Context() - - qutil.logger.info("starting {name} transform".format(name = self.state['name'])) - #create the feed SUB. - self.feed_socket = self.context.socket(zmq.SUB) - self.feed_socket.connect(self.feed.feed_address) - self.feed_socket.setsockopt(zmq.SUBSCRIBE,'') - - #create the result PUSH - self.result_socket = self.context.socket(zmq.PUSH) - self.result_socket.bind(self.result_address) - - #create the transform PULL. - self.transform_socket = self.context.socket(zmq.PULL) - self.transform_socket.bind(self.transform_address) - self.data_buffer.out_socket = self.result_socket - - # Initialize poll set - self.poller = zmq.Poller() - self.poller.register(self.feed_socket, zmq.POLLIN) - self.poller.register(self.transform_socket, zmq.POLLIN) - - for name, transform in self.transforms.iteritems(): - qutil.logger.info("starting {name}".format(name=name)) - proc = multiprocessing.Process(target=transform.run) - proc.start() - - self.sync.confirm() - - def close(self): - """ - Close all zmq sockets and context. - """ - self.transform_socket.close() - Transform.close(self) - - def process_all(self): - """ - Uses a Poller to receive messages from all transforms and the feed. - All transforms corresponding to the same event are merged with each other - and the original feed event into a single message. That message is then - sent to the result socket. - """ - done_count = 0 - while True: - socks = dict(self.poller.poll()) - - if self.feed_socket in socks and socks[self.feed_socket] == zmq.POLLIN: - message = self.feed_socket.recv() - if(message == "DONE"): - qutil.logger.info("finished receiving feed to merge") - done_count += 1 - else: - self.received_count += 1 - event = json.loads(message) - self.data_buffer.append("feed",event) - - if self.transform_socket in socks and socks[self.transform_socket] == zmq.POLLIN: - t_message = self.transform_socket.recv() - if(t_message == "DONE"): - qutil.logger.info("finished receiving a transform to merge") - done_count += 1 - else: - self.received_count += 1 - t_event = json.loads(t_message) - self.data_buffer.append(t_event['name'], t_event) - - if(done_count >= len(self.data_buffer)): - break #done! - - self.data_buffer.send_next() - - qutil.logger.info("Transform {name} received {r} and sent {s}".format(name=self.state['name'], r=self.data_buffer.received_count, s=self.data_buffer.sent_count)) - qutil.logger.info("about to drain {n} messages from merger's buffer".format(n=self.data_buffer.pending_messages())) - - #drain any remaining messages in the buffer - self.data_buffer.drain() - - #signal to client that we're done - self.result_socket.send("DONE") - qutil.logger.info("Transform {name} received {r} and sent {s}".format(name=self.state['name'], r=self.data_buffer.received_count, s=self.data_buffer.sent_count)) - diff --git a/qsim/transforms/core.py b/qsim/transforms/core.py new file mode 100644 index 00000000..4217aa92 --- /dev/null +++ b/qsim/transforms/core.py @@ -0,0 +1,121 @@ +""" +Transforms +========== + +Transforms provide re-useable components for stream processing. All +Transforms expect to receive data events from qsim.simulator.feed.DataFeed +asynchronously via zeromq. Each transform is designed to run in independent +process space, independently of all other transforms, to allow for parallel +computation. + +Each transform must maintain the state necessary to calculate the transform of +each new feed events. + +To simplify the consumption of feed and transform data events, this module +also provides the TransformsMerge class. TransformsMerge initializes as set of +transforms and subscribes to their output. Each feed event is then combined with +all the transforms of that event into a single new message. + +""" +import zmq +import json +import qsim.util as qutil +import qsim.simulator.config as config + + +class BaseTransform(object): + """Parent class for feed transforms. Subclass and override transform + method to create a new derived value from the combined feed.""" + + def __init__(self, feed, config_dict, result_address): + """ + feed_address - zmq socket address, Transform will CONNECT a PULL socket and receive messages until "DONE" is received. + result_address - zmq socket address, Transform will CONNECT a PUSH socket and send messaes until feed_socket receives "DONE" + sync_address - zmq socket address, Transform will CONNECT a REQ socket and send/receive one message before entering feed loop + config - must be a dict that can be wrapped in a config.Config object with at least an entry for 'name':string value + server - if True, transform will bind to the result address (and act as a server), if False it will connect. The + the last transform in a series should be server=True so that clients can connect. + """ + + self.feed = feed + self.result_address = result_address + self.config = config.Config(config_dict) + self.state = {} + self.state['name'] = self.config.name + self.sync = qutil.FeedSync(feed, self.state['name']) + self.received_count = 0 + self.sent_count = 0 + self.context = None + + def run(self): + """Top level execution entry point for the transform:: + + - connects to the feed socket to subscribe to events + - connets to the result socket (most oftened bound by a TransformsMerge) to PUSH transforms + - processes all messages received from feed, until DONE message received + - pushes all transforms + - sends DONE to result socket, closes all sockets and context""" + self.open() + self.process_all() + self.close() + + def open(self): + """ + Establishes zmq connections. + """ + self.context = zmq.Context() + + qutil.logger.info("starting {name} transform". + format(name = self.state['name'])) + #create the feed SUB. + self.feed_socket = self.context.socket(zmq.SUB) + self.feed_socket.connect(self.feed.feed_address) + self.feed_socket.setsockopt(zmq.SUBSCRIBE,'') + + #create the result PUSH + self.result_socket = self.context.socket(zmq.PUSH) + self.result_socket.connect(self.result_address) + + def process_all(self): + """ + Loops until feed's DONE message is received: + - receive an event from the data feed + - call transform (subclass' method) on event + - send the transformed event + """ + qutil.logger.info("starting {name} event loop".format(name = self.state['name'])) + self.sync.confirm() + + while True: + message = self.feed_socket.recv() + if(message == "DONE"): + qutil.logger.info("{name} received the Done message from the feed".format(name=self.state['name'])) + self.result_socket.send("DONE") + break; + self.received_count += 1 + event = json.loads(message) + cur_state = self.transform(event) + cur_state['dt'] = event['dt'] + cur_state['name'] = self.state['name'] + self.result_socket.send(json.dumps(cur_state)) + self.sent_count += 1 + + def close(self): + """ + Shut down zmq resources. + """ + qutil.logger.info("Transform {name} recieved {r} and sent {s}".format(name=self.state['name'], r=self.received_count, s=self.sent_count)) + + self.feed_socket.close() + self.result_socket.close() + self.context.term() + + def transform(self, event): + """ Must return the transformed value as a map with {name:"name of new transform", value: "value of new field"} + Transforms run in parallel and results are merged into a single map, so transform names must be unique. + Best practice is to use the self.state object initialized from the transform configuration, and only set the + transformed value: + self.state['value'] = transformed_value + """ + return {} + \ No newline at end of file diff --git a/qsim/transforms/merge.py b/qsim/transforms/merge.py new file mode 100644 index 00000000..2a8c9ebe --- /dev/null +++ b/qsim/transforms/merge.py @@ -0,0 +1,134 @@ +import copy +import multiprocessing +import zmq + +import technical as ta +from core import BaseTransform +import qsim.util as qutil + +class MergedTransformsFeed(BaseTransform): + """ Merge data feed and array of transform feeds into a single result vector. + PULL from feed + PULL from child transforms + PUSH merged message to client + + """ + + def __init__(self, feed, props): + """ + config - must have an entry for 'transforms':array of dicts, which are + convertedto configs. + """ + BaseTransform.__init__(self, feed, props, "tcp://127.0.0.1:20202") + self.transform_address = "tcp://127.0.0.1:{port}".format(port=10104) + self.transform_socket = None + self.create_transforms(self.config.transforms) + + + def create_transforms(self, configs): + """ + Create transforms based on configs, set each transform's result address to + this object's transform_address, so that all transformed events will be delivered + to this object. + """ + self.transforms = {} + for props in configs: + class_name = props['class'] + if(class_name == 'MovingAverage'): + mavg = ta.MovingAverage(self.feed, props, self.transform_address) + self.transforms[mavg.config.name] = mavg + + keys = copy.copy(self.transforms.keys()) + keys.append("feed") #for the raw feed + self.data_buffer = qutil.MergedParallelBuffer(keys) + + self.buffers = {} + for name, transform in self.transforms.iteritems(): + self.buffers[name] = [] + + def open(self): + """Establish zmq context, feed socket, result socket for client, and transform + socket to receive transformed events. Create and launch transforms. Will confirm + ready with the DataFeed at the conclusion.""" + self.context = zmq.Context() + + qutil.logger.info("starting {name} transform".format(name = self.state['name'])) + #create the feed SUB. + self.feed_socket = self.context.socket(zmq.SUB) + self.feed_socket.connect(self.feed.feed_address) + self.feed_socket.setsockopt(zmq.SUBSCRIBE,'') + + #create the result PUSH + self.result_socket = self.context.socket(zmq.PUSH) + self.result_socket.bind(self.result_address) + + #create the transform PULL. + self.transform_socket = self.context.socket(zmq.PULL) + self.transform_socket.bind(self.transform_address) + self.data_buffer.out_socket = self.result_socket + + # Initialize poll set + self.poller = zmq.Poller() + self.poller.register(self.feed_socket, zmq.POLLIN) + self.poller.register(self.transform_socket, zmq.POLLIN) + + for name, transform in self.transforms.iteritems(): + qutil.logger.info("starting {name}".format(name=name)) + proc = multiprocessing.Process(target=transform.run) + proc.start() + + self.sync.confirm() + + def close(self): + """ + Close all zmq sockets and context. + """ + self.transform_socket.close() + BaseTransform.close(self) + + def process_all(self): + """ + Uses a Poller to receive messages from all transforms and the feed. + All transforms corresponding to the same event are merged with each other + and the original feed event into a single message. That message is then + sent to the result socket. + """ + done_count = 0 + while True: + socks = dict(self.poller.poll()) + + if self.feed_socket in socks and socks[self.feed_socket] == zmq.POLLIN: + message = self.feed_socket.recv() + if(message == "DONE"): + qutil.logger.info("finished receiving feed to merge") + done_count += 1 + else: + self.received_count += 1 + event = json.loads(message) + self.data_buffer.append("feed",event) + + if self.transform_socket in socks and socks[self.transform_socket] == zmq.POLLIN: + t_message = self.transform_socket.recv() + if(t_message == "DONE"): + qutil.logger.info("finished receiving a transform to merge") + done_count += 1 + else: + self.received_count += 1 + t_event = json.loads(t_message) + self.data_buffer.append(t_event['name'], t_event) + + if(done_count >= len(self.data_buffer)): + break #done! + + self.data_buffer.send_next() + + qutil.logger.info("Transform {name} received {r} and sent {s}".format(name=self.state['name'], r=self.data_buffer.received_count, s=self.data_buffer.sent_count)) + qutil.logger.info("about to drain {n} messages from merger's buffer".format(n=self.data_buffer.pending_messages())) + + #drain any remaining messages in the buffer + self.data_buffer.drain() + + #signal to client that we're done + self.result_socket.send("DONE") + qutil.logger.info("Transform {name} received {r} and sent {s}".format(name=self.state['name'], r=self.data_buffer.received_count, s=self.data_buffer.sent_count)) + diff --git a/qsim/transforms/technical.py b/qsim/transforms/technical.py index 8c2d409a..9c51bd0f 100644 --- a/qsim/transforms/technical.py +++ b/qsim/transforms/technical.py @@ -1,9 +1,22 @@ -import qsim.transforms.base as base +""" +Transformations for common technical indicators. +TODO: add MACD transform +TODO: add trailing stop -class MovingAverage(base.Transform): +""" +import datetime +import qsim.util as qutil + +from core import BaseTransform + +class MovingAverage(BaseTransform): + """ + Calculate a unweighted moving average for props['sid'] security + TODO: add sid filter. + """ def __init__(self, feed, props, result_address): - base.Transform.__init__(self, feed, props, result_address) + BaseTransform.__init__(self, feed, props, result_address) self.events = [] self.window = datetime.timedelta(days = self.config.get_integer('days'), @@ -18,6 +31,8 @@ class MovingAverage(base.Transform): def transform(self, event): + """Update the moving average with the latest data point.""" + self.events.append(event) #filter the event list to the window length. diff --git a/setup.cfg b/setup.cfg index df75e387..254b3bd7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -6,11 +6,11 @@ with-coverage=1 cover-package=qsim #cover-erase=1 cover-html=1 -cover-html-dir=cover +cover-html-dir=docs/_build/html/cover # Drop into debugger on failure -pdb=0 -pdb-failures=0 +#pdb=0 +#pdb-failures=0 # For Jenkins with-coverage=1