[PlacementGroup]Fix placement group wait api disorder bug (#12827)

* [PlacementGroup]Fix placment group wait api disorder bug

* fix review comment

* fix review comment

* fix review comment

* fix review comments

* increase num_heartbeats_timeout

Co-authored-by: 灵洵 <fengbin.ffb@antgroup.com>
This commit is contained in:
fangfengbin
2020-12-16 18:45:53 +08:00
committed by GitHub
parent 7ff314a5df
commit 91878d18b5
9 changed files with 109 additions and 29 deletions
@@ -52,11 +52,11 @@ public class PlacementGroupImpl implements PlacementGroup {
/**
* Wait for the placement group to be ready within the specified time.
* @param timeoutMs Timeout in milliseconds.
* @param timeoutSeconds Timeout in seconds.
* @return True if the placement group is created. False otherwise.
*/
public boolean wait(int timeoutMs) {
return Ray.internal().waitPlacementGroupReady(id, timeoutMs);
public boolean wait(int timeoutSeconds) {
return Ray.internal().waitPlacementGroupReady(id, timeoutSeconds);
}
/**
@@ -6,6 +6,7 @@ import io.ray.api.id.ActorId;
import io.ray.api.placementgroup.PlacementGroup;
import io.ray.api.placementgroup.PlacementGroupState;
import io.ray.api.placementgroup.PlacementStrategy;
import io.ray.runtime.exception.RayException;
import io.ray.runtime.placementgroup.PlacementGroupImpl;
import java.util.List;
import org.testng.Assert;
@@ -33,7 +34,7 @@ public class PlacementGroupTest extends BaseTest {
public void testCreateAndCallActor() {
PlacementGroupImpl placementGroup = (PlacementGroupImpl)PlacementGroupTestUtils
.createSimpleGroup();
Assert.assertTrue(placementGroup.wait(10000));
Assert.assertTrue(placementGroup.wait(10));
Assert.assertEquals(placementGroup.getName(),"unnamed_group");
// Test creating an actor from a constructor.
@@ -42,7 +43,7 @@ public class PlacementGroupTest extends BaseTest {
Assert.assertNotEquals(actor.getId(), ActorId.NIL);
// Test calling an actor.
Assert.assertEquals(Integer.valueOf(1), actor.task(Counter::getValue).remote().get());
Assert.assertEquals(actor.task(Counter::getValue).remote().get(), Integer.valueOf(1));
}
@Test(groups = {"cluster"})
@@ -54,8 +55,8 @@ public class PlacementGroupTest extends BaseTest {
PlacementGroupImpl secondPlacementGroup = (PlacementGroupImpl)PlacementGroupTestUtils
.createNameSpecifiedSimpleGroup("CPU", 1, PlacementStrategy.PACK,
1.0, "second_placement_group");
Assert.assertTrue(firstPlacementGroup.wait(10000));
Assert.assertTrue(secondPlacementGroup.wait(10000));
Assert.assertTrue(firstPlacementGroup.wait(10));
Assert.assertTrue(secondPlacementGroup.wait(10));
PlacementGroupImpl firstPlacementGroupRes =
(PlacementGroupImpl)Ray.getPlacementGroup((firstPlacementGroup).getId());
@@ -101,6 +102,15 @@ public class PlacementGroupTest extends BaseTest {
PlacementGroupImpl removedPlacementGroup =
(PlacementGroupImpl)Ray.getPlacementGroup((secondPlacementGroup).getId());
Assert.assertEquals(removedPlacementGroup.getState(), PlacementGroupState.REMOVED);
// Wait for placement group after it is removed.
int exceptionCount = 0;
try {
removedPlacementGroup.wait(10);
} catch (RayException e) {
++exceptionCount;
}
Assert.assertEquals(exceptionCount, 1);
}
public void testCheckBundleIndex() {
@@ -112,14 +122,14 @@ public class PlacementGroupTest extends BaseTest {
} catch (IllegalArgumentException e) {
++exceptionCount;
}
Assert.assertEquals(1, exceptionCount);
Assert.assertEquals(exceptionCount, 1);
try {
Ray.actor(Counter::new, 1).setPlacementGroup(placementGroup, -1).remote();
} catch (IllegalArgumentException e) {
++exceptionCount;
}
Assert.assertEquals(2, exceptionCount);
Assert.assertEquals(exceptionCount, 2);
}
@Test (expectedExceptions = { IllegalArgumentException.class })
+6 -3
View File
@@ -1207,14 +1207,17 @@ cdef class CoreWorker:
def wait_placement_group_ready(self,
PlacementGroupID placement_group_id,
int32_t timeout_ms):
int32_t timeout_seconds):
cdef CRayStatus status
cdef CPlacementGroupID cplacement_group_id = (
CPlacementGroupID.FromBinary(placement_group_id.binary()))
cdef int ctimeout_ms = timeout_ms
cdef int ctimeout_seconds = timeout_seconds
with nogil:
status = CCoreWorkerProcess.GetCoreWorker() \
.WaitPlacementGroupReady(cplacement_group_id, ctimeout_ms)
.WaitPlacementGroupReady(cplacement_group_id, ctimeout_seconds)
if status.IsNotFound():
raise Exception("Placement group {} does not exist.".format(
placement_group_id))
return status.ok()
def submit_actor_task(self,
+32
View File
@@ -1256,5 +1256,37 @@ def test_create_placement_group_during_gcs_server_restart(
ray.get(placement_groups[i].ready())
@pytest.mark.parametrize(
"ray_start_cluster_head", [
generate_system_config_map(
num_heartbeats_timeout=20, ping_gcs_rpc_server_max_retries=60)
],
indirect=True)
def test_placement_group_wait_api(ray_start_cluster_head):
cluster = ray_start_cluster_head
cluster.add_node(num_cpus=2)
cluster.add_node(num_cpus=2)
cluster.wait_for_nodes()
# Create placement group 1 successfully.
placement_group1 = ray.util.placement_group([{"CPU": 1}, {"CPU": 1}])
assert placement_group1.wait(10)
# Restart gcs server.
cluster.head_node.kill_gcs_server()
cluster.head_node.start_gcs_server()
# Create placement group 2 successfully.
placement_group2 = ray.util.placement_group([{"CPU": 1}, {"CPU": 1}])
assert placement_group2.wait(10)
# Remove placement group 1.
ray.util.remove_placement_group(placement_group1)
# Wait for placement group 1 after it is removed.
with pytest.raises(Exception):
placement_group1.wait(10)
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
+13
View File
@@ -83,6 +83,19 @@ class PlacementGroup:
placement_group_bundle_index=bundle_index,
resources=resources).remote(self)
def wait(self, timeout_seconds: int) -> bool:
"""Wait for the placement group to be ready within the specified time.
Args:
timeout_seconds(str): Timeout in seconds.
Return:
True if the placement group is created. False otherwise.
"""
worker = ray.worker.global_worker
worker.check_connected()
return worker.core_worker.wait_placement_group_ready(
self.id, timeout_seconds)
@property
def bundle_specs(self) -> List[Dict]:
"""List[Dict]: Return bundles belonging to this placement group."""
+2 -2
View File
@@ -1460,14 +1460,14 @@ Status CoreWorker::RemovePlacementGroup(const PlacementGroupID &placement_group_
}
Status CoreWorker::WaitPlacementGroupReady(const PlacementGroupID &placement_group_id,
int timeout_ms) {
int timeout_seconds) {
std::shared_ptr<std::promise<Status>> status_promise =
std::make_shared<std::promise<Status>>();
RAY_CHECK_OK(gcs_client_->PlacementGroups().AsyncWaitUntilReady(
placement_group_id,
[status_promise](const Status &status) { status_promise->set_value(status); }));
auto status_future = status_promise->get_future();
if (status_future.wait_for(std::chrono::milliseconds(timeout_ms)) !=
if (status_future.wait_for(std::chrono::seconds(timeout_seconds)) !=
std::future_status::ready) {
std::ostringstream stream;
stream << "There was timeout in waiting for placement group " << placement_group_id
+2 -2
View File
@@ -682,11 +682,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// Returns once the placement group is created or the timeout expires.
///
/// \param placement_group The id of a placement group to wait for.
/// \param timeout_ms Timeout in milliseconds.
/// \param timeout_seconds Timeout in seconds.
/// \return Status OK if the placement group is created. TimedOut if request to GCS
/// server times out. NotFound if placement group is already removed or doesn't exist.
Status WaitPlacementGroupReady(const PlacementGroupID &placement_group_id,
int timeout_ms);
int timeout_seconds);
/// Submit an actor task.
///
@@ -297,11 +297,11 @@ Java_io_ray_runtime_task_NativeTaskSubmitter_nativeRemovePlacementGroup(
JNIEXPORT jboolean JNICALL
Java_io_ray_runtime_task_NativeTaskSubmitter_nativeWaitPlacementGroupReady(
JNIEnv *env, jclass p, jbyteArray placement_group_id_bytes, jint timeout_ms) {
JNIEnv *env, jclass p, jbyteArray placement_group_id_bytes, jint timeout_seconds) {
const auto placement_group_id =
JavaByteArrayToId<ray::PlacementGroupID>(env, placement_group_id_bytes);
auto status = ray::CoreWorkerProcess::GetCoreWorker().WaitPlacementGroupReady(
placement_group_id, timeout_ms);
placement_group_id, timeout_seconds);
if (status.IsNotFound()) {
env->ThrowNew(java_ray_exception_class, status.message().c_str());
}
@@ -410,26 +410,48 @@ void GcsPlacementGroupManager::HandleWaitPlacementGroupUntilReady(
RAY_LOG(DEBUG) << "Waiting for placement group until ready, placement group id = "
<< placement_group_id;
auto callback = [placement_group_id, reply, send_reply_callback](const Status &status) {
RAY_LOG(DEBUG)
<< "Finished waiting for placement group until ready, placement group id = "
<< placement_group_id;
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
// If the placement group does not exist or it has been successfully created, return
// directly.
const auto &iter = registered_placement_groups_.find(placement_group_id);
if (iter == registered_placement_groups_.end()) {
RAY_LOG(DEBUG) << "Placement group is not exist, placement group id = "
<< placement_group_id;
GCS_RPC_SEND_REPLY(send_reply_callback, reply,
Status::NotFound("Placement group is not exist."));
// Check whether the placement group does not exist or is removed.
auto on_done = [this, placement_group_id, reply, callback, send_reply_callback](
const Status &status,
const boost::optional<PlacementGroupTableData> &result) {
if (result) {
RAY_LOG(DEBUG) << "Placement group is removed, placement group id = "
<< placement_group_id;
GCS_RPC_SEND_REPLY(send_reply_callback, reply,
Status::NotFound("Placement group is removed."));
} else {
// `wait` is a method of placement group object. Placement group object is
// obtained by create placement group api, so it can guarantee the existence of
// placement group.
// GCS client does not guarantee the order of placement group creation and
// wait, so GCS may call wait placement group first and then create placement
// group.
placement_group_to_create_callbacks_[placement_group_id].emplace_back(
std::move(callback));
}
};
Status status =
gcs_table_storage_->PlacementGroupTable().Get(placement_group_id, on_done);
if (!status.ok()) {
on_done(status, boost::none);
}
} else if (iter->second->GetState() == rpc::PlacementGroupTableData::CREATED) {
RAY_LOG(DEBUG) << "Placement group is created, placement group id = "
<< placement_group_id;
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
} else {
auto callback = [placement_group_id, reply,
send_reply_callback](const Status &status) {
RAY_LOG(DEBUG)
<< "Finished waiting for placement group until ready, placement group id = "
<< placement_group_id;
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
placement_group_to_create_callbacks_[placement_group_id].emplace_back(
std::move(callback));
}