mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-29 05:15:44 +08:00
Done & Exception tracking through sockets -> Controller
This commit is contained in:
+22
-3
@@ -24,7 +24,7 @@ from datetime import datetime
|
||||
import zipline.util as qutil
|
||||
from zipline.gpoll import _Poller as GeventPoller
|
||||
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE, \
|
||||
COMPONENT_FAILURE, BACKTEST_STATE
|
||||
COMPONENT_FAILURE, BACKTEST_STATE, CONTROL_FRAME
|
||||
|
||||
|
||||
class Component(object):
|
||||
@@ -313,6 +313,12 @@ class Component(object):
|
||||
exc_type, exc_value, exc_traceback = sys.exc_info()
|
||||
self.stack_trace = exc_traceback
|
||||
|
||||
exception_frame = CONTROL_FRAME(
|
||||
CONTROL_PROTOCOL.EXCEPTION,
|
||||
str(exc)
|
||||
)
|
||||
self.control_out.send(exception_frame)
|
||||
|
||||
qutil.LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id))
|
||||
|
||||
def signal_done(self):
|
||||
@@ -329,10 +335,19 @@ class Component(object):
|
||||
# TODO: proper framing
|
||||
self.sync_socket.send(self.get_id + ":" + str(CONTROL_PROTOCOL.DONE))
|
||||
|
||||
#notify controller we're done
|
||||
done_frame = CONTROL_FRAME(
|
||||
CONTROL_PROTOCOL.DONE,
|
||||
''
|
||||
)
|
||||
self.control_out.send(done_frame)
|
||||
|
||||
self.receive_sync_ack()
|
||||
#notify internal work look that we're done
|
||||
self.done = True # TODO: use state flag
|
||||
|
||||
qutil.LOGGER.info("[%s] DONE" % self.get_id)
|
||||
|
||||
# -----------
|
||||
# Messaging
|
||||
# -----------
|
||||
@@ -350,13 +365,15 @@ class Component(object):
|
||||
def receive_sync_ack(self):
|
||||
"""
|
||||
Wait for synchronization reply from the host.
|
||||
|
||||
DEPRECATED, left in for compatability for now.
|
||||
"""
|
||||
|
||||
socks = dict(self.sync_poller.poll(self.heartbeat_timeout))
|
||||
if self.sync_socket in socks and socks[self.sync_socket] == self.zmq.POLLIN:
|
||||
message = self.sync_socket.recv()
|
||||
else:
|
||||
raise Exception("Sync ack timed out on response for {id}".format(id=self.get_id))
|
||||
#else:
|
||||
#raise Exception("Sync ack timed out on response for {id}".format(id=self.get_id))
|
||||
|
||||
def bind_data(self):
|
||||
return self.bind_pull_socket(self.addresses['data_address'])
|
||||
@@ -445,6 +462,8 @@ class Component(object):
|
||||
def setup_sync(self):
|
||||
"""
|
||||
Setup the sync socket and poller. ( Connect )
|
||||
|
||||
DEPRECATED, left in for compatability for now.
|
||||
"""
|
||||
|
||||
qutil.LOGGER.debug("Connecting sync client for {id}".format(id=self.get_id))
|
||||
|
||||
@@ -3,6 +3,7 @@ import pytz
|
||||
import math
|
||||
import pandas
|
||||
|
||||
# from gevent.select import select
|
||||
from zmq.core.poll import select
|
||||
|
||||
import zipline.messaging as qmsg
|
||||
@@ -200,6 +201,9 @@ class OrderDataSource(qmsg.DataSource):
|
||||
#pull all orders from client.
|
||||
orders = []
|
||||
count = 0
|
||||
|
||||
# TODO : this can be written in a concurrency agnostic
|
||||
# way... have a chat with Fawce about this ~Steve
|
||||
while True:
|
||||
|
||||
(rlist, wlist, xlist) = select(
|
||||
|
||||
@@ -113,6 +113,10 @@ class ComponentHost(Component):
|
||||
self.launch_controller()
|
||||
|
||||
def is_timed_out(self):
|
||||
"""
|
||||
DEPRECATED, left in for compatability for now.
|
||||
"""
|
||||
|
||||
cur_time = datetime.datetime.utcnow()
|
||||
|
||||
if len(self.components) == 0:
|
||||
@@ -145,7 +149,7 @@ class ComponentHost(Component):
|
||||
self.signal_exception(exc)
|
||||
|
||||
if status == str(CONTROL_PROTOCOL.DONE): # TODO: other way around
|
||||
qutil.LOGGER.info("{id} is DONE".format(id=sync_id))
|
||||
#qutil.LOGGER.debug("{id} is DONE".format(id=sync_id))
|
||||
self.unregister_component(sync_id)
|
||||
self.state_flag = COMPONENT_STATE.DONE
|
||||
else:
|
||||
@@ -508,6 +512,7 @@ class BaseTransform(Component):
|
||||
|
||||
if self.feed_socket in socks and socks[self.feed_socket] == self.zmq.POLLIN:
|
||||
message = self.feed_socket.recv()
|
||||
|
||||
if message == str(CONTROL_PROTOCOL.DONE):
|
||||
self.signal_done()
|
||||
return
|
||||
|
||||
+10
-8
@@ -236,7 +236,12 @@ class Controller(object):
|
||||
self.logging.info('Shutdown event loop')
|
||||
|
||||
def log_status(self):
|
||||
self.logging.info("[Controller] Tracking : %s" % ([c for c in self.tracked],))
|
||||
"""
|
||||
Snapshot of the tracked components at every period.
|
||||
"""
|
||||
#self.logging.info("[Controller] Tracking : %s" % ([c for c in self.tracked],))
|
||||
pass
|
||||
|
||||
# -------------
|
||||
# Publications
|
||||
# -------------
|
||||
@@ -352,7 +357,7 @@ class Controller(object):
|
||||
# The various "states of being that a component can inform us
|
||||
# of
|
||||
def new(self, component):
|
||||
self.logging.info('[Controller] New Tracked "%s" ' % component)
|
||||
self.logging.info('[Controller] Alive "%s" ' % component)
|
||||
|
||||
if component in self.topology or self.freeform:
|
||||
self.tracked.add(component)
|
||||
@@ -372,7 +377,7 @@ class Controller(object):
|
||||
self.logging.info('[Controller] Component "%s" done.' % component)
|
||||
|
||||
def exception(self, component, failure):
|
||||
self.logging.error('Component "%s"in exception state' % component)
|
||||
self.logging.error('Component "%s" in exception state' % component)
|
||||
|
||||
# -----------------
|
||||
# Protocol Handling
|
||||
@@ -395,7 +400,7 @@ class Controller(object):
|
||||
else:
|
||||
# Otherwise its something weird and we don't know
|
||||
# what to do so just say so
|
||||
self.logging.error("Weird stuff happened: %s" % status, self.ctime)
|
||||
self.logging.error("Weird stuff happened: %s" % msg)
|
||||
|
||||
# A component is telling us it failed, and how
|
||||
if id is CONTROL_PROTOCOL.EXCEPTION:
|
||||
@@ -404,7 +409,7 @@ class Controller(object):
|
||||
# A component is telling us its done with work and won't
|
||||
# be talking to us anymore
|
||||
if id is CONTROL_PROTOCOL.DONE:
|
||||
self.done(identity, status)
|
||||
self.done(identity)
|
||||
|
||||
# -------------------
|
||||
# Hooks for Endpoints
|
||||
@@ -456,9 +461,6 @@ class Controller(object):
|
||||
|
||||
assert hard or soft, """ Must specify kill hard or soft """
|
||||
|
||||
if not context:
|
||||
context = zmq.Context.instance()
|
||||
|
||||
if hard:
|
||||
self.state = CONTROL_STATES.SHUTDOWN
|
||||
|
||||
|
||||
Reference in New Issue
Block a user