Python api of placement group (#9243)

This commit is contained in:
Alisa
2020-07-28 05:57:05 +08:00
committed by GitHub
parent b51ab2af66
commit 51e12ee97c
20 changed files with 362 additions and 23 deletions
+2
View File
@@ -69,6 +69,7 @@ from ray._raylet import (
TaskID,
UniqueID,
Language,
PlacementGroupID,
) # noqa: E402
_config = _Config()
@@ -170,4 +171,5 @@ __all__ += [
"ObjectRef",
"TaskID",
"UniqueID",
"PlacementGroupID",
]
+44 -2
View File
@@ -38,6 +38,7 @@ from libcpp.string cimport string as c_string
from libcpp.utility cimport pair
from libcpp.unordered_map cimport unordered_map
from libcpp.vector cimport vector as c_vector
from libcpp.pair cimport pair as c_pair
from cython.operator import dereference, postincrement
@@ -52,6 +53,7 @@ from ray.includes.common cimport (
CTaskArgByReference,
CTaskArgByValue,
CTaskType,
CPlacementStrategy,
CRayFunction,
LocalMemoryBuffer,
move,
@@ -64,15 +66,19 @@ from ray.includes.common cimport (
TASK_TYPE_ACTOR_TASK,
WORKER_TYPE_WORKER,
WORKER_TYPE_DRIVER,
PLACEMENT_STRATEGY_PACK,
PLACEMENT_STRATEGY_SPREAD,
)
from ray.includes.unique_ids cimport (
CActorID,
CActorCheckpointID,
CObjectID,
CClientID,
CPlacementGroupID,
)
from ray.includes.libcoreworker cimport (
CActorCreationOptions,
CPlacementGroupCreationOptions,
CCoreWorkerOptions,
CCoreWorkerProcess,
CTaskOptions,
@@ -907,7 +913,10 @@ cdef class CoreWorker:
c_bool is_detached,
c_string name,
c_bool is_asyncio,
c_string extension_data):
PlacementGroupID placement_group_id,
int64_t placement_group_bundle_index,
c_string extension_data
):
cdef:
CRayFunction ray_function
c_vector[unique_ptr[CTaskArg]] args_vector
@@ -915,6 +924,8 @@ cdef class CoreWorker:
unordered_map[c_string, double] c_resources
unordered_map[c_string, double] c_placement_resources
CActorID c_actor_id
CPlacementGroupID c_placement_group_id = \
placement_group_id.native()
with self.profile_event(b"submit_task"):
prepare_resources(resources, &c_resources)
@@ -929,12 +940,43 @@ cdef class CoreWorker:
CActorCreationOptions(
max_restarts, max_task_retries, max_concurrency,
c_resources, c_placement_resources,
dynamic_worker_options, is_detached, name, is_asyncio),
dynamic_worker_options, is_detached, name, is_asyncio,
c_pair[CPlacementGroupID, int64_t](c_placement_group_id, placement_group_bundle_index)),
extension_data,
&c_actor_id))
return ActorID(c_actor_id.Binary())
def create_placement_group(
self,
c_string name,
c_vector[unordered_map[c_string, double]] bundles,
c_string strategy):
cdef:
CPlacementGroupID c_placement_group_id
CPlacementStrategy c_strategy
if strategy == b"PACK":
c_strategy = PLACEMENT_STRATEGY_PACK
else:
if strategy == b"SPREAD":
c_strategy = PLACEMENT_STRATEGY_SPREAD
else:
raise TypeError(strategy)
with nogil:
check_status(
CCoreWorkerProcess.GetCoreWorker().
CreatePlacementGroup(
CPlacementGroupCreationOptions(
name,
c_strategy,
bundles
),
&c_placement_group_id))
return PlacementGroupID(c_placement_group_id.Binary())
def submit_actor_task(self,
Language language,
ActorID actor_id,
+16 -2
View File
@@ -411,7 +411,9 @@ class ActorClass:
max_restarts=None,
max_task_retries=None,
name=None,
detached=False):
detached=False,
placement_group_id=None,
placement_group_bundle_index=None):
"""Create an actor.
This method allows more flexibility than the remote method because
@@ -436,6 +438,10 @@ class ActorClass:
guaranteed when max_concurrency > 1.
name: The globally unique name for the actor.
detached: DEPRECATED.
placement_group_id: the placement group this actor belongs to,
or None if it doesn't belong to any group.
placement_group_bundle_index: the index of the bundle
if the actor belongs to a placement group.
Returns:
A handle to the newly created actor.
@@ -446,7 +452,6 @@ class ActorClass:
kwargs = {}
if is_direct_call is not None and not is_direct_call:
raise ValueError("Non-direct call actors are no longer supported.")
meta = self.__ray_metadata__
actor_has_async_methods = len(
inspect.getmembers(
@@ -498,6 +503,11 @@ class ActorClass:
else:
detached = False
if placement_group_id is not None and placement_group_bundle_index is \
None:
raise ValueError("The placement_group_id is set."
"But the bundle_index is not set.")
# Set the actor's default resources if not already set. First three
# conditions are to check that no resources were specified in the
# decorator. Last three conditions are to check that no resources were
@@ -568,6 +578,10 @@ class ActorClass:
detached,
name if name is not None else "",
is_asyncio,
placement_group_id
if placement_group_id is not None else ray.PlacementGroupID.nil(),
placement_group_bundle_index
if placement_group_bundle_index is not None else -1,
# Store actor_method_cpu in actor handle's extension data.
extension_data=str(actor_method_cpu))
+3 -1
View File
@@ -1,8 +1,10 @@
from .api import get, wait
from .dynamic_resources import set_resource
from .placement_group import (
placement_group, )
__all__ = [
"get",
"wait",
"set_resource",
"placement_group",
]
@@ -0,0 +1,26 @@
import ray
from typing import (List, Dict)
def placement_group(bundles: List[Dict[str, float]],
strategy: str = "PACK",
name: str = None):
"""
Create a placement group.
This method is the api to create placement group.
Args:
bundles: A list of bundles which represent the resources needed.
strategy: The strategy to create the placement group.
There are two build-in strategies for the time begin.
PACK: Packs Bundles close together inside processes or nodes as
tight as possible.
SPREAD: Places Bundles across distinct nodes or processes as even
as possible.
name: The name of the placement group.
"""
worker = ray.worker.global_worker
placement_group_id = worker.core_worker.create_placement_group(
name, bundles, strategy)
return placement_group_id
+20 -1
View File
@@ -5,6 +5,7 @@ from libcpp.string cimport string as c_string
from libc.stdint cimport uint8_t, int32_t, uint64_t, int64_t
from libcpp.unordered_map cimport unordered_map
from libcpp.vector cimport vector as c_vector
from libcpp.pair cimport pair as c_pair
from ray.includes.unique_ids cimport (
CActorID,
@@ -12,6 +13,7 @@ from ray.includes.unique_ids cimport (
CWorkerID,
CObjectID,
CTaskID,
CPlacementGroupID,
)
from ray.includes.function_descriptor cimport (
CFunctionDescriptor,
@@ -142,6 +144,8 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil:
pass
cdef cppclass CTaskType "ray::TaskType":
pass
cdef cppclass CPlacementStrategy "ray::PlacementStrategy":
pass
cdef cppclass CAddress "ray::rpc::Address":
CAddress()
const c_string &SerializeAsString()
@@ -164,6 +168,11 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil:
cdef CTaskType TASK_TYPE_ACTOR_CREATION_TASK "ray::TaskType::ACTOR_CREATION_TASK" # noqa: E501
cdef CTaskType TASK_TYPE_ACTOR_TASK "ray::TaskType::ACTOR_TASK"
cdef extern from "src/ray/protobuf/common.pb.h" nogil:
cdef CPlacementStrategy PLACEMENT_STRATEGY_PACK \
"ray::PlacementStrategy::PACK"
cdef CPlacementStrategy PLACEMENT_STRATEGY_SPREAD \
"ray::PlacementStrategy::SPREAD"
cdef extern from "ray/common/task/scheduling_resources.h" nogil:
cdef cppclass ResourceSet "ray::ResourceSet":
@@ -239,7 +248,17 @@ cdef extern from "ray/core_worker/common.h" nogil:
const unordered_map[c_string, double] &resources,
const unordered_map[c_string, double] &placement_resources,
const c_vector[c_string] &dynamic_worker_options,
c_bool is_detached, c_string &name, c_bool is_asyncio)
c_bool is_detached, c_string &name, c_bool is_asyncio,
c_pair[CPlacementGroupID, int64_t] placement_options)
cdef cppclass CPlacementGroupCreationOptions \
"ray::PlacementGroupCreationOptions":
CPlacementGroupCreationOptions()
CPlacementGroupCreationOptions(
const c_string &name,
CPlacementStrategy strategy,
const c_vector[unordered_map[c_string, double]] &bundles
)
cdef extern from "ray/gcs/gcs_client.h" nogil:
cdef cppclass CGcsClientOptions "ray::gcs::GcsClientOptions":
+8
View File
@@ -17,11 +17,13 @@ from ray.includes.unique_ids cimport (
CJobID,
CTaskID,
CObjectID,
CPlacementGroupID,
)
from ray.includes.common cimport (
CAddress,
CActorCreationOptions,
CBuffer,
CPlacementGroupCreationOptions,
CRayFunction,
CRayObject,
CRayStatus,
@@ -91,6 +93,9 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const c_vector[unique_ptr[CTaskArg]] &args,
const CActorCreationOptions &options,
const c_string &extension_data, CActorID *actor_id)
CRayStatus CreatePlacementGroup(
const CPlacementGroupCreationOptions &options,
CPlacementGroupID *placement_group_id)
void SubmitActorTask(
const CActorID &actor_id, const CRayFunction &function,
const c_vector[unique_ptr[CTaskArg]] &args,
@@ -225,9 +230,12 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
void Initialize(const CCoreWorkerOptions &options)
# Only call this in CoreWorker.__cinit__,
# use CoreWorker.core_worker to access C++ CoreWorker.
@staticmethod
CCoreWorker &GetCoreWorker()
@staticmethod
void Shutdown()
@staticmethod
void RunTaskExecutionLoop()
+15
View File
@@ -158,3 +158,18 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
@staticmethod
CWorkerID FromBinary(const c_string &binary)
cdef cppclass CPlacementGroupID "ray::PlacementGroupID" \
(CBaseID[CPlacementGroupID]):
@staticmethod
CPlacementGroupID FromBinary(const c_string &binary)
@staticmethod
const CActorID Nil()
@staticmethod
size_t Size()
@staticmethod
CPlacementGroupID FromRandom()
+40 -1
View File
@@ -19,7 +19,8 @@ from ray.includes.unique_ids cimport (
CObjectID,
CTaskID,
CUniqueID,
CWorkerID
CWorkerID,
CPlacementGroupID
)
import ray
@@ -331,6 +332,43 @@ cdef class ActorClassID(UniqueID):
# This type alias is for backward compatibility.
ObjectID = ObjectRef
cdef class PlacementGroupID(BaseID):
cdef CPlacementGroupID data
def __init__(self, id):
check_id(id, CPlacementGroupID.Size())
self.data = CPlacementGroupID.FromBinary(<c_string>id)
cdef CPlacementGroupID native(self):
return <CPlacementGroupID>self.data
@classmethod
def from_random(cls):
return cls(CPlacementGroupID.FromRandom().Binary())
@classmethod
def nil(cls):
return cls(CPlacementGroupID.Nil().Binary())
@classmethod
def size(cls):
return CPlacementGroupID.Size()
def binary(self):
return self.data.Binary()
def hex(self):
return decode(self.data.Hex())
def size(self):
return CPlacementGroupID.Size()
def is_nil(self):
return self.data.IsNil()
cdef size_t hash(self):
return self.data.Hash()
_ID_TYPES = [
ActorCheckpointID,
ActorClassID,
@@ -342,4 +380,5 @@ _ID_TYPES = [
ObjectID,
TaskID,
UniqueID,
PlacementGroupID,
]
+5 -2
View File
@@ -246,11 +246,14 @@ class GlobalState:
"JobID": binary_to_hex(actor_table_data.job_id),
"Address": {
"IPAddress": actor_table_data.address.ip_address,
"Port": actor_table_data.address.port
"Port": actor_table_data.address.port,
"NodeID": binary_to_hex(actor_table_data.address.raylet_id),
},
"OwnerAddress": {
"IPAddress": actor_table_data.owner_address.ip_address,
"Port": actor_table_data.owner_address.port
"Port": actor_table_data.owner_address.port,
"NodeID": binary_to_hex(
actor_table_data.owner_address.raylet_id),
},
"State": actor_table_data.state,
"Timestamp": actor_table_data.timestamp,
+8
View File
@@ -487,3 +487,11 @@ py_test(
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_placement_group",
size = "medium",
srcs = SRCS + ["test_placement_group.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
+149
View File
@@ -0,0 +1,149 @@
import pytest
try:
import pytest_timeout
except ImportError:
pytest_timeout = None
import sys
import os
import ray
import ray.test_utils
import ray.cluster_utils
@pytest.mark.skipif(
os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED") != "true",
reason=("This edge case is not handled when GCS actor management is off. "
"We won't fix this because GCS actor management "
"will be on by default anyway."))
def test_placement_group_pack(ray_start_cluster):
@ray.remote(num_cpus=2)
class Actor(object):
def __init__(self):
self.n = 0
def value(self):
return self.n
cluster = ray_start_cluster
num_nodes = 2
for _ in range(num_nodes):
cluster.add_node(num_cpus=4)
ray.init(address=cluster.address)
placement_group_id = ray.experimental.placement_group(
name="name", strategy="PACK", bundles=[{
"CPU": 2
}, {
"CPU": 2
}])
actor_1 = Actor.options(
placement_group_id=placement_group_id,
placement_group_bundle_index=0).remote()
actor_2 = Actor.options(
placement_group_id=placement_group_id,
placement_group_bundle_index=1).remote()
print(ray.get(actor_1.value.remote()))
print(ray.get(actor_2.value.remote()))
# Get all actors.
actor_infos = ray.actors()
# Make sure all actors in counter_list are collocated in one node.
actor_info_1 = actor_infos.get(actor_1._actor_id.hex())
actor_info_2 = actor_infos.get(actor_2._actor_id.hex())
assert actor_info_1 and actor_info_2
node_of_actor_1 = actor_info_1["Address"]["NodeID"]
node_of_actor_2 = actor_info_2["Address"]["NodeID"]
assert node_of_actor_1 == node_of_actor_2
@pytest.mark.skipif(
os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED") != "true",
reason=("This edge case is not handled when GCS actor management is off. "
"We won't fix this because GCS actor management "
"will be on by default anyway."))
def test_placement_group_pack_best_effort(ray_start_cluster):
@ray.remote(num_cpus=2)
class Actor(object):
def __init__(self):
self.n = 0
def value(self):
return self.n
# TODO(Shanly):
pass
@pytest.mark.skipif(
os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED") != "true",
reason=("This edge case is not handled when GCS actor management is off. "
"We won't fix this because GCS actor management "
"will be on by default anyway."))
def test_placement_group_spread(ray_start_cluster):
@ray.remote(num_cpus=2)
class Actor(object):
def __init__(self):
self.n = 0
def value(self):
return self.n
cluster = ray_start_cluster
num_nodes = 2
for _ in range(num_nodes):
cluster.add_node(num_cpus=4)
ray.init(address=cluster.address)
placement_group_id = ray.experimental.placement_group(
name="name", strategy="SPREAD", bundles=[{
"CPU": 2
}, {
"CPU": 2
}])
actor_1 = Actor.options(
placement_group_id=placement_group_id, bundle_index=0).remote()
actor_2 = Actor.options(
placement_group_id=placement_group_id, bundle_index=1).remote()
print(ray.get(actor_1.value.remote()))
print(ray.get(actor_2.value.remote()))
# Get all actors.
actor_infos = ray.actors()
# Make sure all actors in counter_list are collocated in one node.
actor_info_1 = actor_infos.get(actor_1._actor_id.hex())
actor_info_2 = actor_infos.get(actor_2._actor_id.hex())
assert actor_info_1 and actor_info_2
node_of_actor_1 = actor_info_1["Address"]["NodeID"]
node_of_actor_2 = actor_info_2["Address"]["NodeID"]
assert node_of_actor_1 != node_of_actor_2
@pytest.mark.skipif(
os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED") != "true",
reason=("This edge case is not handled when GCS actor management is off. "
"We won't fix this because GCS actor management "
"will be on by default anyway."))
def test_placement_group_spread_best_effort(ray_start_cluster):
@ray.remote(num_cpus=2)
class Actor(object):
def __init__(self):
self.n = 0
def value(self):
return self.n
# TODO(Shanly):
pass
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
@@ -154,14 +154,17 @@ void GcsPlacementGroupManager::HandleCreatePlacementGroup(
ray::rpc::SendReplyCallback send_reply_callback) {
auto placement_group_id =
PlacementGroupID::FromBinary(request.placement_group_spec().placement_group_id());
const auto &strategy = request.placement_group_spec().strategy();
const auto &name = request.placement_group_spec().name();
RAY_LOG(INFO) << "Registering placement group, placement group id = "
<< placement_group_id;
<< placement_group_id << ", name = " << name
<< ", strategy = " << PlacementStrategy_Name(strategy);
RegisterPlacementGroup(
request, [reply, send_reply_callback, placement_group_id](
std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
RAY_LOG(INFO) << "Registered placement group, placement group id = "
<< placement_group_id;
<< placement_group_id << ", name = " << placement_group->GetName()
<< ", strategy = " << placement_group->GetStrategy();
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
});
auto placement_group = std::make_shared<GcsPlacementGroup>(request);
@@ -171,7 +174,7 @@ void GcsPlacementGroupManager::HandleCreatePlacementGroup(
}
void GcsPlacementGroupManager::ScheduleTick() {
reschedule_timer_.expires_from_now(boost::posix_time::milliseconds(5));
reschedule_timer_.expires_from_now(boost::posix_time::milliseconds(500));
reschedule_timer_.async_wait([this](const boost::system::error_code &error) {
if (error == boost::system::errc::operation_canceled) {
return;
@@ -154,14 +154,11 @@ void GcsPlacementGroupScheduler::ReserveResourceFromNode(
std::shared_ptr<ray::rpc::GcsNodeInfo> node, ReserveResourceCallback callback) {
RAY_CHECK(node);
auto node_id = ClientID::FromBinary(node->node_id());
RAY_LOG(DEBUG) << "Start leasing resource from node " << node_id << " for bundle "
<< bundle->BundleId().first << std::to_string(bundle->BundleId().second);
rpc::Address remote_address;
remote_address.set_raylet_id(node->node_id());
remote_address.set_ip_address(node->node_manager_address());
remote_address.set_port(node->node_manager_port());
auto node_id = ClientID::FromBinary(node->node_id());
auto lease_client = GetOrConnectLeaseClient(remote_address);
RAY_LOG(DEBUG) << "Start leasing resource from node " << node_id << " for bundle "
<< bundle->BundleId().first << bundle->BundleId().second;
@@ -183,7 +180,7 @@ void GcsPlacementGroupScheduler::ReserveResourceFromNode(
<< " for bundle " << bundle->BundleId().first
<< bundle->BundleId().second;
}
// Remove the actor from the leasing map as the reply is returned from the
// Remove the bundle from the leasing map as the reply is returned from the
// remote node.
iter->second.erase(bundle_iter);
if (iter->second.empty()) {
-1
View File
@@ -152,7 +152,6 @@ class GcsServer {
/// Worker info service
std::unique_ptr<rpc::WorkerInfoGrpcService> worker_info_service_;
/// Placement Group info handler and service
std::unique_ptr<rpc::PlacementGroupInfoHandler> placement_group_info_handler_;
std::unique_ptr<rpc::PlacementGroupInfoGrpcService> placement_group_info_service_;
/// Backend client
std::shared_ptr<RedisGcsClient> redis_gcs_client_;
+1 -1
View File
@@ -410,8 +410,8 @@ class GcsTableStorage {
std::unique_ptr<GcsObjectTable> object_table_;
std::unique_ptr<GcsNodeTable> node_table_;
std::unique_ptr<GcsNodeResourceTable> node_resource_table_;
std::unique_ptr<GcsHeartbeatTable> heartbeat_table_;
std::unique_ptr<GcsPlacementGroupScheduleTable> placement_group_schedule_table_;
std::unique_ptr<GcsHeartbeatTable> heartbeat_table_;
std::unique_ptr<GcsHeartbeatBatchTable> heartbeat_batch_table_;
std::unique_ptr<GcsErrorInfoTable> error_info_table_;
std::unique_ptr<GcsProfileTable> profile_table_;
+5
View File
@@ -835,6 +835,11 @@ Status RedisWorkerInfoAccessor::AsyncAdd(
return Status::Invalid("Not implemented");
}
Status RedisPlacementGroupInfoAccessor::AsyncCreatePlacementGroup(
const PlacementGroupSpecification &placement_group_spec) {
return Status::Invalid("Not implemented");
}
} // namespace gcs
} // namespace ray
+8
View File
@@ -473,6 +473,14 @@ class RedisWorkerInfoAccessor : public WorkerInfoAccessor {
WorkerFailureSubscriptionExecutor worker_failure_sub_executor_;
};
class RedisPlacementGroupInfoAccessor : public PlacementGroupInfoAccessor {
public:
virtual ~RedisPlacementGroupInfoAccessor() = default;
Status AsyncCreatePlacementGroup(
const PlacementGroupSpecification &placement_group_spec) override;
};
} // namespace gcs
} // namespace ray
+1
View File
@@ -82,6 +82,7 @@ Status RedisGcsClient::Connect(boost::asio::io_service &io_service) {
error_accessor_.reset(new RedisErrorInfoAccessor(this));
stats_accessor_.reset(new RedisStatsInfoAccessor(this));
worker_accessor_.reset(new RedisWorkerInfoAccessor(this));
placement_group_accessor_.reset(new RedisPlacementGroupInfoAccessor());
is_connected_ = true;
+2 -3
View File
@@ -1986,8 +1986,8 @@ ResourceIdSet NodeManager::ScheduleBundle(
std::string resource_name = bundle_id_str + "_" + resource.first;
local_available_resources_.AddBundleResource(resource_name, resource.second);
}
cluster_resource_map_[self_node_id_].UpdateBundleResource(
bundle_id_str, bundle_spec.GetRequiredResources());
resource_map[self_node_id_].UpdateBundleResource(bundle_id_str,
bundle_spec.GetRequiredResources());
}
return acquired_resources;
}
@@ -2566,7 +2566,6 @@ void NodeManager::AssignTask(const std::shared_ptr<WorkerInterface> &worker,
auto acquired_resources =
local_available_resources_.Acquire(spec.GetRequiredResources());
cluster_resource_map_[self_node_id_].Acquire(spec.GetRequiredResources());
if (spec.IsActorCreationTask()) {
// Check that the actor's placement resource requirements are satisfied.
RAY_CHECK(spec.GetRequiredPlacementResources().IsSubset(