diff --git a/tests/test_exception_handling.py b/tests/test_exception_handling.py index f278ef34..53d24732 100644 --- a/tests/test_exception_handling.py +++ b/tests/test_exception_handling.py @@ -52,18 +52,14 @@ class ExceptionTestCase(TestCase): **self.zipline_test_config ) output, _ = drain_zipline(self, zipline) - self.assertEqual(len(output), 1) + self.assertEqual(len(output), 2) self.assertEqual(output[-1]['prefix'], 'EXCEPTION') payload = output[-1]['payload'] self.assertTrue(payload['date']) del payload['date'] check(self, payload, INITIALIZE_TB) - self.assertTrue(zipline.sim.ready()) - self.assertFalse(zipline.sim.exception) - def test_exception_in_handle_data(self): - # Simulation # ---------- self.zipline_test_config['algorithm'] = \ @@ -77,15 +73,12 @@ class ExceptionTestCase(TestCase): ) output, _ = drain_zipline(self, zipline) - - self.assertEqual(len(output), 1) + self.assertEqual(len(output), 3) self.assertEqual(output[-1]['prefix'], 'EXCEPTION') payload = output[-1]['payload'] self.assertTrue(payload['date']) del payload['date'] check(self, payload, HANDLE_DATA_TB) - self.assertTrue(zipline.sim.ready()) - self.assertFalse(zipline.sim.exception) def test_zerodivision_exception_in_handle_data(self): @@ -101,14 +94,12 @@ class ExceptionTestCase(TestCase): ) output, _ = drain_zipline(self, zipline) - self.assertEqual(len(output), 5) + self.assertEqual(len(output), 6) self.assertEqual(output[-1]['prefix'], 'EXCEPTION') payload = output[-1]['payload'] self.assertTrue(payload['date']) del payload['date'] check(self, payload, ZERO_DIV_TB) - self.assertTrue(zipline.sim.ready()) - self.assertFalse(zipline.sim.exception) # TODO: # - define more zipline failure modes: exception in other @@ -120,21 +111,12 @@ class ExceptionTestCase(TestCase): INITIALIZE_TB =\ {'message': 'Algo exception in initialize', 'name': 'Exception', - 'stack': [{'filename': '/zipline/core/component.py', 'line': 'self._run()', 'lineno': 210, 'method': 'run'}, - {'filename': '/zipline/core/component.py', 'line': 'self.loop()', 'lineno': 201, 'method': '_run'}, - {'filename': '/zipline/core/component.py', 'line': 'self.do_work()', 'lineno': 241, 'method': 'loop'}, - {'filename': '/zipline/components/tradesimulation.py', - 'line': 'self.initialize_algo()', - 'lineno': 91, - 'method': 'do_work'}, - {'filename': '/zipline/components/tradesimulation.py', - 'line': 'self.do_op(self.algorithm.initialize)', - 'lineno': 74, - 'method': 'initialize_algo'}, - {'filename': '/zipline/components/tradesimulation.py', - 'line': 'callable_op(*args, **kwargs)', - 'lineno': 194, - 'method': 'do_op'}, + 'stack': [{'filename': '/zipline/lines.py', 'line': 'for event in self.gen:', 'lineno': 152, 'method': 'stream_results'}, + {'filename': '/zipline/gens/tradesimulation.py', 'line': 'self.algo,', 'lineno': 93, 'method': 'simulate'}, + {'filename': '/zipline/gens/tradesimulation.py', + 'line': 'self.algo.initialize()', + 'lineno': 123, + 'method': '__init__'}, {'filename': '/zipline/test_algorithms.py', 'line': 'raise Exception("Algo exception in initialize")', 'lineno': 166, @@ -143,25 +125,27 @@ INITIALIZE_TB =\ HANDLE_DATA_TB =\ {'message': 'Algo exception in handle_data', 'name': 'Exception', - 'stack': [{'filename': '/zipline/core/component.py', 'line': 'self._run()', 'lineno': 210, 'method': 'run'}, - {'filename': '/zipline/core/component.py', 'line': 'self.loop()', 'lineno': 201, 'method': '_run'}, - {'filename': '/zipline/core/component.py', 'line': 'self.do_work()', 'lineno': 241, 'method': 'loop'}, - {'filename': '/zipline/components/tradesimulation.py', - 'line': 'self.process_event(event)', - 'lineno': 110, - 'method': 'do_work'}, - {'filename': '/zipline/components/tradesimulation.py', - 'line': 'self.run_algorithm()', - 'lineno': 158, - 'method': 'process_event'}, - {'filename': '/zipline/components/tradesimulation.py', - 'line': 'self.do_op(self.algorithm.handle_data, data)', - 'lineno': 180, - 'method': 'run_algorithm'}, - {'filename': '/zipline/components/tradesimulation.py', - 'line': 'callable_op(*args, **kwargs)', - 'lineno': 194, - 'method': 'do_op'}, + 'stack': [{'filename': '/zipline/lines.py', 'line': 'for event in self.gen:', 'lineno': 152, 'method': 'stream_results'}, + {'filename': '/zipline/gens/tradesimulation.py', + 'line': 'for message in algo_results:', + 'lineno': 100, + 'method': 'simulate'}, + {'filename': '/zipline/gens/tradesimulation.py', + 'line': 'return self.__generator.next()', + 'lineno': 144, + 'method': 'next'}, + {'filename': '/zipline/gens/tradesimulation.py', + 'line': 'self.update_current_snapshot(event)', + 'lineno': 199, + 'method': '_gen'}, + {'filename': '/zipline/gens/tradesimulation.py', + 'line': 'self.simulate_current_snapshot()', + 'lineno': 221, + 'method': 'update_current_snapshot'}, + {'filename': '/zipline/gens/tradesimulation.py', + 'line': 'self.algo.handle_data(self.universe)', + 'lineno': 246, + 'method': 'simulate_current_snapshot'}, {'filename': '/zipline/test_algorithms.py', 'line': 'raise Exception("Algo exception in handle_data")', 'lineno': 187, @@ -170,23 +154,25 @@ HANDLE_DATA_TB =\ ZERO_DIV_TB= \ {'message': 'integer division or modulo by zero', 'name': 'ZeroDivisionError', - 'stack': [{'filename': '/zipline/core/component.py', 'line': 'self._run()', 'lineno': 210, 'method': 'run'}, - {'filename': '/zipline/core/component.py', 'line': 'self.loop()', 'lineno': 201, 'method': '_run'}, - {'filename': '/zipline/core/component.py', 'line': 'self.do_work()', 'lineno': 241, 'method': 'loop'}, - {'filename': '/zipline/components/tradesimulation.py', - 'line': 'self.process_event(event)', - 'lineno': 110, - 'method': 'do_work'}, - {'filename': '/zipline/components/tradesimulation.py', - 'line': 'self.run_algorithm()', - 'lineno': 158, - 'method': 'process_event'}, - {'filename': '/zipline/components/tradesimulation.py', - 'line': 'self.do_op(self.algorithm.handle_data, data)', - 'lineno': 180, - 'method': 'run_algorithm'}, - {'filename': '/zipline/components/tradesimulation.py', - 'line': 'callable_op(*args, **kwargs)', - 'lineno': 194, - 'method': 'do_op'}, + 'stack': [{'filename': '/zipline/lines.py', 'line': 'for event in self.gen:', 'lineno': 152, 'method': 'stream_results'}, + {'filename': '/zipline/gens/tradesimulation.py', + 'line': 'for message in algo_results:', + 'lineno': 100, + 'method': 'simulate'}, + {'filename': '/zipline/gens/tradesimulation.py', + 'line': 'return self.__generator.next()', + 'lineno': 144, + 'method': 'next'}, + {'filename': '/zipline/gens/tradesimulation.py', + 'line': 'self.update_current_snapshot(event)', + 'lineno': 199, + 'method': '_gen'}, + {'filename': '/zipline/gens/tradesimulation.py', + 'line': 'self.simulate_current_snapshot()', + 'lineno': 221, + 'method': 'update_current_snapshot'}, + {'filename': '/zipline/gens/tradesimulation.py', + 'line': 'self.algo.handle_data(self.universe)', + 'lineno': 246, + 'method': 'simulate_current_snapshot'}, {'filename': '/zipline/test_algorithms.py', 'line': '5/0', 'lineno': 218, 'method': 'handle_data'}]} diff --git a/zipline/lines.py b/zipline/lines.py index 6b96d1ec..b4db5de6 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -99,10 +99,10 @@ class SimulatedTrading(object): self.date_sorted = date_sorted_sources(*sources) self.transforms = transforms - self.transforms.extend(StatefulTransform(Passthrough)) + self.transforms.append(StatefulTransform(Passthrough)) self.merged = merged_transforms(self.date_sorted, *self.transforms) self.trading_client = tsc(algorithm, environment, style) - self.gen = self.trading_client.simluate(self.merged) + self.gen = self.trading_client.simulate(self.merged) self.results_uri = results_socket_uri self.results_socket = None self.context = context @@ -111,6 +111,7 @@ class SimulatedTrading(object): # optional process if we fork simulate into an # independent process. self.proc = None + self.logger = Logger(sim_id) def simulate(self, blocking=True): @@ -122,7 +123,7 @@ class SimulatedTrading(object): return self.fork_and_sim() def fork_and_sim(self): - self.proc = multiprocessing.Process(self.run_gen) + self.proc = multiprocessing.Process(target=self.run_gen) self.proc.start() return self.proc @@ -133,15 +134,16 @@ class SimulatedTrading(object): def inject_event_data(record): # Record the simulation time. - record.extra['algo_dt'] = self.current_dt + #record.extra['algo_dt'] = self.current_dt + pass data_injector = Processor(inject_event_data) log_pipeline = NestedSetup([self.zmq_out,data_injector]) with log_pipeline.threadbound(), self.stdout_capture(self.logger, ''): - self.drain_gen() + self.stream_results() # if no log socket, just run the algo normally else: - self.drain_gen() + self.stream_results() def stream_results(self): assert self.results_socket, \ @@ -153,7 +155,8 @@ class SimulatedTrading(object): else: msg = zp.RISK_FRAME(event) self.results_socket.send(msg) - self.signal_done() + + self.signal_done() except Exception as exc: self.handle_exception(exc) finally: @@ -186,10 +189,8 @@ class SimulatedTrading(object): """ exc_type, exc_value, exc_traceback = sys.exc_info() - log.exception("Unexpected error in run for {id}.".format(id=self.sim_id)) - try: - log.info('{id} sending exception to monitor'\ + log.exception('{id} sending exception to result stream.'\ .format(id=self.sim_id)) msg = zp.EXCEPTION_FRAME( exc_traceback, @@ -197,11 +198,7 @@ class SimulatedTrading(object): exc_value.message ) - exception_frame = zp.CONTROL_FRAME( - zp.CONTROL_PROTOCOL.EXCEPTION, - msg - ) - self.results_socket.send(exception_frame) + self.results_socket.send(msg) except: log.exception("Exception while reporting simulation exception.") @@ -214,9 +211,6 @@ class SimulatedTrading(object): sock = self.context.socket(zmq.PUSH) sock.connect(self.results_uri) self.results_socket = sock - self.sockets.append(sock) - self.results_socket = sock - self.setup_logging() def setup_logging(self, socket = None): @@ -231,6 +225,10 @@ class SimulatedTrading(object): # in run_algorithm. The class provides a generator. self.stdout_capture = stdout_only_pipe + def join(self): + if self.proc: + self.proc.join() + @staticmethod def create_test_zipline(**config): """ @@ -333,8 +331,8 @@ class SimulatedTrading(object): test_algo, trading_environment, simulation_style, - zmq_context, results_socket_uri, + zmq_context, simulation_id) #------------------- diff --git a/zipline/utils/test_utils.py b/zipline/utils/test_utils.py index eda5a133..03442002 100644 --- a/zipline/utils/test_utils.py +++ b/zipline/utils/test_utils.py @@ -65,10 +65,10 @@ def drain_zipline(test, zipline): assert test.ctx, "method expects a valid zmq context" assert test.zipline_test_config, "method expects a valid test config" assert isinstance(test.zipline_test_config, dict) - assert test.zipline_test_config['results_socket'], \ + assert test.zipline_test_config['results_socket_uri'], \ "need to specify a socket address for logs/perf/risk" test.receiver = create_receiver( - test.zipline_test_config['results_socket'], + test.zipline_test_config['results_socket_uri'], test.ctx ) # Bind and connect are asynch, so allow time for bind before @@ -76,13 +76,12 @@ def drain_zipline(test, zipline): time.sleep(1) # start the simulation - zipline.simulate(blocking=False) + zipline.simulate(blocking=True) output, transaction_count = drain_receiver(test.receiver) # some processes will exit after the message stream is # finished. We block here to avoid collisions with subsequent # ziplines. - for process in zipline.sim.subprocesses: - process.join() + zipline.join() return output, transaction_count