Stopped nodes can rejoin immediately (#428)

* Ignore deleted clients when reading address info from Redis

* Remove self from db_client table when exiting cleanly

* Fix valgrind test

* Do not call plasma_perform_release when disconnecting
This commit is contained in:
Stephanie Wang
2017-04-05 23:50:38 -07:00
committed by Robert Nishihara
parent 4043769ba2
commit 93679df724
6 changed files with 93 additions and 45 deletions
+3
View File
@@ -199,6 +199,9 @@ class Monitor(object):
elif client_type == PLASMA_MANAGER_CLIENT_TYPE:
if db_client_id not in self.dead_plasma_managers:
self.dead_plasma_managers.add(db_client_id)
# Stop tracking this plasma manager's heartbeats, since it's
# already dead.
del self.live_plasma_managers[db_client_id]
def plasma_manager_heartbeat_handler(self, channel, data):
"""Handle a plasma manager heartbeat from Redis.
+9 -2
View File
@@ -820,11 +820,18 @@ def get_address_info_from_redis_helper(redis_address, node_ip_address):
# "src/common/redis_module/ray_redis_module.cc" where it is defined.
REDIS_CLIENT_TABLE_PREFIX = "CL:"
client_keys = redis_client.keys("{}*".format(REDIS_CLIENT_TABLE_PREFIX))
# Filter to clients on the same node and do some basic checking.
# Filter to live clients on the same node and do some basic checking.
plasma_managers = []
local_schedulers = []
for key in client_keys:
info = redis_client.hgetall(key)
# Ignore clients that were deleted.
deleted = info[b"deleted"]
deleted = bool(int(deleted))
if deleted:
continue
assert b"ray_client_id" in info
assert b"node_ip_address" in info
assert b"client_type" in info
@@ -833,7 +840,7 @@ def get_address_info_from_redis_helper(redis_address, node_ip_address):
plasma_managers.append(info)
elif info[b"client_type"].decode("ascii") == "local_scheduler":
local_schedulers.append(info)
# Make sure that we got at one plasma manager and local scheduler.
# Make sure that we got at least one plasma manager and local scheduler.
assert len(plasma_managers) >= 1
assert len(local_schedulers) >= 1
# Build the address information.