mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-02 21:54:18 +08:00
Merge branch 'new_world_order' of github.com:quantopian/zipline into new_world_order
Conflicts: zipline/gens/examples.py
This commit is contained in:
@@ -197,13 +197,12 @@ class PerformanceTracker(object):
|
||||
# save the transactions for the daily periods
|
||||
keep_transactions = True
|
||||
)
|
||||
|
||||
|
||||
for sid in sid_list:
|
||||
self.cumulative_performance.positions[sid] = Position(sid)
|
||||
self.todays_performance.positions[sid] = Position(sid)
|
||||
|
||||
def update(self, event):
|
||||
import nose.tools; nose.tools.set_trace()
|
||||
event.perf_message = self.process_event(event)
|
||||
event.portfolio = self.get_portfolio()
|
||||
del event['TRANSACTION']
|
||||
@@ -247,6 +246,8 @@ class PerformanceTracker(object):
|
||||
|
||||
|
||||
def process_event(self, event):
|
||||
|
||||
message = None
|
||||
|
||||
if self.exceeded_max_loss:
|
||||
return
|
||||
@@ -255,7 +256,7 @@ class PerformanceTracker(object):
|
||||
self.event_count += 1
|
||||
|
||||
if(event.dt >= self.market_close):
|
||||
self.handle_market_close()
|
||||
message = self.handle_market_close()
|
||||
|
||||
if event.TRANSACTION:
|
||||
self.txn_count += 1
|
||||
@@ -270,8 +271,10 @@ class PerformanceTracker(object):
|
||||
self.cumulative_performance.calculate_performance()
|
||||
self.todays_performance.calculate_performance()
|
||||
|
||||
def handle_market_close(self):
|
||||
return message
|
||||
|
||||
def handle_market_close(self):
|
||||
|
||||
# add the return results from today to the list of DailyReturn objects.
|
||||
todays_date = self.market_close.replace(hour=0, minute=0, second=0)
|
||||
todays_return_obj = risk.DailyReturn(
|
||||
@@ -293,14 +296,9 @@ class PerformanceTracker(object):
|
||||
# calculate progress of test
|
||||
self.progress = self.day_count / self.total_days
|
||||
|
||||
# TODO!!!!
|
||||
#TODO TODO TODO!!
|
||||
daily_update = self.to_dict()
|
||||
|
||||
# Output results
|
||||
if self.results_socket:
|
||||
msg = zp.PERF_FRAME(self.to_dict())
|
||||
self.results_socket.send(msg)
|
||||
|
||||
#
|
||||
if self.trading_environment.max_drawdown:
|
||||
returns = self.todays_performance.returns
|
||||
max_dd = -1 * self.trading_environment.max_drawdown
|
||||
@@ -311,7 +309,7 @@ class PerformanceTracker(object):
|
||||
# so it shows up in the update, but don't end the test
|
||||
# here. Let the update go out before stopping
|
||||
self.exceeded_max_loss = True
|
||||
return
|
||||
return daily_update
|
||||
|
||||
|
||||
#move the market day markers forward
|
||||
@@ -333,6 +331,8 @@ class PerformanceTracker(object):
|
||||
self.market_close,
|
||||
keep_transactions = True
|
||||
)
|
||||
|
||||
return daily_update
|
||||
|
||||
def handle_simulation_end(self):
|
||||
"""
|
||||
@@ -369,8 +369,8 @@ class Position(object):
|
||||
self.sid = sid
|
||||
self.amount = 0
|
||||
self.cost_basis = 0.0 ##per share
|
||||
self.last_sale_price = None
|
||||
self.last_sale_date = None
|
||||
self.last_sale_price = 0.0
|
||||
self.last_sale_date = 0.0
|
||||
|
||||
def update(self, txn):
|
||||
if(self.sid != txn.sid):
|
||||
|
||||
+73
-59
@@ -49,7 +49,7 @@ def fuzzy_dates(count = 500):
|
||||
for date in date_gen(count = count):
|
||||
yield date + timedelta(seconds = random.randint(-10, 10))
|
||||
|
||||
def SpecificEquityTrades(*args, **config):
|
||||
class SpecificEquityTrades(object):
|
||||
"""
|
||||
Yields all events in event_list that match the given sid_filter.
|
||||
If no event_list is specified, generates an internal stream of events
|
||||
@@ -57,71 +57,85 @@ def SpecificEquityTrades(*args, **config):
|
||||
|
||||
Configuration options:
|
||||
|
||||
count: integer representing number of trades
|
||||
sids : list of values representing simulated internal sids
|
||||
start: start date
|
||||
delta: timedelta between internal events
|
||||
|
||||
|
||||
count : integer representing number of trades
|
||||
sids : list of values representing simulated internal sids
|
||||
start : start date
|
||||
delta : timedelta between internal events
|
||||
filter : filter to remove the sids
|
||||
"""
|
||||
# We shouldn't get any positional arguments.
|
||||
assert args == ()
|
||||
|
||||
# Unpack config dictionary with default values.
|
||||
count = config.get('count', 500)
|
||||
sids = config.get('sids', [1, 2])
|
||||
start = config.get('start', datetime(2012, 6, 6, 0))
|
||||
delta = config.get('delta', timedelta(minutes = 1))
|
||||
def __init__(self, *args, **kwargs):
|
||||
# We shouldn't get any positional arguments.
|
||||
assert len(args) == 0
|
||||
|
||||
# Unpack config dictionary with default values.
|
||||
self.count = kwargs.get('count', 500)
|
||||
self.sids = kwargs.get('sids', [1, 2])
|
||||
self.start = kwargs.get('start', datetime(2012, 6, 6, 0))
|
||||
self.delta = kwargs.get('delta', timedelta(minutes = 1))
|
||||
|
||||
# Default to None for event_list and filter.
|
||||
self.event_list = kwargs.get('event_list')
|
||||
self.filter = kwargs.get('filter')
|
||||
|
||||
# Hash_value for downstream sorting.
|
||||
self.arg_string = hash_args(*args, **kwargs)
|
||||
|
||||
# Default to None for event_list and filter.
|
||||
event_list = config.get('event_list')
|
||||
filter = config.get('filter')
|
||||
def get_hash(self):
|
||||
return self.__class__.__name__ + "-" + self.arg_string
|
||||
|
||||
def __iter__(self):
|
||||
|
||||
if self.event_list:
|
||||
unfiltered = (event for event in event_list)
|
||||
|
||||
arg_string = hash_args(*args, **config)
|
||||
namestring = "SpecificEquityTrades" + arg_string
|
||||
# If we have an event_list, ignore the other arguments and use the list.
|
||||
# TODO: still append our namestring?
|
||||
if event_list:
|
||||
unfiltered = (event for event in event_list)
|
||||
# Set up iterators for each expected field.
|
||||
else:
|
||||
dates = date_gen(count=self.count,
|
||||
start=self.start,
|
||||
delta=self.delta
|
||||
)
|
||||
prices = mock_prices(self.count)
|
||||
volumes = mock_volumes(self.count)
|
||||
sids = cycle(self.sids)
|
||||
|
||||
# Combine the iterators into a single iterator of arguments
|
||||
arg_gen = izip(sids, prices, volumes, dates)
|
||||
|
||||
# Set up iterators for each expected field.
|
||||
else:
|
||||
dates = date_gen(count = count, start = start, delta = delta)
|
||||
prices = mock_prices(count)
|
||||
volumes = mock_volumes(count)
|
||||
# Convert argument packages into events.
|
||||
unfiltered = (create_trade(*args, source_id = self.get_hash())
|
||||
for args in arg_gen)
|
||||
|
||||
# If we specified a sid filter, filter out elements that don't
|
||||
# match the filter.
|
||||
if self.filter:
|
||||
filtered = ifilter(lambda event: event.sid in self.filter, unfiltered)
|
||||
|
||||
# Otherwise just use all events.
|
||||
else:
|
||||
filtered = unfiltered
|
||||
|
||||
# Return the filtered event stream.
|
||||
return filtered
|
||||
|
||||
|
||||
# !!!!!!! Deprecated for now !!!!!!!!!
|
||||
|
||||
def RandomEquityTrades(object):
|
||||
|
||||
def __init__(self):
|
||||
# We shouldn't get any positional args.
|
||||
assert args == ()
|
||||
|
||||
self.count = config.get('count', 500)
|
||||
self.sids = config.get('sids', [1,2])
|
||||
self.filter = config.get('filter')
|
||||
|
||||
dates = fuzzy_dates(count)
|
||||
prices = mock_prices(count, rand = True)
|
||||
volumes = mock_volumes(count, rand = True)
|
||||
sids = cycle(sids)
|
||||
|
||||
# Combine the iterators into a single iterator of arguments
|
||||
arg_gen = izip(sids, prices, volumes, dates)
|
||||
|
||||
# Convert argument packages into events.
|
||||
unfiltered = (create_trade(*args, source_id = namestring)
|
||||
for args in arg_gen)
|
||||
|
||||
# If we specified a sid filter, filter out elements that don't match the filter.
|
||||
if filter:
|
||||
filtered = ifilter(lambda event: event.sid in filter, unfiltered)
|
||||
|
||||
# Otherwise just use all events.
|
||||
else:
|
||||
filtered = unfiltered
|
||||
|
||||
# Return the filtered event stream.
|
||||
return filtered
|
||||
|
||||
def RandomEquityTrades(*args, **config):
|
||||
# We shouldn't get any positional args.
|
||||
assert args == ()
|
||||
|
||||
count = config.get('count', 500)
|
||||
sids = config.get('sids', [1,2])
|
||||
filter = config.get('filter')
|
||||
|
||||
dates = fuzzy_dates(count)
|
||||
prices = mock_prices(count, rand = True)
|
||||
volumes = mock_volumes(count, rand = True)
|
||||
sids = cycle(sids)
|
||||
|
||||
arg_gen = izip(sids, prices, volumes, dates)
|
||||
|
||||
unfiltered = (create_trade(*args) for args in arg_gen)
|
||||
|
||||
@@ -55,37 +55,6 @@ def trade_simulation_client(stream_in, algo, environment, sim_style):
|
||||
|
||||
for sid in sids:
|
||||
open_orders[sid] = []
|
||||
|
||||
# Closure to pass into the user's algo to allow placing orders
|
||||
# into the txn_sim's dict of open orders.
|
||||
def order(self, sid, amount):
|
||||
assert sid in sids, "Order on invalid sid: %i" % sid
|
||||
order = ndict({
|
||||
'dt' : self.current_dt,
|
||||
'sid' : sid,
|
||||
'amount' : int(amount),
|
||||
'filled' : 0
|
||||
})
|
||||
|
||||
# Tell the user if they try to buy 0 shares of something.
|
||||
if order.amount == 0:
|
||||
log = "requested to trade zero shares of {sid}".format(
|
||||
sid=event.sid
|
||||
)
|
||||
log.debug(log)
|
||||
return
|
||||
|
||||
open_orders[sid].append(event)
|
||||
|
||||
# Set the algo's order method.
|
||||
algo.set_order(order)
|
||||
|
||||
# Provide a logbook logging interface to user code.
|
||||
algo.set_logger(logbook.Logger("Algolog"))
|
||||
|
||||
# Call user-defined initialize method before we process any
|
||||
# events.
|
||||
algo.initialize()
|
||||
|
||||
# Pipe the in stream into the transaction simulator.
|
||||
# Creates a txn field on the event containing transaction
|
||||
@@ -113,44 +82,115 @@ def trade_simulation_client(stream_in, algo, environment, sim_style):
|
||||
|
||||
# Batch the event stream by dt to be processed by the user's algo.
|
||||
# Yields perf messages whenever it encounters them.
|
||||
perf_messages = algo_simulator(with_portfolio_and_perf_msg, algo)
|
||||
perf_messages = algo_simulator(with_portfolio_and_perf_msg, sids, algo, open_orders)
|
||||
|
||||
for message in perf_messages:
|
||||
yield message
|
||||
|
||||
|
||||
def algo_simulator(stream_in, sids, algo, order_book):
|
||||
|
||||
simulation_dt = None
|
||||
|
||||
# Closure to pass into the user's algo to allow placing orders
|
||||
# into the txn_sim's dict of open orders.
|
||||
def order(sid, amount):
|
||||
assert sid in sids, "Order on invalid sid: %i" % sid
|
||||
order = ndict({
|
||||
'dt' : simulation_dt,
|
||||
'sid' : sid,
|
||||
'amount' : int(amount),
|
||||
'filled' : 0
|
||||
})
|
||||
|
||||
def algo_simulator(stream_in, sids, algo):
|
||||
# Tell the user if they try to buy 0 shares of something.
|
||||
if order.amount == 0:
|
||||
log = "requested to trade zero shares of {sid}".format(
|
||||
sid=event.sid
|
||||
)
|
||||
log.debug(log)
|
||||
return
|
||||
|
||||
order_book[sid].append(order)
|
||||
|
||||
# Set the algo's order method.
|
||||
algo.set_order(order)
|
||||
|
||||
# Provide a logbook logging interface to user code.
|
||||
algo.set_logger(logbook.Logger("Algolog"))
|
||||
|
||||
# Call user-defined initialize method before we process any
|
||||
# events.
|
||||
algo.initialize()
|
||||
|
||||
current_dt = None
|
||||
universe = ndict()
|
||||
|
||||
for sid in sids:
|
||||
universe[sid] = None
|
||||
universe[sid] = ndict()
|
||||
universe.portfolio = None
|
||||
this_snapshot_dt = None
|
||||
|
||||
for update in stream_in:
|
||||
#Yield perf messages to be relayed back to the browser.
|
||||
if update.perf_message:
|
||||
yield perf_message
|
||||
|
||||
if current_dt = None:
|
||||
current_dt = update.dt
|
||||
|
||||
# If this message is newer than the algorithm's simulated dt,
|
||||
# call handle data on a snapshot of the current algo universe,
|
||||
# then
|
||||
if message.dt >= current_dt + last_delta:
|
||||
start_tic = datetime.now()
|
||||
algo.handle_data(universe)
|
||||
stop_tic = datetime.now()
|
||||
last_delta = datetime
|
||||
|
||||
current_dt = message.dt + last_delta
|
||||
|
||||
batch.data[message.sid] = message
|
||||
batch.data.portfolio = message.portfolio
|
||||
|
||||
for event in stream_in:
|
||||
# Yield any perf messages received to be relayed back to the browser.
|
||||
if event.perf_message:
|
||||
yield event.perf_message
|
||||
del event['perf_message']
|
||||
|
||||
|
||||
# This should only happen for the first event we run.
|
||||
if simulation_dt == None:
|
||||
simulation_dt = event.dt
|
||||
|
||||
# If we are currently creating a new message and this update
|
||||
# matches the message dt, update the state of the universe.
|
||||
|
||||
if this_snapshot_dt != None:
|
||||
|
||||
if event.dt == this_snapshot_dt:
|
||||
update_universe(event, universe)
|
||||
|
||||
# If we are constructing a snapshot and we hit a new dt, call
|
||||
# handle_data and record how long it takes.
|
||||
else:
|
||||
start_tic = datetime.now()
|
||||
algo.handle_data(universe)
|
||||
stop_tic = datetime.now()
|
||||
|
||||
# How long did you take?
|
||||
delta = stop_tic - start_tic
|
||||
|
||||
# Update the simulation time.
|
||||
simulation_dt = this_snapshot_dt + delta
|
||||
|
||||
# Update the universe with the new event.
|
||||
update_universe(event, universe)
|
||||
|
||||
# If the current event is later than the simulation
|
||||
# time, update the universe and start constructing
|
||||
# another snapshot.
|
||||
if event.dt >= simulation_dt:
|
||||
this_snapshot_dt = event.dt
|
||||
else:
|
||||
this_snapshot_dt = None
|
||||
# We have been fastforwarding. Update the universe
|
||||
# and check if we can start a new snapshot.
|
||||
else:
|
||||
update_universe(event, universe)
|
||||
if event.dt >= simulation_dt:
|
||||
this_snapshot_dt = event.dt
|
||||
|
||||
|
||||
|
||||
|
||||
def update_universe(event, universe):
|
||||
|
||||
universe.portfolio = event.portfolio
|
||||
del event['portfolio']
|
||||
|
||||
event_sid = event.sid
|
||||
del event['sid']
|
||||
|
||||
for field in event.keys():
|
||||
universe[event_sid][field] = event[field]
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user