mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 22:34:24 +08:00
[autoscaler] Rename instance_type => node_type, TAG_RAY_INSTANCE_TYPE => TAG_RAY_USER_NODE_TYPE (#10207)
This commit is contained in:
@@ -15,8 +15,8 @@ from ray.experimental.internal_kv import _internal_kv_put, \
|
||||
from ray.autoscaler.node_provider import get_node_provider
|
||||
from ray.autoscaler.tags import (
|
||||
TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG,
|
||||
TAG_RAY_FILE_MOUNTS_CONTENTS, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_TYPE,
|
||||
TAG_RAY_INSTANCE_TYPE, STATUS_UP_TO_DATE, NODE_TYPE_WORKER)
|
||||
TAG_RAY_FILE_MOUNTS_CONTENTS, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_KIND,
|
||||
TAG_RAY_USER_NODE_TYPE, STATUS_UP_TO_DATE, NODE_KIND_WORKER)
|
||||
from ray.autoscaler.updater import NodeUpdaterThread
|
||||
from ray.autoscaler.node_launcher import NodeLauncher
|
||||
from ray.autoscaler.resource_demand_scheduler import ResourceDemandScheduler
|
||||
@@ -65,11 +65,12 @@ class StandardAutoscaler:
|
||||
|
||||
# Check whether we can enable the resource demand scheduler.
|
||||
if "available_node_types" in self.config:
|
||||
self.instance_types = self.config["available_node_types"]
|
||||
self.available_node_types = self.config["available_node_types"]
|
||||
self.resource_demand_scheduler = ResourceDemandScheduler(
|
||||
self.provider, self.instance_types, self.config["max_workers"])
|
||||
self.provider, self.available_node_types,
|
||||
self.config["max_workers"])
|
||||
else:
|
||||
self.instance_types = None
|
||||
self.available_node_types = None
|
||||
self.resource_demand_scheduler = None
|
||||
|
||||
self.max_failures = max_failures
|
||||
@@ -97,7 +98,7 @@ class StandardAutoscaler:
|
||||
queue=self.launch_queue,
|
||||
index=i,
|
||||
pending=self.pending_launches,
|
||||
instance_types=self.instance_types,
|
||||
node_types=self.available_node_types,
|
||||
)
|
||||
node_launcher.daemon = True
|
||||
node_launcher.start()
|
||||
@@ -203,8 +204,8 @@ class StandardAutoscaler:
|
||||
nodes, self.pending_launches.breakdown(),
|
||||
self.resource_demand_vector))
|
||||
# TODO(ekl) also enforce max launch concurrency here?
|
||||
for instance_type, count in instances:
|
||||
self.launch_new_node(count, instance_type=instance_type)
|
||||
for node_type, count in instances:
|
||||
self.launch_new_node(count, node_type=node_type)
|
||||
|
||||
# Launch additional nodes of the default type, if still needed.
|
||||
num_workers = len(nodes) + num_pending
|
||||
@@ -262,10 +263,11 @@ class StandardAutoscaler:
|
||||
self.recover_if_needed(node_id, now)
|
||||
|
||||
def _node_resources(self, node_id):
|
||||
instance_type = self.provider.node_tags(node_id).get(
|
||||
TAG_RAY_INSTANCE_TYPE)
|
||||
if self.instance_types and instance_type in self.instance_types:
|
||||
return self.instance_types[instance_type].get("resources", {})
|
||||
node_type = self.provider.node_tags(node_id).get(
|
||||
TAG_RAY_USER_NODE_TYPE)
|
||||
if self.available_node_types:
|
||||
return self.available_node_types.get(node_type, {}).get(
|
||||
"resources", {})
|
||||
else:
|
||||
return {}
|
||||
|
||||
@@ -441,17 +443,16 @@ class StandardAutoscaler:
|
||||
return False
|
||||
return True
|
||||
|
||||
def launch_new_node(self, count: int,
|
||||
instance_type: Optional[str]) -> None:
|
||||
def launch_new_node(self, count: int, node_type: Optional[str]) -> None:
|
||||
logger.info(
|
||||
"StandardAutoscaler: Queue {} new nodes for launch".format(count))
|
||||
self.pending_launches.inc(instance_type, count)
|
||||
self.pending_launches.inc(node_type, count)
|
||||
config = copy.deepcopy(self.config)
|
||||
self.launch_queue.put((config, count, instance_type))
|
||||
self.launch_queue.put((config, count, node_type))
|
||||
|
||||
def workers(self):
|
||||
return self.provider.non_terminated_nodes(
|
||||
tag_filters={TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER})
|
||||
tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
|
||||
|
||||
def log_info_string(self, nodes, target):
|
||||
tmp = "Cluster status: "
|
||||
|
||||
@@ -12,7 +12,7 @@ from botocore.config import Config
|
||||
import botocore
|
||||
|
||||
from ray.ray_constants import BOTO_MAX_RETRIES
|
||||
from ray.autoscaler.tags import NODE_TYPE_WORKER, NODE_TYPE_HEAD
|
||||
from ray.autoscaler.tags import NODE_KIND_WORKER, NODE_KIND_HEAD
|
||||
from ray.autoscaler.aws.utils import LazyDefaultDict, handle_boto_error
|
||||
from ray.autoscaler.node_provider import PROVIDER_PRETTY_NAMES
|
||||
|
||||
@@ -29,8 +29,8 @@ SECURITY_GROUP_TEMPLATE = RAY + "-{}"
|
||||
# Mapping from the node type tag to the section of the autoscaler yaml that
|
||||
# contains the config for the node type.
|
||||
NODE_TYPE_CONFIG_KEYS = {
|
||||
NODE_TYPE_WORKER: "worker_nodes",
|
||||
NODE_TYPE_HEAD: "head_node",
|
||||
NODE_KIND_WORKER: "worker_nodes",
|
||||
NODE_KIND_HEAD: "head_node",
|
||||
}
|
||||
|
||||
DEFAULT_AMI_NAME = "AWS Deep Learning AMI (Ubuntu 18.04) V30.0"
|
||||
@@ -434,8 +434,8 @@ def _configure_security_group(config):
|
||||
|
||||
security_groups = _upsert_security_groups(config, node_types_to_configure)
|
||||
|
||||
if NODE_TYPE_HEAD in node_types_to_configure:
|
||||
head_sg = security_groups[NODE_TYPE_HEAD]
|
||||
if NODE_KIND_HEAD in node_types_to_configure:
|
||||
head_sg = security_groups[NODE_KIND_HEAD]
|
||||
|
||||
_set_config_info(head_security_group_src="default")
|
||||
cli_logger.old_info(
|
||||
@@ -444,8 +444,8 @@ def _configure_security_group(config):
|
||||
head_sg.group_name, head_sg.id)
|
||||
config["head_node"]["SecurityGroupIds"] = [head_sg.id]
|
||||
|
||||
if NODE_TYPE_WORKER in node_types_to_configure:
|
||||
workers_sg = security_groups[NODE_TYPE_WORKER]
|
||||
if NODE_KIND_WORKER in node_types_to_configure:
|
||||
workers_sg = security_groups[NODE_KIND_WORKER]
|
||||
|
||||
_set_config_info(workers_security_group_src="default")
|
||||
cli_logger.old_info(
|
||||
|
||||
@@ -11,7 +11,7 @@ from botocore.config import Config
|
||||
from ray.autoscaler.node_provider import NodeProvider
|
||||
from ray.autoscaler.aws.config import bootstrap_aws
|
||||
from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME, TAG_RAY_NODE_NAME, \
|
||||
TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_TYPE, TAG_RAY_INSTANCE_TYPE
|
||||
TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_KIND, TAG_RAY_INSTANCE_TYPE
|
||||
from ray.ray_constants import BOTO_MAX_RETRIES, BOTO_CREATE_MAX_RETRIES
|
||||
from ray.autoscaler.log_timer import LogTimer
|
||||
|
||||
@@ -209,8 +209,8 @@ class AWSNodeProvider(NodeProvider):
|
||||
"Values": [self.cluster_name],
|
||||
},
|
||||
{
|
||||
"Name": "tag:{}".format(TAG_RAY_NODE_TYPE),
|
||||
"Values": [tags[TAG_RAY_NODE_TYPE]],
|
||||
"Name": "tag:{}".format(TAG_RAY_NODE_KIND),
|
||||
"Values": [tags[TAG_RAY_NODE_KIND]],
|
||||
},
|
||||
{
|
||||
"Name": "tag:{}".format(TAG_RAY_LAUNCH_CONFIG),
|
||||
|
||||
@@ -135,7 +135,7 @@ class AzureNodeProvider(NodeProvider):
|
||||
nodes() must be called again to refresh results.
|
||||
|
||||
Examples:
|
||||
>>> provider.non_terminated_nodes({TAG_RAY_NODE_TYPE: "worker"})
|
||||
>>> provider.non_terminated_nodes({TAG_RAY_NODE_KIND: "worker"})
|
||||
["node-1", "node-2"]
|
||||
"""
|
||||
nodes = self._get_filtered_nodes(tag_filters=tag_filters)
|
||||
|
||||
@@ -25,8 +25,8 @@ from ray.autoscaler.util import validate_config, hash_runtime_conf, \
|
||||
from ray.autoscaler.node_provider import get_node_provider, NODE_PROVIDERS, \
|
||||
PROVIDER_PRETTY_NAMES, try_get_log_state, try_logging_config, \
|
||||
try_reload_log_state
|
||||
from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_LAUNCH_CONFIG, \
|
||||
TAG_RAY_NODE_NAME, NODE_TYPE_WORKER, NODE_TYPE_HEAD, TAG_RAY_INSTANCE_TYPE
|
||||
from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_LAUNCH_CONFIG, \
|
||||
TAG_RAY_NODE_NAME, NODE_KIND_WORKER, NODE_KIND_HEAD, TAG_RAY_USER_NODE_TYPE
|
||||
|
||||
from ray.ray_constants import AUTOSCALER_RESOURCE_REQUEST_CHANNEL
|
||||
from ray.autoscaler.updater import NodeUpdaterThread
|
||||
@@ -82,7 +82,7 @@ def request_resources(num_cpus=None, bundles=None):
|
||||
Args:
|
||||
num_cpus: int -- the number of CPU cores to request
|
||||
bundles: List[dict] -- list of resource dicts (e.g., {"CPU": 1}). This
|
||||
only has an effect if you've configured `available_instance_types`
|
||||
only has an effect if you've configured `available_node_types`
|
||||
if your cluster config.
|
||||
"""
|
||||
r = _redis()
|
||||
@@ -311,7 +311,7 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool,
|
||||
def remaining_nodes():
|
||||
|
||||
workers = provider.non_terminated_nodes({
|
||||
TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER
|
||||
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
|
||||
})
|
||||
|
||||
if keep_min_workers:
|
||||
@@ -336,7 +336,7 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool,
|
||||
return workers
|
||||
|
||||
head = provider.non_terminated_nodes({
|
||||
TAG_RAY_NODE_TYPE: NODE_TYPE_HEAD
|
||||
TAG_RAY_NODE_KIND: NODE_KIND_HEAD
|
||||
})
|
||||
|
||||
return head + workers
|
||||
@@ -379,7 +379,7 @@ def kill_node(config_file, yes, hard, override_cluster_name):
|
||||
provider = get_node_provider(config["provider"], config["cluster_name"])
|
||||
try:
|
||||
nodes = provider.non_terminated_nodes({
|
||||
TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER
|
||||
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
|
||||
})
|
||||
node = random.choice(nodes)
|
||||
cli_logger.print("Shutdown " + cf.bold("{}"), node)
|
||||
@@ -472,7 +472,7 @@ def get_or_create_head_node(config,
|
||||
config_file = os.path.abspath(config_file)
|
||||
try:
|
||||
head_node_tags = {
|
||||
TAG_RAY_NODE_TYPE: NODE_TYPE_HEAD,
|
||||
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
|
||||
}
|
||||
nodes = provider.non_terminated_nodes(head_node_tags)
|
||||
if len(nodes) > 0:
|
||||
@@ -520,7 +520,7 @@ def get_or_create_head_node(config,
|
||||
# TODO(ekl) this logic is duplicated in node_launcher.py (keep in sync)
|
||||
head_node_config = copy.deepcopy(config["head_node"])
|
||||
if "head_node_type" in config:
|
||||
head_node_tags[TAG_RAY_INSTANCE_TYPE] = config["head_node_type"]
|
||||
head_node_tags[TAG_RAY_USER_NODE_TYPE] = config["head_node_type"]
|
||||
head_node_config.update(config["available_node_types"][config[
|
||||
"head_node_type"]]["node_config"])
|
||||
|
||||
@@ -985,7 +985,7 @@ def get_worker_node_ips(config_file: str,
|
||||
provider = get_node_provider(config["provider"], config["cluster_name"])
|
||||
try:
|
||||
nodes = provider.non_terminated_nodes({
|
||||
TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER
|
||||
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
|
||||
})
|
||||
|
||||
if config.get("provider", {}).get("use_internal_ips", False) is True:
|
||||
@@ -1005,7 +1005,7 @@ def _get_worker_nodes(config, override_cluster_name):
|
||||
provider = get_node_provider(config["provider"], config["cluster_name"])
|
||||
try:
|
||||
return provider.non_terminated_nodes({
|
||||
TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER
|
||||
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
|
||||
})
|
||||
finally:
|
||||
provider.cleanup()
|
||||
@@ -1018,7 +1018,7 @@ def _get_head_node(config: Dict[str, Any],
|
||||
provider = get_node_provider(config["provider"], config["cluster_name"])
|
||||
try:
|
||||
head_node_tags = {
|
||||
TAG_RAY_NODE_TYPE: NODE_TYPE_HEAD,
|
||||
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
|
||||
}
|
||||
nodes = provider.non_terminated_nodes(head_node_tags)
|
||||
finally:
|
||||
|
||||
@@ -8,9 +8,9 @@ import logging
|
||||
from ray.autoscaler.node_provider import NodeProvider
|
||||
from ray.autoscaler.local.config import bootstrap_local
|
||||
from ray.autoscaler.tags import (
|
||||
TAG_RAY_NODE_TYPE,
|
||||
NODE_TYPE_WORKER,
|
||||
NODE_TYPE_HEAD,
|
||||
TAG_RAY_NODE_KIND,
|
||||
NODE_KIND_WORKER,
|
||||
NODE_KIND_HEAD,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -31,8 +31,8 @@ class ClusterState:
|
||||
workers = json.loads(open(self.save_path).read())
|
||||
head_config = workers.get(provider_config["head_ip"])
|
||||
if (not head_config or
|
||||
head_config.get("tags", {}).get(TAG_RAY_NODE_TYPE)
|
||||
!= NODE_TYPE_HEAD):
|
||||
head_config.get("tags", {}).get(TAG_RAY_NODE_KIND)
|
||||
!= NODE_KIND_HEAD):
|
||||
workers = {}
|
||||
logger.info("Head IP changed - recreating cluster.")
|
||||
else:
|
||||
@@ -43,23 +43,23 @@ class ClusterState:
|
||||
if worker_ip not in workers:
|
||||
workers[worker_ip] = {
|
||||
"tags": {
|
||||
TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER
|
||||
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
|
||||
},
|
||||
"state": "terminated",
|
||||
}
|
||||
else:
|
||||
assert (workers[worker_ip]["tags"][TAG_RAY_NODE_TYPE]
|
||||
== NODE_TYPE_WORKER)
|
||||
assert (workers[worker_ip]["tags"][TAG_RAY_NODE_KIND]
|
||||
== NODE_KIND_WORKER)
|
||||
if provider_config["head_ip"] not in workers:
|
||||
workers[provider_config["head_ip"]] = {
|
||||
"tags": {
|
||||
TAG_RAY_NODE_TYPE: NODE_TYPE_HEAD
|
||||
TAG_RAY_NODE_KIND: NODE_KIND_HEAD
|
||||
},
|
||||
"state": "terminated",
|
||||
}
|
||||
else:
|
||||
assert (workers[provider_config["head_ip"]]["tags"][
|
||||
TAG_RAY_NODE_TYPE] == NODE_TYPE_HEAD)
|
||||
TAG_RAY_NODE_KIND] == NODE_KIND_HEAD)
|
||||
# Relevant when a user reduces the number of workers
|
||||
# without changing the headnode.
|
||||
list_of_node_ips = list(provider_config["worker_ips"])
|
||||
@@ -209,13 +209,13 @@ class LocalNodeProvider(NodeProvider):
|
||||
|
||||
def create_node(self, node_config, tags, count):
|
||||
"""Creates min(count, currently available) nodes."""
|
||||
node_type = tags[TAG_RAY_NODE_TYPE]
|
||||
node_type = tags[TAG_RAY_NODE_KIND]
|
||||
with self.state.file_lock:
|
||||
workers = self.state.get()
|
||||
for node_id, info in workers.items():
|
||||
if (info["state"] == "terminated"
|
||||
and (self.use_coordinator
|
||||
or info["tags"][TAG_RAY_NODE_TYPE] == node_type)):
|
||||
or info["tags"][TAG_RAY_NODE_KIND] == node_type)):
|
||||
info["tags"] = tags
|
||||
info["state"] = "running"
|
||||
self.state.put(node_id, info)
|
||||
|
||||
@@ -4,9 +4,9 @@ import logging
|
||||
import threading
|
||||
|
||||
from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_NODE_STATUS,
|
||||
TAG_RAY_NODE_TYPE, TAG_RAY_NODE_NAME,
|
||||
TAG_RAY_INSTANCE_TYPE, STATUS_UNINITIALIZED,
|
||||
NODE_TYPE_WORKER)
|
||||
TAG_RAY_NODE_KIND, TAG_RAY_NODE_NAME,
|
||||
TAG_RAY_USER_NODE_TYPE, STATUS_UNINITIALIZED,
|
||||
NODE_KIND_WORKER)
|
||||
from ray.autoscaler.util import hash_launch_conf
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -19,29 +19,29 @@ class NodeLauncher(threading.Thread):
|
||||
provider,
|
||||
queue,
|
||||
pending,
|
||||
instance_types=None,
|
||||
node_types=None,
|
||||
index=None,
|
||||
*args,
|
||||
**kwargs):
|
||||
self.queue = queue
|
||||
self.pending = pending
|
||||
self.provider = provider
|
||||
self.instance_types = instance_types
|
||||
self.node_types = node_types
|
||||
self.index = str(index) if index is not None else ""
|
||||
super(NodeLauncher, self).__init__(*args, **kwargs)
|
||||
|
||||
def _launch_node(self, config: Dict[str, Any], count: int,
|
||||
instance_type: Optional[str]):
|
||||
if self.instance_types:
|
||||
assert instance_type, instance_type
|
||||
worker_filter = {TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER}
|
||||
node_type: Optional[str]):
|
||||
if self.node_types:
|
||||
assert node_type, node_type
|
||||
worker_filter = {TAG_RAY_NODE_KIND: NODE_KIND_WORKER}
|
||||
before = self.provider.non_terminated_nodes(tag_filters=worker_filter)
|
||||
launch_hash = hash_launch_conf(config["worker_nodes"], config["auth"])
|
||||
self.log("Launching {} nodes, type {}.".format(count, instance_type))
|
||||
self.log("Launching {} nodes, type {}.".format(count, node_type))
|
||||
node_config = copy.deepcopy(config["worker_nodes"])
|
||||
node_tags = {
|
||||
TAG_RAY_NODE_NAME: "ray-{}-worker".format(config["cluster_name"]),
|
||||
TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER,
|
||||
TAG_RAY_NODE_KIND: NODE_KIND_WORKER,
|
||||
TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED,
|
||||
TAG_RAY_LAUNCH_CONFIG: launch_hash,
|
||||
}
|
||||
@@ -49,10 +49,10 @@ class NodeLauncher(threading.Thread):
|
||||
# merge the configs. We merge the configs instead of overriding, so
|
||||
# that the bootstrapped per-cloud properties are preserved.
|
||||
# TODO(ekl) this logic is duplicated in commands.py (keep in sync)
|
||||
if instance_type:
|
||||
node_tags[TAG_RAY_INSTANCE_TYPE] = instance_type
|
||||
if node_type:
|
||||
node_tags[TAG_RAY_USER_NODE_TYPE] = node_type
|
||||
node_config.update(
|
||||
config["available_node_types"][instance_type]["node_config"])
|
||||
config["available_node_types"][node_type]["node_config"])
|
||||
self.provider.create_node(node_config, node_tags, count)
|
||||
after = self.provider.non_terminated_nodes(tag_filters=worker_filter)
|
||||
if set(after).issubset(before):
|
||||
@@ -60,14 +60,14 @@ class NodeLauncher(threading.Thread):
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
config, count, instance_type = self.queue.get()
|
||||
config, count, node_type = self.queue.get()
|
||||
self.log("Got {} nodes to launch.".format(count))
|
||||
try:
|
||||
self._launch_node(config, count, instance_type)
|
||||
self._launch_node(config, count, node_type)
|
||||
except Exception:
|
||||
logger.exception("Launch failed")
|
||||
finally:
|
||||
self.pending.dec(instance_type, count)
|
||||
self.pending.dec(node_type, count)
|
||||
|
||||
def log(self, statement):
|
||||
prefix = "NodeLauncher{}:".format(self.index)
|
||||
|
||||
@@ -186,7 +186,7 @@ class NodeProvider:
|
||||
nodes() must be called again to refresh results.
|
||||
|
||||
Examples:
|
||||
>>> provider.non_terminated_nodes({TAG_RAY_NODE_TYPE: "worker"})
|
||||
>>> provider.non_terminated_nodes({TAG_RAY_NODE_KIND: "worker"})
|
||||
["node-1", "node-2"]
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -5,15 +5,15 @@ import collections
|
||||
from typing import List, Dict, Tuple
|
||||
|
||||
from ray.autoscaler.node_provider import NodeProvider
|
||||
from ray.autoscaler.tags import TAG_RAY_INSTANCE_TYPE
|
||||
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# e.g., m4.16xlarge.
|
||||
InstanceType = str
|
||||
NodeType = str
|
||||
|
||||
# e.g., {"resources": ..., "max_workers": ...}.
|
||||
InstanceTypeConfigDict = str
|
||||
NodeTypeConfigDict = str
|
||||
|
||||
# e.g., {"GPU": 1}.
|
||||
ResourceDict = str
|
||||
@@ -24,60 +24,59 @@ NodeID = str
|
||||
|
||||
class ResourceDemandScheduler:
|
||||
def __init__(self, provider: NodeProvider,
|
||||
instance_types: Dict[InstanceType, InstanceTypeConfigDict],
|
||||
node_types: Dict[NodeType, NodeTypeConfigDict],
|
||||
max_workers: int):
|
||||
self.provider = provider
|
||||
self.instance_types = instance_types
|
||||
self.node_types = node_types
|
||||
self.max_workers = max_workers
|
||||
|
||||
def debug_string(self, nodes: List[NodeID],
|
||||
pending_nodes: Dict[NodeID, int]) -> str:
|
||||
node_resources, instance_type_counts = self.calculate_node_resources(
|
||||
node_resources, node_type_counts = self.calculate_node_resources(
|
||||
nodes, pending_nodes)
|
||||
|
||||
out = "Worker instance types:"
|
||||
for instance_type, count in instance_type_counts.items():
|
||||
out += "\n - {}: {}".format(instance_type, count)
|
||||
if pending_nodes.get(instance_type):
|
||||
out += " ({} pending)".format(pending_nodes[instance_type])
|
||||
for node_type, count in node_type_counts.items():
|
||||
out += "\n - {}: {}".format(node_type, count)
|
||||
if pending_nodes.get(node_type):
|
||||
out += " ({} pending)".format(pending_nodes[node_type])
|
||||
|
||||
return out
|
||||
|
||||
def calculate_node_resources(
|
||||
self, nodes: List[NodeID], pending_nodes: Dict[NodeID, int]
|
||||
) -> (List[ResourceDict], Dict[InstanceType, int]):
|
||||
) -> (List[ResourceDict], Dict[NodeType, int]):
|
||||
"""Returns node resource list and instance type counts."""
|
||||
|
||||
node_resources = []
|
||||
instance_type_counts = collections.defaultdict(int)
|
||||
node_type_counts = collections.defaultdict(int)
|
||||
|
||||
def add_instance(instance_type):
|
||||
if instance_type not in self.instance_types:
|
||||
raise RuntimeError(
|
||||
"Missing entry for instance_type {} in "
|
||||
"available_instance_types config: {}".format(
|
||||
instance_type, self.instance_types))
|
||||
def add_instance(node_type):
|
||||
if node_type not in self.node_types:
|
||||
raise RuntimeError("Missing entry for node_type {} in "
|
||||
"available_node_types config: {}".format(
|
||||
node_type, self.node_types))
|
||||
# Careful not to include the same dict object multiple times.
|
||||
node_resources.append(
|
||||
copy.deepcopy(self.instance_types[instance_type]["resources"]))
|
||||
instance_type_counts[instance_type] += 1
|
||||
copy.deepcopy(self.node_types[node_type]["resources"]))
|
||||
node_type_counts[node_type] += 1
|
||||
|
||||
for node_id in nodes:
|
||||
tags = self.provider.node_tags(node_id)
|
||||
if TAG_RAY_INSTANCE_TYPE in tags:
|
||||
instance_type = tags[TAG_RAY_INSTANCE_TYPE]
|
||||
add_instance(instance_type)
|
||||
if TAG_RAY_USER_NODE_TYPE in tags:
|
||||
node_type = tags[TAG_RAY_USER_NODE_TYPE]
|
||||
add_instance(node_type)
|
||||
|
||||
for instance_type, count in pending_nodes.items():
|
||||
for node_type, count in pending_nodes.items():
|
||||
for _ in range(count):
|
||||
add_instance(instance_type)
|
||||
add_instance(node_type)
|
||||
|
||||
return node_resources, instance_type_counts
|
||||
return node_resources, node_type_counts
|
||||
|
||||
def get_instances_to_launch(self, nodes: List[NodeID],
|
||||
pending_nodes: Dict[InstanceType, int],
|
||||
pending_nodes: Dict[NodeType, int],
|
||||
resource_demands: List[ResourceDict]
|
||||
) -> List[Tuple[InstanceType, int]]:
|
||||
) -> List[Tuple[NodeType, int]]:
|
||||
"""Get a list of instance types that should be added to the cluster.
|
||||
|
||||
This method:
|
||||
@@ -91,30 +90,30 @@ class ResourceDemandScheduler:
|
||||
logger.info("No resource demands")
|
||||
return []
|
||||
|
||||
node_resources, instance_type_counts = self.calculate_node_resources(
|
||||
node_resources, node_type_counts = self.calculate_node_resources(
|
||||
nodes, pending_nodes)
|
||||
logger.info("Cluster resources: {}".format(node_resources))
|
||||
logger.info("Instance counts: {}".format(instance_type_counts))
|
||||
logger.info("Instance counts: {}".format(node_type_counts))
|
||||
|
||||
unfulfilled = get_bin_pack_residual(node_resources, resource_demands)
|
||||
logger.info("Resource demands: {}".format(resource_demands))
|
||||
logger.info("Unfulfilled demands: {}".format(unfulfilled))
|
||||
|
||||
instances = get_instances_for(
|
||||
self.instance_types, instance_type_counts,
|
||||
self.max_workers - len(nodes), unfulfilled)
|
||||
instances = get_instances_for(self.node_types, node_type_counts,
|
||||
self.max_workers - len(nodes),
|
||||
unfulfilled)
|
||||
logger.info("Instance requests: {}".format(instances))
|
||||
return instances
|
||||
|
||||
|
||||
def get_instances_for(
|
||||
instance_types: Dict[InstanceType, InstanceTypeConfigDict],
|
||||
existing_instances: Dict[InstanceType, int], max_to_add: int,
|
||||
resources: List[ResourceDict]) -> List[Tuple[InstanceType, int]]:
|
||||
node_types: Dict[NodeType, NodeTypeConfigDict],
|
||||
existing_instances: Dict[NodeType, int], max_to_add: int,
|
||||
resources: List[ResourceDict]) -> List[Tuple[NodeType, int]]:
|
||||
"""Determine instances to add given resource demands and constraints.
|
||||
|
||||
Args:
|
||||
instance_types: instance types config.
|
||||
node_types: instance types config.
|
||||
existing_instances: counts of existing instances already launched.
|
||||
This sets constraints on the number of new instances to add.
|
||||
max_to_add: global constraint on instances to add.
|
||||
@@ -128,15 +127,14 @@ def get_instances_for(
|
||||
|
||||
while resources and sum(instances_to_add.values()) < max_to_add:
|
||||
utilization_scores = []
|
||||
for instance_type in instance_types:
|
||||
if (existing_instances.get(
|
||||
instance_type, 0) + instances_to_add.get(instance_type, 0)
|
||||
>= instance_types[instance_type]["max_workers"]):
|
||||
for node_type in node_types:
|
||||
if (existing_instances.get(node_type, 0) + instances_to_add.get(
|
||||
node_type, 0) >= node_types[node_type]["max_workers"]):
|
||||
continue
|
||||
node_resources = instance_types[instance_type]["resources"]
|
||||
node_resources = node_types[node_type]["resources"]
|
||||
score = _utilization_score(node_resources, resources)
|
||||
if score is not None:
|
||||
utilization_scores.append((score, instance_type))
|
||||
utilization_scores.append((score, node_type))
|
||||
|
||||
# Give up, no feasible node.
|
||||
if not utilization_scores:
|
||||
@@ -144,10 +142,9 @@ def get_instances_for(
|
||||
break
|
||||
|
||||
utilization_scores = sorted(utilization_scores, reverse=True)
|
||||
best_instance_type = utilization_scores[0][1]
|
||||
instances_to_add[best_instance_type] += 1
|
||||
allocated_resources.append(
|
||||
instance_types[best_instance_type]["resources"])
|
||||
best_node_type = utilization_scores[0][1]
|
||||
instances_to_add[best_node_type] += 1
|
||||
allocated_resources.append(node_types[best_node_type]["resources"])
|
||||
residual = get_bin_pack_residual(allocated_resources[-1:], resources)
|
||||
assert len(residual) < len(resources), (resources, residual)
|
||||
resources = residual
|
||||
|
||||
@@ -3,14 +3,15 @@
|
||||
# Tag for the name of the node
|
||||
TAG_RAY_NODE_NAME = "ray-node-name"
|
||||
|
||||
# Tag for the type of node (e.g. Head, Worker)
|
||||
TAG_RAY_NODE_TYPE = "ray-node-type"
|
||||
NODE_TYPE_HEAD = "head"
|
||||
NODE_TYPE_WORKER = "worker"
|
||||
# Tag for the kind of node (e.g. Head, Worker). For legacy reasons, the tag
|
||||
# value says 'type' instead of 'kind'.
|
||||
TAG_RAY_NODE_KIND = "ray-node-type"
|
||||
NODE_KIND_HEAD = "head"
|
||||
NODE_KIND_WORKER = "worker"
|
||||
|
||||
# Tag for the provider-specific instance type (e.g., m4.4xlarge). This is used
|
||||
# for automatic worker instance type selection.
|
||||
TAG_RAY_INSTANCE_TYPE = "ray-instance-type"
|
||||
# Tag for user defined node types (e.g., m4xl_spot). This is used for multi
|
||||
# node type clusters.
|
||||
TAG_RAY_USER_NODE_TYPE = "ray-user-node-type"
|
||||
|
||||
# Tag that reports the current state of the node (e.g. Updating, Up-to-date)
|
||||
TAG_RAY_NODE_STATUS = "ray-node-status"
|
||||
|
||||
@@ -14,22 +14,22 @@ from ray.autoscaler.util import prepare_config, validate_config
|
||||
from ray.autoscaler.commands import get_or_create_head_node
|
||||
from ray.autoscaler.load_metrics import LoadMetrics
|
||||
from ray.autoscaler.autoscaler import StandardAutoscaler
|
||||
from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_NODE_STATUS, \
|
||||
STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, TAG_RAY_INSTANCE_TYPE
|
||||
from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_NODE_STATUS, \
|
||||
STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, TAG_RAY_USER_NODE_TYPE
|
||||
from ray.autoscaler.node_provider import NODE_PROVIDERS, NodeProvider
|
||||
from ray.test_utils import RayTestTimeoutException
|
||||
import pytest
|
||||
|
||||
|
||||
class MockNode:
|
||||
def __init__(self, node_id, tags, node_config, instance_type):
|
||||
def __init__(self, node_id, tags, node_config, node_type):
|
||||
self.node_id = node_id
|
||||
self.state = "pending"
|
||||
self.tags = tags
|
||||
self.external_ip = "1.2.3.4"
|
||||
self.internal_ip = "172.0.0.{}".format(self.node_id)
|
||||
self.node_config = node_config
|
||||
self.instance_type = instance_type
|
||||
self.node_type = node_type
|
||||
|
||||
def matches(self, tags):
|
||||
for k, v in tags.items():
|
||||
@@ -152,7 +152,7 @@ class MockProvider(NodeProvider):
|
||||
for _ in range(count):
|
||||
self.mock_nodes[self.next_id] = MockNode(
|
||||
self.next_id, tags.copy(), node_config,
|
||||
tags.get(TAG_RAY_INSTANCE_TYPE))
|
||||
tags.get(TAG_RAY_USER_NODE_TYPE))
|
||||
self.next_id += 1
|
||||
|
||||
def set_node_tags(self, node_id, tags):
|
||||
@@ -388,7 +388,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
runner.assert_has_call("1.2.3.4", "init_cmd")
|
||||
runner.assert_has_call("1.2.3.4", "head_setup_cmd")
|
||||
runner.assert_has_call("1.2.3.4", "start_ray_head")
|
||||
self.assertEqual(self.provider.mock_nodes[0].instance_type, None)
|
||||
self.assertEqual(self.provider.mock_nodes[0].node_type, None)
|
||||
|
||||
def testScaleUp(self):
|
||||
config_path = self.write_config(SMALL_CLUSTER)
|
||||
@@ -443,7 +443,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
config["max_workers"] = 5
|
||||
config_path = self.write_config(config)
|
||||
self.provider = MockProvider()
|
||||
self.provider.create_node({}, {TAG_RAY_NODE_TYPE: "worker"}, 10)
|
||||
self.provider.create_node({}, {TAG_RAY_NODE_KIND: "worker"}, 10)
|
||||
runner = MockProcessRunner()
|
||||
autoscaler = StandardAutoscaler(
|
||||
config_path,
|
||||
@@ -527,9 +527,9 @@ class AutoscalingTest(unittest.TestCase):
|
||||
config_path = self.write_config(config)
|
||||
|
||||
self.provider = MockProvider()
|
||||
self.provider.create_node({}, {TAG_RAY_NODE_TYPE: "head"}, 1)
|
||||
self.provider.create_node({}, {TAG_RAY_NODE_KIND: "head"}, 1)
|
||||
head_ip = self.provider.non_terminated_node_ips(
|
||||
tag_filters={TAG_RAY_NODE_TYPE: "head"}, )[0]
|
||||
tag_filters={TAG_RAY_NODE_KIND: "head"}, )[0]
|
||||
runner = MockProcessRunner()
|
||||
|
||||
lm = LoadMetrics()
|
||||
@@ -552,7 +552,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
|
||||
# Connect the head and workers to end the bringup phase
|
||||
addrs = self.provider.non_terminated_node_ips(
|
||||
tag_filters={TAG_RAY_NODE_TYPE: "worker"}, )
|
||||
tag_filters={TAG_RAY_NODE_KIND: "worker"}, )
|
||||
addrs += head_ip
|
||||
for addr in addrs:
|
||||
lm.update(addr, {"CPU": 2}, {"CPU": 0}, {})
|
||||
|
||||
@@ -8,9 +8,9 @@ from ray.autoscaler.node_provider import NODE_PROVIDERS, get_node_provider
|
||||
from ray.autoscaler.local.node_provider import LocalNodeProvider
|
||||
from ray.autoscaler.local.coordinator_node_provider import (
|
||||
CoordinatorSenderNodeProvider)
|
||||
from ray.autoscaler.tags import (TAG_RAY_NODE_TYPE, TAG_RAY_CLUSTER_NAME,
|
||||
TAG_RAY_NODE_NAME, NODE_TYPE_WORKER,
|
||||
NODE_TYPE_HEAD)
|
||||
from ray.autoscaler.tags import (TAG_RAY_NODE_KIND, TAG_RAY_CLUSTER_NAME,
|
||||
TAG_RAY_NODE_NAME, NODE_KIND_WORKER,
|
||||
NODE_KIND_HEAD)
|
||||
import pytest
|
||||
|
||||
|
||||
@@ -65,13 +65,13 @@ class OnPremCoordinatorServerTest(unittest.TestCase):
|
||||
expected_workers = {}
|
||||
expected_workers[provider_config["head_ip"]] = {
|
||||
"tags": {
|
||||
TAG_RAY_NODE_TYPE: NODE_TYPE_HEAD
|
||||
TAG_RAY_NODE_KIND: NODE_KIND_HEAD
|
||||
},
|
||||
"state": "terminated",
|
||||
}
|
||||
expected_workers[provider_config["worker_ips"][0]] = {
|
||||
"tags": {
|
||||
TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER
|
||||
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
|
||||
},
|
||||
"state": "terminated",
|
||||
}
|
||||
@@ -93,7 +93,7 @@ class OnPremCoordinatorServerTest(unittest.TestCase):
|
||||
# Test adding back workers updates the cluster state.
|
||||
expected_workers[removed_ip] = {
|
||||
"tags": {
|
||||
TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER
|
||||
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
|
||||
},
|
||||
"state": "terminated",
|
||||
}
|
||||
@@ -171,7 +171,7 @@ class OnPremCoordinatorServerTest(unittest.TestCase):
|
||||
assert node_provider_1.is_terminated(self.list_of_node_ips[0])
|
||||
assert not node_provider_1.node_tags(self.list_of_node_ips[0])
|
||||
head_node_tags = {
|
||||
TAG_RAY_NODE_TYPE: NODE_TYPE_HEAD,
|
||||
TAG_RAY_NODE_KIND: NODE_KIND_HEAD,
|
||||
}
|
||||
assert not node_provider_1.non_terminated_nodes(head_node_tags)
|
||||
head_node_tags[TAG_RAY_NODE_NAME] = "ray-{}-head".format(
|
||||
@@ -232,17 +232,17 @@ class OnPremCoordinatorServerTest(unittest.TestCase):
|
||||
worker_node_tags = {
|
||||
TAG_RAY_NODE_NAME: "ray-{}-worker".format(
|
||||
cluster_config["cluster_name"]),
|
||||
TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER
|
||||
TAG_RAY_NODE_KIND: NODE_KIND_WORKER
|
||||
}
|
||||
node_provider_3.create_node(cluster_config["worker_nodes"],
|
||||
worker_node_tags, 1)
|
||||
assert node_provider_3.non_terminated_nodes(
|
||||
{}) == self.list_of_node_ips
|
||||
worker_filter = {TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER}
|
||||
worker_filter = {TAG_RAY_NODE_KIND: NODE_KIND_WORKER}
|
||||
assert node_provider_3.non_terminated_nodes(worker_filter) == [
|
||||
self.list_of_node_ips[1]
|
||||
]
|
||||
head_filter = {TAG_RAY_NODE_TYPE: NODE_TYPE_HEAD}
|
||||
head_filter = {TAG_RAY_NODE_KIND: NODE_KIND_HEAD}
|
||||
assert node_provider_3.non_terminated_nodes(head_filter) == [
|
||||
self.list_of_node_ips[0]
|
||||
]
|
||||
|
||||
@@ -12,7 +12,7 @@ from ray.autoscaler.autoscaler import StandardAutoscaler
|
||||
from ray.autoscaler.load_metrics import LoadMetrics
|
||||
from ray.autoscaler.node_provider import NODE_PROVIDERS
|
||||
from ray.autoscaler.commands import get_or_create_head_node
|
||||
from ray.autoscaler.tags import TAG_RAY_INSTANCE_TYPE
|
||||
from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE
|
||||
from ray.autoscaler.resource_demand_scheduler import _utilization_score, \
|
||||
get_bin_pack_residual, get_instances_for
|
||||
|
||||
@@ -211,11 +211,13 @@ class AutoscalingTest(unittest.TestCase):
|
||||
runner.assert_has_call("1.2.3.4", "init_cmd")
|
||||
runner.assert_has_call("1.2.3.4", "head_setup_cmd")
|
||||
runner.assert_has_call("1.2.3.4", "start_ray_head")
|
||||
self.assertEqual(self.provider.mock_nodes[0].instance_type, "m4.large")
|
||||
self.assertEqual(self.provider.mock_nodes[0].node_type, "m4.large")
|
||||
self.assertEqual(
|
||||
self.provider.mock_nodes[0].node_config.get("FooProperty"), 42)
|
||||
self.assertEqual(
|
||||
self.provider.mock_nodes[0].tags.get(TAG_RAY_INSTANCE_TYPE),
|
||||
self.provider.mock_nodes[0].node_config.get("TestProp"), 1)
|
||||
self.assertEqual(
|
||||
self.provider.mock_nodes[0].tags.get(TAG_RAY_USER_NODE_TYPE),
|
||||
"m4.large")
|
||||
|
||||
def testScaleUpMinSanity(self):
|
||||
@@ -253,16 +255,16 @@ class AutoscalingTest(unittest.TestCase):
|
||||
autoscaler.request_resources([{"CPU": 1}])
|
||||
autoscaler.update()
|
||||
self.waitForNodes(1)
|
||||
assert self.provider.mock_nodes[0].instance_type == "m4.large"
|
||||
assert self.provider.mock_nodes[0].node_type == "m4.large"
|
||||
autoscaler.request_resources([{"GPU": 8}])
|
||||
autoscaler.update()
|
||||
self.waitForNodes(2)
|
||||
assert self.provider.mock_nodes[1].instance_type == "p2.8xlarge"
|
||||
assert self.provider.mock_nodes[1].node_type == "p2.8xlarge"
|
||||
autoscaler.request_resources([{"CPU": 32}] * 4)
|
||||
autoscaler.update()
|
||||
self.waitForNodes(4)
|
||||
assert self.provider.mock_nodes[2].instance_type == "m4.16xlarge"
|
||||
assert self.provider.mock_nodes[3].instance_type == "m4.16xlarge"
|
||||
assert self.provider.mock_nodes[2].node_type == "m4.16xlarge"
|
||||
assert self.provider.mock_nodes[3].node_type == "m4.16xlarge"
|
||||
|
||||
def testResourcePassing(self):
|
||||
config = MULTI_WORKER_CLUSTER.copy()
|
||||
@@ -283,11 +285,11 @@ class AutoscalingTest(unittest.TestCase):
|
||||
autoscaler.request_resources([{"CPU": 1}])
|
||||
autoscaler.update()
|
||||
self.waitForNodes(1)
|
||||
assert self.provider.mock_nodes[0].instance_type == "m4.large"
|
||||
assert self.provider.mock_nodes[0].node_type == "m4.large"
|
||||
autoscaler.request_resources([{"GPU": 8}])
|
||||
autoscaler.update()
|
||||
self.waitForNodes(2)
|
||||
assert self.provider.mock_nodes[1].instance_type == "p2.8xlarge"
|
||||
assert self.provider.mock_nodes[1].node_type == "p2.8xlarge"
|
||||
|
||||
# TODO (Alex): Autoscaler creates the node during one update then
|
||||
# starts the updater in the enxt update. The sleep is largely
|
||||
|
||||
Reference in New Issue
Block a user