From f885343690b8cb9533c85598ef2e19d5d8ee1bc7 Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Fri, 2 Mar 2012 22:01:42 -0500 Subject: [PATCH 1/2] Remove thread unsafe context spawning. --- zipline/component.py | 4 +--- zipline/monitor.py | 8 ++++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/zipline/component.py b/zipline/component.py index 8c6621d9..6fda03c4 100644 --- a/zipline/component.py +++ b/zipline/component.py @@ -139,7 +139,7 @@ class Component(object): # systems where this becomes needed. Add this option. # # http://zeromq.github.com/pyzmq/morethanbindings.html#thread-safety - self.context = self.zmq.Context() + self.context = self.zmq.Context.instance() self.setup_poller() @@ -179,8 +179,6 @@ class Component(object): self.shutdown() self.teardown_sockets() - if self.context: - self.context.destroy() if fail: raise fail else: diff --git a/zipline/monitor.py b/zipline/monitor.py index 7f6cab2d..5db2ea54 100644 --- a/zipline/monitor.py +++ b/zipline/monitor.py @@ -76,7 +76,7 @@ class Controller(object): self.polling = True if not context: - self._ctx = zmq.Context() + self._ctx = zmq.Context.instance() else: self._ctx = context @@ -142,7 +142,7 @@ class Controller(object): """ if not context: - context = zmq.Context() + context = zmq.Context.instance() s = context.socket(zmq.PUSH) s.connect(self.push_socket) @@ -156,7 +156,7 @@ class Controller(object): """ if not context: - context = zmq.Context() + context = zmq.Context.instance() s = context.socket(zmq.SUB) s.connect(self.sub_socket) @@ -168,7 +168,7 @@ class Controller(object): self.polling = False if not context: - context = zmq.Context() + context = zmq.Context.instance() #logging.info('Shutdown controller') From ba5172f01a4a7b6b230414e9ab62d697b32c76e5 Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Mon, 5 Mar 2012 21:26:07 -0500 Subject: [PATCH 2/2] More safe shutdown assertions. --- zipline/component.py | 61 ++++++++++++++++++++++++++++++-------------- zipline/messaging.py | 6 ++++- 2 files changed, 47 insertions(+), 20 deletions(-) diff --git a/zipline/component.py b/zipline/component.py index 6fda03c4..4595c192 100644 --- a/zipline/component.py +++ b/zipline/component.py @@ -2,15 +2,17 @@ Commonly used messaging components. Contains the base class for all components. - """ import os +import sys import uuid import time import socket import humanhash +from datetime import datetime + import zipline.util as qutil from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE @@ -63,11 +65,14 @@ class Component(object): self.killed = False self.controller = None self.heartbeat_timeout = 2000 - self.state_flag = COMPONENT_STATE.OK # OK | DONE | EXCEPTION - self._exception = None + self.state_flag = COMPONENT_STATE.OK + self.on_done = None + self._exception = None + self.fail_time = None self.start_tic = None self.stop_tic = None + self.note = None # Humanhashes make this way easier to debug because they # stick in your mind unlike a 32 byte string of random hex. @@ -120,7 +125,7 @@ class Component(object): raise NotImplementedError def _run(self): - self.start_tick = time.clock() + self.start_tic = time.time() self.done = False # TODO: use state flag self.sockets = [] @@ -133,12 +138,6 @@ class Component(object): import zmq self.zmq = zmq - # TODO: this can cause max fd errors on BSD machines with - # low ulimits, its perfectly fine to use one Context in - # multithreaded enviroments, its only in multiprocess - # systems where this becomes needed. Add this option. - # - # http://zeromq.github.com/pyzmq/morethanbindings.html#thread-safety self.context = self.zmq.Context.instance() self.setup_poller() @@ -147,9 +146,9 @@ class Component(object): self.setup_sync() self.setup_control() self.loop() + self.shutdown() - - self.end_tick = time.clock() + self.stop_tic = time.time() # shouldn't block if we've done our job correctly # self.context.term() @@ -178,9 +177,6 @@ class Component(object): self.shutdown() self.teardown_sockets() - - if fail: - raise fail else: try: self._run() @@ -189,11 +185,21 @@ class Component(object): self.teardown_sockets() + def working(self): + """ + Controls when the work loop will start and end + + If we encounter an exception or signal done exit. + + Overload for higher order behavior. + """ + return (not self.done) + def loop(self): """ Loop to do work while we still have work to do. """ - while not self.done: # TODO: use state flag + while self.working(): self.confirm() self.do_work() @@ -208,7 +214,7 @@ class Component(object): self.receive_sync_ack() # blocking def runtime(self): - if self.ready(): + if self.ready() and self.start_tic and self.stop_tic: return self.stop_tic - self.start_tic # ---------------------------- @@ -230,7 +236,8 @@ class Component(object): Tear down after normal operation. """ - pass + if self.on_done: + self.on_done() def kill(self): """ @@ -249,7 +256,11 @@ class Component(object): def signal_exception(self, exc=None): self.state_flag = COMPONENT_STATE.EXCEPTION + self.stop_tic = time.time() + self._exception = exc + exc_type, exc_value, exc_traceback = sys.exc_info() + self.stack_trace = exc_traceback qutil.LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id)) @@ -360,7 +371,11 @@ class Component(object): overall status of the simulation and to forcefully tear down the simulation in case of a failure. """ - assert self.controller + + # Allow for the possibility of not having a controller, + # possibly the zipline devsimulator may not want this. + if not self.controller: + return self.control_out = self.controller.message_sender(context=self.context) self.control_in = self.controller.message_listener(context=self.context) @@ -433,6 +448,14 @@ class Component(object): """ return False + def note(self): + """ + Information about the component. Mostly used for testing. + """ + + def get_note(self): + return self.note or '' + def debug(self): """ Debug information about the component. diff --git a/zipline/messaging.py b/zipline/messaging.py index fbeadd90..7c80a3e1 100644 --- a/zipline/messaging.py +++ b/zipline/messaging.py @@ -26,10 +26,13 @@ class ComponentHost(Component): def init(self): - # Component Registry + # Component Registry, keyed by get_id # ---------------------- self.components = {} # ---------------------- + # Internal Registry, keyed by guid + self._components = {} + # ---------------------- self.sync_register = {} self.timeout = datetime.timedelta(seconds=5) @@ -69,6 +72,7 @@ class ComponentHost(Component): component.addresses = self.addresses component.controller = self.controller + self._components[component.guid] = component self.components[component.get_id] = component self.sync_register[component.get_id] = datetime.datetime.utcnow()