Surface object store spilling statistics in ray memory (#13124)

This commit is contained in:
Eric Liang
2021-01-04 17:35:39 -08:00
committed by GitHub
parent b765914a1b
commit dfb326d4b5
12 changed files with 232 additions and 63 deletions
+27 -2
View File
@@ -20,7 +20,8 @@ def memory_summary():
from ray.core.generated import node_manager_pb2
from ray.core.generated import node_manager_pb2_grpc
# We can ask any Raylet for the global memory info.
# We can ask any Raylet for the global memory info, that Raylet internally
# asks all nodes in the cluster for memory stats.
raylet = ray.nodes()[0]
raylet_address = "{}:{}".format(raylet["NodeManagerAddress"],
ray.nodes()[0]["NodeManagerPort"])
@@ -34,7 +35,31 @@ def memory_summary():
stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel)
reply = stub.FormatGlobalMemoryInfo(
node_manager_pb2.FormatGlobalMemoryInfoRequest(), timeout=30.0)
return reply.memory_summary
store_summary = "--- Aggregate object store stats across all nodes ---\n"
store_summary += (
"Plasma memory usage {} MiB, {} objects, {}% full\n".format(
int(reply.store_stats.object_store_bytes_used / (1024 * 1024)),
reply.store_stats.num_local_objects,
round(
100 * reply.store_stats.object_store_bytes_used /
reply.store_stats.object_store_bytes_avail, 2)))
if reply.store_stats.spill_time_total_s > 0:
store_summary += (
"Spilled {} MiB, {} objects, avg write throughput {} MiB/s\n".
format(
int(reply.store_stats.spilled_bytes_total / (1024 * 1024)),
reply.store_stats.spilled_objects_total,
int(reply.store_stats.spilled_bytes_total / (1024 * 1024) /
reply.store_stats.spill_time_total_s)))
if reply.store_stats.restore_time_total_s > 0:
store_summary += (
"Restored {} MiB, {} objects, avg read throughput {} MiB/s\n".
format(
int(reply.store_stats.restored_bytes_total / (1024 * 1024)),
reply.store_stats.restored_objects_total,
int(reply.store_stats.restored_bytes_total / (1024 * 1024) /
reply.store_stats.restore_time_total_s)))
return reply.memory_summary + "\n" + store_summary
def free(object_refs, local_only=False):
+1 -1
View File
@@ -27,7 +27,7 @@ DESER_ACTOR_TASK_ARG = "(deserialize actor task arg)"
def data_lines(memory_str):
for line in memory_str.split("\n"):
if (not line or "---" in line or "===" in line or "Object ID" in line
or "pid=" in line):
or "pid=" in line or "Plasma memory" in line):
continue
yield line
+45
View File
@@ -11,6 +11,7 @@ import ray
from ray.external_storage import (create_url_with_offset,
parse_url_with_offset)
from ray.test_utils import wait_for_condition
from ray.internal.internal_api import memory_summary
bucket_name = "object-spilling-test"
spill_local_path = "/tmp/spill"
@@ -198,6 +199,50 @@ def test_spill_objects_automatically(object_spilling_config, shutdown_only):
assert np.array_equal(sample, solution)
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_spill_stats(tmp_path, shutdown_only):
# Limit our object store to 75 MiB of memory.
temp_folder = tmp_path / "spill"
temp_folder.mkdir()
ray.init(
num_cpus=1,
object_store_memory=100 * 1024 * 1024,
_system_config={
"automatic_object_spilling_enabled": True,
"max_io_workers": 100,
"min_spilling_size": 1,
"object_spilling_config": json.dumps(
{
"type": "filesystem",
"params": {
"directory_path": str(temp_folder)
}
},
separators=(",", ":"))
},
)
@ray.remote
def f():
return np.zeros(50 * 1024 * 1024, dtype=np.uint8)
ids = []
for _ in range(4):
x = f.remote()
ids.append(x)
while ids:
print(ray.get(ids.pop()))
x_id = f.remote() # noqa
ray.get(x_id)
s = memory_summary()
assert "Plasma memory usage 50 MiB, 1 objects, 50.0% full" in s, s
assert "Spilled 200 MiB, 4 objects" in s, s
assert "Restored 150 MiB, 3 objects" in s, s
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_spill_during_get(object_spilling_config, shutdown_only):