From dbba7f2a53f45ea80321bca7c99492f44efd75f7 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Sat, 25 Aug 2018 17:08:45 -0700 Subject: [PATCH] [autoscaler] Cleanup Logging (#2709) Moves Autoscaler onto Python `logging` module. --- python/ray/autoscaler/autoscaler.py | 63 ++++++++++---------- python/ray/autoscaler/aws/config.py | 30 ++++++---- python/ray/autoscaler/commands.py | 21 +++---- python/ray/autoscaler/docker.py | 6 +- python/ray/autoscaler/gcp/config.py | 20 ++++--- python/ray/autoscaler/gcp/node_provider.py | 8 ++- python/ray/autoscaler/local/node_provider.py | 9 ++- python/ray/autoscaler/updater.py | 57 +++++++++--------- test/autoscaler_test.py | 2 +- 9 files changed, 120 insertions(+), 96 deletions(-) diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index e42af69a0..2a3734ba4 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -6,13 +6,13 @@ import binascii import copy import json import hashlib +import logging import math import os from six.moves import queue import subprocess import threading import time -import traceback from collections import defaultdict from datetime import datetime @@ -32,6 +32,8 @@ from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG, TAG_RAY_NODE_NAME) import ray.services as services +logger = logging.getLogger(__name__) + REQUIRED, OPTIONAL = True, False # For (a, b), if a is a dictionary object, then @@ -151,11 +153,13 @@ class LoadMetrics(object): def prune(mapping): unwanted = set(mapping) - active_ips for unwanted_key in unwanted: - print("Removed mapping", unwanted_key, mapping[unwanted_key]) + logger.info("Removed mapping: {} - {}".format( + unwanted_key, mapping[unwanted_key])) del mapping[unwanted_key] if unwanted: - print("Removed {} stale ip mappings: {} not in {}".format( - len(unwanted), unwanted, active_ips)) + logger.info( + "Removed {} stale ip mappings: {} not in {}".format( + len(unwanted), unwanted, active_ips)) prune(self.last_used_time_by_ip) prune(self.static_resources_by_ip) @@ -164,7 +168,7 @@ class LoadMetrics(object): def approx_workers_used(self): return self._info()["NumNodesUsed"] - def debug_string(self): + def info_string(self): return " - {}".format("\n - ".join( ["{}: {}".format(k, v) for k, v in sorted(self._info().items())])) @@ -238,7 +242,7 @@ class NodeLauncher(threading.Thread): }, count) after = self.provider.nodes(tag_filters=tag_filters) if set(after).issubset(before): - print("Warning: No new nodes reported after node creation") + logger.error("No new nodes reported after node creation") def run(self): while True: @@ -342,18 +346,18 @@ class StandardAutoscaler(object): for local_path in self.config["file_mounts"].values(): assert os.path.exists(local_path) - print("StandardAutoscaler: {}".format(self.config)) + logger.info("StandardAutoscaler: {}".format(self.config)) def update(self): try: self.reload_config(errors_fatal=False) self._update() except Exception as e: - print("StandardAutoscaler: Error during autoscaling: {}" - "".format(traceback.format_exc())) + logger.exception("Error during autoscaling.") self.num_failures += 1 if self.num_failures > self.max_failures: - print("*** StandardAutoscaler: Too many errors, abort. ***") + logger.critical( + "*** StandardAutoscaler: Too many errors, abort. ***") raise e def _update(self): @@ -365,7 +369,7 @@ class StandardAutoscaler(object): self.last_update_time = time.time() num_pending = self.num_launches_pending.value nodes = self.workers() - print(self.debug_string(nodes)) + logger.info(self.info_string(nodes)) self.load_metrics.prune_active_ips( [self.provider.internal_ip(node_id) for node_id in nodes]) target_workers = self.target_num_workers() @@ -379,29 +383,29 @@ class StandardAutoscaler(object): if node_ip in last_used and last_used[node_ip] < horizon and \ len(nodes) - num_terminated > target_workers: num_terminated += 1 - print("StandardAutoscaler: Terminating idle node: " - "{}".format(node_id)) + logger.info("StandardAutoscaler: Terminating idle node: " + "{}".format(node_id)) self.provider.terminate_node(node_id) elif not self.launch_config_ok(node_id): num_terminated += 1 - print("StandardAutoscaler: Terminating outdated node: " - "{}".format(node_id)) + logger.info("StandardAutoscaler: Terminating outdated node: " + "{}".format(node_id)) self.provider.terminate_node(node_id) if num_terminated > 0: nodes = self.workers() - print(self.debug_string(nodes)) + logger.info(self.info_string(nodes)) # Terminate nodes if there are too many num_terminated = 0 while len(nodes) > self.config["max_workers"]: num_terminated += 1 - print("StandardAutoscaler: Terminating unneeded node: " - "{}".format(nodes[-1])) + logger.info("StandardAutoscaler: Terminating unneeded node: " + "{}".format(nodes[-1])) self.provider.terminate_node(nodes[-1]) nodes = nodes[:-1] if num_terminated > 0: nodes = self.workers() - print(self.debug_string(nodes)) + logger.info(self.info_string(nodes)) # Launch new nodes if needed num_workers = len(nodes) + num_pending @@ -410,7 +414,7 @@ class StandardAutoscaler(object): self.max_concurrent_launches - num_pending) num_launches = min(max_allowed, target_workers - num_workers) self.launch_new_node(num_launches) - print(self.debug_string()) + logger.info(self.info_string()) # Process any completed updates completed = [] @@ -428,7 +432,7 @@ class StandardAutoscaler(object): # immediately trying to restart Ray on the new node. self.load_metrics.mark_active(self.provider.internal_ip(node_id)) nodes = self.workers() - print(self.debug_string(nodes)) + logger.info(self.info_string(nodes)) # Update nodes with out-of-date files for node_id in nodes: @@ -457,8 +461,7 @@ class StandardAutoscaler(object): if errors_fatal: raise e else: - print("StandardAutoscaler: Error parsing config: {}", - traceback.format_exc()) + logger.exception("StandardAutoscaler: Error parsing config.") def target_num_workers(self): target_frac = self.config["target_utilization_fraction"] @@ -478,7 +481,7 @@ class StandardAutoscaler(object): def files_up_to_date(self, node_id): applied = self.provider.node_tags(node_id).get(TAG_RAY_RUNTIME_CONFIG) if applied != self.runtime_hash: - print( + logger.info( "StandardAutoscaler: {} has runtime state {}, want {}".format( node_id, applied, self.runtime_hash)) return False @@ -492,9 +495,9 @@ class StandardAutoscaler(object): delta = time.time() - last_heartbeat_time if delta < AUTOSCALER_HEARTBEAT_TIMEOUT_S: return - print("StandardAutoscaler: No heartbeat from node " - "{} in {} seconds, restarting Ray to recover...".format( - node_id, delta)) + logger.warning("StandardAutoscaler: No heartbeat from node " + "{} in {} seconds, restarting Ray to recover...".format( + node_id, delta)) updater = self.node_updater_cls( node_id, self.config["provider"], @@ -550,7 +553,7 @@ class StandardAutoscaler(object): return True def launch_new_node(self, count): - print("StandardAutoscaler: Launching {} new nodes".format(count)) + logger.info("StandardAutoscaler: Launching {} new nodes".format(count)) self.num_launches_pending.inc(count) config = copy.deepcopy(self.config) self.launch_queue.put((config, count)) @@ -558,7 +561,7 @@ class StandardAutoscaler(object): def workers(self): return self.provider.nodes(tag_filters={TAG_RAY_NODE_TYPE: "worker"}) - def debug_string(self, nodes=None): + def info_string(self, nodes=None): if nodes is None: nodes = self.workers() suffix = "" @@ -571,7 +574,7 @@ class StandardAutoscaler(object): len(self.num_failed_updates)) return "StandardAutoscaler [{}]: {}/{} target nodes{}\n{}".format( datetime.now(), len(nodes), self.target_num_workers(), suffix, - self.load_metrics.debug_string()) + self.load_metrics.info_string()) def typename(v): diff --git a/python/ray/autoscaler/aws/config.py b/python/ray/autoscaler/aws/config.py index beaa38ee2..8e5d3a4da 100644 --- a/python/ray/autoscaler/aws/config.py +++ b/python/ray/autoscaler/aws/config.py @@ -33,6 +33,7 @@ def key_pair(i, region): # Suppress excessive connection dropped logs from boto logging.getLogger("botocore").setLevel(logging.WARNING) +logger = logging.getLogger(__name__) def bootstrap_aws(config): @@ -60,7 +61,7 @@ def _configure_iam_role(config): profile = _get_instance_profile(DEFAULT_RAY_INSTANCE_PROFILE, config) if profile is None: - print("Creating new instance profile {}".format( + logger.info("Creating new instance profile {}".format( DEFAULT_RAY_INSTANCE_PROFILE)) client = _client("iam", config) client.create_instance_profile( @@ -73,7 +74,7 @@ def _configure_iam_role(config): if not profile.roles: role = _get_role(DEFAULT_RAY_IAM_ROLE, config) if role is None: - print("Creating new role {}".format(DEFAULT_RAY_IAM_ROLE)) + logger.info("Creating new role {}".format(DEFAULT_RAY_IAM_ROLE)) iam = _resource("iam", config) iam.create_role( RoleName=DEFAULT_RAY_IAM_ROLE, @@ -97,7 +98,8 @@ def _configure_iam_role(config): profile.add_role(RoleName=role.name) time.sleep(15) # wait for propagation - print("Role not specified for head node, using {}".format(profile.arn)) + logger.info("Role not specified for head node, using {}".format( + profile.arn)) config["head_node"]["IamInstanceProfile"] = {"Arn": profile.arn} return config @@ -122,7 +124,7 @@ def _configure_key_pair(config): # We can safely create a new key. if not key and not os.path.exists(key_path): - print("Creating new key pair {}".format(key_name)) + logger.info("Creating new key pair {}".format(key_name)) key = ec2.create_key_pair(KeyName=key_name) with open(key_path, "w") as f: f.write(key.key_material) @@ -133,7 +135,7 @@ def _configure_key_pair(config): assert os.path.exists(key_path), \ "Private key file {} not found for {}".format(key_path, key_name) - print("KeyName not specified for nodes, using {}".format(key_name)) + logger.info("KeyName not specified for nodes, using {}".format(key_name)) config["auth"]["ssh_private_key"] = key_path config["head_node"]["KeyName"] = key_name @@ -170,11 +172,13 @@ def _configure_subnet(config): subnet_descr = [(s.subnet_id, s.availability_zone) for s in subnets] if "SubnetIds" not in config["head_node"]: config["head_node"]["SubnetIds"] = subnet_ids - print("SubnetIds not specified for head node, using ", subnet_descr) + logger.info("SubnetIds not specified for head node," + " using {}".format(subnet_descr)) if "SubnetIds" not in config["worker_nodes"]: config["worker_nodes"]["SubnetIds"] = subnet_ids - print("SubnetId not specified for workers, using ", subnet_descr) + logger.info("SubnetId not specified for workers," + " using {}".format(subnet_descr)) return config @@ -189,7 +193,7 @@ def _configure_security_group(config): security_group = _get_security_group(config, vpc_id, group_name) if security_group is None: - print("Creating new security group {}".format(group_name)) + logger.info("Creating new security group {}".format(group_name)) client = _client("ec2", config) client.create_security_group( Description="Auto-created security group for Ray workers", @@ -216,13 +220,15 @@ def _configure_security_group(config): }]) if "SecurityGroupIds" not in config["head_node"]: - print("SecurityGroupIds not specified for head node, using {}".format( - security_group.group_name)) + logger.info( + "SecurityGroupIds not specified for head node, using {}".format( + security_group.group_name)) config["head_node"]["SecurityGroupIds"] = [security_group.id] if "SecurityGroupIds" not in config["worker_nodes"]: - print("SecurityGroupIds not specified for workers, using {}".format( - security_group.group_name)) + logger.info( + "SecurityGroupIds not specified for workers, using {}".format( + security_group.group_name)) config["worker_nodes"]["SecurityGroupIds"] = [security_group.id] return config diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index 2d4a93660..2410c20d6 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -10,6 +10,7 @@ import tempfile import time import sys import click +import logging import yaml try: # py3 @@ -24,12 +25,13 @@ from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_LAUNCH_CONFIG, \ TAG_RAY_NODE_NAME from ray.autoscaler.updater import NodeUpdaterProcess +logger = logging.getLogger(__name__) + def create_or_update_cluster(config_file, override_min_workers, override_max_workers, no_restart, restart_only, yes, override_cluster_name): """Create or updates an autoscaling Ray cluster from a config json.""" - config = yaml.load(open(config_file).read()) if override_min_workers is not None: config["min_workers"] = override_min_workers @@ -80,13 +82,13 @@ def teardown_cluster(config_file, yes, workers_only, override_cluster_name): if not workers_only: for node in provider.nodes({TAG_RAY_NODE_TYPE: "head"}): - print("Terminating head node {}".format(node)) + logger.info("Terminating head node {}".format(node)) provider.terminate_node(node) nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"}) while nodes: for node in nodes: - print("Terminating worker {}".format(node)) + logger.info("Terminating worker {}".format(node)) provider.terminate_node(node) time.sleep(5) nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"}) @@ -116,9 +118,9 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, TAG_RAY_LAUNCH_CONFIG) != launch_hash: if head_node is not None: confirm("Head node config out-of-date. It will be terminated", yes) - print("Terminating outdated head node {}".format(head_node)) + logger.info("Terminating outdated head node {}".format(head_node)) provider.terminate_node(head_node) - print("Launching new head node...") + logger.info("Launching new head node...") head_node_tags[TAG_RAY_LAUNCH_CONFIG] = launch_hash head_node_tags[TAG_RAY_NODE_NAME] = "ray-{}-head".format( config["cluster_name"]) @@ -131,7 +133,7 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, # TODO(ekl) right now we always update the head node even if the hash # matches. We could prompt the user for what they want to do in this case. runtime_hash = hash_runtime_conf(config["file_mounts"], config) - print("Updating files on head node...") + logger.info("Updating files on head node...") # Rewrite the auth config so that the head node can update the workers remote_key_path = "~/ray_bootstrap_key.pem" @@ -181,10 +183,10 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, provider.nodes(head_node_tags) if updater.exitcode != 0: - print("Error: updating {} failed".format( + logger.error("Updating {} failed".format( provider.external_ip(head_node))) sys.exit(1) - print("Head node up-to-date, IP address is: {}".format( + logger.info("Head node up-to-date, IP address is: {}".format( provider.external_ip(head_node))) monitor_str = "tail -n 100 -f /tmp/raylogs/monitor-*" @@ -337,9 +339,8 @@ def _get_head_node(config, return _get_head_node( config, config_file, override_cluster_name, create_if_needed=False) else: - print("Head node of cluster ({}) not found!".format( + raise RuntimeError("Head node of cluster ({}) not found!".format( config["cluster_name"])) - sys.exit(1) def confirm(msg, yes): diff --git a/python/ray/autoscaler/docker.py b/python/ray/autoscaler/docker.py index 8d8e2e8a2..7ec121ae4 100644 --- a/python/ray/autoscaler/docker.py +++ b/python/ray/autoscaler/docker.py @@ -2,12 +2,15 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import logging import os try: # py3 from shlex import quote except ImportError: # py2 from pipes import quote +logger = logging.getLogger(__name__) + def dockerize_if_needed(config): if "docker" not in config: @@ -16,7 +19,8 @@ def dockerize_if_needed(config): cname = config["docker"].get("container_name") if not docker_image: if cname: - print("Container name given but no Docker image - continuing...") + logger.warning( + "Container name given but no Docker image - continuing...") return config else: assert cname, "Must provide container name!" diff --git a/python/ray/autoscaler/gcp/config.py b/python/ray/autoscaler/gcp/config.py index 6df3cb777..d6ae2edeb 100644 --- a/python/ray/autoscaler/gcp/config.py +++ b/python/ray/autoscaler/gcp/config.py @@ -3,6 +3,7 @@ from __future__ import division from __future__ import print_function import os +import logging import time from cryptography.hazmat.primitives import serialization @@ -29,10 +30,12 @@ DEFAULT_SERVICE_ACCOUNT_ROLES = ("roles/storage.objectAdmin", MAX_POLLS = 12 POLL_INTERVAL = 5 +logger = logging.getLogger(__name__) + def wait_for_crm_operation(operation): """Poll for cloud resource manager operation until finished.""" - print("Waiting for operation {} to finish...".format(operation)) + logger.info("Waiting for operation {} to finish...".format(operation)) for _ in range(MAX_POLLS): result = crm.operations().get(name=operation["name"]).execute() @@ -40,7 +43,7 @@ def wait_for_crm_operation(operation): raise Exception(result["error"]) if "done" in result and result["done"]: - print("Done.") + logger.info("Done.") break time.sleep(POLL_INTERVAL) @@ -50,7 +53,8 @@ def wait_for_crm_operation(operation): def wait_for_compute_global_operation(project_name, operation): """Poll for global compute operation until finished.""" - print("Waiting for operation {} to finish...".format(operation["name"])) + logger.info("Waiting for operation {} to finish...".format( + operation["name"])) for _ in range(MAX_POLLS): result = compute.globalOperations().get( @@ -61,7 +65,7 @@ def wait_for_compute_global_operation(project_name, operation): raise Exception(result["error"]) if result["status"] == "DONE": - print("Done.") + logger.info("Done.") break time.sleep(POLL_INTERVAL) @@ -154,7 +158,7 @@ def _configure_iam_role(config): service_account = _get_service_account(email, config) if service_account is None: - print("Creating new service account {}".format( + logger.info("Creating new service account {}".format( DEFAULT_SERVICE_ACCOUNT_ID)) service_account = _create_service_account( @@ -227,7 +231,7 @@ def _configure_key_pair(config): # Create a key since it doesn't exist locally or in GCP if not key_found and not os.path.exists(private_key_path): - print("Creating new key pair {}".format(key_name)) + logger.info("Creating new key pair {}".format(key_name)) public_key, private_key = generate_rsa_key_pair() _create_project_ssh_key_pair(project, public_key, ssh_user) @@ -252,8 +256,8 @@ def _configure_key_pair(config): "Private key file {} not found for user {}" "".format(private_key_path, ssh_user)) - print("Private key not specified in config, using {}" - "".format(private_key_path)) + logger.info("Private key not specified in config, using {}" + "".format(private_key_path)) config["auth"]["ssh_private_key"] = private_key_path diff --git a/python/ray/autoscaler/gcp/node_provider.py b/python/ray/autoscaler/gcp/node_provider.py index e3f1b1c5d..a191dea34 100644 --- a/python/ray/autoscaler/gcp/node_provider.py +++ b/python/ray/autoscaler/gcp/node_provider.py @@ -5,6 +5,7 @@ from __future__ import print_function from uuid import uuid4 import time +import logging from googleapiclient import discovery from ray.autoscaler.node_provider import NodeProvider @@ -14,10 +15,13 @@ from ray.autoscaler.gcp.config import MAX_POLLS, POLL_INTERVAL INSTANCE_NAME_MAX_LEN = 64 INSTANCE_NAME_UUID_LEN = 8 +logger = logging.getLogger(__name__) + def wait_for_compute_zone_operation(compute, project_name, operation, zone): """Poll for compute zone operation until finished.""" - print("Waiting for operation {} to finish...".format(operation["name"])) + logger.info("Waiting for operation {} to finish...".format( + operation["name"])) for _ in range(MAX_POLLS): result = compute.zoneOperations().get( @@ -27,7 +31,7 @@ def wait_for_compute_zone_operation(compute, project_name, operation, zone): raise Exception(result["error"]) if result["status"] == "DONE": - print("Done.") + logger.info("Done.") break time.sleep(POLL_INTERVAL) diff --git a/python/ray/autoscaler/local/node_provider.py b/python/ray/autoscaler/local/node_provider.py index 886d5c2a8..432c7b0ab 100644 --- a/python/ray/autoscaler/local/node_provider.py +++ b/python/ray/autoscaler/local/node_provider.py @@ -6,10 +6,13 @@ from filelock import FileLock import json import os import socket +import logging from ray.autoscaler.node_provider import NodeProvider from ray.autoscaler.tags import TAG_RAY_NODE_TYPE +logger = logging.getLogger(__name__) + class ClusterState(object): def __init__(self, lock_path, save_path, provider_config): @@ -21,7 +24,7 @@ class ClusterState(object): workers = json.loads(open(self.save_path).read()) else: workers = {} - print("Loaded cluster state", workers) + logger.info("Loaded cluster state: {}".format(workers)) for worker_ip in provider_config["worker_ips"]: if worker_ip not in workers: workers[worker_ip] = { @@ -45,7 +48,7 @@ class ClusterState(object): TAG_RAY_NODE_TYPE] == "head" assert len(workers) == len(provider_config["worker_ips"]) + 1 with open(self.save_path, "w") as f: - print("Writing cluster state", workers) + logger.info("Writing cluster state: {}".format(workers)) f.write(json.dumps(workers)) def get(self): @@ -60,7 +63,7 @@ class ClusterState(object): workers = self.get() workers[worker_id] = info with open(self.save_path, "w") as f: - print("Writing cluster state", workers) + logger.info("Writing cluster state: {}".format(workers)) f.write(json.dumps(workers)) diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index 27f9aceb6..b132971d2 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -6,6 +6,7 @@ try: # py3 from shlex import quote except ImportError: # py2 from pipes import quote +import logging import os import subprocess import sys @@ -22,6 +23,8 @@ from ray.autoscaler.tags import TAG_RAY_NODE_STATUS, TAG_RAY_RUNTIME_CONFIG NODE_START_WAIT_S = 300 SSH_CHECK_INTERVAL = 5 +logger = logging.getLogger(__name__) + def pretty_cmd(cmd_str): return "\n\n\t{}\n\n".format(cmd_str) @@ -55,9 +58,13 @@ class NodeUpdater(object): } self.setup_cmds = setup_cmds self.runtime_hash = runtime_hash + self.logger = logger.getChild(str(node_id)) if redirect_output: self.logfile = tempfile.NamedTemporaryFile( mode="w", prefix="node-updater-", delete=False) + handler = logging.StreamHandler(stream=self.logfile) + handler.setLevel(logging.INFO) + self.logger.addHandler(handler) self.output_name = self.logfile.name self.stdout = self.logfile self.stderr = self.logfile @@ -74,8 +81,9 @@ class NodeUpdater(object): return self.provider.external_ip(self.node_id) def run(self): - print("NodeUpdater: Updating {} to {}, logging to {}".format( - self.node_id, self.runtime_hash, self.output_name)) + self.logger.info( + "NodeUpdater: Updating {} to {}, logging to {}".format( + self.node_id, self.runtime_hash, self.output_name)) try: self.do_update() except Exception as e: @@ -83,26 +91,23 @@ class NodeUpdater(object): if hasattr(e, "cmd"): error_str = "(Exit Status {}) {}".format( e.returncode, pretty_cmd(" ".join(e.cmd))) - print( - "NodeUpdater: Error updating {}" - "See {} for remote logs.".format(error_str, self.output_name), - file=self.stdout) + self.logger.error("NodeUpdater: Error updating {}" + "See {} for remote logs.".format( + error_str, self.output_name)) self.provider.set_node_tags(self.node_id, {TAG_RAY_NODE_STATUS: "update-failed"}) if self.logfile is not None: - print("----- BEGIN REMOTE LOGS -----\n" + - open(self.logfile.name).read() + - "\n----- END REMOTE LOGS -----") + self.logger.info("----- BEGIN REMOTE LOGS -----\n" + + open(self.logfile.name).read() + + "\n----- END REMOTE LOGS -----") raise e self.provider.set_node_tags( self.node_id, { TAG_RAY_NODE_STATUS: "up-to-date", TAG_RAY_RUNTIME_CONFIG: self.runtime_hash }) - print( - "NodeUpdater: Applied config {} to node {}".format( - self.runtime_hash, self.node_id), - file=self.stdout) + self.logger.info("NodeUpdater: Applied config {} to node {}".format( + self.runtime_hash, self.node_id)) def do_update(self): self.provider.set_node_tags(self.node_id, @@ -112,9 +117,8 @@ class NodeUpdater(object): # Wait for external IP while time.time() < deadline and \ not self.provider.is_terminated(self.node_id): - print( - "NodeUpdater: Waiting for IP of {}...".format(self.node_id), - file=self.stdout) + self.logger.info("NodeUpdater: Waiting for IP of {}...".format( + self.node_id)) self.ssh_ip = self.get_node_ip() if self.ssh_ip is not None: break @@ -126,10 +130,9 @@ class NodeUpdater(object): while time.time() < deadline and \ not self.provider.is_terminated(self.node_id): try: - print( + self.logger.info( "NodeUpdater: Waiting for SSH to {}...".format( - self.node_id), - file=self.stdout) + self.node_id)) if not self.provider.is_running(self.node_id): raise Exception("Node not running yet...") self.ssh_cmd( @@ -142,9 +145,9 @@ class NodeUpdater(object): if hasattr(e, "cmd"): retry_str = "(Exit Status {}): {}".format( e.returncode, pretty_cmd(" ".join(e.cmd))) - print( + self.logger.debug( "NodeUpdater: SSH not up, retrying: {}".format(retry_str), - file=self.stdout) + ) time.sleep(SSH_CHECK_INTERVAL) else: break @@ -154,10 +157,8 @@ class NodeUpdater(object): self.provider.set_node_tags(self.node_id, {TAG_RAY_NODE_STATUS: "syncing-files"}) for remote_path, local_path in self.file_mounts.items(): - print( - "NodeUpdater: Syncing {} to {}...".format( - local_path, remote_path), - file=self.stdout) + self.logger.info("NodeUpdater: Syncing {} to {}...".format( + local_path, remote_path)) assert os.path.exists(local_path), local_path if os.path.isdir(local_path): if not local_path.endswith("/"): @@ -212,10 +213,8 @@ class NodeUpdater(object): expect_error=False, port_forward=None): if verbose: - print( - "NodeUpdater: running {} on {}...".format( - pretty_cmd(cmd), self.ssh_ip), - file=self.stdout) + self.logger.info("NodeUpdater: running {} on {}...".format( + pretty_cmd(cmd), self.ssh_ip)) ssh = ["ssh"] if allocate_tty: ssh.append("-tt") diff --git a/test/autoscaler_test.py b/test/autoscaler_test.py index 52b6fde7b..4312fdcb1 100644 --- a/test/autoscaler_test.py +++ b/test/autoscaler_test.py @@ -163,7 +163,7 @@ class LoadMetricsTest(unittest.TestCase): lm = LoadMetrics() lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}) lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}) - debug = lm.debug_string() + debug = lm.info_string() assert "ResourceUsage: 2.0/4.0 CPU, 14.0/16.0 GPU" in debug assert "NumNodesConnected: 2" in debug assert "NumNodesUsed: 1.88" in debug