diff --git a/java/runtime/src/main/java/io/ray/runtime/placementgroup/PlacementGroupImpl.java b/java/runtime/src/main/java/io/ray/runtime/placementgroup/PlacementGroupImpl.java index 558d80dc3..96f499e60 100644 --- a/java/runtime/src/main/java/io/ray/runtime/placementgroup/PlacementGroupImpl.java +++ b/java/runtime/src/main/java/io/ray/runtime/placementgroup/PlacementGroupImpl.java @@ -52,11 +52,11 @@ public class PlacementGroupImpl implements PlacementGroup { /** * Wait for the placement group to be ready within the specified time. - * @param timeoutMs Timeout in milliseconds. + * @param timeoutSeconds Timeout in seconds. * @return True if the placement group is created. False otherwise. */ - public boolean wait(int timeoutMs) { - return Ray.internal().waitPlacementGroupReady(id, timeoutMs); + public boolean wait(int timeoutSeconds) { + return Ray.internal().waitPlacementGroupReady(id, timeoutSeconds); } /** diff --git a/java/test/src/main/java/io/ray/test/PlacementGroupTest.java b/java/test/src/main/java/io/ray/test/PlacementGroupTest.java index 831232e91..547d3bae6 100644 --- a/java/test/src/main/java/io/ray/test/PlacementGroupTest.java +++ b/java/test/src/main/java/io/ray/test/PlacementGroupTest.java @@ -6,6 +6,7 @@ import io.ray.api.id.ActorId; import io.ray.api.placementgroup.PlacementGroup; import io.ray.api.placementgroup.PlacementGroupState; import io.ray.api.placementgroup.PlacementStrategy; +import io.ray.runtime.exception.RayException; import io.ray.runtime.placementgroup.PlacementGroupImpl; import java.util.List; import org.testng.Assert; @@ -33,7 +34,7 @@ public class PlacementGroupTest extends BaseTest { public void testCreateAndCallActor() { PlacementGroupImpl placementGroup = (PlacementGroupImpl)PlacementGroupTestUtils .createSimpleGroup(); - Assert.assertTrue(placementGroup.wait(10000)); + Assert.assertTrue(placementGroup.wait(10)); Assert.assertEquals(placementGroup.getName(),"unnamed_group"); // Test creating an actor from a constructor. 
@@ -42,7 +43,7 @@ public class PlacementGroupTest extends BaseTest { Assert.assertNotEquals(actor.getId(), ActorId.NIL); // Test calling an actor. - Assert.assertEquals(Integer.valueOf(1), actor.task(Counter::getValue).remote().get()); + Assert.assertEquals(actor.task(Counter::getValue).remote().get(), Integer.valueOf(1)); } @Test(groups = {"cluster"}) @@ -54,8 +55,8 @@ public class PlacementGroupTest extends BaseTest { PlacementGroupImpl secondPlacementGroup = (PlacementGroupImpl)PlacementGroupTestUtils .createNameSpecifiedSimpleGroup("CPU", 1, PlacementStrategy.PACK, 1.0, "second_placement_group"); - Assert.assertTrue(firstPlacementGroup.wait(10000)); - Assert.assertTrue(secondPlacementGroup.wait(10000)); + Assert.assertTrue(firstPlacementGroup.wait(10)); + Assert.assertTrue(secondPlacementGroup.wait(10)); PlacementGroupImpl firstPlacementGroupRes = (PlacementGroupImpl)Ray.getPlacementGroup((firstPlacementGroup).getId()); @@ -101,6 +102,15 @@ public class PlacementGroupTest extends BaseTest { PlacementGroupImpl removedPlacementGroup = (PlacementGroupImpl)Ray.getPlacementGroup((secondPlacementGroup).getId()); Assert.assertEquals(removedPlacementGroup.getState(), PlacementGroupState.REMOVED); + + // Wait for placement group after it is removed. 
+ int exceptionCount = 0; + try { + removedPlacementGroup.wait(10); + } catch (RayException e) { + ++exceptionCount; + } + Assert.assertEquals(exceptionCount, 1); } public void testCheckBundleIndex() { @@ -112,14 +122,14 @@ public class PlacementGroupTest extends BaseTest { } catch (IllegalArgumentException e) { ++exceptionCount; } - Assert.assertEquals(1, exceptionCount); + Assert.assertEquals(exceptionCount, 1); try { Ray.actor(Counter::new, 1).setPlacementGroup(placementGroup, -1).remote(); } catch (IllegalArgumentException e) { ++exceptionCount; } - Assert.assertEquals(2, exceptionCount); + Assert.assertEquals(exceptionCount, 2); } @Test (expectedExceptions = { IllegalArgumentException.class }) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 72d8a45d3..47215149a 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1207,14 +1207,17 @@ cdef class CoreWorker: def wait_placement_group_ready(self, PlacementGroupID placement_group_id, - int32_t timeout_ms): + int32_t timeout_seconds): cdef CRayStatus status cdef CPlacementGroupID cplacement_group_id = ( CPlacementGroupID.FromBinary(placement_group_id.binary())) - cdef int ctimeout_ms = timeout_ms + cdef int ctimeout_seconds = timeout_seconds with nogil: status = CCoreWorkerProcess.GetCoreWorker() \ - .WaitPlacementGroupReady(cplacement_group_id, ctimeout_ms) + .WaitPlacementGroupReady(cplacement_group_id, ctimeout_seconds) + if status.IsNotFound(): + raise Exception("Placement group {} does not exist.".format( + placement_group_id)) return status.ok() def submit_actor_task(self, diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index a82c449a5..066ff0e75 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -1256,5 +1256,37 @@ def test_create_placement_group_during_gcs_server_restart( ray.get(placement_groups[i].ready()) +@pytest.mark.parametrize( + "ray_start_cluster_head", [ + 
generate_system_config_map( + num_heartbeats_timeout=20, ping_gcs_rpc_server_max_retries=60) + ], + indirect=True) +def test_placement_group_wait_api(ray_start_cluster_head): + cluster = ray_start_cluster_head + cluster.add_node(num_cpus=2) + cluster.add_node(num_cpus=2) + cluster.wait_for_nodes() + + # Create placement group 1 successfully. + placement_group1 = ray.util.placement_group([{"CPU": 1}, {"CPU": 1}]) + assert placement_group1.wait(10) + + # Restart gcs server. + cluster.head_node.kill_gcs_server() + cluster.head_node.start_gcs_server() + + # Create placement group 2 successfully. + placement_group2 = ray.util.placement_group([{"CPU": 1}, {"CPU": 1}]) + assert placement_group2.wait(10) + + # Remove placement group 1. + ray.util.remove_placement_group(placement_group1) + + # Wait for placement group 1 after it is removed. + with pytest.raises(Exception): + placement_group1.wait(10) + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/util/placement_group.py b/python/ray/util/placement_group.py index 8a6b6c3d1..182dbb38c 100644 --- a/python/ray/util/placement_group.py +++ b/python/ray/util/placement_group.py @@ -83,6 +83,19 @@ class PlacementGroup: placement_group_bundle_index=bundle_index, resources=resources).remote(self) + def wait(self, timeout_seconds: int) -> bool: + """Wait for the placement group to be ready within the specified time. + Args: + timeout_seconds(int): Timeout in seconds. + Return: + True if the placement group is created. False otherwise. 
+ """ + worker = ray.worker.global_worker + worker.check_connected() + + return worker.core_worker.wait_placement_group_ready( + self.id, timeout_seconds) + @property def bundle_specs(self) -> List[Dict]: """List[Dict]: Return bundles belonging to this placement group.""" diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 20e322072..2aba250a5 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1460,14 +1460,14 @@ Status CoreWorker::RemovePlacementGroup(const PlacementGroupID &placement_group_ } Status CoreWorker::WaitPlacementGroupReady(const PlacementGroupID &placement_group_id, - int timeout_ms) { + int timeout_seconds) { std::shared_ptr> status_promise = std::make_shared>(); RAY_CHECK_OK(gcs_client_->PlacementGroups().AsyncWaitUntilReady( placement_group_id, [status_promise](const Status &status) { status_promise->set_value(status); })); auto status_future = status_promise->get_future(); - if (status_future.wait_for(std::chrono::milliseconds(timeout_ms)) != + if (status_future.wait_for(std::chrono::seconds(timeout_seconds)) != std::future_status::ready) { std::ostringstream stream; stream << "There was timeout in waiting for placement group " << placement_group_id diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 8e8167eae..5e2770b71 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -682,11 +682,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Returns once the placement group is created or the timeout expires. /// /// \param placement_group The id of a placement group to wait for. - /// \param timeout_ms Timeout in milliseconds. + /// \param timeout_seconds Timeout in seconds. /// \return Status OK if the placement group is created. TimedOut if request to GCS /// server times out. NotFound if placement group is already removed or doesn't exist. 
Status WaitPlacementGroupReady(const PlacementGroupID &placement_group_id, - int timeout_ms); + int timeout_seconds); /// Submit an actor task. /// diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc index 031f62c44..5470f70fb 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc @@ -297,11 +297,11 @@ Java_io_ray_runtime_task_NativeTaskSubmitter_nativeRemovePlacementGroup( JNIEXPORT jboolean JNICALL Java_io_ray_runtime_task_NativeTaskSubmitter_nativeWaitPlacementGroupReady( - JNIEnv *env, jclass p, jbyteArray placement_group_id_bytes, jint timeout_ms) { + JNIEnv *env, jclass p, jbyteArray placement_group_id_bytes, jint timeout_seconds) { const auto placement_group_id = JavaByteArrayToId(env, placement_group_id_bytes); auto status = ray::CoreWorkerProcess::GetCoreWorker().WaitPlacementGroupReady( - placement_group_id, timeout_ms); + placement_group_id, timeout_seconds); if (status.IsNotFound()) { env->ThrowNew(java_ray_exception_class, status.message().c_str()); } diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc index b15cd9ce7..5ac3d0510 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc @@ -410,26 +410,48 @@ void GcsPlacementGroupManager::HandleWaitPlacementGroupUntilReady( RAY_LOG(DEBUG) << "Waiting for placement group until ready, placement group id = " << placement_group_id; + auto callback = [placement_group_id, reply, send_reply_callback](const Status &status) { + RAY_LOG(DEBUG) + << "Finished waiting for placement group until ready, placement group id = " + << placement_group_id; + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); + }; + // If the placement group does not exist or it has been 
successfully created, return // directly. const auto &iter = registered_placement_groups_.find(placement_group_id); if (iter == registered_placement_groups_.end()) { - RAY_LOG(DEBUG) << "Placement group is not exist, placement group id = " - << placement_group_id; - GCS_RPC_SEND_REPLY(send_reply_callback, reply, - Status::NotFound("Placement group is not exist.")); + // Check whether the placement group does not exist or is removed. + auto on_done = [this, placement_group_id, reply, callback, send_reply_callback]( + const Status &status, + const boost::optional &result) { + if (result) { + RAY_LOG(DEBUG) << "Placement group is removed, placement group id = " + << placement_group_id; + GCS_RPC_SEND_REPLY(send_reply_callback, reply, + Status::NotFound("Placement group is removed.")); + } else { + // `wait` is a method of the placement group object, and that object can only + // be obtained through the create placement group API, so the placement group + // is guaranteed to exist. + // However, the GCS client does not guarantee the ordering of the create and + // wait requests, so GCS may receive the wait request before the create 
+ // request. + placement_group_to_create_callbacks_[placement_group_id].emplace_back( + std::move(callback)); + } + }; + + Status status = + gcs_table_storage_->PlacementGroupTable().Get(placement_group_id, on_done); + if (!status.ok()) { + on_done(status, boost::none); + } } else if (iter->second->GetState() == rpc::PlacementGroupTableData::CREATED) { RAY_LOG(DEBUG) << "Placement group is created, placement group id = " << placement_group_id; GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); } else { - auto callback = [placement_group_id, reply, - send_reply_callback](const Status &status) { - RAY_LOG(DEBUG) - << "Finished waiting for placement group until ready, placement group id = " - << placement_group_id; - GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); - }; placement_group_to_create_callbacks_[placement_group_id].emplace_back( std::move(callback)); }