[Core] Try remove all windows compat shims (#9671)

* try remove compat for arrow

* remove unistd.h

* remove socket compat

* delete arrow windows patch
This commit is contained in:
Siyuan (Ryans) Zhuang
2020-07-25 12:00:36 -07:00
committed by GitHub
parent d49b19c24c
commit 54a0d8b69e
29 changed files with 95 additions and 940 deletions
+3 -4
View File
@@ -5,9 +5,9 @@ PROPAGATED_WINDOWS_DEFINES = ["ARROW_STATIC"]
COPTS = [] + select({
"@bazel_tools//src/conditions:windows": [
"-D" + "WIN32_REPLACE_FD_APIS",
"/FI" + "win32fd.h",
] + ["-D" + define for define in PROPAGATED_WINDOWS_DEFINES],
"-D" + define
for define in PROPAGATED_WINDOWS_DEFINES
],
"//conditions:default": [
"-DARROW_USE_GLOG",
],
@@ -91,7 +91,6 @@ cc_library(
strip_include_prefix = "cpp/src",
visibility = ["//visibility:public"],
deps = [
"@//:platform_shims",
"@boost//:filesystem",
"@com_github_google_glog//:glog",
],
-1
View File
@@ -187,7 +187,6 @@ def ray_deps_setup():
sha256 = "2f0aaa50053792aa274b402f2530e63c1542085021cfef83beee9281412c12f6",
patches = [
"//thirdparty/patches:arrow-windows-export.patch",
"//thirdparty/patches:arrow-windows-nonstdc.patch",
],
)
+6 -10
View File
@@ -20,11 +20,6 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#if !defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
#include <sys/socket.h>
#include <sys/types.h>
#endif
#include "ray/common/client_connection.h"
namespace ray {
@@ -40,11 +35,12 @@ class ClientConnectionTest : public ::testing::Test {
in_ = std::move(input);
out_ = std::move(output);
#else
boost::asio::detail::socket_type pair[2] = {boost::asio::detail::invalid_socket,
boost::asio::detail::invalid_socket};
RAY_CHECK(socketpair(boost::asio::ip::tcp::v4().family(), SOCK_STREAM, 0, pair) == 0);
in_.assign(boost::asio::ip::tcp::v4(), pair[0]);
out_.assign(boost::asio::ip::tcp::v4(), pair[1]);
// Choose a free port.
auto endpoint = ParseUrlEndpoint("tcp://127.0.0.1:65437");
boost::asio::basic_socket_acceptor<local_stream_protocol> acceptor(io_service_,
endpoint);
out_.connect(endpoint);
acceptor.accept(in_);
#endif
}
-2
View File
@@ -14,8 +14,6 @@
#pragma once
#include <unistd.h>
#include <boost/optional.hpp>
#include <functional>
#include <string>
+3 -2
View File
@@ -21,6 +21,7 @@
#include "ray/core_worker/transport/direct_actor_transport.h"
#include "ray/core_worker/transport/raylet_transport.h"
#include "ray/gcs/gcs_client/service_based_gcs_client.h"
#include "ray/util/process.h"
#include "ray/util/util.h"
namespace {
@@ -664,13 +665,13 @@ void CoreWorker::RegisterToGcs() {
RAY_CHECK_OK(gcs_client_->Workers().AsyncAdd(worker_data, nullptr));
}
void CoreWorker::CheckForRayletFailure(const boost::system::error_code &error) {
if (error == boost::asio::error::operation_aborted) {
return;
}
// If the raylet fails, we will be reassigned to init (PID=1).
if (getppid() == 1) {
if (!IsParentProcessAlive()) {
RAY_LOG(ERROR) << "Raylet failed. Shutting down.";
Shutdown();
}
@@ -13,7 +13,7 @@
// limitations under the License.
#include "ray/gcs/gcs_client/service_based_gcs_client.h"
#include <unistd.h>
#include "ray/common/ray_config.h"
#include "ray/gcs/gcs_client/service_based_accessor.h"
-1
View File
@@ -14,7 +14,6 @@
#include "ray/gcs/redis_client.h"
#include <unistd.h>
#include "ray/common/ray_config.h"
#include "ray/gcs/redis_context.h"
-2
View File
@@ -14,8 +14,6 @@
#include "ray/gcs/redis_context.h"
#include <unistd.h>
#include <sstream>
#include "ray/stats/stats.h"
-2
View File
@@ -14,8 +14,6 @@
#include "ray/gcs/redis_gcs_client.h"
#include <unistd.h>
#include "ray/common/ray_config.h"
#include "ray/gcs/redis_accessor.h"
#include "ray/gcs/redis_context.h"
+4
View File
@@ -14,6 +14,10 @@
#include "ray/object_manager/plasma/fling.h"
#ifdef _WIN32
#error "This file does not supposed to be compiled under windows."
#endif
#include <errno.h>
#include <string.h>
@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <unistd.h>
#include <chrono>
#include <iostream>
#include <random>
@@ -14,8 +14,6 @@
#include "ray/object_manager/object_manager.h"
#include <unistd.h>
#include <iostream>
#include <thread>
-2
View File
@@ -14,8 +14,6 @@
#pragma once
#include <unistd.h>
#include "ray/common/network_util.h"
#include "ray/rpc/grpc_client.h"
#include "src/ray/protobuf/gcs_service.grpc.pb.h"
-1
View File
@@ -15,7 +15,6 @@
#include "ray/rpc/grpc_server.h"
#include <grpcpp/impl/service_type.h>
#include <unistd.h>
#include <boost/asio/detail/socket_holder.hpp>
+65
View File
@@ -15,6 +15,11 @@
#include "ray/util/process.h"
#ifdef _WIN32
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN 1
#endif
#include <Windows.h>
#include <Winternl.h>
#include <process.h>
#else
#include <poll.h>
@@ -26,6 +31,7 @@
#endif
#include <algorithm>
#include <atomic>
#include <fstream>
#include <string>
#include <vector>
@@ -439,6 +445,65 @@ void Process::Kill() {
}
}
#ifdef _WIN32
#ifndef STATUS_BUFFER_OVERFLOW
#define STATUS_BUFFER_OVERFLOW ((NTSTATUS)0x80000005L)
#endif
typedef LONG NTSTATUS;
typedef NTSTATUS WINAPI NtQueryInformationProcess_t(HANDLE ProcessHandle,
ULONG ProcessInformationClass,
PVOID ProcessInformation,
ULONG ProcessInformationLength,
ULONG *ReturnLength);
static std::atomic<NtQueryInformationProcess_t *> NtQueryInformationProcess_ =
ATOMIC_VAR_INIT(NULL);
pid_t GetParentPID() {
NtQueryInformationProcess_t *NtQueryInformationProcess = NtQueryInformationProcess_;
if (!NtQueryInformationProcess) {
NtQueryInformationProcess = reinterpret_cast<NtQueryInformationProcess_t *>(
GetProcAddress(GetModuleHandle(TEXT("ntdll.dll")),
_CRT_STRINGIZE(NtQueryInformationProcess)));
NtQueryInformationProcess_ = NtQueryInformationProcess;
}
DWORD ppid = 0;
PROCESS_BASIC_INFORMATION info;
ULONG cb = sizeof(info);
NTSTATUS status = NtQueryInformationProcess(GetCurrentProcess(), 0, &info, cb, &cb);
if ((status >= 0 || status == STATUS_BUFFER_OVERFLOW) && cb >= sizeof(info)) {
ppid = static_cast<DWORD>(reinterpret_cast<uintptr_t>(info.Reserved3));
}
pid_t result = 0;
if (ppid > 0) {
// For now, assume PPID = 1 (simulating the reassignment to "init" on Linux)
result = 1;
if (HANDLE parent = OpenProcess(PROCESS_QUERY_INFORMATION, FALSE, ppid)) {
long long me_created, parent_created;
FILETIME unused;
if (GetProcessTimes(GetCurrentProcess(), reinterpret_cast<FILETIME *>(&me_created),
&unused, &unused, &unused) &&
GetProcessTimes(parent, reinterpret_cast<FILETIME *>(&parent_created), &unused,
&unused, &unused)) {
if (me_created >= parent_created) {
// We verified the child is younger than the parent, so we know the parent
// is still alive.
// (Note that the parent can still die by the time this function returns,
// but that race condition exists on POSIX too, which we're emulating here.)
result = static_cast<pid_t>(ppid);
}
}
CloseHandle(parent);
}
}
return result;
}
#else
pid_t GetParentPID() { return getppid(); }
#endif // #ifdef _WIN32
bool IsParentProcessAlive() { return GetParentPID() != 1; }
} // namespace ray
namespace std {
+6
View File
@@ -83,6 +83,12 @@ class Process {
int Wait() const;
};
// Get the Process ID of the parent process. If the parent process exits, the PID
// will be 1 (this simulates POSIX getppid()).
pid_t GetParentPID();
bool IsParentProcessAlive();
} // namespace ray
// We only define operators required by the standard library (==, hash):
-12
View File
@@ -1,12 +0,0 @@
#include <sys/socket.h>
int socketpair(int domain, int type, int protocol, SOCKET sv[2]) {
if ((domain != AF_UNIX && domain != AF_INET) || type != SOCK_STREAM) {
return -1;
}
SOCKET sockets[2];
int r = dumb_socketpair(sockets);
sv[0] = sockets[0];
sv[1] = sockets[1];
return r;
}
-7
View File
@@ -1,7 +0,0 @@
#ifndef POLL_H
#define POLL_H
typedef unsigned long int nfds_t;
int poll(struct pollfd fds[], nfds_t nfds, int timeout);
#endif /* POLL_H */
-141
View File
@@ -1,141 +0,0 @@
/* socketpair.c
Copyright 2007, 2010 by Nathan C. Myers <ncm@cantrip.org>
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
The name of the author must not be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/* Changes:
* 2014-02-12: merge David Woodhouse, Ger Hobbelt improvements
* git.infradead.org/users/dwmw2/openconnect.git/commitdiff/bdeefa54
* github.com/GerHobbelt/selectable-socketpair
* always init the socks[] to -1/INVALID_SOCKET on error, both on Win32/64
* and UNIX/other platforms
* 2013-07-18: Change to BSD 3-clause license
* 2010-03-31:
* set addr to 127.0.0.1 because win32 getsockname does not always set it.
* 2010-02-25:
* set SO_REUSEADDR option to avoid leaking some windows resource.
* Windows System Error 10049, "Event ID 4226 TCP/IP has reached
* the security limit imposed on the number of concurrent TCP connect
* attempts." Bleah.
* 2007-04-25:
* preserve value of WSAGetLastError() on all error returns.
* 2007-04-22: (Thanks to Matthew Gregan <kinetik@flim.org>)
* s/EINVAL/WSAEINVAL/ fix trivial compile failure
* s/socket/WSASocket/ enable creation of sockets suitable as stdin/stdout
* of a child process.
* add argument make_overlapped
*/
#include <string.h>
#ifdef _WIN32
#ifndef _WINSOCKAPI_
#include <WinSock2.h>
#endif
#include <Windows.h>
#include <io.h>
#include <ws2tcpip.h> /* socklen_t, et al (MSVC20xx) */
#else
#include <errno.h>
#include <sys/socket.h>
#include <sys/types.h>
#endif
#ifdef WIN32
/* dumb_socketpair:
* If make_overlapped is nonzero, both sockets created will be usable for
* "overlapped" operations via WSASend etc. If make_overlapped is zero,
* socks[0] (only) will be usable with regular ReadFile etc., and thus
* suitable for use as stdin or stdout of a child process. Note that the
* sockets must be closed with closesocket() regardless.
*/
int dumb_socketpair(SOCKET socks[2]) {
union {
struct sockaddr_in inaddr;
struct sockaddr addr;
} a;
SOCKET listener;
int e;
socklen_t addrlen = sizeof(a.inaddr);
int reuse = 1;
if (socks == 0) {
return SOCKET_ERROR;
}
socks[0] = socks[1] = -1;
listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (listener == -1) return SOCKET_ERROR;
memset(&a, 0, sizeof(a));
a.inaddr.sin_family = AF_INET;
a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
a.inaddr.sin_port = 0;
for (;;) {
if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse,
(socklen_t)sizeof(reuse)) == -1)
break;
if (bind(listener, &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR) break;
memset(&a, 0, sizeof(a));
if (getsockname(listener, &a.addr, &addrlen) == SOCKET_ERROR) break;
// win32 getsockname may only set the port number, p=0.0005.
// ( http://msdn.microsoft.com/library/ms738543.aspx ):
a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
a.inaddr.sin_family = AF_INET;
if (listen(listener, 1) == SOCKET_ERROR) break;
socks[0] = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (socks[0] == -1) break;
if (connect(socks[0], &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR) break;
socks[1] = accept(listener, NULL, NULL);
if (socks[1] == -1) break;
closesocket(listener);
return 0;
}
closesocket(listener);
closesocket(socks[0]);
closesocket(socks[1]);
socks[0] = socks[1] = -1;
return SOCKET_ERROR;
}
#else
int dumb_socketpair(int socks[2], int dummy) {
if (socks == 0) {
errno = EINVAL;
return -1;
}
dummy = socketpair(AF_LOCAL, SOCK_STREAM, 0, socks);
if (dummy) socks[0] = socks[1] = -1;
return dummy;
}
#endif
-10
View File
@@ -1,10 +0,0 @@
#ifndef SOCKET_H
#define SOCKET_H
#include <Winsock2.h>
#include <unistd.h> // ssize_t
int dumb_socketpair(SOCKET socks[2]);
int socketpair(int domain, int type, int protocol, SOCKET sv[2]);
#endif /* SOCKET_H */
-64
View File
@@ -1,64 +0,0 @@
#include <unistd.h>
#include <atomic>
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN 1
#endif
#include <Windows.h>
#include <Winternl.h>
#ifndef STATUS_BUFFER_OVERFLOW
#define STATUS_BUFFER_OVERFLOW ((NTSTATUS)0x80000005L)
#endif
typedef LONG NTSTATUS;
typedef NTSTATUS WINAPI NtQueryInformationProcess_t(HANDLE ProcessHandle,
ULONG ProcessInformationClass,
PVOID ProcessInformation,
ULONG ProcessInformationLength,
ULONG *ReturnLength);
static std::atomic<NtQueryInformationProcess_t *> NtQueryInformationProcess_ =
ATOMIC_VAR_INIT(NULL);
typedef int pid_t;
int getppid() {
NtQueryInformationProcess_t *NtQueryInformationProcess = ::NtQueryInformationProcess_;
if (!NtQueryInformationProcess) {
NtQueryInformationProcess = reinterpret_cast<NtQueryInformationProcess_t *>(
GetProcAddress(GetModuleHandle(TEXT("ntdll.dll")),
_CRT_STRINGIZE(NtQueryInformationProcess)));
::NtQueryInformationProcess_ = NtQueryInformationProcess;
}
DWORD ppid = 0;
PROCESS_BASIC_INFORMATION info;
ULONG cb = sizeof(info);
NTSTATUS status = NtQueryInformationProcess(GetCurrentProcess(), 0, &info, cb, &cb);
if ((status >= 0 || status == STATUS_BUFFER_OVERFLOW) && cb >= sizeof(info)) {
ppid = static_cast<DWORD>(reinterpret_cast<uintptr_t>(info.Reserved3));
}
pid_t result = 0;
if (ppid > 0) {
// For now, assume PPID = 1 (simulating the reassignment to "init" on Linux)
result = 1;
if (HANDLE parent = OpenProcess(PROCESS_QUERY_INFORMATION, FALSE, ppid)) {
long long me_created, parent_created;
FILETIME unused;
if (GetProcessTimes(GetCurrentProcess(), reinterpret_cast<FILETIME *>(&me_created),
&unused, &unused, &unused) &&
GetProcessTimes(parent, reinterpret_cast<FILETIME *>(&parent_created), &unused,
&unused, &unused)) {
if (me_created >= parent_created) {
// We verified the child is younger than the parent, so we know the parent
// is still alive.
// (Note that the parent can still die by the time this function returns,
// but that race condition exists on POSIX too, which we're emulating here.)
result = static_cast<pid_t>(ppid);
}
}
CloseHandle(parent);
}
}
return result;
}
-12
View File
@@ -1,12 +0,0 @@
#ifndef UNISTD_H
#define UNISTD_H
#ifdef _WIN64
typedef long long ssize_t;
#else
typedef int ssize_t;
#endif
int getppid();
#endif /* UNISTD_H */
-538
View File
@@ -1,538 +0,0 @@
#undef WIN32_REPLACE_FD_APIS
#define WIN32_REPLACE_FD_APIS 0 // make sure we don't replace the APIs for our own calls
#include "win32fd.h"
#include <errno.h>
#include <fcntl.h>
#include <io.h>
#ifndef _WINSOCKAPI_
#include <WinSock2.h>
#endif
#ifndef __cplusplus
extern "C" {
#endif
enum { fh_prefer_fallback = 1 };
static const char fh_magic_prefix[] = "Win32FD";
static const size_t fh_magic_size = sizeof(fh_magic_prefix) + sizeof(void *);
static void fh_get_magic(unsigned char magic[fh_magic_size]) {
memcpy(magic, fh_magic_prefix, sizeof(fh_magic_prefix));
const void *const suffix = GetModuleHandle(NULL);
memcpy(magic + sizeof(fh_magic_prefix), &suffix, sizeof(suffix));
}
struct fh_payload_t {
intptr_t handle;
unsigned char magic[fh_magic_size];
};
static int _fh_close(intptr_t handle) {
int result = closesocket(handle);
if (result != 0) {
int error = WSAGetLastError();
if (error == WSANOTINITIALISED || error == WSAENOTSOCK) {
if (CloseHandle(reinterpret_cast<HANDLE>(handle))) {
result = 0;
} else {
_set_errno(EBADF);
}
} else {
_set_errno(EINVAL);
}
}
return result;
}
static int _fh_is_socket(intptr_t handle) {
int result = 1;
SOCKADDR_STORAGE addr;
int addrlen = sizeof(addr);
if (getsockname(handle, reinterpret_cast<sockaddr *>(&addr), &addrlen) != 0) {
int error = WSAGetLastError();
if (error == WSANOTINITIALISED || error == WSAENOTSOCK) {
result = 0;
}
}
return result;
}
int fh_accept(int sockpfd, struct sockaddr *name, socklen_t *namelen) {
int result = -1;
intptr_t handle = fh_get(sockpfd);
intptr_t accepted = WSAAccept(handle, name, namelen, NULL, NULL);
if (accepted != -1) {
result = fh_open(accepted, -1);
if (result == -1) {
closesocket(accepted);
accepted = -1;
}
} else if (handle == -1) {
_set_errno(EBADF);
} else {
_set_errno(WSAGetLastError());
}
return result;
}
int fh_bind(int sockpfd, const struct sockaddr *name, socklen_t namelen) {
intptr_t handle = fh_get(sockpfd);
int result = bind(handle, name, namelen);
if (result == 0) {
} else if (handle == -1) {
_set_errno(EBADF);
} else {
_set_errno(WSAGetLastError());
}
return result;
}
int fh_close(int pfd) {
intptr_t handle = fh_get(pfd);
int result = _close(pfd);
if (result == 0 && handle != -1) {
result = _fh_close(handle);
}
return result;
}
int fh_connect(int sockpfd, const struct sockaddr *name, socklen_t namelen) {
intptr_t handle = fh_get(sockpfd);
int result = WSAConnect(handle, name, namelen, NULL, NULL, NULL, NULL);
if (result == 0) {
} else if (handle == -1) {
_set_errno(EBADF);
} else {
int error = WSAGetLastError();
switch (error) {
case WSAEINVAL:
case WSAEWOULDBLOCK:
case WSA_IO_PENDING:
error = WSAEINPROGRESS; // for Redis
break;
}
_set_errno(error);
}
return result;
}
int fh_dup(int pfd) {
int result = -1;
intptr_t handle = fh_get(pfd);
if (fh_prefer_fallback && handle == -1) {
// Not our FD; fall back to default
result = _dup(pfd);
} else {
intptr_t duped = -1;
WSAPROTOCOL_INFO pi;
if (WSADuplicateSocket(handle, GetCurrentProcessId(), &pi) == 0) {
duped = WSASocket(pi.iAddressFamily, pi.iSocketType, pi.iProtocol, &pi, 0, 0);
if (duped == -1) {
_set_errno(EINVAL);
}
} else {
int error = WSAGetLastError();
if (error == WSANOTINITIALISED || error == WSAENOTSOCK) {
if (!DuplicateHandle(GetCurrentProcess(), reinterpret_cast<HANDLE>(handle),
GetCurrentProcess(), reinterpret_cast<HANDLE *>(&duped), 0,
FALSE, DUPLICATE_SAME_ACCESS)) {
duped = -1;
_set_errno(EBADF);
}
} else {
_set_errno(EBADF);
}
}
if (duped != -1) {
result = fh_open(duped, -1);
if (result == -1) {
_fh_close(duped);
}
}
}
return result;
}
intptr_t fh_get(int pfd) {
fh_payload_t payload = {-1};
HANDLE pipe = reinterpret_cast<HANDLE>(_get_osfhandle(pfd));
DWORD size = sizeof(payload), nbytes, avail, left;
if (pipe != INVALID_HANDLE_VALUE) {
if (GetFileType(pipe) != FILE_TYPE_PIPE) {
_set_errno(EBADF); // if this triggers, you passed an invalid or incompatible FD
} else if (PeekNamedPipe(pipe, &payload, size, &nbytes, &avail, &left)) {
if (avail != size) {
payload.handle = -1;
_set_errno(EIO); // if this triggers, you accidentally wrote to the FD directly
} else {
unsigned char expected[fh_magic_size];
fh_get_magic(expected);
if (memcmp(payload.magic, expected, sizeof(expected)) != 0) {
payload.handle = -1;
_set_errno(EBADF); // if this triggers, this isn't a recognized FD
}
}
} else {
_set_errno(EIO); // if this triggers, you accidentally read from the FD directly
}
} else {
_set_errno(EBADF);
}
return payload.handle;
}
int fh_getpeername(int sockpfd, struct sockaddr *name, socklen_t *namelen) {
intptr_t handle = fh_get(sockpfd);
int result = getpeername(handle, name, namelen);
if (result == 0) {
} else if (handle == -1) {
_set_errno(EBADF);
} else {
_set_errno(WSAGetLastError());
}
return result;
}
int fh_getsockname(int sockpfd, struct sockaddr *name, socklen_t *namelen) {
intptr_t handle = fh_get(sockpfd);
int result = getsockname(handle, name, namelen);
if (result == 0) {
} else if (handle == -1) {
_set_errno(EBADF);
} else {
_set_errno(WSAGetLastError());
}
return result;
}
int fh_getsockopt(int sockpfd, int level, int name, void *val, socklen_t *len) {
intptr_t handle = fh_get(sockpfd);
int result = getsockopt(handle, level, name, static_cast<char *>(val), len);
if (result == 0) {
} else if (handle == -1) {
_set_errno(EBADF);
} else {
_set_errno(WSAGetLastError());
}
return result;
}
int fh_ioctlsocket(int sockpfd, long cmd, unsigned long *argp) {
intptr_t handle = fh_get(sockpfd);
int result = ioctlsocket(handle, cmd, argp);
if (result == 0) {
} else if (handle == -1) {
_set_errno(EBADF);
} else {
_set_errno(WSAGetLastError());
}
return result;
}
int fh_listen(int sockpfd, int backlog) {
intptr_t handle = fh_get(sockpfd);
int result = listen(handle, backlog);
if (result == 0) {
} else if (handle == -1) {
_set_errno(EBADF);
} else {
_set_errno(WSAGetLastError());
}
return result;
}
int fh_open(intptr_t handle, int mode) {
int pfd = -1;
HANDLE r, w;
if (mode == -1) {
mode = _O_RDWR;
}
if (!(mode & _O_TEXT)) {
mode |= _O_BINARY;
}
if (handle == -1 || handle == 0) {
_set_errno(EBADF);
} else if (fh_prefer_fallback &&
GetFileType(reinterpret_cast<HANDLE>(handle)) != FILE_TYPE_UNKNOWN &&
!_fh_is_socket(handle)) {
// Already compatible with CRT FDs, so just fall back
pfd = _open_osfhandle(handle, mode);
} else {
fh_payload_t payload = {handle};
fh_get_magic(payload.magic);
DWORD size = sizeof(payload), nbytes;
if (CreatePipe(&r, &w, NULL, size)) {
if (WriteFile(w, &payload, size, &nbytes, NULL) && size == nbytes) {
pfd = _open_osfhandle(reinterpret_cast<intptr_t>(r), mode);
if (pfd == -1) {
CloseHandle(r);
}
} else {
_set_errno(EIO);
}
CloseHandle(w);
} else {
_set_errno(EMFILE);
}
}
return pfd;
}
int fh_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
struct fd_sets_t {
fd_set rfds, wfds, efds;
} fdsets;
int maxfd = -1;
struct fd_sets_t *pfdsets =
nfds <= FD_SETSIZE ? &fdsets
: static_cast<struct fd_sets_t *>(operator new(
(offsetof(fd_set, fd_array) + nfds * sizeof(SOCKET)) *
(sizeof(struct fd_sets_t) / sizeof(fd_set))));
FD_ZERO(&pfdsets->rfds);
FD_ZERO(&pfdsets->wfds);
FD_ZERO(&pfdsets->efds);
for (nfds_t i = 0; i < nfds; ++i) {
if (fds[i].events & POLLIN) {
FD_SET(fds[i].fd, &pfdsets->rfds);
}
if (fds[i].events & POLLOUT) {
FD_SET(fds[i].fd, &pfdsets->wfds);
}
if (fds[i].events & POLLERR) {
FD_SET(fds[i].fd, &pfdsets->efds);
}
if (maxfd < fds[i].fd) {
maxfd = static_cast<int>(fds[i].fd);
}
}
int msec = timeout;
struct timeval tv = {msec >= 0 ? msec / 1000 : 0, msec >= 0 ? (msec % 1000) * 1000 : 0};
int result = fh_select(maxfd + 1, &pfdsets->rfds, &pfdsets->wfds, &pfdsets->efds,
msec >= 0 ? &tv : NULL);
if (result >= 0) {
result = 0;
for (nfds_t i = 0; i < nfds; ++i) {
fds[i].revents = 0;
if (FD_ISSET(fds[i].fd, &pfdsets->rfds)) {
fds[i].revents |= POLLIN;
}
if (FD_ISSET(fds[i].fd, &pfdsets->wfds)) {
fds[i].revents |= POLLOUT;
}
if (FD_ISSET(fds[i].fd, &pfdsets->efds)) {
fds[i].revents |= POLLERR;
}
if (fds[i].revents) {
++result;
}
}
}
if (pfdsets != &fdsets) {
operator delete(pfdsets);
}
return result;
}
ssize_t fh_read(int pfd, void *buffer, size_t size) {
ssize_t result = -1;
intptr_t handle = fh_get(pfd);
if (size >= INT_MAX) {
size = INT_MAX;
}
if (fh_prefer_fallback && handle == -1) {
// Not our FD; fall back to default
result = _read(pfd, buffer, static_cast<unsigned int>(size));
} else {
WSABUF buf = {static_cast<unsigned long>(size), static_cast<char *>(buffer)};
DWORD nbytes;
DWORD flags = 0;
if (WSARecv(handle, &buf, 1, &nbytes, &flags, NULL, NULL) == 0) {
result = static_cast<ssize_t>(nbytes);
} else {
int error = WSAGetLastError();
if (error == WSANOTINITIALISED || error == WSAENOTSOCK) {
if (ReadFile(reinterpret_cast<HANDLE>(handle), buffer, static_cast<DWORD>(size),
&nbytes, NULL)) {
result = static_cast<ssize_t>(nbytes);
} else {
_set_errno(EINVAL);
}
} else {
if (error == WSAEWOULDBLOCK) {
error = EAGAIN; // for Redis
}
_set_errno(error);
}
}
}
return result;
}
ssize_t fh_recv(int sockpfd, void *buffer, size_t size, int flags) {
ssize_t result = -1;
if (size >= INT_MAX) {
size = INT_MAX;
}
intptr_t handle = fh_get(sockpfd);
WSABUF buf = {static_cast<unsigned long>(size), static_cast<char *>(buffer)};
DWORD nbytes;
DWORD dwflags = static_cast<DWORD>(flags);
if (WSARecv(handle, &buf, 1, &nbytes, &dwflags, NULL, NULL) == 0) {
result = static_cast<ssize_t>(nbytes);
} else if (handle == -1) {
_set_errno(EBADF);
} else {
_set_errno(WSAGetLastError());
}
return result;
}
intptr_t fh_release(int pfd) {
intptr_t handle = fh_get(pfd);
if (handle != -1) {
if (_close(pfd) != 0) {
handle = -1;
}
}
return handle;
}
int fh_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *errorfds,
struct timeval *timeout) {
int result = 0;
fd_set rfds, wfds, efds;
struct {
fd_set *src, *dst;
} translations[] = {
{readfds, &rfds},
{writefds, &wfds},
{errorfds, &efds},
};
for (size_t i = 0; i < sizeof(translations) / sizeof(*translations); ++i) {
fd_set *src = translations[i].src, *dst = translations[i].dst;
if (src) {
for (size_t j = 0; j < src->fd_count; ++j) {
int sockpfd = static_cast<int>(src->fd_array[j]);
intptr_t handle = fh_get(sockpfd);
if (handle == -1) {
result = -1;
_set_errno(EBADF);
}
dst->fd_array[j] = handle;
}
dst->fd_count = src->fd_count;
}
}
if (result != -1) {
result = select(nfds, readfds ? &rfds : NULL, writefds ? &wfds : NULL,
errorfds ? &efds : NULL, timeout);
if (result != -1) {
_set_errno(WSAGetLastError());
}
}
if (result >= 0) {
for (size_t i = 0; i < sizeof(translations) / sizeof(*translations); ++i) {
fd_set *src = translations[i].src, *dst = translations[i].dst;
if (src) {
fd_set out;
FD_ZERO(&out);
for (size_t j = 0; j < src->fd_count; ++j) {
int sockpfd = static_cast<int>(src->fd_array[j]);
intptr_t handle = fh_get(sockpfd);
if (FD_ISSET(handle, dst)) {
FD_SET(sockpfd, &out);
}
}
memcpy(src->fd_array, out.fd_array, out.fd_count * sizeof(*out.fd_array));
src->fd_count = out.fd_count;
}
}
}
return result;
}
ssize_t fh_send(int sockpfd, const void *buffer, size_t size, int flags) {
ssize_t result = -1;
if (size >= INT_MAX) {
size = INT_MAX;
}
intptr_t handle = fh_get(sockpfd);
WSABUF buf = {static_cast<unsigned long>(size),
static_cast<char *>(const_cast<void *>(buffer))};
DWORD nbytes;
if (WSASend(handle, &buf, 1, &nbytes, flags, NULL, NULL) == 0) {
result = static_cast<ssize_t>(nbytes);
} else if (handle == -1) {
_set_errno(EBADF);
} else {
_set_errno(WSAGetLastError());
}
return result;
}
int fh_setsockopt(int sockpfd, int level, int name, const void *val, socklen_t len) {
intptr_t handle = fh_get(sockpfd);
int result = setsockopt(handle, level, name, static_cast<const char *>(val), len);
if (result == 0) {
} else if (handle == -1) {
_set_errno(EBADF);
} else {
_set_errno(WSAGetLastError());
}
return result;
}
int fh_socket(int domain, int type, int protocol) {
int result = -1;
DWORD flags = WSA_FLAG_OVERLAPPED; // because socket() is overlapped by default
intptr_t handle = WSASocket(domain, type, protocol, NULL, 0, flags);
if (handle != -1) {
result = fh_open(handle, -1);
if (result == -1) {
closesocket(handle);
handle = -1;
}
} else {
_set_errno(WSAGetLastError());
}
return result;
}
ssize_t fh_write(int pfd, const void *buffer, size_t size) {
ssize_t result = -1;
ssize_t handle = fh_get(pfd);
if (size >= INT_MAX) {
size = INT_MAX;
}
if (fh_prefer_fallback && handle == -1) {
// Not our FD; fall back to default
result = _write(pfd, buffer, static_cast<unsigned int>(size));
} else {
WSABUF buf = {static_cast<unsigned long>(size),
static_cast<char *>(const_cast<void *>(buffer))};
DWORD nbytes;
DWORD flags = 0;
if (WSASend(handle, &buf, 1, &nbytes, flags, NULL, NULL) == 0) {
result = static_cast<ssize_t>(nbytes);
} else {
int error = WSAGetLastError();
if (error == WSANOTINITIALISED || error == WSAENOTSOCK) {
if (WriteFile(reinterpret_cast<HANDLE>(handle), buffer, static_cast<DWORD>(size),
&nbytes, NULL)) {
result = static_cast<ssize_t>(nbytes);
} else {
_set_errno(EINVAL);
}
} else {
_set_errno(WSAGetLastError());
}
}
}
return result;
}
#ifndef __cplusplus
}
#endif
-100
View File
@@ -1,100 +0,0 @@
#ifndef WIN32_FD_H
#define WIN32_FD_H
#if defined(WIN32_REPLACE_FD_APIS) && WIN32_REPLACE_FD_APIS
#ifndef _CRT_DECLARE_NONSTDC_NAMES
#define _CRT_DECLARE_NONSTDC_NAMES 0
#endif
#endif
#include <limits.h>
#include <stddef.h>
#ifdef __cplusplus
extern "C" {
#endif
#ifdef _WIN32
#include <errno.h>
#undef ECONNRESET
#undef EINPROGRESS
#undef ETIMEDOUT
#include <WS2tcpip.h>
#include <io.h>
enum {
ECONNRESET = WSAECONNRESET,
EINPROGRESS = WSAEINPROGRESS,
ETIMEDOUT = WSAETIMEDOUT,
};
typedef ptrdiff_t ssize_t;
typedef unsigned long int nfds_t;
#endif
int fh_accept(int sockpfd, struct sockaddr *name, socklen_t *namelen);
int fh_bind(int sockpfd, const struct sockaddr *name, socklen_t namelen);
int fh_close(int pfd);
int fh_connect(int socket, const struct sockaddr *name, socklen_t namelen);
int fh_dup(int pfd);
/// Retrieves the underlying file handle for the given pseudo-file descriptor.
intptr_t fh_get(int pfd);
int fh_getpeername(int sockpfd, struct sockaddr *name, socklen_t *namelen);
int fh_getsockname(int sockpfd, struct sockaddr *name, socklen_t *namelen);
int fh_getsockopt(int sockpfd, int level, int name, void *val, socklen_t *len);
int fh_ioctlsocket(int sockpfd, long cmd, unsigned long *argp);
int fh_listen(int sockpfd, int backlog);
/// Opens a pseudo-file descriptor around the given file handle. Sockets are supported.
/// However, this is only intended for storing a HANDLE as an integer.
/// The file descriptor must not be used with external file I/O functions.
int fh_open(intptr_t handle, int mode);
int fh_poll(struct pollfd *fds, nfds_t nfds, int timeout);
ssize_t fh_read(int pfd, void *buffer, size_t size);
/// Releases the underlying file handle for the given pseudo-file descriptor and then
/// closes the pseudo-file descriptor.
intptr_t fh_release(int pfd);
ssize_t fh_recv(int sockpfd, void *buffer, size_t size, int flags);
int fh_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *errorfds,
struct timeval *timeout);
ssize_t fh_send(int sockpfd, const void *buffer, size_t size, int flags);
int fh_setsockopt(int sockpfd, int level, int name, const void *val, socklen_t len);
int fh_socket(int domain, int type, int protocol);
ssize_t fh_write(int pfd, const void *buffer, size_t size);
#ifdef __cplusplus
}
#endif
#if defined(WIN32_REPLACE_FD_APIS) && WIN32_REPLACE_FD_APIS
#define accept fh_accept
#define bind fh_bind
static int close(int pfd) { return fh_close(pfd); }
#define connect fh_connect
static int dup(int pfd) { return fh_dup(pfd); }
#define getsockopt fh_getsockopt
#define ioctlsocket fh_ioctlsocket
#define listen fh_listen
static int open(intptr_t handle, int mode) { return fh_open(handle, mode); }
#define poll fh_poll
static ssize_t read(int pfd, void *buffer, size_t size) {
return fh_read(pfd, buffer, size);
}
#define recv fh_recv
#define recvfrom fh_recvfrom
#define select fh_select
#define socket fh_socket
#define send fh_send
#define sendto fh_sendto
#define setsockopt fh_setsockopt
static ssize_t write(int pfd, const void *buffer, size_t size) {
return fh_write(pfd, buffer, size);
}
#endif
#endif
-2
View File
@@ -1,7 +1,5 @@
#include "config/streaming_config.h"
#include <unistd.h>
#include "util/streaming_logging.h"
namespace ray {
-3
View File
@@ -1,8 +1,5 @@
#include "data_writer.h"
#include <signal.h>
#include <unistd.h>
#include <chrono>
#include <functional>
#include <list>
+3 -4
View File
@@ -1,11 +1,10 @@
#include "gtest/gtest.h"
#include "ray/util/logging.h"
#include <unistd.h>
#include <iostream>
#include <set>
#include <thread>
#include "gtest/gtest.h"
#include "message/message.h"
#include "ray/util/logging.h"
#include "ring_buffer.h"
using namespace ray;
+4 -7
View File
@@ -1,16 +1,13 @@
#define BOOST_BIND_NO_PLACEHOLDERS
#include <unistd.h>
#include "data_reader.h"
#include "data_writer.h"
#include "gtest/gtest.h"
#include "message/message.h"
#include "message/message_bundle.h"
#include "queue/queue_client.h"
#include "ray/common/test_util.h"
#include "ray/core_worker/core_worker.h"
#include "data_reader.h"
#include "data_writer.h"
#include "message/message.h"
#include "message/message_bundle.h"
#include "ring_buffer.h"
#include "test/queue_tests_base.h"
using namespace std::placeholders;
-8
View File
@@ -1,8 +0,0 @@
diff --git cpp/src/arrow/io/mman.h cpp/src/arrow/io/mman.h
--- cpp/src/arrow/io/mman.h
+++ cpp/src/arrow/io/mman.h
@@ -3,0 +3,3 @@
+#ifdef _WIN32
+typedef long long off_t;
+#endif
--