From fa33ea5283fc05868ff60e8f368bb3d9a347f39d Mon Sep 17 00:00:00 2001
From: Zhijun Fu <37800433+zhijunfu@users.noreply.github.com>
Date: Tue, 10 Jul 2018 01:20:41 +0800
Subject: [PATCH] [Java] Java worker cluster support (#2359)
---
java/cli/assembly.xml | 15 +
java/cli/pom.xml | 98 ++++++
.../main/java/org/ray/cli/CommandStart.java | 21 ++
.../main/java/org/ray/cli/CommandStop.java | 11 +
.../main/java/org/ray/cli/CommandSubmit.java | 27 ++
.../cli/src/main/java/org/ray/cli/RayCli.java | 290 ++++++++++++++++++
.../src/main/java/org/ray/cli/RayCliArgs.java | 12 +
.../main/java/org/ray/example/HelloWorld.java | 8 +-
java/pom.xml | 10 +-
java/prepare.sh | 102 ++++++
java/ray.config.ini | 8 +-
java/run.sh | 8 +
.../main/java/org/ray/runner/RunManager.java | 11 +-
.../org/ray/runner/worker/DefaultDriver.java | 6 +-
.../org/ray/spi/impl/StateStoreProxyImpl.java | 22 +-
java/test_cluster.sh | 31 ++
16 files changed, 661 insertions(+), 19 deletions(-)
create mode 100644 java/cli/assembly.xml
create mode 100644 java/cli/pom.xml
create mode 100644 java/cli/src/main/java/org/ray/cli/CommandStart.java
create mode 100644 java/cli/src/main/java/org/ray/cli/CommandStop.java
create mode 100644 java/cli/src/main/java/org/ray/cli/CommandSubmit.java
create mode 100644 java/cli/src/main/java/org/ray/cli/RayCli.java
create mode 100644 java/cli/src/main/java/org/ray/cli/RayCliArgs.java
create mode 100755 java/prepare.sh
create mode 100755 java/run.sh
create mode 100755 java/test_cluster.sh
diff --git a/java/cli/assembly.xml b/java/cli/assembly.xml
new file mode 100644
index 000000000..1684a11d6
--- /dev/null
+++ b/java/cli/assembly.xml
@@ -0,0 +1,15 @@
+
+ ear
+
+ zip
+
+
+
+ true
+ lib
+
+
+
+
+
+
diff --git a/java/cli/pom.xml b/java/cli/pom.xml
new file mode 100644
index 000000000..d8d11455d
--- /dev/null
+++ b/java/cli/pom.xml
@@ -0,0 +1,98 @@
+
+
+
+
+ org.ray.parent
+ ray-superpom
+ 1.0
+
+ 4.0.0
+ org.ray
+ ray-cli
+
+ java cli for ray
+ java cli for ray
+
+
+ jar
+
+
+
+ org.ray
+ ray-api
+ ${project.version}
+
+
+ org.ray
+ ray-runtime-native
+ ${project.version}
+
+
+ de.ruedigermoeller
+ fst
+
+
+
+ com.github.davidmoten
+ flatbuffers-java
+
+
+
+ redis.clients
+ jedis
+
+
+
+
+ com.beust
+ jcommander
+
+
+
+
+ ray-cli
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+
+
+ assembly.xml
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+ copy-dependencies
+ package
+
+ copy-dependencies
+
+
+ ${basedir}/lib
+ false
+ false
+ true
+
+
+
+
+
+
+
+
+
diff --git a/java/cli/src/main/java/org/ray/cli/CommandStart.java b/java/cli/src/main/java/org/ray/cli/CommandStart.java
new file mode 100644
index 000000000..218caf1be
--- /dev/null
+++ b/java/cli/src/main/java/org/ray/cli/CommandStart.java
@@ -0,0 +1,21 @@
+package org.ray.cli;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+/**
+ * Arguments for command start.
+ */
+@Parameters(separators = "= ", commandDescription = "start ray daemons")
+public class CommandStart {
+
+ @Parameter(names = "--head", description = "start the head node")
+ public boolean head;
+
+ @Parameter(names = "--config", description = "the config file of ray")
+ public String config = "";
+
+ @Parameter(names = "--overwrite", description = "the overwrite items of config")
+ public String overwrite = "";
+
+}
diff --git a/java/cli/src/main/java/org/ray/cli/CommandStop.java b/java/cli/src/main/java/org/ray/cli/CommandStop.java
new file mode 100644
index 000000000..818c28e00
--- /dev/null
+++ b/java/cli/src/main/java/org/ray/cli/CommandStop.java
@@ -0,0 +1,11 @@
+package org.ray.cli;
+
+import com.beust.jcommander.Parameters;
+
+/**
+ * Arguments for command stop.
+ */
+@Parameters(separators = "= ", commandDescription = "stop ray daemons")
+public class CommandStop {
+
+}
diff --git a/java/cli/src/main/java/org/ray/cli/CommandSubmit.java b/java/cli/src/main/java/org/ray/cli/CommandSubmit.java
new file mode 100644
index 000000000..cdef011ce
--- /dev/null
+++ b/java/cli/src/main/java/org/ray/cli/CommandSubmit.java
@@ -0,0 +1,27 @@
+package org.ray.cli;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+/**
+ * Arguments for command submit.
+ */
+@Parameters(separators = "= ", commandDescription = "submit a job to ray cluster")
+public class CommandSubmit {
+
+ @Parameter(names = "--package", description = "java jar package zip file", required = true)
+ public String packageZip;
+
+ @Parameter(names = "--class", description = "java class name", required = true)
+ public String className;
+
+ @Parameter(names = "--args", description = "arguments for the java class")
+ public String classArgs;
+
+ @Parameter(names = "--config", description = "the config file of ray")
+ public String config;
+
+ @Parameter(names = "--redis-address", description = "ip:port for redis service", required = true)
+ public String redisAddress;
+
+}
diff --git a/java/cli/src/main/java/org/ray/cli/RayCli.java b/java/cli/src/main/java/org/ray/cli/RayCli.java
new file mode 100644
index 000000000..d6becbf22
--- /dev/null
+++ b/java/cli/src/main/java/org/ray/cli/RayCli.java
@@ -0,0 +1,290 @@
+package org.ray.cli;
+
+import com.beust.jcommander.JCommander;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import net.lingala.zip4j.core.ZipFile;
+import net.lingala.zip4j.exception.ZipException;
+import org.ray.api.UniqueID;
+import org.ray.cli.CommandStart;
+import org.ray.cli.CommandStop;
+import org.ray.core.RayRuntime;
+import org.ray.core.model.RayParameters;
+import org.ray.core.model.RunMode;
+import org.ray.runner.RunInfo;
+import org.ray.runner.RunManager;
+import org.ray.runner.worker.DefaultDriver;
+import org.ray.spi.KeyValueStoreLink;
+import org.ray.spi.PathConfig;
+import org.ray.spi.RemoteFunctionManager;
+import org.ray.spi.StateStoreProxy;
+import org.ray.spi.impl.NativeRemoteFunctionManager;
+import org.ray.spi.impl.RedisClient;
+import org.ray.spi.impl.StateStoreProxyImpl;
+import org.ray.util.FileUtil;
+import org.ray.util.config.ConfigReader;
+import org.ray.util.logger.RayLog;
+
+
+/**
+ * Ray command line interface.
+ */
+public class RayCli {
+
+ private static RayCliArgs rayArgs = new RayCliArgs();
+
+ private static RunManager startRayHead(RayParameters params, PathConfig paths,
+ ConfigReader configReader) {
+ RunManager manager = new RunManager(params, paths, configReader);
+
+ try {
+ manager.startRayHead();
+ } catch (Exception e) {
+ e.printStackTrace();
+ RayLog.core.error("error at RayCli startRayHead", e);
+ throw new RuntimeException("Ray head node start failed", e);
+ }
+
+ RayLog.core.info("Started Ray head node. Redis address: " + manager.info().redisAddress);
+ return manager;
+ }
+
+ private static RunManager startRayNode(RayParameters params, PathConfig paths,
+ ConfigReader configReader) {
+ RunManager manager = new RunManager(params, paths, configReader);
+
+ try {
+ manager.startRayNode();
+ } catch (Exception e) {
+ e.printStackTrace();
+ RayLog.core.error("error at RayCli startRayNode", e);
+ throw new RuntimeException("Ray work node start failed, err = " + e.getMessage());
+ }
+
+ RayLog.core.info("Started Ray work node.");
+ return manager;
+ }
+
+ private static RunManager startProcess(CommandStart cmdStart, ConfigReader config) {
+ PathConfig paths = new PathConfig(config);
+ RayParameters params = new RayParameters(config);
+
+ RayLog.core.info("Using IP address " + params.node_ip_address + " for this node.");
+ RunManager manager;
+ if (cmdStart.head) {
+ manager = startRayHead(params, paths, config);
+ } else {
+ manager = startRayNode(params, paths, config);
+ }
+ return manager;
+ }
+
+ private static void start(CommandStart cmdStart, ConfigReader reader) {
+ startProcess(cmdStart, reader);
+ }
+
+ private static void stop(CommandStop cmdStop) {
+ String[] cmd = {"/bin/sh", "-c", ""};
+
+ cmd[2] = "killall global_scheduler local_scheduler plasma_store plasma_manager";
+ try {
+ Runtime.getRuntime().exec(cmd);
+ } catch (IOException e) {
+ RayLog.core.warn("exception in killing ray processes");
+ }
+
+ cmd[2] = "kill $(ps aux | grep redis-server | grep -v grep | "
+ + "awk \'{ print $2 }\') 2> /dev/null";
+ try {
+ Runtime.getRuntime().exec(cmd);
+ } catch (IOException e) {
+ RayLog.core.warn("exception in killing ray processes");
+ }
+
+ cmd[2] = "kill -9 $(ps aux | grep DefaultWorker | grep -v grep | "
+ + "awk \'{ print $2 }\') 2> /dev/null";
+ try {
+ Runtime.getRuntime().exec(cmd);
+ } catch (IOException e) {
+ RayLog.core.warn("exception in killing ray processes");
+ }
+ }
+
+ private static String[] buildRayRuntimeArgs(CommandSubmit cmdSubmit) {
+
+ if (cmdSubmit.redisAddress == null) {
+ throw new RuntimeException(
+ "--redis-address must be specified to submit a job");
+ }
+
+ List argList = new ArrayList();
+ String section = "ray.java.start.";
+ String overwrite = "--overwrite="
+ + section + "redis_address=" + cmdSubmit.redisAddress + ";"
+ + section + "run_mode=" + "CLUSTER";
+
+ argList.add(overwrite);
+
+ if (cmdSubmit.config != null) {
+ String config = "--config=" + cmdSubmit.config;
+ argList.add(config);
+ }
+
+ String[] args = new String[argList.size()];
+ argList.toArray(args);
+
+ return args;
+ }
+
+ private static void submit(CommandSubmit cmdSubmit, String configPath) throws Exception {
+ ConfigReader config = new ConfigReader(configPath, "ray.java.start.deploy=true");
+ PathConfig paths = new PathConfig(config);
+ RayParameters params = new RayParameters(config);
+
+ params.redis_address = cmdSubmit.redisAddress;
+ params.run_mode = RunMode.CLUSTER;
+
+
+ KeyValueStoreLink kvStore = new RedisClient();
+ kvStore.setAddr(cmdSubmit.redisAddress);
+ StateStoreProxy stateStoreProxy = new StateStoreProxyImpl(kvStore);
+ stateStoreProxy.initializeGlobalState();
+
+ RemoteFunctionManager functionManager = new NativeRemoteFunctionManager(kvStore);
+
+ // Register app to Redis.
+ byte[] zip = FileUtil.fileToBytes(cmdSubmit.packageZip);
+
+ String packageName = cmdSubmit.packageZip.substring(
+ cmdSubmit.packageZip.lastIndexOf('/') + 1,
+ cmdSubmit.packageZip.lastIndexOf('.'));
+
+ //final RemoteFunctionManager functionManager = RayRuntime
+ // .getInstance().getRemoteFunctionManager();
+
+ UniqueID resourceId = functionManager.registerResource(zip);
+ RayLog.rapp.debug(
+ "registerResource " + resourceId + " for package " + packageName + " done");
+
+ UniqueID appId = params.driver_id;
+ functionManager.registerApp(appId, resourceId);
+ RayLog.rapp.debug("registerApp " + appId + " for resouorce " + resourceId + " done");
+
+ // Unzip the package file.
+ String appDir = "/tmp/" + cmdSubmit.className;
+ String extPath = appDir + "/" + packageName;
+ if (!FileUtil.createDir(extPath, false)) {
+ throw new RuntimeException("create dir " + extPath + " failed ");
+ }
+
+ ZipFile zipFile = new ZipFile(cmdSubmit.packageZip);
+ zipFile.extractAll(extPath);
+
+ // Build the args for driver process.
+ File originDirFile = new File(extPath);
+ File[] topFiles = originDirFile.listFiles();
+ String topDir = null;
+ for (File file : topFiles) {
+ if (file.isDirectory()) {
+ topDir = file.getName();
+ }
+ }
+ RayLog.rapp.debug("topDir of app classes: " + topDir);
+ if (topDir == null) {
+ RayLog.rapp.error("Can't find topDir of app classes, the app directory " + appDir);
+ return;
+ }
+
+ String additionalClassPath = appDir + "/" + packageName + "/" + topDir + "/*";
+ RayLog.rapp.debug("Find app class path " + additionalClassPath);
+
+ // Start driver process.
+ //RunManager runManager = new RunManager(params, RayRuntime.getInstance().getPaths(),
+ // RayRuntime.configReader);
+ RunManager runManager = new RunManager(params, paths, config);
+ Process proc = runManager.startDriver(
+ DefaultDriver.class.getName(),
+ cmdSubmit.redisAddress,
+ appId,
+ appDir,
+ params.node_ip_address,
+ cmdSubmit.className,
+ cmdSubmit.classArgs,
+ additionalClassPath,
+ null);
+
+ if (null == proc) {
+ RayLog.rapp.error(
+ "Create process for app " + packageName + " in local directory " + appDir
+ + " failed");
+ return;
+ }
+
+ RayLog.rapp
+ .info("Create app " + appDir + " for package " + packageName + " succeeded");
+ }
+
+ private static String getConfigPath(String config) {
+ String configPath;
+
+ if (config != null && !config.equals("")) {
+ configPath = config;
+ } else {
+ configPath = System.getenv("RAY_CONFIG");
+ if (configPath == null) {
+ configPath = System.getProperty("ray.config");
+ }
+ if (configPath == null) {
+ throw new RuntimeException(
+ "Please set config file path in env RAY_CONFIG or property ray.config");
+ }
+ }
+ return configPath;
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ CommandStart cmdStart = new CommandStart();
+ CommandStop cmdStop = new CommandStop();
+ CommandSubmit cmdSubmit = new CommandSubmit();
+ JCommander rayCommander = JCommander.newBuilder().addObject(rayArgs)
+ .addCommand("start", cmdStart)
+ .addCommand("stop", cmdStop)
+ .addCommand("submit", cmdSubmit)
+ .build();
+ rayCommander.parse(args);
+
+ if (rayArgs.help) {
+ rayCommander.usage();
+ System.exit(0);
+ }
+
+ String cmd = rayCommander.getParsedCommand();
+ if (cmd == null) {
+ rayCommander.usage();
+ System.exit(0);
+ }
+
+ String configPath;
+ switch (cmd) {
+ case "start": {
+ configPath = getConfigPath(cmdStart.config);
+ ConfigReader config = new ConfigReader(configPath, cmdStart.overwrite);
+ start(cmdStart, config);
+ }
+ break;
+ case "stop":
+ stop(cmdStop);
+ break;
+ case "submit":
+ configPath = getConfigPath(cmdSubmit.config);
+ submit(cmdSubmit, configPath);
+ break;
+ default:
+ rayCommander.usage();
+ }
+ }
+
+}
diff --git a/java/cli/src/main/java/org/ray/cli/RayCliArgs.java b/java/cli/src/main/java/org/ray/cli/RayCliArgs.java
new file mode 100644
index 000000000..d5d7bafec
--- /dev/null
+++ b/java/cli/src/main/java/org/ray/cli/RayCliArgs.java
@@ -0,0 +1,12 @@
+package org.ray.cli;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Arguments for Ray cli.
+ */
+public class RayCliArgs {
+
+ @Parameter(names = {"-h", "-help", "--help"}, description = "print this usage", help = true)
+ public boolean help;
+}
diff --git a/java/example/src/main/java/org/ray/example/HelloWorld.java b/java/example/src/main/java/org/ray/example/HelloWorld.java
index 632ea5517..2335b0a54 100644
--- a/java/example/src/main/java/org/ray/example/HelloWorld.java
+++ b/java/example/src/main/java/org/ray/example/HelloWorld.java
@@ -33,6 +33,12 @@ public class HelloWorld implements Serializable {
public static void main(String[] args) throws Exception {
try {
Ray.init();
+
+ RayLog.rapp.info("HelloWorld.main() has " + args.length + " args");
+ for (String arg: args) {
+ RayLog.rapp.info("arg: " + arg);
+ }
+
String helloWorld = HelloWorld.sayHelloWorld();
RayLog.rapp.info(helloWorld);
assert helloWorld.equals("hello,world!");
@@ -41,8 +47,6 @@ public class HelloWorld implements Serializable {
} finally {
RayRuntime.getInstance().cleanUp();
}
-
-
}
public static String sayHelloWorld() {
diff --git a/java/pom.xml b/java/pom.xml
index 97da98a1d..36fbaa370 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -15,6 +15,7 @@
runtime-common
runtime-native
runtime-dev
+ cli
test
example
@@ -22,7 +23,7 @@
1.8
UTF-8
- 1.0
+ 1.0
@@ -69,6 +70,13 @@
1.7.0.1
+
+
+ com.beust
+ jcommander
+ 1.72
+
+
redis.clients
diff --git a/java/prepare.sh b/java/prepare.sh
new file mode 100755
index 000000000..d59b92b09
--- /dev/null
+++ b/java/prepare.sh
@@ -0,0 +1,102 @@
+#!/bin/bash
+
+function usage() {
+ echo " -t|--target-dir local target directory for prepare a Ray cluster deployment package"
+ echo " [-s|--source-dir] local source directory to prepare a Ray cluster deployment package"
+}
+
+while [ $# -gt 0 ];do
+ key=$1
+ case $key in
+ -h|--help)
+ usage
+ exit 0
+ ;;
+ -s|--source-dir)
+ ray_dir=$2
+ shift 2
+ ;;
+ -t|--target-dir)
+ t_dir=$2
+ shift 2
+ ;;
+ *)
+ echo "ERROR: unknown option $key"
+ echo
+ usage
+ exit -1
+ ;;
+ esac
+done
+
+realpath() {
+ [[ $1 = /* ]] && echo "$1" || echo "$PWD/${1#./}"
+}
+
+if [ -z $ray_dir ];then
+ scripts_path=`realpath $0`
+ ray_dir=`dirname $scripts_path`
+ ray_dir=`dirname $ray_dir`
+fi
+
+# echo "ray_dir = $ray_dir"
+
+declare -a nativeBinaries=(
+ "./src/common/thirdparty/redis/src/redis-server"
+ "./src/plasma/plasma_store"
+ "./src/plasma/plasma_manager"
+ "./src/local_scheduler/local_scheduler"
+ "./src/global_scheduler/global_scheduler"
+)
+
+declare -a nativeLibraries=(
+ "./src/common/redis_module/libray_redis_module.so"
+ "./src/local_scheduler/liblocal_scheduler_library_java.*"
+ "./src/plasma/libplasma_java.*"
+)
+
+declare -a javaBinaries=(
+ "api"
+ "common"
+ "worker"
+ "test"
+)
+
+function prepare_source()
+{
+ if [ -z $t_dir ];then
+ echo "--target-dir not specified"
+ usage
+ exit -1
+ fi
+
+ # prepare native components under /ray/native/bin
+ mkdir -p $t_dir"/ray/native/bin/"
+ for i in "${!nativeBinaries[@]}"
+ do
+ cp $ray_dir/build/${nativeBinaries[$i]} $t_dir/ray/native/bin/
+ done
+
+ # prepare native libraries under /ray/native/lib
+ mkdir -p $t_dir"/ray/native/lib/"
+ for i in "${!nativeLibraries[@]}"
+ do
+ cp $ray_dir/build/${nativeLibraries[$i]} $t_dir/ray/native/lib/
+ done
+
+ # prepare java components under /ray/java/lib
+ mkdir -p $t_dir"/ray/java/lib/"
+ unzip -q $ray_dir/java/cli/target/ray-cli-ear.zip -d $ray_dir/java
+ cp $ray_dir/java/ray-cli/lib/* $t_dir/ray/java/lib/
+ rm -rf $ray_dir/java/ray-cli
+
+ cp -rf $ray_dir/java/ray.config.ini $t_dir/ray/
+
+ # prepare java apps directory
+ mkdir -p $t_dir"/ray/java/apps/"
+
+ # prepare run.sh
+ cp $ray_dir/java/run.sh $t_dir/
+}
+
+prepare_source
diff --git a/java/ray.config.ini b/java/ray.config.ini
index d7e0cc8bf..726298518 100644
--- a/java/ray.config.ini
+++ b/java/ray.config.ini
@@ -72,11 +72,15 @@ max_java_log_file_size = 500MB
onebox_delay_seconds_before_run_app_logic = 0
-[ray.java.start.job]
-
; java class which main is served as the driver in a java worker
driver_class =
+; arguments for the java class main function which is served at the driver
+; the arguments are separated by ','
+driver_args =
+
+[ray.java.start.job]
+
[ray.java.path.classes.source]
%CONFIG_FILE_DIR%/common/target/classes =
%CONFIG_FILE_DIR%/common/target/test-classes =
diff --git a/java/run.sh b/java/run.sh
new file mode 100755
index 000000000..7cfc8d75e
--- /dev/null
+++ b/java/run.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+scripts_dir=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)
+cd $scripts_dir
+
+export RAY_CONFIG=$scripts_dir/ray/ray.config.ini
+export LD_LIBRARY_PATH=$scripts_dir/ray/native/lib:$LD_LIBRARY_PATH
+java -ea -classpath ray/java/lib/*:ray/java/lib/commons-cli-1.3.1.jar org.ray.cli.RayCli "$@"
+
diff --git a/java/runtime-native/src/main/java/org/ray/runner/RunManager.java b/java/runtime-native/src/main/java/org/ray/runner/RunManager.java
index 6c4123c56..8a25923ea 100644
--- a/java/runtime-native/src/main/java/org/ray/runner/RunManager.java
+++ b/java/runtime-native/src/main/java/org/ray/runner/RunManager.java
@@ -110,10 +110,14 @@ public class RunManager {
public Process startDriver(String mainClass, String redisAddress, UniqueID driverId,
String workDir, String ip,
- String driverClass, String additonalClassPaths, String
- additionalConfigs) {
+ String driverClass, String driverArgs, String additonalClassPaths,
+ String additionalConfigs) {
String driverConfigs =
"ray.java.start.driver_id=" + driverId + ";ray.java.start.driver_class=" + driverClass;
+ if (driverArgs != null) {
+ driverConfigs += ";ray.java.start.driver_args=" + driverArgs;
+ }
+
if (null != additionalConfigs) {
additionalConfigs += ";" + driverConfigs;
} else {
@@ -175,7 +179,8 @@ public class RunManager {
+ section + "redis_address=" + redisAddr + ";"
+ section + "working_directory=" + workDir + ";"
+ section + "logging_directory=" + params.logging_directory + ";"
- + section + "working_directory=" + workDir;
+ + section + "working_directory=" + workDir + ";"
+ + section + "run_mode=" + params.run_mode;
if (additionalConfigs.length() > 0) {
cmd += ";" + additionalConfigs;
diff --git a/java/runtime-native/src/main/java/org/ray/runner/worker/DefaultDriver.java b/java/runtime-native/src/main/java/org/ray/runner/worker/DefaultDriver.java
index d8cdde446..2eeb30439 100644
--- a/java/runtime-native/src/main/java/org/ray/runner/worker/DefaultDriver.java
+++ b/java/runtime-native/src/main/java/org/ray/runner/worker/DefaultDriver.java
@@ -21,8 +21,12 @@ public class DefaultDriver {
String driverClass = RayRuntime.configReader
.getStringValue("ray.java.start", "driver_class", "",
"java class which main is served as the driver in a java worker");
+ String driverArgs = RayRuntime.configReader
+ .getStringValue("ray.java.start", "driver_args", "",
+ "arguments for the java class main function which is served at the driver");
Class> cls = Class.forName(driverClass);
- cls.getMethod("main", String[].class).invoke(null, (Object) new String[] {});
+ String[] argsArray = (driverArgs != null) ? driverArgs.split(",") : (new String[] {});
+ cls.getMethod("main", String[].class).invoke(null, (Object) argsArray);
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
diff --git a/java/runtime-native/src/main/java/org/ray/spi/impl/StateStoreProxyImpl.java b/java/runtime-native/src/main/java/org/ray/spi/impl/StateStoreProxyImpl.java
index 907d3d059..d6fd6a211 100644
--- a/java/runtime-native/src/main/java/org/ray/spi/impl/StateStoreProxyImpl.java
+++ b/java/runtime-native/src/main/java/org/ray/spi/impl/StateStoreProxyImpl.java
@@ -80,6 +80,8 @@ public class StateStoreProxyImpl implements StateStoreProxy {
return getAddressInfoHelper(nodeIpAddress);
} catch (Exception e) {
try {
+ RayLog.core.warn("Error occurred in StateStoreProxyImpl getAddressInfo, "
+ + (numRetries - count) + " retries remaining", e);
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException ie) {
RayLog.core.error("error at StateStoreProxyImpl getAddressInfo", e);
@@ -100,7 +102,7 @@ public class StateStoreProxyImpl implements StateStoreProxy {
* The hash contains the following:
* "deleted" : 0/1
* "ray_client_id"
- * "nodeIpAddress"
+ * "node_ip_address"
* "client_type" : plasma_manager/local_scheduler
* "store_socket_name"(op)
* "manager_socket_name"(op)
@@ -129,27 +131,27 @@ public class StateStoreProxyImpl implements StateStoreProxy {
if (!info.containsKey("ray_client_id".getBytes())) {
throw new Exception("no ray_client_id in any client");
- } else if (!info.containsKey("nodeIpAddress".getBytes())) {
- throw new Exception("no nodeIpAddress in any client");
+ } else if (!info.containsKey("node_ip_address".getBytes())) {
+ throw new Exception("no node_ip_address in any client");
} else if (!info.containsKey("client_type".getBytes())) {
throw new Exception("no client_type in any client");
}
-
- if (charsetDecode(info.get("nodeIpAddress".getBytes()), "US-ASCII")
+
+ if (charsetDecode(info.get("node_ip_address".getBytes()), "US-ASCII")
.equals(nodeIpAddress)) {
String clientType = charsetDecode(info.get("client_type".getBytes()), "US-ASCII");
- if (clientType.equals("plasmaManager")) {
+ if (clientType.equals("plasma_manager")) {
plasmaManager.add(info);
- } else if (clientType.equals("localScheduler")) {
+ } else if (clientType.equals("local_scheduler")) {
localScheduler.add(info);
}
}
}
if (plasmaManager.size() < 1 || localScheduler.size() < 1) {
- throw new Exception("no plasmaManager or localScheduler");
+ throw new Exception("no plasma_manager or local_scheduler");
} else if (plasmaManager.size() != localScheduler.size()) {
- throw new Exception("plasmaManager number not Equal localScheduler number");
+ throw new Exception("plasma_manager number not Equal local_scheduler number");
}
for (int i = 0; i < plasmaManager.size(); i++) {
@@ -173,7 +175,7 @@ public class StateStoreProxyImpl implements StateStoreProxy {
"US-ASCII");
si.managerPort = Integer.parseInt(managerAddr.split(":")[1]);
si.schedulerName = charsetDecode(
- localScheduler.get(i).get("local_scheduler_socket_name".getBytes()), "US-ASCII");
+ localScheduler.get(i).get("local_scheduler_socket_name".getBytes()), "US-ASCII");
rpc = localScheduler.get(i).get("local_scheduler_rpc_name".getBytes());
if (rpc != null) {
diff --git a/java/test_cluster.sh b/java/test_cluster.sh
new file mode 100755
index 000000000..2c1a23953
--- /dev/null
+++ b/java/test_cluster.sh
@@ -0,0 +1,31 @@
+#!/bin/bash
+
+#############################
+# build deploy file and deploy cluster
+sh cleanup.sh
+rm -rf local_deploy
+./prepare.sh -t local_deploy
+pushd local_deploy
+local_ip=`ifconfig -a|grep inet|grep -v 127.0.0.1|grep -v inet6|awk '{print $2}'|tr -d "addr:"`
+#echo "use local_ip" $local_ip
+OVERWRITE="ray.java.start.redis_port=34222;ray.java.start.node_ip_address=$local_ip;ray.java.start.deploy=true;ray.java.start.run_mode=CLUSTER"
+echo OVERWRITE is $OVERWRITE
+./run.sh start --head --overwrite=$OVERWRITE > cli.log 2>&1 &
+popd
+sleep 10
+
+# auto-pack zip for app example
+pushd example
+if [ ! -d "app1/" ];then
+ mkdir app1
+fi
+cp -rf target/ray-example-1.0.jar app1/
+zip -r app1.zip app1
+popd
+
+# run with cluster mode
+pushd local_deploy
+export RAY_CONFIG=ray/ray.config.ini
+ARGS=" --package ../example/app1.zip --class org.ray.example.HelloWorld --args=test1,test2 --redis-address=$local_ip:34222"
+../local_deploy/run.sh submit $ARGS
+popd