Reverted path independence code in factory.py. Not sure how to fix with new path structure.

This commit is contained in:
Thomas Wiecki
2012-05-15 17:37:27 -04:00
52 changed files with 1225 additions and 1269 deletions
+1
View File
@@ -0,0 +1 @@
# TODO: move qexec console here
+37
View File
@@ -0,0 +1,37 @@
[loggers]
keys=root
[handlers]
keys=consoleHandler,filesystemHandler
[formatters]
keys=ziplineformat
# -------
[logger_root]
level=DEBUG
handlers=consoleHandler,filesystemHandler
qualname=ZiplineLogger
# -------
[handler_filesystemHandler]
class=handlers.RotatingFileHandler
level=DEBUG
formatter=ziplineformat
args=("/var/log/zipline/zipline.log",10*1024*1024,5)
propagate=1
[handler_consoleHandler]
class=StreamHandler
level=ERROR
formatter=ziplineformat
args=(sys.stdout,)
propagate=1
# -------
[formatter_ziplineformat]
format=%(asctime)s %(levelname)s %(filename)s %(funcName)s - %(message)s
datefmt=%Y-%m-%d %H:%M:%S %Z
+9 -8
View File
@@ -1,15 +1,16 @@
import logging
from gevent_zeromq import zmq
import zipline.util as qutil
import zipline.messaging as qmsg
import zipline.protocol as zp
from zipline.core.component import Component
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE
from zipline.finance.trading import TradeSimulationClient
class TestClient(qmsg.Component):
LOGGER = logging.getLogger('ZiplineLogger')
class TestClient(Component):
def __init__(self):
qmsg.Component.__init__(self)
Component.__init__(self)
self.init()
def init(self):
@@ -55,7 +56,7 @@ class TestClient(qmsg.Component):
#logger.info('msg:' + str(msg))
if msg == str(CONTROL_PROTOCOL.DONE):
qutil.LOGGER.info("Client is DONE!")
LOGGER.info("Client is DONE!")
self.signal_done()
return
@@ -79,7 +80,7 @@ class TestClient(qmsg.Component):
self.prev_dt = event.dt
if self.received_count % 100 == 0:
qutil.LOGGER.info("received {n} messages".format(n=self.received_count))
LOGGER.info("received {n} messages".format(n=self.received_count))
def unframe(self, msg):
return zp.MERGE_UNFRAME(msg)
@@ -1,5 +1,6 @@
"""Tests for the zipline.finance package"""
import mock
"""
Tests for the zipline.finance package
"""
import pytz
from unittest2 import TestCase
@@ -8,22 +9,16 @@ from collections import defaultdict
from nose.tools import timed
import zipline.test.factory as factory
import zipline.util as qutil
import zipline.finance.risk as risk
import zipline.utils.factory as factory
import zipline.protocol as zp
import zipline.finance.performance as perf
from zipline.test.algorithms import TestAlgorithm
from zipline.sources import SpecificEquityTrades
from zipline.finance.trading import TransactionSimulator, \
TradeSimulationClient, TradingEnvironment
from zipline.simulator import AddressAllocator, Simulator
from zipline.monitor import Controller
from zipline.test_algorithms import TestAlgorithm
from zipline.finance.trading import TradingEnvironment
from zipline.core.devsimulator import AddressAllocator
from zipline.lines import SimulatedTrading
from zipline.finance.performance import PerformanceTracker
from zipline.protocol_utils import namedict
from zipline.finance.trading import SIMULATION_STYLE
from zipline.utils.protocol_utils import ndict
from zipline.finance.trading import TransactionSimulator, SIMULATION_STYLE
DEFAULT_TIMEOUT = 15 # seconds
EXTENDED_TIMEOUT = 90
@@ -35,7 +30,7 @@ class FinanceTestCase(TestCase):
leased_sockets = defaultdict(list)
def setUp(self):
qutil.configure_logging()
#qutil.configure_logging()
self.zipline_test_config = {
'allocator':allocator,
'sid':133
@@ -148,7 +143,7 @@ class FinanceTestCase(TestCase):
# TODO: for some reason the orders aren't filled without an extra
# trade.
trade_count = 5001
trade_count = 5
self.zipline_test_config['order_count'] = trade_count - 1
self.zipline_test_config['trade_count'] = trade_count
self.zipline_test_config['order_amount'] = 1
@@ -156,7 +151,7 @@ class FinanceTestCase(TestCase):
# tell the simulator to fill the orders in individual transactions
# matching the order volume exactly.
self.zipline_test_config['simulation_style'] = \
SIMULATION_STYLE.FIXED_SLIPPAGE
SIMULATION_STYLE.FIXED_SLIPPAGE
self.zipline_test_config['environment'] = factory.create_trading_environment()
sid_list = [self.zipline_test_config['sid']]
@@ -416,7 +411,7 @@ class FinanceTestCase(TestCase):
alternate = params.get('alternate')
# if present, expect transaction amounts to match orders exactly.
complete_fill = params.get('complete_fill')
trading_environment = factory.create_trading_environment()
trade_sim = TransactionSimulator()
price = [10.1] * trade_count
@@ -424,22 +419,22 @@ class FinanceTestCase(TestCase):
start_date = trading_environment.first_open
sid = 1
generated_trades = factory.create_trade_history(
sid,
price,
volume,
trade_interval,
trading_environment
generated_trades = factory.create_trade_history(
sid,
price,
volume,
trade_interval,
trading_environment
)
if alternate:
alternator = -1
else:
alternator = 1
order_date = start_date
for i in xrange(order_count):
order = namedict(
order = ndict(
{
'sid' : sid,
'amount' : order_amount * alternator**i,
@@ -448,7 +443,7 @@ class FinanceTestCase(TestCase):
})
trade_sim.add_open_order(order)
order_date = order_date + order_interval
# move after market orders to just after market next
# market open.
@@ -456,40 +451,40 @@ class FinanceTestCase(TestCase):
if order_date.minute >= 00:
order_date = order_date + timedelta(days=1)
order_date = order_date.replace(hour=14, minute=30)
# there should now be one open order list stored under the sid
oo = trade_sim.open_orders
self.assertEqual(len(oo), 1)
self.assertTrue(oo.has_key(sid))
order_list = oo[sid]
self.assertEqual(order_count, len(order_list))
for i in xrange(order_count):
order = order_list[i]
self.assertEqual(order.sid, sid)
self.assertEqual(order.amount, order_amount * alternator**i)
tracker = PerformanceTracker(trading_environment)
# this approximates the loop inside TradingSimulationClient
transactions = []
for trade in generated_trades:
if trade_delay:
trade.dt = trade.dt + trade_delay
txn = trade_sim.apply_trade_to_open_orders(trade)
if txn:
transactions.append(txn)
trade.TRANSACTION = txn
transactions.append(txn)
trade.TRANSACTION = txn
else:
trade.TRANSACTION = None
tracker.process_event(trade)
tracker.process_event(trade)
if complete_fill:
self.assertEqual(len(transactions), len(order_list))
self.assertEqual(len(transactions), len(order_list))
total_volume = 0
for i in xrange(len(transactions)):
txn = transactions[i]
@@ -497,18 +492,18 @@ class FinanceTestCase(TestCase):
if complete_fill:
order = order_list[i]
self.assertEqual(order.amount, txn.amount)
self.assertEqual(total_volume, expected_txn_volume)
self.assertEqual(total_volume, expected_txn_volume)
self.assertEqual(len(transactions), expected_txn_count)
cumulative_pos = tracker.cumulative_performance.positions[sid]
self.assertEqual(total_volume, cumulative_pos.amount)
# the open orders should now be empty
oo = trade_sim.open_orders
self.assertTrue(oo.has_key(sid))
order_list = oo[sid]
self.assertEqual(0, len(order_list))
@@ -1,4 +1,4 @@
from zipline.protocol_utils import ndict, namedict
from zipline.utils.protocol_utils import ndict
def test_ndict():
nd = ndict({})
@@ -21,11 +21,18 @@ def test_ndict():
assert 'x' in nd
assert 'y' not in nd
# Mutability
nd2 = ndict({'x': 1})
assert nd2.x == 1
nd2.x = 2
assert nd2.x == 2
# Class isolation
assert '__init__' not in nd
assert '__iter__' not in nd
assert not nd.__dict__.has_key('x')
assert nd.get('__init__') is None
assert 'x' not in set(dir(nd))
# Comparison
nd2 = nd.copy()
@@ -4,18 +4,19 @@ import random
import datetime
import pytz
import zipline.test.factory as factory
import zipline.test.algorithms
import zipline.util as qutil
import zipline.utils.factory as factory
import zipline.test_algorithms
#import zipline.util as qutil
import zipline.finance.performance as perf
import zipline.finance.risk as risk
import zipline.protocol as zp
from zipline.finance.trading import TradeSimulationClient, TradingEnvironment, \
SIMULATION_STYLE
SIMULATION_STYLE
class PerformanceTestCase(unittest.TestCase):
def setUp(self):
qutil.configure_logging()
#qutil.configure_logging()
self.benchmark_returns, self.treasury_curves = \
factory.load_market_data()
@@ -546,7 +547,7 @@ shares in position"
#create a transaction for all but
#first trade in each sid, to simulate None transaction
if(event.dt != self.trading_environment.period_start):
txn = zp.namedict({
txn = zp.ndict({
'sid' : event.sid,
'amount' : -25,
'dt' : event.dt,
@@ -565,4 +566,4 @@ shares in position"
cumulative_pos = perf_tracker.cumulative_performance.positions[sid]
expected_size = txn_count / 2 * -25
self.assertEqual(cumulative_pos.amount, expected_size)
@@ -9,11 +9,11 @@ from collections import defaultdict
from nose.tools import timed
import zipline.test.factory as factory
import zipline.util as qutil
import zipline.utils.factory as factory
from zipline.utils import logger
import zipline.protocol as zp
from zipline.sources import SpecificEquityTrades
from zipline.finance.sources import SpecificEquityTrades
DEFAULT_TIMEOUT = 5 # seconds
@@ -22,7 +22,7 @@ class ProtocolTestCase(TestCase):
leased_sockets = defaultdict(list)
def setUp(self):
qutil.configure_logging()
#qutil.configure_logging()
self.trading_environment = factory.create_trading_environment()
@timed(DEFAULT_TIMEOUT)
@@ -45,7 +45,7 @@ class ProtocolTestCase(TestCase):
for trade in trades:
#simulate data source sending frame
msg = zp.DATASOURCE_FRAME(zp.namedict(trade))
msg = zp.DATASOURCE_FRAME(zp.ndict(trade))
#feed unpacking frame
recovered_trade = zp.DATASOURCE_UNFRAME(msg)
#feed sending frame
@@ -74,13 +74,13 @@ class ProtocolTestCase(TestCase):
self.assertTrue(event.helloworld == 2345.6)
event.delete('helloworld')
self.assertEqual(zp.namedict(trade), event)
self.assertEqual(zp.ndict(trade), event)
@timed(DEFAULT_TIMEOUT)
def test_order_protocol(self):
#client places an order
now = datetime.utcnow().replace(tzinfo=pytz.utc)
order = zp.namedict({
order = zp.ndict({
'dt':now,
'sid':133,
'amount':100
@@ -94,7 +94,7 @@ class ProtocolTestCase(TestCase):
self.assertEqual(order.dt, now)
#order datasource datasource frames the order
order_event = zp.namedict({
order_event = zp.ndict({
"sid" : order.sid,
"amount" : order.amount,
"dt" : order.dt,
@@ -111,7 +111,7 @@ class ProtocolTestCase(TestCase):
self.assertEqual(now, recovered_order.dt)
#create a transaction from the order
txn = zp.namedict({
txn = zp.ndict({
'sid' : recovered_order.sid,
'amount' : recovered_order.amount,
'dt' : recovered_order.dt,
@@ -4,15 +4,14 @@ import datetime
import calendar
import pytz
import zipline.finance.risk as risk
import zipline.test.factory as factory
import zipline.util as qutil
from zipline.utils import factory
from zipline.finance.trading import TradingEnvironment
class Risk(unittest.TestCase):
def setUp(self):
qutil.configure_logging()
#qutil.configure_logging()
start_date = datetime.datetime(
year=2006,
month=1,
@@ -354,4 +353,4 @@ RETURNS = [
0.048 , -0.0307, -0.0357, 0.0033, -0.0412, -0.0407, 0.0455,
0.0159, -0.0051, -0.0274, -0.0213, 0.0361, 0.0051, -0.0378,
0.0084, 0.0066, -0.0103, -0.0037, 0.0478, -0.0278
]
]
+19 -2
View File
@@ -1,3 +1,20 @@
"""
QSim provides asynchronous simulation of historic data streams, simulated trade execution, and data stream transformations.
"""
Zipline
"""
# This is *not* a place to dump arbitrary classes/modules for convenience,
# it is a place to expose the public interfaces.
import protocol # namespace
from core.monitor import Controller
from lines import SimulatedTrading
from core.host import ComponentHost
from utils.protocol_utils import ndict
__all__ = [
SimulatedTrading,
Controller,
ComponentHost,
protocol,
ndict
]
+11
View File
@@ -0,0 +1,11 @@
from feed import Feed
from merge import Merge
from passthrough import PassthroughTransform
from datasource import DataSource
__all__ = [
Feed,
Merge,
PassthroughTransform,
DataSource,
]
+66
View File
@@ -0,0 +1,66 @@
"""
Commonly used messaging components.
"""
import logging
import zipline.protocol as zp
from zipline.core.component import Component
from zipline.protocol import COMPONENT_TYPE
LOGGER = logging.getLogger('ZiplineLogger')
class DataSource(Component):
"""
Baseclass for data sources. Subclass and implement send_all - usually this
means looping through all records in a store, converting to a dict, and
calling send(map).
Every datasource has a dict property to hold filters::
- key -- name of the filter, e.g. SID
- value -- a primitive representing the filter. e.g. a list of ints.
Modify the datasource's filters via the set_filter(name, value)
"""
def __init__(self, source_id):
Component.__init__(self)
self.id = source_id
self.init()
self.filter = {}
def init(self):
self.cur_event = None
def set_filter(self, name, value):
self.filter[name] = value
@property
def get_id(self):
return self.id
@property
def get_type(self):
return COMPONENT_TYPE.SOURCE
def open(self):
self.data_socket = self.connect_data()
def send(self, event):
"""
Emit data.
"""
assert isinstance(event, zp.ndict)
event['source_id'] = self.get_id
event['type'] = self.get_type
try:
ds_frame = self.frame(event)
except zp.INVALID_DATASOURCE_FRAME as exc:
return self.signal_exception(exc)
self.data_socket.send(ds_frame)
def frame(self, event):
return zp.DATASOURCE_FRAME(event)
+209
View File
@@ -0,0 +1,209 @@
import logging
from collections import Counter
from zipline.core.component import Component
import zipline.protocol as zp
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \
CONTROL_FRAME, CONTROL_UNFRAME
LOGGER = logging.getLogger('ZiplineLogger')
class Feed(Component):
"""
Connects to N PULL sockets, publishing all messages received to a PUB
socket. Published messages are guaranteed to be in chronological order
based on message property dt. Expects to be instantiated in one execution
context (thread, process, etc) and run in another.
"""
def __init__(self):
Component.__init__(self)
self.sent_count = 0
self.received_count = 0
self.draining = False
self.ds_finished_counter = 0
# Depending on the size of this, might want to use a data
# structure with better asymptotics.
self.data_buffer = {}
# source_id -> integer count
self.sent_counters = Counter()
self.recv_counters = Counter()
def init(self):
pass
@property
def get_id(self):
return "FEED"
@property
def get_type(self):
return COMPONENT_TYPE.CONDUIT
# -------------
# Core Methods
# -------------
def open(self):
self.pull_socket = self.bind_data()
self.feed_socket = self.bind_feed()
def do_work(self):
# wait for synchronization reply from the host
socks = dict(self.poll.poll(self.heartbeat_timeout))
# TODO: Abstract this out, maybe on base component
if self.control_in in socks and socks[self.control_in] == self.zmq.POLLIN:
msg = self.control_in.recv()
event, payload = CONTROL_UNFRAME(msg)
# -- Heartbeat --
if event == CONTROL_PROTOCOL.HEARTBEAT:
# Heart outgoing
heartbeat_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.OK,
payload
)
self.control_out.send(heartbeat_frame)
# -- Soft Kill --
elif event == CONTROL_PROTOCOL.SHUTDOWN:
self.signal_done()
self.shutdown()
# -- Hard Kill --
elif event == CONTROL_PROTOCOL.KILL:
self.kill()
if self.pull_socket in socks and socks[self.pull_socket] == self.zmq.POLLIN:
message = self.pull_socket.recv()
if message == str(CONTROL_PROTOCOL.DONE):
self.ds_finished_counter += 1
if len(self.data_buffer) == self.ds_finished_counter:
#drain any remaining messages in the buffer
LOGGER.debug("draining feed")
self.drain()
self.signal_done()
else:
try:
event = self.unframe(message)
# deserialization error
except zp.INVALID_DATASOURCE_FRAME as exc:
return self.signal_exception(exc)
try:
self.append(event)
self.send_next()
# Invalid message
except zp.INVALID_DATASOURCE_FRAME as exc:
return self.signal_exception(exc)
def unframe(self, msg):
return zp.DATASOURCE_UNFRAME(msg)
def frame(self, event):
return zp.FEED_FRAME(event)
# -------------
# Flow Control
# -------------
def drain(self):
"""
Send all messages in the buffer.
"""
self.draining = True
while self.pending_messages() > 0:
self.send_next()
def send_next(self):
"""
Send the (chronologically) next message in the buffer.
"""
if not (self.is_full() or self.draining):
return
event = self.next()
if(event != None):
self.feed_socket.send(self.frame(event), self.zmq.NOBLOCK)
self.sent_counters[event.source_id] += 1
self.sent_count += 1
def append(self, event):
"""
Add an event to the buffer for the source specified by
source_id.
"""
self.data_buffer[event.source_id].append(event)
self.recv_counters[event.source_id] += 1
self.received_count += 1
def next(self):
"""
Get the next message in chronological order.
"""
if not(self.is_full() or self.draining):
return
cur_source = None
earliest_source = None
earliest_event = None
#iterate over the queues of events from all sources
#(1 queue per datasource)
for events in self.data_buffer.values():
if len(events) == 0:
continue
cur_source = events
first_in_list = events[0]
if first_in_list.dt == None:
#this is a filler event, discard
events.pop(0)
continue
if (earliest_event == None) or (first_in_list.dt <= earliest_event.dt):
earliest_event = first_in_list
earliest_source = cur_source
if earliest_event != None:
return earliest_source.pop(0)
def is_full(self):
"""
Indicates whether the buffer has messages in buffer for
all un-DONE, blocking sources.
"""
for source_id, events in self.data_buffer.iteritems():
if len(events) == 0:
return False
return True
def pending_messages(self):
"""
Returns the count of all events from all sources in the
buffer.
"""
total = 0
for events in self.data_buffer.values():
total += len(events)
return total
def add_source(self, source_id):
"""
Add a data source to the buffer.
"""
self.data_buffer[source_id] = []
def __len__(self):
"""
Buffer's length is same as internal map holding separate
sorted arrays of events keyed by source id.
"""
return len(self.data_buffer)
+68
View File
@@ -0,0 +1,68 @@
from feed import Feed
import zipline.protocol as zp
from zipline.protocol import COMPONENT_TYPE
# TODO: By Liskov merge must *be* a feed, don't believe this is
# the case.
class Merge(Feed):
"""
Merges multiple streams of events into single messages.
"""
def __init__(self):
Feed.__init__(self)
self.init()
def init(self):
pass
@property
def get_id(self):
return "MERGE"
@property
def get_type(self):
return COMPONENT_TYPE.CONDUIT
def open(self):
self.pull_socket = self.bind_merge()
self.feed_socket = self.bind_result()
def next(self):
"""Get the next merged message from the feed buffer."""
if not (self.is_full() or self.draining):
return
if self.pending_messages() == 0:
return
#get the raw event from the passthrough transform.
result = self.data_buffer[zp.TRANSFORM_TYPE.PASSTHROUGH].pop(0).PASSTHROUGH
for source, events in self.data_buffer.iteritems():
if source == zp.TRANSFORM_TYPE.PASSTHROUGH:
continue
if len(events) > 0:
cur = events.pop(0)
result.merge(cur)
return result
def unframe(self, msg):
return zp.TRANSFORM_UNFRAME(msg)
def frame(self, event):
return zp.MERGE_FRAME(event)
def append(self, event):
"""
:param event: a ndict with one entry. key is the name of the
transform, value is the transformed value.
Add an event to the buffer for the source specified by
source_id.
"""
self.data_buffer[event.keys()[0]].append(event)
self.received_count += 1
+35
View File
@@ -0,0 +1,35 @@
import zipline.protocol as zp
from zipline.transforms import BaseTransform
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \
COMPONENT_STATE, CONTROL_FRAME, CONTROL_UNFRAME
class PassthroughTransform(BaseTransform):
"""
A bypass transform which is also an identity transform::
+-------+
+---| f |--->
+-------+
+------id------->
"""
def __init__(self, **kwargs):
BaseTransform.__init__(self, "PASSTHROUGH")
self.init(**kwargs)
def init(self, **kwargs):
pass
@property
def get_type(self):
return COMPONENT_TYPE.CONDUIT
#TODO, could save some cycles by skipping the _UNFRAME call
# and just setting value to original msg string.
def transform(self, event):
return {
'name' : zp.TRANSFORM_TYPE.PASSTHROUGH,
'value' : zp.FEED_FRAME(event)
}
+9
View File
@@ -0,0 +1,9 @@
from host import ComponentHost
from component import Component
from monitor import Controller
__all__ = [
Component,
Controller,
ComponentHost
]
@@ -1,6 +1,4 @@
"""
Commonly used messaging components.
Contains the base class for all components.
"""
@@ -9,7 +7,7 @@ import sys
import uuid
import time
import socket
import gevent
import logging
import traceback
import humanhash
@@ -20,13 +18,11 @@ import gevent_zeromq
# zmq_ctypes
#import zmq_ctypes
from datetime import datetime
import zipline.util as qutil
from zipline.gpoll import _Poller as GeventPoller
from zipline.utils.gpoll import _Poller as GeventPoller
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE, \
COMPONENT_FAILURE, BACKTEST_STATE, CONTROL_FRAME
COMPONENT_FAILURE, CONTROL_FRAME
LOGGER = logging.getLogger('ZiplineLogger')
class Component(object):
"""
@@ -315,7 +311,7 @@ class Component(object):
)
self.control_out.send(exception_frame)
qutil.LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id))
LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id))
def signal_done(self):
"""
@@ -342,7 +338,7 @@ class Component(object):
#notify internal work look that we're done
self.done = True # TODO: use state flag
qutil.LOGGER.info("[%s] DONE" % self.get_id)
LOGGER.info("[%s] DONE" % self.get_id)
# -----------
# Messaging
@@ -462,7 +458,7 @@ class Component(object):
DEPRECATED, left in for compatability for now.
"""
qutil.LOGGER.debug("Connecting sync client for {id}".format(id=self.get_id))
LOGGER.debug("Connecting sync client for {id}".format(id=self.get_id))
self.sync_socket = self.context.socket(self.zmq.REQ)
self.sync_socket.connect(self.addresses['sync_address'])
@@ -3,11 +3,7 @@ Simulator hosts all the components necessary to execute a simluation. See :py:me
"""
import threading
import mock
from collections import defaultdict
from zipline.monitor import Controller
from zipline.messaging import ComponentHost
import zipline.util as qutil
from zipline.core import ComponentHost
class AddressAllocator(object):
@@ -35,7 +31,7 @@ class Simulator(ComponentHost):
ComponentHost.__init__(self, addresses)
self.subthreads = []
self.running = False
@property
def get_id(self):
return 'Simple Simulator'
+164
View File
@@ -0,0 +1,164 @@
import logging
import datetime
from component import Component
from zipline.transforms import BaseTransform
from zipline.components import Feed, Merge, PassthroughTransform, \
DataSource
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE
LOGGER = logging.getLogger('ZiplineLogger')
class ComponentHost(Component):
"""
Components that can launch multiple sub-components, synchronize their
start, and then wait for all components to be finished.
"""
def __init__(self, addresses):
Component.__init__(self)
self.addresses = addresses
self.running = False
self.init()
def init(self):
assert hasattr(self, 'zmq_flavor'), """
You must specify a flavor of ZeroMQ for all
ComponentHost subclasses. """
# Component Registry, keyed by get_id
# ----------------------
self.components = {}
# ----------------------
# Internal Registry, keyed by guid
self._components = {}
# ----------------------
self.sync_register = {}
self.timeout = datetime.timedelta(seconds=60)
self.feed = Feed()
self.merge = Merge()
self.passthrough = PassthroughTransform()
self.controller = None
#register the feed and the merge
self.register_components([self.feed, self.merge, self.passthrough])
def register_controller(self, controller):
"""
Add the given components to the registry. Establish
communication with them.
"""
if self.controller != None:
raise Exception("There can be only one!")
self.controller = controller
self.controller.zmq_flavor = self.zmq_flavor
# Propogate the controller to all the subcomponents
for component in self.components.itervalues():
component.controller = controller
def register_components(self, components):
"""
Add the given components to the registry. Establish
communication with them.
"""
assert isinstance(components, list)
for component in components:
component.addresses = self.addresses
component.controller = self.controller
# Hosts share their zmq flavor with hosted components
component.zmq_flavor = self.zmq_flavor
self._components[component.guid] = component
self.components[component.get_id] = component
self.sync_register[component.get_id] = datetime.datetime.utcnow()
if isinstance(component, DataSource):
self.feed.add_source(component.get_id)
if isinstance(component, BaseTransform):
self.merge.add_source(component.get_id)
def unregister_component(self, component_id):
del self.components[component_id]
del self.sync_register[component_id]
def setup_sync(self):
"""
Setup the sync socket and poller. ( Bind )
"""
LOGGER.debug("Connecting sync server.")
self.sync_socket = self.context.socket(self.zmq.REP)
self.sync_socket.bind(self.addresses['sync_address'])
self.sync_poller = self.zmq_poller()
self.sync_poller.register(self.sync_socket, self.zmq.POLLIN)
self.sockets.append(self.sync_socket)
def open(self):
for component in self.components.values():
self.launch_component(component)
self.launch_controller()
def is_running(self):
"""
DEPRECATED, left in for compatability for now.
"""
cur_time = datetime.datetime.utcnow()
if len(self.components) == 0:
LOGGER.info("Component register is empty.")
return False
return True
def loop(self, lockstep=True):
while self.is_running():
# wait for synchronization request at start, and DONE at end.
# don't timeout.
socks = dict(self.sync_poller.poll())
if self.sync_socket in socks and socks[self.sync_socket] == self.zmq.POLLIN:
msg = self.sync_socket.recv()
try:
parts = msg.split(':')
sync_id, status = parts
except ValueError as exc:
self.signal_exception(exc)
if status == str(CONTROL_PROTOCOL.DONE): # TODO: other way around
LOGGER.debug("{id} is DONE".format(id=sync_id))
self.unregister_component(sync_id)
self.state_flag = COMPONENT_STATE.DONE
else:
self.sync_register[sync_id] = datetime.datetime.utcnow()
#qutil.LOGGER.info("confirmed {id}".format(id=msg))
# send synchronization reply
self.sync_socket.send('ack', self.zmq.NOBLOCK)
# ------------------
# Simulation Control
# ------------------
def launch_controller(self, controller):
raise NotImplementedError
def launch_component(self, component):
raise NotImplementedError
def teardown_component(self, component):
raise NotImplementedError
+12 -11
View File
@@ -1,18 +1,18 @@
import zmq
import time
import gevent
import itertools
# pyzmq
import zmq
import logging
import gevent_zeromq
from collections import OrderedDict
from protocol import CONTROL_PROTOCOL, CONTROL_FRAME, \
from zipline.protocol import CONTROL_PROTOCOL, CONTROL_FRAME, \
CONTROL_UNFRAME, CONTROL_STATES, INVALID_CONTROL_FRAME \
states = CONTROL_STATES
from gpoll import _Poller as GeventPoller
from zipline.utils.gpoll import _Poller as GeventPoller
# Roll Call ( Discovery )
# -----------------------
@@ -159,7 +159,7 @@ class Controller(object):
debug = False
period = 1
def __init__(self, pub_socket, route_socket, logging = None):
def __init__(self, pub_socket, route_socket, logger = None):
self.context = None
self.zmq = None
@@ -182,11 +182,11 @@ class Controller(object):
self.error_replay = OrderedDict()
if logging:
self.logging = logging
if logger:
self.logging = logger
else:
import util as qutil
self.logging = qutil.LOGGER
default_logger = logging.getLogger('ZiplineLogger')
self.logging = default_logger
def init_zmq(self, flavor):
@@ -355,7 +355,7 @@ class Controller(object):
if tic - self.ctime > self.period:
break
if self.router in socks and socks[self.router] == self.zmq.POLLIN:
if socks.get(self.router) == self.zmq.POLLIN:
rawmessage = self.router.recv()
if rawmessage:
@@ -369,9 +369,10 @@ class Controller(object):
self.logging.error('Invalid frame', rawmessage)
pass
if self.cancel in socks and socks[self.cancel] == self.zmq.POLLIN:
if socks.get(self.cancel) == self.zmq.POLLIN:
self.logging.info('[Controller] Received Cancellation')
rawmessage = self.cancel.recv()
self.cancel.send('')
self.shutdown(soft=True)
break
+1 -1
View File
@@ -1,7 +1,7 @@
from datetime import timedelta
from collections import defaultdict
from zipline.messaging import BaseTransform
from zipline.transforms.base import BaseTransform
class MovingAverageTransform(BaseTransform):
+21 -18
View File
@@ -40,8 +40,8 @@ Performance Tracking
| | through all the events delivered to this tracker. |
| | For details look at the comments for |
| | :py:meth:`zipline.finance.risk.RiskMetrics.to_dict`|
+-----------------+----------------------------------------------------+
| exceeded_max_ | True if the simulation was stopped because single |
+-----------------+----------------------------------------------------+
| exceeded_max_ | True if the simulation was stopped because single |
| loss | day losses exceeded the max_drawdown stipulated in |
| | trading_environment. |
+-----------------+----------------------------------------------------+
@@ -110,6 +110,8 @@ Performance Period
"""
import logging
import datetime
import pytz
import msgpack
@@ -118,10 +120,11 @@ import math
import zmq
import zipline.util as qutil
import zipline.protocol as zp
import zipline.finance.risk as risk
LOGGER = logging.getLogger('ZiplineLogger')
class PerformanceTracker():
"""
Tracks the performance of the zipline as it is running in
@@ -188,7 +191,7 @@ class PerformanceTracker():
)
def get_portfolio(self):
return self.cumulative_performance.to_namedict()
return self.cumulative_performance.to_ndict()
def publish_to(self, zmq_socket, context=None):
"""
@@ -228,7 +231,7 @@ class PerformanceTracker():
if self.exceeded_max_loss:
return
assert isinstance(event, zp.namedict)
assert isinstance(event, zp.ndict)
self.event_count += 1
if(event.dt >= self.market_close):
@@ -280,8 +283,8 @@ class PerformanceTracker():
returns = self.todays_performance.returns
max_dd = -1 * self.trading_environment.max_drawdown
if returns < max_dd:
qutil.LOGGER.info(str(returns) + " broke through " + str(max_dd))
qutil.LOGGER.info("Exceeded max drawdown.")
LOGGER.info(str(returns) + " broke through " + str(max_dd))
LOGGER.info("Exceeded max drawdown.")
# mark the perf period with max loss flag,
# so it shows up in the update, but don't end the test
# here. Let the update go out before stopping
@@ -316,8 +319,8 @@ class PerformanceTracker():
"""
log_msg = "Simulated {n} trading days out of {m}."
qutil.LOGGER.info(log_msg.format(n=self.day_count, m=self.total_days))
qutil.LOGGER.info("first open: {d}".format(d=self.trading_environment.first_open))
LOGGER.info(log_msg.format(n=self.day_count, m=self.total_days))
LOGGER.info("first open: {d}".format(d=self.trading_environment.first_open))
# the stream will end on the last trading day, but will not trigger
# an end of day, so we trigger the final market close here.
@@ -332,7 +335,7 @@ class PerformanceTracker():
)
if self.result_stream:
qutil.LOGGER.info("about to stream the risk report...")
LOGGER.info("about to stream the risk report...")
risk_dict = self.risk_report.to_dict()
msg = zp.RISK_FRAME(risk_dict)
@@ -518,18 +521,18 @@ class PerformancePeriod():
return rval
def to_namedict(self):
def to_ndict(self):
"""
Creates a namedict representing the state of this perfomance period.
Creates a ndict representing the state of this perfomance period.
Properties are the same as the results of to_dict. See header comments
for a detailed description.
"""
positions = self.get_positions(namedicted=True)
positions = self.get_positions(ndicted=True)
positions = zp.namedict(positions)
positions = zp.ndict(positions)
return zp.namedict({
return zp.ndict({
'ending_value' : self.ending_value,
'capital_used' : self.period_capital_used,
'starting_value' : self.starting_value,
@@ -542,12 +545,12 @@ class PerformancePeriod():
'transactions' : self.processed_transactions
})
def get_positions(self, namedicted=False):
def get_positions(self, ndicted=False):
positions = {}
for sid, pos in self.positions.iteritems():
cur = pos.to_dict()
if namedicted:
positions[sid] = zp.namedict(cur)
if ndicted:
positions[sid] = zp.ndict(cur)
else:
positions[sid] = cur
+1 -4
View File
@@ -1,8 +1,5 @@
import pandas
from datetime import timedelta
from collections import defaultdict
from zipline.messaging import BaseTransform
from zipline.transforms.base import BaseTransform
class ReturnsTransform(BaseTransform):
+3 -2
View File
@@ -36,14 +36,15 @@ Risk Report
"""
import logging
import datetime
import math
import pytz
import numpy as np
import numpy.linalg as la
import zipline.util as qutil
import zipline.protocol as zp
LOGGER = logging.getLogger('ZiplineLogger')
def advance_by_months(dt, jump_in_months):
month = dt.month + jump_in_months
@@ -243,7 +244,7 @@ class RiskMetrics():
cur_return = math.log(1.0 + r) + cur_return
#this is a guard for a single day returning -100%
except ValueError:
qutil.LOGGER.warn("{cur} return, zeroing the returns".format(cur=cur_return))
LOGGER.warn("{cur} return, zeroing the returns".format(cur=cur_return))
cur_return = 0.0
compounded_returns.append(cur_return)
@@ -6,11 +6,12 @@ import random
import pytz
from mock import Mock
import zipline.messaging as zm
from zipline.components import DataSource
from zipline.utils import ndict
import zipline.protocol as zp
class TradeDataSource(zm.DataSource):
class TradeDataSource(DataSource):
def send(self, event):
"""
@@ -24,7 +25,7 @@ class TradeDataSource(zm.DataSource):
if event.sid in self.filter['SID']:
message = zp.DATASOURCE_FRAME(event)
else:
blank = zp.namedict({
blank = ndict({
"type" : zp.DATASOURCE_TYPE.TRADE,
"source_id" : self.get_id
})
@@ -39,7 +40,7 @@ class RandomEquityTrades(TradeDataSource):
"""
def __init__(self, sid, source_id, count):
zm.DataSource.__init__(self, source_id)
DataSource.__init__(self, source_id)
self.count = count
self.incr = 0
self.sid = sid
@@ -59,7 +60,7 @@ class RandomEquityTrades(TradeDataSource):
self.price = self.price + random.uniform(-0.05, 0.05)
volume = random.randrange(100,10000,100)
event = zp.namedict({
event = zp.ndict({
"type" : zp.DATASOURCE_TYPE.TRADE,
"sid" : self.sid,
"price" : self.price,
@@ -70,7 +71,6 @@ class RandomEquityTrades(TradeDataSource):
self.incr += 1
class SpecificEquityTrades(TradeDataSource):
"""
Generates a random stream of trades for testing.
@@ -88,7 +88,7 @@ class SpecificEquityTrades(TradeDataSource):
'volume' : integer for volume
}
"""
zm.DataSource.__init__(self, source_id)
DataSource.__init__(self, source_id)
self.event_list = event_list
self.count = 0
@@ -113,7 +113,5 @@ class SpecificEquityTrades(TradeDataSource):
return
event = self.event_list.pop(0)
self.send(zp.namedict(event))
self.send(zp.ndict(event))
self.count +=1
+20 -29
View File
@@ -1,3 +1,4 @@
import logging
import datetime
import pytz
import math
@@ -6,14 +7,12 @@ import time
from collections import Counter
# from gevent.select import select
from zmq.core.poll import select
import zipline.messaging as qmsg
import zipline.util as qutil
from zipline.core import Component
import zipline.protocol as zp
import zipline.finance.performance as perf
from zipline.protocol_utils import Enum, ndict
from zipline.utils.protocol_utils import Enum, ndict
# the simulation style enumerates the available transaction simulation
# strategies.
@@ -24,10 +23,12 @@ SIMULATION_STYLE = Enum(
'NOOP'
)
class TradeSimulationClient(qmsg.Component):
LOGGER = logging.getLogger('ZiplineLogger')
class TradeSimulationClient(Component):
def __init__(self, trading_environment, sim_style):
qmsg.Component.__init__(self)
Component.__init__(self)
self.received_count = 0
self.prev_dt = None
self.event_queue = None
@@ -41,8 +42,7 @@ class TradeSimulationClient(qmsg.Component):
self.last_msg_dt = datetime.datetime.utcnow()
self.txn_sim = TransactionSimulator(sim_style)
assert self.trading_environment.frame_index != None
self.event_frame = ndict()
self.event_data = ndict()
self.perf = perf.PerformanceTracker(self.trading_environment)
@property
@@ -90,7 +90,7 @@ class TradeSimulationClient(qmsg.Component):
self.finish_simulation()
def finish_simulation(self):
qutil.LOGGER.info("Client is DONE!")
LOGGER.info("Client is DONE!")
# signal the performance tracker that the simulation has
# ended. Perf will internally calculate the full risk report.
self.perf.handle_simulation_end()
@@ -147,19 +147,19 @@ class TradeSimulationClient(qmsg.Component):
As per the algorithm protocol:
- Set the current portfolio for the algorithm as per protocol.
- Construct frame based on backlog of events, send to algorithm.
- Construct data based on backlog of events, send to algorithm.
"""
current_portfolio = self.perf.get_portfolio()
self.algorithm.set_portfolio(current_portfolio)
frame = self.get_frame()
if len(frame) > 0:
self.algorithm.handle_frame(frame)
data = self.get_data()
if len(data) > 0:
self.algorithm.handle_data(data)
def connect_order(self):
return self.connect_push_socket(self.addresses['order_address'])
def order(self, sid, amount):
order = zp.namedict({
order = zp.ndict({
'dt':self.current_dt,
'sid':sid,
'amount':amount
@@ -176,11 +176,11 @@ class TradeSimulationClient(qmsg.Component):
self.event_queue = []
self.event_queue.append(event)
def get_frame(self):
def get_data(self):
for event in self.event_queue:
self.event_frame[event['sid']] = event
self.event_data[event['sid']] = event
self.event_queue = []
return self.event_frame
return self.event_data
class TransactionSimulator(object):
@@ -214,7 +214,7 @@ class TransactionSimulator(object):
log = "requested to trade zero shares of {sid}".format(
sid=event.sid
)
qutil.LOGGER.debug(log)
LOGGER.debug(log)
return
if(not self.open_orders.has_key(event.sid)):
@@ -338,7 +338,7 @@ for orders:
event=str(event),
orders=str(orders)
)
qutil.LOGGER.warn(warning)
LOGGER.warn(warning)
return None
@@ -351,7 +351,7 @@ for orders:
'commission' : self.commission * amount * direction,
'source_id' : zp.FINANCE_COMPONENT.TRANSACTION_SIM
}
return zp.namedict(txn)
return zp.ndict(txn)
class TradingEnvironment(object):
@@ -370,7 +370,6 @@ class TradingEnvironment(object):
self.trading_day_map = {}
self.treasury_curves = treasury_curves
self.benchmark_returns = benchmark_returns
self.frame_index = ['sid', 'volume', 'dt', 'price', 'changed']
self.period_start = period_start
self.period_end = period_end
self.capital_base = capital_base
@@ -471,14 +470,6 @@ class TradingEnvironment(object):
return self.trading_day_map[date].returns
else:
return 0.0
def add_to_frame(self, name):
"""
Add an entry to the frame index.
:param name: new index entry name. Used by TradingSimulationClient
to
"""
self.frame_index.append(name)
+1 -3
View File
@@ -1,8 +1,6 @@
import pandas
from datetime import timedelta
from collections import defaultdict
from zipline.messaging import BaseTransform
from zipline.transforms.base import BaseTransform
from zipline.finance.movingaverage import EventWindow
class VWAPTransform(BaseTransform):
+70 -78
View File
@@ -1,20 +1,20 @@
"""
Ziplines are composed of multiple components connected by asynchronous
messaging. All ziplines follow a general topology of parallel sources,
datetimestamp serialization, parallel transformations, and finally sinks.
Furthermore, many ziplines have common needs. For example, all trade
simulations require a
Ziplines are composed of multiple components connected by asynchronous
messaging. All ziplines follow a general topology of parallel sources,
datetimestamp serialization, parallel transformations, and finally sinks.
Furthermore, many ziplines have common needs. For example, all trade
simulations require a
:py:class:`~zipline.finance.trading.TradeSimulationClient`.
To establish best practices and minimize code replication, the lines module
To establish best practices and minimize code replication, the lines module
provides complete zipline topologies. You can extend any zipline without
the need to extend the class. Simply instantiate any additional components
that you would like included in the zipline, and add them to the zipline
before invoking simulate.
that you would like included in the zipline, and add them to the zipline
before invoking simulate.
Here is a diagram of the SimulatedTrading zipline:
+----------------------+ +------------------------+
| Trade History | | (DataSource added |
@@ -60,62 +60,54 @@ before invoking simulate.
+---------------------------------+
"""
import mock
import pytz
import logging
from datetime import datetime, timedelta
from collections import defaultdict
import zipline.utils.factory as factory
from nose.tools import timed
from zipline.components import DataSource
from zipline.transforms import BaseTransform
import zipline.test.factory as factory
import zipline.util as qutil
import zipline.finance.risk as risk
import zipline.protocol as zp
import zipline.finance.performance as perf
import zipline.messaging as zmsg
from zipline.test.algorithms import TestAlgorithm
from zipline.sources import SpecificEquityTrades
from zipline.test_algorithms import TestAlgorithm
from zipline.finance.trading import TradeSimulationClient
from zipline.simulator import AddressAllocator, Simulator
from zipline.monitor import Controller
from zipline.core.devsimulator import Simulator
from zipline.core.monitor import Controller
from zipline.finance.trading import SIMULATION_STYLE
LOGGER = logging.getLogger('ZiplineLogger')
class SimulatedTrading(object):
"""
Zipline with::
- _no_ data sources.
- Trade simulation client, which is available to send callbacks on
events and also accept orders to be simulated.
- An order data source, which will receive orders from the trade
simulation client, and feed them into the event stream to be
simulation client, and feed them into the event stream to be
serialized and order alongside all other data source events.
- transaction simulation transformation, which receives the order
events and estimates a theoretical execution price and volume.
All components in this zipline are subject to heartbeat checks and
a control monitor, which can kill the entire zipline in the event of
exceptions in one of the components or an external request to end the
simulation.
"""
def __init__(self, **config):
"""
:param config: a dict with the following required properties::
- algorithm: a class that follows the algorithm protocol. See
:py:meth:`zipline.finance.trading.TradingSimulationClient.add_algorithm`
:py:meth:`zipline.finance.trading.TradingSimulationClient.add_algorithm
for details.
- trading_environment: an instance of
:py:class:`zipline.trading.TradingEnvironment`
- allocator: an instance of
- allocator: an instance of
:py:class:`zipline.simulator.AddressAllocator`
- simulator_class: a :py:class:`zipline.messaging.ComponentHost`
- simulator_class: a :py:class:`zipline.core.host.ComponentHost`
subclass (not an instance)
- simulation_style: optional parameter that configures the
- simulation_style: optional parameter that configures the
:py:class:`zipline.finance.trading.TransactionSimulator`. Expects
a SIMULATION_STYLE as defined in :py:mod:`zipline.finance.trading`
"""
@@ -124,10 +116,10 @@ class SimulatedTrading(object):
self.allocator = config['allocator']
self.trading_environment = config['trading_environment']
self.sim_style = config.get('simulation_style')
self.leased_sockets = []
self.sim_context = None
sockets = self.allocate_sockets(8)
addresses = {
'sync_address' : sockets[0],
@@ -141,75 +133,75 @@ class SimulatedTrading(object):
self.con = Controller(
sockets[6],
sockets[7],
logging = qutil.LOGGER
logger = LOGGER
)
self.con.cancel_socket = self.allocator.lease(1)[0]
# TODO: Not freeform
self.con.manage(
'freeform'
)
self.started = False
self.sim = config['simulator_class'](addresses)
self.clients = {}
self.trading_client = TradeSimulationClient(
self.trading_environment,
self.sim_style
)
self.add_client(self.trading_client)
# setup all sources
self.sources = {}
#self.order_source = OrderDataSource()
#self.add_source(self.order_source)
#setup transforms
#self.transaction_sim = TransactionSimulator(self.sim_style)
self.transforms = {}
#self.add_transform(self.transaction_sim)
self.sim.register_controller( self.con )
self.sim.on_done = self.shutdown()
self.trading_client.set_algorithm(self.algorithm)
@staticmethod
def create_test_zipline(**config):
"""
:param config: A configuration object that is a dict with:
- environment - a \
:py:class:`zipline.finance.trading.TradingEnvironment`
- allocator - a :py:class:`zipline.simulator.AddressAllocator`
- sid - an integer, which will be used as the security ID.
- sid - an integer, which will be used as the security ID.
- order_count - the number of orders the test algo will place,
defaults to 100
- order_amount - the number of shares per order, defaults to 100
- trade_count - the number of trades to simulate, defaults to 101
to ensure all orders are processed.
- simulator_class - optional parameter that provides an alternative
- simulator_class - optional parameter that provides an alternative
subclass of ComponentHost to hold the whole zipline. Defaults to
:py:class:`zipline.simulator.Simulator`
:py:class:`zipline.simulator.Simulator`
- algorithm - optional parameter providing an algorithm. defaults
to :py:class:`zipline.test.algorithms.TestAlgorithm`
- trade_source - optional parameter to specify trades, if present.
If not present :py:class:`ziplien.sources.SpecificEquityTrades`
If not present :py:class:`ziplien.sources.SpecificEquityTrades`
is the source, with daily frequency in trades.
- simulation_style: optional parameter that configures the
- simulation_style: optional parameter that configures the
:py:class:`zipline.finance.trading.TransactionSimulator`. Expects
a SIMULATION_STYLE as defined in :py:mod:`zipline.finance.trading`
"""
assert isinstance(config, dict)
allocator = config['allocator']
sid = config['sid']
#--------------------
# Trading Environment
#--------------------
@@ -217,33 +209,33 @@ class SimulatedTrading(object):
trading_environment = config['environment']
else:
trading_environment = factory.create_trading_environment()
if config.has_key('order_count'):
order_count = config['order_count']
else:
order_count = 100
if config.has_key('order_amount'):
order_amount = config['order_amount']
else:
order_amount = 100
if config.has_key('trade_count'):
trade_count = config['trade_count']
else:
# to ensure all orders are filled, we provide one more
# trade than order
trade_count = 101
if config.has_key('simulator_class'):
simulator_class = config['simulator_class']
else:
simulator_class = Simulator
simulation_style = config.get('simulation_style')
if not simulation_style:
simulation_style = SIMULATION_STYLE.FIXED_SLIPPAGE
#-------------------
# Trade Source
#-------------------
@@ -283,41 +275,41 @@ class SimulatedTrading(object):
zipline.add_source(trade_source)
return zipline
def add_source(self, source):
"""
Adds the source to the zipline, sets the sid filter of the
source to the algorithm's sid filter.
"""
assert isinstance(source, zmsg.DataSource)
self.check_started()
assert isinstance(source, DataSource)
self.check_started()
source.set_filter('SID', self.algorithm.get_sid_filter())
self.sim.register_components([source])
self.sources[source.get_id] = source
def add_transform(self, transform):
assert isinstance(transform, zmsg.BaseTransform)
assert isinstance(transform, BaseTransform)
self.check_started()
self.sim.register_components([transform])
self.transforms[transform.get_id] = transform
def add_client(self, client):
assert isinstance(client, TradeSimulationClient)
self.check_started()
self.sim.register_components([client])
self.clients[client.get_id] = client
def check_started(self):
if self.started:
raise ZiplineException("TradeSimulation", "You cannot add \
components after the simulation has begun.")
def get_cumulative_performance(self):
return self.trading_client.perf.cumulative_performance.to_dict()
def publish_to(self, result_socket):
self.trading_client.perf.publish_to(result_socket)
def allocate_sockets(self, n):
"""
Allocate sockets local to this line, track them so
@@ -331,7 +323,7 @@ class SimulatedTrading(object):
self.leased_sockets.extend(leased)
return leased
def simulate(self, blocking=False):
self.started = True
self.sim_context = self.sim.simulate()
@@ -341,11 +333,11 @@ class SimulatedTrading(object):
def shutdown(self):
pass
#self.allocator.reaquire(*self.leased_sockets)
#--------------------------------
# Component property accessors
#--------------------------------
def get_positions(self):
"""
returns current positions as a dict. draws from the cumulative
@@ -354,14 +346,14 @@ class SimulatedTrading(object):
perf = self.trading_client.perf.cumulative_performance
positions = perf.get_positions()
return positions
class ZiplineException(Exception):
def __init__(self, zipline_name, msg):
self.name = zipline_name
self.message = msg
def __str__(self):
return "Unexpected exception {line}: {msg}".format(
line=self.name,
line=self.name,
msg=self.message
)
-636
View File
@@ -1,636 +0,0 @@
"""
Commonly used messaging components.
"""
import datetime
from collections import Counter
import zipline.util as qutil
from zipline.component import Component
import zipline.protocol as zp
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \
COMPONENT_STATE, CONTROL_FRAME, CONTROL_UNFRAME
class ComponentHost(Component):
"""
Components that can launch multiple sub-components, synchronize their
start, and then wait for all components to be finished.
"""
def __init__(self, addresses):
Component.__init__(self)
self.addresses = addresses
self.running = False
self.init()
def init(self):
assert hasattr(self, 'zmq_flavor'), \
""" You must specify a flavor of ZeroMQ for all
ComponentHost subclasses. """
# Component Registry, keyed by get_id
# ----------------------
self.components = {}
# ----------------------
# Internal Registry, keyed by guid
self._components = {}
# ----------------------
self.sync_register = {}
self.timeout = datetime.timedelta(seconds=60)
self.feed = Feed()
self.merge = Merge()
self.passthrough = PassthroughTransform()
self.controller = None
#register the feed and the merge
self.register_components([self.feed, self.merge, self.passthrough])
def register_controller(self, controller):
"""
Add the given components to the registry. Establish
communication with them.
"""
if self.controller != None:
raise Exception("There can be only one!")
self.controller = controller
self.controller.zmq_flavor = self.zmq_flavor
# Propogate the controller to all the subcomponents
for component in self.components.itervalues():
component.controller = controller
def register_components(self, components):
"""
Add the given components to the registry. Establish
communication with them.
"""
assert isinstance(components, list)
for component in components:
component.addresses = self.addresses
component.controller = self.controller
# Hosts share their zmq flavor with hosted components
component.zmq_flavor = self.zmq_flavor
self._components[component.guid] = component
self.components[component.get_id] = component
self.sync_register[component.get_id] = datetime.datetime.utcnow()
if isinstance(component, DataSource):
self.feed.add_source(component.get_id)
if isinstance(component, BaseTransform):
self.merge.add_source(component.get_id)
def unregister_component(self, component_id):
del self.components[component_id]
del self.sync_register[component_id]
def setup_sync(self):
"""
Setup the sync socket and poller. ( Bind )
"""
qutil.LOGGER.debug("Connecting sync server.")
self.sync_socket = self.context.socket(self.zmq.REP)
self.sync_socket.bind(self.addresses['sync_address'])
self.sync_poller = self.zmq_poller()
self.sync_poller.register(self.sync_socket, self.zmq.POLLIN)
self.sockets.append(self.sync_socket)
def open(self):
for component in self.components.values():
self.launch_component(component)
self.launch_controller()
def is_running(self):
"""
DEPRECATED, left in for compatability for now.
"""
cur_time = datetime.datetime.utcnow()
if len(self.components) == 0:
qutil.LOGGER.info("Component register is empty.")
return False
return True
def loop(self, lockstep=True):
while self.is_running():
# wait for synchronization request at start, and DONE at end.
# don't timeout.
socks = dict(self.sync_poller.poll())
if self.sync_socket in socks and socks[self.sync_socket] == self.zmq.POLLIN:
msg = self.sync_socket.recv()
try:
parts = msg.split(':')
sync_id, status = parts
except ValueError as exc:
self.signal_exception(exc)
if status == str(CONTROL_PROTOCOL.DONE): # TODO: other way around
#qutil.LOGGER.debug("{id} is DONE".format(id=sync_id))
self.unregister_component(sync_id)
self.state_flag = COMPONENT_STATE.DONE
else:
self.sync_register[sync_id] = datetime.datetime.utcnow()
#qutil.LOGGER.info("confirmed {id}".format(id=msg))
# send synchronization reply
self.sync_socket.send('ack', self.zmq.NOBLOCK)
# ------------------
# Simulation Control
# ------------------
def launch_controller(self, controller):
raise NotImplementedError
def launch_component(self, component):
raise NotImplementedError
def teardown_component(self, component):
raise NotImplementedError
class Feed(Component):
"""
Connects to N PULL sockets, publishing all messages received to a PUB
socket. Published messages are guaranteed to be in chronological order
based on message property dt. Expects to be instantiated in one execution
context (thread, process, etc) and run in another.
"""
def __init__(self):
Component.__init__(self)
self.sent_count = 0
self.received_count = 0
self.draining = False
self.ds_finished_counter = 0
# Depending on the size of this, might want to use a data
# structure with better asymptotics.
self.data_buffer = {}
# source_id -> integer count
self.sent_counters = Counter()
self.recv_counters = Counter()
def init(self):
pass
@property
def get_id(self):
return "FEED"
@property
def get_type(self):
return COMPONENT_TYPE.CONDUIT
# -------------
# Core Methods
# -------------
def open(self):
self.pull_socket = self.bind_data()
self.feed_socket = self.bind_feed()
def do_work(self):
# wait for synchronization reply from the host
socks = dict(self.poll.poll(self.heartbeat_timeout))
# TODO: Abstract this out, maybe on base component
if self.control_in in socks and socks[self.control_in] == self.zmq.POLLIN:
msg = self.control_in.recv()
event, payload = CONTROL_UNFRAME(msg)
# -- Heartbeat --
if event == CONTROL_PROTOCOL.HEARTBEAT:
# Heart outgoing
heartbeat_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.OK,
payload
)
self.control_out.send(heartbeat_frame)
# -- Soft Kill --
elif event == CONTROL_PROTOCOL.SHUTDOWN:
self.signal_done()
self.shutdown()
# -- Hard Kill --
elif event == CONTROL_PROTOCOL.KILL:
self.kill()
if self.pull_socket in socks and socks[self.pull_socket] == self.zmq.POLLIN:
message = self.pull_socket.recv()
if message == str(CONTROL_PROTOCOL.DONE):
self.ds_finished_counter += 1
if len(self.data_buffer) == self.ds_finished_counter:
#drain any remaining messages in the buffer
qutil.LOGGER.debug("draining feed")
self.drain()
self.signal_done()
else:
try:
event = self.unframe(message)
# deserialization error
except zp.INVALID_DATASOURCE_FRAME as exc:
return self.signal_exception(exc)
try:
self.append(event)
self.send_next()
# Invalid message
except zp.INVALID_DATASOURCE_FRAME as exc:
return self.signal_exception(exc)
def unframe(self, msg):
return zp.DATASOURCE_UNFRAME(msg)
def frame(self, event):
return zp.FEED_FRAME(event)
# -------------
# Flow Control
# -------------
def drain(self):
"""
Send all messages in the buffer.
"""
self.draining = True
while self.pending_messages() > 0:
self.send_next()
def send_next(self):
"""
Send the (chronologically) next message in the buffer.
"""
if not (self.is_full() or self.draining):
return
event = self.next()
if(event != None):
self.feed_socket.send(self.frame(event), self.zmq.NOBLOCK)
self.sent_counters[event.source_id] += 1
self.sent_count += 1
def append(self, event):
"""
Add an event to the buffer for the source specified by
source_id.
"""
self.data_buffer[event.source_id].append(event)
self.recv_counters[event.source_id] += 1
self.received_count += 1
def next(self):
"""
Get the next message in chronological order.
"""
if not(self.is_full() or self.draining):
return
cur_source = None
earliest_source = None
earliest_event = None
#iterate over the queues of events from all sources
#(1 queue per datasource)
for events in self.data_buffer.values():
if len(events) == 0:
continue
cur_source = events
first_in_list = events[0]
if first_in_list.dt == None:
#this is a filler event, discard
events.pop(0)
continue
if (earliest_event == None) or (first_in_list.dt <= earliest_event.dt):
earliest_event = first_in_list
earliest_source = cur_source
if earliest_event != None:
return earliest_source.pop(0)
def is_full(self):
"""
Indicates whether the buffer has messages in buffer for
all un-DONE, blocking sources.
"""
for source_id, events in self.data_buffer.iteritems():
if len(events) == 0:
return False
return True
def pending_messages(self):
"""
Returns the count of all events from all sources in the
buffer.
"""
total = 0
for events in self.data_buffer.values():
total += len(events)
return total
def add_source(self, source_id):
"""
Add a data source to the buffer.
"""
self.data_buffer[source_id] = []
def __len__(self):
"""
Buffer's length is same as internal map holding separate
sorted arrays of events keyed by source id.
"""
return len(self.data_buffer)
class Merge(Feed):
"""
Merges multiple streams of events into single messages.
"""
def __init__(self):
Feed.__init__(self)
self.init()
def init(self):
pass
@property
def get_id(self):
return "MERGE"
@property
def get_type(self):
return COMPONENT_TYPE.CONDUIT
def open(self):
self.pull_socket = self.bind_merge()
self.feed_socket = self.bind_result()
def next(self):
"""Get the next merged message from the feed buffer."""
if not (self.is_full() or self.draining):
return
if self.pending_messages() == 0:
return
#
#get the raw event from the passthrough transform.
result = self.data_buffer[zp.TRANSFORM_TYPE.PASSTHROUGH].pop(0).PASSTHROUGH
for source, events in self.data_buffer.iteritems():
if source == zp.TRANSFORM_TYPE.PASSTHROUGH:
continue
if len(events) > 0:
cur = events.pop(0)
result.merge(cur)
return result
def unframe(self, msg):
return zp.TRANSFORM_UNFRAME(msg)
def frame(self, event):
return zp.MERGE_FRAME(event)
def append(self, event):
"""
:param event: a namedict with one entry. key is the name of the
transform, value is the transformed value.
Add an event to the buffer for the source specified by
source_id.
"""
self.data_buffer[event.keys()[0]].append(event)
self.received_count += 1
class BaseTransform(Component):
"""
Top level execution entry point for the transform
- connects to the feed socket to subscribe to events
- connects to the result socket (most oftened bound by a TransformsMerge) to PUSH transforms
- processes all messages received from feed, until DONE message received
- pushes all transforms
- sends DONE to result socket, closes all sockets and context
Parent class for feed transforms. Subclass and override transform
method to create a new derived value from the combined feed.
"""
def __init__(self, name, **kwargs):
Component.__init__(self)
self.state = {
'name': name
}
self.init(**kwargs)
def init(self):
pass
@property
def get_id(self):
return self.state['name']
@property
def get_type(self):
return COMPONENT_TYPE.CONDUIT
def open(self):
"""
Establishes zmq connections.
"""
#create the feed.
self.feed_socket = self.connect_feed()
#create the result PUSH
self.result_socket = self.connect_merge()
def do_work(self):
"""
Loops until feed's DONE message is received:
- receive an event from the data feed
- call transform (subclass' method) on event
- send the transformed event
"""
socks = dict(self.poll.poll(self.heartbeat_timeout))
# TODO: Abstract this out, maybe on base component
if self.control_in in socks and socks[self.control_in] == self.zmq.POLLIN:
msg = self.control_in.recv()
event, payload = CONTROL_UNFRAME(msg)
# -- Heartbeat --
if event == CONTROL_PROTOCOL.HEARTBEAT:
# Heart outgoing
heartbeat_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.OK,
payload
)
self.control_out.send(heartbeat_frame)
# -- Soft Kill --
elif event == CONTROL_PROTOCOL.SHUTDOWN:
self.signal_done()
self.shutdown()
# -- Hard Kill --
elif event == CONTROL_PROTOCOL.KILL:
self.kill()
if self.feed_socket in socks and socks[self.feed_socket] == self.zmq.POLLIN:
message = self.feed_socket.recv()
if message == str(CONTROL_PROTOCOL.DONE):
self.signal_done()
return
try:
event = self.unframe(message)
except zp.INVALID_FEED_FRAME as exc:
return self.signal_exception(exc)
try:
cur_state = self.transform(event)
# This is overloaded, so it can fail in all sorts of
# unknown ways. Its best to catch it in the
# Transformer itself.
except Exception as exc:
return self.signal_exception(exc)
try:
transform_frame = self.frame(cur_state)
except zp.INVALID_TRANSFORM_FRAME as exc:
return self.signal_exception(exc)
self.result_socket.send(transform_frame, self.zmq.NOBLOCK)
def frame(self, cur_state):
return zp.TRANSFORM_FRAME(cur_state['name'], cur_state['value'])
def unframe(self, msg):
return zp.FEED_UNFRAME(msg)
def transform(self, event):
"""
Must return the transformed value as a map with::
{name:"name of new transform", value: "value of new field"}
Transforms run in parallel and results are merged into a single map, so
transform names must be unique. Best practice is to use the self.state
object initialized from the transform configuration, and only set the
transformed value::
self.state['value'] = transformed_value
"""
raise NotImplementedError
class PassthroughTransform(BaseTransform):
"""
A bypass transform which is also an identity transform::
+-------+
+---| f |--->
+-------+
+------id------->
"""
def __init__(self, **kwargs):
BaseTransform.__init__(self, "PASSTHROUGH")
self.init(**kwargs)
def init(self, **kwargs):
pass
@property
def get_type(self):
return COMPONENT_TYPE.CONDUIT
#TODO, could save some cycles by skipping the _UNFRAME call and just setting value to original msg string.
def transform(self, event):
return {'name':zp.TRANSFORM_TYPE.PASSTHROUGH, 'value': zp.FEED_FRAME(event) }
class DataSource(Component):
"""
Baseclass for data sources. Subclass and implement send_all - usually this
means looping through all records in a store, converting to a dict, and
calling send(map).
Every datasource has a dict property to hold filters::
- key -- name of the filter, e.g. SID
- value -- a primitive representing the filter. e.g. a list of ints.
Modify the datasource's filters via the set_filter(name, value)
"""
def __init__(self, source_id):
Component.__init__(self)
self.id = source_id
self.init()
self.filter = {}
def init(self):
self.cur_event = None
def set_filter(self, name, value):
self.filter[name] = value
@property
def get_id(self):
return self.id
@property
def get_type(self):
return COMPONENT_TYPE.SOURCE
def open(self):
self.data_socket = self.connect_data()
def send(self, event):
"""
Emit data.
"""
assert isinstance(event, zp.namedict)
event['source_id'] = self.get_id
event['type'] = self.get_type
try:
ds_frame = self.frame(event)
except zp.INVALID_DATASOURCE_FRAME as exc:
return self.signal_exception(exc)
self.data_socket.send(ds_frame)
def frame(self, event):
return zp.DATASOURCE_FRAME(event)
+3
View File
@@ -0,0 +1,3 @@
"""
Thomas's parameter optimization library.
"""
+104
View File
@@ -0,0 +1,104 @@
"""
Viscosity - Tools for benchmarking ZeroMQ data flow.
"""
import time as timer
import logging
import pycounters
from contextlib import contextmanager, nested
from pycounters import base
from pycounters.shortcuts import frequency, time
from pycounters import shortcuts, reporters, start_auto_reporting, register_reporter
from pycounters import shortcuts,reporters,report_value, output_report, \
counters, register_counter, _reporting_decorator_context_manager
JSONFile = "counters.json"
logger = logging.getLogger('simple_example')
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
logger.addHandler(ch)
reporter = reporters.JSONFileReporter(output_file=JSONFile)
logreport = reporters.LogReporter(logger)
register_reporter(logreport)
register_reporter(reporter)
class timecontext:
def __init__(self, name):
self.name = name
def __enter__(self):
cntr = base.GLOBAL_REGISTRY.get_counter(self.name, throw=False)
if not cntr:
counter = counters.AverageTimeCounter(self.name)
register_counter(counter)
self.tic = timer.time()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if not exc_type:
shortcuts.value(self.name, timer.time() - self.tic)
class ttimecontext:
def __init__(self, name):
self.name = name
def __enter__(self):
counter = base.GLOBAL_REGISTRY.get_counter(self.name, throw=False)
if not counter:
counter = counters.EventCounter(self.name)
counter.value = 0
register_counter(counter)
self.counter = counter
self.tic = timer.time()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if not exc_type:
val = (timer.time() - self.tic)
if not self.counter.value:
self.counter.value = long(0.0)
self.counter.value += val
class occurancecontext:
def __init__(self, name):
self.name = name
def __enter__(self):
cntr = base.GLOBAL_REGISTRY.get_counter(self.name, throw=False)
if not cntr:
cntr = counters.TotalCounter(self.name)
counter = counters.TotalCounter(self.name)
register_counter(counter)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
shortcuts.value(self.name, 1)
if __name__ == '__main__':
with timecontext('average time'):
for i in xrange(5):
x = [2] * 1000
timer.sleep(0.01)
with occurancecontext('totalcount'):
for i in xrange(5):
x = [2] * 1000
with ttimecontext('total time'):
for i in xrange(5):
x = [2] * 1000
timer.sleep(1)
pycounters.output_report()
+39 -41
View File
@@ -65,7 +65,7 @@ Namedict
Namedicts are dict like objects that have fields accessible by attribute lookup
as well as being indexable and iterable::
HEARTBEAT_PROTOCOL = namedict({
HEARTBEAT_PROTOCOL = ndict({
'REQ' : b'\x01',
'REP' : b'\x02',
})
@@ -118,13 +118,10 @@ import msgpack
import numbers
import datetime
import pytz
import numpy
import time
import copy
from collections import namedtuple
from protocol_utils import Enum, FrameExceptionFactory, namedict
from date_utils import EPOCH, UN_EPOCH
from utils.protocol_utils import Enum, FrameExceptionFactory, ndict
from utils.date_utils import EPOCH, UN_EPOCH
# -----------------------
# Control Protocol
@@ -221,7 +218,7 @@ def DATASOURCE_FRAME(event):
Wraps any datasource payload with id and type, so that unpacking may choose
the write UNFRAME for the payload.
:param event: namedict with following properties
:param event: ndict with following properties
- *ds_id* an identifier that is unique to the datasource in the context of a component host (e.g. Simulator)
- *ds_type* a string denoting the datasource type. Must be on of:
@@ -260,42 +257,43 @@ def DATASOURCE_FRAME(event):
def DATASOURCE_UNFRAME(msg):
"""
Extracts payload, and calls correct UNFRAME method based on the \
datasource type passed along.
Extracts payload, and calls correct UNFRAME method based on the
datasource type passed along.
Returns a dict containing at least:
- source_id
- type
other properties are added based on the datasource type:
- TRADE
- sid - int security identifier
- price - float
- volume - int
- dt - a datetime object
"""
"""
try:
ds_type, source_id, payload = msgpack.loads(msg)
assert isinstance(ds_type, int)
rval = namedict({'source_id':source_id})
rval = ndict({'source_id':source_id})
if payload == DATASOURCE_TYPE.EMPTY:
child_value = namedict({'dt':None})
child_value = ndict({'dt':None})
elif(ds_type == DATASOURCE_TYPE.TRADE):
child_value = TRADE_UNFRAME(payload)
elif(ds_type == DATASOURCE_TYPE.ORDER):
child_value = ORDER_SOURCE_UNFRAME(payload)
else:
raise INVALID_DATASOURCE_FRAME(msg)
rval.merge(child_value)
return rval
except TypeError:
raise INVALID_DATASOURCE_FRAME(msg)
except ValueError:
@@ -309,12 +307,12 @@ INVALID_FEED_FRAME = FrameExceptionFactory('FEED')
def FEED_FRAME(event):
"""
:param event: a nameddict with at least
:param event: a ndict with at least
- source_id
- type
"""
assert isinstance(event, namedict)
assert isinstance(event, ndict)
source_id = event.source_id
ds_type = event.type
PACK_DATE(event)
@@ -326,7 +324,7 @@ def FEED_UNFRAME(msg):
payload = msgpack.loads(msg)
#TODO: anything we can do to assert more about the content of the dict?
assert isinstance(payload, dict)
rval = namedict(payload)
rval = ndict(payload)
UNPACK_DATE(rval)
return rval
except TypeError:
@@ -350,13 +348,13 @@ def TRANSFORM_FRAME(name, value):
def TRANSFORM_UNFRAME(msg):
"""
:rtype: namedict with <transform_name>:<transform_value>
:rtype: ndict with <transform_name>:<transform_value>
"""
try:
name, value = msgpack.loads(msg)
if(value == TRANSFORM_TYPE.EMPTY):
return namedict({name : None})
return ndict({name : None})
#TODO: anything we can do to assert more about the content of the dict?
assert isinstance(name, basestring)
if(name == TRANSFORM_TYPE.PASSTHROUGH):
@@ -364,7 +362,7 @@ def TRANSFORM_UNFRAME(msg):
elif(name == TRANSFORM_TYPE.TRANSACTION):
value = TRANSACTION_UNFRAME(value)
return namedict({name : value})
return ndict({name : value})
except TypeError:
raise INVALID_TRANSFORM_FRAME(msg)
except ValueError:
@@ -382,7 +380,7 @@ def MERGE_FRAME(event):
- source_id
- type
"""
assert isinstance(event, namedict)
assert isinstance(event, ndict)
PACK_DATE(event)
if(event.has_attr(TRANSFORM_TYPE.TRANSACTION)):
if(event.TRANSACTION == None):
@@ -397,7 +395,7 @@ def MERGE_UNFRAME(msg):
payload = msgpack.loads(msg)
#TODO: anything we can do to assert more about the content of the dict?
assert isinstance(payload, dict)
payload = namedict(payload)
payload = ndict(payload)
if(payload.has_attr(TRANSFORM_TYPE.TRANSACTION)):
if(payload.TRANSACTION == TRANSFORM_TYPE.EMPTY):
payload.TRANSACTION = None
@@ -425,7 +423,7 @@ INVALID_TRADE_FRAME = FrameExceptionFactory('TRADE')
def TRADE_FRAME(event):
"""
:param event: should be a namedict with:
:param event: should be a ndict with:
- ds_id -- the datasource id sending this trade out
- sid -- the security id
@@ -434,7 +432,7 @@ def TRADE_FRAME(event):
- dt -- datetime for the trade
"""
assert isinstance(event, namedict)
assert isinstance(event, ndict)
assert event.type == DATASOURCE_TYPE.TRADE
assert isinstance(event.sid, int)
assert isinstance(event.price, numbers.Real)
@@ -456,7 +454,7 @@ def TRADE_UNFRAME(msg):
assert isinstance(sid, int)
assert isinstance(price, numbers.Real)
assert isinstance(volume, numbers.Integral)
rval = namedict({
rval = ndict({
'sid' : sid,
'price' : price,
'volume' : volume,
@@ -491,7 +489,7 @@ def ORDER_UNFRAME(msg):
sid, amount, dt = msgpack.loads(msg)
assert isinstance(sid, int)
assert isinstance(amount, int)
rval = namedict({
rval = ndict({
'sid':sid,
'amount':amount,
'dt':dt
@@ -513,7 +511,7 @@ def ORDER_UNFRAME(msg):
def TRANSACTION_FRAME(event):
assert isinstance(event, namedict)
assert isinstance(event, ndict)
assert isinstance(event.sid, int)
assert isinstance(event.price, numbers.Real)
assert isinstance(event.commission, numbers.Real)
@@ -535,7 +533,7 @@ def TRANSACTION_UNFRAME(msg):
assert isinstance(price, numbers.Real)
assert isinstance(commission, numbers.Real)
assert isinstance(amount, int)
rval = namedict({
rval = ndict({
'sid' : sid,
'price' : price,
'amount' : amount,
@@ -577,7 +575,7 @@ def ORDER_SOURCE_FRAME(event):
def ORDER_SOURCE_UNFRAME(msg):
try:
sid, amount, dt, source_id, source_type = msgpack.loads(msg)
event = namedict({
event = ndict({
"sid" : sid,
"amount" : amount,
"dt" : dt,
@@ -688,7 +686,7 @@ def PACK_DATE(event):
PACK_DATE and UNPACK_DATE are inverse operations.
:param event: event must a namedict with a property named 'dt' that is a datetime.
:param event: event must a ndict with a property named 'dt' that is a datetime.
:rtype: None
"""
assert isinstance(event.dt, datetime.datetime)
@@ -710,7 +708,7 @@ def UNPACK_DATE(event):
UNPACK_DATE and PACK_DATE are inverse operations.
:param tuple event: event must a namedict with:
:param tuple event: event must a ndict with:
- a property named 'dt_tuple' that is a tuple of integers \
representing the date and time in UTC.
@@ -742,15 +740,15 @@ ORDER_PROTOCOL = Enum(
)
#Transform type needs to be a namedict to facilitate merging.
TRANSFORM_TYPE = namedict({
#Transform type needs to be a ndict to facilitate merging.
TRANSFORM_TYPE = ndict({
'TRANSACTION' : 'TRANSACTION', #needed?
'PASSTHROUGH' : 'PASSTHROUGH',
'EMPTY' : ''
})
FINANCE_COMPONENT = namedict({
FINANCE_COMPONENT = ndict({
'TRADING_CLIENT' : 'TRADING_CLIENT',
'PORTFOLIO_CLIENT' : 'PORTFOLIO_CLIENT',
'ORDER_SOURCE' : 'ORDER_SOURCE',
View File
-97
View File
@@ -1,97 +0,0 @@
from datetime import timedelta
from collections import defaultdict
from unittest2 import TestCase
import zipline.test.factory as factory
import zipline.util as qutil
from zipline.finance.vwap import DailyVWAP, VWAPTransform
from zipline.finance.returns import ReturnsFromPriorClose
from zipline.finance.movingaverage import MovingAverage
from zipline.lines import SimulatedTrading
from zipline.simulator import AddressAllocator, Simulator
allocator = AddressAllocator(1000)
class ZiplineWithTransformsTestCase(TestCase):
leased_sockets = defaultdict(list)
def setUp(self):
# skip ahead 100 spots
allocator.lease(100)
qutil.configure_logging()
self.trading_environment = factory.create_trading_environment()
self.zipline_test_config = {
'allocator':allocator,
'sid':133
}
def test_vwap_tnfm(self):
zipline = SimulatedTrading.create_test_zipline(
**self.zipline_test_config
)
vwap = VWAPTransform("vwap_10", daycount=10)
zipline.add_transform(vwap)
zipline.simulate(blocking=True)
self.assertTrue(zipline.sim.ready())
self.assertFalse(zipline.sim.exception)
class FinanceTransformsTestCase(TestCase):
def setUp(self):
self.trading_environment = factory.create_trading_environment()
def test_vwap(self):
trade_history = factory.create_trade_history(
133,
[10.0, 10.0, 10.0, 11.0],
[100, 100, 100, 300],
timedelta(days=1),
self.trading_environment
)
vwap = DailyVWAP(daycount=2)
for trade in trade_history:
vwap.update(trade)
self.assertEqual(vwap.vwap, 10.75)
def test_returns(self):
trade_history = factory.create_trade_history(
133,
[10.0, 10.0, 10.0, 11.0],
[100, 100, 100, 300],
timedelta(days=1),
self.trading_environment
)
returns = ReturnsFromPriorClose()
for trade in trade_history:
returns.update(trade)
self.assertEqual(returns.returns, .1)
def test_moving_average(self):
trade_history = factory.create_trade_history(
133,
[10.0, 10.0, 10.0, 11.0],
[100, 100, 100, 300],
timedelta(days=1),
self.trading_environment
)
ma = MovingAverage(daycount=2)
for trade in trade_history:
ma.update(trade)
self.assertEqual(ma.average, 10.5)
-22
View File
@@ -1,22 +0,0 @@
from zipline.messaging import BaseTransform
from zipline.protocol import COMPONENT_TYPE
class DivideByZeroTransform(BaseTransform):
"""
A transform that fails.
"""
def __init__(self, name):
BaseTransform.__init__(self, "PASSTHROUGH")
self.state['name'] = name
self.init()
def init(self):
pass
@property
def get_type(self):
return COMPONENT_TYPE.CONDUIT
def transform(self, event):
return { 'value': 0/0 }
@@ -16,8 +16,8 @@ The algorithm must expose methods:
of valid sids. List must have a length between 1 and 10. If None is returned
the filter will block all events.
- handle_frame: method that accepts a :py:class:`pandas.Dataframe` of the
current state of the simulation universe. An example frame::
- handle_data: method that accepts a :py:class:`zipline.protocol_utils.ndict`
of the current state of the simulation universe. An example data ndict::
+-----------------+--------------+----------------+--------------------+
| | SID(133) | SID(134) | SID(135) |
@@ -74,7 +74,7 @@ class TestAlgorithm():
def set_portfolio(self, portfolio):
self.portfolio = portfolio
def handle_frame(self, frame):
def handle_data(self, data):
self.frame_count += 1
#place an order for 100 shares of sid
if self.incr < self.count:
@@ -110,7 +110,7 @@ class HeavyBuyAlgorithm():
def set_portfolio(self, portfolio):
self.portfolio = portfolio
def handle_frame(self, frame):
def handle_data(self, data):
self.frame_count += 1
#place an order for 100 shares of sid
self.order(self.sid, self.amount)
@@ -133,7 +133,7 @@ class NoopAlgorithm(object):
def set_portfolio(self, portfolio):
pass
def handle_frame(self, frame):
def handle_data(self, data):
pass
def get_sid_filter(self):
-80
View File
@@ -1,80 +0,0 @@
"""
Contains the various deployable topologies of ziplines.
This is mostly hardcoded at the moment but as the topologies
becomes more sophisiticated this logic will be the primary
router of sockets.
Ontology of Stream Processing
=============================
Source
******
A producer of data. The data could be in a datastore, coming from a
socket, etc. To access this data, we pull from the source. Sources increase the
total amount of data flowing through the system. Sources are generally not
pure since they involve IO.
Sink
****
A consumer of data. Basic examples would be a sum function (adding up a
stream of numbers fed in), a datastore sink, a socket etc. We push data
into a sink. When / If a sink completes processing, it may return some
value that exists outside of the system.
Sinks decrease the total amount of information flowing through the system.
Conduit
*******
A transformer of data. We push data into a conduit. Similar to a sink,
but instead of returning a single value at the end, a conduit can
return multiple outputs every time it is pushed to. The returned values
remain in the system.
Conduits may or may not be pure, it is usefull to distinguish between the
two since pure conduits have a variety of nice properties under composition
"""
from zipline.protocol import COMPONENT_TYPE
class Topology(object):
pass
class DiamondTopology(Topology):
"""
Exposes a feed, merge, and passthrough bypass::
+--------+
+---------->| |---------------+
| +--------+ |
| v
+---+----+ +---+----+ +--------+ +--------+ +---+----+
| +-->| +----->| |---------->| |--->| |
+---+----+ +---+----+ +--------+ +--------+ +---+----+
| ^
| +--------+ |
+---------->| |---------------+
| +--------+ |
| |
+------------passthru----------------+
"""
flow = {
'flow' : COMPONENT_TYPE.SOURCE ,
'serializers' : COMPONENT_TYPE.CONDUIT ,
'transforms' : COMPONENT_TYPE.CONDUIT ,
'merges' : COMPONENT_TYPE.CONDUIT ,
'clients' : COMPONENT_TYPE.SINK ,
}
def __init__(self):
self.sources = []
self.serializers = []
self.transforms = []
self.merges = []
self.clients = []
+9 -3
View File
@@ -6,14 +6,20 @@ Transforms provide re-useable components for stream processing. All
Transforms expect to receive data events from zipline.core.DataFeed
asynchronously via zeromq. Each transform is designed to run in independent
process space, independently of all other transforms, to allow for parallel
computation.
computation.
Each transform must maintain the state necessary to calculate the transform of
each new feed events.
each new feed events.
To simplify the consumption of feed and transform data events, this module
also provides the TransformsMerge class. TransformsMerge initializes as set of
transforms and subscribes to their output. Each feed event is then combined with
all the transforms of that event into a single new message.
"""
"""
from base import BaseTransform
__all__ = [
BaseTransform,
]
+134
View File
@@ -0,0 +1,134 @@
import logging
from zipline.core.component import Component
import zipline.protocol as zp
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \
CONTROL_FRAME, CONTROL_UNFRAME
LOGGER = logging.getLogger('ZiplineLogger')
class BaseTransform(Component):
"""
Top level execution entry point for the transform
- connects to the feed socket to subscribe to events
- connects to the result socket (most oftened bound by a TransformsMerge) to PUSH transforms
- processes all messages received from feed, until DONE message received
- pushes all transforms
- sends DONE to result socket, closes all sockets and context
Parent class for feed transforms. Subclass and override transform
method to create a new derived value from the combined feed.
"""
def __init__(self, name, **kwargs):
Component.__init__(self)
self.state = {
'name': name
}
self.init(**kwargs)
def init(self):
pass
@property
def get_id(self):
return self.state['name']
@property
def get_type(self):
return COMPONENT_TYPE.CONDUIT
def open(self):
"""
Establishes zmq connections.
"""
#create the feed.
self.feed_socket = self.connect_feed()
#create the result PUSH
self.result_socket = self.connect_merge()
def do_work(self):
"""
Loops until feed's DONE message is received:
- receive an event from the data feed
- call transform (subclass' method) on event
- send the transformed event
"""
socks = dict(self.poll.poll(self.heartbeat_timeout))
# TODO: Abstract this out, maybe on base component
if self.control_in in socks and socks[self.control_in] == self.zmq.POLLIN:
msg = self.control_in.recv()
event, payload = CONTROL_UNFRAME(msg)
# -- Heartbeat --
if event == CONTROL_PROTOCOL.HEARTBEAT:
# Heart outgoing
heartbeat_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.OK,
payload
)
self.control_out.send(heartbeat_frame)
# -- Soft Kill --
elif event == CONTROL_PROTOCOL.SHUTDOWN:
self.signal_done()
self.shutdown()
# -- Hard Kill --
elif event == CONTROL_PROTOCOL.KILL:
self.kill()
if self.feed_socket in socks and socks[self.feed_socket] == self.zmq.POLLIN:
message = self.feed_socket.recv()
if message == str(CONTROL_PROTOCOL.DONE):
self.signal_done()
return
try:
event = self.unframe(message)
except zp.INVALID_FEED_FRAME as exc:
return self.signal_exception(exc)
try:
cur_state = self.transform(event)
# This is overloaded, so it can fail in all sorts of
# unknown ways. Its best to catch it in the
# Transformer itself.
except Exception as exc:
return self.signal_exception(exc)
try:
transform_frame = self.frame(cur_state)
except zp.INVALID_TRANSFORM_FRAME as exc:
return self.signal_exception(exc)
self.result_socket.send(transform_frame, self.zmq.NOBLOCK)
def frame(self, cur_state):
return zp.TRANSFORM_FRAME(cur_state['name'], cur_state['value'])
def unframe(self, msg):
return zp.FEED_UNFRAME(msg)
def transform(self, event):
"""
Must return the transformed value as a map with::
{name:"name of new transform", value: "value of new field"}
Transforms run in parallel and results are merged into a single map, so
transform names must be unique. Best practice is to use the self.state
object initialized from the transform configuration, and only set the
transformed value::
self.state['value'] = transformed_value
"""
raise NotImplementedError
-28
View File
@@ -1,28 +0,0 @@
"""
Small classes to assist with timezone calculations, LOGGER configuration,
and other common operations.
"""
import datetime
import pytz
import logging
import logging.handlers
LOGGER = logging.getLogger('ZiplineLogger')
def configure_logging(loglevel=logging.DEBUG):
"""
Configures zipline.util.LOGGER to write a rotating file
(10M per file, 5 files) to `` /var/log/zipline.log ``.
"""
LOGGER.setLevel(loglevel)
handler = logging.handlers.RotatingFileHandler(
"/var/log/zipline/{lfn}.log".format(lfn="zipline"),
maxBytes=10*1024*1024, backupCount=5
)
handler.setFormatter(logging.Formatter(
"%(asctime)s %(levelname)s %(filename)s %(funcName)s - %(message)s",
"%Y-%m-%d %H:%M:%S %Z")
)
LOGGER.addHandler(handler)
LOGGER.info("logging started...")
+5
View File
@@ -0,0 +1,5 @@
from protocol_utils import ndict
__all__ = [
ndict,
]
@@ -1,6 +1,7 @@
"""
Factory functions to prepare useful data for tests.
"""
import pytz
import msgpack
import random
@@ -8,17 +9,14 @@ from os.path import join
from operator import attrgetter
from datetime import datetime, timedelta
import zipline
import zipline.finance.risk as risk
import zipline.protocol as zp
from zipline.sources import SpecificEquityTrades, RandomEquityTrades
from zipline.finance.sources import SpecificEquityTrades, RandomEquityTrades
from zipline.finance.trading import TradingEnvironment
def load_market_data():
data_path = join(zipline.__path__[0], "test")
with open(join(data_path, "benchmark.msgpack"), "rb") as fp_bm:
bm_list = msgpack.loads(fp_bm.read())
fp_bm = open("./tests/benchmark.msgpack", "rb")
bm_list = msgpack.loads(fp_bm.read())
bm_returns = []
for packed_date, returns in bm_list:
event_dt = zp.tuple_to_date(packed_date)
@@ -33,8 +31,8 @@ def load_market_data():
bm_returns.append(daily_return)
bm_returns = sorted(bm_returns, key=attrgetter('date'))
with open(join(data_path, "treasury_curves.msgpack"), "rb") as fp_tr:
tr_list = msgpack.loads(fp_tr.read())
fp_tr = open(".//tests/treasury_curves.msgpack", "rb")
tr_list = msgpack.loads(fp_tr.read())
tr_curves = {}
for packed_date, curve in tr_list:
tr_dt = zp.tuple_to_date(packed_date)
@@ -42,7 +40,7 @@ def load_market_data():
tr_curves[tr_dt] = curve
return bm_returns, tr_curves
def create_trading_environment(year=2006):
"""Construct a complete environment with reasonable defaults"""
benchmark_returns, treasury_curves = load_market_data()
@@ -58,8 +56,9 @@ def create_trading_environment(year=2006):
)
return trading_environment
def create_trade(sid, price, amount, datetime):
row = zp.namedict({
row = zp.ndict({
'source_id' : "test_factory",
'type' : zp.DATASOURCE_TYPE.TRADE,
'sid' : sid,
@@ -83,7 +82,7 @@ def create_trade_history(sid, prices, amounts, interval, trading_calendar):
current = trading_calendar.first_open
for price, amount in zip(prices, amounts):
trade = create_trade(sid, price, amount, current)
trades.append(trade)
current = get_next_trading_dt(current, interval, trading_calendar)
@@ -92,11 +91,11 @@ def create_trade_history(sid, prices, amounts, interval, trading_calendar):
return trades
def create_txn(sid, price, amount, datetime, btrid=None):
txn = zp.namedict({
'sid':sid,
'amount':amount,
'dt':datetime,
'price':price,
txn = zp.ndict({
'sid' : sid,
'amount' : amount,
'dt' : datetime,
'price' : price,
})
return txn
@@ -172,7 +171,7 @@ def create_random_trade_source(sid, trade_count, trading_environment):
return source
def create_daily_trade_source(sids, trade_count, trading_environment):
"""
creates trade_count trades for each sid in sids list.
first trade will be on trading_environment.period_start, and daily
@@ -183,9 +182,9 @@ def create_daily_trade_source(sids, trade_count, trading_environment):
to match the day of the final trade.
"""
return create_trade_source(
sids,
trade_count,
timedelta(days=1),
sids,
trade_count,
timedelta(days=1),
trading_environment
)
@@ -193,26 +192,28 @@ def create_daily_trade_source(sids, trade_count, trading_environment):
def create_minutely_trade_source(sids, trade_count, trading_environment):
"""
creates trade_count trades for each sid in sids list.
first trade will be on trading_environment.period_start, and every minute
thereafter for each sid. Thus, two sids should result in two trades per
minute.
creates trade_count trades for each sid in sids list.
first trade will be on trading_environment.period_start, and every minute
thereafter for each sid. Thus, two sids should result in two trades per
minute.
Important side-effect: trading_environment.period_end will be modified
to match the day of the final trade.
to match the day of the final trade.
"""
return create_trade_source(
sids,
trade_count,
timedelta(minutes=1),
sids,
trade_count,
timedelta(minutes=1),
trading_environment
)
def create_trade_source(sids, trade_count, trade_time_increment, trading_environment):
trade_history = []
price = [10.1] * trade_count
volume = [100] * trade_count
for sid in sids:
price = [10.1] * trade_count
volume = [100] * trade_count
start_date = trading_environment.first_open
generated_trades = create_trade_history(
+13
View File
@@ -0,0 +1,13 @@
"""
Small classes to assist with timezone calculations, LOGGER configuration,
and other common operations.
"""
import logging
import logging.config
def configure_logging():
logging.config.fileConfig(
'logging.cfg',
disable_existing_loggers = False
)
@@ -31,79 +31,6 @@ def FrameExceptionFactory(name):
return InvalidFrame
class namedict(MutableMapping):
"""
Namedicts are dict like objects that have fields accessible by attribute lookup
as well as being indexable and iterable::
HEARTBEAT_PROTOCOL = namedict({
'REQ' : b'\x01',
'REP' : b'\x02',
})
HEARTBEAT_PROTOCOL.REQ # syntactic sugar
HEARTBEAT_PROTOCOL.REP # oh suga suga
For more complex structs use collections.namedtuple:
"""
def __init__(self, dct=None):
if(dct):
self.__dict__.update(dct)
def __setitem__(self, key, value):
"""
Required for use by pymongo as_class parameter to find.
"""
if(key == '_id'):
self.__dict__['id'] = value
else:
self.__dict__[key] = value
def __getitem__(self, key):
return self.__dict__[key]
def __delitem__(self, key):
del self.__dict__[key]
def __iter__(self):
return self.__dict__.iterkeys()
def __len__(self):
return len(self.__dict__)
def keys(self):
return self.__dict__.keys()
def as_dict(self):
# shallow copy is O(n)
return copy.copy(self.__dict__)
def delete(self, key):
del(self.__dict__[key])
def merge(self, other_nd):
assert isinstance(other_nd, namedict)
self.__dict__.update(other_nd.__dict__)
def __repr__(self):
return "namedict: " + str(self.__dict__)
def __eq__(self, other):
# !!!!!!!!!!!!!!!!!!!!
# !!!! DANGEROUS !!!!!
# !!!!!!!!!!!!!!!!!!!!
return other != None and self.__dict__ == other.__dict__
def has_attr(self, name):
return self.__dict__.has_key(name)
def as_series(self):
s = pandas.Series(self.__dict__)
s.name = self.sid
return s
class ndict(MutableMapping):
"""
Xtreme Namedicts 2.0
@@ -123,6 +50,13 @@ class ndict(MutableMapping):
# Abstact Overloads
# -----------------
def __setattr__(self, key, value):
if '_ndict' in key or key == 'cls':
self.__dict__[key] = value
else:
self.__internal[key] = value
return value
def __setitem__(self, key, value):
"""
Required for use by pymongo as_class parameter to find.
@@ -132,7 +66,6 @@ class ndict(MutableMapping):
else:
self.__internal[key] = value
def __getattr__(self, key):
if key in self.cls:
return self.__dict__[key]
@@ -219,3 +152,23 @@ class ndict(MutableMapping):
#return False
#return True
# This is not neccesarily the most intuitive construction, but
# we're aiming for raw performance rather than readability. So
# we do things that we would not normally do in business logic.
def namelookup(dct):
ks = dct.keys()
vs = dct.values()
dct = {}
class _lookup:
__slots__ = ks
def __init__(self):
for k, v in zip(ks, vs):
setattr(self,k,v)
self.__setattr__ = self.locked
def locked(self,k,v):
raise Exception('Name lookups are fixed at init.')
def __repr__(self):
return '<namelookup %s>' % self.__slots__
del dct
return _lookup()
+9
View File
@@ -0,0 +1,9 @@
BANNER = """
Zipline {version}
Released under BSD3
""".strip()
VERSION = ( 0, 0, 1, 'dev' )
def pretty_version():
return BANNER.format(version='.'.join(VERSION))