From dcb744518eb526896b954a3cab46ae206c9b7104 Mon Sep 17 00:00:00 2001 From: Wang Qing Date: Thu, 24 Jan 2019 03:56:25 +0800 Subject: [PATCH] Implement actor dummy object gc in java (#3822) * Add dummy object gc in java * Fix * Address comments. * Refine * Address comments. --- .../java/org/ray/api/runtime/RayRuntime.java | 2 +- .../org/ray/runtime/AbstractRayRuntime.java | 10 +++++++--- .../java/org/ray/runtime/RayActorImpl.java | 20 +++++++++++++++++++ .../ray/runtime/raylet/RayletClientImpl.java | 7 ++++++- .../java/org/ray/runtime/task/TaskSpec.java | 7 +++---- 5 files changed, 37 insertions(+), 9 deletions(-) diff --git a/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java b/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java index 7c12c3543..242190802 100644 --- a/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java +++ b/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java @@ -81,7 +81,7 @@ public interface RayRuntime { * @param args The arguments of the remote function. * @return The result object. */ - RayObject call(RayFunc func, RayActor actor, Object[] args); + RayObject call(RayFunc func, RayActor actor, Object[] args); /** * Create an actor on a remote node. diff --git a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java index 049e4a9c0..2944da07e 100644 --- a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java @@ -221,16 +221,17 @@ public abstract class AbstractRayRuntime implements RayRuntime { } @Override - public RayObject call(RayFunc func, RayActor actor, Object[] args) { + public RayObject call(RayFunc func, RayActor actor, Object[] args) { if (!(actor instanceof RayActorImpl)) { throw new IllegalArgumentException("Unsupported actor type: " + actor.getClass().getName()); } - RayActorImpl actorImpl = (RayActorImpl)actor; + RayActorImpl actorImpl = (RayActorImpl) actor; TaskSpec spec; synchronized (actor) { spec = createTaskSpec(func, actorImpl, args, false, null); spec.getExecutionDependencies().add(((RayActorImpl) actor).getTaskCursor()); actorImpl.setTaskCursor(spec.returnIds[1]); + actorImpl.clearNewActorHandles(); } rayletClient.submitTask(spec); return new RayObjectImpl(spec.returnIds[0]); @@ -257,7 +258,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { * @param isActorCreationTask Whether this task is an actor creation task. * @return A TaskSpec object. */ - private TaskSpec createTaskSpec(RayFunc func, RayActorImpl actor, Object[] args, + private TaskSpec createTaskSpec(RayFunc func, RayActorImpl actor, Object[] args, boolean isActorCreationTask, BaseTaskOptions taskOptions) { UniqueId taskId = rayletClient.generateTaskId(workerContext.getCurrentDriverId(), workerContext.getCurrentTaskId(), workerContext.nextTaskIndex()); @@ -285,7 +286,9 @@ public abstract class AbstractRayRuntime implements RayRuntime { if (taskOptions instanceof ActorCreationOptions) { maxActorReconstruction = ((ActorCreationOptions) taskOptions).maxReconstructions; } + RayFunction rayFunction = functionManager.getFunction(workerContext.getCurrentDriverId(), func); + return new TaskSpec( workerContext.getCurrentDriverId(), taskId, @@ -296,6 +299,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { actor.getId(), actor.getHandleId(), actor.increaseTaskCounter(), + actor.getNewActorHandles().toArray(new UniqueId[0]), ArgumentsBuilder.wrap(args), returnIds, resources, diff --git a/java/runtime/src/main/java/org/ray/runtime/RayActorImpl.java b/java/runtime/src/main/java/org/ray/runtime/RayActorImpl.java index 64fe45a86..2d86449c5 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayActorImpl.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayActorImpl.java @@ -4,6 +4,8 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.List; import org.ray.api.RayActor; import org.ray.api.id.UniqueId; import org.ray.runtime.util.Sha1Digestor; @@ -29,6 +31,14 @@ public final class RayActorImpl implements RayActor, Externalizable { */ private int numForks; + /** + * The new actor handles that were created from this handle + * since the last task on this handle was submitted. This is + * used to garbage-collect dummy objects that are no longer + * necessary in the backend. + */ + private List newActorHandles; + public RayActorImpl() { this(UniqueId.NIL, UniqueId.NIL); } @@ -42,6 +52,7 @@ public final class RayActorImpl implements RayActor, Externalizable { this.handleId = handleId; this.taskCounter = 0; this.taskCursor = null; + this.newActorHandles = new ArrayList<>(); numForks = 0; } @@ -59,6 +70,14 @@ public final class RayActorImpl implements RayActor, Externalizable { this.taskCursor = taskCursor; } + public List getNewActorHandles() { + return this.newActorHandles; + } + + public void clearNewActorHandles() { + this.newActorHandles.clear(); + } + public UniqueId getTaskCursor() { return taskCursor; } @@ -74,6 +93,7 @@ public final class RayActorImpl implements RayActor, Externalizable { ret.numForks = 0; ret.taskCursor = this.taskCursor; ret.handleId = this.computeNextActorHandleId(); + newActorHandles.add(ret.handleId); return ret; } diff --git a/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java b/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java index 2a58ea051..f4194bffb 100644 --- a/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java +++ b/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java @@ -142,6 +142,11 @@ public class RayletClientImpl implements RayletClient { UniqueId actorId = UniqueId.fromByteBuffer(info.actorIdAsByteBuffer()); UniqueId actorHandleId = UniqueId.fromByteBuffer(info.actorHandleIdAsByteBuffer()); int actorCounter = info.actorCounter(); + + // Deserialize new actor handles + UniqueId[] newActorHandles = UniqueIdUtil.getUniqueIdsFromByteBuffer( + info.newActorHandlesAsByteBuffer()); + // Deserialize args FunctionArg[] args = new FunctionArg[info.argsLength()]; for (int i = 0; i < info.argsLength(); i++) { @@ -175,7 +180,7 @@ public class RayletClientImpl implements RayletClient { info.functionDescriptor(0), info.functionDescriptor(1), info.functionDescriptor(2) ); return new TaskSpec(driverId, taskId, parentTaskId, parentCounter, actorCreationId, - maxActorReconstructions, actorId, actorHandleId, actorCounter, + maxActorReconstructions, actorId, actorHandleId, actorCounter, newActorHandles, args, returnIds, resources, functionDescriptor); } diff --git a/java/runtime/src/main/java/org/ray/runtime/task/TaskSpec.java b/java/runtime/src/main/java/org/ray/runtime/task/TaskSpec.java index fd919f625..1e205b99b 100644 --- a/java/runtime/src/main/java/org/ray/runtime/task/TaskSpec.java +++ b/java/runtime/src/main/java/org/ray/runtime/task/TaskSpec.java @@ -68,8 +68,8 @@ public class TaskSpec { public TaskSpec(UniqueId driverId, UniqueId taskId, UniqueId parentTaskId, int parentCounter, UniqueId actorCreationId, int maxActorReconstructions, UniqueId actorId, - UniqueId actorHandleId, int actorCounter, FunctionArg[] args, UniqueId[] returnIds, - Map resources, FunctionDescriptor functionDescriptor) { + UniqueId actorHandleId, int actorCounter, UniqueId[] newActorHandles, FunctionArg[] args, + UniqueId[] returnIds, Map resources, FunctionDescriptor functionDescriptor) { this.driverId = driverId; this.taskId = taskId; this.parentTaskId = parentTaskId; @@ -79,8 +79,7 @@ public class TaskSpec { this.actorId = actorId; this.actorHandleId = actorHandleId; this.actorCounter = actorCounter; - // TODO: Initialize the new actor handles. - this.newActorHandles = new UniqueId[] {}; + this.newActorHandles = newActorHandles; this.args = args; this.returnIds = returnIds; this.resources = resources;