diff --git a/zipline/core/component.py b/zipline/core/component.py index d10ccb2c..baadaa26 100644 --- a/zipline/core/component.py +++ b/zipline/core/component.py @@ -516,7 +516,7 @@ class Component(object): def bind_pub_socket(self, addr): pub_socket = self.context.socket(self.zmq.PUB) pub_socket.bind(addr) - pub_socket.setsockopt(self.zmq.LINGER, 0) + #pub_socket.setsockopt(self.zmq.LINGER, 0) self.out_socket = pub_socket return pub_socket diff --git a/zipline/core/monitor.py b/zipline/core/monitor.py index a34b5ec9..de0a8b59 100644 --- a/zipline/core/monitor.py +++ b/zipline/core/monitor.py @@ -9,7 +9,7 @@ import gevent_zeromq from setproctitle import setproctitle from signal import SIGHUP, SIGINT -from collections import OrderedDict +from collections import OrderedDict, Counter from zipline.utils.gpoll import _Poller as GeventPoller from zipline.protocol import CONTROL_PROTOCOL, CONTROL_FRAME, \ @@ -95,6 +95,8 @@ class Controller(object): self.error_replay = OrderedDict() + self.missed_beats = Counter() + log.warn("Running Controller in development mode, will ONLY synchronize start.") def init_zmq(self, flavor): @@ -429,10 +431,25 @@ class Controller(object): if self.debug: log.info('Bad component %r' % component) - if self.debug: - for component in missing: + self.missed_beats.update(missing) + for component in missing: + if self.missed_beats[component] >\ + PARAMETERS.ALLOWED_SKIPPED_HEARTBEATS: + # TODO: determine when this propogates to a true + # failure, missing one heartbeat could just mean that + # its CPU overloaded + log.warning('Component missed max heartbeats, failing %s'\ + % component) + self.fail_universal() + + if self.debug: log.info('Missing component %r' % component) + if self.debug: + #for component in good: + # log.info('good component %r' % component) + + for component in self.tracked: if component not in self.topology: log.info('Uninvited component %r' % component) @@ -495,10 +512,12 @@ class Controller(object): log.warning('Component "%s" missed heartbeat' % component) self.tracked.remove(component) - # TODO: determine when this propogates to a true - # failure, missing one heartbeat could just mean that - # its CPU overloaded - #fail_handlers.get(component, universal)() + # TODO: determine when this propogates to a true + # failure, missing one heartbeat could just mean that + # its CPU overloaded + #log.warning('Component missed max heartbeats, failing %s'\ + # % component) + #fail_handlers.get(component, universal)() # ------------------- # Completion Handling