[JavaWorker] Java code lint check and binding to CI (#2225)

* add java code lint check and fix the java code lint error

* add java doc lint check and fix the java doc lint error

* add java code and doc lint to the CI
This commit is contained in:
Yujie Liu
2018-06-10 07:26:54 +08:00
committed by Philipp Moritz
parent 5789a247f9
commit 3b5e700fd7
158 changed files with 3805 additions and 3325 deletions
+48 -48
View File
@@ -1,58 +1,58 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>org.ray.parent</groupId>
<artifactId>ray-superpom</artifactId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.ray</groupId>
<artifactId>ray-runtime-common</artifactId>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>org.ray.parent</groupId>
<artifactId>ray-superpom</artifactId>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.ray</groupId>
<artifactId>ray-runtime-common</artifactId>
<name>runtime common</name>
<description>runtime common</description>
<url></url>
<name>runtime common</name>
<description>runtime common</description>
<url></url>
<packaging>jar</packaging>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.ray</groupId>
<artifactId>ray-api</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.ray</groupId>
<artifactId>ray-hook</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>fst</artifactId>
<version>2.47</version>
</dependency>
<dependencies>
<dependency>
<groupId>org.ray</groupId>
<artifactId>ray-api</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.ray</groupId>
<artifactId>ray-hook</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>fst</artifactId>
<version>2.47</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.github.davidmoten/flatbuffers-java -->
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>flatbuffers-java</artifactId>
<version>1.7.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.github.davidmoten/flatbuffers-java -->
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>flatbuffers-java</artifactId>
<version>1.7.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-plasma</artifactId>
</dependency>
</dependencies>
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-plasma</artifactId>
</dependency>
</dependencies>
</project>
@@ -20,7 +20,7 @@ import org.ray.spi.model.TaskSpec;
import org.ray.util.exception.TaskExecutionException;
/**
* arguments wrap and unwrap
* arguments wrap and unwrap.
*/
public class ArgumentsBuilder {
@@ -36,12 +36,10 @@ public class ArgumentsBuilder {
} else if (oarg.getClass().equals(RayActor.class)) {
// serialize actor unique id
if (k == 0) {
RayActorID aid = new RayActorID();
aid.Id = ((RayActor) oarg).getId();
RayActorId aid = new RayActorId();
aid.id = ((RayActor) oarg).getId();
fargs[k].data = Serializer.encode(aid);
}
// serialize actor handle
else {
} else { // serialize actor handle
fargs[k].data = Serializer.encode(oarg);
}
@@ -85,17 +83,17 @@ public class ArgumentsBuilder {
@SuppressWarnings({"rawtypes", "unchecked"})
public static Pair<Object, Object[]> unwrap(TaskSpec task, Method m, ClassLoader classLoader)
throws TaskExecutionException {
FunctionArg fargs[] = task.args;
Object this_ = null;
Object realArgs[];
FunctionArg[] fargs = task.args;
Object current = null;
Object[] realArgs;
int start = 0;
// check actor method
if (!Modifier.isStatic(m.getModifiers())) {
start = 1;
RayActorID actorId = Serializer.decode(fargs[0].data, classLoader);
this_ = RayRuntime.getInstance().getLocalActor(actorId.Id);
RayActorId actorId = Serializer.decode(fargs[0].data, classLoader);
current = RayRuntime.getInstance().getLocalActor(actorId.id);
realArgs = new Object[fargs.length - 1];
} else {
realArgs = new Object[fargs.length];
@@ -110,22 +108,16 @@ public class ArgumentsBuilder {
Object obj = Serializer.decode(farg.data, classLoader);
// due to remote lambda, method may be static
if (obj instanceof RayActorID) {
if (obj instanceof RayActorId) {
assert (k == 0);
realArgs[raIndex] = RayRuntime.getInstance().getLocalActor(((RayActorID) obj).Id);
realArgs[raIndex] = RayRuntime.getInstance().getLocalActor(((RayActorId) obj).id);
} else {
realArgs[raIndex] = obj;
}
}
// only ids, big data or single object id
else if (farg.data == null) {
} else if (farg.data == null) { // only ids, big data or single object id
assert (farg.ids.size() == 1);
realArgs[raIndex] = RayRuntime.getInstance().get(farg.ids.get(0));
}
// both id and data, could be RayList or RayMap only
else {
} else { // both id and data, could be RayList or RayMap only
Object idBag = Serializer.decode(farg.data, classLoader);
if (idBag instanceof RayMapArg) {
Map newMap = new HashMap<>();
@@ -144,7 +136,7 @@ public class ArgumentsBuilder {
}
}
}
return Pair.of(this_, realArgs);
return Pair.of(current, realArgs);
}
//for recognition
@@ -161,9 +153,9 @@ public class ArgumentsBuilder {
}
public static class RayActorID implements Serializable {
public static class RayActorId implements Serializable {
private static final long serialVersionUID = 3993646395842605166L;
public UniqueID Id;
public UniqueID id;
}
}
@@ -14,7 +14,7 @@ import org.ray.util.exception.TaskExecutionException;
import org.ray.util.logger.RayLog;
/**
* how to execute a invocation
* how to execute a invocation.
*/
public class InvocationExecutor {
@@ -67,18 +67,14 @@ public class InvocationExecutor {
}
}
private static void safePut(UniqueID objectId, Object obj) {
RayRuntime.getInstance().putRaw(objectId, obj);
}
private static void executeInternal(TaskSpec task, Pair<ClassLoader, RayMethod> pr,
String taskdesc)
String taskdesc)
throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
Method m = pr.getRight().invokable;
Map<?, UniqueID> userRayReturnIdMap = null;
Class<?> returnType = m.getReturnType(); // TODO: not ready for multiple return etc.
boolean hasMultiReturn = false;
if(task.returnIds != null && task.returnIds.length > 0) {
if (task.returnIds != null && task.returnIds.length > 0) {
hasMultiReturn = UniqueIdHelper.hasMultipleReturnOrNotFromReturnObjectId(task.returnIds[0]);
}
@@ -94,10 +90,10 @@ public class InvocationExecutor {
if (!UniqueIdHelper.isLambdaFunction(task.functionId)) {
result = m.invoke(realArgs.getLeft(), realArgs.getRight());
} else {
result = m.invoke(realArgs.getLeft(), new Object[]{realArgs.getRight()});
result = m.invoke(realArgs.getLeft(), new Object[] {realArgs.getRight()});
}
if(task.returnIds == null || task.returnIds.length == 0) {
if (task.returnIds == null || task.returnIds.length == 0) {
return;
}
// set result into storage
@@ -147,4 +143,8 @@ public class InvocationExecutor {
return "Execute task " + task.taskId
+ " failed with function name = " + funcName;
}
private static void safePut(UniqueID objectId, Object obj) {
RayRuntime.getInstance().putRaw(objectId, obj);
}
}
@@ -13,18 +13,49 @@ import org.ray.spi.model.RayMethod;
import org.ray.util.logger.RayLog;
/**
* local function manager which pulls remote functions on demand
* local function manager which pulls remote functions on demand.
*/
public class LocalFunctionManager {
private final RemoteFunctionManager remoteLoader;
private final ConcurrentHashMap<UniqueID, FunctionTable> functionTables
= new ConcurrentHashMap<>();
/**
* initialize load function manager using remote function manager to pull remote functions on
* demand
* demand.
*/
public LocalFunctionManager(RemoteFunctionManager remoteLoader) {
this.remoteLoader = remoteLoader;
}
/**
* get local method for executing, which pulls information from remote repo on-demand, therefore
* it may block for a while if the related resources (e.g., jars) are not ready on local machine
*/
public Pair<ClassLoader, RayMethod> getMethod(UniqueID driverId, UniqueID methodId,
FunctionArg[] args) throws NoSuchMethodException,
SecurityException, ClassNotFoundException {
FunctionTable funcs = loadDriverFunctions(driverId);
RayMethod m;
// hooked methods
if (!UniqueIdHelper.isLambdaFunction(methodId)) {
m = funcs.functions.get(methodId);
if (null == m) {
throw new RuntimeException(
"DriverId " + driverId + " load remote function methodId:" + methodId + " failed");
}
} else { // remote lambda
assert args.length >= 2;
String fname = Serializer.decode(args[args.length - 2].data);
Method fm = Class.forName(fname).getMethod("execute", Object[].class);
m = new RayMethod(fm);
}
return Pair.of(funcs.linkedFunctions.loader, m);
}
private synchronized FunctionTable loadDriverFunctions(UniqueID driverId) {
FunctionTable funcs = functionTables.get(driverId);
if (null == funcs) {
@@ -50,9 +81,7 @@ public class LocalFunctionManager {
}
functionTables.put(driverId, funcs);
}
// reSync automatically
else {
} else { // reSync automatically
// more functions are loaded
if (funcs.linkedFunctions.functions.size() > funcs.functions.size()) {
for (MethodId mid : funcs.linkedFunctions.functions) {
@@ -71,36 +100,7 @@ public class LocalFunctionManager {
}
/**
* get local method for executing, which pulls information from remote repo on-demand, therefore
* it may block for a while if the related resources (e.g., jars) are not ready on local machine
*/
public Pair<ClassLoader, RayMethod> getMethod(UniqueID driverId, UniqueID methodId,
FunctionArg[] args) throws NoSuchMethodException, SecurityException, ClassNotFoundException {
FunctionTable funcs = loadDriverFunctions(driverId);
RayMethod m;
// hooked methods
if (!UniqueIdHelper.isLambdaFunction(methodId)) {
m = funcs.functions.get(methodId);
if (null == m) {
throw new RuntimeException(
"DriverId " + driverId + " load remote function methodId:" + methodId + " failed");
}
}
// remote lambda
else {
assert args.length >= 2;
String fname = Serializer.decode(args[args.length - 2].data);
Method fm = Class.forName(fname).getMethod("execute", Object[].class);
m = new RayMethod(fm);
}
return Pair.of(funcs.linkedFunctions.loader, m);
}
/**
* unload the functions when the driver is declared dead
* unload the functions when the driver is declared dead.
*/
public synchronized void removeApp(UniqueID driverId) {
FunctionTable funcs = functionTables.get(driverId);
@@ -115,7 +115,4 @@ public class LocalFunctionManager {
public final ConcurrentHashMap<UniqueID, RayMethod> functions = new ConcurrentHashMap<>();
public LoadedFunctions linkedFunctions;
}
private final RemoteFunctionManager remoteLoader;
private final ConcurrentHashMap<UniqueID, FunctionTable> functionTables = new ConcurrentHashMap<>();
}
@@ -9,7 +9,6 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.arrow.plasma.ObjectStoreLink;
import org.apache.commons.lang3.tuple.Pair;
import org.ray.api.Ray;
@@ -35,19 +34,20 @@ import org.ray.util.logger.DynamicLogManager;
import org.ray.util.logger.RayLog;
/**
* Core functionality to implement Ray APIs
* Core functionality to implement Ray APIs.
*/
public abstract class RayRuntime implements RayApi {
protected static RayRuntime ins = null;
protected static RayParameters params = null;
private static boolean fromRayInit = false;
public static ConfigReader configReader;
public abstract void cleanUp();
protected static RayRuntime ins = null;
protected static RayParameters params = null;
private static boolean fromRayInit = false;
protected Worker worker;
protected LocalSchedulerProxy localSchedulerProxy;
protected ObjectStoreProxy objectStoreProxy;
protected LocalFunctionManager functions;
protected RemoteFunctionManager remoteFunctionManager;
protected PathConfig pathConfig;
// app level Ray.init()
// make it private so there is no direct usage but only from Ray.init
@@ -65,24 +65,6 @@ public abstract class RayRuntime implements RayApi {
return ins;
}
// init with command line args
// --config=ray.config.ini --overwrite=updateConfigStr
public static RayRuntime init(String[] args) throws Exception {
String config = null;
String updateConfig = null;
for (String arg : args) {
if (arg.startsWith("--config=")) {
config = arg.substring("--config=".length());
} else if (arg.startsWith("--overwrite=")) {
updateConfig = arg.substring("--overwrite=".length());
} else {
throw new RuntimeException("Input argument " + arg
+ " is not recognized, please use --overwrite to merge it into config file");
}
}
return init(config, updateConfig);
}
// engine level RayRuntime.init(xx, xx)
// updateConfigStr is sth like section1.k1=v1;section2.k2=v2
public static RayRuntime init(String configPath, String updateConfigStr) throws Exception {
@@ -114,9 +96,43 @@ public abstract class RayRuntime implements RayApi {
return ins;
}
// init with command line args
// --config=ray.config.ini --overwrite=updateConfigStr
public static RayRuntime init(String[] args) throws Exception {
String config = null;
String updateConfig = null;
for (String arg : args) {
if (arg.startsWith("--config=")) {
config = arg.substring("--config=".length());
} else if (arg.startsWith("--overwrite=")) {
updateConfig = arg.substring("--overwrite=".length());
} else {
throw new RuntimeException("Input argument " + arg
+ " is not recognized, please use --overwrite to merge it into config file");
}
}
return init(config, updateConfig);
}
protected void init(
LocalSchedulerLink slink,
ObjectStoreLink plink,
RemoteFunctionManager remoteLoader,
PathConfig pathManager
) {
UniqueIdHelper.setThreadRandomSeed(UniqueIdHelper.getUniqueness(params.driver_id));
remoteFunctionManager = remoteLoader;
pathConfig = pathManager;
functions = new LocalFunctionManager(remoteLoader);
localSchedulerProxy = new LocalSchedulerProxy(slink);
objectStoreProxy = new ObjectStoreProxy(plink);
worker = new Worker(localSchedulerProxy, functions);
}
private static RayRuntime instantiate(RayParameters params) {
String className = params.run_mode.isNativeRuntime() ?
"org.ray.core.impl.RayNativeRuntime" : "org.ray.core.impl.RayDevRuntime";
String className = params.run_mode.isNativeRuntime()
? "org.ray.core.impl.RayNativeRuntime" : "org.ray.core.impl.RayDevRuntime";
RayRuntime runtime;
try {
@@ -129,7 +145,8 @@ public abstract class RayRuntime implements RayApi {
runtime = (RayRuntime) cons.newInstance();
cons.setAccessible(false);
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException | SecurityException | ClassNotFoundException | NoSuchMethodException e) {
| InvocationTargetException | SecurityException | ClassNotFoundException
| NoSuchMethodException e) {
RayLog.core
.error("Load class " + className + " failed for run-mode " + params.run_mode.toString(),
e);
@@ -150,6 +167,11 @@ public abstract class RayRuntime implements RayApi {
return runtime;
}
/**
* start runtime.
*/
public abstract void start(RayParameters params) throws Exception;
public static RayRuntime getInstance() {
return ins;
}
@@ -158,38 +180,57 @@ public abstract class RayRuntime implements RayApi {
return params;
}
/*********** RayApi methods ***********/
public <T, TM> void putRaw(UniqueID taskId, UniqueID objectId, T obj, TM metadata) {
RayLog.core.info("Task " + taskId.toString() + " Object " + objectId.toString() + " put");
localSchedulerProxy.markTaskPutDependency(taskId, objectId);
objectStoreProxy.put(objectId, obj, metadata);
}
public abstract void cleanUp();
public <T> void putRaw(UniqueID taskId, UniqueID objectId, T obj) {
putRaw(taskId, objectId, obj, null);
}
/***********
* RayApi methods.
***********/
public <T, TMT> void putRaw(UniqueID taskId, UniqueID objectId, T obj, TMT metadata) {
RayLog.core.info("Task " + taskId.toString() + " Object " + objectId.toString() + " put");
localSchedulerProxy.markTaskPutDependency(taskId, objectId);
objectStoreProxy.put(objectId, obj, metadata);
}
public <T> void putRaw(UniqueID objectId, T obj) {
UniqueID taskId = getCurrentTaskID();
UniqueID taskId = getCurrentTaskId();
putRaw(taskId, objectId, obj, null);
}
public <T> void putRaw(T obj) {
UniqueID taskId = getCurrentTaskID();
UniqueID objectId = getCurrentTaskNextPutID();
UniqueID taskId = getCurrentTaskId();
UniqueID objectId = getCurrentTaskNextPutId();
putRaw(taskId, objectId, obj, null);
}
/**
* get the task identity of the currently running task, UniqueID.Nil if not inside any
*/
public UniqueID getCurrentTaskId() {
return worker.getCurrentTaskId();
}
/**
* get the to-be-returned objects identities of the currently running task, empty array if not
* inside any.
*/
public UniqueID getCurrentTaskNextPutId() {
return worker.getCurrentTaskNextPutId();
}
@Override
public <T> RayObject<T> put(T obj) {
return put(obj, null);
}
@Override
public <T, TM> RayObject<T> put(T obj, TM metadata) {
UniqueID taskId = getCurrentTaskID();
UniqueID objectId = getCurrentTaskNextPutID();
public <T, TMT> RayObject<T> put(T obj, TMT metadata) {
UniqueID taskId = getCurrentTaskId();
UniqueID objectId = getCurrentTaskNextPutId();
putRaw(taskId, objectId, obj, metadata);
return new RayObject<>(objectId);
}
@@ -200,58 +241,13 @@ public abstract class RayRuntime implements RayApi {
}
@Override
public <T> T getMeta(UniqueID objectId) throws TaskExecutionException {
return doGet(objectId, true);
}
private <T> T doGet(UniqueID objectId, boolean isMetadata) throws TaskExecutionException {
boolean wasBlocked = false;
UniqueID taskId = getCurrentTaskID();
try {
// Do an initial fetch.
objectStoreProxy.fetch(objectId);
// Get the object. We initially try to get the object immediately.
Pair<T, GetStatus> ret = objectStoreProxy
.get(objectId, params.default_first_check_timeout_ms, isMetadata);
wasBlocked = (ret.getRight() != GetStatus.SUCCESS);
// Try reconstructing the object. Try to get it until at least PlasmaLink.GET_TIMEOUT_MS
// milliseconds passes, then repeat.
while (ret.getRight() != GetStatus.SUCCESS) {
RayLog.core.warn(
"Task " + taskId + " Object " + objectId.toString() + " get failed, reconstruct ...");
localSchedulerProxy.reconstructObject(objectId);
// Do another fetch
objectStoreProxy.fetch(objectId);
ret = objectStoreProxy.get(objectId, params.default_get_check_interval_ms,
isMetadata);//check the result every 5s, but it will return once available
}
RayLog.core.debug(
"Task " + taskId + " Object " + objectId.toString() + " get" + ", the result " + ret
.getLeft());
return ret.getLeft();
} catch (TaskExecutionException e) {
RayLog.core
.error("Task " + taskId + " Object " + objectId.toString() + " get with Exception", e);
throw e;
} finally {
// If the object was not able to get locally, let the local scheduler
// know that we're now unblocked.
if (wasBlocked) {
localSchedulerProxy.notifyUnblocked();
}
}
public <T> List<T> get(List<UniqueID> objectIds) throws TaskExecutionException {
return doGet(objectIds, false);
}
@Override
public <T> List<T> get(List<UniqueID> objectIds) throws TaskExecutionException {
return doGet(objectIds, false);
public <T> T getMeta(UniqueID objectId) throws TaskExecutionException {
return doGet(objectId, true);
}
@Override
@@ -259,26 +255,59 @@ public abstract class RayRuntime implements RayApi {
return doGet(objectIds, true);
}
// We divide the fetch into smaller fetches so as to not block the manager
// for a prolonged period of time in a single call.
private void dividedFetch(List<UniqueID> objectIds) {
int fetchSize = objectStoreProxy.getFetchSize();
@Override
public <T> WaitResult<T> wait(RayList<T> waitfor, int numReturns, int timeout) {
return objectStoreProxy.wait(waitfor, numReturns, timeout);
}
int numObjectIds = objectIds.size();
for (int i = 0; i < numObjectIds; i += fetchSize) {
int endIndex = i + fetchSize;
if (endIndex < numObjectIds) {
objectStoreProxy.fetch(objectIds.subList(i, endIndex));
} else {
objectStoreProxy.fetch(objectIds.subList(i, numObjectIds));
}
}
@Override
public RayObjects call(UniqueID taskId, Callable funcRun, int returnCount, Object... args) {
return worker.rpc(taskId, funcRun, returnCount, args);
}
@Override
public RayObjects call(UniqueID taskId, Class<?> funcCls, Serializable lambda, int returnCount,
Object... args) {
return worker.rpc(taskId, UniqueID.nil, funcCls, lambda, returnCount, args);
}
@Override
public <R, RIDT> RayMap<RIDT, R> callWithReturnLabels(UniqueID taskId, Callable funcRun,
Collection<RIDT> returnIds,
Object... args) {
return worker.rpcWithReturnLabels(taskId, funcRun, returnIds, args);
}
@Override
public <R, RIDT> RayMap<RIDT, R> callWithReturnLabels(UniqueID taskId, Class<?> funcCls,
Serializable lambda, Collection<RIDT>
returnids,
Object... args) {
return worker.rpcWithReturnLabels(taskId, funcCls, lambda, returnids, args);
}
@Override
public <R> RayList<R> callWithReturnIndices(UniqueID taskId, Callable funcRun,
Integer returnCount, Object... args) {
return worker.rpcWithReturnIndices(taskId, funcRun, returnCount, args);
}
@Override
public <R> RayList<R> callWithReturnIndices(UniqueID taskId, Class<?> funcCls,
Serializable lambda, Integer returnCount, Object...
args) {
return worker.rpcWithReturnIndices(taskId, funcCls, lambda, returnCount, args);
}
@Override
public boolean isRemoteLambda() {
return params.run_mode.isRemoteLambda();
}
private <T> List<T> doGet(List<UniqueID> objectIds, boolean isMetadata)
throws TaskExecutionException {
boolean wasBlocked = false;
UniqueID taskId = getCurrentTaskID();
UniqueID taskId = getCurrentTaskId();
try {
int numObjectIds = objectIds.size();
@@ -348,53 +377,65 @@ public abstract class RayRuntime implements RayApi {
}
@Override
public <T> WaitResult<T> wait(RayList<T> waitfor, int numReturns, int timeout) {
return objectStoreProxy.wait(waitfor, numReturns, timeout);
private <T> T doGet(UniqueID objectId, boolean isMetadata) throws TaskExecutionException {
boolean wasBlocked = false;
UniqueID taskId = getCurrentTaskId();
try {
// Do an initial fetch.
objectStoreProxy.fetch(objectId);
// Get the object. We initially try to get the object immediately.
Pair<T, GetStatus> ret = objectStoreProxy
.get(objectId, params.default_first_check_timeout_ms, isMetadata);
wasBlocked = (ret.getRight() != GetStatus.SUCCESS);
// Try reconstructing the object. Try to get it until at least PlasmaLink.GET_TIMEOUT_MS
// milliseconds passes, then repeat.
while (ret.getRight() != GetStatus.SUCCESS) {
RayLog.core.warn(
"Task " + taskId + " Object " + objectId.toString() + " get failed, reconstruct ...");
localSchedulerProxy.reconstructObject(objectId);
// Do another fetch
objectStoreProxy.fetch(objectId);
ret = objectStoreProxy.get(objectId, params.default_get_check_interval_ms,
isMetadata);//check the result every 5s, but it will return once available
}
RayLog.core.debug(
"Task " + taskId + " Object " + objectId.toString() + " get" + ", the result " + ret
.getLeft());
return ret.getLeft();
} catch (TaskExecutionException e) {
RayLog.core
.error("Task " + taskId + " Object " + objectId.toString() + " get with Exception", e);
throw e;
} finally {
// If the object was not able to get locally, let the local scheduler
// know that we're now unblocked.
if (wasBlocked) {
localSchedulerProxy.notifyUnblocked();
}
}
}
@Override
public RayObjects call(UniqueID taskId, Callable funcRun, int returnCount, Object... args) {
return worker.rpc(taskId, funcRun, returnCount, args);
}
// We divide the fetch into smaller fetches so as to not block the manager
// for a prolonged period of time in a single call.
private void dividedFetch(List<UniqueID> objectIds) {
int fetchSize = objectStoreProxy.getFetchSize();
@Override
public RayObjects call(UniqueID taskId, Class<?> funcCls, Serializable lambda, int returnCount,
Object... args) {
return worker.rpc(taskId, UniqueID.nil, funcCls, lambda, returnCount, args);
}
@Override
public <R, RID> RayMap<RID, R> callWithReturnLabels(UniqueID taskId, Callable funcRun,
Collection<RID> returnIds,
Object... args) {
return worker.rpcWithReturnLabels(taskId, funcRun, returnIds, args);
}
@Override
public <R, RID> RayMap<RID, R> callWithReturnLabels(UniqueID taskId, Class<?> funcCls,
Serializable lambda, Collection<RID> returnids,
Object... args) {
return worker.rpcWithReturnLabels(taskId, funcCls, lambda, returnids, args);
}
@Override
public <R> RayList<R> callWithReturnIndices(UniqueID taskId, Callable funcRun,
Integer returnCount, Object... args) {
return worker.rpcWithReturnIndices(taskId, funcRun, returnCount, args);
}
@Override
public <R> RayList<R> callWithReturnIndices(UniqueID taskId, Class<?> funcCls,
Serializable lambda, Integer returnCount, Object... args) {
return worker.rpcWithReturnIndices(taskId, funcCls, lambda, returnCount, args);
}
/**
* get the task identity of the currently running task, UniqueID.Nil if not inside any
*/
public UniqueID getCurrentTaskID() {
return worker.getCurrentTaskID();
int numObjectIds = objectIds.size();
for (int i = 0; i < numObjectIds; i += fetchSize) {
int endIndex = i + fetchSize;
if (endIndex < numObjectIds) {
objectStoreProxy.fetch(objectIds.subList(i, endIndex));
} else {
objectStoreProxy.fetch(objectIds.subList(i, numObjectIds));
}
}
}
/**
@@ -404,48 +445,16 @@ public abstract class RayRuntime implements RayApi {
return worker.getCurrentTaskReturnIDs();
}
/**
* get the to-be-returned objects identities of the currently running task, empty array if not
* inside any
*/
public UniqueID getCurrentTaskNextPutID() {
return worker.getCurrentTaskNextPutID();
}
@Override
public boolean isRemoteLambda() {
return params.run_mode.isRemoteLambda();
}
protected void init(
LocalSchedulerLink slink,
ObjectStoreLink plink,
RemoteFunctionManager remoteLoader,
PathConfig pathManager
) {
UniqueIdHelper.setThreadRandomSeed(UniqueIdHelper.getUniqueness(params.driver_id));
remoteFunctionManager = remoteLoader;
pathConfig = pathManager;
functions = new LocalFunctionManager(remoteLoader);
localSchedulerProxy = new LocalSchedulerProxy(slink);
objectStoreProxy = new ObjectStoreProxy(plink);
worker = new Worker(localSchedulerProxy, functions);
}
/*********** Internal Methods ***********/
/***********
* Internal Methods.
***********/
public void loop() {
worker.loop();
}
/**
* start runtime
*/
public abstract void start(RayParameters params) throws Exception;
/**
* get actor with given id
* get actor with given id.
*/
public abstract Object getLocalActor(UniqueID id);
@@ -456,11 +465,4 @@ public abstract class RayRuntime implements RayApi {
public RemoteFunctionManager getRemoteFunctionManager() {
return remoteFunctionManager;
}
protected Worker worker;
protected LocalSchedulerProxy localSchedulerProxy;
protected ObjectStoreProxy objectStoreProxy;
protected LocalFunctionManager functions;
protected RemoteFunctionManager remoteFunctionManager;
protected PathConfig pathConfig;
}
@@ -14,11 +14,6 @@ public class Serializer {
return conf.get().asByteArray(obj);
}
@SuppressWarnings("unchecked")
public static <T> T decode(byte[] bs) {
return (T) conf.get().asObject(bs);
}
public static byte[] encode(Object obj, ClassLoader classLoader) {
byte[] result;
FSTConfiguration current = conf.get();
@@ -34,6 +29,11 @@ public class Serializer {
return result;
}
@SuppressWarnings("unchecked")
public static <T> T decode(byte[] bs) {
return (T) conf.get().asObject(bs);
}
@SuppressWarnings("unchecked")
public static <T> T decode(byte[] bs, ClassLoader classLoader) {
Object object;
@@ -15,16 +15,20 @@ import org.ray.util.logger.RayLog;
//
public class UniqueIdHelper {
public enum Type {
OBJECT,
TASK,
ACTOR,
}
private static final ThreadLocal<ByteBuffer> longBuffer = ThreadLocal
.withInitial(() -> ByteBuffer.allocate(Long.SIZE / Byte.SIZE));
private static final ThreadLocal<Random> rand = ThreadLocal.withInitial(Random::new);
private static final ThreadLocal<Long> randSeed = new ThreadLocal<>();
private static final int batchPos = 0;
private static final int uniquenessPos = Long.SIZE / Byte.SIZE;
private static final int typePos = 2 * Long.SIZE / Byte.SIZE;
private static final BitField typeField = new BitField(0x7);
private static final int testPos = 2 * Long.SIZE / Byte.SIZE;
private static final BitField testField = new BitField(0x1 << 3);
private static final int unionPos = 2 * Long.SIZE / Byte.SIZE;
private static final BitField multipleReturnField = new BitField(0x1 << 8);
private static final BitField isReturnIdField = new BitField(0x1 << 9);
private static final BitField withinTaskIndexField = new BitField(0xFFFFFC00);
public static void setThreadRandomSeed(long seed) {
if (randSeed.get() != null) {
@@ -43,123 +47,80 @@ public class UniqueIdHelper {
UniqueID currentTaskId = WorkerContext.currentTask().taskId;
byte[] bytes;
ByteBuffer lBuffer = longBuffer.get();
ByteBuffer lbuffer = longBuffer.get();
// similar to task id generation (see nextTaskId below)
if (!currentTaskId.isNil()) {
ByteBuffer rbb = ByteBuffer.wrap(currentTaskId.getBytes());
rbb.order(ByteOrder.LITTLE_ENDIAN);
long cid = rbb.getLong(uniquenessPos);
byte[] cbuffer = lBuffer.putLong(cid).array();
byte[] cbuffer = lbuffer.putLong(cid).array();
bytes = MD5Digestor.digest(cbuffer, WorkerContext.nextCallIndex());
} else {
long cid = rand.get().nextLong();
byte[] cbuffer = lBuffer.putLong(cid).array();
byte[] cbuffer = lbuffer.putLong(cid).array();
bytes = MD5Digestor.digest(cbuffer, rand.get().nextLong());
}
lBuffer.clear();
lbuffer.clear();
lBuffer.put(bytes, 0, Long.SIZE / Byte.SIZE);
long r = lBuffer.getLong();
lBuffer.clear();
lbuffer.put(bytes, 0, Long.SIZE / Byte.SIZE);
long r = lbuffer.getLong();
lbuffer.clear();
return r;
}
private static final int batchPos = 0;
private static void setBatch(ByteBuffer bb, long batchId) {
bb.putLong(batchPos, batchId);
}
private static long getBatch(ByteBuffer bb) {
return bb.getLong(batchPos);
}
private static final int uniquenessPos = Long.SIZE / Byte.SIZE;
private static void setUniqueness(ByteBuffer bb, long uniqueness) {
bb.putLong(uniquenessPos, uniqueness);
}
private static void setUniqueness(ByteBuffer bb, byte[] uniqueness) {
for (int i = 0; i < Long.SIZE / Byte.SIZE; ++i) {
bb.put(uniquenessPos + i, uniqueness[i]);
}
}
private static long getUniqueness(ByteBuffer bb) {
return bb.getLong(uniquenessPos);
}
private static final int typePos = 2 * Long.SIZE / Byte.SIZE;
private static final BitField typeField = new BitField(0x7);
private static void setType(ByteBuffer bb, Type type) {
byte v = bb.get(typePos);
v = (byte) typeField.setValue(v, type.ordinal());
bb.put(typePos, v);
}
private static Type getType(ByteBuffer bb) {
byte v = bb.get(typePos);
return Type.values()[typeField.getValue(v)];
}
private static final int testPos = 2 * Long.SIZE / Byte.SIZE;
private static final BitField testField = new BitField(0x1 << 3);
private static void setIsTest(ByteBuffer bb, boolean isTest) {
byte v = bb.get(testPos);
v = (byte) testField.setValue(v, isTest ? 1 : 0);
bb.put(testPos, v);
}
private static boolean getIsTest(ByteBuffer bb) {
byte v = bb.get(testPos);
return testField.getValue(v) == 1;
}
private static final int unionPos = 2 * Long.SIZE / Byte.SIZE;
private static final BitField multipleReturnField = new BitField(0x1 << 8);
private static final BitField isReturnIdField = new BitField(0x1 << 9);
private static final BitField withinTaskIndexField = new BitField(0xFFFFFC00);
private static void setHasMultipleReturn(ByteBuffer bb, int hasMultipleReturnOrNot) {
int v = bb.getInt(unionPos);
v = multipleReturnField.setValue(v, hasMultipleReturnOrNot);
bb.putInt(unionPos, v);
}
private static int getHasMultipleReturn(ByteBuffer bb) {
int v = bb.getInt(unionPos);
return multipleReturnField.getValue(v);
}
private static void setIsReturn(ByteBuffer bb, int isReturn) {
int v = bb.getInt(unionPos);
v = isReturnIdField.setValue(v, isReturn);
bb.putInt(unionPos, v);
}
private static int getIsReturn(ByteBuffer bb) {
int v = bb.getInt(unionPos);
return isReturnIdField.getValue(v);
}
private static void setWithinTaskIndex(ByteBuffer bb, int index) {
int v = bb.getInt(unionPos);
v = withinTaskIndexField.setValue(v, index);
bb.putInt(unionPos, v);
}
private static int getWithinTaskIndex(ByteBuffer bb) {
int v = bb.getInt(unionPos);
return withinTaskIndexField.getValue(v);
}
public static void setTest(UniqueID id, boolean isTest) {
ByteBuffer bb = ByteBuffer.wrap(id.getBytes());
setIsTest(bb, isTest);
}
private static void setIsTest(ByteBuffer bb, boolean isTest) {
byte v = bb.get(testPos);
v = (byte) testField.setValue(v, isTest ? 1 : 0);
bb.put(testPos, v);
}
public static long getUniqueness(UniqueID id) {
ByteBuffer bb = ByteBuffer.wrap(id.getBytes());
bb.order(ByteOrder.LITTLE_ENDIAN);
return getUniqueness(bb);
}
private static long getUniqueness(ByteBuffer bb) {
return bb.getLong(uniquenessPos);
}
public static UniqueID taskComputeReturnId(
UniqueID uid,
int returnIndex,
boolean hasMultipleReturn
) {
return objectIdFromTaskId(uid, true, hasMultipleReturn, returnIndex);
}
private static UniqueID objectIdFromTaskId(UniqueID taskId,
boolean isReturn,
boolean hasMultipleReturn,
int index
boolean isReturn,
boolean hasMultipleReturn,
int index
) {
UniqueID oid = newZero();
ByteBuffer rbb = ByteBuffer.wrap(taskId.getBytes());
@@ -181,23 +142,46 @@ public class UniqueIdHelper {
return new UniqueID(b);
}
public static void setTest(UniqueID id, boolean isTest) {
ByteBuffer bb = ByteBuffer.wrap(id.getBytes());
setIsTest(bb, isTest);
private static void setBatch(ByteBuffer bb, long batchId) {
bb.putLong(batchPos, batchId);
}
public static long getUniqueness(UniqueID id) {
ByteBuffer bb = ByteBuffer.wrap(id.getBytes());
bb.order(ByteOrder.LITTLE_ENDIAN);
return getUniqueness(bb);
private static long getBatch(ByteBuffer bb) {
return bb.getLong(batchPos);
}
public static UniqueID taskComputeReturnId(
UniqueID uid,
int returnIndex,
boolean hasMultipleReturn
) {
return objectIdFromTaskId(uid, true, hasMultipleReturn, returnIndex);
private static void setUniqueness(ByteBuffer bb, long uniqueness) {
bb.putLong(uniquenessPos, uniqueness);
}
private static void setUniqueness(ByteBuffer bb, byte[] uniqueness) {
for (int i = 0; i < Long.SIZE / Byte.SIZE; ++i) {
bb.put(uniquenessPos + i, uniqueness[i]);
}
}
private static void setType(ByteBuffer bb, Type type) {
byte v = bb.get(typePos);
v = (byte) typeField.setValue(v, type.ordinal());
bb.put(typePos, v);
}
private static void setHasMultipleReturn(ByteBuffer bb, int hasMultipleReturnOrNot) {
int v = bb.getInt(unionPos);
v = multipleReturnField.setValue(v, hasMultipleReturnOrNot);
bb.putInt(unionPos, v);
}
private static void setIsReturn(ByteBuffer bb, int isReturn) {
int v = bb.getInt(unionPos);
v = isReturnIdField.setValue(v, isReturn);
bb.putInt(unionPos, v);
}
private static void setWithinTaskIndex(ByteBuffer bb, int index) {
int v = bb.getInt(unionPos);
v = withinTaskIndexField.setValue(v, index);
bb.putInt(unionPos, v);
}
public static UniqueID taskComputePutId(UniqueID uid, int putIndex) {
@@ -210,6 +194,11 @@ public class UniqueIdHelper {
return getHasMultipleReturn(bb) != 0;
}
private static int getHasMultipleReturn(ByteBuffer bb) {
int v = bb.getInt(unionPos);
return multipleReturnField.getValue(v);
}
public static UniqueID taskIdFromObjectId(UniqueID objectId) {
UniqueID taskId = newZero();
ByteBuffer rbb = ByteBuffer.wrap(objectId.getBytes());
@@ -242,25 +231,24 @@ public class UniqueIdHelper {
// setup unique id (task id)
byte[] idBytes;
ByteBuffer lBuffer = longBuffer.get();
ByteBuffer lbuffer = longBuffer.get();
// if inside a task
if (!currentTaskId.isNil()) {
long cid = rbb.getLong(uniquenessPos);
byte[] cbuffer = lBuffer.putLong(cid).array();
byte[] cbuffer = lbuffer.putLong(cid).array();
idBytes = MD5Digestor.digest(cbuffer, WorkerContext.nextCallIndex());
// if not
} else {
long cid = rand.get().nextLong();
byte[] cbuffer = lBuffer.putLong(cid).array();
byte[] cbuffer = lbuffer.putLong(cid).array();
idBytes = MD5Digestor.digest(cbuffer, rand.get().nextLong());
}
setUniqueness(wbb, idBytes);
lBuffer.clear();
lbuffer.clear();
return taskId;
}
public static boolean isLambdaFunction(UniqueID functionId) {
ByteBuffer wbb = ByteBuffer.wrap(functionId.getBytes());
wbb.order(ByteOrder.LITTLE_ENDIAN);
@@ -285,4 +273,10 @@ public class UniqueIdHelper {
wbb.order(ByteOrder.LITTLE_ENDIAN);
return getUniqueness(wbb) == 0;
}
public enum Type {
OBJECT,
TASK,
ACTOR,
}
}
@@ -72,32 +72,60 @@ public class Worker {
}
private RayObjects taskSubmit(UniqueID taskId,
byte[] fid,
int returnCount,
boolean multiReturn,
Object[] args) {
RayInvocation ri = new RayInvocation(fid, args);
return scheduler.submit(taskId, ri, returnCount, multiReturn);
public RayObjects rpc(UniqueID taskId, Callable funcRun, int returnCount, Object[] args) {
byte[] fid = fidFromHook(funcRun);
return submit(taskId, fid, returnCount, false, args);
}
private RayObjects actorTaskSubmit(UniqueID taskId,
byte[] fid,
int returnCount,
boolean multiReturn,
Object[] args,
RayActor<?> actor) {
RayInvocation ri = new RayInvocation(fid, args, actor);
RayObjects returnObjs = scheduler.submit(taskId, ri, returnCount + 1, multiReturn);
actor.setTaskCursor(returnObjs.pop().getId());
return returnObjs;
public RayObjects rpc(UniqueID taskId, UniqueID functionId, Class<?> funcCls, Serializable lambda,
int returnCount, Object[] args) {
byte[] fid = functionId.getBytes();
Object[] ls = Arrays.copyOf(args, args.length + 2);
ls[args.length] = funcCls.getName();
ls[args.length + 1] = SerializationUtils.serialize(lambda);
return submit(taskId, fid, returnCount, false, ls);
}
public RayObjects rpc(UniqueID taskId, RayActor<?> actor, Callable funcRun, int returnCount,
Object[] args) {
byte[] fid = fidFromHook(funcRun);
return actorTaskSubmit(taskId, fid, returnCount, false, args, actor);
}
public RayObjects rpc(UniqueID taskId, UniqueID functionId, RayActor<?> actor, Class<?> funcCls,
Serializable lambda, int returnCount, Object[] args) {
byte[] fid = functionId.getBytes();
Object[] ls = Arrays.copyOf(args, args.length + 2);
ls[args.length] = funcCls.getName();
ls[args.length + 1] = SerializationUtils.serialize(lambda);
return actorTaskSubmit(taskId, fid, returnCount, false, ls, actor);
}
private byte[] fidFromHook(Callable funcRun) {
MethodSwitcher.IsRemoteCall.set(true);
try {
funcRun.run();
} catch (Throwable e) {
RayLog.core.error(
"make sure you are using code rewritten using the rewrite tool, see JarRewriter for"
+ " options", e);
throw new RuntimeException("make sure you are using code rewritten using the rewrite tool,"
+ "see JarRewriter for options");
}
byte[] fid = MethodSwitcher.MethodId.get();//get the identity of function from hook
MethodSwitcher.IsRemoteCall.set(false);
return fid;
}
private RayObjects submit(UniqueID taskId,
byte[] fid,
int returnCount,
boolean multiReturn,
Object[] args) {
byte[] fid,
int returnCount,
boolean multiReturn,
Object[] args) {
if (taskId == null) {
taskId = UniqueIdHelper.nextTaskId(-1);
}
@@ -108,32 +136,38 @@ public class Worker {
}
}
public RayObjects rpc(UniqueID taskId, Callable funcRun, int returnCount, Object[] args) {
byte[] fid = fidFromHook(funcRun);
return submit(taskId, fid, returnCount, false, args);
private RayObjects actorTaskSubmit(UniqueID taskId,
byte[] fid,
int returnCount,
boolean multiReturn,
Object[] args,
RayActor<?> actor) {
RayInvocation ri = new RayInvocation(fid, args, actor);
RayObjects returnObjs = scheduler.submit(taskId, ri, returnCount + 1, multiReturn);
actor.setTaskCursor(returnObjs.pop().getId());
return returnObjs;
}
private RayObjects taskSubmit(UniqueID taskId,
byte[] fid,
int returnCount,
boolean multiReturn,
Object[] args) {
RayInvocation ri = new RayInvocation(fid, args);
return scheduler.submit(taskId, ri, returnCount, multiReturn);
}
public RayObjects rpcCreateActor(UniqueID taskId, UniqueID createActorId, Callable funcRun,
int returnCount,
Object[] args) {
int returnCount,
Object[] args) {
byte[] fid = fidFromHook(funcRun);
RayInvocation ri = new RayInvocation(fid, new Object[]{});
RayInvocation ri = new RayInvocation(fid, new Object[] {});
return scheduler.submit(taskId, createActorId, ri, returnCount, false);
}
public RayObjects rpc(UniqueID taskId, UniqueID functionId, Class<?> funcCls, Serializable lambda,
int returnCount, Object[] args) {
byte[] fid = functionId.getBytes();
Object[] ls = Arrays.copyOf(args, args.length + 2);
ls[args.length] = funcCls.getName();
ls[args.length + 1] = SerializationUtils.serialize(lambda);
return submit(taskId, fid, returnCount, false, ls);
}
public RayObjects rpcCreateActor(UniqueID taskId, UniqueID createActorId, UniqueID functionId,
Class<?> funcCls, Serializable lambda, int returnCount, Object[] args) {
Class<?> funcCls, Serializable lambda, int returnCount,
Object[] args) {
byte[] fid = functionId.getBytes();
Object[] ls = Arrays.copyOf(args, args.length + 2);
@@ -144,26 +178,9 @@ public class Worker {
return scheduler.submit(taskId, createActorId, ri, returnCount, false);
}
public RayObjects rpc(UniqueID taskId, RayActor<?> actor, Callable funcRun, int returnCount,
Object[] args) {
byte[] fid = fidFromHook(funcRun);
return actorTaskSubmit(taskId, fid, returnCount, false, args, actor);
}
public RayObjects rpc(UniqueID taskId, UniqueID functionId, RayActor<?> actor, Class<?> funcCls,
Serializable lambda, int returnCount, Object[] args) {
byte[] fid = functionId.getBytes();
Object[] ls = Arrays.copyOf(args, args.length + 2);
ls[args.length] = funcCls.getName();
ls[args.length + 1] = SerializationUtils.serialize(lambda);
return actorTaskSubmit(taskId, fid, returnCount, false, ls, actor);
}
public <R, RID> RayMap<RID, R> rpcWithReturnLabels(UniqueID taskId, Callable funcRun,
Collection<RID> returnids,
Object[] args) {
public <R, RIDT> RayMap<RIDT, R> rpcWithReturnLabels(UniqueID taskId, Callable funcRun,
Collection<RIDT> returnids,
Object[] args) {
byte[] fid = fidFromHook(funcRun);
if (taskId == null) {
taskId = UniqueIdHelper.nextTaskId(-1);
@@ -171,10 +188,11 @@ public class Worker {
return scheduler.submit(taskId, new RayInvocation(fid, args), returnids);
}
public <R, RID> RayMap<RID, R> rpcWithReturnLabels(UniqueID taskId, Class<?> funcCls,
Serializable lambda, Collection<RID> returnids,
Object[] args) {
byte[] fid = UniqueID.nil.getBytes();
public <R, RIDT> RayMap<RIDT, R> rpcWithReturnLabels(UniqueID taskId, Class<?> funcCls,
Serializable lambda,
Collection<RIDT> returnids,
Object[] args) {
final byte[] fid = UniqueID.nil.getBytes();
if (taskId == null) {
taskId = UniqueIdHelper.nextTaskId(-1);
}
@@ -187,8 +205,8 @@ public class Worker {
}
public <R> RayList<R> rpcWithReturnIndices(UniqueID taskId, Callable funcRun,
Integer returnCount,
Object[] args) {
Integer returnCount,
Object[] args) {
byte[] fid = fidFromHook(funcRun);
RayObjects objs = submit(taskId, fid, returnCount, true, args);
RayList<R> rets = new RayList<>();
@@ -200,8 +218,8 @@ public class Worker {
@SuppressWarnings("unchecked")
public <R> RayList<R> rpcWithReturnIndices(UniqueID taskId, Class<?> funcCls,
Serializable lambda, Integer returnCount,
Object[] args) {
Serializable lambda, Integer returnCount,
Object[] args) {
byte[] fid = UniqueID.nil.getBytes();
Object[] ls = Arrays.copyOf(args, args.length + 2);
ls[args.length] = funcCls.getName();
@@ -216,27 +234,11 @@ public class Worker {
return rets;
}
private byte[] fidFromHook(Callable funcRun) {
MethodSwitcher.IsRemoteCall.set(true);
try {
funcRun.run();
} catch (Throwable e) {
RayLog.core.error(
"make sure you are using code rewritten using the rewrite tool, see JarRewriter for options",
e);
throw new RuntimeException(
"make sure you are using code rewritten using the rewrite tool, see JarRewriter for options");
}
byte[] fid = MethodSwitcher.MethodId.get();//get the identity of function from hook
MethodSwitcher.IsRemoteCall.set(false);
return fid;
}
public UniqueID getCurrentTaskID() {
public UniqueID getCurrentTaskId() {
return WorkerContext.currentTask().taskId;
}
public UniqueID getCurrentTaskNextPutID() {
public UniqueID getCurrentTaskNextPutId() {
return UniqueIdHelper.taskComputePutId(
WorkerContext.currentTask().taskId, WorkerContext.nextPutIndex());
}
@@ -6,33 +6,28 @@ import org.ray.spi.model.TaskSpec;
public class WorkerContext {
/**
* id of worker
*/
public static UniqueID workerID = UniqueID.randomID();
/**
* current doing task
*/
private TaskSpec currentTask;
/**
* current app classloader
*/
private ClassLoader currentClassLoader;
/**
* how many puts done by current task
*/
private int currentTaskPutCount;
/**
* how many calls done by current task
*/
private int currentTaskCallCount;
private static final ThreadLocal<WorkerContext> currentWorkerCtx =
ThreadLocal.withInitial(() -> init(RayRuntime.getParams()));
/**
* id of worker.
*/
public static UniqueID workerID = UniqueID.randomId();
/**
* current doing task.
*/
private TaskSpec currentTask;
/**
* current app classloader.
*/
private ClassLoader currentClassLoader;
/**
* how many puts done by current task.
*/
private int currentTaskPutCount;
/**
* how many calls done by current task.
*/
private int currentTaskCallCount;
public static WorkerContext init(RayParameters params) {
WorkerContext ctx = new WorkerContext();
@@ -48,10 +43,6 @@ public class WorkerContext {
return ctx;
}
public static WorkerContext get() {
return currentWorkerCtx.get();
}
public static void prepare(TaskSpec task, ClassLoader classLoader) {
WorkerContext wc = get();
wc.currentTask = task;
@@ -60,6 +51,10 @@ public class WorkerContext {
wc.currentClassLoader = classLoader;
}
public static WorkerContext get() {
return currentWorkerCtx.get();
}
public static TaskSpec currentTask() {
return get().currentTask;
}
@@ -72,7 +67,7 @@ public class WorkerContext {
return ++get().currentTaskCallCount;
}
public static UniqueID currentWorkerID() {
public static UniqueID currentWorkerId() {
return WorkerContext.workerID;
}
@@ -6,7 +6,7 @@ import org.ray.util.config.AConfig;
import org.ray.util.config.ConfigReader;
/**
* Runtime parameters of Ray process
* Runtime parameters of Ray process.
*/
public class RayParameters {
@@ -115,7 +115,8 @@ public class RayParameters {
@AConfig(comment = "whether to disable process failover")
public boolean disable_process_failover = false;
@AConfig(comment = "the max size of each file of java worker log, could be set as 10KB, 10MB, 1GB or something similar")
@AConfig(comment = "the max size of each file of java worker log, could be set as 10KB, 10MB, "
+ "1GB or something similar")
public String max_java_log_file_size = "500MB";
@AConfig(comment = "delay seconds under onebox before app logic for debugging")
@@ -124,7 +125,8 @@ public class RayParameters {
public RayParameters(ConfigReader config) {
if (null != config) {
String networkInterface = config.getStringValue("ray.java", "network_interface", null,
"Network interface to be specified for host ip address(e.g., en0, eth0), may use ifconfig to get options");
"Network interface to be specified for host ip address(e.g., en0, eth0), may use "
+ "ifconfig to get options");
node_ip_address = NetworkUtil.getIpAddress(networkInterface);
config.readObject("ray.java.start", this, this);
}
@@ -5,24 +5,21 @@ public enum RunMode {
SINGLE_BOX(true, false, true, true), // remote lambda, dev path, native runtime
CLUSTER(false, true, false, true); // static rewrite, deploy path, naive runtime
private final boolean remoteLambda;
private final boolean staticRewrite;
private final boolean devPathManager;
private final boolean nativeRuntime;
RunMode(boolean remoteLambda, boolean staticRewrite, boolean devPathManager,
boolean nativeRuntime) {
boolean nativeRuntime) {
this.remoteLambda = remoteLambda;
this.staticRewrite = staticRewrite;
this.devPathManager = devPathManager;
this.nativeRuntime = nativeRuntime;
}
private final boolean remoteLambda;
private final boolean staticRewrite;
private final boolean devPathManager;
private final boolean nativeRuntime;
/**
* Getter method for property <tt>remoteLambda</tt>
* Getter method for property <tt>remoteLambda</tt>.
*
* @return property value of remoteLambda
*/
@@ -31,7 +28,7 @@ public enum RunMode {
}
/**
* Getter method for property <tt>staticRewrite</tt>
* Getter method for property <tt>staticRewrite</tt>.
*
* @return property value of staticRewrite
*/
@@ -40,7 +37,7 @@ public enum RunMode {
}
/**
* Getter method for property <tt>devPathManager</tt>
* Getter method for property <tt>devPathManager</tt>.
*
* @return property value of devPathManager
*/
@@ -49,7 +46,7 @@ public enum RunMode {
}
/**
* Getter method for property <tt>nativeRuntime</tt>
* Getter method for property <tt>nativeRuntime</tt>.
*
* @return property value of nativeRuntime
*/
@@ -29,27 +29,27 @@ public class LocalSchedulerProxy {
}
public RayObjects submit(UniqueID taskId, RayInvocation invocation, int returnCount,
boolean multiReturn) {
boolean multiReturn) {
UniqueID[] returnIds = buildReturnIds(taskId, returnCount, multiReturn);
this.doSubmit(invocation, taskId, returnIds, UniqueID.nil);
return new RayObjects(returnIds);
}
public RayObjects submit(UniqueID taskId, UniqueID createActorId, RayInvocation invocation,
int returnCount, boolean multiReturn) {
int returnCount, boolean multiReturn) {
UniqueID[] returnIds = buildReturnIds(taskId, returnCount, multiReturn);
this.doSubmit(invocation, taskId, returnIds, createActorId);
return new RayObjects(returnIds);
}
public <R, RID> RayMap<RID, R> submit(UniqueID taskId, RayInvocation invocation,
Collection<RID> userReturnIds) {
public <R, RIDT> RayMap<RIDT, R> submit(UniqueID taskId, RayInvocation invocation,
Collection<RIDT> userReturnIds) {
UniqueID[] returnIds = buildReturnIds(taskId, userReturnIds.size(), true);
RayMap<RID, R> ret = new RayMap<>();
Map<RID, UniqueID> returnidmapArg = new HashMap<>();
RayMap<RIDT, R> ret = new RayMap<>();
Map<RIDT, UniqueID> returnidmapArg = new HashMap<>();
int index = 0;
for (RID userReturnId : userReturnIds) {
for (RIDT userReturnId : userReturnIds) {
if (returnidmapArg.containsKey(userReturnId)) {
RayLog.core.error("TaskId " + taskId + " userReturnId is duplicate " + userReturnId);
continue;
@@ -63,10 +63,10 @@ public class LocalSchedulerProxy {
System.arraycopy(returnIds, 0, newReturnIds, 0, index);
returnIds = newReturnIds;
}
Object args[] = invocation.getArgs();
Object[] args = invocation.getArgs();
Object[] newargs;
if (args == null) {
newargs = new Object[]{returnidmapArg};
newargs = new Object[] {returnidmapArg};
} else {
newargs = new Object[args.length + 1];
newargs[0] = returnidmapArg;
@@ -77,10 +77,19 @@ public class LocalSchedulerProxy {
return ret;
}
private void doSubmit(RayInvocation invocation, UniqueID taskId,
UniqueID[] returnIds, UniqueID createActorId) {
// build Object IDs of return values.
private UniqueID[] buildReturnIds(UniqueID taskId, int returnCount, boolean multiReturn) {
UniqueID[] returnIds = new UniqueID[returnCount];
for (int k = 0; k < returnCount; k++) {
returnIds[k] = UniqueIdHelper.taskComputeReturnId(taskId, k, multiReturn);
}
return returnIds;
}
TaskSpec current = WorkerContext.currentTask();
private void doSubmit(RayInvocation invocation, UniqueID taskId,
UniqueID[] returnIds, UniqueID createActorId) {
final TaskSpec current = WorkerContext.currentTask();
TaskSpec task = new TaskSpec();
task.actorCounter = invocation.getActor().increaseTaskCounter();
task.actorId = invocation.getActor().getId();
@@ -101,19 +110,10 @@ public class LocalSchedulerProxy {
"Task " + taskId + " submitted, functionId = " + task.functionId + " actorId = "
+ task.actorId + ", driverId = " + task.driverId + ", return_ids = " + Arrays
.toString(returnIds) + ", currentTask " + WorkerContext.currentTask().taskId
+ " cursorId = " + task.cursorId);
+ " cursorId = " + task.cursorId);
scheduler.submitTask(task);
}
// build Object IDs of return values.
private UniqueID[] buildReturnIds(UniqueID taskId, int returnCount, boolean multiReturn) {
UniqueID[] returnIds = new UniqueID[returnCount];
for (int k = 0; k < returnCount; k++) {
returnIds[k] = UniqueIdHelper.taskComputeReturnId(taskId, k, multiReturn);
}
return returnIds;
}
public TaskSpec getTask() {
TaskSpec ts = scheduler.getTaskTodo();
RayLog.core.info("Task " + ts.taskId.toString() + " received");
@@ -7,10 +7,12 @@ import org.ray.hook.runtime.LoadedFunctions;
import org.ray.util.logger.RayLog;
/**
* mock version of remote function manager using local loaded jars + runtime hook
* mock version of remote function manager using local loaded jars + runtime hook.
*/
public class NopRemoteFunctionManager implements RemoteFunctionManager {
private final LoadedFunctions loadedFunctions = new LoadedFunctions();
public NopRemoteFunctionManager(UniqueID driverId) {
//onLoad(driverId, Agent.hookedMethods);
//Agent.consumers.add(m -> { this.onLoad(m); });
@@ -48,17 +50,6 @@ public class NopRemoteFunctionManager implements RemoteFunctionManager {
// nothing to do
}
private void onLoad(UniqueID driverId, Set<MethodId> methods) {
//assert (startupDriverId().equals(driverId));
for (MethodId mid : methods) {
onLoad(mid);
}
}
private void onLoad(MethodId mid) {
loadedFunctions.functions.add(mid);
}
@Override
public LoadedFunctions loadFunctions(UniqueID driverId) {
//assert (startupDriverId().equals(driverId));
@@ -76,5 +67,14 @@ public class NopRemoteFunctionManager implements RemoteFunctionManager {
//assert (startupDriverId().equals(driverId));
}
private final LoadedFunctions loadedFunctions = new LoadedFunctions();
private void onLoad(UniqueID driverId, Set<MethodId> methods) {
//assert (startupDriverId().equals(driverId));
for (MethodId mid : methods) {
onLoad(mid);
}
}
private void onLoad(MethodId mid) {
loadedFunctions.functions.add(mid);
}
}
@@ -2,7 +2,6 @@ package org.ray.spi;
import java.util.ArrayList;
import java.util.List;
import org.apache.arrow.plasma.ObjectStoreLink;
import org.apache.commons.lang3.tuple.Pair;
import org.ray.api.RayList;
@@ -19,19 +18,21 @@ import org.ray.util.exception.TaskExecutionException;
*/
public class ObjectStoreProxy {
public enum GetStatus {SUCCESS, FAILED}
private final ObjectStoreLink store;
private final int GET_TIMEOUT_MS = 1000;
private final int getTimeoutMs = 1000;
public ObjectStoreProxy(ObjectStoreLink store) {
this.store = store;
}
public <T> Pair<T, GetStatus> get(UniqueID id, int timeout_ms, boolean isMetadata)
public <T> Pair<T, GetStatus> get(UniqueID objectId, boolean isMetadata)
throws TaskExecutionException {
byte[] obj = store.get(id.getBytes(), timeout_ms, isMetadata);
return get(objectId, getTimeoutMs, isMetadata);
}
public <T> Pair<T, GetStatus> get(UniqueID id, int timeoutMs, boolean isMetadata)
throws TaskExecutionException {
byte[] obj = store.get(id.getBytes(), timeoutMs, isMetadata);
if (obj != null) {
T t = Serializer.decode(obj, WorkerContext.currentClassLoader());
store.release(id.getBytes());
@@ -44,9 +45,9 @@ public class ObjectStoreProxy {
}
}
public <T> Pair<T, GetStatus> get(UniqueID objectId, boolean isMetadata)
public <T> List<Pair<T, GetStatus>> get(List<UniqueID> objectIds, boolean isMetadata)
throws TaskExecutionException {
return get(objectId, GET_TIMEOUT_MS, isMetadata);
return get(objectIds, getTimeoutMs, isMetadata);
}
public <T> List<Pair<T, GetStatus>> get(List<UniqueID> ids, int timeoutMs, boolean isMetadata)
@@ -69,9 +70,13 @@ public class ObjectStoreProxy {
return ret;
}
public <T> List<Pair<T, GetStatus>> get(List<UniqueID> objectIds, boolean isMetadata)
throws TaskExecutionException {
return get(objectIds, GET_TIMEOUT_MS, isMetadata);
private static byte[][] getIdBytes(List<UniqueID> objectIds) {
int size = objectIds.size();
byte[][] ids = new byte[size][];
for (int i = 0; i < size; i++) {
ids[i] = objectIds.get(i).getBytes();
}
return ids;
}
public void put(UniqueID id, Object obj, Object metadata) {
@@ -111,12 +116,7 @@ public class ObjectStoreProxy {
}
private static byte[][] getIdBytes(List<UniqueID> objectIds) {
int size = objectIds.size();
byte[][] ids = new byte[size][];
for (int i = 0; i < size; i++) {
ids[i] = objectIds.get(i).getBytes();
}
return ids;
public enum GetStatus {
SUCCESS, FAILED
}
}
@@ -4,13 +4,12 @@ import org.ray.api.UniqueID;
import org.ray.hook.runtime.LoadedFunctions;
/**
* register and load functions from function table
* register and load functions from function table.
*/
public interface RemoteFunctionManager {
/**
* register <resourceId, resource> mapping, and upload resource
*
/*
* register <resourceId, resource> mapping, and upload resource.
* this function is invoked by app proxy or other stand-alone tools it should detect for
* duplication first though
*
@@ -20,7 +19,7 @@ public interface RemoteFunctionManager {
UniqueID registerResource(byte[] resourceZip);
/**
* download resource content
* download resource content.
*
* @return resource content
*/
@@ -28,44 +27,40 @@ public interface RemoteFunctionManager {
/**
* remove resource by its hash id
*
* be careful of invoking this function to make sure it is no longer used
* be careful of invoking this function to make sure it is no longer used.
*
* @param resourceId SHA-1 hash of the resource zip bytes
*/
void unregisterResource(UniqueID resourceId);
/**
* register the <driver, resource> mapping to repo
*
/*
* register the <driver, resource> mapping to repo,
* this function is invoked by whoever initiates the driver id
*/
void registerApp(UniqueID driverId, UniqueID resourceId);
/**
* get the resourceId of one app
* get the resourceId of one app.
*
* @return resourceId of the app driver
*/
UniqueID getAppResourceId(UniqueID driverId);
/**
/*
* unregister <dirver, resource> mapping
*
* this function is called when the driver exits or detected dead
*/
void unregisterApp(UniqueID driverId);
/**
* load resource and functions for this driver this function is used by the workers on demand when
* a required function is not found in {@code LocalFunctionManager}
* a required function is not found in {@code LocalFunctionManager}.
*/
LoadedFunctions loadFunctions(UniqueID driverId);
/**
* unload functions for this driver
*
* this function is used by the workers on demand when a driver is dead
* this function is used by the workers on demand when a driver is dead.
*/
void unloadFunctions(UniqueID driverId);
}
@@ -8,21 +8,19 @@ import org.ray.api.UniqueID;
*/
public class RayInvocation {
private static final RayActor<?> nil = new RayActor<>(UniqueID.nil, UniqueID.nil);
/**
* unique id for a method
* unique id for a method.
*
* @see UniqueID
*/
private final byte[] id;
private final RayActor<?> actor;
/**
* function arguments
* function arguments.
*/
private Object[] args;
private final RayActor<?> actor;
private static final RayActor<?> nil = new RayActor<>(UniqueID.nil, UniqueID.nil);
public RayInvocation(byte[] id, Object[] args) {
this(id, args, nil);
}
@@ -3,7 +3,7 @@ package org.ray.spi.model;
import java.lang.reflect.Method;
/**
* method info
* method info.
*/
public class RayMethod {
@@ -11,6 +11,11 @@ public class RayMethod {
public final String fullName;
// TODO: other annotated information
public RayMethod(Method m) {
invokable = m;
fullName = m.getDeclaringClass().getName() + "." + m.getName();
}
public void check() {
for (Class<?> paramCls : invokable.getParameterTypes()) {
if (paramCls.isPrimitive()) {
@@ -19,9 +24,4 @@ public class RayMethod {
}
}
}
public RayMethod(Method m) {
invokable = m;
fullName = m.getDeclaringClass().getName() + "." + m.getName();
}
}