Add placement group manager and some code in core_worker (#9120)

Co-authored-by: Lingxuan Zuo <skyzlxuan@gmail.com>
This commit is contained in:
Alisa
2020-07-17 20:49:51 +08:00
committed by GitHub
parent 78dfed2683
commit f080aa6ce3
28 changed files with 641 additions and 43 deletions
+14
View File
@@ -1076,6 +1076,20 @@ cc_test(
],
)
# Unit test target for the GCS placement group manager. It is built from the manager
# test source plus the shared GCS server test utilities header.
cc_test(
name = "gcs_placement_group_manager_test",
srcs = [
"src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc",
"src/ray/gcs/gcs_server/test/gcs_server_test_util.h",
],
copts = COPTS,
deps = [
":gcs_server_lib",
":gcs_test_util_lib",
"@com_google_googletest//:gtest_main",
],
)
cc_test(
name = "gcs_placement_group_scheduler_test",
srcs = [
+2 -6
View File
@@ -33,12 +33,8 @@ std::vector<BundleSpecification> PlacementGroupSpecification::GetBundles() const
return bundles_;
}
Strategy PlacementGroupSpecification::GetStrategy() const {
if (message_->strategy() == rpc::PlacementStrategy::PACK) {
return Strategy::PACK;
} else {
return Strategy::SPREAD;
}
rpc::PlacementStrategy PlacementGroupSpecification::GetStrategy() const {
return message_->strategy();
}
BundleSpecification PlacementGroupSpecification::GetBundle(int position) const {
+6 -10
View File
@@ -21,11 +21,6 @@
namespace ray {
enum class Strategy {
PACK = 0x0,
SPREAD = 0x1,
};
class PlacementGroupSpecification : public MessageWrapper<rpc::PlacementGroupSpec> {
public:
/// Construct from a protobuf message object.
@@ -48,7 +43,7 @@ class PlacementGroupSpecification : public MessageWrapper<rpc::PlacementGroupSpe
/// Return the bundles in this placement group.
std::vector<BundleSpecification> GetBundles() const;
/// Return the strategy of the placement group.
Strategy GetStrategy() const;
rpc::PlacementStrategy GetStrategy() const;
/// Return the bundle by given index.
BundleSpecification GetBundle(int position) const;
/// Return the name of this placement group.
@@ -71,18 +66,19 @@ class PlacementGroupSpecBuilder {
/// \return Reference to the builder object itself.
PlacementGroupSpecBuilder &SetPlacementGroupSpec(
const PlacementGroupID &placement_group_id, std::string name,
const std::vector<rpc::Bundle> &bundles, const rpc::PlacementStrategy strategy) {
const std::vector<std::unordered_map<std::string, double>> &bundles,
const rpc::PlacementStrategy strategy) {
message_->set_placement_group_id(placement_group_id.Binary());
message_->set_name(name);
message_->set_strategy(strategy);
for (int i = 0; i < bundles.size(); i++) {
rpc::Bundle bundle = bundles[i];
auto resources = bundles[i];
auto message_bundle = message_->add_bundles();
auto mutable_bundle_id = message_bundle->mutable_bundle_id();
mutable_bundle_id->set_bundle_index(i);
mutable_bundle_id->set_placement_group_id(placement_group_id.Binary());
message_bundle->mutable_unit_resources()->insert(bundle.unit_resources().begin(),
bundle.unit_resources().end());
message_bundle->mutable_unit_resources()->insert(resources.begin(),
resources.end());
}
return *this;
}
@@ -683,7 +683,6 @@ void ResourceIdSet::CancelResourceReserve(const std::string &resource_name) {
available_resources_.erase(iter_bundle);
}
}
void ResourceIdSet::DeleteResource(const std::string &resource_name) {
available_resources_.erase(resource_name);
}
@@ -429,6 +429,7 @@ class ResourceIdSet {
/// \brief remove a Bundle resource in the ResourceIdSet.
///
/// \param resource_name the name of the resource to remove.
void CancelResourceReserve(const std::string &resource_name);
/// \brief Deletes a resource in the ResourceIdSet. This does not raise an exception,
/// just deletes the resource. Tasks with acquired resources keep running.
@@ -63,14 +63,7 @@ TEST_F(SchedulingResourcesTest, ReturnBundleResources) {
resource_labels.push_back(bundle_id.Binary() + "_" + "CPU");
ResourceSet result_resource(resource_labels, resource_capacity);
ASSERT_EQ(1, resource_set->IsEqual(result_resource));
resource_set->ReturnBundleResources(bundle_id.Binary());
ASSERT_EQ(1, resource_set->IsEqual(resource));
}
} // namespace ray
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
+31 -7
View File
@@ -25,6 +25,7 @@
namespace ray {
using WorkerType = rpc::WorkerType;
using PlacementOptions = std::pair<PlacementGroupID, int64_t>;
// Return a string representation of the worker type.
std::string WorkerTypeString(WorkerType type);
@@ -65,12 +66,13 @@ struct TaskOptions {
/// Options for actor creation tasks.
struct ActorCreationOptions {
ActorCreationOptions() {}
ActorCreationOptions(int64_t max_restarts, int64_t max_task_retries,
int max_concurrency,
const std::unordered_map<std::string, double> &resources,
const std::unordered_map<std::string, double> &placement_resources,
const std::vector<std::string> &dynamic_worker_options,
bool is_detached, std::string &name, bool is_asyncio)
ActorCreationOptions(
int64_t max_restarts, int64_t max_task_retries, int max_concurrency,
const std::unordered_map<std::string, double> &resources,
const std::unordered_map<std::string, double> &placement_resources,
const std::vector<std::string> &dynamic_worker_options, bool is_detached,
std::string &name, bool is_asyncio,
PlacementOptions placement_options = std::make_pair(PlacementGroupID::Nil(), -1))
: max_restarts(max_restarts),
max_task_retries(max_task_retries),
max_concurrency(max_concurrency),
@@ -79,7 +81,8 @@ struct ActorCreationOptions {
dynamic_worker_options(dynamic_worker_options),
is_detached(is_detached),
name(name),
is_asyncio(is_asyncio){};
is_asyncio(is_asyncio),
placement_options(placement_options){};
/// Maximum number of times that the actor should be restarted if it dies
/// unexpectedly. A value of -1 indicates infinite restarts. If it's 0, the
@@ -107,6 +110,27 @@ struct ActorCreationOptions {
const std::string name;
/// Whether to use async mode of direct actor call.
const bool is_asyncio = false;
/// The placement_options include placement_group_id and bundle_index.
/// If the actor doesn't belong to a placement group, the placement_group_id will be
/// nil, and the bundle_index will be -1.
PlacementOptions placement_options;
};
using PlacementStrategy = rpc::PlacementStrategy;
/// Options for creating a placement group.
struct PlacementGroupCreationOptions {
  /// Construct options with an empty name, no bundles and the default strategy.
  PlacementGroupCreationOptions() {}

  /// \param name The name of the placement group.
  /// \param strategy The strategy used to place the bundles.
  /// \param bundles The resource bundles (resource name -> quantity) of the group.
  PlacementGroupCreationOptions(
      const std::string &name, PlacementStrategy strategy,
      const std::vector<std::unordered_map<std::string, double>> &bundles)
      // Members are always initialized in declaration order, so the initializer list
      // is written in that same order to avoid -Wreorder warnings.
      : strategy(strategy), bundles(bundles), name(name) {}

  /// The strategy to place the bundle in Placement Group.
  const PlacementStrategy strategy = rpc::PACK;
  /// The resource bundles in this placement group.
  const std::vector<std::unordered_map<std::string, double>> bundles;
  /// The name of the placement group.
  const std::string name;
};
} // namespace ray
+40 -2
View File
@@ -1168,6 +1168,21 @@ void CoreWorker::SubmitTask(const RayFunction &function,
}
}
/// Rewrite resource names so that they refer to one bundle of a placement group.
///
/// \param resources Resource name -> quantity map to rewrite.
/// \param placement_group_id ID of the placement group; Nil means "no placement group".
/// \param bundle_index Index of the bundle inside the placement group.
/// \return A copy of `resources` when `placement_group_id` is Nil. Otherwise a new map
/// with the same quantities whose keys have the form
/// "<placement group hex><bundle index>_<original resource name>".
std::unordered_map<std::string, double> AddPlacementGroupConstraint(
    const std::unordered_map<std::string, double> &resources,
    PlacementGroupID placement_group_id, int64_t bundle_index) {
  // Nothing to rewrite when no placement group was requested.
  if (placement_group_id == PlacementGroupID::Nil()) {
    return resources;
  }

  // The prefix is identical for every resource, so build it only once.
  const std::string bundle_prefix =
      placement_group_id.Hex() + std::to_string(bundle_index) + "_";
  std::unordered_map<std::string, double> constrained_resources;
  for (const auto &resource : resources) {
    constrained_resources[bundle_prefix + resource.first] = resource.second;
  }
  return constrained_resources;
}
Status CoreWorker::CreateActor(const RayFunction &function,
const std::vector<std::unique_ptr<TaskArg>> &args,
const ActorCreationOptions &actor_creation_options,
@@ -1185,10 +1200,17 @@ Status CoreWorker::CreateActor(const RayFunction &function,
const JobID job_id = worker_context_.GetCurrentJobID();
std::vector<ObjectID> return_ids;
TaskSpecBuilder builder;
auto new_placement_resources =
AddPlacementGroupConstraint(actor_creation_options.placement_resources,
actor_creation_options.placement_options.first,
actor_creation_options.placement_options.second);
auto new_resource = AddPlacementGroupConstraint(
actor_creation_options.resources, actor_creation_options.placement_options.first,
actor_creation_options.placement_options.second);
BuildCommonTaskSpec(builder, job_id, actor_creation_task_id,
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(),
rpc_address_, function, args, 1, actor_creation_options.resources,
actor_creation_options.placement_resources, &return_ids);
rpc_address_, function, args, 1, new_resource,
new_placement_resources, &return_ids);
builder.SetActorCreationTaskSpec(
actor_id, actor_creation_options.max_restarts,
actor_creation_options.dynamic_worker_options,
@@ -1227,6 +1249,22 @@ Status CoreWorker::CreateActor(const RayFunction &function,
return status;
}
/// Build a placement group specification from the given options and submit it to the
/// GCS for asynchronous creation.
///
/// \param[in] placement_group_creation_options Name, strategy and bundles of the
/// placement group to create.
/// \param[out] return_placement_group_id Receives the randomly generated ID of the new
/// placement group. It is written before the request is submitted.
/// \return The status returned by the GCS accessor when submitting the request.
Status CoreWorker::CreatePlacementGroup(
    const PlacementGroupCreationOptions &placement_group_creation_options,
    PlacementGroupID *return_placement_group_id) {
  const PlacementGroupID placement_group_id = PlacementGroupID::FromRandom();
  PlacementGroupSpecBuilder builder;
  builder.SetPlacementGroupSpec(placement_group_id, placement_group_creation_options.name,
                                placement_group_creation_options.bundles,
                                placement_group_creation_options.strategy);
  PlacementGroupSpecification placement_group_spec = builder.Build();
  *return_placement_group_id = placement_group_id;
  RAY_LOG(INFO) << "Submitting Placement Group creation to GCS: " << placement_group_id;
  // Hand a submission failure back to the caller instead of aborting the whole worker
  // process; this function already reports its outcome through `Status`.
  return gcs_client_->PlacementGroups().AsyncCreatePlacementGroup(placement_group_spec);
}
void CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &function,
const std::vector<std::unique_ptr<TaskArg>> &args,
const TaskOptions &task_options,
+12
View File
@@ -17,6 +17,7 @@
#include "absl/base/optimization.h"
#include "absl/container/flat_hash_map.h"
#include "ray/common/buffer.h"
#include "ray/common/placement_group.h"
#include "ray/core_worker/actor_handle.h"
#include "ray/core_worker/actor_manager.h"
#include "ray/core_worker/actor_reporter.h"
@@ -593,6 +594,17 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
const ActorCreationOptions &actor_creation_options,
const std::string &extension_data, ActorID *actor_id);
/// Create a placement group.
///
/// \param[in] placement_group_creation_options Options for this placement group
/// creation task.
/// \param[out] placement_group_id ID of the created placement group. This can be
/// passed in the placement options of an actor creation to schedule the actor in
/// this placement group.
/// \return Status of the placement group creation request.
Status CreatePlacementGroup(
const PlacementGroupCreationOptions &placement_group_creation_options,
PlacementGroupID *placement_group_id);
/// Submit an actor task.
///
/// \param[in] caller_id ID of the task submitter.
+16
View File
@@ -15,6 +15,7 @@
#pragma once
#include "ray/common/id.h"
#include "ray/common/placement_group.h"
#include "ray/common/task/task_spec.h"
#include "ray/gcs/callback.h"
#include "ray/gcs/entry_change_notification.h"
@@ -711,6 +712,21 @@ class WorkerInfoAccessor {
WorkerInfoAccessor() = default;
};
/// \class PlacementGroupInfoAccessor
/// `PlacementGroupInfoAccessor` is a sub-interface of `GcsClient` for accessing
/// placement group information in the GCS.
class PlacementGroupInfoAccessor {
 public:
  /// Virtual because concrete accessors are owned and destroyed through pointers to
  /// this base class.
  virtual ~PlacementGroupInfoAccessor() = default;

  // TODO(AlisaWu): fill the accessor.
  /// Create a placement group in GCS asynchronously.
  ///
  /// \param placement_group_spec The specification for the placement group creation
  /// task.
  /// \return Status
  virtual Status AsyncCreatePlacementGroup(
      const PlacementGroupSpecification &placement_group_spec) = 0;

 protected:
  PlacementGroupInfoAccessor() = default;
};
} // namespace gcs
} // namespace ray
+8
View File
@@ -134,6 +134,13 @@ class GcsClient : public std::enable_shared_from_this<GcsClient> {
return *worker_accessor_;
}
/// Get the sub-interface for accessing placement group information in GCS.
/// This function is thread safe.
PlacementGroupInfoAccessor &PlacementGroups() {
// The accessor must have been created before it is requested.
RAY_CHECK(placement_group_accessor_ != nullptr);
return *placement_group_accessor_;
}
protected:
/// Constructor of GcsClient.
///
@@ -153,6 +160,7 @@ class GcsClient : public std::enable_shared_from_this<GcsClient> {
std::unique_ptr<ErrorInfoAccessor> error_accessor_;
std::unique_ptr<StatsInfoAccessor> stats_accessor_;
std::unique_ptr<WorkerInfoAccessor> worker_accessor_;
std::unique_ptr<PlacementGroupInfoAccessor> placement_group_accessor_;
};
} // namespace gcs
@@ -1383,5 +1383,28 @@ Status ServiceBasedWorkerInfoAccessor::AsyncAdd(
return Status::OK();
}
ServiceBasedPlacementGroupInfoAccessor::ServiceBasedPlacementGroupInfoAccessor(
ServiceBasedGcsClient *client_impl)
: client_impl_(client_impl) {}
/// Send a CreatePlacementGroup request for the given specification to the GCS service.
///
/// The outcome of the request is only logged from the reply callback; it is not
/// reported back to the caller.
///
/// \param placement_group_spec The specification of the placement group to create.
/// \return Always Status::OK(); it only indicates that the request was handed to the
/// RPC client.
Status ServiceBasedPlacementGroupInfoAccessor::AsyncCreatePlacementGroup(
    const ray::PlacementGroupSpecification &placement_group_spec) {
  rpc::CreatePlacementGroupRequest request;
  request.mutable_placement_group_spec()->CopyFrom(placement_group_spec.GetMessage());
  // The callback only needs the ID, so avoid copying the whole specification into it.
  const auto placement_group_id = placement_group_spec.PlacementGroupId();
  client_impl_->GetGcsRpcClient().CreatePlacementGroup(
      request, [placement_group_id](const Status &rpc_status,
                                    const rpc::CreatePlacementGroupReply &reply) {
        // Start from the status handed to the callback, then fall back to the status
        // embedded in the reply, so that neither kind of failure goes unnoticed.
        Status status = rpc_status;
        if (status.ok() && reply.status().code() != static_cast<int>(StatusCode::OK)) {
          status = Status(static_cast<StatusCode>(reply.status().code()),
                          reply.status().message());
        }
        if (status.ok()) {
          RAY_LOG(DEBUG) << "Finished registering placement group. placement group id = "
                         << placement_group_id;
        } else {
          RAY_LOG(WARNING) << "Failed to register placement group. placement group id = "
                           << placement_group_id << ", status = " << status.ToString();
        }
      });
  return Status::OK();
}
} // namespace gcs
} // namespace ray
@@ -409,5 +409,23 @@ class ServiceBasedWorkerInfoAccessor : public WorkerInfoAccessor {
ServiceBasedGcsClient *client_impl_;
};
/// \class ServiceBasedPlacementGroupInfoAccessor
/// ServiceBasedPlacementGroupInfoAccessor is an implementation of
/// `PlacementGroupInfoAccessor` that uses GCS Service as the backend.
class ServiceBasedPlacementGroupInfoAccessor : public PlacementGroupInfoAccessor {
// TODO(AlisaWu): fill the ServiceAccessor.
public:
/// \param client_impl The GCS client whose RPC client is used to send requests.
/// It is stored as a raw, non-owning pointer.
explicit ServiceBasedPlacementGroupInfoAccessor(ServiceBasedGcsClient *client_impl);
virtual ~ServiceBasedPlacementGroupInfoAccessor() = default;
/// Send a request to the GCS service to create the given placement group.
///
/// \param placement_group_spec The specification of the placement group to create.
/// \return Status
Status AsyncCreatePlacementGroup(
const PlacementGroupSpecification &placement_group_spec) override;
private:
/// Non-owning pointer to the client that provides the GCS RPC client.
ServiceBasedGcsClient *client_impl_;
};
} // namespace gcs
} // namespace ray
@@ -75,6 +75,7 @@ Status ServiceBasedGcsClient::Connect(boost::asio::io_service &io_service) {
stats_accessor_.reset(new ServiceBasedStatsInfoAccessor(this));
error_accessor_.reset(new ServiceBasedErrorInfoAccessor(this));
worker_accessor_.reset(new ServiceBasedWorkerInfoAccessor(this));
placement_group_accessor_.reset(new ServiceBasedPlacementGroupInfoAccessor(this));
// Init gcs service address check timer.
detect_timer_.reset(new boost::asio::deadline_timer(io_service));
@@ -54,5 +54,131 @@ const rpc::PlacementGroupTableData &GcsPlacementGroup::GetPlacementGroupTableDat
return placement_group_table_data_;
}
/////////////////////////////////////////////////////////////////////////////////////////
/// Create a GcsPlacementGroupManager.
///
/// \param io_context The event loop that drives the reschedule timer.
/// \param scheduler Used to schedule placement groups.
/// \param gcs_table_storage Used to write placement group data to storage.
GcsPlacementGroupManager::GcsPlacementGroupManager(
    boost::asio::io_context &io_context,
    std::shared_ptr<GcsPlacementGroupSchedulerInterface> scheduler,
    std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage)
    // Both shared_ptr arguments are taken by value, so move each of them into its
    // member instead of paying for an extra reference-count round trip.
    : gcs_placement_group_scheduler_(std::move(scheduler)),
      gcs_table_storage_(std::move(gcs_table_storage)),
      reschedule_timer_(io_context) {}
/// Register a placement group and queue it for scheduling.
///
/// Duplicate requests for an already registered placement group are tolerated:
///  - while the group still has pending registration callbacks, `callback` is added
///    to them and is invoked once the group has been created;
///  - otherwise (the callbacks were already flushed on creation success), `callback`
///    is invoked immediately with the registered group.
///
/// \param request Contains the specification of the placement group to create.
/// \param callback Invoked with the placement group once it has been created. Must be
/// callable.
void GcsPlacementGroupManager::RegisterPlacementGroup(
    const ray::rpc::CreatePlacementGroupRequest &request,
    std::function<void(std::shared_ptr<GcsPlacementGroup>)> callback) {
  RAY_CHECK(callback);
  const auto &placement_group_spec = request.placement_group_spec();
  auto placement_group_id =
      PlacementGroupID::FromBinary(placement_group_spec.placement_group_id());

  auto registered_iter = registered_placement_groups_.find(placement_group_id);
  if (registered_iter != registered_placement_groups_.end()) {
    // A resent request must not bring the server down, nor schedule the group twice.
    auto callbacks_iter = placement_group_to_register_callbacks_.find(placement_group_id);
    if (callbacks_iter != placement_group_to_register_callbacks_.end()) {
      callbacks_iter->second.emplace_back(std::move(callback));
    } else {
      // Copy the pointer first so the callback is free to modify the registry.
      auto registered_placement_group = registered_iter->second;
      callback(std::move(registered_placement_group));
    }
    return;
  }

  auto placement_group = std::make_shared<GcsPlacementGroup>(request);
  // Mark the callback as pending and invoke it after the placement_group has been
  // successfully created.
  placement_group_to_register_callbacks_[placement_group_id].emplace_back(
      std::move(callback));
  RAY_CHECK(registered_placement_groups_
                .emplace(placement_group->GetPlacementGroupID(), placement_group)
                .second);
  pending_placement_groups_.emplace_back(std::move(placement_group));
  SchedulePendingPlacementGroups();
}
/// Look up the ID of a registered placement group by its name.
///
/// \param name The name of the placement group to look up.
/// \return The ID of a registered placement group whose name equals `name`, or
/// PlacementGroupID::Nil() if there is none. This is a linear scan over all registered
/// placement groups.
PlacementGroupID GcsPlacementGroupManager::GetPlacementGroupIDByName(
    const std::string &name) {
  // Iterate by const reference: copying each entry would also copy its shared_ptr.
  for (const auto &entry : registered_placement_groups_) {
    if (entry.second->GetName() == name) {
      return entry.first;
    }
  }
  return PlacementGroupID::Nil();
}
/// Handle a placement group whose scheduling attempt failed.
///
/// The group is put back at the end of the pending queue and another scheduling round
/// is requested through the reschedule timer.
///
/// \param placement_group The placement group that could not be scheduled.
void GcsPlacementGroupManager::OnPlacementGroupCreationFailed(
    std::shared_ptr<GcsPlacementGroup> placement_group) {
  // No creation is in flight any more, so the next tick is allowed to schedule again.
  is_creating_ = false;
  // Re-queue the group behind everything that is already waiting.
  pending_placement_groups_.push_back(std::move(placement_group));
  ScheduleTick();
}
/// Handle a placement group that has been scheduled successfully: mark it ALIVE, write
/// its table data to storage, run the pending registration callbacks and then continue
/// with the next pending placement group.
///
/// \param placement_group The placement group that has been created. It must already be
/// registered.
void GcsPlacementGroupManager::OnPlacementGroupCreationSuccess(
std::shared_ptr<GcsPlacementGroup> placement_group) {
auto placement_group_id = placement_group->GetPlacementGroupID();
RAY_CHECK(registered_placement_groups_.count(placement_group_id) > 0);
placement_group->UpdateState(rpc::PlacementGroupTableData::ALIVE);
auto placement_group_table_data = placement_group->GetPlacementGroupTableData();
// The backend storage is expected to be reliable, so the returned status is required
// to be OK. The completion callback of the write is intentionally a no-op.
RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().Put(
placement_group_id, placement_group_table_data, [](Status status) {}));
// Invoke all callbacks for all registration requests of this placement_group
// (duplicated requests are included) and remove all of them from
// placement_group_to_register_callbacks_.
auto iter = placement_group_to_register_callbacks_.find(placement_group_id);
if (iter != placement_group_to_register_callbacks_.end()) {
for (auto &callback : iter->second) {
callback(placement_group);
}
placement_group_to_register_callbacks_.erase(iter);
}
// Clear the in-flight flag before scheduling, otherwise the call below returns early.
is_creating_ = false;
SchedulePendingPlacementGroups();
}
/// Schedule the placement group at the head of the pending queue.
///
/// Does nothing when the queue is empty or while another placement group is still
/// being created; at most one placement group is handed to the scheduler at a time.
void GcsPlacementGroupManager::SchedulePendingPlacementGroups() {
  if (pending_placement_groups_.empty() || is_creating_) {
    return;
  }
  is_creating_ = true;
  // Take the head out of the queue *before* calling the scheduler. If the scheduler
  // runs one of the handlers synchronously, the handler re-enters this class and must
  // not find the group that is currently being scheduled still at the head.
  auto placement_group = std::move(pending_placement_groups_.front());
  pending_placement_groups_.pop_front();
  gcs_placement_group_scheduler_->Schedule(
      std::move(placement_group),
      [this](std::shared_ptr<GcsPlacementGroup> placement_group) {
        OnPlacementGroupCreationFailed(std::move(placement_group));
      },
      [this](std::shared_ptr<GcsPlacementGroup> placement_group) {
        OnPlacementGroupCreationSuccess(std::move(placement_group));
      });
}
/// RPC handler for CreatePlacementGroup requests.
///
/// Writes the initial (PENDING) record of the placement group to storage, then
/// registers the group for scheduling. The reply is sent once the registration
/// callback runs.
///
/// \param request The request carrying the placement group specification.
/// \param reply The reply to fill; it is sent from the registration callback.
/// \param send_reply_callback Used to send `reply` back to the caller.
void GcsPlacementGroupManager::HandleCreatePlacementGroup(
    const ray::rpc::CreatePlacementGroupRequest &request,
    ray::rpc::CreatePlacementGroupReply *reply,
    ray::rpc::SendReplyCallback send_reply_callback) {
  auto placement_group_id =
      PlacementGroupID::FromBinary(request.placement_group_spec().placement_group_id());
  RAY_LOG(INFO) << "Registering placement group, placement group id = "
                << placement_group_id;

  // Issue the write of the initial record before scheduling starts, so that it can
  // never be issued after the ALIVE record that is written on creation success.
  GcsPlacementGroup initial_placement_group(request);
  RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().Put(
      placement_group_id, initial_placement_group.GetPlacementGroupTableData(),
      [](Status status) {}));

  RegisterPlacementGroup(
      request, [reply, send_reply_callback, placement_group_id](
                   std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
        RAY_LOG(INFO) << "Registered placement group, placement group id = "
                      << placement_group_id;
        GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
      });
}
/// Arm the reschedule timer so that the pending placement groups are scheduled again
/// after 5 milliseconds.
void GcsPlacementGroupManager::ScheduleTick() {
  reschedule_timer_.expires_from_now(boost::posix_time::milliseconds(5));
  reschedule_timer_.async_wait([this](const boost::system::error_code &error) {
    // A cancelled wait must not trigger a scheduling round.
    const bool cancelled = (error == boost::system::errc::operation_canceled);
    if (!cancelled) {
      SchedulePendingPlacementGroups();
    }
  });
}
} // namespace gcs
} // namespace ray
@@ -47,7 +47,7 @@ class GcsPlacementGroup {
const auto &placement_group_spec = request.placement_group_spec();
placement_group_table_data_.set_placement_group_id(
placement_group_spec.placement_group_id());
placement_group_table_data_.set_name(placement_group_spec.name());
placement_group_table_data_.set_state(rpc::PlacementGroupTableData::PENDING);
placement_group_table_data_.mutable_bundles()->CopyFrom(
placement_group_spec.bundles());
@@ -78,5 +78,100 @@ class GcsPlacementGroup {
/// state of the gcs placement_group and so on (see gcs.proto).
rpc::PlacementGroupTableData placement_group_table_data_;
};
using RegisterPlacementGroupCallback =
std::function<void(std::shared_ptr<GcsPlacementGroup>)>;
/// GcsPlacementGroupManager is responsible for managing the lifecycle of all placement
/// groups. This class is not thread-safe.
/// A placement group is first added to the pending queue, and then
/// SchedulePendingPlacementGroups() is called. SchedulePendingPlacementGroups() takes
/// the head of the queue and schedules it. If scheduling succeeds,
/// SchedulePendingPlacementGroups() is called again immediately; otherwise the manager
/// waits for a short time before calling SchedulePendingPlacementGroups() again.
class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
public:
/// Create a GcsPlacementGroupManager
///
/// \param io_context The event loop that drives the reschedule timer.
/// \param scheduler Used to schedule placement group creation tasks.
/// \param gcs_table_storage Used to flush placement group data to storage.
explicit GcsPlacementGroupManager(
boost::asio::io_context &io_context,
std::shared_ptr<GcsPlacementGroupSchedulerInterface> scheduler,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage);
~GcsPlacementGroupManager() = default;
/// Handle a CreatePlacementGroup RPC request.
///
/// \param request The request carrying the placement group specification.
/// \param reply The reply that is sent back to the caller.
/// \param send_reply_callback Used to send the reply.
void HandleCreatePlacementGroup(const rpc::CreatePlacementGroupRequest &request,
rpc::CreatePlacementGroupReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Register placement_group asynchronously.
///
/// \param request Contains the meta info to create the placement_group.
/// \param callback Will be invoked after the placement_group is created successfully.
/// Must be callable.
void RegisterPlacementGroup(const rpc::CreatePlacementGroupRequest &request,
RegisterPlacementGroupCallback callback);
/// Schedule placement_groups in the `pending_placement_groups_` queue.
/// This function is exposed for testing only.
void SchedulePendingPlacementGroups();
/// Get the placement_group ID for the named placement_group. Returns nil if the
/// placement_group was not found.
/// \param name The name of the placement_group to look up.
/// \returns PlacementGroupID The ID of the placement_group. Nil if the
/// placement_group was not found.
PlacementGroupID GetPlacementGroupIDByName(const std::string &name);
/// Handle placement_group creation task failure. This should be called when
/// scheduling a placement_group creation task failed. The placement_group is queued
/// again and scheduling is retried after a short time.
///
/// \param placement_group The placement_group whose creation task failed.
void OnPlacementGroupCreationFailed(std::shared_ptr<GcsPlacementGroup> placement_group);
/// Handle placement_group creation task success. This should be called when the
/// placement_group creation task has been scheduled successfully.
///
/// \param placement_group The placement_group that has been created.
void OnPlacementGroupCreationSuccess(
std::shared_ptr<GcsPlacementGroup> placement_group);
private:
/// Schedule another tick after a short time.
void ScheduleTick();
/// Callbacks of placement_group registration requests that are not yet flushed.
/// This map is used to filter duplicated messages from a Driver/Worker caused by some
/// network problems.
///
/// Since the GRPC messages received by the GCS side may arrive out of order, it can
/// not be determined that the last callback is the valid one. Therefore, the repeated
/// callbacks are recorded in a vector without distinction. When the operation
/// is successful, all callbacks will be triggered. One of them must be valid, and the
/// remaining invalid callbacks will not have any effect even if they are called.
absl::flat_hash_map<PlacementGroupID, std::vector<RegisterPlacementGroupCallback>>
placement_group_to_register_callbacks_;
/// All registered placement_groups (pending placement_groups are also included).
absl::flat_hash_map<PlacementGroupID, std::shared_ptr<GcsPlacementGroup>>
registered_placement_groups_;
/// The placement_groups that are waiting to be scheduled.
/// SchedulePendingPlacementGroups() takes them from the front; placement_groups whose
/// scheduling failed are appended to the back.
std::deque<std::shared_ptr<GcsPlacementGroup>> pending_placement_groups_;
/// The scheduler to schedule all registered placement_groups.
std::shared_ptr<gcs::GcsPlacementGroupSchedulerInterface>
gcs_placement_group_scheduler_;
/// Used to update placement group information upon creation, deletion, etc.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
/// Whether a placement group is currently being created. While this is true,
/// SchedulePendingPlacementGroups() does not schedule another one.
bool is_creating_ = false;
/// Timer used by ScheduleTick() to retry scheduling a short time after a failure.
boost::asio::deadline_timer reschedule_timer_;
};
} // namespace gcs
} // namespace ray
@@ -188,8 +188,8 @@ void GcsPlacementGroupScheduler::ReserveResourceFromNode(
if (iter->second.empty()) {
node_to_bundles_when_leasing_.erase(iter);
}
callback(status, reply);
}
callback(status, reply);
}
});
if (!status.ok()) {
@@ -229,7 +229,7 @@ void GcsPlacementGroupScheduler::CancelResourceReserve(
});
}
});
} // namespace gcs
}
std::shared_ptr<ResourceReserveInterface>
GcsPlacementGroupScheduler::GetOrConnectLeaseClient(const rpc::Address &raylet_address) {
+24
View File
@@ -19,6 +19,7 @@
#include "gcs_job_manager.h"
#include "gcs_node_manager.h"
#include "gcs_object_manager.h"
#include "gcs_placement_group_manager.h"
#include "gcs_worker_manager.h"
#include "ray/common/network_util.h"
#include "ray/common/ray_config.h"
@@ -60,6 +61,9 @@ void GcsServer::Start() {
// Init gcs actor manager.
InitGcsActorManager();
// Init gcs placement group manager.
InitGcsPlacementGroupManager();
// Register rpc service.
gcs_object_manager_ = InitObjectManager();
object_info_service_.reset(
@@ -79,6 +83,10 @@ void GcsServer::Start() {
new rpc::ActorInfoGrpcService(main_service_, *gcs_actor_manager_));
rpc_server_.RegisterService(*actor_info_service_);
placement_group_info_service_.reset(new rpc::PlacementGroupInfoGrpcService(
main_service_, *gcs_placement_group_manager_));
rpc_server_.RegisterService(*placement_group_info_service_);
node_info_service_.reset(
new rpc::NodeInfoGrpcService(main_service_, *gcs_node_manager_));
rpc_server_.RegisterService(*node_info_service_);
@@ -210,6 +218,22 @@ void GcsServer::InitGcsJobManager() {
});
}
void GcsServer::InitGcsPlacementGroupManager() {
RAY_CHECK(gcs_table_storage_ != nullptr && gcs_node_manager_ != nullptr);
auto scheduler = std::make_shared<GcsPlacementGroupScheduler>(
main_service_, gcs_table_storage_, *gcs_node_manager_,
/*lease_client_factory=*/
[this](const rpc::Address &address) {
auto node_manager_worker_client = rpc::NodeManagerWorkerClient::make(
address.ip_address(), address.port(), client_call_manager_);
return std::make_shared<ray::raylet::RayletClient>(
std::move(node_manager_worker_client));
});
gcs_placement_group_manager_ = std::make_shared<GcsPlacementGroupManager>(
main_service_, scheduler, gcs_table_storage_);
}
std::unique_ptr<GcsObjectManager> GcsServer::InitObjectManager() {
return std::unique_ptr<GcsObjectManager>(
new GcsObjectManager(gcs_table_storage_, gcs_pub_sub_, *gcs_node_manager_));
+9
View File
@@ -40,6 +40,7 @@ class GcsNodeManager;
class GcsActorManager;
class GcsJobManager;
class GcsWorkerManager;
class GcsPlacementGroupManager;
/// The GcsServer will take over all requests from ServiceBasedGcsClient and transparent
/// transmit the command to the backend reliable storage for the time being.
@@ -85,6 +86,9 @@ class GcsServer {
/// Initialize the gcs job manager.
virtual void InitGcsJobManager();
/// Initialize the gcs placement group manager.
virtual void InitGcsPlacementGroupManager();
/// The object manager
virtual std::unique_ptr<GcsObjectManager> InitObjectManager();
@@ -122,6 +126,8 @@ class GcsServer {
std::shared_ptr<GcsRedisFailureDetector> gcs_redis_failure_detector_;
/// The gcs actor manager
std::shared_ptr<GcsActorManager> gcs_actor_manager_;
/// The gcs placement group manager
std::shared_ptr<GcsPlacementGroupManager> gcs_placement_group_manager_;
/// Job info handler and service
std::unique_ptr<GcsJobManager> gcs_job_manager_;
std::unique_ptr<rpc::JobInfoGrpcService> job_info_service_;
@@ -145,6 +151,9 @@ class GcsServer {
std::unique_ptr<GcsWorkerManager> gcs_worker_manager_;
/// Worker info service
std::unique_ptr<rpc::WorkerInfoGrpcService> worker_info_service_;
/// Placement Group info handler and service
std::unique_ptr<rpc::PlacementGroupInfoHandler> placement_group_info_handler_;
std::unique_ptr<rpc::PlacementGroupInfoGrpcService> placement_group_info_service_;
/// Backend client
std::shared_ptr<RedisGcsClient> redis_gcs_client_;
/// A publisher for publishing gcs messages.
@@ -128,6 +128,7 @@ template class GcsTableWithJobId<TaskID, TaskTableData>;
template class GcsTableWithJobId<TaskID, TaskLeaseData>;
template class GcsTableWithJobId<TaskID, TaskReconstructionData>;
template class GcsTableWithJobId<ObjectID, ObjectTableDataList>;
template class GcsTable<PlacementGroupID, PlacementGroupTableData>;
template class GcsTable<PlacementGroupID, ScheduleData>;
} // namespace gcs
+19 -1
View File
@@ -33,6 +33,7 @@ using rpc::HeartbeatTableData;
using rpc::JobTableData;
using rpc::ObjectTableData;
using rpc::ObjectTableDataList;
using rpc::PlacementGroupTableData;
using rpc::ProfileTableData;
using rpc::ResourceMap;
using rpc::ResourceTableData;
@@ -154,6 +155,15 @@ class GcsActorTable : public GcsTableWithJobId<ActorID, ActorTableData> {
JobID GetJobIdFromKey(const ActorID &key) override { return key.JobId(); }
};
/// GCS table that stores `PlacementGroupTableData` keyed by `PlacementGroupID`.
class GcsPlacementGroupTable
: public GcsTable<PlacementGroupID, PlacementGroupTableData> {
public:
/// \param store_client The store client passed on to the underlying `GcsTable`.
explicit GcsPlacementGroupTable(std::shared_ptr<StoreClient> &store_client)
: GcsTable(store_client) {
// The table is named after the PLACEMENT_GROUP table prefix.
table_name_ = TablePrefix_Name(TablePrefix::PLACEMENT_GROUP);
}
};
class GcsActorCheckpointTable : public GcsTable<ActorCheckpointID, ActorCheckpointData> {
public:
explicit GcsActorCheckpointTable(std::shared_ptr<StoreClient> &store_client)
@@ -307,6 +317,11 @@ class GcsTableStorage {
return *actor_table_;
}
/// Return the placement group table. The table must have been created by the concrete
/// storage class; otherwise the check below fails.
GcsPlacementGroupTable &PlacementGroupTable() {
RAY_CHECK(placement_group_table_ != nullptr);
return *placement_group_table_;
}
GcsActorCheckpointTable &ActorCheckpointTable() {
RAY_CHECK(actor_checkpoint_table_ != nullptr);
return *actor_checkpoint_table_;
@@ -386,6 +401,7 @@ class GcsTableStorage {
std::shared_ptr<StoreClient> store_client_;
std::unique_ptr<GcsJobTable> job_table_;
std::unique_ptr<GcsActorTable> actor_table_;
std::unique_ptr<GcsPlacementGroupTable> placement_group_table_;
std::unique_ptr<GcsActorCheckpointTable> actor_checkpoint_table_;
std::unique_ptr<GcsActorCheckpointIdTable> actor_checkpoint_id_table_;
std::unique_ptr<GcsTaskTable> task_table_;
@@ -394,8 +410,8 @@ class GcsTableStorage {
std::unique_ptr<GcsObjectTable> object_table_;
std::unique_ptr<GcsNodeTable> node_table_;
std::unique_ptr<GcsNodeResourceTable> node_resource_table_;
std::unique_ptr<GcsPlacementGroupScheduleTable> placement_group_schedule_table_;
std::unique_ptr<GcsHeartbeatTable> heartbeat_table_;
std::unique_ptr<GcsPlacementGroupScheduleTable> placement_group_schedule_table_;
std::unique_ptr<GcsHeartbeatBatchTable> heartbeat_batch_table_;
std::unique_ptr<GcsErrorInfoTable> error_info_table_;
std::unique_ptr<GcsProfileTable> profile_table_;
@@ -412,6 +428,7 @@ class RedisGcsTableStorage : public GcsTableStorage {
store_client_ = std::make_shared<RedisStoreClient>(redis_client);
job_table_.reset(new GcsJobTable(store_client_));
actor_table_.reset(new GcsActorTable(store_client_));
placement_group_table_.reset(new GcsPlacementGroupTable(store_client_));
actor_checkpoint_table_.reset(new GcsActorCheckpointTable(store_client_));
actor_checkpoint_id_table_.reset(new GcsActorCheckpointIdTable(store_client_));
task_table_.reset(new GcsTaskTable(store_client_));
@@ -442,6 +459,7 @@ class InMemoryGcsTableStorage : public GcsTableStorage {
store_client_ = std::make_shared<InMemoryStoreClient>(main_io_service);
job_table_.reset(new GcsJobTable(store_client_));
actor_table_.reset(new GcsActorTable(store_client_));
placement_group_table_.reset(new GcsPlacementGroupTable(store_client_));
actor_checkpoint_table_.reset(new GcsActorCheckpointTable(store_client_));
actor_checkpoint_id_table_.reset(new GcsActorCheckpointIdTable(store_client_));
task_table_.reset(new GcsTaskTable(store_client_));
@@ -0,0 +1,132 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <ray/gcs/gcs_server/test/gcs_server_test_util.h>
#include <ray/gcs/test/gcs_test_util.h>
#include <memory>
#include "gtest/gtest.h"
namespace ray {
using ::testing::_;
/// Test double for `gcs::GcsPlacementGroupSchedulerInterface`.
///
/// It never schedules anything: every placement group handed to `Schedule` is
/// appended to `placement_groups`, so a test can pick it up and report the
/// scheduling outcome to the manager by hand.
class MockPlacementGroupScheduler : public gcs::GcsPlacementGroupSchedulerInterface {
 public:
  MockPlacementGroupScheduler() = default;

  /// Record the placement group instead of scheduling it.
  ///
  /// Marked `override` so the compiler rejects this mock if the interface's
  /// signature ever changes, instead of silently declaring an unrelated method.
  ///
  /// \param placement_group The placement group to record.
  /// \param schedule_failure_handler Ignored by this mock; it is never invoked.
  /// \param schedule_success_handler Ignored by this mock; it is never invoked.
  void Schedule(std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
                std::function<void(std::shared_ptr<gcs::GcsPlacementGroup>)>
                    schedule_failure_handler = nullptr,
                std::function<void(std::shared_ptr<gcs::GcsPlacementGroup>)>
                    schedule_success_handler = nullptr) override {
    placement_groups.push_back(placement_group);
  }

  /// Placement groups received through `Schedule`, in call order.
  std::vector<std::shared_ptr<gcs::GcsPlacementGroup>> placement_groups;
};
/// Fixture that wires a `gcs::GcsPlacementGroupManager` to an in-memory table
/// storage and a `MockPlacementGroupScheduler`.
class GcsPlacementGroupManagerTest : public ::testing::Test {
 public:
  // Members are initialized in declaration order, so `io_service_` and both
  // collaborators already exist when the manager is constructed.
  GcsPlacementGroupManagerTest()
      : gcs_table_storage_(std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_)),
        mock_placement_group_scheduler_(std::make_shared<MockPlacementGroupScheduler>()),
        gcs_placement_group_manager_(new gcs::GcsPlacementGroupManager(
            io_service_, mock_placement_group_scheduler_, gcs_table_storage_)) {}

  /// IO service handed to both the table storage and the manager.
  boost::asio::io_service io_service_;
  /// In-memory table storage given to the manager.
  std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
  /// Scheduler mock that records the placement groups passed to `Schedule`.
  std::shared_ptr<MockPlacementGroupScheduler> mock_placement_group_scheduler_;
  /// The manager under test.
  std::unique_ptr<gcs::GcsPlacementGroupManager> gcs_placement_group_manager_;
};
// Registering a placement group sends it to the scheduler; once the creation is
// reported as successful the registration callback fires and the group is ALIVE.
TEST_F(GcsPlacementGroupManagerTest, TestBasic) {
  auto request = Mocker::GenCreatePlacementGroupRequest();
  std::vector<std::shared_ptr<gcs::GcsPlacementGroup>> created_groups;
  auto on_created = [&created_groups](std::shared_ptr<gcs::GcsPlacementGroup> group) {
    created_groups.emplace_back(group);
  };
  auto &scheduled_groups = mock_placement_group_scheduler_->placement_groups;

  // After registration the scheduler holds the group and the callback is silent.
  gcs_placement_group_manager_->RegisterPlacementGroup(request, on_created);
  ASSERT_EQ(created_groups.size(), 0);
  ASSERT_EQ(scheduled_groups.size(), 1);

  // Take the group out of the mock and report a successful creation.
  auto placement_group = scheduled_groups.back();
  scheduled_groups.pop_back();
  gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);

  ASSERT_EQ(created_groups.size(), 1);
  ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::ALIVE);
}
// A placement group whose creation failed is handed to the scheduler again when
// pending groups are scheduled, and becomes ALIVE once a later creation succeeds.
TEST_F(GcsPlacementGroupManagerTest, TestSchedulingFailed) {
  auto request = Mocker::GenCreatePlacementGroupRequest();
  std::vector<std::shared_ptr<gcs::GcsPlacementGroup>> created_groups;
  auto on_created = [&created_groups](std::shared_ptr<gcs::GcsPlacementGroup> group) {
    created_groups.emplace_back(group);
  };
  auto &scheduled_groups = mock_placement_group_scheduler_->placement_groups;

  // After registration the scheduler holds the group and the callback is silent.
  gcs_placement_group_manager_->RegisterPlacementGroup(request, on_created);
  ASSERT_EQ(created_groups.size(), 0);
  ASSERT_EQ(scheduled_groups.size(), 1);
  auto placement_group = scheduled_groups.back();
  scheduled_groups.clear();

  // Report the failure and schedule pending groups: the scheduler must see the
  // group once more while the callback still has not fired.
  gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group);
  gcs_placement_group_manager_->SchedulePendingPlacementGroups();
  ASSERT_EQ(scheduled_groups.size(), 1);
  scheduled_groups.clear();
  ASSERT_EQ(created_groups.size(), 0);

  // Report success; the callback fires and the group must be in state `ALIVE`.
  gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
  ASSERT_EQ(created_groups.size(), 1);
  ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::ALIVE);
}
// A placement group registered under a name can be looked up by that name, and
// the lookup yields the ID carried by the creation request.
TEST_F(GcsPlacementGroupManagerTest, TestGetPlacementGroupIDByName) {
  auto request = Mocker::GenCreatePlacementGroupRequest("test_name");
  std::vector<std::shared_ptr<gcs::GcsPlacementGroup>> created_groups;
  auto on_created = [&created_groups](std::shared_ptr<gcs::GcsPlacementGroup> group) {
    created_groups.emplace_back(group);
  };
  auto &scheduled_groups = mock_placement_group_scheduler_->placement_groups;

  // After registration the scheduler holds the group and the callback is silent.
  gcs_placement_group_manager_->RegisterPlacementGroup(request, on_created);
  ASSERT_EQ(created_groups.size(), 0);
  ASSERT_EQ(scheduled_groups.size(), 1);

  // Take the group out of the mock and report a successful creation.
  auto placement_group = scheduled_groups.back();
  scheduled_groups.pop_back();
  gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
  ASSERT_EQ(created_groups.size(), 1);
  ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::ALIVE);

  // The name resolves to the ID stored in the request's placement group spec.
  const auto expected_id =
      PlacementGroupID::FromBinary(request.placement_group_spec().placement_group_id());
  ASSERT_EQ(gcs_placement_group_manager_->GetPlacementGroupIDByName("test_name"),
            expected_id);
}
} // namespace ray
// Test entry point: hands the command line to googletest, then runs every
// registered test and returns the result of RUN_ALL_TESTS().
int main(int argc, char **argv) {
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}
@@ -163,8 +163,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReturnResource)
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantResourceReserve(false));
ASSERT_EQ(1, raylet_client_->num_return_requested);
// // Reply the placement_group creation request, then the placement_group should be
// scheduled successfully.
// Reply the placement_group creation request, then the placement_group should be
// scheduled successfully.
ASSERT_EQ(1, failure_placement_groups_.size());
ASSERT_EQ(0, success_placement_groups_.size());
ASSERT_EQ(placement_group, failure_placement_groups_.front());
+5 -4
View File
@@ -64,7 +64,8 @@ struct Mocker {
}
static PlacementGroupSpecification GenPlacementGroupCreation(
const std::string &name, std::vector<rpc::Bundle> &bundles,
const std::string &name,
std::vector<std::unordered_map<std::string, double>> &bundles,
rpc::PlacementStrategy strategy) {
PlacementGroupSpecBuilder builder;
@@ -76,9 +77,10 @@ struct Mocker {
static rpc::CreatePlacementGroupRequest GenCreatePlacementGroupRequest(
const std::string name = "") {
rpc::CreatePlacementGroupRequest request;
std::vector<rpc::Bundle> bundles;
std::vector<std::unordered_map<std::string, double>> bundles;
rpc::PlacementStrategy strategy = rpc::PlacementStrategy::SPREAD;
rpc::Bundle bundle;
std::unordered_map<std::string, double> bundle;
bundle["CPU"] = 1.0;
bundles.push_back(bundle);
bundles.push_back(bundle);
auto placement_group_creation_spec =
@@ -87,7 +89,6 @@ struct Mocker {
placement_group_creation_spec.GetMessage());
return request;
}
static std::shared_ptr<rpc::GcsNodeInfo> GenNodeInfo(
uint16_t port = 0, const std::string address = "127.0.0.1") {
auto node = std::make_shared<rpc::GcsNodeInfo>();
+1
View File
@@ -46,6 +46,7 @@ enum TablePrefix {
INTERNAL_CONFIG = 20;
TABLE_PREFIX_MAX = 21;
PLACEMENT_GROUP_SCHEDULE = 22;
PLACEMENT_GROUP = 23;
}
// The channel that Add operations to the Table should be published on, if any.
+7
View File
@@ -162,6 +162,13 @@ service ActorInfoGcsService {
returns (GetActorCheckpointIDReply);
}
// Service for placement group info access.
service PlacementGroupInfoGcsService {
// Create a placement group via the GCS service.
// Takes a CreatePlacementGroupRequest and returns a CreatePlacementGroupReply.
rpc CreatePlacementGroup(CreatePlacementGroupRequest)
returns (CreatePlacementGroupReply);
}
message RegisterNodeRequest {
// Info of node.
GcsNodeInfo node_info = 1;
+6
View File
@@ -247,6 +247,10 @@ class GcsRpcClient {
VOID_GCS_RPC_CLIENT_METHOD(WorkerInfoGcsService, AddWorkerInfo,
worker_info_grpc_client_, )
/// Create placement group via GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(PlacementGroupInfoGcsService, CreatePlacementGroup,
placement_group_info_grpc_client_, )
private:
std::function<void(GcsServiceFailureType)> gcs_service_failure_detected_;
@@ -259,6 +263,8 @@ class GcsRpcClient {
std::unique_ptr<GrpcClient<StatsGcsService>> stats_grpc_client_;
std::unique_ptr<GrpcClient<ErrorInfoGcsService>> error_info_grpc_client_;
std::unique_ptr<GrpcClient<WorkerInfoGcsService>> worker_info_grpc_client_;
std::unique_ptr<GrpcClient<PlacementGroupInfoGcsService>>
placement_group_info_grpc_client_;
};
} // namespace rpc
+39
View File
@@ -44,6 +44,9 @@ namespace rpc {
#define WORKER_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(WorkerInfoGcsService, HANDLER)
// Expands to RPC_SERVICE_HANDLER for the `PlacementGroupInfoGcsService` service
// and the given HANDLER.
#define PLACEMENT_GROUP_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(PlacementGroupInfoGcsService, HANDLER)
#define GCS_RPC_SEND_REPLY(send_reply_callback, reply, status) \
reply->mutable_status()->set_code((int)status.code()); \
reply->mutable_status()->set_message(status.message()); \
@@ -479,6 +482,41 @@ class WorkerInfoGrpcService : public GrpcService {
WorkerInfoGcsServiceHandler &service_handler_;
};
/// Interface for handlers of `PlacementGroupInfoGcsService` requests.
class PlacementGroupInfoGcsServiceHandler {
public:
virtual ~PlacementGroupInfoGcsServiceHandler() = default;
/// Handle a `CreatePlacementGroup` request.
///
/// \param request The request to handle.
/// \param reply The reply message; presumably filled in by the implementation.
/// \param send_reply_callback Presumably invoked by the implementation to send
/// `reply` back to the caller -- confirm against the concrete handler.
virtual void HandleCreatePlacementGroup(const CreatePlacementGroupRequest &request,
CreatePlacementGroupReply *reply,
SendReplyCallback send_reply_callback) = 0;
};
/// The `GrpcService` for `PlacementGroupInfoGcsService`.
class PlacementGroupInfoGrpcService : public GrpcService {
 public:
  /// Constructor.
  ///
  /// \param[in] io_service The io service forwarded to the `GrpcService` base.
  /// \param[in] handler The service handler that actually handles the requests.
  /// It is held by reference, so this class does not own it.
  explicit PlacementGroupInfoGrpcService(boost::asio::io_service &io_service,
                                         PlacementGroupInfoGcsServiceHandler &handler)
      : GrpcService(io_service), service_handler_(handler) {}

 protected:
  grpc::Service &GetGrpcService() override { return service_; }

  void InitServerCallFactories(
      const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
      std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
    // `CreatePlacementGroup` is the only RPC this service sets up a handler for.
    PLACEMENT_GROUP_INFO_SERVICE_RPC_HANDLER(CreatePlacementGroup);
  }

 private:
  /// The grpc async service object.
  PlacementGroupInfoGcsService::AsyncService service_;
  /// The service handler that actually handles the requests.
  PlacementGroupInfoGcsServiceHandler &service_handler_;
};
using JobInfoHandler = JobInfoGcsServiceHandler;
using ActorInfoHandler = ActorInfoGcsServiceHandler;
using NodeInfoHandler = NodeInfoGcsServiceHandler;
@@ -487,6 +525,7 @@ using TaskInfoHandler = TaskInfoGcsServiceHandler;
using StatsHandler = StatsGcsServiceHandler;
using ErrorInfoHandler = ErrorInfoGcsServiceHandler;
using WorkerInfoHandler = WorkerInfoGcsServiceHandler;
using PlacementGroupInfoHandler = PlacementGroupInfoGcsServiceHandler;
} // namespace rpc
} // namespace ray