[Core] Run Plasma Store as a Raylet thread (with a feature flag) (#8897)

* integrate plasma store as a thread (C++)

* integrate plasma store as a thread (Python)

* fix config issues

* remove plasma component fail tests

* without forcefully kill the plasma store thread
This commit is contained in:
Siyuan (Ryans) Zhuang
2020-06-11 22:54:08 -07:00
committed by GitHub
parent dfa4768fc6
commit 4b31b383f3
11 changed files with 123 additions and 89 deletions
+8 -1
View File
@@ -107,7 +107,11 @@ class Node:
self._localhost = socket.gethostbyname("localhost")
self._ray_params = ray_params
self._redis_address = ray_params.redis_address
self._config = ray_params._internal_config
self._config = ray_params._internal_config or {}
# Enable Plasma Store as a thread by default.
if "plasma_store_as_thread" not in self._config:
self._config["plasma_store_as_thread"] = True
if head:
redis_client = None
@@ -571,6 +575,7 @@ class Node:
stderr_file=stderr_file,
plasma_directory=self._ray_params.plasma_directory,
huge_pages=self._ray_params.huge_pages,
keep_idle=bool(self._config.get("plasma_store_as_thread")),
fate_share=self.kernel_fate_share)
assert (
ray_constants.PROCESS_TYPE_PLASMA_STORE not in self.all_processes)
@@ -627,6 +632,8 @@ class Node:
include_java=self._ray_params.include_java,
java_worker_options=self._ray_params.java_worker_options,
load_code_from_local=self._ray_params.load_code_from_local,
plasma_directory=self._ray_params.plasma_directory,
huge_pages=self._ray_params.huge_pages,
fate_share=self.kernel_fate_share,
socket_to_use=self.socket)
assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes
+16 -3
View File
@@ -1190,7 +1190,6 @@ def start_gcs_server(redis_address,
"""
gcs_ip_address, gcs_port = redis_address.split(":")
redis_password = redis_password or ""
config = config or {}
config_str = ",".join(["{},{}".format(*kv) for kv in config.items()])
command = [
GCS_SERVER_EXECUTABLE,
@@ -1230,6 +1229,8 @@ def start_raylet(redis_address,
include_java=False,
java_worker_options=None,
load_code_from_local=False,
plasma_directory=None,
huge_pages=False,
fate_share=None,
socket_to_use=None):
"""Start a raylet, which is a combined local scheduler and object manager.
@@ -1273,7 +1274,6 @@ def start_raylet(redis_address,
# The caller must provide a node manager port so that we can correctly
# populate the command to start a worker.
assert node_manager_port is not None and node_manager_port != 0
config = config or {}
config_str = ",".join(["{},{}".format(*kv) for kv in config.items()])
if use_valgrind and use_profiler:
@@ -1362,6 +1362,16 @@ def start_raylet(redis_address,
"--temp_dir={}".format(temp_dir),
"--session_dir={}".format(session_dir),
]
if config.get("plasma_store_as_thread"):
# command related to the plasma store
plasma_directory, object_store_memory = determine_plasma_store_config(
resource_spec.object_store_memory, plasma_directory, huge_pages)
command += [
"--object_store_memory={}".format(object_store_memory),
"--plasma_directory={}".format(plasma_directory),
]
if huge_pages:
command.append("--huge_pages")
if socket_to_use:
socket_to_use.close()
process_info = start_ray_process(
@@ -1554,6 +1564,7 @@ def start_plasma_store(resource_spec,
stdout_file=None,
stderr_file=None,
plasma_directory=None,
keep_idle=False,
huge_pages=False,
fate_share=None,
use_valgrind=False):
@@ -1571,6 +1582,7 @@ def start_plasma_store(resource_spec,
be created.
huge_pages: Boolean flag indicating whether to start the Object
Store with hugetlbfs support. Requires plasma_directory.
keep_idle: If True, run the plasma store as an idle placeholder.
Returns:
ProcessInfo for the process that was started.
@@ -1594,6 +1606,8 @@ def start_plasma_store(resource_spec,
command += ["-d", plasma_directory]
if huge_pages:
command += ["-h"]
if keep_idle:
command.append("-z")
process_info = start_ray_process(
command,
ray_constants.PROCESS_TYPE_PLASMA_STORE,
@@ -1721,7 +1735,6 @@ def start_raylet_monitor(redis_address,
"""
gcs_ip_address, gcs_port = redis_address.split(":")
redis_password = redis_password or ""
config = config or {}
config_str = ",".join(["{},{}".format(*kv) for kv in config.items()])
command = [
RAYLET_MONITOR_EXECUTABLE,
@@ -153,29 +153,6 @@ def test_raylet_failed(ray_start_cluster):
True)
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Hanging with new GCS API.")
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 8,
"num_nodes": 2,
"_internal_config": json.dumps({
"num_heartbeats_timeout": 100
}),
}],
indirect=True)
def test_plasma_store_failed(ray_start_cluster):
cluster = ray_start_cluster
# Kill all plasma stores on worker nodes.
_test_component_failed(cluster, ray_constants.PROCESS_TYPE_PLASMA_STORE)
# No processes should be left alive on the worker nodes.
check_components_alive(cluster, ray_constants.PROCESS_TYPE_PLASMA_STORE,
False)
check_components_alive(cluster, ray_constants.PROCESS_TYPE_RAYLET, False)
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))
-12
View File
@@ -184,18 +184,6 @@ def test_wait_for_nodes(ray_start_cluster_head):
assert ray.cluster_resources()["CPU"] == 1
def test_worker_plasma_store_failure(ray_start_cluster_head):
cluster = ray_start_cluster_head
worker = cluster.add_node()
cluster.wait_for_nodes()
worker.kill_reporter()
worker.kill_plasma_store()
if ray_constants.PROCESS_TYPE_REAPER in worker.all_processes:
worker.kill_reaper()
worker.all_processes[ray_constants.PROCESS_TYPE_RAYLET][0].process.wait()
assert not worker.any_processes_alive(), worker.live_processes()
if __name__ == "__main__":
import pytest
import sys
@@ -161,31 +161,6 @@ def test_raylet_failed(ray_start_cluster):
True)
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Hanging with new GCS API.")
@pytest.mark.parametrize(
"ray_start_cluster",
[{
"num_cpus": 8,
"num_nodes": 2,
"_internal_config": json.dumps({
# Raylet codepath is not stable with a shorter timeout.
"num_heartbeats_timeout": 10
}),
}],
indirect=True)
def test_plasma_store_failed(ray_start_cluster):
cluster = ray_start_cluster
# Kill all plasma stores on worker nodes.
_test_component_failed(cluster, ray_constants.PROCESS_TYPE_PLASMA_STORE)
# No processes should be left alive on the worker nodes.
check_components_alive(cluster, ray_constants.PROCESS_TYPE_PLASMA_STORE,
False)
check_components_alive(cluster, ray_constants.PROCESS_TYPE_RAYLET, False)
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))