[Java] Format ray java code (#13056)

This commit is contained in:
chaokunyang
2020-12-29 10:36:16 +08:00
committed by GitHub
parent cc1c2c3dc9
commit d1dd3410c8
422 changed files with 4384 additions and 5035 deletions
+28 -1
View File
@@ -198,7 +198,14 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.0.0</version>
<version>3.1.0</version>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>8.19</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>validate</id>
@@ -220,6 +227,22 @@
<linkXRef>false</linkXRef>
</configuration>
</plugin>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>2.5.0</version>
<configuration>
<java>
<excludes>
<exclude>**/runtime/generated/**/*.*</exclude>
</excludes>
<googleJavaFormat>
<version>1.7</version>
<style>GOOGLE</style>
</googleJavaFormat>
</java>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
@@ -251,6 +274,10 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
</plugin>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
@@ -9,5 +9,4 @@ package io.ray.streaming.api.collector;
public interface Collector<T> {
/** Hands a single value of type {@code T} to this collector. */
void collect(T value);
}
@@ -9,9 +9,7 @@ import io.ray.streaming.state.keystate.state.MapState;
import io.ray.streaming.state.keystate.state.ValueState;
import java.util.Map;
/**
* Encapsulate the runtime information of a streaming task.
*/
/** Encapsulate the runtime information of a streaming task. */
public interface RuntimeContext {
int getTaskId();
@@ -20,14 +18,10 @@ public interface RuntimeContext {
int getParallelism();
/**
* @return config of current function
*/
/** Returns config of current function */
Map<String, String> getConfig();
/**
* @return config of the job
*/
/** Returns config of the job */
Map<String, String> getJobConfig();
Long getCheckpointId();
@@ -19,28 +19,20 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Encapsulate the context information of a streaming Job.
*/
/** Encapsulate the context information of a streaming Job. */
public class StreamingContext implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(StreamingContext.class);
private transient AtomicInteger idGenerator;
/**
* The sinks of this streaming job.
*/
/** The sinks of this streaming job. */
private List<StreamSink> streamSinks;
/**
* The user custom streaming job configuration.
*/
/** The user custom streaming job configuration. */
private Map<String, String> jobConfig;
/**
* The logic plan.
*/
/** The logic plan. */
private JobGraph jobGraph;
private StreamingContext() {
@@ -53,9 +45,7 @@ public class StreamingContext implements Serializable {
return new StreamingContext();
}
/**
* Construct job DAG, and execute the job.
*/
/** Construct job DAG, and execute the job. */
public void execute(String jobName) {
JobGraphBuilder jobGraphBuilder = new JobGraphBuilder(this.streamSinks, jobName);
JobGraph originalJobGraph = jobGraphBuilder.build();
@@ -78,8 +68,8 @@ public class StreamingContext implements Serializable {
ServiceLoader<JobClient> serviceLoader = ServiceLoader.load(JobClient.class);
Iterator<JobClient> iterator = serviceLoader.iterator();
Preconditions.checkArgument(iterator.hasNext(),
"No JobClient implementation has been provided.");
Preconditions.checkArgument(
iterator.hasNext(), "No JobClient implementation has been provided.");
JobClient jobClient = iterator.next();
jobClient.submit(jobGraph, jobConfig);
}
@@ -2,31 +2,27 @@ package io.ray.streaming.api.function;
import java.io.Serializable;
/**
* Interface of streaming functions.
*/
/** Interface of streaming functions. */
public interface Function extends Serializable {
/**
* This method will be called periodically by framework, you should return a a serializable
* object which represents function state, framework will help you to serialize this object, save
* it to storage, and load it back when in fail-over through.
* {@link Function#loadCheckpoint(Serializable)}.
* This method will be called periodically by the framework. You should return a serializable
* object which represents the function state; the framework will serialize this object, save it
* to storage, and load it back during fail-over through {@link
* Function#loadCheckpoint(Serializable)}.
*
* @return A serializable object which represents function state.
* @return A serializable object which represents function state.
*/
default Serializable saveCheckpoint() {
return null;
}
/**
* This method will be called by framework when a worker died and been restarted.
* We will pass the last object you returned in {@link Function#saveCheckpoint()} when
* doing checkpoint, you are responsible to load this object back to you function.
* This method will be called by the framework when a worker has died and been restarted. We
* will pass the last object you returned in {@link Function#saveCheckpoint()} when doing a
* checkpoint; you are responsible for loading this object back into your function.
*
* @param checkpointObject the last object you returned in {@link Function#saveCheckpoint()}
*/
default void loadCheckpoint(Serializable checkpointObject) {
}
default void loadCheckpoint(Serializable checkpointObject) {}
}
@@ -20,5 +20,4 @@ public interface RichFunction extends Function {
* Tear-down method for the user function which called after the last call to the user function.
*/
void close();
}
@@ -14,8 +14,8 @@ public interface FilterFunction<T> extends Function {
/**
* The filter function that evaluates the predicate.
*
* @param value The value to be filtered.
* @return True for values that should be retained, false for values to be filtered out.
* @param value The value to be filtered.
* @return True for values that should be retained, false for values to be filtered out.
*/
boolean filter(T value) throws Exception;
}
@@ -13,5 +13,4 @@ import io.ray.streaming.api.function.Function;
public interface JoinFunction<T, O, R> extends Function {
/**
 * Produces a result of type {@code R} from one left-side value and one right-side value.
 *
 * @param left value of type {@code T} (presumably from the left stream of the join; confirm
 *     against callers)
 * @param right value of type {@code O} (presumably from the right stream of the join; confirm
 *     against callers)
 * @return the joined result
 */
R join(T left, O right);
}
@@ -18,6 +18,5 @@ public interface SourceFunction<T> extends Function {
interface SourceContext<T> {
void collect(T element) throws Exception;
}
}
@@ -18,8 +18,7 @@ public class CollectionSourceFunction<T> implements SourceFunction<T> {
}
@Override
public void init(int totalParallel, int currentIndex) {
}
public void init(int totalParallel, int currentIndex) {}
@Override
public void fetch(SourceContext<T> ctx) throws Exception {
@@ -33,7 +32,5 @@ public class CollectionSourceFunction<T> implements SourceFunction<T> {
}
@Override
public void close() {
}
public void close() {}
}
@@ -4,9 +4,7 @@ import io.ray.streaming.api.context.RuntimeContext;
import io.ray.streaming.api.function.Function;
import io.ray.streaming.api.function.RichFunction;
/**
* A util class for {@link Function}
*/
/** A util class for {@link Function} */
public class Functions {
private static class DefaultRichFunction implements RichFunction {
@@ -18,12 +16,10 @@ public class Functions {
}
@Override
public void open(RuntimeContext runtimeContext) {
}
public void open(RuntimeContext runtimeContext) {}
@Override
public void close() {
}
public void close() {}
public Function getFunction() {
return function;
@@ -41,5 +37,4 @@ public class Functions {
public static RichFunction emptyFunction() {
return new DefaultRichFunction(null);
}
}
@@ -15,9 +15,8 @@ public interface Partition<T> extends Function {
* record.
*
* @param record The record.
* @param numPartition num of partitions
* @return IDs of the downstream partitions that should receive the record.
* @param numPartition num of partitions
* @return IDs of the downstream partitions that should receive the record.
*/
int[] partition(T record, int numPartition);
}
@@ -3,15 +3,12 @@ package io.ray.streaming.api.partition.impl;
import io.ray.streaming.api.partition.Partition;
import java.util.stream.IntStream;
/**
* Broadcast the record to all downstream partitions.
*/
/** Broadcast the record to all downstream partitions. */
public class BroadcastPartition<T> implements Partition<T> {
private int[] partitions = new int[0];
public BroadcastPartition() {
}
public BroadcastPartition() {}
@Override
public int[] partition(T value, int numPartition) {
@@ -20,5 +17,4 @@ public class BroadcastPartition<T> implements Partition<T> {
}
return partitions;
}
}
@@ -22,6 +22,7 @@ import java.util.List;
/**
* Represents a stream of data.
*
* <p>This class defines all the streaming operations.
*
* @param <T> Type of data in the stream.
@@ -33,9 +34,7 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
}
public DataStream(
StreamingContext streamingContext,
StreamOperator streamOperator,
Partition<T> partition) {
StreamingContext streamingContext, StreamOperator streamOperator, Partition<T> partition) {
super(streamingContext, streamOperator, partition);
}
@@ -44,9 +43,7 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
}
public <R> DataStream(
DataStream<R> input,
StreamOperator streamOperator,
Partition<T> partition) {
DataStream<R> input, StreamOperator streamOperator, Partition<T> partition) {
super(input, streamOperator, partition);
}
@@ -62,8 +59,7 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
* Apply a map function to this stream.
*
* @param mapFunction The map function.
* @param <R> Type of data returned by the map function.
* @return A new DataStream.
* @param <R> Type of data returned by the map function.
* @return A new DataStream.
*/
public <R> DataStream<R> map(MapFunction<T, R> mapFunction) {
return new DataStream<>(this, new MapOperator<>(mapFunction));
@@ -73,8 +69,7 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
* Apply a flat-map function to this stream.
*
* @param flatMapFunction The FlatMapFunction
* @param <R> Type of data returned by the flatmap function.
* @return A new DataStream
* @param <R> Type of data returned by the flatmap function.
* @return A new DataStream
*/
public <R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapFunction) {
return new DataStream<>(this, new FlatMapOperator<>(flatMapFunction));
@@ -89,8 +84,7 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
* type with each other.
*
* @param stream The DataStream to union output with.
* @param others The other DataStreams to union output with.
* @return A new UnionStream.
* @param others The other DataStreams to union output with.
* @return A new UnionStream.
*/
@SafeVarargs
public final DataStream<T> union(DataStream<T> stream, DataStream<T>... others) {
@@ -104,8 +98,7 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
* Apply union transformations to this stream by merging {@link DataStream} outputs of the same
* type with each other.
*
* @param streams The DataStreams to union output with.
* @return A new UnionStream.
* @param streams The DataStreams to union output with.
* @return A new UnionStream.
*/
public final DataStream<T> union(List<DataStream<T>> streams) {
if (this instanceof UnionStream) {
@@ -122,8 +115,7 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
*
* @param other Another stream.
* @param <O> The type of the other stream data.
* @param <R> The type of the data in the joined stream.
* @return A new JoinStream.
* @param <R> The type of the data in the joined stream.
* @return A new JoinStream.
*/
public <O, R> JoinStream<T, O, R> join(DataStream<O> other) {
return new JoinStream<>(this, other);
@@ -137,8 +129,7 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
/**
* Apply a sink function and get a StreamSink.
*
* @param sinkFunction The sink function.
* @return A new StreamSink.
* @param sinkFunction The sink function.
* @return A new StreamSink.
*/
public DataStreamSink<T> sink(SinkFunction<T> sinkFunction) {
return new DataStreamSink<>(this, new SinkOperator<>(sinkFunction));
@@ -148,8 +139,7 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
* Apply a key-by function to this stream.
*
* @param keyFunction the key function.
* @param <K> The type of the key.
* @return A new KeyDataStream.
* @param <K> The type of the key.
* @return A new KeyDataStream.
*/
public <K> KeyDataStream<K, T> keyBy(KeyFunction<T, K> keyFunction) {
checkPartitionCall();
@@ -159,7 +149,7 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
/**
* Apply broadcast to this stream.
*
* @return This stream.
* @return This stream.
*/
public DataStream<T> broadcast() {
checkPartitionCall();
@@ -169,8 +159,7 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
/**
* Apply a partition to this stream.
*
* @param partition The partitioning strategy.
* @return This stream.
* @param partition The partitioning strategy.
* @return This stream.
*/
public DataStream<T> partitionBy(Partition<T> partition) {
checkPartitionCall();
@@ -183,8 +172,9 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
*/
private void checkPartitionCall() {
if (getInputStream() != null && getInputStream().getLanguage() == Language.PYTHON) {
throw new RuntimeException("Partition related methods can't be called on a " +
"java stream if parent stream is a python stream.");
throw new RuntimeException(
"Partition related methods can't be called on a "
+ "java stream if parent stream is a python stream.");
}
}
@@ -27,12 +27,10 @@ public class DataStreamSource<T> extends DataStream<T> implements StreamSource<T
*
* @param context Stream context.
* @param values A collection of values.
* @param <T> The type of source data.
* @return A DataStreamSource.
* @param <T> The type of source data.
* @return A DataStreamSource.
*/
public static <T> DataStreamSource<T> fromCollection(
StreamingContext context, Collection<T> values) {
return new DataStreamSource<>(context, new CollectionSourceFunction<>(values));
}
}
@@ -25,9 +25,7 @@ public class JoinStream<L, R, O> extends DataStream<L> {
return rightStream;
}
/**
* Apply key-by to the left join stream.
*/
/** Apply key-by to the left join stream. */
public <K> Where<K> where(KeyFunction<L, K> keyFunction) {
return new Where<>(this, keyFunction);
}
@@ -64,7 +62,8 @@ public class JoinStream<L, R, O> extends DataStream<L> {
private KeyFunction<R, K> rightKeyByFunction;
Equal(
JoinStream<L, R, O> joinStream, KeyFunction<L, K> leftKeyByFunction,
JoinStream<L, R, O> joinStream,
KeyFunction<L, K> leftKeyByFunction,
KeyFunction<R, K> rightKeyByFunction) {
this.joinStream = joinStream;
this.leftKeyByFunction = leftKeyByFunction;
@@ -78,5 +77,4 @@ public class JoinStream<L, R, O> extends DataStream<L> {
return (DataStream<O>) joinStream;
}
}
}
@@ -33,8 +33,7 @@ public class KeyDataStream<K, T> extends DataStream<T> {
/**
* Apply a reduce function to this stream.
*
* @param reduceFunction The reduce function.
* @return A new DataStream.
* @param reduceFunction The reduce function.
* @return A new DataStream.
*/
public DataStream<T> reduce(ReduceFunction reduceFunction) {
return new DataStream<>(this, new ReduceOperator(reduceFunction));
@@ -45,8 +44,7 @@ public class KeyDataStream<K, T> extends DataStream<T> {
*
* @param aggregateFunction The aggregate function
* @param <A> The type of aggregated intermediate data.
* @param <O> The type of result data.
* @return A new DataStream.
* @param <O> The type of result data.
* @return A new DataStream.
*/
public <A, O> DataStream<O> aggregate(AggregateFunction<T, A, O> aggregateFunction) {
return new DataStream<>(this, null);
@@ -60,5 +58,4 @@ public class KeyDataStream<K, T> extends DataStream<T> {
public PythonKeyDataStream asPythonStream() {
return new PythonKeyDataStream(this);
}
}
@@ -19,8 +19,7 @@ import java.util.Map;
* @param <S> Type of stream class
* @param <T> Type of the data in the stream.
*/
public abstract class Stream<S extends Stream<S, T>, T>
implements Serializable {
public abstract class Stream<S extends Stream<S, T>, T> implements Serializable {
private final int id;
private final StreamingContext streamingContext;
@@ -36,14 +35,15 @@ public abstract class Stream<S extends Stream<S, T>, T>
}
public Stream(
StreamingContext streamingContext,
StreamOperator streamOperator,
Partition<T> partition) {
StreamingContext streamingContext, StreamOperator streamOperator, Partition<T> partition) {
this(streamingContext, null, streamOperator, partition);
}
public Stream(Stream inputStream, StreamOperator streamOperator) {
this(inputStream.getStreamingContext(), inputStream, streamOperator,
this(
inputStream.getStreamingContext(),
inputStream,
streamOperator,
getForwardPartition(streamOperator));
}
@@ -87,8 +87,7 @@ public abstract class Stream<S extends Stream<S, T>, T>
case JAVA:
return new ForwardPartition<>();
default:
throw new UnsupportedOperationException(
"Unsupported language " + operator.getLanguage());
throw new UnsupportedOperationException("Unsupported language " + operator.getLanguage());
}
}
@@ -169,18 +168,14 @@ public abstract class Stream<S extends Stream<S, T>, T>
return originalStream;
}
/**
* Set chain strategy for this stream
*/
/** Set chain strategy for this stream */
public S withChainStrategy(ChainStrategy chainStrategy) {
Preconditions.checkArgument(!isProxyStream());
operator.setChainStrategy(chainStrategy);
return self();
}
/**
* Disable chain for this stream
*/
/** Disable chain for this stream */
public S disableChain() {
return withChainStrategy(ChainStrategy.NEVER);
}
@@ -5,6 +5,4 @@ package io.ray.streaming.api.stream;
*
* @param <T> The type of StreamSource data.
*/
public interface StreamSource<T> {
}
public interface StreamSource<T> {}
@@ -6,6 +6,7 @@ import java.util.List;
/**
* Represents a union DataStream.
*
* <p>This stream does not create a physical operation, it only affects how upstream data are
* connected to downstream data.
*
@@ -3,9 +3,7 @@ package io.ray.streaming.client;
import io.ray.streaming.jobgraph.JobGraph;
import java.util.Map;
/**
* Interface of the job client.
*/
/** Interface of the job client. */
public interface JobClient {
/**
@@ -3,9 +3,7 @@ package io.ray.streaming.jobgraph;
import io.ray.streaming.api.partition.Partition;
import java.io.Serializable;
/**
* Job edge is connection and partition rules of upstream and downstream execution nodes.
*/
/** Job edge is connection and partition rules of upstream and downstream execution nodes. */
public class JobEdge implements Serializable {
private int srcVertexId;
@@ -44,7 +42,13 @@ public class JobEdge implements Serializable {
@Override
public String toString() {
return "Edge(" + "from:" + srcVertexId + "-" + targetVertexId + "-" + this.partition.getClass()
return "Edge("
+ "from:"
+ srcVertexId
+ "-"
+ targetVertexId
+ "-"
+ this.partition.getClass()
+ ")";
}
}
@@ -9,9 +9,7 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Job graph, the logical plan of streaming job.
*/
/** Job graph, the logical plan of streaming job. */
public class JobGraph implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(JobGraph.class);
@@ -30,8 +28,10 @@ public class JobGraph implements Serializable {
}
public JobGraph(
String jobName, Map<String, String> jobConfig,
List<JobVertex> jobVertices, List<JobEdge> jobEdges) {
String jobName,
Map<String, String> jobConfig,
List<JobVertex> jobVertices,
List<JobEdge> jobEdges) {
this.jobName = jobName;
this.jobConfig = jobConfig;
this.jobVertices = jobVertices;
@@ -43,7 +43,7 @@ public class JobGraph implements Serializable {
* Generate a directed graph (a set of vertices connected by edges) from the current job graph
* for simple log printing.
*
* @return Digraph in string type.
* @return Digraph in string type.
*/
public String generateDigraph() {
StringBuilder digraph = new StringBuilder();
@@ -136,5 +136,4 @@ public class JobGraph implements Serializable {
LOG.info(jobEdge.toString());
}
}
}
@@ -36,8 +36,7 @@ public class JobGraphBuilder {
}
public JobGraphBuilder(
List<StreamSink> streamSinkList, String jobName,
Map<String, String> jobConfig) {
List<StreamSink> streamSinkList, String jobName, Map<String, String> jobConfig) {
this.jobGraph = new JobGraph(jobName, jobConfig);
this.streamSinkList = streamSinkList;
this.edgeIdGenerator = new AtomicInteger(0);
@@ -60,7 +59,8 @@ public class JobGraphBuilder {
stream = stream.getOriginalStream();
}
StreamOperator streamOperator = stream.getOperator();
Preconditions.checkArgument(stream.getLanguage() == streamOperator.getLanguage(),
Preconditions.checkArgument(
stream.getLanguage() == streamOperator.getLanguage(),
"Reference stream should be skipped.");
int vertexId = stream.getId();
int parallelism = stream.getParallelism();
@@ -76,8 +76,8 @@ public class JobGraphBuilder {
} else if (stream instanceof StreamSource) {
jobVertex = new JobVertex(vertexId, parallelism, VertexType.SOURCE, streamOperator, config);
} else if (stream instanceof DataStream || stream instanceof PythonDataStream) {
jobVertex = new JobVertex(
vertexId, parallelism, VertexType.TRANSFORMATION, streamOperator, config);
jobVertex =
new JobVertex(vertexId, parallelism, VertexType.TRANSFORMATION, streamOperator, config);
Stream parentStream = stream.getInputStream();
int inputVertexId = parentStream.getId();
JobEdge jobEdge = new JobEdge(inputVertexId, vertexId, parentStream.getPartition());
@@ -114,5 +114,4 @@ public class JobGraphBuilder {
private int getEdgeId() {
return this.edgeIdGenerator.incrementAndGet();
}
}
@@ -36,24 +36,32 @@ public class JobGraphOptimizer {
public JobGraphOptimizer(JobGraph jobGraph) {
this.jobGraph = jobGraph;
vertexMap = jobGraph.getJobVertices().stream()
.collect(Collectors.toMap(JobVertex::getVertexId, Function.identity()));
outputEdgesMap = vertexMap.keySet().stream().collect(Collectors.toMap(
id -> vertexMap.get(id), id -> new HashSet<>(jobGraph.getVertexOutputEdges(id))));
vertexMap =
jobGraph.getJobVertices().stream()
.collect(Collectors.toMap(JobVertex::getVertexId, Function.identity()));
outputEdgesMap =
vertexMap.keySet().stream()
.collect(
Collectors.toMap(
id -> vertexMap.get(id),
id -> new HashSet<>(jobGraph.getVertexOutputEdges(id))));
mergedVertexMap = new HashMap<>();
}
public JobGraph optimize() {
// Depth-first traverse nodes from source to sink to merge vertices that can be chained
// together.
jobGraph.getSourceVertices().forEach(vertex -> {
List<JobVertex> verticesToMerge = new ArrayList<>();
verticesToMerge.add(vertex);
mergeVerticesRecursively(vertex, verticesToMerge);
});
jobGraph
.getSourceVertices()
.forEach(
vertex -> {
List<JobVertex> verticesToMerge = new ArrayList<>();
verticesToMerge.add(vertex);
mergeVerticesRecursively(vertex, verticesToMerge);
});
List<JobVertex> vertices = mergedVertexMap.values().stream()
.map(Pair::getLeft).collect(Collectors.toList());
List<JobVertex> vertices =
mergedVertexMap.values().stream().map(Pair::getLeft).collect(Collectors.toList());
return new JobGraph(jobGraph.getJobName(), jobGraph.getJobConfig(), vertices, createEdges());
}
@@ -65,18 +73,19 @@ public class JobGraphOptimizer {
if (outputEdges.isEmpty()) {
mergeAndAddVertex(verticesToMerge);
} else {
outputEdges.forEach(edge -> {
JobVertex succeedingVertex = vertexMap.get(edge.getTargetVertexId());
if (canBeChained(vertex, succeedingVertex, edge)) {
verticesToMerge.add(succeedingVertex);
mergeVerticesRecursively(succeedingVertex, verticesToMerge);
} else {
mergeAndAddVertex(verticesToMerge);
List<JobVertex> newMergedVertices = new ArrayList<>();
newMergedVertices.add(succeedingVertex);
mergeVerticesRecursively(succeedingVertex, newMergedVertices);
}
});
outputEdges.forEach(
edge -> {
JobVertex succeedingVertex = vertexMap.get(edge.getTargetVertexId());
if (canBeChained(vertex, succeedingVertex, edge)) {
verticesToMerge.add(succeedingVertex);
mergeVerticesRecursively(succeedingVertex, verticesToMerge);
} else {
mergeAndAddVertex(verticesToMerge);
List<JobVertex> newMergedVertices = new ArrayList<>();
newMergedVertices.add(succeedingVertex);
mergeVerticesRecursively(succeedingVertex, newMergedVertices);
}
});
}
}
}
@@ -89,25 +98,30 @@ public class JobGraphOptimizer {
// no chain
mergedVertex = headVertex;
} else {
List<StreamOperator> operators = verticesToMerge.stream()
.map(v -> vertexMap.get(v.getVertexId())
.getStreamOperator())
.collect(Collectors.toList());
List<Map<String, String>> configs = verticesToMerge.stream()
.map(v -> vertexMap.get(v.getVertexId()).getConfig())
.collect(Collectors.toList());
List<StreamOperator> operators =
verticesToMerge.stream()
.map(v -> vertexMap.get(v.getVertexId()).getStreamOperator())
.collect(Collectors.toList());
List<Map<String, String>> configs =
verticesToMerge.stream()
.map(v -> vertexMap.get(v.getVertexId()).getConfig())
.collect(Collectors.toList());
StreamOperator operator;
if (language == Language.JAVA) {
operator = ChainedOperator.newChainedOperator(operators, configs);
} else {
List<PythonOperator> pythonOperators = operators.stream()
.map(o -> (PythonOperator) o)
.collect(Collectors.toList());
List<PythonOperator> pythonOperators =
operators.stream().map(o -> (PythonOperator) o).collect(Collectors.toList());
operator = new ChainedPythonOperator(pythonOperators, configs);
}
// chained operator config is placed into `ChainedOperator`.
mergedVertex = new JobVertex(headVertex.getVertexId(), headVertex.getParallelism(),
headVertex.getVertexType(), operator, new HashMap<>());
mergedVertex =
new JobVertex(
headVertex.getVertexId(),
headVertex.getParallelism(),
headVertex.getVertexType(),
operator,
new HashMap<>());
}
mergedVertexMap.put(mergedVertex.getVertexId(), Pair.of(mergedVertex, verticesToMerge));
@@ -115,37 +129,39 @@ public class JobGraphOptimizer {
private List<JobEdge> createEdges() {
List<JobEdge> edges = new ArrayList<>();
mergedVertexMap.forEach((id, pair) -> {
JobVertex mergedVertex = pair.getLeft();
List<JobVertex> mergedVertices = pair.getRight();
JobVertex tailVertex = mergedVertices.get(mergedVertices.size() - 1);
// input edge will be set up in input vertices
if (outputEdgesMap.containsKey(tailVertex)) {
outputEdgesMap.get(tailVertex).forEach(edge -> {
Pair<JobVertex, List<JobVertex>> downstreamPair =
mergedVertexMap.get(edge.getTargetVertexId());
// change ForwardPartition to RoundRobinPartition.
Partition partition = changePartition(edge.getPartition());
JobEdge newEdge = new JobEdge(
mergedVertex.getVertexId(),
downstreamPair.getLeft().getVertexId(),
partition);
edges.add(newEdge);
mergedVertexMap.forEach(
(id, pair) -> {
JobVertex mergedVertex = pair.getLeft();
List<JobVertex> mergedVertices = pair.getRight();
JobVertex tailVertex = mergedVertices.get(mergedVertices.size() - 1);
// input edge will be set up in input vertices
if (outputEdgesMap.containsKey(tailVertex)) {
outputEdgesMap
.get(tailVertex)
.forEach(
edge -> {
Pair<JobVertex, List<JobVertex>> downstreamPair =
mergedVertexMap.get(edge.getTargetVertexId());
// change ForwardPartition to RoundRobinPartition.
Partition partition = changePartition(edge.getPartition());
JobEdge newEdge =
new JobEdge(
mergedVertex.getVertexId(),
downstreamPair.getLeft().getVertexId(),
partition);
edges.add(newEdge);
});
}
});
}
});
return edges;
}
/**
* Change ForwardPartition to RoundRobinPartition.
*/
/** Change ForwardPartition to RoundRobinPartition. */
private Partition changePartition(Partition partition) {
if (partition instanceof PythonPartition) {
PythonPartition pythonPartition = (PythonPartition) partition;
if (!pythonPartition.isConstructedFromBinary() &&
pythonPartition.getFunctionName().equals(PythonPartition.FORWARD_PARTITION_CLASS)) {
if (!pythonPartition.isConstructedFromBinary()
&& pythonPartition.getFunctionName().equals(PythonPartition.FORWARD_PARTITION_CLASS)) {
return PythonPartition.RoundRobinPartition;
} else {
return partition;
@@ -160,11 +176,9 @@ public class JobGraphOptimizer {
}
private boolean canBeChained(
JobVertex precedingVertex,
JobVertex succeedingVertex,
JobEdge edge) {
if (jobGraph.getVertexOutputEdges(precedingVertex.getVertexId()).size() > 1 ||
jobGraph.getVertexInputEdges(succeedingVertex.getVertexId()).size() > 1) {
JobVertex precedingVertex, JobVertex succeedingVertex, JobEdge edge) {
if (jobGraph.getVertexOutputEdges(precedingVertex.getVertexId()).size() > 1
|| jobGraph.getVertexInputEdges(succeedingVertex.getVertexId()).size() > 1) {
return false;
}
if (precedingVertex.getParallelism() != succeedingVertex.getParallelism()) {
@@ -183,9 +197,8 @@ public class JobGraphOptimizer {
return partition instanceof ForwardPartition;
} else {
PythonPartition pythonPartition = (PythonPartition) partition;
return !pythonPartition.isConstructedFromBinary() &&
pythonPartition.getFunctionName().equals(PythonPartition.FORWARD_PARTITION_CLASS);
return !pythonPartition.isConstructedFromBinary()
&& pythonPartition.getFunctionName().equals(PythonPartition.FORWARD_PARTITION_CLASS);
}
}
}
@@ -6,9 +6,7 @@ import io.ray.streaming.operator.StreamOperator;
import java.io.Serializable;
import java.util.Map;
/**
* Job vertex is a cell node where logic is executed.
*/
/** Job vertex is a cell node where logic is executed. */
public class JobVertex implements Serializable {
private int vertexId;
@@ -71,5 +69,4 @@ public class JobVertex implements Serializable {
.add("config", config)
.toString();
}
}
@@ -1,8 +1,6 @@
package io.ray.streaming.jobgraph;
/**
* Different roles for a node.
*/
/** Different roles for a node. */
public enum VertexType {
SOURCE,
TRANSFORMATION,
@@ -37,8 +37,7 @@ public class Record<T> implements Serializable {
return false;
}
Record<?> record = (Record<?>) o;
return Objects.equals(stream, record.stream) &&
Objects.equals(value, record.value);
return Objects.equals(stream, record.stream) && Objects.equals(value, record.value);
}
@Override
@@ -50,5 +49,4 @@ public class Record<T> implements Serializable {
public String toString() {
return value.toString();
}
}
@@ -1,20 +1,14 @@
package io.ray.streaming.operator;
/**
* Chain strategy for streaming operators. Chained operators are run in the same thread.
*/
/** Chain strategy for streaming operators. Chained operators are run in the same thread. */
public enum ChainStrategy {
/**
* The operator won't be chained with preceding operators, but maybe chained with succeeding
* operators.
*/
HEAD,
/**
* Operators will be chained together when possible.
*/
/** Operators will be chained together when possible. */
ALWAYS,
/**
* The operator won't be chained with any operator.
*/
/** The operator won't be chained with any operator. */
NEVER
}
@@ -25,13 +25,9 @@ public interface Operator extends Serializable {
ChainStrategy getChainStrategy();
/**
* See {@link Function#saveCheckpoint()}.
*/
/** See {@link Function#saveCheckpoint()}. */
Serializable saveCheckpoint();
/**
* See {@link Function#loadCheckpoint(Serializable)}.
*/
/** See {@link Function#loadCheckpoint(Serializable)}. */
void loadCheckpoint(Serializable checkpointObject);
}
@@ -11,4 +11,4 @@ public interface SourceOperator<T> extends Operator {
default OperatorType getOpType() {
return OperatorType.SOURCE;
}
}
}
@@ -42,9 +42,7 @@ public abstract class StreamOperator<F extends Function> implements Operator {
}
@Override
public void finish() {
}
public void finish() {}
@Override
public void close() {
@@ -20,9 +20,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Abstract base class for chained operators.
*/
/** Abstract base class for chained operators. */
public abstract class ChainedOperator extends StreamOperator<Function> {
protected final List<StreamOperator> operators;
@@ -31,9 +29,10 @@ public abstract class ChainedOperator extends StreamOperator<Function> {
private final List<Map<String, String>> configs;
public ChainedOperator(List<StreamOperator> operators, List<Map<String, String>> configs) {
Preconditions.checkArgument(operators.size() >= 2,
"Need at lease two operators to be chained together");
operators.stream().skip(1)
Preconditions.checkArgument(
operators.size() >= 2, "Need at lease two operators to be chained together");
operators.stream()
.skip(1)
.forEach(operator -> Preconditions.checkArgument(operator instanceof OneInputOperator));
this.operators = operators;
this.configs = configs;
@@ -44,10 +43,11 @@ public abstract class ChainedOperator extends StreamOperator<Function> {
@Override
public void open(List<Collector> collectorList, RuntimeContext runtimeContext) {
// Don't call super.open() as we `open` every operator separately.
List<ForwardCollector> succeedingCollectors = operators.stream().skip(1)
.map(operator -> new ForwardCollector(
(OneInputOperator) operator))
.collect(Collectors.toList());
List<ForwardCollector> succeedingCollectors =
operators.stream()
.skip(1)
.map(operator -> new ForwardCollector((OneInputOperator) operator))
.collect(Collectors.toList());
for (int i = 0; i < operators.size() - 1; i++) {
StreamOperator operator = operators.get(i);
List<ForwardCollector> forwardCollectors =
@@ -70,8 +70,7 @@ public abstract class ChainedOperator extends StreamOperator<Function> {
@Override
public String getName() {
return operators.stream().map(Operator::getName)
.collect(Collectors.joining(" -> ", "[", "]"));
return operators.stream().map(Operator::getName).collect(Collectors.joining(" -> ", "[", "]"));
}
public List<StreamOperator> getOperators() {
@@ -104,20 +103,21 @@ public abstract class ChainedOperator extends StreamOperator<Function> {
}
private RuntimeContext createRuntimeContext(RuntimeContext runtimeContext, int index) {
return (RuntimeContext) Proxy.newProxyInstance(runtimeContext.getClass().getClassLoader(),
new Class[] {RuntimeContext.class},
(proxy, method, methodArgs) -> {
if (method.getName().equals("getConfig")) {
return configs.get(index);
} else {
return method.invoke(runtimeContext, methodArgs);
}
});
return (RuntimeContext)
Proxy.newProxyInstance(
runtimeContext.getClass().getClassLoader(),
new Class[] {RuntimeContext.class},
(proxy, method, methodArgs) -> {
if (method.getName().equals("getConfig")) {
return configs.get(index);
} else {
return method.invoke(runtimeContext, methodArgs);
}
});
}
public static ChainedOperator newChainedOperator(
List<StreamOperator> operators,
List<Map<String, String>> configs) {
List<StreamOperator> operators, List<Map<String, String>> configs) {
switch (operators.get(0).getOpType()) {
case SOURCE:
return new ChainedSourceOperator(operators, configs);
@@ -131,8 +131,7 @@ public abstract class ChainedOperator extends StreamOperator<Function> {
}
}
static class ChainedSourceOperator<T> extends ChainedOperator
implements SourceOperator<T> {
static class ChainedSourceOperator<T> extends ChainedOperator implements SourceOperator<T> {
private final SourceOperator<T> sourceOperator;
@@ -151,11 +150,9 @@ public abstract class ChainedOperator extends StreamOperator<Function> {
public SourceContext<T> getSourceContext() {
return sourceOperator.getSourceContext();
}
}
static class ChainedOneInputOperator<T> extends ChainedOperator
implements OneInputOperator<T> {
static class ChainedOneInputOperator<T> extends ChainedOperator implements OneInputOperator<T> {
private final OneInputOperator<T> inputOperator;
@@ -169,7 +166,6 @@ public abstract class ChainedOperator extends StreamOperator<Function> {
public void processElement(Record<T> record) throws Exception {
inputOperator.processElement(record);
}
}
static class ChainedTwoInputOperator<L, R> extends ChainedOperator
@@ -187,6 +183,5 @@ public abstract class ChainedOperator extends StreamOperator<Function> {
public void processElement(Record<L> record1, Record<R> record2) {
inputOperator.processElement(record1, record2);
}
}
}
@@ -5,8 +5,8 @@ import io.ray.streaming.message.Record;
import io.ray.streaming.operator.OneInputOperator;
import io.ray.streaming.operator.StreamOperator;
public class FilterOperator<T> extends StreamOperator<FilterFunction<T>> implements
OneInputOperator<T> {
public class FilterOperator<T> extends StreamOperator<FilterFunction<T>>
implements OneInputOperator<T> {
public FilterOperator(FilterFunction<T> filterFunction) {
super(filterFunction);
@@ -9,8 +9,8 @@ import io.ray.streaming.operator.OneInputOperator;
import io.ray.streaming.operator.StreamOperator;
import java.util.List;
public class FlatMapOperator<T, R> extends StreamOperator<FlatMapFunction<T, R>> implements
OneInputOperator<T> {
public class FlatMapOperator<T, R> extends StreamOperator<FlatMapFunction<T, R>>
implements OneInputOperator<T> {
private CollectionCollector collectionCollector;
@@ -15,12 +15,10 @@ import io.ray.streaming.operator.TwoInputOperator;
* @param <K> Type of the data in the join key.
* @param <O> Type of the data in the joined stream.
*/
public class JoinOperator<L, R, K, O> extends StreamOperator<JoinFunction<L, R, O>> implements
TwoInputOperator<L, R> {
public class JoinOperator<L, R, K, O> extends StreamOperator<JoinFunction<L, R, O>>
implements TwoInputOperator<L, R> {
public JoinOperator() {
}
public JoinOperator() {}
public JoinOperator(JoinFunction<L, R, O> function) {
super(function);
@@ -28,13 +26,10 @@ public class JoinOperator<L, R, K, O> extends StreamOperator<JoinFunction<L, R,
}
@Override
public void processElement(Record<L> record1, Record<R> record2) {
}
public void processElement(Record<L> record1, Record<R> record2) {}
@Override
public OperatorType getOpType() {
return OperatorType.TWO_INPUT;
}
}
@@ -6,8 +6,8 @@ import io.ray.streaming.message.Record;
import io.ray.streaming.operator.OneInputOperator;
import io.ray.streaming.operator.StreamOperator;
public class KeyByOperator<T, K> extends StreamOperator<KeyFunction<T, K>> implements
OneInputOperator<T> {
public class KeyByOperator<T, K> extends StreamOperator<KeyFunction<T, K>>
implements OneInputOperator<T> {
public KeyByOperator(KeyFunction<T, K> keyFunction) {
super(keyFunction);
@@ -19,4 +19,3 @@ public class KeyByOperator<T, K> extends StreamOperator<KeyFunction<T, K>> imple
collect(new KeyRecord<>(key, record.getValue()));
}
}
@@ -5,8 +5,8 @@ import io.ray.streaming.message.Record;
import io.ray.streaming.operator.OneInputOperator;
import io.ray.streaming.operator.StreamOperator;
public class MapOperator<T, R> extends StreamOperator<MapFunction<T, R>> implements
OneInputOperator<T> {
public class MapOperator<T, R> extends StreamOperator<MapFunction<T, R>>
implements OneInputOperator<T> {
public MapOperator(MapFunction<T, R> mapFunction) {
super(mapFunction);
@@ -12,8 +12,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ReduceOperator<K, T> extends StreamOperator<ReduceFunction<T>> implements
OneInputOperator<T> {
public class ReduceOperator<K, T> extends StreamOperator<ReduceFunction<T>>
implements OneInputOperator<T> {
private Map<K, T> reduceState;
@@ -43,5 +43,4 @@ public class ReduceOperator<K, T> extends StreamOperator<ReduceFunction<T>> impl
collect(record);
}
}
}
@@ -5,8 +5,8 @@ import io.ray.streaming.message.Record;
import io.ray.streaming.operator.OneInputOperator;
import io.ray.streaming.operator.StreamOperator;
public class SinkOperator<T> extends StreamOperator<SinkFunction<T>> implements
OneInputOperator<T> {
public class SinkOperator<T> extends StreamOperator<SinkFunction<T>>
implements OneInputOperator<T> {
public SinkOperator(SinkFunction<T> sinkFunction) {
super(sinkFunction);
@@ -61,7 +61,5 @@ public class SourceOperatorImpl<T> extends StreamOperator<SourceFunction<T>>
collector.collect(new Record<>(t));
}
}
}
}
@@ -6,8 +6,7 @@ import io.ray.streaming.message.Record;
import io.ray.streaming.operator.OneInputOperator;
import io.ray.streaming.operator.StreamOperator;
public class UnionOperator<T> extends StreamOperator<Function> implements
OneInputOperator<T> {
public class UnionOperator<T> extends StreamOperator<Function> implements OneInputOperator<T> {
public UnionOperator() {
super(Functions.emptyFunction());
@@ -17,5 +16,4 @@ public class UnionOperator<T> extends StreamOperator<Function> implements
public void processElement(Record<T> record) {
collect(record);
}
}
@@ -7,14 +7,18 @@ import org.apache.commons.lang3.StringUtils;
/**
* Represents a user defined python function.
* <p>Python worker can use information in this class to create a function object.</p>
* <p>If this object is constructed from serialized python function,
* python worker can deserialize it to create python function directly. If this object is
* constructed from moduleName and className/functionName, python worker will use `importlib` to
* load python function.</p>
* <p>If the python data stream api is invoked from python, `function` will not be null.</p>
* <p>If the python data stream api is invoked from java, `moduleName` and
* `functionName` will not be null.</p>
*
* <p>Python worker can use information in this class to create a function object.
*
* <p>If this object is constructed from serialized python function, python worker can deserialize
* it to create python function directly. If this object is constructed from moduleName and
* className/functionName, python worker will use `importlib` to load python function.
*
* <p>If the python data stream api is invoked from python, `function` will not be null.
*
* <p>If the python data stream api is invoked from java, `moduleName` and `functionName` will not
* be null.
*
* <p>
*/
public class PythonFunction implements Function {
@@ -30,9 +34,7 @@ public class PythonFunction implements Function {
private String functionInterface;
/**
* @param functionInterface function class name in `ray.streaming.function` module.
*/
/** @param functionInterface function class name in `ray.streaming.function` module. */
FunctionInterface(String functionInterface) {
this.functionInterface = functionInterface;
}
@@ -66,13 +68,10 @@ public class PythonFunction implements Function {
* Create a {@link PythonFunction} from a moduleName and streaming function name.
*
* @param moduleName module name of streaming function.
* @param functionName function name of streaming function. {@code functionName} is the name
* of a
* @param functionName function name of streaming function. {@code functionName} is the name of a
* python function, or class name of subclass of `ray.streaming.function.`
*/
public PythonFunction(
String moduleName,
String functionName) {
public PythonFunction(String moduleName, String functionName) {
Preconditions.checkArgument(StringUtils.isNotBlank(moduleName));
Preconditions.checkArgument(StringUtils.isNotBlank(functionName));
this.function = null;
@@ -110,12 +109,13 @@ public class PythonFunction implements Function {
@Override
public String toString() {
StringJoiner stringJoiner = new StringJoiner(", ",
PythonFunction.class.getSimpleName() + "[", "]");
StringJoiner stringJoiner =
new StringJoiner(", ", PythonFunction.class.getSimpleName() + "[", "]");
if (function != null) {
stringJoiner.add("function=binary function");
} else {
stringJoiner.add("moduleName='" + moduleName + "'")
stringJoiner
.add("moduleName='" + moduleName + "'")
.add("functionName='" + functionName + "'");
}
stringJoiner.add("functionInterface='" + functionInterface + "'");
@@ -12,9 +12,7 @@ import java.util.Map;
import java.util.StringJoiner;
import java.util.stream.Collectors;
/**
* Represents a {@link StreamOperator} that wraps python {@link PythonFunction}.
*/
/** Represents a {@link StreamOperator} that wraps python {@link PythonFunction}. */
@SuppressWarnings("unchecked")
public class PythonOperator extends StreamOperator {
@@ -65,8 +63,10 @@ public class PythonOperator extends StreamOperator {
StackTraceElement[] trace = Thread.currentThread().getStackTrace();
Preconditions.checkState(trace.length >= 2);
StackTraceElement traceElement = trace[2];
String msg = String.format("Method %s.%s shouldn't be called.",
traceElement.getClassName(), traceElement.getMethodName());
String msg =
String.format(
"Method %s.%s shouldn't be called.",
traceElement.getClassName(), traceElement.getMethodName());
throw new UnsupportedOperationException(msg);
}
@@ -90,13 +90,12 @@ public class PythonOperator extends StreamOperator {
@Override
public String toString() {
StringJoiner stringJoiner = new StringJoiner(", ",
PythonOperator.class.getSimpleName() + "[", "]");
StringJoiner stringJoiner =
new StringJoiner(", ", PythonOperator.class.getSimpleName() + "[", "]");
if (function != null) {
stringJoiner.add("function='" + function + "'");
} else {
stringJoiner.add("moduleName='" + moduleName + "'")
.add("className='" + className + "'");
stringJoiner.add("moduleName='" + moduleName + "'").add("className='" + className + "'");
}
return stringJoiner.toString();
}
@@ -130,7 +129,8 @@ public class PythonOperator extends StreamOperator {
@Override
public String getName() {
return operators.stream().map(Operator::getName)
return operators.stream()
.map(Operator::getName)
.collect(Collectors.joining(" -> ", "[", "]"));
}
@@ -7,25 +7,26 @@ import org.apache.commons.lang3.StringUtils;
/**
* Represents a python partition function.
* <p>
* Python worker can create a partition object using information in this PythonPartition.
* <p>
* If this object is constructed from serialized python partition, python worker can deserialize it
* to create python partition directly. If this object is constructed from moduleName and
*
* <p>Python worker can create a partition object using information in this PythonPartition.
*
* <p>If this object is constructed from serialized python partition, python worker can deserialize
* it to create python partition directly. If this object is constructed from moduleName and
* className/functionName, python worker will use `importlib` to load python partition function.
*
* <p>
*/
public class PythonPartition implements Partition<Object> {
public static final PythonPartition BroadcastPartition = new PythonPartition(
"ray.streaming.partition", "BroadcastPartition");
public static final PythonPartition KeyPartition = new PythonPartition(
"ray.streaming.partition", "KeyPartition");
public static final PythonPartition RoundRobinPartition = new PythonPartition(
"ray.streaming.partition", "RoundRobinPartition");
public static final PythonPartition BroadcastPartition =
new PythonPartition("ray.streaming.partition", "BroadcastPartition");
public static final PythonPartition KeyPartition =
new PythonPartition("ray.streaming.partition", "KeyPartition");
public static final PythonPartition RoundRobinPartition =
new PythonPartition("ray.streaming.partition", "RoundRobinPartition");
public static final String FORWARD_PARTITION_CLASS = "ForwardPartition";
public static final PythonPartition ForwardPartition = new PythonPartition(
"ray.streaming.partition", FORWARD_PARTITION_CLASS);
public static final PythonPartition ForwardPartition =
new PythonPartition("ray.streaming.partition", FORWARD_PARTITION_CLASS);
private byte[] partition;
private String moduleName;
@@ -51,8 +52,8 @@ public class PythonPartition implements Partition<Object> {
@Override
public int[] partition(Object record, int numPartition) {
String msg = String.format("partition method of %s shouldn't be called.",
getClass().getSimpleName());
String msg =
String.format("partition method of %s shouldn't be called.", getClass().getSimpleName());
throw new UnsupportedOperationException(msg);
}
@@ -74,15 +75,15 @@ public class PythonPartition implements Partition<Object> {
@Override
public String toString() {
StringJoiner stringJoiner = new StringJoiner(", ",
PythonPartition.class.getSimpleName() + "[", "]");
StringJoiner stringJoiner =
new StringJoiner(", ", PythonPartition.class.getSimpleName() + "[", "]");
if (partition != null) {
stringJoiner.add("partition=binary partition");
} else {
stringJoiner.add("moduleName='" + moduleName + "'")
stringJoiner
.add("moduleName='" + moduleName + "'")
.add("functionName='" + functionName + "'");
}
return stringJoiner.toString();
}
}
@@ -13,14 +13,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* Represents a stream of data whose transformations will be executed in python.
*/
/** Represents a stream of data whose transformations will be executed in python. */
public class PythonDataStream extends Stream<PythonDataStream, Object> implements PythonStream {
protected PythonDataStream(
StreamingContext streamingContext,
PythonOperator pythonOperator) {
protected PythonDataStream(StreamingContext streamingContext, PythonOperator pythonOperator) {
super(streamingContext, pythonOperator);
}
@@ -36,9 +32,7 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
}
public PythonDataStream(
PythonDataStream input,
PythonOperator pythonOperator,
Partition<Object> partition) {
PythonDataStream input, PythonOperator pythonOperator, Partition<Object> partition) {
super(input, pythonOperator, partition);
}
@@ -57,8 +51,7 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
/**
* Apply a map function to this stream.
*
* @param func The python MapFunction.
* @return A new PythonDataStream.
* @param func The python MapFunction. Returns A new PythonDataStream.
*/
public PythonDataStream map(PythonFunction func) {
func.setFunctionInterface(FunctionInterface.MAP_FUNCTION);
@@ -72,8 +65,7 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
/**
* Apply a flat-map function to this stream.
*
* @param func The python FlatMapFunction.
* @return A new PythonDataStream
* @param func The python FlatMapFunction. Returns A new PythonDataStream
*/
public PythonDataStream flatMap(PythonFunction func) {
func.setFunctionInterface(FunctionInterface.FLAT_MAP_FUNCTION);
@@ -87,9 +79,8 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
/**
* Apply a filter function to this stream.
*
* @param func The python FilterFunction.
* @return A new PythonDataStream that contains only the elements satisfying the given filter
* predicate.
* @param func The python FilterFunction. Returns A new PythonDataStream that contains only the
* elements satisfying the given filter predicate.
*/
public PythonDataStream filter(PythonFunction func) {
func.setFunctionInterface(FunctionInterface.FILTER_FUNCTION);
@@ -101,8 +92,7 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
* same type with each other.
*
* @param stream The DataStream to union output with.
* @param others The other DataStreams to union output with.
* @return A new UnionStream.
* @param others The other DataStreams to union output with. Returns A new UnionStream.
*/
public final PythonDataStream union(PythonDataStream stream, PythonDataStream... others) {
List<PythonDataStream> streams = new ArrayList<>();
@@ -115,8 +105,7 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
* Apply union transformations to this stream by merging {@link PythonDataStream} outputs of the
* same type with each other.
*
* @param streams The DataStreams to union output with.
* @return A new UnionStream.
* @param streams The DataStreams to union output with. Returns A new UnionStream.
*/
public final PythonDataStream union(List<PythonDataStream> streams) {
if (this instanceof PythonUnionStream) {
@@ -135,8 +124,7 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
/**
* Apply a sink function and get a StreamSink.
*
* @param func The python SinkFunction.
* @return A new StreamSink.
* @param func The python SinkFunction. Returns A new StreamSink.
*/
public PythonStreamSink sink(PythonFunction func) {
func.setFunctionInterface(FunctionInterface.SINK_FUNCTION);
@@ -150,8 +138,7 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
/**
* Apply a key-by function to this stream.
*
* @param func the python keyFunction.
* @return A new KeyDataStream.
* @param func the python keyFunction. Returns A new KeyDataStream.
*/
public PythonKeyDataStream keyBy(PythonFunction func) {
checkPartitionCall();
@@ -162,7 +149,7 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
/**
* Apply broadcast to this stream.
*
* @return This stream.
* <p>Returns This stream.
*/
public PythonDataStream broadcast() {
checkPartitionCall();
@@ -172,8 +159,7 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
/**
* Apply a partition to this stream.
*
* @param partition The partitioning strategy.
* @return This stream.
* @param partition The partitioning strategy. Returns This stream.
*/
public PythonDataStream partitionBy(PythonPartition partition) {
checkPartitionCall();
@@ -186,8 +172,9 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
*/
private void checkPartitionCall() {
if (getInputStream() != null && getInputStream().getLanguage() == Language.JAVA) {
throw new RuntimeException("Partition related methods can't be called on a " +
"python stream if parent stream is a java stream.");
throw new RuntimeException(
"Partition related methods can't be called on a "
+ "python stream if parent stream is a java stream.");
}
}
@@ -204,5 +191,4 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
public Language getLanguage() {
return Language.PYTHON;
}
}
@@ -8,9 +8,7 @@ import io.ray.streaming.python.PythonFunction.FunctionInterface;
import io.ray.streaming.python.PythonOperator;
import io.ray.streaming.python.PythonPartition;
/**
* Represents a python DataStream returned by a key-by operation.
*/
/** Represents a python DataStream returned by a key-by operation. */
@SuppressWarnings("unchecked")
public class PythonKeyDataStream extends PythonDataStream implements PythonStream {
@@ -33,8 +31,7 @@ public class PythonKeyDataStream extends PythonDataStream implements PythonStrea
/**
* Apply a reduce function to this stream.
*
* @param func The reduce function.
* @return A new DataStream.
* @param func The reduce function. Returns A new DataStream.
*/
public PythonDataStream reduce(PythonFunction func) {
func.setFunctionInterface(FunctionInterface.REDUCE_FUNCTION);
@@ -51,5 +48,4 @@ public class PythonKeyDataStream extends PythonDataStream implements PythonStrea
public KeyDataStream<Object, Object> asJavaStream() {
return new KeyDataStream(this);
}
}
@@ -1,8 +1,4 @@
package io.ray.streaming.python.stream;
/**
* A marker interface used to identify all python streams.
*/
public interface PythonStream {
}
/** A marker interface used to identify all python streams. */
public interface PythonStream {}
@@ -4,9 +4,7 @@ import io.ray.streaming.api.Language;
import io.ray.streaming.api.stream.StreamSink;
import io.ray.streaming.python.PythonOperator;
/**
* Represents a sink of the PythonStream.
*/
/** Represents a sink of the PythonStream. */
public class PythonStreamSink extends StreamSink implements PythonStream {
public PythonStreamSink(PythonDataStream input, PythonOperator sinkOperator) {
@@ -18,5 +16,4 @@ public class PythonStreamSink extends StreamSink implements PythonStream {
public Language getLanguage() {
return Language.PYTHON;
}
}
@@ -7,9 +7,7 @@ import io.ray.streaming.python.PythonFunction;
import io.ray.streaming.python.PythonFunction.FunctionInterface;
import io.ray.streaming.python.PythonOperator;
/**
* Represents a source of the PythonStream.
*/
/** Represents a source of the PythonStream. */
public class PythonStreamSource extends PythonDataStream implements StreamSource {
private PythonStreamSource(StreamingContext streamingContext, PythonFunction sourceFunction) {
@@ -18,10 +16,8 @@ public class PythonStreamSource extends PythonDataStream implements StreamSource
}
public static PythonStreamSource from(
StreamingContext streamingContext,
PythonFunction sourceFunction) {
StreamingContext streamingContext, PythonFunction sourceFunction) {
sourceFunction.setFunctionInterface(FunctionInterface.SOURCE_FUNCTION);
return new PythonStreamSource(streamingContext, sourceFunction);
}
}
@@ -6,6 +6,7 @@ import java.util.List;
/**
* Represents a union DataStream.
*
* <p>This stream does not create a physical operation, it only affects how upstream data are
* connected to downstream data.
*/
@@ -16,8 +17,7 @@ public class PythonUnionStream extends PythonDataStream {
public PythonUnionStream(PythonDataStream input, List<PythonDataStream> others) {
// Union stream does not create a physical operation, so we don't have to set partition
// function for it.
super(input, new PythonOperator(
"ray.streaming.operator", "UnionOperator"));
super(input, new PythonOperator("ray.streaming.operator", "UnionOperator"));
this.unionStreams = new ArrayList<>();
others.forEach(this::addStream);
}
@@ -33,4 +33,4 @@ public class PythonUnionStream extends PythonDataStream {
public List<PythonDataStream> getUnionStreams() {
return unionStreams;
}
}
}
@@ -18,7 +18,6 @@ public class Config {
public static final String READ_TIMEOUT_MS = "read_timeout_ms";
public static final String DEFAULT_READ_TIMEOUT_MS = "10";
public static final String STREAMING_RING_BUFFER_CAPACITY = "streaming.ring_buffer_capacity";
// write an empty message if there is no data to be written in this
// interval.
@@ -31,6 +30,4 @@ public class Config {
public static final String FLOW_CONTROL_TYPE = "streaming.flow_control_type";
public static final String WRITER_CONSUMED_STEP = "streaming.writer.consumed_step";
public static final String READER_CONSUMED_STEP = "streaming.reader.consumed_step";
}
@@ -2,7 +2,6 @@ package io.ray.streaming.api.stream;
import static org.testng.Assert.assertEquals;
import io.ray.streaming.api.context.StreamingContext;
import io.ray.streaming.operator.impl.MapOperator;
import io.ray.streaming.python.stream.PythonDataStream;
@@ -14,8 +13,8 @@ public class StreamTest {
@Test
public void testReferencedDataStream() {
DataStream dataStream = new DataStream(StreamingContext.buildContext(),
new MapOperator(value -> null));
DataStream dataStream =
new DataStream(StreamingContext.buildContext(), new MapOperator(value -> null));
PythonDataStream pythonDataStream = dataStream.asPythonStream();
DataStream javaStream = pythonDataStream.asJavaStream();
assertEquals(dataStream.getId(), pythonDataStream.getId());
@@ -27,8 +26,8 @@ public class StreamTest {
@Test
public void testReferencedKeyDataStream() {
DataStream dataStream = new DataStream(StreamingContext.buildContext(),
new MapOperator(value -> null));
DataStream dataStream =
new DataStream(StreamingContext.buildContext(), new MapOperator(value -> null));
KeyDataStream keyDataStream = dataStream.keyBy(value -> null);
PythonKeyDataStream pythonKeyDataStream = keyDataStream.asPythonStream();
KeyDataStream javaKeyDataStream = pythonKeyDataStream.asJavaStream();
@@ -38,4 +37,4 @@ public class StreamTest {
assertEquals(keyDataStream.getParallelism(), pythonKeyDataStream.getParallelism());
assertEquals(keyDataStream.getParallelism(), javaKeyDataStream.getParallelism());
}
}
}
@@ -33,13 +33,12 @@ public class JobGraphBuilderTest {
JobVertex sourceVertex = jobVertexList.get(0);
Assert.assertEquals(sinkVertex.getVertexType(), VertexType.SINK);
Assert.assertEquals(sourceVertex.getVertexType(), VertexType.SOURCE);
}
public JobGraph buildDataSyncJobGraph() {
StreamingContext streamingContext = StreamingContext.buildContext();
DataStream<String> dataStream = DataStreamSource.fromCollection(streamingContext,
Lists.newArrayList("a", "b", "c"));
DataStream<String> dataStream =
DataStreamSource.fromCollection(streamingContext, Lists.newArrayList("a", "b", "c"));
StreamSink streamSink = dataStream.sink(x -> LOG.info(x));
JobGraphBuilder jobGraphBuilder = new JobGraphBuilder(Lists.newArrayList(streamSink));
@@ -73,10 +72,9 @@ public class JobGraphBuilderTest {
public JobGraph buildKeyByJobGraph() {
StreamingContext streamingContext = StreamingContext.buildContext();
DataStream<String> dataStream = DataStreamSource.fromCollection(streamingContext,
Lists.newArrayList("1", "2", "3", "4"));
StreamSink streamSink = dataStream.keyBy(x -> x)
.sink(x -> LOG.info(x));
DataStream<String> dataStream =
DataStreamSource.fromCollection(streamingContext, Lists.newArrayList("1", "2", "3", "4"));
StreamSink streamSink = dataStream.keyBy(x -> x).sink(x -> LOG.info(x));
JobGraphBuilder jobGraphBuilder = new JobGraphBuilder(Lists.newArrayList(streamSink));
JobGraph jobGraph = jobGraphBuilder.build();
@@ -92,4 +90,4 @@ public class JobGraphBuilderTest {
Assert.assertTrue(diGraph.contains("\"1-SourceOperatorImpl\" -> \"2-KeyByOperator\""));
Assert.assertTrue(diGraph.contains("\"2-KeyByOperator\" -> \"3-SinkOperator\""));
}
}
}
@@ -2,7 +2,6 @@ package io.ray.streaming.jobgraph;
import static org.testng.Assert.assertEquals;
import com.google.common.collect.Lists;
import io.ray.streaming.api.context.StreamingContext;
import io.ray.streaming.api.stream.DataStream;
@@ -19,13 +18,14 @@ public class JobGraphOptimizerTest {
@Test
public void testOptimize() {
StreamingContext context = StreamingContext.buildContext();
DataStream<Integer> source1 = DataStreamSource.fromCollection(context,
Lists.newArrayList(1, 2, 3));
DataStream<String> source2 = DataStreamSource.fromCollection(context,
Lists.newArrayList("1", "2", "3"));
DataStream<String> source3 = DataStreamSource.fromCollection(context,
Lists.newArrayList("2", "3", "4"));
source1.filter(x -> x > 1)
DataStream<Integer> source1 =
DataStreamSource.fromCollection(context, Lists.newArrayList(1, 2, 3));
DataStream<String> source2 =
DataStreamSource.fromCollection(context, Lists.newArrayList("1", "2", "3"));
DataStream<String> source3 =
DataStreamSource.fromCollection(context, Lists.newArrayList("2", "3", "4"));
source1
.filter(x -> x > 1)
.map(String::valueOf)
.union(source2)
.join(source3)
@@ -44,11 +44,12 @@ public class JobGraphOptimizerTest {
@Test
public void testOptimizeHybridStream() {
StreamingContext context = StreamingContext.buildContext();
DataStream<Integer> source1 = DataStreamSource.fromCollection(context,
Lists.newArrayList(1, 2, 3));
DataStream<String> source2 = DataStreamSource.fromCollection(context,
Lists.newArrayList("1", "2", "3"));
source1.asPythonStream()
DataStream<Integer> source1 =
DataStreamSource.fromCollection(context, Lists.newArrayList(1, 2, 3));
DataStream<String> source2 =
DataStreamSource.fromCollection(context, Lists.newArrayList("1", "2", "3"));
source1
.asPythonStream()
.map(pyFunc(1))
.filter(pyFunc(2))
.union(source2.asPythonStream().filter(pyFunc(3)).map(pyFunc(4)))
@@ -68,5 +69,4 @@ public class JobGraphOptimizerTest {
private PythonFunction pyFunc(int number) {
return new PythonFunction("module", "func" + number);
}
}
}
@@ -12,9 +12,7 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Job client: to submit job from api to runtime.
*/
/** Job client: to submit job from api to runtime. */
public class JobClientImpl implements JobClient {
public static final Logger LOG = LoggerFactory.getLogger(JobClientImpl.class);
@@ -23,8 +21,11 @@ public class JobClientImpl implements JobClient {
@Override
public void submit(JobGraph jobGraph, Map<String, String> jobConfig) {
LOG.info("Submitting job [{}] with job graph [{}] and job config [{}].",
jobGraph.getJobName(), jobGraph, jobConfig);
LOG.info(
"Submitting job [{}] with job graph [{}] and job config [{}].",
jobGraph.getJobName(),
jobGraph,
jobConfig);
Map<String, Double> resources = new HashMap<>();
// set job name and id at start
@@ -34,14 +35,12 @@ public class JobClientImpl implements JobClient {
jobGraph.getJobConfig().putAll(jobConfig);
// create job master actor
this.jobMasterActor = Ray.actor(JobMaster::new, jobConfig)
.setResources(resources)
.setMaxRestarts(-1)
.remote();
this.jobMasterActor =
Ray.actor(JobMaster::new, jobConfig).setResources(resources).setMaxRestarts(-1).remote();
try {
ObjectRef<Boolean> submitResult = jobMasterActor.task(JobMaster::submitJob,
jobMasterActor, jobGraph).remote();
ObjectRef<Boolean> submitResult =
jobMasterActor.task(JobMaster::submitJob, jobMasterActor, jobGraph).remote();
if (submitResult.get()) {
LOG.info("Finish submitting job: {}.", jobGraph.getJobName());
@@ -2,9 +2,5 @@ package io.ray.streaming.runtime.config;
import org.aeonbits.owner.Accessible;
/**
* Basic config interface.
*/
public interface Config extends org.aeonbits.owner.Config, Accessible {
}
/** Basic config interface. */
public interface Config extends org.aeonbits.owner.Config, Accessible {}
@@ -3,9 +3,7 @@ package io.ray.streaming.runtime.config;
import java.io.Serializable;
import java.util.Map;
/**
* Streaming config including general, master and worker part.
*/
/** Streaming config including general, master and worker part. */
public class StreamingConfig implements Serializable {
public StreamingMasterConfig masterConfig;
@@ -21,5 +19,4 @@ public class StreamingConfig implements Serializable {
wholeConfigMap.putAll(workerConfigTemplate.configMap);
return wholeConfigMap;
}
}
@@ -15,9 +15,7 @@ import org.aeonbits.owner.ConfigFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Streaming general config. May used by both JobMaster and JobWorker.
*/
/** Streaming general config. May be used by both JobMaster and JobWorker. */
public class StreamingGlobalConfig implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(StreamingGlobalConfig.class);
@@ -65,8 +63,7 @@ public class StreamingGlobalConfig implements Serializable {
break;
}
}
Preconditions.checkArgument(configInterface != null,
"Can not get config interface.");
Preconditions.checkArgument(configInterface != null, "Can not get config interface.");
Method[] methods = configInterface.getMethods();
for (Method method : methods) {
@@ -78,8 +75,10 @@ public class StreamingGlobalConfig implements Serializable {
try {
value = method.invoke(config);
} catch (Exception e) {
LOG.warn("Can not get value by method invoking for config key: {}. "
+ "So use default value instead.", ownerKeyAnnotationValue);
LOG.warn(
"Can not get value by method invoking for config key: {}. "
+ "So use default value instead.",
ownerKeyAnnotationValue);
String defaultValue = method.getAnnotation(DefaultValue.class).value();
value = defaultValue;
}
@@ -7,9 +7,7 @@ import org.aeonbits.owner.ConfigFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Streaming job master config.
*/
/** Streaming job master config. */
public class StreamingMasterConfig extends StreamingGlobalConfig {
private static final Logger LOG = LoggerFactory.getLogger(StreamingMasterConfig.class);
@@ -7,9 +7,7 @@ import org.aeonbits.owner.ConfigFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Streaming job worker specified config.
*/
/** Streaming job worker specified config. */
public class StreamingWorkerConfig extends StreamingGlobalConfig {
private static final Logger LOG = LoggerFactory.getLogger(StreamingWorkerConfig.class);
@@ -33,5 +31,4 @@ public class StreamingWorkerConfig extends StreamingGlobalConfig {
}
return result;
}
}
@@ -3,9 +3,7 @@ package io.ray.streaming.runtime.config.global;
import io.ray.streaming.runtime.config.Config;
import org.aeonbits.owner.Mutable;
/**
* Configurations for checkpointing.
*/
/** Configurations for checkpointing. */
public interface CheckpointConfig extends Config, Mutable {
String CP_INTERVAL_SECS = "streaming.checkpoint.interval.secs";
@@ -2,9 +2,7 @@ package io.ray.streaming.runtime.config.global;
import io.ray.streaming.runtime.config.Config;
/**
* Job common config.
*/
/** Job common config. */
public interface CommonConfig extends Config {
String JOB_ID = "streaming.job.id";
@@ -13,7 +11,7 @@ public interface CommonConfig extends Config {
/**
* Ray streaming job id. Non-custom.
*
* @return Job id with string type.
* <p>Returns Job id with string type.
*/
@DefaultValue(value = "default-job-id")
@Key(value = JOB_ID)
@@ -22,7 +20,7 @@ public interface CommonConfig extends Config {
/**
* Ray streaming job name. Non-custom.
*
* @return Job name with string type.
* <p>Returns Job name with string type.
*/
@DefaultValue(value = "default-job-name")
@Key(value = JOB_NAME)
@@ -3,42 +3,30 @@ package io.ray.streaming.runtime.config.global;
import io.ray.streaming.runtime.config.Config;
import io.ray.streaming.runtime.config.types.TransferChannelType;
/**
* Job data transfer config.
*/
/** Job data transfer config. */
public interface TransferConfig extends Config {
/**
* Data transfer channel type, support memory queue and native queue.
*/
/** Data transfer channel type, support memory queue and native queue. */
@DefaultValue(value = "NATIVE_CHANNEL")
@Key(value = io.ray.streaming.util.Config.CHANNEL_TYPE)
TransferChannelType channelType();
/**
* Queue size.
*/
/** Queue size. */
@DefaultValue(value = "100000000")
@Key(value = io.ray.streaming.util.Config.CHANNEL_SIZE)
long channelSize();
/**
* Return from DataReader.getBundle if only empty message read in this interval.
*/
/** Return from DataReader.getBundle if only empty message read in this interval. */
@DefaultValue(value = "-1")
@Key(value = io.ray.streaming.util.Config.TIMER_INTERVAL_MS)
long readerTimerIntervalMs();
/**
* Ring capacity.
*/
/** Ring capacity. */
@DefaultValue(value = "-1")
@Key(value = io.ray.streaming.util.Config.STREAMING_RING_BUFFER_CAPACITY)
int ringBufferCapacity();
/**
* Write an empty message if there is no data to be written in this interval.
*/
/** Write an empty message if there is no data to be written in this interval. */
@DefaultValue(value = "-1")
@Key(value = io.ray.streaming.util.Config.STREAMING_EMPTY_MESSAGE_INTERVAL)
int emptyMsgInterval();
@@ -2,81 +2,54 @@ package io.ray.streaming.runtime.config.master;
import io.ray.streaming.runtime.config.Config;
/**
* Job resource management config.
*/
/** Job resource management config. */
public interface ResourceConfig extends Config {
/**
* Number of actors per container.
*/
/** Config key for the number of actors per container. */
String MAX_ACTOR_NUM_PER_CONTAINER = "streaming.container.per.max.actor";
/**
* The interval between detecting ray cluster nodes.
*/
/**
* Config key for the interval between detecting ray cluster nodes.
*
* <p>NOTE(review): the lowercase 'k' in the constant name looks like a typo; it is left as is
* because renaming it would break every reference to this constant.
*/
String CONTAINER_RESOURCE_CHECk_INTERVAL_SECOND = "streaming.resource.check.interval.second";
/**
* CPU used by each task.
*/
/** Config key for the CPU used by each task. */
String TASK_RESOURCE_CPU = "streaming.task.resource.cpu";
/**
* Memory used by each task.
*/
/** Config key for the memory used by each task. */
String TASK_RESOURCE_MEM = "streaming.task.resource.mem";
/**
* Whether to enable CPU limit in resource control.
*/
/** Config key for whether to enable CPU limit in resource control. */
String TASK_RESOURCE_CPU_LIMIT_ENABLE = "streaming.task.resource.cpu.limitation.enable";
/**
* Whether to enable memory limit in resource control.
*/
/** Config key for whether to enable memory limit in resource control. */
String TASK_RESOURCE_MEM_LIMIT_ENABLE = "streaming.task.resource.mem.limitation.enable";
/**
* Number of cpu per task.
*/
/** Number of cpu per task. Bound to {@link #TASK_RESOURCE_CPU}; the declared default is 1.0. */
@DefaultValue(value = "1.0")
@Key(value = TASK_RESOURCE_CPU)
double taskCpuResource();
/**
* Memory size used by each task.
*/
/**
* Memory size used by each task. Bound to {@link #TASK_RESOURCE_MEM}; the declared default is 2.0
* (the unit is not stated here).
*/
@DefaultValue(value = "2.0")
@Key(value = TASK_RESOURCE_MEM)
double taskMemResource();
/**
* Whether to enable CPU limit in resource control.
*/
/**
* Whether to enable CPU limit in resource control. Bound to
* {@link #TASK_RESOURCE_CPU_LIMIT_ENABLE}; the declared default is false.
*/
@DefaultValue(value = "false")
@Key(value = TASK_RESOURCE_CPU_LIMIT_ENABLE)
boolean isTaskCpuResourceLimit();
/**
* Whether to enable memory limit in resource control.
*/
/**
* Whether to enable memory limit in resource control. Bound to
* {@link #TASK_RESOURCE_MEM_LIMIT_ENABLE}; the declared default is false.
*/
@DefaultValue(value = "false")
@Key(value = TASK_RESOURCE_MEM_LIMIT_ENABLE)
boolean isTaskMemResourceLimit();
/**
* Number of actors per container.
*/
/**
* Number of actors per container. Bound to {@link #MAX_ACTOR_NUM_PER_CONTAINER}; the declared
* default is 500.
*/
@DefaultValue(value = "500")
@Key(MAX_ACTOR_NUM_PER_CONTAINER)
int actorNumPerContainer();
/**
* The interval between detecting ray cluster nodes.
*/
/**
* The interval between detecting ray cluster nodes. Bound to
* {@link #CONTAINER_RESOURCE_CHECk_INTERVAL_SECOND}; the declared default is 1 (presumably
* seconds, going by the key and method name).
*/
@DefaultValue(value = "1")
@Key(value = CONTAINER_RESOURCE_CHECk_INTERVAL_SECOND)
long resourceCheckIntervalSecond();
}
@@ -2,9 +2,7 @@ package io.ray.streaming.runtime.config.master;
import io.ray.streaming.runtime.config.Config;
/**
* Configuration for job scheduler.
*/
/** Configuration for job scheduler. */
public interface SchedulerConfig extends Config {
String WORKER_INITIATION_WAIT_TIMEOUT_MS = "streaming.scheduler.worker.initiation.timeout.ms";
@@ -13,7 +11,7 @@ public interface SchedulerConfig extends Config {
/**
* The timeout ms of worker initiation. Default is: 10000ms(10s).
*
* @return timeout ms
* <p>Returns timeout ms
*/
@Key(WORKER_INITIATION_WAIT_TIMEOUT_MS)
@DefaultValue(value = "10000")
@@ -22,10 +20,9 @@ public interface SchedulerConfig extends Config {
/**
* The timeout ms of worker starting. Default is: 10000ms(10s).
*
* @return timeout ms
* <p>Returns timeout ms
*/
@Key(WORKER_STARTING_WAIT_TIMEOUT_MS)
@DefaultValue(value = "10000")
int workerStartingWaitTimeoutMs();
}
@@ -2,14 +2,10 @@ package io.ray.streaming.runtime.config.types;
public enum ContextBackendType {
/**
* Memory type
*/
/** Memory type */
MEMORY("memory", 0),
/**
* Local File
*/
/** Local File */
LOCAL_FILE("local_file", 1);
private String name;
@@ -2,9 +2,7 @@ package io.ray.streaming.runtime.config.types;
public enum ResourceAssignStrategyType {
/**
* Resource scheduling strategy based on FF(First Fit) algorithm and pipeline.
*/
/** Resource scheduling strategy based on FF(First Fit) algorithm and pipeline. */
PIPELINE_FIRST_STRATEGY("pipeline_first_strategy", 0);
private String name;
@@ -1,18 +1,12 @@
package io.ray.streaming.runtime.config.types;
/**
* Data transfer channel type.
*/
/** Data transfer channel type. */
public enum TransferChannelType {
/**
* Memory queue.
*/
/** Memory queue. */
MEMORY_CHANNEL("memory_channel", 0),
/**
* Native queue.
*/
/** Native queue. */
NATIVE_CHANNEL("native_channel", 1);
private String value;
@@ -3,24 +3,18 @@ package io.ray.streaming.runtime.config.worker;
import io.ray.streaming.runtime.config.Config;
import org.aeonbits.owner.Mutable;
/**
* This worker config is used by JobMaster to define the internal configuration of JobWorker.
*/
/** This worker config is used by JobMaster to define the internal configuration of JobWorker. */
public interface WorkerInternalConfig extends Config, Mutable {
String WORKER_NAME_INTERNAL = io.ray.streaming.util.Config.STREAMING_WORKER_NAME;
String OP_NAME_INTERNAL = io.ray.streaming.util.Config.STREAMING_OP_NAME;
/**
* The name of the worker inside the system.
*/
/** The name of the worker inside the system. */
@DefaultValue(value = "default-worker-name")
@Key(value = WORKER_NAME_INTERNAL)
String workerName();
/**
* Operator name corresponding to worker.
*/
/** Operator name corresponding to worker. */
@DefaultValue(value = "default-worker-op-name")
@Key(value = OP_NAME_INTERNAL)
String workerOperatorName();
@@ -4,23 +4,22 @@ import io.ray.streaming.runtime.master.JobMaster;
import io.ray.streaming.runtime.worker.JobWorker;
/**
* This interface is used for storing context of {@link JobWorker} and {@link JobMaster}.
* The checkpoint returned by user function is also saved using this interface.
* This interface is used for storing context of {@link JobWorker} and {@link JobMaster}. The
* checkpoint returned by user function is also saved using this interface.
*/
public interface ContextBackend {
/**
* check if key exists in state
*
* @return true if exists
* <p>Returns true if exists
*/
boolean exists(final String key) throws Exception;
/**
* get content by key
*
* @param key key
* @return the StateBackend
* @param key key
* @return the content stored under the key
*/
byte[] get(final String key) throws Exception;
@@ -38,5 +37,4 @@ public interface ContextBackend {
* @param key key
*/
void remove(final String key) throws Exception;
}
@@ -9,8 +9,8 @@ public class ContextBackendFactory {
public static ContextBackend getContextBackend(final StreamingGlobalConfig config) {
ContextBackend contextBackend;
ContextBackendType type = ContextBackendType.valueOf(
config.contextBackendConfig.stateBackendType().toUpperCase());
ContextBackendType type =
ContextBackendType.valueOf(config.contextBackendConfig.stateBackendType().toUpperCase());
switch (type) {
case MEMORY:
@@ -24,4 +24,4 @@ public class ContextBackendFactory {
}
return contextBackend;
}
}
}
@@ -6,21 +6,17 @@ import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
/**
* This data structure contains state information of a task.
*/
/** This data structure contains state information of a task. */
public class OperatorCheckpointInfo implements Serializable {
/**
* key: channel ID, value: offset
*/
/** key: channel ID, value: offset */
public Map<String, OffsetInfo> inputPoints;
public Map<String, OffsetInfo> outputPoints;
/**
* a serializable checkpoint returned by processor
*/
/** a serializable checkpoint returned by processor */
public Serializable processorCheckpoint;
public long checkpointId;
public OperatorCheckpointInfo() {
@@ -5,8 +5,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Achieves an atomic `put` method.
* known issue: if you crashed while write a key at first time, this code will not work.
* Achieves an atomic `put` method. Known issue: if the process crashes while writing a key for
* the first time, this code will not work.
*/
public class AtomicFsBackend extends LocalFileContextBackend {
@@ -6,16 +6,15 @@ import java.io.File;
import org.apache.commons.io.FileUtils;
/**
* This context backend uses local file system and doesn't supports failover in cluster.
* But it supports failover in single node.
* This is a pure file system backend which doesn't support atomic writing, please don't use this
* class, instead, use {@link AtomicFsBackend} which extends this class.
* This context backend uses local file system and doesn't support failover in cluster. But it
* supports failover in single node. This is a pure file system backend which doesn't support atomic
* writing, please don't use this class, instead, use {@link AtomicFsBackend} which extends this
* class.
*/
public class LocalFileContextBackend implements ContextBackend {
private final String rootPath;
public LocalFileContextBackend(ContextBackendConfig config) {
  // The root path is read from the config once; the field is final and never reassigned.
  this.rootPath = config.fileStateRootPath();
}
@@ -8,8 +8,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This context backend uses memory and doesn't supports failover.
* Data will be lost after worker died.
* This context backend uses memory and doesn't support failover. Data will be lost after worker
* died.
*/
public class MemoryContextBackend implements ContextBackend {
@@ -36,13 +36,15 @@ public class OutputCollector implements Collector<Record> {
this.writer = writer;
this.outputQueues = outputChannelIds.stream().map(ChannelId::from).toArray(ChannelId[]::new);
this.targetActors = targetActors;
this.targetLanguages = targetActors.stream()
.map(actor -> actor instanceof PyActorHandle ? Language.PYTHON :
Language.JAVA)
.toArray(Language[]::new);
this.targetLanguages =
targetActors.stream()
.map(actor -> actor instanceof PyActorHandle ? Language.PYTHON : Language.JAVA)
.toArray(Language[]::new);
this.partition = partition;
LOGGER.debug("OutputCollector constructed, outputChannelIds:{}, partition:{}.",
outputChannelIds, this.partition);
LOGGER.debug(
"OutputCollector constructed, outputChannelIds:{}, partition:{}.",
outputChannelIds,
this.partition);
}
@Override
@@ -76,5 +78,4 @@ public class OutputCollector implements Collector<Record> {
}
}
}
}
@@ -5,9 +5,7 @@ import io.ray.streaming.runtime.core.resource.ContainerId;
import java.io.Serializable;
import java.util.UUID;
/**
* Streaming system unique identity base class. For example, ${@link ContainerId }
*/
/** Streaming system unique identity base class. For example, {@link ContainerId}. */
public class AbstractId implements Serializable {
private UUID id;
@@ -27,8 +25,6 @@ public class AbstractId implements Serializable {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("id", id)
.toString();
return MoreObjects.toStringHelper(this).add("id", id).toString();
}
}
@@ -4,29 +4,19 @@ import com.google.common.base.MoreObjects;
import io.ray.streaming.api.partition.Partition;
import java.io.Serializable;
/**
* An edge that connects two execution vertices.
*/
/** An edge that connects two execution vertices. */
public class ExecutionEdge implements Serializable {
/**
* The source(upstream) execution vertex.
*/
/** The source(upstream) execution vertex. */
private final ExecutionVertex sourceExecutionVertex;
/**
* The target(downstream) execution vertex.
*/
/** The target(downstream) execution vertex. */
private final ExecutionVertex targetExecutionVertex;
/**
* The partition of current execution edge's execution job edge.
*/
/** The partition of current execution edge's execution job edge. */
private final Partition partition;
/**
* An unique id for execution edge.
*/
/** A unique id for execution edge. */
private final String executionEdgeIndex;
public ExecutionEdge(
@@ -40,7 +30,8 @@ public class ExecutionEdge implements Serializable {
}
private String generateExecutionEdgeIndex() {
return sourceExecutionVertex.getExecutionVertexId() + ""
return sourceExecutionVertex.getExecutionVertexId()
+ ""
+ targetExecutionVertex.getExecutionVertexId();
}
@@ -17,62 +17,36 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Physical plan.
*/
/** Physical plan. */
public class ExecutionGraph implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
/**
* Name of the job.
*/
/** Name of the job. */
private final String jobName;
/**
* Configuration of the job.
*/
/** Configuration of the job. */
private Map<String, String> jobConfig;
/**
* Data map for execution job vertex. key: job vertex id. value: execution job vertex.
*/
/** Data map for execution job vertex. key: job vertex id. value: execution job vertex. */
private Map<Integer, ExecutionJobVertex> executionJobVertexMap;
/**
* Data map for execution vertex.
* key: execution vertex id.
* value: execution vertex.
*/
/** Data map for execution vertex. key: execution vertex id. value: execution vertex. */
private Map<Integer, ExecutionVertex> executionVertexMap;
/**
* Data map for execution vertex.
* key: actor id.
* value: execution vertex.
*/
/** Data map for execution vertex. key: actor id. value: execution vertex. */
private Map<ActorId, ExecutionVertex> actorIdExecutionVertexMap;
/**
* key: channel ID
* value: actors in both sides of this channel
*/
/** key: channel ID. value: actors in both sides of this channel. */
private Map<String, Set<BaseActorHandle>> channelGroupedActors;
/**
* The max parallelism of the whole graph.
*/
/** The max parallelism of the whole graph. */
private int maxParallelism;
/**
* Build time.
*/
/** Build time. */
private long buildTime;
/**
* A monotonic increasing number, used for vertex's id(immutable).
*/
/** A monotonic increasing number, used for vertex's id(immutable). */
private AtomicInteger executionVertexIdGenerator = new AtomicInteger(0);
public ExecutionGraph(String jobName) {
@@ -96,10 +70,9 @@ public class ExecutionGraph implements Serializable {
this.executionJobVertexMap = executionJobVertexMap;
}
/**
* generate relation mappings between actors, execution vertices and channels
* this method must be called after worker actor is set.
* Generate relation mappings between actors, execution vertices and channels. This method must
* be called after worker actor is set.
*/
public void generateActorMappings() {
LOG.info("Setup queue actors relation.");
@@ -107,29 +80,33 @@ public class ExecutionGraph implements Serializable {
channelGroupedActors = new HashMap<>();
actorIdExecutionVertexMap = new HashMap<>();
getAllExecutionVertices().forEach(curVertex -> {
getAllExecutionVertices()
.forEach(
curVertex -> {
// current
actorIdExecutionVertexMap.put(curVertex.getActorId(), curVertex);
// current
actorIdExecutionVertexMap.put(curVertex.getActorId(), curVertex);
// input
List<ExecutionEdge> inputEdges = curVertex.getInputEdges();
inputEdges.forEach(inputEdge -> {
ExecutionVertex inputVertex = inputEdge.getSourceExecutionVertex();
String channelId = curVertex.getChannelIdByPeerVertex(inputVertex);
addActorToChannelGroupedActors(channelGroupedActors, channelId,
inputVertex.getWorkerActor());
});
// input
List<ExecutionEdge> inputEdges = curVertex.getInputEdges();
inputEdges.forEach(
inputEdge -> {
ExecutionVertex inputVertex = inputEdge.getSourceExecutionVertex();
String channelId = curVertex.getChannelIdByPeerVertex(inputVertex);
addActorToChannelGroupedActors(
channelGroupedActors, channelId, inputVertex.getWorkerActor());
});
// output
List<ExecutionEdge> outputEdges = curVertex.getOutputEdges();
outputEdges.forEach(outputEdge -> {
ExecutionVertex outputVertex = outputEdge.getTargetExecutionVertex();
String channelId = curVertex.getChannelIdByPeerVertex(outputVertex);
addActorToChannelGroupedActors(channelGroupedActors, channelId,
outputVertex.getWorkerActor());
});
});
// output
List<ExecutionEdge> outputEdges = curVertex.getOutputEdges();
outputEdges.forEach(
outputEdge -> {
ExecutionVertex outputVertex = outputEdge.getTargetExecutionVertex();
String channelId = curVertex.getChannelIdByPeerVertex(outputVertex);
addActorToChannelGroupedActors(
channelGroupedActors, channelId, outputVertex.getWorkerActor());
});
});
LOG.debug("Channel grouped actors is: {}.", channelGroupedActors);
}
@@ -179,7 +156,7 @@ public class ExecutionGraph implements Serializable {
/**
* Get all execution vertices from current execution graph.
*
* @return all execution vertices.
* <p>Returns all execution vertices.
*/
public List<ExecutionVertex> getAllExecutionVertices() {
return executionJobVertexMap.values().stream()
@@ -191,7 +168,7 @@ public class ExecutionGraph implements Serializable {
/**
* Get all execution vertices whose status is 'TO_ADD' from current execution graph.
*
* @return all added execution vertices.
* <p>Returns all added execution vertices.
*/
public List<ExecutionVertex> getAllAddedExecutionVertices() {
return executionJobVertexMap.values().stream()
@@ -204,8 +181,7 @@ public class ExecutionGraph implements Serializable {
/**
* Get specified execution vertex from current execution graph by execution vertex id.
*
* @param executionVertexId execution vertex id.
* @return the specified execution vertex.
* @param executionVertexId execution vertex id.
* @return the specified execution vertex.
*/
public ExecutionVertex getExecutionVertexByExecutionVertexId(int executionVertexId) {
if (executionVertexMap.containsKey(executionVertexId)) {
@@ -214,53 +190,46 @@ public class ExecutionGraph implements Serializable {
throw new RuntimeException("Vertex " + executionVertexId + " does not exist!");
}
/**
* Get specified execution vertex from current execution graph by actor id.
*
* @param actorId the actor id of execution vertex.
* @return the specified execution vertex.
* @param actorId the actor id of execution vertex.
* @return the specified execution vertex.
*/
public ExecutionVertex getExecutionVertexByActorId(ActorId actorId) {
  // Plain map lookup: the result is null when the map holds no entry for this actor id.
  final ExecutionVertex vertex = actorIdExecutionVertexMap.get(actorId);
  return vertex;
}
/**
* Get specified actor by actor id.
*
* @param actorId the actor id of execution vertex.
* @return the specified actor handle.
* @param actorId the actor id of execution vertex.
* @return the specified actor handle.
*/
public Optional<BaseActorHandle> getActorById(ActorId actorId) {
return getAllActors().stream()
.filter(actor -> actor.getId().equals(actorId))
.findFirst();
return getAllActors().stream().filter(actor -> actor.getId().equals(actorId)).findFirst();
}
/**
* Get the peer actor in the other side of channelName of a given actor
*
* @param actor actor in this side
* @param channelName the channel name
* @return the peer actor in the other side
* @param channelName the channel name
* @return the peer actor in the other side
*/
public BaseActorHandle getPeerActor(BaseActorHandle actor, String channelName) {
Set<BaseActorHandle> set = getActorsByChannelId(channelName);
final BaseActorHandle[] res = new BaseActorHandle[1];
set.forEach(anActor -> {
if (!anActor.equals(actor)) {
res[0] = anActor;
}
});
set.forEach(
anActor -> {
if (!anActor.equals(actor)) {
res[0] = anActor;
}
});
return res[0];
}
/**
* Get actors in both sides of a channelId
*
* @param channelId the channelId
* @return actors in both sides
* @param channelId the channelId
* @return actors in both sides
*/
public Set<BaseActorHandle> getActorsByChannelId(String channelId) {
return channelGroupedActors.getOrDefault(channelId, Sets.newHashSet());
@@ -269,7 +238,7 @@ public class ExecutionGraph implements Serializable {
/**
* Get all actors by graph.
*
* @return actor list
* <p>Returns actor list
*/
public List<BaseActorHandle> getAllActors() {
return getActorsFromJobVertices(getExecutionJobVertexList());
@@ -278,12 +247,13 @@ public class ExecutionGraph implements Serializable {
/**
* Get source actors by graph.
*
* @return actor list
* <p>Returns actor list
*/
public List<BaseActorHandle> getSourceActors() {
List<ExecutionJobVertex> executionJobVertices = getExecutionJobVertexList().stream()
.filter(ExecutionJobVertex::isSourceVertex)
.collect(Collectors.toList());
List<ExecutionJobVertex> executionJobVertices =
getExecutionJobVertexList().stream()
.filter(ExecutionJobVertex::isSourceVertex)
.collect(Collectors.toList());
return getActorsFromJobVertices(executionJobVertices);
}
@@ -291,16 +261,16 @@ public class ExecutionGraph implements Serializable {
/**
* Get transformation and sink actors by graph.
*
* @return actor list
* <p>Returns actor list
*/
public List<BaseActorHandle> getNonSourceActors() {
List<ExecutionJobVertex> executionJobVertices = getExecutionJobVertexList().stream()
.filter(executionJobVertex ->
executionJobVertex
.isTransformationVertex()
|| executionJobVertex
.isSinkVertex())
.collect(Collectors.toList());
List<ExecutionJobVertex> executionJobVertices =
getExecutionJobVertexList().stream()
.filter(
executionJobVertex ->
executionJobVertex.isTransformationVertex()
|| executionJobVertex.isSinkVertex())
.collect(Collectors.toList());
return getActorsFromJobVertices(executionJobVertices);
}
@@ -308,12 +278,13 @@ public class ExecutionGraph implements Serializable {
/**
* Get sink actors by graph.
*
* @return actor list
* <p>Returns actor list
*/
public List<BaseActorHandle> getSinkActors() {
List<ExecutionJobVertex> executionJobVertices = getExecutionJobVertexList().stream()
.filter(ExecutionJobVertex::isSinkVertex)
.collect(Collectors.toList());
List<ExecutionJobVertex> executionJobVertices =
getExecutionJobVertexList().stream()
.filter(ExecutionJobVertex::isSinkVertex)
.collect(Collectors.toList());
return getActorsFromJobVertices(executionJobVertices);
}
@@ -321,8 +292,7 @@ public class ExecutionGraph implements Serializable {
/**
* Get actors according to job vertices.
*
* @param executionJobVertices specified job vertices
* @return actor list
* @param executionJobVertices specified job vertices
* @return actor list
*/
public List<BaseActorHandle> getActorsFromJobVertices(
List<ExecutionJobVertex> executionJobVertices) {
@@ -351,9 +321,6 @@ public class ExecutionGraph implements Serializable {
}
public List<ActorId> getAllActorsId() {
return getAllActors().stream()
.map(BaseActorHandle::getId)
.collect(Collectors.toList());
return getAllActors().stream().map(BaseActorHandle::getId).collect(Collectors.toList());
}
}
@@ -5,29 +5,19 @@ import io.ray.streaming.api.partition.Partition;
import io.ray.streaming.jobgraph.JobEdge;
import java.io.Serializable;
/**
* An edge that connects two execution job vertices.
*/
/** An edge that connects two execution job vertices. */
public class ExecutionJobEdge implements Serializable {
/**
* The source(upstream) execution job vertex.
*/
/** The source(upstream) execution job vertex. */
private final ExecutionJobVertex sourceExecutionJobVertex;
/**
* The target(downstream) execution job vertex.
*/
/** The target(downstream) execution job vertex. */
private final ExecutionJobVertex targetExecutionJobVertex;
/**
* The partition of the execution job edge.
*/
/** The partition of the execution job edge. */
private final Partition partition;
/**
* An unique id for execution job edge.
*/
/** A unique id for execution job edge. */
private final String executionJobEdgeIndex;
public ExecutionJobEdge(
@@ -41,7 +31,8 @@ public class ExecutionJobEdge implements Serializable {
}
private String generateExecutionJobEdgeIndex() {
return sourceExecutionJobVertex.getExecutionJobVertexId() + ""
return sourceExecutionJobVertex.getExecutionJobVertexId()
+ ""
+ targetExecutionJobVertex.getExecutionJobVertexId();
}
@@ -18,41 +18,35 @@ import org.aeonbits.owner.ConfigFactory;
/**
* Physical job vertex.
* <p>Execution job vertex is the physical form of {@link JobVertex} and
* every execution job vertex is corresponding to a group of {@link ExecutionVertex}.
*
* <p>Execution job vertex is the physical form of {@link JobVertex} and every execution job vertex
* is corresponding to a group of {@link ExecutionVertex}.
*/
public class ExecutionJobVertex implements Serializable {
/**
* Unique id. Use {@link JobVertex}'s id directly.
*/
/** Unique id. Use {@link JobVertex}'s id directly. */
private final int executionJobVertexId;
/**
* Use jobVertex id and operator(use {@link StreamOperator}'s name) as name. e.g.
* 1-SourceOperator
* Use jobVertex id and operator(use {@link StreamOperator}'s name) as name. e.g. 1-SourceOperator
*/
private final String executionJobVertexName;
private final StreamOperator streamOperator;
private final VertexType vertexType;
private final Language language;
private final Map<String, String> jobConfig;
private final long buildTime;
/**
* Parallelism of current execution job vertex(operator).
*/
/** Parallelism of current execution job vertex(operator). */
private int parallelism;
/**
* Sub execution vertices of current execution job vertex(operator).
*/
/** Sub execution vertices of current execution job vertex(operator). */
private List<ExecutionVertex> executionVertices;
/**
* Input and output edges of current execution job vertex.
*/
/** Input and output edges of current execution job vertex. */
private List<ExecutionJobEdge> inputEdges = new ArrayList<>();
private List<ExecutionJobEdge> outputEdges = new ArrayList<>();
public ExecutionJobVertex(
@@ -61,8 +55,9 @@ public class ExecutionJobVertex implements Serializable {
AtomicInteger idGenerator,
long buildTime) {
this.executionJobVertexId = jobVertex.getVertexId();
this.executionJobVertexName = generateExecutionJobVertexName(
executionJobVertexId, jobVertex.getStreamOperator().getName());
this.executionJobVertexName =
generateExecutionJobVertexName(
executionJobVertexId, jobVertex.getStreamOperator().getName());
this.streamOperator = jobVertex.getStreamOperator();
this.vertexType = jobVertex.getVertexType();
this.language = jobVertex.getLanguage();
@@ -77,8 +72,8 @@ public class ExecutionJobVertex implements Serializable {
ResourceConfig resourceConfig = ConfigFactory.create(ResourceConfig.class, jobConfig);
for (int subIndex = 0; subIndex < parallelism; subIndex++) {
executionVertices.add(new ExecutionVertex(
idGenerator.getAndIncrement(), subIndex, this, resourceConfig));
executionVertices.add(
new ExecutionVertex(idGenerator.getAndIncrement(), subIndex, this, resourceConfig));
}
return executionVertices;
}
@@ -91,14 +86,14 @@ public class ExecutionJobVertex implements Serializable {
Map<Integer, BaseActorHandle> executionVertexWorkersMap = new HashMap<>();
Preconditions.checkArgument(
executionVertices != null && !executionVertices.isEmpty(),
"Empty execution vertex.");
executionVertices.stream().forEach(vertex -> {
Preconditions.checkArgument(
vertex.getWorkerActor() != null,
"Empty execution vertex worker actor.");
executionVertexWorkersMap.put(vertex.getExecutionVertexId(), vertex.getWorkerActor());
});
executionVertices != null && !executionVertices.isEmpty(), "Empty execution vertex.");
executionVertices.stream()
.forEach(
vertex -> {
Preconditions.checkArgument(
vertex.getWorkerActor() != null, "Empty execution vertex worker actor.");
executionVertexWorkersMap.put(vertex.getExecutionVertexId(), vertex.getWorkerActor());
});
return executionVertexWorkersMap;
}
@@ -114,7 +109,7 @@ public class ExecutionJobVertex implements Serializable {
/**
* e.g. 1-SourceOperator
*
* @return operator name with index
* <p>Returns operator name with index
*/
public String getExecutionJobVertexNameWithIndex() {
return executionJobVertexId + "-" + executionJobVertexName;
@@ -128,8 +123,7 @@ public class ExecutionJobVertex implements Serializable {
return executionVertices;
}
public void setExecutionVertices(
List<ExecutionVertex> executionVertex) {
public void setExecutionVertices(List<ExecutionVertex> executionVertex) {
this.executionVertices = executionVertex;
}
@@ -137,8 +131,7 @@ public class ExecutionJobVertex implements Serializable {
return outputEdges;
}
public void setOutputEdges(
List<ExecutionJobEdge> outputEdges) {
public void setOutputEdges(List<ExecutionJobEdge> outputEdges) {
this.outputEdges = outputEdges;
}
@@ -146,8 +139,7 @@ public class ExecutionJobVertex implements Serializable {
return inputEdges;
}
public void setInputEdges(
List<ExecutionJobEdge> inputEdges) {
public void setInputEdges(List<ExecutionJobEdge> inputEdges) {
this.inputEdges = inputEdges;
}
@@ -18,34 +18,25 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* Physical vertex, corresponds to {@link ExecutionJobVertex}.
*/
/** Physical vertex, corresponds to {@link ExecutionJobVertex}. */
public class ExecutionVertex implements Serializable {
/**
* Unique id for execution vertex.
*/
/** Unique id for execution vertex. */
private final int executionVertexId;
/**
* Immutable field inherited from {@link ExecutionJobVertex}.
*/
/** Immutable field inherited from {@link ExecutionJobVertex}. */
private final int executionJobVertexId;
private final String executionJobVertexName;
private final StreamOperator streamOperator;
private final VertexType vertexType;
private final Language language;
private final long buildTime;
/**
* Resource used by ExecutionVertex.
*/
/** Resource used by ExecutionVertex. */
private final Map<String, Double> resource;
/**
* Parallelism of current vertex's operator.
*/
/** Parallelism of current vertex's operator. */
private int parallelism;
/**
@@ -56,21 +47,15 @@ public class ExecutionVertex implements Serializable {
private ExecutionVertexState state = ExecutionVertexState.TO_ADD;
/**
* The id of the container which this vertex's worker actor belongs to.
*/
/** The id of the container which this vertex's worker actor belongs to. */
private ContainerId containerId;
private String pid;
/**
* Worker actor handle.
*/
/** Worker actor handle. */
private BaseActorHandle workerActor;
/**
* Op config + job config.
*/
/** Op config + job config. */
private Map<String, String> workerConfig;
private List<ExecutionEdge> inputEdges = new ArrayList<>();
@@ -83,7 +68,6 @@ public class ExecutionVertex implements Serializable {
private transient List<BaseActorHandle> inputActorList;
private Map<Integer, String> exeVertexChannelMap;
public ExecutionVertex(
int globalIndex,
int index,
@@ -182,8 +166,7 @@ public class ExecutionVertex implements Serializable {
return inputEdges;
}
public void setInputEdges(
List<ExecutionEdge> inputEdges) {
public void setInputEdges(List<ExecutionEdge> inputEdges) {
this.inputEdges = inputEdges;
}
@@ -191,8 +174,7 @@ public class ExecutionVertex implements Serializable {
return outputEdges;
}
public void setOutputEdges(
List<ExecutionEdge> outputEdges) {
public void setOutputEdges(List<ExecutionEdge> outputEdges) {
this.outputEdges = outputEdges;
}
@@ -279,7 +261,6 @@ public class ExecutionVertex implements Serializable {
return inputActorList;
}
public String getChannelIdByPeerVertex(ExecutionVertex peerVertex) {
if (exeVertexChannelMap == null) {
generateActorChannelInfo();
@@ -287,7 +268,6 @@ public class ExecutionVertex implements Serializable {
return exeVertexChannelMap.get(peerVertex.getExecutionVertexId());
}
private void generateActorChannelInfo() {
inputChannelIdList = new ArrayList<>();
inputActorList = new ArrayList<>();
@@ -297,10 +277,11 @@ public class ExecutionVertex implements Serializable {
List<ExecutionEdge> inputEdges = getInputEdges();
for (ExecutionEdge edge : inputEdges) {
String channelId = ChannelId.genIdStr(
edge.getSourceExecutionVertex().getExecutionVertexId(),
getExecutionVertexId(),
getBuildTime());
String channelId =
ChannelId.genIdStr(
edge.getSourceExecutionVertex().getExecutionVertexId(),
getExecutionVertexId(),
getBuildTime());
inputChannelIdList.add(channelId);
inputActorList.add(edge.getSourceExecutionVertex().getWorkerActor());
exeVertexChannelMap.put(edge.getSourceExecutionVertex().getExecutionVertexId(), channelId);
@@ -308,17 +289,17 @@ public class ExecutionVertex implements Serializable {
List<ExecutionEdge> outputEdges = getOutputEdges();
for (ExecutionEdge edge : outputEdges) {
String channelId = ChannelId.genIdStr(
getExecutionVertexId(),
edge.getTargetExecutionVertex().getExecutionVertexId(),
getBuildTime());
String channelId =
ChannelId.genIdStr(
getExecutionVertexId(),
edge.getTargetExecutionVertex().getExecutionVertexId(),
getBuildTime());
outputChannelIdList.add(channelId);
outputActorList.add(edge.getTargetExecutionVertex().getWorkerActor());
exeVertexChannelMap.put(edge.getTargetExecutionVertex().getExecutionVertexId(), channelId);
}
}
private Map<String, Double> generateResources(ResourceConfig resourceConfig) {
Map<String, Double> resourceMap = new HashMap<>();
if (resourceConfig.isTaskCpuResourceLimit()) {
@@ -2,29 +2,19 @@ package io.ray.streaming.runtime.core.graph.executiongraph;
import java.io.Serializable;
/**
* Vertex state.
*/
/** Vertex state. */
public enum ExecutionVertexState implements Serializable {
/**
* Vertex(Worker) to be added.
*/
/** Vertex(Worker) to be added. */
TO_ADD(1, "TO_ADD"),
/**
* Vertex(Worker) to be deleted.
*/
/** Vertex(Worker) to be deleted. */
TO_DEL(2, "TO_DEL"),
/**
* Vertex(Worker) is running.
*/
/** Vertex(Worker) is running. */
RUNNING(3, "RUNNING"),
/**
* Unknown status.
*/
/** Unknown status. */
UNKNOWN(-1, "UNKNOWN");
public final int code;
@@ -34,5 +24,4 @@ public enum ExecutionVertexState implements Serializable {
this.code = code;
this.msg = msg;
}
}
@@ -14,7 +14,9 @@ public class ProcessBuilder {
public static StreamProcessor buildProcessor(StreamOperator streamOperator) {
OperatorType type = streamOperator.getOpType();
LOGGER.info("Building StreamProcessor, operator type = {}, operator = {}.", type,
LOGGER.info(
"Building StreamProcessor, operator type = {}, operator = {}.",
type,
streamOperator.getClass().getSimpleName());
switch (type) {
case SOURCE:
@@ -12,14 +12,10 @@ public interface Processor<T> extends Serializable {
void process(T t);
/**
* See {@link Function#saveCheckpoint()}.
*/
/** See {@link Function#saveCheckpoint()}. */
Serializable saveCheckpoint();
/**
* See {@link Function#loadCheckpoint(Serializable)}.
*/
/** See {@link Function#loadCheckpoint(Serializable)}. */
void loadCheckpoint(Serializable checkpointObject);
void close();
@@ -24,7 +24,5 @@ public class SourceProcessor<T> extends StreamProcessor<Record, SourceOperator<T
}
@Override
public void close() {
}
public void close() {}
}
@@ -21,48 +21,31 @@ public class Container implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(Container.class);
/**
* container id
*/
/** container id */
private ContainerId id;
/**
* Container address
*/
/** Container address */
private String address;
/**
* Container hostname
*/
/** Container hostname */
private String hostname;
/**
* Container unique id fetched from raylet
*/
/** Container unique id fetched from raylet */
private UniqueId nodeId;
/**
* Container available resources
*/
/** Container available resources */
private Map<String, Double> availableResources = new HashMap<>();
/**
* List of {@link ExecutionVertex} ids belonging to the container.
*/
/** List of {@link ExecutionVertex} ids belonging to the container. */
private List<Integer> executionVertexIds = new ArrayList<>();
/**
* Capacity is the max number of actors that could be allocated in the container
*/
/** Capacity is the max number of actors that could be allocated in the container */
private int capacity = 0;
public Container() {
}
public Container() {}
public Container(
String address,
UniqueId nodeId, String hostname,
Map<String, Double> availableResources) {
String address, UniqueId nodeId, String hostname, Map<String, Double> availableResources) {
this.id = new ContainerId();
this.address = address;
@@ -73,11 +56,7 @@ public class Container implements Serializable {
public static Container from(NodeInfo nodeInfo) {
return new Container(
nodeInfo.nodeAddress,
nodeInfo.nodeId,
nodeInfo.nodeHostname,
nodeInfo.resources
);
nodeInfo.nodeAddress, nodeInfo.nodeId, nodeInfo.nodeHostname, nodeInfo.resources);
}
public ContainerId getId() {
@@ -112,7 +91,6 @@ public class Container implements Serializable {
return capacity;
}
public void updateCapacity(int capacity) {
LOG.info("Update container capacity, old value: {}, new value: {}.", this.capacity, capacity);
this.capacity = capacity;
@@ -150,8 +128,10 @@ public class Container implements Serializable {
executionVertexIds.removeIf(id -> id == vertex.getExecutionVertexId());
reclaimResource(vertex.getResource());
} else {
throw new RuntimeException(String.format("Current container [%s] not found vertex [%s].",
this, vertex.getExecutionJobVertexName()));
throw new RuntimeException(
String.format(
"Current container [%s] not found vertex [%s].",
this, vertex.getExecutionJobVertexName()));
}
}
@@ -160,24 +140,36 @@ public class Container implements Serializable {
}
private void decreaseResource(Map<String, Double> allocatedResource) {
allocatedResource.forEach((k, v) -> {
Preconditions.checkArgument(this.availableResources.get(k) >= v,
String.format("Available resource %s not >= decreased resource %s",
this.availableResources.get(k), v));
Double newValue = this.availableResources.get(k) - v;
LOG.info("Decrease container {} resource [{}], from {} to {}.",
this.address, k, this.availableResources.get(k), newValue);
this.availableResources.put(k, newValue);
});
allocatedResource.forEach(
(k, v) -> {
Preconditions.checkArgument(
this.availableResources.get(k) >= v,
String.format(
"Available resource %s not >= decreased resource %s",
this.availableResources.get(k), v));
Double newValue = this.availableResources.get(k) - v;
LOG.info(
"Decrease container {} resource [{}], from {} to {}.",
this.address,
k,
this.availableResources.get(k),
newValue);
this.availableResources.put(k, newValue);
});
}
private void reclaimResource(Map<String, Double> allocatedResource) {
allocatedResource.forEach((k, v) -> {
Double newValue = this.availableResources.get(k) + v;
LOG.info("Reclaim container {} resource [{}], from {} to {}.",
this.address, k, this.availableResources.get(k), newValue);
this.availableResources.put(k, newValue);
});
allocatedResource.forEach(
(k, v) -> {
Double newValue = this.availableResources.get(k) + v;
LOG.info(
"Reclaim container {} resource [{}], from {} to {}.",
this.address,
k,
this.availableResources.get(k),
newValue);
this.availableResources.put(k, newValue);
});
}
@Override
@@ -192,4 +184,4 @@ public class Container implements Serializable {
.add("capacity", capacity)
.toString();
}
}
}
@@ -2,9 +2,5 @@ package io.ray.streaming.runtime.core.resource;
import io.ray.streaming.runtime.core.common.AbstractId;
/**
* Container unique identifier.
*/
public class ContainerId extends AbstractId {
}
/** Container unique identifier. */
public class ContainerId extends AbstractId {}
@@ -1,23 +1,15 @@
package io.ray.streaming.runtime.core.resource;
/**
* Key for different type of resources.
*/
/** Key for different type of resources. */
public enum ResourceType {
/**
* Cpu resource key.
*/
/** Cpu resource key. */
CPU("CPU"),
/**
* Gpu resource key.
*/
/** Gpu resource key. */
GPU("GPU"),
/**
* Memory resource key.
*/
/** Memory resource key. */
MEM("MEM");
private String value;
@@ -29,5 +21,4 @@ public enum ResourceType {
public String getValue() {
return value;
}
}
@@ -11,25 +11,20 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Resource description of ResourceManager.
*/
/** Resource description of ResourceManager. */
public class Resources implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(Resources.class);
/**
* Available containers registered to ResourceManager.
*/
/** Available containers registered to ResourceManager. */
private List<Container> registerContainers = new ArrayList<>();
public Resources() {
}
public Resources() {}
/**
* Get registered containers, the container list is read-only.
*
* @return container list.
* <p>Returns container list.
*/
public ImmutableList<Container> getRegisteredContainers() {
return ImmutableList.copyOf(registerContainers);
@@ -52,9 +47,9 @@ public class Resources implements Serializable {
}
public ImmutableMap<UniqueId, Container> getRegisteredContainerMap() {
return ImmutableMap.copyOf(registerContainers.stream()
.collect(java.util.stream.Collectors
.toMap(Container::getNodeId, c -> c)));
return ImmutableMap.copyOf(
registerContainers.stream()
.collect(java.util.stream.Collectors.toMap(Container::getNodeId, c -> c)));
}
@Override
@@ -67,8 +67,8 @@ public class JobMaster {
runtimeContext = new JobMasterRuntimeContext(streamingConfig);
// load checkpoint if recovering
if (!Ray.getRuntimeContext().isSingleProcess() && Ray.getRuntimeContext()
.wasCurrentActorRestarted()) {
if (!Ray.getRuntimeContext().isSingleProcess()
&& Ray.getRuntimeContext().wasCurrentActorRestarted()) {
loadMasterCheckpoint();
}
@@ -101,7 +101,7 @@ public class JobMaster {
/**
* Init JobMaster. To initiate or recover other components(like metrics and extra coordinators).
*
* @return init result
* <p>Returns init result
*/
public Boolean init(boolean isRecover) {
LOG.info("Initializing job master, isRecover={}.", isRecover);
@@ -128,15 +128,15 @@ public class JobMaster {
/**
* Submit job to run:
*
* <ol>
* <li> Using GraphManager to build physical plan according to the logical plan.</li>
* <li> Using ResourceManager to manage and allocate the resources.</li>
* <li> Using JobScheduler to schedule the job to run.</li>
* <li>Using GraphManager to build physical plan according to the logical plan.
* <li>Using ResourceManager to manage and allocate the resources.
* <li>Using JobScheduler to schedule the job to run.
* </ol>
*
* @param jobMasterActor JobMaster actor
* @param jobGraph logical plan
* @return submit result
* @param jobGraph logical plan
* @return submit result
*/
public boolean submitJob(ActorHandle<JobMaster> jobMasterActor, JobGraph jobGraph) {
LOG.info("Begin submitting job using logical plan: {}.", jobGraph);
@@ -168,8 +168,8 @@ public class JobMaster {
LOG.debug("Save JobMaster context.");
byte[] contextBytes = Serializer.encode(runtimeContext);
CheckpointStateUtil
.put(contextBackend, getJobMasterRuntimeContextKey(getConf()), contextBytes);
CheckpointStateUtil.put(
contextBackend, getJobMasterRuntimeContextKey(getConf()), contextBytes);
}
}
@@ -180,8 +180,11 @@ public class JobMaster {
reportPb = RemoteCall.BaseWorkerCmd.parseFrom(reportBytes);
ActorId actorId = ActorId.fromBytes(reportPb.getActorId().toByteArray());
long remoteCallCost = System.currentTimeMillis() - reportPb.getTimestamp();
LOG.info("Vertex {}, request job worker commit cost {}ms, actorId={}.",
getExecutionVertex(actorId), remoteCallCost, actorId);
LOG.info(
"Vertex {}, request job worker commit cost {}ms, actorId={}.",
getExecutionVertex(actorId),
remoteCallCost,
actorId);
RemoteCall.WorkerCommitReport commit =
reportPb.getDetail().unpack(RemoteCall.WorkerCommitReport.class);
WorkerCommitReport report = new WorkerCommitReport(actorId, commit.getCommitCheckpointId());
@@ -206,27 +209,31 @@ public class JobMaster {
return RemoteCall.BoolResult.newBuilder().setBoolRes(false).build().toByteArray();
}
ExecutionVertex exeVertex = getExecutionVertex(actorId);
LOG.info("Vertex {}, request job worker rollback cost {}ms, actorId={}.",
exeVertex, remoteCallCost, actorId);
RemoteCall.WorkerRollbackRequest rollbackPb
= RemoteCall.WorkerRollbackRequest.parseFrom(requestPb.getDetail().getValue());
LOG.info(
"Vertex {}, request job worker rollback cost {}ms, actorId={}.",
exeVertex,
remoteCallCost,
actorId);
RemoteCall.WorkerRollbackRequest rollbackPb =
RemoteCall.WorkerRollbackRequest.parseFrom(requestPb.getDetail().getValue());
exeVertex.setPid(rollbackPb.getWorkerPid());
// To find old container where slot is located in.
String hostname = "";
Optional<Container> container = ResourceUtil.getContainerById(
resourceManager.getRegisteredContainers(),
exeVertex.getContainerId()
);
Optional<Container> container =
ResourceUtil.getContainerById(
resourceManager.getRegisteredContainers(), exeVertex.getContainerId());
if (container.isPresent()) {
hostname = container.get().getHostname();
}
WorkerRollbackRequest request = new WorkerRollbackRequest(
actorId, rollbackPb.getExceptionMsg(), hostname, exeVertex.getPid()
);
WorkerRollbackRequest request =
new WorkerRollbackRequest(
actorId, rollbackPb.getExceptionMsg(), hostname, exeVertex.getPid());
ret = failoverCoordinator.requestJobWorkerRollback(request);
LOG.info("Vertex {} request rollback, exception msg : {}.",
exeVertex, rollbackPb.getExceptionMsg());
LOG.info(
"Vertex {} request rollback, exception msg : {}.",
exeVertex,
rollbackPb.getExceptionMsg());
} catch (Throwable e) {
LOG.error("Parse job worker rollback has exception.", e);
@@ -257,5 +264,4 @@ public class JobMaster {
public StreamingMasterConfig getConf() {
return conf;
}
}
@@ -8,6 +8,7 @@ import java.io.Serializable;
/**
* Runtime context for job master.
*
* <p>Including: graph, resource, checkpoint info, etc.
*/
public class JobRuntimeContext implements Serializable {
@@ -52,5 +53,4 @@ public class JobRuntimeContext implements Serializable {
.add("conf", conf.getMap())
.toString();
}
}
@@ -77,5 +77,4 @@ public class JobMasterRuntimeContext implements Serializable {
.add("conf", conf.getMap())
.toString();
}
}
@@ -25,8 +25,9 @@ public abstract class BaseCoordinator implements Runnable {
}
public void start() {
thread = new Thread(Ray.wrapRunnable(this),
this.getClass().getName() + "-" + System.currentTimeMillis());
thread =
new Thread(
Ray.wrapRunnable(this), this.getClass().getName() + "-" + System.currentTimeMillis());
thread.start();
}
@@ -20,8 +20,8 @@ import org.slf4j.LoggerFactory;
/**
* CheckpointCoordinator is the controller of checkpoint, responsible for triggering checkpoint,
* collecting {@link JobWorker}'s reports and calling {@link JobWorker} to clear expired
* checkpoints when new checkpoint finished.
* collecting {@link JobWorker}'s reports and calling {@link JobWorker} to clear expired checkpoints
* when new checkpoint finished.
*/
public class CheckpointCoordinator extends BaseCoordinator {
@@ -58,7 +58,8 @@ public class CheckpointCoordinator extends BaseCoordinator {
if (!pendingCheckpointActors.isEmpty()) {
// if wait commit report timeout, this cp fail, and restart next cp
if (timeoutOnWaitCheckpoint()) {
LOG.warn("Waiting for checkpoint {} timeout, pending cp actors is {}.",
LOG.warn(
"Waiting for checkpoint {} timeout, pending cp actors is {}.",
runtimeContext.lastCheckpointId,
graphManager.getExecutionGraph().getActorName(pendingCheckpointActors));
@@ -90,14 +91,17 @@ public class CheckpointCoordinator extends BaseCoordinator {
}
private void processCommitReport(WorkerCommitReport commitReport) {
LOG.info("Start process commit report {}, from actor name={}.", commitReport,
LOG.info(
"Start process commit report {}, from actor name={}.",
commitReport,
graphManager.getExecutionGraph().getActorName(commitReport.fromActorId));
try {
Preconditions.checkArgument(
commitReport.commitCheckpointId == runtimeContext.lastCheckpointId,
"expect checkpointId %s, but got %s",
runtimeContext.lastCheckpointId, commitReport);
runtimeContext.lastCheckpointId,
commitReport);
if (!pendingCheckpointActors.contains(commitReport.fromActorId)) {
LOG.warn("Invalid commit report, skipped.");
@@ -105,7 +109,8 @@ public class CheckpointCoordinator extends BaseCoordinator {
}
pendingCheckpointActors.remove(commitReport.fromActorId);
LOG.info("Pending actors after this commit: {}.",
LOG.info(
"Pending actors after this commit: {}.",
graphManager.getExecutionGraph().getActorName(pendingCheckpointActors));
// checkpoint finish
@@ -144,10 +149,14 @@ public class CheckpointCoordinator extends BaseCoordinator {
final List<ObjectRef> sourcesRet = new ArrayList<>();
graphManager.getExecutionGraph().getSourceActors().forEach(actor -> {
sourcesRet.add(RemoteCallWorker.triggerCheckpoint(
actor, runtimeContext.lastCheckpointId));
});
graphManager
.getExecutionGraph()
.getSourceActors()
.forEach(
actor -> {
sourcesRet.add(
RemoteCallWorker.triggerCheckpoint(actor, runtimeContext.lastCheckpointId));
});
for (ObjectRef rayObject : sourcesRet) {
if (rayObject.get() instanceof RayException) {
@@ -171,8 +180,7 @@ public class CheckpointCoordinator extends BaseCoordinator {
List<BaseActorHandle> allActor = graphManager.getExecutionGraph().getAllActors();
if (runtimeContext.lastCheckpointId > runtimeContext.getLastValidCheckpointId()) {
RemoteCallWorker
.notifyCheckpointTimeoutParallel(allActor, runtimeContext.lastCheckpointId);
RemoteCallWorker.notifyCheckpointTimeoutParallel(allActor, runtimeContext.lastCheckpointId);
}
if (!pendingCheckpointActors.isEmpty()) {
@@ -198,15 +206,14 @@ public class CheckpointCoordinator extends BaseCoordinator {
if (runtimeContext.checkpointIds.size() > 1) {
Long stateExpiredCpId = runtimeContext.checkpointIds.remove(0);
Long msgExpiredCheckpointId = runtimeContext.checkpointIds.get(0);
RemoteCallWorker
.clearExpiredCheckpointParallel(allActor, stateExpiredCpId, msgExpiredCheckpointId);
RemoteCallWorker.clearExpiredCheckpointParallel(
allActor, stateExpiredCpId, msgExpiredCheckpointId);
}
return true;
}
private boolean readyToTrigger() {
return (System.currentTimeMillis() - runtimeContext.lastCpTimestamp) >=
cpIntervalSecs * 1000;
return (System.currentTimeMillis() - runtimeContext.lastCpTimestamp) >= cpIntervalSecs * 1000;
}
private boolean timeoutOnWaitCheckpoint() {
@@ -39,8 +39,7 @@ public class FailoverCoordinator extends BaseCoordinator {
}
public FailoverCoordinator(
JobMaster jobMaster, AsyncRemoteCaller asyncRemoteCaller,
boolean isRecover) {
JobMaster jobMaster, AsyncRemoteCaller asyncRemoteCaller, boolean isRecover) {
super(jobMaster);
this.asyncRemoteCaller = asyncRemoteCaller;
@@ -111,8 +110,8 @@ public class FailoverCoordinator extends BaseCoordinator {
ExecutionVertex exeVertex = getExeVertexFromRequest(rollbackRequest);
// Reset pid for new-rollback actor.
if (null != rollbackRequest.getPid() &&
!rollbackRequest.getPid().equals(WorkerRollbackRequest.DEFAULT_PID)) {
if (null != rollbackRequest.getPid()
&& !rollbackRequest.getPid().equals(WorkerRollbackRequest.DEFAULT_PID)) {
exeVertex.setPid(rollbackRequest.getPid());
}
@@ -122,10 +121,9 @@ public class FailoverCoordinator extends BaseCoordinator {
}
String hostname = "";
Optional<Container> container = ResourceUtil.getContainerById(
jobMaster.getResourceManager().getRegisteredContainers(),
exeVertex.getContainerId()
);
Optional<Container> container =
ResourceUtil.getContainerById(
jobMaster.getResourceManager().getRegisteredContainers(), exeVertex.getContainerId());
if (container.isPresent()) {
hostname = container.get().getHostname();
}
@@ -133,16 +131,22 @@ public class FailoverCoordinator extends BaseCoordinator {
if (rollbackRequest.isForcedRollback) {
interruptCheckpointAndRollback(rollbackRequest);
} else {
asyncRemoteCaller.checkIfNeedRollbackAsync(exeVertex.getWorkerActor(), res -> {
if (!res) {
LOG.info("Vertex {} doesn't need to rollback, skip it.", exeVertex);
return;
}
interruptCheckpointAndRollback(rollbackRequest);
}, throwable -> {
LOG.error("Exception when calling checkIfNeedRollbackAsync, maybe vertex is dead" +
", ignore this request, vertex={}.", exeVertex, throwable);
});
asyncRemoteCaller.checkIfNeedRollbackAsync(
exeVertex.getWorkerActor(),
res -> {
if (!res) {
LOG.info("Vertex {} doesn't need to rollback, skip it.", exeVertex);
return;
}
interruptCheckpointAndRollback(rollbackRequest);
},
throwable -> {
LOG.error(
"Exception when calling checkIfNeedRollbackAsync, maybe vertex is dead"
+ ", ignore this request, vertex={}.",
exeVertex,
throwable);
});
}
LOG.info("Deal with rollback request {} success.", rollbackRequest);
@@ -154,7 +158,9 @@ public class FailoverCoordinator extends BaseCoordinator {
rollbackRequest.cascadingGroupId = currentCascadingGroupId++;
}
// get last valid checkpoint id then call worker rollback
rollback(jobMaster.getRuntimeContext().getLastValidCheckpointId(), rollbackRequest,
rollback(
jobMaster.getRuntimeContext().getLastValidCheckpointId(),
rollbackRequest,
currentCascadingGroupId);
// we interrupt current checkpoint for 2 considerations:
// 1. current checkpoint might be timeout, because barrier might be lost after failover. so we
@@ -165,66 +171,83 @@ public class FailoverCoordinator extends BaseCoordinator {
}
/**
* call worker rollback, and deal with its reports. callback won't be finished until
* the entire DAG is back to normal.
* call worker rollback, and deal with its reports. callback won't be finished until the entire
* DAG is back to normal.
*
* @param checkpointId checkpointId to be rollback
* @param rollbackRequest worker rollback request
* @param cascadingGroupId all rollback of a cascading group should have same ID
*/
private void rollback(
long checkpointId, WorkerRollbackRequest rollbackRequest,
long cascadingGroupId) {
long checkpointId, WorkerRollbackRequest rollbackRequest, long cascadingGroupId) {
ExecutionVertex exeVertex = getExeVertexFromRequest(rollbackRequest);
LOG.info("Call vertex {} to rollback, checkpoint id is {}, cascadingGroupId={}.",
exeVertex, checkpointId, cascadingGroupId);
LOG.info(
"Call vertex {} to rollback, checkpoint id is {}, cascadingGroupId={}.",
exeVertex,
checkpointId,
cascadingGroupId);
isRollbacking.put(exeVertex, true);
asyncRemoteCaller.rollback(exeVertex.getWorkerActor(), checkpointId, result -> {
List<WorkerRollbackRequest> newRollbackRequests = new ArrayList<>();
switch (result.getResultEnum()) {
case SUCCESS:
ChannelRecoverInfo recoverInfo = result.getResultObj();
LOG.info("Vertex {} rollback done, dataLostQueues={}, msg={}, cascadingGroupId={}.",
exeVertex, recoverInfo.getDataLostQueues(), result.getResultMsg(), cascadingGroupId);
// rollback upstream if vertex reports abnormal input queues
newRollbackRequests =
cascadeUpstreamActors(recoverInfo.getDataLostQueues(), exeVertex, cascadingGroupId);
break;
case SKIPPED:
LOG.info("Vertex skip rollback, result = {}, cascadingGroupId={}.", result,
cascadingGroupId);
break;
default:
LOG.error(
"Rollback vertex {} failed, result={}, cascadingGroupId={}," +
" rollback this worker again after {} ms.",
exeVertex, result, cascadingGroupId, ROLLBACK_RETRY_TIME_MS);
Thread.sleep(ROLLBACK_RETRY_TIME_MS);
LOG.info("Add rollback request for {} again, cascadingGroupId={}.", exeVertex,
cascadingGroupId);
newRollbackRequests.add(
new WorkerRollbackRequest(exeVertex, "", "Rollback failed, try again.", false)
);
break;
}
asyncRemoteCaller.rollback(
exeVertex.getWorkerActor(),
checkpointId,
result -> {
List<WorkerRollbackRequest> newRollbackRequests = new ArrayList<>();
switch (result.getResultEnum()) {
case SUCCESS:
ChannelRecoverInfo recoverInfo = result.getResultObj();
LOG.info(
"Vertex {} rollback done, dataLostQueues={}, msg={}, cascadingGroupId={}.",
exeVertex,
recoverInfo.getDataLostQueues(),
result.getResultMsg(),
cascadingGroupId);
// rollback upstream if vertex reports abnormal input queues
newRollbackRequests =
cascadeUpstreamActors(
recoverInfo.getDataLostQueues(), exeVertex, cascadingGroupId);
break;
case SKIPPED:
LOG.info(
"Vertex skip rollback, result = {}, cascadingGroupId={}.",
result,
cascadingGroupId);
break;
default:
LOG.error(
"Rollback vertex {} failed, result={}, cascadingGroupId={},"
+ " rollback this worker again after {} ms.",
exeVertex,
result,
cascadingGroupId,
ROLLBACK_RETRY_TIME_MS);
Thread.sleep(ROLLBACK_RETRY_TIME_MS);
LOG.info(
"Add rollback request for {} again, cascadingGroupId={}.",
exeVertex,
cascadingGroupId);
newRollbackRequests.add(
new WorkerRollbackRequest(exeVertex, "", "Rollback failed, try again.", false));
break;
}
// lock to avoid executing new rollback requests added.
// consider such a case: A->B->C, C cascade B, and B cascade A
// if B is rolled back before B's rollback request is saved, and then JobMaster crashed,
// then A will never be rolled back.
synchronized (cmdLock) {
jobMaster.getRuntimeContext().foCmds.addAll(newRollbackRequests);
// this rollback request is finished, remove it.
jobMaster.getRuntimeContext().unfinishedFoCmds.remove(rollbackRequest);
jobMaster.saveContext();
}
isRollbacking.put(exeVertex, false);
}, throwable -> {
LOG.error("Exception when calling vertex to rollback, vertex={}.", exeVertex, throwable);
isRollbacking.put(exeVertex, false);
});
// lock to avoid executing new rollback requests added.
// consider such a case: A->B->C, C cascade B, and B cascade A
// if B is rolled back before B's rollback request is saved, and then JobMaster crashed,
// then A will never be rolled back.
synchronized (cmdLock) {
jobMaster.getRuntimeContext().foCmds.addAll(newRollbackRequests);
// this rollback request is finished, remove it.
jobMaster.getRuntimeContext().unfinishedFoCmds.remove(rollbackRequest);
jobMaster.saveContext();
}
isRollbacking.put(exeVertex, false);
},
throwable -> {
LOG.error("Exception when calling vertex to rollback, vertex={}.", exeVertex, throwable);
isRollbacking.put(exeVertex, false);
});
LOG.info("Finish rollback vertex {}, checkpoint id is {}.", exeVertex, checkpointId);
}
@@ -233,32 +256,39 @@ public class FailoverCoordinator extends BaseCoordinator {
Set<String> dataLostQueues, ExecutionVertex fromVertex, long cascadingGroupId) {
List<WorkerRollbackRequest> cascadedRollbackRequest = new ArrayList<>();
// rollback upstream if vertex reports abnormal input queues
dataLostQueues.forEach(q -> {
BaseActorHandle upstreamActor =
graphManager.getExecutionGraph().getPeerActor(fromVertex.getWorkerActor(), q);
ExecutionVertex upstreamExeVertex = getExecutionVertex(upstreamActor);
// vertexes that have already been cascaded by another vertex in the same level
// of graph should be ignored.
if (isRollbacking.get(upstreamExeVertex)) {
return;
}
LOG.info("Call upstream vertex {} of vertex {} to rollback, cascadingGroupId={}.",
upstreamExeVertex, fromVertex, cascadingGroupId);
String hostname = "";
Optional<Container> container = ResourceUtil.getContainerById(
jobMaster.getResourceManager().getRegisteredContainers(),
upstreamExeVertex.getContainerId()
);
if (container.isPresent()) {
hostname = container.get().getHostname();
}
// force upstream vertexes to rollback
WorkerRollbackRequest upstreamRequest = new WorkerRollbackRequest(
upstreamExeVertex, hostname, String.format("Cascading rollback from %s", fromVertex), true
);
upstreamRequest.cascadingGroupId = cascadingGroupId;
cascadedRollbackRequest.add(upstreamRequest);
});
dataLostQueues.forEach(
q -> {
BaseActorHandle upstreamActor =
graphManager.getExecutionGraph().getPeerActor(fromVertex.getWorkerActor(), q);
ExecutionVertex upstreamExeVertex = getExecutionVertex(upstreamActor);
// vertexes that have already been cascaded by another vertex in the same level
// of graph should be ignored.
if (isRollbacking.get(upstreamExeVertex)) {
return;
}
LOG.info(
"Call upstream vertex {} of vertex {} to rollback, cascadingGroupId={}.",
upstreamExeVertex,
fromVertex,
cascadingGroupId);
String hostname = "";
Optional<Container> container =
ResourceUtil.getContainerById(
jobMaster.getResourceManager().getRegisteredContainers(),
upstreamExeVertex.getContainerId());
if (container.isPresent()) {
hostname = container.get().getHostname();
}
// force upstream vertexes to rollback
WorkerRollbackRequest upstreamRequest =
new WorkerRollbackRequest(
upstreamExeVertex,
hostname,
String.format("Cascading rollback from %s", fromVertex),
true);
upstreamRequest.cascadingGroupId = cascadingGroupId;
cascadedRollbackRequest.add(upstreamRequest);
});
return cascadedRollbackRequest;
}
@@ -7,11 +7,9 @@ public abstract class BaseWorkerCmd implements Serializable {
public ActorId fromActorId;
public BaseWorkerCmd() {
}
public BaseWorkerCmd() {}
protected BaseWorkerCmd(ActorId actorId) {
this.fromActorId = actorId;
}
}
@@ -1,5 +1,3 @@
package io.ray.streaming.runtime.master.coordinator.command;
public final class InterruptCheckpointRequest extends BaseWorkerCmd {
}
public final class InterruptCheckpointRequest extends BaseWorkerCmd {}
@@ -23,10 +23,7 @@ public final class WorkerRollbackRequest extends BaseWorkerCmd {
}
public WorkerRollbackRequest(
ExecutionVertex executionVertex,
String hostname,
String msg,
boolean isForcedRollback) {
ExecutionVertex executionVertex, String hostname, String msg, boolean isForcedRollback) {
super(executionVertex.getWorkerActorId());
@@ -56,8 +53,6 @@ public final class WorkerRollbackRequest extends BaseWorkerCmd {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("fromActorId", fromActorId)
.toString();
return MoreObjects.toStringHelper(this).add("fromActorId", fromActorId).toString();
}
}

Some files were not shown because too many files have changed in this diff Show More