diff --git a/zipline/component.py b/zipline/component.py index 0004051f..7bcd8500 100644 --- a/zipline/component.py +++ b/zipline/component.py @@ -75,7 +75,8 @@ class Component(object): self.out_socket = None self.killed = False self.controller = None - self.heartbeat_timeout = 2000 + # timeout after a full minute + self.heartbeat_timeout = 60 *1000 self.state_flag = COMPONENT_STATE.OK self.error_state = COMPONENT_FAILURE.NOFAILURE self.on_done = None diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index 81a3d8d9..03c94139 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -160,14 +160,13 @@ class PerformanceTracker(): self.total_days = self.trading_environment.days_in_period # one indexed so that we reach 100% self.day_count = 0.0 - self.cumulative_capital_used = 0.0 - self.max_capital_used = 0.0 self.capital_base = self.trading_environment.capital_base self.returns = [] self.txn_count = 0 self.event_count = 0 self.result_stream = None self.last_dict = None + self.order_log = [] # this performance period will span the entire simulation. self.cumulative_performance = PerformancePeriod( @@ -219,8 +218,8 @@ class PerformanceTracker(): 'period_start' : self.period_start, 'period_end' : self.period_end, 'progress' : self.progress, - 'cumulative_captial_used' : self.cumulative_capital_used, - 'max_capital_used' : self.max_capital_used, + 'cumulative_captial_used' : self.cumulative_perf.cumulative_capital_used, + 'max_capital_used' : self.cumulative_perf.max_capital_used, 'last_close' : self.market_close, 'last_open' : self.market_open, 'capital_base' : self.capital_base, @@ -230,38 +229,26 @@ class PerformanceTracker(): 'cumulative_risk_metrics' : self.cumulative_risk_metrics.to_dict(), 'timestamp' : datetime.datetime.now(), } + + def log_order(self, order): + self.order_log.append(order) def process_event(self, event): + assert isinstance(event, zp.namedict) self.event_count += 1 if(event.dt >= self.market_close): self.handle_market_close() - if not pandas.isnull(event.TRANSACTION): + if event.TRANSACTION: self.txn_count += 1 self.cumulative_performance.execute_transaction(event.TRANSACTION) self.todays_performance.execute_transaction(event.TRANSACTION) - - # we're adding a 10% cushion to the capital used, - # and then rounding to the nearest 5k - transaction_cost = event.TRANSACTION.price * event.TRANSACTION.amount - self.cumulative_capital_used += transaction_cost - - if math.fabs(self.cumulative_capital_used) > self.max_capital_used: - self.max_capital_used = math.fabs(self.cumulative_capital_used) - - cushioned_capital = 1.1 * self.max_capital_used - self.max_capital_used = self.round_to_nearest( - cushioned_capital, - base=5000 - ) - self.max_leverage = self.max_capital_used / self.capital_base - + #update last sale self.cumulative_performance.update_last_sale(event) self.todays_performance.update_last_sale(event) - def handle_market_close(self): #calculate performance as of last trade @@ -317,13 +304,14 @@ class PerformanceTracker(): and send it out on the result_stream. """ + log_msg = "Simulated {n} trading days out of {m}." + qutil.LOGGER.info(log_msg.format(n=self.day_count, m=self.total_days)) + qutil.LOGGER.info("first open: {d}".format(d=self.trading_environment.first_open)) + # the stream will end on the last trading day, but will not trigger # an end of day, so we trigger the final market close here. self.handle_market_close() - log_msg = "Simulated {n} trading days out of {m}." - qutil.LOGGER.info(log_msg.format(n=self.day_count, m=self.total_days)) - qutil.LOGGER.info("first open: {d}".format(d=self.trading_environment.first_open)) self.risk_report = risk.RiskReport( self.returns, @@ -338,9 +326,6 @@ class PerformanceTracker(): # this signals that the simulation is complete. self.result_stream.send("DONE") - def round_to_nearest(self, x, base=5): - return int(base * round(float(x)/base)) - class Position(): @@ -409,6 +394,8 @@ class PerformancePeriod(): self.starting_cash = starting_cash self.ending_cash = starting_cash self.processed_transactions = [] + self.cumulative_capital_used = 0.0 + self.max_capital_used = 0.0 self.calculate_performance() @@ -426,11 +413,40 @@ class PerformancePeriod(): self.returns = 0.0 def execute_transaction(self, txn): + + # Update Position + # ---------------- if(not self.positions.has_key(txn.sid)): self.positions[txn.sid] = Position(txn.sid) self.positions[txn.sid].update(txn) self.period_capital_used += -1 * txn.price * txn.amount + + + # Max Leverage + # --------------- + # Calculate the maximum capital used and maximum leverage + + transaction_cost = txn.price * txn.amount + self.cumulative_capital_used += transaction_cost + + if math.fabs(self.cumulative_capital_used) > self.max_capital_used: + self.max_capital_used = math.fabs(self.cumulative_capital_used) + + # We want to conveye a level, rather than a precise figure. + # round to the nearest 5,000 to keep the number easy on the eyes + self.max_capital_used = self.round_to_nearest( + self.max_capital_used, + base=5000 + ) + + # we're adding a 10% cushion to the capital used. + self.max_leverage = 1.1 * self.max_capital_used / self.starting_cash + + # add transaction to the list of processed transactions self.processed_transactions.append(txn) + + def round_to_nearest(self, x, base=5): + return int(base * round(float(x)/base)) def calculate_positions_value(self): mktValue = 0.0 diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index 76ad2a19..cab1051a 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -3,6 +3,8 @@ import pytz import math import pandas +from collections import Counter + # from gevent.select import select from zmq.core.poll import select @@ -11,6 +13,17 @@ import zipline.util as qutil import zipline.protocol as zp import zipline.finance.performance as perf +from zipline.protocol_utils import Enum, namedict + +# the simulation style enumerates the available transaction simulation +# strategies. +SIMULATION_STYLE = Enum( + 'PARTIAL_VOLUME', + 'BUY_ALL', + 'FIXED_SLIPPAGE', + 'NOOP' +) + class TradeSimulationClient(qmsg.Component): def __init__(self, trading_environment): @@ -19,10 +32,13 @@ class TradeSimulationClient(qmsg.Component): self.prev_dt = None self.event_queue = None self.txn_count = 0 + self.order_count = 0 self.trading_environment = trading_environment self.current_dt = trading_environment.period_start self.last_iteration_dur = datetime.timedelta(seconds=0) self.algorithm = None + self.attempts = 0 + self.max_attempts = 1000 assert self.trading_environment.frame_index != None self.event_frame = pandas.DataFrame( @@ -47,8 +63,11 @@ class TradeSimulationClient(qmsg.Component): def open(self): self.result_feed = self.connect_result() self.order_socket = self.connect_order() + # send a wake up call to the order data source. + self.order_socket.send(str(zp.ORDER_PROTOCOL.BREAK)) def do_work(self): + # poll all the sockets socks = dict(self.poll.poll(self.heartbeat_timeout)) @@ -56,6 +75,8 @@ class TradeSimulationClient(qmsg.Component): if self.result_feed in socks and \ socks[self.result_feed] == self.zmq.POLLIN: + self.attempts = 0 + # get the next message from the result feed msg = self.result_feed.recv() @@ -65,8 +86,7 @@ class TradeSimulationClient(qmsg.Component): # signal the performance tracker that the simulation has # ended. Perf will internally calculate the full risk report. self.perf.handle_simulation_end() - # shutdown the feedback loop to the OrderDataSource - self.signal_order_done() + # signal Simulator, our ComponentHost, that this component is # done and Simulator needn't block exit on this component. self.signal_done() @@ -74,13 +94,21 @@ class TradeSimulationClient(qmsg.Component): # result_feed is a merge component, so unframe accordingly event = zp.MERGE_UNFRAME(msg) - + self.received_count += 1 # update performance and relay the event to the algorithm self.process_event(event) - # signal done to order source. + # signal loop is done for order source. self.order_socket.send(str(zp.ORDER_PROTOCOL.BREAK)) - + else: + # no events in the sock means the non-order sources are + # drained. Signal the order_source that we're done, and + # the done will cascade through the whole zipline. + # shutdown the feedback loop to the OrderDataSource + if self.attempts > self.max_attempts: + self.signal_order_done() + else: + self.attempts += 1 def process_event(self, event): # track the number of transactions, for testing purposes. if(event.TRANSACTION != None): @@ -107,12 +135,15 @@ class TradeSimulationClient(qmsg.Component): # otherwise, the algorithm has fallen behind the feed # and processing per event is longer than time between events. if event.dt >= self.current_dt: + # compress time by moving the current_time up to the event + # time. + self.current_dt = event.dt self.run_algorithm() # tally the time spent on this iteration self.last_iteration_dur = datetime.datetime.utcnow() - event_start # move the algorithm's clock forward to include iteration time - self.current_dt = self.current_dt + self.last_iteration_dur + self.current_dt = self.current_dt + self.last_iteration_dur def run_algorithm(self): @@ -132,13 +163,15 @@ class TradeSimulationClient(qmsg.Component): return self.connect_push_socket(self.addresses['order_address']) def order(self, sid, amount): + order = zp.namedict({ 'dt':self.current_dt, 'sid':sid, 'amount':amount }) - self.order_socket.send(zp.ORDER_FRAME(order)) + self.order_count += 1 + self.perf.log_order(order) def signal_order_done(self): self.order_socket.send(str(zp.ORDER_PROTOCOL.DONE)) @@ -172,6 +205,8 @@ class OrderDataSource(qmsg.DataSource): """ qmsg.DataSource.__init__(self, zp.FINANCE_COMPONENT.ORDER_SOURCE) self.sent_count = 0 + self.recv_count = Counter() + self.works = 0 @property def get_type(self): @@ -181,9 +216,10 @@ class OrderDataSource(qmsg.DataSource): @property def is_blocking(self): """ - This datasource is in a loop with the TradingSimulationClient + This datasource is in a loop with the TradingSimulationClient, + so we don't want it to block processing. """ - return False + return True def open(self): qmsg.DataSource.open(self) @@ -194,23 +230,20 @@ class OrderDataSource(qmsg.DataSource): def do_work(self): - - #TODO: if this is the first iteration, break deadlock by sending a dummy order - if(self.sent_count == 0): - self.send(zp.namedict({})) - - #pull all orders from client. - orders = [] - count = 0 + self.works += 1 - # TODO : this can be written in a concurrency agnostic - # way... have a chat with Fawce about this ~Steve + #pull all orders from client. + count = 0 + # one iteration of the client could include several orders + # so iterate until the client signals a break or a close. while True: # poll all the sockets # we reduce the timeout here by a factor of 2, because we need # to potentially receive the client's done message before the # controller or heartbeat times out. + + # TODO: shouldn't this block until we receive a message? socks = dict(self.poll.poll(self.heartbeat_timeout/2)) # see if the poller has results for the result_feed @@ -220,39 +253,59 @@ class OrderDataSource(qmsg.DataSource): order_msg = self.order_socket.recv() if order_msg == str(zp.ORDER_PROTOCOL.DONE): + qutil.LOGGER.info("order source is done") self.signal_done() + self.recv_count['done'] += 1 return if order_msg == str(zp.ORDER_PROTOCOL.BREAK): + # send a blank message to avoid an empty buffer + # in the feed + self.recv_count['break'] += 1 + if count == 0: + self.send(namedict({})) break order = zp.ORDER_UNFRAME(order_msg) + self.recv_count['order'] += 1 #send the order along self.send(order) count += 1 self.sent_count += 1 - - else: - # no orders, break out - break - - #TODO: we have to send at least one dummy order per do_work iteration - # or the feed will block waiting for our messages. - if(count == 0): - self.send(zp.namedict({})) - + class TransactionSimulator(qmsg.BaseTransform): - def __init__(self): + def __init__(self, style=SIMULATION_STYLE.PARTIAL_VOLUME): qmsg.BaseTransform.__init__(self, zp.TRANSFORM_TYPE.TRANSACTION) self.open_orders = {} self.order_count = 0 self.txn_count = 0 - self.trade_window = datetime.timedelta(seconds=30) + self.trade_window = datetime.timedelta(seconds=30) self.orderTTL = datetime.timedelta(days=1) - self.volume_share = 0.05 self.commission = 0.03 + + if not style or style == SIMULATION_STYLE.PARTIAL_VOLUME: + self.apply_trade_to_open_orders = self.simulate_with_partial_volume + elif style == SIMULATION_STYLE.BUY_ALL: + self.apply_trade_to_open_orders = self.simulate_buy_all + elif style == SIMULATION_STYLE.FIXED_SLIPPAGE: + self.apply_trade_to_open_orders = self.simulate_with_fixed_cost + elif style == SIMULATION_STYLE.NOOP: + self.apply_trade_to_open_orders = self.simulate_noop + + # + @property + def is_blocking(self): + """ + Including this explicitly for clarity, even though we are using the + default value. TransactionSimulator has a defined action for every + event type. Downstream components depend on the presence of the + TRANSACTION transform in all cases. When no transaction happens, + None is the value. Thus, we do want merging to block on the + availability of transaction messages. + """ + return True def transform(self, event): """ @@ -267,9 +320,12 @@ class TransactionSimulator(qmsg.BaseTransform): self.state['value'] = txn else: self.state['value'] = None - qutil.LOGGER.info("unexpected event type in transform: {etype}".format(etype=event.type)) + log = "unexpected event type in transform: {etype}".format( + etype=event.type + ) + qutil.LOGGER.info(log) + #TODO: what to do if we get another kind of datasource event.type? - return self.state def add_open_order(self, event): @@ -277,63 +333,143 @@ class TransactionSimulator(qmsg.BaseTransform): Amount is explicitly converted to an int. Orders of amount zero are ignored. """ + self.order_count += 1 + event.amount = int(event.amount) if event.amount == 0: - qutil.LOGGER.debug("requested to trade zero shares of {sid}".format(sid=event.sid)) + log = "requested to trade zero shares of {sid}".format( + sid=event.sid + ) + qutil.LOGGER.debug(log) return - self.order_count += 1 + if(not self.open_orders.has_key(event.sid)): self.open_orders[event.sid] = [] + + # set the filled property to zero + event.filled = 0 self.open_orders[event.sid].append(event) - def apply_trade_to_open_orders(self, event): + def simulate_buy_all(self, event): + txn = self.create_transaction( + event.sid, + event.volume, + event.price, + event.dt, + 1 + ) + return txn - if(event.volume == 0): - #there are zero volume events bc some stocks trade - #less frequently than once per minute. - return self.create_dummy_txn(event.dt) - + def simulate_noop(self, event): + return None + + def simulate_with_fixed_cost(self, event): if self.open_orders.has_key(event.sid): orders = self.open_orders[event.sid] + orders = sorted(orders, key=lambda o: o.dt) else: return None - remaining_orders = [] - total_order = 0 - dt = event.dt - + amount = 0 for order in orders: - #we're using minute bars, so allow orders within - #30 seconds of the trade - if((order.dt - event.dt) < self.trade_window): - total_order += order.amount - if(order.dt > dt): - dt = order.dt - #if the order still has time to live (TTL) keep track - elif((self.algo_time - order.dt) < self.orderTTL): - remaining_orders.append(order) - - self.open_orders[event.sid] = remaining_orders - - if(total_order != 0): - direction = total_order / math.fabs(total_order) - else: - direction = 1 + amount += order.amount + + if(amount == 0): + return - volume_share = (direction * total_order) / event.volume - if volume_share > .25: - volume_share = .25 - amount = volume_share * event.volume * direction - impact = (volume_share)**2 * .1 * direction * event.price - return self.create_transaction( - event.sid, - amount, - event.price + impact, - dt.replace(tzinfo = pytz.utc), - direction - ) + direction = amount / math.fabs(amount) + + + txn = self.create_transaction( + event.sid, + amount, + event.price + 0.10, + event.dt, + direction + ) + + self.open_orders[event.sid] = [] + + return txn + + def simulate_with_partial_volume(self, event): + if(event.volume == 0): + #there are zero volume events bc some stocks trade + #less frequently than once per minute. + return None + + if self.open_orders.has_key(event.sid): + orders = self.open_orders[event.sid] + orders = sorted(orders, key=lambda o: o.dt) + else: + return None + + dt = event.dt + expired = [] + total_order = 0 + simulated_amount = 0 + simulated_impact = 0.0 + direction = 1.0 + for order in orders: + + if(order.dt < event.dt): + + # orders are only good on the day they are issued + if order.dt.day < event.dt.day: + continue + + open_amount = order.amount - order.filled + + if(open_amount != 0): + direction = open_amount / math.fabs(open_amount) + else: + direction = 1 + + desired_order = total_order + open_amount + + volume_share = direction * (desired_order) / event.volume + if volume_share > .25: + volume_share = .25 + simulated_amount = int(volume_share * event.volume * direction) + simulated_impact = (volume_share)**2 * .1 * direction * event.price + + order.filled += (simulated_amount - total_order) + total_order = simulated_amount + + # we cap the volume share at 25% of a trade + if volume_share == .25: + break + + if simulated_amount == 0: + warning = """ +Calculated a zero volume transation on trade: +{event} +for order: +{order} + """ + warning = warning.format( + event=str(event), + order=str(order) + ) + qutil.LOGGER.warn(warning) + + orders = [ x for x in orders if abs(x.amount - x.filled) > 0 and x.dt.day >= event.dt.day] + + self.open_orders[event.sid] = orders + + + if simulated_amount != 0: + return self.create_transaction( + event.sid, + simulated_amount, + event.price + simulated_impact, + dt.replace(tzinfo = pytz.utc), + direction + ) + else: + return None def create_transaction(self, sid, amount, price, dt, direction): @@ -445,7 +581,15 @@ class TradingEnvironment(object): return len(self.period_trading_days) - + def is_market_hours(self, test_date): + if not self.is_trading_day(test_date): + return False + + mkt_open = self.set_NYSE_time(test_date, 9, 30) + #TODO: half days? + mkt_close = self.set_NYSE_time(test_date, 16, 00) + + return test_date >= mkt_open and test_date <= mkt_close def is_trading_day(self, test_date): dt = self.normalize_date(test_date) diff --git a/zipline/lines.py b/zipline/lines.py index 2123cfdf..43c2eac0 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -90,7 +90,7 @@ from zipline.finance.trading import TransactionSimulator, OrderDataSource, \ TradeSimulationClient from zipline.simulator import AddressAllocator, Simulator from zipline.monitor import Controller - +from zipline.finance.trading import SIMULATION_STYLE class SimulatedTrading(object): @@ -125,11 +125,15 @@ class SimulatedTrading(object): :py:class:`zipline.simulator.AddressAllocator` - simulator_class: a :py:class:`zipline.messaging.ComponentHost` subclass (not an instance) + - simulation_style: optional parameter that configures the + :py:class:`zipline.finance.trading.TransactionSimulator`. Expects + a SIMULATION_STYLE as defined in :py:mod:`zipline.finance.trading` """ assert isinstance(config, dict) self.algorithm = config['algorithm'] self.allocator = config['allocator'] self.trading_environment = config['trading_environment'] + self.sim_style = config.get('simulation_style') self.leased_sockets = [] self.sim_context = None @@ -169,7 +173,7 @@ class SimulatedTrading(object): self.add_source(self.order_source) #setup transforms - self.transaction_sim = TransactionSimulator() + self.transaction_sim = TransactionSimulator(self.sim_style) self.transforms = {} self.add_transform(self.transaction_sim) @@ -191,16 +195,20 @@ class SimulatedTrading(object): - sid - an integer, which will be used as the security ID. - order_count - the number of orders the test algo will place, defaults to 100 - - trade_count - the number of trades to simulate, defaults to 100 + - order_amount - the number of shares per order, defaults to 100 + - trade_count - the number of trades to simulate, defaults to 101 + to ensure all orders are processed. - simulator_class - optional parameter that provides an alternative subclass of ComponentHost to hold the whole zipline. Defaults to :py:class:`zipline.simulator.Simulator` - algorithm - optional parameter providing an algorithm. defaults to :py:class:`zipline.test.algorithms.TestAlgorithm` - - random - optional parameter to request random trades. if present - :py:class:`zipline.sources.RandomEquityTrades` is the source. If - not :py:class:`ziplien.sources.SpecificEquityTrades` is the - source + - trade_source - optional parameter to specify trades, if present. + If not present :py:class:`ziplien.sources.SpecificEquityTrades` + is the source, with daily frequency in trades. + - simulation_style: optional parameter that configures the + :py:class:`zipline.finance.trading.TransactionSimulator`. Expects + a SIMULATION_STYLE as defined in :py:mod:`zipline.finance.trading` """ assert isinstance(config, dict) @@ -219,28 +227,35 @@ class SimulatedTrading(object): order_count = config['order_count'] else: order_count = 100 + + if config.has_key('order_amount'): + order_amount = config['order_amount'] + else: + order_amount = 100 if config.has_key('trade_count'): trade_count = config['trade_count'] else: - trade_count = 100 + # to ensure all orders are filled, we provide one more + # trade than order + trade_count = 101 if config.has_key('simulator_class'): simulator_class = config['simulator_class'] else: simulator_class = Simulator + + simulation_style = config.get('simulation_style') + if not simulation_style: + simulation_style = SIMULATION_STYLE.FIXED_SLIPPAGE #------------------- # Trade Source #------------------- sids = [sid] #------------------- - if config.has_key('random'): - trade_source = factory.create_random_trade_source( - sids, - trade_count, - trading_environment - ) + if config.has_key('trade_source'): + trade_source = config['trade_source'] else: trade_source = factory.create_daily_trade_source( sids, @@ -253,7 +268,6 @@ class SimulatedTrading(object): if config.has_key('algorithm'): test_algo = config['algorithm'] else: - order_amount = 100 test_algo = TestAlgorithm( sid, order_amount, @@ -266,7 +280,8 @@ class SimulatedTrading(object): 'algorithm':test_algo, 'trading_environment':trading_environment, 'allocator':allocator, - 'simulator_class':simulator_class + 'simulator_class':simulator_class, + 'simulation_style':simulation_style }) #------------------- diff --git a/zipline/messaging.py b/zipline/messaging.py index 2c4caebc..b36dea95 100644 --- a/zipline/messaging.py +++ b/zipline/messaging.py @@ -4,6 +4,8 @@ Commonly used messaging components. import datetime +from collections import Counter + import zipline.util as qutil from zipline.component import Component import zipline.protocol as zp @@ -37,7 +39,7 @@ class ComponentHost(Component): # ---------------------- self.sync_register = {} - self.timeout = datetime.timedelta(seconds=5) + self.timeout = datetime.timedelta(seconds=60) self.feed = Feed() self.merge = Merge() @@ -81,11 +83,11 @@ class ComponentHost(Component): self.sync_register[component.get_id] = datetime.datetime.utcnow() if isinstance(component, DataSource): - self.feed.add_source(component.get_id) + self.feed.add_source(component.get_id, component.is_blocking) if not component.is_blocking: self.feed.ds_finished_counter +=1 if isinstance(component, BaseTransform): - self.merge.add_source(component.get_id) + self.merge.add_source(component.get_id, component.is_blocking) if not component.is_blocking: self.feed.ds_finished_counter +=1 @@ -192,6 +194,13 @@ class Feed(Component): # Depending on the size of this, might want to use a data # structure with better asymptotics. self.data_buffer = {} + + # source_id -> integer count + self.sent_counters = Counter() + self.recv_counters = Counter() + + # source_id -> boolean. True is for blocking + self.is_blocking_map = {} def init(self): pass @@ -214,7 +223,7 @@ class Feed(Component): def do_work(self): # wait for synchronization reply from the host - socks = dict(self.poll.poll(self.heartbeat_timeout)) #timeout after 2 seconds. + socks = dict(self.poll.poll(self.heartbeat_timeout)) # TODO: Abstract this out, maybe on base component if self.control_in in socks and socks[self.control_in] == self.zmq.POLLIN: @@ -294,6 +303,7 @@ class Feed(Component): event = self.next() if(event != None): self.feed_socket.send(self.frame(event), self.zmq.NOBLOCK) + self.sent_counters[event.source_id] += 1 self.sent_count += 1 def append(self, event): @@ -302,6 +312,7 @@ class Feed(Component): source_id. """ self.data_buffer[event.source_id].append(event) + self.recv_counters[event.source_id] += 1 self.received_count += 1 def next(self): @@ -336,9 +347,12 @@ class Feed(Component): def is_full(self): """ Indicates whether the buffer has messages in buffer for - all un-DONE sources. + all un-DONE, blocking sources. """ - for events in self.data_buffer.values(): + for source_id, events in self.data_buffer.iteritems(): + if not self.is_blocking_map[source_id]: + continue + if len(events) == 0: return False return True @@ -353,11 +367,12 @@ class Feed(Component): total += len(events) return total - def add_source(self, source_id): + def add_source(self, source_id, is_blocking=True): """ Add a data source to the buffer. """ self.data_buffer[source_id] = [] + self.is_blocking_map[source_id] = is_blocking def __len__(self): """ diff --git a/zipline/sources.py b/zipline/sources.py index b1385754..bf08644c 100644 --- a/zipline/sources.py +++ b/zipline/sources.py @@ -94,6 +94,7 @@ class SpecificEquityTrades(TradeDataSource): def get_type(self): zp.COMPONENT_TYPE.SOURCE + def do_work(self): if(len(self.event_list) == 0): self.signal_done() diff --git a/zipline/test/algorithms.py b/zipline/test/algorithms.py index 49ec15c0..3a7bfc4d 100644 --- a/zipline/test/algorithms.py +++ b/zipline/test/algorithms.py @@ -70,14 +70,14 @@ class TestAlgorithm(): def handle_frame(self, frame): self.frame_count += 1 - #place an order for 100 shares of sid:133 + #place an order for 100 shares of sid if self.incr < self.count: self.order(self.sid, self.amount) self.incr += 1 def get_sid_filter(self): - return [self.sid] - + return [self.sid] + class NoopAlgorithm(object): """ Dolce fa niente. diff --git a/zipline/test/factory.py b/zipline/test/factory.py index 4954cabd..2c23b44f 100644 --- a/zipline/test/factory.py +++ b/zipline/test/factory.py @@ -38,12 +38,12 @@ def load_market_data(): return bm_returns, tr_curves -def create_trading_environment(): +def create_trading_environment(year=2006): """Construct a complete environment with reasonable defaults""" benchmark_returns, treasury_curves = load_market_data() - start = datetime(2006, 1, 1, tzinfo=pytz.utc) - end = datetime(2006, 12, 31, tzinfo=pytz.utc) + start = datetime(year, 1, 1, tzinfo=pytz.utc) + end = datetime(year, 12, 31, tzinfo=pytz.utc) trading_environment = TradingEnvironment( benchmark_returns, treasury_curves, @@ -68,7 +68,7 @@ def get_next_trading_dt(current, interval, trading_calendar): next = current while True: next = next + interval - if trading_calendar.is_trading_day(next): + if trading_calendar.is_market_hours(next): break return next @@ -79,9 +79,9 @@ def create_trade_history(sid, prices, amounts, interval, trading_calendar): for price, amount in zip(prices, amounts): - current = get_next_trading_dt(current, interval, trading_calendar) trade = create_trade(sid, price, amount, current) trades.append(trade) + current = get_next_trading_dt(current, interval, trading_calendar) assert len(trades) == len(prices) return trades @@ -167,6 +167,7 @@ def create_random_trade_source(sid, trade_count, trading_environment): return source def create_daily_trade_source(sids, trade_count, trading_environment): + """ creates trade_count trades for each sid in sids list. first trade will be on trading_environment.period_start, and daily @@ -176,12 +177,38 @@ def create_daily_trade_source(sids, trade_count, trading_environment): Important side-effect: trading_environment.period_end will be modified to match the day of the final trade. """ + return create_trade_source( + sids, + trade_count, + timedelta(days=1), + trading_environment + ) + + +def create_minutely_trade_source(sids, trade_count, trading_environment): + + """ + creates trade_count trades for each sid in sids list. + first trade will be on trading_environment.period_start, and every minute + thereafter for each sid. Thus, two sids should result in two trades per + minute. + + Important side-effect: trading_environment.period_end will be modified + to match the day of the final trade. + """ + return create_trade_source( + sids, + trade_count, + timedelta(minutes=1), + trading_environment + ) + +def create_trade_source(sids, trade_count, trade_time_increment, trading_environment): trade_history = [] for sid in sids: price = [10.1] * trade_count volume = [100] * trade_count start_date = trading_environment.first_open - trade_time_increment = timedelta(days=1) generated_trades = create_trade_history( sid, diff --git a/zipline/test/test_finance.py b/zipline/test/test_finance.py index 52538f6a..2ff7da61 100644 --- a/zipline/test/test_finance.py +++ b/zipline/test/test_finance.py @@ -21,8 +21,12 @@ TradeSimulationClient, TradingEnvironment from zipline.simulator import AddressAllocator, Simulator from zipline.monitor import Controller from zipline.lines import SimulatedTrading +from zipline.finance.performance import PerformanceTracker +from zipline.protocol_utils import namedict +from zipline.finance.trading import SIMULATION_STYLE DEFAULT_TIMEOUT = 15 # seconds +EXTENDED_TIMEOUT = 90 allocator = AddressAllocator(1000) @@ -103,12 +107,21 @@ class FinanceTestCase(TestCase): self.assertTrue(env.last_close.month == 12) self.assertTrue(env.last_close.day == 31) + # The following two tests appear broken no that the order source is + # non blocking. HUNCH: The trades are streaming through before the orders + # are placed. + @timed(DEFAULT_TIMEOUT) def test_orders(self): # Simulation # ---------- - zipline = SimulatedTrading.create_test_zipline(**self.zipline_test_config) + + self.zipline_test_config['simulation_style'] = \ + SIMULATION_STYLE.FIXED_SLIPPAGE + zipline = SimulatedTrading.create_test_zipline( + **self.zipline_test_config + ) zipline.simulate(blocking=True) self.assertTrue(zipline.sim.ready()) @@ -118,8 +131,70 @@ class FinanceTestCase(TestCase): self.assertEqual(zipline.sim.feed.pending_messages(), 0, \ "The feed should be drained of all messages, found {n} remaining." \ .format(n=zipline.sim.feed.pending_messages())) + + # the trading client should receive one transaction for every + # order placed. + self.assertEqual( + zipline.trading_client.txn_count, + zipline.trading_client.order_count + ) + + # the number of transactions in the performance tracker's cumulative + # period should be the same as the number of orders place by the + # algorithm. + self.assertEqual( + zipline.trading_client.order_count, + len(zipline.trading_client.perf.cumulative_performance.processed_transactions) + ) + + @timed(EXTENDED_TIMEOUT) + def test_aggressive_buying(self): + # Simulation + # ---------- + + # TODO: for some reason the orders aren't filled without an extra + # trade. + trade_count = 5001 + self.zipline_test_config['order_count'] = trade_count - 1 + self.zipline_test_config['trade_count'] = trade_count + self.zipline_test_config['order_amount'] = 1 + + # tell the simulator to fill the orders in individual transactions + # matching the order volume exactly. + self.zipline_test_config['simulation_style'] = \ + SIMULATION_STYLE.FIXED_SLIPPAGE + self.zipline_test_config['environment'] = factory.create_trading_environment() + + sid_list = [self.zipline_test_config['sid']] + + self.zipline_test_config['trade_source'] = factory.create_minutely_trade_source( + sid_list, + trade_count, + self.zipline_test_config['environment'] + ) + + zipline = SimulatedTrading.create_test_zipline(**self.zipline_test_config) + zipline.simulate(blocking=True) + + self.assertTrue(zipline.sim.ready()) + self.assertFalse(zipline.sim.exception) + + self.assertEqual(zipline.sim.feed.pending_messages(), 0, \ + "The feed should be drained of all messages, found {n} remaining." \ + .format(n=zipline.sim.feed.pending_messages())) + + # + # the trading client should receive one transaction for every + # order placed. + self.assertEqual( + zipline.trading_client.txn_count, + zipline.trading_client.order_count + ) + + + @timed(DEFAULT_TIMEOUT) def test_performance(self): #provide enough trades to ensure all orders are filled. @@ -204,7 +279,9 @@ class FinanceTestCase(TestCase): self.zipline_test_config['trade_count'] = 200 self.zipline_test_config['algorithm'] = test_algo - zipline = SimulatedTrading.create_test_zipline(**self.zipline_test_config) + zipline = SimulatedTrading.create_test_zipline( + **self.zipline_test_config + ) zipline.simulate(blocking=True) #check that the algorithm received no events @@ -214,8 +291,201 @@ class FinanceTestCase(TestCase): "The algorithm should not receive any events due to filtering." ) + + # TODO: write tests for short sales + # TODO: write a test to do massive buying or shorting. + + @timed(DEFAULT_TIMEOUT) + def test_partially_filled_orders(self): + + # create a scenario where order size and trade size are equal + # so that orders must be spread out over several trades. + params ={ + 'trade_count':360, + 'trade_amount':100, + 'trade_interval': timedelta(minutes=1), + 'order_count':2, + 'order_amount':100, + 'order_interval': timedelta(minutes=1), + # because we placed an order for 100 shares, and the volume + # of each trade is 100, the simulator should spread the order + # into 4 trades of 25 shares per order. + 'expected_txn_count':8, + 'expected_txn_volume':2 * 100 + } + + self.transaction_sim(**params) + + # same scenario, but with short sales + params2 ={ + 'trade_count':360, + 'trade_amount':100, + 'trade_interval': timedelta(minutes=1), + 'order_count':2, + 'order_amount':-100, + 'order_interval': timedelta(minutes=1), + 'expected_txn_count':8, + 'expected_txn_volume':2 * -100 + } + + self.transaction_sim(**params2) + + @timed(DEFAULT_TIMEOUT) + def test_collapsing_orders(self): + # create a scenario where order.amount <<< trade.volume + # to test that several orders can be covered properly by one trade. + params1 ={ + 'trade_count':6, + 'trade_amount':100, + 'trade_interval': timedelta(hours=1), + 'order_count':24, + 'order_amount':1, + 'order_interval': timedelta(minutes=1), + # because we placed an orders totaling less than 25% of one trade + # the simulator should produce just one transaction. + 'expected_txn_count':1, + 'expected_txn_volume':24 * 1 + } + self.transaction_sim(**params1) + + # second verse, same as the first. except short! + params2 ={ + 'trade_count':6, + 'trade_amount':100, + 'trade_interval': timedelta(hours=1), + 'order_count':24, + 'order_amount':-1, + 'order_interval': timedelta(minutes=1), + 'expected_txn_count':1, + 'expected_txn_volume':24 * -1 + } + self.transaction_sim(**params2) + + @timed(DEFAULT_TIMEOUT) + def test_partial_expiration_orders(self): + # create a scenario where orders expire without being filled + # entirely + params1 = { + 'trade_count':100, + 'trade_amount':100, + 'trade_delay': timedelta(minutes=5), + 'trade_interval': timedelta(days=1), + 'order_count':3, + 'order_amount':1000, + 'order_interval': timedelta(minutes=30), + # because we placed an orders totaling less than 25% of one trade + # the simulator should produce just one transaction. + 'expected_txn_count' : 1, + 'expected_txn_volume' : 25 + } + self.transaction_sim(**params1) + + # same scenario, but short sales. + params2 = { + 'trade_count':100, + 'trade_amount':100, + 'trade_delay': timedelta(minutes=5), + 'trade_interval': timedelta(days=1), + 'order_count':3, + 'order_amount':1000, + 'order_interval': timedelta(minutes=30), + # because we placed an orders totaling less than 25% of one trade + # the simulator should produce just one transaction. + 'expected_txn_count' : 1, + 'expected_txn_volume' : 25 + } + self.transaction_sim(**params2) + + + + def transaction_sim(self, **params): + + trade_count = params['trade_count'] + trade_amount = params['trade_amount'] + trade_interval = params['trade_interval'] + trade_delay = params.get('trade_delay') + order_count = params['order_count'] + order_amount = params['order_amount'] + order_interval = params['order_interval'] + expected_txn_count = params['expected_txn_count'] + expected_txn_volume = params['expected_txn_volume'] + + trading_environment = factory.create_trading_environment() + trade_sim = TransactionSimulator() + price = [10.1] * trade_count + volume = [100] * trade_count + start_date = trading_environment.first_open + sid = 1 + generated_trades = factory.create_trade_history( + sid, + price, + volume, + trade_interval, + trading_environment + ) + + for i in range(order_count): + order = namedict( + { + 'sid':sid, + 'amount':order_amount, + 'type':zp.DATASOURCE_TYPE.ORDER, + 'dt' : start_date + i * order_interval + }) + sim_state = trade_sim.transform(order) + + # there should not be a new transaction from an order. + self.assertTrue(sim_state['name'] == trade_sim.get_id) + self.assertTrue(sim_state['value'] == None) + + # there should now be one open order list stored under the sid + oo = trade_sim.open_orders + self.assertEqual(len(oo), 1) + self.assertTrue(oo.has_key(sid)) + order_list = oo[sid] + self.assertEqual(order_count, len(order_list)) + + for order in order_list: + self.assertEqual(order.sid, sid) + self.assertEqual(order.amount, order_amount) + + + tracker = PerformanceTracker(trading_environment) + + transactions = [] + for trade in generated_trades: + if trade_delay: + trade.dt = trade.dt + trade_delay + + sim_state = trade_sim.transform(trade) + + self.assertEqual(sim_state['name'], trade_sim.get_id) - - + txn = None + if sim_state['value']: + txn = sim_state['value'] + transactions.append(txn) + trade[sim_state['name']] = txn + + tracker.process_event(trade) + + total_volume = 0 + for txn in transactions: + total_volume += txn.amount + + self.assertEqual(total_volume, expected_txn_volume) + self.assertEqual(len(transactions), expected_txn_count) + + cumulative_pos = tracker.cumulative_performance.positions[sid] + self.assertEqual(total_volume, cumulative_pos.amount) + + # the open orders should now be empty + oo = trade_sim.open_orders + self.assertTrue(oo.has_key(sid)) + order_list = oo[sid] + self.assertEqual(0, len(order_list)) + + + \ No newline at end of file diff --git a/zipline/test/test_perf_tracking.py b/zipline/test/test_perf_tracking.py index bd6efe7c..be7c9a25 100644 --- a/zipline/test/test_perf_tracking.py +++ b/zipline/test/test_perf_tracking.py @@ -22,8 +22,15 @@ class PerformanceTestCase(unittest.TestCase): 0, len(self.treasury_curves) ) - self.dt = self.treasury_curves.keys()[random_index] - self.end_dt = self.dt + datetime.timedelta(days=365) + for n in range(100): + self.dt = self.treasury_curves.keys()[random_index] + self.end_dt = self.dt + datetime.timedelta(days=365) + + now = datetime.datetime.utcnow().replace(tzinfo=pytz.utc) + + if self.end_dt <= now: + break + self.trading_environment = TradingEnvironment( self.benchmark_returns, self.treasury_curves, @@ -505,8 +512,6 @@ shares in position" price = 10.1 price_list = [price] * trade_count volume = [100] * trade_count - #start_date = datetime.datetime.strptime("01/01/2011","%m/%d/%Y") - #start_date = start_date.replace(tzinfo=pytz.utc) trade_time_increment = datetime.timedelta(days=1) trade_history = factory.create_trade_history( sid,