diff --git a/streaming/java/BUILD.bazel b/streaming/java/BUILD.bazel
index 4cc708c38..adb4e338a 100644
--- a/streaming/java/BUILD.bazel
+++ b/streaming/java/BUILD.bazel
@@ -101,6 +101,8 @@ define_java_module(
":org_ray_ray_streaming-api",
":org_ray_ray_streaming-runtime",
"@ray_streaming_maven//:com_google_guava_guava",
+ "@ray_streaming_maven//:de_ruedigermoeller_fst",
+ "@ray_streaming_maven//:org_aeonbits_owner_owner",
"@ray_streaming_maven//:org_slf4j_slf4j_api",
"@ray_streaming_maven//:org_slf4j_slf4j_log4j12",
"@ray_streaming_maven//:org_testng_testng",
@@ -113,6 +115,8 @@ define_java_module(
"@ray_streaming_maven//:com_github_davidmoten_flatbuffers_java",
"@ray_streaming_maven//:com_google_guava_guava",
"@ray_streaming_maven//:com_google_protobuf_protobuf_java",
+ "@ray_streaming_maven//:de_ruedigermoeller_fst",
+ "@ray_streaming_maven//:org_aeonbits_owner_owner",
"@ray_streaming_maven//:org_slf4j_slf4j_api",
"@ray_streaming_maven//:org_slf4j_slf4j_log4j12",
],
diff --git a/streaming/java/dependencies.bzl b/streaming/java/dependencies.bzl
index 61fb7418b..b3b327a27 100644
--- a/streaming/java/dependencies.bzl
+++ b/streaming/java/dependencies.bzl
@@ -9,6 +9,7 @@ def gen_streaming_java_deps():
"com.github.davidmoten:flatbuffers-java:1.9.0.1",
"com.google.protobuf:protobuf-java:3.8.0",
"de.ruedigermoeller:fst:2.57",
+ "org.aeonbits.owner:owner:1.0.10",
"org.slf4j:slf4j-api:1.7.12",
"org.slf4j:slf4j-log4j12:1.7.25",
"org.apache.logging.log4j:log4j-core:2.8.2",
diff --git a/streaming/java/streaming-runtime/pom.xml b/streaming/java/streaming-runtime/pom.xml
index e908dc735..ff0841f1c 100755
--- a/streaming/java/streaming-runtime/pom.xml
+++ b/streaming/java/streaming-runtime/pom.xml
@@ -46,6 +46,16 @@
protobuf-java
3.8.0
+
+ de.ruedigermoeller
+ fst
+ 2.57
+
+
+ org.aeonbits.owner
+ owner
+ 1.0.10
+
org.slf4j
slf4j-api
diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/Config.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/Config.java
new file mode 100644
index 000000000..bc185e44c
--- /dev/null
+++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/Config.java
@@ -0,0 +1,11 @@
+package org.ray.streaming.runtime.config;
+
+import java.io.Serializable;
+import javax.accessibility.Accessible;
+
+/**
+ * Basic config interface.
+ */
+public interface Config extends org.aeonbits.owner.Config, Accessible, Serializable {
+
+}
diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/StreamingConfig.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/StreamingConfig.java
new file mode 100644
index 000000000..a5faac393
--- /dev/null
+++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/StreamingConfig.java
@@ -0,0 +1,25 @@
+package org.ray.streaming.runtime.config;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Streaming config including general, master and worker part.
+ */
+public class StreamingConfig implements Serializable {
+
+ public StreamingMasterConfig masterConfig;
+ public StreamingWorkerConfig workerConfigTemplate;
+
+ public StreamingConfig(final Map conf) {
+ masterConfig = new StreamingMasterConfig(conf);
+ workerConfigTemplate = new StreamingWorkerConfig(conf);
+ }
+
+ public Map getMap() {
+ Map wholeConfigMap = masterConfig.configMap;
+ wholeConfigMap.putAll(workerConfigTemplate.configMap);
+ return wholeConfigMap;
+ }
+
+}
diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/StreamingGlobalConfig.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/StreamingGlobalConfig.java
new file mode 100644
index 000000000..f10f265ac
--- /dev/null
+++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/StreamingGlobalConfig.java
@@ -0,0 +1,83 @@
+package org.ray.streaming.runtime.config;
+
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import org.aeonbits.owner.Config.DefaultValue;
+import org.aeonbits.owner.Config.Key;
+import org.aeonbits.owner.ConfigFactory;
+import org.ray.streaming.runtime.config.global.CommonConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Streaming general config. May used by both JobMaster and JobWorker.
+ */
+public class StreamingGlobalConfig implements Serializable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StreamingGlobalConfig.class);
+
+ public final CommonConfig commonConfig;
+
+ public final Map configMap;
+
+ public StreamingGlobalConfig(final Map conf) {
+ configMap = new HashMap<>(conf);
+
+ commonConfig = ConfigFactory.create(CommonConfig.class, conf);
+ globalConfig2Map();
+ }
+
+ @Override
+ public String toString() {
+ return configMap.toString();
+ }
+
+ private void globalConfig2Map() {
+ try {
+ configMap.putAll(config2Map(this.commonConfig));
+ } catch (Exception e) {
+ LOG.error("Couldn't convert global config to a map.", e);
+ }
+ }
+
+ protected Map config2Map(org.aeonbits.owner.Config config)
+ throws ClassNotFoundException {
+ Map result = new HashMap<>();
+
+ Class> proxyClazz = Class.forName(config.getClass().getName());
+ Class>[] proxyInterfaces = proxyClazz.getInterfaces();
+
+ Class> configInterface = null;
+ for (Class> proxyInterface : proxyInterfaces) {
+ if (Config.class.isAssignableFrom(proxyInterface)) {
+ configInterface = proxyInterface;
+ break;
+ }
+ }
+ Preconditions.checkArgument(configInterface != null,
+ "Can not get config interface.");
+ Method[] methods = configInterface.getMethods();
+
+ for (Method method : methods) {
+ Key ownerKeyAnnotation = method.getAnnotation(Key.class);
+ String ownerKeyAnnotationValue;
+ if (ownerKeyAnnotation != null) {
+ ownerKeyAnnotationValue = ownerKeyAnnotation.value();
+ Object value;
+ try {
+ value = method.invoke(config);
+ } catch (Exception e) {
+ LOG.warn("Can not get value by method invoking for config key: {}. "
+ + "So use default value instead.", ownerKeyAnnotationValue);
+ String defaultValue = method.getAnnotation(DefaultValue.class).value();
+ value = defaultValue;
+ }
+ result.put(ownerKeyAnnotationValue, value + "");
+ }
+ }
+ return result;
+ }
+}
diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/StreamingMasterConfig.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/StreamingMasterConfig.java
new file mode 100644
index 000000000..2d6c6629a
--- /dev/null
+++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/StreamingMasterConfig.java
@@ -0,0 +1,17 @@
+package org.ray.streaming.runtime.config;
+
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Streaming job master config.
+ */
+public class StreamingMasterConfig extends StreamingGlobalConfig {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StreamingMasterConfig.class);
+
+ public StreamingMasterConfig(final Map conf) {
+ super(conf);
+ }
+}
diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/StreamingWorkerConfig.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/StreamingWorkerConfig.java
new file mode 100644
index 000000000..0eefe2656
--- /dev/null
+++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/StreamingWorkerConfig.java
@@ -0,0 +1,17 @@
+package org.ray.streaming.runtime.config;
+
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Streaming job worker specified config.
+ */
+public class StreamingWorkerConfig extends StreamingGlobalConfig {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StreamingWorkerConfig.class);
+
+ public StreamingWorkerConfig(final Map conf) {
+ super(conf);
+ }
+}
diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/global/CommonConfig.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/global/CommonConfig.java
new file mode 100644
index 000000000..d5a07db27
--- /dev/null
+++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/global/CommonConfig.java
@@ -0,0 +1,28 @@
+package org.ray.streaming.runtime.config.global;
+
+import org.ray.streaming.runtime.config.Config;
+
+/**
+ * Job common config.
+ */
+public interface CommonConfig extends Config {
+
+ String JOB_ID = "streaming.job.id";
+ String JOB_NAME = "streaming.job.name";
+
+ /**
+ * Ray streaming job id. Non-custom.
+ * @return Job id with string type.
+ */
+ @DefaultValue(value = "default-job-id")
+ @Key(value = JOB_ID)
+ String jobId();
+
+ /**
+ * Ray streaming job name. Non-custom.
+ * @return Job name with string type.
+ */
+ @DefaultValue(value = "default-job-name")
+ @Key(value = JOB_NAME)
+ String jobName();
+}
diff --git a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/BaseUnitTest.java b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/BaseUnitTest.java
new file mode 100644
index 000000000..b320d6b64
--- /dev/null
+++ b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/BaseUnitTest.java
@@ -0,0 +1,34 @@
+package org.ray.streaming.runtime;
+
+import java.lang.reflect.Method;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+
+public abstract class BaseUnitTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BaseUnitTest.class);
+
+ @BeforeClass
+ public void setUp() {
+ TestHelper.setUTFlag();
+ }
+
+ @AfterClass
+ public void tearDown() {
+ TestHelper.clearUTFlag();
+ }
+
+ @BeforeMethod
+ public void testBegin(Method method) {
+ LOG.info(">>>>>>>>>>>>>>>>>>>> Test case: " + method.getName() + " began >>>>>>>>>>>>>>>>>>>>");
+ }
+
+ @AfterMethod
+ public void testEnd(Method method) {
+ LOG.info(">>>>>>>>>>>>>>>>>>>> Test case: " + method.getName() + " end >>>>>>>>>>>>>>>>>>");
+ }
+}
diff --git a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/TestHelper.java b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/TestHelper.java
new file mode 100644
index 000000000..43f6e8f25
--- /dev/null
+++ b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/TestHelper.java
@@ -0,0 +1,18 @@
+package org.ray.streaming.runtime;
+
+public class TestHelper {
+
+ private static volatile boolean UT_FLAG = false;
+
+ public static void setUTFlag() {
+ UT_FLAG = true;
+ }
+
+ public static void clearUTFlag() {
+ UT_FLAG = false;
+ }
+
+ public static boolean isUT() {
+ return UT_FLAG;
+ }
+}
\ No newline at end of file
diff --git a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/config/ConfigTest.java b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/config/ConfigTest.java
new file mode 100644
index 000000000..a529f70e4
--- /dev/null
+++ b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/config/ConfigTest.java
@@ -0,0 +1,64 @@
+package org.ray.streaming.runtime.config;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.aeonbits.owner.ConfigFactory;
+import org.nustaq.serialization.FSTConfiguration;
+import org.ray.streaming.runtime.BaseUnitTest;
+import org.ray.streaming.runtime.config.global.CommonConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class ConfigTest extends BaseUnitTest {
+
+ @Test
+ public void testBaseFunc() {
+ // conf using
+ CommonConfig commonConfig = ConfigFactory.create(CommonConfig.class);
+ Assert.assertTrue(commonConfig.jobId().equals("default-job-id"));
+
+ // override conf
+ Map customConf = new HashMap<>();
+ customConf.put(CommonConfig.JOB_ID, "111");
+ CommonConfig commonConfig2 = ConfigFactory.create(CommonConfig.class, customConf);
+ Assert.assertTrue(commonConfig2.jobId().equals("111"));
+ }
+
+ @Test
+ public void testMapTransformation() {
+ Map conf = new HashMap<>();
+ String testValue = "222";
+ conf.put(CommonConfig.JOB_ID, testValue);
+
+ StreamingConfig config = new StreamingConfig(conf);
+ Map wholeConfigMap = config.getMap();
+
+ Assert.assertTrue(wholeConfigMap.get(CommonConfig.JOB_ID).equals(testValue));
+ }
+
+ @Test
+ public void testCustomConfKeeping() {
+ Map conf = new HashMap<>();
+ String customKey = "test_key";
+ String customValue = "test_value";
+ conf.put(customKey, customValue);
+ StreamingConfig config = new StreamingConfig(conf);
+ Assert.assertEquals(config.getMap().get(customKey), customValue);
+ }
+
+ @Test
+ public void testSerialization() {
+ Map conf = new HashMap<>();
+ String customKey = "test_key";
+ String customValue = "test_value";
+ conf.put(customKey, customValue);
+ StreamingConfig config = new StreamingConfig(conf);
+
+ FSTConfiguration fstConf = FSTConfiguration.createDefaultConfiguration();
+ byte[] configBytes = fstConf.asByteArray(config);
+ StreamingConfig deserializedConfig = (StreamingConfig) fstConf.asObject(configBytes);
+
+ Assert.assertEquals(deserializedConfig.masterConfig.commonConfig.jobId(), "default-job-id");
+ Assert.assertEquals(deserializedConfig.getMap().get(customKey), customValue);
+ }
+}
diff --git a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/demo/WordCountTest.java b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/demo/WordCountTest.java
index e6427120a..aae096f45 100644
--- a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/demo/WordCountTest.java
+++ b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/demo/WordCountTest.java
@@ -6,6 +6,7 @@ import org.ray.streaming.api.function.impl.FlatMapFunction;
import org.ray.streaming.api.function.impl.ReduceFunction;
import org.ray.streaming.api.function.impl.SinkFunction;
import org.ray.streaming.api.stream.StreamSource;
+import org.ray.streaming.runtime.BaseUnitTest;
import org.ray.streaming.util.Config;
import java.io.Serializable;
import java.util.ArrayList;
@@ -18,8 +19,7 @@ import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
-
-public class WordCountTest implements Serializable {
+public class WordCountTest extends BaseUnitTest implements Serializable {
private static final Logger LOGGER = LoggerFactory.getLogger(WordCountTest.class);
diff --git a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/schedule/TaskAssignerImplTest.java b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/schedule/TaskAssignerImplTest.java
index c4ee3a005..e3beefa62 100644
--- a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/schedule/TaskAssignerImplTest.java
+++ b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/schedule/TaskAssignerImplTest.java
@@ -13,6 +13,7 @@ import org.ray.streaming.api.partition.impl.RoundRobinPartition;
import org.ray.streaming.api.stream.DataStream;
import org.ray.streaming.api.stream.StreamSink;
import org.ray.streaming.api.stream.StreamSource;
+import org.ray.streaming.runtime.BaseUnitTest;
import org.ray.streaming.runtime.core.graph.ExecutionEdge;
import org.ray.streaming.runtime.core.graph.ExecutionGraph;
import org.ray.streaming.runtime.core.graph.ExecutionNode;
@@ -25,7 +26,7 @@ import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
-public class TaskAssignerImplTest {
+public class TaskAssignerImplTest extends BaseUnitTest {
private static final Logger LOGGER = LoggerFactory.getLogger(TaskAssignerImplTest.class);
diff --git a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/streamingqueue/StreamingQueueTest.java b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/streamingqueue/StreamingQueueTest.java
index 0a65b1abc..b0e7f0759 100644
--- a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/streamingqueue/StreamingQueueTest.java
+++ b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/streamingqueue/StreamingQueueTest.java
@@ -21,6 +21,7 @@ import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.function.impl.FlatMapFunction;
import org.ray.streaming.api.function.impl.ReduceFunction;
import org.ray.streaming.api.stream.StreamSource;
+import org.ray.streaming.runtime.BaseUnitTest;
import org.ray.streaming.runtime.transfer.ChannelID;
import org.ray.streaming.runtime.util.EnvUtil;
import org.ray.streaming.util.Config;
@@ -32,7 +33,8 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-public class StreamingQueueTest implements Serializable {
+public class StreamingQueueTest extends BaseUnitTest implements Serializable {
+
private static Logger LOGGER = LoggerFactory.getLogger(StreamingQueueTest.class);
static {
diff --git a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/transfer/ChannelIDTest.java b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/transfer/ChannelIDTest.java
index 654447f95..a621b311c 100644
--- a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/transfer/ChannelIDTest.java
+++ b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/transfer/ChannelIDTest.java
@@ -2,11 +2,11 @@ package org.ray.streaming.runtime.transfer;
import static org.testng.Assert.assertEquals;
-
+import org.ray.streaming.runtime.BaseUnitTest;
import org.ray.streaming.runtime.util.EnvUtil;
import org.testng.annotations.Test;
-public class ChannelIDTest {
+public class ChannelIDTest extends BaseUnitTest {
static {
EnvUtil.loadNativeLibraries();