Plasma and worker node failure. (#373)

* Failing test case

* Local scheduler exits cleanly after plasma store dies

* Tolerate one plasma store failure

* Tolerate plasma store failures on all nodes except head node

* Plasma manager heartbeats

* Component failure tests

* Don't run the helper for Python testing

* Fix C test

* Fix hanging plasma transfer test

* Fix python3

* Consolidate ClientConnection code

* Fix valgrind test

* fix c test

* We can restart worker nodes!

* Fix flatbuffers bug

* Address comments

* Only register actual workers with the local scheduler

* Fix bug

* Fix segfaults

* Add test case that tests for driver liveness, fix local scheduler bug

* Clean up after tests

* Allocate retry info on the stack

* Send SIGKILL before waiting

* Relax unit test conditions

* Driver liveness test case and documentation
This commit is contained in:
Stephanie Wang
2017-03-17 17:03:58 -07:00
committed by Robert Nishihara
parent 964d5cac48
commit 12c9618c0c
27 changed files with 730 additions and 310 deletions
+2 -1
View File
@@ -90,7 +90,8 @@ class TestGlobalScheduler(unittest.TestCase):
redis_address=redis_address,
static_resource_list=[10, 0])
# Connect to the scheduler.
local_scheduler_client = local_scheduler.LocalSchedulerClient(local_scheduler_name, NIL_ACTOR_ID)
local_scheduler_client = local_scheduler.LocalSchedulerClient(
local_scheduler_name, NIL_ACTOR_ID, False)
self.local_scheduler_clients.append(local_scheduler_client)
self.local_scheduler_pids.append(p4)
+2 -1
View File
@@ -41,7 +41,8 @@ class TestLocalSchedulerClient(unittest.TestCase):
# Start a local scheduler.
scheduler_name, self.p2 = local_scheduler.start_local_scheduler(plasma_store_name, use_valgrind=USE_VALGRIND)
# Connect to the scheduler.
self.local_scheduler_client = local_scheduler.LocalSchedulerClient(scheduler_name, NIL_ACTOR_ID)
self.local_scheduler_client = local_scheduler.LocalSchedulerClient(
scheduler_name, NIL_ACTOR_ID, False)
def tearDown(self):
# Check that the processes are still alive.
+208 -83
View File
@@ -18,21 +18,28 @@ from ray.core.generated.TaskReply import TaskReply
# These variables must be kept in sync with the C codebase.
# common/common.h
HEARTBEAT_TIMEOUT_MILLISECONDS = 100
NUM_HEARTBEATS_TIMEOUT = 100
DB_CLIENT_ID_SIZE = 20
NIL_ID = b"\xff" * DB_CLIENT_ID_SIZE
# common/task.h
TASK_STATUS_LOST = 32
# common/state/redis.cc
PLASMA_MANAGER_HEARTBEAT_CHANNEL = b"plasma_managers"
# common/redis_module/ray_redis_module.cc
TASK_PREFIX = "TT:"
OBJECT_PREFIX = "OL:"
DB_CLIENT_PREFIX = "CL:"
DB_CLIENT_TABLE_NAME = b"db_clients"
# local_scheduler/local_scheduler.h
LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS = 100
LOCAL_SCHEDULER_CLIENT_TYPE = b"local_scheduler"
# plasma/plasma_manager.cc
PLASMA_MANAGER_CLIENT_TYPE = b"plasma_manager"
# Set up logging.
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.WARN)
class Monitor(object):
"""A monitor for Ray processes.
@@ -45,69 +52,45 @@ class Monitor(object):
redis: A connection to the Redis server.
subscribe_client: A pubsub client for the Redis server. This is used to
receive notifications about failed components.
local_schedulers: A set of the local scheduler IDs of all of the currently
live local schedulers in the cluster. In addition, this also includes
NIL_ID.
subscribed: A dictionary mapping channel names (str) to whether or not the
subscription to that channel has succeeded yet (bool).
dead_local_schedulers: A set of the local scheduler IDs of all of the local
schedulers that were up at one point and have died since then.
live_plasma_managers: A counter mapping live plasma manager IDs to the
number of heartbeats that have passed since we last heard from that
plasma manager. A plasma manager is live if we received a heartbeat from
it at any point, and if it has not timed out.
dead_plasma_managers: A set of the plasma manager IDs of all the plasma
managers that were up at one point and have died since then.
"""
def __init__(self, redis_address, redis_port):
# Initialize the Redis clients.
self.redis = redis.StrictRedis(host=redis_address, port=redis_port, db=0)
self.subscribe_client = self.redis.pubsub()
self.subscribed = {}
# Initialize data structures to keep track of the active database clients.
self.local_schedulers = set()
# Add the NIL_ID so that we don't accidentally mark tasks that aren't
# associated with a node as LOST during cleanup.
self.local_schedulers.add(NIL_ID)
self.dead_local_schedulers = set()
self.live_plasma_managers = Counter()
self.dead_plasma_managers = set()
def subscribe(self):
"""Subscribe to the db_clients channel.
def subscribe(self, channel):
"""Subscribe to the given channel.
Args:
channel (str): The channel to subscribe to.
Raises:
Exception: An exception is raised if the subscription fails.
"""
self.subscribe_client.subscribe(DB_CLIENT_TABLE_NAME)
# Wait for the first message to signal that the subscription was successful.
while True:
message = self.subscribe_client.get_message()
if message is None:
time.sleep(LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS / 1000)
continue
break
# The first message's payload should be the index of our subscription.
if "data" not in message:
Exception("Unable to subscribe to local scheduler table.")
def read_message(self):
"""Read a message from the db_clients channel.
Returns:
None if no message was to read. Otherwise, a tuple of (db_client_id,
client_type, auxiliary_address, is_insertion) is returned. The value
is_insertion is a bool that is true if the update to the db_clients
table was an insertion and false if deletion.
"""
message = self.subscribe_client.get_message()
if message is None:
return None
# Parse the message.
data = message["data"]
notification_object = SubscribeToDBClientTableReply.GetRootAsSubscribeToDBClientTableReply(data, 0)
db_client_id = notification_object.DbClientId()
client_type = notification_object.ClientType()
auxiliary_address = notification_object.AuxAddress()
is_insertion = notification_object.IsInsertion()
return db_client_id, client_type, auxiliary_address, is_insertion
self.subscribe_client.subscribe(channel)
self.subscribed[channel] = False
def cleanup_task_table(self):
"""Clean up global state for a failed local schedulers.
"""Clean up global state for failed local schedulers.
This marks any tasks that were scheduled on dead local schedulers as
TASK_STATUS_LOST. A local scheduler is deemed dead if it is not in
self.local_schedulers.
TASK_STATUS_LOST. A local scheduler is deemed dead if it is in
self.dead_local_schedulers.
"""
task_ids = self.redis.scan_iter(match="{prefix}*".format(prefix=TASK_PREFIX))
num_tasks_updated = 0
@@ -118,29 +101,146 @@ class Monitor(object):
task_object = TaskReply.GetRootAsTaskReply(response, 0)
local_scheduler_id = task_object.LocalSchedulerId()
# See if the corresponding local scheduler is alive.
if local_scheduler_id not in self.local_schedulers:
num_tasks_updated += 1
if local_scheduler_id in self.dead_local_schedulers:
# If the task is scheduled on a dead local scheduler, mark the task as
# lost.
ok = self.redis.execute_command("RAY.TASK_TABLE_UPDATE",
task_id,
TASK_STATUS_LOST,
NIL_ID)
if ok != b"OK":
log.warn("Failed to update lost task for dead scheduler.")
num_tasks_updated += 1
if num_tasks_updated > 0:
log.warn("Marked {} tasks as lost.".format(num_tasks_updated))
def cleanup_object_table(self):
    """Clean up global state for failed plasma managers.

    Scans every object table entry and removes each location entry that
    points at a plasma manager in self.dead_plasma_managers. A warning is
    logged for every removal that Redis does not acknowledge with b"OK", and
    a summary warning is logged if at least one location was processed.

    Note that the summary count includes removals that were attempted but
    not acknowledged.
    """
    # TODO(swang): Also kill the associated plasma store, since it's no longer
    # reachable without a plasma manager.
    object_ids = self.redis.scan_iter(
        match="{prefix}*".format(prefix=OBJECT_PREFIX))
    num_objects_removed = 0
    for object_id in object_ids:
        # Strip the table prefix from the key to recover the object ID.
        object_id = object_id[len(OBJECT_PREFIX):]
        managers = self.redis.execute_command("RAY.OBJECT_TABLE_LOOKUP",
                                              object_id)
        for manager in managers:
            # Locations on managers that are not known to be dead are kept.
            if manager not in self.dead_plasma_managers:
                continue
            # The object was on a dead plasma manager, so remove that
            # location entry.
            ok = self.redis.execute_command("RAY.OBJECT_TABLE_REMOVE",
                                            object_id, manager)
            if ok != b"OK":
                log.warning("Failed to remove object location for dead "
                            "plasma manager.")
            num_objects_removed += 1
    if num_objects_removed > 0:
        log.warning("Marked {} objects as lost.".format(num_objects_removed))
def scan_db_client_table(self):
"""Scan the database client table for the current clients.
"""Scan the database client table for dead clients.
After subscribing to the client table, it's necessary to call this before
reading any messages from the subscription channel.
reading any messages from the subscription channel. This ensures that we do
not miss any notifications for deleted clients that occurred before we
subscribed.
"""
db_client_keys = self.redis.keys("{prefix}*".format(prefix=DB_CLIENT_PREFIX))
for db_client_key in db_client_keys:
db_client_id = db_client_key[len(DB_CLIENT_PREFIX):]
client_type = self.redis.hget(db_client_key, "client_type")
if client_type == LOCAL_SCHEDULER_CLIENT_TYPE:
self.local_schedulers.add(db_client_id)
client_type, deleted = self.redis.hmget(db_client_key,
[b"client_type", b"deleted"])
deleted = bool(int(deleted))
if deleted:
if client_type == LOCAL_SCHEDULER_CLIENT_TYPE:
self.dead_local_schedulers.add(db_client_id)
elif client_type == PLASMA_MANAGER_CLIENT_TYPE:
self.dead_plasma_managers.add(db_client_id)
def subscribe_handler(self, channel, data):
    """Record that the subscription to a channel has been acknowledged.

    Args:
        channel: The channel whose subscription succeeded.
        data: The payload of the acknowledgement message. It is only logged.
    """
    debug_text = "Subscribed to {}, data was {}".format(channel, data)
    log.debug(debug_text)
    # From now on, messages on this channel are treated as real notifications.
    self.subscribed[channel] = True
def db_client_notification_handler(self, channel, data):
    """Handle a notification from the db_client table from Redis.

    The payload is parsed with the SubscribeToDBClientTableReply flatbuffer.
    Insertions are ignored. For a deletion, the client ID is recorded in
    self.dead_local_schedulers or self.dead_plasma_managers, depending on the
    client type. Cleanup of the associated state in the state tables should
    be handled by the caller.

    Args:
        channel: The channel the notification arrived on (unused here).
        data: The serialized SubscribeToDBClientTableReply message.
    """
    notification_object = (
        SubscribeToDBClientTableReply
        .GetRootAsSubscribeToDBClientTableReply(data, 0))
    db_client_id = notification_object.DbClientId()
    client_type = notification_object.ClientType()
    is_insertion = notification_object.IsInsertion()

    # If the update was an insertion, we ignore it.
    if is_insertion:
        return

    # The update was a deletion, so add the client to our accounting for dead
    # local schedulers and plasma managers. set.add is already a no-op for
    # IDs that are present, so no membership check is needed first.
    log.warning("Removed {}".format(client_type))
    if client_type == LOCAL_SCHEDULER_CLIENT_TYPE:
        self.dead_local_schedulers.add(db_client_id)
    elif client_type == PLASMA_MANAGER_CLIENT_TYPE:
        self.dead_plasma_managers.add(db_client_id)
def plasma_manager_heartbeat_handler(self, channel, data):
    """Handle a plasma manager heartbeat from Redis.

    Hearing from a plasma manager sets its missed-heartbeat count in
    self.live_plasma_managers back to zero.

    Args:
        channel: The channel the heartbeat arrived on (unused here).
        data: The heartbeat payload, which starts with the sender's client ID.
    """
    # The client ID occupies the leading DB_CLIENT_ID_SIZE characters.
    sender_id = data[:DB_CLIENT_ID_SIZE]
    self.live_plasma_managers[sender_id] = 0
def process_messages(self):
    """Process all messages ready in the subscription channels.

    This reads messages from the subscription channels and calls the
    appropriate handlers until there are no messages left.

    Raises:
        ValueError: If the first message on a channel, which should be the
            response to the subscription request, does not carry an integer
            payload.
        AssertionError: If a message arrives on a subscribed channel that has
            no handler.
    """
    while True:
        message = self.subscribe_client.get_message()
        if message is None:
            return

        # Parse the message.
        channel = message["channel"]
        data = message["data"]

        # Determine the appropriate message handler.
        if not self.subscribed[channel]:
            # The first message on a channel is the response to the initial
            # subscription request, and its payload should be an integer.
            # The conversion is kept purely as validation.
            int(data)
            message_handler = self.subscribe_handler
        elif channel == PLASMA_MANAGER_HEARTBEAT_CHANNEL:
            # The message was a heartbeat from a plasma manager.
            message_handler = self.plasma_manager_heartbeat_handler
        elif channel == DB_CLIENT_TABLE_NAME:
            # The message was a notification from the db_client table.
            message_handler = self.db_client_notification_handler
        else:
            # Raised explicitly, rather than asserted, so that the check is
            # not stripped when Python runs with -O.
            raise AssertionError(
                "No handler for channel {}".format(channel))

        # Call the handler.
        message_handler(channel, data)
def run(self):
"""Run the monitor.
@@ -149,39 +249,64 @@ class Monitor(object):
clients and cleaning up state accordingly.
"""
# Initialize the subscription channel.
self.subscribe()
self.subscribe(DB_CLIENT_TABLE_NAME)
self.subscribe(PLASMA_MANAGER_HEARTBEAT_CHANNEL)
# Scan the database table and clean up any state associated with clients
# not in the database table. NOTE: This must be called before reading any
# messages from the subscription channel. This ensures that we start in a
# consistent state, since we may have missed notifications that were sent
# before we connected to the subscription channel.
# Scan the database table for dead database clients. NOTE: This must be
# called before reading any messages from the subscription channel. This
# ensures that we start in a consistent state, since we may have missed
# notifications that were sent before we connected to the subscription
# channel.
self.scan_db_client_table()
self.cleanup_task_table()
log.debug("Scanned schedulers: {}".format(self.local_schedulers))
# If there were any dead clients at startup, clean up the associated state
# in the state tables.
if len(self.dead_local_schedulers) > 0:
self.cleanup_task_table()
if len(self.dead_plasma_managers) > 0:
self.cleanup_object_table()
log.debug("{} dead local schedulers, {} plasma "
"managers total, {} dead plasma managers".format(
len(self.dead_local_schedulers),
len(self.live_plasma_managers) + len(self.dead_plasma_managers),
len(self.dead_plasma_managers)
))
# Read messages from the subscription channel.
# Handle messages from the subscription channels.
while True:
time.sleep(LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS / 1000)
client = self.read_message()
# There was no message to be read.
if client is None:
continue
# Record how many dead local schedulers and plasma managers we had at the
# beginning of this round.
num_dead_local_schedulers = len(self.dead_local_schedulers)
num_dead_plasma_managers = len(self.dead_plasma_managers)
# Process a round of messages.
self.process_messages()
# If any new local schedulers or plasma managers were marked as dead in
# this round, clean up the associated state.
if len(self.dead_local_schedulers) > num_dead_local_schedulers:
self.cleanup_task_table()
if len(self.dead_plasma_managers) > num_dead_plasma_managers:
self.cleanup_object_table()
db_client_id, client_type, auxiliary_address, is_insertion = client
# Handle plasma managers that timed out during this round.
plasma_manager_ids = list(self.live_plasma_managers.keys())
for plasma_manager_id in plasma_manager_ids:
if self.live_plasma_managers[plasma_manager_id] >= NUM_HEARTBEATS_TIMEOUT:
log.warn("Timed out {}".format(PLASMA_MANAGER_CLIENT_TYPE))
# Remove the plasma manager from the managers whose heartbeats we're
# tracking.
del self.live_plasma_managers[plasma_manager_id]
# Remove the plasma manager from the db_client table. The
# corresponding state in the object table will be cleaned up once we
# receive the notification for this db_client deletion.
self.redis.execute_command("RAY.DISCONNECT", plasma_manager_id)
# If the update was an insertion, record the client ID.
if is_insertion:
self.local_schedulers.add(db_client_id)
log.debug("Added scheduler: {}".format(db_client_id))
continue
# Increment the number of heartbeats that we've missed from each plasma manager.
for plasma_manager_id in self.live_plasma_managers:
self.live_plasma_managers[plasma_manager_id] += 1
# Wait for a heartbeat interval before processing the next round of
# messages.
time.sleep(HEARTBEAT_TIMEOUT_MILLISECONDS * 1e-3)
# If the update was a deletion, clean up global state.
if client_type == LOCAL_SCHEDULER_CLIENT_TYPE:
if db_client_id in self.local_schedulers:
log.warn("Removed scheduler: {}".format(db_client_id))
self.local_schedulers.remove(db_client_id)
self.cleanup_task_table()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description=("Parse Redis server for the "
+27 -6
View File
@@ -699,11 +699,21 @@ class TestPlasmaManager(unittest.TestCase):
self.assertEqual(set(waiting), set(object_ids_perm[(i + 1):]))
def test_transfer(self):
num_attempts = 100
for _ in range(100):
# Create an object.
object_id1, memory_buffer1, metadata1 = create_object(self.client1, 2000, 2000)
# Transfer the buffer to the other PlasmaStore.
self.client1.transfer("127.0.0.1", self.port2, object_id1)
# Transfer the buffer to the other Plasma store. There is a race
# condition on the create and transfer of the object, so keep trying
# until the object appears on the second Plasma store.
for i in range(num_attempts):
self.client1.transfer("127.0.0.1", self.port2, object_id1)
buff = self.client2.get([object_id1], timeout_ms=100)[0]
if buff is not None:
break
self.assertNotEqual(buff, None)
del buff
# Compare the two buffers.
assert_get_object_equal(self, self.client1, self.client2, object_id1,
memory_buffer=memory_buffer1, metadata=metadata1)
@@ -715,8 +725,17 @@ class TestPlasmaManager(unittest.TestCase):
# Create an object.
object_id2, memory_buffer2, metadata2 = create_object(self.client2, 20000, 20000)
# Transfer the buffer to the other PlasmaStore.
self.client2.transfer("127.0.0.1", self.port1, object_id2)
# Transfer the buffer to the other Plasma store. There is a race
# condition on the create and transfer of the object, so keep trying
# until the object appears on the second Plasma store.
for i in range(num_attempts):
self.client2.transfer("127.0.0.1", self.port1, object_id2)
buff = self.client1.get([object_id2], timeout_ms=100)[0]
if buff is not None:
break
self.assertNotEqual(buff, None)
del buff
# Compare the two buffers.
assert_get_object_equal(self, self.client1, self.client2, object_id2,
memory_buffer=memory_buffer2, metadata=metadata2)
@@ -761,7 +780,9 @@ class TestPlasmaManagerRecovery(unittest.TestCase):
# Store the processes that will be explicitly killed during tearDown so
# that a test case can remove ones that will be killed during the test.
self.processes_to_kill = [self.p2, self.p3]
# NOTE: The plasma managers must be killed before the plasma store since
# plasma store death will bring down the managers.
self.processes_to_kill = [self.p3, self.p2]
def tearDown(self):
# Check that the processes are still alive.
@@ -798,7 +819,7 @@ class TestPlasmaManagerRecovery(unittest.TestCase):
# Start a second plasma manager attached to the same store.
manager_name, self.p5, self.port2 = plasma.start_plasma_manager(self.store_name, self.redis_address, use_valgrind=USE_VALGRIND)
self.processes_to_kill.append(self.p5)
self.processes_to_kill = [self.p5] + self.processes_to_kill
# Check that the second manager knows about existing objects.
client2 = plasma.PlasmaClient(self.store_name, manager_name)
+8 -4
View File
@@ -1238,10 +1238,6 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, a
redis_ip_address, redis_port = info["redis_address"].split(":")
worker.redis_client = redis.StrictRedis(host=redis_ip_address, port=int(redis_port))
worker.lock = threading.Lock()
# Create an object store client.
worker.plasma_client = ray.plasma.PlasmaClient(info["store_socket_name"], info["manager_socket_name"])
# Create the local scheduler client.
worker.local_scheduler_client = ray.local_scheduler.LocalSchedulerClient(info["local_scheduler_socket_name"], worker.actor_id)
# Register the worker with Redis.
if mode in [SCRIPT_MODE, SILENT_MODE]:
# The concept of a driver is the same as the concept of a "job". Register
@@ -1255,6 +1251,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, a
"local_scheduler_socket": info["local_scheduler_socket_name"]}
driver_info["name"] = main.__file__ if hasattr(main, "__file__") else "INTERACTIVE MODE"
worker.redis_client.hmset(b"Drivers:" + worker.worker_id, driver_info)
is_worker = False
elif mode == WORKER_MODE:
# Register the worker with Redis.
worker.redis_client.hmset(b"Workers:" + worker.worker_id,
@@ -1262,8 +1259,15 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, a
"plasma_store_socket": info["store_socket_name"],
"plasma_manager_socket": info["manager_socket_name"],
"local_scheduler_socket": info["local_scheduler_socket_name"]})
is_worker = True
else:
raise Exception("This code should be unreachable.")
# Create an object store client.
worker.plasma_client = ray.plasma.PlasmaClient(info["store_socket_name"], info["manager_socket_name"])
# Create the local scheduler client.
worker.local_scheduler_client = ray.local_scheduler.LocalSchedulerClient(info["local_scheduler_socket_name"], worker.actor_id, is_worker)
# If this is a driver, set the current task ID, the task driver ID, and set
# the task index to 0.
if mode in [SCRIPT_MODE, SILENT_MODE]: