mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-03 20:37:52 +08:00
more progress on tsc
This commit is contained in:
@@ -9,6 +9,7 @@ from zipline.protocol import SIMULATION_STYLE
|
||||
log = logbook.Logger('Transaction Simulator')
|
||||
|
||||
class TransactionSimulator(object):
|
||||
FORWARDER = True
|
||||
|
||||
def __init__(self, open_orders, style=SIMULATION_STYLE.PARTIAL_VOLUME):
|
||||
self.open_orders = open_orders
|
||||
|
||||
@@ -54,15 +54,13 @@ if __name__ == "__main__":
|
||||
|
||||
merge_out = merged_transforms(sort_out, tnfm_bundles)
|
||||
|
||||
# for message in merge_out:
|
||||
# print "Event: \n", message.event
|
||||
# print "Transforms: \n", message.tnfms
|
||||
# for message in merge_out:
|
||||
# print message
|
||||
|
||||
algo = TestAlgorithm(2, 100, 100)
|
||||
environment = create_trading_environment()
|
||||
style = zp.SIMULATION_STYLE.PARTIAL_VOLUME
|
||||
|
||||
|
||||
client_out = tsc(merge_out, algo, environment, style)
|
||||
|
||||
for message in client_out:
|
||||
print message
|
||||
|
||||
|
||||
@@ -54,21 +54,19 @@ def merge(stream_in, tnfm_ids):
|
||||
|
||||
def merge_one(sources):
|
||||
dict_primer = zip(sources.keys(), repeat(None))
|
||||
transforms = ndict(dict_primer)
|
||||
event_fields = ndict()
|
||||
|
||||
for key, queue in sources.iteritems():
|
||||
|
||||
# Add transform value to the transforms dict.
|
||||
message = queue.popleft()
|
||||
transforms[message.tnfm_id] = message.tnfm_value
|
||||
event_fields[message.tnfm_id] = message.tnfm_value
|
||||
del message['tnfm_id']
|
||||
del message['tnfm_value']
|
||||
|
||||
# Merge any remaining fields into the event dict.
|
||||
event_fields.merge(message)
|
||||
|
||||
return ndict({'event' : event_fields, 'tnfms' : transforms})
|
||||
return event_fields
|
||||
|
||||
|
||||
#TODO: This is replicated in sort. Probably should be one source file.
|
||||
|
||||
@@ -49,7 +49,6 @@ def trade_simulation_client(stream_in, algo, environment, sim_style):
|
||||
# Initialize txn_sim's dictionary of orders here so that we can
|
||||
# reference it from within the user's algorithm.
|
||||
|
||||
import nose.tools; nose.tools.set_trace()
|
||||
sids = algo.get_sid_filter()
|
||||
open_orders = {}
|
||||
|
||||
@@ -88,7 +87,7 @@ def trade_simulation_client(stream_in, algo, environment, sim_style):
|
||||
algo.initialize()
|
||||
|
||||
# Pipe the in stream into the transaction simulator.
|
||||
# Creates a TRANSACTION field on the event containing transaction
|
||||
# Creates a txn field on the event containing transaction
|
||||
# information if we filled any pending orders on the event's sid.
|
||||
# TRANSACTION is None if we didn't fill any orders.
|
||||
with_txns = stateful_transform(
|
||||
@@ -100,14 +99,14 @@ def trade_simulation_client(stream_in, algo, environment, sim_style):
|
||||
|
||||
|
||||
# Pipe the events with transactions to perf. This will remove the
|
||||
# TRANSACTION field added by TransactionSimulator and replace it with
|
||||
# txn field added by TransactionSimulator and replace it with
|
||||
# a portfolio object to be passed to the user's algorithm. Also adds
|
||||
# a PERF_MESSAGE field which is usually none, but contains an update
|
||||
# message once per day.
|
||||
with_portfolio_and_perf_msg = stateful_transform(
|
||||
stream_with_txns,
|
||||
with_txns,
|
||||
PerformanceTracker,
|
||||
trading_environment,
|
||||
environment,
|
||||
sids
|
||||
)
|
||||
|
||||
@@ -115,6 +114,10 @@ def trade_simulation_client(stream_in, algo, environment, sim_style):
|
||||
# Will also set the PERF_MESSAGE field if the batch contains a perf
|
||||
# message.
|
||||
|
||||
def batcher(stream):
|
||||
for msg in stream:
|
||||
yield msg
|
||||
|
||||
batches = batcher(with_portfolio_and_perf_msg)
|
||||
|
||||
for batch in batches:
|
||||
|
||||
@@ -54,7 +54,7 @@ def stateful_transform(stream_in, tnfm_class, *args, **kwargs):
|
||||
"Stateful transform requires a class."
|
||||
assert tnfm_class.__dict__.has_key('update'), \
|
||||
"Stateful transform requires the class to have an update method"
|
||||
|
||||
|
||||
# Create an instance of our transform class.
|
||||
state = tnfm_class(*args, **kwargs)
|
||||
|
||||
@@ -71,13 +71,18 @@ def stateful_transform(stream_in, tnfm_class, *args, **kwargs):
|
||||
# Same shared pointer issue here as above.
|
||||
tnfm_value = state.update(deepcopy(message_copy))
|
||||
|
||||
# If we want to keep all original values, just append tnfm_id
|
||||
# If we want to keep all original values, plus append tnfm_id
|
||||
# and tnfm_value.
|
||||
if forward_all_fields:
|
||||
out_message = message_copy
|
||||
out_message.tnfm_id = namestring
|
||||
out_message.tnfm_value = tnfm_value
|
||||
yield out_message
|
||||
|
||||
# Special logic for TransactionSimulator. This is ugly but I
|
||||
# want to get to testing faster. Should be refactored later
|
||||
# to something that doesn't make Scott cry.
|
||||
elif tnfm_class.__name__ == 'TransactionSimulator'
|
||||
|
||||
# Otherwise send tnfm_id, tnfm_value, and the message date.
|
||||
else:
|
||||
|
||||
@@ -89,8 +89,6 @@ def get_next_trading_dt(current, interval, trading_calendar):
|
||||
|
||||
return next
|
||||
|
||||
|
||||
|
||||
def create_trade_history(sid, prices, amounts, interval, trading_calendar):
|
||||
trades = []
|
||||
current = trading_calendar.first_open
|
||||
|
||||
Reference in New Issue
Block a user