generator backed component, and a starter test for a source.

This commit is contained in:
fawce
2012-08-01 23:41:44 -04:00
parent 0b0f062e1b
commit dd1056bf30
11 changed files with 324 additions and 352 deletions
+35 -10
View File
@@ -1,10 +1,10 @@
import zmq
import pytz
from datetime import datetime, timedelta
from unittest2 import TestCase
from collections import defaultdict
from zipline.test_algorithms import ExceptionAlgorithm, DivByZeroAlgorithm
from zipline.finance.trading import SIMULATION_STYLE
from zipline.core.devsimulator import AddressAllocator
from zipline.lines import SimulatedTrading
@@ -15,18 +15,21 @@ from zipline.utils.test_utils import (
setup_logger,
teardown_logger,
launch_component,
gen_from_socket
create_monitor,
launch_monitor
)
from zipline.core import Component
from zipline.core.component import ComponentSocketArgs
from zipline.protocol import (
DATASOURCE_FRAME
DATASOURCE_FRAME,
DATASOURCE_UNFRAME
)
from zipline.gens.tradegens import SpecificEquityTrades
from zipline.gens.utils import hash_args
from zipline.gens.zmqgen import gen_from_poller
import logbook
log = logbook.Logger('ComponentTestCase')
@@ -54,28 +57,50 @@ class ComponentTestCase(TestCase):
teardown_logger(self)
def test_specific_equity_source(self):
filter = [1,2,3,4]
#Set up source a. One minute between events.
args_a = tuple()
kwargs_a = {
'sids' : [1,2],
'start' : datetime(2012,6,6,0),
'start' : datetime(2012,6,6,0,tzinfo=pytz.utc),
'delta' : timedelta(minutes = 1),
'filter' : filter
'filter' : filter,
'count' : 100
}
c_id = SpecificEquityTrades.__name__ + hash_args(args_a, kwargs_a)
mon = create_monitor(allocator)
out_socket_args = ComponentSocketArgs(
style=zmq.PUSH,
uri=allocator.lease(1)[0],
bind=True
)
c = Component(
SpecificEquityTrades,
args_a,
kwargs_a,
out_uri=self.out_uri,
frame=DATASOURCE_FRAME,
monitor_uri=None
c_id,
out_socket_args,
DATASOURCE_FRAME,
mon
)
mon.manage(set([c.get_id]))
mon_proc = launch_monitor(mon)
# launch in a process
proc = launch_component(c)
for msg in gen_from_socket(self.out_uri):
pull_socket = self.ctx.socket(zmq.PULL)
pull_socket.connect(out_socket_args.uri)
poller = zmq.Poller()
poller.register(pull_socket, zmq.POLLIN)
unframe = DATASOURCE_UNFRAME
for msg in gen_from_poller(poller, pull_socket, unframe):
# assert things about the messages.
log.info(msg)
pull_socket.close()
log.info("DONE!")
+5 -5
View File
@@ -1,7 +1,7 @@
from zipline.utils.test_utils import setup_logger, teardown_logger
from unittest2 import TestCase, skip
from zipline.core.monitor import Controller
from zipline.core.monitor import Monitor
class TestMonitor(TestCase):
def setUp(self):
@@ -15,12 +15,12 @@ class TestMonitor(TestCase):
pub_socket = 'tcp://127.0.0.1:5000'
route_socket = 'tcp://127.0.0.1:5001'
con = Controller(pub_socket, route_socket)
con.manage([])
mon = Monitor(pub_socket, route_socket)
mon.manage([])
def test_init_topology(self):
pub_socket = 'tcp://127.0.0.1:5000'
route_socket = 'tcp://127.0.0.1:5001'
con = Controller(pub_socket, route_socket, )
con.manage([ 'a', 'b', 'c', 'd' ])
mon = Monitor(pub_socket, route_socket, )
mon.manage([ 'a', 'b', 'c', 'd' ])
+2 -2
View File
@@ -6,14 +6,14 @@ Zipline
# it is a place to expose the public interfaces.
import protocol # namespace
from core.monitor import Controller
from core.monitor import Monitor
from lines import SimulatedTrading
from core.host import ComponentHost
from utils.protocol_utils import ndict
__all__ = [
SimulatedTrading,
Controller,
Monitor,
ComponentHost,
protocol,
ndict
+2 -2
View File
@@ -1,9 +1,9 @@
from host import ComponentHost
from component import Component
from monitor import Controller
from monitor import Monitor
__all__ = [
Component,
Controller,
Monitor,
ComponentHost
]
+217 -297
View File
@@ -16,6 +16,8 @@ from collections import namedtuple
# pyzmq
import zmq
from zipline.gens.zmqgen import gen_from_poller
from zipline.core.monitor import PARAMETERS
from zipline.protocol import (
@@ -38,40 +40,6 @@ ComponentSocketArgs = namedtuple('ComponentSocket',['uri','style','bind'])
class Component(object):
"""
Base class for components. Defines the the base messaging
interface for components.
:param addresses: a dict of name_string -> zmq port address strings.
Must have the following entries
:param data_address: socket address used for data sources to stream
their records. Will be used in PUSH/PULL sockets
between data sources and a Feed. Bind will always
be on the PULL side (we always have N producers and
1 consumer)
:param feed_address: socket address used to publish consolidated feed
from serialization of data sources
will be used in PUB/SUB sockets between Feed and
Transforms. Bind is always on the PUB side.
:param merge_address: socket address used to publish transformed
values. will be used in PUSH/PULL from many
transforms to one Merge Bind will always be on
the PULL side (we always have N producers and
1 consumer)
:param results_address: socket address used to publish merged data
source feed and transforms to clients will be
used in PUB/SUB from one Merge to one or many
clients. Bind is always on the PUB side.
bind/connect methods will return the correct socket type for each
address.
"""
# ------------
# Construction
# ------------
@@ -82,8 +50,10 @@ class Component(object):
gen_kwargs,
component_id,
out_socket_args,
controller=None,
in_socket_args=None
frame,
monitor,
in_socket_args=None,
unframe=None
):
assert component_id, \
@@ -97,11 +67,6 @@ class Component(object):
assert isinstance(in_socket_args, ComponentSocketArgs), \
"in_socket_args args must be ComponentSocketArgs"
if monitor_socket_args:
assert isinstance(monitor_socket_args, ComponentSocketArgs), \
"monitor_socket_args args must be ComponentSocketArgs"
# -----------------
# Generator
# -----------------
@@ -110,6 +75,7 @@ class Component(object):
self.gen_kwargs = gen_kwargs
self.gen_func = gen_func
self.generator = None
self.frame = frame
# lock for waiting on monitor "GO"
self.waiting = None
@@ -123,11 +89,13 @@ class Component(object):
self.context = None
self.out_socket = None
self.in_socket = None
self.controller = controller
self.monitor = monitor
self.unframe = unframe
# TODO: state_flag is deprecated, remove
self.state_flag = COMPONENT_STATE.OK
# track time of last ping we received from monitor
self.last_ping = None
# Humanhashes make this way easier to debug because they stick
@@ -140,70 +108,6 @@ class Component(object):
# Core Methods
# ------------
def open(self):
"""
Open the connections needed to start doing work.
Perform any setup that must be done within process.
"""
# The process title so you can watch it in top, ps.
setproctitle(self.generator.__name__)
if self.in_socket_args:
self.in_socket = self.open_socket(self.in_socket_args)
poller_gen = self.gen_from_zmq(self.in_socket)
self.gen_func(poller_gen, *self.gen_args, **self.gen_kwargs)
else:
self.generator = self.gen_func(*self.gen_args, **self.gen_kwargs)
self.out_socket = self.open_socket(self.out_socket_args)
def open_socket(self, sock_args):
if sock_args.bind:
return self.bind_socket(sock_args)
else:
return self.connect_socket(sock_args)
def bind_socket(self, sock_args):
if sock_args.style == zmq.PULL:
return self.bind_pull_socket(sock_args.uri)
if sock_args.style == zmq.PUSH:
return self.bind_push_socket(sock_args.uri)
if sock_args.style == zmq.PUB:
return self.bind_pub_socket(sock_args.uri)
raise Exception("Invalid socket arguments")
def connect_socket(self, sock_args):
if sock_args.style == zmq.PULL:
return self.connect_pull_socket(sock_args.uri)
if sock_args.style == zmq.PUSH:
return self.connect_push_socket(sock_args.uri)
if sock_args.style == zmq.SUB:
return self.connect_sub_socket(sock_args.uri)
raise Exception("Invalid socket arguments")
def ready(self):
"""
Return ``True`` if and only if the component has finished
execution.
"""
return self.state_flag in [COMPONENT_STATE.DONE, \
COMPONENT_STATE.EXCEPTION]
def successful(self):
"""
Return ``True`` if and only if the component has finished
execution successfully, that is, without raising an error.
"""
return self.state_flag == COMPONENT_STATE.DONE and not \
self.exception
def init_zmq(self):
self.zmq = zmq
self.context = self.zmq.Context()
self.zmq_poller = self.zmq.Poller
return
def _run(self):
"""
@@ -212,17 +116,15 @@ class Component(object):
The core logic of the all components is run here.
"""
# The process title so you can watch it in top, ps.
setproctitle(self.gen_func.__name__)
log.info("Start %r" % self)
log.info("Pid %s" % os.getpid())
log.info("Group %s" % os.getpgrp())
self.done = False # TODO: use state flag
self.sockets = []
self.init_zmq()
self.setup_poller()
self.setup_control()
self.open()
self.signal_ready()
@@ -232,10 +134,14 @@ class Component(object):
# -----------------------
# YOU SHALL NOT PASS!!!!!
# -----------------------
# ... until the controller signals GO
# ... until the monitor signals GO
self.loop()
for event in self.generator:
self.heartbeat()
msg = self.frame(event)
self.out_socket.send(msg)
self.signal_done()
def run(self, catch_exceptions=True):
"""
@@ -249,108 +155,10 @@ class Component(object):
else:
# if we get a kill signal, forcibly close all the
# sockets.
# exc_info = sys.exc_info()
# self.relay_exception(exc_info[0], exc_info[1], exc_info[2])
self.teardown_sockets()
finally:
log.info("Exiting %r" % self)
def working(self):
"""
Controls when the work loop will start and end
If we encounter an exception or signal done exit.
Overload for higher order behavior.
"""
return (not self.done)
def loop(self, lockstep=True):
"""
Loop to do work while we still have work to do.
"""
for event in self.generator:
self.heartbeat()
msg = self.frame(event)
self.out_socket.send(msg)
def heartbeat(self, timeout=0):
# wait for synchronization reply from the host
socks = dict(self.poll.poll(timeout))
# ----------------
# Control Dispatch
# ----------------
assert self.control_in, 'Component does not have a control_in socket'
if socks.get(self.control_in) == zmq.POLLIN:
msg = self.control_in.recv()
event, payload = CONTROL_UNFRAME(msg)
# ===========
# Heartbeat
# ===========
# The controller will send out a single number packed in
# a CONTROL_FRAME with ``heartbeat`` event every
# (n)-seconds. The component then has n seconds to
# respond to it. If not then it will be considered as
# malfunctioning or maybe CPU bound.
if event == CONTROL_PROTOCOL.HEARTBEAT:
# Heart outgoing
heartbeat_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.OK,
payload
)
self.last_ping = float(payload)
# Echo back the heartbeat identifier to tell the
# controller that this component is still alive and
# doing work
self.control_out.send(heartbeat_frame)
# =========
# Soft Kill
# =========
# Try and clean up properly and send out any reports or
# data that are done during a clean shutdown. Inform the
# controller that we're done.
elif event == CONTROL_PROTOCOL.SHUTDOWN:
self.signal_done()
# =========
# Hard Kill
# =========
# Just exit.
elif event == CONTROL_PROTOCOL.KILL:
self.kill()
# In case we didn't receive a ping, send a pre-emptive
# pong to the monitor.
elif self.last_ping and time.time() - self.last_ping > 1:
# send a ping ahead of schedule
pre_pong = time.time()
heartbeat_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.OK,
str(pre_pong)
)
# Echo back the heartbeat identifier to tell the
# controller that this component is still alive and
# doing work
self.control_out.send(heartbeat_frame, self.zmq.NOBLOCK)
self.last_ping = pre_pong
elif self.last_ping and \
time.time() - self.last_ping > PARAMETERS.MAX_COMPONENT_WAIT:
# monitor is gone without sending the shutdown
# signal, do a hard exit.
self.kill()
# ----------------------------
# Cleanup & Modes of Failure
@@ -375,6 +183,62 @@ class Component(object):
"""
raise KillSignal()
def signal_exception(self, exc=None, scope=None):
"""
All exceptions inside any component should boil back to
this handler.
Will inform the system that the component has failed and how it
has failed.
"""
self.state_flag = COMPONENT_STATE.EXCEPTION
exc_type, exc_value, exc_traceback = sys.exc_info()
# if a downstream component fails, this component may try
# sending when there are zero connections to the socket,
# which will raise ZMQError(EAGAIN). So, it doesn't make
# sense to relay this exception to Monitor and the rest
# of the zipline.
if isinstance(exc, zmq.ZMQError) and exc.errno == zmq.EAGAIN:
log.warn("{id} raised a ZMQError(EAGAIN) not relaying"\
.format(id=self.get_id))
return
# sys.stdout.write(trace)
log.exception("Unexpected error in run for {id}.".format(id=self.get_id))
try:
log.info('{id} sending exception to monitor'\
.format(id=self.get_id))
msg = EXCEPTION_FRAME(
exc_traceback,
exc_type.__name__,
exc_value.message
)
exception_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.EXCEPTION,
msg
)
self.control_out.send(exception_frame, self.zmq.NOBLOCK)
# The monitor should relay the exception back
# to all zipline components. Wait here until the
# notice arrives, and we can assume other zipline
# components have broken out of their message
# loops.
for i in xrange(PARAMETERS.MAX_COMPONENT_WAIT):
self.heartbeat(timeout=1000)
log.warn("{id} never heard back from monitor."\
.format(id=self.get_id))
except KillSignal:
log.info("{id} received confirmation from monitor"\
.format(id=self.get_id))
except:
log.exception("Exception waiting for monitor reply")
# ----------------------
# Internal Maintenance
# ----------------------
@@ -418,7 +282,7 @@ class Component(object):
# Go
# ====
# A distributed lock from the controller to ensure
# A distributed lock from the monitor to ensure
# synchronized start.
if event == CONTROL_PROTOCOL.HEARTBEAT:
@@ -430,7 +294,7 @@ class Component(object):
log.info('Prestart Heartbeat ' + self.get_id)
elif event == CONTROL_PROTOCOL.GO:
# Side effectful call from the controller to unlock
# Side effectful call from the monitor to unlock
# and begin doing work only when the entire topology
# of the system beings to come online
log.info('Unlocking ' + self.__class__.__name__)
@@ -442,7 +306,7 @@ class Component(object):
# Try and clean up properly and send out any reports or
# data that are done during a clean shutdown. Inform the
# controller that we're done.
# monitor that we're done.
elif event == CONTROL_PROTOCOL.SHUTDOWN:
self.signal_done()
break
@@ -462,6 +326,82 @@ class Component(object):
self.kill()
break
def heartbeat(self, timeout=0):
# wait for synchronization reply from the host
socks = dict(self.poll.poll(timeout))
# ----------------
# Control Dispatch
# ----------------
assert self.control_in, 'Component does not have a control_in socket'
if socks.get(self.control_in) == zmq.POLLIN:
msg = self.control_in.recv()
event, payload = CONTROL_UNFRAME(msg)
# ===========
# Heartbeat
# ===========
# The monitor will send out a single number packed in
# a CONTROL_FRAME with ``heartbeat`` event every
# (n)-seconds. The component then has n seconds to
# respond to it. If not then it will be considered as
# malfunctioning or maybe CPU bound.
if event == CONTROL_PROTOCOL.HEARTBEAT:
# Heart outgoing
heartbeat_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.OK,
payload
)
self.last_ping = float(payload)
# Echo back the heartbeat identifier to tell the
# monitor that this component is still alive and
# doing work
self.control_out.send(heartbeat_frame)
# =========
# Soft Kill
# =========
# Try and clean up properly and send out any reports or
# data that are done during a clean shutdown. Inform the
# monitor that we're done.
elif event == CONTROL_PROTOCOL.SHUTDOWN:
self.signal_done()
# =========
# Hard Kill
# =========
# Just exit.
elif event == CONTROL_PROTOCOL.KILL:
self.kill()
# In case we didn't receive a ping, send a pre-emptive
# pong to the monitor.
elif self.last_ping and time.time() - self.last_ping > 1:
# send a ping ahead of schedule
pre_pong = time.time()
heartbeat_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.OK,
str(pre_pong)
)
# Echo back the heartbeat identifier to tell the
# monitor that this component is still alive and
# doing work
self.control_out.send(heartbeat_frame, self.zmq.NOBLOCK)
self.last_ping = pre_pong
elif self.last_ping and \
time.time() - self.last_ping > PARAMETERS.MAX_COMPONENT_WAIT:
# monitor is gone without sending the shutdown
# signal, do a hard exit.
self.kill()
def signal_ready(self):
log.info(self.__class__.__name__ + ' is ready')
@@ -471,60 +411,6 @@ class Component(object):
)
self.control_out.send(frame)
def signal_exception(self, exc=None, scope=None):
"""
All exceptions inside any component should boil back to
this handler.
Will inform the system that the component has failed and how it
has failed.
"""
self.state_flag = COMPONENT_STATE.EXCEPTION
exc_type, exc_value, exc_traceback = sys.exc_info()
# if a downstream component fails, this component may try
# sending when there are zero connections to the socket,
# which will raise ZMQError(EAGAIN). So, it doesn't make
# sense to relay this exception to Monitor and the rest
# of the zipline.
if isinstance(exc, zmq.ZMQError) and exc.errno == zmq.EAGAIN:
log.warn("{id} raised a ZMQError(EAGAIN) not relaying"\
.format(id=self.get_id))
return
# sys.stdout.write(trace)
log.exception("Unexpected error in run for {id}.".format(id=self.get_id))
if hasattr(self, 'control_out') and self.control_out:
try:
log.info('{id} sending exception to controller'\
.format(id=self.get_id))
msg = EXCEPTION_FRAME(
exc_traceback,
exc_type.__name__,
exc_value.message
)
exception_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.EXCEPTION,
msg
)
self.control_out.send(exception_frame, self.zmq.NOBLOCK)
# The controller should relay the exception back
# to all zipline components. Wait here until the
# notice arrives, and we can assume other zipline
# components have broken out of their message
# loops.
for i in xrange(PARAMETERS.MAX_COMPONENT_WAIT):
self.heartbeat(timeout=1000)
log.warn("{id} never heard back from monitor."\
.format(id=self.get_id))
except KillSignal:
log.info("{id} received confirmation from controller"\
.format(id=self.get_id))
except:
log.exception("Exception waiting for controller reply")
def signal_done(self):
"""
Notify down stream components that we're done.
@@ -534,20 +420,18 @@ class Component(object):
# notify internal work loop that we're done
self.done = True # TODO: use state flag
if hasattr(self, 'out_socket') and self.out_socket:
msg = zmq.Message(str(CONTROL_PROTOCOL.DONE))
self.out_socket.send(msg)
msg = zmq.Message(str(CONTROL_PROTOCOL.DONE))
self.out_socket.send(msg)
if hasattr(self, 'control_out'):
# notify controller we're done
done_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.DONE,
''
)
# notify monitor we're done
done_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.DONE,
''
)
self.control_out.send(done_frame)
log.info("[%s] sent control done" % self.get_id)
self.control_out.send(done_frame)
log.info("[%s] sent control done" % self.get_id)
# there is a narrow race condition where we finish just
# after the Monitor accepts our prior heartbeat, but just
@@ -559,12 +443,60 @@ class Component(object):
# Messaging
# -----------
def setup_poller(self):
def open(self):
"""
Setup the poller used for multiplexing the incoming data
handling sockets.
Open the connections needed to start doing work.
Perform any setup that must be done within process.
"""
self.poll = self.zmq_poller()
self.zmq = zmq
self.context = self.zmq.Context()
self.poll = self.zmq.Poller()
self.setup_control()
if self.in_socket_args:
self.in_socket = self.open_socket(self.in_socket_args)
poller_gen = gen_from_poller(
self.poller,
self.in_socket,
self.unframe
)
self.generator = self.gen_func(
poller_gen,
*self.gen_args,
**self.gen_kwargs
)
else:
self.generator = self.gen_func(*self.gen_args, **self.gen_kwargs)
self.out_socket = self.open_socket(self.out_socket_args)
def open_socket(self, sock_args):
if sock_args.bind:
return self.bind_socket(sock_args)
else:
return self.connect_socket(sock_args)
def bind_socket(self, sock_args):
if sock_args.style == zmq.PULL:
return self.bind_pull_socket(sock_args.uri)
if sock_args.style == zmq.PUSH:
return self.bind_push_socket(sock_args.uri)
if sock_args.style == zmq.PUB:
return self.bind_pub_socket(sock_args.uri)
raise Exception("Invalid socket arguments")
def connect_socket(self, sock_args):
if sock_args.style == zmq.PULL:
return self.connect_pull_socket(sock_args.uri)
if sock_args.style == zmq.PUSH:
return self.connect_push_socket(sock_args.uri)
if sock_args.style == zmq.SUB:
return self.connect_sub_socket(sock_args.uri)
raise Exception("Invalid socket arguments")
def bind_push_socket(self, addr):
push_socket = self.context.socket(self.zmq.PUSH)
@@ -623,12 +555,12 @@ class Component(object):
of the simulation and to forcefully tear down the simulation in
case of a failure.
"""
self.control_out = self.controller.message_sender(
self.control_out = self.monitor.message_sender(
identity = self.get_id,
context = self.context,
)
self.control_in = self.controller.message_listener(
self.control_in = self.monitor.message_listener(
context = self.context
)
@@ -639,18 +571,6 @@ class Component(object):
# Description and Debug
# ---------------------
def extern_logger(self):
"""
Pipe logs out to a provided logging interface.
"""
pass
def setup_extern_logger(self):
"""
Pipe logs out to a provided logging interface.
"""
pass
@property
def get_id(self):
"""
+1 -1
View File
@@ -103,7 +103,7 @@ class ComponentHost(object):
log.info('== Roll Call ==')
log.info('Controller')
log.info('Monitor')
self.launch_controller()
+2 -2
View File
@@ -38,7 +38,7 @@ class UnknownChatter(Exception):
return """Component calling itself "%s" talking on unexpected channel""" % self.named
log = logbook.Logger('Controller')
log = logbook.Logger('Monitor')
# The scalars determining the timing of the monitor behavior for
# the system.
@@ -56,7 +56,7 @@ PARAMETERS = ndict(dict(
SYSTEM_TIMEOUT = 50,
))
class Controller(object):
class Monitor(object):
"""
A N to M messaging system for inter component communication.
+4 -4
View File
@@ -40,13 +40,13 @@ class ProcessSimulator(ComponentHost):
# invoked by the host's open()
def launch_controller(self):
proc = multiprocessing.Process(target=self.controller.run)
proc = multiprocessing.Process(target=self.monitor.run)
proc.start()
self.con = proc
# Process specific
self.controller_process = proc
self.mapping[proc.pid] = 'Controller'
self.monitor_process = proc
self.mapping[proc.pid] = 'Monitor'
def launch_component(self, component):
proc = multiprocessing.Process(target=component.run)
@@ -81,7 +81,7 @@ class ProcessSimulator(ComponentHost):
process.join(timeout=1)
process.terminate()
self.controller.shutdown(soft=True)
self.monitor.shutdown(soft=True)
self.running = False
self.con.terminate()
+27
View File
@@ -0,0 +1,27 @@
import zmq
import zipline.protocol as zp
def gen_from_pull_socket(socket_uri, context, unframe):
"""
A generator that takes a socket_uri, and yields
messages from the poller until it gets a zp.CONTROL_PROTOCOL.DONE.
"""
pull_socket = context.socket(zmq.PULL)
pull_socket.connect(socket_uri)
poller = zmq.Poller()
poller.register(pull_socket, zmq.POLLIN)
return gen_from_poller(poller, pull_socket, unframe)
def gen_from_poller(poller, in_socket, unframe):
while True:
socks = dict(poller.poll(1000))
if socks.get(in_socket) == zmq.POLLIN:
message = in_socket.recv()
if message == str(zp.CONTROL_PROTOCOL.DONE):
break
else:
event = unframe(message)
yield event
+7 -7
View File
@@ -70,7 +70,7 @@ from zipline.transforms import BaseTransform
from zipline.test_algorithms import TestAlgorithm
from zipline.components import TradeSimulationClient
from zipline.core.process import ProcessSimulator
from zipline.core.monitor import Controller
from zipline.core.monitor import Monitor
from zipline.finance.trading import SIMULATION_STYLE
log = logbook.Logger('Lines')
@@ -131,7 +131,7 @@ class SimulatedTrading(object):
'results_address' : sockets[4],
}
self.con = Controller(
self.monitor = Monitor(
# pub socket
sockets[5],
# route socket
@@ -163,7 +163,7 @@ class SimulatedTrading(object):
#setup transforms
self.transforms = {}
self.sim.register_controller( self.con )
self.sim.register_monitor( self.monitor )
@staticmethod
@@ -348,15 +348,15 @@ class SimulatedTrading(object):
return base | transforms | sources
def setup_controller(self):
def setup_monitor(self):
"""
Prepare the controller to manage the topology specified
Prepare the monitor to manage the topology specified
by this line.
"""
self.con.manage(self.topology)
self.monitor.manage(self.topology)
def simulate(self, blocking=True):
self.setup_controller()
self.setup_monitor()
self.started = True
self.sim_context = self.sim.simulate()
+22 -22
View File
@@ -7,7 +7,7 @@ import blist
from zipline.utils.date_utils import EPOCH
from itertools import izip
from logbook import FileHandler
from zipline.core.monitor import Monitor
def setup_logger(test, path='/var/log/zipline/zipline.log'):
test.log_handler = FileHandler(path)
@@ -144,31 +144,31 @@ def assert_single_position(test, zipline):
)
def launch_component(self, component):
def launch_component(component):
proc = multiprocessing.Process(target=component.run)
proc.start()
self.subprocesses.append(proc)
self.mapping[proc.pid] = component.get_id
return proc
def gen_from_socket(socket_uri, context, unframe):
"""
A generator that takes a socket_uri, and yields
messages from the poller until it gets a zp.CONTROL_PROTOCOL.DONE.
"""
pull_socket = context.socket(zmq.PULL)
pull_socket.connect(socket_uri)
poller = zmq.Poller()
poller.register(pull_socket, zmq.POLLIN)
def launch_monitor(monitor):
proc = multiprocessing.Process(target=monitor.run)
proc.start()
return proc
while True:
socks = dict(poller.poll(1000))
if socks.get(pull_socket) == zmq.POLLIN:
message = pull_socket.recv()
def create_monitor(allocator):
sockets = allocator.lease(3)
mon = Monitor(
# pub socket
sockets[0],
# route socket
sockets[1],
# exception socket to match tradesimclient's result
# socket, because we want to relay exceptions to the
# same listener
sockets[2],
# this controller is expected to run in a test, so no
# need to signal the parent process on success or error.
send_sighup=False
)
if message.type == zp.CONTROL_PROTOCOL.DONE:
break
else:
yield unframe(message)
return mon