mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-29 05:32:55 +08:00
addressing race condition between loop_send exit and DONE message delivery.
This commit is contained in:
@@ -77,7 +77,6 @@ class ComponentTestCase(TestCase):
|
||||
DATASOURCE_UNFRAME
|
||||
)
|
||||
|
||||
|
||||
for event in comp_a:
|
||||
log.info(event)
|
||||
|
||||
@@ -155,6 +154,9 @@ class ComponentTestCase(TestCase):
|
||||
|
||||
sorted_out = date_sorted_sources(*sources)
|
||||
|
||||
import time
|
||||
time.sleep(.25)
|
||||
|
||||
prev = None
|
||||
sort_count = 0
|
||||
for msg in sorted_out:
|
||||
|
||||
+46
-45
@@ -17,8 +17,6 @@ from collections import namedtuple
|
||||
# pyzmq
|
||||
import zmq
|
||||
|
||||
from zipline.gens.zmqgen import gen_from_poller
|
||||
|
||||
from zipline.core.monitor import PARAMETERS
|
||||
|
||||
from zipline.protocol import (
|
||||
@@ -36,6 +34,10 @@ class KillSignal(Exception):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
class ShutdownSignal(Exception):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
ComponentSocketArgs = namedtuple('ComponentSocketArgs',['uri','style','bind'])
|
||||
|
||||
class Component(object):
|
||||
@@ -87,7 +89,7 @@ class Component(object):
|
||||
self.state_flag = COMPONENT_STATE.OK
|
||||
|
||||
# track time of last ping we received from monitor
|
||||
self.last_ping = None
|
||||
self.last_ping = time.time()
|
||||
|
||||
# Humanhashes make this way easier to debug because they stick
|
||||
# in your mind unlike a 32 byte string of random hex.
|
||||
@@ -108,7 +110,6 @@ class Component(object):
|
||||
self.recv_gen = self.create_recv_gen()
|
||||
|
||||
|
||||
|
||||
# ------------
|
||||
# Core Methods
|
||||
# ------------
|
||||
@@ -148,6 +149,13 @@ class Component(object):
|
||||
|
||||
self.signal_done()
|
||||
|
||||
# keep heartbeating until we receive the shutdown
|
||||
# message from the Monitor (raises a
|
||||
# ShutdownSignal), or we don't hear from the Monitor
|
||||
# for MAX_COMPONENT_WAIT.
|
||||
while True:
|
||||
self.heartbeat(timeout=1000)
|
||||
|
||||
except Exception as exc:
|
||||
self.handle_exception(exc)
|
||||
finally:
|
||||
@@ -170,10 +178,7 @@ class Component(object):
|
||||
try:
|
||||
# we block on ready here until monitor sends the GO
|
||||
self.wait_ready()
|
||||
log.info("Starting to drain {id}".format(id=self.get_id))
|
||||
for event in gen_from_poller(self.poll, self.in_socket, self.unframe):
|
||||
self.heartbeat()
|
||||
# event.source_id = self.get_id
|
||||
for event in self.gen_from_poller(self.poll, self.in_socket, self.unframe):
|
||||
yield event
|
||||
|
||||
self.signal_done()
|
||||
@@ -182,13 +187,30 @@ class Component(object):
|
||||
finally:
|
||||
log.info("Exiting %r" % self)
|
||||
|
||||
def gen_from_poller(self, poller, in_socket, unframe):
|
||||
|
||||
while True:
|
||||
socks = dict(poller.poll(0))
|
||||
self.heartbeat()
|
||||
if socks.get(in_socket) == zmq.POLLIN:
|
||||
message = in_socket.recv()
|
||||
if message == str(CONTROL_PROTOCOL.DONE):
|
||||
break
|
||||
else:
|
||||
event = unframe(message)
|
||||
yield event
|
||||
|
||||
def handle_exception(self, exc, re_raise=False):
|
||||
if not isinstance(exc, KillSignal):
|
||||
self.signal_exception(exc)
|
||||
else:
|
||||
if isinstance(exc, KillSignal):
|
||||
# if we get a kill signal, forcibly close all the
|
||||
# sockets.
|
||||
self.teardown_sockets()
|
||||
elif isinstance(exc, ShutdownSignal):
|
||||
# signal from monitor of an orderly shutdown,
|
||||
# do nothing.
|
||||
pass
|
||||
else:
|
||||
self.signal_exception(exc)
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
@@ -210,6 +232,12 @@ class Component(object):
|
||||
for sock in self.sockets:
|
||||
sock.close()
|
||||
|
||||
def shutdown(self):
|
||||
"""
|
||||
Clean shutdown.
|
||||
"""
|
||||
raise ShutdownSignal()
|
||||
|
||||
def kill(self):
|
||||
"""
|
||||
Unclean shutdown.
|
||||
@@ -333,7 +361,7 @@ class Component(object):
|
||||
# Side effectful call from the monitor to unlock
|
||||
# and begin doing work only when the entire topology
|
||||
# of the system beings to come online
|
||||
log.info('Unlocking ' + self.__class__.__name__)
|
||||
log.info('Unlocking ' + self.get_id)
|
||||
self.unlock_ready()
|
||||
|
||||
# =========
|
||||
@@ -344,7 +372,7 @@ class Component(object):
|
||||
# data that are done during a clean shutdown. Inform the
|
||||
# monitor that we're done.
|
||||
elif event == CONTROL_PROTOCOL.SHUTDOWN:
|
||||
self.signal_done()
|
||||
self.shutdown()
|
||||
break
|
||||
|
||||
# =========
|
||||
@@ -358,7 +386,7 @@ class Component(object):
|
||||
|
||||
elif time.time() - start_wait > PARAMETERS.MAX_COMPONENT_WAIT:
|
||||
log.info('No go signal from monitor, %s exiting' \
|
||||
% self.__class__.__name__)
|
||||
% self.get_id)
|
||||
self.kill()
|
||||
break
|
||||
|
||||
@@ -407,7 +435,7 @@ class Component(object):
|
||||
# data that are done during a clean shutdown. Inform the
|
||||
# monitor that we're done.
|
||||
elif event == CONTROL_PROTOCOL.SHUTDOWN:
|
||||
self.signal_done()
|
||||
self.shutdown()
|
||||
|
||||
# =========
|
||||
# Hard Kill
|
||||
@@ -419,7 +447,7 @@ class Component(object):
|
||||
|
||||
# In case we didn't receive a ping, send a pre-emptive
|
||||
# pong to the monitor.
|
||||
elif self.last_ping and time.time() - self.last_ping > 1:
|
||||
elif time.time() - self.last_ping > 2:
|
||||
# send a ping ahead of schedule
|
||||
pre_pong = time.time()
|
||||
heartbeat_frame = CONTROL_FRAME(
|
||||
@@ -432,15 +460,14 @@ class Component(object):
|
||||
# doing work
|
||||
self.control_out.send(heartbeat_frame, self.zmq.NOBLOCK)
|
||||
self.last_ping = pre_pong
|
||||
elif self.last_ping and \
|
||||
time.time() - self.last_ping > PARAMETERS.MAX_COMPONENT_WAIT:
|
||||
elif time.time() - self.last_ping > PARAMETERS.MAX_COMPONENT_WAIT:
|
||||
# monitor is gone without sending the shutdown
|
||||
# signal, do a hard exit.
|
||||
self.kill()
|
||||
|
||||
|
||||
def signal_ready(self):
|
||||
log.info(self.__class__.__name__ + ' is ready')
|
||||
log.info(self.get_id + ' is ready')
|
||||
frame = CONTROL_FRAME(
|
||||
CONTROL_PROTOCOL.READY,
|
||||
''
|
||||
@@ -470,13 +497,6 @@ class Component(object):
|
||||
self.control_out.send(done_frame)
|
||||
log.info("[%s] sent control done" % self.get_id)
|
||||
|
||||
# there is a narrow race condition where we finish just
|
||||
# after the Monitor accepts our prior heartbeat, but just
|
||||
# before the next one is sent. So, we hang around for one
|
||||
# last heartbeat, and wait an unusually long time.
|
||||
# TODO: decided if this is really necessary.
|
||||
# self.heartbeat(timeout=5000)
|
||||
|
||||
# -----------
|
||||
# Messaging
|
||||
# -----------
|
||||
@@ -529,7 +549,6 @@ class Component(object):
|
||||
def bind_push_socket(self, addr):
|
||||
push_socket = self.context.socket(self.zmq.PUSH)
|
||||
push_socket.bind(addr)
|
||||
self.out_socket = push_socket
|
||||
self.sockets.append(push_socket)
|
||||
|
||||
return push_socket
|
||||
@@ -547,7 +566,6 @@ class Component(object):
|
||||
pull_socket = self.context.socket(self.zmq.PULL)
|
||||
pull_socket.bind(addr)
|
||||
self.poll.register(pull_socket, self.zmq.POLLIN)
|
||||
|
||||
self.sockets.append(pull_socket)
|
||||
|
||||
return pull_socket
|
||||
@@ -556,26 +574,9 @@ class Component(object):
|
||||
push_socket = self.context.socket(self.zmq.PUSH)
|
||||
push_socket.connect(addr)
|
||||
self.sockets.append(push_socket)
|
||||
self.out_socket = push_socket
|
||||
|
||||
return push_socket
|
||||
|
||||
def bind_pub_socket(self, addr):
|
||||
pub_socket = self.context.socket(self.zmq.PUB)
|
||||
pub_socket.bind(addr)
|
||||
self.out_socket = pub_socket
|
||||
|
||||
return pub_socket
|
||||
|
||||
def connect_sub_socket(self, addr):
|
||||
sub_socket = self.context.socket(self.zmq.SUB)
|
||||
sub_socket.connect(addr)
|
||||
sub_socket.setsockopt(self.zmq.SUBSCRIBE,'')
|
||||
self.sockets.append(sub_socket)
|
||||
|
||||
self.poll.register(sub_socket, self.zmq.POLLIN)
|
||||
|
||||
return sub_socket
|
||||
|
||||
def setup_control(self):
|
||||
"""
|
||||
|
||||
+7
-13
@@ -340,9 +340,8 @@ class Monitor(object):
|
||||
log.info("breaking out of initial heartbeat")
|
||||
break
|
||||
|
||||
# Has the entire topology told us its DONE
|
||||
done = len(self.finished) == len(self.topology)
|
||||
if done:
|
||||
# Break out if the entire topology told us its DONE
|
||||
if len(self.finished) == len(self.topology):
|
||||
break
|
||||
|
||||
|
||||
@@ -438,27 +437,22 @@ class Monitor(object):
|
||||
bad = self.tracked - good - self.finished
|
||||
new = self.responses - good - self.finished
|
||||
|
||||
missing = self.topology - self.tracked - self.finished
|
||||
|
||||
for component in new:
|
||||
self.new(component)
|
||||
|
||||
if self.debug:
|
||||
log.info('New component %r' % component)
|
||||
|
||||
for component in bad:
|
||||
self.timed_out(component)
|
||||
|
||||
for component in missing:
|
||||
missing = self.topology - self.tracked - self.finished
|
||||
|
||||
for component in missing:
|
||||
if self.debug:
|
||||
log.info('Missing component %r' % component)
|
||||
|
||||
if self.debug:
|
||||
|
||||
for component in self.tracked:
|
||||
if component not in self.topology:
|
||||
log.info('Uninvited component %r' % component)
|
||||
for component in self.tracked:
|
||||
if component not in self.topology:
|
||||
log.info('Uninvited component %r' % component)
|
||||
|
||||
# --------------
|
||||
# Init Handlers
|
||||
|
||||
@@ -17,16 +17,3 @@ def gen_from_pull_socket(socket_uri, context, unframe):
|
||||
# this generator needs to know about the source_ids coming in via
|
||||
# the poller, and need to yield DONE messages for each
|
||||
# source_id.
|
||||
|
||||
def gen_from_poller(poller, in_socket, unframe):
|
||||
|
||||
while True:
|
||||
socks = dict(poller.poll())
|
||||
|
||||
if socks.get(in_socket) == zmq.POLLIN:
|
||||
message = in_socket.recv()
|
||||
if message == str(zp.CONTROL_PROTOCOL.DONE):
|
||||
break
|
||||
else:
|
||||
event = unframe(message)
|
||||
yield event
|
||||
|
||||
Reference in New Issue
Block a user