mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-28 20:38:21 +08:00
Polling work Checkpiont.
This commit is contained in:
+17
-7
@@ -83,6 +83,8 @@ class Component(object):
|
||||
self.zmq = zmq
|
||||
|
||||
self.context = self.zmq.Context()
|
||||
self.setup_poller()
|
||||
|
||||
self.open()
|
||||
self.setup_sync()
|
||||
self.setup_control()
|
||||
@@ -182,11 +184,11 @@ class Component(object):
|
||||
def bind_pull_socket(self, addr):
|
||||
pull_socket = self.context.socket(self.zmq.PULL)
|
||||
pull_socket.bind(addr)
|
||||
poller = self.zmq.Poller()
|
||||
poller.register(pull_socket, self.zmq.POLLIN)
|
||||
self.poll.register(pull_socket, self.zmq.POLLIN)
|
||||
|
||||
self.sockets.append(pull_socket)
|
||||
|
||||
return pull_socket, poller
|
||||
return pull_socket, self.poll
|
||||
|
||||
def connect_push_socket(self, addr):
|
||||
push_socket = self.context.socket(self.zmq.PUSH)
|
||||
@@ -211,11 +213,17 @@ class Component(object):
|
||||
sub_socket.setsockopt(self.zmq.SUBSCRIBE,'')
|
||||
self.sockets.append(sub_socket)
|
||||
|
||||
poller = self.zmq.Poller()
|
||||
poller.register(sub_socket, self.zmq.POLLIN)
|
||||
self.poll.register(sub_socket, self.zmq.POLLIN)
|
||||
|
||||
# TODO: migrate tuple unpacking to be consistent
|
||||
return sub_socket, poller
|
||||
return sub_socket
|
||||
|
||||
def setup_poller(self):
|
||||
"""
|
||||
Setup the poller used for multiplexing the incoming data
|
||||
handling sockets.
|
||||
"""
|
||||
|
||||
self.poll = self.zmq.Poller()
|
||||
|
||||
def setup_control(self):
|
||||
"""
|
||||
@@ -231,6 +239,8 @@ class Component(object):
|
||||
self.sync_socket = self.context.socket(self.zmq.REQ)
|
||||
self.sync_socket.connect(self.addresses['sync_address'])
|
||||
#self.sync_socket.setsockopt(self.zmq.LINGER,0)
|
||||
|
||||
# Explictly, a different poller for obvious reasons.
|
||||
self.sync_poller = self.zmq.Poller()
|
||||
self.sync_poller.register(self.sync_socket, self.zmq.POLLIN)
|
||||
|
||||
|
||||
+13
-6
@@ -27,6 +27,7 @@ class ComponentHost(Component):
|
||||
self.sync_register = {}
|
||||
self.timeout = datetime.timedelta(seconds=5)
|
||||
self.gevent_needed = gevent_needed
|
||||
self.heartbeat_timeout = 2000
|
||||
|
||||
self.feed = ParallelBuffer()
|
||||
self.merge = MergedParallelBuffer()
|
||||
@@ -86,16 +87,22 @@ class ComponentHost(Component):
|
||||
if len(self.components) == 0:
|
||||
qutil.LOGGER.info("Component register is empty.")
|
||||
return True
|
||||
|
||||
for source, last_dt in self.sync_register.iteritems():
|
||||
if((cur_time - last_dt) > self.timeout):
|
||||
qutil.LOGGER.info("Time out for {source}. Current component registery: {reg}".format(source=source, reg=self.components))
|
||||
if (cur_time - last_dt) > self.timeout:
|
||||
qutil.LOGGER.info(
|
||||
"Time out for {source}. Current component registery: {reg}".
|
||||
format(source=source, reg=self.components)
|
||||
)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def loop(self):
|
||||
|
||||
while not self.is_timed_out():
|
||||
# wait for synchronization request
|
||||
socks = dict(self.poller.poll(2000)) #timeout after 2 seconds.
|
||||
socks = dict(self.poller.poll(self.heartbeat_timeout)) #timeout after 2 seconds.
|
||||
|
||||
if self.sync_socket in socks and socks[self.sync_socket] == self.zmq.POLLIN:
|
||||
msg = self.sync_socket.recv()
|
||||
@@ -154,7 +161,7 @@ class ParallelBuffer(Component):
|
||||
|
||||
def do_work(self):
|
||||
# wait for synchronization reply from the host
|
||||
socks = dict(self.poller.poll(2000)) #timeout after 2 seconds.
|
||||
socks = dict(self.poller.poll(self.heartbeat_timeout)) #timeout after 2 seconds.
|
||||
|
||||
if self.pull_socket in socks and socks[self.pull_socket] == self.zmq.POLLIN:
|
||||
message = self.pull_socket.recv()
|
||||
@@ -302,7 +309,7 @@ class BaseTransform(Component):
|
||||
Establishes zmq connections.
|
||||
"""
|
||||
#create the feed.
|
||||
self.feed_socket, self.poller = self.connect_feed()
|
||||
self.feed_socket = self.connect_feed()
|
||||
#create the result PUSH
|
||||
self.result_socket = self.connect_merge()
|
||||
|
||||
@@ -313,7 +320,7 @@ class BaseTransform(Component):
|
||||
- call transform (subclass' method) on event
|
||||
- send the transformed event
|
||||
"""
|
||||
socks = dict(self.poller.poll(2000)) #timeout after 2 seconds.
|
||||
socks = dict(self.poll.poll(2000)) #timeout after 2 seconds.
|
||||
if self.feed_socket in socks and socks[self.feed_socket] == self.zmq.POLLIN:
|
||||
message = self.feed_socket.recv()
|
||||
if message == str(CONTROL_PROTOCOL.DONE):
|
||||
|
||||
+17
-22
@@ -44,15 +44,26 @@ class Controller(object):
|
||||
except zmq.ZMQError:
|
||||
raise Exception('Cannot not bind on %s' % pub_socket)
|
||||
|
||||
def run(self, debug_step=False, stats=True):
|
||||
def run(self, debug=False):
|
||||
self.polling = True
|
||||
|
||||
if self.debug or debug_step:
|
||||
return self._poll_verbose(True, stats)
|
||||
if debug:
|
||||
return self._poll(False)
|
||||
else:
|
||||
return self._poll(False, stats)
|
||||
return self._poll_fast()
|
||||
|
||||
def _poll_fast(self):
|
||||
"""
|
||||
C version of the polling forwarder.
|
||||
"""
|
||||
zmq.device(zmq.FORWARDER, self.pull, self.pub)
|
||||
|
||||
def _poll(self):
|
||||
"""
|
||||
Python version of the polling forwarder. With logging,
|
||||
mostly used for debugging.
|
||||
"""
|
||||
|
||||
def _poll(self, debug_step, stats):
|
||||
while self.polling:
|
||||
try:
|
||||
self.logging.info('msg')
|
||||
@@ -61,23 +72,7 @@ class Controller(object):
|
||||
except KeyboardInterrupt:
|
||||
self.polling = False
|
||||
break
|
||||
except Exception as e:
|
||||
# Its common to wrap these in wildcard exceptions so
|
||||
# that we don't loose messages, ever
|
||||
self.logging.error(str(e))
|
||||
self.failed += 1
|
||||
continue
|
||||
|
||||
def _poll_verbose(self, debug_step, stats):
|
||||
while self.polling:
|
||||
try:
|
||||
if debug_step:
|
||||
msg = self.pull.recv(copy=False)
|
||||
if self.dologging:
|
||||
self.logging.info(msg)
|
||||
self.pub.send(msg)
|
||||
self.success += 1
|
||||
except KeyboardInterrupt:
|
||||
except zmq.ZMQError:
|
||||
self.polling = False
|
||||
break
|
||||
except Exception as e:
|
||||
|
||||
@@ -12,16 +12,17 @@ class TestClient(qmsg.Component):
|
||||
self.expected_msg_count = expected_msg_count
|
||||
self.utest = utest
|
||||
self.prev_dt = None
|
||||
self.heartbeat_timeout = 2000
|
||||
|
||||
@property
|
||||
def get_id(self):
|
||||
return "TEST_CLIENT"
|
||||
|
||||
def open(self):
|
||||
self.data_feed, self.poller = self.connect_result()
|
||||
self.data_feed = self.connect_result()
|
||||
|
||||
def do_work(self):
|
||||
socks = dict(self.poller.poll(2000)) #timeout after 2 seconds.
|
||||
socks = dict(self.poll.poll(self.heartbeat_timeout))
|
||||
|
||||
if self.data_feed in socks and socks[self.data_feed] == self.zmq.POLLIN:
|
||||
msg = self.data_feed.recv()
|
||||
|
||||
Reference in New Issue
Block a user