From 479ad502f6f0db73bfd2c590eb49fa3db166f80a Mon Sep 17 00:00:00 2001 From: fawce Date: Fri, 3 Aug 2012 01:05:53 -0400 Subject: [PATCH] addressing race condition between loop_send exit and DONE message delivery. --- tests/test_components.py | 4 +- zipline/core/component.py | 91 ++++++++++++++++++++------------------- zipline/core/monitor.py | 20 +++------ zipline/gens/zmqgen.py | 13 ------ 4 files changed, 56 insertions(+), 72 deletions(-) diff --git a/tests/test_components.py b/tests/test_components.py index bbd881e7..cf140677 100644 --- a/tests/test_components.py +++ b/tests/test_components.py @@ -77,7 +77,6 @@ class ComponentTestCase(TestCase): DATASOURCE_UNFRAME ) - for event in comp_a: log.info(event) @@ -155,6 +154,9 @@ class ComponentTestCase(TestCase): sorted_out = date_sorted_sources(*sources) + import time + time.sleep(.25) + prev = None sort_count = 0 for msg in sorted_out: diff --git a/zipline/core/component.py b/zipline/core/component.py index 82aebc88..78ee69c8 100644 --- a/zipline/core/component.py +++ b/zipline/core/component.py @@ -17,8 +17,6 @@ from collections import namedtuple # pyzmq import zmq -from zipline.gens.zmqgen import gen_from_poller - from zipline.core.monitor import PARAMETERS from zipline.protocol import ( @@ -36,6 +34,10 @@ class KillSignal(Exception): def __init__(self): pass +class ShutdownSignal(Exception): + def __init__(self): + pass + ComponentSocketArgs = namedtuple('ComponentSocketArgs',['uri','style','bind']) class Component(object): @@ -87,7 +89,7 @@ class Component(object): self.state_flag = COMPONENT_STATE.OK # track time of last ping we received from monitor - self.last_ping = None + self.last_ping = time.time() # Humanhashes make this way easier to debug because they stick # in your mind unlike a 32 byte string of random hex. @@ -108,7 +110,6 @@ class Component(object): self.recv_gen = self.create_recv_gen() - # ------------ # Core Methods # ------------ @@ -148,6 +149,13 @@ class Component(object): self.signal_done() + # keep heartbeating until we receive the shutdown + # message from the Monitor (raises a + # ShutdownSignal), or we don't hear from the Monitor + # for MAX_COMPONENT_WAIT. + while True: + self.heartbeat(timeout=1000) + except Exception as exc: self.handle_exception(exc) finally: @@ -170,10 +178,7 @@ class Component(object): try: # we block on ready here until monitor sends the GO self.wait_ready() - log.info("Starting to drain {id}".format(id=self.get_id)) - for event in gen_from_poller(self.poll, self.in_socket, self.unframe): - self.heartbeat() - # event.source_id = self.get_id + for event in self.gen_from_poller(self.poll, self.in_socket, self.unframe): yield event self.signal_done() @@ -182,13 +187,30 @@ class Component(object): finally: log.info("Exiting %r" % self) + def gen_from_poller(self, poller, in_socket, unframe): + + while True: + socks = dict(poller.poll(0)) + self.heartbeat() + if socks.get(in_socket) == zmq.POLLIN: + message = in_socket.recv() + if message == str(CONTROL_PROTOCOL.DONE): + break + else: + event = unframe(message) + yield event + def handle_exception(self, exc, re_raise=False): - if not isinstance(exc, KillSignal): - self.signal_exception(exc) - else: + if isinstance(exc, KillSignal): # if we get a kill signal, forcibly close all the # sockets. self.teardown_sockets() + elif isinstance(exc, ShutdownSignal): + # signal from monitor of an orderly shutdown, + # do nothing. + pass + else: + self.signal_exception(exc) def __iter__(self): return self @@ -210,6 +232,12 @@ class Component(object): for sock in self.sockets: sock.close() + def shutdown(self): + """ + Clean shutdown. + """ + raise ShutdownSignal() + def kill(self): """ Unclean shutdown. @@ -333,7 +361,7 @@ class Component(object): # Side effectful call from the monitor to unlock # and begin doing work only when the entire topology # of the system beings to come online - log.info('Unlocking ' + self.__class__.__name__) + log.info('Unlocking ' + self.get_id) self.unlock_ready() # ========= @@ -344,7 +372,7 @@ class Component(object): # data that are done during a clean shutdown. Inform the # monitor that we're done. elif event == CONTROL_PROTOCOL.SHUTDOWN: - self.signal_done() + self.shutdown() break # ========= @@ -358,7 +386,7 @@ class Component(object): elif time.time() - start_wait > PARAMETERS.MAX_COMPONENT_WAIT: log.info('No go signal from monitor, %s exiting' \ - % self.__class__.__name__) + % self.get_id) self.kill() break @@ -407,7 +435,7 @@ class Component(object): # data that are done during a clean shutdown. Inform the # monitor that we're done. elif event == CONTROL_PROTOCOL.SHUTDOWN: - self.signal_done() + self.shutdown() # ========= # Hard Kill @@ -419,7 +447,7 @@ class Component(object): # In case we didn't receive a ping, send a pre-emptive # pong to the monitor. - elif self.last_ping and time.time() - self.last_ping > 1: + elif time.time() - self.last_ping > 2: # send a ping ahead of schedule pre_pong = time.time() heartbeat_frame = CONTROL_FRAME( @@ -432,15 +460,14 @@ class Component(object): # doing work self.control_out.send(heartbeat_frame, self.zmq.NOBLOCK) self.last_ping = pre_pong - elif self.last_ping and \ - time.time() - self.last_ping > PARAMETERS.MAX_COMPONENT_WAIT: + elif time.time() - self.last_ping > PARAMETERS.MAX_COMPONENT_WAIT: # monitor is gone without sending the shutdown # signal, do a hard exit. self.kill() def signal_ready(self): - log.info(self.__class__.__name__ + ' is ready') + log.info(self.get_id + ' is ready') frame = CONTROL_FRAME( CONTROL_PROTOCOL.READY, '' @@ -470,13 +497,6 @@ class Component(object): self.control_out.send(done_frame) log.info("[%s] sent control done" % self.get_id) - # there is a narrow race condition where we finish just - # after the Monitor accepts our prior heartbeat, but just - # before the next one is sent. So, we hang around for one - # last heartbeat, and wait an unusually long time. - # TODO: decided if this is really necessary. - # self.heartbeat(timeout=5000) - # ----------- # Messaging # ----------- @@ -529,7 +549,6 @@ class Component(object): def bind_push_socket(self, addr): push_socket = self.context.socket(self.zmq.PUSH) push_socket.bind(addr) - self.out_socket = push_socket self.sockets.append(push_socket) return push_socket @@ -547,7 +566,6 @@ class Component(object): pull_socket = self.context.socket(self.zmq.PULL) pull_socket.bind(addr) self.poll.register(pull_socket, self.zmq.POLLIN) - self.sockets.append(pull_socket) return pull_socket @@ -556,26 +574,9 @@ class Component(object): push_socket = self.context.socket(self.zmq.PUSH) push_socket.connect(addr) self.sockets.append(push_socket) - self.out_socket = push_socket return push_socket - def bind_pub_socket(self, addr): - pub_socket = self.context.socket(self.zmq.PUB) - pub_socket.bind(addr) - self.out_socket = pub_socket - - return pub_socket - - def connect_sub_socket(self, addr): - sub_socket = self.context.socket(self.zmq.SUB) - sub_socket.connect(addr) - sub_socket.setsockopt(self.zmq.SUBSCRIBE,'') - self.sockets.append(sub_socket) - - self.poll.register(sub_socket, self.zmq.POLLIN) - - return sub_socket def setup_control(self): """ diff --git a/zipline/core/monitor.py b/zipline/core/monitor.py index d9e4bd19..7a920742 100644 --- a/zipline/core/monitor.py +++ b/zipline/core/monitor.py @@ -340,9 +340,8 @@ class Monitor(object): log.info("breaking out of initial heartbeat") break - # Has the entire topology told us its DONE - done = len(self.finished) == len(self.topology) - if done: + # Break out if the entire topology told us its DONE + if len(self.finished) == len(self.topology): break @@ -438,27 +437,22 @@ class Monitor(object): bad = self.tracked - good - self.finished new = self.responses - good - self.finished - missing = self.topology - self.tracked - self.finished - for component in new: self.new(component) - if self.debug: - log.info('New component %r' % component) - for component in bad: self.timed_out(component) - for component in missing: + missing = self.topology - self.tracked - self.finished + for component in missing: if self.debug: log.info('Missing component %r' % component) - if self.debug: - for component in self.tracked: - if component not in self.topology: - log.info('Uninvited component %r' % component) + for component in self.tracked: + if component not in self.topology: + log.info('Uninvited component %r' % component) # -------------- # Init Handlers diff --git a/zipline/gens/zmqgen.py b/zipline/gens/zmqgen.py index 463f8b42..66dbdca3 100644 --- a/zipline/gens/zmqgen.py +++ b/zipline/gens/zmqgen.py @@ -17,16 +17,3 @@ def gen_from_pull_socket(socket_uri, context, unframe): # this generator needs to know about the source_ids coming in via # the poller, and need to yield DONE messages for each # source_id. - -def gen_from_poller(poller, in_socket, unframe): - - while True: - socks = dict(poller.poll()) - - if socks.get(in_socket) == zmq.POLLIN: - message = in_socket.recv() - if message == str(zp.CONTROL_PROTOCOL.DONE): - break - else: - event = unframe(message) - yield event