[streaming] Sync changes for graph part. (#7827)

This commit is contained in:
Tianyi Chen
2020-04-09 12:30:44 +08:00
committed by GitHub
parent 6521e92a95
commit c5bf9cc472
8 changed files with 171 additions and 71 deletions
@@ -2,6 +2,7 @@ package org.ray.streaming.runtime.core.graph.executiongraph;
import com.google.common.base.MoreObjects;
import java.io.Serializable;
import org.ray.streaming.api.partition.Partition;
/**
* An edge that connects two execution vertices.
@@ -18,6 +19,11 @@ public class ExecutionEdge implements Serializable {
*/
private final ExecutionVertex targetVertex;
/**
* The partition of current execution edge's execution job edge.
*/
private final Partition partition;
/**
* A unique id for the execution edge.
*/
@@ -27,11 +33,12 @@ public class ExecutionEdge implements Serializable {
ExecutionJobEdge executionJobEdge) {
this.sourceVertex = sourceVertex;
this.targetVertex = targetVertex;
this.partition = executionJobEdge.getPartition();
this.executionEdgeIndex = generateExecutionEdgeIndex();
}
/**
 * Builds the index string of this edge from the ids of its source and target vertices.
 *
 * NOTE(review): the two ids are joined with an empty separator, so different id pairs
 * can produce the same string (e.g. 1 and 23 vs. 12 and 3 both give "123") - confirm
 * whether callers rely on this index being unique.
 *
 * @return the source vertex id followed directly by the target vertex id
 */
private String generateExecutionEdgeIndex() {
return sourceVertex.getVertexId() + "" + targetVertex.getVertexId();
return sourceVertex.getId() + "" + targetVertex.getId();
}
public ExecutionVertex getSourceVertex() {
@@ -43,11 +50,15 @@ public class ExecutionEdge implements Serializable {
}
public int getSourceVertexId() {
return sourceVertex.getVertexId();
return sourceVertex.getId();
}
public int getTargetVertexId() {
return targetVertex.getVertexId();
return targetVertex.getId();
}
/**
 * Returns the partition of this edge, as read from the execution job edge
 * when this edge was constructed.
 *
 * @return the partition stored on this execution edge
 */
public Partition getPartition() {
  return this.partition;
}
public String getExecutionEdgeIndex() {
@@ -57,9 +68,10 @@ public class ExecutionEdge implements Serializable {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("srcVertex", sourceVertex)
.add("executionEdgeIndex", executionEdgeIndex)
.toString();
.add("sourceVertex", sourceVertex)
.add("targetVertex", targetVertex)
.add("partition", partition)
.add("executionEdgeIndex", executionEdgeIndex)
.toString();
}
}
@@ -5,6 +5,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
@@ -39,6 +40,11 @@ public class ExecutionGraph implements Serializable {
*/
private long buildTime;
/**
* A monotonically increasing counter used to assign each execution vertex its immutable id.
*/
private AtomicInteger executionVertexIdGenerator = new AtomicInteger(0);
public ExecutionGraph(String jobName) {
this.jobName = jobName;
this.buildTime = System.currentTimeMillis();
@@ -80,6 +86,14 @@ public class ExecutionGraph implements Serializable {
return buildTime;
}
/**
 * Hands out the next execution vertex id and advances the graph-wide counter.
 * The counter starts at 0, so the first id returned is 0.
 *
 * @return the counter value before it was incremented
 */
public int generateExecutionVertexId() {
  final int nextVertexId = this.executionVertexIdGenerator.getAndIncrement();
  return nextVertexId;
}
/**
 * Exposes the counter that backs execution vertex id generation.
 * The returned object is the graph's own instance (not a copy), so any
 * increment performed by the caller is visible to this graph.
 *
 * @return the shared execution vertex id counter
 */
public AtomicInteger getExecutionVertexIdGenerator() {
  return this.executionVertexIdGenerator;
}
/**
* Get all execution vertices from current execution graph.
*
@@ -114,7 +128,7 @@ public class ExecutionGraph implements Serializable {
public ExecutionVertex getExecutionJobVertexByJobVertexId(int vertexId) {
for (ExecutionJobVertex executionJobVertex : executionJobVertexMap.values()) {
for (ExecutionVertex executionVertex : executionJobVertex.getExecutionVertices()) {
if (executionVertex.getVertexId() == vertexId) {
if (executionVertex.getId() == vertexId) {
return executionVertex;
}
}
@@ -6,11 +6,14 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.aeonbits.owner.ConfigFactory;
import org.ray.api.RayActor;
import org.ray.streaming.api.Language;
import org.ray.streaming.jobgraph.JobVertex;
import org.ray.streaming.jobgraph.VertexType;
import org.ray.streaming.operator.StreamOperator;
import org.ray.streaming.runtime.master.JobRuntimeContext;
import org.ray.streaming.runtime.config.master.ResourceConfig;
import org.ray.streaming.runtime.worker.JobWorker;
/**
@@ -22,42 +25,54 @@ import org.ray.streaming.runtime.worker.JobWorker;
public class ExecutionJobVertex {
/**
* Unique id for execution job vertex.
* Unique id of operator(use {@link JobVertex}'s id). Used as jobVertex's id.
*/
private final int jobVertexId;
/**
* Unique name generated by vertex name and index for execution job vertex.
* Unique name of operator(use {@link StreamOperator}'s name). Used as jobVertex's name.
*/
private final String jobVertexName;
private final StreamOperator streamOperator;
private final VertexType vertexType;
private final Language language;
private final Map<String, String> jobConfig;
/**
* Parallelism of current execution job vertex(operator).
*/
private int parallelism;
/**
* Sub execution vertices of current execution job vertex(operator).
*/
private List<ExecutionVertex> executionVertices;
private JobRuntimeContext runtimeContext;
/**
* Input and output edges of current execution job vertex.
*/
private List<ExecutionJobEdge> inputEdges = new ArrayList<>();
private List<ExecutionJobEdge> outputEdges = new ArrayList<>();
public ExecutionJobVertex(JobVertex jobVertex, JobRuntimeContext runtimeContext) {
public ExecutionJobVertex(
JobVertex jobVertex, Map<String, String> jobConfig, AtomicInteger idGenerator) {
this.jobVertexId = jobVertex.getVertexId();
this.jobVertexName = generateVertexName(jobVertexId, jobVertex.getStreamOperator());
this.jobVertexName = jobVertex.getStreamOperator().getName();
this.streamOperator = jobVertex.getStreamOperator();
this.vertexType = jobVertex.getVertexType();
this.language = jobVertex.getLanguage();
this.jobConfig = jobConfig;
this.parallelism = jobVertex.getParallelism();
this.runtimeContext = runtimeContext;
this.executionVertices = createExecutionVertics();
this.executionVertices = createExecutionVertics(idGenerator);
}
private String generateVertexName(int vertexId, StreamOperator streamOperator) {
return vertexId + "-" + streamOperator.getName();
}
private List<ExecutionVertex> createExecutionVertics() {
private List<ExecutionVertex> createExecutionVertics(AtomicInteger idGenerator) {
List<ExecutionVertex> executionVertices = new ArrayList<>();
for (int index = 1; index <= parallelism; index++) {
executionVertices.add(new ExecutionVertex(jobVertexId, index, this));
ResourceConfig resourceConfig = ConfigFactory.create(ResourceConfig.class, jobConfig);
for (int subIndex = 0; subIndex < parallelism; subIndex++) {
executionVertices.add(new ExecutionVertex(
idGenerator.getAndIncrement(), subIndex, this, resourceConfig));
}
return executionVertices;
}
@@ -72,7 +87,7 @@ public class ExecutionJobVertex {
Preconditions.checkArgument(
vertex.getWorkerActor() != null,
"Empty execution vertex worker actor.");
executionVertexWorkersMap.put(vertex.getVertexId(), vertex.getWorkerActor());
executionVertexWorkersMap.put(vertex.getId(), vertex.getWorkerActor());
});
return executionVertexWorkersMap;
@@ -86,6 +101,15 @@ public class ExecutionJobVertex {
return jobVertexName;
}
/**
 * Builds a name that prefixes the job vertex name with the job vertex id,
 * in the form {@code <jobVertexId>-<jobVertexName>}, e.g. {@code 1-SourceOperator}.
 *
 * @return the job vertex id and the job vertex name joined by a hyphen
 */
public String getVertexNameWithIndex() {
  StringBuilder name = new StringBuilder();
  name.append(jobVertexId).append("-").append(jobVertexName);
  return name.toString();
}
public int getParallelism() {
return parallelism;
}
@@ -125,6 +149,10 @@ public class ExecutionJobVertex {
return vertexType;
}
/**
 * Returns the language of this execution job vertex, as taken from the
 * job vertex it was built from.
 *
 * @return the language stored on this execution job vertex
 */
public Language getLanguage() {
  return this.language;
}
/**
 * Tells whether this vertex's type is {@link VertexType#SOURCE}.
 *
 * @return {@code true} if {@link #getVertexType()} yields {@code VertexType.SOURCE}
 */
public boolean isSourceVertex() {
  final VertexType currentType = getVertexType();
  return VertexType.SOURCE == currentType;
}
@@ -137,21 +165,13 @@ public class ExecutionJobVertex {
return getVertexType() == VertexType.SINK;
}
public JobRuntimeContext getRuntimeContext() {
return runtimeContext;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("jobVertexId", jobVertexId)
.add("jobVertexName", jobVertexName)
.add("streamOperator", streamOperator)
.add("vertexType", vertexType)
.add("parallelism", parallelism)
.add("executionVertices", executionVertices)
.add("inputEdges", inputEdges)
.add("outputEdges", outputEdges)
.toString();
}
}
@@ -8,10 +8,12 @@ import java.util.List;
import java.util.Map;
import org.ray.api.RayActor;
import org.ray.api.id.ActorId;
import org.ray.streaming.api.Language;
import org.ray.streaming.jobgraph.VertexType;
import org.ray.streaming.operator.StreamOperator;
import org.ray.streaming.runtime.config.master.ResourceConfig;
import org.ray.streaming.runtime.core.resource.ResourceType;
import org.ray.streaming.runtime.core.resource.Slot;
import org.ray.streaming.runtime.master.JobRuntimeContext;
import org.ray.streaming.runtime.worker.JobWorker;
/**
@@ -22,47 +24,85 @@ public class ExecutionVertex implements Serializable {
/**
* Unique id for execution vertex.
*/
private final int vertexId;
private final int id;
/**
* Ordered index for execution vertex.
* Immutable field inherited from {@link ExecutionJobVertex}.
*/
private final int vertexIndex;
private final int jobVertexId;
private final String jobVertexName;
private final StreamOperator streamOperator;
private final VertexType vertexType;
private final Language language;
/**
* Unique name generated by vertex name and index for execution vertex.
*/
private final String vertexName;
/**
* Resources used by ExecutionVertex.
*/
private final Map<String, Double> resources;
/**
* Ordered sub-index of this execution vertex within its execution job vertex.
* Might be changed in dynamic scheduling.
*/
private int vertexIndex;
private ExecutionVertexState state = ExecutionVertexState.TO_ADD;
private Slot slot;
private RayActor<JobWorker> workerActor;
private List<ExecutionEdge> inputEdges = new ArrayList<>();
private List<ExecutionEdge> outputEdges = new ArrayList<>();
public ExecutionVertex(int jobVertexId, int index, ExecutionJobVertex executionJobVertex) {
this.vertexId = generateExecutionVertexId(jobVertexId, index);
public ExecutionVertex(
int globalIndex,
int index,
ExecutionJobVertex executionJobVertex,
ResourceConfig resourceConfig) {
this.id = globalIndex;
this.jobVertexId = executionJobVertex.getJobVertexId();
this.jobVertexName = executionJobVertex.getJobVertexName();
this.streamOperator = executionJobVertex.getStreamOperator();
this.vertexType = executionJobVertex.getVertexType();
this.language = executionJobVertex.getLanguage();
this.vertexIndex = index;
this.vertexName = executionJobVertex.getJobVertexName() + "-" + vertexIndex;
this.resources = generateResources(executionJobVertex.getRuntimeContext());
this.resources = generateResources(resourceConfig);
}
private int generateExecutionVertexId(int jobVertexId, int index) {
return jobVertexId * 100000 + index;
public int getId() {
return id;
}
public int getVertexId() {
return vertexId;
public int getJobVertexId() {
return jobVertexId;
}
/**
 * Returns the job vertex name copied from the owning {@link ExecutionJobVertex}
 * when this execution vertex was constructed.
 *
 * @return the name of the job vertex this execution vertex belongs to
 */
public String getJobVertexName() {
  return this.jobVertexName;
}
/**
 * Returns the stream operator copied from the owning {@link ExecutionJobVertex}
 * when this execution vertex was constructed.
 *
 * @return the stream operator of this execution vertex
 */
public StreamOperator getStreamOperator() {
  return this.streamOperator;
}
/**
 * Returns the vertex type copied from the owning {@link ExecutionJobVertex}
 * when this execution vertex was constructed.
 *
 * @return the vertex type of this execution vertex
 */
public VertexType getVertexType() {
  return this.vertexType;
}
/**
 * Returns the language copied from the owning {@link ExecutionJobVertex}
 * when this execution vertex was constructed.
 *
 * @return the language of this execution vertex
 */
public Language getLanguage() {
  return this.language;
}
/**
 * Returns the sub-index of this execution vertex inside its execution job vertex.
 * The backing field is not final, so the value is the current one, not necessarily
 * the one passed to the constructor.
 *
 * @return the current vertex index
 */
public int getVertexIndex() {
  return this.vertexIndex;
}
/**
 * Builds the name of this execution vertex in the form
 * {@code <jobVertexId>-<jobVertexName>-<vertexIndex>},
 * e.g. {@code 1-SourceOperator-3} when the vertex index is 3.
 * The name is composed from the current field values on every call.
 *
 * @return the job vertex id, job vertex name and vertex index joined by hyphens
 */
public String getVertexName() {
  StringBuilder name = new StringBuilder();
  name.append(jobVertexId).append("-").append(jobVertexName);
  name.append("-").append(vertexIndex);
  return name.toString();
}
/**
 * Returns the current state of this execution vertex.
 * The state field is initialized to {@code ExecutionVertexState.TO_ADD}.
 *
 * @return the state currently stored on this execution vertex
 */
public ExecutionVertexState getState() {
  return this.state;
}
@@ -113,10 +153,6 @@ public class ExecutionVertex implements Serializable {
this.outputEdges = outputEdges;
}
public String getVertexName() {
return vertexName;
}
/**
 * Returns the resource map that was generated for this execution vertex
 * in its constructor.
 * The returned map is the internal instance, not a copy.
 *
 * @return resource name to amount mapping of this execution vertex
 */
public Map<String, Double> getResources() {
  return this.resources;
}
@@ -135,9 +171,8 @@ public class ExecutionVertex implements Serializable {
}
}
private Map<String, Double> generateResources(JobRuntimeContext runtimeContext) {
private Map<String, Double> generateResources(ResourceConfig resourceConfig) {
Map<String, Double> resourceMap = new HashMap<>();
ResourceConfig resourceConfig = runtimeContext.getConf().masterConfig.resourceConfig;
if (resourceConfig.isTaskCpuResourceLimit()) {
resourceMap.put(ResourceType.CPU.name(), resourceConfig.taskCpuResource());
}
@@ -150,9 +185,8 @@ public class ExecutionVertex implements Serializable {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("vertexId", vertexId)
.add("vertexIndex", vertexIndex)
.add("vertexName", vertexName)
.add("id", id)
.add("name", getVertexName())
.add("resources", resources)
.add("state", state)
.add("slot", slot)
@@ -44,6 +44,7 @@ public class GraphManagerImpl implements GraphManager {
private ExecutionGraph setupStructure(JobGraph jobGraph) {
ExecutionGraph executionGraph = new ExecutionGraph(jobGraph.getJobName());
Map<String, String> jobConfig = jobGraph.getJobConfig();
// create vertex
Map<Integer, ExecutionJobVertex> exeJobVertexMap = new LinkedHashMap<>();
@@ -51,7 +52,8 @@ public class GraphManagerImpl implements GraphManager {
for (JobVertex jobVertex : jobGraph.getJobVertexList()) {
int jobVertexId = jobVertex.getVertexId();
exeJobVertexMap.put(jobVertexId,
new ExecutionJobVertex(jobVertex, runtimeContext));
new ExecutionJobVertex(
jobVertex, jobConfig, executionGraph.getExecutionVertexIdGenerator()));
}
// connect vertex
@@ -181,7 +181,7 @@ public class PipelineFirstStrategy implements SlotAssignStrategy {
Slot useSlot = resources.getAllocatingMap().get(container.getContainerId())
.stream().filter(s -> s.getId() == slot.getId()).findFirst().get();
useSlot.getExecutionVertexIds().add(vertex.getVertexId());
useSlot.getExecutionVertexIds().add(vertex.getId());
// current container reaches capacity limitation, go to the next one.
resources.setCurrentContainerAllocatedActorNum(
@@ -2,12 +2,10 @@ package org.ray.streaming.runtime.graph;
import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.stream.DataStream;
import org.ray.streaming.api.stream.DataStreamSource;
@@ -16,9 +14,11 @@ import org.ray.streaming.jobgraph.JobGraph;
import org.ray.streaming.jobgraph.JobGraphBuilder;
import org.ray.streaming.runtime.BaseUnitTest;
import org.ray.streaming.runtime.config.StreamingConfig;
import org.ray.streaming.runtime.config.master.ResourceConfig;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionJobVertex;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import org.ray.streaming.runtime.core.resource.ResourceType;
import org.ray.streaming.runtime.master.JobRuntimeContext;
import org.ray.streaming.runtime.master.graphmanager.GraphManager;
import org.ray.streaming.runtime.master.graphmanager.GraphManagerImpl;
@@ -45,6 +45,17 @@ public class ExecutionGraphTest extends BaseUnitTest {
int totalVertexNum = jobGraph.getJobVertexList().stream()
.mapToInt(vertex -> vertex.getParallelism()).sum();
Assert.assertEquals(executionGraph.getAllExecutionVertices().size(), totalVertexNum);
Assert.assertEquals(executionGraph.getAllExecutionVertices().size(),
executionGraph.getExecutionVertexIdGenerator().get());
executionGraph.getAllExecutionVertices().forEach(vertex -> {
Assert.assertNotNull(vertex.getStreamOperator());
Assert.assertNotNull(vertex.getJobVertexName());
Assert.assertNotNull(vertex.getVertexType());
Assert.assertNotNull(vertex.getLanguage());
Assert.assertEquals(vertex.getVertexName(),
vertex.getJobVertexId() + "-" + vertex.getJobVertexName() + "-" + vertex.getVertexIndex());
});
int startIndex = 0;
ExecutionJobVertex upStream = executionJobVertices.get(startIndex);
@@ -53,10 +64,11 @@ public class ExecutionGraphTest extends BaseUnitTest {
List<ExecutionVertex> upStreamVertices = upStream.getExecutionVertices();
List<ExecutionVertex> downStreamVertices = downStream.getExecutionVertices();
upStreamVertices.stream().forEach(vertex -> {
vertex.getOutputEdges().stream().forEach(upStreamOutPutEdge -> {
Assert.assertTrue(downStreamVertices.contains(upStreamOutPutEdge.getTargetVertex()));
});
upStreamVertices.forEach(vertex -> {
Assert.assertEquals(vertex.getResources().get(ResourceType.CPU.name()), 2.0);
vertex.getOutputEdges().stream().forEach(upStreamOutPutEdge -> {
Assert.assertTrue(downStreamVertices.contains(upStreamOutPutEdge.getTargetVertex()));
});
});
}
@@ -73,10 +85,16 @@ public class ExecutionGraphTest extends BaseUnitTest {
DataStream<String> dataStream = DataStreamSource.buildSource(streamingContext,
Lists.newArrayList("a", "b", "c"));
StreamSink streamSink = dataStream.sink(x -> LOG.info(x));
JobGraphBuilder jobGraphBuilder = new JobGraphBuilder(Lists.newArrayList(streamSink));
JobGraph jobGraph = jobGraphBuilder.build();
return jobGraph;
Map<String, String> jobConfig = new HashMap<>();
jobConfig.put("key1", "value1");
jobConfig.put("key2", "value2");
jobConfig.put(ResourceConfig.TASK_RESOURCE_CPU, "2.0");
JobGraphBuilder jobGraphBuilder = new JobGraphBuilder(
Lists.newArrayList(streamSink), "test", jobConfig);
return jobGraphBuilder.build();
}
}
@@ -94,7 +94,7 @@ public class ResourceManagerTest extends BaseUnitTest {
});
Assert.assertEquals(container1.getAvailableResource().get(ResourceType.CPU.name()), 14.0);
Assert.assertEquals(container2.getAvailableResource().get(ResourceType.CPU.name()), 14.0);
Assert.assertEquals(container1.getAvailableResource().get(ResourceType.MEM.name()), 118.0);
Assert.assertEquals(container2.getAvailableResource().get(ResourceType.MEM.name()), 118.0);
Assert.assertEquals(container1.getAvailableResource().get(ResourceType.MEM.name()), 126.0);
Assert.assertEquals(container2.getAvailableResource().get(ResourceType.MEM.name()), 126.0);
}
}