[Tech Debt] Use f-string for python/ray/*.py (#10268)

* In progress.

* Done with critical path.

* Modified cluster_utils.py and log_monitor.py

* Addressed code review.
This commit is contained in:
SangBin Cho
2020-08-23 22:01:31 -07:00
committed by GitHub
parent b61a79efd7
commit 1f54acd274
11 changed files with 89 additions and 90 deletions
+25 -26
View File
@@ -96,9 +96,8 @@ class ActorMethod:
def __call__(self, *args, **kwargs):
raise TypeError("Actor methods cannot be called directly. Instead "
"of running 'object.{}()', try "
"'object.{}.remote()'.".format(self._method_name,
self._method_name))
f"of running 'object.{self._method_name}()', try "
f"'object.{self._method_name}.remote()'.")
def remote(self, *args, **kwargs):
return self._remote(args, kwargs)
@@ -158,9 +157,9 @@ class ActorClassMethodMetadata(object):
def __init__(self):
class_name = type(self).__name__
raise TypeError("{} can not be constructed directly, "
"instead of running '{}()', try '{}.create()'".format(
class_name, class_name, class_name))
raise TypeError(f"{class_name} can not be constructed directly, "
f"instead of running '{class_name}()', "
f"try '{class_name}.create()'")
@classmethod
def reset_cache(cls):
@@ -288,13 +287,14 @@ class ActorClass:
"""
for base in bases:
if isinstance(base, ActorClass):
raise TypeError("Attempted to define subclass '{}' of actor "
"class '{}'. Inheriting from actor classes is "
"not currently supported. You can instead "
"inherit from a non-actor base class and make "
"the derived class an actor class (with "
"@ray.remote).".format(
name, base.__ray_metadata__.class_name))
raise TypeError(
f"Attempted to define subclass '{name}' of actor "
f"class '{base.__ray_metadata__.class_name}'. "
"Inheriting from actor classes is "
"not currently supported. You can instead "
"inherit from a non-actor base class and make "
"the derived class an actor class (with "
"@ray.remote).")
# This shouldn't be reached because one of the base classes must be
# an actor class if this was meant to be subclassed.
@@ -312,9 +312,8 @@ class ActorClass:
Exception: Always.
"""
raise TypeError("Actors cannot be instantiated directly. "
"Instead of '{}()', use '{}.remote()'.".format(
self.__ray_metadata__.class_name,
self.__ray_metadata__.class_name))
f"Instead of '{self.__ray_metadata__.class_name}()', "
f"use '{self.__ray_metadata__.class_name}.remote()'.")
@classmethod
def _ray_from_modified_class(cls, modified_class, class_id, max_restarts,
@@ -325,9 +324,9 @@ class ActorClass:
"_ray_from_function_descriptor"
]:
if hasattr(modified_class, attribute):
logger.warning("Creating an actor from class {} overwrites "
"attribute {} of that class".format(
modified_class.__name__, attribute))
logger.warning("Creating an actor from class "
f"{modified_class.__name__} overwrites "
f"attribute {attribute} of that class")
# Make sure the actor class we are constructing inherits from the
# original class so it retains all class properties.
@@ -500,9 +499,9 @@ class ActorClass:
pass
else:
raise ValueError(
"The name {name} is already taken. Please use "
f"The name {name} is already taken. Please use "
"a different name or get the existing actor using "
"ray.get_actor('{name}')".format(name=name))
f"ray.get_actor('{name}')")
detached = True
else:
detached = False
@@ -740,8 +739,8 @@ class ActorHandle:
def __getattr__(self, item):
if not self._ray_is_cross_language:
raise AttributeError("'{}' object has no attribute '{}'".format(
type(self).__name__, item))
raise AttributeError(f"'{type(self).__name__}' object has "
f"no attribute '{item}'")
if item in ["__ray_terminate__", "__ray_checkpoint__"]:
class FakeActorMethod(object):
@@ -771,9 +770,9 @@ class ActorHandle:
return self._ray_method_signatures.keys()
def __repr__(self):
return "Actor({}, {})".format(
self._ray_actor_creation_function_descriptor.class_name,
self._actor_id.hex())
return (f"Actor("
f"{self._ray_actor_creation_function_descriptor.class_name},"
f"{self._actor_id.hex()})")
@property
def _actor_id(self):
+2 -2
View File
@@ -203,8 +203,8 @@ class Cluster:
return
else:
logger.debug(
"{} nodes are currently registered, but we are expecting "
"{}".format(len(live_clients), expected))
f"{len(live_clients)} nodes are currently registered, "
f"but we are expecting {expected}")
time.sleep(0.1)
raise TimeoutError("Timed out while waiting for nodes to join.")
+1 -1
View File
@@ -52,7 +52,7 @@ def get_function_descriptor_for_actor_method(
"")
else:
raise NotImplementedError("Cross language remote actor method "
"not support language {}".format(language))
f"not support language {language}")
def java_function(class_name, function_name):
+8 -7
View File
@@ -100,9 +100,10 @@ class RayTaskError(RayError):
in_worker = False
for line in lines:
if line.startswith("Traceback "):
out.append("{}{}{} (pid={}, ip={})".format(
colorama.Fore.CYAN, self.proctitle, colorama.Fore.RESET,
self.pid, self.ip))
out.append(f"{colorama.Fore.CYAN}"
f"{self.proctitle}"
f"{colorama.Fore.RESET} "
f"(pid={self.pid}, ip={self.ip})")
elif in_worker:
in_worker = False
elif "ray/worker.py" in line or "ray/function_manager.py" in line:
@@ -178,13 +179,13 @@ class UnreconstructableError(RayError):
def __str__(self):
return (
"Object {} is lost (either LRU evicted or deleted by user) and "
f"Object {self.object_ref.hex()} is lost "
"(either LRU evicted or deleted by user) and "
"cannot be reconstructed. Try increasing the object store "
"memory available with ray.init(object_store_memory=<bytes>) "
"or setting object store limits with "
"ray.remote(object_store_memory=<bytes>). See also: {}".format(
self.object_ref.hex(),
"https://docs.ray.io/en/latest/memory-management.html"))
"ray.remote(object_store_memory=<bytes>). "
"See also: https://docs.ray.io/en/latest/memory-management.html")
class RayTimeoutError(RayError):
+10 -9
View File
@@ -195,9 +195,10 @@ class FunctionActorManager:
push_error_to_driver(
self._worker,
ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR,
"Failed to unpickle the remote function '{}' with "
"function ID {}. Traceback:\n{}".format(
function_name, function_id.hex(), traceback_str),
"Failed to unpickle the remote function "
f"'{function_name}' with "
f"function ID {function_id.hex()}. "
f"Traceback:\n{traceback_str}",
job_id=job_id)
else:
# The below line is necessary. Because in the driver process,
@@ -454,9 +455,9 @@ class FunctionActorManager:
pass
def temporary_actor_method(*args, **kwargs):
raise RuntimeError(
"The actor with name {} failed to be imported, "
"and so cannot execute this method.".format(actor_class_name))
raise RuntimeError(f"The actor with name {actor_class_name} "
"failed to be imported, "
"and so cannot execute this method.")
for method in actor_method_names:
setattr(TemporaryActor, method, temporary_actor_method)
@@ -505,9 +506,9 @@ class FunctionActorManager:
push_error_to_driver(
self._worker,
ray_constants.REGISTER_ACTOR_PUSH_ERROR,
"Failed to unpickle actor class '{}' for actor ID {}. "
"Traceback:\n{}".format(
class_name, self._worker.actor_id.hex(), traceback_str),
f"Failed to unpickle actor class '{class_name}' "
f"for actor ID {self._worker.actor_id.hex()}. "
f"Traceback:\n{traceback_str}",
job_id=job_id)
# TODO(rkn): In the future, it might make sense to have the worker
# exit here. However, currently that would lead to hanging if
+14 -12
View File
@@ -106,8 +106,9 @@ class LogMonitor:
shutil.move(file_info.filename, target)
except (IOError, OSError) as e:
if e.errno == errno.ENOENT:
logger.warning("Warning: The file {} was not "
"found.".format(file_info.filename))
logger.warning(
f"Warning: The file {file_info.filename} "
"was not found.")
else:
raise e
else:
@@ -169,8 +170,8 @@ class LogMonitor:
except (IOError, OSError) as e:
# Catch "file not found" errors.
if e.errno == errno.ENOENT:
logger.warning("Warning: The file {} was not "
"found.".format(file_info.filename))
logger.warning(f"Warning: The file {file_info.filename} "
"was not found.")
self.log_filenames.remove(file_info.filename)
continue
raise e
@@ -182,8 +183,9 @@ class LogMonitor:
f = open(file_info.filename, "rb")
except (IOError, OSError) as e:
if e.errno == errno.ENOENT:
logger.warning("Warning: The file {} was not "
"found.".format(file_info.filename))
logger.warning(
f"Warning: The file {file_info.filename} "
"was not found.")
self.log_filenames.remove(file_info.filename)
continue
else:
@@ -224,10 +226,10 @@ class LogMonitor:
next_line = next_line[:-1]
lines_to_publish.append(next_line)
except Exception:
logger.error("Error: Reading file: {}, position: {} "
"failed.".format(
file_info.full_path,
file_info.file_info.file_handle.tell()))
logger.error(
f"Error: Reading file: {file_info.full_path}, "
f"position: {file_info.file_info.file_handle.tell()} "
"failed.")
raise
if file_info.file_position == 0:
@@ -320,8 +322,8 @@ if __name__ == "__main__":
redis_client = ray.services.create_redis_client(
args.redis_address, password=args.redis_password)
traceback_str = ray.utils.format_error_message(traceback.format_exc())
message = ("The log monitor on node {} failed with the following "
"error:\n{}".format(platform.node(), traceback_str))
message = (f"The log monitor on node {platform.node()} "
f"failed with the following error:\n{traceback_str}")
ray.utils.push_error_to_driver_through_redis(
redis_client, ray_constants.LOG_MONITOR_DIED_ERROR, message)
raise e
+2 -2
View File
@@ -101,8 +101,8 @@ class RemoteFunction:
def __call__(self, *args, **kwargs):
raise TypeError("Remote functions cannot be called directly. Instead "
"of running '{}()', try '{}.remote()'.".format(
self._function_name, self._function_name))
f"of running '{self._function_name}()', "
f"try '{self._function_name}.remote()'.")
def _submit(self,
args=None,
+1 -1
View File
@@ -437,7 +437,7 @@ class SerializationContext:
except Exception:
raise ValueError(
"Failed to use pickle in generating a unique id"
"for '{}'. Provide a unique class_id.".format(cls))
f"for '{cls}'. Provide a unique class_id.")
else:
# In this case, the class ID only needs to be meaningful on
# this worker and not across workers.
+1 -2
View File
@@ -73,8 +73,7 @@ def extract_signature(func, ignore_first=False):
if ignore_first:
if len(signature_parameters) == 0:
raise ValueError("Methods must take a 'self' argument, but the "
"method '{}' does not have one.".format(
func.__name__))
f"method '{func.__name__}' does not have one.")
signature_parameters = signature_parameters[1:]
return signature_parameters
+4 -3
View File
@@ -116,9 +116,10 @@ class GlobalState:
# Check to see if we timed out.
if time.time() - start_time >= timeout:
raise TimeoutError("Timed out while attempting to initialize the "
"global state. num_redis_shards = {}, "
"redis_shard_addresses = {}".format(
num_redis_shards, redis_shard_addresses))
"global state. "
f"num_redis_shards = {num_redis_shards}, "
"redis_shard_addresses = "
f"{redis_shard_addresses}")
# Get the rest of the information.
self.redis_clients = []
+21 -25
View File
@@ -296,8 +296,8 @@ class Worker:
for object_ref in object_refs:
if not isinstance(object_ref, ObjectRef):
raise TypeError(
"Attempting to call `get` on the value {}, "
"which is not an ray.ObjectRef.".format(object_ref))
f"Attempting to call `get` on the value {object_ref}, "
"which is not an ray.ObjectRef.")
timeout_ms = int(timeout * 1000) if timeout else -1
data_metadata_pairs = self.core_worker.get_objects(
@@ -465,13 +465,12 @@ def print_failed_task(task_status):
task_status (Dict): A dictionary containing the name, operationid, and
error message for a failed task.
"""
logger.error("""
logger.error(f"""
Error: Task failed
Function Name: {}
Task ID: {}
Error Message: \n{}
""".format(task_status["function_name"], task_status["operationid"],
task_status["error_message"]))
Function Name: {task_status["function_name"]}
Task ID: {task_status["operationid"]}
Error Message: \n{task_status["error_message"]}
""")
def init(address=None,
@@ -1062,7 +1061,7 @@ def print_logs(redis_client, threads_stopped, job_id):
file=print_file)
except (OSError, redis.exceptions.ConnectionError) as e:
logger.error("print_logs: {}".format(e))
logger.error(f"print_logs: {e}")
finally:
# Close the pubsub client to avoid leaking file descriptors.
pubsub_client.close()
@@ -1097,10 +1096,9 @@ def print_error_messages_raylet(task_error_queue, threads_stopped):
if threads_stopped.is_set():
break
if t < last_task_error_raise_time + UNCAUGHT_ERROR_GRACE_PERIOD:
logger.debug("Suppressing error from worker: {}".format(error))
logger.debug(f"Suppressing error from worker: {error}")
else:
logger.error(
"Possible unhandled error from worker: {}".format(error))
logger.error(f"Possible unhandled error from worker: {error}")
def listen_error_messages_raylet(worker, task_error_queue, threads_stopped):
@@ -1156,7 +1154,7 @@ def listen_error_messages_raylet(worker, task_error_queue, threads_stopped):
else:
logger.warning(error_message)
except (OSError, redis.exceptions.ConnectionError) as e:
logger.error("listen_error_messages_raylet: {}".format(e))
logger.error(f"listen_error_messages_raylet: {e}")
finally:
# Close the pubsub client to avoid leaking file descriptors.
worker.error_message_pubsub_client.close()
@@ -1314,7 +1312,7 @@ def connect(node,
if driver_object_store_memory is not None:
worker.core_worker.set_object_store_client_options(
"ray_driver_{}".format(os.getpid()), driver_object_store_memory)
f"ray_driver_{os.getpid()}", driver_object_store_memory)
# Start the import thread
worker.import_thread = import_thread.ImportThread(worker, mode,
@@ -1480,8 +1478,8 @@ def show_in_webui(message, key="", dtype="text"):
worker.check_connected()
acceptable_dtypes = {"text", "html"}
assert dtype in acceptable_dtypes, "dtype accepts only: {}".format(
acceptable_dtypes)
assert dtype in acceptable_dtypes, (
f"dtype accepts only: {acceptable_dtypes}")
message_wrapped = {"message": message, "dtype": dtype}
message_encoded = json.dumps(message_wrapped).encode()
@@ -1649,18 +1647,17 @@ def wait(object_refs, num_returns=1, timeout=None):
"ray.ObjectRef")
if not isinstance(object_refs, list):
raise TypeError(
"wait() expected a list of ray.ObjectRef, got {}".format(
type(object_refs)))
raise TypeError("wait() expected a list of ray.ObjectRef, "
f"got {type(object_refs)}")
if timeout is not None and timeout < 0:
raise ValueError("The 'timeout' argument must be nonnegative. "
"Received {}".format(timeout))
f"Received {timeout}")
for object_ref in object_refs:
if not isinstance(object_ref, ObjectRef):
raise TypeError("wait() expected a list of ray.ObjectRef, "
"got list containing {}".format(type(object_ref)))
f"got list containing {type(object_ref)}")
worker.check_connected()
# TODO(swang): Check main thread.
@@ -1727,7 +1724,7 @@ def kill(actor, no_restart=True):
"""
if not isinstance(actor, ray.actor.ActorHandle):
raise ValueError("ray.kill() only supported for actors. "
"Got: {}.".format(type(actor)))
f"Got: {type(actor)}.")
worker = ray.worker.global_worker
worker.check_connected()
worker.core_worker.kill_actor(actor._ray_actor_id, no_restart)
@@ -1761,7 +1758,7 @@ def cancel(object_ref, force=False):
if not isinstance(object_ref, ray.ObjectRef):
raise TypeError(
"ray.cancel() only supported for non-actor object refs. "
"Got: {}.".format(type(object_ref)))
f"Got: {type(object_ref)}.")
return worker.core_worker.cancel_task(object_ref, force)
@@ -1953,8 +1950,7 @@ def remote(*args, **kwargs):
resources = kwargs.get("resources")
if not isinstance(resources, dict) and resources is not None:
raise TypeError("The 'resources' keyword argument must be a "
"dictionary, but received type {}.".format(
type(resources)))
f"dictionary, but received type {type(resources)}.")
if resources is not None:
assert "CPU" not in resources, "Use the 'num_cpus' argument."
assert "GPU" not in resources, "Use the 'num_gpus' argument."