[Autoscaler] simulator placement groups (#11777)

This commit is contained in:
Alex Wu
2020-11-10 18:10:36 -08:00
committed by GitHub
parent 46f3652102
commit 8afd2acdc1
2 changed files with 243 additions and 37 deletions
+4 -4
View File
@@ -17,6 +17,7 @@ py_test_module_list(
"test_advanced.py",
"test_advanced_2.py",
"test_array.py",
"test_autoscaling_policy.py",
"test_basic.py",
"test_basic_2.py",
"test_cancel.py",
@@ -25,7 +26,6 @@ py_test_module_list(
"test_error_ray_not_initialized.py",
"test_gcs_fault_tolerance.py",
"test_iter.py",
"test_resource_demand_scheduler.py",
],
size = "medium",
extra_srcs = SRCS,
@@ -57,12 +57,13 @@ py_test_module_list(
"test_multi_node_2.py",
"test_multinode_failures_2.py",
"test_multiprocessing.py",
"test_object_spilling.py",
"test_output.py",
"test_reference_counting_2.py",
"test_unreconstructable_errors.py",
"test_resource_demand_scheduler.py",
"test_serialization.py",
"test_tensorflow.py",
"test_object_spilling.py",
"test_unreconstructable_errors.py",
],
size = "medium",
extra_srcs = SRCS,
@@ -95,7 +96,6 @@ py_test_module_list(
"test_asyncio_cluster.py",
"test_autoscaler.py",
"test_autoscaler_yaml.py",
"test_autoscaling_policy.py",
"test_component_failures.py",
"test_command_runner.py",
"test_coordinator_server.py",
+239 -33
View File
@@ -1,11 +1,13 @@
import collections
import copy
import logging
import yaml
import tempfile
from typing import Dict, Callable
from typing import Dict, Callable, List
import shutil
from queue import PriorityQueue
import unittest
import pytest
import ray
from ray.tests.test_autoscaler import MockProvider, MockProcessRunner
@@ -17,9 +19,12 @@ from ray.autoscaler._private.providers import (
from ray.autoscaler._private.autoscaler import StandardAutoscaler
from ray.autoscaler._private.load_metrics import LoadMetrics
from ray.autoscaler._private.node_launcher import NodeLauncher
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND
from ray.autoscaler.tags import (TAG_RAY_USER_NODE_TYPE, TAG_RAY_NODE_KIND,
NODE_KIND_HEAD)
from ray.autoscaler._private.constants import AUTOSCALER_UPDATE_INTERVAL_S
from ray.autoscaler._private.cli_logger import cli_logger
from ray.core.generated.common_pb2 import Bundle, PlacementStrategy
from ray.gcs_utils import PlacementGroupTableData
class Task:
@@ -29,7 +34,6 @@ class Task:
resources: Dict[str, float],
start_callback: Callable[[None], None] = None,
done_callback: Callable[[None], None] = None,
submission_time: float = None,
):
self.duration = duration
self.resources = resources
@@ -44,11 +48,32 @@ class Actor(Task):
pass
class PlacementGroup:
def __init__(
self,
duration: float,
bundles: List[Dict[str, float]],
strategy: int,
start_callback: Callable[[None], None] = None,
done_callback: Callable[[None], None] = None,
):
self.duration = duration
self.bundles = bundles
self.strategy = strategy
self.start_callback = start_callback
self.done_callback = done_callback
self.start_time = None
self.end_time = None
self.node = None
class Node:
def __init__(self, resources, in_cluster):
def __init__(self, resources, in_cluster, node_type, start_time):
self.total_resources = copy.deepcopy(resources)
self.available_resources = copy.deepcopy(resources)
self.in_cluster = in_cluster
self.node_type = node_type
self.start_time = start_time
def bundle_fits(self, bundle):
if not self.in_cluster:
@@ -93,10 +118,36 @@ class Event:
SIMULATOR_EVENT_AUTOSCALER_UPDATE = 0
SIMULATOR_EVENT_TASK_DONE = 1
SIMULATOR_EVEN_NODE_JOINED = 2
SIMULATOR_EVENT_NODE_JOINED = 2
SIMULATOR_EVENT_PG_DONE = 3
class Simulator:
"""This autoscaler simulator consists of a few components.
State is stored in 3 main data structures:
* Resource management state is stored in self.ip_to_nodes
* The scheduler's work queue is stored in self.work_queue
* An event queue which acts as the simulation's "timeline" in
self.event_queue
The logic is organized into 3 functions (and their helpers):
* self.run_autoscaler plays the role of `monitor.py` and translates
resource management state for load_metrics to consume.
* self.schedule is the only consumer of the work queue. It dispatches
work to the appropriate schedulers, which mutate cluster state and
produce events for the event queue.
* self.process_event is the sole consumer of the event queue. It
dispatches work to the appropriate event handlers.
There are 3 main ways of interacting with the simulator:
* simulator.submit: To submit tasks
* simulator.step: To go to the next "event"
* task/actor/placement group start/done callbacks
"""
def __init__(
self,
config_path,
@@ -119,7 +170,7 @@ class Simulator:
self.provider.create_node(
{},
{
TAG_RAY_NODE_KIND: "head",
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
TAG_RAY_USER_NODE_TYPE: self.config["head_node_type"],
},
1,
@@ -168,12 +219,13 @@ class Simulator:
node_type = node_tags[TAG_RAY_USER_NODE_TYPE]
resources = self.config["available_node_types"][node_type].get(
"resources", {})
node = Node(resources, join_immediately)
node = Node(resources, join_immediately, node_type,
self.virtual_time)
self.ip_to_nodes[ip] = node
if not join_immediately:
join_time = self.virtual_time + self.node_startup_delay_s
self.event_queue.put(
Event(join_time, SIMULATOR_EVEN_NODE_JOINED, node))
Event(join_time, SIMULATOR_EVENT_NODE_JOINED, node))
def submit(self, work):
if isinstance(work, list):
@@ -181,27 +233,74 @@ class Simulator:
else:
self.work_queue.append(work)
def _schedule_task(self, task):
for ip, node in self.ip_to_nodes.items():
if node.bundle_fits(task.resources):
node.allocate(task.resources)
task.node = node
task.start_time = self.virtual_time
end_time = self.virtual_time + task.duration
self.event_queue.put(
Event(end_time, SIMULATOR_EVENT_TASK_DONE, task))
if task.start_callback:
task.start_callback()
return True
def _get_node_to_run(self, bundle, nodes):
for ip, node in nodes.items():
if node.bundle_fits(bundle):
return ip, node
return None, None
return False
def _schedule_placement_group(self, pg, nodes):
# This scheduling algorithm is bad, but it is approximately as bad as
# the real placement group scheduler.
to_allocate = []
if (pg.strategy == PlacementStrategy.STRICT_PACK
or pg.strategy == PlacementStrategy.PACK):
combined = collections.defaultdict(float)
for bundle in pg.bundles:
for k, v in bundle.items():
combined[k] += v
ip, node_to_run = self._get_node_to_run(combined, nodes)
if node_to_run is None:
return False
to_allocate.append((combined, ip))
elif (pg.strategy == PlacementStrategy.STRICT_SPREAD
or pg.strategy == PlacementStrategy.SPREAD):
# TODO (Alex): More accurate handling of non-STRICT_PACK groups.
remaining_nodes = nodes.copy()
for bundle in pg.bundles:
ip, node_to_run = self._get_node_to_run(
bundle, remaining_nodes)
if node_to_run is None:
return False
del remaining_nodes[ip]
to_allocate.append((bundle, ip))
for bundle, ip in to_allocate:
node = self.ip_to_nodes[ip]
node.allocate(bundle)
pg.start_time = self.virtual_time
end_time = self.virtual_time + pg.duration
self.event_queue.put(
Event(end_time, SIMULATOR_EVENT_PG_DONE, (pg, to_allocate)))
if pg.start_callback:
pg.start_callback()
return True
def _schedule_task(self, task, nodes):
ip, node = self._get_node_to_run(task.resources, nodes)
if node is None:
return False
node.allocate(task.resources)
task.node = node
task.start_time = self.virtual_time
end_time = self.virtual_time + task.duration
self.event_queue.put(Event(end_time, SIMULATOR_EVENT_TASK_DONE, task))
if task.start_callback:
task.start_callback()
return True
def schedule(self):
# TODO (Alex): Implement a more realistic scheduling algorithm.
new_work_queue = []
for work in self.work_queue:
if isinstance(work, Task):
scheduled = self._schedule_task(work)
scheduled = self._schedule_task(work, self.ip_to_nodes)
elif isinstance(work, PlacementGroup):
scheduled = self._schedule_placement_group(
work, self.ip_to_nodes)
else:
assert False, "Unknown work object!"
if scheduled is False:
new_work_queue.append(work)
@@ -228,9 +327,9 @@ class Simulator:
return True
def run_autoscaler(self):
waiting_bundles = []
infeasible_bundles = []
placement_groups = []
for work in self.work_queue:
if isinstance(work, Task):
shape = work.resources
@@ -238,6 +337,16 @@ class Simulator:
infeasible_bundles.append(shape)
else:
waiting_bundles.append(shape)
if isinstance(work, PlacementGroup):
placement_groups.append(
PlacementGroupTableData(
state=PlacementGroupTableData.PENDING,
strategy=work.strategy,
bundles=[
Bundle(unit_resources=bundle)
for bundle in work.bundles
],
))
for ip, node in self.ip_to_nodes.items():
if not node.in_cluster:
@@ -245,13 +354,11 @@ class Simulator:
self.load_metrics.update(
ip=ip,
static_resources=node.total_resources,
update_dynamic_resources=True,
dynamic_resources=node.available_resources,
update_resource_load=False,
resource_load={},
waiting_bundles=waiting_bundles,
infeasible_bundles=infeasible_bundles,
pending_placement_groups=[],
pending_placement_groups=placement_groups,
)
self.autoscaler.update()
@@ -269,9 +376,17 @@ class Simulator:
task.node.free(task.resources)
if task.done_callback:
task.done_callback()
elif event.event_type == SIMULATOR_EVEN_NODE_JOINED:
elif event.event_type == SIMULATOR_EVENT_NODE_JOINED:
node = event.data
node.in_cluster = True
elif event.event_type == SIMULATOR_EVENT_PG_DONE:
pg, allocated = event.data
for bundle, ip in allocated:
self.ip_to_nodes[ip].free(bundle)
if pg.done_callback:
pg.done_callback()
else:
assert False, "Unknown event!"
def step(self):
self.virtual_time = self.event_queue.queue[0].time
@@ -282,9 +397,29 @@ class Simulator:
print(self.info_string())
return self.virtual_time
def node_costs(self):
"""Returns the cost of nodes. Cost is measured in terms of cumulative hours of
runtime per node type.
"""
costs = collections.defaultdict(float)
for node in self.ip_to_nodes.values():
if not node.in_cluster:
continue
runtime = self.virtual_time - node.start_time
costs[node.node_type] += runtime
return costs
def info_string(self):
return (f"[t={self.virtual_time}] Nodes: {len(self.ip_to_nodes)} " +
f"Remaining requests: {len(self.work_queue)} ")
num_connected_nodes = len(
[node for node in self.ip_to_nodes.values() if node.in_cluster])
num_pending_nodes = len(self.ip_to_nodes) - num_connected_nodes
return f"""[t={self.virtual_time}]
Connected nodes: {num_connected_nodes}
Pending nodes: {num_pending_nodes}
Remaining requests: {len(self.work_queue)}
"""
SAMPLE_CLUSTER_CONFIG = copy.deepcopy(MULTI_WORKER_CLUSTER)
@@ -330,7 +465,6 @@ class AutoscalingPolicyTest(unittest.TestCase):
return path
def testManyTasks(self):
cli_logger.configure(log_style="record", verbosity=-1)
config = copy.deepcopy(SAMPLE_CLUSTER_CONFIG)
config_path = self.write_config(config)
self.provider = MockProvider()
@@ -354,10 +488,11 @@ class AutoscalingPolicyTest(unittest.TestCase):
while done_count < len(tasks):
time = simulator.step()
assert time < 400
assert time < 850
# TODO (Alex): Not clear what's actually worth asserting here.
assert simulator.node_costs()
def testManyActors(self):
# cli_logger.configure(log_style="record", verbosity=-1)
config = copy.deepcopy(SAMPLE_CLUSTER_CONFIG)
config_path = self.write_config(config)
self.provider = MockProvider()
@@ -382,4 +517,75 @@ class AutoscalingPolicyTest(unittest.TestCase):
while start_count < len(tasks):
time = simulator.step()
assert time < 200
assert time < 650
def testManyPlacementGroups(self):
config = copy.deepcopy(SAMPLE_CLUSTER_CONFIG)
config_path = self.write_config(config)
self.provider = MockProvider()
simulator = Simulator(config_path, self.provider)
start_count = 0
def start_callback():
nonlocal start_count
start_count += 1
placement_group_requests = []
for _ in range(500):
placement_group_requests.append(
PlacementGroup(
duration=float("inf"),
bundles=[{
"CPU": 1
}, {
"CPU": 2
}],
strategy=PlacementStrategy.STRICT_PACK,
start_callback=start_callback,
))
for _ in range(500):
placement_group_requests.append(
PlacementGroup(
duration=float("inf"),
bundles=[{
"CPU": 1
}, {
"CPU": 2
}],
strategy=PlacementStrategy.STRICT_SPREAD,
start_callback=start_callback,
))
# SPREAD and PACK tests fail, but under the real GCS placement group
# scheduling algorithm we could also be left in a situation in which
# the autoscaler thinks the placement group is placeable, but the
# placement group scheduler doesn't know how to schedule it.
# for _ in range(500):
# placement_group_requests.append(PlacementGroup(
# duration=float("inf"), bundles=[{"CPU": 1}, {"CPU": 2}],
# strategy=PlacementStrategy.PACK,
# start_callback=start_callback))
# for _ in range(500):
# placement_group_requests.append(PlacementGroup(
# duration=float("inf"),
# bundles=[{"CPU": 2}, {"CPU": 1}],
# strategy=PlacementStrategy.SPREAD,
# start_callback=start_callback))
simulator.submit(placement_group_requests)
time = 0
while start_count < len(placement_group_requests):
time = simulator.step()
assert time < 630
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-v", __file__]))