diff --git a/src/ray/common/network_util.cc b/src/ray/common/network_util.cc index a628de50f..857db87ed 100644 --- a/src/ray/common/network_util.cc +++ b/src/ray/common/network_util.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "network_util.h" + #include "ray/util/logging.h" std::string GetValidLocalIp(int port, int64_t timeout_ms) { @@ -71,3 +72,13 @@ bool Ping(const std::string &ip, int port, int64_t timeout_ms) { bool is_timeout; return client.Connect(ip, port, timeout_ms, &is_timeout); } + +bool CheckFree(int port) { + boost::asio::io_service io_service; + tcp::socket socket(io_service); + socket.open(boost::asio::ip::tcp::v4()); + boost::system::error_code ec; + socket.bind(boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port), ec); + socket.close(); + return !ec.failed(); +} diff --git a/src/ray/common/network_util.h b/src/ray/common/network_util.h index 56776888c..a9898b87a 100644 --- a/src/ray/common/network_util.h +++ b/src/ray/common/network_util.h @@ -20,6 +20,7 @@ #include #include #include + #include "constants.h" using boost::asio::deadline_timer; @@ -123,4 +124,5 @@ std::string GetValidLocalIp(int port, int64_t timeout_ms); /// \return Whether target rpc server is valid. bool Ping(const std::string &ip, int port, int64_t timeout_ms); +bool CheckFree(int port); #endif // RAY_COMMON_NETWORK_UTIL_H diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 92ef70650..e063217a9 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -116,6 +116,13 @@ RAY_CONFIG(int64_t, max_direct_call_object_size, 100 * 1024) // limit in Ray to avoid crashing with many small inlined task arguments. RAY_CONFIG(int64_t, max_grpc_message_size, 100 * 1024 * 1024) +// Number of times to retry creating a gRPC server. +RAY_CONFIG(int64_t, grpc_server_num_retries, 1) + +// Retry timeout for trying to create a gRPC server. Only applies if the number +// of retries is non zero. +RAY_CONFIG(int64_t, grpc_server_retry_timeout_milliseconds, 1000) + // The min number of retries for direct actor creation tasks. The actual number // of creation retries will be MAX(actor_creation_min_retries, max_restarts). RAY_CONFIG(uint64_t, actor_creation_min_retries, 3) diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index ad24c9f2d..e5430979a 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -18,6 +18,7 @@ #include #include "ray/common/constants.h" +#include "ray/common/network_util.h" #include "ray/common/ray_config.h" #include "ray/common/status.h" #include "ray/gcs/pb_util.h" @@ -296,17 +297,26 @@ Process WorkerPool::StartProcess(const std::vector &worker_command_ } Status WorkerPool::GetNextFreePort(int *port) { - if (free_ports_) { - if (free_ports_->empty()) { - return Status::Invalid( - "Ran out of ports to allocate to workers. Please specify a wider port range."); - } + if (!free_ports_) { + *port = 0; + return Status::OK(); + } + + // Try up to the current number of ports. + int current_size = free_ports_->size(); + for (int i = 0; i < current_size; i++) { *port = free_ports_->front(); free_ports_->pop(); - } else { - *port = 0; + if (CheckFree(*port)) { + return Status::OK(); + } + // Return to pool to check later. + free_ports_->push(*port); } - return Status::OK(); + *port = -1; + return Status::Invalid( + "No available ports. Please specify a wider port range using --min-worker-port and " + "--max-worker-port."); } void WorkerPool::MarkPortAsFree(int port) { diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 50d18cfc3..7f80fb9d5 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -261,6 +261,9 @@ class WorkerPool { /// Get the next unallocated port in the free ports list. If a port range isn't /// configured, returns 0. + /// NOTE: Ray does not 'reserve' these ports from being used by other services. + /// There is a race condition where another service binds to the port sometime + /// after this function returns and before the Worker/Driver uses the port. /// \param[out] port The next available port. Status GetNextFreePort(int *port); diff --git a/src/ray/rpc/grpc_server.cc b/src/ray/rpc/grpc_server.cc index 932ad368f..141ee426c 100644 --- a/src/ray/rpc/grpc_server.cc +++ b/src/ray/rpc/grpc_server.cc @@ -31,32 +31,40 @@ GrpcServer::GrpcServer(std::string name, const uint32_t port, int num_threads) void GrpcServer::Run() { uint32_t specified_port = port_; std::string server_address("0.0.0.0:" + std::to_string(port_)); + int num_retries = RayConfig::instance().grpc_server_num_retries(); + while (num_retries >= 0) { + grpc::ServerBuilder builder; + // Disable the SO_REUSEPORT option. We don't need it in ray. If the option is enabled + // (default behavior in grpc), we may see multiple workers listen on the same port and + // the requests sent to this port may be handled by any of the workers. + builder.AddChannelArgument(GRPC_ARG_ALLOW_REUSEPORT, 0); + builder.AddChannelArgument(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, + RayConfig::instance().max_grpc_message_size()); + builder.AddChannelArgument(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, + RayConfig::instance().max_grpc_message_size()); + // TODO(hchen): Add options for authentication. + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials(), &port_); + // Register all the services to this server. + if (services_.empty()) { + RAY_LOG(WARNING) << "No service is found when start grpc server " << name_; + } + for (auto &entry : services_) { + builder.RegisterService(&entry.get()); + } + // Get hold of the completion queue used for the asynchronous communication + // with the gRPC runtime. + for (int i = 0; i < num_threads_; i++) { + cqs_[i] = builder.AddCompletionQueue(); + } + // Build and start server. + server_ = builder.BuildAndStart(); + if (port_ > 0) { + break; + } + usleep(RayConfig::instance().grpc_server_retry_timeout_milliseconds() * 1000); + num_retries--; + } - grpc::ServerBuilder builder; - // Disable the SO_REUSEPORT option. We don't need it in ray. If the option is enabled - // (default behavior in grpc), we may see multiple workers listen on the same port and - // the requests sent to this port may be handled by any of the workers. - builder.AddChannelArgument(GRPC_ARG_ALLOW_REUSEPORT, 0); - builder.AddChannelArgument(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, - RayConfig::instance().max_grpc_message_size()); - builder.AddChannelArgument(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, - RayConfig::instance().max_grpc_message_size()); - // TODO(hchen): Add options for authentication. - builder.AddListeningPort(server_address, grpc::InsecureServerCredentials(), &port_); - // Register all the services to this server. - if (services_.empty()) { - RAY_LOG(WARNING) << "No service is found when start grpc server " << name_; - } - for (auto &entry : services_) { - builder.RegisterService(&entry.get()); - } - // Get hold of the completion queue used for the asynchronous communication - // with the gRPC runtime. - for (int i = 0; i < num_threads_; i++) { - cqs_[i] = builder.AddCompletionQueue(); - } - // Build and start server. - server_ = builder.BuildAndStart(); // If the grpc server failed to bind the port, the `port_` will be set to 0. RAY_CHECK(port_ > 0) << "Port " << specified_port