From 6f9d39fb3ee94510dc29bef2c4de855412a5575b Mon Sep 17 00:00:00 2001 From: architkulkarni Date: Wed, 10 Feb 2021 12:16:52 -0800 Subject: [PATCH] Revert "[Autoscaler] Monitor refactor for backward compatability. (#13970)" (#14046) This reverts commit 7a6f8054d1fdf5a29907cad480e581cd5c864ea3. --- python/ray/monitor.py | 75 +++++++--- python/ray/tests/test_multi_node_2.py | 51 +------ src/ray/protobuf/common.proto | 34 ++--- src/ray/protobuf/gcs.proto | 203 ++++++++++++-------------- src/ray/protobuf/gcs_service.proto | 68 ++++----- 5 files changed, 195 insertions(+), 236 deletions(-) diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 72de4e870..fe1edad63 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -8,8 +8,6 @@ import time import traceback import json -import grpc - import ray from ray.autoscaler._private.autoscaler import StandardAutoscaler from ray.autoscaler._private.commands import teardown_cluster @@ -19,10 +17,11 @@ from ray.autoscaler._private.load_metrics import LoadMetrics from ray.autoscaler._private.constants import \ AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE from ray.autoscaler._private.util import DEBUG_AUTOSCALING_STATUS - -from ray.core.generated import gcs_service_pb2, gcs_service_pb2_grpc +import ray.gcs_utils +import ray.utils import ray.ray_constants as ray_constants from ray.ray_logging import setup_component_logger +from ray._raylet import GlobalStateAccessor from ray.experimental.internal_kv import _internal_kv_put, \ _internal_kv_initialized, _internal_kv_get @@ -91,17 +90,16 @@ class Monitor: redis_address, redis_password=redis_password) self.redis = ray._private.services.create_redis_client( redis_address, password=redis_password) - - # Initialize the gcs stub for getting all node resource usage. - gcs_address = self.redis.get("GcsServerAddress").decode("utf-8") - gcs_channel = grpc.insecure_channel(gcs_address) - self.gcs_node_resources_stub = \ - gcs_service_pb2_grpc.NodeResourceInfoGcsServiceStub(gcs_channel) - + self.global_state_accessor = GlobalStateAccessor( + redis_address, redis_password, False) + self.global_state_accessor.connect() # Set the redis client and mode so _internal_kv works for autoscaler. worker = ray.worker.global_worker worker.redis_client = self.redis worker.mode = 0 + # Keep a mapping from raylet client ID to IP address to use + # for updating the load metrics. + self.raylet_id_to_ip_map = {} head_node_ip = redis_address.split(":")[0] self.load_metrics = LoadMetrics(local_ip=head_node_ip) self.last_avail_resources = None @@ -119,14 +117,19 @@ class Monitor: logger.info("Monitor: Started") + def __del__(self): + """Destruct the monitor object.""" + # We close the pubsub client to avoid leaking file descriptors. + if self.global_state_accessor is not None: + self.global_state_accessor.disconnect() + self.global_state_accessor = None + def update_load_metrics(self): """Fetches resource usage data from GCS and updates load metrics.""" - request = gcs_service_pb2.GetAllResourceUsageRequest() - response = self.gcs_node_resources_stub.GetAllResourceUsage( - request, timeout=3) - resources_batch_data = response.resource_usage_data - + all_resources = self.global_state_accessor.get_all_resource_usage() + resources_batch_data = \ + ray.gcs_utils.ResourceUsageBatchData.FromString(all_resources) for resource_message in resources_batch_data.batch: resource_load = dict(resource_message.resource_load) total_resources = dict(resource_message.resources_total) @@ -138,10 +141,17 @@ class Monitor: pending_placement_groups = list( resources_batch_data.placement_group_load.placement_group_data) - ip = resource_message.node_manager_address - self.load_metrics.update( - ip, total_resources, available_resources, resource_load, - waiting_bundles, infeasible_bundles, pending_placement_groups) + # Update the load metrics for this raylet. + node_id = ray.utils.binary_to_hex(resource_message.node_id) + ip = self.raylet_id_to_ip_map.get(node_id) + if ip: + self.load_metrics.update(ip, total_resources, + available_resources, resource_load, + waiting_bundles, infeasible_bundles, + pending_placement_groups) + else: + logger.warning( + f"Monitor: could not find ip for node {node_id}") def update_resource_requests(self): """Fetches resource requests from the internal KV and updates load.""" @@ -156,10 +166,29 @@ class Monitor: except Exception: logger.exception("Error parsing resource requests") + def update_raylet_map(self, _append_port=False): + """Updates internal raylet map. + + Args: + _append_port (bool): Defaults to False. Appending the port is + useful in testing, as mock clusters have many nodes with + the same IP and cannot be uniquely identified. + """ + all_raylet_nodes = ray.nodes() + self.raylet_id_to_ip_map = {} + for raylet_info in all_raylet_nodes: + node_id = (raylet_info.get("DBClientID") or raylet_info["NodeID"]) + ip_address = (raylet_info.get("AuxAddress") + or raylet_info["NodeManagerAddress"]).split(":")[0] + if _append_port: + ip_address += ":" + str(raylet_info["NodeManagerPort"]) + self.raylet_id_to_ip_map[node_id] = ip_address + def _run(self): """Run the monitor loop.""" while True: + self.update_raylet_map() self.update_load_metrics() self.update_resource_requests() self.update_event_summary() @@ -335,9 +364,9 @@ if __name__ == "__main__": # Something went wrong, so push an error to all drivers. redis_client = ray._private.services.create_redis_client( args.redis_address, password=args.redis_password) + traceback_str = ray.utils.format_error_message(traceback.format_exc()) message = ("The monitor failed with the " - f"following error:\n{traceback.format_exc()}") - from ray.utils import push_error_to_driver_through_redis - push_error_to_driver_through_redis( + f"following error:\n{traceback_str}") + ray.utils.push_error_to_driver_through_redis( redis_client, ray_constants.MONITOR_DIED_ERROR, message) raise e diff --git a/python/ray/tests/test_multi_node_2.py b/python/ray/tests/test_multi_node_2.py index 7569dff68..b3e739e64 100644 --- a/python/ray/tests/test_multi_node_2.py +++ b/python/ray/tests/test_multi_node_2.py @@ -4,7 +4,6 @@ import time import ray import ray.ray_constants as ray_constants -from ray.util.placement_group import placement_group, remove_placement_group from ray.autoscaler.sdk import request_resources from ray.monitor import Monitor from ray.cluster_utils import Cluster @@ -69,45 +68,16 @@ def test_system_config(ray_start_cluster_head): def setup_monitor(address): monitor = Monitor( address, None, redis_password=ray_constants.REDIS_DEFAULT_PASSWORD) + monitor.update_raylet_map(_append_port=True) return monitor -def assert_correct_pg(pg_response_data, pg_demands, strategy): - assert len(pg_response_data) == 1 - pg_response_data = pg_response_data[0] - strategy_mapping_dict_protobuf = { - "PACK": 0, - "SPREAD": 1, - "STRICT_PACK": 2, - "STRICT_SPREAD": 3 - } - assert pg_response_data.strategy == strategy_mapping_dict_protobuf[ - strategy] - assert pg_response_data.creator_job_id - assert pg_response_data.creator_actor_id - assert pg_response_data.creator_actor_dead - assert pg_response_data.placement_group_id - - for i, bundle in enumerate(pg_demands): - assert pg_response_data.bundles[i].unit_resources == bundle - assert pg_response_data.bundles[i].bundle_id.placement_group_id - - -# DO NOT CHANGE THIS VERIFICATION WITHOUT NOTIFYING (Eric/Ameer/Alex). def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30): request_resources(num_cpus=42) - # add placement groups. - pg_demands = [{"GPU": 2}, {"extra_resource": 2}] - strategy = "STRICT_PACK" - pg = placement_group(pg_demands, strategy=strategy) - pg.ready() - time.sleep(2) # wait for placemnt groups to propogate. - # Disable event clearing for test. monitor.event_summarizer.clear = lambda *a: None - visited_atleast_once = [set(), set()] while True: monitor.update_load_metrics() monitor.update_resource_requests() @@ -118,29 +88,21 @@ def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30): req = monitor.load_metrics.resource_requests assert req == [{"CPU": 1}] * 42, req - pg_response_data = monitor.load_metrics.pending_placement_groups - assert_correct_pg(pg_response_data, pg_demands, strategy) - if "memory" in resource_usage[0]: del resource_usage[0]["memory"] - visited_atleast_once[0].add("memory") - if "object_store_memory" in resource_usage[0]: + if "object_store_memory" in resource_usage[1]: del resource_usage[0]["object_store_memory"] - visited_atleast_once[0].add("object_store_memory") if "memory" in resource_usage[1]: del resource_usage[1]["memory"] - visited_atleast_once[1].add("memory") if "object_store_memory" in resource_usage[1]: del resource_usage[1]["object_store_memory"] - visited_atleast_once[1].add("object_store_memory") for key in list(resource_usage[0].keys()): if key.startswith("node:"): del resource_usage[0][key] - visited_atleast_once[0].add("node:") for key in list(resource_usage[1].keys()): if key.startswith("node:"): del resource_usage[1][key] - visited_atleast_once[1].add("node:") + if expected_resource_usage is None: if all(x for x in resource_usage[0:]): break @@ -158,13 +120,6 @@ def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30): # Sanity check we emitted a resize event. assert any("Resized to" in x for x in monitor.event_summarizer.summary()) - assert visited_atleast_once[0] == { - "memory", "object_store_memory", "node:" - } - assert visited_atleast_once[0] == visited_atleast_once[1] - - remove_placement_group(pg) - return resource_usage diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 7178fe715..844f44bea 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -46,6 +46,19 @@ enum TaskType { DRIVER_TASK = 3; } +// Type of placement group strategy. +enum PlacementStrategy { + // Packs Bundles into as few nodes as possible. + PACK = 0; + // Places Bundles across distinct nodes or processes as even as possible. + SPREAD = 1; + // Packs Bundles within one node. The group is not allowed to span multiple nodes. + STRICT_PACK = 2; + // Places Bundles across distinct nodes. + // The group is not allowed to deploy more than one bundle on a node. + STRICT_SPREAD = 3; +} + // Address of a worker or node manager. message Address { bytes raylet_id = 1; @@ -443,24 +456,3 @@ enum WorkerExitType { // Worker exit due to placement group removal. PLACEMENT_GROUP_REMOVED = 3; } -/////////////////////////////////////////////////////////////////////////////// -/* Please do not modify/remove/change the following enum to maintain -backwards compatibility in autoscaler. This is necessary to make sure we can -run autoscaler with any version of ray. For example, the K8s operator runs -autoscaler in a separate pod, if the user upgrades the ray version on the head -pod autoscaler can crash (if the newer version of ray modified the messages -below). */ - -// Type of placement group strategy. -enum PlacementStrategy { - // Packs Bundles into as few nodes as possible. - PACK = 0; - // Places Bundles across distinct nodes or processes as even as possible. - SPREAD = 1; - // Packs Bundles within one node. The group is not allowed to span multiple nodes. - STRICT_PACK = 2; - // Places Bundles across distinct nodes. - // The group is not allowed to deploy more than one bundle on a node. - STRICT_SPREAD = 3; -} -/////////////////////////////////////////////////////////////////////////////// diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 5da9842f9..a56bffbe1 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -158,6 +158,43 @@ message ErrorTableData { double timestamp = 4; } +message PlacementGroupTableData { + // State of a placement group. + enum PlacementGroupState { + // Placement Group is pending or scheduling + PENDING = 0; + // Placement Group is created. + CREATED = 1; + // Placement Group is already removed and won't be reschedule. + REMOVED = 2; + // Placement Group is rescheduling because the node it placed is dead. + RESCHEDULING = 3; + } + + // ID of the PlacementGroup. + bytes placement_group_id = 1; + // The name of the placement group. + string name = 2; + // The array of the bundle in Placement Group. + repeated Bundle bundles = 3; + // The schedule strategy of this Placement Group. + PlacementStrategy strategy = 4; + // Current state of this placement group. + PlacementGroupState state = 5; + // Fields to detect the owner of the placement group + // for automatic lifecycle management. + // The job id that created this placement group. + bytes creator_job_id = 6; + // The actor id that created this placement group. + bytes creator_actor_id = 7; + // Whether or not if the creator job is dead. + bool creator_job_dead = 8; + // Whether or not if the creator actor is dead. + bool creator_actor_dead = 9; + // Whether the placement group is persistent. + bool is_detached = 10; +} + message ScheduleData { map schedule_plan = 1; } @@ -238,11 +275,71 @@ message GcsNodeInfo { int64 timestamp = 10; } +// Represents the demand for a particular resource shape. +message ResourceDemand { + // The resource shape requested. This is a map from the resource string + // (e.g., "CPU") to the amount requested. + map shape = 1; + // The number of requests that are ready to run (i.e., dependencies have been + // fulfilled), but that are waiting for resources. + uint64 num_ready_requests_queued = 2; + // The number of requests for which there is no node that is a superset of + // the requested resource shape. + uint64 num_infeasible_requests_queued = 3; + // The number of requests of this shape still queued in CoreWorkers that this + // raylet knows about. + int64 backlog_size = 4; +} + +// Represents the demand sorted by resource shape. +message ResourceLoad { + // A list of all resource demands. The resource shape in each demand is + // unique. + repeated ResourceDemand resource_demands = 1; +} + +message PlacementGroupLoad { + // The list of pending placement group specifications. + repeated PlacementGroupTableData placement_group_data = 1; +} + message HeartbeatTableData { // Node id. bytes node_id = 1; } +message ResourcesData { + // Node id. + bytes node_id = 1; + // Resource capacity currently available on this node manager. + map resources_available = 2; + // Indicates whether available resources is changed. Only used when light + // heartbeat enabled. + bool resources_available_changed = 3; + // Total resource capacity configured for this node manager. + map resources_total = 4; + // Aggregate outstanding resource load on this node manager. + map resource_load = 5; + // Indicates whether resource load is changed. Only used when + // light heartbeat enabled. + bool resource_load_changed = 6; + // The resource load on this node, sorted by resource shape. + ResourceLoad resource_load_by_shape = 7; + // Whether this node manager is requesting global GC. + bool should_global_gc = 8; + // IP address of the node. + string node_manager_address = 9; +} + +message ResourceUsageBatchData { + repeated ResourcesData batch = 1; + // The total resource demand on all nodes included in the batch, sorted by + // resource shape. + ResourceLoad resource_load_by_shape = 2; + // The pending list of placement groups. + PlacementGroupLoad placement_group_load = 3; +} + // Data for a lease on task execution. message TaskLeaseData { // The task ID. @@ -356,109 +453,3 @@ message PubSubMessage { bytes id = 1; bytes data = 2; } - -/////////////////////////////////////////////////////////////////////////////// -/* Please do not modify/remove/change the following messages to maintain -backwards compatibility in autoscaler. This is necessary to make sure we can -run autoscaler with any version of ray. For example, the K8s operator runs -autoscaler in a separate pod, if the user upgrades the ray version on the head -pod autoscaler can crash (if the newer version of ray modified the messages -below). */ - -// Represents the demand for a particular resource shape. -message ResourceDemand { - // The resource shape requested. This is a map from the resource string - // (e.g., "CPU") to the amount requested. - map shape = 1; - // The number of requests that are ready to run (i.e., dependencies have been - // fulfilled), but that are waiting for resources. - uint64 num_ready_requests_queued = 2; - // The number of requests for which there is no node that is a superset of - // the requested resource shape. - uint64 num_infeasible_requests_queued = 3; - // The number of requests of this shape still queued in CoreWorkers that this - // raylet knows about. - int64 backlog_size = 4; -} - -// Represents the demand sorted by resource shape. -message ResourceLoad { - // A list of all resource demands. The resource shape in each demand is - // unique. - repeated ResourceDemand resource_demands = 1; -} - -message ResourcesData { - // Node id. - bytes node_id = 1; - // Resource capacity currently available on this node manager. - map resources_available = 2; - // Indicates whether available resources is changed. Only used when light - // heartbeat enabled. - bool resources_available_changed = 3; - // Total resource capacity configured for this node manager. - map resources_total = 4; - // Aggregate outstanding resource load on this node manager. - map resource_load = 5; - // Indicates whether resource load is changed. Only used when - // light heartbeat enabled. - bool resource_load_changed = 6; - // The resource load on this node, sorted by resource shape. - ResourceLoad resource_load_by_shape = 7; - // Whether this node manager is requesting global GC. - bool should_global_gc = 8; - // IP address of the node. - string node_manager_address = 9; -} - -message ResourceUsageBatchData { - repeated ResourcesData batch = 1; - // The total resource demand on all nodes included in the batch, sorted by - // resource shape. - ResourceLoad resource_load_by_shape = 2; - // The pending list of placement groups. - PlacementGroupLoad placement_group_load = 3; -} - -message PlacementGroupLoad { - // The list of pending placement group specifications. - repeated PlacementGroupTableData placement_group_data = 1; -} - -message PlacementGroupTableData { - // State of a placement group. - enum PlacementGroupState { - // Placement Group is pending or scheduling - PENDING = 0; - // Placement Group is created. - CREATED = 1; - // Placement Group is already removed and won't be reschedule. - REMOVED = 2; - // Placement Group is rescheduling because the node it placed is dead. - RESCHEDULING = 3; - } - - // ID of the PlacementGroup. - bytes placement_group_id = 1; - // The name of the placement group. - string name = 2; - // The array of the bundle in Placement Group. - repeated Bundle bundles = 3; - // The schedule strategy of this Placement Group. - PlacementStrategy strategy = 4; - // Current state of this placement group. - PlacementGroupState state = 5; - // Fields to detect the owner of the placement group - // for automatic lifecycle management. - // The job id that created this placement group. - bytes creator_job_id = 6; - // The actor id that created this placement group. - bytes creator_actor_id = 7; - // Whether or not if the creator job is dead. - bool creator_job_dead = 8; - // Whether or not if the creator actor is dead. - bool creator_actor_dead = 9; - // Whether the placement group is persistent. - bool is_detached = 10; -} -/////////////////////////////////////////////////////////////////////////////// diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index 41c71c7e0..6e2c450dd 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -19,6 +19,11 @@ package ray.rpc; import "src/ray/protobuf/common.proto"; import "src/ray/protobuf/gcs.proto"; +message GcsStatus { + int32 code = 1; + string message = 2; +} + message AddJobRequest { JobTableData data = 1; } @@ -226,6 +231,31 @@ message ReportResourceUsageReply { GcsStatus status = 1; } +message GetAllResourceUsageRequest { +} + +message GetAllResourceUsageReply { + GcsStatus status = 1; + ResourceUsageBatchData resource_usage_data = 2; +} + +// Service for node resource info access. +service NodeResourceInfoGcsService { + // Get node's resources from GCS Service. + rpc GetResources(GetResourcesRequest) returns (GetResourcesReply); + // Update resources of a node in GCS Service. + rpc UpdateResources(UpdateResourcesRequest) returns (UpdateResourcesReply); + // Delete resources of a node in GCS Service. + rpc DeleteResources(DeleteResourcesRequest) returns (DeleteResourcesReply); + // Get available resources of all nodes. + rpc GetAllAvailableResources(GetAllAvailableResourcesRequest) + returns (GetAllAvailableResourcesReply); + // Report resource usage of a node to GCS Service. + rpc ReportResourceUsage(ReportResourceUsageRequest) returns (ReportResourceUsageReply); + // Get resource usage of all nodes from GCS Service. + rpc GetAllResourceUsage(GetAllResourceUsageRequest) returns (GetAllResourceUsageReply); +} + // Service for heartbeat info access. service HeartbeatInfoGcsService { // Report heartbeat of a node to GCS Service. @@ -523,41 +553,3 @@ service PlacementGroupInfoGcsService { rpc WaitPlacementGroupUntilReady(WaitPlacementGroupUntilReadyRequest) returns (WaitPlacementGroupUntilReadyReply); } -/////////////////////////////////////////////////////////////////////////////// -/* Please do not modify/remove/change the following messages to maintain -backwards compatibility in autoscaler. This is necessary to make sure we can -run autoscaler with any version of ray. For example, the K8s operator runs -autoscaler in a separate pod, if the user upgrades the ray version on the head -pod autoscaler can crash (if the newer version of ray modified the messages -below). */ - -message GetAllResourceUsageRequest { -} - -message GetAllResourceUsageReply { - GcsStatus status = 1; - ResourceUsageBatchData resource_usage_data = 2; -} - -// Service for node resource info access. -service NodeResourceInfoGcsService { - // Get node's resources from GCS Service. - rpc GetResources(GetResourcesRequest) returns (GetResourcesReply); - // Update resources of a node in GCS Service. - rpc UpdateResources(UpdateResourcesRequest) returns (UpdateResourcesReply); - // Delete resources of a node in GCS Service. - rpc DeleteResources(DeleteResourcesRequest) returns (DeleteResourcesReply); - // Get available resources of all nodes. - rpc GetAllAvailableResources(GetAllAvailableResourcesRequest) - returns (GetAllAvailableResourcesReply); - // Report resource usage of a node to GCS Service. - rpc ReportResourceUsage(ReportResourceUsageRequest) returns (ReportResourceUsageReply); - // Get resource usage of all nodes from GCS Service. - rpc GetAllResourceUsage(GetAllResourceUsageRequest) returns (GetAllResourceUsageReply); -} - -message GcsStatus { - int32 code = 1; - string message = 2; -} -///////////////////////////////////////////////////////////////////////////////