Get more tests running on Windows (#6537)

* Get rid of system() calls

* Work around '/usr/share/mini' showing up on GitHub Actions (probably due to psutil truncation)

https://github.com/ray-project/ray/runs/722480047?check_suite_focus=true

* Don't check for socket max path length on Windows

* Don't check for socket existence on Windows

* Fix race condition in Windows fate-sharing

* Work around missing .exe extension for Redis tests

* Add more tests to GitHub Actions

Co-authored-by: Mehrdad <noreply@github.com>
This commit is contained in:
mehrdadn
2020-06-12 21:32:10 -07:00
committed by GitHub
parent 34bae27ac7
commit 101c215125
18 changed files with 424 additions and 182 deletions
+1
View File
@@ -86,6 +86,7 @@ jobs:
. ./ci/travis/ci.sh build
. ./ci/travis/ci.sh upload_wheels || true
. ./ci/travis/ci.sh test_python
#. ./ci/travis/ci.sh test_core || true
. ./ci/travis/ci.sh test_wheels
- name: Run Clang Include-What-You-Use
continue-on-error: true
+10
View File
@@ -876,6 +876,16 @@ cc_test(
],
)
# Unit test target for the path helpers covered by src/ray/util/filesystem_test.cc.
# NOTE(review): filesystem_test.cc defines its own main(), so the gtest_main
# dependency below may be redundant -- confirm before removing either one.
cc_test(
name = "filesystem_test",
srcs = ["src/ray/util/filesystem_test.cc"],
copts = COPTS,
deps = [
":ray_util",
"@com_google_googletest//:gtest_main",
],
)
cc_test(
name = "util_test",
srcs = ["src/ray/util/util_test.cc"],
+39 -7
View File
@@ -106,16 +106,48 @@ upload_wheels() {
fi
}
test_core() {
  # Run the tests matched by //:all; the leading '-' on rllib/... is Bazel's
  # syntax for subtracting a target pattern from the set.
  local targets=(//:all -rllib/...)
  bazel test --config=ci --build_tests_only -- "${targets[@]}"
}
test_python() {
if [ "${OSTYPE}" = msys ]; then
# Increased timeout from default of timeout=300 due to test_basic
bazel test -k --config=ci --test_timeout=600 --build_tests_only -- \
python/ray/tests:test_actor \
python/ray/tests:test_basic \
python/ray/tests:test_debug_tools \
python/ray/tests:test_mini \
;
local args=(python/ray/tests/...)
args+=(
-python/ray/tests:test_actor_advanced
-python/ray/tests:test_actor_failures
-python/ray/tests:test_advanced_2
-python/ray/tests:test_advanced_3
-python/ray/tests:test_array
-python/ray/tests:test_asyncio
-python/ray/tests:test_autoscaler_aws
-python/ray/tests:test_autoscaler_yaml
-python/ray/tests:test_cancel
-python/ray/tests:test_component_failures
-python/ray/tests:test_cython
-python/ray/tests:test_dynres
-python/ray/tests:test_failure
-python/ray/tests:test_global_gc
-python/ray/tests:test_global_state
-python/ray/tests:test_iter
-python/ray/tests:test_memory_scheduling
-python/ray/tests:test_memstat
-python/ray/tests:test_metrics
-python/ray/tests:test_multi_node
-python/ray/tests:test_multi_node_2
-python/ray/tests:test_multinode_failures_2
-python/ray/tests:test_multiprocessing
-python/ray/tests:test_node_manager
-python/ray/tests:test_object_manager
-python/ray/tests:test_projects
-python/ray/tests:test_queue
-python/ray/tests:test_ray_init
-python/ray/tests:test_reconstruction
-python/ray/tests:test_stress
-python/ray/tests:test_stress_sharded
-python/ray/tests:test_webui
)
bazel test -k --config=ci --test_timeout=600 --build_tests_only -- "${args[@]}";
fi
}
+23 -7
View File
@@ -99,14 +99,16 @@ class ConsolePopen(subprocess.Popen):
# CREATE_NEW_PROCESS_GROUP is used to send Ctrl+C on Windows:
# https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal
new_pgroup = subprocess.CREATE_NEW_PROCESS_GROUP
flags = 0
flags_to_add = 0
if ray.utils.detect_fate_sharing_support():
# If we don't have kernel-mode fate-sharing, then don't do this
# because our children need to be in our process group for
# the process reaper to properly terminate them.
flags = new_pgroup
kwargs.setdefault("creationflags", flags)
self._use_signals = (kwargs["creationflags"] & new_pgroup)
flags_to_add = new_pgroup
flags_key = "creationflags"
if flags_to_add:
kwargs[flags_key] = (kwargs.get(flags_key) or 0) | flags_to_add
self._use_signals = (kwargs[flags_key] & new_pgroup)
super(ConsolePopen, self).__init__(*args, **kwargs)
@@ -503,6 +505,14 @@ def start_ray_process(command,
if fate_share and sys.platform.startswith("linux"):
ray.utils.set_kill_on_parent_death_linux()
win32_fate_sharing = fate_share and sys.platform == "win32"
# With Windows fate-sharing, we need special care:
# The process must be added to the job before it is allowed to execute.
# Otherwise, there's a race condition: the process might spawn children
# before the process itself is assigned to the job.
# After that point, its children will not be added to the job anymore.
CREATE_SUSPENDED = 0x00000004 # from Windows headers
process = ConsolePopen(
command,
env=modified_env,
@@ -510,10 +520,16 @@ def start_ray_process(command,
stdout=stdout_file,
stderr=stderr_file,
stdin=subprocess.PIPE if pipe_stdin else None,
preexec_fn=preexec_fn if sys.platform != "win32" else None)
preexec_fn=preexec_fn if sys.platform != "win32" else None,
creationflags=CREATE_SUSPENDED if win32_fate_sharing else 0)
if fate_share and sys.platform == "win32":
ray.utils.set_kill_child_on_death_win32(process)
if win32_fate_sharing:
try:
ray.utils.set_kill_child_on_death_win32(process)
psutil.Process(process.pid).resume()
except (psutil.Error, OSError):
process.kill()
raise
return ProcessInfo(
process=process,
+3 -3
View File
@@ -113,9 +113,9 @@ def test_worker_stats(shutdown_only):
]
for process in processes:
# TODO(ekl) why does travis/mi end up in the process list
assert ("python" in process or "conda" in process
or "travis" in process or "runner" in process
or "ray" in process)
assert ("python" in process or "mini" in process
or "conda" in process or "travis" in process
or "runner" in process or "ray" in process)
break
# Test kill_actor.
+45 -43
View File
@@ -1,13 +1,30 @@
import os
import shutil
import subprocess
import sys
import time
import pytest
import ray
import ray.ray_constants as ray_constants
import subprocess
from ray.cluster_utils import Cluster
def unix_socket_create_path(name):
    """Return the path ``<user temp dir>/<name>``, or None on Windows.

    The user temp dir is whatever ``ray.utils.get_user_temp_dir()`` returns;
    it is only queried on non-Windows platforms.
    """
    if sys.platform == "win32":
        # No path is built on Windows; callers receive None.
        return None
    return os.path.join(ray.utils.get_user_temp_dir(), name)
def unix_socket_verify(unix_socket):
    """Assert that ``unix_socket`` exists on disk; do nothing on Windows."""
    if sys.platform == "win32":
        return
    assert os.path.exists(unix_socket), "Socket not found: " + unix_socket
def unix_socket_delete(unix_socket):
    """Remove ``unix_socket`` from disk; on Windows do nothing and return None.

    On other platforms this is ``os.remove``, so a missing file raises OSError.
    """
    if sys.platform == "win32":
        return None
    return os.remove(unix_socket)
def test_conn_cluster():
# plasma_store_socket_name
with pytest.raises(Exception) as exc_info:
@@ -73,72 +90,58 @@ def test_tempdir_commandline():
def test_tempdir_long_path():
temp_dir = os.path.join(ray.utils.get_user_temp_dir(), "z" * 108)
with pytest.raises(OSError):
ray.init(temp_dir=temp_dir) # path should be too long
if sys.platform != "win32":
# Test AF_UNIX limits for sockaddr_un->sun_path on POSIX OSes
maxlen = 104 if sys.platform.startswith("darwin") else 108
temp_dir = os.path.join(ray.utils.get_user_temp_dir(), "z" * maxlen)
with pytest.raises(OSError):
ray.init(temp_dir=temp_dir) # path should be too long
def test_raylet_socket_name(shutdown_only):
ray.init(
raylet_socket_name=os.path.join(ray.utils.get_user_temp_dir(),
"i_am_a_temp_socket"))
assert os.path.exists(
os.path.join(ray.utils.get_user_temp_dir(),
"i_am_a_temp_socket")), "Specified socket path not found."
sock1 = unix_socket_create_path("i_am_a_temp_socket_1")
ray.init(raylet_socket_name=sock1)
unix_socket_verify(sock1)
ray.shutdown()
try:
os.remove(
os.path.join(ray.utils.get_user_temp_dir(), "i_am_a_temp_socket"))
unix_socket_delete(sock1)
except OSError:
pass # It could have been removed by Ray.
cluster = Cluster(True)
cluster.add_node(
raylet_socket_name=os.path.join(ray.utils.get_user_temp_dir(),
"i_am_a_temp_socket_2"))
assert os.path.exists(
os.path.join(
ray.utils.get_user_temp_dir(),
"i_am_a_temp_socket_2")), "Specified socket path not found."
sock2 = unix_socket_create_path("i_am_a_temp_socket_2")
cluster.add_node(raylet_socket_name=sock2)
unix_socket_verify(sock2)
cluster.shutdown()
try:
os.remove(
os.path.join(ray.utils.get_user_temp_dir(),
"i_am_a_temp_socket_2"))
unix_socket_delete(sock2)
except OSError:
pass # It could have been removed by Ray.
def test_temp_plasma_store_socket(shutdown_only):
ray.init(
plasma_store_socket_name=os.path.join(ray.utils.get_user_temp_dir(),
"i_am_a_temp_socket"))
assert os.path.exists(
os.path.join(ray.utils.get_user_temp_dir(),
"i_am_a_temp_socket")), "Specified socket path not found."
sock1 = unix_socket_create_path("i_am_a_temp_socket_1")
ray.init(plasma_store_socket_name=sock1)
unix_socket_verify(sock1)
ray.shutdown()
try:
os.remove(
os.path.join(ray.utils.get_user_temp_dir(), "i_am_a_temp_socket"))
unix_socket_delete(sock1)
except OSError:
pass # It could have been removed by Ray.
cluster = Cluster(True)
cluster.add_node(
plasma_store_socket_name=os.path.join(ray.utils.get_user_temp_dir(),
"i_am_a_temp_socket_2"))
assert os.path.exists(
os.path.join(
ray.utils.get_user_temp_dir(),
"i_am_a_temp_socket_2")), "Specified socket path not found."
sock2 = unix_socket_create_path("i_am_a_temp_socket_2")
cluster.add_node(plasma_store_socket_name=sock2)
unix_socket_verify(sock2)
cluster.shutdown()
try:
os.remove(
os.path.join(ray.utils.get_user_temp_dir(),
"i_am_a_temp_socket_2"))
unix_socket_delete(sock2)
except OSError:
pass # It could have been removed by Ray.
def test_raylet_tempfiles(shutdown_only):
expected_socket_files = ({"plasma_store", "raylet"}
if sys.platform != "win32" else set())
ray.init(num_cpus=0)
node = ray.worker._global_node
top_levels = set(os.listdir(node.get_session_dir_path()))
@@ -159,7 +162,7 @@ def test_raylet_tempfiles(shutdown_only):
assert log_files.issuperset(log_files_expected)
socket_files = set(os.listdir(node.get_sockets_dir_path()))
assert socket_files == {"plasma_store", "raylet"}
assert socket_files == expected_socket_files
ray.shutdown()
ray.init(num_cpus=2)
@@ -176,7 +179,7 @@ def test_raylet_tempfiles(shutdown_only):
1 for filename in log_files if filename.startswith("worker")) == 4
socket_files = set(os.listdir(node.get_sockets_dir_path()))
assert socket_files == {"plasma_store", "raylet"}
assert socket_files == expected_socket_files
def test_tempdir_privilege(shutdown_only):
@@ -196,7 +199,6 @@ def test_session_dir_uniqueness():
if __name__ == "__main__":
import sys
# Make subprocess happy in bazel.
os.environ["LC_ALL"] = "en_US.UTF-8"
os.environ["LANG"] = "en_US.UTF-8"
+65 -64
View File
@@ -14,12 +14,15 @@
#include "ray/common/test_util.h"
#include <fstream>
#include <functional>
#include "ray/common/buffer.h"
#include "ray/common/ray_object.h"
#include "ray/util/filesystem.h"
#include "ray/util/logging.h"
#include "ray/util/process.h"
#include "ray/util/util.h"
#include "test_util.h"
namespace ray {
@@ -42,17 +45,14 @@ int TestSetupUtil::StartUpRedisServer(const int &port) {
actual_port = rand() % 5000 + 2000;
}
std::string load_module_command;
std::string program = TEST_REDIS_SERVER_EXEC_PATH;
std::vector<std::string> cmdargs({program, "--loglevel", "warning"});
if (!TEST_REDIS_MODULE_LIBRARY_PATH.empty()) {
// Fill load module command.
load_module_command = "--loadmodule " + TEST_REDIS_MODULE_LIBRARY_PATH;
cmdargs.insert(cmdargs.end(), {"--loadmodule", TEST_REDIS_MODULE_LIBRARY_PATH});
}
std::string start_redis_command = TEST_REDIS_SERVER_EXEC_PATH + " --loglevel warning " +
load_module_command + " --port " +
std::to_string(actual_port) + " &";
RAY_LOG(INFO) << "Start redis command is: " << start_redis_command;
RAY_CHECK(system(start_redis_command.c_str()) == 0);
cmdargs.insert(cmdargs.end(), {"--port", std::to_string(actual_port)});
RAY_LOG(INFO) << "Start redis command is: " << CreateCommandLine(cmdargs);
RAY_CHECK(!Process::Spawn(cmdargs, true).second);
usleep(200 * 1000);
return actual_port;
}
@@ -65,10 +65,10 @@ void TestSetupUtil::ShutDownRedisServers() {
}
void TestSetupUtil::ShutDownRedisServer(const int &port) {
std::string stop_redis_command =
TEST_REDIS_CLIENT_EXEC_PATH + " -p " + std::to_string(port) + " shutdown";
RAY_LOG(INFO) << "Stop redis command is: " << stop_redis_command;
if (system(stop_redis_command.c_str()) != 0) {
std::vector<std::string> cmdargs(
{TEST_REDIS_CLIENT_EXEC_PATH, "-p", std::to_string(port), "shutdown"});
RAY_LOG(INFO) << "Stop redis command is: " << CreateCommandLine(cmdargs);
if (Process::Call(cmdargs) != std::error_code()) {
RAY_LOG(WARNING) << "Failed to stop redis. The redis process may no longer exist.";
}
usleep(100 * 1000);
@@ -81,10 +81,10 @@ void TestSetupUtil::FlushAllRedisServers() {
}
void TestSetupUtil::FlushRedisServer(const int &port) {
std::string flush_all_redis_command =
TEST_REDIS_CLIENT_EXEC_PATH + " -p " + std::to_string(port) + " flushall";
RAY_LOG(INFO) << "Cleaning up redis with command: " << flush_all_redis_command;
if (system(flush_all_redis_command.c_str()) != 0) {
std::vector<std::string> cmdargs(
{TEST_REDIS_CLIENT_EXEC_PATH, "-p", std::to_string(port), "flushall"});
RAY_LOG(INFO) << "Cleaning up redis with command: " << CreateCommandLine(cmdargs);
if (Process::Call(cmdargs)) {
RAY_LOG(WARNING) << "Failed to flush redis. The redis process may no longer exist.";
}
usleep(100 * 1000);
@@ -100,12 +100,10 @@ std::string TestSetupUtil::StartObjectStore(
}
std::string store_socket_name =
ray::JoinPaths(ray::GetUserTempDir(), "store" + socket_suffix);
std::string store_pid_file = store_socket_name + ".pid";
std::string plasma_command = TEST_STORE_EXEC_PATH + " -m 10000000 -s " +
store_socket_name +
" 1> /dev/null 2> /dev/null & echo $! > " + store_pid_file;
RAY_LOG(DEBUG) << plasma_command;
RAY_CHECK(system(plasma_command.c_str()) == 0);
std::vector<std::string> cmdargs(
{TEST_STORE_EXEC_PATH, "-m", "10000000", "-s", store_socket_name});
RAY_LOG(DEBUG) << CreateCommandLine(cmdargs);
RAY_CHECK(!Process::Spawn(cmdargs, true, store_socket_name + ".pid").second);
usleep(200 * 1000);
return store_socket_name;
}
@@ -117,14 +115,11 @@ void TestSetupUtil::StopObjectStore(const std::string &store_socket_name) {
std::string TestSetupUtil::StartGcsServer(const std::string &redis_address) {
std::string gcs_server_socket_name =
ray::JoinPaths(ray::GetUserTempDir(), "gcs_server" + ObjectID::FromRandom().Hex());
std::string gcs_server_start_cmd = TEST_GCS_SERVER_EXEC_PATH;
gcs_server_start_cmd.append(" --redis_address=" + redis_address)
.append(" --redis_port=6379")
.append(" --config_list=initial_reconstruction_timeout_milliseconds,2000")
.append(" & echo $! > " + gcs_server_socket_name + ".pid");
RAY_LOG(INFO) << "Start gcs server command: " << gcs_server_start_cmd;
RAY_CHECK(system(gcs_server_start_cmd.c_str()) == 0);
std::vector<std::string> cmdargs(
{TEST_GCS_SERVER_EXEC_PATH, "--redis_address=" + redis_address, "--redis_port=6379",
"--config_list=initial_reconstruction_timeout_milliseconds,2000"});
RAY_LOG(INFO) << "Start gcs server command: " << CreateCommandLine(cmdargs);
RAY_CHECK(!Process::Spawn(cmdargs, true, gcs_server_socket_name + ".pid").second);
usleep(200 * 1000);
RAY_LOG(INFO) << "GCS server started.";
return gcs_server_socket_name;
@@ -140,26 +135,20 @@ std::string TestSetupUtil::StartRaylet(const std::string &store_socket_name,
const std::string &resource) {
std::string raylet_socket_name =
ray::JoinPaths(ray::GetUserTempDir(), "raylet" + ObjectID::FromRandom().Hex());
std::string raylet_start_cmd = TEST_RAYLET_EXEC_PATH;
raylet_start_cmd.append(" --raylet_socket_name=" + raylet_socket_name)
.append(" --store_socket_name=" + store_socket_name)
.append(" --object_manager_port=0 --node_manager_port=" + std::to_string(port))
.append(" --node_ip_address=" + node_ip_address)
.append(" --redis_address=" + redis_address)
.append(" --redis_port=6379")
.append(" --min-worker-port=0")
.append(" --max-worker-port=0")
.append(" --num_initial_workers=1")
.append(" --maximum_startup_concurrency=10")
.append(" --static_resource_list=" + resource)
.append(" --python_worker_command=\"" + TEST_MOCK_WORKER_EXEC_PATH + " " +
store_socket_name + " " + raylet_socket_name + " " + std::to_string(port) +
"\"")
.append(" --config_list=initial_reconstruction_timeout_milliseconds,2000")
.append(" & echo $! > " + raylet_socket_name + ".pid");
RAY_LOG(DEBUG) << "Raylet Start command: " << raylet_start_cmd;
RAY_CHECK(system(raylet_start_cmd.c_str()) == 0);
std::vector<std::string> cmdargs(
{TEST_RAYLET_EXEC_PATH, "--raylet_socket_name=" + raylet_socket_name,
"--store_socket_name=" + store_socket_name, "--object_manager_port=0",
"--node_manager_port=" + std::to_string(port),
"--node_ip_address=" + node_ip_address, "--redis_address=" + redis_address,
"--redis_port=6379", "--min-worker-port=0", "--max-worker-port=0",
"--num_initial_workers=1", "--maximum_startup_concurrency=10",
"--static_resource_list=" + resource,
"--python_worker_command=" +
CreateCommandLine({TEST_MOCK_WORKER_EXEC_PATH, store_socket_name,
raylet_socket_name, std::to_string(port)}),
"--config_list=initial_reconstruction_timeout_milliseconds,2000"});
RAY_LOG(DEBUG) << "Raylet Start command: " << CreateCommandLine(cmdargs);
RAY_CHECK(!Process::Spawn(cmdargs, true, raylet_socket_name + ".pid").second);
usleep(200 * 1000);
return raylet_socket_name;
}
@@ -171,14 +160,11 @@ void TestSetupUtil::StopRaylet(const std::string &raylet_socket_name) {
/// Launches the raylet monitor test executable against the given Redis address
/// (port 6379) and returns the generated name that identifies the instance.
/// \param redis_address Passed to the monitor as --redis_address.
/// \return The generated name; note that it already ends in ".pid".
std::string TestSetupUtil::StartRayletMonitor(const std::string &redis_address) {
std::string raylet_monitor_socket_name = ray::JoinPaths(
ray::GetUserTempDir(), "raylet_monitor" + ObjectID::FromRandom().Hex() + ".pid");
std::string raylet_monitor_pid = raylet_monitor_socket_name + ".pid";
std::string raylet_monitor_start_cmd = TEST_RAYLET_MONITOR_EXEC_PATH;
raylet_monitor_start_cmd.append(" --redis_address=" + redis_address)
.append(" --redis_port=6379")
.append(" & echo $! > " + raylet_monitor_pid);
RAY_LOG(DEBUG) << "Raylet monitor Start command: " << raylet_monitor_start_cmd;
RAY_CHECK(system(raylet_monitor_start_cmd.c_str()) == 0);
std::vector<std::string> cmdargs({TEST_RAYLET_MONITOR_EXEC_PATH,
"--redis_address=" + redis_address,
"--redis_port=6379"});
RAY_LOG(DEBUG) << "Raylet monitor Start command: " << CreateCommandLine(cmdargs);
// NOTE(review): Spawn() writes the PID file at exactly the path given here,
// i.e. "<name>" (which already ends in ".pid"). The shell-based launch above
// writes raylet_monitor_pid ("<name>.pid"), and KillProcessBySocketName() reads
// socket_name + ".pid". If the returned name is what gets passed to the kill
// helper, the file written by Spawn() is not the one it will look for -- confirm.
RAY_CHECK(!Process::Spawn(cmdargs, true, raylet_monitor_socket_name).second);
// Fixed 200 ms pause after launching.
usleep(200 * 1000);
return raylet_monitor_socket_name;
}
@@ -206,11 +192,26 @@ bool WaitForCondition(std::function<bool()> condition, int timeout_ms) {
}
/// Kills the process whose PID is stored in the file `socket_name + ".pid"`,
/// then removes that file.
/// \param socket_name Name the PID file path is derived from.
void KillProcessBySocketName(std::string socket_name) {
std::string pid = socket_name + ".pid";
std::string kill_9 = "kill -9 `cat " + pid + "`";
RAY_LOG(DEBUG) << kill_9;
ASSERT_TRUE(system(kill_9.c_str()) == 0);
ASSERT_TRUE(system(("rm -f " + pid).c_str()) == 0);
std::string pidfile_path = socket_name + ".pid";
{
// The stream lives only inside this scope, so it is destroyed (and the file
// closed) before the unlink() below.
std::ifstream pidfile(pidfile_path, std::ios_base::in);
RAY_CHECK(pidfile.good());
pid_t pid = -1;
pidfile >> pid;
// NOTE(review): since C++11 a failed integer extraction stores 0 in the target,
// so the -1 sentinel may not catch an empty or unparsable PID file; checking
// the stream state after `>>` (and pid > 0) would -- confirm.
RAY_CHECK(pid != -1);
Process::FromPid(pid).Kill();
}
ASSERT_EQ(unlink(pidfile_path.c_str()), 0);
}
/// Runs the platform's "kill by image name" tool for \p executable:
/// `taskkill /IM <executable>` on Windows, `pkill -x <executable>` elsewhere.
/// \return The value of the std::error_code produced by Process::Call().
int KillAllExecutable(const std::string &executable) {
#ifdef _WIN32
  const std::vector<std::string> kill_command{"taskkill", "/IM", executable};
#else
  const std::vector<std::string> kill_command{"pkill", "-x", executable};
#endif
  const std::error_code outcome = Process::Call(kill_command);
  return outcome.value();
}
TaskID RandomTaskId() {
+5
View File
@@ -44,6 +44,11 @@ bool WaitForCondition(std::function<bool()> condition, int timeout_ms);
/// Used to kill the process whose pid is stored in the `<socket_name>.pid` file.
/// \param socket_name Name to which ".pid" is appended to locate the PID file.
void KillProcessBySocketName(std::string socket_name);
/// Kills all processes with the given executable name (similar to killall).
/// Note: On Windows, this should include the file extension (e.g. ".exe"), if any.
/// This cannot be done automatically as doing so may be incorrect in some cases.
/// \param executable_with_suffix Executable (image) name handed to the kill tool.
/// \return The error-code value reported by Process::Call() for the kill command
/// (0 when it reports no error).
int KillAllExecutable(const std::string &executable_with_suffix);
// A helper function to return a random task id.
TaskID RandomTaskId();
+2 -5
View File
@@ -88,9 +88,6 @@ class CoreWorkerTest : public ::testing::Test {
public:
CoreWorkerTest(int num_nodes)
: num_nodes_(num_nodes), gcs_options_("127.0.0.1", 6379, "") {
#ifdef _WIN32
RAY_CHECK(false) << "port system() calls to Windows before running this test";
#endif
TestSetupUtil::StartUpRedisServers(std::vector<int>{6379, 6380});
// flush redis first.
@@ -401,7 +398,7 @@ void CoreWorkerTest::TestActorRestart(
for (int i = 0; i < num_tasks; i++) {
if (i == task_index_to_kill_worker) {
RAY_LOG(INFO) << "killing worker";
ASSERT_EQ(system("pkill mock_worker"), 0);
ASSERT_EQ(KillAllExecutable(GetFileName(TEST_MOCK_WORKER_EXEC_PATH)), 0);
// Wait for actor restruction event, and then for alive event.
auto check_actor_restart_func = [this, pid, &actor_id, &resources]() -> bool {
@@ -453,7 +450,7 @@ void CoreWorkerTest::TestActorFailure(
for (int i = 0; i < num_tasks; i++) {
if (i == task_index_to_kill_worker) {
RAY_LOG(INFO) << "killing worker";
ASSERT_EQ(system("pkill mock_worker"), 0);
ASSERT_EQ(KillAllExecutable(GetFileName(TEST_MOCK_WORKER_EXEC_PATH)), 0);
}
// wait for actor being restarted.
@@ -80,12 +80,6 @@ class MockServer {
class TestObjectManagerBase : public ::testing::Test {
public:
TestObjectManagerBase() {
#ifdef _WIN32
RAY_CHECK(false) << "port system() calls to Windows before running this test";
#endif
}
void SetUp() {
flushall_redis();
@@ -76,12 +76,6 @@ class MockServer {
class TestObjectManagerBase : public ::testing::Test {
public:
TestObjectManagerBase() {
#ifdef _WIN32
RAY_CHECK(false) << "port system() calls to Windows before running this test";
#endif
}
void SetUp() {
flushall_redis();
@@ -32,9 +32,6 @@ class TestObjectManagerBase : public ::testing::Test {
public:
TestObjectManagerBase() {
RAY_LOG(INFO) << "TestObjectManagerBase: started.";
#ifdef _WIN32
RAY_CHECK(false) << "port system() calls to Windows before running this test";
#endif
}
NodeManagerConfig GetNodeManagerConfig(std::string raylet_socket_name,
@@ -95,14 +92,11 @@ class TestObjectManagerBase : public ::testing::Test {
this->server1.reset();
this->server2.reset();
int s = system("killall plasma_store_server &");
ASSERT_TRUE(!s);
ASSERT_EQ(TestSetupUtil::KillAllExecutable(plasma_store_server + GetExeSuffix()), 0);
std::string cmd_str = test_executable.substr(0, test_executable.find_last_of("/"));
s = system(("rm " + cmd_str + "/raylet_1").c_str());
ASSERT_TRUE(!s);
s = system(("rm " + cmd_str + "/raylet_2").c_str());
ASSERT_TRUE(!s);
ASSERT_EQ(unlink((cmd_str + "/raylet_1").c_str()), 0);
ASSERT_EQ(unlink((cmd_str + "/raylet_2").c_str()), 0);
}
ObjectID WriteDataToClient(plasma::PlasmaClient &client, int64_t data_size) {
+41
View File
@@ -10,6 +10,47 @@
namespace ray {
/// Returns ".exe" when compiled for Windows and an empty string otherwise.
std::string GetExeSuffix() {
#ifdef _WIN32
  return std::string(".exe");
#else
  return std::string();
#endif
}
/// Returns the trailing component of \p path: everything after the last
/// directory separator, without ever reaching back into the root prefix
/// reported by GetRootPathLength(). The result is empty when the path ends
/// with a separator or consists only of a root.
std::string GetFileName(const std::string &path) {
  const size_t root_length = GetRootPathLength(path);
  size_t name_start = path.size();
  // Walk backwards from the end until a separator or the root is reached.
  for (; name_start > root_length; --name_start) {
    if (IsDirSep(path[name_start - 1])) {
      break;
    }
  }
  return path.substr(name_start);
}
/// Returns the number of leading characters of \p path that form its root.
/// On every platform the leading run of directory separators is counted. On
/// Windows, a UNC prefix (exactly two leading separators followed by two
/// components) or a prefix whose second character is ':' is counted first.
size_t GetRootPathLength(const std::string &path) {
  const size_t length = path.size();
  size_t pos = 0;
#ifdef _WIN32
  // UNC paths begin with two separators (but not 1 or 3)
  const bool starts_like_unc = length > 2 && IsDirSep(path[0]) && IsDirSep(path[1]) &&
                               !IsDirSep(path[2]);
  if (starts_like_unc) {
    pos = 2;
    // Consume two components, each followed by its run of separators.
    for (int component = 0; component != 2; ++component) {
      for (; pos < length && !IsDirSep(path[pos]); ++pos) {
      }
      for (; pos < length && IsDirSep(path[pos]); ++pos) {
      }
    }
  } else if (length > 1 && path[1] == ':') {
    // Drive-style prefix such as "C:".
    pos = 2;
  }
#endif
  // Any separators that follow also belong to the root.
  for (; pos < length && IsDirSep(path[pos]); ++pos) {
  }
  return pos;
}
/// Returns the "ray" entry joined onto the user temp directory.
std::string GetRayTempDir() {
  const std::string user_temp_dir = GetUserTempDir();
  return JoinPaths(user_temp_dir, "ray");
}
std::string GetUserTempDir() {
+12 -1
View File
@@ -2,6 +2,10 @@
#define RAY_UTIL_FILESYSTEM_H
#include <string>
#include <utility>
// Filesystem and path manipulation APIs.
// (NTFS stream & attribute paths are not supported.)
namespace ray {
@@ -30,6 +34,14 @@ static char GetPathSep() {
return result;
}
/// Returns the executable binary suffix for the platform, if any.
std::string GetExeSuffix();
/// Equivalent to Python's os.path.basename() for file system paths.
std::string GetFileName(const std::string &path);
/// Returns the length of the root prefix of \p path: the leading directory
/// separators and, on Windows, a preceding drive ("C:") or UNC
/// ("\\server\share") prefix. GetFileName() never looks inside this prefix.
size_t GetRootPathLength(const std::string &path);
/// \return A non-volatile temporary directory in which Ray can store its files.
std::string GetRayTempDir();
@@ -61,7 +73,6 @@ std::string JoinPaths(std::string base, Paths... components) {
}
return base;
}
} // namespace ray
#endif // RAY_UTIL_FILESYSTEM_H
+26
View File
@@ -0,0 +1,26 @@
#include "ray/util/filesystem.h"
#include "gtest/gtest.h"
namespace ray {
// Checks GetFileName() on relative, rooted and (on Windows) drive-prefixed paths.
TEST(FileSystemTest, PathParseTest) {
// "." and ".." contain no separator, so they are returned unchanged.
ASSERT_EQ(GetFileName("."), ".");
ASSERT_EQ(GetFileName(".."), "..");
ASSERT_EQ(GetFileName("foo/bar"), "bar");
// Leading separators are not part of the file name.
ASSERT_EQ(GetFileName("///bar"), "bar");
// A trailing separator leaves an empty final component.
ASSERT_EQ(GetFileName("///bar/"), "");
#ifdef _WIN32
// A drive prefix ("C:") counts as root, not as a file name.
ASSERT_EQ(GetFileName("C:"), "");
ASSERT_EQ(GetFileName("C::"), ":"); // just to match Python behavior
// Only a ':' in the second position is treated as a drive prefix.
ASSERT_EQ(GetFileName("CC::"), "CC::");
ASSERT_EQ(GetFileName("C:\\"), "");
#endif
}
} // namespace ray
// Test entry point: hands the command line to GoogleTest and returns the
// result of running every registered test.
int main(int argc, char **argv) {
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}
+137 -27
View File
@@ -18,15 +18,18 @@
#include <process.h>
#else
#include <signal.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/wait.h>
#endif
#include <unistd.h>
#include <algorithm>
#include <fstream>
#include <string>
#include <vector>
#include "ray/util/filesystem.h"
#include "ray/util/logging.h"
#include "ray/util/util.h"
@@ -50,48 +53,98 @@ class ProcessFD {
pid_t GetId() const;
// Fork + exec combo. Returns -1 for the PID on failure.
static ProcessFD spawnvp(const char *argv[], std::error_code &ec) {
static ProcessFD spawnvp(const char *argv[], std::error_code &ec, bool decouple) {
ec = std::error_code();
intptr_t fd;
pid_t pid;
#ifdef _WIN32
(void)decouple; // Windows doesn't require anything particular for decoupling.
std::vector<std::string> args;
for (size_t i = 0; argv[i]; ++i) {
args.push_back(argv[i]);
}
// Calling CreateCommandLine() here wouldn't make sense here if the
// Microsoft C runtime properly quoted each command-argument argument.
// However, it doesn't quote at all. It just joins arguments with a space.
// So we have to do the quoting manually and pass everything as a single argument.
fd = _spawnlp(P_NOWAIT, args[0].c_str(), CreateCommandLine(args).c_str(), NULL);
if (fd != -1) {
pid = static_cast<pid_t>(GetProcessId(reinterpret_cast<HANDLE>(fd)));
if (pid == 0) {
pid = -1;
}
} else {
pid = -1;
std::string cmds[] = {std::string(), CreateCommandLine(args)};
if (GetFileName(args.at(0)).find('.') == std::string::npos) {
// Some executables might be missing an extension.
// Append a single "." to prevent automatic appending of extensions by the system.
std::vector<std::string> args_direct_call = args;
args_direct_call[0] += ".";
cmds[0] = CreateCommandLine(args_direct_call);
}
if (pid == -1) {
bool succeeded = false;
PROCESS_INFORMATION pi = {};
for (int attempt = 0; attempt < sizeof(cmds) / sizeof(*cmds); ++attempt) {
std::string &cmd = cmds[attempt];
if (!cmd.empty()) {
(void)cmd.c_str(); // We'll need this to be null-terminated (but mutable) below
TCHAR *cmdline = &*cmd.begin();
STARTUPINFO si = {sizeof(si)};
if (CreateProcessA(NULL, cmdline, NULL, NULL, FALSE, 0, NULL, NULL, &si, &pi)) {
succeeded = true;
break;
}
}
}
if (succeeded) {
CloseHandle(pi.hThread);
fd = reinterpret_cast<intptr_t>(pi.hProcess);
pid = pi.dwProcessId;
} else {
ec = std::error_code(GetLastError(), std::system_category());
fd = -1;
pid = -1;
}
#else
// TODO(mehrdadn): Use clone() on Linux or posix_spawnp() on Mac to avoid duplicating
// file descriptors into the child process, as that can be problematic.
pid = fork();
int pipefds[2]; // Create pipe to get PID & track lifetime
if (pipe(pipefds) == -1) {
pipefds[0] = pipefds[1] = -1;
}
pid = pipefds[1] != -1 ? fork() : -1;
if (pid <= 0 && pipefds[0] != -1) {
close(pipefds[0]); // not the parent, so close the read end of the pipe
pipefds[0] = -1;
}
if (pid != 0 && pipefds[1] != -1) {
close(pipefds[1]); // not the child, so close the write end of the pipe
pipefds[1] = -1;
}
if (pid == 0) {
// Child process case. Reset the SIGCHLD handler for the worker.
// Child process case. Reset the SIGCHLD handler.
signal(SIGCHLD, SIG_DFL);
if (execvp(argv[0], const_cast<char *const *>(argv)) == -1) {
pid = -1;
abort(); // fork() succeeded but exec() failed, so abort the child
// If process needs to be decoupled, double-fork to avoid zombies.
if (pid_t pid2 = decouple ? fork() : 0) {
_exit(pid2 == -1 ? errno : 0); // Parent of grandchild; must exit
}
// This is the spawned process. Any intermediate parent is now dead.
pid_t my_pid = getpid();
if (write(pipefds[1], &my_pid, sizeof(my_pid)) == sizeof(my_pid)) {
execvp(argv[0], const_cast<char *const *>(argv));
}
_exit(errno); // fork() succeeded and exec() failed, so abort the child
}
if (pid > 0) {
// Parent process case
if (decouple) {
int s;
(void)waitpid(pid, &s, 0); // can't do much if this fails, so ignore return value
}
int r = read(pipefds[0], &pid, sizeof(pid));
(void)r; // can't do much if this fails, so ignore return value
}
if (decouple) {
fd = pipefds[0]; // grandchild, but we can use this to track its lifetime
} else {
fd = -1; // direct child, so we can use the PID to track its lifetime
if (pipefds[0] != -1) {
close(pipefds[0]);
pipefds[0] = -1;
}
}
if (pid == -1) {
ec = std::error_code(errno, std::system_category());
}
// TODO(mehrdadn): This would be a good place to open a descriptor later
fd = -1;
#endif
return ProcessFD(pid, fd);
}
@@ -99,11 +152,13 @@ class ProcessFD {
ProcessFD::~ProcessFD() {
if (fd_ != -1) {
bool success;
#ifdef _WIN32
CloseHandle(reinterpret_cast<HANDLE>(fd_));
success = !!CloseHandle(reinterpret_cast<HANDLE>(fd_));
#else
close(static_cast<int>(fd_));
success = close(static_cast<int>(fd_)) == 0;
#endif
RAY_CHECK(success) << "error " << errno << " closing process " << pid_ << " FD";
}
}
@@ -219,14 +274,32 @@ Process &Process::operator=(Process other) {
Process::Process(pid_t pid) { p_ = std::make_shared<ProcessFD>(pid); }
Process::Process(const char *argv[], void *io_service, std::error_code &ec) {
Process::Process(const char *argv[], void *io_service, std::error_code &ec,
bool decouple) {
(void)io_service;
ProcessFD procfd = ProcessFD::spawnvp(argv, ec);
ProcessFD procfd = ProcessFD::spawnvp(argv, ec, decouple);
if (!ec) {
p_ = std::make_shared<ProcessFD>(std::move(procfd));
}
}
std::error_code Process::Call(const std::vector<std::string> &args) {
std::vector<const char *> argv;
for (size_t i = 0; i != args.size(); ++i) {
argv.push_back(args[i].c_str());
}
argv.push_back(NULL);
std::error_code ec;
Process proc(&*argv.begin(), NULL, ec, true);
if (!ec) {
int return_code = proc.Wait();
if (return_code != 0) {
ec = std::error_code(return_code, std::system_category());
}
}
return ec;
}
Process Process::CreateNewDummy() {
pid_t pid = -1;
Process result(pid);
@@ -247,6 +320,24 @@ bool Process::IsNull() const { return !p_; }
bool Process::IsValid() const { return GetId() != -1; }
std::pair<Process, std::error_code> Process::Spawn(const std::vector<std::string> &args,
bool decouple,
const std::string &pid_file) {
std::vector<const char *> argv;
for (size_t i = 0; i != args.size(); ++i) {
argv.push_back(args[i].c_str());
}
argv.push_back(NULL);
std::error_code error;
Process proc(&*argv.begin(), NULL, error, decouple);
if (!error && !pid_file.empty()) {
std::ofstream file(pid_file, std::ios_base::out | std::ios_base::trunc);
file << proc.GetId() << std::endl;
RAY_CHECK(file.good());
}
return std::make_pair(std::move(proc), error);
}
int Process::Wait() const {
int status;
if (p_) {
@@ -265,8 +356,27 @@ int Process::Wait() const {
status = -1;
}
#else
(void)fd;
if (waitpid(pid, &status, 0) != 0) {
// There are 3 possible cases:
// - The process is a child whose death we await via waitpid().
// This is the usual case, when we have a child whose SIGCHLD we handle.
// - The process shares a pipe with us whose closure we use to detect its death.
// This is used to track a non-owned process, like a grandchild.
// - The process has no relationship with us, in which case we simply fail,
// since we have no need for this (and there's no good way to do it).
// Why don't we just poll the PID? Because it's better not to:
// - It would be prone to a race condition (we won't know when the PID is recycled).
// - It would incur high latency and/or high CPU usage for the caller.
if (fd != -1) {
// We have a pipe, so wait for its other end to close, to detect process death.
unsigned char buf[1 << 8];
ptrdiff_t r;
while ((r = read(fd, buf, sizeof(buf))) > 0) {
// Keep reading until socket terminates
}
status = r == -1 ? -1 : 0;
} else if (waitpid(pid, &status, 0) == -1) {
// Just the normal waitpid() case.
// (We can only do this once, only if we own the process. It fails otherwise.)
error = std::error_code(errno, std::system_category());
}
#endif
+12 -1
View File
@@ -23,8 +23,10 @@
#include <functional>
#include <memory>
#include <string>
#include <system_error>
#include <utility>
#include <vector>
#ifndef PID_MAX_LIMIT
// This is defined by Linux to be the maximum allowable number of processes
@@ -57,7 +59,11 @@ class Process {
/// \param[in] argv The command-line of the process to spawn (terminated with NULL).
/// \param[in] io_service Boost.Asio I/O service (optional).
/// \param[in] ec Returns any error that occurred when spawning the process.
explicit Process(const char *argv[], void *io_service, std::error_code &ec);
/// \param[in] decouple True iff the parent will not wait for the child to exit.
explicit Process(const char *argv[], void *io_service, std::error_code &ec,
bool decouple = false);
/// Convenience function to run the given command line and wait for it to finish.
/// \param args The command line; args[0] is the program to run.
/// \return A default-constructed (no-error) code when the process was spawned and
/// Wait() returned 0; otherwise the spawn error, or the non-zero Wait() result
/// wrapped in std::system_category().
static std::error_code Call(const std::vector<std::string> &args);
static Process CreateNewDummy();
static Process FromPid(pid_t pid);
pid_t GetId() const;
@@ -68,6 +74,11 @@ class Process {
bool IsValid() const;
/// Forcefully kills the process. Unsafe for unowned processes.
void Kill();
/// Convenience function to start a process in the background.
/// \param args The command line; args[0] is the program to run.
/// \param decouple Forwarded to the Process constructor: true iff the parent
/// will not wait for the child to exit.
/// \param pid_file A file to write the PID of the spawned process in.
/// Nothing is written when this is empty or when spawning fails.
/// \return The spawned process paired with the error (if any) from spawning it.
static std::pair<Process, std::error_code> Spawn(
const std::vector<std::string> &args, bool decouple,
const std::string &pid_file = std::string());
/// Waits for process to terminate. Not supported for unowned processes.
/// \return The process's exit code. Returns 0 for a dummy process, -1 for a null one.
int Wait() const;
-3
View File
@@ -18,9 +18,6 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
public:
StreamingQueueTestBase(int num_nodes, int port)
: gcs_options_("127.0.0.1", 6379, ""), node_manager_port_(port) {
#ifdef _WIN32
RAY_CHECK(false) << "port system() calls to Windows before running this test";
#endif
TestSetupUtil::StartUpRedisServers(std::vector<int>{6379, 6380});
// flush redis first.