From cb9fa90203309ff4c487e2bc86f89b1213cc8261 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Wed, 3 Feb 2021 14:16:26 -0800 Subject: [PATCH] [Object Spilling] Add consumed bytes to detect thrashing. (#13853) --- python/ray/internal/internal_api.py | 9 +- python/ray/scripts/scripts.py | 11 ++- python/ray/tests/test_memstat.py | 3 +- python/ray/tests/test_object_spilling.py | 86 ++++++++++++++----- src/ray/core_worker/core_worker.cc | 13 ++- src/ray/core_worker/core_worker.h | 2 + .../store_provider/plasma_store_provider.cc | 7 +- src/ray/object_manager/object_buffer_pool.cc | 3 +- src/ray/object_manager/object_manager.cc | 3 + src/ray/object_manager/plasma/client.cc | 32 ++++--- src/ray/object_manager/plasma/client.h | 6 +- src/ray/object_manager/plasma/plasma.fbs | 2 + src/ray/object_manager/plasma/protocol.cc | 9 +- src/ray/object_manager/plasma/protocol.h | 6 +- src/ray/object_manager/plasma/store.cc | 26 ++++-- src/ray/object_manager/plasma/store.h | 9 +- src/ray/object_manager/plasma/store_runner.cc | 2 + src/ray/object_manager/plasma/store_runner.h | 2 + src/ray/protobuf/node_manager.proto | 2 + src/ray/raylet/node_manager.cc | 6 +- 20 files changed, 172 insertions(+), 67 deletions(-) diff --git a/python/ray/internal/internal_api.py b/python/ray/internal/internal_api.py index 67c1a9275..7956725b7 100644 --- a/python/ray/internal/internal_api.py +++ b/python/ray/internal/internal_api.py @@ -13,7 +13,9 @@ def global_gc(): worker.core_worker.global_gc() -def memory_summary(node_manager_address=None, node_manager_port=None): +def memory_summary(node_manager_address=None, + node_manager_port=None, + stats_only=False): """Returns a formatted string describing memory usage in the cluster.""" import grpc @@ -63,6 +65,11 @@ def memory_summary(node_manager_address=None, node_manager_port=None): reply.store_stats.restored_objects_total, int(reply.store_stats.restored_bytes_total / (1024 * 1024) / reply.store_stats.restore_time_total_s))) + if reply.store_stats.consumed_bytes > 0: + store_summary += ("Objects consumed by Ray tasks: {} MiB.".format( + int(reply.store_stats.consumed_bytes / (1024 * 1024)))) + if stats_only: + return store_summary return reply.memory_summary + "\n" + store_summary diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index d4ae094d9..8deaa6f4a 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -1372,7 +1372,13 @@ def timeline(address): type=str, default=ray_constants.REDIS_DEFAULT_PASSWORD, help="Connect to ray with redis_password.") -def memory(address, redis_password): +@click.option( + "--stats-only", + is_flag=True, + type=bool, + default=False, + help="Connect to ray with redis_password.") +def memory(address, redis_password, stats_only): """Print object references held in a Ray cluster.""" if not address: address = services.get_ray_address_to_use_or_die() @@ -1381,7 +1387,8 @@ def memory(address, redis_password): raylet = state.node_table()[0] print( ray.internal.internal_api.memory_summary(raylet["NodeManagerAddress"], - raylet["NodeManagerPort"])) + raylet["NodeManagerPort"], + stats_only)) @cli.command() diff --git a/python/ray/tests/test_memstat.py b/python/ray/tests/test_memstat.py index cb734b3b7..a0e8e3c90 100644 --- a/python/ray/tests/test_memstat.py +++ b/python/ray/tests/test_memstat.py @@ -27,7 +27,8 @@ DESER_ACTOR_TASK_ARG = "(deserialize actor task arg)" def data_lines(memory_str): for line in memory_str.split("\n"): if (not line or "---" in line or "===" in line or "Object ID" in line - or "pid=" in line or "Plasma memory" in line): + or "pid=" in line or "Plasma memory" in line + or "Objects consumed" in line): continue yield line diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py index 159e0aaf7..500c66225 100644 --- a/python/ray/tests/test_object_spilling.py +++ b/python/ray/tests/test_object_spilling.py @@ -88,6 +88,27 @@ def is_dir_empty(temp_folder, return num_files == 0 +def assert_no_thrashing(address): + state = ray.state.GlobalState() + state._initialize_global_state(address, + ray.ray_constants.REDIS_DEFAULT_PASSWORD) + raylet = state.node_table()[0] + memory_summary = ray.internal.internal_api.memory_summary( + raylet["NodeManagerAddress"], + raylet["NodeManagerPort"], + stats_only=True) + restored_bytes = 0 + consumed_bytes = 0 + + for line in memory_summary.split("\n"): + if "Restored" in line: + restored_bytes = int(line.split(" ")[1]) + if "consumed" in line: + consumed_bytes = int(line.split(" ")[-2]) + assert consumed_bytes >= restored_bytes, ( + f"consumed: {consumed_bytes}, restored: {restored_bytes}") + + def test_invalid_config_raises_exception(shutdown_only): # Make sure ray.init raises an exception before # it starts processes when invalid object spilling @@ -187,7 +208,7 @@ def test_spilling_not_done_for_pinned_object(object_spilling_config, shutdown_only): # Limit our object store to 75 MiB of memory. object_spilling_config, temp_folder = object_spilling_config - ray.init( + address = ray.init( object_store_memory=75 * 1024 * 1024, _system_config={ "max_io_workers": 4, @@ -203,6 +224,7 @@ def test_spilling_not_done_for_pinned_object(object_spilling_config, ref2 = ray.put(arr) # noqa wait_for_condition(lambda: is_dir_empty(temp_folder)) + assert_no_thrashing(address["redis_address"]) @pytest.mark.skipif( @@ -249,6 +271,7 @@ def test_spill_remote_object(ray_start_cluster, # Test passing the spilled object as an arg to another task. ray.get(depends.remote(ref)) + assert_no_thrashing(cluster.address) @pytest.mark.skipif( @@ -256,7 +279,7 @@ def test_spill_remote_object(ray_start_cluster, def test_spill_objects_automatically(object_spilling_config, shutdown_only): # Limit our object store to 75 MiB of memory. object_spilling_config, _ = object_spilling_config - ray.init( + address = ray.init( num_cpus=1, object_store_memory=75 * 1024 * 1024, _system_config={ @@ -287,14 +310,15 @@ def test_spill_objects_automatically(object_spilling_config, shutdown_only): solution = solution_buffer[index] sample = ray.get(ref, timeout=0) assert np.array_equal(sample, solution) + assert_no_thrashing(address["redis_address"]) @pytest.mark.skipif( - platform.system() in ["Darwin", "Windows"], reason="Failing on Windows.") + platform.system() in ["Windows", "Darwin"], reason="Failing on Windows.") def test_spill_stats(object_spilling_config, shutdown_only): # Limit our object store to 75 MiB of memory. object_spilling_config, _ = object_spilling_config - ray.init( + address = ray.init( num_cpus=1, object_store_memory=100 * 1024 * 1024, _system_config={ @@ -319,17 +343,31 @@ def test_spill_stats(object_spilling_config, shutdown_only): x_id = f.remote() # noqa ray.get(x_id) - s = memory_summary() + s = memory_summary(stats_only=True) assert "Plasma memory usage 50 MiB, 1 objects, 50.0% full" in s, s assert "Spilled 200 MiB, 4 objects" in s, s assert "Restored 150 MiB, 3 objects" in s, s + # Test if consumed bytes are correctly calculated. + obj = ray.put(np.zeros(30 * 1024 * 1024, dtype=np.uint8)) + + @ray.remote + def func_with_ref(obj): + return True + + ray.get(func_with_ref.remote(obj)) + + s = memory_summary(stats_only=True) + # 50MB * 5 references + 30MB used for task execution. + assert "Objects consumed by Ray tasks: 280 MiB." in s, s + assert_no_thrashing(address["redis_address"]) + @pytest.mark.skipif( platform.system() == "Windows", reason="Failing on Windows.") def test_spill_during_get(object_spilling_config, shutdown_only): object_spilling_config, _ = object_spilling_config - ray.init( + address = ray.init( num_cpus=4, object_store_memory=100 * 1024 * 1024, _system_config={ @@ -355,6 +393,7 @@ def test_spill_during_get(object_spilling_config, shutdown_only): # objects are being created. for x in ids: print(ray.get(x).shape) + assert_no_thrashing(address["redis_address"]) @pytest.mark.skipif( @@ -362,7 +401,7 @@ def test_spill_during_get(object_spilling_config, shutdown_only): def test_spill_deadlock(object_spilling_config, shutdown_only): object_spilling_config, _ = object_spilling_config # Limit our object store to 75 MiB of memory. - ray.init( + address = ray.init( object_store_memory=75 * 1024 * 1024, _system_config={ "max_io_workers": 1, @@ -386,6 +425,7 @@ def test_spill_deadlock(object_spilling_config, shutdown_only): ref = random.choice(replay_buffer) sample = ray.get(ref, timeout=0) assert np.array_equal(sample, arr) + assert_no_thrashing(address["redis_address"]) @pytest.mark.skipif( @@ -394,7 +434,7 @@ def test_delete_objects(object_spilling_config, shutdown_only): # Limit our object store to 75 MiB of memory. object_spilling_config, temp_folder = object_spilling_config - ray.init( + address = ray.init( object_store_memory=75 * 1024 * 1024, _system_config={ "max_io_workers": 1, @@ -417,6 +457,7 @@ def test_delete_objects(object_spilling_config, shutdown_only): del replay_buffer del ref wait_for_condition(lambda: is_dir_empty(temp_folder)) + assert_no_thrashing(address["redis_address"]) @pytest.mark.skipif( @@ -426,7 +467,7 @@ def test_delete_objects_delete_while_creating(object_spilling_config, # Limit our object store to 75 MiB of memory. object_spilling_config, temp_folder = object_spilling_config - ray.init( + address = ray.init( object_store_memory=75 * 1024 * 1024, _system_config={ "max_io_workers": 4, @@ -457,6 +498,7 @@ def test_delete_objects_delete_while_creating(object_spilling_config, del replay_buffer del ref wait_for_condition(lambda: is_dir_empty(temp_folder)) + assert_no_thrashing(address["redis_address"]) @pytest.mark.skipif( @@ -466,7 +508,7 @@ def test_delete_objects_on_worker_failure(object_spilling_config, # Limit our object store to 75 MiB of memory. object_spilling_config, temp_folder = object_spilling_config - ray.init( + address = ray.init( object_store_memory=75 * 1024 * 1024, _system_config={ "max_io_workers": 4, @@ -518,6 +560,7 @@ def test_delete_objects_on_worker_failure(object_spilling_config, # After all, make sure all objects are deleted upon worker failures. wait_for_condition(lambda: is_dir_empty(temp_folder)) + assert_no_thrashing(address["redis_address"]) @pytest.mark.skipif( @@ -539,10 +582,11 @@ def test_delete_objects_multi_node(multi_node_object_spilling_config, "object_store_full_delay_ms": 100, "object_spilling_config": object_spilling_config, }) + ray.init(address=cluster.address) # Add 2 worker nodes. for _ in range(2): cluster.add_node(num_cpus=1, object_store_memory=75 * 1024 * 1024) - ray.init(address=cluster.address) + cluster.wait_for_nodes() arr = np.random.rand(1024 * 1024) # 8 MB data @@ -565,9 +609,9 @@ def test_delete_objects_multi_node(multi_node_object_spilling_config, self.replay_buffer.pop() # Do random sampling. - for _ in range(200): + for _ in range(50): ref = random.choice(self.replay_buffer) - sample = ray.get(ref, timeout=0) + sample = ray.get(ref, timeout=10) assert np.array_equal(sample, arr) actors = [Actor.remote() for _ in range(3)] @@ -586,6 +630,7 @@ def test_delete_objects_multi_node(multi_node_object_spilling_config, wait_for_condition(lambda: wait_until_actor_dead(actor)) # The multi node deletion should work. wait_for_condition(lambda: is_dir_empty(temp_folder)) + assert_no_thrashing(cluster.address) @pytest.mark.skipif(platform.system() == "Windows", reason="Flaky on Windows.") @@ -593,7 +638,7 @@ def test_fusion_objects(object_spilling_config, shutdown_only): # Limit our object store to 75 MiB of memory. object_spilling_config, temp_folder = object_spilling_config min_spilling_size = 10 * 1024 * 1024 - ray.init( + address = ray.init( object_store_memory=75 * 1024 * 1024, _system_config={ "max_io_workers": 3, @@ -637,12 +682,13 @@ def test_fusion_objects(object_spilling_config, shutdown_only): if file_size >= min_spilling_size: is_test_passing = True assert is_test_passing + assert_no_thrashing(address["redis_address"]) # https://github.com/ray-project/ray/issues/12912 def do_test_release_resource(object_spilling_config, expect_released): object_spilling_config, temp_folder = object_spilling_config - ray.init( + address = ray.init( num_cpus=1, object_store_memory=75 * 1024 * 1024, _system_config={ @@ -674,6 +720,7 @@ def do_test_release_resource(object_spilling_config, expect_released): assert ready else: assert not ready + assert_no_thrashing(address["redis_address"]) @pytest.mark.skipif( @@ -745,6 +792,7 @@ def test_spill_objects_on_object_transfer(object_spilling_config, # spilling. tasks = [foo.remote(*task_args) for task_args in args] ray.get(tasks) + assert_no_thrashing(cluster.address) @pytest.mark.skipif( @@ -801,14 +849,6 @@ os.kill(os.getpid(), sig) driver.format(temp_dir=str(temp_folder), signum=2))) wait_for_condition(lambda: is_dir_empty(temp_folder, append_path="")) - # Q: Looks like Sigterm doesn't work with Ray? - # print("Sending sigterm...") - # # Run a driver with sigterm. - # with pytest.raises(subprocess.CalledProcessError): - # print(run_string_as_driver( - # driver.format(temp_dir=str(temp_folder), signum=15))) - # wait_for_condition(is_dir_empty, timeout=1000) - if __name__ == "__main__": sys.exit(pytest.main(["-sv", __file__])) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index b56f18cf0..a8c2e8557 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -566,6 +566,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ // NOTE: This also marks the worker as available in Raylet. We do this at the // very end in case there is a problem during construction. RAY_CHECK_OK(local_raylet_client_->AnnounceWorkerPort(core_worker_server_->GetPort())); + // Used to detect if the object is in the plasma store. + max_direct_call_object_size_ = RayConfig::instance().max_direct_call_object_size(); } void CoreWorker::Shutdown() { @@ -881,8 +883,7 @@ Status CoreWorker::Put(const RayObject &object, bool object_exists; if (options_.is_local_mode || (RayConfig::instance().put_small_object_in_memory_store() && - static_cast(object.GetSize()) < - RayConfig::instance().max_direct_call_object_size())) { + static_cast(object.GetSize()) < max_direct_call_object_size_)) { RAY_LOG(DEBUG) << "Put " << object_id << " in memory store"; RAY_CHECK(memory_store_->Put(object, object_id)); return Status::OK(); @@ -923,8 +924,7 @@ Status CoreWorker::CreateOwned(const std::shared_ptr &metadata, NodeID::FromBinary(rpc_address_.raylet_id())); if (options_.is_local_mode || (RayConfig::instance().put_small_object_in_memory_store() && - static_cast(data_size) < - RayConfig::instance().max_direct_call_object_size())) { + static_cast(data_size) < max_direct_call_object_size_)) { *data = std::make_shared(data_size); } else { auto status = @@ -1037,7 +1037,7 @@ Status CoreWorker::Get(const std::vector &ids, const int64_t timeout_m bool missing_result = false; bool will_throw_exception = false; for (size_t i = 0; i < ids.size(); i++) { - auto pair = result_map.find(ids[i]); + const auto pair = result_map.find(ids[i]); if (pair != result_map.end()) { (*results)[i] = pair->second; RAY_CHECK(!pair->second->IsInPlasmaError()); @@ -1778,8 +1778,7 @@ Status CoreWorker::AllocateReturnObjects( // Allocate a buffer for the return object. if (options_.is_local_mode || - static_cast(data_sizes[i]) < - RayConfig::instance().max_direct_call_object_size()) { + static_cast(data_sizes[i]) < max_direct_call_object_size_) { data_buffer = std::make_shared(data_sizes[i]); } else { RAY_RETURN_NOT_OK(CreateExisting(metadatas[i], data_sizes[i], object_ids[i], diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 89331b5ce..6fa24c29e 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1255,6 +1255,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Whether we are shutting down and not running further tasks. bool exiting_ = false; + int64_t max_direct_call_object_size_; + friend class CoreWorkerTest; }; diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index b42c4b509..f3b5f047c 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -191,7 +191,8 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore( std::vector plasma_results; { std::lock_guard guard(store_client_mutex_); - RAY_RETURN_NOT_OK(store_client_.Get(batch_ids, timeout_ms, &plasma_results)); + RAY_RETURN_NOT_OK(store_client_.Get(batch_ids, timeout_ms, &plasma_results, + /*is_from_worker=*/true)); } // Add successfully retrieved objects to the result map and remove them from @@ -231,7 +232,9 @@ Status CoreWorkerPlasmaStoreProvider::GetIfLocal( std::vector plasma_results; { std::lock_guard guard(store_client_mutex_); - RAY_RETURN_NOT_OK(store_client_.Get(object_ids, /*timeout_ms=*/0, &plasma_results)); + // Since this path is used only for spilling, we should set is_from_worker: false. + RAY_RETURN_NOT_OK(store_client_.Get(object_ids, /*timeout_ms=*/0, &plasma_results, + /*is_from_worker=*/false)); } for (size_t i = 0; i < object_ids.size(); i++) { diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index 726a6fefc..63dabcb41 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -57,7 +57,8 @@ std::pair ObjectBufferPool::Ge std::lock_guard lock(pool_mutex_); if (get_buffer_state_.count(object_id) == 0) { plasma::ObjectBuffer object_buffer; - RAY_CHECK_OK(store_client_.Get(&object_id, 1, 0, &object_buffer)); + RAY_CHECK_OK( + store_client_.Get(&object_id, 1, 0, &object_buffer, /*is_from_worker=*/false)); if (object_buffer.data == nullptr) { RAY_LOG(INFO) << "Failed to get a chunk of the object: " << object_id diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 448245e01..d59737ca6 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -834,6 +834,9 @@ void ObjectManager::FillObjectStoreStats(rpc::GetNodeStatsReply *reply) const { stats->set_object_store_bytes_used(used_memory_); stats->set_object_store_bytes_avail(config_.object_store_memory); stats->set_num_local_objects(local_objects_.size()); + if (plasma::plasma_store_runner) { + stats->set_consumed_bytes(plasma::plasma_store_runner->GetConsumedBytes()); + } } void ObjectManager::Tick(const boost::system::error_code &e) { diff --git a/src/ray/object_manager/plasma/client.cc b/src/ray/object_manager/plasma/client.cc index a5429d985..9b9bb5408 100644 --- a/src/ray/object_manager/plasma/client.cc +++ b/src/ray/object_manager/plasma/client.cc @@ -121,10 +121,10 @@ class PlasmaClient::Impl : public std::enable_shared_from_this *data, int device_num); Status Get(const std::vector &object_ids, int64_t timeout_ms, - std::vector *object_buffers); + std::vector *object_buffers, bool is_from_worker); Status Get(const ObjectID *object_ids, int64_t num_objects, int64_t timeout_ms, - ObjectBuffer *object_buffers); + ObjectBuffer *object_buffers, bool is_from_worker); Status Release(const ObjectID &object_id); @@ -172,7 +172,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this( const ObjectID &, const std::shared_ptr &)> &wrap_buffer, - ObjectBuffer *object_buffers); + ObjectBuffer *object_buffers, bool is_from_worker); uint8_t *LookupMmappedFile(MEMFD_TYPE store_fd_val); @@ -362,7 +362,7 @@ Status PlasmaClient::Impl::GetBuffers( const ObjectID *object_ids, int64_t num_objects, int64_t timeout_ms, const std::function( const ObjectID &, const std::shared_ptr &)> &wrap_buffer, - ObjectBuffer *object_buffers) { + ObjectBuffer *object_buffers, bool is_from_worker) { // Fill out the info for the objects that are already in use locally. bool all_present = true; for (int64_t i = 0; i < num_objects; ++i) { @@ -409,7 +409,8 @@ Status PlasmaClient::Impl::GetBuffers( // If we get here, then the objects aren't all currently in use by this // client, so we need to send a request to the plasma store. - RAY_RETURN_NOT_OK(SendGetRequest(store_conn_, &object_ids[0], num_objects, timeout_ms)); + RAY_RETURN_NOT_OK(SendGetRequest(store_conn_, &object_ids[0], num_objects, timeout_ms, + is_from_worker)); std::vector buffer; RAY_RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaGetReply, &buffer)); std::vector received_object_ids(num_objects); @@ -470,7 +471,8 @@ Status PlasmaClient::Impl::GetBuffers( } Status PlasmaClient::Impl::Get(const std::vector &object_ids, - int64_t timeout_ms, std::vector *out) { + int64_t timeout_ms, std::vector *out, + bool is_from_worker) { std::lock_guard guard(client_mutex_); const auto wrap_buffer = [=](const ObjectID &object_id, @@ -479,16 +481,19 @@ Status PlasmaClient::Impl::Get(const std::vector &object_ids, }; const size_t num_objects = object_ids.size(); *out = std::vector(num_objects); - return GetBuffers(&object_ids[0], num_objects, timeout_ms, wrap_buffer, &(*out)[0]); + return GetBuffers(&object_ids[0], num_objects, timeout_ms, wrap_buffer, &(*out)[0], + is_from_worker); } Status PlasmaClient::Impl::Get(const ObjectID *object_ids, int64_t num_objects, - int64_t timeout_ms, ObjectBuffer *out) { + int64_t timeout_ms, ObjectBuffer *out, + bool is_from_worker) { std::lock_guard guard(client_mutex_); const auto wrap_buffer = [](const ObjectID &object_id, const std::shared_ptr &buffer) { return buffer; }; - return GetBuffers(object_ids, num_objects, timeout_ms, wrap_buffer, out); + return GetBuffers(object_ids, num_objects, timeout_ms, wrap_buffer, out, + is_from_worker); } Status PlasmaClient::Impl::MarkObjectUnused(const ObjectID &object_id) { @@ -753,13 +758,14 @@ Status PlasmaClient::TryCreateImmediately(const ObjectID &object_id, } Status PlasmaClient::Get(const std::vector &object_ids, int64_t timeout_ms, - std::vector *object_buffers) { - return impl_->Get(object_ids, timeout_ms, object_buffers); + std::vector *object_buffers, bool is_from_worker) { + return impl_->Get(object_ids, timeout_ms, object_buffers, is_from_worker); } Status PlasmaClient::Get(const ObjectID *object_ids, int64_t num_objects, - int64_t timeout_ms, ObjectBuffer *object_buffers) { - return impl_->Get(object_ids, num_objects, timeout_ms, object_buffers); + int64_t timeout_ms, ObjectBuffer *object_buffers, + bool is_from_worker) { + return impl_->Get(object_ids, num_objects, timeout_ms, object_buffers, is_from_worker); } Status PlasmaClient::Release(const ObjectID &object_id) { diff --git a/src/ray/object_manager/plasma/client.h b/src/ray/object_manager/plasma/client.h index e88a9eb13..703250bd2 100644 --- a/src/ray/object_manager/plasma/client.h +++ b/src/ray/object_manager/plasma/client.h @@ -161,9 +161,10 @@ class PlasmaClient { /// \param timeout_ms The amount of time in milliseconds to wait before this /// request times out. If this value is -1, then no timeout is set. /// \param[out] object_buffers The object results. + /// \param is_from_worker Whether or not if the Get request comes from a Ray workers. /// \return The return status. Status Get(const std::vector &object_ids, int64_t timeout_ms, - std::vector *object_buffers); + std::vector *object_buffers, bool is_from_worker); /// Deprecated variant of Get() that doesn't automatically release buffers /// when they get out of scope. @@ -173,12 +174,13 @@ class PlasmaClient { /// \param timeout_ms The amount of time in milliseconds to wait before this /// request times out. If this value is -1, then no timeout is set. /// \param object_buffers An array where the results will be stored. + /// \param is_from_worker Whether or not if the Get request comes from a Ray workers. /// \return The return status. /// /// The caller is responsible for releasing any retrieved objects, but it /// should not release objects that were not retrieved. Status Get(const ObjectID *object_ids, int64_t num_objects, int64_t timeout_ms, - ObjectBuffer *object_buffers); + ObjectBuffer *object_buffers, bool is_from_worker); /// Tell Plasma that the client no longer needs the object. This should be /// called after Get() or Create() when the client is done with the object. diff --git a/src/ray/object_manager/plasma/plasma.fbs b/src/ray/object_manager/plasma/plasma.fbs index 3816de79e..5a268a891 100644 --- a/src/ray/object_manager/plasma/plasma.fbs +++ b/src/ray/object_manager/plasma/plasma.fbs @@ -210,6 +210,8 @@ table PlasmaGetRequest { object_ids: [string]; // The number of milliseconds before the request should timeout. timeout_ms: long; + // Whether or not the get request is from the core worker. It is used to record how many bytes are consumed by core workers. + is_from_worker: bool; } table PlasmaGetReply { diff --git a/src/ray/object_manager/plasma/protocol.cc b/src/ray/object_manager/plasma/protocol.cc index 8c3164d6a..c3b5b55ee 100644 --- a/src/ray/object_manager/plasma/protocol.cc +++ b/src/ray/object_manager/plasma/protocol.cc @@ -553,16 +553,16 @@ Status ReadEvictReply(uint8_t *data, size_t size, int64_t &num_bytes) { // Get messages. Status SendGetRequest(const std::shared_ptr &store_conn, - const ObjectID *object_ids, int64_t num_objects, - int64_t timeout_ms) { + const ObjectID *object_ids, int64_t num_objects, int64_t timeout_ms, + bool is_from_worker) { flatbuffers::FlatBufferBuilder fbb; auto message = fb::CreatePlasmaGetRequest( - fbb, ToFlatbuffer(&fbb, object_ids, num_objects), timeout_ms); + fbb, ToFlatbuffer(&fbb, object_ids, num_objects), timeout_ms, is_from_worker); return PlasmaSend(store_conn, MessageType::PlasmaGetRequest, &fbb, message); } Status ReadGetRequest(uint8_t *data, size_t size, std::vector &object_ids, - int64_t *timeout_ms) { + int64_t *timeout_ms, bool *is_from_worker) { RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); RAY_DCHECK(VerifyFlatbuffer(message, data, size)); @@ -571,6 +571,7 @@ Status ReadGetRequest(uint8_t *data, size_t size, std::vector &object_ object_ids.push_back(ObjectID::FromBinary(object_id)); } *timeout_ms = message->timeout_ms(); + *is_from_worker = message->is_from_worker(); return Status::OK(); } diff --git a/src/ray/object_manager/plasma/protocol.h b/src/ray/object_manager/plasma/protocol.h index a8ba71b46..f5baf03ec 100644 --- a/src/ray/object_manager/plasma/protocol.h +++ b/src/ray/object_manager/plasma/protocol.h @@ -128,11 +128,11 @@ Status ReadSealReply(uint8_t *data, size_t size, ObjectID *object_id); /* Plasma Get message functions. */ Status SendGetRequest(const std::shared_ptr &store_conn, - const ObjectID *object_ids, int64_t num_objects, - int64_t timeout_ms); + const ObjectID *object_ids, int64_t num_objects, int64_t timeout_ms, + bool is_from_worker); Status ReadGetRequest(uint8_t *data, size_t size, std::vector &object_ids, - int64_t *timeout_ms); + int64_t *timeout_ms, bool *is_from_worker); Status SendGetReply(const std::shared_ptr &client, ObjectID object_ids[], std::unordered_map &plasma_objects, diff --git a/src/ray/object_manager/plasma/store.cc b/src/ray/object_manager/plasma/store.cc index e101c5a9b..af7219273 100644 --- a/src/ray/object_manager/plasma/store.cc +++ b/src/ray/object_manager/plasma/store.cc @@ -69,7 +69,7 @@ namespace plasma { struct GetRequest { GetRequest(boost::asio::io_service &io_context, const std::shared_ptr &client, - const std::vector &object_ids); + const std::vector &object_ids, bool is_from_worker); /// The client that called get. std::shared_ptr client; /// The object IDs involved in this request. This is used in the reply. @@ -82,6 +82,9 @@ struct GetRequest { /// The number of object requests in this wait request that are already /// satisfied. int64_t num_satisfied; + /// Whether or not the request comes from the core worker. It is used to track the size + /// of total objects that are consumed by core worker. + bool is_from_worker; void AsyncWait(int64_t timeout_ms, std::function on_timeout) { @@ -100,11 +103,12 @@ struct GetRequest { GetRequest::GetRequest(boost::asio::io_service &io_context, const std::shared_ptr &client, - const std::vector &object_ids) + const std::vector &object_ids, bool is_from_worker) : client(client), object_ids(object_ids.begin(), object_ids.end()), objects(object_ids.size()), num_satisfied(0), + is_from_worker(is_from_worker), timer_(io_context) { std::unordered_set unique_ids(object_ids.begin(), object_ids.end()); num_objects_to_wait_for = unique_ids.size(); @@ -393,6 +397,9 @@ void PlasmaStore::ReturnFromGet(GetRequest *get_req) { fds_to_send.insert(fd); store_fds.push_back(fd); mmap_sizes.push_back(GetMmapSize(fd)); + if (get_req->is_from_worker) { + total_consumed_bytes_ += object.data_size + object.metadata_size; + } } } // Send the get reply to the client. @@ -465,9 +472,9 @@ void PlasmaStore::UpdateObjectGetRequests(const ObjectID &object_id) { void PlasmaStore::ProcessGetRequest(const std::shared_ptr &client, const std::vector &object_ids, - int64_t timeout_ms) { + int64_t timeout_ms, bool is_from_worker) { // Create a get request for this object. - auto get_req = new GetRequest(io_context_, client, object_ids); + auto get_req = new GetRequest(io_context_, client, object_ids, is_from_worker); for (auto object_id : object_ids) { // Check if this object is already present // locally. If so, record that the object is being used and mark it as accounted for. @@ -894,8 +901,10 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr &client, case fb::MessageType::PlasmaGetRequest: { std::vector object_ids_to_get; int64_t timeout_ms; - RAY_RETURN_NOT_OK(ReadGetRequest(input, input_size, object_ids_to_get, &timeout_ms)); - ProcessGetRequest(client, object_ids_to_get, timeout_ms); + bool is_from_worker; + RAY_RETURN_NOT_OK(ReadGetRequest(input, input_size, object_ids_to_get, &timeout_ms, + &is_from_worker)); + ProcessGetRequest(client, object_ids_to_get, timeout_ms, is_from_worker); } break; case fb::MessageType::PlasmaReleaseRequest: { RAY_RETURN_NOT_OK(ReadReleaseRequest(input, input_size, &object_id)); @@ -1020,6 +1029,11 @@ void PlasmaStore::ReplyToCreateClient(const std::shared_ptr &client, } } +int64_t PlasmaStore::GetConsumedBytes() { + std::lock_guard guard(mutex_); + return total_consumed_bytes_; +} + bool PlasmaStore::IsObjectSpillable(const ObjectID &object_id) { // The lock is acquired when a request is received to the plasma store. // recursive mutex is used here to allow diff --git a/src/ray/object_manager/plasma/store.h b/src/ray/object_manager/plasma/store.h index 214cf9763..eedcb526d 100644 --- a/src/ray/object_manager/plasma/store.h +++ b/src/ray/object_manager/plasma/store.h @@ -139,7 +139,8 @@ class PlasmaStore { /// \param object_ids Object IDs of the objects to be gotten. /// \param timeout_ms The timeout for the get request in milliseconds. void ProcessGetRequest(const std::shared_ptr &client, - const std::vector &object_ids, int64_t timeout_ms); + const std::vector &object_ids, int64_t timeout_ms, + bool is_from_worker); /// Seal a vector of objects. The objects are now immutable and can be accessed with /// get. @@ -190,6 +191,9 @@ class PlasmaStore { /// before the object is pinned by raylet for the first time. bool IsObjectSpillable(const ObjectID &object_id); + /// Return the plasma object bytes that are consumed by core workers. + int64_t GetConsumedBytes(); + void SetNotificationListener( const std::shared_ptr ¬ification_listener) { notification_listener_ = notification_listener; @@ -316,6 +320,9 @@ class PlasmaStore { std::recursive_mutex mutex_; size_t num_bytes_in_use_ = 0; + + /// Total plasma object bytes that are consumed by core workers. + int64_t total_consumed_bytes_ = 0; }; } // namespace plasma diff --git a/src/ray/object_manager/plasma/store_runner.cc b/src/ray/object_manager/plasma/store_runner.cc index 34e08080c..5a44e297c 100644 --- a/src/ray/object_manager/plasma/store_runner.cc +++ b/src/ray/object_manager/plasma/store_runner.cc @@ -123,6 +123,8 @@ bool PlasmaStoreRunner::IsPlasmaObjectSpillable(const ObjectID &object_id) { return store_->IsObjectSpillable(object_id); } +int64_t PlasmaStoreRunner::GetConsumedBytes() { return store_->GetConsumedBytes(); } + std::unique_ptr plasma_store_runner; } // namespace plasma diff --git a/src/ray/object_manager/plasma/store_runner.h b/src/ray/object_manager/plasma/store_runner.h index 7ac7be59b..f4785810c 100644 --- a/src/ray/object_manager/plasma/store_runner.h +++ b/src/ray/object_manager/plasma/store_runner.h @@ -22,6 +22,8 @@ class PlasmaStoreRunner { } bool IsPlasmaObjectSpillable(const ObjectID &object_id); + int64_t GetConsumedBytes(); + void GetAvailableMemoryAsync(std::function callback) const { main_service_.post([this, callback]() { store_->GetAvailableMemory(callback); }); } diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index 386ed988a..8e225293c 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -138,6 +138,8 @@ message ObjectStoreStats { int64 object_store_bytes_avail = 8; // The number of local objects total. int64 num_local_objects = 9; + // The number of plasma object bytes that are consumed by core workers. + int64 consumed_bytes = 10; } message GetNodeStatsReply { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index e784758b1..2c20bab40 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2384,7 +2384,9 @@ bool NodeManager::GetObjectsFromPlasma(const std::vector &object_ids, // heavy load, then this request can still block the NodeManager event loop // since we must wait for the plasma store's reply. We should consider using // an `AsyncGet` instead. - if (!store_client_.Get(object_ids, /*timeout_ms=*/0, &plasma_results).ok()) { + if (!store_client_ + .Get(object_ids, /*timeout_ms=*/0, &plasma_results, /*is_from_worker=*/false) + .ok()) { return false; } @@ -2546,6 +2548,8 @@ rpc::ObjectStoreStats AccumulateStoreStats( cur_store.object_store_bytes_avail()); store_stats.set_num_local_objects(store_stats.num_local_objects() + cur_store.num_local_objects()); + store_stats.set_consumed_bytes(store_stats.consumed_bytes() + + cur_store.consumed_bytes()); } return store_stats; }