From 3d8e355cfaf60857ff6d6e8ce670a4ccc70ab89f Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Wed, 4 Apr 2012 11:42:58 -0400 Subject: [PATCH] Integrate controller into do_work. --- zipline/messaging.py | 45 +++++++++++++++++++++++++++++++++++++++++--- zipline/monitor.py | 34 ++++++++++++++++++++++++++++----- zipline/protocol.py | 2 +- 3 files changed, 72 insertions(+), 9 deletions(-) diff --git a/zipline/messaging.py b/zipline/messaging.py index 1bb80dcf..10b45d90 100644 --- a/zipline/messaging.py +++ b/zipline/messaging.py @@ -8,7 +8,7 @@ import zipline.util as qutil from zipline.component import Component import zipline.protocol as zp from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \ - COMPONENT_STATE + COMPONENT_STATE, CONTROL_FRAME, CONTROL_UNFRAME class ComponentHost(Component): """ @@ -212,8 +212,29 @@ class ParallelBuffer(Component): # wait for synchronization reply from the host socks = dict(self.poll.poll(self.heartbeat_timeout)) #timeout after 2 seconds. + # TODO: Abstract this out, maybe on base component if self.control_in in socks and socks[self.control_in] == self.zmq.POLLIN: msg = self.control_in.recv() + event, payload = CONTROL_UNFRAME(msg) + + # -- Heartbeat -- + if event == CONTROL_PROTOCOL.HEARTBEAT: + # Heart outgoing + heartbeat_frame = CONTROL_FRAME( + CONTROL_PROTOCOL.OK, + payload + ) + self.control_out.send(heartbeat_frame) + + # -- Soft Kill -- + elif event == CONTROL_PROTOCOL.SHUTDOWN: + self.done() + self.shutdown() + + # -- Hard Kill -- + elif event == CONTROL_PROTOCOL.KILL: + self.kill() + if self.pull_socket in socks and socks[self.pull_socket] == self.zmq.POLLIN: message = self.pull_socket.recv() @@ -241,13 +262,11 @@ class ParallelBuffer(Component): except zp.INVALID_DATASOURCE_FRAME as exc: return self.signal_exception(exc) - # def unframe(self, msg): return zp.DATASOURCE_UNFRAME(msg) def frame(self, event): return zp.FEED_FRAME(event) - # ------------- # Flow Control @@ -464,8 +483,28 @@ class BaseTransform(Component): """ socks = dict(self.poll.poll(self.heartbeat_timeout)) + # TODO: Abstract this out, maybe on base component if self.control_in in socks and socks[self.control_in] == self.zmq.POLLIN: msg = self.control_in.recv() + event, payload = CONTROL_UNFRAME(msg) + + # -- Heartbeat -- + if event == CONTROL_PROTOCOL.HEARTBEAT: + # Heart outgoing + heartbeat_frame = CONTROL_FRAME( + CONTROL_PROTOCOL.OK, + payload + ) + self.control_out.send(heartbeat_frame) + + # -- Soft Kill -- + elif event == CONTROL_PROTOCOL.SHUTDOWN: + self.done() + self.shutdown() + + # -- Hard Kill -- + elif event == CONTROL_PROTOCOL.KILL: + self.kill() if self.feed_socket in socks and socks[self.feed_socket] == self.zmq.POLLIN: message = self.feed_socket.recv() diff --git a/zipline/monitor.py b/zipline/monitor.py index dc258ca2..6edd0286 100644 --- a/zipline/monitor.py +++ b/zipline/monitor.py @@ -145,7 +145,7 @@ class Controller(object): """ debug = False - period = 5 + period = 1 def __init__(self, pub_socket, route_socket, logging = None): @@ -237,6 +237,30 @@ class Controller(object): def log_status(self): self.logging.info("[Controller] Tracking : %s" % ([c for c in self.tracked],)) + # ------------- + # Publications + # ------------- + + def send_heart(self): + heartbeat_frame = CONTROL_FRAME( + CONTROL_PROTOCOL.HEARTBEAT, + str(self.ctime) + ) + self.pub.send(heartbeat_frame) + + def send_hardkill(self): + kill_frame = CONTROL_FRAME( + CONTROL_PROTOCOL.KILL, + None + ) + self.pub.send(kill_frame) + + def send_softkill(self): + soft_frame = CONTROL_FRAME( + CONTROL_PROTOCOL.KILL, + None + ) + self.pub.send(soft_frame) # ----------- # Event Loops @@ -262,7 +286,7 @@ class Controller(object): self.responses = set() self.ctime = time.time() - self.pub.send(str(self.ctime)) + self.send_heart() while self.polling: # Reset the responses for this cycle @@ -345,10 +369,10 @@ class Controller(object): # TODO: This will be what we ship off to vbench at some # point... # print component finished at self.ctime - pass + self.logging.info('[Controller] Component "%s" done.' % component) def exception(self, component, failure): - pass + self.logging.error('Component "%s"in exception state' % component) # ----------------- # Protocol Handling @@ -401,8 +425,8 @@ class Controller(object): context = self.zmq.Context.instance() s = context.socket(zmq.DEALER) - s.connect(self.route_socket) s.setsockopt(zmq.IDENTITY, identity) + s.connect(self.route_socket) self.associated.append(s) return s diff --git a/zipline/protocol.py b/zipline/protocol.py index 4730e8ff..133e7260 100644 --- a/zipline/protocol.py +++ b/zipline/protocol.py @@ -154,7 +154,7 @@ def CONTROL_FRAME(event, payload): assert isinstance(event, int,) assert isinstance(payload, basestring) - return msgpack.dumps(tuple([id, status])) + return msgpack.dumps(tuple([event, payload])) def CONTROL_UNFRAME(msg): """