Minor tweaks.

This commit is contained in:
Stephen Diehl
2012-05-16 18:19:07 -04:00
parent 7f4c94d4e1
commit abe906e90e
+4 -25
View File
@@ -44,9 +44,9 @@ class Feed(Aggregate):
self.pull_socket = self.bind_data()
self.feed_socket = self.bind_feed()
# -------------
# Core Methods
# -------------
# -------
# Framing
# -------
def unframe(self, msg):
return zp.DATASOURCE_UNFRAME(msg)
@@ -58,27 +58,6 @@ class Feed(Aggregate):
# Flow Control
# -------------
def drain(self):
"""
Send all messages in the buffer.
"""
self.draining = True
while self.pending_messages() > 0:
self.send_next()
def send_next(self):
"""
Send the (chronologically) next message in the buffer.
"""
if not (self.is_full() or self.draining):
return
event = self.next()
if(event != None):
self.feed_socket.send(self.frame(event), self.zmq.NOBLOCK)
self.sent_counters[event.source_id] += 1
self.sent_count += 1
def append(self, event):
"""
Add an event to the buffer for the source specified by
@@ -100,7 +79,7 @@ class Feed(Aggregate):
earliest_event = None
#iterate over the queues of events from all sources
#(1 queue per datasource)
for events in self.data_buffer.values():
for events in self.data_buffer.itervalues():
if len(events) == 0:
continue
cur_source = events