diff --git a/java/test/src/main/java/org/ray/api/test/CrossLanguageInvocationTest.java b/java/test/src/main/java/org/ray/api/test/CrossLanguageInvocationTest.java index 392bdfdcc..fe4b95dd3 100644 --- a/java/test/src/main/java/org/ray/api/test/CrossLanguageInvocationTest.java +++ b/java/test/src/main/java/org/ray/api/test/CrossLanguageInvocationTest.java @@ -11,7 +11,6 @@ import org.ray.api.Ray; import org.ray.api.RayActor; import org.ray.api.RayObject; import org.ray.api.RayPyActor; -import org.ray.api.TestUtils; import org.ray.runtime.actor.NativeRayActor; import org.ray.runtime.actor.NativeRayPyActor; import org.slf4j.Logger; diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 7c47e2a27..3fae9e7ac 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -237,7 +237,8 @@ RAY_CONFIG(uint32_t, object_store_get_max_ids_to_print_in_warning, 20) /// Allow up to 5 seconds for connecting to gcs service. /// Note: this only takes effect when gcs service is enabled. RAY_CONFIG(int64_t, gcs_service_connect_retries, 50) -RAY_CONFIG(int64_t, gcs_service_connect_wait_milliseconds, 100) +/// Time to wait, in milliseconds, before retrying to get the gcs service address. +RAY_CONFIG(int64_t, internal_gcs_service_connect_wait_milliseconds, 100) /// Maximum number of times to retry putting an object when the plasma store is full. /// Can be set to -1 to enable unlimited retries. 
diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index f580a3044..3a3077bc6 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -826,7 +826,6 @@ Status ServiceBasedErrorInfoAccessor::AsyncReportJobError( } RAY_LOG(DEBUG) << "Finished reporting job error, status = " << status << ", job id = " << job_id << ", type = " << type; - ; }); return Status::OK(); } diff --git a/src/ray/gcs/gcs_client/service_based_gcs_client.cc b/src/ray/gcs/gcs_client/service_based_gcs_client.cc index 3bf2ce339..ba7f93450 100644 --- a/src/ray/gcs/gcs_client/service_based_gcs_client.cc +++ b/src/ray/gcs/gcs_client/service_based_gcs_client.cc @@ -22,14 +22,18 @@ Status ServiceBasedGcsClient::Connect(boost::asio::io_service &io_service) { RAY_CHECK_OK(redis_gcs_client_->Connect(io_service)); // Get gcs service address - std::pair address; - GetGcsServerAddressFromRedis(redis_gcs_client_->primary_context()->sync_context(), - &address); + auto get_server_address = [this]() { + std::pair address; + GetGcsServerAddressFromRedis(redis_gcs_client_->primary_context()->sync_context(), + &address); + return address; + }; + std::pair address = get_server_address(); // Connect to gcs service client_call_manager_.reset(new rpc::ClientCallManager(io_service)); - gcs_rpc_client_.reset( - new rpc::GcsRpcClient(address.first, address.second, *client_call_manager_)); + gcs_rpc_client_.reset(new rpc::GcsRpcClient(address.first, address.second, + *client_call_manager_, get_server_address)); job_accessor_.reset(new ServiceBasedJobInfoAccessor(this)); actor_accessor_.reset(new ServiceBasedActorInfoAccessor(this)); @@ -65,7 +69,7 @@ void ServiceBasedGcsClient::GetGcsServerAddressFromRedis( // Sleep for a little, and try again if the entry isn't there yet. 
freeReplyObject(reply); - usleep(RayConfig::instance().gcs_service_connect_wait_milliseconds() * 1000); + usleep(RayConfig::instance().internal_gcs_service_connect_wait_milliseconds() * 1000); num_attempts++; } RAY_CHECK(num_attempts < RayConfig::instance().gcs_service_connect_retries()) diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index e5f9166c3..9a710409b 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -14,7 +14,6 @@ static std::string libray_redis_module_path; class ServiceBasedGcsGcsClientTest : public RedisServiceManagerForTest { public: void SetUp() override { - gcs::GcsServerConfig config; config.grpc_server_port = 0; config.grpc_server_name = "MockedGcsServer"; config.grpc_server_thread_num = 1; @@ -329,6 +328,7 @@ class ServiceBasedGcsGcsClientTest : public RedisServiceManagerForTest { } // Gcs server + gcs::GcsServerConfig config; std::unique_ptr gcs_server_; std::unique_ptr thread_io_service_; std::unique_ptr thread_gcs_server_; @@ -628,6 +628,30 @@ TEST_F(ServiceBasedGcsGcsClientTest, TestTaskInfo) { ASSERT_TRUE(AttemptTaskReconstruction(task_reconstruction_data)); } +TEST_F(ServiceBasedGcsGcsClientTest, TestDetectGcsAvailability) { + // Verify that the client can still add a job after the gcs server is restarted. + JobID add_job_id = JobID::FromInt(1); + auto job_table_data = GenJobTableData(add_job_id); + + RAY_LOG(INFO) << "Gcs service init port = " << gcs_server_->GetPort(); + gcs_server_->Stop(); + thread_gcs_server_->join(); + + gcs_server_.reset(new gcs::GcsServer(config)); + thread_gcs_server_.reset(new std::thread([this] { gcs_server_->Start(); })); + + // Wait until the restarted server is listening, i.e. GetPort() is non-zero. 
+ while (gcs_server_->GetPort() == 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + RAY_LOG(INFO) << "Gcs service restart success, port = " << gcs_server_->GetPort(); + + std::promise<bool> promise; + RAY_CHECK_OK(gcs_client_->Jobs().AsyncAdd( + job_table_data, [&promise](Status status) { promise.set_value(status.ok()); })); + ASSERT_TRUE(promise.get_future().get()); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc index 92efeafb7..3a0bc17a4 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc @@ -604,8 +604,6 @@ TEST_F(GcsServerTest, TestTaskInfo) { rpc::DeleteTasksRequest delete_tasks_request; delete_tasks_request.add_task_id_list(task_id.Binary()); ASSERT_TRUE(DeleteTasks(delete_tasks_request)); - result = GetTask(task_id.Binary()); - ASSERT_TRUE(!result.has_task()); // Add task lease ClientID node_id = ClientID::FromRandom(); diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 907a3f3c3..6c94bb76c 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -162,9 +162,6 @@ int main(int argc, char *argv[]) { ray::gcs::GcsClientOptions client_options(redis_address, redis_port, redis_password); std::shared_ptr gcs_client; - std::unique_ptr thread_io_service; - boost::asio::io_service io_service; - // RAY_GCS_SERVICE_ENABLED only set in ci job, so we just check if it is null. if (getenv("RAY_GCS_SERVICE_ENABLED") != nullptr) { gcs_client = std::make_shared(client_options); diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index c9237c7f8..cad1bc8b8 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -7,6 +7,52 @@ namespace ray { namespace rpc { +class GcsRpcClient; + +/// \class Executor +/// Executor saves operation and support retries. 
+class Executor { + public: + explicit Executor(GcsRpcClient *gcs_rpc_client) : gcs_rpc_client_(gcs_rpc_client) {} + + /// Save the given operation so that it can be retried, then run it once. + /// + /// \param operation The operation to run; it receives the client passed to the constructor. + void Execute(const std::function &operation) { + operation_ = operation; + operation(gcs_rpc_client_); + } + + /// Run the operation saved by the last call to Execute() again. + void Retry() { operation_(gcs_rpc_client_); } + + private: + GcsRpcClient *gcs_rpc_client_; + std::function operation_; +}; + +// Define a void GCS RPC client method that reconnects and retries the call on IOError. +#define VOID_GCS_RPC_CLIENT_METHOD(SERVICE, METHOD, grpc_client, SPECS) \ + void METHOD(const METHOD##Request &request, \ + const ClientCallback &callback) SPECS { \ + auto executor = new Executor(this); /* deleted in operation_callback on a non-IOError status */ \ + auto operation_callback = [this, request, callback, executor]( \ + const Status &status, const METHOD##Reply &reply) { \ + if (!status.IsIOError()) { \ + callback(status, reply); \ + delete executor; \ + } else { \ + Reconnect(); \ + executor->Retry(); /* NOTE(review): no retry limit or backoff is visible here - confirm */ \ + } \ + }; \ + auto operation = [request, operation_callback](GcsRpcClient *gcs_rpc_client) { \ + RAY_UNUSED(INVOKE_RPC_CALL(SERVICE, METHOD, request, operation_callback, \ + gcs_rpc_client->grpc_client)); \ + }; \ + executor->Execute(operation); \ + } + /// Client used for communicating with gcs server. class GcsRpcClient { public: @@ -15,8 +61,115 @@ class GcsRpcClient { /// \param[in] address Address of gcs server. /// \param[in] port Port of the gcs server. /// \param[in] client_call_manager The `ClientCallManager` used for managing requests. + /// \param[in] get_server_address The function used to get the address when reconnecting to the + /// rpc server. 
GcsRpcClient(const std::string &address, const int port, - ClientCallManager &client_call_manager) { + ClientCallManager &client_call_manager, + std::function()> get_server_address = nullptr) + : client_call_manager_(client_call_manager), + get_server_address_(std::move(get_server_address)) { + Init(address, port, client_call_manager); + }; + + /// Add job info to gcs server. + VOID_GCS_RPC_CLIENT_METHOD(JobInfoGcsService, AddJob, job_info_grpc_client_, ) + + /// Mark job as finished to gcs server. + VOID_GCS_RPC_CLIENT_METHOD(JobInfoGcsService, MarkJobFinished, job_info_grpc_client_, ) + + /// Get actor data from GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, GetActorInfo, actor_info_grpc_client_, ) + + /// Register an actor to GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, RegisterActorInfo, + actor_info_grpc_client_, ) + + /// Update actor info in GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, UpdateActorInfo, + actor_info_grpc_client_, ) + + /// Add actor checkpoint data to GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, AddActorCheckpoint, + actor_info_grpc_client_, ) + + /// Get actor checkpoint data from GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, GetActorCheckpoint, + actor_info_grpc_client_, ) + + /// Get actor checkpoint id data from GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, GetActorCheckpointID, + actor_info_grpc_client_, ) + + /// Register a node to GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, RegisterNode, node_info_grpc_client_, ) + + /// Unregister a node from GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, UnregisterNode, node_info_grpc_client_, ) + + /// Get information of all nodes from GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, GetAllNodeInfo, node_info_grpc_client_, ) + + /// Report heartbeat of a node to GCS Service. 
+ VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, ReportHeartbeat, + node_info_grpc_client_, ) + + /// Report batch heartbeat to GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, ReportBatchHeartbeat, + node_info_grpc_client_, ) + + /// Get node's resources from GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, GetResources, node_info_grpc_client_, ) + + /// Update resources of a node in GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, UpdateResources, + node_info_grpc_client_, ) + + /// Delete resources of a node in GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, DeleteResources, + node_info_grpc_client_, ) + + /// Get object's locations from GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(ObjectInfoGcsService, GetObjectLocations, + object_info_grpc_client_, ) + + /// Add location of object to GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(ObjectInfoGcsService, AddObjectLocation, + object_info_grpc_client_, ) + + /// Remove location of object to GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(ObjectInfoGcsService, RemoveObjectLocation, + object_info_grpc_client_, ) + + /// Add a task to GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(TaskInfoGcsService, AddTask, task_info_grpc_client_, ) + + /// Get task information from GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(TaskInfoGcsService, GetTask, task_info_grpc_client_, ) + + /// Delete tasks from GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(TaskInfoGcsService, DeleteTasks, task_info_grpc_client_, ) + + /// Add a task lease to GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(TaskInfoGcsService, AddTaskLease, task_info_grpc_client_, ) + + /// Attempt task reconstruction to GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(TaskInfoGcsService, AttemptTaskReconstruction, + task_info_grpc_client_, ) + + /// Add profile data to GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(StatsGcsService, AddProfileData, stats_grpc_client_, ) + + /// Report a job error to GCS Service. 
+ VOID_GCS_RPC_CLIENT_METHOD(ErrorInfoGcsService, ReportJobError, + error_info_grpc_client_, ) + + /// Report a worker failure to GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(WorkerInfoGcsService, ReportWorkerFailure, + worker_info_grpc_client_, ) + + private: + void Init(const std::string &address, const int port, + ClientCallManager &client_call_manager) { job_info_grpc_client_ = std::unique_ptr>( new GrpcClient(address, port, client_call_manager)); actor_info_grpc_client_ = std::unique_ptr>( @@ -33,100 +186,18 @@ class GcsRpcClient { new GrpcClient(address, port, client_call_manager)); worker_info_grpc_client_ = std::unique_ptr>( new GrpcClient(address, port, client_call_manager)); - }; + } - /// Add job info to gcs server. - VOID_RPC_CLIENT_METHOD(JobInfoGcsService, AddJob, job_info_grpc_client_, ) + void Reconnect() { + if (get_server_address_) { + auto address = get_server_address_(); + Init(address.first, address.second, client_call_manager_); + } + } - /// Mark job as finished to gcs server. - VOID_RPC_CLIENT_METHOD(JobInfoGcsService, MarkJobFinished, job_info_grpc_client_, ) + ClientCallManager &client_call_manager_; + std::function()> get_server_address_; - /// Get actor data from GCS Service. - VOID_RPC_CLIENT_METHOD(ActorInfoGcsService, GetActorInfo, actor_info_grpc_client_, ) - - /// Register an actor to GCS Service. - VOID_RPC_CLIENT_METHOD(ActorInfoGcsService, RegisterActorInfo, - actor_info_grpc_client_, ) - - /// Update actor info in GCS Service. - VOID_RPC_CLIENT_METHOD(ActorInfoGcsService, UpdateActorInfo, actor_info_grpc_client_, ) - - /// Add actor checkpoint data to GCS Service. - VOID_RPC_CLIENT_METHOD(ActorInfoGcsService, AddActorCheckpoint, - actor_info_grpc_client_, ) - - /// Get actor checkpoint data from GCS Service. - VOID_RPC_CLIENT_METHOD(ActorInfoGcsService, GetActorCheckpoint, - actor_info_grpc_client_, ) - - /// Get actor checkpoint id data from GCS Service. 
- VOID_RPC_CLIENT_METHOD(ActorInfoGcsService, GetActorCheckpointID, - actor_info_grpc_client_, ) - - /// Register a node to GCS Service. - VOID_RPC_CLIENT_METHOD(NodeInfoGcsService, RegisterNode, node_info_grpc_client_, ) - - /// Unregister a node from GCS Service. - VOID_RPC_CLIENT_METHOD(NodeInfoGcsService, UnregisterNode, node_info_grpc_client_, ) - - /// Get information of all nodes from GCS Service. - VOID_RPC_CLIENT_METHOD(NodeInfoGcsService, GetAllNodeInfo, node_info_grpc_client_, ) - - /// Report heartbeat of a node to GCS Service. - VOID_RPC_CLIENT_METHOD(NodeInfoGcsService, ReportHeartbeat, node_info_grpc_client_, ) - - /// Report batch heartbeat to GCS Service. - VOID_RPC_CLIENT_METHOD(NodeInfoGcsService, ReportBatchHeartbeat, - node_info_grpc_client_, ) - - /// Get node's resources from GCS Service. - VOID_RPC_CLIENT_METHOD(NodeInfoGcsService, GetResources, node_info_grpc_client_, ) - - /// Update resources of a node in GCS Service. - VOID_RPC_CLIENT_METHOD(NodeInfoGcsService, UpdateResources, node_info_grpc_client_, ) - - /// Delete resources of a node in GCS Service. - VOID_RPC_CLIENT_METHOD(NodeInfoGcsService, DeleteResources, node_info_grpc_client_, ) - - /// Get object's locations from GCS Service. - VOID_RPC_CLIENT_METHOD(ObjectInfoGcsService, GetObjectLocations, - object_info_grpc_client_, ) - - /// Add location of object to GCS Service. - VOID_RPC_CLIENT_METHOD(ObjectInfoGcsService, AddObjectLocation, - object_info_grpc_client_, ) - - /// Remove location of object to GCS Service. - VOID_RPC_CLIENT_METHOD(ObjectInfoGcsService, RemoveObjectLocation, - object_info_grpc_client_, ) - - /// Add a task to GCS Service. - VOID_RPC_CLIENT_METHOD(TaskInfoGcsService, AddTask, task_info_grpc_client_, ) - - /// Get task information from GCS Service. - VOID_RPC_CLIENT_METHOD(TaskInfoGcsService, GetTask, task_info_grpc_client_, ) - - /// Delete tasks from GCS Service. 
- VOID_RPC_CLIENT_METHOD(TaskInfoGcsService, DeleteTasks, task_info_grpc_client_, ) - - /// Add a task lease to GCS Service. - VOID_RPC_CLIENT_METHOD(TaskInfoGcsService, AddTaskLease, task_info_grpc_client_, ) - - /// Attempt task reconstruction to GCS Service. - VOID_RPC_CLIENT_METHOD(TaskInfoGcsService, AttemptTaskReconstruction, - task_info_grpc_client_, ) - - /// Add profile data to GCS Service. - VOID_RPC_CLIENT_METHOD(StatsGcsService, AddProfileData, stats_grpc_client_, ) - - /// Report a job error to GCS Service. - VOID_RPC_CLIENT_METHOD(ErrorInfoGcsService, ReportJobError, error_info_grpc_client_, ) - - /// Report a worker failure to GCS Service. - VOID_RPC_CLIENT_METHOD(WorkerInfoGcsService, ReportWorkerFailure, - worker_info_grpc_client_, ) - - private: /// The gRPC-generated stub. std::unique_ptr> job_info_grpc_client_; std::unique_ptr> actor_info_grpc_client_;