From 3c5cb2a451be4ed73af0bed4f04dbfdf35c638a5 Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Tue, 3 Jul 2012 08:35:16 -0400 Subject: [PATCH] Restore new monitor system and test suite. --- tests/test_monitor.py | 43 ++++++ zipline/core/monitor.py | 311 ++++++++++++++++++++++++++-------------- 2 files changed, 250 insertions(+), 104 deletions(-) create mode 100644 tests/test_monitor.py diff --git a/tests/test_monitor.py b/tests/test_monitor.py new file mode 100644 index 00000000..5da84b2a --- /dev/null +++ b/tests/test_monitor.py @@ -0,0 +1,43 @@ +import gevent +from logbook.compat import LoggingHandler +from unittest2 import TestCase, skip + +from zipline.core.monitor import Controller + + +class TestMonitor(TestCase): + def setUp(self): + self.log_handler = LoggingHandler() + self.log_handler.push_application() + + def tearDown(self): + self.log_handler.pop_application() + + def test_init(self): + pub_socket = 'tcp://127.0.0.1:5000' + route_socket = 'tcp://127.0.0.1:5001' + + con = Controller(pub_socket, route_socket) + con.manage([]) + + def test_init_topology(self): + pub_socket = 'tcp://127.0.0.1:5000' + route_socket = 'tcp://127.0.0.1:5001' + + con = Controller(pub_socket, route_socket, ) + con.manage([ 'a', 'b', 'c', 'd' ]) + + @skip + def test_poll(self): + from mock_zmq import zmq_synthetic + pub_socket = 'tcp://127.0.0.1:5000' + route_socket = 'tcp://127.0.0.1:5001' + cancel_socket = 'tcp://127.0.0.1:5002' + + con = Controller(pub_socket, route_socket, cancel_socket) + con.manage([ 'a', 'b', 'c', 'd' ]) + con.zmq = zmq_synthetic + con.zmq_flavor = 'green' + + con.period = 0.00001 + gevent.spawn(con.run).join(timeout=con.period) diff --git a/zipline/core/monitor.py b/zipline/core/monitor.py index bfb13037..be48a0fa 100644 --- a/zipline/core/monitor.py +++ b/zipline/core/monitor.py @@ -1,9 +1,12 @@ +import os import zmq +import sys import time import gevent import itertools import logbook import gevent_zeromq +from signal import SIGHUP, SIGINT from collections import OrderedDict @@ -11,14 +14,17 @@ from zipline.utils.gpoll import _Poller as GeventPoller from zipline.protocol import CONTROL_PROTOCOL, CONTROL_FRAME, \ CONTROL_UNFRAME, CONTROL_STATES, INVALID_CONTROL_FRAME \ +from zipline.utils.protocol_utils import ndict + INIT, SOURCES_READY, RUNNING, TERMINATE = CONTROL_STATES CONTROLLER_TRANSITIONS = frozenset([ (-1 , INIT), (INIT , SOURCES_READY), (SOURCES_READY , RUNNING), - (INIT , TERMINATE), - (SOURCES_READY , TERMINATE), + + (INIT , TERMINATE), # pseudo failure mode + (SOURCES_READY , TERMINATE), # pseudo failure mode (RUNNING , TERMINATE), ]) @@ -32,6 +38,18 @@ class UnknownChatter(Exception): log = logbook.Logger('Controller') +# The scalars determining the timing of the monitor behavior for +# the system. + +PARAMETERS = ndict(dict( + GENERATIONAL_PERIOD = 8, + ALLOWED_SKIPPED_HEARTBEATS = 3, + ALLOWED_INVALID_HEARTBEATS = 3, + PRESTART_HEARBEATS = 3, + SOURCES_START_HEARTBEATS = 3, + SYSTEM_TIMEOUT = 50, +)) + class Controller(object): """ A N to M messaging system for inter component communication. @@ -43,38 +61,24 @@ class Controller(object): the individual components. :func message_sender: . - Topology is the set of components we expect to show up. - States are the transitions the sytems go through. The - simplest is from RUNNING -> NOT RUNNING . - - Usage:: - - controller = Controller( - 'tcp://127.0.0.1:5000', - 'tcp://127.0.0.1:5001', - ) - - # typically you'd want to run this async to your main - # program since it blocks indefinetely. - controller.manage( - [ TOPOLOGY ] - [ STATES ] - ) - """ - debug = False - period = 1 + # Turn on debug for verbose logging of the system. + debug = True + period = PARAMETERS.GENERATIONAL_PERIOD - def __init__(self, pub_socket, route_socket): + def __init__(self, pub_socket, route_socket, nosignals=False): + self.nosignals = False self.context = None self.zmq = None self.zmq_poller = None self.running = False - self.polling = False + self.alive = False self.tracked = set() + self.finished = set() + self.responses = set() self.ctime = 0 @@ -114,7 +118,7 @@ class Controller(object): self.zmq_poller = self.zmq.Poller return - def manage(self, topology, states=None, context=None): + def manage(self, topology): """ Give the controller a set set of components to manage and a set of state transitions for the entire system. @@ -128,31 +132,42 @@ class Controller(object): else: self.freeform = False self.topology = frozenset(topology) - - self.polling = True - self.state = CONTROL_STATES.INIT + self.alive = True @property def state(self): + #log.info('returned %s' % self._state) return self._state @state.setter def state(self, new): - old, self._state = self._state, new + old = self._state - if (old, new) not in CONTROLLER_TRANSITIONS: - raise RuntimeError("Invalid State Transition : %s -> %s" %(old, new)) + if (old, new) in CONTROLLER_TRANSITIONS: + self._state = new + log.info("State Transition : %s -> %s" % (old, self._state)) else: - log.info("State Transition : %s -> %s" %(old, new)) + raise RuntimeError("Invalid State Transition : %s -> %s" %(old, new)) def run(self): self.running = True self.init_zmq(self.zmq_flavor) + self.state = CONTROL_STATES.INIT + + # Interpreter SIDE EFFECT + # ----------------------- + # The last breathe of the interpreter will assume that we've + # failed unless we specify otherwise. + sys.exitfunc = self.signal_interrupt + # We overload this if ( and only if ) the topology exits + # cleanly. This prevents failure modes where the monitor + # dies. + try: return self._poll() # use a python loop except KeyboardInterrupt: - log.debug('Shutdown event loop') + log.info('Shutdown event loop') def log_status(self): """ @@ -172,6 +187,13 @@ class Controller(object): # Publications # ------------- + def send_go(self): + go_frame = CONTROL_FRAME( + CONTROL_PROTOCOL.GO, + '' + ) + self.pub.send(go_frame) + def send_heart(self): if not self.running: return @@ -210,45 +232,53 @@ class Controller(object): assert self.route_socket assert self.pub_socket - assert self.cancel_socket + assert self.topology,\ + """"Must define topology to monitor, call setup_controller() on + your Zipline. """ # -- Publish -- # ============= self.pub = self.context.socket(self.zmq.PUB) self.pub.bind(self.pub_socket) - - # -- Cancel -- - # ============= - assert isinstance(self.cancel_socket,basestring), self.cancel_socket - self.cancel = self.context.socket(self.zmq.REP) - self.cancel.connect(self.cancel_socket) + self.pub.setsockopt(zmq.LINGER, 0) # -- Router -- # ============= self.router = self.context.socket(self.zmq.ROUTER) self.router.bind(self.route_socket) - + self.router.setsockopt(zmq.LINGER, 0) poller = self.zmq.Poller() poller.register(self.router, self.zmq.POLLIN) - poller.register(self.cancel, self.zmq.POLLIN) - self.associated += [self.pub, self.router, self.cancel] + self.associated += [self.pub, self.router] # TODO: actually do this self.state = CONTROL_STATES.SOURCES_READY + self.state = CONTROL_STATES.RUNNING buffer = [] + # =================== + # Heartbeat Iteration + # =================== + for i in itertools.count(0): self.log_status() + + # Reset the responses for this cycle self.responses = set() + # broadcast the heartbeat packet self.ctime = time.time() self.send_heart() - while self.polling: - # Reset the responses for this cycle + # ============== + # Hearbeat Cycle + # ============== + + # Wait the responses + while self.alive: socks = dict(poller.poll(self.period)) tic = time.time() @@ -270,28 +300,79 @@ class Controller(object): log.error('Invalid frame', rawmessage) pass - if socks.get(self.cancel) == self.zmq.POLLIN: - log.info('Received Cancellation') - rawmessage = self.cancel.recv() - self.cancel.send('') - self.shutdown(soft=True) - break + # ================ + # Heartbeat Stats + # ================ - self.beat() + complete = self.beat() + + # ================ + # Topology Status + # ================ + + # Is the entire topology told us its DONE + done = len(self.finished) == len(self.topology) + + # Is the entire topology shown up to the party + complete = len(self.tracked) == len(self.topology) + + if complete: + self.send_go() + + log.info('Heartbeat (%s, %s)' % (done, complete)) + + # ================ + # Exit Strategies + # ================ if self.zmq_flavor == 'green': gevent.sleep(0) - if self.state is CONTROL_STATES.TERMINATE: + # Will also fall out of loop when done, if using + # non-freeform topology + if done: + log.info('Entire topology exited cleanly') + self.shutdown(soft=True) + + # Noop exit func + sys.exitfunc = lambda: None + + # Send SIGHUP to buritto + self.signal_hangup() + + if not self.alive: break - if not self.polling: - break + def signal_hangup(self): + """ + A clean exit, inform the burrito ( and arbiter ) that + we're good. The topology exited cleanly and we can prove + it. + """ + if not self.nosignals: + ppid = os.getpid() + os.kill(ppid, SIGHUP) + else: + log.warning("Would SIGHUP here, but disabled") - # After loop exits - self.terminated = True + def signal_interrupt(self): + """ + Send a SIGINT in the error mode that the monitor's + interpreter exits. If the monitor dies the system is + considered a failure. + """ + if not self.nosignals: + ppid = os.getpid() + os.kill(ppid, SIGINT) + else: + log.warning("Would SIGINT here, but disabled") def beat(self): + """ + The tracking logic of the system. It's the "stethoscope" + that inspects to the heartbeats in a generation and + infers the state of the system from the responses. + """ # These the set overloaded operations # A & B ~ set.intersection @@ -303,24 +384,44 @@ class Controller(object): # send us back a response. # * new - Components we haven't heard from yet, but sent back the # right response. + # * finished - Components we were tracking but have now + # finished, when this set goes to zero this + # triggers the end of the topology. good = self.tracked & self.responses bad = self.tracked - good new = self.responses - good + missing = self.topology - self.tracked + for component in new: self.new(component) + if self.debug: + log.info('New component %r' % component) + for component in bad: self.fail(component) + if self.debug: + log.info('Bad component %r' % component) + + if self.debug: + for component in missing: + log.info('Missing component %r' % component) + + for component in self.tracked: + if component not in self.topology: + log.info('Uninvited component %r' % component) + # -------------- # Init Handlers # -------------- def new_source(self): - if self.state is CONTROL_STATES.RUNNING: - self.state = SOURCES_READY + #if self.state is CONTROL_STATES.RUNNING: + #self.state = SOURCES_READY + pass def new_universal(self): pass @@ -331,7 +432,11 @@ class Controller(object): if self.state is CONTROL_STATES.TERMINATE: return - log.info(' Now Tracking "%s" ' % component) + if component in self.finished: + #log.info("Got heartbeat from supposedly finished component") + return + + log.info('Now Tracking "%s" ' % component) universal = self.new_universal init_handlers = { @@ -342,7 +447,7 @@ class Controller(object): init_handlers.get(component, universal)() self.tracked.add(component) else: - # Some sort of socket collision has occured, this is + # Some sort of socket collision has occurred, this is # a very bad failure mode. raise UnknownChatter(component) @@ -353,8 +458,8 @@ class Controller(object): def fail_universal(self): pass # TODO: this requires higher order functionality - #log.error('System in exception state, shutting down') - #self.shutdown(soft=True) + log.error('System in exception state, shutting down') + self.shutdown(soft=True) def fail(self, component): if self.state is CONTROL_STATES.TERMINATE: @@ -364,16 +469,22 @@ class Controller(object): fail_handlers = { } if component in self.topology or self.freeform: - log.warning('Component "%s" timed out' % component) + log.warning('Component "%s" missed heartbeat' % component) self.tracked.remove(component) - fail_handlers.get(component, universal)() + + # TODO: determine when this propogates to a true + # failure, missing one heartbeat could just mean that + # its CPU overloaded + #fail_handlers.get(component, universal)() # ------------------- # Completion Handling # ------------------- def done(self, component): - log.info('Component "%s" signaled done.' % component) + self.finished.add(component) + self.tracked.discard(component) + log.info('Component "%s" finished.' % component) # -------------- # Error Handling @@ -393,6 +504,7 @@ class Controller(object): if component in self.topology or self.freeform: self.error_replay[(component, time.time())] = failure log.error('Component in exception state: %s' % component) + log.error(str(failure)) exception_handlers.get(component, universal)() else: @@ -406,30 +518,48 @@ class Controller(object): """ Check for proper framing at the transport layer. Seperates the proper frames from anything else that might - be coming over the wire. Which shouldn't happen ... right? + be coming over the wire. """ - identity = msg[0] + + identity = msg[0] # identity of the socket id, status = CONTROL_UNFRAME(msg[1]) - # A component is telling us its alive: + # I'm alive, condemned to be a free process in the cold + # cold dark absurd Zipline universe. + if id is CONTROL_PROTOCOL.READY: + self.responses.add(identity) + return + + # The heartbeat love song between a component and the + # controller if id is CONTROL_PROTOCOL.OK: if status == str(self.ctime): + # Go to your bosom; knock there, and ask your heart what + # it doth know... self.responses.add(identity) + elif status < self.ctime: + # False face must hide what the false heart doth know. + log.warning('Delayed heartbeat received.') else: # Otherwise its something weird and we don't know # what to do so just say so, probably line noise # from ZeroMQ - log.error("Weird stuff happened: %s" % msg) + + # What's in a name? that which we call a rose... + log.error("Weird heartbeat packet happened: %s" % msg) + return # A component is telling us it failed, and how if id is CONTROL_PROTOCOL.EXCEPTION: self.exception(identity, status) + return # A component is telling us its done with work and won't # be talking to us anymore if id is CONTROL_PROTOCOL.DONE: self.done(identity) + return # ------------------- # Hooks for Endpoints @@ -474,54 +604,27 @@ class Controller(object): def do_error_replay(self): for (component, time), error in self.error_replay.iteritems(): - log.info('Error Log for -- %s --:\n%s' % - (component, error)) + log.debug('Component Log for -- %s --:\n%s' % (component, error)) - def shutdown(self, hard=False, soft=True, context=None): + def shutdown(self, hard=False, soft=True): + + assert hard or soft, """ Must specify kill hard or soft """ if self.state is CONTROL_STATES.TERMINATE: return - if not self.polling: - return - - self.polling = False - - assert hard or soft, """ Must specify kill hard or soft """ + self.alive = False if hard: self.state = CONTROL_STATES.TERMINATE - log.info('Hard Shutdown') - #for asoc in self.associated: - #asoc.close() - if soft: self.state = CONTROL_STATES.TERMINATE - log.info('Soft Shutdown') self.send_softkill() - #for asoc in self.associated: - #asoc.close() - self.do_error_replay() -if __name__ == '__main__': - - print 'Running on '\ - 'tcp://127.0.0.1:5000 '\ - 'tcp://127.0.0.1:5001 ' - - controller = Controller( - 'tcp://127.0.0.1:5000', - 'tcp://127.0.0.1:5001', - ) - controller.zmq_flavor = 'green' - - controller.manage( - 'freeform', - [] - ) - controller.run() + self.pub.close() + self.router.close()