From 649a8ea2e5f5d420ac6ed5e197e2aae9558db7a7 Mon Sep 17 00:00:00 2001 From: fawce Date: Tue, 13 Mar 2012 12:20:35 -0400 Subject: [PATCH] starting in on pandas integration. --- zipline/finance/trading.py | 55 ++++++++++++++++++++++++------------ zipline/protocol.py | 28 +++++++++++++----- zipline/test/test_finance.py | 39 ++++++++++++++----------- 3 files changed, 80 insertions(+), 42 deletions(-) diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index 5c4817de..685be0c3 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -1,6 +1,7 @@ import datetime import pytz import math +import pandas from zmq.core.poll import select @@ -10,13 +11,16 @@ import zipline.protocol as zp class TradeSimulationClient(qmsg.Component): - def __init__(self): + def __init__(self, simulation_dt): qmsg.Component.__init__(self) self.received_count = 0 self.prev_dt = None self.event_queue = [] self.event_callbacks = [] self.txn_count = 0 + self.current_dt = simulation_dt + self.last_iteration_duration = datetime.timedelta(seconds=0) + self.event_frame = None @property def get_id(self): @@ -52,9 +56,22 @@ class TradeSimulationClient(qmsg.Component): if(event.TRANSACTION != None): self.txn_count += 1 - for cb in self.event_callbacks: - cb(event) + #filter order flow out of the events sent to callbacks + if event.source_id != zp.FINANCE_COMPONENT.ORDER_SOURCE: + #mark the start time for client's processing of this event. + event_start = datetime.datetime.utcnow() + for cb in self.event_callbacks: + if(event.dt < self.current_dt): + self.queue_event(event) + else: + cb(self.event_frame) + + #update time based on receipt of the order + self.last_iteration_duration = datetime.datetime.utcnow() - event_start + + self.current_dt = self.current_dt + self.last_iteration_duration + #signal done to order source. self.order_socket.send(str(zp.ORDER_PROTOCOL.BREAK)) @@ -62,15 +79,26 @@ class TradeSimulationClient(qmsg.Component): return self.connect_push_socket(self.addresses['order_address']) def order(self, sid, amount): - self.order_socket.send(zp.ORDER_FRAME(sid, amount)) + order = zp.namedict({ + 'dt':self.current_dt, + 'sid':sid, + 'amount':amount + }) + + self.order_socket.send(zp.ORDER_FRAME(order)) def signal_order_done(self): self.order_socket.send(str(zp.ORDER_PROTOCOL.DONE)) + def frame_event(self, event): + if self.event_frame == None: + self.event_frame = pandas.DataFrame() + self.event_frame.append(event) + class OrderDataSource(qmsg.DataSource): """DataSource that relays orders from the client""" - def __init__(self, simulation_dt): + def __init__(self): """ :param simulation_time: datetime in UTC timezone, sets the start time of simulation. orders @@ -83,8 +111,6 @@ class OrderDataSource(qmsg.DataSource): } """ qmsg.DataSource.__init__(self, zp.FINANCE_COMPONENT.ORDER_SOURCE) - self.simulation_dt = simulation_dt - self.last_iteration_duration = datetime.timedelta(seconds=0) self.sent_count = 0 @property @@ -99,9 +125,6 @@ class OrderDataSource(qmsg.DataSource): return self.bind_pull_socket(self.addresses['order_address']) def do_work(self): - #mark the start time for client's processing of this event. - self.event_start = datetime.datetime.utcnow() - self.simulation_dt = self.simulation_dt + self.last_iteration_duration #TODO: if this is the first iteration, break deadlock by sending a dummy order if(self.sent_count == 0): @@ -109,7 +132,6 @@ class OrderDataSource(qmsg.DataSource): #pull all orders from client. orders = [] - order_dt = None count = 0 while True: @@ -132,6 +154,7 @@ class OrderDataSource(qmsg.DataSource): return order_msg = rlist[0].recv() + if order_msg == str(zp.ORDER_PROTOCOL.DONE): self.signal_done() return @@ -139,12 +162,8 @@ class OrderDataSource(qmsg.DataSource): if order_msg == str(zp.ORDER_PROTOCOL.BREAK): break - sid, amount = zp.ORDER_UNFRAME(order_msg) + order = zp.ORDER_UNFRAME(order_msg) #send the order along - self.last_iteration_duration = datetime.datetime.utcnow() - self.event_start - dt = self.simulation_dt + self.last_iteration_duration - order = zp.namedict({"dt":dt, 'sid':sid, 'amount':amount}) - self.send(order) count += 1 self.sent_count += 1 @@ -163,7 +182,7 @@ class TransactionSimulator(qmsg.BaseTransform): self.open_orders = {} self.order_count = 0 self.txn_count = 0 - self.trade_windwo = datetime.timedelta(seconds=30) + self.trade_window = datetime.timedelta(seconds=30) self.orderTTL = datetime.timedelta(days=1) self.volume_share = 0.05 self.commission = 0.03 @@ -221,7 +240,7 @@ class TransactionSimulator(qmsg.BaseTransform): for order in orders: #we're using minute bars, so allow orders within #30 seconds of the trade - if((order.dt - event.dt) < self.trade_windwo): + if((order.dt - event.dt) < self.trade_window): total_order += order.amount if(order.dt > dt): dt = order.dt diff --git a/zipline/protocol.py b/zipline/protocol.py index c1ad89fa..3088b3bd 100644 --- a/zipline/protocol.py +++ b/zipline/protocol.py @@ -119,6 +119,7 @@ import numbers import datetime import pytz import copy +import pandas from collections import namedtuple import zipline.util as qutil @@ -205,6 +206,9 @@ class namedict(object): def has_attr(self, name): return self.__dict__.has_key(name) + + def as_series(self): + s = pandas.Series(self.values(), self.keys()) # ================ # Control Protocol @@ -522,19 +526,29 @@ def TRADE_UNFRAME(msg): # Orders - from client to order source # ========= -def ORDER_FRAME(sid, amount): - assert isinstance(sid, int) - assert isinstance(amount, int) #no partial shares... - return msgpack.dumps(tuple([sid, amount])) +def ORDER_FRAME(order): + assert isinstance(order.sid, int) + assert isinstance(order.amount, int) #no partial shares... + PACK_DATE(order) + return msgpack.dumps(tuple([ + order.sid, + order.amount, + order.dt + ])) def ORDER_UNFRAME(msg): try: - sid, amount = msgpack.loads(msg) + sid, amount, dt = msgpack.loads(msg) assert isinstance(sid, int) assert isinstance(amount, int) - - return sid, amount + rval = namedict({ + 'sid':sid, + 'amount':amount, + 'dt':dt + }) + UNPACK_DATE(rval) + return rval except TypeError: raise INVALID_ORDER_FRAME(msg) except ValueError: diff --git a/zipline/test/test_finance.py b/zipline/test/test_finance.py index a1825d43..474e95cb 100644 --- a/zipline/test/test_finance.py +++ b/zipline/test/test_finance.py @@ -87,19 +87,25 @@ class FinanceTestCase(TestCase): def test_order_protocol(self): #client places an order - order_msg = zp.ORDER_FRAME(133, 100) + now = datetime.utcnow().replace(tzinfo=pytz.utc) + order = zp.namedict({ + 'dt':now, + 'sid':133, + 'amount':100 + }) + order_msg = zp.ORDER_FRAME(order) #order datasource receives - sid, amount = zp.ORDER_UNFRAME(order_msg) - self.assertEqual(sid, 133) - self.assertEqual(amount, 100) - + order = zp.ORDER_UNFRAME(order_msg) + self.assertEqual(order.sid, 133) + self.assertEqual(order.amount, 100) + self.assertEqual(order.dt, now) + #order datasource datasource frames the order - order_dt = datetime.utcnow().replace(tzinfo=pytz.utc) order_event = zp.namedict({ - "sid" : sid, - "amount" : amount, - "dt" : order_dt, + "sid" : order.sid, + "amount" : order.amount, + "dt" : order.dt, "source_id" : zp.FINANCE_COMPONENT.ORDER_SOURCE, "type" : zp.DATASOURCE_TYPE.ORDER }) @@ -110,7 +116,7 @@ class FinanceTestCase(TestCase): #transaction transform unframes recovered_order = zp.DATASOURCE_UNFRAME(order_ds_msg) - self.assertEqual(order_dt, recovered_order.dt) + self.assertEqual(now, recovered_order.dt) #create a transaction from the order txn = zp.namedict({ @@ -162,6 +168,7 @@ class FinanceTestCase(TestCase): price = [10.1] * 16 volume = [100] * 16 start_date = datetime.strptime("02/1/2012","%m/%d/%Y") + start_date = start_date.replace(tzinfo=pytz.utc) trade_time_increment = timedelta(days=1) trade_history = factory.create_trade_history( @@ -175,12 +182,11 @@ class FinanceTestCase(TestCase): set1 = SpecificEquityTrades("flat-133", trade_history) - trading_client = TradeSimulationClient() + trading_client = TradeSimulationClient(start_date) #client will send 10 orders for 100 shares of 133 test_algo = TestAlgorithm(133, 100, 10, trading_client) - ts = datetime.strptime("02/1/2012","%m/%d/%Y").replace(tzinfo=pytz.utc) - order_source = OrderDataSource(ts) + order_source = OrderDataSource() transaction_sim = TransactionSimulator() sim.register_components([ @@ -236,6 +242,7 @@ class FinanceTestCase(TestCase): price = [10.1] * trade_count volume = [100] * trade_count start_date = datetime.strptime("02/1/2012","%m/%d/%Y") + start_date = start_date.replace(tzinfo=pytz.utc) trade_time_increment = timedelta(days=1) trade_history = factory.create_trade_history( @@ -249,12 +256,10 @@ class FinanceTestCase(TestCase): set1 = SpecificEquityTrades("flat-133", trade_history) #client sill send 10 orders for 100 shares of 133 - trading_client = TradeSimulationClient() + trading_client = TradeSimulationClient(start_date) test_algo = TestAlgorithm(133, 100, 10, trading_client) - ts = datetime.strptime("02/1/2012","%m/%d/%Y") - ts = ts.replace(tzinfo=pytz.utc) - order_source = OrderDataSource(ts) + order_source = OrderDataSource() transaction_sim = TransactionSimulator() perf_tracker = perf.PerformanceTracker( trade_history[0]['dt'],