[Java] Rename java ObjectRef/ActorHandle (#8799)

This commit is contained in:
chaokunyang
2020-06-09 11:40:43 +08:00
committed by GitHub
parent c1e6813cea
commit 31a4d07bc4
88 changed files with 1551 additions and 1544 deletions
@@ -1,7 +1,7 @@
package io.ray.streaming.runtime.core.collector;
import io.ray.api.BaseActor;
import io.ray.api.RayPyActor;
import io.ray.api.BaseActorHandle;
import io.ray.api.PyActorHandle;
import io.ray.streaming.api.Language;
import io.ray.streaming.api.collector.Collector;
import io.ray.streaming.api.partition.Partition;
@@ -21,7 +21,7 @@ public class OutputCollector implements Collector<Record> {
private final DataWriter writer;
private final ChannelID[] outputQueues;
private final Collection<BaseActor> targetActors;
private final Collection<BaseActorHandle> targetActors;
private final Language[] targetLanguages;
private final Partition partition;
private final Serializer javaSerializer = new JavaSerializer();
@@ -29,13 +29,13 @@ public class OutputCollector implements Collector<Record> {
public OutputCollector(DataWriter writer,
Collection<String> outputQueueIds,
Collection<BaseActor> targetActors,
Collection<BaseActorHandle> targetActors,
Partition partition) {
this.writer = writer;
this.outputQueues = outputQueueIds.stream().map(ChannelID::from).toArray(ChannelID[]::new);
this.targetActors = targetActors;
this.targetLanguages = targetActors.stream()
.map(actor -> actor instanceof RayPyActor ? Language.PYTHON : Language.JAVA)
.map(actor -> actor instanceof PyActorHandle ? Language.PYTHON : Language.JAVA)
.toArray(Language[]::new);
this.partition = partition;
LOGGER.debug("OutputCollector constructed, outputQueueIds:{}, partition:{}.",
@@ -1,6 +1,6 @@
package io.ray.streaming.runtime.core.graph;
import io.ray.api.BaseActor;
import io.ray.api.BaseActorHandle;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
@@ -17,19 +17,19 @@ import java.util.stream.Collectors;
public class ExecutionGraph implements Serializable {
private long buildTime;
private List<ExecutionNode> executionNodeList;
private List<BaseActor> sourceWorkers = new ArrayList<>();
private List<BaseActor> sinkWorkers = new ArrayList<>();
private List<BaseActorHandle> sourceWorkers = new ArrayList<>();
private List<BaseActorHandle> sinkWorkers = new ArrayList<>();
public ExecutionGraph(List<ExecutionNode> executionNodes) {
this.executionNodeList = executionNodes;
for (ExecutionNode executionNode : executionNodeList) {
if (executionNode.getNodeType() == ExecutionNode.NodeType.SOURCE) {
List<BaseActor> actors = executionNode.getExecutionTasks().stream()
List<BaseActorHandle> actors = executionNode.getExecutionTasks().stream()
.map(ExecutionTask::getWorker).collect(Collectors.toList());
sourceWorkers.addAll(actors);
}
if (executionNode.getNodeType() == ExecutionNode.NodeType.SINK) {
List<BaseActor> actors = executionNode.getExecutionTasks().stream()
List<BaseActorHandle> actors = executionNode.getExecutionTasks().stream()
.map(ExecutionTask::getWorker).collect(Collectors.toList());
sinkWorkers.addAll(actors);
}
@@ -37,11 +37,11 @@ public class ExecutionGraph implements Serializable {
buildTime = System.currentTimeMillis();
}
public List<BaseActor> getSourceWorkers() {
public List<BaseActorHandle> getSourceWorkers() {
return sourceWorkers;
}
public List<BaseActor> getSinkWorkers() {
public List<BaseActorHandle> getSinkWorkers() {
return sinkWorkers;
}
@@ -80,10 +80,10 @@ public class ExecutionGraph implements Serializable {
throw new RuntimeException("Task " + taskId + " does not exist!");
}
public Map<Integer, BaseActor> getTaskId2WorkerByNodeId(int nodeId) {
public Map<Integer, BaseActorHandle> getTaskId2WorkerByNodeId(int nodeId) {
for (ExecutionNode executionNode : executionNodeList) {
if (executionNode.getNodeId() == nodeId) {
Map<Integer, BaseActor> taskId2Worker = new HashMap<>();
Map<Integer, BaseActorHandle> taskId2Worker = new HashMap<>();
for (ExecutionTask executionTask : executionNode.getExecutionTasks()) {
taskId2Worker.put(executionTask.getTaskId(), executionTask.getWorker());
}
@@ -1,6 +1,6 @@
package io.ray.streaming.runtime.core.graph;
import io.ray.api.BaseActor;
import io.ray.api.BaseActorHandle;
import java.io.Serializable;
/**
@@ -11,9 +11,9 @@ import java.io.Serializable;
public class ExecutionTask implements Serializable {
private int taskId;
private int taskIndex;
private BaseActor worker;
private BaseActorHandle worker;
public ExecutionTask(int taskId, int taskIndex, BaseActor worker) {
public ExecutionTask(int taskId, int taskIndex, BaseActorHandle worker) {
this.taskId = taskId;
this.taskIndex = taskIndex;
this.worker = worker;
@@ -35,11 +35,11 @@ public class ExecutionTask implements Serializable {
this.taskIndex = taskIndex;
}
public BaseActor getWorker() {
public BaseActorHandle getWorker() {
return worker;
}
public void setWorker(BaseActor worker) {
public void setWorker(BaseActorHandle worker) {
this.worker = worker;
}
}
@@ -1,6 +1,6 @@
package io.ray.streaming.runtime.core.graph.executiongraph;
import io.ray.api.RayActor;
import io.ray.api.ActorHandle;
import io.ray.streaming.runtime.worker.JobWorker;
import java.io.Serializable;
import java.util.ArrayList;
@@ -143,7 +143,7 @@ public class ExecutionGraph implements Serializable {
*
* @return actor list
*/
public List<RayActor<JobWorker>> getAllActors() {
public List<ActorHandle<JobWorker>> getAllActors() {
return getActorsFromJobVertices(getExecutionJobVertexList());
}
@@ -152,7 +152,7 @@ public class ExecutionGraph implements Serializable {
*
* @return actor list
*/
public List<RayActor<JobWorker>> getSourceActors() {
public List<ActorHandle<JobWorker>> getSourceActors() {
List<ExecutionJobVertex> executionJobVertices = getExecutionJobVertexList().stream()
.filter(ExecutionJobVertex::isSourceVertex)
.collect(Collectors.toList());
@@ -165,7 +165,7 @@ public class ExecutionGraph implements Serializable {
*
* @return actor list
*/
public List<RayActor<JobWorker>> getNonSourceActors() {
public List<ActorHandle<JobWorker>> getNonSourceActors() {
List<ExecutionJobVertex> executionJobVertices = getExecutionJobVertexList().stream()
.filter(executionJobVertex -> executionJobVertex.isTransformationVertex()
|| executionJobVertex.isSinkVertex())
@@ -179,7 +179,7 @@ public class ExecutionGraph implements Serializable {
*
* @return actor list
*/
public List<RayActor<JobWorker>> getSinkActors() {
public List<ActorHandle<JobWorker>> getSinkActors() {
List<ExecutionJobVertex> executionJobVertices = getExecutionJobVertexList().stream()
.filter(ExecutionJobVertex::isSinkVertex)
.collect(Collectors.toList());
@@ -193,7 +193,7 @@ public class ExecutionGraph implements Serializable {
* @param executionJobVertices specified job vertices
* @return actor list
*/
public List<RayActor<JobWorker>> getActorsFromJobVertices(
public List<ActorHandle<JobWorker>> getActorsFromJobVertices(
List<ExecutionJobVertex> executionJobVertices) {
return executionJobVertices.stream()
.map(ExecutionJobVertex::getExecutionVertices)
@@ -2,7 +2,7 @@ package io.ray.streaming.runtime.core.graph.executiongraph;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import io.ray.api.RayActor;
import io.ray.api.ActorHandle;
import io.ray.streaming.api.Language;
import io.ray.streaming.jobgraph.JobVertex;
import io.ray.streaming.jobgraph.VertexType;
@@ -77,8 +77,8 @@ public class ExecutionJobVertex {
return executionVertices;
}
public Map<Integer, RayActor<JobWorker>> getExecutionVertexWorkers() {
Map<Integer, RayActor<JobWorker>> executionVertexWorkersMap = new HashMap<>();
public Map<Integer, ActorHandle<JobWorker>> getExecutionVertexWorkers() {
Map<Integer, ActorHandle<JobWorker>> executionVertexWorkersMap = new HashMap<>();
Preconditions.checkArgument(
executionVertices != null && !executionVertices.isEmpty(),
@@ -1,7 +1,7 @@
package io.ray.streaming.runtime.core.graph.executiongraph;
import com.google.common.base.MoreObjects;
import io.ray.api.RayActor;
import io.ray.api.ActorHandle;
import io.ray.api.id.ActorId;
import io.ray.streaming.api.Language;
import io.ray.streaming.jobgraph.VertexType;
@@ -49,7 +49,7 @@ public class ExecutionVertex implements Serializable {
private ExecutionVertexState state = ExecutionVertexState.TO_ADD;
private ContainerID containerId;
private RayActor<JobWorker> workerActor;
private ActorHandle<JobWorker> workerActor;
private List<ExecutionEdge> inputEdges = new ArrayList<>();
private List<ExecutionEdge> outputEdges = new ArrayList<>();
@@ -124,7 +124,7 @@ public class ExecutionVertex implements Serializable {
return state == ExecutionVertexState.TO_DEL;
}
public RayActor<JobWorker> getWorkerActor() {
public ActorHandle<JobWorker> getWorkerActor() {
return workerActor;
}
@@ -132,7 +132,7 @@ public class ExecutionVertex implements Serializable {
return workerActor.getId();
}
public void setWorkerActor(RayActor<JobWorker> workerActor) {
public void setWorkerActor(ActorHandle<JobWorker> workerActor) {
this.workerActor = workerActor;
}
@@ -1,7 +1,7 @@
package io.ray.streaming.runtime.master;
import com.google.common.base.Preconditions;
import io.ray.api.RayActor;
import io.ray.api.ActorHandle;
import io.ray.streaming.jobgraph.JobGraph;
import io.ray.streaming.runtime.config.StreamingConfig;
import io.ray.streaming.runtime.config.StreamingMasterConfig;
@@ -30,7 +30,7 @@ public class JobMaster {
private GraphManager graphManager;
private StreamingMasterConfig conf;
private RayActor jobMasterActor;
private ActorHandle jobMasterActor;
public JobMaster(Map<String, String> confMap) {
LOG.info("Creating job master with conf: {}.", confMap);
@@ -76,7 +76,7 @@ public class JobMaster {
* @param jobGraph logical plan
* @return submit result
*/
public boolean submitJob(RayActor<JobMaster> jobMasterActor, JobGraph jobGraph) {
public boolean submitJob(ActorHandle<JobMaster> jobMasterActor, JobGraph jobGraph) {
LOG.info("Begin submitting job using logical plan: {}.", jobGraph);
this.jobMasterActor = jobMasterActor;
@@ -101,7 +101,7 @@ public class JobMaster {
return true;
}
public RayActor getJobMasterActor() {
public ActorHandle getJobMasterActor() {
return jobMasterActor;
}
@@ -1,6 +1,6 @@
package io.ray.streaming.runtime.master.scheduler;
import io.ray.api.RayActor;
import io.ray.api.ActorHandle;
import io.ray.streaming.runtime.config.StreamingConfig;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
@@ -153,7 +153,7 @@ public class JobSchedulerImpl implements JobScheduler {
*/
protected Map<ExecutionVertex, JobWorkerContext> buildWorkersContext(
ExecutionGraph executionGraph) {
RayActor masterActor = jobMaster.getJobMasterActor();
ActorHandle masterActor = jobMaster.getJobMasterActor();
// build workers' context
Map<ExecutionVertex, JobWorkerContext> needRegistryVertexToContextMap = new HashMap<>();
@@ -166,7 +166,7 @@ public class JobSchedulerImpl implements JobScheduler {
private JobWorkerContext buildJobWorkerContext(
ExecutionVertex executionVertex,
RayActor<JobMaster> masterActor) {
ActorHandle<JobMaster> masterActor) {
// create worker context
JobWorkerContext ctx = new JobWorkerContext(
@@ -1,8 +1,8 @@
package io.ray.streaming.runtime.master.scheduler.controller;
import io.ray.api.ActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import io.ray.api.RayActor;
import io.ray.api.RayObject;
import io.ray.api.WaitResult;
import io.ray.api.id.ActorId;
import io.ray.api.options.ActorCreationOptions;
@@ -50,7 +50,7 @@ public class WorkerLifecycleController {
.setMaxRestarts(-1)
.createActorCreationOptions();
RayActor<JobWorker> actor = null;
ActorHandle<JobWorker> actor = null;
// TODO (datayjz): ray create actor
if (null == actor) {
@@ -77,18 +77,18 @@ public class WorkerLifecycleController {
LOG.info("Begin initiating workers: {}.", vertexToContextMap);
long startTime = System.currentTimeMillis();
Map<RayObject<Boolean>, ActorId> rayObjects = new HashMap<>();
Map<ObjectRef<Boolean>, ActorId> rayObjects = new HashMap<>();
vertexToContextMap.entrySet().forEach((entry -> {
ExecutionVertex vertex = entry.getKey();
rayObjects.put(RemoteCallWorker.initWorker(vertex.getWorkerActor(), entry.getValue()),
vertex.getWorkerActorId());
}));
List<RayObject<Boolean>> rayObjectList = new ArrayList<>(rayObjects.keySet());
List<ObjectRef<Boolean>> objectRefList = new ArrayList<>(rayObjects.keySet());
LOG.info("Waiting for workers' initialization.");
WaitResult<Boolean> result = Ray.wait(rayObjectList, rayObjectList.size(), timeout);
if (result.getReady().size() != rayObjectList.size()) {
WaitResult<Boolean> result = Ray.wait(objectRefList, objectRefList.size(), timeout);
if (result.getReady().size() != objectRefList.size()) {
LOG.error("Initializing workers timeout[{} ms].", timeout);
return false;
}
@@ -108,18 +108,18 @@ public class WorkerLifecycleController {
public boolean startWorkers(ExecutionGraph executionGraph, int timeout) {
LOG.info("Begin starting workers.");
long startTime = System.currentTimeMillis();
List<RayObject<Boolean>> rayObjects = new ArrayList<>();
List<ObjectRef<Boolean>> objectRefs = new ArrayList<>();
// start source actors 1st
executionGraph.getSourceActors()
.forEach(actor -> rayObjects.add(RemoteCallWorker.startWorker(actor)));
.forEach(actor -> objectRefs.add(RemoteCallWorker.startWorker(actor)));
// then start non-source actors
executionGraph.getNonSourceActors()
.forEach(actor -> rayObjects.add(RemoteCallWorker.startWorker(actor)));
.forEach(actor -> objectRefs.add(RemoteCallWorker.startWorker(actor)));
WaitResult<Boolean> result = Ray.wait(rayObjects, rayObjects.size(), timeout);
if (result.getReady().size() != rayObjects.size()) {
WaitResult<Boolean> result = Ray.wait(objectRefs, objectRefs.size(), timeout);
if (result.getReady().size() != objectRefs.size()) {
LOG.error("Starting workers timeout[{} ms].", timeout);
return false;
}
@@ -139,7 +139,7 @@ public class WorkerLifecycleController {
}
private boolean destroyWorker(ExecutionVertex executionVertex) {
RayActor rayActor = executionVertex.getWorkerActor();
ActorHandle rayActor = executionVertex.getWorkerActor();
LOG.info("Begin destroying worker[vertex={}, actor={}].",
executionVertex.getVertexName(), rayActor.getId());
@@ -1,7 +1,7 @@
package io.ray.streaming.runtime.python;
import com.google.protobuf.ByteString;
import io.ray.runtime.actor.NativeRayActor;
import io.ray.runtime.actor.NativeActorHandle;
import io.ray.streaming.api.function.Function;
import io.ray.streaming.api.partition.Partition;
import io.ray.streaming.operator.Operator;
@@ -43,7 +43,7 @@ public class GraphPbBuilder {
for (ExecutionTask task : node.getExecutionTasks()) {
RemoteCall.ExecutionGraph.ExecutionTask.Builder taskBuilder =
RemoteCall.ExecutionGraph.ExecutionTask.newBuilder();
byte[] serializedActorHandle = ((NativeRayActor) task.getWorker()).toBytes();
byte[] serializedActorHandle = ((NativeActorHandle) task.getWorker()).toBytes();
taskBuilder
.setTaskId(task.getTaskId())
.setTaskIndex(task.getTaskIndex())
@@ -1,7 +1,7 @@
package io.ray.streaming.runtime.rpc;
import io.ray.api.RayActor;
import io.ray.api.RayObject;
import io.ray.api.ActorHandle;
import io.ray.api.ObjectRef;
import io.ray.streaming.runtime.master.JobMaster;
import io.ray.streaming.runtime.worker.JobWorker;
import io.ray.streaming.runtime.worker.context.JobWorkerContext;
@@ -23,9 +23,9 @@ public class RemoteCallWorker {
* @param ctx JobWorker's context
* @return init result
*/
public static RayObject<Boolean> initWorker(RayActor actor, JobWorkerContext ctx) {
public static ObjectRef<Boolean> initWorker(ActorHandle actor, JobWorkerContext ctx) {
LOG.info("Call worker to init, actor: {}, context: {}.", actor.getId(), ctx);
RayObject<Boolean> result = null;
ObjectRef<Boolean> result = null;
// TODO (datayjz): ray call worker to initiate
@@ -39,9 +39,9 @@ public class RemoteCallWorker {
* @param actor target JobWorker actor
* @return start result
*/
public static RayObject<Boolean> startWorker(RayActor actor) {
public static ObjectRef<Boolean> startWorker(ActorHandle actor) {
LOG.info("Call worker to start, actor: {}.", actor.getId());
RayObject<Boolean> result = null;
ObjectRef<Boolean> result = null;
// TODO (datayjz): ray call worker to start
@@ -55,7 +55,7 @@ public class RemoteCallWorker {
* @param actor target JobWorker actor
* @return destroy result
*/
public static Boolean shutdownWithoutReconstruction(RayActor actor) {
public static Boolean shutdownWithoutReconstruction(ActorHandle actor) {
LOG.info("Call worker to shutdown without reconstruction, actor is {}.",
actor.getId());
Boolean result = false;
@@ -1,10 +1,10 @@
package io.ray.streaming.runtime.schedule;
import io.ray.api.BaseActor;
import io.ray.api.ActorHandle;
import io.ray.api.BaseActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.PyActorHandle;
import io.ray.api.Ray;
import io.ray.api.RayActor;
import io.ray.api.RayObject;
import io.ray.api.RayPyActor;
import io.ray.api.function.PyActorMethod;
import io.ray.streaming.api.Language;
import io.ray.streaming.jobgraph.JobGraph;
@@ -50,22 +50,22 @@ public class JobSchedulerImpl implements JobScheduler {
if (hasPythonNode) {
executionGraphPb = new GraphPbBuilder().buildExecutionGraphPb(executionGraph);
}
List<RayObject<Object>> waits = new ArrayList<>();
List<ObjectRef<Object>> waits = new ArrayList<>();
for (ExecutionNode executionNode : executionNodes) {
List<ExecutionTask> executionTasks = executionNode.getExecutionTasks();
for (ExecutionTask executionTask : executionTasks) {
int taskId = executionTask.getTaskId();
BaseActor worker = executionTask.getWorker();
BaseActorHandle worker = executionTask.getWorker();
switch (executionNode.getLanguage()) {
case JAVA:
RayActor<JobWorker> jobWorker = (RayActor<JobWorker>) worker;
ActorHandle<JobWorker> jobWorker = (ActorHandle<JobWorker>) worker;
waits.add(jobWorker.call(JobWorker::init,
new WorkerContext(taskId, executionGraph, jobConfig)));
break;
case PYTHON:
byte[] workerContextBytes = buildPythonWorkerContext(
taskId, executionGraphPb, jobConfig);
waits.add(((RayPyActor)worker).call(new PyActorMethod("init", Object.class),
waits.add(((PyActorHandle)worker).call(new PyActorMethod("init", Object.class),
workerContextBytes));
break;
default:
@@ -1,9 +1,9 @@
package io.ray.streaming.runtime.schedule;
import io.ray.api.BaseActor;
import io.ray.api.ActorHandle;
import io.ray.api.BaseActorHandle;
import io.ray.api.PyActorHandle;
import io.ray.api.Ray;
import io.ray.api.RayActor;
import io.ray.api.RayPyActor;
import io.ray.api.function.PyActorClass;
import io.ray.streaming.jobgraph.JobEdge;
import io.ray.streaming.jobgraph.JobGraph;
@@ -64,16 +64,16 @@ public class TaskAssignerImpl implements TaskAssigner {
return new ExecutionGraph(executionNodes);
}
private BaseActor createWorker(JobVertex jobVertex) {
private BaseActorHandle createWorker(JobVertex jobVertex) {
switch (jobVertex.getLanguage()) {
case PYTHON: {
RayPyActor worker = Ray.createActor(
PyActorHandle worker = Ray.createActor(
new PyActorClass("ray.streaming.runtime.worker", "JobWorker"));
LOG.info("Created python worker {}", worker);
return worker;
}
case JAVA: {
RayActor<JobWorker> worker = Ray.createActor(JobWorker::new);
ActorHandle<JobWorker> worker = Ray.createActor(JobWorker::new);
LOG.info("Created java worker {}", worker);
return worker;
}
@@ -1,16 +1,15 @@
package io.ray.streaming.runtime.transfer;
import com.google.common.base.Preconditions;
import io.ray.api.BaseActor;
import io.ray.api.BaseActorHandle;
import io.ray.api.id.ActorId;
import io.ray.runtime.actor.LocalModeRayActor;
import io.ray.runtime.actor.NativeRayJavaActor;
import io.ray.runtime.actor.NativeRayPyActor;
import io.ray.runtime.actor.LocalModeActorHandle;
import io.ray.runtime.actor.NativeJavaActorHandle;
import io.ray.runtime.actor.NativePyActorHandle;
import io.ray.runtime.functionmanager.FunctionDescriptor;
import io.ray.runtime.functionmanager.JavaFunctionDescriptor;
import io.ray.runtime.functionmanager.PyFunctionDescriptor;
import io.ray.streaming.runtime.worker.JobWorker;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -107,34 +106,35 @@ public class ChannelCreationParametersBuilder {
javaWriterSyncFuncDesc = syncFunc;
}
public ChannelCreationParametersBuilder buildInputQueueParameters(List<String> queues,
Map<String, BaseActor> actors) {
public ChannelCreationParametersBuilder buildInputQueueParameters(
List<String> queues,
Map<String, BaseActorHandle> actors) {
return buildParameters(queues, actors, javaWriterAsyncFuncDesc, javaWriterSyncFuncDesc,
pyWriterAsyncFunctionDesc, pyWriterSyncFunctionDesc);
}
public ChannelCreationParametersBuilder buildOutputQueueParameters(List<String> queues,
Map<String, BaseActor> actors) {
Map<String, BaseActorHandle> actors) {
return buildParameters(queues, actors, javaReaderAsyncFuncDesc, javaReaderSyncFuncDesc,
pyReaderAsyncFunctionDesc, pyReaderSyncFunctionDesc);
}
private ChannelCreationParametersBuilder buildParameters(List<String> queues,
Map<String, BaseActor> actors,
Map<String, BaseActorHandle> actors,
JavaFunctionDescriptor javaAsyncFunctionDesc, JavaFunctionDescriptor javaSyncFunctionDesc,
PyFunctionDescriptor pyAsyncFunctionDesc, PyFunctionDescriptor pySyncFunctionDesc
) {
parameters = new ArrayList<>(queues.size());
for (String queue : queues) {
Parameter parameter = new Parameter();
BaseActor actor = actors.get(queue);
BaseActorHandle actor = actors.get(queue);
Preconditions.checkArgument(actor != null);
parameter.setActorId(actor.getId());
/// LocalModeRayActor used in single-process mode.
if (actor instanceof NativeRayJavaActor || actor instanceof LocalModeRayActor) {
if (actor instanceof NativeJavaActorHandle || actor instanceof LocalModeActorHandle) {
parameter.setAsyncFunctionDescriptor(javaAsyncFunctionDesc);
parameter.setSyncFunctionDescriptor(javaSyncFunctionDesc);
} else if (actor instanceof NativeRayPyActor) {
} else if (actor instanceof NativePyActorHandle) {
parameter.setAsyncFunctionDescriptor(pyAsyncFunctionDesc);
parameter.setSyncFunctionDescriptor(pySyncFunctionDesc);
} else {
@@ -1,7 +1,7 @@
package io.ray.streaming.runtime.transfer;
import com.google.common.base.Preconditions;
import io.ray.api.BaseActor;
import io.ray.api.BaseActorHandle;
import io.ray.streaming.runtime.util.Platform;
import io.ray.streaming.util.Config;
import java.nio.ByteBuffer;
@@ -24,7 +24,7 @@ public class DataReader {
private Queue<DataMessage> buf = new LinkedList<>();
public DataReader(List<String> inputChannels,
Map<String, BaseActor> fromActors,
Map<String, BaseActorHandle> fromActors,
Map<String, String> conf) {
Preconditions.checkArgument(inputChannels.size() > 0);
Preconditions.checkArgument(inputChannels.size() == fromActors.size());
@@ -1,7 +1,7 @@
package io.ray.streaming.runtime.transfer;
import com.google.common.base.Preconditions;
import io.ray.api.BaseActor;
import io.ray.api.BaseActorHandle;
import io.ray.streaming.runtime.util.Platform;
import io.ray.streaming.util.Config;
import java.nio.ByteBuffer;
@@ -33,7 +33,7 @@ public class DataWriter {
* @param conf configuration
*/
public DataWriter(List<String> outputChannels,
Map<String, BaseActor> toActors,
Map<String, BaseActorHandle> toActors,
Map<String, String> conf) {
Preconditions.checkArgument(!outputChannels.isEmpty());
Preconditions.checkArgument(outputChannels.size() == toActors.size());
@@ -1,7 +1,7 @@
package io.ray.streaming.runtime.worker.context;
import com.google.common.base.MoreObjects;
import io.ray.api.RayActor;
import io.ray.api.ActorHandle;
import io.ray.api.id.ActorId;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.master.JobMaster;
@@ -20,7 +20,7 @@ public class JobWorkerContext implements Serializable {
/**
* JobMaster actor.
*/
private RayActor<JobMaster> master;
private ActorHandle<JobMaster> master;
/**
* Worker's vertex info.
@@ -29,7 +29,7 @@ public class JobWorkerContext implements Serializable {
public JobWorkerContext(
ActorId workerId,
RayActor<JobMaster> master,
ActorHandle<JobMaster> master,
ExecutionVertex executionVertex) {
this.workerId = workerId;
this.master = master;
@@ -40,7 +40,7 @@ public class JobWorkerContext implements Serializable {
return workerId;
}
public RayActor<JobMaster> getMaster() {
public ActorHandle<JobMaster> getMaster() {
return master;
}
@@ -1,6 +1,6 @@
package io.ray.streaming.runtime.worker.tasks;
import io.ray.api.BaseActor;
import io.ray.api.BaseActorHandle;
import io.ray.api.Ray;
import io.ray.streaming.api.collector.Collector;
import io.ray.streaming.api.context.RuntimeContext;
@@ -64,8 +64,8 @@ public abstract class StreamTask implements Runnable {
List<ExecutionEdge> outputEdges = executionNode.getOutputEdges();
List<Collector> collectors = new ArrayList<>();
for (ExecutionEdge edge : outputEdges) {
Map<String, BaseActor> outputActors = new HashMap<>();
Map<Integer, BaseActor> taskId2Worker = executionGraph
Map<String, BaseActorHandle> outputActors = new HashMap<>();
Map<Integer, BaseActorHandle> taskId2Worker = executionGraph
.getTaskId2WorkerByNodeId(edge.getTargetNodeId());
taskId2Worker.forEach((targetTaskId, targetActor) -> {
String queueName = ChannelID.genIdStr(taskId, targetTaskId, executionGraph.getBuildTime());
@@ -87,9 +87,9 @@ public abstract class StreamTask implements Runnable {
// consumer
List<ExecutionEdge> inputEdges = executionNode.getInputsEdges();
Map<String, BaseActor> inputActors = new HashMap<>();
Map<String, BaseActorHandle> inputActors = new HashMap<>();
for (ExecutionEdge edge : inputEdges) {
Map<Integer, BaseActor> taskId2Worker = executionGraph
Map<Integer, BaseActorHandle> taskId2Worker = executionGraph
.getTaskId2WorkerByNodeId(edge.getSrcNodeId());
taskId2Worker.forEach((srcTaskId, srcActor) -> {
String queueName = ChannelID.genIdStr(srcTaskId, taskId, executionGraph.getBuildTime());
@@ -2,7 +2,7 @@ package io.ray.streaming.runtime.streamingqueue;
import com.google.common.collect.ImmutableMap;
import io.ray.api.Ray;
import io.ray.api.RayActor;
import io.ray.api.ActorHandle;
import io.ray.api.options.ActorCreationOptions;
import io.ray.api.options.ActorCreationOptions.Builder;
import io.ray.runtime.config.RayConfig;
@@ -94,9 +94,9 @@ public class StreamingQueueTest extends BaseUnitTest implements Serializable {
ActorCreationOptions.Builder builder = new Builder();
RayActor<WriterWorker> writerActor = Ray.createActor(WriterWorker::new, "writer",
ActorHandle<WriterWorker> writerActor = Ray.createActor(WriterWorker::new, "writer",
builder.createActorCreationOptions());
RayActor<ReaderWorker> readerActor = Ray.createActor(ReaderWorker::new, "reader",
ActorHandle<ReaderWorker> readerActor = Ray.createActor(ReaderWorker::new, "reader",
builder.createActorCreationOptions());
LOGGER.info("call getName on writerActor: {}",
@@ -1,8 +1,8 @@
package io.ray.streaming.runtime.streamingqueue;
import io.ray.api.BaseActor;
import io.ray.api.BaseActorHandle;
import io.ray.api.Ray;
import io.ray.api.RayActor;
import io.ray.api.ActorHandle;
import io.ray.runtime.functionmanager.JavaFunctionDescriptor;
import io.ray.streaming.runtime.transfer.ChannelID;
import io.ray.streaming.runtime.transfer.ChannelCreationParametersBuilder;
@@ -52,10 +52,10 @@ class ReaderWorker extends Worker {
private String name = null;
private List<String> inputQueueList = null;
Map<String, BaseActor> fromActors = new HashMap<>();
Map<String, BaseActorHandle> fromActors = new HashMap<>();
private DataReader dataReader = null;
private long handler = 0;
private RayActor<WriterWorker> peerActor = null;
private ActorHandle<WriterWorker> peerActor = null;
private int msgCount = 0;
private int totalMsg = 0;
@@ -77,7 +77,7 @@ class ReaderWorker extends Worker {
return "testRayCall";
}
public boolean init(List<String> inputQueueList, RayActor<WriterWorker> peer, int msgCount) {
public boolean init(List<String> inputQueueList, ActorHandle<WriterWorker> peer, int msgCount) {
this.inputQueueList = inputQueueList;
this.peerActor = peer;
@@ -171,9 +171,9 @@ class WriterWorker extends Worker {
private String name = null;
private List<String> outputQueueList = null;
Map<String, BaseActor> toActors = new HashMap<>();
Map<String, BaseActorHandle> toActors = new HashMap<>();
DataWriter dataWriter = null;
RayActor<ReaderWorker> peerActor = null;
ActorHandle<ReaderWorker> peerActor = null;
int msgCount = 0;
public WriterWorker(String name) {
@@ -188,13 +188,13 @@ class WriterWorker extends Worker {
return name;
}
public String testCallReader(RayActor<ReaderWorker> readerActor) {
public String testCallReader(ActorHandle<ReaderWorker> readerActor) {
String name = readerActor.call(ReaderWorker::getName).get();
LOGGER.info("testCallReader: {}", name);
return name;
}
public boolean init(List<String> outputQueueList, RayActor<ReaderWorker> peer, int msgCount) {
public boolean init(List<String> outputQueueList, ActorHandle<ReaderWorker> peer, int msgCount) {
this.outputQueueList = outputQueueList;
this.peerActor = peer;