From 0204dff1e9cefc4fabd1edbb14610c230466a9bf Mon Sep 17 00:00:00 2001 From: Tianyi Chen Date: Wed, 22 Apr 2020 14:43:56 +0800 Subject: [PATCH] [streaming]Add master and scheduler. (#8044) --- streaming/java/BUILD.bazel | 4 - .../ray/streaming/runtime/config/Config.java | 5 +- .../runtime/config/StreamingMasterConfig.java | 3 + .../config/master/SchedulerConfig.java | 33 +++ .../graph/executiongraph/ExecutionGraph.java | 66 ++++++ .../graph/executiongraph/ExecutionVertex.java | 6 + .../streaming/runtime/master/JobMaster.java | 124 +++++++++++ .../master/scheduler/JobScheduler.java | 16 ++ .../master/scheduler/JobSchedulerImpl.java | 201 ++++++++++++++++++ .../controller/WorkerLifecycleController.java | 184 ++++++++++++++++ .../runtime/rpc/RemoteCallWorker.java | 69 ++++++ .../ray/streaming/runtime/util/RayUtils.java | 27 +++ .../worker/context/JobWorkerContext.java | 59 +++++ .../{ => core}/graph/ExecutionGraphTest.java | 2 +- .../runtime/master/JobMasterTest.java | 20 ++ .../master/jobscheduler/JobSchedulerTest.java | 12 ++ .../resourcemanager/ResourceManagerTest.java | 42 +--- .../strategy/PipelineFirstStrategyTest.java | 16 +- 18 files changed, 837 insertions(+), 52 deletions(-) create mode 100644 streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/master/SchedulerConfig.java create mode 100644 streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/JobMaster.java create mode 100644 streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobScheduler.java create mode 100644 streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java create mode 100644 streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/controller/WorkerLifecycleController.java create mode 100644 streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/rpc/RemoteCallWorker.java create mode 100644 streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/context/JobWorkerContext.java rename streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/{ => core}/graph/ExecutionGraphTest.java (99%) create mode 100644 streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/JobMasterTest.java create mode 100644 streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/jobscheduler/JobSchedulerTest.java rename streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/{ => master}/resourcemanager/ResourceManagerTest.java (59%) rename streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/{schedule => master/resourcemanager}/strategy/PipelineFirstStrategyTest.java (86%) diff --git a/streaming/java/BUILD.bazel b/streaming/java/BUILD.bazel index f993aaa3f..98aaacf6f 100644 --- a/streaming/java/BUILD.bazel +++ b/streaming/java/BUILD.bazel @@ -113,7 +113,6 @@ define_java_module( "@ray_streaming_maven//:org_testng_testng", "@ray_streaming_maven//:org_mockito_mockito_all", "@ray_streaming_maven//:org_powermock_powermock_api_mockito", - "@ray_streaming_maven//:org_powermock_powermock_core", "@ray_streaming_maven//:org_powermock_powermock_module_testng", "@ray_streaming_maven//:org_projectlombok_lombok", ], @@ -127,10 +126,7 @@ define_java_module( "@ray_streaming_maven//:com_google_protobuf_protobuf_java", "@ray_streaming_maven//:de_ruedigermoeller_fst", "@ray_streaming_maven//:org_aeonbits_owner_owner", - "@ray_streaming_maven//:org_mockito_mockito_all", "@ray_streaming_maven//:org_msgpack_msgpack_core", - "@ray_streaming_maven//:org_powermock_powermock_api_mockito", - "@ray_streaming_maven//:org_powermock_powermock_module_testng", "@ray_streaming_maven//:org_projectlombok_lombok", "@ray_streaming_maven//:org_slf4j_slf4j_api", "@ray_streaming_maven//:org_slf4j_slf4j_log4j12", diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/Config.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/Config.java index 7b0a1dc12..06ea3c754 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/Config.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/Config.java @@ -1,11 +1,10 @@ package io.ray.streaming.runtime.config; -import java.io.Serializable; -import javax.accessibility.Accessible; +import org.aeonbits.owner.Accessible; /** * Basic config interface. */ -public interface Config extends org.aeonbits.owner.Config, Accessible, Serializable { +public interface Config extends org.aeonbits.owner.Config, Accessible { } diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/StreamingMasterConfig.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/StreamingMasterConfig.java index 36e5a0c38..38f91d80c 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/StreamingMasterConfig.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/StreamingMasterConfig.java @@ -1,6 +1,7 @@ package io.ray.streaming.runtime.config; import io.ray.streaming.runtime.config.master.ResourceConfig; +import io.ray.streaming.runtime.config.master.SchedulerConfig; import java.util.Map; import org.aeonbits.owner.ConfigFactory; import org.slf4j.Logger; @@ -14,9 +15,11 @@ public class StreamingMasterConfig extends StreamingGlobalConfig { private static final Logger LOG = LoggerFactory.getLogger(StreamingMasterConfig.class); public ResourceConfig resourceConfig; + public SchedulerConfig schedulerConfig; public StreamingMasterConfig(final Map conf) { super(conf); this.resourceConfig = ConfigFactory.create(ResourceConfig.class, conf); + this.schedulerConfig = ConfigFactory.create(SchedulerConfig.class, conf); } } diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/master/SchedulerConfig.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/master/SchedulerConfig.java new file mode 100644 index 000000000..a486f5cc8 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/master/SchedulerConfig.java @@ -0,0 +1,33 @@ +package io.ray.streaming.runtime.config.master; + +import io.ray.streaming.runtime.config.Config; + +/** + * Configuration for job scheduler. + */ +public interface SchedulerConfig extends Config { + + String WORKER_INITIATION_WAIT_TIMEOUT_MS = "streaming.scheduler.worker.initiation.timeout.ms"; + String WORKER_STARTING_WAIT_TIMEOUT_MS = "streaming.scheduler.worker.starting.timeout.ms"; + + /** + * The timeout ms of worker initiation. + * Default is: 10000ms(10s). + * + * @return timeout ms + */ + @Key(WORKER_INITIATION_WAIT_TIMEOUT_MS) + @DefaultValue(value = "10000") + int workerInitiationWaitTimeoutMs(); + + /** + * The timeout ms of worker starting. + * Default is: 10000ms(10s). + * + * @return timeout ms + */ + @Key(WORKER_STARTING_WAIT_TIMEOUT_MS) + @DefaultValue(value = "10000") + int workerStartingWaitTimeoutMs(); + +} diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionGraph.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionGraph.java index 9d1f1bb3e..652934d81 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionGraph.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionGraph.java @@ -1,5 +1,7 @@ package io.ray.streaming.runtime.core.graph.executiongraph; +import io.ray.api.RayActor; +import io.ray.streaming.runtime.worker.JobWorker; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -136,4 +138,68 @@ public class ExecutionGraph implements Serializable { throw new RuntimeException("Vertex " + vertexId + " does not exist!"); } + /** + * Get all actors by graph. + * + * @return actor list + */ + public List> getAllActors() { + return getActorsFromJobVertices(getExecutionJobVertexList()); + } + + /** + * Get source actors by graph. + * + * @return actor list + */ + public List> getSourceActors() { + List executionJobVertices = getExecutionJobVertexList().stream() + .filter(ExecutionJobVertex::isSourceVertex) + .collect(Collectors.toList()); + + return getActorsFromJobVertices(executionJobVertices); + } + + /** + * Get transformation and sink actors by graph. + * + * @return actor list + */ + public List> getNonSourceActors() { + List executionJobVertices = getExecutionJobVertexList().stream() + .filter(executionJobVertex -> executionJobVertex.isTransformationVertex() + || executionJobVertex.isSinkVertex()) + .collect(Collectors.toList()); + + return getActorsFromJobVertices(executionJobVertices); + } + + /** + * Get sink actors by graph. + * + * @return actor list + */ + public List> getSinkActors() { + List executionJobVertices = getExecutionJobVertexList().stream() + .filter(ExecutionJobVertex::isSinkVertex) + .collect(Collectors.toList()); + + return getActorsFromJobVertices(executionJobVertices); + } + + /** + * Get actors according to job vertices. + * + * @param executionJobVertices specified job vertices + * @return actor list + */ + public List> getActorsFromJobVertices( + List executionJobVertices) { + return executionJobVertices.stream() + .map(ExecutionJobVertex::getExecutionVertices) + .flatMap(Collection::stream) + .map(ExecutionVertex::getWorkerActor) + .collect(Collectors.toList()); + } + } diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionVertex.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionVertex.java index 1c7768972..21616399b 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionVertex.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionVertex.java @@ -15,6 +15,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; /** * Physical vertex, correspond to {@link ExecutionJobVertex}. @@ -190,6 +191,11 @@ public class ExecutionVertex implements Serializable { return false; } + @Override + public int hashCode() { + return Objects.hash(id, outputEdges); + } + @Override public String toString() { return MoreObjects.toStringHelper(this) diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/JobMaster.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/JobMaster.java new file mode 100644 index 000000000..7d3924493 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/JobMaster.java @@ -0,0 +1,124 @@ +package io.ray.streaming.runtime.master; + +import com.google.common.base.Preconditions; +import io.ray.api.RayActor; +import io.ray.streaming.jobgraph.JobGraph; +import io.ray.streaming.runtime.config.StreamingConfig; +import io.ray.streaming.runtime.config.StreamingMasterConfig; +import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph; +import io.ray.streaming.runtime.master.graphmanager.GraphManager; +import io.ray.streaming.runtime.master.graphmanager.GraphManagerImpl; +import io.ray.streaming.runtime.master.resourcemanager.ResourceManager; +import io.ray.streaming.runtime.master.resourcemanager.ResourceManagerImpl; +import io.ray.streaming.runtime.master.scheduler.JobSchedulerImpl; +import io.ray.streaming.runtime.worker.JobWorker; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * JobMaster is the core controller in streaming job as a ray actor. It is responsible for all the + * controls facing the {@link JobWorker}. + */ +public class JobMaster { + + private static final Logger LOG = LoggerFactory.getLogger(JobMaster.class); + + private JobRuntimeContext runtimeContext; + private ResourceManager resourceManager; + private JobSchedulerImpl scheduler; + private GraphManager graphManager; + private StreamingMasterConfig conf; + + private RayActor jobMasterActor; + + public JobMaster(Map confMap) { + LOG.info("Creating job master with conf: {}.", confMap); + + StreamingConfig streamingConfig = new StreamingConfig(confMap); + this.conf = streamingConfig.masterConfig; + + // init runtime context + runtimeContext = new JobRuntimeContext(streamingConfig); + + LOG.info("Finished creating job master."); + } + + /** + * Init JobMaster. To initiate or recover other components(like metrics and extra coordinators). + * + * @return init result + */ + public Boolean init() { + LOG.info("Initializing job master."); + + if (this.runtimeContext.getExecutionGraph() == null) { + LOG.error("Init job master failed. Job graphs is null."); + return false; + } + + ExecutionGraph executionGraph = graphManager.getExecutionGraph(); + Preconditions.checkArgument(executionGraph != null, "no execution graph"); + + LOG.info("Finished initializing job master."); + return true; + } + + /** + * Submit job to run: + *
    + *
  1. Using GraphManager to build physical plan according to the logical plan.
  2. + *
  3. Using ResourceManager to manage and allocate the resources.
  4. + *
  5. Using JobScheduler to schedule the job to run.
  6. + *
+ * + * @param jobMasterActor JobMaster actor + * @param jobGraph logical plan + * @return submit result + */ + public boolean submitJob(RayActor jobMasterActor, JobGraph jobGraph) { + LOG.info("Begin submitting job using logical plan: {}.", jobGraph); + + this.jobMasterActor = jobMasterActor; + + // init manager + graphManager = new GraphManagerImpl(runtimeContext); + resourceManager = new ResourceManagerImpl(runtimeContext); + + // build and set graph into runtime context + ExecutionGraph executionGraph = graphManager.buildExecutionGraph(jobGraph); + runtimeContext.setJobGraph(jobGraph); + runtimeContext.setExecutionGraph(executionGraph); + + // init scheduler + try { + scheduler = new JobSchedulerImpl(this); + scheduler.scheduleJob(graphManager.getExecutionGraph()); + } catch (Exception e) { + LOG.error("Failed to submit job.", e); + return false; + } + return true; + } + + public RayActor getJobMasterActor() { + return jobMasterActor; + } + + public JobRuntimeContext getRuntimeContext() { + return runtimeContext; + } + + public ResourceManager getResourceManager() { + return resourceManager; + } + + public GraphManager getGraphManager() { + return graphManager; + } + + public StreamingMasterConfig getConf() { + return conf; + } + +} diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobScheduler.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobScheduler.java new file mode 100644 index 000000000..919de1776 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobScheduler.java @@ -0,0 +1,16 @@ +package io.ray.streaming.runtime.master.scheduler; + +import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph; + +/** + * Job scheduler is used to do the scheduling in JobMaster. + */ +public interface JobScheduler { + + /** + * Schedule streaming job using the physical plan. + * @param executionGraph physical plan + * @return scheduling result + */ + boolean scheduleJob(ExecutionGraph executionGraph); +} diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java new file mode 100644 index 000000000..519f58979 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java @@ -0,0 +1,201 @@ +package io.ray.streaming.runtime.master.scheduler; + +import io.ray.api.RayActor; +import io.ray.streaming.runtime.config.StreamingConfig; +import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph; +import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex; +import io.ray.streaming.runtime.core.resource.Container; +import io.ray.streaming.runtime.master.JobMaster; +import io.ray.streaming.runtime.master.graphmanager.GraphManager; +import io.ray.streaming.runtime.master.resourcemanager.ResourceManager; +import io.ray.streaming.runtime.master.resourcemanager.ViewBuilder; +import io.ray.streaming.runtime.master.scheduler.controller.WorkerLifecycleController; +import io.ray.streaming.runtime.worker.context.JobWorkerContext; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Job scheduler implementation. + */ +public class JobSchedulerImpl implements JobScheduler { + + private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerImpl.class); + + private StreamingConfig jobConf; + + private final JobMaster jobMaster; + private final ResourceManager resourceManager; + private final GraphManager graphManager; + private final WorkerLifecycleController workerLifecycleController; + + public JobSchedulerImpl(JobMaster jobMaster) { + this.jobMaster = jobMaster; + this.graphManager = jobMaster.getGraphManager(); + this.resourceManager = jobMaster.getResourceManager(); + this.workerLifecycleController = new WorkerLifecycleController(); + this.jobConf = jobMaster.getRuntimeContext().getConf(); + + LOG.info("Scheduler initiated."); + } + + @Override + public boolean scheduleJob(ExecutionGraph executionGraph) { + LOG.info("Begin scheduling. Job: {}.", executionGraph.getJobName()); + + // Allocate resource then create workers + prepareResourceAndCreateWorker(executionGraph); + + // init worker context and start to run + initAndStart(executionGraph); + + return true; + } + + /** + * Allocating job worker resource then create job worker actor + * + * @param executionGraph + */ + protected void prepareResourceAndCreateWorker(ExecutionGraph executionGraph) { + List containers = resourceManager.getRegisteredContainers(); + + // Assign resource for execution vertices + resourceManager.assignResource(containers, executionGraph); + + LOG.info("Allocating map is: {}.", ViewBuilder.buildResourceAssignmentView(containers)); + + // Start all new added workers + createWorkers(executionGraph); + } + + /** + * Init JobMaster and JobWorkers then start JobWorkers. + * + * @param executionGraph physical plan + */ + private void initAndStart(ExecutionGraph executionGraph) { + // generate vertex - context map + Map vertexToContextMap = buildWorkersContext(executionGraph); + + // init workers + initWorkers(vertexToContextMap); + + // init master + initMaster(); + + // start workers + startWorkers(executionGraph); + } + + /** + * Create JobWorker actors according to the physical plan. + * + * @param executionGraph physical plan + * @return actor creation result + */ + public boolean createWorkers(ExecutionGraph executionGraph) { + LOG.info("Begin creating workers."); + long startTs = System.currentTimeMillis(); + + // create JobWorker actors + boolean createResult = workerLifecycleController + .createWorkers(executionGraph.getAllAddedExecutionVertices()); + + if (createResult) { + LOG.info("Finished creating workers. Cost {} ms.", System.currentTimeMillis() - startTs); + return true; + } else { + LOG.error("Failed to create workers. Cost {} ms.", System.currentTimeMillis() - startTs); + return false; + } + } + + /** + * Init JobWorkers according to the vertex and context infos. + * + * @param vertexToContextMap vertex - context map + */ + protected boolean initWorkers(Map vertexToContextMap) { + boolean result; + try { + result = workerLifecycleController.initWorkers(vertexToContextMap, + jobConf.masterConfig.schedulerConfig.workerInitiationWaitTimeoutMs()); + } catch (Exception e) { + LOG.error("Failed to initiate workers.", e); + return false; + } + return result; + } + + /** + * Start JobWorkers according to the physical plan. + */ + public boolean startWorkers(ExecutionGraph executionGraph) { + boolean result; + try { + result = workerLifecycleController.startWorkers( + executionGraph, jobConf.masterConfig.schedulerConfig.workerStartingWaitTimeoutMs()); + } catch (Exception e) { + LOG.error("Failed to start workers.", e); + return false; + } + return result; + } + + /** + * Build workers context. + * + * @param executionGraph execution graph + * @return vertex to worker context map + */ + protected Map buildWorkersContext( + ExecutionGraph executionGraph) { + RayActor masterActor = jobMaster.getJobMasterActor(); + + // build workers' context + Map needRegistryVertexToContextMap = new HashMap<>(); + executionGraph.getAllExecutionVertices().forEach(vertex -> { + JobWorkerContext ctx = buildJobWorkerContext(vertex, masterActor); + needRegistryVertexToContextMap.put(vertex, ctx); + }); + return needRegistryVertexToContextMap; + } + + private JobWorkerContext buildJobWorkerContext( + ExecutionVertex executionVertex, + RayActor masterActor) { + + // create worker context + JobWorkerContext ctx = new JobWorkerContext( + executionVertex.getWorkerActorId(), + masterActor, + executionVertex + ); + + return ctx; + } + + /** + * Destroy JobWorkers according to the vertex infos. + * + * @param executionVertices specified vertices + */ + public boolean destroyWorkers(List executionVertices) { + boolean result; + try { + result = workerLifecycleController.destroyWorkers(executionVertices); + } catch (Exception e) { + LOG.error("Failed to destroy workers.", e); + return false; + } + return result; + } + + private void initMaster() { + jobMaster.init(); + } + +} diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/controller/WorkerLifecycleController.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/controller/WorkerLifecycleController.java new file mode 100644 index 000000000..39aff6c36 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/controller/WorkerLifecycleController.java @@ -0,0 +1,184 @@ +package io.ray.streaming.runtime.master.scheduler.controller; + +import io.ray.api.Ray; +import io.ray.api.RayActor; +import io.ray.api.RayObject; +import io.ray.api.WaitResult; +import io.ray.api.id.ActorId; +import io.ray.api.options.ActorCreationOptions; +import io.ray.streaming.api.Language; +import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph; +import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex; +import io.ray.streaming.runtime.rpc.RemoteCallWorker; +import io.ray.streaming.runtime.worker.JobWorker; +import io.ray.streaming.runtime.worker.context.JobWorkerContext; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Worker lifecycle controller is used to control JobWorker's creation, initiation and so on. + */ +public class WorkerLifecycleController { + + private static final Logger LOG = LoggerFactory.getLogger(WorkerLifecycleController.class); + + public boolean createWorkers(List executionVertices) { + return asyncBatchExecute(this::createWorker, executionVertices); + } + + /** + * Create JobWorker actor according to the execution vertex. + * + * @param executionVertex target execution vertex + * @return creation result + */ + private boolean createWorker(ExecutionVertex executionVertex) { + LOG.info("Start to create worker actor for vertex: {} with resource: {}.", + executionVertex.getVertexName(), executionVertex.getResources()); + + Language language = executionVertex.getLanguage(); + + ActorCreationOptions options = new ActorCreationOptions.Builder() + .setResources(executionVertex.getResources()) + .setMaxReconstructions(ActorCreationOptions.INFINITE_RECONSTRUCTION) + .createActorCreationOptions(); + + RayActor actor = null; + // TODO (datayjz): ray create actor + + if (null == actor) { + LOG.error("Create worker actor failed."); + return false; + } + + executionVertex.setWorkerActor(actor); + + LOG.info("Worker actor created, actor: {}, vertex: {}.", + executionVertex.getWorkerActorId(), executionVertex.getVertexName()); + return true; + } + + /** + * Using context to init JobWorker. + * + * @param vertexToContextMap target JobWorker actor + * @param timeout timeout for waiting, unit: ms + * @return initiation result + */ + public boolean initWorkers( + Map vertexToContextMap, int timeout) { + LOG.info("Begin initiating workers: {}.", vertexToContextMap); + long startTime = System.currentTimeMillis(); + + Map, ActorId> rayObjects = new HashMap<>(); + vertexToContextMap.entrySet().forEach((entry -> { + ExecutionVertex vertex = entry.getKey(); + rayObjects.put(RemoteCallWorker.initWorker(vertex.getWorkerActor(), entry.getValue()), + vertex.getWorkerActorId()); + })); + + List> rayObjectList = new ArrayList<>(rayObjects.keySet()); + + LOG.info("Waiting for workers' initialization."); + WaitResult result = Ray.wait(rayObjectList, rayObjectList.size(), timeout); + if (result.getReady().size() != rayObjectList.size()) { + LOG.error("Initializing workers timeout[{} ms].", timeout); + return false; + } + + LOG.info("Finished waiting workers' initialization."); + LOG.info("Workers initialized. Cost {} ms.", System.currentTimeMillis() - startTime); + return true; + } + + /** + * Start JobWorkers to run task. + * + * @param executionGraph physical plan + * @param timeout timeout for waiting, unit: ms + * @return starting result + */ + public boolean startWorkers(ExecutionGraph executionGraph, int timeout) { + LOG.info("Begin starting workers."); + long startTime = System.currentTimeMillis(); + List> rayObjects = new ArrayList<>(); + + // start source actors 1st + executionGraph.getSourceActors() + .forEach(actor -> rayObjects.add(RemoteCallWorker.startWorker(actor))); + + // then start non-source actors + executionGraph.getNonSourceActors() + .forEach(actor -> rayObjects.add(RemoteCallWorker.startWorker(actor))); + + WaitResult result = Ray.wait(rayObjects, rayObjects.size(), timeout); + if (result.getReady().size() != rayObjects.size()) { + LOG.error("Starting workers timeout[{} ms].", timeout); + return false; + } + + LOG.info("Workers started. Cost {} ms.", System.currentTimeMillis() - startTime); + return true; + } + + /** + * Stop and destroy JobWorkers' actor. + * + * @param executionVertices target vertices + * @return destroy result + */ + public boolean destroyWorkers(List executionVertices) { + return asyncBatchExecute(this::destroyWorker, executionVertices); + } + + private boolean destroyWorker(ExecutionVertex executionVertex) { + RayActor rayActor = executionVertex.getWorkerActor(); + LOG.info("Begin destroying worker[vertex={}, actor={}].", + executionVertex.getVertexName(), rayActor.getId()); + + boolean destroyResult = RemoteCallWorker.shutdownWithoutReconstruction(rayActor); + + if (!destroyResult) { + LOG.error("Failed to destroy JobWorker[{}]'s actor: {}.", + executionVertex.getVertexName(), rayActor); + return false; + } + + LOG.info("Worker destroyed, actor: {}.", rayActor); + return true; + } + + /** + * Async batch execute function, for some cases that could not use Ray.wait + * + * @param operation the function to be executed + */ + private boolean asyncBatchExecute( + Function operation, + List executionVertices) { + final Object asyncContext = Ray.getAsyncContext(); + + List> futureResults = executionVertices.stream() + .map(vertex -> CompletableFuture.supplyAsync(() -> { + Ray.setAsyncContext(asyncContext); + return operation.apply(vertex); + })).collect(Collectors.toList()); + + List succeeded = futureResults.stream().map(CompletableFuture::join) + .collect(Collectors.toList()); + + if (succeeded.stream().anyMatch(x -> !x)) { + LOG.error("Not all futures return true, check ResourceManager'log the detail."); + return false; + } + return true; + } + +} diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/rpc/RemoteCallWorker.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/rpc/RemoteCallWorker.java new file mode 100644 index 000000000..de02ad057 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/rpc/RemoteCallWorker.java @@ -0,0 +1,69 @@ +package io.ray.streaming.runtime.rpc; + +import io.ray.api.RayActor; +import io.ray.api.RayObject; +import io.ray.streaming.runtime.master.JobMaster; +import io.ray.streaming.runtime.worker.JobWorker; +import io.ray.streaming.runtime.worker.context.JobWorkerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Ray call worker. + * It takes the communication job from {@link JobMaster} to {@link JobWorker}. + */ +public class RemoteCallWorker { + + private static final Logger LOG = LoggerFactory.getLogger(RemoteCallWorker.class); + + /** + * Call JobWorker actor to init. + * + * @param actor target JobWorker actor + * @param ctx JobWorker's context + * @return init result + */ + public static RayObject initWorker(RayActor actor, JobWorkerContext ctx) { + LOG.info("Call worker to init, actor: {}, context: {}.", actor.getId(), ctx); + RayObject result = null; + + // TODO (datayjz): ray call worker to initiate + + LOG.info("Finished calling worker to init."); + return result; + } + + /** + * Call JobWorker actor to start. + * + * @param actor target JobWorker actor + * @return start result + */ + public static RayObject startWorker(RayActor actor) { + LOG.info("Call worker to start, actor: {}.", actor.getId()); + RayObject result = null; + + // TODO (datayjz): ray call worker to start + + LOG.info("Finished calling worker to start."); + return result; + } + + /** + * Call JobWorker actor to destroy without reconstruction. + * + * @param actor target JobWorker actor + * @return destroy result + */ + public static Boolean shutdownWithoutReconstruction(RayActor actor) { + LOG.info("Call worker to shutdown without reconstruction, actor is {}.", + actor.getId()); + Boolean result = false; + + // TODO (datayjz): ray call worker to destroy + + LOG.info("Finished calling wk shutdownWithoutReconstruction, result is {}.", result); + return result; + } + +} diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/RayUtils.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/RayUtils.java index 5f688eabb..9f1c3ec1e 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/RayUtils.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/RayUtils.java @@ -3,6 +3,8 @@ package io.ray.streaming.runtime.util; import io.ray.api.Ray; import io.ray.api.id.UniqueId; import io.ray.api.runtimecontext.NodeInfo; +import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -18,6 +20,10 @@ public class RayUtils { * @return node info list */ public static List getAllNodeInfo() { + if (Ray.getRuntimeContext().isSingleProcess()) { + // only for single process(for unit test) + return mockContainerResources(); + } return Ray.getRuntimeContext().getAllNodeInfo(); } @@ -31,4 +37,25 @@ public class RayUtils { .filter(nodeInfo -> nodeInfo.isAlive) .collect(Collectors.toMap(nodeInfo -> nodeInfo.nodeId, nodeInfo -> nodeInfo)); } + + private static List mockContainerResources() { + List nodeInfos = new LinkedList<>(); + + for (int i = 1; i <= 5; i++) { + Map resources = new HashMap<>(); + resources.put("CPU", (double) i); + resources.put("MEM", 16.0); + + byte[] nodeIdBytes = new byte[UniqueId.LENGTH]; + for (int byteIndex = 0; byteIndex < UniqueId.LENGTH; ++byteIndex) { + nodeIdBytes[byteIndex] = String.valueOf(i).getBytes()[0]; + } + NodeInfo nodeInfo = new NodeInfo(new UniqueId(nodeIdBytes), + "localhost" + i, "localhost" + i, + true, resources); + nodeInfos.add(nodeInfo); + } + return nodeInfos; + } + } diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/context/JobWorkerContext.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/context/JobWorkerContext.java new file mode 100644 index 000000000..c046a9b53 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/context/JobWorkerContext.java @@ -0,0 +1,59 @@ +package io.ray.streaming.runtime.worker.context; + +import com.google.common.base.MoreObjects; +import io.ray.api.RayActor; +import io.ray.api.id.ActorId; +import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex; +import io.ray.streaming.runtime.master.JobMaster; +import java.io.Serializable; + +/** + * Job worker context. + */ +public class JobWorkerContext implements Serializable { + + /** + * Worker actor's id. + */ + private ActorId workerId; + + /** + * JobMaster actor. + */ + private RayActor master; + + /** + * Worker's vertex info. + */ + private ExecutionVertex executionVertex; + + public JobWorkerContext( + ActorId workerId, + RayActor master, + ExecutionVertex executionVertex) { + this.workerId = workerId; + this.master = master; + this.executionVertex = executionVertex; + } + + public ActorId getWorkerId() { + return workerId; + } + + public RayActor getMaster() { + return master; + } + + public ExecutionVertex getExecutionVertex() { + return executionVertex; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("workerId", workerId) + .add("master", master) + .toString(); + } + +} diff --git a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/graph/ExecutionGraphTest.java b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/core/graph/ExecutionGraphTest.java similarity index 99% rename from streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/graph/ExecutionGraphTest.java rename to streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/core/graph/ExecutionGraphTest.java index c046bb177..920bc1f74 100644 --- a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/graph/ExecutionGraphTest.java +++ b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/core/graph/ExecutionGraphTest.java @@ -1,4 +1,4 @@ -package io.ray.streaming.runtime.graph; +package io.ray.streaming.runtime.core.graph; import com.google.common.collect.Lists; import io.ray.streaming.api.context.StreamingContext; diff --git a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/JobMasterTest.java b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/JobMasterTest.java new file mode 100644 index 000000000..53f3cc4d1 --- /dev/null +++ b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/JobMasterTest.java @@ -0,0 +1,20 @@ +package io.ray.streaming.runtime.master; + +import java.util.HashMap; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class JobMasterTest { + + @Test + public void testCreation() { + JobMaster jobMaster = new JobMaster(new HashMap<>()); + Assert.assertNotNull(jobMaster.getRuntimeContext()); + Assert.assertNotNull(jobMaster.getConf()); + Assert.assertNull(jobMaster.getGraphManager()); + Assert.assertNull(jobMaster.getResourceManager()); + Assert.assertNull(jobMaster.getJobMasterActor()); + Assert.assertFalse(jobMaster.init()); + } + +} diff --git a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/jobscheduler/JobSchedulerTest.java b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/jobscheduler/JobSchedulerTest.java new file mode 100644 index 000000000..4f12abe0e --- /dev/null +++ b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/jobscheduler/JobSchedulerTest.java @@ -0,0 +1,12 @@ +package io.ray.streaming.runtime.master.jobscheduler; + +import org.testng.annotations.Test; + +public class JobSchedulerTest { + + @Test + public void testSchedule() { + // TODO (tianyi): need JobWorker Part to do this. + } + +} diff --git a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/resourcemanager/ResourceManagerTest.java b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/resourcemanager/ResourceManagerTest.java similarity index 59% rename from streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/resourcemanager/ResourceManagerTest.java rename to streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/resourcemanager/ResourceManagerTest.java index c751c413f..579b1266a 100644 --- a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/resourcemanager/ResourceManagerTest.java +++ b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/resourcemanager/ResourceManagerTest.java @@ -1,60 +1,34 @@ -package io.ray.streaming.runtime.resourcemanager; +package io.ray.streaming.runtime.master.resourcemanager; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.mockito.MockitoAnnotations; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; import io.ray.api.Ray; import io.ray.api.id.UniqueId; import io.ray.api.runtimecontext.NodeInfo; +import io.ray.streaming.runtime.BaseUnitTest; import io.ray.streaming.runtime.config.StreamingConfig; import io.ray.streaming.runtime.config.global.CommonConfig; -import io.ray.streaming.runtime.master.resourcemanager.ResourceManager; -import io.ray.streaming.runtime.master.resourcemanager.ResourceManagerImpl; import io.ray.streaming.runtime.core.resource.Container; import io.ray.streaming.runtime.master.JobRuntimeContext; -import io.ray.streaming.runtime.util.Mockitools; import io.ray.streaming.runtime.util.RayUtils; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; -import org.testng.IObjectFactory; import org.testng.annotations.BeforeMethod; -import org.testng.annotations.ObjectFactory; import org.testng.annotations.Test; -@PrepareForTest(RayUtils.class) -@PowerMockIgnore({"org.slf4j.*", "javax.xml.*"}) -public class ResourceManagerTest { +public class ResourceManagerTest extends BaseUnitTest { private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerTest.class); private Object rayAsyncContext; - @ObjectFactory - public IObjectFactory getObjectFactory() { - return new org.powermock.modules.testng.PowerMockObjectFactory(); - } - - @org.testng.annotations.BeforeClass - public void setUp() { - LOG.warn("Do set up"); - MockitoAnnotations.initMocks(this); - } - - @org.testng.annotations.AfterClass - public void tearDown() { - LOG.warn("Do tear down"); - } - @BeforeMethod - public void mockGscApi() { + public void init() { // ray init Ray.init(); rayAsyncContext = Ray.getAsyncContext(); - Mockitools.mockGscApi(); } @Test @@ -63,7 +37,7 @@ public class ResourceManagerTest { Assert.assertEquals(nodeInfoMap.size(), 5); } - @Test + @Test(dependsOnMethods = "testGcsMockedApi") public void testApi() { Ray.setAsyncContext(rayAsyncContext); diff --git a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/schedule/strategy/PipelineFirstStrategyTest.java b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/resourcemanager/strategy/PipelineFirstStrategyTest.java similarity index 86% rename from streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/schedule/strategy/PipelineFirstStrategyTest.java rename to streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/resourcemanager/strategy/PipelineFirstStrategyTest.java index 40f3cc769..e9eb3b1d7 100644 --- a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/schedule/strategy/PipelineFirstStrategyTest.java +++ b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/master/resourcemanager/strategy/PipelineFirstStrategyTest.java @@ -1,4 +1,4 @@ -package io.ray.streaming.runtime.schedule.strategy; +package io.ray.streaming.runtime.master.resourcemanager.strategy; import io.ray.api.id.UniqueId; import io.ray.streaming.jobgraph.JobGraph; @@ -8,12 +8,11 @@ import io.ray.streaming.runtime.config.types.ResourceAssignStrategyType; import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph; import io.ray.streaming.runtime.core.resource.Container; import io.ray.streaming.runtime.core.resource.ResourceType; -import io.ray.streaming.runtime.graph.ExecutionGraphTest; +import io.ray.streaming.runtime.core.graph.ExecutionGraphTest; import io.ray.streaming.runtime.master.JobRuntimeContext; import io.ray.streaming.runtime.master.graphmanager.GraphManager; import io.ray.streaming.runtime.master.graphmanager.GraphManagerImpl; import io.ray.streaming.runtime.master.resourcemanager.ResourceAssignmentView; -import io.ray.streaming.runtime.master.resourcemanager.strategy.ResourceAssignStrategy; import io.ray.streaming.runtime.master.resourcemanager.strategy.impl.PipelineFirstStrategy; import java.util.ArrayList; import java.util.HashMap; @@ -58,14 +57,11 @@ public class PipelineFirstStrategyTest extends BaseUnitTest { } @Test - public void testStrategyName() { - Assert - .assertEquals(ResourceAssignStrategyType.PIPELINE_FIRST_STRATEGY.getName(), strategy.getName()); - } - - @Test - public void testAssignResource() { + public void testResourceAssignment() { strategy = new PipelineFirstStrategy(); + Assert.assertEquals( + ResourceAssignStrategyType.PIPELINE_FIRST_STRATEGY.getName(), strategy.getName()); + Map jobConf = new HashMap<>(); StreamingConfig streamingConfig = new StreamingConfig(jobConf); GraphManager graphManager = new GraphManagerImpl(new JobRuntimeContext(streamingConfig));