From b53d00b65970dcb702696ff79af2e203dc3ef661 Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Mon, 3 Feb 2020 19:21:25 +0800 Subject: [PATCH] [Java] Fix Java test issues (#7000) --- .../org/ray/runtime/config/RayConfig.java | 6 +++++- .../org/ray/runtime/runner/RunManager.java | 8 ++++---- .../org/ray/runtime/util/NetworkUtil.java | 19 +++++++++---------- .../src/main/resources/log4j.properties | 6 +++--- .../java/org/ray/api/test/FailureTest.java | 15 +++++++++++++++ .../java/org/ray/api/test/RayConfigTest.java | 2 +- 6 files changed, 37 insertions(+), 19 deletions(-) diff --git a/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java b/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java index da9bde19d..23c2eec9a 100644 --- a/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java +++ b/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java @@ -11,7 +11,6 @@ import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Random; import org.ray.api.id.JobId; import org.ray.runtime.generated.Common.WorkerType; @@ -46,6 +45,7 @@ public class RayConfig { private String redisIp; private Integer redisPort; public final int headRedisPort; + public final int[] redisShardPorts; public final int numberRedisShards; public final String headRedisPassword; public final String redisPassword; @@ -153,6 +153,10 @@ public class RayConfig { headRedisPort = NetworkUtil.getUnusedPort(); } numberRedisShards = config.getInt("ray.redis.shard-number"); + redisShardPorts = new int[numberRedisShards]; + for (int i = 0; i < numberRedisShards; i++) { + redisShardPorts[i] = NetworkUtil.getUnusedPort(); + } headRedisPassword = config.getString("ray.redis.head-password"); redisPassword = config.getString("ray.redis.password"); diff --git a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java index 9ead9af16..4e645ed69 100644 --- a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java +++ b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java @@ -32,7 +32,7 @@ public class RunManager { private static final Logger LOGGER = LoggerFactory.getLogger(RunManager.class); private static final DateTimeFormatter DATE_TIME_FORMATTER = - DateTimeFormatter.ofPattern("Y-M-d_H-m-s"); + DateTimeFormatter.ofPattern("YYYY-MM-dd_HH-mm-ss"); private static final String WORKER_CLASS = "org.ray.runtime.runner.worker.DefaultWorker"; @@ -139,9 +139,9 @@ public class RunManager { LOGGER.error("Failed to start process " + name, e); throw new RuntimeException("Failed to start process " + name, e); } - // Wait 200ms and check whether the process is alive. + // Wait 1000 ms and check whether the process is alive. try { - TimeUnit.MILLISECONDS.sleep(200); + TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } @@ -201,7 +201,7 @@ public class RunManager { // start redis shards for (int i = 0; i < rayConfig.numberRedisShards; i++) { String shard = startRedisInstance(rayConfig.nodeIp, - rayConfig.headRedisPort + i + 1, rayConfig.headRedisPassword, i); + rayConfig.redisShardPorts[i], rayConfig.headRedisPassword, i); client.rpush("RedisShards", shard); } } diff --git a/java/runtime/src/main/java/org/ray/runtime/util/NetworkUtil.java b/java/runtime/src/main/java/org/ray/runtime/util/NetworkUtil.java index 8a6aca3f3..8d3f1af88 100644 --- a/java/runtime/src/main/java/org/ray/runtime/util/NetworkUtil.java +++ b/java/runtime/src/main/java/org/ray/runtime/util/NetworkUtil.java @@ -5,10 +5,10 @@ import java.io.IOException; import java.net.DatagramSocket; import java.net.Inet6Address; import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.NetworkInterface; import java.net.ServerSocket; import java.util.Enumeration; +import java.util.concurrent.ThreadLocalRandom; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,6 +16,9 @@ public class NetworkUtil { private static final Logger LOGGER = LoggerFactory.getLogger(NetworkUtil.class); + private static final int MIN_PORT = 10000; + private static final int MAX_PORT = 65535; + public static String getIpAddress(String interfaceName) { try { Enumeration interfaces = NetworkInterface.getNetworkInterfaces(); @@ -49,16 +52,12 @@ public class NetworkUtil { } public static int getUnusedPort() { - int port; - try { - ServerSocket ss = new ServerSocket(); - ss.bind(new InetSocketAddress(0)); - port = ss.getLocalPort(); - ss.close(); - } catch (Exception e) { - throw new RuntimeException("Failed to bind to an available port.", e); + while (true) { + int port = ThreadLocalRandom.current().nextInt(MAX_PORT - MIN_PORT) + MIN_PORT; + if (isPortAvailable(port)) { + return port; + } } - return port; } public static boolean isPortAvailable(int port) { diff --git a/java/runtime/src/main/resources/log4j.properties b/java/runtime/src/main/resources/log4j.properties index 2218efef6..2d28a3ca0 100644 --- a/java/runtime/src/main/resources/log4j.properties +++ b/java/runtime/src/main/resources/log4j.properties @@ -1,4 +1,4 @@ -ray.logging.level=INFO +ray.logging.level=info ray.logging.stdout=org.apache.log4j.ConsoleAppender ray.logging.file=org.apache.log4j.varia.NullAppender @@ -7,11 +7,11 @@ log4j.rootLogger=${ray.logging.level}, stdout, file log4j.appender.stdout=${ray.logging.stdout} log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %c{1} [%t]: %m%n +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %p %c{1} [%t]: %m%n # Set the file appender to null by default. If `ray.redirect-output` config is set to true, # this appender will be set to a real file appender. log4j.appender.file=${ray.logging.file} log4j.appender.file.File=${ray.logging.file.path} log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %c{1} [%t]: %m%n +log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %p %c{1} [%t]: %m%n diff --git a/java/test/src/main/java/org/ray/api/test/FailureTest.java b/java/test/src/main/java/org/ray/api/test/FailureTest.java index 83548d90f..a7091c2b2 100644 --- a/java/test/src/main/java/org/ray/api/test/FailureTest.java +++ b/java/test/src/main/java/org/ray/api/test/FailureTest.java @@ -14,12 +14,27 @@ import org.ray.api.exception.RayTaskException; import org.ray.api.exception.RayWorkerException; import org.ray.api.function.RayFunc0; import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; public class FailureTest extends BaseTest { private static final String EXCEPTION_MESSAGE = "Oops"; + @BeforeClass + public void setUp() { + // This is needed by `testGetThrowsQuicklyWhenFoundException`. + // Set one worker per process. Otherwise, if `badFunc2` and `slowFunc` run in the same + // process, `sleep` will delay `System.exit`. + System.setProperty("ray.raylet.config.num_workers_per_process_java", "1"); + } + + @AfterClass + public void tearDown() { + System.clearProperty("ray.raylet.config.num_workers_per_process_java"); + } + public static int badFunc() { throw new RuntimeException(EXCEPTION_MESSAGE); } diff --git a/java/test/src/main/java/org/ray/api/test/RayConfigTest.java b/java/test/src/main/java/org/ray/api/test/RayConfigTest.java index 8448108f6..e4615d120 100644 --- a/java/test/src/main/java/org/ray/api/test/RayConfigTest.java +++ b/java/test/src/main/java/org/ray/api/test/RayConfigTest.java @@ -7,7 +7,7 @@ import org.testng.annotations.Test; public class RayConfigTest { - public final static int NUM_RETRIES = 5; + public static final int NUM_RETRIES = 5; @Test public void testCreateRayConfig() {