From f4f164dd9f68efc079f6960cd1ffeca9fc494833 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Wed, 20 Jun 2012 11:57:19 -0400 Subject: [PATCH 01/16] update to pass along the new log socket address --- zipline/lines.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/zipline/lines.py b/zipline/lines.py index 070931c5..8aace760 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -99,7 +99,7 @@ class SimulatedTrading(object): :param config: a dict with the following required properties:: - algorithm: a class that follows the algorithm protocol. See - :py:meth:`zipline.finance.trading.TradingSimulationClient.add_algorithm + :py:meth:`zipline.finance.trading.TradeSimulationClient.add_algorithm for details. - trading_environment: an instance of :py:class:`zipline.trading.TradingEnvironment` @@ -127,7 +127,7 @@ class SimulatedTrading(object): 'feed_address' : sockets[2], 'merge_address' : sockets[3], 'result_address' : sockets[4], - 'order_address' : sockets[5] + 'order_address' : sockets[5] } self.con = Controller( @@ -150,7 +150,8 @@ class SimulatedTrading(object): self.clients = {} self.trading_client = TradeSimulationClient( self.trading_environment, - self.sim_style + self.sim_style, + config.log_socket ) self.add_client(self.trading_client) From 9c66f3ea7c34380973f41018c4f2fc239fb43463 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Wed, 20 Jun 2012 11:58:17 -0400 Subject: [PATCH 02/16] capturing stdout from user's algo and piping it to a zmq socket --- zipline/components/tradesimulation.py | 41 +++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/zipline/components/tradesimulation.py b/zipline/components/tradesimulation.py index 79312002..c8f4eb16 100644 --- a/zipline/components/tradesimulation.py +++ b/zipline/components/tradesimulation.py @@ -1,4 +1,5 @@ import logging +import logbook import datetime import zipline.protocol as zp @@ -7,12 +8,16 @@ import zipline.finance.performance as perf from zipline.core.component import Component from zipline.finance.trading import TransactionSimulator from zipline.utils.protocol_utils import ndict +from zipline.utils.env_utils import stdout_only_pipe + +from logbook import Logger +from logbook import queues LOGGER = logging.getLogger('ZiplineLogger') class TradeSimulationClient(Component): - def init(self, trading_environment, sim_style): + def init(self, trading_environment, sim_style, log_socket): self.received_count = 0 self.prev_dt = None self.event_queue = None @@ -28,7 +33,18 @@ class TradeSimulationClient(Component): self.event_data = ndict() self.perf = perf.PerformanceTracker(self.trading_environment) + + self.log_socket = log_socket + #If we have a log socket,setup context managers for capturing user logs. + if log_socket: + log = Logger("User logs") + self.stdout_capture = stdout_only_pipe(log, 'user algo stdout') + + handler = queues.ZeroMQHandler() + handler.socket.connect(log_socket) + self.zmq_out = handler.threadbound() + @property def get_id(self): return str(zp.FINANCE_COMPONENT.TRADING_CLIENT) @@ -41,9 +57,16 @@ class TradeSimulationClient(Component): self.algorithm = algorithm # register the trading_client's order method with the algorithm self.algorithm.set_order(self.order) - # ask the algorithm to initialize - self.algorithm.initialize() + # ask the algorithm to initialize, routing stdout to a zmq PUB socket. + if self.log_socket: + with self.zmq_out, self.stdout_capture: + self.algorithm.initialize() + + # if we don't have a log socket, initialize anyway. + else: + self.algorithm.initialize() + def open(self): self.result_feed = self.connect_result() self.perf.open(self.context) @@ -67,7 +90,7 @@ class TradeSimulationClient(Component): # result_feed is a merge component, so unframe accordingly event = zp.MERGE_UNFRAME(msg) - self.received_count += 1 + self.received_count += 1 # update performance and relay the event to the algorithm self.process_event(event) if self.perf.exceeded_max_loss: @@ -136,8 +159,14 @@ class TradeSimulationClient(Component): self.algorithm.set_portfolio(current_portfolio) data = self.get_data() if len(data) > 0: - self.algorithm.handle_data(data) - + # try to run algo with log rerouting + if log_socket: + with self.zmq_out, self.stdout_capture: + self.algorithm.handle_data(data) + # if no log socket, just run the algo normally + else: + self.algorithm.handle_data(data) + def connect_order(self): return self.connect_push_socket(self.addresses['order_address']) From 8052ee6b526fb51929a30dde46fd275c27649cb8 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Thu, 21 Jun 2012 16:40:34 -0400 Subject: [PATCH 03/16] new handler class for log output --- zipline/components/tradesimulation.py | 30 +++++++++++++++++++-------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/zipline/components/tradesimulation.py b/zipline/components/tradesimulation.py index c8f4eb16..5c3bec1a 100644 --- a/zipline/components/tradesimulation.py +++ b/zipline/components/tradesimulation.py @@ -8,10 +8,11 @@ import zipline.finance.performance as perf from zipline.core.component import Component from zipline.finance.trading import TransactionSimulator from zipline.utils.protocol_utils import ndict -from zipline.utils.env_utils import stdout_only_pipe -from logbook import Logger -from logbook import queues +from qexec.executor.env_utils import stdout_only_pipe + +from logbook import Logger, NestedSetup, Processor, queues + LOGGER = logging.getLogger('ZiplineLogger') @@ -38,10 +39,10 @@ class TradeSimulationClient(Component): #If we have a log socket,setup context managers for capturing user logs. if log_socket: - log = Logger("User logs") + log = Logger("User Log") self.stdout_capture = stdout_only_pipe(log, 'user algo stdout') - - handler = queues.ZeroMQHandler() + + handler = queues.ZeroMQLogHandler() handler.socket.connect(log_socket) self.zmq_out = handler.threadbound() @@ -90,7 +91,7 @@ class TradeSimulationClient(Component): # result_feed is a merge component, so unframe accordingly event = zp.MERGE_UNFRAME(msg) - self.received_count += 1 + self.received_count += 1 # update performance and relay the event to the algorithm self.process_event(event) if self.perf.exceeded_max_loss: @@ -159,14 +160,25 @@ class TradeSimulationClient(Component): self.algorithm.set_portfolio(current_portfolio) data = self.get_data() if len(data) > 0: + + # data injection pipeline for log rerouting + def inject_event_data(record): + record.extra['algo_dt'] = event.dt + #record.extra['whatever else we want'] = event.whatever + + data_injector = Processor(inject_event_data) + log_pipeline = NestedSetup([self.zmq_out, + #e.g. FileHandler(...) + data_injector]) + # try to run algo with log rerouting if log_socket: - with self.zmq_out, self.stdout_capture: + with log_pipeline, self.stdout_capture: self.algorithm.handle_data(data) # if no log socket, just run the algo normally else: self.algorithm.handle_data(data) - + def connect_order(self): return self.connect_push_socket(self.addresses['order_address']) From a222e1f0d8ea3b542e977c01add28fe7e353cee7 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Fri, 22 Jun 2012 18:23:51 -0400 Subject: [PATCH 04/16] save --- zipline/components/tradesimulation.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/zipline/components/tradesimulation.py b/zipline/components/tradesimulation.py index 5c3bec1a..c8b585d3 100644 --- a/zipline/components/tradesimulation.py +++ b/zipline/components/tradesimulation.py @@ -9,7 +9,7 @@ from zipline.core.component import Component from zipline.finance.trading import TransactionSimulator from zipline.utils.protocol_utils import ndict -from qexec.executor.env_utils import stdout_only_pipe +from qexec.utils.log_utils import ZeroMQLogHandler, stdout_only_pipe from logbook import Logger, NestedSetup, Processor, queues @@ -39,11 +39,14 @@ class TradeSimulationClient(Component): #If we have a log socket,setup context managers for capturing user logs. if log_socket: + + #Temporarily putting this here to fix circular import bug. + + log = Logger("User Log") self.stdout_capture = stdout_only_pipe(log, 'user algo stdout') - handler = queues.ZeroMQLogHandler() - handler.socket.connect(log_socket) + handler = queues.ZeroMQLogHandler(uri = log_socket) self.zmq_out = handler.threadbound() @property @@ -162,6 +165,9 @@ class TradeSimulationClient(Component): if len(data) > 0: # data injection pipeline for log rerouting + # any fields injected here should be added to + # LOG_EXTRA_FIELDS in qexec/web/client_protocol + def inject_event_data(record): record.extra['algo_dt'] = event.dt #record.extra['whatever else we want'] = event.whatever From 4c40a3d55c054e773d2433566d895318f87628f8 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Thu, 28 Jun 2012 16:28:10 -0400 Subject: [PATCH 05/16] all manner of changes to support logging --- zipline/components/tradesimulation.py | 29 +++++++------- zipline/core/monitor.py | 24 ++++++------ zipline/lines.py | 14 ++++--- zipline/protocol.py | 55 +++++++++++++++++++++++++++ zipline/test_algorithms.py | 22 +++++++++++ 5 files changed, 111 insertions(+), 33 deletions(-) diff --git a/zipline/components/tradesimulation.py b/zipline/components/tradesimulation.py index c8b585d3..72437d2f 100644 --- a/zipline/components/tradesimulation.py +++ b/zipline/components/tradesimulation.py @@ -37,13 +37,10 @@ class TradeSimulationClient(Component): self.log_socket = log_socket - #If we have a log socket,setup context managers for capturing user logs. + #If we have a log socket,setup context managers for capturing user prints. if log_socket: - #Temporarily putting this here to fix circular import bug. - - - log = Logger("User Log") + log = Logger("Print") self.stdout_capture = stdout_only_pipe(log, 'user algo stdout') handler = queues.ZeroMQLogHandler(uri = log_socket) @@ -62,7 +59,7 @@ class TradeSimulationClient(Component): # register the trading_client's order method with the algorithm self.algorithm.set_order(self.order) - # ask the algorithm to initialize, routing stdout to a zmq PUB socket. + # ask the algorithm to initialize, routing stdout to a zmq PUSH socket. if self.log_socket: with self.zmq_out, self.stdout_capture: self.algorithm.initialize() @@ -166,19 +163,19 @@ class TradeSimulationClient(Component): # data injection pipeline for log rerouting # any fields injected here should be added to - # LOG_EXTRA_FIELDS in qexec/web/client_protocol + # LOG_EXTRA_FIELDS in zipline/protocol.py - def inject_event_data(record): - record.extra['algo_dt'] = event.dt - #record.extra['whatever else we want'] = event.whatever - - data_injector = Processor(inject_event_data) - log_pipeline = NestedSetup([self.zmq_out, - #e.g. FileHandler(...) - data_injector]) # try to run algo with log rerouting - if log_socket: + if self.log_socket: + def inject_event_data(record): + record.extra['algo_dt'] = event.dt + #record.extra['whatever else we want'] = event.whatever + + data_injector = Processor(inject_event_data) + log_pipeline = NestedSetup([self.zmq_out, + #e.g. FileHandler(...) + data_injector]) with log_pipeline, self.stdout_capture: self.algorithm.handle_data(data) # if no log socket, just run the algo normally diff --git a/zipline/core/monitor.py b/zipline/core/monitor.py index 5b2384f4..a42508c9 100644 --- a/zipline/core/monitor.py +++ b/zipline/core/monitor.py @@ -309,7 +309,7 @@ class Controller(object): assert self.route_socket assert self.pub_socket - assert self.cancel_socket + #assert self.cancel_socket # -- Publish -- # ============= @@ -318,9 +318,9 @@ class Controller(object): # -- Cancel -- # ============= - assert isinstance(self.cancel_socket,basestring), self.cancel_socket - self.cancel = self.context.socket(self.zmq.REP) - self.cancel.connect(self.cancel_socket) + #assert isinstance(self.cancel_socket,basestring), self.cancel_socket + #self.cancel = self.context.socket(self.zmq.REP) + #self.cancel.connect(self.cancel_socket) # -- Router -- # ============= @@ -330,9 +330,9 @@ class Controller(object): poller = self.zmq.Poller() poller.register(self.router, self.zmq.POLLIN) - poller.register(self.cancel, self.zmq.POLLIN) + #poller.register(self.cancel, self.zmq.POLLIN) - self.associated += [self.pub, self.router, self.cancel] + self.associated += [self.pub, self.router]# self.cancel] # TODO: actually do this self.state = CONTROL_STATES.SOURCES_READY @@ -369,12 +369,12 @@ class Controller(object): self.logging.error('Invalid frame', rawmessage) pass - if socks.get(self.cancel) == self.zmq.POLLIN: - self.logging.info('[Controller] Received Cancellation') - rawmessage = self.cancel.recv() - self.cancel.send('') - self.shutdown(soft=True) - break + #if socks.get(self.cancel) == self.zmq.POLLIN: + # self.logging.info('[Controller] Received Cancellation') + # rawmessage = self.cancel.recv() + # self.cancel.send('') + # self.shutdown(soft=True) + # break self.beat() diff --git a/zipline/lines.py b/zipline/lines.py index 8aace760..4213fb78 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -135,9 +135,7 @@ class SimulatedTrading(object): sockets[7], logger = LOGGER ) - - self.con.cancel_socket = self.allocator.lease(1)[0] - + # TODO: Not freeform self.con.manage( 'freeform' @@ -151,7 +149,7 @@ class SimulatedTrading(object): self.trading_client = TradeSimulationClient( self.trading_environment, self.sim_style, - config.log_socket + config['log_socket'] ) self.add_client(self.trading_client) @@ -258,6 +256,11 @@ class SimulatedTrading(object): order_amount, order_count ) + + if config.has_key('log_socket'): + log_socket = config['log_socket'] + else: + log_socket = None #------------------- # Simulation #------------------- @@ -266,7 +269,8 @@ class SimulatedTrading(object): 'trading_environment' : trading_environment, 'allocator' : allocator, 'simulator_class' : simulator_class, - 'simulation_style' : simulation_style + 'simulation_style' : simulation_style, + 'log_socket' : log_socket }) #------------------- diff --git a/zipline/protocol.py b/zipline/protocol.py index 7081291f..fb27b2d2 100644 --- a/zipline/protocol.py +++ b/zipline/protocol.py @@ -601,3 +601,58 @@ SIMULATION_STYLE = Enum( 'FIXED_SLIPPAGE', 'NOOP' ) + +#Global variables for the fields we extract out of a standard logbook record. +LOG_FIELDS = set(['func_name', 'lineno', 'time', 'msg',\ + 'level', 'channel', ]) +LOG_EXTRA_FIELDS = set(['algo_dt',]) + +def LOG_FRAME(payload): + """ + Expects a dictionary of the form: + { + 'algo_dt' : 1199223000, #Algo simulation date. + 'time' : 1199223001, #Realtime date of log creation. + 'func_name' : 'foo', + 'lineno' : 46, + 'message' : 'Successfully disintegrated llama #3' + 'level' : 4 #Logbook enum + 'channel' : 'MyLogger' + } + + Frame checks that we have all expected fields and exports an + event/payload dict as JSON. + """ + + assert isinstance(payload, dict), \ + "LOG_FRAME received:"+str(type(log_record)) + + assert payload.has_key('algo_dt'), \ + "LOG_FRAME with no algo_dt" + assert payload.has_key('time'), \ + "LOG_FRAME with no time" + assert payload.has_key('channel'),\ + "LOG_FRAME with no channel" + assert payload.has_key('level'),\ + "LOG_FRAME with no level" + assert payload.has_key('message'),\ + "LOG_FRAME with no message" + + data = {} + data['e'] = 'LOG' + data['p'] = payload + + return json.dumps(data) + +def LOG_UNFRAME(msg): + """ + Expects a json serialized dictionary in event/payload format. + """ + record = json.loads(data) + assert record['e'] == 'LOG' + assert record.has_key('p') + + return record['p'] + + + diff --git a/zipline/test_algorithms.py b/zipline/test_algorithms.py index 116fb2b9..fa68366e 100644 --- a/zipline/test_algorithms.py +++ b/zipline/test_algorithms.py @@ -137,3 +137,25 @@ class NoopAlgorithm(object): def get_sid_filter(self): return None + +class LogTestAlgorithm(object): + + def __init__(self): + import logbook + self.log = logbook.Logger() + + def initialize(self): + self.log.info("Initialize") + + def set_order(self, order_callable): + pass + + def set_portfolio(self, portfolio): + pass + + def handle_data(self, data): + self.log.info("handle_data") + pass + + def get_sid_filter(self): + return None From e4003cdc4c9dd78dd14958435447c5d0d6bd4158 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Sun, 1 Jul 2012 20:28:49 -0400 Subject: [PATCH 06/16] new algorithm for testing logging --- zipline/test_algorithms.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/zipline/test_algorithms.py b/zipline/test_algorithms.py index fa68366e..059e68d5 100644 --- a/zipline/test_algorithms.py +++ b/zipline/test_algorithms.py @@ -138,14 +138,13 @@ class NoopAlgorithm(object): def get_sid_filter(self): return None -class LogTestAlgorithm(object): +class TestPrintAlgorithm(): def __init__(self): - import logbook - self.log = logbook.Logger() + pass def initialize(self): - self.log.info("Initialize") + print "Initializing" def set_order(self, order_callable): pass @@ -154,7 +153,7 @@ class LogTestAlgorithm(object): pass def handle_data(self, data): - self.log.info("handle_data") + print "handle_data" pass def get_sid_filter(self): From 87a5e442c047195fcab23de744ac8171914ab58e Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Sun, 1 Jul 2012 20:29:55 -0400 Subject: [PATCH 07/16] bug fixes to LOG_FRAME --- zipline/protocol.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/zipline/protocol.py b/zipline/protocol.py index fb27b2d2..89cd99aa 100644 --- a/zipline/protocol.py +++ b/zipline/protocol.py @@ -118,6 +118,7 @@ import msgpack import numbers import datetime import pytz + from collections import namedtuple from utils.protocol_utils import Enum, FrameExceptionFactory, ndict, namelookup @@ -615,8 +616,8 @@ def LOG_FRAME(payload): 'time' : 1199223001, #Realtime date of log creation. 'func_name' : 'foo', 'lineno' : 46, - 'message' : 'Successfully disintegrated llama #3' - 'level' : 4 #Logbook enum + 'message' : 'Successfully disintegrated llama #3', + 'level' : 4, #Logbook enum 'channel' : 'MyLogger' } @@ -625,7 +626,7 @@ def LOG_FRAME(payload): """ assert isinstance(payload, dict), \ - "LOG_FRAME received:"+str(type(log_record)) + "LOG_FRAME expected a dict" assert payload.has_key('algo_dt'), \ "LOG_FRAME with no algo_dt" @@ -642,17 +643,16 @@ def LOG_FRAME(payload): data['e'] = 'LOG' data['p'] = payload - return json.dumps(data) + return msgpack.dumps(data) def LOG_UNFRAME(msg): """ Expects a json serialized dictionary in event/payload format. """ - record = json.loads(data) + record = msgpack.loads(msg) assert record['e'] == 'LOG' assert record.has_key('p') return record['p'] - From 33a3e2d63a8a8af288ee5db76cce20f4458c6f7b Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Sun, 1 Jul 2012 20:30:32 -0400 Subject: [PATCH 08/16] spellcheck on a comment --- zipline/core/devsimulator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zipline/core/devsimulator.py b/zipline/core/devsimulator.py index 85ddc44e..7f382056 100644 --- a/zipline/core/devsimulator.py +++ b/zipline/core/devsimulator.py @@ -1,5 +1,5 @@ """ -Simulator hosts all the components necessary to execute a simluation. +Simulator hosts all the components necessary to execute a simulation. See :py:method"" """ From 5a7fa6d8931d6f700954fe13c67bc9c435e10025 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Mon, 2 Jul 2012 05:36:48 -0400 Subject: [PATCH 09/16] bugfixes to logging setup --- zipline/components/tradesimulation.py | 45 +++++++++++++++++++-------- zipline/protocol.py | 4 +-- zipline/test_algorithms.py | 4 +-- 3 files changed, 36 insertions(+), 17 deletions(-) diff --git a/zipline/components/tradesimulation.py b/zipline/components/tradesimulation.py index 7e6f803d..94ada361 100644 --- a/zipline/components/tradesimulation.py +++ b/zipline/components/tradesimulation.py @@ -8,7 +8,7 @@ from zipline.core.component import Component from zipline.finance.trading import TransactionSimulator from zipline.utils.protocol_utils import ndict -from qexec.utils.log_utils import ZeroMQLogHandler, stdout_only_pipe +from zipline.utils.log_utils import ZeroMQLogHandler, stdout_only_pipe from logbook import Logger, NestedSetup, Processor, queues @@ -36,15 +36,14 @@ class TradeSimulationClient(Component): self.log_socket = log_socket - #If we have a log socket,setup context managers for capturing user prints. + + #If we have a log socket,setup context manager for exporting captured + #print statements if log_socket: - - log = Logger("Print") - self.stdout_capture = stdout_only_pipe(log, 'user algo stdout') - - handler = queues.ZeroMQLogHandler(uri = log_socket) - self.zmq_out = handler.threadbound() - + self.zmq_out = ZeroMQLogHandler(uri = log_socket) + self.logger = Logger("Print") + self.stdout_capture = stdout_only_pipe #THIS IS A CLASS! + @property def get_id(self): return str(zp.FINANCE_COMPONENT.TRADING_CLIENT) @@ -60,9 +59,10 @@ class TradeSimulationClient(Component): # ask the algorithm to initialize, routing stdout to a zmq PUSH socket. if self.log_socket: - with self.zmq_out, self.stdout_capture: + with self.zmq_out.threadbound(), \ + self.stdout_capture(self.logger, 'Algo print capture'): self.algorithm.initialize() - + # if we don't have a log socket, initialize anyway. else: self.algorithm.initialize() @@ -163,7 +163,6 @@ class TradeSimulationClient(Component): # any fields injected here should be added to # LOG_EXTRA_FIELDS in zipline/protocol.py - # try to run algo with log rerouting if self.log_socket: def inject_event_data(record): @@ -174,12 +173,32 @@ class TradeSimulationClient(Component): log_pipeline = NestedSetup([self.zmq_out, #e.g. FileHandler(...) data_injector]) - with log_pipeline, self.stdout_capture: + with log_pipeline.threadbound(), self.stdout_capture: self.algorithm.handle_data(data) # if no log socket, just run the algo normally else: self.algorithm.handle_data(data) + + #Testing utility for log capture. + def test_run_algorithm(self): + from zipline.utils.date_utils import epoch_now + + def inject_event_data(record): + record.extra['algo_dt'] = epoch_now() #Mock an event.dt + + data_injector = Processor(inject_event_data) + log_pipeline = NestedSetup([self.zmq_out, + #e.g. FileHandler(...) + data_injector]) + with log_pipeline.threadbound(), self.stdout_capture(self.logger, ''): + self.algorithm.handle_data('data') + # if no log socket, just run the algo normally + + + + + def connect_order(self): return self.connect_push_socket(self.addresses['order_address']) diff --git a/zipline/protocol.py b/zipline/protocol.py index 89cd99aa..20d35f1c 100644 --- a/zipline/protocol.py +++ b/zipline/protocol.py @@ -616,7 +616,7 @@ def LOG_FRAME(payload): 'time' : 1199223001, #Realtime date of log creation. 'func_name' : 'foo', 'lineno' : 46, - 'message' : 'Successfully disintegrated llama #3', + 'msg' : 'Successfully disintegrated llama #3', 'level' : 4, #Logbook enum 'channel' : 'MyLogger' } @@ -636,7 +636,7 @@ def LOG_FRAME(payload): "LOG_FRAME with no channel" assert payload.has_key('level'),\ "LOG_FRAME with no level" - assert payload.has_key('message'),\ + assert payload.has_key('msg'),\ "LOG_FRAME with no message" data = {} diff --git a/zipline/test_algorithms.py b/zipline/test_algorithms.py index 059e68d5..7d33bf13 100644 --- a/zipline/test_algorithms.py +++ b/zipline/test_algorithms.py @@ -144,7 +144,7 @@ class TestPrintAlgorithm(): pass def initialize(self): - print "Initializing" + print "Initializing..." def set_order(self, order_callable): pass @@ -153,7 +153,7 @@ class TestPrintAlgorithm(): pass def handle_data(self, data): - print "handle_data" + print "Handling Data..." pass def get_sid_filter(self): From 59fdd1f292f12d080824769b4f3ce86b325f373d Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Mon, 2 Jul 2012 23:36:15 -0400 Subject: [PATCH 10/16] bugfixes and disable initialization logging --- zipline/components/tradesimulation.py | 62 +++++++++++++++------------ 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/zipline/components/tradesimulation.py b/zipline/components/tradesimulation.py index 94ada361..87b21e55 100644 --- a/zipline/components/tradesimulation.py +++ b/zipline/components/tradesimulation.py @@ -35,15 +35,7 @@ class TradeSimulationClient(Component): self.perf = perf.PerformanceTracker(self.trading_environment) self.log_socket = log_socket - - - #If we have a log socket,setup context manager for exporting captured - #print statements - if log_socket: - self.zmq_out = ZeroMQLogHandler(uri = log_socket) - self.logger = Logger("Print") - self.stdout_capture = stdout_only_pipe #THIS IS A CLASS! - + @property def get_id(self): return str(zp.FINANCE_COMPONENT.TRADING_CLIENT) @@ -57,19 +49,38 @@ class TradeSimulationClient(Component): # register the trading_client's order method with the algorithm self.algorithm.set_order(self.order) + #TODO: re-enable initialization logging + # ask the algorithm to initialize, routing stdout to a zmq PUSH socket. - if self.log_socket: - with self.zmq_out.threadbound(), \ - self.stdout_capture(self.logger, 'Algo print capture'): - self.algorithm.initialize() + #if self.log_socket: + # with self.zmq_out.threadbound(), \ + # self.stdout_capture(self.logger, 'Algo print capture'): + # self.algorithm.initialize() # if we don't have a log socket, initialize anyway. - else: - self.algorithm.initialize() + #else: + # self.algorithm.initialize() + + self.algorithm.initialize() def open(self): self.result_feed = self.connect_result() self.perf.open(self.context) + + #If we have a log socket,setup context manager for exporting captured + #print statements + if self.log_socket: + self.zmq_out = ZeroMQLogHandler(uri = self.log_socket, context = self.context) + self.logger = Logger("Print") + self.stdout_capture = stdout_only_pipe #THIS IS A CLASS! + + #Initialize log capture for testing purposes. + def setup_logging(self, context): + if self.log_socket: + self.zmq_out = ZeroMQLogHandler(uri = self.log_socket, context = context) + self.logger = Logger("Print") + self.stdout_capture = stdout_only_pipe #THIS IS A CLASS! + def do_work(self): # poll all the sockets @@ -157,23 +168,26 @@ class TradeSimulationClient(Component): current_portfolio = self.perf.get_portfolio() self.algorithm.set_portfolio(current_portfolio) data = self.get_data() + if len(data) > 0: # data injection pipeline for log rerouting # any fields injected here should be added to # LOG_EXTRA_FIELDS in zipline/protocol.py - # try to run algo with log rerouting if self.log_socket: + def inject_event_data(record): - record.extra['algo_dt'] = event.dt - #record.extra['whatever else we want'] = event.whatever + + #Record the simulation time. + + record.extra['algo_dt'] = self.current_dt data_injector = Processor(inject_event_data) log_pipeline = NestedSetup([self.zmq_out, #e.g. FileHandler(...) data_injector]) - with log_pipeline.threadbound(), self.stdout_capture: + with log_pipeline.threadbound(), self.stdout_capture(self.logger, ''): self.algorithm.handle_data(data) # if no log socket, just run the algo normally else: @@ -181,10 +195,9 @@ class TradeSimulationClient(Component): #Testing utility for log capture. def test_run_algorithm(self): - from zipline.utils.date_utils import epoch_now - + def inject_event_data(record): - record.extra['algo_dt'] = epoch_now() #Mock an event.dt + record.extra['algo_dt'] = datetime.datetime.utcnow() #Mock an event.dt data_injector = Processor(inject_event_data) log_pipeline = NestedSetup([self.zmq_out, @@ -194,11 +207,6 @@ class TradeSimulationClient(Component): self.algorithm.handle_data('data') # if no log socket, just run the algo normally - - - - - def connect_order(self): return self.connect_push_socket(self.addresses['order_address']) From b447333fae63a1e117a9bab3dc06f601467061e3 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Wed, 4 Jul 2012 03:46:42 -0400 Subject: [PATCH 11/16] remove logging from initialize for now --- zipline/components/tradesimulation.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/zipline/components/tradesimulation.py b/zipline/components/tradesimulation.py index 87b21e55..4212d458 100644 --- a/zipline/components/tradesimulation.py +++ b/zipline/components/tradesimulation.py @@ -41,6 +41,7 @@ class TradeSimulationClient(Component): return str(zp.FINANCE_COMPONENT.TRADING_CLIENT) def set_algorithm(self, algorithm): + """ :param algorithm: must implement the algorithm protocol. See :py:mod:`zipline.test.algorithm` @@ -49,21 +50,20 @@ class TradeSimulationClient(Component): # register the trading_client's order method with the algorithm self.algorithm.set_order(self.order) - #TODO: re-enable initialization logging - + #TODO: re-enable initialization logging. This means we can't call set_algorithm + #until we have a context for this component. Possibly this could happen # ask the algorithm to initialize, routing stdout to a zmq PUSH socket. - #if self.log_socket: - # with self.zmq_out.threadbound(), \ - # self.stdout_capture(self.logger, 'Algo print capture'): - # self.algorithm.initialize() - - # if we don't have a log socket, initialize anyway. + + #with self.zmq_out.threadbound(), self.stdout_capture(self.logger, 'Algo print capture'): + # self.algorithm.initialize() + #if we don't have a log socket, initialize anyway. #else: # self.algorithm.initialize() self.algorithm.initialize() def open(self): + self.result_feed = self.connect_result() self.perf.open(self.context) @@ -80,12 +80,10 @@ class TradeSimulationClient(Component): self.zmq_out = ZeroMQLogHandler(uri = self.log_socket, context = context) self.logger = Logger("Print") self.stdout_capture = stdout_only_pipe #THIS IS A CLASS! - def do_work(self): # poll all the sockets socks = dict(self.poll.poll(self.heartbeat_timeout)) - # see if the poller has results for the result_feed if socks.get(self.result_feed) == self.zmq.POLLIN: @@ -103,6 +101,7 @@ class TradeSimulationClient(Component): event = zp.MERGE_UNFRAME(msg) self.received_count += 1 # update performance and relay the event to the algorithm + self.process_event(event) if self.perf.exceeded_max_loss: self.finish_simulation() @@ -118,7 +117,6 @@ class TradeSimulationClient(Component): self.signal_done() def process_event(self, event): - # generate transactions, if applicable txn = self.txn_sim.apply_trade_to_open_orders(event) if txn: @@ -143,10 +141,10 @@ class TradeSimulationClient(Component): # queue the event. self.queue_event(event) - # if the event is later than our current time, run the algo # otherwise, the algorithm has fallen behind the feed # and processing per event is longer than time between events. + if event.dt >= self.current_dt: # compress time by moving the current_time up to the event # time. @@ -168,13 +166,13 @@ class TradeSimulationClient(Component): current_portfolio = self.perf.get_portfolio() self.algorithm.set_portfolio(current_portfolio) data = self.get_data() - + if len(data) > 0: # data injection pipeline for log rerouting # any fields injected here should be added to # LOG_EXTRA_FIELDS in zipline/protocol.py - + if self.log_socket: def inject_event_data(record): From 14018b9ab77efd1d138e3a347b0cdf587314e9d0 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Wed, 4 Jul 2012 03:47:41 -0400 Subject: [PATCH 12/16] grammar police on comment --- zipline/core/component.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zipline/core/component.py b/zipline/core/component.py index 2d0619ce..4839e147 100644 --- a/zipline/core/component.py +++ b/zipline/core/component.py @@ -213,7 +213,7 @@ class Component(object): Run the component. Optionally takes an argument to catch and log all exceptions - raised during execution ues this with care since it makes it + raised during execution. Use this with care since it makes it very hard to debug since it mucks up your stacktraces. """ From 1d0c4ffa1487e69027babca2458d7602f37cdee4 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Wed, 4 Jul 2012 03:47:58 -0400 Subject: [PATCH 13/16] :result --->results --- zipline/lines.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/zipline/lines.py b/zipline/lines.py index e29fccf8..768b1ef9 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -141,6 +141,7 @@ class SimulatedTrading(object): self.sim = config['simulator_class'](addresses) self.clients = {} + self.trading_client = TradeSimulationClient( self.trading_environment, self.sim_style, @@ -166,7 +167,7 @@ class SimulatedTrading(object): def create_test_zipline(**config): """ :param config: A configuration object that is a dict with: - + - environment - a \ :py:class:`zipline.finance.trading.TradingEnvironment` - allocator - a :py:class:`zipline.simulator.AddressAllocator` @@ -189,7 +190,7 @@ class SimulatedTrading(object): a SIMULATION_STYLE as defined in :py:mod:`zipline.finance.trading` """ assert isinstance(config, dict) - + allocator = config['allocator'] sid = config['sid'] @@ -306,8 +307,8 @@ class SimulatedTrading(object): def get_cumulative_performance(self): return self.trading_client.perf.cumulative_performance.to_dict() - def publish_to(self, result_socket): - self.trading_client.perf.publish_to(result_socket) + def publish_to(self, results_socket): + self.trading_client.perf.publish_to(results_socket) def allocate_sockets(self, n): """ From a789db17898436124436dcb716badaa2ef128f9a Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Wed, 4 Jul 2012 03:48:28 -0400 Subject: [PATCH 14/16] make git happy --- zipline/finance/performance.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index 9d761eb1..ab0c17e6 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -261,7 +261,7 @@ class PerformanceTracker(object): self.todays_performance.calculate_performance() def handle_market_close(self): - + # add the return results from today to the list of DailyReturn objects. todays_date = self.market_close.replace(hour=0, minute=0, second=0) todays_return_obj = risk.DailyReturn( @@ -347,7 +347,7 @@ class PerformanceTracker(object): if self.results_socket: log.info("about to stream the risk report...") risk_dict = self.risk_report.to_dict() - + msg = zp.RISK_FRAME(risk_dict) self.results_socket.send(msg) # this signals that the simulation is complete. From 6aa7bf893d4f241f10287e64dd83d3918b33173b Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Wed, 4 Jul 2012 15:31:16 -0400 Subject: [PATCH 15/16] trying to fix some odd behavior --- zipline/utils/log_utils.py | 161 +++++++++++++++++++++++++++++++++++++ 1 file changed, 161 insertions(+) create mode 100644 zipline/utils/log_utils.py diff --git a/zipline/utils/log_utils.py b/zipline/utils/log_utils.py new file mode 100644 index 00000000..bf08cf41 --- /dev/null +++ b/zipline/utils/log_utils.py @@ -0,0 +1,161 @@ +import logbook +import zmq +import pytz +import datetime + +from logbook import NOTSET +from logbook.handlers import Handler + +from zipline.protocol import LOG_FRAME, LOG_FIELDS, \ + LOG_EXTRA_FIELDS + +from contextlib import contextmanager + +class redirecter(object): + def __init__(self, logger, name): + self.logger = logger + self.buffer = bytes() + self.name = name + + def write(self, line): + self.buffer += ''.join(['>>> ', line.strip('\n'), '\n']) + + def flush(self, final=False): + if not self.buffer: + return + out_form = """ [{pipe_name}] \n{buffer}""".format( + pipe_name = self.name, + buffer = self.buffer + ) + self.logger.error(out_form) + self.buffer = bytes() + +class log_redirecter(object): + def __init__(self, logger): + self.logger = logger + + def write(self, line): + #Absorb blank lines from print statements. + if line =='\n': + return + + else: + #TODO: add logic to guarantee we made this + self.logger.info(line.strip('\n')) + + def flush(self, final=False): + pass + +@contextmanager +def stdout_pipe(logger, pipe_name): + """ + Pipe stdout and stderr into a python logger interface + """ + import sys + orig_fds = sys.stdout, sys.stderr + + sys.stderr = redirecter(logger, pipe_name) + sys.stdout = redirecter(logger, pipe_name) + + yield + sys.stderr.flush() + sys.stdout.flush() + sys.stdout, sys.stderr = orig_fds + +@contextmanager +def stdout_only_pipe(logger, pipe_name): + """ + Pipes just stdout into a python logger interface + """ + import sys + orig_fd = sys.stdout + sys.stdout = log_redirecter(logger) + + yield + sys.stdout.flush() + sys.stdout = orig_fd + +class ZeroMQLogHandler(Handler): + """ + A handler that takes messages captured from the user algorithm stdout + and transforms them into LOG_FRAMES suitable for database storage. + Setup is similar to logbook.queues.ZeroMQHandler, except we connect + instead of binding and we extract record fields into a dict. + """ + + def __init__(self, uri=None, level=NOTSET, filter=None, bubble=False, + context=None, fds = LOG_FIELDS, extra_fds = LOG_EXTRA_FIELDS): + Handler.__init__(self, level, filter, bubble) + try: + import zmq + except ImportError: + raise RuntimeError('The pyzmq library is required for ' + 'the ZeroMQHandler.') + #: the zero mq context + self.context = context + #: the zero mq socket. + self.socket = self.context.socket(zmq.PUSH) + + self.uri = uri + if uri is not None: + self.socket.connect(uri) + + self.fds = fds + self.extra_fds = extra_fds + + def export_record(self, record): + """ + Extract relevant fields from a log record, fiddling with datetime + fields to make json happy. + """ + from zipline.utils.date_utils import EPOCH + + #Needed to extract record info from dictionary. + record.pull_information() + + #Logbook stores record times as datetime objects, which + #can't be serialized by JSON, so we need to convert to + #unix epoch representation. + + if record.time: + assert isinstance(record.time, datetime.datetime) + + time = record.time.replace(tzinfo = pytz.utc) + #logbook measures time in utc already, no need to convert. + record.time = EPOCH(time) + + #Do the same if algo_dt is a datetime object. + if record.extra.has_key('algo_dt'): + algo_dt = record.extra['algo_dt'] + + if isinstance(algo_dt, datetime.datetime): + algo_dt = EPOCH(algo_dt.replace(tzinfo = pytz.utc)) + record.extra['algo_dt'] = algo_dt + + data = {} + + #Extract all the fields we care about from LogRecord's internal + #dictionary. + + for field in iter(self.fds): + if record.__dict__.has_key(field): + data[field] = record.__dict__[field] + else: + data[field] = None + + for field in iter(self.extra_fds): + if record.extra.has_key(field): + data[field] = record.extra[field] + else: + data[field] = None + return data + + def emit(self, record): + """Extract relevant fields and send info as JSON over a zmq socket.""" + payload = self.export_record(record) + self.socket.send(LOG_FRAME(payload)) + + def close(self): + #self.socket.close() + pass + From c182add0a3644b368e60fcc6543fd12d0bf2abbf Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Thu, 5 Jul 2012 17:50:51 -0400 Subject: [PATCH 16/16] minimal --- zipline/components/tradesimulation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zipline/components/tradesimulation.py b/zipline/components/tradesimulation.py index 4212d458..3bf66282 100644 --- a/zipline/components/tradesimulation.py +++ b/zipline/components/tradesimulation.py @@ -115,7 +115,7 @@ class TradeSimulationClient(Component): # signal Simulator, our ComponentHost, that this component is # done and Simulator needn't block exit on this component. self.signal_done() - + def process_event(self, event): # generate transactions, if applicable txn = self.txn_sim.apply_trade_to_open_orders(event)