Merge pull request #117 from quantopian/ditch_zmq

Ditch zmq
This commit is contained in:
fawce
2012-09-12 10:30:20 -07:00
17 changed files with 96 additions and 1262 deletions
-1
View File
@@ -75,7 +75,6 @@ Scientific python on the Mac can be a bit confusing because of the many independ
- umfpack - you need this to build scipy. ```brew install umfpack```
- swig - you need this to build scipy. ```brew install swig```
- hdf5 - you need this to build tables. ```brew install hdf5```
- zeromq - you need this to run qbt. ```brew install zmq```
Data Sources
-7
View File
@@ -2,13 +2,6 @@ msgpack-python==0.1.12
humanhash==0.0.1
iso8601==0.1.4
# ZeroMQ
pyzmq==2.1.11
gevent-zeromq==0.2.2
# Unix
setproctitle==1.1.6
# Logging
Logbook==0.3
-2
View File
@@ -16,5 +16,3 @@ patsy==0.1.0
statsmodels>=0.5.0
scikit-learn==0.11
# ZeroMQ
pyzmq==2.1.11
-1
View File
@@ -110,7 +110,6 @@ options(
# Because I'm lazy
stuff_i_want_in_my_debug_shell = [
('qutil', 'zipline.util', []),
('zmq', 'zmq', []),
]
@task
+38 -74
View File
@@ -1,4 +1,3 @@
import zmq
from unittest2 import TestCase
from collections import defaultdict
@@ -9,7 +8,7 @@ from zipline.finance.trading import SIMULATION_STYLE
from zipline.core.devsimulator import AddressAllocator
from zipline.lines import SimulatedTrading
from zipline.gens.transform import StatefulTransform
from zipline.gens.tradesimulation import MAX_HEARTBEAT_INTERVALS
from zipline.utils.timeout import TimeoutException
from zipline.utils.test_utils import (
drain_zipline,
@@ -22,7 +21,6 @@ from zipline.utils.test_utils import (
DEFAULT_TIMEOUT = 15 # seconds
EXTENDED_TIMEOUT = 90
allocator = AddressAllocator(1000)
class ExceptionTestCase(TestCase):
@@ -31,14 +29,11 @@ class ExceptionTestCase(TestCase):
def setUp(self):
self.zipline_test_config = {
'sid' : 133,
'results_socket_uri' : allocator.lease(1)[0],
'simulation_style' : SIMULATION_STYLE.FIXED_SLIPPAGE
}
self.ctx = zmq.Context()
setup_logger(self)
def tearDown(self):
self.ctx.term()
teardown_logger(self)
def test_datasource_exception(self):
@@ -46,15 +41,15 @@ class ExceptionTestCase(TestCase):
zipline = SimulatedTrading.create_test_zipline(
**self.zipline_test_config
)
output, _ = drain_zipline(self, zipline)
assert len(output) == 1
assert output[0]['prefix'] == 'EXCEPTION'
message = output[0]['payload']
for field in ['date', 'message', 'name', 'stack']:
assert field in message.keys()
assert message['message'] == 'integer division or modulo by zero'
assert message['name'] == 'ZeroDivisionError'
with self.assertRaises(ZeroDivisionError) as ctx:
output, _ = drain_zipline(self, zipline)
self.assertEqual(
ctx.exception.message,
'integer division or modulo by zero'
)
def test_tranform_exception(self):
exc_tnfm = StatefulTransform(ExceptionTransform)
@@ -63,16 +58,11 @@ class ExceptionTestCase(TestCase):
zipline = SimulatedTrading.create_test_zipline(
**self.zipline_test_config
)
output, _ = drain_zipline(self, zipline)
assert len(output) == 1
assert output[0]['prefix'] == 'EXCEPTION'
message = output[0]['payload']
for field in ['date', 'message', 'name', 'stack']:
assert field in message.keys()
assert message['message'] == 'An assertion message'
assert message['name'] == 'AssertionError'
with self.assertRaises(AssertionError) as ctx:
output, _ = drain_zipline(self, zipline)
self.assertEqual(ctx.exception.message,'An assertion message')
def test_exception_in_init(self):
# Simulation
@@ -86,16 +76,11 @@ class ExceptionTestCase(TestCase):
zipline = SimulatedTrading.create_test_zipline(
**self.zipline_test_config
)
output, _ = drain_zipline(self, zipline)
self.assertEqual(output[-1]['prefix'], 'EXCEPTION')
payload = output[-1]['payload']
self.assertTrue(payload['date'])
self.assertEqual(payload['message'],'Algo exception in initialize')
self.assertEqual(payload['name'],'Exception')
# make sure our path shortening is working
self.assertEqual(payload['stack'][0]['filename'], '/zipline/lines.py')
self.assertEqual(payload['stack'][-1]['filename'], '/zipline/test_algorithms.py')
with self.assertRaises(Exception) as ctx:
output, _ = drain_zipline(self, zipline)
self.assertEqual(ctx.exception.message,'Algo exception in initialize')
def test_exception_in_handle_data(self):
# Simulation
@@ -110,16 +95,10 @@ class ExceptionTestCase(TestCase):
**self.zipline_test_config
)
output, _ = drain_zipline(self, zipline)
self.assertEqual(output[-1]['prefix'], 'EXCEPTION')
payload = output[-1]['payload']
self.assertTrue(payload['date'])
del payload['date']
self.assertEqual(payload['message'],'Algo exception in handle_data')
self.assertEqual(payload['name'],'Exception')
# make sure our path shortening is working
self.assertEqual(payload['stack'][0]['filename'], '/zipline/lines.py')
self.assertEqual(payload['stack'][-1]['filename'], '/zipline/test_algorithms.py')
with self.assertRaises(Exception) as ctx:
output, _ = drain_zipline(self, zipline)
self.assertEqual(ctx.exception.message,'Algo exception in handle_data')
def test_zerodivision_exception_in_handle_data(self):
@@ -134,36 +113,30 @@ class ExceptionTestCase(TestCase):
**self.zipline_test_config
)
output, _ = drain_zipline(self, zipline)
with self.assertRaises(ZeroDivisionError) as ctx:
output, _ = drain_zipline(self, zipline)
self.assertEqual(ctx.exception.message,'integer division or modulo by zero')
self.assertEqual(output[-1]['prefix'], 'EXCEPTION')
payload = output[-1]['payload']
self.assertTrue(payload['date'])
del payload['date']
self.assertEqual(payload['message'],'integer division or modulo by zero')
self.assertEqual(payload['name'],'ZeroDivisionError')
# make sure our path shortening is working
self.assertEqual(payload['stack'][0]['filename'], '/zipline/lines.py')
self.assertEqual(payload['stack'][-1]['filename'], '/zipline/test_algorithms.py')
def test_initialize_timeout(self):
self.zipline_test_config['algorithm'] = \
InitializeTimeoutAlgorithm(
self.zipline_test_config['sid']
)
zipline = SimulatedTrading.create_test_zipline(
**self.zipline_test_config
)
output, _ = drain_zipline(self, zipline)
self.assertEqual(output[-1]['prefix'], 'EXCEPTION')
payload = output[-1]['payload']
self.assertEqual(payload['name'],'TimeoutException')
self.assertEqual(payload['message'], 'Call to initialize timed out')
with self.assertRaises(TimeoutException) as ctx:
output, _ = drain_zipline(self, zipline)
self.assertEqual(ctx.exception.message,'Call to initialize timed out')
def test_heartbeat(self):
self.zipline_test_config['algorithm'] = \
TooMuchProcessingAlgorithm(
self.zipline_test_config['sid']
@@ -171,20 +144,11 @@ class ExceptionTestCase(TestCase):
zipline = SimulatedTrading.create_test_zipline(
**self.zipline_test_config
)
output, _ = drain_zipline(self, zipline)
# There should be a message for each hearbeat, plus a message
# for the final timeout.
assert len(output) == MAX_HEARTBEAT_INTERVALS + 1
# Assert that everything but the last message is a heartbeat log.
for message in output[0:-1]:
assert message['prefix'] == 'LOG'
assert message['payload']['func_name'] == 'log_heartbeats'
with self.assertRaises(TimeoutException) as ctx:
output, _ = drain_zipline(self, zipline)
# Assert that the last message is a timeout exception.
self.assertEqual(output[-1]['prefix'], 'EXCEPTION')
payload = output[-1]['payload']
self.assertEqual(payload['name'],'TimeoutException')
self.assertEqual(payload['message'], 'Too much time spent in handle_data call')
self.assertEqual(
ctx.exception.message,
'Too much time spent in handle_data call'
)
-5
View File
@@ -2,7 +2,6 @@
Tests for the zipline.finance package
"""
import pytz
import zmq
from unittest2 import TestCase
from datetime import datetime, timedelta
@@ -13,7 +12,6 @@ from nose.tools import timed
import zipline.utils.factory as factory
from zipline.finance.trading import TradingEnvironment
from zipline.core.devsimulator import AddressAllocator
from zipline.lines import SimulatedTrading
from zipline.finance.performance import PerformanceTracker
from zipline.utils.protocol_utils import ndict
@@ -27,7 +25,6 @@ from zipline.utils.test_utils import \
DEFAULT_TIMEOUT = 15 # seconds
EXTENDED_TIMEOUT = 90
allocator = AddressAllocator(1000)
class FinanceTestCase(TestCase):
@@ -36,9 +33,7 @@ class FinanceTestCase(TestCase):
def setUp(self):
self.zipline_test_config = {
'sid' : 133,
'results_socket_uri' : allocator.lease(1)[0]
}
self.ctx = zmq.Context()
setup_logger(self)
-3
View File
@@ -12,8 +12,6 @@ from zipline.core.devsimulator import AddressAllocator
DEFAULT_TIMEOUT = 15 # seconds
EXTENDED_TIMEOUT = 90
allocator = AddressAllocator(1000)
from zipline.utils.test_utils import setup_logger, teardown_logger
class TestUpDown(TestCase):
@@ -26,7 +24,6 @@ class TestUpDown(TestCase):
def setUp(self):
self.zipline_test_config = {
'allocator' : allocator,
'sid' : 133,
'trade_count' : 5,
'amplitude' : 30,
-73
View File
@@ -1,73 +0,0 @@
"""
Test the FRAME/UNFRAME functions in the sequence expected from ziplines.
"""
from unittest2 import TestCase
from datetime import datetime, timedelta
from collections import defaultdict
from nose.tools import timed
import zipline.utils.factory as factory
import zipline.protocol as zp
DEFAULT_TIMEOUT = 5 # seconds
class ProtocolTestCase(TestCase):
leased_sockets = defaultdict(list)
def setUp(self):
#qutil.configure_logging()
self.trading_environment = factory.create_trading_environment()
@timed(DEFAULT_TIMEOUT)
def test_trade_feed_protocol(self):
sid = 133
price = [10.0] * 4
volume = [100] * 4
start_date = datetime.strptime("02/15/2012","%m/%d/%Y")
one_day_td = timedelta(days=1)
trades = factory.create_trade_history(
sid,
price,
volume,
one_day_td,
self.trading_environment
)
for trade in trades:
#simulate data source sending frame
msg = zp.DATASOURCE_FRAME(zp.ndict(trade))
#feed unpacking frame
recovered_trade = zp.DATASOURCE_UNFRAME(msg)
#feed sending frame
feed_msg = zp.FEED_FRAME(recovered_trade)
#transform unframing
recovered_feed = zp.FEED_UNFRAME(feed_msg)
#do a transform
trans_msg = zp.TRANSFORM_FRAME('helloworld', 2345.6)
#simulate passthrough transform -- passthrough shouldn't even
# unpack the msg, just resend.
passthrough_msg = zp.TRANSFORM_FRAME(zp.TRANSFORM_TYPE.PASSTHROUGH,\
feed_msg)
#merge unframes transform and passthrough
trans_recovered = zp.TRANSFORM_UNFRAME(trans_msg)
pt_recovered = zp.TRANSFORM_UNFRAME(passthrough_msg)
#simulated merge
pt_recovered.PASSTHROUGH.merge(trans_recovered)
#frame the merged event
merged_msg = zp.MERGE_FRAME(pt_recovered.PASSTHROUGH)
#unframe the merge and validate values
event = zp.MERGE_UNFRAME(merged_msg)
#check the transformed value, should only be in event, not trade.
self.assertTrue(event.helloworld == 2345.6)
event.delete('helloworld')
self.assertEqual(zp.ndict(trade), event)
-2
View File
@@ -5,10 +5,8 @@ Zipline
# This is *not* a place to dump arbitrary classes/modules for convenience,
# it is a place to expose the public interfaces.
import protocol # namespace
from utils.protocol_utils import ndict
__all__ = [
protocol,
ndict
]
+26 -193
View File
@@ -59,16 +59,9 @@ before invoking simulate.
| __init__. |
+---------------------------------+
"""
import sys
import zmq
import os
from signal import SIGHUP, SIGINT
import multiprocessing
from setproctitle import setproctitle
from zipline.test_algorithms import TestAlgorithm
from zipline.finance.trading import SIMULATION_STYLE
from zipline.utils.log_utils import ZeroMQLogHandler
from zipline.utils import factory
from zipline.gens.composites import (
@@ -78,8 +71,6 @@ from zipline.gens.composites import (
from zipline.gens.tradesimulation import TradeSimulationClient as tsc
from logbook import Logger
import zipline.protocol as zp
log = Logger('Lines')
@@ -90,10 +81,27 @@ class SimulatedTrading(object):
transforms,
algorithm,
environment,
style,
results_socket_uri,
context,
sim_id):
style):
"""
@sources - an iterable of iterables
These iterables must yield ndicts that contain:
- type :: a ziplines.protocol.DATASOURCE_TYPE
- dt :: a milliseconds since epoch timestamp in UTC
@transforms - An iterable of instances of StatefulTransform.
@algorithm - An object that implements:
`def initialize(self)`
`def handle_data(self, data)`
`def get_sid_filter(self)`
`def set_logger(self, logger)`
`def set_order(self, order_callable)`
@environment - An instance of finance.trading.TradingEnvironment
@style - protocol.SIMULATION_STYLE
"""
self.date_sorted = date_sorted_sources(*sources)
self.transforms = transforms
@@ -102,138 +110,12 @@ class SimulatedTrading(object):
*self.transforms)
self.trading_client = tsc(algorithm, environment, style)
self.gen = self.trading_client.simulate(self.with_tnfms)
self.results_uri = results_socket_uri
self.results_socket = None
self.context = context
self.sim_id = sim_id
# optional process if we fork simulate into an
# independent process.
self.proc = None
self.send_sighup = False
self.logger = Logger(sim_id)
self.print_logger = Logger('Print')
def __iter__(self):
return self
# exit status flag
self.success = False
def simulate(self, blocking=True, send_sighup=False):
# for non-blocking,
if blocking:
self.run_gen()
else:
self.send_sighup = send_sighup
return self.fork_and_sim()
def fork_and_sim(self):
self.proc = multiprocessing.Process(target=self.run_gen)
self.proc.start()
return self.proc
def run_gen(self):
setproctitle(self.sim_id)
self.open()
if self.zmq_out:
with self.zmq_out.threadbound():
self.stream_results()
# if no log socket, just run the algo normally
else:
self.stream_results()
def stream_results(self):
assert self.results_socket, \
"Results socket must exist to stream results"
try:
for event in self.gen:
if 'daily_perf' in event:
msg = zp.PERF_FRAME(event)
else:
msg = zp.RISK_FRAME(event)
self.results_socket.send(msg)
self.signal_done()
self.success = True
except Exception as exc:
self.handle_exception(exc)
finally:
# not much to do besides log our exit.
self.close()
def signal_done(self):
# notify monitor we're done
done_frame = zp.DONE_FRAME('success')
self.results_socket.send(done_frame)
def close(self):
log.info("Closing Simulation: {id}".format(id=self.sim_id))
if self.results_socket:
self.results_socket.close()
if self.proc and self.send_sighup:
ppid = os.getppid()
if self.success:
log.warning("Sending SIGHUP")
os.kill(ppid, SIGHUP)
else:
log.warning("Sending SIGINT")
os.kill(ppid, SIGINT)
def handle_exception(self, exc):
self.signal_exception(exc)
def signal_exception(self, exc=None):
"""
All exceptions inside any component should boil back to
this handler.
Will inform the system that the component has failed and how it
has failed.
"""
exc_type, exc_value, exc_traceback = sys.exc_info()
try:
log.exception('{id} sending exception to result stream.'\
.format(id=self.sim_id))
msg = zp.EXCEPTION_FRAME(
exc_traceback,
exc_type.__name__,
exc_value.message
)
self.results_socket.send(msg)
except:
log.exception("Exception while reporting simulation exception.")
def open(self):
if not self.context:
self.context = zmq.Context()
if self.results_uri:
sock = self.context.socket(zmq.PUSH)
sock.connect(self.results_uri)
self.results_socket = sock
self.setup_logging()
def setup_logging(self):
assert self.results_socket
# The filter behavior is: matches are logged, mismatches
# are bubbled. If bubble is True, matches are also
# bubbled. Since we do not want user logs in our system
# logs, we set bubble to False.
self.zmq_out = ZeroMQLogHandler(
socket=self.results_socket,
filter=lambda r, h: r.channel in ['Print', 'AlgoLog'],
bubble=False
)
def join(self):
if self.proc:
self.proc.join()
def get_pids(self):
if self.proc:
return [self.proc.pid]
else:
return []
def next(self):
return self.gen.next()
@staticmethod
def create_test_zipline(**config):
@@ -297,10 +179,6 @@ class SimulatedTrading(object):
if not simulation_style:
simulation_style = SIMULATION_STYLE.FIXED_SLIPPAGE
zmq_context = config.get('zmq_context', None)
simulation_id = config.get('simulation_id', 'test_simulation')
results_socket_uri = config.get('results_socket_uri', None)
#-------------------
# Trade Source
#-------------------
@@ -341,52 +219,7 @@ class SimulatedTrading(object):
test_algo,
trading_environment,
simulation_style,
results_socket_uri,
zmq_context,
simulation_id)
)
#-------------------
return sim
class SimulatedTradingLite(object):
"""
SimulatedTrading without multiprocess and without zmq.
Useful for profiling the core logic and for rapid testing
of new features.
"""
def __init__(self,
sources,
transforms,
algorithm,
environment,
style):
"""
@sources - an iterable of iterables
These iterables must yield ndicts that contain:
- type :: a ziplines.protocol.DATASOURCE_TYPE
- dt :: a milliseconds since epoch timestamp in UTC
@transforms - An iterable of instances of StatefulTransform.
@algorithm - An object that implements:
`def initialize(self)`
`def handle_data(self, data)`
`def get_sid_filter(self)`
`def set_logger(self, logger)`
`def set_order(self, order_callable)`
@environment - An instance of finance.trading.TradingEnvironment
@style - protocol.SIMULATION_STYLE
"""
self.date_sorted = date_sorted_sources(*sources)
self.transforms = transforms
# Formerly merged_transforms.
self.with_tnfms = sequential_transforms(self.date_sorted,
*self.transforms)
self.trading_client = tsc(algorithm, environment, style)
self.gen = self.trading_client.simulate(self.with_tnfms)
def get_results(self):
return self.gen
+1 -720
View File
@@ -1,668 +1,4 @@
"""
The messaging protocol for Zipline.
Asserts are in place because any protocol error corresponds to a
programmer error so we want it to fail fast and in an obvious way
so it doesn't happen again. ZeroMQ follows the same philosophy.
Notes
=====
Msgpack
-------
Msgpack is the fastest serialization protocol in Python at the
moment. Its 100% C is typically orders of magnitude faster than
json and pickle making it awesome for ZeroMQ.
You can only serialize Python structural primitives: strings,
numeric types, dicts, tuples and lists. Any any recursive
combinations of these.
Basically every basestring in Python corresponds to valid
msgpack message since the protocol is highly error tolerant.
Just keep in mind that if you ever unpack a raw msgpack string
make sure it looks like what you intend and/or catch ValueError
and TypeError exceptions.
It also has the nice benefit of never invoking ``eval`` ( unlike
json and pickle) which is a major security boon since it is
impossible to arbitrary code for evaluation through messages.
UltraJSON
---------
For anything going to the browser UltraJSON is the fastest
serializer, its mostly C as well.
The same domain of serialization as msgpack applies: Python
structural primitives. It also has the additional constraint
that anything outside of UTF8 can cause serious problems, so if
you have a strong desire to JSON encode ancient Sanskrit
( admit it, we all do ), just say no.
Data Structures
===============
Enum
----
Classic C style enumeration::
opts = Enum('FOO', 'BAR')
opts.FOO # 0
opts.BAR # 1
opts.FOO = opts.BAR # False
Oh, and if you do this::
protocol.Enum([1,2,3])
Your interpreter will segfault, think of this like an extreme assert.
Namedict
--------
Namedicts are dict like objects that have fields accessible by attribute lookup
as well as being indexable and iterable::
HEARTBEAT_PROTOCOL = ndict({
'REQ' : b'\x01',
'REP' : b'\x02',
})
HEARTBEAT_PROTOCOL.REQ # syntactic sugar
HEARTBEAT_PROTOCOL.REP # oh suga suga
HEARTBEAT_PROTOCOL['REQ'] # classic dictionary index
Namedtuple
----------
From the standard library, namedtuples are great for specifying
containers for spec'ing data container objects::
from collections import namedtuple
Person = namedtuple('Person', 'name age gender')
bob = Person(name='Bob', age=30, gender='male')
bob.name # 'Bob'
bob.age # 30
bob.gender # male
# The slots on the tuple are also finite and read-only. This
# is a good thing, keeps us honest!
bob.hobby = 'underwater archery'
# Will raise:
# AttributeError: 'Person' object has no attribute 'hobby'
bob.name = 'joe'
# Will raise:
# AttributeError: can't set attribute
# Namedtuples are normally read-only, but you can change the
# internals using a private operation.
bob._replace(gender='female')
# You can also dump out to dictionary form:
OrderedDict([('name', 'Bob'), ('age', 30), ('gender', 'male')])
# Or JSON.
json.dumps(bob._asdict())
'{"gender":"male","age":30,"name":"Bob"}'
"""
import msgpack
import numbers
import datetime
import pytz
import traceback
import re
import os
from collections import namedtuple
from utils.protocol_utils import Enum, FrameExceptionFactory, ndict, namelookup
from utils.date_utils import EPOCH, UN_EPOCH, epoch_now
# -----------------------
# Control Protocol
# -----------------------
PRODUCTION_PREFIXES = ['PERF', 'RISK', 'EXCEPTION','CANCEL','DONE', 'LOG']
PRICE_FIELDS = ['price', 'open', 'close', 'high', 'low']
INVALID_CONTROL_FRAME = FrameExceptionFactory('CONTROL')
CONTROL_STATES = Enum(
'INIT',
'SOURCES_READY',
'RUNNING',
'TERMINATE',
)
CONTROL_PROTOCOL = Enum(
'HEARTBEAT' , # 0 - req
'SHUTDOWN' , # 1 - req
'KILL' , # 2 - req
'GO' , # - req
'OK' , # 3 - rep
'DONE' , # 4 - rep
'EXCEPTION' , # 5 - rep
'READY' , # 6 - rep
)
def CONTROL_FRAME(event, payload):
assert isinstance(event, int,)
assert isinstance(payload, basestring)
return msgpack.dumps(tuple([event, payload]))
def CONTROL_UNFRAME(msg):
"""
A status code and a message.
"""
assert isinstance(msg, basestring)
try:
event, payload = msgpack.loads(msg)
assert isinstance(event, int)
assert isinstance(payload, basestring)
return event, payload
except TypeError:
raise INVALID_CONTROL_FRAME(msg)
except ValueError:
raise INVALID_CONTROL_FRAME(msg)
# -----------------------
# Component State
# -----------------------
COMPONENT_TYPE = Enum(
'SOURCE' , # 0
'CONDUIT' , # 1
'SINK' , # 2
)
COMPONENT_STATE = Enum(
'OK' , # 0
'DONE' , # 1
'EXCEPTION' , # 2
)
# NOFAILURE - Component is either not running or has not failed
# ALGOEXCEPT - Exception thrown in the given algorithm
# HOSTEXCEPT - Exception thrown on our end.
# INTERRUPT - Manually interuptted by user
COMPONENT_FAILURE = Enum(
'NOFAILURE' ,
'ALGOEXCEPT' ,
'HOSTEXCEPT' ,
'INTERRUPT' ,
)
BACKTEST_STATE = Enum(
'IDLE' ,
'QUEUED' ,
'INPROGRESS' ,
'CANCELLED' , # cancelled ( before natural completion )
'EXCEPTION' , # failure ( due to unnatural causes )
'DONE' , # done ( naturally completed )
)
# -----------------------
# Datasource Protocol
# -----------------------
INVALID_DATASOURCE_FRAME = FrameExceptionFactory('DATASOURCE')
def DATASOURCE_FRAME(event):
"""
Wraps any datasource payload with id and type, so that unpacking may choose
the write UNFRAME for the payload.
:param event: ndict with following properties
- *ds_id* an identifier that is unique to the datasource in the context of a component host (e.g. Simulator)
- *ds_type* a string denoting the datasource type. Must be on of:
- TRADE
- DONE
- (others to follow soon)
- *payload* a msgpack string carrying the payload for the frame
"""
assert isinstance(event.source_id, basestring)
assert isinstance(event.type, int), 'Unexpected type %s' % (event.type)
#datasources will send sometimes send empty msgs to feel gaps
if (event.type == DATASOURCE_TYPE.EMPTY):
return msgpack.dumps(tuple([
event.type,
event.source_id,
"EMPTY"
]))
elif(event.type == DATASOURCE_TYPE.TRADE):
return msgpack.dumps(tuple([
event.type,
event.source_id,
TRADE_FRAME(event)
]))
elif(event.type == DATASOURCE_TYPE.DONE):
return msgpack.dumps(tuple([
event.type,
event.source_id,
"DONE"
]))
else:
raise INVALID_DATASOURCE_FRAME(str(event))
def DATASOURCE_UNFRAME(msg):
"""
Extracts payload, and calls correct UNFRAME method based on the
datasource type passed along.
Returns a dict containing at least:
- source_id: instance-unique string
- type: datasource type
- dt: None, 'DONE' or a datetime object
other properties are added based on the datasource type:
- TRADE
- sid - int security identifier
- price - float
- volume - int
- dt - a datetime object
"""
try:
ds_type, source_id, payload = msgpack.loads(msg)
assert isinstance(ds_type, int)
rval = ndict({'source_id':source_id})
if payload == DATASOURCE_TYPE.EMPTY:
child_value = ndict({'dt':None})
elif(ds_type == DATASOURCE_TYPE.TRADE):
child_value = TRADE_UNFRAME(payload)
elif(ds_type == DATASOURCE_TYPE.DONE):
child_value = ndict({'dt' : 'DONE'})
else:
raise INVALID_DATASOURCE_FRAME(msg)
rval.merge(child_value)
return rval
except TypeError:
raise INVALID_DATASOURCE_FRAME(msg)
except ValueError:
raise INVALID_DATASOURCE_FRAME(msg)
# -----------------------
# Feed Protocol
# -----------------------
INVALID_FEED_FRAME = FrameExceptionFactory('FEED')
def FEED_FRAME(event):
"""
:param event: a ndict with at least
- source_id
- type
- dt
"""
assert isinstance(event, ndict), 'unknown type %s' % str(event)
source_id = event.source_id
ds_type = event.type
PACK_DATE(event)
payload = event.as_dict()
return msgpack.dumps(payload)
def FEED_UNFRAME(msg):
try:
payload = msgpack.loads(msg)
#TODO: anything we can do to assert more about the content of the dict?
assert isinstance(payload, dict)
rval = ndict(payload)
assert rval.source_id
assert rval.type in DATASOURCE_TYPE
assert rval.dt
UNPACK_DATE(rval)
return rval
except TypeError:
raise INVALID_FEED_FRAME(msg)
except ValueError:
raise INVALID_FEED_FRAME(msg)
# -----------------------
# Transform Protocol
# -----------------------
INVALID_TRANSFORM_FRAME = FrameExceptionFactory('TRANSFORM')
def TRANSFORM_FRAME(name, value):
assert isinstance(name, basestring)
if value == None:
return msgpack.dumps(tuple([name, TRANSFORM_TYPE.EMPTY]))
return msgpack.dumps(tuple([name, value]))
def TRANSFORM_UNFRAME(msg):
"""
:rtype: ndict with <transform_name>:<transform_value>
"""
try:
name, value = msgpack.loads(msg)
if(value == TRANSFORM_TYPE.EMPTY):
return ndict({name : None})
#TODO: anything we can do to assert more about the content of the dict?
assert isinstance(name, basestring)
if(name == TRANSFORM_TYPE.PASSTHROUGH):
value = FEED_UNFRAME(value)
return ndict({name : value})
except TypeError:
raise INVALID_TRANSFORM_FRAME(msg)
except ValueError:
raise INVALID_TRANSFORM_FRAME(msg)
# -----------------------
# Merge Protocol
# -----------------------
INVALID_MERGE_FRAME = FrameExceptionFactory('MERGE')
def MERGE_FRAME(event):
"""
:param event: a nameddict with at least:
- source_id
- type
"""
assert isinstance(event, ndict)
PACK_DATE(event)
payload = event.as_dict()
return msgpack.dumps(payload)
def MERGE_UNFRAME(msg):
try:
payload = msgpack.loads(msg)
#TODO: anything we can do to assert more about the content of the dict?
assert isinstance(payload, dict)
payload = ndict(payload)
UNPACK_DATE(payload)
return payload
except TypeError:
raise INVALID_MERGE_FRAME(msg)
except ValueError:
raise INVALID_MERGE_FRAME(msg)
# -----------------------
# Trades
# -----------------------
#
# - Should only be called from inside DATASOURCE_ (UN)FRAME.
def TRADE_FRAME(event):
"""
:param event: should be a ndict with:
- ds_id -- the datasource id sending this trade out
- sid -- the security id
- price -- float of the price printed for the trade
- volume -- int for shares in the trade
- dt -- datetime for the trade
"""
assert isinstance(event, ndict)
assert event.type == DATASOURCE_TYPE.TRADE
assert isinstance(event.sid, int)
for field in PRICE_FIELDS:
assert isinstance(event[field], numbers.Real)
assert isinstance(event.volume, numbers.Integral)
PACK_DATE(event)
return msgpack.dumps(tuple([
event.sid,
event.price,
event.open,
event.close,
event.high,
event.low,
event.volume,
event.dt,
event.type
]))
def TRADE_UNFRAME(msg):
try:
packed = msgpack.loads(msg)
sid, price, open, close, high, low, volume, dt, source_type = packed
assert isinstance(sid, int)
assert isinstance(price, numbers.Real)
assert isinstance(volume, numbers.Integral)
rval = ndict({
'sid' : sid,
'price' : price,
'open' : open,
'close' : close,
'high' : high,
'low' : low,
'volume' : volume,
'dt' : dt,
'type' : source_type
})
UNPACK_DATE(rval)
return rval
except TypeError:
raise INVALID_TRADE_FRAME(msg)
except ValueError:
raise INVALID_TRADE_FRAME(msg)
# -----------------------
# Performance and Risk
# -----------------------
def PERF_FRAME(perf):
"""
Frame the performance update created at the end of each simulated trading
day. The msgpack is a tuple with the first element statically set to 'PERF'.
Like RISK_FRAME, this method calls BT_UPDATE_FRAME internally, so that
clients can call BT_UPDATE_UNFRAME for all messages from the backtest.
:param perf: the dictionary created by zipline.trade_client.perf
:rvalue: a msgpack string
"""
#TODO: add asserts...
assert isinstance(perf['started_at'], datetime.datetime)
assert isinstance(perf['period_start'], datetime.datetime)
assert isinstance(perf['period_end'], datetime.datetime)
assert isinstance(perf['daily_perf'], dict)
assert isinstance(perf['cumulative_perf'], dict)
tp = perf['daily_perf']
cp = perf['cumulative_perf']
assert isinstance(tp['transactions'], list)
# we never want to send transactions for the cumulative period.
# performance.py should never send them, but just to be safe:
assert not cp.has_key('transactions')
assert isinstance(tp['positions'], list)
assert isinstance(cp['positions'], list)
assert isinstance(tp['period_close'], datetime.datetime)
assert isinstance(tp['period_open'], datetime.datetime)
assert isinstance(cp['period_close'], datetime.datetime)
assert isinstance(cp['period_open'], datetime.datetime)
perf['started_at'] = EPOCH(perf['started_at'])
perf['period_start'] = EPOCH(perf['period_start'])
perf['period_end'] = EPOCH(perf['period_end'])
tp['period_close'] = EPOCH(tp['period_close'])
tp['period_open'] = EPOCH(tp['period_open'])
cp['period_close'] = EPOCH(cp['period_close'])
cp['period_open'] = EPOCH(cp['period_open'])
tp['transactions'] = convert_transactions(tp['transactions'])
return BT_UPDATE_FRAME('PERF', perf)
def convert_transactions(transactions):
results = []
for txn in transactions:
txn['date'] = EPOCH(txn['dt'])
del(txn['dt'])
results.append(txn)
return results
def RISK_FRAME(risk):
return BT_UPDATE_FRAME('RISK', risk)
def EXCEPTION_FRAME(exception_tb, name, message):
stack_list = traceback.extract_tb(exception_tb)
rlist = []
for stack in stack_list:
filename = shorten_filename(stack[0])
# default the line to empty string rather than None
line = ''
if stack[3]:
line = stack[3]
rstack = {
'filename' : filename,
'lineno' : stack[1],
'method' : stack[2],
'line' : line
}
rlist.append(rstack)
result = {
'date' : epoch_now(),
'stack' : rlist,
'name' : name,
'message' : message
}
return BT_UPDATE_FRAME('EXCEPTION', result)
def shorten_filename(filename):
if filename == None:
return None
# check if the path contains zipeline
# looks for a zipline directory in the middle of the path
# this will work on
# /zipline/workspace/zipline/core/component.py, but fail for
# /home/fawce/projects/zipline/zipline/core/component.py
path_re = r'(.*)(/zipline/.*[.]py)'
match = re.search(path_re, filename)
if match and match.lastindex == 2:
filepath = match.group(2)
return os.path.join('/zipline',filepath)
else:
# return just the filename.
head, tail = os.path.split(filename)
return tail
def CANCEL_FRAME(date):
result = {
'date' : EPOCH(date)
}
return BT_UPDATE_FRAME('CANCEL', result)
def DONE_FRAME(msg):
assert isinstance(msg, basestring), \
"Done message must be a string."
return BT_UPDATE_FRAME('DONE', msg)
def BT_UPDATE_FRAME(prefix, payload):
"""
Frames prepared by RISK_FRAME and PERF_FRAME methods are sent via the same
socket. This method provides a prefix to allow for muxing the messages
onto a single socket.
"""
return msgpack.dumps(tuple([prefix, payload]))
def BT_UPDATE_UNFRAME(msg):
"""
Risk, Perf, and LOG framing methods prefix the payload with
a shorthand for their type. That way, all messages received from the socket
can be PERF_FRAMED(), whether they are risk, perf, or log.
"""
prefix, payload = msgpack.loads(msg, use_list=True)
return dict(prefix=prefix, payload=payload)
# -----------------------
# Date Helpers
# -----------------------
def PACK_DATE(event):
"""
Packs the datetime property of event into msgpack'able longs.
This function should be called purely for its side effects.
The event's 'dt' property is replaced by a tuple of integers
- year, month, day, hour, minute, second, microsecond
PACK_DATE and UNPACK_DATE are inverse operations.
:param event: event must a ndict with a property named 'dt' that is a datetime.
:rtype: None
"""
assert isinstance(event.dt, datetime.datetime)
# utc only please
assert event.dt.tzinfo == pytz.utc
event['dt'] = date_to_tuple(event['dt'])
def date_to_tuple(dt):
year, month, day, hour, minute, second = dt.timetuple()[0:6]
micros = dt.microsecond
return tuple([year, month, day, hour, minute, second, micros])
def UNPACK_DATE(event):
"""
Unpacks the datetime property of event from msgpack'able longs.
This function should be called purely for its side effects.
The event's 'dt' property is converted to a datetime by reading and then
combining a tuple of integers.
UNPACK_DATE and PACK_DATE are inverse operations.
:param tuple event: event must a ndict with:
- a property named 'dt_tuple' that is a tuple of integers \
representing the date and time in UTC.
- dt_tuple must have year, month, day, hour, minute, second, and microsecond
:rtype: None
"""
assert isinstance(event.dt, tuple)
assert len(event.dt) == 7
for item in event.dt:
assert isinstance(item, numbers.Integral)
event.dt = tuple_to_date(event.dt)
def tuple_to_date(date_tuple):
year, month, day, hour, minute, second, micros = date_tuple
dt = datetime.datetime(year, month, day, hour, minute, second)
dt = dt.replace(microsecond = micros, tzinfo = pytz.utc)
return dt
from utils.protocol_utils import Enum, ndict, namelookup
# Datasource type should completely determine the other fields of a
# message with its type.
@@ -698,58 +34,3 @@ SIMULATION_STYLE = Enum(
'FIXED_SLIPPAGE',
'NOOP'
)
#Global variables for the fields we extract out of a standard logbook record.
LOG_FIELDS = set(['func_name', 'lineno', 'time', 'msg',\
'level', 'channel', ])
LOG_EXTRA_FIELDS = set(['algo_dt',])
LOG_DONE = "DONE"
def LOG_FRAME(payload):
"""
Expects a dictionary of the form:
{
'algo_dt' : 1199223000, #Algo simulation date.
'time' : 1199223001, #Realtime date of log creation.
'func_name' : 'foo',
'lineno' : 46,
'msg' : 'Successfully disintegrated llama #3',
'level' : 4, #Logbook enum
'channel' : 'MyLogger'
}
Frame checks that we have all expected fields and exports an
event/payload dict as JSON.
"""
assert isinstance(payload, dict), \
"LOG_FRAME expected a dict"
assert payload.has_key('algo_dt'), \
"LOG_FRAME with no algo_dt"
assert payload.has_key('time'), \
"LOG_FRAME with no time"
assert payload.has_key('channel'),\
"LOG_FRAME with no channel"
assert payload.has_key('level'),\
"LOG_FRAME with no level"
assert payload.has_key('msg'),\
"LOG_FRAME with no message"
# truncation will only work with strings and msgpack will
# preserve primitives.
payload['msg'] = str(payload['msg'])
return BT_UPDATE_FRAME('LOG', payload)
def LOG_UNFRAME(msg):
"""
msg should be a tuple of ('LOG',dict)
"""
record = msgpack.loads(msg)
assert isinstance(record, tuple)
assert len(record) == 2
assert record[0] == 'LOG'
payload = record[1]
return payload
+4 -3
View File
@@ -229,7 +229,8 @@ class InitializeTimeoutAlgorithm():
def initialize(self):
import time
from zipline.gens.tradesimulation import INIT_TIMEOUT
time.sleep(INIT_TIMEOUT + 1)
time.sleep(INIT_TIMEOUT + 1000)
def set_order(self, order_callable):
pass
@@ -245,7 +246,7 @@ class InitializeTimeoutAlgorithm():
def get_sid_filter(self):
return [self.sid]
class TooMuchProcessingAlgorithm():
def __init__(self, sid):
self.sid = sid
@@ -270,7 +271,7 @@ class TooMuchProcessingAlgorithm():
def get_sid_filter(self):
return [self.sid]
class TimeoutAlgorithm():
def __init__(self, sid):
+6 -1
View File
@@ -3,7 +3,6 @@ from collections import namedtuple
import time
import pytz
import iso8601
import calendar
from dateutil import rrule
from datetime import datetime, date, timedelta
from dateutil.relativedelta import *
@@ -141,3 +140,9 @@ def date_to_datetime(t):
dt = datetime.fromordinal(t.toordinal())
dt = dt.replace(tzinfo = pytz.utc)
return dt
def tuple_to_date(date_tuple):
year, month, day, hour, minute, second, micros = date_tuple
dt = datetime(year, month, day, hour, minute, second)
dt = dt.replace(microsecond = micros, tzinfo = pytz.utc)
return dt
+6 -4
View File
@@ -9,8 +9,10 @@ from os.path import join, abspath, dirname
from operator import attrgetter
from datetime import datetime, timedelta
from zipline.utils.date_utils import tuple_to_date
from zipline.utils.protocol_utils import ndict
import zipline.finance.risk as risk
import zipline.protocol as zp
from zipline.gens.tradegens import RandomEquityTrades
from zipline.gens.tradegens import SpecificEquityTrades
@@ -33,7 +35,7 @@ def load_market_data():
bm_list = msgpack.loads(fp_bm.read())
bm_returns = []
for packed_date, returns in bm_list:
event_dt = zp.tuple_to_date(packed_date)
event_dt = tuple_to_date(packed_date)
#event_dt = event_dt.replace(
# hour=0,
# minute=0,
@@ -49,7 +51,7 @@ def load_market_data():
tr_list = msgpack.loads(fp_tr.read())
tr_curves = {}
for packed_date, curve in tr_list:
tr_dt = zp.tuple_to_date(packed_date)
tr_dt = tuple_to_date(packed_date)
#tr_dt = tr_dt.replace(hour=0, minute=0, second=0, tzinfo=pytz.utc)
tr_curves[tr_dt] = curve
@@ -95,7 +97,7 @@ def create_trade_history(sid, prices, amounts, interval, trading_calendar):
return trades
def create_txn(sid, price, amount, datetime, btrid=None):
txn = zp.ndict({
txn = ndict({
'sid' : sid,
'amount' : amount,
'dt' : datetime,
+5 -102
View File
@@ -1,20 +1,10 @@
import logbook
import zmq
import pytz
import datetime
from logbook import NOTSET
from logbook.handlers import Handler, FileHandler
from zipline.protocol import LOG_FRAME, LOG_FIELDS, \
LOG_EXTRA_FIELDS
from contextlib import contextmanager
log = logbook.Logger("LogUtils")
class redirecter(object):
class redirector(object):
def __init__(self, logger, name):
self.logger = logger
self.buffer = bytes()
@@ -33,7 +23,7 @@ class redirecter(object):
self.logger.error(out_form)
self.buffer = bytes()
class log_redirecter(object):
class log_redirector(object):
def __init__(self, logger):
self.logger = logger
@@ -57,8 +47,8 @@ def stdout_pipe(logger, pipe_name):
import sys
orig_fds = sys.stdout, sys.stderr
sys.stderr = redirecter(logger, pipe_name)
sys.stdout = redirecter(logger, pipe_name)
sys.stderr = redirector(logger, pipe_name)
sys.stdout = redirector(logger, pipe_name)
yield
sys.stderr.flush()
@@ -72,95 +62,8 @@ def stdout_only_pipe(logger, pipe_name):
"""
import sys
orig_fd = sys.stdout
sys.stdout = log_redirecter(logger)
sys.stdout = log_redirector(logger)
yield
sys.stdout.flush()
sys.stdout = orig_fd
class ZeroMQLogHandler(Handler):
"""
A handler that takes messages captured from the user algorithm stdout
and transforms them into LOG_FRAMES suitable for database storage.
Setup is similar to logbook.queues.ZeroMQHandler, except we connect
instead of binding and we extract record fields into a dict.
"""
def __init__(self, socket=None, level=NOTSET, filter=None, bubble=False,
context=None, fds = LOG_FIELDS, extra_fds = LOG_EXTRA_FIELDS):
Handler.__init__(self, level, filter, bubble)
try:
import zmq
except ImportError:
raise RuntimeError('The pyzmq library is required for '
'the ZeroMQHandler.')
#: the zero mq context
self.context = context
#: the zero mq socket.
self.socket = socket #self.context.socket(zmq.PUSH)
#self.uri = uri
#if uri is not None:
# self.socket.connect(uri)
self.fds = fds
self.extra_fds = extra_fds
def export_record(self, record):
"""
Extract relevant fields from a log record, fiddling with datetime
fields to make json happy.
"""
from zipline.utils.date_utils import EPOCH
#Needed to extract record info from dictionary.
record.pull_information()
#Logbook stores record times as datetime objects, which
#can't be serialized by JSON, so we need to convert to
#unix epoch representation.
#Do the same if algo_dt is a datetime object.
if record.extra.has_key('algo_dt'):
algo_dt = record.extra['algo_dt']
if isinstance(algo_dt, datetime.datetime):
algo_dt = EPOCH(algo_dt.replace(tzinfo = pytz.utc))
record.extra['algo_dt'] = algo_dt
data = {}
#Extract all the fields we care about from LogRecord's internal
#dictionary.
for field in iter(self.fds):
if record.__dict__.has_key(field):
data[field] = record.__dict__[field]
else:
data[field] = None
for field in iter(self.extra_fds):
if record.extra.has_key(field):
data[field] = record.extra[field]
else:
data[field] = None
if data['time']:
assert isinstance(data['time'], datetime.datetime)
time = data['time'].replace(tzinfo = pytz.utc)
#logbook measures time in utc already, no need to convert.
data['time'] = EPOCH(time)
return data
def emit(self, record):
"""Extract relevant fields and send info as JSON over a zmq socket."""
payload = self.export_record(record)
self.socket.send(LOG_FRAME(payload))
def close(self):
pass
#self.socket.close()
+1 -1
View File
@@ -5,7 +5,7 @@ from collections import MutableMapping
def Enum(*options):
"""
Fast enums are very important when we want really tight zmq
Fast enums are very important when we want really tight
loops. These are probably going to evolve into pure C structs
anyways so might as well get going on that.
"""
+9 -70
View File
@@ -1,7 +1,4 @@
import multiprocessing
import zmq
import time
import zipline.protocol as zp
from datetime import datetime
import blist
from zipline.utils.date_utils import EPOCH
@@ -65,69 +62,24 @@ def check(test, a, b, label=None):
test.assertEqual(a, b, "mismatch on path: " + label)
def drain_zipline(test, zipline, p_blocking=False):
assert test.ctx, "method expects a valid zmq context"
assert test.zipline_test_config, "method expects a valid test config"
assert isinstance(test.zipline_test_config, dict)
assert test.zipline_test_config['results_socket_uri'], \
"need to specify a socket address for logs/perf/risk"
test.receiver = create_receiver(
test.zipline_test_config['results_socket_uri'],
test.ctx
)
# Bind and connect are asynch, so allow time for bind before
# starting the zipline (TSC connects internally).
time.sleep(1)
# start the simulation
zipline.simulate(blocking=p_blocking)
output, transaction_count = drain_receiver(test.receiver)
# some processes will exit after the message stream is
# finished. We block here to avoid collisions with subsequent
# ziplines.
zipline.join()
return output, transaction_count
def create_receiver(socket_addr, ctx):
receiver = ctx.socket(zmq.PULL)
receiver.bind(socket_addr)
return receiver
def drain_receiver(receiver, count=None):
def drain_zipline(test, zipline):
output = []
transaction_count = 0
msg_counter = 0
while True:
msg = receiver.recv()
# start the simulation
for update in zipline:
msg_counter += 1
update = zp.BT_UPDATE_UNFRAME(msg)
output.append(update)
if update['prefix'] == 'PERF':
if 'daily_perf' in update:
transaction_count += \
len(update['payload']['daily_perf']['transactions'])
elif update['prefix'] == 'EXCEPTION':
break
elif update['prefix'] == 'DONE':
break
if count and msg_counter >= count:
break
receiver.close()
del receiver
len(update['daily_perf']['transactions'])
return output, transaction_count
def assert_single_position(test, zipline, blocking=False):
output, transaction_count = drain_zipline(test,
zipline,
p_blocking=blocking)
test.assertEqual(output[-1]['prefix'], 'DONE')
def assert_single_position(test, zipline):
output, transaction_count = drain_zipline(test, zipline)
test.assertEqual(
test.zipline_test_config['order_count'],
@@ -137,8 +89,7 @@ def assert_single_position(test, zipline, blocking=False):
# the final message is the risk report, the second to
# last is the final day's results. Positions is a list of
# dicts.
perfs = [x for x in output if x['prefix'] == 'PERF']
closing_positions = perfs[-2]['payload']['daily_perf']['positions']
closing_positions = output[-2]['daily_perf']['positions']
test.assertEqual(
len(closing_positions),
@@ -154,18 +105,6 @@ def assert_single_position(test, zipline, blocking=False):
)
def launch_component(component):
proc = multiprocessing.Process(target=component.run)
proc.start()
return proc
def launch_monitor(monitor):
proc = multiprocessing.Process(target=monitor.run)
proc.start()
return proc
class ExceptionSource(object):
def __init__(self):