diff --git a/python/ray/services.py b/python/ray/services.py index 8549879fc..c5bf15d8c 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1227,6 +1227,8 @@ def start_raylet_monitor(redis_address, gcs_ip_address, gcs_port = redis_address.split(":") redis_password = redis_password or "" command = [RAYLET_MONITOR_EXECUTABLE, gcs_ip_address, gcs_port] + if redis_password: + command += [redis_password] p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) if cleanup: all_processes[PROCESS_TYPE_MONITOR].append(p) diff --git a/python/ray/test/cluster_utils.py b/python/ray/test/cluster_utils.py index c4cf2b801..0d91f499e 100644 --- a/python/ray/test/cluster_utils.py +++ b/python/ray/test/cluster_utils.py @@ -37,7 +37,10 @@ class Cluster(object): head_node_args = head_node_args or {} self.add_node(**head_node_args) if connect: - ray.init(redis_address=self.redis_address) + redis_password = head_node_args.get("redis_password") + ray.init( + redis_address=self.redis_address, + redis_password=redis_password) def add_node(self, **override_kwargs): """Adds a node to the local Ray Cluster. diff --git a/python/ray/test/test_ray_init.py b/python/ray/test/test_ray_init.py index 3b2beaba1..2b17ce35e 100644 --- a/python/ray/test/test_ray_init.py +++ b/python/ray/test/test_ray_init.py @@ -7,6 +7,7 @@ import pytest import redis import ray +from ray.test.cluster_utils import Cluster @pytest.fixture @@ -29,7 +30,6 @@ class TestRedisPassword(object): os.environ.get("RAY_USE_NEW_GCS") == "on", reason="New GCS API doesn't support Redis authentication yet.") def test_redis_password(self, password, shutdown_only): - # Workaround for https://github.com/ray-project/ray/issues/3045 @ray.remote def f(): return 1 @@ -52,3 +52,19 @@ class TestRedisPassword(object): redis_client = redis.StrictRedis( host=redis_ip, port=redis_port, password=password) assert redis_client.ping() + + @pytest.mark.skipif( + os.environ.get("RAY_USE_NEW_GCS") == "on", + reason="New GCS API doesn't support Redis authentication yet.") + def test_redis_password_cluster(self, password, shutdown_only): + @ray.remote + def f(): + return 1 + + node_args = {"redis_password": password} + cluster = Cluster( + initialize_head=True, connect=True, head_node_args=node_args) + cluster.add_node(**node_args) + + object_id = f.remote() + ray.get(object_id) diff --git a/python/ray/worker.py b/python/ray/worker.py index 807979e35..53c3900c7 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1918,8 +1918,9 @@ def connect(info, sys.stdout = log_stdout_file sys.stderr = log_stderr_file services.record_log_files_in_redis( - info["redis_address"], info["node_ip_address"], - [log_stdout_file, log_stderr_file]) + info["redis_address"], + info["node_ip_address"], [log_stdout_file, log_stderr_file], + password=redis_password) # Create an object for interfacing with the global state. global_state._initialize_global_state(