mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-05 17:53:17 +08:00
Merge branch 'new_world_order' of github.com:quantopian/zipline into new_world_order
This commit is contained in:
@@ -133,6 +133,7 @@ import zipline.finance.risk as risk
|
||||
log = logbook.Logger('Performance')
|
||||
|
||||
class PerformanceTracker(object):
|
||||
UPDATER = True
|
||||
"""
|
||||
Tracks the performance of the zipline as it is running in
|
||||
the simulator, relays this out to the Deluge broker and then
|
||||
@@ -202,8 +203,10 @@ class PerformanceTracker(object):
|
||||
self.todays_performance.positions[sid] = Position(sid)
|
||||
|
||||
def update(self, event):
|
||||
event.perf_message = self.process_event()
|
||||
event.portfolio = self.get_portfolio
|
||||
import nose.tools; nose.tools.set_trace()
|
||||
event.perf_message = self.process_event(event)
|
||||
event.portfolio = self.get_portfolio()
|
||||
del event['TRANSACTION']
|
||||
return event
|
||||
|
||||
def get_portfolio(self):
|
||||
|
||||
@@ -9,7 +9,7 @@ from zipline.protocol import SIMULATION_STYLE
|
||||
log = logbook.Logger('Transaction Simulator')
|
||||
|
||||
class TransactionSimulator(object):
|
||||
FORWARDER = True
|
||||
UPDATER = True
|
||||
|
||||
def __init__(self, open_orders, style=SIMULATION_STYLE.PARTIAL_VOLUME):
|
||||
self.open_orders = open_orders
|
||||
@@ -28,9 +28,9 @@ class TransactionSimulator(object):
|
||||
self.apply_trade_to_open_orders = self.simulate_noop
|
||||
|
||||
def update(self, event):
|
||||
event.txn = None
|
||||
event.TRANSACTION = None
|
||||
if event.type == zp.DATASOURCE_TYPE.TRADE:
|
||||
event.txn = self.apply_trade_to_open_orders(event)
|
||||
event.TRANSACTION = self.apply_trade_to_open_orders(event)
|
||||
return event
|
||||
|
||||
def simulate_buy_all(self, event):
|
||||
|
||||
+11
-10
@@ -1,3 +1,4 @@
|
||||
import pytz
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from zipline.utils.factory import create_trading_environment
|
||||
@@ -17,8 +18,8 @@ if __name__ == "__main__":
|
||||
#Set up source a. One minute between events.
|
||||
args_a = tuple()
|
||||
kwargs_a = {
|
||||
'sids' : [1,2],
|
||||
'start' : datetime(2012,6,6,0),
|
||||
'sids' : [1],
|
||||
'start' : datetime(2012,1,3,15, tzinfo = pytz.utc),
|
||||
'delta' : timedelta(minutes = 1),
|
||||
'filter' : filter
|
||||
}
|
||||
@@ -27,9 +28,9 @@ if __name__ == "__main__":
|
||||
#Set up source b. Two minutes between events.
|
||||
args_b = tuple()
|
||||
kwargs_b = {
|
||||
'sids' : [2,3],
|
||||
'start' : datetime(2012,6,6,0),
|
||||
'delta' : timedelta(minutes = 2),
|
||||
'sids' : [2],
|
||||
'start' : datetime(2012,1,3,15, tzinfo = pytz.utc),
|
||||
'delta' : timedelta(minutes = 1),
|
||||
'filter' : filter
|
||||
}
|
||||
bundle_b = SourceBundle(SpecificEquityTrades, args_b, kwargs_b)
|
||||
@@ -37,9 +38,9 @@ if __name__ == "__main__":
|
||||
#Set up source c. Three minutes between events.
|
||||
args_c = tuple()
|
||||
kwargs_c = {
|
||||
'sids' : [3,4],
|
||||
'start' : datetime(2012,6,6,0),
|
||||
'delta' : timedelta(minutes = 3),
|
||||
'sids' : [3],
|
||||
'start' : datetime(2012,1,3,15, tzinfo = pytz.utc),
|
||||
'delta' : timedelta(minutes = 1),
|
||||
'filter' : filter
|
||||
}
|
||||
bundle_c = SourceBundle(SpecificEquityTrades, args_c, kwargs_c)
|
||||
@@ -58,9 +59,9 @@ if __name__ == "__main__":
|
||||
# print message
|
||||
|
||||
algo = TestAlgorithm(2, 100, 100)
|
||||
environment = create_trading_environment()
|
||||
environment = create_trading_environment(year = 2012)
|
||||
style = zp.SIMULATION_STYLE.PARTIAL_VOLUME
|
||||
|
||||
client_out = tsc(merge_out, algo, environment, style)
|
||||
|
||||
client_out.next()
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ def merge(stream_in, tnfm_ids):
|
||||
"""
|
||||
|
||||
assert isinstance(tnfm_ids, list)
|
||||
|
||||
|
||||
# Set up an internal queue for each expected source.
|
||||
tnfms = {}
|
||||
for id in tnfm_ids:
|
||||
|
||||
@@ -9,7 +9,7 @@ from datetime import datetime, timedelta
|
||||
from zipline.utils.factory import create_trade
|
||||
from zipline.gens.utils import hash_args, mock_done
|
||||
|
||||
def date_gen(start = datetime(2012, 6, 6, 0),
|
||||
def date_gen(start = datetime(2006, 6, 6, 12),
|
||||
delta = timedelta(minutes = 1),
|
||||
count = 100):
|
||||
"""
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import logbook
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from numbers import Integral
|
||||
|
||||
from zipline import ndict
|
||||
@@ -75,7 +76,7 @@ def trade_simulation_client(stream_in, algo, environment, sim_style):
|
||||
return
|
||||
|
||||
open_orders[sid].append(event)
|
||||
|
||||
|
||||
# Set the algo's order method.
|
||||
algo.set_order(order)
|
||||
|
||||
@@ -85,7 +86,7 @@ def trade_simulation_client(stream_in, algo, environment, sim_style):
|
||||
# Call user-defined initialize method before we process any
|
||||
# events.
|
||||
algo.initialize()
|
||||
|
||||
|
||||
# Pipe the in stream into the transaction simulator.
|
||||
# Creates a txn field on the event containing transaction
|
||||
# information if we filled any pending orders on the event's sid.
|
||||
@@ -111,16 +112,45 @@ def trade_simulation_client(stream_in, algo, environment, sim_style):
|
||||
)
|
||||
|
||||
# Batch the event stream by dt to be processed by the user's algo.
|
||||
# Will also set the PERF_MESSAGE field if the batch contains a perf
|
||||
# message.
|
||||
# Yields perf messages whenever it encounters them.
|
||||
perf_messages = algo_simulator(with_portfolio_and_perf_msg, algo)
|
||||
|
||||
def batcher(stream):
|
||||
for msg in stream:
|
||||
yield msg
|
||||
|
||||
|
||||
batches = batcher(with_portfolio_and_perf_msg)
|
||||
|
||||
for batch in batches:
|
||||
algo.handle_data(batch.data)
|
||||
if batch.perf_message:
|
||||
def algo_simulator(stream_in, sids, algo):
|
||||
|
||||
current_dt = None
|
||||
universe = ndict()
|
||||
|
||||
for sid in sids:
|
||||
universe[sid] = None
|
||||
universe.portfolio = None
|
||||
|
||||
for update in stream_in:
|
||||
#Yield perf messages to be relayed back to the browser.
|
||||
if update.perf_message:
|
||||
yield perf_message
|
||||
|
||||
if current_dt = None:
|
||||
current_dt = update.dt
|
||||
|
||||
# If this message is newer than the algorithm's simulated dt,
|
||||
# call handle data on a snapshot of the current algo universe,
|
||||
# then
|
||||
if message.dt >= current_dt + last_delta:
|
||||
start_tic = datetime.now()
|
||||
algo.handle_data(universe)
|
||||
stop_tic = datetime.now()
|
||||
last_delta = datetime
|
||||
|
||||
current_dt = message.dt + last_delta
|
||||
|
||||
batch.data[message.sid] = message
|
||||
batch.data.portfolio = message.portfolio
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -49,6 +49,7 @@ def stateful_transform(stream_in, tnfm_class, *args, **kwargs):
|
||||
are forwarded.
|
||||
"""
|
||||
forward_all_fields = tnfm_class.__dict__.get('FORWARDER', False)
|
||||
update_in_place = tnfm_class.__dict__.get('UPDATER', False)
|
||||
|
||||
assert isinstance(tnfm_class, (types.ObjectType, types.ClassType)), \
|
||||
"Stateful transform requires a class."
|
||||
@@ -72,29 +73,21 @@ def stateful_transform(stream_in, tnfm_class, *args, **kwargs):
|
||||
tnfm_value = state.update(deepcopy(message_copy))
|
||||
|
||||
# If we want to keep all original values, plus append tnfm_id
|
||||
# and tnfm_value.
|
||||
# and tnfm_value. Used for Passthrough.
|
||||
if forward_all_fields:
|
||||
out_message = message_copy
|
||||
out_message.tnfm_id = namestring
|
||||
out_message.tnfm_value = tnfm_value
|
||||
yield out_message
|
||||
|
||||
# Special logic for TransactionSimulator and
|
||||
# PerformanceTracker. This is ugly but I want to get to
|
||||
# testing faster. Should be refactored later to something
|
||||
# that doesn't make Scott cry.
|
||||
elif tnfm_class.__name__ == 'TransactionSimulator':
|
||||
out_message = message_copy
|
||||
out_message.txn = tnfm_value
|
||||
yield out_message
|
||||
# Our expectation is that the transform simply updated the
|
||||
# message it was passed. Useful for chaining together
|
||||
# multiple transforms, e.g. TransactionSimulator/PerformanceTracker.
|
||||
elif update_in_place:
|
||||
yield tnfm_value
|
||||
|
||||
elif tnfm_class.__name__ == 'PerformanceTracker':
|
||||
out_message = message_copy
|
||||
del out_message['txn']
|
||||
out_message.portfolio = tnfm_value
|
||||
yield out_message
|
||||
|
||||
# Otherwise send tnfm_id, tnfm_value, and the message date.
|
||||
# Otherwise send tnfm_id, tnfm_value, and the message
|
||||
# date. Useful for transforms being piped to a merge.
|
||||
else:
|
||||
out_message = ndict()
|
||||
out_message.tnfm_id = namestring
|
||||
|
||||
Reference in New Issue
Block a user