[Java] Simplify ray cmd params (#10394)

This commit is contained in:
chaokunyang
2020-09-02 19:47:52 +08:00
committed by GitHub
parent 6fa0edfbef
commit f10a5a40b0
9 changed files with 89 additions and 34 deletions
@@ -5,8 +5,10 @@ import io.ray.api.BaseActorHandle;
import io.ray.api.id.ActorId;
import io.ray.api.id.JobId;
import io.ray.api.id.UniqueId;
import io.ray.api.runtimecontext.NodeInfo;
import io.ray.runtime.config.RayConfig;
import io.ray.runtime.context.NativeWorkerContext;
import io.ray.runtime.exception.RayException;
import io.ray.runtime.gcs.GcsClient;
import io.ray.runtime.gcs.GcsClientOptions;
import io.ray.runtime.gcs.RedisClient;
@@ -23,6 +25,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -72,15 +75,6 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
} catch (IOException e) {
throw new RuntimeException("Failed to create the log directory.", e);
}
if (rayConfig.getRedisAddress() != null) {
GcsClient tempGcsClient =
new GcsClient(rayConfig.getRedisAddress(), rayConfig.redisPassword);
for (Map.Entry<String, String> entry :
tempGcsClient.getInternalConfig().entrySet()) {
rayConfig.rayletConfigParameters.put(entry.getKey(), entry.getValue());
}
}
}
private static void resetLibraryPath(RayConfig rayConfig) {
@@ -91,6 +85,54 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
/**
 * Creates the native runtime from the given configuration.
 *
 * <p>The configuration is first handed to the superclass constructor and then passed to
 * {@code loadConfigFromGCS}, which may update it in place.
 *
 * @param rayConfig the configuration for this runtime.
 */
public RayNativeRuntime(RayConfig rayConfig) {
super(rayConfig);
loadConfigFromGCS(rayConfig);
}
/**
 * Loads configuration published in GCS into {@code rayConfig}.
 *
 * <p>Does nothing when {@code rayConfig.getRedisAddress()} is null. Otherwise every entry of
 * the GCS internal config is copied into {@code rayConfig.rayletConfigParameters}. When the
 * worker mode is {@code DRIVER}, the node table is additionally polled (up to 5 attempts, one
 * second apart) for an entry matching this node, and that entry's object store socket name,
 * raylet socket name and node manager port are written into {@code rayConfig}.
 *
 * @param rayConfig the configuration to update in place.
 * @throws RayException if no matching node entry is found after all attempts.
 * @throws RuntimeException if the thread is interrupted while waiting between attempts; the
 *     interrupt status is restored before the exception is thrown.
 */
private static void loadConfigFromGCS(RayConfig rayConfig) {
  if (rayConfig.getRedisAddress() == null) {
    return;
  }
  GcsClient tempGcsClient =
      new GcsClient(rayConfig.getRedisAddress(), rayConfig.redisPassword);
  for (Map.Entry<String, String> entry :
      tempGcsClient.getInternalConfig().entrySet()) {
    rayConfig.rayletConfigParameters.put(entry.getKey(), entry.getValue());
  }
  if (rayConfig.workerMode != WorkerType.DRIVER) {
    return;
  }

  // Keep this method logic in sync with `services.get_address_info_from_redis_helper`.
  final int numRetries = 5;
  for (int attempt = 1; attempt <= numRetries; attempt++) {
    for (NodeInfo nodeInfo : tempGcsClient.getAllNodeInfo()) {
      // A node matches when it is registered under this node's IP, or when it is registered
      // under the loopback address and this node's IP equals the configured Redis address.
      // The literal is the receiver so that a null node address cannot cause an NPE.
      if (rayConfig.nodeIp.equals(nodeInfo.nodeAddress)
          || ("127.0.0.1".equals(nodeInfo.nodeAddress)
              && rayConfig.nodeIp.equals(rayConfig.getRedisAddress()))) {
        rayConfig.objectStoreSocketName = nodeInfo.objectStoreSocketName;
        rayConfig.rayletSocketName = nodeInfo.rayletSocketName;
        rayConfig.nodeManagerPort = nodeInfo.nodeManagerPort;
        return;
      }
    }
    if (attempt == numRetries) {
      // No further attempt follows, so do not announce a retry or wait before failing.
      break;
    }
    LOGGER.warn("Some processes that the driver needs to connect to have " +
        "not registered with Redis, so retrying. Have you run " +
        "'ray start' on this node?");
    try {
      TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
      // Restore the interrupt status so callers further up the stack can observe it.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }
  throw new RayException("Some processes that the driver needs to connect to have " +
      "not registered with Redis. Have you run 'ray start' on this node?");
}
@Override
@@ -20,6 +20,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.lang3.StringUtils;
/**
* Configurations of Ray runtime.
@@ -41,6 +42,9 @@ public class RayConfig {
private Config config;
/**
* IP of this node. If not provided, the IP will be automatically detected.
*/
public final String nodeIp;
public final WorkerType workerMode;
public final RunMode runMode;
@@ -61,11 +65,14 @@ public class RayConfig {
public final String headRedisPassword;
public final String redisPassword;
// RPC socket name of object store.
public String objectStoreSocketName;
public final Long objectStoreSize;
// RPC socket name of Raylet.
public String rayletSocketName;
private int nodeManagerPort;
// Listening port for node manager.
public int nodeManagerPort;
public final Map<String, String> rayletConfigParameters;
public final String jobResourcePath;
@@ -125,11 +132,11 @@ public class RayConfig {
// Run mode.
runMode = config.getEnum(RunMode.class, "ray.run-mode");
// Node ip.
String nodeIp = config.getString("ray.node-ip");
if (nodeIp.isEmpty()) {
if (config.hasPath("ray.node-ip")) {
nodeIp = config.getString("ray.node-ip");
} else {
nodeIp = NetworkUtil.getIpAddress(null);
}
this.nodeIp = nodeIp;
// Resources.
resources = ResourceUtil.getResourcesMapFromString(
config.getString("ray.resources"));
@@ -181,9 +188,10 @@ public class RayConfig {
// Redis configurations.
String redisAddress = config.getString("ray.redis.address");
if (!redisAddress.isEmpty()) {
if (StringUtils.isNotBlank(redisAddress)) {
setRedisAddress(redisAddress);
} else {
// We need to start GCS using `RunManager` for a local cluster.
this.redisAddress = null;
}
@@ -199,12 +207,12 @@ public class RayConfig {
}
headRedisPassword = config.getString("ray.redis.head-password");
redisPassword = config.getString("ray.redis.password");
// Raylet node manager port.
nodeManagerPort = config.getInt("ray.raylet.node-manager-port");
if (nodeManagerPort == 0) {
Preconditions.checkState(this.redisAddress == null,
"Java worker started by raylet should accept the node manager port from raylet.");
if (config.hasPath("ray.raylet.node-manager-port")) {
nodeManagerPort = config.getInt("ray.raylet.node-manager-port");
} else {
Preconditions.checkState(workerMode != WorkerType.WORKER,
"Worker started by raylet should accept the node manager port from raylet.");
nodeManagerPort = NetworkUtil.getUnusedPort();
}
@@ -72,6 +72,7 @@ public class GcsClient {
// and it's only one final state for each node in recorded table.
NodeInfo nodeInfo = new NodeInfo(
nodeId, data.getNodeManagerAddress(), data.getNodeManagerHostname(),
data.getNodeManagerPort(), data.getObjectStoreSocketName(), data.getRayletSocketName(),
data.getState() == GcsNodeInfo.GcsNodeState.ALIVE, new HashMap<>());
nodes.put(nodeId, nodeInfo);
}
@@ -7,9 +7,6 @@ ray {
// Basic configurations
// ----------------------
// IP of this node. If not provided, the IP will be automatically detected.
node-ip: ""
// Run mode, available options are:
//
// `SINGLE_PROCESS`: Ray is running in one single Java process, without Raylet backend,
@@ -91,9 +88,6 @@ ray {
// Object store configurations
// ----------------------------
object-store {
// RPC socket name of object store.
// If this is not set, the default name will be `${temp-dir}/session_xxx/sockets/object_store`.
socket-name: ""
// Initial size of the object store.
size: 10 MB
}
@@ -102,12 +96,6 @@ ray {
// Raylet configurations
// ----------------------------
raylet {
// RPC socket name of Raylet.
// If this is not set, the default name will be `${temp-dir}/session_xxx/sockets/raylet`.
socket-name: ""
// Listening port for node manager.
node-manager-port: 0
// See src/ray/ray_config_def.h for options.
config {
num_workers_per_process_java: 10