commit for fawce

This commit is contained in:
scottsanderson
2012-08-01 21:42:55 -04:00
parent 9e1a5c11cb
commit 14067d8323
7 changed files with 71 additions and 44 deletions
+5 -2
View File
@@ -133,6 +133,7 @@ import zipline.finance.risk as risk
log = logbook.Logger('Performance')
class PerformanceTracker(object):
UPDATER = True
"""
Tracks the performance of the zipline as it is running in
the simulator, relays this out to the Deluge broker and then
@@ -202,8 +203,10 @@ class PerformanceTracker(object):
self.todays_performance.positions[sid] = Position(sid)
def update(self, event):
event.perf_message = self.process_event()
event.portfolio = self.get_portfolio
import nose.tools; nose.tools.set_trace()
event.perf_message = self.process_event(event)
event.portfolio = self.get_portfolio()
del event['TRANSACTION']
return event
def get_portfolio(self):
+3 -3
View File
@@ -9,7 +9,7 @@ from zipline.protocol import SIMULATION_STYLE
log = logbook.Logger('Transaction Simulator')
class TransactionSimulator(object):
FORWARDER = True
UPDATER = True
def __init__(self, open_orders, style=SIMULATION_STYLE.PARTIAL_VOLUME):
self.open_orders = open_orders
@@ -28,9 +28,9 @@ class TransactionSimulator(object):
self.apply_trade_to_open_orders = self.simulate_noop
def update(self, event):
event.txn = None
event.TRANSACTION = None
if event.type == zp.DATASOURCE_TYPE.TRADE:
event.txn = self.apply_trade_to_open_orders(event)
event.TRANSACTION = self.apply_trade_to_open_orders(event)
return event
def simulate_buy_all(self, event):
+11 -10
View File
@@ -1,3 +1,4 @@
import pytz
from datetime import datetime, timedelta
from zipline.utils.factory import create_trading_environment
@@ -17,8 +18,8 @@ if __name__ == "__main__":
#Set up source a. One minute between events.
args_a = tuple()
kwargs_a = {
'sids' : [1,2],
'start' : datetime(2012,6,6,0),
'sids' : [1],
'start' : datetime(2012,1,3,15, tzinfo = pytz.utc),
'delta' : timedelta(minutes = 1),
'filter' : filter
}
@@ -27,9 +28,9 @@ if __name__ == "__main__":
#Set up source b. Two minutes between events.
args_b = tuple()
kwargs_b = {
'sids' : [2,3],
'start' : datetime(2012,6,6,0),
'delta' : timedelta(minutes = 2),
'sids' : [2],
'start' : datetime(2012,1,3,15, tzinfo = pytz.utc),
'delta' : timedelta(minutes = 1),
'filter' : filter
}
bundle_b = SourceBundle(SpecificEquityTrades, args_b, kwargs_b)
@@ -37,9 +38,9 @@ if __name__ == "__main__":
#Set up source c. Three minutes between events.
args_c = tuple()
kwargs_c = {
'sids' : [3,4],
'start' : datetime(2012,6,6,0),
'delta' : timedelta(minutes = 3),
'sids' : [3],
'start' : datetime(2012,1,3,15, tzinfo = pytz.utc),
'delta' : timedelta(minutes = 1),
'filter' : filter
}
bundle_c = SourceBundle(SpecificEquityTrades, args_c, kwargs_c)
@@ -58,9 +59,9 @@ if __name__ == "__main__":
# print message
algo = TestAlgorithm(2, 100, 100)
environment = create_trading_environment()
environment = create_trading_environment(year = 2012)
style = zp.SIMULATION_STYLE.PARTIAL_VOLUME
client_out = tsc(merge_out, algo, environment, style)
client_out.next()
+1 -1
View File
@@ -19,7 +19,7 @@ def merge(stream_in, tnfm_ids):
"""
assert isinstance(tnfm_ids, list)
# Set up an internal queue for each expected source.
tnfms = {}
for id in tnfm_ids:
+1 -1
View File
@@ -9,7 +9,7 @@ from datetime import datetime, timedelta
from zipline.utils.factory import create_trade
from zipline.gens.utils import hash_args, mock_done
def date_gen(start = datetime(2012, 6, 6, 0),
def date_gen(start = datetime(2006, 6, 6, 12),
delta = timedelta(minutes = 1),
count = 100):
"""
+41 -11
View File
@@ -1,5 +1,6 @@
import logbook
from datetime import datetime, timedelta
from numbers import Integral
from zipline import ndict
@@ -75,7 +76,7 @@ def trade_simulation_client(stream_in, algo, environment, sim_style):
return
open_orders[sid].append(event)
# Set the algo's order method.
algo.set_order(order)
@@ -85,7 +86,7 @@ def trade_simulation_client(stream_in, algo, environment, sim_style):
# Call user-defined initialize method before we process any
# events.
algo.initialize()
# Pipe the in stream into the transaction simulator.
# Creates a txn field on the event containing transaction
# information if we filled any pending orders on the event's sid.
@@ -111,16 +112,45 @@ def trade_simulation_client(stream_in, algo, environment, sim_style):
)
# Batch the event stream by dt to be processed by the user's algo.
# Will also set the PERF_MESSAGE field if the batch contains a perf
# message.
# Yields perf messages whenever it encounters them.
perf_messages = algo_simulator(with_portfolio_and_perf_msg, algo)
def batcher(stream):
for msg in stream:
yield msg
batches = batcher(with_portfolio_and_perf_msg)
for batch in batches:
algo.handle_data(batch.data)
if batch.perf_message:
def algo_simulator(stream_in, sids, algo):
current_dt = None
universe = ndict()
for sid in sids:
universe[sid] = None
universe.portfolio = None
for update in stream_in:
#Yield perf messages to be relayed back to the browser.
if update.perf_message:
yield perf_message
if current_dt = None:
current_dt = update.dt
# If this message is newer than the algorithm's simulated dt,
# call handle data on a snapshot of the current algo universe,
# then
if message.dt >= current_dt + last_delta:
start_tic = datetime.now()
algo.handle_data(universe)
stop_tic = datetime.now()
last_delta = datetime
current_dt = message.dt + last_delta
batch.data[message.sid] = message
batch.data.portfolio = message.portfolio
+9 -16
View File
@@ -49,6 +49,7 @@ def stateful_transform(stream_in, tnfm_class, *args, **kwargs):
are forwarded.
"""
forward_all_fields = tnfm_class.__dict__.get('FORWARDER', False)
update_in_place = tnfm_class.__dict__.get('UPDATER', False)
assert isinstance(tnfm_class, (types.ObjectType, types.ClassType)), \
"Stateful transform requires a class."
@@ -72,29 +73,21 @@ def stateful_transform(stream_in, tnfm_class, *args, **kwargs):
tnfm_value = state.update(deepcopy(message_copy))
# If we want to keep all original values, plus append tnfm_id
# and tnfm_value.
# and tnfm_value. Used for Passthrough.
if forward_all_fields:
out_message = message_copy
out_message.tnfm_id = namestring
out_message.tnfm_value = tnfm_value
yield out_message
# Special logic for TransactionSimulator and
# PerformanceTracker. This is ugly but I want to get to
# testing faster. Should be refactored later to something
# that doesn't make Scott cry.
elif tnfm_class.__name__ == 'TransactionSimulator':
out_message = message_copy
out_message.txn = tnfm_value
yield out_message
# Our expectation is that the transform simply updated the
# message it was passed. Useful for chaining together
# multiple transforms, e.g. TransactionSimulator/PerformanceTracker.
elif update_in_place:
yield tnfm_value
elif tnfm_class.__name__ == 'PerformanceTracker':
out_message = message_copy
del out_message['txn']
out_message.portfolio = tnfm_value
yield out_message
# Otherwise send tnfm_id, tnfm_value, and the message date.
# Otherwise send tnfm_id, tnfm_value, and the message
# date. Useful for transforms being piped to a merge.
else:
out_message = ndict()
out_message.tnfm_id = namestring