diff --git a/python/ray/monitor.py b/python/ray/monitor.py index fe1edad63..72de4e870 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -8,6 +8,8 @@ import time import traceback import json +import grpc + import ray from ray.autoscaler._private.autoscaler import StandardAutoscaler from ray.autoscaler._private.commands import teardown_cluster @@ -17,11 +19,10 @@ from ray.autoscaler._private.load_metrics import LoadMetrics from ray.autoscaler._private.constants import \ AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE from ray.autoscaler._private.util import DEBUG_AUTOSCALING_STATUS -import ray.gcs_utils -import ray.utils + +from ray.core.generated import gcs_service_pb2, gcs_service_pb2_grpc import ray.ray_constants as ray_constants from ray.ray_logging import setup_component_logger -from ray._raylet import GlobalStateAccessor from ray.experimental.internal_kv import _internal_kv_put, \ _internal_kv_initialized, _internal_kv_get @@ -90,16 +91,17 @@ class Monitor: redis_address, redis_password=redis_password) self.redis = ray._private.services.create_redis_client( redis_address, password=redis_password) - self.global_state_accessor = GlobalStateAccessor( - redis_address, redis_password, False) - self.global_state_accessor.connect() + + # Initialize the gcs stub for getting all node resource usage. + gcs_address = self.redis.get("GcsServerAddress").decode("utf-8") + gcs_channel = grpc.insecure_channel(gcs_address) + self.gcs_node_resources_stub = \ + gcs_service_pb2_grpc.NodeResourceInfoGcsServiceStub(gcs_channel) + # Set the redis client and mode so _internal_kv works for autoscaler. worker = ray.worker.global_worker worker.redis_client = self.redis worker.mode = 0 - # Keep a mapping from raylet client ID to IP address to use - # for updating the load metrics. - self.raylet_id_to_ip_map = {} head_node_ip = redis_address.split(":")[0] self.load_metrics = LoadMetrics(local_ip=head_node_ip) self.last_avail_resources = None @@ -117,19 +119,14 @@ class Monitor: logger.info("Monitor: Started") - def __del__(self): - """Destruct the monitor object.""" - # We close the pubsub client to avoid leaking file descriptors. - if self.global_state_accessor is not None: - self.global_state_accessor.disconnect() - self.global_state_accessor = None - def update_load_metrics(self): """Fetches resource usage data from GCS and updates load metrics.""" - all_resources = self.global_state_accessor.get_all_resource_usage() - resources_batch_data = \ - ray.gcs_utils.ResourceUsageBatchData.FromString(all_resources) + request = gcs_service_pb2.GetAllResourceUsageRequest() + response = self.gcs_node_resources_stub.GetAllResourceUsage( + request, timeout=3) + resources_batch_data = response.resource_usage_data + for resource_message in resources_batch_data.batch: resource_load = dict(resource_message.resource_load) total_resources = dict(resource_message.resources_total) @@ -141,17 +138,10 @@ class Monitor: pending_placement_groups = list( resources_batch_data.placement_group_load.placement_group_data) - # Update the load metrics for this raylet. - node_id = ray.utils.binary_to_hex(resource_message.node_id) - ip = self.raylet_id_to_ip_map.get(node_id) - if ip: - self.load_metrics.update(ip, total_resources, - available_resources, resource_load, - waiting_bundles, infeasible_bundles, - pending_placement_groups) - else: - logger.warning( - f"Monitor: could not find ip for node {node_id}") + ip = resource_message.node_manager_address + self.load_metrics.update( + ip, total_resources, available_resources, resource_load, + waiting_bundles, infeasible_bundles, pending_placement_groups) def update_resource_requests(self): """Fetches resource requests from the internal KV and updates load.""" @@ -166,29 +156,10 @@ class Monitor: except Exception: logger.exception("Error parsing resource requests") - def update_raylet_map(self, _append_port=False): - """Updates internal raylet map. - - Args: - _append_port (bool): Defaults to False. Appending the port is - useful in testing, as mock clusters have many nodes with - the same IP and cannot be uniquely identified. - """ - all_raylet_nodes = ray.nodes() - self.raylet_id_to_ip_map = {} - for raylet_info in all_raylet_nodes: - node_id = (raylet_info.get("DBClientID") or raylet_info["NodeID"]) - ip_address = (raylet_info.get("AuxAddress") - or raylet_info["NodeManagerAddress"]).split(":")[0] - if _append_port: - ip_address += ":" + str(raylet_info["NodeManagerPort"]) - self.raylet_id_to_ip_map[node_id] = ip_address - def _run(self): """Run the monitor loop.""" while True: - self.update_raylet_map() self.update_load_metrics() self.update_resource_requests() self.update_event_summary() @@ -364,9 +335,9 @@ if __name__ == "__main__": # Something went wrong, so push an error to all drivers. redis_client = ray._private.services.create_redis_client( args.redis_address, password=args.redis_password) - traceback_str = ray.utils.format_error_message(traceback.format_exc()) message = ("The monitor failed with the " - f"following error:\n{traceback_str}") - ray.utils.push_error_to_driver_through_redis( + f"following error:\n{traceback.format_exc()}") + from ray.utils import push_error_to_driver_through_redis + push_error_to_driver_through_redis( redis_client, ray_constants.MONITOR_DIED_ERROR, message) raise e diff --git a/python/ray/tests/test_multi_node_2.py b/python/ray/tests/test_multi_node_2.py index b3e739e64..7569dff68 100644 --- a/python/ray/tests/test_multi_node_2.py +++ b/python/ray/tests/test_multi_node_2.py @@ -4,6 +4,7 @@ import time import ray import ray.ray_constants as ray_constants +from ray.util.placement_group import placement_group, remove_placement_group from ray.autoscaler.sdk import request_resources from ray.monitor import Monitor from ray.cluster_utils import Cluster @@ -68,16 +69,45 @@ def test_system_config(ray_start_cluster_head): def setup_monitor(address): monitor = Monitor( address, None, redis_password=ray_constants.REDIS_DEFAULT_PASSWORD) - monitor.update_raylet_map(_append_port=True) return monitor +def assert_correct_pg(pg_response_data, pg_demands, strategy): + assert len(pg_response_data) == 1 + pg_response_data = pg_response_data[0] + strategy_mapping_dict_protobuf = { + "PACK": 0, + "SPREAD": 1, + "STRICT_PACK": 2, + "STRICT_SPREAD": 3 + } + assert pg_response_data.strategy == strategy_mapping_dict_protobuf[ + strategy] + assert pg_response_data.creator_job_id + assert pg_response_data.creator_actor_id + assert pg_response_data.creator_actor_dead + assert pg_response_data.placement_group_id + + for i, bundle in enumerate(pg_demands): + assert pg_response_data.bundles[i].unit_resources == bundle + assert pg_response_data.bundles[i].bundle_id.placement_group_id + + +# DO NOT CHANGE THIS VERIFICATION WITHOUT NOTIFYING (Eric/Ameer/Alex). def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30): request_resources(num_cpus=42) + # add placement groups. + pg_demands = [{"GPU": 2}, {"extra_resource": 2}] + strategy = "STRICT_PACK" + pg = placement_group(pg_demands, strategy=strategy) + pg.ready() + time.sleep(2) # wait for placemnt groups to propogate. + # Disable event clearing for test. monitor.event_summarizer.clear = lambda *a: None + visited_atleast_once = [set(), set()] while True: monitor.update_load_metrics() monitor.update_resource_requests() @@ -88,21 +118,29 @@ def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30): req = monitor.load_metrics.resource_requests assert req == [{"CPU": 1}] * 42, req + pg_response_data = monitor.load_metrics.pending_placement_groups + assert_correct_pg(pg_response_data, pg_demands, strategy) + if "memory" in resource_usage[0]: del resource_usage[0]["memory"] - if "object_store_memory" in resource_usage[1]: + visited_atleast_once[0].add("memory") + if "object_store_memory" in resource_usage[0]: del resource_usage[0]["object_store_memory"] + visited_atleast_once[0].add("object_store_memory") if "memory" in resource_usage[1]: del resource_usage[1]["memory"] + visited_atleast_once[1].add("memory") if "object_store_memory" in resource_usage[1]: del resource_usage[1]["object_store_memory"] + visited_atleast_once[1].add("object_store_memory") for key in list(resource_usage[0].keys()): if key.startswith("node:"): del resource_usage[0][key] + visited_atleast_once[0].add("node:") for key in list(resource_usage[1].keys()): if key.startswith("node:"): del resource_usage[1][key] - + visited_atleast_once[1].add("node:") if expected_resource_usage is None: if all(x for x in resource_usage[0:]): break @@ -120,6 +158,13 @@ def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30): # Sanity check we emitted a resize event. assert any("Resized to" in x for x in monitor.event_summarizer.summary()) + assert visited_atleast_once[0] == { + "memory", "object_store_memory", "node:" + } + assert visited_atleast_once[0] == visited_atleast_once[1] + + remove_placement_group(pg) + return resource_usage diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 844f44bea..7178fe715 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -46,19 +46,6 @@ enum TaskType { DRIVER_TASK = 3; } -// Type of placement group strategy. -enum PlacementStrategy { - // Packs Bundles into as few nodes as possible. - PACK = 0; - // Places Bundles across distinct nodes or processes as even as possible. - SPREAD = 1; - // Packs Bundles within one node. The group is not allowed to span multiple nodes. - STRICT_PACK = 2; - // Places Bundles across distinct nodes. - // The group is not allowed to deploy more than one bundle on a node. - STRICT_SPREAD = 3; -} - // Address of a worker or node manager. message Address { bytes raylet_id = 1; @@ -456,3 +443,24 @@ enum WorkerExitType { // Worker exit due to placement group removal. PLACEMENT_GROUP_REMOVED = 3; } +/////////////////////////////////////////////////////////////////////////////// +/* Please do not modify/remove/change the following enum to maintain +backwards compatibility in autoscaler. This is necessary to make sure we can +run autoscaler with any version of ray. For example, the K8s operator runs +autoscaler in a separate pod, if the user upgrades the ray version on the head +pod autoscaler can crash (if the newer version of ray modified the messages +below). */ + +// Type of placement group strategy. +enum PlacementStrategy { + // Packs Bundles into as few nodes as possible. + PACK = 0; + // Places Bundles across distinct nodes or processes as even as possible. + SPREAD = 1; + // Packs Bundles within one node. The group is not allowed to span multiple nodes. + STRICT_PACK = 2; + // Places Bundles across distinct nodes. + // The group is not allowed to deploy more than one bundle on a node. + STRICT_SPREAD = 3; +} +/////////////////////////////////////////////////////////////////////////////// diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index a56bffbe1..5da9842f9 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -158,43 +158,6 @@ message ErrorTableData { double timestamp = 4; } -message PlacementGroupTableData { - // State of a placement group. - enum PlacementGroupState { - // Placement Group is pending or scheduling - PENDING = 0; - // Placement Group is created. - CREATED = 1; - // Placement Group is already removed and won't be reschedule. - REMOVED = 2; - // Placement Group is rescheduling because the node it placed is dead. - RESCHEDULING = 3; - } - - // ID of the PlacementGroup. - bytes placement_group_id = 1; - // The name of the placement group. - string name = 2; - // The array of the bundle in Placement Group. - repeated Bundle bundles = 3; - // The schedule strategy of this Placement Group. - PlacementStrategy strategy = 4; - // Current state of this placement group. - PlacementGroupState state = 5; - // Fields to detect the owner of the placement group - // for automatic lifecycle management. - // The job id that created this placement group. - bytes creator_job_id = 6; - // The actor id that created this placement group. - bytes creator_actor_id = 7; - // Whether or not if the creator job is dead. - bool creator_job_dead = 8; - // Whether or not if the creator actor is dead. - bool creator_actor_dead = 9; - // Whether the placement group is persistent. - bool is_detached = 10; -} - message ScheduleData { map schedule_plan = 1; } @@ -275,71 +238,11 @@ message GcsNodeInfo { int64 timestamp = 10; } -// Represents the demand for a particular resource shape. -message ResourceDemand { - // The resource shape requested. This is a map from the resource string - // (e.g., "CPU") to the amount requested. - map shape = 1; - // The number of requests that are ready to run (i.e., dependencies have been - // fulfilled), but that are waiting for resources. - uint64 num_ready_requests_queued = 2; - // The number of requests for which there is no node that is a superset of - // the requested resource shape. - uint64 num_infeasible_requests_queued = 3; - // The number of requests of this shape still queued in CoreWorkers that this - // raylet knows about. - int64 backlog_size = 4; -} - -// Represents the demand sorted by resource shape. -message ResourceLoad { - // A list of all resource demands. The resource shape in each demand is - // unique. - repeated ResourceDemand resource_demands = 1; -} - -message PlacementGroupLoad { - // The list of pending placement group specifications. - repeated PlacementGroupTableData placement_group_data = 1; -} - message HeartbeatTableData { // Node id. bytes node_id = 1; } -message ResourcesData { - // Node id. - bytes node_id = 1; - // Resource capacity currently available on this node manager. - map resources_available = 2; - // Indicates whether available resources is changed. Only used when light - // heartbeat enabled. - bool resources_available_changed = 3; - // Total resource capacity configured for this node manager. - map resources_total = 4; - // Aggregate outstanding resource load on this node manager. - map resource_load = 5; - // Indicates whether resource load is changed. Only used when - // light heartbeat enabled. - bool resource_load_changed = 6; - // The resource load on this node, sorted by resource shape. - ResourceLoad resource_load_by_shape = 7; - // Whether this node manager is requesting global GC. - bool should_global_gc = 8; - // IP address of the node. - string node_manager_address = 9; -} - -message ResourceUsageBatchData { - repeated ResourcesData batch = 1; - // The total resource demand on all nodes included in the batch, sorted by - // resource shape. - ResourceLoad resource_load_by_shape = 2; - // The pending list of placement groups. - PlacementGroupLoad placement_group_load = 3; -} - // Data for a lease on task execution. message TaskLeaseData { // The task ID. @@ -453,3 +356,109 @@ message PubSubMessage { bytes id = 1; bytes data = 2; } + +/////////////////////////////////////////////////////////////////////////////// +/* Please do not modify/remove/change the following messages to maintain +backwards compatibility in autoscaler. This is necessary to make sure we can +run autoscaler with any version of ray. For example, the K8s operator runs +autoscaler in a separate pod, if the user upgrades the ray version on the head +pod autoscaler can crash (if the newer version of ray modified the messages +below). */ + +// Represents the demand for a particular resource shape. +message ResourceDemand { + // The resource shape requested. This is a map from the resource string + // (e.g., "CPU") to the amount requested. + map shape = 1; + // The number of requests that are ready to run (i.e., dependencies have been + // fulfilled), but that are waiting for resources. + uint64 num_ready_requests_queued = 2; + // The number of requests for which there is no node that is a superset of + // the requested resource shape. + uint64 num_infeasible_requests_queued = 3; + // The number of requests of this shape still queued in CoreWorkers that this + // raylet knows about. + int64 backlog_size = 4; +} + +// Represents the demand sorted by resource shape. +message ResourceLoad { + // A list of all resource demands. The resource shape in each demand is + // unique. + repeated ResourceDemand resource_demands = 1; +} + +message ResourcesData { + // Node id. + bytes node_id = 1; + // Resource capacity currently available on this node manager. + map resources_available = 2; + // Indicates whether available resources is changed. Only used when light + // heartbeat enabled. + bool resources_available_changed = 3; + // Total resource capacity configured for this node manager. + map resources_total = 4; + // Aggregate outstanding resource load on this node manager. + map resource_load = 5; + // Indicates whether resource load is changed. Only used when + // light heartbeat enabled. + bool resource_load_changed = 6; + // The resource load on this node, sorted by resource shape. + ResourceLoad resource_load_by_shape = 7; + // Whether this node manager is requesting global GC. + bool should_global_gc = 8; + // IP address of the node. + string node_manager_address = 9; +} + +message ResourceUsageBatchData { + repeated ResourcesData batch = 1; + // The total resource demand on all nodes included in the batch, sorted by + // resource shape. + ResourceLoad resource_load_by_shape = 2; + // The pending list of placement groups. + PlacementGroupLoad placement_group_load = 3; +} + +message PlacementGroupLoad { + // The list of pending placement group specifications. + repeated PlacementGroupTableData placement_group_data = 1; +} + +message PlacementGroupTableData { + // State of a placement group. + enum PlacementGroupState { + // Placement Group is pending or scheduling + PENDING = 0; + // Placement Group is created. + CREATED = 1; + // Placement Group is already removed and won't be reschedule. + REMOVED = 2; + // Placement Group is rescheduling because the node it placed is dead. + RESCHEDULING = 3; + } + + // ID of the PlacementGroup. + bytes placement_group_id = 1; + // The name of the placement group. + string name = 2; + // The array of the bundle in Placement Group. + repeated Bundle bundles = 3; + // The schedule strategy of this Placement Group. + PlacementStrategy strategy = 4; + // Current state of this placement group. + PlacementGroupState state = 5; + // Fields to detect the owner of the placement group + // for automatic lifecycle management. + // The job id that created this placement group. + bytes creator_job_id = 6; + // The actor id that created this placement group. + bytes creator_actor_id = 7; + // Whether or not if the creator job is dead. + bool creator_job_dead = 8; + // Whether or not if the creator actor is dead. + bool creator_actor_dead = 9; + // Whether the placement group is persistent. + bool is_detached = 10; +} +/////////////////////////////////////////////////////////////////////////////// diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index ed5ca92e2..78462cb2a 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -19,11 +19,6 @@ package ray.rpc; import "src/ray/protobuf/common.proto"; import "src/ray/protobuf/gcs.proto"; -message GcsStatus { - int32 code = 1; - string message = 2; -} - message AddJobRequest { JobTableData data = 1; } @@ -213,31 +208,6 @@ message ReportResourceUsageReply { GcsStatus status = 1; } -message GetAllResourceUsageRequest { -} - -message GetAllResourceUsageReply { - GcsStatus status = 1; - ResourceUsageBatchData resource_usage_data = 2; -} - -// Service for node resource info access. -service NodeResourceInfoGcsService { - // Get node's resources from GCS Service. - rpc GetResources(GetResourcesRequest) returns (GetResourcesReply); - // Update resources of a node in GCS Service. - rpc UpdateResources(UpdateResourcesRequest) returns (UpdateResourcesReply); - // Delete resources of a node in GCS Service. - rpc DeleteResources(DeleteResourcesRequest) returns (DeleteResourcesReply); - // Get available resources of all nodes. - rpc GetAllAvailableResources(GetAllAvailableResourcesRequest) - returns (GetAllAvailableResourcesReply); - // Report resource usage of a node to GCS Service. - rpc ReportResourceUsage(ReportResourceUsageRequest) returns (ReportResourceUsageReply); - // Get resource usage of all nodes from GCS Service. - rpc GetAllResourceUsage(GetAllResourceUsageRequest) returns (GetAllResourceUsageReply); -} - // Service for heartbeat info access. service HeartbeatInfoGcsService { // Report heartbeat of a node to GCS Service. @@ -535,3 +505,41 @@ service PlacementGroupInfoGcsService { rpc WaitPlacementGroupUntilReady(WaitPlacementGroupUntilReadyRequest) returns (WaitPlacementGroupUntilReadyReply); } +/////////////////////////////////////////////////////////////////////////////// +/* Please do not modify/remove/change the following messages to maintain +backwards compatibility in autoscaler. This is necessary to make sure we can +run autoscaler with any version of ray. For example, the K8s operator runs +autoscaler in a separate pod, if the user upgrades the ray version on the head +pod autoscaler can crash (if the newer version of ray modified the messages +below). */ + +message GetAllResourceUsageRequest { +} + +message GetAllResourceUsageReply { + GcsStatus status = 1; + ResourceUsageBatchData resource_usage_data = 2; +} + +// Service for node resource info access. +service NodeResourceInfoGcsService { + // Get node's resources from GCS Service. + rpc GetResources(GetResourcesRequest) returns (GetResourcesReply); + // Update resources of a node in GCS Service. + rpc UpdateResources(UpdateResourcesRequest) returns (UpdateResourcesReply); + // Delete resources of a node in GCS Service. + rpc DeleteResources(DeleteResourcesRequest) returns (DeleteResourcesReply); + // Get available resources of all nodes. + rpc GetAllAvailableResources(GetAllAvailableResourcesRequest) + returns (GetAllAvailableResourcesReply); + // Report resource usage of a node to GCS Service. + rpc ReportResourceUsage(ReportResourceUsageRequest) returns (ReportResourceUsageReply); + // Get resource usage of all nodes from GCS Service. + rpc GetAllResourceUsage(GetAllResourceUsageRequest) returns (GetAllResourceUsageReply); +} + +message GcsStatus { + int32 code = 1; + string message = 2; +} +///////////////////////////////////////////////////////////////////////////////