diff --git a/zipline/messaging.py b/zipline/messaging.py index 671eeea7..531be789 100644 --- a/zipline/messaging.py +++ b/zipline/messaging.py @@ -108,6 +108,12 @@ class Component(object): def connect_data(self): return self.connect_push_socket(self.addresses['data_address']) + def bind_order(self): + return self.bind_pull_socket(self.addresses['order_address']) + + def connect_order(self): + return self.connect_push_socket(self.addresses['order_address']) + def bind_feed(self): return self.bind_pub_socket(self.addresses['feed_address']) @@ -259,6 +265,7 @@ class ParallelBuffer(Component): self.sent_count = 0 self.received_count = 0 self.draining = False + #data source component ID -> List of messages self.data_buffer = {} self.ds_finished_counter = 0 @@ -422,7 +429,8 @@ class BaseTransform(Component): return event = json.loads(message) cur_state = self.transform(event) - cur_state['dt'] = event['dt'] + #TODO: do we want to relay the datetime again? maybe drop this? + #cur_state['dt'] = event['dt'] cur_state['id'] = self.state['name'] self.result_socket.send(json.dumps(cur_state), self.zmq.NOBLOCK) @@ -473,3 +481,36 @@ class DataSource(Component): def get_type(self): raise NotImplemented + + +class Client(Component): + + def __init__(self): + Component.__init__(self) + self.received_count = 0 + self.prev_dt = None + + def open(self): + self.data_feed, self.poller = self.connect_result() + self.order_feed, self.poller = self.connect_order() + + def do_work(self): + socks = dict(self.poller.poll(2000)) #timeout after 2 seconds. + if self.data_feed in socks and socks[self.data_feed] == self.zmq.POLLIN: + msg = self.data_feed.recv() + if(self.is_done_message(msg)): + qutil.LOGGER.info("Client is DONE!") + self.signal_done() + return + + self.received_count += 1 + event = json.loads(msg) + self.handle_event(event) + + def handle_event(self, event): + NotImplemented + + + def order(self, sid, volume): + order = {'sid':sid, 'volume':volume} + \ No newline at end of file diff --git a/zipline/test/test_messaging.py b/zipline/test/test_messaging.py index 15e91396..a03fe6a1 100644 --- a/zipline/test/test_messaging.py +++ b/zipline/test/test_messaging.py @@ -26,11 +26,10 @@ class SimulatorTestCase(unittest.TestCase): 'data_address' : "tcp://127.0.0.1:10101", 'feed_address' : "tcp://127.0.0.1:10102", 'merge_address' : "tcp://127.0.0.1:10103", - 'result_address' : "tcp://127.0.0.1:10104" + 'result_address' : "tcp://127.0.0.1:10104", + 'order_address' : "tcp://127.0.0.1:10105" } - self.addressesblarg = "test" - def get_simulator(self): return ThreadSimulator(self.addresses) diff --git a/zipline/transforms/technical.py b/zipline/transforms/technical.py index dcefc87e..df3d8b9a 100644 --- a/zipline/transforms/technical.py +++ b/zipline/transforms/technical.py @@ -30,7 +30,7 @@ class MovingAverage(BaseTransform): index = 0 for cur_event in self.events: cur_date = qutil.parse_date(cur_event['dt']) - if(cur_date - event_date): + if(cur_date - event_date) >= self.window: self.events.pop(index) self.current_total -= cur_event['price'] index += 1