diff --git a/tests/test_components.py b/tests/test_components.py index ff353186..bbd881e7 100644 --- a/tests/test_components.py +++ b/tests/test_components.py @@ -64,18 +64,19 @@ class ComponentTestCase(TestCase): 'count' : count } + trade_gen = SpecificEquityTrades(*args_a, **kwargs_a) + monitor.add_to_topology(trade_gen.get_hash()) + + launch_monitor(monitor) + comp_a = Component( - SpecificEquityTrades(*args_a, **kwargs_a), + trade_gen, monitor, socket_uri, DATASOURCE_FRAME, DATASOURCE_UNFRAME ) - launch_monitor(monitor) - iter_a = iter(comp_a) - ev = iter_a.next() - return for event in comp_a: log.info(event) @@ -96,16 +97,8 @@ class ComponentTestCase(TestCase): 'filter' : filter, 'count' : count } - - - comp_a = Component( - SpecificEquityTrades(*args_a, **kwargs_a), - monitor, - socket_uris[0], - DATASOURCE_FRAME, - DATASOURCE_UNFRAME - ) - + trade_gen_a = SpecificEquityTrades(*args_a, **kwargs_a) + monitor.add_to_topology(trade_gen_a.get_hash()) #Set up source b. Two minutes between events. args_b = tuple() @@ -116,15 +109,8 @@ class ComponentTestCase(TestCase): 'filter' : filter, 'count' : count } - - - comp_b = Component( - SpecificEquityTrades(*args_b, **kwargs_b), - monitor, - socket_uris[1], - DATASOURCE_FRAME, - DATASOURCE_UNFRAME - ) + trade_gen_b = SpecificEquityTrades(*args_b, **kwargs_b) + monitor.add_to_topology(trade_gen_b.get_hash()) #Set up source c. Three minutes between events. args_c = tuple() @@ -136,18 +122,38 @@ class ComponentTestCase(TestCase): 'count' : count } + trade_gen_c = SpecificEquityTrades(*args_c, **kwargs_c) + monitor.add_to_topology(trade_gen_c.get_hash()) + + launch_monitor(monitor) + + comp_a = Component( + trade_gen_a, + monitor, + socket_uris[0], + DATASOURCE_FRAME, + DATASOURCE_UNFRAME + ) + + comp_b = Component( + trade_gen_b, + monitor, + socket_uris[1], + DATASOURCE_FRAME, + DATASOURCE_UNFRAME + ) + comp_c = Component( - SpecificEquityTrades(*args_c, **kwargs_c), + trade_gen_c, monitor, socket_uris[2], DATASOURCE_FRAME, DATASOURCE_UNFRAME ) - launch_monitor(monitor) sources = [comp_a, comp_b, comp_c] - gens = [iter(source) for source in sources] - sorted_out = date_sorted_sources(gens) + + sorted_out = date_sorted_sources(*sources) prev = None sort_count = 0 diff --git a/zipline/core/component.py b/zipline/core/component.py index 4cb4c0d0..82aebc88 100644 --- a/zipline/core/component.py +++ b/zipline/core/component.py @@ -83,10 +83,6 @@ class Component(object): self.unframe = unframe self.prefix = "" - # register two components with the monitor - monitor.add_to_topology(self.component_id) - monitor.add_to_topology("FORK-"+self.component_id) - # TODO: state_flag is deprecated, remove self.state_flag = COMPONENT_STATE.OK @@ -98,10 +94,19 @@ class Component(object): self.guid = uuid.uuid4() self.huid = humanhash.humanize(self.guid.hex) + # first, start the generator in its own process. Once + # Monitor says "go", Events from the generator will be + # FRAME'd and PUSH'd to self.socket_uri. + proc = multiprocessing.Process( + target=self.loop_send + ) + proc.start() + # ------------ - # Generator + # Message Receiver/Generator # ------------ - self.gen = None + self.recv_gen = self.create_recv_gen() + # ------------ @@ -109,94 +114,87 @@ class Component(object): # ------------ - def _run_out(self): + def loop_send(self): """ The main component loop. This is wrapped inside a exception reporting context inside of run. The core logic of the all components is run here. """ - # The process title so you can watch it in top, ps. - setproctitle(self.generator.__class__.__name__) - self.prefix = "FORK-" - - log.info("Start %r" % self) - log.info("Pid %s" % os.getpid()) - log.info("Group %s" % os.getpgrp()) - - self.open() - - self.signal_ready() - self.lock_ready() - self.wait_ready() - - # ----------------------- - # YOU SHALL NOT PASS!!!!! - # ----------------------- - # ... until the monitor signals GO - - for event in self.generator: - self.heartbeat() - event.source_id = self.get_id - msg = self.frame(event) - self.out_socket.send(msg) - - self.signal_done() - - def _run_in(self): - self.open(send=False) - self.signal_ready() - self.lock_ready() - self.wait_ready() - # ----------------------- - # YOU SHALL NOT PASS!!!!! - # ----------------------- - # ... until the monitor signals GO - - # return the generator - for event in gen_from_poller(self.poll, self.in_socket, self.unframe): - event.source_id = self.get_id - yield event - - self.signal_done() - - def run_safe(self, func): - """ - Run a function that is assumed to include wait_ready and - heartbeat. Used to wrap fork_generator and consume_gen. - """ try: - return func() + # The process title so you can watch it in top, ps. + setproctitle(self.generator.__class__.__name__) + self.prefix = "FORK-" + + log.info("Start %r" % self) + log.info("Pid %s" % os.getpid()) + log.info("Group %s" % os.getpgrp()) + + self.open() + + self.signal_ready() + self.lock_ready() + self.wait_ready() + + # ----------------------- + # YOU SHALL NOT PASS!!!!! + # ----------------------- + # ... until the monitor signals GO + + for event in self.generator: + self.heartbeat() + msg = self.frame(event) + self.out_socket.send(msg) + + self.signal_done() + except Exception as exc: - if not isinstance(exc, KillSignal): - self.signal_exception(exc) - else: - # if we get a kill signal, forcibly close all the - # sockets. - self.teardown_sockets() + self.handle_exception(exc) finally: log.info("Exiting %r" % self) - def _launch(self): - # first, start the generator in its own process. Once - # Monitor says "go", Events from the generator will be - # FRAME'd and PUSH'd to self.socket_uri. - proc = multiprocessing.Process( - target=self.run_safe, - args=(self._run_out,) - ) - proc.start() + def create_recv_gen(self): + try: + self.open(send=False) + self.signal_ready() + self.lock_ready() + # return the generator + return self.loop_recv() + except Exception as exc: + self.handle_exception(exc) + finally: + log.info("Created Recv Gen for %r" % self) - # Start the poller-generator, which will PULL messages - # from self.sockiet_uri, UNFRAME'd them, and yield them. - return self.run_safe(self._run_in) + def loop_recv(self): + try: + # we block on ready here until monitor sends the GO + self.wait_ready() + log.info("Starting to drain {id}".format(id=self.get_id)) + for event in gen_from_poller(self.poll, self.in_socket, self.unframe): + self.heartbeat() + # event.source_id = self.get_id + yield event + + self.signal_done() + except Exception as exc: + self.handle_exception(exc) + finally: + log.info("Exiting %r" % self) + + def handle_exception(self, exc, re_raise=False): + if not isinstance(exc, KillSignal): + self.signal_exception(exc) + else: + # if we get a kill signal, forcibly close all the + # sockets. + self.teardown_sockets() def __iter__(self): - if not self.gen: - self.gen = self._launch() + return self - return self.gen + def next(self): + return self.recv_gen.next() # ---------------------------- # Cleanup & Modes of Failure diff --git a/zipline/core/monitor.py b/zipline/core/monitor.py index 83b022c5..e9999ea4 100644 --- a/zipline/core/monitor.py +++ b/zipline/core/monitor.py @@ -120,7 +120,7 @@ class Monitor(object): return def add_to_topology(self, component_id): - add = set([component_id]) + add = set([component_id, "FORK-" + component_id]) self.topology.update(add) def freeze_topology(self): diff --git a/zipline/gens/utils.py b/zipline/gens/utils.py index 4a95198f..d76966f5 100644 --- a/zipline/gens/utils.py +++ b/zipline/gens/utils.py @@ -43,8 +43,7 @@ def roundrobin(sources, namestrings): """ assert len(sources) == len(namestrings) mapping = OrderedDict(zip(namestrings, sources)) - - import nose.tools; nose.tools.set_trace() + # While our generators have not been exhausted, pull elements while mapping.keys() != []: for namestring, source in mapping.iteritems(): diff --git a/zipline/gens/zmqgen.py b/zipline/gens/zmqgen.py index f9d5f919..463f8b42 100644 --- a/zipline/gens/zmqgen.py +++ b/zipline/gens/zmqgen.py @@ -21,7 +21,7 @@ def gen_from_pull_socket(socket_uri, context, unframe): def gen_from_poller(poller, in_socket, unframe): while True: - socks = dict(poller.poll(1000)) + socks = dict(poller.poll()) if socks.get(in_socket) == zmq.POLLIN: message = in_socket.recv()