diff --git a/doc/source/advanced.rst b/doc/source/advanced.rst
index bfc2161f9..d400bba55 100644
--- a/doc/source/advanced.rst
+++ b/doc/source/advanced.rst
@@ -362,39 +362,3 @@ To get information about the current available resource capacity of your cluster
.. autofunction:: ray.available_resources
:noindex:
-
-Object Spilling
----------------
-
-Ray 1.2.0+ has *beta* support for spilling objects to external storage once the capacity
-of the object store is used up. Please file a `GitHub issue `__
-if you encounter any problems with this new feature. Eventually, object spilling will be
-enabled by default, but for now you need to enable it manually:
-
-To enable object spilling to the local filesystem (single node clusters only):
-
-.. code-block:: python
-
- ray.init(
- _system_config={
- "automatic_object_spilling_enabled": True,
- "object_spilling_config": json.dumps(
- {"type": "filesystem", "params": {"directory_path": "/tmp/spill"}},
- )
- },
- )
-
-To enable object spilling to remote storage (any URI supported by `smart_open `__):
-
-.. code-block:: python
-
- ray.init(
- _system_config={
- "automatic_object_spilling_enabled": True,
- "max_io_workers": 4, # More IO workers for remote storage.
- "min_spilling_size": 100 * 1024 * 1024, # Spill at least 100MB at a time.
- "object_spilling_config": json.dumps(
- {"type": "smart_open", "params": {"uri": "s3:///bucket/path"}},
- )
- },
- )
diff --git a/doc/source/memory-management.rst b/doc/source/memory-management.rst
index a8ab32098..5b32dfa59 100644
--- a/doc/source/memory-management.rst
+++ b/doc/source/memory-management.rst
@@ -215,6 +215,54 @@ To enable LRU eviction when the object store is full, initialize ray with the ``
ray start --lru-evict
+Object Spilling
+---------------
+
+Ray 1.2.0+ has *beta* support for spilling objects to external storage once the capacity
+of the object store is used up. Please file a `GitHub issue <https://github.com/ray-project/ray/issues/>`__
+if you encounter any problems with this new feature. Eventually, object spilling will be
+enabled by default, but for now you need to enable it manually:
+
+To enable object spilling to the local filesystem (single node clusters only):
+
+.. code-block:: python
+
+ ray.init(
+ _system_config={
+ "automatic_object_spilling_enabled": True,
+ "object_spilling_config": json.dumps(
+ {"type": "filesystem", "params": {"directory_path": "/tmp/spill"}},
+ )
+ },
+ )
+
+To enable object spilling to remote storage (any URI supported by `smart_open <https://pypi.org/project/smart-open/>`__):
+
+.. code-block:: python
+
+ ray.init(
+ _system_config={
+ "automatic_object_spilling_enabled": True,
+ "max_io_workers": 4, # More IO workers for remote storage.
+ "min_spilling_size": 100 * 1024 * 1024, # Spill at least 100MB at a time.
+ "object_spilling_config": json.dumps(
+ {"type": "smart_open", "params": {"uri": "s3:///bucket/path"}},
+ )
+ },
+ )
+
+When spilling is happening, the following INFO level messages will be printed to the raylet logs (e.g., ``/tmp/ray/session_latest/logs/raylet.out``)::
+
+ local_object_manager.cc:166: Spilled 50 MiB, 1 objects, write throughput 230 MiB/s
+ local_object_manager.cc:334: Restored 50 MiB, 1 objects, read throughput 505 MiB/s
+
+You can also view cluster-wide spill stats by using the ``ray memory`` command::
+
+ --- Aggregate object store stats across all nodes ---
+ Plasma memory usage 50 MiB, 1 objects, 50.0% full
+ Spilled 200 MiB, 4 objects, avg write throughput 570 MiB/s
+ Restored 150 MiB, 3 objects, avg read throughput 1361 MiB/s
+
Memory Aware Scheduling
~~~~~~~~~~~~~~~~~~~~~~~
diff --git a/doc/source/walkthrough.rst b/doc/source/walkthrough.rst
index 8680c1af3..facf1ff40 100644
--- a/doc/source/walkthrough.rst
+++ b/doc/source/walkthrough.rst
@@ -407,9 +407,7 @@ Object Eviction
When the object store gets full, objects will be evicted to make room for new objects.
This happens in approximate LRU (least recently used) order. To avoid objects from
being evicted, you can call ``get`` and store their values instead. Numpy array
-objects cannot be evicted while they are mapped in any Python process. You can also
-configure `memory limits `__ to control object store usage by
-actors.
+objects cannot be evicted while they are mapped in any Python process.
.. note::
@@ -417,7 +415,7 @@ actors.
to the object ref returned by the put exists. This only applies to the specific
ref returned by put, not refs in general or copies of that refs.
-See also: `object spilling `__.
+See also: `object spilling <memory-management.html#object-spilling>`__.
Remote Classes (Actors)
-----------------------
diff --git a/python/ray/internal/internal_api.py b/python/ray/internal/internal_api.py
index 601b3986a..999e49220 100644
--- a/python/ray/internal/internal_api.py
+++ b/python/ray/internal/internal_api.py
@@ -20,7 +20,8 @@ def memory_summary():
from ray.core.generated import node_manager_pb2
from ray.core.generated import node_manager_pb2_grpc
- # We can ask any Raylet for the global memory info.
+ # We can ask any Raylet for the global memory info; that Raylet internally
+ # asks all nodes in the cluster for memory stats.
raylet = ray.nodes()[0]
raylet_address = "{}:{}".format(raylet["NodeManagerAddress"],
ray.nodes()[0]["NodeManagerPort"])
@@ -34,7 +35,31 @@ def memory_summary():
stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel)
reply = stub.FormatGlobalMemoryInfo(
node_manager_pb2.FormatGlobalMemoryInfoRequest(), timeout=30.0)
- return reply.memory_summary
+ store_summary = "--- Aggregate object store stats across all nodes ---\n"
+ store_summary += (
+ "Plasma memory usage {} MiB, {} objects, {}% full\n".format(
+ int(reply.store_stats.object_store_bytes_used / (1024 * 1024)),
+ reply.store_stats.num_local_objects,
+ round(
+ 100 * reply.store_stats.object_store_bytes_used /
+ reply.store_stats.object_store_bytes_avail, 2)))
+ if reply.store_stats.spill_time_total_s > 0:
+ store_summary += (
+ "Spilled {} MiB, {} objects, avg write throughput {} MiB/s\n".
+ format(
+ int(reply.store_stats.spilled_bytes_total / (1024 * 1024)),
+ reply.store_stats.spilled_objects_total,
+ int(reply.store_stats.spilled_bytes_total / (1024 * 1024) /
+ reply.store_stats.spill_time_total_s)))
+ if reply.store_stats.restore_time_total_s > 0:
+ store_summary += (
+ "Restored {} MiB, {} objects, avg read throughput {} MiB/s\n".
+ format(
+ int(reply.store_stats.restored_bytes_total / (1024 * 1024)),
+ reply.store_stats.restored_objects_total,
+ int(reply.store_stats.restored_bytes_total / (1024 * 1024) /
+ reply.store_stats.restore_time_total_s)))
+ return reply.memory_summary + "\n" + store_summary
def free(object_refs, local_only=False):
diff --git a/python/ray/tests/test_memstat.py b/python/ray/tests/test_memstat.py
index b10d09916..cb734b3b7 100644
--- a/python/ray/tests/test_memstat.py
+++ b/python/ray/tests/test_memstat.py
@@ -27,7 +27,7 @@ DESER_ACTOR_TASK_ARG = "(deserialize actor task arg)"
def data_lines(memory_str):
for line in memory_str.split("\n"):
if (not line or "---" in line or "===" in line or "Object ID" in line
- or "pid=" in line):
+ or "pid=" in line or "Plasma memory" in line):
continue
yield line
diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py
index 4cefca998..5fe9b697b 100644
--- a/python/ray/tests/test_object_spilling.py
+++ b/python/ray/tests/test_object_spilling.py
@@ -11,6 +11,7 @@ import ray
from ray.external_storage import (create_url_with_offset,
parse_url_with_offset)
from ray.test_utils import wait_for_condition
+from ray.internal.internal_api import memory_summary
bucket_name = "object-spilling-test"
spill_local_path = "/tmp/spill"
@@ -198,6 +199,50 @@ def test_spill_objects_automatically(object_spilling_config, shutdown_only):
assert np.array_equal(sample, solution)
+@pytest.mark.skipif(
+ platform.system() == "Windows", reason="Failing on Windows.")
+def test_spill_stats(tmp_path, shutdown_only):
+ # Limit our object store to 100 MiB of memory.
+ temp_folder = tmp_path / "spill"
+ temp_folder.mkdir()
+ ray.init(
+ num_cpus=1,
+ object_store_memory=100 * 1024 * 1024,
+ _system_config={
+ "automatic_object_spilling_enabled": True,
+ "max_io_workers": 100,
+ "min_spilling_size": 1,
+ "object_spilling_config": json.dumps(
+ {
+ "type": "filesystem",
+ "params": {
+ "directory_path": str(temp_folder)
+ }
+ },
+ separators=(",", ":"))
+ },
+ )
+
+ @ray.remote
+ def f():
+ return np.zeros(50 * 1024 * 1024, dtype=np.uint8)
+
+ ids = []
+ for _ in range(4):
+ x = f.remote()
+ ids.append(x)
+
+ while ids:
+ print(ray.get(ids.pop()))
+
+ x_id = f.remote() # noqa
+ ray.get(x_id)
+ s = memory_summary()
+ assert "Plasma memory usage 50 MiB, 1 objects, 50.0% full" in s, s
+ assert "Spilled 200 MiB, 4 objects" in s, s
+ assert "Restored 150 MiB, 3 objects" in s, s
+
+
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_spill_during_get(object_spilling_config, shutdown_only):
diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc
index f3620d4c9..b69085682 100644
--- a/src/ray/object_manager/object_manager.cc
+++ b/src/ray/object_manager/object_manager.cc
@@ -807,6 +807,13 @@ void ObjectManager::RecordMetrics() const {
stats::ObjectManagerPullRequests().Record(pull_manager_->NumActiveRequests());
}
+void ObjectManager::FillObjectStoreStats(rpc::GetNodeStatsReply *reply) const {
+  rpc::ObjectStoreStats *store_stats = reply->mutable_store_stats();  // Created on demand.
+  store_stats->set_num_local_objects(local_objects_.size());
+  store_stats->set_object_store_bytes_avail(config_.object_store_memory);  // Capacity.
+  store_stats->set_object_store_bytes_used(used_memory_);
+}
+
void ObjectManager::Tick(const boost::system::error_code &e) {
RAY_CHECK(!e) << "The raylet's object manager has failed unexpectedly with error: " << e
<< ". Please file a bug report on here: "
diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h
index 3e793e21c..a7970dc82 100644
--- a/src/ray/object_manager/object_manager.h
+++ b/src/ray/object_manager/object_manager.h
@@ -44,6 +44,7 @@
#include "ray/rpc/object_manager/object_manager_client.h"
#include "ray/rpc/object_manager/object_manager_server.h"
#include "src/ray/protobuf/common.pb.h"
+#include "src/ray/protobuf/node_manager.pb.h"
namespace ray {
@@ -291,6 +292,11 @@ class ObjectManager : public ObjectManagerInterface,
/// Record metrics.
void RecordMetrics() const;
+ /// Populate the object store stats (bytes used, capacity, local object count).
+ ///
+ /// \param reply Output parameter; its store_stats field is filled in.
+ void FillObjectStoreStats(rpc::GetNodeStatsReply *reply) const;
+
void Tick(const boost::system::error_code &e);
private:
diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto
index dcf6fe783..bae2a9715 100644
--- a/src/ray/protobuf/node_manager.proto
+++ b/src/ray/protobuf/node_manager.proto
@@ -117,12 +117,36 @@ message GetNodeStatsRequest {
bool include_memory_info = 1;
}
+// Object store stats, which may be reported per-node or aggregated across
+// multiple nodes in the cluster (all values are additive).
+message ObjectStoreStats {
+ // Total wall time, in seconds, during which spilling was happening.
+ double spill_time_total_s = 1;
+ // Total number of bytes spilled.
+ int64 spilled_bytes_total = 2;
+ // Total number of objects spilled.
+ int64 spilled_objects_total = 3;
+ // Total wall time, in seconds, during which object restore was happening.
+ double restore_time_total_s = 4;
+ // Total number of bytes restored.
+ int64 restored_bytes_total = 5;
+ // Total number of objects restored.
+ int64 restored_objects_total = 6;
+ // Number of bytes currently used in the object store.
+ int64 object_store_bytes_used = 7;
+ // Max capacity of the object store in bytes (total size, not remaining free space).
+ int64 object_store_bytes_avail = 8;
+ // Total number of local objects.
+ int64 num_local_objects = 9;
+}
+
message GetNodeStatsReply {
repeated CoreWorkerStats core_workers_stats = 1;
repeated ViewData view_data = 2;
uint32 num_workers = 3;
repeated TaskSpec infeasible_tasks = 4;
repeated TaskSpec ready_tasks = 5;
+ ObjectStoreStats store_stats = 6;
}
message GlobalGCRequest {
@@ -131,6 +155,8 @@ message GlobalGCRequest {
message GlobalGCReply {
}
+// Accumulates memory info across all nodes. To access per-node memory info,
+// use GetNodeStats() calls instead.
message FormatGlobalMemoryInfoRequest {
}
@@ -138,6 +164,9 @@ message FormatGlobalMemoryInfoReply {
// A tabular summary of the memory stats. To get this data in structured form,
// you can instead use GetNodeStats() directly.
string memory_summary = 1;
+ // Aggregate store stats across all nodes. To get the individual node data,
+ // you can instead use GetNodeStats() directly.
+ ObjectStoreStats store_stats = 2;
}
message RequestObjectSpillageRequest {
diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc
index 87446f84f..eae10ae40 100644
--- a/src/ray/raylet/local_object_manager.cc
+++ b/src/ray/raylet/local_object_manager.cc
@@ -162,14 +162,13 @@ bool LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill) {
spill_time_total_s_ += (now - std::max(start_time, last_spill_finish_ns_)) / 1e9;
if (now - last_spill_log_ns_ > 1e9) {
last_spill_log_ns_ = now;
- // TODO(ekl) logging at error level until we add a better UX indicator.
- RAY_LOG(ERROR) << "Spilled "
- << static_cast(spilled_bytes_total_ / (1024 * 1024))
- << " MiB, " << spilled_objects_total_
- << " objects, write throughput "
- << static_cast(spilled_bytes_total_ / (1024 * 1024) /
- spill_time_total_s_)
- << " MiB/s";
+ RAY_LOG(INFO) << "Spilled "
+ << static_cast(spilled_bytes_total_ / (1024 * 1024))
+ << " MiB, " << spilled_objects_total_
+ << " objects, write throughput "
+ << static_cast(spilled_bytes_total_ / (1024 * 1024) /
+ spill_time_total_s_)
+ << " MiB/s";
}
last_spill_finish_ns_ = now;
}
@@ -330,14 +329,13 @@ void LocalObjectManager::AsyncRestoreSpilledObject(
(now - std::max(start_time, last_restore_finish_ns_)) / 1e9;
if (now - last_restore_log_ns_ > 1e9) {
last_restore_log_ns_ = now;
- // TODO(ekl) logging at error level until we add a better UX indicator.
- RAY_LOG(ERROR) << "Restored "
- << static_cast(restored_bytes_total_ / (1024 * 1024))
- << " MiB, " << restored_objects_total_
- << " objects, read throughput "
- << static_cast(restored_bytes_total_ / (1024 * 1024) /
- restore_time_total_s_)
- << " MiB/s";
+ RAY_LOG(INFO) << "Restored "
+ << static_cast(restored_bytes_total_ / (1024 * 1024))
+ << " MiB, " << restored_objects_total_
+ << " objects, read throughput "
+ << static_cast(restored_bytes_total_ / (1024 * 1024) /
+ restore_time_total_s_)
+ << " MiB/s";
}
last_restore_finish_ns_ = now;
}
@@ -414,6 +412,16 @@ void LocalObjectManager::DeleteSpilledObjects(std::vector &urls_to_
});
}
+void LocalObjectManager::FillObjectSpillingStats(rpc::GetNodeStatsReply *reply) const {
+  rpc::ObjectStoreStats *spill_stats = reply->mutable_store_stats();  // Created on demand.
+  spill_stats->set_restore_time_total_s(restore_time_total_s_);  // Restore-side totals.
+  spill_stats->set_restored_bytes_total(restored_bytes_total_);
+  spill_stats->set_restored_objects_total(restored_objects_total_);
+  spill_stats->set_spill_time_total_s(spill_time_total_s_);  // Spill-side totals.
+  spill_stats->set_spilled_bytes_total(spilled_bytes_total_);
+  spill_stats->set_spilled_objects_total(spilled_objects_total_);
+}
+
}; // namespace raylet
}; // namespace ray
diff --git a/src/ray/raylet/local_object_manager.h b/src/ray/raylet/local_object_manager.h
index 3cf2a2ac5..144209585 100644
--- a/src/ray/raylet/local_object_manager.h
+++ b/src/ray/raylet/local_object_manager.h
@@ -24,6 +24,7 @@
#include "ray/object_manager/common.h"
#include "ray/raylet/worker_pool.h"
#include "ray/rpc/worker/core_worker_client_pool.h"
+#include "src/ray/protobuf/node_manager.pb.h"
namespace ray {
@@ -117,6 +118,11 @@ class LocalObjectManager {
/// \return True if spilling is still in progress. False otherwise.
bool IsSpillingInProgress();
+ /// Populate object spilling stats (spill/restore time, byte and object totals).
+ ///
+ /// \param reply Output parameter; its store_stats field is filled in.
+ void FillObjectSpillingStats(rpc::GetNodeStatsReply *reply) const;
+
private:
FRIEND_TEST(LocalObjectManagerTest, TestSpillObjectsOfSize);
FRIEND_TEST(LocalObjectManagerTest,
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index c5e08e187..b1657de49 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -2893,6 +2893,10 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_
ready_task->ParseFromString(task.GetTaskSpecification().Serialize());
}
}
+ // Report object spilling stats.
+ local_object_manager_.FillObjectSpillingStats(reply);
+ // Report object store stats.
+ object_manager_.FillObjectStoreStats(reply);
// Ensure we never report an empty set of metrics.
if (!recorded_metrics_) {
RecordMetrics();
@@ -2967,6 +2971,33 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_
}
}
+// Sums the per-node object store stats of all replies into one cluster-wide total.
+// Every field is additive; an empty input yields default (all-zero) stats.
+rpc::ObjectStoreStats AccumulateStoreStats(
+    std::vector node_stats) {
+  rpc::ObjectStoreStats sum;
+  for (const auto &reply : node_stats) {
+    // Bind by reference to avoid copying the stats message for every node.
+    const auto &cur = reply.store_stats();
+    // Spilling and restore totals.
+    sum.set_spill_time_total_s(sum.spill_time_total_s() + cur.spill_time_total_s());
+    sum.set_spilled_bytes_total(sum.spilled_bytes_total() + cur.spilled_bytes_total());
+    sum.set_spilled_objects_total(sum.spilled_objects_total() +
+                                  cur.spilled_objects_total());
+    sum.set_restore_time_total_s(sum.restore_time_total_s() + cur.restore_time_total_s());
+    sum.set_restored_bytes_total(sum.restored_bytes_total() + cur.restored_bytes_total());
+    sum.set_restored_objects_total(sum.restored_objects_total() +
+                                   cur.restored_objects_total());
+    // Current object store usage and capacity.
+    sum.set_object_store_bytes_used(sum.object_store_bytes_used() +
+                                    cur.object_store_bytes_used());
+    sum.set_object_store_bytes_avail(sum.object_store_bytes_avail() +
+                                     cur.object_store_bytes_avail());
+    sum.set_num_local_objects(sum.num_local_objects() + cur.num_local_objects());
+  }
+  return sum;
+}
+
std::string FormatMemoryInfo(std::vector node_stats) {
// First pass to compute object sizes.
absl::flat_hash_map object_sizes;
@@ -2984,13 +3015,14 @@ std::string FormatMemoryInfo(std::vector node_stats) {
std::ostringstream builder;
builder
<< "----------------------------------------------------------------------------"
- "-------------------------\n";
+ "-----------------------------------------\n";
builder
- << " Object ID Reference Type Object Size "
+ << " Object ID Reference Type "
+ " Object Size "
" Reference Creation Site\n";
builder
<< "============================================================================"
- "=========================\n";
+ "=========================================\n";
// Second pass builds the summary string for each node.
for (const auto &reply : node_stats) {
@@ -3040,7 +3072,7 @@ std::string FormatMemoryInfo(std::vector node_stats) {
}
builder
<< "----------------------------------------------------------------------------"
- "-------------------------\n";
+ "-----------------------------------------\n";
return builder.str();
}
@@ -3062,6 +3094,7 @@ void NodeManager::HandleFormatGlobalMemoryInfo(
replies->push_back(local_reply);
if (replies->size() >= num_nodes) {
reply->set_memory_summary(FormatMemoryInfo(*replies));
+ reply->mutable_store_stats()->CopyFrom(AccumulateStoreStats(*replies));
send_reply_callback(Status::OK(), nullptr, nullptr);
}
};