diff --git a/python/ray/node.py b/python/ray/node.py index 878fc0f98..ed4944898 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -55,8 +55,9 @@ class Node(object): connect_only (bool): If true, connect to the node without starting new processes. """ - assert not (shutdown_at_exit and connect_only), ( - "'shutdown_at_exit' and 'connect_only' cannot be both true.") + if shutdown_at_exit and connect_only: + raise ValueError("'shutdown_at_exit' and 'connect_only' cannot " + "be both true.") self.all_processes = {} ray_params.update_if_absent( diff --git a/python/ray/services.py b/python/ray/services.py index ea88daa86..27ae9609b 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -501,9 +501,10 @@ def start_redis(node_ip_address, started. """ - assert len(redirect_files) == 1 + num_redis_shards, ( - "The number of redirect file pairs should be equal to the number of" - "redis shards (including the primary shard) we will start.") + if len(redirect_files) != 1 + num_redis_shards: + raise ValueError("The number of redirect file pairs should be equal " + "to the number of redis shards (including the " + "primary shard) we will start.") if redis_shard_ports is None: redis_shard_ports = num_redis_shards * [None] elif len(redis_shard_ports) != num_redis_shards: diff --git a/python/ray/worker.py b/python/ray/worker.py index 05764444d..1cd99ee57 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1872,18 +1872,18 @@ def connect(info, if (not worker.redis_client.exists("webui") and info["webui_url"] is not None): worker.redis_client.hmset("webui", {"url": info["webui_url"]}) - is_worker = False elif mode == WORKER_MODE: + # Register the worker with Redis. + worker_dict = { + "node_ip_address": worker.node_ip_address, + "plasma_store_socket": info["store_socket_name"], + } # Check the RedirectOutput key in Redis and based on its value redirect # worker output and error to their own files. # This key is set in services.py when Redis is started. redirect_worker_output_val = worker.redis_client.get("RedirectOutput") if (redirect_worker_output_val is not None and int(redirect_worker_output_val) == 1): - redirect_worker_output = 1 - else: - redirect_worker_output = 0 - if redirect_worker_output: log_stdout_file, log_stderr_file = ( _global_node.new_worker_redirected_log_file(worker.worker_id)) # Redirect stdout/stderr at the file descriptor level. If we simply @@ -1899,16 +1899,9 @@ def connect(info, sys.stdout.flush() sys.stderr.flush() - # Register the worker with Redis. - worker_dict = { - "node_ip_address": worker.node_ip_address, - "plasma_store_socket": info["store_socket_name"], - } - if redirect_worker_output: worker_dict["stdout_file"] = os.path.abspath(log_stdout_file.name) worker_dict["stderr_file"] = os.path.abspath(log_stderr_file.name) worker.redis_client.hmset(b"Workers:" + worker.worker_id, worker_dict) - is_worker = True else: raise Exception("This code should be unreachable.") @@ -1916,8 +1909,6 @@ def connect(info, worker.plasma_client = thread_safe_client( plasma.connect(info["store_socket_name"], None, 0, 300)) - raylet_socket = info["raylet_socket_name"] - # If this is a driver, set the current task ID, the task driver ID, and set # the task index to 0. if mode == SCRIPT_MODE: @@ -1974,9 +1965,9 @@ def connect(info, worker.task_context.current_task_id = driver_task.task_id() worker.raylet_client = ray._raylet.RayletClient( - raylet_socket, + info["raylet_socket_name"], ClientID(worker.worker_id), - is_worker, + (mode == WORKER_MODE), DriverID(worker.current_task_id.binary()), )