Cleaned up OOP, first round.

This commit is contained in:
Stephen Diehl
2012-05-16 14:31:52 -04:00
parent 93cc4bc172
commit 3ad1f250e6
16 changed files with 608 additions and 604 deletions
+144 -149
View File
@@ -5,21 +5,17 @@ import datetime
import pytz
import zipline.utils.factory as factory
import zipline.test_algorithms
#import zipline.util as qutil
import zipline.finance.performance as perf
import zipline.finance.risk as risk
import zipline.protocol as zp
from zipline.finance.trading import TradeSimulationClient, TradingEnvironment, \
SIMULATION_STYLE
from zipline.finance.trading import TradingEnvironment
class PerformanceTestCase(unittest.TestCase):
def setUp(self):
#qutil.configure_logging()
self.benchmark_returns, self.treasury_curves = \
factory.load_market_data()
random_index = random.randint(
0,
len(self.treasury_curves)
@@ -27,32 +23,32 @@ class PerformanceTestCase(unittest.TestCase):
for n in range(100):
self.dt = self.treasury_curves.keys()[random_index]
self.end_dt = self.dt + datetime.timedelta(days=365)
now = datetime.datetime.utcnow().replace(tzinfo=pytz.utc)
if self.end_dt <= now:
break
self.trading_environment = TradingEnvironment(
self.benchmark_returns,
self.benchmark_returns,
self.treasury_curves,
period_start = self.dt,
period_end = self.end_dt
)
self.onesec = datetime.timedelta(seconds=1)
self.oneday = datetime.timedelta(days=1)
self.tradingday = datetime.timedelta(hours=6, minutes=30)
self.dt = self.trading_environment.trading_days[random_index]
def tearDown(self):
pass
def test_long_position(self):
"""
verify that the performance period calculates properly for a
verify that the performance period calculates properly for a
single buy transaction
"""
#post some trades in the market
@@ -63,30 +59,30 @@ class PerformanceTestCase(unittest.TestCase):
self.onesec,
self.trading_environment
)
txn = factory.create_txn(1,10.0,100,self.dt + self.onesec)
pp = perf.PerformancePeriod({}, 0.0, 1000.0)
pp.execute_transaction(txn)
for trade in trades:
pp.update_last_sale(trade)
pp.calculate_performance()
self.assertEqual(
pp.period_capital_used,
-1 * txn.price * txn.amount,
"capital used should be equal to the opposite of the transaction \
cost of sole txn in test"
)
self.assertEqual(len(pp.positions),1,"should be just one position")
self.assertEqual(
pp.positions[1].sid,
txn.sid,
txn.sid,
"position should be in security with id 1")
self.assertEqual(
pp.positions[1].amount,
txn.amount,
@@ -94,13 +90,13 @@ class PerformanceTestCase(unittest.TestCase):
sharecount=txn.amount
)
)
self.assertEqual(
pp.positions[1].cost_basis,
txn.price,
"should have a cost basis of 10"
)
self.assertEqual(
pp.positions[1].last_sale_price,
trades[-1]['price'],
@@ -110,16 +106,16 @@ class PerformanceTestCase(unittest.TestCase):
act=pp.positions[1].last_sale_price
)
)
self.assertEqual(
pp.ending_value,
1100,
"ending value should be price of last trade times number of \
shares in position"
)
self.assertEqual(pp.pnl, 100, "gain of 1 on 100 shares should be 100")
def test_short_position(self):
"""verify that the performance period calculates properly for a \
single short-sale transaction"""
@@ -130,148 +126,148 @@ single short-sale transaction"""
self.onesec,
self.trading_environment
)
trades_1 = trades[:-2]
txn = factory.create_txn(1, 10.0, -100, self.dt + self.onesec)
pp = perf.PerformancePeriod({}, 0.0, 1000.0)
pp.execute_transaction(txn)
for trade in trades_1:
pp.update_last_sale(trade)
pp.calculate_performance()
self.assertEqual(
pp.period_capital_used,
-1 * txn.price * txn.amount,
"capital used should be equal to the opposite of the transaction\
cost of sole txn in test"
)
self.assertEqual(
len(pp.positions),
1,
"should be just one position")
self.assertEqual(
pp.positions[1].sid,
txn.sid,
txn.sid,
"position should be in security from the transaction"
)
self.assertEqual(
pp.positions[1].amount,
-100,
"should have a position of -100 shares"
)
self.assertEqual(
pp.positions[1].cost_basis,
txn.price,
"should have a cost basis of 10"
)
self.assertEqual(
pp.positions[1].last_sale_price,
trades_1[-1]['price'],
"last sale should be price of last trade"
)
self.assertEqual(
pp.ending_value,
-1100,
"ending value should be price of last trade times number of \
shares in position"
)
self.assertEqual(pp.pnl,-100,"gain of 1 on 100 shares should be 100")
# simulate additional trades, and ensure that the position value
# simulate additional trades, and ensure that the position value
# reflects the new price
trades_2 = trades[-2:]
#simulate a rollover to a new period
pp2 = perf.PerformancePeriod(
pp.positions,
pp.ending_value,
pp.positions,
pp.ending_value,
pp.ending_cash
)
for trade in trades_2:
pp2.update_last_sale(trade)
pp2.calculate_performance()
self.assertEqual(
pp2.period_capital_used,
0,
"capital used should be zero, there were no transactions in \
performance period"
)
self.assertEqual(
len(pp2.positions),
1,
"should be just one position"
)
self.assertEqual(
pp2.positions[1].sid,
txn.sid,
txn.sid,
"position should be in security from the transaction"
)
self.assertEqual(
pp2.positions[1].amount,
-100,
"should have a position of -100 shares"
)
self.assertEqual(
pp2.positions[1].cost_basis,
txn.price,
"should have a cost basis of 10"
)
self.assertEqual(
pp2.positions[1].last_sale_price,
trades_2[-1].price,
"last sale should be price of last trade"
)
self.assertEqual(
pp2.ending_value,
-900,
"ending value should be price of last trade times number of \
shares in position")
self.assertEqual(
pp2.pnl,
200,
"drop of 2 on -100 shares should be 200"
)
#now run a performance period encompassing the entire trade sample.
ppTotal = perf.PerformancePeriod({}, 0.0, 1000.0)
for trade in trades_1:
ppTotal.update_last_sale(trade)
ppTotal.execute_transaction(txn)
for trade in trades_2:
ppTotal.update_last_sale(trade)
ppTotal.calculate_performance()
self.assertEqual(
ppTotal.period_capital_used,
-1 * txn.price * txn.amount,
"capital used should be equal to the opposite of the transaction \
cost of sole txn in test"
)
self.assertEqual(
len(ppTotal.positions),
1,
@@ -279,44 +275,44 @@ cost of sole txn in test"
)
self.assertEqual(
ppTotal.positions[1].sid,
txn.sid,
txn.sid,
"position should be in security from the transaction"
)
self.assertEqual(
ppTotal.positions[1].amount,
-100,
"should have a position of -100 shares"
)
self.assertEqual(
ppTotal.positions[1].cost_basis,
txn.price,
"should have a cost basis of 10"
)
self.assertEqual(
ppTotal.positions[1].last_sale_price,
trades_2[-1].price,
"last sale should be price of last trade"
)
self.assertEqual(
ppTotal.ending_value,
-900,
"ending value should be price of last trade times number of \
shares in position")
self.assertEqual(
ppTotal.pnl,
100,
"drop of 1 on -100 shares should be 100"
)
def test_covering_short(self):
"""verify performance where short is bought and covered, and shares \
trade after cover"""
trades = factory.create_trade_history(
1,
[10,10,10,11,9,8,7,8,9,10],
@@ -324,104 +320,104 @@ trade after cover"""
self.onesec,
self.trading_environment
)
short_txn = factory.create_txn(
1,
10.0,
-100,
self.dt + self.onesec
)
cover_txn = factory.create_txn(1,7.0,100,self.dt + self.onesec * 6)
pp = perf.PerformancePeriod({}, 0.0, 1000.0)
pp.execute_transaction(short_txn)
pp.execute_transaction(cover_txn)
for trade in trades:
pp.update_last_sale(trade)
pp.calculate_performance()
short_txn_cost = short_txn.price * short_txn.amount
cover_txn_cost = cover_txn.price * cover_txn.amount
self.assertEqual(
pp.period_capital_used,
-1 * short_txn_cost - cover_txn_cost,
"capital used should be equal to the net transaction costs"
)
self.assertEqual(
len(pp.positions),
1,
"should be just one position"
)
self.assertEqual(
pp.positions[1].sid,
short_txn.sid,
short_txn.sid,
"position should be in security from the transaction"
)
self.assertEqual(
pp.positions[1].amount,
0,
"should have a position of -100 shares"
)
self.assertEqual(
pp.positions[1].cost_basis,
0,
"a covered position should have a cost basis of 0"
)
self.assertEqual(
pp.positions[1].last_sale_price,
trades[-1].price,
"last sale should be price of last trade"
)
self.assertEqual(
pp.ending_value,
0,
"ending value should be price of last trade times number of \
shares in position"
)
self.assertEqual(
pp.pnl,
pp.pnl,
300,
"gain of 1 on 100 shares should be 300"
)
def test_cost_basis_calc(self):
trades = factory.create_trade_history(
1,
[10,11,11,12],
[100,100,100,100],
1,
[10,11,11,12],
[100,100,100,100],
self.onesec,
self.trading_environment
)
transactions = factory.create_txn_history(
1,
[10,11,11,12],
[100,100,100,100],
1,
[10,11,11,12],
[100,100,100,100],
self.onesec,
self.trading_environment
)
pp = perf.PerformancePeriod({}, 0.0, 1000.0)
for txn in transactions:
pp.execute_transaction(txn)
for trade in trades:
pp.update_last_sale(trade)
pp.update_last_sale(trade)
pp.calculate_performance()
self.assertEqual(
pp.positions[1].last_sale_price,
trades[-1].price,
@@ -429,72 +425,72 @@ shares in position"
val=pp.positions[1].last_sale_price
)
)
self.assertEqual(
pp.positions[1].cost_basis,
11,
"should have a cost basis of 11"
)
self.assertEqual(
pp.pnl,
pp.pnl,
400
)
saleTxn = factory.create_txn(
1,
10.0,
-100,
self.dt + self.onesec * 4)
down_tick = factory.create_trade(
1,
10.0,
100,
trades[-1].dt + self.onesec)
pp2 = perf.PerformancePeriod(
copy.deepcopy(pp.positions),
pp.ending_value,
pp2 = perf.PerformancePeriod(
copy.deepcopy(pp.positions),
pp.ending_value,
pp.ending_cash
)
pp2.execute_transaction(saleTxn)
pp2.update_last_sale(down_tick)
pp2.calculate_performance()
pp2.calculate_performance()
self.assertEqual(
pp2.positions[1].last_sale_price,
10,
"should have a last sale of 10, was {val}".format(val=pp2.positions[1].last_sale_price)
)
self.assertEqual(
round(pp2.positions[1].cost_basis,2),
11.33,
"should have a cost basis of 11.33"
)
#print "second period pnl is {pnl}".format(pnl=pp2.pnl)
self.assertEqual(pp2.pnl, -800, "this period goes from +400 to -400")
pp3 = perf.PerformancePeriod({}, 0.0, 1000.0)
transactions.append(saleTxn)
for txn in transactions:
pp3.execute_transaction(txn)
trades.append(down_tick)
for trade in trades:
pp3.update_last_sale(trade)
pp3.calculate_performance()
self.assertEqual(
pp3.positions[1].last_sale_price,
10,
"should have a last sale of 10"
)
self.assertEqual(
round(pp3.positions[1].cost_basis,2),
11.33,
@@ -502,47 +498,47 @@ shares in position"
)
self.assertEqual(
pp3.pnl,
-400,
pp3.pnl,
-400,
"should be -400 for all trades and transactions in period"
)
def test_tracker(self):
trade_count = 100
sid = 133
price = 10.1
price = 10.1
price_list = [price] * trade_count
volume = [100] * trade_count
trade_time_increment = datetime.timedelta(days=1)
trade_history = factory.create_trade_history(
sid,
price_list,
volume,
trade_time_increment,
self.trading_environment
trade_history = factory.create_trade_history(
sid,
price_list,
volume,
trade_time_increment,
self.trading_environment
)
sid2 = 134
price2 = 12.12
price2_list = [price2] * trade_count
trade_history2 = factory.create_trade_history(
sid2,
price2_list,
volume,
trade_time_increment,
self.trading_environment
price2_list = [price2] * trade_count
trade_history2 = factory.create_trade_history(
sid2,
price2_list,
volume,
trade_time_increment,
self.trading_environment
)
trade_history.extend(trade_history2)
self.trading_environment.period_start = trade_history[0].dt
self.trading_environment.period_end = trade_history[-1].dt
self.trading_environment.capital_base = 1000.0
self.trading_environment.frame_index = ['sid', 'volume', 'dt', \
'price', 'changed']
perf_tracker = perf.PerformanceTracker(self.trading_environment)
for event in trade_history:
#create a transaction for all but
#first trade in each sid, to simulate None transaction
@@ -556,14 +552,13 @@ shares in position"
})
else:
txn = None
event[zp.TRANSFORM_TYPE.TRANSACTION] = txn
event[zp.TRANSFORM_TYPE.TRANSACTION] = txn
perf_tracker.process_event(event)
#we skip two trades, to test case of None transaction
txn_count = len(trade_history) - 2
self.assertEqual(perf_tracker.txn_count, txn_count)
cumulative_pos = perf_tracker.cumulative_performance.positions[sid]
expected_size = txn_count / 2 * -25
self.assertEqual(cumulative_pos.amount, expected_size)
+2
View File
@@ -2,10 +2,12 @@ from feed import Feed
from merge import Merge
from passthrough import PassthroughTransform
from datasource import DataSource
from tradesimulation import TradeSimulationClient
__all__ = [
Feed,
Merge,
PassthroughTransform,
DataSource,
TradeSimulationClient,
]
+1 -6
View File
@@ -17,9 +17,7 @@ class Feed(Component):
context (thread, process, etc) and run in another.
"""
def __init__(self):
Component.__init__(self)
def init(self):
self.sent_count = 0
self.received_count = 0
self.draining = False
@@ -33,9 +31,6 @@ class Feed(Component):
self.sent_counters = Counter()
self.recv_counters = Counter()
def init(self):
pass
@property
def get_id(self):
return "FEED"
+13 -9
View File
@@ -3,21 +3,25 @@ from feed import Feed
import zipline.protocol as zp
from zipline.protocol import COMPONENT_TYPE
# TODO: By Liskov merge must *be* a feed, don't believe this is
# the case.
from collections import Counter
class Merge(Feed):
"""
Merges multiple streams of events into single messages.
"""
def __init__(self):
Feed.__init__(self)
self.init()
def init(self):
pass
self.sent_count = 0
self.received_count = 0
self.draining = False
self.ds_finished_counter = 0
# Depending on the size of this, might want to use a data
# structure with better asymptotics.
self.data_buffer = {}
# source_id -> integer count
self.sent_counters = Counter()
self.recv_counters = Counter()
@property
def get_id(self):
+165
View File
@@ -0,0 +1,165 @@
import logging
import datetime
import zipline.protocol as zp
import zipline.finance.performance as perf
from zipline.core.component import Component
from zipline.finance.trading import TransactionSimulator
from zipline.utils.protocol_utils import ndict
LOGGER = logging.getLogger('ZiplineLogger')
class TradeSimulationClient(Component):
def init(self, trading_environment, sim_style):
self.received_count = 0
self.prev_dt = None
self.event_queue = None
self.txn_count = 0
self.order_count = 0
self.trading_environment = trading_environment
self.current_dt = trading_environment.period_start
self.last_iteration_dur = datetime.timedelta(seconds=0)
self.algorithm = None
self.max_wait = datetime.timedelta(seconds=60)
self.last_msg_dt = datetime.datetime.utcnow()
self.txn_sim = TransactionSimulator(sim_style)
self.event_data = ndict()
self.perf = perf.PerformanceTracker(self.trading_environment)
@property
def get_id(self):
return str(zp.FINANCE_COMPONENT.TRADING_CLIENT)
def set_algorithm(self, algorithm):
"""
:param algorithm: must implement the algorithm protocol. See
:py:mod:`zipline.test.algorithm`
"""
self.algorithm = algorithm
# register the trading_client's order method with the algorithm
self.algorithm.set_order(self.order)
# ask the algorithm to initialize
self.algorithm.initialize()
def open(self):
self.result_feed = self.connect_result()
def do_work(self):
# poll all the sockets
socks = dict(self.poll.poll(self.heartbeat_timeout))
# see if the poller has results for the result_feed
if socks.get(self.result_feed) == self.zmq.POLLIN:
self.last_msg_dt = datetime.datetime.utcnow()
# get the next message from the result feed
msg = self.result_feed.recv()
# if the feed is done, shut 'er down
if msg == str(zp.CONTROL_PROTOCOL.DONE):
self.finish_simulation()
return
# result_feed is a merge component, so unframe accordingly
event = zp.MERGE_UNFRAME(msg)
self.received_count += 1
# update performance and relay the event to the algorithm
self.process_event(event)
if self.perf.exceeded_max_loss:
self.finish_simulation()
def finish_simulation(self):
LOGGER.info("Client is DONE!")
# signal the performance tracker that the simulation has
# ended. Perf will internally calculate the full risk report.
self.perf.handle_simulation_end()
# signal Simulator, our ComponentHost, that this component is
# done and Simulator needn't block exit on this component.
self.signal_done()
def process_event(self, event):
# generate transactions, if applicable
txn = self.txn_sim.apply_trade_to_open_orders(event)
if txn:
event.TRANSACTION = txn
# track the number of transactions, for testing purposes.
self.txn_count += 1
else:
event.TRANSACTION = None
# the performance class needs to process each event, without
# skipping. Algorithm should wait until the performance has been
# updated, so that down stream components can safely assume that
# performance is up to date. Note that this is done before we
# mark the time for the algorithm's processing, thereby not
# running the algo's clock for performance book keeping.
self.perf.process_event(event)
# mark the start time for client's processing of this event.
event_start = datetime.datetime.utcnow()
# queue the event.
self.queue_event(event)
# if the event is later than our current time, run the algo
# otherwise, the algorithm has fallen behind the feed
# and processing per event is longer than time between events.
if event.dt >= self.current_dt:
# compress time by moving the current_time up to the event
# time.
self.current_dt = event.dt
self.run_algorithm()
# tally the time spent on this iteration
self.last_iteration_dur = datetime.datetime.utcnow() - event_start
# move the algorithm's clock forward to include iteration time
self.current_dt = self.current_dt + self.last_iteration_dur
def run_algorithm(self):
"""
As per the algorithm protocol:
- Set the current portfolio for the algorithm as per protocol.
- Construct data based on backlog of events, send to algorithm.
"""
current_portfolio = self.perf.get_portfolio()
self.algorithm.set_portfolio(current_portfolio)
data = self.get_data()
if len(data) > 0:
self.algorithm.handle_data(data)
def connect_order(self):
return self.connect_push_socket(self.addresses['order_address'])
def order(self, sid, amount):
order = zp.ndict({
'dt':self.current_dt,
'sid':sid,
'amount':amount
})
self.order_count += 1
self.perf.log_order(order)
self.txn_sim.add_open_order(order)
def signal_order_done(self):
self.order_socket.send(str(zp.ORDER_PROTOCOL.DONE))
def queue_event(self, event):
if self.event_queue == None:
self.event_queue = []
self.event_queue.append(event)
def get_data(self):
for event in self.event_queue:
self.event_data[event['sid']] = event
self.event_queue = []
return self.event_data
+12 -12
View File
@@ -24,6 +24,8 @@ from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE, \
LOGGER = logging.getLogger('ZiplineLogger')
from zipline.exceptions import ComponentNoInit
class Component(object):
"""
Base class for components. Defines the the base messaging
@@ -64,7 +66,11 @@ class Component(object):
"""
def __init__(self):
# ------------
# Construction
# ------------
def __init__(self, *args, **kwargs):
self.zmq = None
self.context = None
self.addresses = None
@@ -90,14 +96,16 @@ class Component(object):
self.guid = uuid.uuid4()
self.huid = humanhash.humanize(self.guid.hex)
self.init()
# This is where component specific constructors should be
# defined. Arguments passed to init are threaded through.
self.init(*args, **kwargs)
def init(self):
"""
Subclasses should override this to extend the setup for
the class. Shouldn't have side effects.
"""
pass
raise ComponentNoInit(self.__class__)
# ------------
@@ -515,14 +523,6 @@ class Component(object):
"""
return False
def note(self):
"""
Information about the component. Mostly used for testing.
"""
def get_note(self):
return self.note or ''
def debug(self):
"""
Debug information about the component.
@@ -552,7 +552,7 @@ class Component(object):
return "<{name} {uuid} at {host} {pid} {pointer}>".format(
name = self.get_id ,
uuid = self.huid ,
uuid = self.guid ,
host = socket.gethostname() ,
pid = os.getpid() ,
pointer = hex(id(self)) ,
+5
View File
@@ -0,0 +1,5 @@
from utils.exception_utils import CustomException
class ComponentNoInit(CustomException):
argmap = ('classname',)
message = """Class {classname} does not define an init method."""
+12 -14
View File
@@ -4,35 +4,35 @@ from collections import defaultdict
from zipline.transforms.base import BaseTransform
class MovingAverageTransform(BaseTransform):
def init(self, daycount=3):
self.daycount = daycount
self.by_sid = defaultdict(self._create)
def transform(self, event):
cur = self.by_sid[event.sid]
cur.update(event)
self.state['value'] = cur.average
return self.state
def _create(self):
return MovingAverage(self.daycount)
class MovingAverage(object):
def __init__(self, daycount):
self.window = EventWindow(daycount)
self.total = 0.0
self.average = 0.0
def update(self, event):
self.window.update(event)
self.total += event.price
for dropped in self.window.dropped_ticks:
self.total -= dropped.price
if len(self.window.ticks) > 0:
self.average = self.total / len(self.window.ticks)
else:
@@ -47,21 +47,19 @@ class EventWindow(object):
self.ticks = []
self.dropped_ticks = []
self.delta = timedelta(days=daycount)
def update(self, event):
# add new event
self.ticks.append(event)
self.ticks.append(event)
# determine which events are expired
last_date = event['dt']
first_date = last_date - self.delta
self.dropped_ticks = []
for tick in self.ticks:
if tick['dt'] <= first_date:
self.dropped_ticks.append(tick)
# remove the expired events
slice_index = len(self.dropped_ticks)
slice_index = len(self.dropped_ticks)
self.ticks = self.ticks[slice_index:]
+52 -56
View File
@@ -130,7 +130,7 @@ class PerformanceTracker():
Tracks the performance of the zipline as it is running in
the simulator, relays this out to the Deluge broker and then
to the client. Visually:
+--------------------+ Result Stream +--------+
| PerformanceTracker | ----------------> | Deluge |
+--------------------+ +--------+
@@ -138,8 +138,8 @@ class PerformanceTracker():
"""
def __init__(self, trading_environment):
self.trading_environment = trading_environment
self.trading_day = datetime.timedelta(hours = 6, minutes = 30)
self.calendar_day = datetime.timedelta(hours = 24)
@@ -152,7 +152,7 @@ class PerformanceTracker():
self.progress = 0.0
self.total_days = self.trading_environment.days_in_period
# one indexed so that we reach 100%
self.day_count = 0.0
self.day_count = 0.0
self.capital_base = self.trading_environment.capital_base
self.returns = []
self.txn_count = 0
@@ -174,7 +174,7 @@ class PerformanceTracker():
self.period_start,
self.period_end
)
# this performance period will span just the current market day
self.todays_performance = PerformancePeriod(
# initial positions are empty
@@ -220,17 +220,17 @@ class PerformanceTracker():
'capital_base' : self.capital_base,
'cumulative_perf' : self.cumulative_performance.to_dict(),
'daily_perf' : self.todays_performance.to_dict(),
'cumulative_risk_metrics' : self.cumulative_risk_metrics.to_dict()
'cumulative_risk_metrics' : self.cumulative_risk_metrics.to_dict()
}
def log_order(self, order):
self.order_log.append(order)
def process_event(self, event):
if self.exceeded_max_loss:
return
assert isinstance(event, zp.ndict)
self.event_count += 1
@@ -241,7 +241,7 @@ class PerformanceTracker():
self.txn_count += 1
self.cumulative_performance.execute_transaction(event.TRANSACTION)
self.todays_performance.execute_transaction(event.TRANSACTION)
#update last sale
self.cumulative_performance.update_last_sale(event)
self.todays_performance.update_last_sale(event)
@@ -251,7 +251,7 @@ class PerformanceTracker():
#calculate performance as of last trade
self.cumulative_performance.calculate_performance()
self.todays_performance.calculate_performance()
# add the return results from today to the list of DailyReturn objects.
todays_date = self.market_close.replace(hour=0, minute=0, second=0)
todays_return_obj = risk.DailyReturn(
@@ -267,17 +267,17 @@ class PerformanceTracker():
returns=self.returns,
trading_environment=self.trading_environment
)
# increment the day counter before we move markers forward.
self.day_count += 1.0
# calculate progress of test
self.progress = self.day_count / self.total_days
# Output results
if self.result_stream:
msg = zp.PERF_FRAME(self.to_dict())
self.result_stream.send(msg)
#
if self.trading_environment.max_drawdown:
returns = self.todays_performance.returns
@@ -285,13 +285,13 @@ class PerformanceTracker():
if returns < max_dd:
LOGGER.info(str(returns) + " broke through " + str(max_dd))
LOGGER.info("Exceeded max drawdown.")
# mark the perf period with max loss flag,
# mark the perf period with max loss flag,
# so it shows up in the update, but don't end the test
# here. Let the update go out before stopping
self.exceeded_max_loss = True
return
#move the market day markers forward
self.market_open = self.market_open + self.calendar_day
@@ -301,7 +301,7 @@ class PerformanceTracker():
self.market_open = self.market_open + self.calendar_day
self.market_close = self.market_open + self.trading_day
# Roll over positions to current day.
self.todays_performance = PerformancePeriod(
self.todays_performance.positions,
@@ -317,27 +317,27 @@ class PerformanceTracker():
When the simulation is complete, run the full period risk report
and send it out on the result_stream.
"""
log_msg = "Simulated {n} trading days out of {m}."
LOGGER.info(log_msg.format(n=self.day_count, m=self.total_days))
LOGGER.info("first open: {d}".format(d=self.trading_environment.first_open))
# the stream will end on the last trading day, but will not trigger
# an end of day, so we trigger the final market close here.
# In the case of max drawdown, we needn't close again.
if not self.exceeded_max_loss:
self.handle_market_close()
self.risk_report = risk.RiskReport(
self.returns,
self.trading_environment,
exceeded_max_loss = self.exceeded_max_loss
)
if self.result_stream:
LOGGER.info("about to stream the risk report...")
risk_dict = self.risk_report.to_dict()
msg = zp.RISK_FRAME(risk_dict)
self.result_stream.send(msg)
# this signals that the simulation is complete.
@@ -399,17 +399,17 @@ class Position():
class PerformancePeriod():
def __init__(
self,
initial_positions,
starting_value,
self,
initial_positions,
starting_value,
starting_cash,
period_open=None,
period_close=None,
period_close=None,
keep_transactions=False):
self.period_open = period_open
self.period_close = period_close
self.ending_value = 0.0
self.period_capital_used = 0.0
self.pnl = 0.0
@@ -424,7 +424,7 @@ class PerformancePeriod():
self.cumulative_capital_used = 0.0
self.max_capital_used = 0.0
self.max_leverage = 0.0
self.calculate_performance()
def calculate_performance(self):
@@ -441,39 +441,39 @@ class PerformancePeriod():
self.returns = 0.0
def execute_transaction(self, txn):
# Update Position
# ----------------
if(not self.positions.has_key(txn.sid)):
self.positions[txn.sid] = Position(txn.sid)
self.positions[txn.sid].update(txn)
self.period_capital_used += -1 * txn.price * txn.amount
# Max Leverage
# ---------------
# Calculate the maximum capital used and maximum leverage
transaction_cost = txn.price * txn.amount
self.cumulative_capital_used += transaction_cost
if math.fabs(self.cumulative_capital_used) > self.max_capital_used:
self.max_capital_used = math.fabs(self.cumulative_capital_used)
# We want to conveye a level, rather than a precise figure.
# round to the nearest 5,000 to keep the number easy on the eyes
self.max_capital_used = self.round_to_nearest(
self.max_capital_used,
base=5000
)
# we're adding a 10% cushion to the capital used.
self.max_leverage = 1.1 * self.max_capital_used / self.starting_cash
# add transaction to the list of processed transactions
# add transaction to the list of processed transactions
if self.keep_transactions:
self.processed_transactions.append(txn)
def round_to_nearest(self, x, base=5):
return int(base * round(float(x)/base))
@@ -491,7 +491,7 @@ class PerformancePeriod():
def to_dict(self):
"""
Creates a dictionary representing the state of this performance
Creates a dictionary representing the state of this performance
period. See header comments for a detailed description.
"""
positions = self.get_positions_list()
@@ -514,24 +514,24 @@ class PerformancePeriod():
'period_open' : self.period_open,
'period_close' : self.period_close
}
# we want the key to be absent, not just empty
if not self.keep_transactions:
del(rval['transactions'])
del rval['transactions']
return rval
def to_ndict(self):
"""
Creates a ndict representing the state of this perfomance period.
Properties are the same as the results of to_dict. See header comments
for a detailed description.
for a detailed description.
"""
positions = self.get_positions(ndicted=True)
positions = zp.ndict(positions)
return zp.ndict({
'ending_value' : self.ending_value,
'capital_used' : self.period_capital_used,
@@ -540,11 +540,11 @@ class PerformancePeriod():
'ending_cash' : self.ending_cash,
'cumulative_capital_used' : self.cumulative_capital_used,
'max_capital_used' : self.max_capital_used,
'max_leverage' : self.max_leverage,
'max_leverage' : self.max_leverage,
'positions' : positions,
'transactions' : self.processed_transactions
})
def get_positions(self, ndicted=False):
positions = {}
for sid, pos in self.positions.iteritems():
@@ -553,9 +553,9 @@ class PerformancePeriod():
positions[sid] = zp.ndict(cur)
else:
positions[sid] = cur
return positions
#
def get_positions_list(self):
positions = []
@@ -563,7 +563,3 @@ class PerformancePeriod():
cur = pos.to_dict()
positions.append(cur)
return positions
+8 -8
View File
@@ -2,16 +2,16 @@ from collections import defaultdict
from zipline.transforms.base import BaseTransform
class ReturnsTransform(BaseTransform):
def init(self):
self.by_sid = defaultdict(self._create)
def transform(self, event):
cur = self.by_sid[event.sid]
cur.update(event)
self.state['value'] = cur.returns
return self.state
def _create(self):
return ReturnsFromPriorClose()
@@ -20,24 +20,24 @@ class ReturnsFromPriorClose(object):
Calculates a security's returns since the previous close, using the
current price.
"""
def __init__(self):
self.last_close = None
self.last_event = None
self.returns = 0.0
def update(self, event):
next_close = None
if self.last_close:
change = event.price - self.last_close.price
self.returns = change / self.last_close.price
if self.last_event:
if self.last_event.dt.day != event.dt.day:
# the current event is from the day after
# the last event. Therefore the last event was
# the last close
self.last_close = self.last_event
# the current event is now the last_event
self.last_event = event
self.last_event = event
+11 -12
View File
@@ -39,7 +39,6 @@ Risk Report
import logging
import datetime
import math
import pytz
import numpy as np
import numpy.linalg as la
import zipline.protocol as zp
@@ -65,7 +64,7 @@ def advance_by_months(dt, jump_in_months):
class DailyReturn():
def __init__(self, date, returns):
assert isinstance(date, datetime.datetime)
self.date = date.replace(hour=0, minute=0, second=0)
self.returns = returns
@@ -83,18 +82,18 @@ class DailyReturn():
class RiskMetrics():
def __init__(self, start_date, end_date, returns, trading_environment):
self.treasury_curves = trading_environment.treasury_curves
self.treasury_curves = trading_environment.treasury_curves
self.start_date = start_date
self.end_date = end_date
self.trading_environment = trading_environment
self.algorithm_period_returns, self.algorithm_returns = \
self.calculate_period_returns(returns)
benchmark_returns = [
x for x in self.trading_environment.benchmark_returns
if x.date >= returns[0].date and x.date <= returns[-1].date
]
self.benchmark_period_returns, self.benchmark_returns = \
self.calculate_period_returns(benchmark_returns)
@@ -196,7 +195,7 @@ class RiskMetrics():
"""
if self.algorithm_volatility == 0:
return 0.0
return ( (self.algorithm_period_returns - self.treasury_period_return) /
self.algorithm_volatility )
@@ -311,15 +310,15 @@ class RiskMetrics():
that date doesn't exceed treasury history range."
message = message.format(dt=self.end_date,term=self.treasury_duration)
raise Exception(message)
class RiskReport():
def __init__(
self,
algorithm_returns,
trading_environment,
self,
algorithm_returns,
trading_environment,
exceeded_max_loss=False):
"""
algorithm_returns needs to be a list of daily_return objects
@@ -336,7 +335,7 @@ class RiskReport():
else:
start_date = self.algorithm_returns[0].date
end_date = self.algorithm_returns[-1].date
self.month_periods = self.periodsInRange(1, start_date, end_date)
self.three_month_periods = self.periodsInRange(3, start_date, end_date)
self.six_month_periods = self.periodsInRange(6, start_date, end_date)
@@ -370,7 +369,7 @@ class RiskReport():
ends = []
cur_start = start.replace(day=1)
# in edge cases (all sids filtered out, start/end are adjacent)
# in edge cases (all sids filtered out, start/end are adjacent)
# a test will not generate any returns data
if len(self.algorithm_returns) == 0:
return ends
+93 -272
View File
@@ -1,198 +1,23 @@
import logging
import datetime
import pytz
import math
import time
import logging
import datetime
from collections import Counter
# from gevent.select import select
from zipline.core import Component
import zipline.protocol as zp
import zipline.finance.performance as perf
from zipline.utils.protocol_utils import Enum, ndict
# the simulation style enumerates the available transaction simulation
# strategies.
SIMULATION_STYLE = Enum(
'PARTIAL_VOLUME',
'BUY_ALL',
'FIXED_SLIPPAGE',
'NOOP'
)
from zipline.protocol import SIMULATION_STYLE
LOGGER = logging.getLogger('ZiplineLogger')
class TradeSimulationClient(Component):
def __init__(self, trading_environment, sim_style):
Component.__init__(self)
self.received_count = 0
self.prev_dt = None
self.event_queue = None
self.txn_count = 0
self.order_count = 0
self.trading_environment = trading_environment
self.current_dt = trading_environment.period_start
self.last_iteration_dur = datetime.timedelta(seconds=0)
self.algorithm = None
self.max_wait = datetime.timedelta(seconds=60)
self.last_msg_dt = datetime.datetime.utcnow()
self.txn_sim = TransactionSimulator(sim_style)
self.event_data = ndict()
self.perf = perf.PerformanceTracker(self.trading_environment)
@property
def get_id(self):
return str(zp.FINANCE_COMPONENT.TRADING_CLIENT)
def set_algorithm(self, algorithm):
"""
:param algorithm: must implement the algorithm protocol. See
:py:mod:`zipline.test.algorithm`
"""
self.algorithm = algorithm
# register the trading_client's order method with the algorithm
self.algorithm.set_order(self.order)
# ask the algorithm to initialize
self.algorithm.initialize()
def open(self):
self.result_feed = self.connect_result()
def do_work(self):
# poll all the sockets
socks = dict(self.poll.poll(self.heartbeat_timeout))
# see if the poller has results for the result_feed
if self.result_feed in socks and \
socks[self.result_feed] == self.zmq.POLLIN:
self.last_msg_dt = datetime.datetime.utcnow()
# get the next message from the result feed
msg = self.result_feed.recv()
# if the feed is done, shut 'er down
if msg == str(zp.CONTROL_PROTOCOL.DONE):
self.finish_simulation()
return
# result_feed is a merge component, so unframe accordingly
event = zp.MERGE_UNFRAME(msg)
self.received_count += 1
# update performance and relay the event to the algorithm
self.process_event(event)
if self.perf.exceeded_max_loss:
self.finish_simulation()
def finish_simulation(self):
LOGGER.info("Client is DONE!")
# signal the performance tracker that the simulation has
# ended. Perf will internally calculate the full risk report.
self.perf.handle_simulation_end()
# signal Simulator, our ComponentHost, that this component is
# done and Simulator needn't block exit on this component.
self.signal_done()
def process_event(self, event):
# generate transactions, if applicable
txn = self.txn_sim.apply_trade_to_open_orders(event)
if txn:
event.TRANSACTION = txn
# track the number of transactions, for testing purposes.
self.txn_count += 1
else:
event.TRANSACTION = None
# the performance class needs to process each event, without
# skipping. Algorithm should wait until the performance has been
# updated, so that down stream components can safely assume that
# performance is up to date. Note that this is done before we
# mark the time for the algorithm's processing, thereby not
# running the algo's clock for performance book keeping.
self.perf.process_event(event)
# mark the start time for client's processing of this event.
event_start = datetime.datetime.utcnow()
# queue the event.
self.queue_event(event)
# if the event is later than our current time, run the algo
# otherwise, the algorithm has fallen behind the feed
# and processing per event is longer than time between events.
if event.dt >= self.current_dt:
# compress time by moving the current_time up to the event
# time.
self.current_dt = event.dt
self.run_algorithm()
# tally the time spent on this iteration
self.last_iteration_dur = datetime.datetime.utcnow() - event_start
# move the algorithm's clock forward to include iteration time
self.current_dt = self.current_dt + self.last_iteration_dur
def run_algorithm(self):
"""
As per the algorithm protocol:
- Set the current portfolio for the algorithm as per protocol.
- Construct data based on backlog of events, send to algorithm.
"""
current_portfolio = self.perf.get_portfolio()
self.algorithm.set_portfolio(current_portfolio)
data = self.get_data()
if len(data) > 0:
self.algorithm.handle_data(data)
def connect_order(self):
return self.connect_push_socket(self.addresses['order_address'])
def order(self, sid, amount):
order = zp.ndict({
'dt':self.current_dt,
'sid':sid,
'amount':amount
})
self.order_count += 1
self.perf.log_order(order)
self.txn_sim.add_open_order(order)
def signal_order_done(self):
self.order_socket.send(str(zp.ORDER_PROTOCOL.DONE))
def queue_event(self, event):
if self.event_queue == None:
self.event_queue = []
self.event_queue.append(event)
def get_data(self):
for event in self.event_queue:
self.event_data[event['sid']] = event
self.event_queue = []
return self.event_data
class TransactionSimulator(object):
def __init__(self, style=SIMULATION_STYLE.PARTIAL_VOLUME):
def __init__(self, style=SIMULATION_STYLE.PARTIAL_VOLUME):
self.open_orders = {}
self.order_count = 0
self.txn_count = 0
self.trade_window = datetime.timedelta(seconds=30)
self.orderTTL = datetime.timedelta(days=1)
self.commission = 0.03
if not style or style == SIMULATION_STYLE.PARTIAL_VOLUME:
self.apply_trade_to_open_orders = self.simulate_with_partial_volume
elif style == SIMULATION_STYLE.BUY_ALL:
@@ -201,83 +26,82 @@ class TransactionSimulator(object):
self.apply_trade_to_open_orders = self.simulate_with_fixed_cost
elif style == SIMULATION_STYLE.NOOP:
self.apply_trade_to_open_orders = self.simulate_noop
def add_open_order(self, event):
"""Orders are captured in a buffer by sid. No calculations are done here.
Amount is explicitly converted to an int.
Orders of amount zero are ignored.
"""
# Orders are captured in a buffer by sid. No calculations are done here.
# Amount is explicitly converted to an int.
# Orders of amount zero are ignored.
self.order_count += 1
event.amount = int(event.amount)
if event.amount == 0:
log = "requested to trade zero shares of {sid}".format(
sid=event.sid
)
LOGGER.debug(log)
return
if(not self.open_orders.has_key(event.sid)):
if not self.open_orders.has_key(event.sid):
self.open_orders[event.sid] = []
# set the filled property to zero
event.filled = 0
self.open_orders[event.sid].append(event)
def simulate_buy_all(self, event):
txn = self.create_transaction(
event.sid,
event.volume,
event.price,
event.dt,
1
)
event.sid,
event.volume,
event.price,
event.dt,
1
)
return txn
def simulate_noop(self, event):
return None
return None
def simulate_with_fixed_cost(self, event):
if self.open_orders.has_key(event.sid):
orders = self.open_orders[event.sid]
orders = self.open_orders[event.sid]
orders = sorted(orders, key=lambda o: o.dt)
else:
return None
amount = 0
for order in orders:
amount += order.amount
if(amount == 0):
return
direction = amount / math.fabs(amount)
txn = self.create_transaction(
event.sid,
amount,
event.price + 0.10,
event.dt,
direction
)
event.sid,
amount,
event.price + 0.10,
event.dt,
direction
)
self.open_orders[event.sid] = []
return txn
def simulate_with_partial_volume(self, event):
if(event.volume == 0):
#there are zero volume events bc some stocks trade
#there are zero volume events bc some stocks trade
#less frequently than once per minute.
return None
if self.open_orders.has_key(event.sid):
orders = self.open_orders[event.sid]
orders = self.open_orders[event.sid]
orders = sorted(orders, key=lambda o: o.dt)
else:
return None
dt = event.dt
expired = []
total_order = 0
@@ -285,87 +109,87 @@ class TransactionSimulator(object):
simulated_impact = 0.0
direction = 1.0
for order in orders:
if(order.dt < event.dt):
# orders are only good on the day they are issued
if order.dt.day < event.dt.day:
continue
open_amount = order.amount - order.filled
if(open_amount != 0):
direction = open_amount / math.fabs(open_amount)
else:
direction = 1
desired_order = total_order + open_amount
volume_share = direction * (desired_order) / event.volume
if volume_share > .25:
volume_share = .25
simulated_amount = int(volume_share * event.volume * direction)
simulated_impact = (volume_share)**2 * .1 * direction * event.price
order.filled += (simulated_amount - total_order)
total_order = simulated_amount
# we cap the volume share at 25% of a trade
if volume_share == .25:
break
orders = [ x for x in orders if abs(x.amount - x.filled) > 0 and x.dt.day >= event.dt.day]
self.open_orders[event.sid] = orders
if simulated_amount != 0:
return self.create_transaction(
event.sid,
simulated_amount,
event.price + simulated_impact,
dt.replace(tzinfo = pytz.utc),
event.sid,
simulated_amount,
event.price + simulated_impact,
dt.replace(tzinfo = pytz.utc),
direction
)
elif len(orders) > 0:
warning = """
Calculated a zero volume transaction on trade:
{event}
for orders:
Calculated a zero volume transaction on trade:
{event}
for orders:
{orders}
"""
warning = warning.format(
event=str(event),
event=str(event),
orders=str(orders)
)
LOGGER.warn(warning)
return None
def create_transaction(self, sid, amount, price, dt, direction):
self.txn_count += 1
txn = {'sid' : sid,
'amount' : int(amount),
'dt' : dt,
'price' : price,
def create_transaction(self, sid, amount, price, dt, direction):
self.txn_count += 1
txn = {'sid' : sid,
'amount' : int(amount),
'dt' : dt,
'price' : price,
'commission' : self.commission * amount * direction,
'source_id' : zp.FINANCE_COMPONENT.TRANSACTION_SIM
}
return zp.ndict(txn)
return zp.ndict(txn)
class TradingEnvironment(object):
def __init__(
self,
benchmark_returns,
treasury_curves,
period_start = None,
period_end = None,
self,
benchmark_returns,
treasury_curves,
period_start = None,
period_end = None,
capital_base = None,
max_drawdown = None
):
self.trading_days = []
self.trading_day_map = {}
self.treasury_curves = treasury_curves
@@ -375,11 +199,11 @@ class TradingEnvironment(object):
self.capital_base = capital_base
self.period_trading_days = None
self.max_drawdown = max_drawdown
for bm in benchmark_returns:
self.trading_days.append(bm.date)
self.trading_day_map[bm.date] = bm
self.first_open = self.calculate_first_open()
self.last_close = self.calculate_last_close()
@@ -389,25 +213,25 @@ class TradingEnvironment(object):
"""
first_open = self.period_start
one_day = datetime.timedelta(days=1)
while not self.is_trading_day(first_open):
first_open = first_open + one_day
first_open = self.set_NYSE_time(first_open, 9, 30)
return first_open
def calculate_last_close(self):
"""
Finds the last trading day on or before self.period_end
"""
last_close = self.period_end
one_day = datetime.timedelta(days=1)
while not self.is_trading_day(last_close):
last_close = last_close - one_day
last_close = self.set_NYSE_time(last_close, 16, 00)
return last_close
#TODO: add other exchanges and timezones...
@@ -432,13 +256,13 @@ class TradingEnvironment(object):
day=test_date.day,
tzinfo=pytz.utc
)
@property
def days_in_period(self):
"""return the number of trading days within the period [start, end)"""
assert(self.period_start != None)
assert(self.period_end != None)
if self.period_trading_days == None:
self.period_trading_days = []
for date in self.trading_days:
@@ -446,18 +270,18 @@ class TradingEnvironment(object):
break
if date >= self.period_start:
self.period_trading_days.append(date)
return len(self.period_trading_days)
def is_market_hours(self, test_date):
if not self.is_trading_day(test_date):
return False
mkt_open = self.set_NYSE_time(test_date, 9, 30)
#TODO: half days?
mkt_close = self.set_NYSE_time(test_date, 16, 00)
return test_date >= mkt_open and test_date <= mkt_close
def is_trading_day(self, test_date):
@@ -470,6 +294,3 @@ class TradingEnvironment(object):
return self.trading_day_map[date].returns
else:
return 0.0
+2 -2
View File
@@ -68,7 +68,7 @@ from zipline.components import DataSource
from zipline.transforms import BaseTransform
from zipline.test_algorithms import TestAlgorithm
from zipline.finance.trading import TradeSimulationClient
from zipline.components import TradeSimulationClient
from zipline.core.devsimulator import Simulator
from zipline.core.monitor import Controller
from zipline.finance.trading import SIMULATION_STYLE
@@ -335,7 +335,7 @@ class SimulatedTrading(object):
#self.allocator.reaquire(*self.leased_sockets)
#--------------------------------
# Component property accessors
# Component property accessors
#--------------------------------
def get_positions(self):
+59 -51
View File
@@ -120,7 +120,7 @@ import datetime
import pytz
from collections import namedtuple
from utils.protocol_utils import Enum, FrameExceptionFactory, ndict
from utils.protocol_utils import Enum, FrameExceptionFactory, ndict, namelookup
from utils.date_utils import EPOCH, UN_EPOCH
# -----------------------
@@ -217,39 +217,39 @@ def DATASOURCE_FRAME(event):
"""
Wraps any datasource payload with id and type, so that unpacking may choose
the write UNFRAME for the payload.
:param event: ndict with following properties
- *ds_id* an identifier that is unique to the datasource in the context of a component host (e.g. Simulator)
- *ds_type* a string denoting the datasource type. Must be on of:
- TRADE
- (others to follow soon)
- *payload* a msgpack string carrying the payload for the frame
"""
assert isinstance(event.source_id, basestring)
assert isinstance(event.type, int), 'Unexpected type %s' % (event.type)
#datasources will send sometimes send empty msgs to feel gaps
if len(event.keys()) == 2:
return msgpack.dumps(tuple([
event.type,
event.source_id,
event.type,
event.source_id,
DATASOURCE_TYPE.EMPTY
]))
if(event.type == DATASOURCE_TYPE.TRADE):
return msgpack.dumps(tuple([
event.type,
event.source_id,
event.type,
event.source_id,
TRADE_FRAME(event)
]))
elif(event.type == DATASOURCE_TYPE.ORDER):
return msgpack.dumps(tuple([
event.type,
event.source_id,
event.type,
event.source_id,
ORDER_SOURCE_FRAME(event)
]))
else:
@@ -376,7 +376,7 @@ INVALID_MERGE_FRAME = FrameExceptionFactory('MERGE')
def MERGE_FRAME(event):
"""
:param event: a nameddict with at least:
- source_id
- type
"""
@@ -416,7 +416,7 @@ INVALID_ORDER_FRAME = FrameExceptionFactory('ORDER')
INVALID_TRADE_FRAME = FrameExceptionFactory('TRADE')
# -----------------------
# Trades
# Trades
# -----------------------
#
# - Should only be called from inside DATASOURCE_ (UN)FRAME.
@@ -424,7 +424,7 @@ INVALID_TRADE_FRAME = FrameExceptionFactory('TRADE')
def TRADE_FRAME(event):
"""
:param event: should be a ndict with:
- ds_id -- the datasource id sending this trade out
- sid -- the security id
- price -- float of the price printed for the trade
@@ -469,7 +469,7 @@ def TRADE_UNFRAME(msg):
raise INVALID_TRADE_FRAME(msg)
# -----------------------
# Orders
# Orders
# -----------------------
# - from client to order source
@@ -478,7 +478,7 @@ def ORDER_FRAME(order):
assert isinstance(order.amount, int) #no partial shares...
PACK_DATE(order)
return msgpack.dumps(tuple([
order.sid,
order.sid,
order.amount,
order.dt
]))
@@ -503,9 +503,9 @@ def ORDER_UNFRAME(msg):
# -----------------------
# TRANSACTIONS
# TRANSACTIONS
# -----------------------
#
#
# - Should only be called from inside TRANSFORM_(UN)FRAME.
@@ -550,7 +550,7 @@ def TRANSACTION_UNFRAME(msg):
# -----------------------
# ORDERS
# ORDERS
# -----------------------
#
# - from order source to feed
@@ -592,7 +592,7 @@ def ORDER_SOURCE_UNFRAME(msg):
raise INVALID_ORDER_FRAME(msg)
except ValueError:
raise INVALID_ORDER_FRAME(msg)
# -----------------------
# Performance and Risk
# -----------------------
@@ -607,21 +607,21 @@ def PERF_FRAME(perf):
:param perf: the dictionary created by zipline.trade_client.perf
:rvalue: a msgpack string
"""
#TODO: add asserts...
assert isinstance(perf['started_at'], datetime.datetime)
assert isinstance(perf['period_start'], datetime.datetime)
assert isinstance(perf['period_end'], datetime.datetime)
assert isinstance(perf['daily_perf'], dict)
assert isinstance(perf['cumulative_perf'], dict)
tp = perf['daily_perf']
cp = perf['cumulative_perf']
assert isinstance(tp['transactions'], list)
# we never want to send transactions for the cumulative period.
# we never want to send transactions for the cumulative period.
# performance.py should never send them, but just to be safe:
assert not cp.has_key('transactions')
assert isinstance(tp['positions'], list)
@@ -630,7 +630,7 @@ def PERF_FRAME(perf):
assert isinstance(tp['period_open'], datetime.datetime)
assert isinstance(cp['period_close'], datetime.datetime)
assert isinstance(cp['period_open'], datetime.datetime)
perf['started_at'] = EPOCH(perf['started_at'])
perf['period_start'] = EPOCH(perf['period_start'])
perf['period_end'] = EPOCH(perf['period_end'])
@@ -638,11 +638,11 @@ def PERF_FRAME(perf):
tp['period_open'] = EPOCH(tp['period_open'])
cp['period_close'] = EPOCH(cp['period_close'])
cp['period_open'] = EPOCH(cp['period_open'])
tp['transactions'] = convert_transactions(tp['transactions'])
return BT_UPDATE_FRAME('PERF', perf)
def convert_transactions(transactions):
results = []
for txn in transactions:
@@ -651,18 +651,18 @@ def convert_transactions(transactions):
del(txn['source_id'])
results.append(txn)
return results
def RISK_FRAME(risk):
return BT_UPDATE_FRAME('RISK', risk)
def BT_UPDATE_FRAME(prefix, payload):
"""
Frames prepared by RISK_FRAME and PERF_FRAME methods are sent via the same
Frames prepared by RISK_FRAME and PERF_FRAME methods are sent via the same
socket. This method provides a prefix to allow for muxing the messages
onto a single socket.
"""
return msgpack.dumps(tuple([prefix, payload]))
def BT_UPDATE_UNFRAME(msg):
"""
Risk and Perf framing methods prefix the payload with
@@ -675,23 +675,23 @@ def BT_UPDATE_UNFRAME(msg):
# -----------------------
# Date Helpers
# -----------------------
def PACK_DATE(event):
"""
Packs the datetime property of event into msgpack'able longs.
This function should be called purely for its side effects.
This function should be called purely for its side effects.
The event's 'dt' property is replaced by a tuple of integers
- year, month, day, hour, minute, second, microsecond
PACK_DATE and UNPACK_DATE are inverse operations.
PACK_DATE and UNPACK_DATE are inverse operations.
:param event: event must a ndict with a property named 'dt' that is a datetime.
:rtype: None
"""
assert isinstance(event.dt, datetime.datetime)
# utc only please
assert event.dt.tzinfo == pytz.utc
assert event.dt.tzinfo == pytz.utc
event['dt'] = date_to_tuple(event['dt'])
def date_to_tuple(dt):
@@ -702,18 +702,18 @@ def date_to_tuple(dt):
def UNPACK_DATE(event):
"""
Unpacks the datetime property of event from msgpack'able longs.
This function should be called purely for its side effects.
The event's 'dt' property is converted to a datetime by reading and then
This function should be called purely for its side effects.
The event's 'dt' property is converted to a datetime by reading and then
combining a tuple of integers.
UNPACK_DATE and PACK_DATE are inverse operations.
UNPACK_DATE and PACK_DATE are inverse operations.
:param tuple event: event must a ndict with:
- a property named 'dt_tuple' that is a tuple of integers \
representing the date and time in UTC.
representing the date and time in UTC.
- dt_tuple must have year, month, day, hour, minute, second, and microsecond
:rtype: None
"""
assert isinstance(event.dt, tuple)
@@ -721,13 +721,13 @@ def UNPACK_DATE(event):
for item in event.dt:
assert isinstance(item, numbers.Integral)
event.dt = tuple_to_date(event.dt)
def tuple_to_date(date_tuple):
year, month, day, hour, minute, second, micros = date_tuple
dt = datetime.datetime(year, month, day, hour, minute, second)
dt = dt.replace(microsecond = micros, tzinfo = pytz.utc)
return dt
DATASOURCE_TYPE = Enum(
'ORDER',
'TRADE',
@@ -748,7 +748,7 @@ TRANSFORM_TYPE = ndict({
})
FINANCE_COMPONENT = ndict({
FINANCE_COMPONENT = namelookup({
'TRADING_CLIENT' : 'TRADING_CLIENT',
'PORTFOLIO_CLIENT' : 'PORTFOLIO_CLIENT',
'ORDER_SOURCE' : 'ORDER_SOURCE',
@@ -756,3 +756,11 @@ FINANCE_COMPONENT = ndict({
})
# the simulation style enumerates the available transaction simulation
# strategies.
SIMULATION_STYLE = Enum(
'PARTIAL_VOLUME',
'BUY_ALL',
'FIXED_SLIPPAGE',
'NOOP'
)
+13 -13
View File
@@ -24,38 +24,38 @@ def parse_iso8061(date_string):
UNIX_EPOCH = datetime(1970, 1, 1, 0, 0, tzinfo = pytz.utc)
def EPOCH(utc_datetime):
"""
The key is to ensure all the dates you are using are in the utc timezone
before you start converting. See http://pytz.sourceforge.net/ to learn how
to do that properly. By normalizing to utc, you eliminate the ambiguity of
daylight savings transitions. Then you can safely use timedelta to calculate
The key is to ensure all the dates you are using are in the utc timezone
before you start converting. See http://pytz.sourceforge.net/ to learn how
to do that properly. By normalizing to utc, you eliminate the ambiguity of
daylight savings transitions. Then you can safely use timedelta to calculate
distance from the unix epoch, and then convert to seconds or milliseconds.
Note that the resulting unix timestamp is itself in the UTC timezone. If you
wish to see the timestamp in a localized timezone, you will need to make
Note that the resulting unix timestamp is itself in the UTC timezone. If you
wish to see the timestamp in a localized timezone, you will need to make
another conversion.
Also note that this will only work for dates after 1970.
"""
assert isinstance(utc_datetime, datetime)
# utc only please
assert utc_datetime.tzinfo == pytz.utc
# how long since the epoch?
delta = utc_datetime - UNIX_EPOCH
seconds = delta.total_seconds()
ms = seconds * 1000
return ms
def UN_EPOCH(ms_since_epoch):
seconds_since_epoch = ms_since_epoch / 1000
delta = timedelta(seconds = seconds_since_epoch)
dt = UNIX_EPOCH + delta
return dt
def iso8061_to_epoch(datestring):
dt = parse_iso8061(datestring)
return EPOCH(dt)
def epoch_now():
dt = datetime.utcnow().replace(tzinfo=pytz.utc)
return EPOCH(dt)
@@ -72,7 +72,7 @@ class utcdatetime(datetime):
return dt
# Datetime Calculations
# ---------------------
+16
View File
@@ -0,0 +1,16 @@
from textwrap import dedent
class CustomException(Exception):
argmap = {0: 'classname'}
def __init__(self, *args):
self.args = args
def format(self):
assert len(self.args) == len(self.argmap), \
"""Wrong number of arguments passed to custom exception %s.""" \
% self.__class__
return self.message.format(**dict(zip(self.argmap, self.args)))
def __str__(self):
return dedent(self.format()).strip('\n')