Restore new monitor system and test suite.

This commit is contained in:
Stephen Diehl
2012-07-03 08:35:16 -04:00
parent 6dbbcfaf66
commit 3c5cb2a451
2 changed files with 250 additions and 104 deletions
+43
View File
@@ -0,0 +1,43 @@
import gevent
from logbook.compat import LoggingHandler
from unittest2 import TestCase, skip
from zipline.core.monitor import Controller
class TestMonitor(TestCase):
def setUp(self):
self.log_handler = LoggingHandler()
self.log_handler.push_application()
def tearDown(self):
self.log_handler.pop_application()
def test_init(self):
pub_socket = 'tcp://127.0.0.1:5000'
route_socket = 'tcp://127.0.0.1:5001'
con = Controller(pub_socket, route_socket)
con.manage([])
def test_init_topology(self):
pub_socket = 'tcp://127.0.0.1:5000'
route_socket = 'tcp://127.0.0.1:5001'
con = Controller(pub_socket, route_socket, )
con.manage([ 'a', 'b', 'c', 'd' ])
@skip
def test_poll(self):
from mock_zmq import zmq_synthetic
pub_socket = 'tcp://127.0.0.1:5000'
route_socket = 'tcp://127.0.0.1:5001'
cancel_socket = 'tcp://127.0.0.1:5002'
con = Controller(pub_socket, route_socket, cancel_socket)
con.manage([ 'a', 'b', 'c', 'd' ])
con.zmq = zmq_synthetic
con.zmq_flavor = 'green'
con.period = 0.00001
gevent.spawn(con.run).join(timeout=con.period)
+207 -104
View File
@@ -1,9 +1,12 @@
import os
import zmq
import sys
import time
import gevent
import itertools
import logbook
import gevent_zeromq
from signal import SIGHUP, SIGINT
from collections import OrderedDict
@@ -11,14 +14,17 @@ from zipline.utils.gpoll import _Poller as GeventPoller
from zipline.protocol import CONTROL_PROTOCOL, CONTROL_FRAME, \
CONTROL_UNFRAME, CONTROL_STATES, INVALID_CONTROL_FRAME \
from zipline.utils.protocol_utils import ndict
INIT, SOURCES_READY, RUNNING, TERMINATE = CONTROL_STATES
CONTROLLER_TRANSITIONS = frozenset([
(-1 , INIT),
(INIT , SOURCES_READY),
(SOURCES_READY , RUNNING),
(INIT , TERMINATE),
(SOURCES_READY , TERMINATE),
(INIT , TERMINATE), # pseudo failure mode
(SOURCES_READY , TERMINATE), # pseudo failure mode
(RUNNING , TERMINATE),
])
@@ -32,6 +38,18 @@ class UnknownChatter(Exception):
log = logbook.Logger('Controller')
# The scalars determining the timing of the monitor behavior for
# the system.
PARAMETERS = ndict(dict(
GENERATIONAL_PERIOD = 8,
ALLOWED_SKIPPED_HEARTBEATS = 3,
ALLOWED_INVALID_HEARTBEATS = 3,
PRESTART_HEARBEATS = 3,
SOURCES_START_HEARTBEATS = 3,
SYSTEM_TIMEOUT = 50,
))
class Controller(object):
"""
A N to M messaging system for inter component communication.
@@ -43,38 +61,24 @@ class Controller(object):
the individual components.
:func message_sender: .
Topology is the set of components we expect to show up.
States are the transitions the sytems go through. The
simplest is from RUNNING -> NOT RUNNING .
Usage::
controller = Controller(
'tcp://127.0.0.1:5000',
'tcp://127.0.0.1:5001',
)
# typically you'd want to run this async to your main
# program since it blocks indefinetely.
controller.manage(
[ TOPOLOGY ]
[ STATES ]
)
"""
debug = False
period = 1
# Turn on debug for verbose logging of the system.
debug = True
period = PARAMETERS.GENERATIONAL_PERIOD
def __init__(self, pub_socket, route_socket):
def __init__(self, pub_socket, route_socket, nosignals=False):
self.nosignals = False
self.context = None
self.zmq = None
self.zmq_poller = None
self.running = False
self.polling = False
self.alive = False
self.tracked = set()
self.finished = set()
self.responses = set()
self.ctime = 0
@@ -114,7 +118,7 @@ class Controller(object):
self.zmq_poller = self.zmq.Poller
return
def manage(self, topology, states=None, context=None):
def manage(self, topology):
"""
Give the controller a set set of components to manage and
a set of state transitions for the entire system.
@@ -128,31 +132,42 @@ class Controller(object):
else:
self.freeform = False
self.topology = frozenset(topology)
self.polling = True
self.state = CONTROL_STATES.INIT
self.alive = True
@property
def state(self):
#log.info('returned %s' % self._state)
return self._state
@state.setter
def state(self, new):
old, self._state = self._state, new
old = self._state
if (old, new) not in CONTROLLER_TRANSITIONS:
raise RuntimeError("Invalid State Transition : %s -> %s" %(old, new))
if (old, new) in CONTROLLER_TRANSITIONS:
self._state = new
log.info("State Transition : %s -> %s" % (old, self._state))
else:
log.info("State Transition : %s -> %s" %(old, new))
raise RuntimeError("Invalid State Transition : %s -> %s" %(old, new))
def run(self):
self.running = True
self.init_zmq(self.zmq_flavor)
self.state = CONTROL_STATES.INIT
# Interpreter SIDE EFFECT
# -----------------------
# The last breathe of the interpreter will assume that we've
# failed unless we specify otherwise.
sys.exitfunc = self.signal_interrupt
# We overload this if ( and only if ) the topology exits
# cleanly. This prevents failure modes where the monitor
# dies.
try:
return self._poll() # use a python loop
except KeyboardInterrupt:
log.debug('Shutdown event loop')
log.info('Shutdown event loop')
def log_status(self):
"""
@@ -172,6 +187,13 @@ class Controller(object):
# Publications
# -------------
def send_go(self):
go_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.GO,
''
)
self.pub.send(go_frame)
def send_heart(self):
if not self.running:
return
@@ -210,45 +232,53 @@ class Controller(object):
assert self.route_socket
assert self.pub_socket
assert self.cancel_socket
assert self.topology,\
""""Must define topology to monitor, call setup_controller() on
your Zipline. """
# -- Publish --
# =============
self.pub = self.context.socket(self.zmq.PUB)
self.pub.bind(self.pub_socket)
# -- Cancel --
# =============
assert isinstance(self.cancel_socket,basestring), self.cancel_socket
self.cancel = self.context.socket(self.zmq.REP)
self.cancel.connect(self.cancel_socket)
self.pub.setsockopt(zmq.LINGER, 0)
# -- Router --
# =============
self.router = self.context.socket(self.zmq.ROUTER)
self.router.bind(self.route_socket)
self.router.setsockopt(zmq.LINGER, 0)
poller = self.zmq.Poller()
poller.register(self.router, self.zmq.POLLIN)
poller.register(self.cancel, self.zmq.POLLIN)
self.associated += [self.pub, self.router, self.cancel]
self.associated += [self.pub, self.router]
# TODO: actually do this
self.state = CONTROL_STATES.SOURCES_READY
self.state = CONTROL_STATES.RUNNING
buffer = []
# ===================
# Heartbeat Iteration
# ===================
for i in itertools.count(0):
self.log_status()
# Reset the responses for this cycle
self.responses = set()
# broadcast the heartbeat packet
self.ctime = time.time()
self.send_heart()
while self.polling:
# Reset the responses for this cycle
# ==============
# Hearbeat Cycle
# ==============
# Wait the responses
while self.alive:
socks = dict(poller.poll(self.period))
tic = time.time()
@@ -270,28 +300,79 @@ class Controller(object):
log.error('Invalid frame', rawmessage)
pass
if socks.get(self.cancel) == self.zmq.POLLIN:
log.info('Received Cancellation')
rawmessage = self.cancel.recv()
self.cancel.send('')
self.shutdown(soft=True)
break
# ================
# Heartbeat Stats
# ================
self.beat()
complete = self.beat()
# ================
# Topology Status
# ================
# Is the entire topology told us its DONE
done = len(self.finished) == len(self.topology)
# Is the entire topology shown up to the party
complete = len(self.tracked) == len(self.topology)
if complete:
self.send_go()
log.info('Heartbeat (%s, %s)' % (done, complete))
# ================
# Exit Strategies
# ================
if self.zmq_flavor == 'green':
gevent.sleep(0)
if self.state is CONTROL_STATES.TERMINATE:
# Will also fall out of loop when done, if using
# non-freeform topology
if done:
log.info('Entire topology exited cleanly')
self.shutdown(soft=True)
# Noop exit func
sys.exitfunc = lambda: None
# Send SIGHUP to buritto
self.signal_hangup()
if not self.alive:
break
if not self.polling:
break
def signal_hangup(self):
"""
A clean exit, inform the burrito ( and arbiter ) that
we're good. The topology exited cleanly and we can prove
it.
"""
if not self.nosignals:
ppid = os.getpid()
os.kill(ppid, SIGHUP)
else:
log.warning("Would SIGHUP here, but disabled")
# After loop exits
self.terminated = True
def signal_interrupt(self):
"""
Send a SIGINT in the error mode that the monitor's
interpreter exits. If the monitor dies the system is
considered a failure.
"""
if not self.nosignals:
ppid = os.getpid()
os.kill(ppid, SIGINT)
else:
log.warning("Would SIGINT here, but disabled")
def beat(self):
"""
The tracking logic of the system. It's the "stethoscope"
that inspects to the heartbeats in a generation and
infers the state of the system from the responses.
"""
# These the set overloaded operations
# A & B ~ set.intersection
@@ -303,24 +384,44 @@ class Controller(object):
# send us back a response.
# * new - Components we haven't heard from yet, but sent back the
# right response.
# * finished - Components we were tracking but have now
# finished, when this set goes to zero this
# triggers the end of the topology.
good = self.tracked & self.responses
bad = self.tracked - good
new = self.responses - good
missing = self.topology - self.tracked
for component in new:
self.new(component)
if self.debug:
log.info('New component %r' % component)
for component in bad:
self.fail(component)
if self.debug:
log.info('Bad component %r' % component)
if self.debug:
for component in missing:
log.info('Missing component %r' % component)
for component in self.tracked:
if component not in self.topology:
log.info('Uninvited component %r' % component)
# --------------
# Init Handlers
# --------------
def new_source(self):
if self.state is CONTROL_STATES.RUNNING:
self.state = SOURCES_READY
#if self.state is CONTROL_STATES.RUNNING:
#self.state = SOURCES_READY
pass
def new_universal(self):
pass
@@ -331,7 +432,11 @@ class Controller(object):
if self.state is CONTROL_STATES.TERMINATE:
return
log.info(' Now Tracking "%s" ' % component)
if component in self.finished:
#log.info("Got heartbeat from supposedly finished component")
return
log.info('Now Tracking "%s" ' % component)
universal = self.new_universal
init_handlers = {
@@ -342,7 +447,7 @@ class Controller(object):
init_handlers.get(component, universal)()
self.tracked.add(component)
else:
# Some sort of socket collision has occured, this is
# Some sort of socket collision has occurred, this is
# a very bad failure mode.
raise UnknownChatter(component)
@@ -353,8 +458,8 @@ class Controller(object):
def fail_universal(self):
pass
# TODO: this requires higher order functionality
#log.error('System in exception state, shutting down')
#self.shutdown(soft=True)
log.error('System in exception state, shutting down')
self.shutdown(soft=True)
def fail(self, component):
if self.state is CONTROL_STATES.TERMINATE:
@@ -364,16 +469,22 @@ class Controller(object):
fail_handlers = { }
if component in self.topology or self.freeform:
log.warning('Component "%s" timed out' % component)
log.warning('Component "%s" missed heartbeat' % component)
self.tracked.remove(component)
fail_handlers.get(component, universal)()
# TODO: determine when this propogates to a true
# failure, missing one heartbeat could just mean that
# its CPU overloaded
#fail_handlers.get(component, universal)()
# -------------------
# Completion Handling
# -------------------
def done(self, component):
log.info('Component "%s" signaled done.' % component)
self.finished.add(component)
self.tracked.discard(component)
log.info('Component "%s" finished.' % component)
# --------------
# Error Handling
@@ -393,6 +504,7 @@ class Controller(object):
if component in self.topology or self.freeform:
self.error_replay[(component, time.time())] = failure
log.error('Component in exception state: %s' % component)
log.error(str(failure))
exception_handlers.get(component, universal)()
else:
@@ -406,30 +518,48 @@ class Controller(object):
"""
Check for proper framing at the transport layer.
Seperates the proper frames from anything else that might
be coming over the wire. Which shouldn't happen ... right?
be coming over the wire.
"""
identity = msg[0]
identity = msg[0] # identity of the socket
id, status = CONTROL_UNFRAME(msg[1])
# A component is telling us its alive:
# I'm alive, condemned to be a free process in the cold
# cold dark absurd Zipline universe.
if id is CONTROL_PROTOCOL.READY:
self.responses.add(identity)
return
# The heartbeat love song between a component and the
# controller
if id is CONTROL_PROTOCOL.OK:
if status == str(self.ctime):
# Go to your bosom; knock there, and ask your heart what
# it doth know...
self.responses.add(identity)
elif status < self.ctime:
# False face must hide what the false heart doth know.
log.warning('Delayed heartbeat received.')
else:
# Otherwise its something weird and we don't know
# what to do so just say so, probably line noise
# from ZeroMQ
log.error("Weird stuff happened: %s" % msg)
# What's in a name? that which we call a rose...
log.error("Weird heartbeat packet happened: %s" % msg)
return
# A component is telling us it failed, and how
if id is CONTROL_PROTOCOL.EXCEPTION:
self.exception(identity, status)
return
# A component is telling us its done with work and won't
# be talking to us anymore
if id is CONTROL_PROTOCOL.DONE:
self.done(identity)
return
# -------------------
# Hooks for Endpoints
@@ -474,54 +604,27 @@ class Controller(object):
def do_error_replay(self):
for (component, time), error in self.error_replay.iteritems():
log.info('Error Log for -- %s --:\n%s' %
(component, error))
log.debug('Component Log for -- %s --:\n%s' % (component, error))
def shutdown(self, hard=False, soft=True, context=None):
def shutdown(self, hard=False, soft=True):
assert hard or soft, """ Must specify kill hard or soft """
if self.state is CONTROL_STATES.TERMINATE:
return
if not self.polling:
return
self.polling = False
assert hard or soft, """ Must specify kill hard or soft """
self.alive = False
if hard:
self.state = CONTROL_STATES.TERMINATE
log.info('Hard Shutdown')
#for asoc in self.associated:
#asoc.close()
if soft:
self.state = CONTROL_STATES.TERMINATE
log.info('Soft Shutdown')
self.send_softkill()
#for asoc in self.associated:
#asoc.close()
self.do_error_replay()
if __name__ == '__main__':
print 'Running on '\
'tcp://127.0.0.1:5000 '\
'tcp://127.0.0.1:5001 '
controller = Controller(
'tcp://127.0.0.1:5000',
'tcp://127.0.0.1:5001',
)
controller.zmq_flavor = 'green'
controller.manage(
'freeform',
[]
)
controller.run()
self.pub.close()
self.router.close()