From bb70efe4ebe45894d38734c3e86ea3a7c1525e60 Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Fri, 24 Feb 2012 17:49:24 -0500 Subject: [PATCH] Polling work Checkpiont. --- zipline/component.py | 24 +++++++++++++++++------- zipline/messaging.py | 19 +++++++++++++------ zipline/monitor.py | 39 +++++++++++++++++---------------------- zipline/test/client.py | 5 +++-- 4 files changed, 50 insertions(+), 37 deletions(-) diff --git a/zipline/component.py b/zipline/component.py index 06a85b5b..95e887f6 100644 --- a/zipline/component.py +++ b/zipline/component.py @@ -83,6 +83,8 @@ class Component(object): self.zmq = zmq self.context = self.zmq.Context() + self.setup_poller() + self.open() self.setup_sync() self.setup_control() @@ -182,11 +184,11 @@ class Component(object): def bind_pull_socket(self, addr): pull_socket = self.context.socket(self.zmq.PULL) pull_socket.bind(addr) - poller = self.zmq.Poller() - poller.register(pull_socket, self.zmq.POLLIN) + self.poll.register(pull_socket, self.zmq.POLLIN) + self.sockets.append(pull_socket) - return pull_socket, poller + return pull_socket, self.poll def connect_push_socket(self, addr): push_socket = self.context.socket(self.zmq.PUSH) @@ -211,11 +213,17 @@ class Component(object): sub_socket.setsockopt(self.zmq.SUBSCRIBE,'') self.sockets.append(sub_socket) - poller = self.zmq.Poller() - poller.register(sub_socket, self.zmq.POLLIN) + self.poll.register(sub_socket, self.zmq.POLLIN) - # TODO: migrate tuple unpacking to be consistent - return sub_socket, poller + return sub_socket + + def setup_poller(self): + """ + Setup the poller used for multiplexing the incoming data + handling sockets. + """ + + self.poll = self.zmq.Poller() def setup_control(self): """ @@ -231,6 +239,8 @@ class Component(object): self.sync_socket = self.context.socket(self.zmq.REQ) self.sync_socket.connect(self.addresses['sync_address']) #self.sync_socket.setsockopt(self.zmq.LINGER,0) + + # Explictly, a different poller for obvious reasons. self.sync_poller = self.zmq.Poller() self.sync_poller.register(self.sync_socket, self.zmq.POLLIN) diff --git a/zipline/messaging.py b/zipline/messaging.py index c7aa1d89..ebe23f57 100644 --- a/zipline/messaging.py +++ b/zipline/messaging.py @@ -27,6 +27,7 @@ class ComponentHost(Component): self.sync_register = {} self.timeout = datetime.timedelta(seconds=5) self.gevent_needed = gevent_needed + self.heartbeat_timeout = 2000 self.feed = ParallelBuffer() self.merge = MergedParallelBuffer() @@ -86,16 +87,22 @@ class ComponentHost(Component): if len(self.components) == 0: qutil.LOGGER.info("Component register is empty.") return True + for source, last_dt in self.sync_register.iteritems(): - if((cur_time - last_dt) > self.timeout): - qutil.LOGGER.info("Time out for {source}. Current component registery: {reg}".format(source=source, reg=self.components)) + if (cur_time - last_dt) > self.timeout: + qutil.LOGGER.info( + "Time out for {source}. Current component registery: {reg}". + format(source=source, reg=self.components) + ) return True + return False def loop(self): + while not self.is_timed_out(): # wait for synchronization request - socks = dict(self.poller.poll(2000)) #timeout after 2 seconds. + socks = dict(self.poller.poll(self.heartbeat_timeout)) #timeout after 2 seconds. if self.sync_socket in socks and socks[self.sync_socket] == self.zmq.POLLIN: msg = self.sync_socket.recv() @@ -154,7 +161,7 @@ class ParallelBuffer(Component): def do_work(self): # wait for synchronization reply from the host - socks = dict(self.poller.poll(2000)) #timeout after 2 seconds. + socks = dict(self.poller.poll(self.heartbeat_timeout)) #timeout after 2 seconds. if self.pull_socket in socks and socks[self.pull_socket] == self.zmq.POLLIN: message = self.pull_socket.recv() @@ -302,7 +309,7 @@ class BaseTransform(Component): Establishes zmq connections. """ #create the feed. - self.feed_socket, self.poller = self.connect_feed() + self.feed_socket = self.connect_feed() #create the result PUSH self.result_socket = self.connect_merge() @@ -313,7 +320,7 @@ class BaseTransform(Component): - call transform (subclass' method) on event - send the transformed event """ - socks = dict(self.poller.poll(2000)) #timeout after 2 seconds. + socks = dict(self.poll.poll(2000)) #timeout after 2 seconds. if self.feed_socket in socks and socks[self.feed_socket] == self.zmq.POLLIN: message = self.feed_socket.recv() if message == str(CONTROL_PROTOCOL.DONE): diff --git a/zipline/monitor.py b/zipline/monitor.py index 43ed7974..7c96db31 100644 --- a/zipline/monitor.py +++ b/zipline/monitor.py @@ -44,15 +44,26 @@ class Controller(object): except zmq.ZMQError: raise Exception('Cannot not bind on %s' % pub_socket) - def run(self, debug_step=False, stats=True): + def run(self, debug=False): self.polling = True - if self.debug or debug_step: - return self._poll_verbose(True, stats) + if debug: + return self._poll(False) else: - return self._poll(False, stats) + return self._poll_fast() + + def _poll_fast(self): + """ + C version of the polling forwarder. + """ + zmq.device(zmq.FORWARDER, self.pull, self.pub) + + def _poll(self): + """ + Python version of the polling forwarder. With logging, + mostly used for debugging. + """ - def _poll(self, debug_step, stats): while self.polling: try: self.logging.info('msg') @@ -61,23 +72,7 @@ class Controller(object): except KeyboardInterrupt: self.polling = False break - except Exception as e: - # Its common to wrap these in wildcard exceptions so - # that we don't loose messages, ever - self.logging.error(str(e)) - self.failed += 1 - continue - - def _poll_verbose(self, debug_step, stats): - while self.polling: - try: - if debug_step: - msg = self.pull.recv(copy=False) - if self.dologging: - self.logging.info(msg) - self.pub.send(msg) - self.success += 1 - except KeyboardInterrupt: + except zmq.ZMQError: self.polling = False break except Exception as e: diff --git a/zipline/test/client.py b/zipline/test/client.py index 8c3b408d..3ccf9e91 100644 --- a/zipline/test/client.py +++ b/zipline/test/client.py @@ -12,16 +12,17 @@ class TestClient(qmsg.Component): self.expected_msg_count = expected_msg_count self.utest = utest self.prev_dt = None + self.heartbeat_timeout = 2000 @property def get_id(self): return "TEST_CLIENT" def open(self): - self.data_feed, self.poller = self.connect_result() + self.data_feed = self.connect_result() def do_work(self): - socks = dict(self.poller.poll(2000)) #timeout after 2 seconds. + socks = dict(self.poll.poll(self.heartbeat_timeout)) if self.data_feed in socks and socks[self.data_feed] == self.zmq.POLLIN: msg = self.data_feed.recv()