diff --git a/tests/test_exception_handling.py b/tests/test_exception_handling.py index f16111ee..a9d6063e 100644 --- a/tests/test_exception_handling.py +++ b/tests/test_exception_handling.py @@ -160,7 +160,7 @@ class ExceptionTestCase(TestCase): output, _ = drain_zipline(self, zipline) self.assertEqual(output[-1]['prefix'], 'EXCEPTION') payload = output[-1]['payload'] - self.assertEqual(payload['name'],'Timeout') + self.assertEqual(payload['name'],'TimeoutException') self.assertEqual(payload['message'], 'Call to initialize timed out') def test_heartbeat(self): @@ -186,6 +186,6 @@ class ExceptionTestCase(TestCase): # Assert that the last message is a timeout exception. self.assertEqual(output[-1]['prefix'], 'EXCEPTION') payload = output[-1]['payload'] - self.assertEqual(payload['name'],'Timeout') + self.assertEqual(payload['name'],'TimeoutException') self.assertEqual(payload['message'], 'Too much time spent in handle_data call') diff --git a/tests/test_transforms.py b/tests/test_transforms.py index e515d725..8e203664 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -46,24 +46,20 @@ class EventWindowTestCase(TestCase): def setUp(self): setup_logger(self) - # Constants calling before open, during the day, and after - # close on a valid trading day. - self.pre_open = datetime(2012, 8, 7, 13, tzinfo = pytz.utc) - self.mid_day = datetime(2012, 8, 7, 15, tzinfo = pytz.utc) - self.post_close = datetime(2012, 8, 7, 22, tzinfo = pytz.utc) + self.monday = datetime(2012, 7, 9, 16, tzinfo=pytz.utc) + self.eleven_normal_days = [self.monday + i*timedelta(days=1) + for i in xrange(11)] - # Constants calling before open, during the day, and after - # close on a saturday. - self.pre_open_saturday = datetime(2012, 8, 11, 13, tzinfo = pytz.utc) - self.mid_day_saturday = datetime(2012, 8, 11, 15, tzinfo = pytz.utc) - self.post_close_saturday = datetime(2012, 8, 11, 22, tzinfo = pytz.utc) + # Modify the end of the period slightly to exercise the + # incomplete day logic. + self.eleven_normal_days[-1] -= timedelta(minutes = 1) + self.eleven_normal_days.append(self.monday+timedelta(days=11,seconds=1)) + + # Second set of dates to test holiday handling. + self.jul4_monday = datetime(2012, 7, 2, 16, tzinfo=pytz.utc) + self.week_of_jul4 = [self.jul4_monday + i*timedelta(days=1) + for i in xrange(5)] - # Constants calling before open, during the day, and after - # close on a holiday. - self.pre_open_holiday = datetime(2012, 12, 25, 13, tzinfo = pytz.utc) - self.mid_day_holiday = datetime(2012, 12, 25, tzinfo = pytz.utc) - self.post_close_holiday = datetime(2012, 12, 25, 22, tzinfo = pytz.utc) - def test_event_window_with_timedelta(self): # Keep all events within a 5 minute window. @@ -96,58 +92,46 @@ class EventWindowTestCase(TestCase): for dropped in window.removed: assert message.dt - dropped.dt >= timedelta(minutes = 5) - def test_market_aware_window(self): + def test_market_aware_window_normal_week(self): window = NoopEventWindow( market_aware = True, delta = None, - days = 1 + days = 3 ) - - dates = ([self.pre_open]*3) - dates += ([self.mid_day]*3) - dates += ([self.post_close]*3) - dates += [self.pre_open + timedelta(days = 1, seconds = 1)] - events = [to_dt(date) for date in dates] - + events = [to_dt(date) for date in self.eleven_normal_days] + lengths = [] # Run the events. for event in events: window.update(event) + # Record the length of the window after each event. + lengths.append(len(window.ticks)) - # We should have removed the pre_open events on the first day. - # The rest should be intact. - + # The window stretches out during the weekend because we wait + # to drop events until the weekend ends. The last window is + # briefly longer because it doesn't complete a full day. The + # window then shrinks once the day completes + assert lengths == [1, 2, 3, 3, 3, 4, 5, 5, 5, 3, 4, 3] assert window.added == events - assert window.removed == events[0:3] - assert list(window.ticks) == events[3:] + assert window.removed == events[:-3] - def test_market_aware_window_weekend(self): + def test_market_aware_window_holiday(self): window = NoopEventWindow( market_aware = True, delta = None, days = 2 ) - dates = [self.pre_open_saturday - timedelta(days = 1, seconds=1)] - dates += [self.mid_day_saturday - timedelta(days = 1, seconds=1)] - dates += [self.post_close_saturday - timedelta(days = 1, seconds=1)] - dates += [self.mid_day_saturday + timedelta(days = 1)] - - events = [to_dt(date) for date in dates] + events = [to_dt(date) for date in self.week_of_jul4] + lengths = [] # Run the events. for event in events: window.update(event) + # Record the length of the window after each event. + lengths.append(len(window.ticks)) - # We shouldn't remove any events. + assert lengths == [1, 2, 3, 3, 2] assert window.added == events - assert window.removed == [] - assert list(window.ticks) == events - - extra = to_dt(self.mid_day_saturday + timedelta(days = 2)) - window.update(extra) - - # We should remove only the first event. - assert window.removed == [events[0]] - assert list(window.ticks) == events[1:] + [extra] + assert window.removed == events[:-2] def tearDown(self): setup_logger(self) diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index 278d8189..487c25de 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -133,7 +133,7 @@ import zipline.finance.risk as risk log = logbook.Logger('Performance') class PerformanceTracker(object): - UPDATER = True + """ Tracks the performance of the zipline as it is running in the simulator, relays this out to the Deluge broker and then @@ -166,12 +166,6 @@ class PerformanceTracker(object): self.event_count = 0 self.last_dict = None self.exceeded_max_loss = False - self.no_more_updates = False - - self.compute_risk_metrics = True - - self.results_socket = None - self.results_addr = None # this performance period will span the entire simulation. self.cumulative_performance = PerformancePeriod( @@ -204,49 +198,34 @@ class PerformanceTracker(object): for sid in sid_list: self.cumulative_performance.positions[sid] = Position(sid) self.todays_performance.positions[sid] = Position(sid) - - def update(self, event): - if self.no_more_updates: - return zp.ndict({'dt':0}) - elif event.dt == "DONE": - event.perf_message = self.handle_simulation_end() - del event['TRANSACTION'] - self.no_more_updates = True - return event - elif self.exceeded_max_loss: - # in case of max_loss, signal to downstream - # generators that we are done. - event.dt = "DONE" - event.perf_message = self.handle_simulation_end() - del event['TRANSACTION'] - self.no_more_updates = True - return event - else: - event.perf_message = self.process_event(event) - event.portfolio = self.get_portfolio() - del event['TRANSACTION'] - return event + + def transform(self, stream_in): + """ + Main generator work loop. + """ + for event in stream_in: + if event.dt == "DONE": + event.perf_message = self.handle_simulation_end() + del event['TRANSACTION'] + yield event + elif self.exceeded_max_loss: + # in case of max_loss, signal to downstream + # generators that we are done. + event.dt = "DONE" + event.perf_message = self.handle_simulation_end() + del event['TRANSACTION'] + yield event + # Cut off the rest of the stream. + raise StopIteration() + else: + event.perf_message = self.process_event(event) + event.portfolio = self.get_portfolio() + del event['TRANSACTION'] + yield event def get_portfolio(self): return self.cumulative_performance.as_portfolio() - def open(self, context): - if self.results_addr: - sock = context.socket(zmq.PUSH) - sock.connect(self.results_addr) - self.results_socket = sock - else: - log.warn("Not streaming results because no results socket given") - - def publish_to(self, results_addr): - """ - Publish the performance results asynchronously to a - socket. - """ - #assert isinstance(results_addr, basestring), type(results_addr) - #self.results_addr = results_addr - self.results_socket = results_addr - def to_dict(self): """ Creates a dictionary representing the state of this tracker. @@ -269,7 +248,7 @@ class PerformanceTracker(object): message = None if self.exceeded_max_loss: - return + return message assert isinstance(event, zp.ndict) self.event_count += 1 @@ -290,7 +269,6 @@ class PerformanceTracker(object): self.cumulative_performance.calculate_performance() self.todays_performance.calculate_performance() - return message def handle_market_close(self): @@ -304,13 +282,12 @@ class PerformanceTracker(object): self.returns.append(todays_return_obj) #calculate risk metrics for cumulative performance - if self.compute_risk_metrics: - self.cumulative_risk_metrics = risk.RiskMetrics( - start_date=self.period_start, - end_date=self.market_close.replace(hour=0, minute=0, second=0), - returns=self.returns, - trading_environment=self.trading_environment - ) + self.cumulative_risk_metrics = risk.RiskMetrics( + start_date=self.period_start, + end_date=self.market_close.replace(hour=0, minute=0, second=0), + returns=self.returns, + trading_environment=self.trading_environment + ) # increment the day counter before we move markers forward. self.day_count += 1.0 diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index 2fa99d9b..a53b5f87 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -9,7 +9,6 @@ from zipline.protocol import SIMULATION_STYLE log = logbook.Logger('Transaction Simulator') class TransactionSimulator(object): - UPDATER = True def __init__(self, sid_filter, style=SIMULATION_STYLE.PARTIAL_VOLUME): self.open_orders = {} @@ -35,6 +34,13 @@ class TransactionSimulator(object): order.filled = 0 self.open_orders[order.sid].append(order) + def transform(self, stream_in): + """ + Main generator work loop. + """ + for event in stream_in: + yield self.update(event) + def update(self, event): event.TRANSACTION = None # We only fill transactions on trade events. @@ -177,13 +183,21 @@ class TradingEnvironment(object): self.period_trading_days = None self.max_drawdown = max_drawdown + assert self.period_start <= self.period_end, \ + "Period start falls after period end." + for bm in benchmark_returns: self.trading_days.append(bm.date) self.trading_day_map[bm.date] = bm + assert self.period_start <= self.trading_days[-1], \ + "Period start falls after the last known trading day." + assert self.period_end >= self.trading_days[0], \ + "Period end falls before the first known trading day." + self.first_open = self.calculate_first_open() self.last_close = self.calculate_last_close() - + self.prior_day_open = self.calculate_prior_day_open() def calculate_first_open(self): diff --git a/zipline/gens/composites.py b/zipline/gens/composites.py index 0a99fc0f..27be9148 100644 --- a/zipline/gens/composites.py +++ b/zipline/gens/composites.py @@ -43,7 +43,9 @@ def merged_transforms(sorted_stream, *transforms): """ for transform in transforms: assert isinstance(transform, StatefulTransform) - + transform.merged = True + transform.sequential = False + # Generate expected hashes for each transform namestrings = [tnfm.get_hash() for tnfm in transforms] @@ -75,11 +77,11 @@ def sequential_transforms(stream_in, *transforms): """ assert isinstance(transforms, (list, tuple)) - for tnfm in transforms: - tnfm.forward_all = False - tnfm.update_in_place = False - tnfm.append_value = True + for tnfm in transforms: + tnfm.sequential = True + tnfm.merged = False + # Recursively apply all transforms to the stream. stream_out = reduce(lambda stream, tnfm: tnfm.transform(stream), transforms, diff --git a/zipline/gens/tradesimulation.py b/zipline/gens/tradesimulation.py index 8c650dbc..98aa727f 100644 --- a/zipline/gens/tradesimulation.py +++ b/zipline/gens/tradesimulation.py @@ -6,7 +6,7 @@ from numbers import Integral from itertools import groupby from zipline import ndict -from zipline.utils.timeout import timeout, heartbeat, Timeout +from zipline.utils.timeout import Heartbeat, Timeout from zipline.gens.transform import StatefulTransform from zipline.finance.trading import TransactionSimulator @@ -61,10 +61,16 @@ class TradeSimulationClient(object): self.sids = algo.get_sid_filter() self.environment = environment self.style = sim_style - self.algo_sim = None - self.warmup_start = self.environment.prior_day_open + self.ordering_client = TransactionSimulator(self.sids, style=self.style) + self.perf_tracker = PerformanceTracker(self.environment, self.sids) + self.algo_start = self.environment.first_open + self.algo_sim = AlgorithmSimulator( + self.ordering_client, + self.algo, + self.algo_start + ) def get_hash(self): """ @@ -79,56 +85,36 @@ class TradeSimulationClient(object): """ # Simulate filling any open orders made by the previous run of - # the user's algorithm. Sets the txn field to true on any + # the user's algorithm. Fills the Transaction field on any # event that results in a filled order. - ordering_client = StatefulTransform( - TransactionSimulator, - self.sids, - style = self.style - ) - with_filled_orders = ordering_client.transform(stream_in) + with_filled_orders = self.ordering_client.transform(stream_in) # Pipe the events with transactions to perf. This will remove - # the txn field added by TransactionSimulator and replace it - # with a portfolio object to be passed to the user's + # the TRANSACTION field added by TransactionSimulator and replace it + # with a portfolio field to be passed to the user's # algorithm. Also adds a perf_message field which is usually # none, but contains an update message once per day. - perf_tracker = StatefulTransform( - PerformanceTracker, - self.environment, - self.sids - ) - with_portfolio = perf_tracker.transform(with_filled_orders) + with_portfolio = self.perf_tracker.transform(with_filled_orders) - # Pass the messages from perf along with the trading client's - # state into the algorithm for simulation. We provide a - # pointer to the ordering client's internal state so that the - # algorithm can place new orders into the client's order book. - self.algo_sim = AlgorithmSimulator( - with_portfolio, - ordering_client.state, - self.algo, - self.algo_start - ) + # Pass the messages from perf to the user's algorithm for simulation. + # Events are batched by dt so that the algo handles all events for a + # given timestamp at one one go. + performance_messages = self.algo_sim.transform(with_portfolio) # The algorithm will yield a daily_results message (as # calculated by the performance tracker) at the end of each # day. It will also yield a risk report at the end of the # simulation. - - for message in self.algo_sim: + for message in performance_messages: yield message class AlgorithmSimulator(object): - + def __init__(self, - stream_in, order_book, algo, algo_start): - - self.stream_in = stream_in - + # ========== # Algo Setup # ========== @@ -155,7 +141,7 @@ class AlgorithmSimulator(object): # Context manager that calls log_heartbeats every HEARTBEAT_INTERVAL # seconds, raising an exception after MAX_HEARTBEATS - self.heartbeat_monitor = heartbeat( + self.heartbeat_monitor = Heartbeat( HEARTBEAT_INTERVAL, MAX_HEARTBEAT_INTERVALS, frame_handler=log_heartbeats, @@ -168,7 +154,6 @@ class AlgorithmSimulator(object): # The algorithm's universe as of our most recent event. self.universe = ndict() - for sid in self.sids: self.universe[sid] = ndict() self.universe.portfolio = None @@ -188,22 +173,10 @@ class AlgorithmSimulator(object): record.extra['algo_dt'] = self.snapshot_dt self.processor = Processor(inject_algo_dt) - # This is a class, which is instantiated later - # in run_algorithm. The class provides a generator. + # Single_use generator that uses the @contextmanager decorator + # to monkey patch sys.stdout with a logbook interface. self.stdout_capture = stdout_only_pipe - self.__generator = None - - def __iter__(self): - return self - - def next(self): - if self.__generator: - return self.__generator.next() - else: - self.__generator = self._gen() - return self.__generator.next() - def order(self, sid, amount): """ Closure to pass into the user's algo to allow placing orders @@ -232,10 +205,10 @@ class AlgorithmSimulator(object): # simulator so that it can fill the placed order when it # receives its next message. self.order_book.place_order(order) - - def _gen(self): + + def transform(self, stream_in): """ - Internal generator work loop. + Main generator work loop. """ # Capture any output of this generator to stdout and pipe it # to a logbook interface. Also inject the current algo @@ -243,12 +216,12 @@ class AlgorithmSimulator(object): with self.processor.threadbound(), self.stdout_capture(Logger('Print'),''): # Call user's initialize method with a timeout. - with timeout(INIT_TIMEOUT, message="Call to initialize timed out"): + with Timeout(INIT_TIMEOUT, message="Call to initialize timed out"): self.algo.initialize() # Group together events with the same dt field. This depends on the # events already being sorted. - for date, snapshot in groupby(self.stream_in, lambda e: e.dt): + for date, snapshot in groupby(stream_in, lambda e: e.dt): # Set the simulation date to be the first event we see. # This should only occur once, at the start of the test. @@ -259,7 +232,7 @@ class AlgorithmSimulator(object): if date == 'DONE': for event in snapshot: yield event.perf_message - break + raise StopIteration() # We're still in the warmup period. Use the event to # update our universe, but don't yield any perf messages, @@ -293,7 +266,7 @@ class AlgorithmSimulator(object): del event['perf_message'] self.update_universe(event) - + # Send the current state of the universe to the user's algo. self.simulate_snapshot(date) @@ -316,7 +289,7 @@ class AlgorithmSimulator(object): # Needs to be set so that we inject the proper date into algo # log/print lines. self.snapshot_dt = date - + start_tic = datetime.now() with self.heartbeat_monitor: self.algo.handle_data(self.universe) diff --git a/zipline/gens/transform.py b/zipline/gens/transform.py index 275a07ec..c19501ee 100644 --- a/zipline/gens/transform.py +++ b/zipline/gens/transform.py @@ -12,14 +12,14 @@ from numbers import Number from abc import ABCMeta, abstractmethod from zipline import ndict -from zipline.utils.tradingcalendar import trading_days_between +from zipline.utils.tradingcalendar import non_trading_days from zipline.gens.utils import assert_sort_unframe_protocol, \ assert_transform_protocol, hash_args log = logbook.Logger('Transform') class Passthrough(object): - FORWARDER = True + PASSTHROUGH = True """ Trivial class for forwarding events. """ @@ -29,23 +29,6 @@ class Passthrough(object): def update(self, event): pass -# Deprecated -def functional_transform(stream_in, func, *args, **kwargs): - """ - Generic transform generator that takes each message from an in-stream - and yields the output of a function on that message. Not sure how - useful this will be in reality, but good for testing. - """ - assert isinstance(func, types.FunctionType), \ - "Functional" - namestring = func.__name__ + hash_args(*args, **kwargs) - - for message in stream_in: - assert_sort_unframe_protocol(message) - out_value = func(message, *args, **kwargs) - assert_transform_protocol(out_value) - yield(namestring, out_value) - class StatefulTransform(object): """ Generic transform generator that takes each message from an @@ -61,15 +44,16 @@ class StatefulTransform(object): assert tnfm_class.__dict__.has_key('update'), \ "Stateful transform requires the class to have an update method" - self.forward_all = tnfm_class.__dict__.get('FORWARDER', False) - self.update_in_place = tnfm_class.__dict__.get('UPDATER', False) - self.append_value = tnfm_class.__dict__.get('APPENDER', False) - - # You only one special behavior mode can be set. - assert sum(map(int, [self.forward_all, - self.update_in_place, - self.append_value])) <= 1 - + # Flag set inside the Passthrough transform class to signify special + # behavior if we are being fed to merged_transforms. + self.passthrough = tnfm_class.__dict__.get('PASSTHROUGH', False) + + # Flags specifying how to append the calculated value. + # Merged is the default for ease of testing, but we use sequential + # in production. + self.sequential = False + self.merged = True + # Create an instance of our transform class. self.state = tnfm_class(*args, **kwargs) @@ -85,7 +69,8 @@ class StatefulTransform(object): def _gen(self, stream_in): # IMPORTANT: Messages may contain pointers that are shared with - # other streams, so we only manipulate copies. + # other streams. Transforms that modify their input + # messages should only manipulate copies. log.info('Running StatefulTransform [%s]' % self.get_hash()) for message in stream_in: @@ -94,61 +79,53 @@ class StatefulTransform(object): if message == None: continue - #TODO: refactor this to avoid unnecessary copying. - assert_sort_unframe_protocol(message) - #message_copy = deepcopy(message) - message_copy = message - # Same shared pointer issue here as above. - tnfm_value = self.state.update(message_copy) + + # This flag is set by by merged_transforms to ensure + # isolation of messages. + if self.merged: + message = deepcopy(message) + + tnfm_value = self.state.update(message) - # FORWARDER flag means we want to keep all original + # PASSTHROUGH flag means we want to keep all original # values, plus append tnfm_id and tnfm_value. Used for # preserving the original event fields when our output # will be fed into a merge. Currently only Passthrough # uses this flag. - if self.forward_all: - out_message = message_copy + if self.passthrough and self.merged: + out_message = message out_message.tnfm_id = self.namestring out_message.tnfm_value = tnfm_value yield out_message - # UPDATER flag should be used for transforms that - # side-effectfully modify the event they are passed. - # Updated messages are passed along exactly as they are - # returned to use by our state class. Useful for chaining - # specific transforms that won't be fed to a merge. (See - # the implementation of TradeSimulationClient for example - # usage of this flag with PerformanceTracker and - # TransactionSimulator. - elif self.update_in_place: - yield tnfm_value - - # APPENDER flag should be used to add a single new + # If the merged flag is set, we create a new message + # containing just the tnfm_id, the event's datetime, and + # the calculated tnfm_value. This is the default behavior + # for a non-passthrough transform being fed into a merge. + elif self.merged: + out_message = ndict() + out_message.tnfm_id = self.namestring + out_message.tnfm_value = tnfm_value + out_message.dt = message.dt + yield out_message + + # Sequential flag should be used to add a single new # key-value pair to the event. The new key is this - # transform's namestring, and it's value is the value + # transform's namestring, and its value is the value # returned by state.update(event). This is almost # identical to the behavior of FORWARDER, except we # compress the two calculated values (tnfm_id, and # tnfm_value) into a single field. This mode is used by - # the sequential_transforms composite. - elif self.append_value: - out_message = message_copy + # the sequential_transforms composite and is the default + # if no behavior is specified by the internal state class. + elif self.sequential: + out_message = message out_message[self.namestring] = tnfm_value yield out_message - - # If no flags are set, we create a new message containing - # just the tnfm_id, the event's datetime, and the - # calculated tnfm_value. This is the default behavior for - # a transform being fed into a merge. - else: - out_message = ndict() - out_message.tnfm_id = self.namestring - out_message.tnfm_value = tnfm_value - out_message.dt = message_copy.dt - yield out_message - + log.info('Finished StatefulTransform [%s]' % self.get_hash()) + class EventWindow: """ Abstract base class for transform classes that calculate iterative @@ -180,8 +157,10 @@ class EventWindow: # Market-aware mode only works with full-day windows. if self.market_aware: - assert self.days and not self.delta,\ + assert self.days and self.delta == None,\ "Market-aware mode only works with full-day windows." + self.all_holidays = deque(non_trading_days) + self.cur_holidays = deque() # Non-market-aware mode requires a timedelta. else: @@ -215,6 +194,9 @@ class EventWindow: # Subclasses should override handle_add to define behavior for # adding new ticks. self.handle_add(event) + + if self.market_aware: + self.add_new_holidays(event.dt) # Clear out any expired events. drop_condition changes depending # on whether or not we are running in market_aware mode. @@ -223,16 +205,40 @@ class EventWindow: # | | # V V while self.drop_condition(self.ticks[0].dt, self.ticks[-1].dt): - + # popleft removes and returns the oldest tick in self.ticks popped = self.ticks.popleft() # Subclasses should override handle_remove to define # behavior for removing ticks. self.handle_remove(popped) + + def add_new_holidays(self, newest): + # Add to our tracked window any untracked holidays that are + # older than our newest event. (newest should always be + # self.ticks[-1]) + while len(self.all_holidays) > 0 and self.all_holidays[0] <= newest: + self.cur_holidays.append(self.all_holidays.popleft()) + + def drop_old_holidays(self, oldest): + # Drop from our tracked window any holidays that are older + # than our oldest tracked event. (oldest should always + # be self.ticks[0]) + while len(self.cur_holidays) > 0 and self.cur_holidays[0] < oldest: + self.cur_holidays.popleft() def out_of_market_window(self, oldest, newest): - return trading_days_between(oldest, newest) >= self.days + self.drop_old_holidays(oldest) + calendar_dates_between = (newest.date() - oldest.date()).days + holidays_between = len(self.cur_holidays) + trading_days_between = calendar_dates_between - holidays_between + + # "Put back" a day if oldest is earlier in its day than newest, + # reflecting the fact that we haven't yet completed the last + # day in the window. + if oldest.time() > newest.time(): + trading_days_between -= 1 + return trading_days_between >= self.days def out_of_delta(self, oldest, newest): return (newest - oldest) >= self.delta diff --git a/zipline/lines.py b/zipline/lines.py index e861706c..e4e4af8c 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -121,7 +121,6 @@ class SimulatedTrading(object): # exit status flag self.success = False - def simulate(self, blocking=True, send_sighup=False): # for non-blocking, @@ -180,7 +179,7 @@ class SimulatedTrading(object): else: log.warning("Sending SIGINT") os.kill(ppid, SIGINT) - + def handle_exception(self, exc): if isinstance(exc, CancelSignal): # signal from monitor of an orderly shutdown, @@ -207,9 +206,8 @@ class SimulatedTrading(object): exc_type.__name__, exc_value.message ) - + self.results_socket.send(msg) - except: log.exception("Exception while reporting simulation exception.") @@ -362,3 +360,26 @@ class SimulatedTrading(object): #------------------- return sim + +class SimulatedTradingLite(object): + """ + SimulatedTrading without multiprocess and without zmq. + Useful for profiling the core logic and for rapid testing + of new features. + """ + def __init__(self, + sources, + transforms, + algorithm, + environment, + style): + + self.date_sorted = date_sorted_sources(*sources) + self.transforms = transforms + # Formerly merged_transforms. + self.with_tnfms = sequential_transforms(self.date_sorted, *self.transforms) + self.trading_client = tsc(algorithm, environment, style) + self.gen = self.trading_client.simulate(self.with_tnfms) + + def get_results(self): + return self.gen diff --git a/zipline/utils/timeout.py b/zipline/utils/timeout.py index 2985222e..34d4a4a5 100644 --- a/zipline/utils/timeout.py +++ b/zipline/utils/timeout.py @@ -6,20 +6,22 @@ from pprint import pprint as pp from numbers import Number from logbook import Logger -class Timeout(Exception): +class TimeoutException(Exception): def __init__(self, frame, message=''): self.frame = frame self.message = message + +# TODO: fix code replication here. -class timeout(object): +class Timeout(object): """ Utility to make a function raise TimeoutException if it spends more than a specified number of seconds executing. Can be used as a decorator to apply a static timeout to a function, or as a context manager to dynamically add a timeout to a code block. """ - + def __init__(self, seconds, message=''): self.seconds = seconds self.message = message @@ -27,13 +29,13 @@ class timeout(object): assert seconds > 0, "Timeout must be greater than 0" def handler(self, signum, frame): - raise Timeout(frame, self.message) + raise TimeoutException(frame, self.message) def __call__(self, fn): @wraps(fn) def call_fn_with_timeout(*args, **kwargs): - # Set the alarm. + # Set the alarm, saving any handler that existed previously. signal.signal(signal.SIGALRM, self.handler) signal.setitimer(signal.ITIMER_REAL, self.seconds, 0) try: @@ -63,7 +65,7 @@ class timeout(object): signal.signal(signal.SIGALRM, self.handler) signal.setitimer(signal.ITIMER_REAL, 0, 0) -class heartbeat(object): +class Heartbeat(object): """ Utility to perform pseudo-heartbeat checks on a single-threaded function. Calls frame_handler on the current stack frame of the @@ -89,7 +91,7 @@ class heartbeat(object): self.frame_handler(self.count, frame) if self.count >= self.max_intervals: - raise Timeout(frame, self.timeout_message) + raise TimeoutException(frame, self.timeout_message) def __call__(self, fn): diff --git a/zipline/utils/tradingcalendar.py b/zipline/utils/tradingcalendar.py index f760e51e..36dd3eea 100644 --- a/zipline/utils/tradingcalendar.py +++ b/zipline/utils/tradingcalendar.py @@ -1,358 +1,114 @@ import pytz -from datetime import datetime, timedelta +from datetime import datetime, timedelta, date from dateutil import rrule from zipline.utils.date_utils import utcnow -def market_opens(start, end, inclusive=False): - """ - Returns all market opens between the start date and the end date. - Must use utc-stamped datetimes. - """ - return opens.between(start, end, inc=inclusive) +start = datetime(2002, 1,1, tzinfo=pytz.utc) +end = utcnow() -def market_closes(start, end, inclusive=False): - """ - Returns all market closes between the start date and the end date. - Must use utc-stamped datetimes. - """ - return closes.between(start, end, inc=inclusive) +non_trading_rules = [] -def trading_days_between(start, end): - """ - Calculate the number of "complete" trading days between two - events. We define this as the number of market opens that - occurred between start and end, with the caveat that we subtract 1 - from this total if end falls on the same day as the last market - open and end occurs earlier in its own day than start. This - reflects the fact that we haven't completed a full day - corresponding to the last market open. - - Examples: - - 1.) - start = Tuesday, Aug 7, 2012, 1:00 pm - end = Wednesday, Aug 8, 2012, 1:30 pm - - There is one market open between these dates, on the morning of - Wednesday the 8th. This falls on the same calendar day as end, - but end is later in the day than start, so we count this as a full - day. The correct output is 1. - - 2.) - start = Tuesday, Aug 7, 2012, 1:30 pm - end = Wednesday, Aug 8, 2012, 1:00 pm - - There is one market open between these dayes, on the morning of - Wednesday the 8th. This falls on the same calendar day as end, - and end is earlier in the day than start, so we do not count this - day as completed. The correct output is 0. - - 3.) - start = Tuesday, Aug 7, 2012, 1:00 pm - end = Saturday, Aug 11, 2012, 1:30 pm - - There are 3 market opens between these dates, occurring on - Wednesday, Thursday, and Friday. The last open is not on - the same day as end, so we simply return 3 - - 4.) - start = Tuesday, Aug 7, 2012, 1:30 pm - end = Monday, Aug, 13, 2012, 1:00 pm - - There are 4 market opens between these dates, occurring on - Wednesday, Thursday, Friday, and the following Monday. The - last open occurs on the same calendar day as end, and end - is earlier in the day than start, so we do not count the - last market day as completed. The correct output is 3 days. - """ - # Calculate the number of opens between the events. - opens = (market_opens(start, end)) - days_between = len(opens) - if days_between == 0: - return days_between - - # If end falls on the same day as an open, subtract 1 from the - # total if end is earlier in its respective day than start. - last_open = opens[-1] - if last_open.date() == end.date() and earlier_in_day(end, start): - days_between -=1 - - return days_between - -def earlier_in_day(d1, d2): - """ - Return true if d1 falls earlier in its own day than d2. - """ - return d1.time() < d2.time() - -WEEKDAYS = [rrule.MO, rrule.TU, rrule.WE, rrule.TH, rrule.FR] - -# Recurrence rule that generates all market opens since Jan 1, 1970. -# This does not exclude holidays. -market_opens_with_holidays = rrule.rrule( - rrule.DAILY, - byweekday=WEEKDAYS, - byhour = 14, - byminute = 30, +weekends = rrule.rrule( + rrule.YEARLY, + byweekday=(rrule.SA, rrule.SU), cache = True, - dtstart=datetime(2000, 1, 1, tzinfo = pytz.utc), - until=datetime(2014 , 1, 1, tzinfo = pytz.utc) + dtstart = start, + until = end ) +non_trading_rules.append(weekends) -# Recurrence rule that generates all market closes since Jan 1, 1970. -# This does not exclude holidays. -market_closes_with_holidays = rrule.rrule( - rrule.DAILY, - byweekday=WEEKDAYS, - byhour = 21, - byminute = 0, - cache = True, - dtstart=datetime(2001, 1, 1, tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) -) - -# Recurrence rules for excluding the market open/close on new years. -new_years_opens = rrule.rrule( +new_years = rrule.rrule( rrule.MONTHLY, byyearday = 1, - byhour = 14, - byminute = 30, cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) -) -new_years_closes = rrule.rrule( - rrule.MONTHLY, - byyearday = 1, - byhour = 21, - byminute = 0, - cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) + dtstart = start, + until = end ) +non_trading_rules.append(new_years) -# Recurrence rules for excluding MLK day. It is always the third -# monday in January. -mlk_opens = rrule.rrule( - rrule.MONTHLY, - bymonth = 1, - byweekday = (rrule.MO(3)), - byhour = 14, - byminute = 30, - cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) -) -mlk_closes = rrule.rrule( +mlk_day = rrule.rrule( rrule.MONTHLY, bymonth = 1, byweekday = (rrule.MO(+3)), - byhour = 21, - byminute = 0, cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) -) + dtstart = start, + until = end +) +non_trading_rules.append(mlk_day) -# Recurrence rules for generating the market open/close for -# presidents' day. Presidents' day always occurs on the third monday -# of February. -presidents_day_opens = rrule.rrule( +presidents_day = rrule.rrule( rrule.MONTHLY, bymonth = 2, byweekday = (rrule.MO(3)), - byhour = 14, - byminute = 30, cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) -) -presidents_day_closes = rrule.rrule( - rrule.MONTHLY, - bymonth = 2, - byweekday = (rrule.MO(3)), - byhour = 21, - byminute = 0, - cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) + dtstart = start, + until = end ) +non_trading_rules.append(presidents_day) -# Recurrence rules for generating the market open/close for good -# friday. Good friday always falls 2 days before easter, which -# thankfully is a built-in refernce in this module. -good_friday_opens = rrule.rrule( +good_friday = rrule.rrule( rrule.DAILY, byeaster = -2, - byhour = 14, - byminute = 30, cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) + dtstart = start, + until = end ) -good_friday_closes = rrule.rrule( - rrule.DAILY, - byeaster = -2, - byhour = 21, - byminute = 0, - cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) -) +non_trading_rules.append(good_friday) -# Recurrence rules for generating the market open/close for memorial -# day. Memorial day always occurs on the last monday of May. -memorial_day_opens = rrule.rrule( +memorial_day = rrule.rrule( rrule.MONTHLY, bymonth = 5, byweekday = (rrule.MO(-1)), - byhour = 14, - byminute = 30, cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) -) -memorial_day_closes = rrule.rrule( - rrule.MONTHLY, - bymonth = 5, - byweekday = (rrule.MO(-1)), - byhour = 21, - byminute = 0, - cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) + dtstart = start, + until = end ) +non_trading_rules.append(memorial_day) -# Recurrence rules for generating the market open/close for July 4th. -july_4th_opens = rrule.rrule( +july_4th = rrule.rrule( rrule.MONTHLY, - bymonth = 6, + bymonth = 7, bymonthday = 4, - byhour = 14, - byminute = 30, cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) -) -july_4th_closes = rrule.rrule( - rrule.MONTHLY, - bymonth = 6, - bymonthday = 4, - byhour = 21, - byminute = 0, - cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) + dtstart = start, + until = end ) +non_trading_rules.append(july_4th) -# Recurrence rule for generating the market open/close for labor day. -# Labor day is always the first monday of September. -labor_day_opens = rrule.rrule( +labor_day = rrule.rrule( rrule.MONTHLY, bymonth = 9, byweekday = (rrule.MO(1)), - byhour = 14, - byminute = 30, cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) -) -labor_day_closes = rrule.rrule( - rrule.MONTHLY, - bymonth = 9, - byweekday = (rrule.MO(1)), - byhour = 21, - byminute = 0, - cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) + dtstart = start, + until = end ) +non_trading_rules.append(labor_day) -# Recurrence rule for generating the market open/close for -# thanksgiving. Thanksgiving always falls on the fourth thursday in -# November. (Who decides how these holidays work!?!) -thanksgiving_opens = rrule.rrule( +thanksgiving = rrule.rrule( rrule.MONTHLY, bymonth = 11, byweekday = (rrule.TH(-1)), - byhour = 14, - byminute = 30, cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) -) -thanksgiving_closes = rrule.rrule( - rrule.MONTHLY, - bymonth = 11, - byweekday = (rrule.TH(-1)), - byhour = 21, - byminute = 0, - cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) + dtstart = start, + until = end ) +non_trading_rules.append(thanksgiving) -# Recurrence relation for generating the market open/close for -# christmas. Christmas always occurs on december 25th. - -christmas_opens = rrule.rrule( +christmas = rrule.rrule( rrule.MONTHLY, bymonth = 12, - bymonthday = 25, - byhour = 14, - byminute = 30, + bymonthday = 25, cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) -) -christmas_closes = rrule.rrule( - rrule.MONTHLY, - bymonth = 12, - bymonthday = 25, - byhour = 21, - byminute = 0, - cache = True, - dtstart = datetime(2000, 1,1,tzinfo = pytz.utc), - until=datetime(2014, 1, 1, tzinfo = pytz.utc) + dtstart = start, + until = end ) +non_trading_rules.append(christmas) -# All NYSE observed holidays. -holiday_opens = [ - new_years_opens, - mlk_opens, - presidents_day_opens, - good_friday_opens, - memorial_day_opens, - july_4th_opens, - labor_day_opens, - thanksgiving_opens, - christmas_opens -] -holiday_closes = [ - new_years_closes, - mlk_closes, - presidents_day_closes, - good_friday_closes, - memorial_day_closes, - july_4th_closes, - labor_day_closes, - thanksgiving_closes, - christmas_closes -] +non_trading_ruleset = rrule.rruleset() -# Valid market opens are given by all market opens minus holidays. -opens = rrule.rruleset(cache=True) -opens.rrule(market_opens_with_holidays) -for holiday_rule in holiday_opens: - opens.exrule(holiday_rule) - -closes = rrule.rruleset(cache=True) -closes.rrule(market_closes_with_holidays) -for holiday_rule in holiday_closes: - closes.exrule(holiday_rule) - -# This runs the calendar to load all data into a cache. -open_count = opens.count() -close_count = closes.count() +for rule in non_trading_rules: + non_trading_ruleset.rrule(rule) +non_trading_days = non_trading_ruleset.between(start, end, inc=True)