diff --git a/backtest/util.py b/backtest/util.py index 592f7203..39dec830 100644 --- a/backtest/util.py +++ b/backtest/util.py @@ -2,6 +2,9 @@ Small classes to assist with db access, timezone calculations, and so on. """ +import datetime +import pytz + class DocWrap(): """ Provides attribute access style on top of dictionary results from pymongo. @@ -38,7 +41,8 @@ def parse_date(dt_str): """parse strings according to the same format as generated by format_date""" if(dt_str == None): return None - dt = datetime.datetime.strptime(dt_str(".")[0], '%Y/%m/%d-%H:%M:%S').replace(microsecond=int(dt_str(".")[1]+"000")).replace(tzinfo = pytz.utc) + parts = dt_str.split(".") + dt = datetime.datetime.strptime(parts[0], '%Y/%m/%d-%H:%M:%S').replace(microsecond=int(parts[1]+"000")).replace(tzinfo = pytz.utc) return dt def format_date(dt): diff --git a/data/transforms.py b/data/transforms.py index 285176bc..37d828ba 100644 --- a/data/transforms.py +++ b/data/transforms.py @@ -4,6 +4,7 @@ import datetime import json import config import multiprocessing +from backtest import util class Transform(object): """Parent class for feed transforms. Subclass to create a new derived value from the combined feed.""" @@ -23,6 +24,8 @@ class Transform(object): self.name = self.config.get_string('name') self.state = {} self.state['name'] = self.name + self.received_count = 0 + self.sent_count = 0 def run(self): self.context = zmq.Context() @@ -50,15 +53,20 @@ class Transform(object): while True: message = self.feed_socket.recv() - self.logger.info("got feed message at {name}".format(name=self.name)) + self.received_count += 1 if(message == "DONE"): break; event = json.loads(message) - cur_state = update(event) - + cur_state = self.update(event) self.result_socket.send(json.dumps(cur_state)) - self.logger.info("sent message from {name}".format(name=self.name)) + self.sent_count += 1 + + self.logger.info("Transform {name} recieved {r} and sent {s}".format(name=self.name, r=self.received_count, s=self.sent_count)) + self.feed_socket.close() + self.result_socket.close() + self.context.term() + def update(self, event): return {} @@ -136,7 +144,7 @@ class MovingAverage(Transform): self.events.append(event) #filter the event list to the window length. - self.events = [x for x in self.events if (x.dt - curTick.dt) <= self.window] + self.events = [x for x in self.events if (util.parse_date(x['dt']) - util.parse_date(event['dt'])) <= self.window] if(len(self.events) == 0): return 0.0 diff --git a/test.py b/test.py index a574c2d1..8678a29b 100644 --- a/test.py +++ b/test.py @@ -11,25 +11,27 @@ from qbt_client import TestClient def datafeed(): connection, db = connect_db() logger = logging.getLogger() - feed = DataFeed(db, 1) #one merge, two moving averages. + feed = DataFeed(db, 1) #one moving average, one client feed_proc = multiprocessing.Process(target=feed.run) feed_proc.start() - #config = {} - #config['name'] = '**merged feed**' - #config['transforms'] = [{'name':'mavg1', 'class':'MovingAverage', 'hours':1},{'name':'mavg2', 'class':'MovingAverage', 'hours':2}] + config = {} + config['name'] = '**merged feed**' + config['transforms'] = [{'name':'mavg1', 'class':'MovingAverage', 'hours':1},{'name':'mavg2', 'class':'MovingAverage', 'hours':2}] - #result_address = "tcp://127.0.0.1:20202" + result_address = "tcp://127.0.0.1:20202" - #mavg = MovingAverage(feed.feed_address, result_address, feed.sync_address, config['transforms'][0]) - #mavg.run() + mavg = MovingAverage(feed.feed_address, result_address, feed.sync_address, config['transforms'][0]) + mavg.run() + #mavg_proc = multiprocessing.Process(target=mavg.run()) + #mavg_proc.start() #merger = Merge(feed.feed_address, result_address, feed.sync_address, config) #merger_proc = multiprocessing.Process(target=merger.run) #merger_proc.start() - client = TestClient(feed.feed_address, feed.sync_address) - client.run() + #client = TestClient(feed.feed_address, feed.sync_address) + #client.run() logger.info("feed has {pending} messages".format(pending=feed.pending_messages())) assert(feed.pending_messages() == 0)