diff --git a/zipline/core/monitor.py b/zipline/core/monitor.py index 1f64cdd9..c305e8ff 100644 --- a/zipline/core/monitor.py +++ b/zipline/core/monitor.py @@ -296,7 +296,7 @@ class Controller(object): # We break out of this loop if the time between # sending and receiving the heartbeat is more # than our poll period. - + if tic - self.ctime > self.period: log.info("heartbeat loop timedout: %s" % (tic - self.ctime)) log.info(repr(self.responses)) diff --git a/zipline/gens/composites.py b/zipline/gens/composites.py index 832909ad..66697fa7 100644 --- a/zipline/gens/composites.py +++ b/zipline/gens/composites.py @@ -11,34 +11,26 @@ from zipline.gens.transform import stateful_transform SortBundle = namedtuple("SortBundle", ['source', 'args', 'kwargs']) MergeBundle = namedtuple("MergeBundle", ['stream', 'tnfm', 'args', 'kwargs']) -def date_sorted_sources(sources, source_args, source_kwargs): +def date_sorted_sources(bundles): """ - Takes a list of generator functions, a list of tuples of positional arguments, - and a list of dictionaries of keyword arguments. Packages up all arguments - and passes them into a date_sort. + Takes an iterable of SortBundles, generating namestrings and initialized datasources + for each before piping them into a date_sort. """ - assert len(sources) == len(source_args) == len(source_kwargs) - # Package up sources and arguments. - - # Create a generator of SortBundle objects to be turned into - # namestrings and generator objects. - bundle_gen = starmap(SortBundle, zip(sources, source_args, source_kwargs)) - - # Load the results of the generator into a tuple so that the - # results can be used twice (once in namestring comprehension, - # once in the generator comprehension for intialized sources. - bundles = tuple(bundle_gen) + assert isinstance(bundles, (list, tuple)) + for bundle in bundles: + assert isinstance(bundle, SortBundle) # Calculate namestring hashes to pass to date_sort. names = [bundle.source.__name__ + hash_args(*bundle.args, **bundle.kwargs) for bundle in bundles] + # Pass each source its arguments. initialized = [bundle.source(*bundle.args, **bundle.kwargs) - for bundle in bundles] - + for bundle in bundles] + # Convert the list of generators into a flat stream by pulling # one element at a time from each. - stream_in = roundrobin(*initialized) + stream_in = roundrobin(initialized, names) # Guarantee the flat stream will be sorted by date, using source_id as # tie-breaker, which is fully deterministic (given deterministic string diff --git a/zipline/gens/tradegens.py b/zipline/gens/tradegens.py index c7ee74f8..a24cbe58 100644 --- a/zipline/gens/tradegens.py +++ b/zipline/gens/tradegens.py @@ -54,6 +54,15 @@ def SpecificEquityTrades(*args, **config): Yields all events in event_list that match the given sid_filter. If no event_list is specified, generates an internal stream of events to filter. Returns all events if filter is None. + + Configuration options: + + count: integer representing number of trades + sids : list of values representing simulated internal sids + start: start date + delta: timedelta between internal events + + """ # We shouldn't get any positional arguments. assert args == () diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py new file mode 100644 index 00000000..65523a41 --- /dev/null +++ b/zipline/gens/tradesimulation.py @@ -0,0 +1,120 @@ +from numbers import Integral + +from zipline.gens import stateful_transform +from zipline.finance.trading import TransactionSimulator +from zipline.finance.performance import PerformanceTracker + +def trade_simulation_client(stream_in, algo, environment, sim_style): + """ + Generator that takes the expected output of a merge, a user + algorithm, a trading environment, and a simulator style as + arguments. Pipes the merge stream through a TransactionSimulator + and a PerformanceTracker, which keep track of the current state of + our algorithm's simulated universe. Results are fed to the user's + algorithm, which directly inserts transactions into the + TransactionSimulator's order book. + + TransactionSimulator maintains a dictionary from sids to the + unfulfilled orders placed by the user's algorithm. As trade + events arrive, if the algorithm has open orders against the + trade's sid, the simulator will fill orders up to 25% of market + cap. Applied transactions are added to a txn field on the event + and forwarded to PerformanceTracker. The txn field is set to None + on non-trade events and events that do not match any open orders. + + PerformanceTracker receives the updated event messages from + TransactionSimulator, maintaining a set of daily and cumulative + performance metrics for the algorithm. The tracker removes the + txn field from each event it receives, replacing it with a + portfolio field to be fed into the user algo. At the end of each + trading day, the PerformanceTracker also generates a daily + performance report, which is appended to event's perf_report + field. + + Fully processed events are run through a batcher generator, which + batches together events with the same dt field into a single event + to be fed to the algo. The portfolio object is repeatedly + overwritten so that only the most recent snapshot of the universe + is sent to the algo. + """ + + #============ + # Algo Setup + #============ + + # Initialize txn_sim's dictionary of orders here so that we can + # reference it from within the user's algorithm. + sids = algo.get_sid_filter() + open_orders = {} + + for sid in sids: + open_orders[sids] = [] + + # Closure to pass into the user's algo to allow placing orders + # into the txn_sim's dict of open orders. + def order(self, sid, amount): + assert sid in sids, "Order on invalid sid: %i" % sid + order = zp.ndict({ + 'dt' : self.current_dt, + 'sid' : sid, + 'amount' : int(amount) + 'filled' : 0 + }) + + # Tell the user if they try to buy 0 shares of something. + if order.amount == 0: + log = "requested to trade zero shares of {sid}".format( + sid=event.sid + ) + log.debug(log) + return + + open_orders[sid].append(event) + + # Set the algo's order method. + algo.set_order(order) + + # Provide a logbook logging interface to user code. + algo.set_logger(Logger("Algolog")) + + # Call user-defined initialize method before we process any + # events. + algo.initialize() + + # Pipe the in stream into the transaction simulator. + # Creates a TRANSACTION field on the event containing transaction + # information if we filled any pending orders on the event's sid. + # TRANSACTION is None if we didn't fill any orders. + with_txns = stateful_transform(stream_in, + TransactionSimulator, + open_orders, + style = sim_style) + + + # Pipe the events with transactions to perf. This will remove the + # TRANSACTION field added by TransactionSimulator and replace it with + # a portfolio object to be passed to the user's algorithm. Also adds + # a PERF_MESSAGE field which is usually none, but contains an update + # message once per day. + with_portfolio_and_perf_msg = stateful_transform(stream_with_txns, + PerformanceTracker, + trading_environment, + sids) + + # Batch the event stream by dt to be processed by the user's algo. + # Will also set the PERF_MESSAGE field if the batch contains a perf + # message. + + batches = batcher(with_portfolio_and_perf_msg) + + for batch in batches: + algo.handle_data(batch.data) + if batch.perf_message: + yield perf_message + + + + + + + diff --git a/zipline/gens/transform.py b/zipline/gens/transform.py index a03a841a..64c817ca 100644 --- a/zipline/gens/transform.py +++ b/zipline/gens/transform.py @@ -43,7 +43,7 @@ def functional_transform(stream_in, func, *args, **kwargs): def stateful_transform(stream_in, tnfm_class, *args, **kwargs): """ Generic transform generator that takes each message from an in-stream - and sorts it to a state class. For each call to update, the state + and passes it to a state class. For each call to update, the state class must produce a message to be fed downstream. """ diff --git a/zipline/gens/utils.py b/zipline/gens/utils.py index e2f859cb..209c98b0 100644 --- a/zipline/gens/utils.py +++ b/zipline/gens/utils.py @@ -27,15 +27,24 @@ def alternate(g1, g2): if e2 != None: yield e2 -def roundrobin(*args): +def roundrobin(sources, namestrings): """ Takes N generators, pulling one element off each until all inputs are empty. """ - for elem_tuple in izip_longest(*args): - for value in elem_tuple: - if value != None: - yield value + assert len(sources) == len(namestrings) + mapping = OrderedDict(zip(namestrings, sources)) + + # While our generators have not been exhausted, pull elements + while mapping != []: + for namestring, source in mapping: + try: + message = source.next() + yield message + except StopIteration: + yield done_message(namestring) + del mapping(namestring) + def hash_args(*args, **kwargs):