diff --git a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java index fb05e987a..62945b557 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java @@ -141,6 +141,11 @@ public final class RayNativeRuntime extends AbstractRayRuntime { LOGGER.info("RayNativeRuntime shutdown"); } + // For test purpose only + public RunManager getRunManager() { + return manager; + } + @Override public void setResource(String resourceName, double capacity, UniqueId nodeId) { Preconditions.checkArgument(Double.compare(capacity, 0) >= 0); diff --git a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java index 2f5fa8b69..a101ef5f9 100644 --- a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java +++ b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java @@ -57,30 +57,39 @@ public class RunManager { for (int i = processes.size() - 1; i >= 0; --i) { Pair pair = processes.get(i); - String name = pair.getLeft(); - Process p = pair.getRight(); - - int numAttempts = 0; - while (p.isAlive()) { - if (numAttempts == 0) { - LOGGER.debug("Terminating process {}.", name); - p.destroy(); - } else { - LOGGER.debug("Terminating process {} forcibly.", name); - p.destroyForcibly(); - } - try { - p.waitFor(KILL_PROCESS_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOGGER.warn("Got InterruptedException while waiting for process {}" + - " to be terminated.", processes.get(i)); - } - numAttempts++; - } - LOGGER.info("Process {} is now terminated.", name); + terminateProcess(pair.getLeft(), pair.getRight()); } } + public void terminateProcess(String name, Process p) { + int numAttempts = 0; + while (p.isAlive()) { + if (numAttempts == 0) { + LOGGER.debug("Terminating process {}.", name); + p.destroy(); + } else { + LOGGER.debug("Terminating process {} forcibly.", name); + p.destroyForcibly(); + } + try { + p.waitFor(KILL_PROCESS_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.warn("Got InterruptedException while waiting for process {}" + + " to be terminated.", name); + } + numAttempts++; + } + LOGGER.info("Process {} is now terminated.", name); + } + + /** + * Get processes by name. For test purposes only. + */ + public List getProcesses(String name) { + return processes.stream().filter(pair -> pair.getLeft().equals(name)).map(Pair::getRight) + .collect(Collectors.toList()); + } + private void createTempDirs() { try { FileUtils.forceMkdir(new File(rayConfig.logDir)); @@ -258,7 +267,8 @@ public class RunManager { String.format("--raylet_socket_name=%s", rayConfig.rayletSocketName), String.format("--store_socket_name=%s", rayConfig.objectStoreSocketName), String.format("--object_manager_port=%d", 0), // The object manager port. - String.format("--node_manager_port=%d", rayConfig.getNodeManagerPort()), // The node manager port. + // The node manager port. + String.format("--node_manager_port=%d", rayConfig.getNodeManagerPort()), String.format("--node_ip_address=%s", rayConfig.nodeIp), String.format("--redis_address=%s", rayConfig.getRedisIp()), String.format("--redis_port=%d", rayConfig.getRedisPort()), diff --git a/java/test/src/main/java/org/ray/api/test/ClientExceptionTest.java b/java/test/src/main/java/org/ray/api/test/ClientExceptionTest.java index b73ddd75c..f862eb95c 100644 --- a/java/test/src/main/java/org/ray/api/test/ClientExceptionTest.java +++ b/java/test/src/main/java/org/ray/api/test/ClientExceptionTest.java @@ -7,7 +7,9 @@ import org.ray.api.RayObject; import org.ray.api.TestUtils; import org.ray.api.exception.RayException; import org.ray.api.id.ObjectId; +import org.ray.runtime.RayNativeRuntime; import org.ray.runtime.object.RayObjectImpl; +import org.ray.runtime.runner.RunManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -26,7 +28,11 @@ public class ClientExceptionTest extends BaseTest { Thread thread = new Thread(() -> { try { TimeUnit.SECONDS.sleep(1); - Ray.shutdown(); + // kill raylet + RunManager runManager = ((RayNativeRuntime) TestUtils.getRuntime()).getRunManager(); + for (Process process : runManager.getProcesses("raylet")) { + runManager.terminateProcess("raylet", process); + } } catch (InterruptedException e) { LOGGER.error("Got InterruptedException when sleeping, exit right now."); throw new RuntimeException("Got InterruptedException when sleeping.", e);