Exception tracking at the controller level.

This commit is contained in:
Stephen Diehl
2012-04-03 14:04:11 -04:00
parent 9073d19780
commit e8d98084df
7 changed files with 385 additions and 181 deletions
+25 -16
View File
@@ -201,28 +201,25 @@ class Component(object):
debug since it mucks up your stacktraces.
"""
fail = None
if catch_exceptions:
try:
self._run()
except Exception as exc:
# TODO, we want to do this grab the stack
# frame so we can preserve stacktraces when we
# reraise the exception.
exc_info = sys.exc_info()
self.signal_exception(exc)
fail = exc
# Reraise the exception
raise exc_info[0], exc_info[1], exc_info[2]
finally:
self.shutdown()
self.teardown_sockets()
else:
try:
self._run()
finally:
self.shutdown()
self.teardown_sockets()
#else:
#try:
#self._run()
#finally:
#self.shutdown()
#self.teardown_sockets()
def working(self):
"""
@@ -234,7 +231,7 @@ class Component(object):
"""
return (not self.done)
def loop(self):
def loop(self, lockstep=True):
"""
Loop to do work while we still have work to do.
"""
@@ -294,6 +291,12 @@ class Component(object):
# ----------------------
def signal_exception(self, exc=None, scope=None):
"""
This is *very* important error tracking handler.
Will inform the system that the component has failed and
how it has failed.
"""
if scope == 'algo':
self.error_state = COMPONENT_FAILURE.ALGOEXCEPT
@@ -427,8 +430,14 @@ class Component(object):
if not self.controller:
return
self.control_out = self.controller.message_sender(context=self.context)
self.control_in = self.controller.message_listener(context=self.context)
self.control_out = self.controller.message_sender(
identity = self.get_id,
context = self.context,
)
self.control_in = self.controller.message_listener(
context = self.context
)
self.poll.register(self.control_in, self.zmq.POLLIN)
self.sockets.extend([self.control_in, self.control_out])
+5
View File
@@ -149,6 +149,11 @@ class SimulatedTrading(object):
sockets[7],
logging = qutil.LOGGER
)
# TODO: Not freeform
self.con.manage(
'freeform'
)
self.started = False
+1 -1
View File
@@ -128,7 +128,7 @@ class ComponentHost(Component):
return False
def loop(self):
def loop(self, lockstep=True):
while not self.is_timed_out():
# wait for synchronization request
+339 -79
View File
@@ -1,29 +1,131 @@
import zmq
from zipline.protocol import CONTROL_PROTOCOL, CONTROL_FRAME
import time
import gevent
from gevent_zeromq import zmq
import util as qutil
from protocol import CONTROL_PROTOCOL, CONTROL_FRAME, \
CONTROL_UNFRAME, CONTROL_STATES
# TODO: print -> qutil.LOGGER.info
# When you're cold and waiting for the train... draw ascii art!
#
# Roll Call ( Discovery )
# -----------------------
#
# Controller ( 'foo', 'bar', 'fizz', 'pop' )
# ------------------
# | | | |
# +---+
# | 0 | ? ? ?
# +---+
# |
# IDENTITY: foo
# get message: PROTOCOL.HEARTBEAT
# reply with PROTOCOL.OK
#
# Controller topology = ( 'foo', 'bar', 'fizz', 'pop' )
# 'foo' in topology = YES ->
# track 'foo'
# ------------------
# | | | |
# +---+
# | 1 | ? ? ?
# +---+
# Heartbeating
# ------------
#
# Controller ( time = 2.717828 )
# ------------------
# | | | |
# +---+ +---+ +---+ +---+
# | 0 | | 0 | | 0 | | 0 |
# +---+ +---+ +---+ +---+
# |
# IDENTITY: foo
# get message: time = 2.717828
# reply with [ foo, 2.71828 ]
#
# Controller ( foo.status = OK )
# ------------------
# | | | |
# +---+ +---+ +---+ +---+
# | 1 | | 0 | | 0 | | 0 |
# +---+ +---+ +---+ +---+
# |
# Controller tracks this node as good
# for this heartbeat
# Shutdown
# --------
#
# Controller ( state = RUNNING )
# ------------------
# | | | |
# +---+ +---+ +---+ +---+
# | 1 | | 1 | | 1 | | 1 |
# +---+ +---+ +---+ +---+
# |
# IDENTITY: foo
# send [ DONE ]
# Controller ( state = SHUTDOWN )
# Controller topology.remove('foo')
# ------------------
# | | |
# +---+ +---+ +---+ +---+
# | | | 1 | | 1 | | 1 |
# +---+ +---+ +---+ +---+
# |
# IDENTITY: foo
# yield, stop sending messages
# Termination
# ------------
#
# Controller ( state = TERMINATE )
# ------------------
# | | | |
# +---+ +---+ +---+ +---+
# | 1 | | 1 | | 1 | | 1 |
# +---+ +---+ +---+ +---+
# |
# get message PROTOCOL.KILL
# Controller ( state = TERMINATE )
# ------------------
# | | | |
# +---+ +---+ +---+ +---+
# | 0 | | 0 | | 0 | | 0 |
# +---+ +---+ +---+ +---+
class UnknownChatter(Exception):
def __init__(self, name):
self.named = name
def __str__(self):
return """Component calling itself "%s" talking on unexpected channel"""\
% self.named
class Controller(object):
"""
A N to M messaging system for inter component communication.
Ostensibly a broker of sorts. Putting messages to the broker
is durable, if the broker goes down messages will queue up
until the HWM and then go out when the new broker comes up.
The other end is not durable, it is simply PUB/SUB which has
the benefit of of allowing more fluid time evolution of the
whole system since the messaging passing topology will not
alter itself as a result of more nodes listening.
The actual brokerin' is either a Python loop ( slow ) or a
zmq.FORWARDER device ( fast ).
:param pull_socket: Socket to subscribe to for republication of
published messages. The endpoint for
:func message_sender: .
:param pub_socket: Socket to publish messages, the starting
point of :func message_listener: .
:param route_socket: Socket to listen for status updates for
the individual components.
:func message_sender: .
:param logging: Logging interface for tracking broker state
Defaults to None
Topology is the set of components we expect to show up.
States are the transitions the sytems go through. The
simplest is from RUNNING -> NOT RUNNING .
Usage::
controller = Controller(
@@ -33,31 +135,34 @@ class Controller(object):
# typically you'd want to run this async to your main
# program since it blocks indefinetely.
controller.run()
sub = self.controller.message_listener()
push = self.controller.message_sender()
push.send('DIE')
sub.recv()
controller.manage(
[ TOPOLOGY ]
[ STATES ]
)
"""
debug = False
period = 1
def __init__(self, pull_socket, pub_socket, logging = None):
def __init__(self, pub_socket, route_socket, logging = None):
self._ctx = None
polling = False
self.polling = polling
self.tracked = set()
self.responses = set()
self.ctime = 0
self.tic = time.time()
self.freeform = False
self.associated = []
self.pull_socket = pull_socket
self.push_socket = pull_socket # same port
self.pub_socket = pub_socket
self.sub_socket = pub_socket # same port
self.terminated = False
self.route_socket = route_socket
if logging:
self.logging = logging
@@ -66,52 +171,67 @@ class Controller(object):
self.logging = False
self.dologging = False
self.success = 0
self.failed = 0
def manage(self, topology, states=None, context=None):
"""
Give the controller a set set of components to manage and
a set of state transitions for the entire system.
"""
def run(self, debug=False, context=None):
"""
Run's the loop for the broker.
"""
# A freeform topology is where we heartbeat with anything
# that shows up.
if topology == 'freeform':
self.freeform = True
self.topology = frozenset([])
else:
self.freeform = False
self.topology = frozenset(topology)
default_states = [
CONTROL_STATES.RUNNING,
CONTROL_STATES.SHUTDOWN,
CONTROL_STATES.TERMINATE,
]
self.states = states or default_states
self.polling = True
# Start off in RUNNING, state
self.state = self.states[0]
if not context:
self._ctx = zmq.Context.instance()
else:
self._ctx = context
#if not debug:
#return self._poll_fast() # the c loop
#else:
def run(self, flavor):
"""
Abstracts
"""
assert flavor in ['thread', 'mp', 'green']
return self._poll() # use a python loop
def _poll_fast(self):
"""
C version of the polling forwarder.
"""
self.pull = self._ctx.socket(zmq.PULL)
self.pub = self._ctx.socket(zmq.PUB)
self.pull.bind(self.pull_socket)
self.pub.bind(self.pub_socket)
zmq.device(zmq.FORWARDER, self.pull, self.pub)
def log_status(self):
print "[Controller] Tracking : %s" % ([c for c in self.tracked],)
gevent.spawn_later(5, self.log_status)
def _poll(self):
"""
Python version of the polling forwarder. With logging,
mostly used for debugging.
"""
self.pull = self._ctx.socket(zmq.PULL)
self.pub = self._ctx.socket(zmq.PUB)
self.pull.bind(self.pull_socket)
self.pub.bind(self.pub_socket)
self.associated.extend([self.pull, self.pub])
self.router = self._ctx.socket(zmq.ROUTER)
self.router.bind(self.route_socket)
self.associated.extend([self.pub, self.router])
# Spin the coroutines
gevent.spawn(self.log_status)
gevent.spawn_raw(self.recv_hearts_router)
while self.polling:
self.beat()
gevent.sleep(self.period)
try:
msg = self.pull.recv()
self.pub.send(msg)
@@ -126,16 +246,146 @@ class Controller(object):
# that we don't loose messages, ever
if self.logging:
self.logging.error(str(e))
self.failed += 1
continue
# After loop exits
self.terminated = True
def beat(self):
toc = time.time()
self.ctime += toc - self.tic
self.tic = toc
self.message = str(self.ctime)
# These the set overloaded operations
# A & B ~ set.intersection
# A - B ~ set.difference
# * good - Components we are currently tracking and who just sent
# us back the right response.
# * bad - Components we are currently tracking but who did not
# send us back a response.
# * new - Components we haven't heard from yet, but sent back the
# right response.
good = self.tracked & self.responses
bad = self.tracked - good
new = self.responses - good
for component in new:
self.new(component)
for component in bad:
self.fail(component)
# Reset the responses for this cycle
self.responses = set()
gevent.spawn_raw(self.pub.send, str(self.ctime))
# ------------------
# Component Handlers
# ------------------
# The various "states of being that a component can inform us
# of
def new(self, component):
print '[Controller] Tracking "%s" ' % component
if component in self.topology or self.freeform:
self.tracked.add(component)
else:
# Some sort of socket collision has occured, this is
# a very bad failure mode.
raise UnknownChatter(component)
def fail(self, component):
print '[Controller] Component "%s" timed out' % component
self.tracked.remove(component)
def done(self, component):
# TODO: This will be what we ship off to vbench at some
# point...
# print component finished at self.ctime
pass
def exception(self, component, failure):
pass
# --------
# IO Loops
# --------
def recv_hearts(self):
buffer = []
while True:
message = self.router.recv(zmq.NOBLOCK, copy=False)
if message:
buffer.append(message)
if not self.router.rcvmore():
break
gevent.spawn_later(0.001, self.handle_pong, buffer)
def recv_hearts_router(self):
buffer = []
while True:
message = self.router.recv()
if message:
buffer.append(message)
if not self.router.getsockopt(zmq.RCVMORE):
gevent.spawn_raw(self.handle_recv, buffer[:])
buffer = []
# -----------------
# Protocol Handling
# -----------------
def handle_recv(self, msg):
"""
Check for proper framing at the transport layer.
Seperates the proper frames from anything else that might
be coming over the wire. Which shouldn't happen ... right?
"""
identity = msg[0]
id, status = CONTROL_UNFRAME(msg[1])
# A component is telling us its alive:
if id is CONTROL_PROTOCOL.OK:
if status == str(self.ctime):
self.responses.add(identity)
else:
# Otherwise its something weird and we don't know
# what to do so just say so
print "Weird stuff happened: %s" % msg[1]
# A component is telling us it failed, and how
if id is CONTROL_PROTOCOL.EXCEPTION:
self.exception(identity, status)
# A component is telling us its done with work and won't
# be talking to us anymore
if id is CONTROL_PROTOCOL.DONE:
self.done(identity, status)
# -------------------
# Hooks for Endpoints
# -------------------
def message_sender(self, context=None):
# These are all connects so no complex allocation logic is
# needed. Dealers and Subscribers can all come and go as a
# function of time without impacting flow of the whole
# system.
def message_sender(self, identity, context = None):
"""
Spin off a socket used for sending messages to this
controller.
@@ -144,12 +394,14 @@ class Controller(object):
if not context:
context = zmq.Context.instance()
s = context.socket(zmq.PUSH)
s.connect(self.push_socket)
s = context.socket(zmq.DEALER)
s.connect(self.route_socket)
s.setsockopt(zmq.IDENTITY, identity)
self.associated.append(s)
return s
def message_listener(self, context = None, filters=None):
def message_listener(self, context = None):
"""
Spin off a socket used for receiving messages from this
controller.
@@ -159,27 +411,35 @@ class Controller(object):
context = zmq.Context.instance()
s = context.socket(zmq.SUB)
s.connect(self.sub_socket)
s.setsockopt(zmq.SUBSCRIBE, filters or '')
s.connect(self.pub_socket)
s.setsockopt(zmq.SUBSCRIBE, '')
self.associated.append(s)
return s
def shutdown(self, context=None):
def shutdown(self, hard=False, soft=True, context=None):
self.polling = False
assert hard or soft, """ Must specify kill hard or soft """
if not context:
context = zmq.Context.instance()
#logging.info('Shutdown controller')
if hard:
self.state = CONTROL_STATES.SHUTDOWN
s = self.message_sender(context)
s.send(CONTROL_FRAME(
'controller',
CONTROL_PROTOCOL.SHUTDOWN,
))
print '[Controller] Hard Shutdown'
#for asoc in self.associated:
#asoc.close()
#for asoc in self.associated:
#asoc.close()
if soft:
self.state = CONTROL_STATES.TERMINATE
print '[Controller] Soft Shutdown'
#for asoc in self.associated:
#asoc.close()
def destroy(self):
"""
@@ -190,12 +450,8 @@ class Controller(object):
#def __del__(self):
#self.shutdown()
def qos(self):
if not self.debug:
return
return float(self.success) / (self.success + self.failed)
if __name__ == '__main__':
print 'Running on ',\
'tcp://127.0.0.1:5000', \
'tcp://127.0.0.1:5001',
@@ -204,4 +460,8 @@ if __name__ == '__main__':
'tcp://127.0.0.1:5000',
'tcp://127.0.0.1:5001',
)
controller.run()
controller.manage(
'freeform',
[]
)
controller.run('green')
+11 -6
View File
@@ -134,10 +134,14 @@ from protocol_utils import Enum, FrameExceptionFactory, namedict
INVALID_CONTROL_FRAME = FrameExceptionFactory('CONTROL')
CONTROL_STATES = Enum(
'RUNNING',
'SHUTDOWN', # a soft kill
'TERMINATE', # a hard kill
)
CONTROL_PROTOCOL = Enum(
'INIT' , # 0 - req
'INFO' , # 1 - req
'STATUS' , # 2 - req
'HEARTBEAT' , # 0 - req
'SHUTDOWN' , # 3 - req
'KILL' , # 4 - req
@@ -152,7 +156,10 @@ def CONTROL_FRAME(id, status):
return msgpack.dumps(tuple([id, status]))
def CONTORL_UNFRAME(msg):
def CONTROL_UNFRAME(msg):
"""
A status code and a message.
"""
assert isinstance(msg, basestring)
try:
@@ -165,8 +172,6 @@ def CONTORL_UNFRAME(msg):
raise INVALID_CONTROL_FRAME(msg)
except ValueError:
raise INVALID_CONTROL_FRAME(msg)
#except AssertionError:
#raise INVALID_CONTROL_FRAME(msg)
# -----------------------
# Heartbeat Protocol
+4 -1
View File
@@ -41,7 +41,10 @@ class Simulator(ComponentHost):
return 'Simple Simulator'
def launch_controller(self):
thread = threading.Thread(target=self.controller.run)
thread = threading.Thread(
target=self.controller.run,
args=('thread',)
)
thread.start()
self.subthreads.append(thread)
-78
View File
@@ -1,78 +0,0 @@
"""
Dummy simulator backported from Qexec for development on Zipline.
"""
import logging
from multiprocessing import Process
from unittest2 import TestCase
from zipline.monitor import Controller
import gevent
from gevent_zeromq import zmq
ctx = zmq.Context()
#TODO: disabled by prefixing the test methods with a d
class TestControlProtocol(TestCase):
def setUpController(self):
self.controller.run()
def setUp(self):
self.controller = Controller(
'tcp://127.0.0.1:5000',
'tcp://127.0.0.1:5001',
)
self.control_proc = Process(target=self.setUpController)
self.control_proc.start()
def tearDown(self):
self.control_proc.terminate()
ctx.destroy()
def asyncMessage(self, socket):
return socket.recv()
def send_and_receive(self, push, sub, message='\x01'):
msg = gevent.spawn(sub.recv)
push.send(message)
gevent.sleep(0) # explicit gevent yield
msg.join()
self.assertEqual(msg.value, message)
def dtest_control_message(self):
sub = self.controller.message_listener(context=ctx)
message = gevent.spawn(self.asyncMessage, sub)
push = self.controller.message_sender(context=ctx)
# Don't like introducing time constants but because of
# the way gevent scheduler works zmq will often send all
# the messages off before the other thread even gets to
# listening.
self.send_and_receive(push, sub)
sub.close()
push.close()
def dtest_control_delivery(self):
# Assert that the number of messages sent on the wire is
# the number of messages received, ie we don't drop any.
# This is of course depenendent on the topology of the
# listeners being fixed. Which normally it isn't.
sub = self.controller.message_listener(context=ctx)
message = gevent.spawn(self.asyncMessage, sub)
push = self.controller.message_sender(context=ctx)
# Don't like introducing time constants but because of
# the way gevent scheduler works zmq will often send all
# the messages off before the other thread even gets to
# listening.
for i in xrange(25):
self.send_and_receive(push, sub)
sub.close()
push.close()