From 6ad2b5d87aabe12b7fb55d9afa2ca97c5fd24d39 Mon Sep 17 00:00:00 2001 From: Johann Schleier-Smith Date: Tue, 31 Jan 2017 00:28:00 -0800 Subject: [PATCH] Add Redis port option to startup script (#232) * specify redis address when starting head * cleanup * update starting cluster documentation * Whitespace. * Address Philipp's comments. * Change redis_host -> redis_ip_address. --- doc/using-ray-on-a-cluster.md | 13 +++-- python/plasma/test/test.py | 4 +- python/ray/services.py | 82 ++++++++++++++++++---------- python/ray/worker.py | 16 +++--- python/ray/workers/default_worker.py | 4 +- scripts/start_ray.py | 30 +++++++--- webui/backend/ray_ui.py | 4 +- 7 files changed, 96 insertions(+), 57 deletions(-) diff --git a/doc/using-ray-on-a-cluster.md b/doc/using-ray-on-a-cluster.md index 24705f71e..1baed584c 100644 --- a/doc/using-ray-on-a-cluster.md +++ b/doc/using-ray-on-a-cluster.md @@ -11,18 +11,21 @@ Ubuntu](install-on-ubuntu.md). ### Starting Ray on each machine. -On the head node (just choose some node to be the head node), run the following. +On the head node (just choose some node to be the head node), run the following, +replacing `<redis-port>` with a port of your choice, e.g., `6379`. ``` -./ray/scripts/start_ray.sh --head +./ray/scripts/start_ray.sh --head --redis-port <redis-port> ``` -This will print out the address of the Redis server that was started (and some -other address information). +The `--redis-port` argument is optional, and if not provided Ray starts Redis +on a port selected at random. +In either case, the command will print out the address of the Redis server +that was started (and some other address information). Then on all of the other nodes, run the following. Make sure to replace `<redis-address>` with the value printed by the command on the head node (it -should look something like `123.45.67.89:12345`). +should look something like `123.45.67.89:6379`). 
``` ./ray/scripts/start_ray.sh --redis-address <redis-address> diff --git a/python/plasma/test/test.py b/python/plasma/test/test.py index 96354f125..ba1c6db4e 100644 --- a/python/plasma/test/test.py +++ b/python/plasma/test/test.py @@ -493,7 +493,7 @@ class TestPlasmaManager(unittest.TestCase): store_name1, self.p2 = plasma.start_plasma_store(use_valgrind=USE_VALGRIND) store_name2, self.p3 = plasma.start_plasma_store(use_valgrind=USE_VALGRIND) # Start a Redis server. - redis_address = services.start_redis("127.0.0.1") + redis_address = services.address("127.0.0.1", services.start_redis()) # Start two PlasmaManagers. manager_name1, self.p4, self.port1 = plasma.start_plasma_manager(store_name1, redis_address, use_valgrind=USE_VALGRIND) manager_name2, self.p5, self.port2 = plasma.start_plasma_manager(store_name2, redis_address, use_valgrind=USE_VALGRIND) @@ -789,7 +789,7 @@ class TestPlasmaManagerRecovery(unittest.TestCase): # Start a Plasma store. self.store_name, self.p2 = plasma.start_plasma_store(use_valgrind=USE_VALGRIND) # Start a Redis server. - self.redis_address = services.start_redis("127.0.0.1") + self.redis_address = services.address("127.0.0.1", services.start_redis()) # Start a PlasmaManagers. manager_name, self.p3, self.port1 = plasma.start_plasma_manager( self.store_name, diff --git a/python/ray/services.py b/python/ray/services.py index c7a7d216d..4fc524f1e 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -52,8 +52,8 @@ ObjectStoreAddress = namedtuple("ObjectStoreAddress", ["name", "manager_name", "manager_port"]) -def address(host, port): - return host + ":" + str(port) +def address(ip_address, port): + return ip_address + ":" + str(port) def get_port(address): try: @@ -101,7 +101,7 @@ def cleanup(): """When running in local mode, shutdown the Ray processes. This method is used to shutdown processes that were started with - services.start_ray_local(). It kills all scheduler, object store, and worker + services.start_ray_head(). 
It kills all scheduler, object store, and worker processes that were started by this services module. Driver processes are started and disconnected by worker.py. """ @@ -140,19 +140,19 @@ def get_node_ip_address(address="8.8.8.8:53"): Returns: The IP address of the current node. """ - host, port = address.split(":") + ip_address, port = address.split(":") s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.connect((host, int(port))) + s.connect((ip_address, int(port))) return s.getsockname()[0] -def wait_for_redis_to_start(redis_host, redis_port, num_retries=5): +def wait_for_redis_to_start(redis_ip_address, redis_port, num_retries=5): """Wait for a Redis server to be available. This is accomplished by creating a Redis client and sending a random command to the server until the command gets through. Args: - redis_host (str): The IP address of the redis server. + redis_ip_address (str): The IP address of the redis server. redis_port (int): The port of the redis server. num_retries (int): The number of times to try connecting with redis. The client will sleep for one second between attempts. @@ -160,13 +160,13 @@ def wait_for_redis_to_start(redis_host, redis_port, num_retries=5): Raises: Exception: An exception is raised if we could not connect with Redis. """ - redis_client = redis.StrictRedis(host=redis_host, port=redis_port) + redis_client = redis.StrictRedis(host=redis_ip_address, port=redis_port) # Wait for the Redis server to start. counter = 0 while counter < num_retries: try: # Run some random command and see if it worked. - print("Waiting for redis server at {}:{} to respond...".format(redis_host, redis_port)) + print("Waiting for redis server at {}:{} to respond...".format(redis_ip_address, redis_port)) redis_client.client_list() except redis.ConnectionError as e: # Wait a little bit. @@ -178,10 +178,11 @@ def wait_for_redis_to_start(redis_host, redis_port, num_retries=5): if counter == num_retries: raise Exception("Unable to connect to Redis. 
If the Redis instance is on a different machine, check that your firewall is configured properly.") -def start_redis(node_ip_address, num_retries=20, cleanup=True, redirect_output=False): +def start_redis(port=None, num_retries=20, cleanup=True, redirect_output=False): """Start a Redis server. Args: + port (int): If provided, start a Redis server with this port. num_retries (int): The number of times to attempt to start Redis. cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by serices.cleanup() when the Python process @@ -190,7 +191,8 @@ def start_redis(node_ip_address, num_retries=20, cleanup=True, redirect_output=F /dev/null. Returns: - The address used by Redis. + The port used by Redis. If a port is passed in, then the same value is + returned. Raises: Exception: An exception is raised if Redis could not be started. @@ -200,10 +202,14 @@ def start_redis(node_ip_address, num_retries=20, cleanup=True, redirect_output=F assert os.path.isfile(redis_filepath) assert os.path.isfile(redis_module) counter = 0 + if port is not None: + if num_retries != 1: + raise Exception("Num retries must be 1 if port is specified") + else: + port = new_port() while counter < num_retries: if counter > 0: print("Redis failed to start, retrying now.") - port = new_port() with open(os.devnull, "w") as FNULL: stdout = FNULL if redirect_output else None stderr = FNULL if redirect_output else None @@ -215,6 +221,7 @@ def start_redis(node_ip_address, num_retries=20, cleanup=True, redirect_output=F if cleanup: all_processes[PROCESS_TYPE_REDIS_SERVER].append(p) break + port = new_port() counter += 1 if counter == num_retries: raise Exception("Couldn't start Redis.") @@ -229,8 +236,7 @@ def start_redis(node_ip_address, num_retries=20, cleanup=True, redirect_output=F # Configure Redis to not run in protected mode so that processes on other # hosts can connect to it. TODO(rkn): Do this in a more secure way. 
redis_client.config_set("protected-mode", "no") - redis_address = address(node_ip_address, port) - return redis_address + return port def start_global_scheduler(redis_address, cleanup=True, redirect_output=False): """Start a global scheduler process. @@ -370,7 +376,8 @@ def start_ray_processes(address_info=None, worker_path=None, cleanup=True, redirect_output=False, - include_global_scheduler=False): + include_global_scheduler=False, + include_redis=False): """Helper method to start Ray processes. Args: @@ -393,6 +400,8 @@ def start_ray_processes(address_info=None, /dev/null. include_global_scheduler (bool): If include_global_scheduler is True, then start a global scheduler process. + include_redis (bool): If include_redis is True, then start a Redis server + process. Returns: A dictionary of the address information for the processes that were @@ -410,12 +419,26 @@ def start_ray_processes(address_info=None, # warning messages when it starts up. Instead of suppressing the output, we # should address the warnings. redis_address = address_info.get("redis_address") - if redis_address is None: - redis_address = start_redis(node_ip_address, cleanup=cleanup, - redirect_output=redirect_output) - address_info["redis_address"] = redis_address - time.sleep(0.1) - redis_port = get_port(redis_address) + if include_redis: + if redis_address is None: + # Start a Redis server. The start_redis method will choose a random port. + redis_port = start_redis(cleanup=cleanup, redirect_output=redirect_output) + redis_address = address(node_ip_address, redis_port) + address_info["redis_address"] = redis_address + time.sleep(0.1) + else: + # A Redis address was provided, so start a Redis server with the given + # port. TODO(rkn): We should check that the IP address corresponds to the + # machine that this method is running on. 
+ redis_ip_address, redis_port = redis_address.split(":") + new_redis_port = start_redis(port=int(redis_port), + num_retries=1, + cleanup=cleanup, + redirect_output=redirect_output) + assert redis_port == new_redis_port + else: + if redis_address is None: + raise Exception("Redis address expected") # Start the global scheduler, if necessary. if include_global_scheduler: @@ -519,13 +542,13 @@ def start_ray_node(node_ip_address, cleanup=cleanup, redirect_output=redirect_output) -def start_ray_local(address_info=None, - node_ip_address="127.0.0.1", - num_workers=0, - num_local_schedulers=1, - worker_path=None, - cleanup=True, - redirect_output=False): +def start_ray_head(address_info=None, + node_ip_address="127.0.0.1", + num_workers=0, + num_local_schedulers=1, + worker_path=None, + cleanup=True, + redirect_output=False): """Start Ray in local mode. Args: @@ -558,4 +581,5 @@ def start_ray_local(address_info=None, worker_path=worker_path, cleanup=cleanup, redirect_output=redirect_output, - include_global_scheduler=True) + include_global_scheduler=True, + include_redis=True) diff --git a/python/ray/worker.py b/python/ray/worker.py index 98fd369da..3d7ee5b03 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -649,10 +649,10 @@ def initialize_numbuf(worker=global_worker): register_class(RayGetArgumentError) def get_address_info_from_redis_helper(redis_address, node_ip_address): - redis_host, redis_port = redis_address.split(":") + redis_ip_address, redis_port = redis_address.split(":") # For this command to work, some other client (on the same machine as Redis) # must have run "CONFIG SET protected-mode no". - redis_client = redis.StrictRedis(host=redis_host, port=int(redis_port)) + redis_client = redis.StrictRedis(host=redis_ip_address, port=int(redis_port)) # The client table prefix must be kept in sync with the file # "src/common/redis_module/ray_redis_module.c" where it is defined. 
REDIS_CLIENT_TABLE_PREFIX = "CL:" @@ -781,10 +781,10 @@ def _init(address_info=None, start_ray_local=False, object_id_seed=None, num_local_schedulers = 1 # Start the scheduler, object store, and some workers. These will be killed # by the call to cleanup(), which happens when the Python script exits. - address_info = services.start_ray_local(address_info=address_info, - node_ip_address=node_ip_address, - num_workers=num_workers, - num_local_schedulers=num_local_schedulers) + address_info = services.start_ray_head(address_info=address_info, + node_ip_address=node_ip_address, + num_workers=num_workers, + num_local_schedulers=num_local_schedulers) else: if redis_address is None: raise Exception("If start_ray_local=False, then redis_address must be provided.") @@ -1075,8 +1075,8 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker): worker.node_ip_address = info["node_ip_address"] worker.redis_address = info["redis_address"] # Create a Redis client. - redis_host, redis_port = info["redis_address"].split(":") - worker.redis_client = redis.StrictRedis(host=redis_host, port=int(redis_port)) + redis_ip_address, redis_port = info["redis_address"].split(":") + worker.redis_client = redis.StrictRedis(host=redis_ip_address, port=int(redis_port)) worker.lock = threading.Lock() # Create an object store client. worker.plasma_client = plasma.PlasmaClient(info["store_socket_name"], info["manager_socket_name"]) diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index aa8a5a1b6..baf5febe7 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -48,10 +48,10 @@ being caught in "lib/python/ray/workers/default_worker.py". # We use a driver ID of all zeros to push an error message to all drivers. 
driver_id = DRIVER_ID_LENGTH * b"\x00" error_key = b"Error:" + driver_id + b":" + random_string() - redis_host, redis_port = args.redis_address.split(":") + redis_ip_address, redis_port = args.redis_address.split(":") # For this command to work, some other client (on the same machine as # Redis) must have run "CONFIG SET protected-mode no". - redis_client = redis.StrictRedis(host=redis_host, port=int(redis_port)) + redis_client = redis.StrictRedis(host=redis_ip_address, port=int(redis_port)) redis_client.hmset(error_key, {"type": "worker_crash", "message": traceback_str, "note": "This error is unexpected and should not have happened."}) diff --git a/scripts/start_ray.py b/scripts/start_ray.py index 823d2fe68..eabc16e02 100644 --- a/scripts/start_ray.py +++ b/scripts/start_ray.py @@ -9,13 +9,14 @@ import ray.services as services parser = argparse.ArgumentParser(description="Parse addresses for the worker to connect to.") parser.add_argument("--node-ip-address", required=False, type=str, help="the IP address of the worker's node") -parser.add_argument("--redis-address", required=False, type=str, help="the address to use for Redis") +parser.add_argument("--redis-address", required=False, type=str, help="the address to use for connecting to Redis") +parser.add_argument("--redis-port", required=False, type=str, help="the port to use for starting Redis") parser.add_argument("--num-workers", default=10, required=False, type=int, help="the number of workers to start on this node") parser.add_argument("--head", action="store_true", help="provide this argument for the head node") def check_no_existing_redis_clients(node_ip_address, redis_address): - redis_host, redis_port = redis_address.split(":") - redis_client = redis.StrictRedis(host=redis_host, port=int(redis_port)) + redis_ip_address, redis_port = redis_address.split(":") + redis_client = redis.StrictRedis(host=redis_ip_address, port=int(redis_port)) # The client table prefix must be kept in sync with the file # 
"src/common/redis_module/ray_redis_module.c" where it is defined. REDIS_CLIENT_TABLE_PREFIX = "CL:" @@ -41,16 +42,25 @@ if __name__ == "__main__": # Start Ray on the head node. if args.redis_address is not None: raise Exception("If --head is passed in, a Redis server will be started, so a Redis address should not be provided.") + # Get the node IP address if one is not provided. if args.node_ip_address is None: node_ip_address = services.get_node_ip_address() else: node_ip_address = args.node_ip_address print("Using IP address {} for this node.".format(node_ip_address)) - address_info = services.start_ray_local(node_ip_address=node_ip_address, - num_workers=args.num_workers, - cleanup=False, - redirect_output=True) + + if args.redis_port is not None: + address_info = {"redis_address": "{}:{}".format(node_ip_address, + args.redis_port)} + else: + address_info = None + + address_info = services.start_ray_head(address_info=address_info, + node_ip_address=node_ip_address, + num_workers=args.num_workers, + cleanup=False, + redirect_output=True) print(address_info) print("\nStarted Ray with {} workers on this node. A different number of " "workers can be set with the --num-workers flag (but you have to " @@ -69,12 +79,14 @@ if __name__ == "__main__": address_info["redis_address"])) else: # Start Ray on a non-head node. + if args.redis_port is not None: + raise Exception("If --head is not passed in, --redis-port is not allowed") if args.redis_address is None: raise Exception("If --head is not passed in, --redis-address must be provided.") - redis_host, redis_port = args.redis_address.split(":") + redis_ip_address, redis_port = args.redis_address.split(":") # Wait for the Redis server to be started. And throw an exception if we # can't connect to it. - services.wait_for_redis_to_start(redis_host, int(redis_port)) + services.wait_for_redis_to_start(redis_ip_address, int(redis_port)) # Get the node IP address if one is not provided. 
if args.node_ip_address is None: node_ip_address = services.get_node_ip_address(args.redis_address) diff --git a/webui/backend/ray_ui.py b/webui/backend/ray_ui.py index 89a090e8f..93b6cbaab 100644 --- a/webui/backend/ray_ui.py +++ b/webui/backend/ray_ui.py @@ -33,7 +33,7 @@ def key_to_hex_identifiers(key): async def hello(websocket, path): - conn = await aioredis.create_connection((redis_host, redis_port), loop=loop) + conn = await aioredis.create_connection((redis_ip_address, redis_port), loop=loop) while True: command = json.loads(await websocket.recv()) @@ -107,7 +107,7 @@ async def hello(websocket, path): if __name__ == "__main__": args = parser.parse_args() redis_address = args.redis_address.split(":") - redis_host, redis_port = redis_address[0], int(redis_address[1]) + redis_ip_address, redis_port = redis_address[0], int(redis_address[1]) start_server = websockets.serve(hello, "localhost", args.port)