diff --git a/zipline/components/feed.py b/zipline/components/feed.py index 72ad3f9a..1099f257 100644 --- a/zipline/components/feed.py +++ b/zipline/components/feed.py @@ -44,9 +44,9 @@ class Feed(Aggregate): self.pull_socket = self.bind_data() self.feed_socket = self.bind_feed() - # ------------- - # Core Methods - # ------------- + # ------- + # Framing + # ------- def unframe(self, msg): return zp.DATASOURCE_UNFRAME(msg) @@ -58,27 +58,6 @@ class Feed(Aggregate): # Flow Control # ------------- - def drain(self): - """ - Send all messages in the buffer. - """ - self.draining = True - while self.pending_messages() > 0: - self.send_next() - - def send_next(self): - """ - Send the (chronologically) next message in the buffer. - """ - if not (self.is_full() or self.draining): - return - - event = self.next() - if(event != None): - self.feed_socket.send(self.frame(event), self.zmq.NOBLOCK) - self.sent_counters[event.source_id] += 1 - self.sent_count += 1 - def append(self, event): """ Add an event to the buffer for the source specified by @@ -100,7 +79,7 @@ class Feed(Aggregate): earliest_event = None #iterate over the queues of events from all sources #(1 queue per datasource) - for events in self.data_buffer.values(): + for events in self.data_buffer.itervalues(): if len(events) == 0: continue cur_source = events