ENH: Send transactions and orders as standalone events.

- Add transaction and order types
- Move TransactionSimulator from trading.py to tradesimulation.py
  (only used by other members of the tradesimulation module)
- Make Transaction an independent event, like dividend
- Add Blotter class.
- Flatten the transaction events to be independent of trade bar events
- Make orders into events that reach performance (need to add
handling)
- Issue IDs to orders and tracking each transaction's order id.
- Make volume share slippage fill orders independently, rather than
  aggregating them into a single transaction.
- Perf tracker holds orders, serializes them with transactions.
- Order state defined and maintained by order class.
- Minutely emission of orders based on last_modified date.
This commit is contained in:
Eddie Hebert
2013-04-14 18:59:57 -04:00
parent cfbbbe2f1c
commit 35f57ada3e
12 changed files with 531 additions and 338 deletions
+84 -93
View File
@@ -24,17 +24,15 @@ from unittest import TestCase
from zipline.finance.slippage import VolumeShareSlippage
from zipline.protocol import Event
from zipline.protocol import Event, DATASOURCE_TYPE
from zipline.gens.tradesimulation import Order
class SlippageTestCase(TestCase):
def test_volume_share_slippage(self):
event = Event(
{'volume': 200,
'TRANSACTION': None,
'type': 4,
'price': 3.0,
'datetime': datetime.datetime(
@@ -51,26 +49,31 @@ class SlippageTestCase(TestCase):
slippage_model = VolumeShareSlippage()
open_orders = {133: [
open_orders = [
Order(**{
'dt': datetime.datetime(2006, 1, 5, 14, 30, tzinfo=pytz.utc),
'amount': 100,
'filled': 0,
'sid': 133})
]}
]
txn = slippage_model.simulate(
txns = slippage_model.simulate(
event,
open_orders
)
self.assertEquals(len(txns), 1)
txn = txns[0]
expected_txn = {
'price': float(3.01875),
'dt': datetime.datetime(
2006, 1, 5, 14, 31, tzinfo=pytz.utc),
'amount': int(50),
'sid': int(133),
'commission': None
'commission': None,
'type': DATASOURCE_TYPE.TRANSACTION,
'order_id': open_orders[0].id
}
self.assertIsNotNone(txn)
@@ -87,46 +90,46 @@ class SlippageTestCase(TestCase):
# long, does not trade
open_orders = {133: [
open_orders = [
Order(**{
'dt': datetime.datetime(2006, 1, 5, 14, 30, tzinfo=pytz.utc),
'amount': 100,
'filled': 0,
'sid': 133,
'limit': 3.5})
]}
]
txn = slippage_model.simulate(
txns = slippage_model.simulate(
events[2],
open_orders
)
expected_txn = {}
self.assertIsNone(txn)
self.assertEquals(len(txns), 0)
# long, does trade
open_orders = {133: [
open_orders = [
Order(**{
'dt': datetime.datetime(2006, 1, 5, 14, 30, tzinfo=pytz.utc),
'amount': 100,
'filled': 0,
'sid': 133,
'limit': 3.5})
]}
]
txn = slippage_model.simulate(
txns = slippage_model.simulate(
events[3],
open_orders
)
self.assertEquals(len(txns), 1)
txn = txns[0]
expected_txn = {
'price': float(3.500875),
'dt': datetime.datetime(
2006, 1, 5, 14, 34, tzinfo=pytz.utc),
'amount': int(100),
'sid': int(133)
'sid': int(133),
'order_id': open_orders[0].id
}
self.assertIsNotNone(txn)
@@ -136,40 +139,43 @@ class SlippageTestCase(TestCase):
# short, does not trade
open_orders = {133: [
open_orders = [
Order(**{
'dt': datetime.datetime(2006, 1, 5, 14, 30, tzinfo=pytz.utc),
'amount': -100,
'filled': 0,
'sid': 133,
'limit': 3.5})
]}
]
txn = slippage_model.simulate(
txns = slippage_model.simulate(
events[0],
open_orders
)
expected_txn = {}
self.assertIsNone(txn)
self.assertEquals(len(txns), 0)
# short, does trade
open_orders = {133: [
open_orders = [
Order(**{
'dt': datetime.datetime(2006, 1, 5, 14, 30, tzinfo=pytz.utc),
'amount': -100,
'filled': 0,
'sid': 133,
'limit': 3.5})
]}
]
txn = slippage_model.simulate(
txns = slippage_model.simulate(
events[1],
open_orders
)
self.assertEquals(len(txns), 1)
txn = txns[0]
expected_txn = {
'price': float(3.499125),
'dt': datetime.datetime(
@@ -190,89 +196,91 @@ class SlippageTestCase(TestCase):
# long, does not trade
open_orders = {133: [
open_orders = [
Order(**{
'dt': datetime.datetime(2006, 1, 5, 14, 30, tzinfo=pytz.utc),
'amount': 100,
'filled': 0,
'sid': 133,
'stop': 3.5})
]}
]
txn = slippage_model.simulate(
txns = slippage_model.simulate(
events[2],
open_orders
)
expected_txn = {}
self.assertIsNone(txn)
self.assertEquals(len(txns), 0)
# long, does trade
open_orders = {133: [
open_orders = [
Order(**{
'dt': datetime.datetime(2006, 1, 5, 14, 30, tzinfo=pytz.utc),
'amount': 100,
'filled': 0,
'sid': 133,
'stop': 3.6})
]}
'stop': 3.6
})
]
txn = slippage_model.simulate(
txns = slippage_model.simulate(
events[3],
open_orders
)
self.assertEquals(len(txns), 1)
txn = txns[0]
expected_txn = {
'price': float(3.500875),
'dt': datetime.datetime(
2006, 1, 5, 14, 34, tzinfo=pytz.utc),
'amount': int(100),
'sid': int(133)
'sid': int(133),
'order_id': open_orders[0].id
}
self.assertIsNotNone(txn)
for key, value in expected_txn.items():
self.assertEquals(value, txn[key])
# short, does not trade
open_orders = {133: [
open_orders = [
Order(**{
'dt': datetime.datetime(2006, 1, 5, 14, 30, tzinfo=pytz.utc),
'amount': -100,
'filled': 0,
'sid': 133,
'stop': 3.5})
]}
]
txn = slippage_model.simulate(
txns = slippage_model.simulate(
events[0],
open_orders
)
expected_txn = {}
self.assertIsNone(txn)
self.assertEquals(len(txns), 0)
# short, does trade
open_orders = {133: [
open_orders = [
Order(**{
'dt': datetime.datetime(2006, 1, 5, 14, 30, tzinfo=pytz.utc),
'amount': -100,
'filled': 0,
'sid': 133,
'stop': 3.4})
]}
]
txn = slippage_model.simulate(
txns = slippage_model.simulate(
events[1],
open_orders
)
self.assertEquals(len(txns), 1)
txn = txns[0]
expected_txn = {
'price': float(3.499125),
'dt': datetime.datetime(
@@ -281,8 +289,6 @@ class SlippageTestCase(TestCase):
'sid': int(133)
}
self.assertIsNotNone(txn)
for key, value in expected_txn.items():
self.assertEquals(value, txn[key])
@@ -293,7 +299,7 @@ class SlippageTestCase(TestCase):
# long, does not trade
open_orders = {133: [
open_orders = [
Order(**{
'dt': datetime.datetime(2006, 1, 5, 14, 30, tzinfo=pytz.utc),
'amount': 100,
@@ -301,29 +307,25 @@ class SlippageTestCase(TestCase):
'sid': 133,
'stop': 4.0,
'limit': 3.0})
]}
]
txn = slippage_model.simulate(
txns = slippage_model.simulate(
events[2],
open_orders
)
expected_txn = {}
self.assertEquals(len(txns), 0)
self.assertIsNone(txn)
txn = slippage_model.simulate(
txns = slippage_model.simulate(
events[3],
open_orders
)
expected_txn = {}
self.assertIsNone(txn)
self.assertEquals(len(txns), 0)
# long, does trade
open_orders = {133: [
open_orders = [
Order(**{
'dt': datetime.datetime(2006, 1, 5, 14, 30, tzinfo=pytz.utc),
'amount': 100,
@@ -331,22 +333,23 @@ class SlippageTestCase(TestCase):
'sid': 133,
'stop': 4.0,
'limit': 3.5})
]}
]
txn = slippage_model.simulate(
txns = slippage_model.simulate(
events[2],
open_orders
)
expected_txn = {}
self.assertEquals(len(txns), 0)
self.assertIsNone(txn)
txn = slippage_model.simulate(
txns = slippage_model.simulate(
events[3],
open_orders
)
self.assertEquals(len(txns), 1)
txn = txns[0]
expected_txn = {
'price': float(3.500875),
'dt': datetime.datetime(
@@ -355,14 +358,12 @@ class SlippageTestCase(TestCase):
'sid': int(133)
}
self.assertIsNotNone(txn)
for key, value in expected_txn.items():
self.assertEquals(value, txn[key])
# short, does not trade
open_orders = {133: [
open_orders = [
Order(**{
'dt': datetime.datetime(2006, 1, 5, 14, 30, tzinfo=pytz.utc),
'amount': -100,
@@ -370,29 +371,25 @@ class SlippageTestCase(TestCase):
'sid': 133,
'stop': 3.0,
'limit': 4.0})
]}
]
txn = slippage_model.simulate(
txns = slippage_model.simulate(
events[0],
open_orders
)
expected_txn = {}
self.assertEquals(len(txns), 0)
self.assertIsNone(txn)
txn = slippage_model.simulate(
txns = slippage_model.simulate(
events[1],
open_orders
)
expected_txn = {}
self.assertIsNone(txn)
self.assertEquals(len(txns), 0)
# short, does trade
open_orders = {133: [
open_orders = [
Order(**{
'dt': datetime.datetime(2006, 1, 5, 14, 30, tzinfo=pytz.utc),
'amount': -100,
@@ -400,22 +397,23 @@ class SlippageTestCase(TestCase):
'sid': 133,
'stop': 3.0,
'limit': 3.5})
]}
]
txn = slippage_model.simulate(
txns = slippage_model.simulate(
events[0],
open_orders
)
expected_txn = {}
self.assertEquals(len(txns), 0)
self.assertIsNone(txn)
txn = slippage_model.simulate(
txns = slippage_model.simulate(
events[1],
open_orders
)
self.assertEquals(len(txns), 1)
txn = txns[0]
expected_txn = {
'price': float(3.499125),
'dt': datetime.datetime(
@@ -424,8 +422,6 @@ class SlippageTestCase(TestCase):
'sid': int(133)
}
self.assertIsNotNone(txn)
for key, value in expected_txn.items():
self.assertEquals(value, txn[key])
@@ -434,7 +430,6 @@ class SlippageTestCase(TestCase):
events = [
Event({
'volume': 2000,
'TRANSACTION': None,
'type': 4,
'price': 3.0,
'datetime': datetime.datetime(
@@ -450,7 +445,6 @@ class SlippageTestCase(TestCase):
}),
Event({
'volume': 2000,
'TRANSACTION': None,
'type': 4,
'price': 3.5,
'datetime': datetime.datetime(
@@ -466,7 +460,6 @@ class SlippageTestCase(TestCase):
}),
Event({
'volume': 2000,
'TRANSACTION': None,
'type': 4,
'price': 4.0,
'datetime': datetime.datetime(
@@ -482,7 +475,6 @@ class SlippageTestCase(TestCase):
}),
Event({
'volume': 2000,
'TRANSACTION': None,
'type': 4,
'price': 3.5,
'datetime': datetime.datetime(
@@ -498,7 +490,6 @@ class SlippageTestCase(TestCase):
}),
Event({
'volume': 2000,
'TRANSACTION': None,
'type': 4,
'price': 3.0,
'datetime': datetime.datetime(
+6 -4
View File
@@ -66,10 +66,12 @@ class TestAlgo(TradingAlgorithm):
self.ordered = True
self.asserter.assertGreaterEqual(
self.latest_date,
self.slippage.latest_date
)
else:
self.asserter.assertGreaterEqual(
self.latest_date,
self.slippage.latest_date
)
class AlgorithmGeneratorTestCase(TestCase):
+24 -17
View File
@@ -31,13 +31,12 @@ from nose.tools import timed
import zipline.utils.factory as factory
import zipline.utils.simfactory as simfactory
from zipline.gens.tradesimulation import Order
from zipline.gens.tradesimulation import Order, Blotter
import zipline.finance.trading as trading
from zipline.finance.trading import SimulationParameters
from zipline.finance.performance import PerformanceTracker
from zipline.finance.trading import TransactionSimulator
from zipline.utils.test_utils import(
setup_logger,
teardown_logger,
@@ -160,7 +159,12 @@ class FinanceTestCase(TestCase):
def test_full_zipline(self):
#provide enough trades to ensure all orders are filled.
self.zipline_test_config['order_count'] = 100
self.zipline_test_config['trade_count'] = 200
# making a small order amount, so that each order is filled
# in a single transaction, and txn_count == order_count.
self.zipline_test_config['order_amount'] = 25
# No transactions can be filled on the first trade, so
# we have one extra trade to ensure all orders are filled.
self.zipline_test_config['trade_count'] = 101
zipline = simfactory.create_test_zipline(**self.zipline_test_config)
assert_single_position(self, zipline)
@@ -205,7 +209,8 @@ class FinanceTestCase(TestCase):
@timed(DEFAULT_TIMEOUT)
def test_collapsing_orders(self):
# create a scenario where order.amount <<< trade.volume
# to test that several orders can be covered properly by one trade.
# to test that several orders can be covered properly by one trade,
# but are represented by multiple transactions.
params1 = {
'trade_count': 6,
'trade_amount': 100,
@@ -215,8 +220,8 @@ class FinanceTestCase(TestCase):
'order_interval': timedelta(minutes=1),
# because we placed an orders totaling less than 25% of one trade
# the simulator should produce just one transaction.
'expected_txn_count': 1,
'expected_txn_volume': 24 * 1
'expected_txn_count': 24,
'expected_txn_volume': 24
}
self.transaction_sim(**params1)
@@ -228,8 +233,8 @@ class FinanceTestCase(TestCase):
'order_count': 24,
'order_amount': -1,
'order_interval': timedelta(minutes=1),
'expected_txn_count': 1,
'expected_txn_volume': 24 * -1
'expected_txn_count': 24,
'expected_txn_volume': -24
}
self.transaction_sim(**params2)
@@ -242,8 +247,8 @@ class FinanceTestCase(TestCase):
'order_count': 24,
'order_amount': 1,
'order_interval': timedelta(minutes=1),
'expected_txn_count': 1,
'expected_txn_volume': 24 * 1
'expected_txn_count': 24,
'expected_txn_volume': 24
}
self.transaction_sim(**params3)
@@ -285,7 +290,7 @@ class FinanceTestCase(TestCase):
sid = 1
sim_params = factory.create_simulation_parameters()
trade_sim = TransactionSimulator()
blotter = Blotter()
price = [10.1] * trade_count
volume = [100] * trade_count
start_date = sim_params.first_open
@@ -311,7 +316,7 @@ class FinanceTestCase(TestCase):
'dt': order_date
})
trade_sim.place_order(order)
blotter.place_order(order)
order_date = order_date + order_interval
# move after market orders to just after market next
@@ -322,7 +327,7 @@ class FinanceTestCase(TestCase):
order_date = order_date.replace(hour=14, minute=30)
# there should now be one open order list stored under the sid
oo = trade_sim.open_orders
oo = blotter.open_orders
self.assertEqual(len(oo), 1)
self.assertTrue(sid in oo)
order_list = oo[sid]
@@ -340,10 +345,12 @@ class FinanceTestCase(TestCase):
for dt, trades in itertools.groupby(generated_trades,
operator.attrgetter('dt')):
for trade in trades:
trade_sim.update(trade)
if trade.TRANSACTION:
transactions.append(trade.TRANSACTION)
txns = blotter.process_trade(trade)
for txn in txns:
transactions.append(txn)
tracker.process_event(txn)
tracker.process_event(trade)
if complete_fill:
@@ -364,7 +371,7 @@ class FinanceTestCase(TestCase):
self.assertEqual(total_volume, cumulative_pos.amount)
# the open orders should now be empty
oo = trade_sim.open_orders
oo = blotter.open_orders
self.assertTrue(sid in oo)
order_list = oo[sid]
self.assertEqual(0, len(order_list))
+87 -51
View File
@@ -24,11 +24,13 @@ from operator import attrgetter
import zipline.utils.factory as factory
import zipline.finance.performance as perf
from zipline.finance.slippage import Transaction
from zipline.finance.slippage import Transaction, create_transaction
from zipline.gens.composites import date_sorted_sources
from zipline.finance.trading import SimulationParameters
from zipline.gens.tradesimulation import Order
import zipline.finance.trading as trading
from zipline.protocol import DATASOURCE_TYPE
from zipline.utils.factory import create_random_simulation_parameters
onesec = datetime.timedelta(seconds=1)
@@ -36,6 +38,10 @@ oneday = datetime.timedelta(days=1)
tradingday = datetime.timedelta(hours=6, minutes=30)
def create_txn(sid, price, amount, dt):
return create_transaction(sid, amount, price, dt, "fakeuid")
class TestDividendPerformance(unittest.TestCase):
def setUp(self):
@@ -78,8 +84,8 @@ class TestDividendPerformance(unittest.TestCase):
events[2].dt
)
txn = factory.create_txn(1, 10.0, 100, events[0].dt)
events[0].TRANSACTION = txn
txn = create_txn(1, 10.0, 100, events[0].dt)
events.insert(0, txn)
events.insert(1, dividend)
perf_tracker = perf.PerformanceTracker(self.sim_params)
transformed_events = list(perf_tracker.transform(
@@ -136,8 +142,8 @@ class TestDividendPerformance(unittest.TestCase):
)
events.insert(1, dividend)
txn = factory.create_txn(1, 10.0, 100, events[3].dt)
events[3].TRANSACTION = txn
txn = create_txn(1, 10.0, 100, events[3].dt)
events.insert(4, txn)
perf_tracker = perf.PerformanceTracker(self.sim_params)
transformed_events = list(perf_tracker.transform(
((event.dt, [event]) for event in events))
@@ -183,11 +189,11 @@ class TestDividendPerformance(unittest.TestCase):
events[3].dt
)
buy_txn = factory.create_txn(1, 10.0, 100, events[0].dt)
events[0].TRANSACTION = buy_txn
sell_txn = factory.create_txn(1, 10.0, -100, events[2].dt)
events[2].TRANSACTION = sell_txn
events.insert(1, dividend)
buy_txn = create_txn(1, 10.0, 100, events[0].dt)
events.insert(1, buy_txn)
sell_txn = create_txn(1, 10.0, -100, events[3].dt)
events.insert(4, sell_txn)
events.insert(0, dividend)
perf_tracker = perf.PerformanceTracker(self.sim_params)
transformed_events = list(perf_tracker.transform(
((event.dt, [event]) for event in events))
@@ -233,10 +239,10 @@ class TestDividendPerformance(unittest.TestCase):
events[5].dt
)
buy_txn = factory.create_txn(1, 10.0, 100, events[1].dt)
events[1].TRANSACTION = buy_txn
sell_txn = factory.create_txn(1, 10.0, -100, events[2].dt)
events[2].TRANSACTION = sell_txn
buy_txn = create_txn(1, 10.0, 100, events[1].dt)
events.insert(2, buy_txn)
sell_txn = create_txn(1, 10.0, -100, events[3].dt)
events.insert(4, sell_txn)
events.insert(1, dividend)
perf_tracker = perf.PerformanceTracker(self.sim_params)
transformed_events = list(perf_tracker.transform(
@@ -280,11 +286,11 @@ class TestDividendPerformance(unittest.TestCase):
10.00,
events[0].dt,
events[1].dt,
events[-1].dt + 10*oneday
events[-1].dt + 10 * oneday
)
buy_txn = factory.create_txn(1, 10.0, 100, events[1].dt)
events[1].TRANSACTION = buy_txn
buy_txn = create_txn(1, 10.0, 100, events[1].dt)
events.insert(2, buy_txn)
events.insert(1, dividend)
perf_tracker = perf.PerformanceTracker(self.sim_params)
transformed_events = list(perf_tracker.transform(
@@ -334,8 +340,8 @@ class TestDividendPerformance(unittest.TestCase):
events[2].dt
)
txn = factory.create_txn(1, 10.0, -100, self.dt+oneday)
events[0].TRANSACTION = txn
txn = create_txn(1, 10.0, -100, self.dt + oneday)
events.insert(1, txn)
events.insert(0, dividend)
perf_tracker = perf.PerformanceTracker(self.sim_params)
transformed_events = list(perf_tracker.transform(
@@ -431,7 +437,7 @@ class TestPositionPerformance(unittest.TestCase):
self.sim_params
)
txn = factory.create_txn(1, 10.0, 100, self.dt + onesec)
txn = create_txn(1, 10.0, 100, self.dt + onesec)
pp = perf.PerformancePeriod(1000.0)
pp.execute_transaction(txn)
@@ -502,7 +508,7 @@ single short-sale transaction"""
trades_1 = trades[:-2]
txn = factory.create_txn(1, 10.0, -100, self.dt + onesec)
txn = create_txn(1, 10.0, -100, self.dt + onesec)
pp = perf.PerformancePeriod(1000.0)
pp.execute_transaction(txn)
@@ -690,14 +696,14 @@ trade after cover"""
self.sim_params
)
short_txn = factory.create_txn(
short_txn = create_txn(
1,
10.0,
-100,
self.dt + onesec
)
cover_txn = factory.create_txn(1, 7.0, 100, self.dt + onesec * 6)
cover_txn = create_txn(1, 7.0, 100, self.dt + onesec * 6)
pp = perf.PerformancePeriod(1000.0)
pp.execute_transaction(short_txn)
@@ -805,7 +811,7 @@ shares in position"
400
)
saleTxn = factory.create_txn(
saleTxn = create_txn(
1,
10.0,
-100,
@@ -871,6 +877,11 @@ shares in position"
class TestPerformanceTracker(unittest.TestCase):
def setUp(self):
self.sim_params, self.dt, self.end_dt = \
create_random_simulation_parameters()
NumDaysToDelete = collections.namedtuple(
'NumDaysToDelete', ('start', 'middle', 'end'))
@@ -984,11 +995,15 @@ class TestPerformanceTracker(unittest.TestCase):
events = date_sorted_sources(trade_history, trade_history2)
events = [self.event_with_txn(event, trade_history[0].dt)
for event in events]
events = [event for event in
self.trades_with_txns(events, trade_history[0].dt)]
# Extract events with transactions to use for verification.
events_with_txns = [event for event in events if event.TRANSACTION]
txns = [event for event in
events if event.type == DATASOURCE_TYPE.TRANSACTION]
orders = [event for event in
events if event.type == DATASOURCE_TYPE.ORDER]
perf_messages = \
[msg for date, snapshot in
@@ -1002,10 +1017,11 @@ class TestPerformanceTracker(unittest.TestCase):
perf_messages.extend(end_perf_messages)
#we skip two trades, to test case of None transaction
self.assertEqual(perf_tracker.txn_count, len(events_with_txns))
self.assertEqual(perf_tracker.txn_count, len(txns))
self.assertEqual(perf_tracker.txn_count, len(orders))
cumulative_pos = perf_tracker.cumulative_performance.positions[sid]
expected_size = len(events_with_txns) / 2 * -25
expected_size = len(txns) / 2 * -25
self.assertEqual(cumulative_pos.amount, expected_size)
self.assertEqual(perf_tracker.last_close,
@@ -1014,22 +1030,30 @@ class TestPerformanceTracker(unittest.TestCase):
self.assertEqual(len(perf_messages),
sim_params.days_in_period)
def event_with_txn(self, event, no_txn_dt):
#create a transaction for all but
#first trade in each sid, to simulate None transaction
if event.dt != no_txn_dt:
txn = Transaction(**{
'sid': event.sid,
'amount': -25,
'dt': event.dt,
'price': 10.0,
'commission': 0.50
})
else:
txn = None
event['TRANSACTION'] = txn
def trades_with_txns(self, events, no_txn_dt):
for event in events:
return event
#create a transaction for all but
#first trade in each sid, to simulate None transaction
if event.dt != no_txn_dt:
order = Order(**{
'sid': event.sid,
'amount': -25,
'dt': event.dt
})
yield order
yield event
txn = Transaction(**{
'sid': event.sid,
'amount': -25,
'dt': event.dt,
'price': 10.0,
'commission': 0.50,
'order_id': order.id
})
yield txn
else:
yield event
@trading.use_environment(trading.TradingEnvironment())
def test_minute_tracker(self):
@@ -1047,13 +1071,17 @@ class TestPerformanceTracker(unittest.TestCase):
tracker = perf.PerformanceTracker(sim_params)
foo_event_1 = factory.create_trade('foo', 10.0, 20, start_dt)
order_event_1 = Order(**{
'sid': foo_event_1.sid,
'amount': -25,
'dt': foo_event_1.dt
})
bar_event_1 = factory.create_trade('bar', 100.0, 200, start_dt)
txn = Transaction(sid=foo_event_1.sid,
amount=-25,
dt=foo_event_1.dt,
price=10.0,
commission=0.50)
foo_event_1.TRANSACTION = txn
txn_event_1 = Transaction(sid=foo_event_1.sid,
amount=-25,
dt=foo_event_1.dt,
price=10.0,
commission=0.50)
foo_event_2 = factory.create_trade(
'foo', 11.0, 20, start_dt + datetime.timedelta(minutes=1))
@@ -1062,13 +1090,15 @@ class TestPerformanceTracker(unittest.TestCase):
events = [
foo_event_1,
order_event_1,
txn_event_1,
bar_event_1,
foo_event_2,
bar_event_2
]
import operator
messages = {date: snapshot[0].perf_messages[0] for date, snapshot in
messages = {date: snapshot[-1].perf_messages[0] for date, snapshot in
tracker.transform(
itertools.groupby(
events,
@@ -1085,6 +1115,12 @@ class TestPerformanceTracker(unittest.TestCase):
self.assertEquals(0, len(msg_2['intraday_perf']['transactions']),
"The second message should have no transactions.")
self.assertEquals(1, len(msg_1['intraday_perf']['orders']),
"The first message should contain one orders.")
# Check that orders aren't emitted for previous events.
self.assertEquals(0, len(msg_2['intraday_perf']['orders']),
"The second message should have no orders.")
# Ensure that period_close moves through time.
# Also, ensure that the period_closes are the expected dts.
self.assertEquals(foo_event_1.dt,
+1 -1
View File
@@ -339,7 +339,7 @@ class TradingAlgorithm(object):
Set the method that will be called to create a
transaction from open orders and trade events.
"""
self.trading_client.ordering_client.transact = transact
self.trading_client.blotter.transact = transact
def set_slippage(self, slippage):
if not isinstance(slippage, (VolumeShareSlippage, FixedSlippage)):
+45 -11
View File
@@ -173,6 +173,7 @@ class PerformanceTracker(object):
# don't save the transactions for the cumulative
# period
keep_transactions=False,
keep_orders=False,
# don't serialize positions for cumualtive period
serialize_positions=False
)
@@ -185,6 +186,7 @@ class PerformanceTracker(object):
self.market_open,
self.market_close,
keep_transactions=True,
keep_orders=True,
serialize_positions=True
)
@@ -280,23 +282,36 @@ class PerformanceTracker(object):
elif self.emission_rate == 'minute':
messages.append(self.to_dict())
if event.TRANSACTION:
self.txn_count += 1
self.cumulative_performance.execute_transaction(
event.TRANSACTION
)
self.todays_performance.execute_transaction(event.TRANSACTION)
#update last sale
self.cumulative_performance.update_last_sale(event)
self.todays_performance.update_last_sale(event)
del event['TRANSACTION']
elif event.type == zp.DATASOURCE_TYPE.TRANSACTION:
# Trade simulation always follows a transaction with the
# TRADE event that was used to simulate it, so we don't
# check for end of day rollover messages here.
self.txn_count += 1
self.cumulative_performance.execute_transaction(
event
)
self.todays_performance.execute_transaction(event)
# Transactions are consumed by performance, and not
# relayed to the next element in the generator chain.
messages = None
elif event.type == zp.DATASOURCE_TYPE.DIVIDEND:
self.cumulative_performance.add_dividend(event)
self.todays_performance.add_dividend(event)
# this event will not be relayed up
# Dividends are consumed by performance, and not
# relayed to the next element in the generator chain.
messages = None
elif event.type == zp.DATASOURCE_TYPE.ORDER:
self.cumulative_performance.record_order(event)
self.todays_performance.record_order(event)
messages = None
elif event.type == zp.DATASOURCE_TYPE.CUSTOM:
# we just want to relay this event unchanged.
messages = []
@@ -476,6 +491,7 @@ class PerformancePeriod(object):
period_open=None,
period_close=None,
keep_transactions=True,
keep_orders=False,
serialize_positions=True):
self.period_open = period_open
@@ -492,6 +508,8 @@ class PerformancePeriod(object):
self.ending_cash = starting_cash
self.keep_transactions = keep_transactions
self.processed_transactions = []
self.keep_orders = keep_orders
self.placed_orders = []
self.cumulative_capital_used = 0.0
self.max_capital_used = 0.0
self.max_leverage = 0.0
@@ -517,6 +535,8 @@ class PerformancePeriod(object):
self.period_cash_flow = 0.0
self.pnl = 0.0
self.processed_transactions = []
self.placed_orders = \
[order for order in self.placed_orders if order.open]
self.cumulative_capital_used = 0.0
self.max_capital_used = 0.0
self.max_leverage = 0.0
@@ -576,6 +596,10 @@ class PerformancePeriod(object):
else:
self.returns = 0.0
def record_order(self, order):
if self.keep_orders:
self.placed_orders.append(order)
def execute_transaction(self, txn):
# Update Position
# ----------------
@@ -664,14 +688,24 @@ class PerformancePeriod(object):
if self.keep_transactions:
if dt:
# Only include transactions for given dt
transactions = [x.__dict__
transactions = [x.to_dict()
for x in self.processed_transactions
if x.dt == dt]
else:
transactions = [x.__dict__
transactions = [x.to_dict()
for x in self.processed_transactions]
rval['transactions'] = transactions
if self.keep_orders:
if dt:
# only include orders modified as of the given dt.
orders = [x.to_dict()
for x in self.placed_orders
if x.last_modified_dt == dt]
else:
orders = [x.to_dict() for x in self.placed_orders]
rval['orders'] = orders
return rval
def as_portfolio(self):
+109 -113
View File
@@ -12,18 +12,51 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import timedelta
import pytz
import math
from copy import copy
from functools import partial
from zipline.protocol import DATASOURCE_TYPE
import numpy as np
from logbook import Processor
def check_order_triggers(order, event):
"""
Given an order and a trade event, return a tuple of
(stop_reached, limit_reached).
For market orders, will return (False, False).
For stop orders, limit_reached will always be False.
For limit orders, stop_reached will always be False.
Orders that have been triggered already (price targets reached),
the order's current values are returned.
"""
if order.triggered:
return (order.stop_reached, order.limit_reached)
stop_reached = False
limit_reached = False
# if the stop price is reached, simply set stop_reached
if order.stop is not None:
if (order.direction * (event.price - order.stop) <= 0):
# convert stop -> limit or market
stop_reached = True
# if the limit price is reached, we execute this order at
# (event.price + simulated_impact)
# we skip this order with a continue when the limit is not reached
if order.limit is not None:
# if limit conditions not met, then continue
if (order.direction * (event.price - order.limit) <= 0):
limit_reached = True
return (stop_reached, limit_reached)
def transact_stub(slippage, commission, event, open_orders):
"""
This is intended to be wrapped in a partial, so that the
@@ -35,13 +68,15 @@ def transact_stub(slippage, commission, event, open_orders):
with Processor(inject_algo_dt).threadbound():
transaction = slippage.simulate(event, open_orders)
if transaction and not np.allclose(transaction.amount, 0):
direction = math.copysign(1, transaction.amount)
per_share, total_commission = commission.calculate(transaction)
transaction.price = transaction.price + (per_share * direction)
transaction.commission = total_commission
return transaction
transactions = slippage.simulate(event, open_orders)
for transaction in transactions:
if transaction and not np.allclose(transaction.amount, 0):
direction = math.copysign(1, transaction.amount)
per_share, total_commission = commission.calculate(transaction)
transaction.price = transaction.price + (per_share * direction)
transaction.commission = total_commission
return transactions
def transact_partial(slippage, commission):
@@ -50,24 +85,32 @@ def transact_partial(slippage, commission):
class Transaction(object):
def __init__(self, sid, amount, dt, price, commission=None):
def __init__(self, sid, amount, dt, price, order_id=None, commission=None):
self.sid = sid
self.amount = amount
self.dt = dt
self.price = price
self.order_id = order_id
self.commission = commission
self.type = DATASOURCE_TYPE.TRANSACTION
def __getitem__(self, name):
return self.__dict__[name]
def to_dict(self):
py = copy(self.__dict__)
del py['type']
return py
def create_transaction(sid, amount, price, dt):
def create_transaction(sid, amount, price, dt, order_id):
txn = {
'sid': sid,
'amount': int(amount),
'dt': dt,
'price': price,
'order_id': order_id
}
transaction = Transaction(**txn)
@@ -78,35 +121,19 @@ class VolumeShareSlippage(object):
def __init__(self,
volume_limit=.25,
price_impact=0.1,
delay=timedelta(minutes=1)):
price_impact=0.1):
self.volume_limit = volume_limit
self.price_impact = price_impact
self.delay = delay
def simulate(self, event, open_orders):
if np.allclose(event.volume, 0):
#there are zero volume events bc some stocks trade
#less frequently than once per minute.
return None
if event.sid in open_orders:
orders = open_orders[event.sid]
orders = sorted(orders, key=lambda o: o.dt)
# Only use orders for the current day or before
current_orders = filter(
lambda o: o.dt + self.delay <= event.dt,
orders)
else:
return None
def simulate(self, event, current_orders):
dt = event.dt
total_order = 0
simulated_amount = 0
simulated_impact = 0.0
max_volume = self.volume_limit * event.volume
total_volume = 0
txns = []
for order in current_orders:
open_amount = order.amount - order.filled
@@ -114,64 +141,49 @@ class VolumeShareSlippage(object):
if np.allclose(open_amount, 0):
continue
direction = math.copysign(1, open_amount)
# check price limits, continue if the
# order isn't triggered yet
order.check_triggers(event)
if not order.triggered:
continue
# if the stop price is reached, simply set stop to None
# othrewise we skip this order with a continue
if order.stop is not None:
if (direction * (event.price - order.stop) < 0):
# convert stop -> limit or market
order.stop = None
else:
continue
# price impact accounts for the total volume of transactions
# created against the current minute bar
remaining_volume = max_volume - total_volume
if remaining_volume <= 0 or np.allclose(remaining_volume, 0):
# we can't fill any more transactions
return txns
# if the limit price is reached, we execute this order at
# (event.price + simulated_impact)
# we skip this order with a continue when the limit is not reached
if order.limit is not None:
# if limit conditions not met, then continue
if (direction * (event.price - order.limit) > 0):
continue
# the current order amount will be the min of the
# volume available in the bar or the open amount.
cur_amount = min(remaining_volume, abs(open_amount))
cur_amount = cur_amount * order.direction
# tally the current amount into our total amount ordered.
# total amount will be used to calculate price impact
total_volume = total_volume + order.direction * cur_amount
desired_order = total_order + open_amount
volume_share = min(direction * (desired_order) / event.volume,
volume_share = min(order.direction * (total_volume) / event.volume,
self.volume_limit)
if np.allclose(volume_share, self.volume_limit):
simulated_amount = \
int(self.volume_limit * event.volume * direction)
else:
# we can fill the entire desired order
# let's not deal with floating-point errors
simulated_amount = desired_order
simulated_impact = (volume_share) ** 2 \
* self.price_impact * direction * event.price
* self.price_impact * order.direction * event.price
order.filled += (simulated_amount - total_order)
total_order = simulated_amount
# we cap the volume share at configured % of a trade
if np.allclose(volume_share, self.volume_limit):
break
filled_orders = [x for x in orders
if abs(x.amount - x.filled) > 0
and x.dt.day >= event.dt.day]
open_orders[event.sid] = filled_orders
if simulated_amount != 0:
return create_transaction(
txn = create_transaction(
event.sid,
simulated_amount,
cur_amount,
# In the future, we may want to change the next line
# for limit pricing
event.price + simulated_impact,
dt.replace(tzinfo=pytz.utc),
order.id
)
# mark the last_modified date of the order to match
order.last_modified_dt = event.dt
txns.append(txn)
return txns
class FixedSlippage(object):
@@ -183,48 +195,32 @@ class FixedSlippage(object):
"""
self.spread = spread
def simulate(self, event, open_orders):
if event.sid in open_orders:
orders = open_orders[event.sid]
orders = sorted(orders, key=lambda o: o.dt)
else:
return None
def simulate(self, event, orders):
amount = 0
txns = []
for order in orders:
# what if we have 2 orders, one for 100 shares long,
# TODO: what if we have 2 orders, one for 100 shares long,
# and one for 100 shares short
# such as in a hedging scenario?
amount += order.amount
direction = math.copysign(1, amount)
# if the stop price is reached, simply set stop to None
# othrewise we skip this order with a continue
if order.stop is not None:
if (direction * (event.price - order.stop) < 0):
# convert stop -> limit or market
order.stop = None
else:
continue
# check price limits, continue if the
# order isn't triggered yet
order.check_triggers(event)
if not order.triggered:
continue
# if the limit price is reached, we execute this order at
# (event.price + simulated_impact)
# we skip this order with a continue when the limit is not reached
if order.limit is not None:
# if limit conditions not met, then continue
if (direction * (event.price - order.limit) > 0):
continue
if np.allclose(order.amount, 0):
return txns
if np.allclose(amount, 0):
return
txn = create_transaction(
event.sid,
order.amount,
event.price + (self.spread / 2.0 * order.direction),
event.dt.replace(tzinfo=pytz.utc),
order.id
)
txn = create_transaction(
event.sid,
amount,
event.price + (self.spread / 2.0 * direction),
event.dt
)
open_orders[event.sid] = []
return txn
# mark the last_modified date of the order to match
order.last_modified = event.dt
txns.append(txn)
return txns
+1 -33
View File
@@ -19,19 +19,13 @@ import logbook
import datetime
from functools import wraps
from collections import defaultdict, OrderedDict
from delorean import Delorean
import pandas as pd
from pandas import DatetimeIndex
from collections import OrderedDict
from zipline.data.loader import load_market_data
import zipline.protocol as zp
from zipline.finance.slippage import (
VolumeShareSlippage,
transact_partial
)
from zipline.finance.commission import PerShare
log = logbook.Logger('Transaction Simulator')
@@ -75,32 +69,6 @@ log = logbook.Logger('Transaction Simulator')
environment = None
class TransactionSimulator(object):
def __init__(self):
self.transact = transact_partial(VolumeShareSlippage(), PerShare())
self.open_orders = defaultdict(list)
def place_order(self, order):
# initialized filled field.
order.filled = 0
self.open_orders[order.sid].append(order)
def transform(self, stream_in):
"""
Main generator work loop.
"""
for date, snapshot in stream_in:
yield date, [self.update(event) for event in snapshot]
def update(self, event):
event.TRANSACTION = None
# We only fill transactions on trade events.
if event.type == zp.DATASOURCE_TYPE.TRADE:
event.TRANSACTION = self.transact(event, self.open_orders)
return event
class TradingEnvironment(object):
def __init__(
+154 -15
View File
@@ -13,19 +13,110 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import math
import numpy as np
import uuid
from copy import copy
from logbook import Logger, Processor
from collections import defaultdict
from zipline import ndict
from zipline.protocol import SIDData
from zipline.finance.trading import TransactionSimulator
from zipline.protocol import SIDData, DATASOURCE_TYPE
from zipline.finance.performance import PerformanceTracker
from zipline.gens.utils import hash_args
from zipline.finance.slippage import (
VolumeShareSlippage,
transact_partial,
check_order_triggers
)
from zipline.finance.commission import PerShare
log = Logger('Trade Simulation')
from zipline.utils.protocol_utils import Enum
ORDER_STATUS = Enum(
'OPEN',
'FILLED'
)
class Blotter(object):
def __init__(self):
self.transact = transact_partial(VolumeShareSlippage(), PerShare())
# these orders are aggregated by sid
self.open_orders = defaultdict(list)
# keep a dict of orders by their own id
self.orders = {}
# track transactions by sid and by order
self.txns_by_sid = defaultdict(list)
self.txns_by_order = defaultdict(list)
# holding orders that have come in since the last
# event.
self.new_orders = []
def place_order(self, order):
# initialized filled field.
order.filled = 0
self.open_orders[order.sid].append(order)
self.orders[order.id] = order
self.new_orders.append(order)
def transform(self, stream_in):
"""
Main generator work loop.
"""
for date, snapshot in stream_in:
# relay any orders placed in prior snapshot
# handling and reset the internal holding pen
results = self.new_orders
self.new_orders = []
for event in snapshot:
results.append(event)
# We only fill transactions on trade events.
if event.type == DATASOURCE_TYPE.TRADE:
txns = self.process_trade(event)
results.extend(txns)
yield date, results
def process_trade(self, trade_event):
if np.allclose(trade_event.volume, 0):
# there are zero volume trade_events bc some stocks trade
# less frequently than once per minute.
return []
if trade_event.sid in self.open_orders:
orders = self.open_orders[trade_event.sid]
orders = sorted(orders, key=lambda o: o.dt)
# Only use orders for the current day or before
current_orders = filter(
lambda o: o.dt <= trade_event.dt,
orders)
else:
return []
txns = self.transact(trade_event, current_orders)
for txn in txns:
self.txns_by_order[txn.order_id].append(txn)
self.txns_by_sid[txn.sid].append(txn)
self.orders[txn.order_id].filled += txn.amount
# update the open orders for the trade_event's sid
self.open_orders[trade_event.sid] = \
[order for order in orders if order.open]
# drop any filled orders.
filled = \
[order.id for order in orders if not order.open]
for order_id in filled:
del self.orders[order_id]
return txns
class Order(object):
def __init__(self, dt, sid, amount, stop=None, limit=None, filled=0):
@@ -37,12 +128,61 @@ class Order(object):
a negative sign indicates a sell
@filled - how many shares of the order have been filled so far
"""
# get a string representation of the uuid.
self.id = uuid.uuid4().get_hex()
self.dt = dt
self.last_modified_dt = dt
self.sid = sid
self.amount = amount
self.filled = filled
self.status = ORDER_STATUS.OPEN
self.stop = stop
self.limit = limit
self.stop_reached = False
self.limit_reached = False
self.direction = math.copysign(1, self.amount)
self.type = DATASOURCE_TYPE.ORDER
def to_dict(self):
py = copy(self.__dict__)
for field in ['type', 'direction']:
del py[field]
return py
def check_triggers(self, event):
"""
Update internal state based on price triggers and the
trade event's price.
"""
self.last_modified_dt = event.dt
self.stop_reached, self.limit_reached = \
check_order_triggers(self, event)
@property
def open(self):
remainder = self.amount - self.filled
if remainder != 0:
self.status = ORDER_STATUS.OPEN
else:
self.status = ORDER_STATUS.FILLED
return self.status == ORDER_STATUS.OPEN
@property
def triggered(self):
"""
For a market order, True.
For a stop order, True IFF stop_reached.
For a limit order, True IFF limit_reached.
For a stop-limit order, True IFF (stp_reached AND limit_reached)
"""
if self.stop and not self.stop_reached:
return False
if self.limit and not self.limit_reached:
return False
return True
def __getitem__(self, name):
return self.__dict__[name]
@@ -82,17 +222,19 @@ class TradeSimulationClient(object):
is sent to the algo.
"""
def __init__(self, algo, sim_params):
def __init__(self, algo, sim_params, blotter=None):
self.algo = algo
self.sim_params = sim_params
self.ordering_client = TransactionSimulator()
if not blotter:
self.blotter = Blotter()
self.perf_tracker = PerformanceTracker(self.sim_params)
self.algo_start = self.sim_params.first_open
self.algo_sim = AlgorithmSimulator(
self.ordering_client,
self.blotter,
self.perf_tracker,
self.algo,
self.algo_start
@@ -113,7 +255,7 @@ class TradeSimulationClient(object):
# Simulate filling any open orders made by the previous run of
# the user's algorithm. Fills the Transaction field on any
# event that results in a filled order.
with_filled_orders = self.ordering_client.transform(stream_in)
with_filled_orders = self.blotter.transform(stream_in)
# Pipe the events with transactions to perf. This will remove
# the TRANSACTION field added by TransactionSimulator and replace it
@@ -143,7 +285,7 @@ class AlgorithmSimulator(object):
}
def __init__(self,
order_book,
blotter,
perf_tracker,
algo,
algo_start):
@@ -154,7 +296,7 @@ class AlgorithmSimulator(object):
# We extract the order book from the txn client so that
# the algo can place new orders.
self.order_book = order_book
self.blotter = blotter
self.perf_tracker = perf_tracker
self.perf_key = self.EMISSION_TO_PERF_KEY_MAP[
@@ -230,13 +372,10 @@ class AlgorithmSimulator(object):
# Add non-zero orders to the order book.
# !!!IMPORTANT SIDE-EFFECT!!!
# This modifies the internal state of the transaction
# simulator so that it can fill the placed order when it
# This modifies the internal state of the blotter
# so that it can fill the placed order when it
# receives its next message.
err_str = self.order_book.place_order(order)
if err_str is not None and len(err_str) > 0:
# error, trade was not placed, log it out
log.debug(err_str)
self.blotter.place_order(order)
def transform(self, stream_in):
"""
+2
View File
@@ -26,6 +26,8 @@ DATASOURCE_TYPE = Enum(
'SPLIT',
'DIVIDEND',
'TRADE',
'TRANSACTION',
'ORDER',
'EMPTY',
'DONE',
'CUSTOM'
+1
View File
@@ -174,6 +174,7 @@ def create_txn(sid, price, amount, datetime):
'amount': amount,
'dt': datetime,
'price': price,
'type': DATASOURCE_TYPE.TRANSACTION
})
return txn
+17
View File
@@ -3,6 +3,7 @@ import blist
from zipline.utils.date_utils import EPOCH
from itertools import izip
from logbook import FileHandler
from zipline.gens.tradesimulation import ORDER_STATUS
def setup_logger(test, path='test.log'):
@@ -92,6 +93,22 @@ def assert_single_position(test, zipline):
# dicts.
closing_positions = output[-2]['daily_perf']['positions']
# confirm that all orders were filled.
# iterate over the output updates, overwriting
# orders when they are updated. Then check the status on all.
orders_by_id = {}
for update in output:
if 'daily_perf' in update:
if 'orders' in update['daily_perf']:
for order in update['daily_perf']['orders']:
orders_by_id[order['id']] = order
for order in orders_by_id.itervalues():
test.assertEqual(
order['status'],
ORDER_STATUS.FILLED,
"")
test.assertEqual(
len(closing_positions),
1,