major fix is with the non-blocking behavior of order source. also fixed time-compression in the trading client.

This commit is contained in:
fawce
2012-04-13 15:08:17 -04:00
parent dfc3523197
commit 38982fcdab
6 changed files with 77 additions and 42 deletions
+8 -3
View File
@@ -166,6 +166,7 @@ class PerformanceTracker():
self.event_count = 0
self.result_stream = None
self.last_dict = None
self.order_log = []
# this performance period will span the entire simulation.
self.cumulative_performance = PerformancePeriod(
@@ -228,6 +229,9 @@ class PerformanceTracker():
'cumulative_risk_metrics' : self.cumulative_risk_metrics.to_dict(),
'timestamp' : datetime.datetime.now(),
}
def log_order(self, order):
self.order_log.append(order)
def process_event(self, event):
assert isinstance(event, zp.namedict)
@@ -300,13 +304,14 @@ class PerformanceTracker():
and send it out on the result_stream.
"""
log_msg = "Simulated {n} trading days out of {m}."
qutil.LOGGER.info(log_msg.format(n=self.day_count, m=self.total_days))
qutil.LOGGER.info("first open: {d}".format(d=self.trading_environment.first_open))
# the stream will end on the last trading day, but will not trigger
# an end of day, so we trigger the final market close here.
self.handle_market_close()
log_msg = "Simulated {n} trading days out of {m}."
qutil.LOGGER.info(log_msg.format(n=self.day_count, m=self.total_days))
qutil.LOGGER.info("first open: {d}".format(d=self.trading_environment.first_open))
self.risk_report = risk.RiskReport(
self.returns,
+33 -30
View File
@@ -119,12 +119,15 @@ class TradeSimulationClient(qmsg.Component):
# otherwise, the algorithm has fallen behind the feed
# and processing per event is longer than time between events.
if event.dt >= self.current_dt:
# compress time by moving the current_time up to the event
# time.
self.current_dt = event.dt
self.run_algorithm()
# tally the time spent on this iteration
self.last_iteration_dur = datetime.datetime.utcnow() - event_start
# move the algorithm's clock forward to include iteration time
self.current_dt = self.current_dt + self.last_iteration_dur
# self.current_dt = self.current_dt + self.last_iteration_dur
def run_algorithm(self):
@@ -152,6 +155,7 @@ class TradeSimulationClient(qmsg.Component):
})
self.order_socket.send(zp.ORDER_FRAME(order))
self.order_count += 1
self.perf.log_order(order)
def signal_order_done(self):
self.order_socket.send(str(zp.ORDER_PROTOCOL.DONE))
@@ -185,6 +189,7 @@ class OrderDataSource(qmsg.DataSource):
"""
qmsg.DataSource.__init__(self, zp.FINANCE_COMPONENT.ORDER_SOURCE)
self.sent_count = 0
self.works = 0
@property
def get_type(self):
@@ -194,7 +199,8 @@ class OrderDataSource(qmsg.DataSource):
@property
def is_blocking(self):
"""
This datasource is in a loop with the TradingSimulationClient
This datasource is in a loop with the TradingSimulationClient,
so we don't want it to block processing.
"""
return False
@@ -207,17 +213,12 @@ class OrderDataSource(qmsg.DataSource):
def do_work(self):
#TODO: if this is the first iteration, break deadlock by sending a dummy order
if(self.sent_count == 0):
self.send(zp.namedict({}))
self.works += 1
#pull all orders from client.
orders = []
count = 0
# TODO : this can be written in a concurrency agnostic
# way... have a chat with Fawce about this ~Steve
while True:
# poll all the sockets
@@ -246,17 +247,6 @@ class OrderDataSource(qmsg.DataSource):
self.send(order)
count += 1
self.sent_count += 1
# TODO: why didn't any unit tests catch this bug????
#else:
# # no orders, break out
# break
#TODO: we have to send at least one dummy order per do_work iteration
# or the feed will block waiting for our messages.
if(count == 0):
self.send(zp.namedict({}))
class TransactionSimulator(qmsg.BaseTransform):
@@ -278,6 +268,19 @@ class TransactionSimulator(qmsg.BaseTransform):
elif style == SIMULATION_STYLE.NOOP:
self.apply_trade_to_open_orders = self.simulate_noop
#
@property
def is_blocking(self):
"""
Including this explicitly for clarity, even though we are using the
default value. TransactionSimulator has a defined action for every
event type. Downstream components depend on the presence of the
TRANSACTION transform in all cases. When no transaction happens,
None is the value. Thus, we do want merging to block on the
availability of transaction messages.
"""
return True
def transform(self, event):
"""
Pulls one message from the event feed, then
@@ -304,6 +307,8 @@ class TransactionSimulator(qmsg.BaseTransform):
Amount is explicitly converted to an int.
Orders of amount zero are ignored.
"""
self.order_count += 1
event.amount = int(event.amount)
if event.amount == 0:
log = "requested to trade zero shares of {sid}".format(
@@ -312,7 +317,7 @@ class TransactionSimulator(qmsg.BaseTransform):
qutil.LOGGER.debug(log)
return
self.order_count += 1
if(not self.open_orders.has_key(event.sid)):
self.open_orders[event.sid] = []
@@ -321,9 +326,6 @@ class TransactionSimulator(qmsg.BaseTransform):
event.filled = 0
self.open_orders[event.sid].append(event)
#def apply_trade_to_open_orders(self, event):
# return self.simulate_with_fixed_cost(event)
def simulate_buy_all(self, event):
txn = self.create_transaction(
event.sid,
@@ -348,10 +350,11 @@ class TransactionSimulator(qmsg.BaseTransform):
for order in orders:
amount += order.amount
if(amount != 0):
direction = amount / math.fabs(amount)
else:
direction = 1
if(amount == 0):
return
direction = amount / math.fabs(amount)
txn = self.create_transaction(
event.sid,
@@ -426,9 +429,9 @@ for order:
)
qutil.LOGGER.warn(warning)
orders = [ x for x in orders if abs(x.amount - x.filled) > 0 and x.dt.day >= event.dt.day]
#orders = [ x for x in orders if abs(x.amount - x.filled) > 0 and x.dt.day >= event.dt.day]
self.open_orders[event.sid] = orders
#self.open_orders[event.sid] = orders
if simulated_amount != 0:
+13 -1
View File
@@ -125,6 +125,9 @@ class SimulatedTrading(object):
:py:class:`zipline.simulator.AddressAllocator`
- simulator_class: a :py:class:`zipline.messaging.ComponentHost`
subclass (not an instance)
- simulation_style: optional parameter that configures the
:py:class:`zipline.finance.trading.TransactionSimulator`. Expects
a SIMULATION_STYLE as defined in :py:mod:`zipline.finance.trading`
"""
assert isinstance(config, dict)
self.algorithm = config['algorithm']
@@ -203,6 +206,9 @@ class SimulatedTrading(object):
- trade_source - optional parameter to specify trades, if present.
If not present :py:class:`ziplien.sources.SpecificEquityTrades`
is the source, with daily frequency in trades.
- simulation_style: optional parameter that configures the
:py:class:`zipline.finance.trading.TransactionSimulator`. Expects
a SIMULATION_STYLE as defined in :py:mod:`zipline.finance.trading`
"""
assert isinstance(config, dict)
@@ -230,12 +236,18 @@ class SimulatedTrading(object):
if config.has_key('trade_count'):
trade_count = config['trade_count']
else:
# to ensure all orders are filled, we provide one more
# trade than order
trade_count = 101
if config.has_key('simulator_class'):
simulator_class = config['simulator_class']
else:
simulator_class = Simulator
simulation_style = config.get('simulation_style')
if not simulation_style:
simulation_style = SIMULATION_STYLE.FIXED_SLIPPAGE
#-------------------
# Trade Source
@@ -269,7 +281,7 @@ class SimulatedTrading(object):
'trading_environment':trading_environment,
'allocator':allocator,
'simulator_class':simulator_class,
'simulation_style':SIMULATION_STYLE.FIXED_SLIPPAGE
'simulation_style':simulation_style
})
#-------------------
+19 -4
View File
@@ -4,6 +4,8 @@ Commonly used messaging components.
import datetime
from collections import Counter
import zipline.util as qutil
from zipline.component import Component
import zipline.protocol as zp
@@ -81,11 +83,11 @@ class ComponentHost(Component):
self.sync_register[component.get_id] = datetime.datetime.utcnow()
if isinstance(component, DataSource):
self.feed.add_source(component.get_id)
self.feed.add_source(component.get_id, component.is_blocking)
if not component.is_blocking:
self.feed.ds_finished_counter +=1
if isinstance(component, BaseTransform):
self.merge.add_source(component.get_id)
self.merge.add_source(component.get_id, component.is_blocking)
if not component.is_blocking:
self.feed.ds_finished_counter +=1
@@ -192,6 +194,13 @@ class Feed(Component):
# Depending on the size of this, might want to use a data
# structure with better asymptotics.
self.data_buffer = {}
# source_id -> integer count
self.sent_counters = Counter()
self.recv_counters = Counter()
# source_id -> boolean. True is for blocking
self.is_blocking_map = {}
def init(self):
pass
@@ -294,6 +303,7 @@ class Feed(Component):
event = self.next()
if(event != None):
self.feed_socket.send(self.frame(event), self.zmq.NOBLOCK)
self.sent_counters[event.source_id] += 1
self.sent_count += 1
def append(self, event):
@@ -302,6 +312,7 @@ class Feed(Component):
source_id.
"""
self.data_buffer[event.source_id].append(event)
self.recv_counters[event.source_id] += 1
self.received_count += 1
def next(self):
@@ -338,7 +349,10 @@ class Feed(Component):
Indicates whether the buffer has messages in buffer for
all un-DONE sources.
"""
for events in self.data_buffer.values():
for source_id, events in self.data_buffer.iteritems():
if not self.is_blocking_map[source_id]:
continue
if len(events) == 0:
return False
return True
@@ -353,11 +367,12 @@ class Feed(Component):
total += len(events)
return total
def add_source(self, source_id):
def add_source(self, source_id, is_blocking=True):
"""
Add a data source to the buffer.
"""
self.data_buffer[source_id] = []
self.is_blocking_map[source_id] = is_blocking
def __len__(self):
"""
+1 -1
View File
@@ -70,7 +70,7 @@ class TestAlgorithm():
def handle_frame(self, frame):
self.frame_count += 1
#place an order for 100 shares of sid:133
#place an order for 100 shares of sid
if self.incr < self.count:
self.order(self.sid, self.amount)
self.incr += 1
+3 -3
View File
@@ -38,12 +38,12 @@ def load_market_data():
return bm_returns, tr_curves
def create_trading_environment():
def create_trading_environment(year=2006):
"""Construct a complete environment with reasonable defaults"""
benchmark_returns, treasury_curves = load_market_data()
start = datetime(2006, 1, 1, tzinfo=pytz.utc)
end = datetime(2006, 12, 31, tzinfo=pytz.utc)
start = datetime(year, 1, 1, tzinfo=pytz.utc)
end = datetime(year, 12, 31, tzinfo=pytz.utc)
trading_environment = TradingEnvironment(
benchmark_returns,
treasury_curves,