diff --git a/tests/test_components.py b/tests/test_components.py index ff353186..cf140677 100644 --- a/tests/test_components.py +++ b/tests/test_components.py @@ -64,19 +64,19 @@ class ComponentTestCase(TestCase): 'count' : count } + trade_gen = SpecificEquityTrades(*args_a, **kwargs_a) + monitor.add_to_topology(trade_gen.get_hash()) + + launch_monitor(monitor) + comp_a = Component( - SpecificEquityTrades(*args_a, **kwargs_a), + trade_gen, monitor, socket_uri, DATASOURCE_FRAME, DATASOURCE_UNFRAME ) - launch_monitor(monitor) - iter_a = iter(comp_a) - ev = iter_a.next() - return - for event in comp_a: log.info(event) @@ -96,16 +96,8 @@ class ComponentTestCase(TestCase): 'filter' : filter, 'count' : count } - - - comp_a = Component( - SpecificEquityTrades(*args_a, **kwargs_a), - monitor, - socket_uris[0], - DATASOURCE_FRAME, - DATASOURCE_UNFRAME - ) - + trade_gen_a = SpecificEquityTrades(*args_a, **kwargs_a) + monitor.add_to_topology(trade_gen_a.get_hash()) #Set up source b. Two minutes between events. args_b = tuple() @@ -116,15 +108,8 @@ class ComponentTestCase(TestCase): 'filter' : filter, 'count' : count } - - - comp_b = Component( - SpecificEquityTrades(*args_b, **kwargs_b), - monitor, - socket_uris[1], - DATASOURCE_FRAME, - DATASOURCE_UNFRAME - ) + trade_gen_b = SpecificEquityTrades(*args_b, **kwargs_b) + monitor.add_to_topology(trade_gen_b.get_hash()) #Set up source c. Three minutes between events. args_c = tuple() @@ -136,18 +121,41 @@ class ComponentTestCase(TestCase): 'count' : count } + trade_gen_c = SpecificEquityTrades(*args_c, **kwargs_c) + monitor.add_to_topology(trade_gen_c.get_hash()) + + launch_monitor(monitor) + + comp_a = Component( + trade_gen_a, + monitor, + socket_uris[0], + DATASOURCE_FRAME, + DATASOURCE_UNFRAME + ) + + comp_b = Component( + trade_gen_b, + monitor, + socket_uris[1], + DATASOURCE_FRAME, + DATASOURCE_UNFRAME + ) + comp_c = Component( - SpecificEquityTrades(*args_c, **kwargs_c), + trade_gen_c, monitor, socket_uris[2], DATASOURCE_FRAME, DATASOURCE_UNFRAME ) - launch_monitor(monitor) sources = [comp_a, comp_b, comp_c] - gens = [iter(source) for source in sources] - sorted_out = date_sorted_sources(gens) + + sorted_out = date_sorted_sources(*sources) + + import time + time.sleep(.25) prev = None sort_count = 0 diff --git a/zipline/core/component.py b/zipline/core/component.py index 4cb4c0d0..78ee69c8 100644 --- a/zipline/core/component.py +++ b/zipline/core/component.py @@ -17,8 +17,6 @@ from collections import namedtuple # pyzmq import zmq -from zipline.gens.zmqgen import gen_from_poller - from zipline.core.monitor import PARAMETERS from zipline.protocol import ( @@ -36,6 +34,10 @@ class KillSignal(Exception): def __init__(self): pass +class ShutdownSignal(Exception): + def __init__(self): + pass + ComponentSocketArgs = namedtuple('ComponentSocketArgs',['uri','style','bind']) class Component(object): @@ -83,25 +85,29 @@ class Component(object): self.unframe = unframe self.prefix = "" - # register two components with the monitor - monitor.add_to_topology(self.component_id) - monitor.add_to_topology("FORK-"+self.component_id) - # TODO: state_flag is deprecated, remove self.state_flag = COMPONENT_STATE.OK # track time of last ping we received from monitor - self.last_ping = None + self.last_ping = time.time() # Humanhashes make this way easier to debug because they stick # in your mind unlike a 32 byte string of random hex. self.guid = uuid.uuid4() self.huid = humanhash.humanize(self.guid.hex) + # first, start the generator in its own process. Once + # Monitor says "go", Events from the generator will be + # FRAME'd and PUSH'd to self.socket_uri. + proc = multiprocessing.Process( + target=self.loop_send + ) + proc.start() + # ------------ - # Generator + # Message Receiver/Generator # ------------ - self.gen = None + self.recv_gen = self.create_recv_gen() # ------------ @@ -109,94 +115,108 @@ class Component(object): # ------------ - def _run_out(self): + def loop_send(self): """ The main component loop. This is wrapped inside a exception reporting context inside of run. The core logic of the all components is run here. """ - # The process title so you can watch it in top, ps. - setproctitle(self.generator.__class__.__name__) - self.prefix = "FORK-" - - log.info("Start %r" % self) - log.info("Pid %s" % os.getpid()) - log.info("Group %s" % os.getpgrp()) - - self.open() - - self.signal_ready() - self.lock_ready() - self.wait_ready() - - # ----------------------- - # YOU SHALL NOT PASS!!!!! - # ----------------------- - # ... until the monitor signals GO - - for event in self.generator: - self.heartbeat() - event.source_id = self.get_id - msg = self.frame(event) - self.out_socket.send(msg) - - self.signal_done() - - def _run_in(self): - self.open(send=False) - self.signal_ready() - self.lock_ready() - self.wait_ready() - # ----------------------- - # YOU SHALL NOT PASS!!!!! - # ----------------------- - # ... until the monitor signals GO - - # return the generator - for event in gen_from_poller(self.poll, self.in_socket, self.unframe): - event.source_id = self.get_id - yield event - - self.signal_done() - - def run_safe(self, func): - """ - Run a function that is assumed to include wait_ready and - heartbeat. Used to wrap fork_generator and consume_gen. - """ try: - return func() + # The process title so you can watch it in top, ps. + setproctitle(self.generator.__class__.__name__) + self.prefix = "FORK-" + + log.info("Start %r" % self) + log.info("Pid %s" % os.getpid()) + log.info("Group %s" % os.getpgrp()) + + self.open() + + self.signal_ready() + self.lock_ready() + self.wait_ready() + + # ----------------------- + # YOU SHALL NOT PASS!!!!! + # ----------------------- + # ... until the monitor signals GO + + for event in self.generator: + self.heartbeat() + msg = self.frame(event) + self.out_socket.send(msg) + + self.signal_done() + + # keep heartbeating until we receive the shutdown + # message from the Monitor (raises a + # ShutdownSignal), or we don't hear from the Monitor + # for MAX_COMPONENT_WAIT. + while True: + self.heartbeat(timeout=1000) + except Exception as exc: - if not isinstance(exc, KillSignal): - self.signal_exception(exc) - else: - # if we get a kill signal, forcibly close all the - # sockets. - self.teardown_sockets() + self.handle_exception(exc) finally: log.info("Exiting %r" % self) - def _launch(self): - # first, start the generator in its own process. Once - # Monitor says "go", Events from the generator will be - # FRAME'd and PUSH'd to self.socket_uri. - proc = multiprocessing.Process( - target=self.run_safe, - args=(self._run_out,) - ) - proc.start() + def create_recv_gen(self): + try: + self.open(send=False) + self.signal_ready() + self.lock_ready() + # return the generator + return self.loop_recv() + except Exception as exc: + self.handle_exception(exc) + finally: + log.info("Created Recv Gen for %r" % self) - # Start the poller-generator, which will PULL messages - # from self.sockiet_uri, UNFRAME'd them, and yield them. - return self.run_safe(self._run_in) + def loop_recv(self): + try: + # we block on ready here until monitor sends the GO + self.wait_ready() + for event in self.gen_from_poller(self.poll, self.in_socket, self.unframe): + yield event + + self.signal_done() + except Exception as exc: + self.handle_exception(exc) + finally: + log.info("Exiting %r" % self) + + def gen_from_poller(self, poller, in_socket, unframe): + + while True: + socks = dict(poller.poll(0)) + self.heartbeat() + if socks.get(in_socket) == zmq.POLLIN: + message = in_socket.recv() + if message == str(CONTROL_PROTOCOL.DONE): + break + else: + event = unframe(message) + yield event + + def handle_exception(self, exc, re_raise=False): + if isinstance(exc, KillSignal): + # if we get a kill signal, forcibly close all the + # sockets. + self.teardown_sockets() + elif isinstance(exc, ShutdownSignal): + # signal from monitor of an orderly shutdown, + # do nothing. + pass + else: + self.signal_exception(exc) def __iter__(self): - if not self.gen: - self.gen = self._launch() + return self - return self.gen + def next(self): + return self.recv_gen.next() # ---------------------------- # Cleanup & Modes of Failure @@ -212,6 +232,12 @@ class Component(object): for sock in self.sockets: sock.close() + def shutdown(self): + """ + Clean shutdown. + """ + raise ShutdownSignal() + def kill(self): """ Unclean shutdown. @@ -335,7 +361,7 @@ class Component(object): # Side effectful call from the monitor to unlock # and begin doing work only when the entire topology # of the system beings to come online - log.info('Unlocking ' + self.__class__.__name__) + log.info('Unlocking ' + self.get_id) self.unlock_ready() # ========= @@ -346,7 +372,7 @@ class Component(object): # data that are done during a clean shutdown. Inform the # monitor that we're done. elif event == CONTROL_PROTOCOL.SHUTDOWN: - self.signal_done() + self.shutdown() break # ========= @@ -360,7 +386,7 @@ class Component(object): elif time.time() - start_wait > PARAMETERS.MAX_COMPONENT_WAIT: log.info('No go signal from monitor, %s exiting' \ - % self.__class__.__name__) + % self.get_id) self.kill() break @@ -409,7 +435,7 @@ class Component(object): # data that are done during a clean shutdown. Inform the # monitor that we're done. elif event == CONTROL_PROTOCOL.SHUTDOWN: - self.signal_done() + self.shutdown() # ========= # Hard Kill @@ -421,7 +447,7 @@ class Component(object): # In case we didn't receive a ping, send a pre-emptive # pong to the monitor. - elif self.last_ping and time.time() - self.last_ping > 1: + elif time.time() - self.last_ping > 2: # send a ping ahead of schedule pre_pong = time.time() heartbeat_frame = CONTROL_FRAME( @@ -434,15 +460,14 @@ class Component(object): # doing work self.control_out.send(heartbeat_frame, self.zmq.NOBLOCK) self.last_ping = pre_pong - elif self.last_ping and \ - time.time() - self.last_ping > PARAMETERS.MAX_COMPONENT_WAIT: + elif time.time() - self.last_ping > PARAMETERS.MAX_COMPONENT_WAIT: # monitor is gone without sending the shutdown # signal, do a hard exit. self.kill() def signal_ready(self): - log.info(self.__class__.__name__ + ' is ready') + log.info(self.get_id + ' is ready') frame = CONTROL_FRAME( CONTROL_PROTOCOL.READY, '' @@ -472,13 +497,6 @@ class Component(object): self.control_out.send(done_frame) log.info("[%s] sent control done" % self.get_id) - # there is a narrow race condition where we finish just - # after the Monitor accepts our prior heartbeat, but just - # before the next one is sent. So, we hang around for one - # last heartbeat, and wait an unusually long time. - # TODO: decided if this is really necessary. - # self.heartbeat(timeout=5000) - # ----------- # Messaging # ----------- @@ -531,7 +549,6 @@ class Component(object): def bind_push_socket(self, addr): push_socket = self.context.socket(self.zmq.PUSH) push_socket.bind(addr) - self.out_socket = push_socket self.sockets.append(push_socket) return push_socket @@ -549,7 +566,6 @@ class Component(object): pull_socket = self.context.socket(self.zmq.PULL) pull_socket.bind(addr) self.poll.register(pull_socket, self.zmq.POLLIN) - self.sockets.append(pull_socket) return pull_socket @@ -558,26 +574,9 @@ class Component(object): push_socket = self.context.socket(self.zmq.PUSH) push_socket.connect(addr) self.sockets.append(push_socket) - self.out_socket = push_socket return push_socket - def bind_pub_socket(self, addr): - pub_socket = self.context.socket(self.zmq.PUB) - pub_socket.bind(addr) - self.out_socket = pub_socket - - return pub_socket - - def connect_sub_socket(self, addr): - sub_socket = self.context.socket(self.zmq.SUB) - sub_socket.connect(addr) - sub_socket.setsockopt(self.zmq.SUBSCRIBE,'') - self.sockets.append(sub_socket) - - self.poll.register(sub_socket, self.zmq.POLLIN) - - return sub_socket def setup_control(self): """ diff --git a/zipline/core/monitor.py b/zipline/core/monitor.py index 83b022c5..7a920742 100644 --- a/zipline/core/monitor.py +++ b/zipline/core/monitor.py @@ -120,7 +120,7 @@ class Monitor(object): return def add_to_topology(self, component_id): - add = set([component_id]) + add = set([component_id, "FORK-" + component_id]) self.topology.update(add) def freeze_topology(self): @@ -340,6 +340,11 @@ class Monitor(object): log.info("breaking out of initial heartbeat") break + # Break out if the entire topology told us its DONE + if len(self.finished) == len(self.topology): + break + + # ================ # Heartbeat Stats # ================ @@ -432,27 +437,22 @@ class Monitor(object): bad = self.tracked - good - self.finished new = self.responses - good - self.finished - missing = self.topology - self.tracked - self.finished - for component in new: self.new(component) - if self.debug: - log.info('New component %r' % component) - for component in bad: self.timed_out(component) - for component in missing: + missing = self.topology - self.tracked - self.finished + for component in missing: if self.debug: log.info('Missing component %r' % component) - if self.debug: - for component in self.tracked: - if component not in self.topology: - log.info('Uninvited component %r' % component) + for component in self.tracked: + if component not in self.topology: + log.info('Uninvited component %r' % component) # -------------- # Init Handlers diff --git a/zipline/gens/utils.py b/zipline/gens/utils.py index d7968ed3..bacd07bd 100644 --- a/zipline/gens/utils.py +++ b/zipline/gens/utils.py @@ -43,7 +43,7 @@ def roundrobin(sources, namestrings): """ assert len(sources) == len(namestrings) mapping = OrderedDict(zip(namestrings, sources)) - + # While our generators have not been exhausted, pull elements while mapping.keys() != []: for namestring, source in mapping.iteritems(): diff --git a/zipline/gens/zmqgen.py b/zipline/gens/zmqgen.py index f9d5f919..66dbdca3 100644 --- a/zipline/gens/zmqgen.py +++ b/zipline/gens/zmqgen.py @@ -17,16 +17,3 @@ def gen_from_pull_socket(socket_uri, context, unframe): # this generator needs to know about the source_ids coming in via # the poller, and need to yield DONE messages for each # source_id. - -def gen_from_poller(poller, in_socket, unframe): - - while True: - socks = dict(poller.poll(1000)) - - if socks.get(in_socket) == zmq.POLLIN: - message = in_socket.recv() - if message == str(zp.CONTROL_PROTOCOL.DONE): - break - else: - event = unframe(message) - yield event