From f0465bc68c16b17a0abbb8881f3296556e7c4177 Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Thu, 7 Mar 2019 09:59:13 +0800 Subject: [PATCH] [Java] Refine tests and fix single-process mode (#4265) --- .travis.yml | 2 ++ .../java/org/ray/runtime/RayDevRuntime.java | 5 +++ .../org/ray/runtime/RuntimeContextImpl.java | 7 ++-- .../src/main/java/org/ray/runtime/Worker.java | 3 +- .../runtime/objectstore/MockObjectStore.java | 7 ++-- .../ray/runtime/raylet/MockRayletClient.java | 31 +++++++++------- .../src/main/resources/ray.default.conf | 2 +- java/test.sh | 35 +++++++------------ .../src/main/java/org/ray/api/TestUtils.java | 2 +- .../ray/api/test/ActorReconstructionTest.java | 10 ++---- .../main/java/org/ray/api/test/ActorTest.java | 2 ++ .../main/java/org/ray/api/test/BaseTest.java | 25 +++++-------- .../org/ray/api/test/ClientExceptionTest.java | 6 +--- .../java/org/ray/api/test/FailureTest.java | 10 +++--- .../api/test/MultiLanguageClusterTest.java | 26 ++++++++------ .../org/ray/api/test/MultiThreadingTest.java | 5 +++ .../org/ray/api/test/PlasmaStoreTest.java | 2 ++ .../org/ray/api/test/RedisPasswordTest.java | 10 +++--- .../ray/api/test/ResourcesManagementTest.java | 8 ++--- .../org/ray/api/test/RuntimeContextTest.java | 13 +++++-- .../java/org/ray/api/test/StressTest.java | 9 +++-- python/ray/services.py | 7 +++- python/setup.py | 6 +++- 23 files changed, 126 insertions(+), 107 deletions(-) diff --git a/.travis.yml b/.travis.yml index 6e48d406c..1d6d61de0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -26,9 +26,11 @@ matrix: - JDK='Oracle JDK 8' - PYTHON=3.5 PYTHONWARNINGS=ignore - RAY_USE_CMAKE=1 + - RAY_INSTALL_JAVA=1 install: - ./ci/travis/install-dependencies.sh - export PATH="$HOME/miniconda/bin:$PATH" + - ./ci/travis/install-ray.sh script: - ./java/test.sh diff --git a/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java index 7dffd3fd5..e5d7b20b1 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java @@ -28,4 +28,9 @@ public class RayDevRuntime extends AbstractRayRuntime { public MockObjectStore getObjectStore() { return store; } + + @Override + public Worker getWorker() { + return ((MockRayletClient) rayletClient).getCurrentWorker(); + } } diff --git a/java/runtime/src/main/java/org/ray/runtime/RuntimeContextImpl.java b/java/runtime/src/main/java/org/ray/runtime/RuntimeContextImpl.java index f0780cc2d..b0ba67a4c 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RuntimeContextImpl.java +++ b/java/runtime/src/main/java/org/ray/runtime/RuntimeContextImpl.java @@ -4,7 +4,6 @@ import com.google.common.base.Preconditions; import org.ray.api.RuntimeContext; import org.ray.api.id.UniqueId; import org.ray.runtime.config.RunMode; -import org.ray.runtime.config.WorkerMode; import org.ray.runtime.task.TaskSpec; public class RuntimeContextImpl implements RuntimeContext { @@ -22,8 +21,10 @@ public class RuntimeContextImpl implements RuntimeContext { @Override public UniqueId getCurrentActorId() { - Preconditions.checkState(runtime.rayConfig.workerMode == WorkerMode.WORKER); - return runtime.getWorker().getCurrentActorId(); + Worker worker = runtime.getWorker(); + Preconditions.checkState(worker != null && !worker.getCurrentActorId().isNil(), + "This method should only be called from an actor."); + return worker.getCurrentActorId(); } @Override diff --git a/java/runtime/src/main/java/org/ray/runtime/Worker.java b/java/runtime/src/main/java/org/ray/runtime/Worker.java index e6a069efc..ef319ea20 100644 --- a/java/runtime/src/main/java/org/ray/runtime/Worker.java +++ b/java/runtime/src/main/java/org/ray/runtime/Worker.java @@ -79,7 +79,6 @@ public class Worker { * Execute a task. */ public void execute(TaskSpec spec) { - LOGGER.info("Executing task {}", spec.taskId); LOGGER.debug("Executing task {}", spec); UniqueId returnId = spec.returnIds[0]; ClassLoader oldLoader = Thread.currentThread().getContextClassLoader(); @@ -123,7 +122,7 @@ public class Worker { maybeLoadCheckpoint(result, returnId); currentActor = result; } - LOGGER.info("Finished executing task {}", spec.taskId); + LOGGER.debug("Finished executing task {}", spec.taskId); } catch (Exception e) { LOGGER.error("Error executing task " + spec, e); if (!spec.isActorCreationTask()) { diff --git a/java/runtime/src/main/java/org/ray/runtime/objectstore/MockObjectStore.java b/java/runtime/src/main/java/org/ray/runtime/objectstore/MockObjectStore.java index 347084082..d81c566bf 100644 --- a/java/runtime/src/main/java/org/ray/runtime/objectstore/MockObjectStore.java +++ b/java/runtime/src/main/java/org/ray/runtime/objectstore/MockObjectStore.java @@ -95,11 +95,10 @@ public class MockObjectStore implements ObjectStoreLink { ArrayList rets = new ArrayList<>(); for (byte[] id : objectIds) { try { - Constructor constructor = ObjectStoreData.class.getConstructor( - byte[].class, byte[].class); + Constructor constructor = ObjectStoreData.class.getDeclaredConstructors()[0]; constructor.setAccessible(true); - rets.add(constructor.newInstance(metadata.get(new UniqueId(id)), - data.get(new UniqueId(id)))); + rets.add((ObjectStoreData) constructor.newInstance(data.get(new UniqueId(id)), + metadata.get(new UniqueId(id)))); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/java/runtime/src/main/java/org/ray/runtime/raylet/MockRayletClient.java b/java/runtime/src/main/java/org/ray/runtime/raylet/MockRayletClient.java index f16c8f9f8..e44fd1014 100644 --- a/java/runtime/src/main/java/org/ray/runtime/raylet/MockRayletClient.java +++ b/java/runtime/src/main/java/org/ray/runtime/raylet/MockRayletClient.java @@ -39,6 +39,7 @@ public class MockRayletClient implements RayletClient { private final ExecutorService exec; private final Deque idleWorkers; private final Map actorWorkers; + private final ThreadLocal currentWorker; public MockRayletClient(RayDevRuntime runtime, int numberThreads) { this.runtime = runtime; @@ -48,6 +49,7 @@ public class MockRayletClient implements RayletClient { exec = Executors.newFixedThreadPool(numberThreads); idleWorkers = new LinkedList<>(); actorWorkers = new HashMap<>(); + currentWorker = new ThreadLocal<>(); } public synchronized void onObjectPut(UniqueId id) { @@ -60,22 +62,28 @@ public class MockRayletClient implements RayletClient { } } + public Worker getCurrentWorker() { + return currentWorker.get(); + } + /** * Get a worker from the worker pool to run the given task. */ private Worker getWorker(TaskSpec task) { - if (task.isActorTask()) { - return actorWorkers.get(task.actorId); - } Worker worker; - if (idleWorkers.size() > 0) { - worker = idleWorkers.pop(); + if (task.isActorTask()) { + worker = actorWorkers.get(task.actorId); } else { - worker = new Worker(runtime); - } - if (task.isActorCreationTask()) { - actorWorkers.put(task.actorCreationId, worker); + if (idleWorkers.size() > 0) { + worker = idleWorkers.pop(); + } else { + worker = new Worker(runtime); + } + if (task.isActorCreationTask()) { + actorWorkers.put(task.actorCreationId, worker); + } } + currentWorker.set(worker); return worker; } @@ -83,6 +91,7 @@ public class MockRayletClient implements RayletClient { * Return the worker to the worker pool. */ private void returnWorker(Worker worker) { + currentWorker.remove(); idleWorkers.push(worker); } @@ -105,9 +114,7 @@ public class MockRayletClient implements RayletClient { new byte[]{}, new byte[]{}); } } finally { - if (!task.isActorCreationTask() && !task.isActorTask()) { - returnWorker(worker); - } + returnWorker(worker); } }); } else { diff --git a/java/runtime/src/main/resources/ray.default.conf b/java/runtime/src/main/resources/ray.default.conf index 81dab4d3d..5faeda7cf 100644 --- a/java/runtime/src/main/resources/ray.default.conf +++ b/java/runtime/src/main/resources/ray.default.conf @@ -100,7 +100,7 @@ ray { // ---------------------------- dev-runtime { // Number of threads that you process tasks - execution-parallelism: 5 + execution-parallelism: 10 } } diff --git a/java/test.sh b/java/test.sh index 1c6370d1f..b3e889371 100755 --- a/java/test.sh +++ b/java/test.sh @@ -2,36 +2,25 @@ # Cause the script to exit if a single command fails. set -e - # Show explicitly which commands are currently running. set -x ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) -$ROOT_DIR/../build.sh -l java pushd $ROOT_DIR/../java +echo "Compiling Java code." mvn clean install -Dmaven.test.skip -check_style=$(mvn checkstyle:check) -echo "${check_style}" -[[ ${check_style} =~ "BUILD FAILURE" ]] && exit 1 -# test raylet -mvn test | tee mvn_test -if [ `grep -c "BUILD FAILURE" mvn_test` -eq '0' ]; then - rm mvn_test - echo "Tests passed under CLUSTER mode!" -else - rm mvn_test - exit 1 -fi -# test raylet under SINGLE_PROCESS mode -mvn test -Dray.run-mode=SINGLE_PROCESS | tee dev_mvn_test -if [ `grep -c "BUILD FAILURE" dev_mvn_test` -eq '0' ]; then - rm dev_mvn_test - echo "Tests passed under SINGLE_PROCESS mode!" -else - rm dev_mvn_test - exit 1 -fi +echo "Checking code format." +mvn checkstyle:check + +echo "Running tests under cluster mode." +ENABLE_MULTI_LANGUAGE_TESTS=1 mvn test + +echo "Running tests under single-process mode." +mvn test -Dray.run-mode=SINGLE_PROCESS + +set +x +set +e popd diff --git a/java/test/src/main/java/org/ray/api/TestUtils.java b/java/test/src/main/java/org/ray/api/TestUtils.java index 18b7230ee..9b1ea915b 100644 --- a/java/test/src/main/java/org/ray/api/TestUtils.java +++ b/java/test/src/main/java/org/ray/api/TestUtils.java @@ -9,7 +9,7 @@ public class TestUtils { public static void skipTestUnderSingleProcess() { AbstractRayRuntime runtime = (AbstractRayRuntime)Ray.internal(); if (runtime.getRayConfig().runMode == RunMode.SINGLE_PROCESS) { - throw new SkipException("Skip case."); + throw new SkipException("This test doesn't work under single-process mode."); } } } diff --git a/java/test/src/main/java/org/ray/api/test/ActorReconstructionTest.java b/java/test/src/main/java/org/ray/api/test/ActorReconstructionTest.java index 12d7d1a8a..e575daa84 100644 --- a/java/test/src/main/java/org/ray/api/test/ActorReconstructionTest.java +++ b/java/test/src/main/java/org/ray/api/test/ActorReconstructionTest.java @@ -44,13 +44,9 @@ public class ActorReconstructionTest extends BaseTest { } } - @Override - public void beforeEachCase() { - TestUtils.skipTestUnderSingleProcess(); - } - @Test public void testActorReconstruction() throws InterruptedException, IOException { + TestUtils.skipTestUnderSingleProcess(); ActorCreationOptions options = new ActorCreationOptions(new HashMap<>(), 1); RayActor actor = Ray.createActor(Counter::new, options); // Call increase 3 times. @@ -130,6 +126,8 @@ public class ActorReconstructionTest extends BaseTest { @Test public void testActorCheckpointing() throws IOException, InterruptedException { + TestUtils.skipTestUnderSingleProcess(); + ActorCreationOptions options = new ActorCreationOptions(new HashMap<>(), 1); RayActor actor = Ray.createActor(CheckpointableCounter::new, options); // Call increase 3 times. @@ -138,8 +136,6 @@ public class ActorReconstructionTest extends BaseTest { } // Assert that the actor wasn't resumed from a checkpoint. Assert.assertFalse(Ray.call(CheckpointableCounter::wasResumedFromCheckpoint, actor).get()); - - // Kill the actor process. int pid = Ray.call(CheckpointableCounter::getPid, actor).get(); Runtime.getRuntime().exec("kill -9 " + pid); // Wait for the actor to be killed. diff --git a/java/test/src/main/java/org/ray/api/test/ActorTest.java b/java/test/src/main/java/org/ray/api/test/ActorTest.java index 96be700b9..876ab322d 100644 --- a/java/test/src/main/java/org/ray/api/test/ActorTest.java +++ b/java/test/src/main/java/org/ray/api/test/ActorTest.java @@ -5,6 +5,7 @@ import java.util.concurrent.TimeUnit; import org.ray.api.Ray; import org.ray.api.RayActor; import org.ray.api.RayObject; +import org.ray.api.TestUtils; import org.ray.api.annotation.RayRemote; import org.ray.api.exception.UnreconstructableException; import org.ray.api.id.UniqueId; @@ -90,6 +91,7 @@ public class ActorTest extends BaseTest { @Test public void testUnreconstructableActorObject() throws InterruptedException { + TestUtils.skipTestUnderSingleProcess(); RayActor counter = Ray.createActor(Counter::new, 100); // Call an actor method. RayObject value = Ray.call(Counter::getValue, counter); diff --git a/java/test/src/main/java/org/ray/api/test/BaseTest.java b/java/test/src/main/java/org/ray/api/test/BaseTest.java index e84e8fadf..b67a8f64c 100644 --- a/java/test/src/main/java/org/ray/api/test/BaseTest.java +++ b/java/test/src/main/java/org/ray/api/test/BaseTest.java @@ -1,27 +1,31 @@ package org.ray.api.test; +import java.lang.reflect.Method; import org.ray.api.Ray; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; public class BaseTest { + private static final Logger LOGGER = LoggerFactory.getLogger(BaseTest.class); + @BeforeMethod - public void setUp() { + public void setUpBase(Method method) { + LOGGER.info("===== Running test: " + + method.getDeclaringClass().getName() + "." + method.getName()); System.setProperty("ray.home", "../.."); System.setProperty("ray.resources", "CPU:4,RES-A:4"); - beforeInitRay(); Ray.init(); - beforeEachCase(); } @AfterMethod - public void tearDown() { + public void tearDownBase() { // TODO(qwang): This is double check to check that the socket file is removed actually. // We could not enable this until `systemInfo` enabled. //File rayletSocketFIle = new File(Ray.systemInfo().rayletSocketName()); Ray.shutdown(); - afterShutdownRay(); //remove raylet socket file //rayletSocketFIle.delete(); @@ -31,15 +35,4 @@ public class BaseTest { System.clearProperty("ray.resources"); } - protected void beforeInitRay() { - - } - - protected void afterShutdownRay() { - - } - - protected void beforeEachCase() { - - } } diff --git a/java/test/src/main/java/org/ray/api/test/ClientExceptionTest.java b/java/test/src/main/java/org/ray/api/test/ClientExceptionTest.java index 0c0433299..e9f53dddd 100644 --- a/java/test/src/main/java/org/ray/api/test/ClientExceptionTest.java +++ b/java/test/src/main/java/org/ray/api/test/ClientExceptionTest.java @@ -17,13 +17,9 @@ public class ClientExceptionTest extends BaseTest { private static final Logger LOGGER = LoggerFactory.getLogger(ClientExceptionTest.class); - @Override - public void beforeEachCase() { - TestUtils.skipTestUnderSingleProcess(); - } - @Test public void testWaitAndCrash() { + TestUtils.skipTestUnderSingleProcess(); UniqueId randomId = UniqueId.randomId(); RayObject notExisting = new RayObjectImpl(randomId); diff --git a/java/test/src/main/java/org/ray/api/test/FailureTest.java b/java/test/src/main/java/org/ray/api/test/FailureTest.java index f74860177..6d47a2fc9 100644 --- a/java/test/src/main/java/org/ray/api/test/FailureTest.java +++ b/java/test/src/main/java/org/ray/api/test/FailureTest.java @@ -55,30 +55,29 @@ public class FailureTest extends BaseTest { } } - @Override - public void beforeEachCase() { - TestUtils.skipTestUnderSingleProcess(); - } - @Test public void testNormalTaskFailure() { + TestUtils.skipTestUnderSingleProcess(); assertTaskFailedWithRayTaskException(Ray.call(FailureTest::badFunc)); } @Test public void testActorCreationFailure() { + TestUtils.skipTestUnderSingleProcess(); RayActor actor = Ray.createActor(BadActor::new, true); assertTaskFailedWithRayTaskException(Ray.call(BadActor::badMethod, actor)); } @Test public void testActorTaskFailure() { + TestUtils.skipTestUnderSingleProcess(); RayActor actor = Ray.createActor(BadActor::new, false); assertTaskFailedWithRayTaskException(Ray.call(BadActor::badMethod, actor)); } @Test public void testWorkerProcessDying() { + TestUtils.skipTestUnderSingleProcess(); try { Ray.call(FailureTest::badFunc2).get(); Assert.fail("This line shouldn't be reached."); @@ -90,6 +89,7 @@ public class FailureTest extends BaseTest { @Test public void testActorProcessDying() { + TestUtils.skipTestUnderSingleProcess(); RayActor actor = Ray.createActor(BadActor::new, false); try { Ray.call(BadActor::badMethod2, actor).get(); diff --git a/java/test/src/main/java/org/ray/api/test/MultiLanguageClusterTest.java b/java/test/src/main/java/org/ray/api/test/MultiLanguageClusterTest.java index b3a8e87b7..c81a14898 100644 --- a/java/test/src/main/java/org/ray/api/test/MultiLanguageClusterTest.java +++ b/java/test/src/main/java/org/ray/api/test/MultiLanguageClusterTest.java @@ -2,7 +2,7 @@ package org.ray.api.test; import com.google.common.collect.ImmutableList; import java.io.File; -import java.lang.ProcessBuilder.Redirect; +import java.lang.reflect.Method; import java.util.List; import java.util.concurrent.TimeUnit; import org.ray.api.Ray; @@ -33,13 +33,13 @@ public class MultiLanguageClusterTest { /** * Execute an external command. + * * @return Whether the command succeeded. */ private boolean executeCommand(List command, int waitTimeoutSeconds) { try { LOGGER.info("Executing command: {}", String.join(" ", command)); - Process process = new ProcessBuilder(command).redirectOutput(Redirect.INHERIT) - .redirectError(Redirect.INHERIT).start(); + Process process = new ProcessBuilder(command).inheritIO().start(); process.waitFor(waitTimeoutSeconds, TimeUnit.SECONDS); return process.exitValue() == 0; } catch (Exception e) { @@ -48,11 +48,12 @@ public class MultiLanguageClusterTest { } @BeforeMethod - public void setUp() { - // Check whether 'ray' command is installed. - boolean rayCommandExists = executeCommand(ImmutableList.of("which", "ray"), 5); - if (!rayCommandExists) { - throw new SkipException("Skipping test, because ray command doesn't exist."); + public void setUp(Method method) { + String testName = method.getName(); + if (!"1".equals(System.getenv("ENABLE_MULTI_LANGUAGE_TESTS"))) { + LOGGER.info("Skip " + testName + + " because env variable ENABLE_MULTI_LANGUAGE_TESTS isn't set"); + throw new SkipException("Skip test."); } // Delete existing socket files. @@ -64,15 +65,20 @@ public class MultiLanguageClusterTest { } // Start ray cluster. + String testDir = System.getProperty("user.dir"); + String workerOptions = String.format("-Dray.home=%s/../../", testDir); + workerOptions += + " -classpath " + String.format("%s/../../build/java/*:%s/target/*", testDir, testDir); final List startCommand = ImmutableList.of( "ray", "start", "--head", "--redis-port=6379", - "--include-java", String.format("--plasma-store-socket-name=%s", PLASMA_STORE_SOCKET_NAME), String.format("--raylet-socket-name=%s", RAYLET_SOCKET_NAME), - "--java-worker-options=-classpath ../../build/java/*:../../java/test/target/*" + "--load-code-from-local", + "--include-java", + "--java-worker-options=" + workerOptions ); if (!executeCommand(startCommand, 10)) { throw new RuntimeException("Couldn't start ray cluster."); diff --git a/java/test/src/main/java/org/ray/api/test/MultiThreadingTest.java b/java/test/src/main/java/org/ray/api/test/MultiThreadingTest.java index 6bbd39ffa..6289d1cd7 100644 --- a/java/test/src/main/java/org/ray/api/test/MultiThreadingTest.java +++ b/java/test/src/main/java/org/ray/api/test/MultiThreadingTest.java @@ -12,6 +12,7 @@ import java.util.concurrent.TimeUnit; import org.ray.api.Ray; import org.ray.api.RayActor; import org.ray.api.RayObject; +import org.ray.api.TestUtils; import org.ray.api.WaitResult; import org.ray.api.annotation.RayRemote; import org.testng.Assert; @@ -73,11 +74,15 @@ public class MultiThreadingTest extends BaseTest { @Test public void testInDriver() { + // TODO(hchen): Fix this test under single-process mode. + TestUtils.skipTestUnderSingleProcess(); testMultiThreading(); } @Test public void testInWorker() { + // Single-process mode doesn't have real workers. + TestUtils.skipTestUnderSingleProcess(); RayObject obj = Ray.call(MultiThreadingTest::testMultiThreading); Assert.assertEquals("ok", obj.get()); } diff --git a/java/test/src/main/java/org/ray/api/test/PlasmaStoreTest.java b/java/test/src/main/java/org/ray/api/test/PlasmaStoreTest.java index 726bad3da..7abc3f421 100644 --- a/java/test/src/main/java/org/ray/api/test/PlasmaStoreTest.java +++ b/java/test/src/main/java/org/ray/api/test/PlasmaStoreTest.java @@ -4,6 +4,7 @@ import org.apache.arrow.plasma.PlasmaClient; import org.apache.arrow.plasma.exceptions.DuplicateObjectException; import org.ray.api.Ray; +import org.ray.api.TestUtils; import org.ray.api.id.UniqueId; import org.ray.runtime.AbstractRayRuntime; import org.testng.Assert; @@ -13,6 +14,7 @@ public class PlasmaStoreTest extends BaseTest { @Test public void testPutWithDuplicateId() { + TestUtils.skipTestUnderSingleProcess(); UniqueId objectId = UniqueId.randomId(); AbstractRayRuntime runtime = (AbstractRayRuntime) Ray.internal(); PlasmaClient store = new PlasmaClient(runtime.getRayConfig().objectStoreSocketName, "", 0); diff --git a/java/test/src/main/java/org/ray/api/test/RedisPasswordTest.java b/java/test/src/main/java/org/ray/api/test/RedisPasswordTest.java index 210a4a045..114ef7498 100644 --- a/java/test/src/main/java/org/ray/api/test/RedisPasswordTest.java +++ b/java/test/src/main/java/org/ray/api/test/RedisPasswordTest.java @@ -4,18 +4,20 @@ import org.ray.api.Ray; import org.ray.api.RayObject; import org.ray.api.annotation.RayRemote; import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; public class RedisPasswordTest extends BaseTest { - @Override - public void beforeInitRay() { + @BeforeClass + public void setUp() { System.setProperty("ray.redis.head-password", "12345678"); System.setProperty("ray.redis.password", "12345678"); } - @Override - public void afterShutdownRay() { + @AfterClass + public void tearDown() { System.clearProperty("ray.redis.head-password"); System.clearProperty("ray.redis.password"); } diff --git a/java/test/src/main/java/org/ray/api/test/ResourcesManagementTest.java b/java/test/src/main/java/org/ray/api/test/ResourcesManagementTest.java index 5d021d0cb..114dfd396 100644 --- a/java/test/src/main/java/org/ray/api/test/ResourcesManagementTest.java +++ b/java/test/src/main/java/org/ray/api/test/ResourcesManagementTest.java @@ -25,18 +25,15 @@ public class ResourcesManagementTest extends BaseTest { @RayRemote public static class Echo { + public Integer echo(Integer number) { return number; } } - @Override - public void beforeEachCase() { - TestUtils.skipTestUnderSingleProcess(); - } - @Test public void testMethods() { + TestUtils.skipTestUnderSingleProcess(); CallOptions callOptions1 = new CallOptions(ImmutableMap.of("CPU", 4.0, "GPU", 0.0)); // This is a case that can satisfy required resources. @@ -57,6 +54,7 @@ public class ResourcesManagementTest extends BaseTest { @Test public void testActors() { + TestUtils.skipTestUnderSingleProcess(); ActorCreationOptions actorCreationOptions1 = new ActorCreationOptions(ImmutableMap.of("CPU", 2.0, "GPU", 0.0)); diff --git a/java/test/src/main/java/org/ray/api/test/RuntimeContextTest.java b/java/test/src/main/java/org/ray/api/test/RuntimeContextTest.java index b6fdca32f..512519bce 100644 --- a/java/test/src/main/java/org/ray/api/test/RuntimeContextTest.java +++ b/java/test/src/main/java/org/ray/api/test/RuntimeContextTest.java @@ -5,6 +5,8 @@ import org.ray.api.RayActor; import org.ray.api.annotation.RayRemote; import org.ray.api.id.UniqueId; import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; public class RuntimeContextTest extends BaseTest { @@ -14,13 +16,20 @@ public class RuntimeContextTest extends BaseTest { private static String RAYLET_SOCKET_NAME = "/tmp/ray/test/raylet_socket"; private static String OBJECT_STORE_SOCKET_NAME = "/tmp/ray/test/object_store_socket"; - @Override - public void beforeInitRay() { + @BeforeClass + public void setUp() { System.setProperty("ray.driver.id", DRIVER_ID.toString()); System.setProperty("ray.raylet.socket-name", RAYLET_SOCKET_NAME); System.setProperty("ray.object-store.socket-name", OBJECT_STORE_SOCKET_NAME); } + @AfterClass + public void tearDown() { + System.clearProperty("ray.driver.id"); + System.clearProperty("ray.raylet.socket-name"); + System.clearProperty("ray.object-store.socket-name"); + } + @Test public void testRuntimeContextInDriver() { Assert.assertEquals(DRIVER_ID, Ray.getRuntimeContext().getCurrentDriverId()); diff --git a/java/test/src/main/java/org/ray/api/test/StressTest.java b/java/test/src/main/java/org/ray/api/test/StressTest.java index 24bc467db..b5bf1356e 100644 --- a/java/test/src/main/java/org/ray/api/test/StressTest.java +++ b/java/test/src/main/java/org/ray/api/test/StressTest.java @@ -17,13 +17,9 @@ public class StressTest extends BaseTest { return x; } - @Override - public void beforeEachCase() { - TestUtils.skipTestUnderSingleProcess(); - } - @Test public void testSubmittingTasks() { + TestUtils.skipTestUnderSingleProcess(); for (int numIterations : ImmutableList.of(1, 10, 100, 1000)) { int numTasks = 1000 / numIterations; for (int i = 0; i < numIterations; i++) { @@ -40,6 +36,7 @@ public class StressTest extends BaseTest { @Test public void testDependency() { + TestUtils.skipTestUnderSingleProcess(); RayObject x = Ray.call(StressTest::echo, 1); for (int i = 0; i < 1000; i++) { x = Ray.call(StressTest::echo, x); @@ -77,6 +74,7 @@ public class StressTest extends BaseTest { @Test public void testSubmittingManyTasksToOneActor() { + TestUtils.skipTestUnderSingleProcess(); RayActor actor = Ray.createActor(Actor::new); List objectIds = new ArrayList<>(); for (int i = 0; i < 10; i++) { @@ -90,6 +88,7 @@ public class StressTest extends BaseTest { @Test public void testPuttingAndGettingManyObjects() { + TestUtils.skipTestUnderSingleProcess(); Integer objectToPut = 1; List> objects = new ArrayList<>(); for (int i = 0; i < 100_000; i++) { diff --git a/python/ray/services.py b/python/ray/services.py index 76be5bc9d..42b28581b 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1242,7 +1242,7 @@ def build_java_worker_command( """ assert java_worker_options is not None - command = "java {} ".format(java_worker_options) + command = "java ".format(java_worker_options) if redis_address is not None: command += "-Dray.redis.address={} ".format(redis_address) @@ -1259,6 +1259,11 @@ def build_java_worker_command( command += "-Dray.home={} ".format(RAY_HOME) # TODO(suquark): We should use temp_dir as the input of a java worker. command += "-Dray.log-dir={} ".format(os.path.join(temp_dir, "sockets")) + + if java_worker_options: + # Put `java_worker_options` in the last, so it can overwrite the + # above options. + command += java_worker_options + " " command += "org.ray.runtime.runner.worker.DefaultWorker" return command diff --git a/python/setup.py b/python/setup.py index 2177e0c6b..1b36b0ffa 100644 --- a/python/setup.py +++ b/python/setup.py @@ -80,7 +80,11 @@ class build_ext(_build_ext.build_ext): # version of Python to build pyarrow inside the build.sh script. Note # that certain flags will not be passed along such as --user or sudo. # TODO(rkn): Fix this. - subprocess.check_call(["../build.sh", "-p", sys.executable]) + command = ["../build.sh", "-p", sys.executable] + if os.getenv("RAY_INSTALL_JAVA") == "1": + # Also build binaries for Java if the above env variable exists. + command += ["-l", "python,java"] + subprocess.check_call(command) # We also need to install pyarrow along with Ray, so make sure that the # relevant non-Python pyarrow files get copied.