mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-06 05:14:38 +08:00
devel flag on noop monitored system.
This commit is contained in:
@@ -97,6 +97,7 @@ class Component(object):
|
||||
self.stop_tic = None
|
||||
self.note = None
|
||||
self.confirmed = False
|
||||
self.devel = False
|
||||
|
||||
# Humanhashes make this way easier to debug because they stick
|
||||
# in your mind unlike a 32 byte string of random hex.
|
||||
|
||||
@@ -24,6 +24,13 @@ def do_handle_control_events(cls, poller):
|
||||
assert isinstance(cls, Component)
|
||||
assert cls.control_in, 'Component does not have a control_in socket'
|
||||
|
||||
# If we're in devel mode drop out because the controller
|
||||
# isn't guaranteed to be around anymore
|
||||
if cls.devel:
|
||||
import logbook
|
||||
logbook.info("Dropping out")
|
||||
return
|
||||
|
||||
if poller.get(cls.control_in) == zmq.POLLIN:
|
||||
msg = cls.control_in.recv()
|
||||
event, payload = CONTROL_UNFRAME(msg)
|
||||
|
||||
+18
-2
@@ -67,8 +67,9 @@ class Controller(object):
|
||||
debug = True
|
||||
period = PARAMETERS.GENERATIONAL_PERIOD
|
||||
|
||||
def __init__(self, pub_socket, route_socket, nosignals=False):
|
||||
def __init__(self, pub_socket, route_socket, devel=True):
|
||||
|
||||
self.devel = devel
|
||||
self.nosignals = False
|
||||
self.context = None
|
||||
self.zmq = None
|
||||
@@ -93,6 +94,8 @@ class Controller(object):
|
||||
|
||||
self.error_replay = OrderedDict()
|
||||
|
||||
log.warn("Running Controller in development mode, will ONLY synchronize start.")
|
||||
|
||||
def init_zmq(self, flavor):
|
||||
|
||||
assert self.zmq_flavor in ['thread', 'mp', 'green']
|
||||
@@ -101,6 +104,8 @@ class Controller(object):
|
||||
self.zmq = zmq
|
||||
self.context = self.zmq.Context()
|
||||
self.zmq_poller = self.zmq.Poller
|
||||
|
||||
log.warning("USING DEVELOPMENT MODE IN MP CONTEXT NOT RECOMMENDED")
|
||||
return
|
||||
if flavor == 'thread':
|
||||
self.zmq = zmq
|
||||
@@ -319,6 +324,17 @@ class Controller(object):
|
||||
if complete:
|
||||
self.send_go()
|
||||
|
||||
# If we're running in development stop here
|
||||
# because our responsibilites are over. The
|
||||
# zipline will either run to completion or die,
|
||||
# monitor doesn't care anymore because its all
|
||||
# threads.
|
||||
|
||||
if self.devel:
|
||||
log.warn("Shutting down Controller because in devel mode")
|
||||
sys.exitfunc = lambda: None
|
||||
self.shutdown(soft=True)
|
||||
|
||||
log.info('Heartbeat (%s, %s)' % (done, complete))
|
||||
|
||||
# ================
|
||||
@@ -338,7 +354,7 @@ class Controller(object):
|
||||
sys.exitfunc = lambda: None
|
||||
|
||||
# Send SIGHUP to buritto
|
||||
#self.signal_hangup()
|
||||
self.signal_hangup()
|
||||
|
||||
if not self.alive:
|
||||
break
|
||||
|
||||
+29
-1
@@ -60,6 +60,7 @@ before invoking simulate.
|
||||
+---------------------------------+
|
||||
"""
|
||||
|
||||
import inspect
|
||||
import zipline.utils.factory as factory
|
||||
|
||||
from zipline.components import DataSource
|
||||
@@ -113,6 +114,8 @@ class SimulatedTrading(object):
|
||||
self.trading_environment = config['trading_environment']
|
||||
self.sim_style = config.get('simulation_style')
|
||||
|
||||
self.devel = config['devel']
|
||||
|
||||
self.leased_sockets = []
|
||||
self.sim_context = None
|
||||
|
||||
@@ -129,6 +132,7 @@ class SimulatedTrading(object):
|
||||
self.con = Controller(
|
||||
sockets[6],
|
||||
sockets[7],
|
||||
devel = self.devel
|
||||
)
|
||||
|
||||
self.con.cancel_socket = self.allocator.lease(1)[0]
|
||||
@@ -260,12 +264,20 @@ class SimulatedTrading(object):
|
||||
'trading_environment' : trading_environment,
|
||||
'allocator' : allocator,
|
||||
'simulator_class' : simulator_class,
|
||||
'simulation_style' : simulation_style
|
||||
'simulation_style' : simulation_style,
|
||||
'devel' : config.get('devel', False)
|
||||
})
|
||||
#-------------------
|
||||
|
||||
zipline.add_source(trade_source)
|
||||
|
||||
# Save us from needless debugging
|
||||
inside_test = 'nose' in inspect.stack()[-1][1]
|
||||
if inside_test and not config.get('devel', False):
|
||||
assert False, """
|
||||
You need to run the SimulatedTrading inside a test with devel=True
|
||||
"""
|
||||
|
||||
return zipline
|
||||
|
||||
def add_source(self, source):
|
||||
@@ -318,6 +330,18 @@ class SimulatedTrading(object):
|
||||
|
||||
return leased
|
||||
|
||||
@property
|
||||
def components(self):
|
||||
"""
|
||||
Return the component instances inside of this topology
|
||||
"""
|
||||
|
||||
base = set(self.sim.components.values())
|
||||
transforms = set(self.transforms.values())
|
||||
sources = set(self.sources.values())
|
||||
|
||||
return base | transforms | sources
|
||||
|
||||
@property
|
||||
def topology(self):
|
||||
"""
|
||||
@@ -352,6 +376,10 @@ class SimulatedTrading(object):
|
||||
self.started = True
|
||||
self.sim_context = self.sim.simulate()
|
||||
|
||||
if self.devel:
|
||||
for component in self.components:
|
||||
component.devel = True
|
||||
|
||||
# If we're using a threaded simulator block on the pool
|
||||
# of thread since we're only ever in a test and we don't
|
||||
# generally monitor the state of the system as a hold at
|
||||
|
||||
Reference in New Issue
Block a user