mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-29 12:22:42 +08:00
save
This commit is contained in:
@@ -15,7 +15,7 @@ def date_sorted_sources(*sources):
|
||||
"""
|
||||
Takes an iterable of SortBundles, generating namestrings and initialized datasources
|
||||
for each before piping them into a date_sort.
|
||||
"""
|
||||
n """
|
||||
|
||||
for source in sources:
|
||||
assert iter(source), "Source %s not iterable" % source
|
||||
@@ -27,9 +27,9 @@ def date_sorted_sources(*sources):
|
||||
# Convert the list of generators into a flat stream by pulling
|
||||
# one element at a time from each.
|
||||
stream_in = roundrobin(sources, names)
|
||||
|
||||
|
||||
# Guarantee the flat stream will be sorted by date, using source_id as
|
||||
# tie-breaker, which is fully deterministic (given deterministic string
|
||||
# tie-breaker, which is fully deterministic (given deterministic string
|
||||
# representation for all args/kwargs)
|
||||
|
||||
return date_sort(stream_in, names)
|
||||
@@ -51,15 +51,15 @@ def merged_transforms(sorted_stream, bundles):
|
||||
|
||||
# Create a copy of the stream for each transform.
|
||||
split = tee(sorted_stream, len(bundles))
|
||||
# Package a stream copy with each bundle
|
||||
# Package a stream copy with each bundle
|
||||
tnfms_with_streams = zip(split, bundles)
|
||||
|
||||
# Convert the copies into transform streams.
|
||||
tnfms = [
|
||||
StatefulTransform(
|
||||
stream_copy,
|
||||
bundle.tnfm,
|
||||
*bundle.args,
|
||||
stream_copy,
|
||||
bundle.tnfm,
|
||||
*bundle.args,
|
||||
**bundle.kwargs
|
||||
)
|
||||
for stream_copy, bundle in tnfms_with_streams
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
import pytz
|
||||
|
||||
from time import sleep
|
||||
from pprint import pprint as pp
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from zipline.utils.factory import create_trading_environment
|
||||
@@ -20,7 +23,7 @@ if __name__ == "__main__":
|
||||
kwargs_a = {
|
||||
'sids' : [1,2,3],
|
||||
'start' : datetime(2012,1,3,15, tzinfo = pytz.utc),
|
||||
'delta' : timedelta(minutes = 1),
|
||||
'delta' : timedelta(minutes = 10),
|
||||
'filter' : filter
|
||||
}
|
||||
source_a = SpecificEquityTrades(*args_a, **kwargs_a)
|
||||
@@ -30,7 +33,7 @@ if __name__ == "__main__":
|
||||
kwargs_b = {
|
||||
'sids' : [2,3,4],
|
||||
'start' : datetime(2012,1,3,14, tzinfo = pytz.utc),
|
||||
'delta' : timedelta(minutes = 1),
|
||||
'delta' : timedelta(minutes = 10),
|
||||
'filter' : filter
|
||||
}
|
||||
source_b = SpecificEquityTrades(*args_b, **kwargs_b)
|
||||
@@ -45,13 +48,12 @@ if __name__ == "__main__":
|
||||
|
||||
merge_out = merged_transforms(sort_out, tnfm_bundles)
|
||||
|
||||
import nose.tools; nose.tools.set_trace()
|
||||
algo = TestAlgorithm(2, 100, 100, sid_filter = [2,3])
|
||||
algo = TestAlgorithm(2, 10, 100, sid_filter = [2,3])
|
||||
environment = create_trading_environment(year = 2012)
|
||||
style = zp.SIMULATION_STYLE.FIXED_SLIPPAGE
|
||||
|
||||
client_out = tsc(merge_out, algo, environment, style)
|
||||
for message in client_out:
|
||||
pp(message)
|
||||
|
||||
sleep(1)
|
||||
|
||||
|
||||
@@ -44,7 +44,6 @@ def roundrobin(sources, namestrings):
|
||||
assert len(sources) == len(namestrings)
|
||||
mapping = OrderedDict(zip(namestrings, sources))
|
||||
|
||||
import nose.tools; nose.tools.set_trace()
|
||||
# While our generators have not been exhausted, pull elements
|
||||
while mapping.keys() != []:
|
||||
for namestring, source in mapping.iteritems():
|
||||
@@ -55,8 +54,6 @@ def roundrobin(sources, namestrings):
|
||||
yield done_message(namestring)
|
||||
del mapping[namestring]
|
||||
|
||||
|
||||
|
||||
def hash_args(*args, **kwargs):
|
||||
"""Define a unique string for any set of representable args."""
|
||||
arg_string = '_'.join([str(arg) for arg in args])
|
||||
|
||||
Reference in New Issue
Block a user