From f10a5a40b09020bde34f82dcdc776d520ea06457 Mon Sep 17 00:00:00 2001 From: chaokunyang Date: Wed, 2 Sep 2020 19:47:52 +0800 Subject: [PATCH] [Java] Simplify ray cmd params (#10394) --- .../io/ray/api/runtimecontext/NodeInfo.java | 12 +++- .../java/io/ray/runtime/RayNativeRuntime.java | 60 ++++++++++++++++--- .../java/io/ray/runtime/config/RayConfig.java | 28 +++++---- .../java/io/ray/runtime/gcs/GcsClient.java | 1 + .../src/main/resources/ray.default.conf | 12 ---- java/test.sh | 1 + src/ray/core_worker/lib/java/jni_utils.h | 2 +- .../ray/streaming/runtime/util/RayUtils.java | 3 +- .../streaming/runtime/util/Mockitools.java | 4 ++ 9 files changed, 89 insertions(+), 34 deletions(-) diff --git a/java/api/src/main/java/io/ray/api/runtimecontext/NodeInfo.java b/java/api/src/main/java/io/ray/api/runtimecontext/NodeInfo.java index 5a24120be..dcf5f3e0c 100644 --- a/java/api/src/main/java/io/ray/api/runtimecontext/NodeInfo.java +++ b/java/api/src/main/java/io/ray/api/runtimecontext/NodeInfo.java @@ -14,15 +14,25 @@ public class NodeInfo { public final String nodeHostname; + public final int nodeManagerPort; + + public final String objectStoreSocketName; + + public final String rayletSocketName; + public final boolean isAlive; public final Map resources; - public NodeInfo(UniqueId nodeId, String nodeAddress, String nodeHostname, + public NodeInfo(UniqueId nodeId, String nodeAddress, String nodeHostname, int nodeManagerPort, + String objectStoreSocketName, String rayletSocketName, boolean isAlive, Map resources) { this.nodeId = nodeId; this.nodeAddress = nodeAddress; this.nodeHostname = nodeHostname; + this.nodeManagerPort = nodeManagerPort; + this.objectStoreSocketName = objectStoreSocketName; + this.rayletSocketName = rayletSocketName; this.isAlive = isAlive; this.resources = resources; } diff --git a/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java index 6d326bc35..0e79d8811 100644 --- a/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java +++ b/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java @@ -5,8 +5,10 @@ import io.ray.api.BaseActorHandle; import io.ray.api.id.ActorId; import io.ray.api.id.JobId; import io.ray.api.id.UniqueId; +import io.ray.api.runtimecontext.NodeInfo; import io.ray.runtime.config.RayConfig; import io.ray.runtime.context.NativeWorkerContext; +import io.ray.runtime.exception.RayException; import io.ray.runtime.gcs.GcsClient; import io.ray.runtime.gcs.GcsClientOptions; import io.ray.runtime.gcs.RedisClient; @@ -23,6 +25,7 @@ import java.io.File; import java.io.IOException; import java.util.Map; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -72,15 +75,6 @@ public final class RayNativeRuntime extends AbstractRayRuntime { } catch (IOException e) { throw new RuntimeException("Failed to create the log directory.", e); } - - if (rayConfig.getRedisAddress() != null) { - GcsClient tempGcsClient = - new GcsClient(rayConfig.getRedisAddress(), rayConfig.redisPassword); - for (Map.Entry entry : - tempGcsClient.getInternalConfig().entrySet()) { - rayConfig.rayletConfigParameters.put(entry.getKey(), entry.getValue()); - } - } } private static void resetLibraryPath(RayConfig rayConfig) { @@ -91,6 +85,54 @@ public final class RayNativeRuntime extends AbstractRayRuntime { public RayNativeRuntime(RayConfig rayConfig) { super(rayConfig); + loadConfigFromGCS(rayConfig); + } + + private static void loadConfigFromGCS(RayConfig rayConfig) { + if (rayConfig.getRedisAddress() != null) { + GcsClient tempGcsClient = + new GcsClient(rayConfig.getRedisAddress(), rayConfig.redisPassword); + for (Map.Entry entry : + tempGcsClient.getInternalConfig().entrySet()) { + rayConfig.rayletConfigParameters.put(entry.getKey(), entry.getValue()); + } + + if (rayConfig.workerMode == WorkerType.DRIVER) { + // Keep this method logic in sync with `services.get_address_info_from_redis_helper` + int numRetries = 5; + int retryCount = 0; + boolean configLoaded = false; + while (retryCount++ < numRetries) { + for (NodeInfo nodeInfo : tempGcsClient.getAllNodeInfo()) { + if (rayConfig.nodeIp.equals(nodeInfo.nodeAddress) || + (nodeInfo.nodeAddress.equals("127.0.0.1") && + rayConfig.nodeIp.equals(rayConfig.getRedisAddress()))) { + rayConfig.objectStoreSocketName = nodeInfo.objectStoreSocketName; + rayConfig.rayletSocketName = nodeInfo.rayletSocketName; + rayConfig.nodeManagerPort = nodeInfo.nodeManagerPort; + configLoaded = true; + break; + } + } + if (!configLoaded) { + LOGGER.warn("Some processes that the driver needs to connect to have " + + "not registered with Redis, so retrying. Have you run " + + "'ray start' on this node?"); + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } else { + break; + } + } + if (!configLoaded) { + throw new RayException("Some processes that the driver needs to connect to have " + + "not registered with Redis. Have you run 'ray start' on this node?"); + } + } + } } @Override diff --git a/java/runtime/src/main/java/io/ray/runtime/config/RayConfig.java b/java/runtime/src/main/java/io/ray/runtime/config/RayConfig.java index f282a1058..1c80a6d86 100644 --- a/java/runtime/src/main/java/io/ray/runtime/config/RayConfig.java +++ b/java/runtime/src/main/java/io/ray/runtime/config/RayConfig.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; +import org.apache.commons.lang3.StringUtils; /** * Configurations of Ray runtime. @@ -41,6 +42,9 @@ public class RayConfig { private Config config; + /** + * IP of this node. if not provided, IP will be automatically detected. + */ public final String nodeIp; public final WorkerType workerMode; public final RunMode runMode; @@ -61,11 +65,14 @@ public class RayConfig { public final String headRedisPassword; public final String redisPassword; + // RPC socket name of object store. public String objectStoreSocketName; public final Long objectStoreSize; + // RPC socket name of Raylet. public String rayletSocketName; - private int nodeManagerPort; + // Listening port for node manager. + public int nodeManagerPort; public final Map rayletConfigParameters; public final String jobResourcePath; @@ -125,11 +132,11 @@ public class RayConfig { // Run mode. runMode = config.getEnum(RunMode.class, "ray.run-mode"); // Node ip. - String nodeIp = config.getString("ray.node-ip"); - if (nodeIp.isEmpty()) { + if (config.hasPath("ray.node-ip")) { + nodeIp = config.getString("ray.node-ip"); + } else { nodeIp = NetworkUtil.getIpAddress(null); } - this.nodeIp = nodeIp; // Resources. resources = ResourceUtil.getResourcesMapFromString( config.getString("ray.resources")); @@ -181,9 +188,10 @@ public class RayConfig { // Redis configurations. String redisAddress = config.getString("ray.redis.address"); - if (!redisAddress.isEmpty()) { + if (StringUtils.isNotBlank(redisAddress)) { setRedisAddress(redisAddress); } else { + // We need to start gcs using `RunManager` for local cluster this.redisAddress = null; } @@ -199,12 +207,12 @@ public class RayConfig { } headRedisPassword = config.getString("ray.redis.head-password"); redisPassword = config.getString("ray.redis.password"); - // Raylet node manager port. - nodeManagerPort = config.getInt("ray.raylet.node-manager-port"); - if (nodeManagerPort == 0) { - Preconditions.checkState(this.redisAddress == null, - "Java worker started by raylet should accept the node manager port from raylet."); + if (config.hasPath("ray.raylet.node-manager-port")) { + nodeManagerPort = config.getInt("ray.raylet.node-manager-port"); + } else { + Preconditions.checkState(workerMode != WorkerType.WORKER, + "Worker started by raylet should accept the node manager port from raylet."); nodeManagerPort = NetworkUtil.getUnusedPort(); } diff --git a/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java b/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java index a0f0f218d..22e06ad96 100644 --- a/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java +++ b/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java @@ -72,6 +72,7 @@ public class GcsClient { // and it's only one final state for each node in recorded table. NodeInfo nodeInfo = new NodeInfo( nodeId, data.getNodeManagerAddress(), data.getNodeManagerHostname(), + data.getNodeManagerPort(), data.getObjectStoreSocketName(), data.getRayletSocketName(), data.getState() == GcsNodeInfo.GcsNodeState.ALIVE, new HashMap<>()); nodes.put(nodeId, nodeInfo); } diff --git a/java/runtime/src/main/resources/ray.default.conf b/java/runtime/src/main/resources/ray.default.conf index b95bc10d8..b3fa59520 100644 --- a/java/runtime/src/main/resources/ray.default.conf +++ b/java/runtime/src/main/resources/ray.default.conf @@ -7,9 +7,6 @@ ray { // Basic configurations // ---------------------- - // IP of this node. if not provided, IP will be automatically detected. - node-ip: "" - // Run mode, available options are: // // `SINGLE_PROCESS`: Ray is running in one single Java process, without Raylet backend, @@ -91,9 +88,6 @@ ray { // Object store configurations // ---------------------------- object-store { - // RPC socket name of object store. - // If this is not set, the default name will be `${temp-dir}/session_xxx/sockets/object_store`. - socket-name: "" // Initial size of the object store. size: 10 MB } @@ -102,12 +96,6 @@ ray { // Raylet configurations // ---------------------------- raylet { - // RPC socket name of Raylet. - // If this is not set, the default name will be `${temp-dir}/session_xxx/sockets/raylet`. - socket-name: "" - // Listening port for node manager. - node-manager-port: 0 - // See src/ray/ray_config_def.h for options. config { num_workers_per_process_java: 10 diff --git a/java/test.sh b/java/test.sh index 62079f181..875522f5c 100755 --- a/java/test.sh +++ b/java/test.sh @@ -16,6 +16,7 @@ run_testng() { fi # exit_code == 2 means there are skipped tests. if [ $exit_code -ne 2 ] && [ $exit_code -ne 0 ] ; then + find . -name "hs_err_*log" -exec cat {} + exit $exit_code fi } diff --git a/src/ray/core_worker/lib/java/jni_utils.h b/src/ray/core_worker/lib/java/jni_utils.h index 058810dcd..2f4b73b61 100644 --- a/src/ray/core_worker/lib/java/jni_utils.h +++ b/src/ray/core_worker/lib/java/jni_utils.h @@ -256,7 +256,7 @@ inline ID JavaByteArrayToId(JNIEnv *env, const jbyteArray &bytes) { std::string id_str(ID::Size(), 0); env->GetByteArrayRegion(bytes, 0, ID::Size(), reinterpret_cast(&id_str.front())); - auto arr_size = env->GetArrayLength(bytes); + auto arr_size = static_cast(env->GetArrayLength(bytes)); RAY_CHECK(arr_size == ID::Size()) << "ID length should be " << ID::Size() << " instead of " << arr_size; return ID::FromBinary(id_str); diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/RayUtils.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/RayUtils.java index 8681e3904..e0bd950fa 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/RayUtils.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/RayUtils.java @@ -51,7 +51,8 @@ public class RayUtils { nodeIdBytes[byteIndex] = String.valueOf(i).getBytes()[0]; } NodeInfo nodeInfo = new NodeInfo(new UniqueId(nodeIdBytes), - "localhost" + i, "localhost" + i, + "localhost" + i, "localhost" + i, -1, + "", "", true, resources); nodeInfos.add(nodeInfo); } diff --git a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/util/Mockitools.java b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/util/Mockitools.java index 4dc305d71..6ba018e27 100644 --- a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/util/Mockitools.java +++ b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/util/Mockitools.java @@ -68,6 +68,10 @@ public class Mockitools { createNodeId(i), "localhost" + i, "localhost" + i, + -1, + "", + "", + true, resources); }