[Placement Group] Refactor pg resource constrain in node manager (#12538)

* first version by pointer

* second version reference

* clean up

* add cpp ut

* lint

* extract LocalPlacementGroupManagerInterface

* lint

* fix commemt

* add idempotency test

* lint

* fix pg ut

* fix pg ut

* python lint

* fix pg ut timeout

* python lint

* fix comment

* lint

* lint
This commit is contained in:
DK.Pino
2020-12-13 15:32:15 +08:00
committed by GitHub
parent bdc6624da8
commit 153b24746c
15 changed files with 648 additions and 563 deletions
+12 -9
View File
@@ -1,6 +1,7 @@
import pytest
import os
import sys
import time
try:
import pytest_timeout
@@ -674,12 +675,18 @@ def test_atomic_creation(ray_start_cluster):
@ray.remote(num_cpus=3)
def bothering_task():
import time
time.sleep(1)
time.sleep(6)
return True
# Schedule tasks to fail initial placement group creation.
tasks = [bothering_task.remote() for _ in range(2)]
# Make sure the two common task has scheduled.
def tasks_scheduled():
return ray.available_resources()["CPU"] == 2.0
wait_for_condition(tasks_scheduled)
# Create an actor that will fail bundle scheduling.
# It is important to use pack strategy to make test less flaky.
pg = ray.util.placement_group(
@@ -699,7 +706,7 @@ def test_atomic_creation(ray_start_cluster):
# Wait on the placement group now. It should be unready
# because normal actor takes resources that are required
# for one of bundle creation.
ready, unready = ray.wait([pg.ready()], timeout=0)
ready, unready = ray.wait([pg.ready()], timeout=0.5)
assert len(ready) == 0
assert len(unready) == 1
# Wait until all tasks are done.
@@ -1233,17 +1240,13 @@ def test_create_actor_with_placement_group_after_gcs_server_restart(
def test_create_placement_group_during_gcs_server_restart(
ray_start_cluster_head):
cluster = ray_start_cluster_head
cluster.add_node(num_cpus=20)
cluster.add_node(num_cpus=200)
cluster.wait_for_nodes()
# Create placement groups during gcs server restart.
placement_groups = []
for i in range(0, 100):
placement_group = ray.util.placement_group([{
"CPU": 0.1
}, {
"CPU": 0.1
}])
placement_group = ray.util.placement_group([{"CPU": 1}, {"CPU": 1}])
placement_groups.append(placement_group)
cluster.head_node.kill_gcs_server()