From a63a7c6deae77bbaf15ddb1deaa4da4f89658bc8 Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Fri, 24 Feb 2012 18:26:57 -0500 Subject: [PATCH] Untangled the poller knot! :) --- zipline/component.py | 34 +++++++++++++++++----------------- zipline/messaging.py | 12 ++++++------ 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/zipline/component.py b/zipline/component.py index 5862e7e4..4620e1d8 100644 --- a/zipline/component.py +++ b/zipline/component.py @@ -188,7 +188,7 @@ class Component(object): self.sockets.append(pull_socket) - return pull_socket, self.poll + return pull_socket def connect_push_socket(self, addr): push_socket = self.context.socket(self.zmq.PUSH) @@ -246,20 +246,20 @@ class Component(object): self.sockets.append(self.sync_socket) - #def debug(self): - #return ( - #self.get_id , - #self.huid , - #socket.gethostname() , - #os.getpid() , - #hex(id(self)) , - #) + def debug(self): + return ( + self.get_id , + self.huid , + socket.gethostname() , + os.getpid() , + hex(id(self)) , + ) - #def __repr__(self): - #return "<{name} {uuid} at {host} {pid} {pointer}>".format( - #name = self.get_id , - #uuid = self.huid , - #host = socket.gethostname() , - #pid = os.getpid() , - #pointer = hex(id(self)) , - #) + def __repr__(self): + return "<{name} {uuid} at {host} {pid} {pointer}>".format( + name = self.get_id , + uuid = self.huid , + host = socket.gethostname() , + pid = os.getpid() , + pointer = hex(id(self)) , + ) diff --git a/zipline/messaging.py b/zipline/messaging.py index 1613e388..afe69a9c 100644 --- a/zipline/messaging.py +++ b/zipline/messaging.py @@ -161,12 +161,12 @@ class ParallelBuffer(Component): self.data_buffer[source_id] = [] def open(self): - self.pull_socket, self.poller = self.bind_data() - self.feed_socket = self.bind_feed() + self.pull_socket = self.bind_data() + self.feed_socket = self.bind_feed() def do_work(self): # wait for synchronization reply from the host - socks = dict(self.poller.poll(self.heartbeat_timeout)) #timeout after 2 seconds. + socks = dict(self.poll.poll(self.heartbeat_timeout)) #timeout after 2 seconds. if self.pull_socket in socks and socks[self.pull_socket] == self.zmq.POLLIN: message = self.pull_socket.recv() @@ -265,8 +265,8 @@ class MergedParallelBuffer(ParallelBuffer): ParallelBuffer.__init__(self) def open(self): - self.pull_socket, self.poller = self.bind_merge() - self.feed_socket = self.bind_result() + self.pull_socket = self.bind_merge() + self.feed_socket = self.bind_result() def next(self): """Get the next merged message from the feed buffer.""" @@ -325,7 +325,7 @@ class BaseTransform(Component): - call transform (subclass' method) on event - send the transformed event """ - socks = dict(self.poll.poll(2000)) #timeout after 2 seconds. + socks = dict(self.poll.poll(self.heartbeat_timeout)) #timeout after 2 seconds. if self.feed_socket in socks and socks[self.feed_socket] == self.zmq.POLLIN: message = self.feed_socket.recv() if message == str(CONTROL_PROTOCOL.DONE):