diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/DataStream.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/DataStream.java index a1989b9f2..7f99f87b3 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/DataStream.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/DataStream.java @@ -86,8 +86,8 @@ public class DataStream extends Stream { * @param sinkFunction The sink function. * @return A new StreamSink. */ - public StreamSink sink(SinkFunction sinkFunction) { - return new StreamSink<>(this, new SinkOperator(sinkFunction)); + public DataStreamSink sink(SinkFunction sinkFunction) { + return new DataStreamSink<>(this, new SinkOperator(sinkFunction)); } /** diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/DataStreamSink.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/DataStreamSink.java new file mode 100644 index 000000000..53b6db4b6 --- /dev/null +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/DataStreamSink.java @@ -0,0 +1,21 @@ +package org.ray.streaming.api.stream; + +import org.ray.streaming.operator.impl.SinkOperator; + +/** + * Represents a sink of the DataStream. + * + * @param Type of the input data of this sink. + */ +public class DataStreamSink extends StreamSink { + + public DataStreamSink(DataStream input, SinkOperator sinkOperator) { + super(input, sinkOperator); + this.streamingContext.addSink(this); + } + + public DataStreamSink setParallelism(int parallelism) { + this.parallelism = parallelism; + return this; + } +} diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/DataStreamSource.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/DataStreamSource.java new file mode 100644 index 000000000..930f6b51a --- /dev/null +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/DataStreamSource.java @@ -0,0 +1,39 @@ +package org.ray.streaming.api.stream; + +import java.util.Collection; +import org.ray.streaming.api.context.StreamingContext; +import org.ray.streaming.api.function.impl.SourceFunction; +import org.ray.streaming.api.function.internal.CollectionSourceFunction; +import org.ray.streaming.api.partition.impl.RoundRobinPartition; +import org.ray.streaming.operator.impl.SourceOperator; + +/** + * Represents a source of the DataStream. + * + * @param The type of StreamSource data. + */ +public class DataStreamSource extends DataStream implements StreamSource { + + public DataStreamSource(StreamingContext streamingContext, SourceFunction sourceFunction) { + super(streamingContext, new SourceOperator<>(sourceFunction)); + super.partition = new RoundRobinPartition<>(); + } + + /** + * Build a DataStreamSource source from a collection. + * + * @param context Stream context. + * @param values A collection of values. + * @param The type of source data. + * @return A DataStreamSource. + */ + public static DataStreamSource buildSource( + StreamingContext context, Collection values) { + return new DataStreamSource(context, new CollectionSourceFunction(values)); + } + + public DataStreamSource setParallelism(int parallelism) { + this.parallelism = parallelism; + return this; + } +} diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/JoinStream.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/JoinStream.java index 69cb8fe79..e2117aaee 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/JoinStream.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/JoinStream.java @@ -1,10 +1,8 @@ package org.ray.streaming.api.stream; import java.io.Serializable; -import org.ray.streaming.api.context.StreamingContext; import org.ray.streaming.api.function.impl.JoinFunction; import org.ray.streaming.api.function.impl.KeyFunction; -import org.ray.streaming.operator.StreamOperator; /** * Represents a DataStream of two joined DataStream. @@ -15,10 +13,6 @@ import org.ray.streaming.operator.StreamOperator; */ public class JoinStream extends DataStream { - public JoinStream(StreamingContext streamingContext, StreamOperator streamOperator) { - super(streamingContext, streamOperator); - } - public JoinStream(DataStream leftStream, DataStream rightStream) { super(leftStream, null); } @@ -68,7 +62,7 @@ public class JoinStream extends DataStream { private KeyFunction rightKeyByFunction; public Equal(JoinStream joinStream, KeyFunction leftKeyByFunction, - KeyFunction rightKeyByFunction) { + KeyFunction rightKeyByFunction) { this.joinStream = joinStream; this.leftKeyByFunction = leftKeyByFunction; this.rightKeyByFunction = rightKeyByFunction; diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/KeyDataStream.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/KeyDataStream.java index 16cc3d104..e388bb88a 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/KeyDataStream.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/KeyDataStream.java @@ -1,6 +1,5 @@ package org.ray.streaming.api.stream; -import org.ray.streaming.api.context.StreamingContext; import org.ray.streaming.api.function.impl.AggregateFunction; import org.ray.streaming.api.function.impl.ReduceFunction; import org.ray.streaming.api.partition.impl.KeyPartition; @@ -15,10 +14,6 @@ import org.ray.streaming.operator.impl.ReduceOperator; */ public class KeyDataStream extends DataStream { - public KeyDataStream(StreamingContext streamingContext, StreamOperator streamOperator) { - super(streamingContext, streamOperator); - } - public KeyDataStream(DataStream input, StreamOperator streamOperator) { super(input, streamOperator); this.partition = new KeyPartition(); diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/Stream.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/Stream.java index 52cb53b1d..2e49f4db5 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/Stream.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/Stream.java @@ -5,6 +5,9 @@ import org.ray.streaming.api.context.StreamingContext; import org.ray.streaming.api.partition.Partition; import org.ray.streaming.api.partition.impl.RoundRobinPartition; import org.ray.streaming.operator.StreamOperator; +import org.ray.streaming.python.PythonOperator; +import org.ray.streaming.python.PythonPartition; +import org.ray.streaming.python.stream.PythonStream; /** * Abstract base class of all stream types. @@ -12,7 +15,6 @@ import org.ray.streaming.operator.StreamOperator; * @param Type of the data in the stream. */ public abstract class Stream implements Serializable { - protected int id; protected int parallelism = 1; protected StreamOperator operator; @@ -20,11 +22,16 @@ public abstract class Stream implements Serializable { protected StreamingContext streamingContext; protected Partition partition; + @SuppressWarnings("unchecked") public Stream(StreamingContext streamingContext, StreamOperator streamOperator) { this.streamingContext = streamingContext; this.operator = streamOperator; this.id = streamingContext.generateId(); - this.partition = new RoundRobinPartition<>(); + if (streamOperator instanceof PythonOperator) { + this.partition = PythonPartition.RoundRobinPartition; + } else { + this.partition = new RoundRobinPartition<>(); + } } public Stream(Stream inputStream, StreamOperator streamOperator) { @@ -33,7 +40,16 @@ public abstract class Stream implements Serializable { this.streamingContext = this.inputStream.getStreamingContext(); this.operator = streamOperator; this.id = streamingContext.generateId(); - this.partition = new RoundRobinPartition<>(); + this.partition = selectPartition(); + } + + @SuppressWarnings("unchecked") + private Partition selectPartition() { + if (inputStream instanceof PythonStream) { + return PythonPartition.RoundRobinPartition; + } else { + return new RoundRobinPartition<>(); + } } public Stream getInputStream() { @@ -44,6 +60,10 @@ public abstract class Stream implements Serializable { return operator; } + public void setOperator(StreamOperator operator) { + this.operator = operator; + } + public StreamingContext getStreamingContext() { return streamingContext; } diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/StreamSink.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/StreamSink.java index 00ec34ef4..e3bc0b66d 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/StreamSink.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/StreamSink.java @@ -1,21 +1,14 @@ package org.ray.streaming.api.stream; -import org.ray.streaming.operator.impl.SinkOperator; +import org.ray.streaming.operator.StreamOperator; /** - * Represents a sink of the DataStream. + * Represents a sink of the Stream. * * @param Type of the input data of this sink. */ public class StreamSink extends Stream { - - public StreamSink(DataStream input, SinkOperator sinkOperator) { - super(input, sinkOperator); - this.streamingContext.addSink(this); - } - - public StreamSink setParallelism(int parallelism) { - this.parallelism = parallelism; - return this; + public StreamSink(Stream inputStream, StreamOperator streamOperator) { + super(inputStream, streamOperator); } } diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/StreamSource.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/StreamSource.java index 05d1ddd91..e6e7df3e7 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/StreamSource.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/stream/StreamSource.java @@ -1,36 +1,9 @@ package org.ray.streaming.api.stream; -import java.util.Collection; -import org.ray.streaming.api.context.StreamingContext; -import org.ray.streaming.api.function.impl.SourceFunction; -import org.ray.streaming.api.function.internal.CollectionSourceFunction; -import org.ray.streaming.operator.impl.SourceOperator; - /** - * Represents a source of the DataStream. + * A mark interface that represents a source of the Stream. * * @param The type of StreamSource data. */ -public class StreamSource extends DataStream { - - public StreamSource(StreamingContext streamingContext, SourceFunction sourceFunction) { - super(streamingContext, new SourceOperator<>(sourceFunction)); - } - - /** - * Build a StreamSource source from a collection. - * - * @param context Stream context. - * @param values A collection of values. - * @param The type of source data. - * @return A StreamSource. - */ - public static StreamSource buildSource(StreamingContext context, Collection values) { - return new StreamSource(context, new CollectionSourceFunction(values)); - } - - public StreamSource setParallelism(int parallelism) { - this.parallelism = parallelism; - return this; - } +public interface StreamSource { } diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/plan/PlanBuilder.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/plan/PlanBuilder.java index 5ae09cdbc..3b12cca46 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/plan/PlanBuilder.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/plan/PlanBuilder.java @@ -7,6 +7,7 @@ import org.ray.streaming.api.stream.Stream; import org.ray.streaming.api.stream.StreamSink; import org.ray.streaming.api.stream.StreamSource; import org.ray.streaming.operator.StreamOperator; +import org.ray.streaming.python.stream.PythonDataStream; public class PlanBuilder { @@ -44,7 +45,7 @@ public class PlanBuilder { processStream(parentStream); } else if (stream instanceof StreamSource) { planVertex = new PlanVertex(vertexId, parallelism, VertexType.SOURCE, streamOperator); - } else if (stream instanceof DataStream) { + } else if (stream instanceof DataStream || stream instanceof PythonDataStream) { planVertex = new PlanVertex(vertexId, parallelism, VertexType.PROCESS, streamOperator); Stream parentStream = stream.getInputStream(); int inputVertexId = parentStream.getId(); diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/PythonFunction.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/PythonFunction.java new file mode 100644 index 000000000..9751d7176 --- /dev/null +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/PythonFunction.java @@ -0,0 +1,92 @@ +package org.ray.streaming.python; + +import org.ray.streaming.api.function.Function; + +/** + * Represents a user defined python function. + * + *

Python worker can use information in this class to create a function object.

+ * + *

If this object is constructed from serialized python function, + * python worker can deserialize it to create python function directly. + * If this object is constructed from moduleName and className/functionName, + * python worker will use `importlib` to load python function.

+ * + *

If the python data stream api is invoked from python, `function` will be not null.

+ *

If the python data stream api is invoked from java, `moduleName` and + * `className`/`functionName` will be not null.

+ *

+ * TODO serialize to bytes using protobuf + */ +public class PythonFunction implements Function { + public enum FunctionInterface { + SOURCE_FUNCTION("ray.streaming.function.SourceFunction"), + MAP_FUNCTION("ray.streaming.function.MapFunction"), + FLAT_MAP_FUNCTION("ray.streaming.function.FlatMapFunction"), + FILTER_FUNCTION("ray.streaming.function.FilterFunction"), + KEY_FUNCTION("ray.streaming.function.KeyFunction"), + REDUCE_FUNCTION("ray.streaming.function.ReduceFunction"), + SINK_FUNCTION("ray.streaming.function.SinkFunction"); + + private String functionInterface; + + FunctionInterface(String functionInterface) { + this.functionInterface = functionInterface; + } + } + + private byte[] function; + private String moduleName; + private String className; + private String functionName; + /** + * FunctionInterface can be used to validate python function, + * and look up operator class from FunctionInterface. + */ + private String functionInterface; + + private PythonFunction(byte[] function, + String moduleName, + String className, + String functionName) { + this.function = function; + this.moduleName = moduleName; + this.className = className; + this.functionName = functionName; + } + + public void setFunctionInterface(FunctionInterface functionInterface) { + this.functionInterface = functionInterface.functionInterface; + } + + /** + * Create a {@link PythonFunction} using python serialized function + * + * @param function serialized python function sent from python driver + */ + public static PythonFunction fromFunction(byte[] function) { + return new PythonFunction(function, null, null, null); + } + + /** + * Create a {@link PythonFunction} using moduleName and + * className. + * + * @param moduleName python module name + * @param className python class name + */ + public static PythonFunction fromClassName(String moduleName, String className) { + return new PythonFunction(null, moduleName, className, null); + } + + /** + * Create a {@link PythonFunction} using moduleName and + * functionName. + * + * @param moduleName python module name + * @param functionName python function name + */ + public static PythonFunction fromFunctionName(String moduleName, String functionName) { + return new PythonFunction(null, moduleName, null, functionName); + } +} diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/PythonOperator.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/PythonOperator.java new file mode 100644 index 000000000..179d6ef2e --- /dev/null +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/PythonOperator.java @@ -0,0 +1,43 @@ +package org.ray.streaming.python; + +import java.util.List; +import org.ray.streaming.api.context.RuntimeContext; +import org.ray.streaming.operator.OperatorType; +import org.ray.streaming.operator.StreamOperator; + +/** + * Represents a {@link StreamOperator} that wraps python {@link PythonFunction}. + */ +@SuppressWarnings("unchecked") +public class PythonOperator extends StreamOperator { + + public PythonOperator(PythonFunction function) { + super(function); + } + + @Override + public void open(List list, RuntimeContext runtimeContext) { + String msg = String.format("Methods of %s shouldn't be called.", getClass().getSimpleName()); + throw new UnsupportedOperationException(msg); + } + + @Override + public void finish() { + String msg = String.format("Methods of %s shouldn't be called.", getClass().getSimpleName()); + throw new UnsupportedOperationException(msg); + } + + @Override + public void close() { + String msg = String.format("Methods of %s shouldn't be called.", getClass().getSimpleName()); + throw new UnsupportedOperationException(msg); + } + + @Override + public OperatorType getOpType() { + String msg = String.format("Methods of %s shouldn't be called.", getClass().getSimpleName()); + throw new UnsupportedOperationException(msg); + } + + +} diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/PythonPartition.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/PythonPartition.java new file mode 100644 index 000000000..8067855da --- /dev/null +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/PythonPartition.java @@ -0,0 +1,48 @@ +package org.ray.streaming.python; + +import org.ray.streaming.api.partition.Partition; + +/** + * Represents a python partition function. + *

+ * Python worker can create a partition object using information in this + * PythonPartition. + *

+ * If this object is constructed from serialized python partition, + * python worker can deserialize it to create python partition directly. + * If this object is constructed from moduleName and className/functionName, + * python worker will use `importlib` to load python partition function. + *

+ * TODO serialize to bytes using protobuf + */ +public class PythonPartition implements Partition { + public static final PythonPartition BroadcastPartition = new PythonPartition( + "ray.streaming.partition", "BroadcastPartition", null); + public static final PythonPartition KeyPartition = new PythonPartition( + "ray.streaming.partition", "KeyPartition", null); + public static final PythonPartition RoundRobinPartition = new PythonPartition( + "ray.streaming.partition", "RoundRobinPartition", null); + + private byte[] partition; + private String moduleName; + private String className; + private String functionName; + + public PythonPartition(byte[] partition) { + this.partition = partition; + } + + public PythonPartition(String moduleName, String className, String functionName) { + this.moduleName = moduleName; + this.className = className; + this.functionName = functionName; + } + + @Override + public int[] partition(Object record, int numPartition) { + String msg = String.format("partition method of %s shouldn't be called.", + getClass().getSimpleName()); + throw new UnsupportedOperationException(msg); + } + +} diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/stream/PythonDataStream.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/stream/PythonDataStream.java new file mode 100644 index 000000000..39cb89b45 --- /dev/null +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/stream/PythonDataStream.java @@ -0,0 +1,100 @@ +package org.ray.streaming.python.stream; + +import org.ray.streaming.api.context.StreamingContext; +import org.ray.streaming.api.stream.Stream; +import org.ray.streaming.python.PythonFunction; +import org.ray.streaming.python.PythonFunction.FunctionInterface; +import org.ray.streaming.python.PythonOperator; +import org.ray.streaming.python.PythonPartition; + +/** + * Represents a stream of data whose transformations will be executed in python. + */ +public class PythonDataStream extends Stream implements PythonStream { + + protected PythonDataStream(StreamingContext streamingContext, + PythonOperator pythonOperator) { + super(streamingContext, pythonOperator); + } + + protected PythonDataStream(Stream inputStream, PythonOperator pythonOperator) { + super(inputStream, pythonOperator); + } + + /** + * Apply a map function to this stream. + * + * @param func The python MapFunction. + * @return A new PythonDataStream. + */ + public PythonDataStream map(PythonFunction func) { + func.setFunctionInterface(FunctionInterface.MAP_FUNCTION); + return new PythonDataStream(this, new PythonOperator(func)); + } + + /** + * Apply a flat-map function to this stream. + * + * @param func The python FlapMapFunction. + * @return A new PythonDataStream + */ + public PythonDataStream flatMap(PythonFunction func) { + func.setFunctionInterface(FunctionInterface.FLAT_MAP_FUNCTION); + return new PythonDataStream(this, new PythonOperator(func)); + } + + /** + * Apply a sink function and get a StreamSink. + * + * @param func The python SinkFunction. + * @return A new StreamSink. + */ + public PythonStreamSink sink(PythonFunction func) { + func.setFunctionInterface(FunctionInterface.SINK_FUNCTION); + return new PythonStreamSink(this, new PythonOperator(func)); + } + + /** + * Apply a key-by function to this stream. + * + * @param func the python keyFunction. + * @return A new KeyDataStream. + */ + public PythonKeyDataStream keyBy(PythonFunction func) { + func.setFunctionInterface(FunctionInterface.KEY_FUNCTION); + return new PythonKeyDataStream(this, new PythonOperator(func)); + } + + /** + * Apply broadcast to this stream. + * + * @return This stream. + */ + public PythonDataStream broadcast() { + this.partition = PythonPartition.BroadcastPartition; + return this; + } + + /** + * Apply a partition to this stream. + * + * @param partition The partitioning strategy. + * @return This stream. + */ + public PythonDataStream partitionBy(PythonPartition partition) { + this.partition = partition; + return this; + } + + /** + * Set parallelism to current transformation. + * + * @param parallelism The parallelism to set. + * @return This stream. + */ + public PythonDataStream setParallelism(int parallelism) { + this.parallelism = parallelism; + return this; + } + +} diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/stream/PythonKeyDataStream.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/stream/PythonKeyDataStream.java new file mode 100644 index 000000000..2042e478c --- /dev/null +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/stream/PythonKeyDataStream.java @@ -0,0 +1,36 @@ +package org.ray.streaming.python.stream; + +import org.ray.streaming.api.stream.Stream; +import org.ray.streaming.operator.StreamOperator; +import org.ray.streaming.python.PythonFunction; +import org.ray.streaming.python.PythonFunction.FunctionInterface; +import org.ray.streaming.python.PythonOperator; +import org.ray.streaming.python.PythonPartition; + +/** + * Represents a python DataStream returned by a key-by operation. + */ +public class PythonKeyDataStream extends Stream implements PythonStream { + + public PythonKeyDataStream(PythonDataStream input, StreamOperator streamOperator) { + super(input, streamOperator); + this.partition = PythonPartition.KeyPartition; + } + + /** + * Apply a reduce function to this stream. + * + * @param func The reduce function. + * @return A new DataStream. + */ + public PythonDataStream reduce(PythonFunction func) { + func.setFunctionInterface(FunctionInterface.REDUCE_FUNCTION); + return new PythonDataStream(this, new PythonOperator(func)); + } + + public PythonKeyDataStream setParallelism(int parallelism) { + this.parallelism = parallelism; + return this; + } + +} diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/stream/PythonStream.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/stream/PythonStream.java new file mode 100644 index 000000000..985a90262 --- /dev/null +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/stream/PythonStream.java @@ -0,0 +1,7 @@ +package org.ray.streaming.python.stream; + +/** + * A marker interface used to identify all python streams. + */ +public interface PythonStream { +} diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/stream/PythonStreamSink.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/stream/PythonStreamSink.java new file mode 100644 index 000000000..ef691cf05 --- /dev/null +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/stream/PythonStreamSink.java @@ -0,0 +1,20 @@ +package org.ray.streaming.python.stream; + +import org.ray.streaming.api.stream.StreamSink; +import org.ray.streaming.python.PythonOperator; + +/** + * Represents a sink of the PythonStream. + */ +public class PythonStreamSink extends StreamSink implements PythonStream { + public PythonStreamSink(PythonDataStream input, PythonOperator sinkOperator) { + super(input, null); + this.streamingContext.addSink(this); + } + + public PythonStreamSink setParallelism(int parallelism) { + this.parallelism = parallelism; + return this; + } + +} diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/stream/PythonStreamSource.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/stream/PythonStreamSource.java new file mode 100644 index 000000000..425086834 --- /dev/null +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/python/stream/PythonStreamSource.java @@ -0,0 +1,31 @@ +package org.ray.streaming.python.stream; + +import org.ray.streaming.api.context.StreamingContext; +import org.ray.streaming.api.stream.StreamSource; +import org.ray.streaming.python.PythonFunction; +import org.ray.streaming.python.PythonFunction.FunctionInterface; +import org.ray.streaming.python.PythonOperator; +import org.ray.streaming.python.PythonPartition; + +/** + * Represents a source of the PythonStream. + */ +public class PythonStreamSource extends PythonDataStream implements StreamSource { + + private PythonStreamSource(StreamingContext streamingContext, PythonFunction sourceFunction) { + super(streamingContext, new PythonOperator(sourceFunction)); + super.partition = PythonPartition.RoundRobinPartition; + } + + public PythonStreamSource setParallelism(int parallelism) { + this.parallelism = parallelism; + return this; + } + + public static PythonStreamSource from(StreamingContext streamingContext, + PythonFunction sourceFunction) { + sourceFunction.setFunctionInterface(FunctionInterface.SOURCE_FUNCTION); + return new PythonStreamSource(streamingContext, sourceFunction); + } + +} diff --git a/streaming/java/streaming-api/src/test/java/org/ray/streaming/plan/PlanBuilderTest.java b/streaming/java/streaming-api/src/test/java/org/ray/streaming/plan/PlanBuilderTest.java index bf631d61b..0e02f8c7b 100644 --- a/streaming/java/streaming-api/src/test/java/org/ray/streaming/plan/PlanBuilderTest.java +++ b/streaming/java/streaming-api/src/test/java/org/ray/streaming/plan/PlanBuilderTest.java @@ -6,8 +6,8 @@ import org.ray.streaming.api.context.StreamingContext; import org.ray.streaming.api.partition.impl.KeyPartition; import org.ray.streaming.api.partition.impl.RoundRobinPartition; import org.ray.streaming.api.stream.DataStream; -import org.ray.streaming.api.stream.StreamSink; -import org.ray.streaming.api.stream.StreamSource; +import org.ray.streaming.api.stream.DataStreamSink; +import org.ray.streaming.api.stream.DataStreamSource; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,9 +39,9 @@ public class PlanBuilderTest { public Plan buildDataSyncPlan() { StreamingContext streamingContext = StreamingContext.buildContext(); - DataStream dataStream = StreamSource.buildSource(streamingContext, + DataStream dataStream = DataStreamSource.buildSource(streamingContext, Lists.newArrayList("a", "b", "c")); - StreamSink streamSink = dataStream.sink(x -> LOGGER.info(x)); + DataStreamSink streamSink = dataStream.sink(x -> LOGGER.info(x)); PlanBuilder planBuilder = new PlanBuilder(Lists.newArrayList(streamSink)); Plan plan = planBuilder.buildPlan(); @@ -74,9 +74,9 @@ public class PlanBuilderTest { public Plan buildKeyByPlan() { StreamingContext streamingContext = StreamingContext.buildContext(); - DataStream dataStream = StreamSource.buildSource(streamingContext, + DataStream dataStream = DataStreamSource.buildSource(streamingContext, Lists.newArrayList("1", "2", "3", "4")); - StreamSink streamSink = dataStream.keyBy(x -> x) + DataStreamSink streamSink = dataStream.keyBy(x -> x) .sink(x -> LOGGER.info(x)); PlanBuilder planBuilder = new PlanBuilder(Lists.newArrayList(streamSink)); diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionEdge.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionEdge.java index ae9ecf841..966834d4d 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionEdge.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionEdge.java @@ -8,7 +8,6 @@ import org.ray.streaming.api.partition.Partition; * An edge in the physical execution graph. */ public class ExecutionEdge implements Serializable { - private int srcNodeId; private int targetNodeId; private Partition partition; diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionNode.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionNode.java index 4b120ead6..019583810 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionNode.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionNode.java @@ -3,18 +3,17 @@ package org.ray.streaming.runtime.core.graph; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import org.ray.streaming.operator.StreamOperator; import org.ray.streaming.plan.VertexType; -import org.ray.streaming.runtime.core.processor.StreamProcessor; /** * A node in the physical execution graph. */ public class ExecutionNode implements Serializable { - private int nodeId; private int parallelism; private NodeType nodeType; - private StreamProcessor streamProcessor; + private StreamOperator streamOperator; private List executionTasks; private List inputsEdges; private List outputEdges; @@ -71,12 +70,12 @@ public class ExecutionNode implements Serializable { return inputsEdges; } - public StreamProcessor getStreamProcessor() { - return streamProcessor; + public StreamOperator getStreamOperator() { + return streamOperator; } - public void setStreamProcessor(StreamProcessor streamProcessor) { - this.streamProcessor = streamProcessor; + public void setStreamOperator(StreamOperator streamOperator) { + this.streamOperator = streamOperator; } public NodeType getNodeType() { @@ -102,7 +101,7 @@ public class ExecutionNode implements Serializable { sb.append("nodeId=").append(nodeId); sb.append(", parallelism=").append(parallelism); sb.append(", nodeType=").append(nodeType); - sb.append(", streamProcessor=").append(streamProcessor); + sb.append(", streamOperator=").append(streamOperator); sb.append('}'); return sb.toString(); } diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionTask.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionTask.java index 7e205d51f..d56c30532 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionTask.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionTask.java @@ -11,7 +11,6 @@ import org.ray.streaming.runtime.worker.JobWorker; * An ExecutionNode has n ExecutionTasks if parallelism is n. */ public class ExecutionTask implements Serializable { - private int taskId; private int taskIndex; private RayActor worker; diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/TaskAssignerImpl.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/TaskAssignerImpl.java index 1c15be9c2..2c6b70a9c 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/TaskAssignerImpl.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/TaskAssignerImpl.java @@ -13,8 +13,6 @@ import org.ray.streaming.runtime.core.graph.ExecutionEdge; import org.ray.streaming.runtime.core.graph.ExecutionGraph; import org.ray.streaming.runtime.core.graph.ExecutionNode; import org.ray.streaming.runtime.core.graph.ExecutionTask; -import org.ray.streaming.runtime.core.processor.ProcessBuilder; -import org.ray.streaming.runtime.core.processor.StreamProcessor; import org.ray.streaming.runtime.worker.JobWorker; public class TaskAssignerImpl implements TaskAssigner { @@ -42,10 +40,8 @@ public class TaskAssignerImpl implements TaskAssigner { vertexTasks.add(new ExecutionTask(taskId, taskIndex, workers.get(taskId))); taskId++; } - StreamProcessor streamProcessor = ProcessBuilder - .buildProcessor(planVertex.getStreamOperator()); executionNode.setExecutionTasks(vertexTasks); - executionNode.setStreamProcessor(streamProcessor); + executionNode.setStreamOperator(planVertex.getStreamOperator()); idToExecutionNode.put(executionNode.getNodeId(), executionNode); } diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/JobWorker.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/JobWorker.java index bb7e607b0..47950b0cf 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/JobWorker.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/JobWorker.java @@ -11,6 +11,7 @@ import org.ray.streaming.runtime.core.graph.ExecutionNode; import org.ray.streaming.runtime.core.graph.ExecutionNode.NodeType; import org.ray.streaming.runtime.core.graph.ExecutionTask; import org.ray.streaming.runtime.core.processor.OneInputProcessor; +import org.ray.streaming.runtime.core.processor.ProcessBuilder; import org.ray.streaming.runtime.core.processor.SourceProcessor; import org.ray.streaming.runtime.core.processor.StreamProcessor; import org.ray.streaming.runtime.transfer.TransferHandler; @@ -54,7 +55,8 @@ public class JobWorker implements Serializable { this.executionNode = executionGraph.getExecutionNodeByTaskId(taskId); this.nodeType = executionNode.getNodeType(); - this.streamProcessor = executionNode.getStreamProcessor(); + this.streamProcessor = ProcessBuilder + .buildProcessor(executionNode.getStreamOperator()); LOGGER.debug("Initializing StreamWorker, taskId: {}, operator: {}.", taskId, streamProcessor); String channelType = (String) this.config.getOrDefault( diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/tasks/StreamTask.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/tasks/StreamTask.java index 7d4f397ba..cdea7092d 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/tasks/StreamTask.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/tasks/StreamTask.java @@ -9,6 +9,7 @@ import org.ray.api.RayActor; import org.ray.api.id.ActorId; import org.ray.streaming.api.collector.Collector; import org.ray.streaming.api.context.RuntimeContext; +import org.ray.streaming.api.partition.Partition; import org.ray.streaming.runtime.core.collector.OutputCollector; import org.ray.streaming.runtime.core.graph.ExecutionEdge; import org.ray.streaming.runtime.core.graph.ExecutionGraph; @@ -81,7 +82,8 @@ public abstract class StreamTask implements Runnable { DataWriter writer = new DataWriter(channelIDs, toActorIds, queueConf); LOG.info("Create DataWriter succeed."); writers.put(edge, writer); - collectors.add(new OutputCollector(channelIDs, writer, edge.getPartition())); + Partition partition = edge.getPartition(); + collectors.add(new OutputCollector(channelIDs, writer, partition)); } } diff --git a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/demo/WordCountTest.java b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/demo/WordCountTest.java index aae096f45..40246abdc 100644 --- a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/demo/WordCountTest.java +++ b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/demo/WordCountTest.java @@ -5,7 +5,7 @@ import org.ray.streaming.api.context.StreamingContext; import org.ray.streaming.api.function.impl.FlatMapFunction; import org.ray.streaming.api.function.impl.ReduceFunction; import org.ray.streaming.api.function.impl.SinkFunction; -import org.ray.streaming.api.stream.StreamSource; +import org.ray.streaming.api.stream.DataStreamSource; import org.ray.streaming.runtime.BaseUnitTest; import org.ray.streaming.util.Config; import java.io.Serializable; @@ -36,7 +36,7 @@ public class WordCountTest extends BaseUnitTest implements Serializable { streamingContext.withConfig(config); List text = new ArrayList<>(); text.add("hello world eagle eagle eagle"); - StreamSource streamSource = StreamSource.buildSource(streamingContext, text); + DataStreamSource streamSource = DataStreamSource.buildSource(streamingContext, text); streamSource .flatMap((FlatMapFunction) (value, collector) -> { String[] records = value.split(" "); diff --git a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/schedule/TaskAssignerImplTest.java b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/schedule/TaskAssignerImplTest.java index e3beefa62..c9f108d8b 100644 --- a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/schedule/TaskAssignerImplTest.java +++ b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/schedule/TaskAssignerImplTest.java @@ -11,8 +11,8 @@ import org.ray.runtime.actor.LocalModeRayActor; import org.ray.streaming.api.context.StreamingContext; import org.ray.streaming.api.partition.impl.RoundRobinPartition; import org.ray.streaming.api.stream.DataStream; -import org.ray.streaming.api.stream.StreamSink; -import org.ray.streaming.api.stream.StreamSource; +import org.ray.streaming.api.stream.DataStreamSink; +import org.ray.streaming.api.stream.DataStreamSource; import org.ray.streaming.runtime.BaseUnitTest; import org.ray.streaming.runtime.core.graph.ExecutionEdge; import org.ray.streaming.runtime.core.graph.ExecutionGraph; @@ -65,9 +65,9 @@ public class TaskAssignerImplTest extends BaseUnitTest { public Plan buildDataSyncPlan() { StreamingContext streamingContext = StreamingContext.buildContext(); - DataStream dataStream = StreamSource.buildSource(streamingContext, + DataStream dataStream = DataStreamSource.buildSource(streamingContext, Lists.newArrayList("a", "b", "c")); - StreamSink streamSink = dataStream.sink(x -> LOGGER.info(x)); + DataStreamSink streamSink = dataStream.sink(x -> LOGGER.info(x)); PlanBuilder planBuilder = new PlanBuilder(Lists.newArrayList(streamSink)); Plan plan = planBuilder.buildPlan(); diff --git a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/streamingqueue/StreamingQueueTest.java b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/streamingqueue/StreamingQueueTest.java index b0e7f0759..be4c1399a 100644 --- a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/streamingqueue/StreamingQueueTest.java +++ b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/streamingqueue/StreamingQueueTest.java @@ -20,7 +20,7 @@ import org.ray.api.options.ActorCreationOptions.Builder; import org.ray.streaming.api.context.StreamingContext; import org.ray.streaming.api.function.impl.FlatMapFunction; import org.ray.streaming.api.function.impl.ReduceFunction; -import org.ray.streaming.api.stream.StreamSource; +import org.ray.streaming.api.stream.DataStreamSource; import org.ray.streaming.runtime.BaseUnitTest; import org.ray.streaming.runtime.transfer.ChannelID; import org.ray.streaming.runtime.util.EnvUtil; @@ -158,7 +158,7 @@ public class StreamingQueueTest extends BaseUnitTest implements Serializable { streamingContext.withConfig(config); List text = new ArrayList<>(); text.add("hello world eagle eagle eagle"); - StreamSource streamSource = StreamSource.buildSource(streamingContext, text); + DataStreamSource streamSource = DataStreamSource.buildSource(streamingContext, text); streamSource .flatMap((FlatMapFunction) (value, collector) -> { String[] records = value.split(" ");