diff --git a/java/README.rst b/java/README.rst index 5d6e0ab4c..bfa8ac641 100644 --- a/java/README.rst +++ b/java/README.rst @@ -1,18 +1,15 @@ -This directory contains the java worker, with the following components. - -- java/api: Ray API definition -- java/common: utilities -- java/runtime-common: common implementation of the runtime in worker -- java/runtime-dev: a pure-java mock implementation of the runtime for - fast development -- java/runtime-native: a native implementation of the runtime -- java/test: various tests -- src/local\_scheduler/lib/java: JNI client library for local scheduler -- src/plasma/lib/java: JNI client library for plasma storage - Quick start =========== +Configuration +------------- +Ray will read your configurations in the following order: + +* Java system properties: e.g., ``-Dray.home=/path/to/ray``. +* A ``ray.conf`` file in the classpath: `example `_. + +For all available config items and default values, see `this file `_. + Starting Ray ------------ diff --git a/java/api/pom.xml b/java/api/pom.xml index 04a3d053c..ebf3f7c51 100644 --- a/java/api/pom.xml +++ b/java/api/pom.xml @@ -14,4 +14,11 @@ java api for ray jar + + + + org.slf4j + slf4j-log4j12 + + diff --git a/java/api/src/main/java/org/ray/api/Ray.java b/java/api/src/main/java/org/ray/api/Ray.java index a7d6ed313..053f01d55 100644 --- a/java/api/src/main/java/org/ray/api/Ray.java +++ b/java/api/src/main/java/org/ray/api/Ray.java @@ -2,7 +2,6 @@ package org.ray.api; import java.util.List; import org.ray.api.id.UniqueId; -import org.ray.api.runtime.DefaultRayRuntimeFactory; import org.ray.api.runtime.RayRuntime; import org.ray.api.runtime.RayRuntimeFactory; @@ -17,7 +16,14 @@ public final class Ray extends RayCall { * Initialize Ray runtime with the default runtime implementation. */ public static void init() { - init(new DefaultRayRuntimeFactory()); + try { + Class clz = Class.forName("org.ray.runtime.DefaultRayRuntimeFactory"); + RayRuntimeFactory factory = (RayRuntimeFactory) clz.newInstance(); + init(factory); + } catch (Exception e) { + throw new RuntimeException("Failed to initialize Ray runtime.", e); + } + } /** diff --git a/java/api/src/main/java/org/ray/api/runtime/DefaultRayRuntimeFactory.java b/java/api/src/main/java/org/ray/api/runtime/DefaultRayRuntimeFactory.java deleted file mode 100644 index 13ff9205a..000000000 --- a/java/api/src/main/java/org/ray/api/runtime/DefaultRayRuntimeFactory.java +++ /dev/null @@ -1,22 +0,0 @@ -package org.ray.api.runtime; - -import java.lang.reflect.Method; - -/** - * The default Ray runtime factory. It produces an instance of AbstractRayRuntime. - */ -public class DefaultRayRuntimeFactory implements RayRuntimeFactory { - - @Override - public RayRuntime createRayRuntime() { - try { - Method m = Class.forName("org.ray.runtime.AbstractRayRuntime").getDeclaredMethod("init"); - m.setAccessible(true); - RayRuntime runtime = (RayRuntime) m.invoke(null); - m.setAccessible(false); - return runtime; - } catch (Exception e) { - throw new RuntimeException("Failed to initialize ray runtime", e); - } - } -} diff --git a/java/cli/src/main/java/org/ray/cli/CommandStart.java b/java/cli/src/main/java/org/ray/cli/CommandStart.java index 218caf1be..f476373b2 100644 --- a/java/cli/src/main/java/org/ray/cli/CommandStart.java +++ b/java/cli/src/main/java/org/ray/cli/CommandStart.java @@ -12,10 +12,4 @@ public class CommandStart { @Parameter(names = "--head", description = "start the head node") public boolean head; - @Parameter(names = "--config", description = "the config file of ray") - public String config = ""; - - @Parameter(names = "--overwrite", description = "the overwrite items of config") - public String overwrite = ""; - } diff --git a/java/cli/src/main/java/org/ray/cli/CommandSubmit.java b/java/cli/src/main/java/org/ray/cli/CommandSubmit.java deleted file mode 100644 index cdef011ce..000000000 --- a/java/cli/src/main/java/org/ray/cli/CommandSubmit.java +++ /dev/null @@ -1,27 +0,0 @@ -package org.ray.cli; - -import com.beust.jcommander.Parameter; -import com.beust.jcommander.Parameters; - -/** - * Arguments for command submit. - */ -@Parameters(separators = "= ", commandDescription = "submit a job to ray cluster") -public class CommandSubmit { - - @Parameter(names = "--package", description = "java jar package zip file", required = true) - public String packageZip; - - @Parameter(names = "--class", description = "java class name", required = true) - public String className; - - @Parameter(names = "--args", description = "arguments for the java class") - public String classArgs; - - @Parameter(names = "--config", description = "the config file of ray") - public String config; - - @Parameter(names = "--redis-address", description = "ip:port for redis service", required = true) - public String redisAddress; - -} diff --git a/java/cli/src/main/java/org/ray/cli/RayCli.java b/java/cli/src/main/java/org/ray/cli/RayCli.java index 7c39030fd..d79262a06 100644 --- a/java/cli/src/main/java/org/ray/cli/RayCli.java +++ b/java/cli/src/main/java/org/ray/cli/RayCli.java @@ -2,20 +2,10 @@ package org.ray.cli; import com.beust.jcommander.JCommander; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import org.ray.api.id.UniqueId; -import org.ray.runtime.config.PathConfig; -import org.ray.runtime.config.RayParameters; -import org.ray.runtime.config.RunMode; -import org.ray.runtime.gcs.KeyValueStoreLink; -import org.ray.runtime.gcs.RedisClient; -import org.ray.runtime.gcs.StateStoreProxy; -import org.ray.runtime.gcs.StateStoreProxyImpl; +import org.ray.runtime.config.RayConfig; import org.ray.runtime.runner.RunManager; -import org.ray.runtime.runner.worker.DefaultDriver; -import org.ray.runtime.util.config.ConfigReader; -import org.ray.runtime.util.logger.RayLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -23,179 +13,69 @@ import org.ray.runtime.util.logger.RayLog; */ public class RayCli { + private static final Logger LOGGER = LoggerFactory.getLogger(RayCli.class); + private static RayCliArgs rayArgs = new RayCliArgs(); - private static RunManager startRayHead(RayParameters params, PathConfig paths, - ConfigReader configReader) { - RunManager manager = new RunManager(params, paths, configReader); - + private static RunManager startRayHead() { + RayConfig rayConfig = RayConfig.create(); + RunManager manager = new RunManager(rayConfig); try { - manager.startRayHead(); + manager.startRayProcesses(true); } catch (Exception e) { - e.printStackTrace(); - RayLog.core.error("error at RayCli startRayHead", e); - throw new RuntimeException("Ray head node start failed", e); + LOGGER.error("Failed to start head node.", e); + throw new RuntimeException("Failed to start Ray head node.", e); } - - RayLog.core.info("Started Ray head node. Redis address: {}", manager.info().redisAddress); + LOGGER.info("Ray head node started. Redis address is {}", rayConfig.getRedisAddress()); return manager; } - private static RunManager startRayNode(RayParameters params, PathConfig paths, - ConfigReader configReader) { - RunManager manager = new RunManager(params, paths, configReader); - + private static RunManager startRayNode() { + RayConfig rayConfig = RayConfig.create(); + RunManager manager = new RunManager(rayConfig); try { - manager.startRayNode(); + manager.startRayProcesses(false); } catch (Exception e) { - e.printStackTrace(); - RayLog.core.error("error at RayCli startRayNode", e); - throw new RuntimeException("Ray work node start failed, err = " + e.getMessage()); + LOGGER.error("Failed to start work node.", e); + throw new RuntimeException("Failed to start work node.", e); } - RayLog.core.info("Started Ray work node."); + LOGGER.info("Ray work node started."); return manager; } - private static RunManager startProcess(CommandStart cmdStart, ConfigReader config) { - PathConfig paths = new PathConfig(config); - RayParameters params = new RayParameters(config); - - // Init RayLog before using it. - RayLog.init(params.log_dir); - - RayLog.core.info("Using IP address {} for this node.", params.node_ip_address); + private static RunManager startProcess(CommandStart cmdStart) { RunManager manager; if (cmdStart.head) { - manager = startRayHead(params, paths, config); + manager = startRayHead(); } else { - manager = startRayNode(params, paths, config); + manager = startRayNode(); } return manager; } - private static void start(CommandStart cmdStart, ConfigReader reader) { - startProcess(cmdStart, reader); + private static void start(CommandStart cmdStart) { + startProcess(cmdStart); } private static void stop(CommandStop cmdStop) { String[] cmd = {"/bin/sh", "-c", ""}; - - cmd[2] = "killall global_scheduler local_scheduler plasma_store plasma_manager"; - try { - Runtime.getRuntime().exec(cmd); - } catch (IOException e) { - RayLog.core.warn("exception in killing ray processes"); - } - - cmd[2] = "kill $(ps aux | grep redis-server | grep -v grep | " + cmd[2] = "kill $(ps aux | grep ray | grep -v grep | " + "awk \'{ print $2 }\') 2> /dev/null"; try { Runtime.getRuntime().exec(cmd); } catch (IOException e) { - RayLog.core.warn("exception in killing ray processes"); - } - - cmd[2] = "kill -9 $(ps aux | grep DefaultWorker | grep -v grep | " - + "awk \'{ print $2 }\') 2> /dev/null"; - try { - Runtime.getRuntime().exec(cmd); - } catch (IOException e) { - RayLog.core.warn("exception in killing ray processes"); + LOGGER.error("Exception in killing ray processes.", e); } } - private static String[] buildRayRuntimeArgs(CommandSubmit cmdSubmit) { - - if (cmdSubmit.redisAddress == null) { - throw new RuntimeException( - "--redis-address must be specified to submit a job"); - } - - List argList = new ArrayList(); - String section = "ray.java.start."; - String overwrite = "--overwrite=" - + section + "redis_address=" + cmdSubmit.redisAddress + ";" - + section + "run_mode=" + "CLUSTER"; - - argList.add(overwrite); - - if (cmdSubmit.config != null) { - String config = "--config=" + cmdSubmit.config; - argList.add(config); - } - - String[] args = new String[argList.size()]; - argList.toArray(args); - - return args; - } - - private static void submit(CommandSubmit cmdSubmit, String configPath) throws Exception { - ConfigReader config = new ConfigReader(configPath, "ray.java.start.deploy=true"); - PathConfig paths = new PathConfig(config); - RayParameters params = new RayParameters(config); - params.redis_address = cmdSubmit.redisAddress; - params.run_mode = RunMode.CLUSTER; - - KeyValueStoreLink kvStore = new RedisClient(); - kvStore.setAddr(cmdSubmit.redisAddress); - StateStoreProxy stateStoreProxy = new StateStoreProxyImpl(kvStore); - stateStoreProxy.initializeGlobalState(); - - // Init RayLog before using it. - RayLog.init(params.log_dir); - UniqueId appId = params.driver_id; - String appDir = "/tmp/" + cmdSubmit.className; - - // Start driver process. - RunManager runManager = new RunManager(params, paths, config); - Process proc = runManager.startDriver( - DefaultDriver.class.getName(), - cmdSubmit.redisAddress, - appId, - appDir, - params.node_ip_address, - cmdSubmit.className, - cmdSubmit.classArgs, - "", - null); - - if (null == proc) { - RayLog.rapp.error("Failed to start driver."); - return; - } - - RayLog.rapp.info("Driver started."); - } - - private static String getConfigPath(String config) { - String configPath; - - if (config != null && !config.equals("")) { - configPath = config; - } else { - configPath = System.getenv("RAY_CONFIG"); - if (configPath == null) { - configPath = System.getProperty("ray.config"); - } - if (configPath == null) { - throw new RuntimeException( - "Please set config file path in env RAY_CONFIG or property ray.config"); - } - } - return configPath; - } - - public static void main(String[] args) throws Exception { + public static void main(String[] args) { CommandStart cmdStart = new CommandStart(); CommandStop cmdStop = new CommandStop(); - CommandSubmit cmdSubmit = new CommandSubmit(); JCommander rayCommander = JCommander.newBuilder().addObject(rayArgs) .addCommand("start", cmdStart) .addCommand("stop", cmdStop) - .addCommand("submit", cmdSubmit) .build(); rayCommander.parse(args); @@ -210,21 +90,13 @@ public class RayCli { System.exit(0); } - String configPath; switch (cmd) { - case "start": { - configPath = getConfigPath(cmdStart.config); - ConfigReader config = new ConfigReader(configPath, cmdStart.overwrite); - start(cmdStart, config); - } - break; + case "start": + start(cmdStart); + break; case "stop": stop(cmdStop); break; - case "submit": - configPath = getConfigPath(cmdSubmit.config); - submit(cmdSubmit, configPath); - break; default: rayCommander.usage(); } diff --git a/java/doc/installation.rst b/java/doc/installation.rst index 41189c0ae..fca3b12e7 100644 --- a/java/doc/installation.rst +++ b/java/doc/installation.rst @@ -57,5 +57,4 @@ Run tests :: # in `ray/java` directory - export RAY_CONFIG=ray.config.ini mvn test diff --git a/java/example.conf b/java/example.conf new file mode 100644 index 000000000..ca42b3361 --- /dev/null +++ b/java/example.conf @@ -0,0 +1,29 @@ +# This is an example ray config file. +# To use this file, copy it to your classpath and rename it to 'ray.conf'. + +# For all available config items and default values, +# see 'java/runtime/src/main/resources/ray.default.conf'. +# For config file format, see 'https://github.com/lightbend/config/blob/master/HOCON.md'. + +ray { + // This is the path to the directory where Ray is installed, e.g., + // something like /home/ubmutu/ray. This can be an absolute path or + // a relative path from the current working directory. + home = "/path/to/your/ray/home" + + // Run mode, available options are: + // + // `SINGLE_PROCESS`: Ray is running in one single Java process, without Raylet backend, + // object store, and GCS. It's useful for debug. + // `CLUSTER`: Ray is running on one or more nodes, with multiple processes. + run-mode = CLUSTER + + // Available resources on this node. + resources: "CPU:4,GPU:0" + + // The address of the redis server to connect, in format `ip:port`. + // If not provided, Ray processes will be started locally, including + // Redis server, Raylet and object store. + redis.address = "" + +} diff --git a/java/pom.xml b/java/pom.xml index be17781f6..d1c7aaee3 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -95,6 +95,11 @@ slf4j-log4j12 ${slf4j.version} + + com.typesafe + config + 1.3.2 + net.java.dev.jna jna diff --git a/java/ray.config.ini b/java/ray.config.ini deleted file mode 100644 index f4eb4550d..000000000 --- a/java/ray.config.ini +++ /dev/null @@ -1,133 +0,0 @@ -[ray] -ray_protocol_version = 0x0000000000000000 -heartbeat_timeout_milliseconds = 1000 -num_heartbeats_timeout = 100 -get_timeout_milliseconds = 1000 -worker_get_request_size = 1000 -worker_fetch_request_size = 1000 -num_connect_attempts = 50 -connect_timeout_milliseconds = 1000 -local_scheduler_fetch_timeout_milliseconds = 1000 -local_scheduler_reconstruction_timeout_milliseconds = 1000 -max_num_to_reconstruct = 1000 -local_scheduler_fetch_request_size = 10000 -kill_worker_timeout_milliseconds = 1000 -manager_timeout_milliseconds = 1000 -buf_size = 4096 -max_time_for_handler_milliseconds = 1000 -size_limit = 100 -num_elements_limit = 1000 -max_time_for_loop = 1000 -redis_db_connect_retries = 50 -redis_db_connect_wait_milliseconds = 1000 -plasma_default_release_delay = 0 -L3_cache_size_bytes = 100000000 -simple_fail_over = false -;store_evict_soft_max_count = 10 - -[ray.java] -;network_interface = en0 - -[ray.java.start] - -; run mode for this app SINGLE_PROCESS | SINGLE_BOX | CLUSTER -;run_mode = SINGLE_PROCESS -run_mode = SINGLE_BOX - -; worker mode for this app DRIVER | WORKER | NONE -worker_mode = DRIVER - -; number of workers initially started -num_workers = 2 - -driver_id = 0123456789abcdef0123456789abcdef01234567 - -redis_port = 34111 - -max_submit_task_buffer_size_bytes = 51200 - -default_first_check_timeout_ms = 1000 - -default_get_check_interval_ms = 5000 - -;jvm_parameters = -XX:+TraceClassLoading - -object_store_occupied_memory_MB = 2 - -deploy = false - -onebox_delay_seconds_before_run_app_logic = 0 - -static_resources = CPU:4,GPU:0 - -; java class which main is served as the driver in a java worker -driver_class = - -; arguments for the java class main function which is served at the driver -; the arguments are separated by ',' -driver_args = - -[ray.java.start.job] - -[ray.java.path.classes.source] -%CONFIG_FILE_DIR%/api/target/classes = -%CONFIG_FILE_DIR%/api/target/test-classes = - -%CONFIG_FILE_DIR%/runtime/target/classes = -%CONFIG_FILE_DIR%/runtime/target/test-classes = - -%CONFIG_FILE_DIR%/tutorial/target/classes = - -%CONFIG_FILE_DIR%/test/target/classes = -%CONFIG_FILE_DIR%/test/target/test-classes = -%CONFIG_FILE_DIR%/test/lib/* = - -[ray.java.path.classes.package] -%CONFIG_FILE_DIR%/api/target/ray-api-1.0.jar = -%CONFIG_FILE_DIR%/runtime/target/ray-runtime-1.0.jar = - -%CONFIG_FILE_DIR%/test/target/ray-test-1.0.jar = -%CONFIG_FILE_DIR%/test/target/test-classes = -%CONFIG_FILE_DIR%/test/lib/* = - -%CONFIG_FILE_DIR%/tutorial/target/ray-tutorial-1.0.jar = - -[ray.java.path.classes.deploy] -%CONFIG_FILE_DIR%/java/lib/* = - -[ray.java.path.jni.package] -%CONFIG_FILE_DIR%/../build/src/plasma = -%CONFIG_FILE_DIR%/../build/src/local_scheduler = - -[ray.java.path.jni.deploy] -%CONFIG_FILE_DIR%/native/lib = - -[ray.java.path.source] -redis_server = %CONFIG_FILE_DIR%/../build/src/common/thirdparty/redis/src/redis-server -redis_module = %CONFIG_FILE_DIR%/../build/src/common/redis_module/libray_redis_module.so -store = %CONFIG_FILE_DIR%/../build/src/plasma/plasma_store_server -raylet = %CONFIG_FILE_DIR%/../build/src/ray/raylet/raylet -python_dir = %CONFIG_FILE_DIR%/../build/ -java_runtime_rewritten_jars_dir = -java_class_paths = ray.java.path.classes.source -java_jnilib_paths = ray.java.path.jni.package - -[ray.java.path.package] -redis_server = %CONFIG_FILE_DIR%/../build/src/common/thirdparty/redis/src/redis-server -redis_module = %CONFIG_FILE_DIR%/../build/src/common/redis_module/libray_redis_module.so -store = %CONFIG_FILE_DIR%/../build/src/plasma/plasma_store_server -raylet = %CONFIG_FILE_DIR%/../build/src/ray/raylet/raylet -python_dir = %CONFIG_FILE_DIR%/../build/ -java_runtime_rewritten_jars_dir = -java_class_paths = ray.java.path.classes.package -java_jnilib_paths = ray.java.path.jni.package - -[ray.java.path.deploy] -redis_server = %CONFIG_FILE_DIR%/native/bin/redis-server -redis_module = %CONFIG_FILE_DIR%/native/lib/libray_redis_module.so -store = %CONFIG_FILE_DIR%/native/bin/plasma_store_server -raylet = %CONFIG_FILE_DIR%/native/bin/raylet -python_dir = %CONFIG_FILE_DIR%/python -java_runtime_rewritten_jars_dir = %CONFIG_FILE_DIR%/java/lib/ -java_class_paths = ray.java.path.classes.deploy -java_jnilib_paths = ray.java.path.jni.deploy diff --git a/java/runtime/pom.xml b/java/runtime/pom.xml index 34a868d5b..bc40ac776 100644 --- a/java/runtime/pom.xml +++ b/java/runtime/pom.xml @@ -21,6 +21,10 @@ ray-api ${project.version} + + com.typesafe + config + org.apache.commons commons-lang3 diff --git a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java index 50189b82b..b035f3b52 100644 --- a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java @@ -6,17 +6,14 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.arrow.plasma.ObjectStoreLink; import org.apache.commons.lang3.tuple.Pair; -import org.ray.api.Ray; import org.ray.api.RayActor; import org.ray.api.RayObject; import org.ray.api.WaitResult; import org.ray.api.function.RayFunc; import org.ray.api.id.UniqueId; import org.ray.api.runtime.RayRuntime; -import org.ray.runtime.config.PathConfig; -import org.ray.runtime.config.RayParameters; +import org.ray.runtime.config.RayConfig; import org.ray.runtime.functionmanager.FunctionManager; import org.ray.runtime.functionmanager.RayFunction; import org.ray.runtime.objectstore.ObjectStoreProxy; @@ -26,7 +23,6 @@ import org.ray.runtime.task.ArgumentsBuilder; import org.ray.runtime.task.TaskSpec; import org.ray.runtime.util.ResourceUtil; import org.ray.runtime.util.UniqueIdHelper; -import org.ray.runtime.util.config.ConfigReader; import org.ray.runtime.util.exception.TaskExecutionException; import org.ray.runtime.util.logger.RayLog; @@ -35,132 +31,32 @@ import org.ray.runtime.util.logger.RayLog; */ public abstract class AbstractRayRuntime implements RayRuntime { - public static ConfigReader configReader; - protected static AbstractRayRuntime ins = null; - protected static RayParameters params = null; - private static boolean fromRayInit = false; + private static final int GET_TIMEOUT_MS = 1000; + private static final int FETCH_BATCH_SIZE = 1000; + + protected RayConfig rayConfig; + protected WorkerContext workerContext; protected Worker worker; protected RayletClient rayletClient; protected ObjectStoreProxy objectStoreProxy; protected FunctionManager functionManager; - protected PathConfig pathConfig; /** * Actor ID -> local actor instance. */ Map localActors = new HashMap<>(); - // app level Ray.init() - // make it private so there is no direct usage but only from Ray.init - private static AbstractRayRuntime init() { - if (ins == null) { - try { - fromRayInit = true; - AbstractRayRuntime.init(null, null); - fromRayInit = false; - } catch (Exception e) { - e.printStackTrace(); - throw new RuntimeException("Ray.init failed", e); - } - } - return ins; - } - - // engine level AbstractRayRuntime.init(xx, xx) - // updateConfigStr is sth like section1.k1=v1;section2.k2=v2 - public static AbstractRayRuntime init(String configPath, String updateConfigStr) - throws Exception { - if (ins == null) { - if (configPath == null) { - configPath = System.getenv("RAY_CONFIG"); - if (configPath == null) { - configPath = System.getProperty("ray.config"); - } - if (configPath == null) { - throw new Exception( - "Please set config file path in env RAY_CONFIG or property ray.config"); - } - } - configReader = new ConfigReader(configPath, updateConfigStr); - AbstractRayRuntime.params = new RayParameters(configReader); - - RayLog.init(params.log_dir); - assert RayLog.core != null; - - ins = instantiate(params); - assert (ins != null); - - if (!fromRayInit) { - Ray.init(); // assign Ray._impl - } - } - return ins; - } - - // init with command line args - // --config=ray.config.ini --overwrite=updateConfigStr - public static AbstractRayRuntime init(String[] args) throws Exception { - String config = null; - String updateConfig = null; - for (String arg : args) { - if (arg.startsWith("--config=")) { - config = arg.substring("--config=".length()); - } else if (arg.startsWith("--overwrite=")) { - updateConfig = arg.substring("--overwrite=".length()); - } else { - throw new RuntimeException("Input argument " + arg - + " is not recognized, please use --overwrite to merge it into config file"); - } - } - return init(config, updateConfig); - } - - protected void init( - RayletClient slink, - ObjectStoreLink plink, - PathConfig pathManager - ) { - pathConfig = pathManager; - + public AbstractRayRuntime(RayConfig rayConfig) { + this.rayConfig = rayConfig; functionManager = new FunctionManager(); - rayletClient = slink; - - objectStoreProxy = new ObjectStoreProxy(plink); worker = new Worker(this); - } - - private static AbstractRayRuntime instantiate(RayParameters params) { - AbstractRayRuntime runtime; - if (params.run_mode.isNativeRuntime()) { - runtime = new RayNativeRuntime(); - } else { - runtime = new RayDevRuntime(); - } - - RayLog.core - .info("Start " + runtime.getClass().getName() + " with " + params.run_mode.toString()); - try { - runtime.start(params); - } catch (Exception e) { - RayLog.core.error("Failed to init RayRuntime", e); - System.exit(-1); - } - - return runtime; + workerContext = new WorkerContext(rayConfig.workerMode, rayConfig.driverId); } /** - * start runtime. + * Start runtime. */ - public abstract void start(RayParameters params) throws Exception; - - public static AbstractRayRuntime getInstance() { - return ins; - } - - public static RayParameters getParams() { - return params; - } + public abstract void start() throws Exception; @Override public abstract void shutdown(); @@ -168,14 +64,14 @@ public abstract class AbstractRayRuntime implements RayRuntime { @Override public RayObject put(T obj) { UniqueId objectId = UniqueIdHelper.computePutId( - WorkerContext.currentTask().taskId, WorkerContext.nextPutIndex()); + workerContext.getCurrentTask().taskId, workerContext.nextPutIndex()); put(objectId, obj); return new RayObjectImpl<>(objectId); } public void put(UniqueId objectId, T obj) { - UniqueId taskId = WorkerContext.currentTask().taskId; + UniqueId taskId = workerContext.getCurrentTask().taskId; RayLog.core.info("Putting object {}, for task {} ", objectId, taskId); objectStoreProxy.put(objectId, obj, null); } @@ -189,21 +85,21 @@ public abstract class AbstractRayRuntime implements RayRuntime { @Override public List get(List objectIds) { boolean wasBlocked = false; - UniqueId taskId = WorkerContext.currentTask().taskId; + UniqueId taskId = workerContext.getCurrentTask().taskId; try { int numObjectIds = objectIds.size(); // Do an initial fetch for remote objects. List> fetchBatches = - splitIntoBatches(objectIds, params.worker_fetch_request_size); + splitIntoBatches(objectIds, FETCH_BATCH_SIZE); for (List batch : fetchBatches) { rayletClient.reconstructObjects(batch, true); } // Get the objects. We initially try to get the objects immediately. List> ret = objectStoreProxy - .get(objectIds, params.default_first_check_timeout_ms, false); + .get(objectIds, GET_TIMEOUT_MS, false); assert ret.size() == numObjectIds; // Mapping the object IDs that we haven't gotten yet to their original index in objectIds. @@ -220,14 +116,14 @@ public abstract class AbstractRayRuntime implements RayRuntime { while (unreadys.size() > 0) { List unreadyList = new ArrayList<>(unreadys.keySet()); List> reconstructBatches = - splitIntoBatches(unreadyList, params.worker_fetch_request_size); + splitIntoBatches(unreadyList, FETCH_BATCH_SIZE); for (List batch : reconstructBatches) { rayletClient.reconstructObjects(batch, false); } List> results = objectStoreProxy - .get(unreadyList, params.default_get_check_interval_ms, false); + .get(unreadyList, GET_TIMEOUT_MS, false); // Remove any entries for objects we received during this iteration so we // don't retrieve the same object twice. @@ -341,10 +237,10 @@ public abstract class AbstractRayRuntime implements RayRuntime { */ private TaskSpec createTaskSpec(RayFunc func, RayActorImpl actor, Object[] args, boolean isActorCreationTask) { - final TaskSpec current = WorkerContext.currentTask(); + final TaskSpec current = workerContext.getCurrentTask(); UniqueId taskId = rayletClient.generateTaskId(current.driverId, current.taskId, - WorkerContext.nextCallIndex()); + workerContext.nextCallIndex()); int numReturns = actor.getId().isNil() ? 1 : 2; UniqueId[] returnIds = genReturnIds(taskId, numReturns); @@ -378,6 +274,10 @@ public abstract class AbstractRayRuntime implements RayRuntime { return worker; } + public WorkerContext getWorkerContext() { + return workerContext; + } + public RayletClient getRayletClient() { return rayletClient; } diff --git a/java/runtime/src/main/java/org/ray/runtime/DefaultRayRuntimeFactory.java b/java/runtime/src/main/java/org/ray/runtime/DefaultRayRuntimeFactory.java new file mode 100644 index 000000000..366937b10 --- /dev/null +++ b/java/runtime/src/main/java/org/ray/runtime/DefaultRayRuntimeFactory.java @@ -0,0 +1,40 @@ +package org.ray.runtime; + +import com.google.common.base.Strings; +import java.lang.reflect.Field; +import java.util.stream.Collectors; +import org.ray.api.runtime.RayRuntime; +import org.ray.api.runtime.RayRuntimeFactory; +import org.ray.runtime.config.RayConfig; +import org.ray.runtime.config.RunMode; +import org.ray.runtime.util.logger.RayLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The default Ray runtime factory. It produces an instance of AbstractRayRuntime. + */ +public class DefaultRayRuntimeFactory implements RayRuntimeFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRayRuntimeFactory.class); + + @Override + public RayRuntime createRayRuntime() { + RayLog.init(); + RayConfig rayConfig = RayConfig.create(); + try { + AbstractRayRuntime runtime; + if (rayConfig.runMode == RunMode.SINGLE_PROCESS) { + runtime = new RayDevRuntime(rayConfig); + } else { + runtime = new RayNativeRuntime(rayConfig); + } + + runtime.start(); + return runtime; + } catch (Exception e) { + LOGGER.error("Failed to initialize ray runtime", e); + throw new RuntimeException("Failed to initialize ray runtime", e); + } + } +} diff --git a/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java index 21714e0f6..4799baa94 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java @@ -1,18 +1,21 @@ package org.ray.runtime; -import org.ray.runtime.config.PathConfig; -import org.ray.runtime.config.RayParameters; +import org.ray.runtime.config.RayConfig; import org.ray.runtime.objectstore.MockObjectStore; +import org.ray.runtime.objectstore.ObjectStoreProxy; import org.ray.runtime.raylet.MockRayletClient; public class RayDevRuntime extends AbstractRayRuntime { + public RayDevRuntime(RayConfig rayConfig) { + super(rayConfig); + } + @Override - public void start(RayParameters params) { - PathConfig pathConfig = new PathConfig(configReader); - MockObjectStore store = new MockObjectStore(); - MockRayletClient scheduler = new MockRayletClient(this, store); - init(scheduler, store, pathConfig); + public void start() { + MockObjectStore store = new MockObjectStore(this); + objectStoreProxy = new ObjectStoreProxy(this, store); + rayletClient = new MockRayletClient(this, store); } @Override diff --git a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java index 04bbdac26..2036f2319 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java @@ -1,148 +1,119 @@ package org.ray.runtime; +import com.google.common.base.Strings; +import java.lang.reflect.Field; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.arrow.plasma.ObjectStoreLink; import org.apache.arrow.plasma.PlasmaClient; -import org.ray.runtime.config.PathConfig; -import org.ray.runtime.config.RayParameters; +import org.ray.runtime.config.RayConfig; import org.ray.runtime.config.WorkerMode; -import org.ray.runtime.gcs.AddressInfo; import org.ray.runtime.gcs.KeyValueStoreLink; import org.ray.runtime.gcs.RedisClient; -import org.ray.runtime.gcs.StateStoreProxy; -import org.ray.runtime.gcs.StateStoreProxyImpl; -import org.ray.runtime.raylet.RayletClient; +import org.ray.runtime.objectstore.ObjectStoreProxy; import org.ray.runtime.raylet.RayletClientImpl; import org.ray.runtime.runner.RunManager; -import org.ray.runtime.util.logger.RayLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * native runtime for local box and cluster run. */ public final class RayNativeRuntime extends AbstractRayRuntime { - static { - System.err.println("Current working directory is " + System.getProperty("user.dir")); - System.loadLibrary("local_scheduler_library_java"); - System.loadLibrary("plasma_java"); - } + private static final Logger LOGGER = LoggerFactory.getLogger(RayNativeRuntime.class); - private StateStoreProxy stateStoreProxy; private KeyValueStoreLink kvStore = null; private RunManager manager = null; - public RayNativeRuntime() { + public RayNativeRuntime(RayConfig rayConfig) { + super(rayConfig); + } + + private void resetLibaryPath() { + String path = System.getProperty("java.library.path"); + if (Strings.isNullOrEmpty(path)) { + path = ""; + } else { + path += ":"; + } + + path += rayConfig.libraryPath.stream().collect(Collectors.joining(":")); + + // This is a hack to reset library path at runtime, + // see https://stackoverflow.com/questions/15409223/. + System.setProperty("java.library.path", path); + //set sys_paths to null so that java.library.path will be re-evalueted next time it is needed + final Field sysPathsField; + try { + sysPathsField = ClassLoader.class.getDeclaredField("sys_paths"); + sysPathsField.setAccessible(true); + sysPathsField.set(null, null); + } catch (NoSuchFieldException | IllegalAccessException e) { + e.printStackTrace(); + LOGGER.error("Failed to set library path.", e); + } } @Override - public void start(RayParameters params) throws Exception { - boolean isWorker = (params.worker_mode == WorkerMode.WORKER); - PathConfig pathConfig = new PathConfig(configReader); - - // initialize params - if (params.redis_address.length() == 0) { - if (isWorker) { - throw new Error("Redis address must be configured under Worker mode."); - } - startOnebox(params, pathConfig); - initStateStore(params.redis_address); - } else { - initStateStore(params.redis_address); - if (!isWorker) { - List nodes = stateStoreProxy.getAddressInfo( - params.node_ip_address, params.redis_address, 5); - params.object_store_name = nodes.get(0).storeName; - params.raylet_socket_name = nodes.get(0).rayletSocketName; - } + public void start() throws Exception { + // Load native libraries. + try { + resetLibaryPath(); + System.loadLibrary("local_scheduler_library_java"); + System.loadLibrary("plasma_java"); + } catch (Exception e) { + LOGGER.error("Failed to load native libraries.", e); + throw e; } - // initialize worker context - if (params.worker_mode == WorkerMode.DRIVER) { - // TODO: The relationship between workerID, driver_id and dummy_task.driver_id should be - // recheck carefully - WorkerContext.workerID = params.driver_id; + if (rayConfig.getRedisAddress() == null) { + manager = new RunManager(rayConfig); + manager.startRayProcesses(true); } - WorkerContext.init(params); + kvStore = new RedisClient(rayConfig.getRedisAddress()); - if (params.onebox_delay_seconds_before_run_app_logic > 0) { - for (int i = 0; i < params.onebox_delay_seconds_before_run_app_logic; ++i) { - System.err.println("Pause for debugger, " - + (params.onebox_delay_seconds_before_run_app_logic - i) - + " seconds left ..."); - Thread.sleep(1000); - } - } + ObjectStoreLink store = new PlasmaClient(rayConfig.objectStoreSocketName, "", 0); + objectStoreProxy = new ObjectStoreProxy(this, store); - if (params.worker_mode != WorkerMode.NONE) { - // initialize the links - int releaseDelay = AbstractRayRuntime.configReader - .getIntegerValue("ray", "plasma_default_release_delay", 0, - "how many release requests should be delayed in plasma client"); + rayletClient = new RayletClientImpl( + rayConfig.rayletSocketName, + workerContext.getCurrentWorkerId(), + rayConfig.workerMode == WorkerMode.WORKER, + workerContext.getCurrentTask().taskId + ); - ObjectStoreLink plink = new PlasmaClient(params.object_store_name, "", releaseDelay); - RayletClient rayletClient = new RayletClientImpl( - params.raylet_socket_name, - WorkerContext.currentWorkerId(), - isWorker, - WorkerContext.currentTask().taskId - ); + // register + registerWorker(); - init(rayletClient, plink, pathConfig); - - // register - registerWorker(isWorker, params.node_ip_address, params.object_store_name, - params.raylet_socket_name); - - } - - RayLog.core.info("RayNativeRuntime started with store {}, raylet {}", - params.object_store_name, params.raylet_socket_name); + LOGGER.info("RayNativeRuntime started with store {}, raylet {}", + rayConfig.objectStoreSocketName, rayConfig.rayletSocketName); } @Override public void shutdown() { if (null != manager) { - manager.cleanup(true); + manager.cleanup(); } } - private void startOnebox(RayParameters params, PathConfig paths) throws Exception { - params.cleanup = true; - manager = new RunManager(params, paths, AbstractRayRuntime.configReader); - manager.startRayHead(); - - params.redis_address = manager.info().redisAddress; - params.object_store_name = manager.info().localStores.get(0).storeName; - params.raylet_socket_name = manager.info().localStores.get(0).rayletSocketName; - //params.node_ip_address = NetworkUtil.getIpAddress(); - } - - private void initStateStore(String redisAddress) throws Exception { - kvStore = new RedisClient(); - kvStore.setAddr(redisAddress); - stateStoreProxy = new StateStoreProxyImpl(kvStore); - stateStoreProxy.initializeGlobalState(); - } - - private void registerWorker(boolean isWorker, String nodeIpAddress, String storeName, - String rayletSocketName) { + private void registerWorker() { Map workerInfo = new HashMap<>(); - String workerId = new String(WorkerContext.currentWorkerId().getBytes()); - if (!isWorker) { - workerInfo.put("node_ip_address", nodeIpAddress); + String workerId = new String(workerContext.getCurrentWorkerId().getBytes()); + if (rayConfig.workerMode == WorkerMode.DRIVER) { + workerInfo.put("node_ip_address", rayConfig.nodeIp); workerInfo.put("driver_id", workerId); workerInfo.put("start_time", String.valueOf(System.currentTimeMillis())); - workerInfo.put("plasma_store_socket", storeName); - workerInfo.put("raylet_socket", rayletSocketName); + workerInfo.put("plasma_store_socket", rayConfig.objectStoreSocketName); + workerInfo.put("raylet_socket", rayConfig.rayletSocketName); workerInfo.put("name", System.getProperty("user.dir")); //TODO: worker.redis_client.hmset(b"Drivers:" + worker.workerId, driver_info) kvStore.hmset("Drivers:" + workerId, workerInfo); } else { - workerInfo.put("node_ip_address", nodeIpAddress); - workerInfo.put("plasma_store_socket", storeName); - workerInfo.put("raylet_socket", rayletSocketName); + workerInfo.put("node_ip_address", rayConfig.nodeIp); + workerInfo.put("plasma_store_socket", rayConfig.objectStoreSocketName); + workerInfo.put("raylet_socket", rayConfig.rayletSocketName); //TODO: b"Workers:" + worker.workerId, kvStore.hmset("Workers:" + workerId, workerInfo); } diff --git a/java/runtime/src/main/java/org/ray/runtime/Worker.java b/java/runtime/src/main/java/org/ray/runtime/Worker.java index d9215e33f..5371560bf 100644 --- a/java/runtime/src/main/java/org/ray/runtime/Worker.java +++ b/java/runtime/src/main/java/org/ray/runtime/Worker.java @@ -44,7 +44,8 @@ public class Worker { RayFunction rayFunction = runtime.getFunctionManager() .getFunction(spec.driverId, spec.functionDescriptor); // Set context - WorkerContext.prepare(spec, rayFunction.classLoader); + runtime.getWorkerContext().setCurrentTask(spec); + runtime.getWorkerContext().setCurrentClassLoader(rayFunction.classLoader); Thread.currentThread().setContextClassLoader(rayFunction.classLoader); // Get local actor object and arguments. Object actor = spec.isActorTask() ? runtime.localActors.get(spec.actorId) : null; diff --git a/java/runtime/src/main/java/org/ray/runtime/WorkerContext.java b/java/runtime/src/main/java/org/ray/runtime/WorkerContext.java index 55099fb0d..ebe00cfa1 100644 --- a/java/runtime/src/main/java/org/ray/runtime/WorkerContext.java +++ b/java/runtime/src/main/java/org/ray/runtime/WorkerContext.java @@ -1,42 +1,80 @@ package org.ray.runtime; import org.ray.api.id.UniqueId; -import org.ray.runtime.config.RayParameters; import org.ray.runtime.config.WorkerMode; import org.ray.runtime.task.TaskSpec; public class WorkerContext { - private static final ThreadLocal currentWorkerCtx = - ThreadLocal.withInitial(() -> init(AbstractRayRuntime.getParams())); /** - * id of worker. + * Worker id. */ - public static UniqueId workerID = UniqueId.randomId(); + private UniqueId workerId; + /** - * current doing task. + * Current task. */ private TaskSpec currentTask; + /** - * current app classloader. + * Current class loader. */ private ClassLoader currentClassLoader; + /** - * how many puts done by current task. + * How many puts have been done by current task. */ private int currentTaskPutCount; + /** - * how many calls done by current task. + * How many calls have been done by current task. */ private int currentTaskCallCount; - public static WorkerContext init(RayParameters params) { - WorkerContext ctx = new WorkerContext(); - currentWorkerCtx.set(ctx); + public WorkerContext(WorkerMode workerMode, UniqueId driverId) { + workerId = workerMode == WorkerMode.DRIVER ? driverId : UniqueId.randomId(); + currentTaskPutCount = 0; + currentTaskCallCount = 0; + currentClassLoader = null; + currentTask = createDummyTask(workerMode, driverId); + } - TaskSpec dummy = new TaskSpec( - params.driver_id, - params.worker_mode == WorkerMode.DRIVER ? UniqueId.randomId() : UniqueId.NIL, + public void setWorkerId(UniqueId workerId) { + this.workerId = workerId; + } + + public TaskSpec getCurrentTask() { + return currentTask; + } + + public int nextPutIndex() { + return ++currentTaskPutCount; + } + + public int nextCallIndex() { + return ++currentTaskCallCount; + } + + public UniqueId getCurrentWorkerId() { + return workerId; + } + + public ClassLoader getCurrentClassLoader() { + return currentClassLoader; + } + + public void setCurrentTask(TaskSpec currentTask) { + this.currentTask = currentTask; + } + + public void setCurrentClassLoader(ClassLoader currentClassLoader) { + this.currentClassLoader = currentClassLoader; + } + + private TaskSpec createDummyTask(WorkerMode workerMode, UniqueId driverId) { + return new TaskSpec( + driverId, + workerMode == WorkerMode.DRIVER ? UniqueId.randomId() : UniqueId.NIL, UniqueId.NIL, 0, UniqueId.NIL, @@ -46,42 +84,6 @@ public class WorkerContext { null, null, null, - null - ); - prepare(dummy, null); - - return ctx; - } - - public static void prepare(TaskSpec task, ClassLoader classLoader) { - WorkerContext wc = get(); - wc.currentTask = task; - wc.currentTaskPutCount = 0; - wc.currentTaskCallCount = 0; - wc.currentClassLoader = classLoader; - } - - public static WorkerContext get() { - return currentWorkerCtx.get(); - } - - public static TaskSpec currentTask() { - return get().currentTask; - } - - public static int nextPutIndex() { - return ++get().currentTaskPutCount; - } - - public static int nextCallIndex() { - return ++get().currentTaskCallCount; - } - - public static UniqueId currentWorkerId() { - return WorkerContext.workerID; - } - - public static ClassLoader currentClassLoader() { - return get().currentClassLoader; + null); } } diff --git a/java/runtime/src/main/java/org/ray/runtime/config/PathConfig.java b/java/runtime/src/main/java/org/ray/runtime/config/PathConfig.java deleted file mode 100644 index d58cc6bc6..000000000 --- a/java/runtime/src/main/java/org/ray/runtime/config/PathConfig.java +++ /dev/null @@ -1,57 +0,0 @@ -package org.ray.runtime.config; - -import org.ray.runtime.util.config.AConfig; -import org.ray.runtime.util.config.ConfigReader; - -/** - * Path related configurations. - */ -public class PathConfig { - - @AConfig(comment = "additional class path for JAVA", - defaultArrayIndirectSectionName = "ray.java.path.classes.source") - public String[] java_class_paths; - - @AConfig(comment = "additional JNI library paths for JAVA", - defaultArrayIndirectSectionName = "ray.java.path.jni.build") - public String[] java_jnilib_paths; - - @AConfig(comment = "path to ray_functions.txt for the default rewritten functions in ray runtime") - public String java_runtime_rewritten_jars_dir = ""; - - @AConfig(comment = "path to redis-server") - public String redis_server; - - @AConfig(comment = "path to redis module") - public String redis_module; - - @AConfig(comment = "path to plasma storage") - public String store; - - @AConfig(comment = "path to raylet") - public String raylet; - - @AConfig(comment = "path to python directory") - public String python_dir; - - @AConfig(comment = "path to log server") - public String log_server; - - @AConfig(comment = "path to log server config file") - public String log_server_config; - - public PathConfig(ConfigReader config) { - if (config.getBooleanValue("ray.java.start", "deploy", false, - "whether the package is used as a cluster deployment")) { - config.readObject("ray.java.path.deploy", this, this); - } else { - boolean isJar = this.getClass().getResource(this.getClass().getSimpleName() + ".class") - .getFile().split("!")[0].endsWith(".jar"); - if (isJar) { - config.readObject("ray.java.path.package", this, this); - } else { - config.readObject("ray.java.path.source", this, this); - } - } - } -} diff --git a/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java b/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java new file mode 100644 index 000000000..f607b44ca --- /dev/null +++ b/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java @@ -0,0 +1,227 @@ +package org.ray.runtime.config; + + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigException; +import com.typesafe.config.ConfigFactory; +import java.util.List; +import java.util.Map; +import org.ray.api.id.UniqueId; +import org.ray.runtime.util.NetworkUtil; +import org.ray.runtime.util.ResourceUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Configurations of Ray runtime. + * See `ray.default.conf` for the meaning of each field. + */ +public class RayConfig { + + private static final Logger LOGGER = LoggerFactory.getLogger(RayConfig.class); + + public static final String DEFAULT_CONFIG_FILE = "ray.default.conf"; + public static final String CUSTOM_CONFIG_FILE = "ray.conf"; + + public final String rayHome; + public final String nodeIp; + public final WorkerMode workerMode; + public final RunMode runMode; + public final Map resources; + public final UniqueId driverId; + public final String logDir; + public final boolean redirectOutput; + public final List libraryPath; + public final List classpath; + + private String redisAddress; + private String redisIp; + private Integer redisPort; + public final int headRedisPort; + public final int numberRedisShards; + + public final String objectStoreSocketName; + public final Long objectStoreSize; + + public final String rayletSocketName; + + public final String redisServerExecutablePath; + public final String redisModulePath; + public final String plasmaStoreExecutablePath; + public final String rayletExecutablePath; + + private void validate() { + if (workerMode == WorkerMode.WORKER) { + Preconditions.checkArgument(redisAddress != null, + "Redis address must be set in worker mode."); + } else { + Preconditions.checkArgument(!rayHome.isEmpty(), + "'ray.home' must be set in driver mode"); + } + } + + private String removeTrailingSlash(String path) { + if (path.endsWith("/")) { + return path.substring(0, path.length() - 1); + } else { + return path; + } + } + + public RayConfig(Config config) { + // worker mode + WorkerMode localWorkerMode; + try { + localWorkerMode = config.getEnum(WorkerMode.class, "ray.worker.mode"); + } catch (ConfigException.Missing e) { + localWorkerMode = WorkerMode.DRIVER; + } + + workerMode = localWorkerMode; + boolean isDriver = workerMode == WorkerMode.DRIVER; + // run mode + runMode = config.getEnum(RunMode.class, "ray.run-mode"); + // ray home + String rayHome = config.getString("ray.home"); + if (!rayHome.startsWith("/")) { + // If ray.home isn't an absolute path, prepend it with current work dir. + rayHome = System.getProperty("user.dir") + "/" + rayHome; + } + this.rayHome = removeTrailingSlash(rayHome); + // node ip + String nodeIp = config.getString("ray.node-ip"); + if (nodeIp.isEmpty()) { + nodeIp = NetworkUtil.getIpAddress(null); + } + this.nodeIp = nodeIp; + // resources + resources = ResourceUtil.getResourcesMapFromString( + config.getString("ray.resources")); + if (isDriver) { + if (!resources.containsKey("CPU")) { + int numCpu = Runtime.getRuntime().availableProcessors(); + LOGGER.warn("No CPU resource is set in configuration, " + + "setting it to the number of CPU cores: {}", numCpu); + resources.put("CPU", numCpu * 1.0); + } + if (!resources.containsKey("GPU")) { + LOGGER.warn("No GPU resource is set in configuration, setting it to 0"); + resources.put("GPU", 0.0); + } + } + // driver id + String driverId = config.getString("ray.driver.id"); + if (!driverId.isEmpty()) { + this.driverId = UniqueId.fromHexString(driverId); + } else { + this.driverId = UniqueId.randomId(); + } + // log dir + logDir = removeTrailingSlash(config.getString("ray.log-dir")); + // redirect output + redirectOutput = config.getBoolean("ray.redirect-output"); + // custom library path + List customLibraryPath = config.getStringList("ray.library.path"); + // custom classpath + classpath = config.getStringList("ray.classpath"); + + // redis configurations + String redisAddress = config.getString("ray.redis.address"); + if (!redisAddress.isEmpty()) { + setRedisAddress(redisAddress); + } else { + this.redisAddress = null; + } + headRedisPort = config.getInt("ray.redis.head-port"); + numberRedisShards = config.getInt("ray.redis.shard-number"); + + // object store configurations + objectStoreSocketName = config.getString("ray.object-store.socket-name"); + objectStoreSize = config.getBytes("ray.object-store.size"); + + // raylet socket name + rayletSocketName = config.getString("ray.raylet.socket-name"); + + // library path + this.libraryPath = new ImmutableList.Builder().add( + rayHome + "/build/src/plasma", + rayHome + "/build/src/local_scheduler" + ).addAll(customLibraryPath).build(); + + redisServerExecutablePath = rayHome + "/build/src/common/thirdparty/redis/src/redis-server"; + redisModulePath = rayHome + "/build/src/common/redis_module/libray_redis_module.so"; + plasmaStoreExecutablePath = rayHome + "/build/src/plasma/plasma_store_server"; + rayletExecutablePath = rayHome + "/build/src/ray/raylet/raylet"; + + // validate config + validate(); + LOGGER.debug("Created config: {}", this); + } + + public void setRedisAddress(String redisAddress) { + Preconditions.checkNotNull(redisAddress); + Preconditions.checkState(this.redisAddress == null, "Redis address was already set"); + + this.redisAddress = redisAddress; + String[] ipAndPort = redisAddress.split(":"); + Preconditions.checkArgument(ipAndPort.length == 2, "Invalid redis address."); + this.redisIp = ipAndPort[0]; + this.redisPort = Integer.parseInt(ipAndPort[1]); + } + + public String getRedisAddress() { + return redisAddress; + } + + public String getRedisIp() { + return redisIp; + } + + public Integer getRedisPort() { + return redisPort; + } + + @Override + public String toString() { + return "RayConfig{" + + "rayHome='" + rayHome + '\'' + + ", nodeIp='" + nodeIp + '\'' + + ", workerMode=" + workerMode + + ", runMode=" + runMode + + ", resources=" + resources + + ", driverId=" + driverId + + ", logDir='" + logDir + '\'' + + ", redirectOutput=" + redirectOutput + + ", libraryPath=" + libraryPath + + ", classpath=" + classpath + + ", redisAddress='" + redisAddress + '\'' + + ", redisIp='" + redisIp + '\'' + + ", redisPort=" + redisPort + + ", headRedisPort=" + headRedisPort + + ", numberRedisShards=" + numberRedisShards + + ", objectStoreSocketName='" + objectStoreSocketName + '\'' + + ", objectStoreSize=" + objectStoreSize + + ", rayletSocketName='" + rayletSocketName + '\'' + + ", redisServerExecutablePath='" + redisServerExecutablePath + '\'' + + ", plasmaStoreExecutablePath='" + plasmaStoreExecutablePath + '\'' + + ", rayletExecutablePath='" + rayletExecutablePath + '\'' + + '}'; + } + + /** + * Create a RayConfig by reading configuration in the following order: + * 1. System properties. + * 2. `ray.conf` file. + * 3. `ray.default.conf` file. + */ + public static RayConfig create() { + ConfigFactory.invalidateCaches(); + Config config = ConfigFactory.systemProperties() + .withFallback(ConfigFactory.load(CUSTOM_CONFIG_FILE)) + .withFallback(ConfigFactory.load(DEFAULT_CONFIG_FILE)); + return new RayConfig(config); + } + +} diff --git a/java/runtime/src/main/java/org/ray/runtime/config/RayParameters.java b/java/runtime/src/main/java/org/ray/runtime/config/RayParameters.java deleted file mode 100644 index 579666b9a..000000000 --- a/java/runtime/src/main/java/org/ray/runtime/config/RayParameters.java +++ /dev/null @@ -1,106 +0,0 @@ -package org.ray.runtime.config; - -import org.ray.api.id.UniqueId; -import org.ray.runtime.util.NetworkUtil; -import org.ray.runtime.util.config.AConfig; -import org.ray.runtime.util.config.ConfigReader; - -/** - * Runtime parameters of Ray process. - */ -public class RayParameters { - - @AConfig(comment = "worker mode for this process DRIVER | WORKER | NONE") - public WorkerMode worker_mode = WorkerMode.DRIVER; - - @AConfig(comment = "run mode for this app SINGLE_PROCESS | SINGLE_BOX | CLUSTER") - public RunMode run_mode = RunMode.SINGLE_PROCESS; - - @AConfig(comment = "local node ip") - public String node_ip_address = NetworkUtil.getIpAddress(null); - - @AConfig(comment = "primary redis address (e.g., 127.0.0.1:34222") - public String redis_address = ""; - - @AConfig(comment = "object store name (e.g., /tmp/store1111") - public String object_store_name = ""; - - @AConfig(comment = "object store rpc listen port") - public int object_store_rpc_port = 32567; - - @AConfig(comment = "driver ID when the worker is served as a driver") - public UniqueId driver_id = UniqueId.NIL; - - @AConfig(comment = "logging directory") - public String log_dir = "/tmp/raylogs"; - - @AConfig(comment = "primary redis port") - public int redis_port = 34222; - - @AConfig(comment = "number of workers started initially") - public int num_workers = 1; - - @AConfig(comment = "redirect err and stdout to files for newly created processes") - public boolean redirect = true; - - @AConfig(comment = "whether to start redis shard server in addition to the primary server") - public boolean start_redis_shards = false; - - @AConfig(comment = "whether to clean up the processes when there is a process start failure") - public boolean cleanup = false; - - @AConfig(comment = "number of redis shard servers to be started") - public int num_redis_shards = 0; - - @AConfig(comment = "whether this is a deployment in cluster") - public boolean deploy = false; - - @AConfig(comment = "whether this is for python deployment") - public boolean py = false; - - @AConfig(comment = "the max bytes of the buffer for task submit") - public int max_submit_task_buffer_size_bytes = 2 * 1024 * 1024; - - @AConfig(comment = "default first check timeout(ms)") - public int default_first_check_timeout_ms = 1000; - - @AConfig(comment = "default get check rate(ms)") - public int default_get_check_interval_ms = 5000; - - @AConfig(comment = "add the jvm parameters for java worker") - public String jvm_parameters = ""; - - @AConfig(comment = "set the occupied memory(MB) size of object store") - public int object_store_occupied_memory_MB = 1000; - - @AConfig(comment = "whether to use supreme failover strategy") - public boolean supremeFO = false; - - @AConfig(comment = "whether to disable process failover") - public boolean disable_process_failover = false; - - @AConfig(comment = "delay seconds under onebox before app logic for debugging") - public int onebox_delay_seconds_before_run_app_logic = 0; - - @AConfig(comment = "raylet socket name (e.g., /tmp/raylet1111") - public String raylet_socket_name = ""; - - @AConfig(comment = "raylet rpc listen port") - public int raylet_port = 35567; - - @AConfig(comment = "worker fetch request size") - public int worker_fetch_request_size = 10000; - - @AConfig(comment = "static resource list of this node") - public String static_resources = ""; - - public RayParameters(ConfigReader config) { - if (null != config) { - String networkInterface = config.getStringValue("ray.java", "network_interface", null, - "Network interface to be specified for host ip address(e.g., en0, eth0), may use " - + "ifconfig to get options"); - node_ip_address = NetworkUtil.getIpAddress(networkInterface); - config.readObject("ray.java.start", this, this); - } - } -} diff --git a/java/runtime/src/main/java/org/ray/runtime/config/RunMode.java b/java/runtime/src/main/java/org/ray/runtime/config/RunMode.java index 988ed1984..835cb8e8d 100644 --- a/java/runtime/src/main/java/org/ray/runtime/config/RunMode.java +++ b/java/runtime/src/main/java/org/ray/runtime/config/RunMode.java @@ -1,39 +1,15 @@ package org.ray.runtime.config; public enum RunMode { - SINGLE_PROCESS(true, false), // dev path, dev runtime - SINGLE_BOX(true, true), // dev path, native runtime - CLUSTER(false, true); // deploy path, naive runtime - - - RunMode(boolean devPathManager, - boolean nativeRuntime) { - this.devPathManager = devPathManager; - this.nativeRuntime = nativeRuntime; - } /** - * the jar has add to java -cp, no need to load jar after started. + * Ray is running in one single Java process, without Raylet backend, object store, and GCS. + * It's useful for debug. */ - private final boolean devPathManager; - - private final boolean nativeRuntime; + SINGLE_PROCESS, /** - * Getter method for property devPathManager. - * - * @return property value of devPathManager + * Ray is running on one or more nodes, with multiple processes. */ - public boolean isDevPathManager() { - return devPathManager; - } - - /** - * Getter method for property nativeRuntime. - * - * @return property value of nativeRuntime - */ - public boolean isNativeRuntime() { - return nativeRuntime; - } -} \ No newline at end of file + CLUSTER, +} diff --git a/java/runtime/src/main/java/org/ray/runtime/config/WorkerMode.java b/java/runtime/src/main/java/org/ray/runtime/config/WorkerMode.java index c9e71e536..947159c3b 100644 --- a/java/runtime/src/main/java/org/ray/runtime/config/WorkerMode.java +++ b/java/runtime/src/main/java/org/ray/runtime/config/WorkerMode.java @@ -1,7 +1,6 @@ package org.ray.runtime.config; public enum WorkerMode { - NONE, // not set - DRIVER, // driver - WORKER // worker + DRIVER, + WORKER } diff --git a/java/runtime/src/main/java/org/ray/runtime/objectstore/MockObjectStore.java b/java/runtime/src/main/java/org/ray/runtime/objectstore/MockObjectStore.java index be98ba234..3dbe7b614 100644 --- a/java/runtime/src/main/java/org/ray/runtime/objectstore/MockObjectStore.java +++ b/java/runtime/src/main/java/org/ray/runtime/objectstore/MockObjectStore.java @@ -7,7 +7,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.arrow.plasma.ObjectStoreLink; import org.ray.api.id.UniqueId; -import org.ray.runtime.WorkerContext; +import org.ray.runtime.RayDevRuntime; import org.ray.runtime.raylet.MockRayletClient; import org.ray.runtime.util.logger.RayLog; @@ -16,10 +16,15 @@ import org.ray.runtime.util.logger.RayLog; */ public class MockObjectStore implements ObjectStoreLink { + private final RayDevRuntime runtime; private final Map data = new ConcurrentHashMap<>(); private final Map metadata = new ConcurrentHashMap<>(); private MockRayletClient scheduler = null; + public MockObjectStore(RayDevRuntime runtime) { + this.runtime = runtime; + } + @Override public void put(byte[] objectId, byte[] value, byte[] metadataValue) { if (objectId == null || objectId.length == 0 || value == null) { @@ -87,7 +92,7 @@ public class MockObjectStore implements ObjectStoreLink { } private String logPrefix() { - return WorkerContext.currentTask().taskId + "-" + getUserTrace() + " -> "; + return runtime.getWorkerContext().getCurrentTask().taskId + "-" + getUserTrace() + " -> "; } private String getUserTrace() { diff --git a/java/runtime/src/main/java/org/ray/runtime/objectstore/ObjectStoreProxy.java b/java/runtime/src/main/java/org/ray/runtime/objectstore/ObjectStoreProxy.java index 2832262b7..b497f5c44 100644 --- a/java/runtime/src/main/java/org/ray/runtime/objectstore/ObjectStoreProxy.java +++ b/java/runtime/src/main/java/org/ray/runtime/objectstore/ObjectStoreProxy.java @@ -5,7 +5,7 @@ import java.util.List; import org.apache.arrow.plasma.ObjectStoreLink; import org.apache.commons.lang3.tuple.Pair; import org.ray.api.id.UniqueId; -import org.ray.runtime.WorkerContext; +import org.ray.runtime.AbstractRayRuntime; import org.ray.runtime.util.Serializer; import org.ray.runtime.util.exception.TaskExecutionException; @@ -15,12 +15,14 @@ import org.ray.runtime.util.exception.TaskExecutionException; */ public class ObjectStoreProxy { + private final AbstractRayRuntime runtime; private final ObjectStoreLink store; private final int getTimeoutMs = 1000; - public ObjectStoreProxy(ObjectStoreLink store) { + public ObjectStoreProxy(AbstractRayRuntime runtime, ObjectStoreLink store) { + this.runtime = runtime; this.store = store; - } + } public Pair get(UniqueId objectId, boolean isMetadata) throws TaskExecutionException { @@ -31,7 +33,7 @@ public class ObjectStoreProxy { throws TaskExecutionException { byte[] obj = store.get(id.getBytes(), timeoutMs, isMetadata); if (obj != null) { - T t = Serializer.decode(obj, WorkerContext.currentClassLoader()); + T t = Serializer.decode(obj, runtime.getWorkerContext().getCurrentClassLoader()); store.release(id.getBytes()); if (t instanceof TaskExecutionException) { throw (TaskExecutionException) t; @@ -54,7 +56,7 @@ public class ObjectStoreProxy { for (int i = 0; i < objs.size(); i++) { byte[] obj = objs.get(i); if (obj != null) { - T t = Serializer.decode(obj, WorkerContext.currentClassLoader()); + T t = Serializer.decode(obj, runtime.getWorkerContext().getCurrentClassLoader()); store.release(ids.get(i).getBytes()); if (t instanceof TaskExecutionException) { throw (TaskExecutionException) t; diff --git a/java/runtime/src/main/java/org/ray/runtime/runner/ProcessInfo.java b/java/runtime/src/main/java/org/ray/runtime/runner/ProcessInfo.java deleted file mode 100644 index b7e3a3b12..000000000 --- a/java/runtime/src/main/java/org/ray/runtime/runner/ProcessInfo.java +++ /dev/null @@ -1,13 +0,0 @@ -package org.ray.runtime.runner; - -public class ProcessInfo { - - public Process process; - public String[] cmd; - public RunInfo.ProcessType type; - public String name; - public String redisAddress; - public String ip; - public boolean redirect; - public boolean cleanup; -} \ No newline at end of file diff --git a/java/runtime/src/main/java/org/ray/runtime/runner/RunInfo.java b/java/runtime/src/main/java/org/ray/runtime/runner/RunInfo.java deleted file mode 100644 index 8a532ac75..000000000 --- a/java/runtime/src/main/java/org/ray/runtime/runner/RunInfo.java +++ /dev/null @@ -1,45 +0,0 @@ -package org.ray.runtime.runner; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import org.ray.runtime.gcs.AddressInfo; - -/** - * information of kinds of processes. - */ -public class RunInfo { - - public String redisAddress; - public List redisShards; - public List localStores = new ArrayList<>(); - public ArrayList> allProcesses = initProcessInfoArray(); - public ArrayList> toBeCleanedProcesses = initProcessArray(); - public ArrayList deadProcess = new ArrayList<>(); - - private ArrayList> initProcessArray() { - ArrayList> processes = new ArrayList<>(); - for (ProcessType ignored : ProcessType.values()) { - processes.add(Collections.synchronizedList(new ArrayList<>())); - } - return processes; - } - - private ArrayList> initProcessInfoArray() { - ArrayList> processes = new ArrayList<>(); - for (ProcessType ignored : ProcessType.values()) { - processes.add(Collections.synchronizedList(new ArrayList<>())); - } - return processes; - } - - public enum ProcessType { - PT_WORKER, - PT_PLASMA_STORE, - PT_REDIS_SERVER, - PT_WEB_UI, - PT_RAYLET, - PT_DRIVER - } - -} diff --git a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java index 240366ce9..03429c963 100644 --- a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java +++ b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java @@ -1,25 +1,23 @@ package org.ray.runtime.runner; +import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import java.io.File; import java.io.IOException; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.TimeUnit; -import org.ray.api.id.UniqueId; -import org.ray.runtime.config.PathConfig; -import org.ray.runtime.config.RayParameters; -import org.ray.runtime.gcs.AddressInfo; -import org.ray.runtime.runner.RunInfo.ProcessType; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.ray.runtime.config.RayConfig; +import org.ray.runtime.util.FileUtil; import org.ray.runtime.util.ResourceUtil; -import org.ray.runtime.util.StringUtil; -import org.ray.runtime.util.config.ConfigReader; -import org.ray.runtime.util.logger.RayLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; /** @@ -27,501 +25,224 @@ import redis.clients.jedis.Jedis; */ public class RunManager { + private static final Logger LOGGER = LoggerFactory.getLogger(RunManager.class); + private static final DateTimeFormatter DATE_TIME_FORMATTER = - DateTimeFormatter.ofPattern("Y-m-d_H-M-S"); + DateTimeFormatter.ofPattern("Y-M-d_H-m-s"); - private RayParameters params; + private static final String WORKER_CLASS = "org.ray.runtime.runner.worker.DefaultWorker"; - private PathConfig paths; + private RayConfig rayConfig; - private ConfigReader configReader; + private Random random; - private RunInfo runInfo = new RunInfo(); + private List processes; - private Random random = new Random(); - - - public RunManager(RayParameters params, PathConfig paths, ConfigReader configReader) { - this.params = params; - this.paths = paths; - this.configReader = configReader; + public RunManager(RayConfig rayConfig) { + this.rayConfig = rayConfig; + processes = new ArrayList<>(); + random = new Random(); } - private static boolean killProcess(Process p) { - if (p.isAlive()) { + public void cleanup() { + for (Process p : processes) { p.destroy(); - return true; - } else { - return false; } } - public RunInfo info() { - return runInfo; + private void createTempDirs() { + FileUtil.mkDir(new File(rayConfig.logDir)); + FileUtil.mkDir(new File(rayConfig.rayletSocketName).getParentFile()); + FileUtil.mkDir(new File(rayConfig.objectStoreSocketName).getParentFile()); } - public void startRayHead() throws Exception { - if (params.redis_address.length() != 0) { - throw new Exception("Redis address must be empty in head node."); - } - if (params.num_redis_shards <= 0) { - params.num_redis_shards = 1; + /** + * Start a process. + * @param command The command to start the process with. + * @param env Environment variables. + * @param name Process name. + */ + private void startProcess(List command, Map env, String name) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Starting process {} with command: {}", name, command, + Joiner.on(" ").join(command)); } - params.start_redis_shards = true; + ProcessBuilder builder = new ProcessBuilder(command); - startRayProcesses(); - } - - public void startRayNode() throws Exception { - if (params.redis_address.length() == 0) { - throw new Exception("Redis address cannot be empty in non-head node."); - } - if (params.num_redis_shards != 0) { - throw new Exception("Number of redis shards should be zero in non-head node."); - } - - params.start_redis_shards = false; - - startRayProcesses(); - } - - public Process startDriver(String mainClass, String redisAddress, UniqueId driverId, - String logDir, String ip, - String driverClass, String driverArgs, String additonalClassPaths, - String additionalConfigs) { - String driverConfigs = - "ray.java.start.driver_id=" + driverId + ";ray.java.start.driver_class=" + driverClass; - if (driverArgs != null) { - driverConfigs += ";ray.java.start.driver_args=" + driverArgs; - } - - if (null != additionalConfigs) { - additionalConfigs += ";" + driverConfigs; - } else { - additionalConfigs = driverConfigs; - } - - return startJavaProcess( - RunInfo.ProcessType.PT_DRIVER, - mainClass, - additonalClassPaths, - additionalConfigs, - "", - ip, - redisAddress, - false, - false, - null - ); - } - - private Process startJavaProcess(RunInfo.ProcessType pt, String mainClass, - String additonalClassPaths, String additionalConfigs, - String additionalJvmArgs, String ip, String - redisAddr, boolean redirect, - boolean cleanup, String agentlibAddr) { - - String cmd = buildJavaProcessCommand(pt, mainClass, additonalClassPaths, additionalConfigs, - additionalJvmArgs, ip, redisAddr, agentlibAddr); - return startProcess(cmd.split(" "), null, pt, "", redisAddr, ip, redirect, cleanup); - } - - private String buildJavaProcessCommand( - RunInfo.ProcessType pt, String mainClass, String additionalClassPaths, - String additionalConfigs, - String additionalJvmArgs, String ip, String redisAddr, String agentlibAddr) { - String cmd = "java -ea -noverify " + params.jvm_parameters + " "; - if (agentlibAddr != null && !agentlibAddr.equals("")) { - cmd += " -agentlib:jdwp=transport=dt_socket,address=" + agentlibAddr + ",server=y,suspend=n"; - } - - cmd += " -Djava.library.path=" + StringUtil.mergeArray(paths.java_jnilib_paths, ":"); - cmd += " -classpath " + StringUtil.mergeArray(paths.java_class_paths, ":"); - - if (additionalClassPaths.length() > 0) { - cmd += ":" + additionalClassPaths; - } - - if (additionalJvmArgs.length() > 0) { - cmd += " " + additionalJvmArgs; - } - - cmd += " " + mainClass; - - String section = "ray.java.start."; - cmd += " --config=" + configReader.filePath(); - cmd += " --overwrite=" - + section + "node_ip_address=" + ip + ";" - + section + "redis_address=" + redisAddr + ";" - + section + "log_dir=" + params.log_dir + ";" - + section + "run_mode=" + params.run_mode; - - if (additionalConfigs.length() > 0) { - cmd += ";" + additionalConfigs; - } - - return cmd; - } - - private Process startProcess(String[] cmd, Map env, RunInfo.ProcessType type, - String name, - String redisAddress, String ip, boolean redirect, - boolean cleanup) { - ProcessBuilder builder; - List newCommand = Arrays.asList(cmd); - builder = new ProcessBuilder(newCommand); - - if (redirect) { + if (rayConfig.redirectOutput) { + // Set stdout and stderr paths. int logId = random.nextInt(10000); String date = DATE_TIME_FORMATTER.format(LocalDateTime.now()); - String stdout = String.format("%s/%s-%s-%05d.out", params.log_dir, name, date, logId); - String stderr = String.format("%s/%s-%s-%05d.err", params.log_dir, name, date, logId); + String stdout = String.format("%s/%s-%s-%05d.out", rayConfig.logDir, name, date, logId); + String stderr = String.format("%s/%s-%s-%05d.err", rayConfig.logDir, name, date, logId); builder.redirectOutput(new File(stdout)); builder.redirectError(new File(stderr)); - recordLogFilesInRedis(redisAddress, ip, ImmutableList.of(stdout, stderr)); } - + // Set environment variables. if (env != null && !env.isEmpty()) { builder.environment().putAll(env); } - Process p = null; + Process p; try { p = builder.start(); } catch (IOException e) { - RayLog.core.error("Failed to start process {}", name, e); - return null; + LOGGER.error("Failed to start process " + name, e); + throw new RuntimeException("Failed to start process " + name, e); } - - RayLog.core.info("Process {} started", name); - - if (cleanup) { - runInfo.toBeCleanedProcesses.get(type.ordinal()).add(p); - } - - ProcessInfo processInfo = new ProcessInfo(); - processInfo.cmd = cmd; - processInfo.type = type; - processInfo.name = name; - processInfo.redisAddress = redisAddress; - processInfo.ip = ip; - processInfo.redirect = redirect; - processInfo.cleanup = cleanup; - processInfo.process = p; - runInfo.allProcesses.get(type.ordinal()).add(processInfo); - - return p; - } - - private void recordLogFilesInRedis(String redisAddress, String nodeIpAddress, - List logFiles) { - if (redisAddress != null && !redisAddress.isEmpty() && nodeIpAddress != null - && !nodeIpAddress.isEmpty() && logFiles.size() > 0) { - String[] ipPort = redisAddress.split(":"); - Jedis jedisClient = new Jedis(ipPort[0], Integer.parseInt(ipPort[1])); - String logFileListKey = String.format("LOG_FILENAMES:{%s}", nodeIpAddress); - for (String logfile : logFiles) { - jedisClient.rpush(logFileListKey, logfile); - } - jedisClient.close(); - } - } - - private void startRayProcesses() { - Jedis redisClient = null; - - RayLog.core.info("start ray processes @ " + params.node_ip_address + " ..."); - - // start primary redis - if (params.redis_address.length() == 0) { - List primaryShards = startRedis( - params.node_ip_address, params.redis_port, 1, params.redirect, params.cleanup); - params.redis_address = primaryShards.get(0); - - String[] args = params.redis_address.split(":"); - redisClient = new Jedis(args[0], Integer.parseInt(args[1])); - - // Register the number of Redis shards in the primary shard, so that clients - // know how many redis shards to expect under RedisShards. - redisClient.set("NumRedisShards", Integer.toString(params.num_redis_shards)); - } else { - String[] args = params.redis_address.split(":"); - redisClient = new Jedis(args[0], Integer.parseInt(args[1])); - } - runInfo.redisAddress = params.redis_address; - - // start redis shards - if (params.start_redis_shards) { - runInfo.redisShards = startRedis( - params.node_ip_address, params.redis_port + 1, params.num_redis_shards, - params.redirect, - params.cleanup); - - // Store redis shard information in the primary redis shard. - for (int i = 0; i < runInfo.redisShards.size(); i++) { - String addr = runInfo.redisShards.get(i); - redisClient.rpush("RedisShards", addr); - } - } - redisClient.close(); - - AddressInfo info = new AddressInfo(); - - // Start object store - int rpcPort = params.object_store_rpc_port; - String storeName = "/tmp/plasma_store" + rpcPort; - - startObjectStore(0, info, - params.redis_address, params.node_ip_address, params.redirect, params.cleanup); - - Map staticResources = - ResourceUtil.getResourcesMapFromString(params.static_resources); - - //Start raylet - startRaylet(storeName, info, params.num_workers, - params.redis_address, - params.node_ip_address, params.redirect, staticResources, params.cleanup); - - runInfo.localStores.add(info); - - if (!checkAlive()) { - cleanup(true); - throw new RuntimeException("Start Ray processes failed"); - } - } - - private boolean checkAlive() { - RunInfo.ProcessType[] types = RunInfo.ProcessType.values(); - for (int i = 0; i < types.length; i++) { - ProcessInfo p; - for (int j = 0; j < runInfo.allProcesses.get(i).size(); ) { - p = runInfo.allProcesses.get(i).get(j); - if (!p.process.isAlive()) { - RayLog.core.error("Process " + p.process.hashCode() + " is not alive!" + " Process Type " - + types[i].name()); - runInfo.deadProcess.add(p); - runInfo.allProcesses.get(i).remove(j); - } else { - j++; - } - } - } - - return runInfo.deadProcess.isEmpty(); - } - - // kill all processes started by startRayHead - public void cleanup(boolean killAll) { - // clean up the process in reverse order - for (int i = ProcessType.values().length - 1; i >= 0; i--) { - if (killAll) { - runInfo.allProcesses.get(i).forEach(p -> { - if (killProcess(p.process)) { - RayLog.core.info("Kill process " + p.process.hashCode() + " forcely"); - } - }); - } else { - runInfo.toBeCleanedProcesses.get(i).forEach(p -> { - if (killProcess(p)) { - RayLog.core.info("Kill process " + p.hashCode() + " forcely"); - } - }); - } - - runInfo.toBeCleanedProcesses.get(i).clear(); - runInfo.allProcesses.get(i).clear(); - runInfo.deadProcess.clear(); - } - } - - // - // start a redis server - // - // @param ip the IP address of the local node - // @param port port to be opended for redis traffic - // @param numOfShards the number of redis shards to start - // @param redirect whether to redirect the output/err to the log files - // @param cleanup true if using ray in local mode. If cleanup is true, when - // all Redis processes started by this method will be killed by @cleanup - // when the worker exits - // @return primary redis shard address - // - private List startRedis(String ip, int port, int numOfShards, - boolean redirect, boolean cleanup) { - ArrayList shards = new ArrayList<>(); - String addr; - for (int i = 0; i < numOfShards; i++) { - addr = startRedisInstance(ip, port + i, redirect, cleanup); - - if (addr.length() == 0) { - cleanup(cleanup); - shards.clear(); - return shards; - } else { - shards.add(addr); - } - } - - for (String shard : shards) { - // TODO: wait for redis server to start - } - - return shards; - } - - // - // @param ip local node ip, only used for logging purpose - // @param port given port for this redis instance, 0 for auto-selected port - // @return redis server address - // - private String startRedisInstance(String ip, int port, - boolean redirect, boolean cleanup) { - String redisFilePath = paths.redis_server; - String redisModule = paths.redis_module; - - assert (new File(redisFilePath).exists()) : "file don't exsits : " + redisFilePath; - assert (new File(redisModule).exists()) : "file don't exsits : " + redisModule; - - String cmd = redisFilePath + " --protected-mode no --port " + port + " --loglevel warning" - + " --loadmodule " + redisModule; - - Map env = null; - Process p = startProcess(cmd.split(" "), env, RunInfo.ProcessType.PT_REDIS_SERVER, - "redis", "", ip, redirect, cleanup); - - if (p == null || !p.isAlive()) { - return ""; - } - + // Wait 200ms and check whether the process is alive. try { - TimeUnit.MILLISECONDS.sleep(300); + TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } + if (!p.isAlive()) { + throw new RuntimeException("Failed to start " + name); + } + processes.add(p); + LOGGER.info("{} process started", name); + } - Jedis client = new Jedis(params.node_ip_address, port); + /** + * Start all Ray processes on this node. + * @param isHead Whether this node is the head node. If true, redis server will be started. + */ + public void startRayProcesses(boolean isHead) { + LOGGER.info("Starting ray processes @ {}.", rayConfig.nodeIp); + try { + createTempDirs(); + if (isHead) { + startRedisServer(); + } + startObjectStore(); + startRaylet(); + LOGGER.info("All processes started @ {}.", rayConfig.nodeIp); + } catch (Exception e) { + // Clean up started processes. + cleanup(); + LOGGER.error("Failed to start ray processes.", e); + throw new RuntimeException("Failed to start ray processes.", e); + } + } - // Configure Redis to only generate notifications for the export keys. - client.configSet("notify-keyspace-events", "Kl"); + private void startRedisServer() { + // start primary redis + String primary = startRedisInstance(rayConfig.nodeIp, rayConfig.headRedisPort, null); + rayConfig.setRedisAddress(primary); + try (Jedis client = new Jedis("127.0.0.1", rayConfig.headRedisPort)) { + client.set("UseRaylet", "1"); + // Register the number of Redis shards in the primary shard, so that clients + // know how many redis shards to expect under RedisShards. + client.set("NumRedisShards", Integer.toString(rayConfig.numberRedisShards)); - // Put a time stamp in Redis to indicate when it was started. - client.set("redis_start_time", LocalDateTime.now().toString()); + // start redis shards + for (int i = 0; i < rayConfig.numberRedisShards; i++) { + String shard = startRedisInstance(rayConfig.nodeIp, rayConfig.headRedisPort + i + 1, i); + client.rpush("RedisShards", shard); + } + } + } + + private String startRedisInstance(String ip, int port, Integer shard) { + List command = ImmutableList.of( + rayConfig.redisServerExecutablePath, + "--protected-mode", + "no", + "--port", + String.valueOf(port), + "--loglevel", + "warning", + "--loadmodule", + rayConfig.redisModulePath + ); + String name = shard == null ? "redis" : "redis-" + shard; + startProcess(command, null, name); + + try (Jedis client = new Jedis("127.0.0.1", port)) { + // Configure Redis to only generate notifications for the export keys. + client.configSet("notify-keyspace-events", "Kl"); + // Put a time stamp in Redis to indicate when it was started. + client.set("redis_start_time", LocalDateTime.now().toString()); + } - client.close(); return ip + ":" + port; } - private void startRaylet(String storeName, AddressInfo info, int numWorkers, - String redisAddress, String ip, boolean redirect, - Map staticResources, boolean cleanup) { - - int rpcPort = params.raylet_port; - String rayletSocketName = "/tmp/raylet" + rpcPort; - - String filePath = paths.raylet; - - //Create the worker command that the raylet will use to start workers. - String workerCommand = buildWorkerCommandRaylet(info.storeName, rayletSocketName, - UniqueId.NIL, "", ip, redisAddress); - - int sep = redisAddress.indexOf(':'); - assert (sep != -1); - String gcsIp = redisAddress.substring(0, sep); - String gcsPort = redisAddress.substring(sep + 1); - - String resourceArgument = ResourceUtil.getResourcesStringFromMap(staticResources); - + private void startRaylet() { int hardwareConcurrency = Runtime.getRuntime().availableProcessors(); - int maximumStartupConcurrency = Math.max(1, Math.min(staticResources.get("CPU").intValue(), - hardwareConcurrency)); + int maximumStartupConcurrency = Math.max(1, + Math.min(rayConfig.resources.getOrDefault("CPU", 0.0).intValue(), hardwareConcurrency)); - // The second-last arugment is the worker command for Python, not needed for Java. - String[] cmds = new String[]{filePath, rayletSocketName, storeName, ip, gcsIp, - gcsPort, String.valueOf(numWorkers), String.valueOf(maximumStartupConcurrency), - resourceArgument, "", workerCommand}; - - Process p = startProcess(cmds, null, RunInfo.ProcessType.PT_RAYLET, - "raylet", redisAddress, ip, redirect, cleanup); - - if (p != null && p.isAlive()) { - try { - TimeUnit.MILLISECONDS.sleep(100); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - if (p == null || !p.isAlive()) { - info.rayletSocketName = ""; - info.rayletRpcAddr = ""; - throw new RuntimeException("Failed to start raylet process."); - } else { - info.rayletSocketName = rayletSocketName; - info.rayletRpcAddr = ip + ":" + rpcPort; - } - } - - private String buildWorkerCommandRaylet(String storeName, String rayletSocketName, - UniqueId actorId, String actorClass, - String ip, String redisAddress) { - String workerConfigs = "ray.java.start.object_store_name=" + storeName - + ";ray.java.start.raylet_socket_name=" + rayletSocketName - + ";ray.java.start.worker_mode=WORKER"; - workerConfigs += ";ray.java.start.deploy=" + params.deploy; - if (!actorId.equals(UniqueId.NIL)) { - workerConfigs += ";ray.java.start.actor_id=" + actorId; - } - if (!actorClass.equals("")) { - workerConfigs += ";ray.java.start.driver_class=" + actorClass; - } - - String jvmArgs = ""; - jvmArgs += " -Dlogging.path=" + params.log_dir; - jvmArgs += " -Dlogging.file.name=core-*pid_suffix*"; - - return buildJavaProcessCommand( - RunInfo.ProcessType.PT_WORKER, - "org.ray.runtime.runner.worker.DefaultWorker", - "", - workerConfigs, - jvmArgs, - ip, - redisAddress, - null + // See `src/ray/raylet/main.cc` for the meaning of each parameter. + List command = ImmutableList.of( + rayConfig.rayletExecutablePath, + rayConfig.rayletSocketName, + rayConfig.objectStoreSocketName, + rayConfig.nodeIp, + rayConfig.getRedisIp(), + rayConfig.getRedisPort().toString(), + "0", // number of initial workers + String.valueOf(maximumStartupConcurrency), + ResourceUtil.getResourcesStringFromMap(rayConfig.resources), + "", // python worker command + buildWorkerCommandRaylet() // java worker command ); + + startProcess(command, null, "raylet"); } - private void startObjectStore(int index, AddressInfo info, String redisAddress, - String ip, boolean redirect, boolean cleanup) { - int occupiedMemoryMb = params.object_store_occupied_memory_MB; - long memoryBytes = occupiedMemoryMb * 1000000; - String filePath = paths.store; - int rpcPort = params.object_store_rpc_port + index; - String name = "/tmp/plasma_store" + rpcPort; - String rpcAddr = ""; - String cmd = filePath + " -s " + name + " -m " + memoryBytes; + private String concatPath(Stream stream) { + // TODO (hchen): Right now, raylet backend doesn't support worker command with spaces. + // Thus, we have to drop some some paths until that is fixed. + return stream.filter(s -> !s.contains(" ")).collect(Collectors.joining(":")); + } - Map env = null; - Process p = startProcess(cmd.split(" "), env, RunInfo.ProcessType.PT_PLASMA_STORE, - "plasma_store", redisAddress, ip, redirect, cleanup); + private String buildWorkerCommandRaylet() { + List cmd = new ArrayList<>(); + cmd.add("java"); + cmd.add("-classpath"); - if (p != null && p.isAlive()) { - try { - TimeUnit.MILLISECONDS.sleep(100); - } catch (InterruptedException e) { - e.printStackTrace(); - } + // Generate classpath based on current classpath + user-defined classpath. + String classpath = concatPath(Stream.concat( + Stream.of(System.getProperty("java.class.path").split(":")), + rayConfig.classpath.stream() + )); + cmd.add(classpath); + + // library path + String libraryPath = concatPath(rayConfig.libraryPath.stream()); + cmd.add("-Djava.library.path=" + libraryPath); + + // logging path + if (rayConfig.redirectOutput) { + cmd.add("-Dray.logging.stdout=org.apache.log4j.varia.NullAppender"); + cmd.add("-Dray.logging.file=org.apache.log4j.FileAppender"); + int logId = random.nextInt(10000); + String date = DATE_TIME_FORMATTER.format(LocalDateTime.now()); + String logFile = String.format("%s/worker-%s-%05d.out", rayConfig.logDir, date, logId); + cmd.add("-Dray.logging.file.path=" + logFile); } - if (p == null || !p.isAlive()) { - info.storeName = ""; - info.storeRpcAddr = ""; - throw new RuntimeException("Start object store failed ..."); - } else { - info.storeName = name; - info.storeRpcAddr = rpcAddr; - } + // Config overwrite + cmd.add("-Dray.redis.address=" + rayConfig.getRedisAddress()); + + // Main class + cmd.add(WORKER_CLASS); + String command = Joiner.on(" ").join(cmd); + LOGGER.debug("Worker command is: {}", command); + return command; + } + + private void startObjectStore() { + List command = ImmutableList.of( + rayConfig.plasmaStoreExecutablePath, + "-s", + rayConfig.objectStoreSocketName, + "-m", + rayConfig.objectStoreSize.toString() + ); + startProcess(command, null, "plasma_store"); } } diff --git a/java/runtime/src/main/java/org/ray/runtime/runner/worker/DefaultDriver.java b/java/runtime/src/main/java/org/ray/runtime/runner/worker/DefaultDriver.java index e34cd6ab5..14b315146 100644 --- a/java/runtime/src/main/java/org/ray/runtime/runner/worker/DefaultDriver.java +++ b/java/runtime/src/main/java/org/ray/runtime/runner/worker/DefaultDriver.java @@ -1,7 +1,6 @@ package org.ray.runtime.runner.worker; -import org.ray.runtime.AbstractRayRuntime; -import org.ray.runtime.config.WorkerMode; +import org.ray.api.Ray; /** * The main function of DefaultDriver. @@ -15,15 +14,11 @@ public class DefaultDriver { // public static void main(String[] args) { try { - AbstractRayRuntime.init(args); - assert AbstractRayRuntime.getParams().worker_mode == WorkerMode.DRIVER; + System.setProperty("ray.worker.mode", "DRIVER"); + Ray.init(); - String driverClass = AbstractRayRuntime.configReader - .getStringValue("ray.java.start", "driver_class", "", - "java class which main is served as the driver in a java worker"); - String driverArgs = AbstractRayRuntime.configReader - .getStringValue("ray.java.start", "driver_args", "", - "arguments for the java class main function which is served at the driver"); + String driverClass = null; + String driverArgs = null; Class cls = Class.forName(driverClass); String[] argsArray = (driverArgs != null) ? driverArgs.split(",") : (new String[] {}); cls.getMethod("main", String[].class).invoke(null, (Object) argsArray); diff --git a/java/runtime/src/main/java/org/ray/runtime/runner/worker/DefaultWorker.java b/java/runtime/src/main/java/org/ray/runtime/runner/worker/DefaultWorker.java index 9c57ec133..e67ec1967 100644 --- a/java/runtime/src/main/java/org/ray/runtime/runner/worker/DefaultWorker.java +++ b/java/runtime/src/main/java/org/ray/runtime/runner/worker/DefaultWorker.java @@ -1,31 +1,29 @@ package org.ray.runtime.runner.worker; +import org.ray.api.Ray; import org.ray.runtime.AbstractRayRuntime; -import org.ray.runtime.config.WorkerMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * default worker implementation. + * Default implementation of the worker process. */ public class DefaultWorker { - // - // String workerCmd = "java" + " -jarls " + workerPath + " --node-ip-address=" + ip - // + " --object-store-name=" + storeName - // + " --object-store-manager-name=" + storeManagerName - // + " --local-scheduler-name=" + name + " --redis-address=" + redisAddress - // + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultWorker.class); + public static void main(String[] args) { try { - AbstractRayRuntime.init(args); - assert AbstractRayRuntime.getParams().worker_mode == WorkerMode.WORKER; - AbstractRayRuntime.getInstance().loop(); - throw new RuntimeException("Control flow should never reach here"); - - } catch (Throwable e) { - e.printStackTrace(); - System.err - .println("--config=ray.config.ini --overwrite=ray.java.start.worker_mode=WORKER;..."); - System.exit(-1); + System.setProperty("ray.worker.mode", "WORKER"); + Ray.init(); + } catch (Exception e) { + LOGGER.error("Worker failed to start.", e); + } + LOGGER.info("Worker started."); + try { + ((AbstractRayRuntime)Ray.internal()).loop(); + } catch (Exception e) { + LOGGER.error("Error occurred in worker.", e); } } } diff --git a/java/runtime/src/main/java/org/ray/runtime/util/ResourceUtil.java b/java/runtime/src/main/java/org/ray/runtime/util/ResourceUtil.java index 371fd28ef..499ef8f77 100644 --- a/java/runtime/src/main/java/org/ray/runtime/util/ResourceUtil.java +++ b/java/runtime/src/main/java/org/ray/runtime/util/ResourceUtil.java @@ -37,6 +37,9 @@ public class ResourceUtil { * @return The format resources string, like "{CPU:4, GPU:0}". */ public static String getResourcesFromatStringFromMap(Map resources) { + if (resources == null) { + return "{}"; + } StringBuilder builder = new StringBuilder(); builder.append("{"); int count = 1; @@ -89,6 +92,9 @@ public class ResourceUtil { String[] items = resources.split(","); for (String item : items) { String trimItem = item.trim(); + if (trimItem.isEmpty()) { + continue; + } String[] resourcePair = trimItem.split(":"); if (resourcePair.length != 2) { diff --git a/java/runtime/src/main/java/org/ray/runtime/util/config/AConfig.java b/java/runtime/src/main/java/org/ray/runtime/util/config/AConfig.java deleted file mode 100644 index c37f3edbf..000000000 --- a/java/runtime/src/main/java/org/ray/runtime/util/config/AConfig.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.ray.runtime.util.config; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -/** - * Annotate a field as a ray configuration item. - */ -@Target({ElementType.FIELD}) -@Retention(RetentionPolicy.RUNTIME) -public @interface AConfig { - - /** - * comments for this configuration field. - */ - String comment(); - - /** - * when the config is an array list, a splitter set is specified, e.g., " \t" to use ' ' and '\t' - * as possible splits. - */ - String splitters() default ", \t"; - - /** - * indirect with value as the new section name, the field name remains the same. - */ - String defaultIndirectSectionName() default ""; - - /** - * see ConfigReader.getIndirectStringArray this config tells which is the default - * indirectSectionName in that function. - */ - String defaultArrayIndirectSectionName() default ""; -} diff --git a/java/runtime/src/main/java/org/ray/runtime/util/config/ConfigItem.java b/java/runtime/src/main/java/org/ray/runtime/util/config/ConfigItem.java deleted file mode 100644 index 89f074906..000000000 --- a/java/runtime/src/main/java/org/ray/runtime/util/config/ConfigItem.java +++ /dev/null @@ -1,15 +0,0 @@ -package org.ray.runtime.util.config; - -/** - * A ray configuration item of type {@code T}. - */ -public class ConfigItem { - - public String key; - - public String oriValue; - - public T defaultValue; - - public String desc; -} diff --git a/java/runtime/src/main/java/org/ray/runtime/util/config/ConfigReader.java b/java/runtime/src/main/java/org/ray/runtime/util/config/ConfigReader.java deleted file mode 100644 index 4f9c93f0e..000000000 --- a/java/runtime/src/main/java/org/ray/runtime/util/config/ConfigReader.java +++ /dev/null @@ -1,382 +0,0 @@ -package org.ray.runtime.util.config; - -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.InputStream; -import java.lang.reflect.Field; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Vector; -import org.ini4j.Config; -import org.ini4j.Ini; -import org.ini4j.Profile; -import org.ray.api.id.UniqueId; -import org.ray.runtime.util.ObjectUtil; -import org.ray.runtime.util.StringUtil; - -/** - * Loads configurations from a file. - */ -public class ConfigReader { - - private final CurrentUseConfig currentUseConfig = new CurrentUseConfig(); - - private final Ini ini = new Ini(); - - private String file = ""; - - public ConfigReader(String filePath) throws Exception { - this(filePath, null); - } - - public ConfigReader(String filePath, String updateConfigStr) throws Exception { - System.out.println("Build ConfigReader, the file path " + filePath + " ,the update config str " - + updateConfigStr); - try { - loadConfigFile(filePath); - updateConfigFile(updateConfigStr); - } catch (Exception e) { - e.printStackTrace(); - throw e; - } - - } - - private void loadConfigFile(String filePath) throws Exception { - - this.currentUseConfig.filePath = filePath; - String configFileDir = (new File(filePath)).getAbsoluteFile().getParent(); - byte[] encoded = Files.readAllBytes(Paths.get(filePath)); - String content = new String(encoded, StandardCharsets.UTF_8); - content = content.replaceAll("%CONFIG_FILE_DIR%", configFileDir); - - InputStream fis = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)); - Config config = new Config(); - ini.setConfig(config); - ini.load(fis); - file = currentUseConfig.filePath; - } - - private void updateConfigFile(String updateConfigStr) { - - if (updateConfigStr == null) { - return; - } - String[] updateConfigArray = updateConfigStr.split(";"); - for (String currentUpdateConfig : updateConfigArray) { - if (StringUtil.isNullOrEmpty(currentUpdateConfig)) { - continue; - } - - String[] currentUpdateConfigArray = currentUpdateConfig.split("="); - String sectionAndItemKey; - String value = ""; - if (currentUpdateConfigArray.length == 2) { - sectionAndItemKey = currentUpdateConfigArray[0]; - value = currentUpdateConfigArray[1]; - } else if (currentUpdateConfigArray.length == 1) { - sectionAndItemKey = currentUpdateConfigArray[0]; - } else { - String errorMsg = "invalid config (must be of k=v or k or k=): " + currentUpdateConfig; - System.err.println(errorMsg); - throw new RuntimeException(errorMsg); - } - - int splitOffset = sectionAndItemKey.lastIndexOf("."); - int len = sectionAndItemKey.length(); - if (splitOffset < 1 || splitOffset == len - 1) { - String errorMsg = - "invalid config (no '.' found for section name and key):" + currentUpdateConfig; - System.err.println(errorMsg); - throw new RuntimeException(errorMsg); - } - - String sectionKey = sectionAndItemKey.substring(0, splitOffset); - String itemKey = sectionAndItemKey.substring(splitOffset + 1); - if (ini.containsKey(sectionKey)) { - ini.get(sectionKey).put(itemKey, value); - } else { - ini.add(sectionKey, itemKey, value); - } - } - } - - public String filePath() { - return file; - } - - public CurrentUseConfig getCurrentUseConfig() { - return currentUseConfig; - } - - public String getStringValue(String sectionKey, String configKey, String defaultValue, - String dsptr) { - String value = getOriValue(sectionKey, configKey, defaultValue, dsptr); - if (value != null) { - return value; - } else { - return defaultValue; - } - } - - public boolean getBooleanValue(String sectionKey, String configKey, boolean defaultValue, - String dsptr) { - String value = getOriValue(sectionKey, configKey, defaultValue, dsptr); - if (value != null) { - if (value.length() == 0) { - return defaultValue; - } else { - return Boolean.valueOf(value); - } - } else { - return defaultValue; - } - } - - public int getIntegerValue(String sectionKey, String configKey, int defaultValue, String dsptr) { - String value = getOriValue(sectionKey, configKey, defaultValue, dsptr); - if (value != null) { - if (value.length() == 0) { - return defaultValue; - } else { - return Integer.valueOf(value); - } - } else { - return defaultValue; - } - } - - private synchronized String getOriValue(String sectionKey, String configKey, T defaultValue, - String deptr) { - if (null == deptr) { - throw new RuntimeException("desc must not be empty of the key:" + configKey); - } - Profile.Section section = ini.get(sectionKey); - String oriValue = null; - if (section != null && section.containsKey(configKey)) { - oriValue = section.get(configKey); - } - - if (!currentUseConfig.sectionMap.containsKey(sectionKey)) { - ConfigSection configSection = new ConfigSection(); - configSection.sectionKey = sectionKey; - updateConfigSection(configSection, configKey, defaultValue, deptr, oriValue); - currentUseConfig.sectionMap.put(sectionKey, configSection); - } else if (!currentUseConfig.sectionMap.get(sectionKey).itemMap.containsKey(configKey)) { - ConfigSection configSection = currentUseConfig.sectionMap.get(sectionKey); - updateConfigSection(configSection, configKey, defaultValue, deptr, oriValue); - } - return oriValue; - } - - private void updateConfigSection(ConfigSection configSection, String configKey, - T defaultValue, String deptr, String oriValue) { - ConfigItem configItem = new ConfigItem<>(); - configItem.defaultValue = defaultValue; - configItem.key = configKey; - configItem.oriValue = oriValue; - configItem.desc = deptr; - configSection.itemMap.put(configKey, configItem); - } - - public long getLongValue(String sectionKey, String configKey, long defaultValue, String dsptr) { - String value = getOriValue(sectionKey, configKey, defaultValue, dsptr); - if (value != null) { - if (value.length() == 0) { - return defaultValue; - } else { - return Long.valueOf(value); - } - } else { - return defaultValue; - } - } - - public double getDoubleValue(String sectionKey, String configKey, double defaultValue, - String dsptr) { - String value = getOriValue(sectionKey, configKey, defaultValue, dsptr); - if (value != null) { - if (value.length() == 0) { - return defaultValue; - } else { - return Double.valueOf(value); - } - } else { - return defaultValue; - } - } - - - public int[] getIntegerArray(String sectionKey, String configKey, int[] defaultValue, - String dsptr) { - String value = getOriValue(sectionKey, configKey, defaultValue, dsptr); - int[] array = defaultValue; - if (value != null) { - String[] list = value.split(","); - array = new int[list.length]; - for (int i = 0; i < list.length; i++) { - array[i] = Integer.valueOf(list[i]); - } - } - return array; - } - - /** - * get a string list from a whole section as keys e.g., [core] data_dirs = local.dirs # or - * cluster.dirs - * [local.dirs] /home/xxx/1 /home/yyy/2 - * [cluster.dirs] ... - * - * @param sectionKey e.g., core - * @param configKey e.g., data_dirs - * @param indirectSectionName e.g., cluster.dirs - * @return string list - */ - public String[] getIndirectStringArray(String sectionKey, String configKey, - String indirectSectionName, String dsptr) { - String s = getStringValue(sectionKey, configKey, indirectSectionName, dsptr); - Profile.Section section = ini.get(s); - if (section == null) { - return new String[] {}; - } else { - return section.keySet().toArray(new String[] {}); - } - } - - public void readObject(String sectionKey, T obj, T defaultValues) { - for (Field fld : obj.getClass().getFields()) { - Object defaultFldValue; - try { - defaultFldValue = defaultValues != null ? fld.get(defaultValues) : null; - } catch (IllegalArgumentException | IllegalAccessException e) { - defaultFldValue = null; - } - - String section = sectionKey; - String comment; - String splitters = ", \t"; - String defaultArrayIndirectSectionName; - AConfig[] anns = fld.getAnnotationsByType(AConfig.class); - if (anns.length > 0) { - comment = anns[0].comment(); - if (!StringUtil.isNullOrEmpty(anns[0].splitters())) { - splitters = anns[0].splitters(); - } - defaultArrayIndirectSectionName = anns[0].defaultArrayIndirectSectionName(); - - // redirect the section if necessary - if (!StringUtil.isNullOrEmpty(anns[0].defaultIndirectSectionName())) { - section = this - .getStringValue(sectionKey, fld.getName(), anns[0].defaultIndirectSectionName(), - comment); - } - } else { - throw new RuntimeException("unspecified comment, please use @AConfig(comment = xxxx) for " - + obj.getClass().getName() + "." + fld.getName() + "'s configuration descriptions "); - } - - try { - if (fld.getType().isPrimitive()) { - if (fld.getType().equals(boolean.class)) { - boolean v = getBooleanValue(section, fld.getName(), (boolean) defaultFldValue, comment); - fld.set(obj, v); - } else if (fld.getType().equals(float.class)) { - float v = (float) getDoubleValue(section, fld.getName(), - (double) (float) defaultFldValue, comment); - fld.set(obj, v); - } else if (fld.getType().equals(double.class)) { - double v = getDoubleValue(section, fld.getName(), (double) defaultFldValue, comment); - fld.set(obj, v); - } else if (fld.getType().equals(byte.class)) { - byte v = (byte) getLongValue(section, fld.getName(), (long) (byte) defaultFldValue, - comment); - fld.set(obj, v); - } else if (fld.getType().equals(char.class)) { - char v = (char) getLongValue(section, fld.getName(), (long) (char) defaultFldValue, - comment); - fld.set(obj, v); - } else if (fld.getType().equals(short.class)) { - short v = (short) getLongValue(section, fld.getName(), (long) (short) defaultFldValue, - comment); - fld.set(obj, v); - } else if (fld.getType().equals(int.class)) { - int v = (int) getLongValue(section, fld.getName(), (long) (int) defaultFldValue, - comment); - fld.set(obj, v); - } else if (fld.getType().equals(long.class)) { - long v = getLongValue(section, fld.getName(), (long) defaultFldValue, comment); - fld.set(obj, v); - } else { - throw new RuntimeException("unhandled type " + fld.getType().getName()); - } - } else if (fld.getType().equals(String.class)) { - String v = getStringValue(section, fld.getName(), (String) defaultFldValue, comment); - fld.set(obj, v); - } else if (fld.getType().isEnum()) { - String sv = getStringValue(section, fld.getName(), defaultFldValue.toString(), comment); - @SuppressWarnings({"unchecked", "rawtypes"}) - Object v = Enum.valueOf((Class) fld.getType(), sv); - fld.set(obj, v); - // TODO: this is a hack and needs to be resolved later - } else if (fld.getType().equals(UniqueId.class)) { - String sv = getStringValue(section, fld.getName(), defaultFldValue.toString(), comment); - Object v; - try { - v = UniqueId.fromHexString(sv); - } catch (IllegalArgumentException e) { - System.err.println( - section + "." + fld.getName() + "'s format (" + sv + ") is invalid, default to " - + defaultFldValue.toString()); - v = defaultFldValue; - } - fld.set(obj, v); - } else if (fld.getType().isArray()) { - Class ccls = fld.getType().getComponentType(); - String ss = getStringValue(section, fld.getName(), null, comment); - if (null == ss) { - fld.set(obj, defaultFldValue); - } else { - Vector ls = StringUtil.split(ss, splitters, "", ""); - if (ccls.equals(boolean.class)) { - boolean[] v = ObjectUtil - .toBooleanArray(ls.stream().map(Boolean::parseBoolean).toArray()); - fld.set(obj, v); - } else if (ccls.equals(double.class)) { - double[] v = ls.stream().mapToDouble(Double::parseDouble).toArray(); - fld.set(obj, v); - } else if (ccls.equals(int.class)) { - int[] v = ls.stream().mapToInt(Integer::parseInt).toArray(); - fld.set(obj, v); - } else if (ccls.equals(long.class)) { - long[] v = ls.stream().mapToLong(Long::parseLong).toArray(); - fld.set(obj, v); - } else if (ccls.equals(String.class)) { - String[] v; - if (StringUtil.isNullOrEmpty(defaultArrayIndirectSectionName)) { - v = ls.toArray(new String[] {}); - } else { - v = this - .getIndirectStringArray(section, fld.getName(), - defaultArrayIndirectSectionName, - comment); - } - fld.set(obj, v); - } else { - throw new RuntimeException( - "Array with component type " + ccls.getName() + " is not supported yet"); - } - } - } else { - Object fldObj = ObjectUtil.newObject(fld.getType()); - fld.set(obj, fldObj); - readObject(section + "." + fld.getName(), fldObj, defaultFldValue); - } - } catch (IllegalArgumentException | IllegalAccessException e) { - throw new RuntimeException("set fld " + fld.getName() + " failed, err = " + e.getMessage(), - e); - } - } - } - -} diff --git a/java/runtime/src/main/java/org/ray/runtime/util/config/ConfigSection.java b/java/runtime/src/main/java/org/ray/runtime/util/config/ConfigSection.java deleted file mode 100644 index fafb6fc9a..000000000 --- a/java/runtime/src/main/java/org/ray/runtime/util/config/ConfigSection.java +++ /dev/null @@ -1,13 +0,0 @@ -package org.ray.runtime.util.config; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * A configuration section of related items. - */ -public class ConfigSection { - - public final Map> itemMap = new ConcurrentHashMap<>(); - public String sectionKey; -} diff --git a/java/runtime/src/main/java/org/ray/runtime/util/config/CurrentUseConfig.java b/java/runtime/src/main/java/org/ray/runtime/util/config/CurrentUseConfig.java deleted file mode 100644 index aca7ae589..000000000 --- a/java/runtime/src/main/java/org/ray/runtime/util/config/CurrentUseConfig.java +++ /dev/null @@ -1,15 +0,0 @@ -package org.ray.runtime.util.config; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - - -/** - * The configuration which is currently in use. - */ -public class CurrentUseConfig { - - public final Map sectionMap = new ConcurrentHashMap<>(); - public String filePath; - -} diff --git a/java/runtime/src/main/java/org/ray/runtime/util/logger/RayLog.java b/java/runtime/src/main/java/org/ray/runtime/util/logger/RayLog.java index da57f718b..efc38d1db 100644 --- a/java/runtime/src/main/java/org/ray/runtime/util/logger/RayLog.java +++ b/java/runtime/src/main/java/org/ray/runtime/util/logger/RayLog.java @@ -1,6 +1,5 @@ package org.ray.runtime.util.logger; -import org.ray.runtime.util.SystemUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,24 +20,8 @@ public class RayLog { */ public static Logger rapp; - /** - * Initialize loggers - * @param logDir directory of the log files. - */ - public static void init(String logDir) { - String loggingPath = System.getProperty("logging.path"); - if (loggingPath == null) { - System.setProperty("logging.path", logDir); - } - String loggingFileName = System.getProperty("logging.file.name"); - if (loggingFileName != null && loggingFileName.contains("*pid_suffix*")) { - loggingFileName = loggingFileName.replaceAll("\\*pid_suffix\\*", - String.valueOf(SystemUtil.pid())); - System.setProperty("logging.file.name", loggingFileName); - } - + public static void init() { core = LoggerFactory.getLogger("core"); - rapp = core; } } diff --git a/java/runtime/src/main/resources/log4j.properties b/java/runtime/src/main/resources/log4j.properties index d371e62d2..2218efef6 100644 --- a/java/runtime/src/main/resources/log4j.properties +++ b/java/runtime/src/main/resources/log4j.properties @@ -1,20 +1,17 @@ -# define default properties here -logging.level=WARN -logging.path=./run/logs -logging.file.name=core -logging.max.log.file.num=10 -logging.max.log.file.size=500MB +ray.logging.level=INFO -log4j.rootLogger=${logging.level}, stdout, core +ray.logging.stdout=org.apache.log4j.ConsoleAppender +ray.logging.file=org.apache.log4j.varia.NullAppender -log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.rootLogger=${ray.logging.level}, stdout, file + +log4j.appender.stdout=${ray.logging.stdout} log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %c{1} [%t]: %m%n -log4j.appender.core=org.apache.log4j.RollingFileAppender -log4j.appender.core.File=${logging.path}/${logging.file.name}.log -log4j.appender.core.Append=true -log4j.appender.core.MaxFileSize=${logging.max.log.file.size} -log4j.appender.core.MaxBackupIndex=${logging.max.log.file.num} -log4j.appender.core.layout=org.apache.log4j.PatternLayout -log4j.appender.core.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %c{1} [%t]: %m%n +# Set the file appender to null by default. If `ray.redirect-output` config is set to true, +# this appender will be set to a real file appender. +log4j.appender.file=${ray.logging.file} +log4j.appender.file.File=${ray.logging.file.path} +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %c{1} [%t]: %m%n diff --git a/java/runtime/src/main/resources/ray.default.conf b/java/runtime/src/main/resources/ray.default.conf new file mode 100644 index 000000000..c20d679a9 --- /dev/null +++ b/java/runtime/src/main/resources/ray.default.conf @@ -0,0 +1,79 @@ +// This file contains default values of all Ray configurations. +// Users should define their own 'ray.conf' file in the classpath, +// or use Java properties, to overwrite these values. + +ray { + // ---------------------- + // Basic configurations + // ---------------------- + + // This is the path to the directory where Ray is installed, e.g., + // something like /home/ubmutu/ray. This can be an absolute path or + // a relative path from the current working directory. + home: "" + + // IP of this node. if not provided, IP will be automatically detected. + node-ip: "" + + // Run mode, available options are: + // + // `SINGLE_PROCESS`: Ray is running in one single Java process, without Raylet backend, + // object store, and GCS. It's useful for debug. + // `CLUSTER`: Ray is running on one or more nodes, with multiple processes. + run-mode: CLUSTER + + // Available resources on this node, for example "CPU:4,GPU:0". + resources: "" + + // If worker.mode is DRIVER, specify the driver id. + // If not provided, a random id will be used. + driver.id: "" + + // Root dir of log files. + log-dir: /tmp/ray/logs + + // If true, output of worker processes will be redirected to log files. + // Otherwise, output will be printed to console. + redirect-output: true + + // Custom `java.library.path` + // Note, do not use `dir1:dir2` format, put each dir as a list item. + library.path: [] + + // Custom classpath. + // Note, do not use `dir1:dir2` format, put each dir as a list item. + classpath = [] + + // ---------------------- + // Redis configurations + // ---------------------- + redis { + // The address of the redis server to connect, in format `ip:port`. + // If not provided, Ray processes will be started locally, including + // Redis server, Raylet and object store. + address: "" + // If `redis.server` isn't provided, which port we should use to start redis server. + head-port: 6379 + // If `redis.server` isn't provided, how many Redis shards we should start in addition to the + // primary Redis shard. The ports of these shards will be `head-port + 1`, `head-port + 2`, etc. + shard-number: 1 + } + + // ---------------------------- + // Object store configurations + // ---------------------------- + object-store { + // RPC socket name of object store + socket-name: /tmp/ray/sockets/object_store + // Initial size of the object store. + size: 10 MB + } + + // ---------------------------- + // Raylet configurations + // ---------------------------- + raylet { + // RPC socket name of Raylet + socket-name: /tmp/ray/sockets/raylet + } +} diff --git a/java/test/pom.xml b/java/test/pom.xml index 0fde0541b..d2031537f 100644 --- a/java/test/pom.xml +++ b/java/test/pom.xml @@ -51,14 +51,6 @@ maven-surefire-plugin 2.21.0 - - ${basedir}/../ray.config.ini - - -ea - -Djava.library.path=${basedir}/../../build/src/plasma:${basedir}/../../build/src/local_scheduler - -noverify - -DlogOutput=console - ${basedir}/src/main/java/ ${project.build.directory}/classes/ diff --git a/java/test/run/logs/core.log b/java/test/run/logs/core.log deleted file mode 100644 index e69de29bb..000000000 diff --git a/java/test/src/main/java/org/ray/api/test/RayConfigTest.java b/java/test/src/main/java/org/ray/api/test/RayConfigTest.java new file mode 100644 index 000000000..f7b829bec --- /dev/null +++ b/java/test/src/main/java/org/ray/api/test/RayConfigTest.java @@ -0,0 +1,20 @@ +package org.ray.api.test; + +import org.junit.Assert; +import org.junit.Test; +import org.ray.runtime.config.RayConfig; +import org.ray.runtime.config.RunMode; +import org.ray.runtime.config.WorkerMode; + +public class RayConfigTest { + + @Test + public void testCreateRayConfig() { + System.setProperty("ray.home", "/path/to/ray"); + RayConfig rayConfig = RayConfig.create(); + + Assert.assertEquals("/path/to/ray", rayConfig.rayHome); + Assert.assertEquals(WorkerMode.DRIVER, rayConfig.workerMode); + Assert.assertEquals(RunMode.CLUSTER, rayConfig.runMode); + } +} diff --git a/java/test/src/main/java/org/ray/api/test/TestListener.java b/java/test/src/main/java/org/ray/api/test/TestListener.java index 323b1eb74..3fb16bf4f 100644 --- a/java/test/src/main/java/org/ray/api/test/TestListener.java +++ b/java/test/src/main/java/org/ray/api/test/TestListener.java @@ -9,6 +9,8 @@ public class TestListener extends RunListener { @Override public void testRunStarted(Description description) { + System.setProperty("ray.home", "../.."); + System.setProperty("ray.resources", "CPU:4"); Ray.init(); } diff --git a/java/tutorial/README.rst b/java/tutorial/README.rst index 0e980bc1c..e099ad25b 100644 --- a/java/tutorial/README.rst +++ b/java/tutorial/README.rst @@ -7,12 +7,12 @@ Ray Java Tutorial Exercises --------- -Each file ``java/example/src/main/java/org/ray/exercise/Exercise*.java`` is a separate exercise. -To run a exercise case, set the ``RAY_CONFIG`` env variable and run the following command in ``ray/java/`` directory. +Each file of ``java/example/src/main/java/org/ray/exercise/Exercise*.java`` is a separate exercise. +To run them, execute the following command under ``ray/java`` folder. .. code-block:: shell - java -Djava.library.path=../build/src/plasma/:../build/src/local_scheduler/ -classpath "tutorial/target/ray-tutorial-1.0.jar:tutorial/lib/*" org.ray.exercise.Exercise01 + java -Dray.home=.. -classpath "tutorial/target/ray-tutorial-1.0.jar:tutorial/lib/*" org.ray.exercise.Exercise01 `Exercise 1 `_: Define a remote function, and execute multiple remote functions in parallel. diff --git a/java/tutorial/src/main/resources/ray.conf b/java/tutorial/src/main/resources/ray.conf new file mode 100644 index 000000000..a6cdf42f8 --- /dev/null +++ b/java/tutorial/src/main/resources/ray.conf @@ -0,0 +1,5 @@ +ray{ + home: ".." + run-mode: CLUSTER + redirect-output: false +}