diff --git a/java/api/src/main/java/org/ray/api/options/ActorCreationOptions.java b/java/api/src/main/java/org/ray/api/options/ActorCreationOptions.java index 56901fe36..7bef5870c 100644 --- a/java/api/src/main/java/org/ray/api/options/ActorCreationOptions.java +++ b/java/api/src/main/java/org/ray/api/options/ActorCreationOptions.java @@ -18,8 +18,8 @@ public class ActorCreationOptions extends BaseTaskOptions { public final int maxConcurrency; private ActorCreationOptions(Map resources, int maxReconstructions, - boolean useDirectCall, String jvmOptions, int maxConcurrency) { - super(resources, useDirectCall); + String jvmOptions, int maxConcurrency) { + super(resources); this.maxReconstructions = maxReconstructions; this.jvmOptions = jvmOptions; this.maxConcurrency = maxConcurrency; @@ -32,7 +32,6 @@ public class ActorCreationOptions extends BaseTaskOptions { private Map resources = new HashMap<>(); private int maxReconstructions = NO_RECONSTRUCTION; - private boolean useDirectCall = DEFAULT_USE_DIRECT_CALL; private String jvmOptions = null; private int maxConcurrency = 1; @@ -46,14 +45,6 @@ public class ActorCreationOptions extends BaseTaskOptions { return this; } - // Since direct call is not fully supported yet (see issue #5559), - // users are not allowed to set the option to true. - // TODO (kfstorm): uncomment when direct call is ready. - // public Builder setUseDirectCall(boolean useDirectCall) { - // this.useDirectCall = useDirectCall; - // return this; - // } - public Builder setJvmOptions(String jvmOptions) { this.jvmOptions = jvmOptions; return this; @@ -61,9 +52,8 @@ public class ActorCreationOptions extends BaseTaskOptions { // The max number of concurrent calls to allow for this actor. // - // This only works with direct actor calls. The max concurrency defaults to 1 - // for threaded execution. Note that the execution order is not guaranteed - // when max_concurrency > 1. + // The max concurrency defaults to 1 for threaded execution. + // Note that the execution order is not guaranteed when max_concurrency > 1. public Builder setMaxConcurrency(int maxConcurrency) { if (maxConcurrency <= 0) { throw new IllegalArgumentException("maxConcurrency must be greater than 0."); @@ -75,7 +65,7 @@ public class ActorCreationOptions extends BaseTaskOptions { public ActorCreationOptions createActorCreationOptions() { return new ActorCreationOptions( - resources, maxReconstructions, useDirectCall, jvmOptions, maxConcurrency); + resources, maxReconstructions, jvmOptions, maxConcurrency); } } diff --git a/java/api/src/main/java/org/ray/api/options/BaseTaskOptions.java b/java/api/src/main/java/org/ray/api/options/BaseTaskOptions.java index 054e44391..f2a47cd70 100644 --- a/java/api/src/main/java/org/ray/api/options/BaseTaskOptions.java +++ b/java/api/src/main/java/org/ray/api/options/BaseTaskOptions.java @@ -7,21 +7,14 @@ import java.util.Map; * The options class for RayCall or ActorCreation. */ public abstract class BaseTaskOptions { - // DO NOT set this environment variable. It's only used for test purposes. - // Please use `setUseDirectCall` instead. - public static final boolean DEFAULT_USE_DIRECT_CALL = "1" - .equals(System.getenv("DEFAULT_USE_DIRECT_CALL")); public final Map resources; - public final boolean useDirectCall; - public BaseTaskOptions() { resources = new HashMap<>(); - useDirectCall = DEFAULT_USE_DIRECT_CALL; } - public BaseTaskOptions(Map resources, boolean useDirectCall) { + public BaseTaskOptions(Map resources) { for (Map.Entry entry : resources.entrySet()) { if (entry.getValue().compareTo(0.0) <= 0) { throw new IllegalArgumentException(String.format("Resource capacity should be " + @@ -29,7 +22,6 @@ public abstract class BaseTaskOptions { } } this.resources = resources; - this.useDirectCall = useDirectCall; } } diff --git a/java/api/src/main/java/org/ray/api/options/CallOptions.java b/java/api/src/main/java/org/ray/api/options/CallOptions.java index b4be033fd..1e5b61bf1 100644 --- a/java/api/src/main/java/org/ray/api/options/CallOptions.java +++ b/java/api/src/main/java/org/ray/api/options/CallOptions.java @@ -8,8 +8,8 @@ import java.util.Map; */ public class CallOptions extends BaseTaskOptions { - private CallOptions(Map resources, boolean useDirectCall) { - super(resources, useDirectCall); + private CallOptions(Map resources) { + super(resources); } /** @@ -18,23 +18,14 @@ public class CallOptions extends BaseTaskOptions { public static class Builder { private Map resources = new HashMap<>(); - private boolean useDirectCall = DEFAULT_USE_DIRECT_CALL; public Builder setResources(Map resources) { this.resources = resources; return this; } - // Since direct call is not fully supported yet (see issue #5559), - // users are not allowed to set the option to true. - // TODO (kfstorm): uncomment when direct call is ready. - // public Builder setUseDirectCall(boolean useDirectCall) { - // this.useDirectCall = useDirectCall; - // return this; - // } - public CallOptions createCallOptions() { - return new CallOptions(resources, useDirectCall); + return new CallOptions(resources); } } } diff --git a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java index 27b95d8c4..84990d5b4 100644 --- a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java @@ -17,7 +17,6 @@ import org.ray.api.options.ActorCreationOptions; import org.ray.api.options.CallOptions; import org.ray.api.runtime.RayRuntime; import org.ray.api.runtimecontext.RuntimeContext; -import org.ray.runtime.actor.NativeRayActor; import org.ray.runtime.config.RayConfig; import org.ray.runtime.context.RuntimeContextImpl; import org.ray.runtime.context.WorkerContext; @@ -166,7 +165,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { private RayObject callNormalFunction(FunctionDescriptor functionDescriptor, Object[] args, int numReturns, CallOptions options) { List functionArgs = ArgumentsBuilder - .wrap(args, functionDescriptor.getLanguage(), /*isDirectCall*/false); + .wrap(args, functionDescriptor.getLanguage()); List returnIds = taskSubmitter.submitTask(functionDescriptor, functionArgs, numReturns, options); Preconditions.checkState(returnIds.size() == numReturns && returnIds.size() <= 1); @@ -180,7 +179,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { private RayObject callActorFunction(RayActor rayActor, FunctionDescriptor functionDescriptor, Object[] args, int numReturns) { List functionArgs = ArgumentsBuilder - .wrap(args, functionDescriptor.getLanguage(), isDirectCall(rayActor)); + .wrap(args, functionDescriptor.getLanguage()); List returnIds = taskSubmitter.submitActorTask(rayActor, functionDescriptor, functionArgs, numReturns, null); Preconditions.checkState(returnIds.size() == numReturns && returnIds.size() <= 1); @@ -194,7 +193,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { private RayActor createActorImpl(FunctionDescriptor functionDescriptor, Object[] args, ActorCreationOptions options) { List functionArgs = ArgumentsBuilder - .wrap(args, functionDescriptor.getLanguage(), /*isDirectCall*/false); + .wrap(args, functionDescriptor.getLanguage()); if (functionDescriptor.getLanguage() != Language.JAVA && options != null) { Preconditions.checkState(Strings.isNullOrEmpty(options.jvmOptions)); } @@ -202,13 +201,6 @@ public abstract class AbstractRayRuntime implements RayRuntime { return actor; } - private boolean isDirectCall(RayActor rayActor) { - if (rayActor instanceof NativeRayActor) { - return ((NativeRayActor) rayActor).isDirectCallActor(); - } - return false; - } - public WorkerContext getWorkerContext() { return workerContext; } diff --git a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java index 5e1be7bb6..27af5f756 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java @@ -9,7 +9,6 @@ import org.apache.commons.io.FileUtils; import org.ray.api.RayActor; import org.ray.api.id.JobId; import org.ray.api.id.UniqueId; -import org.ray.runtime.actor.NativeRayActor; import org.ray.runtime.config.RayConfig; import org.ray.runtime.context.NativeWorkerContext; import org.ray.runtime.functionmanager.FunctionManager; @@ -137,9 +136,6 @@ public final class RayNativeRuntime extends AbstractRayRuntime { @Override public void killActor(RayActor actor) { - if (!((NativeRayActor) actor).isDirectCallActor()) { - throw new UnsupportedOperationException("Only direct call actors can be killed."); - } nativeKillActor(nativeCoreWorkerPointer, actor.getId().getBytes()); } diff --git a/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayActor.java b/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayActor.java index 426544a61..89a3b1b4d 100644 --- a/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayActor.java +++ b/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayActor.java @@ -64,10 +64,6 @@ public abstract class NativeRayActor implements RayActor, Externalizable { return Language.forNumber(nativeGetLanguage(nativeCoreWorkerPointer, actorId)); } - public boolean isDirectCallActor() { - return nativeIsDirectCallActor(nativeCoreWorkerPointer, actorId); - } - @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(toBytes()); @@ -119,9 +115,6 @@ public abstract class NativeRayActor implements RayActor, Externalizable { private static native int nativeGetLanguage( long nativeCoreWorkerPointer, byte[] actorId); - private static native boolean nativeIsDirectCallActor( - long nativeCoreWorkerPointer, byte[] actorId); - static native List nativeGetActorCreationTaskFunctionDescriptor( long nativeCoreWorkerPointer, byte[] actorId); diff --git a/java/runtime/src/main/java/org/ray/runtime/task/ArgumentsBuilder.java b/java/runtime/src/main/java/org/ray/runtime/task/ArgumentsBuilder.java index 978a5a56b..03486c835 100644 --- a/java/runtime/src/main/java/org/ray/runtime/task/ArgumentsBuilder.java +++ b/java/runtime/src/main/java/org/ray/runtime/task/ArgumentsBuilder.java @@ -2,12 +2,8 @@ package org.ray.runtime.task; import java.util.ArrayList; import java.util.List; -import org.ray.api.Ray; import org.ray.api.RayObject; import org.ray.api.id.ObjectId; -import org.ray.api.runtime.RayRuntime; -import org.ray.runtime.AbstractRayRuntime; -import org.ray.runtime.RayMultiWorkerNativeRuntime; import org.ray.runtime.generated.Common.Language; import org.ray.runtime.object.NativeRayObject; import org.ray.runtime.object.ObjectSerializer; @@ -32,27 +28,17 @@ public class ArgumentsBuilder { /** * Convert real function arguments to task spec arguments. */ - public static List wrap(Object[] args, Language language, boolean isDirectCall) { + public static List wrap(Object[] args, Language language) { List ret = new ArrayList<>(); for (Object arg : args) { ObjectId id = null; NativeRayObject value = null; if (arg instanceof RayObject) { - if (isDirectCall) { - throw new IllegalArgumentException( - "Passing RayObject to a direct call actor is not supported."); - } id = ((RayObject) arg).getId(); } else { value = ObjectSerializer.serialize(arg); - if (!isDirectCall && value.data.length > LARGEST_SIZE_PASS_BY_VALUE) { - RayRuntime runtime = Ray.internal(); - if (runtime instanceof RayMultiWorkerNativeRuntime) { - runtime = ((RayMultiWorkerNativeRuntime) runtime).getCurrentRuntime(); - } - id = ((AbstractRayRuntime) runtime).getObjectStore() - .putRaw(value); - value = null; + if (value.data.length > LARGEST_SIZE_PASS_BY_VALUE) { + // Do nothing since we are not support pass by reference in direct call. } } if (language == Language.PYTHON) { diff --git a/java/test.sh b/java/test.sh index d1f259a64..14f5e2a54 100755 --- a/java/test.sh +++ b/java/test.sh @@ -33,9 +33,6 @@ echo "Running tests under cluster mode." # bazel test //java:all_tests --action_env=ENABLE_MULTI_LANGUAGE_TESTS=1 --test_output="errors" || cluster_exit_code=$? ENABLE_MULTI_LANGUAGE_TESTS=1 run_testng java -cp $ROOT_DIR/../bazel-bin/java/all_tests_deploy.jar org.testng.TestNG -d /tmp/ray_java_test_output $ROOT_DIR/testng.xml -echo "Running tests under cluster mode with direct actor call turned on." -ENABLE_MULTI_LANGUAGE_TESTS=1 DEFAULT_USE_DIRECT_CALL=1 run_testng java -cp $ROOT_DIR/../bazel-bin/java/all_tests_deploy.jar org.testng.TestNG -d /tmp/ray_java_test_output $ROOT_DIR/testng.xml - echo "Running tests under single-process mode." # bazel test //java:all_tests --jvmopt="-Dray.run-mode=SINGLE_PROCESS" --test_output="errors" || single_exit_code=$? run_testng java -Dray.run-mode="SINGLE_PROCESS" -cp $ROOT_DIR/../bazel-bin/java/all_tests_deploy.jar org.testng.TestNG -d /tmp/ray_java_test_output $ROOT_DIR/testng.xml diff --git a/java/test/src/main/java/org/ray/api/TestUtils.java b/java/test/src/main/java/org/ray/api/TestUtils.java index d1cb8d95b..cd7151fcd 100644 --- a/java/test/src/main/java/org/ray/api/TestUtils.java +++ b/java/test/src/main/java/org/ray/api/TestUtils.java @@ -3,7 +3,6 @@ package org.ray.api; import com.google.common.base.Preconditions; import java.io.Serializable; import java.util.function.Supplier; -import org.ray.api.options.ActorCreationOptions; import org.ray.api.runtime.RayRuntime; import org.ray.runtime.AbstractRayRuntime; import org.ray.runtime.RayMultiWorkerNativeRuntime; @@ -20,8 +19,12 @@ public class TestUtils { private static final int WAIT_INTERVAL_MS = 5; + public static boolean isSingleProcessMode() { + return getRuntime().getRayConfig().runMode == RunMode.SINGLE_PROCESS; + } + public static void skipTestUnderSingleProcess() { - if (getRuntime().getRayConfig().runMode == RunMode.SINGLE_PROCESS) { + if (isSingleProcessMode()) { throw new SkipException("This test doesn't work under single-process mode."); } } @@ -32,25 +35,6 @@ public class TestUtils { } } - public static boolean isDirectActorCallEnabled() { - return ActorCreationOptions.DEFAULT_USE_DIRECT_CALL; - } - - public static void skipTestIfDirectActorCallEnabled() { - skipTestIfDirectActorCallEnabled(true); - } - - private static void skipTestIfDirectActorCallEnabled(boolean enabled) { - if (enabled == ActorCreationOptions.DEFAULT_USE_DIRECT_CALL) { - throw new SkipException(String.format("This test doesn't work when direct actor call is %s.", - enabled ? "enabled" : "disabled")); - } - } - - public static void skipTestIfDirectActorCallDisabled() { - skipTestIfDirectActorCallEnabled(false); - } - /** * Wait until the given condition is met. * diff --git a/java/test/src/main/java/org/ray/api/test/ActorConcurrentCallTest.java b/java/test/src/main/java/org/ray/api/test/ActorConcurrentCallTest.java index 76251e7a2..c6a3ee343 100644 --- a/java/test/src/main/java/org/ray/api/test/ActorConcurrentCallTest.java +++ b/java/test/src/main/java/org/ray/api/test/ActorConcurrentCallTest.java @@ -30,7 +30,7 @@ public class ActorConcurrentCallTest extends BaseTest { } public void testConcurrentCall() { - TestUtils.skipTestIfDirectActorCallDisabled(); + TestUtils.skipTestUnderSingleProcess(); ActorCreationOptions op = new ActorCreationOptions.Builder() .setMaxConcurrency(3) diff --git a/java/test/src/main/java/org/ray/api/test/ActorReconstructionTest.java b/java/test/src/main/java/org/ray/api/test/ActorReconstructionTest.java index e645cd1bd..1cea5b5f4 100644 --- a/java/test/src/main/java/org/ray/api/test/ActorReconstructionTest.java +++ b/java/test/src/main/java/org/ray/api/test/ActorReconstructionTest.java @@ -61,9 +61,8 @@ public class ActorReconstructionTest extends BaseTest { // Wait for the actor to be killed. TimeUnit.SECONDS.sleep(1); - // Try calling increase on this actor again and check the value is now 4. int value = actor.call(Counter::increase).get(); - Assert.assertEquals(value, options.useDirectCall ? 1 : 4); + Assert.assertEquals(value, 1); Assert.assertTrue(actor.call(Counter::wasCurrentActorReconstructed).get()); diff --git a/java/test/src/main/java/org/ray/api/test/ActorTest.java b/java/test/src/main/java/org/ray/api/test/ActorTest.java index f0af00401..baaffdf01 100644 --- a/java/test/src/main/java/org/ray/api/test/ActorTest.java +++ b/java/test/src/main/java/org/ray/api/test/ActorTest.java @@ -59,16 +59,12 @@ public class ActorTest extends BaseTest { } /** - * Test getting a direct object (an object that is returned by a direct-call task) twice from the - * object store. + * Test getting an object twice from the local memory store. * - * Direct objects are stored in core worker's local memory. And it will be removed after the first + * Objects are stored in core worker's local memory. And it will be removed after the first * get. To enable getting it twice, we cache the object in `RayObjectImpl`. - * - * NOTE(hchen): this test will run for non-direct actors as well, which doesn't have the above - * issue and should also succeed. */ - public void testGetDirectObjectTwice() { + public void testGetObjectTwice() { RayActor actor = Ray.createActor(Counter::new, 1); RayObject result = actor.call(Counter::getValue); Assert.assertEquals(result.get(), Integer.valueOf(1)); @@ -122,11 +118,12 @@ public class ActorTest extends BaseTest { .get()); } + // TODO(qwang): Will re-enable this test case once ref counting is supported in Java. + @Test(enabled = false) public void testUnreconstructableActorObject() throws InterruptedException { TestUtils.skipTestUnderSingleProcess(); + // The UnreconstructableException is created by raylet. - // TODO (kfstorm): This should be supported by direct actor call. - TestUtils.skipTestIfDirectActorCallEnabled(); RayActor counter = Ray.createActor(Counter::new, 100); // Call an actor method. RayObject value = counter.call(Counter::getValue); diff --git a/java/test/src/main/java/org/ray/api/test/KillActorTest.java b/java/test/src/main/java/org/ray/api/test/KillActorTest.java index 3203a7d59..8af24174b 100644 --- a/java/test/src/main/java/org/ray/api/test/KillActorTest.java +++ b/java/test/src/main/java/org/ray/api/test/KillActorTest.java @@ -27,7 +27,6 @@ public class KillActorTest extends BaseTest { public void testKillActor() { TestUtils.skipTestUnderSingleProcess(); - TestUtils.skipTestIfDirectActorCallDisabled(); RayActor actor = Ray.createActor(HangActor::new); Assert.assertTrue(actor.call(HangActor::alive).get()); RayObject result = actor.call(HangActor::hang); diff --git a/java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java b/java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java index 9e2bf3dde..693901fc1 100644 --- a/java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java +++ b/java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java @@ -25,11 +25,11 @@ public class PlasmaFreeTest extends BaseTest { final boolean result = TestUtils.waitForCondition(() -> !TestUtils.getRuntime().getObjectStore() .wait(ImmutableList.of(helloId.getId()), 1, 0).get(0), 50); - if (TestUtils.isDirectActorCallEnabled()) { - // Direct call will not delete object from im-memory store. - Assert.assertFalse(result); - } else { + if (TestUtils.isSingleProcessMode()) { Assert.assertTrue(result); + } else { + // The object will not be deleted under cluster mode. + Assert.assertFalse(result); } } diff --git a/src/ray/core_worker/lib/java/jni_init.cc b/src/ray/core_worker/lib/java/jni_init.cc index 0f31cb5e2..38b2765cf 100644 --- a/src/ray/core_worker/lib/java/jni_init.cc +++ b/src/ray/core_worker/lib/java/jni_init.cc @@ -64,8 +64,6 @@ jfieldID java_function_arg_value; jclass java_base_task_options_class; jfieldID java_base_task_options_resources; -jfieldID java_base_task_options_use_direct_call; -jfieldID java_base_task_options_default_use_direct_call; jclass java_actor_creation_options_class; jfieldID java_actor_creation_options_max_reconstructions; @@ -169,10 +167,6 @@ jint JNI_OnLoad(JavaVM *vm, void *reserved) { java_base_task_options_class = LoadClass(env, "org/ray/api/options/BaseTaskOptions"); java_base_task_options_resources = env->GetFieldID(java_base_task_options_class, "resources", "Ljava/util/Map;"); - java_base_task_options_use_direct_call = - env->GetFieldID(java_base_task_options_class, "useDirectCall", "Z"); - java_base_task_options_default_use_direct_call = - env->GetStaticFieldID(java_base_task_options_class, "DEFAULT_USE_DIRECT_CALL", "Z"); java_actor_creation_options_class = LoadClass(env, "org/ray/api/options/ActorCreationOptions"); diff --git a/src/ray/core_worker/lib/java/jni_utils.h b/src/ray/core_worker/lib/java/jni_utils.h index 6b210f324..02d733019 100644 --- a/src/ray/core_worker/lib/java/jni_utils.h +++ b/src/ray/core_worker/lib/java/jni_utils.h @@ -108,10 +108,6 @@ extern jfieldID java_function_arg_value; extern jclass java_base_task_options_class; /// resources field of BaseTaskOptions class extern jfieldID java_base_task_options_resources; -/// useDirectCall field of BaseTaskOptions class -extern jfieldID java_base_task_options_use_direct_call; -/// DEFAULT_USE_DIRECT_CALL field of BaseTaskOptions class -extern jfieldID java_base_task_options_default_use_direct_call; /// ActorCreationOptions class extern jclass java_actor_creation_options_class; diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.cc b/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.cc index b9b36c557..9d631ebf3 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.cc @@ -37,17 +37,6 @@ JNIEXPORT jint JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeGetLangua return native_actor_handle->ActorLanguage(); } -JNIEXPORT jboolean JNICALL -Java_org_ray_runtime_actor_NativeRayActor_nativeIsDirectCallActor( - JNIEnv *env, jclass o, jlong nativeCoreWorkerPointer, jbyteArray actorId) { - auto actor_id = JavaByteArrayToId(env, actorId); - ray::ActorHandle *native_actor_handle = nullptr; - auto status = GetCoreWorker(nativeCoreWorkerPointer) - .GetActorHandle(actor_id, &native_actor_handle); - THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, false); - return native_actor_handle->IsDirectCallActor(); -} - JNIEXPORT jobject JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeGetActorCreationTaskFunctionDescriptor( JNIEnv *env, jclass o, jlong nativeCoreWorkerPointer, jbyteArray actorId) { diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.h b/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.h index ac417ce0a..26260b5a9 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.h +++ b/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.h @@ -29,15 +29,6 @@ extern "C" { JNIEXPORT jint JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeGetLanguage( JNIEnv *, jclass, jlong, jbyteArray); -/* - * Class: org_ray_runtime_actor_NativeRayActor - * Method: nativeIsDirectCallActor - * Signature: (J[B)Z - */ -JNIEXPORT jboolean JNICALL -Java_org_ray_runtime_actor_NativeRayActor_nativeIsDirectCallActor(JNIEnv *, jclass, jlong, - jbyteArray); - /* * Class: org_ray_runtime_actor_NativeRayActor * Method: nativeGetActorCreationTaskFunctionDescriptor diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.cc index c2f12b530..6c248d082 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.cc @@ -90,34 +90,25 @@ inline std::unordered_map ToResources(JNIEnv *env, inline ray::TaskOptions ToTaskOptions(JNIEnv *env, jint numReturns, jobject callOptions) { std::unordered_map resources; - bool use_direct_call; if (callOptions) { jobject java_resources = env->GetObjectField(callOptions, java_base_task_options_resources); resources = ToResources(env, java_resources); - use_direct_call = - env->GetBooleanField(callOptions, java_base_task_options_use_direct_call); - } else { - use_direct_call = env->GetStaticBooleanField( - java_base_task_options_class, java_base_task_options_default_use_direct_call); } - ray::TaskOptions task_options{numReturns, use_direct_call, resources}; + ray::TaskOptions task_options{numReturns, /*use_direct_call=*/true, resources}; return task_options; } inline ray::ActorCreationOptions ToActorCreationOptions(JNIEnv *env, jobject actorCreationOptions) { uint64_t max_reconstructions = 0; - bool use_direct_call; std::unordered_map resources; std::vector dynamic_worker_options; uint64_t max_concurrency = 1; if (actorCreationOptions) { max_reconstructions = static_cast(env->GetIntField( actorCreationOptions, java_actor_creation_options_max_reconstructions)); - use_direct_call = env->GetBooleanField(actorCreationOptions, - java_base_task_options_use_direct_call); jobject java_resources = env->GetObjectField(actorCreationOptions, java_base_task_options_resources); resources = ToResources(env, java_resources); @@ -129,14 +120,11 @@ inline ray::ActorCreationOptions ToActorCreationOptions(JNIEnv *env, } max_concurrency = static_cast(env->GetIntField( actorCreationOptions, java_actor_creation_options_max_concurrency)); - } else { - use_direct_call = env->GetStaticBooleanField( - java_base_task_options_class, java_base_task_options_default_use_direct_call); } ray::ActorCreationOptions actor_creation_options{ static_cast(max_reconstructions), - use_direct_call, + /*use_direct_call=*/true, static_cast(max_concurrency), resources, resources, diff --git a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/streamingqueue/Worker.java b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/streamingqueue/Worker.java index 16cb4cd52..a852a6fd5 100644 --- a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/streamingqueue/Worker.java +++ b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/streamingqueue/Worker.java @@ -215,7 +215,6 @@ class WriterWorker extends Worker { LOGGER.info("WriterWorker actorId: {}", this.peerActor.getId()); } - LOGGER.info("Peer isDirectActorCall: {}", ((NativeRayActor) peer).isDirectCallActor()); int count = 3; while (count-- != 0) { peer.call(ReaderWorker::testRayCall).get();