[Java] Replace binary rewrite with Remote Lambda Cache (SerdeLambda) (#2245)

* <feature> : serde lambda

* <feature>:fixed CR

with issue #2245

* <feature>: fixed CR
This commit is contained in:
mylinyuzhi
2018-06-14 03:58:07 +08:00
committed by Philipp Moritz
parent 62de86ff7a
commit fa0ade2bc5
89 changed files with 2633 additions and 7668 deletions
@@ -74,9 +74,8 @@ public class RayNativeRuntime extends RayRuntime {
}
// initialize remote function manager
RemoteFunctionManager funcMgr = params.run_mode.isStaticRewrite()
? new NativeRemoteFunctionManager(kvStore) :
new NopRemoteFunctionManager(params.driver_id);
RemoteFunctionManager funcMgr = params.run_mode.isDevPathManager()
? new NopRemoteFunctionManager(params.driver_id) : new NativeRemoteFunctionManager(kvStore);
// initialize worker context
if (params.worker_mode == WorkerMode.DRIVER) {
@@ -101,6 +100,7 @@ public class RayNativeRuntime extends RayRuntime {
int releaseDelay = RayRuntime.configReader
.getIntegerValue("ray", "plasma_default_release_delay", 0,
"how many release requests should be delayed in plasma client");
ObjectStoreLink plink = new PlasmaClient(params.object_store_name, params
.object_store_manager_name, releaseDelay);
@@ -163,7 +163,7 @@ public class RayNativeRuntime extends RayRuntime {
}
private void registerWorker(boolean isWorker, String nodeIpAddress, String storeName,
String managerName, String schedulerName) {
String managerName, String schedulerName) {
Map<String, String> workerInfo = new HashMap<>();
String workerId = new String(WorkerContext.currentWorkerId().getBytes());
if (!isWorker) {
@@ -193,26 +193,16 @@ public class RayNativeRuntime extends RayRuntime {
UniqueID actorId = UniqueIdHelper.taskComputeReturnId(createTaskId, 0, false);
RayActor<T> actor = new RayActor<>(actorId);
UniqueID cursorId;
if (params.run_mode.isRemoteLambda()) {
RayFunc_2_1<byte[], String, byte[]> createActorLambda = RayNativeRuntime::createActorInActor;
cursorId = worker.rpcCreateActor(
createTaskId,
actorId,
UniqueID.nil,
RayFunc_2_1.class,
createActorLambda,
1,
new Object[] {actorId.getBytes(), cls.getName()}
).getObjs()[0].getId();
} else {
cursorId = worker.rpcCreateActor(
createTaskId,
actorId,
() -> RayNativeRuntime.createActorInActor(null, null),
1,
new Object[] {actorId.getBytes(), cls.getName()}
).getObjs()[0].getId();
}
RayFunc_2_1<byte[], String, byte[]> createActorLambda = RayNativeRuntime::createActorInActor;
cursorId = worker.rpcCreateActor(
createTaskId,
actorId,
RayFunc_2_1.class,
createActorLambda,
1,
new Object[]{actorId.getBytes(), cls.getName()}
).getObjs()[0].getId();
actor.setTaskCursor(cursorId);
return actor;
}
@@ -247,4 +237,4 @@ public class RayNativeRuntime extends RayRuntime {
throw new TaskExecutionException(log, e);
}
}
}
}
@@ -0,0 +1,89 @@
package org.ray.spi.impl;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.filefilter.DirectoryFileFilter;
import org.apache.commons.io.filefilter.RegexFileFilter;
import org.ray.util.logger.RayLog;
/**
* load and unload jars from a dir.
*/
public class JarLoader {
public static URLClassLoader loadJars(String dir, boolean explicitLoad) {
// get all jars
Collection<File> jars = FileUtils.listFiles(
new File(dir),
new RegexFileFilter(".*\\.jar"),
DirectoryFileFilter.DIRECTORY
);
return loadJar(jars, explicitLoad);
}
public static void unloadJars(ClassLoader loader) {
// now do nothing, if no ref to the loader and loader's class.
// they would be gc.
}
private static URLClassLoader loadJar(Collection<File> appJars, boolean explicitLoad) {
List<JarFile> jars = new ArrayList<>();
List<URL> urls = new ArrayList<>();
for (File appJar : appJars) {
try {
RayLog.core.info("load jar " + appJar.getAbsolutePath());
JarFile jar = new JarFile(appJar.getAbsolutePath());
jars.add(jar);
urls.add(appJar.toURI().toURL());
} catch (IOException e) {
throw new RuntimeException(
"invalid app jar path: " + appJar.getAbsolutePath() + ", load failed with exception",
e);
}
}
URLClassLoader cl = URLClassLoader.newInstance(urls.toArray(new URL[urls.size()]));
if (!explicitLoad) {
return cl;
}
for (JarFile jar : jars) {
try {
Enumeration<JarEntry> e = jar.entries();
while (e.hasMoreElements()) {
JarEntry je = e.nextElement();
if (je.isDirectory() || !je.getName().endsWith(".class")) {
continue;
}
String className = classNameOfJarEntry(je);
className = className.replace('/', '.');
try {
Class.forName(className, true, cl);
} catch (ClassNotFoundException e1) {
e1.printStackTrace();
}
}
} finally {
IOUtils.closeQuietly(jar);
}
}
return cl;
}
private static String classNameOfJarEntry(JarEntry je) {
return je.getName().substring(0, je.getName().length() - ".class".length());
}
}
@@ -6,13 +6,10 @@ import java.security.NoSuchAlgorithmException;
import java.util.concurrent.ConcurrentHashMap;
import net.lingala.zip4j.core.ZipFile;
import org.ray.api.UniqueID;
import org.ray.core.RayRuntime;
import org.ray.hook.JarRewriter;
import org.ray.hook.runtime.JarLoader;
import org.ray.hook.runtime.LoadedFunctions;
import org.ray.spi.KeyValueStoreLink;
import org.ray.spi.RemoteFunctionManager;
import org.ray.util.FileUtil;
import org.ray.util.Sha1Digestor;
import org.ray.util.SystemUtil;
import org.ray.util.logger.RayLog;
@@ -21,10 +18,11 @@ import org.ray.util.logger.RayLog;
*/
public class NativeRemoteFunctionManager implements RemoteFunctionManager {
private ConcurrentHashMap<UniqueID, LoadedFunctions> loadedApps = new ConcurrentHashMap<>();
private final ConcurrentHashMap<UniqueID, ClassLoader> loadedApps = new ConcurrentHashMap<>();
private MessageDigest md;
private String appDir = System.getProperty("user.dir") + "/apps";
private KeyValueStoreLink kvStore;
private final String appDir = System.getProperty("user.dir") + "/apps";
private final KeyValueStoreLink kvStore;
public NativeRemoteFunctionManager(KeyValueStoreLink kvStore) throws NoSuchAlgorithmException {
this.kvStore = kvStore;
@@ -38,24 +36,20 @@ public class NativeRemoteFunctionManager implements RemoteFunctionManager {
@Override
public UniqueID registerResource(byte[] resourceZip) {
byte[] digest = md.digest(resourceZip);
byte[] digest = Sha1Digestor.digest(resourceZip);
assert (digest.length == UniqueID.LENGTH);
UniqueID resourceId = new UniqueID(digest);
// TODO: resources must be saved in persistent store
// instead of cache
//if (!Ray.exist(resourceId)) {
//Ray.put(resourceId, resourceZip);
kvStore.set(resourceId.getBytes(), resourceZip, null);
//}
return resourceId;
}
@Override
public byte[] getResource(UniqueID resourceId) {
return kvStore.get(resourceId.getBytes(), null);
//return (byte[])Ray.get(resourceId);
}
@Override
@@ -65,7 +59,6 @@ public class NativeRemoteFunctionManager implements RemoteFunctionManager {
@Override
public void registerApp(UniqueID driverId, UniqueID resourceId) {
//Ray.put(driverId, resourceId);
kvStore.set("App2ResMap", resourceId.toString(), driverId.toString());
}
@@ -80,27 +73,31 @@ public class NativeRemoteFunctionManager implements RemoteFunctionManager {
}
@Override
public LoadedFunctions loadFunctions(UniqueID driverId) {
LoadedFunctions rf = loadedApps.get(driverId);
if (rf == null) {
rf = initLoadedApps(driverId);
public ClassLoader loadResource(UniqueID driverId) {
ClassLoader classLoader = loadedApps.get(driverId);
if (classLoader == null) {
synchronized (this) {
classLoader = loadedApps.get(driverId);
if (classLoader == null) {
classLoader = initLoadedApps(driverId);
}
}
}
return rf;
return classLoader;
}
private synchronized LoadedFunctions initLoadedApps(UniqueID driverId) {
private ClassLoader initLoadedApps(UniqueID driverId) {
try {
RayLog.core.info("initLoadedApps" + driverId.toString());
LoadedFunctions rf = loadedApps.get(driverId);
if (rf == null) {
UniqueID resId = new UniqueID(kvStore.get("App2ResMap", driverId.toString()));
//UniqueID resId = Ray.get(driverId);
ClassLoader cl = loadedApps.get(driverId);
if (cl == null) {
UniqueID resId = new UniqueID(kvStore.get("App2ResMap", driverId.toString()));
byte[] res = getResource(resId);
if (res == null) {
throw new RuntimeException("get resource null, the resId " + resId.toString());
}
RayLog.core.info("ger resource of " + resId.toString() + ", result len " + res.length);
RayLog.core.info("get resource of " + resId.toString() + ", result len " + res.length);
String resPath =
appDir + "/" + driverId.toString() + "/" + String.valueOf(SystemUtil.pid());
File dir = new File(resPath);
@@ -112,11 +109,10 @@ public class NativeRemoteFunctionManager implements RemoteFunctionManager {
FileUtil.bytesToFile(res, zipPath);
ZipFile zipFile = new ZipFile(zipPath);
zipFile.extractAll(resPath);
rf = JarRewriter
.load(resPath, RayRuntime.getInstance().getPaths().java_runtime_rewritten_jars_dir);
loadedApps.put(driverId, rf);
cl = JarLoader.loadJars(resPath, false);
loadedApps.put(driverId, cl);
}
return rf;
return cl;
} catch (Exception e) {
RayLog.rapp.error("load function for " + driverId + " failed, ex = " + e.getMessage(), e);
return null;
@@ -125,11 +121,11 @@ public class NativeRemoteFunctionManager implements RemoteFunctionManager {
@Override
public synchronized void unloadFunctions(UniqueID driverId) {
LoadedFunctions rf = loadedApps.get(driverId);
ClassLoader cl = loadedApps.get(driverId);
try {
JarLoader.unloadJars(rf.loader);
JarLoader.unloadJars(cl);
} catch (Exception e) {
RayLog.rapp.error("unload function for " + driverId + " failed, ex = " + e.getMessage(), e);
}
}
}
}