mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-02 05:14:14 +08:00
added an order protocol between orders transform and algo client
This commit is contained in:
+103
-47
@@ -1,86 +1,142 @@
|
||||
import json
|
||||
import datetime
|
||||
import zipline.messaging as qmsg
|
||||
import zipline.util as qutil
|
||||
|
||||
class TradingClient(qmsg.Component):
|
||||
|
||||
class TradeSimulationClient(qmsg.Component):
|
||||
|
||||
def __init__(self):
|
||||
qmsg.Component.__init__(self)
|
||||
self.received_count = 0
|
||||
self.prev_dt = None
|
||||
|
||||
|
||||
def get_id(self):
|
||||
return "TRADING_CLIENT"
|
||||
|
||||
|
||||
def open(self):
|
||||
self.data_feed, self.poller = self.connect_result()
|
||||
self.order_feed = self.connect_order()
|
||||
|
||||
self.result_feed = self.connect_result()
|
||||
self.order_socket = self.connect_order()
|
||||
|
||||
def do_work(self):
|
||||
socks = dict(self.poller.poll(2000)) #timeout after 2 seconds.
|
||||
if self.data_feed in socks and socks[self.data_feed] == self.zmq.POLLIN:
|
||||
msg = self.data_feed.recv()
|
||||
if(self.is_done_message(msg)):
|
||||
qutil.LOGGER.info("Client is DONE!")
|
||||
self.signal_done()
|
||||
return
|
||||
|
||||
self.received_count += 1
|
||||
event = json.loads(msg)
|
||||
self.handle_event(event)
|
||||
#next feed event
|
||||
(rlist, wlist, xlist) = self.poller.selec([self.result_feed],
|
||||
[],
|
||||
[self.result_feed],
|
||||
timeout=self.heartbeat_timeout/1000) #select timeout is in sec
|
||||
#
|
||||
#no more orders, should be an error condition
|
||||
if len(rlist) == 0 or len(xlist) > 0:
|
||||
raise Exception("unexpected end of feed stream")
|
||||
message = rlist[0].recv()
|
||||
if message == str(CONTROL_PROTOCOL.DONE):
|
||||
self.signal_done()
|
||||
return #leave open orders hanging? client requests for orders?
|
||||
|
||||
event = json.loads(message)
|
||||
self._handle_event(event)
|
||||
|
||||
def connect_order(self):
|
||||
return self.connect_push_socket(self.addresses['order_address'])
|
||||
|
||||
def handle_event(self, event):
|
||||
NotImplemented
|
||||
|
||||
|
||||
def _handle_event(self, event):
|
||||
self.event_queue.append(event)
|
||||
if event['TRADE_SIM']['ALGO_TIME'] <= event['dt']:
|
||||
del(event['TRADE_SIM'])
|
||||
event['dt'] = qutil.parse_date(event['dt'])
|
||||
#event occurred in the present, send the queue to be processed
|
||||
self.handle_events(self.event_queue)
|
||||
|
||||
def handle_events(self, event_queue):
|
||||
raise NotImplementedError
|
||||
|
||||
def order(self, sid, volume):
|
||||
order = {'sid':sid, 'volume':volume}
|
||||
self.order_feed.send(json.dumps(order))
|
||||
|
||||
|
||||
|
||||
|
||||
class TradeSimulator(qmsg.BaseTransform):
|
||||
|
||||
def __init__(self):
|
||||
qmsq.BaseTransform.__init__(self, "")
|
||||
self.open_orders = {}
|
||||
self.algo_time = None
|
||||
self.open_orders = {}
|
||||
self.algo_time = None
|
||||
self.event_start = None
|
||||
self.last_event_time = None
|
||||
self.last_iteration_duration = None
|
||||
|
||||
def get_id(self):
|
||||
return "EQUITY_TRADE_SIM"
|
||||
return "TRADE_SIM"
|
||||
|
||||
def open():
|
||||
qmsg.BaseTransform.open(self)
|
||||
self.order_socket, self.order_poller = self.bind_order()
|
||||
self.order_socket = self.bind_order()
|
||||
|
||||
def bind_order(self):
|
||||
return self.bind_pull_socket(self.addresses['order_address'])
|
||||
|
||||
def do_work(self):
|
||||
"""
|
||||
Loops until feed's DONE message is received:
|
||||
- receive an event from the data feed
|
||||
- call transform (subclass' method) on event
|
||||
- send the transformed event
|
||||
Pulls one message from the event feed, then
|
||||
loops on orders until client sends DONE message.
|
||||
"""
|
||||
socks = dict(self.poller.poll(2000)) #timeout after 2 seconds.
|
||||
if self.feed_socket in socks and socks[self.feed_socket] == self.zmq.POLLIN:
|
||||
message = self.feed_socket.recv()
|
||||
if(self.is_done_message(message)):
|
||||
self.signal_done()
|
||||
return
|
||||
event = json.loads(message)
|
||||
|
||||
#next feed event
|
||||
(rlist, wlist, xlist) = self.poller.selec([self.feed_socket],
|
||||
[],
|
||||
[self.feed_socket],
|
||||
timeout=self.heartbeat_timeout/1000) #select timeout is in sec
|
||||
#
|
||||
#no more orders, should be an error condition
|
||||
if len(rlist) == 0 or len(xlist) > 0:
|
||||
raise Exception("unexpected end of feed stream")
|
||||
message = rlist[0].recv()
|
||||
if message == str(CONTROL_PROTOCOL.DONE):
|
||||
self.signal_done()
|
||||
return #leave open orders hanging? client requests for orders?
|
||||
|
||||
#receive all orders.
|
||||
while True:
|
||||
message = self.order_socket.recv()
|
||||
if(self.is_done_message(message)):
|
||||
break; #no more orders on this tick
|
||||
self.add_open_order(json.loads(order))
|
||||
event = json.loads(message)
|
||||
|
||||
if self.last_iteration_duration != None:
|
||||
self.algo_time = self.last_event_time + self.last_iteration_duration
|
||||
|
||||
self.last_event_time = qutil.parse_date(event['dt'])
|
||||
|
||||
if self.algo_time < self.last_event_time:
|
||||
#compress time, move algo's clock to the time of this event
|
||||
self.algo_time = self.last_event_time
|
||||
|
||||
fill = self.process_orders(event)
|
||||
|
||||
#TODO: decide what this transform should send downstream, maybe fills? effective algo time?
|
||||
self.state['value'] = {'FILL':fill,
|
||||
'ALGO_TIME':qutil.format_date(self.algo_time)}
|
||||
|
||||
#mark the start time for client's processing of this event.
|
||||
self.event_start = datetime.datetime.utcnow()
|
||||
self.result_socket.send(json.dumps(cur_state), self.zmq.NOBLOCK)
|
||||
|
||||
|
||||
while True: #this loop should also poll for portfolio state req/rep
|
||||
(rlist, wlist, xlist) = self.poller.selec([self.order_socket],
|
||||
[],
|
||||
[self.order_socket],
|
||||
timeout=self.heartbeat_timeout/1000) #select timeout is in sec
|
||||
|
||||
|
||||
#no more orders, should this be an error condition?
|
||||
if len(rlist) == 0 or len(xlist) > 0:
|
||||
break
|
||||
|
||||
cur_state['id'] = self.state['name']
|
||||
self.result_socket.send(json.dumps(cur_state), self.zmq.NOBLOCK)
|
||||
order_msg = rlist[0].recv()
|
||||
if order_msg == str(CONTROL_PROTOCOL.DONE):
|
||||
break
|
||||
|
||||
order = json.loads(order_msg)
|
||||
self.add_open_order(order)
|
||||
|
||||
#end of order processing loop
|
||||
self.last_iteration_duration = datetime.datetime.utcnow() - self.event_start
|
||||
|
||||
|
||||
def add_open_order(self, order):
|
||||
self.open_orders[order['sid']] = order
|
||||
|
||||
Reference in New Issue
Block a user