Merge branch 'new_world_order' of github.com:quantopian/zipline into new_world_order

This commit is contained in:
scottsanderson
2012-08-03 14:07:43 -04:00
5 changed files with 163 additions and 169 deletions
+37 -29
View File
@@ -64,19 +64,19 @@ class ComponentTestCase(TestCase):
'count' : count
}
trade_gen = SpecificEquityTrades(*args_a, **kwargs_a)
monitor.add_to_topology(trade_gen.get_hash())
launch_monitor(monitor)
comp_a = Component(
SpecificEquityTrades(*args_a, **kwargs_a),
trade_gen,
monitor,
socket_uri,
DATASOURCE_FRAME,
DATASOURCE_UNFRAME
)
launch_monitor(monitor)
iter_a = iter(comp_a)
ev = iter_a.next()
return
for event in comp_a:
log.info(event)
@@ -96,16 +96,8 @@ class ComponentTestCase(TestCase):
'filter' : filter,
'count' : count
}
comp_a = Component(
SpecificEquityTrades(*args_a, **kwargs_a),
monitor,
socket_uris[0],
DATASOURCE_FRAME,
DATASOURCE_UNFRAME
)
trade_gen_a = SpecificEquityTrades(*args_a, **kwargs_a)
monitor.add_to_topology(trade_gen_a.get_hash())
#Set up source b. Two minutes between events.
args_b = tuple()
@@ -116,15 +108,8 @@ class ComponentTestCase(TestCase):
'filter' : filter,
'count' : count
}
comp_b = Component(
SpecificEquityTrades(*args_b, **kwargs_b),
monitor,
socket_uris[1],
DATASOURCE_FRAME,
DATASOURCE_UNFRAME
)
trade_gen_b = SpecificEquityTrades(*args_b, **kwargs_b)
monitor.add_to_topology(trade_gen_b.get_hash())
#Set up source c. Three minutes between events.
args_c = tuple()
@@ -136,18 +121,41 @@ class ComponentTestCase(TestCase):
'count' : count
}
trade_gen_c = SpecificEquityTrades(*args_c, **kwargs_c)
monitor.add_to_topology(trade_gen_c.get_hash())
launch_monitor(monitor)
comp_a = Component(
trade_gen_a,
monitor,
socket_uris[0],
DATASOURCE_FRAME,
DATASOURCE_UNFRAME
)
comp_b = Component(
trade_gen_b,
monitor,
socket_uris[1],
DATASOURCE_FRAME,
DATASOURCE_UNFRAME
)
comp_c = Component(
SpecificEquityTrades(*args_c, **kwargs_c),
trade_gen_c,
monitor,
socket_uris[2],
DATASOURCE_FRAME,
DATASOURCE_UNFRAME
)
launch_monitor(monitor)
sources = [comp_a, comp_b, comp_c]
gens = [iter(source) for source in sources]
sorted_out = date_sorted_sources(gens)
sorted_out = date_sorted_sources(*sources)
import time
time.sleep(.25)
prev = None
sort_count = 0
+114 -115
View File
@@ -17,8 +17,6 @@ from collections import namedtuple
# pyzmq
import zmq
from zipline.gens.zmqgen import gen_from_poller
from zipline.core.monitor import PARAMETERS
from zipline.protocol import (
@@ -36,6 +34,10 @@ class KillSignal(Exception):
def __init__(self):
pass
class ShutdownSignal(Exception):
def __init__(self):
pass
ComponentSocketArgs = namedtuple('ComponentSocketArgs',['uri','style','bind'])
class Component(object):
@@ -83,25 +85,29 @@ class Component(object):
self.unframe = unframe
self.prefix = ""
# register two components with the monitor
monitor.add_to_topology(self.component_id)
monitor.add_to_topology("FORK-"+self.component_id)
# TODO: state_flag is deprecated, remove
self.state_flag = COMPONENT_STATE.OK
# track time of last ping we received from monitor
self.last_ping = None
self.last_ping = time.time()
# Humanhashes make this way easier to debug because they stick
# in your mind unlike a 32 byte string of random hex.
self.guid = uuid.uuid4()
self.huid = humanhash.humanize(self.guid.hex)
# first, start the generator in its own process. Once
# Monitor says "go", Events from the generator will be
# FRAME'd and PUSH'd to self.socket_uri.
proc = multiprocessing.Process(
target=self.loop_send
)
proc.start()
# ------------
# Generator
# Message Receiver/Generator
# ------------
self.gen = None
self.recv_gen = self.create_recv_gen()
# ------------
@@ -109,94 +115,108 @@ class Component(object):
# ------------
def _run_out(self):
def loop_send(self):
"""
The main component loop. This is wrapped inside a
exception reporting context inside of run.
The core logic of the all components is run here.
"""
# The process title so you can watch it in top, ps.
setproctitle(self.generator.__class__.__name__)
self.prefix = "FORK-"
log.info("Start %r" % self)
log.info("Pid %s" % os.getpid())
log.info("Group %s" % os.getpgrp())
self.open()
self.signal_ready()
self.lock_ready()
self.wait_ready()
# -----------------------
# YOU SHALL NOT PASS!!!!!
# -----------------------
# ... until the monitor signals GO
for event in self.generator:
self.heartbeat()
event.source_id = self.get_id
msg = self.frame(event)
self.out_socket.send(msg)
self.signal_done()
def _run_in(self):
self.open(send=False)
self.signal_ready()
self.lock_ready()
self.wait_ready()
# -----------------------
# YOU SHALL NOT PASS!!!!!
# -----------------------
# ... until the monitor signals GO
# return the generator
for event in gen_from_poller(self.poll, self.in_socket, self.unframe):
event.source_id = self.get_id
yield event
self.signal_done()
def run_safe(self, func):
"""
Run a function that is assumed to include wait_ready and
heartbeat. Used to wrap fork_generator and consume_gen.
"""
try:
return func()
# The process title so you can watch it in top, ps.
setproctitle(self.generator.__class__.__name__)
self.prefix = "FORK-"
log.info("Start %r" % self)
log.info("Pid %s" % os.getpid())
log.info("Group %s" % os.getpgrp())
self.open()
self.signal_ready()
self.lock_ready()
self.wait_ready()
# -----------------------
# YOU SHALL NOT PASS!!!!!
# -----------------------
# ... until the monitor signals GO
for event in self.generator:
self.heartbeat()
msg = self.frame(event)
self.out_socket.send(msg)
self.signal_done()
# keep heartbeating until we receive the shutdown
# message from the Monitor (raises a
# ShutdownSignal), or we don't hear from the Monitor
# for MAX_COMPONENT_WAIT.
while True:
self.heartbeat(timeout=1000)
except Exception as exc:
if not isinstance(exc, KillSignal):
self.signal_exception(exc)
else:
# if we get a kill signal, forcibly close all the
# sockets.
self.teardown_sockets()
self.handle_exception(exc)
finally:
log.info("Exiting %r" % self)
def _launch(self):
# first, start the generator in its own process. Once
# Monitor says "go", Events from the generator will be
# FRAME'd and PUSH'd to self.socket_uri.
proc = multiprocessing.Process(
target=self.run_safe,
args=(self._run_out,)
)
proc.start()
def create_recv_gen(self):
try:
self.open(send=False)
self.signal_ready()
self.lock_ready()
# return the generator
return self.loop_recv()
except Exception as exc:
self.handle_exception(exc)
finally:
log.info("Created Recv Gen for %r" % self)
# Start the poller-generator, which will PULL messages
# from self.sockiet_uri, UNFRAME'd them, and yield them.
return self.run_safe(self._run_in)
def loop_recv(self):
try:
# we block on ready here until monitor sends the GO
self.wait_ready()
for event in self.gen_from_poller(self.poll, self.in_socket, self.unframe):
yield event
self.signal_done()
except Exception as exc:
self.handle_exception(exc)
finally:
log.info("Exiting %r" % self)
def gen_from_poller(self, poller, in_socket, unframe):
while True:
socks = dict(poller.poll(0))
self.heartbeat()
if socks.get(in_socket) == zmq.POLLIN:
message = in_socket.recv()
if message == str(CONTROL_PROTOCOL.DONE):
break
else:
event = unframe(message)
yield event
def handle_exception(self, exc, re_raise=False):
if isinstance(exc, KillSignal):
# if we get a kill signal, forcibly close all the
# sockets.
self.teardown_sockets()
elif isinstance(exc, ShutdownSignal):
# signal from monitor of an orderly shutdown,
# do nothing.
pass
else:
self.signal_exception(exc)
def __iter__(self):
if not self.gen:
self.gen = self._launch()
return self
return self.gen
def next(self):
return self.recv_gen.next()
# ----------------------------
# Cleanup & Modes of Failure
@@ -212,6 +232,12 @@ class Component(object):
for sock in self.sockets:
sock.close()
def shutdown(self):
"""
Clean shutdown.
"""
raise ShutdownSignal()
def kill(self):
"""
Unclean shutdown.
@@ -335,7 +361,7 @@ class Component(object):
# Side effectful call from the monitor to unlock
# and begin doing work only when the entire topology
# of the system beings to come online
log.info('Unlocking ' + self.__class__.__name__)
log.info('Unlocking ' + self.get_id)
self.unlock_ready()
# =========
@@ -346,7 +372,7 @@ class Component(object):
# data that are done during a clean shutdown. Inform the
# monitor that we're done.
elif event == CONTROL_PROTOCOL.SHUTDOWN:
self.signal_done()
self.shutdown()
break
# =========
@@ -360,7 +386,7 @@ class Component(object):
elif time.time() - start_wait > PARAMETERS.MAX_COMPONENT_WAIT:
log.info('No go signal from monitor, %s exiting' \
% self.__class__.__name__)
% self.get_id)
self.kill()
break
@@ -409,7 +435,7 @@ class Component(object):
# data that are done during a clean shutdown. Inform the
# monitor that we're done.
elif event == CONTROL_PROTOCOL.SHUTDOWN:
self.signal_done()
self.shutdown()
# =========
# Hard Kill
@@ -421,7 +447,7 @@ class Component(object):
# In case we didn't receive a ping, send a pre-emptive
# pong to the monitor.
elif self.last_ping and time.time() - self.last_ping > 1:
elif time.time() - self.last_ping > 2:
# send a ping ahead of schedule
pre_pong = time.time()
heartbeat_frame = CONTROL_FRAME(
@@ -434,15 +460,14 @@ class Component(object):
# doing work
self.control_out.send(heartbeat_frame, self.zmq.NOBLOCK)
self.last_ping = pre_pong
elif self.last_ping and \
time.time() - self.last_ping > PARAMETERS.MAX_COMPONENT_WAIT:
elif time.time() - self.last_ping > PARAMETERS.MAX_COMPONENT_WAIT:
# monitor is gone without sending the shutdown
# signal, do a hard exit.
self.kill()
def signal_ready(self):
log.info(self.__class__.__name__ + ' is ready')
log.info(self.get_id + ' is ready')
frame = CONTROL_FRAME(
CONTROL_PROTOCOL.READY,
''
@@ -472,13 +497,6 @@ class Component(object):
self.control_out.send(done_frame)
log.info("[%s] sent control done" % self.get_id)
# there is a narrow race condition where we finish just
# after the Monitor accepts our prior heartbeat, but just
# before the next one is sent. So, we hang around for one
# last heartbeat, and wait an unusually long time.
# TODO: decided if this is really necessary.
# self.heartbeat(timeout=5000)
# -----------
# Messaging
# -----------
@@ -531,7 +549,6 @@ class Component(object):
def bind_push_socket(self, addr):
push_socket = self.context.socket(self.zmq.PUSH)
push_socket.bind(addr)
self.out_socket = push_socket
self.sockets.append(push_socket)
return push_socket
@@ -549,7 +566,6 @@ class Component(object):
pull_socket = self.context.socket(self.zmq.PULL)
pull_socket.bind(addr)
self.poll.register(pull_socket, self.zmq.POLLIN)
self.sockets.append(pull_socket)
return pull_socket
@@ -558,26 +574,9 @@ class Component(object):
push_socket = self.context.socket(self.zmq.PUSH)
push_socket.connect(addr)
self.sockets.append(push_socket)
self.out_socket = push_socket
return push_socket
def bind_pub_socket(self, addr):
pub_socket = self.context.socket(self.zmq.PUB)
pub_socket.bind(addr)
self.out_socket = pub_socket
return pub_socket
def connect_sub_socket(self, addr):
sub_socket = self.context.socket(self.zmq.SUB)
sub_socket.connect(addr)
sub_socket.setsockopt(self.zmq.SUBSCRIBE,'')
self.sockets.append(sub_socket)
self.poll.register(sub_socket, self.zmq.POLLIN)
return sub_socket
def setup_control(self):
"""
+11 -11
View File
@@ -120,7 +120,7 @@ class Monitor(object):
return
def add_to_topology(self, component_id):
add = set([component_id])
add = set([component_id, "FORK-" + component_id])
self.topology.update(add)
def freeze_topology(self):
@@ -340,6 +340,11 @@ class Monitor(object):
log.info("breaking out of initial heartbeat")
break
# Break out if the entire topology told us its DONE
if len(self.finished) == len(self.topology):
break
# ================
# Heartbeat Stats
# ================
@@ -432,27 +437,22 @@ class Monitor(object):
bad = self.tracked - good - self.finished
new = self.responses - good - self.finished
missing = self.topology - self.tracked - self.finished
for component in new:
self.new(component)
if self.debug:
log.info('New component %r' % component)
for component in bad:
self.timed_out(component)
for component in missing:
missing = self.topology - self.tracked - self.finished
for component in missing:
if self.debug:
log.info('Missing component %r' % component)
if self.debug:
for component in self.tracked:
if component not in self.topology:
log.info('Uninvited component %r' % component)
for component in self.tracked:
if component not in self.topology:
log.info('Uninvited component %r' % component)
# --------------
# Init Handlers
+1 -1
View File
@@ -43,7 +43,7 @@ def roundrobin(sources, namestrings):
"""
assert len(sources) == len(namestrings)
mapping = OrderedDict(zip(namestrings, sources))
# While our generators have not been exhausted, pull elements
while mapping.keys() != []:
for namestring, source in mapping.iteritems():
-13
View File
@@ -17,16 +17,3 @@ def gen_from_pull_socket(socket_uri, context, unframe):
# this generator needs to know about the source_ids coming in via
# the poller, and need to yield DONE messages for each
# source_id.
def gen_from_poller(poller, in_socket, unframe):
while True:
socks = dict(poller.poll(1000))
if socks.get(in_socket) == zmq.POLLIN:
message = in_socket.recv()
if message == str(zp.CONTROL_PROTOCOL.DONE):
break
else:
event = unframe(message)
yield event