mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-28 21:10:25 +08:00
Untangled the poller knot! :)
This commit is contained in:
+17
-17
@@ -188,7 +188,7 @@ class Component(object):
|
||||
|
||||
self.sockets.append(pull_socket)
|
||||
|
||||
return pull_socket, self.poll
|
||||
return pull_socket
|
||||
|
||||
def connect_push_socket(self, addr):
|
||||
push_socket = self.context.socket(self.zmq.PUSH)
|
||||
@@ -246,20 +246,20 @@ class Component(object):
|
||||
|
||||
self.sockets.append(self.sync_socket)
|
||||
|
||||
#def debug(self):
|
||||
#return (
|
||||
#self.get_id ,
|
||||
#self.huid ,
|
||||
#socket.gethostname() ,
|
||||
#os.getpid() ,
|
||||
#hex(id(self)) ,
|
||||
#)
|
||||
def debug(self):
|
||||
return (
|
||||
self.get_id ,
|
||||
self.huid ,
|
||||
socket.gethostname() ,
|
||||
os.getpid() ,
|
||||
hex(id(self)) ,
|
||||
)
|
||||
|
||||
#def __repr__(self):
|
||||
#return "<{name} {uuid} at {host} {pid} {pointer}>".format(
|
||||
#name = self.get_id ,
|
||||
#uuid = self.huid ,
|
||||
#host = socket.gethostname() ,
|
||||
#pid = os.getpid() ,
|
||||
#pointer = hex(id(self)) ,
|
||||
#)
|
||||
def __repr__(self):
|
||||
return "<{name} {uuid} at {host} {pid} {pointer}>".format(
|
||||
name = self.get_id ,
|
||||
uuid = self.huid ,
|
||||
host = socket.gethostname() ,
|
||||
pid = os.getpid() ,
|
||||
pointer = hex(id(self)) ,
|
||||
)
|
||||
|
||||
@@ -161,12 +161,12 @@ class ParallelBuffer(Component):
|
||||
self.data_buffer[source_id] = []
|
||||
|
||||
def open(self):
|
||||
self.pull_socket, self.poller = self.bind_data()
|
||||
self.feed_socket = self.bind_feed()
|
||||
self.pull_socket = self.bind_data()
|
||||
self.feed_socket = self.bind_feed()
|
||||
|
||||
def do_work(self):
|
||||
# wait for synchronization reply from the host
|
||||
socks = dict(self.poller.poll(self.heartbeat_timeout)) #timeout after 2 seconds.
|
||||
socks = dict(self.poll.poll(self.heartbeat_timeout)) #timeout after 2 seconds.
|
||||
|
||||
if self.pull_socket in socks and socks[self.pull_socket] == self.zmq.POLLIN:
|
||||
message = self.pull_socket.recv()
|
||||
@@ -265,8 +265,8 @@ class MergedParallelBuffer(ParallelBuffer):
|
||||
ParallelBuffer.__init__(self)
|
||||
|
||||
def open(self):
|
||||
self.pull_socket, self.poller = self.bind_merge()
|
||||
self.feed_socket = self.bind_result()
|
||||
self.pull_socket = self.bind_merge()
|
||||
self.feed_socket = self.bind_result()
|
||||
|
||||
def next(self):
|
||||
"""Get the next merged message from the feed buffer."""
|
||||
@@ -325,7 +325,7 @@ class BaseTransform(Component):
|
||||
- call transform (subclass' method) on event
|
||||
- send the transformed event
|
||||
"""
|
||||
socks = dict(self.poll.poll(2000)) #timeout after 2 seconds.
|
||||
socks = dict(self.poll.poll(self.heartbeat_timeout)) #timeout after 2 seconds.
|
||||
if self.feed_socket in socks and socks[self.feed_socket] == self.zmq.POLLIN:
|
||||
message = self.feed_socket.recv()
|
||||
if message == str(CONTROL_PROTOCOL.DONE):
|
||||
|
||||
Reference in New Issue
Block a user