fixed up tests to abandon thread simulator.

This commit is contained in:
fawce
2012-07-24 16:43:02 -04:00
parent d0e987a8e8
commit ad10f2aa85
3 changed files with 109 additions and 51 deletions
+48
View File
@@ -0,0 +1,48 @@
from unittest2 import TestCase
from collections import defaultdict
from logbook.compat import LoggingHandler
from zipline.test_algorithms import ExceptionAlgorithm
from zipline.finance.trading import SIMULATION_STYLE
from zipline.core.devsimulator import AddressAllocator
from zipline.lines import SimulatedTrading
DEFAULT_TIMEOUT = 15 # seconds
EXTENDED_TIMEOUT = 90
allocator = AddressAllocator(1000)
class FinanceTestCase(TestCase):
leased_sockets = defaultdict(list)
def setUp(self):
self.zipline_test_config = {
'allocator' : allocator,
'sid' : 133,
'devel' : True
}
self.log_handler = LoggingHandler()
self.log_handler.push_application()
def tearDown(self):
self.log_handler.pop_application()
def test_exception_in_init(self):
# Simulation
# ----------
self.zipline_test_config['simulation_style'] = \
SIMULATION_STYLE.FIXED_SLIPPAGE
self.zipline_test_config['algorithm'] = ExceptionAlgorithm('initialize')
self.zipline_test_config['devel'] = False
zipline = SimulatedTrading.create_test_zipline(
**self.zipline_test_config
)
zipline.simulate(blocking=True)
self.assertTrue(zipline.sim.ready())
self.assertTrue(zipline.sim.exception)
+4 -51
View File
@@ -21,13 +21,14 @@ from zipline.lines import SimulatedTrading
from zipline.finance.performance import PerformanceTracker
from zipline.utils.protocol_utils import ndict
from zipline.finance.trading import TransactionSimulator, SIMULATION_STYLE
from zipline.utils.test_utils import assert_single_position,\
drain_zipline
DEFAULT_TIMEOUT = 15 # seconds
EXTENDED_TIMEOUT = 90
allocator = AddressAllocator(1000)
class FinanceTestCase(TestCase):
leased_sockets = defaultdict(list)
@@ -113,28 +114,6 @@ class FinanceTestCase(TestCase):
self.assertTrue(env.last_close.month == 12)
self.assertTrue(env.last_close.day == 31)
def drain_zipline(self):
self.receiver = self.ctx.socket(zmq.PULL)
self.receiver.bind(self.zipline_test_config['results_socket'])
output = []
transaction_count = 0
while True:
msg = self.receiver.recv()
if msg == str(zp.CONTROL_PROTOCOL.DONE):
break
else:
update = zp.BT_UPDATE_UNFRAME(msg)
output.append(update)
if update['prefix'] == 'PERF':
transaction_count += \
len(update['payload']['daily_perf']['transactions'])
del self.receiver
return output, transaction_count
@timed(EXTENDED_TIMEOUT)
def test_full_zipline(self):
#provide enough trades to ensure all orders are filled.
@@ -143,33 +122,7 @@ class FinanceTestCase(TestCase):
zipline = SimulatedTrading.create_test_zipline(**self.zipline_test_config)
zipline.simulate(blocking=False)
output, transaction_count = self.drain_zipline()
self.assertTrue(zipline.sim.ready())
self.assertFalse(zipline.sim.exception)
self.assertEqual(
self.zipline_test_config['order_count'],
transaction_count
)
# the final message is the risk report, the second to
# last is the final day's results. Positions is a list of
# dicts.
closing_positions = output[-2]['payload']['daily_perf']['positions']
self.assertEqual(
len(closing_positions),
1,
"Portfolio should have one position."
)
sid = self.zipline_test_config['sid']
self.assertEqual(
closing_positions[0]['sid'],
sid,
"Portfolio should have one position in " + str(sid)
)
assert_single_position(self, zipline)
#@timed(DEFAULT_TIMEOUT)
def test_sid_filter(self):
@@ -194,7 +147,7 @@ class FinanceTestCase(TestCase):
zipline.simulate(blocking=False)
output, transaction_count = self.drain_zipline()
output, transaction_count = drain_zipline(self)
self.assertTrue(zipline.sim.ready())
self.assertFalse(zipline.sim.exception)
+57
View File
@@ -0,0 +1,57 @@
import zmq
import zipline.protocol as zp
def drain_zipline(test):
assert test.ctx, "method expects a valid zmq context"
assert test.zipline_test_config, "method expects a valid test config"
assert isinstance(test.zipline_test_config, dict)
assert test.zipline_test_config['results_socket'], \
"need to specify a socket address for logs/perf/risk"
test.receiver = test.ctx.socket(zmq.PULL)
test.receiver.bind(test.zipline_test_config['results_socket'])
output = []
transaction_count = 0
while True:
msg = test.receiver.recv()
if msg == str(zp.CONTROL_PROTOCOL.DONE):
break
else:
update = zp.BT_UPDATE_UNFRAME(msg)
output.append(update)
if update['prefix'] == 'PERF':
transaction_count += \
len(update['payload']['daily_perf']['transactions'])
del test.receiver
return output, transaction_count
def assert_single_position(test, zipline):
output, transaction_count = drain_zipline(test)
test.assertTrue(zipline.sim.ready())
test.assertFalse(zipline.sim.exception)
test.assertEqual(
test.zipline_test_config['order_count'],
transaction_count
)
# the final message is the risk report, the second to
# last is the final day's results. Positions is a list of
# dicts.
closing_positions = output[-2]['payload']['daily_perf']['positions']
test.assertEqual(
len(closing_positions),
1,
"Portfolio should have one position."
)
sid = test.zipline_test_config['sid']
test.assertEqual(
closing_positions[0]['sid'],
sid,
"Portfolio should have one position in " + str(sid)
)