diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index b60e7fe4..76ad2a19 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -194,6 +194,7 @@ class OrderDataSource(qmsg.DataSource): def do_work(self): + #TODO: if this is the first iteration, break deadlock by sending a dummy order if(self.sent_count == 0): self.send(zp.namedict({})) @@ -204,39 +205,36 @@ class OrderDataSource(qmsg.DataSource): # TODO : this can be written in a concurrency agnostic # way... have a chat with Fawce about this ~Steve - while True: - - (rlist, wlist, xlist) = select( - [self.order_socket], - [], - [self.order_socket], - #allow half the time of a heartbeat for the order - #timeout, so we have time to signal we are done. - timeout=self.heartbeat_timeout/2000 - ) - - #no more orders, should this be an error condition? - if len(rlist) == 0 or len(xlist) > 0: - # no order message means there was a timeout above. - # this is an indeterminant case, we don't know the cause. - # the safest move is to break out of this loop and try again - break - - order_msg = rlist[0].recv() - - if order_msg == str(zp.ORDER_PROTOCOL.DONE): - self.signal_done() - return - - if order_msg == str(zp.ORDER_PROTOCOL.BREAK): - break + while True: + # poll all the sockets + # we reduce the timeout here by a factor of 2, because we need + # to potentially receive the client's done message before the + # controller or heartbeat times out. + socks = dict(self.poll.poll(self.heartbeat_timeout/2)) - order = zp.ORDER_UNFRAME(order_msg) - #send the order along - self.send(order) - count += 1 - self.sent_count += 1 + # see if the poller has results for the result_feed + if self.order_socket in socks and \ + socks[self.order_socket] == self.zmq.POLLIN: + + order_msg = self.order_socket.recv() + + if order_msg == str(zp.ORDER_PROTOCOL.DONE): + self.signal_done() + return + + if order_msg == str(zp.ORDER_PROTOCOL.BREAK): + break + + order = zp.ORDER_UNFRAME(order_msg) + #send the order along + self.send(order) + count += 1 + self.sent_count += 1 + + else: + # no orders, break out + break #TODO: we have to send at least one dummy order per do_work iteration # or the feed will block waiting for our messages.