diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py index 624fcb85d..4cefca998 100644 --- a/python/ray/tests/test_object_spilling.py +++ b/python/ray/tests/test_object_spilling.py @@ -563,5 +563,59 @@ def test_fusion_objects(tmp_path, shutdown_only): assert is_test_passing +# https://github.com/ray-project/ray/issues/12912 +def do_test_release_resource(tmp_path, expect_released): + temp_folder = tmp_path / "spill" + ray.init( + num_cpus=1, + object_store_memory=75 * 1024 * 1024, + _system_config={ + "max_io_workers": 1, + "release_resources_during_plasma_fetch": expect_released, + "automatic_object_spilling_enabled": True, + "object_spilling_config": json.dumps({ + "type": "filesystem", + "params": { + "directory_path": str(temp_folder) + } + }), + }) + plasma_obj = ray.put(np.ones(50 * 1024 * 1024, dtype=np.uint8)) + for _ in range(5): + ray.put(np.ones(50 * 1024 * 1024, dtype=np.uint8)) # Force spilling + + @ray.remote + def sneaky_task_tries_to_steal_released_resources(): + print("resources were released!") + + @ray.remote + def f(dep): + while True: + try: + ray.get(dep[0], timeout=0.001) + except ray.exceptions.GetTimeoutError: + pass + + done = f.remote([plasma_obj]) # noqa + canary = sneaky_task_tries_to_steal_released_resources.remote() + ready, _ = ray.wait([canary], timeout=2) + if expect_released: + assert ready + else: + assert not ready + + +@pytest.mark.skipif( + platform.system() == "Windows", reason="Failing on Windows.") +def test_no_release_during_plasma_fetch(tmp_path, shutdown_only): + do_test_release_resource(tmp_path, expect_released=False) + + +@pytest.mark.skipif( + platform.system() == "Windows", reason="Failing on Windows.") +def test_release_during_plasma_fetch(tmp_path, shutdown_only): + do_test_release_resource(tmp_path, expect_released=True) + + if __name__ == "__main__": sys.exit(pytest.main(["-sv", __file__])) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 9f9392bf7..fe41477f7 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -274,6 +274,10 @@ RAY_CONFIG(int32_t, minimum_gcs_reconnect_interval_milliseconds, 5000) /// Whether start the Plasma Store as a Raylet thread. RAY_CONFIG(bool, plasma_store_as_thread, false) +/// Whether to release worker CPUs during plasma fetches. +/// See https://github.com/ray-project/ray/issues/12912 for further discussion. +RAY_CONFIG(bool, release_resources_during_plasma_fetch, false) + /// The interval at which the gcs client will check if the address of gcs service has /// changed. When the address changed, we will resubscribe again. RAY_CONFIG(int64_t, gcs_service_address_check_interval_milliseconds, 1000) diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.cc b/src/ray/core_worker/store_provider/memory_store/memory_store.cc index 0391b7a1d..6dad1b37b 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.cc +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.cc @@ -232,18 +232,16 @@ bool CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &object_ Status CoreWorkerMemoryStore::Get(const std::vector &object_ids, int num_objects, int64_t timeout_ms, const WorkerContext &ctx, bool remove_after_get, - std::vector> *results, - bool release_resources) { + std::vector> *results) { return GetImpl(object_ids, num_objects, timeout_ms, ctx, remove_after_get, results, - /*abort_if_any_object_is_exception=*/true, release_resources); + /*abort_if_any_object_is_exception=*/true); } Status CoreWorkerMemoryStore::GetImpl(const std::vector &object_ids, int num_objects, int64_t timeout_ms, const WorkerContext &ctx, bool remove_after_get, std::vector> *results, - bool abort_if_any_object_is_exception, - bool release_resources) { + bool abort_if_any_object_is_exception) { (*results).resize(object_ids.size(), nullptr); std::shared_ptr get_request; @@ -301,8 +299,7 @@ Status CoreWorkerMemoryStore::GetImpl(const std::vector &object_ids, // Wait for remaining objects (or timeout). if (should_notify_raylet) { - // SANG-TODO Implement memory store get - RAY_CHECK_OK(raylet_client_->NotifyDirectCallTaskBlocked(release_resources)); + RAY_CHECK_OK(raylet_client_->NotifyDirectCallTaskBlocked(/*release_resources=*/true)); } bool done = false; @@ -377,11 +374,11 @@ Status CoreWorkerMemoryStore::Get( const absl::flat_hash_set &object_ids, int64_t timeout_ms, const WorkerContext &ctx, absl::flat_hash_map> *results, - bool *got_exception, bool release_resources) { + bool *got_exception) { const std::vector id_vector(object_ids.begin(), object_ids.end()); std::vector> result_objects; RAY_RETURN_NOT_OK(Get(id_vector, id_vector.size(), timeout_ms, ctx, - /*remove_after_get=*/false, &result_objects, release_resources)); + /*remove_after_get=*/false, &result_objects)); for (size_t i = 0; i < id_vector.size(); i++) { if (result_objects[i] != nullptr) { @@ -404,9 +401,8 @@ Status CoreWorkerMemoryStore::Wait(const absl::flat_hash_set &object_i std::vector id_vector(object_ids.begin(), object_ids.end()); std::vector> result_objects; RAY_CHECK(object_ids.size() == id_vector.size()); - auto status = - GetImpl(id_vector, num_objects, timeout_ms, ctx, false, &result_objects, - /*abort_if_any_object_is_exception=*/false, /*release_resources=*/true); + auto status = GetImpl(id_vector, num_objects, timeout_ms, ctx, false, &result_objects, + /*abort_if_any_object_is_exception=*/false); // Ignore TimedOut statuses since we return ready objects explicitly. if (!status.IsTimedOut()) { RAY_RETURN_NOT_OK(status); diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.h b/src/ray/core_worker/store_provider/memory_store/memory_store.h index faadafaff..709227f65 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.h +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.h @@ -58,14 +58,13 @@ class CoreWorkerMemoryStore { /// \return Status. Status Get(const std::vector &object_ids, int num_objects, int64_t timeout_ms, const WorkerContext &ctx, bool remove_after_get, - std::vector> *results, - bool release_resources = true); + std::vector> *results); /// Convenience wrapper around Get() that stores results in a given result map. Status Get(const absl::flat_hash_set &object_ids, int64_t timeout_ms, const WorkerContext &ctx, absl::flat_hash_map> *results, - bool *got_exception, bool release_resources = true); + bool *got_exception); /// Convenience wrapper around Get() that stores ready objects in a given result set. Status Wait(const absl::flat_hash_set &object_ids, int num_objects, @@ -138,12 +137,11 @@ class CoreWorkerMemoryStore { private: /// See the public version of `Get` for meaning of the other arguments. /// \param[in] abort_if_any_object_is_exception Whether we should abort if any object - /// \param[in] release_resources true if memory store blocking get needs to release /// resources. is an exception. Status GetImpl(const std::vector &object_ids, int num_objects, int64_t timeout_ms, const WorkerContext &ctx, bool remove_after_get, std::vector> *results, - bool abort_if_any_object_is_exception, bool release_resources); + bool abort_if_any_object_is_exception); /// Optional callback for putting objects into the plasma store. std::function store_in_plasma_; diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index 3079b99f5..f7559e9b9 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -226,7 +226,7 @@ Status CoreWorkerPlasmaStoreProvider::Get( const absl::flat_hash_set &object_ids, int64_t timeout_ms, const WorkerContext &ctx, absl::flat_hash_map> *results, - bool *got_exception, bool release_resources) { + bool *got_exception) { int64_t batch_size = RayConfig::instance().worker_fetch_request_size(); std::vector batch_ids; absl::flat_hash_set remaining(object_ids.begin(), object_ids.end()); @@ -277,7 +277,8 @@ Status CoreWorkerPlasmaStoreProvider::Get( size_t previous_size = remaining.size(); // This is a separate IPC from the FetchAndGet in direct call mode. if (ctx.CurrentTaskIsDirectCall() && ctx.ShouldReleaseResourcesOnBlockingCalls()) { - RAY_RETURN_NOT_OK(raylet_client_->NotifyDirectCallTaskBlocked(release_resources)); + RAY_RETURN_NOT_OK(raylet_client_->NotifyDirectCallTaskBlocked( + RayConfig::instance().release_resources_during_plasma_fetch())); } RAY_RETURN_NOT_OK( FetchAndGetFromPlasmaStore(remaining, batch_ids, batch_timeout, @@ -334,9 +335,8 @@ Status CoreWorkerPlasmaStoreProvider::Wait( // This is a separate IPC from the Wait in direct call mode. if (ctx.CurrentTaskIsDirectCall() && ctx.ShouldReleaseResourcesOnBlockingCalls()) { - // SANG-TODO Implement wait - RAY_RETURN_NOT_OK( - raylet_client_->NotifyDirectCallTaskBlocked(/*release_resources*/ true)); + RAY_RETURN_NOT_OK(raylet_client_->NotifyDirectCallTaskBlocked( + RayConfig::instance().release_resources_during_plasma_fetch())); } const auto owner_addresses = reference_counter_->GetOwnerAddresses(id_vector); RAY_RETURN_NOT_OK( diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index 6085a50c1..e9c7a23ee 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -90,7 +90,7 @@ class CoreWorkerPlasmaStoreProvider { Status Get(const absl::flat_hash_set &object_ids, int64_t timeout_ms, const WorkerContext &ctx, absl::flat_hash_map> *results, - bool *got_exception, bool release_resources = true); + bool *got_exception); Status Contains(const ObjectID &object_id, bool *has_object);