[Placement Group] Placement group remove api part 1 (#10063)

* Added basic rpc calls.

* fix issues.

* Fix the gcs server not getting request issue.

* In Progress.

* Basic logic done. Tests are required.

* In progress.

* In progress in refactoring context.

* Revert "In progress in refactoring context."

This reverts commit 38236256cf1306c60dd203e75d45ceb4509c8106.

* Working now.

* Python test works.

* Lint.

* Addressed code review.

* Addressed code review.

* Lint.

* Added unit tests.

* Done, but one of unit tests fail

* Addressed code review.

* Addressed the last code review.

* Fix the wrong test case.
This commit is contained in:
SangBin Cho
2020-08-18 12:44:00 -07:00
committed by GitHub
parent d188becec2
commit 263df6163c
24 changed files with 762 additions and 111 deletions
+10
View File
@@ -1083,6 +1083,16 @@ cdef class CoreWorker:
return PlacementGroupID(c_placement_group_id.Binary())
def remove_placement_group(self, PlacementGroupID placement_group_id):
cdef:
CPlacementGroupID c_placement_group_id = \
placement_group_id.native()
with nogil:
check_status(
CCoreWorkerProcess.GetCoreWorker().
RemovePlacementGroup(c_placement_group_id))
def submit_actor_task(self,
Language language,
ActorID actor_id,
+4 -2
View File
@@ -1,8 +1,10 @@
from .api import get, wait
from .dynamic_resources import set_resource
from .object_spilling import force_spill_objects, force_restore_spilled_objects
from .placement_group import (placement_group, placement_group_table)
from .placement_group import (placement_group, placement_group_table,
remove_placement_group)
__all__ = [
"get", "wait", "set_resource", "force_spill_objects",
"force_restore_spilled_objects", "placement_group", "placement_group_table"
"force_restore_spilled_objects", "placement_group",
"placement_group_table", "remove_placement_group"
]
+12 -1
View File
@@ -1,6 +1,9 @@
import ray
from typing import (List, Dict)
import ray
from ray._raylet import (
PlacementGroupID, )
def placement_group(bundles: List[Dict[str, float]],
strategy: str = "PACK",
@@ -32,6 +35,14 @@ def placement_group(bundles: List[Dict[str, float]],
return placement_group_id
def remove_placement_group(placement_group_id: PlacementGroupID):
assert type(placement_group_id) == PlacementGroupID
worker = ray.worker.global_worker
worker.check_connected()
worker.core_worker.remove_placement_group(placement_group_id)
def placement_group_table(placement_group_id):
assert placement_group_id is not None
worker = ray.worker.global_worker
+2
View File
@@ -98,6 +98,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CRayStatus CreatePlacementGroup(
const CPlacementGroupCreationOptions &options,
CPlacementGroupID *placement_group_id)
CRayStatus RemovePlacementGroup(
const CPlacementGroupID &placement_group_id)
void SubmitActorTask(
const CActorID &actor_id, const CRayFunction &function,
const c_vector[unique_ptr[CTaskArg]] &args,
+3 -3
View File
@@ -404,10 +404,10 @@ class GlobalState:
def get_state(state):
if state == ray.gcs_utils.PlacementGroupTableData.PENDING:
return "PENDING"
elif state == ray.gcs_utils.PlacementGroupTableData.ALIVE:
return "ALIVE"
elif state == ray.gcs_utils.PlacementGroupTableData.CREATED:
return "CREATED"
else:
return "DEAD"
return "REMOVED"
def get_strategy(strategy):
if strategy == PlacementStrategy.PACK:
+74 -2
View File
@@ -8,8 +8,9 @@ except ImportError:
pytest_timeout = None
import ray
import ray.test_utils
from ray.test_utils import wait_for_condition
import ray.cluster_utils
from ray._raylet import PlacementGroupID
def test_placement_group_pack(ray_start_cluster):
@@ -219,6 +220,77 @@ def test_placement_group_hang(ray_start_cluster):
assert "CPU_group_" in list(resources.keys())[0], resources
def test_remove_placement_group(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=4)
ray.init(address=cluster.address)
# First try to remove a placement group that doesn't
# exist. This should not do anything.
random_placement_group_id = PlacementGroupID.from_random()
for _ in range(3):
ray.experimental.remove_placement_group(random_placement_group_id)
# Creating a placement group as soon as it is
# created should work.
pid = ray.experimental.placement_group([{"CPU": 2}, {"CPU": 2}])
ray.experimental.remove_placement_group(pid)
def is_placement_group_removed():
table = ray.experimental.placement_group_table(pid)
if "state" not in table:
return False
return table["state"] == "REMOVED"
wait_for_condition(is_placement_group_removed)
# # Now let's create a placement group.
pid = ray.experimental.placement_group([{"CPU": 2}, {"CPU": 2}])
# # This is a hack to wait for placement group creation.
# # TODO(sang): Remove it when wait is implemented.
@ray.remote(num_cpus=0)
class A:
def f(self):
return 3
a = A.options(placement_group_id=pid).remote()
assert ray.get(a.f.remote()) == 3
ray.experimental.remove_placement_group(pid)
# # Subsequent remove request shouldn't do anything
for _ in range(3):
ray.experimental.remove_placement_group(pid)
# # Make sure placement group resources are
# # released and we can schedule this task.
@ray.remote(num_cpus=4)
def f():
return 3
assert ray.get(f.remote()) == 3
# Since the placement group is removed,
# the actor should've been killed.
# That means this request should fail.
# TODO(sang): Turn it on.
# ray.get(a.f.remote())
def test_remove_pending_placement_group(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=4)
ray.init(address=cluster.address)
# Create a placement group that cannot be scheduled now.
pid = ray.experimental.placement_group([{"GPU": 2}, {"CPU": 2}])
ray.experimental.remove_placement_group(pid)
# TODO(sang): Add state check here.
@ray.remote(num_cpus=4)
def f():
return 3
# Make sure this task is still schedulable.
assert ray.get(f.remote()) == 3
def test_placement_group_table(ray_start_cluster):
@ray.remote(num_cpus=2)
class Actor(object):
@@ -257,7 +329,7 @@ def test_placement_group_table(ray_start_cluster):
ray.get(actor_1.value.remote())
result = ray.experimental.placement_group_table(placement_group_id)
assert result["state"] == "ALIVE"
assert result["state"] == "CREATED"
def test_cuda_visible_devices(ray_start_cluster):