[Streaming] Streaming scheduler - part1-1: job graph (#6712)

This commit is contained in:
Tianyi Chen
2020-01-15 13:12:03 +08:00
committed by Hao Chen
parent 4227fd1b60
commit 9a4da1951e
27 changed files with 322 additions and 250 deletions
@@ -2,7 +2,6 @@ package org.ray.streaming.runtime.cluster;
import java.util.ArrayList;
import java.util.List;
import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.streaming.runtime.worker.JobWorker;
@@ -1,7 +1,6 @@
package org.ray.streaming.runtime.core.graph;
import java.io.Serializable;
import org.ray.streaming.api.partition.Partition;
/**
@@ -6,7 +6,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.ray.api.RayActor;
import org.ray.streaming.runtime.worker.JobWorker;
@@ -3,13 +3,14 @@ package org.ray.streaming.runtime.core.graph;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.ray.streaming.jobgraph.VertexType;
import org.ray.streaming.operator.StreamOperator;
import org.ray.streaming.plan.VertexType;
/**
* A node in the physical execution graph.
*/
public class ExecutionNode implements Serializable {
private int nodeId;
private int parallelism;
private NodeType nodeType;
@@ -1,7 +1,6 @@
package org.ray.streaming.runtime.core.graph;
import java.io.Serializable;
import org.ray.api.RayActor;
import org.ray.streaming.runtime.worker.JobWorker;
@@ -6,8 +6,8 @@ import java.util.Map;
import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.api.RayObject;
import org.ray.streaming.plan.Plan;
import org.ray.streaming.plan.PlanVertex;
import org.ray.streaming.jobgraph.JobGraph;
import org.ray.streaming.jobgraph.JobVertex;
import org.ray.streaming.runtime.cluster.ResourceManager;
import org.ray.streaming.runtime.core.graph.ExecutionGraph;
import org.ray.streaming.runtime.core.graph.ExecutionNode;
@@ -21,8 +21,8 @@ import org.ray.streaming.schedule.JobScheduler;
* from ResourceManager.
*/
public class JobSchedulerImpl implements JobScheduler {
private Plan plan;
private Map<String, Object> jobConfig;
private JobGraph jobGraph;
private Map<String, String> jobConfig;
private ResourceManager resourceManager;
private TaskAssigner taskAssigner;
@@ -35,14 +35,14 @@ public class JobSchedulerImpl implements JobScheduler {
* Schedule the logical plan (job graph) to a physical execution graph, then call the streaming workers to init and run.
*/
@Override
public void schedule(Plan plan, Map<String, Object> jobConfig) {
public void schedule(JobGraph jobGraph, Map<String, String> jobConfig) {
this.jobConfig = jobConfig;
this.plan = plan;
this.jobGraph = jobGraph;
System.setProperty("ray.raylet.config.num_workers_per_process_java", "1");
Ray.init();
List<RayActor<JobWorker>> workers = this.resourceManager.createWorkers(getPlanWorker());
ExecutionGraph executionGraph = this.taskAssigner.assign(this.plan, workers);
ExecutionGraph executionGraph = this.taskAssigner.assign(this.jobGraph, workers);
List<ExecutionNode> executionNodes = executionGraph.getExecutionNodeList();
List<RayObject<Boolean>> waits = new ArrayList<>();
@@ -59,7 +59,7 @@ public class JobSchedulerImpl implements JobScheduler {
}
private int getPlanWorker() {
List<PlanVertex> planVertexList = plan.getPlanVertexList();
return planVertexList.stream().map(PlanVertex::getParallelism).reduce(0, Integer::sum);
List<JobVertex> jobVertexList = jobGraph.getJobVertexList();
return jobVertexList.stream().map(JobVertex::getParallelism).reduce(0, Integer::sum);
}
}
@@ -3,7 +3,7 @@ package org.ray.streaming.runtime.schedule;
import java.io.Serializable;
import java.util.List;
import org.ray.api.RayActor;
import org.ray.streaming.plan.Plan;
import org.ray.streaming.jobgraph.JobGraph;
import org.ray.streaming.runtime.core.graph.ExecutionGraph;
import org.ray.streaming.runtime.worker.JobWorker;
@@ -15,6 +15,6 @@ public interface TaskAssigner extends Serializable {
/**
* Assign logical plan to physical execution graph.
*/
ExecutionGraph assign(Plan plan, List<RayActor<JobWorker>> workers);
ExecutionGraph assign(JobGraph jobGraph, List<RayActor<JobWorker>> workers);
}
@@ -6,9 +6,9 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.ray.api.RayActor;
import org.ray.streaming.plan.Plan;
import org.ray.streaming.plan.PlanEdge;
import org.ray.streaming.plan.PlanVertex;
import org.ray.streaming.jobgraph.JobEdge;
import org.ray.streaming.jobgraph.JobGraph;
import org.ray.streaming.jobgraph.JobVertex;
import org.ray.streaming.runtime.core.graph.ExecutionEdge;
import org.ray.streaming.runtime.core.graph.ExecutionGraph;
import org.ray.streaming.runtime.core.graph.ExecutionNode;
@@ -20,37 +20,37 @@ public class TaskAssignerImpl implements TaskAssigner {
/**
* Assign an optimized logical plan to execution graph.
*
* @param plan The logical plan.
* @param jobGraph The logical plan, represented as a job graph.
* @param workers The worker actors.
* @return The physical execution graph.
*/
@Override
public ExecutionGraph assign(Plan plan, List<RayActor<JobWorker>> workers) {
List<PlanVertex> planVertices = plan.getPlanVertexList();
List<PlanEdge> planEdges = plan.getPlanEdgeList();
public ExecutionGraph assign(JobGraph jobGraph, List<RayActor<JobWorker>> workers) {
List<JobVertex> jobVertices = jobGraph.getJobVertexList();
List<JobEdge> jobEdges = jobGraph.getJobEdgeList();
int taskId = 0;
Map<Integer, ExecutionNode> idToExecutionNode = new HashMap<>();
for (PlanVertex planVertex : planVertices) {
ExecutionNode executionNode = new ExecutionNode(planVertex.getVertexId(),
planVertex.getParallelism());
executionNode.setNodeType(planVertex.getVertexType());
for (JobVertex jobVertex : jobVertices) {
ExecutionNode executionNode = new ExecutionNode(jobVertex.getVertexId(),
jobVertex.getParallelism());
executionNode.setNodeType(jobVertex.getVertexType());
List<ExecutionTask> vertexTasks = new ArrayList<>();
for (int taskIndex = 0; taskIndex < planVertex.getParallelism(); taskIndex++) {
for (int taskIndex = 0; taskIndex < jobVertex.getParallelism(); taskIndex++) {
vertexTasks.add(new ExecutionTask(taskId, taskIndex, workers.get(taskId)));
taskId++;
}
executionNode.setExecutionTasks(vertexTasks);
executionNode.setStreamOperator(planVertex.getStreamOperator());
executionNode.setStreamOperator(jobVertex.getStreamOperator());
idToExecutionNode.put(executionNode.getNodeId(), executionNode);
}
for (PlanEdge planEdge : planEdges) {
int srcNodeId = planEdge.getSrcVertexId();
int targetNodeId = planEdge.getTargetVertexId();
for (JobEdge jobEdge : jobEdges) {
int srcNodeId = jobEdge.getSrcVertexId();
int targetNodeId = jobEdge.getTargetVertexId();
ExecutionEdge executionEdge = new ExecutionEdge(srcNodeId, targetNodeId,
planEdge.getPartition());
jobEdge.getPartition());
idToExecutionNode.get(srcNodeId).addExecutionEdge(executionEdge);
idToExecutionNode.get(targetNodeId).addInputEdge(executionEdge);
}
@@ -36,7 +36,7 @@ public class JobWorker implements Serializable {
}
private int taskId;
private Map<String, Object> config;
private Map<String, String> config;
private WorkerContext workerContext;
private ExecutionNode executionNode;
private ExecutionTask executionTask;
@@ -88,7 +88,7 @@ public class JobWorker implements Serializable {
return taskId;
}
public Map<String, Object> getConfig() {
public Map<String, String> getConfig() {
return config;
}
@@ -3,7 +3,6 @@ package org.ray.streaming.runtime.worker.context;
import static org.ray.streaming.util.Config.STREAMING_BATCH_MAX_COUNT;
import java.util.Map;
import org.ray.streaming.api.context.RuntimeContext;
import org.ray.streaming.runtime.core.graph.ExecutionTask;
@@ -16,16 +15,16 @@ public class RayRuntimeContext implements RuntimeContext {
private int parallelism;
private Long batchId;
private final Long maxBatch;
private Map<String, Object> config;
private Map<String, String> config;
public RayRuntimeContext(ExecutionTask executionTask, Map<String, Object> config,
public RayRuntimeContext(ExecutionTask executionTask, Map<String, String> config,
int parallelism) {
this.taskId = executionTask.getTaskId();
this.config = config;
this.taskIndex = executionTask.getTaskIndex();
this.parallelism = parallelism;
if (config.containsKey(STREAMING_BATCH_MAX_COUNT)) {
this.maxBatch = Long.valueOf(String.valueOf(config.get(STREAMING_BATCH_MAX_COUNT)));
this.maxBatch = Long.valueOf(config.get(STREAMING_BATCH_MAX_COUNT));
} else {
this.maxBatch = Long.MAX_VALUE;
}
@@ -11,9 +11,9 @@ public class WorkerContext implements Serializable {
private int taskId;
private ExecutionGraph executionGraph;
private Map<String, Object> config;
private Map<String, String> config;
public WorkerContext(int taskId, ExecutionGraph executionGraph, Map<String, Object> jobConfig) {
public WorkerContext(int taskId, ExecutionGraph executionGraph, Map<String, String> jobConfig) {
this.taskId = taskId;
this.executionGraph = executionGraph;
this.config = jobConfig;
@@ -35,7 +35,7 @@ public class WorkerContext implements Serializable {
this.executionGraph = executionGraph;
}
public Map<String, Object> getConfig() {
public Map<String, String> getConfig() {
return config;
}
}
@@ -30,8 +30,8 @@ public class WordCountTest extends BaseUnitTest implements Serializable {
@Test
public void testWordCount() {
StreamingContext streamingContext = StreamingContext.buildContext();
Map<String, Object> config = new HashMap<>();
config.put(Config.STREAMING_BATCH_MAX_COUNT, 1);
Map<String, String> config = new HashMap<>();
config.put(Config.STREAMING_BATCH_MAX_COUNT, "1");
config.put(Config.CHANNEL_TYPE, Config.MEMORY_CHANNEL);
streamingContext.withConfig(config);
List<String> text = new ArrayList<>();
@@ -50,7 +50,7 @@ public class WordCountTest extends BaseUnitTest implements Serializable {
.sink((SinkFunction<WordAndCount>)
result -> wordCount.put(result.word, result.count));
streamingContext.execute();
streamingContext.execute("testWordCount");
// Sleep until the count for every word is computed.
while (wordCount.size() < 3) {
@@ -19,8 +19,8 @@ import org.ray.streaming.runtime.core.graph.ExecutionGraph;
import org.ray.streaming.runtime.core.graph.ExecutionNode;
import org.ray.streaming.runtime.core.graph.ExecutionNode.NodeType;
import org.ray.streaming.runtime.worker.JobWorker;
import org.ray.streaming.plan.Plan;
import org.ray.streaming.plan.PlanBuilder;
import org.ray.streaming.jobgraph.JobGraph;
import org.ray.streaming.jobgraph.JobGraphBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -32,15 +32,15 @@ public class TaskAssignerImplTest extends BaseUnitTest {
@Test
public void testTaskAssignImpl() {
Plan plan = buildDataSyncPlan();
JobGraph jobGraph = buildDataSyncPlan();
List<RayActor<JobWorker>> workers = new ArrayList<>();
for(int i = 0; i < plan.getPlanVertexList().size(); i++) {
for(int i = 0; i < jobGraph.getJobVertexList().size(); i++) {
workers.add(new LocalModeRayActor(ActorId.fromRandom(), ObjectId.fromRandom()));
}
TaskAssigner taskAssigner = new TaskAssignerImpl();
ExecutionGraph executionGraph = taskAssigner.assign(plan, workers);
ExecutionGraph executionGraph = taskAssigner.assign(jobGraph, workers);
List<ExecutionNode> executionNodeList = executionGraph.getExecutionNodeList();
@@ -63,14 +63,14 @@ public class TaskAssignerImplTest extends BaseUnitTest {
Assert.assertEquals(sinkNode.getOutputEdges().size(), 0);
}
public Plan buildDataSyncPlan() {
public JobGraph buildDataSyncPlan() {
StreamingContext streamingContext = StreamingContext.buildContext();
DataStream<String> dataStream = DataStreamSource.buildSource(streamingContext,
Lists.newArrayList("a", "b", "c"));
DataStreamSink streamSink = dataStream.sink(x -> LOGGER.info(x));
PlanBuilder planBuilder = new PlanBuilder(Lists.newArrayList(streamSink));
JobGraphBuilder jobGraphBuilder = new JobGraphBuilder(Lists.newArrayList(streamSink));
Plan plan = planBuilder.buildPlan();
return plan;
JobGraph jobGraph = jobGraphBuilder.build();
return jobGraph;
}
}
@@ -151,8 +151,8 @@ public class StreamingQueueTest extends BaseUnitTest implements Serializable {
Map<String, Integer> wordCount = new ConcurrentHashMap<>();
StreamingContext streamingContext = StreamingContext.buildContext();
Map<String, Object> config = new HashMap<>();
config.put(Config.STREAMING_BATCH_MAX_COUNT, 1);
Map<String, String> config = new HashMap<>();
config.put(Config.STREAMING_BATCH_MAX_COUNT, "1");
config.put(Config.CHANNEL_TYPE, Config.NATIVE_CHANNEL);
config.put(Config.CHANNEL_SIZE, "100000");
streamingContext.withConfig(config);
@@ -177,7 +177,7 @@ public class StreamingQueueTest extends BaseUnitTest implements Serializable {
serializeResultToFile(resultFile, wordCount);
});
streamingContext.execute();
streamingContext.execute("testWordCount");
Map<String, Integer> checkWordCount =
(Map<String, Integer>) deserializeResultFromFile(resultFile);