[autoscaler] Change the get behavior of node providers' _get_node (#4132)

* Change the get behavior of GCPNodeProvider._get_node

* Add lock around the GCPNodeProvider._get_node call

* rename nodes

* lint

* Update GCPNodeProvider._get_node to match aws implementation

* assert

* log

* log highest heartbeats

* rename

* bringup to connected

* prune heartbeat times

* fix bringup
This commit is contained in:
Kristian Hartikainen
2019-02-24 18:43:35 -08:00
committed by Eric Liang
parent d9da183c7d
commit 524e69a82d
8 changed files with 82 additions and 53 deletions
+27 -11
View File
@@ -155,6 +155,8 @@ class LoadMetrics(object):
self.last_heartbeat_time_by_ip[ip] = now
def mark_active(self, ip):
assert ip is not None, "IP should be known at this time"
logger.info("Node {} is newly setup, treating as active".format(ip))
self.last_heartbeat_time_by_ip[ip] = time.time()
def prune_active_ips(self, active_ips):
@@ -177,10 +179,14 @@ class LoadMetrics(object):
prune(self.last_used_time_by_ip)
prune(self.static_resources_by_ip)
prune(self.dynamic_resources_by_ip)
prune(self.last_heartbeat_time_by_ip)
def approx_workers_used(self):
return self._info()["NumNodesUsed"]
def num_workers_connected(self):
return self._info()["NumNodesConnected"]
def info_string(self):
return ", ".join(
["{}={}".format(k, v) for k, v in sorted(self._info().items())])
@@ -210,6 +216,13 @@ class LoadMetrics(object):
heartbeat_times = [
now - t for t in self.last_heartbeat_time_by_ip.values()
]
most_delayed_heartbeats = sorted(
list(self.last_heartbeat_time_by_ip.items()),
key=lambda pair: pair[1])[:5]
most_delayed_heartbeats = {
ip: (now - t)
for ip, t in most_delayed_heartbeats
}
return {
"ResourceUsage": ", ".join([
"{}/{} {}".format(
@@ -227,6 +240,7 @@ class LoadMetrics(object):
int(np.min(heartbeat_times)) if heartbeat_times else -1,
int(np.mean(heartbeat_times)) if heartbeat_times else -1,
int(np.max(heartbeat_times)) if heartbeat_times else -1),
"MostDelayedHeartbeats": most_delayed_heartbeats,
}
@@ -239,7 +253,7 @@ class NodeLauncher(threading.Thread):
def _launch_node(self, config, count):
tag_filters = {TAG_RAY_NODE_TYPE: "worker"}
before = self.provider.nodes(tag_filters=tag_filters)
before = self.provider.non_terminated_nodes(tag_filters=tag_filters)
launch_hash = hash_launch_conf(config["worker_nodes"], config["auth"])
self.provider.create_node(
config["worker_nodes"], {
@@ -249,7 +263,7 @@ class NodeLauncher(threading.Thread):
TAG_RAY_NODE_STATUS: "uninitialized",
TAG_RAY_LAUNCH_CONFIG: launch_hash,
}, count)
after = self.provider.nodes(tag_filters=tag_filters)
after = self.provider.non_terminated_nodes(tag_filters=tag_filters)
if set(after).issubset(before):
logger.error("NodeLauncher: "
"No new nodes reported after node creation")
@@ -430,7 +444,8 @@ class StandardAutoscaler(object):
self.launch_new_node(num_launches)
nodes = self.workers()
self.log_info_string(nodes)
else:
elif self.load_metrics.num_workers_connected() >= target_workers:
logger.info("Ending bringup phase")
self.bringup = False
# Process any completed updates
@@ -492,15 +507,15 @@ class StandardAutoscaler(object):
"Error parsing config.")
def target_num_workers(self):
initial_workers = self.config["initial_workers"]
target_frac = self.config["target_utilization_fraction"]
cur_used = self.load_metrics.approx_workers_used()
ideal_num_nodes = int(np.ceil(cur_used / float(target_frac)))
ideal_num_workers = ideal_num_nodes - 1 # subtract 1 for head node
if self.bringup:
ideal_num_workers = initial_workers
else:
target_frac = self.config["target_utilization_fraction"]
cur_used = self.load_metrics.approx_workers_used()
ideal_num_nodes = int(np.ceil(cur_used / float(target_frac)))
ideal_num_workers = ideal_num_nodes - 1 # subtract 1 for head node
ideal_num_workers = max(ideal_num_workers,
self.config["initial_workers"])
return min(self.config["max_workers"],
max(self.config["min_workers"], ideal_num_workers))
@@ -603,7 +618,8 @@ class StandardAutoscaler(object):
self.launch_queue.put((config, count))
def workers(self):
return self.provider.nodes(tag_filters={TAG_RAY_NODE_TYPE: "worker"})
return self.provider.non_terminated_nodes(
tag_filters={TAG_RAY_NODE_TYPE: "worker"})
def log_info_string(self, nodes):
logger.info("StandardAutoscaler: {}".format(self.info_string(nodes)))
+9 -5
View File
@@ -93,7 +93,7 @@ class AWSNodeProvider(NodeProvider):
if self.tag_cache_kill_event.is_set():
return
def nodes(self, tag_filters):
def non_terminated_nodes(self, tag_filters):
# Note that these filters are acceptable because they are set on
# node initialization, and so can never be sitting in the cache.
tag_filters = to_aws_format(tag_filters)
@@ -230,12 +230,16 @@ class AWSNodeProvider(NodeProvider):
def _get_node(self, node_id):
"""Refresh and get info for this node, updating the cache."""
self.nodes({}) # Side effect: fetches and caches the node.
self.non_terminated_nodes({}) # Side effect: updates cache
assert node_id in self.cached_nodes, "Invalid instance id {}".format(
node_id)
if node_id in self.cached_nodes:
return self.cached_nodes[node_id]
return self.cached_nodes[node_id]
# Node not in {pending, running} -- retry with a point query. This
# usually means the node was recently preempted or terminated.
matches = list(self.ec2.instances.filter(InstanceIds=[node_id]))
assert len(matches) == 1, "Invalid instance id {}".format(node_id)
return matches[0]
def _get_cached_node(self, node_id):
"""Return node info from cache if possible, otherwise fetches it."""
+8 -8
View File
@@ -90,13 +90,13 @@ def teardown_cluster(config_file, yes, workers_only, override_cluster_name):
A = []
else:
A = [
node_id for node_id in provider.nodes({
node_id for node_id in provider.non_terminated_nodes({
TAG_RAY_NODE_TYPE: "head"
})
]
A += [
node_id for node_id in provider.nodes({
node_id for node_id in provider.non_terminated_nodes({
TAG_RAY_NODE_TYPE: "worker"
})
]
@@ -128,7 +128,7 @@ def kill_node(config_file, yes, override_cluster_name):
provider = get_node_provider(config["provider"], config["cluster_name"])
try:
nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"})
nodes = provider.non_terminated_nodes({TAG_RAY_NODE_TYPE: "worker"})
node = random.choice(nodes)
logger.info("kill_node: Terminating worker {}".format(node))
@@ -165,7 +165,7 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
head_node_tags = {
TAG_RAY_NODE_TYPE: "head",
}
nodes = provider.nodes(head_node_tags)
nodes = provider.non_terminated_nodes(head_node_tags)
if len(nodes) > 0:
head_node = nodes[0]
else:
@@ -192,7 +192,7 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
config["cluster_name"])
provider.create_node(config["head_node"], head_node_tags, 1)
nodes = provider.nodes(head_node_tags)
nodes = provider.non_terminated_nodes(head_node_tags)
assert len(nodes) == 1, "Failed to create head node."
head_node = nodes[0]
@@ -248,7 +248,7 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
updater.join()
# Refresh the node cache so we see the external ip if available
provider.nodes(head_node_tags)
provider.non_terminated_nodes(head_node_tags)
if config.get("provider", {}).get("use_internal_ips", False) is True:
head_node_ip = provider.internal_ip(head_node)
@@ -484,7 +484,7 @@ def get_worker_node_ips(config_file, override_cluster_name):
provider = get_node_provider(config["provider"], config["cluster_name"])
try:
nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"})
nodes = provider.non_terminated_nodes({TAG_RAY_NODE_TYPE: "worker"})
if config.get("provider", {}).get("use_internal_ips", False) is True:
return [provider.internal_ip(node) for node in nodes]
@@ -503,7 +503,7 @@ def _get_head_node(config,
head_node_tags = {
TAG_RAY_NODE_TYPE: "head",
}
nodes = provider.nodes(head_node_tags)
nodes = provider.non_terminated_nodes(head_node_tags)
finally:
provider.cleanup()
+12 -5
View File
@@ -53,7 +53,7 @@ class GCPNodeProvider(NodeProvider):
# excessive DescribeInstances requests.
self.cached_nodes = {}
def nodes(self, tag_filters):
def non_terminated_nodes(self, tag_filters):
with self.lock:
if tag_filters:
label_filter_expr = "(" + " AND ".join([
@@ -223,12 +223,19 @@ class GCPNodeProvider(NodeProvider):
return result
def _get_node(self, node_id):
self.nodes({}) # Side effect: fetches and caches the node.
self.non_terminated_nodes({}) # Side effect: updates cache
assert node_id in self.cached_nodes, "Invalid instance id {}".format(
node_id)
with self.lock:
if node_id in self.cached_nodes:
return self.cached_nodes[node_id]
return self.cached_nodes[node_id]
instance = self.compute.instances().get(
project=self.provider_config["project_id"],
zone=self.provider_config["availability_zone"],
instance=node_id,
).execute()
return instance
def _get_cached_node(self, node_id):
if node_id in self.cached_nodes:
+1 -1
View File
@@ -85,7 +85,7 @@ class LocalNodeProvider(NodeProvider):
"/tmp/cluster-{}.state".format(cluster_name),
provider_config)
def nodes(self, tag_filters):
def non_terminated_nodes(self, tag_filters):
workers = self.state.get()
matching_ips = []
for worker_ip, info in workers.items():
+2 -2
View File
@@ -132,7 +132,7 @@ class NodeProvider(object):
self.provider_config = provider_config
self.cluster_name = cluster_name
def nodes(self, tag_filters):
def non_terminated_nodes(self, tag_filters):
"""Return a list of node ids filtered by the specified tags dict.
This list must not include terminated nodes. For performance reasons,
@@ -141,7 +141,7 @@ class NodeProvider(object):
non_terminated_nodes() must be called again to refresh results.
Examples:
>>> provider.nodes({TAG_RAY_NODE_TYPE: "worker"})
>>> provider.non_terminated_nodes({TAG_RAY_NODE_TYPE: "worker"})
["node-1", "node-2"]
"""
raise NotImplementedError
+21 -21
View File
@@ -56,7 +56,7 @@ class MockProvider(NodeProvider):
self.ready_to_create = threading.Event()
self.ready_to_create.set()
def nodes(self, tag_filters):
def non_terminated_nodes(self, tag_filters):
if self.throw:
raise Exception("oops")
return [
@@ -193,7 +193,7 @@ class AutoscalingTest(unittest.TestCase):
def waitForNodes(self, expected, comparison=None, tag_filters={}):
MAX_ITER = 50
for i in range(MAX_ITER):
n = len(self.provider.nodes(tag_filters))
n = len(self.provider.non_terminated_nodes(tag_filters))
if comparison is None:
comparison = self.assertEqual
try:
@@ -260,7 +260,7 @@ class AutoscalingTest(unittest.TestCase):
self.provider = MockProvider()
autoscaler = StandardAutoscaler(
config_path, LoadMetrics(), max_failures=0, update_interval_s=0)
assert len(self.provider.nodes({})) == 0
assert len(self.provider.non_terminated_nodes({})) == 0
autoscaler.update()
self.waitForNodes(2)
autoscaler.update()
@@ -347,13 +347,13 @@ class AutoscalingTest(unittest.TestCase):
max_concurrent_launches=5,
max_failures=0,
update_interval_s=0)
assert len(self.provider.nodes({})) == 0
assert len(self.provider.non_terminated_nodes({})) == 0
# Update will try to create, but will block until we set the flag
self.provider.ready_to_create.clear()
autoscaler.update()
assert autoscaler.num_launches_pending.value == 2
assert len(self.provider.nodes({})) == 0
assert len(self.provider.non_terminated_nodes({})) == 0
# Set the flag, check it updates
self.provider.ready_to_create.set()
@@ -365,7 +365,7 @@ class AutoscalingTest(unittest.TestCase):
new_config["max_workers"] = 1
self.write_config(new_config)
autoscaler.update()
assert len(self.provider.nodes({})) == 1
assert len(self.provider.non_terminated_nodes({})) == 1
def testDelayedLaunchWithFailure(self):
config = SMALL_CLUSTER.copy()
@@ -380,7 +380,7 @@ class AutoscalingTest(unittest.TestCase):
max_concurrent_launches=8,
max_failures=0,
update_interval_s=0)
assert len(self.provider.nodes({})) == 0
assert len(self.provider.non_terminated_nodes({})) == 0
# update() should launch a wave of 5 nodes (max_launch_batch)
# Force this first wave to block.
@@ -394,7 +394,7 @@ class AutoscalingTest(unittest.TestCase):
waiters = rtc1._Event__cond._Condition__waiters
self.waitFor(lambda: len(waiters) == 1)
assert autoscaler.num_launches_pending.value == 5
assert len(self.provider.nodes({})) == 0
assert len(self.provider.non_terminated_nodes({})) == 0
# Call update() to launch a second wave of 3 nodes,
# as 5 + 3 = 8 = max_concurrent_launches.
@@ -410,7 +410,7 @@ class AutoscalingTest(unittest.TestCase):
self.provider.fail_creates = True
rtc1.set()
self.waitFor(lambda: autoscaler.num_launches_pending.value == 0)
assert len(self.provider.nodes({})) == 3
assert len(self.provider.non_terminated_nodes({})) == 3
# Retry the first wave, allowing it to succeed this time
self.provider.fail_creates = False
@@ -443,7 +443,7 @@ class AutoscalingTest(unittest.TestCase):
# not updated yet
# note that node termination happens in the main thread, so
# we do not need to add any delay here before checking
assert len(self.provider.nodes({})) == 2
assert len(self.provider.non_terminated_nodes({})) == 2
assert autoscaler.num_launches_pending.value == 0
def testLaunchConfigChange(self):
@@ -484,7 +484,7 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
time.sleep(0.1)
assert autoscaler.num_launches_pending.value == 0
assert len(self.provider.nodes({})) == 2
assert len(self.provider.non_terminated_nodes({})) == 2
# New a good config again
new_config = SMALL_CLUSTER.copy()
@@ -515,7 +515,7 @@ class AutoscalingTest(unittest.TestCase):
self.waitForNodes(2)
for node in self.provider.mock_nodes.values():
node.state = "terminated"
assert len(self.provider.nodes({})) == 0
assert len(self.provider.non_terminated_nodes({})) == 0
autoscaler.update()
self.waitForNodes(2)
@@ -591,12 +591,12 @@ class AutoscalingTest(unittest.TestCase):
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
config_path, lm, max_failures=0, update_interval_s=0)
assert len(self.provider.nodes({})) == 0
assert len(self.provider.non_terminated_nodes({})) == 0
autoscaler.update()
self.waitForNodes(1)
autoscaler.update()
assert autoscaler.num_launches_pending.value == 0
assert len(self.provider.nodes({})) == 1
assert len(self.provider.non_terminated_nodes({})) == 1
# Scales up as nodes are reported as used
local_ip = services.get_node_ip_address()
@@ -613,19 +613,19 @@ class AutoscalingTest(unittest.TestCase):
lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2})
autoscaler.update()
assert autoscaler.num_launches_pending.value == 0
assert len(self.provider.nodes({})) == 5
assert len(self.provider.non_terminated_nodes({})) == 5
# Scales down as nodes become unused
lm.last_used_time_by_ip["172.0.0.0"] = 0
lm.last_used_time_by_ip["172.0.0.1"] = 0
autoscaler.update()
assert autoscaler.num_launches_pending.value == 0
assert len(self.provider.nodes({})) == 3
assert len(self.provider.non_terminated_nodes({})) == 3
lm.last_used_time_by_ip["172.0.0.2"] = 0
lm.last_used_time_by_ip["172.0.0.3"] = 0
autoscaler.update()
assert autoscaler.num_launches_pending.value == 0
assert len(self.provider.nodes({})) == 1
assert len(self.provider.non_terminated_nodes({})) == 1
def testDontScaleBelowTarget(self):
config = SMALL_CLUSTER.copy()
@@ -637,10 +637,10 @@ class AutoscalingTest(unittest.TestCase):
lm = LoadMetrics()
autoscaler = StandardAutoscaler(
config_path, lm, max_failures=0, update_interval_s=0)
assert len(self.provider.nodes({})) == 0
assert len(self.provider.non_terminated_nodes({})) == 0
autoscaler.update()
assert autoscaler.num_launches_pending.value == 0
assert len(self.provider.nodes({})) == 0
assert len(self.provider.non_terminated_nodes({})) == 0
# Scales up as nodes are reported as used
local_ip = services.get_node_ip_address()
@@ -654,12 +654,12 @@ class AutoscalingTest(unittest.TestCase):
lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0})
lm.last_used_time_by_ip["172.0.0.0"] = 0
autoscaler.update()
assert len(self.provider.nodes({})) == 1
assert len(self.provider.non_terminated_nodes({})) == 1
# Reduce load on head => target nodes = 1 => target workers = 0
lm.update(local_ip, {"CPU": 2}, {"CPU": 1})
autoscaler.update()
assert len(self.provider.nodes({})) == 0
assert len(self.provider.non_terminated_nodes({})) == 0
@flaky(max_runs=4)
def testRecoverUnhealthyWorkers(self):