From e196fcdbaf1a226e8f5ef5f28308f13d9be8261d Mon Sep 17 00:00:00 2001 From: fangfengbin <869218239a@zju.edu.cn> Date: Thu, 26 Mar 2020 22:02:53 +0800 Subject: [PATCH] Add gcs_service_enabled function to avoid getting environment variable directly (#7742) --- .../src/main/java/org/ray/runtime/config/RayConfig.java | 5 +++++ .../src/main/java/org/ray/runtime/runner/RunManager.java | 3 +-- python/ray/node.py | 2 +- python/ray/ray_constants.py | 8 +++++++- python/ray/tests/test_component_failures_3.py | 4 ++-- python/ray/tests/test_multinode_failures_2.py | 4 ++-- python/ray/tests/test_tempfile.py | 2 +- src/ray/common/constants.h | 6 ------ src/ray/common/ray_config_def.h | 9 +++++++++ src/ray/core_worker/core_worker.cc | 3 +-- src/ray/core_worker/test/core_worker_test.cc | 3 +-- src/ray/raylet/main.cc | 3 +-- 12 files changed, 31 insertions(+), 21 deletions(-) diff --git a/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java b/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java index 1ba6246c7..8671ed73d 100644 --- a/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java +++ b/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java @@ -72,6 +72,8 @@ public class RayConfig { public final String jobResourcePath; public final String pythonWorkerCommand; + public final boolean gcsServiceEnabled; + private static volatile RayConfig instance = null; public static RayConfig getInstance() { @@ -223,6 +225,9 @@ public class RayConfig { numWorkersPerProcess = config.getInt("ray.raylet.config.num_workers_per_process_java"); + gcsServiceEnabled = System.getenv("RAY_GCS_SERVICE_ENABLED") == null || + System.getenv("RAY_GCS_SERVICE_ENABLED").toLowerCase().equals("true"); + // Validate config. 
validate(); LOGGER.debug("Created config: {}", this); diff --git a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java index 626887768..61c970adc 100644 --- a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java +++ b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java @@ -226,8 +226,7 @@ public class RunManager { } // start gcs server - if (System.getenv("RAY_GCS_SERVICE_ENABLED") == null || - System.getenv("RAY_GCS_SERVICE_ENABLED") == "true") { + if (rayConfig.gcsServiceEnabled) { String redisPasswordOption = ""; if (!Strings.isNullOrEmpty(rayConfig.headRedisPassword)) { redisPasswordOption = rayConfig.headRedisPassword; diff --git a/python/ray/node.py b/python/ray/node.py index 60f41d869..443a10b4a 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -626,7 +626,7 @@ class Node: # If this is the head node, start the relevant head node processes. self.start_redis() - if os.environ.get(ray_constants.RAY_GCS_SERVICE_ENABLED, True): + if ray_constants.GCS_SERVICE_ENABLED: self.start_gcs_server() else: self.start_raylet_monitor() diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index d751d1baf..953a568fd 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -13,6 +13,12 @@ def env_integer(key, default): return default +def env_bool(key, default): + if key in os.environ: + return True if os.environ[key].lower() == "true" else False + return default + + ID_SIZE = 20 # The default maximum number of bytes to allocate to the object store unless @@ -197,4 +203,4 @@ MACH_PAGE_SIZE_BYTES = 4096 # RAY_GCS_SERVICE_ENABLED only set in ci job. # TODO(ffbin): Once we entirely migrate to service-based GCS, we should # remove it. 
-RAY_GCS_SERVICE_ENABLED = "RAY_GCS_SERVICE_ENABLED" +GCS_SERVICE_ENABLED = env_bool("RAY_GCS_SERVICE_ENABLED", True) diff --git a/python/ray/tests/test_component_failures_3.py b/python/ray/tests/test_component_failures_3.py index c5aba39d5..5cc47e7f8 100644 --- a/python/ray/tests/test_component_failures_3.py +++ b/python/ray/tests/test_component_failures_3.py @@ -82,7 +82,7 @@ def test_driver_lives_sequential(ray_start_regular): ray.worker._global_node.kill_plasma_store() ray.worker._global_node.kill_log_monitor() ray.worker._global_node.kill_monitor() - if os.environ.get(ray_constants.RAY_GCS_SERVICE_ENABLED, True): + if ray_constants.GCS_SERVICE_ENABLED: ray.worker._global_node.kill_gcs_server() else: ray.worker._global_node.kill_raylet_monitor() @@ -96,7 +96,7 @@ def test_driver_lives_sequential(ray_start_regular): def test_driver_lives_parallel(ray_start_regular): all_processes = ray.worker._global_node.all_processes - if os.environ.get(ray_constants.RAY_GCS_SERVICE_ENABLED, True): + if ray_constants.GCS_SERVICE_ENABLED: process_infos = (all_processes[ray_constants.PROCESS_TYPE_PLASMA_STORE] + all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER] + all_processes[ray_constants.PROCESS_TYPE_RAYLET] + diff --git a/python/ray/tests/test_multinode_failures_2.py b/python/ray/tests/test_multinode_failures_2.py index 3d6f98b40..adf0e1416 100644 --- a/python/ray/tests/test_multinode_failures_2.py +++ b/python/ray/tests/test_multinode_failures_2.py @@ -132,7 +132,7 @@ def test_driver_lives_sequential(ray_start_regular): ray.worker._global_node.kill_plasma_store() ray.worker._global_node.kill_log_monitor() ray.worker._global_node.kill_monitor() - if os.environ.get(ray_constants.RAY_GCS_SERVICE_ENABLED, True): + if ray_constants.GCS_SERVICE_ENABLED: ray.worker._global_node.kill_gcs_server() else: ray.worker._global_node.kill_raylet_monitor() @@ -145,7 +145,7 @@ def test_driver_lives_sequential(ray_start_regular): reason="Hanging with new GCS API.") def 
test_driver_lives_parallel(ray_start_regular): all_processes = ray.worker._global_node.all_processes - if os.environ.get(ray_constants.RAY_GCS_SERVICE_ENABLED, True): + if ray_constants.GCS_SERVICE_ENABLED: process_infos = (all_processes[ray_constants.PROCESS_TYPE_PLASMA_STORE] + all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER] + all_processes[ray_constants.PROCESS_TYPE_RAYLET] + diff --git a/python/ray/tests/test_tempfile.py b/python/ray/tests/test_tempfile.py index 94c9c14fd..0e174fd23 100644 --- a/python/ray/tests/test_tempfile.py +++ b/python/ray/tests/test_tempfile.py @@ -145,7 +145,7 @@ def test_raylet_tempfiles(shutdown_only): "raylet.err" } - if os.environ.get(ray_constants.RAY_GCS_SERVICE_ENABLED, True): + if ray_constants.GCS_SERVICE_ENABLED: log_files_expected.update({"gcs_server.out", "gcs_server.err"}) else: log_files_expected.update({"raylet_monitor.out", "raylet_monitor.err"}) diff --git a/src/ray/common/constants.h b/src/ray/common/constants.h index 28c3d2d03..f28cd8299 100644 --- a/src/ray/common/constants.h +++ b/src/ray/common/constants.h @@ -43,10 +43,4 @@ constexpr char kWorkerDynamicOptionPlaceholderPrefix[] = constexpr char kWorkerRayletConfigPlaceholder[] = "RAY_WORKER_RAYLET_CONFIG_PLACEHOLDER"; -/// RAY_GCS_SERVICE_ENABLED is an env variable which only set in ci job. -/// If the value of RAY_GCS_SERVICE_ENABLED is false, we will disable gcs service, -/// otherwise gcs service is enabled. -/// TODO(ffbin): Once we entirely migrate to service-based GCS, we should remove it. -constexpr char kRayGcsServiceEnabled[] = "RAY_GCS_SERVICE_ENABLED"; - #endif // RAY_CONSTANTS_H_ diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index fd035e4ec..1e7982a60 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -273,3 +273,12 @@ RAY_CONFIG(uint32_t, object_store_full_initial_delay_ms, 1000) /// Duration to wait between retries for failed tasks. 
RAY_CONFIG(uint32_t, task_retry_delay_ms, 5000) + +/// Whether to enable gcs service. +/// RAY_GCS_SERVICE_ENABLED is an env variable which is only set in CI jobs. +/// The gcs service is enabled when RAY_GCS_SERVICE_ENABLED is unset or is exactly "true" +/// (case-sensitive); any other value disables the gcs service. +/// TODO(ffbin): Once we entirely migrate to service-based GCS, we should remove it. +RAY_CONFIG(bool, gcs_service_enabled, + getenv("RAY_GCS_SERVICE_ENABLED") == nullptr || + getenv("RAY_GCS_SERVICE_ENABLED") == std::string("true")) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 279edb9ec..d227c77d8 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -115,8 +115,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, RayLog::InstallFailureSignalHandler(); } // Initialize gcs client. - if (getenv(kRayGcsServiceEnabled) == nullptr || - strcmp(getenv(kRayGcsServiceEnabled), "true") == 0) { + if (RayConfig::instance().gcs_service_enabled()) { gcs_client_ = std::make_shared(gcs_options); } else { gcs_client_ = std::make_shared(gcs_options); diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 7ebb88302..b5f79ef54 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -109,8 +109,7 @@ class CoreWorkerTest : public ::testing::Test { } // start gcs server - if (getenv(kRayGcsServiceEnabled) == nullptr || - strcmp(getenv(kRayGcsServiceEnabled), "true") == 0) { + if (RayConfig::instance().gcs_service_enabled()) { gcs_server_pid_ = StartGcsServer("127.0.0.1"); } else { // core worker test relies on node resources.
It's important that one raylet can diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 4c1ae4131..5369d1042 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -177,8 +177,7 @@ int main(int argc, char *argv[]) { ray::gcs::GcsClientOptions client_options(redis_address, redis_port, redis_password); std::shared_ptr gcs_client; - if (getenv(kRayGcsServiceEnabled) == nullptr || - strcmp(getenv(kRayGcsServiceEnabled), "true") == 0) { + if (RayConfig::instance().gcs_service_enabled()) { gcs_client = std::make_shared(client_options); } else { gcs_client = std::make_shared(client_options);