From b4030cdbbec071959de44c6468dea4481e78b744 Mon Sep 17 00:00:00 2001 From: mehrdadn Date: Mon, 23 Mar 2020 21:08:25 -0700 Subject: [PATCH] File HANDLE/descriptor translation layer for Windows (#7657) * Use TCP sockets on Windows with custom HANDLE <-> FD translation layer * Get Plasma working on Windows Co-authored-by: Mehrdad --- bazel/BUILD.plasma | 15 +- bazel/BUILD.redis | 9 +- bazel/ray_deps_setup.bzl | 7 +- python/ray/node.py | 25 +- src/ray/gcs/asio.cc | 8 +- .../object_store_notification_manager.cc | 11 +- src/ray/raylet/client_connection_test.cc | 3 +- src/shims/windows/msg.cc | 22 +- src/shims/windows/strings.h | 4 +- src/shims/windows/sys/mman.h | 39 -- src/shims/windows/sys/socket.h | 16 +- src/shims/windows/win32fd.cc | 538 ++++++++++++++++++ src/shims/windows/win32fd.h | 100 ++++ .../patches/arrow-windows-dlmalloc.patch | 45 +- .../patches/arrow-windows-nonstdc.patch | 17 + thirdparty/patches/arrow-windows-poll.patch | 55 -- thirdparty/patches/arrow-windows-socket.patch | 138 ++++- thirdparty/patches/arrow-windows-tcp.patch | 190 +++++++ thirdparty/patches/hiredis-casts.patch | 34 -- thirdparty/patches/hiredis-windows-poll.patch | 30 - .../patches/hiredis-windows-sockets.patch | 13 +- .../hiredis-windows-translations.patch | 102 ---- thirdparty/patches/redis-windows-poll.patch | 55 -- 23 files changed, 1092 insertions(+), 384 deletions(-) delete mode 100644 src/shims/windows/sys/mman.h create mode 100644 src/shims/windows/win32fd.cc create mode 100644 src/shims/windows/win32fd.h create mode 100644 thirdparty/patches/arrow-windows-nonstdc.patch delete mode 100644 thirdparty/patches/arrow-windows-poll.patch create mode 100644 thirdparty/patches/arrow-windows-tcp.patch delete mode 100644 thirdparty/patches/hiredis-casts.patch delete mode 100644 thirdparty/patches/hiredis-windows-poll.patch delete mode 100644 thirdparty/patches/hiredis-windows-translations.patch delete mode 100644 thirdparty/patches/redis-windows-poll.patch diff --git a/bazel/BUILD.plasma b/bazel/BUILD.plasma index 3d8091014..b97be9439 100644 --- a/bazel/BUILD.plasma +++ b/bazel/BUILD.plasma @@ -5,6 +5,8 @@ PROPAGATED_WINDOWS_DEFINES = ["ARROW_STATIC"] COPTS = [] + select({ "@bazel_tools//src/conditions:windows": [ + "-D" + "WIN32_REPLACE_FD_APIS", + "/FI" + "win32fd.h", ] + ["-D" + define for define in PROPAGATED_WINDOWS_DEFINES], "//conditions:default": [ "-DARROW_USE_GLOG", @@ -88,6 +90,7 @@ cc_library( linkopts = LINKOPTS, strip_include_prefix = "cpp/src", deps = [ + "@//:platform_shims", "@boost//:filesystem", "@com_github_google_glog//:glog", ], @@ -129,8 +132,8 @@ cc_library( ":arrow", ":common_fbs", ":plasma_fbs", + "@//:platform_shims", "@com_github_google_glog//:glog", - "@com_github_ray_project_ray//:platform_shims", ], ) @@ -198,7 +201,7 @@ cc_library( strip_include_prefix = "cpp/src", visibility = ["//visibility:public"], deps = [ - "@com_github_ray_project_ray//:platform_shims", + "@//:platform_shims", ], ) @@ -227,8 +230,8 @@ cc_library( deps = [ ":ae", ":plasma_client", + "@//:platform_shims", "@com_github_google_glog//:glog", - "@com_github_ray_project_ray//:platform_shims", ], ) @@ -237,8 +240,12 @@ cc_binary( srcs = [ "cpp/src/plasma/store.cc", ], + copts = COPTS, visibility = ["//visibility:public"], - deps = [":plasma_lib"], + deps = [ + ":plasma_lib", + "@//:platform_shims", + ], ) FLATC_ARGS = [ diff --git a/bazel/BUILD.redis b/bazel/BUILD.redis index f93473eea..6f84f7ab9 100644 --- a/bazel/BUILD.redis +++ b/bazel/BUILD.redis @@ -45,7 +45,14 @@ cc_library( "deps/hiredis/*.h", "deps/hiredis/adapters/*.h", ]), - copts = COPTS, + copts = COPTS + select({ + "@bazel_tools//src/conditions:windows": [ + "-D" + "WIN32_REPLACE_FD_APIS", + "/FI" + "win32fd.h", + ], + "//conditions:default": [ + ], + }), includes = ["deps/hiredis"], strip_include_prefix = "deps", deps = [ diff --git a/bazel/ray_deps_setup.bzl b/bazel/ray_deps_setup.bzl index ab359718d..3769e03df 100644 --- a/bazel/ray_deps_setup.bzl +++ b/bazel/ray_deps_setup.bzl @@ -78,14 +78,10 @@ def ray_deps_setup(): url = "https://github.com/antirez/redis/archive/5.0.3.tar.gz", sha256 = "7084e8bd9e5dedf2dbb2a1e1d862d0c46e66cc0872654bdc677f4470d28d84c5", patches = [ - "//thirdparty/patches:hiredis-casts.patch", "//thirdparty/patches:hiredis-connect-rename.patch", "//thirdparty/patches:hiredis-windows-sigpipe.patch", "//thirdparty/patches:hiredis-windows-sockets.patch", "//thirdparty/patches:hiredis-windows-strerror.patch", - "//thirdparty/patches:hiredis-windows-poll.patch", - "//thirdparty/patches:hiredis-windows-translations.patch", - "//thirdparty/patches:redis-windows-poll.patch", ], ) @@ -171,10 +167,11 @@ def ray_deps_setup(): patches = [ "//thirdparty/patches:arrow-headers-unused.patch", "//thirdparty/patches:arrow-windows-export.patch", - "//thirdparty/patches:arrow-windows-poll.patch", + "//thirdparty/patches:arrow-windows-nonstdc.patch", "//thirdparty/patches:arrow-windows-sigpipe.patch", "//thirdparty/patches:arrow-windows-socket.patch", "//thirdparty/patches:arrow-windows-dlmalloc.patch", + "//thirdparty/patches:arrow-windows-tcp.patch", ], ) diff --git a/python/ray/node.py b/python/ray/node.py index f1ea737de..06143bcf7 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -89,6 +89,7 @@ class Node: "workers/default_worker.py")) self._resource_spec = None + self._localhost = socket.gethostbyname("localhost") self._ray_params = ray_params self._redis_address = ray_params.redis_address self._config = ray_params._internal_config @@ -396,15 +397,21 @@ class Node: Args: socket_path (string): the socket file to prepare. """ - if socket_path is not None: - if os.path.exists(socket_path): - raise RuntimeError( - "Socket file {} exists!".format(socket_path)) - socket_dir = os.path.dirname(socket_path) - try_to_create_directory(socket_dir) - return socket_path - return self._make_inc_temp( - prefix=default_prefix, directory_name=self._sockets_dir) + result = socket_path + if sys.platform == "win32": + if socket_path is None: + result = "tcp://{}:{}".format(self._localhost, + self._get_unused_port()) + else: + if socket_path is None: + result = self._make_inc_temp( + prefix=default_prefix, directory_name=self._sockets_dir) + else: + if os.path.exists(socket_path): + raise RuntimeError( + "Socket file {} exists!".format(socket_path)) + try_to_create_directory(os.path.dirname(socket_path)) + return result def start_reaper_process(self): """ diff --git a/src/ray/gcs/asio.cc b/src/ray/gcs/asio.cc index 2ff3f82a4..f22165fd9 100644 --- a/src/ray/gcs/asio.cc +++ b/src/ray/gcs/asio.cc @@ -14,12 +14,12 @@ #include "asio.h" -#include "ray/util/logging.h" - #ifdef _WIN32 -#include +#include #endif +#include "ray/util/logging.h" + RedisAsioClient::RedisAsioClient(boost::asio::io_service &io_service, ray::gcs::RedisAsyncContext &redis_async_context) : redis_async_context_(redis_async_context), @@ -37,7 +37,7 @@ RedisAsioClient::RedisAsioClient(boost::asio::io_service &io_service, #ifdef _WIN32 SOCKET sock = SOCKET_ERROR; WSAPROTOCOL_INFO pi; - if (WSADuplicateSocket(_get_osfhandle(c->fd), GetCurrentProcessId(), &pi) == 0) { + if (WSADuplicateSocket(fh_get(c->fd), GetCurrentProcessId(), &pi) == 0) { DWORD flag = WSA_FLAG_OVERLAPPED; sock = WSASocket(pi.iAddressFamily, pi.iSocketType, pi.iProtocol, &pi, 0, flag); } diff --git a/src/ray/object_manager/object_store_notification_manager.cc b/src/ray/object_manager/object_store_notification_manager.cc index 9615f7453..d36abfba6 100644 --- a/src/ray/object_manager/object_store_notification_manager.cc +++ b/src/ray/object_manager/object_store_notification_manager.cc @@ -25,6 +25,10 @@ #include "ray/object_manager/object_store_notification_manager.h" #include "ray/util/util.h" +#ifdef _WIN32 +#include +#endif + namespace ray { ObjectStoreNotificationManager::ObjectStoreNotificationManager( @@ -38,10 +42,11 @@ ObjectStoreNotificationManager::ObjectStoreNotificationManager( exit_on_error_(exit_on_error) { RAY_ARROW_CHECK_OK(store_client_.Connect(store_socket_name.c_str(), "", 0, 300)); - int c_socket; // TODO(mehrdadn): This should be type SOCKET for Windows - RAY_ARROW_CHECK_OK(store_client_.Subscribe(&c_socket)); + int fd; + RAY_ARROW_CHECK_OK(store_client_.Subscribe(&fd)); boost::system::error_code ec; #ifdef _WIN32 + boost::asio::detail::socket_type c_socket = fh_release(fd); WSAPROTOCOL_INFO pi; size_t n = sizeof(pi); char *p = reinterpret_cast(&pi); @@ -63,7 +68,7 @@ ObjectStoreNotificationManager::ObjectStoreNotificationManager( } } #else - socket_.assign(local_stream_protocol(), c_socket, ec); + socket_.assign(local_stream_protocol(), fd, ec); #endif RAY_CHECK(!ec); NotificationWait(); diff --git a/src/ray/raylet/client_connection_test.cc b/src/ray/raylet/client_connection_test.cc index 75631d592..a6d1d18d4 100644 --- a/src/ray/raylet/client_connection_test.cc +++ b/src/ray/raylet/client_connection_test.cc @@ -37,7 +37,8 @@ class ClientConnectionTest : public ::testing::Test { #if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS) boost::asio::local::connect_pair(in_, out_); #else - int pair[2] = {}; // TODO(mehrdadn): This should be type SOCKET for Windows + boost::asio::detail::socket_type pair[2] = {boost::asio::detail::invalid_socket, + boost::asio::detail::invalid_socket}; RAY_CHECK(socketpair(AF_INET, SOCK_STREAM, 0, pair) == 0); in_.assign(local_stream_protocol::v4(), pair[0]); out_.assign(local_stream_protocol::v4(), pair[1]); diff --git a/src/shims/windows/msg.cc b/src/shims/windows/msg.cc index 08b8492ca..d63c89517 100644 --- a/src/shims/windows/msg.cc +++ b/src/shims/windows/msg.cc @@ -8,14 +8,14 @@ #pragma comment(lib, "IPHlpAPI.lib") -int socketpair(int domain, int type, int protocol, int sv[2]) { +int socketpair(int domain, int type, int protocol, SOCKET sv[2]) { if ((domain != AF_UNIX && domain != AF_INET) || type != SOCK_STREAM) { return -1; } SOCKET sockets[2]; int r = dumb_socketpair(sockets); - sv[0] = (int)sockets[0]; - sv[1] = (int)sockets[1]; + sv[0] = sockets[0]; + sv[1] = sockets[1]; return r; } @@ -63,7 +63,7 @@ static DWORD getsockpid(SOCKET client) { return pid; } -ssize_t sendmsg(int sockfd, struct msghdr *msg, int flags) { +ssize_t sendmsg(SOCKET sock, struct msghdr *msg, int flags) { ssize_t result = -1; struct cmsghdr *header = CMSG_FIRSTHDR(msg); if (header->cmsg_level == SOL_SOCKET && header->cmsg_type == SCM_RIGHTS) { @@ -76,11 +76,11 @@ ssize_t sendmsg(int sockfd, struct msghdr *msg, int flags) { * that the child process closes AND that its process ID is reassigned * to another existing process. */ - int *const pfd = (int *)CMSG_DATA(header); + SOCKET *const pfd = (SOCKET *)CMSG_DATA(header); WSAPROTOCOL_INFO protocol_info = {0}; /* assume socket if it's a pipe, until proven otherwise */ BOOL is_socket = GetFileType((HANDLE)(SOCKET)(*pfd)) == FILE_TYPE_PIPE; - DWORD const target_pid = getsockpid(sockfd); + DWORD const target_pid = getsockpid(sock); HANDLE target_process = NULL; if (target_pid) { if (is_socket) { @@ -107,7 +107,7 @@ ssize_t sendmsg(int sockfd, struct msghdr *msg, int flags) { bufs[0].len = sizeof(protocol_info); memcpy(&bufs[1], msg->lpBuffers, msg->dwBufferCount * sizeof(*msg->lpBuffers)); DWORD nb; - result = WSASend(sockfd, bufs, nbufs, &nb, flags, NULL, NULL) == 0 + result = WSASend(sock, bufs, nbufs, &nb, flags, NULL, NULL) == 0 ? (ssize_t)(nb - sizeof(protocol_info)) : -1; } @@ -126,7 +126,7 @@ ssize_t sendmsg(int sockfd, struct msghdr *msg, int flags) { return result; } -ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags) { +ssize_t recvmsg(SOCKET sock, struct msghdr *msg, int flags) { int result = -1; struct cmsghdr *header = CMSG_FIRSTHDR(msg); if (msg->msg_controllen && flags == 0 /* We can't send flags on Windows... */) { @@ -138,13 +138,13 @@ ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags) { memcpy(&bufs[1], msg->lpBuffers, msg->dwBufferCount * sizeof(*msg->lpBuffers)); DWORD nb; DWORD dwFlags = flags; - result = WSARecv(sockfd, bufs, nbufs, &nb, &dwFlags, NULL, NULL) == 0 + result = WSARecv(sock, bufs, nbufs, &nb, &dwFlags, NULL, NULL) == 0 ? (ssize_t)(nb - sizeof(protocol_info)) : -1; if (result != -1) { - int *const pfd = (int *)CMSG_DATA(header); + SOCKET *const pfd = (SOCKET *)CMSG_DATA(header); if (protocol_info.iSocketType == 0 && protocol_info.iProtocol == 0) { - *pfd = *(int *)&protocol_info; + *pfd = *(SOCKET *)&protocol_info; } else { *pfd = WSASocket(FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO, &protocol_info, 0, WSA_FLAG_OVERLAPPED); diff --git a/src/shims/windows/strings.h b/src/shims/windows/strings.h index 03c885a19..3cdc86bb6 100644 --- a/src/shims/windows/strings.h +++ b/src/shims/windows/strings.h @@ -3,10 +3,10 @@ #include -static int strcasecmp(const char *s1, const char *s2) { return stricmp(s1, s2); } +static int strcasecmp(const char *s1, const char *s2) { return _stricmp(s1, s2); } static int strncasecmp(const char *s1, const char *s2, size_t n) { - return strnicmp(s1, s2, n); + return _strnicmp(s1, s2, n); } #endif /* STRINGS_H */ diff --git a/src/shims/windows/sys/mman.h b/src/shims/windows/sys/mman.h deleted file mode 100644 index 5cf6ae8db..000000000 --- a/src/shims/windows/sys/mman.h +++ /dev/null @@ -1,39 +0,0 @@ -#ifndef MMAN_H -#define MMAN_H - -#include - -#define MAP_SHARED 0x0010 /* share changes */ -#define MAP_FAILED ((void *)-1) -#define PROT_READ 0x04 /* pages can be read */ -#define PROT_WRITE 0x02 /* pages can be written */ -#define PROT_EXEC 0x01 /* pages can be executed */ -#ifndef FILE_MAP_ALL_ACCESS -enum { FILE_MAP_ALL_ACCESS = 0xF001F }; -#endif -EXTERN_C WINBASEAPI void *WINAPI MapViewOfFile(HANDLE hFileMappingObject, - DWORD dwDesiredAccess, - DWORD dwFileOffsetHigh, - DWORD dwFileOffsetLow, - SIZE_T dwNumberOfBytesToMap); -EXTERN_C WINBASEAPI BOOL WINAPI UnmapViewOfFile(void const *lpBaseAddress); -static void *mmap(void *addr, size_t len, int prot, int flags, int fd, off_t off) { - void *result = (void *)(-1); - if (!addr && (flags & MAP_SHARED)) { - /* HACK: we're assuming handle sizes can't exceed 32 bits, which is wrong... - * but works for now. */ - void *ptr = MapViewOfFile((HANDLE)(intptr_t)fd, FILE_MAP_ALL_ACCESS, - (DWORD)(off >> (CHAR_BIT * sizeof(DWORD))), (DWORD)off, - (SIZE_T)len); - if (ptr) { - result = ptr; - } - } - return result; -} -static int munmap(void *addr, size_t length) { - (void)length; - return UnmapViewOfFile(addr) ? 0 : -1; -} - -#endif /* MMAN_H */ diff --git a/src/shims/windows/sys/socket.h b/src/shims/windows/sys/socket.h index ba2dea79b..d6a44722d 100644 --- a/src/shims/windows/sys/socket.h +++ b/src/shims/windows/sys/socket.h @@ -29,18 +29,8 @@ typedef unsigned short sa_family_t; #define msg_flags dwFlags int dumb_socketpair(SOCKET socks[2]); -ssize_t sendmsg(int sockfd, struct msghdr *msg, int flags); -ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags); -int socketpair(int domain, int type, int protocol, int sv[2]); - -#ifdef __cplusplus -namespace { -inline int send(SOCKET s, const void *buf, int len, int flags) { - // Call the const char* overload version - int (*psend)(SOCKET s, const char *buf, int len, int flags) = ::send; - return (*psend)(s, (const char *)buf, len, flags); -} -} // namespace -#endif +ssize_t sendmsg(SOCKET sockfd, struct msghdr *msg, int flags); +ssize_t recvmsg(SOCKET sockfd, struct msghdr *msg, int flags); +int socketpair(int domain, int type, int protocol, SOCKET sv[2]); #endif /* SOCKET_H */ diff --git a/src/shims/windows/win32fd.cc b/src/shims/windows/win32fd.cc new file mode 100644 index 000000000..726008d56 --- /dev/null +++ b/src/shims/windows/win32fd.cc @@ -0,0 +1,538 @@ +#undef WIN32_REPLACE_FD_APIS +#define WIN32_REPLACE_FD_APIS 0 // make sure we don't replace the APIs for our own calls +#include "win32fd.h" + +#include +#include +#include + +#ifndef _WINSOCKAPI_ +#include +#endif + +#ifndef __cplusplus +extern "C" { +#endif + +enum { fh_prefer_fallback = 1 }; +static const char fh_magic_prefix[] = "Win32FD"; +static const size_t fh_magic_size = sizeof(fh_magic_prefix) + sizeof(void *); + +static void fh_get_magic(unsigned char magic[fh_magic_size]) { + memcpy(magic, fh_magic_prefix, sizeof(fh_magic_prefix)); + const void *const suffix = GetModuleHandle(NULL); + memcpy(magic + sizeof(fh_magic_prefix), &suffix, sizeof(suffix)); +} + +struct fh_payload_t { + intptr_t handle; + unsigned char magic[fh_magic_size]; +}; + +static int _fh_close(intptr_t handle) { + int result = closesocket(handle); + if (result != 0) { + int error = WSAGetLastError(); + if (error == WSANOTINITIALISED || error == WSAENOTSOCK) { + if (CloseHandle(reinterpret_cast(handle))) { + result = 0; + } else { + _set_errno(EBADF); + } + } else { + _set_errno(EINVAL); + } + } + return result; +} + +static int _fh_is_socket(intptr_t handle) { + int result = 1; + SOCKADDR_STORAGE addr; + int addrlen = sizeof(addr); + if (getsockname(handle, reinterpret_cast(&addr), &addrlen) != 0) { + int error = WSAGetLastError(); + if (error == WSANOTINITIALISED || error == WSAENOTSOCK) { + result = 0; + } + } + return result; +} + +int fh_accept(int sockpfd, struct sockaddr *name, socklen_t *namelen) { + int result = -1; + intptr_t handle = fh_get(sockpfd); + intptr_t accepted = WSAAccept(handle, name, namelen, NULL, NULL); + if (accepted != -1) { + result = fh_open(accepted, -1); + if (result == -1) { + closesocket(accepted); + accepted = -1; + } + } else if (handle == -1) { + _set_errno(EBADF); + } else { + _set_errno(WSAGetLastError()); + } + return result; +} + +int fh_bind(int sockpfd, const struct sockaddr *name, socklen_t namelen) { + intptr_t handle = fh_get(sockpfd); + int result = bind(handle, name, namelen); + if (result == 0) { + } else if (handle == -1) { + _set_errno(EBADF); + } else { + _set_errno(WSAGetLastError()); + } + return result; +} + +int fh_close(int pfd) { + intptr_t handle = fh_get(pfd); + int result = _close(pfd); + if (result == 0 && handle != -1) { + result = _fh_close(handle); + } + return result; +} + +int fh_connect(int sockpfd, const struct sockaddr *name, socklen_t namelen) { + intptr_t handle = fh_get(sockpfd); + int result = WSAConnect(handle, name, namelen, NULL, NULL, NULL, NULL); + if (result == 0) { + } else if (handle == -1) { + _set_errno(EBADF); + } else { + int error = WSAGetLastError(); + switch (error) { + case WSAEINVAL: + case WSAEWOULDBLOCK: + case WSA_IO_PENDING: + error = WSAEINPROGRESS; // for Redis + break; + } + _set_errno(error); + } + return result; +} + +int fh_dup(int pfd) { + int result = -1; + intptr_t handle = fh_get(pfd); + if (fh_prefer_fallback && handle == -1) { + // Not our FD; fall back to default + result = _dup(pfd); + } else { + intptr_t duped = -1; + WSAPROTOCOL_INFO pi; + if (WSADuplicateSocket(handle, GetCurrentProcessId(), &pi) == 0) { + duped = WSASocket(pi.iAddressFamily, pi.iSocketType, pi.iProtocol, &pi, 0, 0); + if (duped == -1) { + _set_errno(EINVAL); + } + } else { + int error = WSAGetLastError(); + if (error == WSANOTINITIALISED || error == WSAENOTSOCK) { + if (!DuplicateHandle(GetCurrentProcess(), reinterpret_cast(handle), + GetCurrentProcess(), reinterpret_cast(&duped), 0, + FALSE, DUPLICATE_SAME_ACCESS)) { + duped = -1; + _set_errno(EBADF); + } + } else { + _set_errno(EBADF); + } + } + if (duped != -1) { + result = fh_open(duped, -1); + if (result == -1) { + _fh_close(duped); + } + } + } + return result; +} + +intptr_t fh_get(int pfd) { + fh_payload_t payload = {-1}; + HANDLE pipe = reinterpret_cast(_get_osfhandle(pfd)); + DWORD size = sizeof(payload), nbytes, avail, left; + if (pipe != INVALID_HANDLE_VALUE) { + if (GetFileType(pipe) != FILE_TYPE_PIPE) { + _set_errno(EBADF); // if this triggers, you passed an invalid or incompatible FD + } else if (PeekNamedPipe(pipe, &payload, size, &nbytes, &avail, &left)) { + if (avail != size) { + payload.handle = -1; + _set_errno(EIO); // if this triggers, you accidentally wrote to the FD directly + } else { + unsigned char expected[fh_magic_size]; + fh_get_magic(expected); + if (memcmp(payload.magic, expected, sizeof(expected)) != 0) { + payload.handle = -1; + _set_errno(EBADF); // if this triggers, this isn't a recognized FD + } + } + } else { + _set_errno(EIO); // if this triggers, you accidentally read from the FD directly + } + } else { + _set_errno(EBADF); + } + return payload.handle; +} + +int fh_getpeername(int sockpfd, struct sockaddr *name, socklen_t *namelen) { + intptr_t handle = fh_get(sockpfd); + int result = getpeername(handle, name, namelen); + if (result == 0) { + } else if (handle == -1) { + _set_errno(EBADF); + } else { + _set_errno(WSAGetLastError()); + } + return result; +} + +int fh_getsockname(int sockpfd, struct sockaddr *name, socklen_t *namelen) { + intptr_t handle = fh_get(sockpfd); + int result = getsockname(handle, name, namelen); + if (result == 0) { + } else if (handle == -1) { + _set_errno(EBADF); + } else { + _set_errno(WSAGetLastError()); + } + return result; +} + +int fh_getsockopt(int sockpfd, int level, int name, void *val, socklen_t *len) { + intptr_t handle = fh_get(sockpfd); + int result = getsockopt(handle, level, name, static_cast(val), len); + if (result == 0) { + } else if (handle == -1) { + _set_errno(EBADF); + } else { + _set_errno(WSAGetLastError()); + } + return result; +} + +int fh_ioctlsocket(int sockpfd, long cmd, unsigned long *argp) { + intptr_t handle = fh_get(sockpfd); + int result = ioctlsocket(handle, cmd, argp); + if (result == 0) { + } else if (handle == -1) { + _set_errno(EBADF); + } else { + _set_errno(WSAGetLastError()); + } + return result; +} + +int fh_listen(int sockpfd, int backlog) { + intptr_t handle = fh_get(sockpfd); + int result = listen(handle, backlog); + if (result == 0) { + } else if (handle == -1) { + _set_errno(EBADF); + } else { + _set_errno(WSAGetLastError()); + } + return result; +} + +int fh_open(intptr_t handle, int mode) { + int pfd = -1; + HANDLE r, w; + if (mode == -1) { + mode = _O_RDWR; + } + if (!(mode & _O_TEXT)) { + mode |= _O_BINARY; + } + if (handle == -1 || handle == 0) { + _set_errno(EBADF); + } else if (fh_prefer_fallback && + GetFileType(reinterpret_cast(handle)) != FILE_TYPE_UNKNOWN && + !_fh_is_socket(handle)) { + // Already compatible with CRT FDs, so just fall back + pfd = _open_osfhandle(handle, mode); + } else { + fh_payload_t payload = {handle}; + fh_get_magic(payload.magic); + DWORD size = sizeof(payload), nbytes; + if (CreatePipe(&r, &w, NULL, size)) { + if (WriteFile(w, &payload, size, &nbytes, NULL) && size == nbytes) { + pfd = _open_osfhandle(reinterpret_cast(r), mode); + if (pfd == -1) { + CloseHandle(r); + } + } else { + _set_errno(EIO); + } + CloseHandle(w); + } else { + _set_errno(EMFILE); + } + } + return pfd; +} + +int fh_poll(struct pollfd *fds, nfds_t nfds, int timeout) { + struct fd_sets_t { + fd_set rfds, wfds, efds; + } fdsets; + int maxfd = -1; + struct fd_sets_t *pfdsets = + nfds <= FD_SETSIZE ? &fdsets + : static_cast(operator new( + (offsetof(fd_set, fd_array) + nfds * sizeof(SOCKET)) * + (sizeof(struct fd_sets_t) / sizeof(fd_set)))); + FD_ZERO(&pfdsets->rfds); + FD_ZERO(&pfdsets->wfds); + FD_ZERO(&pfdsets->efds); + for (nfds_t i = 0; i < nfds; ++i) { + if (fds[i].events & POLLIN) { + FD_SET(fds[i].fd, &pfdsets->rfds); + } + if (fds[i].events & POLLOUT) { + FD_SET(fds[i].fd, &pfdsets->wfds); + } + if (fds[i].events & POLLERR) { + FD_SET(fds[i].fd, &pfdsets->efds); + } + if (maxfd < fds[i].fd) { + maxfd = static_cast(fds[i].fd); + } + } + int msec = timeout; + struct timeval tv = {msec >= 0 ? msec / 1000 : 0, msec >= 0 ? (msec % 1000) * 1000 : 0}; + int result = fh_select(maxfd + 1, &pfdsets->rfds, &pfdsets->wfds, &pfdsets->efds, + msec >= 0 ? &tv : NULL); + if (result >= 0) { + result = 0; + for (nfds_t i = 0; i < nfds; ++i) { + fds[i].revents = 0; + if (FD_ISSET(fds[i].fd, &pfdsets->rfds)) { + fds[i].revents |= POLLIN; + } + if (FD_ISSET(fds[i].fd, &pfdsets->wfds)) { + fds[i].revents |= POLLOUT; + } + if (FD_ISSET(fds[i].fd, &pfdsets->efds)) { + fds[i].revents |= POLLERR; + } + if (fds[i].revents) { + ++result; + } + } + } + if (pfdsets != &fdsets) { + operator delete(pfdsets); + } + return result; +} + +ssize_t fh_read(int pfd, void *buffer, size_t size) { + ssize_t result = -1; + intptr_t handle = fh_get(pfd); + if (size >= INT_MAX) { + size = INT_MAX; + } + if (fh_prefer_fallback && handle == -1) { + // Not our FD; fall back to default + result = _read(pfd, buffer, static_cast(size)); + } else { + WSABUF buf = {static_cast(size), static_cast(buffer)}; + DWORD nbytes; + DWORD flags = 0; + if (WSARecv(handle, &buf, 1, &nbytes, &flags, NULL, NULL) == 0) { + result = static_cast(nbytes); + } else { + int error = WSAGetLastError(); + if (error == WSANOTINITIALISED || error == WSAENOTSOCK) { + if (ReadFile(reinterpret_cast(handle), buffer, static_cast(size), + &nbytes, NULL)) { + result = static_cast(nbytes); + } else { + _set_errno(EINVAL); + } + } else { + if (error == WSAEWOULDBLOCK) { + error = EAGAIN; // for Redis + } + _set_errno(error); + } + } + } + return result; +} + +ssize_t fh_recv(int sockpfd, void *buffer, size_t size, int flags) { + ssize_t result = -1; + if (size >= INT_MAX) { + size = INT_MAX; + } + intptr_t handle = fh_get(sockpfd); + WSABUF buf = {static_cast(size), static_cast(buffer)}; + DWORD nbytes; + DWORD dwflags = static_cast(flags); + if (WSARecv(handle, &buf, 1, &nbytes, &dwflags, NULL, NULL) == 0) { + result = static_cast(nbytes); + } else if (handle == -1) { + _set_errno(EBADF); + } else { + _set_errno(WSAGetLastError()); + } + return result; +} + +intptr_t fh_release(int pfd) { + intptr_t handle = fh_get(pfd); + if (handle != -1) { + if (_close(pfd) != 0) { + handle = -1; + } + } + return handle; +} + +int fh_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *errorfds, + struct timeval *timeout) { + int result = 0; + fd_set rfds, wfds, efds; + struct { + fd_set *src, *dst; + } translations[] = { + {readfds, &rfds}, + {writefds, &wfds}, + {errorfds, &efds}, + }; + for (size_t i = 0; i < sizeof(translations) / sizeof(*translations); ++i) { + fd_set *src = translations[i].src, *dst = translations[i].dst; + if (src) { + for (size_t j = 0; j < src->fd_count; ++j) { + int sockpfd = static_cast(src->fd_array[j]); + intptr_t handle = fh_get(sockpfd); + if (handle == -1) { + result = -1; + _set_errno(EBADF); + } + dst->fd_array[j] = handle; + } + dst->fd_count = src->fd_count; + } + } + if (result != -1) { + result = select(nfds, readfds ? &rfds : NULL, writefds ? &wfds : NULL, + errorfds ? &efds : NULL, timeout); + if (result != -1) { + _set_errno(WSAGetLastError()); + } + } + if (result >= 0) { + for (size_t i = 0; i < sizeof(translations) / sizeof(*translations); ++i) { + fd_set *src = translations[i].src, *dst = translations[i].dst; + if (src) { + fd_set out; + FD_ZERO(&out); + for (size_t j = 0; j < src->fd_count; ++j) { + int sockpfd = static_cast(src->fd_array[j]); + intptr_t handle = fh_get(sockpfd); + if (FD_ISSET(handle, dst)) { + FD_SET(sockpfd, &out); + } + } + memcpy(src->fd_array, out.fd_array, out.fd_count * sizeof(*out.fd_array)); + src->fd_count = out.fd_count; + } + } + } + return result; +} + +ssize_t fh_send(int sockpfd, const void *buffer, size_t size, int flags) { + ssize_t result = -1; + if (size >= INT_MAX) { + size = INT_MAX; + } + intptr_t handle = fh_get(sockpfd); + WSABUF buf = {static_cast(size), + static_cast(const_cast(buffer))}; + DWORD nbytes; + if (WSASend(handle, &buf, 1, &nbytes, flags, NULL, NULL) == 0) { + result = static_cast(nbytes); + } else if (handle == -1) { + _set_errno(EBADF); + } else { + _set_errno(WSAGetLastError()); + } + return result; +} + +int fh_setsockopt(int sockpfd, int level, int name, const void *val, socklen_t len) { + intptr_t handle = fh_get(sockpfd); + int result = setsockopt(handle, level, name, static_cast(val), len); + if (result == 0) { + } else if (handle == -1) { + _set_errno(EBADF); + } else { + _set_errno(WSAGetLastError()); + } + return result; +} + +int fh_socket(int domain, int type, int protocol) { + int result = -1; + DWORD flags = WSA_FLAG_OVERLAPPED; // because socket() is overlapped by default + intptr_t handle = WSASocket(domain, type, protocol, NULL, 0, flags); + if (handle != -1) { + result = fh_open(handle, -1); + if (result == -1) { + closesocket(handle); + handle = -1; + } + } else { + _set_errno(WSAGetLastError()); + } + return result; +} + +ssize_t fh_write(int pfd, const void *buffer, size_t size) { + ssize_t result = -1; + ssize_t handle = fh_get(pfd); + if (size >= INT_MAX) { + size = INT_MAX; + } + if (fh_prefer_fallback && handle == -1) { + // Not our FD; fall back to default + result = _write(pfd, buffer, static_cast(size)); + } else { + WSABUF buf = {static_cast(size), + static_cast(const_cast(buffer))}; + DWORD nbytes; + DWORD flags = 0; + if (WSASend(handle, &buf, 1, &nbytes, flags, NULL, NULL) == 0) { + result = static_cast(nbytes); + } else { + int error = WSAGetLastError(); + if (error == WSANOTINITIALISED || error == WSAENOTSOCK) { + if (WriteFile(reinterpret_cast(handle), buffer, static_cast(size), + &nbytes, NULL)) { + result = static_cast(nbytes); + } else { + _set_errno(EINVAL); + } + } else { + _set_errno(WSAGetLastError()); + } + } + } + return result; +} + +#ifndef __cplusplus +} +#endif diff --git a/src/shims/windows/win32fd.h b/src/shims/windows/win32fd.h new file mode 100644 index 000000000..e643b16aa --- /dev/null +++ b/src/shims/windows/win32fd.h @@ -0,0 +1,100 @@ +#ifndef WIN32_FD_H +#define WIN32_FD_H + +#if defined(WIN32_REPLACE_FD_APIS) && WIN32_REPLACE_FD_APIS +#ifndef _CRT_DECLARE_NONSTDC_NAMES +#define _CRT_DECLARE_NONSTDC_NAMES 0 +#endif +#endif + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +#ifdef _WIN32 +#include +#undef ECONNRESET +#undef EINPROGRESS +#undef ETIMEDOUT + +#include +#include + +enum { + ECONNRESET = WSAECONNRESET, + EINPROGRESS = WSAEINPROGRESS, + ETIMEDOUT = WSAETIMEDOUT, +}; +typedef ptrdiff_t ssize_t; +typedef unsigned long int nfds_t; +#endif + +int fh_accept(int sockpfd, struct sockaddr *name, socklen_t *namelen); +int fh_bind(int sockpfd, const struct sockaddr *name, socklen_t namelen); +int fh_close(int pfd); +int fh_connect(int socket, const struct sockaddr *name, socklen_t namelen); +int fh_dup(int pfd); + +/// Retrieves the underlying file handle for the given pseudo-file descriptor. +intptr_t fh_get(int pfd); + +int fh_getpeername(int sockpfd, struct sockaddr *name, socklen_t *namelen); +int fh_getsockname(int sockpfd, struct sockaddr *name, socklen_t *namelen); +int fh_getsockopt(int sockpfd, int level, int name, void *val, socklen_t *len); +int fh_ioctlsocket(int sockpfd, long cmd, unsigned long *argp); +int fh_listen(int sockpfd, int backlog); + +/// Opens a pseudo-file descriptor around the given file handle. Sockets are supported. +/// However, this is only intended for storing a HANDLE as an integer. +/// The file descriptor must not be used with external file I/O functions. +int fh_open(intptr_t handle, int mode); + +int fh_poll(struct pollfd *fds, nfds_t nfds, int timeout); +ssize_t fh_read(int pfd, void *buffer, size_t size); + +/// Releases the underlying file handle for the given pseudo-file descriptor and then +/// closes the pseudo-file descriptor. +intptr_t fh_release(int pfd); + +ssize_t fh_recv(int sockpfd, void *buffer, size_t size, int flags); +int fh_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *errorfds, + struct timeval *timeout); +ssize_t fh_send(int sockpfd, const void *buffer, size_t size, int flags); +int fh_setsockopt(int sockpfd, int level, int name, const void *val, socklen_t len); +int fh_socket(int domain, int type, int protocol); +ssize_t fh_write(int pfd, const void *buffer, size_t size); + +#ifdef __cplusplus +} +#endif + +#if defined(WIN32_REPLACE_FD_APIS) && WIN32_REPLACE_FD_APIS +#define accept fh_accept +#define bind fh_bind +static int close(int pfd) { return fh_close(pfd); } +#define connect fh_connect +static int dup(int pfd) { return fh_dup(pfd); } +#define getsockopt fh_getsockopt +#define ioctlsocket fh_ioctlsocket +#define listen fh_listen +static int open(intptr_t handle, int mode) { return fh_open(handle, mode); } +#define poll fh_poll +static ssize_t read(int pfd, void *buffer, size_t size) { + return fh_read(pfd, buffer, size); +} +#define recv fh_recv +#define recvfrom fh_recvfrom +#define select fh_select +#define socket fh_socket +#define send fh_send +#define sendto fh_sendto +#define setsockopt fh_setsockopt +static ssize_t write(int pfd, const void *buffer, size_t size) { + return fh_write(pfd, buffer, size); +} +#endif + +#endif diff --git a/thirdparty/patches/arrow-windows-dlmalloc.patch b/thirdparty/patches/arrow-windows-dlmalloc.patch index e5d9b166f..ed4e07bde 100644 --- a/thirdparty/patches/arrow-windows-dlmalloc.patch +++ b/thirdparty/patches/arrow-windows-dlmalloc.patch @@ -1,7 +1,20 @@ +diff --git cpp/src/plasma/malloc.cc cpp/src/plasma/malloc.cc +--- cpp/src/plasma/malloc.cc ++++ cpp/src/plasma/malloc.cc +@@ -26,2 +26,0 @@ +-#include +-#include diff --git cpp/src/plasma/dlmalloc.cc cpp/src/plasma/dlmalloc.cc --- cpp/src/plasma/dlmalloc.cc +++ cpp/src/plasma/dlmalloc.cc -@@ -76,5 +76,8 @@ int create_buffer(int64_t size) { +@@ -25,2 +25,6 @@ ++#ifdef _WIN32 ++#include ++#else + #include + #include ++#endif +@@ -76,5 +80,8 @@ int create_buffer(int64_t size) { - if (!CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, - (DWORD)((uint64_t)size >> (CHAR_BIT * sizeof(DWORD))), - (DWORD)(uint64_t)size, NULL)) { @@ -9,8 +22,36 @@ diff --git cpp/src/plasma/dlmalloc.cc cpp/src/plasma/dlmalloc.cc + (DWORD)((uint64_t)size >> (CHAR_BIT * sizeof(DWORD))), + (DWORD)(uint64_t)size, NULL); + if (h) { -+ fd = reinterpret_cast(h); ++ fd = fh_open(reinterpret_cast(h), -1); + } else { fd = -1; } +@@ -119,9 +126,18 @@ ++ void* pointer; ++#ifdef _WIN32 ++ pointer = MapViewOfFile(reinterpret_cast(fh_get(fd)), FILE_MAP_ALL_ACCESS, 0, 0, size); ++ if (pointer == NULL) { ++ ARROW_LOG(ERROR) << "MapViewOfFile failed with error: " << GetLastError(); ++ return reinterpret_cast(-1); ++ } ++#else +- void* pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); ++ pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (pointer == MAP_FAILED) { + ARROW_LOG(ERROR) << "mmap failed with error: " << std::strerror(errno); + if (errno == ENOMEM && plasma_config->hugepages_enabled) { + ARROW_LOG(ERROR) + << " (this probably means you have to increase /proc/sys/vm/nr_hugepages)"; + } + return pointer; + } ++#endif +@@ -155,1 +169,6 @@ ++ int r; ++#ifdef _WIN32 ++ r = UnmapViewOfFile(addr) ? 0 : -1; ++#else +- int r = munmap(addr, size); ++ r = munmap(addr, size); ++#endif -- diff --git a/thirdparty/patches/arrow-windows-nonstdc.patch b/thirdparty/patches/arrow-windows-nonstdc.patch new file mode 100644 index 000000000..8ad88ed3c --- /dev/null +++ b/thirdparty/patches/arrow-windows-nonstdc.patch @@ -0,0 +1,17 @@ +diff --git cpp/src/arrow/io/mman.h cpp/src/arrow/io/mman.h +--- cpp/src/arrow/io/mman.h ++++ cpp/src/arrow/io/mman.h +@@ -3,0 +3,3 @@ ++#ifdef _WIN32 ++typedef long long off_t; ++#endif +diff --git cpp/src/plasma/protocol.cc cpp/src/plasma/protocol.cc +--- cpp/src/plasma/protocol.cc ++++ cpp/src/plasma/protocol.cc +@@ -760,1 +760,5 @@ ++#ifdef _WIN32 ++ *address = _strdup(message->address()->c_str()); ++#else + *address = strdup(message->address()->c_str()); ++#endif +-- diff --git a/thirdparty/patches/arrow-windows-poll.patch b/thirdparty/patches/arrow-windows-poll.patch deleted file mode 100644 index b7f0bdebc..000000000 --- a/thirdparty/patches/arrow-windows-poll.patch +++ /dev/null @@ -1,55 +0,0 @@ -diff --git cpp/src/plasma/thirdparty/ae/ae.c cpp/src/plasma/thirdparty/ae/ae.c ---- cpp/src/plasma/thirdparty/ae/ae.c -+++ cpp/src/plasma/thirdparty/ae/ae.c -@@ -428,20 +428,43 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) - /* Wait for milliseconds until the given file descriptor becomes - * writable/readable/exception */ - int aeWait(int fd, int mask, long long milliseconds) { -+ int retmask = 0, retval; -+ short revents = 0; -+#ifdef _WINSOCKAPI_ -+ fd_set rset, wset; -+ FD_ZERO(&rset); -+ FD_ZERO(&wset); -+ if (mask & AE_READABLE) { -+ FD_SET(fd, &rset); -+ } else if (mask & AE_WRITABLE) { -+ FD_SET(fd, &wset); -+ } -+ struct timeval tv = { milliseconds / 1000, (milliseconds % 1000) * 1000 }; -+ if ((retval = select(fd + 1, &rset, &wset, NULL, milliseconds >= 0 ? &tv : NULL)) > 0) { -+ if (FD_ISSET(fd, &rset)) { -+ revents |= POLLIN; -+ } -+ if (FD_ISSET(fd, &wset)) { -+ revents |= POLLOUT; -+ } -+ } -+#else - struct pollfd pfd; -- int retmask = 0, retval; - - memset(&pfd, 0, sizeof(pfd)); - pfd.fd = fd; - if (mask & AE_READABLE) pfd.events |= POLLIN; - if (mask & AE_WRITABLE) pfd.events |= POLLOUT; - -+ retval = poll(&pfd, 1, milliseconds); -+ revents = pfd.revents; -- if ((retval = poll(&pfd, 1, milliseconds))== 1) { -- if (pfd.revents & POLLIN) retmask |= AE_READABLE; -- if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE; -- if (pfd.revents & POLLERR) retmask |= AE_WRITABLE; -- if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE; -+#endif -+ if (retval== 1) { -+ if (revents & POLLIN) retmask |= AE_READABLE; -+ if (revents & POLLOUT) retmask |= AE_WRITABLE; -+ if (revents & POLLERR) retmask |= AE_WRITABLE; -+ if (revents & POLLHUP) retmask |= AE_WRITABLE; - return retmask; - } else { - return retval; - } - } --- diff --git a/thirdparty/patches/arrow-windows-socket.patch b/thirdparty/patches/arrow-windows-socket.patch index 26d00d6a1..e0e8079ba 100644 --- a/thirdparty/patches/arrow-windows-socket.patch +++ b/thirdparty/patches/arrow-windows-socket.patch @@ -1,7 +1,48 @@ diff --git cpp/src/plasma/client.cc cpp/src/plasma/client.cc --- cpp/src/plasma/client.cc +++ cpp/src/plasma/client.cc -@@ -985,3 +985,8 @@ Status PlasmaClient::Impl::Subscribe(int* fd) { +@@ -28,1 +28,5 @@ ++#ifdef _WIN32 ++#include ++#else + #include ++#endif +@@ -33,1 +37,3 @@ ++#ifndef _WIN32 + #include ++#endif +@@ -178,6 +184,14 @@ ++#ifdef _WIN32 ++ pointer_ = reinterpret_cast(MapViewOfFile(reinterpret_cast(fh_get(fd)), FILE_MAP_ALL_ACCESS, 0, 0, length_)); ++ // TODO(pcm): Don't fail here, instead return a Status. ++ if (pointer_ == NULL) { ++ ARROW_LOG(FATAL) << "mmap failed"; ++ } ++#else + pointer_ = reinterpret_cast( + mmap(NULL, length_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)); + // TODO(pcm): Don't fail here, instead return a Status. + if (pointer_ == MAP_FAILED) { + ARROW_LOG(FATAL) << "mmap failed"; + } ++#endif +@@ -189,1 +195,6 @@ ++ int r; ++#ifdef _WIN32 ++ r = UnmapViewOfFile(pointer_) ? 0 : -1; ++#else +- int r = munmap(pointer_, length_); ++ r = munmap(pointer_, length_); ++#endif +@@ -990,5 +1009,17 @@ ++#ifdef _WINSOCKAPI_ ++ SOCKET sockets[2] = { INVALID_SOCKET, INVALID_SOCKET }; ++ socketpair(AF_INET, SOCK_STREAM, 0, sockets); ++ sock[0] = fh_open(sockets[0], -1); ++ sock[1] = fh_open(sockets[1], -1); ++#else + socketpair(AF_UNIX, SOCK_STREAM, 0, sock); ++#endif // Make the socket non-blocking. +#ifdef _WINSOCKAPI_ + unsigned long value = 1; @@ -10,18 +51,109 @@ diff --git cpp/src/plasma/client.cc cpp/src/plasma/client.cc int flags = fcntl(sock[1], F_GETFL, 0); ARROW_CHECK(fcntl(sock[1], F_SETFL, flags | O_NONBLOCK) == 0); +#endif + // Tell the Plasma store about the subscription. diff --git cpp/src/plasma/fling.cc cpp/src/plasma/fling.cc --- cpp/src/plasma/fling.cc +++ cpp/src/plasma/fling.cc -@@ -18,6 +18,10 @@ - +@@ -19,7 +19,14 @@ #include "arrow/util/logging.h" +#ifdef _WIN32 +#include // socklen_t ++#else ++typedef int SOCKET; +#endif + void init_msg(struct msghdr* msg, struct iovec* iov, char* buf, size_t buf_len) { iov->iov_base = buf; iov->iov_len = 1; + ++ msg->msg_flags = 0; + msg->msg_iov = iov; +@@ -36,3 +43,8 @@ ++#ifdef _WIN32 ++ SOCKET to_send = fh_get(fd); ++#else ++ SOCKET to_send = fd; ++#endif +- char buf[CMSG_SPACE(sizeof(int))]; ++ char buf[CMSG_SPACE(sizeof(to_send))]; +- memset(&buf, 0, CMSG_SPACE(sizeof(int))); ++ memset(&buf, 0, sizeof(buf)); + +@@ -47,7 +59,12 @@ +- header->cmsg_len = CMSG_LEN(sizeof(int)); ++ header->cmsg_len = CMSG_LEN(sizeof(to_send)); +- memcpy(CMSG_DATA(header), reinterpret_cast(&fd), sizeof(int)); ++ memcpy(CMSG_DATA(header), reinterpret_cast(&to_send), sizeof(to_send)); + ++#ifdef _WIN32 ++ SOCKET sock = fh_get(conn); ++#else ++ SOCKET sock = conn; ++#endif + // Send file descriptor. + while (true) { +- ssize_t r = sendmsg(conn, &msg, 0); ++ ssize_t r = sendmsg(sock, &msg, 0); + if (r < 0) { +@@ -83,6 +100,11 @@ +- char buf[CMSG_SPACE(sizeof(int))]; ++ char buf[CMSG_SPACE(sizeof(SOCKET))]; + init_msg(&msg, &iov, buf, sizeof(buf)); + ++#ifdef _WIN32 ++ SOCKET sock = fh_get(conn); ++#else ++ int sock = conn; ++#endif + while (true) { +- ssize_t r = recvmsg(conn, &msg, 0); ++ ssize_t r = recvmsg(sock, &msg, 0); + if (r == -1) { +@@ -100,18 +122,22 @@ +- int found_fd = -1; ++ SOCKET found_fd = -1; + int oh_noes = 0; + for (struct cmsghdr* header = CMSG_FIRSTHDR(&msg); header != NULL; + header = CMSG_NXTHDR(&msg, header)) + if (header->cmsg_level == SOL_SOCKET && header->cmsg_type == SCM_RIGHTS) { + ssize_t count = (header->cmsg_len - + (CMSG_DATA(header) - reinterpret_cast(header))) / +- sizeof(int); ++ sizeof(SOCKET); + for (int i = 0; i < count; ++i) { +- int fd = (reinterpret_cast(CMSG_DATA(header)))[i]; ++ SOCKET fd = (reinterpret_cast(CMSG_DATA(header)))[i]; + if (found_fd == -1) { + found_fd = fd; + } else { ++#ifdef _WIN32 ++ closesocket(fd) == 0 || ((WSAGetLastError() == WSAENOTSOCK || WSAGetLastError() == WSANOTINITIALISED) && CloseHandle(reinterpret_cast(fd))); ++#else + close(fd); ++#endif + oh_noes = 1; + } + } + } +@@ -122,8 +148,17 @@ + if (oh_noes) { ++#ifdef _WIN32 ++ closesocket(found_fd) == 0 || ((WSAGetLastError() == WSAENOTSOCK || WSAGetLastError() == WSANOTINITIALISED) && CloseHandle(reinterpret_cast(found_fd))); ++#else + close(found_fd); ++#endif + errno = EBADMSG; + return -1; + } + +- return found_fd; ++#ifdef _WIN32 ++ int to_receive = fh_open(found_fd, -1); ++#else ++ int to_receive = found_fd; ++#endif ++ return to_receive; + } -- diff --git a/thirdparty/patches/arrow-windows-tcp.patch b/thirdparty/patches/arrow-windows-tcp.patch new file mode 100644 index 000000000..98fa49c96 --- /dev/null +++ b/thirdparty/patches/arrow-windows-tcp.patch @@ -0,0 +1,190 @@ +diff --git cpp/src/plasma/io.h cpp/src/plasma/io.h +--- cpp/src/plasma/io.h ++++ cpp/src/plasma/io.h +@@ -30,2 +30,3 @@ + #include "arrow/status.h" ++#include "plasma/common.h" + #include "plasma/compat.h" +@@ -57,3 +58,1 @@ +-int BindIpcSock(const std::string& pathname, bool shall_listen); +- +-int ConnectIpcSock(const std::string& pathname); ++int ConnectOrListenIpcSock(const std::string& pathname, bool shall_listen); +diff --git cpp/src/plasma/store.cc cpp/src/plasma/store.cc +--- cpp/src/plasma/store.cc ++++ cpp/src/plasma/store.cc +@@ -1150,1 +1150,1 @@ +- int socket = BindIpcSock(socket_name, true); ++ int socket = ConnectOrListenIpcSock(socket_name, true); +@@ -1301,1 +1301,5 @@ ++#ifdef _WINSOCKAPI_ ++ WSADATA wsadata; ++ WSAStartup(MAKEWORD(2, 2), &wsadata); ++#endif + plasma::StartServer(socket_name, plasma_directory, hugepages_enabled, external_store); +@@ -1307,1 +1311,4 @@ ++#ifdef _WINSOCKAPI_ ++ WSACleanup(); ++#endif + return 0; +diff --git cpp/src/plasma/io.cc cpp/src/plasma/io.cc +--- cpp/src/plasma/io.cc ++++ cpp/src/plasma/io.cc +@@ -29,1 +29,5 @@ ++#ifndef _WIN32 ++#include ++#include ++#endif + +@@ -117,39 +121,79 @@ +-int BindIpcSock(const std::string& pathname, bool shall_listen) { +- struct sockaddr_un socket_address; +- int socket_fd = socket(AF_UNIX, SOCK_STREAM, 0); ++int ConnectOrListenIpcSock(const std::string& pathname, bool shall_listen) { ++ union { ++ struct sockaddr addr; ++ struct sockaddr_un un; ++ struct sockaddr_in in; ++ } socket_address; ++ int addrlen; ++ memset(&socket_address, 0, sizeof(socket_address)); ++ if (pathname.find("tcp://") == 0) { ++ addrlen = sizeof(socket_address.in); ++ socket_address.in.sin_family = AF_INET; ++ std::string addr = pathname.substr(pathname.find('/') + 2); ++ size_t i = addr.rfind(':'), j; ++ if (i >= addr.size()) { ++ j = i = addr.size(); ++ } else { ++ j = i + 1; ++ } ++ socket_address.in.sin_addr.s_addr = inet_addr(addr.substr(0, i).c_str()); ++ socket_address.in.sin_port = htons(static_cast(atoi(addr.substr(j).c_str()))); ++ if (socket_address.in.sin_addr.s_addr == INADDR_NONE) { ++ ARROW_LOG(ERROR) << "Socket address is not a valid IPv4 address: " << pathname; ++ return -1; ++ } ++ if (socket_address.in.sin_port == htons(0)) { ++ ARROW_LOG(ERROR) << "Socket address is missing a valid port: " << pathname; ++ return -1; ++ } ++ } else { ++ addrlen = sizeof(socket_address.un); ++ socket_address.un.sun_family = AF_UNIX; ++ if (pathname.size() + 1 > sizeof(socket_address.un.sun_path)) { ++ ARROW_LOG(ERROR) << "Socket pathname is too long."; ++ return -1; ++ } ++ strncpy(socket_address.un.sun_path, pathname.c_str(), pathname.size() + 1); ++ } ++ ++ int socket_fd = socket(socket_address.addr.sa_family, SOCK_STREAM, 0); + if (socket_fd < 0) { + ARROW_LOG(ERROR) << "socket() failed for pathname " << pathname; + return -1; + } +- // Tell the system to allow the port to be reused. +- int on = 1; +- if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast(&on), +- sizeof(on)) < 0) { +- ARROW_LOG(ERROR) << "setsockopt failed for pathname " << pathname; +- close(socket_fd); +- return -1; +- } ++ if (shall_listen) { ++ // Tell the system to allow the port to be reused. ++ int on = 1; ++ if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast(&on), ++ sizeof(on)) < 0) { ++ ARROW_LOG(ERROR) << "setsockopt failed for pathname " << pathname; ++ close(socket_fd); ++ return -1; ++ } + +- unlink(pathname.c_str()); +- memset(&socket_address, 0, sizeof(socket_address)); +- socket_address.sun_family = AF_UNIX; +- if (pathname.size() + 1 > sizeof(socket_address.sun_path)) { +- ARROW_LOG(ERROR) << "Socket pathname is too long."; +- close(socket_fd); +- return -1; +- } +- strncpy(socket_address.sun_path, pathname.c_str(), pathname.size() + 1); ++ if (socket_address.addr.sa_family == AF_UNIX) { ++#ifdef _WIN32 ++ _unlink(pathname.c_str()); ++#else ++ unlink(pathname.c_str()); ++#endif ++ } ++ if (bind(socket_fd, &socket_address.addr, addrlen) != 0) { ++ ARROW_LOG(ERROR) << "Bind failed for pathname " << pathname; ++ close(socket_fd); ++ return -1; ++ } + +- if (bind(socket_fd, reinterpret_cast(&socket_address), +- sizeof(socket_address)) != 0) { +- ARROW_LOG(ERROR) << "Bind failed for pathname " << pathname; +- close(socket_fd); +- return -1; +- } +- if (shall_listen && listen(socket_fd, 128) == -1) { +- ARROW_LOG(ERROR) << "Could not listen to socket " << pathname; +- close(socket_fd); +- return -1; ++ if (listen(socket_fd, 128) == -1) { ++ ARROW_LOG(ERROR) << "Could not listen to socket " << pathname; ++ close(socket_fd); ++ return -1; ++ } ++ } else { ++ if (connect(socket_fd, &socket_address.addr, addrlen) != 0) { ++ close(socket_fd); ++ return -1; ++ } + } + return socket_fd; + } +@@ -166,9 +270,9 @@ +- *fd = ConnectIpcSock(pathname); ++ *fd = ConnectOrListenIpcSock(pathname, false); + while (*fd < 0 && num_retries > 0) { + ARROW_LOG(ERROR) << "Connection to IPC socket failed for pathname " << pathname + << ", retrying " << num_retries << " more times"; + // Sleep for timeout milliseconds. + usleep(static_cast(timeout * 1000)); +- *fd = ConnectIpcSock(pathname); ++ *fd = ConnectOrListenIpcSock(pathname, false); + --num_retries; + } +@@ -184,28 +228,0 @@ +-int ConnectIpcSock(const std::string& pathname) { +- struct sockaddr_un socket_address; +- int socket_fd; +- +- socket_fd = socket(AF_UNIX, SOCK_STREAM, 0); +- if (socket_fd < 0) { +- ARROW_LOG(ERROR) << "socket() failed for pathname " << pathname; +- return -1; +- } +- +- memset(&socket_address, 0, sizeof(socket_address)); +- socket_address.sun_family = AF_UNIX; +- if (pathname.size() + 1 > sizeof(socket_address.sun_path)) { +- ARROW_LOG(ERROR) << "Socket pathname is too long."; +- close(socket_fd); +- return -1; +- } +- strncpy(socket_address.sun_path, pathname.c_str(), pathname.size() + 1); +- +- if (connect(socket_fd, reinterpret_cast(&socket_address), +- sizeof(socket_address)) != 0) { +- close(socket_fd); +- return -1; +- } +- +- return socket_fd; +-} +- +-- diff --git a/thirdparty/patches/hiredis-casts.patch b/thirdparty/patches/hiredis-casts.patch deleted file mode 100644 index 6cd55251f..000000000 --- a/thirdparty/patches/hiredis-casts.patch +++ /dev/null @@ -1,34 +0,0 @@ -diff --git deps/hiredis/net.c deps/hiredis/net.c ---- deps/hiredis/net.c -+++ deps/hiredis/net.c -@@ -79,1 +79,1 @@ -- if (setsockopt(c->fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) { -+ if (setsockopt(c->fd, SOL_SOCKET, SO_REUSEADDR, (char const *)&on, sizeof(on)) == -1) { -@@ -131,1 +131,1 @@ -- if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) == -1){ -+ if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (char const *)&val, sizeof(val)) == -1){ -@@ -139,1 +139,1 @@ -- if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPALIVE, &val, sizeof(val)) < 0) { -+ if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPALIVE, (char const *)&val, sizeof(val)) < 0) { -@@ -146,1 +146,1 @@ -- if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val)) < 0) { -+ if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, (char const *)&val, sizeof(val)) < 0) { -@@ -153,1 +153,1 @@ -- if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &val, sizeof(val)) < 0) { -+ if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, (char const *)&val, sizeof(val)) < 0) { -@@ -159,1 +159,1 @@ -- if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &val, sizeof(val)) < 0) { -+ if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, (char const *)&val, sizeof(val)) < 0) { -@@ -171,1 +171,1 @@ -- if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)) == -1) { -+ if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY, (char const *)&yes, sizeof(yes)) == -1) { -@@ -240,1 +240,1 @@ -- if (getsockopt(c->fd, SOL_SOCKET, SO_ERROR, &err, &errlen) == -1) { -+ if (getsockopt(c->fd, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen) == -1) { -@@ -255,5 +255,5 @@ -- if (setsockopt(c->fd,SOL_SOCKET,SO_RCVTIMEO,&tv,sizeof(tv)) == -1) { -+ if (setsockopt(c->fd,SOL_SOCKET,SO_RCVTIMEO,(char const *)&tv,sizeof(tv)) == -1) { -@@ -259,1 +259,1 @@ -- if (setsockopt(c->fd,SOL_SOCKET,SO_SNDTIMEO,&tv,sizeof(tv)) == -1) { -+ if (setsockopt(c->fd,SOL_SOCKET,SO_SNDTIMEO,(char const *)&tv,sizeof(tv)) == -1) { --- diff --git a/thirdparty/patches/hiredis-windows-poll.patch b/thirdparty/patches/hiredis-windows-poll.patch deleted file mode 100644 index cee9a82ef..000000000 --- a/thirdparty/patches/hiredis-windows-poll.patch +++ /dev/null @@ -1,30 +0,0 @@ -diff --git deps/hiredis/net.c deps/hiredis/net.c ---- deps/hiredis/net.c -+++ deps/hiredis/net.c -@@ -204,11 +204,24 @@ - static int redisContextWaitReady(redisContext *c, long msec) { -+#ifdef _WINSOCKAPI_ -+ fd_set wset; -+ -+ FD_ZERO(&wset); -+ FD_SET(_get_osfhandle(c->fd), &wset); -+#else - struct pollfd wfd[1]; - - wfd[0].fd = c->fd; - wfd[0].events = POLLOUT; -+#endif - - if (errno == EINPROGRESS) { - int res; - -+#ifdef _WINSOCKAPI_ -+ struct timeval tv = { msec / 1000, (msec % 1000) * 1000 }; -+ res = select(_get_osfhandle(c->fd) + 1, NULL, &wset, NULL, msec >= 0 ? &tv : NULL); -+#else -+ res = poll(wfd, 1, msec); -+#endif -- if ((res = poll(wfd, 1, msec)) == -1) { -+ if (res == -1) { - __redisSetErrorFromErrno(c, REDIS_ERR_IO, "poll(2)"); --- diff --git a/thirdparty/patches/hiredis-windows-sockets.patch b/thirdparty/patches/hiredis-windows-sockets.patch index ce75b2f24..5fc463bcc 100644 --- a/thirdparty/patches/hiredis-windows-sockets.patch +++ b/thirdparty/patches/hiredis-windows-sockets.patch @@ -16,19 +16,10 @@ diff --git deps/hiredis/net.h deps/hiredis/net.h diff --git deps/hiredis/net.c deps/hiredis/net.c --- deps/hiredis/net.c +++ deps/hiredis/net.c -@@ -60,6 +60,10 @@ +@@ -60,1 +60,1 @@ -static void redisContextCloseFd(redisContext *c) { +void redisContextCloseFd(redisContext *c) { - if (c && c->fd >= 0) { -+#ifdef _WINSOCKAPI_ -+ closesocket(_get_osfhandle(c->fd)) == 0 || (WSAGetLastError() == WSAENOTSOCK && _close(c->fd) == 0); -+#else - close(c->fd); -+#endif - c->fd = -1; - } - } -@@ -102,24 +106,33 @@ +@@ -102,24 +102,33 @@ static int redisSetBlocking(redisContext *c, int blocking) { int flags; diff --git a/thirdparty/patches/hiredis-windows-translations.patch b/thirdparty/patches/hiredis-windows-translations.patch deleted file mode 100644 index c6722545d..000000000 --- a/thirdparty/patches/hiredis-windows-translations.patch +++ /dev/null @@ -1,102 +0,0 @@ -diff --git deps/hiredis/hiredis.c deps/hiredis/hiredis.c ---- deps/hiredis/hiredis.c -+++ deps/hiredis/hiredis.c -@@ -46,0 +46,37 @@ -+#ifdef _WINSOCKAPI_ -+static ptrdiff_t read_redis(int fildes, void *buf, size_t nbyte) { -+ WSABUF wsabuf = { (u_long)nbyte, (char *)buf }; -+ WSAOVERLAPPED overlapped = { 0 }; -+ SOCKET sock = _get_osfhandle(fildes); -+ int flags = 0; -+ ptrdiff_t r = WSARecv(sock, &wsabuf, 1, NULL, &flags, &overlapped, NULL); -+ if (r == SOCKET_ERROR && WSAGetLastError() == WSA_IO_PENDING) { -+ r = WSAWaitForMultipleEvents(1, &sock, FALSE, WSA_INFINITE, FALSE) == WSA_WAIT_EVENT_0 ? overlapped.InternalHigh : -1; -+ } else if (r != SOCKET_ERROR) { -+ r = overlapped.InternalHigh; -+ } -+ if (r == -1) { -+ switch (errno) { -+ case WSAEWOULDBLOCK: -+ WSASetLastError(EAGAIN); -+ break; -+ } -+ } -+ return r; -+} -+#define read read_redis -+static ptrdiff_t write_redis(int fildes, const void *buf, size_t nbyte) { -+ WSABUF wsabuf = { (u_long)nbyte, (char *)buf }; -+ WSAOVERLAPPED overlapped = { 0 }; -+ SOCKET sock = _get_osfhandle(fildes); -+ int flags = 0; -+ ptrdiff_t r = WSASend(sock, &wsabuf, 1, NULL, flags, &overlapped, NULL); -+ if (r == SOCKET_ERROR && WSAGetLastError() == WSA_IO_PENDING) { -+ r = WSAWaitForMultipleEvents(1, &sock, FALSE, WSA_INFINITE, FALSE) == WSA_WAIT_EVENT_0 ? overlapped.InternalHigh : -1; -+ } else if (r != SOCKET_ERROR) { -+ r = overlapped.InternalHigh; -+ } -+ return r; -+} -+#define write write_redis -+#endif -diff --git deps/hiredis/net.c deps/hiredis/net.c ---- deps/hiredis/net.c -+++ deps/hiredis/net.c -@@ -47,1 +47,45 @@ -+#ifdef _WINSOCKAPI_ -+#define errno WSAGetLastError() -+static int socket_redis(int af, int type, int protocol) { -+ int r; -+ SOCKET sock = WSASocket(af, type, protocol, NULL, 0, WSA_FLAG_OVERLAPPED); -+ if (sock != -1) { -+ r = _open_osfhandle(sock, _O_BINARY | _O_RDWR); -+ if (r == -1) { closesocket(sock); } -+ } else { -+ r = -1; -+ } -+ return r; -+} -+#define socket socket_redis -+static int connect_redis(int socketfd, const struct sockaddr *address, socklen_t address_len) { -+ int r = connect(_get_osfhandle(socketfd), address, address_len); -+ if (r == -1) { -+ switch (errno) { -+ case WSAEINVAL: -+ case WSAEWOULDBLOCK: -+ case WSA_IO_PENDING: -+ WSASetLastError(EINPROGRESS); -+ break; -+ } -+ } -+ return r; -+} -+#define connect connect_redis -+static int ioctlsocket_redis(int socketfd, long cmd, u_long* argp) { -+ int r = ioctlsocket(_get_osfhandle(socketfd), cmd, argp); -+ return r; -+} -+#define ioctlsocket ioctlsocket_redis -+static int getsockopt_redis(int socketfd, int level, int optname, char* optval, int* optlen) { -+ return getsockopt(_get_osfhandle(socketfd), level, optname, optval, optlen); -+} -+#define getsockopt getsockopt_redis -+static int setsockopt_redis(int socketfd, int level, int optname, const char* optval, int optlen) { -+ return setsockopt(_get_osfhandle(socketfd), level, optname, optval, optlen); -+} -+#define setsockopt setsockopt_redis -+#else - #include -+#endif -@@ -218,1 +258,5 @@ -+#ifdef _WINSOCKAPI_ -+ WSASetLastError(ETIMEDOUT); -+#else - errno = ETIMEDOUT; -+#endif -@@ -245,1 +285,5 @@ -+#ifdef _WINSOCKAPI_ -+ WSASetLastError(err); -+#else - errno = err; -+#endif --- diff --git a/thirdparty/patches/redis-windows-poll.patch b/thirdparty/patches/redis-windows-poll.patch deleted file mode 100644 index a6a369841..000000000 --- a/thirdparty/patches/redis-windows-poll.patch +++ /dev/null @@ -1,55 +0,0 @@ -diff --git src/ae.c src/ae.c ---- src/ae.c -+++ src/ae.c -@@ -474,21 +474,44 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) - /* Wait for milliseconds until the given file descriptor becomes - * writable/readable/exception */ - int aeWait(int fd, int mask, long long milliseconds) { -+ int retmask = 0, retval; -+ short revents = 0; -+#ifdef _WINSOCKAPI_ -+ fd_set rset, wset; -+ FD_ZERO(&rset); -+ FD_ZERO(&wset); -+ if (mask & AE_READABLE) { -+ FD_SET(fd, &rset); -+ } else if (mask & AE_WRITABLE) { -+ FD_SET(fd, &wset); -+ } -+ struct timeval tv = { milliseconds / 1000, (milliseconds % 1000) * 1000 }; -+ if ((retval = select(fd + 1, &rset, &wset, NULL, milliseconds >= 0 ? &tv : NULL)) > 0) { -+ if (FD_ISSET(fd, &rset)) { -+ revents |= POLLIN; -+ } -+ if (FD_ISSET(fd, &wset)) { -+ revents |= POLLOUT; -+ } -+ } -+#else - struct pollfd pfd; -- int retmask = 0, retval; - - memset(&pfd, 0, sizeof(pfd)); - pfd.fd = fd; - if (mask & AE_READABLE) pfd.events |= POLLIN; - if (mask & AE_WRITABLE) pfd.events |= POLLOUT; - -+ retval = poll(&pfd, 1, milliseconds); -+ revents = pfd.revents; -- if ((retval = poll(&pfd, 1, milliseconds))== 1) { -- if (pfd.revents & POLLIN) retmask |= AE_READABLE; -- if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE; -- if (pfd.revents & POLLERR) retmask |= AE_WRITABLE; -- if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE; -+#endif -+ if (retval== 1) { -+ if (revents & POLLIN) retmask |= AE_READABLE; -+ if (revents & POLLOUT) retmask |= AE_WRITABLE; -+ if (revents & POLLERR) retmask |= AE_WRITABLE; -+ if (revents & POLLHUP) retmask |= AE_WRITABLE; - return retmask; - } else { - return retval; - } - } ---