starting in on pandas integration.

This commit is contained in:
fawce
2012-03-13 12:20:35 -04:00
parent ecd98f1aa0
commit 649a8ea2e5
3 changed files with 80 additions and 42 deletions
+37 -18
View File
@@ -1,6 +1,7 @@
import datetime
import pytz
import math
import pandas
from zmq.core.poll import select
@@ -10,13 +11,16 @@ import zipline.protocol as zp
class TradeSimulationClient(qmsg.Component):
def __init__(self):
def __init__(self, simulation_dt):
qmsg.Component.__init__(self)
self.received_count = 0
self.prev_dt = None
self.event_queue = []
self.event_callbacks = []
self.txn_count = 0
self.current_dt = simulation_dt
self.last_iteration_duration = datetime.timedelta(seconds=0)
self.event_frame = None
@property
def get_id(self):
@@ -52,9 +56,22 @@ class TradeSimulationClient(qmsg.Component):
if(event.TRANSACTION != None):
self.txn_count += 1
for cb in self.event_callbacks:
cb(event)
#filter order flow out of the events sent to callbacks
if event.source_id != zp.FINANCE_COMPONENT.ORDER_SOURCE:
#mark the start time for client's processing of this event.
event_start = datetime.datetime.utcnow()
for cb in self.event_callbacks:
if(event.dt < self.current_dt):
self.queue_event(event)
else:
cb(self.event_frame)
#update time based on receipt of the order
self.last_iteration_duration = datetime.datetime.utcnow() - event_start
self.current_dt = self.current_dt + self.last_iteration_duration
#signal done to order source.
self.order_socket.send(str(zp.ORDER_PROTOCOL.BREAK))
@@ -62,15 +79,26 @@ class TradeSimulationClient(qmsg.Component):
return self.connect_push_socket(self.addresses['order_address'])
def order(self, sid, amount):
self.order_socket.send(zp.ORDER_FRAME(sid, amount))
order = zp.namedict({
'dt':self.current_dt,
'sid':sid,
'amount':amount
})
self.order_socket.send(zp.ORDER_FRAME(order))
def signal_order_done(self):
self.order_socket.send(str(zp.ORDER_PROTOCOL.DONE))
def frame_event(self, event):
if self.event_frame == None:
self.event_frame = pandas.DataFrame()
self.event_frame.append(event)
class OrderDataSource(qmsg.DataSource):
"""DataSource that relays orders from the client"""
def __init__(self, simulation_dt):
def __init__(self):
"""
:param simulation_time: datetime in UTC timezone, sets the start
time of simulation. orders
@@ -83,8 +111,6 @@ class OrderDataSource(qmsg.DataSource):
}
"""
qmsg.DataSource.__init__(self, zp.FINANCE_COMPONENT.ORDER_SOURCE)
self.simulation_dt = simulation_dt
self.last_iteration_duration = datetime.timedelta(seconds=0)
self.sent_count = 0
@property
@@ -99,9 +125,6 @@ class OrderDataSource(qmsg.DataSource):
return self.bind_pull_socket(self.addresses['order_address'])
def do_work(self):
#mark the start time for client's processing of this event.
self.event_start = datetime.datetime.utcnow()
self.simulation_dt = self.simulation_dt + self.last_iteration_duration
#TODO: if this is the first iteration, break deadlock by sending a dummy order
if(self.sent_count == 0):
@@ -109,7 +132,6 @@ class OrderDataSource(qmsg.DataSource):
#pull all orders from client.
orders = []
order_dt = None
count = 0
while True:
@@ -132,6 +154,7 @@ class OrderDataSource(qmsg.DataSource):
return
order_msg = rlist[0].recv()
if order_msg == str(zp.ORDER_PROTOCOL.DONE):
self.signal_done()
return
@@ -139,12 +162,8 @@ class OrderDataSource(qmsg.DataSource):
if order_msg == str(zp.ORDER_PROTOCOL.BREAK):
break
sid, amount = zp.ORDER_UNFRAME(order_msg)
order = zp.ORDER_UNFRAME(order_msg)
#send the order along
self.last_iteration_duration = datetime.datetime.utcnow() - self.event_start
dt = self.simulation_dt + self.last_iteration_duration
order = zp.namedict({"dt":dt, 'sid':sid, 'amount':amount})
self.send(order)
count += 1
self.sent_count += 1
@@ -163,7 +182,7 @@ class TransactionSimulator(qmsg.BaseTransform):
self.open_orders = {}
self.order_count = 0
self.txn_count = 0
self.trade_windwo = datetime.timedelta(seconds=30)
self.trade_window = datetime.timedelta(seconds=30)
self.orderTTL = datetime.timedelta(days=1)
self.volume_share = 0.05
self.commission = 0.03
@@ -221,7 +240,7 @@ class TransactionSimulator(qmsg.BaseTransform):
for order in orders:
#we're using minute bars, so allow orders within
#30 seconds of the trade
if((order.dt - event.dt) < self.trade_windwo):
if((order.dt - event.dt) < self.trade_window):
total_order += order.amount
if(order.dt > dt):
dt = order.dt
+21 -7
View File
@@ -119,6 +119,7 @@ import numbers
import datetime
import pytz
import copy
import pandas
from collections import namedtuple
import zipline.util as qutil
@@ -205,6 +206,9 @@ class namedict(object):
def has_attr(self, name):
return self.__dict__.has_key(name)
def as_series(self):
s = pandas.Series(self.values(), self.keys())
# ================
# Control Protocol
@@ -522,19 +526,29 @@ def TRADE_UNFRAME(msg):
# Orders - from client to order source
# =========
def ORDER_FRAME(sid, amount):
assert isinstance(sid, int)
assert isinstance(amount, int) #no partial shares...
return msgpack.dumps(tuple([sid, amount]))
def ORDER_FRAME(order):
assert isinstance(order.sid, int)
assert isinstance(order.amount, int) #no partial shares...
PACK_DATE(order)
return msgpack.dumps(tuple([
order.sid,
order.amount,
order.dt
]))
def ORDER_UNFRAME(msg):
try:
sid, amount = msgpack.loads(msg)
sid, amount, dt = msgpack.loads(msg)
assert isinstance(sid, int)
assert isinstance(amount, int)
return sid, amount
rval = namedict({
'sid':sid,
'amount':amount,
'dt':dt
})
UNPACK_DATE(rval)
return rval
except TypeError:
raise INVALID_ORDER_FRAME(msg)
except ValueError:
+22 -17
View File
@@ -87,19 +87,25 @@ class FinanceTestCase(TestCase):
def test_order_protocol(self):
#client places an order
order_msg = zp.ORDER_FRAME(133, 100)
now = datetime.utcnow().replace(tzinfo=pytz.utc)
order = zp.namedict({
'dt':now,
'sid':133,
'amount':100
})
order_msg = zp.ORDER_FRAME(order)
#order datasource receives
sid, amount = zp.ORDER_UNFRAME(order_msg)
self.assertEqual(sid, 133)
self.assertEqual(amount, 100)
order = zp.ORDER_UNFRAME(order_msg)
self.assertEqual(order.sid, 133)
self.assertEqual(order.amount, 100)
self.assertEqual(order.dt, now)
#order datasource datasource frames the order
order_dt = datetime.utcnow().replace(tzinfo=pytz.utc)
order_event = zp.namedict({
"sid" : sid,
"amount" : amount,
"dt" : order_dt,
"sid" : order.sid,
"amount" : order.amount,
"dt" : order.dt,
"source_id" : zp.FINANCE_COMPONENT.ORDER_SOURCE,
"type" : zp.DATASOURCE_TYPE.ORDER
})
@@ -110,7 +116,7 @@ class FinanceTestCase(TestCase):
#transaction transform unframes
recovered_order = zp.DATASOURCE_UNFRAME(order_ds_msg)
self.assertEqual(order_dt, recovered_order.dt)
self.assertEqual(now, recovered_order.dt)
#create a transaction from the order
txn = zp.namedict({
@@ -162,6 +168,7 @@ class FinanceTestCase(TestCase):
price = [10.1] * 16
volume = [100] * 16
start_date = datetime.strptime("02/1/2012","%m/%d/%Y")
start_date = start_date.replace(tzinfo=pytz.utc)
trade_time_increment = timedelta(days=1)
trade_history = factory.create_trade_history(
@@ -175,12 +182,11 @@ class FinanceTestCase(TestCase):
set1 = SpecificEquityTrades("flat-133", trade_history)
trading_client = TradeSimulationClient()
trading_client = TradeSimulationClient(start_date)
#client will send 10 orders for 100 shares of 133
test_algo = TestAlgorithm(133, 100, 10, trading_client)
ts = datetime.strptime("02/1/2012","%m/%d/%Y").replace(tzinfo=pytz.utc)
order_source = OrderDataSource(ts)
order_source = OrderDataSource()
transaction_sim = TransactionSimulator()
sim.register_components([
@@ -236,6 +242,7 @@ class FinanceTestCase(TestCase):
price = [10.1] * trade_count
volume = [100] * trade_count
start_date = datetime.strptime("02/1/2012","%m/%d/%Y")
start_date = start_date.replace(tzinfo=pytz.utc)
trade_time_increment = timedelta(days=1)
trade_history = factory.create_trade_history(
@@ -249,12 +256,10 @@ class FinanceTestCase(TestCase):
set1 = SpecificEquityTrades("flat-133", trade_history)
#client sill send 10 orders for 100 shares of 133
trading_client = TradeSimulationClient()
trading_client = TradeSimulationClient(start_date)
test_algo = TestAlgorithm(133, 100, 10, trading_client)
ts = datetime.strptime("02/1/2012","%m/%d/%Y")
ts = ts.replace(tzinfo=pytz.utc)
order_source = OrderDataSource(ts)
order_source = OrderDataSource()
transaction_sim = TransactionSimulator()
perf_tracker = perf.PerformanceTracker(
trade_history[0]['dt'],