Fix problem in which actors and workers running tasks are not killed by driver exit. (#490)

* Augment test to verify that relevant workers and actors are killed during driver cleanup.

* Fix bug in which we were only killing one worker when a driver exited.

* Fix remove driver test.

* Fix and augment test.
This commit is contained in:
Robert Nishihara
2017-04-26 15:13:39 -07:00
committed by Philipp Moritz
parent b7ace01b5f
commit 1627f89945
5 changed files with 208 additions and 48 deletions
+25 -10
View File
@@ -2,6 +2,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import redis
import time
@@ -53,34 +54,48 @@ def _wait_for_nodes_to_join(num_nodes, timeout=20):
num_nodes))
def _broadcast_event(event_name, redis_address):
def _broadcast_event(event_name, redis_address, data=None):
"""Broadcast an event.
This is used to synchronize drivers for the multi-node tests.
Args:
event_name: The name of the event to wait for.
redis_address: The address of the Redis server to use for synchronization.
This is used to synchronize drivers for the multi-node tests.
data: Extra data to include in the broadcast (this will be returned by the
corresponding _wait_for_event call). This data must be json serializable.
"""
redis_host, redis_port = redis_address.split(":")
redis_client = redis.StrictRedis(host=redis_host, port=int(redis_port))
redis_client.rpush(EVENT_KEY, event_name)
payload = json.dumps((event_name, data))
redis_client.rpush(EVENT_KEY, payload)
def _wait_for_event(event_name, redis_address, extra_buffer=1):
def _wait_for_event(event_name, redis_address, extra_buffer=0):
"""Block until an event has been broadcast.
This is used to synchronize drivers for the multi-node tests.
Args:
event_name: The name of the event to wait for.
redis_address: The address of the Redis server to use for synchronization.
extra_buffer: An amount of time in seconds to wait after the event.
This is used to synchronize drivers for the multi-node tests.
Returns:
The data that was passed into the corresponding _broadcast_event call.
"""
redis_host, redis_port = redis_address.split(":")
redis_client = redis.StrictRedis(host=redis_host, port=int(redis_port))
while True:
event_names = redis_client.lrange(EVENT_KEY, 0, -1)
if event_name.encode("ascii") in event_names:
break
time.sleep(extra_buffer)
event_infos = redis_client.lrange(EVENT_KEY, 0, -1)
events = dict()
for event_info in event_infos:
name, data = json.loads(event_info)
if name in events:
raise Exception("The same event {} was broadcast twice.".format(name))
events[name] = data
if event_name in events:
# Potentially sleep a little longer and then return the event data.
time.sleep(extra_buffer)
return events[event_name]
time.sleep(0.1)