diff --git a/zipline/component.py b/zipline/component.py index 3ffdc755..b644eb5a 100644 --- a/zipline/component.py +++ b/zipline/component.py @@ -7,7 +7,7 @@ import socket import humanhash import zipline.util as qutil -from zipline.protocol import CONTROL_PROTOCOL +from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE class Component(object): @@ -39,7 +39,10 @@ class Component(object): self.gevent_needed = False self.killed = False self.heartbeat_timeout = 2000 + self.state_flag = COMPONENT_STATE.OK # OK | DONE | EXCEPTION + # Humanhashes make this way easier to debug because they + # stick in your mind unlike a 32 byte string of random hex. self.guid = uuid.uuid4() self.huid = humanhash.humanize(self.guid.hex) @@ -47,10 +50,6 @@ class Component(object): # Core Methods # ------------ - @property - def get_id(self): - raise NotImplementedError - def open(self): raise NotImplementedError @@ -71,7 +70,7 @@ class Component(object): raise NotImplementedError def _run(self): - self.done = False + self.done = False # TODO: use state flag self.sockets = [] if self.gevent_needed: @@ -95,19 +94,27 @@ class Component(object): sock.close() def run(self, catch_exceptions=False): + """ + Run the component. + + Optionally takes an argument to catch and log all exceptions raised + during execution; use this with care since it makes it very hard to + debug since it mucks up your stacktraces. + """ fail = None - # Catching all exceptions makes this really hard to - # debug, is it with care. if catch_exceptions: try: self._run() - except Exception as e: - qutil.LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id)) - fail = e + except Exception as exc: + # TODO, we want to grab the stack + # frame so we can preserve stacktraces when we + # reraise the exception. 
+ self.signal_exception(exc) + fail = exc finally: - if(self.context != None): + if self.context: self.context.destroy() if fail: raise fail @@ -117,35 +124,61 @@ class Component(object): self.context.destroy() def loop(self): - while not self.done: + """ + Loop to do work while we still have work to do. + """ + while not self.done: # TODO: use state flag self.confirm() self.do_work() - # ----------- - # Messaging - # ----------- - - def signal_done(self): - #notify down stream components that we're done - if(self.out_socket != None): - self.out_socket.send(str(CONTROL_PROTOCOL.DONE)) - #notify host we're done - - # TODO: proper framing - self.sync_socket.send(self.get_id + ":" + str(CONTROL_PROTOCOL.DONE)) - - self.receive_sync_ack() - #notify internal work look that we're done - self.done = True - def confirm(self): - # send a synchronization request to the host + """ + Send a synchronization request to the host. + """ # TODO: proper framing self.sync_socket.send(self.get_id + ":RUN") self.receive_sync_ack() # blocking + # ---------------------- + # Internal Maintenance + # ---------------------- + + def signal_exception(self, exc=None): + self.state_flag = COMPONENT_STATE.EXCEPTION + qutil.LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id)) + + def signal_done(self): + """ + Notify downstream components that we're done. + """ + + self.state_flag = COMPONENT_STATE.DONE + + if self.out_socket: + self.out_socket.send(str(CONTROL_PROTOCOL.DONE)) + + #notify host we're done + # TODO: proper framing + self.sync_socket.send(self.get_id + ":" + str(CONTROL_PROTOCOL.DONE)) + + self.receive_sync_ack() + #notify internal work loop that we're done + self.done = True # TODO: use state flag + + # ----------- + # Messaging + # ----------- + + def setup_poller(self): + """ + Setup the poller used for multiplexing the incoming data + handling sockets. 
+ """ + + self.poll = self.zmq.Poller() + def receive_sync_ack(self): """ Wait for synchronization reply from the host. @@ -217,17 +250,9 @@ class Component(object): return sub_socket - def setup_poller(self): - """ - Setup the poller used for multiplexing the incoming data - handling sockets. - """ - - self.poll = self.zmq.Poller() - def setup_control(self): """ - Set up the control socket. Used to monitor the the + Set up the control socket. Used to monitor the overall status of the simulation and to forcefully tear down the simulation in case of a failure. """ @@ -240,6 +265,10 @@ class Component(object): self.sockets.extend([self.control_in, self.control_out]) def setup_sync(self): + """ + Setup the sync socket and poller. + """ + qutil.LOGGER.debug("Connecting sync client for {id}".format(id=self.get_id)) self.sync_socket = self.context.socket(self.zmq.REQ) @@ -247,21 +276,42 @@ class Component(object): #self.sync_socket.setsockopt(self.zmq.LINGER,0) # Explictly, a different poller for obvious reasons. + # I'm not fond of having this poller init'd as a side + # effect of a method call. Still thinking about where to + # put it at the moment though... self.sync_poller = self.zmq.Poller() self.sync_poller.register(self.sync_socket, self.zmq.POLLIN) self.sockets.append(self.sync_socket) + # --------------------- + # Description and Debug + # --------------------- + + @property + def get_id(self): + return 'UNKNOWN COMPONENT' + def debug(self): + """ + Debug information about the component. + """ return ( self.get_id , self.huid , socket.gethostname() , os.getpid() , hex(id(self)) , + self.sockets , ) def __repr__(self): + """ + Return a useful string representation of the component + to indicate its type, unique identifier, and computational + context identifier name. 
+ """ + return "<{name} {uuid} at {host} {pid} {pointer}>".format( name = self.get_id , uuid = self.huid , diff --git a/zipline/protocol.py b/zipline/protocol.py index ae11af04..7bde3fdf 100644 --- a/zipline/protocol.py +++ b/zipline/protocol.py @@ -124,9 +124,9 @@ def CONTORL_UNFRAME(msg): #except AssertionError: #raise INVALID_CONTROL_FRAME(msg) -# ================ +# ================== # Heartbeat Protocol -# ================ +# ================== # These encode the msgpack equivelant of 1 and 2. The heartbeat # frame should only be 1 byte on the wire. @@ -135,3 +135,13 @@ HEARTBEAT_PROTOCOL = namedict({ 'REQ' : b'\x01', 'REP' : b'\x02', }) + +# ================== +# Component State +# ================== + +COMPONENT_STATE = Enum( + 'OK' , # 0 + 'DONE' , # 1 + 'EXCEPTION' , # 2 +)