From 4deabcdfda16a11b98273bc1b473d5e422c19fc1 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Wed, 1 Aug 2012 10:42:55 -0400 Subject: [PATCH 1/2] new world order --- zipline/core/monitor.py | 2 +- zipline/gens/composites.py | 28 +++----- zipline/gens/tradegens.py | 9 +++ zipline/gens/tradesimulation.py | 120 ++++++++++++++++++++++++++++++++ zipline/gens/transform.py | 2 +- zipline/gens/utils.py | 19 +++-- 6 files changed, 155 insertions(+), 25 deletions(-) create mode 100644 zipline/gens/tradesimulation.py diff --git a/zipline/core/monitor.py b/zipline/core/monitor.py index 1f64cdd9..c305e8ff 100644 --- a/zipline/core/monitor.py +++ b/zipline/core/monitor.py @@ -296,7 +296,7 @@ class Controller(object): # We break out of this loop if the time between # sending and receiving the heartbeat is more # than our poll period. - + if tic - self.ctime > self.period: log.info("heartbeat loop timedout: %s" % (tic - self.ctime)) log.info(repr(self.responses)) diff --git a/zipline/gens/composites.py b/zipline/gens/composites.py index 832909ad..66697fa7 100644 --- a/zipline/gens/composites.py +++ b/zipline/gens/composites.py @@ -11,34 +11,26 @@ from zipline.gens.transform import stateful_transform SortBundle = namedtuple("SortBundle", ['source', 'args', 'kwargs']) MergeBundle = namedtuple("MergeBundle", ['stream', 'tnfm', 'args', 'kwargs']) -def date_sorted_sources(sources, source_args, source_kwargs): +def date_sorted_sources(bundles): """ - Takes a list of generator functions, a list of tuples of positional arguments, - and a list of dictionaries of keyword arguments. Packages up all arguments - and passes them into a date_sort. + Takes an iterable of SortBundles, generating namestrings and initialized datasources + for each before piping them into a date_sort. """ - assert len(sources) == len(source_args) == len(source_kwargs) - # Package up sources and arguments. - - # Create a generator of SortBundle objects to be turned into - # namestrings and generator objects. - bundle_gen = starmap(SortBundle, zip(sources, source_args, source_kwargs)) - - # Load the results of the generator into a tuple so that the - # results can be used twice (once in namestring comprehension, - # once in the generator comprehension for intialized sources. - bundles = tuple(bundle_gen) + assert isinstance(bundles, (list, tuple)) + for bundle in bundles: + assert isinstance(bundle, SortBundle) # Calculate namestring hashes to pass to date_sort. names = [bundle.source.__name__ + hash_args(*bundle.args, **bundle.kwargs) for bundle in bundles] + # Pass each source its arguments. initialized = [bundle.source(*bundle.args, **bundle.kwargs) - for bundle in bundles] - + for bundle in bundles] + # Convert the list of generators into a flat stream by pulling # one element at a time from each. - stream_in = roundrobin(*initialized) + stream_in = roundrobin(initialized, names) # Guarantee the flat stream will be sorted by date, using source_id as # tie-breaker, which is fully deterministic (given deterministic string diff --git a/zipline/gens/tradegens.py b/zipline/gens/tradegens.py index c7ee74f8..a24cbe58 100644 --- a/zipline/gens/tradegens.py +++ b/zipline/gens/tradegens.py @@ -54,6 +54,15 @@ def SpecificEquityTrades(*args, **config): Yields all events in event_list that match the given sid_filter. If no event_list is specified, generates an internal stream of events to filter. Returns all events if filter is None. + + Configuration options: + + count: integer representing number of trades + sids : list of values representing simulated internal sids + start: start date + delta: timedelta between internal events + + """ # We shouldn't get any positional arguments. assert args == () diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py new file mode 100644 index 00000000..65523a41 --- /dev/null +++ b/zipline/gens/tradesimulation.py @@ -0,0 +1,120 @@ +from numbers import Integral + +from zipline.gens import stateful_transform +from zipline.finance.trading import TransactionSimulator +from zipline.finance.performance import PerformanceTracker + +def trade_simulation_client(stream_in, algo, environment, sim_style): + """ + Generator that takes the expected output of a merge, a user + algorithm, a trading environment, and a simulator style as + arguments. Pipes the merge stream through a TransactionSimulator + and a PerformanceTracker, which keep track of the current state of + our algorithm's simulated universe. Results are fed to the user's + algorithm, which directly inserts transactions into the + TransactionSimulator's order book. + + TransactionSimulator maintains a dictionary from sids to the + unfulfilled orders placed by the user's algorithm. As trade + events arrive, if the algorithm has open orders against the + trade's sid, the simulator will fill orders up to 25% of market + cap. Applied transactions are added to a txn field on the event + and forwarded to PerformanceTracker. The txn field is set to None + on non-trade events and events that do not match any open orders. + + PerformanceTracker receives the updated event messages from + TransactionSimulator, maintaining a set of daily and cumulative + performance metrics for the algorithm. The tracker removes the + txn field from each event it receives, replacing it with a + portfolio field to be fed into the user algo. At the end of each + trading day, the PerformanceTracker also generates a daily + performance report, which is appended to event's perf_report + field. + + Fully processed events are run through a batcher generator, which + batches together events with the same dt field into a single event + to be fed to the algo. The portfolio object is repeatedly + overwritten so that only the most recent snapshot of the universe + is sent to the algo. + """ + + #============ + # Algo Setup + #============ + + # Initialize txn_sim's dictionary of orders here so that we can + # reference it from within the user's algorithm. + sids = algo.get_sid_filter() + open_orders = {} + + for sid in sids: + open_orders[sids] = [] + + # Closure to pass into the user's algo to allow placing orders + # into the txn_sim's dict of open orders. + def order(self, sid, amount): + assert sid in sids, "Order on invalid sid: %i" % sid + order = zp.ndict({ + 'dt' : self.current_dt, + 'sid' : sid, + 'amount' : int(amount) + 'filled' : 0 + }) + + # Tell the user if they try to buy 0 shares of something. + if order.amount == 0: + log = "requested to trade zero shares of {sid}".format( + sid=event.sid + ) + log.debug(log) + return + + open_orders[sid].append(event) + + # Set the algo's order method. + algo.set_order(order) + + # Provide a logbook logging interface to user code. + algo.set_logger(Logger("Algolog")) + + # Call user-defined initialize method before we process any + # events. + algo.initialize() + + # Pipe the in stream into the transaction simulator. + # Creates a TRANSACTION field on the event containing transaction + # information if we filled any pending orders on the event's sid. + # TRANSACTION is None if we didn't fill any orders. + with_txns = stateful_transform(stream_in, + TransactionSimulator, + open_orders, + style = sim_style) + + + # Pipe the events with transactions to perf. This will remove the + # TRANSACTION field added by TransactionSimulator and replace it with + # a portfolio object to be passed to the user's algorithm. Also adds + # a PERF_MESSAGE field which is usually none, but contains an update + # message once per day. + with_portfolio_and_perf_msg = stateful_transform(stream_with_txns, + PerformanceTracker, + trading_environment, + sids) + + # Batch the event stream by dt to be processed by the user's algo. + # Will also set the PERF_MESSAGE field if the batch contains a perf + # message. + + batches = batcher(with_portfolio_and_perf_msg) + + for batch in batches: + algo.handle_data(batch.data) + if batch.perf_message: + yield perf_message + + + + + + + diff --git a/zipline/gens/transform.py b/zipline/gens/transform.py index a03a841a..64c817ca 100644 --- a/zipline/gens/transform.py +++ b/zipline/gens/transform.py @@ -43,7 +43,7 @@ def functional_transform(stream_in, func, *args, **kwargs): def stateful_transform(stream_in, tnfm_class, *args, **kwargs): """ Generic transform generator that takes each message from an in-stream - and sorts it to a state class. For each call to update, the state + and passes it to a state class. For each call to update, the state class must produce a message to be fed downstream. """ diff --git a/zipline/gens/utils.py b/zipline/gens/utils.py index e2f859cb..209c98b0 100644 --- a/zipline/gens/utils.py +++ b/zipline/gens/utils.py @@ -27,15 +27,24 @@ def alternate(g1, g2): if e2 != None: yield e2 -def roundrobin(*args): +def roundrobin(sources, namestrings): """ Takes N generators, pulling one element off each until all inputs are empty. """ - for elem_tuple in izip_longest(*args): - for value in elem_tuple: - if value != None: - yield value + assert len(sources) == len(namestrings) + mapping = OrderedDict(zip(namestrings, sources)) + + # While our generators have not been exhausted, pull elements + while mapping != []: + for namestring, source in mapping: + try: + message = source.next() + yield message + except StopIteration: + yield done_message(namestring) + del mapping(namestring) + def hash_args(*args, **kwargs): From 6cb3516b6b2a194750232efac3725b9a4be016c9 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Wed, 1 Aug 2012 11:12:09 -0400 Subject: [PATCH 2/2] save for attempted merge --- zipline/finance/performance.py | 15 +++++++------ zipline/finance/trading.py | 35 ++++++++---------------------- zipline/gens/examples.py | 38 +++++++++++++++++++++++++++++++++ zipline/gens/tradesimulation.py | 20 ++++++++++------- zipline/gens/zmq_gens.py | 8 ++++--- 5 files changed, 73 insertions(+), 43 deletions(-) create mode 100644 zipline/gens/examples.py diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index fdca878d..a96f5d98 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -144,7 +144,7 @@ class PerformanceTracker(object): """ - def __init__(self, trading_environment): + def __init__(self, trading_environment, sid_list): self.trading_environment = trading_environment self.trading_day = datetime.timedelta(hours = 6, minutes = 30) @@ -164,7 +164,6 @@ class PerformanceTracker(object): self.txn_count = 0 self.event_count = 0 self.last_dict = None - self.order_log = [] self.exceeded_max_loss = False self.results_socket = None @@ -198,9 +197,14 @@ class PerformanceTracker(object): keep_transactions = True ) - def set_sids(self, sid_list): for sid in sid_list: self.cumulative_performance.positions[sid] = Position(sid) + self.daily_performance.positions[sid] = Position(sid) + + def update(self, event): + event.perf_message = self.process_event() + event.portfolio = self.get_portfolio + return event def get_portfolio(self): return self.cumulative_performance.as_portfolio() @@ -238,8 +242,6 @@ class PerformanceTracker(object): 'cumulative_risk_metrics' : self.cumulative_risk_metrics.to_dict() } - def log_order(self, order): - self.order_log.append(order) def process_event(self, event): @@ -288,6 +290,8 @@ class PerformanceTracker(object): # calculate progress of test self.progress = self.day_count / self.total_days + # TODO!!!! + # Output results if self.results_socket: msg = zp.PERF_FRAME(self.to_dict()) @@ -584,7 +588,6 @@ class PerformancePeriod(object): return positions - # def get_positions_list(self): positions = [] for sid, pos in self.positions.iteritems(): diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index bf3a5374..9cae6e72 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -10,9 +10,8 @@ log = logbook.Logger('Transaction Simulator') class TransactionSimulator(object): - def __init__(self, style=SIMULATION_STYLE.PARTIAL_VOLUME): - self.open_orders = {} - self.order_count = 0 + def __init__(self, open_orders, style=SIMULATION_STYLE.PARTIAL_VOLUME): + self.open_orders = open_orders self.txn_count = 0 self.trade_window = datetime.timedelta(seconds=30) self.orderTTL = datetime.timedelta(days=1) @@ -27,28 +26,12 @@ class TransactionSimulator(object): elif style == SIMULATION_STYLE.NOOP: self.apply_trade_to_open_orders = self.simulate_noop - def add_open_order(self, event): - # Orders are captured in a buffer by sid. No calculations are done here. - # Amount is explicitly converted to an int. - # Orders of amount zero are ignored. - - self.order_count += 1 - event.amount = int(event.amount) - - if event.amount == 0: - log = "requested to trade zero shares of {sid}".format( - sid=event.sid - ) - log.debug(log) - return - - if not self.open_orders.has_key(event.sid): - self.open_orders[event.sid] = [] - - # set the filled property to zero - event.filled = 0 - self.open_orders[event.sid].append(event) - + def update(self, event): + event.txn = None + if event.type == zp.DATASOURCE_TYPE.TRADE: + event.txn = self.apply_trade_to_open_orders(event) + return event + def simulate_buy_all(self, event): txn = self.create_transaction( event.sid, @@ -81,7 +64,7 @@ class TransactionSimulator(object): txn = self.create_transaction( event.sid, amount, - event.price + 0.10, + event.price + 0.10, # Magic constant? event.dt, direction ) diff --git a/zipline/gens/examples.py b/zipline/gens/examples.py new file mode 100644 index 00000000..d9051b10 --- /dev/null +++ b/zipline/gens/examples.py @@ -0,0 +1,38 @@ +from zipline.gens.composites import + +if __name__ == "__main__": + + filter = [1,2,3,4] + #Set up source a. One hour between events. + args_a = tuple() + kwargs_a = {'sids' : [1,2,3,4], + 'start' : datetime(2012,6,6,0), + 'delta' : timedelta(minutes = ), + 'filter' : filter + } + #Set up source b. One day between events. + args_b = tuple() + kwargs_b = {'sids' : [1,2,3,4], + 'start' : datetime(2012,6,6,0), + 'delta' : timedelta(days = 1), + 'filter' : filter + } + #Set up source c. One minute between events. + args_c = tuple() + kwargs_c = {'sids' : [1,2,3,4], + 'start' : datetime(2012,6,6,0), + 'delta' : timedelta(minutes = 1), + 'filter' : filter + } + + sources = (SpecificEquityTrades,) * 4 + source_args = (args_a, args_b, args_c, args_d) + source_kwargs = (kwargs_a, kwargs_b, kwargs_c, kwargs_d) + + # Generate our expected source_ids. + zip_args = zip(source_args, source_kwargs) + expected_ids = ["SpecificEquityTrades" + hash_args(*args, **kwargs) + for args, kwargs in zip_args] + + # Pipe our sources into sort. + sort_out = date_sorted_sources(sources, source_args, source_kwargs) diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index 65523a41..7f0a30eb 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -85,10 +85,12 @@ def trade_simulation_client(stream_in, algo, environment, sim_style): # Creates a TRANSACTION field on the event containing transaction # information if we filled any pending orders on the event's sid. # TRANSACTION is None if we didn't fill any orders. - with_txns = stateful_transform(stream_in, - TransactionSimulator, - open_orders, - style = sim_style) + with_txns = stateful_transform( + stream_in, + TransactionSimulator, + open_orders, + style = sim_style + ) # Pipe the events with transactions to perf. This will remove the @@ -96,10 +98,12 @@ def trade_simulation_client(stream_in, algo, environment, sim_style): # a portfolio object to be passed to the user's algorithm. Also adds # a PERF_MESSAGE field which is usually none, but contains an update # message once per day. - with_portfolio_and_perf_msg = stateful_transform(stream_with_txns, - PerformanceTracker, - trading_environment, - sids) + with_portfolio_and_perf_msg = stateful_transform( + stream_with_txns, + PerformanceTracker, + trading_environment, + sids + ) # Batch the event stream by dt to be processed by the user's algo. # Will also set the PERF_MESSAGE field if the batch contains a perf diff --git a/zipline/gens/zmq_gens.py b/zipline/gens/zmq_gens.py index 524852a7..e60dae2b 100644 --- a/zipline/gens/zmq_gens.py +++ b/zipline/gens/zmq_gens.py @@ -2,15 +2,17 @@ import zmq import zipline.protocol as zp -def gen_from_zmq(poller, unframe): +def gen_from_zmq(poller, unframe, namestring): """ A generator that takes an initialized zmq poller and yields messages from the poller until it gets a zp.CONTROL_PROTOCOL.DONE. """ while True: message = poller.recv() - if message = zp.CONTROL_PROTOCOL.DONE: - yield "DONE" + # Done protocol should now be a message type so that + # done messages can also have source_ids. + if message.type == zp.CONTROL_PROTOCOL.DONE: + yield done_message(message.source_id) break else: yield unframe(message)