diff --git a/nosetests.xml b/nosetests.xml
index 7769f599..a13d8ef0 100644
--- a/nosetests.xml
+++ b/nosetests.xml
@@ -1 +1 @@
-
\ No newline at end of file
+
\ No newline at end of file
diff --git a/qsim/test/test_messaging.py b/qsim/test/test_messaging.py
index 2fe6787c..92888607 100644
--- a/qsim/test/test_messaging.py
+++ b/qsim/test/test_messaging.py
@@ -5,7 +5,7 @@ import tornado
import multiprocessing
from qsim.simulator.feed import DataFeed
-from qsim.transforms.base import MergedTransformsFeed
+from qsim.transforms.merge import MergedTransformsFeed
from qsim.transforms.technical import MovingAverage
import qsim.util as qutil
@@ -40,8 +40,8 @@ class MessagingTestCase(unittest.TestCase):
self.assertEqual(self.total_data_count, client.received_count, "The client should have received ({n}) the same number of messages as the feed sent ({m}).".format(n=client.received_count, m=self.total_data_count))
- def dtest_moving_average_to_client(self):
- mavg = MovingAverage(self.feed, self.config['transforms'][0])
+ def test_moving_average_to_client(self):
+ mavg = MovingAverage(self.feed, self.config['transforms'][0], result_address="tcp://127.0.0.1:20202")
mavg_proc = multiprocessing.Process(target=mavg.run)
mavg_proc.start()
diff --git a/qsim/transforms/base.py b/qsim/transforms/base.py
deleted file mode 100644
index de20902b..00000000
--- a/qsim/transforms/base.py
+++ /dev/null
@@ -1,247 +0,0 @@
-"""
-Transforms
-==========
-
-Transforms provide re-useable components for stream processing. All
-Transforms expect to receive data events from qsim.simulator.feed.DataFeed
-asynchronously via zeromq. Each transform is designed to run in independent
-process space, independently of all other transforms, to allow for parallel
-computation.
-
-Each transform must maintain the state necessary to calculate the transform of
-each new feed events.
-
-To simplify the consumption of feed and transform data events, this module
-also provides the TransformsMerge class. TransformsMerge initializes as set of
-transforms and subscribes to their output. Each feed event is then combined with
-all the transforms of that event into a single new message.
-
-"""
-import zmq
-import json
-import copy
-import multiprocessing
-import qsim.util as qutil
-import qsim.simulator.config as config
-
-class Transform(object):
- """Parent class for feed transforms. Subclass and override transform
- method to create a new derived value from the combined feed."""
-
- def __init__(self, feed, config_dict, result_address):
- """
- feed_address - zmq socket address, Transform will CONNECT a PULL socket and receive messages until "DONE" is received.
- result_address - zmq socket address, Transform will CONNECT a PUSH socket and send messaes until feed_socket receives "DONE"
- sync_address - zmq socket address, Transform will CONNECT a REQ socket and send/receive one message before entering feed loop
- config - must be a dict that can be wrapped in a config.Config object with at least an entry for 'name':string value
- server - if True, transform will bind to the result address (and act as a server), if False it will connect. The
- the last transform in a series should be server=True so that clients can connect.
- """
-
- self.feed = feed
- self.result_address = result_address
- self.config = config.Config(config_dict)
- self.state = {}
- self.state['name'] = self.config.name
- self.sync = FeedSync(feed, self.state['name'])
- self.received_count = 0
- self.sent_count = 0
-
- def run(self):
- """Top level execution entry point for the transform::
-
- - connects to the feed socket to subscribe to events
- - connets to the result socket (most oftened bound by a TransformsMerge) to PUSH transforms
- - processes all messages received from feed, until DONE message received
- - pushes all transforms
- - sends DONE to result socket, closes all sockets and context"""
- self.open()
- self.process_all()
- self.close()
-
- def open(self):
- """
- Establishes zmq connections.
- """
- self.context = zmq.Context()
-
- qutil.logger.info("starting {name} transform".format(name = self.state['name']))
- #create the feed SUB.
- self.feed_socket = self.context.socket(zmq.SUB)
- self.feed_socket.connect(self.feed.feed_address)
- self.feed_socket.setsockopt(zmq.SUBSCRIBE,'')
-
- #create the result PUSH
- self.result_socket = self.context.socket(zmq.PUSH)
- self.result_socket.connect(self.result_address)
-
- def process_all(self):
- """
- Loops until feed's DONE message is received:
- - receive an event from the data feed
- - call transform (subclass' method) on event
- - send the transformed event
- """
- qutil.logger.info("starting {name} event loop".format(name = self.state['name']))
- self.sync.confirm()
-
- while True:
- message = self.feed_socket.recv()
- if(message == "DONE"):
- qutil.logger.info("{name} received the Done message from the feed".format(name=self.state['name']))
- self.result_socket.send("DONE")
- break;
- self.received_count += 1
- event = json.loads(message)
- cur_state = self.transform(event)
- cur_state['dt'] = event['dt']
- cur_state['name'] = self.state['name']
- self.result_socket.send(json.dumps(cur_state))
- self.sent_count += 1
-
- def close(self):
- """
- Shut down zmq resources.
- """
- qutil.logger.info("Transform {name} recieved {r} and sent {s}".format(name=self.state['name'], r=self.received_count, s=self.sent_count))
-
- self.feed_socket.close()
- self.result_socket.close()
- self.context.term()
-
- def transform(self, event):
- """ Must return the transformed value as a map with {name:"name of new transform", value: "value of new field"}
- Transforms run in parallel and results are merged into a single map, so transform names must be unique.
- Best practice is to use the self.state object initialized from the transform configuration, and only set the
- transformed value:
- self.state['value'] = transformed_value
- """
- return {}
-
-
-class MergedTransformsFeed(Transform):
- """ Merge data feed and array of transform feeds into a single result vector.
- PULL from feed
- PULL from child transforms
- PUSH merged message to client
-
- """
-
- def __init__(self, feed, props):
- """
- config - must have an entry for 'transforms':array of dicts, which are
- convertedto configs.
- """
- Transform.__init__(self, feed, props, "tcp://127.0.0.1:20202")
- self.transform_address = "tcp://127.0.0.1:{port}".format(port=10104)
- self.transform_socket = None
- self.create_transforms(self.config.transforms)
-
-
- def create_transforms(self, configs):
- """
- Create transforms based on configs, set each transform's result address to
- this object's transform_address, so that all transformed events will be delivered
- to this object.
- """
- self.transforms = {}
- for props in configs:
- class_name = props['class']
- if(class_name == 'MovingAverage'):
- mavg = MovingAverage(self.feed, props, self.transform_address)
- self.transforms[mavg.name] = mavg
-
- keys = copy.copy(self.transforms.keys())
- keys.append("feed") #for the raw feed
- self.data_buffer = MergedParallelBuffer(keys)
-
- self.buffers = {}
- for name, transform in self.transforms.iteritems():
- self.buffers[name] = []
-
- def open(self):
- """Establish zmq context, feed socket, result socket for client, and transform
- socket to receive transformed events. Create and launch transforms. Will confirm
- ready with the DataFeed at the conclusion."""
- self.context = zmq.Context()
-
- qutil.logger.info("starting {name} transform".format(name = self.state['name']))
- #create the feed SUB.
- self.feed_socket = self.context.socket(zmq.SUB)
- self.feed_socket.connect(self.feed.feed_address)
- self.feed_socket.setsockopt(zmq.SUBSCRIBE,'')
-
- #create the result PUSH
- self.result_socket = self.context.socket(zmq.PUSH)
- self.result_socket.bind(self.result_address)
-
- #create the transform PULL.
- self.transform_socket = self.context.socket(zmq.PULL)
- self.transform_socket.bind(self.transform_address)
- self.data_buffer.out_socket = self.result_socket
-
- # Initialize poll set
- self.poller = zmq.Poller()
- self.poller.register(self.feed_socket, zmq.POLLIN)
- self.poller.register(self.transform_socket, zmq.POLLIN)
-
- for name, transform in self.transforms.iteritems():
- qutil.logger.info("starting {name}".format(name=name))
- proc = multiprocessing.Process(target=transform.run)
- proc.start()
-
- self.sync.confirm()
-
- def close(self):
- """
- Close all zmq sockets and context.
- """
- self.transform_socket.close()
- Transform.close(self)
-
- def process_all(self):
- """
- Uses a Poller to receive messages from all transforms and the feed.
- All transforms corresponding to the same event are merged with each other
- and the original feed event into a single message. That message is then
- sent to the result socket.
- """
- done_count = 0
- while True:
- socks = dict(self.poller.poll())
-
- if self.feed_socket in socks and socks[self.feed_socket] == zmq.POLLIN:
- message = self.feed_socket.recv()
- if(message == "DONE"):
- qutil.logger.info("finished receiving feed to merge")
- done_count += 1
- else:
- self.received_count += 1
- event = json.loads(message)
- self.data_buffer.append("feed",event)
-
- if self.transform_socket in socks and socks[self.transform_socket] == zmq.POLLIN:
- t_message = self.transform_socket.recv()
- if(t_message == "DONE"):
- qutil.logger.info("finished receiving a transform to merge")
- done_count += 1
- else:
- self.received_count += 1
- t_event = json.loads(t_message)
- self.data_buffer.append(t_event['name'], t_event)
-
- if(done_count >= len(self.data_buffer)):
- break #done!
-
- self.data_buffer.send_next()
-
- qutil.logger.info("Transform {name} received {r} and sent {s}".format(name=self.state['name'], r=self.data_buffer.received_count, s=self.data_buffer.sent_count))
- qutil.logger.info("about to drain {n} messages from merger's buffer".format(n=self.data_buffer.pending_messages()))
-
- #drain any remaining messages in the buffer
- self.data_buffer.drain()
-
- #signal to client that we're done
- self.result_socket.send("DONE")
- qutil.logger.info("Transform {name} received {r} and sent {s}".format(name=self.state['name'], r=self.data_buffer.received_count, s=self.data_buffer.sent_count))
-
diff --git a/qsim/transforms/core.py b/qsim/transforms/core.py
new file mode 100644
index 00000000..4217aa92
--- /dev/null
+++ b/qsim/transforms/core.py
@@ -0,0 +1,121 @@
+"""
+Transforms
+==========
+
+Transforms provide re-useable components for stream processing. All
+Transforms expect to receive data events from qsim.simulator.feed.DataFeed
+asynchronously via zeromq. Each transform is designed to run in independent
+process space, independently of all other transforms, to allow for parallel
+computation.
+
+Each transform must maintain the state necessary to calculate the transform of
+each new feed event.
+
+To simplify the consumption of feed and transform data events, the sibling module
+qsim.transforms.merge provides the MergedTransformsFeed class. MergedTransformsFeed initializes a set of
+transforms and subscribes to their output. Each feed event is then combined with
+all the transforms of that event into a single new message.
+
+"""
+import zmq
+import json
+import qsim.util as qutil
+import qsim.simulator.config as config
+
+
+class BaseTransform(object):
+    """Parent class for feed transforms. Subclass and override the transform
+    method to create a new derived value from the combined feed."""
+
+    def __init__(self, feed, config_dict, result_address):
+        """
+        feed - feed object; open() connects a SUB socket to feed.feed_address and events are read until "DONE" is received.
+        config_dict - must be a dict that can be wrapped in a config.Config object with at least an entry for 'name':string value;
+                      the name is stored in self.state['name'] and stamped on every result sent.
+        result_address - zmq socket address, open() will CONNECT a PUSH socket and process_all() sends messages until the feed sends "DONE"
+
+        A qutil.FeedSync is built from the feed and the transform name; process_all() calls its confirm() before the receive loop.
+        """
+
+        self.feed = feed
+        self.result_address = result_address
+        self.config = config.Config(config_dict)
+        self.state = {}
+        self.state['name'] = self.config.name
+        self.sync = qutil.FeedSync(feed, self.state['name'])
+        self.received_count = 0
+        self.sent_count = 0
+        self.context = None
+
+    def run(self):
+        """Top level execution entry point for the transform::
+
+        - connects to the feed socket to subscribe to events
+        - connects to the result socket (presumably bound by a MergedTransformsFeed) to PUSH transforms
+        - processes all messages received from feed, until DONE message received
+        - pushes all transforms
+        - sends DONE to result socket, closes all sockets and context"""
+        self.open()
+        self.process_all()
+        self.close()
+
+    def open(self):
+        """
+        Creates the zmq context, CONNECTs a SUB socket to the feed address and a PUSH socket to result_address.
+        """
+        self.context = zmq.Context()
+
+        qutil.logger.info("starting {name} transform".
+                    format(name = self.state['name']))
+        #create the feed SUB.
+        self.feed_socket = self.context.socket(zmq.SUB)
+        self.feed_socket.connect(self.feed.feed_address)
+        self.feed_socket.setsockopt(zmq.SUBSCRIBE,'')
+
+        #create the result PUSH
+        self.result_socket = self.context.socket(zmq.PUSH)
+        self.result_socket.connect(self.result_address)
+
+    def process_all(self):
+        """
+        Calls self.sync.confirm(), then loops until the feed's DONE message is received (DONE is forwarded to the result socket):
+            - receive an event from the data feed and decode it from JSON
+            - call transform (subclass' method) on event
+            - send the transformed event as JSON, stamped with the event's 'dt' and this transform's name
+        """
+        qutil.logger.info("starting {name} event loop".format(name = self.state['name']))
+        self.sync.confirm()
+
+        while True:
+            message = self.feed_socket.recv()
+            if(message == "DONE"):
+                qutil.logger.info("{name} received the Done message from the feed".format(name=self.state['name']))
+                self.result_socket.send("DONE")
+                break;
+            self.received_count += 1
+            event = json.loads(message)
+            cur_state = self.transform(event)
+            cur_state['dt'] = event['dt']
+            cur_state['name'] = self.state['name']
+            self.result_socket.send(json.dumps(cur_state))
+            self.sent_count += 1
+
+    def close(self):
+        """
+        Logs the received/sent counts, then closes the feed and result sockets and terminates the zmq context.
+        """
+        qutil.logger.info("Transform {name} recieved {r} and sent {s}".format(name=self.state['name'], r=self.received_count, s=self.sent_count))
+
+        self.feed_socket.close()
+        self.result_socket.close()
+        self.context.term()
+
+    def transform(self, event):
+        """ Must return the transformed value as a map with {name:"name of new transform", value: "value of new field"}
+        Transforms run in parallel and results are merged into a single map, so transform names must be unique.
+        Best practice is to use the self.state object initialized from the transform configuration, and only set the
+        transformed value (this base implementation returns an empty map):
+            self.state['value'] = transformed_value
+        """
+        return {}
+
\ No newline at end of file
diff --git a/qsim/transforms/merge.py b/qsim/transforms/merge.py
new file mode 100644
index 00000000..2a8c9ebe
--- /dev/null
+++ b/qsim/transforms/merge.py
@@ -0,0 +1,135 @@
+import copy
+import json
+import multiprocessing
+import zmq
+
+import technical as ta
+from core import BaseTransform
+import qsim.util as qutil
+
+class MergedTransformsFeed(BaseTransform):
+    """ Merge data feed and array of transform feeds into a single result vector.
+    SUB to feed
+    PULL from child transforms
+    PUSH merged message to client
+
+    """
+
+    def __init__(self, feed, props):
+        """
+        props - config dict; must have an entry for 'transforms':array of dicts, one per
+        child transform, each with a 'class' entry naming the transform type.
+        """
+        BaseTransform.__init__(self, feed, props, "tcp://127.0.0.1:20202")
+        self.transform_address = "tcp://127.0.0.1:{port}".format(port=10104)
+        self.transform_socket = None
+        self.create_transforms(self.config.transforms)
+
+
+    def create_transforms(self, configs):
+        """
+        Create transforms based on configs (only class 'MovingAverage' is recognized; other
+        entries are skipped), set each transform's result address to this object's
+        transform_address, so that all transformed events will be delivered to this object.
+        """
+        self.transforms = {}
+        for props in configs:
+            class_name = props['class']
+            if(class_name == 'MovingAverage'):
+                mavg = ta.MovingAverage(self.feed, props, self.transform_address)
+                self.transforms[mavg.config.name] = mavg
+
+        keys = copy.copy(self.transforms.keys())
+        keys.append("feed") #for the raw feed
+        self.data_buffer = qutil.MergedParallelBuffer(keys)
+
+        self.buffers = {}
+        for name, transform in self.transforms.iteritems():
+            self.buffers[name] = []
+
+    def open(self):
+        """Establish zmq context, feed socket, result socket for client, and transform
+        socket to receive transformed events. Launch each child transform in its own
+        process. Will confirm ready with the DataFeed at the conclusion."""
+        self.context = zmq.Context()
+
+        qutil.logger.info("starting {name} transform".format(name = self.state['name']))
+        #create the feed SUB.
+        self.feed_socket = self.context.socket(zmq.SUB)
+        self.feed_socket.connect(self.feed.feed_address)
+        self.feed_socket.setsockopt(zmq.SUBSCRIBE,'')
+
+        #create the result PUSH
+        self.result_socket = self.context.socket(zmq.PUSH)
+        self.result_socket.bind(self.result_address)
+
+        #create the transform PULL.
+        self.transform_socket = self.context.socket(zmq.PULL)
+        self.transform_socket.bind(self.transform_address)
+        self.data_buffer.out_socket = self.result_socket
+
+        # Initialize poll set
+        self.poller = zmq.Poller()
+        self.poller.register(self.feed_socket, zmq.POLLIN)
+        self.poller.register(self.transform_socket, zmq.POLLIN)
+
+        for name, transform in self.transforms.iteritems():
+            qutil.logger.info("starting {name}".format(name=name))
+            proc = multiprocessing.Process(target=transform.run)
+            proc.start()
+
+        self.sync.confirm()
+
+    def close(self):
+        """
+        Close the transform socket, then the sockets and context handled by BaseTransform.close.
+        """
+        self.transform_socket.close()
+        BaseTransform.close(self)
+
+    def process_all(self):
+        """
+        Uses a Poller to receive messages from all transforms and the feed.
+        All transforms corresponding to the same event are merged with each other
+        and the original feed event into a single message. That message is then
+        sent to the result socket. Ends once a DONE has arrived for every buffer key.
+        """
+        done_count = 0
+        while True:
+            socks = dict(self.poller.poll())
+
+            if self.feed_socket in socks and socks[self.feed_socket] == zmq.POLLIN:
+                message = self.feed_socket.recv()
+                if(message == "DONE"):
+                    qutil.logger.info("finished receiving feed to merge")
+                    done_count += 1
+                else:
+                    self.received_count += 1
+                    event = json.loads(message)
+                    self.data_buffer.append("feed",event)
+
+            if self.transform_socket in socks and socks[self.transform_socket] == zmq.POLLIN:
+                t_message = self.transform_socket.recv()
+                if(t_message == "DONE"):
+                    qutil.logger.info("finished receiving a transform to merge")
+                    done_count += 1
+                else:
+                    self.received_count += 1
+                    t_event = json.loads(t_message)
+                    self.data_buffer.append(t_event['name'], t_event)
+
+            if(done_count >= len(self.data_buffer)):
+                break #done!
+
+            self.data_buffer.send_next()
+
+        qutil.logger.info("Transform {name} received {r} and sent {s}".format(name=self.state['name'], r=self.data_buffer.received_count, s=self.data_buffer.sent_count))
+        qutil.logger.info("about to drain {n} messages from merger's buffer".format(n=self.data_buffer.pending_messages()))
+
+        #drain any remaining messages in the buffer
+        self.data_buffer.drain()
+
+        #signal to client that we're done
+        self.result_socket.send("DONE")
+        qutil.logger.info("Transform {name} received {r} and sent {s}".format(name=self.state['name'], r=self.data_buffer.received_count, s=self.data_buffer.sent_count))
+
diff --git a/qsim/transforms/technical.py b/qsim/transforms/technical.py
index 8c2d409a..9c51bd0f 100644
--- a/qsim/transforms/technical.py
+++ b/qsim/transforms/technical.py
@@ -1,9 +1,22 @@
-import qsim.transforms.base as base
+"""
+Transformations for common technical indicators.
+TODO: add MACD transform
+TODO: add trailing stop
-class MovingAverage(base.Transform):
+"""
+import datetime
+import qsim.util as qutil
+
+from core import BaseTransform
+
+class MovingAverage(BaseTransform):
+ """
+    Calculate an unweighted moving average for props['sid'] security
+ TODO: add sid filter.
+ """
def __init__(self, feed, props, result_address):
- base.Transform.__init__(self, feed, props, result_address)
+ BaseTransform.__init__(self, feed, props, result_address)
self.events = []
self.window = datetime.timedelta(days = self.config.get_integer('days'),
@@ -18,6 +31,8 @@ class MovingAverage(base.Transform):
def transform(self, event):
+ """Update the moving average with the latest data point."""
+
self.events.append(event)
#filter the event list to the window length.
diff --git a/setup.cfg b/setup.cfg
index df75e387..254b3bd7 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -6,11 +6,11 @@ with-coverage=1
cover-package=qsim
#cover-erase=1
cover-html=1
-cover-html-dir=cover
+cover-html-dir=docs/_build/html/cover
# Drop into debugger on failure
-pdb=0
-pdb-failures=0
+#pdb=0
+#pdb-failures=0
# For Jenkins
with-coverage=1