diff --git a/zipline/component.py b/zipline/component.py index 9acf2cc2..13b852b9 100644 --- a/zipline/component.py +++ b/zipline/component.py @@ -24,7 +24,7 @@ from datetime import datetime import zipline.util as qutil from zipline.gpoll import _Poller as GeventPoller from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE, \ - COMPONENT_FAILURE, BACKTEST_STATE + COMPONENT_FAILURE, BACKTEST_STATE, CONTROL_FRAME class Component(object): @@ -313,6 +313,12 @@ class Component(object): exc_type, exc_value, exc_traceback = sys.exc_info() self.stack_trace = exc_traceback + exception_frame = CONTROL_FRAME( + CONTROL_PROTOCOL.EXCEPTION, + str(exc) + ) + self.control_out.send(exception_frame) + qutil.LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id)) def signal_done(self): @@ -329,10 +335,19 @@ class Component(object): # TODO: proper framing self.sync_socket.send(self.get_id + ":" + str(CONTROL_PROTOCOL.DONE)) + #notify controller we're done + done_frame = CONTROL_FRAME( + CONTROL_PROTOCOL.DONE, + '' + ) + self.control_out.send(done_frame) + self.receive_sync_ack() #notify internal work look that we're done self.done = True # TODO: use state flag + qutil.LOGGER.info("[%s] DONE" % self.get_id) + # ----------- # Messaging # ----------- @@ -350,13 +365,15 @@ class Component(object): def receive_sync_ack(self): """ Wait for synchronization reply from the host. + + DEPRECATED, left in for compatability for now. """ socks = dict(self.sync_poller.poll(self.heartbeat_timeout)) if self.sync_socket in socks and socks[self.sync_socket] == self.zmq.POLLIN: message = self.sync_socket.recv() - else: - raise Exception("Sync ack timed out on response for {id}".format(id=self.get_id)) + #else: + #raise Exception("Sync ack timed out on response for {id}".format(id=self.get_id)) def bind_data(self): return self.bind_pull_socket(self.addresses['data_address']) @@ -445,6 +462,8 @@ class Component(object): def setup_sync(self): """ Setup the sync socket and poller. ( Connect ) + + DEPRECATED, left in for compatability for now. """ qutil.LOGGER.debug("Connecting sync client for {id}".format(id=self.get_id)) diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index 200a50d5..a32418b3 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -3,6 +3,7 @@ import pytz import math import pandas +# from gevent.select import select from zmq.core.poll import select import zipline.messaging as qmsg @@ -200,6 +201,9 @@ class OrderDataSource(qmsg.DataSource): #pull all orders from client. orders = [] count = 0 + + # TODO : this can be written in a concurrency agnostic + # way... have a chat with Fawce about this ~Steve while True: (rlist, wlist, xlist) = select( diff --git a/zipline/messaging.py b/zipline/messaging.py index 10b45d90..749006d0 100644 --- a/zipline/messaging.py +++ b/zipline/messaging.py @@ -113,6 +113,10 @@ class ComponentHost(Component): self.launch_controller() def is_timed_out(self): + """ + DEPRECATED, left in for compatability for now. + """ + cur_time = datetime.datetime.utcnow() if len(self.components) == 0: @@ -145,7 +149,7 @@ class ComponentHost(Component): self.signal_exception(exc) if status == str(CONTROL_PROTOCOL.DONE): # TODO: other way around - qutil.LOGGER.info("{id} is DONE".format(id=sync_id)) + #qutil.LOGGER.debug("{id} is DONE".format(id=sync_id)) self.unregister_component(sync_id) self.state_flag = COMPONENT_STATE.DONE else: @@ -508,6 +512,7 @@ class BaseTransform(Component): if self.feed_socket in socks and socks[self.feed_socket] == self.zmq.POLLIN: message = self.feed_socket.recv() + if message == str(CONTROL_PROTOCOL.DONE): self.signal_done() return diff --git a/zipline/monitor.py b/zipline/monitor.py index 6edd0286..88a73269 100644 --- a/zipline/monitor.py +++ b/zipline/monitor.py @@ -236,7 +236,12 @@ class Controller(object): self.logging.info('Shutdown event loop') def log_status(self): - self.logging.info("[Controller] Tracking : %s" % ([c for c in self.tracked],)) + """ + Snapshot of the tracked components at every period. + """ + #self.logging.info("[Controller] Tracking : %s" % ([c for c in self.tracked],)) + pass + # ------------- # Publications # ------------- @@ -352,7 +357,7 @@ class Controller(object): # The various "states of being that a component can inform us # of def new(self, component): - self.logging.info('[Controller] New Tracked "%s" ' % component) + self.logging.info('[Controller] Alive "%s" ' % component) if component in self.topology or self.freeform: self.tracked.add(component) @@ -372,7 +377,7 @@ class Controller(object): self.logging.info('[Controller] Component "%s" done.' % component) def exception(self, component, failure): - self.logging.error('Component "%s"in exception state' % component) + self.logging.error('Component "%s" in exception state' % component) # ----------------- # Protocol Handling @@ -395,7 +400,7 @@ class Controller(object): else: # Otherwise its something weird and we don't know # what to do so just say so - self.logging.error("Weird stuff happened: %s" % status, self.ctime) + self.logging.error("Weird stuff happened: %s" % msg) # A component is telling us it failed, and how if id is CONTROL_PROTOCOL.EXCEPTION: @@ -404,7 +409,7 @@ class Controller(object): # A component is telling us its done with work and won't # be talking to us anymore if id is CONTROL_PROTOCOL.DONE: - self.done(identity, status) + self.done(identity) # ------------------- # Hooks for Endpoints @@ -456,9 +461,6 @@ class Controller(object): assert hard or soft, """ Must specify kill hard or soft """ - if not context: - context = zmq.Context.instance() - if hard: self.state = CONTROL_STATES.SHUTDOWN