From 7051d6826dc8b4f634d9a86a0b77b05de57deed1 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Tue, 21 Aug 2012 12:18:26 -0400 Subject: [PATCH 1/8] remove unnecessary copying --- zipline/gens/composites.py | 2 +- zipline/gens/transform.py | 25 ++++++++++++++++--------- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/zipline/gens/composites.py b/zipline/gens/composites.py index b3fa7576..78bea152 100644 --- a/zipline/gens/composites.py +++ b/zipline/gens/composites.py @@ -43,7 +43,7 @@ def merged_transforms(sorted_stream, *transforms): """ for transform in transforms: assert isinstance(transform, StatefulTransform) - + transform.set_copying() # Generate expected hashes for each transform namestrings = [tnfm.get_hash() for tnfm in transforms] diff --git a/zipline/gens/transform.py b/zipline/gens/transform.py index 51475f45..612cd098 100644 --- a/zipline/gens/transform.py +++ b/zipline/gens/transform.py @@ -72,6 +72,7 @@ class StatefulTransform(object): # Create an instance of our transform class. self.state = tnfm_class(*args, **kwargs) + self._copying = False # Create the string associated with this generator's output. self.namestring = tnfm_class.__name__ + hash_args(*args, **kwargs) @@ -80,12 +81,16 @@ class StatefulTransform(object): def get_hash(self): return self.namestring + def set_copyting(self): + self._copying = True + def transform(self, stream_in): return self._gen(stream_in) def _gen(self, stream_in): # IMPORTANT: Messages may contain pointers that are shared with - # other streams, so we only manipulate copies. + # other streams. Transforms that modify their input + # messages should only manipulate copies. log.info('Running StatefulTransform [%s]' % self.get_hash()) for message in stream_in: @@ -94,13 +99,15 @@ class StatefulTransform(object): if message == None: continue - #TODO: refactor this to avoid unnecessary copying. - assert_sort_unframe_protocol(message) - message_copy = deepcopy(message) - + + # Copying flag is used by merged_transforms to ensure + # isolation of messages. + if self._copying: + message = deepcopy(message) + # Same shared pointer issue here as above. - tnfm_value = self.state.update(deepcopy(message_copy)) + tnfm_value = self.state.update(message) # FORWARDER flag means we want to keep all original # values, plus append tnfm_id and tnfm_value. Used for @@ -108,7 +115,7 @@ class StatefulTransform(object): # will be fed into a merge. Currently only Passthrough # uses this flag. if self.forward_all: - out_message = message_copy + out_message = message out_message.tnfm_id = self.namestring out_message.tnfm_value = tnfm_value yield out_message @@ -133,7 +140,7 @@ class StatefulTransform(object): # tnfm_value) into a single field. This mode is used by # the sequential_transforms composite. elif self.append_value: - out_message = message_copy + out_message = message out_message[self.namestring] = tnfm_value yield out_message @@ -145,7 +152,7 @@ class StatefulTransform(object): out_message = ndict() out_message.tnfm_id = self.namestring out_message.tnfm_value = tnfm_value - out_message.dt = message_copy.dt + out_message.dt = message.dt yield out_message log.info('Finished StatefulTransform [%s]' % self.get_hash()) From 3bcdf0f2aff57a4d3068646fbc7394430a963c45 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Tue, 21 Aug 2012 14:31:15 -0400 Subject: [PATCH 2/8] break out of loop properly when we get done --- zipline/gens/tradesimulation.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index 3fdcc565..f8fd5c19 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -115,7 +115,7 @@ class TradeSimulationClient(object): # calculated by the performance tracker) at the end of each # day. It will also yield a risk report at the end of the # simulation. - + for message in self.algo_sim: yield message @@ -259,7 +259,7 @@ class AlgorithmSimulator(object): if date == 'DONE': for event in snapshot: yield event.perf_message - + break # We're still in the warmup period. Use the event to # update our universe, but don't yield any perf messages, # and don't send a snapshot to handle_data. @@ -292,7 +292,7 @@ class AlgorithmSimulator(object): del event['perf_message'] self.update_universe(event) - + # Send the current state of the universe to the user's algo. self.simulate_snapshot(date) @@ -315,7 +315,7 @@ class AlgorithmSimulator(object): # Needs to be set so that we inject the proper date into algo # log/print lines. self.snapshot_dt = date - + start_tic = datetime.now() with self.heartbeat_monitor: self.algo.handle_data(self.universe) From d24bfbbf4334e9ffa77d4eeb045ec61fb50eb163 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Tue, 21 Aug 2012 14:34:18 -0400 Subject: [PATCH 3/8] SimulatedLite --- zipline/lines.py | 24 +++++++++++++++++++++++- zipline/utils/timeout.py | 6 ++++-- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/zipline/lines.py b/zipline/lines.py index 1c3a558f..23940711 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -121,7 +121,6 @@ class SimulatedTrading(object): # exit status flag self.success = False - def simulate(self, blocking=True, send_sighup=False): # for non-blocking, @@ -356,3 +355,26 @@ class SimulatedTrading(object): #------------------- return sim + +class SimulatedTradingLite(object): + """ + SimulatedTrading without multiprocess and without zmq. + Useful for profiling the core logic and for rapid testing + of new features. + """ + def __init__(self, + sources, + transforms, + algorithm, + environment, + style): + + self.date_sorted = date_sorted_sources(*sources) + self.transforms = transforms + # Formerly merged_transforms. + self.with_tnfms = sequential_transforms(self.date_sorted, *self.transforms) + self.trading_client = tsc(algorithm, environment, style) + self.gen = self.trading_client.simulate(self.with_tnfms) + + def get_results(self): + return self.gen diff --git a/zipline/utils/timeout.py b/zipline/utils/timeout.py index 2985222e..4d61a030 100644 --- a/zipline/utils/timeout.py +++ b/zipline/utils/timeout.py @@ -11,6 +11,8 @@ class Timeout(Exception): def __init__(self, frame, message=''): self.frame = frame self.message = message + +# TODO: fix code replication here. class timeout(object): """ @@ -33,7 +35,7 @@ class timeout(object): @wraps(fn) def call_fn_with_timeout(*args, **kwargs): - # Set the alarm. + # Set the alarm, saving any handler that existed previously. signal.signal(signal.SIGALRM, self.handler) signal.setitimer(signal.ITIMER_REAL, self.seconds, 0) try: @@ -45,7 +47,7 @@ class timeout(object): # call to fn takes too long. finally: signal.setitimer(signal.ITIMER_REAL, 0, 0) - signal.signal(signal.SIGALRM, signal.SIG_DFL) + signal.signal(signal.SIGALRM, self.prior_handler) # Return the value of fn if it finished before the alarm. This # won't execute if the Timeout was raised. From 1f78a07d3036505e7c86e805da5b2fe9c25a25c5 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Tue, 21 Aug 2012 19:55:40 -0400 Subject: [PATCH 4/8] refactor tradesimulation client to not use StatefulTransform unnecessarily --- zipline/finance/performance.py | 54 +++++++++--------- zipline/finance/trading.py | 8 ++- zipline/gens/composites.py | 4 +- zipline/gens/tradesimulation.py | 81 +++++++++------------------ zipline/gens/transform.py | 98 +++++++++++---------------------- 5 files changed, 95 insertions(+), 150 deletions(-) diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index 98ac8daf..05a3a875 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -133,7 +133,7 @@ import zipline.finance.risk as risk log = logbook.Logger('Performance') class PerformanceTracker(object): - UPDATER = True + """ Tracks the performance of the zipline as it is running in the simulator, relays this out to the Deluge broker and then @@ -166,7 +166,6 @@ class PerformanceTracker(object): self.event_count = 0 self.last_dict = None self.exceeded_max_loss = False - self.no_more_updates = False self.results_socket = None self.results_addr = None @@ -202,28 +201,30 @@ class PerformanceTracker(object): for sid in sid_list: self.cumulative_performance.positions[sid] = Position(sid) self.todays_performance.positions[sid] = Position(sid) - - def update(self, event): - if self.no_more_updates: - return zp.ndict({'dt':0}) - elif event.dt == "DONE": - event.perf_message = self.handle_simulation_end() - del event['TRANSACTION'] - self.no_more_updates = True - return event - elif self.exceeded_max_loss: - # in case of max_loss, signal to downstream - # generators that we are done. - event.dt = "DONE" - event.perf_message = self.handle_simulation_end() - del event['TRANSACTION'] - self.no_more_updates = True - return event - else: - event.perf_message = self.process_event(event) - event.portfolio = self.get_portfolio() - del event['TRANSACTION'] - return event + + def transform(self, stream_in): + """ + Main generator work loop. + """ + for event in stream_in: + if event.dt == "DONE": + event.perf_message = self.handle_simulation_end() + del event['TRANSACTION'] + yield event + elif self.exceeded_max_loss: + # in case of max_loss, signal to downstream + # generators that we are done. + event.dt = "DONE" + event.perf_message = self.handle_simulation_end() + del event['TRANSACTION'] + yield event + # Cut off the rest of the stream. + yield StopIteration() + else: + event.perf_message = self.process_event(event) + event.portfolio = self.get_portfolio() + del event['TRANSACTION'] + yield event def get_portfolio(self): return self.cumulative_performance.as_portfolio() @@ -241,8 +242,6 @@ class PerformanceTracker(object): Publish the performance results asynchronously to a socket. """ - #assert isinstance(results_addr, basestring), type(results_addr) - #self.results_addr = results_addr self.results_socket = results_addr def to_dict(self): @@ -267,7 +266,7 @@ class PerformanceTracker(object): message = None if self.exceeded_max_loss: - return + return message assert isinstance(event, zp.ndict) self.event_count += 1 @@ -288,7 +287,6 @@ class PerformanceTracker(object): self.cumulative_performance.calculate_performance() self.todays_performance.calculate_performance() - return message def handle_market_close(self): diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index 2fa99d9b..4b971394 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -9,7 +9,6 @@ from zipline.protocol import SIMULATION_STYLE log = logbook.Logger('Transaction Simulator') class TransactionSimulator(object): - UPDATER = True def __init__(self, sid_filter, style=SIMULATION_STYLE.PARTIAL_VOLUME): self.open_orders = {} @@ -35,6 +34,13 @@ class TransactionSimulator(object): order.filled = 0 self.open_orders[order.sid].append(order) + def transform(self, stream_in): + """ + Main generator work loop. + """ + for event in stream_in: + yield self.update(event) + def update(self, event): event.TRANSACTION = None # We only fill transactions on trade events. diff --git a/zipline/gens/composites.py b/zipline/gens/composites.py index 78bea152..cec8b448 100644 --- a/zipline/gens/composites.py +++ b/zipline/gens/composites.py @@ -43,7 +43,9 @@ def merged_transforms(sorted_stream, *transforms): """ for transform in transforms: assert isinstance(transform, StatefulTransform) - transform.set_copying() + transform.merged = True + transform.sequential = False + # Generate expected hashes for each transform namestrings = [tnfm.get_hash() for tnfm in transforms] diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index 5d036495..83f077fe 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -61,10 +61,16 @@ class TradeSimulationClient(object): self.sids = algo.get_sid_filter() self.environment = environment self.style = sim_style - self.algo_sim = None - self.warmup_start = self.environment.prior_day_open + self.ordering_client = TransactionSimulator(self.sids, style=self.style) + self.perf_tracker = PerformanceTracker(self.environment, self.sids) + self.algo_start = self.environment.first_open + self.algo_sim = AlgorithmSimulator( + self.ordering_client, + self.algo, + self.algo_start + ) def get_hash(self): """ @@ -79,56 +85,36 @@ class TradeSimulationClient(object): """ # Simulate filling any open orders made by the previous run of - # the user's algorithm. Sets the txn field to true on any + # the user's algorithm. Sets the TRANSACTION field to true on any # event that results in a filled order. - ordering_client = StatefulTransform( - TransactionSimulator, - self.sids, - style = self.style - ) - with_filled_orders = ordering_client.transform(stream_in) + with_filled_orders = self.ordering_client.transform(stream_in) # Pipe the events with transactions to perf. This will remove - # the txn field added by TransactionSimulator and replace it - # with a portfolio object to be passed to the user's + # the TRANSACTION field added by TransactionSimulator and replace it + # with a portfolio field to be passed to the user's # algorithm. Also adds a perf_message field which is usually # none, but contains an update message once per day. - perf_tracker = StatefulTransform( - PerformanceTracker, - self.environment, - self.sids - ) - with_portfolio = perf_tracker.transform(with_filled_orders) + with_portfolio = self.perf_tracker.transform(with_filled_orders) - # Pass the messages from perf along with the trading client's - # state into the algorithm for simulation. We provide a - # pointer to the ordering client's internal state so that the - # algorithm can place new orders into the client's order book. - self.algo_sim = AlgorithmSimulator( - with_portfolio, - ordering_client.state, - self.algo, - self.algo_start - ) + # Pass the messages from perf to the user's algorithm for simulation. + # Events are batched by dt so that the algo handles all events for a + # given timestamp at one one go. + performance_messages = self.algo_sim.transform(with_portfolio) # The algorithm will yield a daily_results message (as # calculated by the performance tracker) at the end of each # day. It will also yield a risk report at the end of the # simulation. - - for message in self.algo_sim: + for message in performance_messages: yield message class AlgorithmSimulator(object): - + def __init__(self, - stream_in, order_book, algo, algo_start): - - self.stream_in = stream_in - + # ========== # Algo Setup # ========== @@ -168,7 +154,6 @@ class AlgorithmSimulator(object): # The algorithm's universe as of our most recent event. self.universe = ndict() - for sid in self.sids: self.universe[sid] = ndict() self.universe.portfolio = None @@ -188,22 +173,10 @@ class AlgorithmSimulator(object): record.extra['algo_dt'] = self.snapshot_dt self.processor = Processor(inject_algo_dt) - # This is a class, which is instantiated later - # in run_algorithm. The class provides a generator. + # Single_use generator that uses the @contextmanager decorator + # to monkey patch sys.stdout with a logbook interface. self.stdout_capture = stdout_only_pipe - self.__generator = None - - def __iter__(self): - return self - - def next(self): - if self.__generator: - return self.__generator.next() - else: - self.__generator = self._gen() - return self.__generator.next() - def order(self, sid, amount): """ Closure to pass into the user's algo to allow placing orders @@ -232,10 +205,10 @@ class AlgorithmSimulator(object): # simulator so that it can fill the placed order when it # receives its next message. self.order_book.place_order(order) - - def _gen(self): + + def transform(self, stream_in): """ - Internal generator work loop. + Main generator work loop. """ # Capture any output of this generator to stdout and pipe it # to a logbook interface. Also inject the current algo @@ -248,7 +221,7 @@ class AlgorithmSimulator(object): # Group together events with the same dt field. This depends on the # events already being sorted. - for date, snapshot in groupby(self.stream_in, lambda e: e.dt): + for date, snapshot in groupby(stream_in, lambda e: e.dt): # Set the simulation date to be the first event we see. # This should only occur once, at the start of the test. @@ -259,7 +232,7 @@ class AlgorithmSimulator(object): if date == 'DONE': for event in snapshot: yield event.perf_message - break + raise StopIteration() # We're still in the warmup period. Use the event to # update our universe, but don't yield any perf messages, diff --git a/zipline/gens/transform.py b/zipline/gens/transform.py index 612cd098..e10380b6 100644 --- a/zipline/gens/transform.py +++ b/zipline/gens/transform.py @@ -19,7 +19,7 @@ from zipline.gens.utils import assert_sort_unframe_protocol, \ log = logbook.Logger('Transform') class Passthrough(object): - FORWARDER = True + PASSTHROUGH = True """ Trivial class for forwarding events. """ @@ -29,23 +29,6 @@ class Passthrough(object): def update(self, event): pass -# Deprecated -def functional_transform(stream_in, func, *args, **kwargs): - """ - Generic transform generator that takes each message from an in-stream - and yields the output of a function on that message. Not sure how - useful this will be in reality, but good for testing. - """ - assert isinstance(func, types.FunctionType), \ - "Functional" - namestring = func.__name__ + hash_args(*args, **kwargs) - - for message in stream_in: - assert_sort_unframe_protocol(message) - out_value = func(message, *args, **kwargs) - assert_transform_protocol(out_value) - yield(namestring, out_value) - class StatefulTransform(object): """ Generic transform generator that takes each message from an @@ -61,18 +44,15 @@ class StatefulTransform(object): assert tnfm_class.__dict__.has_key('update'), \ "Stateful transform requires the class to have an update method" - self.forward_all = tnfm_class.__dict__.get('FORWARDER', False) - self.update_in_place = tnfm_class.__dict__.get('UPDATER', False) - self.append_value = tnfm_class.__dict__.get('APPENDER', False) - - # You only one special behavior mode can be set. - assert sum(map(int, [self.forward_all, - self.update_in_place, - self.append_value])) <= 1 - + # Flag set inside the Passthrough transform class to signify special + # behavior if we are being fed to merged_transforms. + self.passthrough = tnfm_class.__dict__.get('PASSTHROUGH', False) + + self.sequential = True + self.merged = False + # Create an instance of our transform class. self.state = tnfm_class(*args, **kwargs) - self._copying = False # Create the string associated with this generator's output. self.namestring = tnfm_class.__name__ + hash_args(*args, **kwargs) @@ -81,9 +61,6 @@ class StatefulTransform(object): def get_hash(self): return self.namestring - def set_copyting(self): - self._copying = True - def transform(self, stream_in): return self._gen(stream_in) @@ -101,59 +78,48 @@ class StatefulTransform(object): assert_sort_unframe_protocol(message) - # Copying flag is used by merged_transforms to ensure + # This flag is set by by merged_transforms to ensure # isolation of messages. - if self._copying: + if self.merged: message = deepcopy(message) - - # Same shared pointer issue here as above. + tnfm_value = self.state.update(message) - # FORWARDER flag means we want to keep all original + # PASSTHROUGH flag means we want to keep all original # values, plus append tnfm_id and tnfm_value. Used for # preserving the original event fields when our output # will be fed into a merge. Currently only Passthrough # uses this flag. - if self.forward_all: + if self.passthrough and self.merged: out_message = message out_message.tnfm_id = self.namestring out_message.tnfm_value = tnfm_value yield out_message - # UPDATER flag should be used for transforms that - # side-effectfully modify the event they are passed. - # Updated messages are passed along exactly as they are - # returned to use by our state class. Useful for chaining - # specific transforms that won't be fed to a merge. (See - # the implementation of TradeSimulationClient for example - # usage of this flag with PerformanceTracker and - # TransactionSimulator. - elif self.update_in_place: - yield tnfm_value - - # APPENDER flag should be used to add a single new - # key-value pair to the event. The new key is this - # transform's namestring, and it's value is the value - # returned by state.update(event). This is almost - # identical to the behavior of FORWARDER, except we - # compress the two calculated values (tnfm_id, and - # tnfm_value) into a single field. This mode is used by - # the sequential_transforms composite. - elif self.append_value: - out_message = message - out_message[self.namestring] = tnfm_value - yield out_message - - # If no flags are set, we create a new message containing - # just the tnfm_id, the event's datetime, and the - # calculated tnfm_value. This is the default behavior for - # a transform being fed into a merge. - else: + # If the merged flag is set, we create a new message + # containing just the tnfm_id, the event's datetime, and + # the calculated tnfm_value. This is the default behavior + # for a non-passthrough transform being fed into a merge. + elif self.merged: out_message = ndict() out_message.tnfm_id = self.namestring out_message.tnfm_value = tnfm_value out_message.dt = message.dt yield out_message + + # Sequential flag should be used to add a single new + # key-value pair to the event. The new key is this + # transform's namestring, and its value is the value + # returned by state.update(event). This is almost + # identical to the behavior of FORWARDER, except we + # compress the two calculated values (tnfm_id, and + # tnfm_value) into a single field. This mode is used by + # the sequential_transforms composite and is the default + # if no behavior is specified by the internal state class. + elif self.sequential: + out_message = message + out_message[self.namestring] = tnfm_value + yield out_message log.info('Finished StatefulTransform [%s]' % self.get_hash()) class EventWindow: From 24fddfbde0c463a0a4b7efd7b64bf19b75977f3f Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Wed, 22 Aug 2012 02:50:16 -0400 Subject: [PATCH 5/8] tradingcalender, attempt #2 --- tests/test_transforms.py | 78 +++---- zipline/finance/performance.py | 2 +- zipline/gens/composites.py | 8 +- zipline/gens/transform.py | 50 ++++- zipline/utils/tradingcalendar.py | 350 +++++-------------------------- 5 files changed, 132 insertions(+), 356 deletions(-) diff --git a/tests/test_transforms.py b/tests/test_transforms.py index e515d725..8e203664 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -46,24 +46,20 @@ class EventWindowTestCase(TestCase): def setUp(self): setup_logger(self) - # Constants calling before open, during the day, and after - # close on a valid trading day. - self.pre_open = datetime(2012, 8, 7, 13, tzinfo = pytz.utc) - self.mid_day = datetime(2012, 8, 7, 15, tzinfo = pytz.utc) - self.post_close = datetime(2012, 8, 7, 22, tzinfo = pytz.utc) + self.monday = datetime(2012, 7, 9, 16, tzinfo=pytz.utc) + self.eleven_normal_days = [self.monday + i*timedelta(days=1) + for i in xrange(11)] - # Constants calling before open, during the day, and after - # close on a saturday. - self.pre_open_saturday = datetime(2012, 8, 11, 13, tzinfo = pytz.utc) - self.mid_day_saturday = datetime(2012, 8, 11, 15, tzinfo = pytz.utc) - self.post_close_saturday = datetime(2012, 8, 11, 22, tzinfo = pytz.utc) + # Modify the end of the period slightly to exercise the + # incomplete day logic. + self.eleven_normal_days[-1] -= timedelta(minutes = 1) + self.eleven_normal_days.append(self.monday+timedelta(days=11,seconds=1)) + + # Second set of dates to test holiday handling. + self.jul4_monday = datetime(2012, 7, 2, 16, tzinfo=pytz.utc) + self.week_of_jul4 = [self.jul4_monday + i*timedelta(days=1) + for i in xrange(5)] - # Constants calling before open, during the day, and after - # close on a holiday. - self.pre_open_holiday = datetime(2012, 12, 25, 13, tzinfo = pytz.utc) - self.mid_day_holiday = datetime(2012, 12, 25, tzinfo = pytz.utc) - self.post_close_holiday = datetime(2012, 12, 25, 22, tzinfo = pytz.utc) - def test_event_window_with_timedelta(self): # Keep all events within a 5 minute window. @@ -96,58 +92,46 @@ class EventWindowTestCase(TestCase): for dropped in window.removed: assert message.dt - dropped.dt >= timedelta(minutes = 5) - def test_market_aware_window(self): + def test_market_aware_window_normal_week(self): window = NoopEventWindow( market_aware = True, delta = None, - days = 1 + days = 3 ) - - dates = ([self.pre_open]*3) - dates += ([self.mid_day]*3) - dates += ([self.post_close]*3) - dates += [self.pre_open + timedelta(days = 1, seconds = 1)] - events = [to_dt(date) for date in dates] - + events = [to_dt(date) for date in self.eleven_normal_days] + lengths = [] # Run the events. for event in events: window.update(event) + # Record the length of the window after each event. + lengths.append(len(window.ticks)) - # We should have removed the pre_open events on the first day. - # The rest should be intact. - + # The window stretches out during the weekend because we wait + # to drop events until the weekend ends. The last window is + # briefly longer because it doesn't complete a full day. The + # window then shrinks once the day completes + assert lengths == [1, 2, 3, 3, 3, 4, 5, 5, 5, 3, 4, 3] assert window.added == events - assert window.removed == events[0:3] - assert list(window.ticks) == events[3:] + assert window.removed == events[:-3] - def test_market_aware_window_weekend(self): + def test_market_aware_window_holiday(self): window = NoopEventWindow( market_aware = True, delta = None, days = 2 ) - dates = [self.pre_open_saturday - timedelta(days = 1, seconds=1)] - dates += [self.mid_day_saturday - timedelta(days = 1, seconds=1)] - dates += [self.post_close_saturday - timedelta(days = 1, seconds=1)] - dates += [self.mid_day_saturday + timedelta(days = 1)] - - events = [to_dt(date) for date in dates] + events = [to_dt(date) for date in self.week_of_jul4] + lengths = [] # Run the events. for event in events: window.update(event) + # Record the length of the window after each event. + lengths.append(len(window.ticks)) - # We shouldn't remove any events. + assert lengths == [1, 2, 3, 3, 2] assert window.added == events - assert window.removed == [] - assert list(window.ticks) == events - - extra = to_dt(self.mid_day_saturday + timedelta(days = 2)) - window.update(extra) - - # We should remove only the first event. - assert window.removed == [events[0]] - assert list(window.ticks) == events[1:] + [extra] + assert window.removed == events[:-2] def tearDown(self): setup_logger(self) diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index 05a3a875..de7809d2 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -219,7 +219,7 @@ class PerformanceTracker(object): del event['TRANSACTION'] yield event # Cut off the rest of the stream. - yield StopIteration() + raise StopIteration() else: event.perf_message = self.process_event(event) event.portfolio = self.get_portfolio() diff --git a/zipline/gens/composites.py b/zipline/gens/composites.py index cec8b448..63d69d0f 100644 --- a/zipline/gens/composites.py +++ b/zipline/gens/composites.py @@ -77,11 +77,11 @@ def sequential_transforms(stream_in, *transforms): """ assert isinstance(transforms, (list, tuple)) - for tnfm in transforms: - tnfm.forward_all = False - tnfm.update_in_place = False - tnfm.append_value = True + for tnfm in transforms: + tnfm.sequential = True + tnfm.merged = False + # Recursively apply all transforms to the stream. stream_out = reduce(lambda stream, tnfm: tnfm.transform(stream), transforms, diff --git a/zipline/gens/transform.py b/zipline/gens/transform.py index e10380b6..33827448 100644 --- a/zipline/gens/transform.py +++ b/zipline/gens/transform.py @@ -12,7 +12,7 @@ from numbers import Number from abc import ABCMeta, abstractmethod from zipline import ndict -from zipline.utils.tradingcalendar import trading_days_between +from zipline.utils.tradingcalendar import non_trading_days from zipline.gens.utils import assert_sort_unframe_protocol, \ assert_transform_protocol, hash_args @@ -48,8 +48,11 @@ class StatefulTransform(object): # behavior if we are being fed to merged_transforms. self.passthrough = tnfm_class.__dict__.get('PASSTHROUGH', False) - self.sequential = True - self.merged = False + # Flags specifying how to append the calculated value. + # Merged is the default for ease of testing, but we use sequential + # in production. + self.sequential = False + self.merged = True # Create an instance of our transform class. self.state = tnfm_class(*args, **kwargs) @@ -120,8 +123,9 @@ class StatefulTransform(object): out_message = message out_message[self.namestring] = tnfm_value yield out_message - + log.info('Finished StatefulTransform [%s]' % self.get_hash()) + class EventWindow: """ Abstract base class for transform classes that calculate iterative @@ -153,8 +157,13 @@ class EventWindow: # Market-aware mode only works with full-day windows. if self.market_aware: - assert self.days and not self.delta,\ + assert self.days and self.delta == None,\ "Market-aware mode only works with full-day windows." + self.all_holidays = deque(non_trading_days) + self.cur_holidays = deque() + # Keeping a copy of days as a timedelta makes it easier + # to track holidays. + self.delta = timedelta(days=self.days) # Non-market-aware mode requires a timedelta. else: @@ -188,6 +197,9 @@ class EventWindow: # Subclasses should override handle_add to define behavior for # adding new ticks. self.handle_add(event) + + if self.market_aware: + self.add_new_holidays(event.dt) # Clear out any expired events. drop_condition changes depending # on whether or not we are running in market_aware mode. @@ -196,16 +208,40 @@ class EventWindow: # | | # V V while self.drop_condition(self.ticks[0].dt, self.ticks[-1].dt): - + # popleft removes and returns the oldest tick in self.ticks popped = self.ticks.popleft() # Subclasses should override handle_remove to define # behavior for removing ticks. self.handle_remove(popped) + + def add_new_holidays(self, newest): + # Add to our tracked window any untracked holidays that are + # older than our newest event. (newest should always be + # self.ticks[-1]) + while len(self.all_holidays) > 0 and self.all_holidays[0] <= newest: + self.cur_holidays.append(self.all_holidays.popleft()) + + def drop_old_holidays(self, oldest): + # Drop from our tracked window any holidays that are older + # than our oldest tracked event. (oldest should always + # be self.ticks[0]) + while len(self.cur_holidays) > 0 and self.cur_holidays[0] < oldest: + self.cur_holidays.popleft() def out_of_market_window(self, oldest, newest): - return trading_days_between(oldest, newest) >= self.days + self.drop_old_holidays(oldest) + calendar_dates_between = (newest.date() - oldest.date()).days + holidays_between = len(self.cur_holidays) + trading_days_between = calendar_dates_between - holidays_between + + # "Put back" a day if oldest is earlier in its day than newest, + # reflecting the fact that we haven't yet completed the last + # day in the window. + if oldest.time() > newest.time(): + trading_days_between -= 1 + return trading_days_between >= self.days def out_of_delta(self, oldest, newest): return (newest - oldest) >= self.delta diff --git a/zipline/utils/tradingcalendar.py b/zipline/utils/tradingcalendar.py index f760e51e..36dd3eea 100644 --- a/zipline/utils/tradingcalendar.py +++ b/zipline/utils/tradingcalendar.py @@ -1,358 +1,114 @@ import pytz -from datetime import datetime, timedelta +from datetime import datetime, timedelta, date from dateutil import rrule from zipline.utils.date_utils import utcnow -def market_opens(start, end, inclusive=False): - """ - Returns all market opens between the start date and the end date. - Must use utc-stamped datetimes. - """ - return opens.between(start, end, inc=inclusive) +start = datetime(2002, 1,1, tzinfo=pytz.utc) +end = utcnow() -def market_closes(start, end, inclusive=False): - """ - Returns all market closes between the start date and the end date. - Must use utc-stamped datetimes. - """ - return closes.between(start, end, inc=inclusive) +non_trading_rules = [] -def trading_days_between(start, end): - """ - Calculate the number of "complete" trading days between two - events. We define this as the number of market opens that - occurred between start and end, with the caveat that we subtract 1 - from this total if end falls on the same day as the last market - open and end occurs earlier in its own day than start. This - reflects the fact that we haven't completed a full day - corresponding to the last market open. - - Examples: - - 1.) - start = Tuesday, Aug 7, 2012, 1:00 pm - end = Wednesday, Aug 8, 2012, 1:30 pm - - There is one market open between these dates, on the morning of - Wednesday the 8th. This falls on the same calendar day as end, - but end is later in the day than start, so we count this as a full - day. The correct output is 1. - - 2.) - start = Tuesday, Aug 7, 2012, 1:30 pm - end = Wednesday, Aug 8, 2012, 1:00 pm - - There is one market open between these dayes, on the morning of - Wednesday the 8th. This falls on the same calendar day as end, - and end is earlier in the day than start, so we do not count this - day as completed. The correct output is 0. - - 3.) - start = Tuesday, Aug 7, 2012, 1:00 pm - end = Saturday, Aug 11, 2012, 1:30 pm - - There are 3 market opens between these dates, occurring on - Wednesday, Thursday, and Friday. The last open is not on - the same day as end, so we simply return 3 - - 4.) - start = Tuesday, Aug 7, 2012, 1:30 pm - end = Monday, Aug, 13, 2012, 1:00 pm - - There are 4 market opens between these dates, occurring on - Wednesday, Thursday, Friday, and the following Monday. The - last open occurs on the same calendar day as end, and end - is earlier in the day than start, so we do not count the - last market day as completed. The correct output is 3 days. - """ - # Calculate the number of opens between the events. - opens = (market_opens(start, end)) - days_between = len(opens) - if days_between == 0: - return days_between - - # If end falls on the same day as an open, subtract 1 from the - # total if end is earlier in its respective day than start. - last_open = opens[-1] - if last_open.date() == end.date() and earlier_in_day(end, start): - days_between -=1 - - return days_between - -def earlier_in_day(d1, d2): - """ - Return true if d1 falls earlier in its own day than d2. - """ - return d1.time() < d2.time() - -WEEKDAYS = [rrule.MO, rrule.TU, rrule.WE, rrule.TH, rrule.FR] - -# Recurrence rule that generates all market opens since Jan 1, 1970. -# This does not exclude holidays. -market_opens_with_holidays = rrule.rrule( - rrule.DAILY, - byweekday=WEEKDAYS, - byhour = 14, - byminute = 30, +weekends = rrule.rrule( + rrule.YEARLY, + byweekday=(rrule.SA, rrule.SU), cache = True, - dtstart=datetime(2000, 1, 1, tzinfo = pytz.utc), - until=datetime(2014 , 1, 1, tzinfo = pytz.utc) + dtstart = start, + until = end ) +non_trading_rules.append(weekends) -# Recurrence rule that generates all market closes since Jan 1, 1970. -# This does not exclude holidays. -market_closes_with_holidays = rrule.rrule( - rrule.DAILY, - byweekday=WEEKDAYS, - byhour = 21, - byminute = 0, - cache = True, - dtstart=datetime(2001, 1, 1, tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) -) - -# Recurrence rules for excluding the market open/close on new years. -new_years_opens = rrule.rrule( +new_years = rrule.rrule( rrule.MONTHLY, byyearday = 1, - byhour = 14, - byminute = 30, cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) -) -new_years_closes = rrule.rrule( - rrule.MONTHLY, - byyearday = 1, - byhour = 21, - byminute = 0, - cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) + dtstart = start, + until = end ) +non_trading_rules.append(new_years) -# Recurrence rules for excluding MLK day. It is always the third -# monday in January. -mlk_opens = rrule.rrule( - rrule.MONTHLY, - bymonth = 1, - byweekday = (rrule.MO(3)), - byhour = 14, - byminute = 30, - cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) -) -mlk_closes = rrule.rrule( +mlk_day = rrule.rrule( rrule.MONTHLY, bymonth = 1, byweekday = (rrule.MO(+3)), - byhour = 21, - byminute = 0, cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) -) + dtstart = start, + until = end +) +non_trading_rules.append(mlk_day) -# Recurrence rules for generating the market open/close for -# presidents' day. Presidents' day always occurs on the third monday -# of February. -presidents_day_opens = rrule.rrule( +presidents_day = rrule.rrule( rrule.MONTHLY, bymonth = 2, byweekday = (rrule.MO(3)), - byhour = 14, - byminute = 30, cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) -) -presidents_day_closes = rrule.rrule( - rrule.MONTHLY, - bymonth = 2, - byweekday = (rrule.MO(3)), - byhour = 21, - byminute = 0, - cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) + dtstart = start, + until = end ) +non_trading_rules.append(presidents_day) -# Recurrence rules for generating the market open/close for good -# friday. Good friday always falls 2 days before easter, which -# thankfully is a built-in refernce in this module. -good_friday_opens = rrule.rrule( +good_friday = rrule.rrule( rrule.DAILY, byeaster = -2, - byhour = 14, - byminute = 30, cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) + dtstart = start, + until = end ) -good_friday_closes = rrule.rrule( - rrule.DAILY, - byeaster = -2, - byhour = 21, - byminute = 0, - cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) -) +non_trading_rules.append(good_friday) -# Recurrence rules for generating the market open/close for memorial -# day. Memorial day always occurs on the last monday of May. -memorial_day_opens = rrule.rrule( +memorial_day = rrule.rrule( rrule.MONTHLY, bymonth = 5, byweekday = (rrule.MO(-1)), - byhour = 14, - byminute = 30, cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) -) -memorial_day_closes = rrule.rrule( - rrule.MONTHLY, - bymonth = 5, - byweekday = (rrule.MO(-1)), - byhour = 21, - byminute = 0, - cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) + dtstart = start, + until = end ) +non_trading_rules.append(memorial_day) -# Recurrence rules for generating the market open/close for July 4th. -july_4th_opens = rrule.rrule( +july_4th = rrule.rrule( rrule.MONTHLY, - bymonth = 6, + bymonth = 7, bymonthday = 4, - byhour = 14, - byminute = 30, cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) -) -july_4th_closes = rrule.rrule( - rrule.MONTHLY, - bymonth = 6, - bymonthday = 4, - byhour = 21, - byminute = 0, - cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) + dtstart = start, + until = end ) +non_trading_rules.append(july_4th) -# Recurrence rule for generating the market open/close for labor day. -# Labor day is always the first monday of September. -labor_day_opens = rrule.rrule( +labor_day = rrule.rrule( rrule.MONTHLY, bymonth = 9, byweekday = (rrule.MO(1)), - byhour = 14, - byminute = 30, cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) -) -labor_day_closes = rrule.rrule( - rrule.MONTHLY, - bymonth = 9, - byweekday = (rrule.MO(1)), - byhour = 21, - byminute = 0, - cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) + dtstart = start, + until = end ) +non_trading_rules.append(labor_day) -# Recurrence rule for generating the market open/close for -# thanksgiving. Thanksgiving always falls on the fourth thursday in -# November. (Who decides how these holidays work!?!) -thanksgiving_opens = rrule.rrule( +thanksgiving = rrule.rrule( rrule.MONTHLY, bymonth = 11, byweekday = (rrule.TH(-1)), - byhour = 14, - byminute = 30, cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) -) -thanksgiving_closes = rrule.rrule( - rrule.MONTHLY, - bymonth = 11, - byweekday = (rrule.TH(-1)), - byhour = 21, - byminute = 0, - cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) + dtstart = start, + until = end ) +non_trading_rules.append(thanksgiving) -# Recurrence relation for generating the market open/close for -# christmas. Christmas always occurs on december 25th. - -christmas_opens = rrule.rrule( +christmas = rrule.rrule( rrule.MONTHLY, bymonth = 12, - bymonthday = 25, - byhour = 14, - byminute = 30, + bymonthday = 25, cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) -) -christmas_closes = rrule.rrule( - rrule.MONTHLY, - bymonth = 12, - bymonthday = 25, - byhour = 21, - byminute = 0, - cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) + dtstart = start, + until = end ) +non_trading_rules.append(christmas) -# All NYSE observed holidays. -holiday_opens = [ - new_years_opens, - mlk_opens, - presidents_day_opens, - good_friday_opens, - memorial_day_opens, - july_4th_opens, - labor_day_opens, - thanksgiving_opens, - christmas_opens -] -holiday_closes = [ - new_years_closes, - mlk_closes, - presidents_day_closes, - good_friday_closes, - memorial_day_closes, - july_4th_closes, - labor_day_closes, - thanksgiving_closes, - christmas_closes -] +non_trading_ruleset = rrule.rruleset() -# Valid market opens are given by all market opens minus holidays. -opens = rrule.rruleset(cache=True) -opens.rrule(market_opens_with_holidays) -for holiday_rule in holiday_opens: - opens.exrule(holiday_rule) - -closes = rrule.rruleset(cache=True) -closes.rrule(market_closes_with_holidays) -for holiday_rule in holiday_closes: - closes.exrule(holiday_rule) - -# This runs the calendar to load all data into a cache. -open_count = opens.count() -close_count = closes.count() +for rule in non_trading_rules: + non_trading_ruleset.rrule(rule) +non_trading_days = non_trading_ruleset.between(start, end, inc=True) From 55ffa2b3919c181bd10f9c0a4132e57dc8d20a5e Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Thu, 23 Aug 2012 10:47:35 -0400 Subject: [PATCH 6/8] fix bug where trading environment enters infinite loop on invalid dates --- zipline/finance/trading.py | 10 +++++++++- zipline/gens/transform.py | 3 --- zipline/lines.py | 5 ++--- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index 4b971394..a53b5f87 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -183,13 +183,21 @@ class TradingEnvironment(object): self.period_trading_days = None self.max_drawdown = max_drawdown + assert self.period_start <= self.period_end, \ + "Period start falls after period end." + for bm in benchmark_returns: self.trading_days.append(bm.date) self.trading_day_map[bm.date] = bm + assert self.period_start <= self.trading_days[-1], \ + "Period start falls after the last known trading day." + assert self.period_end >= self.trading_days[0], \ + "Period end falls before the first known trading day." + self.first_open = self.calculate_first_open() self.last_close = self.calculate_last_close() - + self.prior_day_open = self.calculate_prior_day_open() def calculate_first_open(self): diff --git a/zipline/gens/transform.py b/zipline/gens/transform.py index 33827448..c19501ee 100644 --- a/zipline/gens/transform.py +++ b/zipline/gens/transform.py @@ -161,9 +161,6 @@ class EventWindow: "Market-aware mode only works with full-day windows." self.all_holidays = deque(non_trading_days) self.cur_holidays = deque() - # Keeping a copy of days as a timedelta makes it easier - # to track holidays. - self.delta = timedelta(days=self.days) # Non-market-aware mode requires a timedelta. else: diff --git a/zipline/lines.py b/zipline/lines.py index 5bf786f3..419c5e93 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -181,7 +181,7 @@ class SimulatedTrading(object): else: log.warning("Sending SIGINT") os.kill(ppid, SIGINT) - + def handle_exception(self, exc): if isinstance(exc, CancelSignal): # signal from monitor of an orderly shutdown, @@ -208,9 +208,8 @@ class SimulatedTrading(object): exc_type.__name__, exc_value.message ) - + self.results_socket.send(msg) - except: log.exception("Exception while reporting simulation exception.") From 567659d058003f2e50b158169970aecd66f52830 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Thu, 23 Aug 2012 11:48:52 -0400 Subject: [PATCH 7/8] rename capitalize heartbeat and timeout --- zipline/utils/timeout.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/zipline/utils/timeout.py b/zipline/utils/timeout.py index 4d61a030..5dab8340 100644 --- a/zipline/utils/timeout.py +++ b/zipline/utils/timeout.py @@ -6,7 +6,7 @@ from pprint import pprint as pp from numbers import Number from logbook import Logger -class Timeout(Exception): +class TimeoutException(Exception): def __init__(self, frame, message=''): self.frame = frame @@ -14,7 +14,7 @@ class Timeout(Exception): # TODO: fix code replication here. -class timeout(object): +class Timeout(object): """ Utility to make a function raise TimeoutException if it spends more than a specified number of seconds executing. Can be used @@ -47,7 +47,7 @@ class timeout(object): # call to fn takes too long. finally: signal.setitimer(signal.ITIMER_REAL, 0, 0) - signal.signal(signal.SIGALRM, self.prior_handler) + signal.signal(signal.SIGALRM, signal.SIG_DFL) # Return the value of fn if it finished before the alarm. This # won't execute if the Timeout was raised. @@ -65,7 +65,7 @@ class timeout(object): signal.signal(signal.SIGALRM, self.handler) signal.setitimer(signal.ITIMER_REAL, 0, 0) -class heartbeat(object): +class Heartbeat(object): """ Utility to perform pseudo-heartbeat checks on a single-threaded function. Calls frame_handler on the current stack frame of the From 22523b5c12ad61a710e074b1698aaab0b4ee4c10 Mon Sep 17 00:00:00 2001 From: scottsanderson Date: Thu, 23 Aug 2012 13:09:31 -0400 Subject: [PATCH 8/8] clean up cruft and fix bugs from timeout rename --- tests/test_exception_handling.py | 4 ++-- zipline/finance/performance.py | 18 ------------------ zipline/gens/tradesimulation.py | 8 ++++---- zipline/utils/timeout.py | 6 +++--- 4 files changed, 9 insertions(+), 27 deletions(-) diff --git a/tests/test_exception_handling.py b/tests/test_exception_handling.py index f16111ee..a9d6063e 100644 --- a/tests/test_exception_handling.py +++ b/tests/test_exception_handling.py @@ -160,7 +160,7 @@ class ExceptionTestCase(TestCase): output, _ = drain_zipline(self, zipline) self.assertEqual(output[-1]['prefix'], 'EXCEPTION') payload = output[-1]['payload'] - self.assertEqual(payload['name'],'Timeout') + self.assertEqual(payload['name'],'TimeoutException') self.assertEqual(payload['message'], 'Call to initialize timed out') def test_heartbeat(self): @@ -186,6 +186,6 @@ class ExceptionTestCase(TestCase): # Assert that the last message is a timeout exception. self.assertEqual(output[-1]['prefix'], 'EXCEPTION') payload = output[-1]['payload'] - self.assertEqual(payload['name'],'Timeout') + self.assertEqual(payload['name'],'TimeoutException') self.assertEqual(payload['message'], 'Too much time spent in handle_data call') diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index de7809d2..487c25de 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -167,9 +167,6 @@ class PerformanceTracker(object): self.last_dict = None self.exceeded_max_loss = False - self.results_socket = None - self.results_addr = None - # this performance period will span the entire simulation. self.cumulative_performance = PerformancePeriod( # initial positions are empty @@ -229,21 +226,6 @@ class PerformanceTracker(object): def get_portfolio(self): return self.cumulative_performance.as_portfolio() - def open(self, context): - if self.results_addr: - sock = context.socket(zmq.PUSH) - sock.connect(self.results_addr) - self.results_socket = sock - else: - log.warn("Not streaming results because no results socket given") - - def publish_to(self, results_addr): - """ - Publish the performance results asynchronously to a - socket. - """ - self.results_socket = results_addr - def to_dict(self): """ Creates a dictionary representing the state of this tracker. diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index 83f077fe..98aa727f 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -6,7 +6,7 @@ from numbers import Integral from itertools import groupby from zipline import ndict -from zipline.utils.timeout import timeout, heartbeat, Timeout +from zipline.utils.timeout import Heartbeat, Timeout from zipline.gens.transform import StatefulTransform from zipline.finance.trading import TransactionSimulator @@ -85,7 +85,7 @@ class TradeSimulationClient(object): """ # Simulate filling any open orders made by the previous run of - # the user's algorithm. Sets the TRANSACTION field to true on any + # the user's algorithm. Fills the Transaction field on any # event that results in a filled order. with_filled_orders = self.ordering_client.transform(stream_in) @@ -141,7 +141,7 @@ class AlgorithmSimulator(object): # Context manager that calls log_heartbeats every HEARTBEAT_INTERVAL # seconds, raising an exception after MAX_HEARTBEATS - self.heartbeat_monitor = heartbeat( + self.heartbeat_monitor = Heartbeat( HEARTBEAT_INTERVAL, MAX_HEARTBEAT_INTERVALS, frame_handler=log_heartbeats, @@ -216,7 +216,7 @@ class AlgorithmSimulator(object): with self.processor.threadbound(), self.stdout_capture(Logger('Print'),''): # Call user's initialize method with a timeout. - with timeout(INIT_TIMEOUT, message="Call to initialize timed out"): + with Timeout(INIT_TIMEOUT, message="Call to initialize timed out"): self.algo.initialize() # Group together events with the same dt field. This depends on the diff --git a/zipline/utils/timeout.py b/zipline/utils/timeout.py index 5dab8340..34d4a4a5 100644 --- a/zipline/utils/timeout.py +++ b/zipline/utils/timeout.py @@ -21,7 +21,7 @@ class Timeout(object): as a decorator to apply a static timeout to a function, or as a context manager to dynamically add a timeout to a code block. """ - + def __init__(self, seconds, message=''): self.seconds = seconds self.message = message @@ -29,7 +29,7 @@ class Timeout(object): assert seconds > 0, "Timeout must be greater than 0" def handler(self, signum, frame): - raise Timeout(frame, self.message) + raise TimeoutException(frame, self.message) def __call__(self, fn): @@ -91,7 +91,7 @@ class Heartbeat(object): self.frame_handler(self.count, frame) if self.count >= self.max_intervals: - raise Timeout(frame, self.timeout_message) + raise TimeoutException(frame, self.timeout_message) def __call__(self, fn):