mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-29 23:23:34 +08:00
Integrate controller into do_work.
This commit is contained in:
+42
-3
@@ -8,7 +8,7 @@ import zipline.util as qutil
|
||||
from zipline.component import Component
|
||||
import zipline.protocol as zp
|
||||
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \
|
||||
COMPONENT_STATE
|
||||
COMPONENT_STATE, CONTROL_FRAME, CONTROL_UNFRAME
|
||||
|
||||
class ComponentHost(Component):
|
||||
"""
|
||||
@@ -212,8 +212,29 @@ class ParallelBuffer(Component):
|
||||
# wait for synchronization reply from the host
|
||||
socks = dict(self.poll.poll(self.heartbeat_timeout)) #timeout after 2 seconds.
|
||||
|
||||
# TODO: Abstract this out, maybe on base component
|
||||
if self.control_in in socks and socks[self.control_in] == self.zmq.POLLIN:
|
||||
msg = self.control_in.recv()
|
||||
event, payload = CONTROL_UNFRAME(msg)
|
||||
|
||||
# -- Heartbeat --
|
||||
if event == CONTROL_PROTOCOL.HEARTBEAT:
|
||||
# Heart outgoing
|
||||
heartbeat_frame = CONTROL_FRAME(
|
||||
CONTROL_PROTOCOL.OK,
|
||||
payload
|
||||
)
|
||||
self.control_out.send(heartbeat_frame)
|
||||
|
||||
# -- Soft Kill --
|
||||
elif event == CONTROL_PROTOCOL.SHUTDOWN:
|
||||
self.done()
|
||||
self.shutdown()
|
||||
|
||||
# -- Hard Kill --
|
||||
elif event == CONTROL_PROTOCOL.KILL:
|
||||
self.kill()
|
||||
|
||||
|
||||
if self.pull_socket in socks and socks[self.pull_socket] == self.zmq.POLLIN:
|
||||
message = self.pull_socket.recv()
|
||||
@@ -241,13 +262,11 @@ class ParallelBuffer(Component):
|
||||
except zp.INVALID_DATASOURCE_FRAME as exc:
|
||||
return self.signal_exception(exc)
|
||||
|
||||
#
|
||||
def unframe(self, msg):
|
||||
return zp.DATASOURCE_UNFRAME(msg)
|
||||
|
||||
def frame(self, event):
|
||||
return zp.FEED_FRAME(event)
|
||||
|
||||
|
||||
# -------------
|
||||
# Flow Control
|
||||
@@ -464,8 +483,28 @@ class BaseTransform(Component):
|
||||
"""
|
||||
socks = dict(self.poll.poll(self.heartbeat_timeout))
|
||||
|
||||
# TODO: Abstract this out, maybe on base component
|
||||
if self.control_in in socks and socks[self.control_in] == self.zmq.POLLIN:
|
||||
msg = self.control_in.recv()
|
||||
event, payload = CONTROL_UNFRAME(msg)
|
||||
|
||||
# -- Heartbeat --
|
||||
if event == CONTROL_PROTOCOL.HEARTBEAT:
|
||||
# Heart outgoing
|
||||
heartbeat_frame = CONTROL_FRAME(
|
||||
CONTROL_PROTOCOL.OK,
|
||||
payload
|
||||
)
|
||||
self.control_out.send(heartbeat_frame)
|
||||
|
||||
# -- Soft Kill --
|
||||
elif event == CONTROL_PROTOCOL.SHUTDOWN:
|
||||
self.done()
|
||||
self.shutdown()
|
||||
|
||||
# -- Hard Kill --
|
||||
elif event == CONTROL_PROTOCOL.KILL:
|
||||
self.kill()
|
||||
|
||||
if self.feed_socket in socks and socks[self.feed_socket] == self.zmq.POLLIN:
|
||||
message = self.feed_socket.recv()
|
||||
|
||||
+29
-5
@@ -145,7 +145,7 @@ class Controller(object):
|
||||
"""
|
||||
|
||||
debug = False
|
||||
period = 5
|
||||
period = 1
|
||||
|
||||
def __init__(self, pub_socket, route_socket, logging = None):
|
||||
|
||||
@@ -237,6 +237,30 @@ class Controller(object):
|
||||
|
||||
def log_status(self):
|
||||
self.logging.info("[Controller] Tracking : %s" % ([c for c in self.tracked],))
|
||||
# -------------
|
||||
# Publications
|
||||
# -------------
|
||||
|
||||
def send_heart(self):
|
||||
heartbeat_frame = CONTROL_FRAME(
|
||||
CONTROL_PROTOCOL.HEARTBEAT,
|
||||
str(self.ctime)
|
||||
)
|
||||
self.pub.send(heartbeat_frame)
|
||||
|
||||
def send_hardkill(self):
|
||||
kill_frame = CONTROL_FRAME(
|
||||
CONTROL_PROTOCOL.KILL,
|
||||
None
|
||||
)
|
||||
self.pub.send(kill_frame)
|
||||
|
||||
def send_softkill(self):
|
||||
soft_frame = CONTROL_FRAME(
|
||||
CONTROL_PROTOCOL.KILL,
|
||||
None
|
||||
)
|
||||
self.pub.send(soft_frame)
|
||||
|
||||
# -----------
|
||||
# Event Loops
|
||||
@@ -262,7 +286,7 @@ class Controller(object):
|
||||
self.responses = set()
|
||||
|
||||
self.ctime = time.time()
|
||||
self.pub.send(str(self.ctime))
|
||||
self.send_heart()
|
||||
|
||||
while self.polling:
|
||||
# Reset the responses for this cycle
|
||||
@@ -345,10 +369,10 @@ class Controller(object):
|
||||
# TODO: This will be what we ship off to vbench at some
|
||||
# point...
|
||||
# print component finished at self.ctime
|
||||
pass
|
||||
self.logging.info('[Controller] Component "%s" done.' % component)
|
||||
|
||||
def exception(self, component, failure):
|
||||
pass
|
||||
self.logging.error('Component "%s"in exception state' % component)
|
||||
|
||||
# -----------------
|
||||
# Protocol Handling
|
||||
@@ -401,8 +425,8 @@ class Controller(object):
|
||||
context = self.zmq.Context.instance()
|
||||
|
||||
s = context.socket(zmq.DEALER)
|
||||
s.connect(self.route_socket)
|
||||
s.setsockopt(zmq.IDENTITY, identity)
|
||||
s.connect(self.route_socket)
|
||||
|
||||
self.associated.append(s)
|
||||
return s
|
||||
|
||||
+1
-1
@@ -154,7 +154,7 @@ def CONTROL_FRAME(event, payload):
|
||||
assert isinstance(event, int,)
|
||||
assert isinstance(payload, basestring)
|
||||
|
||||
return msgpack.dumps(tuple([id, status]))
|
||||
return msgpack.dumps(tuple([event, payload]))
|
||||
|
||||
def CONTROL_UNFRAME(msg):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user