From abe906e90e176ede10698114fe6e8a5183b3fd1f Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Wed, 16 May 2012 18:19:07 -0400 Subject: [PATCH] Minor tweaks. --- zipline/components/feed.py | 29 ++++------------------------- 1 file changed, 4 insertions(+), 25 deletions(-) diff --git a/zipline/components/feed.py b/zipline/components/feed.py index 72ad3f9a..1099f257 100644 --- a/zipline/components/feed.py +++ b/zipline/components/feed.py @@ -44,9 +44,9 @@ class Feed(Aggregate): self.pull_socket = self.bind_data() self.feed_socket = self.bind_feed() - # ------------- - # Core Methods - # ------------- + # ------- + # Framing + # ------- def unframe(self, msg): return zp.DATASOURCE_UNFRAME(msg) @@ -58,27 +58,6 @@ class Feed(Aggregate): # Flow Control # ------------- - def drain(self): - """ - Send all messages in the buffer. - """ - self.draining = True - while self.pending_messages() > 0: - self.send_next() - - def send_next(self): - """ - Send the (chronologically) next message in the buffer. - """ - if not (self.is_full() or self.draining): - return - - event = self.next() - if(event != None): - self.feed_socket.send(self.frame(event), self.zmq.NOBLOCK) - self.sent_counters[event.source_id] += 1 - self.sent_count += 1 - def append(self, event): """ Add an event to the buffer for the source specified by @@ -100,7 +79,7 @@ class Feed(Aggregate): earliest_event = None #iterate over the queues of events from all sources #(1 queue per datasource) - for events in self.data_buffer.values(): + for events in self.data_buffer.itervalues(): if len(events) == 0: continue cur_source = events