[Java] Improve some Java code (#3040)

This PR improves some java codes,  and removes some duplicated code.
This commit is contained in:
Wang Qing
2018-10-11 08:30:23 +08:00
committed by Robert Nishihara
parent 060891a9c9
commit 4a2ed47b6c
5 changed files with 36 additions and 41 deletions
@@ -22,7 +22,7 @@ import org.ray.runtime.raylet.RayletClient;
import org.ray.runtime.task.ArgumentsBuilder;
import org.ray.runtime.task.TaskSpec;
import org.ray.runtime.util.ResourceUtil;
import org.ray.runtime.util.UniqueIdHelper;
import org.ray.runtime.util.UniqueIdUtil;
import org.ray.runtime.util.exception.TaskExecutionException;
import org.ray.runtime.util.logger.RayLog;
@@ -63,7 +63,7 @@ public abstract class AbstractRayRuntime implements RayRuntime {
@Override
public <T> RayObject<T> put(T obj) {
UniqueId objectId = UniqueIdHelper.computePutId(
UniqueId objectId = UniqueIdUtil.computePutId(
workerContext.getCurrentTask().taskId, workerContext.nextPutIndex());
put(objectId, obj);
@@ -222,7 +222,7 @@ public abstract class AbstractRayRuntime implements RayRuntime {
private UniqueId[] genReturnIds(UniqueId taskId, int numReturns) {
UniqueId[] ret = new UniqueId[numReturns];
for (int i = 0; i < numReturns; i++) {
ret[i] = UniqueIdHelper.computeReturnId(taskId, i + 1);
ret[i] = UniqueIdUtil.computeReturnId(taskId, i + 1);
}
return ret;
}
@@ -7,6 +7,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.ray.api.id.UniqueId;
import org.ray.runtime.AbstractRayRuntime;
import org.ray.runtime.util.Serializer;
import org.ray.runtime.util.UniqueIdUtil;
import org.ray.runtime.util.exception.TaskExecutionException;
/**
@@ -15,9 +16,10 @@ import org.ray.runtime.util.exception.TaskExecutionException;
*/
public class ObjectStoreProxy {
private static final int GET_TIMEOUT_MS = 1000;
private final AbstractRayRuntime runtime;
private final ObjectStoreLink store;
private final int getTimeoutMs = 1000;
public ObjectStoreProxy(AbstractRayRuntime runtime, ObjectStoreLink store) {
this.runtime = runtime;
@@ -26,7 +28,7 @@ public class ObjectStoreProxy {
public <T> Pair<T, GetStatus> get(UniqueId objectId, boolean isMetadata)
throws TaskExecutionException {
return get(objectId, getTimeoutMs, isMetadata);
return get(objectId, GET_TIMEOUT_MS, isMetadata);
}
public <T> Pair<T, GetStatus> get(UniqueId id, int timeoutMs, boolean isMetadata)
@@ -46,12 +48,12 @@ public class ObjectStoreProxy {
public <T> List<Pair<T, GetStatus>> get(List<UniqueId> objectIds, boolean isMetadata)
throws TaskExecutionException {
return get(objectIds, getTimeoutMs, isMetadata);
return get(objectIds, GET_TIMEOUT_MS, isMetadata);
}
public <T> List<Pair<T, GetStatus>> get(List<UniqueId> ids, int timeoutMs, boolean isMetadata)
throws TaskExecutionException {
List<byte[]> objs = store.get(getIdBytes(ids), timeoutMs, isMetadata);
List<byte[]> objs = store.get(UniqueIdUtil.getIdBytes(ids), timeoutMs, isMetadata);
List<Pair<T, GetStatus>> ret = new ArrayList<>();
for (int i = 0; i < objs.size(); i++) {
byte[] obj = objs.get(i);
@@ -69,15 +71,6 @@ public class ObjectStoreProxy {
return ret;
}
private static byte[][] getIdBytes(List<UniqueId> objectIds) {
int size = objectIds.size();
byte[][] ids = new byte[size][];
for (int i = 0; i < size; i++) {
ids[i] = objectIds.get(i).getBytes();
}
return ids;
}
public void put(UniqueId id, Object obj, Object metadata) {
store.put(id.getBytes(), Serializer.encode(obj), Serializer.encode(metadata));
}
@@ -18,7 +18,7 @@ import org.ray.runtime.generated.TaskInfo;
import org.ray.runtime.generated.TaskLanguage;
import org.ray.runtime.task.FunctionArg;
import org.ray.runtime.task.TaskSpec;
import org.ray.runtime.util.UniqueIdHelper;
import org.ray.runtime.util.UniqueIdUtil;
import org.ray.runtime.util.logger.RayLog;
public class RayletClientImpl implements RayletClient {
@@ -50,7 +50,8 @@ public class RayletClientImpl implements RayletClient {
ids.add(element.getId());
}
boolean[] ready = nativeWaitObject(client, getIdBytes(ids), numReturns, timeoutMs, false);
boolean[] ready = nativeWaitObject(client, UniqueIdUtil.getIdBytes(ids),
numReturns, timeoutMs, false);
List<RayObject<T>> readyList = new ArrayList<>();
List<RayObject<T>> unreadyList = new ArrayList<>();
@@ -89,9 +90,9 @@ public class RayletClientImpl implements RayletClient {
public void reconstructObjects(List<UniqueId> objectIds, boolean fetchOnly) {
if (RayLog.core.isInfoEnabled()) {
RayLog.core.info("Reconstructing objects for task {}, object IDs are {}",
UniqueIdHelper.computeTaskId(objectIds.get(0)), objectIds);
UniqueIdUtil.computeTaskId(objectIds.get(0)), objectIds);
}
nativeReconstructObjects(client, getIdBytes(objectIds), fetchOnly);
nativeReconstructObjects(client, UniqueIdUtil.getIdBytes(objectIds), fetchOnly);
}
@Override
@@ -107,7 +108,7 @@ public class RayletClientImpl implements RayletClient {
@Override
public void freePlasmaObjects(List<UniqueId> objectIds, boolean localOnly) {
byte[][] objectIdsArray = getIdBytes(objectIds);
byte[][] objectIdsArray = UniqueIdUtil.getIdBytes(objectIds);
nativeFreePlasmaObjects(client, objectIdsArray, localOnly);
}
@@ -242,15 +243,6 @@ public class RayletClientImpl implements RayletClient {
return buffer;
}
private static byte[][] getIdBytes(List<UniqueId> objectIds) {
int size = objectIds.size();
byte[][] ids = new byte[size][];
for (int i = 0; i < size; i++) {
ids[i] = objectIds.get(i).getBytes();
}
return ids;
}
public void destroy() {
nativeDestroy(client);
}
@@ -3,6 +3,8 @@ package org.ray.runtime.util;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.List;
import org.ray.api.id.UniqueId;
@@ -11,7 +13,7 @@ import org.ray.api.id.UniqueId;
* Note: any changes to these methods must be synced with C++ helper functions
* in src/ray/id.h
*/
public class UniqueIdHelper {
public class UniqueIdUtil {
public static final int OBJECT_INDEX_POS = 0;
public static final int OBJECT_INDEX_LENGTH = 4;
@@ -37,7 +39,7 @@ public class UniqueIdHelper {
System.arraycopy(taskId.getBytes(),0, objId, 0, UniqueId.LENGTH);
ByteBuffer wbb = ByteBuffer.wrap(objId);
wbb.order(ByteOrder.LITTLE_ENDIAN);
wbb.putInt(UniqueIdHelper.OBJECT_INDEX_POS, index);
wbb.putInt(UniqueIdUtil.OBJECT_INDEX_POS, index);
return new UniqueId(objId);
}
@@ -63,9 +65,18 @@ public class UniqueIdHelper {
public static UniqueId computeTaskId(UniqueId objectId) {
byte[] taskId = new byte[UniqueId.LENGTH];
System.arraycopy(objectId.getBytes(), 0, taskId, 0, UniqueId.LENGTH);
Arrays.fill(taskId, UniqueIdHelper.OBJECT_INDEX_POS,
UniqueIdHelper.OBJECT_INDEX_POS + UniqueIdHelper.OBJECT_INDEX_LENGTH, (byte) 0);
Arrays.fill(taskId, UniqueIdUtil.OBJECT_INDEX_POS,
UniqueIdUtil.OBJECT_INDEX_POS + UniqueIdUtil.OBJECT_INDEX_LENGTH, (byte) 0);
return new UniqueId(taskId);
}
public static byte[][] getIdBytes(List<UniqueId> objectIds) {
int size = objectIds.size();
byte[][] ids = new byte[size][];
for (int i = 0; i < size; i++) {
ids[i] = objectIds.get(i).getBytes();
}
return ids;
}
}