From 03b17bbfb51047901e64d18ac3726b45e1037af9 Mon Sep 17 00:00:00 2001 From: fawce Date: Thu, 19 Jul 2012 20:17:44 -0400 Subject: [PATCH 1/5] added a condition to break out of the heartbeat on the initial loop, once all components report themselves once. --- zipline/core/monitor.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/zipline/core/monitor.py b/zipline/core/monitor.py index 11b91fa8..1767958d 100644 --- a/zipline/core/monitor.py +++ b/zipline/core/monitor.py @@ -289,6 +289,8 @@ class Controller(object): # Hearbeat Cycle # ============== + initializing = len(self.tracked) == 0 and len(self.finished) == 0 + # Wait the responses while self.alive: @@ -318,6 +320,13 @@ class Controller(object): log.info(repr(self.responses)) break + # if this is the first time heartbeating, break + # out early if we get everything tracked no need + # to hold out for the full heartbeat. + if initializing and len(self.responses) == len(self.topology): + log.info("breaking out of initial heartbeat") + break + # ================ # Heartbeat Stats # ================ From 5dd35a47098d9b311e761cdbc190335a715461b4 Mon Sep 17 00:00:00 2001 From: fawce Date: Thu, 19 Jul 2012 23:31:14 -0400 Subject: [PATCH 2/5] added default positions for portfolio object. --- zipline/components/tradesimulation.py | 10 ++++++++-- zipline/finance/performance.py | 4 ++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/zipline/components/tradesimulation.py b/zipline/components/tradesimulation.py index 6c761114..a110087c 100644 --- a/zipline/components/tradesimulation.py +++ b/zipline/components/tradesimulation.py @@ -64,6 +64,11 @@ class TradeSimulationClient(Component): # self.algorithm.initialize() self.algorithm.initialize() + # we need to provide the performance tracker with the + # sids referenced in the algorithm, so portfolio can + # initialize with all possible sids. + + self.perf.set_sids(self.algorithm.get_sid_filter()) def open(self): self.result_feed = self.connect_result() @@ -175,11 +180,12 @@ class TradeSimulationClient(Component): - Set the current portfolio for the algorithm as per protocol. - Construct data based on backlog of events, send to algorithm. """ - current_portfolio = self.perf.get_portfolio() - self.algorithm.set_portfolio(current_portfolio) + # current_portfolio = self.perf.get_portfolio() + # self.algorithm.set_portfolio(current_portfolio) data = self.get_data() if len(data) > 0: + data.portfolio = self.perf.get_portfolio() # data injection pipeline for log rerouting # any fields injected here should be added to diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index 7844b706..fdca878d 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -198,6 +198,10 @@ class PerformanceTracker(object): keep_transactions = True ) + def set_sids(self, sid_list): + for sid in sid_list: + self.cumulative_performance.positions[sid] = Position(sid) + def get_portfolio(self): return self.cumulative_performance.as_portfolio() From f364cde3d21d7d213eaa3482df0330aecf38de91 Mon Sep 17 00:00:00 2001 From: fawce Date: Sat, 21 Jul 2012 09:52:06 -0400 Subject: [PATCH 3/5] added a class to throw exceptions from the algorithm --- tests/test_monitor.py | 1 - zipline/components/tradesimulation.py | 2 -- zipline/test_algorithms.py | 42 +++++++++++++++++++++++++-- 3 files changed, 40 insertions(+), 5 deletions(-) diff --git a/tests/test_monitor.py b/tests/test_monitor.py index 5da84b2a..8b670356 100644 --- a/tests/test_monitor.py +++ b/tests/test_monitor.py @@ -4,7 +4,6 @@ from unittest2 import TestCase, skip from zipline.core.monitor import Controller - class TestMonitor(TestCase): def setUp(self): self.log_handler = LoggingHandler() diff --git a/zipline/components/tradesimulation.py b/zipline/components/tradesimulation.py index a110087c..b1a4ff53 100644 --- a/zipline/components/tradesimulation.py +++ b/zipline/components/tradesimulation.py @@ -180,8 +180,6 @@ class TradeSimulationClient(Component): - Set the current portfolio for the algorithm as per protocol. - Construct data based on backlog of events, send to algorithm. """ - # current_portfolio = self.perf.get_portfolio() - # self.algorithm.set_portfolio(current_portfolio) data = self.get_data() if len(data) > 0: diff --git a/zipline/test_algorithms.py b/zipline/test_algorithms.py index 7d33bf13..56788c49 100644 --- a/zipline/test_algorithms.py +++ b/zipline/test_algorithms.py @@ -138,17 +138,55 @@ class NoopAlgorithm(object): def get_sid_filter(self): return None +class ExceptionAlgorithm(object): + """ + Dolce fa niente. + """ + + def __init__(self, throw_from): + self.throw_from == throw_from + + def initialize(self): + if self.throw_from == "initialize": + raise Exception("Algo exception in initialize") + else: + pass + + def set_order(self, order_callable): + if self.throw_from == "set_order": + raise Exception("Algo exception in set_order") + else: + pass + + def set_portfolio(self, portfolio): + if self.throw_from == "set_portfolio": + raise Exception("Algo exception in set_portfolio") + else: + pass + + def handle_data(self, data): + if self.throw_from == "handle_data": + raise Exception("Algo exception in handle_data") + else: + pass + + def get_sid_filter(self): + if self.throw_from == "get_sid_filter": + raise Exception("Algo exception in get_sid_filter") + else: + return None + class TestPrintAlgorithm(): def __init__(self): pass - + def initialize(self): print "Initializing..." def set_order(self, order_callable): pass - + def set_portfolio(self, portfolio): pass From fc1882daaaeebeb966ac7edc98a21c2121fd4795 Mon Sep 17 00:00:00 2001 From: fawce Date: Sun, 22 Jul 2012 07:33:42 -0400 Subject: [PATCH 4/5] enabled logging with a logbook zmq logger from within the algorithm. --- tests/test_finance.py | 2 +- zipline/components/tradesimulation.py | 54 +++++++++++++-------------- zipline/core/monitor.py | 9 ++--- zipline/test_algorithms.py | 45 ++++++++++++++++++++-- 4 files changed, 73 insertions(+), 37 deletions(-) diff --git a/tests/test_finance.py b/tests/test_finance.py index b9783471..f841e37f 100644 --- a/tests/test_finance.py +++ b/tests/test_finance.py @@ -150,7 +150,7 @@ class FinanceTestCase(TestCase): # TODO: for some reason the orders aren't filled without an extra # trade. - trade_count = 5 + trade_count = 5000 self.zipline_test_config['order_count'] = trade_count - 1 self.zipline_test_config['trade_count'] = trade_count self.zipline_test_config['order_amount'] = 1 diff --git a/zipline/components/tradesimulation.py b/zipline/components/tradesimulation.py index b1a4ff53..0a711c4b 100644 --- a/zipline/components/tradesimulation.py +++ b/zipline/components/tradesimulation.py @@ -44,36 +44,26 @@ class TradeSimulationClient(Component): return str(zp.FINANCE_COMPONENT.TRADING_CLIENT) def set_algorithm(self, algorithm): - """ :param algorithm: must implement the algorithm protocol. See :py:mod:`zipline.test.algorithm` """ self.algorithm = algorithm - # register the trading_client's order method with the algorithm self.algorithm.set_order(self.order) - - #TODO: re-enable initialization logging. This means we can't call set_algorithm - #until we have a context for this component. Possibly this could happen - # ask the algorithm to initialize, routing stdout to a zmq PUSH socket. - - #with self.zmq_out.threadbound(), self.stdout_capture(self.logger, 'Algo print capture'): - # self.algorithm.initialize() - #if we don't have a log socket, initialize anyway. - #else: - # self.algorithm.initialize() - - self.algorithm.initialize() # we need to provide the performance tracker with the # sids referenced in the algorithm, so portfolio can # initialize with all possible sids. - self.perf.set_sids(self.algorithm.get_sid_filter()) + + # self.algorithm.initialize() + + def open(self): self.result_feed = self.connect_result() if not self.results_socket: log.warn(" No results socket, will not broadcast sim data.") + self.algorithm.set_logger(log) else: sock = self.context.socket(zmq.PUSH) sock.connect(self.results_socket) @@ -81,12 +71,14 @@ class TradeSimulationClient(Component): self.sockets.append(sock) self.out_socket = sock - self.setup_logging(sock) self.perf.publish_to(sock) + # register the trading_client's order method with the algorithm + self.algorithm.set_logger(self.algo_log) + + self.run_logged_op(self.algorithm.initialize) - #Initialize log capture for testing purposes. def setup_logging(self, socket = None): sock = socket or self.results_socket @@ -95,6 +87,8 @@ class TradeSimulationClient(Component): ) self.logger = Logger("Print") + self.algo_log = Logger("AlgoLog") + # N.B. that this is a class, which is instantiated later # in run_algorithm. The class provides a generator. self.stdout_capture = stdout_only_pipe @@ -188,21 +182,25 @@ class TradeSimulationClient(Component): # data injection pipeline for log rerouting # any fields injected here should be added to # LOG_EXTRA_FIELDS in zipline/protocol.py - if self.zmq_out: + self.run_logged_op(self.algorithm.handle_data, data) - def inject_event_data(record): + def run_logged_op(self, callable_op, *args, **kwargs): + """ Wrap a callable operation with the zmq logbook + handler if it exits.""" + if self.zmq_out: - #Record the simulation time. + def inject_event_data(record): + # Record the simulation time. + record.extra['algo_dt'] = self.current_dt - record.extra['algo_dt'] = self.current_dt - - data_injector = Processor(inject_event_data) - log_pipeline = NestedSetup([self.zmq_out,data_injector]) - with log_pipeline.threadbound(), self.stdout_capture(self.logger, ''): - self.algorithm.handle_data(data) + data_injector = Processor(inject_event_data) + log_pipeline = NestedSetup([self.zmq_out,data_injector]) + with log_pipeline.threadbound(), self.stdout_capture(self.logger, ''): + callable_op(*args, **kwargs) # if no log socket, just run the algo normally - else: - self.algorithm.handle_data(data) + else: + callable_op(*args, **kwargs) + #Testing utility for log capture. # TODO: remove test code from here. diff --git a/zipline/core/monitor.py b/zipline/core/monitor.py index 1767958d..efcc6314 100644 --- a/zipline/core/monitor.py +++ b/zipline/core/monitor.py @@ -99,7 +99,6 @@ class Controller(object): log.warn("Running Controller in development mode, will ONLY synchronize start.") def init_zmq(self, flavor): - assert self.zmq_flavor in ['thread', 'mp', 'green'] if flavor == 'mp': @@ -131,7 +130,6 @@ class Controller(object): Give the controller a set set of components to manage and a set of state transitions for the entire system. """ - # A freeform topology is where we heartbeat with anything # that shows up. if topology == 'freeform': @@ -323,9 +321,10 @@ class Controller(object): # if this is the first time heartbeating, break # out early if we get everything tracked no need # to hold out for the full heartbeat. - if initializing and len(self.responses) == len(self.topology): - log.info("breaking out of initial heartbeat") - break + if initializing and not self.freeform: + if len(self.responses) == len(self.topology): + log.info("breaking out of initial heartbeat") + break # ================ # Heartbeat Stats diff --git a/zipline/test_algorithms.py b/zipline/test_algorithms.py index 56788c49..aa07fa54 100644 --- a/zipline/test_algorithms.py +++ b/zipline/test_algorithms.py @@ -70,6 +70,9 @@ class TestAlgorithm(): def set_order(self, order_callable): self.order = order_callable + def set_logger(self, logger): + pass + def set_portfolio(self, portfolio): self.portfolio = portfolio @@ -106,6 +109,9 @@ class HeavyBuyAlgorithm(): def set_order(self, order_callable): self.order = order_callable + def set_logger(self, logger): + pass + def set_portfolio(self, portfolio): self.portfolio = portfolio @@ -129,6 +135,9 @@ class NoopAlgorithm(object): def set_order(self, order_callable): pass + def set_logger(self, logger): + pass + def set_portfolio(self, portfolio): pass @@ -140,7 +149,8 @@ class NoopAlgorithm(object): class ExceptionAlgorithm(object): """ - Dolce fa niente. + Throw an exception from the method name specified in the + constructor. """ def __init__(self, throw_from): @@ -158,6 +168,9 @@ class ExceptionAlgorithm(object): else: pass + def set_logger(self, logger): + pass + def set_portfolio(self, portfolio): if self.throw_from == "set_portfolio": raise Exception("Algo exception in set_portfolio") @@ -174,7 +187,7 @@ class ExceptionAlgorithm(object): if self.throw_from == "get_sid_filter": raise Exception("Algo exception in get_sid_filter") else: - return None + return [1] class TestPrintAlgorithm(): @@ -187,6 +200,9 @@ class TestPrintAlgorithm(): def set_order(self, order_callable): pass + def set_logger(self, logger): + pass + def set_portfolio(self, portfolio): pass @@ -195,4 +211,27 @@ class TestPrintAlgorithm(): pass def get_sid_filter(self): - return None + return [1] + +class TestLoggingAlgorithm(): + + def __init__(self): + self.log = None + + def initialize(self): + self.log.info("Initializing...") + + def set_order(self, order_callable): + pass + + def set_logger(self, logger): + self.log = logger + + def set_portfolio(self, portfolio): + pass + + def handle_data(self, data): + self.log.info("Handling Data...") + + def get_sid_filter(self): + return [1] From dfce16278b6bce1886d4a37dbcff5584081a3ccb Mon Sep 17 00:00:00 2001 From: fawce Date: Mon, 23 Jul 2012 16:43:08 -0400 Subject: [PATCH 5/5] handoff to @ssanderson --- zipline/components/tradesimulation.py | 32 +++++++++++++++------------ 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/zipline/components/tradesimulation.py b/zipline/components/tradesimulation.py index 0a711c4b..12b4c8d7 100644 --- a/zipline/components/tradesimulation.py +++ b/zipline/components/tradesimulation.py @@ -49,22 +49,19 @@ class TradeSimulationClient(Component): :py:mod:`zipline.test.algorithm` """ self.algorithm = algorithm + # register the client's order method with the algorithm self.algorithm.set_order(self.order) # we need to provide the performance tracker with the # sids referenced in the algorithm, so portfolio can # initialize with all possible sids. self.perf.set_sids(self.algorithm.get_sid_filter()) - - # self.algorithm.initialize() - + # N.B. Initialize is now called from open, because we + # need to have a socket open for logging. def open(self): self.result_feed = self.connect_result() - if not self.results_socket: - log.warn(" No results socket, will not broadcast sim data.") - self.algorithm.set_logger(log) - else: + if self.results_socket: sock = self.context.socket(zmq.PUSH) sock.connect(self.results_socket) self.results_socket = sock @@ -73,9 +70,15 @@ class TradeSimulationClient(Component): self.setup_logging(sock) self.perf.publish_to(sock) + self.initialize_algo() - # register the trading_client's order method with the algorithm - self.algorithm.set_logger(self.algo_log) + def initialize_algo(self): + """ Setup loggers for algorithm and run algorithm's own + initialize method. + """ + self.logger = Logger("Print") + self.algo_log = Logger("AlgoLog") + self.algorithm.set_logger(self.algo_log) self.run_logged_op(self.algorithm.initialize) @@ -86,10 +89,8 @@ class TradeSimulationClient(Component): socket = sock, ) - self.logger = Logger("Print") - self.algo_log = Logger("AlgoLog") - # N.B. that this is a class, which is instantiated later + # This is a class, which is instantiated later # in run_algorithm. The class provides a generator. self.stdout_capture = stdout_only_pipe @@ -205,9 +206,13 @@ class TradeSimulationClient(Component): #Testing utility for log capture. # TODO: remove test code from here. def test_run_algorithm(self): + # since open is never called from some tests we need to + # set the logger explicitly + self.algorithm.set_logger(self.algo_log) def inject_event_data(record): - record.extra['algo_dt'] = datetime.datetime.utcnow() #Mock an event.dt + # Mock an event.dt + record.extra['algo_dt'] = datetime.datetime.utcnow() data_injector = Processor(inject_event_data) log_pipeline = NestedSetup([self.zmq_out, @@ -215,7 +220,6 @@ class TradeSimulationClient(Component): data_injector]) with log_pipeline.threadbound(), self.stdout_capture(self.logger, ''): self.algorithm.handle_data('data') - # if no log socket, just run the algo normally def connect_order(self): return self.connect_push_socket(self.addresses['order_address'])