new client base class, adding order function. committing to allow branch

This commit is contained in:
fawce
2012-02-17 17:25:04 -05:00
parent 1cabd30158
commit 59d43608b9
3 changed files with 45 additions and 5 deletions
+42 -1
View File
@@ -108,6 +108,12 @@ class Component(object):
def connect_data(self):
return self.connect_push_socket(self.addresses['data_address'])
def bind_order(self):
return self.bind_pull_socket(self.addresses['order_address'])
def connect_order(self):
return self.connect_push_socket(self.addresses['order_address'])
def bind_feed(self):
return self.bind_pub_socket(self.addresses['feed_address'])
@@ -259,6 +265,7 @@ class ParallelBuffer(Component):
self.sent_count = 0
self.received_count = 0
self.draining = False
#data source component ID -> List of messages
self.data_buffer = {}
self.ds_finished_counter = 0
@@ -422,7 +429,8 @@ class BaseTransform(Component):
return
event = json.loads(message)
cur_state = self.transform(event)
cur_state['dt'] = event['dt']
#TODO: do we want to relay the datetime again? maybe drop this?
#cur_state['dt'] = event['dt']
cur_state['id'] = self.state['name']
self.result_socket.send(json.dumps(cur_state), self.zmq.NOBLOCK)
@@ -473,3 +481,36 @@ class DataSource(Component):
def get_type(self):
raise NotImplemented
class Client(Component):
def __init__(self):
Component.__init__(self)
self.received_count = 0
self.prev_dt = None
def open(self):
self.data_feed, self.poller = self.connect_result()
self.order_feed, self.poller = self.connect_order()
def do_work(self):
socks = dict(self.poller.poll(2000)) #timeout after 2 seconds.
if self.data_feed in socks and socks[self.data_feed] == self.zmq.POLLIN:
msg = self.data_feed.recv()
if(self.is_done_message(msg)):
qutil.LOGGER.info("Client is DONE!")
self.signal_done()
return
self.received_count += 1
event = json.loads(msg)
self.handle_event(event)
def handle_event(self, event):
NotImplemented
def order(self, sid, volume):
order = {'sid':sid, 'volume':volume}
+2 -3
View File
@@ -26,11 +26,10 @@ class SimulatorTestCase(unittest.TestCase):
'data_address' : "tcp://127.0.0.1:10101",
'feed_address' : "tcp://127.0.0.1:10102",
'merge_address' : "tcp://127.0.0.1:10103",
'result_address' : "tcp://127.0.0.1:10104"
'result_address' : "tcp://127.0.0.1:10104",
'order_address' : "tcp://127.0.0.1:10105"
}
self.addressesblarg = "test"
def get_simulator(self):
return ThreadSimulator(self.addresses)
+1 -1
View File
@@ -30,7 +30,7 @@ class MovingAverage(BaseTransform):
index = 0
for cur_event in self.events:
cur_date = qutil.parse_date(cur_event['dt'])
if(cur_date - event_date):
if(cur_date - event_date) >= self.window:
self.events.pop(index)
self.current_total -= cur_event['price']
index += 1