[autoscaler] Cleanup Logging (#2709)

Moves Autoscaler onto Python `logging` module.
This commit is contained in:
Richard Liaw
2018-08-25 17:08:45 -07:00
committed by GitHub
parent 982cde664f
commit dbba7f2a53
9 changed files with 120 additions and 96 deletions
+33 -30
View File
@@ -6,13 +6,13 @@ import binascii
import copy
import json
import hashlib
import logging
import math
import os
from six.moves import queue
import subprocess
import threading
import time
import traceback
from collections import defaultdict
from datetime import datetime
@@ -32,6 +32,8 @@ from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG,
TAG_RAY_NODE_NAME)
import ray.services as services
logger = logging.getLogger(__name__)
REQUIRED, OPTIONAL = True, False
# For (a, b), if a is a dictionary object, then
@@ -151,11 +153,13 @@ class LoadMetrics(object):
def prune(mapping):
unwanted = set(mapping) - active_ips
for unwanted_key in unwanted:
print("Removed mapping", unwanted_key, mapping[unwanted_key])
logger.info("Removed mapping: {} - {}".format(
unwanted_key, mapping[unwanted_key]))
del mapping[unwanted_key]
if unwanted:
print("Removed {} stale ip mappings: {} not in {}".format(
len(unwanted), unwanted, active_ips))
logger.info(
"Removed {} stale ip mappings: {} not in {}".format(
len(unwanted), unwanted, active_ips))
prune(self.last_used_time_by_ip)
prune(self.static_resources_by_ip)
@@ -164,7 +168,7 @@ class LoadMetrics(object):
def approx_workers_used(self):
return self._info()["NumNodesUsed"]
def debug_string(self):
def info_string(self):
return " - {}".format("\n - ".join(
["{}: {}".format(k, v) for k, v in sorted(self._info().items())]))
@@ -238,7 +242,7 @@ class NodeLauncher(threading.Thread):
}, count)
after = self.provider.nodes(tag_filters=tag_filters)
if set(after).issubset(before):
print("Warning: No new nodes reported after node creation")
logger.error("No new nodes reported after node creation")
def run(self):
while True:
@@ -342,18 +346,18 @@ class StandardAutoscaler(object):
for local_path in self.config["file_mounts"].values():
assert os.path.exists(local_path)
print("StandardAutoscaler: {}".format(self.config))
logger.info("StandardAutoscaler: {}".format(self.config))
def update(self):
try:
self.reload_config(errors_fatal=False)
self._update()
except Exception as e:
print("StandardAutoscaler: Error during autoscaling: {}"
"".format(traceback.format_exc()))
logger.exception("Error during autoscaling.")
self.num_failures += 1
if self.num_failures > self.max_failures:
print("*** StandardAutoscaler: Too many errors, abort. ***")
logger.critical(
"*** StandardAutoscaler: Too many errors, abort. ***")
raise e
def _update(self):
@@ -365,7 +369,7 @@ class StandardAutoscaler(object):
self.last_update_time = time.time()
num_pending = self.num_launches_pending.value
nodes = self.workers()
print(self.debug_string(nodes))
logger.info(self.info_string(nodes))
self.load_metrics.prune_active_ips(
[self.provider.internal_ip(node_id) for node_id in nodes])
target_workers = self.target_num_workers()
@@ -379,29 +383,29 @@ class StandardAutoscaler(object):
if node_ip in last_used and last_used[node_ip] < horizon and \
len(nodes) - num_terminated > target_workers:
num_terminated += 1
print("StandardAutoscaler: Terminating idle node: "
"{}".format(node_id))
logger.info("StandardAutoscaler: Terminating idle node: "
"{}".format(node_id))
self.provider.terminate_node(node_id)
elif not self.launch_config_ok(node_id):
num_terminated += 1
print("StandardAutoscaler: Terminating outdated node: "
"{}".format(node_id))
logger.info("StandardAutoscaler: Terminating outdated node: "
"{}".format(node_id))
self.provider.terminate_node(node_id)
if num_terminated > 0:
nodes = self.workers()
print(self.debug_string(nodes))
logger.info(self.info_string(nodes))
# Terminate nodes if there are too many
num_terminated = 0
while len(nodes) > self.config["max_workers"]:
num_terminated += 1
print("StandardAutoscaler: Terminating unneeded node: "
"{}".format(nodes[-1]))
logger.info("StandardAutoscaler: Terminating unneeded node: "
"{}".format(nodes[-1]))
self.provider.terminate_node(nodes[-1])
nodes = nodes[:-1]
if num_terminated > 0:
nodes = self.workers()
print(self.debug_string(nodes))
logger.info(self.info_string(nodes))
# Launch new nodes if needed
num_workers = len(nodes) + num_pending
@@ -410,7 +414,7 @@ class StandardAutoscaler(object):
self.max_concurrent_launches - num_pending)
num_launches = min(max_allowed, target_workers - num_workers)
self.launch_new_node(num_launches)
print(self.debug_string())
logger.info(self.info_string())
# Process any completed updates
completed = []
@@ -428,7 +432,7 @@ class StandardAutoscaler(object):
# immediately trying to restart Ray on the new node.
self.load_metrics.mark_active(self.provider.internal_ip(node_id))
nodes = self.workers()
print(self.debug_string(nodes))
logger.info(self.info_string(nodes))
# Update nodes with out-of-date files
for node_id in nodes:
@@ -457,8 +461,7 @@ class StandardAutoscaler(object):
if errors_fatal:
raise e
else:
print("StandardAutoscaler: Error parsing config: {}",
traceback.format_exc())
logger.exception("StandardAutoscaler: Error parsing config.")
def target_num_workers(self):
target_frac = self.config["target_utilization_fraction"]
@@ -478,7 +481,7 @@ class StandardAutoscaler(object):
def files_up_to_date(self, node_id):
applied = self.provider.node_tags(node_id).get(TAG_RAY_RUNTIME_CONFIG)
if applied != self.runtime_hash:
print(
logger.info(
"StandardAutoscaler: {} has runtime state {}, want {}".format(
node_id, applied, self.runtime_hash))
return False
@@ -492,9 +495,9 @@ class StandardAutoscaler(object):
delta = time.time() - last_heartbeat_time
if delta < AUTOSCALER_HEARTBEAT_TIMEOUT_S:
return
print("StandardAutoscaler: No heartbeat from node "
"{} in {} seconds, restarting Ray to recover...".format(
node_id, delta))
logger.warning("StandardAutoscaler: No heartbeat from node "
"{} in {} seconds, restarting Ray to recover...".format(
node_id, delta))
updater = self.node_updater_cls(
node_id,
self.config["provider"],
@@ -550,7 +553,7 @@ class StandardAutoscaler(object):
return True
def launch_new_node(self, count):
print("StandardAutoscaler: Launching {} new nodes".format(count))
logger.info("StandardAutoscaler: Launching {} new nodes".format(count))
self.num_launches_pending.inc(count)
config = copy.deepcopy(self.config)
self.launch_queue.put((config, count))
@@ -558,7 +561,7 @@ class StandardAutoscaler(object):
def workers(self):
return self.provider.nodes(tag_filters={TAG_RAY_NODE_TYPE: "worker"})
def debug_string(self, nodes=None):
def info_string(self, nodes=None):
if nodes is None:
nodes = self.workers()
suffix = ""
@@ -571,7 +574,7 @@ class StandardAutoscaler(object):
len(self.num_failed_updates))
return "StandardAutoscaler [{}]: {}/{} target nodes{}\n{}".format(
datetime.now(), len(nodes), self.target_num_workers(), suffix,
self.load_metrics.debug_string())
self.load_metrics.info_string())
def typename(v):
+18 -12
View File
@@ -33,6 +33,7 @@ def key_pair(i, region):
# Suppress excessive connection dropped logs from boto
logging.getLogger("botocore").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
def bootstrap_aws(config):
@@ -60,7 +61,7 @@ def _configure_iam_role(config):
profile = _get_instance_profile(DEFAULT_RAY_INSTANCE_PROFILE, config)
if profile is None:
print("Creating new instance profile {}".format(
logger.info("Creating new instance profile {}".format(
DEFAULT_RAY_INSTANCE_PROFILE))
client = _client("iam", config)
client.create_instance_profile(
@@ -73,7 +74,7 @@ def _configure_iam_role(config):
if not profile.roles:
role = _get_role(DEFAULT_RAY_IAM_ROLE, config)
if role is None:
print("Creating new role {}".format(DEFAULT_RAY_IAM_ROLE))
logger.info("Creating new role {}".format(DEFAULT_RAY_IAM_ROLE))
iam = _resource("iam", config)
iam.create_role(
RoleName=DEFAULT_RAY_IAM_ROLE,
@@ -97,7 +98,8 @@ def _configure_iam_role(config):
profile.add_role(RoleName=role.name)
time.sleep(15) # wait for propagation
print("Role not specified for head node, using {}".format(profile.arn))
logger.info("Role not specified for head node, using {}".format(
profile.arn))
config["head_node"]["IamInstanceProfile"] = {"Arn": profile.arn}
return config
@@ -122,7 +124,7 @@ def _configure_key_pair(config):
# We can safely create a new key.
if not key and not os.path.exists(key_path):
print("Creating new key pair {}".format(key_name))
logger.info("Creating new key pair {}".format(key_name))
key = ec2.create_key_pair(KeyName=key_name)
with open(key_path, "w") as f:
f.write(key.key_material)
@@ -133,7 +135,7 @@ def _configure_key_pair(config):
assert os.path.exists(key_path), \
"Private key file {} not found for {}".format(key_path, key_name)
print("KeyName not specified for nodes, using {}".format(key_name))
logger.info("KeyName not specified for nodes, using {}".format(key_name))
config["auth"]["ssh_private_key"] = key_path
config["head_node"]["KeyName"] = key_name
@@ -170,11 +172,13 @@ def _configure_subnet(config):
subnet_descr = [(s.subnet_id, s.availability_zone) for s in subnets]
if "SubnetIds" not in config["head_node"]:
config["head_node"]["SubnetIds"] = subnet_ids
print("SubnetIds not specified for head node, using ", subnet_descr)
logger.info("SubnetIds not specified for head node,"
" using {}".format(subnet_descr))
if "SubnetIds" not in config["worker_nodes"]:
config["worker_nodes"]["SubnetIds"] = subnet_ids
print("SubnetId not specified for workers, using ", subnet_descr)
logger.info("SubnetId not specified for workers,"
" using {}".format(subnet_descr))
return config
@@ -189,7 +193,7 @@ def _configure_security_group(config):
security_group = _get_security_group(config, vpc_id, group_name)
if security_group is None:
print("Creating new security group {}".format(group_name))
logger.info("Creating new security group {}".format(group_name))
client = _client("ec2", config)
client.create_security_group(
Description="Auto-created security group for Ray workers",
@@ -216,13 +220,15 @@ def _configure_security_group(config):
}])
if "SecurityGroupIds" not in config["head_node"]:
print("SecurityGroupIds not specified for head node, using {}".format(
security_group.group_name))
logger.info(
"SecurityGroupIds not specified for head node, using {}".format(
security_group.group_name))
config["head_node"]["SecurityGroupIds"] = [security_group.id]
if "SecurityGroupIds" not in config["worker_nodes"]:
print("SecurityGroupIds not specified for workers, using {}".format(
security_group.group_name))
logger.info(
"SecurityGroupIds not specified for workers, using {}".format(
security_group.group_name))
config["worker_nodes"]["SecurityGroupIds"] = [security_group.id]
return config
+11 -10
View File
@@ -10,6 +10,7 @@ import tempfile
import time
import sys
import click
import logging
import yaml
try: # py3
@@ -24,12 +25,13 @@ from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_LAUNCH_CONFIG, \
TAG_RAY_NODE_NAME
from ray.autoscaler.updater import NodeUpdaterProcess
logger = logging.getLogger(__name__)
def create_or_update_cluster(config_file, override_min_workers,
override_max_workers, no_restart, restart_only,
yes, override_cluster_name):
"""Create or updates an autoscaling Ray cluster from a config json."""
config = yaml.load(open(config_file).read())
if override_min_workers is not None:
config["min_workers"] = override_min_workers
@@ -80,13 +82,13 @@ def teardown_cluster(config_file, yes, workers_only, override_cluster_name):
if not workers_only:
for node in provider.nodes({TAG_RAY_NODE_TYPE: "head"}):
print("Terminating head node {}".format(node))
logger.info("Terminating head node {}".format(node))
provider.terminate_node(node)
nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"})
while nodes:
for node in nodes:
print("Terminating worker {}".format(node))
logger.info("Terminating worker {}".format(node))
provider.terminate_node(node)
time.sleep(5)
nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"})
@@ -116,9 +118,9 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
TAG_RAY_LAUNCH_CONFIG) != launch_hash:
if head_node is not None:
confirm("Head node config out-of-date. It will be terminated", yes)
print("Terminating outdated head node {}".format(head_node))
logger.info("Terminating outdated head node {}".format(head_node))
provider.terminate_node(head_node)
print("Launching new head node...")
logger.info("Launching new head node...")
head_node_tags[TAG_RAY_LAUNCH_CONFIG] = launch_hash
head_node_tags[TAG_RAY_NODE_NAME] = "ray-{}-head".format(
config["cluster_name"])
@@ -131,7 +133,7 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
# TODO(ekl) right now we always update the head node even if the hash
# matches. We could prompt the user for what they want to do in this case.
runtime_hash = hash_runtime_conf(config["file_mounts"], config)
print("Updating files on head node...")
logger.info("Updating files on head node...")
# Rewrite the auth config so that the head node can update the workers
remote_key_path = "~/ray_bootstrap_key.pem"
@@ -181,10 +183,10 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
provider.nodes(head_node_tags)
if updater.exitcode != 0:
print("Error: updating {} failed".format(
logger.error("Updating {} failed".format(
provider.external_ip(head_node)))
sys.exit(1)
print("Head node up-to-date, IP address is: {}".format(
logger.info("Head node up-to-date, IP address is: {}".format(
provider.external_ip(head_node)))
monitor_str = "tail -n 100 -f /tmp/raylogs/monitor-*"
@@ -337,9 +339,8 @@ def _get_head_node(config,
return _get_head_node(
config, config_file, override_cluster_name, create_if_needed=False)
else:
print("Head node of cluster ({}) not found!".format(
raise RuntimeError("Head node of cluster ({}) not found!".format(
config["cluster_name"]))
sys.exit(1)
def confirm(msg, yes):
+5 -1
View File
@@ -2,12 +2,15 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import os
try: # py3
from shlex import quote
except ImportError: # py2
from pipes import quote
logger = logging.getLogger(__name__)
def dockerize_if_needed(config):
if "docker" not in config:
@@ -16,7 +19,8 @@ def dockerize_if_needed(config):
cname = config["docker"].get("container_name")
if not docker_image:
if cname:
print("Container name given but no Docker image - continuing...")
logger.warning(
"Container name given but no Docker image - continuing...")
return config
else:
assert cname, "Must provide container name!"
+12 -8
View File
@@ -3,6 +3,7 @@ from __future__ import division
from __future__ import print_function
import os
import logging
import time
from cryptography.hazmat.primitives import serialization
@@ -29,10 +30,12 @@ DEFAULT_SERVICE_ACCOUNT_ROLES = ("roles/storage.objectAdmin",
MAX_POLLS = 12
POLL_INTERVAL = 5
logger = logging.getLogger(__name__)
def wait_for_crm_operation(operation):
"""Poll for cloud resource manager operation until finished."""
print("Waiting for operation {} to finish...".format(operation))
logger.info("Waiting for operation {} to finish...".format(operation))
for _ in range(MAX_POLLS):
result = crm.operations().get(name=operation["name"]).execute()
@@ -40,7 +43,7 @@ def wait_for_crm_operation(operation):
raise Exception(result["error"])
if "done" in result and result["done"]:
print("Done.")
logger.info("Done.")
break
time.sleep(POLL_INTERVAL)
@@ -50,7 +53,8 @@ def wait_for_crm_operation(operation):
def wait_for_compute_global_operation(project_name, operation):
"""Poll for global compute operation until finished."""
print("Waiting for operation {} to finish...".format(operation["name"]))
logger.info("Waiting for operation {} to finish...".format(
operation["name"]))
for _ in range(MAX_POLLS):
result = compute.globalOperations().get(
@@ -61,7 +65,7 @@ def wait_for_compute_global_operation(project_name, operation):
raise Exception(result["error"])
if result["status"] == "DONE":
print("Done.")
logger.info("Done.")
break
time.sleep(POLL_INTERVAL)
@@ -154,7 +158,7 @@ def _configure_iam_role(config):
service_account = _get_service_account(email, config)
if service_account is None:
print("Creating new service account {}".format(
logger.info("Creating new service account {}".format(
DEFAULT_SERVICE_ACCOUNT_ID))
service_account = _create_service_account(
@@ -227,7 +231,7 @@ def _configure_key_pair(config):
# Create a key since it doesn't exist locally or in GCP
if not key_found and not os.path.exists(private_key_path):
print("Creating new key pair {}".format(key_name))
logger.info("Creating new key pair {}".format(key_name))
public_key, private_key = generate_rsa_key_pair()
_create_project_ssh_key_pair(project, public_key, ssh_user)
@@ -252,8 +256,8 @@ def _configure_key_pair(config):
"Private key file {} not found for user {}"
"".format(private_key_path, ssh_user))
print("Private key not specified in config, using {}"
"".format(private_key_path))
logger.info("Private key not specified in config, using {}"
"".format(private_key_path))
config["auth"]["ssh_private_key"] = private_key_path
+6 -2
View File
@@ -5,6 +5,7 @@ from __future__ import print_function
from uuid import uuid4
import time
import logging
from googleapiclient import discovery
from ray.autoscaler.node_provider import NodeProvider
@@ -14,10 +15,13 @@ from ray.autoscaler.gcp.config import MAX_POLLS, POLL_INTERVAL
INSTANCE_NAME_MAX_LEN = 64
INSTANCE_NAME_UUID_LEN = 8
logger = logging.getLogger(__name__)
def wait_for_compute_zone_operation(compute, project_name, operation, zone):
"""Poll for compute zone operation until finished."""
print("Waiting for operation {} to finish...".format(operation["name"]))
logger.info("Waiting for operation {} to finish...".format(
operation["name"]))
for _ in range(MAX_POLLS):
result = compute.zoneOperations().get(
@@ -27,7 +31,7 @@ def wait_for_compute_zone_operation(compute, project_name, operation, zone):
raise Exception(result["error"])
if result["status"] == "DONE":
print("Done.")
logger.info("Done.")
break
time.sleep(POLL_INTERVAL)
+6 -3
View File
@@ -6,10 +6,13 @@ from filelock import FileLock
import json
import os
import socket
import logging
from ray.autoscaler.node_provider import NodeProvider
from ray.autoscaler.tags import TAG_RAY_NODE_TYPE
logger = logging.getLogger(__name__)
class ClusterState(object):
def __init__(self, lock_path, save_path, provider_config):
@@ -21,7 +24,7 @@ class ClusterState(object):
workers = json.loads(open(self.save_path).read())
else:
workers = {}
print("Loaded cluster state", workers)
logger.info("Loaded cluster state: {}".format(workers))
for worker_ip in provider_config["worker_ips"]:
if worker_ip not in workers:
workers[worker_ip] = {
@@ -45,7 +48,7 @@ class ClusterState(object):
TAG_RAY_NODE_TYPE] == "head"
assert len(workers) == len(provider_config["worker_ips"]) + 1
with open(self.save_path, "w") as f:
print("Writing cluster state", workers)
logger.info("Writing cluster state: {}".format(workers))
f.write(json.dumps(workers))
def get(self):
@@ -60,7 +63,7 @@ class ClusterState(object):
workers = self.get()
workers[worker_id] = info
with open(self.save_path, "w") as f:
print("Writing cluster state", workers)
logger.info("Writing cluster state: {}".format(workers))
f.write(json.dumps(workers))
+28 -29
View File
@@ -6,6 +6,7 @@ try: # py3
from shlex import quote
except ImportError: # py2
from pipes import quote
import logging
import os
import subprocess
import sys
@@ -22,6 +23,8 @@ from ray.autoscaler.tags import TAG_RAY_NODE_STATUS, TAG_RAY_RUNTIME_CONFIG
NODE_START_WAIT_S = 300
SSH_CHECK_INTERVAL = 5
logger = logging.getLogger(__name__)
def pretty_cmd(cmd_str):
return "\n\n\t{}\n\n".format(cmd_str)
@@ -55,9 +58,13 @@ class NodeUpdater(object):
}
self.setup_cmds = setup_cmds
self.runtime_hash = runtime_hash
self.logger = logger.getChild(str(node_id))
if redirect_output:
self.logfile = tempfile.NamedTemporaryFile(
mode="w", prefix="node-updater-", delete=False)
handler = logging.StreamHandler(stream=self.logfile)
handler.setLevel(logging.INFO)
self.logger.addHandler(handler)
self.output_name = self.logfile.name
self.stdout = self.logfile
self.stderr = self.logfile
@@ -74,8 +81,9 @@ class NodeUpdater(object):
return self.provider.external_ip(self.node_id)
def run(self):
print("NodeUpdater: Updating {} to {}, logging to {}".format(
self.node_id, self.runtime_hash, self.output_name))
self.logger.info(
"NodeUpdater: Updating {} to {}, logging to {}".format(
self.node_id, self.runtime_hash, self.output_name))
try:
self.do_update()
except Exception as e:
@@ -83,26 +91,23 @@ class NodeUpdater(object):
if hasattr(e, "cmd"):
error_str = "(Exit Status {}) {}".format(
e.returncode, pretty_cmd(" ".join(e.cmd)))
print(
"NodeUpdater: Error updating {}"
"See {} for remote logs.".format(error_str, self.output_name),
file=self.stdout)
self.logger.error("NodeUpdater: Error updating {}"
"See {} for remote logs.".format(
error_str, self.output_name))
self.provider.set_node_tags(self.node_id,
{TAG_RAY_NODE_STATUS: "update-failed"})
if self.logfile is not None:
print("----- BEGIN REMOTE LOGS -----\n" +
open(self.logfile.name).read() +
"\n----- END REMOTE LOGS -----")
self.logger.info("----- BEGIN REMOTE LOGS -----\n" +
open(self.logfile.name).read() +
"\n----- END REMOTE LOGS -----")
raise e
self.provider.set_node_tags(
self.node_id, {
TAG_RAY_NODE_STATUS: "up-to-date",
TAG_RAY_RUNTIME_CONFIG: self.runtime_hash
})
print(
"NodeUpdater: Applied config {} to node {}".format(
self.runtime_hash, self.node_id),
file=self.stdout)
self.logger.info("NodeUpdater: Applied config {} to node {}".format(
self.runtime_hash, self.node_id))
def do_update(self):
self.provider.set_node_tags(self.node_id,
@@ -112,9 +117,8 @@ class NodeUpdater(object):
# Wait for external IP
while time.time() < deadline and \
not self.provider.is_terminated(self.node_id):
print(
"NodeUpdater: Waiting for IP of {}...".format(self.node_id),
file=self.stdout)
self.logger.info("NodeUpdater: Waiting for IP of {}...".format(
self.node_id))
self.ssh_ip = self.get_node_ip()
if self.ssh_ip is not None:
break
@@ -126,10 +130,9 @@ class NodeUpdater(object):
while time.time() < deadline and \
not self.provider.is_terminated(self.node_id):
try:
print(
self.logger.info(
"NodeUpdater: Waiting for SSH to {}...".format(
self.node_id),
file=self.stdout)
self.node_id))
if not self.provider.is_running(self.node_id):
raise Exception("Node not running yet...")
self.ssh_cmd(
@@ -142,9 +145,9 @@ class NodeUpdater(object):
if hasattr(e, "cmd"):
retry_str = "(Exit Status {}): {}".format(
e.returncode, pretty_cmd(" ".join(e.cmd)))
print(
self.logger.debug(
"NodeUpdater: SSH not up, retrying: {}".format(retry_str),
file=self.stdout)
)
time.sleep(SSH_CHECK_INTERVAL)
else:
break
@@ -154,10 +157,8 @@ class NodeUpdater(object):
self.provider.set_node_tags(self.node_id,
{TAG_RAY_NODE_STATUS: "syncing-files"})
for remote_path, local_path in self.file_mounts.items():
print(
"NodeUpdater: Syncing {} to {}...".format(
local_path, remote_path),
file=self.stdout)
self.logger.info("NodeUpdater: Syncing {} to {}...".format(
local_path, remote_path))
assert os.path.exists(local_path), local_path
if os.path.isdir(local_path):
if not local_path.endswith("/"):
@@ -212,10 +213,8 @@ class NodeUpdater(object):
expect_error=False,
port_forward=None):
if verbose:
print(
"NodeUpdater: running {} on {}...".format(
pretty_cmd(cmd), self.ssh_ip),
file=self.stdout)
self.logger.info("NodeUpdater: running {} on {}...".format(
pretty_cmd(cmd), self.ssh_ip))
ssh = ["ssh"]
if allocate_tty:
ssh.append("-tt")