diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index 799107d8..03c94139 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -166,6 +166,7 @@ class PerformanceTracker(): self.event_count = 0 self.result_stream = None self.last_dict = None + self.order_log = [] # this performance period will span the entire simulation. self.cumulative_performance = PerformancePeriod( @@ -228,6 +229,9 @@ class PerformanceTracker(): 'cumulative_risk_metrics' : self.cumulative_risk_metrics.to_dict(), 'timestamp' : datetime.datetime.now(), } + + def log_order(self, order): + self.order_log.append(order) def process_event(self, event): assert isinstance(event, zp.namedict) @@ -300,13 +304,14 @@ class PerformanceTracker(): and send it out on the result_stream. """ + log_msg = "Simulated {n} trading days out of {m}." + qutil.LOGGER.info(log_msg.format(n=self.day_count, m=self.total_days)) + qutil.LOGGER.info("first open: {d}".format(d=self.trading_environment.first_open)) + # the stream will end on the last trading day, but will not trigger # an end of day, so we trigger the final market close here. self.handle_market_close() - log_msg = "Simulated {n} trading days out of {m}." - qutil.LOGGER.info(log_msg.format(n=self.day_count, m=self.total_days)) - qutil.LOGGER.info("first open: {d}".format(d=self.trading_environment.first_open)) self.risk_report = risk.RiskReport( self.returns, diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index e6d1fb31..77eb8ca5 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -119,12 +119,15 @@ class TradeSimulationClient(qmsg.Component): # otherwise, the algorithm has fallen behind the feed # and processing per event is longer than time between events. if event.dt >= self.current_dt: + # compress time by moving the current_time up to the event + # time. + self.current_dt = event.dt self.run_algorithm() # tally the time spent on this iteration self.last_iteration_dur = datetime.datetime.utcnow() - event_start # move the algorithm's clock forward to include iteration time - self.current_dt = self.current_dt + self.last_iteration_dur + # self.current_dt = self.current_dt + self.last_iteration_dur def run_algorithm(self): @@ -152,6 +155,7 @@ class TradeSimulationClient(qmsg.Component): }) self.order_socket.send(zp.ORDER_FRAME(order)) self.order_count += 1 + self.perf.log_order(order) def signal_order_done(self): self.order_socket.send(str(zp.ORDER_PROTOCOL.DONE)) @@ -185,6 +189,7 @@ class OrderDataSource(qmsg.DataSource): """ qmsg.DataSource.__init__(self, zp.FINANCE_COMPONENT.ORDER_SOURCE) self.sent_count = 0 + self.works = 0 @property def get_type(self): @@ -194,7 +199,8 @@ class OrderDataSource(qmsg.DataSource): @property def is_blocking(self): """ - This datasource is in a loop with the TradingSimulationClient + This datasource is in a loop with the TradingSimulationClient, + so we don't want it to block processing. """ return False @@ -207,17 +213,12 @@ class OrderDataSource(qmsg.DataSource): def do_work(self): - - #TODO: if this is the first iteration, break deadlock by sending a dummy order - if(self.sent_count == 0): - self.send(zp.namedict({})) + self.works += 1 + #pull all orders from client. orders = [] count = 0 - - # TODO : this can be written in a concurrency agnostic - # way... have a chat with Fawce about this ~Steve while True: # poll all the sockets @@ -246,17 +247,6 @@ class OrderDataSource(qmsg.DataSource): self.send(order) count += 1 self.sent_count += 1 - - # TODO: why didn't any unit tests catch this bug???? - - #else: - # # no orders, break out - # break - - #TODO: we have to send at least one dummy order per do_work iteration - # or the feed will block waiting for our messages. - if(count == 0): - self.send(zp.namedict({})) class TransactionSimulator(qmsg.BaseTransform): @@ -278,6 +268,19 @@ class TransactionSimulator(qmsg.BaseTransform): elif style == SIMULATION_STYLE.NOOP: self.apply_trade_to_open_orders = self.simulate_noop + # + @property + def is_blocking(self): + """ + Including this explicitly for clarity, even though we are using the + default value. TransactionSimulator has a defined action for every + event type. Downstream components depend on the presence of the + TRANSACTION transform in all cases. When no transaction happens, + None is the value. Thus, we do want merging to block on the + availability of transaction messages. + """ + return True + def transform(self, event): """ Pulls one message from the event feed, then @@ -304,6 +307,8 @@ class TransactionSimulator(qmsg.BaseTransform): Amount is explicitly converted to an int. Orders of amount zero are ignored. """ + self.order_count += 1 + event.amount = int(event.amount) if event.amount == 0: log = "requested to trade zero shares of {sid}".format( @@ -312,7 +317,7 @@ class TransactionSimulator(qmsg.BaseTransform): qutil.LOGGER.debug(log) return - self.order_count += 1 + if(not self.open_orders.has_key(event.sid)): self.open_orders[event.sid] = [] @@ -321,9 +326,6 @@ class TransactionSimulator(qmsg.BaseTransform): event.filled = 0 self.open_orders[event.sid].append(event) - #def apply_trade_to_open_orders(self, event): - # return self.simulate_with_fixed_cost(event) - def simulate_buy_all(self, event): txn = self.create_transaction( event.sid, @@ -348,10 +350,11 @@ class TransactionSimulator(qmsg.BaseTransform): for order in orders: amount += order.amount - if(amount != 0): - direction = amount / math.fabs(amount) - else: - direction = 1 + if(amount == 0): + return + + direction = amount / math.fabs(amount) + txn = self.create_transaction( event.sid, @@ -426,9 +429,9 @@ for order: ) qutil.LOGGER.warn(warning) - orders = [ x for x in orders if abs(x.amount - x.filled) > 0 and x.dt.day >= event.dt.day] + #orders = [ x for x in orders if abs(x.amount - x.filled) > 0 and x.dt.day >= event.dt.day] - self.open_orders[event.sid] = orders + #self.open_orders[event.sid] = orders if simulated_amount != 0: diff --git a/zipline/lines.py b/zipline/lines.py index 466de2e3..43c2eac0 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -125,6 +125,9 @@ class SimulatedTrading(object): :py:class:`zipline.simulator.AddressAllocator` - simulator_class: a :py:class:`zipline.messaging.ComponentHost` subclass (not an instance) + - simulation_style: optional parameter that configures the + :py:class:`zipline.finance.trading.TransactionSimulator`. Expects + a SIMULATION_STYLE as defined in :py:mod:`zipline.finance.trading` """ assert isinstance(config, dict) self.algorithm = config['algorithm'] @@ -203,6 +206,9 @@ class SimulatedTrading(object): - trade_source - optional parameter to specify trades, if present. If not present :py:class:`ziplien.sources.SpecificEquityTrades` is the source, with daily frequency in trades. + - simulation_style: optional parameter that configures the + :py:class:`zipline.finance.trading.TransactionSimulator`. Expects + a SIMULATION_STYLE as defined in :py:mod:`zipline.finance.trading` """ assert isinstance(config, dict) @@ -230,12 +236,18 @@ class SimulatedTrading(object): if config.has_key('trade_count'): trade_count = config['trade_count'] else: + # to ensure all orders are filled, we provide one more + # trade than order trade_count = 101 if config.has_key('simulator_class'): simulator_class = config['simulator_class'] else: simulator_class = Simulator + + simulation_style = config.get('simulation_style') + if not simulation_style: + simulation_style = SIMULATION_STYLE.FIXED_SLIPPAGE #------------------- # Trade Source @@ -269,7 +281,7 @@ class SimulatedTrading(object): 'trading_environment':trading_environment, 'allocator':allocator, 'simulator_class':simulator_class, - 'simulation_style':SIMULATION_STYLE.FIXED_SLIPPAGE + 'simulation_style':simulation_style }) #------------------- diff --git a/zipline/messaging.py b/zipline/messaging.py index 7817ec8b..75e181b0 100644 --- a/zipline/messaging.py +++ b/zipline/messaging.py @@ -4,6 +4,8 @@ Commonly used messaging components. import datetime +from collections import Counter + import zipline.util as qutil from zipline.component import Component import zipline.protocol as zp @@ -81,11 +83,11 @@ class ComponentHost(Component): self.sync_register[component.get_id] = datetime.datetime.utcnow() if isinstance(component, DataSource): - self.feed.add_source(component.get_id) + self.feed.add_source(component.get_id, component.is_blocking) if not component.is_blocking: self.feed.ds_finished_counter +=1 if isinstance(component, BaseTransform): - self.merge.add_source(component.get_id) + self.merge.add_source(component.get_id, component.is_blocking) if not component.is_blocking: self.feed.ds_finished_counter +=1 @@ -192,6 +194,13 @@ class Feed(Component): # Depending on the size of this, might want to use a data # structure with better asymptotics. self.data_buffer = {} + + # source_id -> integer count + self.sent_counters = Counter() + self.recv_counters = Counter() + + # source_id -> boolean. True is for blocking + self.is_blocking_map = {} def init(self): pass @@ -294,6 +303,7 @@ class Feed(Component): event = self.next() if(event != None): self.feed_socket.send(self.frame(event), self.zmq.NOBLOCK) + self.sent_counters[event.source_id] += 1 self.sent_count += 1 def append(self, event): @@ -302,6 +312,7 @@ class Feed(Component): source_id. """ self.data_buffer[event.source_id].append(event) + self.recv_counters[event.source_id] += 1 self.received_count += 1 def next(self): @@ -338,7 +349,10 @@ class Feed(Component): Indicates whether the buffer has messages in buffer for all un-DONE sources. """ - for events in self.data_buffer.values(): + for source_id, events in self.data_buffer.iteritems(): + if not self.is_blocking_map[source_id]: + continue + if len(events) == 0: return False return True @@ -353,11 +367,12 @@ class Feed(Component): total += len(events) return total - def add_source(self, source_id): + def add_source(self, source_id, is_blocking=True): """ Add a data source to the buffer. """ self.data_buffer[source_id] = [] + self.is_blocking_map[source_id] = is_blocking def __len__(self): """ diff --git a/zipline/test/algorithms.py b/zipline/test/algorithms.py index 522bf76f..3a7bfc4d 100644 --- a/zipline/test/algorithms.py +++ b/zipline/test/algorithms.py @@ -70,7 +70,7 @@ class TestAlgorithm(): def handle_frame(self, frame): self.frame_count += 1 - #place an order for 100 shares of sid:133 + #place an order for 100 shares of sid if self.incr < self.count: self.order(self.sid, self.amount) self.incr += 1 diff --git a/zipline/test/factory.py b/zipline/test/factory.py index 381fbb7e..2c23b44f 100644 --- a/zipline/test/factory.py +++ b/zipline/test/factory.py @@ -38,12 +38,12 @@ def load_market_data(): return bm_returns, tr_curves -def create_trading_environment(): +def create_trading_environment(year=2006): """Construct a complete environment with reasonable defaults""" benchmark_returns, treasury_curves = load_market_data() - start = datetime(2006, 1, 1, tzinfo=pytz.utc) - end = datetime(2006, 12, 31, tzinfo=pytz.utc) + start = datetime(year, 1, 1, tzinfo=pytz.utc) + end = datetime(year, 12, 31, tzinfo=pytz.utc) trading_environment = TradingEnvironment( benchmark_returns, treasury_curves,