From ec5ecb661fde4c1dfc9a3749c91ba80a38a31481 Mon Sep 17 00:00:00 2001 From: Tianyi Chen Date: Wed, 10 Jun 2020 14:13:55 +0800 Subject: [PATCH] [Streaming] Implement streaming job-worker. (#8780) --- .../api/context/StreamingContext.java | 12 +- .../io/ray/streaming/client/JobClient.java | 17 ++ .../ray/streaming/schedule/JobScheduler.java | 18 --- .../runtime/client/JobClientImpl.java | 55 +++++++ .../runtime/config/StreamingGlobalConfig.java | 4 + .../runtime/config/StreamingWorkerConfig.java | 20 +++ .../runtime/config/global/TransferConfig.java | 66 ++++++++ .../runtime/config/master/ResourceConfig.java | 4 +- .../config/types/TransferChannelType.java | 29 ++++ .../config/worker/WorkerInternalConfig.java | 27 ++++ .../runtime/core/graph/ExecutionEdge.java | 47 ------ .../runtime/core/graph/ExecutionGraph.java | 99 ------------ .../runtime/core/graph/ExecutionNode.java | 126 --------------- .../runtime/core/graph/ExecutionTask.java | 45 ------ .../graph/executiongraph/ExecutionEdge.java | 33 ++-- .../graph/executiongraph/ExecutionGraph.java | 15 +- .../executiongraph/ExecutionJobEdge.java | 29 ++-- .../executiongraph/ExecutionJobVertex.java | 62 ++++--- .../graph/executiongraph/ExecutionVertex.java | 127 ++++++++++----- .../runtime/core/resource/Container.java | 14 +- .../streaming/runtime/master/JobMaster.java | 4 +- .../master/graphmanager/GraphManagerImpl.java | 5 +- .../strategy/impl/PipelineFirstStrategy.java | 2 +- .../master/scheduler/JobSchedulerImpl.java | 17 +- .../controller/WorkerLifecycleController.java | 24 +-- .../runtime/python/GraphPbBuilder.java | 124 ++++++++------ .../runtime/rpc/RemoteCallWorker.java | 34 ++-- .../runtime/schedule/JobSchedulerImpl.java | 92 ----------- .../runtime/schedule/TaskAssigner.java | 17 -- .../runtime/schedule/TaskAssignerImpl.java | 86 ---------- .../runtime/transfer/ChannelUtils.java | 70 +++++--- .../runtime/transfer/DataReader.java | 32 ++-- .../runtime/transfer/DataWriter.java | 30 ++-- 
.../streaming/runtime/util/CommonUtils.java | 14 ++ .../streaming/runtime/worker/JobWorker.java | 152 +++++++++++------- .../worker/context/JobWorkerContext.java | 46 ++++-- ...text.java => StreamingRuntimeContext.java} | 10 +- .../runtime/worker/context/WorkerContext.java | 41 ----- .../runtime/worker/tasks/InputStreamTask.java | 18 +-- .../worker/tasks/OneInputStreamTask.java | 9 +- .../worker/tasks/SourceStreamTask.java | 26 ++- .../runtime/worker/tasks/StreamTask.java | 136 ++++++++-------- .../worker/tasks/TwoInputStreamTask.java | 23 +++ .../io.ray.streaming.client.JobClient | 1 + .../io.ray.streaming.schedule.JobScheduler | 1 - .../core/graph/ExecutionGraphTest.java | 14 +- .../streaming/runtime/demo/WordCountTest.java | 9 +- ...bSchedulerTest.java => JobClientTest.java} | 2 +- .../schedule/TaskAssignerImplTest.java | 67 -------- .../streamingqueue/StreamingQueueTest.java | 11 +- .../runtime/streamingqueue/Worker.java | 19 ++- streaming/python/runtime/graph.py | 134 ++++++++------- streaming/python/runtime/task.py | 43 ++--- streaming/python/runtime/worker.py | 45 +++--- streaming/python/tests/test_word_count.py | 2 +- streaming/src/protobuf/remote_call.proto | 82 +++++----- 56 files changed, 1078 insertions(+), 1213 deletions(-) create mode 100644 streaming/java/streaming-api/src/main/java/io/ray/streaming/client/JobClient.java delete mode 100644 streaming/java/streaming-api/src/main/java/io/ray/streaming/schedule/JobScheduler.java create mode 100644 streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/client/JobClientImpl.java create mode 100644 streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/global/TransferConfig.java create mode 100644 streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/types/TransferChannelType.java create mode 100644 streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/worker/WorkerInternalConfig.java delete mode 100644 
streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/ExecutionEdge.java delete mode 100644 streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/ExecutionGraph.java delete mode 100644 streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/ExecutionNode.java delete mode 100644 streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/ExecutionTask.java delete mode 100644 streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/schedule/JobSchedulerImpl.java delete mode 100644 streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/schedule/TaskAssigner.java delete mode 100644 streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/schedule/TaskAssignerImpl.java create mode 100644 streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/CommonUtils.java rename streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/context/{RayRuntimeContext.java => StreamingRuntimeContext.java} (90%) delete mode 100644 streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/context/WorkerContext.java create mode 100644 streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/TwoInputStreamTask.java create mode 100644 streaming/java/streaming-runtime/src/main/resources/META-INF/services/io.ray.streaming.client.JobClient delete mode 100644 streaming/java/streaming-runtime/src/main/resources/META-INF/services/io.ray.streaming.schedule.JobScheduler rename streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/jobscheduler/{JobSchedulerTest.java => JobClientTest.java} (85%) delete mode 100644 streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/schedule/TaskAssignerImplTest.java diff --git a/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/context/StreamingContext.java 
b/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/context/StreamingContext.java index c18fb7cef..b5cb0a931 100644 --- a/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/context/StreamingContext.java +++ b/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/context/StreamingContext.java @@ -3,9 +3,9 @@ package io.ray.streaming.api.context; import com.google.common.base.Preconditions; import io.ray.api.Ray; import io.ray.streaming.api.stream.StreamSink; +import io.ray.streaming.client.JobClient; import io.ray.streaming.jobgraph.JobGraph; import io.ray.streaming.jobgraph.JobGraphBuilder; -import io.ray.streaming.schedule.JobScheduler; import io.ray.streaming.util.Config; import java.io.Serializable; import java.util.ArrayList; @@ -74,12 +74,12 @@ public class StreamingContext implements Serializable { LOG.info("Reuse existing cluster."); } - ServiceLoader serviceLoader = ServiceLoader.load(JobScheduler.class); - Iterator iterator = serviceLoader.iterator(); + ServiceLoader serviceLoader = ServiceLoader.load(JobClient.class); + Iterator iterator = serviceLoader.iterator(); Preconditions.checkArgument(iterator.hasNext(), - "No JobScheduler implementation has been provided."); - JobScheduler jobSchedule = iterator.next(); - jobSchedule.schedule(jobGraph, jobConfig); + "No JobClient implementation has been provided."); + JobClient jobClient = iterator.next(); + jobClient.submit(jobGraph, jobConfig); } public int generateId() { diff --git a/streaming/java/streaming-api/src/main/java/io/ray/streaming/client/JobClient.java b/streaming/java/streaming-api/src/main/java/io/ray/streaming/client/JobClient.java new file mode 100644 index 000000000..445b4bd02 --- /dev/null +++ b/streaming/java/streaming-api/src/main/java/io/ray/streaming/client/JobClient.java @@ -0,0 +1,17 @@ +package io.ray.streaming.client; + +import io.ray.streaming.jobgraph.JobGraph; +import java.util.Map; + +/** + * Interface of the job client. 
+ */ +public interface JobClient { + + /** + * Submit job with logical plan to run. + * + * @param jobGraph The logical plan. + */ + void submit(JobGraph jobGraph, Map conf); +} diff --git a/streaming/java/streaming-api/src/main/java/io/ray/streaming/schedule/JobScheduler.java b/streaming/java/streaming-api/src/main/java/io/ray/streaming/schedule/JobScheduler.java deleted file mode 100644 index 150fde10f..000000000 --- a/streaming/java/streaming-api/src/main/java/io/ray/streaming/schedule/JobScheduler.java +++ /dev/null @@ -1,18 +0,0 @@ -package io.ray.streaming.schedule; - - -import io.ray.streaming.jobgraph.JobGraph; -import java.util.Map; - -/** - * Interface of the job scheduler. - */ -public interface JobScheduler { - - /** - * Assign logical plan to physical execution graph, and schedule job to run. - * - * @param jobGraph The logical plan. - */ - void schedule(JobGraph jobGraph, Map conf); -} diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/client/JobClientImpl.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/client/JobClientImpl.java new file mode 100644 index 000000000..0a5f7ddb5 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/client/JobClientImpl.java @@ -0,0 +1,55 @@ +package io.ray.streaming.runtime.client; + +import io.ray.api.ActorHandle; +import io.ray.api.ObjectRef; +import io.ray.api.Ray; +import io.ray.api.options.ActorCreationOptions; +import io.ray.streaming.client.JobClient; +import io.ray.streaming.jobgraph.JobGraph; +import io.ray.streaming.runtime.config.global.CommonConfig; +import io.ray.streaming.runtime.master.JobMaster; +import java.util.HashMap; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Job client: to submit job from api to runtime. 
+ */ +public class JobClientImpl implements JobClient { + + public static final Logger LOG = LoggerFactory.getLogger(JobClientImpl.class); + + private ActorHandle jobMasterActor; + + @Override + public void submit(JobGraph jobGraph, Map jobConfig) { + LOG.info("Submitting job [{}] with job graph [{}] and job config [{}].", + jobGraph.getJobName(), jobGraph, jobConfig); + Map resources = new HashMap<>(); + ActorCreationOptions options = new ActorCreationOptions.Builder() + .setResources(resources) + .setMaxRestarts(-1) + .createActorCreationOptions(); + + // set job name and id at start + jobConfig.put(CommonConfig.JOB_ID, Ray.getRuntimeContext().getCurrentJobId().toString()); + jobConfig.put(CommonConfig.JOB_NAME, jobGraph.getJobName()); + + jobGraph.getJobConfig().putAll(jobConfig); + + // create job master actor + this.jobMasterActor = Ray.createActor(JobMaster::new, jobConfig, options); + + try { + ObjectRef submitResult = jobMasterActor.call(JobMaster::submitJob, + jobMasterActor, jobGraph); + + if (submitResult.get()) { + LOG.info("Finish submitting job: {}.", jobGraph.getJobName()); + } + } catch (Exception e) { + LOG.error("Failed to submit job: {}.", jobGraph.getJobName(), e); + } + } +} diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/StreamingGlobalConfig.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/StreamingGlobalConfig.java index 1658ee8ee..ac7ede0a9 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/StreamingGlobalConfig.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/StreamingGlobalConfig.java @@ -2,6 +2,7 @@ package io.ray.streaming.runtime.config; import com.google.common.base.Preconditions; import io.ray.streaming.runtime.config.global.CommonConfig; +import io.ray.streaming.runtime.config.global.TransferConfig; import java.io.Serializable; import java.lang.reflect.Method; import 
java.util.HashMap; @@ -20,6 +21,7 @@ public class StreamingGlobalConfig implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(StreamingGlobalConfig.class); public final CommonConfig commonConfig; + public final TransferConfig transferConfig; public final Map configMap; @@ -27,6 +29,7 @@ public class StreamingGlobalConfig implements Serializable { configMap = new HashMap<>(conf); commonConfig = ConfigFactory.create(CommonConfig.class, conf); + transferConfig = ConfigFactory.create(TransferConfig.class, conf); globalConfig2Map(); } @@ -38,6 +41,7 @@ public class StreamingGlobalConfig implements Serializable { private void globalConfig2Map() { try { configMap.putAll(config2Map(this.commonConfig)); + configMap.putAll(config2Map(this.transferConfig)); } catch (Exception e) { LOG.error("Couldn't convert global config to a map.", e); } diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/StreamingWorkerConfig.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/StreamingWorkerConfig.java index 6cd7e4009..f682fd3ea 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/StreamingWorkerConfig.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/StreamingWorkerConfig.java @@ -1,6 +1,9 @@ package io.ray.streaming.runtime.config; +import io.ray.streaming.runtime.config.worker.WorkerInternalConfig; +import java.util.HashMap; import java.util.Map; +import org.aeonbits.owner.ConfigFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -11,7 +14,24 @@ public class StreamingWorkerConfig extends StreamingGlobalConfig { private static final Logger LOG = LoggerFactory.getLogger(StreamingWorkerConfig.class); + public WorkerInternalConfig workerInternalConfig; + public StreamingWorkerConfig(final Map conf) { super(conf); + workerInternalConfig = ConfigFactory.create(WorkerInternalConfig.class, conf); 
+ + configMap.putAll(workerConfig2Map()); } + + public Map workerConfig2Map() { + Map result = new HashMap<>(); + try { + result.putAll(config2Map(this.workerInternalConfig)); + } catch (Exception e) { + LOG.error("Worker config to map occur error.", e); + return null; + } + return result; + } + } diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/global/TransferConfig.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/global/TransferConfig.java new file mode 100644 index 000000000..7508dee2d --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/global/TransferConfig.java @@ -0,0 +1,66 @@ +package io.ray.streaming.runtime.config.global; + +import io.ray.streaming.runtime.config.Config; +import io.ray.streaming.runtime.config.types.TransferChannelType; + +/** + * Job data transfer config. + */ +public interface TransferConfig extends Config { + + /** + * Data transfer channel type, support memory queue and native queue. + */ + @DefaultValue(value = "NATIVE_CHANNEL") + @Key(value = io.ray.streaming.util.Config.CHANNEL_TYPE) + TransferChannelType channelType(); + + /** + * Queue size. + */ + @DefaultValue(value = "100000000") + @Key(value = io.ray.streaming.util.Config.CHANNEL_SIZE) + long channelSize(); + + /** + * DataRead read timeout. + */ + @DefaultValue(value = "false") + @Key(value = io.ray.streaming.util.Config.IS_RECREATE) + boolean readerIsRecreate(); + + /** + * Return from DataReader.getBundle if only empty message read in this interval. + */ + @DefaultValue(value = "-1") + @Key(value = io.ray.streaming.util.Config.TIMER_INTERVAL_MS) + long readerTimerIntervalMs(); + + /** + * Ring capacity. + */ + @DefaultValue(value = "-1") + @Key(value = io.ray.streaming.util.Config.STREAMING_RING_BUFFER_CAPACITY) + int ringBufferCapacity(); + + /** + * Write an empty message if there is no data to be written in this interval. 
+ */ + @DefaultValue(value = "-1") + @Key(value = io.ray.streaming.util.Config.STREAMING_EMPTY_MESSAGE_INTERVAL) + int emptyMsgInterval(); + + // Flow control + + @DefaultValue(value = "-1") + @Key(value = io.ray.streaming.util.Config.FLOW_CONTROL_TYPE) + int flowControlType(); + + @DefaultValue(value = "-1") + @Key(value = io.ray.streaming.util.Config.WRITER_CONSUMED_STEP) + int writerConsumedStep(); + + @DefaultValue(value = "-1") + @Key(value = io.ray.streaming.util.Config.READER_CONSUMED_STEP) + int readerConsumedStep(); +} diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/master/ResourceConfig.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/master/ResourceConfig.java index 6ca336be2..d2a6d0859 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/master/ResourceConfig.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/master/ResourceConfig.java @@ -54,14 +54,14 @@ public interface ResourceConfig extends Config { /** * Whether to enable CPU limit in resource control. */ - @DefaultValue(value = "true") + @DefaultValue(value = "false") @Key(value = TASK_RESOURCE_CPU_LIMIT_ENABLE) boolean isTaskCpuResourceLimit(); /** * Whether to enable memory limit in resource control. 
*/ - @DefaultValue(value = "true") + @DefaultValue(value = "false") @Key(value = TASK_RESOURCE_MEM_LIMIT_ENABLE) boolean isTaskMemResourceLimit(); diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/types/TransferChannelType.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/types/TransferChannelType.java new file mode 100644 index 000000000..f69458a1a --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/types/TransferChannelType.java @@ -0,0 +1,29 @@ +package io.ray.streaming.runtime.config.types; + +/** + * Data transfer channel type. + */ +public enum TransferChannelType { + + /** + * Memory queue. + */ + MEMORY_CHANNEL("memory_channel", 0), + + /** + * Native queue. + */ + NATIVE_CHANNEL("native_channel", 1); + + private String value; + private int index; + + TransferChannelType(String value, int index) { + this.value = value; + this.index = index; + } + + public String getValue() { + return value; + } +} diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/worker/WorkerInternalConfig.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/worker/WorkerInternalConfig.java new file mode 100644 index 000000000..345485cf6 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/worker/WorkerInternalConfig.java @@ -0,0 +1,27 @@ +package io.ray.streaming.runtime.config.worker; + +import io.ray.streaming.runtime.config.Config; +import org.aeonbits.owner.Mutable; + +/** + * This worker config is used by JobMaster to define the internal configuration of JobWorker. 
+ */ +public interface WorkerInternalConfig extends Config, Mutable { + + String WORKER_NAME_INTERNAL = io.ray.streaming.util.Config.STREAMING_WORKER_NAME; + String OP_NAME_INTERNAL = io.ray.streaming.util.Config.STREAMING_OP_NAME; + + /** + * The name of the worker inside the system. + */ + @DefaultValue(value = "default-worker-name") + @Key(value = WORKER_NAME_INTERNAL) + String workerName(); + + /** + * Operator name corresponding to worker. + */ + @DefaultValue(value = "default-worker-op-name") + @Key(value = OP_NAME_INTERNAL) + String workerOperatorName(); +} diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/ExecutionEdge.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/ExecutionEdge.java deleted file mode 100644 index fc5c37c16..000000000 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/ExecutionEdge.java +++ /dev/null @@ -1,47 +0,0 @@ -package io.ray.streaming.runtime.core.graph; - -import io.ray.streaming.api.partition.Partition; -import java.io.Serializable; - -/** - * An edge in the physical execution graph. 
- */ -public class ExecutionEdge implements Serializable { - private int srcNodeId; - private int targetNodeId; - private Partition partition; - - public ExecutionEdge(int srcNodeId, int targetNodeId, Partition partition) { - this.srcNodeId = srcNodeId; - this.targetNodeId = targetNodeId; - this.partition = partition; - } - - public int getSrcNodeId() { - return srcNodeId; - } - - public void setSrcNodeId(int srcNodeId) { - this.srcNodeId = srcNodeId; - } - - public int getTargetNodeId() { - return targetNodeId; - } - - public void setTargetNodeId(int targetNodeId) { - this.targetNodeId = targetNodeId; - } - - public Partition getPartition() { - return partition; - } - - public void setPartition(Partition partition) { - this.partition = partition; - } - - public String getStream() { - return "stream:" + srcNodeId + "-" + targetNodeId; - } -} diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/ExecutionGraph.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/ExecutionGraph.java deleted file mode 100644 index 55b2a101f..000000000 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/ExecutionGraph.java +++ /dev/null @@ -1,99 +0,0 @@ -package io.ray.streaming.runtime.core.graph; - -import io.ray.api.BaseActorHandle; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -/** - * Physical execution graph. - * - *

Notice: Temporary implementation for now to keep functional. This will be changed to - * {@link ExecutionGraph} later when new stream task implementation is ready. - */ -public class ExecutionGraph implements Serializable { - private long buildTime; - private List executionNodeList; - private List sourceWorkers = new ArrayList<>(); - private List sinkWorkers = new ArrayList<>(); - - public ExecutionGraph(List executionNodes) { - this.executionNodeList = executionNodes; - for (ExecutionNode executionNode : executionNodeList) { - if (executionNode.getNodeType() == ExecutionNode.NodeType.SOURCE) { - List actors = executionNode.getExecutionTasks().stream() - .map(ExecutionTask::getWorker).collect(Collectors.toList()); - sourceWorkers.addAll(actors); - } - if (executionNode.getNodeType() == ExecutionNode.NodeType.SINK) { - List actors = executionNode.getExecutionTasks().stream() - .map(ExecutionTask::getWorker).collect(Collectors.toList()); - sinkWorkers.addAll(actors); - } - } - buildTime = System.currentTimeMillis(); - } - - public List getSourceWorkers() { - return sourceWorkers; - } - - public List getSinkWorkers() { - return sinkWorkers; - } - - public List getExecutionNodeList() { - return executionNodeList; - } - - public ExecutionTask getExecutionTaskByTaskId(int taskId) { - for (ExecutionNode executionNode : executionNodeList) { - for (ExecutionTask executionTask : executionNode.getExecutionTasks()) { - if (executionTask.getTaskId() == taskId) { - return executionTask; - } - } - } - throw new RuntimeException("Task " + taskId + " does not exist!"); - } - - public ExecutionNode getExecutionNodeByNodeId(int nodeId) { - for (ExecutionNode executionNode : executionNodeList) { - if (executionNode.getNodeId() == nodeId) { - return executionNode; - } - } - throw new RuntimeException("Node " + nodeId + " does not exist!"); - } - - public ExecutionNode getExecutionNodeByTaskId(int taskId) { - for (ExecutionNode executionNode : executionNodeList) { - for (ExecutionTask 
executionTask : executionNode.getExecutionTasks()) { - if (executionTask.getTaskId() == taskId) { - return executionNode; - } - } - } - throw new RuntimeException("Task " + taskId + " does not exist!"); - } - - public Map getTaskId2WorkerByNodeId(int nodeId) { - for (ExecutionNode executionNode : executionNodeList) { - if (executionNode.getNodeId() == nodeId) { - Map taskId2Worker = new HashMap<>(); - for (ExecutionTask executionTask : executionNode.getExecutionTasks()) { - taskId2Worker.put(executionTask.getTaskId(), executionTask.getWorker()); - } - return taskId2Worker; - } - } - throw new RuntimeException("Node " + nodeId + " does not exist!"); - } - - public long getBuildTime() { - return buildTime; - } -} diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/ExecutionNode.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/ExecutionNode.java deleted file mode 100644 index 409d7d49d..000000000 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/ExecutionNode.java +++ /dev/null @@ -1,126 +0,0 @@ -package io.ray.streaming.runtime.core.graph; - -import io.ray.streaming.api.Language; -import io.ray.streaming.jobgraph.VertexType; -import io.ray.streaming.operator.StreamOperator; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -/** - * A node in the physical execution graph. 
- */ -public class ExecutionNode implements Serializable { - private int nodeId; - private int parallelism; - private Map config; - private NodeType nodeType; - private StreamOperator streamOperator; - private List executionTasks; - private List inputsEdges; - private List outputEdges; - - public ExecutionNode(int nodeId, int parallelism, Map config) { - this.nodeId = nodeId; - this.parallelism = parallelism; - this.config = config; - this.executionTasks = new ArrayList<>(); - this.inputsEdges = new ArrayList<>(); - this.outputEdges = new ArrayList<>(); - } - - public int getNodeId() { - return nodeId; - } - - public void setNodeId(int nodeId) { - this.nodeId = nodeId; - } - - public int getParallelism() { - return parallelism; - } - - public void setParallelism(int parallelism) { - this.parallelism = parallelism; - } - - public Map getConfig() { - return config; - } - - public List getExecutionTasks() { - return executionTasks; - } - - public void setExecutionTasks(List executionTasks) { - this.executionTasks = executionTasks; - } - - public List getOutputEdges() { - return outputEdges; - } - - public void setOutputEdges(List outputEdges) { - this.outputEdges = outputEdges; - } - - public void addOutputEdge(ExecutionEdge executionEdge) { - this.outputEdges.add(executionEdge); - } - - public void addInputEdge(ExecutionEdge executionEdge) { - this.inputsEdges.add(executionEdge); - } - - public List getInputsEdges() { - return inputsEdges; - } - - public StreamOperator getStreamOperator() { - return streamOperator; - } - - public void setStreamOperator(StreamOperator streamOperator) { - this.streamOperator = streamOperator; - } - - public Language getLanguage() { - return streamOperator.getLanguage(); - } - - public NodeType getNodeType() { - return nodeType; - } - - public void setNodeType(VertexType vertexType) { - switch (vertexType) { - case SOURCE: - this.nodeType = NodeType.SOURCE; - break; - case SINK: - this.nodeType = NodeType.SINK; - break; - default: - 
this.nodeType = NodeType.TRANSFORM; - } - } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder("ExecutionNode{"); - sb.append("nodeId=").append(nodeId); - sb.append(", parallelism=").append(parallelism); - sb.append(", nodeType=").append(nodeType); - sb.append(", streamOperator=").append(streamOperator); - sb.append('}'); - return sb.toString(); - } - - public enum NodeType { - SOURCE, - TRANSFORM, - SINK, - } -} diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/ExecutionTask.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/ExecutionTask.java deleted file mode 100644 index 36502fca8..000000000 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/ExecutionTask.java +++ /dev/null @@ -1,45 +0,0 @@ -package io.ray.streaming.runtime.core.graph; - -import io.ray.api.BaseActorHandle; -import java.io.Serializable; - -/** - * ExecutionTask is minimal execution unit. - *

- * An ExecutionNode has n ExecutionTasks if parallelism is n. - */ -public class ExecutionTask implements Serializable { - private int taskId; - private int taskIndex; - private BaseActorHandle worker; - - public ExecutionTask(int taskId, int taskIndex, BaseActorHandle worker) { - this.taskId = taskId; - this.taskIndex = taskIndex; - this.worker = worker; - } - - public int getTaskId() { - return taskId; - } - - public void setTaskId(int taskId) { - this.taskId = taskId; - } - - public int getTaskIndex() { - return taskIndex; - } - - public void setTaskIndex(int taskIndex) { - this.taskIndex = taskIndex; - } - - public BaseActorHandle getWorker() { - return worker; - } - - public void setWorker(BaseActorHandle worker) { - this.worker = worker; - } -} diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionEdge.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionEdge.java index 3779788b3..a98c77a16 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionEdge.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionEdge.java @@ -12,12 +12,12 @@ public class ExecutionEdge implements Serializable { /** * The source(upstream) execution vertex. */ - private final ExecutionVertex sourceVertex; + private final ExecutionVertex sourceExecutionVertex; /** * The target(downstream) execution vertex. */ - private final ExecutionVertex targetVertex; + private final ExecutionVertex targetExecutionVertex; /** * The partition of current execution edge's execution job edge. 
@@ -29,32 +29,35 @@ public class ExecutionEdge implements Serializable { */ private final String executionEdgeIndex; - public ExecutionEdge(ExecutionVertex sourceVertex, ExecutionVertex targetVertex, + public ExecutionEdge( + ExecutionVertex sourceExecutionVertex, + ExecutionVertex targetExecutionVertex, ExecutionJobEdge executionJobEdge) { - this.sourceVertex = sourceVertex; - this.targetVertex = targetVertex; + this.sourceExecutionVertex = sourceExecutionVertex; + this.targetExecutionVertex = targetExecutionVertex; this.partition = executionJobEdge.getPartition(); this.executionEdgeIndex = generateExecutionEdgeIndex(); } private String generateExecutionEdgeIndex() { - return sourceVertex.getId() + "—" + targetVertex.getId(); + return sourceExecutionVertex.getExecutionVertexId() + "—" + + targetExecutionVertex.getExecutionVertexId(); } - public ExecutionVertex getSourceVertex() { - return sourceVertex; + public ExecutionVertex getSourceExecutionVertex() { + return sourceExecutionVertex; } - public ExecutionVertex getTargetVertex() { - return targetVertex; + public ExecutionVertex getTargetExecutionVertex() { + return targetExecutionVertex; } public int getSourceVertexId() { - return sourceVertex.getId(); + return sourceExecutionVertex.getExecutionVertexId(); } public int getTargetVertexId() { - return targetVertex.getId(); + return targetExecutionVertex.getExecutionVertexId(); } public Partition getPartition() { @@ -68,10 +71,10 @@ public class ExecutionEdge implements Serializable { @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("sourceVertex", sourceVertex) - .add("targetVertex", targetVertex) + .add("source", sourceExecutionVertex) + .add("target", targetExecutionVertex) .add("partition", partition) - .add("executionEdgeIndex", executionEdgeIndex) + .add("index", executionEdgeIndex) .toString(); } } diff --git 
a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionGraph.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionGraph.java index cebc4dad4..6a0dc9b58 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionGraph.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionGraph.java @@ -1,7 +1,6 @@ package io.ray.streaming.runtime.core.graph.executiongraph; -import io.ray.api.ActorHandle; -import io.ray.streaming.runtime.worker.JobWorker; +import io.ray.api.BaseActorHandle; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -130,7 +129,7 @@ public class ExecutionGraph implements Serializable { public ExecutionVertex getExecutionJobVertexByJobVertexId(int vertexId) { for (ExecutionJobVertex executionJobVertex : executionJobVertexMap.values()) { for (ExecutionVertex executionVertex : executionJobVertex.getExecutionVertices()) { - if (executionVertex.getId() == vertexId) { + if (executionVertex.getExecutionVertexId() == vertexId) { return executionVertex; } } @@ -143,7 +142,7 @@ public class ExecutionGraph implements Serializable { * * @return actor list */ - public List> getAllActors() { + public List getAllActors() { return getActorsFromJobVertices(getExecutionJobVertexList()); } @@ -152,7 +151,7 @@ public class ExecutionGraph implements Serializable { * * @return actor list */ - public List> getSourceActors() { + public List getSourceActors() { List executionJobVertices = getExecutionJobVertexList().stream() .filter(ExecutionJobVertex::isSourceVertex) .collect(Collectors.toList()); @@ -165,7 +164,7 @@ public class ExecutionGraph implements Serializable { * * @return actor list */ - public List> getNonSourceActors() { + public List getNonSourceActors() { List executionJobVertices = 
getExecutionJobVertexList().stream() .filter(executionJobVertex -> executionJobVertex.isTransformationVertex() || executionJobVertex.isSinkVertex()) @@ -179,7 +178,7 @@ public class ExecutionGraph implements Serializable { * * @return actor list */ - public List> getSinkActors() { + public List getSinkActors() { List executionJobVertices = getExecutionJobVertexList().stream() .filter(ExecutionJobVertex::isSinkVertex) .collect(Collectors.toList()); @@ -193,7 +192,7 @@ public class ExecutionGraph implements Serializable { * @param executionJobVertices specified job vertices * @return actor list */ - public List> getActorsFromJobVertices( + public List getActorsFromJobVertices( List executionJobVertices) { return executionJobVertices.stream() .map(ExecutionJobVertex::getExecutionVertices) diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobEdge.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobEdge.java index b357ccb2e..6ab2fd911 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobEdge.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobEdge.java @@ -12,12 +12,12 @@ public class ExecutionJobEdge { /** * The source(upstream) execution job vertex. */ - private final ExecutionJobVertex sourceVertex; + private final ExecutionJobVertex sourceExecutionJobVertex; /** * The target(downstream) execution job vertex. */ - private final ExecutionJobVertex targetVertex; + private final ExecutionJobVertex targetExecutionJobVertex; /** * The partition of the execution job edge. 
@@ -29,24 +29,27 @@ public class ExecutionJobEdge { */ private final String executionJobEdgeIndex; - public ExecutionJobEdge(ExecutionJobVertex sourceVertex, ExecutionJobVertex targetVertex, + public ExecutionJobEdge( + ExecutionJobVertex sourceExecutionJobVertex, + ExecutionJobVertex targetExecutionJobVertex, JobEdge jobEdge) { - this.sourceVertex = sourceVertex; - this.targetVertex = targetVertex; + this.sourceExecutionJobVertex = sourceExecutionJobVertex; + this.targetExecutionJobVertex = targetExecutionJobVertex; this.partition = jobEdge.getPartition(); this.executionJobEdgeIndex = generateExecutionJobEdgeIndex(); } private String generateExecutionJobEdgeIndex() { - return sourceVertex.getJobVertexId() + "—" + targetVertex.getJobVertexId(); + return sourceExecutionJobVertex.getExecutionJobVertexId() + "—" + + targetExecutionJobVertex.getExecutionJobVertexId(); } - public ExecutionJobVertex getSourceVertex() { - return sourceVertex; + public ExecutionJobVertex getSourceExecutionJobVertex() { + return sourceExecutionJobVertex; } - public ExecutionJobVertex getTargetVertex() { - return targetVertex; + public ExecutionJobVertex getTargetExecutionJobVertex() { + return targetExecutionJobVertex; } public Partition getPartition() { @@ -56,10 +59,10 @@ public class ExecutionJobEdge { @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("srcVertex", sourceVertex) - .add("targetVertex", targetVertex) + .add("source", sourceExecutionJobVertex) + .add("target", targetExecutionJobVertex) .add("partition", partition) - .add("executionJobEdgeIndex", executionJobEdgeIndex) + .add("index", executionJobEdgeIndex) .toString(); } } diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobVertex.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobVertex.java index 60e16434f..380485cd3 100644 --- 
a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobVertex.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobVertex.java @@ -2,13 +2,12 @@ package io.ray.streaming.runtime.core.graph.executiongraph; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; -import io.ray.api.ActorHandle; +import io.ray.api.BaseActorHandle; import io.ray.streaming.api.Language; import io.ray.streaming.jobgraph.JobVertex; import io.ray.streaming.jobgraph.VertexType; import io.ray.streaming.operator.StreamOperator; import io.ray.streaming.runtime.config.master.ResourceConfig; -import io.ray.streaming.runtime.worker.JobWorker; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -25,18 +24,20 @@ import org.aeonbits.owner.ConfigFactory; public class ExecutionJobVertex { /** - * Unique id of operator(use {@link JobVertex}'s id). Used as jobVertex's id. + * Unique id. Use {@link JobVertex}'s id directly. */ - private final int jobVertexId; + private final int executionJobVertexId; /** - * Unique name of operator(use {@link StreamOperator}'s name). Used as jobVertex's name. + * Use jobVertex id and operator(use {@link StreamOperator}'s name) as name. + * e.g. 1-SourceOperator */ - private final String jobVertexName; + private final String executionJobVertexName; private final StreamOperator streamOperator; private final VertexType vertexType; private final Language language; private final Map jobConfig; + private final long buildTime; /** * Parallelism of current execution job vertex(operator). 
@@ -55,18 +56,23 @@ public class ExecutionJobVertex { private List outputEdges = new ArrayList<>(); public ExecutionJobVertex( - JobVertex jobVertex, Map jobConfig, AtomicInteger idGenerator) { - this.jobVertexId = jobVertex.getVertexId(); - this.jobVertexName = jobVertex.getStreamOperator().getName(); + JobVertex jobVertex, + Map jobConfig, + AtomicInteger idGenerator, + long buildTime) { + this.executionJobVertexId = jobVertex.getVertexId(); + this.executionJobVertexName = generateExecutionJobVertexName( + executionJobVertexId, jobVertex.getStreamOperator().getName()); this.streamOperator = jobVertex.getStreamOperator(); this.vertexType = jobVertex.getVertexType(); this.language = jobVertex.getLanguage(); this.jobConfig = jobConfig; + this.buildTime = buildTime; this.parallelism = jobVertex.getParallelism(); - this.executionVertices = createExecutionVertics(idGenerator); + this.executionVertices = createExecutionVertices(idGenerator); } - private List createExecutionVertics(AtomicInteger idGenerator) { + private List createExecutionVertices(AtomicInteger idGenerator) { List executionVertices = new ArrayList<>(); ResourceConfig resourceConfig = ConfigFactory.create(ResourceConfig.class, jobConfig); @@ -77,8 +83,12 @@ public class ExecutionJobVertex { return executionVertices; } - public Map> getExecutionVertexWorkers() { - Map> executionVertexWorkersMap = new HashMap<>(); + private String generateExecutionJobVertexName(int jobVertexId, String streamOperatorName) { + return jobVertexId + "-" + streamOperatorName; + } + + public Map getExecutionVertexWorkers() { + Map executionVertexWorkersMap = new HashMap<>(); Preconditions.checkArgument( executionVertices != null && !executionVertices.isEmpty(), @@ -87,18 +97,18 @@ public class ExecutionJobVertex { Preconditions.checkArgument( vertex.getWorkerActor() != null, "Empty execution vertex worker actor."); - executionVertexWorkersMap.put(vertex.getId(), vertex.getWorkerActor()); + 
executionVertexWorkersMap.put(vertex.getExecutionVertexId(), vertex.getWorkerActor()); }); return executionVertexWorkersMap; } - public int getJobVertexId() { - return jobVertexId; + public int getExecutionJobVertexId() { + return executionJobVertexId; } - public String getJobVertexName() { - return jobVertexName; + public String getExecutionJobVertexName() { + return executionJobVertexName; } /** @@ -106,8 +116,8 @@ public class ExecutionJobVertex { * * @return operator name with index */ - public String getVertexNameWithIndex() { - return jobVertexId + "-" + jobVertexName; + public String getExecutionJobVertexNameWithIndex() { + return executionJobVertexId + "-" + executionJobVertexName; } public int getParallelism() { @@ -153,6 +163,14 @@ public class ExecutionJobVertex { return language; } + public Map getJobConfig() { + return jobConfig; + } + + public long getBuildTime() { + return buildTime; + } + public boolean isSourceVertex() { return getVertexType() == VertexType.SOURCE; } @@ -168,8 +186,8 @@ public class ExecutionJobVertex { @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("jobVertexId", jobVertexId) - .add("jobVertexName", jobVertexName) + .add("executionJobVertexId", executionJobVertexId) + .add("executionJobVertexName", executionJobVertexName) .add("vertexType", vertexType) .add("parallelism", parallelism) .toString(); diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionVertex.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionVertex.java index ed11dfeaf..a2cc313a6 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionVertex.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionVertex.java @@ -1,7 +1,7 @@ package 
io.ray.streaming.runtime.core.graph.executiongraph; import com.google.common.base.MoreObjects; -import io.ray.api.ActorHandle; +import io.ray.api.BaseActorHandle; import io.ray.api.id.ActorId; import io.ray.streaming.api.Language; import io.ray.streaming.jobgraph.VertexType; @@ -9,13 +9,13 @@ import io.ray.streaming.operator.StreamOperator; import io.ray.streaming.runtime.config.master.ResourceConfig; import io.ray.streaming.runtime.core.resource.ContainerID; import io.ray.streaming.runtime.core.resource.ResourceType; -import io.ray.streaming.runtime.worker.JobWorker; import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; /** * Physical vertex, correspond to {@link ExecutionJobVertex}. @@ -25,31 +25,51 @@ public class ExecutionVertex implements Serializable { /** * Unique id for execution vertex. */ - private final int id; + private final int executionVertexId; /** * Immutable field inherited from {@link ExecutionJobVertex}. */ - private final int jobVertexId; - private final String jobVertexName; + private final int executionJobVertexId; + private final String executionJobVertexName; private final StreamOperator streamOperator; private final VertexType vertexType; private final Language language; + private final long buildTime; /** - * Resources used by ExecutionVertex. + * Resource used by ExecutionVertex. */ - private final Map resources; + private final Map resource; + + /** + * Parallelism of current vertex's operator. + */ + private int parallelism; /** * Ordered sub index for execution vertex in a execution job vertex. * Might be changed in dynamic scheduling. */ - private int vertexIndex; + private int executionVertexIndex; private ExecutionVertexState state = ExecutionVertexState.TO_ADD; + + /** + * The id of the container which this vertex's worker actor belongs to. 
+ */ private ContainerID containerId; - private ActorHandle workerActor; + + /** + * Worker actor handle. + */ + private BaseActorHandle workerActor; + + /** + * Op config + job config. + */ + private Map workerConfig; + private List inputEdges = new ArrayList<>(); private List outputEdges = new ArrayList<>(); @@ -58,26 +78,43 @@ public class ExecutionVertex implements Serializable { int index, ExecutionJobVertex executionJobVertex, ResourceConfig resourceConfig) { - this.id = globalIndex; - this.jobVertexId = executionJobVertex.getJobVertexId(); - this.jobVertexName = executionJobVertex.getJobVertexName(); + this.executionVertexId = globalIndex; + this.executionJobVertexId = executionJobVertex.getExecutionJobVertexId(); + this.executionJobVertexName = executionJobVertex.getExecutionJobVertexName(); this.streamOperator = executionJobVertex.getStreamOperator(); this.vertexType = executionJobVertex.getVertexType(); this.language = executionJobVertex.getLanguage(); - this.vertexIndex = index; - this.resources = generateResources(resourceConfig); + this.buildTime = executionJobVertex.getBuildTime(); + this.parallelism = executionJobVertex.getParallelism(); + this.executionVertexIndex = index; + this.resource = generateResources(resourceConfig); + this.workerConfig = genWorkerConfig(executionJobVertex.getJobConfig()); } - public int getId() { - return id; + private Map genWorkerConfig(Map jobConfig) { + Map workerConfig = new HashMap<>(); + workerConfig.putAll(jobConfig); + return workerConfig; } - public int getJobVertexId() { - return jobVertexId; + public int getExecutionVertexId() { + return executionVertexId; } - public String getJobVertexName() { - return jobVertexName; + /** + * Unique name generated by execution job vertex name and index of current execution vertex. + * e.g. 
1-SourceOperator-3 (vertex index is 3) + */ + public String getExecutionVertexName() { + return executionJobVertexName + "-" + executionVertexIndex; + } + + public int getExecutionJobVertexId() { + return executionJobVertexId; + } + + public String getExecutionJobVertexName() { + return executionJobVertexName; } public StreamOperator getStreamOperator() { @@ -92,16 +129,12 @@ public class ExecutionVertex implements Serializable { return language; } - public int getVertexIndex() { - return vertexIndex; + public int getParallelism() { + return parallelism; } - /** - * Unique name generated by vertex name and index for execution vertex. - * e.g. 1-SourceOperator-3 (vertex index is 3) - */ - public String getVertexName() { - return jobVertexId + "-" + jobVertexName + "-" + vertexIndex; + public int getExecutionVertexIndex() { + return executionVertexIndex; } public ExecutionVertexState getState() { @@ -124,7 +157,7 @@ public class ExecutionVertex implements Serializable { return state == ExecutionVertexState.TO_DEL; } - public ActorHandle getWorkerActor() { + public BaseActorHandle getWorkerActor() { return workerActor; } @@ -132,7 +165,7 @@ public class ExecutionVertex implements Serializable { return workerActor.getId(); } - public void setWorkerActor(ActorHandle workerActor) { + public void setWorkerActor(BaseActorHandle workerActor) { this.workerActor = workerActor; } @@ -154,8 +187,28 @@ public class ExecutionVertex implements Serializable { this.outputEdges = outputEdges; } - public Map getResources() { - return resources; + public List getInputVertices() { + return inputEdges.stream() + .map(ExecutionEdge::getSourceExecutionVertex) + .collect(Collectors.toList()); + } + + public List getOutputVertices() { + return outputEdges.stream() + .map(ExecutionEdge::getTargetExecutionVertex) + .collect(Collectors.toList()); + } + + public Map getResource() { + return resource; + } + + public Map getWorkerConfig() { + return workerConfig; + } + + public long getBuildTime() 
{ + return buildTime; } public ContainerID getContainerId() { @@ -186,22 +239,22 @@ public class ExecutionVertex implements Serializable { @Override public boolean equals(Object obj) { if (obj instanceof ExecutionVertex) { - return this.id == ((ExecutionVertex)obj).getId(); + return this.executionVertexId == ((ExecutionVertex)obj).getExecutionVertexId(); } return false; } @Override public int hashCode() { - return Objects.hash(id, outputEdges); + return Objects.hash(executionVertexId, outputEdges); } @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("id", id) - .add("name", getVertexName()) - .add("resources", resources) + .add("id", executionVertexId) + .add("name", getExecutionVertexName()) + .add("resources", resource) .add("state", state) .add("containerId", containerId) .add("workerActor", workerActor) diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/resource/Container.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/resource/Container.java index a1641cc9b..d3ba22f70 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/resource/Container.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/resource/Container.java @@ -138,21 +138,21 @@ public class Container implements Serializable { public void allocateActor(ExecutionVertex vertex) { LOG.info("Allocating vertex [{}] in container [{}].", vertex, this); - executionVertexIds.add(vertex.getId()); + executionVertexIds.add(vertex.getExecutionVertexId()); vertex.setContainerIfNotExist(this.getId()); // Binding dynamic resource - vertex.getResources().put(getName(), 1.0); - decreaseResource(vertex.getResources()); + vertex.getResource().put(getName(), 1.0); + decreaseResource(vertex.getResource()); } public void releaseActor(ExecutionVertex vertex) { LOG.info("Release actor, vertex: {}, container: {}.", vertex, vertex.getContainerId()); - 
if (executionVertexIds.contains(vertex.getId())) { - executionVertexIds.removeIf(id -> id == vertex.getId()); - reclaimResource(vertex.getResources()); + if (executionVertexIds.contains(vertex.getExecutionVertexId())) { + executionVertexIds.removeIf(id -> id == vertex.getExecutionVertexId()); + reclaimResource(vertex.getResource()); } else { throw new RuntimeException(String.format("Current container [%s] not found vertex [%s].", - this, vertex.getJobVertexName())); + this, vertex.getExecutionJobVertexName())); } } diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/JobMaster.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/JobMaster.java index 8c45b3966..60d3a0843 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/JobMaster.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/JobMaster.java @@ -30,7 +30,7 @@ public class JobMaster { private GraphManager graphManager; private StreamingMasterConfig conf; - private ActorHandle jobMasterActor; + private ActorHandle jobMasterActor; public JobMaster(Map confMap) { LOG.info("Creating job master with conf: {}.", confMap); @@ -101,7 +101,7 @@ public class JobMaster { return true; } - public ActorHandle getJobMasterActor() { + public ActorHandle getJobMasterActor() { return jobMasterActor; } diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/graphmanager/GraphManagerImpl.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/graphmanager/GraphManagerImpl.java index f44c00402..287d526c8 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/graphmanager/GraphManagerImpl.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/graphmanager/GraphManagerImpl.java @@ -53,7 +53,10 @@ public class GraphManagerImpl implements GraphManager { 
int jobVertexId = jobVertex.getVertexId(); exeJobVertexMap.put(jobVertexId, new ExecutionJobVertex( - jobVertex, jobConfig, executionGraph.getExecutionVertexIdGenerator())); + jobVertex, + jobConfig, + executionGraph.getExecutionVertexIdGenerator(), + buildTime)); } // connect vertex diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/resourcemanager/strategy/impl/PipelineFirstStrategy.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/resourcemanager/strategy/impl/PipelineFirstStrategy.java index 4fd93e5c7..1539922fb 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/resourcemanager/strategy/impl/PipelineFirstStrategy.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/resourcemanager/strategy/impl/PipelineFirstStrategy.java @@ -84,7 +84,7 @@ public class PipelineFirstStrategy implements ResourceAssignStrategy { continue; } ExecutionVertex executionVertex = exeVertices.get(i); - Map requiredResource = executionVertex.getResources(); + Map requiredResource = executionVertex.getResource(); if (requiredResource.containsKey(ResourceType.CPU.getValue())) { LOG.info("Required resource contain {} value : {}, no limitation by default.", ResourceType.CPU, requiredResource.get(ResourceType.CPU.getValue())); diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java index 85178c3ce..6b9b3a690 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java @@ -153,29 +153,28 @@ public class JobSchedulerImpl implements JobScheduler { */ protected Map buildWorkersContext( 
ExecutionGraph executionGraph) { - ActorHandle masterActor = jobMaster.getJobMasterActor(); + ActorHandle masterActor = jobMaster.getJobMasterActor(); // build workers' context - Map needRegistryVertexToContextMap = new HashMap<>(); + Map vertexToContextMap = new HashMap<>(); executionGraph.getAllExecutionVertices().forEach(vertex -> { - JobWorkerContext ctx = buildJobWorkerContext(vertex, masterActor); - needRegistryVertexToContextMap.put(vertex, ctx); + JobWorkerContext context = buildJobWorkerContext(vertex, masterActor); + vertexToContextMap.put(vertex, context); }); - return needRegistryVertexToContextMap; + return vertexToContextMap; } private JobWorkerContext buildJobWorkerContext( ExecutionVertex executionVertex, ActorHandle masterActor) { - // create worker context - JobWorkerContext ctx = new JobWorkerContext( - executionVertex.getWorkerActorId(), + // create java worker context + JobWorkerContext context = new JobWorkerContext( masterActor, executionVertex ); - return ctx; + return context; } /** diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/controller/WorkerLifecycleController.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/controller/WorkerLifecycleController.java index c503b3d0a..fe7963d98 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/controller/WorkerLifecycleController.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/controller/WorkerLifecycleController.java @@ -1,9 +1,10 @@ package io.ray.streaming.runtime.master.scheduler.controller; -import io.ray.api.ActorHandle; +import io.ray.api.BaseActorHandle; import io.ray.api.ObjectRef; import io.ray.api.Ray; import io.ray.api.WaitResult; +import io.ray.api.function.PyActorClass; import io.ray.api.id.ActorId; import io.ray.api.options.ActorCreationOptions; import io.ray.streaming.api.Language; @@ 
-41,17 +42,22 @@ public class WorkerLifecycleController { */ private boolean createWorker(ExecutionVertex executionVertex) { LOG.info("Start to create worker actor for vertex: {} with resource: {}.", - executionVertex.getVertexName(), executionVertex.getResources()); + executionVertex.getExecutionVertexName(), executionVertex.getResource()); Language language = executionVertex.getLanguage(); ActorCreationOptions options = new ActorCreationOptions.Builder() - .setResources(executionVertex.getResources()) + .setResources(executionVertex.getResource()) .setMaxRestarts(-1) .createActorCreationOptions(); - ActorHandle actor = null; - // TODO (datayjz): ray create actor + BaseActorHandle actor; + if (Language.JAVA == language) { + actor = Ray.createActor(JobWorker::new, options); + } else { + actor = Ray.createActor( + new PyActorClass("ray.streaming.runtime.worker", "JobWorker")); + } if (null == actor) { LOG.error("Create worker actor failed."); @@ -61,7 +67,7 @@ public class WorkerLifecycleController { executionVertex.setWorkerActor(actor); LOG.info("Worker actor created, actor: {}, vertex: {}.", - executionVertex.getWorkerActorId(), executionVertex.getVertexName()); + executionVertex.getWorkerActorId(), executionVertex.getExecutionVertexName()); return true; } @@ -139,15 +145,15 @@ public class WorkerLifecycleController { } private boolean destroyWorker(ExecutionVertex executionVertex) { - ActorHandle rayActor = executionVertex.getWorkerActor(); + BaseActorHandle rayActor = executionVertex.getWorkerActor(); LOG.info("Begin destroying worker[vertex={}, actor={}].", - executionVertex.getVertexName(), rayActor.getId()); + executionVertex.getExecutionVertexName(), rayActor.getId()); boolean destroyResult = RemoteCallWorker.shutdownWithoutReconstruction(rayActor); if (!destroyResult) { LOG.error("Failed to destroy JobWorker[{}]'s actor: {}.", - executionVertex.getVertexName(), rayActor); + executionVertex.getExecutionVertexName(), rayActor); return false; } diff --git 
a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/python/GraphPbBuilder.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/python/GraphPbBuilder.java index 34dc68a9b..b66e1a125 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/python/GraphPbBuilder.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/python/GraphPbBuilder.java @@ -8,70 +8,98 @@ import io.ray.streaming.operator.Operator; import io.ray.streaming.python.PythonFunction; import io.ray.streaming.python.PythonOperator; import io.ray.streaming.python.PythonPartition; -import io.ray.streaming.runtime.core.graph.ExecutionEdge; -import io.ray.streaming.runtime.core.graph.ExecutionGraph; -import io.ray.streaming.runtime.core.graph.ExecutionNode; -import io.ray.streaming.runtime.core.graph.ExecutionTask; +import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionEdge; +import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex; import io.ray.streaming.runtime.generated.RemoteCall; import io.ray.streaming.runtime.generated.Streaming; import io.ray.streaming.runtime.serialization.MsgPackSerializer; import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; public class GraphPbBuilder { private MsgPackSerializer serializer = new MsgPackSerializer(); - /** - * For simple scenario, a single ExecutionNode is enough. But some cases may need - * sub-graph information, so we serialize entire graph. 
- */ - public RemoteCall.ExecutionGraph buildExecutionGraphPb(ExecutionGraph graph) { - RemoteCall.ExecutionGraph.Builder builder = RemoteCall.ExecutionGraph.newBuilder(); - builder.setBuildTime(graph.getBuildTime()); - for (ExecutionNode node : graph.getExecutionNodeList()) { - RemoteCall.ExecutionGraph.ExecutionNode.Builder nodeBuilder = - RemoteCall.ExecutionGraph.ExecutionNode.newBuilder(); - nodeBuilder.setNodeId(node.getNodeId()); - nodeBuilder.setParallelism(node.getParallelism()); - nodeBuilder.setNodeType( - Streaming.NodeType.valueOf(node.getNodeType().name())); - nodeBuilder.setLanguage(Streaming.Language.valueOf(node.getLanguage().name())); - byte[] operatorBytes = serializeOperator(node.getStreamOperator()); - nodeBuilder.setOperator(ByteString.copyFrom(operatorBytes)); + public RemoteCall.ExecutionVertexContext buildExecutionVertexContext( + ExecutionVertex executionVertex) { + RemoteCall.ExecutionVertexContext.Builder builder = + RemoteCall.ExecutionVertexContext.newBuilder(); - // build tasks - for (ExecutionTask task : node.getExecutionTasks()) { - RemoteCall.ExecutionGraph.ExecutionTask.Builder taskBuilder = - RemoteCall.ExecutionGraph.ExecutionTask.newBuilder(); - byte[] serializedActorHandle = ((NativeActorHandle) task.getWorker()).toBytes(); - taskBuilder - .setTaskId(task.getTaskId()) - .setTaskIndex(task.getTaskIndex()) - .setWorkerActor(ByteString.copyFrom(serializedActorHandle)); - nodeBuilder.addExecutionTasks(taskBuilder.build()); - } + // build vertex + builder.setCurrentExecutionVertex(buildVertex(executionVertex)); - // build edges - for (ExecutionEdge edge : node.getInputsEdges()) { - nodeBuilder.addInputEdges(buildEdgePb(edge)); - } - for (ExecutionEdge edge : node.getOutputEdges()) { - nodeBuilder.addOutputEdges(buildEdgePb(edge)); - } + // build upstream vertices + List upstreamVertices = executionVertex.getInputVertices(); + List upstreamVertexPbs = + upstreamVertices.stream() + .map(this::buildVertex) + 
.collect(Collectors.toList()); + builder.addAllUpstreamExecutionVertices(upstreamVertexPbs); - builder.addExecutionNodes(nodeBuilder.build()); - } + // build downstream vertices + List downstreamVertices = executionVertex.getOutputVertices(); + List downstreamVertexPbs = + downstreamVertices.stream() + .map(this::buildVertex) + .collect(Collectors.toList()); + builder.addAllDownstreamExecutionVertices(downstreamVertexPbs); + + // build input edges + List inputEdges = executionVertex.getInputEdges(); + List inputEdgesPbs = + inputEdges.stream() + .map(this::buildEdge) + .collect(Collectors.toList()); + builder.addAllInputExecutionEdges(inputEdgesPbs); + + // build output edges + List outputEdges = executionVertex.getOutputEdges(); + List outputEdgesPbs = + outputEdges.stream() + .map(this::buildEdge) + .collect(Collectors.toList()); + builder.addAllOutputExecutionEdges(outputEdgesPbs); return builder.build(); } - private RemoteCall.ExecutionGraph.ExecutionEdge buildEdgePb(ExecutionEdge edge) { - RemoteCall.ExecutionGraph.ExecutionEdge.Builder edgeBuilder = - RemoteCall.ExecutionGraph.ExecutionEdge.newBuilder(); - edgeBuilder.setSrcNodeId(edge.getSrcNodeId()); - edgeBuilder.setTargetNodeId(edge.getTargetNodeId()); - edgeBuilder.setPartition(ByteString.copyFrom(serializePartition(edge.getPartition()))); - return edgeBuilder.build(); + private RemoteCall.ExecutionVertexContext.ExecutionVertex buildVertex( + ExecutionVertex executionVertex) { + // build vertex infos + RemoteCall.ExecutionVertexContext.ExecutionVertex.Builder executionVertexBuilder = + RemoteCall.ExecutionVertexContext.ExecutionVertex.newBuilder(); + executionVertexBuilder.setExecutionVertexId(executionVertex.getExecutionVertexId()); + executionVertexBuilder.setExecutionJobVertexId(executionVertex.getExecutionJobVertexId()); + executionVertexBuilder.setExecutionJobVertexName(executionVertex.getExecutionJobVertexName()); + 
executionVertexBuilder.setExecutionVertexIndex(executionVertex.getExecutionVertexIndex()); + executionVertexBuilder.setParallelism(executionVertex.getParallelism()); + executionVertexBuilder.setOperator( + ByteString.copyFrom( + serializeOperator(executionVertex.getStreamOperator()))); + executionVertexBuilder.setWorkerActor( + ByteString.copyFrom( + ((NativeActorHandle) (executionVertex.getWorkerActor())).toBytes())); + executionVertexBuilder.setContainerId(executionVertex.getContainerId().toString()); + executionVertexBuilder.setBuildTime(executionVertex.getBuildTime()); + executionVertexBuilder.setLanguage( + Streaming.Language.valueOf(executionVertex.getLanguage().name())); + executionVertexBuilder.putAllConfig(executionVertex.getWorkerConfig()); + executionVertexBuilder.putAllResource(executionVertex.getResource()); + + return executionVertexBuilder.build(); + } + + private RemoteCall.ExecutionVertexContext.ExecutionEdge buildEdge(ExecutionEdge executionEdge) { + // build edge infos + RemoteCall.ExecutionVertexContext.ExecutionEdge.Builder executionEdgeBuilder = + RemoteCall.ExecutionVertexContext.ExecutionEdge.newBuilder(); + executionEdgeBuilder.setSourceExecutionVertexId(executionEdge.getSourceVertexId()); + executionEdgeBuilder.setTargetExecutionVertexId(executionEdge.getTargetVertexId()); + executionEdgeBuilder.setPartition( + ByteString.copyFrom(serializePartition(executionEdge.getPartition()))); + + return executionEdgeBuilder.build(); } private byte[] serializeOperator(Operator operator) { diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/rpc/RemoteCallWorker.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/rpc/RemoteCallWorker.java index 683a2c0d2..33e6771c0 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/rpc/RemoteCallWorker.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/rpc/RemoteCallWorker.java @@ -1,7 +1,10 @@ 
package io.ray.streaming.runtime.rpc; import io.ray.api.ActorHandle; +import io.ray.api.BaseActorHandle; import io.ray.api.ObjectRef; +import io.ray.api.PyActorHandle; +import io.ray.api.function.PyActorMethod; import io.ray.streaming.runtime.master.JobMaster; import io.ray.streaming.runtime.worker.JobWorker; import io.ray.streaming.runtime.worker.context.JobWorkerContext; @@ -20,16 +23,23 @@ public class RemoteCallWorker { * Call JobWorker actor to init. * * @param actor target JobWorker actor - * @param ctx JobWorker's context + * @param context JobWorker's context * @return init result */ - public static ObjectRef initWorker(ActorHandle actor, JobWorkerContext ctx) { - LOG.info("Call worker to init, actor: {}, context: {}.", actor.getId(), ctx); - ObjectRef result = null; + public static ObjectRef initWorker(BaseActorHandle actor, JobWorkerContext context) { + LOG.info("Call worker to initiate, actor: {}, context: {}.", actor.getId(), context); + ObjectRef result; - // TODO (datayjz): ray call worker to initiate + // python + if (actor instanceof PyActorHandle) { + result = ((PyActorHandle) actor).call( + new PyActorMethod("init", Object.class), context.getPythonWorkerContextBytes()); + } else { + // java + result = ((ActorHandle) actor).call(JobWorker::init, context); + } - LOG.info("Finished calling worker to init."); + LOG.info("Finished calling worker to initiate."); return result; } @@ -39,11 +49,17 @@ public class RemoteCallWorker { * @param actor target JobWorker actor * @return start result */ - public static ObjectRef startWorker(ActorHandle actor) { + public static ObjectRef startWorker(BaseActorHandle actor) { LOG.info("Call worker to start, actor: {}.", actor.getId()); ObjectRef result = null; - // TODO (datayjz): ray call worker to start + // python + if (actor instanceof PyActorHandle) { + result = ((PyActorHandle) actor).call(new PyActorMethod("start", Object.class)); + } else { + // java + result = ((ActorHandle) actor).call(JobWorker::start); + 
} LOG.info("Finished calling worker to start."); return result; @@ -55,7 +71,7 @@ public class RemoteCallWorker { * @param actor target JobWorker actor * @return destroy result */ - public static Boolean shutdownWithoutReconstruction(ActorHandle actor) { + public static Boolean shutdownWithoutReconstruction(BaseActorHandle actor) { LOG.info("Call worker to shutdown without reconstruction, actor is {}.", actor.getId()); Boolean result = false; diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/schedule/JobSchedulerImpl.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/schedule/JobSchedulerImpl.java deleted file mode 100644 index d081bb2bf..000000000 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/schedule/JobSchedulerImpl.java +++ /dev/null @@ -1,92 +0,0 @@ -package io.ray.streaming.runtime.schedule; - -import io.ray.api.ActorHandle; -import io.ray.api.BaseActorHandle; -import io.ray.api.ObjectRef; -import io.ray.api.PyActorHandle; -import io.ray.api.Ray; -import io.ray.api.function.PyActorMethod; -import io.ray.streaming.api.Language; -import io.ray.streaming.jobgraph.JobGraph; -import io.ray.streaming.runtime.core.graph.ExecutionGraph; -import io.ray.streaming.runtime.core.graph.ExecutionNode; -import io.ray.streaming.runtime.core.graph.ExecutionTask; -import io.ray.streaming.runtime.generated.RemoteCall; -import io.ray.streaming.runtime.python.GraphPbBuilder; -import io.ray.streaming.runtime.worker.JobWorker; -import io.ray.streaming.runtime.worker.context.WorkerContext; -import io.ray.streaming.schedule.JobScheduler; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -/** - * JobSchedulerImpl schedules workers by the Plan and the resource information - * from ResourceManager. 
- */ -public class JobSchedulerImpl implements JobScheduler { - private JobGraph jobGraph; - private Map jobConfig; - private TaskAssigner taskAssigner; - - public JobSchedulerImpl() { - this.taskAssigner = new TaskAssignerImpl(); - } - - /** - * Schedule physical plan to execution graph, and call streaming worker to init and run. - */ - @SuppressWarnings("unchecked") - @Override - public void schedule(JobGraph jobGraph, Map jobConfig) { - this.jobConfig = jobConfig; - this.jobGraph = jobGraph; - - ExecutionGraph executionGraph = this.taskAssigner.assign(this.jobGraph); - List executionNodes = executionGraph.getExecutionNodeList(); - boolean hasPythonNode = executionNodes.stream() - .anyMatch(node -> node.getLanguage() == Language.PYTHON); - RemoteCall.ExecutionGraph executionGraphPb = null; - if (hasPythonNode) { - executionGraphPb = new GraphPbBuilder().buildExecutionGraphPb(executionGraph); - } - List> waits = new ArrayList<>(); - for (ExecutionNode executionNode : executionNodes) { - List executionTasks = executionNode.getExecutionTasks(); - for (ExecutionTask executionTask : executionTasks) { - int taskId = executionTask.getTaskId(); - BaseActorHandle worker = executionTask.getWorker(); - switch (executionNode.getLanguage()) { - case JAVA: - ActorHandle jobWorker = (ActorHandle) worker; - waits.add(jobWorker.call(JobWorker::init, - new WorkerContext(taskId, executionGraph, jobConfig))); - break; - case PYTHON: - byte[] workerContextBytes = buildPythonWorkerContext( - taskId, executionGraphPb, jobConfig); - waits.add(((PyActorHandle)worker).call(new PyActorMethod("init", Object.class), - workerContextBytes)); - break; - default: - throw new UnsupportedOperationException( - "Unsupported language " + executionNode.getLanguage()); - } - } - } - Ray.wait(waits); - } - - private byte[] buildPythonWorkerContext( - int taskId, - RemoteCall.ExecutionGraph executionGraphPb, - Map jobConfig) { - return RemoteCall.WorkerContext.newBuilder() - .setTaskId(taskId) - 
.putAllConf(jobConfig) - .setGraph(executionGraphPb) - .build() - .toByteArray(); - } - -} diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/schedule/TaskAssigner.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/schedule/TaskAssigner.java deleted file mode 100644 index f525994f1..000000000 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/schedule/TaskAssigner.java +++ /dev/null @@ -1,17 +0,0 @@ -package io.ray.streaming.runtime.schedule; - -import io.ray.streaming.jobgraph.JobGraph; -import io.ray.streaming.runtime.core.graph.ExecutionGraph; -import java.io.Serializable; - -/** - * Interface of the task assigning strategy. - */ -public interface TaskAssigner extends Serializable { - - /** - * Assign logical plan to physical execution graph. - */ - ExecutionGraph assign(JobGraph jobGraph); - -} diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/schedule/TaskAssignerImpl.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/schedule/TaskAssignerImpl.java deleted file mode 100644 index ca8ad5cdb..000000000 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/schedule/TaskAssignerImpl.java +++ /dev/null @@ -1,86 +0,0 @@ -package io.ray.streaming.runtime.schedule; - -import io.ray.api.ActorHandle; -import io.ray.api.BaseActorHandle; -import io.ray.api.PyActorHandle; -import io.ray.api.Ray; -import io.ray.api.function.PyActorClass; -import io.ray.streaming.jobgraph.JobEdge; -import io.ray.streaming.jobgraph.JobGraph; -import io.ray.streaming.jobgraph.JobVertex; -import io.ray.streaming.runtime.core.graph.ExecutionEdge; -import io.ray.streaming.runtime.core.graph.ExecutionGraph; -import io.ray.streaming.runtime.core.graph.ExecutionNode; -import io.ray.streaming.runtime.core.graph.ExecutionTask; -import io.ray.streaming.runtime.worker.JobWorker; -import java.util.ArrayList; -import 
java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TaskAssignerImpl implements TaskAssigner { - private static final Logger LOG = LoggerFactory.getLogger(TaskAssignerImpl.class); - - /** - * Assign an optimized logical plan to execution graph. - * - * @param jobGraph The logical plan. - * @return The physical execution graph. - */ - @Override - public ExecutionGraph assign(JobGraph jobGraph) { - List jobVertices = jobGraph.getJobVertexList(); - List jobEdges = jobGraph.getJobEdgeList(); - - int taskId = 0; - Map idToExecutionNode = new HashMap<>(); - for (JobVertex jobVertex : jobVertices) { - ExecutionNode executionNode = new ExecutionNode(jobVertex.getVertexId(), - jobVertex.getParallelism(), jobVertex.getConfig()); - executionNode.setNodeType(jobVertex.getVertexType()); - List vertexTasks = new ArrayList<>(); - for (int taskIndex = 0; taskIndex < jobVertex.getParallelism(); taskIndex++) { - vertexTasks.add(new ExecutionTask(taskId, taskIndex, createWorker(jobVertex))); - taskId++; - } - executionNode.setExecutionTasks(vertexTasks); - executionNode.setStreamOperator(jobVertex.getStreamOperator()); - idToExecutionNode.put(executionNode.getNodeId(), executionNode); - } - - for (JobEdge jobEdge : jobEdges) { - int srcNodeId = jobEdge.getSrcVertexId(); - int targetNodeId = jobEdge.getTargetVertexId(); - - ExecutionEdge executionEdge = new ExecutionEdge(srcNodeId, targetNodeId, - jobEdge.getPartition()); - idToExecutionNode.get(srcNodeId).addOutputEdge(executionEdge); - idToExecutionNode.get(targetNodeId).addInputEdge(executionEdge); - } - - List executionNodes = new ArrayList<>(idToExecutionNode.values()); - return new ExecutionGraph(executionNodes); - } - - private BaseActorHandle createWorker(JobVertex jobVertex) { - switch (jobVertex.getLanguage()) { - case PYTHON: { - PyActorHandle worker = Ray.createActor( - new PyActorClass("ray.streaming.runtime.worker", "JobWorker")); - 
LOG.info("Created python worker {}", worker); - return worker; - } - case JAVA: { - ActorHandle worker = Ray.createActor(JobWorker::new); - LOG.info("Created java worker {}", worker); - return worker; - } - default: - throw new UnsupportedOperationException( - "Unsupported language " + jobVertex.getLanguage()); - - } - } -} diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/ChannelUtils.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/ChannelUtils.java index c5e1ffadf..ec6535431 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/ChannelUtils.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/ChannelUtils.java @@ -1,50 +1,68 @@ package io.ray.streaming.runtime.transfer; +import io.ray.streaming.runtime.config.StreamingWorkerConfig; import io.ray.streaming.runtime.generated.Streaming; -import io.ray.streaming.util.Config; -import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ChannelUtils { private static final Logger LOGGER = LoggerFactory.getLogger(ChannelUtils.class); - static byte[] toNativeConf(Map conf) { + static byte[] toNativeConf(StreamingWorkerConfig workerConfig) { Streaming.StreamingConfig.Builder builder = Streaming.StreamingConfig.newBuilder(); - if (conf.containsKey(Config.STREAMING_JOB_NAME)) { - builder.setJobName(conf.get(Config.STREAMING_JOB_NAME)); + + // job name + String jobName = workerConfig.commonConfig.jobName(); + if (!StringUtils.isEmpty(jobName)) { + builder.setJobName(workerConfig.commonConfig.jobName()); } - if (conf.containsKey(Config.STREAMING_WORKER_NAME)) { - builder.setWorkerName(conf.get(Config.STREAMING_WORKER_NAME)); + + // worker name + String workerName = workerConfig.workerInternalConfig.workerName(); + if (!StringUtils.isEmpty(workerName)) { + builder.setWorkerName(workerName); 
} - if (conf.containsKey(Config.STREAMING_OP_NAME)) { - builder.setOpName(conf.get(Config.STREAMING_OP_NAME)); + + // operator name + String operatorName = workerConfig.workerInternalConfig.workerOperatorName(); + if (!StringUtils.isEmpty(operatorName)) { + builder.setOpName(operatorName); } - if (conf.containsKey(Config.STREAMING_RING_BUFFER_CAPACITY)) { - builder.setRingBufferCapacity( - Integer.parseInt(conf.get(Config.STREAMING_RING_BUFFER_CAPACITY))); + + // ring buffer capacity + int ringBufferCapacity = workerConfig.transferConfig.ringBufferCapacity(); + if (ringBufferCapacity != -1) { + builder.setRingBufferCapacity(ringBufferCapacity); } - if (conf.containsKey(Config.STREAMING_EMPTY_MESSAGE_INTERVAL)) { - builder.setEmptyMessageInterval( - Integer.parseInt(conf.get(Config.STREAMING_EMPTY_MESSAGE_INTERVAL))); + + // empty message interval + int emptyMsgInterval = workerConfig.transferConfig.emptyMsgInterval(); + if (emptyMsgInterval != -1) { + builder.setEmptyMessageInterval(emptyMsgInterval); } - if (conf.containsKey(Config.FLOW_CONTROL_TYPE)) { - builder.setFlowControlType( - Streaming.FlowControlType.forNumber( - Integer.parseInt(conf.get(Config.FLOW_CONTROL_TYPE)))); + + //flow control type + int flowControlType = workerConfig.transferConfig.flowControlType(); + if (flowControlType != -1) { + builder.setFlowControlType(Streaming.FlowControlType.forNumber(flowControlType)); } - if (conf.containsKey(Config.WRITER_CONSUMED_STEP)) { - builder.setWriterConsumedStep( - Integer.parseInt(conf.get(Config.WRITER_CONSUMED_STEP))); + + // writer consumed step + int writerConsumedStep = workerConfig.transferConfig.writerConsumedStep(); + if (writerConsumedStep != -1) { + builder.setWriterConsumedStep(writerConsumedStep); } - if (conf.containsKey(Config.READER_CONSUMED_STEP)) { - builder.setReaderConsumedStep( - Integer.parseInt(conf.get(Config.READER_CONSUMED_STEP))); + + //reader consumed step + int readerConsumedStep = 
workerConfig.transferConfig.readerConsumedStep(); + if (readerConsumedStep != -1) { + builder.setReaderConsumedStep(readerConsumedStep); } + Streaming.StreamingConfig streamingConf = builder.build(); LOGGER.info("Streaming native conf {}", streamingConf.toString()); return streamingConf.toByteArray(); } } - diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/DataReader.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/DataReader.java index eedc0f9b4..d12e21fd6 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/DataReader.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/DataReader.java @@ -2,8 +2,9 @@ package io.ray.streaming.runtime.transfer; import com.google.common.base.Preconditions; import io.ray.api.BaseActorHandle; +import io.ray.streaming.runtime.config.StreamingWorkerConfig; +import io.ray.streaming.runtime.config.types.TransferChannelType; import io.ray.streaming.runtime.util.Platform; -import io.ray.streaming.util.Config; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.LinkedList; @@ -18,14 +19,19 @@ import org.slf4j.LoggerFactory; * from channels of upstream workers */ public class DataReader { - private static final Logger LOGGER = LoggerFactory.getLogger(DataReader.class); + private static final Logger LOG = LoggerFactory.getLogger(DataReader.class); private long nativeReaderPtr; private Queue buf = new LinkedList<>(); + /** + * @param inputChannels input channels ids + * @param fromActors upstream input actors + * @param workerConfig configuration + */ public DataReader(List inputChannels, Map fromActors, - Map conf) { + StreamingWorkerConfig workerConfig) { Preconditions.checkArgument(inputChannels.size() > 0); Preconditions.checkArgument(inputChannels.size() == fromActors.size()); ChannelCreationParametersBuilder initialParameters = @@ -38,15 +44,14 @@ public 
class DataReader { seqIds[i] = 0; msgIds[i] = 0; } - long timerInterval = Long.parseLong( - conf.getOrDefault(Config.TIMER_INTERVAL_MS, "-1")); - String channelType = conf.get(Config.CHANNEL_TYPE); + long timerInterval = workerConfig.transferConfig.readerTimerIntervalMs(); + TransferChannelType channelType = workerConfig.transferConfig.channelType(); boolean isMock = false; - if (Config.MEMORY_CHANNEL.equals(channelType)) { + if (TransferChannelType.MEMORY_CHANNEL == channelType) { isMock = true; } - boolean isRecreate = Boolean.parseBoolean( - conf.getOrDefault(Config.IS_RECREATE, "false")); + boolean isRecreate = workerConfig.transferConfig.readerIsRecreate(); + this.nativeReaderPtr = createDataReaderNative( initialParameters, inputChannelsBytes, @@ -54,10 +59,11 @@ public class DataReader { msgIds, timerInterval, isRecreate, - ChannelUtils.toNativeConf(conf), + ChannelUtils.toNativeConf(workerConfig), isMock ); - LOGGER.info("create DataReader succeed"); + LOG.info("Create DataReader succeed for worker: {}.", + workerConfig.workerInternalConfig.workerName()); } // params set by getBundleNative: bundle data address + size @@ -148,10 +154,10 @@ public class DataReader { if (nativeReaderPtr == 0) { return; } - LOGGER.info("closing DataReader."); + LOG.info("Closing DataReader."); closeReaderNative(nativeReaderPtr); nativeReaderPtr = 0; - LOGGER.info("closing DataReader done."); + LOG.info("Finish closing DataReader."); } private static native long createDataReaderNative( diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/DataWriter.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/DataWriter.java index acd59f2ce..f9d752fd7 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/DataWriter.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/DataWriter.java @@ -2,8 +2,9 @@ package 
io.ray.streaming.runtime.transfer; import com.google.common.base.Preconditions; import io.ray.api.BaseActorHandle; +import io.ray.streaming.runtime.config.StreamingWorkerConfig; +import io.ray.streaming.runtime.config.types.TransferChannelType; import io.ray.streaming.runtime.util.Platform; -import io.ray.streaming.util.Config; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.List; @@ -17,7 +18,7 @@ import org.slf4j.LoggerFactory; * to downstream workers */ public class DataWriter { - private static final Logger LOGGER = LoggerFactory.getLogger(DataWriter.class); + private static final Logger LOG = LoggerFactory.getLogger(DataWriter.class); private long nativeWriterPtr; private ByteBuffer buffer = ByteBuffer.allocateDirect(0); @@ -30,38 +31,37 @@ public class DataWriter { /** * @param outputChannels output channels ids * @param toActors downstream output actors - * @param conf configuration + * @param workerConfig configuration */ public DataWriter(List outputChannels, Map toActors, - Map conf) { + StreamingWorkerConfig workerConfig) { Preconditions.checkArgument(!outputChannels.isEmpty()); Preconditions.checkArgument(outputChannels.size() == toActors.size()); - ChannelCreationParametersBuilder initParameters = + ChannelCreationParametersBuilder initialParameters = new ChannelCreationParametersBuilder().buildOutputQueueParameters(outputChannels, toActors); byte[][] outputChannelsBytes = outputChannels.stream() .map(ChannelID::idStrToBytes).toArray(byte[][]::new); - long channelSize = Long.parseLong( - conf.getOrDefault(Config.CHANNEL_SIZE, Config.CHANNEL_SIZE_DEFAULT)); + long channelSize = workerConfig.transferConfig.channelSize(); long[] msgIds = new long[outputChannels.size()]; for (int i = 0; i < outputChannels.size(); i++) { msgIds[i] = 0; } - String channelType = conf.get(Config.CHANNEL_TYPE); + TransferChannelType channelType = workerConfig.transferConfig.channelType(); boolean isMock = false; - if 
(Config.MEMORY_CHANNEL.equalsIgnoreCase(channelType)) { + if (TransferChannelType.MEMORY_CHANNEL == channelType) { isMock = true; - LOGGER.info("Using memory channel"); } this.nativeWriterPtr = createWriterNative( - initParameters, + initialParameters, outputChannelsBytes, msgIds, channelSize, - ChannelUtils.toNativeConf(conf), + ChannelUtils.toNativeConf(workerConfig), isMock ); - LOGGER.info("create DataWriter succeed"); + LOG.info("Create DataWriter succeed for worker: {}.", + workerConfig.workerInternalConfig.workerName()); } /** @@ -117,10 +117,10 @@ public class DataWriter { if (nativeWriterPtr == 0) { return; } - LOGGER.info("closing data writer."); + LOG.info("Closing data writer."); closeWriterNative(nativeWriterPtr); nativeWriterPtr = 0; - LOGGER.info("closing data writer done."); + LOG.info("Finish closing data writer."); } private static native long createWriterNative( diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/CommonUtils.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/CommonUtils.java new file mode 100644 index 000000000..24cd4e204 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/CommonUtils.java @@ -0,0 +1,14 @@ +package io.ray.streaming.runtime.util; + +import java.util.Map; + +/** + * Common tools. 
+ */ +public class CommonUtils { + + public static Map strMapToObjectMap(Map srcMap) { + Map destMap = (Map) srcMap; + return destMap; + } +} diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/JobWorker.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/JobWorker.java index 2433d18e9..e3b364cea 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/JobWorker.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/JobWorker.java @@ -1,30 +1,35 @@ package io.ray.streaming.runtime.worker; -import io.ray.api.Ray; -import io.ray.streaming.runtime.core.graph.ExecutionGraph; -import io.ray.streaming.runtime.core.graph.ExecutionNode; -import io.ray.streaming.runtime.core.graph.ExecutionNode.NodeType; -import io.ray.streaming.runtime.core.graph.ExecutionTask; +import io.ray.streaming.runtime.config.StreamingWorkerConfig; +import io.ray.streaming.runtime.config.types.TransferChannelType; +import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex; import io.ray.streaming.runtime.core.processor.OneInputProcessor; import io.ray.streaming.runtime.core.processor.ProcessBuilder; import io.ray.streaming.runtime.core.processor.SourceProcessor; import io.ray.streaming.runtime.core.processor.StreamProcessor; +import io.ray.streaming.runtime.master.JobMaster; import io.ray.streaming.runtime.transfer.TransferHandler; import io.ray.streaming.runtime.util.EnvUtil; -import io.ray.streaming.runtime.worker.context.WorkerContext; +import io.ray.streaming.runtime.worker.context.JobWorkerContext; import io.ray.streaming.runtime.worker.tasks.OneInputStreamTask; import io.ray.streaming.runtime.worker.tasks.SourceStreamTask; import io.ray.streaming.runtime.worker.tasks.StreamTask; import java.io.Serializable; -import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * The stream job worker, it is a ray actor. 
+ * The streaming worker implementation class, it is ray actor. JobWorker is created by + * {@link JobMaster} through ray api, and JobMaster communicates + * with JobWorker through Ray.call(). + * + *

The JobWorker is responsible for creating tasks and defines the methods of communication + * between workers. */ public class JobWorker implements Serializable { - private static final Logger LOGGER = LoggerFactory.getLogger(JobWorker.class); + + private static final Logger LOG = LoggerFactory.getLogger(JobWorker.class); + // special flag to indicate this actor not ready private static final byte[] NOT_READY_FLAG = new byte[4]; @@ -32,79 +37,102 @@ public class JobWorker implements Serializable { EnvUtil.loadNativeLibraries(); } - private int taskId; - private Map config; - private WorkerContext workerContext; - private ExecutionNode executionNode; - private ExecutionTask executionTask; - private ExecutionGraph executionGraph; - private StreamProcessor streamProcessor; - private NodeType nodeType; + private JobWorkerContext workerContext; + private ExecutionVertex executionVertex; + private StreamingWorkerConfig workerConfig; + private StreamTask task; private TransferHandler transferHandler; - public Boolean init(WorkerContext workerContext) { - this.workerContext = workerContext; - this.taskId = workerContext.getTaskId(); - this.config = workerContext.getConfig(); - this.executionGraph = this.workerContext.getExecutionGraph(); - this.executionTask = executionGraph.getExecutionTaskByTaskId(taskId); - this.executionNode = executionGraph.getExecutionNodeByTaskId(taskId); + public JobWorker() { + LOG.info("Creating job worker succeeded."); + } - this.nodeType = executionNode.getNodeType(); - this.streamProcessor = ProcessBuilder - .buildProcessor(executionNode.getStreamOperator()); - LOGGER.info("Initializing StreamWorker, pid {}, taskId: {}, operator: {}.", - EnvUtil.getJvmPid(), taskId, streamProcessor); + /** + * Initialize JobWorker and data communication pipeline. + */ + public Boolean init(JobWorkerContext workerContext) { + LOG.info("Initiating job worker: {}. 
Worker context is: {}.", + workerContext.getWorkerName(), workerContext); - if (!Ray.getRuntimeContext().isSingleProcess()) { - transferHandler = new TransferHandler(); + try { + this.workerContext = workerContext; + this.executionVertex = workerContext.getExecutionVertex(); + this.workerConfig = new StreamingWorkerConfig(executionVertex.getWorkerConfig()); + + //Init transfer + TransferChannelType channelType = workerConfig.transferConfig.channelType(); + if (TransferChannelType.NATIVE_CHANNEL == channelType) { + transferHandler = new TransferHandler(); + } + + // create stream task + task = createStreamTask(); + if (task == null) { + return false; + } + } catch (Exception e) { + LOG.error("Failed to initiate job worker.", e); + return false; } - task = createStreamTask(); - task.start(); + LOG.info("Initiating job worker succeeded: {}.", workerContext.getWorkerName()); return true; } - private StreamTask createStreamTask() { - if (streamProcessor instanceof OneInputProcessor) { - return new OneInputStreamTask(taskId, streamProcessor, this); - } else if (streamProcessor instanceof SourceProcessor) { - return new SourceStreamTask(taskId, streamProcessor, this); - } else { - throw new RuntimeException("Unsupported type: " + streamProcessor); + /** + * Start worker's stream tasks. + * + * @return result + */ + public Boolean start() { + try { + task.start(); + } catch (Exception e) { + LOG.error("Start worker [{}] occur error.", executionVertex.getExecutionVertexName(), e); + return false; } + return true; + } + + /** + * Create tasks based on the processor corresponding of the operator. 
+ */ + private StreamTask createStreamTask() { + StreamTask task = null; + StreamProcessor streamProcessor = ProcessBuilder + .buildProcessor(executionVertex.getStreamOperator()); + LOG.debug("Stream processor created: {}.", streamProcessor); + + try { + if (streamProcessor instanceof SourceProcessor) { + task = new SourceStreamTask(getTaskId(), streamProcessor, this); + } else if (streamProcessor instanceof OneInputProcessor) { + task = new OneInputStreamTask(getTaskId(), streamProcessor, this); + } else { + throw new RuntimeException("Unsupported processor type:" + streamProcessor); + } + } catch (Exception e) { + LOG.info("Failed to create stream task.", e); + return task; + } + LOG.info("Stream task created: {}.", task); + return task; } public int getTaskId() { - return taskId; + return executionVertex.getExecutionVertexId(); } - public Map getConfig() { - return config; + public StreamingWorkerConfig getWorkerConfig() { + return workerConfig; } - public WorkerContext getWorkerContext() { + public JobWorkerContext getWorkerContext() { return workerContext; } - public NodeType getNodeType() { - return nodeType; - } - - public ExecutionNode getExecutionNode() { - return executionNode; - } - - public ExecutionTask getExecutionTask() { - return executionTask; - } - - public ExecutionGraph getExecutionGraph() { - return executionGraph; - } - - public StreamProcessor getStreamProcessor() { - return streamProcessor; + public ExecutionVertex getExecutionVertex() { + return executionVertex; } public StreamTask getTask() { diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/context/JobWorkerContext.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/context/JobWorkerContext.java index 27000cbfe..87ece68b6 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/context/JobWorkerContext.java +++ 
b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/context/JobWorkerContext.java @@ -1,22 +1,21 @@ package io.ray.streaming.runtime.worker.context; import com.google.common.base.MoreObjects; +import com.google.protobuf.ByteString; import io.ray.api.ActorHandle; -import io.ray.api.id.ActorId; +import io.ray.runtime.actor.NativeActorHandle; import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex; +import io.ray.streaming.runtime.generated.RemoteCall; import io.ray.streaming.runtime.master.JobMaster; +import io.ray.streaming.runtime.python.GraphPbBuilder; import java.io.Serializable; +import java.util.Map; /** - * Job worker context. + * Job worker context of java type. */ public class JobWorkerContext implements Serializable { - /** - * Worker actor's id. - */ - private ActorId workerId; - /** * JobMaster actor. */ @@ -28,16 +27,22 @@ public class JobWorkerContext implements Serializable { private ExecutionVertex executionVertex; public JobWorkerContext( - ActorId workerId, ActorHandle master, ExecutionVertex executionVertex) { - this.workerId = workerId; this.master = master; this.executionVertex = executionVertex; } - public ActorId getWorkerId() { - return workerId; + public int getWorkerId() { + return executionVertex.getExecutionVertexId(); + } + + public String getWorkerName() { + return executionVertex.getExecutionVertexName(); + } + + public Map getConfig() { + return executionVertex.getWorkerConfig(); } public ActorHandle getMaster() { @@ -51,9 +56,24 @@ public class JobWorkerContext implements Serializable { @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("workerId", workerId) - .add("master", master) + .add("workerId", getWorkerId()) + .add("workerName", getWorkerName()) + .add("config", getConfig()) .toString(); } + public byte[] getPythonWorkerContextBytes() { + // create python worker context + RemoteCall.ExecutionVertexContext executionVertexContext = + new 
GraphPbBuilder().buildExecutionVertexContext(executionVertex); + + byte[] contextBytes = RemoteCall.PythonJobWorkerContext.newBuilder() + .setMasterActor(ByteString.copyFrom((((NativeActorHandle) (master)).toBytes()))) + .setExecutionVertexContext(executionVertexContext) + .build() + .toByteArray(); + + return contextBytes; + } + } diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/context/RayRuntimeContext.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/context/StreamingRuntimeContext.java similarity index 90% rename from streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/context/RayRuntimeContext.java rename to streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/context/StreamingRuntimeContext.java index 6a12d1832..23b1b924a 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/context/RayRuntimeContext.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/context/StreamingRuntimeContext.java @@ -2,7 +2,7 @@ package io.ray.streaming.runtime.worker.context; import com.google.common.base.Preconditions; import io.ray.streaming.api.context.RuntimeContext; -import io.ray.streaming.runtime.core.graph.ExecutionTask; +import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex; import io.ray.streaming.state.backend.AbstractKeyStateBackend; import io.ray.streaming.state.backend.KeyStateBackend; import io.ray.streaming.state.backend.OperatorStateBackend; @@ -18,7 +18,7 @@ import java.util.Map; /** * Use Ray to implement RuntimeContext. */ -public class RayRuntimeContext implements RuntimeContext { +public class StreamingRuntimeContext implements RuntimeContext { /** * Backend for keyed state. This might be empty if we're not on a keyed stream. 
*/ @@ -33,11 +33,11 @@ public class RayRuntimeContext implements RuntimeContext { private Long checkpointId; private Map config; - public RayRuntimeContext(ExecutionTask executionTask, Map config, + public StreamingRuntimeContext(ExecutionVertex executionVertex, Map config, int parallelism) { - this.taskId = executionTask.getTaskId(); + this.taskId = executionVertex.getExecutionVertexId(); this.config = config; - this.taskIndex = executionTask.getTaskIndex(); + this.taskIndex = executionVertex.getExecutionVertexIndex(); this.parallelism = parallelism; } diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/context/WorkerContext.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/context/WorkerContext.java deleted file mode 100644 index a0178b5dc..000000000 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/context/WorkerContext.java +++ /dev/null @@ -1,41 +0,0 @@ -package io.ray.streaming.runtime.worker.context; - -import io.ray.streaming.runtime.core.graph.ExecutionGraph; -import java.io.Serializable; -import java.util.Map; - -/** - * Encapsulate the context information for worker initialization. 
- */ -public class WorkerContext implements Serializable { - - private int taskId; - private ExecutionGraph executionGraph; - private Map config; - - public WorkerContext(int taskId, ExecutionGraph executionGraph, Map jobConfig) { - this.taskId = taskId; - this.executionGraph = executionGraph; - this.config = jobConfig; - } - - public int getTaskId() { - return taskId; - } - - public void setTaskId(int taskId) { - this.taskId = taskId; - } - - public ExecutionGraph getExecutionGraph() { - return executionGraph; - } - - public void setExecutionGraph(ExecutionGraph executionGraph) { - this.executionGraph = executionGraph; - } - - public Map getConfig() { - return config; - } -} diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/InputStreamTask.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/InputStreamTask.java index 8d642aeef..d47815341 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/InputStreamTask.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/InputStreamTask.java @@ -1,12 +1,12 @@ package io.ray.streaming.runtime.worker.tasks; +import com.google.common.base.MoreObjects; import io.ray.streaming.runtime.core.processor.Processor; import io.ray.streaming.runtime.serialization.CrossLangSerializer; import io.ray.streaming.runtime.serialization.JavaSerializer; import io.ray.streaming.runtime.serialization.Serializer; import io.ray.streaming.runtime.transfer.Message; import io.ray.streaming.runtime.worker.JobWorker; -import io.ray.streaming.util.Config; public abstract class InputStreamTask extends StreamTask { private volatile boolean running = true; @@ -15,10 +15,9 @@ public abstract class InputStreamTask extends StreamTask { private final io.ray.streaming.runtime.serialization.Serializer javaSerializer; private final io.ray.streaming.runtime.serialization.Serializer 
crossLangSerializer; - public InputStreamTask(int taskId, Processor processor, JobWorker streamWorker) { - super(taskId, processor, streamWorker); - readTimeoutMillis = Long.parseLong((String) streamWorker.getConfig() - .getOrDefault(Config.READ_TIMEOUT_MS, Config.DEFAULT_READ_TIMEOUT_MS)); + public InputStreamTask(int taskId, Processor processor, JobWorker jobWorker) { + super(taskId, processor, jobWorker); + readTimeoutMillis = jobWorker.getWorkerConfig().transferConfig.readerTimerIntervalMs(); javaSerializer = new JavaSerializer(); crossLangSerializer = new CrossLangSerializer(); } @@ -56,10 +55,9 @@ public abstract class InputStreamTask extends StreamTask { @Override public String toString() { - final StringBuilder sb = new StringBuilder("InputStreamTask{"); - sb.append("taskId=").append(taskId); - sb.append(", processor=").append(processor); - sb.append('}'); - return sb.toString(); + return MoreObjects.toStringHelper(this) + .add("taskId", taskId) + .add("processor", processor) + .toString(); } } diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/OneInputStreamTask.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/OneInputStreamTask.java index 5fd384b89..16293f9ae 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/OneInputStreamTask.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/OneInputStreamTask.java @@ -3,9 +3,12 @@ package io.ray.streaming.runtime.worker.tasks; import io.ray.streaming.runtime.core.processor.Processor; import io.ray.streaming.runtime.worker.JobWorker; -public class OneInputStreamTask extends InputStreamTask { +/** + * Input stream task with 1 input. Such as: map operator. 
+ */ +public class OneInputStreamTask extends InputStreamTask { - public OneInputStreamTask(int taskId, Processor processor, JobWorker streamWorker) { - super(taskId, processor, streamWorker); + public OneInputStreamTask(int taskId, Processor inputProcessor, JobWorker jobWorker) { + super(taskId, inputProcessor, jobWorker); } } diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/SourceStreamTask.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/SourceStreamTask.java index c13fcc092..a23a781c3 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/SourceStreamTask.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/SourceStreamTask.java @@ -1,16 +1,25 @@ package io.ray.streaming.runtime.worker.tasks; +import io.ray.streaming.operator.impl.SourceOperator; import io.ray.streaming.runtime.core.processor.Processor; import io.ray.streaming.runtime.core.processor.SourceProcessor; import io.ray.streaming.runtime.worker.JobWorker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SourceStreamTask extends StreamTask { - private static final Logger LOGGER = LoggerFactory.getLogger(SourceStreamTask.class); +public class SourceStreamTask extends StreamTask { - public SourceStreamTask(int taskId, Processor processor, JobWorker worker) { - super(taskId, processor, worker); + private static final Logger LOG = LoggerFactory.getLogger(SourceStreamTask.class); + + private final SourceProcessor sourceProcessor; + + /** + * SourceStreamTask for executing a {@link SourceOperator}. + * It is responsible for running the corresponding source operator. 
+ */ + public SourceStreamTask(int taskId, Processor sourceProcessor, JobWorker jobWorker) { + super(taskId, sourceProcessor, jobWorker); + this.sourceProcessor = (SourceProcessor) processor; } @Override @@ -19,12 +28,15 @@ public class SourceStreamTask extends StreamTask { @Override public void run() { - final SourceProcessor sourceProcessor = (SourceProcessor) this.processor; - sourceProcessor.run(); + LOG.info("Source stream task thread start."); + + while (running) { + sourceProcessor.run(); + } } @Override - protected void cancelTask() throws Exception { + protected void cancelTask() { } } diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/StreamTask.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/StreamTask.java index 651614970..64cb0f85d 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/StreamTask.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/StreamTask.java @@ -4,24 +4,20 @@ import io.ray.api.BaseActorHandle; import io.ray.api.Ray; import io.ray.streaming.api.collector.Collector; import io.ray.streaming.api.context.RuntimeContext; -import io.ray.streaming.api.partition.Partition; +import io.ray.streaming.runtime.config.worker.WorkerInternalConfig; import io.ray.streaming.runtime.core.collector.OutputCollector; -import io.ray.streaming.runtime.core.graph.ExecutionEdge; -import io.ray.streaming.runtime.core.graph.ExecutionGraph; -import io.ray.streaming.runtime.core.graph.ExecutionNode; +import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionEdge; +import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex; import io.ray.streaming.runtime.core.processor.Processor; import io.ray.streaming.runtime.transfer.ChannelID; import io.ray.streaming.runtime.transfer.DataReader; import io.ray.streaming.runtime.transfer.DataWriter; import 
io.ray.streaming.runtime.worker.JobWorker; -import io.ray.streaming.runtime.worker.context.RayRuntimeContext; -import io.ray.streaming.util.Config; - +import io.ray.streaming.runtime.worker.context.StreamingRuntimeContext; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,71 +26,71 @@ public abstract class StreamTask implements Runnable { protected int taskId; protected Processor processor; - protected JobWorker worker; + protected JobWorker jobWorker; protected DataReader reader; - private Map writers; + List collectors = new ArrayList<>(); + + protected volatile boolean running = true; + protected volatile boolean stopped = false; + private Thread thread; - public StreamTask(int taskId, Processor processor, JobWorker worker) { + protected StreamTask(int taskId, Processor processor, JobWorker jobWorker) { this.taskId = taskId; this.processor = processor; - this.worker = worker; + this.jobWorker = jobWorker; prepareTask(); this.thread = new Thread(Ray.wrapRunnable(this), - this.getClass().getName() + "-" + System.currentTimeMillis()); + this.getClass().getName() + "-" + System.currentTimeMillis()); this.thread.setDaemon(true); } + /** + * Build upstream and downstream data transmission channels according to {@link ExecutionVertex}. + */ private void prepareTask() { - Map queueConf = new HashMap<>(); - worker.getConfig().forEach((k, v) -> queueConf.put(k, String.valueOf(v))); - String queueSize = worker.getConfig() - .getOrDefault(Config.CHANNEL_SIZE, Config.CHANNEL_SIZE_DEFAULT); - queueConf.put(Config.CHANNEL_SIZE, queueSize); - String channelType = Ray.getRuntimeContext().isSingleProcess() ? 
- Config.MEMORY_CHANNEL : Config.NATIVE_CHANNEL; - queueConf.put(Config.CHANNEL_TYPE, channelType); + LOG.debug("Preparing stream task."); + ExecutionVertex executionVertex = jobWorker.getExecutionVertex(); - ExecutionGraph executionGraph = worker.getExecutionGraph(); - ExecutionNode executionNode = worker.getExecutionNode(); + // set vertex info into config for native using + jobWorker.getWorkerConfig().workerInternalConfig.setProperty( + WorkerInternalConfig.WORKER_NAME_INTERNAL, executionVertex.getExecutionVertexName()); + jobWorker.getWorkerConfig().workerInternalConfig.setProperty( + WorkerInternalConfig.OP_NAME_INTERNAL, executionVertex.getExecutionJobVertexName()); + + // producer + List outputEdges = executionVertex.getOutputEdges(); + Map outputActors = new HashMap<>(); - // writers - writers = new HashMap<>(); - List outputEdges = executionNode.getOutputEdges(); - List collectors = new ArrayList<>(); for (ExecutionEdge edge : outputEdges) { - Map outputActors = new HashMap<>(); - Map taskId2Worker = executionGraph - .getTaskId2WorkerByNodeId(edge.getTargetNodeId()); - taskId2Worker.forEach((targetTaskId, targetActor) -> { - String queueName = ChannelID.genIdStr(taskId, targetTaskId, executionGraph.getBuildTime()); - outputActors.put(queueName, targetActor); + String queueName = ChannelID.genIdStr( + taskId, + edge.getTargetExecutionVertex().getExecutionVertexId(), + executionVertex.getBuildTime()); + outputActors.put(queueName, edge.getTargetExecutionVertex().getWorkerActor()); + } + + if (!outputActors.isEmpty()) { + List channelIDs = new ArrayList<>(); + outputActors.forEach((vertexId, actorId) -> { + channelIDs.add(vertexId); }); - if (!outputActors.isEmpty()) { - List channelIDs = new ArrayList<>(); - outputActors.forEach((k, v) -> { - channelIDs.add(k); - }); - DataWriter writer = new DataWriter(channelIDs, outputActors, queueConf); - LOG.info("Create DataWriter succeed."); - writers.put(edge, writer); - Partition partition = edge.getPartition(); - 
collectors.add(new OutputCollector(writer, channelIDs, outputActors.values(), partition)); - } + DataWriter writer = new DataWriter(channelIDs, outputActors, jobWorker.getWorkerConfig()); + collectors.add(new OutputCollector(writer, channelIDs, outputActors.values(), + executionVertex.getOutputEdges().get(0).getPartition())); } // consumer - List inputEdges = executionNode.getInputsEdges(); + List inputEdges = executionVertex.getInputEdges(); Map inputActors = new HashMap<>(); for (ExecutionEdge edge : inputEdges) { - Map taskId2Worker = executionGraph - .getTaskId2WorkerByNodeId(edge.getSrcNodeId()); - taskId2Worker.forEach((srcTaskId, srcActor) -> { - String queueName = ChannelID.genIdStr(srcTaskId, taskId, executionGraph.getBuildTime()); - inputActors.put(queueName, srcActor); - }); + String queueName = ChannelID.genIdStr( + edge.getSourceExecutionVertex().getExecutionVertexId(), + taskId, + executionVertex.getBuildTime()); + inputActors.put(queueName, edge.getSourceExecutionVertex().getWorkerActor()); } if (!inputActors.isEmpty()) { List channelIDs = new ArrayList<>(); @@ -102,31 +98,43 @@ public abstract class StreamTask implements Runnable { channelIDs.add(k); }); LOG.info("Register queue consumer, queues {}.", channelIDs); - reader = new DataReader(channelIDs, inputActors, queueConf); + reader = new DataReader(channelIDs, inputActors, jobWorker.getWorkerConfig()); } - RuntimeContext runtimeContext = new RayRuntimeContext( - worker.getExecutionTask(), worker.getConfig(), executionNode.getParallelism()); + RuntimeContext runtimeContext = new StreamingRuntimeContext(executionVertex, + jobWorker.getWorkerConfig().configMap, executionVertex.getParallelism()); processor.open(collectors, runtimeContext); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - // Make DataReader stop read data when MockQueue destructor gets called to avoid crash - StreamTask.this.cancelTask(); - } catch (Exception e) { - e.printStackTrace(); - } - })); + 
LOG.debug("Finished preparing stream task."); } + /** + * Task initialization related work. + */ protected abstract void init() throws Exception; + /** + * Stop running tasks. + */ protected abstract void cancelTask() throws Exception; public void start() { + LOG.info("Start stream task: {}-{}", this.getClass().getSimpleName(), taskId); this.thread.start(); - LOG.info("started {}-{}", this.getClass().getSimpleName(), taskId); + } + + /** + * Close running tasks. + */ + public void close() { + this.running = false; + if (thread.isAlive() && !Ray.getRuntimeContext().isSingleProcess()) { + // `Runtime.halt` is used because System.exit can't ensure the process is killed. + Runtime.getRuntime().halt(0); + LOG.warn("runtime halt 0"); + System.exit(0); + } + LOG.info("Stream task close success."); } } diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/TwoInputStreamTask.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/TwoInputStreamTask.java new file mode 100644 index 000000000..7126105f6 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/TwoInputStreamTask.java @@ -0,0 +1,23 @@ +package io.ray.streaming.runtime.worker.tasks; + +import io.ray.streaming.runtime.core.processor.Processor; +import io.ray.streaming.runtime.core.processor.TwoInputProcessor; +import io.ray.streaming.runtime.worker.JobWorker; + +/** + * Input stream task with 2 inputs. Such as: join operator. 
+ */ +public class TwoInputStreamTask extends InputStreamTask { + + public TwoInputStreamTask( + int taskId, + Processor processor, + JobWorker jobWorker, + String leftStream, + String rightStream) { + super(taskId, processor, jobWorker); + ((TwoInputProcessor)(super.processor)).setLeftStream(leftStream); + ((TwoInputProcessor)(super.processor)).setRightStream(rightStream); + } + +} diff --git a/streaming/java/streaming-runtime/src/main/resources/META-INF/services/io.ray.streaming.client.JobClient b/streaming/java/streaming-runtime/src/main/resources/META-INF/services/io.ray.streaming.client.JobClient new file mode 100644 index 000000000..b592c7310 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/resources/META-INF/services/io.ray.streaming.client.JobClient @@ -0,0 +1 @@ +io.ray.streaming.runtime.client.JobClientImpl \ No newline at end of file diff --git a/streaming/java/streaming-runtime/src/main/resources/META-INF/services/io.ray.streaming.schedule.JobScheduler b/streaming/java/streaming-runtime/src/main/resources/META-INF/services/io.ray.streaming.schedule.JobScheduler deleted file mode 100644 index f5930c35b..000000000 --- a/streaming/java/streaming-runtime/src/main/resources/META-INF/services/io.ray.streaming.schedule.JobScheduler +++ /dev/null @@ -1 +0,0 @@ -io.ray.streaming.runtime.schedule.JobSchedulerImpl \ No newline at end of file diff --git a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/core/graph/ExecutionGraphTest.java b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/core/graph/ExecutionGraphTest.java index 882fc5fb4..6577c81a7 100644 --- a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/core/graph/ExecutionGraphTest.java +++ b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/core/graph/ExecutionGraphTest.java @@ -35,6 +35,8 @@ public class ExecutionGraphTest extends BaseUnitTest { StreamingConfig streamingConfig = new 
StreamingConfig(jobConf); GraphManager graphManager = new GraphManagerImpl(new JobRuntimeContext(streamingConfig)); JobGraph jobGraph = buildJobGraph(); + jobGraph.getJobConfig().put("streaming.task.resource.cpu.limitation.enable", "true"); + ExecutionGraph executionGraph = buildExecutionGraph(graphManager, jobGraph); List executionJobVertices = executionGraph.getExecutionJobVertexList(); @@ -48,24 +50,24 @@ public class ExecutionGraphTest extends BaseUnitTest { executionGraph.getAllExecutionVertices().forEach(vertex -> { Assert.assertNotNull(vertex.getStreamOperator()); - Assert.assertNotNull(vertex.getJobVertexName()); + Assert.assertNotNull(vertex.getExecutionJobVertexName()); Assert.assertNotNull(vertex.getVertexType()); Assert.assertNotNull(vertex.getLanguage()); - Assert.assertEquals(vertex.getVertexName(), - vertex.getJobVertexId() + "-" + vertex.getJobVertexName() + "-" + vertex.getVertexIndex()); + Assert.assertEquals(vertex.getExecutionVertexName(), + vertex.getExecutionJobVertexName() + "-" + vertex.getExecutionVertexIndex()); }); int startIndex = 0; ExecutionJobVertex upStream = executionJobVertices.get(startIndex); ExecutionJobVertex downStream = executionJobVertices.get(startIndex + 1); - Assert.assertEquals(upStream.getOutputEdges().get(0).getTargetVertex(), downStream); + Assert.assertEquals(upStream.getOutputEdges().get(0).getTargetExecutionJobVertex(), downStream); List upStreamVertices = upStream.getExecutionVertices(); List downStreamVertices = downStream.getExecutionVertices(); upStreamVertices.forEach(vertex -> { - Assert.assertEquals(vertex.getResources().get(ResourceType.CPU.name()), 2.0); + Assert.assertEquals(vertex.getResource().get(ResourceType.CPU.name()), 2.0); vertex.getOutputEdges().stream().forEach(upStreamOutPutEdge -> { - Assert.assertTrue(downStreamVertices.contains(upStreamOutPutEdge.getTargetVertex())); + Assert.assertTrue(downStreamVertices.contains(upStreamOutPutEdge.getTargetExecutionVertex())); }); }); } diff --git 
a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/demo/WordCountTest.java b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/demo/WordCountTest.java index 3069d26ee..cd811b0af 100644 --- a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/demo/WordCountTest.java +++ b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/demo/WordCountTest.java @@ -21,18 +21,17 @@ import org.testng.annotations.Test; public class WordCountTest extends BaseUnitTest implements Serializable { - private static final Logger LOGGER = LoggerFactory.getLogger(WordCountTest.class); + private static final Logger LOG = LoggerFactory.getLogger(WordCountTest.class); - // TODO(zhenxuanpan): this test only works in single-process mode, because we put - // results in this in-memory map. static Map wordCount = new ConcurrentHashMap<>(); @Test(timeOut = 60000) public void testWordCount() { Ray.shutdown(); + StreamingContext streamingContext = StreamingContext.buildContext(); Map config = new HashMap<>(); - config.put(Config.CHANNEL_TYPE, Config.MEMORY_CHANNEL); + config.put(Config.CHANNEL_TYPE, "MEMORY_CHANNEL"); streamingContext.withConfig(config); List text = new ArrayList<>(); text.add("hello world eagle eagle eagle"); @@ -58,7 +57,7 @@ public class WordCountTest extends BaseUnitTest implements Serializable { try { Thread.sleep(1000); } catch (InterruptedException e) { - LOGGER.warn("Got an exception while sleeping.", e); + LOG.warn("Got an exception while sleeping.", e); } } streamingContext.stop(); diff --git a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/jobscheduler/JobSchedulerTest.java b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/jobscheduler/JobClientTest.java similarity index 85% rename from streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/jobscheduler/JobSchedulerTest.java rename to 
streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/jobscheduler/JobClientTest.java index 4f12abe0e..0d5b2766b 100644 --- a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/jobscheduler/JobSchedulerTest.java +++ b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/jobscheduler/JobClientTest.java @@ -2,7 +2,7 @@ package io.ray.streaming.runtime.master.jobscheduler; import org.testng.annotations.Test; -public class JobSchedulerTest { +public class JobClientTest { @Test public void testSchedule() { diff --git a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/schedule/TaskAssignerImplTest.java b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/schedule/TaskAssignerImplTest.java deleted file mode 100644 index 2e8978c7a..000000000 --- a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/schedule/TaskAssignerImplTest.java +++ /dev/null @@ -1,67 +0,0 @@ -package io.ray.streaming.runtime.schedule; - -import com.google.common.collect.Lists; -import io.ray.api.Ray; -import io.ray.streaming.api.context.StreamingContext; -import io.ray.streaming.api.partition.impl.RoundRobinPartition; -import io.ray.streaming.api.stream.DataStream; -import io.ray.streaming.api.stream.DataStreamSink; -import io.ray.streaming.api.stream.DataStreamSource; -import io.ray.streaming.jobgraph.JobGraph; -import io.ray.streaming.jobgraph.JobGraphBuilder; -import io.ray.streaming.runtime.BaseUnitTest; -import io.ray.streaming.runtime.core.graph.ExecutionEdge; -import io.ray.streaming.runtime.core.graph.ExecutionGraph; -import io.ray.streaming.runtime.core.graph.ExecutionNode; -import io.ray.streaming.runtime.core.graph.ExecutionNode.NodeType; -import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.annotations.Test; - -public class TaskAssignerImplTest extends BaseUnitTest { - - 
private static final Logger LOGGER = LoggerFactory.getLogger(TaskAssignerImplTest.class); - - @Test - public void testTaskAssignImpl() { - Ray.init(); - JobGraph jobGraph = buildDataSyncPlan(); - - TaskAssigner taskAssigner = new TaskAssignerImpl(); - ExecutionGraph executionGraph = taskAssigner.assign(jobGraph); - - List executionNodeList = executionGraph.getExecutionNodeList(); - - Assert.assertEquals(executionNodeList.size(), 2); - ExecutionNode sourceNode = executionNodeList.get(0); - Assert.assertEquals(sourceNode.getNodeType(), NodeType.SOURCE); - Assert.assertEquals(sourceNode.getExecutionTasks().size(), 1); - Assert.assertEquals(sourceNode.getOutputEdges().size(), 1); - - List sourceExecutionEdges = sourceNode.getOutputEdges(); - - Assert.assertEquals(sourceExecutionEdges.size(), 1); - ExecutionEdge source2Sink = sourceExecutionEdges.get(0); - - Assert.assertEquals(source2Sink.getPartition().getClass(), RoundRobinPartition.class); - - ExecutionNode sinkNode = executionNodeList.get(1); - Assert.assertEquals(sinkNode.getNodeType(), NodeType.SINK); - Assert.assertEquals(sinkNode.getExecutionTasks().size(), 1); - Assert.assertEquals(sinkNode.getOutputEdges().size(), 0); - - Ray.shutdown(); - } - - public JobGraph buildDataSyncPlan() { - StreamingContext streamingContext = StreamingContext.buildContext(); - DataStream dataStream = DataStreamSource.fromCollection(streamingContext, - Lists.newArrayList("a", "b", "c")); - DataStreamSink streamSink = dataStream.sink(LOGGER::info); - JobGraphBuilder jobGraphBuilder = new JobGraphBuilder(Lists.newArrayList(streamSink)); - - return jobGraphBuilder.build(); - } -} diff --git a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/streamingqueue/StreamingQueueTest.java b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/streamingqueue/StreamingQueueTest.java index aefd39458..2498af2ed 100644 --- 
a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/streamingqueue/StreamingQueueTest.java +++ b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/streamingqueue/StreamingQueueTest.java @@ -37,6 +37,7 @@ import org.testng.annotations.Test; public class StreamingQueueTest extends BaseUnitTest implements Serializable { private static Logger LOGGER = LoggerFactory.getLogger(StreamingQueueTest.class); + static { EnvUtil.loadNativeLibraries(); } @@ -56,10 +57,6 @@ public class StreamingQueueTest extends BaseUnitTest implements Serializable { LOGGER.warn("Do tear down"); } - @BeforeClass - public void setUp() { - } - @BeforeMethod void beforeMethod() { LOGGER.info("beforeTest"); @@ -79,7 +76,7 @@ public class StreamingQueueTest extends BaseUnitTest implements Serializable { System.clearProperty("ray.run-mode"); } - @Test(timeOut = 3000000) + @Test(timeOut = 300000) public void testReaderWriter() { LOGGER.info("StreamingQueueTest.testReaderWriter run-mode: {}", System.getProperty("ray.run-mode")); @@ -120,7 +117,7 @@ public class StreamingQueueTest extends BaseUnitTest implements Serializable { readerActor.call(ReaderWorker::init, inputQueueList, writerActor, msgCount); try { Thread.sleep(1000); - } catch (InterruptedException e) { + } catch (Exception e) { e.printStackTrace(); } writerActor.call(WriterWorker::init, outputQueueList, readerActor, msgCount); @@ -160,7 +157,7 @@ public class StreamingQueueTest extends BaseUnitTest implements Serializable { Map wordCount = new ConcurrentHashMap<>(); StreamingContext streamingContext = StreamingContext.buildContext(); Map config = new HashMap<>(); - config.put(Config.CHANNEL_TYPE, Config.NATIVE_CHANNEL); + config.put(Config.CHANNEL_TYPE, "NATIVE_CHANNEL"); config.put(Config.CHANNEL_SIZE, "100000"); streamingContext.withConfig(config); List text = new ArrayList<>(); diff --git a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/streamingqueue/Worker.java 
b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/streamingqueue/Worker.java index 4571f045c..ea37ca90e 100644 --- a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/streamingqueue/Worker.java +++ b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/streamingqueue/Worker.java @@ -4,6 +4,7 @@ import io.ray.api.BaseActorHandle; import io.ray.api.Ray; import io.ray.api.ActorHandle; import io.ray.runtime.functionmanager.JavaFunctionDescriptor; +import io.ray.streaming.runtime.config.StreamingWorkerConfig; import io.ray.streaming.runtime.transfer.ChannelID; import io.ray.streaming.runtime.transfer.ChannelCreationParametersBuilder; import io.ray.streaming.runtime.transfer.DataMessage; @@ -52,7 +53,7 @@ class ReaderWorker extends Worker { private String name = null; private List inputQueueList = null; - Map fromActors = new HashMap<>(); + Map inputActors = new HashMap<>(); private DataReader dataReader = null; private long handler = 0; private ActorHandle peerActor = null; @@ -87,19 +88,20 @@ class ReaderWorker extends Worker { LOGGER.info("java.library.path = {}", System.getProperty("java.library.path")); for (String queue : this.inputQueueList) { - fromActors.put(queue, this.peerActor); + inputActors.put(queue, this.peerActor); LOGGER.info("ReaderWorker actorId: {}", this.peerActor.getId()); } Map conf = new HashMap<>(); - conf.put(Config.CHANNEL_TYPE, Config.NATIVE_CHANNEL); + conf.put(Config.CHANNEL_TYPE, "NATIVE_CHANNEL"); conf.put(Config.CHANNEL_SIZE, "100000"); conf.put(Config.STREAMING_JOB_NAME, "integrationTest1"); ChannelCreationParametersBuilder.setJavaWriterFunctionDesc( new JavaFunctionDescriptor(Worker.class.getName(), "onWriterMessage", "([B)V"), new JavaFunctionDescriptor(Worker.class.getName(), "onWriterMessageSync", "([B)[B")); - dataReader = new DataReader(inputQueueList, fromActors, conf); + StreamingWorkerConfig workerConfig = new StreamingWorkerConfig(conf); + dataReader = new 
DataReader(inputQueueList, inputActors, workerConfig); // Should not GetBundle in RayCall thread Thread readThread = new Thread(Ray.wrapRunnable(new Runnable() { @@ -171,7 +173,7 @@ class WriterWorker extends Worker { private String name = null; private List outputQueueList = null; - Map toActors = new HashMap<>(); + Map outputActors = new HashMap<>(); DataWriter dataWriter = null; ActorHandle peerActor = null; int msgCount = 0; @@ -203,7 +205,7 @@ class WriterWorker extends Worker { LOGGER.info("WriterWorker init:"); for (String queue : this.outputQueueList) { - toActors.put(queue, this.peerActor); + outputActors.put(queue, this.peerActor); LOGGER.info("WriterWorker actorId: {}", this.peerActor.getId()); } @@ -219,13 +221,14 @@ class WriterWorker extends Worker { } Map conf = new HashMap<>(); - conf.put(Config.CHANNEL_TYPE, Config.NATIVE_CHANNEL); + conf.put(Config.CHANNEL_TYPE, "NATIVE_CHANNEL"); conf.put(Config.CHANNEL_SIZE, "100000"); conf.put(Config.STREAMING_JOB_NAME, "integrationTest1"); ChannelCreationParametersBuilder.setJavaReaderFunctionDesc( new JavaFunctionDescriptor(Worker.class.getName(), "onReaderMessage", "([B)V"), new JavaFunctionDescriptor(Worker.class.getName(), "onReaderMessageSync", "([B)[B")); - dataWriter = new DataWriter(this.outputQueueList, this.toActors, conf); + StreamingWorkerConfig workerConfig = new StreamingWorkerConfig(conf); + dataWriter = new DataWriter(outputQueueList, outputActors, workerConfig); Thread writerThread = new Thread(Ray.wrapRunnable(new Runnable() { @Override public void run() { diff --git a/streaming/python/runtime/graph.py b/streaming/python/runtime/graph.py index 721cc358b..b5e738e29 100644 --- a/streaming/python/runtime/graph.py +++ b/streaming/python/runtime/graph.py @@ -2,7 +2,6 @@ import enum import ray import ray.streaming.generated.remote_call_pb2 as remote_call_pb -import ray.streaming.generated.streaming_pb2 as streaming_pb import ray.streaming.operator as operator import ray.streaming.partition as 
partition from ray.streaming.generated.streaming_pb2 import Language @@ -24,32 +23,10 @@ class NodeType(enum.Enum): SINK = 2 -class ExecutionNode: - def __init__(self, node_pb): - self.node_id = node_pb.node_id - self.node_type = NodeType[streaming_pb.NodeType.Name( - node_pb.node_type)] - self.parallelism = node_pb.parallelism - if node_pb.language == Language.PYTHON: - operator_bytes = node_pb.operator # python operator descriptor - self.stream_operator = operator.load_operator(operator_bytes) - self.execution_tasks = [ - ExecutionTask(task) for task in node_pb.execution_tasks - ] - self.input_edges = [ - ExecutionEdge(edge, node_pb.language) - for edge in node_pb.input_edges - ] - self.output_edges = [ - ExecutionEdge(edge, node_pb.language) - for edge in node_pb.output_edges - ] - - class ExecutionEdge: def __init__(self, edge_pb, language): - self.src_node_id = edge_pb.src_node_id - self.target_node_id = edge_pb.target_node_id + self.source_execution_vertex_id = edge_pb.source_execution_vertex_id + self.target_execution_vertex_id = edge_pb.target_execution_vertex_id partition_bytes = edge_pb.partition # Sink node doesn't have partition function, # so we only deserialize partition_bytes when it's not None or empty @@ -57,46 +34,85 @@ class ExecutionEdge: self.partition = partition.load_partition(partition_bytes) -class ExecutionTask: - def __init__(self, task_pb): - self.task_id = task_pb.task_id - self.task_index = task_pb.task_index - self.worker_actor = ray.actor.ActorHandle.\ - _deserialization_helper(task_pb.worker_actor) +class ExecutionVertex: + def __init__(self, vertex_pb): + self.execution_vertex_id = vertex_pb.execution_vertex_id + self.execution_job_vertex_Id = vertex_pb.execution_job_vertex_Id + self.execution_job_vertex_name = vertex_pb.execution_job_vertex_name + self.execution_vertex_index = vertex_pb.execution_vertex_index + self.parallelism = vertex_pb.parallelism + if vertex_pb.language == Language.PYTHON: + operator_bytes = 
vertex_pb.operator # python operator descriptor + self.stream_operator = operator.load_operator(operator_bytes) + self.worker_actor = ray.actor.ActorHandle. \ + _deserialization_helper(vertex_pb.worker_actor) + self.container_id = vertex_pb.container_id + self.build_time = vertex_pb.build_time + self.language = vertex_pb.language + self.config = vertex_pb.config + self.resource = vertex_pb.resource -class ExecutionGraph: - def __init__(self, graph_pb: remote_call_pb.ExecutionGraph): - self._graph_pb = graph_pb - self.execution_nodes = [ - ExecutionNode(node) for node in graph_pb.execution_nodes +class ExecutionVertexContext: + def __init__(self, + vertex_context_pb: remote_call_pb.ExecutionVertexContext): + self.execution_vertex = \ + ExecutionVertex(vertex_context_pb.current_execution_vertex) + self.upstream_execution_vertices = [ + ExecutionVertex(vertex) + for vertex in vertex_context_pb.upstream_execution_vertices + ] + self.downstream_execution_vertices = [ + ExecutionVertex(vertex) + for vertex in vertex_context_pb.downstream_execution_vertices + ] + self.input_execution_edges = [ + ExecutionEdge(edge, self.execution_vertex.language) + for edge in vertex_context_pb.input_execution_edges + ] + self.output_execution_edges = [ + ExecutionEdge(edge, self.execution_vertex.language) + for edge in vertex_context_pb.output_execution_edges ] + def get_parallelism(self): + return self.execution_vertex.parallelism + + def get_upstream_parallelism(self): + if self.upstream_execution_vertices: + return self.upstream_execution_vertices[0].parallelism + return 0 + + def get_downstream_parallelism(self): + if self.downstream_execution_vertices: + return self.downstream_execution_vertices[0].parallelism + return 0 + + @property def build_time(self): - return self._graph_pb.build_time + return self.execution_vertex.build_time - def execution_nodes(self): - return self.execution_nodes + @property + def stream_operator(self): + return self.execution_vertex.stream_operator - def 
get_execution_task_by_task_id(self, task_id): - for execution_node in self.execution_nodes: - for task in execution_node.execution_tasks: - if task.task_id == task_id: - return task - raise Exception("Task %s does not exist!".format(task_id)) + @property + def config(self): + return self.execution_vertex.config - def get_execution_node_by_task_id(self, task_id): - for execution_node in self.execution_nodes: - for task in execution_node.execution_tasks: - if task.task_id == task_id: - return execution_node - raise Exception("Task %s does not exist!".format(task_id)) + def get_task_id(self): + return self.execution_vertex.execution_vertex_id - def get_task_id2_worker_by_node_id(self, node_id): - for execution_node in self.execution_nodes: - if execution_node.node_id == node_id: - task_id2_worker = {} - for task in execution_node.execution_tasks: - task_id2_worker[task.task_id] = task.worker_actor - return task_id2_worker - raise Exception("Node %s does not exist!".format(node_id)) + def get_source_actor_by_vertex_id(self, execution_vertex_id): + for vertex in self.upstream_execution_vertices: + if vertex.execution_vertex_id == execution_vertex_id: + return vertex.worker_actor + raise Exception("ExecutionVertex %s does not exist!" + .format(execution_vertex_id)) + + def get_target_actor_by_vertex_id(self, execution_vertex_id): + for vertex in self.downstream_execution_vertices: + if vertex.execution_vertex_id == execution_vertex_id: + return vertex.worker_actor + raise Exception("ExecutionVertex %s does not exist!" 
+ .format(execution_vertex_id)) diff --git a/streaming/python/runtime/task.py b/streaming/python/runtime/task.py index df878d624..713a29703 100644 --- a/streaming/python/runtime/task.py +++ b/streaming/python/runtime/task.py @@ -20,6 +20,7 @@ class StreamTask(ABC): self.task_id = task_id self.processor = processor self.worker = worker + self.config = worker.config self.reader = None # DataReader self.writers = {} # ExecutionEdge -> DataWriter self.thread = None @@ -35,18 +36,20 @@ class StreamTask(ABC): channel_conf[Config.CHANNEL_TYPE] = self.worker.config \ .get(Config.CHANNEL_TYPE, Config.NATIVE_CHANNEL) - execution_graph = self.worker.execution_graph - execution_node = self.worker.execution_node + execution_vertex_context = self.worker.execution_vertex_context + build_time = execution_vertex_context.build_time + # writers collectors = [] - for edge in execution_node.output_edges: - output_actors_map = {} - task_id2_worker = execution_graph.get_task_id2_worker_by_node_id( - edge.target_node_id) - for target_task_id, target_actor in task_id2_worker.items(): - channel_name = ChannelID.gen_id(self.task_id, target_task_id, - execution_graph.build_time()) - output_actors_map[channel_name] = target_actor + output_actors_map = {} + for edge in execution_vertex_context.output_execution_edges: + target_task_id = edge.target_execution_vertex_id + target_actor = execution_vertex_context\ + .get_target_actor_by_vertex_id(target_task_id) + channel_name = ChannelID.gen_id(self.task_id, target_task_id, + build_time) + output_actors_map[channel_name] = target_actor + if len(output_actors_map) > 0: channel_ids = list(output_actors_map.keys()) target_actors = list(output_actors_map.values()) @@ -61,13 +64,14 @@ class StreamTask(ABC): # readers input_actor_map = {} - for edge in execution_node.input_edges: - task_id2_worker = execution_graph.get_task_id2_worker_by_node_id( - edge.src_node_id) - for src_task_id, src_actor in task_id2_worker.items(): - channel_name = 
ChannelID.gen_id(src_task_id, self.task_id, - execution_graph.build_time()) - input_actor_map[channel_name] = src_actor + for edge in execution_vertex_context.input_execution_edges: + source_task_id = edge.source_execution_vertex_id + source_actor = execution_vertex_context\ + .get_source_actor_by_vertex_id(source_task_id) + channel_name = ChannelID.gen_id(source_task_id, self.task_id, + build_time) + input_actor_map[channel_name] = source_actor + if len(input_actor_map) > 0: channel_ids = list(input_actor_map.keys()) from_actors = list(input_actor_map.values()) @@ -85,8 +89,9 @@ class StreamTask(ABC): # TODO(chaokunyang) add task/job config runtime_context = RuntimeContextImpl( - self.worker.execution_task.task_id, - self.worker.execution_task.task_index, execution_node.parallelism) + self.worker.task_id, + execution_vertex_context.execution_vertex.execution_vertex_index, + execution_vertex_context.get_parallelism()) logger.info("open Processor {}".format(self.processor)) self.processor.open(collectors, runtime_context) diff --git a/streaming/python/runtime/worker.py b/streaming/python/runtime/worker.py index 86e88a0f7..0fdb56096 100644 --- a/streaming/python/runtime/worker.py +++ b/streaming/python/runtime/worker.py @@ -5,7 +5,7 @@ import ray.streaming._streaming as _streaming import ray.streaming.generated.remote_call_pb2 as remote_call_pb import ray.streaming.runtime.processor as processor from ray.streaming.config import Config -from ray.streaming.runtime.graph import ExecutionGraph +from ray.streaming.runtime.graph import ExecutionVertexContext from ray.streaming.runtime.task import SourceStreamTask, OneInputStreamTask logger = logging.getLogger(__name__) @@ -21,43 +21,50 @@ class JobWorker(object): def __init__(self): self.worker_context = None - self.task_id = None + self.execution_vertex_context = None self.config = None - self.execution_graph = None - self.execution_task = None - self.execution_node = None - self.stream_processor = None + self.task_id = 
None self.task = None + self.stream_processor = None self.reader_client = None self.writer_client = None + logger.info("Creating job worker succeeded.") def init(self, worker_context_bytes): - worker_context = remote_call_pb.WorkerContext() + worker_context = remote_call_pb.PythonJobWorkerContext() worker_context.ParseFromString(worker_context_bytes) self.worker_context = worker_context - self.task_id = worker_context.task_id - self.config = worker_context.conf - execution_graph = ExecutionGraph(worker_context.graph) - self.execution_graph = execution_graph - self.execution_task = self.execution_graph. \ - get_execution_task_by_task_id(self.task_id) - self.execution_node = self.execution_graph. \ - get_execution_node_by_task_id(self.task_id) - operator = self.execution_node.stream_operator + + # build vertex context from pb + self.execution_vertex_context = ExecutionVertexContext( + worker_context.execution_vertex_context) + + # use vertex id as task id + self.task_id = self.execution_vertex_context.get_task_id() + + # build and get processor from operator + operator = self.execution_vertex_context.stream_operator self.stream_processor = processor.build_processor(operator) logger.info( - "Initializing JobWorker, task_id: {}, operator: {}.".format( + "Initializing job worker, task_id: {}, operator: {}.".format( self.task_id, self.stream_processor)) + # get config from vertex + self.config = self.execution_vertex_context.config + if self.config.get(Config.CHANNEL_TYPE, Config.NATIVE_CHANNEL): self.reader_client = _streaming.ReaderClient() self.writer_client = _streaming.WriterClient() self.task = self.create_stream_task() - self.task.start() - logger.info("JobWorker init succeed") + + logger.info("Job worker init succeeded.") return True + def start(self): + self.task.start() + logger.info("Job worker start succeeded.") + def create_stream_task(self): if isinstance(self.stream_processor, processor.SourceProcessor): return SourceStreamTask(self.task_id, 
self.stream_processor, self) diff --git a/streaming/python/tests/test_word_count.py b/streaming/python/tests/test_word_count.py index 03d9d7652..288f7acc0 100644 --- a/streaming/python/tests/test_word_count.py +++ b/streaming/python/tests/test_word_count.py @@ -56,5 +56,5 @@ def test_simple_word_count(): if __name__ == "__main__": - # test_word_count() + test_word_count() test_simple_word_count() diff --git a/streaming/src/protobuf/remote_call.proto b/streaming/src/protobuf/remote_call.proto index b331d3928..a55ca942f 100644 --- a/streaming/src/protobuf/remote_call.proto +++ b/streaming/src/protobuf/remote_call.proto @@ -6,54 +6,52 @@ import "streaming/src/protobuf/streaming.proto"; option java_package = "io.ray.streaming.runtime.generated"; -// Streaming execution graph -message ExecutionGraph { - // A parallel operation consisting of multiple execution tasks - message ExecutionNode { - int32 node_id = 1; - int32 parallelism = 2; - NodeType node_type = 3; - Language language = 4; - // serialized operator - bytes operator = 5; - repeated ExecutionTask execution_tasks = 6; - repeated ExecutionEdge input_edges = 7; - repeated ExecutionEdge output_edges = 8; - } - - // execution edge +// Execution vertex info, including it's upstream and downstream +message ExecutionVertexContext { + // An edge between 2 execution vertices message ExecutionEdge { - // upstream execution node id - int32 src_node_id = 1; - // downstream execution node id - int32 target_node_id = 2; - // serialized partition between src/target node + // upstream execution vertex id + int32 source_execution_vertex_id = 1; + // downstream execution vertex id + int32 target_execution_vertex_id = 2; + // serialized partition between source/target vertex bytes partition = 3; } - // a parallel subtask of the execution - message ExecutionTask { - // unique execution task id - int32 task_id = 1; - // an ordered task index range from 0 to parallelism - 1 - int32 task_index = 2; - // serialized actor handle - 
bytes worker_actor = 3; + message ExecutionVertex { + // unique id of execution vertex + int32 execution_vertex_id = 1; + // unique id of execution job vertex + int32 execution_job_vertex_Id = 2; + // name of execution job vertex, e.g. 1-SourceOperator + string execution_job_vertex_name = 3; + // index of execution vertex + int32 execution_vertex_index = 4; + int32 parallelism = 5; + // serialized operator + bytes operator = 6; + bytes worker_actor = 7; + string container_id = 8; + uint64 build_time = 9; + Language language = 10; + map config = 11; + map resource = 12; } - // graph build time - uint64 build_time = 1; - repeated ExecutionNode execution_nodes = 2; + // vertices + ExecutionVertex current_execution_vertex = 1; + repeated ExecutionVertex upstream_execution_vertices = 2; + repeated ExecutionVertex downstream_execution_vertices = 3; + + // edges + repeated ExecutionEdge input_execution_edges = 4; + repeated ExecutionEdge output_execution_edges = 5; } -// Streaming worker context -message WorkerContext { - // job name - string job_name = 1; - // unique execution task id - int32 task_id = 2; - // job config - map conf = 3; - // execution graph - ExecutionGraph graph = 4; +// Streaming python worker context +message PythonJobWorkerContext { + // serialized master actor handle + bytes master_actor = 1; + // vertex including it's upstream and downstream + ExecutionVertexContext execution_vertex_context = 2; }