From dd1056bf309c1dc3725a1fd3eca41db216a6dfd6 Mon Sep 17 00:00:00 2001 From: fawce Date: Wed, 1 Aug 2012 23:41:44 -0400 Subject: [PATCH] generator backed component, and a starter test for a source. --- tests/test_components.py | 45 +++- tests/test_monitor.py | 10 +- zipline/__init__.py | 4 +- zipline/core/__init__.py | 4 +- zipline/core/component.py | 514 +++++++++++++++--------------------- zipline/core/host.py | 2 +- zipline/core/monitor.py | 4 +- zipline/core/process.py | 8 +- zipline/gens/zmqgen.py | 27 ++ zipline/lines.py | 14 +- zipline/utils/test_utils.py | 44 +-- 11 files changed, 324 insertions(+), 352 deletions(-) create mode 100644 zipline/gens/zmqgen.py diff --git a/tests/test_components.py b/tests/test_components.py index 5c39abbf..02b05a69 100644 --- a/tests/test_components.py +++ b/tests/test_components.py @@ -1,10 +1,10 @@ import zmq +import pytz from datetime import datetime, timedelta from unittest2 import TestCase from collections import defaultdict -from zipline.test_algorithms import ExceptionAlgorithm, DivByZeroAlgorithm from zipline.finance.trading import SIMULATION_STYLE from zipline.core.devsimulator import AddressAllocator from zipline.lines import SimulatedTrading @@ -15,18 +15,21 @@ from zipline.utils.test_utils import ( setup_logger, teardown_logger, launch_component, - gen_from_socket + create_monitor, + launch_monitor ) from zipline.core import Component +from zipline.core.component import ComponentSocketArgs from zipline.protocol import ( - DATASOURCE_FRAME + DATASOURCE_FRAME, + DATASOURCE_UNFRAME ) from zipline.gens.tradegens import SpecificEquityTrades from zipline.gens.utils import hash_args - +from zipline.gens.zmqgen import gen_from_poller import logbook log = logbook.Logger('ComponentTestCase') @@ -54,28 +57,50 @@ class ComponentTestCase(TestCase): teardown_logger(self) def test_specific_equity_source(self): + filter = [1,2,3,4] #Set up source a. One minute between events. args_a = tuple() kwargs_a = { 'sids' : [1,2], - 'start' : datetime(2012,6,6,0), + 'start' : datetime(2012,6,6,0,tzinfo=pytz.utc), 'delta' : timedelta(minutes = 1), - 'filter' : filter + 'filter' : filter, + 'count' : 100 } c_id = SpecificEquityTrades.__name__ + hash_args(args_a, kwargs_a) + mon = create_monitor(allocator) + + out_socket_args = ComponentSocketArgs( + style=zmq.PUSH, + uri=allocator.lease(1)[0], + bind=True + ) c = Component( SpecificEquityTrades, args_a, kwargs_a, - out_uri=self.out_uri, - frame=DATASOURCE_FRAME, - monitor_uri=None + c_id, + out_socket_args, + DATASOURCE_FRAME, + mon ) + + mon.manage(set([c.get_id])) + mon_proc = launch_monitor(mon) + # launch in a process proc = launch_component(c) - for msg in gen_from_socket(self.out_uri): + pull_socket = self.ctx.socket(zmq.PULL) + pull_socket.connect(out_socket_args.uri) + poller = zmq.Poller() + poller.register(pull_socket, zmq.POLLIN) + unframe = DATASOURCE_UNFRAME + for msg in gen_from_poller(poller, pull_socket, unframe): # assert things about the messages. log.info(msg) + + pull_socket.close() + log.info("DONE!") diff --git a/tests/test_monitor.py b/tests/test_monitor.py index 5f55aaee..3d063954 100644 --- a/tests/test_monitor.py +++ b/tests/test_monitor.py @@ -1,7 +1,7 @@ from zipline.utils.test_utils import setup_logger, teardown_logger from unittest2 import TestCase, skip -from zipline.core.monitor import Controller +from zipline.core.monitor import Monitor class TestMonitor(TestCase): def setUp(self): @@ -15,12 +15,12 @@ class TestMonitor(TestCase): pub_socket = 'tcp://127.0.0.1:5000' route_socket = 'tcp://127.0.0.1:5001' - con = Controller(pub_socket, route_socket) - con.manage([]) + mon = Monitor(pub_socket, route_socket) + mon.manage([]) def test_init_topology(self): pub_socket = 'tcp://127.0.0.1:5000' route_socket = 'tcp://127.0.0.1:5001' - con = Controller(pub_socket, route_socket, ) - con.manage([ 'a', 'b', 'c', 'd' ]) + mon = Monitor(pub_socket, route_socket, ) + mon.manage([ 'a', 'b', 'c', 'd' ]) diff --git a/zipline/__init__.py b/zipline/__init__.py index 23bcca40..a84cd345 100644 --- a/zipline/__init__.py +++ b/zipline/__init__.py @@ -6,14 +6,14 @@ Zipline # it is a place to expose the public interfaces. import protocol # namespace -from core.monitor import Controller +from core.monitor import Monitor from lines import SimulatedTrading from core.host import ComponentHost from utils.protocol_utils import ndict __all__ = [ SimulatedTrading, - Controller, + Monitor, ComponentHost, protocol, ndict diff --git a/zipline/core/__init__.py b/zipline/core/__init__.py index d487dd05..a7d6b1f8 100644 --- a/zipline/core/__init__.py +++ b/zipline/core/__init__.py @@ -1,9 +1,9 @@ from host import ComponentHost from component import Component -from monitor import Controller +from monitor import Monitor __all__ = [ Component, - Controller, + Monitor, ComponentHost ] diff --git a/zipline/core/component.py b/zipline/core/component.py index ac92b565..f9dc63c1 100644 --- a/zipline/core/component.py +++ b/zipline/core/component.py @@ -16,6 +16,8 @@ from collections import namedtuple # pyzmq import zmq +from zipline.gens.zmqgen import gen_from_poller + from zipline.core.monitor import PARAMETERS from zipline.protocol import ( @@ -38,40 +40,6 @@ ComponentSocketArgs = namedtuple('ComponentSocket',['uri','style','bind']) class Component(object): - """ - Base class for components. Defines the the base messaging - interface for components. - - :param addresses: a dict of name_string -> zmq port address strings. - Must have the following entries - - :param data_address: socket address used for data sources to stream - their records. Will be used in PUSH/PULL sockets - between data sources and a Feed. Bind will always - be on the PULL side (we always have N producers and - 1 consumer) - - :param feed_address: socket address used to publish consolidated feed - from serialization of data sources - will be used in PUB/SUB sockets between Feed and - Transforms. Bind is always on the PUB side. - - :param merge_address: socket address used to publish transformed - values. will be used in PUSH/PULL from many - transforms to one Merge Bind will always be on - the PULL side (we always have N producers and - 1 consumer) - - :param results_address: socket address used to publish merged data - source feed and transforms to clients will be - used in PUB/SUB from one Merge to one or many - clients. Bind is always on the PUB side. - - bind/connect methods will return the correct socket type for each - address. - - """ - # ------------ # Construction # ------------ @@ -82,8 +50,10 @@ class Component(object): gen_kwargs, component_id, out_socket_args, - controller=None, - in_socket_args=None + frame, + monitor, + in_socket_args=None, + unframe=None ): assert component_id, \ @@ -97,11 +67,6 @@ class Component(object): assert isinstance(in_socket_args, ComponentSocketArgs), \ "in_socket_args args must be ComponentSocketArgs" - if monitor_socket_args: - assert isinstance(monitor_socket_args, ComponentSocketArgs), \ - "monitor_socket_args args must be ComponentSocketArgs" - - # ----------------- # Generator # ----------------- @@ -110,6 +75,7 @@ class Component(object): self.gen_kwargs = gen_kwargs self.gen_func = gen_func self.generator = None + self.frame = frame # lock for waiting on monitor "GO" self.waiting = None @@ -123,11 +89,13 @@ class Component(object): self.context = None self.out_socket = None self.in_socket = None - self.controller = controller + self.monitor = monitor + self.unframe = unframe # TODO: state_flag is deprecated, remove self.state_flag = COMPONENT_STATE.OK + # track time of last ping we received from monitor self.last_ping = None # Humanhashes make this way easier to debug because they stick @@ -140,70 +108,6 @@ class Component(object): # Core Methods # ------------ - def open(self): - """ - Open the connections needed to start doing work. - Perform any setup that must be done within process. - """ - # The process title so you can watch it in top, ps. - setproctitle(self.generator.__name__) - - if self.in_socket_args: - self.in_socket = self.open_socket(self.in_socket_args) - poller_gen = self.gen_from_zmq(self.in_socket) - self.gen_func(poller_gen, *self.gen_args, **self.gen_kwargs) - else: - self.generator = self.gen_func(*self.gen_args, **self.gen_kwargs) - - self.out_socket = self.open_socket(self.out_socket_args) - - def open_socket(self, sock_args): - if sock_args.bind: - return self.bind_socket(sock_args) - else: - return self.connect_socket(sock_args) - - def bind_socket(self, sock_args): - if sock_args.style == zmq.PULL: - return self.bind_pull_socket(sock_args.uri) - if sock_args.style == zmq.PUSH: - return self.bind_push_socket(sock_args.uri) - if sock_args.style == zmq.PUB: - return self.bind_pub_socket(sock_args.uri) - - raise Exception("Invalid socket arguments") - - def connect_socket(self, sock_args): - if sock_args.style == zmq.PULL: - return self.connect_pull_socket(sock_args.uri) - if sock_args.style == zmq.PUSH: - return self.connect_push_socket(sock_args.uri) - if sock_args.style == zmq.SUB: - return self.connect_sub_socket(sock_args.uri) - - raise Exception("Invalid socket arguments") - - def ready(self): - """ - Return ``True`` if and only if the component has finished - execution. - """ - return self.state_flag in [COMPONENT_STATE.DONE, \ - COMPONENT_STATE.EXCEPTION] - - def successful(self): - """ - Return ``True`` if and only if the component has finished - execution successfully, that is, without raising an error. - """ - return self.state_flag == COMPONENT_STATE.DONE and not \ - self.exception - - def init_zmq(self): - self.zmq = zmq - self.context = self.zmq.Context() - self.zmq_poller = self.zmq.Poller - return def _run(self): """ @@ -212,17 +116,15 @@ class Component(object): The core logic of the all components is run here. """ + # The process title so you can watch it in top, ps. + setproctitle(self.gen_func.__name__) + log.info("Start %r" % self) log.info("Pid %s" % os.getpid()) log.info("Group %s" % os.getpgrp()) - self.done = False # TODO: use state flag self.sockets = [] - self.init_zmq() - self.setup_poller() - - self.setup_control() self.open() self.signal_ready() @@ -232,10 +134,14 @@ class Component(object): # ----------------------- # YOU SHALL NOT PASS!!!!! # ----------------------- - # ... until the controller signals GO + # ... until the monitor signals GO - self.loop() + for event in self.generator: + self.heartbeat() + msg = self.frame(event) + self.out_socket.send(msg) + self.signal_done() def run(self, catch_exceptions=True): """ @@ -249,108 +155,10 @@ class Component(object): else: # if we get a kill signal, forcibly close all the # sockets. - # exc_info = sys.exc_info() - # self.relay_exception(exc_info[0], exc_info[1], exc_info[2]) self.teardown_sockets() - finally: log.info("Exiting %r" % self) - def working(self): - """ - Controls when the work loop will start and end - - If we encounter an exception or signal done exit. - - Overload for higher order behavior. - """ - return (not self.done) - - def loop(self, lockstep=True): - """ - Loop to do work while we still have work to do. - """ - - for event in self.generator: - self.heartbeat() - msg = self.frame(event) - self.out_socket.send(msg) - - def heartbeat(self, timeout=0): - # wait for synchronization reply from the host - socks = dict(self.poll.poll(timeout)) - - # ---------------- - # Control Dispatch - # ---------------- - assert self.control_in, 'Component does not have a control_in socket' - - if socks.get(self.control_in) == zmq.POLLIN: - msg = self.control_in.recv() - event, payload = CONTROL_UNFRAME(msg) - - # =========== - # Heartbeat - # =========== - - # The controller will send out a single number packed in - # a CONTROL_FRAME with ``heartbeat`` event every - # (n)-seconds. The component then has n seconds to - # respond to it. If not then it will be considered as - # malfunctioning or maybe CPU bound. - - if event == CONTROL_PROTOCOL.HEARTBEAT: - # Heart outgoing - heartbeat_frame = CONTROL_FRAME( - CONTROL_PROTOCOL.OK, - payload - ) - - self.last_ping = float(payload) - # Echo back the heartbeat identifier to tell the - # controller that this component is still alive and - # doing work - self.control_out.send(heartbeat_frame) - - - # ========= - # Soft Kill - # ========= - - # Try and clean up properly and send out any reports or - # data that are done during a clean shutdown. Inform the - # controller that we're done. - elif event == CONTROL_PROTOCOL.SHUTDOWN: - self.signal_done() - - # ========= - # Hard Kill - # ========= - - # Just exit. - elif event == CONTROL_PROTOCOL.KILL: - self.kill() - - # In case we didn't receive a ping, send a pre-emptive - # pong to the monitor. - elif self.last_ping and time.time() - self.last_ping > 1: - # send a ping ahead of schedule - pre_pong = time.time() - heartbeat_frame = CONTROL_FRAME( - CONTROL_PROTOCOL.OK, - str(pre_pong) - ) - - # Echo back the heartbeat identifier to tell the - # controller that this component is still alive and - # doing work - self.control_out.send(heartbeat_frame, self.zmq.NOBLOCK) - self.last_ping = pre_pong - elif self.last_ping and \ - time.time() - self.last_ping > PARAMETERS.MAX_COMPONENT_WAIT: - # monitor is gone without sending the shutdown - # signal, do a hard exit. - self.kill() # ---------------------------- # Cleanup & Modes of Failure @@ -375,6 +183,62 @@ class Component(object): """ raise KillSignal() + def signal_exception(self, exc=None, scope=None): + """ + All exceptions inside any component should boil back to + this handler. + + Will inform the system that the component has failed and how it + has failed. + """ + self.state_flag = COMPONENT_STATE.EXCEPTION + exc_type, exc_value, exc_traceback = sys.exc_info() + + # if a downstream component fails, this component may try + # sending when there are zero connections to the socket, + # which will raise ZMQError(EAGAIN). So, it doesn't make + # sense to relay this exception to Monitor and the rest + # of the zipline. + if isinstance(exc, zmq.ZMQError) and exc.errno == zmq.EAGAIN: + log.warn("{id} raised a ZMQError(EAGAIN) not relaying"\ + .format(id=self.get_id)) + return + + # sys.stdout.write(trace) + log.exception("Unexpected error in run for {id}.".format(id=self.get_id)) + + try: + log.info('{id} sending exception to monitor'\ + .format(id=self.get_id)) + msg = EXCEPTION_FRAME( + exc_traceback, + exc_type.__name__, + exc_value.message + ) + + exception_frame = CONTROL_FRAME( + CONTROL_PROTOCOL.EXCEPTION, + msg + ) + self.control_out.send(exception_frame, self.zmq.NOBLOCK) + # The monitor should relay the exception back + # to all zipline components. Wait here until the + # notice arrives, and we can assume other zipline + # components have broken out of their message + # loops. + for i in xrange(PARAMETERS.MAX_COMPONENT_WAIT): + self.heartbeat(timeout=1000) + log.warn("{id} never heard back from monitor."\ + .format(id=self.get_id)) + + except KillSignal: + log.info("{id} received confirmation from monitor"\ + .format(id=self.get_id)) + except: + log.exception("Exception waiting for monitor reply") + + + # ---------------------- # Internal Maintenance # ---------------------- @@ -418,7 +282,7 @@ class Component(object): # Go # ==== - # A distributed lock from the controller to ensure + # A distributed lock from the monitor to ensure # synchronized start. if event == CONTROL_PROTOCOL.HEARTBEAT: @@ -430,7 +294,7 @@ class Component(object): log.info('Prestart Heartbeat ' + self.get_id) elif event == CONTROL_PROTOCOL.GO: - # Side effectful call from the controller to unlock + # Side effectful call from the monitor to unlock # and begin doing work only when the entire topology # of the system beings to come online log.info('Unlocking ' + self.__class__.__name__) @@ -442,7 +306,7 @@ class Component(object): # Try and clean up properly and send out any reports or # data that are done during a clean shutdown. Inform the - # controller that we're done. + # monitor that we're done. elif event == CONTROL_PROTOCOL.SHUTDOWN: self.signal_done() break @@ -462,6 +326,82 @@ class Component(object): self.kill() break + def heartbeat(self, timeout=0): + # wait for synchronization reply from the host + socks = dict(self.poll.poll(timeout)) + + # ---------------- + # Control Dispatch + # ---------------- + assert self.control_in, 'Component does not have a control_in socket' + + if socks.get(self.control_in) == zmq.POLLIN: + msg = self.control_in.recv() + event, payload = CONTROL_UNFRAME(msg) + + # =========== + # Heartbeat + # =========== + + # The monitor will send out a single number packed in + # a CONTROL_FRAME with ``heartbeat`` event every + # (n)-seconds. The component then has n seconds to + # respond to it. If not then it will be considered as + # malfunctioning or maybe CPU bound. + + if event == CONTROL_PROTOCOL.HEARTBEAT: + # Heart outgoing + heartbeat_frame = CONTROL_FRAME( + CONTROL_PROTOCOL.OK, + payload + ) + + self.last_ping = float(payload) + # Echo back the heartbeat identifier to tell the + # monitor that this component is still alive and + # doing work + self.control_out.send(heartbeat_frame) + + + # ========= + # Soft Kill + # ========= + + # Try and clean up properly and send out any reports or + # data that are done during a clean shutdown. Inform the + # monitor that we're done. + elif event == CONTROL_PROTOCOL.SHUTDOWN: + self.signal_done() + + # ========= + # Hard Kill + # ========= + + # Just exit. + elif event == CONTROL_PROTOCOL.KILL: + self.kill() + + # In case we didn't receive a ping, send a pre-emptive + # pong to the monitor. + elif self.last_ping and time.time() - self.last_ping > 1: + # send a ping ahead of schedule + pre_pong = time.time() + heartbeat_frame = CONTROL_FRAME( + CONTROL_PROTOCOL.OK, + str(pre_pong) + ) + + # Echo back the heartbeat identifier to tell the + # monitor that this component is still alive and + # doing work + self.control_out.send(heartbeat_frame, self.zmq.NOBLOCK) + self.last_ping = pre_pong + elif self.last_ping and \ + time.time() - self.last_ping > PARAMETERS.MAX_COMPONENT_WAIT: + # monitor is gone without sending the shutdown + # signal, do a hard exit. + self.kill() + def signal_ready(self): log.info(self.__class__.__name__ + ' is ready') @@ -471,60 +411,6 @@ class Component(object): ) self.control_out.send(frame) - def signal_exception(self, exc=None, scope=None): - """ - All exceptions inside any component should boil back to - this handler. - - Will inform the system that the component has failed and how it - has failed. - """ - self.state_flag = COMPONENT_STATE.EXCEPTION - exc_type, exc_value, exc_traceback = sys.exc_info() - - # if a downstream component fails, this component may try - # sending when there are zero connections to the socket, - # which will raise ZMQError(EAGAIN). So, it doesn't make - # sense to relay this exception to Monitor and the rest - # of the zipline. - if isinstance(exc, zmq.ZMQError) and exc.errno == zmq.EAGAIN: - log.warn("{id} raised a ZMQError(EAGAIN) not relaying"\ - .format(id=self.get_id)) - return - - # sys.stdout.write(trace) - log.exception("Unexpected error in run for {id}.".format(id=self.get_id)) - - if hasattr(self, 'control_out') and self.control_out: - try: - log.info('{id} sending exception to controller'\ - .format(id=self.get_id)) - msg = EXCEPTION_FRAME( - exc_traceback, - exc_type.__name__, - exc_value.message - ) - - exception_frame = CONTROL_FRAME( - CONTROL_PROTOCOL.EXCEPTION, - msg - ) - self.control_out.send(exception_frame, self.zmq.NOBLOCK) - # The controller should relay the exception back - # to all zipline components. Wait here until the - # notice arrives, and we can assume other zipline - # components have broken out of their message - # loops. - for i in xrange(PARAMETERS.MAX_COMPONENT_WAIT): - self.heartbeat(timeout=1000) - log.warn("{id} never heard back from monitor."\ - .format(id=self.get_id)) - except KillSignal: - log.info("{id} received confirmation from controller"\ - .format(id=self.get_id)) - except: - log.exception("Exception waiting for controller reply") - def signal_done(self): """ Notify down stream components that we're done. @@ -534,20 +420,18 @@ class Component(object): # notify internal work loop that we're done self.done = True # TODO: use state flag - if hasattr(self, 'out_socket') and self.out_socket: - msg = zmq.Message(str(CONTROL_PROTOCOL.DONE)) - self.out_socket.send(msg) + msg = zmq.Message(str(CONTROL_PROTOCOL.DONE)) + self.out_socket.send(msg) - if hasattr(self, 'control_out'): - # notify controller we're done - done_frame = CONTROL_FRAME( - CONTROL_PROTOCOL.DONE, - '' - ) + # notify monitor we're done + done_frame = CONTROL_FRAME( + CONTROL_PROTOCOL.DONE, + '' + ) - self.control_out.send(done_frame) - log.info("[%s] sent control done" % self.get_id) + self.control_out.send(done_frame) + log.info("[%s] sent control done" % self.get_id) # there is a narrow race condition where we finish just # after the Monitor accepts our prior heartbeat, but just @@ -559,12 +443,60 @@ class Component(object): # Messaging # ----------- - def setup_poller(self): + def open(self): """ - Setup the poller used for multiplexing the incoming data - handling sockets. + Open the connections needed to start doing work. + Perform any setup that must be done within process. """ - self.poll = self.zmq_poller() + + self.zmq = zmq + self.context = self.zmq.Context() + self.poll = self.zmq.Poller() + + self.setup_control() + + if self.in_socket_args: + self.in_socket = self.open_socket(self.in_socket_args) + poller_gen = gen_from_poller( + self.poller, + self.in_socket, + self.unframe + ) + self.generator = self.gen_func( + poller_gen, + *self.gen_args, + **self.gen_kwargs + ) + else: + self.generator = self.gen_func(*self.gen_args, **self.gen_kwargs) + + self.out_socket = self.open_socket(self.out_socket_args) + + def open_socket(self, sock_args): + if sock_args.bind: + return self.bind_socket(sock_args) + else: + return self.connect_socket(sock_args) + + def bind_socket(self, sock_args): + if sock_args.style == zmq.PULL: + return self.bind_pull_socket(sock_args.uri) + if sock_args.style == zmq.PUSH: + return self.bind_push_socket(sock_args.uri) + if sock_args.style == zmq.PUB: + return self.bind_pub_socket(sock_args.uri) + + raise Exception("Invalid socket arguments") + + def connect_socket(self, sock_args): + if sock_args.style == zmq.PULL: + return self.connect_pull_socket(sock_args.uri) + if sock_args.style == zmq.PUSH: + return self.connect_push_socket(sock_args.uri) + if sock_args.style == zmq.SUB: + return self.connect_sub_socket(sock_args.uri) + + raise Exception("Invalid socket arguments") def bind_push_socket(self, addr): push_socket = self.context.socket(self.zmq.PUSH) @@ -623,12 +555,12 @@ class Component(object): of the simulation and to forcefully tear down the simulation in case of a failure. """ - self.control_out = self.controller.message_sender( + self.control_out = self.monitor.message_sender( identity = self.get_id, context = self.context, ) - self.control_in = self.controller.message_listener( + self.control_in = self.monitor.message_listener( context = self.context ) @@ -639,18 +571,6 @@ class Component(object): # Description and Debug # --------------------- - def extern_logger(self): - """ - Pipe logs out to a provided logging interface. - """ - pass - - def setup_extern_logger(self): - """ - Pipe logs out to a provided logging interface. - """ - pass - @property def get_id(self): """ diff --git a/zipline/core/host.py b/zipline/core/host.py index ea1ca0aa..37de82aa 100644 --- a/zipline/core/host.py +++ b/zipline/core/host.py @@ -103,7 +103,7 @@ class ComponentHost(object): log.info('== Roll Call ==') - log.info('Controller') + log.info('Monitor') self.launch_controller() diff --git a/zipline/core/monitor.py b/zipline/core/monitor.py index 08dfd601..183de45b 100644 --- a/zipline/core/monitor.py +++ b/zipline/core/monitor.py @@ -38,7 +38,7 @@ class UnknownChatter(Exception): return """Component calling itself "%s" talking on unexpected channel""" % self.named -log = logbook.Logger('Controller') +log = logbook.Logger('Monitor') # The scalars determining the timing of the monitor behavior for # the system. @@ -56,7 +56,7 @@ PARAMETERS = ndict(dict( SYSTEM_TIMEOUT = 50, )) -class Controller(object): +class Monitor(object): """ A N to M messaging system for inter component communication. diff --git a/zipline/core/process.py b/zipline/core/process.py index b2a01429..dc1fcd3c 100644 --- a/zipline/core/process.py +++ b/zipline/core/process.py @@ -40,13 +40,13 @@ class ProcessSimulator(ComponentHost): # invoked by the host's open() def launch_controller(self): - proc = multiprocessing.Process(target=self.controller.run) + proc = multiprocessing.Process(target=self.monitor.run) proc.start() self.con = proc # Process specific - self.controller_process = proc - self.mapping[proc.pid] = 'Controller' + self.monitor_process = proc + self.mapping[proc.pid] = 'Monitor' def launch_component(self, component): proc = multiprocessing.Process(target=component.run) @@ -81,7 +81,7 @@ class ProcessSimulator(ComponentHost): process.join(timeout=1) process.terminate() - self.controller.shutdown(soft=True) + self.monitor.shutdown(soft=True) self.running = False self.con.terminate() diff --git a/zipline/gens/zmqgen.py b/zipline/gens/zmqgen.py new file mode 100644 index 00000000..e51e3bab --- /dev/null +++ b/zipline/gens/zmqgen.py @@ -0,0 +1,27 @@ +import zmq +import zipline.protocol as zp + +def gen_from_pull_socket(socket_uri, context, unframe): + """ + A generator that takes a socket_uri, and yields + messages from the poller until it gets a zp.CONTROL_PROTOCOL.DONE. + """ + pull_socket = context.socket(zmq.PULL) + pull_socket.connect(socket_uri) + poller = zmq.Poller() + poller.register(pull_socket, zmq.POLLIN) + + return gen_from_poller(poller, pull_socket, unframe) + +def gen_from_poller(poller, in_socket, unframe): + + while True: + socks = dict(poller.poll(1000)) + + if socks.get(in_socket) == zmq.POLLIN: + message = in_socket.recv() + if message == str(zp.CONTROL_PROTOCOL.DONE): + break + else: + event = unframe(message) + yield event diff --git a/zipline/lines.py b/zipline/lines.py index 6a70bc14..3d3ab2a6 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -70,7 +70,7 @@ from zipline.transforms import BaseTransform from zipline.test_algorithms import TestAlgorithm from zipline.components import TradeSimulationClient from zipline.core.process import ProcessSimulator -from zipline.core.monitor import Controller +from zipline.core.monitor import Monitor from zipline.finance.trading import SIMULATION_STYLE log = logbook.Logger('Lines') @@ -131,7 +131,7 @@ class SimulatedTrading(object): 'results_address' : sockets[4], } - self.con = Controller( + self.monitor = Monitor( # pub socket sockets[5], # route socket @@ -163,7 +163,7 @@ class SimulatedTrading(object): #setup transforms self.transforms = {} - self.sim.register_controller( self.con ) + self.sim.register_monitor( self.monitor ) @staticmethod @@ -348,15 +348,15 @@ class SimulatedTrading(object): return base | transforms | sources - def setup_controller(self): + def setup_monitor(self): """ - Prepare the controller to manage the topology specified + Prepare the monitor to manage the topology specified by this line. """ - self.con.manage(self.topology) + self.monitor.manage(self.topology) def simulate(self, blocking=True): - self.setup_controller() + self.setup_monitor() self.started = True self.sim_context = self.sim.simulate() diff --git a/zipline/utils/test_utils.py b/zipline/utils/test_utils.py index 74591858..eda5a133 100644 --- a/zipline/utils/test_utils.py +++ b/zipline/utils/test_utils.py @@ -7,7 +7,7 @@ import blist from zipline.utils.date_utils import EPOCH from itertools import izip from logbook import FileHandler - +from zipline.core.monitor import Monitor def setup_logger(test, path='/var/log/zipline/zipline.log'): test.log_handler = FileHandler(path) @@ -144,31 +144,31 @@ def assert_single_position(test, zipline): ) -def launch_component(self, component): +def launch_component(component): proc = multiprocessing.Process(target=component.run) proc.start() - self.subprocesses.append(proc) - - self.mapping[proc.pid] = component.get_id return proc -def gen_from_socket(socket_uri, context, unframe): - """ - A generator that takes a socket_uri, and yields - messages from the poller until it gets a zp.CONTROL_PROTOCOL.DONE. - """ - pull_socket = context.socket(zmq.PULL) - pull_socket.connect(socket_uri) - poller = zmq.Poller() - poller.register(pull_socket, zmq.POLLIN) +def launch_monitor(monitor): + proc = multiprocessing.Process(target=monitor.run) + proc.start() + return proc - while True: - socks = dict(poller.poll(1000)) - if socks.get(pull_socket) == zmq.POLLIN: - message = pull_socket.recv() +def create_monitor(allocator): + sockets = allocator.lease(3) + mon = Monitor( + # pub socket + sockets[0], + # route socket + sockets[1], + # exception socket to match tradesimclient's result + # socket, because we want to relay exceptions to the + # same listener + sockets[2], + # this controller is expected to run in a test, so no + # need to signal the parent process on success or error. + send_sighup=False + ) - if message.type == zp.CONTROL_PROTOCOL.DONE: - break - else: - yield unframe(message) + return mon