Revert "Revert "[Autoscaler] Monitor refactor for backward compatability. (#13970)" (#14046)" (#14050)

* prepare for head node

* move command runner interface outside _private

* remove space

* Eric

* flake

* min_workers in multi node type

* fixing edge cases

* eric not idle

* fix target_workers to consider min_workers of node types

* idle timeout

* minor

* minor fix

* test

* lint

* eric v2

* eric 3

* min_workers constraint before bin packing

* Update resource_demand_scheduler.py

* Revert "Update resource_demand_scheduler.py"

This reverts commit 818a63a2c86d8437b3ef21c5035d701c1d1127b5.

* reducing diff

* make get_nodes_to_launch return a dict

* merge

* weird merge fix

* auto fill instance types for AWS

* Alex/Eric

* Update doc/source/cluster/autoscaling.rst

* merge autofill and input from user

* logger.exception

* make the yaml use the default autofill

* docs Eric

* remove test_autoscaler_yaml from windows tests

* lets try changing the test a bit

* return test

* lets see

* edward

* Limit max launch concurrency

* commenting frac TODO

* move to resource demand scheduler

* use STATUS UP TO DATE

* Eric

* make logger of gc freed refs debug instead of info

* add cluster name to docker mount prefix directory

* grrR

* fix tests

* moving docker directory to sdk

* move the import to prevent circular dependency

* smallf fix

* ian

* fix max launch concurrency bug to assume failing nodes as pending and consider only load_metric's connected nodes as running

* small fix

* Revert "Revert "[Autoscaler] Monitor refactor for backward compatability. (#13970)" (#14046)"

This reverts commit 6f9d39fb3e.

* fake news

Co-authored-by: Ameer Haj Ali <ameerhajali@ameers-mbp.lan>
Co-authored-by: Alex Wu <alex@anyscale.io>
Co-authored-by: Alex Wu <itswu.alex@gmail.com>
Co-authored-by: Eric Liang <ekhliang@gmail.com>
Co-authored-by: Ameer Haj Ali <ameerhajali@Ameers-MacBook-Pro.local>
This commit is contained in:
Ameer Haj Ali
2021-02-11 03:59:08 +02:00
committed by GitHub
parent c5574a33e4
commit d87a82e891
6 changed files with 239 additions and 196 deletions
+23 -52
View File
@@ -8,6 +8,8 @@ import time
import traceback
import json
import grpc
import ray
from ray.autoscaler._private.autoscaler import StandardAutoscaler
from ray.autoscaler._private.commands import teardown_cluster
@@ -17,11 +19,10 @@ from ray.autoscaler._private.load_metrics import LoadMetrics
from ray.autoscaler._private.constants import \
AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE
from ray.autoscaler._private.util import DEBUG_AUTOSCALING_STATUS
import ray.gcs_utils
import ray.utils
from ray.core.generated import gcs_service_pb2, gcs_service_pb2_grpc
import ray.ray_constants as ray_constants
from ray.ray_logging import setup_component_logger
from ray._raylet import GlobalStateAccessor
from ray.experimental.internal_kv import _internal_kv_put, \
_internal_kv_initialized, _internal_kv_get
@@ -90,16 +91,17 @@ class Monitor:
redis_address, redis_password=redis_password)
self.redis = ray._private.services.create_redis_client(
redis_address, password=redis_password)
self.global_state_accessor = GlobalStateAccessor(
redis_address, redis_password, False)
self.global_state_accessor.connect()
# Initialize the gcs stub for getting all node resource usage.
gcs_address = self.redis.get("GcsServerAddress").decode("utf-8")
gcs_channel = grpc.insecure_channel(gcs_address)
self.gcs_node_resources_stub = \
gcs_service_pb2_grpc.NodeResourceInfoGcsServiceStub(gcs_channel)
# Set the redis client and mode so _internal_kv works for autoscaler.
worker = ray.worker.global_worker
worker.redis_client = self.redis
worker.mode = 0
# Keep a mapping from raylet client ID to IP address to use
# for updating the load metrics.
self.raylet_id_to_ip_map = {}
head_node_ip = redis_address.split(":")[0]
self.load_metrics = LoadMetrics(local_ip=head_node_ip)
self.last_avail_resources = None
@@ -117,19 +119,14 @@ class Monitor:
logger.info("Monitor: Started")
def __del__(self):
"""Destruct the monitor object."""
# We close the pubsub client to avoid leaking file descriptors.
if self.global_state_accessor is not None:
self.global_state_accessor.disconnect()
self.global_state_accessor = None
def update_load_metrics(self):
"""Fetches resource usage data from GCS and updates load metrics."""
all_resources = self.global_state_accessor.get_all_resource_usage()
resources_batch_data = \
ray.gcs_utils.ResourceUsageBatchData.FromString(all_resources)
request = gcs_service_pb2.GetAllResourceUsageRequest()
response = self.gcs_node_resources_stub.GetAllResourceUsage(
request, timeout=4)
resources_batch_data = response.resource_usage_data
for resource_message in resources_batch_data.batch:
resource_load = dict(resource_message.resource_load)
total_resources = dict(resource_message.resources_total)
@@ -141,17 +138,10 @@ class Monitor:
pending_placement_groups = list(
resources_batch_data.placement_group_load.placement_group_data)
# Update the load metrics for this raylet.
node_id = ray.utils.binary_to_hex(resource_message.node_id)
ip = self.raylet_id_to_ip_map.get(node_id)
if ip:
self.load_metrics.update(ip, total_resources,
available_resources, resource_load,
waiting_bundles, infeasible_bundles,
pending_placement_groups)
else:
logger.warning(
f"Monitor: could not find ip for node {node_id}")
ip = resource_message.node_manager_address
self.load_metrics.update(
ip, total_resources, available_resources, resource_load,
waiting_bundles, infeasible_bundles, pending_placement_groups)
def update_resource_requests(self):
"""Fetches resource requests from the internal KV and updates load."""
@@ -166,29 +156,10 @@ class Monitor:
except Exception:
logger.exception("Error parsing resource requests")
def update_raylet_map(self, _append_port=False):
"""Updates internal raylet map.
Args:
_append_port (bool): Defaults to False. Appending the port is
useful in testing, as mock clusters have many nodes with
the same IP and cannot be uniquely identified.
"""
all_raylet_nodes = ray.nodes()
self.raylet_id_to_ip_map = {}
for raylet_info in all_raylet_nodes:
node_id = (raylet_info.get("DBClientID") or raylet_info["NodeID"])
ip_address = (raylet_info.get("AuxAddress")
or raylet_info["NodeManagerAddress"]).split(":")[0]
if _append_port:
ip_address += ":" + str(raylet_info["NodeManagerPort"])
self.raylet_id_to_ip_map[node_id] = ip_address
def _run(self):
"""Run the monitor loop."""
while True:
self.update_raylet_map()
self.update_load_metrics()
self.update_resource_requests()
self.update_event_summary()
@@ -364,9 +335,9 @@ if __name__ == "__main__":
# Something went wrong, so push an error to all drivers.
redis_client = ray._private.services.create_redis_client(
args.redis_address, password=args.redis_password)
traceback_str = ray.utils.format_error_message(traceback.format_exc())
message = ("The monitor failed with the "
f"following error:\n{traceback_str}")
ray.utils.push_error_to_driver_through_redis(
f"following error:\n{traceback.format_exc()}")
from ray.utils import push_error_to_driver_through_redis
push_error_to_driver_through_redis(
redis_client, ray_constants.MONITOR_DIED_ERROR, message)
raise e
+48 -3
View File
@@ -4,6 +4,7 @@ import time
import ray
import ray.ray_constants as ray_constants
from ray.util.placement_group import placement_group, remove_placement_group
from ray.autoscaler.sdk import request_resources
from ray.monitor import Monitor
from ray.cluster_utils import Cluster
@@ -68,16 +69,45 @@ def test_system_config(ray_start_cluster_head):
def setup_monitor(address):
monitor = Monitor(
address, None, redis_password=ray_constants.REDIS_DEFAULT_PASSWORD)
monitor.update_raylet_map(_append_port=True)
return monitor
def assert_correct_pg(pg_response_data, pg_demands, strategy):
assert len(pg_response_data) == 1
pg_response_data = pg_response_data[0]
strategy_mapping_dict_protobuf = {
"PACK": 0,
"SPREAD": 1,
"STRICT_PACK": 2,
"STRICT_SPREAD": 3
}
assert pg_response_data.strategy == strategy_mapping_dict_protobuf[
strategy]
assert pg_response_data.creator_job_id
assert pg_response_data.creator_actor_id
assert pg_response_data.creator_actor_dead
assert pg_response_data.placement_group_id
for i, bundle in enumerate(pg_demands):
assert pg_response_data.bundles[i].unit_resources == bundle
assert pg_response_data.bundles[i].bundle_id.placement_group_id
# DO NOT CHANGE THIS VERIFICATION WITHOUT NOTIFYING (Eric/Ameer/Alex).
def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30):
request_resources(num_cpus=42)
# add placement groups.
pg_demands = [{"GPU": 2}, {"extra_resource": 2}]
strategy = "STRICT_PACK"
pg = placement_group(pg_demands, strategy=strategy)
pg.ready()
time.sleep(2) # wait for placemnt groups to propogate.
# Disable event clearing for test.
monitor.event_summarizer.clear = lambda *a: None
visited_atleast_once = [set(), set()]
while True:
monitor.update_load_metrics()
monitor.update_resource_requests()
@@ -88,21 +118,29 @@ def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30):
req = monitor.load_metrics.resource_requests
assert req == [{"CPU": 1}] * 42, req
pg_response_data = monitor.load_metrics.pending_placement_groups
assert_correct_pg(pg_response_data, pg_demands, strategy)
if "memory" in resource_usage[0]:
del resource_usage[0]["memory"]
if "object_store_memory" in resource_usage[1]:
visited_atleast_once[0].add("memory")
if "object_store_memory" in resource_usage[0]:
del resource_usage[0]["object_store_memory"]
visited_atleast_once[0].add("object_store_memory")
if "memory" in resource_usage[1]:
del resource_usage[1]["memory"]
visited_atleast_once[1].add("memory")
if "object_store_memory" in resource_usage[1]:
del resource_usage[1]["object_store_memory"]
visited_atleast_once[1].add("object_store_memory")
for key in list(resource_usage[0].keys()):
if key.startswith("node:"):
del resource_usage[0][key]
visited_atleast_once[0].add("node:")
for key in list(resource_usage[1].keys()):
if key.startswith("node:"):
del resource_usage[1][key]
visited_atleast_once[1].add("node:")
if expected_resource_usage is None:
if all(x for x in resource_usage[0:]):
break
@@ -120,6 +158,13 @@ def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30):
# Sanity check we emitted a resize event.
assert any("Resized to" in x for x in monitor.event_summarizer.summary())
assert visited_atleast_once[0] == {
"memory", "object_store_memory", "node:"
}
assert visited_atleast_once[0] == visited_atleast_once[1]
remove_placement_group(pg)
return resource_usage
@@ -126,7 +126,9 @@ def test_driver_lives_sequential(ray_start_regular):
ray.worker._global_node.kill_raylet()
ray.worker._global_node.kill_plasma_store()
ray.worker._global_node.kill_log_monitor()
ray.worker._global_node.kill_monitor()
if not sys.platform.startswith("win"):
# fails on windows.
ray.worker._global_node.kill_monitor()
ray.worker._global_node.kill_gcs_server()
# If the driver can reach the tearDown method, then it is still alive.
+21 -13
View File
@@ -46,19 +46,6 @@ enum TaskType {
DRIVER_TASK = 3;
}
// Type of placement group strategy.
enum PlacementStrategy {
// Packs Bundles into as few nodes as possible.
PACK = 0;
// Places Bundles across distinct nodes or processes as even as possible.
SPREAD = 1;
// Packs Bundles within one node. The group is not allowed to span multiple nodes.
STRICT_PACK = 2;
// Places Bundles across distinct nodes.
// The group is not allowed to deploy more than one bundle on a node.
STRICT_SPREAD = 3;
}
// Address of a worker or node manager.
message Address {
bytes raylet_id = 1;
@@ -456,3 +443,24 @@ enum WorkerExitType {
// Worker exit due to placement group removal.
PLACEMENT_GROUP_REMOVED = 3;
}
///////////////////////////////////////////////////////////////////////////////
/* Please do not modify/remove/change the following enum to maintain
backwards compatibility in autoscaler. This is necessary to make sure we can
run autoscaler with any version of ray. For example, the K8s operator runs
autoscaler in a separate pod, if the user upgrades the ray version on the head
pod autoscaler can crash (if the newer version of ray modified the messages
below). */
// Type of placement group strategy.
enum PlacementStrategy {
// Packs Bundles into as few nodes as possible.
PACK = 0;
// Places Bundles across distinct nodes or processes as even as possible.
SPREAD = 1;
// Packs Bundles within one node. The group is not allowed to span multiple nodes.
STRICT_PACK = 2;
// Places Bundles across distinct nodes.
// The group is not allowed to deploy more than one bundle on a node.
STRICT_SPREAD = 3;
}
///////////////////////////////////////////////////////////////////////////////
+106 -97
View File
@@ -158,43 +158,6 @@ message ErrorTableData {
double timestamp = 4;
}
message PlacementGroupTableData {
// State of a placement group.
enum PlacementGroupState {
// Placement Group is pending or scheduling
PENDING = 0;
// Placement Group is created.
CREATED = 1;
// Placement Group is already removed and won't be reschedule.
REMOVED = 2;
// Placement Group is rescheduling because the node it placed is dead.
RESCHEDULING = 3;
}
// ID of the PlacementGroup.
bytes placement_group_id = 1;
// The name of the placement group.
string name = 2;
// The array of the bundle in Placement Group.
repeated Bundle bundles = 3;
// The schedule strategy of this Placement Group.
PlacementStrategy strategy = 4;
// Current state of this placement group.
PlacementGroupState state = 5;
// Fields to detect the owner of the placement group
// for automatic lifecycle management.
// The job id that created this placement group.
bytes creator_job_id = 6;
// The actor id that created this placement group.
bytes creator_actor_id = 7;
// Whether or not if the creator job is dead.
bool creator_job_dead = 8;
// Whether or not if the creator actor is dead.
bool creator_actor_dead = 9;
// Whether the placement group is persistent.
bool is_detached = 10;
}
message ScheduleData {
map<string, bytes> schedule_plan = 1;
}
@@ -275,71 +238,11 @@ message GcsNodeInfo {
int64 timestamp = 10;
}
// Represents the demand for a particular resource shape.
message ResourceDemand {
// The resource shape requested. This is a map from the resource string
// (e.g., "CPU") to the amount requested.
map<string, double> shape = 1;
// The number of requests that are ready to run (i.e., dependencies have been
// fulfilled), but that are waiting for resources.
uint64 num_ready_requests_queued = 2;
// The number of requests for which there is no node that is a superset of
// the requested resource shape.
uint64 num_infeasible_requests_queued = 3;
// The number of requests of this shape still queued in CoreWorkers that this
// raylet knows about.
int64 backlog_size = 4;
}
// Represents the demand sorted by resource shape.
message ResourceLoad {
// A list of all resource demands. The resource shape in each demand is
// unique.
repeated ResourceDemand resource_demands = 1;
}
message PlacementGroupLoad {
// The list of pending placement group specifications.
repeated PlacementGroupTableData placement_group_data = 1;
}
message HeartbeatTableData {
// Node id.
bytes node_id = 1;
}
message ResourcesData {
// Node id.
bytes node_id = 1;
// Resource capacity currently available on this node manager.
map<string, double> resources_available = 2;
// Indicates whether available resources is changed. Only used when light
// heartbeat enabled.
bool resources_available_changed = 3;
// Total resource capacity configured for this node manager.
map<string, double> resources_total = 4;
// Aggregate outstanding resource load on this node manager.
map<string, double> resource_load = 5;
// Indicates whether resource load is changed. Only used when
// light heartbeat enabled.
bool resource_load_changed = 6;
// The resource load on this node, sorted by resource shape.
ResourceLoad resource_load_by_shape = 7;
// Whether this node manager is requesting global GC.
bool should_global_gc = 8;
// IP address of the node.
string node_manager_address = 9;
}
message ResourceUsageBatchData {
repeated ResourcesData batch = 1;
// The total resource demand on all nodes included in the batch, sorted by
// resource shape.
ResourceLoad resource_load_by_shape = 2;
// The pending list of placement groups.
PlacementGroupLoad placement_group_load = 3;
}
// Data for a lease on task execution.
message TaskLeaseData {
// The task ID.
@@ -453,3 +356,109 @@ message PubSubMessage {
bytes id = 1;
bytes data = 2;
}
///////////////////////////////////////////////////////////////////////////////
/* Please do not modify/remove/change the following messages to maintain
backwards compatibility in autoscaler. This is necessary to make sure we can
run autoscaler with any version of ray. For example, the K8s operator runs
autoscaler in a separate pod, if the user upgrades the ray version on the head
pod autoscaler can crash (if the newer version of ray modified the messages
below). */
// Represents the demand for a particular resource shape.
message ResourceDemand {
// The resource shape requested. This is a map from the resource string
// (e.g., "CPU") to the amount requested.
map<string, double> shape = 1;
// The number of requests that are ready to run (i.e., dependencies have been
// fulfilled), but that are waiting for resources.
uint64 num_ready_requests_queued = 2;
// The number of requests for which there is no node that is a superset of
// the requested resource shape.
uint64 num_infeasible_requests_queued = 3;
// The number of requests of this shape still queued in CoreWorkers that this
// raylet knows about.
int64 backlog_size = 4;
}
// Represents the demand sorted by resource shape.
message ResourceLoad {
// A list of all resource demands. The resource shape in each demand is
// unique.
repeated ResourceDemand resource_demands = 1;
}
message ResourcesData {
// Node id.
bytes node_id = 1;
// Resource capacity currently available on this node manager.
map<string, double> resources_available = 2;
// Indicates whether available resources is changed. Only used when light
// heartbeat enabled.
bool resources_available_changed = 3;
// Total resource capacity configured for this node manager.
map<string, double> resources_total = 4;
// Aggregate outstanding resource load on this node manager.
map<string, double> resource_load = 5;
// Indicates whether resource load is changed. Only used when
// light heartbeat enabled.
bool resource_load_changed = 6;
// The resource load on this node, sorted by resource shape.
ResourceLoad resource_load_by_shape = 7;
// Whether this node manager is requesting global GC.
bool should_global_gc = 8;
// IP address of the node.
string node_manager_address = 9;
}
message ResourceUsageBatchData {
repeated ResourcesData batch = 1;
// The total resource demand on all nodes included in the batch, sorted by
// resource shape.
ResourceLoad resource_load_by_shape = 2;
// The pending list of placement groups.
PlacementGroupLoad placement_group_load = 3;
}
message PlacementGroupLoad {
// The list of pending placement group specifications.
repeated PlacementGroupTableData placement_group_data = 1;
}
message PlacementGroupTableData {
// State of a placement group.
enum PlacementGroupState {
// Placement Group is pending or scheduling
PENDING = 0;
// Placement Group is created.
CREATED = 1;
// Placement Group is already removed and won't be reschedule.
REMOVED = 2;
// Placement Group is rescheduling because the node it placed is dead.
RESCHEDULING = 3;
}
// ID of the PlacementGroup.
bytes placement_group_id = 1;
// The name of the placement group.
string name = 2;
// The array of the bundle in Placement Group.
repeated Bundle bundles = 3;
// The schedule strategy of this Placement Group.
PlacementStrategy strategy = 4;
// Current state of this placement group.
PlacementGroupState state = 5;
// Fields to detect the owner of the placement group
// for automatic lifecycle management.
// The job id that created this placement group.
bytes creator_job_id = 6;
// The actor id that created this placement group.
bytes creator_actor_id = 7;
// Whether or not if the creator job is dead.
bool creator_job_dead = 8;
// Whether or not if the creator actor is dead.
bool creator_actor_dead = 9;
// Whether the placement group is persistent.
bool is_detached = 10;
}
///////////////////////////////////////////////////////////////////////////////
+38 -30
View File
@@ -19,11 +19,6 @@ package ray.rpc;
import "src/ray/protobuf/common.proto";
import "src/ray/protobuf/gcs.proto";
message GcsStatus {
int32 code = 1;
string message = 2;
}
message AddJobRequest {
JobTableData data = 1;
}
@@ -231,31 +226,6 @@ message ReportResourceUsageReply {
GcsStatus status = 1;
}
message GetAllResourceUsageRequest {
}
message GetAllResourceUsageReply {
GcsStatus status = 1;
ResourceUsageBatchData resource_usage_data = 2;
}
// Service for node resource info access.
service NodeResourceInfoGcsService {
// Get node's resources from GCS Service.
rpc GetResources(GetResourcesRequest) returns (GetResourcesReply);
// Update resources of a node in GCS Service.
rpc UpdateResources(UpdateResourcesRequest) returns (UpdateResourcesReply);
// Delete resources of a node in GCS Service.
rpc DeleteResources(DeleteResourcesRequest) returns (DeleteResourcesReply);
// Get available resources of all nodes.
rpc GetAllAvailableResources(GetAllAvailableResourcesRequest)
returns (GetAllAvailableResourcesReply);
// Report resource usage of a node to GCS Service.
rpc ReportResourceUsage(ReportResourceUsageRequest) returns (ReportResourceUsageReply);
// Get resource usage of all nodes from GCS Service.
rpc GetAllResourceUsage(GetAllResourceUsageRequest) returns (GetAllResourceUsageReply);
}
// Service for heartbeat info access.
service HeartbeatInfoGcsService {
// Report heartbeat of a node to GCS Service.
@@ -553,3 +523,41 @@ service PlacementGroupInfoGcsService {
rpc WaitPlacementGroupUntilReady(WaitPlacementGroupUntilReadyRequest)
returns (WaitPlacementGroupUntilReadyReply);
}
///////////////////////////////////////////////////////////////////////////////
/* Please do not modify/remove/change the following messages to maintain
backwards compatibility in autoscaler. This is necessary to make sure we can
run autoscaler with any version of ray. For example, the K8s operator runs
autoscaler in a separate pod, if the user upgrades the ray version on the head
pod autoscaler can crash (if the newer version of ray modified the messages
below). */
message GetAllResourceUsageRequest {
}
message GetAllResourceUsageReply {
GcsStatus status = 1;
ResourceUsageBatchData resource_usage_data = 2;
}
// Service for node resource info access.
service NodeResourceInfoGcsService {
// Get node's resources from GCS Service.
rpc GetResources(GetResourcesRequest) returns (GetResourcesReply);
// Update resources of a node in GCS Service.
rpc UpdateResources(UpdateResourcesRequest) returns (UpdateResourcesReply);
// Delete resources of a node in GCS Service.
rpc DeleteResources(DeleteResourcesRequest) returns (DeleteResourcesReply);
// Get available resources of all nodes.
rpc GetAllAvailableResources(GetAllAvailableResourcesRequest)
returns (GetAllAvailableResourcesReply);
// Report resource usage of a node to GCS Service.
rpc ReportResourceUsage(ReportResourceUsageRequest) returns (ReportResourceUsageReply);
// Get resource usage of all nodes from GCS Service.
rpc GetAllResourceUsage(GetAllResourceUsageRequest) returns (GetAllResourceUsageReply);
}
message GcsStatus {
int32 code = 1;
string message = 2;
}
///////////////////////////////////////////////////////////////////////////////