diff --git a/python/ray/node.py b/python/ray/node.py index 100bf7dbe..f1ea737de 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -14,6 +14,7 @@ import time import ray import ray.ray_constants as ray_constants import ray.services +import ray.utils from ray.resource_spec import ResourceSpec from ray.utils import try_to_create_directory, try_to_symlink @@ -82,7 +83,7 @@ class Node: ray_params.update_if_absent( include_log_monitor=True, resources={}, - temp_dir="/tmp/ray", + temp_dir=ray.utils.get_ray_temp_dir(), worker_path=os.path.join( os.path.dirname(os.path.abspath(__file__)), "workers/default_worker.py")) @@ -313,7 +314,7 @@ class Node: """Get the path of the sockets directory.""" return self._sockets_dir - def _make_inc_temp(self, suffix="", prefix="", directory_name="/tmp/ray"): + def _make_inc_temp(self, suffix="", prefix="", directory_name=None): """Return a incremental temporary file name. The file is not created. Args: @@ -326,6 +327,8 @@ class Node: the same name, the returned name will look like "{directory_name}/{prefix}.{unique_index}{suffix}" """ + if directory_name is None: + directory_name = ray.utils.get_ray_temp_dir() directory_name = os.path.expanduser(directory_name) index = self._incremental_dict[suffix, prefix, directory_name] # `tempfile.TMP_MAX` could be extremely large, diff --git a/python/ray/projects/scripts.py b/python/ray/projects/scripts.py index d02649f50..2610f3232 100644 --- a/python/ray/projects/scripts.py +++ b/python/ray/projects/scripts.py @@ -199,9 +199,9 @@ class SessionRunner: requirements_txt = project_environment["requirements"] # Create a temporary requirements_txt in the head node. 
- remote_requirements_txt = ( - "/tmp/" + "ray_project_requirements_txt_{}".format( - time.time())) + remote_requirements_txt = os.path.join( + ray.utils.get_user_temp_dir(), + "ray_project_requirements_txt_{}".format(time.time())) rsync( self.project_definition.cluster_yaml(), diff --git a/python/ray/reporter.py b/python/ray/reporter.py index 23117adff..b78ac1784 100644 --- a/python/ray/reporter.py +++ b/python/ray/reporter.py @@ -7,6 +7,7 @@ import time import datetime import grpc import subprocess +import sys from concurrent import futures import ray @@ -30,7 +31,7 @@ class ReporterServer(reporter_pb2_grpc.ReporterServiceServicer): def GetProfilingStats(self, request, context): pid = request.pid duration = request.duration - profiling_file_path = os.path.join("/tmp/ray/", + profiling_file_path = os.path.join(ray.utils.get_ray_temp_dir(), "{}_profiling.txt".format(pid)) process = subprocess.Popen( "sudo $(which py-spy) record -o {} -p {} -d {} -f speedscope" @@ -127,7 +128,11 @@ class Reporter: @staticmethod def get_disk_usage(): - return {x: psutil.disk_usage(x) for x in ["/", "/tmp"]} + dirs = [ + os.environ["USERPROFILE"] if sys.platform == "win32" else os.sep, + ray.utils.get_user_temp_dir(), + ] + return {x: psutil.disk_usage(x) for x in dirs} @staticmethod def get_workers(): diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 09cfddd7e..28b9c3498 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -912,7 +912,8 @@ def timeline(address): logger.info("Connecting to Ray instance at {}.".format(address)) ray.init(address=address) time = datetime.today().strftime("%Y-%m-%d_%H-%M-%S") - filename = "/tmp/ray-timeline-{}.json".format(time) + filename = os.path.join(ray.utils.get_user_temp_dir(), + "ray-timeline-{}.json".format(time)) ray.timeline(filename=filename) size = os.path.getsize(filename) logger.info("Trace file written to {} ({} bytes).".format(filename, size)) diff --git a/python/ray/services.py 
b/python/ray/services.py index 95ee2c484..4a206a00b 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -423,8 +423,9 @@ def start_ray_process(command, "If 'use_gdb' is true, then 'use_tmux' must be true as well.") # TODO(suquark): Any better temp file creation here? - gdb_init_path = "/tmp/ray/gdb_init_{}_{}".format( - process_type, time.time()) + gdb_init_path = os.path.join( + ray.utils.get_ray_temp_dir(), "gdb_init_{}_{}".format( + process_type, time.time())) ray_process_path = command[0] ray_process_args = command[1:] run_args = " ".join(["'{}'".format(arg) for arg in ray_process_args]) @@ -1431,18 +1432,18 @@ def determine_plasma_store_config(object_store_memory, if shm_avail > object_store_memory: plasma_directory = "/dev/shm" else: - plasma_directory = "/tmp" + plasma_directory = ray.utils.get_user_temp_dir() logger.warning( - "WARNING: The object store is using /tmp instead of " + "WARNING: The object store is using {} instead of " "/dev/shm because /dev/shm has only {} bytes available. " "This may slow down performance! You may be able to free " "up space by deleting files in /dev/shm or terminating " "any running plasma_store_server processes. If you are " "inside a Docker container, you may need to pass an " "argument with the flag '--shm-size' to 'docker run'.". - format(shm_avail)) + format(ray.utils.get_user_temp_dir(), shm_avail)) else: - plasma_directory = "/tmp" + plasma_directory = ray.utils.get_user_temp_dir() # Do some sanity checks. 
if object_store_memory > system_memory: diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py index 8da0ba248..f9ef76644 100644 --- a/python/ray/tests/test_actor_failures.py +++ b/python/ray/tests/test_actor_failures.py @@ -22,7 +22,8 @@ from ray.test_utils import (relevant_errors, wait_for_condition, @pytest.fixture def ray_checkpointable_actor_cls(request): - checkpoint_dir = "/tmp/ray_temp_checkpoint_dir/" + checkpoint_dir = os.path.join(ray.utils.get_user_temp_dir(), + "ray_temp_checkpoint_dir") + os.sep if not os.path.isdir(checkpoint_dir): os.mkdir(checkpoint_dir) diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index 6b693f002..3b4bdd23b 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -465,7 +465,8 @@ def test_pandas_parquet_serialization(): def test_socket_dir_not_existing(shutdown_only): random_name = ray.ObjectID.from_random().hex() - temp_raylet_socket_dir = "/tmp/ray/tests/{}".format(random_name) + temp_raylet_socket_dir = os.path.join(ray.utils.get_ray_temp_dir(), + "tests", random_name) temp_raylet_socket_name = os.path.join(temp_raylet_socket_dir, "raylet_socket") ray.init(num_cpus=1, raylet_socket_name=temp_raylet_socket_name) diff --git a/python/ray/tests/test_projects.py b/python/ray/tests/test_projects.py index becc778a6..b2d6b60da 100644 --- a/python/ray/tests/test_projects.py +++ b/python/ray/tests/test_projects.py @@ -51,7 +51,7 @@ def test_project_root(): project_definition = ray.projects.ProjectDefinition(path2) assert os.path.normpath(project_definition.root) == os.path.normpath(path) - path3 = "/tmp/" + path3 = ray.utils.get_user_temp_dir() + os.sep with pytest.raises(ValueError): project_definition = ray.projects.ProjectDefinition(path3) diff --git a/python/ray/tests/test_tempfile.py b/python/ray/tests/test_tempfile.py index 6acd7dc01..473d940a3 100644 --- a/python/ray/tests/test_tempfile.py +++ 
b/python/ray/tests/test_tempfile.py @@ -4,6 +4,7 @@ import time import pytest import ray import ray.ray_constants as ray_constants +import subprocess from ray.cluster_utils import Cluster @@ -12,7 +13,8 @@ def test_conn_cluster(): with pytest.raises(Exception) as exc_info: ray.init( address="127.0.0.1:6379", - plasma_store_socket_name="/tmp/this_should_fail") + plasma_store_socket_name=os.path.join( + ray.utils.get_user_temp_dir(), "this_should_fail")) assert exc_info.value.args[0] == ( "When connecting to an existing cluster, " "plasma_store_socket_name must not be provided.") @@ -21,74 +23,111 @@ def test_conn_cluster(): with pytest.raises(Exception) as exc_info: ray.init( address="127.0.0.1:6379", - raylet_socket_name="/tmp/this_should_fail") + raylet_socket_name=os.path.join(ray.utils.get_user_temp_dir(), + "this_should_fail")) assert exc_info.value.args[0] == ( "When connecting to an existing cluster, " "raylet_socket_name must not be provided.") # temp_dir with pytest.raises(Exception) as exc_info: - ray.init(address="127.0.0.1:6379", temp_dir="/tmp/this_should_fail") + ray.init( + address="127.0.0.1:6379", + temp_dir=os.path.join(ray.utils.get_user_temp_dir(), + "this_should_fail")) assert exc_info.value.args[0] == ( "When connecting to an existing cluster, " "temp_dir must not be provided.") def test_tempdir(shutdown_only): - shutil.rmtree("/tmp/ray", ignore_errors=True) - ray.init(temp_dir="/tmp/i_am_a_temp_dir") + shutil.rmtree(ray.utils.get_ray_temp_dir(), ignore_errors=True) + ray.init( + temp_dir=os.path.join(ray.utils.get_user_temp_dir(), + "i_am_a_temp_dir")) assert os.path.exists( - "/tmp/i_am_a_temp_dir"), "Specified temp dir not found." - assert not os.path.exists("/tmp/ray"), "Default temp dir should not exist." - shutil.rmtree("/tmp/i_am_a_temp_dir", ignore_errors=True) + os.path.join(ray.utils.get_user_temp_dir(), + "i_am_a_temp_dir")), "Specified temp dir not found." 
+ assert not os.path.exists( + ray.utils.get_ray_temp_dir()), ("Default temp dir should not exist.") + shutil.rmtree( + os.path.join(ray.utils.get_user_temp_dir(), "i_am_a_temp_dir"), + ignore_errors=True) def test_tempdir_commandline(): - shutil.rmtree("/tmp/ray", ignore_errors=True) - os.system("ray start --head --temp-dir=/tmp/i_am_a_temp_dir2") + shutil.rmtree(ray.utils.get_ray_temp_dir(), ignore_errors=True) + subprocess.check_call([ + "ray", "start", "--head", "--temp-dir=" + os.path.join( + ray.utils.get_user_temp_dir(), "i_am_a_temp_dir2") + ]) assert os.path.exists( - "/tmp/i_am_a_temp_dir2"), "Specified temp dir not found." - assert not os.path.exists("/tmp/ray"), "Default temp dir should not exist." - os.system("ray stop") - shutil.rmtree("/tmp/i_am_a_temp_dir2", ignore_errors=True) + os.path.join(ray.utils.get_user_temp_dir(), + "i_am_a_temp_dir2")), "Specified temp dir not found." + assert not os.path.exists( + ray.utils.get_ray_temp_dir()), "Default temp dir should not exist." + subprocess.check_call(["ray", "stop"]) + shutil.rmtree( + os.path.join(ray.utils.get_user_temp_dir(), "i_am_a_temp_dir2"), + ignore_errors=True) def test_raylet_socket_name(shutdown_only): - ray.init(raylet_socket_name="/tmp/i_am_a_temp_socket") + ray.init( + raylet_socket_name=os.path.join(ray.utils.get_user_temp_dir(), + "i_am_a_temp_socket")) assert os.path.exists( - "/tmp/i_am_a_temp_socket"), "Specified socket path not found." + os.path.join(ray.utils.get_user_temp_dir(), + "i_am_a_temp_socket")), "Specified socket path not found." ray.shutdown() try: - os.remove("/tmp/i_am_a_temp_socket") + os.remove( + os.path.join(ray.utils.get_user_temp_dir(), "i_am_a_temp_socket")) except OSError: pass # It could have been removed by Ray. 
cluster = Cluster(True) - cluster.add_node(raylet_socket_name="/tmp/i_am_a_temp_socket_2") + cluster.add_node( + raylet_socket_name=os.path.join(ray.utils.get_user_temp_dir(), + "i_am_a_temp_socket_2")) assert os.path.exists( - "/tmp/i_am_a_temp_socket_2"), "Specified socket path not found." + os.path.join( + ray.utils.get_user_temp_dir(), + "i_am_a_temp_socket_2")), "Specified socket path not found." cluster.shutdown() try: - os.remove("/tmp/i_am_a_temp_socket_2") + os.remove( + os.path.join(ray.utils.get_user_temp_dir(), + "i_am_a_temp_socket_2")) except OSError: pass # It could have been removed by Ray. def test_temp_plasma_store_socket(shutdown_only): - ray.init(plasma_store_socket_name="/tmp/i_am_a_temp_socket") + ray.init( + plasma_store_socket_name=os.path.join(ray.utils.get_user_temp_dir(), + "i_am_a_temp_socket")) assert os.path.exists( - "/tmp/i_am_a_temp_socket"), "Specified socket path not found." + os.path.join(ray.utils.get_user_temp_dir(), + "i_am_a_temp_socket")), "Specified socket path not found." ray.shutdown() try: - os.remove("/tmp/i_am_a_temp_socket") + os.remove( + os.path.join(ray.utils.get_user_temp_dir(), "i_am_a_temp_socket")) except OSError: pass # It could have been removed by Ray. cluster = Cluster(True) - cluster.add_node(plasma_store_socket_name="/tmp/i_am_a_temp_socket_2") + cluster.add_node( + plasma_store_socket_name=os.path.join(ray.utils.get_user_temp_dir(), + "i_am_a_temp_socket_2")) assert os.path.exists( - "/tmp/i_am_a_temp_socket_2"), "Specified socket path not found." + os.path.join( + ray.utils.get_user_temp_dir(), + "i_am_a_temp_socket_2")), "Specified socket path not found." cluster.shutdown() try: - os.remove("/tmp/i_am_a_temp_socket_2") + os.remove( + os.path.join(ray.utils.get_user_temp_dir(), + "i_am_a_temp_socket_2")) except OSError: pass # It could have been removed by Ray. 
@@ -135,7 +174,7 @@ def test_raylet_tempfiles(shutdown_only): def test_tempdir_privilege(shutdown_only): - os.chmod("/tmp/ray", 0o000) + os.chmod(ray.utils.get_ray_temp_dir(), 0o000) ray.init(num_cpus=1) session_dir = ray.worker._global_node.get_session_dir_path() assert os.path.exists(session_dir), "Specified socket path not found." diff --git a/python/ray/tune/examples/durable_trainable_example.py b/python/ray/tune/examples/durable_trainable_example.py index 4e13abf34..97bad8af1 100644 --- a/python/ray/tune/examples/durable_trainable_example.py +++ b/python/ray/tune/examples/durable_trainable_example.py @@ -19,9 +19,11 @@ class MockDurableTrainable(DurableTrainable): def __init__(self, remote_checkpoint_dir, *args, **kwargs): # Mock the path as a local path. local_dir_suffix = remote_checkpoint_dir.split("://")[1] - remote_checkpoint_dir = os.path.join("/tmp", local_dir_suffix) + remote_checkpoint_dir = os.path.join(ray.utils.get_user_temp_dir(), + local_dir_suffix) # Disallow malformed relative paths for delete safety. 
- assert os.path.abspath(remote_checkpoint_dir).startswith("/tmp") + assert os.path.abspath(remote_checkpoint_dir).startswith( + ray.utils.get_user_temp_dir()) logger.info("Using %s as the mocked remote checkpoint directory.", self.remote_checkpoint_dir) super(MockDurableTrainable, self).__init__(remote_checkpoint_dir, diff --git a/python/ray/tune/examples/pbt_dcgan_mnist/pbt_dcgan_mnist.py b/python/ray/tune/examples/pbt_dcgan_mnist/pbt_dcgan_mnist.py index 2515b1eed..fb743898e 100644 --- a/python/ray/tune/examples/pbt_dcgan_mnist/pbt_dcgan_mnist.py +++ b/python/ray/tune/examples/pbt_dcgan_mnist/pbt_dcgan_mnist.py @@ -26,7 +26,7 @@ from torch.nn import functional as F from scipy.stats import entropy # Training parameters -dataroot = "/tmp/" +dataroot = ray.utils.get_user_temp_dir() + os.sep workers = 2 batch_size = 64 image_size = 32 diff --git a/python/ray/tune/tests/test_api.py b/python/ray/tune/tests/test_api.py index c578b26bd..a906022eb 100644 --- a/python/ray/tune/tests/test_api.py +++ b/python/ray/tune/tests/test_api.py @@ -294,14 +294,16 @@ class TrainableFunctionApiTest(unittest.TestCase): def testLogdir(self): def train(config, reporter): - assert "/tmp/logdir/foo" in os.getcwd(), os.getcwd() + assert os.path.join(ray.utils.get_user_temp_dir(), "logdir", + "foo") in os.getcwd(), os.getcwd() reporter(timesteps_total=1) register_trainable("f1", train) run_experiments({ "foo": { "run": "f1", - "local_dir": "/tmp/logdir", + "local_dir": os.path.join(ray.utils.get_user_temp_dir(), + "logdir"), "config": { "a": "b" }, @@ -330,14 +332,16 @@ class TrainableFunctionApiTest(unittest.TestCase): def testLongFilename(self): def train(config, reporter): - assert "/tmp/logdir/foo" in os.getcwd(), os.getcwd() + assert os.path.join(ray.utils.get_user_temp_dir(), "logdir", + "foo") in os.getcwd(), os.getcwd() reporter(timesteps_total=1) register_trainable("f1", train) run_experiments({ "foo": { "run": "f1", - "local_dir": "/tmp/logdir", + "local_dir": 
os.path.join(ray.utils.get_user_temp_dir(), + "logdir"), "config": { "a" * 50: tune.sample_from(lambda spec: 5.0 / 7), "b" * 50: tune.sample_from(lambda spec: "long" * 40), diff --git a/python/ray/tune/tests/test_trainable_util.py b/python/ray/tune/tests/test_trainable_util.py index 2664b4554..b88948afa 100644 --- a/python/ray/tune/tests/test_trainable_util.py +++ b/python/ray/tune/tests/test_trainable_util.py @@ -3,12 +3,15 @@ import pickle import shutil import unittest +import ray.utils + from ray.tune.trainable import TrainableUtil class TrainableUtilTest(unittest.TestCase): def setUp(self): - self.checkpoint_dir = "/tmp/tune/MyTrainable123" + self.checkpoint_dir = os.path.join(ray.utils.get_user_temp_dir(), + "tune", "MyTrainable123") TrainableUtil.make_checkpoint_dir(self.checkpoint_dir) def tearDown(self): diff --git a/python/ray/tune/utils/mock.py b/python/ray/tune/utils/mock.py index 857de431a..214b05726 100644 --- a/python/ray/tune/utils/mock.py +++ b/python/ray/tune/utils/mock.py @@ -1,11 +1,14 @@ import os +import ray.utils + from ray.rllib.agents.mock import _MockTrainer from ray.tune import DurableTrainable from ray.tune.sync_client import get_sync_client from ray.tune.syncer import NodeSyncer -MOCK_REMOTE_DIR = "/tmp/mock-tune-remote/" +MOCK_REMOTE_DIR = os.path.join(ray.utils.get_user_temp_dir(), + "mock-tune-remote") + os.sep # Sync and delete templates that operate on local directories. 
LOCAL_SYNC_TEMPLATE = "mkdir -p {target} && rsync -avz {source}/ {target}/" LOCAL_DELETE_TEMPLATE = "rm -rf {target}" diff --git a/python/ray/utils.py b/python/ray/utils.py index b03516abd..f796fce51 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -8,6 +8,7 @@ import os import six import subprocess import sys +import tempfile import threading import time import uuid @@ -29,6 +30,20 @@ win32_job = None win32_AssignProcessToJobObject = None +def get_user_temp_dir(): + if sys.platform.startswith("darwin") or sys.platform.startswith("linux"): + # Ideally we wouldn't need this fallback, but keep it for now + # for compatibility + tempdir = os.path.join(os.sep, "tmp") + else: + tempdir = tempfile.gettempdir() + return tempdir + + +def get_ray_temp_dir(): + return os.path.join(get_user_temp_dir(), "ray") + + def _random_string(): id_hash = hashlib.sha1() id_hash.update(uuid.uuid4().bytes) diff --git a/rllib/examples/export/cartpole_dqn_export.py b/rllib/examples/export/cartpole_dqn_export.py index adaaf2bea..d500644e0 100644 --- a/rllib/examples/export/cartpole_dqn_export.py +++ b/rllib/examples/export/cartpole_dqn_export.py @@ -54,8 +54,8 @@ def restore_checkpoint(export_dir, prefix): if __name__ == "__main__": algo = "DQN" - model_dir = "/tmp/model_export_dir" - ckpt_dir = "/tmp/ckpt_export_dir" + model_dir = os.path.join(ray.utils.get_user_temp_dir(), "model_export_dir") + ckpt_dir = os.path.join(ray.utils.get_user_temp_dir(), "ckpt_export_dir") prefix = "model.ckpt" num_steps = 3 train_and_export(algo, num_steps, model_dir, ckpt_dir, prefix) diff --git a/rllib/examples/saving_experiences.py b/rllib/examples/saving_experiences.py index 6ad53570d..544127b89 100644 --- a/rllib/examples/saving_experiences.py +++ b/rllib/examples/saving_experiences.py @@ -3,6 +3,9 @@ # __sphinx_doc_begin__ import gym import numpy as np +import os + +import ray.utils from ray.rllib.models.preprocessors import get_preprocessor from ray.rllib.evaluation.sample_batch_builder 
import SampleBatchBuilder @@ -10,7 +13,8 @@ from ray.rllib.offline.json_writer import JsonWriter if __name__ == "__main__": batch_builder = SampleBatchBuilder() # or MultiAgentSampleBatchBuilder - writer = JsonWriter("/tmp/demo-out") + writer = JsonWriter( + os.path.join(ray.utils.get_user_temp_dir(), "demo-out")) # You normally wouldn't want to manually create sample batches if a # simulator is available, but let's do it anyways for example purposes: diff --git a/rllib/tests/test_checkpoint_restore.py b/rllib/tests/test_checkpoint_restore.py index c6f38ed6b..99d8a6286 100644 --- a/rllib/tests/test_checkpoint_restore.py +++ b/rllib/tests/test_checkpoint_restore.py @@ -125,7 +125,8 @@ def export_test(alg_name, failures): res = algo.train() print("current status: " + str(res)) - export_dir = "/tmp/export_dir_%s" % alg_name + export_dir = os.path.join(ray.utils.get_user_temp_dir(), + "export_dir_%s" % alg_name) print("Exporting model ", alg_name, export_dir) algo.export_policy_model(export_dir) if not valid_tf_model(export_dir): diff --git a/src/ray/common/status.cc b/src/ray/common/status.cc index 40657d394..e3e954546 100644 --- a/src/ray/common/status.cc +++ b/src/ray/common/status.cc @@ -27,11 +27,11 @@ // Adapted from Apache Arrow, Apache Kudu, TensorFlow #include "ray/common/status.h" -#include #include #include +#include namespace ray { diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index c4bdc80ef..ab769508d 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -32,6 +32,7 @@ #include "ray/core_worker/store_provider/memory_store/memory_store.h" #include "ray/core_worker/transport/direct_actor_transport.h" #include "ray/raylet/raylet_client.h" +#include "ray/util/filesystem.h" #include "src/ray/protobuf/core_worker.pb.h" #include "src/ray/protobuf/gcs.pb.h" @@ -149,7 +150,8 @@ class CoreWorkerTest : public ::testing::Test { } std::string 
StartStore() { - std::string store_socket_name = "/tmp/store" + ObjectID::FromRandom().Hex(); + std::string store_socket_name = + ray::JoinPaths(ray::GetUserTempDir(), "store" + ObjectID::FromRandom().Hex()); std::string store_pid = store_socket_name + ".pid"; std::string plasma_command = store_executable + " -m 10000000 -s " + store_socket_name + @@ -171,7 +173,8 @@ class CoreWorkerTest : public ::testing::Test { std::string StartRaylet(std::string store_socket_name, std::string node_ip_address, int port, std::string redis_address, std::string resource) { - std::string raylet_socket_name = "/tmp/raylet" + ObjectID::FromRandom().Hex(); + std::string raylet_socket_name = + ray::JoinPaths(ray::GetUserTempDir(), "raylet" + ObjectID::FromRandom().Hex()); std::string ray_start_cmd = raylet_executable; ray_start_cmd.append(" --raylet_socket_name=" + raylet_socket_name) .append(" --store_socket_name=" + store_socket_name) @@ -204,8 +207,8 @@ class CoreWorkerTest : public ::testing::Test { } std::string StartRayletMonitor(std::string redis_address) { - std::string raylet_monitor_pid = - "/tmp/raylet_monitor" + ObjectID::FromRandom().Hex() + ".pid"; + std::string raylet_monitor_pid = ray::JoinPaths( + ray::GetUserTempDir(), "raylet_monitor" + ObjectID::FromRandom().Hex() + ".pid"); std::string raylet_monitor_start_cmd = raylet_monitor_executable; raylet_monitor_start_cmd.append(" --redis_address=" + redis_address) .append(" --redis_port=6379") @@ -225,8 +228,8 @@ class CoreWorkerTest : public ::testing::Test { } std::string StartGcsServer(std::string redis_address) { - std::string gcs_server_pid = - "/tmp/gcs_server" + ObjectID::FromRandom().Hex() + ".pid"; + std::string gcs_server_pid = ray::JoinPaths( + ray::GetUserTempDir(), "gcs_server" + ObjectID::FromRandom().Hex() + ".pid"); std::string gcs_server_start_cmd = gcs_server_executable; gcs_server_start_cmd.append(" --redis_address=" + redis_address) .append(" --redis_port=6379") diff --git 
a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc index 1f0f52be9..8db2aa7d4 100644 --- a/src/ray/object_manager/test/object_manager_stress_test.cc +++ b/src/ray/object_manager/test/object_manager_stress_test.cc @@ -20,10 +20,9 @@ #include #include "gtest/gtest.h" - #include "ray/common/status.h" - #include "ray/object_manager/object_manager.h" +#include "ray/util/filesystem.h" namespace ray { @@ -89,7 +88,7 @@ class TestObjectManagerBase : public ::testing::Test { } std::string StartStore(const std::string &id) { - std::string store_id = "/tmp/store"; + std::string store_id = ray::JoinPaths(ray::GetUserTempDir(), "store"); store_id = store_id + id; std::string store_pid = store_id + ".pid"; std::string plasma_command = store_executable + " -m 1000000000 -s " + store_id + diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc index a8f5f569e..3b3b7c578 100644 --- a/src/ray/object_manager/test/object_manager_test.cc +++ b/src/ray/object_manager/test/object_manager_test.cc @@ -12,16 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+#include "ray/object_manager/object_manager.h" + #include #include #include #include "gtest/gtest.h" - #include "ray/common/status.h" - -#include "ray/object_manager/object_manager.h" +#include "ray/util/filesystem.h" namespace { std::string store_executable; @@ -83,7 +83,7 @@ class TestObjectManagerBase : public ::testing::Test { } std::string StartStore(const std::string &id) { - std::string store_id = "/tmp/store"; + std::string store_id = ray::JoinPaths(ray::GetUserTempDir(), "store"); store_id = store_id + id; std::string store_pid = store_id + ".pid"; std::string plasma_command = store_executable + " -m 1000000000 -s " + store_id + diff --git a/src/ray/raylet/object_manager_integration_test.cc b/src/ray/raylet/object_manager_integration_test.cc index 4fbace333..e839c9e81 100644 --- a/src/ray/raylet/object_manager_integration_test.cc +++ b/src/ray/raylet/object_manager_integration_test.cc @@ -16,10 +16,9 @@ #include #include "gtest/gtest.h" - #include "ray/common/status.h" - #include "ray/raylet/raylet.h" +#include "ray/util/filesystem.h" namespace ray { @@ -39,7 +38,7 @@ class TestObjectManagerBase : public ::testing::Test { } std::string StartStore(const std::string &id) { - std::string store_id = "/tmp/store"; + std::string store_id = ray::JoinPaths(ray::GetUserTempDir(), "store"); store_id = store_id + id; std::string plasma_command = store_executable + " -m 1000000000 -s " + store_id + " 1> /dev/null 2> /dev/null &"; diff --git a/src/ray/util/filesystem.cc b/src/ray/util/filesystem.cc new file mode 100644 index 000000000..55f1fcd33 --- /dev/null +++ b/src/ray/util/filesystem.cc @@ -0,0 +1,47 @@ +#include "ray/util/filesystem.h" + +#include + +#include "ray/util/logging.h" + +#ifdef _WIN32 +#include +#endif + +namespace ray { + +std::string GetRayTempDir() { return JoinPaths(GetUserTempDir(), "ray"); } + +std::string GetUserTempDir() { + std::string result; +#if defined(__APPLE__) || defined(__linux__) + // Prefer the hard-coded path for now, for 
compatibility. + result = "/tmp"; +#elif defined(_WIN32) + result.resize(1 << 8); + DWORD n = GetTempPath(static_cast(result.size()), &*result.begin()); + if (n > result.size()) { + result.resize(n); + n = GetTempPath(static_cast(result.size()), &*result.begin()); + } + result.resize(0 < n && n <= result.size() ? static_cast(n) : 0); +#else // not Linux, Darwin, or Windows + const char *candidates[] = {"TMPDIR", "TMP", "TEMP", "TEMPDIR"}; + const char *found = NULL; + for (char const *candidate : candidates) { + found = getenv(candidate); + if (found) { + break; + } + } + result = found ? found : "/tmp"; +#endif + // Strip trailing separators + while (!result.empty() && IsDirSep(result.back())) { + result.pop_back(); + } + RAY_CHECK(!result.empty()); + return result; +} + +} // namespace ray diff --git a/src/ray/util/filesystem.h b/src/ray/util/filesystem.h new file mode 100644 index 000000000..c3a8e4abb --- /dev/null +++ b/src/ray/util/filesystem.h @@ -0,0 +1,67 @@ +#ifndef RAY_UTIL_FILESYSTEM_H +#define RAY_UTIL_FILESYSTEM_H + +#include + +namespace ray { + +/// \return The portable directory separator (slash on all OSes). +static char GetAltDirSep() { return '/'; } + +/// \return The platform directory separator (backslash on Windows, slash on other OSes). +static char GetDirSep() { + char result; +#ifdef _WIN32 + result = '\\'; +#else + result = '/'; +#endif + return result; +} + +/// \return The platform PATH separator (semicolon on Windows, colon on other OSes). +static char GetPathSep() { + char result; +#ifdef _WIN32 + result = ';'; +#else + result = ':'; +#endif + return result; +} + +/// \return A non-volatile temporary directory in which Ray can store its files. +std::string GetRayTempDir(); + +/// \return The non-volatile temporary directory for the current user (often /tmp). +std::string GetUserTempDir(); + +/// \return Whether or not the given character is a directory separator on this platform. 
+static bool IsDirSep(char ch) { + bool result = ch == GetDirSep(); +#ifdef _WIN32 + result |= ch == GetAltDirSep(); +#endif + return result; +} + +/// \return Whether or not the given character is a PATH separator on this platform. +static bool IsPathSep(char ch) { return ch == GetPathSep(); } + +/// \return The result of joining multiple path components. +template +std::string JoinPaths(std::string base, Paths... components) { + std::string to_append[] = {components...}; + for (size_t i = 0; i < sizeof(to_append) / sizeof(*to_append); ++i) { + const std::string &s = to_append[i]; + if (!base.empty() && !IsDirSep(base.back()) && !s.empty() && !IsDirSep(s[0])) { + base += GetDirSep(); + } + base += s; + } + return base; +} + +} // namespace ray + +#endif // RAY_UTIL_FILESYSTEM_H diff --git a/src/ray/util/logging_test.cc b/src/ray/util/logging_test.cc index 6fe20f240..340740fda 100644 --- a/src/ray/util/logging_test.cc +++ b/src/ray/util/logging_test.cc @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "ray/util/logging.h" + #include #include #include #include "gtest/gtest.h" -#include "ray/util/logging.h" +#include "ray/util/filesystem.h" namespace ray { @@ -55,14 +57,15 @@ TEST(PrintLogTest, LogTestWithoutInit) { TEST(PrintLogTest, LogTestWithInit) { // Test empty app name. - RayLog::StartRayLog("", RayLogLevel::DEBUG, "/tmp/"); + RayLog::StartRayLog("", RayLogLevel::DEBUG, ray::GetUserTempDir() + ray::GetDirSep()); PrintLog(); RayLog::ShutDownRayLog(); } // This test will output large amount of logs to stderr, should be disabled in travis. 
TEST(LogPerfTest, PerfTest) { - RayLog::StartRayLog("/fake/path/to/appdire/LogPerfTest", RayLogLevel::ERROR, "/tmp/"); + RayLog::StartRayLog("/fake/path/to/appdire/LogPerfTest", RayLogLevel::ERROR, + ray::GetUserTempDir() + ray::GetDirSep()); int rounds = 100000; int64_t start_time = current_time_ms(); diff --git a/streaming/src/test/queue_tests_base.h b/streaming/src/test/queue_tests_base.h index 6932b5bf7..e0839ccd7 100644 --- a/streaming/src/test/queue_tests_base.h +++ b/streaming/src/test/queue_tests_base.h @@ -1,3 +1,5 @@ +#include "ray/util/filesystem.h" + namespace ray { namespace streaming { @@ -70,7 +72,9 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { } std::string StartStore() { - std::string store_socket_name = "/tmp/store" + RandomObjectID().Hex(); + std::string store_socket_name = ray::JoinPaths( + ray::GetUserTempDir(), + "store" + RandomObjectID().Hex()); std::string store_pid = store_socket_name + ".pid"; std::string plasma_command = store_executable_ + " -m 10000000 -s " + store_socket_name + @@ -91,7 +95,9 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { } std::string StartGcsServer(std::string redis_address) { - std::string gcs_server_socket_name = "/tmp/gcs_server" + ObjectID::FromRandom().Hex(); + std::string gcs_server_socket_name = ray::JoinPaths( + ray::GetUserTempDir(), + "gcs_server" + ObjectID::FromRandom().Hex()); std::string ray_start_cmd = gcs_server_executable_; ray_start_cmd.append(" --redis_address=" + redis_address) .append(" --redis_port=6379") @@ -116,7 +122,9 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { std::string StartRaylet(std::string store_socket_name, std::string node_ip_address, int port, std::string redis_address, std::string resource) { - std::string raylet_socket_name = "/tmp/raylet" + RandomObjectID().Hex(); + std::string raylet_socket_name = ray::JoinPaths( + ray::GetUserTempDir(), + "raylet" + RandomObjectID().Hex()); std::string ray_start_cmd = 
raylet_executable_; ray_start_cmd.append(" --raylet_socket_name=" + raylet_socket_name) .append(" --store_socket_name=" + store_socket_name)