diff --git a/doc/source/cluster/autoscaling.rst b/doc/source/cluster/autoscaling.rst index 265d5b014..af091c815 100644 --- a/doc/source/cluster/autoscaling.rst +++ b/doc/source/cluster/autoscaling.rst @@ -38,6 +38,12 @@ The basic autoscaling config settings are as follows: # considered idle if there are no tasks or actors running on it. idle_timeout_minutes: 5 +Programmatically Scaling a Cluster +---------------------------------- + +From within a Ray program, you can command the autoscaler to scale the cluster up to a desired size with a ``request_resources()`` call. The cluster will immediately attempt to scale to accommodate the requested resources, bypassing the normal upscaling delay. + +.. autofunction:: ray.autoscaler.sdk.request_resources Manually Adding Nodes without Resources (Unmanaged Nodes) --------------------------------------------------------- @@ -159,4 +165,4 @@ The following configuration is for a GPU enabled node type: worker_image: - rayproject/ray-ml:latest-gpu worker_run_options: # Appended to top-level docker field. - - "-v /home:/home" \ No newline at end of file + - "-v /home:/home" diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 1fb973bd1..3b9373383 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -209,17 +209,14 @@ class StandardAutoscaler: # First let the resource demand scheduler launch nodes, if enabled. 
if self.resource_demand_scheduler: - resource_demand_vector = self.resource_demand_vector + \ - self.load_metrics.get_resource_demand_vector() - pending_placement_groups = \ - self.load_metrics.get_pending_placement_groups() to_launch = self.resource_demand_scheduler.get_nodes_to_launch( self.provider.non_terminated_nodes(tag_filters={}), self.pending_launches.breakdown(), - resource_demand_vector, + self.load_metrics.get_resource_demand_vector(), self.load_metrics.get_resource_utilization(), - pending_placement_groups, - self.load_metrics.get_static_node_resources_by_ip()) + self.load_metrics.get_pending_placement_groups(), + self.load_metrics.get_static_node_resources_by_ip(), + ensure_min_cluster_size=self.resource_demand_vector) for node_type, count in to_launch.items(): self.launch_new_node(count, node_type=node_type) @@ -563,7 +560,11 @@ class StandardAutoscaler: "StandardAutoscaler: Queue {} new nodes for launch".format(count)) self.pending_launches.inc(node_type, count) config = copy.deepcopy(self.config) - self.launch_queue.put((config, count, node_type)) + # Split into individual launch requests of the max batch size. + while count > 0: + self.launch_queue.put((config, min(count, self.max_launch_batch), + node_type)) + count -= self.max_launch_batch def all_workers(self): return self.workers() + self.unmanaged_workers() diff --git a/python/ray/autoscaler/_private/commands.py b/python/ray/autoscaler/_private/commands.py index df0bd4aa3..b4cfb93e2 100644 --- a/python/ray/autoscaler/_private/commands.py +++ b/python/ray/autoscaler/_private/commands.py @@ -19,6 +19,7 @@ try: # py3 except ImportError: # py2 from pipes import quote +import ray from ray.experimental.internal_kv import _internal_kv_get import ray._private.services as services from ray.autoscaler.node_provider import NodeProvider @@ -104,22 +105,23 @@ def request_resources(num_cpus: Optional[int] = None, This function is to be called e.g. 
on a node before submitting a bunch of ray.remote calls to ensure that resources rapidly become available. - This function is EXPERIMENTAL. - Args: - num_cpus: int -- the number of CPU cores to request - bundles: List[dict] -- list of resource dicts (e.g., {"CPU": 1}). This - only has an effect if you've configured `available_node_types` - if your cluster config. + num_cpus (int): Scale the cluster to ensure this number of CPUs are + available. This request is persistent until another call to + request_resources() is made. + bundles (List[ResourceDict]): Scale the cluster to ensure this set of + resource shapes can fit. This request is persistent until another + call to request_resources() is made. """ + if not ray.is_initialized(): + raise RuntimeError("Ray is not initialized yet") r = _redis() - if num_cpus is not None and num_cpus > 0: - r.publish(AUTOSCALER_RESOURCE_REQUEST_CHANNEL, - json.dumps({ - "CPU": num_cpus - })) + to_request = [] + if num_cpus: + to_request += [{"CPU": 1}] * num_cpus if bundles: - r.publish(AUTOSCALER_RESOURCE_REQUEST_CHANNEL, json.dumps(bundles)) + to_request += bundles + r.publish(AUTOSCALER_RESOURCE_REQUEST_CHANNEL, json.dumps(to_request)) def create_or_update_cluster(config_file: str, diff --git a/python/ray/autoscaler/_private/resource_demand_scheduler.py b/python/ray/autoscaler/_private/resource_demand_scheduler.py index 15944c1d8..4a0bcdee5 100644 --- a/python/ray/autoscaler/_private/resource_demand_scheduler.py +++ b/python/ray/autoscaler/_private/resource_demand_scheduler.py @@ -52,11 +52,14 @@ class ResourceDemandScheduler: and NODE_TYPE_LEGACY_WORKER in node_types) def get_nodes_to_launch( - self, nodes: List[NodeID], pending_nodes: Dict[NodeType, int], + self, + nodes: List[NodeID], + pending_nodes: Dict[NodeType, int], resource_demands: List[ResourceDict], - usage_by_ip: Dict[NodeIP, ResourceDict], + unused_resources_by_ip: Dict[NodeIP, ResourceDict], pending_placement_groups: List[PlacementGroupTableData], - 
static_node_resources: Dict[NodeIP, ResourceDict] + max_resources_by_ip: Dict[NodeIP, ResourceDict], + ensure_min_cluster_size: List[ResourceDict] = None, ) -> Dict[NodeType, int]: """Given resource demands, return node types to add to the cluster. @@ -74,19 +77,40 @@ class ResourceDemandScheduler: nodes: List of existing nodes in the cluster. pending_nodes: Summary of node types currently being launched. resource_demands: Vector of resource demands from the scheduler. - usage_by_ip: Mapping from ip to available resources. + unused_resources_by_ip: Mapping from ip to available resources. pending_placement_groups: Placement group demands. - static_node_resources: Mapping from ip to static node resources. + max_resources_by_ip: Mapping from ip to static node resources. + ensure_min_cluster_size: Try to ensure the cluster can fit at least + this set of resources. This differs from resources_demands in + that we don't take into account existing usage. """ + + # If the user is using request_resources() API, calculate the remaining + # delta resources required to meet their requested cluster size. + if ensure_min_cluster_size is not None: + used_resources = [] + for ip, max_res in max_resources_by_ip.items(): + res = copy.deepcopy(max_res) + _inplace_subtract(res, unused_resources_by_ip.get(ip, {})) + used_resources.append(res) + # Example: user requests 1000 CPUs, but the cluster is currently + # 500 CPUs in size with 250 used. Then, the delta is 750 CPUs that + # we need to fit to get the cluster to scale to 1000. + resource_requests, _ = get_bin_pack_residual( + used_resources, ensure_min_cluster_size) + resource_demands += resource_requests + else: + resource_requests = [] + if self.is_legacy_yaml: # When using legacy yaml files we need to infer the head & worker # node resources from the static node resources from LoadMetrics. 
- self._infer_legacy_node_resources_if_needed(static_node_resources) + self._infer_legacy_node_resources_if_needed(max_resources_by_ip) node_resources: List[ResourceDict] node_type_counts: Dict[NodeType, int] - node_resources, node_type_counts = \ - self.calculate_node_resources(nodes, pending_nodes, usage_by_ip) + node_resources, node_type_counts = self.calculate_node_resources( + nodes, pending_nodes, unused_resources_by_ip) logger.info("Cluster resources: {}".format(node_resources)) logger.info("Node counts: {}".format(node_type_counts)) @@ -118,6 +142,12 @@ class ResourceDemandScheduler: logger.info("Resource demands: {}".format(resource_demands)) logger.info("Unfulfilled demands: {}".format(unfulfilled)) max_to_add = self.max_workers - sum(node_type_counts.values()) + if resource_requests: + nodes_to_add_based_on_requests = get_nodes_for( + self.node_types, node_type_counts, max_to_add, + resource_requests) + else: + nodes_to_add_based_on_requests = {} nodes_to_add_based_on_demand = get_nodes_for( self.node_types, node_type_counts, max_to_add, unfulfilled) # Merge nodes to add based on demand and nodes to add based on @@ -133,7 +163,8 @@ class ResourceDemandScheduler: # Limit the number of concurrent launches total_nodes_to_add = self._get_concurrent_resource_demand_to_launch( - total_nodes_to_add, usage_by_ip.keys(), nodes, pending_nodes) + total_nodes_to_add, unused_resources_by_ip.keys(), nodes, + pending_nodes, nodes_to_add_based_on_requests) logger.info("Node requests: {}".format(total_nodes_to_add)) return total_nodes_to_add @@ -168,23 +199,23 @@ class ResourceDemandScheduler: return {} def _infer_legacy_node_resources_if_needed( - self, static_node_resources: Dict[NodeIP, ResourceDict] + self, max_resources_by_ip: Dict[NodeIP, ResourceDict] ) -> (bool, Dict[NodeType, int]): """Infers node resources for legacy config files. Updates the resources of the head and worker node types in self.node_types. 
Args: - static_node_resources: Mapping from ip to static node resources. + max_resources_by_ip: Mapping from ip to static node resources. """ # We fill the head node resources only once. if not self.node_types[NODE_TYPE_LEGACY_HEAD]["resources"]: - assert len(static_node_resources) == 1 # Only the head node. + assert len(max_resources_by_ip) == 1 # Only the head node. self.node_types[NODE_TYPE_LEGACY_HEAD]["resources"] = next( - iter(static_node_resources.values())) + iter(max_resources_by_ip.values())) # We fill the worker node resources only once. if not self.node_types[NODE_TYPE_LEGACY_WORKER]["resources"]: - if len(static_node_resources) > 1: + if len(max_resources_by_ip) > 1: # Set the node_types here as we already launched a worker node # from which we directly get the node_resources. worker_nodes = self.provider.non_terminated_nodes( @@ -194,15 +225,18 @@ class ResourceDemandScheduler: for node_id in worker_nodes ] for ip in worker_node_ips: - if ip in static_node_resources: + if ip in max_resources_by_ip: self.node_types[NODE_TYPE_LEGACY_WORKER][ - "resources"] = static_node_resources[ip] + "resources"] = max_resources_by_ip[ip] assert self.node_types[NODE_TYPE_LEGACY_WORKER]["resources"] def _get_concurrent_resource_demand_to_launch( - self, to_launch: Dict[NodeType, int], - connected_nodes: List[NodeIP], non_terminated_nodes: List[NodeID], - pending_launches_nodes: Dict[NodeType, int] + self, + to_launch: Dict[NodeType, int], + connected_nodes: List[NodeIP], + non_terminated_nodes: List[NodeID], + pending_launches_nodes: Dict[NodeType, int], + nodes_to_add_based_on_requests: Dict[NodeType, int], ) -> Dict[NodeType, int]: """Updates the max concurrent resources to launch for each node type. @@ -221,6 +255,9 @@ class ResourceDemandScheduler: connected_nodes: Running nodes (from LoadMetrics). non_terminated_nodes: Non terminated nodes (pending/running). pending_launches_nodes: Nodes that are in the launch queue. 
+ nodes_to_add_based_on_requests: Nodes to launch to satisfy + request_resources(). This overrides the launch limits since the + user is hinting to immediately scale up to this size. Returns: Dict[NodeType, int]: Maximum number of nodes to launch for each node type. @@ -240,15 +277,20 @@ class ResourceDemandScheduler: total_pending_nodes = pending_launches_nodes.get( node_type, 0) + pending_nodes[node_type] - # Allow more nodes if this is to respect min_workers constraint. - nodes_to_add = max( + upper_bound = max( max_allowed_pending_nodes - total_pending_nodes, - self.node_types[node_type].get("min_workers", 0) - - total_pending_nodes - running_nodes[node_type]) - if nodes_to_add > 0: + # Allow more nodes if this is to respect min_workers. + self.node_types[node_type].get("min_workers", 0) - + total_pending_nodes - running_nodes[node_type], + + # Allow more nodes from request_resources API. + nodes_to_add_based_on_requests.get(node_type, + 0) - total_pending_nodes) + + if upper_bound > 0: updated_nodes_to_launch[node_type] = min( - nodes_to_add, to_launch[node_type]) + upper_bound, to_launch[node_type]) return updated_nodes_to_launch @@ -274,7 +316,7 @@ class ResourceDemandScheduler: def calculate_node_resources( self, nodes: List[NodeID], pending_nodes: Dict[NodeID, int], - usage_by_ip: Dict[str, ResourceDict] + unused_resources_by_ip: Dict[str, ResourceDict] ) -> (List[ResourceDict], Dict[NodeType, int]): """Returns node resource list and node type counts. 
@@ -317,7 +359,7 @@ class ResourceDemandScheduler: if TAG_RAY_USER_NODE_TYPE in tags: node_type = tags[TAG_RAY_USER_NODE_TYPE] ip = self.provider.internal_ip(node_id) - available_resources = usage_by_ip.get(ip) + available_resources = unused_resources_by_ip.get(ip) add_node(node_type, available_resources) for node_type, count in pending_nodes.items(): @@ -375,9 +417,9 @@ class ResourceDemandScheduler: def debug_string(self, nodes: List[NodeID], pending_nodes: Dict[NodeID, int], - usage_by_ip: Dict[str, ResourceDict]) -> str: + unused_resources_by_ip: Dict[str, ResourceDict]) -> str: node_resources, node_type_counts = self.calculate_node_resources( - nodes, pending_nodes, usage_by_ip) + nodes, pending_nodes, unused_resources_by_ip) out = "Worker node types:" for node_type, count in node_type_counts.items(): @@ -537,13 +579,14 @@ def get_bin_pack_residual(node_resources: List[ResourceDict], Returns: List[ResourceDict] the residual list resources that do not fit. - + List[ResourceDict]: The updated node_resources after the method. """ unfulfilled = [] # A most naive bin packing algorithm. nodes = copy.deepcopy(node_resources) + # List of nodes that cannot be used again due to strict spread. used = [] for demand in resource_demands: found = False diff --git a/python/ray/autoscaler/sdk.py b/python/ray/autoscaler/sdk.py index fd35a16ac..c3ba09d51 100644 --- a/python/ray/autoscaler/sdk.py +++ b/python/ray/autoscaler/sdk.py @@ -180,13 +180,21 @@ def request_resources(num_cpus: Optional[int] = None, This function is to be called e.g. on a node before submitting a bunch of ray.remote calls to ensure that resources rapidly become available. - This function is EXPERIMENTAL. - Args: - num_cpus: int -- the number of CPU cores to request - bundles: List[dict] -- list of resource dicts (e.g., {"CPU": 1}). This - only has an effect if you've configured `available_node_types` - if your cluster config. + num_cpus (int): Scale the cluster to ensure this number of CPUs are + available. 
This request is persistent until another call to + request_resources() is made. + bundles (List[ResourceDict]): Scale the cluster to ensure this set of + resource shapes can fit. This request is persistent until another + call to request_resources() is made. + + Examples: + >>> # Request 1000 CPUs. + >>> request_resources(num_cpus=1000) + >>> # Request 64 CPUs and also fit a 1-GPU/4-CPU task. + >>> request_resources(num_cpus=64, bundles=[{"GPU": 1, "CPU": 4}]) + >>> # Same as requesting num_cpus=3. + >>> request_resources(bundles=[{"CPU": 1}, {"CPU": 1}, {"CPU": 1}]) """ return commands.request_resources(num_cpus, bundles) diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index 376910afe..b305edca9 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -371,6 +371,81 @@ def test_calculate_node_resources(): assert to_launch == {"p2.8xlarge": 1} +def test_request_resources_existing_usage(): + provider = MockProvider() + TYPES = { + "p2.8xlarge": { + "node_config": {}, + "resources": { + "CPU": 32, + "GPU": 8 + }, + "max_workers": 40, + }, + } + scheduler = ResourceDemandScheduler(provider, TYPES, max_workers=100) + + # 2 nodes with 32 CPU and 8 GPU each + provider.create_node({}, { + TAG_RAY_USER_NODE_TYPE: "p2.8xlarge", + TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE + }, 2) + all_nodes = provider.non_terminated_nodes({}) + node_ips = provider.non_terminated_node_ips({}) + assert len(node_ips) == 2, node_ips + + # Fully utilized, no requests. + avail_by_ip = {ip: {} for ip in node_ips} + max_by_ip = {ip: {"GPU": 8, "CPU": 32} for ip in node_ips} + demands = [] + to_launch = scheduler.get_nodes_to_launch(all_nodes, {}, [], avail_by_ip, + [], max_by_ip, demands) + assert len(to_launch) == 0, to_launch + + # Fully utilized, resource requests exactly equal. 
+ avail_by_ip = {ip: {} for ip in node_ips} + demands = [{"GPU": 4}] * 4 + to_launch = scheduler.get_nodes_to_launch(all_nodes, {}, [], avail_by_ip, + [], max_by_ip, demands) + assert len(to_launch) == 0, to_launch + + # Fully utilized, resource requests in excess. + avail_by_ip = {ip: {} for ip in node_ips} + demands = [{"GPU": 4}] * 7 + to_launch = scheduler.get_nodes_to_launch(all_nodes, {}, [], avail_by_ip, + [], max_by_ip, demands) + assert to_launch.get("p2.8xlarge") == 2, to_launch + + # Not utilized, no requests. + avail_by_ip = {ip: {"GPU": 4, "CPU": 32} for ip in node_ips} + demands = [] + to_launch = scheduler.get_nodes_to_launch(all_nodes, {}, [], avail_by_ip, + [], max_by_ip, demands) + assert len(to_launch) == 0, to_launch + + # Not utilized, resource requests exactly equal. + avail_by_ip = {ip: {"GPU": 4, "CPU": 32} for ip in node_ips} + demands = [{"GPU": 4}] * 4 + to_launch = scheduler.get_nodes_to_launch(all_nodes, {}, [], avail_by_ip, + [], max_by_ip, demands) + assert len(to_launch) == 0, to_launch + + # Not utilized, resource requests in excess. + avail_by_ip = {ip: {"GPU": 4, "CPU": 32} for ip in node_ips} + demands = [{"GPU": 4}] * 7 + to_launch = scheduler.get_nodes_to_launch(all_nodes, {}, [], avail_by_ip, + [], max_by_ip, demands) + assert to_launch.get("p2.8xlarge") == 2, to_launch + + # Not utilized, resource requests hugely in excess. + avail_by_ip = {ip: {"GPU": 4, "CPU": 32} for ip in node_ips} + demands = [{"GPU": 4}] * 70 + to_launch = scheduler.get_nodes_to_launch(all_nodes, {}, [], avail_by_ip, + [], max_by_ip, demands) + # This bypasses the launch rate limit. + assert to_launch.get("p2.8xlarge") == 33, to_launch + + def test_backlog_queue_impact_on_binpacking_time(): new_types = copy.deepcopy(TYPES_A) new_types["p2.8xlarge"]["max_workers"] = 1000 @@ -579,7 +654,7 @@ def test_get_concurrent_resource_demand_to_launch(): # Sanity check. 
updated_to_launch = \ - scheduler._get_concurrent_resource_demand_to_launch({}, [], [], {}) + scheduler._get_concurrent_resource_demand_to_launch({}, [], [], {}, {}) assert updated_to_launch == {} provider.create_node({}, { @@ -598,7 +673,7 @@ def test_get_concurrent_resource_demand_to_launch(): connected_nodes = [] # All the non_terminated_nodes are not connected yet. updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch( to_launch, connected_nodes, non_terminated_nodes, - pending_launches_nodes) + pending_launches_nodes, {}) # Note: we have 2 pending/launching gpus, 3 pending/launching cpus, # 0 running gpu, and 0 running cpus. assert updated_to_launch == {"p2.8xlarge": 3, "m4.large": 2} @@ -611,7 +686,7 @@ def test_get_concurrent_resource_demand_to_launch(): ] updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch( to_launch, connected_nodes, non_terminated_nodes, - pending_launches_nodes) + pending_launches_nodes, {}) # Note that here we have 1 launching gpu, 1 launching cpu, # 1 running gpu, and 2 running cpus. assert updated_to_launch == {"p2.8xlarge": 4, "m4.large": 4} @@ -632,7 +707,7 @@ def test_get_concurrent_resource_demand_to_launch(): pending_launches_nodes = {} # No pending launches updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch( to_launch, connected_nodes, non_terminated_nodes, - pending_launches_nodes) + pending_launches_nodes, {}) # Note: we have 5 pending cpus. So we are not allowed to start any. # Still only 2 running cpus. assert updated_to_launch == {} @@ -643,7 +718,7 @@ def test_get_concurrent_resource_demand_to_launch(): ] updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch( to_launch, connected_nodes, non_terminated_nodes, - pending_launches_nodes) + pending_launches_nodes, {}) # Note: that here we have 7 running cpus and nothing pending/launching. 
assert updated_to_launch == {"m4.large": 7} @@ -659,7 +734,7 @@ def test_get_concurrent_resource_demand_to_launch(): pending_launches_nodes = {"m4.large": 1} updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch( to_launch, connected_nodes, non_terminated_nodes, - pending_launches_nodes) + pending_launches_nodes, {}) # Note: we have 8 pending/launching cpus and only 7 running. # So we should not launch anything (8 < 7). assert updated_to_launch == {} @@ -670,7 +745,7 @@ def test_get_concurrent_resource_demand_to_launch(): ] updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch( to_launch, connected_nodes, non_terminated_nodes, - pending_launches_nodes) + pending_launches_nodes, {}) # Note: that here we have 14 running cpus and 1 launching. assert updated_to_launch == {"m4.large": 13}