[core] Store Internal Config in GCS (#8921)

This commit is contained in:
Ian Rodney
2020-07-08 09:22:08 -07:00
committed by GitHub
parent 4da0e542d5
commit 9172f8c3a6
36 changed files with 484 additions and 212 deletions
@@ -56,6 +56,15 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
} catch (IOException e) {
throw new RuntimeException("Failed to create the log directory.", e);
}
if (rayConfig.getRedisAddress() != null) {
GcsClient tempGcsClient =
new GcsClient(rayConfig.getRedisAddress(), rayConfig.redisPassword);
for (Map.Entry<String, String> entry :
tempGcsClient.getInternalConfig().entrySet()) {
rayConfig.rayletConfigParameters.put(entry.getKey(), entry.getValue());
}
}
}
private static void resetLibraryPath(RayConfig rayConfig) {
@@ -93,6 +93,17 @@ public class GcsClient {
return new ArrayList<>(nodes.values());
}
/**
 * Fetches the internal config bytes through the global state accessor and decodes them as a
 * {@code Gcs.StoredConfig} protobuf message.
 *
 * @return The config map carried by the decoded {@code Gcs.StoredConfig}.
 * @throws RuntimeException If the received bytes cannot be parsed as a
 *     {@code Gcs.StoredConfig}; the original parse exception is attached as the cause.
 */
public Map<String, String> getInternalConfig() {
  byte[] conf = globalStateAccessor.getInternalConfig();
  try {
    return Gcs.StoredConfig.parseFrom(conf).getConfigMap();
  } catch (InvalidProtocolBufferException e) {
    // Chain the parse failure so the underlying reason and stack trace are not lost.
    throw new RuntimeException("Received invalid internal config protobuf from GCS.", e);
  }
}
private Map<String, Double> getResourcesForClient(UniqueId clientId) {
byte[] resourceMapBytes = globalStateAccessor.getNodeResourceInfo(clientId);
Gcs.ResourceMap resourceMap;
@@ -78,6 +78,14 @@ public class GlobalStateAccessor {
}
}
/**
 * @return The bytes returned by the native accessor for the internal config.
 */
public byte[] getInternalConfig() {
  synchronized (GlobalStateAccessor.class) {
    // A zero pointer fails the state check below, before the native call is made.
    final boolean accessorAlive = globalStateAccessorNativePointer != 0;
    Preconditions.checkState(accessorAlive,
        "Get internal config when global state accessor have been destroyed.");
    final byte[] configBytes = nativeGetInternalConfig(globalStateAccessorNativePointer);
    return configBytes;
  }
}
/**
* @return A list of actor info with ActorInfo protobuf schema.
*/
@@ -135,6 +143,8 @@ public class GlobalStateAccessor {
// Methods implemented in native code; each takes the native accessor pointer as its
// first argument.
private native byte[] nativeGetNodeResourceInfo(long nativePtr, byte[] nodeId);
// Backs getInternalConfig(), which passes globalStateAccessorNativePointer as nativePtr.
private native byte[] nativeGetInternalConfig(long nativePtr);
private native List<byte[]> nativeGetAllActorInfo(long nativePtr);
private native byte[] nativeGetActorInfo(long nativePtr, byte[] actorId);
@@ -194,7 +194,7 @@ public class RunManager {
startGcs();
}
startObjectStore();
startRaylet();
startRaylet(isHead);
LOGGER.info("All processes started @ {}.", rayConfig.nodeIp);
} catch (Exception e) {
// Clean up started processes.
@@ -292,7 +292,7 @@ public class RunManager {
return ip + ":" + port;
}
private void startRaylet() throws IOException {
private void startRaylet(boolean isHead) throws IOException {
int hardwareConcurrency = Runtime.getRuntime().availableProcessors();
int maximumStartupConcurrency = Math.max(1,
Math.min(rayConfig.resources.getOrDefault("CPU", 0.0).intValue(), hardwareConcurrency));
@@ -325,7 +325,8 @@ public class RunManager {
.collect(Collectors.joining(","))),
String.format("--python_worker_command=%s", buildPythonWorkerCommand()),
String.format("--java_worker_command=%s", buildWorkerCommand()),
String.format("--redis_password=%s", redisPasswordOption)
String.format("--redis_password=%s", redisPasswordOption),
isHead ? "--head_node" : ""
);
startProcess(command, null, "raylet");