diff --git a/python/ray/node.py b/python/ray/node.py index 5d2483a64..23e5623a2 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -4,6 +4,7 @@ import datetime import errno import os import logging +import random import signal import socket import subprocess @@ -24,6 +25,7 @@ from ray.utils import try_to_create_directory, try_to_symlink logger = logging.getLogger(__name__) SESSION_LATEST = "session_latest" +NUMBER_OF_PORT_RETRIES = 40 class Node: @@ -167,7 +169,8 @@ class Node: # NOTE: There is a possible but unlikely race condition where # the port is bound by another process between now and when the # raylet starts. - self._ray_params.node_manager_port = self._get_unused_port() + self._ray_params.node_manager_port, self._socket = \ + self._get_unused_port(close_on_exit=False) if not connect_only and spawn_reaper and not self.kernel_fate_share: self.start_reaper_process() @@ -300,6 +303,14 @@ class Node: """Get the node manager's port.""" return self._ray_params.node_manager_port + @property + def socket(self): + """Get the socket reserving the node manager's port""" + try: + return self._socket + except AttributeError: + return None + @property def address_info(self): """Get a dictionary of addresses.""" @@ -395,12 +406,30 @@ class Node: log_stderr_file = open(log_stderr, "a", buffering=1) return log_stdout_file, log_stderr_file - def _get_unused_port(self): + def _get_unused_port(self, close_on_exit=True): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind(("", 0)) port = s.getsockname()[1] - s.close() - return port + + # Try to generate a port that is far above the 'next available' one. + # This solves issue #8254 where GRPC fails because the port assigned + # from this method has been used by a different process. + for _ in range(NUMBER_OF_PORT_RETRIES): + new_port = random.randint(port, 65535) + new_s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + new_s.bind(("", new_port)) + except OSError: + new_s.close() + continue + s.close() + if close_on_exit: + new_s.close() + return new_port, new_s + logger.error("Unable to succeed in selecting a random port.") + if close_on_exit: + s.close() + return port, s def _prepare_socket_file(self, socket_path, default_prefix): """Prepare the socket file for raylet and plasma. @@ -417,7 +446,7 @@ class Node: if sys.platform == "win32": if socket_path is None: result = "tcp://{}:{}".format(self._localhost, - self._get_unused_port()) + self._get_unused_port()[0]) else: if socket_path is None: result = self._make_inc_temp( @@ -598,7 +627,8 @@ class Node: include_java=self._ray_params.include_java, java_worker_options=self._ray_params.java_worker_options, load_code_from_local=self._ray_params.load_code_from_local, - fate_share=self.kernel_fate_share) + fate_share=self.kernel_fate_share, + socket_to_use=self.socket) assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info] diff --git a/python/ray/services.py b/python/ray/services.py index db1aa42fb..05bf4988c 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1230,7 +1230,8 @@ def start_raylet(redis_address, include_java=False, java_worker_options=None, load_code_from_local=False, - fate_share=None): + fate_share=None, + socket_to_use=None): """Start a raylet, which is a combined local scheduler and object manager. Args: @@ -1361,6 +1362,8 @@ def start_raylet(redis_address, "--temp_dir={}".format(temp_dir), "--session_dir={}".format(session_dir), ] + if socket_to_use: + socket_to_use.close() process_info = start_ray_process( command, ray_constants.PROCESS_TYPE_RAYLET,