[Object Spilling] Add consumed bytes to detect thrashing. (#13853)

This commit is contained in:
SangBin Cho
2021-02-03 14:16:26 -08:00
committed by GitHub
parent 77ee2c569f
commit cb9fa90203
20 changed files with 172 additions and 67 deletions
+8 -1
View File
@@ -13,7 +13,9 @@ def global_gc():
worker.core_worker.global_gc()
def memory_summary(node_manager_address=None, node_manager_port=None):
def memory_summary(node_manager_address=None,
node_manager_port=None,
stats_only=False):
"""Returns a formatted string describing memory usage in the cluster."""
import grpc
@@ -63,6 +65,11 @@ def memory_summary(node_manager_address=None, node_manager_port=None):
reply.store_stats.restored_objects_total,
int(reply.store_stats.restored_bytes_total / (1024 * 1024) /
reply.store_stats.restore_time_total_s)))
if reply.store_stats.consumed_bytes > 0:
store_summary += ("Objects consumed by Ray tasks: {} MiB.".format(
int(reply.store_stats.consumed_bytes / (1024 * 1024))))
if stats_only:
return store_summary
return reply.memory_summary + "\n" + store_summary
+9 -2
View File
@@ -1372,7 +1372,13 @@ def timeline(address):
type=str,
default=ray_constants.REDIS_DEFAULT_PASSWORD,
help="Connect to ray with redis_password.")
def memory(address, redis_password):
@click.option(
"--stats-only",
is_flag=True,
type=bool,
default=False,
help="Connect to ray with redis_password.")
def memory(address, redis_password, stats_only):
"""Print object references held in a Ray cluster."""
if not address:
address = services.get_ray_address_to_use_or_die()
@@ -1381,7 +1387,8 @@ def memory(address, redis_password):
raylet = state.node_table()[0]
print(
ray.internal.internal_api.memory_summary(raylet["NodeManagerAddress"],
raylet["NodeManagerPort"]))
raylet["NodeManagerPort"],
stats_only))
@cli.command()
+2 -1
View File
@@ -27,7 +27,8 @@ DESER_ACTOR_TASK_ARG = "(deserialize actor task arg)"
def data_lines(memory_str):
for line in memory_str.split("\n"):
if (not line or "---" in line or "===" in line or "Object ID" in line
or "pid=" in line or "Plasma memory" in line):
or "pid=" in line or "Plasma memory" in line
or "Objects consumed" in line):
continue
yield line
+63 -23
View File
@@ -88,6 +88,27 @@ def is_dir_empty(temp_folder,
return num_files == 0
def assert_no_thrashing(address):
    """Assert that the cluster restored no more data than tasks consumed.

    Fetches the stats-only memory summary from the first node listed in the
    node table and compares the amount on the "Restored" line against the
    amount on the "consumed" line. Both amounts are read as printed in the
    summary text (presumably MiB -- verify against memory_summary's format).

    Args:
        address: Cluster address handed to
            ``GlobalState._initialize_global_state``.

    Raises:
        AssertionError: If the restored amount exceeds the consumed amount.
    """
    global_state = ray.state.GlobalState()
    global_state._initialize_global_state(
        address, ray.ray_constants.REDIS_DEFAULT_PASSWORD)
    first_node = global_state.node_table()[0]
    summary = ray.internal.internal_api.memory_summary(
        first_node["NodeManagerAddress"],
        first_node["NodeManagerPort"],
        stats_only=True)

    # Either figure stays at 0 when its line is missing from the summary.
    restored = 0
    consumed = 0
    for entry in summary.split("\n"):
        tokens = entry.split(" ")
        if "Restored" in entry:
            # The amount is the second space-separated token on this line.
            restored = int(tokens[1])
        if "consumed" in entry:
            # The amount is the second-to-last token on this line.
            consumed = int(tokens[-2])
    assert consumed >= restored, (
        f"consumed: {consumed}, restored: {restored}")
def test_invalid_config_raises_exception(shutdown_only):
# Make sure ray.init raises an exception before
# it starts processes when invalid object spilling
@@ -187,7 +208,7 @@ def test_spilling_not_done_for_pinned_object(object_spilling_config,
shutdown_only):
# Limit our object store to 75 MiB of memory.
object_spilling_config, temp_folder = object_spilling_config
ray.init(
address = ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 4,
@@ -203,6 +224,7 @@ def test_spilling_not_done_for_pinned_object(object_spilling_config,
ref2 = ray.put(arr) # noqa
wait_for_condition(lambda: is_dir_empty(temp_folder))
assert_no_thrashing(address["redis_address"])
@pytest.mark.skipif(
@@ -249,6 +271,7 @@ def test_spill_remote_object(ray_start_cluster,
# Test passing the spilled object as an arg to another task.
ray.get(depends.remote(ref))
assert_no_thrashing(cluster.address)
@pytest.mark.skipif(
@@ -256,7 +279,7 @@ def test_spill_remote_object(ray_start_cluster,
def test_spill_objects_automatically(object_spilling_config, shutdown_only):
# Limit our object store to 75 MiB of memory.
object_spilling_config, _ = object_spilling_config
ray.init(
address = ray.init(
num_cpus=1,
object_store_memory=75 * 1024 * 1024,
_system_config={
@@ -287,14 +310,15 @@ def test_spill_objects_automatically(object_spilling_config, shutdown_only):
solution = solution_buffer[index]
sample = ray.get(ref, timeout=0)
assert np.array_equal(sample, solution)
assert_no_thrashing(address["redis_address"])
@pytest.mark.skipif(
platform.system() in ["Darwin", "Windows"], reason="Failing on Windows.")
platform.system() in ["Windows", "Darwin"], reason="Failing on Windows.")
def test_spill_stats(object_spilling_config, shutdown_only):
# Limit our object store to 75 MiB of memory.
object_spilling_config, _ = object_spilling_config
ray.init(
address = ray.init(
num_cpus=1,
object_store_memory=100 * 1024 * 1024,
_system_config={
@@ -319,17 +343,31 @@ def test_spill_stats(object_spilling_config, shutdown_only):
x_id = f.remote() # noqa
ray.get(x_id)
s = memory_summary()
s = memory_summary(stats_only=True)
assert "Plasma memory usage 50 MiB, 1 objects, 50.0% full" in s, s
assert "Spilled 200 MiB, 4 objects" in s, s
assert "Restored 150 MiB, 3 objects" in s, s
# Test if consumed bytes are correctly calculated.
obj = ray.put(np.zeros(30 * 1024 * 1024, dtype=np.uint8))
@ray.remote
def func_with_ref(obj):
return True
ray.get(func_with_ref.remote(obj))
s = memory_summary(stats_only=True)
# 50MB * 5 references + 30MB used for task execution.
assert "Objects consumed by Ray tasks: 280 MiB." in s, s
assert_no_thrashing(address["redis_address"])
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_spill_during_get(object_spilling_config, shutdown_only):
object_spilling_config, _ = object_spilling_config
ray.init(
address = ray.init(
num_cpus=4,
object_store_memory=100 * 1024 * 1024,
_system_config={
@@ -355,6 +393,7 @@ def test_spill_during_get(object_spilling_config, shutdown_only):
# objects are being created.
for x in ids:
print(ray.get(x).shape)
assert_no_thrashing(address["redis_address"])
@pytest.mark.skipif(
@@ -362,7 +401,7 @@ def test_spill_during_get(object_spilling_config, shutdown_only):
def test_spill_deadlock(object_spilling_config, shutdown_only):
object_spilling_config, _ = object_spilling_config
# Limit our object store to 75 MiB of memory.
ray.init(
address = ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 1,
@@ -386,6 +425,7 @@ def test_spill_deadlock(object_spilling_config, shutdown_only):
ref = random.choice(replay_buffer)
sample = ray.get(ref, timeout=0)
assert np.array_equal(sample, arr)
assert_no_thrashing(address["redis_address"])
@pytest.mark.skipif(
@@ -394,7 +434,7 @@ def test_delete_objects(object_spilling_config, shutdown_only):
# Limit our object store to 75 MiB of memory.
object_spilling_config, temp_folder = object_spilling_config
ray.init(
address = ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 1,
@@ -417,6 +457,7 @@ def test_delete_objects(object_spilling_config, shutdown_only):
del replay_buffer
del ref
wait_for_condition(lambda: is_dir_empty(temp_folder))
assert_no_thrashing(address["redis_address"])
@pytest.mark.skipif(
@@ -426,7 +467,7 @@ def test_delete_objects_delete_while_creating(object_spilling_config,
# Limit our object store to 75 MiB of memory.
object_spilling_config, temp_folder = object_spilling_config
ray.init(
address = ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 4,
@@ -457,6 +498,7 @@ def test_delete_objects_delete_while_creating(object_spilling_config,
del replay_buffer
del ref
wait_for_condition(lambda: is_dir_empty(temp_folder))
assert_no_thrashing(address["redis_address"])
@pytest.mark.skipif(
@@ -466,7 +508,7 @@ def test_delete_objects_on_worker_failure(object_spilling_config,
# Limit our object store to 75 MiB of memory.
object_spilling_config, temp_folder = object_spilling_config
ray.init(
address = ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 4,
@@ -518,6 +560,7 @@ def test_delete_objects_on_worker_failure(object_spilling_config,
# After all, make sure all objects are deleted upon worker failures.
wait_for_condition(lambda: is_dir_empty(temp_folder))
assert_no_thrashing(address["redis_address"])
@pytest.mark.skipif(
@@ -539,10 +582,11 @@ def test_delete_objects_multi_node(multi_node_object_spilling_config,
"object_store_full_delay_ms": 100,
"object_spilling_config": object_spilling_config,
})
ray.init(address=cluster.address)
# Add 2 worker nodes.
for _ in range(2):
cluster.add_node(num_cpus=1, object_store_memory=75 * 1024 * 1024)
ray.init(address=cluster.address)
cluster.wait_for_nodes()
arr = np.random.rand(1024 * 1024) # 8 MB data
@@ -565,9 +609,9 @@ def test_delete_objects_multi_node(multi_node_object_spilling_config,
self.replay_buffer.pop()
# Do random sampling.
for _ in range(200):
for _ in range(50):
ref = random.choice(self.replay_buffer)
sample = ray.get(ref, timeout=0)
sample = ray.get(ref, timeout=10)
assert np.array_equal(sample, arr)
actors = [Actor.remote() for _ in range(3)]
@@ -586,6 +630,7 @@ def test_delete_objects_multi_node(multi_node_object_spilling_config,
wait_for_condition(lambda: wait_until_actor_dead(actor))
# The multi node deletion should work.
wait_for_condition(lambda: is_dir_empty(temp_folder))
assert_no_thrashing(cluster.address)
@pytest.mark.skipif(platform.system() == "Windows", reason="Flaky on Windows.")
@@ -593,7 +638,7 @@ def test_fusion_objects(object_spilling_config, shutdown_only):
# Limit our object store to 75 MiB of memory.
object_spilling_config, temp_folder = object_spilling_config
min_spilling_size = 10 * 1024 * 1024
ray.init(
address = ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 3,
@@ -637,12 +682,13 @@ def test_fusion_objects(object_spilling_config, shutdown_only):
if file_size >= min_spilling_size:
is_test_passing = True
assert is_test_passing
assert_no_thrashing(address["redis_address"])
# https://github.com/ray-project/ray/issues/12912
def do_test_release_resource(object_spilling_config, expect_released):
object_spilling_config, temp_folder = object_spilling_config
ray.init(
address = ray.init(
num_cpus=1,
object_store_memory=75 * 1024 * 1024,
_system_config={
@@ -674,6 +720,7 @@ def do_test_release_resource(object_spilling_config, expect_released):
assert ready
else:
assert not ready
assert_no_thrashing(address["redis_address"])
@pytest.mark.skipif(
@@ -745,6 +792,7 @@ def test_spill_objects_on_object_transfer(object_spilling_config,
# spilling.
tasks = [foo.remote(*task_args) for task_args in args]
ray.get(tasks)
assert_no_thrashing(cluster.address)
@pytest.mark.skipif(
@@ -801,14 +849,6 @@ os.kill(os.getpid(), sig)
driver.format(temp_dir=str(temp_folder), signum=2)))
wait_for_condition(lambda: is_dir_empty(temp_folder, append_path=""))
# Q: Looks like Sigterm doesn't work with Ray?
# print("Sending sigterm...")
# # Run a driver with sigterm.
# with pytest.raises(subprocess.CalledProcessError):
# print(run_string_as_driver(
# driver.format(temp_dir=str(temp_folder), signum=15)))
# wait_for_condition(is_dir_empty, timeout=1000)
if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))
+6 -7
View File
@@ -566,6 +566,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
// NOTE: This also marks the worker as available in Raylet. We do this at the
// very end in case there is a problem during construction.
RAY_CHECK_OK(local_raylet_client_->AnnounceWorkerPort(core_worker_server_->GetPort()));
// Used to detect if the object is in the plasma store.
max_direct_call_object_size_ = RayConfig::instance().max_direct_call_object_size();
}
void CoreWorker::Shutdown() {
@@ -881,8 +883,7 @@ Status CoreWorker::Put(const RayObject &object,
bool object_exists;
if (options_.is_local_mode ||
(RayConfig::instance().put_small_object_in_memory_store() &&
static_cast<int64_t>(object.GetSize()) <
RayConfig::instance().max_direct_call_object_size())) {
static_cast<int64_t>(object.GetSize()) < max_direct_call_object_size_)) {
RAY_LOG(DEBUG) << "Put " << object_id << " in memory store";
RAY_CHECK(memory_store_->Put(object, object_id));
return Status::OK();
@@ -923,8 +924,7 @@ Status CoreWorker::CreateOwned(const std::shared_ptr<Buffer> &metadata,
NodeID::FromBinary(rpc_address_.raylet_id()));
if (options_.is_local_mode ||
(RayConfig::instance().put_small_object_in_memory_store() &&
static_cast<int64_t>(data_size) <
RayConfig::instance().max_direct_call_object_size())) {
static_cast<int64_t>(data_size) < max_direct_call_object_size_)) {
*data = std::make_shared<LocalMemoryBuffer>(data_size);
} else {
auto status =
@@ -1037,7 +1037,7 @@ Status CoreWorker::Get(const std::vector<ObjectID> &ids, const int64_t timeout_m
bool missing_result = false;
bool will_throw_exception = false;
for (size_t i = 0; i < ids.size(); i++) {
auto pair = result_map.find(ids[i]);
const auto pair = result_map.find(ids[i]);
if (pair != result_map.end()) {
(*results)[i] = pair->second;
RAY_CHECK(!pair->second->IsInPlasmaError());
@@ -1778,8 +1778,7 @@ Status CoreWorker::AllocateReturnObjects(
// Allocate a buffer for the return object.
if (options_.is_local_mode ||
static_cast<int64_t>(data_sizes[i]) <
RayConfig::instance().max_direct_call_object_size()) {
static_cast<int64_t>(data_sizes[i]) < max_direct_call_object_size_) {
data_buffer = std::make_shared<LocalMemoryBuffer>(data_sizes[i]);
} else {
RAY_RETURN_NOT_OK(CreateExisting(metadatas[i], data_sizes[i], object_ids[i],
+2
View File
@@ -1255,6 +1255,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// Whether we are shutting down and not running further tasks.
bool exiting_ = false;
int64_t max_direct_call_object_size_;
friend class CoreWorkerTest;
};
@@ -191,7 +191,8 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore(
std::vector<plasma::ObjectBuffer> plasma_results;
{
std::lock_guard<std::mutex> guard(store_client_mutex_);
RAY_RETURN_NOT_OK(store_client_.Get(batch_ids, timeout_ms, &plasma_results));
RAY_RETURN_NOT_OK(store_client_.Get(batch_ids, timeout_ms, &plasma_results,
/*is_from_worker=*/true));
}
// Add successfully retrieved objects to the result map and remove them from
@@ -231,7 +232,9 @@ Status CoreWorkerPlasmaStoreProvider::GetIfLocal(
std::vector<plasma::ObjectBuffer> plasma_results;
{
std::lock_guard<std::mutex> guard(store_client_mutex_);
RAY_RETURN_NOT_OK(store_client_.Get(object_ids, /*timeout_ms=*/0, &plasma_results));
// Since this path is used only for spilling, we should set is_from_worker: false.
RAY_RETURN_NOT_OK(store_client_.Get(object_ids, /*timeout_ms=*/0, &plasma_results,
/*is_from_worker=*/false));
}
for (size_t i = 0; i < object_ids.size(); i++) {
+2 -1
View File
@@ -57,7 +57,8 @@ std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> ObjectBufferPool::Ge
std::lock_guard<std::mutex> lock(pool_mutex_);
if (get_buffer_state_.count(object_id) == 0) {
plasma::ObjectBuffer object_buffer;
RAY_CHECK_OK(store_client_.Get(&object_id, 1, 0, &object_buffer));
RAY_CHECK_OK(
store_client_.Get(&object_id, 1, 0, &object_buffer, /*is_from_worker=*/false));
if (object_buffer.data == nullptr) {
RAY_LOG(INFO)
<< "Failed to get a chunk of the object: " << object_id
+3
View File
@@ -834,6 +834,9 @@ void ObjectManager::FillObjectStoreStats(rpc::GetNodeStatsReply *reply) const {
stats->set_object_store_bytes_used(used_memory_);
stats->set_object_store_bytes_avail(config_.object_store_memory);
stats->set_num_local_objects(local_objects_.size());
if (plasma::plasma_store_runner) {
stats->set_consumed_bytes(plasma::plasma_store_runner->GetConsumedBytes());
}
}
void ObjectManager::Tick(const boost::system::error_code &e) {
+19 -13
View File
@@ -121,10 +121,10 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
std::shared_ptr<Buffer> *data, int device_num);
Status Get(const std::vector<ObjectID> &object_ids, int64_t timeout_ms,
std::vector<ObjectBuffer> *object_buffers);
std::vector<ObjectBuffer> *object_buffers, bool is_from_worker);
Status Get(const ObjectID *object_ids, int64_t num_objects, int64_t timeout_ms,
ObjectBuffer *object_buffers);
ObjectBuffer *object_buffers, bool is_from_worker);
Status Release(const ObjectID &object_id);
@@ -172,7 +172,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
Status GetBuffers(const ObjectID *object_ids, int64_t num_objects, int64_t timeout_ms,
const std::function<std::shared_ptr<Buffer>(
const ObjectID &, const std::shared_ptr<Buffer> &)> &wrap_buffer,
ObjectBuffer *object_buffers);
ObjectBuffer *object_buffers, bool is_from_worker);
uint8_t *LookupMmappedFile(MEMFD_TYPE store_fd_val);
@@ -362,7 +362,7 @@ Status PlasmaClient::Impl::GetBuffers(
const ObjectID *object_ids, int64_t num_objects, int64_t timeout_ms,
const std::function<std::shared_ptr<Buffer>(
const ObjectID &, const std::shared_ptr<Buffer> &)> &wrap_buffer,
ObjectBuffer *object_buffers) {
ObjectBuffer *object_buffers, bool is_from_worker) {
// Fill out the info for the objects that are already in use locally.
bool all_present = true;
for (int64_t i = 0; i < num_objects; ++i) {
@@ -409,7 +409,8 @@ Status PlasmaClient::Impl::GetBuffers(
// If we get here, then the objects aren't all currently in use by this
// client, so we need to send a request to the plasma store.
RAY_RETURN_NOT_OK(SendGetRequest(store_conn_, &object_ids[0], num_objects, timeout_ms));
RAY_RETURN_NOT_OK(SendGetRequest(store_conn_, &object_ids[0], num_objects, timeout_ms,
is_from_worker));
std::vector<uint8_t> buffer;
RAY_RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaGetReply, &buffer));
std::vector<ObjectID> received_object_ids(num_objects);
@@ -470,7 +471,8 @@ Status PlasmaClient::Impl::GetBuffers(
}
Status PlasmaClient::Impl::Get(const std::vector<ObjectID> &object_ids,
int64_t timeout_ms, std::vector<ObjectBuffer> *out) {
int64_t timeout_ms, std::vector<ObjectBuffer> *out,
bool is_from_worker) {
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
const auto wrap_buffer = [=](const ObjectID &object_id,
@@ -479,16 +481,19 @@ Status PlasmaClient::Impl::Get(const std::vector<ObjectID> &object_ids,
};
const size_t num_objects = object_ids.size();
*out = std::vector<ObjectBuffer>(num_objects);
return GetBuffers(&object_ids[0], num_objects, timeout_ms, wrap_buffer, &(*out)[0]);
return GetBuffers(&object_ids[0], num_objects, timeout_ms, wrap_buffer, &(*out)[0],
is_from_worker);
}
Status PlasmaClient::Impl::Get(const ObjectID *object_ids, int64_t num_objects,
int64_t timeout_ms, ObjectBuffer *out) {
int64_t timeout_ms, ObjectBuffer *out,
bool is_from_worker) {
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
const auto wrap_buffer = [](const ObjectID &object_id,
const std::shared_ptr<Buffer> &buffer) { return buffer; };
return GetBuffers(object_ids, num_objects, timeout_ms, wrap_buffer, out);
return GetBuffers(object_ids, num_objects, timeout_ms, wrap_buffer, out,
is_from_worker);
}
Status PlasmaClient::Impl::MarkObjectUnused(const ObjectID &object_id) {
@@ -753,13 +758,14 @@ Status PlasmaClient::TryCreateImmediately(const ObjectID &object_id,
}
Status PlasmaClient::Get(const std::vector<ObjectID> &object_ids, int64_t timeout_ms,
std::vector<ObjectBuffer> *object_buffers) {
return impl_->Get(object_ids, timeout_ms, object_buffers);
std::vector<ObjectBuffer> *object_buffers, bool is_from_worker) {
return impl_->Get(object_ids, timeout_ms, object_buffers, is_from_worker);
}
Status PlasmaClient::Get(const ObjectID *object_ids, int64_t num_objects,
int64_t timeout_ms, ObjectBuffer *object_buffers) {
return impl_->Get(object_ids, num_objects, timeout_ms, object_buffers);
int64_t timeout_ms, ObjectBuffer *object_buffers,
bool is_from_worker) {
return impl_->Get(object_ids, num_objects, timeout_ms, object_buffers, is_from_worker);
}
Status PlasmaClient::Release(const ObjectID &object_id) {
+4 -2
View File
@@ -161,9 +161,10 @@ class PlasmaClient {
/// \param timeout_ms The amount of time in milliseconds to wait before this
/// request times out. If this value is -1, then no timeout is set.
/// \param[out] object_buffers The object results.
/// \param is_from_worker Whether or not the Get request comes from a Ray worker.
/// \return The return status.
Status Get(const std::vector<ObjectID> &object_ids, int64_t timeout_ms,
std::vector<ObjectBuffer> *object_buffers);
std::vector<ObjectBuffer> *object_buffers, bool is_from_worker);
/// Deprecated variant of Get() that doesn't automatically release buffers
/// when they get out of scope.
@@ -173,12 +174,13 @@ class PlasmaClient {
/// \param timeout_ms The amount of time in milliseconds to wait before this
/// request times out. If this value is -1, then no timeout is set.
/// \param object_buffers An array where the results will be stored.
/// \param is_from_worker Whether or not the Get request comes from a Ray worker.
/// \return The return status.
///
/// The caller is responsible for releasing any retrieved objects, but it
/// should not release objects that were not retrieved.
Status Get(const ObjectID *object_ids, int64_t num_objects, int64_t timeout_ms,
ObjectBuffer *object_buffers);
ObjectBuffer *object_buffers, bool is_from_worker);
/// Tell Plasma that the client no longer needs the object. This should be
/// called after Get() or Create() when the client is done with the object.
+2
View File
@@ -210,6 +210,8 @@ table PlasmaGetRequest {
object_ids: [string];
// The number of milliseconds before the request should timeout.
timeout_ms: long;
// Whether or not the get request is from the core worker. It is used to record how many bytes are consumed by core workers.
is_from_worker: bool;
}
table PlasmaGetReply {
+5 -4
View File
@@ -553,16 +553,16 @@ Status ReadEvictReply(uint8_t *data, size_t size, int64_t &num_bytes) {
// Get messages.
Status SendGetRequest(const std::shared_ptr<StoreConn> &store_conn,
const ObjectID *object_ids, int64_t num_objects,
int64_t timeout_ms) {
const ObjectID *object_ids, int64_t num_objects, int64_t timeout_ms,
bool is_from_worker) {
flatbuffers::FlatBufferBuilder fbb;
auto message = fb::CreatePlasmaGetRequest(
fbb, ToFlatbuffer(&fbb, object_ids, num_objects), timeout_ms);
fbb, ToFlatbuffer(&fbb, object_ids, num_objects), timeout_ms, is_from_worker);
return PlasmaSend(store_conn, MessageType::PlasmaGetRequest, &fbb, message);
}
Status ReadGetRequest(uint8_t *data, size_t size, std::vector<ObjectID> &object_ids,
int64_t *timeout_ms) {
int64_t *timeout_ms, bool *is_from_worker) {
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaGetRequest>(data);
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
@@ -571,6 +571,7 @@ Status ReadGetRequest(uint8_t *data, size_t size, std::vector<ObjectID> &object_
object_ids.push_back(ObjectID::FromBinary(object_id));
}
*timeout_ms = message->timeout_ms();
*is_from_worker = message->is_from_worker();
return Status::OK();
}
+3 -3
View File
@@ -128,11 +128,11 @@ Status ReadSealReply(uint8_t *data, size_t size, ObjectID *object_id);
/* Plasma Get message functions. */
Status SendGetRequest(const std::shared_ptr<StoreConn> &store_conn,
const ObjectID *object_ids, int64_t num_objects,
int64_t timeout_ms);
const ObjectID *object_ids, int64_t num_objects, int64_t timeout_ms,
bool is_from_worker);
Status ReadGetRequest(uint8_t *data, size_t size, std::vector<ObjectID> &object_ids,
int64_t *timeout_ms);
int64_t *timeout_ms, bool *is_from_worker);
Status SendGetReply(const std::shared_ptr<Client> &client, ObjectID object_ids[],
std::unordered_map<ObjectID, PlasmaObject> &plasma_objects,
+20 -6
View File
@@ -69,7 +69,7 @@ namespace plasma {
struct GetRequest {
GetRequest(boost::asio::io_service &io_context, const std::shared_ptr<Client> &client,
const std::vector<ObjectID> &object_ids);
const std::vector<ObjectID> &object_ids, bool is_from_worker);
/// The client that called get.
std::shared_ptr<Client> client;
/// The object IDs involved in this request. This is used in the reply.
@@ -82,6 +82,9 @@ struct GetRequest {
/// The number of object requests in this wait request that are already
/// satisfied.
int64_t num_satisfied;
/// Whether or not the request comes from a core worker. It is used to track the
/// total size of objects that are consumed by core workers.
bool is_from_worker;
void AsyncWait(int64_t timeout_ms,
std::function<void(const boost::system::error_code &)> on_timeout) {
@@ -100,11 +103,12 @@ struct GetRequest {
GetRequest::GetRequest(boost::asio::io_service &io_context,
const std::shared_ptr<Client> &client,
const std::vector<ObjectID> &object_ids)
const std::vector<ObjectID> &object_ids, bool is_from_worker)
: client(client),
object_ids(object_ids.begin(), object_ids.end()),
objects(object_ids.size()),
num_satisfied(0),
is_from_worker(is_from_worker),
timer_(io_context) {
std::unordered_set<ObjectID> unique_ids(object_ids.begin(), object_ids.end());
num_objects_to_wait_for = unique_ids.size();
@@ -393,6 +397,9 @@ void PlasmaStore::ReturnFromGet(GetRequest *get_req) {
fds_to_send.insert(fd);
store_fds.push_back(fd);
mmap_sizes.push_back(GetMmapSize(fd));
if (get_req->is_from_worker) {
total_consumed_bytes_ += object.data_size + object.metadata_size;
}
}
}
// Send the get reply to the client.
@@ -465,9 +472,9 @@ void PlasmaStore::UpdateObjectGetRequests(const ObjectID &object_id) {
void PlasmaStore::ProcessGetRequest(const std::shared_ptr<Client> &client,
const std::vector<ObjectID> &object_ids,
int64_t timeout_ms) {
int64_t timeout_ms, bool is_from_worker) {
// Create a get request for this object.
auto get_req = new GetRequest(io_context_, client, object_ids);
auto get_req = new GetRequest(io_context_, client, object_ids, is_from_worker);
for (auto object_id : object_ids) {
// Check if this object is already present
// locally. If so, record that the object is being used and mark it as accounted for.
@@ -894,8 +901,10 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr<Client> &client,
case fb::MessageType::PlasmaGetRequest: {
std::vector<ObjectID> object_ids_to_get;
int64_t timeout_ms;
RAY_RETURN_NOT_OK(ReadGetRequest(input, input_size, object_ids_to_get, &timeout_ms));
ProcessGetRequest(client, object_ids_to_get, timeout_ms);
bool is_from_worker;
RAY_RETURN_NOT_OK(ReadGetRequest(input, input_size, object_ids_to_get, &timeout_ms,
&is_from_worker));
ProcessGetRequest(client, object_ids_to_get, timeout_ms, is_from_worker);
} break;
case fb::MessageType::PlasmaReleaseRequest: {
RAY_RETURN_NOT_OK(ReadReleaseRequest(input, input_size, &object_id));
@@ -1020,6 +1029,11 @@ void PlasmaStore::ReplyToCreateClient(const std::shared_ptr<Client> &client,
}
}
/// Report the accumulated number of plasma object bytes (data plus metadata)
/// that have been returned to Get requests flagged as coming from a worker.
int64_t PlasmaStore::GetConsumedBytes() {
  // Hold the store mutex while reading the counter.
  const std::lock_guard<std::recursive_mutex> lock(mutex_);
  return total_consumed_bytes_;
}
bool PlasmaStore::IsObjectSpillable(const ObjectID &object_id) {
// The lock is acquired when a request is received to the plasma store.
// recursive mutex is used here to allow
+8 -1
View File
@@ -139,7 +139,8 @@ class PlasmaStore {
/// \param object_ids Object IDs of the objects to be gotten.
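/// \param timeout_ms The timeout for the get request in milliseconds.
/// \param is_from_worker Whether the get request comes from a core worker. It is
/// used to account for the object bytes consumed by core workers.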
void ProcessGetRequest(const std::shared_ptr<Client> &client,
const std::vector<ObjectID> &object_ids, int64_t timeout_ms);
const std::vector<ObjectID> &object_ids, int64_t timeout_ms,
bool is_from_worker);
/// Seal a vector of objects. The objects are now immutable and can be accessed with
/// get.
@@ -190,6 +191,9 @@ class PlasmaStore {
/// before the object is pinned by raylet for the first time.
bool IsObjectSpillable(const ObjectID &object_id);
/// Return the plasma object bytes that are consumed by core workers.
int64_t GetConsumedBytes();
void SetNotificationListener(
const std::shared_ptr<ray::ObjectStoreNotificationManager> &notification_listener) {
notification_listener_ = notification_listener;
@@ -316,6 +320,9 @@ class PlasmaStore {
std::recursive_mutex mutex_;
size_t num_bytes_in_use_ = 0;
/// Total plasma object bytes that are consumed by core workers.
int64_t total_consumed_bytes_ = 0;
};
} // namespace plasma
@@ -123,6 +123,8 @@ bool PlasmaStoreRunner::IsPlasmaObjectSpillable(const ObjectID &object_id) {
return store_->IsObjectSpillable(object_id);
}
/// Forward the consumed-bytes query to the underlying plasma store.
int64_t PlasmaStoreRunner::GetConsumedBytes() {
  const int64_t consumed_bytes = store_->GetConsumedBytes();
  return consumed_bytes;
}
std::unique_ptr<PlasmaStoreRunner> plasma_store_runner;
} // namespace plasma
@@ -22,6 +22,8 @@ class PlasmaStoreRunner {
}
bool IsPlasmaObjectSpillable(const ObjectID &object_id);
int64_t GetConsumedBytes();
void GetAvailableMemoryAsync(std::function<void(size_t)> callback) const {
main_service_.post([this, callback]() { store_->GetAvailableMemory(callback); });
}
+2
View File
@@ -138,6 +138,8 @@ message ObjectStoreStats {
int64 object_store_bytes_avail = 8;
// The number of local objects total.
int64 num_local_objects = 9;
// The number of plasma object bytes that are consumed by core workers.
int64 consumed_bytes = 10;
}
message GetNodeStatsReply {
+5 -1
View File
@@ -2384,7 +2384,9 @@ bool NodeManager::GetObjectsFromPlasma(const std::vector<ObjectID> &object_ids,
// heavy load, then this request can still block the NodeManager event loop
// since we must wait for the plasma store's reply. We should consider using
// an `AsyncGet` instead.
if (!store_client_.Get(object_ids, /*timeout_ms=*/0, &plasma_results).ok()) {
if (!store_client_
.Get(object_ids, /*timeout_ms=*/0, &plasma_results, /*is_from_worker=*/false)
.ok()) {
return false;
}
@@ -2546,6 +2548,8 @@ rpc::ObjectStoreStats AccumulateStoreStats(
cur_store.object_store_bytes_avail());
store_stats.set_num_local_objects(store_stats.num_local_objects() +
cur_store.num_local_objects());
store_stats.set_consumed_bytes(store_stats.consumed_bytes() +
cur_store.consumed_bytes());
}
return store_stats;
}