mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-29 20:33:46 +08:00
Add delayed_signals class.
This commit is contained in:
@@ -0,0 +1,53 @@
|
||||
import os
|
||||
from signal import signal, SIGHUP, SIGINT
|
||||
import time
|
||||
from types import FrameType
|
||||
import unittest
|
||||
|
||||
from zipline.utils.delayed_signals import delayed_signals
|
||||
|
||||
class DelayedSignals(unittest.TestCase):
|
||||
def handler(self, signum, frame):
|
||||
print "Got signal " + str(signum)
|
||||
self.got[signum] = time.time()
|
||||
self.assertTrue(isinstance(frame, FrameType))
|
||||
|
||||
def setUp(self):
|
||||
signal(SIGHUP, self.handler)
|
||||
signal(SIGINT, self.handler)
|
||||
|
||||
def reset(self):
|
||||
self.got = {}
|
||||
|
||||
def test_delayed_signals(self):
|
||||
self.reset()
|
||||
with delayed_signals([SIGHUP]):
|
||||
os.kill(os.getpid(), SIGHUP)
|
||||
time.sleep(2)
|
||||
self.assertTrue(self.got[SIGHUP])
|
||||
self.assertTrue(time.time() - self.got[SIGHUP] < 2)
|
||||
|
||||
def test_immediate_signals(self):
|
||||
self.reset()
|
||||
os.kill(os.getpid(), SIGHUP)
|
||||
time.sleep(2)
|
||||
self.assertTrue(self.got[SIGHUP])
|
||||
self.assertTrue(time.time() - self.got[SIGHUP] > 1)
|
||||
|
||||
def test_multiple_signals(self):
|
||||
self.reset()
|
||||
with delayed_signals([SIGHUP, SIGINT]):
|
||||
os.kill(os.getpid(), SIGINT)
|
||||
self.assertFalse(SIGHUP in self.got)
|
||||
self.assertTrue(SIGINT in self.got)
|
||||
|
||||
@delayed_signals([SIGHUP])
|
||||
def kill_and_sleep(self):
|
||||
os.kill(os.getpid(), SIGHUP)
|
||||
time.sleep(2)
|
||||
|
||||
def test_decorator(self):
|
||||
self.reset()
|
||||
self.kill_and_sleep()
|
||||
self.assertTrue(SIGHUP in self.got)
|
||||
self.assertTrue(time.time() - self.got[SIGHUP] < 2)
|
||||
@@ -0,0 +1,42 @@
|
||||
from functools import wraps
|
||||
from signal import signal
|
||||
|
||||
class delayed_signals(object):
|
||||
"""
|
||||
Utility to temporary intercept one or more signals while a function or code
|
||||
block is executed, restore their signal handlers at the end of execution,
|
||||
and invoke them if the signals were in fact received during execution.
|
||||
|
||||
Can be used either as a decorator or a context manager.
|
||||
|
||||
Pass in an iterable of signals to intercept.
|
||||
"""
|
||||
|
||||
def handler(self, signum, frame=None):
|
||||
self.got.append([self.trapped.index(signum), frame])
|
||||
|
||||
def __init__(self, signals):
|
||||
self.trapped = signals
|
||||
self.orig_handlers = []
|
||||
self.got = []
|
||||
|
||||
def __enter__(self):
|
||||
for sig in self.trapped:
|
||||
self.orig_handlers.append(signal(sig, self.handler))
|
||||
|
||||
def __exit__(self, time, value, traceback):
|
||||
for i in xrange(len(self.trapped)):
|
||||
signal(self.trapped[i], self.orig_handlers[i])
|
||||
for intercepted in self.got:
|
||||
i = intercepted[0]
|
||||
signum = self.trapped[i]
|
||||
frame = intercepted[1]
|
||||
self.orig_handlers[i](signum, frame)
|
||||
|
||||
def __call__(self, fn):
|
||||
@wraps(fn)
|
||||
def call_fn(*args, **kwargs):
|
||||
with self:
|
||||
outval = fn(*args, **kwargs)
|
||||
return outval
|
||||
return call_fn
|
||||
Reference in New Issue
Block a user