diff --git a/java/api/src/main/java/org/ray/api/BaseActor.java b/java/api/src/main/java/org/ray/api/BaseActor.java new file mode 100644 index 000000000..e175388d9 --- /dev/null +++ b/java/api/src/main/java/org/ray/api/BaseActor.java @@ -0,0 +1,26 @@ +package org.ray.api; + +import org.ray.api.id.ActorId; + +/** + * A handle to an actor.

+ * + * A handle can be used to invoke a remote actor method. + */ +public interface BaseActor { + + /** + * @return The id of this actor. + */ + ActorId getId(); + + /** + * Kill the actor immediately. This will cause any outstanding tasks submitted to the actor to + * fail and the actor to exit in the same way as if it crashed. + * + * @param noReconstruction If set to true, the killed actor will not be reconstructed anymore. + */ + default void kill(boolean noReconstruction) { + Ray.internal().killActor(this, noReconstruction); + } +} diff --git a/java/api/src/main/java/org/ray/api/RayActor.java b/java/api/src/main/java/org/ray/api/RayActor.java index 712423900..d56d442cd 100644 --- a/java/api/src/main/java/org/ray/api/RayActor.java +++ b/java/api/src/main/java/org/ray/api/RayActor.java @@ -1,9 +1,7 @@ package org.ray.api; -import org.ray.api.id.ActorId; - /** - * A handle to an actor.

+ * A handle to a Java actor.

* * A handle can be used to invoke a remote actor method, with the {@code "call"} method. For * example: @@ -14,7 +12,7 @@ import org.ray.api.id.ActorId; * } * } * // Create an actor, and get a handle. - * RayActor myActor = Ray.createActor(RayActor::new); + * RayActor myActor = Ray.createActor(MyActor::new); * // Call the `echo` method remotely. * RayObject result = myActor.call(MyActor::echo, 1); * // Get the result of the remote `echo` method. @@ -26,20 +24,6 @@ import org.ray.api.id.ActorId; * * @param The type of the concrete actor class. */ -public interface RayActor extends ActorCall { +public interface RayActor extends BaseActor, ActorCall { - /** - * @return The id of this actor. - */ - ActorId getId(); - - /** - * Kill the actor immediately. This will cause any outstanding tasks submitted to the actor to - * fail and the actor to exit in the same way as if it crashed. - * - * @param noReconstruction If set to true, the killed actor will not be reconstructed anymore. - */ - default void kill(boolean noReconstruction) { - Ray.internal().killActor(this, noReconstruction); - } } diff --git a/java/api/src/main/java/org/ray/api/RayPyActor.java b/java/api/src/main/java/org/ray/api/RayPyActor.java index cde6c8df7..3821c11f7 100644 --- a/java/api/src/main/java/org/ray/api/RayPyActor.java +++ b/java/api/src/main/java/org/ray/api/RayPyActor.java @@ -3,7 +3,7 @@ package org.ray.api; /** * Handle of a Python actor. */ -public interface RayPyActor extends RayActor, PyActorCall { +public interface RayPyActor extends BaseActor, PyActorCall { /** * @return Module name of the Python actor class. diff --git a/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java b/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java index 7854821ab..29006e535 100644 --- a/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java +++ b/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java @@ -2,6 +2,7 @@ package org.ray.api.runtime; import java.util.List; import java.util.concurrent.Callable; +import org.ray.api.BaseActor; import org.ray.api.RayActor; import org.ray.api.RayObject; import org.ray.api.RayPyActor; @@ -82,7 +83,7 @@ public interface RayRuntime { * @param actor The actor to be killed. * @param noReconstruction If set to true, the killed actor will not be reconstructed anymore. */ - void killActor(RayActor actor, boolean noReconstruction); + void killActor(BaseActor actor, boolean noReconstruction); /** * Invoke a remote function. diff --git a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java index 1fd8372e8..d8d85977b 100644 --- a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java @@ -5,6 +5,7 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import java.util.List; import java.util.concurrent.Callable; +import org.ray.api.BaseActor; import org.ray.api.RayActor; import org.ray.api.RayObject; import org.ray.api.RayPyActor; @@ -176,7 +177,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { } } - private RayObject callActorFunction(RayActor rayActor, + private RayObject callActorFunction(BaseActor rayActor, FunctionDescriptor functionDescriptor, Object[] args, int numReturns) { List functionArgs = ArgumentsBuilder .wrap(args, functionDescriptor.getLanguage()); @@ -190,14 +191,14 @@ public abstract class AbstractRayRuntime implements RayRuntime { } } - private RayActor createActorImpl(FunctionDescriptor functionDescriptor, + private BaseActor createActorImpl(FunctionDescriptor functionDescriptor, Object[] args, ActorCreationOptions options) { List functionArgs = ArgumentsBuilder .wrap(args, functionDescriptor.getLanguage()); if (functionDescriptor.getLanguage() != Language.JAVA && options != null) { Preconditions.checkState(Strings.isNullOrEmpty(options.jvmOptions)); } - RayActor actor = taskSubmitter.createActor(functionDescriptor, functionArgs, options); + BaseActor actor = taskSubmitter.createActor(functionDescriptor, functionArgs, options); return actor; } diff --git a/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java index 0edc285bf..8f9d74e5b 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java @@ -1,7 +1,7 @@ package org.ray.runtime; import java.util.concurrent.atomic.AtomicInteger; -import org.ray.api.RayActor; +import org.ray.api.BaseActor; import org.ray.api.id.JobId; import org.ray.api.id.UniqueId; import org.ray.runtime.config.RayConfig; @@ -49,7 +49,7 @@ public class RayDevRuntime extends AbstractRayRuntime { } @Override - public void killActor(RayActor actor, boolean noReconstruction) { + public void killActor(BaseActor actor, boolean noReconstruction) { throw new UnsupportedOperationException(); } diff --git a/java/runtime/src/main/java/org/ray/runtime/RayMultiWorkerNativeRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayMultiWorkerNativeRuntime.java index 176201232..535c0a6b3 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayMultiWorkerNativeRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayMultiWorkerNativeRuntime.java @@ -3,6 +3,7 @@ package org.ray.runtime; import com.google.common.base.Preconditions; import java.util.List; import java.util.concurrent.Callable; +import org.ray.api.BaseActor; import org.ray.api.RayActor; import org.ray.api.RayObject; import org.ray.api.RayPyActor; @@ -139,7 +140,7 @@ public class RayMultiWorkerNativeRuntime implements RayRuntime { } @Override - public void killActor(RayActor actor, boolean noReconstruction) { + public void killActor(BaseActor actor, boolean noReconstruction) { getCurrentRuntime().killActor(actor, noReconstruction); } diff --git a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java index 5f69f6a4d..6448b824e 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java @@ -6,7 +6,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.commons.io.FileUtils; -import org.ray.api.RayActor; +import org.ray.api.BaseActor; import org.ray.api.id.JobId; import org.ray.api.id.UniqueId; import org.ray.runtime.config.RayConfig; @@ -135,7 +135,7 @@ public final class RayNativeRuntime extends AbstractRayRuntime { } @Override - public void killActor(RayActor actor, boolean noReconstruction) { + public void killActor(BaseActor actor, boolean noReconstruction) { nativeKillActor(nativeCoreWorkerPointer, actor.getId().getBytes(), noReconstruction); } diff --git a/java/runtime/src/main/java/org/ray/runtime/actor/LocalModeRayActor.java b/java/runtime/src/main/java/org/ray/runtime/actor/LocalModeRayActor.java index 42dd224de..6b4d2586f 100644 --- a/java/runtime/src/main/java/org/ray/runtime/actor/LocalModeRayActor.java +++ b/java/runtime/src/main/java/org/ray/runtime/actor/LocalModeRayActor.java @@ -10,7 +10,7 @@ import org.ray.api.id.ActorId; import org.ray.api.id.ObjectId; /** - * RayActor implementation for local mode. + * Implementation of actor handle for local mode. */ public class LocalModeRayActor implements RayActor, Externalizable { diff --git a/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayActor.java b/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayActor.java index 89a3b1b4d..f7ebec3e7 100644 --- a/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayActor.java +++ b/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayActor.java @@ -6,8 +6,8 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.List; +import org.ray.api.BaseActor; import org.ray.api.Ray; -import org.ray.api.RayActor; import org.ray.api.id.ActorId; import org.ray.api.runtime.RayRuntime; import org.ray.runtime.RayMultiWorkerNativeRuntime; @@ -15,10 +15,10 @@ import org.ray.runtime.RayNativeRuntime; import org.ray.runtime.generated.Common.Language; /** - * RayActor abstract language-independent implementation for cluster mode. This is a wrapper class - * for C++ ActorHandle. + * Abstract and language-independent implementation of actor handle for cluster mode. This is a + * wrapper class for C++ ActorHandle. */ -public abstract class NativeRayActor implements RayActor, Externalizable { +public abstract class NativeRayActor implements BaseActor, Externalizable { /** * Address of core worker. diff --git a/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayJavaActor.java b/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayJavaActor.java index bb2d669c9..a6b096e5f 100644 --- a/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayJavaActor.java +++ b/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayJavaActor.java @@ -3,12 +3,13 @@ package org.ray.runtime.actor; import com.google.common.base.Preconditions; import java.io.IOException; import java.io.ObjectInput; +import org.ray.api.RayActor; import org.ray.runtime.generated.Common.Language; /** - * RayActor Java implementation for cluster mode. + * Java implementation of actor handle for cluster mode. */ -public class NativeRayJavaActor extends NativeRayActor { +public class NativeRayJavaActor extends NativeRayActor implements RayActor { NativeRayJavaActor(long nativeCoreWorkerPointer, byte[] actorId) { super(nativeCoreWorkerPointer, actorId); diff --git a/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayPyActor.java b/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayPyActor.java index 157f25ed8..ac97bb608 100644 --- a/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayPyActor.java +++ b/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayPyActor.java @@ -7,7 +7,7 @@ import org.ray.api.RayPyActor; import org.ray.runtime.generated.Common.Language; /** - * RayActor Python implementation for cluster mode. + * Python actor handle implementation for cluster mode. */ public class NativeRayPyActor extends NativeRayActor implements RayPyActor { diff --git a/java/runtime/src/main/java/org/ray/runtime/task/LocalModeTaskSubmitter.java b/java/runtime/src/main/java/org/ray/runtime/task/LocalModeTaskSubmitter.java index 8498dfd8a..6e5346d7d 100644 --- a/java/runtime/src/main/java/org/ray/runtime/task/LocalModeTaskSubmitter.java +++ b/java/runtime/src/main/java/org/ray/runtime/task/LocalModeTaskSubmitter.java @@ -17,7 +17,7 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; -import org.ray.api.RayActor; +import org.ray.api.BaseActor; import org.ray.api.id.ActorId; import org.ray.api.id.ObjectId; import org.ray.api.id.TaskId; @@ -188,7 +188,7 @@ public class LocalModeTaskSubmitter implements TaskSubmitter { } @Override - public RayActor createActor(FunctionDescriptor functionDescriptor, List args, + public BaseActor createActor(FunctionDescriptor functionDescriptor, List args, ActorCreationOptions options) { ActorId actorId = ActorId.fromRandom(); TaskSpec taskSpec = getTaskSpecBuilder(TaskType.ACTOR_CREATION_TASK, functionDescriptor, args) @@ -203,7 +203,7 @@ public class LocalModeTaskSubmitter implements TaskSubmitter { @Override public List submitActorTask( - RayActor actor, FunctionDescriptor functionDescriptor, + BaseActor actor, FunctionDescriptor functionDescriptor, List args, int numReturns, CallOptions options) { Preconditions.checkState(numReturns <= 1); TaskSpec.Builder builder = getTaskSpecBuilder(TaskType.ACTOR_TASK, functionDescriptor, args); diff --git a/java/runtime/src/main/java/org/ray/runtime/task/NativeTaskSubmitter.java b/java/runtime/src/main/java/org/ray/runtime/task/NativeTaskSubmitter.java index 18edd39c2..eed1d9262 100644 --- a/java/runtime/src/main/java/org/ray/runtime/task/NativeTaskSubmitter.java +++ b/java/runtime/src/main/java/org/ray/runtime/task/NativeTaskSubmitter.java @@ -3,7 +3,7 @@ package org.ray.runtime.task; import com.google.common.base.Preconditions; import java.util.List; import java.util.stream.Collectors; -import org.ray.api.RayActor; +import org.ray.api.BaseActor; import org.ray.api.id.ObjectId; import org.ray.api.options.ActorCreationOptions; import org.ray.api.options.CallOptions; @@ -33,7 +33,7 @@ public class NativeTaskSubmitter implements TaskSubmitter { } @Override - public RayActor createActor(FunctionDescriptor functionDescriptor, List args, + public BaseActor createActor(FunctionDescriptor functionDescriptor, List args, ActorCreationOptions options) { byte[] actorId = nativeCreateActor(nativeCoreWorkerPointer, functionDescriptor, args, options); @@ -43,7 +43,7 @@ public class NativeTaskSubmitter implements TaskSubmitter { @Override public List submitActorTask( - RayActor actor, FunctionDescriptor functionDescriptor, + BaseActor actor, FunctionDescriptor functionDescriptor, List args, int numReturns, CallOptions options) { Preconditions.checkState(actor instanceof NativeRayActor); List returnIds = nativeSubmitActorTask(nativeCoreWorkerPointer, diff --git a/java/runtime/src/main/java/org/ray/runtime/task/TaskSubmitter.java b/java/runtime/src/main/java/org/ray/runtime/task/TaskSubmitter.java index d7f825616..73ab9d8cb 100644 --- a/java/runtime/src/main/java/org/ray/runtime/task/TaskSubmitter.java +++ b/java/runtime/src/main/java/org/ray/runtime/task/TaskSubmitter.java @@ -1,7 +1,7 @@ package org.ray.runtime.task; import java.util.List; -import org.ray.api.RayActor; +import org.ray.api.BaseActor; import org.ray.api.id.ObjectId; import org.ray.api.options.ActorCreationOptions; import org.ray.api.options.CallOptions; @@ -30,7 +30,7 @@ public interface TaskSubmitter { * @param options Options for this actor creation task. * @return Handle to the actor. */ - RayActor createActor(FunctionDescriptor functionDescriptor, List args, + BaseActor createActor(FunctionDescriptor functionDescriptor, List args, ActorCreationOptions options); /** @@ -42,6 +42,6 @@ public interface TaskSubmitter { * @param options Options for this task. * @return Ids of the return objects. */ - List submitActorTask(RayActor actor, FunctionDescriptor functionDescriptor, + List submitActorTask(BaseActor actor, FunctionDescriptor functionDescriptor, List args, int numReturns, CallOptions options); } diff --git a/java/test/src/main/java/org/ray/api/test/ClassLoaderTest.java b/java/test/src/main/java/org/ray/api/test/ClassLoaderTest.java index 09943ba62..891ba0568 100644 --- a/java/test/src/main/java/org/ray/api/test/ClassLoaderTest.java +++ b/java/test/src/main/java/org/ray/api/test/ClassLoaderTest.java @@ -4,11 +4,10 @@ import java.io.File; import java.lang.reflect.Method; import java.nio.file.Files; import java.nio.file.Paths; - import javax.tools.JavaCompiler; import javax.tools.ToolProvider; - import org.apache.commons.io.FileUtils; +import org.ray.api.BaseActor; import org.ray.api.Ray; import org.ray.api.RayActor; import org.ray.api.RayObject; @@ -141,7 +140,7 @@ public class ClassLoaderTest extends BaseTest { private RayObject callActorFunction(RayActor rayActor, FunctionDescriptor functionDescriptor, Object[] args, int numReturns) throws Exception { Method callActorFunctionMethod = AbstractRayRuntime.class.getDeclaredMethod("callActorFunction", - RayActor.class, FunctionDescriptor.class, Object[].class, int.class); + BaseActor.class, FunctionDescriptor.class, Object[].class, int.class); callActorFunctionMethod.setAccessible(true); return (RayObject) callActorFunctionMethod .invoke(TestUtils.getRuntime(), rayActor, functionDescriptor, args, numReturns); diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionGraph.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionGraph.java index 104ed2853..4f37a0134 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionGraph.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionGraph.java @@ -6,7 +6,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import org.ray.api.RayActor; +import org.ray.api.BaseActor; /** * Physical execution graph. @@ -18,19 +18,19 @@ import org.ray.api.RayActor; public class ExecutionGraph implements Serializable { private long buildTime; private List executionNodeList; - private List sourceWorkers = new ArrayList<>(); - private List sinkWorkers = new ArrayList<>(); + private List sourceWorkers = new ArrayList<>(); + private List sinkWorkers = new ArrayList<>(); public ExecutionGraph(List executionNodes) { this.executionNodeList = executionNodes; for (ExecutionNode executionNode : executionNodeList) { if (executionNode.getNodeType() == ExecutionNode.NodeType.SOURCE) { - List actors = executionNode.getExecutionTasks().stream() + List actors = executionNode.getExecutionTasks().stream() .map(ExecutionTask::getWorker).collect(Collectors.toList()); sourceWorkers.addAll(actors); } if (executionNode.getNodeType() == ExecutionNode.NodeType.SINK) { - List actors = executionNode.getExecutionTasks().stream() + List actors = executionNode.getExecutionTasks().stream() .map(ExecutionTask::getWorker).collect(Collectors.toList()); sinkWorkers.addAll(actors); } @@ -38,11 +38,11 @@ public class ExecutionGraph implements Serializable { buildTime = System.currentTimeMillis(); } - public List getSourceWorkers() { + public List getSourceWorkers() { return sourceWorkers; } - public List getSinkWorkers() { + public List getSinkWorkers() { return sinkWorkers; } @@ -81,10 +81,10 @@ public class ExecutionGraph implements Serializable { throw new RuntimeException("Task " + taskId + " does not exist!"); } - public Map getTaskId2WorkerByNodeId(int nodeId) { + public Map getTaskId2WorkerByNodeId(int nodeId) { for (ExecutionNode executionNode : executionNodeList) { if (executionNode.getNodeId() == nodeId) { - Map taskId2Worker = new HashMap<>(); + Map taskId2Worker = new HashMap<>(); for (ExecutionTask executionTask : executionNode.getExecutionTasks()) { taskId2Worker.put(executionTask.getTaskId(), executionTask.getWorker()); } diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionTask.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionTask.java index 6337bee51..3d067d733 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionTask.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionTask.java @@ -1,7 +1,7 @@ package org.ray.streaming.runtime.core.graph; import java.io.Serializable; -import org.ray.api.RayActor; +import org.ray.api.BaseActor; /** * ExecutionTask is minimal execution unit. @@ -11,9 +11,9 @@ import org.ray.api.RayActor; public class ExecutionTask implements Serializable { private int taskId; private int taskIndex; - private RayActor worker; + private BaseActor worker; - public ExecutionTask(int taskId, int taskIndex, RayActor worker) { + public ExecutionTask(int taskId, int taskIndex, BaseActor worker) { this.taskId = taskId; this.taskIndex = taskIndex; this.worker = worker; @@ -35,11 +35,11 @@ public class ExecutionTask implements Serializable { this.taskIndex = taskIndex; } - public RayActor getWorker() { + public BaseActor getWorker() { return worker; } - public void setWorker(RayActor worker) { + public void setWorker(BaseActor worker) { this.worker = worker; } } diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/JobSchedulerImpl.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/JobSchedulerImpl.java index 08b717277..0b4900a84 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/JobSchedulerImpl.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/JobSchedulerImpl.java @@ -3,6 +3,7 @@ package org.ray.streaming.runtime.schedule; import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.ray.api.BaseActor; import org.ray.api.Ray; import org.ray.api.RayActor; import org.ray.api.RayObject; @@ -57,7 +58,7 @@ public class JobSchedulerImpl implements JobScheduler { List executionTasks = executionNode.getExecutionTasks(); for (ExecutionTask executionTask : executionTasks) { int taskId = executionTask.getTaskId(); - RayActor worker = executionTask.getWorker(); + BaseActor worker = executionTask.getWorker(); switch (executionNode.getLanguage()) { case JAVA: RayActor jobWorker = (RayActor) worker; diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/TaskAssignerImpl.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/TaskAssignerImpl.java index 1f2027948..43c12eafc 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/TaskAssignerImpl.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/TaskAssignerImpl.java @@ -4,8 +4,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.ray.api.BaseActor; import org.ray.api.Ray; -import org.ray.api.RayActor; import org.ray.streaming.jobgraph.JobEdge; import org.ray.streaming.jobgraph.JobGraph; import org.ray.streaming.jobgraph.JobVertex; @@ -58,7 +58,7 @@ public class TaskAssignerImpl implements TaskAssigner { return new ExecutionGraph(executionNodes); } - private RayActor createWorker(JobVertex jobVertex) { + private BaseActor createWorker(JobVertex jobVertex) { switch (jobVertex.getLanguage()) { case PYTHON: return Ray.createPyActor( diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/tasks/StreamTask.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/tasks/StreamTask.java index f4a8c5d7a..8a3695bda 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/tasks/StreamTask.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/tasks/StreamTask.java @@ -4,8 +4,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.ray.api.BaseActor; import org.ray.api.Ray; -import org.ray.api.RayActor; import org.ray.api.id.ActorId; import org.ray.streaming.api.collector.Collector; import org.ray.streaming.api.context.RuntimeContext; @@ -65,7 +65,7 @@ public abstract class StreamTask implements Runnable { List collectors = new ArrayList<>(); for (ExecutionEdge edge : outputEdges) { Map outputActorIds = new HashMap<>(); - Map taskId2Worker = executionGraph + Map taskId2Worker = executionGraph .getTaskId2WorkerByNodeId(edge.getTargetNodeId()); taskId2Worker.forEach((targetTaskId, targetActor) -> { String queueName = ChannelID.genIdStr(taskId, targetTaskId, executionGraph.getBuildTime()); @@ -91,7 +91,7 @@ public abstract class StreamTask implements Runnable { List inputEdges = executionNode.getInputsEdges(); Map inputActorIds = new HashMap<>(); for (ExecutionEdge edge : inputEdges) { - Map taskId2Worker = executionGraph + Map taskId2Worker = executionGraph .getTaskId2WorkerByNodeId(edge.getSrcNodeId()); taskId2Worker.forEach((srcTaskId, srcActor) -> { String queueName = ChannelID.genIdStr(srcTaskId, taskId, executionGraph.getBuildTime());