diff --git a/bazel/BUILD.arrow b/bazel/BUILD.arrow index 715e48d6e..444a3d64e 100644 --- a/bazel/BUILD.arrow +++ b/bazel/BUILD.arrow @@ -5,9 +5,9 @@ PROPAGATED_WINDOWS_DEFINES = ["ARROW_STATIC"] COPTS = [] + select({ "@bazel_tools//src/conditions:windows": [ - "-D" + "WIN32_REPLACE_FD_APIS", - "/FI" + "win32fd.h", - ] + ["-D" + define for define in PROPAGATED_WINDOWS_DEFINES], + "-D" + define + for define in PROPAGATED_WINDOWS_DEFINES + ], "//conditions:default": [ "-DARROW_USE_GLOG", ], @@ -91,7 +91,6 @@ cc_library( strip_include_prefix = "cpp/src", visibility = ["//visibility:public"], deps = [ - "@//:platform_shims", "@boost//:filesystem", "@com_github_google_glog//:glog", ], diff --git a/bazel/ray_deps_setup.bzl b/bazel/ray_deps_setup.bzl index d40f217f1..842fc5996 100644 --- a/bazel/ray_deps_setup.bzl +++ b/bazel/ray_deps_setup.bzl @@ -187,7 +187,6 @@ def ray_deps_setup(): sha256 = "2f0aaa50053792aa274b402f2530e63c1542085021cfef83beee9281412c12f6", patches = [ "//thirdparty/patches:arrow-windows-export.patch", - "//thirdparty/patches:arrow-windows-nonstdc.patch", ], ) diff --git a/src/ray/common/test/client_connection_test.cc b/src/ray/common/test/client_connection_test.cc index f1a22234b..431b651e4 100644 --- a/src/ray/common/test/client_connection_test.cc +++ b/src/ray/common/test/client_connection_test.cc @@ -20,11 +20,6 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" -#if !defined(BOOST_ASIO_HAS_LOCAL_SOCKETS) -#include -#include -#endif - #include "ray/common/client_connection.h" namespace ray { @@ -40,11 +35,12 @@ class ClientConnectionTest : public ::testing::Test { in_ = std::move(input); out_ = std::move(output); #else - boost::asio::detail::socket_type pair[2] = {boost::asio::detail::invalid_socket, - boost::asio::detail::invalid_socket}; - RAY_CHECK(socketpair(boost::asio::ip::tcp::v4().family(), SOCK_STREAM, 0, pair) == 0); - in_.assign(boost::asio::ip::tcp::v4(), pair[0]); - out_.assign(boost::asio::ip::tcp::v4(), pair[1]); + // Choose a free port. + auto endpoint = ParseUrlEndpoint("tcp://127.0.0.1:65437"); + boost::asio::basic_socket_acceptor acceptor(io_service_, + endpoint); + out_.connect(endpoint); + acceptor.accept(in_); #endif } diff --git a/src/ray/common/test_util.h b/src/ray/common/test_util.h index ffc721073..64b802bbb 100644 --- a/src/ray/common/test_util.h +++ b/src/ray/common/test_util.h @@ -14,8 +14,6 @@ #pragma once -#include - #include #include #include diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index b36912ded..951c5561c 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -21,6 +21,7 @@ #include "ray/core_worker/transport/direct_actor_transport.h" #include "ray/core_worker/transport/raylet_transport.h" #include "ray/gcs/gcs_client/service_based_gcs_client.h" +#include "ray/util/process.h" #include "ray/util/util.h" namespace { @@ -664,13 +665,13 @@ void CoreWorker::RegisterToGcs() { RAY_CHECK_OK(gcs_client_->Workers().AsyncAdd(worker_data, nullptr)); } + void CoreWorker::CheckForRayletFailure(const boost::system::error_code &error) { if (error == boost::asio::error::operation_aborted) { return; } - // If the raylet fails, we will be reassigned to init (PID=1). - if (getppid() == 1) { + if (!IsParentProcessAlive()) { RAY_LOG(ERROR) << "Raylet failed. Shutting down."; Shutdown(); } diff --git a/src/ray/gcs/gcs_client/service_based_gcs_client.cc b/src/ray/gcs/gcs_client/service_based_gcs_client.cc index e21cc863c..704a04120 100644 --- a/src/ray/gcs/gcs_client/service_based_gcs_client.cc +++ b/src/ray/gcs/gcs_client/service_based_gcs_client.cc @@ -13,7 +13,7 @@ // limitations under the License. #include "ray/gcs/gcs_client/service_based_gcs_client.h" -#include + #include "ray/common/ray_config.h" #include "ray/gcs/gcs_client/service_based_accessor.h" diff --git a/src/ray/gcs/redis_client.cc b/src/ray/gcs/redis_client.cc index 21a87a799..472ca4ea4 100644 --- a/src/ray/gcs/redis_client.cc +++ b/src/ray/gcs/redis_client.cc @@ -14,7 +14,6 @@ #include "ray/gcs/redis_client.h" -#include #include "ray/common/ray_config.h" #include "ray/gcs/redis_context.h" diff --git a/src/ray/gcs/redis_context.cc b/src/ray/gcs/redis_context.cc index 6ad799a26..ad8f2f542 100644 --- a/src/ray/gcs/redis_context.cc +++ b/src/ray/gcs/redis_context.cc @@ -14,8 +14,6 @@ #include "ray/gcs/redis_context.h" -#include - #include #include "ray/stats/stats.h" diff --git a/src/ray/gcs/redis_gcs_client.cc b/src/ray/gcs/redis_gcs_client.cc index 71a061323..6a8865a05 100644 --- a/src/ray/gcs/redis_gcs_client.cc +++ b/src/ray/gcs/redis_gcs_client.cc @@ -14,8 +14,6 @@ #include "ray/gcs/redis_gcs_client.h" -#include - #include "ray/common/ray_config.h" #include "ray/gcs/redis_accessor.h" #include "ray/gcs/redis_context.h" diff --git a/src/ray/object_manager/plasma/fling.cc b/src/ray/object_manager/plasma/fling.cc index 5e6d466ab..d3b683cf9 100644 --- a/src/ray/object_manager/plasma/fling.cc +++ b/src/ray/object_manager/plasma/fling.cc @@ -14,6 +14,10 @@ #include "ray/object_manager/plasma/fling.h" +#ifdef _WIN32 +#error "This file does not supposed to be compiled under windows." +#endif + #include #include diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc index 3062595b3..586d3782a 100644 --- a/src/ray/object_manager/test/object_manager_stress_test.cc +++ b/src/ray/object_manager/test/object_manager_stress_test.cc @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include - #include #include #include diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc index 62cb7e46f..8a91822dd 100644 --- a/src/ray/object_manager/test/object_manager_test.cc +++ b/src/ray/object_manager/test/object_manager_test.cc @@ -14,8 +14,6 @@ #include "ray/object_manager/object_manager.h" -#include - #include #include diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index 068f97f3a..cd65097f4 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -14,8 +14,6 @@ #pragma once -#include - #include "ray/common/network_util.h" #include "ray/rpc/grpc_client.h" #include "src/ray/protobuf/gcs_service.grpc.pb.h" diff --git a/src/ray/rpc/grpc_server.cc b/src/ray/rpc/grpc_server.cc index 4fc4a191b..cdf676ffb 100644 --- a/src/ray/rpc/grpc_server.cc +++ b/src/ray/rpc/grpc_server.cc @@ -15,7 +15,6 @@ #include "ray/rpc/grpc_server.h" #include -#include #include diff --git a/src/ray/util/process.cc b/src/ray/util/process.cc index d4ad72bfd..829020da3 100644 --- a/src/ray/util/process.cc +++ b/src/ray/util/process.cc @@ -15,6 +15,11 @@ #include "ray/util/process.h" #ifdef _WIN32 +#ifndef WIN32_LEAN_AND_MEAN +#define WIN32_LEAN_AND_MEAN 1 +#endif +#include +#include #include #else #include @@ -26,6 +31,7 @@ #endif #include +#include #include #include #include @@ -439,6 +445,65 @@ void Process::Kill() { } } +#ifdef _WIN32 +#ifndef STATUS_BUFFER_OVERFLOW +#define STATUS_BUFFER_OVERFLOW ((NTSTATUS)0x80000005L) +#endif +typedef LONG NTSTATUS; +typedef NTSTATUS WINAPI NtQueryInformationProcess_t(HANDLE ProcessHandle, + ULONG ProcessInformationClass, + PVOID ProcessInformation, + ULONG ProcessInformationLength, + ULONG *ReturnLength); + +static std::atomic NtQueryInformationProcess_ = + ATOMIC_VAR_INIT(NULL); + +pid_t GetParentPID() { + NtQueryInformationProcess_t *NtQueryInformationProcess = NtQueryInformationProcess_; + if (!NtQueryInformationProcess) { + NtQueryInformationProcess = reinterpret_cast( + GetProcAddress(GetModuleHandle(TEXT("ntdll.dll")), + _CRT_STRINGIZE(NtQueryInformationProcess))); + NtQueryInformationProcess_ = NtQueryInformationProcess; + } + DWORD ppid = 0; + PROCESS_BASIC_INFORMATION info; + ULONG cb = sizeof(info); + NTSTATUS status = NtQueryInformationProcess(GetCurrentProcess(), 0, &info, cb, &cb); + if ((status >= 0 || status == STATUS_BUFFER_OVERFLOW) && cb >= sizeof(info)) { + ppid = static_cast(reinterpret_cast(info.Reserved3)); + } + pid_t result = 0; + if (ppid > 0) { + // For now, assume PPID = 1 (simulating the reassignment to "init" on Linux) + result = 1; + if (HANDLE parent = OpenProcess(PROCESS_QUERY_INFORMATION, FALSE, ppid)) { + long long me_created, parent_created; + FILETIME unused; + if (GetProcessTimes(GetCurrentProcess(), reinterpret_cast(&me_created), + &unused, &unused, &unused) && + GetProcessTimes(parent, reinterpret_cast(&parent_created), &unused, + &unused, &unused)) { + if (me_created >= parent_created) { + // We verified the child is younger than the parent, so we know the parent + // is still alive. + // (Note that the parent can still die by the time this function returns, + // but that race condition exists on POSIX too, which we're emulating here.) + result = static_cast(ppid); + } + } + CloseHandle(parent); + } + } + return result; +} +#else +pid_t GetParentPID() { return getppid(); } +#endif // #ifdef _WIN32 + +bool IsParentProcessAlive() { return GetParentPID() != 1; } + } // namespace ray namespace std { diff --git a/src/ray/util/process.h b/src/ray/util/process.h index d75da3b55..5603baa65 100644 --- a/src/ray/util/process.h +++ b/src/ray/util/process.h @@ -83,6 +83,12 @@ class Process { int Wait() const; }; +// Get the Process ID of the parent process. If the parent process exits, the PID +// will be 1 (this simulates POSIX getppid()). +pid_t GetParentPID(); + +bool IsParentProcessAlive(); + } // namespace ray // We only define operators required by the standard library (==, hash): diff --git a/src/shims/windows/msg.cc b/src/shims/windows/msg.cc deleted file mode 100644 index 43d6937bb..000000000 --- a/src/shims/windows/msg.cc +++ /dev/null @@ -1,12 +0,0 @@ -#include - -int socketpair(int domain, int type, int protocol, SOCKET sv[2]) { - if ((domain != AF_UNIX && domain != AF_INET) || type != SOCK_STREAM) { - return -1; - } - SOCKET sockets[2]; - int r = dumb_socketpair(sockets); - sv[0] = sockets[0]; - sv[1] = sockets[1]; - return r; -} diff --git a/src/shims/windows/poll.h b/src/shims/windows/poll.h deleted file mode 100644 index f6b2d62ff..000000000 --- a/src/shims/windows/poll.h +++ /dev/null @@ -1,7 +0,0 @@ -#ifndef POLL_H -#define POLL_H - -typedef unsigned long int nfds_t; -int poll(struct pollfd fds[], nfds_t nfds, int timeout); - -#endif /* POLL_H */ diff --git a/src/shims/windows/socketpair.cc b/src/shims/windows/socketpair.cc deleted file mode 100644 index 4766212ce..000000000 --- a/src/shims/windows/socketpair.cc +++ /dev/null @@ -1,141 +0,0 @@ -/* socketpair.c -Copyright 2007, 2010 by Nathan C. Myers -Redistribution and use in source and binary forms, with or without modification, -are permitted provided that the following conditions are met: - - Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. - - Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - - The name of the author must not be used to endorse or promote products - derived from this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -/* Changes: - * 2014-02-12: merge David Woodhouse, Ger Hobbelt improvements - * git.infradead.org/users/dwmw2/openconnect.git/commitdiff/bdeefa54 - * github.com/GerHobbelt/selectable-socketpair - * always init the socks[] to -1/INVALID_SOCKET on error, both on Win32/64 - * and UNIX/other platforms - * 2013-07-18: Change to BSD 3-clause license - * 2010-03-31: - * set addr to 127.0.0.1 because win32 getsockname does not always set it. - * 2010-02-25: - * set SO_REUSEADDR option to avoid leaking some windows resource. - * Windows System Error 10049, "Event ID 4226 TCP/IP has reached - * the security limit imposed on the number of concurrent TCP connect - * attempts." Bleah. - * 2007-04-25: - * preserve value of WSAGetLastError() on all error returns. - * 2007-04-22: (Thanks to Matthew Gregan ) - * s/EINVAL/WSAEINVAL/ fix trivial compile failure - * s/socket/WSASocket/ enable creation of sockets suitable as stdin/stdout - * of a child process. - * add argument make_overlapped - */ - -#include - -#ifdef _WIN32 -#ifndef _WINSOCKAPI_ -#include -#endif -#include -#include -#include /* socklen_t, et al (MSVC20xx) */ -#else -#include -#include -#include -#endif - -#ifdef WIN32 - -/* dumb_socketpair: - * If make_overlapped is nonzero, both sockets created will be usable for - * "overlapped" operations via WSASend etc. If make_overlapped is zero, - * socks[0] (only) will be usable with regular ReadFile etc., and thus - * suitable for use as stdin or stdout of a child process. Note that the - * sockets must be closed with closesocket() regardless. - */ - -int dumb_socketpair(SOCKET socks[2]) { - union { - struct sockaddr_in inaddr; - struct sockaddr addr; - } a; - SOCKET listener; - int e; - socklen_t addrlen = sizeof(a.inaddr); - int reuse = 1; - - if (socks == 0) { - return SOCKET_ERROR; - } - socks[0] = socks[1] = -1; - - listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); - if (listener == -1) return SOCKET_ERROR; - - memset(&a, 0, sizeof(a)); - a.inaddr.sin_family = AF_INET; - a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - a.inaddr.sin_port = 0; - - for (;;) { - if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, - (socklen_t)sizeof(reuse)) == -1) - break; - if (bind(listener, &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR) break; - - memset(&a, 0, sizeof(a)); - if (getsockname(listener, &a.addr, &addrlen) == SOCKET_ERROR) break; - // win32 getsockname may only set the port number, p=0.0005. - // ( http://msdn.microsoft.com/library/ms738543.aspx ): - a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - a.inaddr.sin_family = AF_INET; - - if (listen(listener, 1) == SOCKET_ERROR) break; - - socks[0] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); - if (socks[0] == -1) break; - if (connect(socks[0], &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR) break; - - socks[1] = accept(listener, NULL, NULL); - if (socks[1] == -1) break; - - closesocket(listener); - return 0; - } - - closesocket(listener); - closesocket(socks[0]); - closesocket(socks[1]); - socks[0] = socks[1] = -1; - return SOCKET_ERROR; -} -#else -int dumb_socketpair(int socks[2], int dummy) { - if (socks == 0) { - errno = EINVAL; - return -1; - } - dummy = socketpair(AF_LOCAL, SOCK_STREAM, 0, socks); - if (dummy) socks[0] = socks[1] = -1; - return dummy; -} -#endif diff --git a/src/shims/windows/sys/socket.h b/src/shims/windows/sys/socket.h deleted file mode 100644 index 1d500f49d..000000000 --- a/src/shims/windows/sys/socket.h +++ /dev/null @@ -1,10 +0,0 @@ -#ifndef SOCKET_H -#define SOCKET_H - -#include -#include // ssize_t - -int dumb_socketpair(SOCKET socks[2]); -int socketpair(int domain, int type, int protocol, SOCKET sv[2]); - -#endif /* SOCKET_H */ diff --git a/src/shims/windows/unistd.cc b/src/shims/windows/unistd.cc deleted file mode 100644 index b61ebf2c7..000000000 --- a/src/shims/windows/unistd.cc +++ /dev/null @@ -1,64 +0,0 @@ -#include - -#include - -#ifndef WIN32_LEAN_AND_MEAN -#define WIN32_LEAN_AND_MEAN 1 -#endif -#include -#include - -#ifndef STATUS_BUFFER_OVERFLOW -#define STATUS_BUFFER_OVERFLOW ((NTSTATUS)0x80000005L) -#endif - -typedef LONG NTSTATUS; -typedef NTSTATUS WINAPI NtQueryInformationProcess_t(HANDLE ProcessHandle, - ULONG ProcessInformationClass, - PVOID ProcessInformation, - ULONG ProcessInformationLength, - ULONG *ReturnLength); - -static std::atomic NtQueryInformationProcess_ = - ATOMIC_VAR_INIT(NULL); - -typedef int pid_t; -int getppid() { - NtQueryInformationProcess_t *NtQueryInformationProcess = ::NtQueryInformationProcess_; - if (!NtQueryInformationProcess) { - NtQueryInformationProcess = reinterpret_cast( - GetProcAddress(GetModuleHandle(TEXT("ntdll.dll")), - _CRT_STRINGIZE(NtQueryInformationProcess))); - ::NtQueryInformationProcess_ = NtQueryInformationProcess; - } - DWORD ppid = 0; - PROCESS_BASIC_INFORMATION info; - ULONG cb = sizeof(info); - NTSTATUS status = NtQueryInformationProcess(GetCurrentProcess(), 0, &info, cb, &cb); - if ((status >= 0 || status == STATUS_BUFFER_OVERFLOW) && cb >= sizeof(info)) { - ppid = static_cast(reinterpret_cast(info.Reserved3)); - } - pid_t result = 0; - if (ppid > 0) { - // For now, assume PPID = 1 (simulating the reassignment to "init" on Linux) - result = 1; - if (HANDLE parent = OpenProcess(PROCESS_QUERY_INFORMATION, FALSE, ppid)) { - long long me_created, parent_created; - FILETIME unused; - if (GetProcessTimes(GetCurrentProcess(), reinterpret_cast(&me_created), - &unused, &unused, &unused) && - GetProcessTimes(parent, reinterpret_cast(&parent_created), &unused, - &unused, &unused)) { - if (me_created >= parent_created) { - // We verified the child is younger than the parent, so we know the parent - // is still alive. - // (Note that the parent can still die by the time this function returns, - // but that race condition exists on POSIX too, which we're emulating here.) - result = static_cast(ppid); - } - } - CloseHandle(parent); - } - } - return result; -} diff --git a/src/shims/windows/unistd.h b/src/shims/windows/unistd.h deleted file mode 100644 index 3cf5e4f4f..000000000 --- a/src/shims/windows/unistd.h +++ /dev/null @@ -1,12 +0,0 @@ -#ifndef UNISTD_H -#define UNISTD_H - -#ifdef _WIN64 -typedef long long ssize_t; -#else -typedef int ssize_t; -#endif - -int getppid(); - -#endif /* UNISTD_H */ diff --git a/src/shims/windows/win32fd.cc b/src/shims/windows/win32fd.cc deleted file mode 100644 index 726008d56..000000000 --- a/src/shims/windows/win32fd.cc +++ /dev/null @@ -1,538 +0,0 @@ -#undef WIN32_REPLACE_FD_APIS -#define WIN32_REPLACE_FD_APIS 0 // make sure we don't replace the APIs for our own calls -#include "win32fd.h" - -#include -#include -#include - -#ifndef _WINSOCKAPI_ -#include -#endif - -#ifndef __cplusplus -extern "C" { -#endif - -enum { fh_prefer_fallback = 1 }; -static const char fh_magic_prefix[] = "Win32FD"; -static const size_t fh_magic_size = sizeof(fh_magic_prefix) + sizeof(void *); - -static void fh_get_magic(unsigned char magic[fh_magic_size]) { - memcpy(magic, fh_magic_prefix, sizeof(fh_magic_prefix)); - const void *const suffix = GetModuleHandle(NULL); - memcpy(magic + sizeof(fh_magic_prefix), &suffix, sizeof(suffix)); -} - -struct fh_payload_t { - intptr_t handle; - unsigned char magic[fh_magic_size]; -}; - -static int _fh_close(intptr_t handle) { - int result = closesocket(handle); - if (result != 0) { - int error = WSAGetLastError(); - if (error == WSANOTINITIALISED || error == WSAENOTSOCK) { - if (CloseHandle(reinterpret_cast(handle))) { - result = 0; - } else { - _set_errno(EBADF); - } - } else { - _set_errno(EINVAL); - } - } - return result; -} - -static int _fh_is_socket(intptr_t handle) { - int result = 1; - SOCKADDR_STORAGE addr; - int addrlen = sizeof(addr); - if (getsockname(handle, reinterpret_cast(&addr), &addrlen) != 0) { - int error = WSAGetLastError(); - if (error == WSANOTINITIALISED || error == WSAENOTSOCK) { - result = 0; - } - } - return result; -} - -int fh_accept(int sockpfd, struct sockaddr *name, socklen_t *namelen) { - int result = -1; - intptr_t handle = fh_get(sockpfd); - intptr_t accepted = WSAAccept(handle, name, namelen, NULL, NULL); - if (accepted != -1) { - result = fh_open(accepted, -1); - if (result == -1) { - closesocket(accepted); - accepted = -1; - } - } else if (handle == -1) { - _set_errno(EBADF); - } else { - _set_errno(WSAGetLastError()); - } - return result; -} - -int fh_bind(int sockpfd, const struct sockaddr *name, socklen_t namelen) { - intptr_t handle = fh_get(sockpfd); - int result = bind(handle, name, namelen); - if (result == 0) { - } else if (handle == -1) { - _set_errno(EBADF); - } else { - _set_errno(WSAGetLastError()); - } - return result; -} - -int fh_close(int pfd) { - intptr_t handle = fh_get(pfd); - int result = _close(pfd); - if (result == 0 && handle != -1) { - result = _fh_close(handle); - } - return result; -} - -int fh_connect(int sockpfd, const struct sockaddr *name, socklen_t namelen) { - intptr_t handle = fh_get(sockpfd); - int result = WSAConnect(handle, name, namelen, NULL, NULL, NULL, NULL); - if (result == 0) { - } else if (handle == -1) { - _set_errno(EBADF); - } else { - int error = WSAGetLastError(); - switch (error) { - case WSAEINVAL: - case WSAEWOULDBLOCK: - case WSA_IO_PENDING: - error = WSAEINPROGRESS; // for Redis - break; - } - _set_errno(error); - } - return result; -} - -int fh_dup(int pfd) { - int result = -1; - intptr_t handle = fh_get(pfd); - if (fh_prefer_fallback && handle == -1) { - // Not our FD; fall back to default - result = _dup(pfd); - } else { - intptr_t duped = -1; - WSAPROTOCOL_INFO pi; - if (WSADuplicateSocket(handle, GetCurrentProcessId(), &pi) == 0) { - duped = WSASocket(pi.iAddressFamily, pi.iSocketType, pi.iProtocol, &pi, 0, 0); - if (duped == -1) { - _set_errno(EINVAL); - } - } else { - int error = WSAGetLastError(); - if (error == WSANOTINITIALISED || error == WSAENOTSOCK) { - if (!DuplicateHandle(GetCurrentProcess(), reinterpret_cast(handle), - GetCurrentProcess(), reinterpret_cast(&duped), 0, - FALSE, DUPLICATE_SAME_ACCESS)) { - duped = -1; - _set_errno(EBADF); - } - } else { - _set_errno(EBADF); - } - } - if (duped != -1) { - result = fh_open(duped, -1); - if (result == -1) { - _fh_close(duped); - } - } - } - return result; -} - -intptr_t fh_get(int pfd) { - fh_payload_t payload = {-1}; - HANDLE pipe = reinterpret_cast(_get_osfhandle(pfd)); - DWORD size = sizeof(payload), nbytes, avail, left; - if (pipe != INVALID_HANDLE_VALUE) { - if (GetFileType(pipe) != FILE_TYPE_PIPE) { - _set_errno(EBADF); // if this triggers, you passed an invalid or incompatible FD - } else if (PeekNamedPipe(pipe, &payload, size, &nbytes, &avail, &left)) { - if (avail != size) { - payload.handle = -1; - _set_errno(EIO); // if this triggers, you accidentally wrote to the FD directly - } else { - unsigned char expected[fh_magic_size]; - fh_get_magic(expected); - if (memcmp(payload.magic, expected, sizeof(expected)) != 0) { - payload.handle = -1; - _set_errno(EBADF); // if this triggers, this isn't a recognized FD - } - } - } else { - _set_errno(EIO); // if this triggers, you accidentally read from the FD directly - } - } else { - _set_errno(EBADF); - } - return payload.handle; -} - -int fh_getpeername(int sockpfd, struct sockaddr *name, socklen_t *namelen) { - intptr_t handle = fh_get(sockpfd); - int result = getpeername(handle, name, namelen); - if (result == 0) { - } else if (handle == -1) { - _set_errno(EBADF); - } else { - _set_errno(WSAGetLastError()); - } - return result; -} - -int fh_getsockname(int sockpfd, struct sockaddr *name, socklen_t *namelen) { - intptr_t handle = fh_get(sockpfd); - int result = getsockname(handle, name, namelen); - if (result == 0) { - } else if (handle == -1) { - _set_errno(EBADF); - } else { - _set_errno(WSAGetLastError()); - } - return result; -} - -int fh_getsockopt(int sockpfd, int level, int name, void *val, socklen_t *len) { - intptr_t handle = fh_get(sockpfd); - int result = getsockopt(handle, level, name, static_cast(val), len); - if (result == 0) { - } else if (handle == -1) { - _set_errno(EBADF); - } else { - _set_errno(WSAGetLastError()); - } - return result; -} - -int fh_ioctlsocket(int sockpfd, long cmd, unsigned long *argp) { - intptr_t handle = fh_get(sockpfd); - int result = ioctlsocket(handle, cmd, argp); - if (result == 0) { - } else if (handle == -1) { - _set_errno(EBADF); - } else { - _set_errno(WSAGetLastError()); - } - return result; -} - -int fh_listen(int sockpfd, int backlog) { - intptr_t handle = fh_get(sockpfd); - int result = listen(handle, backlog); - if (result == 0) { - } else if (handle == -1) { - _set_errno(EBADF); - } else { - _set_errno(WSAGetLastError()); - } - return result; -} - -int fh_open(intptr_t handle, int mode) { - int pfd = -1; - HANDLE r, w; - if (mode == -1) { - mode = _O_RDWR; - } - if (!(mode & _O_TEXT)) { - mode |= _O_BINARY; - } - if (handle == -1 || handle == 0) { - _set_errno(EBADF); - } else if (fh_prefer_fallback && - GetFileType(reinterpret_cast(handle)) != FILE_TYPE_UNKNOWN && - !_fh_is_socket(handle)) { - // Already compatible with CRT FDs, so just fall back - pfd = _open_osfhandle(handle, mode); - } else { - fh_payload_t payload = {handle}; - fh_get_magic(payload.magic); - DWORD size = sizeof(payload), nbytes; - if (CreatePipe(&r, &w, NULL, size)) { - if (WriteFile(w, &payload, size, &nbytes, NULL) && size == nbytes) { - pfd = _open_osfhandle(reinterpret_cast(r), mode); - if (pfd == -1) { - CloseHandle(r); - } - } else { - _set_errno(EIO); - } - CloseHandle(w); - } else { - _set_errno(EMFILE); - } - } - return pfd; -} - -int fh_poll(struct pollfd *fds, nfds_t nfds, int timeout) { - struct fd_sets_t { - fd_set rfds, wfds, efds; - } fdsets; - int maxfd = -1; - struct fd_sets_t *pfdsets = - nfds <= FD_SETSIZE ? &fdsets - : static_cast(operator new( - (offsetof(fd_set, fd_array) + nfds * sizeof(SOCKET)) * - (sizeof(struct fd_sets_t) / sizeof(fd_set)))); - FD_ZERO(&pfdsets->rfds); - FD_ZERO(&pfdsets->wfds); - FD_ZERO(&pfdsets->efds); - for (nfds_t i = 0; i < nfds; ++i) { - if (fds[i].events & POLLIN) { - FD_SET(fds[i].fd, &pfdsets->rfds); - } - if (fds[i].events & POLLOUT) { - FD_SET(fds[i].fd, &pfdsets->wfds); - } - if (fds[i].events & POLLERR) { - FD_SET(fds[i].fd, &pfdsets->efds); - } - if (maxfd < fds[i].fd) { - maxfd = static_cast(fds[i].fd); - } - } - int msec = timeout; - struct timeval tv = {msec >= 0 ? msec / 1000 : 0, msec >= 0 ? (msec % 1000) * 1000 : 0}; - int result = fh_select(maxfd + 1, &pfdsets->rfds, &pfdsets->wfds, &pfdsets->efds, - msec >= 0 ? &tv : NULL); - if (result >= 0) { - result = 0; - for (nfds_t i = 0; i < nfds; ++i) { - fds[i].revents = 0; - if (FD_ISSET(fds[i].fd, &pfdsets->rfds)) { - fds[i].revents |= POLLIN; - } - if (FD_ISSET(fds[i].fd, &pfdsets->wfds)) { - fds[i].revents |= POLLOUT; - } - if (FD_ISSET(fds[i].fd, &pfdsets->efds)) { - fds[i].revents |= POLLERR; - } - if (fds[i].revents) { - ++result; - } - } - } - if (pfdsets != &fdsets) { - operator delete(pfdsets); - } - return result; -} - -ssize_t fh_read(int pfd, void *buffer, size_t size) { - ssize_t result = -1; - intptr_t handle = fh_get(pfd); - if (size >= INT_MAX) { - size = INT_MAX; - } - if (fh_prefer_fallback && handle == -1) { - // Not our FD; fall back to default - result = _read(pfd, buffer, static_cast(size)); - } else { - WSABUF buf = {static_cast(size), static_cast(buffer)}; - DWORD nbytes; - DWORD flags = 0; - if (WSARecv(handle, &buf, 1, &nbytes, &flags, NULL, NULL) == 0) { - result = static_cast(nbytes); - } else { - int error = WSAGetLastError(); - if (error == WSANOTINITIALISED || error == WSAENOTSOCK) { - if (ReadFile(reinterpret_cast(handle), buffer, static_cast(size), - &nbytes, NULL)) { - result = static_cast(nbytes); - } else { - _set_errno(EINVAL); - } - } else { - if (error == WSAEWOULDBLOCK) { - error = EAGAIN; // for Redis - } - _set_errno(error); - } - } - } - return result; -} - -ssize_t fh_recv(int sockpfd, void *buffer, size_t size, int flags) { - ssize_t result = -1; - if (size >= INT_MAX) { - size = INT_MAX; - } - intptr_t handle = fh_get(sockpfd); - WSABUF buf = {static_cast(size), static_cast(buffer)}; - DWORD nbytes; - DWORD dwflags = static_cast(flags); - if (WSARecv(handle, &buf, 1, &nbytes, &dwflags, NULL, NULL) == 0) { - result = static_cast(nbytes); - } else if (handle == -1) { - _set_errno(EBADF); - } else { - _set_errno(WSAGetLastError()); - } - return result; -} - -intptr_t fh_release(int pfd) { - intptr_t handle = fh_get(pfd); - if (handle != -1) { - if (_close(pfd) != 0) { - handle = -1; - } - } - return handle; -} - -int fh_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *errorfds, - struct timeval *timeout) { - int result = 0; - fd_set rfds, wfds, efds; - struct { - fd_set *src, *dst; - } translations[] = { - {readfds, &rfds}, - {writefds, &wfds}, - {errorfds, &efds}, - }; - for (size_t i = 0; i < sizeof(translations) / sizeof(*translations); ++i) { - fd_set *src = translations[i].src, *dst = translations[i].dst; - if (src) { - for (size_t j = 0; j < src->fd_count; ++j) { - int sockpfd = static_cast(src->fd_array[j]); - intptr_t handle = fh_get(sockpfd); - if (handle == -1) { - result = -1; - _set_errno(EBADF); - } - dst->fd_array[j] = handle; - } - dst->fd_count = src->fd_count; - } - } - if (result != -1) { - result = select(nfds, readfds ? &rfds : NULL, writefds ? &wfds : NULL, - errorfds ? &efds : NULL, timeout); - if (result != -1) { - _set_errno(WSAGetLastError()); - } - } - if (result >= 0) { - for (size_t i = 0; i < sizeof(translations) / sizeof(*translations); ++i) { - fd_set *src = translations[i].src, *dst = translations[i].dst; - if (src) { - fd_set out; - FD_ZERO(&out); - for (size_t j = 0; j < src->fd_count; ++j) { - int sockpfd = static_cast(src->fd_array[j]); - intptr_t handle = fh_get(sockpfd); - if (FD_ISSET(handle, dst)) { - FD_SET(sockpfd, &out); - } - } - memcpy(src->fd_array, out.fd_array, out.fd_count * sizeof(*out.fd_array)); - src->fd_count = out.fd_count; - } - } - } - return result; -} - -ssize_t fh_send(int sockpfd, const void *buffer, size_t size, int flags) { - ssize_t result = -1; - if (size >= INT_MAX) { - size = INT_MAX; - } - intptr_t handle = fh_get(sockpfd); - WSABUF buf = {static_cast(size), - static_cast(const_cast(buffer))}; - DWORD nbytes; - if (WSASend(handle, &buf, 1, &nbytes, flags, NULL, NULL) == 0) { - result = static_cast(nbytes); - } else if (handle == -1) { - _set_errno(EBADF); - } else { - _set_errno(WSAGetLastError()); - } - return result; -} - -int fh_setsockopt(int sockpfd, int level, int name, const void *val, socklen_t len) { - intptr_t handle = fh_get(sockpfd); - int result = setsockopt(handle, level, name, static_cast(val), len); - if (result == 0) { - } else if (handle == -1) { - _set_errno(EBADF); - } else { - _set_errno(WSAGetLastError()); - } - return result; -} - -int fh_socket(int domain, int type, int protocol) { - int result = -1; - DWORD flags = WSA_FLAG_OVERLAPPED; // because socket() is overlapped by default - intptr_t handle = WSASocket(domain, type, protocol, NULL, 0, flags); - if (handle != -1) { - result = fh_open(handle, -1); - if (result == -1) { - closesocket(handle); - handle = -1; - } - } else { - _set_errno(WSAGetLastError()); - } - return result; -} - -ssize_t fh_write(int pfd, const void *buffer, size_t size) { - ssize_t result = -1; - ssize_t handle = fh_get(pfd); - if (size >= INT_MAX) { - size = INT_MAX; - } - if (fh_prefer_fallback && handle == -1) { - // Not our FD; fall back to default - result = _write(pfd, buffer, static_cast(size)); - } else { - WSABUF buf = {static_cast(size), - static_cast(const_cast(buffer))}; - DWORD nbytes; - DWORD flags = 0; - if (WSASend(handle, &buf, 1, &nbytes, flags, NULL, NULL) == 0) { - result = static_cast(nbytes); - } else { - int error = WSAGetLastError(); - if (error == WSANOTINITIALISED || error == WSAENOTSOCK) { - if (WriteFile(reinterpret_cast(handle), buffer, static_cast(size), - &nbytes, NULL)) { - result = static_cast(nbytes); - } else { - _set_errno(EINVAL); - } - } else { - _set_errno(WSAGetLastError()); - } - } - } - return result; -} - -#ifndef __cplusplus -} -#endif diff --git a/src/shims/windows/win32fd.h b/src/shims/windows/win32fd.h deleted file mode 100644 index e643b16aa..000000000 --- a/src/shims/windows/win32fd.h +++ /dev/null @@ -1,100 +0,0 @@ -#ifndef WIN32_FD_H -#define WIN32_FD_H - -#if defined(WIN32_REPLACE_FD_APIS) && WIN32_REPLACE_FD_APIS -#ifndef _CRT_DECLARE_NONSTDC_NAMES -#define _CRT_DECLARE_NONSTDC_NAMES 0 -#endif -#endif - -#include -#include - -#ifdef __cplusplus -extern "C" { -#endif - -#ifdef _WIN32 -#include -#undef ECONNRESET -#undef EINPROGRESS -#undef ETIMEDOUT - -#include -#include - -enum { - ECONNRESET = WSAECONNRESET, - EINPROGRESS = WSAEINPROGRESS, - ETIMEDOUT = WSAETIMEDOUT, -}; -typedef ptrdiff_t ssize_t; -typedef unsigned long int nfds_t; -#endif - -int fh_accept(int sockpfd, struct sockaddr *name, socklen_t *namelen); -int fh_bind(int sockpfd, const struct sockaddr *name, socklen_t namelen); -int fh_close(int pfd); -int fh_connect(int socket, const struct sockaddr *name, socklen_t namelen); -int fh_dup(int pfd); - -/// Retrieves the underlying file handle for the given pseudo-file descriptor. -intptr_t fh_get(int pfd); - -int fh_getpeername(int sockpfd, struct sockaddr *name, socklen_t *namelen); -int fh_getsockname(int sockpfd, struct sockaddr *name, socklen_t *namelen); -int fh_getsockopt(int sockpfd, int level, int name, void *val, socklen_t *len); -int fh_ioctlsocket(int sockpfd, long cmd, unsigned long *argp); -int fh_listen(int sockpfd, int backlog); - -/// Opens a pseudo-file descriptor around the given file handle. Sockets are supported. -/// However, this is only intended for storing a HANDLE as an integer. -/// The file descriptor must not be used with external file I/O functions. -int fh_open(intptr_t handle, int mode); - -int fh_poll(struct pollfd *fds, nfds_t nfds, int timeout); -ssize_t fh_read(int pfd, void *buffer, size_t size); - -/// Releases the underlying file handle for the given pseudo-file descriptor and then -/// closes the pseudo-file descriptor. -intptr_t fh_release(int pfd); - -ssize_t fh_recv(int sockpfd, void *buffer, size_t size, int flags); -int fh_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *errorfds, - struct timeval *timeout); -ssize_t fh_send(int sockpfd, const void *buffer, size_t size, int flags); -int fh_setsockopt(int sockpfd, int level, int name, const void *val, socklen_t len); -int fh_socket(int domain, int type, int protocol); -ssize_t fh_write(int pfd, const void *buffer, size_t size); - -#ifdef __cplusplus -} -#endif - -#if defined(WIN32_REPLACE_FD_APIS) && WIN32_REPLACE_FD_APIS -#define accept fh_accept -#define bind fh_bind -static int close(int pfd) { return fh_close(pfd); } -#define connect fh_connect -static int dup(int pfd) { return fh_dup(pfd); } -#define getsockopt fh_getsockopt -#define ioctlsocket fh_ioctlsocket -#define listen fh_listen -static int open(intptr_t handle, int mode) { return fh_open(handle, mode); } -#define poll fh_poll -static ssize_t read(int pfd, void *buffer, size_t size) { - return fh_read(pfd, buffer, size); -} -#define recv fh_recv -#define recvfrom fh_recvfrom -#define select fh_select -#define socket fh_socket -#define send fh_send -#define sendto fh_sendto -#define setsockopt fh_setsockopt -static ssize_t write(int pfd, const void *buffer, size_t size) { - return fh_write(pfd, buffer, size); -} -#endif - -#endif diff --git a/streaming/src/config/streaming_config.cc b/streaming/src/config/streaming_config.cc index 8a89ad673..f63b00d2e 100644 --- a/streaming/src/config/streaming_config.cc +++ b/streaming/src/config/streaming_config.cc @@ -1,7 +1,5 @@ #include "config/streaming_config.h" -#include - #include "util/streaming_logging.h" namespace ray { diff --git a/streaming/src/data_writer.cc b/streaming/src/data_writer.cc index 3c4ee64c4..733ee2a34 100644 --- a/streaming/src/data_writer.cc +++ b/streaming/src/data_writer.cc @@ -1,8 +1,5 @@ #include "data_writer.h" -#include -#include - #include #include #include diff --git a/streaming/src/test/ring_buffer_tests.cc b/streaming/src/test/ring_buffer_tests.cc index 25320cd6a..833eb149c 100644 --- a/streaming/src/test/ring_buffer_tests.cc +++ b/streaming/src/test/ring_buffer_tests.cc @@ -1,11 +1,10 @@ -#include "gtest/gtest.h" -#include "ray/util/logging.h" - -#include #include #include #include + +#include "gtest/gtest.h" #include "message/message.h" +#include "ray/util/logging.h" #include "ring_buffer.h" using namespace ray; diff --git a/streaming/src/test/streaming_queue_tests.cc b/streaming/src/test/streaming_queue_tests.cc index 2dfb2bf18..1339d45f5 100644 --- a/streaming/src/test/streaming_queue_tests.cc +++ b/streaming/src/test/streaming_queue_tests.cc @@ -1,16 +1,13 @@ #define BOOST_BIND_NO_PLACEHOLDERS -#include +#include "data_reader.h" +#include "data_writer.h" #include "gtest/gtest.h" +#include "message/message.h" +#include "message/message_bundle.h" #include "queue/queue_client.h" #include "ray/common/test_util.h" #include "ray/core_worker/core_worker.h" - -#include "data_reader.h" -#include "data_writer.h" -#include "message/message.h" -#include "message/message_bundle.h" #include "ring_buffer.h" - #include "test/queue_tests_base.h" using namespace std::placeholders; diff --git a/thirdparty/patches/arrow-windows-nonstdc.patch b/thirdparty/patches/arrow-windows-nonstdc.patch deleted file mode 100644 index 6fd59ad55..000000000 --- a/thirdparty/patches/arrow-windows-nonstdc.patch +++ /dev/null @@ -1,8 +0,0 @@ -diff --git cpp/src/arrow/io/mman.h cpp/src/arrow/io/mman.h ---- cpp/src/arrow/io/mman.h -+++ cpp/src/arrow/io/mman.h -@@ -3,0 +3,3 @@ -+#ifdef _WIN32 -+typedef long long off_t; -+#endif ---