diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 8f7b3ab6f..b69e7238f 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -17,6 +17,7 @@ py_test_module_list( "test_advanced.py", "test_advanced_2.py", "test_array.py", + "test_autoscaling_policy.py", "test_basic.py", "test_basic_2.py", "test_cancel.py", @@ -25,7 +26,6 @@ py_test_module_list( "test_error_ray_not_initialized.py", "test_gcs_fault_tolerance.py", "test_iter.py", - "test_resource_demand_scheduler.py", ], size = "medium", extra_srcs = SRCS, @@ -57,12 +57,13 @@ py_test_module_list( "test_multi_node_2.py", "test_multinode_failures_2.py", "test_multiprocessing.py", + "test_object_spilling.py", "test_output.py", "test_reference_counting_2.py", - "test_unreconstructable_errors.py", + "test_resource_demand_scheduler.py", "test_serialization.py", "test_tensorflow.py", - "test_object_spilling.py", + "test_unreconstructable_errors.py", ], size = "medium", extra_srcs = SRCS, @@ -95,7 +96,6 @@ py_test_module_list( "test_asyncio_cluster.py", "test_autoscaler.py", "test_autoscaler_yaml.py", - "test_autoscaling_policy.py", "test_component_failures.py", "test_command_runner.py", "test_coordinator_server.py", diff --git a/python/ray/tests/test_autoscaling_policy.py b/python/ray/tests/test_autoscaling_policy.py index 5322ebece..df5cc7fa8 100644 --- a/python/ray/tests/test_autoscaling_policy.py +++ b/python/ray/tests/test_autoscaling_policy.py @@ -1,11 +1,13 @@ +import collections import copy import logging import yaml import tempfile -from typing import Dict, Callable +from typing import Dict, Callable, List import shutil from queue import PriorityQueue import unittest +import pytest import ray from ray.tests.test_autoscaler import MockProvider, MockProcessRunner @@ -17,9 +19,12 @@ from ray.autoscaler._private.providers import ( from ray.autoscaler._private.autoscaler import StandardAutoscaler from ray.autoscaler._private.load_metrics import LoadMetrics from ray.autoscaler._private.node_launcher import NodeLauncher -from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND +from ray.autoscaler.tags import (TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND, + NODE_KIND_HEAD) from ray.autoscaler._private.constants import AUTOSCALER_UPDATE_INTERVAL_S from ray.autoscaler._private.cli_logger import cli_logger +from ray.core.generated.common_pb2 import Bundle, PlacementStrategy +from ray.gcs_utils import PlacementGroupTableData class Task: @@ -29,7 +34,6 @@ class Task: resources: Dict[str, float], start_callback: Callable[[None], None] = None, done_callback: Callable[[None], None] = None, - submission_time: float = None, ): self.duration = duration self.resources = resources @@ -44,11 +48,32 @@ class Actor(Task): pass +class PlacementGroup: + def __init__( + self, + duration: float, + bundles: List[Dict[str, float]], + strategy: int, + start_callback: Callable[[None], None] = None, + done_callback: Callable[[None], None] = None, + ): + self.duration = duration + self.bundles = bundles + self.strategy = strategy + self.start_callback = start_callback + self.done_callback = done_callback + self.start_time = None + self.end_time = None + self.node = None + + class Node: - def __init__(self, resources, in_cluster): + def __init__(self, resources, in_cluster, node_type, start_time): self.total_resources = copy.deepcopy(resources) self.available_resources = copy.deepcopy(resources) self.in_cluster = in_cluster + self.node_type = node_type + self.start_time = start_time def bundle_fits(self, bundle): if not self.in_cluster: @@ -93,10 +118,36 @@ class Event: SIMULATOR_EVENT_AUTOSCALER_UPDATE = 0 SIMULATOR_EVENT_TASK_DONE = 1 -SIMULATOR_EVEN_NODE_JOINED = 2 +SIMULATOR_EVENT_NODE_JOINED = 2 +SIMULATOR_EVENT_PG_DONE = 3 class Simulator: + """This autoscaler simulator consists of a few components. + + State is stored in 3 main data structures: + * Resource management state is stored in self.ip_to_nodes + * The scheduler's work queue is stored in self.work_queue + * An event queue which acts as the simulation's "timeline" in + self.event_queue + + + The logic is organized into 3 functions (and their helpers): + * self.run_autoscaler plays the role of `monitor.py` and translates + resource management state for load_metrics to consume. + * self.schedule is the only consumer of the work queue. It dispatches + work to the appropriate schedulers, which mutate cluster state and + produce events for the event queue. + * self.process_event is the sole consumer of the event queue. It + dispatches work to the appropriate event handlers. + + There are 3 main ways of interacting with the simulator: + * simulator.submit: To submit tasks + * simulator.step: To go to the next "event" + * task/actor/placement group start/done callbacks + + """ + def __init__( self, config_path, @@ -119,7 +170,7 @@ class Simulator: self.provider.create_node( {}, { - TAG_RAY_NODE_KIND: "head", + TAG_RAY_NODE_KIND: NODE_KIND_HEAD, TAG_RAY_USER_NODE_TYPE: self.config["head_node_type"], }, 1, @@ -168,12 +219,13 @@ class Simulator: node_type = node_tags[TAG_RAY_USER_NODE_TYPE] resources = self.config["available_node_types"][node_type].get( "resources", {}) - node = Node(resources, join_immediately) + node = Node(resources, join_immediately, node_type, + self.virtual_time) self.ip_to_nodes[ip] = node if not join_immediately: join_time = self.virtual_time + self.node_startup_delay_s self.event_queue.put( - Event(join_time, SIMULATOR_EVEN_NODE_JOINED, node)) + Event(join_time, SIMULATOR_EVENT_NODE_JOINED, node)) def submit(self, work): if isinstance(work, list): @@ -181,27 +233,74 @@ class Simulator: else: self.work_queue.append(work) - def _schedule_task(self, task): - for ip, node in self.ip_to_nodes.items(): - if node.bundle_fits(task.resources): - node.allocate(task.resources) - task.node = node - task.start_time = self.virtual_time - end_time = self.virtual_time + task.duration - self.event_queue.put( - Event(end_time, SIMULATOR_EVENT_TASK_DONE, task)) - if task.start_callback: - task.start_callback() - return True + def _get_node_to_run(self, bundle, nodes): + for ip, node in nodes.items(): + if node.bundle_fits(bundle): + return ip, node + return None, None - return False + def _schedule_placement_group(self, pg, nodes): + # This scheduling algorithm is bad, but it is approximately as bad as + # the real placement group scheduler. + to_allocate = [] + if (pg.strategy == PlacementStrategy.STRICT_PACK + or pg.strategy == PlacementStrategy.PACK): + combined = collections.defaultdict(float) + for bundle in pg.bundles: + for k, v in bundle.items(): + combined[k] += v + ip, node_to_run = self._get_node_to_run(combined, nodes) + if node_to_run is None: + return False + to_allocate.append((combined, ip)) + elif (pg.strategy == PlacementStrategy.STRICT_SPREAD + or pg.strategy == PlacementStrategy.SPREAD): + # TODO (Alex): More accurate handling of non-STRICT_PACK groups. + remaining_nodes = nodes.copy() + for bundle in pg.bundles: + ip, node_to_run = self._get_node_to_run( + bundle, remaining_nodes) + if node_to_run is None: + return False + del remaining_nodes[ip] + to_allocate.append((bundle, ip)) + + for bundle, ip in to_allocate: + node = self.ip_to_nodes[ip] + node.allocate(bundle) + pg.start_time = self.virtual_time + end_time = self.virtual_time + pg.duration + self.event_queue.put( + Event(end_time, SIMULATOR_EVENT_PG_DONE, (pg, to_allocate))) + if pg.start_callback: + pg.start_callback() + return True + + def _schedule_task(self, task, nodes): + ip, node = self._get_node_to_run(task.resources, nodes) + if node is None: + return False + + node.allocate(task.resources) + task.node = node + task.start_time = self.virtual_time + end_time = self.virtual_time + task.duration + self.event_queue.put(Event(end_time, SIMULATOR_EVENT_TASK_DONE, task)) + if task.start_callback: + task.start_callback() + return True def schedule(self): # TODO (Alex): Implement a more realistic scheduling algorithm. new_work_queue = [] for work in self.work_queue: if isinstance(work, Task): - scheduled = self._schedule_task(work) + scheduled = self._schedule_task(work, self.ip_to_nodes) + elif isinstance(work, PlacementGroup): + scheduled = self._schedule_placement_group( + work, self.ip_to_nodes) + else: + assert False, "Unknown work object!" if scheduled is False: new_work_queue.append(work) @@ -228,9 +327,9 @@ class Simulator: return True def run_autoscaler(self): - waiting_bundles = [] infeasible_bundles = [] + placement_groups = [] for work in self.work_queue: if isinstance(work, Task): shape = work.resources @@ -238,6 +337,16 @@ class Simulator: infeasible_bundles.append(shape) else: waiting_bundles.append(shape) + if isinstance(work, PlacementGroup): + placement_groups.append( + PlacementGroupTableData( + state=PlacementGroupTableData.PENDING, + strategy=work.strategy, + bundles=[ + Bundle(unit_resources=bundle) + for bundle in work.bundles + ], + )) for ip, node in self.ip_to_nodes.items(): if not node.in_cluster: @@ -245,13 +354,11 @@ class Simulator: self.load_metrics.update( ip=ip, static_resources=node.total_resources, - update_dynamic_resources=True, dynamic_resources=node.available_resources, - update_resource_load=False, resource_load={}, waiting_bundles=waiting_bundles, infeasible_bundles=infeasible_bundles, - pending_placement_groups=[], + pending_placement_groups=placement_groups, ) self.autoscaler.update() @@ -269,9 +376,17 @@ class Simulator: task.node.free(task.resources) if task.done_callback: task.done_callback() - elif event.event_type == SIMULATOR_EVEN_NODE_JOINED: + elif event.event_type == SIMULATOR_EVENT_NODE_JOINED: node = event.data node.in_cluster = True + elif event.event_type == SIMULATOR_EVENT_PG_DONE: + pg, allocated = event.data + for bundle, ip in allocated: + self.ip_to_nodes[ip].free(bundle) + if pg.done_callback: + pg.done_callback() + else: + assert False, "Unknown event!" def step(self): self.virtual_time = self.event_queue.queue[0].time @@ -282,9 +397,29 @@ class Simulator: print(self.info_string()) return self.virtual_time + def node_costs(self): + """Returns the cost of nodes. Cost is measured in terms of cumulative hours of + runtime per node type. + + """ + costs = collections.defaultdict(float) + + for node in self.ip_to_nodes.values(): + if not node.in_cluster: + continue + runtime = self.virtual_time - node.start_time + costs[node.node_type] += runtime + return costs + def info_string(self): - return (f"[t={self.virtual_time}] Nodes: {len(self.ip_to_nodes)} " + - f"Remaining requests: {len(self.work_queue)} ") + num_connected_nodes = len( + [node for node in self.ip_to_nodes.values() if node.in_cluster]) + num_pending_nodes = len(self.ip_to_nodes) - num_connected_nodes + return f"""[t={self.virtual_time}] + Connected nodes: {num_connected_nodes} + Pending nodes: {num_pending_nodes} + Remaining requests: {len(self.work_queue)} + """ SAMPLE_CLUSTER_CONFIG = copy.deepcopy(MULTI_WORKER_CLUSTER) @@ -330,7 +465,6 @@ class AutoscalingPolicyTest(unittest.TestCase): return path def testManyTasks(self): - cli_logger.configure(log_style="record", verbosity=-1) config = copy.deepcopy(SAMPLE_CLUSTER_CONFIG) config_path = self.write_config(config) self.provider = MockProvider() @@ -354,10 +488,11 @@ class AutoscalingPolicyTest(unittest.TestCase): while done_count < len(tasks): time = simulator.step() - assert time < 400 + assert time < 850 + # TODO (Alex): Not clear what's actually worth asserting here. + assert simulator.node_costs() def testManyActors(self): - # cli_logger.configure(log_style="record", verbosity=-1) config = copy.deepcopy(SAMPLE_CLUSTER_CONFIG) config_path = self.write_config(config) self.provider = MockProvider() @@ -382,4 +517,75 @@ class AutoscalingPolicyTest(unittest.TestCase): while start_count < len(tasks): time = simulator.step() - assert time < 200 + assert time < 650 + + def testManyPlacementGroups(self): + config = copy.deepcopy(SAMPLE_CLUSTER_CONFIG) + config_path = self.write_config(config) + self.provider = MockProvider() + simulator = Simulator(config_path, self.provider) + + start_count = 0 + + def start_callback(): + nonlocal start_count + start_count += 1 + + placement_group_requests = [] + + for _ in range(500): + placement_group_requests.append( + PlacementGroup( + duration=float("inf"), + bundles=[{ + "CPU": 1 + }, { + "CPU": 2 + }], + strategy=PlacementStrategy.STRICT_PACK, + start_callback=start_callback, + )) + + for _ in range(500): + placement_group_requests.append( + PlacementGroup( + duration=float("inf"), + bundles=[{ + "CPU": 1 + }, { + "CPU": 2 + }], + strategy=PlacementStrategy.STRICT_SPREAD, + start_callback=start_callback, + )) + + # SPREAD and PACK tests fail, but under the real GCS placement group + # scheduling algorithm we could also be left in a situation in which + # the autoscaler thinks the placement group is placeable, but the + # placement group scheduler doesn't know how to schedule it. + + # for _ in range(500): + # placement_group_requests.append(PlacementGroup( + # duration=float("inf"), bundles=[{"CPU": 1}, {"CPU": 2}], + # strategy=PlacementStrategy.PACK, + # start_callback=start_callback)) + + # for _ in range(500): + # placement_group_requests.append(PlacementGroup( + # duration=float("inf"), + # bundles=[{"CPU": 2}, {"CPU": 1}], + # strategy=PlacementStrategy.SPREAD, + # start_callback=start_callback)) + + simulator.submit(placement_group_requests) + + time = 0 + while start_count < len(placement_group_requests): + time = simulator.step() + + assert time < 630 + + +if __name__ == "__main__": + import sys + sys.exit(pytest.main(["-v", __file__]))