diff --git a/zipline/component.py b/zipline/component.py index cfdc8781..9acf2cc2 100644 --- a/zipline/component.py +++ b/zipline/component.py @@ -201,28 +201,25 @@ class Component(object): debug since it mucks up your stacktraces. """ - fail = None - if catch_exceptions: try: self._run() except Exception as exc: - # TODO, we want to do this grab the stack - # frame so we can preserve stacktraces when we - # reraise the exception. + exc_info = sys.exc_info() self.signal_exception(exc) - fail = exc + + # Reraise the exception + raise exc_info[0], exc_info[1], exc_info[2] finally: self.shutdown() self.teardown_sockets() - else: - try: - self._run() - finally: - self.shutdown() - self.teardown_sockets() - + #else: + #try: + #self._run() + #finally: + #self.shutdown() + #self.teardown_sockets() def working(self): """ @@ -234,7 +231,7 @@ class Component(object): """ return (not self.done) - def loop(self): + def loop(self, lockstep=True): """ Loop to do work while we still have work to do. """ @@ -294,6 +291,12 @@ class Component(object): # ---------------------- def signal_exception(self, exc=None, scope=None): + """ + This is *very* important error tracking handler. + + Will inform the system that the component has failed and + how it has failed. + """ if scope == 'algo': self.error_state = COMPONENT_FAILURE.ALGOEXCEPT @@ -427,8 +430,14 @@ class Component(object): if not self.controller: return - self.control_out = self.controller.message_sender(context=self.context) - self.control_in = self.controller.message_listener(context=self.context) + self.control_out = self.controller.message_sender( + identity = self.get_id, + context = self.context, + ) + + self.control_in = self.controller.message_listener( + context = self.context + ) self.poll.register(self.control_in, self.zmq.POLLIN) self.sockets.extend([self.control_in, self.control_out]) diff --git a/zipline/lines.py b/zipline/lines.py index 2bc32aa2..2123cfdf 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -149,6 +149,11 @@ class SimulatedTrading(object): sockets[7], logging = qutil.LOGGER ) + + # TODO: Not freeform + self.con.manage( + 'freeform' + ) self.started = False diff --git a/zipline/messaging.py b/zipline/messaging.py index 5e6ff570..552c1115 100644 --- a/zipline/messaging.py +++ b/zipline/messaging.py @@ -128,7 +128,7 @@ class ComponentHost(Component): return False - def loop(self): + def loop(self, lockstep=True): while not self.is_timed_out(): # wait for synchronization request diff --git a/zipline/monitor.py b/zipline/monitor.py index 5db2ea54..2e0f2066 100644 --- a/zipline/monitor.py +++ b/zipline/monitor.py @@ -1,29 +1,131 @@ -import zmq -from zipline.protocol import CONTROL_PROTOCOL, CONTROL_FRAME +import time +import gevent + +from gevent_zeromq import zmq +import util as qutil + +from protocol import CONTROL_PROTOCOL, CONTROL_FRAME, \ + CONTROL_UNFRAME, CONTROL_STATES + +# TODO: print -> qutil.LOGGER.info + +# When you're cold and waiting for the train... draw ascii art! +# +# Roll Call ( Discovery ) +# ----------------------- +# +# Controller ( 'foo', 'bar', 'fizz', 'pop' ) +# ------------------ +# | | | | +# +---+ +# | 0 | ? ? ? +# +---+ +# | +# IDENTITY: foo +# get message: PROTOCOL.HEARTBEAT +# reply with PROTOCOL.OK +# +# Controller topology = ( 'foo', 'bar', 'fizz', 'pop' ) +# 'foo' in topology = YES -> +# track 'foo' +# ------------------ +# | | | | +# +---+ +# | 1 | ? ? ? +# +---+ + +# Heartbeating +# ------------ +# +# Controller ( time = 2.717828 ) +# ------------------ +# | | | | +# +---+ +---+ +---+ +---+ +# | 0 | | 0 | | 0 | | 0 | +# +---+ +---+ +---+ +---+ +# | +# IDENTITY: foo +# get message: time = 2.717828 +# reply with [ foo, 2.71828 ] +# +# Controller ( foo.status = OK ) +# ------------------ +# | | | | +# +---+ +---+ +---+ +---+ +# | 1 | | 0 | | 0 | | 0 | +# +---+ +---+ +---+ +---+ +# | +# Controller tracks this node as good +# for this heartbeat + +# Shutdown +# -------- +# +# Controller ( state = RUNNING ) +# ------------------ +# | | | | +# +---+ +---+ +---+ +---+ +# | 1 | | 1 | | 1 | | 1 | +# +---+ +---+ +---+ +---+ +# | +# IDENTITY: foo +# send [ DONE ] + +# Controller ( state = SHUTDOWN ) +# Controller topology.remove('foo') +# ------------------ +# | | | +# +---+ +---+ +---+ +---+ +# | | | 1 | | 1 | | 1 | +# +---+ +---+ +---+ +---+ +# | +# IDENTITY: foo +# yield, stop sending messages + +# Termination +# ------------ +# +# Controller ( state = TERMINATE ) +# ------------------ +# | | | | +# +---+ +---+ +---+ +---+ +# | 1 | | 1 | | 1 | | 1 | +# +---+ +---+ +---+ +---+ +# | +# get message PROTOCOL.KILL + +# Controller ( state = TERMINATE ) +# ------------------ +# | | | | +# +---+ +---+ +---+ +---+ +# | 0 | | 0 | | 0 | | 0 | +# +---+ +---+ +---+ +---+ + +class UnknownChatter(Exception): + def __init__(self, name): + self.named = name + def __str__(self): + return """Component calling itself "%s" talking on unexpected channel"""\ + % self.named class Controller(object): """ A N to M messaging system for inter component communication. - Ostensibly a broker of sorts. Putting messages to the broker - is durable, if the broker goes down messages will queue up - until the HWM and then go out when the new broker comes up. - The other end is not durable, it is simply PUB/SUB which has - the benefit of of allowing more fluid time evolution of the - whole system since the messaging passing topology will not - alter itself as a result of more nodes listening. - - The actual brokerin' is either a Python loop ( slow ) or a - zmq.FORWARDER device ( fast ). - - :param pull_socket: Socket to subscribe to for republication of - published messages. The endpoint for - :func message_sender: . :param pub_socket: Socket to publish messages, the starting point of :func message_listener: . + + :param route_socket: Socket to listen for status updates for + the individual components. + :func message_sender: . + :param logging: Logging interface for tracking broker state Defaults to None + Topology is the set of components we expect to show up. + States are the transitions the sytems go through. The + simplest is from RUNNING -> NOT RUNNING . + Usage:: controller = Controller( @@ -33,31 +135,34 @@ class Controller(object): # typically you'd want to run this async to your main # program since it blocks indefinetely. - controller.run() - - - sub = self.controller.message_listener() - push = self.controller.message_sender() - - push.send('DIE') - sub.recv() + controller.manage( + [ TOPOLOGY ] + [ STATES ] + ) """ debug = False + period = 1 - def __init__(self, pull_socket, pub_socket, logging = None): + def __init__(self, pub_socket, route_socket, logging = None): self._ctx = None polling = False + self.polling = polling + + self.tracked = set() + self.responses = set() + + self.ctime = 0 + self.tic = time.time() + self.freeform = False + self.associated = [] - self.pull_socket = pull_socket - self.push_socket = pull_socket # same port self.pub_socket = pub_socket - self.sub_socket = pub_socket # same port - self.terminated = False + self.route_socket = route_socket if logging: self.logging = logging @@ -66,52 +171,67 @@ class Controller(object): self.logging = False self.dologging = False - self.success = 0 - self.failed = 0 + def manage(self, topology, states=None, context=None): + """ + Give the controller a set set of components to manage and + a set of state transitions for the entire system. + """ - def run(self, debug=False, context=None): - """ - Run's the loop for the broker. - """ + # A freeform topology is where we heartbeat with anything + # that shows up. + if topology == 'freeform': + self.freeform = True + self.topology = frozenset([]) + else: + self.freeform = False + self.topology = frozenset(topology) + + default_states = [ + CONTROL_STATES.RUNNING, + CONTROL_STATES.SHUTDOWN, + CONTROL_STATES.TERMINATE, + ] + + self.states = states or default_states self.polling = True + # Start off in RUNNING, state + self.state = self.states[0] + if not context: self._ctx = zmq.Context.instance() else: self._ctx = context - #if not debug: - #return self._poll_fast() # the c loop - #else: + def run(self, flavor): + """ + Abstracts + """ + assert flavor in ['thread', 'mp', 'green'] return self._poll() # use a python loop - def _poll_fast(self): - """ - C version of the polling forwarder. - """ - self.pull = self._ctx.socket(zmq.PULL) - self.pub = self._ctx.socket(zmq.PUB) - - self.pull.bind(self.pull_socket) - self.pub.bind(self.pub_socket) - - zmq.device(zmq.FORWARDER, self.pull, self.pub) + def log_status(self): + print "[Controller] Tracking : %s" % ([c for c in self.tracked],) + gevent.spawn_later(5, self.log_status) def _poll(self): - """ - Python version of the polling forwarder. With logging, - mostly used for debugging. - """ - self.pull = self._ctx.socket(zmq.PULL) self.pub = self._ctx.socket(zmq.PUB) - - self.pull.bind(self.pull_socket) self.pub.bind(self.pub_socket) - self.associated.extend([self.pull, self.pub]) + self.router = self._ctx.socket(zmq.ROUTER) + self.router.bind(self.route_socket) + + self.associated.extend([self.pub, self.router]) + + # Spin the coroutines + gevent.spawn(self.log_status) + gevent.spawn_raw(self.recv_hearts_router) while self.polling: + self.beat() + gevent.sleep(self.period) + try: msg = self.pull.recv() self.pub.send(msg) @@ -126,16 +246,146 @@ class Controller(object): # that we don't loose messages, ever if self.logging: self.logging.error(str(e)) - self.failed += 1 continue + # After loop exits self.terminated = True + def beat(self): + toc = time.time() + + self.ctime += toc - self.tic + self.tic = toc + self.message = str(self.ctime) + + # These the set overloaded operations + # A & B ~ set.intersection + # A - B ~ set.difference + + # * good - Components we are currently tracking and who just sent + # us back the right response. + # * bad - Components we are currently tracking but who did not + # send us back a response. + # * new - Components we haven't heard from yet, but sent back the + # right response. + + good = self.tracked & self.responses + bad = self.tracked - good + new = self.responses - good + + for component in new: + self.new(component) + + for component in bad: + self.fail(component) + + # Reset the responses for this cycle + self.responses = set() + + gevent.spawn_raw(self.pub.send, str(self.ctime)) + + # ------------------ + # Component Handlers + # ------------------ + + # The various "states of being that a component can inform us + # of + def new(self, component): + print '[Controller] Tracking "%s" ' % component + + if component in self.topology or self.freeform: + self.tracked.add(component) + else: + # Some sort of socket collision has occured, this is + # a very bad failure mode. + raise UnknownChatter(component) + + def fail(self, component): + print '[Controller] Component "%s" timed out' % component + self.tracked.remove(component) + + def done(self, component): + # TODO: This will be what we ship off to vbench at some + # point... + # print component finished at self.ctime + pass + + def exception(self, component, failure): + pass + + # -------- + # IO Loops + # -------- + + def recv_hearts(self): + buffer = [] + + while True: + message = self.router.recv(zmq.NOBLOCK, copy=False) + + if message: + buffer.append(message) + if not self.router.rcvmore(): + break + + gevent.spawn_later(0.001, self.handle_pong, buffer) + + def recv_hearts_router(self): + buffer = [] + + while True: + message = self.router.recv() + + if message: + buffer.append(message) + + if not self.router.getsockopt(zmq.RCVMORE): + gevent.spawn_raw(self.handle_recv, buffer[:]) + buffer = [] + + # ----------------- + # Protocol Handling + # ----------------- + + def handle_recv(self, msg): + """ + Check for proper framing at the transport layer. + Seperates the proper frames from anything else that might + be coming over the wire. Which shouldn't happen ... right? + """ + identity = msg[0] + + id, status = CONTROL_UNFRAME(msg[1]) + + # A component is telling us its alive: + if id is CONTROL_PROTOCOL.OK: + + if status == str(self.ctime): + self.responses.add(identity) + else: + # Otherwise its something weird and we don't know + # what to do so just say so + print "Weird stuff happened: %s" % msg[1] + + # A component is telling us it failed, and how + if id is CONTROL_PROTOCOL.EXCEPTION: + self.exception(identity, status) + + # A component is telling us its done with work and won't + # be talking to us anymore + if id is CONTROL_PROTOCOL.DONE: + self.done(identity, status) + # ------------------- # Hooks for Endpoints # ------------------- - def message_sender(self, context=None): + # These are all connects so no complex allocation logic is + # needed. Dealers and Subscribers can all come and go as a + # function of time without impacting flow of the whole + # system. + + def message_sender(self, identity, context = None): """ Spin off a socket used for sending messages to this controller. @@ -144,12 +394,14 @@ class Controller(object): if not context: context = zmq.Context.instance() - s = context.socket(zmq.PUSH) - s.connect(self.push_socket) + s = context.socket(zmq.DEALER) + s.connect(self.route_socket) + s.setsockopt(zmq.IDENTITY, identity) + self.associated.append(s) return s - def message_listener(self, context = None, filters=None): + def message_listener(self, context = None): """ Spin off a socket used for receiving messages from this controller. @@ -159,27 +411,35 @@ class Controller(object): context = zmq.Context.instance() s = context.socket(zmq.SUB) - s.connect(self.sub_socket) - s.setsockopt(zmq.SUBSCRIBE, filters or '') + s.connect(self.pub_socket) + s.setsockopt(zmq.SUBSCRIBE, '') + self.associated.append(s) return s - def shutdown(self, context=None): + def shutdown(self, hard=False, soft=True, context=None): self.polling = False + assert hard or soft, """ Must specify kill hard or soft """ + if not context: context = zmq.Context.instance() - #logging.info('Shutdown controller') + if hard: + self.state = CONTROL_STATES.SHUTDOWN - s = self.message_sender(context) - s.send(CONTROL_FRAME( - 'controller', - CONTROL_PROTOCOL.SHUTDOWN, - )) + print '[Controller] Hard Shutdown' - #for asoc in self.associated: - #asoc.close() + #for asoc in self.associated: + #asoc.close() + + if soft: + self.state = CONTROL_STATES.TERMINATE + + print '[Controller] Soft Shutdown' + + #for asoc in self.associated: + #asoc.close() def destroy(self): """ @@ -190,12 +450,8 @@ class Controller(object): #def __del__(self): #self.shutdown() - def qos(self): - if not self.debug: - return - return float(self.success) / (self.success + self.failed) - if __name__ == '__main__': + print 'Running on ',\ 'tcp://127.0.0.1:5000', \ 'tcp://127.0.0.1:5001', @@ -204,4 +460,8 @@ if __name__ == '__main__': 'tcp://127.0.0.1:5000', 'tcp://127.0.0.1:5001', ) - controller.run() + controller.manage( + 'freeform', + [] + ) + controller.run('green') diff --git a/zipline/protocol.py b/zipline/protocol.py index f8cd1435..a1341179 100644 --- a/zipline/protocol.py +++ b/zipline/protocol.py @@ -134,10 +134,14 @@ from protocol_utils import Enum, FrameExceptionFactory, namedict INVALID_CONTROL_FRAME = FrameExceptionFactory('CONTROL') +CONTROL_STATES = Enum( + 'RUNNING', + 'SHUTDOWN', # a soft kill + 'TERMINATE', # a hard kill +) + CONTROL_PROTOCOL = Enum( - 'INIT' , # 0 - req - 'INFO' , # 1 - req - 'STATUS' , # 2 - req + 'HEARTBEAT' , # 0 - req 'SHUTDOWN' , # 3 - req 'KILL' , # 4 - req @@ -152,7 +156,10 @@ def CONTROL_FRAME(id, status): return msgpack.dumps(tuple([id, status])) -def CONTORL_UNFRAME(msg): +def CONTROL_UNFRAME(msg): + """ + A status code and a message. + """ assert isinstance(msg, basestring) try: @@ -165,8 +172,6 @@ def CONTORL_UNFRAME(msg): raise INVALID_CONTROL_FRAME(msg) except ValueError: raise INVALID_CONTROL_FRAME(msg) - #except AssertionError: - #raise INVALID_CONTROL_FRAME(msg) # ----------------------- # Heartbeat Protocol diff --git a/zipline/simulator.py b/zipline/simulator.py index 8d6480b5..8a3abb27 100644 --- a/zipline/simulator.py +++ b/zipline/simulator.py @@ -41,7 +41,10 @@ class Simulator(ComponentHost): return 'Simple Simulator' def launch_controller(self): - thread = threading.Thread(target=self.controller.run) + thread = threading.Thread( + target=self.controller.run, + args=('thread',) + ) thread.start() self.subthreads.append(thread) diff --git a/zipline/test/test_monitor.py b/zipline/test/test_monitor.py index 0fea158d..e69de29b 100644 --- a/zipline/test/test_monitor.py +++ b/zipline/test/test_monitor.py @@ -1,78 +0,0 @@ -""" -Dummy simulator backported from Qexec for development on Zipline. -""" - -import logging -from multiprocessing import Process -from unittest2 import TestCase - -from zipline.monitor import Controller - -import gevent -from gevent_zeromq import zmq - -ctx = zmq.Context() - -#TODO: disabled by prefixing the test methods with a d -class TestControlProtocol(TestCase): - - def setUpController(self): - self.controller.run() - - def setUp(self): - self.controller = Controller( - 'tcp://127.0.0.1:5000', - 'tcp://127.0.0.1:5001', - ) - self.control_proc = Process(target=self.setUpController) - self.control_proc.start() - - def tearDown(self): - self.control_proc.terminate() - ctx.destroy() - - def asyncMessage(self, socket): - return socket.recv() - - def send_and_receive(self, push, sub, message='\x01'): - msg = gevent.spawn(sub.recv) - push.send(message) - gevent.sleep(0) # explicit gevent yield - msg.join() - self.assertEqual(msg.value, message) - - def dtest_control_message(self): - - sub = self.controller.message_listener(context=ctx) - message = gevent.spawn(self.asyncMessage, sub) - - push = self.controller.message_sender(context=ctx) - - # Don't like introducing time constants but because of - # the way gevent scheduler works zmq will often send all - # the messages off before the other thread even gets to - # listening. - self.send_and_receive(push, sub) - sub.close() - push.close() - - def dtest_control_delivery(self): - # Assert that the number of messages sent on the wire is - # the number of messages received, ie we don't drop any. - # This is of course depenendent on the topology of the - # listeners being fixed. Which normally it isn't. - - sub = self.controller.message_listener(context=ctx) - message = gevent.spawn(self.asyncMessage, sub) - - push = self.controller.message_sender(context=ctx) - - # Don't like introducing time constants but because of - # the way gevent scheduler works zmq will often send all - # the messages off before the other thread even gets to - # listening. - for i in xrange(25): - self.send_and_receive(push, sub) - - sub.close() - push.close()