From edbb2937d393f9cd95a5016bc2df5250bbd59152 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Sat, 23 Jan 2021 23:15:32 -0800 Subject: [PATCH] [Object Spilling] Multi node file spilling V2. (#13542) * done. * done. * Fix a mistake. * Ready. * Fix issues. * fix. * Finished the first round of code review. * formatting. * In progress. * Formatting. * Addressed code review. * Formatting * Fix tests. * fix bugs. * Skip flaky tests for now. --- python/ray/external_storage.py | 4 + python/ray/parameter.py | 3 + python/ray/tests/BUILD | 2 +- python/ray/tests/test_object_spilling.py | 183 ++++++++---------- src/ray/common/ray_config_def.h | 4 + src/ray/gcs/accessor.h | 2 + .../gcs/gcs_client/service_based_accessor.cc | 4 +- .../gcs/gcs_client/service_based_accessor.h | 1 + src/ray/gcs/gcs_server/gcs_object_manager.cc | 10 +- src/ray/gcs/gcs_server/gcs_object_manager.h | 1 + .../gcs_server/gcs_placement_group_manager.h | 2 +- src/ray/object_manager/common.h | 5 +- src/ray/object_manager/object_buffer_pool.cc | 5 +- src/ray/object_manager/object_directory.cc | 43 ++-- src/ray/object_manager/object_directory.h | 9 +- src/ray/object_manager/object_manager.cc | 12 +- src/ray/object_manager/object_manager.h | 5 +- .../ownership_based_object_directory.cc | 6 +- src/ray/object_manager/pull_manager.cc | 60 ++++-- src/ray/object_manager/pull_manager.h | 6 +- .../object_manager/test/pull_manager_test.cc | 134 ++++++++----- src/ray/protobuf/gcs.proto | 10 +- src/ray/protobuf/gcs_service.proto | 5 +- src/ray/protobuf/node_manager.proto | 15 ++ src/ray/raylet/local_object_manager.cc | 33 +++- src/ray/raylet/local_object_manager.h | 41 +++- src/ray/raylet/node_manager.cc | 78 ++++++-- src/ray/raylet/node_manager.h | 11 ++ src/ray/raylet/raylet.cc | 5 +- src/ray/raylet/reconstruction_policy.cc | 3 +- src/ray/raylet/reconstruction_policy_test.cc | 5 +- .../raylet/test/local_object_manager_test.cc | 86 +++++++- src/ray/raylet_client/raylet_client.cc | 12 ++ 
src/ray/raylet_client/raylet_client.h | 9 + .../rpc/node_manager/node_manager_client.h | 3 + .../rpc/node_manager/node_manager_server.h | 5 + 36 files changed, 573 insertions(+), 249 deletions(-) diff --git a/python/ray/external_storage.py b/python/ray/external_storage.py index 1b4f6fec8..6e1635148 100644 --- a/python/ray/external_storage.py +++ b/python/ray/external_storage.py @@ -345,6 +345,10 @@ def setup_external_storage(config): elif storage_type == "smart_open": _external_storage = ExternalStorageSmartOpenImpl( **config["params"]) + elif storage_type == "mock_distributed_fs": + # This storage is used to unit test distributed external storages. + # TODO(sang): Delete it after introducing the mock S3 test. + _external_storage = FileSystemStorage(**config["params"]) else: raise ValueError(f"Unknown external storage type: {storage_type}") else: diff --git a/python/ray/parameter.py b/python/ray/parameter.py index a9b20769d..666b82905 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -330,3 +330,6 @@ class RayParams: # Validate external storage usage. external_storage.setup_external_storage(object_spilling_config) external_storage.reset_external_storage() + # Configure the proper system config. 
+ self._system_config["is_external_storage_type_fs"] = ( + object_spilling_config["type"] == "filesystem") diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 8fe8b21c3..2ccdb4be2 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -53,7 +53,6 @@ py_test_module_list( "test_multinode_failures_2.py", "test_multiprocessing.py", "test_object_manager.py", - "test_object_spilling.py", "test_output.py", "test_reconstruction.py", "test_reference_counting.py", @@ -134,6 +133,7 @@ py_test_module_list( py_test_module_list( files = [ "test_placement_group.py", + "test_object_spilling.py", ], size = "large", extra_srcs = SRCS, diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py index 8319dbfca..68824b7bb 100644 --- a/python/ray/tests/test_object_spilling.py +++ b/python/ray/tests/test_object_spilling.py @@ -21,6 +21,15 @@ file_system_object_spilling_config = { "directory_path": spill_local_path } } +# Since we have a different protocol for a local external storage (e.g., fs) +# and distributed external storage (e.g., S3), we need to test both cases. +# This mocks the distributed fs with cluster utils. +mock_distributed_fs_object_spilling_config = { + "type": "mock_distributed_fs", + "params": { + "directory_path": spill_local_path + } +} smart_open_object_spilling_config = { "type": "smart_open", "params": { @@ -29,6 +38,15 @@ smart_open_object_spilling_config = { } +def create_object_spilling_config(request, tmp_path): + if (request.param["type"] == "filesystem" + or request.param["type"] == "mock_distributed_fs"): + temp_folder = tmp_path / "spill" + temp_folder.mkdir() + request.param["params"]["directory_path"] = str(temp_folder) + return json.dumps(request.param), temp_folder + + @pytest.fixture( scope="function", params=[ @@ -36,10 +54,18 @@ smart_open_object_spilling_config = { # TODO(sang): Add a mock dependency to test S3. 
# smart_open_object_spilling_config, ]) -def object_spilling_config(request, tmpdir): - if request.param["type"] == "filesystem": - request.param["params"]["directory_path"] = str(tmpdir) - yield json.dumps(request.param) +def object_spilling_config(request, tmp_path): + yield create_object_spilling_config(request, tmp_path) + + +@pytest.fixture( + scope="function", + params=[ + file_system_object_spilling_config, + mock_distributed_fs_object_spilling_config + ]) +def multi_node_object_spilling_config(request, tmp_path): + yield create_object_spilling_config(request, tmp_path) def test_invalid_config_raises_exception(shutdown_only): @@ -75,22 +101,17 @@ def test_url_generation_and_parse(): @pytest.mark.skipif( platform.system() == "Windows", reason="Failing on Windows.") -def test_spilling_not_done_for_pinned_object(tmp_path, shutdown_only): +def test_spilling_not_done_for_pinned_object(object_spilling_config, + shutdown_only): # Limit our object store to 75 MiB of memory. - temp_folder = tmp_path / "spill" - temp_folder.mkdir() + object_spilling_config, temp_folder = object_spilling_config ray.init( object_store_memory=75 * 1024 * 1024, _system_config={ "max_io_workers": 4, "automatic_object_spilling_enabled": True, "object_store_full_delay_ms": 100, - "object_spilling_config": json.dumps({ - "type": "filesystem", - "params": { - "directory_path": str(temp_folder) - } - }), + "object_spilling_config": object_spilling_config, "min_spilling_size": 0, }) arr = np.random.rand(5 * 1024 * 1024) # 40 MB @@ -110,27 +131,23 @@ def test_spilling_not_done_for_pinned_object(tmp_path, shutdown_only): @pytest.mark.skipif( platform.system() == "Windows", reason="Failing on Windows.") -@pytest.mark.parametrize( - "ray_start_cluster_head", [{ - "num_cpus": 0, - "object_store_memory": 75 * 1024 * 1024, - "_system_config": { +def test_spill_remote_object(ray_start_cluster, + multi_node_object_spilling_config): + cluster = ray_start_cluster + object_spilling_config, _ = 
multi_node_object_spilling_config + cluster.add_node( + num_cpus=0, + object_store_memory=75 * 1024 * 1024, + _system_config={ "automatic_object_spilling_enabled": True, "object_store_full_delay_ms": 100, "max_io_workers": 4, - "object_spilling_config": json.dumps({ - "type": "filesystem", - "params": { - "directory_path": "/tmp" - } - }), + "object_spilling_config": object_spilling_config, "min_spilling_size": 0, - }, - }], - indirect=True) -def test_spill_remote_object(ray_start_cluster_head): - cluster = ray_start_cluster_head + }) + ray.init(address=cluster.address) cluster.add_node(object_store_memory=75 * 1024 * 1024) + cluster.wait_for_nodes() @ray.remote def put(): @@ -162,6 +179,7 @@ def test_spill_remote_object(ray_start_cluster_head): platform.system() == "Windows", reason="Failing on Windows.") def test_spill_objects_automatically(object_spilling_config, shutdown_only): # Limit our object store to 75 MiB of memory. + object_spilling_config, _ = object_spilling_config ray.init( num_cpus=1, object_store_memory=75 * 1024 * 1024, @@ -197,10 +215,9 @@ def test_spill_objects_automatically(object_spilling_config, shutdown_only): @pytest.mark.skipif( platform.system() == "Windows", reason="Failing on Windows.") -def test_spill_stats(tmp_path, shutdown_only): +def test_spill_stats(object_spilling_config, shutdown_only): # Limit our object store to 75 MiB of memory. 
- temp_folder = tmp_path / "spill" - temp_folder.mkdir() + object_spilling_config, _ = object_spilling_config ray.init( num_cpus=1, object_store_memory=100 * 1024 * 1024, @@ -208,14 +225,7 @@ def test_spill_stats(tmp_path, shutdown_only): "automatic_object_spilling_enabled": True, "max_io_workers": 100, "min_spilling_size": 1, - "object_spilling_config": json.dumps( - { - "type": "filesystem", - "params": { - "directory_path": str(temp_folder) - } - }, - separators=(",", ":")) + "object_spilling_config": object_spilling_config }, ) @@ -242,6 +252,7 @@ def test_spill_stats(tmp_path, shutdown_only): @pytest.mark.skipif( platform.system() == "Windows", reason="Failing on Windows.") def test_spill_during_get(object_spilling_config, shutdown_only): + object_spilling_config, _ = object_spilling_config ray.init( num_cpus=4, object_store_memory=100 * 1024 * 1024, @@ -273,6 +284,7 @@ def test_spill_during_get(object_spilling_config, shutdown_only): @pytest.mark.skipif( platform.system() == "Windows", reason="Failing on Windows.") def test_spill_deadlock(object_spilling_config, shutdown_only): + object_spilling_config, _ = object_spilling_config # Limit our object store to 75 MiB of memory. ray.init( object_store_memory=75 * 1024 * 1024, @@ -302,10 +314,9 @@ def test_spill_deadlock(object_spilling_config, shutdown_only): @pytest.mark.skipif( platform.system() == "Windows", reason="Failing on Windows.") -def test_delete_objects(tmp_path, shutdown_only): +def test_delete_objects(object_spilling_config, shutdown_only): # Limit our object store to 75 MiB of memory. 
- temp_folder = tmp_path / "spill" - temp_folder.mkdir() + object_spilling_config, temp_folder = object_spilling_config ray.init( object_store_memory=75 * 1024 * 1024, _system_config={ @@ -313,12 +324,7 @@ def test_delete_objects(tmp_path, shutdown_only): "min_spilling_size": 0, "automatic_object_spilling_enabled": True, "object_store_full_delay_ms": 100, - "object_spilling_config": json.dumps({ - "type": "filesystem", - "params": { - "directory_path": str(temp_folder) - } - }), + "object_spilling_config": object_spilling_config, }) arr = np.random.rand(1024 * 1024) # 8 MB data replay_buffer = [] @@ -343,13 +349,11 @@ def test_delete_objects(tmp_path, shutdown_only): @pytest.mark.skipif( - platform.system() in ["Windows", "Darwin"], - reason="Failing on " - "Windows and Mac.") -def test_delete_objects_delete_while_creating(tmp_path, shutdown_only): + platform.system() in ["Windows", "Darwin"], reason="Failing on Windows.") +def test_delete_objects_delete_while_creating(object_spilling_config, + shutdown_only): # Limit our object store to 75 MiB of memory. 
- temp_folder = tmp_path / "spill" - temp_folder.mkdir() + object_spilling_config, temp_folder = object_spilling_config ray.init( object_store_memory=75 * 1024 * 1024, _system_config={ @@ -357,12 +361,7 @@ def test_delete_objects_delete_while_creating(tmp_path, shutdown_only): "min_spilling_size": 0, "automatic_object_spilling_enabled": True, "object_store_full_delay_ms": 100, - "object_spilling_config": json.dumps({ - "type": "filesystem", - "params": { - "directory_path": str(temp_folder) - } - }), + "object_spilling_config": object_spilling_config, }) arr = np.random.rand(1024 * 1024) # 8 MB data replay_buffer = [] @@ -395,25 +394,18 @@ def test_delete_objects_delete_while_creating(tmp_path, shutdown_only): @pytest.mark.skipif( - platform.system() in ["Windows", "Darwin"], - reason="Failing on Windows " - "and Mac.") -def test_delete_objects_on_worker_failure(tmp_path, shutdown_only): + platform.system() in ["Windows", "Darwin"], reason="Failing on Windows.") +def test_delete_objects_on_worker_failure(object_spilling_config, + shutdown_only): # Limit our object store to 75 MiB of memory. - temp_folder = tmp_path / "spill" - temp_folder.mkdir() + object_spilling_config, temp_folder = object_spilling_config ray.init( object_store_memory=75 * 1024 * 1024, _system_config={ "max_io_workers": 4, "automatic_object_spilling_enabled": True, "object_store_full_delay_ms": 100, - "object_spilling_config": json.dumps({ - "type": "filesystem", - "params": { - "directory_path": str(temp_folder) - } - }), + "object_spilling_config": object_spilling_config, "min_spilling_size": 0, }) @@ -469,10 +461,10 @@ def test_delete_objects_on_worker_failure(tmp_path, shutdown_only): @pytest.mark.skipif( platform.system() == "Windows", reason="Failing on Windows.") -def test_delete_objects_multi_node(tmp_path, ray_start_cluster): +def test_delete_objects_multi_node(multi_node_object_spilling_config, + ray_start_cluster): # Limit our object store to 75 MiB of memory. 
- temp_folder = tmp_path / "spill" - temp_folder.mkdir() + object_spilling_config, temp_folder = multi_node_object_spilling_config cluster = ray_start_cluster # Head node. cluster.add_node( @@ -483,12 +475,7 @@ def test_delete_objects_multi_node(tmp_path, ray_start_cluster): "min_spilling_size": 20 * 1024 * 1024, "automatic_object_spilling_enabled": True, "object_store_full_delay_ms": 100, - "object_spilling_config": json.dumps({ - "type": "filesystem", - "params": { - "directory_path": str(temp_folder) - } - }), + "object_spilling_config": object_spilling_config, }) # Add 2 worker nodes. for _ in range(2): @@ -546,10 +533,9 @@ def test_delete_objects_multi_node(tmp_path, ray_start_cluster): @pytest.mark.skipif(platform.system() == "Windows", reason="Flaky on Windows.") -def test_fusion_objects(tmp_path, shutdown_only): +def test_fusion_objects(object_spilling_config, shutdown_only): # Limit our object store to 75 MiB of memory. - temp_folder = tmp_path / "spill" - temp_folder.mkdir() + object_spilling_config, temp_folder = object_spilling_config min_spilling_size = 10 * 1024 * 1024 ray.init( object_store_memory=75 * 1024 * 1024, @@ -557,12 +543,7 @@ def test_fusion_objects(tmp_path, shutdown_only): "max_io_workers": 3, "automatic_object_spilling_enabled": True, "object_store_full_delay_ms": 100, - "object_spilling_config": json.dumps({ - "type": "filesystem", - "params": { - "directory_path": str(temp_folder) - } - }), + "object_spilling_config": object_spilling_config, "min_spilling_size": min_spilling_size, }) replay_buffer = [] @@ -600,8 +581,8 @@ def test_fusion_objects(tmp_path, shutdown_only): # https://github.com/ray-project/ray/issues/12912 -def do_test_release_resource(tmp_path, expect_released): - temp_folder = tmp_path / "spill" +def do_test_release_resource(object_spilling_config, expect_released): + object_spilling_config, temp_folder = object_spilling_config ray.init( num_cpus=1, object_store_memory=75 * 1024 * 1024, @@ -609,12 +590,7 @@ def 
do_test_release_resource(tmp_path, expect_released): "max_io_workers": 1, "release_resources_during_plasma_fetch": expect_released, "automatic_object_spilling_enabled": True, - "object_spilling_config": json.dumps({ - "type": "filesystem", - "params": { - "directory_path": str(temp_folder) - } - }), + "object_spilling_config": object_spilling_config, }) plasma_obj = ray.put(np.ones(50 * 1024 * 1024, dtype=np.uint8)) for _ in range(5): @@ -643,14 +619,14 @@ def do_test_release_resource(tmp_path, expect_released): @pytest.mark.skipif( platform.system() == "Windows", reason="Failing on Windows.") -def test_no_release_during_plasma_fetch(tmp_path, shutdown_only): - do_test_release_resource(tmp_path, expect_released=False) +def test_no_release_during_plasma_fetch(object_spilling_config, shutdown_only): + do_test_release_resource(object_spilling_config, expect_released=False) @pytest.mark.skipif( platform.system() == "Windows", reason="Failing on Windows.") -def test_release_during_plasma_fetch(tmp_path, shutdown_only): - do_test_release_resource(tmp_path, expect_released=True) +def test_release_during_plasma_fetch(object_spilling_config, shutdown_only): + do_test_release_resource(object_spilling_config, expect_released=True) @pytest.mark.skip( @@ -661,6 +637,7 @@ def test_release_during_plasma_fetch(tmp_path, shutdown_only): @pytest.mark.timeout(30) def test_spill_objects_on_object_transfer(object_spilling_config, ray_start_cluster): + object_spilling_config, _ = object_spilling_config # This test checks that objects get spilled to make room for transferred # objects. cluster = ray_start_cluster diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index cfbc62517..d06a1c358 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -361,6 +361,10 @@ RAY_CONFIG(bool, automatic_object_deletion_enabled, true) /// Grace period until we throw the OOM error to the application in seconds. 
RAY_CONFIG(int64_t, oom_grace_period_s, 10) +/// Whether or not the external storage is file system. +/// This is configured based on object_spilling_config. +RAY_CONFIG(bool, is_external_storage_type_fs, true) + /* Configuration parameters for locality-aware scheduling. */ /// Whether to enable locality-aware leasing. If enabled, then Ray will consider task /// dependency locality when choosing a worker for leasing. diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index ab0704bca..3bc700202 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -303,10 +303,12 @@ class ObjectInfoAccessor { /// /// \param object_id The ID of object which location will be added to GCS. /// \param spilled_url The URL where the object has been spilled. + /// \param spilled_node_id The NodeID where the object has been spilled. /// \param callback Callback that will be called after object has been added to GCS. /// \return Status virtual Status AsyncAddSpilledUrl(const ObjectID &object_id, const std::string &spilled_url, + const NodeID &spilled_node_id, const StatusCallback &callback) = 0; /// Remove location of object from GCS asynchronously. 
diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index dfa192320..821e0f7d9 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -1102,13 +1102,14 @@ Status ServiceBasedObjectInfoAccessor::AsyncAddLocation(const ObjectID &object_i Status ServiceBasedObjectInfoAccessor::AsyncAddSpilledUrl( const ObjectID &object_id, const std::string &spilled_url, - const StatusCallback &callback) { + const NodeID &spilled_node_id, const StatusCallback &callback) { RAY_LOG(DEBUG) << "Adding object spilled location, object id = " << object_id << ", spilled_url = " << spilled_url << ", job id = " << object_id.TaskId().JobId(); rpc::AddObjectLocationRequest request; request.set_object_id(object_id.Binary()); request.set_spilled_url(spilled_url); + request.set_spilled_node_id(spilled_node_id.Binary()); auto operation = [this, request, callback](const SequencerDoneCallback &done_callback) { client_impl_->GetGcsRpcClient().AddObjectLocation( @@ -1179,6 +1180,7 @@ Status ServiceBasedObjectInfoAccessor::AsyncSubscribeToLocations( if (!result->spilled_url().empty()) { rpc::ObjectLocationChange update; update.set_spilled_url(result->spilled_url()); + update.set_spilled_node_id(result->spilled_node_id()); update.set_size(result->size()); notification.push_back(update); } diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index 2d362976d..149fa6d2e 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -326,6 +326,7 @@ class ServiceBasedObjectInfoAccessor : public ObjectInfoAccessor { size_t object_size, const StatusCallback &callback) override; Status AsyncAddSpilledUrl(const ObjectID &object_id, const std::string &spilled_url, + const NodeID &node_id, const StatusCallback &callback) override; Status AsyncRemoveLocation(const ObjectID 
&object_id, const NodeID &node_id, diff --git a/src/ray/gcs/gcs_server/gcs_object_manager.cc b/src/ray/gcs/gcs_server/gcs_object_manager.cc index 73971ed7f..818904d65 100644 --- a/src/ray/gcs/gcs_server/gcs_object_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_object_manager.cc @@ -66,6 +66,7 @@ void GcsObjectManager::HandleAddObjectLocation( NodeID node_id; std::string spilled_url; + NodeID spilled_node_id; if (!request.node_id().empty()) { node_id = NodeID::FromBinary(request.node_id()); RAY_LOG(DEBUG) << "Adding object location, job id = " << object_id.TaskId().JobId() @@ -75,12 +76,14 @@ void GcsObjectManager::HandleAddObjectLocation( absl::MutexLock lock(&mutex_); RAY_CHECK(!request.spilled_url().empty()); spilled_url = request.spilled_url(); + spilled_node_id = NodeID::FromBinary(request.spilled_node_id()); object_to_locations_[object_id].spilled_url = spilled_url; + object_to_locations_[object_id].spilled_node_id = spilled_node_id; RAY_LOG(DEBUG) << "Adding object spilled location, object id = " << object_id; } size_t size = request.size(); - auto on_done = [this, object_id, node_id, spilled_url, size, reply, + auto on_done = [this, object_id, node_id, spilled_url, size, spilled_node_id, reply, send_reply_callback](const Status &status) { if (status.ok()) { rpc::ObjectLocationChange notification; @@ -90,6 +93,7 @@ void GcsObjectManager::HandleAddObjectLocation( } if (!spilled_url.empty()) { notification.set_spilled_url(spilled_url); + notification.set_spilled_node_id(spilled_node_id.Binary()); } notification.set_size(size); RAY_CHECK_OK(gcs_pub_sub_->Publish(OBJECT_CHANNEL, object_id.Hex(), @@ -97,7 +101,8 @@ void GcsObjectManager::HandleAddObjectLocation( RAY_LOG(DEBUG) << "Finished adding object location, job id = " << object_id.TaskId().JobId() << ", object id = " << object_id << ", node id = " << node_id << ", task id = " << object_id.TaskId() - << ", spilled_url = " << spilled_url; + << ", spilled_url = " << spilled_url + << ", spilled_node_id = " << 
spilled_node_id; } else { RAY_LOG(ERROR) << "Failed to add object location: " << status.ToString() << ", job id = " << object_id.TaskId().JobId() @@ -291,6 +296,7 @@ const ObjectLocationInfo GcsObjectManager::GenObjectLocationInfo( object_data.add_locations()->set_manager(node_id.Binary()); } object_data.set_spilled_url(it->second.spilled_url); + object_data.set_spilled_node_id(it->second.spilled_node_id.Binary()); object_data.set_size(it->second.object_size); } return object_data; diff --git a/src/ray/gcs/gcs_server/gcs_object_manager.h b/src/ray/gcs/gcs_server/gcs_object_manager.h index 2afff0816..6d4d39598 100644 --- a/src/ray/gcs/gcs_server/gcs_object_manager.h +++ b/src/ray/gcs/gcs_server/gcs_object_manager.h @@ -65,6 +65,7 @@ class GcsObjectManager : public rpc::ObjectInfoHandler { struct LocationSet { absl::flat_hash_set locations; std::string spilled_url = ""; + NodeID spilled_node_id = NodeID::Nil(); size_t object_size = 0; }; diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h index 8bd369417..c76849108 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h @@ -193,7 +193,7 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler { void OnPlacementGroupCreationSuccess( const std::shared_ptr &placement_group); - /// TODO-SANG Fill it up. + /// Remove the placement group of a given id. void RemovePlacementGroup(const PlacementGroupID &placement_group_id, StatusCallback on_placement_group_removed); diff --git a/src/ray/object_manager/common.h b/src/ray/object_manager/common.h index 9c71e2c2b..3cda75266 100644 --- a/src/ray/object_manager/common.h +++ b/src/ray/object_manager/common.h @@ -17,7 +17,8 @@ using SpillObjectsCallback = std::function; using SpaceReleasedCallback = std::function; /// A callback to call when a spilled object needs to be returned to the object store. 
-using RestoreSpilledObjectCallback = std::function)>; +using RestoreSpilledObjectCallback = + std::function)>; } // namespace ray diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index 4b6a44e6b..726a6fefc 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -59,7 +59,10 @@ std::pair ObjectBufferPool::Ge plasma::ObjectBuffer object_buffer; RAY_CHECK_OK(store_client_.Get(&object_id, 1, 0, &object_buffer)); if (object_buffer.data == nullptr) { - RAY_LOG(ERROR) << "Failed to get object"; + RAY_LOG(INFO) + << "Failed to get a chunk of the object: " << object_id + << ". It is mostly because the object is already evicted or spilled when the " + "pull request is received. The caller will retry the pull request again."; return std::pair( errored_chunk_, ray::Status::IOError("Unable to obtain object chunk, object not local.")); diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index ccfda7f5a..27e6f42b0 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -32,7 +32,7 @@ using ray::rpc::ObjectTableData; bool UpdateObjectLocations(const std::vector &location_updates, std::shared_ptr gcs_client, std::unordered_set *node_ids, std::string *spilled_url, - size_t *object_size) { + NodeID *spilled_node_id, size_t *object_size) { // location_updates contains the updates of locations of the object. // with GcsChangeMode, we can determine whether the update mode is // addition or deletion. 
@@ -57,9 +57,12 @@ bool UpdateObjectLocations(const std::vector &locatio } } else { RAY_CHECK(!update.spilled_url().empty()); - RAY_LOG(DEBUG) << "Received object spilled at " << update.spilled_url(); + const auto received_spilled_node_id = NodeID::FromBinary(update.spilled_node_id()); + RAY_LOG(DEBUG) << "Received object spilled at " << update.spilled_url() + << " spilled at " << NodeID::FromBinary(update.spilled_node_id()); if (update.spilled_url() != *spilled_url) { *spilled_url = update.spilled_url(); + *spilled_node_id = received_spilled_node_id; isUpdated = true; } } @@ -128,14 +131,17 @@ void ObjectDirectory::HandleNodeRemoved(const NodeID &node_id) { // If the subscribed object has the removed node as a location, update // its locations with an empty update so that the location will be removed. UpdateObjectLocations({}, gcs_client_, &listener.second.current_object_locations, - &listener.second.spilled_url, &listener.second.object_size); + &listener.second.spilled_url, + &listener.second.spilled_node_id, + &listener.second.object_size); // Re-call all the subscribed callbacks for the object, since its // locations have changed. for (const auto &callback_pair : listener.second.callbacks) { // It is safe to call the callback directly since this is already running // in the subscription callback stack. callback_pair.second(object_id, listener.second.current_object_locations, - listener.second.spilled_url, listener.second.object_size); + listener.second.spilled_url, listener.second.spilled_node_id, + listener.second.object_size); } } } @@ -162,11 +168,11 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i // Once this flag is set to true, it should never go back to false. it->second.subscribed = true; - // Update entries for this object. 
if (!UpdateObjectLocations(object_notifications, gcs_client_, &it->second.current_object_locations, - &it->second.spilled_url, &it->second.object_size)) { + &it->second.spilled_url, &it->second.spilled_node_id, + &it->second.object_size)) { return; } // Copy the callbacks so that the callbacks can unsubscribe without interrupting @@ -180,7 +186,8 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i // It is safe to call the callback directly since this is already running // in the subscription callback stack. callback_pair.second(object_id, it->second.current_object_locations, - it->second.spilled_url, it->second.object_size); + it->second.spilled_url, it->second.spilled_node_id, + it->second.object_size); } }; status = gcs_client_->Objects().AsyncSubscribeToLocations( @@ -198,10 +205,12 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i if (listener_state.subscribed) { auto &locations = listener_state.current_object_locations; auto &spilled_url = listener_state.spilled_url; + auto &spilled_node_id = listener_state.spilled_node_id; auto object_size = it->second.object_size; - io_service_.post([callback, locations, spilled_url, object_size, object_id]() { - callback(object_id, locations, spilled_url, object_size); - }); + io_service_.post( + [callback, locations, spilled_url, object_size, object_id, spilled_node_id]() { + callback(object_id, locations, spilled_url, spilled_node_id, object_size); + }); } return status; } @@ -233,10 +242,12 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id, // cached locations. 
auto &locations = it->second.current_object_locations; auto &spilled_url = it->second.spilled_url; + auto &spilled_node_id = it->second.spilled_node_id; auto object_size = it->second.object_size; - io_service_.post([callback, object_id, spilled_url, locations, object_size]() { - callback(object_id, locations, spilled_url, object_size); - }); + io_service_.post( + [callback, object_id, spilled_url, locations, object_size, spilled_node_id]() { + callback(object_id, locations, spilled_url, spilled_node_id, object_size); + }); } else { // We do not have any locations cached due to a concurrent // SubscribeObjectLocations call, so look up the object's locations @@ -258,17 +269,19 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id, if (!update->spilled_url().empty()) { rpc::ObjectLocationChange change; change.set_spilled_url(update->spilled_url()); + change.set_spilled_node_id(update->spilled_node_id()); notification.push_back(change); } std::unordered_set node_ids; std::string spilled_url; + NodeID spilled_node_id; size_t object_size = 0; UpdateObjectLocations(notification, gcs_client_, &node_ids, &spilled_url, - &object_size); + &spilled_node_id, &object_size); // It is safe to call the callback directly since this is already running // in the GCS client's lookup callback stack. - callback(object_id, node_ids, spilled_url, object_size); + callback(object_id, node_ids, spilled_url, spilled_node_id, object_size); }); } return status; diff --git a/src/ray/object_manager/object_directory.h b/src/ray/object_manager/object_directory.h index 8f06888ae..0a4c6300a 100644 --- a/src/ray/object_manager/object_directory.h +++ b/src/ray/object_manager/object_directory.h @@ -41,9 +41,9 @@ struct RemoteConnectionInfo { }; /// Callback for object location notifications. 
-using OnLocationsFound = std::function &, - const std::string &, size_t object_size)>; +using OnLocationsFound = std::function &, + const std::string &, const NodeID &, size_t object_size)>; class ObjectDirectoryInterface { public: @@ -185,6 +185,9 @@ class ObjectDirectory : public ObjectDirectoryInterface { std::unordered_set current_object_locations; /// The location where this object has been spilled, if any. std::string spilled_url = ""; + // The node id that spills the object to the disk. + // It will be Nil if it uses a distributed external storage. + NodeID spilled_node_id = NodeID::Nil(); /// The size of the object. size_t object_size = 0; /// This flag will get set to true if received any notification of the object. diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 467ea2567..ddd71c766 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -220,8 +220,10 @@ uint64_t ObjectManager::Pull(const std::vector &object_ref const auto &callback = [this](const ObjectID &object_id, const std::unordered_set &client_ids, - const std::string &spilled_url, size_t object_size) { - pull_manager_->OnLocationChange(object_id, client_ids, spilled_url, object_size); + const std::string &spilled_url, + const NodeID &spilled_node_id, size_t object_size) { + pull_manager_->OnLocationChange(object_id, client_ids, spilled_url, spilled_node_id, + object_size); }; for (const auto &ref : objects_to_locate) { @@ -513,7 +515,8 @@ ray::Status ObjectManager::LookupRemainingWaitObjects(const UniqueID &wait_id) { object_id, wait_state.owner_addresses[object_id], [this, wait_id](const ObjectID &lookup_object_id, const std::unordered_set &node_ids, - const std::string &spilled_url, size_t object_size) { + const std::string &spilled_url, const NodeID &spilled_node_id, + size_t object_size) { auto &wait_state = active_wait_requests_.find(wait_id)->second; // Note that the object is guaranteed 
to be added to local_objects_ before // the notification is triggered. @@ -554,7 +557,8 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) { wait_id, object_id, wait_state.owner_addresses[object_id], [this, wait_id](const ObjectID &subscribe_object_id, const std::unordered_set &node_ids, - const std::string &spilled_url, size_t object_size) { + const std::string &spilled_url, const NodeID &spilled_node_id, + size_t object_size) { auto object_id_wait_state = active_wait_requests_.find(wait_id); if (object_id_wait_state == active_wait_requests_.end()) { // Depending on the timing of calls to the object directory, we diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index a114f16bc..000730122 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -106,8 +106,9 @@ class ObjectManagerInterface { class ObjectManager : public ObjectManagerInterface, public rpc::ObjectManagerServiceHandler { public: - using RestoreSpilledObjectCallback = std::function)>; + using RestoreSpilledObjectCallback = + std::function)>; /// Implementation of object manager service diff --git a/src/ray/object_manager/ownership_based_object_directory.cc b/src/ray/object_manager/ownership_based_object_directory.cc index efc37b3e8..a17d3dfc6 100644 --- a/src/ray/object_manager/ownership_based_object_directory.cc +++ b/src/ray/object_manager/ownership_based_object_directory.cc @@ -146,7 +146,7 @@ void OwnershipBasedObjectDirectory::SubscriptionCallback( // It is safe to call the callback directly since this is already running // in the subscription callback stack. callback_pair.second(object_id, it->second.current_object_locations, "", - it->second.object_size); + NodeID::Nil(), it->second.object_size); } } @@ -213,7 +213,7 @@ ray::Status OwnershipBasedObjectDirectory::LookupLocations( RAY_LOG(WARNING) << "Object " << object_id << " does not have owner. 
" << "LookupLocations returns an empty list of locations."; io_service_.post([callback, object_id]() { - callback(object_id, std::unordered_set(), "", 0); + callback(object_id, std::unordered_set(), "", NodeID::Nil(), 0); }); return Status::OK(); } @@ -234,7 +234,7 @@ ray::Status OwnershipBasedObjectDirectory::LookupLocations( node_ids.emplace(NodeID::FromBinary(node_id)); } FilterRemovedNodes(gcs_client_, &node_ids); - callback(object_id, node_ids, "", reply.object_size()); + callback(object_id, node_ids, "", NodeID::Nil(), reply.object_size()); }); return Status::OK(); } diff --git a/src/ray/object_manager/pull_manager.cc b/src/ray/object_manager/pull_manager.cc index 1ebf9214a..302f2f435 100644 --- a/src/ray/object_manager/pull_manager.cc +++ b/src/ray/object_manager/pull_manager.cc @@ -259,7 +259,8 @@ std::vector PullManager::CancelPull(uint64_t request_id) { void PullManager::OnLocationChange(const ObjectID &object_id, const std::unordered_set &client_ids, - const std::string &spilled_url, size_t object_size) { + const std::string &spilled_url, + const NodeID &spilled_node_id, size_t object_size) { // Exit if the Pull request has already been fulfilled or canceled. auto it = object_pull_requests_.find(object_id); if (it == object_pull_requests_.end()) { @@ -271,7 +272,7 @@ void PullManager::OnLocationChange(const ObjectID &object_id, // before. it->second.client_locations = std::vector(client_ids.begin(), client_ids.end()); it->second.spilled_url = spilled_url; - + it->second.spilled_node_id = spilled_node_id; if (!it->second.object_size_set) { RAY_LOG(DEBUG) << "Updated size of object " << object_id << " to " << object_size << ", num bytes being pulled is now " << num_bytes_being_pulled_; @@ -299,30 +300,47 @@ void PullManager::TryToMakeObjectLocal(const ObjectID &object_id) { return; } + // We always pull objects from a remote node before + // restoring it because of two reasons. + // 1. 
This will help reducing the load of external storages + // or remote node that spilled the object. + // 2. Also, if we use multi-node file spilling, the restoration will be + // confirmed by a object location subscription, so we should pull first + // before requesting for object restoration. + bool did_pull = PullFromRandomLocation(object_id); + if (did_pull) { + // New object locations were found, so begin trying to pull from a + // client. + UpdateRetryTimer(request); + return; + } + + // If we cannot pull, it means all objects have been evicted, so try restoring objects + // from the external storage. If the object was spilled on the current node, the + // callback will restore the object from the local the disk. + // Otherwise, it will send a request to a remote node that spilled the object. + // If external storage is a distributed storage, we always try restoring from it without + // sending RPCs. if (!request.spilled_url.empty()) { - // Try to restore the spilled object. + const auto spilled_node_id = request.spilled_node_id; restore_spilled_object_( - object_id, request.spilled_url, [this, object_id](const ray::Status &status) { - bool did_pull = true; - // Fall back to fetching from another object manager. + object_id, request.spilled_url, spilled_node_id, + [this, object_id, spilled_node_id](const ray::Status &status) { if (!status.ok()) { - did_pull = PullFromRandomLocation(object_id); - } - if (!did_pull) { - RAY_LOG(WARNING) << "Object restoration failed and the object could not be " - "found on any other nodes. Object id: " - << object_id; + const auto node_id_with_issue = + spilled_node_id.IsNil() ? self_node_id_ : spilled_node_id; + RAY_LOG(WARNING) + << "Object restoration failed and the object could " + "not be " + "found on any other nodes. This can happen if the location where the " + "object was spilled is unreachable. This job may hang if the object " + "is permanently unreachable. 
" + "Please check the log of node of id: " + << node_id_with_issue << " Object id: " << object_id; } }); - UpdateRetryTimer(request); - } else { - // New object locations were found, so begin trying to pull from a - // client. This will be called every time a new client location - // appears. - bool did_pull = PullFromRandomLocation(object_id); - if (did_pull) { - UpdateRetryTimer(request); - } + // We shouldn't update the timer here because restoration takes some time, and since + // we retry pull requests with exponential backoff, the delay could be large. } } diff --git a/src/ray/object_manager/pull_manager.h b/src/ray/object_manager/pull_manager.h index e4a662eb6..26eba1a35 100644 --- a/src/ray/object_manager/pull_manager.h +++ b/src/ray/object_manager/pull_manager.h @@ -72,9 +72,12 @@ class PullManager { /// necessarily a super or subset of the previously available nodes. /// \param spilled_url The location of the object if it was spilled. If /// non-empty, the object may no longer be on any node. + /// \param spilled_node_id The node id of the object if it was spilled. If Nil, the + /// object may no longer be on any node. void OnLocationChange(const ObjectID &object_id, const std::unordered_set &client_ids, - const std::string &spilled_url, size_t object_size); + const std::string &spilled_url, const NodeID &spilled_node_id, + size_t object_size); /// Cancel an existing pull request. 
/// @@ -108,6 +111,7 @@ class PullManager { bundle_request_ids() {} std::vector client_locations; std::string spilled_url; + NodeID spilled_node_id; double next_pull_time; uint8_t num_retries; bool object_size_set = false; diff --git a/src/ray/object_manager/test/pull_manager_test.cc b/src/ray/object_manager/test/pull_manager_test.cc index 345cc6cea..ecdaa0619 100644 --- a/src/ray/object_manager/test/pull_manager_test.cc +++ b/src/ray/object_manager/test/pull_manager_test.cc @@ -24,7 +24,7 @@ class PullManagerTestWithCapacity { [this](const ObjectID &object_id, const NodeID &node_id) { num_send_pull_request_calls_++; }, - [this](const ObjectID &, const std::string &, + [this](const ObjectID &, const std::string &, const NodeID &, std::function callback) { num_restore_spilled_object_calls_++; restore_object_callback_ = callback; @@ -94,7 +94,7 @@ TEST_F(PullManagerTest, TestStaleSubscription) { ASSERT_EQ(ObjectRefsToIds(objects_to_locate), ObjectRefsToIds(refs)); std::unordered_set client_ids; - pull_manager_.OnLocationChange(oid, client_ids, "", 0); + pull_manager_.OnLocationChange(oid, client_ids, "", NodeID::Nil(), 0); AssertNumActiveRequestsEquals(1); // There are no client ids to pull from. @@ -109,7 +109,7 @@ TEST_F(PullManagerTest, TestStaleSubscription) { AssertNumActiveRequestsEquals(0); client_ids.insert(NodeID::FromRandom()); - pull_manager_.OnLocationChange(oid, client_ids, "", 0); + pull_manager_.OnLocationChange(oid, client_ids, "", NodeID::Nil(), 0); // Now we're getting a notification about an object that was already cancelled. 
ASSERT_EQ(num_send_pull_request_calls_, 0); @@ -128,26 +128,38 @@ TEST_F(PullManagerTest, TestRestoreSpilledObject) { ASSERT_EQ(ObjectRefsToIds(objects_to_locate), ObjectRefsToIds(refs)); std::unordered_set client_ids; - pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", 0); - AssertNumActiveRequestsEquals(1); + pull_manager_.OnLocationChange(obj1, client_ids, "", NodeID::Nil(), 0); // client_ids is empty here, so there's nowhere to pull from. ASSERT_EQ(num_send_pull_request_calls_, 0); - ASSERT_EQ(num_restore_spilled_object_calls_, 1); + ASSERT_EQ(num_restore_spilled_object_calls_, 0); - client_ids.insert(NodeID::FromRandom()); + NodeID node_that_object_spilled = NodeID::FromRandom(); fake_time_ += 10.; - pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", 0); + pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", + node_that_object_spilled, 0); // The behavior is supposed to be to always restore the spilled object if possible (even // if it exists elsewhere in the cluster). ASSERT_EQ(num_send_pull_request_calls_, 0); - ASSERT_EQ(num_restore_spilled_object_calls_, 2); + ASSERT_EQ(num_restore_spilled_object_calls_, 1); + + // The restore object call will ask the remote node to restore the object, and the + // client location is updated accordingly. + client_ids.insert(node_that_object_spilled); + fake_time_ += 10.; + pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", + node_that_object_spilled, 0); + + // Now the pull requests are sent. + ASSERT_EQ(num_send_pull_request_calls_, 1); + ASSERT_EQ(num_restore_spilled_object_calls_, 1); // Don't restore an object if it's local. 
object_is_local_ = true; num_restore_spilled_object_calls_ = 0; - pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", 0); + pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", + NodeID::FromRandom(), 0); ASSERT_EQ(num_restore_spilled_object_calls_, 0); auto objects_to_cancel = pull_manager_.CancelPull(req_id); @@ -164,51 +176,78 @@ TEST_F(PullManagerTest, TestRestoreObjectFailed) { std::vector objects_to_locate; auto req_id = pull_manager_.Pull(refs, &objects_to_locate); ASSERT_EQ(ObjectRefsToIds(objects_to_locate), ObjectRefsToIds(refs)); - std::unordered_set client_ids; - pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", 0); + pull_manager_.OnLocationChange(obj1, client_ids, "", NodeID::Nil(), 0); AssertNumActiveRequestsEquals(1); // client_ids is empty here, so there's nowhere to pull from. + ASSERT_EQ(num_send_pull_request_calls_, 0); + ASSERT_EQ(num_restore_spilled_object_calls_, 0); + + // Object is now spilled to a remote node, but the client_ids are still empty. + const NodeID remote_node_object_spilled = NodeID::FromRandom(); + pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", + remote_node_object_spilled, 0); + ASSERT_EQ(num_send_pull_request_calls_, 0); ASSERT_EQ(num_restore_spilled_object_calls_, 1); restore_object_callback_(ray::Status::IOError(":(")); - // client_ids is empty here, so there's nowhere to pull from. - ASSERT_EQ(num_send_pull_request_calls_, 0); - ASSERT_EQ(num_restore_spilled_object_calls_, 1); - - client_ids.insert(NodeID::FromRandom()); - pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", 0); - - // We always assume the restore succeeded so there's only 1 restore call still. - ASSERT_EQ(num_send_pull_request_calls_, 0); - ASSERT_EQ(num_restore_spilled_object_calls_, 1); - + // Now the restore request has failed, the remote object shouldn't have been properly + // restored. 
fake_time_ += 10.0; - pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", 0); + pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", + remote_node_object_spilled, 0); ASSERT_EQ(num_send_pull_request_calls_, 0); ASSERT_EQ(num_restore_spilled_object_calls_, 2); - restore_object_callback_(ray::Status::IOError(":(")); - - // Since restore failed, we can fallback to pulling from another node immediately. - ASSERT_EQ(num_send_pull_request_calls_, 1); - ASSERT_EQ(num_restore_spilled_object_calls_, 2); - - pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", 0); + restore_object_callback_(ray::Status::OK()); + // Now the remote restoration request succeeds, so we sholud be able to pull the object. + client_ids.insert(remote_node_object_spilled); + // Since it is the second retry, the interval gets doubled. + fake_time_ += 20.0; + pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", + remote_node_object_spilled, 0); // Now that we've successfully sent a pull request, we need to wait for the retry period // before sending another one. ASSERT_EQ(num_send_pull_request_calls_, 1); ASSERT_EQ(num_restore_spilled_object_calls_, 2); - pull_manager_.CancelPull(req_id); + auto objects_to_cancel = pull_manager_.CancelPull(req_id); AssertNoLeaks(); } +TEST_F(PullManagerTest, TestLoadBalancingRestorationRequest) { + /* Make sure when the object copy is in other raylet, we pull object from there instead + * of requesting the owner node to restore the object. 
*/ + + auto refs = CreateObjectRefs(1); + auto obj1 = ObjectRefsToIds(refs)[0]; + rpc::Address addr1; + ASSERT_EQ(pull_manager_.NumActiveRequests(), 0); + std::vector objects_to_locate; + pull_manager_.Pull(refs, &objects_to_locate); + ASSERT_EQ(ObjectRefsToIds(objects_to_locate), ObjectRefsToIds(refs)); + ASSERT_EQ(pull_manager_.NumActiveRequests(), 1); + + std::unordered_set client_ids; + const auto copy_node1 = NodeID::FromRandom(); + const auto copy_node2 = NodeID::FromRandom(); + const auto remote_node_that_spilled_object = NodeID::FromRandom(); + client_ids.insert(copy_node1); + client_ids.insert(copy_node2); + pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", + remote_node_that_spilled_object, 0); + + ASSERT_EQ(num_send_pull_request_calls_, 1); + // Make sure the restore request wasn't sent since there are nodes that have a copied + // object. + ASSERT_EQ(num_restore_spilled_object_calls_, 0); +} + TEST_F(PullManagerTest, TestManyUpdates) { auto refs = CreateObjectRefs(1); auto obj1 = ObjectRefsToIds(refs)[0]; @@ -222,7 +261,7 @@ TEST_F(PullManagerTest, TestManyUpdates) { client_ids.insert(NodeID::FromRandom()); for (int i = 0; i < 100; i++) { - pull_manager_.OnLocationChange(obj1, client_ids, "", 0); + pull_manager_.OnLocationChange(obj1, client_ids, "", NodeID::Nil(), 0); AssertNumActiveRequestsEquals(1); } @@ -250,7 +289,7 @@ TEST_F(PullManagerTest, TestRetryTimer) { // We need to call OnLocationChange at least once, to population the list of nodes with // the object. - pull_manager_.OnLocationChange(obj1, client_ids, "", 0); + pull_manager_.OnLocationChange(obj1, client_ids, "", NodeID::Nil(), 0); AssertNumActiveRequestsEquals(1); ASSERT_EQ(num_send_pull_request_calls_, 1); ASSERT_EQ(num_restore_spilled_object_calls_, 0); @@ -261,7 +300,7 @@ TEST_F(PullManagerTest, TestRetryTimer) { // Location changes can trigger reset timer. for (; fake_time_ <= 120 * 10; fake_time_ += 1.) 
{ - pull_manager_.OnLocationChange(obj1, client_ids, "", 0); + pull_manager_.OnLocationChange(obj1, client_ids, "", NodeID::Nil(), 0); } // We should make a pull request every tick (even if it's a duplicate to a node we're @@ -294,7 +333,7 @@ TEST_F(PullManagerTest, TestBasic) { std::unordered_set client_ids; client_ids.insert(NodeID::FromRandom()); for (size_t i = 0; i < oids.size(); i++) { - pull_manager_.OnLocationChange(oids[i], client_ids, "", 0); + pull_manager_.OnLocationChange(oids[i], client_ids, "", NodeID::Nil(), 0); } ASSERT_EQ(num_send_pull_request_calls_, oids.size()); ASSERT_EQ(num_restore_spilled_object_calls_, 0); @@ -305,7 +344,7 @@ TEST_F(PullManagerTest, TestBasic) { num_send_pull_request_calls_ = 0; fake_time_ += 10; for (size_t i = 0; i < oids.size(); i++) { - pull_manager_.OnLocationChange(oids[i], client_ids, "", 0); + pull_manager_.OnLocationChange(oids[i], client_ids, "", NodeID::Nil(), 0); } ASSERT_EQ(num_send_pull_request_calls_, 0); @@ -318,7 +357,7 @@ TEST_F(PullManagerTest, TestBasic) { num_send_pull_request_calls_ = 0; fake_time_ += 10; for (size_t i = 0; i < oids.size(); i++) { - pull_manager_.OnLocationChange(oids[i], client_ids, "", 0); + pull_manager_.OnLocationChange(oids[i], client_ids, "", NodeID::Nil(), 0); } ASSERT_EQ(num_send_pull_request_calls_, 0); @@ -340,7 +379,7 @@ TEST_F(PullManagerTest, TestDeduplicateBundles) { std::unordered_set client_ids; client_ids.insert(NodeID::FromRandom()); for (size_t i = 0; i < oids.size(); i++) { - pull_manager_.OnLocationChange(oids[i], client_ids, "", 0); + pull_manager_.OnLocationChange(oids[i], client_ids, "", NodeID::Nil(), 0); } ASSERT_EQ(num_send_pull_request_calls_, oids.size()); ASSERT_EQ(num_restore_spilled_object_calls_, 0); @@ -354,7 +393,8 @@ TEST_F(PullManagerTest, TestDeduplicateBundles) { fake_time_ += 10; num_send_pull_request_calls_ = 0; for (size_t i = 0; i < oids.size(); i++) { - pull_manager_.OnLocationChange(oids[i], client_ids, "", 0); + 
pull_manager_.OnLocationChange(oids[i], client_ids, "", NodeID::Nil(), 0); + pull_manager_.OnLocationChange(oids[i], client_ids, "", NodeID::Nil(), 0); ASSERT_EQ(num_send_pull_request_calls_, i + 1); ASSERT_EQ(num_restore_spilled_object_calls_, 0); } @@ -368,7 +408,7 @@ TEST_F(PullManagerTest, TestDeduplicateBundles) { object_is_local_ = false; num_send_pull_request_calls_ = 0; for (size_t i = 0; i < oids.size(); i++) { - pull_manager_.OnLocationChange(oids[i], client_ids, "", 0); + pull_manager_.OnLocationChange(oids[i], client_ids, "", NodeID::Nil(), 0); } ASSERT_EQ(num_send_pull_request_calls_, 0); @@ -390,7 +430,7 @@ TEST_F(PullManagerWithAdmissionControlTest, TestBasic) { std::unordered_set client_ids; client_ids.insert(NodeID::FromRandom()); for (size_t i = 0; i < oids.size(); i++) { - pull_manager_.OnLocationChange(oids[i], client_ids, "", object_size); + pull_manager_.OnLocationChange(oids[i], client_ids, "", NodeID::Nil(), object_size); } ASSERT_EQ(num_send_pull_request_calls_, oids.size()); ASSERT_EQ(num_restore_spilled_object_calls_, 0); @@ -406,7 +446,7 @@ TEST_F(PullManagerWithAdmissionControlTest, TestBasic) { fake_time_ += 10; auto prev_pull_requests = num_send_pull_request_calls_; for (size_t i = 0; i < oids.size(); i++) { - pull_manager_.OnLocationChange(oids[i], client_ids, "", object_size); + pull_manager_.OnLocationChange(oids[i], client_ids, "", NodeID::Nil(), object_size); ASSERT_EQ(num_send_pull_request_calls_, prev_pull_requests); ASSERT_EQ(num_restore_spilled_object_calls_, 0); } @@ -449,7 +489,7 @@ TEST_F(PullManagerWithAdmissionControlTest, TestQueue) { client_ids.insert(NodeID::FromRandom()); for (auto &oids : bundles) { for (size_t i = 0; i < oids.size(); i++) { - pull_manager_.OnLocationChange(oids[i], client_ids, "", object_size); + pull_manager_.OnLocationChange(oids[i], client_ids, "", NodeID::Nil(), object_size); } } @@ -500,7 +540,7 @@ TEST_F(PullManagerWithAdmissionControlTest, TestCancel) { req_ids.push_back(req_id); } for 
(size_t i = 0; i < object_sizes.size(); i++) { - pull_manager_.OnLocationChange(oids[i], {}, "", object_sizes[i]); + pull_manager_.OnLocationChange(oids[i], {}, "", NodeID::Nil(), object_sizes[i]); } AssertNumActiveRequestsEquals(num_active_requests_expected_before); pull_manager_.CancelPull(req_ids[cancel_idx]); @@ -508,14 +548,14 @@ TEST_F(PullManagerWithAdmissionControlTest, TestCancel) { // Request is really canceled. pull_manager_.OnLocationChange(oids[cancel_idx], {NodeID::FromRandom()}, "", - object_sizes[cancel_idx]); + NodeID::Nil(), object_sizes[cancel_idx]); ASSERT_EQ(num_send_pull_request_calls_, 0); // The expected number of requests at the head of the queue are pulled. int num_active = 0; for (size_t i = 0; i < refs.size() && num_active < num_active_requests_expected_after; i++) { - pull_manager_.OnLocationChange(oids[i], {NodeID::FromRandom()}, "", + pull_manager_.OnLocationChange(oids[i], {NodeID::FromRandom()}, "", NodeID::Nil(), object_sizes[i]); if (i != cancel_idx) { num_active++; diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index a332a9081..1e59ae812 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -413,8 +413,11 @@ message ObjectLocationInfo { // For objects that have been spilled to external storage, the URL from which // they can be retrieved. string spilled_url = 3; + // The node id that spills the object to the disk. + // It will be Nil if it uses a distributed external storage. + bytes spilled_node_id = 4; // The size of the object in bytes. - uint64 size = 4; + uint64 size = 5; } // A notification message about one object's locations being changed. @@ -425,8 +428,11 @@ message ObjectLocationChange { // The object has been spilled to this URL. This should be set xor the above // fields are set. string spilled_url = 3; + // The node id that spills the object to the disk. + // It will be Nil if it uses a distributed external storage. 
+ bytes spilled_node_id = 4; // The size of the object in bytes. - uint64 size = 4; + uint64 size = 5; } // A notification message about one node's resources being changed. diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index eda00b806..8922ce6f4 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -272,8 +272,11 @@ message AddObjectLocationRequest { // The spilled URL that will be added to GCS Service. Either this or the node // ID should be set. string spilled_url = 3; + // The node id that spills the object to the disk. + // It will be Nil if it uses a distributed external storage. + bytes spilled_node_id = 4; // The size of the object in bytes. - uint64 size = 4; + uint64 size = 5; } message AddObjectLocationReply { diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index bae2a9715..386ed988a 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -179,6 +179,18 @@ message RequestObjectSpillageReply { bool success = 1; } +message RestoreSpilledObjectRequest { + // ObjectID to restore. + bytes object_id = 1; + // Object URL where the object is spilled. + string object_url = 2; + // The node id of a node where the object is spilled. + bytes spilled_node_id = 3; +} + +message RestoreSpilledObjectReply { +} + message ReleaseUnusedBundlesRequest { repeated Bundle bundles_in_use = 1; } @@ -224,6 +236,9 @@ service NodeManagerService { // Ask the raylet to spill an object to external storage. rpc RequestObjectSpillage(RequestObjectSpillageRequest) returns (RequestObjectSpillageReply); + // Ask the raylet to restore the object from the external storage. + rpc RestoreSpilledObject(RestoreSpilledObjectRequest) + returns (RestoreSpilledObjectReply); // This method is only used by GCS, and the purpose is to release bundles // that may be leaked. 
When GCS restarts, it doesn't know which bundles it has leased // in the previous lifecycle. In this case, GCS will send a list of bundles that diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc index 721adb6bd..9909beb76 100644 --- a/src/ray/raylet/local_object_manager.cc +++ b/src/ray/raylet/local_object_manager.cc @@ -261,11 +261,15 @@ void LocalObjectManager::AddSpilledUrls( const ObjectID &object_id = object_ids[i]; const std::string &object_url = worker_reply.spilled_objects_url(i); RAY_LOG(DEBUG) << "Object " << object_id << " spilled at " << object_url; + // Choose a node id to report. If an external storage type is not a filesystem, we + // don't need to report where this object is spilled. + const auto node_id_object_spilled = + is_external_storage_type_fs_ ? self_node_id_ : NodeID::Nil(); // Write to object directory. Wait for the write to finish before // releasing the object to make sure that the spilled object can // be retrieved by other raylets. RAY_CHECK_OK(object_info_accessor_.AsyncAddSpilledUrl( - object_id, object_url, + object_id, object_url, node_id_object_spilled, [this, object_id, object_url, callback, num_remaining](Status status) { RAY_CHECK_OK(status); // Unpin the object. @@ -298,14 +302,35 @@ void LocalObjectManager::AddSpilledUrls( } void LocalObjectManager::AsyncRestoreSpilledObject( - const ObjectID &object_id, const std::string &object_url, + const ObjectID &object_id, const std::string &object_url, const NodeID &node_id, std::function callback) { - RAY_LOG(DEBUG) << "Restoring spilled object " << object_id << " from URL " - << object_url; if (objects_pending_restore_.count(object_id) > 0) { // If the same object is restoring, we dedup here. return; } + + if (!node_id.IsNil() && node_id != self_node_id_) { + // If we know where this object was spilled, and the current node is not that one, + // send a RPC to a remote node that spilled the object to restore it. 
+ RAY_LOG(DEBUG) << "Send a object restoration request of id: " << object_id + << " to a remote node: " << node_id; + // TODO(sang): We need to deduplicate this remote RPC. Since restore request + // is retried every 10ms without exponential backoff, this can add huge overhead to a + // remote node that spilled the object. + restore_object_from_remote_node_(object_id, object_url, node_id); + if (callback) { + callback(Status::OK()); + } + return; + } + + // Restore the object. + RAY_LOG(DEBUG) << "Restoring spilled object " << object_id << " from URL " + << object_url; + if (!node_id.IsNil()) { + RAY_CHECK(spilled_objects_url_.count(object_id) > 0); + } + RAY_CHECK(objects_pending_restore_.emplace(object_id).second) << "Object dedupe wasn't done properly. Please report if you see this issue."; io_worker_pool_.PopRestoreWorker([this, object_id, object_url, callback]( diff --git a/src/ray/raylet/local_object_manager.h b/src/ray/raylet/local_object_manager.h index 14142f5f9..c4f157d58 100644 --- a/src/ray/raylet/local_object_manager.h +++ b/src/ray/raylet/local_object_manager.h @@ -16,6 +16,8 @@ #include +#include +#include #include #include "ray/common/id.h" @@ -24,6 +26,7 @@ #include "ray/object_manager/common.h" #include "ray/raylet/worker_pool.h" #include "ray/rpc/worker/core_worker_client_pool.h" +#include "ray/util/util.h" #include "src/ray/protobuf/node_manager.pb.h" namespace ray { @@ -35,15 +38,18 @@ namespace raylet { class LocalObjectManager { public: LocalObjectManager( - boost::asio::io_service &io_context, size_t free_objects_batch_size, + const NodeID &node_id, size_t free_objects_batch_size, int64_t free_objects_period_ms, IOWorkerPoolInterface &io_worker_pool, gcs::ObjectInfoAccessor &object_info_accessor, rpc::CoreWorkerClientPool &owner_client_pool, bool object_pinning_enabled, bool automatic_object_deletion_enabled, int max_io_workers, - int64_t min_spilling_size, + int64_t min_spilling_size, bool is_external_storage_type_fs, std::function &)> 
on_objects_freed, - std::function is_plasma_object_spillable) - : free_objects_period_ms_(free_objects_period_ms), + std::function is_plasma_object_spillable, + std::function + restore_object_from_remote_node) + : self_node_id_(node_id), + free_objects_period_ms_(free_objects_period_ms), free_objects_batch_size_(free_objects_batch_size), io_worker_pool_(io_worker_pool), object_info_accessor_(object_info_accessor), @@ -55,7 +61,9 @@ class LocalObjectManager { min_spilling_size_(min_spilling_size), num_active_workers_(0), max_active_workers_(max_io_workers), - is_plasma_object_spillable_(is_plasma_object_spillable) {} + is_plasma_object_spillable_(is_plasma_object_spillable), + restore_object_from_remote_node_(restore_object_from_remote_node), + is_external_storage_type_fs_(is_external_storage_type_fs) {} /// Pin objects. /// @@ -90,10 +98,15 @@ class LocalObjectManager { /// Restore a spilled object from external storage back into local memory. /// /// \param object_id The ID of the object to restore. - /// \param object_url The URL in external storage from which the object can be restored. - /// \param callback A callback to call when the restoration is done. Status - /// will contain the error during restoration, if any. + /// \param object_url The URL where the object is spilled. + /// \param node_id Node id that we try restoring the object. If Nil is provided, the + /// object is restored directly from the external storage. If a node id is provided, it + /// sends a RPC request to a corresponding node if the given node_id is not equivalent + /// to a self node id. + /// \param callback A callback to call when the restoration is done. + /// Status will contain the error during restoration, if any. void AsyncRestoreSpilledObject(const ObjectID &object_id, const std::string &object_url, + const NodeID &node_id, std::function callback); /// Try to clear any objects that have been freed. 
@@ -160,6 +173,8 @@ class LocalObjectManager { /// \param urls_to_delete List of urls to delete from external storages. void DeleteSpilledObjects(std::vector &urls_to_delete); + const NodeID self_node_id_; + /// The period between attempts to eagerly evict objects from plasma. const int64_t free_objects_period_ms_; @@ -247,6 +262,16 @@ class LocalObjectManager { /// Return true if unpinned, meaning we can safely spill the object. False otherwise. std::function is_plasma_object_spillable_; + /// Callback to restore object of object id from a remote node of node id. + std::function + restore_object_from_remote_node_; + + /// Used to decide spilling protocol. + /// If it is "filesystem", it restores spilled objects only from an owner node. + /// If it is not (meaning it is distributed backend), it always restores objects + /// directly from the external storage. + bool is_external_storage_type_fs_; + /// /// Stats /// diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 1b8c50c58..072064f46 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -158,19 +158,29 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self agent_manager_service_(io_service, *agent_manager_service_handler_), client_call_manager_(io_service), worker_rpc_pool_(client_call_manager_), - local_object_manager_(io_service_, RayConfig::instance().free_objects_batch_size(), - RayConfig::instance().free_objects_period_milliseconds(), - worker_pool_, gcs_client_->Objects(), worker_rpc_pool_, - /* object_pinning_enabled */ config.object_pinning_enabled, - /* automatic_object_deletion_enabled */ - config.automatic_object_deletion_enabled, - /*max_io_workers*/ config.max_io_workers, - /*min_spilling_size*/ config.min_spilling_size, - [this](const std::vector &object_ids) { - object_manager_.FreeObjects(object_ids, - /*local_only=*/false); - }, - is_plasma_object_spillable), + local_object_manager_( + self_node_id_, 
RayConfig::instance().free_objects_batch_size(), + RayConfig::instance().free_objects_period_milliseconds(), worker_pool_, + gcs_client_->Objects(), worker_rpc_pool_, + /* object_pinning_enabled */ config.object_pinning_enabled, + /* automatic_object_deletion_enabled */ + config.automatic_object_deletion_enabled, + /*max_io_workers*/ config.max_io_workers, + /*min_spilling_size*/ config.min_spilling_size, + /*is_external_storage_type_fs*/ + RayConfig::instance().is_external_storage_type_fs(), + /*on_objects_freed*/ + [this](const std::vector &object_ids) { + object_manager_.FreeObjects(object_ids, + /*local_only=*/false); + }, + is_plasma_object_spillable, + /*restore_object_from_remote_node*/ + [this](const ObjectID &object_id, const std::string &spilled_url, + const NodeID &node_id) { + SendSpilledObjectRestorationRequestToRemoteNode(object_id, spilled_url, + node_id); + }), report_worker_backlog_(RayConfig::instance().report_worker_backlog()), last_local_gc_ns_(absl::GetCurrentTimeNanos()), local_gc_interval_ns_(RayConfig::instance().local_gc_interval_s() * 1e9), @@ -511,6 +521,24 @@ void NodeManager::HandleRequestObjectSpillage( }); } +void NodeManager::HandleRestoreSpilledObject( + const rpc::RestoreSpilledObjectRequest &request, + rpc::RestoreSpilledObjectReply *reply, rpc::SendReplyCallback send_reply_callback) { + const auto object_id = ObjectID::FromBinary(request.object_id()); + const auto spilled_node_id = NodeID::FromBinary(request.spilled_node_id()); + const auto object_url = request.object_url(); + RAY_CHECK(spilled_node_id == self_node_id_); + RAY_LOG(DEBUG) << "Restore spilled object request received. Object id: " << object_id + << " spilled_node_id: " << self_node_id_ + << " object url: " << object_url; + local_object_manager_.AsyncRestoreSpilledObject(object_id, object_url, spilled_node_id, + nullptr); + // Just reply right away. 
The caller will keep hitting this RPC endpoint until + // restoration succeeds, so we can safely reply here without waiting for the + // restoreSpilledObject to be done. + send_reply_callback(Status::OK(), nullptr, nullptr); +} + void NodeManager::HandleReleaseUnusedBundles( const rpc::ReleaseUnusedBundlesRequest &request, rpc::ReleaseUnusedBundlesReply *reply, rpc::SendReplyCallback send_reply_callback) { @@ -2714,6 +2742,30 @@ void NodeManager::PublishInfeasibleTaskError(const Task &task) const { } } +void NodeManager::SendSpilledObjectRestorationRequestToRemoteNode( + const ObjectID &object_id, const std::string &spilled_url, const NodeID &node_id) { + // Fetch from a remote node. + if (!remote_node_manager_addresses_.contains(node_id)) { + // It is possible the new node information is not received at this point. + // In this case, the PullManager will handle retry, so we just return. + return; + } + const auto &entry = remote_node_manager_addresses_.find(node_id); + // TODO(sang): Use a node manager pool instead. + auto raylet_client = + std::make_shared(rpc::NodeManagerWorkerClient::make( + entry->second.first, entry->second.second, client_call_manager_)); + raylet_client->RestoreSpilledObject( + object_id, spilled_url, node_id, + [](const ray::Status &status, const rpc::RestoreSpilledObjectReply &r) { + if (!status.ok()) { + RAY_LOG(WARNING) << "Failed to send a spilled object restoration request to a " + "remote node. This request will be retried. 
Error message: " + << status.ToString(); + } + }); +} + } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index d626e5246..3a68fcbae 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -28,6 +28,7 @@ #include "ray/common/task/scheduling_resources.h" #include "ray/object_manager/object_manager.h" #include "ray/raylet/agent_manager.h" +#include "ray/raylet_client/raylet_client.h" #include "ray/raylet/local_object_manager.h" #include "ray/raylet/scheduling/scheduling_ids.h" #include "ray/raylet/scheduling/cluster_resource_scheduler.h" @@ -603,6 +604,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler, rpc::RequestObjectSpillageReply *reply, rpc::SendReplyCallback send_reply_callback) override; + /// Handle a `RestoreSpilledObject` request. + void HandleRestoreSpilledObject(const rpc::RestoreSpilledObjectRequest &request, + rpc::RestoreSpilledObjectReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + /// Handle a `ReleaseUnusedBundles` request. void HandleReleaseUnusedBundles(const rpc::ReleaseUnusedBundlesRequest &request, rpc::ReleaseUnusedBundlesReply *reply, @@ -633,6 +639,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// \param task Task that is infeasible void PublishInfeasibleTaskError(const Task &task) const; + /// Send a object restoration request to a remote node of a given node id. 
+ void SendSpilledObjectRestorationRequestToRemoteNode(const ObjectID &object_id, + const std::string &spilled_url, + const NodeID &node_id); + std::unordered_map> MakeTasksByClass( const std::vector &tasks) const; diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc index 6aeec576e..4d9514e62 100644 --- a/src/ray/raylet/raylet.cc +++ b/src/ray/raylet/raylet.cc @@ -72,10 +72,11 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_ std::make_shared(main_service, gcs_client_))), object_manager_( main_service, self_node_id_, object_manager_config, object_directory_, - [this](const ObjectID &object_id, const std::string &spilled_url, + [this](const ObjectID &object_id, const std::string &object_url, + const NodeID &node_id, std::function callback) { node_manager_.GetLocalObjectManager().AsyncRestoreSpilledObject( - object_id, spilled_url, callback); + object_id, object_url, node_id, callback); }, [this]() { // This callback is called from the plasma store thread. diff --git a/src/ray/raylet/reconstruction_policy.cc b/src/ray/raylet/reconstruction_policy.cc index f4fd3d025..1da422529 100644 --- a/src/ray/raylet/reconstruction_policy.cc +++ b/src/ray/raylet/reconstruction_policy.cc @@ -179,7 +179,8 @@ void ReconstructionPolicy::HandleTaskLeaseExpired(const TaskID &task_id) { created_object_id, it->second.owner_addresses[created_object_id], [this, task_id, reconstruction_attempt]( const ray::ObjectID &object_id, const std::unordered_set &nodes, - const std::string &spilled_url, size_t object_size) { + const std::string &spilled_url, const ray::NodeID &spilled_node_id, + size_t object_size) { if (nodes.empty() && spilled_url.empty()) { // The required object no longer exists on any live nodes. Attempt // reconstruction. 
diff --git a/src/ray/raylet/reconstruction_policy_test.cc b/src/ray/raylet/reconstruction_policy_test.cc index 8b5fd9d0e..d4eb387a3 100644 --- a/src/ray/raylet/reconstruction_policy_test.cc +++ b/src/ray/raylet/reconstruction_policy_test.cc @@ -58,9 +58,10 @@ class MockObjectDirectory : public ObjectDirectoryInterface { const ObjectID object_id = callback.first; auto it = locations_.find(object_id); if (it == locations_.end()) { - callback.second(object_id, std::unordered_set(), "", 0); + callback.second(object_id, std::unordered_set(), "", NodeID::Nil(), + 0); } else { - callback.second(object_id, it->second, "", 0); + callback.second(object_id, it->second, "", NodeID::Nil(), 0); } } callbacks_.clear(); diff --git a/src/ray/raylet/test/local_object_manager_test.cc b/src/ray/raylet/test/local_object_manager_test.cc index bbae5bb14..8ff77250f 100644 --- a/src/ray/raylet/test/local_object_manager_test.cc +++ b/src/ray/raylet/test/local_object_manager_test.cc @@ -84,12 +84,16 @@ class MockIOWorkerClient : public rpc::CoreWorkerClientInterface { restore_callbacks.push_back(callback); } - void ReplyRestoreObjects(int64_t bytes_restored, Status status = Status::OK()) { + bool ReplyRestoreObjects(int64_t bytes_restored, Status status = Status::OK()) { rpc::RestoreSpilledObjectsReply reply; reply.set_bytes_restored_total(bytes_restored); + if (restore_callbacks.size() == 0) { + return false; + }; auto callback = restore_callbacks.front(); callback(status, reply); restore_callbacks.pop_front(); + return true; } void DeleteSpilledObjects( @@ -190,6 +194,7 @@ class MockObjectInfoAccessor : public gcs::ObjectInfoAccessor { size_t object_size, const gcs::StatusCallback &callback)); Status AsyncAddSpilledUrl(const ObjectID &object_id, const std::string &spilled_url, + const NodeID &spilled_node_id, const gcs::StatusCallback &callback) { object_urls[object_id] = spilled_url; callbacks.push_back(callback); @@ -252,12 +257,15 @@ class LocalObjectManagerTest : public ::testing::Test 
{ LocalObjectManagerTest() : owner_client(std::make_shared()), client_pool([&](const rpc::Address &addr) { return owner_client; }), - manager(io_service_, free_objects_batch_size, + manager_node_id_(NodeID::FromRandom()), + manager(manager_node_id_, free_objects_batch_size, /*free_objects_period_ms=*/1000, worker_pool, object_table, client_pool, /*object_pinning_enabled=*/true, /*automatic_object_delete_enabled=*/true, /*max_io_workers=*/2, /*min_spilling_size=*/0, + /*is_external_storage_type_fs=*/true, + /*on_objects_freed=*/ [&](const std::vector &object_ids) { for (const auto &object_id : object_ids) { freed.insert(object_id); @@ -266,12 +274,24 @@ class LocalObjectManagerTest : public ::testing::Test { /*is_plasma_object_spillable=*/ [&](const ray::ObjectID &object_id) { return unevictable_objects_.count(object_id) == 0; + }, + /*restore_object_from_remote_node=*/ + [&](const ObjectID &object_id, const std::string spilled_url, + const NodeID &node_id) { + if (remote_node_set_restore_requested_.count(node_id) == 0) { + remote_node_set_restore_requested_.emplace( + node_id, std::unordered_set()); + } + remote_node_set_restore_requested_[node_id].emplace(object_id); }), unpins(std::make_shared>()) { RayConfig::instance().initialize({{"object_spilling_config", "mock_config"}}); } - void TearDown() { unevictable_objects_.clear(); } + void TearDown() { + unevictable_objects_.clear(); + remote_node_set_restore_requested_.clear(); + } std::string BuildURL(const std::string url, int offset = 0, int num_objects = 1) { return url + "?" 
+ "num_objects=" + std::to_string(num_objects) + @@ -284,7 +304,10 @@ class LocalObjectManagerTest : public ::testing::Test { rpc::CoreWorkerClientPool client_pool; MockIOWorkerPool worker_pool; MockObjectInfoAccessor object_table; + NodeID manager_node_id_; LocalObjectManager manager; + std::unordered_map> + remote_node_set_restore_requested_; std::unordered_set freed; // This hashmap is incremented when objects are unpinned by destroying their @@ -323,16 +346,43 @@ TEST_F(LocalObjectManagerTest, TestPin) { } TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) { - ObjectID object_id = ObjectID::FromRandom(); - std::string object_url("url"); + // First, spill objects. + std::vector object_ids; + std::vector> objects; + + for (size_t i = 0; i < free_objects_batch_size; i++) { + ObjectID object_id = ObjectID::FromRandom(); + object_ids.push_back(object_id); + auto data_buffer = std::make_shared(0, object_id, unpins); + std::unique_ptr object( + new RayObject(data_buffer, nullptr, std::vector())); + objects.push_back(std::move(object)); + } + manager.PinObjects(object_ids, std::move(objects)); + + manager.SpillObjects(object_ids, + [&](const Status &status) mutable { ASSERT_TRUE(status.ok()); }); + std::vector urls; + for (size_t i = 0; i < object_ids.size(); i++) { + urls.push_back(BuildURL("url" + std::to_string(i))); + } + ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); + for (size_t i = 0; i < object_ids.size(); i++) { + ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + } + + // Then try restoring objects from local. + ObjectID object_id = object_ids[0]; + const auto url = urls[0]; int num_times_fired = 0; EXPECT_CALL(worker_pool, PushRestoreWorker(_)); // Subsequent calls should be deduped, so that only one callback should be fired. 
for (int i = 0; i < 10; i++) { - manager.AsyncRestoreSpilledObject(object_id, object_url, [&](const Status &status) { - ASSERT_TRUE(status.ok()); - num_times_fired++; - }); + manager.AsyncRestoreSpilledObject(object_id, url, manager_node_id_, + [&](const Status &status) { + ASSERT_TRUE(status.ok()); + num_times_fired++; + }); } ASSERT_EQ(num_times_fired, 0); @@ -342,7 +392,25 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) { ASSERT_EQ(num_times_fired, 0); } worker_pool.io_worker_client->ReplyRestoreObjects(10); + // The restore should've been invoked. ASSERT_EQ(num_times_fired, 1); + + // If the object wasn't spilled on the current node, it should request restoration to + // remote nodes. + ObjectID remote_object_id = ObjectID::FromRandom(); + const auto remote_object_url = BuildURL("remote_url"); + NodeID remote_node_id = NodeID::FromRandom(); + manager.AsyncRestoreSpilledObject(remote_object_id, remote_object_url, remote_node_id, + [&](const Status &status) { + ASSERT_TRUE(status.ok()); + num_times_fired++; + }); + // Make sure the remote call was invoked. 
+ ASSERT_FALSE(worker_pool.io_worker_client->ReplyRestoreObjects(10)); + ASSERT_TRUE(remote_node_set_restore_requested_.count(remote_node_id) > 0); + ASSERT_TRUE(remote_node_set_restore_requested_[remote_node_id].count(remote_object_id) > + 0); + ASSERT_EQ(num_times_fired, 2); } TEST_F(LocalObjectManagerTest, TestExplicitSpill) { diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index 739832b2b..b3177071a 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -311,6 +311,18 @@ void raylet::RayletClient::RequestObjectSpillage( grpc_client_->RequestObjectSpillage(request, callback); } +void raylet::RayletClient::RestoreSpilledObject( + const ObjectID &object_id, const std::string &object_url, + const NodeID &spilled_node_id, + const rpc::ClientCallback &callback) { + RAY_CHECK(!spilled_node_id.IsNil()); + rpc::RestoreSpilledObjectRequest request; + request.set_object_id(object_id.Binary()); + request.set_object_url(object_url); + request.set_spilled_node_id(spilled_node_id.Binary()); + grpc_client_->RestoreSpilledObject(request, callback); +} + Status raylet::RayletClient::ReturnWorker(int worker_port, const WorkerID &worker_id, bool disconnect_worker) { rpc::ReturnWorkerRequest request; diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h index 185ca445a..cf9cfea56 100644 --- a/src/ray/raylet_client/raylet_client.h +++ b/src/ray/raylet_client/raylet_client.h @@ -332,6 +332,15 @@ class RayletClient : public RayletClientInterface { const ObjectID &object_id, const rpc::ClientCallback &callback); + /// Ask the raylet to restore the object of a given id. + /// \param object_id Object id that the remote raylet needs to restore. + /// \param object_url Object URL where the object is spilled. + /// \param spilled_node_id Node id of a node where the object is spilled. 
+ void RestoreSpilledObject( + const ObjectID &object_id, const std::string &object_url, + const NodeID &spilled_node_id, + const rpc::ClientCallback &callback); + /// Implements WorkerLeaseInterface. void RequestWorkerLease( const ray::TaskSpecification &resource_spec, diff --git a/src/ray/rpc/node_manager/node_manager_client.h b/src/ray/rpc/node_manager/node_manager_client.h index 1c9b16c18..81182ab94 100644 --- a/src/ray/rpc/node_manager/node_manager_client.h +++ b/src/ray/rpc/node_manager/node_manager_client.h @@ -100,6 +100,9 @@ class NodeManagerWorkerClient /// Ask the raylet to spill an object to external storage. VOID_RPC_CLIENT_METHOD(NodeManagerService, RequestObjectSpillage, grpc_client_, ) + /// Ask the raylet to restore an object from external storage. + VOID_RPC_CLIENT_METHOD(NodeManagerService, RestoreSpilledObject, grpc_client_, ) + /// Release unused bundles. VOID_RPC_CLIENT_METHOD(NodeManagerService, ReleaseUnusedBundles, grpc_client_, ) diff --git a/src/ray/rpc/node_manager/node_manager_server.h b/src/ray/rpc/node_manager/node_manager_server.h index 08893d49f..7f7691508 100644 --- a/src/ray/rpc/node_manager/node_manager_server.h +++ b/src/ray/rpc/node_manager/node_manager_server.h @@ -36,6 +36,7 @@ namespace rpc { RPC_SERVICE_HANDLER(NodeManagerService, CommitBundleResources) \ RPC_SERVICE_HANDLER(NodeManagerService, CancelResourceReserve) \ RPC_SERVICE_HANDLER(NodeManagerService, RequestObjectSpillage) \ + RPC_SERVICE_HANDLER(NodeManagerService, RestoreSpilledObject) \ RPC_SERVICE_HANDLER(NodeManagerService, ReleaseUnusedBundles) /// Interface of the `NodeManagerService`, see `src/ray/protobuf/node_manager.proto`. 
@@ -102,6 +103,10 @@ class NodeManagerServiceHandler { RequestObjectSpillageReply *reply, SendReplyCallback send_reply_callback) = 0; + virtual void HandleRestoreSpilledObject(const RestoreSpilledObjectRequest &request, + RestoreSpilledObjectReply *reply, + SendReplyCallback send_reply_callback) = 0; + virtual void HandleReleaseUnusedBundles(const ReleaseUnusedBundlesRequest &request, ReleaseUnusedBundlesReply *reply, SendReplyCallback send_reply_callback) = 0;