From 13882d052d5406b0ae79fb4e6b259929ae654067 Mon Sep 17 00:00:00 2001 From: Tianyi Chen Date: Tue, 4 Feb 2020 11:51:21 +0800 Subject: [PATCH] [Streaming] Streaming scheduler - part1-2: execution graph (#6666) --- .../api/stream/DataStreamSource.java | 1 + .../org/ray/streaming/jobgraph/JobGraph.java | 1 - .../streaming/jobgraph/JobGraphBuilder.java | 2 +- .../org/ray/streaming/jobgraph/JobVertex.java | 1 + .../ray/streaming/jobgraph/VertexType.java | 2 +- .../streaming/operator/StreamOperator.java | 1 - .../jobgraph/JobGraphBuilderTest.java | 3 +- .../runtime/core/graph/ExecutionGraph.java | 4 + .../graph/executiongraph/ExecutionEdge.java | 66 ++++++++ .../graph/executiongraph/ExecutionGraph.java | 125 +++++++++++++++ .../executiongraph/ExecutionJobEdge.java | 65 ++++++++ .../executiongraph/ExecutionJobVertex.java | 149 ++++++++++++++++++ .../graph/executiongraph/ExecutionVertex.java | 120 ++++++++++++++ .../executiongraph/ExecutionVertexState.java | 38 +++++ .../runtime/master/JobRuntimeContext.java | 57 +++++++ .../master/graphmanager/GraphManager.java | 42 +++++ .../master/graphmanager/GraphManagerImpl.java | 92 +++++++++++ .../ray/streaming/runtime/util/EnvUtil.java | 5 + .../runtime/graph/ExecutionGraphTest.java | 73 +++++++++ 19 files changed, 841 insertions(+), 6 deletions(-) create mode 100644 streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionEdge.java create mode 100644 streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionGraph.java create mode 100644 streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobEdge.java create mode 100644 streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobVertex.java create mode 100644 
streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionVertex.java create mode 100644 streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionVertexState.java create mode 100644 streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/master/JobRuntimeContext.java create mode 100644 streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/master/graphmanager/GraphManager.java create mode 100644 streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/master/graphmanager/GraphManagerImpl.java create mode 100644 streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/graph/ExecutionGraphTest.java diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/DataStreamSource.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/DataStreamSource.java index 930f6b51a..4d38a5df7 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/DataStreamSource.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/DataStreamSource.java @@ -32,6 +32,7 @@ public class DataStreamSource extends DataStream implements StreamSource setParallelism(int parallelism) { this.parallelism = parallelism; return this; diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobGraph.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobGraph.java index 604abbb84..d7008e8ef 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobGraph.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobGraph.java @@ -4,7 +4,6 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git 
a/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobGraphBuilder.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobGraphBuilder.java index c0e5c5b3a..c190a4317 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobGraphBuilder.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobGraphBuilder.java @@ -57,7 +57,7 @@ public class JobGraphBuilder { } else if (stream instanceof StreamSource) { jobVertex = new JobVertex(vertexId, parallelism, VertexType.SOURCE, streamOperator); } else if (stream instanceof DataStream || stream instanceof PythonDataStream) { - jobVertex = new JobVertex(vertexId, parallelism, VertexType.PROCESS, streamOperator); + jobVertex = new JobVertex(vertexId, parallelism, VertexType.TRANSFORMATION, streamOperator); Stream parentStream = stream.getInputStream(); int inputVertexId = parentStream.getId(); JobEdge jobEdge = new JobEdge(inputVertexId, vertexId, parentStream.getPartition()); diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobVertex.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobVertex.java index ebb736ecb..6fbb6aaec 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobVertex.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobVertex.java @@ -47,4 +47,5 @@ public class JobVertex implements Serializable { .add("streamOperator", streamOperator) .toString(); } + } diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/VertexType.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/VertexType.java index fabc0acf0..664b835af 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/VertexType.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/VertexType.java @@ -6,6 +6,6 @@ package 
org.ray.streaming.jobgraph; public enum VertexType { MASTER, SOURCE, - PROCESS, + TRANSFORMATION, SINK, } diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/operator/StreamOperator.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/operator/StreamOperator.java index 74894dfa5..bdab36a28 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/operator/StreamOperator.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/operator/StreamOperator.java @@ -14,7 +14,6 @@ public abstract class StreamOperator implements Operator { protected List collectorList; protected RuntimeContext runtimeContext; - public StreamOperator(F function) { this.name = getClass().getSimpleName(); this.function = function; diff --git a/streaming/java/streaming-api/src/test/java/org/ray/streaming/jobgraph/JobGraphBuilderTest.java b/streaming/java/streaming-api/src/test/java/org/ray/streaming/jobgraph/JobGraphBuilderTest.java index 0a3951a9c..1ce3065a0 100644 --- a/streaming/java/streaming-api/src/test/java/org/ray/streaming/jobgraph/JobGraphBuilderTest.java +++ b/streaming/java/streaming-api/src/test/java/org/ray/streaming/jobgraph/JobGraphBuilderTest.java @@ -8,7 +8,6 @@ import org.ray.streaming.api.partition.impl.RoundRobinPartition; import org.ray.streaming.api.stream.DataStream; import org.ray.streaming.api.stream.DataStreamSource; import org.ray.streaming.api.stream.StreamSink; -import org.ray.streaming.api.stream.StreamSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -62,7 +61,7 @@ public class JobGraphBuilderTest { JobVertex sink = jobVertexList.get(2); Assert.assertEquals(source.getVertexType(), VertexType.SOURCE); - Assert.assertEquals(map.getVertexType(), VertexType.PROCESS); + Assert.assertEquals(map.getVertexType(), VertexType.TRANSFORMATION); Assert.assertEquals(sink.getVertexType(), VertexType.SINK); JobEdge keyBy2Sink = jobEdgeList.get(0); diff --git 
a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionGraph.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionGraph.java index 56a3ecdb1..6dbd204d9 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionGraph.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionGraph.java @@ -11,6 +11,10 @@ import org.ray.streaming.runtime.worker.JobWorker; /** * Physical execution graph. + * + *

Notice: Temporary implementation for now to keep functional. This will be changed to + * {@link org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph} later when + * new stream task implementation is ready. */ public class ExecutionGraph implements Serializable { private long buildTime; diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionEdge.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionEdge.java new file mode 100644 index 000000000..c18a4e2a5 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionEdge.java @@ -0,0 +1,66 @@ +package org.ray.streaming.runtime.core.graph.executiongraph; + +import com.google.common.base.MoreObjects; +import java.io.Serializable; + +/** + * An edge that connects two execution vertices. + */ +public class ExecutionEdge implements Serializable { + + /** + * The source(upstream) execution vertex. + */ + private final ExecutionVertex sourceVertex; + + /** + * The target(downstream) execution vertex. + */ + private final ExecutionVertex targetVertex; + + /** + * An unique id for execution edge. 
+ */ + private final String executionEdgeIndex; + + public ExecutionEdge(ExecutionVertex sourceVertex, ExecutionVertex targetVertex, + ExecutionJobEdge executionJobEdge) { + this.sourceVertex = sourceVertex; + this.targetVertex = targetVertex; + this.executionEdgeIndex = generateExecutionEdgeIndex(); + } + + private String generateExecutionEdgeIndex() { + return sourceVertex.getVertexId() + "—" + targetVertex.getVertexId(); + } + + public ExecutionVertex getSourceVertex() { + return sourceVertex; + } + + public ExecutionVertex getTargetVertex() { + return targetVertex; + } + + public int getSourceVertexId() { + return sourceVertex.getVertexId(); + } + + public int getTargetVertexId() { + return targetVertex.getVertexId(); + } + + public String getExecutionEdgeIndex() { + return executionEdgeIndex; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("srcVertex", sourceVertex) + .add("targetVertex", targetVertex) + .add("executionEdgeIndex", executionEdgeIndex) + .toString(); + } + +} diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionGraph.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionGraph.java new file mode 100644 index 000000000..623abbc8c --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionGraph.java @@ -0,0 +1,125 @@ +package org.ray.streaming.runtime.core.graph.executiongraph; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Physical plan. + */ +public class ExecutionGraph implements Serializable { + + /** + * Name of the job. + */ + private final String jobName; + + /** + * Configuration of the job. 
+ */ + private Map jobConfig; + + /** + * Data map for execution job vertex. + * key: job vertex id. + * value: execution job vertex. + */ + private Map executionJobVertexMap; + + /** + * The max parallelism of the whole graph. + */ + private int maxParallelism; + + /** + * Build time. + */ + private long buildTime; + + public ExecutionGraph(String jobName) { + this.jobName = jobName; + this.buildTime = System.currentTimeMillis(); + } + + public String getJobName() { + return jobName; + } + + public List getExecutionJobVertexLices() { + return new ArrayList(executionJobVertexMap.values()); + } + + public Map getExecutionJobVertexMap() { + return executionJobVertexMap; + } + + public void setExecutionJobVertexMap(Map executionJobVertexMap) { + this.executionJobVertexMap = executionJobVertexMap; + } + + public Map getJobConfig() { + return jobConfig; + } + + public void setJobConfig(Map jobConfig) { + this.jobConfig = jobConfig; + } + + public int getMaxParallelism() { + return maxParallelism; + } + + public void setMaxParallelism(int maxParallelism) { + this.maxParallelism = maxParallelism; + } + + public long getBuildTime() { + return buildTime; + } + + /** + * Get all execution vertices from current execution graph. + * + * @return all execution vertices. + */ + public List getAllExecutionVertices() { + return executionJobVertexMap.values().stream() + .map(ExecutionJobVertex::getExecutionVertices) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } + + /** + * Get all execution vertices whose status is 'TO_ADD' from current execution graph. + * + * @return all added execution vertices. + */ + public List getAllAddedExecutionVertices() { + return executionJobVertexMap.values().stream() + .map(ExecutionJobVertex::getExecutionVertices) + .flatMap(Collection::stream) + .filter(vertex -> vertex.is2Add()) + .collect(Collectors.toList()); + } + + /** + * Get specified execution vertex from current execution graph by execution vertex id. 
+ * + * @param vertexId execution vertex id. + * @return the specified execution vertex. + */ + public ExecutionVertex getExecutionJobVertexByJobVertexId(int vertexId) { + for (ExecutionJobVertex executionJobVertex : executionJobVertexMap.values()) { + for (ExecutionVertex executionVertex : executionJobVertex.getExecutionVertices()) { + if (executionVertex.getVertexId() == vertexId) { + return executionVertex; + } + } + } + throw new RuntimeException("Vertex " + vertexId + " does not exist!"); + } + +} diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobEdge.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobEdge.java new file mode 100644 index 000000000..fed044a53 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobEdge.java @@ -0,0 +1,65 @@ +package org.ray.streaming.runtime.core.graph.executiongraph; + +import com.google.common.base.MoreObjects; +import org.ray.streaming.api.partition.Partition; +import org.ray.streaming.jobgraph.JobEdge; + +/** + * An edge that connects two execution job vertices. + */ +public class ExecutionJobEdge { + + /** + * The source(upstream) execution job vertex. + */ + private final ExecutionJobVertex sourceVertex; + + /** + * The target(downstream) execution job vertex. + */ + private final ExecutionJobVertex targetVertex; + + /** + * The partition of the execution job edge. + */ + private final Partition partition; + + /** + * An unique id for execution job edge. 
+ */ + private final String executionJobEdgeIndex; + + public ExecutionJobEdge(ExecutionJobVertex sourceVertex, ExecutionJobVertex targetVertex, + JobEdge jobEdge) { + this.sourceVertex = sourceVertex; + this.targetVertex = targetVertex; + this.partition = jobEdge.getPartition(); + this.executionJobEdgeIndex = generateExecutionJobEdgeIndex(); + } + + private String generateExecutionJobEdgeIndex() { + return sourceVertex.getJobVertexId() + "—" + targetVertex.getJobVertexId(); + } + + public ExecutionJobVertex getSourceVertex() { + return sourceVertex; + } + + public ExecutionJobVertex getTargetVertex() { + return targetVertex; + } + + public Partition getPartition() { + return partition; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("srcVertex", sourceVertex) + .add("targetVertex", targetVertex) + .add("partition", partition) + .add("executionJobEdgeIndex", executionJobEdgeIndex) + .toString(); + } +} diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobVertex.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobVertex.java new file mode 100644 index 000000000..900b55307 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobVertex.java @@ -0,0 +1,149 @@ +package org.ray.streaming.runtime.core.graph.executiongraph; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.ray.api.RayActor; +import org.ray.streaming.jobgraph.JobVertex; +import org.ray.streaming.jobgraph.VertexType; +import org.ray.streaming.operator.StreamOperator; +import org.ray.streaming.runtime.worker.JobWorker; + +/** + * Physical job vertex. + * + *

Execution job vertex is the physical form of {@link JobVertex} and + * every execution job vertex is corresponding to a group of {@link ExecutionVertex}. + */ +public class ExecutionJobVertex { + + /** + * Unique id for execution job vertex. + */ + private final int jobVertexId; + + /** + * Unique name generated by vertex name and index for execution job vertex. + */ + private final String jobVertexName; + private final StreamOperator streamOperator; + private final VertexType vertexType; + private int parallelism; + private List executionVertices; + + private List inputEdges = new ArrayList<>(); + private List outputEdges = new ArrayList<>(); + + public ExecutionJobVertex(JobVertex jobVertex) { + this.jobVertexId = jobVertex.getVertexId(); + this.jobVertexName = generateVertexName(jobVertexId, jobVertex.getStreamOperator()); + this.streamOperator = jobVertex.getStreamOperator(); + this.vertexType = jobVertex.getVertexType(); + this.parallelism = jobVertex.getParallelism(); + this.executionVertices = createExecutionVertics(); + } + + private String generateVertexName(int vertexId, StreamOperator streamOperator) { + return vertexId + "-" + streamOperator.getName(); + } + + private List createExecutionVertics() { + List executionVertices = new ArrayList<>(); + for (int index = 1; index <= parallelism; index++) { + executionVertices.add(new ExecutionVertex(jobVertexId, index, this)); + } + return executionVertices; + } + + public Map> getExecutionVertexWorkers() { + Map> executionVertexWorkersMap = new HashMap<>(); + + Preconditions.checkArgument( + executionVertices != null && !executionVertices.isEmpty(), + "Empty execution vertex."); + executionVertices.stream().forEach(vertex -> { + Preconditions.checkArgument( + vertex.getWorkerActor() != null, + "Empty execution vertex worker actor."); + executionVertexWorkersMap.put(vertex.getVertexId(), vertex.getWorkerActor()); + }); + + return executionVertexWorkersMap; + } + + public int getJobVertexId() { + return 
jobVertexId; + } + + public String getJobVertexName() { + return jobVertexName; + } + + public int getParallelism() { + return parallelism; + } + + public List getExecutionVertices() { + return executionVertices; + } + + public void setExecutionVertices( + List executionVertex) { + this.executionVertices = executionVertex; + } + + public List getOutputEdges() { + return outputEdges; + } + + public void setOutputEdges( + List outputEdges) { + this.outputEdges = outputEdges; + } + + public List getInputEdges() { + return inputEdges; + } + + public void setInputEdges( + List inputEdges) { + this.inputEdges = inputEdges; + } + + public StreamOperator getStreamOperator() { + return streamOperator; + } + + public VertexType getVertexType() { + return vertexType; + } + + public boolean isSourceVertex() { + return getVertexType() == VertexType.SOURCE; + } + + public boolean isTransformationVertex() { + return getVertexType() == VertexType.TRANSFORMATION; + } + + public boolean isSinkVertex() { + return getVertexType() == VertexType.SINK; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("jobVertexId", jobVertexId) + .add("jobVertexName", jobVertexName) + .add("streamOperator", streamOperator) + .add("vertexType", vertexType) + .add("parallelism", parallelism) + .add("executionVertices", executionVertices) + .add("inputEdges", inputEdges) + .add("outputEdges", outputEdges) + .toString(); + } +} diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionVertex.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionVertex.java new file mode 100644 index 000000000..9dd7fad64 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionVertex.java @@ -0,0 +1,120 @@ +package org.ray.streaming.runtime.core.graph.executiongraph; + +import 
com.google.common.base.MoreObjects; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import org.ray.api.RayActor; +import org.ray.api.id.ActorId; +import org.ray.streaming.runtime.worker.JobWorker; + +/** + * Physical vertex, correspond to {@link ExecutionJobVertex}. + */ +public class ExecutionVertex implements Serializable { + + /** + * Unique id for execution vertex. + */ + private final int vertexId; + + /** + * Ordered index for execution vertex. + */ + private final int vertexIndex; + + /** + * Unique name generated by vertex name and index for execution vertex. + */ + private final String vertexName; + + private ExecutionVertexState state = ExecutionVertexState.TO_ADD; + private RayActor workerActor; + private List inputEdges = new ArrayList<>(); + private List outputEdges = new ArrayList<>(); + + public ExecutionVertex(int jobVertexId, int index, ExecutionJobVertex executionJobVertex) { + this.vertexId = generateExecutionVertexId(jobVertexId, index); + this.vertexIndex = index; + this.vertexName = executionJobVertex.getJobVertexName() + "-" + vertexIndex; + } + + private int generateExecutionVertexId(int jobVertexId, int index) { + return jobVertexId * 100000 + index; + } + + public int getVertexId() { + return vertexId; + } + + public int getVertexIndex() { + return vertexIndex; + } + + public ExecutionVertexState getState() { + return state; + } + + public void setState(ExecutionVertexState state) { + this.state = state; + } + + public boolean is2Add() { + return state == ExecutionVertexState.TO_ADD; + } + + public boolean isRunning() { + return state == ExecutionVertexState.RUNNING; + } + + public boolean is2Delete() { + return state == ExecutionVertexState.TO_DEL; + } + + public RayActor getWorkerActor() { + return workerActor; + } + + public ActorId getWorkerActorId() { + return workerActor.getId(); + } + + public void setWorkerActor(RayActor workerActor) { + this.workerActor = workerActor; + } + + public List 
getInputEdges() { + return inputEdges; + } + + public void setInputEdges( + List inputEdges) { + this.inputEdges = inputEdges; + } + + public List getOutputEdges() { + return outputEdges; + } + + public void setOutputEdges( + List outputEdges) { + this.outputEdges = outputEdges; + } + + public String getVertexName() { + return vertexName; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("vertexId", vertexId) + .add("vertexIndex", vertexIndex) + .add("vertexName", vertexName) + .add("state", state) + .add("workerActor", workerActor) + .add("inputEdges", inputEdges) + .add("outputEdges", outputEdges) + .toString(); + } +} diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionVertexState.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionVertexState.java new file mode 100644 index 000000000..cfff38c47 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionVertexState.java @@ -0,0 +1,38 @@ +package org.ray.streaming.runtime.core.graph.executiongraph; + +import java.io.Serializable; + +/** + * Vertex state. + */ +public enum ExecutionVertexState implements Serializable { + + /** + * Vertex(Worker) to be added. + */ + TO_ADD(1, "TO_ADD"), + + /** + * Vertex(Worker) to be deleted. + */ + TO_DEL(2, "TO_DEL"), + + /** + * Vertex(Worker) is running. 
+ */ + RUNNING(3, "RUNNING"), + + /** + * Unknown status, + */ + UNKNOWN(-1, "UNKNOWN"); + + public final int code; + public final String msg; + + ExecutionVertexState(int code, String msg) { + this.code = code; + this.msg = msg; + } + +} diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/master/JobRuntimeContext.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/master/JobRuntimeContext.java new file mode 100644 index 000000000..699018e01 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/master/JobRuntimeContext.java @@ -0,0 +1,57 @@ +package org.ray.streaming.runtime.master; + +import com.google.common.base.MoreObjects; +import java.io.Serializable; +import org.ray.streaming.jobgraph.JobGraph; +import org.ray.streaming.runtime.config.StreamingConfig; +import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph; + +/** + * Runtime context for job master. + * + *

Including: graph, resource, checkpoint info, etc. + */ +public class JobRuntimeContext implements Serializable { + + private StreamingConfig conf; + private JobGraph jobGraph; + private volatile ExecutionGraph executionGraph; + + public JobRuntimeContext(StreamingConfig conf) { + this.conf = conf; + } + + public String getJobName() { + return conf.masterConfig.commonConfig.jobName(); + } + + public StreamingConfig getConf() { + return conf; + } + + public JobGraph getJobGraph() { + return jobGraph; + } + + public void setJobGraph(JobGraph jobGraph) { + this.jobGraph = jobGraph; + } + + public ExecutionGraph getExecutionGraph() { + return executionGraph; + } + + public void setExecutionGraph(ExecutionGraph executionGraph) { + this.executionGraph = executionGraph; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("jobGraph", jobGraph) + .add("executionGraph", executionGraph) + .add("conf", conf.getMap()) + .toString(); + } + +} diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/master/graphmanager/GraphManager.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/master/graphmanager/GraphManager.java new file mode 100644 index 000000000..b5ec8ffc0 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/master/graphmanager/GraphManager.java @@ -0,0 +1,42 @@ +package org.ray.streaming.runtime.master.graphmanager; + +import org.ray.streaming.jobgraph.JobGraph; +import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph; + +/** + * Graph manager is one of the important roles of JobMaster. It mainly focuses on graph management. + * + *

+ * Such as:
+ * <ol>
+ * <li>Build execution graph from job graph.</li>
+ * <li>Do modifications or operations on graph.</li>
+ * <li>Query vertex info from graph.</li>
+ * </ol>
+ *

+ */ +public interface GraphManager { + + /** + * Build execution graph from job graph. + * + * @param jobGraph logical plan of streaming job. + * @return physical plan of streaming job. + */ + ExecutionGraph buildExecutionGraph(JobGraph jobGraph); + + /** + * Get job graph. + * + * @return the job graph. + */ + JobGraph getJobGraph(); + + /** + * Get execution graph. + * + * @return the execution graph. + */ + ExecutionGraph getExecutionGraph(); + +} diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/master/graphmanager/GraphManagerImpl.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/master/graphmanager/GraphManagerImpl.java new file mode 100644 index 000000000..e8453201b --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/master/graphmanager/GraphManagerImpl.java @@ -0,0 +1,92 @@ +package org.ray.streaming.runtime.master.graphmanager; + +import java.util.LinkedHashMap; +import java.util.Map; +import org.ray.streaming.jobgraph.JobGraph; +import org.ray.streaming.jobgraph.JobVertex; +import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionEdge; +import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph; +import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionJobEdge; +import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionJobVertex; +import org.ray.streaming.runtime.master.JobRuntimeContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GraphManagerImpl implements GraphManager { + + private static final Logger LOG = LoggerFactory.getLogger(GraphManagerImpl.class); + + protected final JobRuntimeContext runtimeContext; + + public GraphManagerImpl(JobRuntimeContext runtimeContext) { + this.runtimeContext = runtimeContext; + } + + @Override + public ExecutionGraph buildExecutionGraph(JobGraph jobGraph) { + LOG.info("Begin build execution graph with job graph {}.", jobGraph); + + // 
setup structure + ExecutionGraph executionGraph = setupStructure(jobGraph); + + // set max parallelism + int maxParallelism = jobGraph.getJobVertexList().stream() + .map(JobVertex::getParallelism) + .max(Integer::compareTo).get(); + executionGraph.setMaxParallelism(maxParallelism); + + // set job config + executionGraph.setJobConfig(jobGraph.getJobConfig()); + + LOG.info("Build execution graph success."); + return executionGraph; + } + + private ExecutionGraph setupStructure(JobGraph jobGraph) { + ExecutionGraph executionGraph = new ExecutionGraph(jobGraph.getJobName()); + + // create vertex + Map exeJobVertexMap = new LinkedHashMap<>(); + long buildTime = executionGraph.getBuildTime(); + for (JobVertex jobVertex : jobGraph.getJobVertexList()) { + int jobVertexId = jobVertex.getVertexId(); + exeJobVertexMap.put(jobVertexId, new ExecutionJobVertex(jobVertex)); + } + + // connect vertex + jobGraph.getJobEdgeList().stream().forEach(jobEdge -> { + ExecutionJobVertex source = exeJobVertexMap.get(jobEdge.getSrcVertexId()); + ExecutionJobVertex target = exeJobVertexMap.get(jobEdge.getTargetVertexId()); + + ExecutionJobEdge executionJobEdge = + new ExecutionJobEdge(source, target, jobEdge); + + source.getOutputEdges().add(executionJobEdge); + target.getInputEdges().add(executionJobEdge); + + source.getExecutionVertices().stream().forEach(vertex -> { + target.getExecutionVertices().stream().forEach(outputVertex -> { + ExecutionEdge executionEdge = new ExecutionEdge(vertex, outputVertex, executionJobEdge); + vertex.getOutputEdges().add(executionEdge); + outputVertex.getInputEdges().add(executionEdge); + }); + }); + }); + + // set execution job vertex into execution graph + executionGraph.setExecutionJobVertexMap(exeJobVertexMap); + + return executionGraph; + } + + @Override + public JobGraph getJobGraph() { + return runtimeContext.getJobGraph(); + } + + @Override + public ExecutionGraph getExecutionGraph() { + return runtimeContext.getExecutionGraph(); + } + +} diff --git 
a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/util/EnvUtil.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/util/EnvUtil.java index caf47d489..70cb72cb0 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/util/EnvUtil.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/util/EnvUtil.java @@ -1,10 +1,15 @@ package org.ray.streaming.runtime.util; +import java.lang.management.ManagementFactory; import org.ray.runtime.RayNativeRuntime; import org.ray.runtime.util.JniUtils; public class EnvUtil { + public static String getJvmPid() { + return ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; + } + public static void loadNativeLibraries() { // Explicitly load `RayNativeRuntime`, to make sure `core_worker_library_java` // is loaded before `streaming_java`. diff --git a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/graph/ExecutionGraphTest.java b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/graph/ExecutionGraphTest.java new file mode 100644 index 000000000..dd7c5e591 --- /dev/null +++ b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/graph/ExecutionGraphTest.java @@ -0,0 +1,73 @@ +package org.ray.streaming.runtime.graph; + +import com.google.common.collect.Lists; +import java.util.List; +import org.ray.streaming.api.context.StreamingContext; +import org.ray.streaming.api.stream.DataStream; +import org.ray.streaming.api.stream.DataStreamSource; +import org.ray.streaming.api.stream.StreamSink; +import org.ray.streaming.jobgraph.JobGraph; +import org.ray.streaming.jobgraph.JobGraphBuilder; +import org.ray.streaming.runtime.BaseUnitTest; +import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph; +import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionJobVertex; +import 
org.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex; +import org.ray.streaming.runtime.master.JobRuntimeContext; +import org.ray.streaming.runtime.master.graphmanager.GraphManager; +import org.ray.streaming.runtime.master.graphmanager.GraphManagerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class ExecutionGraphTest extends BaseUnitTest { + + private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphTest.class); + + @Test + public void testBuildExecutionGraph() { + GraphManager graphManager = new GraphManagerImpl(new JobRuntimeContext(null)); + JobGraph jobGraph = buildJobGraph(); + ExecutionGraph executionGraph = buildExecutionGraph(graphManager, jobGraph); + List executionJobVertices = executionGraph.getExecutionJobVertexLices(); + + Assert.assertEquals(executionJobVertices.size(), jobGraph.getJobVertexList().size()); + + int totalVertexNum = jobGraph.getJobVertexList().stream() + .mapToInt(vertex -> vertex.getParallelism()).sum(); + Assert.assertEquals(executionGraph.getAllExecutionVertices().size(), totalVertexNum); + + int startIndex = 0; + ExecutionJobVertex upStream = executionJobVertices.get(startIndex); + ExecutionJobVertex downStream = executionJobVertices.get(startIndex + 1); + Assert.assertEquals(upStream.getOutputEdges().get(0).getTargetVertex(), downStream); + + List upStreamVertices = upStream.getExecutionVertices(); + List downStreamVertices = downStream.getExecutionVertices(); + upStreamVertices.stream().forEach(vertex -> { + vertex.getOutputEdges().stream().forEach(upStreamOutPutEdge -> { + Assert.assertTrue(downStreamVertices.contains(upStreamOutPutEdge.getTargetVertex())); + }); + }); + } + + public static ExecutionGraph buildExecutionGraph(GraphManager graphManager) { + return graphManager.buildExecutionGraph(buildJobGraph()); + } + + public static ExecutionGraph buildExecutionGraph(GraphManager graphManager, JobGraph 
jobGraph) { + return graphManager.buildExecutionGraph(jobGraph); + } + + public static JobGraph buildJobGraph() { + StreamingContext streamingContext = StreamingContext.buildContext(); + DataStream dataStream = DataStreamSource.buildSource(streamingContext, + Lists.newArrayList("a", "b", "c")); + StreamSink streamSink = dataStream.sink(x -> LOG.info(x)); + JobGraphBuilder jobGraphBuilder = new JobGraphBuilder(Lists.newArrayList(streamSink)); + + JobGraph jobGraph = jobGraphBuilder.build(); + return jobGraph; + } + +}