mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-04 11:52:58 +08:00
Cleanup, standardizing methods, more docstrings.
This commit is contained in:
+90
-40
@@ -7,7 +7,7 @@ import socket
|
||||
import humanhash
|
||||
|
||||
import zipline.util as qutil
|
||||
from zipline.protocol import CONTROL_PROTOCOL
|
||||
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE
|
||||
|
||||
class Component(object):
|
||||
|
||||
@@ -39,7 +39,10 @@ class Component(object):
|
||||
self.gevent_needed = False
|
||||
self.killed = False
|
||||
self.heartbeat_timeout = 2000
|
||||
self.state_flag = COMPONENT_STATE.OK # OK | DONE | EXCEPTION
|
||||
|
||||
# Humanhashes make this way easier to debug because they
|
||||
# stick in your mind unlike a 32 byte string of random hex.
|
||||
self.guid = uuid.uuid4()
|
||||
self.huid = humanhash.humanize(self.guid.hex)
|
||||
|
||||
@@ -47,10 +50,6 @@ class Component(object):
|
||||
# Core Methods
|
||||
# ------------
|
||||
|
||||
@property
|
||||
def get_id(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def open(self):
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -71,7 +70,7 @@ class Component(object):
|
||||
raise NotImplementedError
|
||||
|
||||
def _run(self):
|
||||
self.done = False
|
||||
self.done = False # TODO: use state flag
|
||||
self.sockets = []
|
||||
|
||||
if self.gevent_needed:
|
||||
@@ -95,19 +94,27 @@ class Component(object):
|
||||
sock.close()
|
||||
|
||||
def run(self, catch_exceptions=False):
|
||||
"""
|
||||
Run the component.
|
||||
|
||||
Optionally takes an argument to catch and log all exceptions raised
|
||||
during execution ues this with care since it makes it very hard to
|
||||
debug since it mucks up your stacktraces.
|
||||
"""
|
||||
|
||||
fail = None
|
||||
|
||||
# Catching all exceptions makes this really hard to
|
||||
# debug, is it with care.
|
||||
if catch_exceptions:
|
||||
try:
|
||||
self._run()
|
||||
except Exception as e:
|
||||
qutil.LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id))
|
||||
fail = e
|
||||
except Exception as exc:
|
||||
# TODO, we want to do this grab the stack
|
||||
# frame so we can preserve stacktraces when we
|
||||
# reraise the exception.
|
||||
self.signal_exception(exc)
|
||||
fail = exc
|
||||
finally:
|
||||
if(self.context != None):
|
||||
if self.context:
|
||||
self.context.destroy()
|
||||
if fail:
|
||||
raise fail
|
||||
@@ -117,35 +124,61 @@ class Component(object):
|
||||
self.context.destroy()
|
||||
|
||||
def loop(self):
|
||||
while not self.done:
|
||||
"""
|
||||
Loop to do work while we still have work to do.
|
||||
"""
|
||||
while not self.done: # TODO: use state flag
|
||||
self.confirm()
|
||||
self.do_work()
|
||||
|
||||
# -----------
|
||||
# Messaging
|
||||
# -----------
|
||||
|
||||
def signal_done(self):
|
||||
#notify down stream components that we're done
|
||||
if(self.out_socket != None):
|
||||
self.out_socket.send(str(CONTROL_PROTOCOL.DONE))
|
||||
#notify host we're done
|
||||
|
||||
# TODO: proper framing
|
||||
self.sync_socket.send(self.get_id + ":" + str(CONTROL_PROTOCOL.DONE))
|
||||
|
||||
self.receive_sync_ack()
|
||||
#notify internal work look that we're done
|
||||
self.done = True
|
||||
|
||||
def confirm(self):
|
||||
# send a synchronization request to the host
|
||||
"""
|
||||
Send a synchronization request to the host.
|
||||
"""
|
||||
|
||||
# TODO: proper framing
|
||||
self.sync_socket.send(self.get_id + ":RUN")
|
||||
|
||||
self.receive_sync_ack() # blocking
|
||||
|
||||
# ----------------------
|
||||
# Internal Maintenance
|
||||
# ----------------------
|
||||
|
||||
def signal_exception(self, exc=None):
|
||||
self.state_flag = COMPONENT_STATE.EXCEPTION
|
||||
qutil.LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id))
|
||||
|
||||
def signal_done(self):
|
||||
"""
|
||||
Notify down stream components that we're done.
|
||||
"""
|
||||
|
||||
self.state_flag = COMPONENT_STATE.DONE
|
||||
|
||||
if self.out_socket:
|
||||
self.out_socket.send(str(CONTROL_PROTOCOL.DONE))
|
||||
|
||||
#notify host we're done
|
||||
# TODO: proper framing
|
||||
self.sync_socket.send(self.get_id + ":" + str(CONTROL_PROTOCOL.DONE))
|
||||
|
||||
self.receive_sync_ack()
|
||||
#notify internal work look that we're done
|
||||
self.done = True # TODO: use state flag
|
||||
|
||||
# -----------
|
||||
# Messaging
|
||||
# -----------
|
||||
|
||||
def setup_poller(self):
|
||||
"""
|
||||
Setup the poller used for multiplexing the incoming data
|
||||
handling sockets.
|
||||
"""
|
||||
|
||||
self.poll = self.zmq.Poller()
|
||||
|
||||
def receive_sync_ack(self):
|
||||
"""
|
||||
Wait for synchronization reply from the host.
|
||||
@@ -217,17 +250,9 @@ class Component(object):
|
||||
|
||||
return sub_socket
|
||||
|
||||
def setup_poller(self):
|
||||
"""
|
||||
Setup the poller used for multiplexing the incoming data
|
||||
handling sockets.
|
||||
"""
|
||||
|
||||
self.poll = self.zmq.Poller()
|
||||
|
||||
def setup_control(self):
|
||||
"""
|
||||
Set up the control socket. Used to monitor the the
|
||||
Set up the control socket. Used to monitor the
|
||||
overall status of the simulation and to forcefully tear
|
||||
down the simulation in case of a failure.
|
||||
"""
|
||||
@@ -240,6 +265,10 @@ class Component(object):
|
||||
self.sockets.extend([self.control_in, self.control_out])
|
||||
|
||||
def setup_sync(self):
|
||||
"""
|
||||
Setup the sync socket and poller.
|
||||
"""
|
||||
|
||||
qutil.LOGGER.debug("Connecting sync client for {id}".format(id=self.get_id))
|
||||
|
||||
self.sync_socket = self.context.socket(self.zmq.REQ)
|
||||
@@ -247,21 +276,42 @@ class Component(object):
|
||||
#self.sync_socket.setsockopt(self.zmq.LINGER,0)
|
||||
|
||||
# Explictly, a different poller for obvious reasons.
|
||||
# I'm not fond of having this poller init'd as a side
|
||||
# effect of a method call. Still thinking about where to
|
||||
# put it at the moment though...
|
||||
self.sync_poller = self.zmq.Poller()
|
||||
self.sync_poller.register(self.sync_socket, self.zmq.POLLIN)
|
||||
|
||||
self.sockets.append(self.sync_socket)
|
||||
|
||||
# ---------------------
|
||||
# Description and Debug
|
||||
# ---------------------
|
||||
|
||||
@property
|
||||
def get_id(self):
|
||||
return 'UNKNOWN COMPONENT'
|
||||
|
||||
def debug(self):
|
||||
"""
|
||||
Debug information about the component.
|
||||
"""
|
||||
return (
|
||||
self.get_id ,
|
||||
self.huid ,
|
||||
socket.gethostname() ,
|
||||
os.getpid() ,
|
||||
hex(id(self)) ,
|
||||
self.sockets ,
|
||||
)
|
||||
|
||||
def __repr__(self):
|
||||
"""
|
||||
Return a usefull string representation of the component
|
||||
to indicate its type, unique identifier, and computational
|
||||
context identifier name.
|
||||
"""
|
||||
|
||||
return "<{name} {uuid} at {host} {pid} {pointer}>".format(
|
||||
name = self.get_id ,
|
||||
uuid = self.huid ,
|
||||
|
||||
+12
-2
@@ -124,9 +124,9 @@ def CONTORL_UNFRAME(msg):
|
||||
#except AssertionError:
|
||||
#raise INVALID_CONTROL_FRAME(msg)
|
||||
|
||||
# ================
|
||||
# ==================
|
||||
# Heartbeat Protocol
|
||||
# ================
|
||||
# ==================
|
||||
|
||||
# These encode the msgpack equivelant of 1 and 2. The heartbeat
|
||||
# frame should only be 1 byte on the wire.
|
||||
@@ -135,3 +135,13 @@ HEARTBEAT_PROTOCOL = namedict({
|
||||
'REQ' : b'\x01',
|
||||
'REP' : b'\x02',
|
||||
})
|
||||
|
||||
# ==================
|
||||
# Component State
|
||||
# ==================
|
||||
|
||||
COMPONENT_STATE = Enum(
|
||||
'OK' , # 0
|
||||
'DONE' , # 1
|
||||
'EXCEPTION' , # 2
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user