[Streaming] Add configuration with owner config. (#6687)

This commit is contained in:
Tianyi Chen
2020-01-08 11:19:01 +08:00
committed by Hao Chen
parent 872a3522aa
commit 9dacebec1a
16 changed files with 321 additions and 6 deletions
@@ -0,0 +1,11 @@
package org.ray.streaming.runtime.config;
import java.io.Serializable;
import javax.accessibility.Accessible;
/**
* Basic config interface.
*/
public interface Config extends org.aeonbits.owner.Config, Accessible, Serializable {
}
@@ -0,0 +1,25 @@
package org.ray.streaming.runtime.config;
import java.io.Serializable;
import java.util.Map;
/**
* Streaming config including general, master and worker part.
*/
public class StreamingConfig implements Serializable {
public StreamingMasterConfig masterConfig;
public StreamingWorkerConfig workerConfigTemplate;
public StreamingConfig(final Map<String, String> conf) {
masterConfig = new StreamingMasterConfig(conf);
workerConfigTemplate = new StreamingWorkerConfig(conf);
}
public Map<String, String> getMap() {
Map<String, String> wholeConfigMap = masterConfig.configMap;
wholeConfigMap.putAll(workerConfigTemplate.configMap);
return wholeConfigMap;
}
}
@@ -0,0 +1,83 @@
package org.ray.streaming.runtime.config;
import com.google.common.base.Preconditions;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import org.aeonbits.owner.Config.DefaultValue;
import org.aeonbits.owner.Config.Key;
import org.aeonbits.owner.ConfigFactory;
import org.ray.streaming.runtime.config.global.CommonConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Streaming general config. May used by both JobMaster and JobWorker.
*/
public class StreamingGlobalConfig implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(StreamingGlobalConfig.class);
public final CommonConfig commonConfig;
public final Map<String, String> configMap;
public StreamingGlobalConfig(final Map<String, String> conf) {
configMap = new HashMap<>(conf);
commonConfig = ConfigFactory.create(CommonConfig.class, conf);
globalConfig2Map();
}
@Override
public String toString() {
return configMap.toString();
}
private void globalConfig2Map() {
try {
configMap.putAll(config2Map(this.commonConfig));
} catch (Exception e) {
LOG.error("Couldn't convert global config to a map.", e);
}
}
protected Map<String, String> config2Map(org.aeonbits.owner.Config config)
throws ClassNotFoundException {
Map<String, String> result = new HashMap<>();
Class<?> proxyClazz = Class.forName(config.getClass().getName());
Class<?>[] proxyInterfaces = proxyClazz.getInterfaces();
Class<?> configInterface = null;
for (Class<?> proxyInterface : proxyInterfaces) {
if (Config.class.isAssignableFrom(proxyInterface)) {
configInterface = proxyInterface;
break;
}
}
Preconditions.checkArgument(configInterface != null,
"Can not get config interface.");
Method[] methods = configInterface.getMethods();
for (Method method : methods) {
Key ownerKeyAnnotation = method.getAnnotation(Key.class);
String ownerKeyAnnotationValue;
if (ownerKeyAnnotation != null) {
ownerKeyAnnotationValue = ownerKeyAnnotation.value();
Object value;
try {
value = method.invoke(config);
} catch (Exception e) {
LOG.warn("Can not get value by method invoking for config key: {}. "
+ "So use default value instead.", ownerKeyAnnotationValue);
String defaultValue = method.getAnnotation(DefaultValue.class).value();
value = defaultValue;
}
result.put(ownerKeyAnnotationValue, value + "");
}
}
return result;
}
}
@@ -0,0 +1,17 @@
package org.ray.streaming.runtime.config;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Streaming job master config.
*/
public class StreamingMasterConfig extends StreamingGlobalConfig {
private static final Logger LOG = LoggerFactory.getLogger(StreamingMasterConfig.class);
public StreamingMasterConfig(final Map<String, String> conf) {
super(conf);
}
}
@@ -0,0 +1,17 @@
package org.ray.streaming.runtime.config;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Streaming job worker specified config.
*/
public class StreamingWorkerConfig extends StreamingGlobalConfig {
private static final Logger LOG = LoggerFactory.getLogger(StreamingWorkerConfig.class);
public StreamingWorkerConfig(final Map<String, String> conf) {
super(conf);
}
}
@@ -0,0 +1,28 @@
package org.ray.streaming.runtime.config.global;
import org.ray.streaming.runtime.config.Config;
/**
* Job common config.
*/
public interface CommonConfig extends Config {
String JOB_ID = "streaming.job.id";
String JOB_NAME = "streaming.job.name";
/**
* Ray streaming job id. Non-custom.
* @return Job id with string type.
*/
@DefaultValue(value = "default-job-id")
@Key(value = JOB_ID)
String jobId();
/**
* Ray streaming job name. Non-custom.
* @return Job name with string type.
*/
@DefaultValue(value = "default-job-name")
@Key(value = JOB_NAME)
String jobName();
}
@@ -0,0 +1,34 @@
package org.ray.streaming.runtime;
import java.lang.reflect.Method;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
public abstract class BaseUnitTest {
private static final Logger LOG = LoggerFactory.getLogger(BaseUnitTest.class);
@BeforeClass
public void setUp() {
TestHelper.setUTFlag();
}
@AfterClass
public void tearDown() {
TestHelper.clearUTFlag();
}
@BeforeMethod
public void testBegin(Method method) {
LOG.info(">>>>>>>>>>>>>>>>>>>> Test case: " + method.getName() + " began >>>>>>>>>>>>>>>>>>>>");
}
@AfterMethod
public void testEnd(Method method) {
LOG.info(">>>>>>>>>>>>>>>>>>>> Test case: " + method.getName() + " end >>>>>>>>>>>>>>>>>>");
}
}
@@ -0,0 +1,18 @@
package org.ray.streaming.runtime;
public class TestHelper {
private static volatile boolean UT_FLAG = false;
public static void setUTFlag() {
UT_FLAG = true;
}
public static void clearUTFlag() {
UT_FLAG = false;
}
public static boolean isUT() {
return UT_FLAG;
}
}
@@ -0,0 +1,64 @@
package org.ray.streaming.runtime.config;
import java.util.HashMap;
import java.util.Map;
import org.aeonbits.owner.ConfigFactory;
import org.nustaq.serialization.FSTConfiguration;
import org.ray.streaming.runtime.BaseUnitTest;
import org.ray.streaming.runtime.config.global.CommonConfig;
import org.testng.Assert;
import org.testng.annotations.Test;
public class ConfigTest extends BaseUnitTest {
@Test
public void testBaseFunc() {
// conf using
CommonConfig commonConfig = ConfigFactory.create(CommonConfig.class);
Assert.assertTrue(commonConfig.jobId().equals("default-job-id"));
// override conf
Map<String, String> customConf = new HashMap<>();
customConf.put(CommonConfig.JOB_ID, "111");
CommonConfig commonConfig2 = ConfigFactory.create(CommonConfig.class, customConf);
Assert.assertTrue(commonConfig2.jobId().equals("111"));
}
@Test
public void testMapTransformation() {
Map<String, String> conf = new HashMap<>();
String testValue = "222";
conf.put(CommonConfig.JOB_ID, testValue);
StreamingConfig config = new StreamingConfig(conf);
Map<String, String> wholeConfigMap = config.getMap();
Assert.assertTrue(wholeConfigMap.get(CommonConfig.JOB_ID).equals(testValue));
}
@Test
public void testCustomConfKeeping() {
Map<String, String> conf = new HashMap<>();
String customKey = "test_key";
String customValue = "test_value";
conf.put(customKey, customValue);
StreamingConfig config = new StreamingConfig(conf);
Assert.assertEquals(config.getMap().get(customKey), customValue);
}
@Test
public void testSerialization() {
Map<String, String> conf = new HashMap<>();
String customKey = "test_key";
String customValue = "test_value";
conf.put(customKey, customValue);
StreamingConfig config = new StreamingConfig(conf);
FSTConfiguration fstConf = FSTConfiguration.createDefaultConfiguration();
byte[] configBytes = fstConf.asByteArray(config);
StreamingConfig deserializedConfig = (StreamingConfig) fstConf.asObject(configBytes);
Assert.assertEquals(deserializedConfig.masterConfig.commonConfig.jobId(), "default-job-id");
Assert.assertEquals(deserializedConfig.getMap().get(customKey), customValue);
}
}
@@ -6,6 +6,7 @@ import org.ray.streaming.api.function.impl.FlatMapFunction;
import org.ray.streaming.api.function.impl.ReduceFunction;
import org.ray.streaming.api.function.impl.SinkFunction;
import org.ray.streaming.api.stream.StreamSource;
import org.ray.streaming.runtime.BaseUnitTest;
import org.ray.streaming.util.Config;
import java.io.Serializable;
import java.util.ArrayList;
@@ -18,8 +19,7 @@ import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
public class WordCountTest implements Serializable {
public class WordCountTest extends BaseUnitTest implements Serializable {
private static final Logger LOGGER = LoggerFactory.getLogger(WordCountTest.class);
@@ -13,6 +13,7 @@ import org.ray.streaming.api.partition.impl.RoundRobinPartition;
import org.ray.streaming.api.stream.DataStream;
import org.ray.streaming.api.stream.StreamSink;
import org.ray.streaming.api.stream.StreamSource;
import org.ray.streaming.runtime.BaseUnitTest;
import org.ray.streaming.runtime.core.graph.ExecutionEdge;
import org.ray.streaming.runtime.core.graph.ExecutionGraph;
import org.ray.streaming.runtime.core.graph.ExecutionNode;
@@ -25,7 +26,7 @@ import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TaskAssignerImplTest {
public class TaskAssignerImplTest extends BaseUnitTest {
private static final Logger LOGGER = LoggerFactory.getLogger(TaskAssignerImplTest.class);
@@ -21,6 +21,7 @@ import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.function.impl.FlatMapFunction;
import org.ray.streaming.api.function.impl.ReduceFunction;
import org.ray.streaming.api.stream.StreamSource;
import org.ray.streaming.runtime.BaseUnitTest;
import org.ray.streaming.runtime.transfer.ChannelID;
import org.ray.streaming.runtime.util.EnvUtil;
import org.ray.streaming.util.Config;
@@ -32,7 +33,8 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class StreamingQueueTest implements Serializable {
public class StreamingQueueTest extends BaseUnitTest implements Serializable {
private static Logger LOGGER = LoggerFactory.getLogger(StreamingQueueTest.class);
static {
@@ -2,11 +2,11 @@ package org.ray.streaming.runtime.transfer;
import static org.testng.Assert.assertEquals;
import org.ray.streaming.runtime.BaseUnitTest;
import org.ray.streaming.runtime.util.EnvUtil;
import org.testng.annotations.Test;
public class ChannelIDTest {
public class ChannelIDTest extends BaseUnitTest {
static {
EnvUtil.loadNativeLibraries();