diff --git a/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java b/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java index 192e5550c..2307b0489 100644 --- a/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java +++ b/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java @@ -96,7 +96,7 @@ public class RunManager { * * @param command The command to start the process with. */ - public static String runCommand(List command) throws IOException, InterruptedException { + private static String runCommand(List command) throws IOException, InterruptedException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Starting process with command: {}", Joiner.on(" ").join(command)); } diff --git a/java/test.sh b/java/test.sh index b49f06037..a842194e6 100755 --- a/java/test.sh +++ b/java/test.sh @@ -16,27 +16,30 @@ pushd "$ROOT_DIR" mvn -T16 checkstyle:check popd +on_exit() { + exit_code=$? + if [ $exit_code -ne 0 ]; then + echo "Exit trap, printing ray logs" + cat /tmp/ray/session_latest/logs/* + fi +} + +trap on_exit EXIT + run_testng() { - local pid local exit_code - "$@" & - pid=$! - if wait $pid; then + if "$@"; then exit_code=0 else exit_code=$? fi # exit_code == 2 means there are skipped tests. if [ $exit_code -ne 2 ] && [ $exit_code -ne 0 ] ; then - # Only print log files if it ran in cluster mode - if [[ ! "$*" =~ SINGLE_PROCESS ]]; then - if [ $exit_code -gt 128 ] ; then - # Test crashed. Print the driver log for diagnosis. - cat /tmp/ray/session_latest/logs/java-core-driver-*$pid* - fi + if [ $exit_code -gt 128 ] ; then + # Test crashed. Print the driver log for diagnosis. + cat /tmp/ray/session_latest/logs/java-core-driver-* fi - # Only print the hs_err_pid file of TestNG process - find . -name "hs_err_pid$pid.log" -exec cat {} + + find . -name "hs_err_*log" -exec cat {} + exit $exit_code fi } @@ -57,31 +60,11 @@ if ! git diff --exit-code -- java src/ray/core_worker/lib/java; then exit 1 fi -# NOTE(kfstrom): Java test troubleshooting only. -# Set MAX_ROUNDS to a big number (e.g. 1000) to run Java tests repeatedly. -# You may also want to modify java/testng.xml to run only a subset of test cases. -MAX_ROUNDS=1 -if [ $MAX_ROUNDS -gt 1 ]; then - export RAY_BACKEND_LOG_LEVEL=debug -fi - -round=1 -while true; do - echo Starting cluster mode test round $round - - echo "Running tests under cluster mode." - # TODO(hchen): Ideally, we should use the following bazel command to run Java tests. However, if there're skipped tests, - # TestNG will exit with code 2. And bazel treats it as test failure. - # bazel test //java:all_tests --config=ci || cluster_exit_code=$? - run_testng java -cp "$ROOT_DIR"/../bazel-bin/java/all_tests_deploy.jar org.testng.TestNG -d /tmp/ray_java_test_output "$ROOT_DIR"/testng.xml - - echo Finished cluster mode test round $round - date - round=$((round+1)) - if (( round > MAX_ROUNDS )); then - break - fi -done +echo "Running tests under cluster mode." +# TODO(hchen): Ideally, we should use the following bazel command to run Java tests. However, if there're skipped tests, +# TestNG will exit with code 2. And bazel treats it as test failure. +# bazel test //java:all_tests --config=ci || cluster_exit_code=$? +run_testng java -cp "$ROOT_DIR"/../bazel-bin/java/all_tests_deploy.jar org.testng.TestNG -d /tmp/ray_java_test_output "$ROOT_DIR"/testng.xml echo "Running tests under single-process mode." # bazel test //java:all_tests --jvmopt="-Dray.run-mode=SINGLE_PROCESS" --config=ci || single_exit_code=$? diff --git a/java/test/src/main/java/io/ray/test/TestProgressListener.java b/java/test/src/main/java/io/ray/test/TestProgressListener.java index 915d82af3..1fed5ac21 100644 --- a/java/test/src/main/java/io/ray/test/TestProgressListener.java +++ b/java/test/src/main/java/io/ray/test/TestProgressListener.java @@ -1,42 +1,27 @@ package io.ray.test; -import com.google.common.collect.ImmutableList; -import io.ray.runtime.runner.RunManager; -import java.io.File; import java.time.LocalDateTime; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.SystemUtils; import org.testng.IInvokedMethod; import org.testng.IInvokedMethodListener; import org.testng.ITestContext; import org.testng.ITestListener; import org.testng.ITestResult; -import org.testng.SkipException; public class TestProgressListener implements IInvokedMethodListener, ITestListener { - // Travis aborts CI if no outputs for 10 minutes. So threshold needs to be smaller than 10m. - private static final long hangDetectionThresholdMillis = 5 * 60 * 1000; - private static final int TAIL_NO_OF_LINES = 500; - private Thread testMainThread; - private long testStartTimeMillis; - private String getFullTestName(ITestResult testResult) { return testResult.getTestClass().getName() + "." + testResult.getMethod().getMethodName(); } - private void printSection(String sectionName) { + private void printInfo(String tag, String content) { System.out.println( - "============ [" + LocalDateTime.now().toString() + "] " + sectionName + " ============"); - } - - private void printTestStage(String tag, String content) { - printSection("[" + tag + "] " + content); + "============ [" + + LocalDateTime.now().toString() + + "] [" + + tag + + "] " + + content + + " ============"); } @Override @@ -47,50 +32,31 @@ public class TestProgressListener implements IInvokedMethodListener, ITestListen @Override public void onTestStart(ITestResult result) { - printTestStage("TEST START", getFullTestName(result)); - testStartTimeMillis = System.currentTimeMillis(); - // TODO(kfstorm): Add a timer to detect hang - if (testMainThread == null) { - testMainThread = Thread.currentThread(); - Thread hangDetectionThread = - new Thread( - () -> { - try { - // If current task case has ran for more than 5 minutes. - while (System.currentTimeMillis() - testStartTimeMillis - < hangDetectionThresholdMillis) { - Thread.sleep(1000); - } - printDebugInfo(null, /*testHanged=*/ true); - } catch (InterruptedException e) { - // ignored - } - }); - hangDetectionThread.setDaemon(true); - hangDetectionThread.start(); - } + printInfo("TEST START", getFullTestName(result)); } @Override public void onTestSuccess(ITestResult result) { - printTestStage("TEST SUCCESS", getFullTestName(result)); + printInfo("TEST SUCCESS", getFullTestName(result)); } @Override public void onTestFailure(ITestResult result) { - printTestStage("TEST FAILURE", getFullTestName(result)); - printDebugInfo(result, /*testHanged=*/ false); + printInfo("TEST FAILURE", getFullTestName(result)); + Throwable throwable = result.getThrowable(); + if (throwable != null) { + throwable.printStackTrace(); + } } @Override public void onTestSkipped(ITestResult result) { - printTestStage("TEST SKIPPED", getFullTestName(result)); - printDebugInfo(result, /*testHanged=*/ false); + printInfo("TEST SKIPPED", getFullTestName(result)); } @Override public void onTestFailedButWithinSuccessPercentage(ITestResult result) { - printTestStage("TEST FAILED BUT WITHIN SUCCESS PERCENTAGE", getFullTestName(result)); + printInfo("TEST FAILED BUT WITHIN SUCCESS PERCENTAGE", getFullTestName(result)); } @Override @@ -98,102 +64,4 @@ public class TestProgressListener implements IInvokedMethodListener, ITestListen @Override public void onFinish(ITestContext context) {} - - private void printDebugInfo(ITestResult result, boolean testHanged) { - boolean testFailed = false; - if (result != null) { - Throwable throwable = result.getThrowable(); - if (throwable != null && !(throwable instanceof SkipException)) { - testFailed = true; - throwable.printStackTrace(); - } - } - if (!testFailed && !testHanged) { - return; - } - - if (testHanged) { - printSection("TEST CASE HANGED"); - printSection("STACK TRACE OF TEST THREAD"); - for (StackTraceElement element : testMainThread.getStackTrace()) { - System.out.println(element.toString()); - } - Set javaPids = getJavaPids(); - for (Integer pid : javaPids) { - runCommandSafely(ImmutableList.of("jstack", pid.toString())); - // TODO(kfstorm): Check lldb or gdb exists rather than detecting OS type. - if (SystemUtils.IS_OS_MAC) { - runCommandSafely( - ImmutableList.of("lldb", "--batch", "-o", "bt all", "-p", pid.toString())); - } else { - runCommandSafely( - ImmutableList.of( - "sudo", "gdb", "-batch", "-ex", "thread apply all bt", "-p", pid.toString())); - } - } - } - - printLogFiles(); - - if (testHanged) { - printSection("ABORT TEST"); - System.exit(1); - } - } - - private String runCommandSafely(List command) { - String output; - String commandString = String.join(" ", command); - printSection(commandString); - try { - output = RunManager.runCommand(command); - System.out.println(output); - } catch (Exception e) { - System.out.println("Failed to execute command: " + commandString); - e.printStackTrace(); - output = ""; - } - return output; - } - - private Set getJavaPids() { - Set javaPids = new HashSet<>(); - String jpsOutput = runCommandSafely(ImmutableList.of("jps", "-v")); - try { - for (String line : StringUtils.split(jpsOutput, "\n")) { - String[] parts = StringUtils.split(line); - if (parts.length > 1 && parts[1].toLowerCase().equals("jps")) { - // Skip jps. - continue; - } - Integer pid = Integer.valueOf(parts[0]); - javaPids.add(pid); - } - } catch (Exception e) { - System.out.println("Failed to parse jps output."); - e.printStackTrace(); - } - - String pgrepJavaResult = runCommandSafely(ImmutableList.of("pgrep", "java")); - try { - for (String line : StringUtils.split(pgrepJavaResult, "\n")) { - Integer pid = Integer.valueOf(line); - javaPids.add(pid); - } - } catch (Exception e) { - System.out.println("Failed to parse pgrep java output."); - e.printStackTrace(); - } - - return javaPids; - } - - private void printLogFiles() { - Collection logFiles = - FileUtils.listFiles(new File("/tmp/ray/session_latest/logs"), null, false); - for (File file : logFiles) { - runCommandSafely( - ImmutableList.of("tail", "-n", String.valueOf(TAIL_NO_OF_LINES), file.getAbsolutePath())); - } - } } diff --git a/java/testng.xml b/java/testng.xml index 0db270484..6cc10b9ab 100644 --- a/java/testng.xml +++ b/java/testng.xml @@ -1,6 +1,6 @@ - + diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 262c83701..6c8287c15 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -161,19 +161,15 @@ CoreWorkerProcess::CoreWorkerProcess(const CoreWorkerOptions &options) // RayConfig is generated in Java_io_ray_runtime_RayNativeRuntime_nativeInitialize // for java worker or in constructor of CoreWorker for python worker. ray::stats::Init(global_tags, options_.metrics_agent_port); - - // NOTE(kfstorm): std::atexit should be put at the end of `CoreWorkerProcess` - // constructor. We assume that spdlog has been initialized before this line. When the - // process is exiting, `HandleAtExit` will be invoked before destructing spdlog static - // variables. We explicitly destruct `CoreWorkerProcess` instance in the callback to - // ensure the static `CoreWorkerProcess` instance is destructed while spdlog is still - // usable. This prevents crashing (or hanging) when using `RAY_LOG` in - // `CoreWorkerProcess` destructor. - RAY_CHECK(std::atexit(CoreWorkerProcess::HandleAtExit) == 0); } CoreWorkerProcess::~CoreWorkerProcess() { RAY_LOG(INFO) << "Destructing CoreWorkerProcess. pid: " << getpid(); + { + // Check that all `CoreWorker` instances have been removed. + absl::ReaderMutexLock lock(&worker_map_mutex_); + RAY_CHECK(workers_.empty()); + } RAY_LOG(DEBUG) << "Stats stop in core worker."; // Shutdown stats module if worker process exits. ray::stats::Shutdown(); @@ -187,8 +183,6 @@ void CoreWorkerProcess::EnsureInitialized() { << "shutdown."; } -void CoreWorkerProcess::HandleAtExit() { instance_.reset(); } - std::shared_ptr CoreWorkerProcess::TryGetWorker(const WorkerID &worker_id) { if (!instance_) { return nullptr; diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 72ef4f36c..6fa24c29e 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -265,8 +265,6 @@ class CoreWorkerProcess { /// \return Void. static void EnsureInitialized(); - static void HandleAtExit(); - /// Get the `CoreWorker` instance by worker ID. /// /// \param[in] workerId The worker ID.