Resolved conflicts.

This commit is contained in:
Thomas Wiecki
2012-08-23 13:44:16 -04:00
10 changed files with 282 additions and 547 deletions
+2 -2
View File
@@ -160,7 +160,7 @@ class ExceptionTestCase(TestCase):
output, _ = drain_zipline(self, zipline)
self.assertEqual(output[-1]['prefix'], 'EXCEPTION')
payload = output[-1]['payload']
self.assertEqual(payload['name'],'Timeout')
self.assertEqual(payload['name'],'TimeoutException')
self.assertEqual(payload['message'], 'Call to initialize timed out')
def test_heartbeat(self):
@@ -186,6 +186,6 @@ class ExceptionTestCase(TestCase):
# Assert that the last message is a timeout exception.
self.assertEqual(output[-1]['prefix'], 'EXCEPTION')
payload = output[-1]['payload']
self.assertEqual(payload['name'],'Timeout')
self.assertEqual(payload['name'],'TimeoutException')
self.assertEqual(payload['message'], 'Too much time spent in handle_data call')
+31 -47
View File
@@ -46,24 +46,20 @@ class EventWindowTestCase(TestCase):
def setUp(self):
setup_logger(self)
# Constants calling before open, during the day, and after
# close on a valid trading day.
self.pre_open = datetime(2012, 8, 7, 13, tzinfo = pytz.utc)
self.mid_day = datetime(2012, 8, 7, 15, tzinfo = pytz.utc)
self.post_close = datetime(2012, 8, 7, 22, tzinfo = pytz.utc)
self.monday = datetime(2012, 7, 9, 16, tzinfo=pytz.utc)
self.eleven_normal_days = [self.monday + i*timedelta(days=1)
for i in xrange(11)]
# Constants calling before open, during the day, and after
# close on a saturday.
self.pre_open_saturday = datetime(2012, 8, 11, 13, tzinfo = pytz.utc)
self.mid_day_saturday = datetime(2012, 8, 11, 15, tzinfo = pytz.utc)
self.post_close_saturday = datetime(2012, 8, 11, 22, tzinfo = pytz.utc)
# Modify the end of the period slightly to exercise the
# incomplete day logic.
self.eleven_normal_days[-1] -= timedelta(minutes = 1)
self.eleven_normal_days.append(self.monday+timedelta(days=11,seconds=1))
# Second set of dates to test holiday handling.
self.jul4_monday = datetime(2012, 7, 2, 16, tzinfo=pytz.utc)
self.week_of_jul4 = [self.jul4_monday + i*timedelta(days=1)
for i in xrange(5)]
# Constants calling before open, during the day, and after
# close on a holiday.
self.pre_open_holiday = datetime(2012, 12, 25, 13, tzinfo = pytz.utc)
self.mid_day_holiday = datetime(2012, 12, 25, tzinfo = pytz.utc)
self.post_close_holiday = datetime(2012, 12, 25, 22, tzinfo = pytz.utc)
def test_event_window_with_timedelta(self):
# Keep all events within a 5 minute window.
@@ -96,58 +92,46 @@ class EventWindowTestCase(TestCase):
for dropped in window.removed:
assert message.dt - dropped.dt >= timedelta(minutes = 5)
def test_market_aware_window(self):
def test_market_aware_window_normal_week(self):
window = NoopEventWindow(
market_aware = True,
delta = None,
days = 1
days = 3
)
dates = ([self.pre_open]*3)
dates += ([self.mid_day]*3)
dates += ([self.post_close]*3)
dates += [self.pre_open + timedelta(days = 1, seconds = 1)]
events = [to_dt(date) for date in dates]
events = [to_dt(date) for date in self.eleven_normal_days]
lengths = []
# Run the events.
for event in events:
window.update(event)
# Record the length of the window after each event.
lengths.append(len(window.ticks))
# We should have removed the pre_open events on the first day.
# The rest should be intact.
# The window stretches out during the weekend because we wait
# to drop events until the weekend ends. The last window is
# briefly longer because it doesn't complete a full day. The
# window then shrinks once the day completes
assert lengths == [1, 2, 3, 3, 3, 4, 5, 5, 5, 3, 4, 3]
assert window.added == events
assert window.removed == events[0:3]
assert list(window.ticks) == events[3:]
assert window.removed == events[:-3]
def test_market_aware_window_weekend(self):
def test_market_aware_window_holiday(self):
window = NoopEventWindow(
market_aware = True,
delta = None,
days = 2
)
dates = [self.pre_open_saturday - timedelta(days = 1, seconds=1)]
dates += [self.mid_day_saturday - timedelta(days = 1, seconds=1)]
dates += [self.post_close_saturday - timedelta(days = 1, seconds=1)]
dates += [self.mid_day_saturday + timedelta(days = 1)]
events = [to_dt(date) for date in dates]
events = [to_dt(date) for date in self.week_of_jul4]
lengths = []
# Run the events.
for event in events:
window.update(event)
# Record the length of the window after each event.
lengths.append(len(window.ticks))
# We shouldn't remove any events.
assert lengths == [1, 2, 3, 3, 2]
assert window.added == events
assert window.removed == []
assert list(window.ticks) == events
extra = to_dt(self.mid_day_saturday + timedelta(days = 2))
window.update(extra)
# We should remove only the first event.
assert window.removed == [events[0]]
assert list(window.ticks) == events[1:] + [extra]
assert window.removed == events[:-2]
def tearDown(self):
setup_logger(self)
+32 -55
View File
@@ -133,7 +133,7 @@ import zipline.finance.risk as risk
log = logbook.Logger('Performance')
class PerformanceTracker(object):
UPDATER = True
"""
Tracks the performance of the zipline as it is running in
the simulator, relays this out to the Deluge broker and then
@@ -166,12 +166,6 @@ class PerformanceTracker(object):
self.event_count = 0
self.last_dict = None
self.exceeded_max_loss = False
self.no_more_updates = False
self.compute_risk_metrics = True
self.results_socket = None
self.results_addr = None
# this performance period will span the entire simulation.
self.cumulative_performance = PerformancePeriod(
@@ -204,49 +198,34 @@ class PerformanceTracker(object):
for sid in sid_list:
self.cumulative_performance.positions[sid] = Position(sid)
self.todays_performance.positions[sid] = Position(sid)
def update(self, event):
if self.no_more_updates:
return zp.ndict({'dt':0})
elif event.dt == "DONE":
event.perf_message = self.handle_simulation_end()
del event['TRANSACTION']
self.no_more_updates = True
return event
elif self.exceeded_max_loss:
# in case of max_loss, signal to downstream
# generators that we are done.
event.dt = "DONE"
event.perf_message = self.handle_simulation_end()
del event['TRANSACTION']
self.no_more_updates = True
return event
else:
event.perf_message = self.process_event(event)
event.portfolio = self.get_portfolio()
del event['TRANSACTION']
return event
def transform(self, stream_in):
"""
Main generator work loop.
"""
for event in stream_in:
if event.dt == "DONE":
event.perf_message = self.handle_simulation_end()
del event['TRANSACTION']
yield event
elif self.exceeded_max_loss:
# in case of max_loss, signal to downstream
# generators that we are done.
event.dt = "DONE"
event.perf_message = self.handle_simulation_end()
del event['TRANSACTION']
yield event
# Cut off the rest of the stream.
raise StopIteration()
else:
event.perf_message = self.process_event(event)
event.portfolio = self.get_portfolio()
del event['TRANSACTION']
yield event
def get_portfolio(self):
return self.cumulative_performance.as_portfolio()
def open(self, context):
if self.results_addr:
sock = context.socket(zmq.PUSH)
sock.connect(self.results_addr)
self.results_socket = sock
else:
log.warn("Not streaming results because no results socket given")
def publish_to(self, results_addr):
"""
Publish the performance results asynchronously to a
socket.
"""
#assert isinstance(results_addr, basestring), type(results_addr)
#self.results_addr = results_addr
self.results_socket = results_addr
def to_dict(self):
"""
Creates a dictionary representing the state of this tracker.
@@ -269,7 +248,7 @@ class PerformanceTracker(object):
message = None
if self.exceeded_max_loss:
return
return message
assert isinstance(event, zp.ndict)
self.event_count += 1
@@ -290,7 +269,6 @@ class PerformanceTracker(object):
self.cumulative_performance.calculate_performance()
self.todays_performance.calculate_performance()
return message
def handle_market_close(self):
@@ -304,13 +282,12 @@ class PerformanceTracker(object):
self.returns.append(todays_return_obj)
#calculate risk metrics for cumulative performance
if self.compute_risk_metrics:
self.cumulative_risk_metrics = risk.RiskMetrics(
start_date=self.period_start,
end_date=self.market_close.replace(hour=0, minute=0, second=0),
returns=self.returns,
trading_environment=self.trading_environment
)
self.cumulative_risk_metrics = risk.RiskMetrics(
start_date=self.period_start,
end_date=self.market_close.replace(hour=0, minute=0, second=0),
returns=self.returns,
trading_environment=self.trading_environment
)
# increment the day counter before we move markers forward.
self.day_count += 1.0
+16 -2
View File
@@ -9,7 +9,6 @@ from zipline.protocol import SIMULATION_STYLE
log = logbook.Logger('Transaction Simulator')
class TransactionSimulator(object):
UPDATER = True
def __init__(self, sid_filter, style=SIMULATION_STYLE.PARTIAL_VOLUME):
self.open_orders = {}
@@ -35,6 +34,13 @@ class TransactionSimulator(object):
order.filled = 0
self.open_orders[order.sid].append(order)
def transform(self, stream_in):
"""
Main generator work loop.
"""
for event in stream_in:
yield self.update(event)
def update(self, event):
event.TRANSACTION = None
# We only fill transactions on trade events.
@@ -177,13 +183,21 @@ class TradingEnvironment(object):
self.period_trading_days = None
self.max_drawdown = max_drawdown
assert self.period_start <= self.period_end, \
"Period start falls after period end."
for bm in benchmark_returns:
self.trading_days.append(bm.date)
self.trading_day_map[bm.date] = bm
assert self.period_start <= self.trading_days[-1], \
"Period start falls after the last known trading day."
assert self.period_end >= self.trading_days[0], \
"Period end falls before the first known trading day."
self.first_open = self.calculate_first_open()
self.last_close = self.calculate_last_close()
self.prior_day_open = self.calculate_prior_day_open()
def calculate_first_open(self):
+7 -5
View File
@@ -43,7 +43,9 @@ def merged_transforms(sorted_stream, *transforms):
"""
for transform in transforms:
assert isinstance(transform, StatefulTransform)
transform.merged = True
transform.sequential = False
# Generate expected hashes for each transform
namestrings = [tnfm.get_hash() for tnfm in transforms]
@@ -75,11 +77,11 @@ def sequential_transforms(stream_in, *transforms):
"""
assert isinstance(transforms, (list, tuple))
for tnfm in transforms:
tnfm.forward_all = False
tnfm.update_in_place = False
tnfm.append_value = True
for tnfm in transforms:
tnfm.sequential = True
tnfm.merged = False
# Recursively apply all transforms to the stream.
stream_out = reduce(lambda stream, tnfm: tnfm.transform(stream),
transforms,
+32 -59
View File
@@ -6,7 +6,7 @@ from numbers import Integral
from itertools import groupby
from zipline import ndict
from zipline.utils.timeout import timeout, heartbeat, Timeout
from zipline.utils.timeout import Heartbeat, Timeout
from zipline.gens.transform import StatefulTransform
from zipline.finance.trading import TransactionSimulator
@@ -61,10 +61,16 @@ class TradeSimulationClient(object):
self.sids = algo.get_sid_filter()
self.environment = environment
self.style = sim_style
self.algo_sim = None
self.warmup_start = self.environment.prior_day_open
self.ordering_client = TransactionSimulator(self.sids, style=self.style)
self.perf_tracker = PerformanceTracker(self.environment, self.sids)
self.algo_start = self.environment.first_open
self.algo_sim = AlgorithmSimulator(
self.ordering_client,
self.algo,
self.algo_start
)
def get_hash(self):
"""
@@ -79,56 +85,36 @@ class TradeSimulationClient(object):
"""
# Simulate filling any open orders made by the previous run of
# the user's algorithm. Sets the txn field to true on any
# the user's algorithm. Fills the Transaction field on any
# event that results in a filled order.
ordering_client = StatefulTransform(
TransactionSimulator,
self.sids,
style = self.style
)
with_filled_orders = ordering_client.transform(stream_in)
with_filled_orders = self.ordering_client.transform(stream_in)
# Pipe the events with transactions to perf. This will remove
# the txn field added by TransactionSimulator and replace it
# with a portfolio object to be passed to the user's
# the TRANSACTION field added by TransactionSimulator and replace it
# with a portfolio field to be passed to the user's
# algorithm. Also adds a perf_message field which is usually
# none, but contains an update message once per day.
perf_tracker = StatefulTransform(
PerformanceTracker,
self.environment,
self.sids
)
with_portfolio = perf_tracker.transform(with_filled_orders)
with_portfolio = self.perf_tracker.transform(with_filled_orders)
# Pass the messages from perf along with the trading client's
# state into the algorithm for simulation. We provide a
# pointer to the ordering client's internal state so that the
# algorithm can place new orders into the client's order book.
self.algo_sim = AlgorithmSimulator(
with_portfolio,
ordering_client.state,
self.algo,
self.algo_start
)
# Pass the messages from perf to the user's algorithm for simulation.
# Events are batched by dt so that the algo handles all events for a
# given timestamp at one one go.
performance_messages = self.algo_sim.transform(with_portfolio)
# The algorithm will yield a daily_results message (as
# calculated by the performance tracker) at the end of each
# day. It will also yield a risk report at the end of the
# simulation.
for message in self.algo_sim:
for message in performance_messages:
yield message
class AlgorithmSimulator(object):
def __init__(self,
stream_in,
order_book,
algo,
algo_start):
self.stream_in = stream_in
# ==========
# Algo Setup
# ==========
@@ -155,7 +141,7 @@ class AlgorithmSimulator(object):
# Context manager that calls log_heartbeats every HEARTBEAT_INTERVAL
# seconds, raising an exception after MAX_HEARTBEATS
self.heartbeat_monitor = heartbeat(
self.heartbeat_monitor = Heartbeat(
HEARTBEAT_INTERVAL,
MAX_HEARTBEAT_INTERVALS,
frame_handler=log_heartbeats,
@@ -168,7 +154,6 @@ class AlgorithmSimulator(object):
# The algorithm's universe as of our most recent event.
self.universe = ndict()
for sid in self.sids:
self.universe[sid] = ndict()
self.universe.portfolio = None
@@ -188,22 +173,10 @@ class AlgorithmSimulator(object):
record.extra['algo_dt'] = self.snapshot_dt
self.processor = Processor(inject_algo_dt)
# This is a class, which is instantiated later
# in run_algorithm. The class provides a generator.
# Single_use generator that uses the @contextmanager decorator
# to monkey patch sys.stdout with a logbook interface.
self.stdout_capture = stdout_only_pipe
self.__generator = None
def __iter__(self):
return self
def next(self):
if self.__generator:
return self.__generator.next()
else:
self.__generator = self._gen()
return self.__generator.next()
def order(self, sid, amount):
"""
Closure to pass into the user's algo to allow placing orders
@@ -232,10 +205,10 @@ class AlgorithmSimulator(object):
# simulator so that it can fill the placed order when it
# receives its next message.
self.order_book.place_order(order)
def _gen(self):
def transform(self, stream_in):
"""
Internal generator work loop.
Main generator work loop.
"""
# Capture any output of this generator to stdout and pipe it
# to a logbook interface. Also inject the current algo
@@ -243,12 +216,12 @@ class AlgorithmSimulator(object):
with self.processor.threadbound(), self.stdout_capture(Logger('Print'),''):
# Call user's initialize method with a timeout.
with timeout(INIT_TIMEOUT, message="Call to initialize timed out"):
with Timeout(INIT_TIMEOUT, message="Call to initialize timed out"):
self.algo.initialize()
# Group together events with the same dt field. This depends on the
# events already being sorted.
for date, snapshot in groupby(self.stream_in, lambda e: e.dt):
for date, snapshot in groupby(stream_in, lambda e: e.dt):
# Set the simulation date to be the first event we see.
# This should only occur once, at the start of the test.
@@ -259,7 +232,7 @@ class AlgorithmSimulator(object):
if date == 'DONE':
for event in snapshot:
yield event.perf_message
break
raise StopIteration()
# We're still in the warmup period. Use the event to
# update our universe, but don't yield any perf messages,
@@ -293,7 +266,7 @@ class AlgorithmSimulator(object):
del event['perf_message']
self.update_universe(event)
# Send the current state of the universe to the user's algo.
self.simulate_snapshot(date)
@@ -316,7 +289,7 @@ class AlgorithmSimulator(object):
# Needs to be set so that we inject the proper date into algo
# log/print lines.
self.snapshot_dt = date
start_tic = datetime.now()
with self.heartbeat_monitor:
self.algo.handle_data(self.universe)
+75 -69
View File
@@ -12,14 +12,14 @@ from numbers import Number
from abc import ABCMeta, abstractmethod
from zipline import ndict
from zipline.utils.tradingcalendar import trading_days_between
from zipline.utils.tradingcalendar import non_trading_days
from zipline.gens.utils import assert_sort_unframe_protocol, \
assert_transform_protocol, hash_args
log = logbook.Logger('Transform')
class Passthrough(object):
FORWARDER = True
PASSTHROUGH = True
"""
Trivial class for forwarding events.
"""
@@ -29,23 +29,6 @@ class Passthrough(object):
def update(self, event):
pass
# Deprecated
def functional_transform(stream_in, func, *args, **kwargs):
"""
Generic transform generator that takes each message from an in-stream
and yields the output of a function on that message. Not sure how
useful this will be in reality, but good for testing.
"""
assert isinstance(func, types.FunctionType), \
"Functional"
namestring = func.__name__ + hash_args(*args, **kwargs)
for message in stream_in:
assert_sort_unframe_protocol(message)
out_value = func(message, *args, **kwargs)
assert_transform_protocol(out_value)
yield(namestring, out_value)
class StatefulTransform(object):
"""
Generic transform generator that takes each message from an
@@ -61,15 +44,16 @@ class StatefulTransform(object):
assert tnfm_class.__dict__.has_key('update'), \
"Stateful transform requires the class to have an update method"
self.forward_all = tnfm_class.__dict__.get('FORWARDER', False)
self.update_in_place = tnfm_class.__dict__.get('UPDATER', False)
self.append_value = tnfm_class.__dict__.get('APPENDER', False)
# You only one special behavior mode can be set.
assert sum(map(int, [self.forward_all,
self.update_in_place,
self.append_value])) <= 1
# Flag set inside the Passthrough transform class to signify special
# behavior if we are being fed to merged_transforms.
self.passthrough = tnfm_class.__dict__.get('PASSTHROUGH', False)
# Flags specifying how to append the calculated value.
# Merged is the default for ease of testing, but we use sequential
# in production.
self.sequential = False
self.merged = True
# Create an instance of our transform class.
self.state = tnfm_class(*args, **kwargs)
@@ -85,7 +69,8 @@ class StatefulTransform(object):
def _gen(self, stream_in):
# IMPORTANT: Messages may contain pointers that are shared with
# other streams, so we only manipulate copies.
# other streams. Transforms that modify their input
# messages should only manipulate copies.
log.info('Running StatefulTransform [%s]' % self.get_hash())
for message in stream_in:
@@ -94,61 +79,53 @@ class StatefulTransform(object):
if message == None:
continue
#TODO: refactor this to avoid unnecessary copying.
assert_sort_unframe_protocol(message)
#message_copy = deepcopy(message)
message_copy = message
# Same shared pointer issue here as above.
tnfm_value = self.state.update(message_copy)
# This flag is set by by merged_transforms to ensure
# isolation of messages.
if self.merged:
message = deepcopy(message)
tnfm_value = self.state.update(message)
# FORWARDER flag means we want to keep all original
# PASSTHROUGH flag means we want to keep all original
# values, plus append tnfm_id and tnfm_value. Used for
# preserving the original event fields when our output
# will be fed into a merge. Currently only Passthrough
# uses this flag.
if self.forward_all:
out_message = message_copy
if self.passthrough and self.merged:
out_message = message
out_message.tnfm_id = self.namestring
out_message.tnfm_value = tnfm_value
yield out_message
# UPDATER flag should be used for transforms that
# side-effectfully modify the event they are passed.
# Updated messages are passed along exactly as they are
# returned to use by our state class. Useful for chaining
# specific transforms that won't be fed to a merge. (See
# the implementation of TradeSimulationClient for example
# usage of this flag with PerformanceTracker and
# TransactionSimulator.
elif self.update_in_place:
yield tnfm_value
# APPENDER flag should be used to add a single new
# If the merged flag is set, we create a new message
# containing just the tnfm_id, the event's datetime, and
# the calculated tnfm_value. This is the default behavior
# for a non-passthrough transform being fed into a merge.
elif self.merged:
out_message = ndict()
out_message.tnfm_id = self.namestring
out_message.tnfm_value = tnfm_value
out_message.dt = message.dt
yield out_message
# Sequential flag should be used to add a single new
# key-value pair to the event. The new key is this
# transform's namestring, and it's value is the value
# transform's namestring, and its value is the value
# returned by state.update(event). This is almost
# identical to the behavior of FORWARDER, except we
# compress the two calculated values (tnfm_id, and
# tnfm_value) into a single field. This mode is used by
# the sequential_transforms composite.
elif self.append_value:
out_message = message_copy
# the sequential_transforms composite and is the default
# if no behavior is specified by the internal state class.
elif self.sequential:
out_message = message
out_message[self.namestring] = tnfm_value
yield out_message
# If no flags are set, we create a new message containing
# just the tnfm_id, the event's datetime, and the
# calculated tnfm_value. This is the default behavior for
# a transform being fed into a merge.
else:
out_message = ndict()
out_message.tnfm_id = self.namestring
out_message.tnfm_value = tnfm_value
out_message.dt = message_copy.dt
yield out_message
log.info('Finished StatefulTransform [%s]' % self.get_hash())
class EventWindow:
"""
Abstract base class for transform classes that calculate iterative
@@ -180,8 +157,10 @@ class EventWindow:
# Market-aware mode only works with full-day windows.
if self.market_aware:
assert self.days and not self.delta,\
assert self.days and self.delta == None,\
"Market-aware mode only works with full-day windows."
self.all_holidays = deque(non_trading_days)
self.cur_holidays = deque()
# Non-market-aware mode requires a timedelta.
else:
@@ -215,6 +194,9 @@ class EventWindow:
# Subclasses should override handle_add to define behavior for
# adding new ticks.
self.handle_add(event)
if self.market_aware:
self.add_new_holidays(event.dt)
# Clear out any expired events. drop_condition changes depending
# on whether or not we are running in market_aware mode.
@@ -223,16 +205,40 @@ class EventWindow:
# | |
# V V
while self.drop_condition(self.ticks[0].dt, self.ticks[-1].dt):
# popleft removes and returns the oldest tick in self.ticks
popped = self.ticks.popleft()
# Subclasses should override handle_remove to define
# behavior for removing ticks.
self.handle_remove(popped)
def add_new_holidays(self, newest):
# Add to our tracked window any untracked holidays that are
# older than our newest event. (newest should always be
# self.ticks[-1])
while len(self.all_holidays) > 0 and self.all_holidays[0] <= newest:
self.cur_holidays.append(self.all_holidays.popleft())
def drop_old_holidays(self, oldest):
# Drop from our tracked window any holidays that are older
# than our oldest tracked event. (oldest should always
# be self.ticks[0])
while len(self.cur_holidays) > 0 and self.cur_holidays[0] < oldest:
self.cur_holidays.popleft()
def out_of_market_window(self, oldest, newest):
return trading_days_between(oldest, newest) >= self.days
self.drop_old_holidays(oldest)
calendar_dates_between = (newest.date() - oldest.date()).days
holidays_between = len(self.cur_holidays)
trading_days_between = calendar_dates_between - holidays_between
# "Put back" a day if oldest is earlier in its day than newest,
# reflecting the fact that we haven't yet completed the last
# day in the window.
if oldest.time() > newest.time():
trading_days_between -= 1
return trading_days_between >= self.days
def out_of_delta(self, oldest, newest):
return (newest - oldest) >= self.delta
+25 -4
View File
@@ -121,7 +121,6 @@ class SimulatedTrading(object):
# exit status flag
self.success = False
def simulate(self, blocking=True, send_sighup=False):
# for non-blocking,
@@ -180,7 +179,7 @@ class SimulatedTrading(object):
else:
log.warning("Sending SIGINT")
os.kill(ppid, SIGINT)
def handle_exception(self, exc):
if isinstance(exc, CancelSignal):
# signal from monitor of an orderly shutdown,
@@ -207,9 +206,8 @@ class SimulatedTrading(object):
exc_type.__name__,
exc_value.message
)
self.results_socket.send(msg)
except:
log.exception("Exception while reporting simulation exception.")
@@ -362,3 +360,26 @@ class SimulatedTrading(object):
#-------------------
return sim
class SimulatedTradingLite(object):
"""
SimulatedTrading without multiprocess and without zmq.
Useful for profiling the core logic and for rapid testing
of new features.
"""
def __init__(self,
sources,
transforms,
algorithm,
environment,
style):
self.date_sorted = date_sorted_sources(*sources)
self.transforms = transforms
# Formerly merged_transforms.
self.with_tnfms = sequential_transforms(self.date_sorted, *self.transforms)
self.trading_client = tsc(algorithm, environment, style)
self.gen = self.trading_client.simulate(self.with_tnfms)
def get_results(self):
return self.gen
+9 -7
View File
@@ -6,20 +6,22 @@ from pprint import pprint as pp
from numbers import Number
from logbook import Logger
class Timeout(Exception):
class TimeoutException(Exception):
def __init__(self, frame, message=''):
self.frame = frame
self.message = message
# TODO: fix code replication here.
class timeout(object):
class Timeout(object):
"""
Utility to make a function raise TimeoutException if it spends
more than a specified number of seconds executing. Can be used
as a decorator to apply a static timeout to a function, or as
a context manager to dynamically add a timeout to a code block.
"""
def __init__(self, seconds, message=''):
self.seconds = seconds
self.message = message
@@ -27,13 +29,13 @@ class timeout(object):
assert seconds > 0, "Timeout must be greater than 0"
def handler(self, signum, frame):
raise Timeout(frame, self.message)
raise TimeoutException(frame, self.message)
def __call__(self, fn):
@wraps(fn)
def call_fn_with_timeout(*args, **kwargs):
# Set the alarm.
# Set the alarm, saving any handler that existed previously.
signal.signal(signal.SIGALRM, self.handler)
signal.setitimer(signal.ITIMER_REAL, self.seconds, 0)
try:
@@ -63,7 +65,7 @@ class timeout(object):
signal.signal(signal.SIGALRM, self.handler)
signal.setitimer(signal.ITIMER_REAL, 0, 0)
class heartbeat(object):
class Heartbeat(object):
"""
Utility to perform pseudo-heartbeat checks on a single-threaded
function. Calls frame_handler on the current stack frame of the
@@ -89,7 +91,7 @@ class heartbeat(object):
self.frame_handler(self.count, frame)
if self.count >= self.max_intervals:
raise Timeout(frame, self.timeout_message)
raise TimeoutException(frame, self.timeout_message)
def __call__(self, fn):
+53 -297
View File
@@ -1,358 +1,114 @@
import pytz
from datetime import datetime, timedelta
from datetime import datetime, timedelta, date
from dateutil import rrule
from zipline.utils.date_utils import utcnow
def market_opens(start, end, inclusive=False):
"""
Returns all market opens between the start date and the end date.
Must use utc-stamped datetimes.
"""
return opens.between(start, end, inc=inclusive)
start = datetime(2002, 1,1, tzinfo=pytz.utc)
end = utcnow()
def market_closes(start, end, inclusive=False):
"""
Returns all market closes between the start date and the end date.
Must use utc-stamped datetimes.
"""
return closes.between(start, end, inc=inclusive)
non_trading_rules = []
def trading_days_between(start, end):
"""
Calculate the number of "complete" trading days between two
events. We define this as the number of market opens that
occurred between start and end, with the caveat that we subtract 1
from this total if end falls on the same day as the last market
open and end occurs earlier in its own day than start. This
reflects the fact that we haven't completed a full day
corresponding to the last market open.
Examples:
1.)
start = Tuesday, Aug 7, 2012, 1:00 pm
end = Wednesday, Aug 8, 2012, 1:30 pm
There is one market open between these dates, on the morning of
Wednesday the 8th. This falls on the same calendar day as end,
but end is later in the day than start, so we count this as a full
day. The correct output is 1.
2.)
start = Tuesday, Aug 7, 2012, 1:30 pm
end = Wednesday, Aug 8, 2012, 1:00 pm
There is one market open between these dayes, on the morning of
Wednesday the 8th. This falls on the same calendar day as end,
and end is earlier in the day than start, so we do not count this
day as completed. The correct output is 0.
3.)
start = Tuesday, Aug 7, 2012, 1:00 pm
end = Saturday, Aug 11, 2012, 1:30 pm
There are 3 market opens between these dates, occurring on
Wednesday, Thursday, and Friday. The last open is not on
the same day as end, so we simply return 3
4.)
start = Tuesday, Aug 7, 2012, 1:30 pm
end = Monday, Aug, 13, 2012, 1:00 pm
There are 4 market opens between these dates, occurring on
Wednesday, Thursday, Friday, and the following Monday. The
last open occurs on the same calendar day as end, and end
is earlier in the day than start, so we do not count the
last market day as completed. The correct output is 3 days.
"""
# Calculate the number of opens between the events.
opens = (market_opens(start, end))
days_between = len(opens)
if days_between == 0:
return days_between
# If end falls on the same day as an open, subtract 1 from the
# total if end is earlier in its respective day than start.
last_open = opens[-1]
if last_open.date() == end.date() and earlier_in_day(end, start):
days_between -=1
return days_between
def earlier_in_day(d1, d2):
"""
Return true if d1 falls earlier in its own day than d2.
"""
return d1.time() < d2.time()
WEEKDAYS = [rrule.MO, rrule.TU, rrule.WE, rrule.TH, rrule.FR]
# Recurrence rule that generates all market opens since Jan 1, 1970.
# This does not exclude holidays.
market_opens_with_holidays = rrule.rrule(
rrule.DAILY,
byweekday=WEEKDAYS,
byhour = 14,
byminute = 30,
weekends = rrule.rrule(
rrule.YEARLY,
byweekday=(rrule.SA, rrule.SU),
cache = True,
dtstart=datetime(2000, 1, 1, tzinfo = pytz.utc),
until=datetime(2014 , 1, 1, tzinfo = pytz.utc)
dtstart = start,
until = end
)
non_trading_rules.append(weekends)
# Recurrence rule that generates all market closes since Jan 1, 1970.
# This does not exclude holidays.
market_closes_with_holidays = rrule.rrule(
rrule.DAILY,
byweekday=WEEKDAYS,
byhour = 21,
byminute = 0,
cache = True,
dtstart=datetime(2001, 1, 1, tzinfo = pytz.utc),
until=datetime(2014, 1, 1, tzinfo = pytz.utc)
)
# Recurrence rules for excluding the market open/close on new years.
new_years_opens = rrule.rrule(
new_years = rrule.rrule(
rrule.MONTHLY,
byyearday = 1,
byhour = 14,
byminute = 30,
cache = True,
dtstart = datetime(2000, 1,1,tzinfo = pytz.utc),
until=datetime(2014, 1, 1, tzinfo = pytz.utc)
)
new_years_closes = rrule.rrule(
rrule.MONTHLY,
byyearday = 1,
byhour = 21,
byminute = 0,
cache = True,
dtstart = datetime(2000, 1,1,tzinfo = pytz.utc),
until=datetime(2014, 1, 1, tzinfo = pytz.utc)
dtstart = start,
until = end
)
non_trading_rules.append(new_years)
# Recurrence rules for excluding MLK day. It is always the third
# monday in January.
mlk_opens = rrule.rrule(
rrule.MONTHLY,
bymonth = 1,
byweekday = (rrule.MO(3)),
byhour = 14,
byminute = 30,
cache = True,
dtstart = datetime(2000, 1,1,tzinfo = pytz.utc),
until=datetime(2014, 1, 1, tzinfo = pytz.utc)
)
mlk_closes = rrule.rrule(
mlk_day = rrule.rrule(
rrule.MONTHLY,
bymonth = 1,
byweekday = (rrule.MO(+3)),
byhour = 21,
byminute = 0,
cache = True,
dtstart = datetime(2000, 1,1,tzinfo = pytz.utc),
until=datetime(2014, 1, 1, tzinfo = pytz.utc)
)
dtstart = start,
until = end
)
non_trading_rules.append(mlk_day)
# Recurrence rules for generating the market open/close for
# presidents' day. Presidents' day always occurs on the third monday
# of February.
presidents_day_opens = rrule.rrule(
presidents_day = rrule.rrule(
rrule.MONTHLY,
bymonth = 2,
byweekday = (rrule.MO(3)),
byhour = 14,
byminute = 30,
cache = True,
dtstart = datetime(2000, 1,1,tzinfo = pytz.utc),
until=datetime(2014, 1, 1, tzinfo = pytz.utc)
)
presidents_day_closes = rrule.rrule(
rrule.MONTHLY,
bymonth = 2,
byweekday = (rrule.MO(3)),
byhour = 21,
byminute = 0,
cache = True,
dtstart = datetime(2000, 1,1,tzinfo = pytz.utc),
until=datetime(2014, 1, 1, tzinfo = pytz.utc)
dtstart = start,
until = end
)
non_trading_rules.append(presidents_day)
# Recurrence rules for generating the market open/close for good
# friday. Good friday always falls 2 days before easter, which
# thankfully is a built-in refernce in this module.
good_friday_opens = rrule.rrule(
good_friday = rrule.rrule(
rrule.DAILY,
byeaster = -2,
byhour = 14,
byminute = 30,
cache = True,
dtstart = datetime(2000, 1,1,tzinfo = pytz.utc),
until=datetime(2014, 1, 1, tzinfo = pytz.utc)
dtstart = start,
until = end
)
good_friday_closes = rrule.rrule(
rrule.DAILY,
byeaster = -2,
byhour = 21,
byminute = 0,
cache = True,
dtstart = datetime(2000, 1,1,tzinfo = pytz.utc),
until=datetime(2014, 1, 1, tzinfo = pytz.utc)
)
non_trading_rules.append(good_friday)
# Recurrence rules for generating the market open/close for memorial
# day. Memorial day always occurs on the last monday of May.
memorial_day_opens = rrule.rrule(
memorial_day = rrule.rrule(
rrule.MONTHLY,
bymonth = 5,
byweekday = (rrule.MO(-1)),
byhour = 14,
byminute = 30,
cache = True,
dtstart = datetime(2000, 1,1,tzinfo = pytz.utc),
until=datetime(2014, 1, 1, tzinfo = pytz.utc)
)
memorial_day_closes = rrule.rrule(
rrule.MONTHLY,
bymonth = 5,
byweekday = (rrule.MO(-1)),
byhour = 21,
byminute = 0,
cache = True,
dtstart = datetime(2000, 1,1,tzinfo = pytz.utc),
until=datetime(2014, 1, 1, tzinfo = pytz.utc)
dtstart = start,
until = end
)
non_trading_rules.append(memorial_day)
# Recurrence rules for generating the market open/close for July 4th.
july_4th_opens = rrule.rrule(
july_4th = rrule.rrule(
rrule.MONTHLY,
bymonth = 6,
bymonth = 7,
bymonthday = 4,
byhour = 14,
byminute = 30,
cache = True,
dtstart = datetime(2000, 1,1,tzinfo = pytz.utc),
until=datetime(2014, 1, 1, tzinfo = pytz.utc)
)
july_4th_closes = rrule.rrule(
rrule.MONTHLY,
bymonth = 6,
bymonthday = 4,
byhour = 21,
byminute = 0,
cache = True,
dtstart = datetime(2000, 1,1,tzinfo = pytz.utc),
until=datetime(2014, 1, 1, tzinfo = pytz.utc)
dtstart = start,
until = end
)
non_trading_rules.append(july_4th)
# Recurrence rule for generating the market open/close for labor day.
# Labor day is always the first monday of September.
labor_day_opens = rrule.rrule(
labor_day = rrule.rrule(
rrule.MONTHLY,
bymonth = 9,
byweekday = (rrule.MO(1)),
byhour = 14,
byminute = 30,
cache = True,
dtstart = datetime(2000, 1,1,tzinfo = pytz.utc),
until=datetime(2014, 1, 1, tzinfo = pytz.utc)
)
labor_day_closes = rrule.rrule(
rrule.MONTHLY,
bymonth = 9,
byweekday = (rrule.MO(1)),
byhour = 21,
byminute = 0,
cache = True,
dtstart = datetime(2000, 1,1,tzinfo = pytz.utc),
until=datetime(2014, 1, 1, tzinfo = pytz.utc)
dtstart = start,
until = end
)
non_trading_rules.append(labor_day)
# Recurrence rule for generating the market open/close for
# thanksgiving. Thanksgiving always falls on the fourth thursday in
# November. (Who decides how these holidays work!?!)
thanksgiving_opens = rrule.rrule(
thanksgiving = rrule.rrule(
rrule.MONTHLY,
bymonth = 11,
byweekday = (rrule.TH(-1)),
byhour = 14,
byminute = 30,
cache = True,
dtstart = datetime(2000, 1,1,tzinfo = pytz.utc),
until=datetime(2014, 1, 1, tzinfo = pytz.utc)
)
thanksgiving_closes = rrule.rrule(
rrule.MONTHLY,
bymonth = 11,
byweekday = (rrule.TH(-1)),
byhour = 21,
byminute = 0,
cache = True,
dtstart = datetime(2000, 1,1,tzinfo = pytz.utc),
until=datetime(2014, 1, 1, tzinfo = pytz.utc)
dtstart = start,
until = end
)
non_trading_rules.append(thanksgiving)
# Recurrence relation for generating the market open/close for
# christmas. Christmas always occurs on december 25th.
christmas_opens = rrule.rrule(
christmas = rrule.rrule(
rrule.MONTHLY,
bymonth = 12,
bymonthday = 25,
byhour = 14,
byminute = 30,
bymonthday = 25,
cache = True,
dtstart = datetime(2000, 1,1,tzinfo = pytz.utc),
until=datetime(2014, 1, 1, tzinfo = pytz.utc)
)
christmas_closes = rrule.rrule(
rrule.MONTHLY,
bymonth = 12,
bymonthday = 25,
byhour = 21,
byminute = 0,
cache = True,
dtstart = datetime(2000, 1,1,tzinfo = pytz.utc),
until=datetime(2014, 1, 1, tzinfo = pytz.utc)
dtstart = start,
until = end
)
non_trading_rules.append(christmas)
# All NYSE observed holidays.
holiday_opens = [
new_years_opens,
mlk_opens,
presidents_day_opens,
good_friday_opens,
memorial_day_opens,
july_4th_opens,
labor_day_opens,
thanksgiving_opens,
christmas_opens
]
holiday_closes = [
new_years_closes,
mlk_closes,
presidents_day_closes,
good_friday_closes,
memorial_day_closes,
july_4th_closes,
labor_day_closes,
thanksgiving_closes,
christmas_closes
]
non_trading_ruleset = rrule.rruleset()
# Valid market opens are given by all market opens minus holidays.
opens = rrule.rruleset(cache=True)
opens.rrule(market_opens_with_holidays)
for holiday_rule in holiday_opens:
opens.exrule(holiday_rule)
closes = rrule.rruleset(cache=True)
closes.rrule(market_closes_with_holidays)
for holiday_rule in holiday_closes:
closes.exrule(holiday_rule)
# This runs the calendar to load all data into a cache.
open_count = opens.count()
close_count = closes.count()
for rule in non_trading_rules:
non_trading_ruleset.rrule(rule)
non_trading_days = non_trading_ruleset.between(start, end, inc=True)