[Java] Do not expose GcsClient to normal users (#4675)

This commit is contained in:
Qing Wang
2019-04-24 10:50:35 +08:00
committed by Hao Chen
parent c99e3caaca
commit 4dd628a837
13 changed files with 37 additions and 57 deletions
+1 -9
View File
@@ -1,11 +1,10 @@
package org.ray.api;
import java.util.List;
import org.ray.api.gcs.GcsClient;
import org.ray.api.id.UniqueId;
import org.ray.api.runtime.RayRuntime;
import org.ray.api.runtime.RayRuntimeFactory;
import org.ray.api.runtimecontext.RuntimeContext;
/**
* This class contains all public APIs of Ray.
@@ -130,11 +129,4 @@ public final class Ray extends RayCall {
public static RuntimeContext getRuntimeContext() {
return runtime.getRuntimeContext();
}
/**
* Get gcs client.
*/
public static GcsClient getGcsClient() {
return runtime.getGcsClient();
}
}
@@ -1,15 +0,0 @@
package org.ray.api.gcs;
import java.util.List;
/**
* The client used to interface with the GCS.
*/
public interface GcsClient {
/**
* Get all node information in Ray cluster.
*/
List<NodeInfo> getAllNodeInfo();
}
@@ -4,13 +4,12 @@ import java.util.List;
import org.ray.api.RayActor;
import org.ray.api.RayObject;
import org.ray.api.RayPyActor;
import org.ray.api.RuntimeContext;
import org.ray.api.WaitResult;
import org.ray.api.function.RayFunc;
import org.ray.api.gcs.GcsClient;
import org.ray.api.id.UniqueId;
import org.ray.api.options.ActorCreationOptions;
import org.ray.api.options.CallOptions;
import org.ray.api.runtimecontext.RuntimeContext;
/**
* Base interface of a Ray runtime.
@@ -132,6 +131,4 @@ public interface RayRuntime {
*/
RayPyActor createPyActor(String moduleName, String className, Object[] args,
ActorCreationOptions options);
GcsClient getGcsClient();
}
@@ -1,4 +1,4 @@
package org.ray.api.gcs;
package org.ray.api.runtimecontext;
import java.util.Map;
import org.ray.api.id.UniqueId;
@@ -1,5 +1,6 @@
package org.ray.api;
package org.ray.api.runtimecontext;
import java.util.List;
import org.ray.api.id.UniqueId;
/**
@@ -43,4 +44,9 @@ public interface RuntimeContext {
* Return true if Ray is running in single-process mode, false if Ray is running in cluster mode.
*/
boolean isSingleProcess();
/**
* Get all node information in Ray cluster.
*/
List<NodeInfo> getAllNodeInfo();
}
@@ -12,20 +12,20 @@ import java.util.stream.Collectors;
import org.ray.api.RayActor;
import org.ray.api.RayObject;
import org.ray.api.RayPyActor;
import org.ray.api.RuntimeContext;
import org.ray.api.WaitResult;
import org.ray.api.exception.RayException;
import org.ray.api.function.RayFunc;
import org.ray.api.gcs.GcsClient;
import org.ray.api.id.UniqueId;
import org.ray.api.options.ActorCreationOptions;
import org.ray.api.options.BaseTaskOptions;
import org.ray.api.options.CallOptions;
import org.ray.api.runtime.RayRuntime;
import org.ray.api.runtimecontext.RuntimeContext;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.functionmanager.FunctionDescriptor;
import org.ray.runtime.functionmanager.FunctionManager;
import org.ray.runtime.functionmanager.PyFunctionDescriptor;
import org.ray.runtime.gcs.GcsClient;
import org.ray.runtime.objectstore.ObjectStoreProxy;
import org.ray.runtime.objectstore.ObjectStoreProxy.GetResult;
import org.ray.runtime.raylet.RayletClient;
@@ -318,11 +318,6 @@ public abstract class AbstractRayRuntime implements RayRuntime {
return actor;
}
@Override
public GcsClient getGcsClient() {
return gcsClient;
}
/**
* Create the task specification.
*
@@ -423,4 +418,7 @@ public abstract class AbstractRayRuntime implements RayRuntime {
return runtimeContext;
}
public GcsClient getGcsClient() {
return gcsClient;
}
}
@@ -13,7 +13,7 @@ import java.util.HashMap;
import java.util.Map;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.config.WorkerMode;
import org.ray.runtime.gcs.GcsClientImpl;
import org.ray.runtime.gcs.GcsClient;
import org.ray.runtime.gcs.RedisClient;
import org.ray.runtime.objectstore.ObjectStoreProxy;
import org.ray.runtime.raylet.RayletClientImpl;
@@ -92,7 +92,7 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
manager.startRayProcesses(true);
}
gcsClient = new GcsClientImpl(rayConfig.getRedisAddress(), rayConfig.redisPassword);
gcsClient = new GcsClient(rayConfig.getRedisAddress(), rayConfig.redisPassword);
// TODO(qwang): Get object_store_socket_name and raylet_socket_name from Redis.
objectStoreProxy = new ObjectStoreProxy(this, rayConfig.objectStoreSocketName);
@@ -1,11 +1,11 @@
package org.ray.runtime;
import com.google.common.base.Preconditions;
import org.ray.api.Ray;
import org.ray.api.RuntimeContext;
import java.util.List;
import org.ray.api.id.UniqueId;
import org.ray.api.runtimecontext.NodeInfo;
import org.ray.api.runtimecontext.RuntimeContext;
import org.ray.runtime.config.RunMode;
import org.ray.runtime.gcs.GcsClientImpl;
import org.ray.runtime.task.TaskSpec;
public class RuntimeContextImpl implements RuntimeContext {
@@ -38,7 +38,7 @@ public class RuntimeContextImpl implements RuntimeContext {
return false;
}
return ((GcsClientImpl) Ray.getGcsClient()).actorExists(getCurrentActorId());
return runtime.getGcsClient().actorExists(getCurrentActorId());
}
@Override
@@ -56,4 +56,8 @@ public class RuntimeContextImpl implements RuntimeContext {
return RunMode.SINGLE_PROCESS == runtime.getRayConfig().runMode;
}
@Override
public List<NodeInfo> getAllNodeInfo() {
return runtime.getGcsClient().getAllNodeInfo();
}
}
@@ -10,7 +10,6 @@ import org.ray.api.exception.RayTaskException;
import org.ray.api.id.UniqueId;
import org.ray.runtime.config.RunMode;
import org.ray.runtime.functionmanager.RayFunction;
import org.ray.runtime.gcs.GcsClientImpl;
import org.ray.runtime.task.ArgumentsBuilder;
import org.ray.runtime.task.TaskSpec;
import org.slf4j.Logger;
@@ -173,7 +172,7 @@ public class Worker {
lastCheckpointTimestamp = System.currentTimeMillis();
checkpointIds = new ArrayList<>();
List<Checkpoint> availableCheckpoints
= ((GcsClientImpl)runtime.getGcsClient()).getCheckpointsForActor(actorId);
= runtime.getGcsClient().getCheckpointsForActor(actorId);
if (availableCheckpoints.isEmpty()) {
return;
}
@@ -9,9 +9,8 @@ import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.ray.api.Checkpointable.Checkpoint;
import org.ray.api.gcs.GcsClient;
import org.ray.api.gcs.NodeInfo;
import org.ray.api.id.UniqueId;
import org.ray.api.runtimecontext.NodeInfo;
import org.ray.runtime.generated.ActorCheckpointIdData;
import org.ray.runtime.generated.ClientTableData;
import org.ray.runtime.generated.TablePrefix;
@@ -22,15 +21,15 @@ import org.slf4j.LoggerFactory;
/**
* An implementation of GcsClient.
*/
public class GcsClientImpl implements GcsClient {
public class GcsClient {
private static Logger LOGGER = LoggerFactory.getLogger(GcsClientImpl.class);
private static Logger LOGGER = LoggerFactory.getLogger(GcsClient.class);
private RedisClient primary;
private List<RedisClient> shards;
public GcsClientImpl(String redisAddress, String redisPassword) {
public GcsClient(String redisAddress, String redisPassword) {
primary = new RedisClient(redisAddress, redisPassword);
int numShards = 0;
try {
@@ -48,7 +47,6 @@ public class GcsClientImpl implements GcsClient {
}).collect(Collectors.toList());
}
@Override
public List<NodeInfo> getAllNodeInfo() {
final String prefix = TablePrefix.name(TablePrefix.CLIENT);
final byte[] key = ArrayUtils.addAll(prefix.getBytes(), UniqueId.NIL.getBytes());
@@ -20,7 +20,8 @@ public class MicroBenchmarks {
}
final long duration = System.nanoTime() - start;
LOGGER.info(
"Benchmark \"{}\" finished, repeated {} times, total duration {} ms, average duration {} ns.",
"Benchmark \"{}\" finished, repeated {} times, total duration {} ms," +
" average duration {} ns.",
name, numRepeats, duration / 1_000_000, duration / numRepeats);
}
@@ -4,10 +4,10 @@ import com.google.common.base.Preconditions;
import java.util.List;
import org.ray.api.Ray;
import org.ray.api.TestUtils;
import org.ray.api.gcs.GcsClient;
import org.ray.api.gcs.NodeInfo;
import org.ray.api.runtimecontext.NodeInfo;
import org.ray.runtime.AbstractRayRuntime;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.gcs.GcsClient;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -31,7 +31,7 @@ public class GcsClientTest extends BaseTest {
RayConfig config = ((AbstractRayRuntime)Ray.internal()).getRayConfig();
Preconditions.checkNotNull(config);
GcsClient gcsClient = Ray.getGcsClient();
GcsClient gcsClient = ((AbstractRayRuntime)Ray.internal()).getGcsClient();
List<NodeInfo> allNodeInfo = gcsClient.getAllNodeInfo();
Assert.assertEquals(allNodeInfo.size(), 1);
Assert.assertEquals(allNodeInfo.get(0).nodeAddress, config.nodeIp);
@@ -6,7 +6,6 @@ import org.ray.api.RayObject;
import org.ray.api.TestUtils;
import org.ray.api.annotation.RayRemote;
import org.ray.runtime.AbstractRayRuntime;
import org.ray.runtime.gcs.GcsClientImpl;
import org.ray.runtime.util.UniqueIdUtil;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -37,7 +36,8 @@ public class PlasmaFreeTest extends BaseTest {
Assert.assertEquals("hello", helloId.get());
Ray.internal().free(ImmutableList.of(helloId.getId()), true, true);
final boolean result = TestUtils.waitForCondition(() -> !((GcsClientImpl) Ray.getGcsClient())
final boolean result = TestUtils.waitForCondition(
() -> !(((AbstractRayRuntime)Ray.internal()).getGcsClient())
.rayletTaskExistsInGcs(UniqueIdUtil.computeTaskId(helloId.getId())), 50);
Assert.assertTrue(result);
}