[Placement Group]Fix SigSegv bug (#10262)

* fix SigSegv bug

* fix review comments

* fix ut bug

Co-authored-by: 灵洵 <fengbin.ffb@antfin.com>
This commit is contained in:
fangfengbin
2020-08-24 02:33:40 +08:00
committed by GitHub
parent 73c4246332
commit b61a79efd7
5 changed files with 102 additions and 25 deletions
@@ -29,6 +29,7 @@ import io.ray.runtime.generated.Common.TaskSpec;
import io.ray.runtime.generated.Common.TaskType;
import io.ray.runtime.object.LocalModeObjectStore;
import io.ray.runtime.object.NativeRayObject;
import io.ray.runtime.placementgroup.PlacementGroupId;
import io.ray.runtime.placementgroup.PlacementGroupImpl;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -165,6 +166,15 @@ public class LocalModeTaskSubmitter implements TaskSubmitter {
public BaseActorHandle createActor(
FunctionDescriptor functionDescriptor, List<FunctionArg> args,
ActorCreationOptions options) throws IllegalArgumentException {
if (options != null) {
if (options.group != null) {
PlacementGroupImpl group = (PlacementGroupImpl)options.group;
Preconditions.checkArgument(options.bundleIndex >= 0
&& options.bundleIndex < group.getBundleCount(),
String.format("Bundle index %s is invalid", options.bundleIndex));
}
}
ActorId actorId = ActorId.fromRandom();
TaskSpec taskSpec = getTaskSpecBuilder(TaskType.ACTOR_CREATION_TASK, functionDescriptor, args)
.setNumReturns(1)
@@ -215,7 +225,7 @@ public class LocalModeTaskSubmitter implements TaskSubmitter {
@Override
public PlacementGroup createPlacementGroup(List<Map<String, Double>> bundles,
PlacementStrategy strategy) {
return new PlacementGroupImpl();
return new PlacementGroupImpl(PlacementGroupId.fromRandom(), bundles.size());
}
@Override
+25
View File
@@ -546,5 +546,30 @@ def test_check_bundle_index(ray_start_cluster):
assert error_count == 3
def test_schedule_placement_group_when_node_add(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=4)
ray.init(address=cluster.address)
# Creating a placement group that cannot be satisfied yet.
placement_group = ray.experimental.placement_group([{
"GPU": 2
}, {
"CPU": 2
}])
def is_placement_group_created():
table = ray.experimental.placement_group_table(placement_group)
if "state" not in table:
return False
return table["state"] == "CREATED"
# Add a node that has GPU.
cluster.add_node(num_cpus=4, num_gpus=4)
# Make sure the placement group is created.
wait_for_condition(is_placement_group_created)
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
@@ -337,8 +337,10 @@ void GcsPlacementGroupManager::OnNodeDead(const ClientID &node_id) {
// TODO(ffbin): If we have a placement group bundle that requires a unique resource
// (for example gpu resource when theres only one gpu node), this can postpone
// creating until a node with the resources is added. we will solve it in next pr.
iter->second->UpdateState(rpc::PlacementGroupTableData::RESCHEDULING);
pending_placement_groups_.emplace_front(iter->second);
if (iter->second->GetState() != rpc::PlacementGroupTableData::RESCHEDULING) {
iter->second->UpdateState(rpc::PlacementGroupTableData::RESCHEDULING);
pending_placement_groups_.emplace_front(iter->second);
}
}
}
@@ -102,36 +102,52 @@ ScheduleMap GcsSpreadStrategy::Schedule(
// bundles will be deployed to the previous nodes. So we start with the next node of the
// last selected node.
ScheduleMap schedule_map;
auto node_resources = context->node_manager_.GetClusterRealtimeResources();
if (node_resources.empty()) {
const auto &candidate_nodes = context->node_manager_.GetClusterRealtimeResources();
if (candidate_nodes.empty()) {
return schedule_map;
}
auto candidate_nodes = node_resources;
auto iter = candidate_nodes.begin();
auto iter_begin = iter;
for (const auto &bundle : bundles) {
const auto &required_resources = bundle->GetRequiredResources();
// Traverse all nodes from `iter_begin` to `candidate_nodes.end()` to find a node that
// meets the resource requirements. `iter_begin` is the next node of the last selected
// node.
for (; iter != candidate_nodes.end(); ++iter) {
if (required_resources.IsSubset(*iter->second)) {
node_resources[iter->first]->SubtractResourcesStrict(required_resources);
iter->second->SubtractResourcesStrict(required_resources);
schedule_map[bundle->BundleId()] = iter->first;
break;
}
}
if (iter == candidate_nodes.end() && iter_begin != candidate_nodes.begin()) {
for (iter = candidate_nodes.begin(); iter != iter_begin; ++iter) {
if (required_resources.IsSubset(*iter->second)) {
node_resources[iter->first]->SubtractResourcesStrict(required_resources);
schedule_map[bundle->BundleId()] = iter->first;
// We've traversed all the nodes from `iter_begin` to `candidate_nodes.end()`, but we
// haven't found one that meets the requirements.
// If `iter_begin` is `candidate_nodes.begin()`, it means that all nodes are not
// satisfied, we will return directly. Otherwise, we will traverse the nodes from
// `candidate_nodes.begin()` to `iter_begin` to find the nodes that meet the
// requirements.
if (iter == candidate_nodes.end()) {
if (iter_begin != candidate_nodes.begin()) {
// Traverse all the nodes from `candidate_nodes.begin()` to `iter_begin`.
for (iter = candidate_nodes.begin(); iter != iter_begin; ++iter) {
if (required_resources.IsSubset(*iter->second)) {
iter->second->SubtractResourcesStrict(required_resources);
schedule_map[bundle->BundleId()] = iter->first;
break;
}
}
if (iter == iter_begin) {
// We have traversed all the nodes, so return directly.
break;
}
}
if (iter == iter_begin) {
} else {
// We have traversed all the nodes, so return directly.
break;
}
}
// NOTE: If `iter == candidate_nodes.end()`, ++iter causes crash.
iter_begin = ++iter;
}
@@ -258,7 +274,7 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupBundleResourcesIfExists(
}
placement_group_to_bundle_locations_.erase(it);
// Remove bundles from node_to_leased_bundles_ because bundels are removed now.
// Remove bundles from node_to_leased_bundles_ because bundles are removed now.
for (const auto &bundle_location : *bundle_locations) {
const auto &bundle_id = bundle_location.first;
const auto &node_id = bundle_location.second.first;
@@ -394,7 +410,6 @@ void GcsPlacementGroupScheduler::OnAllBundleSchedulingRequestReturned(
std::unique_ptr<ScheduleContext> GcsPlacementGroupScheduler::GetScheduleContext(
const PlacementGroupID &placement_group_id) {
// TODO(ffbin): We will add listener to the GCS node manager to handle node deletion.
auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes();
for (const auto &iter : alive_nodes) {
if (!node_to_leased_bundles_.contains(iter.first)) {
@@ -204,7 +204,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackSchedulePlacementGroupSucce
SchedulePlacementGroupSuccessTest(rpc::PlacementStrategy::STRICT_PACK);
}
TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupFailed) {
TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReplyFailure) {
auto node = Mocker::GenNodeInfo();
AddNode(node);
ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());
@@ -227,15 +227,40 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupFailed) {
ASSERT_EQ(2, raylet_client_->num_lease_requested);
ASSERT_EQ(2, raylet_client_->lease_callbacks.size());
// Reply failure, so the placement group scheduling failed.
ASSERT_TRUE(raylet_client_->GrantResourceReserve(false));
ASSERT_TRUE(raylet_client_->GrantResourceReserve(false));
// Reply the placement_group creation request, then the placement_group should be
// scheduled successfully.
WaitPendingDone(failure_placement_groups_, 1);
WaitPendingDone(success_placement_groups_, 0);
ASSERT_EQ(placement_group, failure_placement_groups_.front());
}
TEST_F(GcsPlacementGroupSchedulerTest, TestSpreadStrategyResourceCheck) {
auto node = Mocker::GenNodeInfo(0);
AddNode(node, 2);
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
absl::MutexLock lock(&vector_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
};
auto success_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
absl::MutexLock lock(&vector_mutex_);
success_placement_groups_.emplace_back(std::move(placement_group));
};
auto request =
Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::SPREAD, 3, 2);
auto placement_group = std::make_shared<gcs::GcsPlacementGroup>(request);
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler);
// The node resource is not enough, scheduling failed.
WaitPendingDone(failure_placement_groups_, 1);
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler);
// The node resource is not enough, scheduling failed.
WaitPendingDone(failure_placement_groups_, 2);
}
TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReturnResource) {
auto node = Mocker::GenNodeInfo();
AddNode(node);
@@ -478,15 +503,15 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestRescheduleWhenNodeDead) {
scheduler_->GetBundlesOnNode(ClientID::FromBinary(node1->node_id()));
ASSERT_EQ(1, bundles_on_node1.size());
// Node1 is dead, reschedule the placement group.
// One node is dead, reschedule the placement group.
auto bundle_on_dead_node = placement_group->GetMutableBundle(0);
bundle_on_dead_node->clear_node_id();
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler);
if (0 == bundles_on_node0[placement_group->GetPlacementGroupID()][0]) {
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
} else {
ASSERT_TRUE(raylet_client1_->GrantResourceReserve());
}
// TODO(ffbin): We need to see which node the other bundles that have been placed are
// deployed on, and spread them as far as possible. It will be implemented in the next
// pr.
raylet_client_->GrantResourceReserve();
raylet_client1_->GrantResourceReserve();
WaitPendingDone(success_placement_groups_, 2);
}