diff --git a/zipline/components/tradesimulation.py b/zipline/components/tradesimulation.py index eacae981..764c3b27 100644 --- a/zipline/components/tradesimulation.py +++ b/zipline/components/tradesimulation.py @@ -106,10 +106,18 @@ class TradeSimulationClient(Component): # ended. Perf will internally calculate the full risk report. self.perf.handle_simulation_end() + # signal that the logging stream is done, close the + # logging socket + self.logging_done() + # signal Simulator, our ComponentHost, that this component is # done and Simulator needn't block exit on this component. self.signal_done() + def logging_done(self): + self.zmq_out.socket.send(zp.LOG_DONE) + self.zmq_out.close() + def process_event(self, event): # generate transactions, if applicable txn = self.txn_sim.apply_trade_to_open_orders(event) @@ -184,6 +192,7 @@ class TradeSimulationClient(Component): self.algorithm.handle_data(data) #Testing utility for log capture. + # TODO: remove test code from here. def test_run_algorithm(self): def inject_event_data(record): diff --git a/zipline/core/monitor.py b/zipline/core/monitor.py index a9b66b4f..c5a5149d 100644 --- a/zipline/core/monitor.py +++ b/zipline/core/monitor.py @@ -291,7 +291,6 @@ class Controller(object): # ============== # Wait the responses - checktime = self.ctime while self.alive: socks = dict(poller.poll(0)) @@ -312,14 +311,13 @@ class Controller(object): if not self.router.getsockopt(self.zmq.RCVMORE): self.handle_recv(buffer[:]) buffer = [] - #checktime = time.time() except INVALID_CONTROL_FRAME: log.error('Invalid frame', rawmessage) pass - if tic - checktime > self.period: - log.info("heartbeat loop timedout: %s" % (tic - checktime)) + if tic - self.ctime > self.period: + log.info("heartbeat loop timedout: %s" % (tic - self.ctime)) log.info(repr(self.responses)) break diff --git a/zipline/protocol.py b/zipline/protocol.py index 35cef68b..df98061b 100644 --- a/zipline/protocol.py +++ b/zipline/protocol.py @@ -608,6 +608,7 @@ SIMULATION_STYLE = Enum( LOG_FIELDS = set(['func_name', 'lineno', 'time', 'msg',\ 'level', 'channel', ]) LOG_EXTRA_FIELDS = set(['algo_dt',]) +LOG_DONE = "DONE" def LOG_FRAME(payload): """ @@ -621,14 +622,14 @@ def LOG_FRAME(payload): 'level' : 4, #Logbook enum 'channel' : 'MyLogger' } - + Frame checks that we have all expected fields and exports an event/payload dict as JSON. """ assert isinstance(payload, dict), \ "LOG_FRAME expected a dict" - + assert payload.has_key('algo_dt'), \ "LOG_FRAME with no algo_dt" assert payload.has_key('time'), \ @@ -639,11 +640,11 @@ def LOG_FRAME(payload): "LOG_FRAME with no level" assert payload.has_key('msg'),\ "LOG_FRAME with no message" - + data = {} data['e'] = 'LOG' data['p'] = payload - + return msgpack.dumps(data) def LOG_UNFRAME(msg): @@ -653,7 +654,5 @@ def LOG_UNFRAME(msg): record = msgpack.loads(msg) assert record['e'] == 'LOG' assert record.has_key('p') - + return record['p'] - - diff --git a/zipline/utils/log_utils.py b/zipline/utils/log_utils.py index bf08cf41..32a6b286 100644 --- a/zipline/utils/log_utils.py +++ b/zipline/utils/log_utils.py @@ -4,13 +4,16 @@ import pytz import datetime from logbook import NOTSET -from logbook.handlers import Handler +from logbook.handlers import Handler, FileHandler from zipline.protocol import LOG_FRAME, LOG_FIELDS, \ LOG_EXTRA_FIELDS from contextlib import contextmanager + +log = logbook.Logger("LogUtils") + class redirecter(object): def __init__(self, logger, name): self.logger = logger @@ -30,10 +33,10 @@ class redirecter(object): self.logger.error(out_form) self.buffer = bytes() -class log_redirecter(object): +class log_redirecter(object): def __init__(self, logger): self.logger = logger - + def write(self, line): #Absorb blank lines from print statements. if line =='\n': @@ -42,7 +45,7 @@ class log_redirecter(object): else: #TODO: add logic to guarantee we made this self.logger.info(line.strip('\n')) - + def flush(self, final=False): pass @@ -70,7 +73,7 @@ def stdout_only_pipe(logger, pipe_name): import sys orig_fd = sys.stdout sys.stdout = log_redirecter(logger) - + yield sys.stdout.flush() sys.stdout = orig_fd @@ -82,7 +85,7 @@ class ZeroMQLogHandler(Handler): Setup is similar to logbook.queues.ZeroMQHandler, except we connect instead of binding and we extract record fields into a dict. """ - + def __init__(self, uri=None, level=NOTSET, filter=None, bubble=False, context=None, fds = LOG_FIELDS, extra_fds = LOG_EXTRA_FIELDS): Handler.__init__(self, level, filter, bubble) @@ -109,21 +112,21 @@ class ZeroMQLogHandler(Handler): fields to make json happy. """ from zipline.utils.date_utils import EPOCH - + #Needed to extract record info from dictionary. record.pull_information() #Logbook stores record times as datetime objects, which - #can't be serialized by JSON, so we need to convert to + #can't be serialized by JSON, so we need to convert to #unix epoch representation. if record.time: assert isinstance(record.time, datetime.datetime) - + time = record.time.replace(tzinfo = pytz.utc) #logbook measures time in utc already, no need to convert. record.time = EPOCH(time) - + #Do the same if algo_dt is a datetime object. if record.extra.has_key('algo_dt'): algo_dt = record.extra['algo_dt'] @@ -131,31 +134,31 @@ class ZeroMQLogHandler(Handler): if isinstance(algo_dt, datetime.datetime): algo_dt = EPOCH(algo_dt.replace(tzinfo = pytz.utc)) record.extra['algo_dt'] = algo_dt - + data = {} - + #Extract all the fields we care about from LogRecord's internal - #dictionary. + #dictionary. for field in iter(self.fds): if record.__dict__.has_key(field): data[field] = record.__dict__[field] else: data[field] = None - + for field in iter(self.extra_fds): if record.extra.has_key(field): data[field] = record.extra[field] else: data[field] = None return data - + def emit(self, record): """Extract relevant fields and send info as JSON over a zmq socket.""" - payload = self.export_record(record) - self.socket.send(LOG_FRAME(payload)) - + logger = FileHandler('/var/log/zipline/zipline.log') + with logger.applicationbound(): + payload = self.export_record(record) + self.socket.send(LOG_FRAME(payload)) + def close(self): - #self.socket.close() - pass - + self.socket.close()