[Streaming] java cross lang streaming graph (#6689)

This commit is contained in:
chaokunyang
2020-01-08 17:32:35 +08:00
committed by Hao Chen
parent 91a3fa0157
commit 70c7d47c09
27 changed files with 499 additions and 89 deletions
@@ -86,8 +86,8 @@ public class DataStream<T> extends Stream<T> {
* @param sinkFunction The sink function.
* @return A new StreamSink.
*/
public StreamSink<T> sink(SinkFunction<T> sinkFunction) {
return new StreamSink<>(this, new SinkOperator(sinkFunction));
public DataStreamSink<T> sink(SinkFunction<T> sinkFunction) {
return new DataStreamSink<>(this, new SinkOperator(sinkFunction));
}
/**
@@ -0,0 +1,21 @@
package org.ray.streaming.api.stream;
import org.ray.streaming.operator.impl.SinkOperator;
/**
* Represents a sink of the DataStream.
*
* @param <T> Type of the input data of this sink.
*/
public class DataStreamSink<T> extends StreamSink<T> {
public DataStreamSink(DataStream<T> input, SinkOperator sinkOperator) {
super(input, sinkOperator);
this.streamingContext.addSink(this);
}
public DataStreamSink<T> setParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
}
}
@@ -0,0 +1,39 @@
package org.ray.streaming.api.stream;
import java.util.Collection;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.function.impl.SourceFunction;
import org.ray.streaming.api.function.internal.CollectionSourceFunction;
import org.ray.streaming.api.partition.impl.RoundRobinPartition;
import org.ray.streaming.operator.impl.SourceOperator;
/**
* Represents a source of the DataStream.
*
* @param <T> The type of StreamSource data.
*/
public class DataStreamSource<T> extends DataStream<T> implements StreamSource<T> {
public DataStreamSource(StreamingContext streamingContext, SourceFunction<T> sourceFunction) {
super(streamingContext, new SourceOperator<>(sourceFunction));
super.partition = new RoundRobinPartition<>();
}
/**
* Build a DataStreamSource source from a collection.
*
* @param context Stream context.
* @param values A collection of values.
* @param <T> The type of source data.
* @return A DataStreamSource.
*/
public static <T> DataStreamSource<T> buildSource(
StreamingContext context, Collection<T> values) {
return new DataStreamSource(context, new CollectionSourceFunction(values));
}
public DataStreamSource<T> setParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
}
}
@@ -1,10 +1,8 @@
package org.ray.streaming.api.stream;
import java.io.Serializable;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.function.impl.JoinFunction;
import org.ray.streaming.api.function.impl.KeyFunction;
import org.ray.streaming.operator.StreamOperator;
/**
* Represents a DataStream of two joined DataStream.
@@ -15,10 +13,6 @@ import org.ray.streaming.operator.StreamOperator;
*/
public class JoinStream<L, R, J> extends DataStream<L> {
public JoinStream(StreamingContext streamingContext, StreamOperator streamOperator) {
super(streamingContext, streamOperator);
}
public JoinStream(DataStream<L> leftStream, DataStream<R> rightStream) {
super(leftStream, null);
}
@@ -68,7 +62,7 @@ public class JoinStream<L, R, J> extends DataStream<L> {
private KeyFunction<R, K> rightKeyByFunction;
public Equal(JoinStream<L, R, J> joinStream, KeyFunction<L, K> leftKeyByFunction,
KeyFunction<R, K> rightKeyByFunction) {
KeyFunction<R, K> rightKeyByFunction) {
this.joinStream = joinStream;
this.leftKeyByFunction = leftKeyByFunction;
this.rightKeyByFunction = rightKeyByFunction;
@@ -1,6 +1,5 @@
package org.ray.streaming.api.stream;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.function.impl.AggregateFunction;
import org.ray.streaming.api.function.impl.ReduceFunction;
import org.ray.streaming.api.partition.impl.KeyPartition;
@@ -15,10 +14,6 @@ import org.ray.streaming.operator.impl.ReduceOperator;
*/
public class KeyDataStream<K, T> extends DataStream<T> {
public KeyDataStream(StreamingContext streamingContext, StreamOperator streamOperator) {
super(streamingContext, streamOperator);
}
public KeyDataStream(DataStream<T> input, StreamOperator streamOperator) {
super(input, streamOperator);
this.partition = new KeyPartition();
@@ -5,6 +5,9 @@ import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.partition.Partition;
import org.ray.streaming.api.partition.impl.RoundRobinPartition;
import org.ray.streaming.operator.StreamOperator;
import org.ray.streaming.python.PythonOperator;
import org.ray.streaming.python.PythonPartition;
import org.ray.streaming.python.stream.PythonStream;
/**
* Abstract base class of all stream types.
@@ -12,7 +15,6 @@ import org.ray.streaming.operator.StreamOperator;
* @param <T> Type of the data in the stream.
*/
public abstract class Stream<T> implements Serializable {
protected int id;
protected int parallelism = 1;
protected StreamOperator operator;
@@ -20,11 +22,16 @@ public abstract class Stream<T> implements Serializable {
protected StreamingContext streamingContext;
protected Partition<T> partition;
@SuppressWarnings("unchecked")
public Stream(StreamingContext streamingContext, StreamOperator streamOperator) {
this.streamingContext = streamingContext;
this.operator = streamOperator;
this.id = streamingContext.generateId();
this.partition = new RoundRobinPartition<>();
if (streamOperator instanceof PythonOperator) {
this.partition = PythonPartition.RoundRobinPartition;
} else {
this.partition = new RoundRobinPartition<>();
}
}
public Stream(Stream<T> inputStream, StreamOperator streamOperator) {
@@ -33,7 +40,16 @@ public abstract class Stream<T> implements Serializable {
this.streamingContext = this.inputStream.getStreamingContext();
this.operator = streamOperator;
this.id = streamingContext.generateId();
this.partition = new RoundRobinPartition<>();
this.partition = selectPartition();
}
@SuppressWarnings("unchecked")
private Partition<T> selectPartition() {
if (inputStream instanceof PythonStream) {
return PythonPartition.RoundRobinPartition;
} else {
return new RoundRobinPartition<>();
}
}
public Stream<T> getInputStream() {
@@ -44,6 +60,10 @@ public abstract class Stream<T> implements Serializable {
return operator;
}
public void setOperator(StreamOperator operator) {
this.operator = operator;
}
public StreamingContext getStreamingContext() {
return streamingContext;
}
@@ -1,21 +1,14 @@
package org.ray.streaming.api.stream;
import org.ray.streaming.operator.impl.SinkOperator;
import org.ray.streaming.operator.StreamOperator;
/**
* Represents a sink of the DataStream.
* Represents a sink of the Stream.
*
* @param <T> Type of the input data of this sink.
*/
public class StreamSink<T> extends Stream<T> {
public StreamSink(DataStream<T> input, SinkOperator sinkOperator) {
super(input, sinkOperator);
this.streamingContext.addSink(this);
}
public StreamSink<T> setParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
public StreamSink(Stream<T> inputStream, StreamOperator streamOperator) {
super(inputStream, streamOperator);
}
}
@@ -1,36 +1,9 @@
package org.ray.streaming.api.stream;
import java.util.Collection;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.function.impl.SourceFunction;
import org.ray.streaming.api.function.internal.CollectionSourceFunction;
import org.ray.streaming.operator.impl.SourceOperator;
/**
* Represents a source of the DataStream.
* A mark interface that represents a source of the Stream.
*
* @param <T> The type of StreamSource data.
*/
public class StreamSource<T> extends DataStream<T> {
public StreamSource(StreamingContext streamingContext, SourceFunction<T> sourceFunction) {
super(streamingContext, new SourceOperator<>(sourceFunction));
}
/**
* Build a StreamSource source from a collection.
*
* @param context Stream context.
* @param values A collection of values.
* @param <T> The type of source data.
* @return A StreamSource.
*/
public static <T> StreamSource<T> buildSource(StreamingContext context, Collection<T> values) {
return new StreamSource(context, new CollectionSourceFunction(values));
}
public StreamSource<T> setParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
}
public interface StreamSource<T> {
}
@@ -7,6 +7,7 @@ import org.ray.streaming.api.stream.Stream;
import org.ray.streaming.api.stream.StreamSink;
import org.ray.streaming.api.stream.StreamSource;
import org.ray.streaming.operator.StreamOperator;
import org.ray.streaming.python.stream.PythonDataStream;
public class PlanBuilder {
@@ -44,7 +45,7 @@ public class PlanBuilder {
processStream(parentStream);
} else if (stream instanceof StreamSource) {
planVertex = new PlanVertex(vertexId, parallelism, VertexType.SOURCE, streamOperator);
} else if (stream instanceof DataStream) {
} else if (stream instanceof DataStream || stream instanceof PythonDataStream) {
planVertex = new PlanVertex(vertexId, parallelism, VertexType.PROCESS, streamOperator);
Stream parentStream = stream.getInputStream();
int inputVertexId = parentStream.getId();
@@ -0,0 +1,92 @@
package org.ray.streaming.python;
import org.ray.streaming.api.function.Function;
/**
* Represents a user defined python function.
*
* <p>Python worker can use information in this class to create a function object.</p>
*
* <p>If this object is constructed from serialized python function,
* python worker can deserialize it to create python function directly.
* If this object is constructed from moduleName and className/functionName,
* python worker will use `importlib` to load python function.</p>
*
* <p>If the python data stream api is invoked from python, `function` will be not null.</p>
* <p>If the python data stream api is invoked from java, `moduleName` and
* `className`/`functionName` will be not null.</p>
* <p>
* TODO serialize to bytes using protobuf
*/
public class PythonFunction implements Function {
public enum FunctionInterface {
SOURCE_FUNCTION("ray.streaming.function.SourceFunction"),
MAP_FUNCTION("ray.streaming.function.MapFunction"),
FLAT_MAP_FUNCTION("ray.streaming.function.FlatMapFunction"),
FILTER_FUNCTION("ray.streaming.function.FilterFunction"),
KEY_FUNCTION("ray.streaming.function.KeyFunction"),
REDUCE_FUNCTION("ray.streaming.function.ReduceFunction"),
SINK_FUNCTION("ray.streaming.function.SinkFunction");
private String functionInterface;
FunctionInterface(String functionInterface) {
this.functionInterface = functionInterface;
}
}
private byte[] function;
private String moduleName;
private String className;
private String functionName;
/**
* FunctionInterface can be used to validate python function,
* and look up operator class from FunctionInterface.
*/
private String functionInterface;
private PythonFunction(byte[] function,
String moduleName,
String className,
String functionName) {
this.function = function;
this.moduleName = moduleName;
this.className = className;
this.functionName = functionName;
}
public void setFunctionInterface(FunctionInterface functionInterface) {
this.functionInterface = functionInterface.functionInterface;
}
/**
* Create a {@link PythonFunction} using python serialized function
*
* @param function serialized python function sent from python driver
*/
public static PythonFunction fromFunction(byte[] function) {
return new PythonFunction(function, null, null, null);
}
/**
* Create a {@link PythonFunction} using <code>moduleName</code> and
* <code>className</code>.
*
* @param moduleName python module name
* @param className python class name
*/
public static PythonFunction fromClassName(String moduleName, String className) {
return new PythonFunction(null, moduleName, className, null);
}
/**
* Create a {@link PythonFunction} using <code>moduleName</code> and
* <code>functionName</code>.
*
* @param moduleName python module name
* @param functionName python function name
*/
public static PythonFunction fromFunctionName(String moduleName, String functionName) {
return new PythonFunction(null, moduleName, null, functionName);
}
}
@@ -0,0 +1,43 @@
package org.ray.streaming.python;
import java.util.List;
import org.ray.streaming.api.context.RuntimeContext;
import org.ray.streaming.operator.OperatorType;
import org.ray.streaming.operator.StreamOperator;
/**
* Represents a {@link StreamOperator} that wraps python {@link PythonFunction}.
*/
@SuppressWarnings("unchecked")
public class PythonOperator extends StreamOperator {
public PythonOperator(PythonFunction function) {
super(function);
}
@Override
public void open(List list, RuntimeContext runtimeContext) {
String msg = String.format("Methods of %s shouldn't be called.", getClass().getSimpleName());
throw new UnsupportedOperationException(msg);
}
@Override
public void finish() {
String msg = String.format("Methods of %s shouldn't be called.", getClass().getSimpleName());
throw new UnsupportedOperationException(msg);
}
@Override
public void close() {
String msg = String.format("Methods of %s shouldn't be called.", getClass().getSimpleName());
throw new UnsupportedOperationException(msg);
}
@Override
public OperatorType getOpType() {
String msg = String.format("Methods of %s shouldn't be called.", getClass().getSimpleName());
throw new UnsupportedOperationException(msg);
}
}
@@ -0,0 +1,48 @@
package org.ray.streaming.python;
import org.ray.streaming.api.partition.Partition;
/**
* Represents a python partition function.
* <p>
* Python worker can create a partition object using information in this
* PythonPartition.
* <p>
* If this object is constructed from serialized python partition,
* python worker can deserialize it to create python partition directly.
* If this object is constructed from moduleName and className/functionName,
* python worker will use `importlib` to load python partition function.
* <p>
* TODO serialize to bytes using protobuf
*/
public class PythonPartition implements Partition {
public static final PythonPartition BroadcastPartition = new PythonPartition(
"ray.streaming.partition", "BroadcastPartition", null);
public static final PythonPartition KeyPartition = new PythonPartition(
"ray.streaming.partition", "KeyPartition", null);
public static final PythonPartition RoundRobinPartition = new PythonPartition(
"ray.streaming.partition", "RoundRobinPartition", null);
private byte[] partition;
private String moduleName;
private String className;
private String functionName;
public PythonPartition(byte[] partition) {
this.partition = partition;
}
public PythonPartition(String moduleName, String className, String functionName) {
this.moduleName = moduleName;
this.className = className;
this.functionName = functionName;
}
@Override
public int[] partition(Object record, int numPartition) {
String msg = String.format("partition method of %s shouldn't be called.",
getClass().getSimpleName());
throw new UnsupportedOperationException(msg);
}
}
@@ -0,0 +1,100 @@
package org.ray.streaming.python.stream;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.stream.Stream;
import org.ray.streaming.python.PythonFunction;
import org.ray.streaming.python.PythonFunction.FunctionInterface;
import org.ray.streaming.python.PythonOperator;
import org.ray.streaming.python.PythonPartition;
/**
* Represents a stream of data whose transformations will be executed in python.
*/
public class PythonDataStream extends Stream implements PythonStream {
protected PythonDataStream(StreamingContext streamingContext,
PythonOperator pythonOperator) {
super(streamingContext, pythonOperator);
}
protected PythonDataStream(Stream inputStream, PythonOperator pythonOperator) {
super(inputStream, pythonOperator);
}
/**
* Apply a map function to this stream.
*
* @param func The python MapFunction.
* @return A new PythonDataStream.
*/
public PythonDataStream map(PythonFunction func) {
func.setFunctionInterface(FunctionInterface.MAP_FUNCTION);
return new PythonDataStream(this, new PythonOperator(func));
}
/**
* Apply a flat-map function to this stream.
*
* @param func The python FlapMapFunction.
* @return A new PythonDataStream
*/
public PythonDataStream flatMap(PythonFunction func) {
func.setFunctionInterface(FunctionInterface.FLAT_MAP_FUNCTION);
return new PythonDataStream(this, new PythonOperator(func));
}
/**
* Apply a sink function and get a StreamSink.
*
* @param func The python SinkFunction.
* @return A new StreamSink.
*/
public PythonStreamSink sink(PythonFunction func) {
func.setFunctionInterface(FunctionInterface.SINK_FUNCTION);
return new PythonStreamSink(this, new PythonOperator(func));
}
/**
* Apply a key-by function to this stream.
*
* @param func the python keyFunction.
* @return A new KeyDataStream.
*/
public PythonKeyDataStream keyBy(PythonFunction func) {
func.setFunctionInterface(FunctionInterface.KEY_FUNCTION);
return new PythonKeyDataStream(this, new PythonOperator(func));
}
/**
* Apply broadcast to this stream.
*
* @return This stream.
*/
public PythonDataStream broadcast() {
this.partition = PythonPartition.BroadcastPartition;
return this;
}
/**
* Apply a partition to this stream.
*
* @param partition The partitioning strategy.
* @return This stream.
*/
public PythonDataStream partitionBy(PythonPartition partition) {
this.partition = partition;
return this;
}
/**
* Set parallelism to current transformation.
*
* @param parallelism The parallelism to set.
* @return This stream.
*/
public PythonDataStream setParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
}
}
@@ -0,0 +1,36 @@
package org.ray.streaming.python.stream;
import org.ray.streaming.api.stream.Stream;
import org.ray.streaming.operator.StreamOperator;
import org.ray.streaming.python.PythonFunction;
import org.ray.streaming.python.PythonFunction.FunctionInterface;
import org.ray.streaming.python.PythonOperator;
import org.ray.streaming.python.PythonPartition;
/**
* Represents a python DataStream returned by a key-by operation.
*/
public class PythonKeyDataStream extends Stream implements PythonStream {
public PythonKeyDataStream(PythonDataStream input, StreamOperator streamOperator) {
super(input, streamOperator);
this.partition = PythonPartition.KeyPartition;
}
/**
* Apply a reduce function to this stream.
*
* @param func The reduce function.
* @return A new DataStream.
*/
public PythonDataStream reduce(PythonFunction func) {
func.setFunctionInterface(FunctionInterface.REDUCE_FUNCTION);
return new PythonDataStream(this, new PythonOperator(func));
}
public PythonKeyDataStream setParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
}
}
@@ -0,0 +1,7 @@
package org.ray.streaming.python.stream;
/**
* A marker interface used to identify all python streams.
*/
public interface PythonStream {
}
@@ -0,0 +1,20 @@
package org.ray.streaming.python.stream;
import org.ray.streaming.api.stream.StreamSink;
import org.ray.streaming.python.PythonOperator;
/**
* Represents a sink of the PythonStream.
*/
public class PythonStreamSink extends StreamSink implements PythonStream {
public PythonStreamSink(PythonDataStream input, PythonOperator sinkOperator) {
super(input, null);
this.streamingContext.addSink(this);
}
public PythonStreamSink setParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
}
}
@@ -0,0 +1,31 @@
package org.ray.streaming.python.stream;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.stream.StreamSource;
import org.ray.streaming.python.PythonFunction;
import org.ray.streaming.python.PythonFunction.FunctionInterface;
import org.ray.streaming.python.PythonOperator;
import org.ray.streaming.python.PythonPartition;
/**
* Represents a source of the PythonStream.
*/
public class PythonStreamSource extends PythonDataStream implements StreamSource {
private PythonStreamSource(StreamingContext streamingContext, PythonFunction sourceFunction) {
super(streamingContext, new PythonOperator(sourceFunction));
super.partition = PythonPartition.RoundRobinPartition;
}
public PythonStreamSource setParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
}
public static PythonStreamSource from(StreamingContext streamingContext,
PythonFunction sourceFunction) {
sourceFunction.setFunctionInterface(FunctionInterface.SOURCE_FUNCTION);
return new PythonStreamSource(streamingContext, sourceFunction);
}
}
@@ -6,8 +6,8 @@ import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.partition.impl.KeyPartition;
import org.ray.streaming.api.partition.impl.RoundRobinPartition;
import org.ray.streaming.api.stream.DataStream;
import org.ray.streaming.api.stream.StreamSink;
import org.ray.streaming.api.stream.StreamSource;
import org.ray.streaming.api.stream.DataStreamSink;
import org.ray.streaming.api.stream.DataStreamSource;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,9 +39,9 @@ public class PlanBuilderTest {
public Plan buildDataSyncPlan() {
StreamingContext streamingContext = StreamingContext.buildContext();
DataStream<String> dataStream = StreamSource.buildSource(streamingContext,
DataStream<String> dataStream = DataStreamSource.buildSource(streamingContext,
Lists.newArrayList("a", "b", "c"));
StreamSink streamSink = dataStream.sink(x -> LOGGER.info(x));
DataStreamSink streamSink = dataStream.sink(x -> LOGGER.info(x));
PlanBuilder planBuilder = new PlanBuilder(Lists.newArrayList(streamSink));
Plan plan = planBuilder.buildPlan();
@@ -74,9 +74,9 @@ public class PlanBuilderTest {
public Plan buildKeyByPlan() {
StreamingContext streamingContext = StreamingContext.buildContext();
DataStream<String> dataStream = StreamSource.buildSource(streamingContext,
DataStream<String> dataStream = DataStreamSource.buildSource(streamingContext,
Lists.newArrayList("1", "2", "3", "4"));
StreamSink streamSink = dataStream.keyBy(x -> x)
DataStreamSink streamSink = dataStream.keyBy(x -> x)
.sink(x -> LOGGER.info(x));
PlanBuilder planBuilder = new PlanBuilder(Lists.newArrayList(streamSink));