new world order

This commit is contained in:
scottsanderson
2012-08-01 10:42:55 -04:00
parent 5c62cce627
commit 4deabcdfda
6 changed files with 155 additions and 25 deletions
+1 -1
View File
@@ -296,7 +296,7 @@ class Controller(object):
# We break out of this loop if the time between
# sending and receiving the heartbeat is more
# than our poll period.
if tic - self.ctime > self.period:
log.info("heartbeat loop timedout: %s" % (tic - self.ctime))
log.info(repr(self.responses))
+10 -18
View File
@@ -11,34 +11,26 @@ from zipline.gens.transform import stateful_transform
SortBundle = namedtuple("SortBundle", ['source', 'args', 'kwargs'])
MergeBundle = namedtuple("MergeBundle", ['stream', 'tnfm', 'args', 'kwargs'])
def date_sorted_sources(sources, source_args, source_kwargs):
def date_sorted_sources(bundles):
"""
Takes a list of generator functions, a list of tuples of positional arguments,
and a list of dictionaries of keyword arguments. Packages up all arguments
and passes them into a date_sort.
Takes an iterable of SortBundles, generating namestrings and initialized datasources
for each before piping them into a date_sort.
"""
assert len(sources) == len(source_args) == len(source_kwargs)
# Package up sources and arguments.
# Create a generator of SortBundle objects to be turned into
# namestrings and generator objects.
bundle_gen = starmap(SortBundle, zip(sources, source_args, source_kwargs))
# Load the results of the generator into a tuple so that the
# results can be used twice (once in namestring comprehension,
# once in the generator comprehension for intialized sources.
bundles = tuple(bundle_gen)
assert isinstance(bundles, (list, tuple))
for bundle in bundles:
assert isinstance(bundle, SortBundle)
# Calculate namestring hashes to pass to date_sort.
names = [bundle.source.__name__ + hash_args(*bundle.args, **bundle.kwargs)
for bundle in bundles]
# Pass each source its arguments.
initialized = [bundle.source(*bundle.args, **bundle.kwargs)
for bundle in bundles]
for bundle in bundles]
# Convert the list of generators into a flat stream by pulling
# one element at a time from each.
stream_in = roundrobin(*initialized)
stream_in = roundrobin(initialized, names)
# Guarantee the flat stream will be sorted by date, using source_id as
# tie-breaker, which is fully deterministic (given deterministic string
+9
View File
@@ -54,6 +54,15 @@ def SpecificEquityTrades(*args, **config):
Yields all events in event_list that match the given sid_filter.
If no event_list is specified, generates an internal stream of events
to filter. Returns all events if filter is None.
Configuration options:
count: integer representing number of trades
sids : list of values representing simulated internal sids
start: start date
delta: timedelta between internal events
"""
# We shouldn't get any positional arguments.
assert args == ()
+120
View File
@@ -0,0 +1,120 @@
from numbers import Integral
from zipline.gens import stateful_transform
from zipline.finance.trading import TransactionSimulator
from zipline.finance.performance import PerformanceTracker
def trade_simulation_client(stream_in, algo, environment, sim_style):
"""
Generator that takes the expected output of a merge, a user
algorithm, a trading environment, and a simulator style as
arguments. Pipes the merge stream through a TransactionSimulator
and a PerformanceTracker, which keep track of the current state of
our algorithm's simulated universe. Results are fed to the user's
algorithm, which directly inserts transactions into the
TransactionSimulator's order book.
TransactionSimulator maintains a dictionary from sids to the
unfulfilled orders placed by the user's algorithm. As trade
events arrive, if the algorithm has open orders against the
trade's sid, the simulator will fill orders up to 25% of market
cap. Applied transactions are added to a txn field on the event
and forwarded to PerformanceTracker. The txn field is set to None
on non-trade events and events that do not match any open orders.
PerformanceTracker receives the updated event messages from
TransactionSimulator, maintaining a set of daily and cumulative
performance metrics for the algorithm. The tracker removes the
txn field from each event it receives, replacing it with a
portfolio field to be fed into the user algo. At the end of each
trading day, the PerformanceTracker also generates a daily
performance report, which is appended to event's perf_report
field.
Fully processed events are run through a batcher generator, which
batches together events with the same dt field into a single event
to be fed to the algo. The portfolio object is repeatedly
overwritten so that only the most recent snapshot of the universe
is sent to the algo.
"""
#============
# Algo Setup
#============
# Initialize txn_sim's dictionary of orders here so that we can
# reference it from within the user's algorithm.
sids = algo.get_sid_filter()
open_orders = {}
for sid in sids:
open_orders[sids] = []
# Closure to pass into the user's algo to allow placing orders
# into the txn_sim's dict of open orders.
def order(self, sid, amount):
assert sid in sids, "Order on invalid sid: %i" % sid
order = zp.ndict({
'dt' : self.current_dt,
'sid' : sid,
'amount' : int(amount)
'filled' : 0
})
# Tell the user if they try to buy 0 shares of something.
if order.amount == 0:
log = "requested to trade zero shares of {sid}".format(
sid=event.sid
)
log.debug(log)
return
open_orders[sid].append(event)
# Set the algo's order method.
algo.set_order(order)
# Provide a logbook logging interface to user code.
algo.set_logger(Logger("Algolog"))
# Call user-defined initialize method before we process any
# events.
algo.initialize()
# Pipe the in stream into the transaction simulator.
# Creates a TRANSACTION field on the event containing transaction
# information if we filled any pending orders on the event's sid.
# TRANSACTION is None if we didn't fill any orders.
with_txns = stateful_transform(stream_in,
TransactionSimulator,
open_orders,
style = sim_style)
# Pipe the events with transactions to perf. This will remove the
# TRANSACTION field added by TransactionSimulator and replace it with
# a portfolio object to be passed to the user's algorithm. Also adds
# a PERF_MESSAGE field which is usually none, but contains an update
# message once per day.
with_portfolio_and_perf_msg = stateful_transform(stream_with_txns,
PerformanceTracker,
trading_environment,
sids)
# Batch the event stream by dt to be processed by the user's algo.
# Will also set the PERF_MESSAGE field if the batch contains a perf
# message.
batches = batcher(with_portfolio_and_perf_msg)
for batch in batches:
algo.handle_data(batch.data)
if batch.perf_message:
yield perf_message
+1 -1
View File
@@ -43,7 +43,7 @@ def functional_transform(stream_in, func, *args, **kwargs):
def stateful_transform(stream_in, tnfm_class, *args, **kwargs):
"""
Generic transform generator that takes each message from an in-stream
and sorts it to a state class. For each call to update, the state
and passes it to a state class. For each call to update, the state
class must produce a message to be fed downstream.
"""
+14 -5
View File
@@ -27,15 +27,24 @@ def alternate(g1, g2):
if e2 != None:
yield e2
def roundrobin(*args):
def roundrobin(sources, namestrings):
"""
Takes N generators, pulling one element off each until all inputs
are empty.
"""
for elem_tuple in izip_longest(*args):
for value in elem_tuple:
if value != None:
yield value
assert len(sources) == len(namestrings)
mapping = OrderedDict(zip(namestrings, sources))
# While our generators have not been exhausted, pull elements
while mapping != []:
for namestring, source in mapping:
try:
message = source.next()
yield message
except StopIteration:
yield done_message(namestring)
del mapping(namestring)
def hash_args(*args, **kwargs):