Merge pull request #32 from quantopian/fawce_sprint1

Fawce sprint1
This commit is contained in:
fawce
2012-04-16 08:10:17 -07:00
10 changed files with 638 additions and 144 deletions
+2 -1
View File
@@ -75,7 +75,8 @@ class Component(object):
self.out_socket = None
self.killed = False
self.controller = None
self.heartbeat_timeout = 2000
# timeout after a full minute
self.heartbeat_timeout = 60 *1000
self.state_flag = COMPONENT_STATE.OK
self.error_state = COMPONENT_FAILURE.NOFAILURE
self.on_done = None
+44 -28
View File
@@ -160,14 +160,13 @@ class PerformanceTracker():
self.total_days = self.trading_environment.days_in_period
# one indexed so that we reach 100%
self.day_count = 0.0
self.cumulative_capital_used = 0.0
self.max_capital_used = 0.0
self.capital_base = self.trading_environment.capital_base
self.returns = []
self.txn_count = 0
self.event_count = 0
self.result_stream = None
self.last_dict = None
self.order_log = []
# this performance period will span the entire simulation.
self.cumulative_performance = PerformancePeriod(
@@ -219,8 +218,8 @@ class PerformanceTracker():
'period_start' : self.period_start,
'period_end' : self.period_end,
'progress' : self.progress,
'cumulative_captial_used' : self.cumulative_capital_used,
'max_capital_used' : self.max_capital_used,
'cumulative_captial_used' : self.cumulative_perf.cumulative_capital_used,
'max_capital_used' : self.cumulative_perf.max_capital_used,
'last_close' : self.market_close,
'last_open' : self.market_open,
'capital_base' : self.capital_base,
@@ -230,38 +229,26 @@ class PerformanceTracker():
'cumulative_risk_metrics' : self.cumulative_risk_metrics.to_dict(),
'timestamp' : datetime.datetime.now(),
}
def log_order(self, order):
self.order_log.append(order)
def process_event(self, event):
assert isinstance(event, zp.namedict)
self.event_count += 1
if(event.dt >= self.market_close):
self.handle_market_close()
if not pandas.isnull(event.TRANSACTION):
if event.TRANSACTION:
self.txn_count += 1
self.cumulative_performance.execute_transaction(event.TRANSACTION)
self.todays_performance.execute_transaction(event.TRANSACTION)
# we're adding a 10% cushion to the capital used,
# and then rounding to the nearest 5k
transaction_cost = event.TRANSACTION.price * event.TRANSACTION.amount
self.cumulative_capital_used += transaction_cost
if math.fabs(self.cumulative_capital_used) > self.max_capital_used:
self.max_capital_used = math.fabs(self.cumulative_capital_used)
cushioned_capital = 1.1 * self.max_capital_used
self.max_capital_used = self.round_to_nearest(
cushioned_capital,
base=5000
)
self.max_leverage = self.max_capital_used / self.capital_base
#update last sale
self.cumulative_performance.update_last_sale(event)
self.todays_performance.update_last_sale(event)
def handle_market_close(self):
#calculate performance as of last trade
@@ -317,13 +304,14 @@ class PerformanceTracker():
and send it out on the result_stream.
"""
log_msg = "Simulated {n} trading days out of {m}."
qutil.LOGGER.info(log_msg.format(n=self.day_count, m=self.total_days))
qutil.LOGGER.info("first open: {d}".format(d=self.trading_environment.first_open))
# the stream will end on the last trading day, but will not trigger
# an end of day, so we trigger the final market close here.
self.handle_market_close()
log_msg = "Simulated {n} trading days out of {m}."
qutil.LOGGER.info(log_msg.format(n=self.day_count, m=self.total_days))
qutil.LOGGER.info("first open: {d}".format(d=self.trading_environment.first_open))
self.risk_report = risk.RiskReport(
self.returns,
@@ -338,9 +326,6 @@ class PerformanceTracker():
# this signals that the simulation is complete.
self.result_stream.send("DONE")
def round_to_nearest(self, x, base=5):
return int(base * round(float(x)/base))
class Position():
@@ -409,6 +394,8 @@ class PerformancePeriod():
self.starting_cash = starting_cash
self.ending_cash = starting_cash
self.processed_transactions = []
self.cumulative_capital_used = 0.0
self.max_capital_used = 0.0
self.calculate_performance()
@@ -426,11 +413,40 @@ class PerformancePeriod():
self.returns = 0.0
def execute_transaction(self, txn):
# Update Position
# ----------------
if(not self.positions.has_key(txn.sid)):
self.positions[txn.sid] = Position(txn.sid)
self.positions[txn.sid].update(txn)
self.period_capital_used += -1 * txn.price * txn.amount
# Max Leverage
# ---------------
# Calculate the maximum capital used and maximum leverage
transaction_cost = txn.price * txn.amount
self.cumulative_capital_used += transaction_cost
if math.fabs(self.cumulative_capital_used) > self.max_capital_used:
self.max_capital_used = math.fabs(self.cumulative_capital_used)
# We want to conveye a level, rather than a precise figure.
# round to the nearest 5,000 to keep the number easy on the eyes
self.max_capital_used = self.round_to_nearest(
self.max_capital_used,
base=5000
)
# we're adding a 10% cushion to the capital used.
self.max_leverage = 1.1 * self.max_capital_used / self.starting_cash
# add transaction to the list of processed transactions
self.processed_transactions.append(txn)
def round_to_nearest(self, x, base=5):
return int(base * round(float(x)/base))
def calculate_positions_value(self):
mktValue = 0.0
+219 -75
View File
@@ -3,6 +3,8 @@ import pytz
import math
import pandas
from collections import Counter
# from gevent.select import select
from zmq.core.poll import select
@@ -11,6 +13,17 @@ import zipline.util as qutil
import zipline.protocol as zp
import zipline.finance.performance as perf
from zipline.protocol_utils import Enum, namedict
# the simulation style enumerates the available transaction simulation
# strategies.
SIMULATION_STYLE = Enum(
'PARTIAL_VOLUME',
'BUY_ALL',
'FIXED_SLIPPAGE',
'NOOP'
)
class TradeSimulationClient(qmsg.Component):
def __init__(self, trading_environment):
@@ -19,10 +32,13 @@ class TradeSimulationClient(qmsg.Component):
self.prev_dt = None
self.event_queue = None
self.txn_count = 0
self.order_count = 0
self.trading_environment = trading_environment
self.current_dt = trading_environment.period_start
self.last_iteration_dur = datetime.timedelta(seconds=0)
self.algorithm = None
self.attempts = 0
self.max_attempts = 1000
assert self.trading_environment.frame_index != None
self.event_frame = pandas.DataFrame(
@@ -47,8 +63,11 @@ class TradeSimulationClient(qmsg.Component):
def open(self):
self.result_feed = self.connect_result()
self.order_socket = self.connect_order()
# send a wake up call to the order data source.
self.order_socket.send(str(zp.ORDER_PROTOCOL.BREAK))
def do_work(self):
# poll all the sockets
socks = dict(self.poll.poll(self.heartbeat_timeout))
@@ -56,6 +75,8 @@ class TradeSimulationClient(qmsg.Component):
if self.result_feed in socks and \
socks[self.result_feed] == self.zmq.POLLIN:
self.attempts = 0
# get the next message from the result feed
msg = self.result_feed.recv()
@@ -65,8 +86,7 @@ class TradeSimulationClient(qmsg.Component):
# signal the performance tracker that the simulation has
# ended. Perf will internally calculate the full risk report.
self.perf.handle_simulation_end()
# shutdown the feedback loop to the OrderDataSource
self.signal_order_done()
# signal Simulator, our ComponentHost, that this component is
# done and Simulator needn't block exit on this component.
self.signal_done()
@@ -74,13 +94,21 @@ class TradeSimulationClient(qmsg.Component):
# result_feed is a merge component, so unframe accordingly
event = zp.MERGE_UNFRAME(msg)
self.received_count += 1
# update performance and relay the event to the algorithm
self.process_event(event)
# signal done to order source.
# signal loop is done for order source.
self.order_socket.send(str(zp.ORDER_PROTOCOL.BREAK))
else:
# no events in the sock means the non-order sources are
# drained. Signal the order_source that we're done, and
# the done will cascade through the whole zipline.
# shutdown the feedback loop to the OrderDataSource
if self.attempts > self.max_attempts:
self.signal_order_done()
else:
self.attempts += 1
def process_event(self, event):
# track the number of transactions, for testing purposes.
if(event.TRANSACTION != None):
@@ -107,12 +135,15 @@ class TradeSimulationClient(qmsg.Component):
# otherwise, the algorithm has fallen behind the feed
# and processing per event is longer than time between events.
if event.dt >= self.current_dt:
# compress time by moving the current_time up to the event
# time.
self.current_dt = event.dt
self.run_algorithm()
# tally the time spent on this iteration
self.last_iteration_dur = datetime.datetime.utcnow() - event_start
# move the algorithm's clock forward to include iteration time
self.current_dt = self.current_dt + self.last_iteration_dur
self.current_dt = self.current_dt + self.last_iteration_dur
def run_algorithm(self):
@@ -132,13 +163,15 @@ class TradeSimulationClient(qmsg.Component):
return self.connect_push_socket(self.addresses['order_address'])
def order(self, sid, amount):
order = zp.namedict({
'dt':self.current_dt,
'sid':sid,
'amount':amount
})
self.order_socket.send(zp.ORDER_FRAME(order))
self.order_count += 1
self.perf.log_order(order)
def signal_order_done(self):
self.order_socket.send(str(zp.ORDER_PROTOCOL.DONE))
@@ -172,6 +205,8 @@ class OrderDataSource(qmsg.DataSource):
"""
qmsg.DataSource.__init__(self, zp.FINANCE_COMPONENT.ORDER_SOURCE)
self.sent_count = 0
self.recv_count = Counter()
self.works = 0
@property
def get_type(self):
@@ -181,9 +216,10 @@ class OrderDataSource(qmsg.DataSource):
@property
def is_blocking(self):
"""
This datasource is in a loop with the TradingSimulationClient
This datasource is in a loop with the TradingSimulationClient,
so we don't want it to block processing.
"""
return False
return True
def open(self):
qmsg.DataSource.open(self)
@@ -194,23 +230,20 @@ class OrderDataSource(qmsg.DataSource):
def do_work(self):
#TODO: if this is the first iteration, break deadlock by sending a dummy order
if(self.sent_count == 0):
self.send(zp.namedict({}))
#pull all orders from client.
orders = []
count = 0
self.works += 1
# TODO : this can be written in a concurrency agnostic
# way... have a chat with Fawce about this ~Steve
#pull all orders from client.
count = 0
# one iteration of the client could include several orders
# so iterate until the client signals a break or a close.
while True:
# poll all the sockets
# we reduce the timeout here by a factor of 2, because we need
# to potentially receive the client's done message before the
# controller or heartbeat times out.
# TODO: shouldn't this block until we receive a message?
socks = dict(self.poll.poll(self.heartbeat_timeout/2))
# see if the poller has results for the result_feed
@@ -220,39 +253,59 @@ class OrderDataSource(qmsg.DataSource):
order_msg = self.order_socket.recv()
if order_msg == str(zp.ORDER_PROTOCOL.DONE):
qutil.LOGGER.info("order source is done")
self.signal_done()
self.recv_count['done'] += 1
return
if order_msg == str(zp.ORDER_PROTOCOL.BREAK):
# send a blank message to avoid an empty buffer
# in the feed
self.recv_count['break'] += 1
if count == 0:
self.send(namedict({}))
break
order = zp.ORDER_UNFRAME(order_msg)
self.recv_count['order'] += 1
#send the order along
self.send(order)
count += 1
self.sent_count += 1
else:
# no orders, break out
break
#TODO: we have to send at least one dummy order per do_work iteration
# or the feed will block waiting for our messages.
if(count == 0):
self.send(zp.namedict({}))
class TransactionSimulator(qmsg.BaseTransform):
def __init__(self):
def __init__(self, style=SIMULATION_STYLE.PARTIAL_VOLUME):
qmsg.BaseTransform.__init__(self, zp.TRANSFORM_TYPE.TRANSACTION)
self.open_orders = {}
self.order_count = 0
self.txn_count = 0
self.trade_window = datetime.timedelta(seconds=30)
self.trade_window = datetime.timedelta(seconds=30)
self.orderTTL = datetime.timedelta(days=1)
self.volume_share = 0.05
self.commission = 0.03
if not style or style == SIMULATION_STYLE.PARTIAL_VOLUME:
self.apply_trade_to_open_orders = self.simulate_with_partial_volume
elif style == SIMULATION_STYLE.BUY_ALL:
self.apply_trade_to_open_orders = self.simulate_buy_all
elif style == SIMULATION_STYLE.FIXED_SLIPPAGE:
self.apply_trade_to_open_orders = self.simulate_with_fixed_cost
elif style == SIMULATION_STYLE.NOOP:
self.apply_trade_to_open_orders = self.simulate_noop
#
@property
def is_blocking(self):
"""
Including this explicitly for clarity, even though we are using the
default value. TransactionSimulator has a defined action for every
event type. Downstream components depend on the presence of the
TRANSACTION transform in all cases. When no transaction happens,
None is the value. Thus, we do want merging to block on the
availability of transaction messages.
"""
return True
def transform(self, event):
"""
@@ -267,9 +320,12 @@ class TransactionSimulator(qmsg.BaseTransform):
self.state['value'] = txn
else:
self.state['value'] = None
qutil.LOGGER.info("unexpected event type in transform: {etype}".format(etype=event.type))
log = "unexpected event type in transform: {etype}".format(
etype=event.type
)
qutil.LOGGER.info(log)
#TODO: what to do if we get another kind of datasource event.type?
return self.state
def add_open_order(self, event):
@@ -277,63 +333,143 @@ class TransactionSimulator(qmsg.BaseTransform):
Amount is explicitly converted to an int.
Orders of amount zero are ignored.
"""
self.order_count += 1
event.amount = int(event.amount)
if event.amount == 0:
qutil.LOGGER.debug("requested to trade zero shares of {sid}".format(sid=event.sid))
log = "requested to trade zero shares of {sid}".format(
sid=event.sid
)
qutil.LOGGER.debug(log)
return
self.order_count += 1
if(not self.open_orders.has_key(event.sid)):
self.open_orders[event.sid] = []
# set the filled property to zero
event.filled = 0
self.open_orders[event.sid].append(event)
def apply_trade_to_open_orders(self, event):
def simulate_buy_all(self, event):
txn = self.create_transaction(
event.sid,
event.volume,
event.price,
event.dt,
1
)
return txn
if(event.volume == 0):
#there are zero volume events bc some stocks trade
#less frequently than once per minute.
return self.create_dummy_txn(event.dt)
def simulate_noop(self, event):
return None
def simulate_with_fixed_cost(self, event):
if self.open_orders.has_key(event.sid):
orders = self.open_orders[event.sid]
orders = sorted(orders, key=lambda o: o.dt)
else:
return None
remaining_orders = []
total_order = 0
dt = event.dt
amount = 0
for order in orders:
#we're using minute bars, so allow orders within
#30 seconds of the trade
if((order.dt - event.dt) < self.trade_window):
total_order += order.amount
if(order.dt > dt):
dt = order.dt
#if the order still has time to live (TTL) keep track
elif((self.algo_time - order.dt) < self.orderTTL):
remaining_orders.append(order)
self.open_orders[event.sid] = remaining_orders
if(total_order != 0):
direction = total_order / math.fabs(total_order)
else:
direction = 1
amount += order.amount
if(amount == 0):
return
volume_share = (direction * total_order) / event.volume
if volume_share > .25:
volume_share = .25
amount = volume_share * event.volume * direction
impact = (volume_share)**2 * .1 * direction * event.price
return self.create_transaction(
event.sid,
amount,
event.price + impact,
dt.replace(tzinfo = pytz.utc),
direction
)
direction = amount / math.fabs(amount)
txn = self.create_transaction(
event.sid,
amount,
event.price + 0.10,
event.dt,
direction
)
self.open_orders[event.sid] = []
return txn
def simulate_with_partial_volume(self, event):
if(event.volume == 0):
#there are zero volume events bc some stocks trade
#less frequently than once per minute.
return None
if self.open_orders.has_key(event.sid):
orders = self.open_orders[event.sid]
orders = sorted(orders, key=lambda o: o.dt)
else:
return None
dt = event.dt
expired = []
total_order = 0
simulated_amount = 0
simulated_impact = 0.0
direction = 1.0
for order in orders:
if(order.dt < event.dt):
# orders are only good on the day they are issued
if order.dt.day < event.dt.day:
continue
open_amount = order.amount - order.filled
if(open_amount != 0):
direction = open_amount / math.fabs(open_amount)
else:
direction = 1
desired_order = total_order + open_amount
volume_share = direction * (desired_order) / event.volume
if volume_share > .25:
volume_share = .25
simulated_amount = int(volume_share * event.volume * direction)
simulated_impact = (volume_share)**2 * .1 * direction * event.price
order.filled += (simulated_amount - total_order)
total_order = simulated_amount
# we cap the volume share at 25% of a trade
if volume_share == .25:
break
if simulated_amount == 0:
warning = """
Calculated a zero volume transation on trade:
{event}
for order:
{order}
"""
warning = warning.format(
event=str(event),
order=str(order)
)
qutil.LOGGER.warn(warning)
orders = [ x for x in orders if abs(x.amount - x.filled) > 0 and x.dt.day >= event.dt.day]
self.open_orders[event.sid] = orders
if simulated_amount != 0:
return self.create_transaction(
event.sid,
simulated_amount,
event.price + simulated_impact,
dt.replace(tzinfo = pytz.utc),
direction
)
else:
return None
def create_transaction(self, sid, amount, price, dt, direction):
@@ -445,7 +581,15 @@ class TradingEnvironment(object):
return len(self.period_trading_days)
def is_market_hours(self, test_date):
if not self.is_trading_day(test_date):
return False
mkt_open = self.set_NYSE_time(test_date, 9, 30)
#TODO: half days?
mkt_close = self.set_NYSE_time(test_date, 16, 00)
return test_date >= mkt_open and test_date <= mkt_close
def is_trading_day(self, test_date):
dt = self.normalize_date(test_date)
+31 -16
View File
@@ -90,7 +90,7 @@ from zipline.finance.trading import TransactionSimulator, OrderDataSource, \
TradeSimulationClient
from zipline.simulator import AddressAllocator, Simulator
from zipline.monitor import Controller
from zipline.finance.trading import SIMULATION_STYLE
class SimulatedTrading(object):
@@ -125,11 +125,15 @@ class SimulatedTrading(object):
:py:class:`zipline.simulator.AddressAllocator`
- simulator_class: a :py:class:`zipline.messaging.ComponentHost`
subclass (not an instance)
- simulation_style: optional parameter that configures the
:py:class:`zipline.finance.trading.TransactionSimulator`. Expects
a SIMULATION_STYLE as defined in :py:mod:`zipline.finance.trading`
"""
assert isinstance(config, dict)
self.algorithm = config['algorithm']
self.allocator = config['allocator']
self.trading_environment = config['trading_environment']
self.sim_style = config.get('simulation_style')
self.leased_sockets = []
self.sim_context = None
@@ -169,7 +173,7 @@ class SimulatedTrading(object):
self.add_source(self.order_source)
#setup transforms
self.transaction_sim = TransactionSimulator()
self.transaction_sim = TransactionSimulator(self.sim_style)
self.transforms = {}
self.add_transform(self.transaction_sim)
@@ -191,16 +195,20 @@ class SimulatedTrading(object):
- sid - an integer, which will be used as the security ID.
- order_count - the number of orders the test algo will place,
defaults to 100
- trade_count - the number of trades to simulate, defaults to 100
- order_amount - the number of shares per order, defaults to 100
- trade_count - the number of trades to simulate, defaults to 101
to ensure all orders are processed.
- simulator_class - optional parameter that provides an alternative
subclass of ComponentHost to hold the whole zipline. Defaults to
:py:class:`zipline.simulator.Simulator`
- algorithm - optional parameter providing an algorithm. defaults
to :py:class:`zipline.test.algorithms.TestAlgorithm`
- random - optional parameter to request random trades. if present
:py:class:`zipline.sources.RandomEquityTrades` is the source. If
not :py:class:`ziplien.sources.SpecificEquityTrades` is the
source
- trade_source - optional parameter to specify trades, if present.
If not present :py:class:`ziplien.sources.SpecificEquityTrades`
is the source, with daily frequency in trades.
- simulation_style: optional parameter that configures the
:py:class:`zipline.finance.trading.TransactionSimulator`. Expects
a SIMULATION_STYLE as defined in :py:mod:`zipline.finance.trading`
"""
assert isinstance(config, dict)
@@ -219,28 +227,35 @@ class SimulatedTrading(object):
order_count = config['order_count']
else:
order_count = 100
if config.has_key('order_amount'):
order_amount = config['order_amount']
else:
order_amount = 100
if config.has_key('trade_count'):
trade_count = config['trade_count']
else:
trade_count = 100
# to ensure all orders are filled, we provide one more
# trade than order
trade_count = 101
if config.has_key('simulator_class'):
simulator_class = config['simulator_class']
else:
simulator_class = Simulator
simulation_style = config.get('simulation_style')
if not simulation_style:
simulation_style = SIMULATION_STYLE.FIXED_SLIPPAGE
#-------------------
# Trade Source
#-------------------
sids = [sid]
#-------------------
if config.has_key('random'):
trade_source = factory.create_random_trade_source(
sids,
trade_count,
trading_environment
)
if config.has_key('trade_source'):
trade_source = config['trade_source']
else:
trade_source = factory.create_daily_trade_source(
sids,
@@ -253,7 +268,6 @@ class SimulatedTrading(object):
if config.has_key('algorithm'):
test_algo = config['algorithm']
else:
order_amount = 100
test_algo = TestAlgorithm(
sid,
order_amount,
@@ -266,7 +280,8 @@ class SimulatedTrading(object):
'algorithm':test_algo,
'trading_environment':trading_environment,
'allocator':allocator,
'simulator_class':simulator_class
'simulator_class':simulator_class,
'simulation_style':simulation_style
})
#-------------------
+22 -7
View File
@@ -4,6 +4,8 @@ Commonly used messaging components.
import datetime
from collections import Counter
import zipline.util as qutil
from zipline.component import Component
import zipline.protocol as zp
@@ -37,7 +39,7 @@ class ComponentHost(Component):
# ----------------------
self.sync_register = {}
self.timeout = datetime.timedelta(seconds=5)
self.timeout = datetime.timedelta(seconds=60)
self.feed = Feed()
self.merge = Merge()
@@ -81,11 +83,11 @@ class ComponentHost(Component):
self.sync_register[component.get_id] = datetime.datetime.utcnow()
if isinstance(component, DataSource):
self.feed.add_source(component.get_id)
self.feed.add_source(component.get_id, component.is_blocking)
if not component.is_blocking:
self.feed.ds_finished_counter +=1
if isinstance(component, BaseTransform):
self.merge.add_source(component.get_id)
self.merge.add_source(component.get_id, component.is_blocking)
if not component.is_blocking:
self.feed.ds_finished_counter +=1
@@ -192,6 +194,13 @@ class Feed(Component):
# Depending on the size of this, might want to use a data
# structure with better asymptotics.
self.data_buffer = {}
# source_id -> integer count
self.sent_counters = Counter()
self.recv_counters = Counter()
# source_id -> boolean. True is for blocking
self.is_blocking_map = {}
def init(self):
pass
@@ -214,7 +223,7 @@ class Feed(Component):
def do_work(self):
# wait for synchronization reply from the host
socks = dict(self.poll.poll(self.heartbeat_timeout)) #timeout after 2 seconds.
socks = dict(self.poll.poll(self.heartbeat_timeout))
# TODO: Abstract this out, maybe on base component
if self.control_in in socks and socks[self.control_in] == self.zmq.POLLIN:
@@ -294,6 +303,7 @@ class Feed(Component):
event = self.next()
if(event != None):
self.feed_socket.send(self.frame(event), self.zmq.NOBLOCK)
self.sent_counters[event.source_id] += 1
self.sent_count += 1
def append(self, event):
@@ -302,6 +312,7 @@ class Feed(Component):
source_id.
"""
self.data_buffer[event.source_id].append(event)
self.recv_counters[event.source_id] += 1
self.received_count += 1
def next(self):
@@ -336,9 +347,12 @@ class Feed(Component):
def is_full(self):
"""
Indicates whether the buffer has messages in buffer for
all un-DONE sources.
all un-DONE, blocking sources.
"""
for events in self.data_buffer.values():
for source_id, events in self.data_buffer.iteritems():
if not self.is_blocking_map[source_id]:
continue
if len(events) == 0:
return False
return True
@@ -353,11 +367,12 @@ class Feed(Component):
total += len(events)
return total
def add_source(self, source_id):
def add_source(self, source_id, is_blocking=True):
"""
Add a data source to the buffer.
"""
self.data_buffer[source_id] = []
self.is_blocking_map[source_id] = is_blocking
def __len__(self):
"""
+1
View File
@@ -94,6 +94,7 @@ class SpecificEquityTrades(TradeDataSource):
def get_type(self):
zp.COMPONENT_TYPE.SOURCE
def do_work(self):
if(len(self.event_list) == 0):
self.signal_done()
+3 -3
View File
@@ -70,14 +70,14 @@ class TestAlgorithm():
def handle_frame(self, frame):
self.frame_count += 1
#place an order for 100 shares of sid:133
#place an order for 100 shares of sid
if self.incr < self.count:
self.order(self.sid, self.amount)
self.incr += 1
def get_sid_filter(self):
return [self.sid]
return [self.sid]
class NoopAlgorithm(object):
"""
Dolce fa niente.
+33 -6
View File
@@ -38,12 +38,12 @@ def load_market_data():
return bm_returns, tr_curves
def create_trading_environment():
def create_trading_environment(year=2006):
"""Construct a complete environment with reasonable defaults"""
benchmark_returns, treasury_curves = load_market_data()
start = datetime(2006, 1, 1, tzinfo=pytz.utc)
end = datetime(2006, 12, 31, tzinfo=pytz.utc)
start = datetime(year, 1, 1, tzinfo=pytz.utc)
end = datetime(year, 12, 31, tzinfo=pytz.utc)
trading_environment = TradingEnvironment(
benchmark_returns,
treasury_curves,
@@ -68,7 +68,7 @@ def get_next_trading_dt(current, interval, trading_calendar):
next = current
while True:
next = next + interval
if trading_calendar.is_trading_day(next):
if trading_calendar.is_market_hours(next):
break
return next
@@ -79,9 +79,9 @@ def create_trade_history(sid, prices, amounts, interval, trading_calendar):
for price, amount in zip(prices, amounts):
current = get_next_trading_dt(current, interval, trading_calendar)
trade = create_trade(sid, price, amount, current)
trades.append(trade)
current = get_next_trading_dt(current, interval, trading_calendar)
assert len(trades) == len(prices)
return trades
@@ -167,6 +167,7 @@ def create_random_trade_source(sid, trade_count, trading_environment):
return source
def create_daily_trade_source(sids, trade_count, trading_environment):
"""
creates trade_count trades for each sid in sids list.
first trade will be on trading_environment.period_start, and daily
@@ -176,12 +177,38 @@ def create_daily_trade_source(sids, trade_count, trading_environment):
Important side-effect: trading_environment.period_end will be modified
to match the day of the final trade.
"""
return create_trade_source(
sids,
trade_count,
timedelta(days=1),
trading_environment
)
def create_minutely_trade_source(sids, trade_count, trading_environment):
"""
creates trade_count trades for each sid in sids list.
first trade will be on trading_environment.period_start, and every minute
thereafter for each sid. Thus, two sids should result in two trades per
minute.
Important side-effect: trading_environment.period_end will be modified
to match the day of the final trade.
"""
return create_trade_source(
sids,
trade_count,
timedelta(minutes=1),
trading_environment
)
def create_trade_source(sids, trade_count, trade_time_increment, trading_environment):
trade_history = []
for sid in sids:
price = [10.1] * trade_count
volume = [100] * trade_count
start_date = trading_environment.first_open
trade_time_increment = timedelta(days=1)
generated_trades = create_trade_history(
sid,
+274 -4
View File
@@ -21,8 +21,12 @@ TradeSimulationClient, TradingEnvironment
from zipline.simulator import AddressAllocator, Simulator
from zipline.monitor import Controller
from zipline.lines import SimulatedTrading
from zipline.finance.performance import PerformanceTracker
from zipline.protocol_utils import namedict
from zipline.finance.trading import SIMULATION_STYLE
DEFAULT_TIMEOUT = 15 # seconds
EXTENDED_TIMEOUT = 90
allocator = AddressAllocator(1000)
@@ -103,12 +107,21 @@ class FinanceTestCase(TestCase):
self.assertTrue(env.last_close.month == 12)
self.assertTrue(env.last_close.day == 31)
# The following two tests appear broken no that the order source is
# non blocking. HUNCH: The trades are streaming through before the orders
# are placed.
@timed(DEFAULT_TIMEOUT)
def test_orders(self):
# Simulation
# ----------
zipline = SimulatedTrading.create_test_zipline(**self.zipline_test_config)
self.zipline_test_config['simulation_style'] = \
SIMULATION_STYLE.FIXED_SLIPPAGE
zipline = SimulatedTrading.create_test_zipline(
**self.zipline_test_config
)
zipline.simulate(blocking=True)
self.assertTrue(zipline.sim.ready())
@@ -118,8 +131,70 @@ class FinanceTestCase(TestCase):
self.assertEqual(zipline.sim.feed.pending_messages(), 0, \
"The feed should be drained of all messages, found {n} remaining." \
.format(n=zipline.sim.feed.pending_messages()))
# the trading client should receive one transaction for every
# order placed.
self.assertEqual(
zipline.trading_client.txn_count,
zipline.trading_client.order_count
)
# the number of transactions in the performance tracker's cumulative
# period should be the same as the number of orders place by the
# algorithm.
self.assertEqual(
zipline.trading_client.order_count,
len(zipline.trading_client.perf.cumulative_performance.processed_transactions)
)
@timed(EXTENDED_TIMEOUT)
def test_aggressive_buying(self):
# Simulation
# ----------
# TODO: for some reason the orders aren't filled without an extra
# trade.
trade_count = 5001
self.zipline_test_config['order_count'] = trade_count - 1
self.zipline_test_config['trade_count'] = trade_count
self.zipline_test_config['order_amount'] = 1
# tell the simulator to fill the orders in individual transactions
# matching the order volume exactly.
self.zipline_test_config['simulation_style'] = \
SIMULATION_STYLE.FIXED_SLIPPAGE
self.zipline_test_config['environment'] = factory.create_trading_environment()
sid_list = [self.zipline_test_config['sid']]
self.zipline_test_config['trade_source'] = factory.create_minutely_trade_source(
sid_list,
trade_count,
self.zipline_test_config['environment']
)
zipline = SimulatedTrading.create_test_zipline(**self.zipline_test_config)
zipline.simulate(blocking=True)
self.assertTrue(zipline.sim.ready())
self.assertFalse(zipline.sim.exception)
self.assertEqual(zipline.sim.feed.pending_messages(), 0, \
"The feed should be drained of all messages, found {n} remaining." \
.format(n=zipline.sim.feed.pending_messages()))
#
# the trading client should receive one transaction for every
# order placed.
self.assertEqual(
zipline.trading_client.txn_count,
zipline.trading_client.order_count
)
@timed(DEFAULT_TIMEOUT)
def test_performance(self):
#provide enough trades to ensure all orders are filled.
@@ -204,7 +279,9 @@ class FinanceTestCase(TestCase):
self.zipline_test_config['trade_count'] = 200
self.zipline_test_config['algorithm'] = test_algo
zipline = SimulatedTrading.create_test_zipline(**self.zipline_test_config)
zipline = SimulatedTrading.create_test_zipline(
**self.zipline_test_config
)
zipline.simulate(blocking=True)
#check that the algorithm received no events
@@ -214,8 +291,201 @@ class FinanceTestCase(TestCase):
"The algorithm should not receive any events due to filtering."
)
# TODO: write tests for short sales
# TODO: write a test to do massive buying or shorting.
@timed(DEFAULT_TIMEOUT)
def test_partially_filled_orders(self):
# create a scenario where order size and trade size are equal
# so that orders must be spread out over several trades.
params ={
'trade_count':360,
'trade_amount':100,
'trade_interval': timedelta(minutes=1),
'order_count':2,
'order_amount':100,
'order_interval': timedelta(minutes=1),
# because we placed an order for 100 shares, and the volume
# of each trade is 100, the simulator should spread the order
# into 4 trades of 25 shares per order.
'expected_txn_count':8,
'expected_txn_volume':2 * 100
}
self.transaction_sim(**params)
# same scenario, but with short sales
params2 ={
'trade_count':360,
'trade_amount':100,
'trade_interval': timedelta(minutes=1),
'order_count':2,
'order_amount':-100,
'order_interval': timedelta(minutes=1),
'expected_txn_count':8,
'expected_txn_volume':2 * -100
}
self.transaction_sim(**params2)
@timed(DEFAULT_TIMEOUT)
def test_collapsing_orders(self):
# create a scenario where order.amount <<< trade.volume
# to test that several orders can be covered properly by one trade.
params1 ={
'trade_count':6,
'trade_amount':100,
'trade_interval': timedelta(hours=1),
'order_count':24,
'order_amount':1,
'order_interval': timedelta(minutes=1),
# because we placed an orders totaling less than 25% of one trade
# the simulator should produce just one transaction.
'expected_txn_count':1,
'expected_txn_volume':24 * 1
}
self.transaction_sim(**params1)
# second verse, same as the first. except short!
params2 ={
'trade_count':6,
'trade_amount':100,
'trade_interval': timedelta(hours=1),
'order_count':24,
'order_amount':-1,
'order_interval': timedelta(minutes=1),
'expected_txn_count':1,
'expected_txn_volume':24 * -1
}
self.transaction_sim(**params2)
@timed(DEFAULT_TIMEOUT)
def test_partial_expiration_orders(self):
# create a scenario where orders expire without being filled
# entirely
params1 = {
'trade_count':100,
'trade_amount':100,
'trade_delay': timedelta(minutes=5),
'trade_interval': timedelta(days=1),
'order_count':3,
'order_amount':1000,
'order_interval': timedelta(minutes=30),
# because we placed an orders totaling less than 25% of one trade
# the simulator should produce just one transaction.
'expected_txn_count' : 1,
'expected_txn_volume' : 25
}
self.transaction_sim(**params1)
# same scenario, but short sales.
params2 = {
'trade_count':100,
'trade_amount':100,
'trade_delay': timedelta(minutes=5),
'trade_interval': timedelta(days=1),
'order_count':3,
'order_amount':1000,
'order_interval': timedelta(minutes=30),
# because we placed an orders totaling less than 25% of one trade
# the simulator should produce just one transaction.
'expected_txn_count' : 1,
'expected_txn_volume' : 25
}
self.transaction_sim(**params2)
def transaction_sim(self, **params):
trade_count = params['trade_count']
trade_amount = params['trade_amount']
trade_interval = params['trade_interval']
trade_delay = params.get('trade_delay')
order_count = params['order_count']
order_amount = params['order_amount']
order_interval = params['order_interval']
expected_txn_count = params['expected_txn_count']
expected_txn_volume = params['expected_txn_volume']
trading_environment = factory.create_trading_environment()
trade_sim = TransactionSimulator()
price = [10.1] * trade_count
volume = [100] * trade_count
start_date = trading_environment.first_open
sid = 1
generated_trades = factory.create_trade_history(
sid,
price,
volume,
trade_interval,
trading_environment
)
for i in range(order_count):
order = namedict(
{
'sid':sid,
'amount':order_amount,
'type':zp.DATASOURCE_TYPE.ORDER,
'dt' : start_date + i * order_interval
})
sim_state = trade_sim.transform(order)
# there should not be a new transaction from an order.
self.assertTrue(sim_state['name'] == trade_sim.get_id)
self.assertTrue(sim_state['value'] == None)
# there should now be one open order list stored under the sid
oo = trade_sim.open_orders
self.assertEqual(len(oo), 1)
self.assertTrue(oo.has_key(sid))
order_list = oo[sid]
self.assertEqual(order_count, len(order_list))
for order in order_list:
self.assertEqual(order.sid, sid)
self.assertEqual(order.amount, order_amount)
tracker = PerformanceTracker(trading_environment)
transactions = []
for trade in generated_trades:
if trade_delay:
trade.dt = trade.dt + trade_delay
sim_state = trade_sim.transform(trade)
self.assertEqual(sim_state['name'], trade_sim.get_id)
txn = None
if sim_state['value']:
txn = sim_state['value']
transactions.append(txn)
trade[sim_state['name']] = txn
tracker.process_event(trade)
total_volume = 0
for txn in transactions:
total_volume += txn.amount
self.assertEqual(total_volume, expected_txn_volume)
self.assertEqual(len(transactions), expected_txn_count)
cumulative_pos = tracker.cumulative_performance.positions[sid]
self.assertEqual(total_volume, cumulative_pos.amount)
# the open orders should now be empty
oo = trade_sim.open_orders
self.assertTrue(oo.has_key(sid))
order_list = oo[sid]
self.assertEqual(0, len(order_list))
+9 -4
View File
@@ -22,8 +22,15 @@ class PerformanceTestCase(unittest.TestCase):
0,
len(self.treasury_curves)
)
self.dt = self.treasury_curves.keys()[random_index]
self.end_dt = self.dt + datetime.timedelta(days=365)
for n in range(100):
self.dt = self.treasury_curves.keys()[random_index]
self.end_dt = self.dt + datetime.timedelta(days=365)
now = datetime.datetime.utcnow().replace(tzinfo=pytz.utc)
if self.end_dt <= now:
break
self.trading_environment = TradingEnvironment(
self.benchmark_returns,
self.treasury_curves,
@@ -505,8 +512,6 @@ shares in position"
price = 10.1
price_list = [price] * trade_count
volume = [100] * trade_count
#start_date = datetime.datetime.strptime("01/01/2011","%m/%d/%Y")
#start_date = start_date.replace(tzinfo=pytz.utc)
trade_time_increment = datetime.timedelta(days=1)
trade_history = factory.create_trade_history(
sid,