Merge branch 'master' of github.com:quantopian/zipline into fix_infinite_risk

This commit is contained in:
scottsanderson
2012-08-22 02:50:49 -04:00
5 changed files with 97 additions and 1 deletions
+53
View File
@@ -0,0 +1,53 @@
import os
from signal import signal, SIGHUP, SIGINT
import time
from types import FrameType
import unittest
from zipline.utils.delayed_signals import delayed_signals
class DelayedSignals(unittest.TestCase):
def handler(self, signum, frame):
print "Got signal " + str(signum)
self.got[signum] = time.time()
self.assertTrue(isinstance(frame, FrameType))
def setUp(self):
signal(SIGHUP, self.handler)
signal(SIGINT, self.handler)
def reset(self):
self.got = {}
def test_delayed_signals(self):
self.reset()
with delayed_signals([SIGHUP]):
os.kill(os.getpid(), SIGHUP)
time.sleep(2)
self.assertTrue(self.got[SIGHUP])
self.assertTrue(time.time() - self.got[SIGHUP] < 2)
def test_immediate_signals(self):
self.reset()
os.kill(os.getpid(), SIGHUP)
time.sleep(2)
self.assertTrue(self.got[SIGHUP])
self.assertTrue(time.time() - self.got[SIGHUP] > 1)
def test_multiple_signals(self):
self.reset()
with delayed_signals([SIGHUP, SIGINT]):
os.kill(os.getpid(), SIGINT)
self.assertFalse(SIGHUP in self.got)
self.assertTrue(SIGINT in self.got)
@delayed_signals([SIGHUP])
def kill_and_sleep(self):
os.kill(os.getpid(), SIGHUP)
time.sleep(2)
def test_decorator(self):
self.reset()
self.kill_and_sleep()
self.assertTrue(SIGHUP in self.got)
self.assertTrue(time.time() - self.got[SIGHUP] < 2)
-1
View File
@@ -20,7 +20,6 @@ from zipline.finance.performance import PerformanceTracker
from zipline.utils.protocol_utils import ndict
from zipline.finance.trading import TransactionSimulator
from zipline.utils.test_utils import \
drain_zipline, \
setup_logger, \
teardown_logger,\
assert_single_position
+2
View File
@@ -171,6 +171,8 @@ class SimulatedTrading(object):
def close(self):
log.info("Closing Simulation: {id}".format(id=self.sim_id))
if self.results_socket:
self.results_socket.close()
if self.proc and self.send_sighup:
ppid = os.getppid()
if self.success:
+40
View File
@@ -0,0 +1,40 @@
from functools import wraps
from signal import signal
class delayed_signals(object):
"""
Utility to temporary intercept one or more signals while a function or code
block is executed, restore their signal handlers at the end of execution,
and invoke them if the signals were in fact received during execution.
Can be used either as a decorator or a context manager.
Pass in an iterable of signals to intercept.
"""
def handler(self, signum, frame=None):
self.got.append({'signum': signum, 'frame': frame})
def __init__(self, signals):
self.signals = signals
self.handlers = {}
self.got = []
def __enter__(self):
for signum in self.signals:
# signal() returns the old signal handler
self.handlers[signum] = signal(signum, self.handler)
def __exit__(self, time, value, traceback):
for signum, handler in self.handlers.items():
signal(signum, handler)
for signum, frame in ((i['signum'], i['frame']) for i in self.got):
self.handlers[signum](signum, frame)
def __call__(self, fn):
@wraps(fn)
def call_fn(*args, **kwargs):
with self:
outval = fn(*args, **kwargs)
return outval
return call_fn
+2
View File
@@ -15,6 +15,7 @@ def setup_logger(test, path='/var/log/zipline/zipline.log'):
def teardown_logger(test):
test.log_handler.pop_application()
test.log_handler.close()
def check_list(test, a, b, label):
test.assertTrue(isinstance(a, (list, blist.blist)))
@@ -119,6 +120,7 @@ def drain_receiver(receiver, count=None):
def assert_single_position(test, zipline, blocking=False):
output, transaction_count = drain_zipline(test, zipline, p_blocking=blocking)
test.assertEqual(output[-1]['prefix'], 'DONE')
test.assertEqual(
test.zipline_test_config['order_count'],