From b7d5c8f2202059701a02cd6d5121bf2622eab1ef Mon Sep 17 00:00:00 2001 From: Kai Yang Date: Mon, 16 Dec 2019 14:04:44 +0800 Subject: [PATCH] [Java] Fix multiple `FunctionManager`s creating multiple `ClassLoader `s (#6434) --- .../org/ray/runtime/AbstractRayRuntime.java | 10 +- .../ray/runtime/DefaultRayRuntimeFactory.java | 8 +- .../java/org/ray/runtime/RayDevRuntime.java | 6 +- .../runtime/RayMultiWorkerNativeRuntime.java | 12 +- .../org/ray/runtime/RayNativeRuntime.java | 7 +- .../functionmanager/FunctionManager.java | 81 ++++++---- .../org/ray/runtime/runner/RunManager.java | 4 + .../ray/api/test/BaseMultiLanguageTest.java | 10 +- .../main/java/org/ray/api/test/BaseTest.java | 6 +- .../org/ray/api/test/ClassLoaderTest.java | 149 ++++++++++++++++++ 10 files changed, 245 insertions(+), 48 deletions(-) create mode 100644 java/test/src/main/java/org/ray/api/test/ClassLoaderTest.java diff --git a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java index 9c39e7a2a..dabb958f8 100644 --- a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java @@ -1,10 +1,12 @@ package org.ray.runtime; +import java.util.List; +import java.util.concurrent.Callable; + import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; -import java.util.List; -import java.util.concurrent.Callable; + import org.ray.api.RayActor; import org.ray.api.RayObject; import org.ray.api.RayPyActor; @@ -52,9 +54,9 @@ public abstract class AbstractRayRuntime implements RayRuntime { protected TaskSubmitter taskSubmitter; protected WorkerContext workerContext; - public AbstractRayRuntime(RayConfig rayConfig) { + public AbstractRayRuntime(RayConfig rayConfig, FunctionManager functionManager) { this.rayConfig = rayConfig; - functionManager = new FunctionManager(rayConfig.jobResourcePath); + this.functionManager = functionManager; runtimeContext = new RuntimeContextImpl(this); } diff --git a/java/runtime/src/main/java/org/ray/runtime/DefaultRayRuntimeFactory.java b/java/runtime/src/main/java/org/ray/runtime/DefaultRayRuntimeFactory.java index f95d7cfa4..b03debddc 100644 --- a/java/runtime/src/main/java/org/ray/runtime/DefaultRayRuntimeFactory.java +++ b/java/runtime/src/main/java/org/ray/runtime/DefaultRayRuntimeFactory.java @@ -4,6 +4,7 @@ import org.ray.api.runtime.RayRuntime; import org.ray.api.runtime.RayRuntimeFactory; import org.ray.runtime.config.RayConfig; import org.ray.runtime.config.RunMode; +import org.ray.runtime.functionmanager.FunctionManager; import org.ray.runtime.generated.Common.WorkerType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,14 +20,15 @@ public class DefaultRayRuntimeFactory implements RayRuntimeFactory { public RayRuntime createRayRuntime() { RayConfig rayConfig = RayConfig.create(); try { + FunctionManager functionManager = new FunctionManager(rayConfig.jobResourcePath); RayRuntime runtime; if (rayConfig.runMode == RunMode.SINGLE_PROCESS) { - runtime = new RayDevRuntime(rayConfig); + runtime = new RayDevRuntime(rayConfig, functionManager); } else { if (rayConfig.workerMode == WorkerType.DRIVER) { - runtime = new RayNativeRuntime(rayConfig); + runtime = new RayNativeRuntime(rayConfig, functionManager); } else { - runtime = new RayMultiWorkerNativeRuntime(rayConfig); + runtime = new RayMultiWorkerNativeRuntime(rayConfig, functionManager); } } return runtime; diff --git a/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java index befe0fe08..d34f6fc68 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java @@ -1,10 +1,12 @@ package org.ray.runtime; import java.util.concurrent.atomic.AtomicInteger; + import org.ray.api.id.JobId; import org.ray.api.id.UniqueId; import org.ray.runtime.config.RayConfig; import org.ray.runtime.context.LocalModeWorkerContext; +import org.ray.runtime.functionmanager.FunctionManager; import org.ray.runtime.object.LocalModeObjectStore; import org.ray.runtime.task.LocalModeTaskExecutor; import org.ray.runtime.task.LocalModeTaskSubmitter; @@ -17,8 +19,8 @@ public class RayDevRuntime extends AbstractRayRuntime { private AtomicInteger jobCounter = new AtomicInteger(0); - public RayDevRuntime(RayConfig rayConfig) { - super(rayConfig); + public RayDevRuntime(RayConfig rayConfig, FunctionManager functionManager) { + super(rayConfig, functionManager); if (rayConfig.getJobId().isNil()) { rayConfig.setJobId(nextJobId()); } diff --git a/java/runtime/src/main/java/org/ray/runtime/RayMultiWorkerNativeRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayMultiWorkerNativeRuntime.java index 8c85048c9..47e69cb0e 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayMultiWorkerNativeRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayMultiWorkerNativeRuntime.java @@ -1,8 +1,10 @@ package org.ray.runtime; -import com.google.common.base.Preconditions; import java.util.List; import java.util.concurrent.Callable; + +import com.google.common.base.Preconditions; + import org.ray.api.RayActor; import org.ray.api.RayObject; import org.ray.api.RayPyActor; @@ -16,6 +18,7 @@ import org.ray.api.runtime.RayRuntime; import org.ray.api.runtimecontext.RuntimeContext; import org.ray.runtime.config.RayConfig; import org.ray.runtime.config.RunMode; +import org.ray.runtime.functionmanager.FunctionManager; import org.ray.runtime.generated.Common.WorkerType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,6 +31,8 @@ public class RayMultiWorkerNativeRuntime implements RayRuntime { private static final Logger LOGGER = LoggerFactory.getLogger(RayMultiWorkerNativeRuntime.class); + private final FunctionManager functionManager; + /** * The number of workers per worker process. */ @@ -45,7 +50,8 @@ public class RayMultiWorkerNativeRuntime implements RayRuntime { */ private final ThreadLocal currentThreadRuntime = new ThreadLocal<>(); - public RayMultiWorkerNativeRuntime(RayConfig rayConfig) { + public RayMultiWorkerNativeRuntime(RayConfig rayConfig, FunctionManager functionManager) { + this.functionManager = functionManager; Preconditions.checkState( rayConfig.runMode == RunMode.CLUSTER && rayConfig.workerMode == WorkerType.WORKER); Preconditions.checkState(rayConfig.numWorkersPerProcess > 0, @@ -59,7 +65,7 @@ public class RayMultiWorkerNativeRuntime implements RayRuntime { for (int i = 0; i < numWorkers; i++) { final int workerIndex = i; threads[i] = new Thread(() -> { - RayNativeRuntime runtime = new RayNativeRuntime(rayConfig); + RayNativeRuntime runtime = new RayNativeRuntime(rayConfig, functionManager); runtimes[workerIndex] = runtime; currentThreadRuntime.set(runtime); runtime.run(); diff --git a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java index 4775553c5..ebbcebded 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java @@ -12,6 +12,7 @@ import org.ray.api.id.JobId; import org.ray.api.id.UniqueId; import org.ray.runtime.config.RayConfig; import org.ray.runtime.context.NativeWorkerContext; +import org.ray.runtime.functionmanager.FunctionManager; import org.ray.runtime.gcs.GcsClient; import org.ray.runtime.gcs.GcsClientOptions; import org.ray.runtime.gcs.RedisClient; @@ -90,8 +91,8 @@ public final class RayNativeRuntime extends AbstractRayRuntime { } } - public RayNativeRuntime(RayConfig rayConfig) { - super(rayConfig); + public RayNativeRuntime(RayConfig rayConfig, FunctionManager functionManager) { + super(rayConfig, functionManager); // Reset library path at runtime. resetLibraryPath(rayConfig); @@ -136,6 +137,8 @@ public final class RayNativeRuntime extends AbstractRayRuntime { manager.cleanup(); manager = null; } + + LOGGER.info("RayNativeRuntime shutdown"); } @Override diff --git a/java/runtime/src/main/java/org/ray/runtime/functionmanager/FunctionManager.java b/java/runtime/src/main/java/org/ray/runtime/functionmanager/FunctionManager.java index 230768933..21f91dcde 100644 --- a/java/runtime/src/main/java/org/ray/runtime/functionmanager/FunctionManager.java +++ b/java/runtime/src/main/java/org/ray/runtime/functionmanager/FunctionManager.java @@ -16,6 +16,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.WeakHashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.DirectoryFileFilter; @@ -42,24 +44,26 @@ public class FunctionManager { * Cache from a RayFunc object to its corresponding JavaFunctionDescriptor. Because * `LambdaUtils.getSerializedLambda` is expensive. */ + // If the cache is not thread local, we'll need a lock to protect it, + // which means competition is highly possible. private static final ThreadLocal, JavaFunctionDescriptor>> RAY_FUNC_CACHE = ThreadLocal.withInitial(WeakHashMap::new); /** * Mapping from the job id to the functions that belong to this job. */ - private Map jobFunctionTables = new HashMap<>(); + private ConcurrentMap jobFunctionTables = new ConcurrentHashMap<>(); /** * The resource path which we can load the job's jar resources. */ - private String jobResourcePath; + private final String jobResourcePath; /** * Construct a FunctionManager with the specified job resource path. * * @param jobResourcePath The specified job resource that can store the job's - * resources. + * resources. */ public FunctionManager(String jobResourcePath) { this.jobResourcePath = jobResourcePath; @@ -75,6 +79,8 @@ public class FunctionManager { public RayFunction getFunction(JobId jobId, RayFunc func) { JavaFunctionDescriptor functionDescriptor = RAY_FUNC_CACHE.get().get(func.getClass()); if (functionDescriptor == null) { + // It's OK to not lock here, because it's OK to have multiple JavaFunctionDescriptor instances + // for the same RayFunc instance. SerializedLambda serializedLambda = LambdaUtils.getSerializedLambda(func); final String className = serializedLambda.getImplClass().replace('/', '.'); final String methodName = serializedLambda.getImplMethodName(); @@ -92,35 +98,45 @@ public class FunctionManager { * @param functionDescriptor The function descriptor. * @return A RayFunction object. */ - public RayFunction getFunction(JobId jobId, JavaFunctionDescriptor functionDescriptor) { + public RayFunction getFunction(JobId jobId, + JavaFunctionDescriptor functionDescriptor) { JobFunctionTable jobFunctionTable = jobFunctionTables.get(jobId); if (jobFunctionTable == null) { - ClassLoader classLoader; - if (Strings.isNullOrEmpty(jobResourcePath)) { - classLoader = getClass().getClassLoader(); - } else { - File resourceDir = new File(jobResourcePath + "/" + jobId.toString() + "/"); - Collection files = FileUtils.listFiles(resourceDir, - new RegexFileFilter(".*\\.jar"), DirectoryFileFilter.DIRECTORY); - files.add(resourceDir); - final List urlList = files.stream().map(file -> { - try { - return file.toURI().toURL(); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } - }).collect(Collectors.toList()); - classLoader = new URLClassLoader(urlList.toArray(new URL[urlList.size()])); - LOGGER.debug("Resource loaded for job {} from path {}.", jobId, - resourceDir.getAbsolutePath()); + synchronized (this) { + jobFunctionTable = jobFunctionTables.get(jobId); + if (jobFunctionTable == null) { + jobFunctionTable = createJobFunctionTable(jobId); + jobFunctionTables.put(jobId, jobFunctionTable); + } } - - jobFunctionTable = new JobFunctionTable(classLoader); - jobFunctionTables.put(jobId, jobFunctionTable); } return jobFunctionTable.getFunction(functionDescriptor); } + private JobFunctionTable createJobFunctionTable(JobId jobId) { + ClassLoader classLoader; + if (Strings.isNullOrEmpty(jobResourcePath)) { + classLoader = getClass().getClassLoader(); + } else { + File resourceDir = new File(jobResourcePath + "/" + jobId.toString() + "/"); + Collection files = FileUtils.listFiles(resourceDir, + new RegexFileFilter(".*\\.jar"), DirectoryFileFilter.DIRECTORY); + files.add(resourceDir); + final List urlList = files.stream().map(file -> { + try { + return file.toURI().toURL(); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList()); + classLoader = new URLClassLoader(urlList.toArray(new URL[urlList.size()])); + LOGGER.debug("Resource loaded for job {} from path {}.", jobId, + resourceDir.getAbsolutePath()); + } + + return new JobFunctionTable(classLoader); + } + /** * Manages all functions that belong to one job. */ @@ -129,22 +145,27 @@ public class FunctionManager { /** * The job's corresponding class loader. */ - ClassLoader classLoader; + final ClassLoader classLoader; /** * Functions per class, per function name + type descriptor. */ - Map, RayFunction>> functions; + ConcurrentMap, RayFunction>> functions; JobFunctionTable(ClassLoader classLoader) { this.classLoader = classLoader; - this.functions = new HashMap<>(); + this.functions = new ConcurrentHashMap<>(); } RayFunction getFunction(JavaFunctionDescriptor descriptor) { Map, RayFunction> classFunctions = functions.get(descriptor.className); if (classFunctions == null) { - classFunctions = loadFunctionsForClass(descriptor.className); - functions.put(descriptor.className, classFunctions); + synchronized (this) { + classFunctions = functions.get(descriptor.className); + if (classFunctions == null) { + classFunctions = loadFunctionsForClass(descriptor.className); + functions.put(descriptor.className, classFunctions); + } + } } return classFunctions.get(ImmutablePair.of(descriptor.name, descriptor.typeDescriptor)); } diff --git a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java index f1bfab1f1..2f5fa8b69 100644 --- a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java +++ b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java @@ -308,6 +308,10 @@ public class RunManager { cmd.add("-Dray.logging.file.path=" + logFile); } + if (!Strings.isNullOrEmpty(rayConfig.jobResourcePath)) { + cmd.add("-Dray.job.resource-path=" + rayConfig.jobResourcePath); + } + // socket names cmd.add("-Dray.raylet.socket-name=" + rayConfig.rayletSocketName); cmd.add("-Dray.object-store.socket-name=" + rayConfig.objectStoreSocketName); diff --git a/java/test/src/main/java/org/ray/api/test/BaseMultiLanguageTest.java b/java/test/src/main/java/org/ray/api/test/BaseMultiLanguageTest.java index 294430492..d430adee7 100644 --- a/java/test/src/main/java/org/ray/api/test/BaseMultiLanguageTest.java +++ b/java/test/src/main/java/org/ray/api/test/BaseMultiLanguageTest.java @@ -1,18 +1,21 @@ package org.ray.api.test; -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import java.io.File; import java.lang.ProcessBuilder.Redirect; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + import org.ray.api.Ray; import org.ray.runtime.util.NetworkUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.SkipException; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -95,6 +98,7 @@ public abstract class BaseMultiLanguageTest { } // Connect to the cluster. + Assert.assertNull(Ray.internal()); System.setProperty("ray.redis.address", "127.0.0.1:6379"); System.setProperty("ray.object-store.socket-name", PLASMA_STORE_SOCKET_NAME); System.setProperty("ray.raylet.socket-name", RAYLET_SOCKET_NAME); diff --git a/java/test/src/main/java/org/ray/api/test/BaseTest.java b/java/test/src/main/java/org/ray/api/test/BaseTest.java index 34f44217d..39d049347 100644 --- a/java/test/src/main/java/org/ray/api/test/BaseTest.java +++ b/java/test/src/main/java/org/ray/api/test/BaseTest.java @@ -1,12 +1,15 @@ package org.ray.api.test; -import com.google.common.collect.ImmutableList; import java.io.File; import java.lang.reflect.Method; import java.util.List; + +import com.google.common.collect.ImmutableList; + import org.ray.api.Ray; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -18,6 +21,7 @@ public class BaseTest { @BeforeMethod(alwaysRun = true) public void setUpBase(Method method) { + Assert.assertNull(Ray.internal()); Ray.init(); // These files need to be deleted after each test case. filesToDelete = ImmutableList.of( diff --git a/java/test/src/main/java/org/ray/api/test/ClassLoaderTest.java b/java/test/src/main/java/org/ray/api/test/ClassLoaderTest.java new file mode 100644 index 000000000..6606d061d --- /dev/null +++ b/java/test/src/main/java/org/ray/api/test/ClassLoaderTest.java @@ -0,0 +1,149 @@ +package org.ray.api.test; + +import java.io.File; +import java.lang.reflect.Method; +import java.nio.file.Files; +import java.nio.file.Paths; + +import javax.tools.JavaCompiler; +import javax.tools.ToolProvider; + +import org.apache.commons.io.FileUtils; +import org.ray.api.Ray; +import org.ray.api.RayActor; +import org.ray.api.RayObject; +import org.ray.api.TestUtils; +import org.ray.api.options.ActorCreationOptions; +import org.ray.runtime.AbstractRayRuntime; +import org.ray.runtime.functionmanager.FunctionDescriptor; +import org.ray.runtime.functionmanager.JavaFunctionDescriptor; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class ClassLoaderTest extends BaseTest { + + private final String resourcePath = FileUtils.getTempDirectoryPath() + + "/ray_test/ClassLoaderTest"; + + @BeforeClass + public void setUp() { + // The potential issue of multiple `ClassLoader` instances for the same job on multi-threading + // scenario only occurs if the classes are loaded from the job resource path. + System.setProperty("ray.job.resource-path", resourcePath); + } + + @AfterClass + public void tearDown() { + System.clearProperty("ray.job.resource-path"); + } + + @Test + public void testClassLoaderInMultiThreading() throws Exception { + TestUtils.skipTestUnderSingleProcess(); + Assert.assertTrue(TestUtils.getRuntime().getRayConfig().numWorkersPerProcess > 1); + + final String jobResourcePath = resourcePath + "/" + Ray.getRuntimeContext().getCurrentJobId(); + File jobResourceDir = new File(jobResourcePath); + FileUtils.deleteQuietly(jobResourceDir); + jobResourceDir.mkdirs(); + jobResourceDir.deleteOnExit(); + + // In this test case the class is expected to be loaded from the job resource path, so we need + // to put the compiled class file into the job resource path and load it later. + String testJavaFile = "" + + "import java.lang.management.ManagementFactory;\n" + + "import java.lang.management.RuntimeMXBean;\n" + + "\n" + + "class ClassLoaderTester {\n" + + "\n" + + " static volatile int value;\n" + + "\n" + + " public int getPid() {\n" + + " RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();\n" + + " String name = runtime.getName();\n" + + " int index = name.indexOf(\"@\");\n" + + " if (index != -1) {\n" + + " return Integer.parseInt(name.substring(0, index));\n" + + " } else {\n" + + " throw new RuntimeException(\"parse pid error:\" + name);\n" + + " }\n" + + " }\n" + + "\n" + + " public int increase() throws InterruptedException {\n" + + " return increaseInternal();\n" + + " }\n" + + "\n" + + " public static synchronized int increaseInternal() throws InterruptedException {\n" + + " int oldValue = value;\n" + + " Thread.sleep(10 * 1000);\n" + + " value = oldValue + 1;\n" + + " return value;\n" + + " }\n" + + "\n" + + " public int getClassLoaderHashCode() {\n" + + " return this.getClass().getClassLoader().hashCode();\n" + + " }\n" + + "}"; + + // Write the demo java file to the job resource path. + String javaFilePath = jobResourcePath + "/ClassLoaderTester.java"; + Files.write(Paths.get(javaFilePath), testJavaFile.getBytes()); + + // Compile the java file. + JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); + int result = compiler.run(null, null, null, "-d", jobResourcePath, javaFilePath); + if (result != 0) { + throw new RuntimeException("Couldn't compile ClassLoaderTester.java."); + } + + FunctionDescriptor constructor = new JavaFunctionDescriptor("ClassLoaderTester", "", + "()V"); + RayActor actor1 = createActor(constructor); + FunctionDescriptor getPid = new JavaFunctionDescriptor("ClassLoaderTester", "getPid", "()I"); + int pid = this.callActorFunction(actor1, getPid, new Object[0], 1).get(); + RayActor actor2; + while (true) { + // Create another actor which share the same process of actor 1. + actor2 = createActor(constructor); + int actor2Pid = this.callActorFunction(actor2, getPid, new Object[0], 1).get(); + if (actor2Pid == pid) { + break; + } + } + + FunctionDescriptor getClassLoaderHashCode = new JavaFunctionDescriptor("ClassLoaderTester", + "getClassLoaderHashCode", + "()I"); + RayObject hashCode1 = callActorFunction(actor1, getClassLoaderHashCode, new Object[0], + 1); + RayObject hashCode2 = callActorFunction(actor2, getClassLoaderHashCode, new Object[0], + 1); + Assert.assertEquals(hashCode1.get(), hashCode2.get()); + + FunctionDescriptor increase = new JavaFunctionDescriptor("ClassLoaderTester", "increase", + "()I"); + RayObject value1 = callActorFunction(actor1, increase, new Object[0], 1); + RayObject value2 = callActorFunction(actor2, increase, new Object[0], 1); + Assert.assertNotEquals(value1.get(), value2.get()); + } + + private RayActor createActor(FunctionDescriptor functionDescriptor) + throws Exception { + Method createActorMethod = AbstractRayRuntime.class.getDeclaredMethod("createActorImpl", + FunctionDescriptor.class, Object[].class, ActorCreationOptions.class); + createActorMethod.setAccessible(true); + return (RayActor) createActorMethod + .invoke(TestUtils.getRuntime(), functionDescriptor, new Object[0], null); + } + + private RayObject callActorFunction(RayActor rayActor, + FunctionDescriptor functionDescriptor, Object[] args, int numReturns) throws Exception { + Method callActorFunctionMethod = AbstractRayRuntime.class.getDeclaredMethod("callActorFunction", + RayActor.class, FunctionDescriptor.class, Object[].class, int.class); + callActorFunctionMethod.setAccessible(true); + return (RayObject) callActorFunctionMethod + .invoke(TestUtils.getRuntime(), rayActor, functionDescriptor, args, numReturns); + } +}