Merge pull request #27 from quantopian/date_fixes

Updated ordersource to be concurrency agnostic! *woohoo*
This commit is contained in:
fawce
2012-04-09 08:12:56 -07:00
+29 -31
View File
@@ -194,6 +194,7 @@ class OrderDataSource(qmsg.DataSource):
def do_work(self):
#TODO: if this is the first iteration, break deadlock by sending a dummy order
if(self.sent_count == 0):
self.send(zp.namedict({}))
@@ -204,39 +205,36 @@ class OrderDataSource(qmsg.DataSource):
# TODO : this can be written in a concurrency agnostic
# way... have a chat with Fawce about this ~Steve
while True:
(rlist, wlist, xlist) = select(
[self.order_socket],
[],
[self.order_socket],
#allow half the time of a heartbeat for the order
#timeout, so we have time to signal we are done.
timeout=self.heartbeat_timeout/2000
)
#no more orders, should this be an error condition?
if len(rlist) == 0 or len(xlist) > 0:
# no order message means there was a timeout above.
# this is an indeterminant case, we don't know the cause.
# the safest move is to break out of this loop and try again
break
order_msg = rlist[0].recv()
if order_msg == str(zp.ORDER_PROTOCOL.DONE):
self.signal_done()
return
if order_msg == str(zp.ORDER_PROTOCOL.BREAK):
break
while True:
# poll all the sockets
# we reduce the timeout here by a factor of 2, because we need
# to potentially receive the client's done message before the
# controller or heartbeat times out.
socks = dict(self.poll.poll(self.heartbeat_timeout/2))
order = zp.ORDER_UNFRAME(order_msg)
#send the order along
self.send(order)
count += 1
self.sent_count += 1
# see if the poller has results for the result_feed
if self.order_socket in socks and \
socks[self.order_socket] == self.zmq.POLLIN:
order_msg = self.order_socket.recv()
if order_msg == str(zp.ORDER_PROTOCOL.DONE):
self.signal_done()
return
if order_msg == str(zp.ORDER_PROTOCOL.BREAK):
break
order = zp.ORDER_UNFRAME(order_msg)
#send the order along
self.send(order)
count += 1
self.sent_count += 1
else:
# no orders, break out
break
#TODO: we have to send at least one dummy order per do_work iteration
# or the feed will block waiting for our messages.