diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index 065d5095..92c96f3a 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -197,13 +197,12 @@ class PerformanceTracker(object): # save the transactions for the daily periods keep_transactions = True ) - + for sid in sid_list: self.cumulative_performance.positions[sid] = Position(sid) self.todays_performance.positions[sid] = Position(sid) def update(self, event): - import nose.tools; nose.tools.set_trace() event.perf_message = self.process_event(event) event.portfolio = self.get_portfolio() del event['TRANSACTION'] @@ -247,6 +246,8 @@ class PerformanceTracker(object): def process_event(self, event): + + message = None if self.exceeded_max_loss: return @@ -255,7 +256,7 @@ class PerformanceTracker(object): self.event_count += 1 if(event.dt >= self.market_close): - self.handle_market_close() + message = self.handle_market_close() if event.TRANSACTION: self.txn_count += 1 @@ -270,8 +271,10 @@ class PerformanceTracker(object): self.cumulative_performance.calculate_performance() self.todays_performance.calculate_performance() - def handle_market_close(self): + return message + def handle_market_close(self): + # add the return results from today to the list of DailyReturn objects. todays_date = self.market_close.replace(hour=0, minute=0, second=0) todays_return_obj = risk.DailyReturn( @@ -293,14 +296,9 @@ class PerformanceTracker(object): # calculate progress of test self.progress = self.day_count / self.total_days - # TODO!!!! + #TODO TODO TODO!! + daily_update = self.to_dict() - # Output results - if self.results_socket: - msg = zp.PERF_FRAME(self.to_dict()) - self.results_socket.send(msg) - - # if self.trading_environment.max_drawdown: returns = self.todays_performance.returns max_dd = -1 * self.trading_environment.max_drawdown @@ -311,7 +309,7 @@ class PerformanceTracker(object): # so it shows up in the update, but don't end the test # here. Let the update go out before stopping self.exceeded_max_loss = True - return + return daily_update #move the market day markers forward @@ -333,6 +331,8 @@ class PerformanceTracker(object): self.market_close, keep_transactions = True ) + + return daily_update def handle_simulation_end(self): """ @@ -369,8 +369,8 @@ class Position(object): self.sid = sid self.amount = 0 self.cost_basis = 0.0 ##per share - self.last_sale_price = None - self.last_sale_date = None + self.last_sale_price = 0.0 + self.last_sale_date = 0.0 def update(self, txn): if(self.sid != txn.sid): diff --git a/zipline/gens/tradegens.py b/zipline/gens/tradegens.py index 7420e1b4..8552b530 100644 --- a/zipline/gens/tradegens.py +++ b/zipline/gens/tradegens.py @@ -49,7 +49,7 @@ def fuzzy_dates(count = 500): for date in date_gen(count = count): yield date + timedelta(seconds = random.randint(-10, 10)) -def SpecificEquityTrades(*args, **config): +class SpecificEquityTrades(object): """ Yields all events in event_list that match the given sid_filter. If no event_list is specified, generates an internal stream of events @@ -57,71 +57,85 @@ def SpecificEquityTrades(*args, **config): Configuration options: - count: integer representing number of trades - sids : list of values representing simulated internal sids - start: start date - delta: timedelta between internal events - - + count : integer representing number of trades + sids : list of values representing simulated internal sids + start : start date + delta : timedelta between internal events + filter : filter to remove the sids """ - # We shouldn't get any positional arguments. - assert args == () - # Unpack config dictionary with default values. - count = config.get('count', 500) - sids = config.get('sids', [1, 2]) - start = config.get('start', datetime(2012, 6, 6, 0)) - delta = config.get('delta', timedelta(minutes = 1)) + def __init__(self, *args, **kwargs): + # We shouldn't get any positional arguments. + assert len(args) == 0 + + # Unpack config dictionary with default values. + self.count = kwargs.get('count', 500) + self.sids = kwargs.get('sids', [1, 2]) + self.start = kwargs.get('start', datetime(2012, 6, 6, 0)) + self.delta = kwargs.get('delta', timedelta(minutes = 1)) + + # Default to None for event_list and filter. + self.event_list = kwargs.get('event_list') + self.filter = kwargs.get('filter') + + # Hash_value for downstream sorting. + self.arg_string = hash_args(*args, **kwargs) - # Default to None for event_list and filter. - event_list = config.get('event_list') - filter = config.get('filter') + def get_hash(self): + return self.__class__.__name__ + "-" + self.arg_string + + def __iter__(self): + + if self.event_list: + unfiltered = (event for event in event_list) - arg_string = hash_args(*args, **config) - namestring = "SpecificEquityTrades" + arg_string - # If we have an event_list, ignore the other arguments and use the list. - # TODO: still append our namestring? - if event_list: - unfiltered = (event for event in event_list) + # Set up iterators for each expected field. + else: + dates = date_gen(count=self.count, + start=self.start, + delta=self.delta + ) + prices = mock_prices(self.count) + volumes = mock_volumes(self.count) + sids = cycle(self.sids) + + # Combine the iterators into a single iterator of arguments + arg_gen = izip(sids, prices, volumes, dates) - # Set up iterators for each expected field. - else: - dates = date_gen(count = count, start = start, delta = delta) - prices = mock_prices(count) - volumes = mock_volumes(count) + # Convert argument packages into events. + unfiltered = (create_trade(*args, source_id = self.get_hash()) + for args in arg_gen) + + # If we specified a sid filter, filter out elements that don't + # match the filter. + if self.filter: + filtered = ifilter(lambda event: event.sid in self.filter, unfiltered) + + # Otherwise just use all events. + else: + filtered = unfiltered + + # Return the filtered event stream. + return filtered + + +# !!!!!!! Deprecated for now !!!!!!!!! + +def RandomEquityTrades(object): + + def __init__(self): + # We shouldn't get any positional args. + assert args == () + + self.count = config.get('count', 500) + self.sids = config.get('sids', [1,2]) + self.filter = config.get('filter') + + dates = fuzzy_dates(count) + prices = mock_prices(count, rand = True) + volumes = mock_volumes(count, rand = True) sids = cycle(sids) - # Combine the iterators into a single iterator of arguments - arg_gen = izip(sids, prices, volumes, dates) - - # Convert argument packages into events. - unfiltered = (create_trade(*args, source_id = namestring) - for args in arg_gen) - - # If we specified a sid filter, filter out elements that don't match the filter. - if filter: - filtered = ifilter(lambda event: event.sid in filter, unfiltered) - - # Otherwise just use all events. - else: - filtered = unfiltered - - # Return the filtered event stream. - return filtered - -def RandomEquityTrades(*args, **config): - # We shouldn't get any positional args. - assert args == () - - count = config.get('count', 500) - sids = config.get('sids', [1,2]) - filter = config.get('filter') - - dates = fuzzy_dates(count) - prices = mock_prices(count, rand = True) - volumes = mock_volumes(count, rand = True) - sids = cycle(sids) - arg_gen = izip(sids, prices, volumes, dates) unfiltered = (create_trade(*args) for args in arg_gen) diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index 2c12dc6f..04567fac 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -55,37 +55,6 @@ def trade_simulation_client(stream_in, algo, environment, sim_style): for sid in sids: open_orders[sid] = [] - - # Closure to pass into the user's algo to allow placing orders - # into the txn_sim's dict of open orders. - def order(self, sid, amount): - assert sid in sids, "Order on invalid sid: %i" % sid - order = ndict({ - 'dt' : self.current_dt, - 'sid' : sid, - 'amount' : int(amount), - 'filled' : 0 - }) - - # Tell the user if they try to buy 0 shares of something. - if order.amount == 0: - log = "requested to trade zero shares of {sid}".format( - sid=event.sid - ) - log.debug(log) - return - - open_orders[sid].append(event) - - # Set the algo's order method. - algo.set_order(order) - - # Provide a logbook logging interface to user code. - algo.set_logger(logbook.Logger("Algolog")) - - # Call user-defined initialize method before we process any - # events. - algo.initialize() # Pipe the in stream into the transaction simulator. # Creates a txn field on the event containing transaction @@ -113,44 +82,115 @@ def trade_simulation_client(stream_in, algo, environment, sim_style): # Batch the event stream by dt to be processed by the user's algo. # Yields perf messages whenever it encounters them. - perf_messages = algo_simulator(with_portfolio_and_perf_msg, algo) + perf_messages = algo_simulator(with_portfolio_and_perf_msg, sids, algo, open_orders) + for message in perf_messages: + yield message + + +def algo_simulator(stream_in, sids, algo, order_book): + simulation_dt = None + # Closure to pass into the user's algo to allow placing orders + # into the txn_sim's dict of open orders. + def order(sid, amount): + assert sid in sids, "Order on invalid sid: %i" % sid + order = ndict({ + 'dt' : simulation_dt, + 'sid' : sid, + 'amount' : int(amount), + 'filled' : 0 + }) -def algo_simulator(stream_in, sids, algo): + # Tell the user if they try to buy 0 shares of something. + if order.amount == 0: + log = "requested to trade zero shares of {sid}".format( + sid=event.sid + ) + log.debug(log) + return + + order_book[sid].append(order) + + # Set the algo's order method. + algo.set_order(order) + + # Provide a logbook logging interface to user code. + algo.set_logger(logbook.Logger("Algolog")) + + # Call user-defined initialize method before we process any + # events. + algo.initialize() - current_dt = None universe = ndict() - for sid in sids: - universe[sid] = None + universe[sid] = ndict() universe.portfolio = None + this_snapshot_dt = None - for update in stream_in: - #Yield perf messages to be relayed back to the browser. - if update.perf_message: - yield perf_message - - if current_dt = None: - current_dt = update.dt - - # If this message is newer than the algorithm's simulated dt, - # call handle data on a snapshot of the current algo universe, - # then - if message.dt >= current_dt + last_delta: - start_tic = datetime.now() - algo.handle_data(universe) - stop_tic = datetime.now() - last_delta = datetime - - current_dt = message.dt + last_delta - - batch.data[message.sid] = message - batch.data.portfolio = message.portfolio - + for event in stream_in: + # Yield any perf messages received to be relayed back to the browser. + if event.perf_message: + yield event.perf_message + del event['perf_message'] - + # This should only happen for the first event we run. + if simulation_dt == None: + simulation_dt = event.dt + + # If we are currently creating a new message and this update + # matches the message dt, update the state of the universe. + + if this_snapshot_dt != None: + + if event.dt == this_snapshot_dt: + update_universe(event, universe) + + # If we are constructing a snapshot and we hit a new dt, call + # handle_data and record how long it takes. + else: + start_tic = datetime.now() + algo.handle_data(universe) + stop_tic = datetime.now() + + # How long did you take? + delta = stop_tic - start_tic + + # Update the simulation time. + simulation_dt = this_snapshot_dt + delta + + # Update the universe with the new event. + update_universe(event, universe) + + # If the current event is later than the simulation + # time, update the universe and start constructing + # another snapshot. + if event.dt >= simulation_dt: + this_snapshot_dt = event.dt + else: + this_snapshot_dt = None + # We have been fastforwarding. Update the universe + # and check if we can start a new snapshot. + else: + update_universe(event, universe) + if event.dt >= simulation_dt: + this_snapshot_dt = event.dt + + + + +def update_universe(event, universe): + + universe.portfolio = event.portfolio + del event['portfolio'] + + event_sid = event.sid + del event['sid'] + + for field in event.keys(): + universe[event_sid][field] = event[field] +