mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-30 02:20:07 +08:00
Merge branch 'master' into stddev_transform
This commit is contained in:
@@ -1,7 +1,15 @@
|
||||
import logging
|
||||
import logbook
|
||||
import uuid
|
||||
import zmq
|
||||
|
||||
from zipline import ndict
|
||||
|
||||
from zipline.utils.logger import configure_logging, tail
|
||||
from zipline.utils.log_utils import ZeroMQLogHandler
|
||||
|
||||
from zipline.utils.test_utils import create_receiver, drain_receiver
|
||||
|
||||
from unittest2 import TestCase
|
||||
|
||||
|
||||
@@ -20,3 +28,35 @@ class LoggerTestCase(TestCase):
|
||||
last_line = tail(logfile, window=1)
|
||||
logged_msg = last_line.split(" - ")[1]
|
||||
self.assertEqual(test_msg, logged_msg)
|
||||
|
||||
|
||||
def test_zmq_handler(self):
|
||||
socket_addr = 'tcp://127.0.0.1:10000'
|
||||
ctx = zmq.Context()
|
||||
socket_push = ctx.socket(zmq.PUSH)
|
||||
socket_push.connect(socket_addr)
|
||||
recv = create_receiver(socket_addr, ctx)
|
||||
zmq_out = ZeroMQLogHandler(
|
||||
socket = socket_push,
|
||||
filter = lambda r, h: r.channel in ['test zmq logger'],
|
||||
context=ctx,
|
||||
#bubble=False
|
||||
)
|
||||
|
||||
log = logbook.Logger('test zmq logger')
|
||||
x = ndict({})
|
||||
x.a = 1
|
||||
ex = example(133)
|
||||
with zmq_out.threadbound():
|
||||
log.info(ex.num)
|
||||
|
||||
|
||||
output, _ = drain_receiver(recv, count=1)
|
||||
self.assertEqual(output[-1]['prefix'], 'LOG')
|
||||
self.assertTrue(isinstance(output[-1]['payload']['msg'], basestring))
|
||||
|
||||
|
||||
class example(object):
|
||||
|
||||
def __init__(self, num):
|
||||
self.num = num
|
||||
|
||||
+8
-5
@@ -437,8 +437,8 @@ def TRADE_FRAME(event):
|
||||
event.sid,
|
||||
event.price,
|
||||
event.open,
|
||||
event.close,
|
||||
event.high,
|
||||
event.close,
|
||||
event.high,
|
||||
event.low,
|
||||
event.volume,
|
||||
event.dt,
|
||||
@@ -665,11 +665,11 @@ def tuple_to_date(date_tuple):
|
||||
return dt
|
||||
|
||||
# Datasource type should completely determine the other fields of a
|
||||
# message with its type.
|
||||
# message with its type.
|
||||
DATASOURCE_TYPE = Enum(
|
||||
'AS_TRADED_EQUITY',
|
||||
'AS_TRADED_EQUITY',
|
||||
'MERGER',
|
||||
'SPLIT',
|
||||
'SPLIT',
|
||||
'DIVIDEND',
|
||||
'TRADE',
|
||||
'EMPTY',
|
||||
@@ -736,6 +736,9 @@ def LOG_FRAME(payload):
|
||||
assert payload.has_key('msg'),\
|
||||
"LOG_FRAME with no message"
|
||||
|
||||
# truncation will only work with strings and msgpack will
|
||||
# preserve primitives.
|
||||
payload['msg'] = str(payload['msg'])
|
||||
|
||||
return BT_UPDATE_FRAME('LOG', payload)
|
||||
|
||||
|
||||
@@ -89,6 +89,7 @@ class ZeroMQLogHandler(Handler):
|
||||
def __init__(self, socket=None, level=NOTSET, filter=None, bubble=False,
|
||||
context=None, fds = LOG_FIELDS, extra_fds = LOG_EXTRA_FIELDS):
|
||||
Handler.__init__(self, level, filter, bubble)
|
||||
|
||||
try:
|
||||
import zmq
|
||||
except ImportError:
|
||||
|
||||
@@ -91,11 +91,13 @@ def create_receiver(socket_addr, ctx):
|
||||
|
||||
return receiver
|
||||
|
||||
def drain_receiver(receiver):
|
||||
def drain_receiver(receiver, count=None):
|
||||
output = []
|
||||
transaction_count = 0
|
||||
msg_counter = 0
|
||||
while True:
|
||||
msg = receiver.recv()
|
||||
msg_counter += 1
|
||||
update = zp.BT_UPDATE_UNFRAME(msg)
|
||||
output.append(update)
|
||||
if update['prefix'] == 'PERF':
|
||||
@@ -106,6 +108,9 @@ def drain_receiver(receiver):
|
||||
elif update['prefix'] == 'DONE':
|
||||
break
|
||||
|
||||
if count and msg_counter >= count:
|
||||
break
|
||||
|
||||
receiver.close()
|
||||
del receiver
|
||||
|
||||
|
||||
Reference in New Issue
Block a user