Merge branch 'master' into portfolio

This commit is contained in:
fawce
2012-03-07 17:28:45 -05:00
3 changed files with 52 additions and 27 deletions
+43 -22
View File
@@ -2,15 +2,17 @@
Commonly used messaging components.
Contains the base class for all components.
"""
import os
import sys
import uuid
import time
import socket
import humanhash
from datetime import datetime
import zipline.util as qutil
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE
@@ -63,11 +65,14 @@ class Component(object):
self.killed = False
self.controller = None
self.heartbeat_timeout = 2000
self.state_flag = COMPONENT_STATE.OK # OK | DONE | EXCEPTION
self._exception = None
self.state_flag = COMPONENT_STATE.OK
self.on_done = None
self._exception = None
self.fail_time = None
self.start_tic = None
self.stop_tic = None
self.note = None
# Humanhashes make this way easier to debug because they
# stick in your mind unlike a 32 byte string of random hex.
@@ -120,7 +125,7 @@ class Component(object):
raise NotImplementedError
def _run(self):
self.start_tick = time.clock()
self.start_tic = time.time()
self.done = False # TODO: use state flag
self.sockets = []
@@ -133,13 +138,7 @@ class Component(object):
import zmq
self.zmq = zmq
# TODO: this can cause max fd errors on BSD machines with
# low ulimits, its perfectly fine to use one Context in
# multithreaded enviroments, its only in multiprocess
# systems where this becomes needed. Add this option.
#
# http://zeromq.github.com/pyzmq/morethanbindings.html#thread-safety
self.context = self.zmq.Context()
self.context = self.zmq.Context.instance()
self.setup_poller()
@@ -147,9 +146,9 @@ class Component(object):
self.setup_sync()
self.setup_control()
self.loop()
self.shutdown()
self.end_tick = time.clock()
self.stop_tic = time.time()
# shouldn't block if we've done our job correctly
# self.context.term()
@@ -178,11 +177,6 @@ class Component(object):
self.shutdown()
self.teardown_sockets()
if self.context:
self.context.destroy()
if fail:
raise fail
else:
try:
self._run()
@@ -191,11 +185,21 @@ class Component(object):
self.teardown_sockets()
def working(self):
"""
Controls when the work loop will start and end
If we encounter an exception or signal done exit.
Overload for higher order behavior.
"""
return (not self.done)
def loop(self):
"""
Loop to do work while we still have work to do.
"""
while not self.done: # TODO: use state flag
while self.working():
self.confirm()
self.do_work()
@@ -210,7 +214,7 @@ class Component(object):
self.receive_sync_ack() # blocking
def runtime(self):
if self.ready():
if self.ready() and self.start_tic and self.stop_tic:
return self.stop_tic - self.start_tic
# ----------------------------
@@ -232,7 +236,8 @@ class Component(object):
Tear down after normal operation.
"""
pass
if self.on_done:
self.on_done()
def kill(self):
"""
@@ -251,7 +256,11 @@ class Component(object):
def signal_exception(self, exc=None):
self.state_flag = COMPONENT_STATE.EXCEPTION
self.stop_tic = time.time()
self._exception = exc
exc_type, exc_value, exc_traceback = sys.exc_info()
self.stack_trace = exc_traceback
qutil.LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id))
@@ -362,7 +371,11 @@ class Component(object):
overall status of the simulation and to forcefully tear
down the simulation in case of a failure.
"""
assert self.controller
# Allow for the possibility of not having a controller,
# possibly the zipline devsimulator may not want this.
if not self.controller:
return
self.control_out = self.controller.message_sender(context=self.context)
self.control_in = self.controller.message_listener(context=self.context)
@@ -435,6 +448,14 @@ class Component(object):
"""
return False
def note(self):
"""
Information about the component. Mostly used for testing.
"""
def get_note(self):
return self.note or ''
def debug(self):
"""
Debug information about the component.
+5 -1
View File
@@ -26,10 +26,13 @@ class ComponentHost(Component):
def init(self):
# Component Registry
# Component Registry, keyed by get_id
# ----------------------
self.components = {}
# ----------------------
# Internal Registry, keyed by guid
self._components = {}
# ----------------------
self.sync_register = {}
self.timeout = datetime.timedelta(seconds=5)
@@ -69,6 +72,7 @@ class ComponentHost(Component):
component.addresses = self.addresses
component.controller = self.controller
self._components[component.guid] = component
self.components[component.get_id] = component
self.sync_register[component.get_id] = datetime.datetime.utcnow()
+4 -4
View File
@@ -76,7 +76,7 @@ class Controller(object):
self.polling = True
if not context:
self._ctx = zmq.Context()
self._ctx = zmq.Context.instance()
else:
self._ctx = context
@@ -142,7 +142,7 @@ class Controller(object):
"""
if not context:
context = zmq.Context()
context = zmq.Context.instance()
s = context.socket(zmq.PUSH)
s.connect(self.push_socket)
@@ -156,7 +156,7 @@ class Controller(object):
"""
if not context:
context = zmq.Context()
context = zmq.Context.instance()
s = context.socket(zmq.SUB)
s.connect(self.sub_socket)
@@ -168,7 +168,7 @@ class Controller(object):
self.polling = False
if not context:
context = zmq.Context()
context = zmq.Context.instance()
#logging.info('Shutdown controller')