mirror of
https://github.com/wassname/ray.git
synced 2026-07-01 22:23:13 +08:00
Change /tmp to platform-specific temporary directory (#7529)
This commit is contained in:
+5
-2
@@ -14,6 +14,7 @@ import time
|
||||
import ray
|
||||
import ray.ray_constants as ray_constants
|
||||
import ray.services
|
||||
import ray.utils
|
||||
from ray.resource_spec import ResourceSpec
|
||||
from ray.utils import try_to_create_directory, try_to_symlink
|
||||
|
||||
@@ -82,7 +83,7 @@ class Node:
|
||||
ray_params.update_if_absent(
|
||||
include_log_monitor=True,
|
||||
resources={},
|
||||
temp_dir="/tmp/ray",
|
||||
temp_dir=ray.utils.get_ray_temp_dir(),
|
||||
worker_path=os.path.join(
|
||||
os.path.dirname(os.path.abspath(__file__)),
|
||||
"workers/default_worker.py"))
|
||||
@@ -313,7 +314,7 @@ class Node:
|
||||
"""Get the path of the sockets directory."""
|
||||
return self._sockets_dir
|
||||
|
||||
def _make_inc_temp(self, suffix="", prefix="", directory_name="/tmp/ray"):
|
||||
def _make_inc_temp(self, suffix="", prefix="", directory_name=None):
|
||||
"""Return an incremental temporary file name. The file is not created.
|
||||
|
||||
Args:
|
||||
@@ -326,6 +327,8 @@ class Node:
|
||||
the same name, the returned name will look like
|
||||
"{directory_name}/{prefix}.{unique_index}{suffix}"
|
||||
"""
|
||||
if directory_name is None:
|
||||
directory_name = ray.utils.get_ray_temp_dir()
|
||||
directory_name = os.path.expanduser(directory_name)
|
||||
index = self._incremental_dict[suffix, prefix, directory_name]
|
||||
# `tempfile.TMP_MAX` could be extremely large,
|
||||
|
||||
@@ -199,9 +199,9 @@ class SessionRunner:
|
||||
requirements_txt = project_environment["requirements"]
|
||||
|
||||
# Create a temporary requirements_txt in the head node.
|
||||
remote_requirements_txt = (
|
||||
"/tmp/" + "ray_project_requirements_txt_{}".format(
|
||||
time.time()))
|
||||
remote_requirements_txt = os.path.join(
|
||||
ray.utils.get_user_temp_dir(),
|
||||
"ray_project_requirements_txt_{}".format(time.time()))
|
||||
|
||||
rsync(
|
||||
self.project_definition.cluster_yaml(),
|
||||
|
||||
@@ -7,6 +7,7 @@ import time
|
||||
import datetime
|
||||
import grpc
|
||||
import subprocess
|
||||
import sys
|
||||
from concurrent import futures
|
||||
|
||||
import ray
|
||||
@@ -30,7 +31,7 @@ class ReporterServer(reporter_pb2_grpc.ReporterServiceServicer):
|
||||
def GetProfilingStats(self, request, context):
|
||||
pid = request.pid
|
||||
duration = request.duration
|
||||
profiling_file_path = os.path.join("/tmp/ray/",
|
||||
profiling_file_path = os.path.join(ray.utils.get_ray_temp_dir(),
|
||||
"{}_profiling.txt".format(pid))
|
||||
process = subprocess.Popen(
|
||||
"sudo $(which py-spy) record -o {} -p {} -d {} -f speedscope"
|
||||
@@ -127,7 +128,11 @@ class Reporter:
|
||||
|
||||
@staticmethod
|
||||
def get_disk_usage():
|
||||
return {x: psutil.disk_usage(x) for x in ["/", "/tmp"]}
|
||||
dirs = [
|
||||
os.environ["USERPROFILE"] if sys.platform == "win32" else os.sep,
|
||||
ray.utils.get_user_temp_dir(),
|
||||
]
|
||||
return {x: psutil.disk_usage(x) for x in dirs}
|
||||
|
||||
@staticmethod
|
||||
def get_workers():
|
||||
|
||||
@@ -912,7 +912,8 @@ def timeline(address):
|
||||
logger.info("Connecting to Ray instance at {}.".format(address))
|
||||
ray.init(address=address)
|
||||
time = datetime.today().strftime("%Y-%m-%d_%H-%M-%S")
|
||||
filename = "/tmp/ray-timeline-{}.json".format(time)
|
||||
filename = os.path.join(ray.utils.get_user_temp_dir(),
|
||||
"ray-timeline-{}.json".format(time))
|
||||
ray.timeline(filename=filename)
|
||||
size = os.path.getsize(filename)
|
||||
logger.info("Trace file written to {} ({} bytes).".format(filename, size))
|
||||
|
||||
@@ -423,8 +423,9 @@ def start_ray_process(command,
|
||||
"If 'use_gdb' is true, then 'use_tmux' must be true as well.")
|
||||
|
||||
# TODO(suquark): Any better temp file creation here?
|
||||
gdb_init_path = "/tmp/ray/gdb_init_{}_{}".format(
|
||||
process_type, time.time())
|
||||
gdb_init_path = os.path.join(
|
||||
ray.utils.get_ray_temp_dir(), "gdb_init_{}_{}".format(
|
||||
process_type, time.time()))
|
||||
ray_process_path = command[0]
|
||||
ray_process_args = command[1:]
|
||||
run_args = " ".join(["'{}'".format(arg) for arg in ray_process_args])
|
||||
@@ -1431,18 +1432,18 @@ def determine_plasma_store_config(object_store_memory,
|
||||
if shm_avail > object_store_memory:
|
||||
plasma_directory = "/dev/shm"
|
||||
else:
|
||||
plasma_directory = "/tmp"
|
||||
plasma_directory = ray.utils.get_user_temp_dir()
|
||||
logger.warning(
|
||||
"WARNING: The object store is using /tmp instead of "
|
||||
"WARNING: The object store is using {} instead of "
|
||||
"/dev/shm because /dev/shm has only {} bytes available. "
|
||||
"This may slow down performance! You may be able to free "
|
||||
"up space by deleting files in /dev/shm or terminating "
|
||||
"any running plasma_store_server processes. If you are "
|
||||
"inside a Docker container, you may need to pass an "
|
||||
"argument with the flag '--shm-size' to 'docker run'.".
|
||||
format(shm_avail))
|
||||
format(ray.utils.get_user_temp_dir(), shm_avail))
|
||||
else:
|
||||
plasma_directory = "/tmp"
|
||||
plasma_directory = ray.utils.get_user_temp_dir()
|
||||
|
||||
# Do some sanity checks.
|
||||
if object_store_memory > system_memory:
|
||||
|
||||
@@ -22,7 +22,8 @@ from ray.test_utils import (relevant_errors, wait_for_condition,
|
||||
|
||||
@pytest.fixture
|
||||
def ray_checkpointable_actor_cls(request):
|
||||
checkpoint_dir = "/tmp/ray_temp_checkpoint_dir/"
|
||||
checkpoint_dir = os.path.join(ray.utils.get_user_temp_dir(),
|
||||
"ray_temp_checkpoint_dir") + os.sep
|
||||
if not os.path.isdir(checkpoint_dir):
|
||||
os.mkdir(checkpoint_dir)
|
||||
|
||||
|
||||
@@ -465,7 +465,8 @@ def test_pandas_parquet_serialization():
|
||||
|
||||
def test_socket_dir_not_existing(shutdown_only):
|
||||
random_name = ray.ObjectID.from_random().hex()
|
||||
temp_raylet_socket_dir = "/tmp/ray/tests/{}".format(random_name)
|
||||
temp_raylet_socket_dir = os.path.join(ray.utils.get_ray_temp_dir(),
|
||||
"tests", random_name)
|
||||
temp_raylet_socket_name = os.path.join(temp_raylet_socket_dir,
|
||||
"raylet_socket")
|
||||
ray.init(num_cpus=1, raylet_socket_name=temp_raylet_socket_name)
|
||||
|
||||
@@ -51,7 +51,7 @@ def test_project_root():
|
||||
project_definition = ray.projects.ProjectDefinition(path2)
|
||||
assert os.path.normpath(project_definition.root) == os.path.normpath(path)
|
||||
|
||||
path3 = "/tmp/"
|
||||
path3 = ray.utils.get_user_temp_dir() + os.sep
|
||||
with pytest.raises(ValueError):
|
||||
project_definition = ray.projects.ProjectDefinition(path3)
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import time
|
||||
import pytest
|
||||
import ray
|
||||
import ray.ray_constants as ray_constants
|
||||
import subprocess
|
||||
from ray.cluster_utils import Cluster
|
||||
|
||||
|
||||
@@ -12,7 +13,8 @@ def test_conn_cluster():
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
ray.init(
|
||||
address="127.0.0.1:6379",
|
||||
plasma_store_socket_name="/tmp/this_should_fail")
|
||||
plasma_store_socket_name=os.path.join(
|
||||
ray.utils.get_user_temp_dir(), "this_should_fail"))
|
||||
assert exc_info.value.args[0] == (
|
||||
"When connecting to an existing cluster, "
|
||||
"plasma_store_socket_name must not be provided.")
|
||||
@@ -21,74 +23,111 @@ def test_conn_cluster():
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
ray.init(
|
||||
address="127.0.0.1:6379",
|
||||
raylet_socket_name="/tmp/this_should_fail")
|
||||
raylet_socket_name=os.path.join(ray.utils.get_user_temp_dir(),
|
||||
"this_should_fail"))
|
||||
assert exc_info.value.args[0] == (
|
||||
"When connecting to an existing cluster, "
|
||||
"raylet_socket_name must not be provided.")
|
||||
|
||||
# temp_dir
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
ray.init(address="127.0.0.1:6379", temp_dir="/tmp/this_should_fail")
|
||||
ray.init(
|
||||
address="127.0.0.1:6379",
|
||||
temp_dir=os.path.join(ray.utils.get_user_temp_dir(),
|
||||
"this_should_fail"))
|
||||
assert exc_info.value.args[0] == (
|
||||
"When connecting to an existing cluster, "
|
||||
"temp_dir must not be provided.")
|
||||
|
||||
|
||||
def test_tempdir(shutdown_only):
|
||||
shutil.rmtree("/tmp/ray", ignore_errors=True)
|
||||
ray.init(temp_dir="/tmp/i_am_a_temp_dir")
|
||||
shutil.rmtree(ray.utils.get_ray_temp_dir(), ignore_errors=True)
|
||||
ray.init(
|
||||
temp_dir=os.path.join(ray.utils.get_user_temp_dir(),
|
||||
"i_am_a_temp_dir"))
|
||||
assert os.path.exists(
|
||||
"/tmp/i_am_a_temp_dir"), "Specified temp dir not found."
|
||||
assert not os.path.exists("/tmp/ray"), "Default temp dir should not exist."
|
||||
shutil.rmtree("/tmp/i_am_a_temp_dir", ignore_errors=True)
|
||||
os.path.join(ray.utils.get_user_temp_dir(),
|
||||
"i_am_a_temp_dir")), "Specified temp dir not found."
|
||||
assert not os.path.exists(
|
||||
ray.utils.get_ray_temp_dir()), ("Default temp dir should not exist.")
|
||||
shutil.rmtree(
|
||||
os.path.join(ray.utils.get_user_temp_dir(), "i_am_a_temp_dir"),
|
||||
ignore_errors=True)
|
||||
|
||||
|
||||
def test_tempdir_commandline():
|
||||
shutil.rmtree("/tmp/ray", ignore_errors=True)
|
||||
os.system("ray start --head --temp-dir=/tmp/i_am_a_temp_dir2")
|
||||
shutil.rmtree(ray.utils.get_ray_temp_dir(), ignore_errors=True)
|
||||
subprocess.check_call([
|
||||
"ray", "start", "--head", "--temp-dir=" + os.path.join(
|
||||
ray.utils.get_user_temp_dir(), "i_am_a_temp_dir2")
|
||||
])
|
||||
assert os.path.exists(
|
||||
"/tmp/i_am_a_temp_dir2"), "Specified temp dir not found."
|
||||
assert not os.path.exists("/tmp/ray"), "Default temp dir should not exist."
|
||||
os.system("ray stop")
|
||||
shutil.rmtree("/tmp/i_am_a_temp_dir2", ignore_errors=True)
|
||||
os.path.join(ray.utils.get_user_temp_dir(),
|
||||
"i_am_a_temp_dir2")), "Specified temp dir not found."
|
||||
assert not os.path.exists(
|
||||
ray.utils.get_ray_temp_dir()), "Default temp dir should not exist."
|
||||
subprocess.check_call(["ray", "stop"])
|
||||
shutil.rmtree(
|
||||
os.path.join(ray.utils.get_user_temp_dir(), "i_am_a_temp_dir2"),
|
||||
ignore_errors=True)
|
||||
|
||||
|
||||
def test_raylet_socket_name(shutdown_only):
|
||||
ray.init(raylet_socket_name="/tmp/i_am_a_temp_socket")
|
||||
ray.init(
|
||||
raylet_socket_name=os.path.join(ray.utils.get_user_temp_dir(),
|
||||
"i_am_a_temp_socket"))
|
||||
assert os.path.exists(
|
||||
"/tmp/i_am_a_temp_socket"), "Specified socket path not found."
|
||||
os.path.join(ray.utils.get_user_temp_dir(),
|
||||
"i_am_a_temp_socket")), "Specified socket path not found."
|
||||
ray.shutdown()
|
||||
try:
|
||||
os.remove("/tmp/i_am_a_temp_socket")
|
||||
os.remove(
|
||||
os.path.join(ray.utils.get_user_temp_dir(), "i_am_a_temp_socket"))
|
||||
except OSError:
|
||||
pass # It could have been removed by Ray.
|
||||
cluster = Cluster(True)
|
||||
cluster.add_node(raylet_socket_name="/tmp/i_am_a_temp_socket_2")
|
||||
cluster.add_node(
|
||||
raylet_socket_name=os.path.join(ray.utils.get_user_temp_dir(),
|
||||
"i_am_a_temp_socket_2"))
|
||||
assert os.path.exists(
|
||||
"/tmp/i_am_a_temp_socket_2"), "Specified socket path not found."
|
||||
os.path.join(
|
||||
ray.utils.get_user_temp_dir(),
|
||||
"i_am_a_temp_socket_2")), "Specified socket path not found."
|
||||
cluster.shutdown()
|
||||
try:
|
||||
os.remove("/tmp/i_am_a_temp_socket_2")
|
||||
os.remove(
|
||||
os.path.join(ray.utils.get_user_temp_dir(),
|
||||
"i_am_a_temp_socket_2"))
|
||||
except OSError:
|
||||
pass # It could have been removed by Ray.
|
||||
|
||||
|
||||
def test_temp_plasma_store_socket(shutdown_only):
|
||||
ray.init(plasma_store_socket_name="/tmp/i_am_a_temp_socket")
|
||||
ray.init(
|
||||
plasma_store_socket_name=os.path.join(ray.utils.get_user_temp_dir(),
|
||||
"i_am_a_temp_socket"))
|
||||
assert os.path.exists(
|
||||
"/tmp/i_am_a_temp_socket"), "Specified socket path not found."
|
||||
os.path.join(ray.utils.get_user_temp_dir(),
|
||||
"i_am_a_temp_socket")), "Specified socket path not found."
|
||||
ray.shutdown()
|
||||
try:
|
||||
os.remove("/tmp/i_am_a_temp_socket")
|
||||
os.remove(
|
||||
os.path.join(ray.utils.get_user_temp_dir(), "i_am_a_temp_socket"))
|
||||
except OSError:
|
||||
pass # It could have been removed by Ray.
|
||||
cluster = Cluster(True)
|
||||
cluster.add_node(plasma_store_socket_name="/tmp/i_am_a_temp_socket_2")
|
||||
cluster.add_node(
|
||||
plasma_store_socket_name=os.path.join(ray.utils.get_user_temp_dir(),
|
||||
"i_am_a_temp_socket_2"))
|
||||
assert os.path.exists(
|
||||
"/tmp/i_am_a_temp_socket_2"), "Specified socket path not found."
|
||||
os.path.join(
|
||||
ray.utils.get_user_temp_dir(),
|
||||
"i_am_a_temp_socket_2")), "Specified socket path not found."
|
||||
cluster.shutdown()
|
||||
try:
|
||||
os.remove("/tmp/i_am_a_temp_socket_2")
|
||||
os.remove(
|
||||
os.path.join(ray.utils.get_user_temp_dir(),
|
||||
"i_am_a_temp_socket_2"))
|
||||
except OSError:
|
||||
pass # It could have been removed by Ray.
|
||||
|
||||
@@ -135,7 +174,7 @@ def test_raylet_tempfiles(shutdown_only):
|
||||
|
||||
|
||||
def test_tempdir_privilege(shutdown_only):
|
||||
os.chmod("/tmp/ray", 0o000)
|
||||
os.chmod(ray.utils.get_ray_temp_dir(), 0o000)
|
||||
ray.init(num_cpus=1)
|
||||
session_dir = ray.worker._global_node.get_session_dir_path()
|
||||
assert os.path.exists(session_dir), "Specified socket path not found."
|
||||
|
||||
@@ -19,9 +19,11 @@ class MockDurableTrainable(DurableTrainable):
|
||||
def __init__(self, remote_checkpoint_dir, *args, **kwargs):
|
||||
# Mock the path as a local path.
|
||||
local_dir_suffix = remote_checkpoint_dir.split("://")[1]
|
||||
remote_checkpoint_dir = os.path.join("/tmp", local_dir_suffix)
|
||||
remote_checkpoint_dir = os.path.join(ray.utils.get_user_temp_dir(),
|
||||
local_dir_suffix)
|
||||
# Disallow malformed relative paths for delete safety.
|
||||
assert os.path.abspath(remote_checkpoint_dir).startswith("/tmp")
|
||||
assert os.path.abspath(remote_checkpoint_dir).startswith(
|
||||
ray.utils.get_user_temp_dir())
|
||||
logger.info("Using %s as the mocked remote checkpoint directory.",
|
||||
self.remote_checkpoint_dir)
|
||||
super(MockDurableTrainable, self).__init__(remote_checkpoint_dir,
|
||||
|
||||
@@ -26,7 +26,7 @@ from torch.nn import functional as F
|
||||
from scipy.stats import entropy
|
||||
|
||||
# Training parameters
|
||||
dataroot = "/tmp/"
|
||||
dataroot = ray.utils.get_user_temp_dir() + os.sep
|
||||
workers = 2
|
||||
batch_size = 64
|
||||
image_size = 32
|
||||
|
||||
@@ -294,14 +294,16 @@ class TrainableFunctionApiTest(unittest.TestCase):
|
||||
|
||||
def testLogdir(self):
|
||||
def train(config, reporter):
|
||||
assert "/tmp/logdir/foo" in os.getcwd(), os.getcwd()
|
||||
assert os.path.join(ray.utils.get_user_temp_dir(), "logdir",
|
||||
"foo") in os.getcwd(), os.getcwd()
|
||||
reporter(timesteps_total=1)
|
||||
|
||||
register_trainable("f1", train)
|
||||
run_experiments({
|
||||
"foo": {
|
||||
"run": "f1",
|
||||
"local_dir": "/tmp/logdir",
|
||||
"local_dir": os.path.join(ray.utils.get_user_temp_dir(),
|
||||
"logdir"),
|
||||
"config": {
|
||||
"a": "b"
|
||||
},
|
||||
@@ -330,14 +332,16 @@ class TrainableFunctionApiTest(unittest.TestCase):
|
||||
|
||||
def testLongFilename(self):
|
||||
def train(config, reporter):
|
||||
assert "/tmp/logdir/foo" in os.getcwd(), os.getcwd()
|
||||
assert os.path.join(ray.utils.get_user_temp_dir(), "logdir",
|
||||
"foo") in os.getcwd(), os.getcwd()
|
||||
reporter(timesteps_total=1)
|
||||
|
||||
register_trainable("f1", train)
|
||||
run_experiments({
|
||||
"foo": {
|
||||
"run": "f1",
|
||||
"local_dir": "/tmp/logdir",
|
||||
"local_dir": os.path.join(ray.utils.get_user_temp_dir(),
|
||||
"logdir"),
|
||||
"config": {
|
||||
"a" * 50: tune.sample_from(lambda spec: 5.0 / 7),
|
||||
"b" * 50: tune.sample_from(lambda spec: "long" * 40),
|
||||
|
||||
@@ -3,12 +3,15 @@ import pickle
|
||||
import shutil
|
||||
import unittest
|
||||
|
||||
import ray.utils
|
||||
|
||||
from ray.tune.trainable import TrainableUtil
|
||||
|
||||
|
||||
class TrainableUtilTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.checkpoint_dir = "/tmp/tune/MyTrainable123"
|
||||
self.checkpoint_dir = os.path.join(ray.utils.get_user_temp_dir(),
|
||||
"tune", "MyTrainable123")
|
||||
TrainableUtil.make_checkpoint_dir(self.checkpoint_dir)
|
||||
|
||||
def tearDown(self):
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
import os
|
||||
|
||||
import ray.utils
|
||||
|
||||
from ray.rllib.agents.mock import _MockTrainer
|
||||
from ray.tune import DurableTrainable
|
||||
from ray.tune.sync_client import get_sync_client
|
||||
from ray.tune.syncer import NodeSyncer
|
||||
|
||||
MOCK_REMOTE_DIR = "/tmp/mock-tune-remote/"
|
||||
MOCK_REMOTE_DIR = os.path.join(ray.utils.get_user_temp_dir(),
|
||||
"mock-tune-remote") + os.sep
|
||||
# Sync and delete templates that operate on local directories.
|
||||
LOCAL_SYNC_TEMPLATE = "mkdir -p {target} && rsync -avz {source}/ {target}/"
|
||||
LOCAL_DELETE_TEMPLATE = "rm -rf {target}"
|
||||
|
||||
@@ -8,6 +8,7 @@ import os
|
||||
import six
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
@@ -29,6 +30,20 @@ win32_job = None
|
||||
win32_AssignProcessToJobObject = None
|
||||
|
||||
|
||||
def get_user_temp_dir():
    """Return the base temporary directory for the current platform.

    On macOS and Linux the result is always the top-level ``tmp`` directory
    (``os.sep`` + ``"tmp"``); on every other platform it is whatever
    ``tempfile.gettempdir()`` reports.

    Returns:
        The temporary directory path as a string.
    """
    # `str.startswith` accepts a tuple, so both POSIX-like platforms are
    # matched with a single call.
    if sys.platform.startswith(("darwin", "linux")):
        # Ideally we wouldn't need this fallback, but keep it for now for
        # compatibility.
        return os.path.join(os.sep, "tmp")
    return tempfile.gettempdir()
|
||||
|
||||
|
||||
def get_ray_temp_dir():
    """Return the path of the "ray" directory under the user temp dir.

    Returns:
        ``get_user_temp_dir()`` joined with ``"ray"``. The directory is not
        created by this function.
    """
    base_dir = get_user_temp_dir()
    return os.path.join(base_dir, "ray")
|
||||
|
||||
|
||||
def _random_string():
|
||||
id_hash = hashlib.sha1()
|
||||
id_hash.update(uuid.uuid4().bytes)
|
||||
|
||||
Reference in New Issue
Block a user