From f759cac61bc540dc26c52d07402bc6344d8acb84 Mon Sep 17 00:00:00 2001 From: fawce Date: Tue, 10 Apr 2012 22:46:50 -0400 Subject: [PATCH 1/8] added a simple test for the transaction simulator, heavily revised the simulator as a result. --- zipline/finance/trading.py | 106 +++++++++++++++++++++++------------ zipline/test/test_finance.py | 90 +++++++++++++++++++++++++++-- 2 files changed, 155 insertions(+), 41 deletions(-) diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index 76ad2a19..51bdc961 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -249,9 +249,8 @@ class TransactionSimulator(qmsg.BaseTransform): self.open_orders = {} self.order_count = 0 self.txn_count = 0 - self.trade_window = datetime.timedelta(seconds=30) + self.trade_window = datetime.timedelta(seconds=30) self.orderTTL = datetime.timedelta(days=1) - self.volume_share = 0.05 self.commission = 0.03 def transform(self, event): @@ -267,9 +266,12 @@ class TransactionSimulator(qmsg.BaseTransform): self.state['value'] = txn else: self.state['value'] = None - qutil.LOGGER.info("unexpected event type in transform: {etype}".format(etype=event.type)) + log = "unexpected event type in transform: {etype}".format( + etype=event.type + ) + qutil.LOGGER.info(log) + #TODO: what to do if we get another kind of datasource event.type? - return self.state def add_open_order(self, event): @@ -279,13 +281,19 @@ class TransactionSimulator(qmsg.BaseTransform): """ event.amount = int(event.amount) if event.amount == 0: - qutil.LOGGER.debug("requested to trade zero shares of {sid}".format(sid=event.sid)) + log = "requested to trade zero shares of {sid}".format( + sid=event.sid + ) + qutil.LOGGER.debug(log) return self.order_count += 1 if(not self.open_orders.has_key(event.sid)): self.open_orders[event.sid] = [] + + # set the filled property to zero + event.filled = 0 self.open_orders[event.sid].append(event) def apply_trade_to_open_orders(self, event): @@ -293,47 +301,71 @@ class TransactionSimulator(qmsg.BaseTransform): if(event.volume == 0): #there are zero volume events bc some stocks trade #less frequently than once per minute. - return self.create_dummy_txn(event.dt) + return None if self.open_orders.has_key(event.sid): orders = self.open_orders[event.sid] + orders = sorted(orders, key=lambda o: o.dt) else: return None - - remaining_orders = [] + total_order = 0 dt = event.dt - + expired = [] + total_order = 0 + simulated_amount = 0 + simulated_impact = 0.0 + direction = 1.0 for order in orders: - #we're using minute bars, so allow orders within - #30 seconds of the trade - if((order.dt - event.dt) < self.trade_window): - total_order += order.amount - if(order.dt > dt): - dt = order.dt - #if the order still has time to live (TTL) keep track - elif((self.algo_time - order.dt) < self.orderTTL): - remaining_orders.append(order) - - self.open_orders[event.sid] = remaining_orders - - if(total_order != 0): - direction = total_order / math.fabs(total_order) - else: - direction = 1 - volume_share = (direction * total_order) / event.volume - if volume_share > .25: - volume_share = .25 - amount = volume_share * event.volume * direction - impact = (volume_share)**2 * .1 * direction * event.price - return self.create_transaction( - event.sid, - amount, - event.price + impact, - dt.replace(tzinfo = pytz.utc), - direction - ) + if(order.dt <= event.dt): + + # orders are only good on the day they are issued + if order.dt.day < event.dt.day: + continue + + open_amount = order.amount - order.filled + + if(open_amount != 0): + direction = open_amount / math.fabs(open_amount) + else: + direction = 1 + + desired_order = total_order + open_amount + + volume_share = direction * (desired_order) / event.volume + if volume_share > .25: + volume_share = .25 + simulated_amount = volume_share * event.volume * direction + simulated_impact = (volume_share)**2 * .1 * direction * event.price + + order.filled += (simulated_amount - total_order) + total_order = simulated_amount + + # we cap the volume share at 25% of a trade + if volume_share == .25: + break + + if simulated_amount == 0: + warning = "Calculated a zero volume transation on trade: {event}" + warning = warning.format(event=str(event)) + qutil.LOGGER.warn(warning) + + orders = [ x for x in orders if x.amount - x.filled > 0 and x.dt.day >= event.dt.day] + + self.open_orders[event.sid] = orders + + + if simulated_amount > 0: + return self.create_transaction( + event.sid, + simulated_amount, + event.price + simulated_impact, + dt.replace(tzinfo = pytz.utc), + direction + ) + else: + return None def create_transaction(self, sid, amount, price, dt, direction): diff --git a/zipline/test/test_finance.py b/zipline/test/test_finance.py index 52538f6a..8ed37b25 100644 --- a/zipline/test/test_finance.py +++ b/zipline/test/test_finance.py @@ -21,6 +21,7 @@ TradeSimulationClient, TradingEnvironment from zipline.simulator import AddressAllocator, Simulator from zipline.monitor import Controller from zipline.lines import SimulatedTrading +from zipline.protocol_utils import namedict DEFAULT_TIMEOUT = 15 # seconds @@ -204,7 +205,9 @@ class FinanceTestCase(TestCase): self.zipline_test_config['trade_count'] = 200 self.zipline_test_config['algorithm'] = test_algo - zipline = SimulatedTrading.create_test_zipline(**self.zipline_test_config) + zipline = SimulatedTrading.create_test_zipline( + **self.zipline_test_config + ) zipline.simulate(blocking=True) #check that the algorithm received no events @@ -214,8 +217,87 @@ class FinanceTestCase(TestCase): "The algorithm should not receive any events due to filtering." ) + + @timed(DEFAULT_TIMEOUT) + def test_transaction_sim(self): + + trade_count = 40 + trading_environment = factory.create_trading_environment() + trade_sim = TransactionSimulator() + price = [10.1] * trade_count + volume = [100] * trade_count + start_date = trading_environment.first_open + one_day = timedelta(days=1) + one_hour = timedelta(hours=1) + sid = 1 + generated_trades = factory.create_trade_history( + sid, + price, + volume, + one_hour, + trading_environment + ) + + trade_1 = generated_trades.pop() + trade_sim.transform(trade_1) + + order_amount = 100 + order_count = 2 + + for i in range(order_count): + order = namedict( + { + 'sid':sid, + 'amount':order_amount, + 'type':zp.DATASOURCE_TYPE.ORDER, + 'dt' : start_date + i * one_day + }) - - - + sim_state = trade_sim.transform(order) + + # there should not be a new transaction from an order. + self.assertTrue(sim_state['name'] == trade_sim.get_id) + self.assertTrue(sim_state['value'] == None) + + # there should now be one open order in the sid + + oo = trade_sim.open_orders + self.assertTrue(oo.has_key(sid)) + order_list = oo[sid] + self.assertEqual(order_count, len(order_list)) + + for order in order_list: + self.assertEqual(order.sid, sid) + self.assertEqual(order.amount, order_amount) + + transactions = [] + for trade in generated_trades: + sim_state = trade_sim.transform(trade) + + self.assertEqual(sim_state['name'], trade_sim.get_id) + + if sim_state['value']: + transactions.append(sim_state['value']) + + + if len(trade_sim.open_orders[sid]) == 0: + break + + total_volume = 0 + for txn in transactions: + total_volume += txn.amount + + self.assertEqual(total_volume, order_count * order_amount) + + # because we placed an order for 100 shares, and the volume + # of each trade is 100, the simulator should spread the order + # into 4 trades of 25 shares per order. + self.assertEqual(len(transactions), 4 * order_count) + + # the open orders should now be empty + oo = trade_sim.open_orders + self.assertTrue(oo.has_key(sid)) + order_list = oo[sid] + self.assertEqual(0, len(order_list)) + \ No newline at end of file From b78097241a6f1f0f667c9d24a069d377d2752a4b Mon Sep 17 00:00:00 2001 From: fawce Date: Wed, 11 Apr 2012 12:00:16 -0400 Subject: [PATCH 2/8] rounded out tests to cover more trading cases (spreading, collapsing, expiring) --- zipline/finance/trading.py | 33 ++++++++--- zipline/test/factory.py | 4 +- zipline/test/test_finance.py | 106 ++++++++++++++++++++++++++--------- 3 files changed, 105 insertions(+), 38 deletions(-) diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index 51bdc961..a089f392 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -308,8 +308,7 @@ class TransactionSimulator(qmsg.BaseTransform): orders = sorted(orders, key=lambda o: o.dt) else: return None - - total_order = 0 + dt = event.dt expired = [] total_order = 0 @@ -318,7 +317,7 @@ class TransactionSimulator(qmsg.BaseTransform): direction = 1.0 for order in orders: - if(order.dt <= event.dt): + if(order.dt < event.dt): # orders are only good on the day they are issued if order.dt.day < event.dt.day: @@ -336,7 +335,7 @@ class TransactionSimulator(qmsg.BaseTransform): volume_share = direction * (desired_order) / event.volume if volume_share > .25: volume_share = .25 - simulated_amount = volume_share * event.volume * direction + simulated_amount = int(volume_share * event.volume * direction) simulated_impact = (volume_share)**2 * .1 * direction * event.price order.filled += (simulated_amount - total_order) @@ -347,12 +346,20 @@ class TransactionSimulator(qmsg.BaseTransform): break if simulated_amount == 0: - warning = "Calculated a zero volume transation on trade: {event}" - warning = warning.format(event=str(event)) + warning = """ +Calculated a zero volume transation on trade: +{event} +for order: +{order} + """ + warning = warning.format( + event=str(event), + order=str(order) + ) qutil.LOGGER.warn(warning) - + + orders = [ x for x in orders if x.amount - x.filled > 0 and x.dt.day >= event.dt.day] - self.open_orders[event.sid] = orders @@ -477,7 +484,15 @@ class TradingEnvironment(object): return len(self.period_trading_days) - + def is_market_hours(self, test_date): + if not self.is_trading_day(test_date): + return False + + mkt_open = self.set_NYSE_time(test_date, 9, 30) + #TODO: half days? + mkt_close = self.set_NYSE_time(test_date, 16, 00) + + return test_date >= mkt_open and test_date <= mkt_close def is_trading_day(self, test_date): dt = self.normalize_date(test_date) diff --git a/zipline/test/factory.py b/zipline/test/factory.py index 4954cabd..92000650 100644 --- a/zipline/test/factory.py +++ b/zipline/test/factory.py @@ -68,7 +68,7 @@ def get_next_trading_dt(current, interval, trading_calendar): next = current while True: next = next + interval - if trading_calendar.is_trading_day(next): + if trading_calendar.is_market_hours(next): break return next @@ -79,9 +79,9 @@ def create_trade_history(sid, prices, amounts, interval, trading_calendar): for price, amount in zip(prices, amounts): - current = get_next_trading_dt(current, interval, trading_calendar) trade = create_trade(sid, price, amount, current) trades.append(trade) + current = get_next_trading_dt(current, interval, trading_calendar) assert len(trades) == len(prices) return trades diff --git a/zipline/test/test_finance.py b/zipline/test/test_finance.py index 8ed37b25..42d5367c 100644 --- a/zipline/test/test_finance.py +++ b/zipline/test/test_finance.py @@ -218,51 +218,105 @@ class FinanceTestCase(TestCase): ) + # TODO: write a test that proves orders expire without being filled. @timed(DEFAULT_TIMEOUT) def test_transaction_sim(self): - trade_count = 40 + # create a scenario where order size and trade size are equal + # so that orders must be spread out over several trades. + params ={ + 'trade_count':360, + 'trade_amount':100, + 'trade_interval': timedelta(minutes=1), + 'order_count':2, + 'order_amount':100, + 'order_interval': timedelta(minutes=1), + # because we placed an order for 100 shares, and the volume + # of each trade is 100, the simulator should spread the order + # into 4 trades of 25 shares per order. + 'expected_txn_count':8, + 'expected_txn_volume':2 * 100 + } + + self.transaction_sim(**params) + + # create a scenario where order.amount <<< trade.volume + # to test that several orders can be covered properly by one trade. + params2 ={ + 'trade_count':6, + 'trade_amount':100, + 'trade_interval': timedelta(hours=1), + 'order_count':24, + 'order_amount':1, + 'order_interval': timedelta(minutes=1), + # because we placed an orders totaling less than 25% of one trade + # the simulator should produce just one transaction. + 'expected_txn_count':1, + 'expected_txn_volume':24 * 1 + } + self.transaction_sim(**params2) + + # create a scenario where orders expire without being filled + # entirely + params3 = { + 'trade_count':100, + 'trade_amount':100, + 'trade_delay': timedelta(minutes=5), + 'trade_interval': timedelta(days=1), + 'order_count':3, + 'order_amount':1000, + 'order_interval': timedelta(minutes=30), + # because we placed an orders totaling less than 25% of one trade + # the simulator should produce just one transaction. + 'expected_txn_count' : 1, + 'expected_txn_volume' : 25 + } + self.transaction_sim(**params3) + + def transaction_sim(self, **params): + trade_count = params['trade_count'] + trade_amount = params['trade_amount'] + trade_interval = params['trade_interval'] + trade_delay = params.get('trade_delay') + order_count = params['order_count'] + order_amount = params['order_amount'] + order_interval = params['order_interval'] + expected_txn_count = params['expected_txn_count'] + expected_txn_volume = params['expected_txn_volume'] + trading_environment = factory.create_trading_environment() trade_sim = TransactionSimulator() price = [10.1] * trade_count volume = [100] * trade_count start_date = trading_environment.first_open - one_day = timedelta(days=1) - one_hour = timedelta(hours=1) sid = 1 generated_trades = factory.create_trade_history( sid, price, volume, - one_hour, + trade_interval, trading_environment ) - trade_1 = generated_trades.pop() - trade_sim.transform(trade_1) - - order_amount = 100 - order_count = 2 - for i in range(order_count): order = namedict( { 'sid':sid, 'amount':order_amount, 'type':zp.DATASOURCE_TYPE.ORDER, - 'dt' : start_date + i * one_day + 'dt' : start_date + i * order_interval }) sim_state = trade_sim.transform(order) - # there should not be a new transaction from an order. - self.assertTrue(sim_state['name'] == trade_sim.get_id) - self.assertTrue(sim_state['value'] == None) + # there should not be a new transaction from an order. + self.assertTrue(sim_state['name'] == trade_sim.get_id) + self.assertTrue(sim_state['value'] == None) - # there should now be one open order in the sid - + # there should now be one open order list stored under the sid oo = trade_sim.open_orders + self.assertEqual(len(oo), 1) self.assertTrue(oo.has_key(sid)) order_list = oo[sid] self.assertEqual(order_count, len(order_list)) @@ -273,31 +327,29 @@ class FinanceTestCase(TestCase): transactions = [] for trade in generated_trades: + if trade_delay: + trade.dt = trade.dt + trade_delay + sim_state = trade_sim.transform(trade) self.assertEqual(sim_state['name'], trade_sim.get_id) - + if sim_state['value']: - transactions.append(sim_state['value']) + transactions.append(sim_state['value']) - - if len(trade_sim.open_orders[sid]) == 0: - break total_volume = 0 for txn in transactions: total_volume += txn.amount - self.assertEqual(total_volume, order_count * order_amount) - - # because we placed an order for 100 shares, and the volume - # of each trade is 100, the simulator should spread the order - # into 4 trades of 25 shares per order. - self.assertEqual(len(transactions), 4 * order_count) + self.assertEqual(total_volume, expected_txn_volume) + self.assertEqual(len(transactions), expected_txn_count) # the open orders should now be empty oo = trade_sim.open_orders self.assertTrue(oo.has_key(sid)) order_list = oo[sid] self.assertEqual(0, len(order_list)) + + \ No newline at end of file From aea2e1189cf41b177e48765dee5fbf7f4d9db00b Mon Sep 17 00:00:00 2001 From: fawce Date: Thu, 12 Apr 2012 10:46:10 -0400 Subject: [PATCH 3/8] fixes to the calculation of transactions and associated tests for long and short orders. --- zipline/finance/trading.py | 8 +-- zipline/lines.py | 22 +++---- zipline/test/algorithms.py | 4 +- zipline/test/factory.py | 29 ++++++++- zipline/test/test_finance.py | 96 ++++++++++++++++++++++++++++-- zipline/test/test_perf_tracking.py | 13 ++-- 6 files changed, 144 insertions(+), 28 deletions(-) diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index a089f392..d3fbabe5 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -357,13 +357,13 @@ for order: order=str(order) ) qutil.LOGGER.warn(warning) - - - orders = [ x for x in orders if x.amount - x.filled > 0 and x.dt.day >= event.dt.day] + + orders = [ x for x in orders if abs(x.amount - x.filled) > 0 and x.dt.day >= event.dt.day] + self.open_orders[event.sid] = orders - if simulated_amount > 0: + if simulated_amount != 0: return self.create_transaction( event.sid, simulated_amount, diff --git a/zipline/lines.py b/zipline/lines.py index 2123cfdf..624a9456 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -191,16 +191,16 @@ class SimulatedTrading(object): - sid - an integer, which will be used as the security ID. - order_count - the number of orders the test algo will place, defaults to 100 + - order_amount - the number of shares per order, defaults to 100 - trade_count - the number of trades to simulate, defaults to 100 - simulator_class - optional parameter that provides an alternative subclass of ComponentHost to hold the whole zipline. Defaults to :py:class:`zipline.simulator.Simulator` - algorithm - optional parameter providing an algorithm. defaults to :py:class:`zipline.test.algorithms.TestAlgorithm` - - random - optional parameter to request random trades. if present - :py:class:`zipline.sources.RandomEquityTrades` is the source. If - not :py:class:`ziplien.sources.SpecificEquityTrades` is the - source + - trade_source - optional parameter to specify trades, if present. + If not present :py:class:`ziplien.sources.SpecificEquityTrades` + is the source, with daily frequency in trades. """ assert isinstance(config, dict) @@ -219,6 +219,11 @@ class SimulatedTrading(object): order_count = config['order_count'] else: order_count = 100 + + if config.has_key('order_amount'): + order_amount = config['order_amount'] + else: + order_amount = 100 if config.has_key('trade_count'): trade_count = config['trade_count'] @@ -235,12 +240,8 @@ class SimulatedTrading(object): #------------------- sids = [sid] #------------------- - if config.has_key('random'): - trade_source = factory.create_random_trade_source( - sids, - trade_count, - trading_environment - ) + if config.has_key('trade_source'): + trade_source = config['trade_source'] else: trade_source = factory.create_daily_trade_source( sids, @@ -253,7 +254,6 @@ class SimulatedTrading(object): if config.has_key('algorithm'): test_algo = config['algorithm'] else: - order_amount = 100 test_algo = TestAlgorithm( sid, order_amount, diff --git a/zipline/test/algorithms.py b/zipline/test/algorithms.py index 49ec15c0..522bf76f 100644 --- a/zipline/test/algorithms.py +++ b/zipline/test/algorithms.py @@ -76,8 +76,8 @@ class TestAlgorithm(): self.incr += 1 def get_sid_filter(self): - return [self.sid] - + return [self.sid] + class NoopAlgorithm(object): """ Dolce fa niente. diff --git a/zipline/test/factory.py b/zipline/test/factory.py index 92000650..381fbb7e 100644 --- a/zipline/test/factory.py +++ b/zipline/test/factory.py @@ -167,6 +167,7 @@ def create_random_trade_source(sid, trade_count, trading_environment): return source def create_daily_trade_source(sids, trade_count, trading_environment): + """ creates trade_count trades for each sid in sids list. first trade will be on trading_environment.period_start, and daily @@ -176,12 +177,38 @@ def create_daily_trade_source(sids, trade_count, trading_environment): Important side-effect: trading_environment.period_end will be modified to match the day of the final trade. """ + return create_trade_source( + sids, + trade_count, + timedelta(days=1), + trading_environment + ) + + +def create_minutely_trade_source(sids, trade_count, trading_environment): + + """ + creates trade_count trades for each sid in sids list. + first trade will be on trading_environment.period_start, and every minute + thereafter for each sid. Thus, two sids should result in two trades per + minute. + + Important side-effect: trading_environment.period_end will be modified + to match the day of the final trade. + """ + return create_trade_source( + sids, + trade_count, + timedelta(minutes=1), + trading_environment + ) + +def create_trade_source(sids, trade_count, trade_time_increment, trading_environment): trade_history = [] for sid in sids: price = [10.1] * trade_count volume = [100] * trade_count start_date = trading_environment.first_open - trade_time_increment = timedelta(days=1) generated_trades = create_trade_history( sid, diff --git a/zipline/test/test_finance.py b/zipline/test/test_finance.py index 42d5367c..9b3e4187 100644 --- a/zipline/test/test_finance.py +++ b/zipline/test/test_finance.py @@ -24,6 +24,7 @@ from zipline.lines import SimulatedTrading from zipline.protocol_utils import namedict DEFAULT_TIMEOUT = 15 # seconds +EXTENDED_TIMEOUT = 90 allocator = AddressAllocator(1000) @@ -121,6 +122,38 @@ class FinanceTestCase(TestCase): .format(n=zipline.sim.feed.pending_messages())) + + @timed(EXTENDED_TIMEOUT) + def test_aggressive_buying(self): + + # Simulation + # ---------- + trade_count = 10 * 1000 + self.zipline_test_config['order_count'] = 5 * 1000 + self.zipline_test_config['trade_count'] = trade_count + self.zipline_test_config['order_amount'] = 100 + self.zipline_test_config['environment'] = factory.create_trading_environment() + + sid_list = [self.zipline_test_config['sid']] + + self.zipline_test_config['trade_source'] = factory.create_minutely_trade_source( + sid_list, + trade_count, + self.zipline_test_config['environment'] + ) + + zipline = SimulatedTrading.create_test_zipline(**self.zipline_test_config) + zipline.simulate(blocking=True) + + self.assertTrue(zipline.sim.ready()) + self.assertFalse(zipline.sim.exception) + + # TODO: Make more assertions about the final state of the components. + self.assertEqual(zipline.sim.feed.pending_messages(), 0, \ + "The feed should be drained of all messages, found {n} remaining." \ + .format(n=zipline.sim.feed.pending_messages())) + + @timed(DEFAULT_TIMEOUT) def test_performance(self): #provide enough trades to ensure all orders are filled. @@ -218,9 +251,11 @@ class FinanceTestCase(TestCase): ) - # TODO: write a test that proves orders expire without being filled. + # TODO: write tests for short sales + # TODO: write a test to do massive buying or shorting. + @timed(DEFAULT_TIMEOUT) - def test_transaction_sim(self): + def test_partially_filled_orders(self): # create a scenario where order size and trade size are equal # so that orders must be spread out over several trades. @@ -240,9 +275,25 @@ class FinanceTestCase(TestCase): self.transaction_sim(**params) + # same scenario, but with short sales + params2 ={ + 'trade_count':360, + 'trade_amount':100, + 'trade_interval': timedelta(minutes=1), + 'order_count':2, + 'order_amount':-100, + 'order_interval': timedelta(minutes=1), + 'expected_txn_count':8, + 'expected_txn_volume':2 * -100 + } + + self.transaction_sim(**params2) + + @timed(DEFAULT_TIMEOUT) + def test_collapsing_orders(self): # create a scenario where order.amount <<< trade.volume # to test that several orders can be covered properly by one trade. - params2 ={ + params1 ={ 'trade_count':6, 'trade_amount':100, 'trade_interval': timedelta(hours=1), @@ -254,11 +305,26 @@ class FinanceTestCase(TestCase): 'expected_txn_count':1, 'expected_txn_volume':24 * 1 } - self.transaction_sim(**params2) + self.transaction_sim(**params1) + # second verse, same as the first. except short! + params2 ={ + 'trade_count':6, + 'trade_amount':100, + 'trade_interval': timedelta(hours=1), + 'order_count':24, + 'order_amount':-1, + 'order_interval': timedelta(minutes=1), + 'expected_txn_count':1, + 'expected_txn_volume':24 * -1 + } + self.transaction_sim(**params2) + + @timed(DEFAULT_TIMEOUT) + def test_partial_expiration_orders(self): # create a scenario where orders expire without being filled # entirely - params3 = { + params1 = { 'trade_count':100, 'trade_amount':100, 'trade_delay': timedelta(minutes=5), @@ -271,7 +337,25 @@ class FinanceTestCase(TestCase): 'expected_txn_count' : 1, 'expected_txn_volume' : 25 } - self.transaction_sim(**params3) + self.transaction_sim(**params1) + + # same scenario, but short sales. + params2 = { + 'trade_count':100, + 'trade_amount':100, + 'trade_delay': timedelta(minutes=5), + 'trade_interval': timedelta(days=1), + 'order_count':3, + 'order_amount':1000, + 'order_interval': timedelta(minutes=30), + # because we placed an orders totaling less than 25% of one trade + # the simulator should produce just one transaction. + 'expected_txn_count' : 1, + 'expected_txn_volume' : 25 + } + self.transaction_sim(**params2) + + def transaction_sim(self, **params): trade_count = params['trade_count'] diff --git a/zipline/test/test_perf_tracking.py b/zipline/test/test_perf_tracking.py index bd6efe7c..be7c9a25 100644 --- a/zipline/test/test_perf_tracking.py +++ b/zipline/test/test_perf_tracking.py @@ -22,8 +22,15 @@ class PerformanceTestCase(unittest.TestCase): 0, len(self.treasury_curves) ) - self.dt = self.treasury_curves.keys()[random_index] - self.end_dt = self.dt + datetime.timedelta(days=365) + for n in range(100): + self.dt = self.treasury_curves.keys()[random_index] + self.end_dt = self.dt + datetime.timedelta(days=365) + + now = datetime.datetime.utcnow().replace(tzinfo=pytz.utc) + + if self.end_dt <= now: + break + self.trading_environment = TradingEnvironment( self.benchmark_returns, self.treasury_curves, @@ -505,8 +512,6 @@ shares in position" price = 10.1 price_list = [price] * trade_count volume = [100] * trade_count - #start_date = datetime.datetime.strptime("01/01/2011","%m/%d/%Y") - #start_date = start_date.replace(tzinfo=pytz.utc) trade_time_increment = datetime.timedelta(days=1) trade_history = factory.create_trade_history( sid, From 1559a84c7b320b1e6f885561c2e3f856fa2de737 Mon Sep 17 00:00:00 2001 From: fawce Date: Fri, 13 Apr 2012 09:53:20 -0400 Subject: [PATCH 4/8] added simulation style to transation simulator, to facilitate tests. Fixed roll-over bug in max cap and max leverage calculation. --- zipline/component.py | 3 +- zipline/finance/performance.py | 61 ++++++++++++++----------- zipline/finance/trading.py | 82 +++++++++++++++++++++++++++++++--- zipline/lines.py | 13 +++--- zipline/messaging.py | 4 +- zipline/test/test_finance.py | 37 ++++++++++++--- 6 files changed, 155 insertions(+), 45 deletions(-) diff --git a/zipline/component.py b/zipline/component.py index 0004051f..7bcd8500 100644 --- a/zipline/component.py +++ b/zipline/component.py @@ -75,7 +75,8 @@ class Component(object): self.out_socket = None self.killed = False self.controller = None - self.heartbeat_timeout = 2000 + # timeout after a full minute + self.heartbeat_timeout = 60 *1000 self.state_flag = COMPONENT_STATE.OK self.error_state = COMPONENT_FAILURE.NOFAILURE self.on_done = None diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index 81a3d8d9..799107d8 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -160,8 +160,6 @@ class PerformanceTracker(): self.total_days = self.trading_environment.days_in_period # one indexed so that we reach 100% self.day_count = 0.0 - self.cumulative_capital_used = 0.0 - self.max_capital_used = 0.0 self.capital_base = self.trading_environment.capital_base self.returns = [] self.txn_count = 0 @@ -219,8 +217,8 @@ class PerformanceTracker(): 'period_start' : self.period_start, 'period_end' : self.period_end, 'progress' : self.progress, - 'cumulative_captial_used' : self.cumulative_capital_used, - 'max_capital_used' : self.max_capital_used, + 'cumulative_captial_used' : self.cumulative_perf.cumulative_capital_used, + 'max_capital_used' : self.cumulative_perf.max_capital_used, 'last_close' : self.market_close, 'last_open' : self.market_open, 'capital_base' : self.capital_base, @@ -232,36 +230,21 @@ class PerformanceTracker(): } def process_event(self, event): + assert isinstance(event, zp.namedict) self.event_count += 1 if(event.dt >= self.market_close): self.handle_market_close() - if not pandas.isnull(event.TRANSACTION): + if event.TRANSACTION: self.txn_count += 1 self.cumulative_performance.execute_transaction(event.TRANSACTION) self.todays_performance.execute_transaction(event.TRANSACTION) - - # we're adding a 10% cushion to the capital used, - # and then rounding to the nearest 5k - transaction_cost = event.TRANSACTION.price * event.TRANSACTION.amount - self.cumulative_capital_used += transaction_cost - - if math.fabs(self.cumulative_capital_used) > self.max_capital_used: - self.max_capital_used = math.fabs(self.cumulative_capital_used) - - cushioned_capital = 1.1 * self.max_capital_used - self.max_capital_used = self.round_to_nearest( - cushioned_capital, - base=5000 - ) - self.max_leverage = self.max_capital_used / self.capital_base - + #update last sale self.cumulative_performance.update_last_sale(event) self.todays_performance.update_last_sale(event) - def handle_market_close(self): #calculate performance as of last trade @@ -338,9 +321,6 @@ class PerformanceTracker(): # this signals that the simulation is complete. self.result_stream.send("DONE") - def round_to_nearest(self, x, base=5): - return int(base * round(float(x)/base)) - class Position(): @@ -409,6 +389,8 @@ class PerformancePeriod(): self.starting_cash = starting_cash self.ending_cash = starting_cash self.processed_transactions = [] + self.cumulative_capital_used = 0.0 + self.max_capital_used = 0.0 self.calculate_performance() @@ -426,11 +408,40 @@ class PerformancePeriod(): self.returns = 0.0 def execute_transaction(self, txn): + + # Update Position + # ---------------- if(not self.positions.has_key(txn.sid)): self.positions[txn.sid] = Position(txn.sid) self.positions[txn.sid].update(txn) self.period_capital_used += -1 * txn.price * txn.amount + + + # Max Leverage + # --------------- + # Calculate the maximum capital used and maximum leverage + + transaction_cost = txn.price * txn.amount + self.cumulative_capital_used += transaction_cost + + if math.fabs(self.cumulative_capital_used) > self.max_capital_used: + self.max_capital_used = math.fabs(self.cumulative_capital_used) + + # We want to conveye a level, rather than a precise figure. + # round to the nearest 5,000 to keep the number easy on the eyes + self.max_capital_used = self.round_to_nearest( + self.max_capital_used, + base=5000 + ) + + # we're adding a 10% cushion to the capital used. + self.max_leverage = 1.1 * self.max_capital_used / self.starting_cash + + # add transaction to the list of processed transactions self.processed_transactions.append(txn) + + def round_to_nearest(self, x, base=5): + return int(base * round(float(x)/base)) def calculate_positions_value(self): mktValue = 0.0 diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index d3fbabe5..208dc79c 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -11,6 +11,17 @@ import zipline.util as qutil import zipline.protocol as zp import zipline.finance.performance as perf +from zipline.protocol_utils import Enum + +# the simulation style enumerates the available transaction simulation +# strategies. +SIMULATION_STYLE = Enum( + 'PARTIAL_VOLUME', + 'BUY_ALL', + 'FIXED_SLIPPAGE', + 'NOOP' +) + class TradeSimulationClient(qmsg.Component): def __init__(self, trading_environment): @@ -19,6 +30,7 @@ class TradeSimulationClient(qmsg.Component): self.prev_dt = None self.event_queue = None self.txn_count = 0 + self.order_count = 0 self.trading_environment = trading_environment self.current_dt = trading_environment.period_start self.last_iteration_dur = datetime.timedelta(seconds=0) @@ -132,13 +144,14 @@ class TradeSimulationClient(qmsg.Component): return self.connect_push_socket(self.addresses['order_address']) def order(self, sid, amount): + order = zp.namedict({ 'dt':self.current_dt, 'sid':sid, 'amount':amount }) - self.order_socket.send(zp.ORDER_FRAME(order)) + self.order_count += 1 def signal_order_done(self): self.order_socket.send(str(zp.ORDER_PROTOCOL.DONE)) @@ -211,6 +224,8 @@ class OrderDataSource(qmsg.DataSource): # we reduce the timeout here by a factor of 2, because we need # to potentially receive the client's done message before the # controller or heartbeat times out. + + # TODO: shouldn't this block until we receive a message? socks = dict(self.poll.poll(self.heartbeat_timeout/2)) # see if the poller has results for the result_feed @@ -232,19 +247,20 @@ class OrderDataSource(qmsg.DataSource): count += 1 self.sent_count += 1 - else: - # no orders, break out - break + # TODO: why didn't any unit tests catch this bug???? + + #else: + # # no orders, break out + # break #TODO: we have to send at least one dummy order per do_work iteration # or the feed will block waiting for our messages. if(count == 0): self.send(zp.namedict({})) - class TransactionSimulator(qmsg.BaseTransform): - def __init__(self): + def __init__(self, style): qmsg.BaseTransform.__init__(self, zp.TRANSFORM_TYPE.TRANSACTION) self.open_orders = {} self.order_count = 0 @@ -252,6 +268,15 @@ class TransactionSimulator(qmsg.BaseTransform): self.trade_window = datetime.timedelta(seconds=30) self.orderTTL = datetime.timedelta(days=1) self.commission = 0.03 + + if not style or style == SIMULATION_STYLE.PARTIAL_VOLUME: + self.apply_trade_to_open_orders = self.simulate_with_partial_volume + elif style == SIMULATION_STYLE.BUY_ALL: + self.apply_trade_to_open_orders = self.simulate_buy_all + elif style == SIMULATION_STYLE.FIXED_SLIPPAGE: + self.apply_trade_to_open_orders = self.simulate_with_fixed_cost + elif style == SIMULATION_STYLE.NOOP: + self.apply_trade_to_open_orders = self.simulate_noop def transform(self, event): """ @@ -296,8 +321,51 @@ class TransactionSimulator(qmsg.BaseTransform): event.filled = 0 self.open_orders[event.sid].append(event) - def apply_trade_to_open_orders(self, event): + #def apply_trade_to_open_orders(self, event): + # return self.simulate_with_fixed_cost(event) + + def simulate_buy_all(self, event): + txn = self.create_transaction( + event.sid, + event.volume, + event.price, + event.dt, + 1 + ) + return txn + def simulate_noop(self, event): + return None + + def simulate_with_fixed_cost(self, event): + if self.open_orders.has_key(event.sid): + orders = self.open_orders[event.sid] + orders = sorted(orders, key=lambda o: o.dt) + else: + return None + + amount = 0 + for order in orders: + amount += order.amount + + if(amount != 0): + direction = amount / math.fabs(amount) + else: + direction = 1 + + txn = self.create_transaction( + event.sid, + amount, + event.price + 0.10, + event.dt, + direction + ) + + self.open_orders[event.sid] = [] + + return txn + + def simulate_with_partial_volume(self, event): if(event.volume == 0): #there are zero volume events bc some stocks trade #less frequently than once per minute. diff --git a/zipline/lines.py b/zipline/lines.py index 624a9456..466de2e3 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -90,7 +90,7 @@ from zipline.finance.trading import TransactionSimulator, OrderDataSource, \ TradeSimulationClient from zipline.simulator import AddressAllocator, Simulator from zipline.monitor import Controller - +from zipline.finance.trading import SIMULATION_STYLE class SimulatedTrading(object): @@ -130,6 +130,7 @@ class SimulatedTrading(object): self.algorithm = config['algorithm'] self.allocator = config['allocator'] self.trading_environment = config['trading_environment'] + self.sim_style = config.get('simulation_style') self.leased_sockets = [] self.sim_context = None @@ -169,7 +170,7 @@ class SimulatedTrading(object): self.add_source(self.order_source) #setup transforms - self.transaction_sim = TransactionSimulator() + self.transaction_sim = TransactionSimulator(self.sim_style) self.transforms = {} self.add_transform(self.transaction_sim) @@ -192,7 +193,8 @@ class SimulatedTrading(object): - order_count - the number of orders the test algo will place, defaults to 100 - order_amount - the number of shares per order, defaults to 100 - - trade_count - the number of trades to simulate, defaults to 100 + - trade_count - the number of trades to simulate, defaults to 101 + to ensure all orders are processed. - simulator_class - optional parameter that provides an alternative subclass of ComponentHost to hold the whole zipline. Defaults to :py:class:`zipline.simulator.Simulator` @@ -228,7 +230,7 @@ class SimulatedTrading(object): if config.has_key('trade_count'): trade_count = config['trade_count'] else: - trade_count = 100 + trade_count = 101 if config.has_key('simulator_class'): simulator_class = config['simulator_class'] @@ -266,7 +268,8 @@ class SimulatedTrading(object): 'algorithm':test_algo, 'trading_environment':trading_environment, 'allocator':allocator, - 'simulator_class':simulator_class + 'simulator_class':simulator_class, + 'simulation_style':SIMULATION_STYLE.FIXED_SLIPPAGE }) #------------------- diff --git a/zipline/messaging.py b/zipline/messaging.py index 2c4caebc..7817ec8b 100644 --- a/zipline/messaging.py +++ b/zipline/messaging.py @@ -37,7 +37,7 @@ class ComponentHost(Component): # ---------------------- self.sync_register = {} - self.timeout = datetime.timedelta(seconds=5) + self.timeout = datetime.timedelta(seconds=60) self.feed = Feed() self.merge = Merge() @@ -214,7 +214,7 @@ class Feed(Component): def do_work(self): # wait for synchronization reply from the host - socks = dict(self.poll.poll(self.heartbeat_timeout)) #timeout after 2 seconds. + socks = dict(self.poll.poll(self.heartbeat_timeout)) # TODO: Abstract this out, maybe on base component if self.control_in in socks and socks[self.control_in] == self.zmq.POLLIN: diff --git a/zipline/test/test_finance.py b/zipline/test/test_finance.py index 9b3e4187..39a13bd1 100644 --- a/zipline/test/test_finance.py +++ b/zipline/test/test_finance.py @@ -21,6 +21,7 @@ TradeSimulationClient, TradingEnvironment from zipline.simulator import AddressAllocator, Simulator from zipline.monitor import Controller from zipline.lines import SimulatedTrading +from zipline.finance.performance import PerformanceTracker from zipline.protocol_utils import namedict DEFAULT_TIMEOUT = 15 # seconds @@ -120,7 +121,22 @@ class FinanceTestCase(TestCase): self.assertEqual(zipline.sim.feed.pending_messages(), 0, \ "The feed should be drained of all messages, found {n} remaining." \ .format(n=zipline.sim.feed.pending_messages())) - + + + # the trading client should receive one transaction for every + # order placed. + self.assertEqual( + zipline.trading_client.txn_count, + zipline.trading_client.order_count + ) + + # the number of transactions in the performance tracker's cumulative + # period should be the same as the number of orders place by the + # algorithm. + self.assertEqual( + zipline.trading_client.order_count, + len(zipline.trading_client.perf.cumulative_performance.processed_transactions) + ) @timed(EXTENDED_TIMEOUT) @@ -152,8 +168,8 @@ class FinanceTestCase(TestCase): self.assertEqual(zipline.sim.feed.pending_messages(), 0, \ "The feed should be drained of all messages, found {n} remaining." \ .format(n=zipline.sim.feed.pending_messages())) - - + + @timed(DEFAULT_TIMEOUT) def test_performance(self): #provide enough trades to ensure all orders are filled. @@ -358,6 +374,7 @@ class FinanceTestCase(TestCase): def transaction_sim(self, **params): + trade_count = params['trade_count'] trade_amount = params['trade_amount'] trade_interval = params['trade_interval'] @@ -409,6 +426,9 @@ class FinanceTestCase(TestCase): self.assertEqual(order.sid, sid) self.assertEqual(order.amount, order_amount) + + tracker = PerformanceTracker(trading_environment) + transactions = [] for trade in generated_trades: if trade_delay: @@ -418,10 +438,14 @@ class FinanceTestCase(TestCase): self.assertEqual(sim_state['name'], trade_sim.get_id) + txn = None if sim_state['value']: - transactions.append(sim_state['value']) + txn = sim_state['value'] + transactions.append(txn) + trade[sim_state['name']] = txn - + tracker.process_event(trade) + total_volume = 0 for txn in transactions: total_volume += txn.amount @@ -429,6 +453,9 @@ class FinanceTestCase(TestCase): self.assertEqual(total_volume, expected_txn_volume) self.assertEqual(len(transactions), expected_txn_count) + cumulative_pos = tracker.cumulative_performance.positions[sid] + self.assertEqual(total_volume, cumulative_pos.amount) + # the open orders should now be empty oo = trade_sim.open_orders self.assertTrue(oo.has_key(sid)) From dfc3523197eef2b8d3ef84cd1a30caaad5bb43b6 Mon Sep 17 00:00:00 2001 From: fawce Date: Fri, 13 Apr 2012 09:58:12 -0400 Subject: [PATCH 5/8] fixed default to be partial fill --- zipline/finance/trading.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index 208dc79c..e6d1fb31 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -260,7 +260,7 @@ class OrderDataSource(qmsg.DataSource): class TransactionSimulator(qmsg.BaseTransform): - def __init__(self, style): + def __init__(self, style=SIMULATION_STYLE.PARTIAL_VOLUME): qmsg.BaseTransform.__init__(self, zp.TRANSFORM_TYPE.TRANSACTION) self.open_orders = {} self.order_count = 0 From 38982fcdab2bbcb3e212f5fc592f5a89e2a0e5cc Mon Sep 17 00:00:00 2001 From: fawce Date: Fri, 13 Apr 2012 15:08:17 -0400 Subject: [PATCH 6/8] major fix is with the non-blocking behavior of order source. also fixed time-compression in the trading client. --- zipline/finance/performance.py | 11 ++++-- zipline/finance/trading.py | 63 ++++++++++++++++++---------------- zipline/lines.py | 14 +++++++- zipline/messaging.py | 23 ++++++++++--- zipline/test/algorithms.py | 2 +- zipline/test/factory.py | 6 ++-- 6 files changed, 77 insertions(+), 42 deletions(-) diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index 799107d8..03c94139 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -166,6 +166,7 @@ class PerformanceTracker(): self.event_count = 0 self.result_stream = None self.last_dict = None + self.order_log = [] # this performance period will span the entire simulation. self.cumulative_performance = PerformancePeriod( @@ -228,6 +229,9 @@ class PerformanceTracker(): 'cumulative_risk_metrics' : self.cumulative_risk_metrics.to_dict(), 'timestamp' : datetime.datetime.now(), } + + def log_order(self, order): + self.order_log.append(order) def process_event(self, event): assert isinstance(event, zp.namedict) @@ -300,13 +304,14 @@ class PerformanceTracker(): and send it out on the result_stream. """ + log_msg = "Simulated {n} trading days out of {m}." + qutil.LOGGER.info(log_msg.format(n=self.day_count, m=self.total_days)) + qutil.LOGGER.info("first open: {d}".format(d=self.trading_environment.first_open)) + # the stream will end on the last trading day, but will not trigger # an end of day, so we trigger the final market close here. self.handle_market_close() - log_msg = "Simulated {n} trading days out of {m}." - qutil.LOGGER.info(log_msg.format(n=self.day_count, m=self.total_days)) - qutil.LOGGER.info("first open: {d}".format(d=self.trading_environment.first_open)) self.risk_report = risk.RiskReport( self.returns, diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index e6d1fb31..77eb8ca5 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -119,12 +119,15 @@ class TradeSimulationClient(qmsg.Component): # otherwise, the algorithm has fallen behind the feed # and processing per event is longer than time between events. if event.dt >= self.current_dt: + # compress time by moving the current_time up to the event + # time. + self.current_dt = event.dt self.run_algorithm() # tally the time spent on this iteration self.last_iteration_dur = datetime.datetime.utcnow() - event_start # move the algorithm's clock forward to include iteration time - self.current_dt = self.current_dt + self.last_iteration_dur + # self.current_dt = self.current_dt + self.last_iteration_dur def run_algorithm(self): @@ -152,6 +155,7 @@ class TradeSimulationClient(qmsg.Component): }) self.order_socket.send(zp.ORDER_FRAME(order)) self.order_count += 1 + self.perf.log_order(order) def signal_order_done(self): self.order_socket.send(str(zp.ORDER_PROTOCOL.DONE)) @@ -185,6 +189,7 @@ class OrderDataSource(qmsg.DataSource): """ qmsg.DataSource.__init__(self, zp.FINANCE_COMPONENT.ORDER_SOURCE) self.sent_count = 0 + self.works = 0 @property def get_type(self): @@ -194,7 +199,8 @@ class OrderDataSource(qmsg.DataSource): @property def is_blocking(self): """ - This datasource is in a loop with the TradingSimulationClient + This datasource is in a loop with the TradingSimulationClient, + so we don't want it to block processing. """ return False @@ -207,17 +213,12 @@ class OrderDataSource(qmsg.DataSource): def do_work(self): - - #TODO: if this is the first iteration, break deadlock by sending a dummy order - if(self.sent_count == 0): - self.send(zp.namedict({})) + self.works += 1 + #pull all orders from client. orders = [] count = 0 - - # TODO : this can be written in a concurrency agnostic - # way... have a chat with Fawce about this ~Steve while True: # poll all the sockets @@ -246,17 +247,6 @@ class OrderDataSource(qmsg.DataSource): self.send(order) count += 1 self.sent_count += 1 - - # TODO: why didn't any unit tests catch this bug???? - - #else: - # # no orders, break out - # break - - #TODO: we have to send at least one dummy order per do_work iteration - # or the feed will block waiting for our messages. - if(count == 0): - self.send(zp.namedict({})) class TransactionSimulator(qmsg.BaseTransform): @@ -278,6 +268,19 @@ class TransactionSimulator(qmsg.BaseTransform): elif style == SIMULATION_STYLE.NOOP: self.apply_trade_to_open_orders = self.simulate_noop + # + @property + def is_blocking(self): + """ + Including this explicitly for clarity, even though we are using the + default value. TransactionSimulator has a defined action for every + event type. Downstream components depend on the presence of the + TRANSACTION transform in all cases. When no transaction happens, + None is the value. Thus, we do want merging to block on the + availability of transaction messages. + """ + return True + def transform(self, event): """ Pulls one message from the event feed, then @@ -304,6 +307,8 @@ class TransactionSimulator(qmsg.BaseTransform): Amount is explicitly converted to an int. Orders of amount zero are ignored. """ + self.order_count += 1 + event.amount = int(event.amount) if event.amount == 0: log = "requested to trade zero shares of {sid}".format( @@ -312,7 +317,7 @@ class TransactionSimulator(qmsg.BaseTransform): qutil.LOGGER.debug(log) return - self.order_count += 1 + if(not self.open_orders.has_key(event.sid)): self.open_orders[event.sid] = [] @@ -321,9 +326,6 @@ class TransactionSimulator(qmsg.BaseTransform): event.filled = 0 self.open_orders[event.sid].append(event) - #def apply_trade_to_open_orders(self, event): - # return self.simulate_with_fixed_cost(event) - def simulate_buy_all(self, event): txn = self.create_transaction( event.sid, @@ -348,10 +350,11 @@ class TransactionSimulator(qmsg.BaseTransform): for order in orders: amount += order.amount - if(amount != 0): - direction = amount / math.fabs(amount) - else: - direction = 1 + if(amount == 0): + return + + direction = amount / math.fabs(amount) + txn = self.create_transaction( event.sid, @@ -426,9 +429,9 @@ for order: ) qutil.LOGGER.warn(warning) - orders = [ x for x in orders if abs(x.amount - x.filled) > 0 and x.dt.day >= event.dt.day] + #orders = [ x for x in orders if abs(x.amount - x.filled) > 0 and x.dt.day >= event.dt.day] - self.open_orders[event.sid] = orders + #self.open_orders[event.sid] = orders if simulated_amount != 0: diff --git a/zipline/lines.py b/zipline/lines.py index 466de2e3..43c2eac0 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -125,6 +125,9 @@ class SimulatedTrading(object): :py:class:`zipline.simulator.AddressAllocator` - simulator_class: a :py:class:`zipline.messaging.ComponentHost` subclass (not an instance) + - simulation_style: optional parameter that configures the + :py:class:`zipline.finance.trading.TransactionSimulator`. Expects + a SIMULATION_STYLE as defined in :py:mod:`zipline.finance.trading` """ assert isinstance(config, dict) self.algorithm = config['algorithm'] @@ -203,6 +206,9 @@ class SimulatedTrading(object): - trade_source - optional parameter to specify trades, if present. If not present :py:class:`ziplien.sources.SpecificEquityTrades` is the source, with daily frequency in trades. + - simulation_style: optional parameter that configures the + :py:class:`zipline.finance.trading.TransactionSimulator`. Expects + a SIMULATION_STYLE as defined in :py:mod:`zipline.finance.trading` """ assert isinstance(config, dict) @@ -230,12 +236,18 @@ class SimulatedTrading(object): if config.has_key('trade_count'): trade_count = config['trade_count'] else: + # to ensure all orders are filled, we provide one more + # trade than order trade_count = 101 if config.has_key('simulator_class'): simulator_class = config['simulator_class'] else: simulator_class = Simulator + + simulation_style = config.get('simulation_style') + if not simulation_style: + simulation_style = SIMULATION_STYLE.FIXED_SLIPPAGE #------------------- # Trade Source @@ -269,7 +281,7 @@ class SimulatedTrading(object): 'trading_environment':trading_environment, 'allocator':allocator, 'simulator_class':simulator_class, - 'simulation_style':SIMULATION_STYLE.FIXED_SLIPPAGE + 'simulation_style':simulation_style }) #------------------- diff --git a/zipline/messaging.py b/zipline/messaging.py index 7817ec8b..75e181b0 100644 --- a/zipline/messaging.py +++ b/zipline/messaging.py @@ -4,6 +4,8 @@ Commonly used messaging components. import datetime +from collections import Counter + import zipline.util as qutil from zipline.component import Component import zipline.protocol as zp @@ -81,11 +83,11 @@ class ComponentHost(Component): self.sync_register[component.get_id] = datetime.datetime.utcnow() if isinstance(component, DataSource): - self.feed.add_source(component.get_id) + self.feed.add_source(component.get_id, component.is_blocking) if not component.is_blocking: self.feed.ds_finished_counter +=1 if isinstance(component, BaseTransform): - self.merge.add_source(component.get_id) + self.merge.add_source(component.get_id, component.is_blocking) if not component.is_blocking: self.feed.ds_finished_counter +=1 @@ -192,6 +194,13 @@ class Feed(Component): # Depending on the size of this, might want to use a data # structure with better asymptotics. self.data_buffer = {} + + # source_id -> integer count + self.sent_counters = Counter() + self.recv_counters = Counter() + + # source_id -> boolean. True is for blocking + self.is_blocking_map = {} def init(self): pass @@ -294,6 +303,7 @@ class Feed(Component): event = self.next() if(event != None): self.feed_socket.send(self.frame(event), self.zmq.NOBLOCK) + self.sent_counters[event.source_id] += 1 self.sent_count += 1 def append(self, event): @@ -302,6 +312,7 @@ class Feed(Component): source_id. """ self.data_buffer[event.source_id].append(event) + self.recv_counters[event.source_id] += 1 self.received_count += 1 def next(self): @@ -338,7 +349,10 @@ class Feed(Component): Indicates whether the buffer has messages in buffer for all un-DONE sources. """ - for events in self.data_buffer.values(): + for source_id, events in self.data_buffer.iteritems(): + if not self.is_blocking_map[source_id]: + continue + if len(events) == 0: return False return True @@ -353,11 +367,12 @@ class Feed(Component): total += len(events) return total - def add_source(self, source_id): + def add_source(self, source_id, is_blocking=True): """ Add a data source to the buffer. """ self.data_buffer[source_id] = [] + self.is_blocking_map[source_id] = is_blocking def __len__(self): """ diff --git a/zipline/test/algorithms.py b/zipline/test/algorithms.py index 522bf76f..3a7bfc4d 100644 --- a/zipline/test/algorithms.py +++ b/zipline/test/algorithms.py @@ -70,7 +70,7 @@ class TestAlgorithm(): def handle_frame(self, frame): self.frame_count += 1 - #place an order for 100 shares of sid:133 + #place an order for 100 shares of sid if self.incr < self.count: self.order(self.sid, self.amount) self.incr += 1 diff --git a/zipline/test/factory.py b/zipline/test/factory.py index 381fbb7e..2c23b44f 100644 --- a/zipline/test/factory.py +++ b/zipline/test/factory.py @@ -38,12 +38,12 @@ def load_market_data(): return bm_returns, tr_curves -def create_trading_environment(): +def create_trading_environment(year=2006): """Construct a complete environment with reasonable defaults""" benchmark_returns, treasury_curves = load_market_data() - start = datetime(2006, 1, 1, tzinfo=pytz.utc) - end = datetime(2006, 12, 31, tzinfo=pytz.utc) + start = datetime(year, 1, 1, tzinfo=pytz.utc) + end = datetime(year, 12, 31, tzinfo=pytz.utc) trading_environment = TradingEnvironment( benchmark_returns, treasury_curves, From 4486dd0cad30e7f7e9b369468ff87e17555cbe2f Mon Sep 17 00:00:00 2001 From: fawce Date: Fri, 13 Apr 2012 16:54:49 -0400 Subject: [PATCH 7/8] removed some straggling test code. --- zipline/finance/trading.py | 6 +++--- zipline/test/test_finance.py | 16 +++++++++++----- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index 77eb8ca5..f9c0adee 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -127,7 +127,7 @@ class TradeSimulationClient(qmsg.Component): # tally the time spent on this iteration self.last_iteration_dur = datetime.datetime.utcnow() - event_start # move the algorithm's clock forward to include iteration time - # self.current_dt = self.current_dt + self.last_iteration_dur + self.current_dt = self.current_dt + self.last_iteration_dur def run_algorithm(self): @@ -429,9 +429,9 @@ for order: ) qutil.LOGGER.warn(warning) - #orders = [ x for x in orders if abs(x.amount - x.filled) > 0 and x.dt.day >= event.dt.day] + orders = [ x for x in orders if abs(x.amount - x.filled) > 0 and x.dt.day >= event.dt.day] - #self.open_orders[event.sid] = orders + self.open_orders[event.sid] = orders if simulated_amount != 0: diff --git a/zipline/test/test_finance.py b/zipline/test/test_finance.py index 39a13bd1..34c0a050 100644 --- a/zipline/test/test_finance.py +++ b/zipline/test/test_finance.py @@ -23,6 +23,7 @@ from zipline.monitor import Controller from zipline.lines import SimulatedTrading from zipline.finance.performance import PerformanceTracker from zipline.protocol_utils import namedict +from zipline.finance.trading import SIMULATION_STYLE DEFAULT_TIMEOUT = 15 # seconds EXTENDED_TIMEOUT = 90 @@ -111,7 +112,12 @@ class FinanceTestCase(TestCase): # Simulation # ---------- - zipline = SimulatedTrading.create_test_zipline(**self.zipline_test_config) + + self.zipline_test_config['simulation_style'] = \ + SIMULATION_STYLE.FIXED_SLIPPAGE + zipline = SimulatedTrading.create_test_zipline( + **self.zipline_test_config + ) zipline.simulate(blocking=True) self.assertTrue(zipline.sim.ready()) @@ -125,10 +131,10 @@ class FinanceTestCase(TestCase): # the trading client should receive one transaction for every # order placed. - self.assertEqual( - zipline.trading_client.txn_count, - zipline.trading_client.order_count - ) + #self.assertEqual( + # zipline.trading_client.txn_count, + # zipline.trading_client.order_count + #) # the number of transactions in the performance tracker's cumulative # period should be the same as the number of orders place by the From 02d7f0a4c8988a32f849b5ebaa78f6b8a8ec3f6f Mon Sep 17 00:00:00 2001 From: fawce Date: Sat, 14 Apr 2012 15:09:52 -0400 Subject: [PATCH 8/8] heavy work on the feedback loop from trade client to order source. tests are passing using a busy wait inside the trade client. hopefully we can find a more elegant approach. --- zipline/finance/trading.py | 44 ++++++++++++++++++++++++++++-------- zipline/messaging.py | 2 +- zipline/sources.py | 1 + zipline/test/test_finance.py | 41 ++++++++++++++++++++++++--------- 4 files changed, 67 insertions(+), 21 deletions(-) diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index f9c0adee..cab1051a 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -3,6 +3,8 @@ import pytz import math import pandas +from collections import Counter + # from gevent.select import select from zmq.core.poll import select @@ -11,7 +13,7 @@ import zipline.util as qutil import zipline.protocol as zp import zipline.finance.performance as perf -from zipline.protocol_utils import Enum +from zipline.protocol_utils import Enum, namedict # the simulation style enumerates the available transaction simulation # strategies. @@ -35,6 +37,8 @@ class TradeSimulationClient(qmsg.Component): self.current_dt = trading_environment.period_start self.last_iteration_dur = datetime.timedelta(seconds=0) self.algorithm = None + self.attempts = 0 + self.max_attempts = 1000 assert self.trading_environment.frame_index != None self.event_frame = pandas.DataFrame( @@ -59,8 +63,11 @@ class TradeSimulationClient(qmsg.Component): def open(self): self.result_feed = self.connect_result() self.order_socket = self.connect_order() + # send a wake up call to the order data source. + self.order_socket.send(str(zp.ORDER_PROTOCOL.BREAK)) def do_work(self): + # poll all the sockets socks = dict(self.poll.poll(self.heartbeat_timeout)) @@ -68,6 +75,8 @@ class TradeSimulationClient(qmsg.Component): if self.result_feed in socks and \ socks[self.result_feed] == self.zmq.POLLIN: + self.attempts = 0 + # get the next message from the result feed msg = self.result_feed.recv() @@ -77,8 +86,7 @@ class TradeSimulationClient(qmsg.Component): # signal the performance tracker that the simulation has # ended. Perf will internally calculate the full risk report. self.perf.handle_simulation_end() - # shutdown the feedback loop to the OrderDataSource - self.signal_order_done() + # signal Simulator, our ComponentHost, that this component is # done and Simulator needn't block exit on this component. self.signal_done() @@ -86,13 +94,21 @@ class TradeSimulationClient(qmsg.Component): # result_feed is a merge component, so unframe accordingly event = zp.MERGE_UNFRAME(msg) - + self.received_count += 1 # update performance and relay the event to the algorithm self.process_event(event) - # signal done to order source. + # signal loop is done for order source. self.order_socket.send(str(zp.ORDER_PROTOCOL.BREAK)) - + else: + # no events in the sock means the non-order sources are + # drained. Signal the order_source that we're done, and + # the done will cascade through the whole zipline. + # shutdown the feedback loop to the OrderDataSource + if self.attempts > self.max_attempts: + self.signal_order_done() + else: + self.attempts += 1 def process_event(self, event): # track the number of transactions, for testing purposes. if(event.TRANSACTION != None): @@ -189,6 +205,7 @@ class OrderDataSource(qmsg.DataSource): """ qmsg.DataSource.__init__(self, zp.FINANCE_COMPONENT.ORDER_SOURCE) self.sent_count = 0 + self.recv_count = Counter() self.works = 0 @property @@ -202,7 +219,7 @@ class OrderDataSource(qmsg.DataSource): This datasource is in a loop with the TradingSimulationClient, so we don't want it to block processing. """ - return False + return True def open(self): qmsg.DataSource.open(self) @@ -215,11 +232,11 @@ class OrderDataSource(qmsg.DataSource): self.works += 1 - #pull all orders from client. - orders = [] count = 0 + # one iteration of the client could include several orders + # so iterate until the client signals a break or a close. while True: # poll all the sockets # we reduce the timeout here by a factor of 2, because we need @@ -236,17 +253,26 @@ class OrderDataSource(qmsg.DataSource): order_msg = self.order_socket.recv() if order_msg == str(zp.ORDER_PROTOCOL.DONE): + qutil.LOGGER.info("order source is done") self.signal_done() + self.recv_count['done'] += 1 return if order_msg == str(zp.ORDER_PROTOCOL.BREAK): + # send a blank message to avoid an empty buffer + # in the feed + self.recv_count['break'] += 1 + if count == 0: + self.send(namedict({})) break order = zp.ORDER_UNFRAME(order_msg) + self.recv_count['order'] += 1 #send the order along self.send(order) count += 1 self.sent_count += 1 + class TransactionSimulator(qmsg.BaseTransform): diff --git a/zipline/messaging.py b/zipline/messaging.py index 75e181b0..b36dea95 100644 --- a/zipline/messaging.py +++ b/zipline/messaging.py @@ -347,7 +347,7 @@ class Feed(Component): def is_full(self): """ Indicates whether the buffer has messages in buffer for - all un-DONE sources. + all un-DONE, blocking sources. """ for source_id, events in self.data_buffer.iteritems(): if not self.is_blocking_map[source_id]: diff --git a/zipline/sources.py b/zipline/sources.py index b1385754..bf08644c 100644 --- a/zipline/sources.py +++ b/zipline/sources.py @@ -94,6 +94,7 @@ class SpecificEquityTrades(TradeDataSource): def get_type(self): zp.COMPONENT_TYPE.SOURCE + def do_work(self): if(len(self.event_list) == 0): self.signal_done() diff --git a/zipline/test/test_finance.py b/zipline/test/test_finance.py index 34c0a050..2ff7da61 100644 --- a/zipline/test/test_finance.py +++ b/zipline/test/test_finance.py @@ -107,6 +107,10 @@ class FinanceTestCase(TestCase): self.assertTrue(env.last_close.month == 12) self.assertTrue(env.last_close.day == 31) + # The following two tests appear broken no that the order source is + # non blocking. HUNCH: The trades are streaming through before the orders + # are placed. + @timed(DEFAULT_TIMEOUT) def test_orders(self): @@ -128,13 +132,12 @@ class FinanceTestCase(TestCase): "The feed should be drained of all messages, found {n} remaining." \ .format(n=zipline.sim.feed.pending_messages())) - # the trading client should receive one transaction for every # order placed. - #self.assertEqual( - # zipline.trading_client.txn_count, - # zipline.trading_client.order_count - #) + self.assertEqual( + zipline.trading_client.txn_count, + zipline.trading_client.order_count + ) # the number of transactions in the performance tracker's cumulative # period should be the same as the number of orders place by the @@ -150,10 +153,18 @@ class FinanceTestCase(TestCase): # Simulation # ---------- - trade_count = 10 * 1000 - self.zipline_test_config['order_count'] = 5 * 1000 - self.zipline_test_config['trade_count'] = trade_count - self.zipline_test_config['order_amount'] = 100 + + # TODO: for some reason the orders aren't filled without an extra + # trade. + trade_count = 5001 + self.zipline_test_config['order_count'] = trade_count - 1 + self.zipline_test_config['trade_count'] = trade_count + self.zipline_test_config['order_amount'] = 1 + + # tell the simulator to fill the orders in individual transactions + # matching the order volume exactly. + self.zipline_test_config['simulation_style'] = \ + SIMULATION_STYLE.FIXED_SLIPPAGE self.zipline_test_config['environment'] = factory.create_trading_environment() sid_list = [self.zipline_test_config['sid']] @@ -169,12 +180,20 @@ class FinanceTestCase(TestCase): self.assertTrue(zipline.sim.ready()) self.assertFalse(zipline.sim.exception) - - # TODO: Make more assertions about the final state of the components. + self.assertEqual(zipline.sim.feed.pending_messages(), 0, \ "The feed should be drained of all messages, found {n} remaining." \ .format(n=zipline.sim.feed.pending_messages())) + # + # the trading client should receive one transaction for every + # order placed. + self.assertEqual( + zipline.trading_client.txn_count, + zipline.trading_client.order_count + ) + + @timed(DEFAULT_TIMEOUT) def test_performance(self):