diff --git a/qsim/core.py b/qsim/core.py index 39665d32..1c5415a7 100644 --- a/qsim/core.py +++ b/qsim/core.py @@ -3,12 +3,9 @@ Provides simulated data feed services. """ import multiprocessing import zmq -import time -import logging import json import copy -import qsim.sources as sources import qsim.util as qutil import qsim.messaging as qmsg @@ -22,8 +19,13 @@ class Simulator(object): """ self.sources = sources self.transforms = transforms - self.client = client - self.sync_register = {} + self.client = client + self.merge = None + self.feed = None + self.context = None + self.sync_context = None + self.syncservice = None + self.sync_register = {} self.sync_address = "tcp://127.0.0.1:{port}".format(port=10100) self.data_address = "tcp://127.0.0.1:{port}".format(port=10101) self.feed_address = "tcp://127.0.0.1:{port}".format(port=10102) @@ -43,7 +45,7 @@ class Simulator(object): for name, transform in self.transforms.iteritems(): transform.feed_address = self.feed_address #connect transform to receive feed. transform.merge_address = self.merge_address #connect transform to push results to merge - transform.sync = qmsg.Sync(self,name) #synchronize the transform against this simulation. + transform.sync = qmsg.Sync(self, name) #synchronize the transform against this simulation. self.launch_component(name, transform) #start transforms #connect merge to feed, set expected transforms @@ -75,7 +77,7 @@ class Simulator(object): self.sync_register[sync_id] = "UNCONFIRMED" def registration_complete(self): - for sync_id, status in self.sync_register.iteritems(): + for status in self.sync_register.values(): if status == "UNCONFIRMED": return False @@ -111,6 +113,9 @@ class DataFeed(object): self.data_address = data_address self.data_buffer = qmsg.ParallelBuffer(source_list) self.sync = sync + self.feed_socket = None + self.data_socket = None + self.context = None def run(self): # Prepare our context and sockets @@ -177,6 +182,8 @@ class BaseTransform(object): self.received_count = 0 self.sent_count = 0 self.context = None + self.feed_socket = None + self.result_socket = None def run(self): """Top level execution entry point for the transform:: @@ -226,7 +233,7 @@ class BaseTransform(object): if(message == "DONE"): qutil.LOGGER.info("{name} received the Done message from the feed".format(name=self.state['name'])) self.result_socket.send("DONE") - break; + break self.received_count += 1 event = json.loads(message) cur_state = self.transform(event) @@ -239,7 +246,10 @@ class BaseTransform(object): """ Shut down zmq resources. """ - qutil.LOGGER.info("Transform {name} recieved {r} and sent {s}".format(name=self.state['name'], r=self.received_count, s=self.sent_count)) + qutil.LOGGER.info("Transform {name} recieved {r} and sent {s}".format( + name=self.state['name'], + r=self.received_count, + s=self.sent_count)) self.feed_socket.close() self.result_socket.close() @@ -271,7 +281,12 @@ class TransformsMerge(object): self.result_address = result_address buffer_list = copy.copy(transform_list) buffer_list.append("feed") #for the raw feed - self.data_buffer = qmsg.MergedParallelBuffer(buffer_list) + self.data_buffer = qmsg.MergedParallelBuffer(buffer_list) + self.feed_socket = None + self.result_socket = None + self.poller = None + self.context = None + self.transform_socket = None def run(self): """""" @@ -338,7 +353,7 @@ class TransformsMerge(object): done_count += 1 else: event = json.loads(message) - self.data_buffer.append("feed",event) + self.data_buffer.append("feed", event) if self.transform_socket in socks and socks[self.transform_socket] == zmq.POLLIN: t_message = self.transform_socket.recv() diff --git a/qsim/sources.py b/qsim/sources.py index 8c90be9f..7090bd77 100644 --- a/qsim/sources.py +++ b/qsim/sources.py @@ -4,11 +4,9 @@ Provides data handlers that can push messages to a qsim.core.DataFeed import datetime import zmq import json -import multiprocessing import random import qsim.util as qutil -import qsim.messaging as qmsg class DataSource(object): """ diff --git a/qsim/test/client.py b/qsim/test/client.py index d9350d70..bd03e0d5 100644 --- a/qsim/test/client.py +++ b/qsim/test/client.py @@ -1,11 +1,6 @@ -import copy -import multiprocessing import zmq -import logging import json - import qsim.util as qutil -import qsim.messaging as qmsg class TestClient(object): @@ -14,8 +9,10 @@ class TestClient(object): self.sync = None self.received_count = 0 self.expected_msg_count = expected_msg_count - self.ERROR = False + self.error = False self.utest = utest + self.data_feed = None + self.context = None def run(self): @@ -42,7 +39,7 @@ class TestClient(object): event = json.loads(msg) if(prev_dt != None): if(not event['dt'] >= prev_dt): - raise Exception("message arrived out of order: {date} after {prev}".format(date=event['dt'], prev=prev_dt)) + raise Exception("Message out of order: {date} after {prev}".format(date=event['dt'], prev=prev_dt)) prev_dt = event['dt'] if(self.received_count % 100 == 0): @@ -50,7 +47,7 @@ class TestClient(object): qutil.LOGGER.info("received {n} messages".format(n=self.received_count)) except: - self.ERROR = True + self.error = True qutil.LOGGER.exception("Error in test client.") finally: self.data_feed.close() diff --git a/qsim/test/test_messaging.py b/qsim/test/test_messaging.py index 6a74e164..20b2f062 100644 --- a/qsim/test/test_messaging.py +++ b/qsim/test/test_messaging.py @@ -50,7 +50,7 @@ class MessagingTestCase(unittest.TestCase): mavg2 = MovingAverage("mavg2", 60) transforms = {"mavg1":mavg1, "mavg2":mavg2} client = TestClient(self, expected_msg_count=800) - sim = Simulator(sources, {}, client) + sim = Simulator(sources, transforms, client) sim.launch() diff --git a/qsim/transforms/technical.py b/qsim/transforms/technical.py index 72464e47..f9166f38 100644 --- a/qsim/transforms/technical.py +++ b/qsim/transforms/technical.py @@ -5,8 +5,6 @@ TODO: add trailing stop """ import datetime -import qsim.util as qutil - from qsim.core import BaseTransform class MovingAverage(BaseTransform): @@ -42,6 +40,6 @@ class MovingAverage(BaseTransform): #self.average = total/len(self.events) #self.state['value'] = self.average - self.stat['value'] = 10 + self.state['value'] = 10 return self.state \ No newline at end of file