[Streaming] Streaming scheduler - part1-2: execution graph (#6666)

This commit is contained in:
Tianyi Chen
2020-02-04 11:51:21 +08:00
committed by GitHub
parent 984490d2be
commit 13882d052d
19 changed files with 841 additions and 6 deletions
@@ -32,6 +32,7 @@ public class DataStreamSource<T> extends DataStream<T> implements StreamSource<T
return new DataStreamSource(context, new CollectionSourceFunction(values));
}
@Override
public DataStreamSource<T> setParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
@@ -4,7 +4,6 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +57,7 @@ public class JobGraphBuilder {
} else if (stream instanceof StreamSource) {
jobVertex = new JobVertex(vertexId, parallelism, VertexType.SOURCE, streamOperator);
} else if (stream instanceof DataStream || stream instanceof PythonDataStream) {
jobVertex = new JobVertex(vertexId, parallelism, VertexType.PROCESS, streamOperator);
jobVertex = new JobVertex(vertexId, parallelism, VertexType.TRANSFORMATION, streamOperator);
Stream parentStream = stream.getInputStream();
int inputVertexId = parentStream.getId();
JobEdge jobEdge = new JobEdge(inputVertexId, vertexId, parentStream.getPartition());
@@ -47,4 +47,5 @@ public class JobVertex implements Serializable {
.add("streamOperator", streamOperator)
.toString();
}
}
@@ -6,6 +6,6 @@ package org.ray.streaming.jobgraph;
/**
* Type of a job vertex.
*
* <p>NOTE(review): both PROCESS and TRANSFORMATION are listed in this view. The job graph builder
* and its test in the same change switch from PROCESS to TRANSFORMATION, so PROCESS is presumably
* the pre-rename constant - confirm which one the real file keeps.
*/
public enum VertexType {
MASTER,
// Assigned by JobGraphBuilder to streams that are StreamSource instances.
SOURCE,
PROCESS,
// Assigned by JobGraphBuilder to DataStream / PythonDataStream instances.
TRANSFORMATION,
SINK,
}
@@ -14,7 +14,6 @@ public abstract class StreamOperator<F extends Function> implements Operator {
protected List<Collector> collectorList;
protected RuntimeContext runtimeContext;
public StreamOperator(F function) {
this.name = getClass().getSimpleName();
this.function = function;
@@ -8,7 +8,6 @@ import org.ray.streaming.api.partition.impl.RoundRobinPartition;
import org.ray.streaming.api.stream.DataStream;
import org.ray.streaming.api.stream.DataStreamSource;
import org.ray.streaming.api.stream.StreamSink;
import org.ray.streaming.api.stream.StreamSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -62,7 +61,7 @@ public class JobGraphBuilderTest {
JobVertex sink = jobVertexList.get(2);
Assert.assertEquals(source.getVertexType(), VertexType.SOURCE);
Assert.assertEquals(map.getVertexType(), VertexType.PROCESS);
Assert.assertEquals(map.getVertexType(), VertexType.TRANSFORMATION);
Assert.assertEquals(sink.getVertexType(), VertexType.SINK);
JobEdge keyBy2Sink = jobEdgeList.get(0);
@@ -11,6 +11,10 @@ import org.ray.streaming.runtime.worker.JobWorker;
/**
* Physical execution graph.
*
* <p>Notice: Temporary implementation for now to keep functional. This will be changed to
* {@link org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph} later when
* new stream task implementation is ready.
*/
public class ExecutionGraph implements Serializable {
private long buildTime;
@@ -0,0 +1,66 @@
package org.ray.streaming.runtime.core.graph.executiongraph;
import com.google.common.base.MoreObjects;
import java.io.Serializable;
/**
 * A directed edge that connects two execution vertices.
 */
public class ExecutionEdge implements Serializable {

  /**
   * Separator between the source and target vertex ids in the edge index. Without it the
   * concatenated digits are ambiguous, e.g. (1, 100002) and (110000, 2) would both
   * yield "1100002".
   */
  private static final String INDEX_SEPARATOR = "-";

  /**
   * The source (upstream) execution vertex.
   */
  private final ExecutionVertex sourceVertex;

  /**
   * The target (downstream) execution vertex.
   */
  private final ExecutionVertex targetVertex;

  /**
   * Id of this edge, built as {@code <sourceVertexId>-<targetVertexId>}.
   */
  private final String executionEdgeIndex;

  /**
   * Creates an edge from {@code sourceVertex} to {@code targetVertex}.
   *
   * @param sourceVertex the upstream execution vertex.
   * @param targetVertex the downstream execution vertex.
   * @param executionJobEdge the job level edge this edge is derived from; it is accepted for
   *     the callers' convenience but not stored.
   */
  public ExecutionEdge(ExecutionVertex sourceVertex, ExecutionVertex targetVertex,
      ExecutionJobEdge executionJobEdge) {
    this.sourceVertex = sourceVertex;
    this.targetVertex = targetVertex;
    this.executionEdgeIndex = generateExecutionEdgeIndex();
  }

  /**
   * Builds the edge index from the ids of both end points.
   */
  private String generateExecutionEdgeIndex() {
    return sourceVertex.getVertexId() + INDEX_SEPARATOR + targetVertex.getVertexId();
  }

  public ExecutionVertex getSourceVertex() {
    return sourceVertex;
  }

  public ExecutionVertex getTargetVertex() {
    return targetVertex;
  }

  public int getSourceVertexId() {
    return sourceVertex.getVertexId();
  }

  public int getTargetVertexId() {
    return targetVertex.getVertexId();
  }

  public String getExecutionEdgeIndex() {
    return executionEdgeIndex;
  }

  /**
   * Describes the edge by the ids of its end points.
   *
   * <p>Vertices are printed by id only: a vertex holds references to its edges, so printing
   * the full objects on both sides could recurse without end.
   */
  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
        .add("srcVertex", sourceVertex.getVertexId())
        .add("targetVertex", targetVertex.getVertexId())
        .add("executionEdgeIndex", executionEdgeIndex)
        .toString();
  }
}
@@ -0,0 +1,125 @@
package org.ray.streaming.runtime.core.graph.executiongraph;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Physical plan of a streaming job.
 */
public class ExecutionGraph implements Serializable {

  /**
   * Name of the job.
   */
  private final String jobName;

  /**
   * Configuration of the job.
   */
  private Map<String, String> jobConfig;

  /**
   * Data map for execution job vertex.
   * key: job vertex id.
   * value: execution job vertex.
   * Stays {@code null} until {@link #setExecutionJobVertexMap(Map)} is called.
   */
  private Map<Integer, ExecutionJobVertex> executionJobVertexMap;

  /**
   * The max parallelism of the whole graph.
   */
  private int maxParallelism;

  /**
   * Creation time of this object, in milliseconds since the epoch.
   */
  private final long buildTime;

  /**
   * Creates an empty graph and records the current time as its build time.
   *
   * @param jobName name of the job.
   */
  public ExecutionGraph(String jobName) {
    this.jobName = jobName;
    this.buildTime = System.currentTimeMillis();
  }

  public String getJobName() {
    return jobName;
  }

  /**
   * Get a snapshot list of all execution job vertices.
   *
   * @return a new list with the values of the vertex map; empty when no map has been set yet.
   */
  public List<ExecutionJobVertex> getExecutionJobVertexLices() {
    if (executionJobVertexMap == null) {
      return new ArrayList<>();
    }
    return new ArrayList<>(executionJobVertexMap.values());
  }

  public Map<Integer, ExecutionJobVertex> getExecutionJobVertexMap() {
    return executionJobVertexMap;
  }

  public void setExecutionJobVertexMap(Map<Integer, ExecutionJobVertex> executionJobVertexMap) {
    this.executionJobVertexMap = executionJobVertexMap;
  }

  public Map<String, String> getJobConfig() {
    return jobConfig;
  }

  public void setJobConfig(Map<String, String> jobConfig) {
    this.jobConfig = jobConfig;
  }

  public int getMaxParallelism() {
    return maxParallelism;
  }

  public void setMaxParallelism(int maxParallelism) {
    this.maxParallelism = maxParallelism;
  }

  public long getBuildTime() {
    return buildTime;
  }

  /**
   * Get all execution vertices from current execution graph.
   *
   * @return all execution vertices; empty when no vertex map has been set yet.
   */
  public List<ExecutionVertex> getAllExecutionVertices() {
    // Going through the null-safe snapshot keeps a freshly created graph queryable.
    return getExecutionJobVertexLices().stream()
        .map(ExecutionJobVertex::getExecutionVertices)
        .flatMap(Collection::stream)
        .collect(Collectors.toList());
  }

  /**
   * Get all execution vertices whose state is 'TO_ADD' from current execution graph.
   *
   * @return all execution vertices that are still to be added.
   */
  public List<ExecutionVertex> getAllAddedExecutionVertices() {
    return getAllExecutionVertices().stream()
        .filter(ExecutionVertex::is2Add)
        .collect(Collectors.toList());
  }

  /**
   * Get specified execution vertex from current execution graph by execution vertex id.
   *
   * <p>Despite the method name, the argument is compared with
   * {@link ExecutionVertex#getVertexId()} (not with a job vertex id) and an
   * {@link ExecutionVertex} is returned.
   *
   * @param vertexId execution vertex id.
   * @return the specified execution vertex.
   * @throws RuntimeException if no execution vertex has the given id.
   */
  public ExecutionVertex getExecutionJobVertexByJobVertexId(int vertexId) {
    for (ExecutionVertex executionVertex : getAllExecutionVertices()) {
      if (executionVertex.getVertexId() == vertexId) {
        return executionVertex;
      }
    }
    throw new RuntimeException("Vertex " + vertexId + " does not exist!");
  }
}
@@ -0,0 +1,65 @@
package org.ray.streaming.runtime.core.graph.executiongraph;
import com.google.common.base.MoreObjects;
import org.ray.streaming.api.partition.Partition;
import org.ray.streaming.jobgraph.JobEdge;
/**
 * A directed edge that connects two execution job vertices.
 *
 * <p>Marked serializable because instances are held by {@link ExecutionJobVertex}, which in
 * turn is stored inside the serializable {@link ExecutionGraph}.
 */
public class ExecutionJobEdge implements java.io.Serializable {

  /**
   * Separator between the source and target job vertex ids in the edge index. Without it the
   * concatenated digits are ambiguous, e.g. the edges 1 to 12 and 11 to 2 would both
   * yield "112".
   */
  private static final String INDEX_SEPARATOR = "-";

  /**
   * The source (upstream) execution job vertex.
   */
  private final ExecutionJobVertex sourceVertex;

  /**
   * The target (downstream) execution job vertex.
   */
  private final ExecutionJobVertex targetVertex;

  /**
   * The partition of the execution job edge, taken from the job edge.
   */
  private final Partition partition;

  /**
   * Id of this edge, built as {@code <sourceJobVertexId>-<targetJobVertexId>}.
   */
  private final String executionJobEdgeIndex;

  /**
   * Creates an edge from {@code sourceVertex} to {@code targetVertex}.
   *
   * @param sourceVertex the upstream execution job vertex.
   * @param targetVertex the downstream execution job vertex.
   * @param jobEdge the logical edge; only its partition is read.
   */
  public ExecutionJobEdge(ExecutionJobVertex sourceVertex, ExecutionJobVertex targetVertex,
      JobEdge jobEdge) {
    this.sourceVertex = sourceVertex;
    this.targetVertex = targetVertex;
    this.partition = jobEdge.getPartition();
    this.executionJobEdgeIndex = generateExecutionJobEdgeIndex();
  }

  /**
   * Builds the edge index from the job vertex ids of both end points.
   */
  private String generateExecutionJobEdgeIndex() {
    return sourceVertex.getJobVertexId() + INDEX_SEPARATOR + targetVertex.getJobVertexId();
  }

  public ExecutionJobVertex getSourceVertex() {
    return sourceVertex;
  }

  public ExecutionJobVertex getTargetVertex() {
    return targetVertex;
  }

  public Partition getPartition() {
    return partition;
  }

  public String getExecutionJobEdgeIndex() {
    return executionJobEdgeIndex;
  }

  /**
   * Describes the edge by the ids of its end points.
   *
   * <p>Vertices are printed by id only: a job vertex holds references to its edges, so
   * printing the full objects on both sides could recurse without end.
   */
  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
        .add("srcVertex", sourceVertex.getJobVertexId())
        .add("targetVertex", targetVertex.getJobVertexId())
        .add("partition", partition)
        .add("executionJobEdgeIndex", executionJobEdgeIndex)
        .toString();
  }
}
@@ -0,0 +1,149 @@
package org.ray.streaming.runtime.core.graph.executiongraph;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.ray.api.RayActor;
import org.ray.streaming.jobgraph.JobVertex;
import org.ray.streaming.jobgraph.VertexType;
import org.ray.streaming.operator.StreamOperator;
import org.ray.streaming.runtime.worker.JobWorker;
/**
 * Physical job vertex.
 *
 * <p>Execution job vertex is the physical form of {@link JobVertex} and
 * every execution job vertex is corresponding to a group of {@link ExecutionVertex}.
 *
 * <p>Marked serializable because the serializable {@link ExecutionGraph} stores instances of
 * this class in its vertex map.
 */
public class ExecutionJobVertex implements java.io.Serializable {

  /**
   * Unique id for execution job vertex, copied from the job vertex.
   */
  private final int jobVertexId;

  /**
   * Name of the execution job vertex, built as {@code <jobVertexId>-<operatorName>}.
   */
  private final String jobVertexName;
  private final StreamOperator streamOperator;
  private final VertexType vertexType;
  private int parallelism;
  private List<ExecutionVertex> executionVertices;
  private List<ExecutionJobEdge> inputEdges = new ArrayList<>();
  private List<ExecutionJobEdge> outputEdges = new ArrayList<>();

  /**
   * Creates the physical counterpart of {@code jobVertex} together with one execution vertex
   * per unit of parallelism.
   *
   * @param jobVertex the logical vertex to copy id, operator, type and parallelism from.
   */
  public ExecutionJobVertex(JobVertex jobVertex) {
    this.jobVertexId = jobVertex.getVertexId();
    // The name must be set before the execution vertices are created: they read it.
    this.jobVertexName = generateVertexName(jobVertexId, jobVertex.getStreamOperator());
    this.streamOperator = jobVertex.getStreamOperator();
    this.vertexType = jobVertex.getVertexType();
    this.parallelism = jobVertex.getParallelism();
    this.executionVertices = createExecutionVertices();
  }

  private String generateVertexName(int vertexId, StreamOperator streamOperator) {
    return vertexId + "-" + streamOperator.getName();
  }

  /**
   * Creates {@code parallelism} execution vertices with indexes 1..parallelism.
   */
  private List<ExecutionVertex> createExecutionVertices() {
    List<ExecutionVertex> vertices = new ArrayList<>();
    for (int index = 1; index <= parallelism; index++) {
      vertices.add(new ExecutionVertex(jobVertexId, index, this));
    }
    return vertices;
  }

  /**
   * Collects the worker actor of every execution vertex.
   *
   * @return map from execution vertex id to its worker actor.
   * @throws IllegalArgumentException if there is no execution vertex or one of them has no
   *     worker actor assigned.
   */
  public Map<Integer, RayActor<JobWorker>> getExecutionVertexWorkers() {
    Preconditions.checkArgument(
        executionVertices != null && !executionVertices.isEmpty(),
        "Empty execution vertex.");
    Map<Integer, RayActor<JobWorker>> executionVertexWorkersMap = new HashMap<>();
    for (ExecutionVertex vertex : executionVertices) {
      Preconditions.checkArgument(
          vertex.getWorkerActor() != null,
          "Empty execution vertex worker actor.");
      executionVertexWorkersMap.put(vertex.getVertexId(), vertex.getWorkerActor());
    }
    return executionVertexWorkersMap;
  }

  public int getJobVertexId() {
    return jobVertexId;
  }

  public String getJobVertexName() {
    return jobVertexName;
  }

  public int getParallelism() {
    return parallelism;
  }

  public List<ExecutionVertex> getExecutionVertices() {
    return executionVertices;
  }

  public void setExecutionVertices(
      List<ExecutionVertex> executionVertex) {
    this.executionVertices = executionVertex;
  }

  public List<ExecutionJobEdge> getOutputEdges() {
    return outputEdges;
  }

  public void setOutputEdges(
      List<ExecutionJobEdge> outputEdges) {
    this.outputEdges = outputEdges;
  }

  public List<ExecutionJobEdge> getInputEdges() {
    return inputEdges;
  }

  public void setInputEdges(
      List<ExecutionJobEdge> inputEdges) {
    this.inputEdges = inputEdges;
  }

  public StreamOperator getStreamOperator() {
    return streamOperator;
  }

  public VertexType getVertexType() {
    return vertexType;
  }

  public boolean isSourceVertex() {
    return getVertexType() == VertexType.SOURCE;
  }

  public boolean isTransformationVertex() {
    return getVertexType() == VertexType.TRANSFORMATION;
  }

  public boolean isSinkVertex() {
    return getVertexType() == VertexType.SINK;
  }

  /**
   * Describes this vertex.
   *
   * <p>Execution vertices and edges are printed by name only: edges reference vertices and
   * vertices reference edges, so printing the full objects could recurse without end.
   */
  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
        .add("jobVertexId", jobVertexId)
        .add("jobVertexName", jobVertexName)
        .add("streamOperator", streamOperator)
        .add("vertexType", vertexType)
        .add("parallelism", parallelism)
        .add("executionVertices", vertexNames(executionVertices))
        .add("inputEdges", describeEdges(inputEdges))
        .add("outputEdges", describeEdges(outputEdges))
        .toString();
  }

  /**
   * Returns the names of the given execution vertices; empty for a null list.
   */
  private static List<String> vertexNames(List<ExecutionVertex> vertices) {
    List<String> names = new ArrayList<>();
    if (vertices != null) {
      for (ExecutionVertex vertex : vertices) {
        names.add(vertex.getVertexName());
      }
    }
    return names;
  }

  /**
   * Renders each edge as {@code <sourceName>-><targetName>}; empty for a null list.
   */
  private static List<String> describeEdges(List<ExecutionJobEdge> edges) {
    List<String> descriptions = new ArrayList<>();
    if (edges != null) {
      for (ExecutionJobEdge edge : edges) {
        descriptions.add(edge.getSourceVertex().getJobVertexName()
            + "->" + edge.getTargetVertex().getJobVertexName());
      }
    }
    return descriptions;
  }
}
@@ -0,0 +1,120 @@
package org.ray.streaming.runtime.core.graph.executiongraph;
import com.google.common.base.MoreObjects;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.ray.api.RayActor;
import org.ray.api.id.ActorId;
import org.ray.streaming.runtime.worker.JobWorker;
/**
 * Physical vertex, correspond to {@link ExecutionJobVertex}.
 */
public class ExecutionVertex implements Serializable {

  /**
   * Multiplier applied to the job vertex id when deriving an execution vertex id.
   * NOTE(review): ids of different job vertices only stay distinct while every index is
   * below this value - confirm that parallelism is bounded accordingly.
   */
  private static final int VERTEX_ID_STRIDE = 100000;

  /**
   * Unique id for execution vertex, derived from the job vertex id and the index.
   */
  private final int vertexId;

  /**
   * Ordered index for execution vertex.
   */
  private final int vertexIndex;

  /**
   * Name of the execution vertex, built as {@code <jobVertexName>-<vertexIndex>}.
   */
  private final String vertexName;
  private ExecutionVertexState state = ExecutionVertexState.TO_ADD;
  private RayActor<JobWorker> workerActor;
  private List<ExecutionEdge> inputEdges = new ArrayList<>();
  private List<ExecutionEdge> outputEdges = new ArrayList<>();

  /**
   * Creates an execution vertex in state {@code TO_ADD} without a worker actor.
   *
   * @param jobVertexId id of the job vertex this vertex belongs to.
   * @param index index of this vertex inside its job vertex.
   * @param executionJobVertex the owning job vertex; only its name is read.
   */
  public ExecutionVertex(int jobVertexId, int index, ExecutionJobVertex executionJobVertex) {
    this.vertexId = generateExecutionVertexId(jobVertexId, index);
    this.vertexIndex = index;
    this.vertexName = executionJobVertex.getJobVertexName() + "-" + vertexIndex;
  }

  private int generateExecutionVertexId(int jobVertexId, int index) {
    return jobVertexId * VERTEX_ID_STRIDE + index;
  }

  public int getVertexId() {
    return vertexId;
  }

  public int getVertexIndex() {
    return vertexIndex;
  }

  public ExecutionVertexState getState() {
    return state;
  }

  public void setState(ExecutionVertexState state) {
    this.state = state;
  }

  public boolean is2Add() {
    return state == ExecutionVertexState.TO_ADD;
  }

  public boolean isRunning() {
    return state == ExecutionVertexState.RUNNING;
  }

  public boolean is2Delete() {
    return state == ExecutionVertexState.TO_DEL;
  }

  public RayActor<JobWorker> getWorkerActor() {
    return workerActor;
  }

  /**
   * Get the id of the worker actor.
   *
   * @return the actor id.
   * @throws NullPointerException if no worker actor has been set yet.
   */
  public ActorId getWorkerActorId() {
    return workerActor.getId();
  }

  public void setWorkerActor(RayActor<JobWorker> workerActor) {
    this.workerActor = workerActor;
  }

  public List<ExecutionEdge> getInputEdges() {
    return inputEdges;
  }

  public void setInputEdges(
      List<ExecutionEdge> inputEdges) {
    this.inputEdges = inputEdges;
  }

  public List<ExecutionEdge> getOutputEdges() {
    return outputEdges;
  }

  public void setOutputEdges(
      List<ExecutionEdge> outputEdges) {
    this.outputEdges = outputEdges;
  }

  public String getVertexName() {
    return vertexName;
  }

  /**
   * Describes this vertex.
   *
   * <p>Edges are printed by their index only: every edge references its two vertices, so
   * printing the full edge objects could recurse back into this method without end.
   */
  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
        .add("vertexId", vertexId)
        .add("vertexIndex", vertexIndex)
        .add("vertexName", vertexName)
        .add("state", state)
        .add("workerActor", workerActor)
        .add("inputEdges", edgeIndexes(inputEdges))
        .add("outputEdges", edgeIndexes(outputEdges))
        .toString();
  }

  /**
   * Returns the index strings of the given edges; empty for a null list.
   */
  private static List<String> edgeIndexes(List<ExecutionEdge> edges) {
    List<String> indexes = new ArrayList<>();
    if (edges != null) {
      for (ExecutionEdge edge : edges) {
        indexes.add(edge.getExecutionEdgeIndex());
      }
    }
    return indexes;
  }
}
@@ -0,0 +1,38 @@
package org.ray.streaming.runtime.core.graph.executiongraph;
import java.io.Serializable;
/**
 * State of an execution vertex (worker). Every constant carries a numeric code and a
 * text message.
 */
public enum ExecutionVertexState implements Serializable {

  /** The vertex (worker) is going to be added. */
  TO_ADD(1, "TO_ADD"),

  /** The vertex (worker) is going to be deleted. */
  TO_DEL(2, "TO_DEL"),

  /** The vertex (worker) is running. */
  RUNNING(3, "RUNNING"),

  /** The state is unknown. */
  UNKNOWN(-1, "UNKNOWN");

  /** Numeric code of the state. */
  public final int code;

  /** Text message of the state. */
  public final String msg;

  ExecutionVertexState(int stateCode, String stateMsg) {
    code = stateCode;
    msg = stateMsg;
  }
}
@@ -0,0 +1,57 @@
package org.ray.streaming.runtime.master;
import com.google.common.base.MoreObjects;
import java.io.Serializable;
import org.ray.streaming.jobgraph.JobGraph;
import org.ray.streaming.runtime.config.StreamingConfig;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
/**
 * Runtime context for job master.
 *
 * <p>Including: graph, resource, checkpoint info, etc.
 */
public class JobRuntimeContext implements Serializable {

  /**
   * Streaming configuration; may be {@code null} when the context is created without one.
   */
  private final StreamingConfig conf;
  private JobGraph jobGraph;
  private volatile ExecutionGraph executionGraph;

  /**
   * Creates a context without any graph.
   *
   * @param conf the streaming configuration; {@code null} is accepted.
   */
  public JobRuntimeContext(StreamingConfig conf) {
    this.conf = conf;
  }

  /**
   * Get the job name from the master's common config.
   *
   * @return the configured job name.
   * @throws NullPointerException if this context was created without a configuration.
   */
  public String getJobName() {
    return conf.masterConfig.commonConfig.jobName();
  }

  public StreamingConfig getConf() {
    return conf;
  }

  public JobGraph getJobGraph() {
    return jobGraph;
  }

  public void setJobGraph(JobGraph jobGraph) {
    this.jobGraph = jobGraph;
  }

  public ExecutionGraph getExecutionGraph() {
    return executionGraph;
  }

  public void setExecutionGraph(ExecutionGraph executionGraph) {
    this.executionGraph = executionGraph;
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
        .add("jobGraph", jobGraph)
        .add("executionGraph", executionGraph)
        // A context may be built without a configuration, so do not dereference it blindly.
        .add("conf", conf == null ? null : conf.getMap())
        .toString();
  }
}
@@ -0,0 +1,42 @@
package org.ray.streaming.runtime.master.graphmanager;
import org.ray.streaming.jobgraph.JobGraph;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
/**
* Graph manager is one of the important roles of JobMaster. It mainly focuses on graph management.
*
* <p>Such as:
* <ol>
* <li>Build execution graph from job graph.</li>
* <li>Do modifications or operations on graph.</li>
* <li>Query vertex info from graph.</li>
* </ol>
*/
public interface GraphManager {
/**
* Build execution graph from job graph.
*
* @param jobGraph logical plan of streaming job.
* @return physical plan of streaming job.
*/
ExecutionGraph buildExecutionGraph(JobGraph jobGraph);
/**
* Get job graph.
*
* @return the job graph (logical plan).
*/
JobGraph getJobGraph();
/**
* Get execution graph.
*
* @return the execution graph (physical plan).
*/
ExecutionGraph getExecutionGraph();
}
@@ -0,0 +1,92 @@
package org.ray.streaming.runtime.master.graphmanager;
import java.util.LinkedHashMap;
import java.util.Map;
import org.ray.streaming.jobgraph.JobGraph;
import org.ray.streaming.jobgraph.JobVertex;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionEdge;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionJobEdge;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionJobVertex;
import org.ray.streaming.runtime.master.JobRuntimeContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * {@link GraphManager} that builds an {@link ExecutionGraph} from a {@link JobGraph} and reads
 * the current graphs from the {@link JobRuntimeContext}.
 */
public class GraphManagerImpl implements GraphManager {

  private static final Logger LOG = LoggerFactory.getLogger(GraphManagerImpl.class);

  protected final JobRuntimeContext runtimeContext;

  public GraphManagerImpl(JobRuntimeContext runtimeContext) {
    this.runtimeContext = runtimeContext;
  }

  /**
   * Build execution graph from job graph.
   *
   * <p>The result is only returned; this method does not store it in the runtime context.
   *
   * @param jobGraph logical plan of streaming job.
   * @return physical plan of streaming job, with max parallelism and job config filled in.
   */
  @Override
  public ExecutionGraph buildExecutionGraph(JobGraph jobGraph) {
    LOG.info("Begin build execution graph with job graph {}.", jobGraph);

    // setup structure
    ExecutionGraph executionGraph = setupStructure(jobGraph);

    // set max parallelism; a job graph without vertices yields 0 instead of failing
    int maxParallelism = jobGraph.getJobVertexList().stream()
        .mapToInt(JobVertex::getParallelism)
        .max()
        .orElse(0);
    executionGraph.setMaxParallelism(maxParallelism);

    // set job config
    executionGraph.setJobConfig(jobGraph.getJobConfig());

    LOG.info("Build execution graph success.");
    return executionGraph;
  }

  /**
   * Creates one execution job vertex per job vertex and wires up the edges.
   *
   * <p>For every job edge, each execution vertex of the source is connected to each execution
   * vertex of the target.
   * NOTE(review): an edge that refers to a vertex id missing from the vertex list ends in a
   * NullPointerException - presumably the job graph builder never produces one; confirm.
   */
  private ExecutionGraph setupStructure(JobGraph jobGraph) {
    ExecutionGraph executionGraph = new ExecutionGraph(jobGraph.getJobName());

    // create vertex, keeping the order of the job vertex list
    Map<Integer, ExecutionJobVertex> exeJobVertexMap = new LinkedHashMap<>();
    for (JobVertex jobVertex : jobGraph.getJobVertexList()) {
      exeJobVertexMap.put(jobVertex.getVertexId(), new ExecutionJobVertex(jobVertex));
    }

    // connect vertex
    jobGraph.getJobEdgeList().forEach(jobEdge -> {
      ExecutionJobVertex source = exeJobVertexMap.get(jobEdge.getSrcVertexId());
      ExecutionJobVertex target = exeJobVertexMap.get(jobEdge.getTargetVertexId());

      ExecutionJobEdge executionJobEdge = new ExecutionJobEdge(source, target, jobEdge);
      source.getOutputEdges().add(executionJobEdge);
      target.getInputEdges().add(executionJobEdge);

      source.getExecutionVertices().forEach(vertex ->
          target.getExecutionVertices().forEach(outputVertex -> {
            ExecutionEdge executionEdge =
                new ExecutionEdge(vertex, outputVertex, executionJobEdge);
            vertex.getOutputEdges().add(executionEdge);
            outputVertex.getInputEdges().add(executionEdge);
          }));
    });

    // set execution job vertex into execution graph
    executionGraph.setExecutionJobVertexMap(exeJobVertexMap);
    return executionGraph;
  }

  @Override
  public JobGraph getJobGraph() {
    return runtimeContext.getJobGraph();
  }

  @Override
  public ExecutionGraph getExecutionGraph() {
    return runtimeContext.getExecutionGraph();
  }
}
@@ -1,10 +1,15 @@
package org.ray.streaming.runtime.util;
import java.lang.management.ManagementFactory;
import org.ray.runtime.RayNativeRuntime;
import org.ray.runtime.util.JniUtils;
public class EnvUtil {
public static String getJvmPid() {
return ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
}
public static void loadNativeLibraries() {
// Explicitly load `RayNativeRuntime`, to make sure `core_worker_library_java`
// is loaded before `streaming_java`.
@@ -0,0 +1,73 @@
package org.ray.streaming.runtime.graph;
import com.google.common.collect.Lists;
import java.util.List;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.stream.DataStream;
import org.ray.streaming.api.stream.DataStreamSource;
import org.ray.streaming.api.stream.StreamSink;
import org.ray.streaming.jobgraph.JobGraph;
import org.ray.streaming.jobgraph.JobGraphBuilder;
import org.ray.streaming.runtime.BaseUnitTest;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionJobVertex;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import org.ray.streaming.runtime.master.JobRuntimeContext;
import org.ray.streaming.runtime.master.graphmanager.GraphManager;
import org.ray.streaming.runtime.master.graphmanager.GraphManagerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
/**
 * Checks that the graph manager turns a job graph into a matching execution graph.
 */
public class ExecutionGraphTest extends BaseUnitTest {

  private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphTest.class);

  @Test
  public void testBuildExecutionGraph() {
    JobRuntimeContext runtimeContext = new JobRuntimeContext(null);
    GraphManager graphManager = new GraphManagerImpl(runtimeContext);
    JobGraph jobGraph = buildJobGraph();
    ExecutionGraph executionGraph = buildExecutionGraph(graphManager, jobGraph);

    // One execution job vertex per job vertex.
    List<ExecutionJobVertex> jobVertices = executionGraph.getExecutionJobVertexLices();
    Assert.assertEquals(jobVertices.size(), jobGraph.getJobVertexList().size());

    // One execution vertex per unit of parallelism.
    int expectedVertexNum = jobGraph.getJobVertexList().stream()
        .mapToInt(vertex -> vertex.getParallelism())
        .sum();
    Assert.assertEquals(executionGraph.getAllExecutionVertices().size(), expectedVertexNum);

    // The first job vertex must point at the second one.
    ExecutionJobVertex upStream = jobVertices.get(0);
    ExecutionJobVertex downStream = jobVertices.get(1);
    Assert.assertEquals(upStream.getOutputEdges().get(0).getTargetVertex(), downStream);

    // Every output edge of an upstream vertex must end in a downstream vertex.
    List<ExecutionVertex> upStreamVertices = upStream.getExecutionVertices();
    List<ExecutionVertex> downStreamVertices = downStream.getExecutionVertices();
    for (ExecutionVertex upStreamVertex : upStreamVertices) {
      upStreamVertex.getOutputEdges().forEach(outputEdge ->
          Assert.assertTrue(downStreamVertices.contains(outputEdge.getTargetVertex())));
    }
  }

  /**
   * Builds an execution graph from the sample job graph.
   */
  public static ExecutionGraph buildExecutionGraph(GraphManager graphManager) {
    JobGraph sampleJobGraph = buildJobGraph();
    return graphManager.buildExecutionGraph(sampleJobGraph);
  }

  /**
   * Builds an execution graph from the given job graph.
   */
  public static ExecutionGraph buildExecutionGraph(GraphManager graphManager, JobGraph jobGraph) {
    return graphManager.buildExecutionGraph(jobGraph);
  }

  /**
   * Builds the sample job graph: a collection source feeding a logging sink.
   */
  public static JobGraph buildJobGraph() {
    StreamingContext context = StreamingContext.buildContext();
    DataStream<String> source = DataStreamSource.buildSource(context,
        Lists.newArrayList("a", "b", "c"));
    StreamSink sink = source.sink(x -> LOG.info(x));
    return new JobGraphBuilder(Lists.newArrayList(sink)).build();
  }
}