mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-30 03:28:20 +08:00
fixed logging bugs, added a DONE signal for logging.
This commit is contained in:
@@ -106,10 +106,18 @@ class TradeSimulationClient(Component):
|
||||
# ended. Perf will internally calculate the full risk report.
|
||||
self.perf.handle_simulation_end()
|
||||
|
||||
# signal that the logging stream is done, close the
|
||||
# logging socket
|
||||
self.logging_done()
|
||||
|
||||
# signal Simulator, our ComponentHost, that this component is
|
||||
# done and Simulator needn't block exit on this component.
|
||||
self.signal_done()
|
||||
|
||||
def logging_done(self):
|
||||
self.zmq_out.socket.send(zp.LOG_DONE)
|
||||
self.zmq_out.close()
|
||||
|
||||
def process_event(self, event):
|
||||
# generate transactions, if applicable
|
||||
txn = self.txn_sim.apply_trade_to_open_orders(event)
|
||||
@@ -184,6 +192,7 @@ class TradeSimulationClient(Component):
|
||||
self.algorithm.handle_data(data)
|
||||
|
||||
#Testing utility for log capture.
|
||||
# TODO: remove test code from here.
|
||||
def test_run_algorithm(self):
|
||||
|
||||
def inject_event_data(record):
|
||||
|
||||
@@ -291,7 +291,6 @@ class Controller(object):
|
||||
# ==============
|
||||
|
||||
# Wait the responses
|
||||
checktime = self.ctime
|
||||
while self.alive:
|
||||
|
||||
socks = dict(poller.poll(0))
|
||||
@@ -312,14 +311,13 @@ class Controller(object):
|
||||
if not self.router.getsockopt(self.zmq.RCVMORE):
|
||||
self.handle_recv(buffer[:])
|
||||
buffer = []
|
||||
#checktime = time.time()
|
||||
|
||||
except INVALID_CONTROL_FRAME:
|
||||
log.error('Invalid frame', rawmessage)
|
||||
pass
|
||||
|
||||
if tic - checktime > self.period:
|
||||
log.info("heartbeat loop timedout: %s" % (tic - checktime))
|
||||
if tic - self.ctime > self.period:
|
||||
log.info("heartbeat loop timedout: %s" % (tic - self.ctime))
|
||||
log.info(repr(self.responses))
|
||||
break
|
||||
|
||||
|
||||
+6
-7
@@ -608,6 +608,7 @@ SIMULATION_STYLE = Enum(
|
||||
LOG_FIELDS = set(['func_name', 'lineno', 'time', 'msg',\
|
||||
'level', 'channel', ])
|
||||
LOG_EXTRA_FIELDS = set(['algo_dt',])
|
||||
LOG_DONE = "DONE"
|
||||
|
||||
def LOG_FRAME(payload):
|
||||
"""
|
||||
@@ -621,14 +622,14 @@ def LOG_FRAME(payload):
|
||||
'level' : 4, #Logbook enum
|
||||
'channel' : 'MyLogger'
|
||||
}
|
||||
|
||||
|
||||
Frame checks that we have all expected fields and exports an
|
||||
event/payload dict as JSON.
|
||||
"""
|
||||
|
||||
assert isinstance(payload, dict), \
|
||||
"LOG_FRAME expected a dict"
|
||||
|
||||
|
||||
assert payload.has_key('algo_dt'), \
|
||||
"LOG_FRAME with no algo_dt"
|
||||
assert payload.has_key('time'), \
|
||||
@@ -639,11 +640,11 @@ def LOG_FRAME(payload):
|
||||
"LOG_FRAME with no level"
|
||||
assert payload.has_key('msg'),\
|
||||
"LOG_FRAME with no message"
|
||||
|
||||
|
||||
data = {}
|
||||
data['e'] = 'LOG'
|
||||
data['p'] = payload
|
||||
|
||||
|
||||
return msgpack.dumps(data)
|
||||
|
||||
def LOG_UNFRAME(msg):
|
||||
@@ -653,7 +654,5 @@ def LOG_UNFRAME(msg):
|
||||
record = msgpack.loads(msg)
|
||||
assert record['e'] == 'LOG'
|
||||
assert record.has_key('p')
|
||||
|
||||
|
||||
return record['p']
|
||||
|
||||
|
||||
|
||||
+24
-21
@@ -4,13 +4,16 @@ import pytz
|
||||
import datetime
|
||||
|
||||
from logbook import NOTSET
|
||||
from logbook.handlers import Handler
|
||||
from logbook.handlers import Handler, FileHandler
|
||||
|
||||
from zipline.protocol import LOG_FRAME, LOG_FIELDS, \
|
||||
LOG_EXTRA_FIELDS
|
||||
|
||||
from contextlib import contextmanager
|
||||
|
||||
|
||||
log = logbook.Logger("LogUtils")
|
||||
|
||||
class redirecter(object):
|
||||
def __init__(self, logger, name):
|
||||
self.logger = logger
|
||||
@@ -30,10 +33,10 @@ class redirecter(object):
|
||||
self.logger.error(out_form)
|
||||
self.buffer = bytes()
|
||||
|
||||
class log_redirecter(object):
|
||||
class log_redirecter(object):
|
||||
def __init__(self, logger):
|
||||
self.logger = logger
|
||||
|
||||
|
||||
def write(self, line):
|
||||
#Absorb blank lines from print statements.
|
||||
if line =='\n':
|
||||
@@ -42,7 +45,7 @@ class log_redirecter(object):
|
||||
else:
|
||||
#TODO: add logic to guarantee we made this
|
||||
self.logger.info(line.strip('\n'))
|
||||
|
||||
|
||||
def flush(self, final=False):
|
||||
pass
|
||||
|
||||
@@ -70,7 +73,7 @@ def stdout_only_pipe(logger, pipe_name):
|
||||
import sys
|
||||
orig_fd = sys.stdout
|
||||
sys.stdout = log_redirecter(logger)
|
||||
|
||||
|
||||
yield
|
||||
sys.stdout.flush()
|
||||
sys.stdout = orig_fd
|
||||
@@ -82,7 +85,7 @@ class ZeroMQLogHandler(Handler):
|
||||
Setup is similar to logbook.queues.ZeroMQHandler, except we connect
|
||||
instead of binding and we extract record fields into a dict.
|
||||
"""
|
||||
|
||||
|
||||
def __init__(self, uri=None, level=NOTSET, filter=None, bubble=False,
|
||||
context=None, fds = LOG_FIELDS, extra_fds = LOG_EXTRA_FIELDS):
|
||||
Handler.__init__(self, level, filter, bubble)
|
||||
@@ -109,21 +112,21 @@ class ZeroMQLogHandler(Handler):
|
||||
fields to make json happy.
|
||||
"""
|
||||
from zipline.utils.date_utils import EPOCH
|
||||
|
||||
|
||||
#Needed to extract record info from dictionary.
|
||||
record.pull_information()
|
||||
|
||||
#Logbook stores record times as datetime objects, which
|
||||
#can't be serialized by JSON, so we need to convert to
|
||||
#can't be serialized by JSON, so we need to convert to
|
||||
#unix epoch representation.
|
||||
|
||||
if record.time:
|
||||
assert isinstance(record.time, datetime.datetime)
|
||||
|
||||
|
||||
time = record.time.replace(tzinfo = pytz.utc)
|
||||
#logbook measures time in utc already, no need to convert.
|
||||
record.time = EPOCH(time)
|
||||
|
||||
|
||||
#Do the same if algo_dt is a datetime object.
|
||||
if record.extra.has_key('algo_dt'):
|
||||
algo_dt = record.extra['algo_dt']
|
||||
@@ -131,31 +134,31 @@ class ZeroMQLogHandler(Handler):
|
||||
if isinstance(algo_dt, datetime.datetime):
|
||||
algo_dt = EPOCH(algo_dt.replace(tzinfo = pytz.utc))
|
||||
record.extra['algo_dt'] = algo_dt
|
||||
|
||||
|
||||
data = {}
|
||||
|
||||
|
||||
#Extract all the fields we care about from LogRecord's internal
|
||||
#dictionary.
|
||||
#dictionary.
|
||||
|
||||
for field in iter(self.fds):
|
||||
if record.__dict__.has_key(field):
|
||||
data[field] = record.__dict__[field]
|
||||
else:
|
||||
data[field] = None
|
||||
|
||||
|
||||
for field in iter(self.extra_fds):
|
||||
if record.extra.has_key(field):
|
||||
data[field] = record.extra[field]
|
||||
else:
|
||||
data[field] = None
|
||||
return data
|
||||
|
||||
|
||||
def emit(self, record):
|
||||
"""Extract relevant fields and send info as JSON over a zmq socket."""
|
||||
payload = self.export_record(record)
|
||||
self.socket.send(LOG_FRAME(payload))
|
||||
|
||||
logger = FileHandler('/var/log/zipline/zipline.log')
|
||||
with logger.applicationbound():
|
||||
payload = self.export_record(record)
|
||||
self.socket.send(LOG_FRAME(payload))
|
||||
|
||||
def close(self):
|
||||
#self.socket.close()
|
||||
pass
|
||||
|
||||
self.socket.close()
|
||||
|
||||
Reference in New Issue
Block a user