diff --git a/java/api/src/main/java/io/ray/api/Checkpointable.java b/java/api/src/main/java/io/ray/api/Checkpointable.java deleted file mode 100644 index 77524a1b3..000000000 --- a/java/api/src/main/java/io/ray/api/Checkpointable.java +++ /dev/null @@ -1,100 +0,0 @@ -package io.ray.api; - -import io.ray.api.id.ActorId; -import io.ray.api.id.UniqueId; -import java.util.List; - -public interface Checkpointable { - - class CheckpointContext { - - /** - * Actor's ID. - */ - public final ActorId actorId; - /** - * Number of tasks executed since last checkpoint. - */ - public final int numTasksSinceLastCheckpoint; - /** - * Time elapsed since last checkpoint, in milliseconds. - */ - public final long timeElapsedMsSinceLastCheckpoint; - - public CheckpointContext(ActorId actorId, int numTasksSinceLastCheckpoint, - long timeElapsedMsSinceLastCheckpoint) { - this.actorId = actorId; - this.numTasksSinceLastCheckpoint = numTasksSinceLastCheckpoint; - this.timeElapsedMsSinceLastCheckpoint = timeElapsedMsSinceLastCheckpoint; - } - } - - class Checkpoint { - - /** - * Checkpoint's ID. - */ - public final UniqueId checkpointId; - /** - * Checkpoint's timestamp. - */ - public final long timestamp; - - public Checkpoint(UniqueId checkpointId, long timestamp) { - this.checkpointId = checkpointId; - this.timestamp = timestamp; - } - } - - /** - * Whether this actor needs to be checkpointed. - * - * This method will be called after every task. You should implement this callback to decide - * whether this actor needs to be checkpointed at this time, based on the checkpoint context, or - * any other factors. - * - * @param checkpointContext An object that contains info about last checkpoint. - * @return A boolean value that indicates whether this actor needs to be checkpointed. - */ - boolean shouldCheckpoint(CheckpointContext checkpointContext); - - /** - * Save a checkpoint to persistent storage. - * - * If `shouldCheckpoint` returns true, this method will be called. You should implement this - * callback to save actor's checkpoint and the given checkpoint id to persistent storage. - * - * @param actorId Actor's ID. - * @param checkpointId An ID that represents this actor's current state in GCS. You should - * save this checkpoint ID together with actor's checkpoint data. - */ - void saveCheckpoint(ActorId actorId, UniqueId checkpointId); - - /** - * Load actor's previous checkpoint, and restore actor's state. - * - * This method will be called when an actor is restarted, after the actor's constructor. If the - * actor needs to restore from previous checkpoint, this function should restore actor's state and - * return the checkpoint ID. Otherwise, it should do nothing and return null. - * - * @param actorId Actor's ID. - * @param availableCheckpoints A list of available checkpoint IDs and their timestamps, sorted - * by timestamp in descending order. Note, this method must return the ID of one checkpoint in - * this list, or null. Otherwise, an exception will be thrown. - * @return The ID of the checkpoint from which the actor was resumed, or null if the actor should - * restart from the beginning. - */ - UniqueId loadCheckpoint(ActorId actorId, List availableCheckpoints); - - /** - * Delete an expired checkpoint; - * - * This method will be called when an checkpoint is expired. You should implement this method to - * delete your application checkpoint data. Note, the maximum number of checkpoints kept in the - * backend can be configured at `RayConfig.num_actor_checkpoints_to_keep`. - * - * @param actorId ID of the actor. - * @param checkpointId ID of the checkpoint that has expired. - */ - void checkpointExpired(ActorId actorId, UniqueId checkpointId); -} diff --git a/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java b/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java index 97d98b137..9c5d10072 100644 --- a/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java +++ b/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java @@ -2,7 +2,6 @@ package io.ray.runtime.gcs; import com.google.common.base.Preconditions; import com.google.protobuf.InvalidProtocolBufferException; -import io.ray.api.Checkpointable.Checkpoint; import io.ray.api.id.ActorId; import io.ray.api.id.BaseId; import io.ray.api.id.JobId; @@ -12,7 +11,6 @@ import io.ray.api.id.UniqueId; import io.ray.api.placementgroup.PlacementGroup; import io.ray.api.runtimecontext.NodeInfo; import io.ray.runtime.generated.Gcs; -import io.ray.runtime.generated.Gcs.ActorCheckpointIdData; import io.ray.runtime.generated.Gcs.GcsNodeInfo; import io.ray.runtime.generated.Gcs.TablePrefix; import io.ray.runtime.placementgroup.PlacementGroupUtils; @@ -175,33 +173,6 @@ public class GcsClient { return client.exists(key); } - /** - * Get the available checkpoints for the given actor ID. - */ - public List getCheckpointsForActor(ActorId actorId) { - List checkpoints = new ArrayList<>(); - byte[] result = globalStateAccessor.getActorCheckpointId(actorId); - if (result != null) { - ActorCheckpointIdData data = null; - try { - data = ActorCheckpointIdData.parseFrom(result); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException("Received invalid protobuf data from GCS."); - } - UniqueId[] checkpointIds = new UniqueId[data.getCheckpointIdsCount()]; - for (int i = 0; i < checkpointIds.length; i++) { - checkpointIds[i] = UniqueId - .fromByteBuffer(data.getCheckpointIds(i).asReadOnlyByteBuffer()); - } - - for (int i = 0; i < checkpointIds.length; i++) { - checkpoints.add(new Checkpoint(checkpointIds[i], data.getTimestamps(i))); - } - } - checkpoints.sort((x, y) -> Long.compare(y.timestamp, x.timestamp)); - return checkpoints; - } - public JobId nextJobId() { int jobCounter = (int) primary.incr("JobCounter".getBytes()); return JobId.fromInt(jobCounter); diff --git a/java/runtime/src/main/java/io/ray/runtime/gcs/GlobalStateAccessor.java b/java/runtime/src/main/java/io/ray/runtime/gcs/GlobalStateAccessor.java index 4d57256b9..23516e547 100644 --- a/java/runtime/src/main/java/io/ray/runtime/gcs/GlobalStateAccessor.java +++ b/java/runtime/src/main/java/io/ray/runtime/gcs/GlobalStateAccessor.java @@ -125,17 +125,6 @@ public class GlobalStateAccessor { } } - /** - * @return An actor checkpoint id data with ActorCheckpointIdData protobuf schema. - */ - public byte[] getActorCheckpointId(ActorId actorId) { - // Fetch an actor checkpoint id with protobuf bytes format from GCS. - synchronized (GlobalStateAccessor.class) { - validateGlobalStateAccessorPointer(); - return this.nativeGetActorCheckpointId(globalStateAccessorNativePointer, actorId.getBytes()); - } - } - private void destroyGlobalStateAccessor() { synchronized (GlobalStateAccessor.class) { if (0 == globalStateAccessorNativePointer) { @@ -164,8 +153,6 @@ public class GlobalStateAccessor { private native byte[] nativeGetActorInfo(long nativePtr, byte[] actorId); - private native byte[] nativeGetActorCheckpointId(long nativePtr, byte[] actorId); - private native byte[] nativeGetPlacementGroupInfo(long nativePtr, byte[] placementGroupId); diff --git a/java/runtime/src/main/java/io/ray/runtime/task/LocalModeTaskExecutor.java b/java/runtime/src/main/java/io/ray/runtime/task/LocalModeTaskExecutor.java index b45c8a9db..7d0eeaf57 100644 --- a/java/runtime/src/main/java/io/ray/runtime/task/LocalModeTaskExecutor.java +++ b/java/runtime/src/main/java/io/ray/runtime/task/LocalModeTaskExecutor.java @@ -1,6 +1,5 @@ package io.ray.runtime.task; -import io.ray.api.id.ActorId; import io.ray.api.id.UniqueId; import io.ray.runtime.RayRuntimeInternal; @@ -34,11 +33,4 @@ public class LocalModeTaskExecutor extends TaskExecutor { - // TODO(hchen): Use the C++ config. - private static final int NUM_ACTOR_CHECKPOINTS_TO_KEEP = 20; - static class NativeActorContext extends TaskExecutor.ActorContext { - - /** - * Number of tasks executed since last actor checkpoint. - */ - private int numTasksSinceLastCheckpoint = 0; - - /** - * IDs of this actor's previous checkpoints. - */ - private List checkpointIds; - - /** - * Timestamp of the last actor checkpoint. - */ - private long lastCheckpointTimestamp = 0; } public NativeTaskExecutor(RayRuntimeInternal runtime) { @@ -49,63 +24,4 @@ public class NativeTaskExecutor extends TaskExecutor checkpointIds = actorContext.checkpointIds; - checkpointIds.add(checkpointId); - if (checkpointIds.size() > NUM_ACTOR_CHECKPOINTS_TO_KEEP) { - ((Checkpointable) actor).checkpointExpired(actorId, checkpointIds.get(0)); - checkpointIds.remove(0); - } - checkpointable.saveCheckpoint(actorId, checkpointId); - } - - @Override - protected void maybeLoadCheckpoint(Object actor, ActorId actorId) { - if (!(actor instanceof Checkpointable)) { - return; - } - NativeActorContext actorContext = getActorContext(); - actorContext.numTasksSinceLastCheckpoint = 0; - actorContext.lastCheckpointTimestamp = System.currentTimeMillis(); - actorContext.checkpointIds = new ArrayList<>(); - List availableCheckpoints - = runtime.getGcsClient().getCheckpointsForActor(actorId); - if (availableCheckpoints.isEmpty()) { - return; - } - UniqueId checkpointId = ((Checkpointable) actor).loadCheckpoint(actorId, availableCheckpoints); - if (checkpointId != null) { - boolean checkpointValid = false; - for (Checkpoint checkpoint : availableCheckpoints) { - if (checkpoint.checkpointId.equals(checkpointId)) { - checkpointValid = true; - break; - } - } - Preconditions.checkArgument(checkpointValid, - "'loadCheckpoint' must return a checkpoint ID that exists in the " - + "'availableCheckpoints' list, or null."); - nativeNotifyActorResumedFromCheckpoint(checkpointId.getBytes()); - } - } - - private static native byte[] nativePrepareCheckpoint(); - - private static native void nativeNotifyActorResumedFromCheckpoint(byte[] checkpointId); } diff --git a/java/runtime/src/main/java/io/ray/runtime/task/TaskExecutor.java b/java/runtime/src/main/java/io/ray/runtime/task/TaskExecutor.java index e50f17d15..97d71b0bd 100644 --- a/java/runtime/src/main/java/io/ray/runtime/task/TaskExecutor.java +++ b/java/runtime/src/main/java/io/ray/runtime/task/TaskExecutor.java @@ -1,7 +1,6 @@ package io.ray.runtime.task; import com.google.common.base.Preconditions; -import io.ray.api.id.ActorId; import io.ray.api.id.JobId; import io.ray.api.id.TaskId; import io.ray.api.id.UniqueId; @@ -150,16 +149,10 @@ public abstract class TaskExecutor { } // Set result if (taskType != TaskType.ACTOR_CREATION_TASK) { - if (taskType == TaskType.ACTOR_TASK) { - // TODO (kfstorm): handle checkpoint in core worker. - maybeSaveCheckpoint(actor, runtime.getWorkerContext().getCurrentActorId()); - } if (rayFunction.hasReturn()) { returnObjects.add(ObjectSerializer.serialize(result)); } } else { - // TODO (kfstorm): handle checkpoint in core worker. - maybeLoadCheckpoint(result, runtime.getWorkerContext().getCurrentActorId()); actorContext.currentActor = result; } LOGGER.debug("Finished executing task {}", taskId); @@ -195,7 +188,4 @@ public abstract class TaskExecutor { rayFunctionInfo.get(2)); } - protected abstract void maybeSaveCheckpoint(Object actor, ActorId actorId); - - protected abstract void maybeLoadCheckpoint(Object actor, ActorId actorId); } diff --git a/java/test/src/main/java/io/ray/test/ActorRestartTest.java b/java/test/src/main/java/io/ray/test/ActorRestartTest.java index 3a299b046..f24bb5e34 100644 --- a/java/test/src/main/java/io/ray/test/ActorRestartTest.java +++ b/java/test/src/main/java/io/ray/test/ActorRestartTest.java @@ -1,14 +1,10 @@ package io.ray.test; import io.ray.api.ActorHandle; -import io.ray.api.Checkpointable; import io.ray.api.Ray; -import io.ray.api.id.ActorId; -import io.ray.api.id.UniqueId; import io.ray.runtime.exception.RayActorException; import io.ray.runtime.util.SystemUtil; import java.io.IOException; -import java.util.List; import java.util.concurrent.TimeUnit; import org.testng.Assert; import org.testng.annotations.Test; @@ -73,68 +69,5 @@ public class ActorRestartTest extends BaseTest { // We should receive a RayActorException because the actor is dead. } } - - public static class CheckpointableCounter extends Counter implements Checkpointable { - - private boolean resumedFromCheckpoint = false; - private boolean increaseCalled = false; - - @Override - public int increase() { - increaseCalled = true; - return super.increase(); - } - - public boolean wasResumedFromCheckpoint() { - return resumedFromCheckpoint; - } - - @Override - public boolean shouldCheckpoint(CheckpointContext checkpointContext) { - // Checkpoint the actor when value is increased to 3. - boolean shouldCheckpoint = increaseCalled && value == 3; - increaseCalled = false; - return shouldCheckpoint; - } - - @Override - public void saveCheckpoint(ActorId actorId, UniqueId checkpointId) { - // In practice, user should save the checkpoint id and data to a persistent store. - // But for simplicity, we don't do that in this unit test. - } - - @Override - public UniqueId loadCheckpoint(ActorId actorId, List availableCheckpoints) { - // Restore previous value and return checkpoint id. - this.value = 3; - this.resumedFromCheckpoint = true; - return availableCheckpoints.get(availableCheckpoints.size() - 1).checkpointId; - } - - @Override - public void checkpointExpired(ActorId actorId, UniqueId checkpointId) { - } - } - - public void testActorCheckpointing() throws IOException, InterruptedException { - ActorHandle actor = Ray.actor(CheckpointableCounter::new) - .setMaxRestarts(1).remote(); - // Call increase 3 times. - for (int i = 0; i < 3; i++) { - actor.task(CheckpointableCounter::increase).remote().get(); - } - // Assert that the actor wasn't resumed from a checkpoint. - Assert.assertFalse(actor.task(CheckpointableCounter::wasResumedFromCheckpoint).remote().get()); - int pid = actor.task(CheckpointableCounter::getPid).remote().get(); - Runtime.getRuntime().exec("kill -9 " + pid); - // Wait for the actor to be killed. - TimeUnit.SECONDS.sleep(1); - - // Try calling increase on this actor again and check the value is now 4. - int value = actor.task(CheckpointableCounter::increase).remote().get(); - Assert.assertEquals(value, 4); - // Assert that the actor was resumed from a checkpoint. - Assert.assertTrue(actor.task(CheckpointableCounter::wasResumedFromCheckpoint).remote().get()); - } } diff --git a/java/test/src/main/java/io/ray/test/ExitActorTest.java b/java/test/src/main/java/io/ray/test/ExitActorTest.java index a0cdc8336..1ea86d293 100644 --- a/java/test/src/main/java/io/ray/test/ExitActorTest.java +++ b/java/test/src/main/java/io/ray/test/ExitActorTest.java @@ -3,17 +3,13 @@ package io.ray.test; import static io.ray.runtime.util.SystemUtil.pid; import io.ray.api.ActorHandle; -import io.ray.api.Checkpointable; import io.ray.api.ObjectRef; import io.ray.api.Ray; -import io.ray.api.id.ActorId; -import io.ray.api.id.UniqueId; import io.ray.runtime.exception.RayActorException; import io.ray.runtime.task.TaskExecutor; import io.ray.runtime.util.SystemUtil; import java.io.IOException; import java.lang.reflect.Field; -import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.testng.Assert; @@ -22,7 +18,7 @@ import org.testng.annotations.Test; @Test(groups = {"cluster"}) public class ExitActorTest extends BaseTest { - private static class ExitingActor implements Checkpointable { + private static class ExitingActor { int counter = 0; @@ -45,26 +41,6 @@ public class ExitActorTest extends BaseTest { } } - @Override - public boolean shouldCheckpoint(CheckpointContext checkpointContext) { - return true; - } - - @Override - public void saveCheckpoint(ActorId actorId, UniqueId checkpointId) { - } - - @Override - public UniqueId loadCheckpoint(ActorId actorId, List availableCheckpoints) { - // Dummy load checkpoint. - this.counter = 1; - return availableCheckpoints.get(availableCheckpoints.size() - 1).checkpointId; - } - - @Override - public void checkpointExpired(ActorId actorId, UniqueId checkpointId) { - } - public boolean exit() { Ray.exitActor(); return false; @@ -79,7 +55,7 @@ public class ExitActorTest extends BaseTest { Runtime.getRuntime().exec("kill -9 " + pid); TimeUnit.SECONDS.sleep(1); // Make sure this actor can be reconstructed. - Assert.assertEquals(2, (int) actor.task(ExitingActor::incr).remote().get()); + Assert.assertEquals(1, (int) actor.task(ExitingActor::incr).remote().get()); // `exitActor` will exit the actor without reconstructing. ObjectRef obj = actor.task(ExitingActor::exit).remote(); diff --git a/python/ray/__init__.py b/python/ray/__init__.py index afab79248..23b8ac2a7 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -63,7 +63,6 @@ if os.path.exists(so_path): import ray._raylet # noqa: E402 from ray._raylet import ( - ActorCheckpointID, ActorClassID, ActorID, NodeID, @@ -149,7 +148,6 @@ __all__ = [ # ID types __all__ += [ - "ActorCheckpointID", "ActorClassID", "ActorID", "NodeID", diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 67cfeac61..757e9794d 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -73,7 +73,6 @@ from ray.includes.common cimport ( ) from ray.includes.unique_ids cimport ( CActorID, - CActorCheckpointID, CObjectID, CNodeID, CPlacementGroupID, @@ -357,11 +356,6 @@ cdef execute_task( actor_class = manager.load_actor_class(job_id, function_descriptor) actor_id = core_worker.get_actor_id() worker.actors[actor_id] = actor_class.__new__(actor_class) - worker.actor_checkpoint_info[actor_id] = ( - ray.worker.ActorCheckpointInfo( - num_tasks_since_last_checkpoint=0, - last_checkpoint_timestamp=int(1000 * time.time()), - checkpoint_ids=[])) execution_info = execution_infos.get(function_descriptor) if not execution_info: @@ -1470,26 +1464,6 @@ cdef class CoreWorker: job_id.native(), error_type.encode("ascii"), error_message.encode("ascii"), timestamp)) - def prepare_actor_checkpoint(self, ActorID actor_id): - cdef: - CActorCheckpointID checkpoint_id - CActorID c_actor_id = actor_id.native() - - # PrepareActorCheckpoint will wait for raylet's reply, release - # the GIL so other Python threads can run. - with nogil: - check_status( - CCoreWorkerProcess.GetCoreWorker() - .PrepareActorCheckpoint(c_actor_id, &checkpoint_id)) - return ActorCheckpointID(checkpoint_id.Binary()) - - def notify_actor_resumed_from_checkpoint(self, ActorID actor_id, - ActorCheckpointID checkpoint_id): - check_status( - CCoreWorkerProcess.GetCoreWorker() - .NotifyActorResumedFromCheckpoint( - actor_id.native(), checkpoint_id.native())) - def set_resource(self, basestring resource_name, double capacity, NodeID client_id): CCoreWorkerProcess.GetCoreWorker().SetResource( diff --git a/python/ray/gcs_utils.py b/python/ray/gcs_utils.py index d5e30f698..ef95b4c0a 100644 --- a/python/ray/gcs_utils.py +++ b/python/ray/gcs_utils.py @@ -1,6 +1,5 @@ from ray.core.generated.common_pb2 import ErrorType from ray.core.generated.gcs_pb2 import ( - ActorCheckpointIdData, ActorTableData, GcsNodeInfo, AvailableResources, @@ -26,7 +25,6 @@ from ray.core.generated.gcs_pb2 import ( ) __all__ = [ - "ActorCheckpointIdData", "ActorTableData", "GcsNodeInfo", "AvailableResources", diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index fa25d6f66..7276ce43f 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -13,7 +13,6 @@ from libcpp.vector cimport vector as c_vector from ray.includes.unique_ids cimport ( CActorID, - CActorCheckpointID, CNodeID, CJobID, CTaskID, @@ -194,10 +193,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CRayStatus PushError(const CJobID &job_id, const c_string &type, const c_string &error_message, double timestamp) - CRayStatus PrepareActorCheckpoint(const CActorID &actor_id, - CActorCheckpointID *checkpoint_id) - CRayStatus NotifyActorResumedFromCheckpoint( - const CActorID &actor_id, const CActorCheckpointID &checkpoint_id) CRayStatus SetResource(const c_string &resource_name, const double capacity, const CNodeID &client_Id) diff --git a/python/ray/includes/unique_ids.pxd b/python/ray/includes/unique_ids.pxd index f0c444a29..86cfa9817 100644 --- a/python/ray/includes/unique_ids.pxd +++ b/python/ray/includes/unique_ids.pxd @@ -40,11 +40,6 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil: @staticmethod size_t Size() - cdef cppclass CActorCheckpointID "ray::ActorCheckpointID"(CUniqueID): - - @staticmethod - CActorCheckpointID FromBinary(const c_string &binary) - cdef cppclass CActorClassID "ray::ActorClassID"(CUniqueID): @staticmethod diff --git a/python/ray/includes/unique_ids.pxi b/python/ray/includes/unique_ids.pxi index d98179f2d..bcf766829 100644 --- a/python/ray/includes/unique_ids.pxi +++ b/python/ray/includes/unique_ids.pxi @@ -9,7 +9,6 @@ See https://github.com/ray-project/ray/issues/3721. import os from ray.includes.unique_ids cimport ( - CActorCheckpointID, CActorClassID, CActorID, CNodeID, @@ -303,16 +302,6 @@ cdef class ActorID(BaseID): return self.data.Hash() -cdef class ActorCheckpointID(UniqueID): - - def __init__(self, id): - check_id(id) - self.data = CActorCheckpointID.FromBinary(id) - - cdef CActorCheckpointID native(self): - return self.data - - cdef class FunctionID(UniqueID): def __init__(self, id): @@ -373,7 +362,6 @@ cdef class PlacementGroupID(BaseID): return self.data.Hash() _ID_TYPES = [ - ActorCheckpointID, ActorClassID, ActorID, NodeID, diff --git a/python/ray/state.py b/python/ray/state.py index eb69eb333..6d9df7870 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -832,37 +832,6 @@ class GlobalState: return dict(total_available_resources) - def actor_checkpoint_info(self, actor_id): - """Get checkpoint info for the given actor id. - Args: - actor_id: Actor's ID. - Returns: - A dictionary with information about the actor's checkpoint IDs and - their timestamps. - """ - self._check_connected() - message = self._execute_command( - actor_id, - "RAY.TABLE_LOOKUP", - gcs_utils.TablePrefix.Value("ACTOR_CHECKPOINT_ID"), - "", - actor_id.binary(), - ) - if message is None: - return None - gcs_entry = gcs_utils.GcsEntry.FromString(message) - entry = gcs_utils.ActorCheckpointIdData.FromString( - gcs_entry.entries[0]) - checkpoint_ids = [ - ray.ActorCheckpointID(checkpoint_id) - for checkpoint_id in entry.checkpoint_ids - ] - return { - "ActorID": ray.utils.binary_to_hex(entry.actor_id), - "CheckpointIds": checkpoint_ids, - "Timestamps": list(entry.timestamps), - } - state = GlobalState() """A global object used to access the cluster's global state.""" diff --git a/python/ray/tests/test_runtime_context.py b/python/ray/tests/test_runtime_context.py index 026357628..aa56df592 100644 --- a/python/ray/tests/test_runtime_context.py +++ b/python/ray/tests/test_runtime_context.py @@ -23,19 +23,6 @@ def test_was_current_actor_reconstructed(shutdown_only): def get_pid(self): return os.getpid() - # The following methods is to apply the checkpointable interface. - def should_checkpoint(self, checkpoint_context): - return False - - def save_checkpoint(self, actor_id, checkpoint_id): - pass - - def load_checkpoint(self, actor_id, available_checkpoints): - pass - - def checkpoint_expired(self, actor_id, checkpoint_id): - pass - a = A.remote() # `was_reconstructed` should be False when it's called in actor. assert ray.get(a.get_was_reconstructed.remote()) is False diff --git a/python/ray/worker.py b/python/ray/worker.py index e05340f0a..9960f811a 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -64,25 +64,6 @@ ERROR_KEY_PREFIX = b"Error:" logger = logging.getLogger(__name__) -class ActorCheckpointInfo: - """Information used to maintain actor checkpoints.""" - - __slots__ = [ - # Number of tasks executed since last checkpoint. - "num_tasks_since_last_checkpoint", - # Timestamp of the last checkpoint, in milliseconds. - "last_checkpoint_timestamp", - # IDs of the previous checkpoints. - "checkpoint_ids", - ] - - def __init__(self, num_tasks_since_last_checkpoint, - last_checkpoint_timestamp, checkpoint_ids): - self.num_tasks_since_last_checkpoint = num_tasks_since_last_checkpoint - self.last_checkpoint_timestamp = last_checkpoint_timestamp - self.checkpoint_ids = checkpoint_ids - - class Worker: """A class used to define the control flow of a worker process. @@ -106,8 +87,6 @@ class Worker: self.cached_functions_to_run = [] self.actor_init_error = None self.actors = {} - # Information used to maintain actor checkpoints. - self.actor_checkpoint_info = {} # When the worker is constructed. Record the original value of the # CUDA_VISIBLE_DEVICES environment variable. self.original_gpu_ids = ray.utils.get_cuda_visible_devices() diff --git a/src/ray/common/id_def.h b/src/ray/common/id_def.h index ef82aba3b..e396d1d53 100644 --- a/src/ray/common/id_def.h +++ b/src/ray/common/id_def.h @@ -20,7 +20,6 @@ DEFINE_UNIQUE_ID(FunctionID) DEFINE_UNIQUE_ID(ActorClassID) -DEFINE_UNIQUE_ID(ActorCheckpointID) DEFINE_UNIQUE_ID(WorkerID) DEFINE_UNIQUE_ID(ConfigID) DEFINE_UNIQUE_ID(NodeID) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 5b5c4a320..97eb0dc83 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1171,16 +1171,6 @@ Status CoreWorker::PushError(const JobID &job_id, const std::string &type, return local_raylet_client_->PushError(job_id, type, error_message, timestamp); } -Status CoreWorker::PrepareActorCheckpoint(const ActorID &actor_id, - ActorCheckpointID *checkpoint_id) { - return local_raylet_client_->PrepareActorCheckpoint(actor_id, checkpoint_id); -} - -Status CoreWorker::NotifyActorResumedFromCheckpoint( - const ActorID &actor_id, const ActorCheckpointID &checkpoint_id) { - return local_raylet_client_->NotifyActorResumedFromCheckpoint(actor_id, checkpoint_id); -} - Status CoreWorker::SetResource(const std::string &resource_name, const double capacity, const NodeID &node_id) { return local_raylet_client_->SetResource(resource_name, capacity, node_id); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index bac35ef7c..057060ea8 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -603,23 +603,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { Status PushError(const JobID &job_id, const std::string &type, const std::string &error_message, double timestamp); - /// Request raylet backend to prepare a checkpoint for an actor. - /// - /// \param[in] actor_id ID of the actor. - /// \param[out] checkpoint_id ID of the new checkpoint (output parameter). - /// \return Status. - Status PrepareActorCheckpoint(const ActorID &actor_id, - ActorCheckpointID *checkpoint_id); - - /// Notify raylet backend that an actor was resumed from a checkpoint. - /// - /// \param[in] actor_id ID of the actor. - /// \param[in] checkpoint_id ID of the checkpoint from which the actor was resumed. - /// \return Status. - Status NotifyActorResumedFromCheckpoint(const ActorID &actor_id, - const ActorCheckpointID &checkpoint_id); - - /// Sets a resource with the specified capacity and node id + /// Sets a resource with the specified capacity and client id /// \param[in] resource_name Name of the resource to be set. /// \param[in] capacity Capacity of the resource. /// \param[in] node_id NodeID where the resource is to be set. diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.cc b/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.cc index ec94fa334..3ad794918 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.cc @@ -116,19 +116,6 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetActorInfo(JNIEnv *env, jobj return nullptr; } -JNIEXPORT jbyteArray JNICALL -Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetActorCheckpointId( - JNIEnv *env, jobject o, jlong gcs_accessor_ptr, jbyteArray actorId) { - const auto actor_id = JavaByteArrayToId(env, actorId); - auto *gcs_accessor = - reinterpret_cast(gcs_accessor_ptr); - auto actor_checkpoint_id = gcs_accessor->GetActorCheckpointId(actor_id); - if (actor_checkpoint_id) { - return NativeStringToJavaByteArray(env, *actor_checkpoint_id); - } - return nullptr; -} - JNIEXPORT jbyteArray JNICALL Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetPlacementGroupInfo( JNIEnv *env, jobject o, jlong gcs_accessor_ptr, jbyteArray placement_group_id_bytes) { diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.h b/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.h index 0bc2dd19b..a8d743632 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.h +++ b/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.h @@ -103,15 +103,6 @@ JNIEXPORT jbyteArray JNICALL Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetActorInfo(JNIEnv *, jobject, jlong, jbyteArray); -/* - * Class: io_ray_runtime_gcs_GlobalStateAccessor - * Method: nativeGetActorCheckpointId - * Signature: (J[B)[B - */ -JNIEXPORT jbyteArray JNICALL -Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetActorCheckpointId(JNIEnv *, jobject, - jlong, jbyteArray); - /* * Class: io_ray_runtime_gcs_GlobalStateAccessor * Method: nativeGetPlacementGroupInfo diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskExecutor.cc b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskExecutor.cc index b4403b8cb..196458a98 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskExecutor.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskExecutor.cc @@ -28,29 +28,6 @@ extern "C" { using ray::NodeID; -JNIEXPORT jbyteArray JNICALL -Java_io_ray_runtime_task_NativeTaskExecutor_nativePrepareCheckpoint(JNIEnv *env, jclass) { - auto &core_worker = ray::CoreWorkerProcess::GetCoreWorker(); - const auto &actor_id = core_worker.GetWorkerContext().GetCurrentActorID(); - const auto &task_spec = core_worker.GetWorkerContext().GetCurrentTask(); - RAY_CHECK(task_spec->IsActorTask()); - ActorCheckpointID checkpoint_id; - auto status = core_worker.PrepareActorCheckpoint(actor_id, &checkpoint_id); - THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr); - return IdToJavaByteArray(env, checkpoint_id); -} - -JNIEXPORT void JNICALL -Java_io_ray_runtime_task_NativeTaskExecutor_nativeNotifyActorResumedFromCheckpoint( - JNIEnv *env, jclass, jbyteArray checkpointId) { - const auto &actor_id = - ray::CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetCurrentActorID(); - const auto checkpoint_id = JavaByteArrayToId(env, checkpointId); - auto status = ray::CoreWorkerProcess::GetCoreWorker().NotifyActorResumedFromCheckpoint( - actor_id, checkpoint_id); - THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0); -} - #ifdef __cplusplus } #endif diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index 484318df4..a78c3c4cc 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -111,38 +111,6 @@ class ActorInfoAccessor { /// \return Status virtual Status AsyncUnsubscribe(const ActorID &actor_id) = 0; - /// Add actor checkpoint data to GCS asynchronously. - /// - /// \param data_ptr The checkpoint data that will be added to GCS. - /// \param callback The callback that will be called after add finishes. - /// \return Status - /// TODO(micafan) When the GCS backend is redis, - /// the checkpoint of the same actor needs to be updated serially, - /// otherwise the checkpoint may be overwritten. This issue will be resolved if - /// necessary. - virtual Status AsyncAddCheckpoint( - const std::shared_ptr &data_ptr, - const StatusCallback &callback) = 0; - - /// Get actor checkpoint data from GCS asynchronously. - /// - /// \param checkpoint_id The ID of checkpoint to lookup in GCS. - /// \param actor_id The ID of actor that this checkpoint belongs to. - /// \param callback The callback that will be called after lookup finishes. - /// \return Status - virtual Status AsyncGetCheckpoint( - const ActorCheckpointID &checkpoint_id, const ActorID &actor_id, - const OptionalItemCallback &callback) = 0; - - /// Get actor checkpoint id data from GCS asynchronously. - /// - /// \param actor_id The ID of actor to lookup in GCS. - /// \param callback The callback that will be called after lookup finishes. - /// \return Status - virtual Status AsyncGetCheckpointID( - const ActorID &actor_id, - const OptionalItemCallback &callback) = 0; - /// Reestablish subscription. /// This should be called when GCS server restarts from a failure. /// PubSub server restart will cause GCS server restart. In this case, we need to diff --git a/src/ray/gcs/gcs_client/global_state_accessor.cc b/src/ray/gcs/gcs_client/global_state_accessor.cc index ee6fd1243..a1162ced0 100644 --- a/src/ray/gcs/gcs_client/global_state_accessor.cc +++ b/src/ray/gcs/gcs_client/global_state_accessor.cc @@ -203,17 +203,6 @@ std::unique_ptr GlobalStateAccessor::GetActorInfo(const ActorID &ac return actor_table_data; } -std::unique_ptr GlobalStateAccessor::GetActorCheckpointId( - const ActorID &actor_id) { - std::unique_ptr actor_checkpoint_id_data; - std::promise promise; - RAY_CHECK_OK(gcs_client_->Actors().AsyncGetCheckpointID( - actor_id, TransformForOptionalItemCallback( - actor_checkpoint_id_data, promise))); - promise.get_future().get(); - return actor_checkpoint_id_data; -} - std::unique_ptr GlobalStateAccessor::GetWorkerInfo( const WorkerID &worker_id) { std::unique_ptr worker_table_data; diff --git a/src/ray/gcs/gcs_client/global_state_accessor.h b/src/ray/gcs/gcs_client/global_state_accessor.h index 87456d607..01c3ebb12 100644 --- a/src/ray/gcs/gcs_client/global_state_accessor.h +++ b/src/ray/gcs/gcs_client/global_state_accessor.h @@ -122,14 +122,6 @@ class GlobalStateAccessor { /// protobuf function. std::unique_ptr GetActorInfo(const ActorID &actor_id); - /// Get checkpoint id of an actor from GCS Service. - /// - /// \param actor_id The ID of actor to look up in the GCS Service. - /// \return Actor checkpoint id. To support multi-language, we serialize each - /// ActorCheckpointIdData and return the serialized string. Where used, it needs to be - /// deserialized with protobuf function. - std::unique_ptr GetActorCheckpointId(const ActorID &actor_id); - /// Get information of a worker from GCS Service. /// /// \param worker_id The ID of worker to look up in the GCS Service. diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index 3f1c75058..2abaf0783 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -299,83 +299,6 @@ Status ServiceBasedActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id) return status; } -Status ServiceBasedActorInfoAccessor::AsyncAddCheckpoint( - const std::shared_ptr &data_ptr, - const StatusCallback &callback) { - ActorID actor_id = ActorID::FromBinary(data_ptr->actor_id()); - ActorCheckpointID checkpoint_id = - ActorCheckpointID::FromBinary(data_ptr->checkpoint_id()); - RAY_LOG(DEBUG) << "Adding actor checkpoint, actor id = " << actor_id - << ", checkpoint id = " << checkpoint_id - << ", job id = " << actor_id.JobId(); - rpc::AddActorCheckpointRequest request; - request.mutable_checkpoint_data()->CopyFrom(*data_ptr); - - auto operation = [this, request, actor_id, checkpoint_id, - callback](const SequencerDoneCallback &done_callback) { - client_impl_->GetGcsRpcClient().AddActorCheckpoint( - request, [actor_id, checkpoint_id, callback, done_callback]( - const Status &status, const rpc::AddActorCheckpointReply &reply) { - if (callback) { - callback(status); - } - RAY_LOG(DEBUG) << "Finished adding actor checkpoint, status = " << status - << ", actor id = " << actor_id - << ", checkpoint id = " << checkpoint_id - << ", job id = " << actor_id.JobId(); - done_callback(); - }); - }; - - sequencer_.Post(actor_id, operation); - return Status::OK(); -} - -Status ServiceBasedActorInfoAccessor::AsyncGetCheckpoint( - const ActorCheckpointID &checkpoint_id, const ActorID &actor_id, - const OptionalItemCallback &callback) { - RAY_LOG(DEBUG) << "Getting actor checkpoint, checkpoint id = " << checkpoint_id - << ", job id = " << actor_id.JobId(); - rpc::GetActorCheckpointRequest request; - request.set_actor_id(actor_id.Binary()); - request.set_checkpoint_id(checkpoint_id.Binary()); - client_impl_->GetGcsRpcClient().GetActorCheckpoint( - request, [checkpoint_id, actor_id, callback]( - const Status &status, const rpc::GetActorCheckpointReply &reply) { - if (reply.has_checkpoint_data()) { - callback(status, reply.checkpoint_data()); - } else { - callback(status, boost::none); - } - RAY_LOG(DEBUG) << "Finished getting actor checkpoint, status = " << status - << ", checkpoint id = " << checkpoint_id - << ", job id = " << actor_id.JobId(); - }); - return Status::OK(); -} - -Status ServiceBasedActorInfoAccessor::AsyncGetCheckpointID( - const ActorID &actor_id, - const OptionalItemCallback &callback) { - RAY_LOG(DEBUG) << "Getting actor checkpoint id, actor id = " << actor_id - << ", job id = " << actor_id.JobId(); - rpc::GetActorCheckpointIDRequest request; - request.set_actor_id(actor_id.Binary()); - client_impl_->GetGcsRpcClient().GetActorCheckpointID( - request, [actor_id, callback](const Status &status, - const rpc::GetActorCheckpointIDReply &reply) { - if (reply.has_checkpoint_id_data()) { - callback(status, reply.checkpoint_id_data()); - } else { - callback(status, boost::none); - } - RAY_LOG(DEBUG) << "Finished getting actor checkpoint id, status = " << status - << ", actor id = " << actor_id - << ", job id = " << actor_id.JobId(); - }); - return Status::OK(); -} - void ServiceBasedActorInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) { RAY_LOG(DEBUG) << "Reestablishing subscription for actor info."; // If only the GCS sever has restarted, we only need to fetch data from the GCS server. diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index e9840c536..1ad9da10f 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -98,17 +98,6 @@ class ServiceBasedActorInfoAccessor : public ActorInfoAccessor { Status AsyncUnsubscribe(const ActorID &actor_id) override; - Status AsyncAddCheckpoint(const std::shared_ptr &data_ptr, - const StatusCallback &callback) override; - - Status AsyncGetCheckpoint( - const ActorCheckpointID &checkpoint_id, const ActorID &actor_id, - const OptionalItemCallback &callback) override; - - Status AsyncGetCheckpointID( - const ActorID &actor_id, - const OptionalItemCallback &callback) override; - void AsyncResubscribe(bool is_pubsub_server_restarted) override; bool IsActorUnsubscribed(const ActorID &actor_id) override; diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index 6623d4798..5b14f8f67 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -239,46 +239,6 @@ class ServiceBasedGcsClientTest : public ::testing::Test { return actors; } - bool AddCheckpoint( - const std::shared_ptr &actor_checkpoint_data) { - std::promise promise; - RAY_CHECK_OK(gcs_client_->Actors().AsyncAddCheckpoint( - actor_checkpoint_data, - [&promise](Status status) { promise.set_value(status.ok()); })); - return WaitReady(promise.get_future(), timeout_ms_); - } - - rpc::ActorCheckpointData GetCheckpoint(const ActorID &actor_id, - const ActorCheckpointID &checkpoint_id) { - std::promise promise; - rpc::ActorCheckpointData actor_checkpoint_data; - RAY_CHECK_OK(gcs_client_->Actors().AsyncGetCheckpoint( - checkpoint_id, actor_id, - [&actor_checkpoint_data, &promise]( - Status status, const boost::optional &result) { - assert(result); - actor_checkpoint_data.CopyFrom(*result); - promise.set_value(true); - })); - EXPECT_TRUE(WaitReady(promise.get_future(), timeout_ms_)); - return actor_checkpoint_data; - } - - rpc::ActorCheckpointIdData GetCheckpointID(const ActorID &actor_id) { - std::promise promise; - rpc::ActorCheckpointIdData actor_checkpoint_id_data; - RAY_CHECK_OK(gcs_client_->Actors().AsyncGetCheckpointID( - actor_id, - [&actor_checkpoint_id_data, &promise]( - Status status, const boost::optional &result) { - assert(result); - actor_checkpoint_id_data.CopyFrom(*result); - promise.set_value(true); - })); - EXPECT_TRUE(WaitReady(promise.get_future(), timeout_ms_)); - return actor_checkpoint_id_data; - } - bool SubscribeToNodeChange( const gcs::SubscribeCallback &subscribe) { std::promise promise; @@ -650,31 +610,6 @@ TEST_F(ServiceBasedGcsClientTest, TestActorInfo) { WaitForActorUnsubscribed(actor_id); } -TEST_F(ServiceBasedGcsClientTest, TestActorCheckpoint) { - // Create actor checkpoint data. - JobID job_id = JobID::FromInt(1); - auto actor_table_data = Mocker::GenActorTableData(job_id); - ActorID actor_id = ActorID::FromBinary(actor_table_data->actor_id()); - - ActorCheckpointID checkpoint_id = ActorCheckpointID::FromRandom(); - auto checkpoint = std::make_shared(); - checkpoint->set_actor_id(actor_table_data->actor_id()); - checkpoint->set_checkpoint_id(checkpoint_id.Binary()); - checkpoint->set_execution_dependency(checkpoint_id.Binary()); - - // Add actor checkpoint data to GCS. - ASSERT_TRUE(AddCheckpoint(checkpoint)); - - // Get actor checkpoint data from GCS. - auto get_checkpoint_result = GetCheckpoint(actor_id, checkpoint_id); - ASSERT_TRUE(get_checkpoint_result.actor_id() == actor_id.Binary()); - - // Get actor checkpoint id data from GCS. - auto get_checkpoint_id_result = GetCheckpointID(actor_id); - ASSERT_TRUE(get_checkpoint_id_result.checkpoint_ids_size() == 1); - ASSERT_TRUE(get_checkpoint_id_result.checkpoint_ids(0) == checkpoint_id.Binary()); -} - TEST_F(ServiceBasedGcsClientTest, TestActorSubscribeAll) { // Create actor table data. JobID job_id = JobID::FromInt(1); diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 4c15c689f..e30cf2569 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -271,116 +271,6 @@ void GcsActorManager::HandleUpdateActorInfo(const rpc::UpdateActorInfoRequest &r ++counts_[CountType::UPDATE_ACTOR_INFO_REQUEST]; } -void GcsActorManager::HandleAddActorCheckpoint( - const rpc::AddActorCheckpointRequest &request, rpc::AddActorCheckpointReply *reply, - rpc::SendReplyCallback send_reply_callback) { - ActorID actor_id = ActorID::FromBinary(request.checkpoint_data().actor_id()); - ActorCheckpointID checkpoint_id = - ActorCheckpointID::FromBinary(request.checkpoint_data().checkpoint_id()); - RAY_LOG(DEBUG) << "Adding actor checkpoint, job id = " << actor_id.JobId() - << ", actor id = " << actor_id << ", checkpoint id = " << checkpoint_id; - auto on_done = [this, actor_id, checkpoint_id, reply, - send_reply_callback](const Status &status) { - if (!status.ok()) { - RAY_LOG(ERROR) << "Failed to add actor checkpoint: " << status.ToString() - << ", job id = " << actor_id.JobId() << ", actor id = " << actor_id - << ", checkpoint id = " << checkpoint_id; - } else { - auto on_get_done = [this, actor_id, checkpoint_id, reply, send_reply_callback]( - const Status &status, - const boost::optional &result) { - ActorCheckpointIdData actor_checkpoint_id; - if (result) { - actor_checkpoint_id.CopyFrom(*result); - } else { - actor_checkpoint_id.set_actor_id(actor_id.Binary()); - } - actor_checkpoint_id.add_checkpoint_ids(checkpoint_id.Binary()); - actor_checkpoint_id.add_timestamps(absl::GetCurrentTimeNanos() / 1000000); - auto on_put_done = [actor_id, checkpoint_id, reply, - send_reply_callback](const Status &status) { - RAY_LOG(DEBUG) << "Finished adding actor checkpoint, job id = " - << actor_id.JobId() << ", actor id = " << actor_id - << ", checkpoint id = " << checkpoint_id; - GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); - }; - RAY_CHECK_OK(gcs_table_storage_->ActorCheckpointIdTable().Put( - actor_id, actor_checkpoint_id, on_put_done)); - }; - RAY_CHECK_OK( - gcs_table_storage_->ActorCheckpointIdTable().Get(actor_id, on_get_done)); - } - }; - - Status status = gcs_table_storage_->ActorCheckpointTable().Put( - checkpoint_id, request.checkpoint_data(), on_done); - if (!status.ok()) { - on_done(status); - } - ++counts_[CountType::ADD_ACTOR_CHECKPOINT_REQUEST]; -} - -void GcsActorManager::HandleGetActorCheckpoint( - const rpc::GetActorCheckpointRequest &request, rpc::GetActorCheckpointReply *reply, - rpc::SendReplyCallback send_reply_callback) { - ActorID actor_id = ActorID::FromBinary(request.actor_id()); - ActorCheckpointID checkpoint_id = - ActorCheckpointID::FromBinary(request.checkpoint_id()); - RAY_LOG(DEBUG) << "Getting actor checkpoint, job id = " << actor_id.JobId() - << ", checkpoint id = " << checkpoint_id; - auto on_done = [actor_id, checkpoint_id, reply, send_reply_callback]( - const Status &status, - const boost::optional &result) { - if (status.ok()) { - if (result) { - reply->mutable_checkpoint_data()->CopyFrom(*result); - } - RAY_LOG(DEBUG) << "Finished getting actor checkpoint, job id = " << actor_id.JobId() - << ", checkpoint id = " << checkpoint_id; - } else { - RAY_LOG(ERROR) << "Failed to get actor checkpoint: " << status.ToString() - << ", job id = " << actor_id.JobId() - << ", checkpoint id = " << checkpoint_id; - } - GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); - }; - - Status status = gcs_table_storage_->ActorCheckpointTable().Get(checkpoint_id, on_done); - if (!status.ok()) { - on_done(status, boost::none); - } - ++counts_[CountType::GET_ACTOR_CHECKPOINT_REQUEST]; -} - -void GcsActorManager::HandleGetActorCheckpointID( - const rpc::GetActorCheckpointIDRequest &request, - rpc::GetActorCheckpointIDReply *reply, rpc::SendReplyCallback send_reply_callback) { - ActorID actor_id = ActorID::FromBinary(request.actor_id()); - RAY_LOG(DEBUG) << "Getting actor checkpoint id, job id = " << actor_id.JobId() - << ", actor id = " << actor_id; - auto on_done = [actor_id, reply, send_reply_callback]( - const Status &status, - const boost::optional &result) { - if (status.ok()) { - if (result) { - reply->mutable_checkpoint_id_data()->CopyFrom(*result); - } - RAY_LOG(DEBUG) << "Finished getting actor checkpoint id, job id = " - << actor_id.JobId() << ", actor id = " << actor_id; - } else { - RAY_LOG(ERROR) << "Failed to get actor checkpoint id: " << status.ToString() - << ", job id = " << actor_id.JobId() << ", actor id = " << actor_id; - } - GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); - }; - - Status status = gcs_table_storage_->ActorCheckpointIdTable().Get(actor_id, on_done); - if (!status.ok()) { - on_done(status, boost::none); - } - ++counts_[CountType::GET_ACTOR_CHECKPOINT_ID_REQUEST]; -} - Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &request, RegisterActorCallback success_callback) { // NOTE: After the abnormal recovery of the network between GCS client and GCS server or @@ -1033,32 +923,7 @@ void GcsActorManager::OnJobFinished(const JobID &job_id) { } else { iter++; } - } - - // Get checkpoint id first from checkpoint id table and delete all checkpoints - // related to this job - RAY_CHECK_OK(gcs_table_storage_->ActorCheckpointIdTable().GetByJobId( - job_id, [this, non_detached_actors_set]( - const std::unordered_map &result) { - if (!result.empty()) { - std::vector checkpoints; - std::vector checkpoint_ids; - for (auto &item : result) { - if (non_detached_actors_set.find(item.first) != - non_detached_actors_set.end()) { - checkpoints.push_back(item.first); - for (auto &id : item.second.checkpoint_ids()) { - checkpoint_ids.push_back(ActorCheckpointID::FromBinary(id)); - } - } - } - - RAY_CHECK_OK(gcs_table_storage_->ActorCheckpointIdTable().BatchDelete( - checkpoints, nullptr)); - RAY_CHECK_OK(gcs_table_storage_->ActorCheckpointTable().BatchDelete( - checkpoint_ids, nullptr)); - } - })); + }; } }; @@ -1154,12 +1019,6 @@ std::string GcsActorManager::DebugString() const { << counts_[CountType::REGISTER_ACTOR_INFO_REQUEST] << ", UpdateActorInfo request count: " << counts_[CountType::UPDATE_ACTOR_INFO_REQUEST] - << ", AddActorCheckpoint request count: " - << counts_[CountType::ADD_ACTOR_CHECKPOINT_REQUEST] - << ", GetActorCheckpoint request count: " - << counts_[CountType::GET_ACTOR_CHECKPOINT_REQUEST] - << ", GetActorCheckpointID request count: " - << counts_[CountType::GET_ACTOR_CHECKPOINT_ID_REQUEST] << ", Registered actors count: " << registered_actors_.size() << ", Destroyed actors count: " << destroyed_actors_.size() << ", Named actors count: " << named_actors_.size() diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index ecc6f279a..c2f23ac2d 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -199,18 +199,6 @@ class GcsActorManager : public rpc::ActorInfoHandler { rpc::UpdateActorInfoReply *reply, rpc::SendReplyCallback send_reply_callback) override; - void HandleAddActorCheckpoint(const rpc::AddActorCheckpointRequest &request, - rpc::AddActorCheckpointReply *reply, - rpc::SendReplyCallback send_reply_callback) override; - - void HandleGetActorCheckpoint(const rpc::GetActorCheckpointRequest &request, - rpc::GetActorCheckpointReply *reply, - rpc::SendReplyCallback send_reply_callback) override; - - void HandleGetActorCheckpointID(const rpc::GetActorCheckpointIDRequest &request, - rpc::GetActorCheckpointIDReply *reply, - rpc::SendReplyCallback send_reply_callback) override; - /// Register actor asynchronously. /// /// \param request Contains the meta info to create the actor. @@ -426,9 +414,6 @@ class GcsActorManager : public rpc::ActorInfoHandler { GET_ALL_ACTOR_INFO_REQUEST = 4, REGISTER_ACTOR_INFO_REQUEST = 5, UPDATE_ACTOR_INFO_REQUEST = 6, - ADD_ACTOR_CHECKPOINT_REQUEST = 7, - GET_ACTOR_CHECKPOINT_REQUEST = 8, - GET_ACTOR_CHECKPOINT_ID_REQUEST = 9, CountType_MAX = 10, }; uint64_t counts_[CountType::CountType_MAX] = {0}; diff --git a/src/ray/gcs/gcs_server/gcs_table_storage.cc b/src/ray/gcs/gcs_server/gcs_table_storage.cc index 93cb7cc21..41a914272 100644 --- a/src/ray/gcs/gcs_server/gcs_table_storage.cc +++ b/src/ray/gcs/gcs_server/gcs_table_storage.cc @@ -135,15 +135,12 @@ template class GcsTable; template class GcsTable; template class GcsTable; template class GcsTable; -template class GcsTable; -template class GcsTable; template class GcsTable; template class GcsTable; template class GcsTable; template class GcsTable; template class GcsTable; template class GcsTableWithJobId; -template class GcsTableWithJobId; template class GcsTableWithJobId; template class GcsTableWithJobId; template class GcsTableWithJobId; diff --git a/src/ray/gcs/gcs_server/gcs_table_storage.h b/src/ray/gcs/gcs_server/gcs_table_storage.h index 7cbd8499f..6378c161b 100644 --- a/src/ray/gcs/gcs_server/gcs_table_storage.h +++ b/src/ray/gcs/gcs_server/gcs_table_storage.h @@ -23,8 +23,6 @@ namespace ray { namespace gcs { -using rpc::ActorCheckpointData; -using rpc::ActorCheckpointIdData; using rpc::ActorTableData; using rpc::ErrorTableData; using rpc::GcsNodeInfo; @@ -180,26 +178,6 @@ class GcsPlacementGroupTable } }; -class GcsActorCheckpointTable : public GcsTable { - public: - explicit GcsActorCheckpointTable(std::shared_ptr &store_client) - : GcsTable(store_client) { - table_name_ = TablePrefix_Name(TablePrefix::ACTOR_CHECKPOINT); - } -}; - -class GcsActorCheckpointIdTable - : public GcsTableWithJobId { - public: - explicit GcsActorCheckpointIdTable(std::shared_ptr &store_client) - : GcsTableWithJobId(store_client) { - table_name_ = TablePrefix_Name(TablePrefix::ACTOR_CHECKPOINT_ID); - } - - private: - JobID GetJobIdFromKey(const ActorID &key) override { return key.JobId(); } -}; - class GcsTaskTable : public GcsTableWithJobId { public: explicit GcsTaskTable(std::shared_ptr &store_client) @@ -330,16 +308,6 @@ class GcsTableStorage { return *placement_group_table_; } - GcsActorCheckpointTable &ActorCheckpointTable() { - RAY_CHECK(actor_checkpoint_table_ != nullptr); - return *actor_checkpoint_table_; - } - - GcsActorCheckpointIdTable &ActorCheckpointIdTable() { - RAY_CHECK(actor_checkpoint_id_table_ != nullptr); - return *actor_checkpoint_id_table_; - } - GcsTaskTable &TaskTable() { RAY_CHECK(task_table_ != nullptr); return *task_table_; @@ -405,8 +373,6 @@ class GcsTableStorage { std::unique_ptr job_table_; std::unique_ptr actor_table_; std::unique_ptr placement_group_table_; - std::unique_ptr actor_checkpoint_table_; - std::unique_ptr actor_checkpoint_id_table_; std::unique_ptr task_table_; std::unique_ptr task_lease_table_; std::unique_ptr task_reconstruction_table_; @@ -431,8 +397,6 @@ class RedisGcsTableStorage : public GcsTableStorage { job_table_.reset(new GcsJobTable(store_client_)); actor_table_.reset(new GcsActorTable(store_client_)); placement_group_table_.reset(new GcsPlacementGroupTable(store_client_)); - actor_checkpoint_table_.reset(new GcsActorCheckpointTable(store_client_)); - actor_checkpoint_id_table_.reset(new GcsActorCheckpointIdTable(store_client_)); task_table_.reset(new GcsTaskTable(store_client_)); task_lease_table_.reset(new GcsTaskLeaseTable(store_client_)); task_reconstruction_table_.reset(new GcsTaskReconstructionTable(store_client_)); @@ -461,8 +425,6 @@ class InMemoryGcsTableStorage : public GcsTableStorage { job_table_.reset(new GcsJobTable(store_client_)); actor_table_.reset(new GcsActorTable(store_client_)); placement_group_table_.reset(new GcsPlacementGroupTable(store_client_)); - actor_checkpoint_table_.reset(new GcsActorCheckpointTable(store_client_)); - actor_checkpoint_id_table_.reset(new GcsActorCheckpointIdTable(store_client_)); task_table_.reset(new GcsTaskTable(store_client_)); task_lease_table_.reset(new GcsTaskLeaseTable(store_client_)); task_reconstruction_table_.reset(new GcsTaskReconstructionTable(store_client_)); diff --git a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc index d7fd8636a..c2f4657e0 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc @@ -122,60 +122,6 @@ class GcsServerTest : public ::testing::Test { return actor_table_data_opt; } - bool AddActorCheckpoint(const rpc::AddActorCheckpointRequest &request) { - std::promise promise; - client_->AddActorCheckpoint( - request, - [&promise](const Status &status, const rpc::AddActorCheckpointReply &reply) { - RAY_CHECK_OK(status); - promise.set_value(true); - }); - return WaitReady(promise.get_future(), timeout_ms_); - } - - boost::optional GetActorCheckpoint( - const std::string &actor_id, const std::string &checkpoint_id) { - rpc::GetActorCheckpointRequest request; - request.set_actor_id(actor_id); - request.set_checkpoint_id(checkpoint_id); - boost::optional checkpoint_data_opt; - std::promise promise; - client_->GetActorCheckpoint( - request, [&checkpoint_data_opt, &promise]( - const Status &status, const rpc::GetActorCheckpointReply &reply) { - RAY_CHECK_OK(status); - if (reply.has_checkpoint_data()) { - checkpoint_data_opt = reply.checkpoint_data(); - } else { - checkpoint_data_opt = boost::none; - } - promise.set_value(true); - }); - EXPECT_TRUE(WaitReady(promise.get_future(), timeout_ms_)); - return checkpoint_data_opt; - } - - boost::optional GetActorCheckpointID( - const std::string &actor_id) { - rpc::GetActorCheckpointIDRequest request; - request.set_actor_id(actor_id); - boost::optional checkpoint_id_data_opt; - std::promise promise; - client_->GetActorCheckpointID( - request, [&checkpoint_id_data_opt, &promise]( - const Status &status, const rpc::GetActorCheckpointIDReply &reply) { - RAY_CHECK_OK(status); - if (reply.has_checkpoint_id_data()) { - checkpoint_id_data_opt = reply.checkpoint_id_data(); - } else { - checkpoint_id_data_opt = boost::none; - } - promise.set_value(true); - }); - EXPECT_TRUE(WaitReady(promise.get_future(), timeout_ms_)); - return checkpoint_id_data_opt; - } - bool RegisterNode(const rpc::RegisterNodeRequest &request) { std::promise promise; client_->RegisterNode( @@ -451,25 +397,7 @@ TEST_F(GcsServerTest, TestActorInfo) { // Create actor_table_data JobID job_id = JobID::FromInt(1); auto actor_table_data = Mocker::GenActorTableData(job_id); - - // Add actor checkpoint - ActorCheckpointID checkpoint_id = ActorCheckpointID::FromRandom(); - rpc::ActorCheckpointData checkpoint; - checkpoint.set_actor_id(actor_table_data->actor_id()); - checkpoint.set_checkpoint_id(checkpoint_id.Binary()); - checkpoint.set_execution_dependency(checkpoint_id.Binary()); - - rpc::AddActorCheckpointRequest add_actor_checkpoint_request; - add_actor_checkpoint_request.mutable_checkpoint_data()->CopyFrom(checkpoint); - ASSERT_TRUE(AddActorCheckpoint(add_actor_checkpoint_request)); - boost::optional checkpoint_result = - GetActorCheckpoint(actor_table_data->actor_id(), checkpoint_id.Binary()); - ASSERT_TRUE(checkpoint_result->actor_id() == actor_table_data->actor_id()); - ASSERT_TRUE(checkpoint_result->checkpoint_id() == checkpoint_id.Binary()); - boost::optional checkpoint_id_result = - GetActorCheckpointID(actor_table_data->actor_id()); - ASSERT_TRUE(checkpoint_id_result->actor_id() == actor_table_data->actor_id()); - ASSERT_TRUE(checkpoint_id_result->checkpoint_ids_size() == 1); + // TODO(sand): Add tests that don't require checkponit. } TEST_F(GcsServerTest, TestJobInfo) { @@ -498,54 +426,12 @@ TEST_F(GcsServerTest, TestJobGarbageCollection) { add_job_request.mutable_data()->CopyFrom(*job_table_data); ASSERT_TRUE(AddJob(add_job_request)); - // Add actor checkpoint auto actor_table_data = Mocker::GenActorTableData(job_id); - ActorCheckpointID checkpoint_id = ActorCheckpointID::FromRandom(); - rpc::ActorCheckpointData checkpoint; - checkpoint.set_actor_id(actor_table_data->actor_id()); - checkpoint.set_checkpoint_id(checkpoint_id.Binary()); - checkpoint.set_execution_dependency(checkpoint_id.Binary()); - - rpc::AddActorCheckpointRequest add_actor_checkpoint_request; - add_actor_checkpoint_request.mutable_checkpoint_data()->CopyFrom(checkpoint); - ASSERT_TRUE(AddActorCheckpoint(add_actor_checkpoint_request)); - boost::optional checkpoint_result = - GetActorCheckpoint(actor_table_data->actor_id(), checkpoint_id.Binary()); - ASSERT_TRUE(checkpoint_result->actor_id() == actor_table_data->actor_id()); - ASSERT_TRUE(checkpoint_result->checkpoint_id() == checkpoint_id.Binary()); - boost::optional checkpoint_id_result = - GetActorCheckpointID(actor_table_data->actor_id()); - ASSERT_TRUE(checkpoint_id_result->actor_id() == actor_table_data->actor_id()); - ASSERT_TRUE(checkpoint_id_result->checkpoint_ids_size() == 1); // Register detached actor for job auto detached_actor_table_data = Mocker::GenActorTableData(job_id); detached_actor_table_data->set_is_detached(true); - // Add checkpoint for detached actor - ActorCheckpointID detached_checkpoint_id = ActorCheckpointID::FromRandom(); - rpc::ActorCheckpointData detached_checkpoint; - detached_checkpoint.set_actor_id(detached_actor_table_data->actor_id()); - detached_checkpoint.set_checkpoint_id(detached_checkpoint_id.Binary()); - detached_checkpoint.set_execution_dependency(detached_checkpoint_id.Binary()); - - rpc::AddActorCheckpointRequest add_detached_actor_checkpoint_request; - add_detached_actor_checkpoint_request.mutable_checkpoint_data()->CopyFrom( - detached_checkpoint); - ASSERT_TRUE(AddActorCheckpoint(add_detached_actor_checkpoint_request)); - boost::optional detached_checkpoint_result = - GetActorCheckpoint(detached_actor_table_data->actor_id(), - detached_checkpoint_id.Binary()); - ASSERT_TRUE(detached_checkpoint_result->actor_id() == - detached_actor_table_data->actor_id()); - ASSERT_TRUE(detached_checkpoint_result->checkpoint_id() == - detached_checkpoint_id.Binary()); - boost::optional detached_checkpoint_id_result = - GetActorCheckpointID(detached_actor_table_data->actor_id()); - ASSERT_TRUE(detached_checkpoint_id_result->actor_id() == - detached_actor_table_data->actor_id()); - ASSERT_TRUE(detached_checkpoint_id_result->checkpoint_ids_size() == 1); - // Mark job finished rpc::MarkJobFinishedRequest mark_job_finished_request; mark_job_finished_request.set_job_id(job_table_data->job_id()); @@ -555,19 +441,6 @@ TEST_F(GcsServerTest, TestJobGarbageCollection) { return !GetActorInfo(actor_table_data->actor_id()).has_value(); }; ASSERT_TRUE(WaitForCondition(condition_func, 10 * 1000)); - - detached_checkpoint_result = GetActorCheckpoint(detached_actor_table_data->actor_id(), - detached_checkpoint_id.Binary()); - ASSERT_TRUE(detached_checkpoint_result->actor_id() == - detached_actor_table_data->actor_id()); - ASSERT_TRUE(detached_checkpoint_result->checkpoint_id() == - detached_checkpoint_id.Binary()); - - detached_checkpoint_id_result = - GetActorCheckpointID(detached_actor_table_data->actor_id()); - ASSERT_TRUE(detached_checkpoint_id_result->actor_id() == - detached_actor_table_data->actor_id()); - ASSERT_TRUE(detached_checkpoint_id_result->checkpoint_ids_size() == 1); } TEST_F(GcsServerTest, TestNodeInfo) { diff --git a/src/ray/gcs/redis_accessor.cc b/src/ray/gcs/redis_accessor.cc index 3157d40fc..b048e32ad 100644 --- a/src/ray/gcs/redis_accessor.cc +++ b/src/ray/gcs/redis_accessor.cc @@ -101,79 +101,6 @@ Status RedisLogBasedActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id) return log_based_actor_sub_executor_.AsyncUnsubscribe(subscribe_id_, actor_id, nullptr); } -Status RedisLogBasedActorInfoAccessor::AsyncAddCheckpoint( - const std::shared_ptr &data_ptr, - const StatusCallback &callback) { - ActorID actor_id = ActorID::FromBinary(data_ptr->actor_id()); - auto on_add_data_done = [actor_id, callback, data_ptr, this]( - RedisGcsClient *client, - const ActorCheckpointID &checkpoint_id, - const ActorCheckpointData &data) { - Status status = AsyncAddCheckpointID(actor_id, checkpoint_id, callback); - if (!status.ok()) { - callback(status); - } - }; - - ActorCheckpointID checkpoint_id = - ActorCheckpointID::FromBinary(data_ptr->checkpoint_id()); - ActorCheckpointTable &actor_cp_table = client_impl_->actor_checkpoint_table(); - return actor_cp_table.Add(actor_id.JobId(), checkpoint_id, data_ptr, on_add_data_done); -} - -Status RedisLogBasedActorInfoAccessor::AsyncGetCheckpoint( - const ActorCheckpointID &checkpoint_id, const ActorID &actor_id, - const OptionalItemCallback &callback) { - RAY_CHECK(callback != nullptr); - auto on_success = [callback](RedisGcsClient *client, - const ActorCheckpointID &checkpoint_id, - const ActorCheckpointData &checkpoint_data) { - boost::optional optional(checkpoint_data); - callback(Status::OK(), std::move(optional)); - }; - - auto on_failure = [callback](RedisGcsClient *client, - const ActorCheckpointID &checkpoint_id) { - boost::optional optional; - callback(Status::Invalid("Invalid checkpoint id."), std::move(optional)); - }; - - ActorCheckpointTable &actor_cp_table = client_impl_->actor_checkpoint_table(); - return actor_cp_table.Lookup(actor_id.JobId(), checkpoint_id, on_success, on_failure); -} - -Status RedisLogBasedActorInfoAccessor::AsyncGetCheckpointID( - const ActorID &actor_id, - const OptionalItemCallback &callback) { - RAY_CHECK(callback != nullptr); - auto on_success = [callback](RedisGcsClient *client, const ActorID &actor_id, - const ActorCheckpointIdData &data) { - boost::optional optional(data); - callback(Status::OK(), std::move(optional)); - }; - - auto on_failure = [callback](RedisGcsClient *client, const ActorID &actor_id) { - boost::optional optional; - callback(Status::Invalid("Checkpoint not found."), std::move(optional)); - }; - - ActorCheckpointIdTable &cp_id_table = client_impl_->actor_checkpoint_id_table(); - return cp_id_table.Lookup(actor_id.JobId(), actor_id, on_success, on_failure); -} - -Status RedisLogBasedActorInfoAccessor::AsyncAddCheckpointID( - const ActorID &actor_id, const ActorCheckpointID &checkpoint_id, - const StatusCallback &callback) { - ActorCheckpointIdTable::WriteCallback on_done = nullptr; - if (callback != nullptr) { - on_done = [callback](RedisGcsClient *client, const ActorID &actor_id, - const ActorCheckpointIdData &data) { callback(Status::OK()); }; - } - - ActorCheckpointIdTable &cp_id_table = client_impl_->actor_checkpoint_id_table(); - return cp_id_table.AddCheckpointId(actor_id.JobId(), actor_id, checkpoint_id, on_done); -} - RedisActorInfoAccessor::RedisActorInfoAccessor(RedisGcsClient *client_impl) : RedisLogBasedActorInfoAccessor(client_impl), actor_sub_executor_(client_impl_->actor_table()) {} diff --git a/src/ray/gcs/redis_accessor.h b/src/ray/gcs/redis_accessor.h index 4e851e8a2..baa63514b 100644 --- a/src/ray/gcs/redis_accessor.h +++ b/src/ray/gcs/redis_accessor.h @@ -67,17 +67,6 @@ class RedisLogBasedActorInfoAccessor : public ActorInfoAccessor { Status AsyncUnsubscribe(const ActorID &actor_id) override; - Status AsyncAddCheckpoint(const std::shared_ptr &data_ptr, - const StatusCallback &callback) override; - - Status AsyncGetCheckpoint( - const ActorCheckpointID &checkpoint_id, const ActorID &actor_id, - const OptionalItemCallback &callback) override; - - Status AsyncGetCheckpointID( - const ActorID &actor_id, - const OptionalItemCallback &callback) override; - void AsyncResubscribe(bool is_pubsub_server_restarted) override {} bool IsActorUnsubscribed(const ActorID &actor_id) override { return false; } @@ -86,17 +75,6 @@ class RedisLogBasedActorInfoAccessor : public ActorInfoAccessor { virtual std::vector GetAllActorID() const; virtual Status Get(const ActorID &actor_id, ActorTableData *actor_table_data) const; - private: - /// Add checkpoint id to GCS asynchronously. - /// - /// \param actor_id The ID of actor that the checkpoint belongs to. - /// \param checkpoint_id The ID of checkpoint that will be added to GCS. - /// \return Status - Status AsyncAddCheckpointID(const ActorID &actor_id, - const ActorCheckpointID &checkpoint_id, - const StatusCallback &callback); - - protected: RedisGcsClient *client_impl_{nullptr}; // Use a random NodeID for actor subscription. Because: // If we use NodeID::Nil, GCS will still send all actors' updates to this GCS Client. diff --git a/src/ray/gcs/redis_gcs_client.cc b/src/ray/gcs/redis_gcs_client.cc index f60056803..35fd0506e 100644 --- a/src/ray/gcs/redis_gcs_client.cc +++ b/src/ray/gcs/redis_gcs_client.cc @@ -63,8 +63,6 @@ Status RedisGcsClient::Connect(boost::asio::io_service &io_service) { task_lease_table_.reset(new TaskLeaseTable(shard_contexts, this)); heartbeat_table_.reset(new HeartbeatTable(shard_contexts, this)); profile_table_.reset(new ProfileTable(shard_contexts, this)); - actor_checkpoint_table_.reset(new ActorCheckpointTable(shard_contexts, this)); - actor_checkpoint_id_table_.reset(new ActorCheckpointIdTable(shard_contexts, this)); resource_table_.reset(new DynamicResourceTable({primary_context}, this)); worker_table_.reset(new WorkerTable(shard_contexts, this)); @@ -138,14 +136,6 @@ JobTable &RedisGcsClient::job_table() { return *job_table_; } ProfileTable &RedisGcsClient::profile_table() { return *profile_table_; } -ActorCheckpointTable &RedisGcsClient::actor_checkpoint_table() { - return *actor_checkpoint_table_; -} - -ActorCheckpointIdTable &RedisGcsClient::actor_checkpoint_id_table() { - return *actor_checkpoint_id_table_; -} - DynamicResourceTable &RedisGcsClient::resource_table() { return *resource_table_; } } // namespace gcs diff --git a/src/ray/gcs/redis_gcs_client.h b/src/ray/gcs/redis_gcs_client.h index 2db39db06..7d14826ba 100644 --- a/src/ray/gcs/redis_gcs_client.h +++ b/src/ray/gcs/redis_gcs_client.h @@ -86,8 +86,6 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { /// Implements the Actors() interface. LogBasedActorTable &log_based_actor_table(); ActorTable &actor_table(); - ActorCheckpointTable &actor_checkpoint_table(); - ActorCheckpointIdTable &actor_checkpoint_id_table(); /// Implements the Jobs() interface. JobTable &job_table(); /// Implements the Objects() interface. @@ -123,8 +121,6 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { std::unique_ptr heartbeat_batch_table_; std::unique_ptr profile_table_; std::unique_ptr node_table_; - std::unique_ptr actor_checkpoint_table_; - std::unique_ptr actor_checkpoint_id_table_; std::unique_ptr resource_table_; std::unique_ptr worker_table_; std::unique_ptr job_table_; diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index 6784de75b..17ad126fd 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -821,41 +821,6 @@ Status ActorTable::Get(const ray::ActorID &actor_id, return Status::OK(); } -Status ActorCheckpointIdTable::AddCheckpointId(const JobID &job_id, - const ActorID &actor_id, - const ActorCheckpointID &checkpoint_id, - const WriteCallback &done) { - auto lookup_callback = [this, checkpoint_id, job_id, actor_id, done]( - ray::gcs::RedisGcsClient *client, const ActorID &id, - const ActorCheckpointIdData &data) { - std::shared_ptr copy = - std::make_shared(data); - copy->add_timestamps(absl::GetCurrentTimeNanos() / 1000000); - copy->add_checkpoint_ids(checkpoint_id.Binary()); - // TODO(swang): This is a temporary value while we deprecate the actor - // checkpoint table. - auto num_to_keep = 20; - while (copy->timestamps().size() > num_to_keep) { - // Delete the checkpoint from actor checkpoint table. - const auto &to_delete = ActorCheckpointID::FromBinary(copy->checkpoint_ids(0)); - copy->mutable_checkpoint_ids()->erase(copy->mutable_checkpoint_ids()->begin()); - copy->mutable_timestamps()->erase(copy->mutable_timestamps()->begin()); - client_->actor_checkpoint_table().Delete(job_id, to_delete); - } - RAY_CHECK_OK(Add(job_id, actor_id, copy, done)); - }; - auto failure_callback = [this, checkpoint_id, job_id, actor_id, done]( - ray::gcs::RedisGcsClient *client, const ActorID &id) { - std::shared_ptr data = - std::make_shared(); - data->set_actor_id(id.Binary()); - data->add_timestamps(absl::GetCurrentTimeNanos() / 1000000); - *data->add_checkpoint_ids() = checkpoint_id.Binary(); - RAY_CHECK_OK(Add(job_id, actor_id, data, done)); - }; - return Lookup(job_id, actor_id, lookup_callback, failure_callback); -} - template class Log; template class Set; template class Log; @@ -871,8 +836,6 @@ template class Log; template class Log; template class Log; template class Log; -template class Table; -template class Table; template class Table; template class Table; diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index 6084de3be..11683e6f4 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -34,8 +34,6 @@ namespace ray { namespace gcs { -using rpc::ActorCheckpointData; -using rpc::ActorCheckpointIdData; using rpc::ActorTableData; using rpc::ErrorTableData; using rpc::GcsChangeMode; @@ -810,35 +808,6 @@ class TaskLeaseTable : public Table { const SubscriptionCallback &done); }; -class ActorCheckpointTable : public Table { - public: - ActorCheckpointTable(const std::vector> &contexts, - RedisGcsClient *client) - : Table(contexts, client) { - prefix_ = TablePrefix::ACTOR_CHECKPOINT; - }; -}; - -class ActorCheckpointIdTable : public Table { - public: - ActorCheckpointIdTable(const std::vector> &contexts, - RedisGcsClient *client) - : Table(contexts, client) { - prefix_ = TablePrefix::ACTOR_CHECKPOINT_ID; - }; - - /// Add a checkpoint id to an actor, and remove a previous checkpoint if the - /// total number of checkpoints in GCS exceeds the max allowed value. - /// - /// \param job_id The ID of the job. - /// \param actor_id ID of the actor. - /// \param checkpoint_id ID of the checkpoint. - /// \return Status. - Status AddCheckpointId(const JobID &job_id, const ActorID &actor_id, - const ActorCheckpointID &checkpoint_id, - const WriteCallback &done); -}; - namespace raylet { class TaskTable : public Table { diff --git a/src/ray/gcs/test/redis_actor_info_accessor_test.cc b/src/ray/gcs/test/redis_actor_info_accessor_test.cc index 2f2700f7a..49f474621 100644 --- a/src/ray/gcs/test/redis_actor_info_accessor_test.cc +++ b/src/ray/gcs/test/redis_actor_info_accessor_test.cc @@ -41,27 +41,8 @@ class ActorInfoAccessorTest : public AccessorTestBase { actor->set_actor_id(actor_id.Binary()); id_to_data_[actor_id] = actor; } - GenCheckpointData(); } - void GenCheckpointData() { - for (const auto &item : id_to_data_) { - const ActorID &id = item.first; - ActorCheckpointList checkpoints; - for (size_t i = 0; i < checkpoint_number_; ++i) { - ActorCheckpointID checkpoint_id = ActorCheckpointID::FromRandom(); - auto checkpoint = std::make_shared(); - checkpoint->set_actor_id(id.Binary()); - checkpoint->set_checkpoint_id(checkpoint_id.Binary()); - checkpoint->set_execution_dependency(checkpoint_id.Binary()); - checkpoints.emplace_back(checkpoint); - } - id_to_checkpoints_[id] = std::move(checkpoints); - } - } - - typedef std::vector> ActorCheckpointList; - std::unordered_map id_to_checkpoints_; size_t checkpoint_number_{2}; }; @@ -87,67 +68,6 @@ TEST_F(ActorInfoAccessorTest, Subscribe) { WaitPendingDone(do_sub_pending_count, wait_pending_timeout_); } -TEST_F(ActorInfoAccessorTest, GetActorCheckpointTest) { - ActorInfoAccessor &actor_accessor = gcs_client_->Actors(); - auto on_add_done = [this](Status status) { - RAY_CHECK_OK(status); - --pending_count_; - }; - for (size_t index = 0; index < checkpoint_number_; ++index) { - for (const auto &actor_checkpoints : id_to_checkpoints_) { - const ActorCheckpointList &checkpoints = actor_checkpoints.second; - const auto &checkpoint = checkpoints[index]; - ++pending_count_; - Status status = actor_accessor.AsyncAddCheckpoint(checkpoint, on_add_done); - RAY_CHECK_OK(status); - } - WaitPendingDone(wait_pending_timeout_); - } - - for (const auto &actor_checkpoints : id_to_checkpoints_) { - const ActorCheckpointList &checkpoints = actor_checkpoints.second; - for (const auto &checkpoint : checkpoints) { - ActorCheckpointID checkpoint_id = - ActorCheckpointID::FromBinary(checkpoint->checkpoint_id()); - auto on_get_done = [this, checkpoint_id]( - Status status, - const boost::optional &result) { - RAY_CHECK(result); - ActorCheckpointID result_checkpoint_id = - ActorCheckpointID::FromBinary(result->checkpoint_id()); - ASSERT_EQ(checkpoint_id, result_checkpoint_id); - --pending_count_; - }; - ++pending_count_; - Status status = actor_accessor.AsyncGetCheckpoint( - checkpoint_id, ActorID::FromBinary(checkpoint->actor_id()), on_get_done); - RAY_CHECK_OK(status); - } - } - WaitPendingDone(wait_pending_timeout_); - - for (const auto &actor_checkpoints : id_to_checkpoints_) { - const ActorID &actor_id = actor_checkpoints.first; - const ActorCheckpointList &checkpoints = actor_checkpoints.second; - auto on_get_done = [this, &checkpoints]( - Status status, - const boost::optional &result) { - RAY_CHECK(result); - ASSERT_EQ(checkpoints.size(), result->checkpoint_ids_size()); - for (size_t i = 0; i < checkpoints.size(); ++i) { - const std::string checkpoint_id_str = checkpoints[i]->checkpoint_id(); - const std::string &result_checkpoint_id_str = result->checkpoint_ids(i); - ASSERT_EQ(checkpoint_id_str, result_checkpoint_id_str); - } - --pending_count_; - }; - ++pending_count_; - Status status = actor_accessor.AsyncGetCheckpointID(actor_id, on_get_done); - RAY_CHECK_OK(status); - } - WaitPendingDone(wait_pending_timeout_); -} - } // namespace gcs } // namespace ray diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 807973123..580623185 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -36,8 +36,6 @@ enum TablePrefix { JOB = 11; PROFILE = 12; TASK_LEASE = 13; - ACTOR_CHECKPOINT = 14; - ACTOR_CHECKPOINT_ID = 15; NODE_RESOURCE = 16; DIRECT_ACTOR = 17; // WORKER is already used in WorkerType, so use WORKERS here. @@ -394,38 +392,6 @@ message JobTableData { JobConfig config = 6; } -// This table stores the actor checkpoint data. An actor checkpoint -// is the snapshot of an actor's state in the actor registration. -// See `actor_registration.h` for more detailed explanation of these fields. -message ActorCheckpointData { - // ID of this checkpoint. - bytes checkpoint_id = 1; - // ID of this actor. - bytes actor_id = 2; - // The dummy object ID of actor's most recently executed task. - bytes execution_dependency = 3; - // A list of IDs of this actor's handles. - repeated bytes handle_ids = 4; - // The task counters of the above handles. - repeated uint64 task_counters = 5; - // The frontier dependencies of the above handles. - repeated bytes frontier_dependencies = 6; - // A list of unreleased dummy objects from this actor. - repeated bytes unreleased_dummy_objects = 7; - // The numbers of dependencies for the above unreleased dummy objects. - repeated uint32 num_dummy_object_dependencies = 8; -} - -// This table stores the actor-to-available-checkpoint-ids mapping. -message ActorCheckpointIdData { - // ID of this actor. - bytes actor_id = 1; - // IDs of this actor's available checkpoints. - repeated bytes checkpoint_ids = 2; - // A list of the timestamps for each of the above `checkpoint_ids`. - repeated uint64 timestamps = 3; -} - message WorkerTableData { // Is this worker alive. bool is_alive = 1; diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index 646c1ccb9..a68264359 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -112,33 +112,6 @@ message UpdateActorInfoReply { GcsStatus status = 1; } -message AddActorCheckpointRequest { - ActorCheckpointData checkpoint_data = 1; -} - -message AddActorCheckpointReply { - GcsStatus status = 1; -} - -message GetActorCheckpointRequest { - bytes actor_id = 1; - bytes checkpoint_id = 2; -} - -message GetActorCheckpointReply { - GcsStatus status = 1; - ActorCheckpointData checkpoint_data = 2; -} - -message GetActorCheckpointIDRequest { - bytes actor_id = 1; -} - -message GetActorCheckpointIDReply { - GcsStatus status = 1; - ActorCheckpointIdData checkpoint_id_data = 2; -} - // Service for actor info access. service ActorInfoGcsService { // Register actor to gcs service. @@ -155,13 +128,6 @@ service ActorInfoGcsService { rpc RegisterActorInfo(RegisterActorInfoRequest) returns (RegisterActorInfoReply); // Update actor info in GCS Service. rpc UpdateActorInfo(UpdateActorInfoRequest) returns (UpdateActorInfoReply); - // Add actor checkpoint data to GCS Service. - rpc AddActorCheckpoint(AddActorCheckpointRequest) returns (AddActorCheckpointReply); - // Get actor checkpoint data from GCS Service. - rpc GetActorCheckpoint(GetActorCheckpointRequest) returns (GetActorCheckpointReply); - // Get actor checkpoint id data from GCS Service. - rpc GetActorCheckpointID(GetActorCheckpointIDRequest) - returns (GetActorCheckpointIDReply); } message RegisterNodeRequest { diff --git a/src/ray/raylet/actor_registration.cc b/src/ray/raylet/actor_registration.cc deleted file mode 100644 index 5863cd91c..000000000 --- a/src/ray/raylet/actor_registration.cc +++ /dev/null @@ -1,152 +0,0 @@ -// Copyright 2017 The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "ray/raylet/actor_registration.h" - -#include - -#include "ray/util/logging.h" - -namespace ray { - -namespace raylet { - -ActorRegistration::ActorRegistration(const ActorTableData &actor_table_data) - : actor_table_data_(actor_table_data) { - // The first task submitted on each new actor handle will depend on the actor - // creation object, so we always pin it. - dummy_objects_[GetActorCreationDependency()]++; -} - -ActorRegistration::ActorRegistration(const ActorTableData &actor_table_data, - const ActorCheckpointData &checkpoint_data) - : actor_table_data_(actor_table_data), - execution_dependency_( - ObjectID::FromBinary(checkpoint_data.execution_dependency())) { - // Restore `frontier_`. - for (int64_t i = 0; i < checkpoint_data.handle_ids_size(); i++) { - auto caller_id = TaskID::FromBinary(checkpoint_data.handle_ids(i)); - auto &frontier_entry = frontier_[caller_id]; - frontier_entry.task_counter = checkpoint_data.task_counters(i); - frontier_entry.execution_dependency = - ObjectID::FromBinary(checkpoint_data.frontier_dependencies(i)); - } - // Restore `dummy_objects_`. - for (int64_t i = 0; i < checkpoint_data.unreleased_dummy_objects_size(); i++) { - auto dummy = ObjectID::FromBinary(checkpoint_data.unreleased_dummy_objects(i)); - dummy_objects_[dummy] = checkpoint_data.num_dummy_object_dependencies(i); - } -} - -const NodeID ActorRegistration::GetNodeManagerId() const { - return NodeID::FromBinary(actor_table_data_.address().raylet_id()); -} - -const ObjectID ActorRegistration::GetActorCreationDependency() const { - return ObjectID::FromBinary(actor_table_data_.actor_creation_dummy_object_id()); -} - -const ObjectID ActorRegistration::GetExecutionDependency() const { - return execution_dependency_; -} - -const JobID ActorRegistration::GetJobId() const { - return JobID::FromBinary(actor_table_data_.job_id()); -} - -const int64_t ActorRegistration::GetMaxRestarts() const { - return actor_table_data_.max_restarts(); -} - -const int64_t ActorRegistration::GetRemainingRestarts() const { - if (actor_table_data_.max_restarts() == -1) { - return -1; - } - return actor_table_data_.max_restarts() - actor_table_data_.num_restarts(); -} - -const uint64_t ActorRegistration::GetNumRestarts() const { - return actor_table_data_.num_restarts(); -} - -const std::unordered_map - &ActorRegistration::GetFrontier() const { - return frontier_; -} - -ObjectID ActorRegistration::ExtendFrontier(const TaskID &caller_id, - const ObjectID &execution_dependency) { - auto &frontier_entry = frontier_[caller_id]; - // Release the reference to the previous cursor for this - // actor handle, if there was one. - ObjectID object_to_release; - if (!frontier_entry.execution_dependency.IsNil()) { - auto it = dummy_objects_.find(frontier_entry.execution_dependency); - RAY_CHECK(it != dummy_objects_.end()); - it->second--; - RAY_CHECK(it->second >= 0); - if (it->second == 0) { - object_to_release = frontier_entry.execution_dependency; - dummy_objects_.erase(it); - } - } - - frontier_entry.task_counter++; - frontier_entry.execution_dependency = execution_dependency; - execution_dependency_ = execution_dependency; - // Add the reference to the new cursor for this actor handle. - dummy_objects_[execution_dependency]++; - return object_to_release; -} - -int ActorRegistration::NumHandles() const { return frontier_.size(); } - -std::shared_ptr ActorRegistration::GenerateCheckpointData( - const ActorID &actor_id, const Task *task) { - // Make a copy of the actor registration - ActorRegistration copy = *this; - if (task) { - const auto actor_caller_id = task->GetTaskSpecification().CallerId(); - const auto dummy_object = task->GetTaskSpecification().ActorDummyObject(); - // Extend its frontier to include the most recent task. - // NOTE(hchen): For non-direct-call actors, this is needed because this method is - // called before `FinishAssignedTask`, which will be called when the worker tries to - // fetch the next task. For direct-call actors, checkpoint data doesn't contain - // frontier info, so we don't need to do `ExtendFrontier` here. - copy.ExtendFrontier(actor_caller_id, dummy_object); - } - - // Use actor's current state to generate checkpoint data. - auto checkpoint_data = std::make_shared(); - checkpoint_data->set_actor_id(actor_id.Binary()); - checkpoint_data->set_execution_dependency(copy.GetExecutionDependency().Binary()); - for (const auto &frontier : copy.GetFrontier()) { - checkpoint_data->add_handle_ids(frontier.first.Binary()); - checkpoint_data->add_task_counters(frontier.second.task_counter); - checkpoint_data->add_frontier_dependencies( - frontier.second.execution_dependency.Binary()); - } - for (const auto &entry : copy.GetDummyObjects()) { - checkpoint_data->add_unreleased_dummy_objects(entry.first.Binary()); - checkpoint_data->add_num_dummy_object_dependencies(entry.second); - } - - ActorCheckpointID checkpoint_id = ActorCheckpointID::FromRandom(); - checkpoint_data->set_checkpoint_id(checkpoint_id.Binary()); - return checkpoint_data; -} - -} // namespace raylet - -} // namespace ray diff --git a/src/ray/raylet/actor_registration.h b/src/ray/raylet/actor_registration.h deleted file mode 100644 index 8a2895173..000000000 --- a/src/ray/raylet/actor_registration.h +++ /dev/null @@ -1,181 +0,0 @@ -// Copyright 2017 The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include - -#include "ray/common/id.h" -#include "ray/common/task/task.h" -#include "src/ray/protobuf/gcs.pb.h" - -namespace ray { - -namespace raylet { - -using rpc::ActorTableData; -using ActorState = rpc::ActorTableData::ActorState; -using rpc::ActorCheckpointData; - -/// \class ActorRegistration -/// -/// Information about an actor registered in the system. This includes the -/// actor's current node manager location, and if local, information about its -/// current execution state, used for reconstruction purposes, and whether the -/// actor is currently alive or not. -class ActorRegistration { - public: - /// Create an actor registration. - /// - /// \param actor_table_data Information from the global actor table about - /// this actor. This includes the actor's node manager location. - explicit ActorRegistration(const ActorTableData &actor_table_data); - - /// Recreate an actor's registration from a checkpoint. - /// - /// \param checkpoint_data The checkpoint used to restore the actor. - ActorRegistration(const ActorTableData &actor_table_data, - const ActorCheckpointData &checkpoint_data); - - /// Each actor may have multiple callers, or "handles". A frontier leaf - /// represents the execution state of the actor with respect to a single - /// handle. - struct FrontierLeaf { - /// The number of tasks submitted by this handle that have executed on the - /// actor so far. - int64_t task_counter; - /// The execution dependency returned by the task submitted by this handle - /// that most recently executed on the actor. - ObjectID execution_dependency; - }; - - /// Get the actor table data. - /// - /// \return The actor table data. - const ActorTableData &GetTableData() const { return actor_table_data_; } - - /// Get the actor's current state (ALIVE or DEAD). - /// - /// \return The actor's current state. - const ActorState GetState() const { return actor_table_data_.state(); } - - /// Update actor's state. - void SetState(const ActorState &state) { actor_table_data_.set_state(state); } - - /// Get the actor's node manager location. - /// - /// \return The actor's node manager location. All tasks for the actor should - /// be forwarded to this node. - const NodeID GetNodeManagerId() const; - - /// Get the object that represents the actor's initial state. This is the - /// execution dependency returned by this actor's creation task. If - /// restarted, this will recreate the actor. - /// - /// \return The execution dependency returned by the actor's creation task. - const ObjectID GetActorCreationDependency() const; - - /// Get actor's job ID. - const JobID GetJobId() const; - - /// Get the max number of times this actor should be restarted. - const int64_t GetMaxRestarts() const; - - /// Get the remaining number of times this actor should be restarted. - const int64_t GetRemainingRestarts() const; - - /// Get the number of times this actor has already been restarted - const uint64_t GetNumRestarts() const; - - /// Get the object that represents the actor's current state. This is the - /// execution dependency returned by the task most recently executed on the - /// actor. The next task to execute on the actor should be marked as - /// execution-dependent on this object. - /// - /// \return The execution dependency returned by the most recently executed - /// task. - const ObjectID GetExecutionDependency() const; - - /// Get the execution frontier of the actor, indexed by handle. This captures - /// the execution state of the actor, a summary of which tasks have executed - /// so far. - /// - /// \return The actor frontier, a map from handle ID to execution state for - /// that handle. - const std::unordered_map &GetFrontier() const; - - /// Get all the dummy objects of this actor's tasks. - const std::unordered_map &GetDummyObjects() const { - return dummy_objects_; - } - - /// Extend the frontier of the actor by a single task. This should be called - /// whenever the actor executes a task. - /// - /// \param handle_id The ID of the handle that submitted the task. - /// \param execution_dependency The object representing the actor's new - /// state. This is the execution dependency returned by the task. - /// \return The dummy object that can be released as a result of the executed - /// task. If no dummy object can be released, then this is nil. - ObjectID ExtendFrontier(const TaskID &caller_id, const ObjectID &execution_dependency); - - /// Returns num handles to this actor entry. - /// - /// \return int. - int NumHandles() const; - - /// Generate checkpoint data based on actor's current state. - /// - /// \param actor_id ID of this actor. - /// \param task The task that just finished on the actor. (nullptr when it's direct - /// call.) - /// \return A shared pointer to the generated checkpoint data. - std::shared_ptr GenerateCheckpointData(const ActorID &actor_id, - const Task *task); - - private: - /// Information from the global actor table about this actor, including the - /// node manager location. - ActorTableData actor_table_data_; - /// The object representing the state following the actor's most recently - /// executed task. The next task to execute on the actor should be marked as - /// execution-dependent on this object. - ObjectID execution_dependency_; - /// The execution frontier of the actor, which represents which tasks have - /// executed so far and which tasks may execute next, based on execution - /// dependencies. This is indexed by handle. - std::unordered_map frontier_; - /// This map is used to track all the unreleased dummy objects for this - /// actor. The map key is the dummy object ID, and the map value is the - /// number of actor handles that depend on that dummy object. When the map - /// value decreases to 0, the dummy object is safe to release from the object - /// manager, since this means that no actor handle will depend on that dummy - /// object again. - /// - /// An actor handle depends on a dummy object when its next unfinished task - /// depends on the dummy object. For a given dummy object (say D) created by - /// task (say T) that was submitted by an actor handle (say H), there could - /// be 2 types of such actor handles: - /// 1. T is the last task submitted by H that was executed. If the next task - /// submitted by H hasn't finished yet, then H still depends on D since D - /// will be in the next task's execution dependencies. - /// 2. Any handles that were forked from H after T finished, and before T's - /// next task finishes. Such handles depend on D until their first tasks - /// finish since D will be their first tasks' execution dependencies. - std::unordered_map dummy_objects_; -}; - -} // namespace raylet - -} // namespace ray diff --git a/src/ray/raylet/format/node_manager.fbs b/src/ray/raylet/format/node_manager.fbs index ab212442a..162504eb5 100644 --- a/src/ray/raylet/format/node_manager.fbs +++ b/src/ray/raylet/format/node_manager.fbs @@ -70,12 +70,6 @@ enum MessageType:int { PushProfileEventsRequest, // Free the objects in objects store. FreeObjectsInObjectStoreRequest, - // Request raylet backend to prepare a checkpoint for an actor. - PrepareActorCheckpointRequest, - // Reply of `PrepareActorCheckpointRequest`. - PrepareActorCheckpointReply, - // Notify raylet backend that an actor was resumed from a checkpoint. - NotifyActorResumedFromCheckpoint, // A node manager requests to connect to another node manager. ConnectClient, // Set dynamic custom resource. @@ -262,23 +256,6 @@ table FreeObjectsRequest { object_ids: [string]; } -table PrepareActorCheckpointRequest { - // ID of the actor. - actor_id: string; -} - -table PrepareActorCheckpointReply { - // ID of the checkpoint. - checkpoint_id: string; -} - -table NotifyActorResumedFromCheckpoint { - // ID of the actor. - actor_id: string; - // ID of the checkpoint from which the actor was resumed. - checkpoint_id: string; -} - table ConnectClient { // ID of the connecting client. client_id: string; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index aab45cdcb..680393d49 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -150,7 +150,6 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self RayConfig::instance().object_timeout_milliseconds(), self_node_id_, gcs_client_, object_directory_), task_dependency_manager_(object_manager, reconstruction_policy_), - actor_registry_(), node_manager_server_("NodeManager", config.node_manager_port), node_manager_service_(io_service, *this), agent_manager_service_handler_( @@ -232,16 +231,6 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self } ray::Status NodeManager::RegisterGcs() { - // The TaskLease subscription is done on demand in reconstruction policy. - // Register a callback to handle actor notifications. - auto actor_notification_callback = [this](const ActorID &actor_id, - const ActorTableData &data) { - HandleActorStateTransition(actor_id, ActorRegistration(data)); - }; - - RAY_RETURN_NOT_OK( - gcs_client_->Actors().AsyncSubscribeAll(actor_notification_callback, nullptr)); - auto on_node_change = [this](const NodeID &node_id, const GcsNodeInfo &data) { if (data.state() == GcsNodeInfo::ALIVE) { NodeAdded(data); @@ -1009,30 +998,6 @@ void NodeManager::HeartbeatBatchAdded(const HeartbeatBatchTableData &heartbeat_b } } -void NodeManager::HandleActorStateTransition(const ActorID &actor_id, - ActorRegistration &&actor_registration) { - // Update local registry. - auto it = actor_registry_.find(actor_id); - if (it == actor_registry_.end()) { - it = actor_registry_.emplace(actor_id, actor_registration).first; - } else { - it->second = actor_registration; - } - RAY_LOG(DEBUG) << "Actor notification received: actor_id = " << actor_id - << ", node_manager_id = " << actor_registration.GetNodeManagerId() - << ", state = " - << ActorTableData::ActorState_Name(actor_registration.GetState()) - << ", remaining_restarts = " - << actor_registration.GetRemainingRestarts(); - - if (actor_registration.GetState() == ActorTableData::ALIVE) { - // The actor is now alive (created for the first time or restarted). We can - // stop listening for the actor creation task. This is needed because we use - // `ListenAndMaybeReconstruct` to reconstruct the actor. - reconstruction_policy_.Cancel(actor_registration.GetActorCreationDependency()); - } -} - void NodeManager::ProcessNewClient(ClientConnection &client) { // The new client is a worker, so begin listening for messages. client.ProcessMessages(); @@ -1202,12 +1167,6 @@ void NodeManager::ProcessClientMessage(const std::shared_ptr & RAY_CHECK_OK(gcs_client_->Tasks().AsyncDelete(creating_task_ids, nullptr)); } } break; - case protocol::MessageType::PrepareActorCheckpointRequest: { - ProcessPrepareActorCheckpointRequest(client, message_data); - } break; - case protocol::MessageType::NotifyActorResumedFromCheckpoint: { - ProcessNotifyActorResumedFromCheckpoint(message_data); - } break; case protocol::MessageType::SubscribePlasmaReady: { ProcessSubscribePlasmaReady(client, message_data); } break; @@ -1622,58 +1581,6 @@ void NodeManager::ProcessPushErrorRequestMessage(const uint8_t *message_data) { RAY_CHECK_OK(gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr)); } -void NodeManager::ProcessPrepareActorCheckpointRequest( - const std::shared_ptr &client, const uint8_t *message_data) { - auto message = - flatbuffers::GetRoot(message_data); - ActorID actor_id = from_flatbuf(*message->actor_id()); - RAY_LOG(DEBUG) << "Preparing checkpoint for actor " << actor_id; - const auto &actor_entry = actor_registry_.find(actor_id); - RAY_CHECK(actor_entry != actor_registry_.end()); - - std::shared_ptr worker = worker_pool_.GetRegisteredWorker(client); - RAY_CHECK(worker && worker->GetActorId() == actor_id); - - std::shared_ptr checkpoint_data = - actor_entry->second.GenerateCheckpointData(actor_entry->first, nullptr); - - // Write checkpoint data to GCS. - RAY_CHECK_OK(gcs_client_->Actors().AsyncAddCheckpoint( - checkpoint_data, [worker, checkpoint_data](Status status) { - ActorCheckpointID checkpoint_id = - ActorCheckpointID::FromBinary(checkpoint_data->checkpoint_id()); - RAY_CHECK(status.ok()) << "Add checkpoint failed, actor is " - << worker->GetActorId() << " checkpoint_id is " - << checkpoint_id; - RAY_LOG(DEBUG) << "Checkpoint " << checkpoint_id << " saved for actor " - << worker->GetActorId(); - // Send reply to worker. - flatbuffers::FlatBufferBuilder fbb; - auto reply = ray::protocol::CreatePrepareActorCheckpointReply( - fbb, to_flatbuf(fbb, checkpoint_id)); - fbb.Finish(reply); - worker->Connection()->WriteMessageAsync( - static_cast(protocol::MessageType::PrepareActorCheckpointReply), - fbb.GetSize(), fbb.GetBufferPointer(), [](const ray::Status &status) { - if (!status.ok()) { - RAY_LOG(WARNING) - << "Failed to send PrepareActorCheckpointReply to client"; - } - }); - })); -} - -void NodeManager::ProcessNotifyActorResumedFromCheckpoint(const uint8_t *message_data) { - auto message = - flatbuffers::GetRoot(message_data); - ActorID actor_id = from_flatbuf(*message->actor_id()); - ActorCheckpointID checkpoint_id = - from_flatbuf(*message->checkpoint_id()); - RAY_LOG(DEBUG) << "Actor " << actor_id << " was resumed from checkpoint " - << checkpoint_id; - checkpoint_id_to_restore_.emplace(actor_id, checkpoint_id); -} - void NodeManager::ProcessSubmitTaskMessage(const uint8_t *message_data) { // Read the task submitted by the client. auto fbs_message = flatbuffers::GetRoot(message_data); @@ -2195,12 +2102,6 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ const TaskSpecification &spec = task.GetTaskSpecification(); RAY_LOG(DEBUG) << "Treating task " << spec.TaskId() << " as failed because of error " << ErrorType_Name(error_type) << "."; - // If this was an actor creation task that tried to resume from a checkpoint, - // then erase it here since the task did not finish. - if (spec.IsActorCreationTask()) { - ActorID actor_id = spec.ActorCreationId(); - checkpoint_id_to_restore_.erase(actor_id); - } // Loop over the return IDs (except the dummy ID) and store a fake object in // the object store. int64_t num_returns = spec.NumReturns(); @@ -2636,58 +2537,6 @@ bool NodeManager::FinishAssignedTask(const std::shared_ptr &wor return !spec.IsActorCreationTask(); } -std::shared_ptr NodeManager::CreateActorTableDataFromCreationTask( - const TaskSpecification &task_spec, int port, const WorkerID &worker_id) { - RAY_CHECK(task_spec.IsActorCreationTask()); - auto actor_id = task_spec.ActorCreationId(); - auto actor_entry = actor_registry_.find(actor_id); - std::shared_ptr actor_info_ptr; - // TODO(swang): If this is an actor that was restarted, and previous - // actor notifications were delayed, then this node may not have an entry for - // the actor in actor_regisry_. Then, the fields for the number of - // restarts will be wrong. - if (actor_entry == actor_registry_.end()) { - actor_info_ptr.reset(new ActorTableData()); - // Set all of the static fields for the actor. These fields will not - // change even if the actor fails or is restarted. - actor_info_ptr->set_actor_id(actor_id.Binary()); - actor_info_ptr->set_actor_creation_dummy_object_id( - task_spec.ActorDummyObject().Binary()); - actor_info_ptr->set_job_id(task_spec.JobId().Binary()); - actor_info_ptr->set_max_restarts(task_spec.MaxActorRestarts()); - actor_info_ptr->set_num_restarts(0); - actor_info_ptr->set_is_detached(task_spec.IsDetachedActor()); - actor_info_ptr->mutable_owner_address()->CopyFrom( - task_spec.GetMessage().caller_address()); - } else { - // If we've already seen this actor, it means that this actor was restarted. - // Thus, its previous state must be RESTARTING. - // TODO: The following is a workaround for the issue described in - // https://github.com/ray-project/ray/issues/5524, please see the issue - // description for more information. - if (actor_entry->second.GetState() != ActorTableData::RESTARTING) { - RAY_LOG(WARNING) << "Actor not in restarting state, most likely it " - << "died before creation handler could run. Actor state is " - << actor_entry->second.GetState(); - } - // Copy the static fields from the current actor entry. - actor_info_ptr.reset(new ActorTableData(actor_entry->second.GetTableData())); - // We are restarting the actor, so increment its num_restarts - actor_info_ptr->set_num_restarts(actor_info_ptr->num_restarts() + 1); - } - - // Set the new fields for the actor's state to indicate that the actor is - // now alive on this node manager. - actor_info_ptr->mutable_address()->set_ip_address( - gcs_client_->Nodes().GetSelfInfo().node_manager_address()); - actor_info_ptr->mutable_address()->set_port(port); - actor_info_ptr->mutable_address()->set_raylet_id(self_node_id_.Binary()); - actor_info_ptr->mutable_address()->set_worker_id(worker_id.Binary()); - actor_info_ptr->set_state(ActorTableData::ALIVE); - actor_info_ptr->set_timestamp(current_time_ms()); - return actor_info_ptr; -} - void NodeManager::FinishAssignedActorCreationTask(WorkerInterface &worker, const Task &task) { RAY_LOG(DEBUG) << "Finishing assigned actor creation task"; @@ -2797,9 +2646,6 @@ void NodeManager::HandleObjectLocal(const ObjectID &object_id) { // Filter out direct call actors. These are not tracked by the raylet and // their assigned task ID is the actor ID. for (const auto &id : ready_task_id_set_copy) { - if (actor_registry_.count(id.ActorId()) == 0) { - RAY_LOG(WARNING) << "Actor not found in registry " << id.Hex(); - } ready_task_id_set.erase(id); } @@ -2815,11 +2661,7 @@ bool NodeManager::IsActorCreationTask(const TaskID &task_id) { auto actor_id = task_id.ActorId(); if (!actor_id.IsNil() && task_id == TaskID::ForActorCreationTask(actor_id)) { // This task ID corresponds to an actor creation task. - auto iter = actor_registry_.find(actor_id); - if (iter != actor_registry_.end()) { - // This actor is direct call actor. - return true; - } + return true; } return false; @@ -3073,14 +2915,6 @@ std::string NodeManager::DebugString() const { result << "\nnum async plasma notifications: " << async_plasma_objects_notification_.size(); } - /* Disabled for now #11239. - result << "\nActorRegistry:"; - - auto statistical_data = GetActorStatisticalData(actor_registry_); - result << "\n- num live actors: " << statistical_data.live_actors; - result << "\n- num restarting actors: " << statistical_data.restarting_actors; - result << "\n- num dead actors: " << statistical_data.dead_actors; - */ result << "\nRemote node managers: "; for (const auto &entry : remote_node_manager_addresses_) { @@ -3414,12 +3248,6 @@ void NodeManager::RecordMetrics() { object_manager_.RecordMetrics(); local_queues_.RecordMetrics(); - - /* Disabled for now #11239. - auto statistical_data = GetActorStatisticalData(actor_registry_); - stats::LiveActors().Record(statistical_data.live_actors); - stats::RestartingActors().Record(statistical_data.restarting_actors); - */ } bool NodeManager::ReturnBundleResources(const BundleSpecification &bundle_spec) { diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 34d0527c9..c1ef24a08 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -27,7 +27,6 @@ #include "ray/common/task/task_common.h" #include "ray/common/task/scheduling_resources.h" #include "ray/object_manager/object_manager.h" -#include "ray/raylet/actor_registration.h" #include "ray/raylet/agent_manager.h" #include "ray/raylet/local_object_manager.h" #include "ray/raylet/scheduling/scheduling_ids.h" @@ -432,18 +431,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// \return Void. void DestroyWorker(std::shared_ptr worker); - /// The callback for handling an actor state transition (e.g., from ALIVE to - /// DEAD), whether as a notification from the actor table or as a handler for - /// a local actor's state transition. This method is idempotent and will ignore - /// old state transition. - /// - /// \param actor_id The actor ID of the actor whose state was updated. - /// \param actor_registration The ActorRegistration object that represents actor's - /// new state. - /// \return Void. - void HandleActorStateTransition(const ActorID &actor_id, - ActorRegistration &&actor_registration); - /// When a job finished, loop over all of the queued tasks for that job and /// treat them as failed. /// @@ -553,18 +540,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// \return Void. void ProcessPushErrorRequestMessage(const uint8_t *message_data); - /// Process client message of PrepareActorCheckpointRequest. - /// - /// \param client The client that sent the message. - /// \param message_data A pointer to the message data. - void ProcessPrepareActorCheckpointRequest( - const std::shared_ptr &client, const uint8_t *message_data); - - /// Process client message of NotifyActorResumedFromCheckpoint. - /// - /// \param message_data A pointer to the message data. - void ProcessNotifyActorResumedFromCheckpoint(const uint8_t *message_data); - /// Process client message of SetResourceRequest /// \param client The client that sent the message. /// \param message_data A pointer to the message data. @@ -747,12 +722,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler { ReconstructionPolicy reconstruction_policy_; /// A manager to make waiting tasks's missing object dependencies available. TaskDependencyManager task_dependency_manager_; - /// A mapping from actor ID to registration information about that actor - /// (including which node manager owns it). - std::unordered_map actor_registry_; - /// This map stores actor ID to the ID of the checkpoint that will be used to - /// restore the actor. - std::unordered_map checkpoint_id_to_restore_; std::unique_ptr agent_manager_; diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index 75445871b..3589fc840 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -282,32 +282,6 @@ Status raylet::RayletClient::FreeObjects(const std::vector &object_ids return conn_->WriteMessage(MessageType::FreeObjectsInObjectStoreRequest, &fbb); } -Status raylet::RayletClient::PrepareActorCheckpoint(const ActorID &actor_id, - ActorCheckpointID *checkpoint_id) { - flatbuffers::FlatBufferBuilder fbb; - auto message = - protocol::CreatePrepareActorCheckpointRequest(fbb, to_flatbuf(fbb, actor_id)); - fbb.Finish(message); - - std::vector reply; - RAY_RETURN_NOT_OK(conn_->AtomicRequestReply(MessageType::PrepareActorCheckpointRequest, - MessageType::PrepareActorCheckpointReply, - &reply, &fbb)); - auto reply_message = - flatbuffers::GetRoot(reply.data()); - *checkpoint_id = ActorCheckpointID::FromBinary(reply_message->checkpoint_id()->str()); - return Status::OK(); -} - -Status raylet::RayletClient::NotifyActorResumedFromCheckpoint( - const ActorID &actor_id, const ActorCheckpointID &checkpoint_id) { - flatbuffers::FlatBufferBuilder fbb; - auto message = protocol::CreateNotifyActorResumedFromCheckpoint( - fbb, to_flatbuf(fbb, actor_id), to_flatbuf(fbb, checkpoint_id)); - fbb.Finish(message); - return conn_->WriteMessage(MessageType::NotifyActorResumedFromCheckpoint, &fbb); -} - Status raylet::RayletClient::SetResource(const std::string &resource_name, const double capacity, const NodeID &node_id) { flatbuffers::FlatBufferBuilder fbb; diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h index 9fd790f5e..a7fa8dcb0 100644 --- a/src/ray/raylet_client/raylet_client.h +++ b/src/ray/raylet_client/raylet_client.h @@ -26,7 +26,6 @@ #include "src/ray/protobuf/common.pb.h" #include "src/ray/protobuf/gcs.pb.h" -using ray::ActorCheckpointID; using ray::ActorID; using ray::JobID; using ray::NodeID; @@ -315,22 +314,6 @@ class RayletClient : public PinObjectsInterface, ray::Status FreeObjects(const std::vector &object_ids, bool local_only, bool deleteCreatingTasks); - /// Request raylet backend to prepare a checkpoint for an actor. - /// - /// \param[in] actor_id ID of the actor. - /// \param[out] checkpoint_id ID of the new checkpoint (output parameter). - /// \return ray::Status. - ray::Status PrepareActorCheckpoint(const ActorID &actor_id, - ActorCheckpointID *checkpoint_id); - - /// Notify raylet backend that an actor was resumed from a checkpoint. - /// - /// \param actor_id ID of the actor. - /// \param checkpoint_id ID of the checkpoint from which the actor was resumed. - /// \return ray::Status. - ray::Status NotifyActorResumedFromCheckpoint(const ActorID &actor_id, - const ActorCheckpointID &checkpoint_id); - /// Sets a resource with the specified capacity and client id /// \param resource_name Name of the resource to be set /// \param capacity Capacity of the resource diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index 8539b589a..faba25e01 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -146,18 +146,6 @@ class GcsRpcClient { VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, UpdateActorInfo, actor_info_grpc_client_, ) - /// Add actor checkpoint data to GCS Service. - VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, AddActorCheckpoint, - actor_info_grpc_client_, ) - - /// Get actor checkpoint data from GCS Service. - VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, GetActorCheckpoint, - actor_info_grpc_client_, ) - - /// Get actor checkpoint id data from GCS Service. - VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, GetActorCheckpointID, - actor_info_grpc_client_, ) - /// Register a node to GCS Service. VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, RegisterNode, node_info_grpc_client_, ) diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h index d2b7bd62e..aecabcfa2 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_server.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h @@ -127,18 +127,6 @@ class ActorInfoGcsServiceHandler { virtual void HandleUpdateActorInfo(const UpdateActorInfoRequest &request, UpdateActorInfoReply *reply, SendReplyCallback send_reply_callback) = 0; - - virtual void HandleAddActorCheckpoint(const AddActorCheckpointRequest &request, - AddActorCheckpointReply *reply, - SendReplyCallback send_reply_callback) = 0; - - virtual void HandleGetActorCheckpoint(const GetActorCheckpointRequest &request, - GetActorCheckpointReply *reply, - SendReplyCallback send_reply_callback) = 0; - - virtual void HandleGetActorCheckpointID(const GetActorCheckpointIDRequest &request, - GetActorCheckpointIDReply *reply, - SendReplyCallback send_reply_callback) = 0; }; /// The `GrpcService` for `ActorInfoGcsService`. @@ -164,9 +152,6 @@ class ActorInfoGrpcService : public GrpcService { ACTOR_INFO_SERVICE_RPC_HANDLER(GetAllActorInfo); ACTOR_INFO_SERVICE_RPC_HANDLER(RegisterActorInfo); ACTOR_INFO_SERVICE_RPC_HANDLER(UpdateActorInfo); - ACTOR_INFO_SERVICE_RPC_HANDLER(AddActorCheckpoint); - ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorCheckpoint); - ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorCheckpointID); } private: