mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 19:00:36 +08:00
Partially Use f string (#10218)
* flynt. trial 1. * Trial 1. * Addressed code review.
This commit is contained in:
+5
-6
@@ -331,7 +331,7 @@ class ActorClass:
|
||||
class DerivedActorClass(cls, modified_class):
|
||||
pass
|
||||
|
||||
name = "ActorClass({})".format(modified_class.__name__)
|
||||
name = f"ActorClass({modified_class.__name__})"
|
||||
DerivedActorClass.__module__ = modified_class.__module__
|
||||
DerivedActorClass.__name__ = name
|
||||
DerivedActorClass.__qualname__ = name
|
||||
@@ -480,8 +480,8 @@ class ActorClass:
|
||||
|
||||
if name is not None:
|
||||
if not isinstance(name, str):
|
||||
raise TypeError("name must be None or a string, "
|
||||
"got: '{}'.".format(type(name)))
|
||||
raise TypeError(
|
||||
f"name must be None or a string, got: '{type(name)}'.")
|
||||
if name == "":
|
||||
raise ValueError("Actor name cannot be an empty string.")
|
||||
|
||||
@@ -744,9 +744,8 @@ class ActorHandle:
|
||||
format(item, item))
|
||||
|
||||
def remote(self, *args, **kwargs):
|
||||
logger.warning(
|
||||
"Actor method {} is not supported by cross language."
|
||||
.format(item))
|
||||
logger.warning(f"Actor method {item} is not "
|
||||
"supported by cross language.")
|
||||
|
||||
return FakeActorMethod()
|
||||
|
||||
|
||||
@@ -86,7 +86,7 @@ class RayTaskError(RayError):
|
||||
RayTaskError.__init__(self, function_name, traceback_str,
|
||||
cause_cls, proctitle, pid, ip)
|
||||
|
||||
name = "RayTaskError({})".format(self.cause_cls.__name__)
|
||||
name = f"RayTaskError({self.cause_cls.__name__})"
|
||||
cls.__name__ = name
|
||||
cls.__qualname__ = name
|
||||
|
||||
@@ -140,7 +140,7 @@ class RayletError(RayError):
|
||||
self.client_exc = client_exc
|
||||
|
||||
def __str__(self):
|
||||
return "The Raylet died with this message: {}".format(self.client_exc)
|
||||
return f"The Raylet died with this message: {self.client_exc}"
|
||||
|
||||
|
||||
class ObjectStoreFullError(RayError):
|
||||
|
||||
@@ -272,11 +272,10 @@ class FunctionActorManager:
|
||||
max_calls=0,
|
||||
))
|
||||
self._num_task_executions[job_id][function_id] = 0
|
||||
except Exception:
|
||||
logger.exception("Failed to load function %s.", function_name)
|
||||
raise RuntimeError(
|
||||
"Function {} failed to be loaded from local code.".format(
|
||||
function_descriptor))
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Function {function_descriptor} failed "
|
||||
"to be loaded from local code. "
|
||||
f"Error message: {str(e)}")
|
||||
|
||||
def _wait_for_function(self, function_descriptor, job_id, timeout=10):
|
||||
"""Wait until the function to be executed is present on this worker.
|
||||
@@ -445,11 +444,10 @@ class FunctionActorManager:
|
||||
return actor_class.__ray_metadata__.modified_class
|
||||
else:
|
||||
return actor_class
|
||||
except Exception:
|
||||
logger.exception("Failed to load actor_class %s.", class_name)
|
||||
except Exception as e:
|
||||
raise RuntimeError(
|
||||
"Actor {} failed to be imported from local code.".format(
|
||||
class_name))
|
||||
f"Actor {class_name} failed to be imported from local code."
|
||||
f"Error Message: {str(e)}")
|
||||
|
||||
def _create_fake_actor_class(self, actor_class_name, actor_method_names):
|
||||
class TemporaryActor:
|
||||
|
||||
@@ -90,7 +90,7 @@ class ImportThread:
|
||||
key = self.redis_client.lindex("Exports", i)
|
||||
self._process_key(key)
|
||||
except (OSError, redis.exceptions.ConnectionError) as e:
|
||||
logger.error("ImportThread: {}".format(e))
|
||||
logger.error(f"ImportThread: {e}")
|
||||
finally:
|
||||
# Close the pubsub client to avoid leaking file descriptors.
|
||||
import_pubsub_client.close()
|
||||
|
||||
@@ -117,10 +117,9 @@ class LogMonitor:
|
||||
def update_log_filenames(self):
|
||||
"""Update the list of log files to monitor."""
|
||||
# output of user code is written here
|
||||
log_file_paths = glob.glob("{}/worker*[.out|.err]".format(
|
||||
self.logs_dir))
|
||||
log_file_paths = glob.glob(f"{self.logs_dir}/worker*[.out|.err]")
|
||||
# segfaults and other serious errors are logged here
|
||||
raylet_err_paths = glob.glob("{}/raylet*.err".format(self.logs_dir))
|
||||
raylet_err_paths = glob.glob(f"{self.logs_dir}/raylet*.err")
|
||||
for file_path in log_file_paths + raylet_err_paths:
|
||||
if os.path.isfile(
|
||||
file_path) and file_path not in self.log_filenames:
|
||||
@@ -142,7 +141,7 @@ class LogMonitor:
|
||||
is_err_file=is_err_file,
|
||||
job_id=job_id))
|
||||
log_filename = os.path.basename(file_path)
|
||||
logger.info("Beginning to track file {}".format(log_filename))
|
||||
logger.info(f"Beginning to track file {log_filename}")
|
||||
|
||||
def open_closed_files(self):
|
||||
"""Open some closed files if they may have new lines.
|
||||
|
||||
@@ -49,7 +49,7 @@ class RayOutOfMemoryError(Exception):
|
||||
return ("More than {}% of the memory on ".format(int(
|
||||
100 * threshold)) + "node {} is used ({} / {} GB). ".format(
|
||||
platform.node(), round(used_gb, 2), round(total_gb, 2)) +
|
||||
"The top 10 memory consumers are:\n\n{}".format(proc_str) +
|
||||
f"The top 10 memory consumers are:\n\n{proc_str}" +
|
||||
"\n\nIn addition, up to {} GiB of shared memory is ".format(
|
||||
round(get_shared(psutil.virtual_memory()) / (1024**3), 2))
|
||||
+ "currently being used by the Ray object store. You can set "
|
||||
@@ -134,8 +134,7 @@ class MemoryMonitor:
|
||||
RayOutOfMemoryError.get_message(used_gb, total_gb,
|
||||
self.error_threshold))
|
||||
else:
|
||||
logger.debug("Memory usage is {} / {}".format(
|
||||
used_gb, total_gb))
|
||||
logger.debug(f"Memory usage is {used_gb} / {total_gb}")
|
||||
|
||||
if self.heap_limit:
|
||||
mem_info = psutil.Process(os.getpid()).memory_info()
|
||||
|
||||
@@ -176,5 +176,5 @@ class PrometheusServiceDiscoveryWriter(threading.Thread):
|
||||
"failed."
|
||||
.format(self.writer.get_target_file_name()))
|
||||
logger.warning(traceback.format_exc())
|
||||
logger.warning("Error message: {}".format(e))
|
||||
logger.warning(f"Error message: {e}")
|
||||
time.sleep(self.default_service_discovery_flush_period)
|
||||
|
||||
@@ -133,8 +133,7 @@ class Monitor:
|
||||
available_resources, resource_load)
|
||||
else:
|
||||
logger.warning(
|
||||
"Monitor: "
|
||||
"could not find ip for client {}".format(client_id))
|
||||
f"Monitor: could not find ip for client {client_id}")
|
||||
self.handle_resource_demands(message.resource_load_by_shape)
|
||||
|
||||
def xray_job_notification_handler(self, unused_channel, data):
|
||||
@@ -366,8 +365,8 @@ if __name__ == "__main__":
|
||||
redis_client = ray.services.create_redis_client(
|
||||
args.redis_address, password=args.redis_password)
|
||||
traceback_str = ray.utils.format_error_message(traceback.format_exc())
|
||||
message = "The monitor failed with the following error:\n{}".format(
|
||||
traceback_str)
|
||||
message = ("The monitor failed with the "
|
||||
f"following error:\n{traceback_str}")
|
||||
ray.utils.push_error_to_driver_through_redis(
|
||||
redis_client, ray_constants.MONITOR_DIED_ERROR, message)
|
||||
raise e
|
||||
|
||||
+12
-18
@@ -135,8 +135,7 @@ class Node:
|
||||
# date including microsecond
|
||||
date_str = datetime.datetime.today().strftime(
|
||||
"%Y-%m-%d_%H-%M-%S_%f")
|
||||
self.session_name = "session_{date_str}_{pid}".format(
|
||||
pid=os.getpid(), date_str=date_str)
|
||||
self.session_name = f"session_{date_str}_{os.getpid()}"
|
||||
else:
|
||||
redis_client = self.create_redis_client()
|
||||
self.session_name = ray.utils.decode(
|
||||
@@ -328,8 +327,7 @@ class Node:
|
||||
@property
|
||||
def unique_id(self):
|
||||
"""Get a unique identifier for this node."""
|
||||
return "{}:{}".format(self.node_ip_address,
|
||||
self._plasma_store_socket_name)
|
||||
return f"{self.node_ip_address}:{self._plasma_store_socket_name}"
|
||||
|
||||
@property
|
||||
def webui_url(self):
|
||||
@@ -472,8 +470,8 @@ class Node:
|
||||
log_stderr = self._make_inc_temp(
|
||||
suffix=".err", prefix=name, directory_name=self._logs_dir)
|
||||
else:
|
||||
log_stdout = os.path.join(self._logs_dir, "{}.out".format(name))
|
||||
log_stderr = os.path.join(self._logs_dir, "{}.err".format(name))
|
||||
log_stdout = os.path.join(self._logs_dir, f"{name}.out")
|
||||
log_stderr = os.path.join(self._logs_dir, f"{name}.err")
|
||||
return log_stdout, log_stderr
|
||||
|
||||
def _get_unused_port(self, close_on_exit=True):
|
||||
@@ -516,8 +514,8 @@ class Node:
|
||||
is_mac = sys.platform.startswith("darwin")
|
||||
if sys.platform == "win32":
|
||||
if socket_path is None:
|
||||
result = "tcp://{}:{}".format(self._localhost,
|
||||
self._get_unused_port()[0])
|
||||
result = (f"tcp://{self._localhost}"
|
||||
f":{self._get_unused_port()[0]}")
|
||||
else:
|
||||
if socket_path is None:
|
||||
result = self._make_inc_temp(
|
||||
@@ -554,8 +552,7 @@ class Node:
|
||||
redis_log_files = [self.get_log_file_handles("redis", unique=True)]
|
||||
for i in range(self._ray_params.num_redis_shards):
|
||||
redis_log_files.append(
|
||||
self.get_log_file_handles(
|
||||
"redis-shard_{}".format(i), unique=True))
|
||||
self.get_log_file_handles(f"redis-shard_{i}", unique=True))
|
||||
|
||||
(self._redis_address, redis_shards,
|
||||
process_infos) = ray.services.start_redis(
|
||||
@@ -752,8 +749,7 @@ class Node:
|
||||
ray.utils.binary_to_hex(worker_id),
|
||||
ray.utils.binary_to_hex(job_id), os.getpid())
|
||||
else:
|
||||
name = "worker-{}-{}".format(
|
||||
ray.utils.binary_to_hex(worker_id), os.getpid())
|
||||
name = f"worker-{ray.utils.binary_to_hex(worker_id)}-{os.getpid()}"
|
||||
|
||||
worker_stdout_file, worker_stderr_file = self._get_log_file_names(
|
||||
name, unique=False)
|
||||
@@ -779,9 +775,8 @@ class Node:
|
||||
|
||||
def start_head_processes(self):
|
||||
"""Start head processes on the node."""
|
||||
logger.debug(
|
||||
"Process STDOUT and STDERR is being redirected to {}.".format(
|
||||
self._logs_dir))
|
||||
logger.debug(f"Process STDOUT and STDERR is being "
|
||||
f"redirected to {self._logs_dir}.")
|
||||
assert self._redis_address is None
|
||||
# If this is the head node, start the relevant head node processes.
|
||||
self.start_redis()
|
||||
@@ -797,9 +792,8 @@ class Node:
|
||||
|
||||
def start_ray_processes(self):
|
||||
"""Start all of the processes on the node."""
|
||||
logger.debug(
|
||||
"Process STDOUT and STDERR is being redirected to {}.".format(
|
||||
self._logs_dir))
|
||||
logger.debug(f"Process STDOUT and STDERR is being "
|
||||
f"redirected to {self._logs_dir}.")
|
||||
|
||||
self.start_plasma_store()
|
||||
self.start_raylet()
|
||||
|
||||
@@ -228,8 +228,8 @@ class RayParams:
|
||||
if hasattr(self, arg):
|
||||
setattr(self, arg, kwargs[arg])
|
||||
else:
|
||||
raise ValueError("Invalid RayParams parameter in"
|
||||
" update: %s" % arg)
|
||||
raise ValueError(
|
||||
f"Invalid RayParams parameter in update: {arg}")
|
||||
|
||||
self._check_usage()
|
||||
|
||||
|
||||
@@ -217,8 +217,7 @@ class Collector(object):
|
||||
return metric
|
||||
|
||||
else:
|
||||
raise ValueError(
|
||||
"unsupported aggregation type %s" % type(agg_data))
|
||||
raise ValueError(f"unsupported aggregation type {type(agg_data)}")
|
||||
|
||||
def collect(self): # pragma: NO COVER
|
||||
"""Collect fetches the statistics from OpenCensus
|
||||
|
||||
@@ -164,7 +164,7 @@ BOTO_CREATE_MAX_RETRIES = env_integer("BOTO_CREATE_MAX_RETRIES", 5)
|
||||
|
||||
LOGGER_FORMAT = (
|
||||
"%(asctime)s\t%(levelname)s %(filename)s:%(lineno)s -- %(message)s")
|
||||
LOGGER_FORMAT_HELP = "The logging format. default='{}'".format(LOGGER_FORMAT)
|
||||
LOGGER_FORMAT_HELP = f"The logging format. default='{LOGGER_FORMAT}'"
|
||||
LOGGER_LEVEL = "info"
|
||||
LOGGER_LEVEL_CHOICES = ["debug", "info", "warning", "error", "critical"]
|
||||
LOGGER_LEVEL_HELP = ("The logging level threshold, choices=['debug', 'info',"
|
||||
|
||||
@@ -42,7 +42,7 @@ class ReporterServer(reporter_pb2_grpc.ReporterServiceServicer):
|
||||
pid = request.pid
|
||||
duration = request.duration
|
||||
profiling_file_path = os.path.join(ray.utils.get_ray_temp_dir(),
|
||||
"{}_profiling.txt".format(pid))
|
||||
f"{pid}_profiling.txt")
|
||||
process = subprocess.Popen(
|
||||
"sudo $(which py-spy) record -o {} -p {} -d {} -f speedscope"
|
||||
.format(profiling_file_path, pid, duration),
|
||||
@@ -121,8 +121,7 @@ class Reporter:
|
||||
|
||||
_ = psutil.cpu_percent() # For initialization
|
||||
|
||||
self.redis_key = "{}.{}".format(ray.gcs_utils.REPORTER_CHANNEL,
|
||||
self.hostname)
|
||||
self.redis_key = f"{ray.gcs_utils.REPORTER_CHANNEL}.{self.hostname}"
|
||||
self.redis_client = ray.services.create_redis_client(
|
||||
redis_address, password=redis_password)
|
||||
|
||||
@@ -141,8 +140,7 @@ class Reporter:
|
||||
try:
|
||||
gpus = gpustat.new_query().gpus
|
||||
except Exception as e:
|
||||
logger.debug(
|
||||
"gpustat failed to retrieve GPU information: {}".format(e))
|
||||
logger.debug(f"gpustat failed to retrieve GPU information: {e}")
|
||||
for gpu in gpus:
|
||||
# Note the keys in this dict have periods which throws
|
||||
# off javascript so we change .s to _s
|
||||
@@ -243,11 +241,11 @@ class Reporter:
|
||||
server = grpc.server(thread_pool, options=(("grpc.so_reuseport", 0), ))
|
||||
reporter_pb2_grpc.add_ReporterServiceServicer_to_server(
|
||||
self.reporter_grpc_server, server)
|
||||
port = server.add_insecure_port("[::]:{}".format(self.port))
|
||||
port = server.add_insecure_port(f"[::]:{self.port}")
|
||||
|
||||
server.start()
|
||||
# Publish the port.
|
||||
self.redis_client.set("REPORTER_PORT:{}".format(self.ip), port)
|
||||
self.redis_client.set(f"REPORTER_PORT:{self.ip}", port)
|
||||
"""Run the reporter."""
|
||||
while True:
|
||||
try:
|
||||
|
||||
@@ -304,8 +304,8 @@ gpu model type.
|
||||
break
|
||||
pretty_name = _pretty_gpu_name(full_model_name)
|
||||
if pretty_name:
|
||||
constraint_name = "{}{}".format(
|
||||
ray_constants.RESOURCE_CONSTRAINT_PREFIX, pretty_name)
|
||||
constraint_name = (f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}"
|
||||
f"{pretty_name}")
|
||||
return {constraint_name: 1}
|
||||
return {}
|
||||
|
||||
@@ -324,8 +324,7 @@ def _get_gpu_info_string():
|
||||
if os.path.isdir(proc_gpus_path):
|
||||
gpu_dirs = os.listdir(proc_gpus_path)
|
||||
if len(gpu_dirs) > 0:
|
||||
gpu_info_path = "{}/{}/information".format(
|
||||
proc_gpus_path, gpu_dirs[0])
|
||||
gpu_info_path = f"{proc_gpus_path}/{gpu_dirs[0]}/information"
|
||||
info_str = open(gpu_info_path).read()
|
||||
return info_str
|
||||
return None
|
||||
|
||||
@@ -33,7 +33,7 @@ def check_no_existing_redis_clients(node_ip_address, redis_client):
|
||||
# The client table prefix must be kept in sync with the file
|
||||
# "src/ray/gcs/redis_module/ray_redis_module.cc" where it is defined.
|
||||
REDIS_CLIENT_TABLE_PREFIX = "CL:"
|
||||
client_keys = redis_client.keys("{}*".format(REDIS_CLIENT_TABLE_PREFIX))
|
||||
client_keys = redis_client.keys(f"{REDIS_CLIENT_TABLE_PREFIX}*")
|
||||
# Filter to clients on the same node and do some basic checking.
|
||||
for key in client_keys:
|
||||
info = redis_client.hgetall(key)
|
||||
@@ -580,7 +580,7 @@ def start(node_ip_address, redis_address, address, redis_port, port,
|
||||
"To connect to this Ray runtime from another node, run")
|
||||
cli_logger.print(
|
||||
cf.bold(" ray start --address='{}'{}"), redis_address,
|
||||
" --redis-password='{}'".format(redis_password)
|
||||
f" --redis-password='{redis_password}'"
|
||||
if redis_password else "")
|
||||
cli_logger.newline()
|
||||
cli_logger.print("Alternatively, use the following Python code:")
|
||||
@@ -1397,14 +1397,14 @@ def timeline(address):
|
||||
"""Take a Chrome tracing timeline for a Ray cluster."""
|
||||
if not address:
|
||||
address = services.find_redis_address_or_die()
|
||||
logger.info("Connecting to Ray instance at {}.".format(address))
|
||||
logger.info(f"Connecting to Ray instance at {address}.")
|
||||
ray.init(address=address)
|
||||
time = datetime.today().strftime("%Y-%m-%d_%H-%M-%S")
|
||||
filename = os.path.join(ray.utils.get_user_temp_dir(),
|
||||
"ray-timeline-{}.json".format(time))
|
||||
f"ray-timeline-{time}.json")
|
||||
ray.timeline(filename=filename)
|
||||
size = os.path.getsize(filename)
|
||||
logger.info("Trace file written to {} ({} bytes).".format(filename, size))
|
||||
logger.info(f"Trace file written to {filename} ({size} bytes).")
|
||||
logger.info(
|
||||
"You can open this with chrome://tracing in the Chrome browser.")
|
||||
|
||||
@@ -1419,7 +1419,7 @@ def statistics(address):
|
||||
"""Get the current metrics protobuf from a Ray cluster (developer tool)."""
|
||||
if not address:
|
||||
address = services.find_redis_address_or_die()
|
||||
logger.info("Connecting to Ray instance at {}.".format(address))
|
||||
logger.info(f"Connecting to Ray instance at {address}.")
|
||||
ray.init(address=address)
|
||||
|
||||
import grpc
|
||||
@@ -1429,7 +1429,7 @@ def statistics(address):
|
||||
for raylet in ray.nodes():
|
||||
raylet_address = "{}:{}".format(raylet["NodeManagerAddress"],
|
||||
ray.nodes()[0]["NodeManagerPort"])
|
||||
logger.info("Querying raylet {}".format(raylet_address))
|
||||
logger.info(f"Querying raylet {raylet_address}")
|
||||
|
||||
channel = grpc.insecure_channel(raylet_address)
|
||||
stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel)
|
||||
@@ -1455,7 +1455,7 @@ def memory(address, redis_password):
|
||||
"""Print object references held in a Ray cluster."""
|
||||
if not address:
|
||||
address = services.find_redis_address_or_die()
|
||||
logger.info("Connecting to Ray instance at {}.".format(address))
|
||||
logger.info(f"Connecting to Ray instance at {address}.")
|
||||
ray.init(address=address, redis_password=redis_password)
|
||||
print(ray.internal.internal_api.memory_summary())
|
||||
|
||||
@@ -1470,7 +1470,7 @@ def status(address):
|
||||
"""Print cluster status, including autoscaling info."""
|
||||
if not address:
|
||||
address = services.find_redis_address_or_die()
|
||||
logger.info("Connecting to Ray instance at {}.".format(address))
|
||||
logger.info(f"Connecting to Ray instance at {address}.")
|
||||
ray.init(address=address)
|
||||
print(debug_status())
|
||||
|
||||
@@ -1485,7 +1485,7 @@ def globalgc(address):
|
||||
"""Trigger Python garbage collection on all cluster workers."""
|
||||
if not address:
|
||||
address = services.find_redis_address_or_die()
|
||||
logger.info("Connecting to Ray instance at {}.".format(address))
|
||||
logger.info(f"Connecting to Ray instance at {address}.")
|
||||
ray.init(address=address)
|
||||
ray.internal.internal_api.global_gc()
|
||||
print("Triggered gc.collect() on all workers.")
|
||||
@@ -1582,8 +1582,7 @@ try:
|
||||
from ray.serve.scripts import serve_cli
|
||||
cli.add_command(serve_cli)
|
||||
except Exception as e:
|
||||
logger.debug(
|
||||
"Integrating ray serve command line tool failed with {}".format(e))
|
||||
logger.debug(f"Integrating ray serve command line tool failed with {e}")
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
@@ -80,8 +80,7 @@ def _try_to_compute_deterministic_class_id(cls, depth=5):
|
||||
# class ID for this custom class on each worker, which could lead to the
|
||||
# same class definition being exported many many times.
|
||||
logger.warning(
|
||||
"WARNING: Could not produce a deterministic class ID for class "
|
||||
"{}".format(cls))
|
||||
f"WARNING: Could not produce a deterministic class ID for class {cls}")
|
||||
return hashlib.sha1(new_class_id).digest()
|
||||
|
||||
|
||||
@@ -255,9 +254,8 @@ class SerializationContext:
|
||||
try:
|
||||
error_type = int(metadata)
|
||||
except Exception:
|
||||
raise Exception(
|
||||
"Can't deserialize object: {}, metadata: {}".format(
|
||||
object_ref, metadata))
|
||||
raise Exception(f"Can't deserialize object: {object_ref}, "
|
||||
f"metadata: {metadata}")
|
||||
|
||||
# RayTaskError is serialized with pickle5 in the data field.
|
||||
# TODO (kfstorm): exception serialization should be language
|
||||
|
||||
+60
-68
@@ -163,8 +163,8 @@ def find_redis_address_or_die():
|
||||
pass
|
||||
if len(redis_addresses) > 1:
|
||||
raise ConnectionError(
|
||||
"Found multiple active Ray instances: {}. ".format(redis_addresses)
|
||||
+ "Please specify the one to connect to by setting `address`.")
|
||||
f"Found multiple active Ray instances: {redis_addresses}. "
|
||||
"Please specify the one to connect to by setting `address`.")
|
||||
sys.exit(1)
|
||||
elif not redis_addresses:
|
||||
raise ConnectionError(
|
||||
@@ -419,27 +419,26 @@ def start_ray_process(command,
|
||||
the process that was started.
|
||||
"""
|
||||
# Detect which flags are set through environment variables.
|
||||
valgrind_env_var = "RAY_{}_VALGRIND".format(process_type.upper())
|
||||
valgrind_env_var = f"RAY_{process_type.upper()}_VALGRIND"
|
||||
if os.environ.get(valgrind_env_var) == "1":
|
||||
logger.info("Detected environment variable '%s'.", valgrind_env_var)
|
||||
use_valgrind = True
|
||||
valgrind_profiler_env_var = "RAY_{}_VALGRIND_PROFILER".format(
|
||||
process_type.upper())
|
||||
valgrind_profiler_env_var = f"RAY_{process_type.upper()}_VALGRIND_PROFILER"
|
||||
if os.environ.get(valgrind_profiler_env_var) == "1":
|
||||
logger.info("Detected environment variable '%s'.",
|
||||
valgrind_profiler_env_var)
|
||||
use_valgrind_profiler = True
|
||||
perftools_profiler_env_var = "RAY_{}_PERFTOOLS_PROFILER".format(
|
||||
process_type.upper())
|
||||
perftools_profiler_env_var = (f"RAY_{process_type.upper()}"
|
||||
"_PERFTOOLS_PROFILER")
|
||||
if os.environ.get(perftools_profiler_env_var) == "1":
|
||||
logger.info("Detected environment variable '%s'.",
|
||||
perftools_profiler_env_var)
|
||||
use_perftools_profiler = True
|
||||
tmux_env_var = "RAY_{}_TMUX".format(process_type.upper())
|
||||
tmux_env_var = f"RAY_{process_type.upper()}_TMUX"
|
||||
if os.environ.get(tmux_env_var) == "1":
|
||||
logger.info("Detected environment variable '%s'.", tmux_env_var)
|
||||
use_tmux = True
|
||||
gdb_env_var = "RAY_{}_GDB".format(process_type.upper())
|
||||
gdb_env_var = f"RAY_{process_type.upper()}_GDB"
|
||||
if os.environ.get(gdb_env_var) == "1":
|
||||
logger.info("Detected environment variable '%s'.", gdb_env_var)
|
||||
use_gdb = True
|
||||
@@ -468,14 +467,13 @@ def start_ray_process(command,
|
||||
"If 'use_gdb' is true, then 'use_tmux' must be true as well.")
|
||||
|
||||
# TODO(suquark): Any better temp file creation here?
|
||||
gdb_init_path = os.path.join(
|
||||
ray.utils.get_ray_temp_dir(), "gdb_init_{}_{}".format(
|
||||
process_type, time.time()))
|
||||
gdb_init_path = os.path.join(ray.utils.get_ray_temp_dir(),
|
||||
f"gdb_init_{process_type}_{time.time()}")
|
||||
ray_process_path = command[0]
|
||||
ray_process_args = command[1:]
|
||||
run_args = " ".join(["'{}'".format(arg) for arg in ray_process_args])
|
||||
with open(gdb_init_path, "w") as gdb_init_file:
|
||||
gdb_init_file.write("run {}".format(run_args))
|
||||
gdb_init_file.write(f"run {run_args}")
|
||||
command = ["gdb", ray_process_path, "-x", gdb_init_path]
|
||||
|
||||
if use_valgrind:
|
||||
@@ -499,7 +497,7 @@ def start_ray_process(command,
|
||||
# The command has to be created exactly as below to ensure that it
|
||||
# works on all versions of tmux. (Tested with tmux 1.8-5, travis'
|
||||
# version, and tmux 2.1)
|
||||
command = ["tmux", "new-session", "-d", "{}".format(" ".join(command))]
|
||||
command = ["tmux", "new-session", "-d", f"{' '.join(command)}"]
|
||||
|
||||
if fate_share:
|
||||
assert ray.utils.detect_fate_sharing_support(), (
|
||||
@@ -1059,8 +1057,8 @@ def start_log_monitor(redis_address,
|
||||
sys.executable,
|
||||
"-u",
|
||||
log_monitor_filepath,
|
||||
"--redis-address={}".format(redis_address),
|
||||
"--logs-dir={}".format(logs_dir),
|
||||
f"--redis-address={redis_address}",
|
||||
f"--logs-dir={logs_dir}",
|
||||
]
|
||||
if redis_password:
|
||||
command += ["--redis-password", redis_password]
|
||||
@@ -1099,8 +1097,8 @@ def start_reporter(redis_address,
|
||||
os.path.dirname(os.path.abspath(__file__)), "reporter.py")
|
||||
command = [
|
||||
sys.executable, "-u", reporter_filepath,
|
||||
"--redis-address={}".format(redis_address), "--port={}".format(port),
|
||||
"--metrics-export-port={}".format(metrics_export_port)
|
||||
f"--redis-address={redis_address}", f"--port={port}",
|
||||
f"--metrics-export-port={metrics_export_port}"
|
||||
]
|
||||
if redis_password:
|
||||
command += ["--redis-password", redis_password]
|
||||
@@ -1159,8 +1157,8 @@ def start_dashboard(require_dashboard,
|
||||
port_test_socket.bind(("127.0.0.1", port))
|
||||
port_test_socket.close()
|
||||
except socket.error:
|
||||
raise ValueError("The given dashboard port {}"
|
||||
" is already in use".format(port))
|
||||
raise ValueError(
|
||||
f"The given dashboard port {port} is already in use")
|
||||
|
||||
dashboard_filepath = os.path.join(
|
||||
os.path.dirname(os.path.abspath(__file__)), "dashboard/dashboard.py")
|
||||
@@ -1168,10 +1166,10 @@ def start_dashboard(require_dashboard,
|
||||
sys.executable,
|
||||
"-u",
|
||||
dashboard_filepath,
|
||||
"--host={}".format(host),
|
||||
"--port={}".format(port),
|
||||
"--redis-address={}".format(redis_address),
|
||||
"--temp-dir={}".format(temp_dir),
|
||||
f"--host={host}",
|
||||
f"--port={port}",
|
||||
f"--redis-address={redis_address}",
|
||||
f"--temp-dir={temp_dir}",
|
||||
]
|
||||
if redis_password:
|
||||
command += ["--redis-password", redis_password]
|
||||
@@ -1198,8 +1196,8 @@ def start_dashboard(require_dashboard,
|
||||
stderr_file=stderr_file,
|
||||
fate_share=fate_share)
|
||||
|
||||
dashboard_url = "{}:{}".format(
|
||||
host if host != "0.0.0.0" else get_node_ip_address(), port)
|
||||
dashboard_url = (
|
||||
f"{host if host != '0.0.0.0' else get_node_ip_address()}:{port}")
|
||||
|
||||
cli_logger.labeled_value("Dashboard URL", cf.underlined("http://{}"),
|
||||
dashboard_url)
|
||||
@@ -1244,14 +1242,14 @@ def start_gcs_server(redis_address,
|
||||
|
||||
command = [
|
||||
GCS_SERVER_EXECUTABLE,
|
||||
"--redis_address={}".format(gcs_ip_address),
|
||||
"--redis_port={}".format(gcs_port),
|
||||
"--config_list={}".format(config_str),
|
||||
"--gcs_server_port={}".format(gcs_server_port),
|
||||
"--metrics-agent-port={}".format(metrics_agent_port),
|
||||
f"--redis_address={gcs_ip_address}",
|
||||
f"--redis_port={gcs_port}",
|
||||
f"--config_list={config_str}",
|
||||
f"--gcs_server_port={gcs_server_port}",
|
||||
f"--metrics-agent-port={metrics_agent_port}",
|
||||
]
|
||||
if redis_password:
|
||||
command += ["--redis_password={}".format(redis_password)]
|
||||
command += [f"--redis_password={redis_password}"]
|
||||
process_info = start_ray_process(
|
||||
command,
|
||||
ray_constants.PROCESS_TYPE_GCS_SERVER,
|
||||
@@ -1372,18 +1370,15 @@ def start_raylet(redis_address,
|
||||
|
||||
# Create the command that the Raylet will use to start workers.
|
||||
start_worker_command = [
|
||||
sys.executable, worker_path,
|
||||
"--node-ip-address={}".format(node_ip_address),
|
||||
"--node-manager-port={}".format(node_manager_port),
|
||||
"--object-store-name={}".format(plasma_store_name),
|
||||
"--raylet-name={}".format(raylet_name),
|
||||
"--redis-address={}".format(redis_address),
|
||||
"--config-list={}".format(config_str),
|
||||
"--temp-dir={}".format(temp_dir),
|
||||
sys.executable, worker_path, f"--node-ip-address={node_ip_address}",
|
||||
f"--node-manager-port={node_manager_port}",
|
||||
f"--object-store-name={plasma_store_name}",
|
||||
f"--raylet-name={raylet_name}", f"--redis-address={redis_address}",
|
||||
f"--config-list={config_str}", f"--temp-dir={temp_dir}",
|
||||
f"--metrics-agent-port={metrics_agent_port}"
|
||||
]
|
||||
if redis_password:
|
||||
start_worker_command += ["--redis-password={}".format(redis_password)]
|
||||
start_worker_command += [f"--redis-password={redis_password}"]
|
||||
|
||||
# If the object manager port is None, then use 0 to cause the object
|
||||
# manager to choose its own port.
|
||||
@@ -1405,28 +1400,26 @@ def start_raylet(redis_address,
|
||||
|
||||
command = [
|
||||
RAYLET_EXECUTABLE,
|
||||
"--raylet_socket_name={}".format(raylet_name),
|
||||
"--store_socket_name={}".format(plasma_store_name),
|
||||
"--object_manager_port={}".format(object_manager_port),
|
||||
"--min_worker_port={}".format(min_worker_port),
|
||||
"--max_worker_port={}".format(max_worker_port),
|
||||
"--node_manager_port={}".format(node_manager_port),
|
||||
"--node_ip_address={}".format(node_ip_address),
|
||||
"--redis_address={}".format(gcs_ip_address),
|
||||
"--redis_port={}".format(gcs_port),
|
||||
"--num_initial_workers={}".format(num_initial_workers),
|
||||
"--maximum_startup_concurrency={}".format(maximum_startup_concurrency),
|
||||
"--static_resource_list={}".format(resource_argument),
|
||||
"--config_list={}".format(config_str),
|
||||
"--python_worker_command={}".format(
|
||||
subprocess.list2cmdline(start_worker_command)),
|
||||
"--java_worker_command={}".format(
|
||||
subprocess.list2cmdline(java_worker_command)),
|
||||
"--redis_password={}".format(redis_password or ""),
|
||||
"--temp_dir={}".format(temp_dir),
|
||||
"--session_dir={}".format(session_dir),
|
||||
"--metrics-agent-port={}".format(metrics_agent_port),
|
||||
"--metrics_export_port={}".format(metrics_export_port),
|
||||
f"--raylet_socket_name={raylet_name}",
|
||||
f"--store_socket_name={plasma_store_name}",
|
||||
f"--object_manager_port={object_manager_port}",
|
||||
f"--min_worker_port={min_worker_port}",
|
||||
f"--max_worker_port={max_worker_port}",
|
||||
f"--node_manager_port={node_manager_port}",
|
||||
f"--node_ip_address={node_ip_address}",
|
||||
f"--redis_address={gcs_ip_address}",
|
||||
f"--redis_port={gcs_port}",
|
||||
f"--num_initial_workers={num_initial_workers}",
|
||||
f"--maximum_startup_concurrency={maximum_startup_concurrency}",
|
||||
f"--static_resource_list={resource_argument}",
|
||||
f"--config_list={config_str}",
|
||||
f"--python_worker_command={subprocess.list2cmdline(start_worker_command)}", # noqa
|
||||
f"--java_worker_command={subprocess.list2cmdline(java_worker_command)}", # noqa
|
||||
f"--redis_password={redis_password or ''}",
|
||||
f"--temp_dir={temp_dir}",
|
||||
f"--session_dir={session_dir}",
|
||||
f"--metrics-agent-port={metrics_agent_port}",
|
||||
f"--metrics_export_port={metrics_export_port}",
|
||||
]
|
||||
if start_initial_python_workers_for_first_job:
|
||||
command.append("--num_initial_python_workers_for_first_job={}".format(
|
||||
@@ -1436,8 +1429,8 @@ def start_raylet(redis_address,
|
||||
plasma_directory, object_store_memory = determine_plasma_store_config(
|
||||
resource_spec.object_store_memory, plasma_directory, huge_pages)
|
||||
command += [
|
||||
"--object_store_memory={}".format(object_store_memory),
|
||||
"--plasma_directory={}".format(plasma_directory),
|
||||
f"--object_store_memory={object_store_memory}",
|
||||
f"--plasma_directory={plasma_directory}",
|
||||
]
|
||||
if huge_pages:
|
||||
command.append("--huge_pages")
|
||||
@@ -1607,9 +1600,8 @@ def determine_plasma_store_config(object_store_memory,
|
||||
"plasma_directory is set.")
|
||||
|
||||
if not os.path.isdir(plasma_directory):
|
||||
raise ValueError(
|
||||
"The file {} does not exist or is not a directory.".format(
|
||||
plasma_directory))
|
||||
raise ValueError(f"The file {plasma_directory} does not "
|
||||
"exist or is not a directory.")
|
||||
|
||||
if huge_pages and plasma_directory is None:
|
||||
raise ValueError("If huge_pages is True, then the "
|
||||
|
||||
@@ -11,16 +11,15 @@ import ray
|
||||
|
||||
|
||||
def do_link(package, force=False, local_path=""):
|
||||
package_home = os.path.abspath(
|
||||
os.path.join(ray.__file__, "../{}".format(package)))
|
||||
package_home = os.path.abspath(os.path.join(ray.__file__, f"../{package}"))
|
||||
local_home = os.path.abspath(
|
||||
os.path.join(__file__, local_path + "../{}".format(package)))
|
||||
os.path.join(__file__, local_path + f"../{package}"))
|
||||
if not os.path.isdir(package_home):
|
||||
print("{} does not exist. Continuing to link.".format(package_home))
|
||||
print(f"{package_home} does not exist. Continuing to link.")
|
||||
assert os.path.isdir(local_home), local_home
|
||||
if not force and not click.confirm(
|
||||
"This will replace:\n {}\nwith a symlink to:\n {}".format(
|
||||
package_home, local_home),
|
||||
f"This will replace:\n {package_home}\nwith "
|
||||
f"a symlink to:\n {local_home}",
|
||||
default=True):
|
||||
return
|
||||
# Windows: Create directory junction.
|
||||
@@ -37,8 +36,8 @@ def do_link(package, force=False, local_path=""):
|
||||
else:
|
||||
sudo = []
|
||||
if not os.access(os.path.dirname(package_home), os.W_OK):
|
||||
print("You don't have write permission to {}, using sudo:".format(
|
||||
package_home))
|
||||
print("You don't have write permission "
|
||||
f"to {package_home}, using sudo:")
|
||||
sudo = ["sudo"]
|
||||
subprocess.check_call(sudo + ["rm", "-rf", package_home])
|
||||
subprocess.check_call(sudo + ["ln", "-s", local_home, package_home])
|
||||
|
||||
@@ -51,8 +51,8 @@ def get_signature(func):
|
||||
for attr in attrs:
|
||||
setattr(func, attr, getattr(original_func, attr))
|
||||
else:
|
||||
raise TypeError("{!r} is not a Python function we can process"
|
||||
.format(func))
|
||||
raise TypeError(
|
||||
f"{func!r} is not a Python function we can process")
|
||||
|
||||
return inspect.signature(func)
|
||||
|
||||
|
||||
+2
-2
@@ -99,8 +99,8 @@ class GlobalState:
|
||||
continue
|
||||
num_redis_shards = int(num_redis_shards)
|
||||
assert num_redis_shards >= 1, (
|
||||
"Expected at least one Redis "
|
||||
"shard, found {}.".format(num_redis_shards))
|
||||
f"Expected at least one Redis shard, found {num_redis_shards}."
|
||||
)
|
||||
|
||||
# Attempt to get all of the Redis shards.
|
||||
redis_shard_addresses = self.redis_client.lrange(
|
||||
|
||||
@@ -121,7 +121,7 @@ def wait_for_pid_to_exit(pid, timeout=20):
|
||||
return
|
||||
time.sleep(0.1)
|
||||
raise RayTestTimeoutException(
|
||||
"Timed out while waiting for process {} to exit.".format(pid))
|
||||
f"Timed out while waiting for process {pid} to exit.")
|
||||
|
||||
|
||||
def wait_for_children_of_pid(pid, num_children=1, timeout=20):
|
||||
|
||||
+18
-22
@@ -53,8 +53,8 @@ def from_range(n: int, num_shards: int = 2,
|
||||
else:
|
||||
end = (i + 1) * shard_size
|
||||
generators.append(range(start, end))
|
||||
name = "from_range[{}, shards={}{}]".format(
|
||||
n, num_shards, ", repeat=True" if repeat else "")
|
||||
name = (f"from_range[{n}, shards={num_shards}"
|
||||
f"{', repeat=True' if repeat else ''}]")
|
||||
return from_iterators(
|
||||
generators,
|
||||
repeat=repeat,
|
||||
@@ -111,7 +111,7 @@ def from_actors(actors: List["ray.actor.ActorHandle"],
|
||||
name (str): Optional name to give the iterator.
|
||||
"""
|
||||
if not name:
|
||||
name = "from_actors[shards={}]".format(len(actors))
|
||||
name = f"from_actors[shards={len(actors)}]"
|
||||
return ParallelIterator([_ActorSet(actors, [])], name, parent_iterators=[])
|
||||
|
||||
|
||||
@@ -184,7 +184,7 @@ class ParallelIterator(Generic[T]):
|
||||
return repr(self)
|
||||
|
||||
def __repr__(self):
|
||||
return "ParallelIterator[{}]".format(self.name)
|
||||
return f"ParallelIterator[{self.name}]"
|
||||
|
||||
def _with_transform(self, local_it_fn, name):
|
||||
"""Helper function to create new Parallel Iterator"""
|
||||
@@ -290,7 +290,7 @@ class ParallelIterator(Generic[T]):
|
||||
... [0, 1, 2, 3]
|
||||
"""
|
||||
return self._with_transform(lambda local_it: local_it.batch(n),
|
||||
".batch({})".format(n))
|
||||
f".batch({n})")
|
||||
|
||||
def flatten(self) -> "ParallelIterator[T[0]]":
|
||||
"""Flatten batches of items into individual items.
|
||||
@@ -421,8 +421,7 @@ class ParallelIterator(Generic[T]):
|
||||
def make_gen_i(i):
|
||||
return lambda: base_iterator(num_partitions, i)
|
||||
|
||||
name = self.name + ".repartition[num_partitions={}]".format(
|
||||
num_partitions)
|
||||
name = self.name + f".repartition[num_partitions={num_partitions}]"
|
||||
|
||||
generators = [make_gen_i(s) for s in range(num_partitions)]
|
||||
worker_cls = ray.remote(ParallelIteratorWorker)
|
||||
@@ -449,7 +448,7 @@ class ParallelIterator(Generic[T]):
|
||||
... 2
|
||||
"""
|
||||
it = self.batch_across_shards().flatten()
|
||||
it.name = "{}.gather_sync()".format(self)
|
||||
it.name = f"{self}.gather_sync()"
|
||||
return it
|
||||
|
||||
def batch_across_shards(self) -> "LocalIterator[List[T]]":
|
||||
@@ -488,7 +487,7 @@ class ParallelIterator(Generic[T]):
|
||||
yield results
|
||||
futures = [a.par_iter_next.remote() for a in active]
|
||||
|
||||
name = "{}.batch_across_shards()".format(self)
|
||||
name = f"{self}.batch_across_shards()"
|
||||
return LocalIterator(base_iterator, SharedMetrics(), name=name)
|
||||
|
||||
def gather_async(self, batch_ms=0, num_async=1) -> "LocalIterator[T]":
|
||||
@@ -560,7 +559,7 @@ class ParallelIterator(Generic[T]):
|
||||
if timeout is not None:
|
||||
yield _NextValueNotReady()
|
||||
|
||||
name = "{}.gather_async()".format(self)
|
||||
name = f"{self}.gather_async()"
|
||||
local_iter = LocalIterator(base_iterator, SharedMetrics(), name=name)
|
||||
return local_iter
|
||||
|
||||
@@ -576,8 +575,7 @@ class ParallelIterator(Generic[T]):
|
||||
"""Return an iterator that is the union of this and the other."""
|
||||
if not isinstance(other, ParallelIterator):
|
||||
raise TypeError(
|
||||
"other must be of type ParallelIterator, got {}".format(
|
||||
type(other)))
|
||||
f"other must be of type ParallelIterator, got {type(other)}")
|
||||
actor_sets = []
|
||||
actor_sets.extend(self.actor_sets)
|
||||
actor_sets.extend(other.actor_sets)
|
||||
@@ -585,7 +583,7 @@ class ParallelIterator(Generic[T]):
|
||||
# keep an explicit reference to its parent iterator
|
||||
return ParallelIterator(
|
||||
actor_sets,
|
||||
"ParallelUnion[{}, {}]".format(self, other),
|
||||
f"ParallelUnion[{self}, {other}]",
|
||||
parent_iterators=self.parent_iterators + other.parent_iterators)
|
||||
|
||||
def select_shards(self,
|
||||
@@ -608,7 +606,7 @@ class ParallelIterator(Generic[T]):
|
||||
new_actor_set = _ActorSet(new_actors, old_actor_set.transforms)
|
||||
return ParallelIterator(
|
||||
[new_actor_set],
|
||||
"{}.select_shards({} total)".format(self, len(shards_to_keep)),
|
||||
f"{self}.select_shards({len(shards_to_keep)} total)",
|
||||
parent_iterators=self.parent_iterators)
|
||||
|
||||
def num_shards(self) -> int:
|
||||
@@ -674,7 +672,7 @@ class ParallelIterator(Generic[T]):
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
name = self.name + ".shard[{}]".format(shard_index)
|
||||
name = self.name + f".shard[{shard_index}]"
|
||||
return LocalIterator(base_iterator, SharedMetrics(), name=name)
|
||||
|
||||
|
||||
@@ -761,7 +759,7 @@ class LocalIterator(Generic[T]):
|
||||
return repr(self)
|
||||
|
||||
def __repr__(self):
|
||||
return "LocalIterator[{}]".format(self.name)
|
||||
return f"LocalIterator[{self.name}]"
|
||||
|
||||
def transform(self, fn: Callable[[Iterable[T]], Iterable[U]]
|
||||
) -> "LocalIterator[U]":
|
||||
@@ -871,7 +869,7 @@ class LocalIterator(Generic[T]):
|
||||
self.base_iterator,
|
||||
self.shared_metrics,
|
||||
self.local_transforms + [apply_batch],
|
||||
name=self.name + ".batch({})".format(n))
|
||||
name=self.name + f".batch({n})")
|
||||
|
||||
def flatten(self) -> "LocalIterator[T[0]]":
|
||||
def apply_flatten(it):
|
||||
@@ -1013,7 +1011,7 @@ class LocalIterator(Generic[T]):
|
||||
LocalIterator(
|
||||
make_next(i),
|
||||
self.shared_metrics, [],
|
||||
name=self.name + ".duplicate[{}]".format(i)))
|
||||
name=self.name + f".duplicate[{i}]"))
|
||||
|
||||
return iterators
|
||||
|
||||
@@ -1038,8 +1036,7 @@ class LocalIterator(Generic[T]):
|
||||
for it in others:
|
||||
if not isinstance(it, LocalIterator):
|
||||
raise ValueError(
|
||||
"other must be of type LocalIterator, got {}".format(
|
||||
type(it)))
|
||||
f"other must be of type LocalIterator, got {type(it)}")
|
||||
|
||||
active = []
|
||||
parent_iters = [self] + list(others)
|
||||
@@ -1090,8 +1087,7 @@ class LocalIterator(Generic[T]):
|
||||
return LocalIterator(
|
||||
build_union,
|
||||
shared_metrics, [],
|
||||
name="LocalUnion[{}, {}]".format(self, ", ".join(map(str,
|
||||
others))))
|
||||
name=f"LocalUnion[{self}, {', '.join(map(str, others))}]")
|
||||
|
||||
|
||||
class ParallelIteratorWorker(object):
|
||||
|
||||
@@ -168,7 +168,7 @@ class AsyncResult:
|
||||
"""
|
||||
|
||||
if not self.ready():
|
||||
raise ValueError("{0!r} not ready".format(self))
|
||||
raise ValueError(f"{self!r} not ready")
|
||||
return not self._result_thread.got_error()
|
||||
|
||||
|
||||
@@ -355,8 +355,8 @@ class Pool:
|
||||
os.environ[RAY_ADDRESS_ENV]))
|
||||
ray.init()
|
||||
elif ray_address is not None:
|
||||
logger.info("Connecting to ray cluster at address='{}'".format(
|
||||
ray_address))
|
||||
logger.info(
|
||||
f"Connecting to ray cluster at address='{ray_address}'")
|
||||
ray.init(address=ray_address)
|
||||
# Local mode.
|
||||
else:
|
||||
|
||||
@@ -158,7 +158,7 @@ def test_multi_model(ray_start_2_cpus, num_workers):
|
||||
data = list(iterator)
|
||||
for i, (model, optimizer) in enumerate(
|
||||
zip(self.models, self.optimizers)):
|
||||
result["model_{}".format(i)] = train(
|
||||
result[f"model_{i}"] = train(
|
||||
model=model,
|
||||
criterion=self.criterion,
|
||||
optimizer=optimizer,
|
||||
|
||||
@@ -208,7 +208,7 @@ if __name__ == "__main__":
|
||||
# Trains num epochs
|
||||
train_stats1 = trainer.train()
|
||||
train_stats1.update(trainer.validate())
|
||||
print("iter {}:".format(i), train_stats1)
|
||||
print(f"iter {i}:", train_stats1)
|
||||
|
||||
dt = (time.time() - training_start) / 3
|
||||
print(f"Training on workers takes: {dt:.3f} seconds/epoch")
|
||||
|
||||
@@ -85,10 +85,7 @@ class TFTrainer:
|
||||
ports = ray.get(
|
||||
[worker.find_free_port.remote() for worker in self.workers])
|
||||
|
||||
urls = [
|
||||
"{ip}:{port}".format(ip=ips[i], port=ports[i])
|
||||
for i in range(len(self.workers))
|
||||
]
|
||||
urls = [f"{ips[i]}:{ports[i]}" for i in range(len(self.workers))]
|
||||
|
||||
# Get setup tasks in order to throw errors on failure
|
||||
ray.get([
|
||||
|
||||
@@ -230,7 +230,7 @@ def reserve_resources(num_cpus, num_gpus, retries=20):
|
||||
cuda_device_set = {}
|
||||
match_devices = bool(cuda_devices)
|
||||
if match_devices:
|
||||
logger.debug("Found set devices: {}".format(cuda_devices))
|
||||
logger.debug(f"Found set devices: {cuda_devices}")
|
||||
assert isinstance(cuda_devices, str)
|
||||
cuda_device_set = set(cuda_devices.split(","))
|
||||
|
||||
|
||||
@@ -102,7 +102,7 @@ if __name__ == "__main__":
|
||||
num_workers = 2 if args.local else int(ray.cluster_resources().get(device))
|
||||
from ray.util.sgd.torch.examples.train_example import LinearDataset
|
||||
|
||||
print("Model: %s" % args.model)
|
||||
print(f"Model: {args.model}")
|
||||
print("Batch size: %d" % args.batch_size)
|
||||
print("Number of %ss: %d" % (device, num_workers))
|
||||
|
||||
@@ -131,7 +131,7 @@ if __name__ == "__main__":
|
||||
# Results
|
||||
img_sec_mean = np.mean(img_secs)
|
||||
img_sec_conf = 1.96 * np.std(img_secs)
|
||||
print("Img/sec per %s: %.1f +-%.1f" % (device, img_sec_mean, img_sec_conf))
|
||||
print(f"Img/sec per {device}: {img_sec_mean:.1f} +-{img_sec_conf:.1f}")
|
||||
print("Total img/sec on %d %s(s): %.1f +-%.1f" %
|
||||
(num_workers, device, num_workers * img_sec_mean,
|
||||
num_workers * img_sec_conf))
|
||||
|
||||
@@ -77,7 +77,7 @@ def benchmark_step():
|
||||
optimizer.step()
|
||||
|
||||
|
||||
print("Model: %s" % args.model)
|
||||
print(f"Model: {args.model}")
|
||||
print("Batch size: %d" % args.batch_size)
|
||||
device = "GPU"
|
||||
print("Number of %ss: %d" % (device, args.num_gpus))
|
||||
@@ -98,7 +98,7 @@ for x in range(args.num_iters):
|
||||
# Results
|
||||
img_sec_mean = np.mean(img_secs)
|
||||
img_sec_conf = 1.96 * np.std(img_secs)
|
||||
print("Img/sec per %s: %.1f +-%.1f" % (device, img_sec_mean, img_sec_conf))
|
||||
print(f"Img/sec per {device}: {img_sec_mean:.1f} +-{img_sec_conf:.1f}")
|
||||
print("Total img/sec on %d %s(s): %.1f +-%.1f" % (
|
||||
args.num_gpus,
|
||||
device,
|
||||
|
||||
@@ -118,7 +118,7 @@ def log(s, nl=True):
|
||||
print(s, end="\n" if nl else "")
|
||||
|
||||
|
||||
log("Model: %s" % args.model)
|
||||
log(f"Model: {args.model}")
|
||||
log("Batch size: %d" % args.batch_size)
|
||||
device = "GPU" if args.cuda else "CPU"
|
||||
log("Number of %ss: %d" % (device, hvd.size()))
|
||||
@@ -139,6 +139,6 @@ for x in range(args.num_iters):
|
||||
# Results
|
||||
img_sec_mean = np.mean(img_secs)
|
||||
img_sec_conf = 1.96 * np.std(img_secs)
|
||||
log("Img/sec per %s: %.1f +-%.1f" % (device, img_sec_mean, img_sec_conf))
|
||||
log(f"Img/sec per {device}: {img_sec_mean:.1f} +-{img_sec_conf:.1f}")
|
||||
log("Total img/sec on %d %s(s): %.1f +-%.1f" %
|
||||
(hvd.size(), device, hvd.size() * img_sec_mean, hvd.size() * img_sec_conf))
|
||||
|
||||
@@ -20,13 +20,12 @@ def mock_data(train_dir, val_dir):
|
||||
|
||||
def mock_class(base, n):
|
||||
random_cls = random.randint(per_cls * n, per_cls * n + per_cls)
|
||||
sub_dir = join(base, "n{:08d}".format(random_cls))
|
||||
sub_dir = join(base, f"n{random_cls:08d}")
|
||||
os.makedirs(sub_dir, exist_ok=True)
|
||||
|
||||
for i in range(total_imgs):
|
||||
random_img = random.randint(per_img * i, per_img * i + per_img)
|
||||
file = join(sub_dir,
|
||||
"ILSVRC2012_val_{:08d}.JPEG".format(random_img))
|
||||
file = join(sub_dir, f"ILSVRC2012_val_{random_img:08d}.JPEG")
|
||||
|
||||
PIL.Image.fromarray(np.zeros((375, 500, 3),
|
||||
dtype=np.uint8)).save(file)
|
||||
|
||||
@@ -193,11 +193,11 @@ def main(args):
|
||||
state_dict = trainer.state_dict()
|
||||
state_dict.update(epoch=epoch, args=args)
|
||||
torch.save(state_dict,
|
||||
os.path.join(args.output_dir, "model_{}.pth".format(epoch)))
|
||||
os.path.join(args.output_dir, f"model_{epoch}.pth"))
|
||||
|
||||
total_time = time.time() - start_time
|
||||
total_time_str = str(datetime.timedelta(seconds=int(total_time)))
|
||||
print("Training time {}".format(total_time_str))
|
||||
print(f"Training time {total_time_str}")
|
||||
|
||||
|
||||
def parse_args():
|
||||
|
||||
@@ -49,8 +49,8 @@ class ConfusionMatrix(object):
|
||||
'IoU: {}\n'
|
||||
'mean IoU: {:.1f}').format(
|
||||
acc_global.item() * 100,
|
||||
['{:.1f}'.format(i) for i in (acc * 100).tolist()],
|
||||
['{:.1f}'.format(i) for i in (iu * 100).tolist()],
|
||||
[f'{i:.1f}' for i in (acc * 100).tolist()],
|
||||
[f'{i:.1f}' for i in (iu * 100).tolist()],
|
||||
iu.mean().item() * 100)
|
||||
|
||||
|
||||
|
||||
@@ -147,7 +147,7 @@ def data_creator(config):
|
||||
if args.tokenizer_name else args.model_name_or_path,
|
||||
cache_dir=args.cache_dir if args.cache_dir else None,
|
||||
)
|
||||
logger.info("tokenizer instantiation time: {}".format(time.time() - start))
|
||||
logger.info(f"tokenizer instantiation time: {time.time() - start}")
|
||||
|
||||
train_dataset = load_and_cache_examples(
|
||||
args, args.task_name, tokenizer, evaluate=False)
|
||||
@@ -322,7 +322,7 @@ def main():
|
||||
# Prepare GLUE task
|
||||
args.task_name = args.task_name.lower()
|
||||
if args.task_name not in processors:
|
||||
raise ValueError("Task not found: %s" % (args.task_name))
|
||||
raise ValueError(f"Task not found: {args.task_name}")
|
||||
args.output_mode = output_modes[args.task_name]
|
||||
|
||||
logging.basicConfig(
|
||||
|
||||
@@ -133,7 +133,7 @@ def save_and_evaluate_checkpoints(args, model, tokenizer):
|
||||
model.to(args.device)
|
||||
result = evaluate(args, model, tokenizer, prefix=prefix)
|
||||
result = dict(
|
||||
(k + "_{}".format(global_step), v) for k, v in result.items())
|
||||
(k + f"_{global_step}", v) for k, v in result.items())
|
||||
results.update(result)
|
||||
|
||||
return results
|
||||
@@ -163,7 +163,7 @@ def evaluate(args, model, tokenizer, prefix=""):
|
||||
batch_size=args.eval_batch_size)
|
||||
|
||||
# Eval!
|
||||
logger.info("***** Running evaluation {} *****".format(prefix))
|
||||
logger.info(f"***** Running evaluation {prefix} *****")
|
||||
logger.info(" Num examples = %d", len(eval_dataset))
|
||||
logger.info(" Batch size = %d", args.eval_batch_size)
|
||||
eval_loss = 0.0
|
||||
|
||||
@@ -82,7 +82,7 @@ class TorchRunner:
|
||||
return loaders
|
||||
else:
|
||||
raise ValueError(
|
||||
"Number of loaders must be <= 2. Got {}".format(loaders))
|
||||
f"Number of loaders must be <= 2. Got {loaders}")
|
||||
# No great way of checking type otherwise
|
||||
return loaders, None
|
||||
|
||||
@@ -146,7 +146,7 @@ class TorchRunner:
|
||||
if not isinstance(self.models, Iterable):
|
||||
self.models = [self.models]
|
||||
assert all(isinstance(model, nn.Module) for model in self.models), (
|
||||
"All models must be PyTorch models: {}.".format(self.models))
|
||||
f"All models must be PyTorch models: {self.models}.")
|
||||
if self.use_gpu and torch.cuda.is_available():
|
||||
self.models = [model.cuda() for model in self.models]
|
||||
|
||||
@@ -189,7 +189,7 @@ class TorchRunner:
|
||||
info=None,
|
||||
iterator=None):
|
||||
"""Runs a training epoch and updates the model parameters."""
|
||||
logger.debug("Begin Training Step {}".format(self.epochs + 1))
|
||||
logger.debug(f"Begin Training Step {self.epochs + 1}")
|
||||
info = info or {}
|
||||
self._toggle_profiling(profile=profile)
|
||||
|
||||
|
||||
@@ -245,7 +245,7 @@ class TorchTrainer:
|
||||
if backend == "auto":
|
||||
backend = "nccl" if use_gpu else "gloo"
|
||||
|
||||
logger.debug("Using {} as backend.".format(backend))
|
||||
logger.debug(f"Using {backend} as backend.")
|
||||
self.backend = backend
|
||||
self.num_cpus_per_worker = num_cpus_per_worker
|
||||
self.use_gpu = use_gpu
|
||||
@@ -659,12 +659,12 @@ class TorchTrainer:
|
||||
"Failed to shutdown gracefully, forcing a shutdown.")
|
||||
|
||||
for worker in self.remote_workers:
|
||||
logger.warning("Killing worker {}.".format(worker))
|
||||
logger.warning(f"Killing worker {worker}.")
|
||||
ray.kill(worker)
|
||||
else:
|
||||
self.local_worker.shutdown()
|
||||
for worker in self.remote_workers:
|
||||
logger.debug("Killing worker {}.".format(worker))
|
||||
logger.debug(f"Killing worker {worker}.")
|
||||
ray.kill(worker)
|
||||
|
||||
self.local_worker = DeactivatedRunner()
|
||||
@@ -674,7 +674,7 @@ class TorchTrainer:
|
||||
"""Terminates models without giving up local resource reservation."""
|
||||
self.local_worker.shutdown(cleanup=False)
|
||||
for worker in self.remote_workers:
|
||||
logger.debug("Killing worker {}.".format(worker))
|
||||
logger.debug(f"Killing worker {worker}.")
|
||||
ray.kill(worker)
|
||||
self.local_worker = DeactivatedRunner()
|
||||
self.remote_workers = []
|
||||
|
||||
@@ -72,23 +72,18 @@ class TrainingOperator:
|
||||
self._models = models # List of models
|
||||
assert isinstance(
|
||||
models,
|
||||
Iterable), ("Components need to be iterable. Got: {}".format(
|
||||
type(models)))
|
||||
Iterable), (f"Components need to be iterable. Got: {type(models)}")
|
||||
self._optimizers = optimizers # List of optimizers
|
||||
assert isinstance(
|
||||
optimizers,
|
||||
Iterable), ("Components need to be iterable. Got: {}".format(
|
||||
type(optimizers)))
|
||||
assert isinstance(optimizers, Iterable), (
|
||||
f"Components need to be iterable. Got: {type(optimizers)}")
|
||||
self._train_loader = train_loader
|
||||
self._validation_loader = validation_loader
|
||||
self._world_rank = world_rank
|
||||
self._criterion = criterion
|
||||
self._schedulers = schedulers
|
||||
if schedulers:
|
||||
assert isinstance(
|
||||
schedulers,
|
||||
Iterable), ("Components need to be iterable. Got: {}".format(
|
||||
type(schedulers)))
|
||||
assert isinstance(schedulers, Iterable), (
|
||||
f"Components need to be iterable. Got: {type(schedulers)}")
|
||||
self._config = config
|
||||
self._use_fp16 = use_fp16
|
||||
self._device_ids = device_ids
|
||||
@@ -165,10 +160,9 @@ class TrainingOperator:
|
||||
desc = ""
|
||||
if info is not None and "epoch_idx" in info:
|
||||
if "num_epochs" in info:
|
||||
desc = "{}/{}e".format(info["epoch_idx"] + 1,
|
||||
info["num_epochs"])
|
||||
desc = f"{info['epoch_idx'] + 1}/{info['num_epochs']}e"
|
||||
else:
|
||||
desc = "{}e".format(info["epoch_idx"] + 1)
|
||||
desc = f"{info['epoch_idx'] + 1}e"
|
||||
_progress_bar = tqdm(
|
||||
total=info[NUM_STEPS] or len(self.train_loader),
|
||||
desc=desc,
|
||||
|
||||
@@ -11,7 +11,7 @@ logger = logging.getLogger(__name__)
|
||||
def setup_address():
|
||||
ip = ray.services.get_node_ip_address()
|
||||
port = find_free_port()
|
||||
return "tcp://{ip}:{port}".format(ip=ip, port=port)
|
||||
return f"tcp://{ip}:{port}"
|
||||
|
||||
|
||||
def setup_process_group(url, world_rank, world_size, timeout, backend="gloo"):
|
||||
@@ -28,7 +28,7 @@ def setup_process_group(url, world_rank, world_size, timeout, backend="gloo"):
|
||||
"""
|
||||
logger.debug("Connecting to {} world_rank: {} world_size: {}".format(
|
||||
url, world_rank, world_size))
|
||||
logger.debug("using {}".format(backend))
|
||||
logger.debug(f"using {backend}")
|
||||
if backend == "nccl" and "NCCL_BLOCKING_WAIT" not in os.environ:
|
||||
logger.debug(
|
||||
"Setting NCCL_BLOCKING_WAIT for detecting node failure. "
|
||||
|
||||
@@ -142,9 +142,9 @@ class TimerCollection:
|
||||
for k, t in self._timers.items():
|
||||
if t.count > 0:
|
||||
if mean:
|
||||
aggregates["mean_%s_s" % k] = t.mean
|
||||
aggregates[f"mean_{k}_s"] = t.mean
|
||||
if last:
|
||||
aggregates["last_%s_s" % k] = t.last
|
||||
aggregates[f"last_{k}_s"] = t.last
|
||||
return aggregates
|
||||
|
||||
|
||||
|
||||
+2
-3
@@ -213,8 +213,7 @@ def decode(byte_str, allow_none=False):
|
||||
return ""
|
||||
|
||||
if not isinstance(byte_str, bytes):
|
||||
raise ValueError(
|
||||
"The argument {} must be a bytes object.".format(byte_str))
|
||||
raise ValueError(f"The argument {byte_str} must be a bytes object.")
|
||||
if sys.version_info >= (3, 0):
|
||||
return byte_str.decode("ascii")
|
||||
else:
|
||||
@@ -456,7 +455,7 @@ def create_and_init_new_worker_log(path, worker_pid):
|
||||
# This should always be the first message to appear in the worker's
|
||||
# stdout and stderr log files. The string "Ray worker pid:" is
|
||||
# parsed in the log monitor process.
|
||||
print("Ray worker pid: {}".format(worker_pid), file=f)
|
||||
print(f"Ray worker pid: {worker_pid}", file=f)
|
||||
return f
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user