ServiceBasedGcsClient support detect gcs server availability and retry (#7292)

This commit is contained in:
fangfengbin
2020-03-10 21:01:07 +08:00
committed by GitHub
parent fc76586518
commit fa785a2ad2
8 changed files with 200 additions and 107 deletions
@@ -11,7 +11,6 @@ import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.api.RayObject;
import org.ray.api.RayPyActor;
import org.ray.api.TestUtils;
import org.ray.runtime.actor.NativeRayActor;
import org.ray.runtime.actor.NativeRayPyActor;
import org.slf4j.Logger;
+2 -1
View File
@@ -237,7 +237,8 @@ RAY_CONFIG(uint32_t, object_store_get_max_ids_to_print_in_warning, 20)
/// Allow up to 5 seconds for connecting to gcs service.
/// Note: this only takes effect when gcs service is enabled.
RAY_CONFIG(int64_t, gcs_service_connect_retries, 50)
RAY_CONFIG(int64_t, gcs_service_connect_wait_milliseconds, 100)
/// Waiting time for each gcs service connection.
RAY_CONFIG(int64_t, internal_gcs_service_connect_wait_milliseconds, 100)
/// Maximum number of times to retry putting an object when the plasma store is full.
/// Can be set to -1 to enable unlimited retries.
@@ -826,7 +826,6 @@ Status ServiceBasedErrorInfoAccessor::AsyncReportJobError(
}
RAY_LOG(DEBUG) << "Finished reporting job error, status = " << status
<< ", job id = " << job_id << ", type = " << type;
;
});
return Status::OK();
}
@@ -22,14 +22,18 @@ Status ServiceBasedGcsClient::Connect(boost::asio::io_service &io_service) {
RAY_CHECK_OK(redis_gcs_client_->Connect(io_service));
// Get gcs service address
std::pair<std::string, int> address;
GetGcsServerAddressFromRedis(redis_gcs_client_->primary_context()->sync_context(),
&address);
auto get_server_address = [this]() {
std::pair<std::string, int> address;
GetGcsServerAddressFromRedis(redis_gcs_client_->primary_context()->sync_context(),
&address);
return address;
};
std::pair<std::string, int> address = get_server_address();
// Connect to gcs service
client_call_manager_.reset(new rpc::ClientCallManager(io_service));
gcs_rpc_client_.reset(
new rpc::GcsRpcClient(address.first, address.second, *client_call_manager_));
gcs_rpc_client_.reset(new rpc::GcsRpcClient(address.first, address.second,
*client_call_manager_, get_server_address));
job_accessor_.reset(new ServiceBasedJobInfoAccessor(this));
actor_accessor_.reset(new ServiceBasedActorInfoAccessor(this));
@@ -65,7 +69,7 @@ void ServiceBasedGcsClient::GetGcsServerAddressFromRedis(
// Sleep for a little, and try again if the entry isn't there yet.
freeReplyObject(reply);
usleep(RayConfig::instance().gcs_service_connect_wait_milliseconds() * 1000);
usleep(RayConfig::instance().internal_gcs_service_connect_wait_milliseconds() * 1000);
num_attempts++;
}
RAY_CHECK(num_attempts < RayConfig::instance().gcs_service_connect_retries())
@@ -14,7 +14,6 @@ static std::string libray_redis_module_path;
class ServiceBasedGcsGcsClientTest : public RedisServiceManagerForTest {
public:
void SetUp() override {
gcs::GcsServerConfig config;
config.grpc_server_port = 0;
config.grpc_server_name = "MockedGcsServer";
config.grpc_server_thread_num = 1;
@@ -329,6 +328,7 @@ class ServiceBasedGcsGcsClientTest : public RedisServiceManagerForTest {
}
// Gcs server
gcs::GcsServerConfig config;
std::unique_ptr<gcs::GcsServer> gcs_server_;
std::unique_ptr<std::thread> thread_io_service_;
std::unique_ptr<std::thread> thread_gcs_server_;
@@ -628,6 +628,30 @@ TEST_F(ServiceBasedGcsGcsClientTest, TestTaskInfo) {
ASSERT_TRUE(AttemptTaskReconstruction(task_reconstruction_data));
}
TEST_F(ServiceBasedGcsGcsClientTest, TestDetectGcsAvailability) {
// Create job_table_data
JobID add_job_id = JobID::FromInt(1);
auto job_table_data = GenJobTableData(add_job_id);
RAY_LOG(INFO) << "Gcs service init port = " << gcs_server_->GetPort();
gcs_server_->Stop();
thread_gcs_server_->join();
gcs_server_.reset(new gcs::GcsServer(config));
thread_gcs_server_.reset(new std::thread([this] { gcs_server_->Start(); }));
// Wait until server starts listening.
while (gcs_server_->GetPort() == 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
RAY_LOG(INFO) << "Gcs service restart success, port = " << gcs_server_->GetPort();
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Jobs().AsyncAdd(
job_table_data, [&promise](Status status) { promise.set_value(status.ok()); }));
promise.get_future().get();
}
} // namespace ray
int main(int argc, char **argv) {
@@ -604,8 +604,6 @@ TEST_F(GcsServerTest, TestTaskInfo) {
rpc::DeleteTasksRequest delete_tasks_request;
delete_tasks_request.add_task_id_list(task_id.Binary());
ASSERT_TRUE(DeleteTasks(delete_tasks_request));
result = GetTask(task_id.Binary());
ASSERT_TRUE(!result.has_task());
// Add task lease
ClientID node_id = ClientID::FromRandom();
-3
View File
@@ -162,9 +162,6 @@ int main(int argc, char *argv[]) {
ray::gcs::GcsClientOptions client_options(redis_address, redis_port, redis_password);
std::shared_ptr<ray::gcs::GcsClient> gcs_client;
std::unique_ptr<std::thread> thread_io_service;
boost::asio::io_service io_service;
// RAY_GCS_SERVICE_ENABLED only set in ci job, so we just check if it is null.
if (getenv("RAY_GCS_SERVICE_ENABLED") != nullptr) {
gcs_client = std::make_shared<ray::gcs::ServiceBasedGcsClient>(client_options);
+163 -92
View File
@@ -7,6 +7,52 @@
namespace ray {
namespace rpc {
class GcsRpcClient;
/// \class Executor
/// Executor saves operation and support retries.
class Executor {
public:
explicit Executor(GcsRpcClient *gcs_rpc_client) : gcs_rpc_client_(gcs_rpc_client) {}
/// This function is used to execute the given operation.
///
/// \param operation The operation to be executed.
void Execute(const std::function<void(GcsRpcClient *gcs_rpc_client)> &operation) {
operation_ = operation;
operation(gcs_rpc_client_);
}
/// This function is used to retry the given operation.
void Retry() { operation_(gcs_rpc_client_); }
private:
GcsRpcClient *gcs_rpc_client_;
std::function<void(GcsRpcClient *gcs_rpc_client)> operation_;
};
// Define a void GCS RPC client method.
#define VOID_GCS_RPC_CLIENT_METHOD(SERVICE, METHOD, grpc_client, SPECS) \
void METHOD(const METHOD##Request &request, \
const ClientCallback<METHOD##Reply> &callback) SPECS { \
auto executor = new Executor(this); \
auto operation_callback = [this, request, callback, executor]( \
const Status &status, const METHOD##Reply &reply) { \
if (!status.IsIOError()) { \
callback(status, reply); \
delete executor; \
} else { \
Reconnect(); \
executor->Retry(); \
} \
}; \
auto operation = [request, operation_callback](GcsRpcClient *gcs_rpc_client) { \
RAY_UNUSED(INVOKE_RPC_CALL(SERVICE, METHOD, request, operation_callback, \
gcs_rpc_client->grpc_client)); \
}; \
executor->Execute(operation); \
}
/// Client used for communicating with gcs server.
class GcsRpcClient {
public:
@@ -15,8 +61,115 @@ class GcsRpcClient {
/// \param[in] address Address of gcs server.
/// \param[in] port Port of the gcs server.
/// \param[in] client_call_manager The `ClientCallManager` used for managing requests.
/// \param[in] get_server_address The function used for getting address when reconnect
/// rpc server.
GcsRpcClient(const std::string &address, const int port,
ClientCallManager &client_call_manager) {
ClientCallManager &client_call_manager,
std::function<std::pair<std::string, int>()> get_server_address = nullptr)
: client_call_manager_(client_call_manager),
get_server_address_(std::move(get_server_address)) {
Init(address, port, client_call_manager);
};
/// Add job info to gcs server.
VOID_GCS_RPC_CLIENT_METHOD(JobInfoGcsService, AddJob, job_info_grpc_client_, )
/// Mark job as finished to gcs server.
VOID_GCS_RPC_CLIENT_METHOD(JobInfoGcsService, MarkJobFinished, job_info_grpc_client_, )
/// Get actor data from GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, GetActorInfo, actor_info_grpc_client_, )
/// Register an actor to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, RegisterActorInfo,
actor_info_grpc_client_, )
/// Update actor info in GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, UpdateActorInfo,
actor_info_grpc_client_, )
/// Add actor checkpoint data to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, AddActorCheckpoint,
actor_info_grpc_client_, )
/// Get actor checkpoint data from GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, GetActorCheckpoint,
actor_info_grpc_client_, )
/// Get actor checkpoint id data from GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, GetActorCheckpointID,
actor_info_grpc_client_, )
/// Register a node to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, RegisterNode, node_info_grpc_client_, )
/// Unregister a node from GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, UnregisterNode, node_info_grpc_client_, )
/// Get information of all nodes from GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, GetAllNodeInfo, node_info_grpc_client_, )
/// Report heartbeat of a node to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, ReportHeartbeat,
node_info_grpc_client_, )
/// Report batch heartbeat to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, ReportBatchHeartbeat,
node_info_grpc_client_, )
/// Get node's resources from GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, GetResources, node_info_grpc_client_, )
/// Update resources of a node in GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, UpdateResources,
node_info_grpc_client_, )
/// Delete resources of a node in GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, DeleteResources,
node_info_grpc_client_, )
/// Get object's locations from GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(ObjectInfoGcsService, GetObjectLocations,
object_info_grpc_client_, )
/// Add location of object to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(ObjectInfoGcsService, AddObjectLocation,
object_info_grpc_client_, )
/// Remove location of object to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(ObjectInfoGcsService, RemoveObjectLocation,
object_info_grpc_client_, )
/// Add a task to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(TaskInfoGcsService, AddTask, task_info_grpc_client_, )
/// Get task information from GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(TaskInfoGcsService, GetTask, task_info_grpc_client_, )
/// Delete tasks from GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(TaskInfoGcsService, DeleteTasks, task_info_grpc_client_, )
/// Add a task lease to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(TaskInfoGcsService, AddTaskLease, task_info_grpc_client_, )
/// Attempt task reconstruction to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(TaskInfoGcsService, AttemptTaskReconstruction,
task_info_grpc_client_, )
/// Add profile data to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(StatsGcsService, AddProfileData, stats_grpc_client_, )
/// Report a job error to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(ErrorInfoGcsService, ReportJobError,
error_info_grpc_client_, )
/// Report a worker failure to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(WorkerInfoGcsService, ReportWorkerFailure,
worker_info_grpc_client_, )
private:
void Init(const std::string &address, const int port,
ClientCallManager &client_call_manager) {
job_info_grpc_client_ = std::unique_ptr<GrpcClient<JobInfoGcsService>>(
new GrpcClient<JobInfoGcsService>(address, port, client_call_manager));
actor_info_grpc_client_ = std::unique_ptr<GrpcClient<ActorInfoGcsService>>(
@@ -33,100 +186,18 @@ class GcsRpcClient {
new GrpcClient<ErrorInfoGcsService>(address, port, client_call_manager));
worker_info_grpc_client_ = std::unique_ptr<GrpcClient<WorkerInfoGcsService>>(
new GrpcClient<WorkerInfoGcsService>(address, port, client_call_manager));
};
}
/// Add job info to gcs server.
VOID_RPC_CLIENT_METHOD(JobInfoGcsService, AddJob, job_info_grpc_client_, )
void Reconnect() {
if (get_server_address_) {
auto address = get_server_address_();
Init(address.first, address.second, client_call_manager_);
}
}
/// Mark job as finished to gcs server.
VOID_RPC_CLIENT_METHOD(JobInfoGcsService, MarkJobFinished, job_info_grpc_client_, )
ClientCallManager &client_call_manager_;
std::function<std::pair<std::string, int>()> get_server_address_;
/// Get actor data from GCS Service.
VOID_RPC_CLIENT_METHOD(ActorInfoGcsService, GetActorInfo, actor_info_grpc_client_, )
/// Register an actor to GCS Service.
VOID_RPC_CLIENT_METHOD(ActorInfoGcsService, RegisterActorInfo,
actor_info_grpc_client_, )
/// Update actor info in GCS Service.
VOID_RPC_CLIENT_METHOD(ActorInfoGcsService, UpdateActorInfo, actor_info_grpc_client_, )
/// Add actor checkpoint data to GCS Service.
VOID_RPC_CLIENT_METHOD(ActorInfoGcsService, AddActorCheckpoint,
actor_info_grpc_client_, )
/// Get actor checkpoint data from GCS Service.
VOID_RPC_CLIENT_METHOD(ActorInfoGcsService, GetActorCheckpoint,
actor_info_grpc_client_, )
/// Get actor checkpoint id data from GCS Service.
VOID_RPC_CLIENT_METHOD(ActorInfoGcsService, GetActorCheckpointID,
actor_info_grpc_client_, )
/// Register a node to GCS Service.
VOID_RPC_CLIENT_METHOD(NodeInfoGcsService, RegisterNode, node_info_grpc_client_, )
/// Unregister a node from GCS Service.
VOID_RPC_CLIENT_METHOD(NodeInfoGcsService, UnregisterNode, node_info_grpc_client_, )
/// Get information of all nodes from GCS Service.
VOID_RPC_CLIENT_METHOD(NodeInfoGcsService, GetAllNodeInfo, node_info_grpc_client_, )
/// Report heartbeat of a node to GCS Service.
VOID_RPC_CLIENT_METHOD(NodeInfoGcsService, ReportHeartbeat, node_info_grpc_client_, )
/// Report batch heartbeat to GCS Service.
VOID_RPC_CLIENT_METHOD(NodeInfoGcsService, ReportBatchHeartbeat,
node_info_grpc_client_, )
/// Get node's resources from GCS Service.
VOID_RPC_CLIENT_METHOD(NodeInfoGcsService, GetResources, node_info_grpc_client_, )
/// Update resources of a node in GCS Service.
VOID_RPC_CLIENT_METHOD(NodeInfoGcsService, UpdateResources, node_info_grpc_client_, )
/// Delete resources of a node in GCS Service.
VOID_RPC_CLIENT_METHOD(NodeInfoGcsService, DeleteResources, node_info_grpc_client_, )
/// Get object's locations from GCS Service.
VOID_RPC_CLIENT_METHOD(ObjectInfoGcsService, GetObjectLocations,
object_info_grpc_client_, )
/// Add location of object to GCS Service.
VOID_RPC_CLIENT_METHOD(ObjectInfoGcsService, AddObjectLocation,
object_info_grpc_client_, )
/// Remove location of object to GCS Service.
VOID_RPC_CLIENT_METHOD(ObjectInfoGcsService, RemoveObjectLocation,
object_info_grpc_client_, )
/// Add a task to GCS Service.
VOID_RPC_CLIENT_METHOD(TaskInfoGcsService, AddTask, task_info_grpc_client_, )
/// Get task information from GCS Service.
VOID_RPC_CLIENT_METHOD(TaskInfoGcsService, GetTask, task_info_grpc_client_, )
/// Delete tasks from GCS Service.
VOID_RPC_CLIENT_METHOD(TaskInfoGcsService, DeleteTasks, task_info_grpc_client_, )
/// Add a task lease to GCS Service.
VOID_RPC_CLIENT_METHOD(TaskInfoGcsService, AddTaskLease, task_info_grpc_client_, )
/// Attempt task reconstruction to GCS Service.
VOID_RPC_CLIENT_METHOD(TaskInfoGcsService, AttemptTaskReconstruction,
task_info_grpc_client_, )
/// Add profile data to GCS Service.
VOID_RPC_CLIENT_METHOD(StatsGcsService, AddProfileData, stats_grpc_client_, )
/// Report a job error to GCS Service.
VOID_RPC_CLIENT_METHOD(ErrorInfoGcsService, ReportJobError, error_info_grpc_client_, )
/// Report a worker failure to GCS Service.
VOID_RPC_CLIENT_METHOD(WorkerInfoGcsService, ReportWorkerFailure,
worker_info_grpc_client_, )
private:
/// The gRPC-generated stub.
std::unique_ptr<GrpcClient<JobInfoGcsService>> job_info_grpc_client_;
std::unique_ptr<GrpcClient<ActorInfoGcsService>> actor_info_grpc_client_;