Revert "[Autoscaler] Monitor refactor for backward compatability. (#13970)" (#14046)

This reverts commit 7a6f8054d1.
This commit is contained in:
architkulkarni
2021-02-10 12:16:52 -08:00
committed by GitHub
parent 68e985ddcd
commit 6f9d39fb3e
5 changed files with 195 additions and 236 deletions
+52 -23
View File
@@ -8,8 +8,6 @@ import time
import traceback
import json
import grpc
import ray
from ray.autoscaler._private.autoscaler import StandardAutoscaler
from ray.autoscaler._private.commands import teardown_cluster
@@ -19,10 +17,11 @@ from ray.autoscaler._private.load_metrics import LoadMetrics
from ray.autoscaler._private.constants import \
AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE
from ray.autoscaler._private.util import DEBUG_AUTOSCALING_STATUS
from ray.core.generated import gcs_service_pb2, gcs_service_pb2_grpc
import ray.gcs_utils
import ray.utils
import ray.ray_constants as ray_constants
from ray.ray_logging import setup_component_logger
from ray._raylet import GlobalStateAccessor
from ray.experimental.internal_kv import _internal_kv_put, \
_internal_kv_initialized, _internal_kv_get
@@ -91,17 +90,16 @@ class Monitor:
redis_address, redis_password=redis_password)
self.redis = ray._private.services.create_redis_client(
redis_address, password=redis_password)
# Initialize the gcs stub for getting all node resource usage.
gcs_address = self.redis.get("GcsServerAddress").decode("utf-8")
gcs_channel = grpc.insecure_channel(gcs_address)
self.gcs_node_resources_stub = \
gcs_service_pb2_grpc.NodeResourceInfoGcsServiceStub(gcs_channel)
self.global_state_accessor = GlobalStateAccessor(
redis_address, redis_password, False)
self.global_state_accessor.connect()
# Set the redis client and mode so _internal_kv works for autoscaler.
worker = ray.worker.global_worker
worker.redis_client = self.redis
worker.mode = 0
# Keep a mapping from raylet client ID to IP address to use
# for updating the load metrics.
self.raylet_id_to_ip_map = {}
head_node_ip = redis_address.split(":")[0]
self.load_metrics = LoadMetrics(local_ip=head_node_ip)
self.last_avail_resources = None
@@ -119,14 +117,19 @@ class Monitor:
logger.info("Monitor: Started")
def __del__(self):
    """Disconnect the global state accessor when the monitor is destroyed.

    The attribute is looked up defensively because ``__del__`` also runs
    for instances whose ``__init__`` raised before
    ``global_state_accessor`` was assigned; a plain attribute read would
    then raise ``AttributeError`` from inside the finalizer.
    """
    accessor = getattr(self, "global_state_accessor", None)
    if accessor is not None:
        # Disconnect explicitly to avoid leaking file descriptors.
        accessor.disconnect()
        # Clear the reference so a repeated call is a no-op.
        self.global_state_accessor = None
def update_load_metrics(self):
"""Fetches resource usage data from GCS and updates load metrics."""
request = gcs_service_pb2.GetAllResourceUsageRequest()
response = self.gcs_node_resources_stub.GetAllResourceUsage(
request, timeout=3)
resources_batch_data = response.resource_usage_data
all_resources = self.global_state_accessor.get_all_resource_usage()
resources_batch_data = \
ray.gcs_utils.ResourceUsageBatchData.FromString(all_resources)
for resource_message in resources_batch_data.batch:
resource_load = dict(resource_message.resource_load)
total_resources = dict(resource_message.resources_total)
@@ -138,10 +141,17 @@ class Monitor:
pending_placement_groups = list(
resources_batch_data.placement_group_load.placement_group_data)
ip = resource_message.node_manager_address
self.load_metrics.update(
ip, total_resources, available_resources, resource_load,
waiting_bundles, infeasible_bundles, pending_placement_groups)
# Update the load metrics for this raylet.
node_id = ray.utils.binary_to_hex(resource_message.node_id)
ip = self.raylet_id_to_ip_map.get(node_id)
if ip:
self.load_metrics.update(ip, total_resources,
available_resources, resource_load,
waiting_bundles, infeasible_bundles,
pending_placement_groups)
else:
logger.warning(
f"Monitor: could not find ip for node {node_id}")
def update_resource_requests(self):
"""Fetches resource requests from the internal KV and updates load."""
@@ -156,10 +166,29 @@ class Monitor:
except Exception:
logger.exception("Error parsing resource requests")
def update_raylet_map(self, _append_port=False):
    """Rebuild the mapping from raylet node ID to node IP address.

    The node list is read from ``ray.nodes()``. For each entry the ID is
    taken from ``"DBClientID"`` (falling back to ``"NodeID"``) and the
    address from ``"AuxAddress"`` (falling back to
    ``"NodeManagerAddress"``), keeping only the part before the first
    ``":"``.

    Args:
        _append_port (bool): Defaults to False. When True, the node's
            ``"NodeManagerPort"`` is appended as ``":<port>"``. This is
            useful in testing, as mock clusters have many nodes with
            the same IP and cannot be uniquely identified.
    """
    nodes = ray.nodes()
    # Start from an empty map on every refresh; the local alias and the
    # attribute refer to the same dict.
    id_to_ip = self.raylet_id_to_ip_map = {}
    for info in nodes:
        key = info.get("DBClientID") or info["NodeID"]
        address = info.get("AuxAddress") or info["NodeManagerAddress"]
        host = address.split(":")[0]
        if _append_port:
            host = host + ":" + str(info["NodeManagerPort"])
        id_to_ip[key] = host
def _run(self):
"""Run the monitor loop."""
while True:
self.update_raylet_map()
self.update_load_metrics()
self.update_resource_requests()
self.update_event_summary()
@@ -335,9 +364,9 @@ if __name__ == "__main__":
# Something went wrong, so push an error to all drivers.
redis_client = ray._private.services.create_redis_client(
args.redis_address, password=args.redis_password)
traceback_str = ray.utils.format_error_message(traceback.format_exc())
message = ("The monitor failed with the "
f"following error:\n{traceback.format_exc()}")
from ray.utils import push_error_to_driver_through_redis
push_error_to_driver_through_redis(
f"following error:\n{traceback_str}")
ray.utils.push_error_to_driver_through_redis(
redis_client, ray_constants.MONITOR_DIED_ERROR, message)
raise e
+3 -48
View File
@@ -4,7 +4,6 @@ import time
import ray
import ray.ray_constants as ray_constants
from ray.util.placement_group import placement_group, remove_placement_group
from ray.autoscaler.sdk import request_resources
from ray.monitor import Monitor
from ray.cluster_utils import Cluster
@@ -69,45 +68,16 @@ def test_system_config(ray_start_cluster_head):
def setup_monitor(address):
    """Build a ``Monitor`` for *address* and prime its raylet map.

    The monitor is created with ``None`` as its second positional
    argument and the default Redis password. Its raylet map is populated
    once with ports appended to the addresses.

    Args:
        address: Address passed through as the first ``Monitor`` argument.

    Returns:
        The constructed ``Monitor`` instance.
    """
    mon = Monitor(
        address,
        None,
        redis_password=ray_constants.REDIS_DEFAULT_PASSWORD,
    )
    mon.update_raylet_map(_append_port=True)
    return mon
def assert_correct_pg(pg_response_data, pg_demands, strategy):
    """Assert that a single reported placement group matches expectations.

    Args:
        pg_response_data: Sequence that must contain exactly one placement
            group record exposing ``strategy``, ``creator_job_id``,
            ``creator_actor_id``, ``creator_actor_dead``,
            ``placement_group_id`` and ``bundles``.
        pg_demands: Expected per-bundle resources, compared by position
            with each bundle's ``unit_resources``.
        strategy: One of ``"PACK"``, ``"SPREAD"``, ``"STRICT_PACK"`` or
            ``"STRICT_SPREAD"``.

    Raises:
        AssertionError: If any check fails.
        KeyError: If *strategy* is not one of the known names.
    """
    assert len(pg_response_data) == 1
    group = pg_response_data[0]

    # Integer code the record is expected to carry for each strategy name.
    strategy_codes = {
        name: code
        for code, name in enumerate(
            ("PACK", "SPREAD", "STRICT_PACK", "STRICT_SPREAD"))
    }
    assert group.strategy == strategy_codes[strategy]

    # These fields only need to be truthy.
    assert group.creator_job_id
    assert group.creator_actor_id
    assert group.creator_actor_dead
    assert group.placement_group_id

    for index, demand in enumerate(pg_demands):
        assert group.bundles[index].unit_resources == demand
        assert group.bundles[index].bundle_id.placement_group_id
# DO NOT CHANGE THIS VERIFICATION WITHOUT NOTIFYING (Eric/Ameer/Alex).
def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30):
request_resources(num_cpus=42)
# add placement groups.
pg_demands = [{"GPU": 2}, {"extra_resource": 2}]
strategy = "STRICT_PACK"
pg = placement_group(pg_demands, strategy=strategy)
pg.ready()
time.sleep(2) # wait for placemnt groups to propogate.
# Disable event clearing for test.
monitor.event_summarizer.clear = lambda *a: None
visited_atleast_once = [set(), set()]
while True:
monitor.update_load_metrics()
monitor.update_resource_requests()
@@ -118,29 +88,21 @@ def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30):
req = monitor.load_metrics.resource_requests
assert req == [{"CPU": 1}] * 42, req
pg_response_data = monitor.load_metrics.pending_placement_groups
assert_correct_pg(pg_response_data, pg_demands, strategy)
if "memory" in resource_usage[0]:
del resource_usage[0]["memory"]
visited_atleast_once[0].add("memory")
if "object_store_memory" in resource_usage[0]:
if "object_store_memory" in resource_usage[1]:
del resource_usage[0]["object_store_memory"]
visited_atleast_once[0].add("object_store_memory")
if "memory" in resource_usage[1]:
del resource_usage[1]["memory"]
visited_atleast_once[1].add("memory")
if "object_store_memory" in resource_usage[1]:
del resource_usage[1]["object_store_memory"]
visited_atleast_once[1].add("object_store_memory")
for key in list(resource_usage[0].keys()):
if key.startswith("node:"):
del resource_usage[0][key]
visited_atleast_once[0].add("node:")
for key in list(resource_usage[1].keys()):
if key.startswith("node:"):
del resource_usage[1][key]
visited_atleast_once[1].add("node:")
if expected_resource_usage is None:
if all(x for x in resource_usage[0:]):
break
@@ -158,13 +120,6 @@ def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30):
# Sanity check we emitted a resize event.
assert any("Resized to" in x for x in monitor.event_summarizer.summary())
assert visited_atleast_once[0] == {
"memory", "object_store_memory", "node:"
}
assert visited_atleast_once[0] == visited_atleast_once[1]
remove_placement_group(pg)
return resource_usage