mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-29 20:15:35 +08:00
added logs
This commit is contained in:
@@ -1,12 +1,16 @@
|
||||
"""
|
||||
Generator version of Feed.
|
||||
Sorting generator.
|
||||
"""
|
||||
import logbook
|
||||
|
||||
from collections import deque
|
||||
from zipline import ndict
|
||||
from zipline.gens.utils import \
|
||||
assert_datasource_unframe_protocol, \
|
||||
assert_sort_protocol
|
||||
|
||||
log = logbook.Logger('Sorting')
|
||||
|
||||
def date_sort(stream_in, source_ids):
|
||||
"""
|
||||
A generator that takes a generator and a list of source_ids. We
|
||||
@@ -23,6 +27,7 @@ def date_sort(stream_in, source_ids):
|
||||
sources[id] = deque()
|
||||
|
||||
# Process incoming streams.
|
||||
log.info('Sorting first message')
|
||||
for message in stream_in:
|
||||
# Incoming messages should be the output of DATASOURCE_UNFRAME.
|
||||
assert_datasource_unframe_protocol(message), \
|
||||
@@ -46,6 +51,7 @@ def date_sort(stream_in, source_ids):
|
||||
assert len(queue) == 1, "Bad queue in date_sort on exit: %s" % queue
|
||||
assert queue[0].dt == "DONE", \
|
||||
"Bad last message in date_sort on exit: %s" % queue
|
||||
log.info('Successfully finished Sorting')
|
||||
|
||||
def ready(sources):
|
||||
"""
|
||||
|
||||
@@ -3,6 +3,7 @@ Generator versions of transforms.
|
||||
"""
|
||||
import types
|
||||
import pytz
|
||||
import logbook
|
||||
|
||||
from copy import deepcopy
|
||||
from datetime import datetime, timedelta
|
||||
@@ -15,6 +16,8 @@ from zipline.utils.tradingcalendar import trading_days_between
|
||||
from zipline.gens.utils import assert_sort_unframe_protocol, \
|
||||
assert_transform_protocol, hash_args
|
||||
|
||||
log = logbook.Logger('Transform')
|
||||
|
||||
class Passthrough(object):
|
||||
FORWARDER = True
|
||||
"""
|
||||
@@ -72,6 +75,7 @@ class StatefulTransform(object):
|
||||
|
||||
# Create the string associated with this generator's output.
|
||||
self.namestring = tnfm_class.__name__ + hash_args(*args, **kwargs)
|
||||
log.info('StatefulTransform [%s] initialized' % self.namestring)
|
||||
|
||||
def get_hash(self):
|
||||
return self.namestring
|
||||
@@ -82,7 +86,7 @@ class StatefulTransform(object):
|
||||
def _gen(self, stream_in):
|
||||
# IMPORTANT: Messages may contain pointers that are shared with
|
||||
# other streams, so we only manipulate copies.
|
||||
|
||||
log.info('Running StatefulTransform [%s]' % self.get_hash())
|
||||
for message in stream_in:
|
||||
|
||||
# allow upstream generators to yield None to avoid
|
||||
@@ -143,6 +147,7 @@ class StatefulTransform(object):
|
||||
out_message.dt = message_copy.dt
|
||||
yield out_message
|
||||
|
||||
log.info('Finished StatefulTransform [%s]' % self.get_hash())
|
||||
class EventWindow:
|
||||
"""
|
||||
Abstract base class for transform classes that calculate iterative
|
||||
|
||||
@@ -654,7 +654,13 @@ def tuple_to_date(date_tuple):
|
||||
dt = dt.replace(microsecond = micros, tzinfo = pytz.utc)
|
||||
return dt
|
||||
|
||||
# Datasource type should completely determine the other fields of a
|
||||
# message with its type.
|
||||
DATASOURCE_TYPE = Enum(
|
||||
'AS_TRADED_EQUITY',
|
||||
'MERGE',
|
||||
'SPLIT',
|
||||
'DIVIDEND',
|
||||
'TRADE',
|
||||
'EMPTY',
|
||||
'DONE'
|
||||
|
||||
Reference in New Issue
Block a user