[Streaming] Streaming data transfer java (#6474)

This commit is contained in:
Chaokun Yang
2019-12-22 10:56:05 +08:00
committed by Hao Chen
parent 1b14fbe179
commit 7bbfa85c66
146 changed files with 3923 additions and 786 deletions
+213
View File
@@ -0,0 +1,213 @@
load("//bazel:ray.bzl", "define_java_module")
load("@rules_proto_grpc//java:defs.bzl", "java_proto_compile")
# Make the TestNG suite file referenceable as a label (it is used as `data` of
# the `all_streaming_tests` binary below).
exports_files([
"testng.xml",
])
# Names of the Java modules defined in this package. The list is reused below to
# derive the jar names, dependency labels and pom labels of every module.
all_modules = [
"streaming-api",
"streaming-runtime",
]
# Single import target bundling, for every module in `all_modules`, its binary jar
# and its source jar, plus the deploy jar (and source jar) of the test binary.
java_import(
name = "all_modules",
jars = [
# Binary jar of each module.
"liborg_ray_ray_" + module + ".jar"
for module in all_modules
] + [
# Source jar of each module.
"liborg_ray_ray_" + module + "-src.jar"
for module in all_modules
] + [
# Deploy jars of the `all_streaming_tests` binary.
"all_streaming_tests_deploy.jar",
"all_streaming_tests_deploy-src.jar",
],
deps = [
":org_ray_ray_" + module
for module in all_modules
] + [
":all_streaming_tests",
],
)
# The `streaming-api` module. `define_java_module` is loaded from //bazel:ray.bzl;
# the labels used elsewhere in this file show that it yields `:org_ray_ray_streaming-api`,
# a `_test` library (requested via define_test_lib) and a `_pom` target.
define_java_module(
name = "streaming-api",
define_test_lib = True,
# Dependencies of the test library only.
test_deps = [
"//java:org_ray_ray_api",
":org_ray_ray_streaming-api",
"@ray_streaming_maven//:com_google_guava_guava",
"@ray_streaming_maven//:org_slf4j_slf4j_api",
"@ray_streaming_maven//:org_slf4j_slf4j_log4j12",
"@ray_streaming_maven//:org_testng_testng",
],
visibility = ["//visibility:public"],
# Dependencies of the main library.
deps = [
"@ray_streaming_maven//:com_google_guava_guava",
"@ray_streaming_maven//:org_slf4j_slf4j_api",
"@ray_streaming_maven//:org_slf4j_slf4j_log4j12",
],
)
# By default `//streaming:streaming_java` would be packaged under the `streaming` directory
# of the jar, but it needs to be located at the jar root.
# Setting resource_strip_prefix = "streaming" would place the other resource files at the wrong path,
# so the native libs are copied explicitly to drop the `streaming` path prefix.
# Native libraries bundled as resources of the `streaming-runtime` module and
# copied for Maven by `gen_maven_deps`.
filegroup(
name = "java_native_deps",
srcs = [":streaming_java"],
)
# Platform-specific copy of the native streaming library: the `.dylib`-named copy
# on darwin, the `.so`-named copy on every other platform.
filegroup(
name = "streaming_java",
srcs = select({
"@bazel_tools//src/conditions:darwin": [":streaming_java_darwin"],
"//conditions:default": [":streaming_java_linux"],
}),
visibility = ["//visibility:public"],
)
# Copies //streaming:libstreaming_java.so into this package under the darwin
# file name (`.dylib`). The source label keeps the `.so` suffix on purpose: it is
# the name of the target in //streaming, only the output is renamed.
genrule(
name = "streaming_java_darwin",
srcs = ["//streaming:libstreaming_java.so"],
outs = ["libstreaming_java.dylib"],
cmd = "cp $< $@",
output_to_bindir = 1,
)
# Copies //streaming:libstreaming_java.so into this package, keeping the `.so` name.
genrule(
name = "streaming_java_linux",
srcs = ["//streaming:libstreaming_java.so"],
outs = ["libstreaming_java.so"],
cmd = "cp $< $@",
output_to_bindir = 1,
)
# The `streaming-runtime` module: depends on `streaming-api` and the Ray Java
# api/runtime, bundles the native library and compiles the protobuf-generated sources.
define_java_module(
name = "streaming-runtime",
# Native library packaged as a resource of the jar.
additional_resources = [
":java_native_deps",
],
# Protobuf-generated Java sources produced by `streaming_java_proto`.
additional_srcs = [
":all_java_proto",
],
define_test_lib = True,
# The `generated` directory in the source tree is the one `cp_java_generated`
# recreates for Maven; it is excluded here, presumably so the generated classes
# are compiled only once (from `:all_java_proto`).
exclude_srcs = [
"streaming-runtime/src/main/java/org/ray/streaming/runtime/generated/*.java",
],
test_deps = [
"//java:org_ray_ray_api",
"//java:org_ray_ray_runtime",
":org_ray_ray_streaming-api",
":org_ray_ray_streaming-runtime",
"@ray_streaming_maven//:com_google_guava_guava",
"@ray_streaming_maven//:org_slf4j_slf4j_api",
"@ray_streaming_maven//:org_slf4j_slf4j_log4j12",
"@ray_streaming_maven//:org_testng_testng",
],
visibility = ["//visibility:public"],
deps = [
":org_ray_ray_streaming-api",
"//java:org_ray_ray_api",
"//java:org_ray_ray_runtime",
"@ray_streaming_maven//:com_github_davidmoten_flatbuffers_java",
"@ray_streaming_maven//:com_google_guava_guava",
"@ray_streaming_maven//:com_google_protobuf_protobuf_java",
"@ray_streaming_maven//:org_slf4j_slf4j_api",
"@ray_streaming_maven//:org_slf4j_slf4j_log4j12",
],
)
# Runs the TestNG suite described by testng.xml over the test libraries of both modules.
java_binary(
name = "all_streaming_tests",
# Path of the suite file, passed as the argument of org.testng.TestNG.
args = ["streaming/java/testng.xml"],
data = ["testng.xml"],
main_class = "org.testng.TestNG",
runtime_deps = [
":org_ray_ray_streaming-api_test",
":org_ray_ray_streaming-runtime",
":org_ray_ray_streaming-runtime_test",
"//java:org_ray_ray_runtime",
"@ray_streaming_maven//:com_beust_jcommander",
"@ray_streaming_maven//:org_testng_testng",
],
)
# Protocol Buffers: generate the Java sources for //streaming:streaming_proto.
java_proto_compile(
name = "streaming_java_proto",
deps = ["//streaming:streaming_proto"],
)
# All generated proto outputs; used as `additional_srcs` of `streaming-runtime`
# and unpacked into the source tree by `cp_java_generated`.
filegroup(
name = "all_java_proto",
srcs = [
":streaming_java_proto",
],
)
# Copies the pom file generated for each module to
# streaming/java/<module>/pom.xml under the directory the command runs in.
# The declared output is only a timestamp marker file.
genrule(
name = "copy_pom_file",
srcs = [
"//streaming/java:org_ray_ray_" + module + "_pom"
for module in all_modules
],
outs = ["copy_pom_file.out"],
# NOTE: the modules are spelled out explicitly in the command below; keep the
# `cp` lines in sync with `all_modules` when a module is added or removed.
cmd = """
set -x
WORK_DIR=$$(pwd)
cp -f $(location //streaming/java:org_ray_ray_streaming-api_pom) $$WORK_DIR/streaming/java/streaming-api/pom.xml
cp -f $(location //streaming/java:org_ray_ray_streaming-runtime_pom) $$WORK_DIR/streaming/java/streaming-runtime/pom.xml
echo $$(date) > $@
""",
# The command writes files that are not declared outputs, so it is run locally
# and never cached.
local = 1,
tags = ["no-cache"],
)
# Recreates the `generated` source directory of streaming-runtime and unpacks the
# protobuf-generated sources into streaming-runtime/src/main/java.
# Listing `:copy_pom_file` in srcs makes that rule run first. The declared output
# is only a timestamp marker file.
genrule(
name = "cp_java_generated",
srcs = [
":all_java_proto",
":copy_pom_file",
],
outs = ["cp_java_generated.out"],
# Every file of `all_java_proto` is treated as a zip archive and extracted
# (without its manifest) on top of the source root.
cmd = """
set -x
WORK_DIR=$$(pwd)
GENERATED_DIR=$$WORK_DIR/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/generated
rm -rf $$GENERATED_DIR
mkdir -p $$GENERATED_DIR
# Copy protobuf-generated files.
for f in $(locations //streaming/java:all_java_proto); do
unzip $$f -x META-INF/MANIFEST.MF -d $$WORK_DIR/streaming/java/streaming-runtime/src/main/java
done
echo $$(date) > $@
""",
# The command writes files that are not declared outputs, so it is run locally
# and never cached.
local = 1,
tags = ["no-cache"],
)
# Generates the dependencies needed by maven: recreates
# streaming-runtime/native_dependencies/ and copies the native libraries into it.
# Listing `:cp_java_generated` in srcs makes the generated sources (and, through
# it, the pom files) get refreshed first. The declared output is only a timestamp
# marker file.
genrule(
name = "gen_maven_deps",
srcs = [
":java_native_deps",
":cp_java_generated",
],
outs = ["gen_maven_deps.out"],
cmd = """
set -x
WORK_DIR=$$(pwd)
# Copy native dependencies.
NATIVE_DEPS_DIR=$$WORK_DIR/streaming/java/streaming-runtime/native_dependencies/
rm -rf $$NATIVE_DEPS_DIR
mkdir -p $$NATIVE_DEPS_DIR
for f in $(locations //streaming/java:java_native_deps); do
chmod +w $$f
cp $$f $$NATIVE_DEPS_DIR
done
echo $$(date) > $@
""",
# The command writes files that are not declared outputs, so it is run locally
# and never cached.
local = 1,
tags = ["no-cache"],
)
@@ -0,0 +1,14 @@
<!DOCTYPE suppressions PUBLIC
"-//Puppy Crawl//DTD Suppressions 1.1//EN"
"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
<suppressions>
<!-- Checks that are switched off for every file. -->
<suppress checks="OperatorWrap" files=".*" />
<suppress checks="JavadocParagraph" files=".*" />
<suppress checks="SummaryJavadoc" files=".*" />
<suppress checks="AbbreviationAsWordInNameCheck" files=".*"/>
<!-- ClassTypeParameterName is switched off only for these two task classes. -->
<suppress checks="ClassTypeParameterName" files="OneInputStreamTask.java"/>
<suppress checks="ClassTypeParameterName" files="StreamTask.java"/>
<!-- Suppress every check for generated sources under org/ray/streaming/runtime/generated/
     (the character class accepts both "/" and "\" as path separator). -->
<suppress checks=".*" files="org[\\/]ray[\\/]streaming[\\/]runtime[\\/]generated[\\/]" />
</suppressions>
+20
View File
@@ -0,0 +1,20 @@
load("@rules_jvm_external//:defs.bzl", "maven_install")
def gen_streaming_java_deps():
    """Declares the `ray_streaming_maven` repository with the Maven artifacts of streaming Java.

    The artifacts are exposed as `@ray_streaming_maven//:<group>_<artifact>` labels.
    """
    maven_install(
        name = "ray_streaming_maven",
        artifacts = [
            "com.beust:jcommander:1.72",
            "com.google.guava:guava:27.0.1-jre",
            "com.github.davidmoten:flatbuffers-java:1.9.0.1",
            "com.google.protobuf:protobuf-java:3.8.0",
            "de.ruedigermoeller:fst:2.57",
            # Same version as slf4j-log4j12 below and as declared in the Maven pom
            # files; the binding and the API must not be pinned to different versions.
            "org.slf4j:slf4j-api:1.7.25",
            "org.slf4j:slf4j-log4j12:1.7.25",
            "org.apache.logging.log4j:log4j-core:2.8.2",
            "org.testng:testng:6.9.10",
        ],
        repositories = [
            "https://repo1.maven.org/maven2/",
        ],
    )
+154
View File
@@ -0,0 +1,154 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<groupId>org.ray</groupId>
<artifactId>ray-streaming</artifactId>
<version>0.1-SNAPSHOT</version>
<name>ray streaming</name>
<description>ray streaming</description>
<modules>
<module>streaming-api</module>
<module>streaming-runtime</module>
</modules>
<properties>
<!-- Java source/target level used by the compiler plugin below. -->
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- NOTE(review): "projetct" looks like a typo of "project". Confirm that no module
     references ${projetct.version} before renaming or removing this property. -->
<projetct.version>0.1-SNAPSHOT</projetct.version>
</properties>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.0.1-jre</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.9.10</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>${project.build.sourceEncoding}</encoding>
<compilerArgument>-parameters</compilerArgument>
<testCompilerArgument>-parameters</testCompilerArgument>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.0.1</version>
<executions>
<execution>
<id>attach-sources</id>
<phase>deploy</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
<version>3.0.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.10.4</version>
<executions>
<execution>
<id>attach-javadocs</id>
<phase>deploy</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
<executions>
<execution>
<id>deploy</id>
<phase>deploy</phase>
<goals>
<goal>deploy</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>validate</id>
<phase>validate</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<configuration>
<configLocation>../../java/checkstyle.xml</configLocation>
<suppressionsLocation>checkstyle-suppressions.xml</suppressionsLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<failOnViolation>true</failOnViolation>
<violationSeverity>warning</violationSeverity>
<outputFile>${project.build.directory}/checkstyle-errors.xml</outputFile>
<linkXRef>false</linkXRef>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
+46
View File
@@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- This file is auto-generated by Bazel from pom_template.xml, do not modify it. -->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<artifactId>ray-streaming</artifactId>
<groupId>org.ray</groupId>
<version>0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>streaming-api</artifactId>
<name>ray streaming api</name>
<description>ray streaming api</description>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.ray</groupId>
<artifactId>ray-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.0.1-jre</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.9.10</version>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
{auto_gen_header}
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<artifactId>ray-streaming</artifactId>
<groupId>org.ray</groupId>
<version>0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>streaming-api</artifactId>
<name>ray streaming api</name>
<description>ray streaming api</description>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.ray</groupId>
<artifactId>ray-api</artifactId>
<version>${project.version}</version>
</dependency>
{generated_bzl_deps}
</dependencies>
</project>
@@ -0,0 +1,26 @@
package org.ray.streaming.api.collector;
import java.util.List;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.message.Record;
/**
 * A collector that fans every collected value out to a list of collectors.
 *
 * @param <T> The type of output data.
 */
public class CollectionCollector<T> implements Collector<T> {

  /** The collectors that receive every value, in list order. */
  private List<Collector> collectorList;

  /**
   * @param collectorList The collectors to forward values to; the list is used as is, not copied.
   */
  public CollectionCollector(List<Collector> collectorList) {
    this.collectorList = collectorList;
  }

  /**
   * Forwards {@code value} to every collector, wrapped in a fresh {@link Record} per collector.
   *
   * @param value The value to forward.
   */
  @Override
  public void collect(T value) {
    for (Collector downstream : collectorList) {
      Record wrapped = new Record(value);
      downstream.collect(wrapped);
    }
  }
}
@@ -0,0 +1,13 @@
package org.ray.streaming.api.collector;
/**
 * The collector that collects data from an upstream operator, and emits data to downstream
 * operators.
 *
 * @param <T> Type of the data to collect.
 */
public interface Collector<T> {
/**
 * Collects a single value.
 *
 * @param value The value to collect.
 */
void collect(T value);
}
@@ -0,0 +1,18 @@
package org.ray.streaming.api.context;
/**
 * Encapsulate the runtime information of a streaming task.
 */
public interface RuntimeContext {
/** Returns the id of the task. */
int getTaskId();
/** Returns the index of the task. */
int getTaskIndex();
/** Returns the parallelism. */
int getParallelism();
/**
 * Returns the batch id.
 * NOTE(review): the boxed return type suggests the value may be absent (null) - confirm
 * against the implementations.
 */
Long getBatchId();
/**
 * Returns the max batch.
 * NOTE(review): boxed return type, see {@link #getBatchId()} - confirm nullability.
 */
Long getMaxBatch();
}
@@ -0,0 +1,70 @@
package org.ray.streaming.api.context;
import com.google.common.base.Preconditions;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicInteger;
import org.ray.streaming.api.stream.StreamSink;
import org.ray.streaming.plan.Plan;
import org.ray.streaming.plan.PlanBuilder;
import org.ray.streaming.schedule.JobScheduler;
/**
 * Encapsulate the context information of a streaming Job.
 *
 * <p>The context records the sinks and the configuration of a job and hands out stream ids.
 * {@link #execute()} builds the {@link Plan} from the recorded sinks and passes it to a
 * {@link JobScheduler}.
 */
public class StreamingContext implements Serializable {

  /**
   * Source of the ids returned by {@link #generateId()}.
   *
   * <p>This field must not be {@code transient}: a transient generator is {@code null} on a
   * deserialized context, so {@link #generateId()} would throw a NullPointerException.
   * {@link AtomicInteger} is serializable, so the counter is simply serialized with the context.
   */
  private AtomicInteger idGenerator;

  /**
   * The sinks of this streaming job.
   */
  private List<StreamSink> streamSinks;

  /** The job configuration passed to the scheduler; empty until {@link #withConfig} is called. */
  private Map<String, Object> jobConfig;

  /**
   * The logic plan.
   */
  private Plan plan;

  private StreamingContext() {
    this.idGenerator = new AtomicInteger(0);
    this.streamSinks = new ArrayList<>();
    this.jobConfig = new HashMap<>();
  }

  /**
   * Creates an empty context.
   *
   * @return A new context with no sinks and an empty configuration.
   */
  public static StreamingContext buildContext() {
    return new StreamingContext();
  }

  /**
   * Construct job DAG, and execute the job.
   *
   * <p>The scheduler is looked up with {@link ServiceLoader}; the first implementation found is
   * used.
   *
   * @throws IllegalArgumentException If no {@link JobScheduler} implementation is available.
   */
  public void execute() {
    PlanBuilder planBuilder = new PlanBuilder(this.streamSinks);
    this.plan = planBuilder.buildPlan();
    plan.printPlan();

    ServiceLoader<JobScheduler> serviceLoader = ServiceLoader.load(JobScheduler.class);
    Iterator<JobScheduler> iterator = serviceLoader.iterator();
    Preconditions.checkArgument(iterator.hasNext(),
        "No JobScheduler implementation has been provided.");
    JobScheduler jobSchedule = iterator.next();
    jobSchedule.schedule(plan, jobConfig);
  }

  /**
   * Returns the next id; the first id handed out is 1.
   */
  public int generateId() {
    return this.idGenerator.incrementAndGet();
  }

  /**
   * Registers a sink of this job.
   *
   * @param streamSink The sink to add.
   */
  public void addSink(StreamSink streamSink) {
    streamSinks.add(streamSink);
  }

  /**
   * Replaces the job configuration.
   *
   * @param jobConfig The configuration to use; the map is stored as is, not copied.
   */
  public void withConfig(Map<String, Object> jobConfig) {
    this.jobConfig = jobConfig;
  }
}
@@ -0,0 +1,10 @@
package org.ray.streaming.api.function;
import java.io.Serializable;
/**
 * Interface of streaming functions.
 *
 * <p>It declares no methods; it only makes every streaming function {@link Serializable}.
 */
public interface Function extends Serializable {
}
@@ -0,0 +1,23 @@
package org.ray.streaming.api.function.impl;
import org.ray.streaming.api.function.Function;
/**
 * Interface of aggregate functions.
 *
 * @param <I> Type of the input data.
 * @param <A> Type of the intermediate data.
 * @param <O> Type of the output data.
 */
public interface AggregateFunction<I, A, O> extends Function {
/** Creates a new accumulator. */
A createAccumulator();
/**
 * Adds an input value to an accumulator.
 *
 * @param value The input value.
 * @param accumulator The accumulator to add the value to.
 */
void add(I value, A accumulator);
/**
 * Returns the result for an accumulator.
 *
 * @param accumulator The accumulator to read.
 */
O getResult(A accumulator);
/**
 * Merges two accumulators.
 * NOTE(review): whether the result may be one of the arguments is not specified here.
 */
A merge(A a, A b);
/**
 * Retracts a value from an accumulator - presumably the inverse of {@link #add}; confirm.
 * Note that the parameter order (accumulator first) differs from {@link #add}.
 */
void retract(A acc, I value);
}
@@ -0,0 +1,16 @@
package org.ray.streaming.api.function.impl;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.api.function.Function;
/**
 * Interface of flat-map functions.
 *
 * @param <T> Type of the input data.
 * @param <R> Type of the output data.
 */
@FunctionalInterface
public interface FlatMapFunction<T, R> extends Function {
/**
 * Processes one input value; results are handed to the given collector rather than returned.
 *
 * @param value The input value.
 * @param collector The collector that receives the output values.
 */
void flatMap(T value, Collector<R> collector);
}
}
@@ -0,0 +1,17 @@
package org.ray.streaming.api.function.impl;
import org.ray.streaming.api.function.Function;
/**
 * Interface of join functions.
 *
 * @param <T> Type of the left input data.
 * @param <O> Type of the right input data.
 * @param <R> Type of the output data.
 */
@FunctionalInterface
public interface JoinFunction<T, O, R> extends Function {
/**
 * Joins one left value with one right value.
 *
 * @param left The value from the left input.
 * @param right The value from the right input.
 * @return The joined value.
 */
R join(T left, O right);
}
}
@@ -0,0 +1,15 @@
package org.ray.streaming.api.function.impl;
import org.ray.streaming.api.function.Function;
/**
 * Interface of key-by functions.
 *
 * @param <T> Type of the input data.
 * @param <K> Type of the key-by field.
 */
@FunctionalInterface
public interface KeyFunction<T, K> extends Function {
/**
 * Returns the key of an input value.
 *
 * @param value The input value.
 * @return The key of the value.
 */
K keyBy(T value);
}
}
@@ -0,0 +1,15 @@
package org.ray.streaming.api.function.impl;
import org.ray.streaming.api.function.Function;
/**
 * Interface of map functions.
 *
 * @param <T> Type of the input data.
 * @param <R> Type of the output data.
 */
@FunctionalInterface
public interface MapFunction<T, R> extends Function {
/**
 * Maps one input value to one output value.
 *
 * @param value The input value.
 * @return The mapped value.
 */
R map(T value);
}
}
@@ -0,0 +1,14 @@
package org.ray.streaming.api.function.impl;
import org.ray.streaming.api.function.Function;
/**
 * Interface of process functions.
 *
 * @param <T> Type of the input data.
 */
@FunctionalInterface
public interface ProcessFunction<T> extends Function {
/**
 * Processes one input value; nothing is returned.
 *
 * @param value The input value.
 */
void process(T value);
}
}
@@ -0,0 +1,14 @@
package org.ray.streaming.api.function.impl;
import org.ray.streaming.api.function.Function;
/**
 * Interface of reduce functions.
 *
 * @param <T> Type of the input data.
 */
@FunctionalInterface
public interface ReduceFunction<T> extends Function {
/**
 * Combines two values of the same type into one.
 *
 * @param oldValue The old value - presumably the result reduced so far; confirm with the caller.
 * @param newValue The new value.
 * @return The reduced value.
 */
T reduce(T oldValue, T newValue);
}
}
@@ -0,0 +1,14 @@
package org.ray.streaming.api.function.impl;
import org.ray.streaming.api.function.Function;
/**
 * Interface of sink functions.
 *
 * @param <T> Type of the sink data.
 */
@FunctionalInterface
public interface SinkFunction<T> extends Function {
/**
 * Consumes one value; nothing is returned.
 *
 * @param value The value to sink.
 */
void sink(T value);
}
}
@@ -0,0 +1,23 @@
package org.ray.streaming.api.function.impl;
import org.ray.streaming.api.function.Function;
/**
 * Interface of Source functions.
 *
 * @param <T> Type of the data output by the source.
 */
public interface SourceFunction<T> extends Function {
/**
 * Initializes the source.
 *
 * @param parallel Presumably the parallelism of the source - confirm with the caller.
 * @param index Presumably the index of this instance among the parallel instances - confirm.
 */
void init(int parallel, int index);
/**
 * Runs the source, emitting elements through the given context.
 *
 * @param ctx The context used to emit elements.
 * @throws Exception If producing or emitting an element fails.
 */
void run(SourceContext<T> ctx) throws Exception;
/** Closes the source. */
void close();
/**
 * The context through which a source emits its elements.
 *
 * @param <T> Type of the emitted elements.
 */
interface SourceContext<T> {
/**
 * Emits one element.
 *
 * @param element The element to emit.
 * @throws Exception If the element cannot be emitted.
 */
void collect(T element) throws Exception;
}
}
@@ -0,0 +1,37 @@
package org.ray.streaming.api.function.internal;
import java.util.ArrayList;
import java.util.Collection;
import org.ray.streaming.api.function.impl.SourceFunction;
/**
 * A {@link SourceFunction} backed by an in-memory Java collection.
 *
 * @param <T> Type of the data output by the source.
 */
public class CollectionSourceFunction<T> implements SourceFunction<T> {

  /** The values still to be emitted; swapped for an empty list once they have been emitted. */
  private Collection<T> values;

  /**
   * @param values The values to emit; the collection is used as is, not copied.
   */
  public CollectionSourceFunction(Collection<T> values) {
    this.values = values;
  }

  /** Does nothing: an in-memory collection needs no initialization. */
  @Override
  public void init(int parallel, int index) {
  }

  /**
   * Emits every pending value in iteration order, then drops them so that a later call emits
   * nothing.
   *
   * @param ctx The context used to emit the values.
   * @throws Exception If the context fails to collect a value; the pending values are kept then.
   */
  @Override
  public void run(SourceContext<T> ctx) throws Exception {
    Collection<T> pending = values;
    for (T element : pending) {
      ctx.collect(element);
    }
    // Everything has been emitted: forget the values.
    values = new ArrayList<>();
  }

  /** Does nothing: there are no resources to release. */
  @Override
  public void close() {
  }
}
@@ -0,0 +1,23 @@
package org.ray.streaming.api.partition;
import org.ray.streaming.api.function.Function;
/**
 * Interface of the partitioning strategy.
 *
 * @param <T> Type of the input data.
 */
@FunctionalInterface
public interface Partition<T> extends Function {
/**
 * Given a record and the number of downstream partitions, determine which partition(s) should
 * receive the record.
 *
 * @param record The record.
 * @param numPartition The number of downstream partitions.
 * @return IDs of the downstream partitions that should receive the record.
 */
int[] partition(T record, int numPartition);
}
@@ -0,0 +1,24 @@
package org.ray.streaming.api.partition.impl;
import java.util.stream.IntStream;
import org.ray.streaming.api.partition.Partition;
/**
 * Partitioning strategy that sends every record to all downstream partitions.
 *
 * @param <T> Type of the input record.
 */
public class BroadcastPartition<T> implements Partition<T> {

  /** Cached ids {@code 0 .. numPartition - 1} for the partition count seen last. */
  private int[] partitions = new int[0];

  public BroadcastPartition() {
  }

  /**
   * Returns the ids of all partitions, whatever the record is.
   *
   * @param value The record (not inspected).
   * @param numPartition The number of downstream partitions.
   * @return The ids {@code 0 .. numPartition - 1}. The array is cached and shared between calls,
   *     so callers must not modify it.
   */
  @Override
  public int[] partition(T value, int numPartition) {
    if (partitions.length == numPartition) {
      return partitions;
    }
    // The partition count changed (or this is the first call): rebuild the cached ids.
    int lastPartition = numPartition - 1;
    partitions = IntStream.rangeClosed(0, lastPartition).toArray();
    return partitions;
  }
}
@@ -0,0 +1,20 @@
package org.ray.streaming.api.partition.impl;
import org.ray.streaming.api.partition.Partition;
import org.ray.streaming.message.KeyRecord;
/**
 * Partitioning strategy that picks the partition from the hash code of the record key.
 *
 * @param <K> Type of the partition key.
 * @param <T> Type of the input record.
 */
public class KeyPartition<K, T> implements Partition<KeyRecord<K, T>> {

  /** Single-element result array, reused by every call. */
  private int[] partitions = new int[1];

  /**
   * Maps the key of the record to one partition.
   *
   * @param keyRecord The record; its key must not be null.
   * @param numPartition The number of downstream partitions.
   * @return A one-element array holding the chosen partition id. The array is reused by the next
   *     call, so callers must not keep or modify it.
   */
  @Override
  public int[] partition(KeyRecord<K, T> keyRecord, int numPartition) {
    int keyHash = keyRecord.getKey().hashCode();
    // The remainder lies strictly between -numPartition and numPartition, so abs() cannot
    // overflow.
    int remainder = keyHash % numPartition;
    partitions[0] = Math.abs(remainder);
    return partitions;
  }
}
@@ -0,0 +1,24 @@
package org.ray.streaming.api.partition.impl;
import org.ray.streaming.api.partition.Partition;
/**
 * Partition record to downstream tasks in a round-robin manner.
 *
 * @param <T> Type of the input record.
 */
public class RoundRobinPartition<T> implements Partition<T> {

  /** The partition id chosen by the previous call (0 before the first call). */
  private int seq = 0;

  /** Single-element result array, reused by every call. */
  private int[] partitions = new int[1];

  public RoundRobinPartition() {
  }

  /**
   * Advances to the next partition and returns it.
   *
   * @param value The record (not inspected).
   * @param numPartition The number of downstream partitions.
   * @return A one-element array holding the chosen partition id. The array is reused by the next
   *     call, so callers must not keep or modify it.
   */
  @Override
  public int[] partition(T value, int numPartition) {
    int next = (seq + 1) % numPartition;
    seq = next;
    partitions[0] = next;
    return partitions;
  }
}
@@ -0,0 +1,136 @@
package org.ray.streaming.api.stream;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.function.impl.FlatMapFunction;
import org.ray.streaming.api.function.impl.KeyFunction;
import org.ray.streaming.api.function.impl.MapFunction;
import org.ray.streaming.api.function.impl.SinkFunction;
import org.ray.streaming.api.partition.Partition;
import org.ray.streaming.api.partition.impl.BroadcastPartition;
import org.ray.streaming.operator.StreamOperator;
import org.ray.streaming.operator.impl.FlatMapOperator;
import org.ray.streaming.operator.impl.KeyByOperator;
import org.ray.streaming.operator.impl.MapOperator;
import org.ray.streaming.operator.impl.SinkOperator;
/**
* Represents a stream of data.
*
* This class defines all the streaming operations.
*
* @param <T> Type of data in the stream.
*/
public class DataStream<T> extends Stream<T> {
public DataStream(StreamingContext streamingContext, StreamOperator streamOperator) {
super(streamingContext, streamOperator);
}
public DataStream(DataStream input, StreamOperator streamOperator) {
super(input, streamOperator);
}
/**
* Apply a map function to this stream.
*
* @param mapFunction The map function.
* @param <R> Type of data returned by the map function.
* @return A new DataStream.
*/
public <R> DataStream<R> map(MapFunction<T, R> mapFunction) {
return new DataStream<>(this, new MapOperator(mapFunction));
}
/**
* Apply a flat-map function to this stream.
*
* @param flatMapFunction The FlatMapFunction
* @param <R> Type of data returned by the flatmap function.
* @return A new DataStream
*/
public <R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapFunction) {
return new DataStream(this, new FlatMapOperator(flatMapFunction));
}
/**
* Apply a union transformation to this stream, with another stream.
*
* @param other Another stream.
* @return A new UnionStream.
*/
public UnionStream<T> union(DataStream<T> other) {
return new UnionStream(this, null, other);
}
/**
* Apply a join transformation to this stream, with another stream.
*
* @param other Another stream.
* @param <O> The type of the other stream data.
* @param <R> The type of the data in the joined stream.
* @return A new JoinStream.
*/
public <O, R> JoinStream<T, O, R> join(DataStream<O> other) {
return new JoinStream<>(this, other);
}
public <R> DataStream<R> process() {
// TODO(zhenxuanpan): Need to add processFunction.
return new DataStream(this, null);
}
/**
* Apply a sink function and get a StreamSink.
*
* @param sinkFunction The sink function.
* @return A new StreamSink.
*/
public StreamSink<T> sink(SinkFunction<T> sinkFunction) {
return new StreamSink<>(this, new SinkOperator(sinkFunction));
}
/**
* Apply a key-by function to this stream.
*
* @param keyFunction the key function.
* @param <K> The type of the key.
* @return A new KeyDataStream.
*/
public <K> KeyDataStream<K, T> keyBy(KeyFunction<T, K> keyFunction) {
return new KeyDataStream<>(this, new KeyByOperator(keyFunction));
}
/**
* Apply broadcast to this stream.
*
* @return This stream.
*/
public DataStream<T> broadcast() {
this.partition = new BroadcastPartition<>();
return this;
}
/**
* Apply a partition to this stream.
*
* @param partition The partitioning strategy.
* @return This stream.
*/
public DataStream<T> partitionBy(Partition<T> partition) {
this.partition = partition;
return this;
}
/**
* Set parallelism to current transformation.
*
* @param parallelism The parallelism to set.
* @return This stream.
*/
public DataStream<T> setParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
}
}
@@ -0,0 +1,82 @@
package org.ray.streaming.api.stream;
import java.io.Serializable;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.function.impl.JoinFunction;
import org.ray.streaming.api.function.impl.KeyFunction;
import org.ray.streaming.operator.StreamOperator;
/**
 * Represents a DataStream of two joined DataStream.
 *
 * <p>A join is described fluently:
 * {@code left.join(right).where(leftKey).equalLo(rightKey).with(joinFunction)}.
 *
 * @param <L> Type of the data in the left stream.
 * @param <R> Type of the data in the right stream.
 * @param <J> Type of the data in the joined stream.
 */
public class JoinStream<L, R, J> extends DataStream<L> {

  public JoinStream(StreamingContext streamingContext, StreamOperator streamOperator) {
    super(streamingContext, streamOperator);
  }

  /**
   * Creates the join of two streams. The join stream is built on the left stream and has no
   * operator.
   *
   * @param leftStream The left stream; it becomes the input stream of the join.
   * @param rightStream The right stream. NOTE(review): it is not recorded anywhere yet.
   */
  public JoinStream(DataStream<L> leftStream, DataStream<R> rightStream) {
    super(leftStream, null);
  }

  /**
   * Apply key-by to the left join stream.
   *
   * @param keyFunction The key function of the left stream.
   * @param <K> Type of the join key.
   * @return The where clause, on which the key of the right stream is given.
   */
  public <K> Where<L, R, J, K> where(KeyFunction<L, K> keyFunction) {
    return new Where<>(this, keyFunction);
  }

  /**
   * Where clause of the join transformation.
   *
   * <p>The class is public because it is returned by the public {@link JoinStream#where}: with
   * package-private access its methods could not be called from other packages. It is static
   * because it keeps its own reference to the join stream and needs no enclosing instance.
   *
   * @param <L> Type of the data in the left stream.
   * @param <R> Type of the data in the right stream.
   * @param <J> Type of the data in the joined stream.
   * @param <K> Type of the join key.
   */
  public static class Where<L, R, J, K> implements Serializable {

    private JoinStream<L, R, J> joinStream;
    private KeyFunction<L, K> leftKeyByFunction;

    public Where(JoinStream<L, R, J> joinStream, KeyFunction<L, K> leftKeyByFunction) {
      this.joinStream = joinStream;
      this.leftKeyByFunction = leftKeyByFunction;
    }

    /**
     * Gives the key function of the right stream.
     *
     * @param rightKeyFunction The key function of the right stream.
     * @return The equal clause, on which the join function is given.
     */
    public Equal<L, R, J, K> equalLo(KeyFunction<R, K> rightKeyFunction) {
      return new Equal<>(joinStream, leftKeyByFunction, rightKeyFunction);
    }
  }

  /**
   * Equal clause of the join transformation.
   *
   * <p>Public and static for the same reasons as {@link Where}.
   *
   * @param <L> Type of the data in the left stream.
   * @param <R> Type of the data in the right stream.
   * @param <J> Type of the data in the joined stream.
   * @param <K> Type of the join key.
   */
  public static class Equal<L, R, J, K> implements Serializable {

    private JoinStream<L, R, J> joinStream;
    private KeyFunction<L, K> leftKeyByFunction;
    private KeyFunction<R, K> rightKeyByFunction;

    public Equal(JoinStream<L, R, J> joinStream, KeyFunction<L, K> leftKeyByFunction,
        KeyFunction<R, K> rightKeyByFunction) {
      this.joinStream = joinStream;
      this.leftKeyByFunction = leftKeyByFunction;
      this.rightKeyByFunction = rightKeyByFunction;
    }

    /**
     * Gives the join function.
     *
     * @param joinFunction The join function. NOTE(review): it is not used yet.
     * @return The join stream itself, viewed as a stream of joined data.
     */
    public DataStream<J> with(JoinFunction<L, R, J> joinFunction) {
      return (DataStream<J>) joinStream;
    }
  }
}
@@ -0,0 +1,53 @@
package org.ray.streaming.api.stream;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.function.impl.AggregateFunction;
import org.ray.streaming.api.function.impl.ReduceFunction;
import org.ray.streaming.api.partition.impl.KeyPartition;
import org.ray.streaming.operator.StreamOperator;
import org.ray.streaming.operator.impl.ReduceOperator;
/**
* Represents a DataStream returned by a key-by operation.
*
* @param <K> Type of the key.
* @param <T> Type of the data.
*/
public class KeyDataStream<K, T> extends DataStream<T> {
public KeyDataStream(StreamingContext streamingContext, StreamOperator streamOperator) {
super(streamingContext, streamOperator);
}
public KeyDataStream(DataStream<T> input, StreamOperator streamOperator) {
super(input, streamOperator);
this.partition = new KeyPartition();
}
/**
* Apply a reduce function to this stream.
*
* @param reduceFunction The reduce function.
* @return A new DataStream.
*/
public DataStream<T> reduce(ReduceFunction reduceFunction) {
return new DataStream<>(this, new ReduceOperator(reduceFunction));
}
/**
* Apply an aggregate Function to this stream.
*
* @param aggregateFunction The aggregate function
* @param <A> The type of aggregated intermediate data.
* @param <O> The type of result data.
* @return A new DataStream.
*/
public <A, O> DataStream<O> aggregate(AggregateFunction<T, A, O> aggregateFunction) {
return new DataStream<>(this, null);
}
public KeyDataStream<K, T> setParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
}
}
@@ -0,0 +1,71 @@
package org.ray.streaming.api.stream;
import java.io.Serializable;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.partition.Partition;
import org.ray.streaming.api.partition.impl.RoundRobinPartition;
import org.ray.streaming.operator.StreamOperator;
/**
 * Common base class of every stream type.
 *
 * <p>A stream has an id generated by its {@link StreamingContext}, an operator, an optional
 * input stream, a parallelism (1 unless inherited or set) and a partitioning strategy
 * (round-robin unless changed).
 *
 * @param <T> Type of the data in the stream.
 */
public abstract class Stream<T> implements Serializable {

  protected int id;
  protected int parallelism = 1;
  protected StreamOperator operator;
  protected Stream<T> inputStream;
  protected StreamingContext streamingContext;
  protected Partition<T> partition;

  /**
   * Creates a stream that has no input stream.
   *
   * @param streamingContext The context the stream belongs to; it generates the stream id.
   * @param streamOperator The operator of the stream.
   */
  public Stream(StreamingContext streamingContext, StreamOperator streamOperator) {
    this.operator = streamOperator;
    this.partition = new RoundRobinPartition<>();
    this.streamingContext = streamingContext;
    this.id = streamingContext.generateId();
  }

  /**
   * Creates a stream on top of another stream; the context and the parallelism are taken from
   * the input stream.
   *
   * @param inputStream The input stream.
   * @param streamOperator The operator of the stream.
   */
  public Stream(Stream<T> inputStream, StreamOperator streamOperator) {
    this.operator = streamOperator;
    this.partition = new RoundRobinPartition<>();
    this.inputStream = inputStream;
    this.parallelism = inputStream.getParallelism();
    this.streamingContext = inputStream.getStreamingContext();
    this.id = streamingContext.generateId();
  }

  /** Returns the input stream, or null if the stream was created without one. */
  public Stream<T> getInputStream() {
    return inputStream;
  }

  /** Returns the operator of this stream. */
  public StreamOperator getOperator() {
    return operator;
  }

  /** Returns the context this stream belongs to. */
  public StreamingContext getStreamingContext() {
    return streamingContext;
  }

  /** Returns the parallelism of this stream. */
  public int getParallelism() {
    return parallelism;
  }

  /**
   * Sets the parallelism of this stream.
   *
   * @param parallelism The parallelism to set.
   * @return This stream.
   */
  public Stream<T> setParallelism(int parallelism) {
    this.parallelism = parallelism;
    return this;
  }

  /** Returns the id of this stream. */
  public int getId() {
    return id;
  }

  /** Returns the partitioning strategy of this stream. */
  public Partition<T> getPartition() {
    return partition;
  }

  /**
   * Replaces the partitioning strategy of this stream.
   *
   * @param partition The partitioning strategy.
   */
  public void setPartition(Partition<T> partition) {
    this.partition = partition;
  }
}
@@ -0,0 +1,21 @@
package org.ray.streaming.api.stream;
import org.ray.streaming.operator.impl.SinkOperator;
/**
* Represents a sink of the DataStream.
*
* @param <T> Type of the input data of this sink.
*/
public class StreamSink<T> extends Stream<T> {
public StreamSink(DataStream<T> input, SinkOperator sinkOperator) {
super(input, sinkOperator);
this.streamingContext.addSink(this);
}
public StreamSink<T> setParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
}
}
@@ -0,0 +1,36 @@
package org.ray.streaming.api.stream;
import java.util.Collection;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.function.impl.SourceFunction;
import org.ray.streaming.api.function.internal.CollectionSourceFunction;
import org.ray.streaming.operator.impl.SourceOperator;
/**
 * A {@link DataStream} whose operator is a {@link SourceOperator}.
 *
 * @param <T> The type of StreamSource data.
 */
public class StreamSource<T> extends DataStream<T> {

  /**
   * Creates a source stream whose operator wraps the given source function.
   *
   * @param streamingContext Stream context.
   * @param sourceFunction Function wrapped in a {@link SourceOperator}.
   */
  public StreamSource(StreamingContext streamingContext, SourceFunction<T> sourceFunction) {
    super(streamingContext, new SourceOperator<>(sourceFunction));
  }

  /**
   * Creates a StreamSource backed by a {@link CollectionSourceFunction} over the given values.
   *
   * @param context Stream context.
   * @param values A collection of values.
   * @param <T> The type of source data.
   * @return A StreamSource.
   */
  public static <T> StreamSource<T> buildSource(StreamingContext context, Collection<T> values) {
    StreamSource source = new StreamSource(context, new CollectionSourceFunction(values));
    return source;
  }

  /**
   * Sets the parallelism of this source.
   *
   * @param parallelism new parallelism value
   * @return this source, so calls can be chained
   */
  public StreamSource<T> setParallelism(int parallelism) {
    this.parallelism = parallelism;
    return this;
  }
}
@@ -0,0 +1,25 @@
package org.ray.streaming.api.stream;
import java.util.ArrayList;
import java.util.List;
import org.ray.streaming.operator.StreamOperator;
/**
 * A DataStream that additionally keeps a list of streams it is unioned with.
 *
 * @param <T> The type of union data.
 */
public class UnionStream<T> extends DataStream<T> {

  private List<DataStream> unionStreams;

  /**
   * Creates the union stream with a single unioned stream.
   *
   * @param input stream passed to the DataStream constructor as the input
   * @param streamOperator operator attached to this stream
   * @param other first stream recorded in the union list
   */
  public UnionStream(DataStream input, StreamOperator streamOperator, DataStream<T> other) {
    super(input, streamOperator);
    List<DataStream> streams = new ArrayList<>();
    streams.add(other);
    this.unionStreams = streams;
  }

  /**
   * Returns the internal, mutable list of unioned streams.
   */
  public List<DataStream> getUnionStreams() {
    return this.unionStreams;
  }
}
@@ -0,0 +1,20 @@
package org.ray.streaming.message;
/**
 * A {@link Record} that also carries a key.
 *
 * @param <K> type of the key
 * @param <T> type of the value
 */
public class KeyRecord<K, T> extends Record<T> {

  private K key;

  /**
   * @param key key associated with the value
   * @param value value stored in the underlying record
   */
  public KeyRecord(K key, T value) {
    super(value);
    setKey(key);
  }

  /** Returns the key of this record. */
  public K getKey() {
    return this.key;
  }

  /** Replaces the key of this record. */
  public void setKey(K key) {
    this.key = key;
  }
}
@@ -0,0 +1,64 @@
package org.ray.streaming.message;
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.List;
/**
 * Serializable container that groups a list of records with a task id, a batch id and a stream
 * name.
 */
public class Message implements Serializable {

  private int taskId;
  private long batchId;
  private String stream;
  private List<Record> recordList;

  /**
   * Creates a message that holds the given record list (the list is stored, not copied).
   *
   * @param taskId task id stored in the message
   * @param batchId batch id stored in the message
   * @param stream stream name stored in the message
   * @param recordList records carried by the message
   */
  public Message(int taskId, long batchId, String stream, List<Record> recordList) {
    this.taskId = taskId;
    this.batchId = batchId;
    this.stream = stream;
    this.recordList = recordList;
  }

  /**
   * Creates a message that holds a single record, wrapped in a new mutable list.
   *
   * @param taskId task id stored in the message
   * @param batchId batch id stored in the message
   * @param stream stream name stored in the message
   * @param record the only record carried by the message
   */
  public Message(int taskId, long batchId, String stream, Record record) {
    this.taskId = taskId;
    this.batchId = batchId;
    this.stream = stream;
    this.recordList = Lists.newArrayList(record);
  }

  public int getTaskId() {
    return taskId;
  }

  public void setTaskId(int taskId) {
    this.taskId = taskId;
  }

  public long getBatchId() {
    return batchId;
  }

  public void setBatchId(long batchId) {
    this.batchId = batchId;
  }

  public String getStream() {
    return stream;
  }

  public void setStream(String stream) {
    this.stream = stream;
  }

  /** Returns the internal record list (not a copy). */
  public List<Record> getRecordList() {
    return recordList;
  }

  public void setRecordList(List<Record> recordList) {
    this.recordList = recordList;
  }

  /**
   * Returns the record at the given position of the record list.
   *
   * @param index zero-based position in the record list
   * @return the record stored at {@code index}
   * @throws IndexOutOfBoundsException if {@code index} is outside the record list
   */
  public Record getRecord(int index) {
    // Honour the requested index; the first record used to be returned unconditionally.
    return recordList.get(index);
  }
}
@@ -0,0 +1,35 @@
package org.ray.streaming.message;
import java.io.Serializable;
/**
 * Serializable wrapper around a single value, optionally tagged with a stream name.
 *
 * @param <T> type of the wrapped value
 */
public class Record<T> implements Serializable {

  // Declared transient, so it is left out of Java serialization.
  protected transient String stream;
  protected T value;

  /**
   * @param value value to wrap; may be null
   */
  public Record(T value) {
    this.value = value;
  }

  /** Returns the wrapped value. */
  public T getValue() {
    return value;
  }

  /** Replaces the wrapped value. */
  public void setValue(T value) {
    this.value = value;
  }

  /** Returns the stream name attached to this record, if any was set. */
  public String getStream() {
    return stream;
  }

  /** Attaches a stream name to this record. */
  public void setStream(String stream) {
    this.stream = stream;
  }

  /**
   * Returns the string form of the wrapped value, or {@code "null"} when no value is present.
   */
  @Override
  public String toString() {
    // String.valueOf tolerates a null value, so printing such a record no longer throws.
    return String.valueOf(value);
  }
}
@@ -0,0 +1,13 @@
package org.ray.streaming.operator;
import org.ray.streaming.message.Record;
/**
 * An operator that handles records one at a time and reports the type
 * {@link OperatorType#ONE_INPUT}.
 *
 * @param <T> type of the value carried by the processed records
 */
public interface OneInputOperator<T> extends Operator {
/**
 * Processes a single record.
 *
 * @param record record to process
 * @throws Exception if processing fails
 */
void processElement(Record<T> record) throws Exception;
/** Returns {@link OperatorType#ONE_INPUT}. */
default OperatorType getOpType() {
return OperatorType.ONE_INPUT;
}
}
@@ -0,0 +1,17 @@
package org.ray.streaming.operator;
import java.io.Serializable;
import java.util.List;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.api.context.RuntimeContext;
/**
 * Serializable base interface of the streaming operators.
 */
public interface Operator extends Serializable {
/**
 * Gives the operator its collectors and its runtime context.
 * NOTE(review): presumably called once before any element is processed - confirm against the
 * runtime.
 *
 * @param collectors collectors the operator may emit records to
 * @param runtimeContext runtime context handed to the operator
 */
void open(List<Collector> collectors, RuntimeContext runtimeContext);
/** Lifecycle hook; exact invocation point is not visible here - TODO confirm. */
void finish();
/** Lifecycle hook; exact invocation point is not visible here - TODO confirm. */
void close();
/** Returns the kind of this operator. */
OperatorType getOpType();
}
@@ -0,0 +1,8 @@
package org.ray.streaming.operator;
/**
 * Kinds of operators, as reported by {@code Operator.getOpType()}.
 */
public enum OperatorType {
// Reported by SourceOperator.
SOURCE,
// Default type of OneInputOperator implementations.
ONE_INPUT,
// Default type of TwoInputOperator implementations.
TWO_INPUT,
}
@@ -0,0 +1,47 @@
package org.ray.streaming.operator;
import java.util.List;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.api.context.RuntimeContext;
import org.ray.streaming.api.function.Function;
import org.ray.streaming.message.KeyRecord;
import org.ray.streaming.message.Record;
/**
 * Base class of operators that wrap a user function and forward records to a list of
 * collectors.
 *
 * @param <F> type of the wrapped function
 */
public abstract class StreamOperator<F extends Function> implements Operator {

  protected F function;
  protected List<Collector> collectorList;
  protected RuntimeContext runtimeContext;

  /**
   * @param function function wrapped by this operator
   */
  public StreamOperator(F function) {
    this.function = function;
  }

  /**
   * Stores the collectors and the runtime context for later use.
   */
  public void open(List<Collector> collectorList, RuntimeContext runtimeContext) {
    this.runtimeContext = runtimeContext;
    this.collectorList = collectorList;
  }

  /** Does nothing by default. */
  public void finish() {
  }

  /** Does nothing by default. */
  public void close() {
  }

  /**
   * Passes the record to every collector, in list order.
   */
  protected void collect(Record record) {
    this.collectorList.forEach(target -> target.collect(record));
  }

  /**
   * Passes the keyed record to every collector, in list order.
   */
  protected void collect(KeyRecord keyRecord) {
    this.collectorList.forEach(target -> target.collect(keyRecord));
  }
}
@@ -0,0 +1,13 @@
package org.ray.streaming.operator;
import org.ray.streaming.message.Record;
/**
 * An operator that handles a pair of records at a time and reports the type
 * {@link OperatorType#TWO_INPUT}.
 *
 * @param <T> type of the value carried by the first record
 * @param <O> type of the value carried by the second record
 */
public interface TwoInputOperator<T, O> extends Operator {
/**
 * Processes a pair of records.
 *
 * @param record1 first record
 * @param record2 second record
 */
void processElement(Record<T> record1, Record<O> record2);
/** Returns {@link OperatorType#TWO_INPUT}. */
default OperatorType getOpType() {
return OperatorType.TWO_INPUT;
}
}
@@ -0,0 +1,31 @@
package org.ray.streaming.operator.impl;
import java.util.List;
import org.ray.streaming.api.collector.CollectionCollector;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.api.context.RuntimeContext;
import org.ray.streaming.api.function.impl.FlatMapFunction;
import org.ray.streaming.message.Record;
import org.ray.streaming.operator.OneInputOperator;
import org.ray.streaming.operator.StreamOperator;
/**
 * Operator that applies a {@link FlatMapFunction} to the value of every record it receives.
 *
 * @param <T> type of the input values
 * @param <R> type of the values emitted by the function
 */
public class FlatMapOperator<T, R> extends StreamOperator<FlatMapFunction<T, R>> implements
    OneInputOperator<T> {

  private CollectionCollector collectionCollector;

  public FlatMapOperator(FlatMapFunction<T, R> flatMapFunction) {
    super(flatMapFunction);
  }

  /**
   * Besides the base-class bookkeeping, wraps the collectors in a single
   * {@link CollectionCollector} that is handed to the function.
   */
  @Override
  public void open(List<Collector> collectorList, RuntimeContext runtimeContext) {
    super.open(collectorList, runtimeContext);
    collectionCollector = new CollectionCollector(collectorList);
  }

  /**
   * Calls the flat-map function with the record value and the wrapping collector.
   */
  @Override
  public void processElement(Record<T> record) throws Exception {
    T input = record.getValue();
    Collector<R> output = (Collector<R>) collectionCollector;
    function.flatMap(input, output);
  }
}
@@ -0,0 +1,22 @@
package org.ray.streaming.operator.impl;
import org.ray.streaming.api.function.impl.KeyFunction;
import org.ray.streaming.message.KeyRecord;
import org.ray.streaming.message.Record;
import org.ray.streaming.operator.OneInputOperator;
import org.ray.streaming.operator.StreamOperator;
/**
 * Operator that computes a key for every record value and emits a {@link KeyRecord}.
 *
 * @param <T> type of the record values
 * @param <K> type of the computed keys
 */
public class KeyByOperator<T, K> extends StreamOperator<KeyFunction<T, K>> implements
    OneInputOperator<T> {

  public KeyByOperator(KeyFunction<T, K> keyFunction) {
    super(keyFunction);
  }

  /**
   * Emits the record value together with the key returned by the key function.
   */
  @Override
  public void processElement(Record<T> record) throws Exception {
    T value = record.getValue();
    K key = function.keyBy(value);
    collect(new KeyRecord<>(key, value));
  }
}
@@ -0,0 +1,20 @@
package org.ray.streaming.operator.impl;
import org.ray.streaming.api.function.impl.MapFunction;
import org.ray.streaming.message.Record;
import org.ray.streaming.operator.OneInputOperator;
import org.ray.streaming.operator.StreamOperator;
/**
 * Operator that applies a {@link MapFunction} to every record value and emits the result.
 *
 * @param <T> type of the input values
 * @param <R> type of the mapped values
 */
public class MapOperator<T, R> extends StreamOperator<MapFunction<T, R>> implements
    OneInputOperator<T> {

  public MapOperator(MapFunction<T, R> mapFunction) {
    super(mapFunction);
  }

  /**
   * Maps the record value and emits the mapped value wrapped in a new record.
   */
  @Override
  public void processElement(Record<T> record) throws Exception {
    R mapped = function.map(record.getValue());
    Record<R> result = new Record<R>(mapped);
    collect(result);
  }
}
@@ -0,0 +1,44 @@
package org.ray.streaming.operator.impl;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.api.context.RuntimeContext;
import org.ray.streaming.api.function.impl.ReduceFunction;
import org.ray.streaming.message.KeyRecord;
import org.ray.streaming.message.Record;
import org.ray.streaming.operator.OneInputOperator;
import org.ray.streaming.operator.StreamOperator;
/**
 * Operator that keeps one reduced value per key and folds every incoming keyed record into it.
 *
 * @param <K> type of the keys
 * @param <T> type of the values
 */
public class ReduceOperator<K, T> extends StreamOperator<ReduceFunction<T>> implements
    OneInputOperator<T> {

  private Map<K, T> reduceState;

  public ReduceOperator(ReduceFunction<T> reduceFunction) {
    super(reduceFunction);
  }

  /**
   * Besides the base-class bookkeeping, starts with an empty per-key state.
   */
  @Override
  public void open(List<Collector> collectorList, RuntimeContext runtimeContext) {
    super.open(collectorList, runtimeContext);
    reduceState = new HashMap<>();
  }

  /**
   * Expects a {@link KeyRecord}. The first record of a key is stored and forwarded unchanged;
   * later records are reduced with the stored value and the new result is emitted.
   */
  @Override
  public void processElement(Record<T> record) throws Exception {
    KeyRecord<K, T> keyed = (KeyRecord<K, T>) record;
    K key = keyed.getKey();
    T incoming = keyed.getValue();

    if (!reduceState.containsKey(key)) {
      // First value seen for this key: remember it and pass the record through as is.
      reduceState.put(key, incoming);
      collect(record);
      return;
    }

    T reduced = function.reduce(reduceState.get(key), incoming);
    reduceState.put(key, reduced);
    collect(new Record(reduced));
  }
}
@@ -0,0 +1,20 @@
package org.ray.streaming.operator.impl;
import org.ray.streaming.api.function.impl.SinkFunction;
import org.ray.streaming.message.Record;
import org.ray.streaming.operator.OneInputOperator;
import org.ray.streaming.operator.StreamOperator;
/**
 * Operator that hands every record value to a {@link SinkFunction}.
 *
 * @param <T> type of the record values
 */
public class SinkOperator<T> extends StreamOperator<SinkFunction<T>> implements
    OneInputOperator<T> {

  public SinkOperator(SinkFunction<T> sinkFunction) {
    super(sinkFunction);
  }

  /**
   * Passes the value of the record to the sink function; nothing is emitted downstream.
   */
  @Override
  public void processElement(Record<T> record) throws Exception {
    T value = record.getValue();
    function.sink(value);
  }
}
@@ -0,0 +1,55 @@
package org.ray.streaming.operator.impl;
import java.util.List;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.api.context.RuntimeContext;
import org.ray.streaming.api.function.impl.SourceFunction;
import org.ray.streaming.api.function.impl.SourceFunction.SourceContext;
import org.ray.streaming.message.Record;
import org.ray.streaming.operator.OperatorType;
import org.ray.streaming.operator.StreamOperator;
/**
 * Operator that drives a {@link SourceFunction} and reports the type
 * {@link OperatorType#SOURCE}.
 *
 * @param <T> type of the values produced by the source function
 */
public class SourceOperator<T> extends StreamOperator<SourceFunction<T>> {

  private SourceContextImpl sourceContext;

  public SourceOperator(SourceFunction<T> function) {
    super(function);
  }

  /**
   * Besides the base-class bookkeeping, creates the source context over the collectors and
   * initialises the function with the parallelism and task index of the runtime context.
   */
  @Override
  public void open(List<Collector> collectorList, RuntimeContext runtimeContext) {
    super.open(collectorList, runtimeContext);
    sourceContext = new SourceContextImpl(collectorList);
    int parallelism = runtimeContext.getParallelism();
    int taskIndex = runtimeContext.getTaskIndex();
    function.init(parallelism, taskIndex);
  }

  /**
   * Runs the source function with the source context. Any exception is rethrown wrapped in a
   * {@link RuntimeException}.
   */
  public void run() {
    try {
      function.run(sourceContext);
    } catch (Exception cause) {
      throw new RuntimeException(cause);
    }
  }

  @Override
  public OperatorType getOpType() {
    return OperatorType.SOURCE;
  }

  /**
   * Source context that wraps every collected value in a record and sends it to all collectors.
   */
  class SourceContextImpl implements SourceContext<T> {

    private List<Collector> collectors;

    public SourceContextImpl(List<Collector> collectors) {
      this.collectors = collectors;
    }

    /**
     * Sends a fresh record holding {@code t} to each collector, in list order.
     */
    @Override
    public void collect(T t) throws Exception {
      for (Collector target : this.collectors) {
        target.collect(new Record(t));
      }
    }
  }
}
@@ -0,0 +1,58 @@
package org.ray.streaming.plan;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * The logical execution plan: a list of vertices and a list of edges.
 */
public class Plan implements Serializable {

  private static final Logger LOGGER = LoggerFactory.getLogger(Plan.class);

  private List<PlanVertex> planVertexList;
  private List<PlanEdge> planEdgeList;

  /** Creates a plan without vertices or edges. */
  public Plan() {
    planVertexList = new ArrayList<>();
    planEdgeList = new ArrayList<>();
  }

  /** Appends a vertex to the plan. */
  public void addVertex(PlanVertex vertex) {
    planVertexList.add(vertex);
  }

  /** Appends an edge to the plan. */
  public void addEdge(PlanEdge planEdge) {
    planEdgeList.add(planEdge);
  }

  /** Returns the internal vertex list (not a copy). */
  public List<PlanVertex> getPlanVertexList() {
    return this.planVertexList;
  }

  /** Returns the internal edge list (not a copy). */
  public List<PlanEdge> getPlanEdgeList() {
    return this.planEdgeList;
  }

  /** Currently always returns an empty string. */
  public String getGraphVizPlan() {
    return "";
  }

  /**
   * Logs all vertices and then all edges at INFO level; does nothing when INFO is disabled.
   */
  public void printPlan() {
    if (LOGGER.isInfoEnabled()) {
      LOGGER.info("Printing logic plan:");
      planVertexList.forEach(vertex -> LOGGER.info(vertex.toString()));
      planEdgeList.forEach(edge -> LOGGER.info(edge.toString()));
    }
  }
}
@@ -0,0 +1,62 @@
package org.ray.streaming.plan;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.ray.streaming.api.stream.DataStream;
import org.ray.streaming.api.stream.Stream;
import org.ray.streaming.api.stream.StreamSink;
import org.ray.streaming.api.stream.StreamSource;
import org.ray.streaming.operator.StreamOperator;
/**
 * Builds a {@link Plan} by walking from every sink back to its sources.
 */
public class PlanBuilder {

  private Plan plan;
  private AtomicInteger edgeIdGenerator;
  private List<StreamSink> streamSinkList;

  /**
   * @param streamSinkList sinks from which the plan is derived
   */
  public PlanBuilder(List<StreamSink> streamSinkList) {
    this.streamSinkList = streamSinkList;
    this.edgeIdGenerator = new AtomicInteger(0);
    this.plan = new Plan();
  }

  /**
   * Processes every sink and returns the plan held by this builder.
   */
  public Plan buildPlan() {
    streamSinkList.forEach(this::processStream);
    return plan;
  }

  /**
   * Adds the vertex of {@code stream} to the plan. For sinks and intermediate streams the edge
   * from the input stream is added first and the input stream is processed recursively, so a
   * vertex is appended only after all of its upstream vertices.
   */
  private void processStream(Stream stream) {
    int vertexId = stream.getId();
    int parallelism = stream.getParallelism();
    StreamOperator streamOperator = stream.getOperator();

    // The order of the checks matters: StreamSource is itself a DataStream.
    VertexType vertexType = null;
    if (stream instanceof StreamSink) {
      vertexType = VertexType.SINK;
    } else if (stream instanceof StreamSource) {
      vertexType = VertexType.SOURCE;
    } else if (stream instanceof DataStream) {
      vertexType = VertexType.PROCESS;
    }

    PlanVertex planVertex = null;
    if (vertexType != null) {
      planVertex = new PlanVertex(vertexId, parallelism, vertexType, streamOperator);
      if (vertexType != VertexType.SOURCE) {
        linkToInput(stream, vertexId);
      }
    }
    this.plan.addVertex(planVertex);
  }

  /** Adds the edge from the input stream of {@code stream} and then processes that input. */
  private void linkToInput(Stream stream, int vertexId) {
    Stream parentStream = stream.getInputStream();
    int inputVertexId = parentStream.getId();
    this.plan.addEdge(new PlanEdge(inputVertexId, vertexId, parentStream.getPartition()));
    processStream(parentStream);
  }

  private int getEdgeId() {
    return edgeIdGenerator.incrementAndGet();
  }
}
@@ -0,0 +1,50 @@
package org.ray.streaming.plan;
import java.io.Serializable;
import org.ray.streaming.api.partition.Partition;
/**
 * PlanEdge describes the connection between an upstream and a downstream vertex together with
 * the partition rule used on that connection.
 */
public class PlanEdge implements Serializable {

  private int srcVertexId;
  private int targetVertexId;
  private Partition partition;

  /**
   * @param srcVertexId id of the upstream vertex
   * @param targetVertexId id of the downstream vertex
   * @param partition partition rule attached to the edge
   */
  public PlanEdge(int srcVertexId, int targetVertexId, Partition partition) {
    this.partition = partition;
    this.targetVertexId = targetVertexId;
    this.srcVertexId = srcVertexId;
  }

  public int getSrcVertexId() {
    return this.srcVertexId;
  }

  public void setSrcVertexId(int srcVertexId) {
    this.srcVertexId = srcVertexId;
  }

  public int getTargetVertexId() {
    return this.targetVertexId;
  }

  public void setTargetVertexId(int targetVertexId) {
    this.targetVertexId = targetVertexId;
  }

  public Partition getPartition() {
    return this.partition;
  }

  public void setPartition(Partition partition) {
    this.partition = partition;
  }

  /**
   * Renders the edge as {@code Edge(from:<src>-<target>-<partition class>)}.
   */
  @Override
  public String toString() {
    StringBuilder text = new StringBuilder("Edge(");
    text.append("from:").append(srcVertexId);
    text.append("-").append(targetVertexId);
    text.append("-").append(this.partition.getClass());
    text.append(")");
    return text.toString();
  }
}
@@ -0,0 +1,49 @@
package org.ray.streaming.plan;
import java.io.Serializable;
import org.ray.streaming.operator.StreamOperator;
/**
 * PlanVertex is a node of the logical plan: an operator together with its id, parallelism and
 * vertex type.
 */
public class PlanVertex implements Serializable {

  private int vertexId;
  private int parallelism;
  private VertexType vertexType;
  private StreamOperator streamOperator;

  /**
   * @param vertexId id of the vertex
   * @param parallelism parallelism of the vertex
   * @param vertexType role of the vertex
   * @param streamOperator operator executed by the vertex
   */
  public PlanVertex(int vertexId, int parallelism, VertexType vertexType,
      StreamOperator streamOperator) {
    this.streamOperator = streamOperator;
    this.vertexType = vertexType;
    this.parallelism = parallelism;
    this.vertexId = vertexId;
  }

  public int getVertexId() {
    return this.vertexId;
  }

  public int getParallelism() {
    return this.parallelism;
  }

  public StreamOperator getStreamOperator() {
    return this.streamOperator;
  }

  public VertexType getVertexType() {
    return this.vertexType;
  }

  /**
   * Renders all four fields in the form {@code PlanVertex{vertexId=..., ...}}.
   */
  @Override
  public String toString() {
    StringBuilder text = new StringBuilder("PlanVertex{");
    text.append("vertexId=").append(vertexId);
    text.append(", parallelism=").append(parallelism);
    text.append(", vertexType=").append(vertexType);
    text.append(", streamOperator=").append(streamOperator);
    text.append('}');
    return text.toString();
  }
}
@@ -0,0 +1,11 @@
package org.ray.streaming.plan;
/**
 * The role a vertex (node) plays in a plan.
 */
public enum VertexType {
// NOTE(review): not assigned by PlanBuilder; presumably used by the runtime - confirm.
MASTER,
// Assigned by PlanBuilder to StreamSource streams.
SOURCE,
// Assigned by PlanBuilder to DataStream instances that are not sources.
PROCESS,
// Assigned by PlanBuilder to StreamSink streams.
SINK,
}
@@ -0,0 +1,19 @@
package org.ray.streaming.schedule;
import java.util.Map;
import org.ray.streaming.plan.Plan;
/**
 * Interface of the job scheduler.
 */
public interface JobScheduler {
/**
 * Assign logical plan to physical execution graph, and schedule job to run.
 *
 * @param plan The logical plan.
 * @param conf Configuration passed along with the plan. NOTE(review): presumably keyed by the
 *     constants in {@code org.ray.streaming.util.Config} - confirm with implementations.
 */
void schedule(Plan plan, Map<String, Object> conf);
}
}
@@ -0,0 +1,44 @@
package org.ray.streaming.util;
/**
 * Names of configuration entries, and some of their default values, used by streaming.
 */
public class Config {

  /** Maximum number of batches to run in a streaming job. */
  public static final String STREAMING_BATCH_MAX_COUNT = "streaming.batch.max.count";

  /** Batch frequency in milliseconds. */
  public static final String STREAMING_BATCH_FREQUENCY = "streaming.batch.frequency";
  /** Default batch frequency: 1000 milliseconds. */
  public static final long STREAMING_BATCH_FREQUENCY_DEFAULT = 1000L;

  public static final String STREAMING_JOB_NAME = "streaming.job.name";
  public static final String STREAMING_OP_NAME = "streaming.op_name";
  public static final String TASK_JOB_ID = "streaming.task_job_id";
  public static final String STREAMING_WORKER_NAME = "streaming.worker_name";

  // Channel settings.
  public static final String CHANNEL_TYPE = "channel_type";
  public static final String MEMORY_CHANNEL = "memory_channel";
  public static final String NATIVE_CHANNEL = "native_channel";
  /** The channel type used by default is the native channel. */
  public static final String DEFAULT_CHANNEL_TYPE = NATIVE_CHANNEL;
  public static final String CHANNEL_SIZE = "channel_size";
  /** Default channel size, 10^8, kept as a decimal string. */
  public static final String CHANNEL_SIZE_DEFAULT = String.valueOf(100_000_000L);
  public static final String IS_RECREATE = "streaming.is_recreate";

  /** Return from DataReader.getBundle if only empty messages were read in this interval. */
  public static final String TIMER_INTERVAL_MS = "timer_interval_ms";
  public static final String READ_TIMEOUT_MS = "read_timeout_ms";
  public static final String DEFAULT_READ_TIMEOUT_MS = "10";

  public static final String STREAMING_RING_BUFFER_CAPACITY = "streaming.ring_buffer_capacity";

  /** Write an empty message if there is no data to be written in this interval. */
  public static final String STREAMING_EMPTY_MESSAGE_INTERVAL = "streaming.empty_message_interval";

  // Operator type.
  public static final String OPERATOR_TYPE = "operator_type";
}
@@ -0,0 +1,6 @@
log4j.rootLogger=INFO, stdout
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
@@ -0,0 +1,5 @@
ray {
run-mode = SINGLE_PROCESS
resources = "CPU:4"
redis.address = ""
}
@@ -0,0 +1,87 @@
package org.ray.streaming.plan;
import com.google.common.collect.Lists;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.partition.impl.KeyPartition;
import org.ray.streaming.api.partition.impl.RoundRobinPartition;
import org.ray.streaming.api.stream.DataStream;
import org.ray.streaming.api.stream.StreamSink;
import org.ray.streaming.api.stream.StreamSource;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
/**
 * Checks the vertices and edges that {@link PlanBuilder} produces for two small pipelines.
 */
public class PlanBuilderTest {

  private static final Logger LOGGER = LoggerFactory.getLogger(PlanBuilderTest.class);

  /** A source feeding a sink yields two vertices joined by one round-robin edge. */
  @Test
  public void testDataSync() {
    Plan plan = buildDataSyncPlan();
    List<PlanVertex> vertices = plan.getPlanVertexList();
    List<PlanEdge> edges = plan.getPlanEdgeList();

    Assert.assertEquals(vertices.size(), 2);
    Assert.assertEquals(edges.size(), 1);

    PlanEdge onlyEdge = edges.get(0);
    Assert.assertEquals(onlyEdge.getPartition().getClass(), RoundRobinPartition.class);

    // Upstream vertices are added first, so the source precedes the sink.
    PlanVertex sinkVertex = vertices.get(1);
    PlanVertex sourceVertex = vertices.get(0);
    Assert.assertEquals(sinkVertex.getVertexType(), VertexType.SINK);
    Assert.assertEquals(sourceVertex.getVertexType(), VertexType.SOURCE);
  }

  /** Builds the plan of a source that is connected directly to a sink. */
  public Plan buildDataSyncPlan() {
    StreamingContext context = StreamingContext.buildContext();
    DataStream<String> source = StreamSource.buildSource(context,
        Lists.newArrayList("a", "b", "c"));
    StreamSink sink = source.sink(x -> LOGGER.info(x));
    return new PlanBuilder(Lists.newArrayList(sink)).buildPlan();
  }

  /** A source, a keyBy and a sink yield three vertices and two edges. */
  @Test
  public void testKeyByPlan() {
    Plan plan = buildKeyByPlan();
    List<PlanVertex> vertices = plan.getPlanVertexList();
    List<PlanEdge> edges = plan.getPlanEdgeList();

    Assert.assertEquals(vertices.size(), 3);
    Assert.assertEquals(edges.size(), 2);

    PlanVertex sourceVertex = vertices.get(0);
    PlanVertex keyByVertex = vertices.get(1);
    PlanVertex sinkVertex = vertices.get(2);
    Assert.assertEquals(sourceVertex.getVertexType(), VertexType.SOURCE);
    Assert.assertEquals(keyByVertex.getVertexType(), VertexType.PROCESS);
    Assert.assertEquals(sinkVertex.getVertexType(), VertexType.SINK);

    // Edges are added while walking upstream from the sink, so the sink-side edge comes first.
    PlanEdge keyBy2Sink = edges.get(0);
    PlanEdge source2KeyBy = edges.get(1);
    Assert.assertEquals(keyBy2Sink.getPartition().getClass(), KeyPartition.class);
    Assert.assertEquals(source2KeyBy.getPartition().getClass(), RoundRobinPartition.class);
  }

  /** Builds the plan of a source followed by a keyBy and a sink. */
  public Plan buildKeyByPlan() {
    StreamingContext context = StreamingContext.buildContext();
    DataStream<String> source = StreamSource.buildSource(context,
        Lists.newArrayList("1", "2", "3", "4"));
    StreamSink sink = source.keyBy(x -> x)
        .sink(x -> LOGGER.info(x));
    return new PlanBuilder(Lists.newArrayList(sink)).buildPlan();
  }
}
@@ -0,0 +1,6 @@
log4j.rootLogger=INFO, stdout
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
@@ -0,0 +1,3 @@
ray {
run-mode = SINGLE_PROCESS
}
+106
View File
@@ -0,0 +1,106 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- This file is auto-generated by Bazel from pom_template.xml, do not modify it. -->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<artifactId>ray-streaming</artifactId>
<groupId>org.ray</groupId>
<version>0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>streaming-runtime</artifactId>
<name>ray streaming runtime</name>
<description>ray streaming runtime</description>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.ray</groupId>
<artifactId>ray-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.ray</groupId>
<artifactId>ray-runtime</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.ray</groupId>
<artifactId>streaming-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>flatbuffers-java</artifactId>
<version>1.9.0.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.0.1-jre</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.9.10</version>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>native_dependencies</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies-to-build</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${basedir}/../../build/java</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.3.1</version>
<configuration>
<outputDirectory>${basedir}/../../build/java</outputDirectory>
</configuration>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,77 @@
<?xml version="1.0" encoding="UTF-8"?>
{auto_gen_header}
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<artifactId>ray-streaming</artifactId>
<groupId>org.ray</groupId>
<version>0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>streaming-runtime</artifactId>
<name>ray streaming runtime</name>
<description>ray streaming runtime</description>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.ray</groupId>
<artifactId>ray-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.ray</groupId>
<artifactId>ray-runtime</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.ray</groupId>
<artifactId>streaming-api</artifactId>
<version>${project.version}</version>
</dependency>
{generated_bzl_deps}
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>native_dependencies</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies-to-build</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${basedir}/../../build/java</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.3.1</version>
<configuration>
<outputDirectory>${basedir}/../../build/java</outputDirectory>
</configuration>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,24 @@
package org.ray.streaming.runtime.cluster;
import java.util.ArrayList;
import java.util.List;
import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.streaming.runtime.worker.JobWorker;
/**
 * ResourceManager creates the worker actors of a streaming job.
 */
public class ResourceManager {

  /**
   * Creates {@code workerNum} {@link JobWorker} actors through {@code Ray.createActor}.
   *
   * @param workerNum number of workers to create; no actor is created when it is not positive
   * @return the created actors, in creation order
   */
  public List<RayActor<JobWorker>> createWorkers(int workerNum) {
    List<RayActor<JobWorker>> created = new ArrayList<>();
    int remaining = workerNum;
    while (remaining > 0) {
      RayActor<JobWorker> actor = Ray.createActor(JobWorker::new);
      created.add(actor);
      remaining--;
    }
    return created;
  }
}
@@ -0,0 +1,40 @@
package org.ray.streaming.runtime.core.collector;
import java.nio.ByteBuffer;
import java.util.Collection;
import org.ray.runtime.util.Serializer;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.api.partition.Partition;
import org.ray.streaming.message.Record;
import org.ray.streaming.runtime.transfer.ChannelID;
import org.ray.streaming.runtime.transfer.DataWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Collector that serializes records and writes them to the output channels selected by a
 * {@link Partition}.
 */
public class OutputCollector implements Collector<Record> {

  private static final Logger LOGGER = LoggerFactory.getLogger(OutputCollector.class);

  private Partition partition;
  private DataWriter writer;
  private ChannelID[] outputQueues;

  /**
   * @param outputQueueIds ids of the output channels; each is converted with
   *     {@code ChannelID.from}, and the iteration order defines the channel index
   * @param writer writer used to send data to the channels
   * @param partition partition that selects the target channel indexes for a record
   */
  public OutputCollector(Collection<String> outputQueueIds,
      DataWriter writer,
      Partition partition) {
    this.outputQueues = outputQueueIds.stream().map(ChannelID::from).toArray(ChannelID[]::new);
    this.writer = writer;
    this.partition = partition;
    LOGGER.debug("OutputCollector constructed, outputQueueIds:{}, partition:{}.",
        outputQueueIds, this.partition);
  }

  /**
   * Serializes the record once and writes it to every channel chosen by the partition.
   *
   * @param record record to send downstream
   */
  @Override
  public void collect(Record record) {
    int[] targetIndexes = this.partition.partition(record, outputQueues.length);
    ByteBuffer msgBuffer = ByteBuffer.wrap(Serializer.encode(record));
    for (int targetIndex : targetIndexes) {
      // Give every write its own view of the bytes: a writer that reads the buffer with
      // relative operations advances its position, which would leave nothing to read for
      // the following channels if one instance were shared.
      writer.write(outputQueues[targetIndex], msgBuffer.duplicate());
    }
  }
}
@@ -0,0 +1,20 @@
package org.ray.streaming.runtime.core.command;
import java.io.Serializable;
/**
 * Serializable holder of a batch id.
 */
public class BatchInfo implements Serializable {

  private long batchId;

  /**
   * @param batchId initial batch id
   */
  public BatchInfo(long batchId) {
    setBatchId(batchId);
  }

  /** Returns the current batch id. */
  public long getBatchId() {
    return this.batchId;
  }

  /** Replaces the batch id. */
  public void setBatchId(long batchId) {
    this.batchId = batchId;
  }
}
@@ -0,0 +1,49 @@
package org.ray.streaming.runtime.core.graph;
import java.io.Serializable;
import org.ray.streaming.api.partition.Partition;
/**
 * An edge in the physical execution graph: source node id, target node id and partition.
 */
public class ExecutionEdge implements Serializable {

  private int srcNodeId;
  private int targetNodeId;
  private Partition partition;

  /**
   * @param srcNodeId id of the node the edge starts at
   * @param targetNodeId id of the node the edge ends at
   * @param partition partition attached to the edge
   */
  public ExecutionEdge(int srcNodeId, int targetNodeId, Partition partition) {
    this.partition = partition;
    this.targetNodeId = targetNodeId;
    this.srcNodeId = srcNodeId;
  }

  public int getSrcNodeId() {
    return this.srcNodeId;
  }

  public void setSrcNodeId(int srcNodeId) {
    this.srcNodeId = srcNodeId;
  }

  public int getTargetNodeId() {
    return this.targetNodeId;
  }

  public void setTargetNodeId(int targetNodeId) {
    this.targetNodeId = targetNodeId;
  }

  public Partition getPartition() {
    return this.partition;
  }

  public void setPartition(Partition partition) {
    this.partition = partition;
  }

  /**
   * Returns the stream name of this edge in the form {@code stream:<src>-<target>}.
   */
  public String getStream() {
    StringBuilder name = new StringBuilder("stream:");
    name.append(srcNodeId).append("-").append(targetNodeId);
    return name.toString();
  }
}
@@ -0,0 +1,98 @@
package org.ray.streaming.runtime.core.graph;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.ray.api.RayActor;
import org.ray.streaming.runtime.worker.JobWorker;
/**
 * Physical execution graph: the execution nodes plus the workers of the source and sink nodes.
 */
public class ExecutionGraph implements Serializable {

  private long buildTime;
  private List<ExecutionNode> executionNodeList;
  private List<RayActor<JobWorker>> sourceWorkers = new ArrayList<>();
  private List<RayActor<JobWorker>> sinkWorkers = new ArrayList<>();

  /**
   * Stores the nodes, collects the workers of all SOURCE and SINK nodes and records the
   * current time as build time.
   *
   * @param executionNodes nodes of the graph (the list is stored, not copied)
   */
  public ExecutionGraph(List<ExecutionNode> executionNodes) {
    this.executionNodeList = executionNodes;
    for (ExecutionNode node : executionNodeList) {
      ExecutionNode.NodeType nodeType = node.getNodeType();
      if (nodeType == ExecutionNode.NodeType.SOURCE) {
        sourceWorkers.addAll(workersOf(node));
      }
      if (nodeType == ExecutionNode.NodeType.SINK) {
        sinkWorkers.addAll(workersOf(node));
      }
    }
    buildTime = System.currentTimeMillis();
  }

  /** Returns the workers of all tasks of the given node, in task order. */
  private static List<RayActor<JobWorker>> workersOf(ExecutionNode node) {
    return node.getExecutionTasks().stream()
        .map(ExecutionTask::getWorker).collect(Collectors.toList());
  }

  /** Returns the first node with the given id or fails when there is none. */
  private ExecutionNode findNode(int nodeId) {
    for (ExecutionNode candidate : executionNodeList) {
      if (candidate.getNodeId() == nodeId) {
        return candidate;
      }
    }
    throw new RuntimeException("Node " + nodeId + " does not exist!");
  }

  public List<RayActor<JobWorker>> getSourceWorkers() {
    return this.sourceWorkers;
  }

  public List<RayActor<JobWorker>> getSinkWorkers() {
    return this.sinkWorkers;
  }

  public List<ExecutionNode> getExecutionNodeList() {
    return this.executionNodeList;
  }

  /**
   * Returns the first task with the given id.
   *
   * @throws RuntimeException if no node contains such a task
   */
  public ExecutionTask getExecutionTaskByTaskId(int taskId) {
    return executionNodeList.stream()
        .flatMap(node -> node.getExecutionTasks().stream())
        .filter(task -> task.getTaskId() == taskId)
        .findFirst()
        .orElseThrow(() -> new RuntimeException("Task " + taskId + " does not exist!"));
  }

  /**
   * Returns the first node with the given id.
   *
   * @throws RuntimeException if there is no such node
   */
  public ExecutionNode getExecutionNodeByNodeId(int nodeId) {
    return findNode(nodeId);
  }

  /**
   * Returns the first node that contains a task with the given id.
   *
   * @throws RuntimeException if no node contains such a task
   */
  public ExecutionNode getExecutionNodeByTaskId(int taskId) {
    for (ExecutionNode node : executionNodeList) {
      boolean ownsTask = node.getExecutionTasks().stream()
          .anyMatch(task -> task.getTaskId() == taskId);
      if (ownsTask) {
        return node;
      }
    }
    throw new RuntimeException("Task " + taskId + " does not exist!");
  }

  /**
   * Returns a new map from task id to worker for the tasks of the node with the given id.
   *
   * @throws RuntimeException if there is no such node
   */
  public Map<Integer, RayActor<JobWorker>> getTaskId2WorkerByNodeId(int nodeId) {
    ExecutionNode node = findNode(nodeId);
    Map<Integer, RayActor<JobWorker>> workerByTaskId = new HashMap<>();
    for (ExecutionTask task : node.getExecutionTasks()) {
      workerByTaskId.put(task.getTaskId(), task.getWorker());
    }
    return workerByTaskId;
  }

  /** Returns the time, in epoch milliseconds, at which this graph object was constructed. */
  public long getBuildTime() {
    return this.buildTime;
  }
}
@@ -0,0 +1,115 @@
package org.ray.streaming.runtime.core.graph;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.ray.streaming.plan.VertexType;
import org.ray.streaming.runtime.core.processor.StreamProcessor;
/**
 * A node in the physical execution graph.
 *
 * <p>A node carries its id, parallelism, type, stream processor, the tasks that
 * execute it and its input/output edges.
 */
public class ExecutionNode implements Serializable {
  private int nodeId;
  private int parallelism;
  private NodeType nodeType;
  private StreamProcessor streamProcessor;
  private List<ExecutionTask> executionTasks;
  private List<ExecutionEdge> inputsEdges;
  private List<ExecutionEdge> outputEdges;

  /** Creates a node with empty task and edge lists; type and processor are unset. */
  public ExecutionNode(int nodeId, int parallelism) {
    this.nodeId = nodeId;
    this.parallelism = parallelism;
    this.executionTasks = new ArrayList<>();
    this.inputsEdges = new ArrayList<>();
    this.outputEdges = new ArrayList<>();
  }

  // ---- identity and sizing ----

  public int getNodeId() {
    return nodeId;
  }

  public void setNodeId(int nodeId) {
    this.nodeId = nodeId;
  }

  public int getParallelism() {
    return parallelism;
  }

  public void setParallelism(int parallelism) {
    this.parallelism = parallelism;
  }

  // ---- node type and processor ----

  public NodeType getNodeType() {
    return nodeType;
  }

  /**
   * Maps the logical vertex type onto the node type: SOURCE and SINK are kept,
   * every other vertex type becomes PROCESS.
   */
  public void setNodeType(VertexType vertexType) {
    NodeType mapped;
    switch (vertexType) {
      case SOURCE:
        mapped = NodeType.SOURCE;
        break;
      case SINK:
        mapped = NodeType.SINK;
        break;
      default:
        mapped = NodeType.PROCESS;
        break;
    }
    this.nodeType = mapped;
  }

  public StreamProcessor getStreamProcessor() {
    return streamProcessor;
  }

  public void setStreamProcessor(StreamProcessor streamProcessor) {
    this.streamProcessor = streamProcessor;
  }

  // ---- tasks ----

  public List<ExecutionTask> getExecutionTasks() {
    return executionTasks;
  }

  public void setExecutionTasks(List<ExecutionTask> executionTasks) {
    this.executionTasks = executionTasks;
  }

  // ---- edges ----

  public List<ExecutionEdge> getInputsEdges() {
    return inputsEdges;
  }

  public void addInputEdge(ExecutionEdge executionEdge) {
    this.inputsEdges.add(executionEdge);
  }

  public List<ExecutionEdge> getOutputEdges() {
    return outputEdges;
  }

  public void setOutputEdges(List<ExecutionEdge> outputEdges) {
    this.outputEdges = outputEdges;
  }

  /** Appends an edge to the output edges of this node. */
  public void addExecutionEdge(ExecutionEdge executionEdge) {
    this.outputEdges.add(executionEdge);
  }

  @Override
  public String toString() {
    return "ExecutionNode{"
        + "nodeId=" + nodeId
        + ", parallelism=" + parallelism
        + ", nodeType=" + nodeType
        + ", streamProcessor=" + streamProcessor
        + '}';
  }

  /** Role of a node inside the execution graph. */
  public enum NodeType {
    SOURCE,
    PROCESS,
    SINK,
  }
}
@@ -0,0 +1,48 @@
package org.ray.streaming.runtime.core.graph;
import java.io.Serializable;
import org.ray.api.RayActor;
import org.ray.streaming.runtime.worker.JobWorker;
/**
 * ExecutionTask is the minimal execution unit.
 *
 * <p>An ExecutionNode has n ExecutionTasks if its parallelism is n. Each task
 * carries its id, its index and the worker actor it was created with.
 */
public class ExecutionTask implements Serializable {
  private int taskId;
  private int taskIndex;
  private RayActor<JobWorker> worker;

  /**
   * @param taskId id of the task
   * @param taskIndex index of the task
   * @param worker actor assigned to this task
   */
  public ExecutionTask(int taskId, int taskIndex, RayActor<JobWorker> worker) {
    this.worker = worker;
    this.taskIndex = taskIndex;
    this.taskId = taskId;
  }

  public RayActor<JobWorker> getWorker() {
    return this.worker;
  }

  public void setWorker(RayActor<JobWorker> worker) {
    this.worker = worker;
  }

  public int getTaskIndex() {
    return this.taskIndex;
  }

  public void setTaskIndex(int taskIndex) {
    this.taskIndex = taskIndex;
  }

  public int getTaskId() {
    return this.taskId;
  }

  public void setTaskId(int taskId) {
    this.taskId = taskId;
  }
}
@@ -0,0 +1,29 @@
package org.ray.streaming.runtime.core.processor;
import org.ray.streaming.message.Record;
import org.ray.streaming.operator.OneInputOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Processor that feeds every record to a single {@link OneInputOperator}.
 *
 * @param <T> type of the value carried by the processed records
 */
public class OneInputProcessor<T> extends StreamProcessor<Record<T>, OneInputOperator<T>> {
  private static final Logger LOGGER = LoggerFactory.getLogger(OneInputProcessor.class);

  public OneInputProcessor(OneInputOperator<T> operator) {
    super(operator);
  }

  /** Closes the wrapped operator. */
  @Override
  public void close() {
    operator.close();
  }

  /**
   * Hands the record to the wrapped operator.
   *
   * @throws RuntimeException wrapping any exception raised by the operator
   */
  @Override
  public void process(Record<T> record) {
    try {
      operator.processElement(record);
    } catch (Exception cause) {
      throw new RuntimeException(cause);
    }
  }
}
@@ -0,0 +1,31 @@
package org.ray.streaming.runtime.core.processor;
import org.ray.streaming.operator.OneInputOperator;
import org.ray.streaming.operator.OperatorType;
import org.ray.streaming.operator.StreamOperator;
import org.ray.streaming.operator.TwoInputOperator;
import org.ray.streaming.operator.impl.SourceOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Factory that wraps a {@link StreamOperator} in the {@link StreamProcessor}
 * matching the operator's {@link OperatorType}.
 */
public class ProcessBuilder {
  private static final Logger LOGGER = LoggerFactory.getLogger(ProcessBuilder.class);

  /**
   * Builds the processor for the given operator.
   *
   * @param streamOperator operator to wrap; its {@code getOpType()} selects the processor class
   * @return a SourceProcessor, OneInputProcessor or TwoInputProcessor wrapping the operator
   * @throws RuntimeException if the operator type is none of SOURCE, ONE_INPUT, TWO_INPUT
   */
  public static StreamProcessor buildProcessor(StreamOperator streamOperator) {
    OperatorType type = streamOperator.getOpType();
    // getSimpleName() already returns a String, no further conversion is needed.
    LOGGER.info("Building StreamProcessor, operator type = {}, operator = {}.", type,
        streamOperator.getClass().getSimpleName());
    switch (type) {
      case SOURCE:
        return new SourceProcessor<>((SourceOperator) streamOperator);
      case ONE_INPUT:
        return new OneInputProcessor<>((OneInputOperator) streamOperator);
      case TWO_INPUT:
        return new TwoInputProcessor((TwoInputOperator) streamOperator);
      default:
        // Name the offending type so the failure can be diagnosed from the message alone.
        throw new RuntimeException("Unsupported operator type: " + type);
    }
  }
}
@@ -0,0 +1,15 @@
package org.ray.streaming.runtime.core.processor;
import java.io.Serializable;
import java.util.List;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.api.context.RuntimeContext;
/**
* A serializable processing unit with an open / process / close life cycle.
*
* @param <T> type of the elements passed to {@link #process}
*/
public interface Processor<T> extends Serializable {
/**
* Prepares the processor for use.
*
* @param collectors collectors handed to the processor
* @param runtimeContext runtime context handed to the processor
*/
void open(List<Collector> collectors, RuntimeContext runtimeContext);
/**
* Processes a single element.
*
* @param t element to process
*/
void process(T t);
/**
* Closes the processor.
*/
void close();
}
@@ -0,0 +1,30 @@
package org.ray.streaming.runtime.core.processor;
import org.ray.streaming.message.Record;
import org.ray.streaming.operator.impl.SourceOperator;
/**
 * Processor for stream sources. It wraps a {@link SourceOperator} and drives it
 * through {@link #run()} instead of receiving records.
 *
 * @param <T> The type of source data.
 */
public class SourceProcessor<T> extends StreamProcessor<Record, SourceOperator<T>> {

  public SourceProcessor(SourceOperator<T> operator) {
    super(operator);
  }

  /** Runs the wrapped source operator. */
  public void run() {
    this.operator.run();
  }

  /**
   * Not supported: a source has no upstream and therefore never receives records.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public void process(Record record) {
    throw new UnsupportedOperationException("SourceProcessor should not process record");
  }

  /** Does nothing. */
  @Override
  public void close() {
  }
}
@@ -0,0 +1,41 @@
package org.ray.streaming.runtime.core.processor;
import java.util.List;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.api.context.RuntimeContext;
import org.ray.streaming.operator.Operator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * StreamProcessor is a processing unit for an operator.
 *
 * @param <T> The type of the processed data.
 * @param <P> Type of the specific operator class.
 */
public abstract class StreamProcessor<T, P extends Operator> implements Processor<T> {
  private static final Logger LOGGER = LoggerFactory.getLogger(StreamProcessor.class);

  protected List<Collector> collectors;
  protected RuntimeContext runtimeContext;
  protected P operator;

  public StreamProcessor(P operator) {
    this.operator = operator;
  }

  /**
   * Stores the collectors and the runtime context, then opens the wrapped
   * operator with them if an operator is present.
   */
  @Override
  public void open(List<Collector> collectors, RuntimeContext runtimeContext) {
    this.collectors = collectors;
    this.runtimeContext = runtimeContext;
    P wrapped = this.operator;
    if (wrapped != null) {
      wrapped.open(collectors, runtimeContext);
    }
    LOGGER.info("opened {}", this);
  }

  /** Returns the simple name of the concrete processor class. */
  @Override
  public String toString() {
    return getClass().getSimpleName();
  }
}
@@ -0,0 +1,51 @@
package org.ray.streaming.runtime.core.processor;
import org.ray.streaming.message.Record;
import org.ray.streaming.operator.TwoInputOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Processor that routes each record to the left or the right input of a
 * {@link TwoInputOperator}, depending on the stream the record came from.
 *
 * @param <T> element type of the left input
 * @param <O> element type of the right input
 */
public class TwoInputProcessor<T, O> extends StreamProcessor<Record, TwoInputOperator<T, O>> {
  private static final Logger LOGGER = LoggerFactory.getLogger(TwoInputProcessor.class);

  private String leftStream;
  private String rightStream;

  public TwoInputProcessor(TwoInputOperator<T, O> operator) {
    super(operator);
  }

  public String getLeftStream() {
    return leftStream;
  }

  public void setLeftStream(String leftStream) {
    this.leftStream = leftStream;
  }

  public String getRightStream() {
    return rightStream;
  }

  public void setRightStream(String rightStream) {
    this.rightStream = rightStream;
  }

  /**
   * Passes the record as the left argument when its stream equals the left stream,
   * otherwise as the right argument; the other argument is {@code null}.
   *
   * @throws RuntimeException wrapping any exception raised while dispatching
   */
  @Override
  public void process(Record record) {
    try {
      // The stream lookup stays inside the try block so that its failures are wrapped too.
      if (!record.getStream().equals(leftStream)) {
        operator.processElement(null, record);
      } else {
        operator.processElement(record, null);
      }
    } catch (Exception cause) {
      throw new RuntimeException(cause);
    }
  }

  /** Closes the wrapped operator. */
  @Override
  public void close() {
    operator.close();
  }
}
@@ -0,0 +1,20 @@
package org.ray.streaming.runtime.schedule;
import java.io.Serializable;
import java.util.List;
import org.ray.api.RayActor;
import org.ray.streaming.plan.Plan;
import org.ray.streaming.runtime.core.graph.ExecutionGraph;
import org.ray.streaming.runtime.worker.JobWorker;
/**
* Interface of the task assigning strategy.
*/
public interface ITaskAssign extends Serializable {
/**
* Assign logical plan to physical execution graph.
*
* @param plan the logical plan to assign
* @param workers the worker actors available for the plan
* @return the physical execution graph
*/
ExecutionGraph assign(Plan plan, List<RayActor<JobWorker>> workers);
}
@@ -0,0 +1,65 @@
package org.ray.streaming.runtime.schedule;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.api.RayObject;
import org.ray.streaming.plan.Plan;
import org.ray.streaming.plan.PlanVertex;
import org.ray.streaming.runtime.cluster.ResourceManager;
import org.ray.streaming.runtime.core.graph.ExecutionGraph;
import org.ray.streaming.runtime.core.graph.ExecutionNode;
import org.ray.streaming.runtime.core.graph.ExecutionTask;
import org.ray.streaming.runtime.worker.JobWorker;
import org.ray.streaming.runtime.worker.context.WorkerContext;
import org.ray.streaming.schedule.JobScheduler;
/**
 * JobSchedulerImpl schedules workers according to the Plan, using workers
 * created by the ResourceManager.
 */
public class JobSchedulerImpl implements JobScheduler {
  private Plan plan;
  private Map<String, Object> jobConfig;
  private ResourceManager resourceManager;
  private ITaskAssign taskAssign;

  public JobSchedulerImpl() {
    this.resourceManager = new ResourceManager();
    this.taskAssign = new TaskAssignImpl();
  }

  /**
   * Turns the plan into an execution graph, then calls {@code init} on the worker
   * of every task and waits on the returned objects.
   */
  @Override
  public void schedule(Plan plan, Map<String, Object> jobConfig) {
    this.jobConfig = jobConfig;
    this.plan = plan;
    System.setProperty("ray.raylet.config.num_workers_per_process_java", "1");
    Ray.init();

    // One worker per task: the total is the sum of all vertex parallelisms.
    int workerCount = getPlanWorker();
    List<RayActor<JobWorker>> workers = resourceManager.createWorkers(workerCount);
    ExecutionGraph graph = taskAssign.assign(this.plan, workers);

    List<RayObject<Boolean>> pendingInits = new ArrayList<>();
    for (ExecutionNode node : graph.getExecutionNodeList()) {
      for (ExecutionTask task : node.getExecutionTasks()) {
        int taskId = task.getTaskId();
        RayActor<JobWorker> worker = task.getWorker();
        WorkerContext context = new WorkerContext(taskId, graph, jobConfig);
        pendingInits.add(Ray.call(JobWorker::init, worker, context));
      }
    }
    Ray.wait(pendingInits);
  }

  /** Returns the number of workers the plan needs: the sum of all vertex parallelisms. */
  private int getPlanWorker() {
    int total = 0;
    for (PlanVertex vertex : plan.getPlanVertexList()) {
      total += vertex.getParallelism();
    }
    return total;
  }
}
@@ -0,0 +1,66 @@
package org.ray.streaming.runtime.schedule;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.ray.api.RayActor;
import org.ray.streaming.plan.Plan;
import org.ray.streaming.plan.PlanEdge;
import org.ray.streaming.plan.PlanVertex;
import org.ray.streaming.runtime.core.graph.ExecutionEdge;
import org.ray.streaming.runtime.core.graph.ExecutionGraph;
import org.ray.streaming.runtime.core.graph.ExecutionNode;
import org.ray.streaming.runtime.core.graph.ExecutionTask;
import org.ray.streaming.runtime.core.processor.ProcessBuilder;
import org.ray.streaming.runtime.core.processor.StreamProcessor;
import org.ray.streaming.runtime.worker.JobWorker;
/**
 * Task assignment that hands out workers to tasks in plan-vertex order: task ids are
 * consecutive integers starting at 0 and task {@code i} runs on {@code workers.get(i)}.
 */
public class TaskAssignImpl implements ITaskAssign {

  /**
   * Assign an optimized logical plan to execution graph.
   *
   * @param plan The logical plan.
   * @param workers The worker actors; indexed by task id.
   * @return The physical execution graph.
   */
  @Override
  public ExecutionGraph assign(Plan plan, List<RayActor<JobWorker>> workers) {
    Map<Integer, ExecutionNode> nodesById = new HashMap<>();

    // Pass 1: one execution node per plan vertex, one task per unit of parallelism.
    int nextTaskId = 0;
    for (PlanVertex vertex : plan.getPlanVertexList()) {
      ExecutionNode node = new ExecutionNode(vertex.getVertexId(), vertex.getParallelism());
      node.setNodeType(vertex.getVertexType());

      List<ExecutionTask> tasks = new ArrayList<>();
      int taskIndex = 0;
      while (taskIndex < vertex.getParallelism()) {
        tasks.add(new ExecutionTask(nextTaskId, taskIndex, workers.get(nextTaskId)));
        nextTaskId++;
        taskIndex++;
      }

      StreamProcessor processor = ProcessBuilder.buildProcessor(vertex.getStreamOperator());
      node.setExecutionTasks(tasks);
      node.setStreamProcessor(processor);
      nodesById.put(node.getNodeId(), node);
    }

    // Pass 2: register every plan edge on both of its end nodes.
    for (PlanEdge edge : plan.getPlanEdgeList()) {
      int sourceId = edge.getSrcVertexId();
      int targetId = edge.getTargetVertexId();
      ExecutionEdge executionEdge = new ExecutionEdge(sourceId, targetId, edge.getPartition());
      nodesById.get(sourceId).addExecutionEdge(executionEdge);
      nodesById.get(targetId).addInputEdge(executionEdge);
    }

    List<ExecutionNode> nodes = nodesById.values().stream().collect(Collectors.toList());
    return new ExecutionGraph(nodes);
  }
}
@@ -0,0 +1,182 @@
package org.ray.streaming.runtime.transfer;
import com.google.common.base.FinalizablePhantomReference;
import com.google.common.base.FinalizableReferenceQueue;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.io.BaseEncoding;
import java.lang.ref.Reference;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.Set;
import sun.nio.ch.DirectBuffer;
/**
 * ChannelID is used to identify a transfer channel between an upstream worker
 * and a downstream worker.
 *
 * <p>An id is {@link #ID_LENGTH} bytes long and is also available as a hex string.
 * Each instance copies its bytes into a direct buffer and creates a native id from the
 * buffer address; the native id is destroyed after the instance becomes unreachable.
 */
public class ChannelID {
  public static final int ID_LENGTH = 20;
  private static final FinalizableReferenceQueue REFERENCE_QUEUE = new FinalizableReferenceQueue();
  // This ensures that the FinalizablePhantomReference itself is not garbage-collected.
  private static final Set<Reference<?>> references = Sets.newConcurrentHashSet();

  private final byte[] bytes;
  private final String strId;
  private final ByteBuffer buffer;
  private final long address;
  private final long nativeIdPtr;

  private ChannelID(String strId, byte[] idBytes) {
    this.strId = strId;
    this.bytes = idBytes;
    ByteBuffer directBuffer = ByteBuffer.allocateDirect(ID_LENGTH);
    directBuffer.put(bytes);
    directBuffer.rewind();
    this.buffer = directBuffer;
    this.address = ((DirectBuffer) (buffer)).address();
    this.nativeIdPtr = createNativeID(address);
  }

  /** Returns the id bytes; the array is the one this instance was created with, not a copy. */
  public byte[] getBytes() {
    return bytes;
  }

  /** Returns the direct buffer holding a copy of the id bytes. */
  public ByteBuffer getBuffer() {
    return buffer;
  }

  /** Returns the memory address of the direct buffer returned by {@link #getBuffer()}. */
  public long getAddress() {
    return address;
  }

  /**
   * Returns the pointer of the native id.
   *
   * @throws IllegalStateException if no native id was created
   */
  public long getNativeIdPtr() {
    if (nativeIdPtr == 0) {
      throw new IllegalStateException("native ID not available");
    }
    return nativeIdPtr;
  }

  /** Returns the hex string of this id, in the letter case it was created with. */
  @Override
  public String toString() {
    return strId;
  }

  /**
   * Two ids are equal when they denote the same bytes. The hex string is compared
   * case-insensitively because {@link #from(String)} keeps the caller's letter case while
   * {@link #from(byte[])} always yields lower case; both decode to the same bytes.
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    ChannelID that = (ChannelID) o;
    return strId.equalsIgnoreCase(that.strId);
  }

  @Override
  public int hashCode() {
    // Hash the lower-case form so the result stays consistent with equals().
    return strId.toLowerCase().hashCode();
  }

  private static native long createNativeID(long idAddress);

  private static native void destroyNativeID(long nativeIdPtr);

  /**
   * @param id hex string representation of channel id
   */
  public static ChannelID from(String id) {
    return from(id, ChannelID.idStrToBytes(id));
  }

  /**
   * @param idBytes bytes representation of channel id
   */
  public static ChannelID from(byte[] idBytes) {
    return from(idBytesToStr(idBytes), idBytes);
  }

  private static ChannelID from(String strID, byte[] idBytes) {
    ChannelID id = new ChannelID(strID, idBytes);
    long nativeIdPtr = id.nativeIdPtr;
    if (nativeIdPtr != 0) {
      // The reference captures only the raw pointer, never the ChannelID itself,
      // so it does not keep the id reachable.
      Reference<ChannelID> reference =
          new FinalizablePhantomReference<ChannelID>(id, REFERENCE_QUEUE) {
            @Override
            public void finalizeReferent() {
              destroyNativeID(nativeIdPtr);
              references.remove(this);
            }
          };
      references.add(reference);
    }
    return id;
  }

  /**
   * @return a random channel id string of {@code ID_LENGTH * 2} characters, each one
   *     drawn from the letters 'A' to 'F'
   */
  public static String genRandomIdStr() {
    StringBuilder sb = new StringBuilder();
    Random random = new Random();
    for (int i = 0; i < ChannelID.ID_LENGTH * 2; ++i) {
      sb.append((char) (random.nextInt(6) + 'A'));
    }
    return sb.toString();
  }

  /**
   * Generate a channel name: the lower-case hex string of a 20-byte id.
   *
   * @param fromTaskId upstream task id, must be less than {@link Short#MAX_VALUE}
   * @param toTaskId downstream task id, must be less than {@link Short#MAX_VALUE}
   * @param ts timestamp; only its lowest 4 bytes are stored in the id
   * @return channel name
   * @throws IllegalArgumentException if a task id is not less than {@link Short#MAX_VALUE}
   */
  public static String genIdStr(int fromTaskId, int toTaskId, long ts) {
    /*
      |    Head    | Timestamp | Empty | From  |  To    |
      | 8 bytes    |  4bytes   | 4bytes| 2bytes| 2bytes |
    */
    // Guava's Preconditions only substitutes %s placeholders.
    Preconditions.checkArgument(fromTaskId < Short.MAX_VALUE,
        "fromTaskId %s must be less than %s", fromTaskId, Short.MAX_VALUE);
    Preconditions.checkArgument(toTaskId < Short.MAX_VALUE,
        "toTaskId %s must be less than %s", toTaskId, Short.MAX_VALUE);
    byte[] channelName = new byte[ID_LENGTH];

    // Lowest 4 bytes of the timestamp, most significant byte first, at offsets 8..11.
    for (int i = 11; i >= 8; i--) {
      channelName[i] = (byte) (ts & 0xff);
      ts >>= 8;
    }

    // Task ids as 2 bytes each, most significant byte first.
    channelName[16] = (byte) ((fromTaskId & 0xffff) >> 8);
    channelName[17] = (byte) (fromTaskId & 0xff);
    channelName[18] = (byte) ((toTaskId & 0xffff) >> 8);
    channelName[19] = (byte) (toTaskId & 0xff);

    return ChannelID.idBytesToStr(channelName);
  }

  /**
   * @param id hex string representation of channel id, in either letter case
   * @return bytes representation of channel id
   */
  static byte[] idStrToBytes(String id) {
    byte[] idBytes = BaseEncoding.base16().decode(id.toUpperCase());
    assert idBytes.length == ChannelID.ID_LENGTH;
    return idBytes;
  }

  /**
   * @param id bytes representation of channel id
   * @return lower-case hex string representation of channel id
   */
  static String idBytesToStr(byte[] id) {
    assert id.length == ChannelID.ID_LENGTH;
    return BaseEncoding.base16().encode(id).toLowerCase();
  }
}
@@ -0,0 +1,24 @@
package org.ray.streaming.runtime.transfer;
import java.util.ArrayList;
import java.util.List;
/**
 * Checked exception that carries the ids of the channels reported as abnormal.
 */
public class ChannelInitException extends Exception {

  private final List<byte[]> abnormalQueues;

  /**
   * @param message error message
   * @param abnormalQueues byte ids of the abnormal channels; stored as-is, not copied
   */
  public ChannelInitException(String message, List<byte[]> abnormalQueues) {
    super(message);
    this.abnormalQueues = abnormalQueues;
  }

  /** Returns the byte ids of the abnormal channels. */
  public List<byte[]> getAbnormalChannels() {
    return this.abnormalQueues;
  }

  /** Returns the ids of the abnormal channels converted to strings, in the same order. */
  public List<String> getAbnormalChannelsString() {
    List<String> channelNames = new ArrayList<>();
    for (byte[] queueId : abnormalQueues) {
      channelNames.add(ChannelID.idBytesToStr(queueId));
    }
    return channelNames;
  }
}
@@ -0,0 +1,11 @@
package org.ray.streaming.runtime.transfer;
/**
 * Unchecked exception for an interrupted channel operation.
 */
public class ChannelInterruptException extends RuntimeException {

  /** Creates the exception with the given detail message. */
  public ChannelInterruptException(String message) {
    super(message);
  }

  /** Creates the exception without a detail message. */
  public ChannelInterruptException() {
  }
}
@@ -0,0 +1,40 @@
package org.ray.streaming.runtime.transfer;
import java.util.Map;
import org.ray.streaming.runtime.generated.Streaming;
import org.ray.streaming.util.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Helpers shared by the channel reader and writer.
 */
public class ChannelUtils {
  private static final Logger LOGGER = LoggerFactory.getLogger(ChannelUtils.class);

  /**
   * Converts the configuration map into a serialized {@code StreamingConfig} message.
   * Only the keys present in the map are set on the message.
   *
   * @param conf configuration map
   * @return bytes of the built {@code StreamingConfig}
   * @throws NumberFormatException if a numeric entry cannot be parsed as an int
   */
  static byte[] toNativeConf(Map<String, String> conf) {
    Streaming.StreamingConfig.Builder builder = Streaming.StreamingConfig.newBuilder();

    // String-valued entries.
    if (conf.containsKey(Config.STREAMING_JOB_NAME)) {
      String jobName = conf.get(Config.STREAMING_JOB_NAME);
      builder.setJobName(jobName);
    }
    if (conf.containsKey(Config.TASK_JOB_ID)) {
      String taskJobId = conf.get(Config.TASK_JOB_ID);
      builder.setTaskJobId(taskJobId);
    }
    if (conf.containsKey(Config.STREAMING_WORKER_NAME)) {
      String workerName = conf.get(Config.STREAMING_WORKER_NAME);
      builder.setWorkerName(workerName);
    }
    if (conf.containsKey(Config.STREAMING_OP_NAME)) {
      String opName = conf.get(Config.STREAMING_OP_NAME);
      builder.setOpName(opName);
    }

    // Integer-valued entries.
    if (conf.containsKey(Config.STREAMING_RING_BUFFER_CAPACITY)) {
      int capacity = Integer.parseInt(conf.get(Config.STREAMING_RING_BUFFER_CAPACITY));
      builder.setRingBufferCapacity(capacity);
    }
    if (conf.containsKey(Config.STREAMING_EMPTY_MESSAGE_INTERVAL)) {
      int interval = Integer.parseInt(conf.get(Config.STREAMING_EMPTY_MESSAGE_INTERVAL));
      builder.setEmptyMessageInterval(interval);
    }

    Streaming.StreamingConfig nativeConf = builder.build();
    LOGGER.info("Streaming native conf {}", nativeConf.toString());
    return nativeConf.toByteArray();
  }
}
@@ -0,0 +1,54 @@
package org.ray.streaming.runtime.transfer;
import java.nio.ByteBuffer;
/**
 * DataMessage represents data passed between an upstream and a downstream operator.
 */
public class DataMessage implements Message {
  private final ByteBuffer body;
  private final long msgId;
  private final long timestamp;
  private final String channelId;

  /**
   * @param body message payload
   * @param timestamp message timestamp
   * @param msgId message id
   * @param channelId string id of the channel the message belongs to
   */
  public DataMessage(ByteBuffer body, long timestamp, long msgId, String channelId) {
    this.channelId = channelId;
    this.msgId = msgId;
    this.timestamp = timestamp;
    this.body = body;
  }

  @Override
  public ByteBuffer body() {
    return this.body;
  }

  @Override
  public long timestamp() {
    return this.timestamp;
  }

  /**
   * @return message id
   */
  public long msgId() {
    return this.msgId;
  }

  /**
   * @return string id of channel where data is coming from
   */
  public String channelId() {
    return this.channelId;
  }

  @Override
  public String toString() {
    StringBuilder text = new StringBuilder("DataMessage{");
    text.append("body=").append(body);
    text.append(", msgId=").append(msgId);
    text.append(", timestamp=").append(timestamp);
    text.append(", channelId='").append(channelId).append('\'');
    text.append('}');
    return text.toString();
  }
}
@@ -0,0 +1,258 @@
package org.ray.streaming.runtime.transfer;
import com.google.common.base.Preconditions;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.ray.api.id.ActorId;
import org.ray.streaming.runtime.util.Platform;
import org.ray.streaming.util.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* DataReader is a wrapper of the streaming C++ DataReader, which reads data
* from the channels of upstream workers.
* <p>
* Data is fetched from native code one bundle at a time; {@link #read(long)} unpacks a
* bundle into an internal queue and returns the queued messages one by one.
*/
public class DataReader {
private static final Logger LOGGER = LoggerFactory.getLogger(DataReader.class);
// Pointer of the native reader; reset to 0 by close().
private long nativeReaderPtr;
// Messages unpacked from the last fetched bundle and not yet returned by read().
private Queue<DataMessage> buf = new LinkedList<>();
/**
* Creates the native reader for the given input channels.
*
* @param inputChannels hex string ids of the input channels, must not be empty
* @param fromActors upstream actors, must have the same size as inputChannels
* @param conf configuration; timer interval, channel type and recreate flag are read here,
* and the map is also converted with ChannelUtils.toNativeConf for native code
*/
public DataReader(List<String> inputChannels,
List<ActorId> fromActors,
Map<String, String> conf) {
Preconditions.checkArgument(inputChannels.size() > 0);
Preconditions.checkArgument(inputChannels.size() == fromActors.size());
byte[][] inputChannelsBytes = inputChannels.stream()
.map(ChannelID::idStrToBytes).toArray(byte[][]::new);
byte[][] fromActorsBytes = fromActors.stream()
.map(ActorId::getBytes).toArray(byte[][]::new);
long[] seqIds = new long[inputChannels.size()];
long[] msgIds = new long[inputChannels.size()];
// Every channel starts at seq id 0 and msg id 0 (new arrays are already zero-filled).
for (int i = 0; i < inputChannels.size(); i++) {
seqIds[i] = 0;
msgIds[i] = 0;
}
long timerInterval = Long.parseLong(
conf.getOrDefault(Config.TIMER_INTERVAL_MS, "-1"));
String channelType = conf.getOrDefault(Config.CHANNEL_TYPE, Config.DEFAULT_CHANNEL_TYPE);
// The isMock flag is set only for the memory channel type.
boolean isMock = false;
if (Config.MEMORY_CHANNEL.equals(channelType)) {
isMock = true;
}
boolean isRecreate = Boolean.parseBoolean(
conf.getOrDefault(Config.IS_RECREATE, "false"));
this.nativeReaderPtr = createDataReaderNative(
inputChannelsBytes,
fromActorsBytes,
seqIds,
msgIds,
timerInterval,
isRecreate,
ChannelUtils.toNativeConf(conf),
isMock
);
LOGGER.info("create DataReader succeed");
}
// params set by getBundleNative: bundle data address + size
// getBundle() reads the address as a long at offset 0 and the size as an int at offset 8.
private final ByteBuffer getBundleParams = ByteBuffer.allocateDirect(24);
// We use direct buffer to reduce gc overhead and memory copy.
// bundleData is re-pointed at the current bundle by getBundle().
private final ByteBuffer bundleData = Platform.wrapDirectBuffer(0, 0);
// Receives the bundle meta; its address is handed to getBundleNative.
private final ByteBuffer bundleMeta = ByteBuffer.allocateDirect(BundleMeta.LENGTH);
// Instance initializer: all three buffers use the native byte order.
{
getBundleParams.order(ByteOrder.nativeOrder());
bundleData.order(ByteOrder.nativeOrder());
bundleMeta.order(ByteOrder.nativeOrder());
}
/**
* Read message from input channels, if timeout, return null.
* <p>
* When the internal queue is empty a new bundle is fetched first. A BUNDLE is split into
* its messages, an EMPTY bundle yields a single message whose body is null.
*
* @param timeoutMillis timeout
* @return message or null
* @throws UnsupportedOperationException if the fetched bundle is a BARRIER
*/
public DataMessage read(long timeoutMillis) {
if (buf.isEmpty()) {
getBundle(timeoutMillis);
// if bundle not empty. empty message still has data size + seqId + msgId
if (bundleData.position() < bundleData.limit()) {
// Local variable shadows the bundleMeta field: it is the parsed form of that buffer.
BundleMeta bundleMeta = new BundleMeta(this.bundleMeta);
// barrier
if (bundleMeta.getBundleType() == DataBundleType.BARRIER) {
throw new UnsupportedOperationException(
"Unsupported bundle type " + bundleMeta.getBundleType());
} else if (bundleMeta.getBundleType() == DataBundleType.BUNDLE) {
String channelID = bundleMeta.getChannelID();
long timestamp = bundleMeta.getBundleTs();
// Each call consumes one message from bundleData and advances its position.
for (int i = 0; i < bundleMeta.getMessageListSize(); i++) {
buf.offer(getDataMessage(bundleData, channelID, timestamp));
}
} else if (bundleMeta.getBundleType() == DataBundleType.EMPTY) {
long messageId = bundleMeta.getLastMessageId();
buf.offer(new DataMessage(null, bundleMeta.getBundleTs(),
messageId, bundleMeta.getChannelID()));
}
}
}
if (buf.isEmpty()) {
return null;
}
return buf.poll();
}
/**
* Reads one message from the current position of bundleData: an int data size, a long
* message id and an int message type (ignored), followed by the payload of data size bytes.
* The payload is returned as a slice of bundleData, not as a copy.
*/
private DataMessage getDataMessage(ByteBuffer bundleData, String channelID, long timestamp) {
int dataSize = bundleData.getInt();
// msgId
long msgId = bundleData.getLong();
// msgType
bundleData.getInt();
// make `data.capacity() == data.remaining()`, because some code used `capacity()`
// rather than `remaining()`
int position = bundleData.position();
int limit = bundleData.limit();
bundleData.limit(position + dataSize);
ByteBuffer data = bundleData.slice();
// Restore the limit and skip over the payload that was just sliced off.
bundleData.limit(limit);
bundleData.position(position + dataSize);
return new DataMessage(data, timestamp, msgId, channelID);
}
/**
* Fetches the next bundle from native code and points bundleData at it, using the
* address and size that getBundleNative wrote into getBundleParams.
*/
private void getBundle(long timeoutMillis) {
getBundleNative(nativeReaderPtr, timeoutMillis,
Platform.getAddress(getBundleParams), Platform.getAddress(bundleMeta));
bundleMeta.rewind();
long bundleAddress = getBundleParams.getLong(0);
int bundleSize = getBundleParams.getInt(8);
// This has better performance than NewDirectBuffer or set address/capacity in jni.
Platform.wrapDirectBuffer(bundleData, bundleAddress, bundleSize);
}
/**
* Stop reader
*/
public void stop() {
stopReaderNative(nativeReaderPtr);
}
/**
* Close reader to release resource. Calling it again after the first call does nothing.
*/
public void close() {
if (nativeReaderPtr == 0) {
return;
}
LOGGER.info("closing DataReader.");
closeReaderNative(nativeReaderPtr);
nativeReaderPtr = 0;
LOGGER.info("closing DataReader done.");
}
private static native long createDataReaderNative(
byte[][] inputChannels,
byte[][] inputActorIds,
long[] seqIds,
long[] msgIds,
long timerInterval,
boolean isRecreate,
byte[] configBytes,
boolean isMock);
private native void getBundleNative(long nativeReaderPtr,
long timeoutMillis,
long params,
long metaAddress);
private native void stopReaderNative(long nativeReaderPtr);
private native void closeReaderNative(long nativeReaderPtr);
/**
* Type of a bundle, identified by the integer code found in the bundle meta.
*/
enum DataBundleType {
EMPTY(1),
BARRIER(2),
BUNDLE(3);
int code;
DataBundleType(int code) {
this.code = code;
}
}
/**
* Parsed form of the bundle meta buffer filled by getBundleNative.
*/
static class BundleMeta {
// kMessageBundleHeaderSize + kUniqueIDSize:
// magicNum(4b) + bundleTs(8b) + lastMessageId(8b) + messageListSize(4b)
// + bundleType(4b) + rawBundleSize(4b) + channelID(20b)
static final int LENGTH = 4 + 8 + 8 + 4 + 4 + 4 + 20;
private int magicNum;
private long bundleTs;
private long lastMessageId;
private int messageListSize;
private DataBundleType bundleType;
private String channelID;
private int rawBundleSize;
/**
* Reads all fields from the buffer's current position, in the layout order given above.
*
* @param buffer buffer holding the serialized bundle meta
*/
BundleMeta(ByteBuffer buffer) {
// StreamingMessageBundleMeta Deserialization
// magicNum
magicNum = buffer.getInt();
// messageBundleTs
bundleTs = buffer.getLong();
// lastOffsetSeqId
lastMessageId = buffer.getLong();
messageListSize = buffer.getInt();
int typeInt = buffer.getInt();
// Any code other than BUNDLE or BARRIER is treated as EMPTY.
if (DataBundleType.BUNDLE.code == typeInt) {
bundleType = DataBundleType.BUNDLE;
} else if (DataBundleType.BARRIER.code == typeInt) {
bundleType = DataBundleType.BARRIER;
} else {
bundleType = DataBundleType.EMPTY;
}
// rawBundleSize
rawBundleSize = buffer.getInt();
channelID = getQidString(buffer);
}
// Reads the next ChannelID.ID_LENGTH bytes and converts them to the channel id string.
private String getQidString(ByteBuffer buffer) {
byte[] bytes = new byte[ChannelID.ID_LENGTH];
buffer.get(bytes);
return ChannelID.idBytesToStr(bytes);
}
public int getMagicNum() {
return magicNum;
}
public long getBundleTs() {
return bundleTs;
}
public long getLastMessageId() {
return lastMessageId;
}
public int getMessageListSize() {
return messageListSize;
}
public DataBundleType getBundleType() {
return bundleType;
}
public String getChannelID() {
return channelID;
}
public int getRawBundleSize() {
return rawBundleSize;
}
}
}
@@ -0,0 +1,140 @@
package org.ray.streaming.runtime.transfer;
import com.google.common.base.Preconditions;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.ray.api.id.ActorId;
import org.ray.streaming.runtime.util.Platform;
import org.ray.streaming.util.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* DataWriter is a wrapper of streaming c++ DataWriter, which sends data
* to downstream workers
*/
public class DataWriter {
private static final Logger LOGGER = LoggerFactory.getLogger(DataWriter.class);
private long nativeWriterPtr;
private ByteBuffer buffer = ByteBuffer.allocateDirect(0);
private long bufferAddress;
{
ensureBuffer(0);
}
/**
* @param outputChannels output channels ids
* @param toActors downstream output actors
* @param conf configuration
*/
public DataWriter(List<String> outputChannels,
List<ActorId> toActors,
Map<String, String> conf) {
Preconditions.checkArgument(!outputChannels.isEmpty());
Preconditions.checkArgument(outputChannels.size() == toActors.size());
byte[][] outputChannelsBytes = outputChannels.stream()
.map(ChannelID::idStrToBytes).toArray(byte[][]::new);
byte[][] toActorsBytes = toActors.stream()
.map(ActorId::getBytes).toArray(byte[][]::new);
long channelSize = Long.parseLong(
conf.getOrDefault(Config.CHANNEL_SIZE, Config.CHANNEL_SIZE_DEFAULT));
long[] msgIds = new long[outputChannels.size()];
for (int i = 0; i < outputChannels.size(); i++) {
msgIds[i] = 0;
}
String channelType = conf.getOrDefault(Config.CHANNEL_TYPE, Config.DEFAULT_CHANNEL_TYPE);
boolean isMock = false;
if (Config.MEMORY_CHANNEL.equals(channelType)) {
isMock = true;
}
this.nativeWriterPtr = createWriterNative(
outputChannelsBytes,
toActorsBytes,
msgIds,
channelSize,
ChannelUtils.toNativeConf(conf),
isMock
);
LOGGER.info("create DataWriter succeed");
}
/**
* Write msg into the specified channel
*
* @param id channel id
* @param item message item data section is specified by [position, limit).
*/
public void write(ChannelID id, ByteBuffer item) {
int size = item.remaining();
ensureBuffer(size);
buffer.clear();
buffer.put(item);
writeMessageNative(nativeWriterPtr, id.getNativeIdPtr(), bufferAddress, size);
}
/**
* Write msg into the specified channels
*
* @param ids channel ids
* @param item message item data section is specified by [position, limit).
* item doesn't have to be a direct buffer.
*/
public void write(Set<ChannelID> ids, ByteBuffer item) {
int size = item.remaining();
ensureBuffer(size);
for (ChannelID id : ids) {
buffer.clear();
buffer.put(item.duplicate());
writeMessageNative(nativeWriterPtr, id.getNativeIdPtr(), bufferAddress, size);
}
}
/**
 * Makes sure the internal direct buffer can hold at least {@code size} bytes.
 *
 * <p>When the current buffer is too small, a new direct buffer of exactly {@code size} bytes
 * in native byte order replaces it and {@code bufferAddress} is refreshed to match.
 *
 * @param size required capacity in bytes
 */
private void ensureBuffer(int size) {
  if (buffer.capacity() >= size) {
    return;
  }
  ByteBuffer replacement = ByteBuffer.allocateDirect(size);
  replacement.order(ByteOrder.nativeOrder());
  buffer = replacement;
  bufferAddress = Platform.getAddress(replacement);
}
/**
 * Stops the writer.
 *
 * <p>Does nothing when the native writer handle is 0, which is the state {@link #close()}
 * leaves behind, so that a null handle is never passed to native code.
 */
public void stop() {
  if (nativeWriterPtr == 0) {
    // Mirrors the guard in close(): there is no native writer left to stop.
    return;
  }
  stopWriterNative(nativeWriterPtr);
}
/**
 * Closes the writer and releases its native resources.
 *
 * <p>The native handle is reset to 0 afterwards, which makes repeated calls no-ops.
 */
public void close() {
  if (nativeWriterPtr != 0) {
    LOGGER.info("closing data writer.");
    closeWriterNative(nativeWriterPtr);
    nativeWriterPtr = 0;
    LOGGER.info("closing data writer done.");
  }
}
/**
 * Native factory for the underlying writer. The constructor stores the returned value in
 * nativeWriterPtr and passes it as the first argument of the other native methods.
 *
 * @param outputQueueIds channel ids in byte form, one entry per output channel
 * @param outputActorIds downstream actor ids in byte form, one entry per output channel
 * @param msgIds one message id per channel (the constructor passes all zeros)
 * @param channelSize channel size parsed from the configuration
 * @param confBytes configuration converted by ChannelUtils.toNativeConf
 * @param isMock true when the configured channel type is the memory channel
 * @return handle of the native writer
 */
private static native long createWriterNative(
byte[][] outputQueueIds,
byte[][] outputActorIds,
long[] msgIds,
long channelSize,
byte[] confBytes,
boolean isMock);
/**
 * Hands `size` bytes starting at native memory address `address` to the native writer for
 * the channel identified by `nativeIdPtr`. The write methods ignore the returned value.
 */
private native long writeMessageNative(
long nativeQueueProducerPtr, long nativeIdPtr, long address, int size);
/**
 * Native counterpart of stop(); receives the native writer handle.
 */
private native void stopWriterNative(long nativeQueueProducerPtr);
/**
 * Native counterpart of close(); receives the native writer handle.
 */
private native void closeWriterNative(long nativeQueueProducerPtr);
}
@@ -0,0 +1,22 @@
package org.ray.streaming.runtime.transfer;
import java.nio.ByteBuffer;
/**
 * A single message received over a streaming channel: a body plus the time it was written.
 */
public interface Message {
/**
* Returns the message data.
*
* <p>The message body is a direct byte buffer that may become invalid after the next call
* to <code>DataReader#getBundleNative</code>. Consume this buffer fully before
* <code>getBundleNative</code> is called again.
*
* @return message body
*/
ByteBuffer body();
/**
* Returns the time at which the item was written.
*
* @return timestamp when the item was written by the upstream DataWriter
*/
long timestamp();
}
@@ -0,0 +1,72 @@
package org.ray.streaming.runtime.transfer;
import com.google.common.base.Preconditions;
import org.ray.runtime.RayNativeRuntime;
import org.ray.runtime.functionmanager.FunctionDescriptor;
import org.ray.runtime.functionmanager.JavaFunctionDescriptor;
import org.ray.runtime.util.JniUtils;
/**
 * Handles direct-call based data transfer between workers on behalf of the streaming queue.
 *
 * <p>Every public method forwards the received bytes to a native writer or reader client
 * that is created once in the constructor.
 */
public class TransferHandler {

  static {
    // Load RayNativeRuntime before the streaming JNI library, as EnvUtil does.
    try {
      Class.forName(RayNativeRuntime.class.getName());
    } catch (ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
    JniUtils.loadLibrary("streaming_java");
  }

  /** Handle of the native writer-side client. */
  private final long writerClientNative;
  /** Handle of the native reader-side client. */
  private final long readerClientNative;

  /**
   * Creates the native writer and reader clients.
   *
   * @param coreWorkerNative handle of the native core worker, must not be 0
   * @param writerAsyncFunc descriptor of the asynchronous writer callback
   * @param writerSyncFunc descriptor of the synchronous writer callback
   * @param readerAsyncFunc descriptor of the asynchronous reader callback
   * @param readerSyncFunc descriptor of the synchronous reader callback
   */
  public TransferHandler(long coreWorkerNative,
                         JavaFunctionDescriptor writerAsyncFunc,
                         JavaFunctionDescriptor writerSyncFunc,
                         JavaFunctionDescriptor readerAsyncFunc,
                         JavaFunctionDescriptor readerSyncFunc) {
    Preconditions.checkArgument(coreWorkerNative != 0);
    this.writerClientNative =
        createWriterClientNative(coreWorkerNative, writerAsyncFunc, writerSyncFunc);
    this.readerClientNative =
        createReaderClientNative(coreWorkerNative, readerAsyncFunc, readerSyncFunc);
  }

  /** Forwards a message to the native writer client without returning a result. */
  public void onWriterMessage(byte[] buffer) {
    handleWriterMessageNative(this.writerClientNative, buffer);
  }

  /** Forwards a message to the native writer client and returns its reply. */
  public byte[] onWriterMessageSync(byte[] buffer) {
    byte[] reply = handleWriterMessageSyncNative(this.writerClientNative, buffer);
    return reply;
  }

  /** Forwards a message to the native reader client without returning a result. */
  public void onReaderMessage(byte[] buffer) {
    handleReaderMessageNative(this.readerClientNative, buffer);
  }

  /** Forwards a message to the native reader client and returns its reply. */
  public byte[] onReaderMessageSync(byte[] buffer) {
    byte[] reply = handleReaderMessageSyncNative(this.readerClientNative, buffer);
    return reply;
  }

  private native long createWriterClientNative(
      long coreWorkerNative,
      FunctionDescriptor asyncFunc,
      FunctionDescriptor syncFunc);

  private native long createReaderClientNative(
      long coreWorkerNative,
      FunctionDescriptor asyncFunc,
      FunctionDescriptor syncFunc);

  private native void handleWriterMessageNative(long handler, byte[] buffer);

  private native byte[] handleWriterMessageSyncNative(long handler, byte[] buffer);

  private native void handleReaderMessageNative(long handler, byte[] buffer);

  private native byte[] handleReaderMessageSyncNative(long handler, byte[] buffer);
}
@@ -0,0 +1,19 @@
package org.ray.streaming.runtime.util;
import org.ray.runtime.RayNativeRuntime;
import org.ray.runtime.util.JniUtils;
/**
 * Environment helpers for the streaming runtime.
 */
public class EnvUtil {

  /**
   * Loads the native libraries needed by the streaming runtime.
   *
   * <p>{@code RayNativeRuntime} is loaded explicitly first, to make sure
   * `core_worker_library_java` is loaded before `streaming_java`.
   *
   * @throws RuntimeException if {@code RayNativeRuntime} cannot be found
   */
  public static void loadNativeLibraries() {
    final String coreRuntimeClassName = RayNativeRuntime.class.getName();
    try {
      Class.forName(coreRuntimeClassName);
    } catch (ClassNotFoundException notFound) {
      throw new RuntimeException(notFound);
    }
    JniUtils.loadLibrary("streaming_java");
  }
}
@@ -0,0 +1,91 @@
package org.ray.streaming.runtime.util;
import com.google.common.base.Preconditions;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import sun.misc.Unsafe;
import sun.nio.ch.DirectBuffer;
/**
 * Low-level helpers for working with off-heap memory through direct byte buffers.
 *
 * <p>Based on org.apache.spark.unsafe.Platform
 */
public final class Platform {

  public static final Unsafe UNSAFE;

  static {
    Unsafe unsafe;
    try {
      Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
      unsafeField.setAccessible(true);
      unsafe = (Unsafe) unsafeField.get(null);
    } catch (Throwable cause) {
      // Keep the original failure attached so the reason Unsafe is unavailable is not lost.
      throw new UnsupportedOperationException(
          "Unsafe is not supported in this platform.", cause);
    }
    UNSAFE = unsafe;
  }

  // Access fields and constructors once and store them, for performance:
  private static final Constructor<?> DBB_CONSTRUCTOR;
  private static final long BUFFER_ADDRESS_FIELD_OFFSET;
  private static final long BUFFER_CAPACITY_FIELD_OFFSET;

  static {
    try {
      Class<?> cls = Class.forName("java.nio.DirectByteBuffer");
      Constructor<?> constructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE);
      constructor.setAccessible(true);
      DBB_CONSTRUCTOR = constructor;
      Field addressField = Buffer.class.getDeclaredField("address");
      BUFFER_ADDRESS_FIELD_OFFSET = UNSAFE.objectFieldOffset(addressField);
      Preconditions.checkArgument(BUFFER_ADDRESS_FIELD_OFFSET != 0);
      Field capacityField = Buffer.class.getDeclaredField("capacity");
      BUFFER_CAPACITY_FIELD_OFFSET = UNSAFE.objectFieldOffset(capacityField);
      Preconditions.checkArgument(BUFFER_CAPACITY_FIELD_OFFSET != 0);
    } catch (ClassNotFoundException | NoSuchMethodException | NoSuchFieldException e) {
      throw new IllegalStateException(e);
    }
  }

  // Per-thread empty direct buffer that wrapDirectBuffer(long, int) duplicates and re-points.
  private static final ThreadLocal<ByteBuffer> localEmptyBuffer =
      ThreadLocal.withInitial(() -> {
        try {
          return (ByteBuffer) DBB_CONSTRUCTOR.newInstance(0, 0);
        } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
          UNSAFE.throwException(e);
        }
        throw new IllegalStateException("unreachable");
      });

  /**
   * Wrap a buffer [address, address + size) as a DirectByteBuffer.
   *
   * @param address start address of the memory region
   * @param size length of the memory region in bytes
   * @return a buffer whose address and capacity fields point at the given region
   */
  public static ByteBuffer wrapDirectBuffer(long address, int size) {
    ByteBuffer buffer = localEmptyBuffer.get().duplicate();
    wrapDirectBuffer(buffer, address, size);
    return buffer;
  }

  /**
   * Wrap a buffer [address, address + size) into provided <code>buffer</code>.
   *
   * <p>The address and capacity fields of {@code buffer} are overwritten in place and the
   * buffer is cleared afterwards, so its limit equals the new capacity.
   *
   * @param buffer buffer to re-point
   * @param address start address of the memory region
   * @param size length of the memory region in bytes
   */
  public static void wrapDirectBuffer(ByteBuffer buffer, long address, int size) {
    UNSAFE.putLong(buffer, BUFFER_ADDRESS_FIELD_OFFSET, address);
    UNSAFE.putInt(buffer, BUFFER_CAPACITY_FIELD_OFFSET, size);
    buffer.clear();
  }

  /**
   * @param buffer a DirectBuffer backed by off-heap memory
   * @return address of off-heap memory
   */
  public static long getAddress(ByteBuffer buffer) {
    return ((DirectBuffer) buffer).address();
  }
}
@@ -0,0 +1,159 @@
package org.ray.streaming.runtime.worker;
import java.io.Serializable;
import java.util.Map;
import org.ray.api.Ray;
import org.ray.api.annotation.RayRemote;
import org.ray.runtime.RayMultiWorkerNativeRuntime;
import org.ray.runtime.functionmanager.JavaFunctionDescriptor;
import org.ray.streaming.runtime.core.graph.ExecutionGraph;
import org.ray.streaming.runtime.core.graph.ExecutionNode;
import org.ray.streaming.runtime.core.graph.ExecutionNode.NodeType;
import org.ray.streaming.runtime.core.graph.ExecutionTask;
import org.ray.streaming.runtime.core.processor.OneInputProcessor;
import org.ray.streaming.runtime.core.processor.SourceProcessor;
import org.ray.streaming.runtime.core.processor.StreamProcessor;
import org.ray.streaming.runtime.transfer.TransferHandler;
import org.ray.streaming.runtime.util.EnvUtil;
import org.ray.streaming.runtime.worker.context.WorkerContext;
import org.ray.streaming.runtime.worker.tasks.OneInputStreamTask;
import org.ray.streaming.runtime.worker.tasks.SourceStreamTask;
import org.ray.streaming.runtime.worker.tasks.StreamTask;
import org.ray.streaming.util.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * The stream job worker, it is a ray actor.
 *
 * <p>{@link #init(WorkerContext)} resolves this worker's execution node and task from the
 * execution graph, optionally creates a {@link TransferHandler} and starts the stream task.
 */
@RayRemote
public class JobWorker implements Serializable {

  private static final Logger LOGGER = LoggerFactory.getLogger(JobWorker.class);

  static {
    EnvUtil.loadNativeLibraries();
  }

  private int taskId;
  private Map<String, Object> config;
  private WorkerContext workerContext;
  private ExecutionNode executionNode;
  private ExecutionTask executionTask;
  private ExecutionGraph executionGraph;
  private StreamProcessor streamProcessor;
  private NodeType nodeType;
  private StreamTask task;
  private TransferHandler transferHandler;

  /**
   * Initializes this worker from the given context and starts its stream task.
   *
   * @param workerContext context carrying the task id, execution graph and job config
   * @return always {@code true}
   */
  public Boolean init(WorkerContext workerContext) {
    this.workerContext = workerContext;
    this.taskId = workerContext.getTaskId();
    this.config = workerContext.getConfig();
    this.executionGraph = this.workerContext.getExecutionGraph();
    this.executionTask = executionGraph.getExecutionTaskByTaskId(taskId);
    this.executionNode = executionGraph.getExecutionNodeByTaskId(taskId);

    this.nodeType = executionNode.getNodeType();
    this.streamProcessor = executionNode.getStreamProcessor();
    LOGGER.debug("Initializing StreamWorker, taskId: {}, operator: {}.", taskId, streamProcessor);

    String channelType = (String) this.config.getOrDefault(
        Config.CHANNEL_TYPE, Config.DEFAULT_CHANNEL_TYPE);
    // A transfer handler is only created for the native channel type.
    if (channelType.equals(Config.NATIVE_CHANNEL)) {
      transferHandler = createTransferHandler();
    }

    task = createStreamTask();
    task.start();
    return true;
  }

  /**
   * Builds the transfer handler whose callbacks are this class's four message methods.
   */
  private static TransferHandler createTransferHandler() {
    final long coreWorker = getNativeCoreWorker();
    final String workerClassName = JobWorker.class.getName();
    return new TransferHandler(
        coreWorker,
        new JavaFunctionDescriptor(workerClassName, "onWriterMessage", "([B)V"),
        new JavaFunctionDescriptor(workerClassName, "onWriterMessageSync", "([B)[B"),
        new JavaFunctionDescriptor(workerClassName, "onReaderMessage", "([B)V"),
        new JavaFunctionDescriptor(workerClassName, "onReaderMessageSync", "([B)[B"));
  }

  /**
   * Picks the stream task implementation matching the type of the stream processor.
   */
  private StreamTask createStreamTask() {
    if (streamProcessor instanceof OneInputProcessor) {
      return new OneInputStreamTask(taskId, streamProcessor, this);
    }
    if (streamProcessor instanceof SourceProcessor) {
      return new SourceStreamTask(taskId, streamProcessor, this);
    }
    throw new RuntimeException("Unsupported type: " + streamProcessor);
  }

  public int getTaskId() {
    return taskId;
  }

  public Map<String, Object> getConfig() {
    return config;
  }

  public WorkerContext getWorkerContext() {
    return workerContext;
  }

  public NodeType getNodeType() {
    return nodeType;
  }

  public ExecutionNode getExecutionNode() {
    return executionNode;
  }

  public ExecutionTask getExecutionTask() {
    return executionTask;
  }

  public ExecutionGraph getExecutionGraph() {
    return executionGraph;
  }

  public StreamProcessor getStreamProcessor() {
    return streamProcessor;
  }

  public StreamTask getTask() {
    return task;
  }

  /**
   * Used by upstream streaming queue to send data to this actor
   */
  public void onReaderMessage(byte[] buffer) {
    transferHandler.onReaderMessage(buffer);
  }

  /**
   * Used by upstream streaming queue to send data to this actor
   * and receive result from this actor
   */
  public byte[] onReaderMessageSync(byte[] buffer) {
    return transferHandler.onReaderMessageSync(buffer);
  }

  /**
   * Used by downstream streaming queue to send data to this actor
   */
  public void onWriterMessage(byte[] buffer) {
    transferHandler.onWriterMessage(buffer);
  }

  /**
   * Used by downstream streaming queue to send data to this actor
   * and receive result from this actor
   */
  public byte[] onWriterMessageSync(byte[] buffer) {
    return transferHandler.onWriterMessageSync(buffer);
  }

  /**
   * Returns the native core worker pointer of the current runtime, or 0 when the Ray
   * runtime is not a {@link RayMultiWorkerNativeRuntime}.
   */
  private static long getNativeCoreWorker() {
    if (!(Ray.internal() instanceof RayMultiWorkerNativeRuntime)) {
      return 0;
    }
    return ((RayMultiWorkerNativeRuntime) Ray.internal())
        .getCurrentRuntime().getNativeCoreWorkerPointer();
  }
}
@@ -0,0 +1,62 @@
package org.ray.streaming.runtime.worker.context;
import static org.ray.streaming.util.Config.STREAMING_BATCH_MAX_COUNT;
import java.util.Map;
import org.ray.streaming.api.context.RuntimeContext;
import org.ray.streaming.runtime.core.graph.ExecutionTask;
/**
 * Use Ray to implement RuntimeContext.
 *
 * <p>Task id and task index come from the execution task. The maximum batch count is read
 * from the config key {@code STREAMING_BATCH_MAX_COUNT}, defaulting to
 * {@link Long#MAX_VALUE} when the key is absent.
 */
public class RayRuntimeContext implements RuntimeContext {

  private int taskId;
  private int taskIndex;
  private int parallelism;
  private Long batchId;
  private final Long maxBatch;
  private Map<String, Object> config;

  /**
   * @param executionTask task whose id and index this context exposes
   * @param config job configuration
   * @param parallelism parallelism reported by {@link #getParallelism()}
   */
  public RayRuntimeContext(ExecutionTask executionTask, Map<String, Object> config,
                           int parallelism) {
    this.taskId = executionTask.getTaskId();
    this.config = config;
    this.taskIndex = executionTask.getTaskIndex();
    this.parallelism = parallelism;
    // The configured value may be of any type, so it is parsed from its string form.
    this.maxBatch = config.containsKey(STREAMING_BATCH_MAX_COUNT)
        ? Long.valueOf(String.valueOf(config.get(STREAMING_BATCH_MAX_COUNT)))
        : Long.valueOf(Long.MAX_VALUE);
  }

  @Override
  public int getTaskId() {
    return this.taskId;
  }

  @Override
  public int getTaskIndex() {
    return this.taskIndex;
  }

  @Override
  public int getParallelism() {
    return this.parallelism;
  }

  @Override
  public Long getBatchId() {
    return this.batchId;
  }

  @Override
  public Long getMaxBatch() {
    return this.maxBatch;
  }

  public void setBatchId(Long batchId) {
    this.batchId = batchId;
  }
}
@@ -0,0 +1,41 @@
package org.ray.streaming.runtime.worker.context;
import java.io.Serializable;
import java.util.Map;
import org.ray.streaming.runtime.core.graph.ExecutionGraph;
/**
 * Encapsulate the context information for worker initialization.
 *
 * <p>Bundles the task id, the execution graph and the job configuration.
 */
public class WorkerContext implements Serializable {

  private int taskId;
  private ExecutionGraph executionGraph;
  private Map<String, Object> config;

  /**
   * @param taskId id of the task the worker runs
   * @param executionGraph execution graph of the job
   * @param jobConfig job configuration, exposed through {@link #getConfig()}
   */
  public WorkerContext(int taskId, ExecutionGraph executionGraph, Map<String, Object> jobConfig) {
    this.config = jobConfig;
    this.executionGraph = executionGraph;
    this.taskId = taskId;
  }

  /** @return id of the task the worker runs */
  public int getTaskId() {
    return this.taskId;
  }

  public void setTaskId(int taskId) {
    this.taskId = taskId;
  }

  /** @return execution graph of the job */
  public ExecutionGraph getExecutionGraph() {
    return this.executionGraph;
  }

  public void setExecutionGraph(ExecutionGraph executionGraph) {
    this.executionGraph = executionGraph;
  }

  /** @return job configuration */
  public Map<String, Object> getConfig() {
    return this.config;
  }
}
@@ -0,0 +1,53 @@
package org.ray.streaming.runtime.worker.tasks;
import org.ray.runtime.util.Serializer;
import org.ray.streaming.runtime.core.processor.Processor;
import org.ray.streaming.runtime.transfer.Message;
import org.ray.streaming.runtime.worker.JobWorker;
import org.ray.streaming.util.Config;
/**
 * Stream task that repeatedly reads messages from the task's reader, decodes them and feeds
 * the decoded objects to the processor until the task is cancelled.
 */
public abstract class InputStreamTask extends StreamTask {

  /** Pause between checks while {@link #cancelTask()} waits for the read loop to end. */
  private static final long CANCEL_POLL_INTERVAL_MS = 10;

  private volatile boolean running = true;
  // Set once run() has been entered; lets cancelTask() skip waiting for a loop that never ran.
  private volatile boolean started = false;
  private volatile boolean stopped = false;
  private long readTimeoutMillis;

  /**
   * @param taskId id of this task
   * @param processor processor that receives every decoded message
   * @param streamWorker worker owning this task; its config provides the read timeout
   */
  public InputStreamTask(int taskId, Processor processor, JobWorker streamWorker) {
    super(taskId, processor, streamWorker);
    // Config values are Objects: go through String.valueOf rather than a cast, so a
    // numeric timeout does not fail with a ClassCastException.
    readTimeoutMillis = Long.parseLong(String.valueOf(streamWorker.getConfig()
        .getOrDefault(Config.READ_TIMEOUT_MS, Config.DEFAULT_READ_TIMEOUT_MS)));
  }

  @Override
  protected void init() {
  }

  /**
   * Read loop: reads with the configured timeout, skips empty reads, and passes each
   * decoded message body to the processor.
   */
  @Override
  public void run() {
    started = true;
    try {
      while (running) {
        Message item = reader.read(readTimeoutMillis);
        if (item != null) {
          byte[] bytes = new byte[item.body().remaining()];
          item.body().get(bytes);
          Object obj = Serializer.decode(bytes);
          processor.process(obj);
        }
      }
    } finally {
      // Always signal termination, even when reading or processing throws; otherwise
      // cancelTask() would wait forever.
      stopped = true;
    }
  }

  /**
   * Asks the read loop to stop and waits until it has ended.
   *
   * <p>Returns immediately when the loop has not been entered: with {@code running} already
   * false, a later {@link #run()} exits without reading.
   *
   * @throws InterruptedException if interrupted while waiting for the loop to end
   */
  @Override
  protected void cancelTask() throws Exception {
    running = false;
    if (!started) {
      return;
    }
    while (!stopped) {
      // Sleep instead of spinning so that waiting does not burn a CPU core.
      Thread.sleep(CANCEL_POLL_INTERVAL_MS);
    }
  }

  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder("InputStreamTask{");
    sb.append("taskId=").append(taskId);
    sb.append(", processor=").append(processor);
    sb.append('}');
    return sb.toString();
  }
}
@@ -0,0 +1,11 @@
package org.ray.streaming.runtime.worker.tasks;
import org.ray.streaming.runtime.core.processor.Processor;
import org.ray.streaming.runtime.worker.JobWorker;
/**
 * Concrete InputStreamTask that adds no behavior of its own; everything is inherited.
 */
public class OneInputStreamTask<IN> extends InputStreamTask {
/**
 * Passes all arguments unchanged to the InputStreamTask constructor.
 */
public OneInputStreamTask(int taskId, Processor processor, JobWorker streamWorker) {
super(taskId, processor, streamWorker);
}
}
@@ -0,0 +1,30 @@
package org.ray.streaming.runtime.worker.tasks;
import org.ray.streaming.runtime.core.processor.Processor;
import org.ray.streaming.runtime.core.processor.SourceProcessor;
import org.ray.streaming.runtime.worker.JobWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Stream task for source nodes: running it runs the underlying {@link SourceProcessor}.
 */
public class SourceStreamTask<IN> extends StreamTask {

  private static final Logger LOGGER = LoggerFactory.getLogger(SourceStreamTask.class);

  public SourceStreamTask(int taskId, Processor processor, JobWorker worker) {
    super(taskId, processor, worker);
  }

  /** Nothing to initialize for a source task. */
  @Override
  protected void init() {
  }

  /** Runs the task's processor as a source processor. */
  @Override
  public void run() {
    ((SourceProcessor<IN>) this.processor).run();
  }

  /** Nothing to cancel for a source task. */
  @Override
  protected void cancelTask() throws Exception {
  }
}
@@ -0,0 +1,134 @@
package org.ray.streaming.runtime.worker.tasks;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.api.id.ActorId;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.api.context.RuntimeContext;
import org.ray.streaming.runtime.core.collector.OutputCollector;
import org.ray.streaming.runtime.core.graph.ExecutionEdge;
import org.ray.streaming.runtime.core.graph.ExecutionGraph;
import org.ray.streaming.runtime.core.graph.ExecutionNode;
import org.ray.streaming.runtime.core.processor.Processor;
import org.ray.streaming.runtime.transfer.ChannelID;
import org.ray.streaming.runtime.transfer.DataReader;
import org.ray.streaming.runtime.transfer.DataWriter;
import org.ray.streaming.runtime.worker.JobWorker;
import org.ray.streaming.runtime.worker.context.RayRuntimeContext;
import org.ray.streaming.util.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Base class of the tasks executed by a {@link JobWorker}.
 *
 * <p>Construction creates one {@link DataWriter} per output edge that has target workers and
 * a single {@link DataReader} over all input edges, opens the processor, and registers a JVM
 * shutdown hook that cancels the task. The task itself runs on a daemon thread that is
 * started by {@link #start()}.
 */
public abstract class StreamTask implements Runnable {

  private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);

  protected int taskId;
  protected Processor processor;
  protected JobWorker worker;
  protected DataReader reader;
  private Map<ExecutionEdge, DataWriter> writers;
  private Thread thread;

  /**
   * @param taskId id of this task
   * @param processor processor driven by this task
   * @param worker worker owning this task; supplies config and execution graph
   */
  public StreamTask(int taskId, Processor processor, JobWorker worker) {
    this.taskId = taskId;
    this.processor = processor;
    this.worker = worker;
    prepareTask();

    this.thread = new Thread(Ray.wrapRunnable(this), this.getClass().getName()
        + "-" + System.currentTimeMillis());
    this.thread.setDaemon(true);
  }

  /**
   * Creates writers and reader, opens the processor and registers the shutdown hook.
   */
  private void prepareTask() {
    Map<String, String> queueConf = buildQueueConf();
    ExecutionGraph executionGraph = worker.getExecutionGraph();
    ExecutionNode executionNode = worker.getExecutionNode();

    List<Collector> collectors = createWriters(executionGraph, executionNode, queueConf);
    createReader(executionGraph, executionNode, queueConf);

    RuntimeContext runtimeContext = new RayRuntimeContext(
        worker.getExecutionTask(), worker.getConfig(), executionNode.getParallelism());
    processor.open(collectors, runtimeContext);

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      try {
        // Make DataReader stop read data when MockQueue destructor gets called to avoid crash
        StreamTask.this.cancelTask();
      } catch (Exception e) {
        LOG.error("Failed to cancel task {} during shutdown.", taskId, e);
      }
    }));
  }

  /**
   * Builds the string-valued channel configuration from the worker's config.
   *
   * <p>Every value is converted with {@link String#valueOf(Object)}, so non-String config
   * values do not cause a ClassCastException.
   */
  private Map<String, String> buildQueueConf() {
    Map<String, String> queueConf = new HashMap<>();
    worker.getConfig().forEach((k, v) -> queueConf.put(k, String.valueOf(v)));
    queueConf.putIfAbsent(Config.CHANNEL_SIZE, Config.CHANNEL_SIZE_DEFAULT);
    queueConf.put(Config.TASK_JOB_ID, Ray.getRuntimeContext().getCurrentJobId().toString());
    // Use the same default channel type as JobWorker and DataWriter, so that all of them
    // agree on the channel type when none is configured.
    queueConf.putIfAbsent(Config.CHANNEL_TYPE, Config.DEFAULT_CHANNEL_TYPE);
    return queueConf;
  }

  /**
   * Creates one writer and one output collector per output edge that has target workers.
   *
   * @return the collectors handed to the processor
   */
  private List<Collector> createWriters(ExecutionGraph executionGraph,
                                        ExecutionNode executionNode,
                                        Map<String, String> queueConf) {
    writers = new HashMap<>();
    List<Collector> collectors = new ArrayList<>();
    for (ExecutionEdge edge : executionNode.getOutputEdges()) {
      Map<String, ActorId> outputActorIds = new HashMap<>();
      Map<Integer, RayActor<JobWorker>> taskId2Worker = executionGraph
          .getTaskId2WorkerByNodeId(edge.getTargetNodeId());
      taskId2Worker.forEach((targetTaskId, targetActor) -> {
        String queueName = ChannelID.genIdStr(taskId, targetTaskId, executionGraph.getBuildTime());
        outputActorIds.put(queueName, targetActor.getId());
      });

      if (outputActorIds.isEmpty()) {
        continue;
      }
      // keySet() and values() of an unmodified map iterate in the same order, which keeps
      // channel ids and actor ids aligned by index.
      List<String> channelIDs = new ArrayList<>(outputActorIds.keySet());
      List<ActorId> toActorIds = new ArrayList<>(outputActorIds.values());
      DataWriter writer = new DataWriter(channelIDs, toActorIds, queueConf);
      LOG.info("Create DataWriter succeed.");
      writers.put(edge, writer);
      collectors.add(new OutputCollector(channelIDs, writer, edge.getPartition()));
    }
    return collectors;
  }

  /**
   * Creates the reader over the channels of all input edges; leaves {@code reader} unset
   * when there are no input channels.
   */
  private void createReader(ExecutionGraph executionGraph,
                            ExecutionNode executionNode,
                            Map<String, String> queueConf) {
    Map<String, ActorId> inputActorIds = new HashMap<>();
    for (ExecutionEdge edge : executionNode.getInputsEdges()) {
      Map<Integer, RayActor<JobWorker>> taskId2Worker = executionGraph
          .getTaskId2WorkerByNodeId(edge.getSrcNodeId());
      taskId2Worker.forEach((srcTaskId, srcActor) -> {
        String queueName = ChannelID.genIdStr(srcTaskId, taskId, executionGraph.getBuildTime());
        inputActorIds.put(queueName, srcActor.getId());
      });
    }
    if (inputActorIds.isEmpty()) {
      return;
    }
    List<String> channelIDs = new ArrayList<>(inputActorIds.keySet());
    List<ActorId> fromActorIds = new ArrayList<>(inputActorIds.values());
    LOG.info("Register queue consumer, queues {}.", channelIDs);
    reader = new DataReader(channelIDs, fromActorIds, queueConf);
  }

  protected abstract void init() throws Exception;

  protected abstract void cancelTask() throws Exception;

  /**
   * Starts the task thread.
   */
  public void start() {
    this.thread.start();
    LOG.info("started {}-{}", this.getClass().getSimpleName(), taskId);
  }
}
@@ -0,0 +1 @@
org.ray.streaming.runtime.schedule.JobSchedulerImpl
@@ -0,0 +1,77 @@
package org.ray.streaming.runtime.demo;
import com.google.common.collect.ImmutableMap;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.function.impl.FlatMapFunction;
import org.ray.streaming.api.function.impl.ReduceFunction;
import org.ray.streaming.api.function.impl.SinkFunction;
import org.ray.streaming.api.stream.StreamSource;
import org.ray.streaming.util.Config;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
/**
 * Runs a small word-count job over the memory channel and checks the final counts.
 */
public class WordCountTest implements Serializable {

  private static final Logger LOGGER = LoggerFactory.getLogger(WordCountTest.class);

  /** Upper bound for waiting on the job's results, so a broken job cannot hang the test. */
  private static final long MAX_WAIT_MILLIS = 60_000L;

  // TODO(zhenxuanpan): this test only works in single-process mode, because we put
  //   results in this in-memory map.
  static Map<String, Integer> wordCount = new ConcurrentHashMap<>();

  @Test
  public void testWordCount() {
    StreamingContext streamingContext = StreamingContext.buildContext();
    Map<String, Object> config = new HashMap<>();
    config.put(Config.STREAMING_BATCH_MAX_COUNT, 1);
    config.put(Config.CHANNEL_TYPE, Config.MEMORY_CHANNEL);
    streamingContext.withConfig(config);
    List<String> text = new ArrayList<>();
    text.add("hello world eagle eagle eagle");
    StreamSource<String> streamSource = StreamSource.buildSource(streamingContext, text);
    streamSource
        .flatMap((FlatMapFunction<String, WordAndCount>) (value, collector) -> {
          String[] records = value.split(" ");
          for (String record : records) {
            collector.collect(new WordAndCount(record, 1));
          }
        })
        .keyBy(pair -> pair.word)
        .reduce((ReduceFunction<WordAndCount>) (oldValue, newValue) ->
            new WordAndCount(oldValue.word, oldValue.count + newValue.count))
        .sink((SinkFunction<WordAndCount>)
            result -> wordCount.put(result.word, result.count));

    streamingContext.execute();

    Map<String, Integer> expected = ImmutableMap.of("eagle", 3, "hello", 1, "world", 1);
    // Wait until the final count for every word is computed. Comparing against the full
    // expected map (instead of only the number of keys) avoids leaving the loop while the
    // count of a repeated word is still an intermediate value.
    long deadline = System.currentTimeMillis() + MAX_WAIT_MILLIS;
    while (!expected.equals(wordCount) && System.currentTimeMillis() < deadline) {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        LOGGER.warn("Got an exception while sleeping.", e);
        // Restore the interrupt flag and stop waiting; the assertion below decides.
        Thread.currentThread().interrupt();
        break;
      }
    }
    Assert.assertEquals(wordCount, expected);
  }

  /** Serializable (word, count) pair flowing through the job. */
  private static class WordAndCount implements Serializable {

    public final String word;
    public final Integer count;

    public WordAndCount(String key, Integer count) {
      this.word = key;
      this.count = count;
    }
  }
}
@@ -0,0 +1,75 @@
package org.ray.streaming.runtime.schedule;
import java.util.ArrayList;
import java.util.List;
import com.google.common.collect.Lists;
import org.ray.api.RayActor;
import org.ray.api.id.ActorId;
import org.ray.api.id.ObjectId;
import org.ray.runtime.actor.LocalModeRayActor;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.partition.impl.RoundRobinPartition;
import org.ray.streaming.api.stream.DataStream;
import org.ray.streaming.api.stream.StreamSink;
import org.ray.streaming.api.stream.StreamSource;
import org.ray.streaming.runtime.core.graph.ExecutionEdge;
import org.ray.streaming.runtime.core.graph.ExecutionGraph;
import org.ray.streaming.runtime.core.graph.ExecutionNode;
import org.ray.streaming.runtime.core.graph.ExecutionNode.NodeType;
import org.ray.streaming.runtime.worker.JobWorker;
import org.ray.streaming.plan.Plan;
import org.ray.streaming.plan.PlanBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
/**
 * Checks the execution graph that {@link TaskAssignImpl} builds for a source-to-sink plan.
 */
public class TaskAssignImplTest {

  private static final Logger LOGGER = LoggerFactory.getLogger(TaskAssignImplTest.class);

  @Test
  public void testTaskAssignImpl() {
    Plan plan = buildDataSyncPlan();

    // One local-mode actor per plan vertex.
    List<RayActor<JobWorker>> workers = new ArrayList<>();
    int created = 0;
    while (created < plan.getPlanVertexList().size()) {
      workers.add(new LocalModeRayActor(ActorId.fromRandom(), ObjectId.fromRandom()));
      created++;
    }

    ITaskAssign taskAssign = new TaskAssignImpl();
    ExecutionGraph executionGraph = taskAssign.assign(plan, workers);
    List<ExecutionNode> executionNodeList = executionGraph.getExecutionNodeList();
    Assert.assertEquals(executionNodeList.size(), 2);

    // Source node: one task and one outgoing round-robin edge.
    ExecutionNode sourceNode = executionNodeList.get(0);
    Assert.assertEquals(sourceNode.getNodeType(), NodeType.SOURCE);
    Assert.assertEquals(sourceNode.getExecutionTasks().size(), 1);
    Assert.assertEquals(sourceNode.getOutputEdges().size(), 1);

    List<ExecutionEdge> sourceExecutionEdges = sourceNode.getOutputEdges();
    Assert.assertEquals(sourceExecutionEdges.size(), 1);
    ExecutionEdge source2Sink = sourceExecutionEdges.get(0);
    Assert.assertEquals(source2Sink.getPartition().getClass(), RoundRobinPartition.class);

    // Sink node: one task and no outgoing edges.
    ExecutionNode sinkNode = executionNodeList.get(1);
    Assert.assertEquals(sinkNode.getNodeType(), NodeType.SINK);
    Assert.assertEquals(sinkNode.getExecutionTasks().size(), 1);
    Assert.assertEquals(sinkNode.getOutputEdges().size(), 0);
  }

  /**
   * Builds a plan with a three-element source feeding a logging sink.
   */
  public Plan buildDataSyncPlan() {
    StreamingContext streamingContext = StreamingContext.buildContext();
    DataStream<String> dataStream = StreamSource.buildSource(streamingContext,
        Lists.newArrayList("a", "b", "c"));
    StreamSink streamSink = dataStream.sink(x -> LOGGER.info(x));
    PlanBuilder planBuilder = new PlanBuilder(Lists.newArrayList(streamSink));
    return planBuilder.buildPlan();
  }
}
@@ -0,0 +1,234 @@
package org.ray.streaming.runtime.streamingqueue;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.api.options.ActorCreationOptions;
import org.ray.api.options.ActorCreationOptions.Builder;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.function.impl.FlatMapFunction;
import org.ray.streaming.api.function.impl.ReduceFunction;
import org.ray.streaming.api.stream.StreamSource;
import org.ray.streaming.runtime.transfer.ChannelID;
import org.ray.streaming.runtime.util.EnvUtil;
import org.ray.streaming.util.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class StreamingQueueTest implements Serializable {
private static Logger LOGGER = LoggerFactory.getLogger(StreamingQueueTest.class);
static {
EnvUtil.loadNativeLibraries();
}
@org.testng.annotations.BeforeSuite
public void suiteSetUp() throws Exception {
LOGGER.info("Do set up");
String management = ManagementFactory.getRuntimeMXBean().getName();
String pid = management.split("@")[0];
LOGGER.info("StreamingQueueTest pid: {}", pid);
LOGGER.info("java.library.path = {}", System.getProperty("java.library.path"));
}
@org.testng.annotations.AfterSuite
public void suiteTearDown() throws Exception {
LOGGER.warn("Do tear down");
}
@BeforeClass
public void setUp() {
}
@BeforeMethod
void beforeMethod() {
LOGGER.info("beforeTest");
Ray.shutdown();
System.setProperty("ray.resources", "CPU:4,RES-A:4");
System.setProperty("ray.raylet.config.num_workers_per_process_java", "1");
System.setProperty("ray.run-mode", "CLUSTER");
System.setProperty("ray.redirect-output", "true");
// ray init
Ray.init();
}
@AfterMethod
void afterMethod() {
LOGGER.info("afterTest");
Ray.shutdown();
System.clearProperty("ray.run-mode");
}
@Test(timeOut = 3000000)
public void testReaderWriter() {
LOGGER.info("StreamingQueueTest.testReaderWriter run-mode: {}",
System.getProperty("ray.run-mode"));
Ray.shutdown();
System.setProperty("ray.resources", "CPU:4,RES-A:4");
System.setProperty("ray.raylet.config.num_workers_per_process_java", "1");
System.setProperty("ray.run-mode", "CLUSTER");
System.setProperty("ray.redirect-output", "true");
// ray init
Ray.init();
ActorCreationOptions.Builder builder = new Builder();
RayActor<WriterWorker> writerActor = Ray.createActor(WriterWorker::new, "writer",
builder.createActorCreationOptions());
RayActor<ReaderWorker> readerActor = Ray.createActor(ReaderWorker::new, "reader",
builder.createActorCreationOptions());
LOGGER.info("call getName on writerActor: {}",
Ray.call(WriterWorker::getName, writerActor).get());
LOGGER.info("call getName on readerActor: {}",
Ray.call(ReaderWorker::getName, readerActor).get());
// LOGGER.info(Ray.call(WriterWorker::testCallReader, writerActor, readerActor).get());
List<String> outputQueueList = new ArrayList<>();
List<String> inputQueueList = new ArrayList<>();
int queueNum = 2;
for (int i = 0; i < queueNum; ++i) {
String qid = ChannelID.genRandomIdStr();
LOGGER.info("getRandomQueueId: {}", qid);
inputQueueList.add(qid);
outputQueueList.add(qid);
readerActor.getId();
}
final int msgCount = 100;
Ray.call(ReaderWorker::init, readerActor, inputQueueList, writerActor, msgCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Ray.call(WriterWorker::init, writerActor, outputQueueList, readerActor, msgCount);
long time = 0;
while (time < 20000 &&
Ray.call(ReaderWorker::getTotalMsg, readerActor).get() < msgCount * queueNum) {
try {
Thread.sleep(1000);
time += 1000;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Assert.assertEquals(
Ray.call(ReaderWorker::getTotalMsg, readerActor).get().intValue(),
msgCount * queueNum);
}
/**
 * Runs a word count job over the native channel and checks the final counts.
 *
 * <p>The job reads the single line {@code "hello world eagle eagle eagle"}, splits it into
 * words, keys by word, reduces by summing counts, and the sink serializes the running
 * word-count map to a result file after every record. The test then polls that file until
 * it holds the expected final counts; the TestNG {@code timeOut} bounds the wait.
 */
@Test(timeOut = 60000)
public void testWordCount() {
  LOGGER.info("StreamingQueueTest.testWordCount run-mode: {}",
      System.getProperty("ray.run-mode"));
  String resultFile = "/tmp/org.ray.streaming.runtime.streamingqueue.testWordCount.txt";
  deleteResultFile(resultFile);

  Map<String, Integer> wordCount = new ConcurrentHashMap<>();
  StreamingContext streamingContext = StreamingContext.buildContext();
  Map<String, Object> config = new HashMap<>();
  config.put(Config.STREAMING_BATCH_MAX_COUNT, 1);
  config.put(Config.CHANNEL_TYPE, Config.NATIVE_CHANNEL);
  config.put(Config.CHANNEL_SIZE, "100000");
  streamingContext.withConfig(config);

  List<String> text = new ArrayList<>();
  text.add("hello world eagle eagle eagle");
  StreamSource<String> streamSource = StreamSource.buildSource(streamingContext, text);
  streamSource
      .flatMap((FlatMapFunction<String, WordAndCount>) (value, collector) -> {
        String[] records = value.split(" ");
        for (String record : records) {
          collector.collect(new WordAndCount(record, 1));
        }
      })
      .keyBy(pair -> pair.word)
      .reduce((ReduceFunction<WordAndCount>) (oldValue, newValue) -> {
        LOGGER.info("reduce: {} {}", oldValue, newValue);
        return new WordAndCount(oldValue.word, oldValue.count + newValue.count);
      })
      .sink(s -> {
        LOGGER.info("sink {} {}", s.word, s.count);
        wordCount.put(s.word, s.count);
        // The result is handed back through a file rather than through `wordCount` itself.
        serializeResultToFile(resultFile, wordCount);
      });
  streamingContext.execute();

  Map<String, Integer> expected = ImmutableMap.of("eagle", 3, "hello", 1, "world", 1);
  Map<String, Integer> checkWordCount =
      (Map<String, Integer>) deserializeResultFromFile(resultFile);
  // Sleep until the count for every word is computed. Waiting only for three distinct
  // words is not enough: the file is rewritten after every record, so "eagle" can be
  // observed with an intermediate count of 1 or 2 while the map already has three entries.
  while (!expected.equals(checkWordCount)) {
    LOGGER.info("sleep");
    LOGGER.info("Waiting for final word count, current result: {}", checkWordCount);
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      // Swallowing the interrupt would leave this loop spinning after the test was
      // cancelled; restore the flag and fail immediately instead.
      Thread.currentThread().interrupt();
      Assert.fail("Interrupted while waiting for the word count result.", e);
    }
    checkWordCount = (Map<String, Integer>) deserializeResultFromFile(resultFile);
  }
  LOGGER.info("check");
  Assert.assertEquals(checkWordCount, expected);
}
/**
 * Writes {@code obj} to {@code fileName} using Java serialization, replacing any previous
 * content of the file.
 *
 * <p>This is best effort: a failure is logged and otherwise ignored.
 *
 * @param fileName path of the file to (over)write
 * @param obj object to serialize
 */
private void serializeResultToFile(String fileName, Object obj) {
  // try-with-resources closes both streams on every call; this method is invoked once per
  // sink record, so leaving them open would leak one file descriptor per record.
  try (FileOutputStream fileOut = new FileOutputStream(fileName);
      ObjectOutputStream out = new ObjectOutputStream(fileOut)) {
    out.writeObject(obj);
  } catch (Exception e) {
    LOGGER.error("Failed to serialize result to file {}", fileName, e);
  }
}
/**
 * Reads the word-count map previously written to {@code fileName} with Java serialization.
 *
 * <p>This helper only reads; it does not validate the content. The file is rewritten after
 * every sink record, so a successful read may legitimately return intermediate counts.
 * Asserting on the final result is the caller's job once it has finished waiting.
 *
 * @param fileName path of the result file
 * @return the deserialized map, or {@code null} if the file is missing, incomplete or
 *     otherwise unreadable
 */
private Object deserializeResultFromFile(String fileName) {
  try (FileInputStream fileIn = new FileInputStream(fileName);
      ObjectInputStream in = new ObjectInputStream(fileIn)) {
    @SuppressWarnings("unchecked")
    Map<String, Integer> checkWordCount = (Map<String, Integer>) in.readObject();
    return checkWordCount;
  } catch (Exception e) {
    // Read failures are expected while polling (file not written yet or only partially
    // written), so keep the log to a single line without a stack trace.
    LOGGER.error(String.valueOf(e));
    return null;
  }
}
/**
 * Immutable (word, count) pair flowing through the word count pipeline.
 * It is {@link Serializable} so that records can be passed between operators.
 */
private static class WordAndCount implements Serializable {
  /** The word this record counts. */
  public final String word;
  /** Number of occurrences accumulated so far for {@link #word}. */
  public final Integer count;

  /**
   * @param key the word
   * @param count occurrences of the word
   */
  public WordAndCount(String key, Integer count) {
    this.word = key;
    this.count = count;
  }

  /** Readable form for log lines; the default would only print an identity hash. */
  @Override
  public String toString() {
    return "WordAndCount{word=" + word + ", count=" + count + "}";
  }
}
/**
 * Removes a result file left over from a previous run and schedules cleanup of the file
 * produced by this run.
 *
 * @param path path of the result file
 */
private void deleteResultFile(String path) {
  File file = new File(path);
  // deleteOnExit() alone only registers the file for removal at JVM shutdown, so a stale
  // file would still be present (and readable) while the test runs. Delete it right away.
  if (file.exists() && !file.delete()) {
    LOGGER.warn("Failed to delete stale result file {}", path);
  }
  // Still clean up whatever this run writes once the JVM terminates.
  file.deleteOnExit();
}
}
@@ -0,0 +1,280 @@
package org.ray.streaming.runtime.streamingqueue;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.api.annotation.RayRemote;
import org.ray.api.id.ActorId;
import org.ray.runtime.RayMultiWorkerNativeRuntime;
import org.ray.runtime.actor.NativeRayActor;
import org.ray.runtime.functionmanager.JavaFunctionDescriptor;
import org.ray.streaming.runtime.transfer.ChannelID;
import org.ray.streaming.runtime.transfer.DataMessage;
import org.ray.streaming.runtime.transfer.DataReader;
import org.ray.streaming.runtime.transfer.DataWriter;
import org.ray.streaming.runtime.transfer.TransferHandler;
import org.ray.streaming.util.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
/**
 * Base class of the test actors in this file.
 *
 * <p>On construction it creates a {@link TransferHandler} from the current runtime's native
 * core worker pointer and four function descriptors naming this class's
 * {@code onWriterMessage}, {@code onWriterMessageSync}, {@code onReaderMessage} and
 * {@code onReaderMessageSync} methods. Each of those methods simply forwards the raw buffer
 * to the handler.
 */
public class Worker {
  private static final Logger LOGGER = LoggerFactory.getLogger(Worker.class);

  protected TransferHandler transferHandler;

  public Worker() {
    final RayMultiWorkerNativeRuntime multiWorkerRuntime =
        (RayMultiWorkerNativeRuntime) Ray.internal();
    // All callbacks are declared on Worker itself, so they share one class name.
    final String callbackClass = Worker.class.getName();
    transferHandler = new TransferHandler(
        multiWorkerRuntime.getCurrentRuntime().getNativeCoreWorkerPointer(),
        new JavaFunctionDescriptor(callbackClass, "onWriterMessage", "([B)V"),
        new JavaFunctionDescriptor(callbackClass, "onWriterMessageSync", "([B)[B"),
        new JavaFunctionDescriptor(callbackClass, "onReaderMessage", "([B)V"),
        new JavaFunctionDescriptor(callbackClass, "onReaderMessageSync", "([B)[B"));
  }

  /** Forwards a reader-side message to the transfer handler. */
  public void onReaderMessage(byte[] buffer) {
    transferHandler.onReaderMessage(buffer);
  }

  /** Forwards a reader-side message to the transfer handler and returns its reply. */
  public byte[] onReaderMessageSync(byte[] buffer) {
    final byte[] reply = transferHandler.onReaderMessageSync(buffer);
    return reply;
  }

  /** Forwards a writer-side message to the transfer handler. */
  public void onWriterMessage(byte[] buffer) {
    transferHandler.onWriterMessage(buffer);
  }

  /** Forwards a writer-side message to the transfer handler and returns its reply. */
  public byte[] onWriterMessageSync(byte[] buffer) {
    final byte[] reply = transferHandler.onWriterMessageSync(buffer);
    return reply;
  }
}
/**
 * Test actor that consumes messages from a set of input queues and verifies their content.
 *
 * <p>{@link #init} creates a {@link DataReader} and starts a background thread running
 * {@link #consume}; progress can be observed through {@link #getTotalMsg} and
 * {@link #done}.
 */
@RayRemote
class ReaderWorker extends Worker {
  private static final Logger LOGGER = LoggerFactory.getLogger(ReaderWorker.class);

  private String name = null;
  private List<String> inputQueueList = null;
  private List<ActorId> inputActorIds = new ArrayList<>();
  private DataReader dataReader = null;
  private long handler = 0;
  private RayActor peerActor = null;
  private int msgCount = 0;
  // Written by the reader thread started in init() and read from other threads through
  // getTotalMsg()/done(); volatile makes the latest count visible to them. There is a
  // single writer, so the non-atomic increment in consume() is safe.
  private volatile int totalMsg = 0;

  public ReaderWorker(String name) {
    LOGGER.info("ReaderWorker constructor");
    this.name = name;
  }

  /** Logs this process's pid together with the worker name and returns the name. */
  public String getName() {
    String management = ManagementFactory.getRuntimeMXBean().getName();
    String pid = management.split("@")[0];
    LOGGER.info("pid: {} name: {}", pid, name);
    return name;
  }

  /** Trivial remote method; returns the constant {@code "testRayCall"}. */
  public String testRayCall() {
    LOGGER.info("testRayCall called");
    return "testRayCall";
  }

  /**
   * Creates the data reader for the given queues and starts consuming in a new thread.
   *
   * @param inputQueueList names of the queues to read from
   * @param peer the actor on the other end; its id is registered once per queue
   * @param msgCount number of messages expected per queue
   * @return always {@code true}
   */
  public boolean init(List<String> inputQueueList, RayActor peer, int msgCount) {
    this.inputQueueList = inputQueueList;
    this.peerActor = peer;
    this.msgCount = msgCount;

    LOGGER.info("ReaderWorker init");
    LOGGER.info("java.library.path = {}", System.getProperty("java.library.path"));

    for (String queue : this.inputQueueList) {
      inputActorIds.add(this.peerActor.getId());
      LOGGER.info("ReaderWorker actorId: {}", this.peerActor.getId());
    }

    Map<String, String> conf = new HashMap<>();
    conf.put(Config.CHANNEL_TYPE, Config.NATIVE_CHANNEL);
    conf.put(Config.CHANNEL_SIZE, "100000");
    conf.put(Config.STREAMING_JOB_NAME, "integrationTest1");
    dataReader = new DataReader(inputQueueList, inputActorIds, conf);

    // Should not GetBundle in RayCall thread
    Thread readThread = new Thread(Ray.wrapRunnable(new Runnable() {
      @Override
      public void run() {
        consume();
      }
    }));
    readThread.start();

    LOGGER.info("ReaderWorker init done");
    return true;
  }

  /**
   * Reads {@code msgCount} messages per input queue and checks each one.
   *
   * <p>Every message body must start with an int equal to the number of readable bytes in
   * the body, followed by the byte sequence {@code 0, 1, 2, ...} for the rest of the body.
   * A {@code null} read is logged and retried without being counted.
   */
  public final void consume() {
    final int expected = expectedTotalMsg();
    int received = 0;
    while (received < expected) {
      DataMessage dataMessage = dataReader.read(100);
      if (dataMessage == null) {
        LOGGER.error("dataMessage is null");
        continue;
      }

      // remaining() must be sampled before getInt() advances the buffer position.
      int bufferSize = dataMessage.body().remaining();
      int dataSize = dataMessage.body().getInt();

      // check size
      LOGGER.info("capacity {} bufferSize {} dataSize {}",
          dataMessage.body().capacity(), bufferSize, dataSize);
      Assert.assertEquals(bufferSize, dataSize);

      LOGGER.info("{} : {} message.", received, dataMessage);

      // check content
      for (int j = 0; j < dataSize - 4; ++j) {
        Assert.assertEquals(dataMessage.body().get(), (byte) j);
      }

      totalMsg++;
      received++;
    }
    LOGGER.info("ReaderWorker consume data done.");
  }

  void onQueueTransfer(long handler, byte[] buffer) {
  }

  /**
   * @return whether the number of verified messages equals the number {@link #consume}
   *     is expected to read, i.e. {@code msgCount} per input queue. Before {@link #init}
   *     has been called both numbers are zero, so this returns {@code true}.
   */
  public boolean done() {
    return totalMsg == expectedTotalMsg();
  }

  /** @return the number of messages read and verified so far */
  public int getTotalMsg() {
    return totalMsg;
  }

  /** Total number of messages to read across all input queues; 0 before init(). */
  private int expectedTotalMsg() {
    return inputQueueList == null ? 0 : msgCount * inputQueueList.size();
  }
}
/**
 * Test actor that writes generated messages to a set of output queues.
 *
 * <p>{@link #init} creates a {@link DataWriter} and starts a background thread running
 * {@link #produce}.
 */
@RayRemote
class WriterWorker extends Worker {
  private static final Logger LOGGER = LoggerFactory.getLogger(WriterWorker.class);

  private String name = null;
  private List<String> outputQueueList = null;
  private List<ActorId> outputActorIds = new ArrayList<>();
  DataWriter dataWriter = null;
  RayActor peerActor = null;
  int msgCount = 0;

  public WriterWorker(String name) {
    this.name = name;
  }

  /** Logs this process's pid together with the worker name and returns the name. */
  public String getName() {
    String management = ManagementFactory.getRuntimeMXBean().getName();
    String pid = management.split("@")[0];
    LOGGER.info("pid: {} name: {}", pid, name);
    return name;
  }

  /** Calls {@code getName} on the given reader actor and returns the result. */
  public String testCallReader(RayActor readerActor) {
    String readerName = (String) Ray.call(ReaderWorker::getName, readerActor).get();
    LOGGER.info("testCallReader: {}", readerName);
    return readerName;
  }

  /**
   * Creates the data writer for the given queues and starts producing in a new thread.
   *
   * @param outputQueueList names of the queues to write to
   * @param peer the actor on the other end; its id is registered once per queue
   * @param msgCount number of messages to write to each queue
   * @return always {@code true}
   */
  public boolean init(List<String> outputQueueList, RayActor peer, int msgCount) {
    this.outputQueueList = outputQueueList;
    this.peerActor = peer;
    this.msgCount = msgCount;

    LOGGER.info("WriterWorker init:");

    for (String queue : this.outputQueueList) {
      outputActorIds.add(this.peerActor.getId());
      LOGGER.info("WriterWorker actorId: {}", this.peerActor.getId());
    }

    LOGGER.info("Peer isDirectActorCall: {}", ((NativeRayActor) peer).isDirectCallActor());
    // Three blocking round trips to the peer before the writer is created.
    for (int attempt = 0; attempt < 3; ++attempt) {
      Ray.call(ReaderWorker::testRayCall, peer).get();
    }

    sleepMillis(2 * 1000);

    Map<String, String> conf = new HashMap<>();
    conf.put(Config.CHANNEL_TYPE, Config.NATIVE_CHANNEL);
    conf.put(Config.CHANNEL_SIZE, "100000");
    conf.put(Config.STREAMING_JOB_NAME, "integrationTest1");

    dataWriter = new DataWriter(this.outputQueueList, this.outputActorIds, conf);
    Thread writerThread = new Thread(Ray.wrapRunnable(new Runnable() {
      @Override
      public void run() {
        produce();
      }
    }));
    writerThread.start();

    LOGGER.info("WriterWorker init done");
    return true;
  }

  /**
   * Writes {@code msgCount} messages to every output queue, then sleeps for 20 seconds.
   *
   * <p>Each message is between 10 and 109 bytes long: a leading int holding the total
   * message size, followed by the byte sequence {@code 0, 1, 2, ...}. The number of rounds
   * is the {@code msgCount} passed to {@link #init}; it is no longer overwritten with a
   * hard-coded 100, which would disagree with a reader expecting a different count.
   */
  public final void produce() {
    Random random = new Random();
    for (int i = 0; i < this.msgCount; ++i) {
      for (int j = 0; j < outputQueueList.size(); ++j) {
        LOGGER.info("WriterWorker produce");

        int dataSize = (random.nextInt(100)) + 10;
        LOGGER.info("dataSize: {}", dataSize);

        ByteBuffer bb = ByteBuffer.allocate(dataSize);
        bb.putInt(dataSize);
        for (int k = 0; k < dataSize - 4; ++k) {
          bb.put((byte) k);
        }

        // The buffer is completely filled, so clear() just rewinds it to [0, capacity)
        // and makes the whole content readable.
        bb.clear();
        ChannelID qid = ChannelID.from(outputQueueList.get(j));
        dataWriter.write(qid, bb);
      }
    }
    // NOTE(review): presumably keeps the writer around while the reader drains the
    // queues; confirm before shortening.
    sleepMillis(20 * 1000);
  }

  /** Sleeps for the given time; on interruption logs and restores the interrupt flag. */
  private void sleepMillis(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      LOGGER.warn("Got an exception while sleeping.", e);
      Thread.currentThread().interrupt();
    }
  }
}
@@ -0,0 +1,22 @@
package org.ray.streaming.runtime.transfer;
import static org.testng.Assert.assertEquals;
import org.ray.streaming.runtime.util.EnvUtil;
import org.testng.annotations.Test;
/**
 * Checks that a randomly generated channel id string and its byte form have the lengths
 * implied by {@code ChannelID.ID_LENGTH}.
 */
public class ChannelIDTest {

  // Native libraries are loaded once, before any test in this class runs.
  static {
    EnvUtil.loadNativeLibraries();
  }

  /**
   * The id string must be twice {@code ID_LENGTH} characters long and convert to exactly
   * {@code ID_LENGTH} bytes.
   */
  @Test
  public void testIdStrToBytes() {
    final String randomId = ChannelID.genRandomIdStr();

    final int actualChars = randomId.length();
    assertEquals(actualChars, ChannelID.ID_LENGTH * 2);

    final int actualBytes = ChannelID.idStrToBytes(randomId).length;
    assertEquals(actualBytes, ChannelID.ID_LENGTH);
  }
}

Some files were not shown because too many files have changed in this diff Show More