Fix tempfile issues (#4605)

This commit is contained in:
Si-Yuan
2019-05-06 07:06:15 +08:00
committed by Robert Nishihara
parent dca1c25d88
commit bd00735fe8
5 changed files with 131 additions and 68 deletions
+43 -15
View File
@@ -62,6 +62,7 @@ class Node(object):
if shutdown_at_exit and connect_only:
raise ValueError("'shutdown_at_exit' and 'connect_only' cannot "
"be both true.")
self.head = head
self.all_processes = {}
# Try to get node IP address with the parameters.
@@ -78,6 +79,7 @@ class Node(object):
include_log_monitor=True,
resources={},
include_webui=False,
temp_dir="/tmp/ray",
worker_path=os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"workers/default_worker.py"))
@@ -87,7 +89,19 @@ class Node(object):
self._config = (json.loads(ray_params._internal_config)
if ray_params._internal_config else None)
self._init_temp()
if head:
redis_client = None
# date including microsecond
date_str = datetime.datetime.today().strftime(
"%Y-%m-%d_%H-%M-%S_%f")
self.session_name = "session_{date_str}_{pid}".format(
pid=os.getpid(), date_str=date_str)
else:
redis_client = self.create_redis_client()
self.session_name = ray.utils.decode(
redis_client.get("session_name"))
self._init_temp(redis_client)
if connect_only:
# Get socket names from the configuration.
@@ -119,7 +133,6 @@ class Node(object):
ray_params.update_if_absent(num_redis_shards=1, include_webui=True)
self._webui_url = None
else:
redis_client = self.create_redis_client()
self._webui_url = (
ray.services.get_webui_url_from_redis(redis_client))
ray_params.include_java = (
@@ -128,6 +141,10 @@ class Node(object):
# Start processes.
if head:
self.start_head_processes()
redis_client = self.create_redis_client()
redis_client.set("session_name", self.session_name)
redis_client.set("session_dir", self._session_dir)
redis_client.set("temp_dir", self._temp_dir)
if not connect_only:
self.start_ray_processes()
@@ -136,25 +153,31 @@ class Node(object):
atexit.register(lambda: self.kill_all_processes(
check_alive=False, allow_graceful=True))
def _init_temp(self):
def _init_temp(self, redis_client):
# Create an dictionary to store temp file index.
self._incremental_dict = collections.defaultdict(lambda: 0)
self._temp_dir = self._ray_params.temp_dir
if self._temp_dir is None:
date_str = datetime.datetime.today().strftime("%Y-%m-%d_%H-%M-%S")
self._temp_dir = self._make_inc_temp(
prefix="session_{date_str}_{pid}".format(
pid=os.getpid(), date_str=date_str),
directory_name="/tmp/ray")
if self.head:
self._temp_dir = self._ray_params.temp_dir
else:
self._temp_dir = ray.utils.decode(redis_client.get("temp_dir"))
try_to_create_directory(self._temp_dir)
try_to_create_directory(self._temp_dir, warn_if_exist=False)
if self.head:
self._session_dir = os.path.join(self._temp_dir, self.session_name)
else:
self._session_dir = ray.utils.decode(
redis_client.get("session_dir"))
# Send a warning message if the session exists.
try_to_create_directory(self._session_dir)
# Create a directory to be used for socket files.
self._sockets_dir = os.path.join(self._temp_dir, "sockets")
try_to_create_directory(self._sockets_dir)
self._sockets_dir = os.path.join(self._session_dir, "sockets")
try_to_create_directory(self._sockets_dir, warn_if_exist=False)
# Create a directory to be used for process log files.
self._logs_dir = os.path.join(self._temp_dir, "logs")
try_to_create_directory(self._logs_dir)
self._logs_dir = os.path.join(self._session_dir, "logs")
try_to_create_directory(self._logs_dir, warn_if_exist=False)
@property
def node_ip_address(self):
@@ -204,6 +227,7 @@ class Node(object):
"object_store_address": self._plasma_store_socket_name,
"raylet_socket_name": self._raylet_socket_name,
"webui_url": self._webui_url,
"session_dir": self._session_dir,
}
def create_redis_client(self):
@@ -215,6 +239,10 @@ class Node(object):
"""Get the path of the temporary directory."""
return self._temp_dir
def get_session_dir_path(self):
"""Get the path of the session directory."""
return self._session_dir
def get_logs_dir_path(self):
"""Get the path of the log files directory."""
return self._logs_dir
+35 -12
View File
@@ -9,12 +9,6 @@ import pytest
import ray
from ray.tests.cluster_utils import Cluster
# Py2 compatibility
try:
FileNotFoundError
except NameError:
FileNotFoundError = OSError
def test_conn_cluster():
# plasma_store_socket_name
@@ -45,13 +39,25 @@ def test_conn_cluster():
def test_tempdir():
shutil.rmtree("/tmp/ray", ignore_errors=True)
ray.init(temp_dir="/tmp/i_am_a_temp_dir")
assert os.path.exists(
"/tmp/i_am_a_temp_dir"), "Specified temp dir not found."
assert not os.path.exists("/tmp/ray"), "Default temp dir should not exist."
ray.shutdown()
shutil.rmtree("/tmp/i_am_a_temp_dir", ignore_errors=True)
def test_tempdir_commandline():
shutil.rmtree("/tmp/ray", ignore_errors=True)
os.system("ray start --head --temp-dir=/tmp/i_am_a_temp_dir2")
assert os.path.exists(
"/tmp/i_am_a_temp_dir2"), "Specified temp dir not found."
assert not os.path.exists("/tmp/ray"), "Default temp dir should not exist."
os.system("ray stop")
shutil.rmtree("/tmp/i_am_a_temp_dir2", ignore_errors=True)
def test_raylet_socket_name():
ray.init(raylet_socket_name="/tmp/i_am_a_temp_socket")
assert os.path.exists(
@@ -59,7 +65,7 @@ def test_raylet_socket_name():
ray.shutdown()
try:
os.remove("/tmp/i_am_a_temp_socket")
except FileNotFoundError:
except OSError:
pass # It could have been removed by Ray.
cluster = Cluster(True)
cluster.add_node(raylet_socket_name="/tmp/i_am_a_temp_socket_2")
@@ -68,7 +74,7 @@ def test_raylet_socket_name():
cluster.shutdown()
try:
os.remove("/tmp/i_am_a_temp_socket_2")
except FileNotFoundError:
except OSError:
pass # It could have been removed by Ray.
@@ -79,7 +85,7 @@ def test_temp_plasma_store_socket():
ray.shutdown()
try:
os.remove("/tmp/i_am_a_temp_socket")
except FileNotFoundError:
except OSError:
pass # It could have been removed by Ray.
cluster = Cluster(True)
cluster.add_node(plasma_store_socket_name="/tmp/i_am_a_temp_socket_2")
@@ -88,14 +94,14 @@ def test_temp_plasma_store_socket():
cluster.shutdown()
try:
os.remove("/tmp/i_am_a_temp_socket_2")
except FileNotFoundError:
except OSError:
pass # It could have been removed by Ray.
def test_raylet_tempfiles():
ray.init(num_cpus=0)
node = ray.worker._global_node
top_levels = set(os.listdir(node.get_temp_dir_path()))
top_levels = set(os.listdir(node.get_session_dir_path()))
assert top_levels.issuperset({"sockets", "logs"})
log_files = set(os.listdir(node.get_logs_dir_path()))
assert log_files.issuperset({
@@ -110,7 +116,7 @@ def test_raylet_tempfiles():
ray.init(num_cpus=2)
node = ray.worker._global_node
top_levels = set(os.listdir(node.get_temp_dir_path()))
top_levels = set(os.listdir(node.get_session_dir_path()))
assert top_levels.issuperset({"sockets", "logs"})
time.sleep(3) # wait workers to start
log_files = set(os.listdir(node.get_logs_dir_path()))
@@ -128,3 +134,20 @@ def test_raylet_tempfiles():
socket_files = set(os.listdir(node.get_sockets_dir_path()))
assert socket_files == {"plasma_store", "raylet"}
ray.shutdown()
def test_tempdir_privilege():
os.chmod("/tmp/ray", 0o000)
ray.init(num_cpus=1)
session_dir = ray.worker._global_node.get_session_dir_path()
assert os.path.exists(session_dir), "Specified socket path not found."
ray.shutdown()
def test_session_dir_uniqueness():
session_dirs = set()
for _ in range(3):
ray.init(num_cpus=1)
session_dirs.add(ray.worker._global_node.get_session_dir_path)
ray.shutdown()
assert len(session_dirs) == 3
+26 -19
View File
@@ -500,25 +500,7 @@ def is_main_thread():
return threading.current_thread().getName() == "MainThread"
def try_to_create_directory(directory_path):
"""Attempt to create a directory that is globally readable/writable.
Args:
directory_path: The path of the directory to create.
"""
logger = logging.getLogger("ray")
directory_path = os.path.expanduser(directory_path)
if not os.path.exists(directory_path):
try:
os.makedirs(directory_path)
except OSError as e:
if e.errno != errno.EEXIST:
raise e
logger.warning(
"Attempted to create '{}', but the directory already "
"exists.".format(directory_path))
# Change the log directory permissions so others can use it. This is
# important when multiple people are using the same machine.
def try_make_directory_shared(directory_path):
try:
os.chmod(directory_path, 0o0777)
except OSError as e:
@@ -531,3 +513,28 @@ def try_to_create_directory(directory_path):
pass
else:
raise
def try_to_create_directory(directory_path, warn_if_exist=True):
"""Attempt to create a directory that is globally readable/writable.
Args:
directory_path: The path of the directory to create.
warn_if_exist (bool): Warn if the directory already exists.
"""
logger = logging.getLogger("ray")
directory_path = os.path.expanduser(directory_path)
if not os.path.exists(directory_path):
try:
os.makedirs(directory_path)
except OSError as e:
if e.errno != errno.EEXIST:
raise e
if warn_if_exist:
logger.warning(
"Attempted to create '{}', but the directory already "
"exists.".format(directory_path))
# Change the log directory permissions so others can use it. This is
# important when multiple people are using the same machine.
try_make_directory_shared(directory_path)
+1 -2
View File
@@ -1694,8 +1694,7 @@ def connect(node,
mode=WORKER_MODE,
log_to_driver=False,
worker=global_worker,
driver_id=None,
load_code_from_local=False):
driver_id=None):
"""Connect this worker to the raylet, to Plasma, and to Redis.
Args: