diff --git a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java index bfbf64d81..117153423 100644 --- a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java @@ -113,8 +113,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { int numObjectIds = objectIds.size(); // Do an initial fetch for remote objects. - List> fetchBatches = - splitIntoBatches(objectIds, FETCH_BATCH_SIZE); + List> fetchBatches = splitIntoBatches(objectIds); for (List batch : fetchBatches) { rayletClient.fetchOrReconstruct(batch, true, workerContext.getCurrentTaskId()); } @@ -139,8 +138,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { while (unreadys.size() > 0) { retryCounter++; List unreadyList = new ArrayList<>(unreadys.keySet()); - List> reconstructBatches = - splitIntoBatches(unreadyList, FETCH_BATCH_SIZE); + List> reconstructBatches = splitIntoBatches(unreadyList); for (List batch : reconstructBatches) { rayletClient.fetchOrReconstruct(batch, false, workerContext.getCurrentTaskId()); @@ -198,12 +196,12 @@ public abstract class AbstractRayRuntime implements RayRuntime { rayletClient.freePlasmaObjects(objectIds, localOnly); } - private List> splitIntoBatches(List objectIds, int batchSize) { + private List> splitIntoBatches(List objectIds) { List> batches = new ArrayList<>(); int objectsSize = objectIds.size(); - for (int i = 0; i < objectsSize; i += batchSize) { - int endIndex = i + batchSize; + for (int i = 0; i < objectsSize; i += FETCH_BATCH_SIZE) { + int endIndex = i + FETCH_BATCH_SIZE; List batchIds = (endIndex < objectsSize) ? objectIds.subList(i, endIndex) : objectIds.subList(i, objectsSize); @@ -256,17 +254,6 @@ public abstract class AbstractRayRuntime implements RayRuntime { return (RayActor) actor; } - /** - * Generate the return ids of a task. - */ - private UniqueId[] genReturnIds(UniqueId taskId, int numReturns) { - UniqueId[] ret = new UniqueId[numReturns]; - for (int i = 0; i < numReturns; i++) { - ret[i] = UniqueIdUtil.computeReturnId(taskId, i + 1); - } - return ret; - } - /** * Create the task specification. * @param func The target remote function. @@ -280,7 +267,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { UniqueId taskId = rayletClient.generateTaskId(workerContext.getCurrentDriverId(), workerContext.getCurrentTaskId(), workerContext.nextTaskIndex()); int numReturns = actor.getId().isNil() ? 1 : 2; - UniqueId[] returnIds = genReturnIds(taskId, numReturns); + UniqueId[] returnIds = UniqueIdUtil.genReturnIds(taskId, numReturns); UniqueId actorCreationId = UniqueId.NIL; if (isActorCreationTask) { diff --git a/java/runtime/src/main/java/org/ray/runtime/DefaultRayRuntimeFactory.java b/java/runtime/src/main/java/org/ray/runtime/DefaultRayRuntimeFactory.java index 366937b10..7223fa28e 100644 --- a/java/runtime/src/main/java/org/ray/runtime/DefaultRayRuntimeFactory.java +++ b/java/runtime/src/main/java/org/ray/runtime/DefaultRayRuntimeFactory.java @@ -1,13 +1,9 @@ package org.ray.runtime; -import com.google.common.base.Strings; -import java.lang.reflect.Field; -import java.util.stream.Collectors; import org.ray.api.runtime.RayRuntime; import org.ray.api.runtime.RayRuntimeFactory; import org.ray.runtime.config.RayConfig; import org.ray.runtime.config.RunMode; -import org.ray.runtime.util.logger.RayLog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,7 +16,6 @@ public class DefaultRayRuntimeFactory implements RayRuntimeFactory { @Override public RayRuntime createRayRuntime() { - RayLog.init(); RayConfig rayConfig = RayConfig.create(); try { AbstractRayRuntime runtime; diff --git a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java index 139abdf63..386202d38 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java @@ -31,7 +31,7 @@ public final class RayNativeRuntime extends AbstractRayRuntime { super(rayConfig); } - private void resetLibaryPath() { + private void resetLibraryPath() { String path = System.getProperty("java.library.path"); if (Strings.isNullOrEmpty(path)) { path = ""; @@ -60,7 +60,7 @@ public final class RayNativeRuntime extends AbstractRayRuntime { public void start() throws Exception { // Load native libraries. try { - resetLibaryPath(); + resetLibraryPath(); System.loadLibrary("raylet_library_java"); System.loadLibrary("plasma_java"); } catch (Exception e) { diff --git a/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java b/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java index b3df8d1cb..b804a481f 100644 --- a/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java +++ b/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java @@ -37,7 +37,7 @@ public class RayletClientImpl implements RayletClient { ); /** - * Point to c++'s local scheduler client. + * The pointer to c++'s local scheduler client. */ private long client = 0; diff --git a/java/runtime/src/main/java/org/ray/runtime/runner/worker/DefaultWorker.java b/java/runtime/src/main/java/org/ray/runtime/runner/worker/DefaultWorker.java index e67ec1967..6fd3ea0e7 100644 --- a/java/runtime/src/main/java/org/ray/runtime/runner/worker/DefaultWorker.java +++ b/java/runtime/src/main/java/org/ray/runtime/runner/worker/DefaultWorker.java @@ -16,14 +16,10 @@ public class DefaultWorker { try { System.setProperty("ray.worker.mode", "WORKER"); Ray.init(); - } catch (Exception e) { - LOGGER.error("Worker failed to start.", e); - } - LOGGER.info("Worker started."); - try { + LOGGER.info("Worker started."); ((AbstractRayRuntime)Ray.internal()).loop(); } catch (Exception e) { - LOGGER.error("Error occurred in worker.", e); + LOGGER.error("Failed to start worker.", e); } } } diff --git a/java/runtime/src/main/java/org/ray/runtime/util/ObjectUtil.java b/java/runtime/src/main/java/org/ray/runtime/util/ObjectUtil.java deleted file mode 100644 index 6b9df2993..000000000 --- a/java/runtime/src/main/java/org/ray/runtime/util/ObjectUtil.java +++ /dev/null @@ -1,25 +0,0 @@ -package org.ray.runtime.util; - -import java.lang.reflect.InvocationTargetException; - -public class ObjectUtil { - - public static T newObject(Class cls) { - try { - return cls.getConstructor().newInstance(); - } catch (InstantiationException | IllegalAccessException | NoSuchMethodException - | InvocationTargetException e) { - e.printStackTrace(); - return null; - } - } - - public static boolean[] toBooleanArray(Object[] vs) { - boolean[] nvs = new boolean[vs.length]; - for (int i = 0; i < vs.length; i++) { - nvs[i] = (boolean) vs[i]; - } - return nvs; - } - -} diff --git a/java/runtime/src/main/java/org/ray/runtime/util/UniqueIdUtil.java b/java/runtime/src/main/java/org/ray/runtime/util/UniqueIdUtil.java index d7b347945..021279924 100644 --- a/java/runtime/src/main/java/org/ray/runtime/util/UniqueIdUtil.java +++ b/java/runtime/src/main/java/org/ray/runtime/util/UniqueIdUtil.java @@ -71,6 +71,21 @@ public class UniqueIdUtil { return new UniqueId(taskId); } + /** + * Generate the return ids of a task. + * + * @param taskId The ID of the task that generates returnsIds. + * @param numReturns The number of returnIds. + * @return The Return Ids of this task. + */ + public static UniqueId[] genReturnIds(UniqueId taskId, int numReturns) { + UniqueId[] ret = new UniqueId[numReturns]; + for (int i = 0; i < numReturns; i++) { + ret[i] = UniqueIdUtil.computeReturnId(taskId, i + 1); + } + return ret; + } + public static byte[][] getIdBytes(List objectIds) { int size = objectIds.size(); byte[][] ids = new byte[size][]; diff --git a/java/runtime/src/main/java/org/ray/runtime/util/logger/RayLog.java b/java/runtime/src/main/java/org/ray/runtime/util/logger/RayLog.java deleted file mode 100644 index efc38d1db..000000000 --- a/java/runtime/src/main/java/org/ray/runtime/util/logger/RayLog.java +++ /dev/null @@ -1,27 +0,0 @@ -package org.ray.runtime.util.logger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * loggers in Ray. - * 1. core logger is used for internal Ray status logging. - * 2. rapp for ray applications logging. - */ -public class RayLog { - - /** - * for ray itself. - */ - public static Logger core; - - /** - * for ray app. - */ - public static Logger rapp; - - public static void init() { - core = LoggerFactory.getLogger("core"); - rapp = core; - } -} diff --git a/java/test/src/main/java/org/ray/api/benchmark/RayBenchmarkTest.java b/java/test/src/main/java/org/ray/api/benchmark/RayBenchmarkTest.java index ab73dd214..8e96b78fa 100644 --- a/java/test/src/main/java/org/ray/api/benchmark/RayBenchmarkTest.java +++ b/java/test/src/main/java/org/ray/api/benchmark/RayBenchmarkTest.java @@ -12,7 +12,6 @@ import org.ray.api.RayActor; import org.ray.api.RayObject; import org.ray.api.annotation.RayRemote; import org.ray.api.test.BaseTest; -import org.ray.runtime.util.logger.RayLog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/java/test/src/main/java/org/ray/api/test/RayMethodsTest.java b/java/test/src/main/java/org/ray/api/test/RayMethodsTest.java index a612ef7c2..8fb430bdc 100644 --- a/java/test/src/main/java/org/ray/api/test/RayMethodsTest.java +++ b/java/test/src/main/java/org/ray/api/test/RayMethodsTest.java @@ -8,7 +8,6 @@ import org.junit.Test; import org.ray.api.Ray; import org.ray.api.RayObject; import org.ray.api.WaitResult; -import org.ray.runtime.util.logger.RayLog; /** * Integration test for Ray.* @@ -30,8 +29,6 @@ public class RayMethodsTest extends BaseTest { double f1 = f1Id.get(); Object n1 = n1Id.get(); - RayLog.rapp.info("Strings: " + ss.get(0) + ss.get(1) + " int: " + i1 + " double: " + f1 - + " null: " + n1); Assert.assertEquals("Hello World!", ss.get(0) + ss.get(1)); Assert.assertEquals(1, i1); Assert.assertEquals(3.14, f1, Double.MIN_NORMAL);