From 3ad1f250e65609effb8e22ea187488e25f8a4c91 Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Wed, 16 May 2012 14:31:52 -0400 Subject: [PATCH] Cleaned up OOP, first round. --- tests/test_perf_tracking.py | 293 ++++++++++----------- zipline/components/__init__.py | 2 + zipline/components/feed.py | 7 +- zipline/components/merge.py | 22 +- zipline/components/tradesimulation.py | 165 ++++++++++++ zipline/core/component.py | 24 +- zipline/exceptions.py | 5 + zipline/finance/movingaverage.py | 26 +- zipline/finance/performance.py | 108 ++++---- zipline/finance/returns.py | 16 +- zipline/finance/risk.py | 23 +- zipline/finance/trading.py | 365 +++++++------------------- zipline/lines.py | 4 +- zipline/protocol.py | 110 ++++---- zipline/utils/date_utils.py | 26 +- zipline/utils/exception_utils.py | 16 ++ 16 files changed, 608 insertions(+), 604 deletions(-) create mode 100644 zipline/components/tradesimulation.py create mode 100644 zipline/exceptions.py create mode 100644 zipline/utils/exception_utils.py diff --git a/tests/test_perf_tracking.py b/tests/test_perf_tracking.py index e9f5b4d2..06581434 100644 --- a/tests/test_perf_tracking.py +++ b/tests/test_perf_tracking.py @@ -5,21 +5,17 @@ import datetime import pytz import zipline.utils.factory as factory -import zipline.test_algorithms -#import zipline.util as qutil import zipline.finance.performance as perf -import zipline.finance.risk as risk import zipline.protocol as zp -from zipline.finance.trading import TradeSimulationClient, TradingEnvironment, \ - SIMULATION_STYLE + +from zipline.finance.trading import TradingEnvironment class PerformanceTestCase(unittest.TestCase): - + def setUp(self): - #qutil.configure_logging() self.benchmark_returns, self.treasury_curves = \ factory.load_market_data() - + random_index = random.randint( 0, len(self.treasury_curves) @@ -27,32 +23,32 @@ class PerformanceTestCase(unittest.TestCase): for n in range(100): self.dt = self.treasury_curves.keys()[random_index] self.end_dt = self.dt + datetime.timedelta(days=365) - + now = datetime.datetime.utcnow().replace(tzinfo=pytz.utc) - + if self.end_dt <= now: break - + self.trading_environment = TradingEnvironment( - self.benchmark_returns, + self.benchmark_returns, self.treasury_curves, period_start = self.dt, period_end = self.end_dt ) - + self.onesec = datetime.timedelta(seconds=1) self.oneday = datetime.timedelta(days=1) self.tradingday = datetime.timedelta(hours=6, minutes=30) - - + + self.dt = self.trading_environment.trading_days[random_index] - + def tearDown(self): pass - + def test_long_position(self): """ - verify that the performance period calculates properly for a + verify that the performance period calculates properly for a single buy transaction """ #post some trades in the market @@ -63,30 +59,30 @@ class PerformanceTestCase(unittest.TestCase): self.onesec, self.trading_environment ) - + txn = factory.create_txn(1,10.0,100,self.dt + self.onesec) pp = perf.PerformancePeriod({}, 0.0, 1000.0) - + pp.execute_transaction(txn) for trade in trades: pp.update_last_sale(trade) - + pp.calculate_performance() - + self.assertEqual( pp.period_capital_used, -1 * txn.price * txn.amount, "capital used should be equal to the opposite of the transaction \ cost of sole txn in test" ) - + self.assertEqual(len(pp.positions),1,"should be just one position") - + self.assertEqual( pp.positions[1].sid, - txn.sid, + txn.sid, "position should be in security with id 1") - + self.assertEqual( pp.positions[1].amount, txn.amount, @@ -94,13 +90,13 @@ class PerformanceTestCase(unittest.TestCase): sharecount=txn.amount ) ) - + self.assertEqual( pp.positions[1].cost_basis, txn.price, "should have a cost basis of 10" ) - + self.assertEqual( pp.positions[1].last_sale_price, trades[-1]['price'], @@ -110,16 +106,16 @@ class PerformanceTestCase(unittest.TestCase): act=pp.positions[1].last_sale_price ) ) - + self.assertEqual( pp.ending_value, 1100, "ending value should be price of last trade times number of \ shares in position" ) - + self.assertEqual(pp.pnl, 100, "gain of 1 on 100 shares should be 100") - + def test_short_position(self): """verify that the performance period calculates properly for a \ single short-sale transaction""" @@ -130,148 +126,148 @@ single short-sale transaction""" self.onesec, self.trading_environment ) - + trades_1 = trades[:-2] - + txn = factory.create_txn(1, 10.0, -100, self.dt + self.onesec) pp = perf.PerformancePeriod({}, 0.0, 1000.0) - + pp.execute_transaction(txn) for trade in trades_1: pp.update_last_sale(trade) - + pp.calculate_performance() - + self.assertEqual( pp.period_capital_used, -1 * txn.price * txn.amount, "capital used should be equal to the opposite of the transaction\ cost of sole txn in test" ) - + self.assertEqual( len(pp.positions), 1, "should be just one position") - + self.assertEqual( pp.positions[1].sid, - txn.sid, + txn.sid, "position should be in security from the transaction" ) - + self.assertEqual( pp.positions[1].amount, -100, "should have a position of -100 shares" ) - + self.assertEqual( pp.positions[1].cost_basis, txn.price, "should have a cost basis of 10" ) - + self.assertEqual( pp.positions[1].last_sale_price, trades_1[-1]['price'], "last sale should be price of last trade" ) - + self.assertEqual( pp.ending_value, -1100, "ending value should be price of last trade times number of \ shares in position" ) - + self.assertEqual(pp.pnl,-100,"gain of 1 on 100 shares should be 100") - - # simulate additional trades, and ensure that the position value + + # simulate additional trades, and ensure that the position value # reflects the new price trades_2 = trades[-2:] - + #simulate a rollover to a new period pp2 = perf.PerformancePeriod( - pp.positions, - pp.ending_value, + pp.positions, + pp.ending_value, pp.ending_cash ) - + for trade in trades_2: pp2.update_last_sale(trade) - + pp2.calculate_performance() - + self.assertEqual( pp2.period_capital_used, 0, "capital used should be zero, there were no transactions in \ performance period" ) - + self.assertEqual( len(pp2.positions), 1, "should be just one position" ) - + self.assertEqual( pp2.positions[1].sid, - txn.sid, + txn.sid, "position should be in security from the transaction" ) - + self.assertEqual( pp2.positions[1].amount, -100, "should have a position of -100 shares" ) - + self.assertEqual( pp2.positions[1].cost_basis, txn.price, "should have a cost basis of 10" ) - + self.assertEqual( pp2.positions[1].last_sale_price, trades_2[-1].price, "last sale should be price of last trade" ) - + self.assertEqual( pp2.ending_value, -900, "ending value should be price of last trade times number of \ shares in position") - + self.assertEqual( pp2.pnl, 200, "drop of 2 on -100 shares should be 200" ) - + #now run a performance period encompassing the entire trade sample. ppTotal = perf.PerformancePeriod({}, 0.0, 1000.0) - + for trade in trades_1: ppTotal.update_last_sale(trade) - + ppTotal.execute_transaction(txn) - + for trade in trades_2: ppTotal.update_last_sale(trade) - + ppTotal.calculate_performance() - + self.assertEqual( ppTotal.period_capital_used, -1 * txn.price * txn.amount, "capital used should be equal to the opposite of the transaction \ cost of sole txn in test" ) - + self.assertEqual( len(ppTotal.positions), 1, @@ -279,44 +275,44 @@ cost of sole txn in test" ) self.assertEqual( ppTotal.positions[1].sid, - txn.sid, + txn.sid, "position should be in security from the transaction" ) - + self.assertEqual( ppTotal.positions[1].amount, -100, "should have a position of -100 shares" ) - + self.assertEqual( ppTotal.positions[1].cost_basis, txn.price, "should have a cost basis of 10" ) - + self.assertEqual( ppTotal.positions[1].last_sale_price, trades_2[-1].price, "last sale should be price of last trade" ) - + self.assertEqual( ppTotal.ending_value, -900, "ending value should be price of last trade times number of \ shares in position") - + self.assertEqual( ppTotal.pnl, 100, "drop of 1 on -100 shares should be 100" ) - + def test_covering_short(self): """verify performance where short is bought and covered, and shares \ trade after cover""" - + trades = factory.create_trade_history( 1, [10,10,10,11,9,8,7,8,9,10], @@ -324,104 +320,104 @@ trade after cover""" self.onesec, self.trading_environment ) - + short_txn = factory.create_txn( 1, 10.0, -100, self.dt + self.onesec ) - + cover_txn = factory.create_txn(1,7.0,100,self.dt + self.onesec * 6) pp = perf.PerformancePeriod({}, 0.0, 1000.0) - + pp.execute_transaction(short_txn) pp.execute_transaction(cover_txn) - + for trade in trades: pp.update_last_sale(trade) - + pp.calculate_performance() - + short_txn_cost = short_txn.price * short_txn.amount cover_txn_cost = cover_txn.price * cover_txn.amount - + self.assertEqual( pp.period_capital_used, -1 * short_txn_cost - cover_txn_cost, "capital used should be equal to the net transaction costs" ) - + self.assertEqual( len(pp.positions), 1, "should be just one position" ) - + self.assertEqual( pp.positions[1].sid, - short_txn.sid, + short_txn.sid, "position should be in security from the transaction" ) - + self.assertEqual( pp.positions[1].amount, 0, "should have a position of -100 shares" ) - + self.assertEqual( pp.positions[1].cost_basis, 0, "a covered position should have a cost basis of 0" ) - + self.assertEqual( pp.positions[1].last_sale_price, trades[-1].price, "last sale should be price of last trade" ) - + self.assertEqual( pp.ending_value, 0, "ending value should be price of last trade times number of \ shares in position" ) - + self.assertEqual( - pp.pnl, + pp.pnl, 300, "gain of 1 on 100 shares should be 300" ) - + def test_cost_basis_calc(self): trades = factory.create_trade_history( - 1, - [10,11,11,12], - [100,100,100,100], + 1, + [10,11,11,12], + [100,100,100,100], self.onesec, self.trading_environment ) - + transactions = factory.create_txn_history( - 1, - [10,11,11,12], - [100,100,100,100], + 1, + [10,11,11,12], + [100,100,100,100], self.onesec, self.trading_environment ) - + pp = perf.PerformancePeriod({}, 0.0, 1000.0) - + for txn in transactions: pp.execute_transaction(txn) - + for trade in trades: - pp.update_last_sale(trade) - + pp.update_last_sale(trade) + pp.calculate_performance() - + self.assertEqual( pp.positions[1].last_sale_price, trades[-1].price, @@ -429,72 +425,72 @@ shares in position" val=pp.positions[1].last_sale_price ) ) - + self.assertEqual( pp.positions[1].cost_basis, 11, "should have a cost basis of 11" ) - + self.assertEqual( - pp.pnl, + pp.pnl, 400 ) - + saleTxn = factory.create_txn( 1, 10.0, -100, self.dt + self.onesec * 4) - + down_tick = factory.create_trade( 1, 10.0, 100, trades[-1].dt + self.onesec) - - pp2 = perf.PerformancePeriod( - copy.deepcopy(pp.positions), - pp.ending_value, + + pp2 = perf.PerformancePeriod( + copy.deepcopy(pp.positions), + pp.ending_value, pp.ending_cash ) - + pp2.execute_transaction(saleTxn) pp2.update_last_sale(down_tick) - - pp2.calculate_performance() + + pp2.calculate_performance() self.assertEqual( pp2.positions[1].last_sale_price, 10, "should have a last sale of 10, was {val}".format(val=pp2.positions[1].last_sale_price) ) - + self.assertEqual( round(pp2.positions[1].cost_basis,2), 11.33, "should have a cost basis of 11.33" ) - + #print "second period pnl is {pnl}".format(pnl=pp2.pnl) self.assertEqual(pp2.pnl, -800, "this period goes from +400 to -400") - + pp3 = perf.PerformancePeriod({}, 0.0, 1000.0) - + transactions.append(saleTxn) for txn in transactions: pp3.execute_transaction(txn) - + trades.append(down_tick) for trade in trades: pp3.update_last_sale(trade) - + pp3.calculate_performance() self.assertEqual( pp3.positions[1].last_sale_price, 10, "should have a last sale of 10" ) - + self.assertEqual( round(pp3.positions[1].cost_basis,2), 11.33, @@ -502,47 +498,47 @@ shares in position" ) self.assertEqual( - pp3.pnl, - -400, + pp3.pnl, + -400, "should be -400 for all trades and transactions in period" ) def test_tracker(self): - + trade_count = 100 sid = 133 - price = 10.1 + price = 10.1 price_list = [price] * trade_count volume = [100] * trade_count trade_time_increment = datetime.timedelta(days=1) - trade_history = factory.create_trade_history( - sid, - price_list, - volume, - trade_time_increment, - self.trading_environment + trade_history = factory.create_trade_history( + sid, + price_list, + volume, + trade_time_increment, + self.trading_environment ) - + sid2 = 134 price2 = 12.12 - price2_list = [price2] * trade_count - trade_history2 = factory.create_trade_history( - sid2, - price2_list, - volume, - trade_time_increment, - self.trading_environment + price2_list = [price2] * trade_count + trade_history2 = factory.create_trade_history( + sid2, + price2_list, + volume, + trade_time_increment, + self.trading_environment ) - + trade_history.extend(trade_history2) - + self.trading_environment.period_start = trade_history[0].dt self.trading_environment.period_end = trade_history[-1].dt self.trading_environment.capital_base = 1000.0 self.trading_environment.frame_index = ['sid', 'volume', 'dt', \ 'price', 'changed'] perf_tracker = perf.PerformanceTracker(self.trading_environment) - + for event in trade_history: #create a transaction for all but #first trade in each sid, to simulate None transaction @@ -556,14 +552,13 @@ shares in position" }) else: txn = None - event[zp.TRANSFORM_TYPE.TRANSACTION] = txn + event[zp.TRANSFORM_TYPE.TRANSACTION] = txn perf_tracker.process_event(event) - + #we skip two trades, to test case of None transaction txn_count = len(trade_history) - 2 self.assertEqual(perf_tracker.txn_count, txn_count) - + cumulative_pos = perf_tracker.cumulative_performance.positions[sid] expected_size = txn_count / 2 * -25 self.assertEqual(cumulative_pos.amount, expected_size) - diff --git a/zipline/components/__init__.py b/zipline/components/__init__.py index b845f2db..fc570b55 100644 --- a/zipline/components/__init__.py +++ b/zipline/components/__init__.py @@ -2,10 +2,12 @@ from feed import Feed from merge import Merge from passthrough import PassthroughTransform from datasource import DataSource +from tradesimulation import TradeSimulationClient __all__ = [ Feed, Merge, PassthroughTransform, DataSource, + TradeSimulationClient, ] diff --git a/zipline/components/feed.py b/zipline/components/feed.py index bff79e79..191a1cc9 100644 --- a/zipline/components/feed.py +++ b/zipline/components/feed.py @@ -17,9 +17,7 @@ class Feed(Component): context (thread, process, etc) and run in another. """ - def __init__(self): - Component.__init__(self) - + def init(self): self.sent_count = 0 self.received_count = 0 self.draining = False @@ -33,9 +31,6 @@ class Feed(Component): self.sent_counters = Counter() self.recv_counters = Counter() - def init(self): - pass - @property def get_id(self): return "FEED" diff --git a/zipline/components/merge.py b/zipline/components/merge.py index 83694311..2811912e 100644 --- a/zipline/components/merge.py +++ b/zipline/components/merge.py @@ -3,21 +3,25 @@ from feed import Feed import zipline.protocol as zp from zipline.protocol import COMPONENT_TYPE -# TODO: By Liskov merge must *be* a feed, don't believe this is -# the case. +from collections import Counter class Merge(Feed): """ Merges multiple streams of events into single messages. """ - - def __init__(self): - Feed.__init__(self) - - self.init() - def init(self): - pass + self.sent_count = 0 + self.received_count = 0 + self.draining = False + self.ds_finished_counter = 0 + + # Depending on the size of this, might want to use a data + # structure with better asymptotics. + self.data_buffer = {} + + # source_id -> integer count + self.sent_counters = Counter() + self.recv_counters = Counter() @property def get_id(self): diff --git a/zipline/components/tradesimulation.py b/zipline/components/tradesimulation.py new file mode 100644 index 00000000..b6df2922 --- /dev/null +++ b/zipline/components/tradesimulation.py @@ -0,0 +1,165 @@ +import logging +import datetime + +import zipline.protocol as zp +import zipline.finance.performance as perf + +from zipline.core.component import Component +from zipline.finance.trading import TransactionSimulator +from zipline.utils.protocol_utils import ndict + +LOGGER = logging.getLogger('ZiplineLogger') + +class TradeSimulationClient(Component): + + def init(self, trading_environment, sim_style): + self.received_count = 0 + self.prev_dt = None + self.event_queue = None + self.txn_count = 0 + self.order_count = 0 + self.trading_environment = trading_environment + self.current_dt = trading_environment.period_start + self.last_iteration_dur = datetime.timedelta(seconds=0) + self.algorithm = None + self.max_wait = datetime.timedelta(seconds=60) + self.last_msg_dt = datetime.datetime.utcnow() + self.txn_sim = TransactionSimulator(sim_style) + + self.event_data = ndict() + self.perf = perf.PerformanceTracker(self.trading_environment) + + @property + def get_id(self): + return str(zp.FINANCE_COMPONENT.TRADING_CLIENT) + + def set_algorithm(self, algorithm): + """ + :param algorithm: must implement the algorithm protocol. See + :py:mod:`zipline.test.algorithm` + """ + self.algorithm = algorithm + # register the trading_client's order method with the algorithm + self.algorithm.set_order(self.order) + # ask the algorithm to initialize + self.algorithm.initialize() + + def open(self): + self.result_feed = self.connect_result() + + def do_work(self): + # poll all the sockets + socks = dict(self.poll.poll(self.heartbeat_timeout)) + + # see if the poller has results for the result_feed + if socks.get(self.result_feed) == self.zmq.POLLIN: + + self.last_msg_dt = datetime.datetime.utcnow() + + # get the next message from the result feed + msg = self.result_feed.recv() + + # if the feed is done, shut 'er down + if msg == str(zp.CONTROL_PROTOCOL.DONE): + self.finish_simulation() + return + + # result_feed is a merge component, so unframe accordingly + event = zp.MERGE_UNFRAME(msg) + self.received_count += 1 + # update performance and relay the event to the algorithm + self.process_event(event) + if self.perf.exceeded_max_loss: + self.finish_simulation() + + def finish_simulation(self): + LOGGER.info("Client is DONE!") + # signal the performance tracker that the simulation has + # ended. Perf will internally calculate the full risk report. + self.perf.handle_simulation_end() + + # signal Simulator, our ComponentHost, that this component is + # done and Simulator needn't block exit on this component. + self.signal_done() + + def process_event(self, event): + + # generate transactions, if applicable + txn = self.txn_sim.apply_trade_to_open_orders(event) + if txn: + event.TRANSACTION = txn + # track the number of transactions, for testing purposes. + self.txn_count += 1 + else: + event.TRANSACTION = None + + # the performance class needs to process each event, without + # skipping. Algorithm should wait until the performance has been + # updated, so that down stream components can safely assume that + # performance is up to date. Note that this is done before we + # mark the time for the algorithm's processing, thereby not + # running the algo's clock for performance book keeping. + self.perf.process_event(event) + + # mark the start time for client's processing of this event. + event_start = datetime.datetime.utcnow() + + + # queue the event. + self.queue_event(event) + + + # if the event is later than our current time, run the algo + # otherwise, the algorithm has fallen behind the feed + # and processing per event is longer than time between events. + if event.dt >= self.current_dt: + # compress time by moving the current_time up to the event + # time. + self.current_dt = event.dt + self.run_algorithm() + + # tally the time spent on this iteration + self.last_iteration_dur = datetime.datetime.utcnow() - event_start + # move the algorithm's clock forward to include iteration time + self.current_dt = self.current_dt + self.last_iteration_dur + + + def run_algorithm(self): + """ + As per the algorithm protocol: + + - Set the current portfolio for the algorithm as per protocol. + - Construct data based on backlog of events, send to algorithm. + """ + current_portfolio = self.perf.get_portfolio() + self.algorithm.set_portfolio(current_portfolio) + data = self.get_data() + if len(data) > 0: + self.algorithm.handle_data(data) + + def connect_order(self): + return self.connect_push_socket(self.addresses['order_address']) + + def order(self, sid, amount): + order = zp.ndict({ + 'dt':self.current_dt, + 'sid':sid, + 'amount':amount + }) + self.order_count += 1 + self.perf.log_order(order) + self.txn_sim.add_open_order(order) + + def signal_order_done(self): + self.order_socket.send(str(zp.ORDER_PROTOCOL.DONE)) + + def queue_event(self, event): + if self.event_queue == None: + self.event_queue = [] + self.event_queue.append(event) + + def get_data(self): + for event in self.event_queue: + self.event_data[event['sid']] = event + self.event_queue = [] + return self.event_data diff --git a/zipline/core/component.py b/zipline/core/component.py index 479da548..d1f7f490 100644 --- a/zipline/core/component.py +++ b/zipline/core/component.py @@ -24,6 +24,8 @@ from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE, \ LOGGER = logging.getLogger('ZiplineLogger') +from zipline.exceptions import ComponentNoInit + class Component(object): """ Base class for components. Defines the the base messaging @@ -64,7 +66,11 @@ class Component(object): """ - def __init__(self): + # ------------ + # Construction + # ------------ + + def __init__(self, *args, **kwargs): self.zmq = None self.context = None self.addresses = None @@ -90,14 +96,16 @@ class Component(object): self.guid = uuid.uuid4() self.huid = humanhash.humanize(self.guid.hex) - self.init() + # This is where component specific constructors should be + # defined. Arguments passed to init are threaded through. + self.init(*args, **kwargs) def init(self): """ Subclasses should override this to extend the setup for the class. Shouldn't have side effects. """ - pass + raise ComponentNoInit(self.__class__) # ------------ @@ -515,14 +523,6 @@ class Component(object): """ return False - def note(self): - """ - Information about the component. Mostly used for testing. - """ - - def get_note(self): - return self.note or '' - def debug(self): """ Debug information about the component. @@ -552,7 +552,7 @@ class Component(object): return "<{name} {uuid} at {host} {pid} {pointer}>".format( name = self.get_id , - uuid = self.huid , + uuid = self.guid , host = socket.gethostname() , pid = os.getpid() , pointer = hex(id(self)) , diff --git a/zipline/exceptions.py b/zipline/exceptions.py new file mode 100644 index 00000000..0c05a67e --- /dev/null +++ b/zipline/exceptions.py @@ -0,0 +1,5 @@ +from utils.exception_utils import CustomException + +class ComponentNoInit(CustomException): + argmap = ('classname',) + message = """Class {classname} does not define an init method.""" diff --git a/zipline/finance/movingaverage.py b/zipline/finance/movingaverage.py index 349a6638..bdd3bfc8 100644 --- a/zipline/finance/movingaverage.py +++ b/zipline/finance/movingaverage.py @@ -4,35 +4,35 @@ from collections import defaultdict from zipline.transforms.base import BaseTransform class MovingAverageTransform(BaseTransform): - + def init(self, daycount=3): self.daycount = daycount self.by_sid = defaultdict(self._create) - + def transform(self, event): cur = self.by_sid[event.sid] cur.update(event) self.state['value'] = cur.average return self.state - + def _create(self): return MovingAverage(self.daycount) class MovingAverage(object): - + def __init__(self, daycount): self.window = EventWindow(daycount) self.total = 0.0 self.average = 0.0 - + def update(self, event): self.window.update(event) - + self.total += event.price - + for dropped in self.window.dropped_ticks: self.total -= dropped.price - + if len(self.window.ticks) > 0: self.average = self.total / len(self.window.ticks) else: @@ -47,21 +47,19 @@ class EventWindow(object): self.ticks = [] self.dropped_ticks = [] self.delta = timedelta(days=daycount) - + def update(self, event): # add new event - self.ticks.append(event) + self.ticks.append(event) # determine which events are expired last_date = event['dt'] first_date = last_date - self.delta - + self.dropped_ticks = [] for tick in self.ticks: if tick['dt'] <= first_date: self.dropped_ticks.append(tick) # remove the expired events - slice_index = len(self.dropped_ticks) + slice_index = len(self.dropped_ticks) self.ticks = self.ticks[slice_index:] - - diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index 6bf70058..d72c229d 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -130,7 +130,7 @@ class PerformanceTracker(): Tracks the performance of the zipline as it is running in the simulator, relays this out to the Deluge broker and then to the client. Visually: - + +--------------------+ Result Stream +--------+ | PerformanceTracker | ----------------> | Deluge | +--------------------+ +--------+ @@ -138,8 +138,8 @@ class PerformanceTracker(): """ def __init__(self, trading_environment): - - + + self.trading_environment = trading_environment self.trading_day = datetime.timedelta(hours = 6, minutes = 30) self.calendar_day = datetime.timedelta(hours = 24) @@ -152,7 +152,7 @@ class PerformanceTracker(): self.progress = 0.0 self.total_days = self.trading_environment.days_in_period # one indexed so that we reach 100% - self.day_count = 0.0 + self.day_count = 0.0 self.capital_base = self.trading_environment.capital_base self.returns = [] self.txn_count = 0 @@ -174,7 +174,7 @@ class PerformanceTracker(): self.period_start, self.period_end ) - + # this performance period will span just the current market day self.todays_performance = PerformancePeriod( # initial positions are empty @@ -220,17 +220,17 @@ class PerformanceTracker(): 'capital_base' : self.capital_base, 'cumulative_perf' : self.cumulative_performance.to_dict(), 'daily_perf' : self.todays_performance.to_dict(), - 'cumulative_risk_metrics' : self.cumulative_risk_metrics.to_dict() + 'cumulative_risk_metrics' : self.cumulative_risk_metrics.to_dict() } - + def log_order(self, order): self.order_log.append(order) - + def process_event(self, event): - + if self.exceeded_max_loss: return - + assert isinstance(event, zp.ndict) self.event_count += 1 @@ -241,7 +241,7 @@ class PerformanceTracker(): self.txn_count += 1 self.cumulative_performance.execute_transaction(event.TRANSACTION) self.todays_performance.execute_transaction(event.TRANSACTION) - + #update last sale self.cumulative_performance.update_last_sale(event) self.todays_performance.update_last_sale(event) @@ -251,7 +251,7 @@ class PerformanceTracker(): #calculate performance as of last trade self.cumulative_performance.calculate_performance() self.todays_performance.calculate_performance() - + # add the return results from today to the list of DailyReturn objects. todays_date = self.market_close.replace(hour=0, minute=0, second=0) todays_return_obj = risk.DailyReturn( @@ -267,17 +267,17 @@ class PerformanceTracker(): returns=self.returns, trading_environment=self.trading_environment ) - + # increment the day counter before we move markers forward. self.day_count += 1.0 # calculate progress of test self.progress = self.day_count / self.total_days - + # Output results if self.result_stream: msg = zp.PERF_FRAME(self.to_dict()) self.result_stream.send(msg) - + # if self.trading_environment.max_drawdown: returns = self.todays_performance.returns @@ -285,13 +285,13 @@ class PerformanceTracker(): if returns < max_dd: LOGGER.info(str(returns) + " broke through " + str(max_dd)) LOGGER.info("Exceeded max drawdown.") - # mark the perf period with max loss flag, + # mark the perf period with max loss flag, # so it shows up in the update, but don't end the test # here. Let the update go out before stopping self.exceeded_max_loss = True return - - + + #move the market day markers forward self.market_open = self.market_open + self.calendar_day @@ -301,7 +301,7 @@ class PerformanceTracker(): self.market_open = self.market_open + self.calendar_day self.market_close = self.market_open + self.trading_day - + # Roll over positions to current day. self.todays_performance = PerformancePeriod( self.todays_performance.positions, @@ -317,27 +317,27 @@ class PerformanceTracker(): When the simulation is complete, run the full period risk report and send it out on the result_stream. """ - + log_msg = "Simulated {n} trading days out of {m}." LOGGER.info(log_msg.format(n=self.day_count, m=self.total_days)) LOGGER.info("first open: {d}".format(d=self.trading_environment.first_open)) - + # the stream will end on the last trading day, but will not trigger # an end of day, so we trigger the final market close here. # In the case of max drawdown, we needn't close again. if not self.exceeded_max_loss: self.handle_market_close() - + self.risk_report = risk.RiskReport( self.returns, self.trading_environment, exceeded_max_loss = self.exceeded_max_loss ) - + if self.result_stream: LOGGER.info("about to stream the risk report...") risk_dict = self.risk_report.to_dict() - + msg = zp.RISK_FRAME(risk_dict) self.result_stream.send(msg) # this signals that the simulation is complete. @@ -399,17 +399,17 @@ class Position(): class PerformancePeriod(): def __init__( - self, - initial_positions, - starting_value, + self, + initial_positions, + starting_value, starting_cash, period_open=None, - period_close=None, + period_close=None, keep_transactions=False): - + self.period_open = period_open self.period_close = period_close - + self.ending_value = 0.0 self.period_capital_used = 0.0 self.pnl = 0.0 @@ -424,7 +424,7 @@ class PerformancePeriod(): self.cumulative_capital_used = 0.0 self.max_capital_used = 0.0 self.max_leverage = 0.0 - + self.calculate_performance() def calculate_performance(self): @@ -441,39 +441,39 @@ class PerformancePeriod(): self.returns = 0.0 def execute_transaction(self, txn): - + # Update Position # ---------------- if(not self.positions.has_key(txn.sid)): self.positions[txn.sid] = Position(txn.sid) self.positions[txn.sid].update(txn) self.period_capital_used += -1 * txn.price * txn.amount - + # Max Leverage # --------------- # Calculate the maximum capital used and maximum leverage - + transaction_cost = txn.price * txn.amount self.cumulative_capital_used += transaction_cost if math.fabs(self.cumulative_capital_used) > self.max_capital_used: self.max_capital_used = math.fabs(self.cumulative_capital_used) - + # We want to conveye a level, rather than a precise figure. # round to the nearest 5,000 to keep the number easy on the eyes self.max_capital_used = self.round_to_nearest( self.max_capital_used, base=5000 ) - + # we're adding a 10% cushion to the capital used. self.max_leverage = 1.1 * self.max_capital_used / self.starting_cash - - # add transaction to the list of processed transactions + + # add transaction to the list of processed transactions if self.keep_transactions: self.processed_transactions.append(txn) - + def round_to_nearest(self, x, base=5): return int(base * round(float(x)/base)) @@ -491,7 +491,7 @@ class PerformancePeriod(): def to_dict(self): """ - Creates a dictionary representing the state of this performance + Creates a dictionary representing the state of this performance period. See header comments for a detailed description. """ positions = self.get_positions_list() @@ -514,24 +514,24 @@ class PerformancePeriod(): 'period_open' : self.period_open, 'period_close' : self.period_close } - + # we want the key to be absent, not just empty if not self.keep_transactions: - del(rval['transactions']) - + del rval['transactions'] + return rval - + def to_ndict(self): """ Creates a ndict representing the state of this perfomance period. Properties are the same as the results of to_dict. See header comments - for a detailed description. - + for a detailed description. + """ positions = self.get_positions(ndicted=True) - + positions = zp.ndict(positions) - + return zp.ndict({ 'ending_value' : self.ending_value, 'capital_used' : self.period_capital_used, @@ -540,11 +540,11 @@ class PerformancePeriod(): 'ending_cash' : self.ending_cash, 'cumulative_capital_used' : self.cumulative_capital_used, 'max_capital_used' : self.max_capital_used, - 'max_leverage' : self.max_leverage, + 'max_leverage' : self.max_leverage, 'positions' : positions, 'transactions' : self.processed_transactions }) - + def get_positions(self, ndicted=False): positions = {} for sid, pos in self.positions.iteritems(): @@ -553,9 +553,9 @@ class PerformancePeriod(): positions[sid] = zp.ndict(cur) else: positions[sid] = cur - + return positions - + # def get_positions_list(self): positions = [] @@ -563,7 +563,3 @@ class PerformancePeriod(): cur = pos.to_dict() positions.append(cur) return positions - - - - diff --git a/zipline/finance/returns.py b/zipline/finance/returns.py index 5e031f15..3077d515 100644 --- a/zipline/finance/returns.py +++ b/zipline/finance/returns.py @@ -2,16 +2,16 @@ from collections import defaultdict from zipline.transforms.base import BaseTransform class ReturnsTransform(BaseTransform): - + def init(self): self.by_sid = defaultdict(self._create) - + def transform(self, event): cur = self.by_sid[event.sid] cur.update(event) self.state['value'] = cur.returns return self.state - + def _create(self): return ReturnsFromPriorClose() @@ -20,24 +20,24 @@ class ReturnsFromPriorClose(object): Calculates a security's returns since the previous close, using the current price. """ - + def __init__(self): self.last_close = None self.last_event = None self.returns = 0.0 - + def update(self, event): next_close = None if self.last_close: change = event.price - self.last_close.price self.returns = change / self.last_close.price - + if self.last_event: if self.last_event.dt.day != event.dt.day: # the current event is from the day after # the last event. Therefore the last event was # the last close self.last_close = self.last_event - + # the current event is now the last_event - self.last_event = event \ No newline at end of file + self.last_event = event diff --git a/zipline/finance/risk.py b/zipline/finance/risk.py index c68324bd..f54fcab9 100644 --- a/zipline/finance/risk.py +++ b/zipline/finance/risk.py @@ -39,7 +39,6 @@ Risk Report import logging import datetime import math -import pytz import numpy as np import numpy.linalg as la import zipline.protocol as zp @@ -65,7 +64,7 @@ def advance_by_months(dt, jump_in_months): class DailyReturn(): def __init__(self, date, returns): - + assert isinstance(date, datetime.datetime) self.date = date.replace(hour=0, minute=0, second=0) self.returns = returns @@ -83,18 +82,18 @@ class DailyReturn(): class RiskMetrics(): def __init__(self, start_date, end_date, returns, trading_environment): - self.treasury_curves = trading_environment.treasury_curves + self.treasury_curves = trading_environment.treasury_curves self.start_date = start_date self.end_date = end_date self.trading_environment = trading_environment self.algorithm_period_returns, self.algorithm_returns = \ self.calculate_period_returns(returns) - + benchmark_returns = [ x for x in self.trading_environment.benchmark_returns if x.date >= returns[0].date and x.date <= returns[-1].date ] - + self.benchmark_period_returns, self.benchmark_returns = \ self.calculate_period_returns(benchmark_returns) @@ -196,7 +195,7 @@ class RiskMetrics(): """ if self.algorithm_volatility == 0: return 0.0 - + return ( (self.algorithm_period_returns - self.treasury_period_return) / self.algorithm_volatility ) @@ -311,15 +310,15 @@ class RiskMetrics(): that date doesn't exceed treasury history range." message = message.format(dt=self.end_date,term=self.treasury_duration) raise Exception(message) - + class RiskReport(): def __init__( - self, - algorithm_returns, - trading_environment, + self, + algorithm_returns, + trading_environment, exceeded_max_loss=False): """ algorithm_returns needs to be a list of daily_return objects @@ -336,7 +335,7 @@ class RiskReport(): else: start_date = self.algorithm_returns[0].date end_date = self.algorithm_returns[-1].date - + self.month_periods = self.periodsInRange(1, start_date, end_date) self.three_month_periods = self.periodsInRange(3, start_date, end_date) self.six_month_periods = self.periodsInRange(6, start_date, end_date) @@ -370,7 +369,7 @@ class RiskReport(): ends = [] cur_start = start.replace(day=1) - # in edge cases (all sids filtered out, start/end are adjacent) + # in edge cases (all sids filtered out, start/end are adjacent) # a test will not generate any returns data if len(self.algorithm_returns) == 0: return ends diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index 0efafdc4..6b5ac1f4 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -1,198 +1,23 @@ -import logging -import datetime import pytz import math -import time +import logging +import datetime -from collections import Counter - -# from gevent.select import select - -from zipline.core import Component import zipline.protocol as zp -import zipline.finance.performance as perf - -from zipline.utils.protocol_utils import Enum, ndict - -# the simulation style enumerates the available transaction simulation -# strategies. -SIMULATION_STYLE = Enum( - 'PARTIAL_VOLUME', - 'BUY_ALL', - 'FIXED_SLIPPAGE', - 'NOOP' -) +from zipline.protocol import SIMULATION_STYLE LOGGER = logging.getLogger('ZiplineLogger') -class TradeSimulationClient(Component): - - def __init__(self, trading_environment, sim_style): - Component.__init__(self) - self.received_count = 0 - self.prev_dt = None - self.event_queue = None - self.txn_count = 0 - self.order_count = 0 - self.trading_environment = trading_environment - self.current_dt = trading_environment.period_start - self.last_iteration_dur = datetime.timedelta(seconds=0) - self.algorithm = None - self.max_wait = datetime.timedelta(seconds=60) - self.last_msg_dt = datetime.datetime.utcnow() - self.txn_sim = TransactionSimulator(sim_style) - - self.event_data = ndict() - self.perf = perf.PerformanceTracker(self.trading_environment) - - @property - def get_id(self): - return str(zp.FINANCE_COMPONENT.TRADING_CLIENT) - - def set_algorithm(self, algorithm): - """ - :param algorithm: must implement the algorithm protocol. See - :py:mod:`zipline.test.algorithm` - """ - self.algorithm = algorithm - # register the trading_client's order method with the algorithm - self.algorithm.set_order(self.order) - # ask the algorithm to initialize - self.algorithm.initialize() - - def open(self): - self.result_feed = self.connect_result() - - def do_work(self): - # poll all the sockets - socks = dict(self.poll.poll(self.heartbeat_timeout)) - - # see if the poller has results for the result_feed - if self.result_feed in socks and \ - socks[self.result_feed] == self.zmq.POLLIN: - - self.last_msg_dt = datetime.datetime.utcnow() - - # get the next message from the result feed - msg = self.result_feed.recv() - - # if the feed is done, shut 'er down - if msg == str(zp.CONTROL_PROTOCOL.DONE): - self.finish_simulation() - return - - # result_feed is a merge component, so unframe accordingly - event = zp.MERGE_UNFRAME(msg) - self.received_count += 1 - # update performance and relay the event to the algorithm - self.process_event(event) - if self.perf.exceeded_max_loss: - self.finish_simulation() - - def finish_simulation(self): - LOGGER.info("Client is DONE!") - # signal the performance tracker that the simulation has - # ended. Perf will internally calculate the full risk report. - self.perf.handle_simulation_end() - - # signal Simulator, our ComponentHost, that this component is - # done and Simulator needn't block exit on this component. - self.signal_done() - - def process_event(self, event): - - - # generate transactions, if applicable - txn = self.txn_sim.apply_trade_to_open_orders(event) - if txn: - event.TRANSACTION = txn - # track the number of transactions, for testing purposes. - self.txn_count += 1 - else: - event.TRANSACTION = None - - # the performance class needs to process each event, without - # skipping. Algorithm should wait until the performance has been - # updated, so that down stream components can safely assume that - # performance is up to date. Note that this is done before we - # mark the time for the algorithm's processing, thereby not - # running the algo's clock for performance book keeping. - self.perf.process_event(event) - - # mark the start time for client's processing of this event. - event_start = datetime.datetime.utcnow() - - - # queue the event. - self.queue_event(event) - - - # if the event is later than our current time, run the algo - # otherwise, the algorithm has fallen behind the feed - # and processing per event is longer than time between events. - if event.dt >= self.current_dt: - # compress time by moving the current_time up to the event - # time. - self.current_dt = event.dt - self.run_algorithm() - - # tally the time spent on this iteration - self.last_iteration_dur = datetime.datetime.utcnow() - event_start - # move the algorithm's clock forward to include iteration time - self.current_dt = self.current_dt + self.last_iteration_dur - - - def run_algorithm(self): - """ - As per the algorithm protocol: - - - Set the current portfolio for the algorithm as per protocol. - - Construct data based on backlog of events, send to algorithm. - """ - current_portfolio = self.perf.get_portfolio() - self.algorithm.set_portfolio(current_portfolio) - data = self.get_data() - if len(data) > 0: - self.algorithm.handle_data(data) - - def connect_order(self): - return self.connect_push_socket(self.addresses['order_address']) - - def order(self, sid, amount): - order = zp.ndict({ - 'dt':self.current_dt, - 'sid':sid, - 'amount':amount - }) - self.order_count += 1 - self.perf.log_order(order) - self.txn_sim.add_open_order(order) - - def signal_order_done(self): - self.order_socket.send(str(zp.ORDER_PROTOCOL.DONE)) - - def queue_event(self, event): - if self.event_queue == None: - self.event_queue = [] - self.event_queue.append(event) - - def get_data(self): - for event in self.event_queue: - self.event_data[event['sid']] = event - self.event_queue = [] - return self.event_data - - class TransactionSimulator(object): - - def __init__(self, style=SIMULATION_STYLE.PARTIAL_VOLUME): + + def __init__(self, style=SIMULATION_STYLE.PARTIAL_VOLUME): self.open_orders = {} self.order_count = 0 self.txn_count = 0 self.trade_window = datetime.timedelta(seconds=30) self.orderTTL = datetime.timedelta(days=1) self.commission = 0.03 - + if not style or style == SIMULATION_STYLE.PARTIAL_VOLUME: self.apply_trade_to_open_orders = self.simulate_with_partial_volume elif style == SIMULATION_STYLE.BUY_ALL: @@ -201,83 +26,82 @@ class TransactionSimulator(object): self.apply_trade_to_open_orders = self.simulate_with_fixed_cost elif style == SIMULATION_STYLE.NOOP: self.apply_trade_to_open_orders = self.simulate_noop - + def add_open_order(self, event): - """Orders are captured in a buffer by sid. No calculations are done here. - Amount is explicitly converted to an int. - Orders of amount zero are ignored. - """ + # Orders are captured in a buffer by sid. No calculations are done here. + # Amount is explicitly converted to an int. + # Orders of amount zero are ignored. + self.order_count += 1 - event.amount = int(event.amount) + if event.amount == 0: log = "requested to trade zero shares of {sid}".format( sid=event.sid ) LOGGER.debug(log) return - - if(not self.open_orders.has_key(event.sid)): + + if not self.open_orders.has_key(event.sid): self.open_orders[event.sid] = [] - + # set the filled property to zero event.filled = 0 self.open_orders[event.sid].append(event) - + def simulate_buy_all(self, event): txn = self.create_transaction( - event.sid, - event.volume, - event.price, - event.dt, - 1 - ) + event.sid, + event.volume, + event.price, + event.dt, + 1 + ) return txn - + def simulate_noop(self, event): - return None - + return None + def simulate_with_fixed_cost(self, event): if self.open_orders.has_key(event.sid): - orders = self.open_orders[event.sid] + orders = self.open_orders[event.sid] orders = sorted(orders, key=lambda o: o.dt) else: return None - + amount = 0 for order in orders: amount += order.amount - + if(amount == 0): return - + direction = amount / math.fabs(amount) - - + txn = self.create_transaction( - event.sid, - amount, - event.price + 0.10, - event.dt, - direction - ) - + event.sid, + amount, + event.price + 0.10, + event.dt, + direction + ) + self.open_orders[event.sid] = [] - + return txn - + def simulate_with_partial_volume(self, event): if(event.volume == 0): - #there are zero volume events bc some stocks trade + #there are zero volume events bc some stocks trade #less frequently than once per minute. return None - + if self.open_orders.has_key(event.sid): - orders = self.open_orders[event.sid] + orders = self.open_orders[event.sid] orders = sorted(orders, key=lambda o: o.dt) else: return None - + dt = event.dt expired = [] total_order = 0 @@ -285,87 +109,87 @@ class TransactionSimulator(object): simulated_impact = 0.0 direction = 1.0 for order in orders: - + if(order.dt < event.dt): - + # orders are only good on the day they are issued if order.dt.day < event.dt.day: continue - + open_amount = order.amount - order.filled - + if(open_amount != 0): direction = open_amount / math.fabs(open_amount) else: direction = 1 - + desired_order = total_order + open_amount - + volume_share = direction * (desired_order) / event.volume if volume_share > .25: volume_share = .25 simulated_amount = int(volume_share * event.volume * direction) simulated_impact = (volume_share)**2 * .1 * direction * event.price - + order.filled += (simulated_amount - total_order) total_order = simulated_amount - + # we cap the volume share at 25% of a trade if volume_share == .25: break - + orders = [ x for x in orders if abs(x.amount - x.filled) > 0 and x.dt.day >= event.dt.day] - + self.open_orders[event.sid] = orders - - + + if simulated_amount != 0: return self.create_transaction( - event.sid, - simulated_amount, - event.price + simulated_impact, - dt.replace(tzinfo = pytz.utc), + event.sid, + simulated_amount, + event.price + simulated_impact, + dt.replace(tzinfo = pytz.utc), direction ) elif len(orders) > 0: warning = """ -Calculated a zero volume transaction on trade: -{event} -for orders: +Calculated a zero volume transaction on trade: +{event} +for orders: {orders} """ warning = warning.format( - event=str(event), + event=str(event), orders=str(orders) ) LOGGER.warn(warning) return None - - - def create_transaction(self, sid, amount, price, dt, direction): - self.txn_count += 1 - txn = {'sid' : sid, - 'amount' : int(amount), - 'dt' : dt, - 'price' : price, + + + def create_transaction(self, sid, amount, price, dt, direction): + self.txn_count += 1 + txn = {'sid' : sid, + 'amount' : int(amount), + 'dt' : dt, + 'price' : price, 'commission' : self.commission * amount * direction, 'source_id' : zp.FINANCE_COMPONENT.TRANSACTION_SIM } - return zp.ndict(txn) - + return zp.ndict(txn) + class TradingEnvironment(object): def __init__( - self, - benchmark_returns, - treasury_curves, - period_start = None, - period_end = None, + self, + benchmark_returns, + treasury_curves, + period_start = None, + period_end = None, capital_base = None, max_drawdown = None ): - + self.trading_days = [] self.trading_day_map = {} self.treasury_curves = treasury_curves @@ -375,11 +199,11 @@ class TradingEnvironment(object): self.capital_base = capital_base self.period_trading_days = None self.max_drawdown = max_drawdown - + for bm in benchmark_returns: self.trading_days.append(bm.date) self.trading_day_map[bm.date] = bm - + self.first_open = self.calculate_first_open() self.last_close = self.calculate_last_close() @@ -389,25 +213,25 @@ class TradingEnvironment(object): """ first_open = self.period_start one_day = datetime.timedelta(days=1) - + while not self.is_trading_day(first_open): first_open = first_open + one_day first_open = self.set_NYSE_time(first_open, 9, 30) return first_open - + def calculate_last_close(self): """ Finds the last trading day on or before self.period_end """ last_close = self.period_end one_day = datetime.timedelta(days=1) - + while not self.is_trading_day(last_close): last_close = last_close - one_day - + last_close = self.set_NYSE_time(last_close, 16, 00) - + return last_close #TODO: add other exchanges and timezones... @@ -432,13 +256,13 @@ class TradingEnvironment(object): day=test_date.day, tzinfo=pytz.utc ) - + @property def days_in_period(self): """return the number of trading days within the period [start, end)""" assert(self.period_start != None) assert(self.period_end != None) - + if self.period_trading_days == None: self.period_trading_days = [] for date in self.trading_days: @@ -446,18 +270,18 @@ class TradingEnvironment(object): break if date >= self.period_start: self.period_trading_days.append(date) - - + + return len(self.period_trading_days) - + def is_market_hours(self, test_date): if not self.is_trading_day(test_date): return False - + mkt_open = self.set_NYSE_time(test_date, 9, 30) #TODO: half days? mkt_close = self.set_NYSE_time(test_date, 16, 00) - + return test_date >= mkt_open and test_date <= mkt_close def is_trading_day(self, test_date): @@ -470,6 +294,3 @@ class TradingEnvironment(object): return self.trading_day_map[date].returns else: return 0.0 - - - diff --git a/zipline/lines.py b/zipline/lines.py index 7bfea1c5..b606c458 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -68,7 +68,7 @@ from zipline.components import DataSource from zipline.transforms import BaseTransform from zipline.test_algorithms import TestAlgorithm -from zipline.finance.trading import TradeSimulationClient +from zipline.components import TradeSimulationClient from zipline.core.devsimulator import Simulator from zipline.core.monitor import Controller from zipline.finance.trading import SIMULATION_STYLE @@ -335,7 +335,7 @@ class SimulatedTrading(object): #self.allocator.reaquire(*self.leased_sockets) #-------------------------------- - # Component property accessors + # Component property accessors #-------------------------------- def get_positions(self): diff --git a/zipline/protocol.py b/zipline/protocol.py index 374f2fda..8db194cb 100644 --- a/zipline/protocol.py +++ b/zipline/protocol.py @@ -120,7 +120,7 @@ import datetime import pytz from collections import namedtuple -from utils.protocol_utils import Enum, FrameExceptionFactory, ndict +from utils.protocol_utils import Enum, FrameExceptionFactory, ndict, namelookup from utils.date_utils import EPOCH, UN_EPOCH # ----------------------- @@ -217,39 +217,39 @@ def DATASOURCE_FRAME(event): """ Wraps any datasource payload with id and type, so that unpacking may choose the write UNFRAME for the payload. - + :param event: ndict with following properties - *ds_id* an identifier that is unique to the datasource in the context of a component host (e.g. Simulator) - *ds_type* a string denoting the datasource type. Must be on of: - + - TRADE - (others to follow soon) - + - *payload* a msgpack string carrying the payload for the frame """ assert isinstance(event.source_id, basestring) assert isinstance(event.type, int), 'Unexpected type %s' % (event.type) - + #datasources will send sometimes send empty msgs to feel gaps if len(event.keys()) == 2: return msgpack.dumps(tuple([ - event.type, - event.source_id, + event.type, + event.source_id, DATASOURCE_TYPE.EMPTY ])) if(event.type == DATASOURCE_TYPE.TRADE): return msgpack.dumps(tuple([ - event.type, - event.source_id, + event.type, + event.source_id, TRADE_FRAME(event) ])) elif(event.type == DATASOURCE_TYPE.ORDER): return msgpack.dumps(tuple([ - event.type, - event.source_id, + event.type, + event.source_id, ORDER_SOURCE_FRAME(event) ])) else: @@ -376,7 +376,7 @@ INVALID_MERGE_FRAME = FrameExceptionFactory('MERGE') def MERGE_FRAME(event): """ :param event: a nameddict with at least: - + - source_id - type """ @@ -416,7 +416,7 @@ INVALID_ORDER_FRAME = FrameExceptionFactory('ORDER') INVALID_TRADE_FRAME = FrameExceptionFactory('TRADE') # ----------------------- -# Trades +# Trades # ----------------------- # # - Should only be called from inside DATASOURCE_ (UN)FRAME. @@ -424,7 +424,7 @@ INVALID_TRADE_FRAME = FrameExceptionFactory('TRADE') def TRADE_FRAME(event): """ :param event: should be a ndict with: - + - ds_id -- the datasource id sending this trade out - sid -- the security id - price -- float of the price printed for the trade @@ -469,7 +469,7 @@ def TRADE_UNFRAME(msg): raise INVALID_TRADE_FRAME(msg) # ----------------------- -# Orders +# Orders # ----------------------- # - from client to order source @@ -478,7 +478,7 @@ def ORDER_FRAME(order): assert isinstance(order.amount, int) #no partial shares... PACK_DATE(order) return msgpack.dumps(tuple([ - order.sid, + order.sid, order.amount, order.dt ])) @@ -503,9 +503,9 @@ def ORDER_UNFRAME(msg): # ----------------------- -# TRANSACTIONS +# TRANSACTIONS # ----------------------- -# +# # - Should only be called from inside TRANSFORM_(UN)FRAME. @@ -550,7 +550,7 @@ def TRANSACTION_UNFRAME(msg): # ----------------------- -# ORDERS +# ORDERS # ----------------------- # # - from order source to feed @@ -592,7 +592,7 @@ def ORDER_SOURCE_UNFRAME(msg): raise INVALID_ORDER_FRAME(msg) except ValueError: raise INVALID_ORDER_FRAME(msg) - + # ----------------------- # Performance and Risk # ----------------------- @@ -607,21 +607,21 @@ def PERF_FRAME(perf): :param perf: the dictionary created by zipline.trade_client.perf :rvalue: a msgpack string """ - + #TODO: add asserts... - + assert isinstance(perf['started_at'], datetime.datetime) assert isinstance(perf['period_start'], datetime.datetime) assert isinstance(perf['period_end'], datetime.datetime) - + assert isinstance(perf['daily_perf'], dict) assert isinstance(perf['cumulative_perf'], dict) - + tp = perf['daily_perf'] cp = perf['cumulative_perf'] - + assert isinstance(tp['transactions'], list) - # we never want to send transactions for the cumulative period. + # we never want to send transactions for the cumulative period. # performance.py should never send them, but just to be safe: assert not cp.has_key('transactions') assert isinstance(tp['positions'], list) @@ -630,7 +630,7 @@ def PERF_FRAME(perf): assert isinstance(tp['period_open'], datetime.datetime) assert isinstance(cp['period_close'], datetime.datetime) assert isinstance(cp['period_open'], datetime.datetime) - + perf['started_at'] = EPOCH(perf['started_at']) perf['period_start'] = EPOCH(perf['period_start']) perf['period_end'] = EPOCH(perf['period_end']) @@ -638,11 +638,11 @@ def PERF_FRAME(perf): tp['period_open'] = EPOCH(tp['period_open']) cp['period_close'] = EPOCH(cp['period_close']) cp['period_open'] = EPOCH(cp['period_open']) - + tp['transactions'] = convert_transactions(tp['transactions']) return BT_UPDATE_FRAME('PERF', perf) - + def convert_transactions(transactions): results = [] for txn in transactions: @@ -651,18 +651,18 @@ def convert_transactions(transactions): del(txn['source_id']) results.append(txn) return results - + def RISK_FRAME(risk): return BT_UPDATE_FRAME('RISK', risk) - + def BT_UPDATE_FRAME(prefix, payload): """ - Frames prepared by RISK_FRAME and PERF_FRAME methods are sent via the same + Frames prepared by RISK_FRAME and PERF_FRAME methods are sent via the same socket. This method provides a prefix to allow for muxing the messages onto a single socket. """ return msgpack.dumps(tuple([prefix, payload])) - + def BT_UPDATE_UNFRAME(msg): """ Risk and Perf framing methods prefix the payload with @@ -675,23 +675,23 @@ def BT_UPDATE_UNFRAME(msg): # ----------------------- # Date Helpers # ----------------------- - + def PACK_DATE(event): """ Packs the datetime property of event into msgpack'able longs. - This function should be called purely for its side effects. + This function should be called purely for its side effects. The event's 'dt' property is replaced by a tuple of integers - + - year, month, day, hour, minute, second, microsecond - - PACK_DATE and UNPACK_DATE are inverse operations. - + + PACK_DATE and UNPACK_DATE are inverse operations. + :param event: event must a ndict with a property named 'dt' that is a datetime. :rtype: None """ assert isinstance(event.dt, datetime.datetime) # utc only please - assert event.dt.tzinfo == pytz.utc + assert event.dt.tzinfo == pytz.utc event['dt'] = date_to_tuple(event['dt']) def date_to_tuple(dt): @@ -702,18 +702,18 @@ def date_to_tuple(dt): def UNPACK_DATE(event): """ Unpacks the datetime property of event from msgpack'able longs. - This function should be called purely for its side effects. - The event's 'dt' property is converted to a datetime by reading and then + This function should be called purely for its side effects. + The event's 'dt' property is converted to a datetime by reading and then combining a tuple of integers. - - UNPACK_DATE and PACK_DATE are inverse operations. - + + UNPACK_DATE and PACK_DATE are inverse operations. + :param tuple event: event must a ndict with: - + - a property named 'dt_tuple' that is a tuple of integers \ - representing the date and time in UTC. + representing the date and time in UTC. - dt_tuple must have year, month, day, hour, minute, second, and microsecond - + :rtype: None """ assert isinstance(event.dt, tuple) @@ -721,13 +721,13 @@ def UNPACK_DATE(event): for item in event.dt: assert isinstance(item, numbers.Integral) event.dt = tuple_to_date(event.dt) - + def tuple_to_date(date_tuple): year, month, day, hour, minute, second, micros = date_tuple dt = datetime.datetime(year, month, day, hour, minute, second) dt = dt.replace(microsecond = micros, tzinfo = pytz.utc) return dt - + DATASOURCE_TYPE = Enum( 'ORDER', 'TRADE', @@ -748,7 +748,7 @@ TRANSFORM_TYPE = ndict({ }) -FINANCE_COMPONENT = ndict({ +FINANCE_COMPONENT = namelookup({ 'TRADING_CLIENT' : 'TRADING_CLIENT', 'PORTFOLIO_CLIENT' : 'PORTFOLIO_CLIENT', 'ORDER_SOURCE' : 'ORDER_SOURCE', @@ -756,3 +756,11 @@ FINANCE_COMPONENT = ndict({ }) +# the simulation style enumerates the available transaction simulation +# strategies. +SIMULATION_STYLE = Enum( + 'PARTIAL_VOLUME', + 'BUY_ALL', + 'FIXED_SLIPPAGE', + 'NOOP' +) diff --git a/zipline/utils/date_utils.py b/zipline/utils/date_utils.py index 85ff98d3..0c28e983 100644 --- a/zipline/utils/date_utils.py +++ b/zipline/utils/date_utils.py @@ -24,38 +24,38 @@ def parse_iso8061(date_string): UNIX_EPOCH = datetime(1970, 1, 1, 0, 0, tzinfo = pytz.utc) def EPOCH(utc_datetime): """ - The key is to ensure all the dates you are using are in the utc timezone - before you start converting. See http://pytz.sourceforge.net/ to learn how - to do that properly. By normalizing to utc, you eliminate the ambiguity of - daylight savings transitions. Then you can safely use timedelta to calculate + The key is to ensure all the dates you are using are in the utc timezone + before you start converting. See http://pytz.sourceforge.net/ to learn how + to do that properly. By normalizing to utc, you eliminate the ambiguity of + daylight savings transitions. Then you can safely use timedelta to calculate distance from the unix epoch, and then convert to seconds or milliseconds. - - Note that the resulting unix timestamp is itself in the UTC timezone. If you - wish to see the timestamp in a localized timezone, you will need to make + + Note that the resulting unix timestamp is itself in the UTC timezone. If you + wish to see the timestamp in a localized timezone, you will need to make another conversion. - + Also note that this will only work for dates after 1970. """ assert isinstance(utc_datetime, datetime) # utc only please assert utc_datetime.tzinfo == pytz.utc - + # how long since the epoch? delta = utc_datetime - UNIX_EPOCH seconds = delta.total_seconds() ms = seconds * 1000 return ms - + def UN_EPOCH(ms_since_epoch): seconds_since_epoch = ms_since_epoch / 1000 delta = timedelta(seconds = seconds_since_epoch) dt = UNIX_EPOCH + delta return dt - + def iso8061_to_epoch(datestring): dt = parse_iso8061(datestring) return EPOCH(dt) - + def epoch_now(): dt = datetime.utcnow().replace(tzinfo=pytz.utc) return EPOCH(dt) @@ -72,7 +72,7 @@ class utcdatetime(datetime): return dt - + # Datetime Calculations # --------------------- diff --git a/zipline/utils/exception_utils.py b/zipline/utils/exception_utils.py new file mode 100644 index 00000000..67dc1ee9 --- /dev/null +++ b/zipline/utils/exception_utils.py @@ -0,0 +1,16 @@ +from textwrap import dedent + +class CustomException(Exception): + argmap = {0: 'classname'} + + def __init__(self, *args): + self.args = args + + def format(self): + assert len(self.args) == len(self.argmap), \ + """Wrong number of arguments passed to custom exception %s.""" \ + % self.__class__ + return self.message.format(**dict(zip(self.argmap, self.args))) + + def __str__(self): + return dedent(self.format()).strip('\n')