Improve Java logging (#8640)

This commit is contained in:
Hao Chen
2020-06-02 16:15:20 +08:00
committed by GitHub
parent 4cbbc15ca7
commit dc3e98890f
14 changed files with 158 additions and 219 deletions
@@ -4,6 +4,8 @@ import io.ray.api.runtime.RayRuntime;
import io.ray.api.runtime.RayRuntimeFactory;
import io.ray.runtime.config.RayConfig;
import io.ray.runtime.config.RunMode;
import io.ray.runtime.generated.Common.WorkerType;
import io.ray.runtime.util.LoggingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -12,12 +14,21 @@ import org.slf4j.LoggerFactory;
*/
public class DefaultRayRuntimeFactory implements RayRuntimeFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRayRuntimeFactory.class);
@Override
public RayRuntime createRayRuntime() {
RayConfig rayConfig = RayConfig.getInstance();
LoggingUtil.setupLogging(rayConfig);
Logger logger = LoggerFactory.getLogger(DefaultRayRuntimeFactory.class);
if (rayConfig.workerMode == WorkerType.WORKER) {
// Handle the uncaught exceptions thrown from user-spawned threads.
Thread.setDefaultUncaughtExceptionHandler((Thread t, Throwable e) -> {
logger.error(String.format("Uncaught worker exception in thread %s", t), e);
});
}
try {
logger.debug("Initializing runtime with config: {}", rayConfig);
AbstractRayRuntime innerRuntime = rayConfig.runMode == RunMode.SINGLE_PROCESS
? new RayDevRuntime(rayConfig)
: new RayNativeRuntime(rayConfig);
@@ -27,7 +38,7 @@ public class DefaultRayRuntimeFactory implements RayRuntimeFactory {
runtime.start();
return runtime;
} catch (Exception e) {
LOGGER.error("Failed to initialize ray runtime", e);
logger.error("Failed to initialize ray runtime, with config " + rayConfig, e);
throw new RuntimeException("Failed to initialize ray runtime", e);
}
}
@@ -96,7 +96,7 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
objectStore = new NativeObjectStore(workerContext);
taskSubmitter = new NativeTaskSubmitter();
LOGGER.info("RayNativeRuntime started with store {}, raylet {}",
LOGGER.debug("RayNativeRuntime started with store {}, raylet {}",
rayConfig.objectStoreSocketName, rayConfig.rayletSocketName);
}
@@ -114,7 +114,7 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
gcsClient = null;
}
RayConfig.reset();
LOGGER.info("RayNativeRuntime shutdown");
LOGGER.debug("RayNativeRuntime shutdown");
}
// For test purpose only
@@ -6,6 +6,7 @@ import com.google.common.collect.ImmutableList;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigRenderOptions;
import com.typesafe.config.ConfigValue;
import io.ray.api.id.JobId;
import io.ray.runtime.generated.Common.WorkerType;
@@ -18,8 +19,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Configurations of Ray runtime.
@@ -27,8 +26,6 @@ import org.slf4j.LoggerFactory;
*/
public class RayConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(RayConfig.class);
public static final String DEFAULT_CONFIG_FILE = "ray.default.conf";
public static final String CUSTOM_CONFIG_FILE = "ray.conf";
@@ -48,7 +45,6 @@ public class RayConfig {
private JobId jobId;
public String sessionDir;
public String logDir;
public final boolean redirectOutput;
public final List<String> libraryPath;
public final List<String> classpath;
public final List<String> jvmParameters;
@@ -136,8 +132,6 @@ public class RayConfig {
if (isDriver) {
if (!resources.containsKey("CPU")) {
int numCpu = Runtime.getRuntime().availableProcessors();
LOGGER.warn("No CPU resource is set in configuration, "
+ "setting it to the number of CPU cores: {}", numCpu);
resources.put("CPU", numCpu * 1.0);
}
}
@@ -153,8 +147,6 @@ public class RayConfig {
// Object store configurations.
objectStoreSize = config.getBytes("ray.object-store.size");
// Redirect output.
redirectOutput = config.getBoolean("ray.redirect-output");
// Library path.
libraryPath = config.getStringList("ray.library.path");
// Custom classpath.
@@ -219,11 +211,10 @@ public class RayConfig {
numWorkersPerProcess = config.getInt("ray.raylet.config.num_workers_per_process_java");
gcsServiceEnabled = System.getenv("RAY_GCS_SERVICE_ENABLED") == null ||
System.getenv("RAY_GCS_SERVICE_ENABLED").toLowerCase().equals("true");
System.getenv("RAY_GCS_SERVICE_ENABLED").toLowerCase().equals("true");
// Validate config.
validate();
LOGGER.debug("Created config: {}", this);
}
public void setRedisAddress(String redisAddress) {
@@ -269,27 +260,45 @@ public class RayConfig {
return sessionDir;
}
public Config getInternalConfig() {
return config;
}
/**
* Renders the config value as a HOCON string.
*/
public String render() {
// These items might be dynamically generated or mutated at runtime.
// Explicitly include them.
Map<String, Object> dynamic = new HashMap<>();
dynamic.put("ray.session-dir", sessionDir);
dynamic.put("ray.raylet.socket-name", rayletSocketName);
dynamic.put("ray.object-store.socket-name", objectStoreSocketName);
dynamic.put("ray.raylet.node-manager-port", nodeManagerPort);
dynamic.put("ray.redis.address", redisAddress);
dynamic.put("ray.job.resource-path", jobResourcePath);
Config toRender = ConfigFactory.parseMap(dynamic).withFallback(config);
return toRender.root().render(ConfigRenderOptions.concise());
}
private void updateSessionDir() {
// session dir
String localSessionDir = System.getProperty("ray.session-dir");
if (workerMode == WorkerType.DRIVER) {
Preconditions.checkState(localSessionDir == null);
final int minBound = 100000;
final int maxBound = 999999;
final String sessionName = String.format("session_%s_%d", DATE_TIME_FORMATTER.format(
LocalDateTime.now()), RANDOM.nextInt(maxBound - minBound) + minBound);
sessionDir = String.format("%s/%s", DEFAULT_TEMP_DIR, sessionName);
} else if (workerMode == WorkerType.WORKER) {
Preconditions.checkState(localSessionDir != null);
sessionDir = removeTrailingSlash(localSessionDir);
sessionDir = removeTrailingSlash(config.getString("ray.session-dir"));
} else {
throw new RuntimeException("Unknown worker type.");
}
// Log dir.
String localLogDir = null;
if (config.hasPath("ray.log-dir")) {
localLogDir = removeTrailingSlash(config.getString("ray.log-dir"));
if (config.hasPath("ray.logging.dir")) {
localLogDir = removeTrailingSlash(config.getString("ray.logging.dir"));
}
if (Strings.isNullOrEmpty(localLogDir)) {
logDir = String.format("%s/logs", sessionDir);
@@ -323,29 +332,7 @@ public class RayConfig {
@Override
public String toString() {
return "RayConfig{"
+ ", nodeIp='" + nodeIp + '\''
+ ", workerMode=" + workerMode
+ ", runMode=" + runMode
+ ", resources=" + resources
+ ", jobId=" + jobId
+ ", logDir='" + logDir + '\''
+ ", redirectOutput=" + redirectOutput
+ ", libraryPath=" + libraryPath
+ ", classpath=" + classpath
+ ", jvmParameters=" + jvmParameters
+ ", redisAddress='" + redisAddress + '\''
+ ", redisIp='" + redisIp + '\''
+ ", redisPort=" + redisPort
+ ", headRedisPort=" + headRedisPort
+ ", numberRedisShards=" + numberRedisShards
+ ", objectStoreSocketName='" + objectStoreSocketName + '\''
+ ", objectStoreSize=" + objectStoreSize
+ ", rayletSocketName='" + rayletSocketName + '\''
+ ", rayletConfigParameters=" + rayletConfigParameters
+ ", jobResourcePath='" + jobResourcePath + '\''
+ ", pythonWorkerCommand='" + pythonWorkerCommand + '\''
+ '}';
return render();
}
/**
@@ -357,16 +344,14 @@ public class RayConfig {
public static RayConfig create() {
ConfigFactory.invalidateCaches();
Config config = ConfigFactory.systemProperties();
String configPath = System.getProperty("ray.config");
String configPath = System.getProperty("ray.config-file");
if (Strings.isNullOrEmpty(configPath)) {
LOGGER.info("Loading config from \"ray.conf\" file in classpath.");
config = config.withFallback(ConfigFactory.load(CUSTOM_CONFIG_FILE));
} else {
LOGGER.info("Loading config from " + configPath + ".");
config = config.withFallback(ConfigFactory.parseFile(new File(configPath)));
}
config = config.withFallback(ConfigFactory.load(DEFAULT_CONFIG_FILE));
return new RayConfig(config);
return new RayConfig(config.withOnlyPath("ray"));
}
}
@@ -10,18 +10,18 @@ import io.ray.runtime.util.BinaryFileUtil;
import io.ray.runtime.util.ResourceUtil;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,17 +34,12 @@ public class RunManager {
private static final Logger LOGGER = LoggerFactory.getLogger(RunManager.class);
private static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("YYYY-MM-dd_HH-mm-ss");
private static final String WORKER_CLASS = "io.ray.runtime.runner.worker.DefaultWorker";
private static final String SESSION_LATEST = "session_latest";
private RayConfig rayConfig;
private Random random = new Random();
private List<Pair<String, Process>> processes;
private static final int KILL_PROCESS_WAIT_TIMEOUT_SECONDS = 1;
@@ -84,7 +79,7 @@ public class RunManager {
}
numAttempts++;
}
LOGGER.info("Process {} is now terminated.", name);
LOGGER.debug("Process {} is now terminated.", name);
}
/**
@@ -117,6 +112,22 @@ public class RunManager {
}
}
/**
* @return Log files for stdout and stderr.
*/
private Pair<File, File> getLogFiles(String logDir, String processName) {
int suffixIndex = 0;
while (true) {
String suffix = suffixIndex == 0 ? "" : "." + suffixIndex;
File stdout = new File(String.format("%s/%s%s.out", logDir, suffix, processName));
File stderr = new File(String.format("%s/%s%s.err", logDir, suffix, processName));
if (!stdout.exists() && !stderr.exists()) {
return ImmutablePair.of(stdout, stderr);
}
suffixIndex += 1;
}
}
/**
* Start a process.
*
@@ -134,15 +145,11 @@ public class RunManager {
String stdout = "";
String stderr = "";
if (rayConfig.redirectOutput) {
// Set stdout and stderr paths.
int logId = random.nextInt(10000);
String date = DATE_TIME_FORMATTER.format(LocalDateTime.now());
stdout = String.format("%s/%s-%s-%05d.out", rayConfig.logDir, name, date, logId);
stderr = String.format("%s/%s-%s-%05d.err", rayConfig.logDir, name, date, logId);
builder.redirectOutput(new File(stdout));
builder.redirectError(new File(stderr));
}
// Set stdout and stderr paths.
Pair<File, File> logFiles = getLogFiles(rayConfig.logDir, name);
builder.redirectOutput(logFiles.getLeft());
builder.redirectError(logFiles.getRight());
// Set environment variables.
if (env != null && !env.isEmpty()) {
builder.environment().putAll(env);
@@ -164,18 +171,14 @@ public class RunManager {
if (!p.isAlive()) {
String message = String.format("Failed to start %s. Exit code: %d.",
name, p.exitValue());
if (rayConfig.redirectOutput) {
message += String.format(" Logs are redirected to %s and %s.", stdout, stderr);
}
message += String.format(" Logs are redirected to %s and %s.", stdout, stderr);
throw new RuntimeException(message);
}
processes.add(Pair.of(name, p));
if (LOGGER.isInfoEnabled()) {
if (LOGGER.isDebugEnabled()) {
String message = String.format("%s process started.", name);
if (rayConfig.redirectOutput) {
message += String.format(" Logs are redirected to %s and %s.", stdout, stderr);
}
LOGGER.info(message);
message += String.format(" Logs are redirected to %s and %s.", stdout, stderr);
LOGGER.debug(message);
}
}
@@ -185,7 +188,7 @@ public class RunManager {
* @param isHead Whether this node is the head node. If true, redis server will be started.
*/
public void startRayProcesses(boolean isHead) {
LOGGER.info("Starting ray processes @ {}.", rayConfig.nodeIp);
LOGGER.debug("Starting ray processes @ {}.", rayConfig.nodeIp);
try {
if (isHead) {
startGcs();
@@ -243,7 +246,7 @@ public class RunManager {
String.format("--config_list=%s",
rayConfig.rayletConfigParameters.entrySet().stream()
.map(entry -> entry.getKey() + "," + entry.getValue()).collect(Collectors
.joining(","))),
.joining(","))),
String.format("--redis_password=%s", redisPasswordOption)
);
startProcess(command, null, "gcs_server");
@@ -274,7 +277,7 @@ public class RunManager {
command.add(password);
}
String name = shard == null ? "redis" : "redis-" + shard;
String name = shard == null ? "redis" : "redis-shard_" + shard;
startProcess(command, null, name);
try (Jedis client = new Jedis("127.0.0.1", port)) {
@@ -291,7 +294,7 @@ public class RunManager {
return ip + ":" + port;
}
private void startRaylet() {
private void startRaylet() throws IOException {
int hardwareConcurrency = Runtime.getRuntime().availableProcessors();
int maximumStartupConcurrency = Math.max(1,
Math.min(rayConfig.resources.getOrDefault("CPU", 0.0).intValue(), hardwareConcurrency));
@@ -336,7 +339,7 @@ public class RunManager {
return stream.filter(s -> !s.contains(" ")).collect(Collectors.joining(":"));
}
private String buildWorkerCommand() {
private String buildWorkerCommand() throws IOException {
List<String> cmd = new ArrayList<>();
cmd.add("java");
cmd.add("-classpath");
@@ -348,41 +351,11 @@ public class RunManager {
));
cmd.add(classpath);
// library path
String libraryPath = concatPath(rayConfig.libraryPath.stream());
cmd.add("-Djava.library.path=" + libraryPath);
// session path
cmd.add("-Dray.session-dir=" + rayConfig.sessionDir);
// logging path
if (rayConfig.redirectOutput) {
cmd.add("-Dray.logging.stdout=org.apache.log4j.varia.NullAppender");
cmd.add("-Dray.logging.file=org.apache.log4j.FileAppender");
int logId = random.nextInt(10000);
String date = DATE_TIME_FORMATTER.format(LocalDateTime.now());
String logFile = String.format("%s/worker-%s-%05d.out", rayConfig.logDir, date, logId);
cmd.add("-Dray.logging.file.path=" + logFile);
}
if (!Strings.isNullOrEmpty(rayConfig.jobResourcePath)) {
cmd.add("-Dray.job.resource-path=" + rayConfig.jobResourcePath);
}
// socket names
cmd.add("-Dray.raylet.socket-name=" + rayConfig.rayletSocketName);
cmd.add("-Dray.object-store.socket-name=" + rayConfig.objectStoreSocketName);
cmd.add("-Dray.raylet.node-manager-port=" + rayConfig.getNodeManagerPort());
// Config overwrite
cmd.add("-Dray.redis.address=" + rayConfig.getRedisAddress());
// redis password
if (!Strings.isNullOrEmpty(rayConfig.headRedisPassword)) {
cmd.add("-Dray.redis.password=" + rayConfig.headRedisPassword);
}
// Write current config to a file, and set the file path as Java worker's config file.
// This allows users to set worker config by setting driver's system properties.
File workerConfigFile = new File(rayConfig.sessionDir + "/java_worker.conf");
FileUtils.write(workerConfigFile, rayConfig.render(), Charset.defaultCharset());
cmd.add("-Dray.config-file=" + workerConfigFile.getAbsolutePath());
cmd.add("RAY_WORKER_RAYLET_CONFIG_PLACEHOLDER");
@@ -1,30 +0,0 @@
package io.ray.runtime.runner.worker;
import io.ray.api.Ray;
/**
* The main function of DefaultDriver.
*/
public class DefaultDriver {
//
// " --node-ip-address=" + ip
// + " --redis-address=" + redisAddress
// + " --driver-class" + className
//
public static void main(String[] args) {
try {
System.setProperty("ray.worker.mode", "DRIVER");
Ray.init();
String driverClass = null;
String driverArgs = null;
Class<?> cls = Class.forName(driverClass);
String[] argsArray = (driverArgs != null) ? driverArgs.split(",") : (new String[] {});
cls.getMethod("main", String[].class).invoke(null, (Object) argsArray);
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
}
}
@@ -2,30 +2,18 @@ package io.ray.runtime.runner.worker;
import io.ray.api.Ray;
import io.ray.runtime.RayRuntimeInternal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Default implementation of the worker process.
*/
public class DefaultWorker {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultWorker.class);
public static void main(String[] args) {
try {
System.setProperty("ray.worker.mode", "WORKER");
// Set run-mode to `CLUSTER` explicitly, to prevent the DefaultWorker to receive
// a wrong run-mode parameter through jvm options.
System.setProperty("ray.run-mode", "CLUSTER");
Thread.setDefaultUncaughtExceptionHandler((Thread t, Throwable e) -> {
LOGGER.error("Uncaught worker exception in thread {}: {}", t, e);
});
Ray.init();
LOGGER.info("Worker started.");
((RayRuntimeInternal) Ray.internal()).run();
} catch (Exception e) {
LOGGER.error("Failed to start worker.", e);
}
// Set run-mode to `CLUSTER` explicitly, to prevent the DefaultWorker to receive
// a wrong run-mode parameter through jvm options.
System.setProperty("ray.run-mode", "CLUSTER");
System.setProperty("ray.worker.mode", "WORKER");
Ray.init();
((RayRuntimeInternal) Ray.internal()).run();
}
}
}
@@ -0,0 +1,48 @@
package io.ray.runtime.util;
import com.typesafe.config.Config;
import io.ray.runtime.config.RayConfig;
import io.ray.runtime.generated.Common.WorkerType;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.RollingFileAppender;
import org.apache.log4j.WriterAppender;
public class LoggingUtil {
private static boolean setup = false;
public static synchronized void setupLogging(RayConfig rayConfig) {
if (setup) {
return;
}
setup = true;
WriterAppender appender;
Config config = rayConfig.getInternalConfig();
if (rayConfig.workerMode == WorkerType.DRIVER) {
// Logs of drivers are printed to console.
appender = new ConsoleAppender();
appender.setName("console");
} else {
// Logs of workers are printed to files.
RollingFileAppender rfAppender = new RollingFileAppender();
appender = rfAppender;
rfAppender.setName("file");
String logPath = rayConfig.logDir + "/java-worker-" + SystemUtil.pid() + ".log";
rfAppender.setFile(logPath);
rfAppender.setMaxFileSize(config.getString("ray.logging.max-file-size"));
rfAppender.setMaxBackupIndex(config.getInt("ray.logging.max-backup-files"));
}
Level level = Level.toLevel(config.getString("ray.logging.level"));
appender.setThreshold(level);
PatternLayout patternLayout = new PatternLayout(config.getString("ray.logging.pattern"));
appender.setLayout(patternLayout);
appender.activateOptions();
Logger.getLogger("io.ray").addAppender(appender);
}
}
@@ -3,45 +3,15 @@ package io.ray.runtime.util;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* some utilities for system process.
*/
public class SystemUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(SystemUtil.class);
static final ReentrantLock pidlock = new ReentrantLock();
static Integer pid;
public static String userHome() {
return System.getProperty("user.home");
}
public static String userDir() {
return System.getProperty("user.dir");
}
public static boolean startWithJar(Class<?> cls) {
return cls.getResource(cls.getSimpleName() + ".class").getFile().split("!")[0].endsWith(".jar");
}
public static boolean startWithJar(String clsName) {
Class<?> cls;
try {
cls = Class.forName(clsName);
return cls.getResource(cls.getSimpleName() + ".class").getFile().split("!")[0]
.endsWith(".jar");
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
LOGGER.error("error at SystemUtil startWithJar", e);
return false;
}
}
public static int pid() {
if (pid == null) {
pidlock.lock();
@@ -1,17 +0,0 @@
ray.logging.level=info
ray.logging.stdout=org.apache.log4j.ConsoleAppender
ray.logging.file=org.apache.log4j.varia.NullAppender
log4j.rootLogger=${ray.logging.level}, stdout, file
log4j.appender.stdout=${ray.logging.stdout}
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %p %c{1} [%t]: %m%n
# Set the file appender to null by default. If `ray.redirect-output` config is set to true,
# this appender will be set to a real file appender.
log4j.appender.file=${ray.logging.file}
log4j.appender.file.File=${ray.logging.file.path}
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %p %c{1} [%t]: %m%n
@@ -31,13 +31,20 @@ ray {
resource-path: ""
}
// Root dir of log files.
// If this is not set, the default log-dir will be `${temp-dir}/session_xxx/logs`.
log-dir: ""
// If true, output of worker processes will be redirected to log files.
// Otherwise, output will be printed to console.
redirect-output: true
// Configurations about logging.
logging {
// Level of logging for Java workers.
level: INFO
// Pattern of log messages.
pattern: "%d{yyyy-MM-dd HH:mm:ss,SSS} %p %c{1} [%t]: %m%n"
// Root directory of the log files.
// If this is not set, the default one will be `${temp-dir}/session_xxx/logs`.
dir: ""
// Maximum size that a log file is allowed to reach before being rolled over to backup files.
max-file-size: 500MB
// Maximum number of backup files to keep around.
max-backup-files: 10
}
// Custom worker jvm parameters.
worker.jvm-parameters: []