fixed dt to datetime alias logic-O

This commit is contained in:
fawce
2012-08-08 15:06:24 -04:00
parent 9611192326
commit a0db79428d
2 changed files with 7 additions and 11 deletions
-1
View File
@@ -292,7 +292,6 @@ class ComponentTestCase(TestCase):
BT_UPDATE_UNFRAME,
"tsc"
)
import nose.tools; nose.tools.set_trace()
mon_proc = launch_monitor(monitor)
for message in tsc_comp:
log.info(pf(message))
+7 -10
View File
@@ -28,7 +28,7 @@ def date_sorted_sources(*sources):
# Convert the list of generators into a flat stream by pulling
# one element at a time from each.
stream_in = roundrobin(sources, names)
# Guarantee the flat stream will be sorted by date, using
# source_id as tie-breaker, which is fully deterministic (given
# deterministic string representation for all args/kwargs)
@@ -65,6 +65,8 @@ def merged_transforms(sorted_stream, *transforms):
to_merge = roundrobin(tnfm_gens, namestrings)
# Pipe the stream into merge.
merged = merge(to_merge, namestrings)
dt_aliased = alias_dt(merged)
# Return the merged events.
return add_done(dt_aliased)
@@ -74,7 +76,7 @@ def sequential_transforms(stream_in, *transforms):
Each transform application will add a new entry indexed to the transform's
hash string.
"""
assert isinstance(transforms, (list, tuple))
for tnfm in transforms:
tnfm.forward_all = False
@@ -82,8 +84,8 @@ def sequential_transforms(stream_in, *transforms):
tnfm.append_value = True
# Recursively apply all transforms to the stream.
stream_out = reduce(lambda stream, tnfm: tnfm.transform(stream),
transforms,
stream_out = reduce(lambda stream, tnfm: tnfm.transform(stream),
transforms,
stream_in)
dt_aliased = alias_dt(stream_out)
@@ -95,13 +97,8 @@ def alias_dt(stream_in):
"""
for message in stream_in:
message['datetime'] = message['dt']
yield message
yield message
# Add a done message to a stream.
def add_done(stream_in):
return chain(stream_in, [done_message('Composite')])