Add return value for recontruction RPC. (#3493)

* Add return value for recontruct RPC.

* Fix comment function name
This commit is contained in:
Yuhong Guo
2018-12-09 16:08:44 +08:00
committed by Eric Liang
parent 7aec357501
commit 0136af5aac
8 changed files with 39 additions and 23 deletions
@@ -6,6 +6,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.ray.api.RayObject;
import org.ray.api.WaitResult;
import org.ray.api.exception.RayException;
import org.ray.api.id.UniqueId;
import org.ray.runtime.RayDevRuntime;
import org.ray.runtime.objectstore.MockObjectStore;
@@ -67,7 +68,7 @@ public class MockRayletClient implements RayletClient {
@Override
public void fetchOrReconstruct(List<UniqueId> objectIds, boolean fetchOnly,
UniqueId currentTaskId) {
UniqueId currentTaskId) throws RayException {
}
@@ -3,6 +3,7 @@ package org.ray.runtime.raylet;
import java.util.List;
import org.ray.api.RayObject;
import org.ray.api.WaitResult;
import org.ray.api.exception.RayException;
import org.ray.api.id.UniqueId;
import org.ray.runtime.task.TaskSpec;
@@ -15,7 +16,8 @@ public interface RayletClient {
TaskSpec getTask();
void fetchOrReconstruct(List<UniqueId> objectIds, boolean fetchOnly, UniqueId currentTaskId);
void fetchOrReconstruct(List<UniqueId> objectIds, boolean fetchOnly, UniqueId currentTaskId)
throws RayException;
void notifyUnblocked(UniqueId currentTaskId);
@@ -10,6 +10,7 @@ import java.util.List;
import java.util.Map;
import org.ray.api.RayObject;
import org.ray.api.WaitResult;
import org.ray.api.exception.RayException;
import org.ray.api.id.UniqueId;
import org.ray.runtime.functionmanager.FunctionDescriptor;
import org.ray.runtime.generated.Arg;
@@ -89,13 +90,16 @@ public class RayletClientImpl implements RayletClient {
@Override
public void fetchOrReconstruct(List<UniqueId> objectIds, boolean fetchOnly,
UniqueId currentTaskId) {
UniqueId currentTaskId) throws RayException {
if (RayLog.core.isDebugEnabled()) {
RayLog.core.debug("Blocked on objects for task {}, object IDs are {}",
UniqueIdUtil.computeTaskId(objectIds.get(0)), objectIds);
}
nativeFetchOrReconstruct(client, UniqueIdUtil.getIdBytes(objectIds),
int ret = nativeFetchOrReconstruct(client, UniqueIdUtil.getIdBytes(objectIds),
fetchOnly, currentTaskId.getBytes());
if (ret != 0) {
throw new RayException("Connection closed by Raylet");
}
}
@Override
@@ -274,7 +278,7 @@ public class RayletClientImpl implements RayletClient {
private static native void nativeDestroy(long client);
private static native void nativeFetchOrReconstruct(long client, byte[][] objectIds,
private static native int nativeFetchOrReconstruct(long client, byte[][] objectIds,
boolean fetchOnly, byte[] currentTaskId);
private static native void nativeNotifyUnblocked(long client, byte[] currentTaskId);