[Core] Randomize and 'Reserve' Port Generated for Node Manager (#8628)

This commit is contained in:
Ian Rodney
2020-06-03 12:19:03 -07:00
committed by GitHub
parent 1534568272
commit 7a2c9524d1
2 changed files with 40 additions and 7 deletions
+36 -6
View File
@@ -4,6 +4,7 @@ import datetime
import errno
import os
import logging
import random
import signal
import socket
import subprocess
@@ -24,6 +25,7 @@ from ray.utils import try_to_create_directory, try_to_symlink
logger = logging.getLogger(__name__)
SESSION_LATEST = "session_latest"
NUMBER_OF_PORT_RETRIES = 40
class Node:
@@ -167,7 +169,8 @@ class Node:
# NOTE: There is a possible but unlikely race condition where
# the port is bound by another process between now and when the
# raylet starts.
self._ray_params.node_manager_port = self._get_unused_port()
self._ray_params.node_manager_port, self._socket = \
self._get_unused_port(close_on_exit=False)
if not connect_only and spawn_reaper and not self.kernel_fate_share:
self.start_reaper_process()
@@ -300,6 +303,14 @@ class Node:
"""Get the node manager's port."""
return self._ray_params.node_manager_port
@property
def socket(self):
"""Get the socket reserving the node manager's port"""
try:
return self._socket
except AttributeError:
return None
@property
def address_info(self):
"""Get a dictionary of addresses."""
@@ -395,12 +406,30 @@ class Node:
log_stderr_file = open(log_stderr, "a", buffering=1)
return log_stdout_file, log_stderr_file
def _get_unused_port(self):
def _get_unused_port(self, close_on_exit=True):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("", 0))
port = s.getsockname()[1]
s.close()
return port
# Try to generate a port that is far above the 'next available' one.
# This solves issue #8254 where GRPC fails because the port assigned
# from this method has been used by a different process.
for _ in range(NUMBER_OF_PORT_RETRIES):
new_port = random.randint(port, 65535)
new_s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
new_s.bind(("", new_port))
except OSError:
new_s.close()
continue
s.close()
if close_on_exit:
new_s.close()
return new_port, new_s
logger.error("Unable to succeed in selecting a random port.")
if close_on_exit:
s.close()
return port, s
def _prepare_socket_file(self, socket_path, default_prefix):
"""Prepare the socket file for raylet and plasma.
@@ -417,7 +446,7 @@ class Node:
if sys.platform == "win32":
if socket_path is None:
result = "tcp://{}:{}".format(self._localhost,
self._get_unused_port())
self._get_unused_port()[0])
else:
if socket_path is None:
result = self._make_inc_temp(
@@ -598,7 +627,8 @@ class Node:
include_java=self._ray_params.include_java,
java_worker_options=self._ray_params.java_worker_options,
load_code_from_local=self._ray_params.load_code_from_local,
fate_share=self.kernel_fate_share)
fate_share=self.kernel_fate_share,
socket_to_use=self.socket)
assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info]
+4 -1
View File
@@ -1230,7 +1230,8 @@ def start_raylet(redis_address,
include_java=False,
java_worker_options=None,
load_code_from_local=False,
fate_share=None):
fate_share=None,
socket_to_use=None):
"""Start a raylet, which is a combined local scheduler and object manager.
Args:
@@ -1361,6 +1362,8 @@ def start_raylet(redis_address,
"--temp_dir={}".format(temp_dir),
"--session_dir={}".format(session_dir),
]
if socket_to_use:
socket_to_use.close()
process_info = start_ray_process(
command,
ray_constants.PROCESS_TYPE_RAYLET,