[Object Spilling] Multi node file spilling V2. (#13542)

* done.

* done.

* Fix a mistake.

* Ready.

* Fix issues.

* fix.

* Finished the first round of code review.

* formatting.

* In progress.

* Formatting.

* Addressed code review.

* Formatting

* Fix tests.

* fix bugs.

* Skip flaky tests for now.
This commit is contained in:
SangBin Cho
2021-01-23 23:15:32 -08:00
committed by GitHub
parent e675e5b75a
commit edbb2937d3
36 changed files with 573 additions and 249 deletions
+4
View File
@@ -345,6 +345,10 @@ def setup_external_storage(config):
elif storage_type == "smart_open":
_external_storage = ExternalStorageSmartOpenImpl(
**config["params"])
elif storage_type == "mock_distributed_fs":
# This storage is used to unit test distributed external storages.
# TODO(sang): Delete it after introducing the mock S3 test.
_external_storage = FileSystemStorage(**config["params"])
else:
raise ValueError(f"Unknown external storage type: {storage_type}")
else:
+3
View File
@@ -330,3 +330,6 @@ class RayParams:
# Validate external storage usage.
external_storage.setup_external_storage(object_spilling_config)
external_storage.reset_external_storage()
# Configure the proper system config.
self._system_config["is_external_storage_type_fs"] = (
object_spilling_config["type"] == "filesystem")
+1 -1
View File
@@ -53,7 +53,6 @@ py_test_module_list(
"test_multinode_failures_2.py",
"test_multiprocessing.py",
"test_object_manager.py",
"test_object_spilling.py",
"test_output.py",
"test_reconstruction.py",
"test_reference_counting.py",
@@ -134,6 +133,7 @@ py_test_module_list(
py_test_module_list(
files = [
"test_placement_group.py",
"test_object_spilling.py",
],
size = "large",
extra_srcs = SRCS,
+80 -103
View File
@@ -21,6 +21,15 @@ file_system_object_spilling_config = {
"directory_path": spill_local_path
}
}
# Since we have different protocols for a local external storage (e.g., fs)
# and a distributed external storage (e.g., S3), we need to test both cases.
# This mocks the distributed fs with cluster utils.
mock_distributed_fs_object_spilling_config = {
"type": "mock_distributed_fs",
"params": {
"directory_path": spill_local_path
}
}
smart_open_object_spilling_config = {
"type": "smart_open",
"params": {
@@ -29,6 +38,15 @@ smart_open_object_spilling_config = {
}
def create_object_spilling_config(request, tmp_path):
    """Build the spilling config handed to a parametrized fixture.

    Args:
        request: The pytest fixture request; ``request.param`` is one of the
            spilling config dicts (with ``"type"`` and ``"params"`` keys).
        tmp_path: Per-test temporary directory.

    Returns:
        A ``(config_json, temp_folder)`` tuple. ``config_json`` is the JSON
        encoded config. ``temp_folder`` is the freshly created ``spill``
        directory for file-system backed types, or ``None`` for any other
        storage type.
    """
    config = request.param
    # Stays None for storage types that do not spill into a local folder, so
    # the return below never touches an unbound name.
    temp_folder = None
    if config["type"] in ("filesystem", "mock_distributed_fs"):
        temp_folder = tmp_path / "spill"
        temp_folder.mkdir()
        # Build a new dict instead of writing into request.param: the param
        # is the shared module-level config object, and mutating it would
        # leak this test's directory into every later use of that config.
        config = {
            **config,
            "params": {
                **config["params"],
                "directory_path": str(temp_folder),
            },
        }
    return json.dumps(config), temp_folder
@pytest.fixture(
scope="function",
params=[
@@ -36,10 +54,18 @@ smart_open_object_spilling_config = {
# TODO(sang): Add a mock dependency to test S3.
# smart_open_object_spilling_config,
])
def object_spilling_config(request, tmpdir):
if request.param["type"] == "filesystem":
request.param["params"]["directory_path"] = str(tmpdir)
yield json.dumps(request.param)
def object_spilling_config(request, tmp_path):
yield create_object_spilling_config(request, tmp_path)
@pytest.fixture(
    scope="function",
    params=[
        file_system_object_spilling_config,
        mock_distributed_fs_object_spilling_config,
    ])
def multi_node_object_spilling_config(request, tmp_path):
    """Yield the spilling config for multi-node tests.

    Parametrized over the file system config and the mocked distributed
    file system config; the yielded value is whatever
    create_object_spilling_config returns for the current parameter.
    """
    spilling_config = create_object_spilling_config(request, tmp_path)
    yield spilling_config
def test_invalid_config_raises_exception(shutdown_only):
@@ -75,22 +101,17 @@ def test_url_generation_and_parse():
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_spilling_not_done_for_pinned_object(tmp_path, shutdown_only):
def test_spilling_not_done_for_pinned_object(object_spilling_config,
shutdown_only):
# Limit our object store to 75 MiB of memory.
temp_folder = tmp_path / "spill"
temp_folder.mkdir()
object_spilling_config, temp_folder = object_spilling_config
ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 4,
"automatic_object_spilling_enabled": True,
"object_store_full_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
"params": {
"directory_path": str(temp_folder)
}
}),
"object_spilling_config": object_spilling_config,
"min_spilling_size": 0,
})
arr = np.random.rand(5 * 1024 * 1024) # 40 MB
@@ -110,27 +131,23 @@ def test_spilling_not_done_for_pinned_object(tmp_path, shutdown_only):
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
@pytest.mark.parametrize(
"ray_start_cluster_head", [{
"num_cpus": 0,
"object_store_memory": 75 * 1024 * 1024,
"_system_config": {
def test_spill_remote_object(ray_start_cluster,
multi_node_object_spilling_config):
cluster = ray_start_cluster
object_spilling_config, _ = multi_node_object_spilling_config
cluster.add_node(
num_cpus=0,
object_store_memory=75 * 1024 * 1024,
_system_config={
"automatic_object_spilling_enabled": True,
"object_store_full_delay_ms": 100,
"max_io_workers": 4,
"object_spilling_config": json.dumps({
"type": "filesystem",
"params": {
"directory_path": "/tmp"
}
}),
"object_spilling_config": object_spilling_config,
"min_spilling_size": 0,
},
}],
indirect=True)
def test_spill_remote_object(ray_start_cluster_head):
cluster = ray_start_cluster_head
})
ray.init(address=cluster.address)
cluster.add_node(object_store_memory=75 * 1024 * 1024)
cluster.wait_for_nodes()
@ray.remote
def put():
@@ -162,6 +179,7 @@ def test_spill_remote_object(ray_start_cluster_head):
platform.system() == "Windows", reason="Failing on Windows.")
def test_spill_objects_automatically(object_spilling_config, shutdown_only):
# Limit our object store to 75 MiB of memory.
object_spilling_config, _ = object_spilling_config
ray.init(
num_cpus=1,
object_store_memory=75 * 1024 * 1024,
@@ -197,10 +215,9 @@ def test_spill_objects_automatically(object_spilling_config, shutdown_only):
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_spill_stats(tmp_path, shutdown_only):
def test_spill_stats(object_spilling_config, shutdown_only):
# Limit our object store to 100 MiB of memory.
temp_folder = tmp_path / "spill"
temp_folder.mkdir()
object_spilling_config, _ = object_spilling_config
ray.init(
num_cpus=1,
object_store_memory=100 * 1024 * 1024,
@@ -208,14 +225,7 @@ def test_spill_stats(tmp_path, shutdown_only):
"automatic_object_spilling_enabled": True,
"max_io_workers": 100,
"min_spilling_size": 1,
"object_spilling_config": json.dumps(
{
"type": "filesystem",
"params": {
"directory_path": str(temp_folder)
}
},
separators=(",", ":"))
"object_spilling_config": object_spilling_config
},
)
@@ -242,6 +252,7 @@ def test_spill_stats(tmp_path, shutdown_only):
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_spill_during_get(object_spilling_config, shutdown_only):
object_spilling_config, _ = object_spilling_config
ray.init(
num_cpus=4,
object_store_memory=100 * 1024 * 1024,
@@ -273,6 +284,7 @@ def test_spill_during_get(object_spilling_config, shutdown_only):
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_spill_deadlock(object_spilling_config, shutdown_only):
object_spilling_config, _ = object_spilling_config
# Limit our object store to 75 MiB of memory.
ray.init(
object_store_memory=75 * 1024 * 1024,
@@ -302,10 +314,9 @@ def test_spill_deadlock(object_spilling_config, shutdown_only):
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_delete_objects(tmp_path, shutdown_only):
def test_delete_objects(object_spilling_config, shutdown_only):
# Limit our object store to 75 MiB of memory.
temp_folder = tmp_path / "spill"
temp_folder.mkdir()
object_spilling_config, temp_folder = object_spilling_config
ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
@@ -313,12 +324,7 @@ def test_delete_objects(tmp_path, shutdown_only):
"min_spilling_size": 0,
"automatic_object_spilling_enabled": True,
"object_store_full_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
"params": {
"directory_path": str(temp_folder)
}
}),
"object_spilling_config": object_spilling_config,
})
arr = np.random.rand(1024 * 1024) # 8 MB data
replay_buffer = []
@@ -343,13 +349,11 @@ def test_delete_objects(tmp_path, shutdown_only):
@pytest.mark.skipif(
platform.system() in ["Windows", "Darwin"],
reason="Failing on "
"Windows and Mac.")
def test_delete_objects_delete_while_creating(tmp_path, shutdown_only):
platform.system() in ["Windows", "Darwin"], reason="Failing on Windows.")
def test_delete_objects_delete_while_creating(object_spilling_config,
shutdown_only):
# Limit our object store to 75 MiB of memory.
temp_folder = tmp_path / "spill"
temp_folder.mkdir()
object_spilling_config, temp_folder = object_spilling_config
ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
@@ -357,12 +361,7 @@ def test_delete_objects_delete_while_creating(tmp_path, shutdown_only):
"min_spilling_size": 0,
"automatic_object_spilling_enabled": True,
"object_store_full_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
"params": {
"directory_path": str(temp_folder)
}
}),
"object_spilling_config": object_spilling_config,
})
arr = np.random.rand(1024 * 1024) # 8 MB data
replay_buffer = []
@@ -395,25 +394,18 @@ def test_delete_objects_delete_while_creating(tmp_path, shutdown_only):
@pytest.mark.skipif(
platform.system() in ["Windows", "Darwin"],
reason="Failing on Windows "
"and Mac.")
def test_delete_objects_on_worker_failure(tmp_path, shutdown_only):
platform.system() in ["Windows", "Darwin"], reason="Failing on Windows.")
def test_delete_objects_on_worker_failure(object_spilling_config,
shutdown_only):
# Limit our object store to 75 MiB of memory.
temp_folder = tmp_path / "spill"
temp_folder.mkdir()
object_spilling_config, temp_folder = object_spilling_config
ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 4,
"automatic_object_spilling_enabled": True,
"object_store_full_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
"params": {
"directory_path": str(temp_folder)
}
}),
"object_spilling_config": object_spilling_config,
"min_spilling_size": 0,
})
@@ -469,10 +461,10 @@ def test_delete_objects_on_worker_failure(tmp_path, shutdown_only):
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_delete_objects_multi_node(tmp_path, ray_start_cluster):
def test_delete_objects_multi_node(multi_node_object_spilling_config,
ray_start_cluster):
# Limit our object store to 75 MiB of memory.
temp_folder = tmp_path / "spill"
temp_folder.mkdir()
object_spilling_config, temp_folder = multi_node_object_spilling_config
cluster = ray_start_cluster
# Head node.
cluster.add_node(
@@ -483,12 +475,7 @@ def test_delete_objects_multi_node(tmp_path, ray_start_cluster):
"min_spilling_size": 20 * 1024 * 1024,
"automatic_object_spilling_enabled": True,
"object_store_full_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
"params": {
"directory_path": str(temp_folder)
}
}),
"object_spilling_config": object_spilling_config,
})
# Add 2 worker nodes.
for _ in range(2):
@@ -546,10 +533,9 @@ def test_delete_objects_multi_node(tmp_path, ray_start_cluster):
@pytest.mark.skipif(platform.system() == "Windows", reason="Flaky on Windows.")
def test_fusion_objects(tmp_path, shutdown_only):
def test_fusion_objects(object_spilling_config, shutdown_only):
# Limit our object store to 75 MiB of memory.
temp_folder = tmp_path / "spill"
temp_folder.mkdir()
object_spilling_config, temp_folder = object_spilling_config
min_spilling_size = 10 * 1024 * 1024
ray.init(
object_store_memory=75 * 1024 * 1024,
@@ -557,12 +543,7 @@ def test_fusion_objects(tmp_path, shutdown_only):
"max_io_workers": 3,
"automatic_object_spilling_enabled": True,
"object_store_full_delay_ms": 100,
"object_spilling_config": json.dumps({
"type": "filesystem",
"params": {
"directory_path": str(temp_folder)
}
}),
"object_spilling_config": object_spilling_config,
"min_spilling_size": min_spilling_size,
})
replay_buffer = []
@@ -600,8 +581,8 @@ def test_fusion_objects(tmp_path, shutdown_only):
# https://github.com/ray-project/ray/issues/12912
def do_test_release_resource(tmp_path, expect_released):
temp_folder = tmp_path / "spill"
def do_test_release_resource(object_spilling_config, expect_released):
object_spilling_config, temp_folder = object_spilling_config
ray.init(
num_cpus=1,
object_store_memory=75 * 1024 * 1024,
@@ -609,12 +590,7 @@ def do_test_release_resource(tmp_path, expect_released):
"max_io_workers": 1,
"release_resources_during_plasma_fetch": expect_released,
"automatic_object_spilling_enabled": True,
"object_spilling_config": json.dumps({
"type": "filesystem",
"params": {
"directory_path": str(temp_folder)
}
}),
"object_spilling_config": object_spilling_config,
})
plasma_obj = ray.put(np.ones(50 * 1024 * 1024, dtype=np.uint8))
for _ in range(5):
@@ -643,14 +619,14 @@ def do_test_release_resource(tmp_path, expect_released):
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_no_release_during_plasma_fetch(tmp_path, shutdown_only):
do_test_release_resource(tmp_path, expect_released=False)
def test_no_release_during_plasma_fetch(object_spilling_config, shutdown_only):
do_test_release_resource(object_spilling_config, expect_released=False)
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_release_during_plasma_fetch(tmp_path, shutdown_only):
do_test_release_resource(tmp_path, expect_released=True)
def test_release_during_plasma_fetch(object_spilling_config, shutdown_only):
do_test_release_resource(object_spilling_config, expect_released=True)
@pytest.mark.skip(
@@ -661,6 +637,7 @@ def test_release_during_plasma_fetch(tmp_path, shutdown_only):
@pytest.mark.timeout(30)
def test_spill_objects_on_object_transfer(object_spilling_config,
ray_start_cluster):
object_spilling_config, _ = object_spilling_config
# This test checks that objects get spilled to make room for transferred
# objects.
cluster = ray_start_cluster
+4
View File
@@ -361,6 +361,10 @@ RAY_CONFIG(bool, automatic_object_deletion_enabled, true)
/// Grace period until we throw the OOM error to the application in seconds.
RAY_CONFIG(int64_t, oom_grace_period_s, 10)
/// Whether or not the external storage is file system.
/// This is configured based on object_spilling_config.
RAY_CONFIG(bool, is_external_storage_type_fs, true)
/* Configuration parameters for locality-aware scheduling. */
/// Whether to enable locality-aware leasing. If enabled, then Ray will consider task
/// dependency locality when choosing a worker for leasing.
+2
View File
@@ -303,10 +303,12 @@ class ObjectInfoAccessor {
///
/// \param object_id The ID of the object whose location will be added to GCS.
/// \param spilled_url The URL where the object has been spilled.
/// \param spilled_node_id The NodeID where the object has been spilled.
/// \param callback Callback that will be called after object has been added to GCS.
/// \return Status
virtual Status AsyncAddSpilledUrl(const ObjectID &object_id,
const std::string &spilled_url,
const NodeID &spilled_node_id,
const StatusCallback &callback) = 0;
/// Remove location of object from GCS asynchronously.
@@ -1102,13 +1102,14 @@ Status ServiceBasedObjectInfoAccessor::AsyncAddLocation(const ObjectID &object_i
Status ServiceBasedObjectInfoAccessor::AsyncAddSpilledUrl(
const ObjectID &object_id, const std::string &spilled_url,
const StatusCallback &callback) {
const NodeID &spilled_node_id, const StatusCallback &callback) {
RAY_LOG(DEBUG) << "Adding object spilled location, object id = " << object_id
<< ", spilled_url = " << spilled_url
<< ", job id = " << object_id.TaskId().JobId();
rpc::AddObjectLocationRequest request;
request.set_object_id(object_id.Binary());
request.set_spilled_url(spilled_url);
request.set_spilled_node_id(spilled_node_id.Binary());
auto operation = [this, request, callback](const SequencerDoneCallback &done_callback) {
client_impl_->GetGcsRpcClient().AddObjectLocation(
@@ -1179,6 +1180,7 @@ Status ServiceBasedObjectInfoAccessor::AsyncSubscribeToLocations(
if (!result->spilled_url().empty()) {
rpc::ObjectLocationChange update;
update.set_spilled_url(result->spilled_url());
update.set_spilled_node_id(result->spilled_node_id());
update.set_size(result->size());
notification.push_back(update);
}
@@ -326,6 +326,7 @@ class ServiceBasedObjectInfoAccessor : public ObjectInfoAccessor {
size_t object_size, const StatusCallback &callback) override;
Status AsyncAddSpilledUrl(const ObjectID &object_id, const std::string &spilled_url,
const NodeID &node_id,
const StatusCallback &callback) override;
Status AsyncRemoveLocation(const ObjectID &object_id, const NodeID &node_id,
+8 -2
View File
@@ -66,6 +66,7 @@ void GcsObjectManager::HandleAddObjectLocation(
NodeID node_id;
std::string spilled_url;
NodeID spilled_node_id;
if (!request.node_id().empty()) {
node_id = NodeID::FromBinary(request.node_id());
RAY_LOG(DEBUG) << "Adding object location, job id = " << object_id.TaskId().JobId()
@@ -75,12 +76,14 @@ void GcsObjectManager::HandleAddObjectLocation(
absl::MutexLock lock(&mutex_);
RAY_CHECK(!request.spilled_url().empty());
spilled_url = request.spilled_url();
spilled_node_id = NodeID::FromBinary(request.spilled_node_id());
object_to_locations_[object_id].spilled_url = spilled_url;
object_to_locations_[object_id].spilled_node_id = spilled_node_id;
RAY_LOG(DEBUG) << "Adding object spilled location, object id = " << object_id;
}
size_t size = request.size();
auto on_done = [this, object_id, node_id, spilled_url, size, reply,
auto on_done = [this, object_id, node_id, spilled_url, size, spilled_node_id, reply,
send_reply_callback](const Status &status) {
if (status.ok()) {
rpc::ObjectLocationChange notification;
@@ -90,6 +93,7 @@ void GcsObjectManager::HandleAddObjectLocation(
}
if (!spilled_url.empty()) {
notification.set_spilled_url(spilled_url);
notification.set_spilled_node_id(spilled_node_id.Binary());
}
notification.set_size(size);
RAY_CHECK_OK(gcs_pub_sub_->Publish(OBJECT_CHANNEL, object_id.Hex(),
@@ -97,7 +101,8 @@ void GcsObjectManager::HandleAddObjectLocation(
RAY_LOG(DEBUG) << "Finished adding object location, job id = "
<< object_id.TaskId().JobId() << ", object id = " << object_id
<< ", node id = " << node_id << ", task id = " << object_id.TaskId()
<< ", spilled_url = " << spilled_url;
<< ", spilled_url = " << spilled_url
<< ", spilled_node_id = " << spilled_node_id;
} else {
RAY_LOG(ERROR) << "Failed to add object location: " << status.ToString()
<< ", job id = " << object_id.TaskId().JobId()
@@ -291,6 +296,7 @@ const ObjectLocationInfo GcsObjectManager::GenObjectLocationInfo(
object_data.add_locations()->set_manager(node_id.Binary());
}
object_data.set_spilled_url(it->second.spilled_url);
object_data.set_spilled_node_id(it->second.spilled_node_id.Binary());
object_data.set_size(it->second.object_size);
}
return object_data;
@@ -65,6 +65,7 @@ class GcsObjectManager : public rpc::ObjectInfoHandler {
/// Location bookkeeping kept for a single object.
struct LocationSet {
  /// IDs of the nodes recorded as locations of the object.
  absl::flat_hash_set<NodeID> locations;
  /// URL the object was spilled to. Empty by default.
  std::string spilled_url{};
  /// ID of the node recorded alongside the spilled URL. Nil by default.
  NodeID spilled_node_id{NodeID::Nil()};
  /// Size of the object. Zero by default.
  size_t object_size{0};
};
@@ -193,7 +193,7 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
void OnPlacementGroupCreationSuccess(
const std::shared_ptr<GcsPlacementGroup> &placement_group);
/// TODO-SANG Fill it up.
/// Remove the placement group of a given id.
void RemovePlacementGroup(const PlacementGroupID &placement_group_id,
StatusCallback on_placement_group_removed);
+3 -2
View File
@@ -17,7 +17,8 @@ using SpillObjectsCallback = std::function<bool()>;
using SpaceReleasedCallback = std::function<void()>;
/// A callback to call when a spilled object needs to be returned to the object store.
using RestoreSpilledObjectCallback = std::function<void(
const ObjectID &, const std::string &, std::function<void(const ray::Status &)>)>;
using RestoreSpilledObjectCallback =
std::function<void(const ObjectID &, const std::string &, const NodeID &,
std::function<void(const ray::Status &)>)>;
} // namespace ray
+4 -1
View File
@@ -59,7 +59,10 @@ std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> ObjectBufferPool::Ge
plasma::ObjectBuffer object_buffer;
RAY_CHECK_OK(store_client_.Get(&object_id, 1, 0, &object_buffer));
if (object_buffer.data == nullptr) {
RAY_LOG(ERROR) << "Failed to get object";
RAY_LOG(INFO)
<< "Failed to get a chunk of the object: " << object_id
<< ". It is mostly because the object is already evicted or spilled when the "
"pull request is received. The caller will retry the pull request again.";
return std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status>(
errored_chunk_,
ray::Status::IOError("Unable to obtain object chunk, object not local."));
+28 -15
View File
@@ -32,7 +32,7 @@ using ray::rpc::ObjectTableData;
bool UpdateObjectLocations(const std::vector<rpc::ObjectLocationChange> &location_updates,
std::shared_ptr<gcs::GcsClient> gcs_client,
std::unordered_set<NodeID> *node_ids, std::string *spilled_url,
size_t *object_size) {
NodeID *spilled_node_id, size_t *object_size) {
// location_updates contains the updates of locations of the object.
// with GcsChangeMode, we can determine whether the update mode is
// addition or deletion.
@@ -57,9 +57,12 @@ bool UpdateObjectLocations(const std::vector<rpc::ObjectLocationChange> &locatio
}
} else {
RAY_CHECK(!update.spilled_url().empty());
RAY_LOG(DEBUG) << "Received object spilled at " << update.spilled_url();
const auto received_spilled_node_id = NodeID::FromBinary(update.spilled_node_id());
RAY_LOG(DEBUG) << "Received object spilled at " << update.spilled_url()
<< " spilled at " << NodeID::FromBinary(update.spilled_node_id());
if (update.spilled_url() != *spilled_url) {
*spilled_url = update.spilled_url();
*spilled_node_id = received_spilled_node_id;
isUpdated = true;
}
}
@@ -128,14 +131,17 @@ void ObjectDirectory::HandleNodeRemoved(const NodeID &node_id) {
// If the subscribed object has the removed node as a location, update
// its locations with an empty update so that the location will be removed.
UpdateObjectLocations({}, gcs_client_, &listener.second.current_object_locations,
&listener.second.spilled_url, &listener.second.object_size);
&listener.second.spilled_url,
&listener.second.spilled_node_id,
&listener.second.object_size);
// Re-call all the subscribed callbacks for the object, since its
// locations have changed.
for (const auto &callback_pair : listener.second.callbacks) {
// It is safe to call the callback directly since this is already running
// in the subscription callback stack.
callback_pair.second(object_id, listener.second.current_object_locations,
listener.second.spilled_url, listener.second.object_size);
listener.second.spilled_url, listener.second.spilled_node_id,
listener.second.object_size);
}
}
}
@@ -162,11 +168,11 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i
// Once this flag is set to true, it should never go back to false.
it->second.subscribed = true;
// Update entries for this object.
if (!UpdateObjectLocations(object_notifications, gcs_client_,
&it->second.current_object_locations,
&it->second.spilled_url, &it->second.object_size)) {
&it->second.spilled_url, &it->second.spilled_node_id,
&it->second.object_size)) {
return;
}
// Copy the callbacks so that the callbacks can unsubscribe without interrupting
@@ -180,7 +186,8 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i
// It is safe to call the callback directly since this is already running
// in the subscription callback stack.
callback_pair.second(object_id, it->second.current_object_locations,
it->second.spilled_url, it->second.object_size);
it->second.spilled_url, it->second.spilled_node_id,
it->second.object_size);
}
};
status = gcs_client_->Objects().AsyncSubscribeToLocations(
@@ -198,10 +205,12 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i
if (listener_state.subscribed) {
auto &locations = listener_state.current_object_locations;
auto &spilled_url = listener_state.spilled_url;
auto &spilled_node_id = listener_state.spilled_node_id;
auto object_size = it->second.object_size;
io_service_.post([callback, locations, spilled_url, object_size, object_id]() {
callback(object_id, locations, spilled_url, object_size);
});
io_service_.post(
[callback, locations, spilled_url, object_size, object_id, spilled_node_id]() {
callback(object_id, locations, spilled_url, spilled_node_id, object_size);
});
}
return status;
}
@@ -233,10 +242,12 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id,
// cached locations.
auto &locations = it->second.current_object_locations;
auto &spilled_url = it->second.spilled_url;
auto &spilled_node_id = it->second.spilled_node_id;
auto object_size = it->second.object_size;
io_service_.post([callback, object_id, spilled_url, locations, object_size]() {
callback(object_id, locations, spilled_url, object_size);
});
io_service_.post(
[callback, object_id, spilled_url, locations, object_size, spilled_node_id]() {
callback(object_id, locations, spilled_url, spilled_node_id, object_size);
});
} else {
// We do not have any locations cached due to a concurrent
// SubscribeObjectLocations call, so look up the object's locations
@@ -258,17 +269,19 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id,
if (!update->spilled_url().empty()) {
rpc::ObjectLocationChange change;
change.set_spilled_url(update->spilled_url());
change.set_spilled_node_id(update->spilled_node_id());
notification.push_back(change);
}
std::unordered_set<NodeID> node_ids;
std::string spilled_url;
NodeID spilled_node_id;
size_t object_size = 0;
UpdateObjectLocations(notification, gcs_client_, &node_ids, &spilled_url,
&object_size);
&spilled_node_id, &object_size);
// It is safe to call the callback directly since this is already running
// in the GCS client's lookup callback stack.
callback(object_id, node_ids, spilled_url, object_size);
callback(object_id, node_ids, spilled_url, spilled_node_id, object_size);
});
}
return status;
+6 -3
View File
@@ -41,9 +41,9 @@ struct RemoteConnectionInfo {
};
/// Callback for object location notifications.
using OnLocationsFound = std::function<void(const ray::ObjectID &object_id,
const std::unordered_set<ray::NodeID> &,
const std::string &, size_t object_size)>;
using OnLocationsFound = std::function<void(
const ray::ObjectID &object_id, const std::unordered_set<ray::NodeID> &,
const std::string &, const NodeID &, size_t object_size)>;
class ObjectDirectoryInterface {
public:
@@ -185,6 +185,9 @@ class ObjectDirectory : public ObjectDirectoryInterface {
std::unordered_set<NodeID> current_object_locations;
/// The location where this object has been spilled, if any.
std::string spilled_url = "";
/// The ID of the node that spilled the object to its disk.
/// It will be Nil if a distributed external storage is used.
NodeID spilled_node_id = NodeID::Nil();
/// The size of the object.
size_t object_size = 0;
/// This flag will get set to true if received any notification of the object.
+8 -4
View File
@@ -220,8 +220,10 @@ uint64_t ObjectManager::Pull(const std::vector<rpc::ObjectReference> &object_ref
const auto &callback = [this](const ObjectID &object_id,
const std::unordered_set<NodeID> &client_ids,
const std::string &spilled_url, size_t object_size) {
pull_manager_->OnLocationChange(object_id, client_ids, spilled_url, object_size);
const std::string &spilled_url,
const NodeID &spilled_node_id, size_t object_size) {
pull_manager_->OnLocationChange(object_id, client_ids, spilled_url, spilled_node_id,
object_size);
};
for (const auto &ref : objects_to_locate) {
@@ -513,7 +515,8 @@ ray::Status ObjectManager::LookupRemainingWaitObjects(const UniqueID &wait_id) {
object_id, wait_state.owner_addresses[object_id],
[this, wait_id](const ObjectID &lookup_object_id,
const std::unordered_set<NodeID> &node_ids,
const std::string &spilled_url, size_t object_size) {
const std::string &spilled_url, const NodeID &spilled_node_id,
size_t object_size) {
auto &wait_state = active_wait_requests_.find(wait_id)->second;
// Note that the object is guaranteed to be added to local_objects_ before
// the notification is triggered.
@@ -554,7 +557,8 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) {
wait_id, object_id, wait_state.owner_addresses[object_id],
[this, wait_id](const ObjectID &subscribe_object_id,
const std::unordered_set<NodeID> &node_ids,
const std::string &spilled_url, size_t object_size) {
const std::string &spilled_url, const NodeID &spilled_node_id,
size_t object_size) {
auto object_id_wait_state = active_wait_requests_.find(wait_id);
if (object_id_wait_state == active_wait_requests_.end()) {
// Depending on the timing of calls to the object directory, we
+3 -2
View File
@@ -106,8 +106,9 @@ class ObjectManagerInterface {
class ObjectManager : public ObjectManagerInterface,
public rpc::ObjectManagerServiceHandler {
public:
using RestoreSpilledObjectCallback = std::function<void(
const ObjectID &, const std::string &, std::function<void(const ray::Status &)>)>;
using RestoreSpilledObjectCallback =
std::function<void(const ObjectID &, const std::string &, const NodeID &,
std::function<void(const ray::Status &)>)>;
/// Implementation of object manager service
@@ -146,7 +146,7 @@ void OwnershipBasedObjectDirectory::SubscriptionCallback(
// It is safe to call the callback directly since this is already running
// in the subscription callback stack.
callback_pair.second(object_id, it->second.current_object_locations, "",
it->second.object_size);
NodeID::Nil(), it->second.object_size);
}
}
@@ -213,7 +213,7 @@ ray::Status OwnershipBasedObjectDirectory::LookupLocations(
RAY_LOG(WARNING) << "Object " << object_id << " does not have owner. "
<< "LookupLocations returns an empty list of locations.";
io_service_.post([callback, object_id]() {
callback(object_id, std::unordered_set<NodeID>(), "", 0);
callback(object_id, std::unordered_set<NodeID>(), "", NodeID::Nil(), 0);
});
return Status::OK();
}
@@ -234,7 +234,7 @@ ray::Status OwnershipBasedObjectDirectory::LookupLocations(
node_ids.emplace(NodeID::FromBinary(node_id));
}
FilterRemovedNodes(gcs_client_, &node_ids);
callback(object_id, node_ids, "", reply.object_size());
callback(object_id, node_ids, "", NodeID::Nil(), reply.object_size());
});
return Status::OK();
}
+39 -21
View File
@@ -259,7 +259,8 @@ std::vector<ObjectID> PullManager::CancelPull(uint64_t request_id) {
void PullManager::OnLocationChange(const ObjectID &object_id,
const std::unordered_set<NodeID> &client_ids,
const std::string &spilled_url, size_t object_size) {
const std::string &spilled_url,
const NodeID &spilled_node_id, size_t object_size) {
// Exit if the Pull request has already been fulfilled or canceled.
auto it = object_pull_requests_.find(object_id);
if (it == object_pull_requests_.end()) {
@@ -271,7 +272,7 @@ void PullManager::OnLocationChange(const ObjectID &object_id,
// before.
it->second.client_locations = std::vector<NodeID>(client_ids.begin(), client_ids.end());
it->second.spilled_url = spilled_url;
it->second.spilled_node_id = spilled_node_id;
if (!it->second.object_size_set) {
RAY_LOG(DEBUG) << "Updated size of object " << object_id << " to " << object_size
<< ", num bytes being pulled is now " << num_bytes_being_pulled_;
@@ -299,30 +300,47 @@ void PullManager::TryToMakeObjectLocal(const ObjectID &object_id) {
return;
}
// We always pull an object from a remote node before
// restoring it, for two reasons.
// 1. This helps reduce the load on the external storage
// or on the remote node that spilled the object.
// 2. Also, if we use multi-node file spilling, the restoration will be
// confirmed by an object location subscription, so we should pull first
// before requesting object restoration.
bool did_pull = PullFromRandomLocation(object_id);
if (did_pull) {
// New object locations were found, so begin trying to pull from a
// client.
UpdateRetryTimer(request);
return;
}
// If we cannot pull, it means all objects have been evicted, so try restoring objects
// from the external storage. If the object was spilled on the current node, the
// callback will restore the object from the local disk.
// Otherwise, it will send a request to a remote node that spilled the object.
// If external storage is a distributed storage, we always try restoring from it without
// sending RPCs.
if (!request.spilled_url.empty()) {
// Try to restore the spilled object.
const auto spilled_node_id = request.spilled_node_id;
restore_spilled_object_(
object_id, request.spilled_url, [this, object_id](const ray::Status &status) {
bool did_pull = true;
// Fall back to fetching from another object manager.
object_id, request.spilled_url, spilled_node_id,
[this, object_id, spilled_node_id](const ray::Status &status) {
if (!status.ok()) {
did_pull = PullFromRandomLocation(object_id);
}
if (!did_pull) {
RAY_LOG(WARNING) << "Object restoration failed and the object could not be "
"found on any other nodes. Object id: "
<< object_id;
const auto node_id_with_issue =
spilled_node_id.IsNil() ? self_node_id_ : spilled_node_id;
RAY_LOG(WARNING)
<< "Object restoration failed and the object could "
"not be "
"found on any other nodes. This can happen if the location where the "
"object was spilled is unreachable. This job may hang if the object "
"is permanently unreachable. "
"Please check the log of node of id: "
<< node_id_with_issue << " Object id: " << object_id;
}
});
UpdateRetryTimer(request);
} else {
// New object locations were found, so begin trying to pull from a
// client. This will be called every time a new client location
// appears.
bool did_pull = PullFromRandomLocation(object_id);
if (did_pull) {
UpdateRetryTimer(request);
}
// We shouldn't update the timer here because restoration takes some time, and since
// we retry pull requests with exponential backoff, the delay could be large.
}
}
+5 -1
View File
@@ -72,9 +72,12 @@ class PullManager {
/// necessarily a super or subset of the previously available nodes.
/// \param spilled_url The location of the object if it was spilled. If
/// non-empty, the object may no longer be on any node.
/// \param spilled_node_id The id of the node that spilled the object. It is Nil if
/// the object was not spilled or was spilled to a distributed external storage.
void OnLocationChange(const ObjectID &object_id,
const std::unordered_set<NodeID> &client_ids,
const std::string &spilled_url, size_t object_size);
const std::string &spilled_url, const NodeID &spilled_node_id,
size_t object_size);
/// Cancel an existing pull request.
///
@@ -108,6 +111,7 @@ class PullManager {
bundle_request_ids() {}
std::vector<NodeID> client_locations;
std::string spilled_url;
NodeID spilled_node_id;
double next_pull_time;
uint8_t num_retries;
bool object_size_set = false;
@@ -24,7 +24,7 @@ class PullManagerTestWithCapacity {
[this](const ObjectID &object_id, const NodeID &node_id) {
num_send_pull_request_calls_++;
},
[this](const ObjectID &, const std::string &,
[this](const ObjectID &, const std::string &, const NodeID &,
std::function<void(const ray::Status &)> callback) {
num_restore_spilled_object_calls_++;
restore_object_callback_ = callback;
@@ -94,7 +94,7 @@ TEST_F(PullManagerTest, TestStaleSubscription) {
ASSERT_EQ(ObjectRefsToIds(objects_to_locate), ObjectRefsToIds(refs));
std::unordered_set<NodeID> client_ids;
pull_manager_.OnLocationChange(oid, client_ids, "", 0);
pull_manager_.OnLocationChange(oid, client_ids, "", NodeID::Nil(), 0);
AssertNumActiveRequestsEquals(1);
// There are no client ids to pull from.
@@ -109,7 +109,7 @@ TEST_F(PullManagerTest, TestStaleSubscription) {
AssertNumActiveRequestsEquals(0);
client_ids.insert(NodeID::FromRandom());
pull_manager_.OnLocationChange(oid, client_ids, "", 0);
pull_manager_.OnLocationChange(oid, client_ids, "", NodeID::Nil(), 0);
// Now we're getting a notification about an object that was already cancelled.
ASSERT_EQ(num_send_pull_request_calls_, 0);
@@ -128,26 +128,38 @@ TEST_F(PullManagerTest, TestRestoreSpilledObject) {
ASSERT_EQ(ObjectRefsToIds(objects_to_locate), ObjectRefsToIds(refs));
std::unordered_set<NodeID> client_ids;
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", 0);
AssertNumActiveRequestsEquals(1);
pull_manager_.OnLocationChange(obj1, client_ids, "", NodeID::Nil(), 0);
// client_ids is empty here, so there's nowhere to pull from.
ASSERT_EQ(num_send_pull_request_calls_, 0);
ASSERT_EQ(num_restore_spilled_object_calls_, 1);
ASSERT_EQ(num_restore_spilled_object_calls_, 0);
client_ids.insert(NodeID::FromRandom());
NodeID node_that_object_spilled = NodeID::FromRandom();
fake_time_ += 10.;
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", 0);
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar",
node_that_object_spilled, 0);
// The behavior is supposed to be to always restore the spilled object if possible (even
// if it exists elsewhere in the cluster).
ASSERT_EQ(num_send_pull_request_calls_, 0);
ASSERT_EQ(num_restore_spilled_object_calls_, 2);
ASSERT_EQ(num_restore_spilled_object_calls_, 1);
// The restore object call will ask the remote node to restore the object, and the
// client location is updated accordingly.
client_ids.insert(node_that_object_spilled);
fake_time_ += 10.;
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar",
node_that_object_spilled, 0);
// Now the pull requests are sent.
ASSERT_EQ(num_send_pull_request_calls_, 1);
ASSERT_EQ(num_restore_spilled_object_calls_, 1);
// Don't restore an object if it's local.
object_is_local_ = true;
num_restore_spilled_object_calls_ = 0;
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", 0);
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar",
NodeID::FromRandom(), 0);
ASSERT_EQ(num_restore_spilled_object_calls_, 0);
auto objects_to_cancel = pull_manager_.CancelPull(req_id);
@@ -164,51 +176,78 @@ TEST_F(PullManagerTest, TestRestoreObjectFailed) {
std::vector<rpc::ObjectReference> objects_to_locate;
auto req_id = pull_manager_.Pull(refs, &objects_to_locate);
ASSERT_EQ(ObjectRefsToIds(objects_to_locate), ObjectRefsToIds(refs));
std::unordered_set<NodeID> client_ids;
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", 0);
pull_manager_.OnLocationChange(obj1, client_ids, "", NodeID::Nil(), 0);
AssertNumActiveRequestsEquals(1);
// client_ids is empty here, so there's nowhere to pull from.
ASSERT_EQ(num_send_pull_request_calls_, 0);
ASSERT_EQ(num_restore_spilled_object_calls_, 0);
// Object is now spilled to a remote node, but the client_ids are still empty.
const NodeID remote_node_object_spilled = NodeID::FromRandom();
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar",
remote_node_object_spilled, 0);
ASSERT_EQ(num_send_pull_request_calls_, 0);
ASSERT_EQ(num_restore_spilled_object_calls_, 1);
restore_object_callback_(ray::Status::IOError(":("));
// client_ids is empty here, so there's nowhere to pull from.
ASSERT_EQ(num_send_pull_request_calls_, 0);
ASSERT_EQ(num_restore_spilled_object_calls_, 1);
client_ids.insert(NodeID::FromRandom());
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", 0);
// We always assume the restore succeeded so there's only 1 restore call still.
ASSERT_EQ(num_send_pull_request_calls_, 0);
ASSERT_EQ(num_restore_spilled_object_calls_, 1);
// Now the restore request has failed, the remote object shouldn't have been properly
// restored.
fake_time_ += 10.0;
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", 0);
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar",
remote_node_object_spilled, 0);
ASSERT_EQ(num_send_pull_request_calls_, 0);
ASSERT_EQ(num_restore_spilled_object_calls_, 2);
restore_object_callback_(ray::Status::IOError(":("));
// Since restore failed, we can fallback to pulling from another node immediately.
ASSERT_EQ(num_send_pull_request_calls_, 1);
ASSERT_EQ(num_restore_spilled_object_calls_, 2);
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar", 0);
restore_object_callback_(ray::Status::OK());
// Now the remote restoration request succeeds, so we should be able to pull the object.
client_ids.insert(remote_node_object_spilled);
// Since it is the second retry, the interval gets doubled.
fake_time_ += 20.0;
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar",
remote_node_object_spilled, 0);
// Now that we've successfully sent a pull request, we need to wait for the retry period
// before sending another one.
ASSERT_EQ(num_send_pull_request_calls_, 1);
ASSERT_EQ(num_restore_spilled_object_calls_, 2);
pull_manager_.CancelPull(req_id);
auto objects_to_cancel = pull_manager_.CancelPull(req_id);
AssertNoLeaks();
}
/// Make sure that when copies of a spilled object exist on other raylets, we pull the
/// object from one of them instead of asking the node that spilled it to restore it.
TEST_F(PullManagerTest, TestLoadBalancingRestorationRequest) {
  auto refs = CreateObjectRefs(1);
  auto obj1 = ObjectRefsToIds(refs)[0];
  ASSERT_EQ(pull_manager_.NumActiveRequests(), 0);

  // Register a pull request for the single object.
  std::vector<rpc::ObjectReference> objects_to_locate;
  pull_manager_.Pull(refs, &objects_to_locate);
  ASSERT_EQ(ObjectRefsToIds(objects_to_locate), ObjectRefsToIds(refs));
  ASSERT_EQ(pull_manager_.NumActiveRequests(), 1);

  // Two nodes hold a copy of the object; a third, distinct node is reported as the
  // one that spilled it.
  std::unordered_set<NodeID> client_ids;
  const auto copy_node1 = NodeID::FromRandom();
  const auto copy_node2 = NodeID::FromRandom();
  const auto remote_node_that_spilled_object = NodeID::FromRandom();
  client_ids.insert(copy_node1);
  client_ids.insert(copy_node2);
  pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar",
                                 remote_node_that_spilled_object, 0);

  // A pull request is sent to one of the nodes holding a copy.
  ASSERT_EQ(num_send_pull_request_calls_, 1);
  // Make sure the restore request wasn't sent since there are nodes that have a copied
  // object.
  ASSERT_EQ(num_restore_spilled_object_calls_, 0);
}
TEST_F(PullManagerTest, TestManyUpdates) {
auto refs = CreateObjectRefs(1);
auto obj1 = ObjectRefsToIds(refs)[0];
@@ -222,7 +261,7 @@ TEST_F(PullManagerTest, TestManyUpdates) {
client_ids.insert(NodeID::FromRandom());
for (int i = 0; i < 100; i++) {
pull_manager_.OnLocationChange(obj1, client_ids, "", 0);
pull_manager_.OnLocationChange(obj1, client_ids, "", NodeID::Nil(), 0);
AssertNumActiveRequestsEquals(1);
}
@@ -250,7 +289,7 @@ TEST_F(PullManagerTest, TestRetryTimer) {
// We need to call OnLocationChange at least once, to populate the list of nodes with
// the object.
pull_manager_.OnLocationChange(obj1, client_ids, "", 0);
pull_manager_.OnLocationChange(obj1, client_ids, "", NodeID::Nil(), 0);
AssertNumActiveRequestsEquals(1);
ASSERT_EQ(num_send_pull_request_calls_, 1);
ASSERT_EQ(num_restore_spilled_object_calls_, 0);
@@ -261,7 +300,7 @@ TEST_F(PullManagerTest, TestRetryTimer) {
// Location changes can trigger reset timer.
for (; fake_time_ <= 120 * 10; fake_time_ += 1.) {
pull_manager_.OnLocationChange(obj1, client_ids, "", 0);
pull_manager_.OnLocationChange(obj1, client_ids, "", NodeID::Nil(), 0);
}
// We should make a pull request every tick (even if it's a duplicate to a node we're
@@ -294,7 +333,7 @@ TEST_F(PullManagerTest, TestBasic) {
std::unordered_set<NodeID> client_ids;
client_ids.insert(NodeID::FromRandom());
for (size_t i = 0; i < oids.size(); i++) {
pull_manager_.OnLocationChange(oids[i], client_ids, "", 0);
pull_manager_.OnLocationChange(oids[i], client_ids, "", NodeID::Nil(), 0);
}
ASSERT_EQ(num_send_pull_request_calls_, oids.size());
ASSERT_EQ(num_restore_spilled_object_calls_, 0);
@@ -305,7 +344,7 @@ TEST_F(PullManagerTest, TestBasic) {
num_send_pull_request_calls_ = 0;
fake_time_ += 10;
for (size_t i = 0; i < oids.size(); i++) {
pull_manager_.OnLocationChange(oids[i], client_ids, "", 0);
pull_manager_.OnLocationChange(oids[i], client_ids, "", NodeID::Nil(), 0);
}
ASSERT_EQ(num_send_pull_request_calls_, 0);
@@ -318,7 +357,7 @@ TEST_F(PullManagerTest, TestBasic) {
num_send_pull_request_calls_ = 0;
fake_time_ += 10;
for (size_t i = 0; i < oids.size(); i++) {
pull_manager_.OnLocationChange(oids[i], client_ids, "", 0);
pull_manager_.OnLocationChange(oids[i], client_ids, "", NodeID::Nil(), 0);
}
ASSERT_EQ(num_send_pull_request_calls_, 0);
@@ -340,7 +379,7 @@ TEST_F(PullManagerTest, TestDeduplicateBundles) {
std::unordered_set<NodeID> client_ids;
client_ids.insert(NodeID::FromRandom());
for (size_t i = 0; i < oids.size(); i++) {
pull_manager_.OnLocationChange(oids[i], client_ids, "", 0);
pull_manager_.OnLocationChange(oids[i], client_ids, "", NodeID::Nil(), 0);
}
ASSERT_EQ(num_send_pull_request_calls_, oids.size());
ASSERT_EQ(num_restore_spilled_object_calls_, 0);
@@ -354,7 +393,8 @@ TEST_F(PullManagerTest, TestDeduplicateBundles) {
fake_time_ += 10;
num_send_pull_request_calls_ = 0;
for (size_t i = 0; i < oids.size(); i++) {
pull_manager_.OnLocationChange(oids[i], client_ids, "", 0);
pull_manager_.OnLocationChange(oids[i], client_ids, "", NodeID::Nil(), 0);
pull_manager_.OnLocationChange(oids[i], client_ids, "", NodeID::Nil(), 0);
ASSERT_EQ(num_send_pull_request_calls_, i + 1);
ASSERT_EQ(num_restore_spilled_object_calls_, 0);
}
@@ -368,7 +408,7 @@ TEST_F(PullManagerTest, TestDeduplicateBundles) {
object_is_local_ = false;
num_send_pull_request_calls_ = 0;
for (size_t i = 0; i < oids.size(); i++) {
pull_manager_.OnLocationChange(oids[i], client_ids, "", 0);
pull_manager_.OnLocationChange(oids[i], client_ids, "", NodeID::Nil(), 0);
}
ASSERT_EQ(num_send_pull_request_calls_, 0);
@@ -390,7 +430,7 @@ TEST_F(PullManagerWithAdmissionControlTest, TestBasic) {
std::unordered_set<NodeID> client_ids;
client_ids.insert(NodeID::FromRandom());
for (size_t i = 0; i < oids.size(); i++) {
pull_manager_.OnLocationChange(oids[i], client_ids, "", object_size);
pull_manager_.OnLocationChange(oids[i], client_ids, "", NodeID::Nil(), object_size);
}
ASSERT_EQ(num_send_pull_request_calls_, oids.size());
ASSERT_EQ(num_restore_spilled_object_calls_, 0);
@@ -406,7 +446,7 @@ TEST_F(PullManagerWithAdmissionControlTest, TestBasic) {
fake_time_ += 10;
auto prev_pull_requests = num_send_pull_request_calls_;
for (size_t i = 0; i < oids.size(); i++) {
pull_manager_.OnLocationChange(oids[i], client_ids, "", object_size);
pull_manager_.OnLocationChange(oids[i], client_ids, "", NodeID::Nil(), object_size);
ASSERT_EQ(num_send_pull_request_calls_, prev_pull_requests);
ASSERT_EQ(num_restore_spilled_object_calls_, 0);
}
@@ -449,7 +489,7 @@ TEST_F(PullManagerWithAdmissionControlTest, TestQueue) {
client_ids.insert(NodeID::FromRandom());
for (auto &oids : bundles) {
for (size_t i = 0; i < oids.size(); i++) {
pull_manager_.OnLocationChange(oids[i], client_ids, "", object_size);
pull_manager_.OnLocationChange(oids[i], client_ids, "", NodeID::Nil(), object_size);
}
}
@@ -500,7 +540,7 @@ TEST_F(PullManagerWithAdmissionControlTest, TestCancel) {
req_ids.push_back(req_id);
}
for (size_t i = 0; i < object_sizes.size(); i++) {
pull_manager_.OnLocationChange(oids[i], {}, "", object_sizes[i]);
pull_manager_.OnLocationChange(oids[i], {}, "", NodeID::Nil(), object_sizes[i]);
}
AssertNumActiveRequestsEquals(num_active_requests_expected_before);
pull_manager_.CancelPull(req_ids[cancel_idx]);
@@ -508,14 +548,14 @@ TEST_F(PullManagerWithAdmissionControlTest, TestCancel) {
// Request is really canceled.
pull_manager_.OnLocationChange(oids[cancel_idx], {NodeID::FromRandom()}, "",
object_sizes[cancel_idx]);
NodeID::Nil(), object_sizes[cancel_idx]);
ASSERT_EQ(num_send_pull_request_calls_, 0);
// The expected number of requests at the head of the queue are pulled.
int num_active = 0;
for (size_t i = 0; i < refs.size() && num_active < num_active_requests_expected_after;
i++) {
pull_manager_.OnLocationChange(oids[i], {NodeID::FromRandom()}, "",
pull_manager_.OnLocationChange(oids[i], {NodeID::FromRandom()}, "", NodeID::Nil(),
object_sizes[i]);
if (i != cancel_idx) {
num_active++;
+8 -2
View File
@@ -413,8 +413,11 @@ message ObjectLocationInfo {
// For objects that have been spilled to external storage, the URL from which
// they can be retrieved.
string spilled_url = 3;
// The node id that spills the object to the disk.
// It will be Nil if it uses a distributed external storage.
bytes spilled_node_id = 4;
// The size of the object in bytes.
uint64 size = 4;
uint64 size = 5;
}
// A notification message about one object's locations being changed.
@@ -425,8 +428,11 @@ message ObjectLocationChange {
// The object has been spilled to this URL. This should be set xor the above
// fields are set.
string spilled_url = 3;
// The node id that spills the object to the disk.
// It will be Nil if it uses a distributed external storage.
bytes spilled_node_id = 4;
// The size of the object in bytes.
uint64 size = 4;
uint64 size = 5;
}
// A notification message about one node's resources being changed.
+4 -1
View File
@@ -272,8 +272,11 @@ message AddObjectLocationRequest {
// The spilled URL that will be added to GCS Service. Either this or the node
// ID should be set.
string spilled_url = 3;
// The node id that spills the object to the disk.
// It will be Nil if it uses a distributed external storage.
bytes spilled_node_id = 4;
// The size of the object in bytes.
uint64 size = 4;
uint64 size = 5;
}
message AddObjectLocationReply {
+15
View File
@@ -179,6 +179,18 @@ message RequestObjectSpillageReply {
bool success = 1;
}
// Request asking a raylet to restore a spilled object from the external storage.
message RestoreSpilledObjectRequest {
// ObjectID to restore.
bytes object_id = 1;
// Object URL where the object is spilled.
string object_url = 2;
// The node id of a node where the object is spilled.
bytes spilled_node_id = 3;
}
// Reply to a RestoreSpilledObjectRequest. It currently carries no fields.
message RestoreSpilledObjectReply {
}
message ReleaseUnusedBundlesRequest {
repeated Bundle bundles_in_use = 1;
}
@@ -224,6 +236,9 @@ service NodeManagerService {
// Ask the raylet to spill an object to external storage.
rpc RequestObjectSpillage(RequestObjectSpillageRequest)
returns (RequestObjectSpillageReply);
// Ask the raylet to restore the object from the external storage.
rpc RestoreSpilledObject(RestoreSpilledObjectRequest)
returns (RestoreSpilledObjectReply);
// This method is only used by GCS, and the purpose is to release bundles
// that may be leaked. When GCS restarts, it doesn't know which bundles it has leased
// in the previous lifecycle. In this case, GCS will send a list of bundles that
+29 -4
View File
@@ -261,11 +261,15 @@ void LocalObjectManager::AddSpilledUrls(
const ObjectID &object_id = object_ids[i];
const std::string &object_url = worker_reply.spilled_objects_url(i);
RAY_LOG(DEBUG) << "Object " << object_id << " spilled at " << object_url;
// Choose a node id to report. If an external storage type is not a filesystem, we
// don't need to report where this object is spilled.
const auto node_id_object_spilled =
is_external_storage_type_fs_ ? self_node_id_ : NodeID::Nil();
// Write to object directory. Wait for the write to finish before
// releasing the object to make sure that the spilled object can
// be retrieved by other raylets.
RAY_CHECK_OK(object_info_accessor_.AsyncAddSpilledUrl(
object_id, object_url,
object_id, object_url, node_id_object_spilled,
[this, object_id, object_url, callback, num_remaining](Status status) {
RAY_CHECK_OK(status);
// Unpin the object.
@@ -298,14 +302,35 @@ void LocalObjectManager::AddSpilledUrls(
}
void LocalObjectManager::AsyncRestoreSpilledObject(
const ObjectID &object_id, const std::string &object_url,
const ObjectID &object_id, const std::string &object_url, const NodeID &node_id,
std::function<void(const ray::Status &)> callback) {
RAY_LOG(DEBUG) << "Restoring spilled object " << object_id << " from URL "
<< object_url;
if (objects_pending_restore_.count(object_id) > 0) {
// If the same object is restoring, we dedup here.
return;
}
if (!node_id.IsNil() && node_id != self_node_id_) {
// If we know where this object was spilled, and the current node is not that one,
// send a RPC to a remote node that spilled the object to restore it.
RAY_LOG(DEBUG) << "Send a object restoration request of id: " << object_id
<< " to a remote node: " << node_id;
// TODO(sang): We need to deduplicate this remote RPC. Since restore request
// is retried every 10ms without exponential backoff, this can add huge overhead to a
// remote node that spilled the object.
restore_object_from_remote_node_(object_id, object_url, node_id);
if (callback) {
callback(Status::OK());
}
return;
}
// Restore the object.
RAY_LOG(DEBUG) << "Restoring spilled object " << object_id << " from URL "
<< object_url;
if (!node_id.IsNil()) {
RAY_CHECK(spilled_objects_url_.count(object_id) > 0);
}
RAY_CHECK(objects_pending_restore_.emplace(object_id).second)
<< "Object dedupe wasn't done properly. Please report if you see this issue.";
io_worker_pool_.PopRestoreWorker([this, object_id, object_url, callback](
+33 -8
View File
@@ -16,6 +16,8 @@
#include <google/protobuf/repeated_field.h>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <functional>
#include "ray/common/id.h"
@@ -24,6 +26,7 @@
#include "ray/object_manager/common.h"
#include "ray/raylet/worker_pool.h"
#include "ray/rpc/worker/core_worker_client_pool.h"
#include "ray/util/util.h"
#include "src/ray/protobuf/node_manager.pb.h"
namespace ray {
@@ -35,15 +38,18 @@ namespace raylet {
class LocalObjectManager {
public:
LocalObjectManager(
boost::asio::io_service &io_context, size_t free_objects_batch_size,
const NodeID &node_id, size_t free_objects_batch_size,
int64_t free_objects_period_ms, IOWorkerPoolInterface &io_worker_pool,
gcs::ObjectInfoAccessor &object_info_accessor,
rpc::CoreWorkerClientPool &owner_client_pool, bool object_pinning_enabled,
bool automatic_object_deletion_enabled, int max_io_workers,
int64_t min_spilling_size,
int64_t min_spilling_size, bool is_external_storage_type_fs,
std::function<void(const std::vector<ObjectID> &)> on_objects_freed,
std::function<bool(const ray::ObjectID &)> is_plasma_object_spillable)
: free_objects_period_ms_(free_objects_period_ms),
std::function<bool(const ray::ObjectID &)> is_plasma_object_spillable,
std::function<void(const ObjectID &, const std::string &, const NodeID &)>
restore_object_from_remote_node)
: self_node_id_(node_id),
free_objects_period_ms_(free_objects_period_ms),
free_objects_batch_size_(free_objects_batch_size),
io_worker_pool_(io_worker_pool),
object_info_accessor_(object_info_accessor),
@@ -55,7 +61,9 @@ class LocalObjectManager {
min_spilling_size_(min_spilling_size),
num_active_workers_(0),
max_active_workers_(max_io_workers),
is_plasma_object_spillable_(is_plasma_object_spillable) {}
is_plasma_object_spillable_(is_plasma_object_spillable),
restore_object_from_remote_node_(restore_object_from_remote_node),
is_external_storage_type_fs_(is_external_storage_type_fs) {}
/// Pin objects.
///
@@ -90,10 +98,15 @@ class LocalObjectManager {
/// Restore a spilled object from external storage back into local memory.
///
/// \param object_id The ID of the object to restore.
/// \param object_url The URL in external storage from which the object can be restored.
/// \param callback A callback to call when the restoration is done. Status
/// will contain the error during restoration, if any.
/// \param object_url The URL where the object is spilled.
/// \param node_id The id of the node that should restore the object. If Nil is
/// provided, the object is restored directly from the external storage. If a node id
/// is provided and it is not equal to this node's own id, an RPC request is sent to
/// the corresponding node instead of restoring locally.
/// \param callback A callback to call when the restoration is done.
/// Status will contain the error during restoration, if any.
void AsyncRestoreSpilledObject(const ObjectID &object_id, const std::string &object_url,
const NodeID &node_id,
std::function<void(const ray::Status &)> callback);
/// Try to clear any objects that have been freed.
@@ -160,6 +173,8 @@ class LocalObjectManager {
/// \param urls_to_delete List of urls to delete from external storages.
void DeleteSpilledObjects(std::vector<std::string> &urls_to_delete);
const NodeID self_node_id_;
/// The period between attempts to eagerly evict objects from plasma.
const int64_t free_objects_period_ms_;
@@ -247,6 +262,16 @@ class LocalObjectManager {
/// Return true if unpinned, meaning we can safely spill the object. False otherwise.
std::function<bool(const ray::ObjectID &)> is_plasma_object_spillable_;
/// Callback to restore object of object id from a remote node of node id.
std::function<void(const ObjectID &, const std::string &, const NodeID &)>
restore_object_from_remote_node_;
/// Used to decide spilling protocol.
/// If true (the storage type is "filesystem"), spilled objects are restored only by
/// the node that spilled them. If false (meaning it is a distributed backend),
/// objects are always restored directly from the external storage.
bool is_external_storage_type_fs_;
///
/// Stats
///
+65 -13
View File
@@ -158,19 +158,29 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
agent_manager_service_(io_service, *agent_manager_service_handler_),
client_call_manager_(io_service),
worker_rpc_pool_(client_call_manager_),
local_object_manager_(io_service_, RayConfig::instance().free_objects_batch_size(),
RayConfig::instance().free_objects_period_milliseconds(),
worker_pool_, gcs_client_->Objects(), worker_rpc_pool_,
/* object_pinning_enabled */ config.object_pinning_enabled,
/* automatic_object_deletion_enabled */
config.automatic_object_deletion_enabled,
/*max_io_workers*/ config.max_io_workers,
/*min_spilling_size*/ config.min_spilling_size,
[this](const std::vector<ObjectID> &object_ids) {
object_manager_.FreeObjects(object_ids,
/*local_only=*/false);
},
is_plasma_object_spillable),
local_object_manager_(
self_node_id_, RayConfig::instance().free_objects_batch_size(),
RayConfig::instance().free_objects_period_milliseconds(), worker_pool_,
gcs_client_->Objects(), worker_rpc_pool_,
/* object_pinning_enabled */ config.object_pinning_enabled,
/* automatic_object_deletion_enabled */
config.automatic_object_deletion_enabled,
/*max_io_workers*/ config.max_io_workers,
/*min_spilling_size*/ config.min_spilling_size,
/*is_external_storage_type_fs*/
RayConfig::instance().is_external_storage_type_fs(),
/*on_objects_freed*/
[this](const std::vector<ObjectID> &object_ids) {
object_manager_.FreeObjects(object_ids,
/*local_only=*/false);
},
is_plasma_object_spillable,
/*restore_object_from_remote_node*/
[this](const ObjectID &object_id, const std::string &spilled_url,
const NodeID &node_id) {
SendSpilledObjectRestorationRequestToRemoteNode(object_id, spilled_url,
node_id);
}),
report_worker_backlog_(RayConfig::instance().report_worker_backlog()),
last_local_gc_ns_(absl::GetCurrentTimeNanos()),
local_gc_interval_ns_(RayConfig::instance().local_gc_interval_s() * 1e9),
@@ -511,6 +521,24 @@ void NodeManager::HandleRequestObjectSpillage(
});
}
/// Handle a `RestoreSpilledObject` request sent by another node.
///
/// The request must target this node (its spilled_node_id must equal self_node_id_).
/// The restoration is started through the local object manager without a completion
/// callback, and the RPC is answered immediately with an OK status.
///
/// \param request Carries the object id, the spilled URL and the spilling node id.
/// \param reply Unused; the reply message has no fields to fill in.
/// \param send_reply_callback Invoked once, with Status::OK().
void NodeManager::HandleRestoreSpilledObject(
    const rpc::RestoreSpilledObjectRequest &request,
    rpc::RestoreSpilledObjectReply *reply, rpc::SendReplyCallback send_reply_callback) {
  const auto object_id = ObjectID::FromBinary(request.object_id());
  const auto spilled_node_id = NodeID::FromBinary(request.spilled_node_id());
  // Bind by reference to avoid copying the URL string; it is only used within this
  // function body while `request` is alive.
  const auto &object_url = request.object_url();
  RAY_CHECK(spilled_node_id == self_node_id_);
  RAY_LOG(DEBUG) << "Restore spilled object request received. Object id: " << object_id
                 << " spilled_node_id: " << self_node_id_
                 << " object url: " << object_url;
  local_object_manager_.AsyncRestoreSpilledObject(object_id, object_url, spilled_node_id,
                                                  nullptr);
  // Just reply right away. The caller will keep hitting this RPC endpoint until
  // restoration succeeds, so we can safely reply here without waiting for the
  // restoreSpilledObject to be done.
  send_reply_callback(Status::OK(), nullptr, nullptr);
}
void NodeManager::HandleReleaseUnusedBundles(
const rpc::ReleaseUnusedBundlesRequest &request,
rpc::ReleaseUnusedBundlesReply *reply, rpc::SendReplyCallback send_reply_callback) {
@@ -2714,6 +2742,30 @@ void NodeManager::PublishInfeasibleTaskError(const Task &task) const {
}
}
/// Ask the remote node `node_id` to restore `object_id` from `spilled_url`.
///
/// If the address of `node_id` is not known yet, nothing is sent. A failure reported
/// by the RPC is only logged as a warning.
///
/// \param object_id The object to restore.
/// \param spilled_url The URL where the object was spilled.
/// \param node_id The node the restoration request is sent to.
void NodeManager::SendSpilledObjectRestorationRequestToRemoteNode(
    const ObjectID &object_id, const std::string &spilled_url, const NodeID &node_id) {
  // Fetch from a remote node. Look the address up once instead of contains() + find().
  const auto entry = remote_node_manager_addresses_.find(node_id);
  if (entry == remote_node_manager_addresses_.end()) {
    // It is possible the new node information is not received at this point.
    // In this case, the PullManager will handle retry, so we just return.
    return;
  }
  // TODO(sang): Use a node manager pool instead.
  auto raylet_client =
      std::make_shared<ray::raylet::RayletClient>(rpc::NodeManagerWorkerClient::make(
          entry->second.first, entry->second.second, client_call_manager_));
  raylet_client->RestoreSpilledObject(
      object_id, spilled_url, node_id,
      [](const ray::Status &status, const rpc::RestoreSpilledObjectReply &) {
        if (!status.ok()) {
          RAY_LOG(WARNING) << "Failed to send a spilled object restoration request to a "
                              "remote node. This request will be retried. Error message: "
                           << status.ToString();
        }
      });
}
} // namespace raylet
} // namespace ray
+11
View File
@@ -28,6 +28,7 @@
#include "ray/common/task/scheduling_resources.h"
#include "ray/object_manager/object_manager.h"
#include "ray/raylet/agent_manager.h"
#include "ray/raylet_client/raylet_client.h"
#include "ray/raylet/local_object_manager.h"
#include "ray/raylet/scheduling/scheduling_ids.h"
#include "ray/raylet/scheduling/cluster_resource_scheduler.h"
@@ -603,6 +604,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
rpc::RequestObjectSpillageReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Handle a `RestoreSpilledObject` request.
void HandleRestoreSpilledObject(const rpc::RestoreSpilledObjectRequest &request,
rpc::RestoreSpilledObjectReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Handle a `ReleaseUnusedBundles` request.
void HandleReleaseUnusedBundles(const rpc::ReleaseUnusedBundlesRequest &request,
rpc::ReleaseUnusedBundlesReply *reply,
@@ -633,6 +639,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// \param task Task that is infeasible
void PublishInfeasibleTaskError(const Task &task) const;
/// Send an object restoration request to the remote node with the given node id.
void SendSpilledObjectRestorationRequestToRemoteNode(const ObjectID &object_id,
const std::string &spilled_url,
const NodeID &node_id);
std::unordered_map<SchedulingClass, ordered_set<TaskID>> MakeTasksByClass(
const std::vector<Task> &tasks) const;
+3 -2
View File
@@ -72,10 +72,11 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_
std::make_shared<ObjectDirectory>(main_service, gcs_client_))),
object_manager_(
main_service, self_node_id_, object_manager_config, object_directory_,
[this](const ObjectID &object_id, const std::string &spilled_url,
[this](const ObjectID &object_id, const std::string &object_url,
const NodeID &node_id,
std::function<void(const ray::Status &)> callback) {
node_manager_.GetLocalObjectManager().AsyncRestoreSpilledObject(
object_id, spilled_url, callback);
object_id, object_url, node_id, callback);
},
[this]() {
// This callback is called from the plasma store thread.
+2 -1
View File
@@ -179,7 +179,8 @@ void ReconstructionPolicy::HandleTaskLeaseExpired(const TaskID &task_id) {
created_object_id, it->second.owner_addresses[created_object_id],
[this, task_id, reconstruction_attempt](
const ray::ObjectID &object_id, const std::unordered_set<ray::NodeID> &nodes,
const std::string &spilled_url, size_t object_size) {
const std::string &spilled_url, const ray::NodeID &spilled_node_id,
size_t object_size) {
if (nodes.empty() && spilled_url.empty()) {
// The required object no longer exists on any live nodes. Attempt
// reconstruction.
+3 -2
View File
@@ -58,9 +58,10 @@ class MockObjectDirectory : public ObjectDirectoryInterface {
const ObjectID object_id = callback.first;
auto it = locations_.find(object_id);
if (it == locations_.end()) {
callback.second(object_id, std::unordered_set<ray::NodeID>(), "", 0);
callback.second(object_id, std::unordered_set<ray::NodeID>(), "", NodeID::Nil(),
0);
} else {
callback.second(object_id, it->second, "", 0);
callback.second(object_id, it->second, "", NodeID::Nil(), 0);
}
}
callbacks_.clear();
@@ -84,12 +84,16 @@ class MockIOWorkerClient : public rpc::CoreWorkerClientInterface {
restore_callbacks.push_back(callback);
}
void ReplyRestoreObjects(int64_t bytes_restored, Status status = Status::OK()) {
/// Reply to the oldest pending restore request, if there is one.
///
/// \param bytes_restored Value reported as `bytes_restored_total` in the reply.
/// \param status Status handed to the pending callback.
/// \return false if no restore callback is pending (nothing is invoked); true once
/// the front callback has been invoked and removed from the queue.
bool ReplyRestoreObjects(int64_t bytes_restored, Status status = Status::OK()) {
  if (restore_callbacks.empty()) {
    return false;
  }
  rpc::RestoreSpilledObjectsReply reply;
  reply.set_bytes_restored_total(bytes_restored);
  // The callback is invoked first and only then popped from the queue.
  auto callback = restore_callbacks.front();
  callback(status, reply);
  restore_callbacks.pop_front();
  return true;
}
void DeleteSpilledObjects(
@@ -190,6 +194,7 @@ class MockObjectInfoAccessor : public gcs::ObjectInfoAccessor {
size_t object_size, const gcs::StatusCallback &callback));
Status AsyncAddSpilledUrl(const ObjectID &object_id, const std::string &spilled_url,
const NodeID &spilled_node_id,
const gcs::StatusCallback &callback) {
object_urls[object_id] = spilled_url;
callbacks.push_back(callback);
@@ -252,12 +257,15 @@ class LocalObjectManagerTest : public ::testing::Test {
LocalObjectManagerTest()
: owner_client(std::make_shared<MockWorkerClient>()),
client_pool([&](const rpc::Address &addr) { return owner_client; }),
manager(io_service_, free_objects_batch_size,
manager_node_id_(NodeID::FromRandom()),
manager(manager_node_id_, free_objects_batch_size,
/*free_objects_period_ms=*/1000, worker_pool, object_table, client_pool,
/*object_pinning_enabled=*/true,
/*automatic_object_delete_enabled=*/true,
/*max_io_workers=*/2,
/*min_spilling_size=*/0,
/*is_external_storage_type_fs=*/true,
/*on_objects_freed=*/
[&](const std::vector<ObjectID> &object_ids) {
for (const auto &object_id : object_ids) {
freed.insert(object_id);
@@ -266,12 +274,24 @@ class LocalObjectManagerTest : public ::testing::Test {
/*is_plasma_object_spillable=*/
[&](const ray::ObjectID &object_id) {
return unevictable_objects_.count(object_id) == 0;
},
/*restore_object_from_remote_node=*/
[&](const ObjectID &object_id, const std::string spilled_url,
const NodeID &node_id) {
if (remote_node_set_restore_requested_.count(node_id) == 0) {
remote_node_set_restore_requested_.emplace(
node_id, std::unordered_set<ObjectID>());
}
remote_node_set_restore_requested_[node_id].emplace(object_id);
}),
unpins(std::make_shared<std::unordered_map<ObjectID, int>>()) {
RayConfig::instance().initialize({{"object_spilling_config", "mock_config"}});
}
void TearDown() { unevictable_objects_.clear(); }
void TearDown() {
unevictable_objects_.clear();
remote_node_set_restore_requested_.clear();
}
std::string BuildURL(const std::string url, int offset = 0, int num_objects = 1) {
return url + "?" + "num_objects=" + std::to_string(num_objects) +
@@ -284,7 +304,10 @@ class LocalObjectManagerTest : public ::testing::Test {
rpc::CoreWorkerClientPool client_pool;
MockIOWorkerPool worker_pool;
MockObjectInfoAccessor object_table;
NodeID manager_node_id_;
LocalObjectManager manager;
std::unordered_map<NodeID, std::unordered_set<ObjectID>>
remote_node_set_restore_requested_;
std::unordered_set<ObjectID> freed;
// This hashmap is incremented when objects are unpinned by destroying their
@@ -323,16 +346,43 @@ TEST_F(LocalObjectManagerTest, TestPin) {
}
TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) {
ObjectID object_id = ObjectID::FromRandom();
std::string object_url("url");
// First, spill objects.
std::vector<ObjectID> object_ids;
std::vector<std::unique_ptr<RayObject>> objects;
for (size_t i = 0; i < free_objects_batch_size; i++) {
ObjectID object_id = ObjectID::FromRandom();
object_ids.push_back(object_id);
auto data_buffer = std::make_shared<MockObjectBuffer>(0, object_id, unpins);
std::unique_ptr<RayObject> object(
new RayObject(data_buffer, nullptr, std::vector<ObjectID>()));
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects));
manager.SpillObjects(object_ids,
[&](const Status &status) mutable { ASSERT_TRUE(status.ok()); });
std::vector<std::string> urls;
for (size_t i = 0; i < object_ids.size(); i++) {
urls.push_back(BuildURL("url" + std::to_string(i)));
}
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
for (size_t i = 0; i < object_ids.size(); i++) {
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
}
// Then try restoring objects from local.
ObjectID object_id = object_ids[0];
const auto url = urls[0];
int num_times_fired = 0;
EXPECT_CALL(worker_pool, PushRestoreWorker(_));
// Subsequent calls should be deduped, so that only one callback should be fired.
for (int i = 0; i < 10; i++) {
manager.AsyncRestoreSpilledObject(object_id, object_url, [&](const Status &status) {
ASSERT_TRUE(status.ok());
num_times_fired++;
});
manager.AsyncRestoreSpilledObject(object_id, url, manager_node_id_,
[&](const Status &status) {
ASSERT_TRUE(status.ok());
num_times_fired++;
});
}
ASSERT_EQ(num_times_fired, 0);
@@ -342,7 +392,25 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) {
ASSERT_EQ(num_times_fired, 0);
}
worker_pool.io_worker_client->ReplyRestoreObjects(10);
// The restore should've been invoked.
ASSERT_EQ(num_times_fired, 1);
// If the object wasn't spilled on the current node, it should request restoration to
// remote nodes.
ObjectID remote_object_id = ObjectID::FromRandom();
const auto remote_object_url = BuildURL("remote_url");
NodeID remote_node_id = NodeID::FromRandom();
manager.AsyncRestoreSpilledObject(remote_object_id, remote_object_url, remote_node_id,
[&](const Status &status) {
ASSERT_TRUE(status.ok());
num_times_fired++;
});
// Make sure the remote call was invoked.
ASSERT_FALSE(worker_pool.io_worker_client->ReplyRestoreObjects(10));
ASSERT_TRUE(remote_node_set_restore_requested_.count(remote_node_id) > 0);
ASSERT_TRUE(remote_node_set_restore_requested_[remote_node_id].count(remote_object_id) >
0);
ASSERT_EQ(num_times_fired, 2);
}
TEST_F(LocalObjectManagerTest, TestExplicitSpill) {
+12
View File
@@ -311,6 +311,18 @@ void raylet::RayletClient::RequestObjectSpillage(
grpc_client_->RequestObjectSpillage(request, callback);
}
/// Build a RestoreSpilledObjectRequest from the arguments and hand it, together with
/// `callback`, to the gRPC client.
void raylet::RayletClient::RestoreSpilledObject(
    const ObjectID &object_id, const std::string &object_url,
    const NodeID &spilled_node_id,
    const rpc::ClientCallback<rpc::RestoreSpilledObjectReply> &callback) {
  // The spilled node id must be set; RAY_CHECK enforces it.
  RAY_CHECK(!spilled_node_id.IsNil());

  rpc::RestoreSpilledObjectRequest restore_request;
  restore_request.set_spilled_node_id(spilled_node_id.Binary());
  restore_request.set_object_url(object_url);
  restore_request.set_object_id(object_id.Binary());

  grpc_client_->RestoreSpilledObject(restore_request, callback);
}
Status raylet::RayletClient::ReturnWorker(int worker_port, const WorkerID &worker_id,
bool disconnect_worker) {
rpc::ReturnWorkerRequest request;
+9
View File
@@ -332,6 +332,15 @@ class RayletClient : public RayletClientInterface {
const ObjectID &object_id,
const rpc::ClientCallback<rpc::RequestObjectSpillageReply> &callback);
/// Ask the raylet to restore the object of a given id.
/// \param object_id Object id that the remote raylet needs to restore.
/// \param object_url Object URL where the object is spilled.
/// \param spilled_node_id Node id of a node where the object is spilled. Must not be
/// nil; the implementation checks this with RAY_CHECK.
/// \param callback Callback forwarded to the underlying gRPC client call.
/// NOTE(review): presumably invoked with the RestoreSpilledObjectReply once the
/// raylet responds -- confirm.
void RestoreSpilledObject(
const ObjectID &object_id, const std::string &object_url,
const NodeID &spilled_node_id,
const rpc::ClientCallback<rpc::RestoreSpilledObjectReply> &callback);
/// Implements WorkerLeaseInterface.
void RequestWorkerLease(
const ray::TaskSpecification &resource_spec,
@@ -100,6 +100,9 @@ class NodeManagerWorkerClient
/// Ask the raylet to spill an object to external storage.
VOID_RPC_CLIENT_METHOD(NodeManagerService, RequestObjectSpillage, grpc_client_, )
/// Ask the raylet to restore an object from external storage.
VOID_RPC_CLIENT_METHOD(NodeManagerService, RestoreSpilledObject, grpc_client_, )
/// Release unused bundles.
VOID_RPC_CLIENT_METHOD(NodeManagerService, ReleaseUnusedBundles, grpc_client_, )
@@ -36,6 +36,7 @@ namespace rpc {
RPC_SERVICE_HANDLER(NodeManagerService, CommitBundleResources) \
RPC_SERVICE_HANDLER(NodeManagerService, CancelResourceReserve) \
RPC_SERVICE_HANDLER(NodeManagerService, RequestObjectSpillage) \
RPC_SERVICE_HANDLER(NodeManagerService, RestoreSpilledObject) \
RPC_SERVICE_HANDLER(NodeManagerService, ReleaseUnusedBundles)
/// Interface of the `NodeManagerService`, see `src/ray/protobuf/node_manager.proto`.
@@ -102,6 +103,10 @@ class NodeManagerServiceHandler {
RequestObjectSpillageReply *reply,
SendReplyCallback send_reply_callback) = 0;
/// Handle a `RestoreSpilledObject` RPC of the `NodeManagerService`.
/// \param request The incoming request message.
/// \param reply Reply message for the implementation to fill in.
/// \param send_reply_callback NOTE(review): presumably invoked by the implementation
/// to send `reply` back to the caller -- confirm against the gRPC server glue.
virtual void HandleRestoreSpilledObject(const RestoreSpilledObjectRequest &request,
RestoreSpilledObjectReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleReleaseUnusedBundles(const ReleaseUnusedBundlesRequest &request,
ReleaseUnusedBundlesReply *reply,
SendReplyCallback send_reply_callback) = 0;