Add C++ global state for actor table (#8501)

* add global state actors

* fix code style

* fix GcsActorManagerTest bug

* rebase master

* add jni code

* add get checkpoint id code

* add debug code

* add debug code

* change log level

* fix compile bug

* return null in jni

* fix crash bug

* change import seq

Co-authored-by: 灵洵 <fengbin.ffb@antfin.com>
Co-authored-by: Hao Chen <chenh1024@gmail.com>
This commit is contained in:
fangfengbin
2020-05-29 21:10:42 +08:00
committed by GitHub
parent d483ed28ba
commit 35eeec5647
21 changed files with 393 additions and 244 deletions
@@ -10,6 +10,7 @@ import io.ray.api.id.TaskId;
import io.ray.api.id.UniqueId;
import io.ray.api.runtimecontext.NodeInfo;
import io.ray.runtime.config.RayConfig;
import io.ray.runtime.gcs.GlobalStateAccessor;
import io.ray.runtime.generated.Gcs;
import io.ray.runtime.generated.Gcs.ActorCheckpointIdData;
import io.ray.runtime.generated.Gcs.GcsNodeInfo;
@@ -116,9 +117,8 @@ public class GcsClient {
* If the actor exists in GCS.
*/
public boolean actorExists(ActorId actorId) {
byte[] key = ArrayUtils.addAll(
TablePrefix.ACTOR.toString().getBytes(), actorId.getBytes());
return primary.exists(key);
byte[] result = globalStateAccessor.getActorInfo(actorId);
return result != null;
}
public boolean wasCurrentActorRestarted(ActorId actorId) {
@@ -128,7 +128,7 @@ public class GcsClient {
}
// TODO(ZhuSenlin): Get the actor table data from CoreWorker later.
byte[] value = primary.get(key);
byte[] value = globalStateAccessor.getActorInfo(actorId);
if (value == null) {
return false;
}
@@ -138,7 +138,7 @@ public class GcsClient {
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Received invalid protobuf data from GCS.");
}
return actorTableData.getNumRestarts() != 0;
return actorTableData.getNumRestarts() != 0;
}
/**
@@ -156,11 +156,7 @@ public class GcsClient {
*/
public List<Checkpoint> getCheckpointsForActor(ActorId actorId) {
List<Checkpoint> checkpoints = new ArrayList<>();
final String prefix = TablePrefix.ACTOR_CHECKPOINT_ID.toString();
final byte[] key = ArrayUtils.addAll(prefix.getBytes(), actorId.getBytes());
RedisClient client = getShardClient(actorId);
byte[] result = client.get(key);
byte[] result = globalStateAccessor.getActorCheckpointId(actorId);
if (result != null) {
ActorCheckpointIdData data = null;
try {
@@ -1,6 +1,7 @@
package io.ray.runtime.gcs;
import com.google.common.base.Preconditions;
import io.ray.api.id.ActorId;
import java.util.List;
/**
@@ -64,6 +65,39 @@ public class GlobalStateAccessor {
}
}
/**
* @return A list of actor info with ActorInfo protobuf schema.
*/
public List<byte[]> getAllActorInfo() {
// Fetch a actor list with protobuf bytes format from GCS.
synchronized (GlobalStateAccessor.class) {
Preconditions.checkState(globalStateAccessorNativePointer != 0);
return this.nativeGetAllActorInfo(globalStateAccessorNativePointer);
}
}
/**
* @return An actor info with ActorInfo protobuf schema.
*/
public byte[] getActorInfo(ActorId actorId) {
// Fetch an actor with protobuf bytes format from GCS.
synchronized (GlobalStateAccessor.class) {
Preconditions.checkState(globalStateAccessorNativePointer != 0);
return this.nativeGetActorInfo(globalStateAccessorNativePointer, actorId.getBytes());
}
}
/**
* @return An actor checkpoint id data with ActorCheckpointIdData protobuf schema.
*/
public byte[] getActorCheckpointId(ActorId actorId) {
// Fetch an actor checkpoint id with protobuf bytes format from GCS.
synchronized (GlobalStateAccessor.class) {
Preconditions.checkState(globalStateAccessorNativePointer != 0);
return this.nativeGetActorCheckpointId(globalStateAccessorNativePointer, actorId.getBytes());
}
}
private void destroyGlobalStateAccessor() {
synchronized (GlobalStateAccessor.class) {
if (0 == globalStateAccessorNativePointer) {
@@ -85,4 +119,10 @@ public class GlobalStateAccessor {
private native List<byte[]> nativeGetAllJobInfo(long nativePtr);
private native List<byte[]> nativeGetAllNodeInfo(long nativePtr);
private native List<byte[]> nativeGetAllActorInfo(long nativePtr);
private native byte[] nativeGetActorInfo(long nativePtr, byte[] actorId);
private native byte[] nativeGetActorCheckpointId(long nativePtr, byte[] actorId);
}