From 5cfbfe5df633e629efc1f10d5e922d4629713a20 Mon Sep 17 00:00:00 2001 From: Wang Qing Date: Fri, 12 Apr 2019 22:44:47 +0800 Subject: [PATCH] [Java] Implement GcsClient (#4601) --- java/api/src/main/java/org/ray/api/Ray.java | 9 ++ .../main/java/org/ray/api/gcs/GcsClient.java | 15 ++ .../main/java/org/ray/api/gcs/NodeInfo.java | 36 +++++ .../java/org/ray/api/runtime/RayRuntime.java | 3 + .../org/ray/runtime/AbstractRayRuntime.java | 7 + .../org/ray/runtime/RayNativeRuntime.java | 101 +------------ .../org/ray/runtime/RuntimeContextImpl.java | 4 +- .../src/main/java/org/ray/runtime/Worker.java | 5 +- .../org/ray/runtime/gcs/GcsClientImpl.java | 140 ++++++++++++++++++ .../java/org/ray/runtime/gcs/RedisClient.java | 11 +- .../org/ray/runtime/util/UniqueIdUtil.java | 69 +++++++++ .../main/java/org/ray/api/test/BaseTest.java | 4 - .../java/org/ray/api/test/GcsClientTest.java | 42 ++++++ .../java/org/ray/api/test/PlasmaFreeTest.java | 4 +- .../ray/api/test/ResourcesManagementTest.java | 12 ++ .../java/org/ray/api/test/UniqueIdTest.java | 7 + 16 files changed, 364 insertions(+), 105 deletions(-) create mode 100644 java/api/src/main/java/org/ray/api/gcs/GcsClient.java create mode 100644 java/api/src/main/java/org/ray/api/gcs/NodeInfo.java create mode 100644 java/runtime/src/main/java/org/ray/runtime/gcs/GcsClientImpl.java create mode 100644 java/test/src/main/java/org/ray/api/test/GcsClientTest.java diff --git a/java/api/src/main/java/org/ray/api/Ray.java b/java/api/src/main/java/org/ray/api/Ray.java index 2e660e543..769345811 100644 --- a/java/api/src/main/java/org/ray/api/Ray.java +++ b/java/api/src/main/java/org/ray/api/Ray.java @@ -1,6 +1,8 @@ package org.ray.api; import java.util.List; + +import org.ray.api.gcs.GcsClient; import org.ray.api.id.UniqueId; import org.ray.api.runtime.RayRuntime; import org.ray.api.runtime.RayRuntimeFactory; @@ -127,4 +129,11 @@ public final class Ray extends RayCall { public static RuntimeContext getRuntimeContext() { return runtime.getRuntimeContext(); } + + /** + * Get gcs client. + */ + public static GcsClient getGcsClient() { + return runtime.getGcsClient(); + } } diff --git a/java/api/src/main/java/org/ray/api/gcs/GcsClient.java b/java/api/src/main/java/org/ray/api/gcs/GcsClient.java new file mode 100644 index 000000000..6f48417b2 --- /dev/null +++ b/java/api/src/main/java/org/ray/api/gcs/GcsClient.java @@ -0,0 +1,15 @@ +package org.ray.api.gcs; + +import java.util.List; + +/** + * The client used to interface with the GCS. + */ +public interface GcsClient { + + /** + * Get all node information in Ray cluster. + */ + List getAllNodeInfo(); + +} diff --git a/java/api/src/main/java/org/ray/api/gcs/NodeInfo.java b/java/api/src/main/java/org/ray/api/gcs/NodeInfo.java new file mode 100644 index 000000000..0c98c5be3 --- /dev/null +++ b/java/api/src/main/java/org/ray/api/gcs/NodeInfo.java @@ -0,0 +1,36 @@ +package org.ray.api.gcs; + +import java.util.Map; +import org.ray.api.id.UniqueId; + +/** + * A class that represents the information of a node. + */ +public class NodeInfo { + + public final UniqueId nodeId; + + public final String nodeAddress; + + public final boolean isAlive; + + public final Map resources; + + public NodeInfo(UniqueId nodeId, String nodeAddress, + boolean isAlive, Map resources) { + this.nodeId = nodeId; + this.nodeAddress = nodeAddress; + this.isAlive = isAlive; + this.resources = resources; + } + + public String toString() { + return "NodeInfo{" + + "nodeId='" + nodeId + '\'' + + ", nodeAddress='" + nodeAddress + "\'" + + ", isAlive=" + isAlive + + ", resources=" + resources + + "}"; + } + +} diff --git a/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java b/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java index 76c71cde4..be50349b6 100644 --- a/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java +++ b/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java @@ -7,6 +7,7 @@ import org.ray.api.RayPyActor; import org.ray.api.RuntimeContext; import org.ray.api.WaitResult; import org.ray.api.function.RayFunc; +import org.ray.api.gcs.GcsClient; import org.ray.api.id.UniqueId; import org.ray.api.options.ActorCreationOptions; import org.ray.api.options.CallOptions; @@ -131,4 +132,6 @@ public interface RayRuntime { */ RayPyActor createPyActor(String moduleName, String className, Object[] args, ActorCreationOptions options); + + GcsClient getGcsClient(); } diff --git a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java index e91d4df7b..00d24714f 100644 --- a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java @@ -16,6 +16,7 @@ import org.ray.api.RuntimeContext; import org.ray.api.WaitResult; import org.ray.api.exception.RayException; import org.ray.api.function.RayFunc; +import org.ray.api.gcs.GcsClient; import org.ray.api.id.UniqueId; import org.ray.api.options.ActorCreationOptions; import org.ray.api.options.BaseTaskOptions; @@ -67,6 +68,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { protected ObjectStoreProxy objectStoreProxy; protected FunctionManager functionManager; protected RuntimeContext runtimeContext; + protected GcsClient gcsClient; public AbstractRayRuntime(RayConfig rayConfig) { this.rayConfig = rayConfig; @@ -317,6 +319,11 @@ public abstract class AbstractRayRuntime implements RayRuntime { return actor; } + @Override + public GcsClient getGcsClient() { + return gcsClient; + } + /** * Create the task specification. * diff --git a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java index 0925574bc..d1845638a 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java @@ -6,27 +6,18 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Field; -import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.stream.Collectors; -import org.apache.commons.lang3.ArrayUtils; -import org.ray.api.Checkpointable.Checkpoint; -import org.ray.api.id.UniqueId; import org.ray.runtime.config.RayConfig; import org.ray.runtime.config.WorkerMode; +import org.ray.runtime.gcs.GcsClientImpl; import org.ray.runtime.gcs.RedisClient; -import org.ray.runtime.generated.ActorCheckpointIdData; -import org.ray.runtime.generated.TablePrefix; import org.ray.runtime.objectstore.ObjectStoreProxy; import org.ray.runtime.raylet.RayletClientImpl; import org.ray.runtime.runner.RunManager; -import org.ray.runtime.util.UniqueIdUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,14 +28,6 @@ public final class RayNativeRuntime extends AbstractRayRuntime { private static final Logger LOGGER = LoggerFactory.getLogger(RayNativeRuntime.class); - /** - * Redis client of the primary shard. - */ - private RedisClient redisClient; - /** - * Redis clients of all shards. - */ - private List redisClients; private RunManager manager = null; static { @@ -109,7 +92,7 @@ public final class RayNativeRuntime extends AbstractRayRuntime { manager.startRayProcesses(true); } - initRedisClients(); + gcsClient = new GcsClientImpl(rayConfig.getRedisAddress(), rayConfig.redisPassword); // TODO(qwang): Get object_store_socket_name and raylet_socket_name from Redis. objectStoreProxy = new ObjectStoreProxy(this, rayConfig.objectStoreSocketName); @@ -128,16 +111,6 @@ public final class RayNativeRuntime extends AbstractRayRuntime { rayConfig.objectStoreSocketName, rayConfig.rayletSocketName); } - private void initRedisClients() { - redisClient = new RedisClient(rayConfig.getRedisAddress(), rayConfig.redisPassword); - int numRedisShards = Integer.valueOf(redisClient.get("NumRedisShards", null)); - List addresses = redisClient.lrange("RedisShards", 0, -1); - Preconditions.checkState(numRedisShards == addresses.size()); - redisClients = addresses.stream().map(RedisClient::new) - .collect(Collectors.toList()); - redisClients.add(redisClient); - } - @Override public void shutdown() { if (null != manager) { @@ -145,7 +118,11 @@ public final class RayNativeRuntime extends AbstractRayRuntime { } } + /** + * Register this worker or driver to GCS. + */ private void registerWorker() { + RedisClient redisClient = new RedisClient(rayConfig.getRedisAddress(), rayConfig.redisPassword); Map workerInfo = new HashMap<>(); String workerId = new String(workerContext.getCurrentWorkerId().getBytes()); if (rayConfig.workerMode == WorkerMode.DRIVER) { @@ -165,70 +142,4 @@ public final class RayNativeRuntime extends AbstractRayRuntime { redisClient.hmset("Workers:" + workerId, workerInfo); } } - - /** - * Get the available checkpoints for the given actor ID, return a list sorted by checkpoint - * timestamp in descending order. - */ - List getCheckpointsForActor(UniqueId actorId) { - List checkpoints = new ArrayList<>(); - // TODO(hchen): implement the equivalent of Python's `GlobalState`, to avoid looping over - // all redis shards.. - String prefix = TablePrefix.name(TablePrefix.ACTOR_CHECKPOINT_ID); - byte[] key = ArrayUtils.addAll(prefix.getBytes(), actorId.getBytes()); - for (RedisClient client : redisClients) { - byte[] result = client.get(key, null); - if (result == null) { - continue; - } - ActorCheckpointIdData data = ActorCheckpointIdData - .getRootAsActorCheckpointIdData(ByteBuffer.wrap(result)); - - UniqueId[] checkpointIds - = UniqueIdUtil.getUniqueIdsFromByteBuffer(data.checkpointIdsAsByteBuffer()); - - for (int i = 0; i < checkpointIds.length; i++) { - checkpoints.add(new Checkpoint(checkpointIds[i], data.timestamps(i))); - } - break; - } - checkpoints.sort((x, y) -> Long.compare(y.timestamp, x.timestamp)); - return checkpoints; - } - - - /** - * Query whether the actor exists in Gcs. - */ - boolean actorExistsInGcs(UniqueId actorId) { - byte[] key = ArrayUtils.addAll("ACTOR".getBytes(), actorId.getBytes()); - - // TODO(qwang): refactor this with `GlobalState` after this issue - // getting finished. https://github.com/ray-project/ray/issues/3933 - for (RedisClient client : redisClients) { - if (client.exists(key)) { - return true; - } - } - - return false; - } - - /** - * Query whether the raylet task exists in Gcs. - */ - public boolean rayletTaskExistsInGcs(UniqueId taskId) { - byte[] key = ArrayUtils.addAll("RAYLET_TASK".getBytes(), taskId.getBytes()); - - // TODO(qwang): refactor this with `GlobalState` after this issue - // getting finished. https://github.com/ray-project/ray/issues/3933 - for (RedisClient client : redisClients) { - if (client.exists(key)) { - return true; - } - } - - return false; - } - } diff --git a/java/runtime/src/main/java/org/ray/runtime/RuntimeContextImpl.java b/java/runtime/src/main/java/org/ray/runtime/RuntimeContextImpl.java index b0ba67a4c..30f668a8c 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RuntimeContextImpl.java +++ b/java/runtime/src/main/java/org/ray/runtime/RuntimeContextImpl.java @@ -1,9 +1,11 @@ package org.ray.runtime; import com.google.common.base.Preconditions; +import org.ray.api.Ray; import org.ray.api.RuntimeContext; import org.ray.api.id.UniqueId; import org.ray.runtime.config.RunMode; +import org.ray.runtime.gcs.GcsClientImpl; import org.ray.runtime.task.TaskSpec; public class RuntimeContextImpl implements RuntimeContext { @@ -36,7 +38,7 @@ public class RuntimeContextImpl implements RuntimeContext { return false; } - return ((RayNativeRuntime) runtime).actorExistsInGcs(getCurrentActorId()); + return ((GcsClientImpl) Ray.getGcsClient()).actorExists(getCurrentActorId()); } @Override diff --git a/java/runtime/src/main/java/org/ray/runtime/Worker.java b/java/runtime/src/main/java/org/ray/runtime/Worker.java index c8f5aaa34..7cbf4d77d 100644 --- a/java/runtime/src/main/java/org/ray/runtime/Worker.java +++ b/java/runtime/src/main/java/org/ray/runtime/Worker.java @@ -10,6 +10,7 @@ import org.ray.api.exception.RayTaskException; import org.ray.api.id.UniqueId; import org.ray.runtime.config.RunMode; import org.ray.runtime.functionmanager.RayFunction; +import org.ray.runtime.gcs.GcsClientImpl; import org.ray.runtime.task.ArgumentsBuilder; import org.ray.runtime.task.TaskSpec; import org.slf4j.Logger; @@ -171,8 +172,8 @@ public class Worker { numTasksSinceLastCheckpoint = 0; lastCheckpointTimestamp = System.currentTimeMillis(); checkpointIds = new ArrayList<>(); - List availableCheckpoints = ((RayNativeRuntime) runtime) - .getCheckpointsForActor(actorId); + List availableCheckpoints + = ((GcsClientImpl)runtime.getGcsClient()).getCheckpointsForActor(actorId); if (availableCheckpoints.isEmpty()) { return; } diff --git a/java/runtime/src/main/java/org/ray/runtime/gcs/GcsClientImpl.java b/java/runtime/src/main/java/org/ray/runtime/gcs/GcsClientImpl.java new file mode 100644 index 000000000..34f11dc5c --- /dev/null +++ b/java/runtime/src/main/java/org/ray/runtime/gcs/GcsClientImpl.java @@ -0,0 +1,140 @@ +package org.ray.runtime.gcs; + +import com.google.common.base.Preconditions; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.commons.lang3.ArrayUtils; +import org.ray.api.Checkpointable.Checkpoint; +import org.ray.api.gcs.GcsClient; +import org.ray.api.gcs.NodeInfo; +import org.ray.api.id.UniqueId; +import org.ray.runtime.generated.ActorCheckpointIdData; +import org.ray.runtime.generated.ClientTableData; +import org.ray.runtime.generated.TablePrefix; +import org.ray.runtime.util.UniqueIdUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An implementation of GcsClient. + */ +public class GcsClientImpl implements GcsClient { + + private static Logger LOGGER = LoggerFactory.getLogger(GcsClientImpl.class); + + private RedisClient primary; + + private List shards; + + public GcsClientImpl(String redisAddress, String redisPassword) { + primary = new RedisClient(redisAddress, redisPassword); + int numShards = 0; + try { + numShards = Integer.valueOf(primary.get("NumRedisShards", null)); + Preconditions.checkState(numShards > 0, + String.format("Expected at least one Redis shards, found %d.", numShards)); + } catch (NumberFormatException e) { + throw new RuntimeException("Failed to get number of redis shards.", e); + } + + List shardAddresses = primary.lrange("RedisShards".getBytes(), 0, -1); + Preconditions.checkState(shardAddresses.size() == numShards); + shards = shardAddresses.stream().map((byte[] address) -> { + return new RedisClient(new String(address)); + }).collect(Collectors.toList()); + } + + @Override + public List getAllNodeInfo() { + final String prefix = TablePrefix.name(TablePrefix.CLIENT); + final byte[] key = ArrayUtils.addAll(prefix.getBytes(), UniqueId.NIL.getBytes()); + List results = primary.lrange(key, 0, -1); + + if (results == null) { + return new ArrayList<>(); + } + + // This map is used for deduplication of client entries. + Map clients = new HashMap<>(); + for (byte[] result : results) { + Preconditions.checkNotNull(result); + ClientTableData data = ClientTableData.getRootAsClientTableData(ByteBuffer.wrap(result)); + final UniqueId clientId = UniqueId.fromByteBuffer(data.clientIdAsByteBuffer()); + + if (data.isInsertion()) { + //Code path of node insertion. + Map resources = new HashMap<>(); + // Compute resources. + Preconditions.checkState( + data.resourcesTotalLabelLength() == data.resourcesTotalCapacityLength()); + for (int i = 0; i < data.resourcesTotalLabelLength(); i++) { + resources.put(data.resourcesTotalLabel(i), data.resourcesTotalCapacity(i)); + } + + NodeInfo nodeInfo = new NodeInfo( + clientId, data.nodeManagerAddress(), true, resources); + clients.put(clientId, nodeInfo); + } else { + // Code path of node deletion. + NodeInfo nodeInfo = new NodeInfo(clientId, clients.get(clientId).nodeAddress, + false, clients.get(clientId).resources); + clients.put(clientId, nodeInfo); + } + } + + return new ArrayList<>(clients.values()); + } + + /** + * If the actor exists in GCS. + */ + public boolean actorExists(UniqueId actorId) { + byte[] key = ArrayUtils.addAll( + TablePrefix.name(TablePrefix.ACTOR).getBytes(), actorId.getBytes()); + return primary.exists(key); + } + + /** + * Query whether the raylet task exists in Gcs. + */ + public boolean rayletTaskExistsInGcs(UniqueId taskId) { + byte[] key = ArrayUtils.addAll(TablePrefix.name(TablePrefix.RAYLET_TASK).getBytes(), + taskId.getBytes()); + RedisClient client = getShardClient(taskId); + return client.exists(key); + } + + /** + * Get the available checkpoints for the given actor ID. + */ + public List getCheckpointsForActor(UniqueId actorId) { + List checkpoints = new ArrayList<>(); + final String prefix = TablePrefix.name(TablePrefix.ACTOR_CHECKPOINT_ID); + final byte[] key = ArrayUtils.addAll(prefix.getBytes(), actorId.getBytes()); + RedisClient client = getShardClient(actorId); + + byte[] result = client.get(key); + if (result != null) { + ActorCheckpointIdData data = + ActorCheckpointIdData.getRootAsActorCheckpointIdData(ByteBuffer.wrap(result)); + UniqueId[] checkpointIds = UniqueIdUtil.getUniqueIdsFromByteBuffer( + data.checkpointIdsAsByteBuffer()); + + for (int i = 0; i < checkpointIds.length; i++) { + checkpoints.add(new Checkpoint(checkpointIds[i], data.timestamps(i))); + } + } + checkpoints.sort((x, y) -> Long.compare(y.timestamp, x.timestamp)); + return checkpoints; + } + + private RedisClient getShardClient(UniqueId key) { + return shards.get((int) Long.remainderUnsigned(UniqueIdUtil.murmurHashCode(key), + shards.size())); + } + +} diff --git a/java/runtime/src/main/java/org/ray/runtime/gcs/RedisClient.java b/java/runtime/src/main/java/org/ray/runtime/gcs/RedisClient.java index 62e82a9ec..e7c1d5473 100644 --- a/java/runtime/src/main/java/org/ray/runtime/gcs/RedisClient.java +++ b/java/runtime/src/main/java/org/ray/runtime/gcs/RedisClient.java @@ -70,6 +70,10 @@ public class RedisClient { } + public byte[] get(byte[] key) { + return get(key, null); + } + public byte[] get(byte[] key, byte[] field) { try (Jedis jedis = jedisPool.getResource()) { if (field == null) { @@ -80,7 +84,12 @@ public class RedisClient { } } - public List lrange(String key, long start, long end) { + /** + * Return the specified elements of the list stored at the specified key. + * + * @return Multi bulk reply, specifically a list of elements in the specified range. + */ + public List lrange(byte[] key, long start, long end) { try (Jedis jedis = jedisPool.getResource()) { return jedis.lrange(key, start, end); } diff --git a/java/runtime/src/main/java/org/ray/runtime/util/UniqueIdUtil.java b/java/runtime/src/main/java/org/ray/runtime/util/UniqueIdUtil.java index 009d5a410..fa8b51ffa 100644 --- a/java/runtime/src/main/java/org/ray/runtime/util/UniqueIdUtil.java +++ b/java/runtime/src/main/java/org/ray/runtime/util/UniqueIdUtil.java @@ -134,4 +134,73 @@ public class UniqueIdUtil { return ByteBuffer.wrap(bytesOfIds); } + + + /** + * Compute the murmur hash code of this ID. + */ + public static long murmurHashCode(UniqueId id) { + return murmurHash64A(id.getBytes(), UniqueId.LENGTH, 0); + } + + /** + * This method is the same as `hash()` method of `ID` class in ray/src/ray/id.h + */ + private static long murmurHash64A(byte[] data, int length, int seed) { + final long m = 0xc6a4a7935bd1e995L; + final int r = 47; + + long h = (seed & 0xFFFFFFFFL) ^ (length * m); + + int length8 = length / 8; + + for (int i = 0; i < length8; i++) { + final int i8 = i * 8; + long k = ((long)data[i8] & 0xff) + + (((long)data[i8 + 1] & 0xff) << 8) + + (((long)data[i8 + 2] & 0xff) << 16) + + (((long)data[i8 + 3] & 0xff) << 24) + + (((long)data[i8 + 4] & 0xff) << 32) + + (((long)data[i8 + 5] & 0xff) << 40) + + (((long)data[i8 + 6] & 0xff) << 48) + + (((long)data[i8 + 7] & 0xff) << 56); + + k *= m; + k ^= k >>> r; + k *= m; + + h ^= k; + h *= m; + } + + final int remaining = length % 8; + if (remaining >= 7) { + h ^= (long) (data[(length & ~7) + 6] & 0xff) << 48; + } + if (remaining >= 6) { + h ^= (long) (data[(length & ~7) + 5] & 0xff) << 40; + } + if (remaining >= 5) { + h ^= (long) (data[(length & ~7) + 4] & 0xff) << 32; + } + if (remaining >= 4) { + h ^= (long) (data[(length & ~7) + 3] & 0xff) << 24; + } + if (remaining >= 3) { + h ^= (long) (data[(length & ~7) + 2] & 0xff) << 16; + } + if (remaining >= 2) { + h ^= (long) (data[(length & ~7) + 1] & 0xff) << 8; + } + if (remaining >= 1) { + h ^= (long) (data[length & ~7] & 0xff); + h *= m; + } + + h ^= h >>> r; + h *= m; + h ^= h >>> r; + + return h; + } } diff --git a/java/test/src/main/java/org/ray/api/test/BaseTest.java b/java/test/src/main/java/org/ray/api/test/BaseTest.java index 366ee9a67..0170aa3a2 100644 --- a/java/test/src/main/java/org/ray/api/test/BaseTest.java +++ b/java/test/src/main/java/org/ray/api/test/BaseTest.java @@ -20,7 +20,6 @@ public class BaseTest { public void setUpBase(Method method) { LOGGER.info("===== Running test: " + method.getDeclaringClass().getName() + "." + method.getName()); - System.setProperty("ray.resources", "CPU:4,RES-A:4"); Ray.init(); // These files need to be deleted after each test case. filesToDelete = ImmutableList.of( @@ -38,9 +37,6 @@ public class BaseTest { for (File file : filesToDelete) { file.delete(); } - - // Unset system properties. - System.clearProperty("ray.resources"); } } diff --git a/java/test/src/main/java/org/ray/api/test/GcsClientTest.java b/java/test/src/main/java/org/ray/api/test/GcsClientTest.java new file mode 100644 index 000000000..0b2ce43e4 --- /dev/null +++ b/java/test/src/main/java/org/ray/api/test/GcsClientTest.java @@ -0,0 +1,42 @@ +package org.ray.api.test; + +import com.google.common.base.Preconditions; +import java.util.List; +import org.ray.api.Ray; +import org.ray.api.TestUtils; +import org.ray.api.gcs.GcsClient; +import org.ray.api.gcs.NodeInfo; +import org.ray.runtime.AbstractRayRuntime; +import org.ray.runtime.config.RayConfig; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class GcsClientTest extends BaseTest { + + @BeforeClass + public void setUp() { + System.setProperty("ray.resources", "A:8"); + } + + @AfterClass + public void tearDown() { + System.clearProperty("ray.resources"); + } + + @Test + public void testGetAllNodeInfo() { + TestUtils.skipTestUnderSingleProcess(); + RayConfig config = ((AbstractRayRuntime)Ray.internal()).getRayConfig(); + + Preconditions.checkNotNull(config); + GcsClient gcsClient = Ray.getGcsClient(); + List allNodeInfo = gcsClient.getAllNodeInfo(); + Assert.assertEquals(allNodeInfo.size(), 1); + Assert.assertEquals(allNodeInfo.get(0).nodeAddress, config.nodeIp); + Assert.assertTrue(allNodeInfo.get(0).isAlive); + Assert.assertEquals(allNodeInfo.get(0).resources.get("A"), 8.0); + } + +} diff --git a/java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java b/java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java index 4737740d8..9b122eafe 100644 --- a/java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java +++ b/java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java @@ -6,7 +6,7 @@ import org.ray.api.RayObject; import org.ray.api.TestUtils; import org.ray.api.annotation.RayRemote; import org.ray.runtime.AbstractRayRuntime; -import org.ray.runtime.RayNativeRuntime; +import org.ray.runtime.gcs.GcsClientImpl; import org.ray.runtime.util.UniqueIdUtil; import org.testng.Assert; import org.testng.annotations.Test; @@ -37,7 +37,7 @@ public class PlasmaFreeTest extends BaseTest { Assert.assertEquals("hello", helloId.get()); Ray.internal().free(ImmutableList.of(helloId.getId()), true, true); - final boolean result = TestUtils.waitForCondition(() -> !((RayNativeRuntime) Ray.internal()) + final boolean result = TestUtils.waitForCondition(() -> !((GcsClientImpl) Ray.getGcsClient()) .rayletTaskExistsInGcs(UniqueIdUtil.computeTaskId(helloId.getId())), 50); Assert.assertTrue(result); } diff --git a/java/test/src/main/java/org/ray/api/test/ResourcesManagementTest.java b/java/test/src/main/java/org/ray/api/test/ResourcesManagementTest.java index 114dfd396..6ca9d7c9d 100644 --- a/java/test/src/main/java/org/ray/api/test/ResourcesManagementTest.java +++ b/java/test/src/main/java/org/ray/api/test/ResourcesManagementTest.java @@ -11,6 +11,8 @@ import org.ray.api.annotation.RayRemote; import org.ray.api.options.ActorCreationOptions; import org.ray.api.options.CallOptions; import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; /** @@ -18,6 +20,16 @@ import org.testng.annotations.Test; */ public class ResourcesManagementTest extends BaseTest { + @BeforeClass + public void setUp() { + System.setProperty("ray.resources", "CPU:4,RES-A:4"); + } + + @AfterClass + public void tearDown() { + System.clearProperty("ray.resources"); + } + @RayRemote public static Integer echo(Integer number) { return number; diff --git a/java/test/src/main/java/org/ray/api/test/UniqueIdTest.java b/java/test/src/main/java/org/ray/api/test/UniqueIdTest.java index 5607e81cd..5b3d773db 100644 --- a/java/test/src/main/java/org/ray/api/test/UniqueIdTest.java +++ b/java/test/src/main/java/org/ray/api/test/UniqueIdTest.java @@ -95,4 +95,11 @@ public class UniqueIdTest { } } + @Test + void testMurmurHash() { + UniqueId id = UniqueId.fromHexString("3131313131313131313132323232323232323232"); + long remainder = Long.remainderUnsigned(UniqueIdUtil.murmurHashCode(id), 1000000000); + Assert.assertEquals(remainder, 787616861); + } + }