From e69664b74bc98dccfc919e89843990fb065767d3 Mon Sep 17 00:00:00 2001 From: mehrdadn Date: Thu, 19 Mar 2020 19:32:53 -0700 Subject: [PATCH] Miscellaneous Windows compatibility bugfixes (#7658) * Windows compatibility bug fixes * Use WSASend/WSARecv as WSASendMsg/WSARecvMsg do not work with TCP sockets * Clean up some TODOs * Fix duplicate compilations * RedisAsioClient boost::asio::error::connection_reset Co-authored-by: Mehrdad --- BUILD.bazel | 2 +- python/ray/ray_process_reaper.py | 1 - src/ray/core_worker/core_worker.cc | 4 +- src/ray/gcs/asio.cc | 6 +- src/ray/gcs/callback.h | 5 -- .../object_store_notification_manager.cc | 2 +- src/ray/raylet/lineage_cache.h | 5 -- src/ray/raylet/node_manager.cc | 2 +- src/shims/windows/msg.cc | 64 +++++-------------- src/shims/windows/socketpair.cc | 2 +- 10 files changed, 25 insertions(+), 68 deletions(-) diff --git a/BUILD.bazel b/BUILD.bazel index d2f36e6f9..7336b5ee6 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -384,7 +384,7 @@ cc_library( ], exclude = [ "src/ray/raylet/mock_gcs_client.cc", - "src/ray/raylet/monitor_main.cc", + "src/ray/raylet/monitor*.cc", "src/ray/raylet/*_test.cc", "src/ray/raylet/main.cc", ], diff --git a/python/ray/ray_process_reaper.py b/python/ray/ray_process_reaper.py index f804242c4..79c754207 100644 --- a/python/ray/ray_process_reaper.py +++ b/python/ray/ray_process_reaper.py @@ -29,7 +29,6 @@ def reap_process_group(*args): signal.signal(signal.SIGTERM, sigterm_handler) # Our parent must have died, SIGTERM the group (including ourselves). - # TODO(mehrdadn): killpg isn't supported on Windows. os.killpg(0, signal.SIGTERM) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 5ef8f2018..dfa229735 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -334,9 +334,7 @@ void CoreWorker::Exit(bool intentional) { } void CoreWorker::RunIOService() { -#ifdef _WIN32 - // TODO(mehrdadn): Is there an equivalent for Windows we need here? -#else +#ifndef _WIN32 // Block SIGINT and SIGTERM so they will be handled by the main thread. sigset_t mask; sigemptyset(&mask); diff --git a/src/ray/gcs/asio.cc b/src/ray/gcs/asio.cc index 1d51cec72..2ff3f82a4 100644 --- a/src/ray/gcs/asio.cc +++ b/src/ray/gcs/asio.cc @@ -78,7 +78,8 @@ void RedisAsioClient::operate() { } void RedisAsioClient::handle_read(boost::system::error_code error_code) { - RAY_CHECK(!error_code || error_code == boost::asio::error::would_block); + RAY_CHECK(!error_code || error_code == boost::asio::error::would_block || + error_code == boost::asio::error::connection_reset); read_in_progress_ = false; redis_async_context_.RedisAsyncHandleRead(); @@ -88,7 +89,8 @@ void RedisAsioClient::handle_read(boost::system::error_code error_code) { } void RedisAsioClient::handle_write(boost::system::error_code error_code) { - RAY_CHECK(!error_code || error_code == boost::asio::error::would_block); + RAY_CHECK(!error_code || error_code == boost::asio::error::would_block || + error_code == boost::asio::error::connection_reset); write_in_progress_ = false; redis_async_context_.RedisAsyncHandleWrite(); diff --git a/src/ray/gcs/callback.h b/src/ray/gcs/callback.h index fdbffb1a4..7545058f6 100644 --- a/src/ray/gcs/callback.h +++ b/src/ray/gcs/callback.h @@ -15,11 +15,6 @@ #ifndef RAY_GCS_CALLBACK_H #define RAY_GCS_CALLBACK_H -#if defined(__clang__) && defined(_MSC_VER) -// TODO(mehrdadn): Remove this Windows (clang-cl) workaround once we upgrade to -// Boost > 1.68: https://lists.boost.org/Archives/boost/2018/09/243420.php -#include -#endif #include #include #include "ray/common/status.h" diff --git a/src/ray/object_manager/object_store_notification_manager.cc b/src/ray/object_manager/object_store_notification_manager.cc index 658b7c319..9615f7453 100644 --- a/src/ray/object_manager/object_store_notification_manager.cc +++ b/src/ray/object_manager/object_store_notification_manager.cc @@ -65,7 +65,7 @@ ObjectStoreNotificationManager::ObjectStoreNotificationManager( #else socket_.assign(local_stream_protocol(), c_socket, ec); #endif - assert(!ec.value()); + RAY_CHECK(!ec); NotificationWait(); } diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h index 3aa73c5cb..2b58ada17 100644 --- a/src/ray/raylet/lineage_cache.h +++ b/src/ray/raylet/lineage_cache.h @@ -16,11 +16,6 @@ #define RAY_RAYLET_LINEAGE_CACHE_H #include -#if defined(__clang__) && defined(_MSC_VER) -// TODO(mehrdadn): Remove this Windows (clang-cl) workaround once we upgrade to -// Boost > 1.68: https://lists.boost.org/Archives/boost/2018/09/243420.php -#include -#endif #include #include "ray/common/id.h" diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 6ffcf0a86..7ca684766 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -244,7 +244,7 @@ ray::Status NodeManager::RegisterGcs() { void NodeManager::KillWorker(std::shared_ptr worker) { #ifdef _WIN32 - // TODO(mehrdadn): Implement implement graceful process termination mechanism + // TODO(mehrdadn): implement graceful process termination mechanism #else // If we're just cleaning up a single worker, allow it some time to clean // up its state before force killing. The client socket will be closed diff --git a/src/shims/windows/msg.cc b/src/shims/windows/msg.cc index f00581bda..08b8492ca 100644 --- a/src/shims/windows/msg.cc +++ b/src/shims/windows/msg.cc @@ -10,7 +10,7 @@ int socketpair(int domain, int type, int protocol, int sv[2]) { if ((domain != AF_UNIX && domain != AF_INET) || type != SOCK_STREAM) { - return (int)INVALID_SOCKET; + return -1; } SOCKET sockets[2]; int r = dumb_socketpair(sockets); @@ -76,10 +76,7 @@ ssize_t sendmsg(int sockfd, struct msghdr *msg, int flags) { * that the child process closes AND that its process ID is reassigned * to another existing process. */ - struct msghdr const old_msg = *msg; int *const pfd = (int *)CMSG_DATA(header); - msg->msg_control = NULL; - msg->msg_controllen = 0; WSAPROTOCOL_INFO protocol_info = {0}; /* assume socket if it's a pipe, until proven otherwise */ BOOL is_socket = GetFileType((HANDLE)(SOCKET)(*pfd)) == FILE_TYPE_PIPE; @@ -104,31 +101,17 @@ ssize_t sendmsg(int sockfd, struct msghdr *msg, int flags) { } } if (result == 0) { - int const nbufs = msg->dwBufferCount + 1; - WSABUF *const bufs = (struct _WSABUF *)_alloca(sizeof(*msg->lpBuffers) * nbufs); + DWORD const nbufs = msg->dwBufferCount + 1; + WSABUF *const bufs = (WSABUF *)_alloca(sizeof(*msg->lpBuffers) * nbufs); bufs[0].buf = (char *)&protocol_info; bufs[0].len = sizeof(protocol_info); memcpy(&bufs[1], msg->lpBuffers, msg->dwBufferCount * sizeof(*msg->lpBuffers)); DWORD nb; - msg->lpBuffers = bufs; - msg->dwBufferCount = nbufs; - GUID wsaid_WSASendMsg = { - 0xa441e712, 0x754f, 0x43ca, {0x84, 0xa7, 0x0d, 0xee, 0x44, 0xcf, 0x60, 0x6d}}; - typedef INT PASCAL WSASendMsg_t( - SOCKET s, LPWSAMSG lpMsg, DWORD dwFlags, LPDWORD lpNumberOfBytesSent, - LPWSAOVERLAPPED lpOverlapped, - LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine); - WSASendMsg_t *WSASendMsg = NULL; - result = WSAIoctl(sockfd, SIO_GET_EXTENSION_FUNCTION_POINTER, &wsaid_WSASendMsg, - sizeof(wsaid_WSASendMsg), &WSASendMsg, sizeof(WSASendMsg), &nb, - NULL, 0); - if (result == 0) { - result = (*WSASendMsg)(sockfd, msg, flags, &nb, NULL, NULL) == 0 - ? (ssize_t)(nb - sizeof(protocol_info)) - : 0; - } + result = WSASend(sockfd, bufs, nbufs, &nb, flags, NULL, NULL) == 0 + ? (ssize_t)(nb - sizeof(protocol_info)) + : -1; } - if (result != 0 && target_process && !is_socket) { + if (result == -1 && target_process && !is_socket) { /* we failed to send the handle, and it needs cleaning up! */ HANDLE duplicated_back = NULL; if (DuplicateHandle(target_process, *(HANDLE *)&protocol_info, GetCurrentProcess(), @@ -139,7 +122,6 @@ ssize_t sendmsg(int sockfd, struct msghdr *msg, int flags) { if (target_process) { CloseHandle(target_process); } - *msg = old_msg; } return result; } @@ -148,43 +130,29 @@ ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags) { int result = -1; struct cmsghdr *header = CMSG_FIRSTHDR(msg); if (msg->msg_controllen && flags == 0 /* We can't send flags on Windows... */) { - struct msghdr const old_msg = *msg; - msg->msg_control = NULL; - msg->msg_controllen = 0; WSAPROTOCOL_INFO protocol_info = {0}; - int const nbufs = msg->dwBufferCount + 1; - WSABUF *const bufs = (struct _WSABUF *)_alloca(sizeof(*msg->lpBuffers) * nbufs); + DWORD const nbufs = msg->dwBufferCount + 1; + WSABUF *const bufs = (WSABUF *)_alloca(sizeof(*msg->lpBuffers) * nbufs); bufs[0].buf = (char *)&protocol_info; bufs[0].len = sizeof(protocol_info); memcpy(&bufs[1], msg->lpBuffers, msg->dwBufferCount * sizeof(*msg->lpBuffers)); - typedef INT PASCAL WSARecvMsg_t( - SOCKET s, LPWSAMSG lpMsg, LPDWORD lpNumberOfBytesRecvd, - LPWSAOVERLAPPED lpOverlapped, - LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine); - WSARecvMsg_t *WSARecvMsg = NULL; DWORD nb; - GUID wsaid_WSARecvMsg = { - 0xf689d7c8, 0x6f1f, 0x436b, {0x8a, 0x53, 0xe5, 0x4f, 0xe3, 0x51, 0xc3, 0x22}}; - result = - WSAIoctl(sockfd, SIO_GET_EXTENSION_FUNCTION_POINTER, &wsaid_WSARecvMsg, - sizeof(wsaid_WSARecvMsg), &WSARecvMsg, sizeof(WSARecvMsg), &nb, NULL, 0); - if (result == 0) { - result = (*WSARecvMsg)(sockfd, msg, &nb, NULL, NULL) == 0 - ? (ssize_t)(nb - sizeof(protocol_info)) - : 0; - } - if (result == 0) { + DWORD dwFlags = flags; + result = WSARecv(sockfd, bufs, nbufs, &nb, &dwFlags, NULL, NULL) == 0 + ? (ssize_t)(nb - sizeof(protocol_info)) + : -1; + if (result != -1) { int *const pfd = (int *)CMSG_DATA(header); if (protocol_info.iSocketType == 0 && protocol_info.iProtocol == 0) { *pfd = *(int *)&protocol_info; } else { *pfd = WSASocket(FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO, - &protocol_info, 0, 0); + &protocol_info, 0, WSA_FLAG_OVERLAPPED); } + header->cmsg_len = CMSG_LEN(sizeof(*pfd)); header->cmsg_level = SOL_SOCKET; header->cmsg_type = SCM_RIGHTS; } - *msg = old_msg; } return result; } diff --git a/src/shims/windows/socketpair.cc b/src/shims/windows/socketpair.cc index 15ad0a3ff..2fe46b000 100644 --- a/src/shims/windows/socketpair.cc +++ b/src/shims/windows/socketpair.cc @@ -108,7 +108,7 @@ int dumb_socketpair(SOCKET socks[2]) { if (listen(listener, 1) == SOCKET_ERROR) break; - socks[0] = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, 0); + socks[0] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (socks[0] == -1) break; if (connect(socks[0], &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR) break;