diff --git a/BUILD.bazel b/BUILD.bazel index d93084b37..b23e8816f 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1199,7 +1199,7 @@ cc_library( ":gcs", ":object_manager_fbs", ":object_manager_rpc", - ":plasma_client", + ":plasma_store_server_lib", ":ray_common", ":ray_util", "@boost//:asio", diff --git a/python/ray/node.py b/python/ray/node.py index ef82a931a..a4f373cd8 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -107,7 +107,11 @@ class Node: self._localhost = socket.gethostbyname("localhost") self._ray_params = ray_params self._redis_address = ray_params.redis_address - self._config = ray_params._internal_config + self._config = ray_params._internal_config or {} + + # Enable Plasma Store as a thread by default. + if "plasma_store_as_thread" not in self._config: + self._config["plasma_store_as_thread"] = True if head: redis_client = None @@ -571,6 +575,7 @@ class Node: stderr_file=stderr_file, plasma_directory=self._ray_params.plasma_directory, huge_pages=self._ray_params.huge_pages, + keep_idle=bool(self._config.get("plasma_store_as_thread")), fate_share=self.kernel_fate_share) assert ( ray_constants.PROCESS_TYPE_PLASMA_STORE not in self.all_processes) @@ -627,6 +632,8 @@ class Node: include_java=self._ray_params.include_java, java_worker_options=self._ray_params.java_worker_options, load_code_from_local=self._ray_params.load_code_from_local, + plasma_directory=self._ray_params.plasma_directory, + huge_pages=self._ray_params.huge_pages, fate_share=self.kernel_fate_share, socket_to_use=self.socket) assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes diff --git a/python/ray/services.py b/python/ray/services.py index f82d1ab16..c285cfd6e 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1190,7 +1190,6 @@ def start_gcs_server(redis_address, """ gcs_ip_address, gcs_port = redis_address.split(":") redis_password = redis_password or "" - config = config or {} config_str = ",".join(["{},{}".format(*kv) for kv in config.items()]) command = [ GCS_SERVER_EXECUTABLE, @@ -1230,6 +1229,8 @@ def start_raylet(redis_address, include_java=False, java_worker_options=None, load_code_from_local=False, + plasma_directory=None, + huge_pages=False, fate_share=None, socket_to_use=None): """Start a raylet, which is a combined local scheduler and object manager. @@ -1273,7 +1274,6 @@ def start_raylet(redis_address, # The caller must provide a node manager port so that we can correctly # populate the command to start a worker. assert node_manager_port is not None and node_manager_port != 0 - config = config or {} config_str = ",".join(["{},{}".format(*kv) for kv in config.items()]) if use_valgrind and use_profiler: @@ -1362,6 +1362,16 @@ def start_raylet(redis_address, "--temp_dir={}".format(temp_dir), "--session_dir={}".format(session_dir), ] + if config.get("plasma_store_as_thread"): + # command related to the plasma store + plasma_directory, object_store_memory = determine_plasma_store_config( + resource_spec.object_store_memory, plasma_directory, huge_pages) + command += [ + "--object_store_memory={}".format(object_store_memory), + "--plasma_directory={}".format(plasma_directory), + ] + if huge_pages: + command.append("--huge_pages") if socket_to_use: socket_to_use.close() process_info = start_ray_process( @@ -1554,6 +1564,7 @@ def start_plasma_store(resource_spec, stdout_file=None, stderr_file=None, plasma_directory=None, + keep_idle=False, huge_pages=False, fate_share=None, use_valgrind=False): @@ -1571,6 +1582,7 @@ def start_plasma_store(resource_spec, be created. huge_pages: Boolean flag indicating whether to start the Object Store with hugetlbfs support. Requires plasma_directory. + keep_idle: If True, run the plasma store as an idle placeholder. Returns: ProcessInfo for the process that was started. @@ -1594,6 +1606,8 @@ def start_plasma_store(resource_spec, command += ["-d", plasma_directory] if huge_pages: command += ["-h"] + if keep_idle: + command.append("-z") process_info = start_ray_process( command, ray_constants.PROCESS_TYPE_PLASMA_STORE, @@ -1721,7 +1735,6 @@ def start_raylet_monitor(redis_address, """ gcs_ip_address, gcs_port = redis_address.split(":") redis_password = redis_password or "" - config = config or {} config_str = ",".join(["{},{}".format(*kv) for kv in config.items()]) command = [ RAYLET_MONITOR_EXECUTABLE, diff --git a/python/ray/tests/test_component_failures_2.py b/python/ray/tests/test_component_failures_2.py index 6240ab3b0..d83e335ef 100644 --- a/python/ray/tests/test_component_failures_2.py +++ b/python/ray/tests/test_component_failures_2.py @@ -153,29 +153,6 @@ def test_raylet_failed(ray_start_cluster): True) -@pytest.mark.skipif( - os.environ.get("RAY_USE_NEW_GCS") == "on", - reason="Hanging with new GCS API.") -@pytest.mark.parametrize( - "ray_start_cluster", [{ - "num_cpus": 8, - "num_nodes": 2, - "_internal_config": json.dumps({ - "num_heartbeats_timeout": 100 - }), - }], - indirect=True) -def test_plasma_store_failed(ray_start_cluster): - cluster = ray_start_cluster - # Kill all plasma stores on worker nodes. - _test_component_failed(cluster, ray_constants.PROCESS_TYPE_PLASMA_STORE) - - # No processes should be left alive on the worker nodes. - check_components_alive(cluster, ray_constants.PROCESS_TYPE_PLASMA_STORE, - False) - check_components_alive(cluster, ray_constants.PROCESS_TYPE_RAYLET, False) - - if __name__ == "__main__": import pytest sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tests/test_multi_node_2.py b/python/ray/tests/test_multi_node_2.py index a7e4f9b95..27131537f 100644 --- a/python/ray/tests/test_multi_node_2.py +++ b/python/ray/tests/test_multi_node_2.py @@ -184,18 +184,6 @@ def test_wait_for_nodes(ray_start_cluster_head): assert ray.cluster_resources()["CPU"] == 1 -def test_worker_plasma_store_failure(ray_start_cluster_head): - cluster = ray_start_cluster_head - worker = cluster.add_node() - cluster.wait_for_nodes() - worker.kill_reporter() - worker.kill_plasma_store() - if ray_constants.PROCESS_TYPE_REAPER in worker.all_processes: - worker.kill_reaper() - worker.all_processes[ray_constants.PROCESS_TYPE_RAYLET][0].process.wait() - assert not worker.any_processes_alive(), worker.live_processes() - - if __name__ == "__main__": import pytest import sys diff --git a/python/ray/tests/test_multinode_failures.py b/python/ray/tests/test_multinode_failures.py index 5af364f5f..56e3e4d60 100644 --- a/python/ray/tests/test_multinode_failures.py +++ b/python/ray/tests/test_multinode_failures.py @@ -161,31 +161,6 @@ def test_raylet_failed(ray_start_cluster): True) -@pytest.mark.skipif( - os.environ.get("RAY_USE_NEW_GCS") == "on", - reason="Hanging with new GCS API.") -@pytest.mark.parametrize( - "ray_start_cluster", - [{ - "num_cpus": 8, - "num_nodes": 2, - "_internal_config": json.dumps({ - # Raylet codepath is not stable with a shorter timeout. - "num_heartbeats_timeout": 10 - }), - }], - indirect=True) -def test_plasma_store_failed(ray_start_cluster): - cluster = ray_start_cluster - # Kill all plasma stores on worker nodes. - _test_component_failed(cluster, ray_constants.PROCESS_TYPE_PLASMA_STORE) - - # No processes should be left alive on the worker nodes. - check_components_alive(cluster, ray_constants.PROCESS_TYPE_PLASMA_STORE, - False) - check_components_alive(cluster, ray_constants.PROCESS_TYPE_RAYLET, False) - - if __name__ == "__main__": import pytest sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index e063217a9..4fe255a9a 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -297,6 +297,9 @@ RAY_CONFIG(int64_t, ping_gcs_rpc_server_interval_milliseconds, 1000) /// Maximum number of times to retry ping gcs rpc server when gcs server restarts. RAY_CONFIG(int32_t, ping_gcs_rpc_server_max_retries, 600) +// Whether start the Plasma Store as a Raylet thread. +RAY_CONFIG(bool, plasma_store_as_thread, false) + /// Whether to enable gcs service. /// RAY_GCS_SERVICE_ENABLED is an env variable which only set in ci job. /// If the value of RAY_GCS_SERVICE_ENABLED is false, we will disable gcs service, diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 4e7aff038..60d0dd3bd 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -14,6 +14,8 @@ #include "ray/object_manager/object_manager.h" +#include + #include "ray/common/common_protocol.h" #include "ray/stats/stats.h" #include "ray/util/util.h" @@ -24,12 +26,34 @@ namespace object_manager_protocol = ray::object_manager::protocol; namespace ray { +ObjectStoreRunner::ObjectStoreRunner(const ObjectManagerConfig &config) { + if (config.object_store_memory > 0) { + plasma_store_.reset(new plasma::PlasmaStoreRunner( + config.store_socket_name, config.object_store_memory, config.huge_pages, + config.plasma_directory, "")); + // Initialize object store. + store_thread_ = std::thread(&plasma::PlasmaStoreRunner::Start, plasma_store_.get()); + // Sleep for sometime until the store is working. This can suppress some + // connection warnings. + std::this_thread::sleep_for(std::chrono::microseconds(500)); + } +} + +ObjectStoreRunner::~ObjectStoreRunner() { + if (plasma_store_ != nullptr) { + plasma_store_->Stop(); + store_thread_.join(); + plasma_store_.reset(); + } +} + ObjectManager::ObjectManager(asio::io_service &main_service, const ClientID &self_node_id, const ObjectManagerConfig &config, std::shared_ptr object_directory) : self_node_id_(self_node_id), config_(config), object_directory_(std::move(object_directory)), + object_store_internal_(config), store_notification_(main_service, config_.store_socket_name), buffer_pool_(config_.store_socket_name, config_.object_chunk_size), rpc_work_(rpc_service_), diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index a06100198..5adb3b856 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -37,6 +37,7 @@ #include "ray/object_manager/object_directory.h" #include "ray/object_manager/object_store_notification_manager.h" #include "ray/object_manager/plasma/client.h" +#include "ray/object_manager/plasma/store.h" #include "ray/rpc/object_manager/object_manager_client.h" #include "ray/rpc/object_manager/object_manager_server.h" @@ -62,6 +63,12 @@ struct ObjectManagerConfig { /// Number of threads of rpc service /// Send and receive request in these threads int rpc_service_threads_number; + /// Initial memory allocation for store. + int64_t object_store_memory = -1; + /// The directory for shared memory files. + std::string plasma_directory; + /// Enable huge pages. + bool huge_pages; }; struct LocalObjectInfo { @@ -72,6 +79,16 @@ struct LocalObjectInfo { std::unordered_map recent_pushes; }; +class ObjectStoreRunner { + public: + ObjectStoreRunner(const ObjectManagerConfig &config); + ~ObjectStoreRunner(); + + private: + std::unique_ptr plasma_store_; + std::thread store_thread_; +}; + class ObjectManagerInterface { public: virtual ray::Status Pull(const ObjectID &object_id) = 0; @@ -370,6 +387,8 @@ class ObjectManager : public ObjectManagerInterface, ClientID self_node_id_; const ObjectManagerConfig config_; std::shared_ptr object_directory_; + // Object store runner. + ObjectStoreRunner object_store_internal_; ObjectStoreNotificationManager store_notification_; ObjectBufferPool buffer_pool_; diff --git a/src/ray/plasma/store_exec.cc b/src/ray/plasma/store_exec.cc index 3e0c8d4e7..1d62e8b65 100644 --- a/src/ray/plasma/store_exec.cc +++ b/src/ray/plasma/store_exec.cc @@ -1,6 +1,9 @@ #include #include +#include +#include + #include "ray/object_manager/plasma/store.h" // TODO(pcm): Convert getopt and sscanf in the store to use more idiomatic C++ // and get rid of the next three lines: @@ -14,35 +17,50 @@ int main(int argc, char *argv[]) { std::string plasma_directory; std::string external_store_endpoint; bool hugepages_enabled = false; + bool keep_idle = false; int64_t system_memory = -1; int c; - while ((c = getopt(argc, argv, "s:m:d:e:h")) != -1) { + while ((c = getopt(argc, argv, "s:m:d:e:h:z")) != -1) { switch (c) { - case 'd': - plasma_directory = std::string(optarg); - break; - case 'e': - external_store_endpoint = std::string(optarg); - break; - case 'h': - hugepages_enabled = true; - break; - case 's': - socket_name = std::string(optarg); - break; - case 'm': { - char extra; - int scanned = sscanf(optarg, "%" SCNd64 "%c", &system_memory, &extra); - ARROW_CHECK(scanned == 1); - break; - } - default: - exit(-1); + case 'd': + plasma_directory = std::string(optarg); + break; + case 'e': + external_store_endpoint = std::string(optarg); + break; + case 'h': + hugepages_enabled = true; + break; + case 's': + socket_name = std::string(optarg); + break; + case 'm': { + char extra; + int scanned = sscanf(optarg, "%" SCNd64 "%c", &system_memory, &extra); + ARROW_CHECK(scanned == 1); + break; + } + case 'z': { + keep_idle = true; + break; + } + default: + exit(-1); + } + } + + if (!keep_idle) { + plasma::PlasmaStoreRunner runner(socket_name, system_memory, hugepages_enabled, + plasma_directory, external_store_endpoint); + runner.Start(); + } else { + printf( + "The Plasma Store is started with the '-z' flag, " + "and it will run idle as a placeholder."); + while (true) { + std::this_thread::sleep_for(std::chrono::hours(1000)); } } - plasma::PlasmaStoreRunner runner(socket_name, system_memory, hugepages_enabled, - plasma_directory, external_store_endpoint); - runner.Start(); return 0; } diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 24ddf0c78..83c445f00 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -48,6 +48,10 @@ DEFINE_bool(disable_stats, false, "Whether disable the stats."); DEFINE_string(stat_address, "127.0.0.1:8888", "The address that we report metrics to."); DEFINE_bool(enable_stdout_exporter, false, "Whether enable the stdout exporter for stats."); +// store options +DEFINE_int64(object_store_memory, -1, "The initial memory of the object store."); +DEFINE_string(plasma_directory, "", "The shared memory directory of the object store."); +DEFINE_bool(huge_pages, false, "Whether enable huge pages"); #ifndef RAYLET_TEST @@ -81,6 +85,9 @@ int main(int argc, char *argv[]) { const bool disable_stats = FLAGS_disable_stats; const std::string stat_address = FLAGS_stat_address; const bool enable_stdout_exporter = FLAGS_enable_stdout_exporter; + const int64_t object_store_memory = FLAGS_object_store_memory; + const std::string plasma_directory = FLAGS_plasma_directory; + const bool huge_pages = FLAGS_huge_pages; gflags::ShutDownCommandLineFlags(); // Initialize stats. @@ -166,6 +173,9 @@ int main(int argc, char *argv[]) { RayConfig::instance().object_manager_pull_timeout_ms(); object_manager_config.push_timeout_ms = RayConfig::instance().object_manager_push_timeout_ms(); + object_manager_config.object_store_memory = object_store_memory; + object_manager_config.plasma_directory = plasma_directory; + object_manager_config.huge_pages = huge_pages; int num_cpus = static_cast(static_resource_conf["CPU"]); object_manager_config.rpc_service_threads_number =