[Java] Throw exception if Ray.init() is not called and users try to access ray API (#10497)

This commit is contained in:
chaokunyang
2020-09-05 10:09:19 +08:00
committed by GitHub
parent 8ee7c182f5
commit 7ee0fdba3d
6 changed files with 35 additions and 26 deletions
+25 -17
View File
@@ -49,7 +49,7 @@ public final class Ray extends RayCall {
*/
public static synchronized void shutdown() {
if (runtime != null) {
runtime.shutdown();
internal().shutdown();
runtime = null;
}
}
@@ -61,7 +61,7 @@ public final class Ray extends RayCall {
* @return A ObjectRef instance that represents the in-store object.
*/
public static <T> ObjectRef<T> put(T obj) {
return runtime.put(obj);
return internal().put(obj);
}
/**
@@ -71,7 +71,7 @@ public final class Ray extends RayCall {
* @return The Java object.
*/
public static <T> T get(ObjectRef<T> objectRef) {
return runtime.get(objectRef);
return internal().get(objectRef);
}
/**
@@ -81,7 +81,7 @@ public final class Ray extends RayCall {
* @return A list of Java objects.
*/
public static <T> List<T> get(List<ObjectRef<T>> objectList) {
return runtime.get(objectList);
return internal().get(objectList);
}
/**
@@ -95,7 +95,7 @@ public final class Ray extends RayCall {
*/
public static <T> WaitResult<T> wait(List<ObjectRef<T>> waitList, int numReturns,
int timeoutMs) {
return runtime.wait(waitList, numReturns, timeoutMs);
return internal().wait(waitList, numReturns, timeoutMs);
}
/**
@@ -107,7 +107,7 @@ public final class Ray extends RayCall {
* @return Two lists, one containing locally available objects, one containing the rest.
*/
public static <T> WaitResult<T> wait(List<ObjectRef<T>> waitList, int numReturns) {
return runtime.wait(waitList, numReturns, Integer.MAX_VALUE);
return internal().wait(waitList, numReturns, Integer.MAX_VALUE);
}
/**
@@ -118,7 +118,7 @@ public final class Ray extends RayCall {
* @return Two lists, one containing locally available objects, one containing the rest.
*/
public static <T> WaitResult<T> wait(List<ObjectRef<T>> waitList) {
return runtime.wait(waitList, waitList.size(), Integer.MAX_VALUE);
return internal().wait(waitList, waitList.size(), Integer.MAX_VALUE);
}
/**
@@ -132,7 +132,7 @@ public final class Ray extends RayCall {
* Optional.empty()
*/
public static <T extends BaseActorHandle> Optional<T> getActor(String name) {
return runtime.getActor(name, false);
return internal().getActor(name, false);
}
/**
@@ -146,7 +146,7 @@ public final class Ray extends RayCall {
* Optional.empty()
*/
public static <T extends BaseActorHandle> Optional<T> getGlobalActor(String name) {
return runtime.getActor(name, true);
return internal().getActor(name, true);
}
/**
@@ -156,7 +156,7 @@ public final class Ray extends RayCall {
* @return The async context.
*/
public static Object getAsyncContext() {
return runtime.getAsyncContext();
return internal().getAsyncContext();
}
/**
@@ -165,7 +165,7 @@ public final class Ray extends RayCall {
* @param asyncContext The async context to set.
*/
public static void setAsyncContext(Object asyncContext) {
runtime.setAsyncContext(asyncContext);
internal().setAsyncContext(asyncContext);
}
// TODO (kfstorm): add the `rollbackAsyncContext` API to allow rollbacking the async context of
@@ -181,7 +181,7 @@ public final class Ray extends RayCall {
* @return The wrapped runnable.
*/
public static Runnable wrapRunnable(Runnable runnable) {
return runtime.wrapRunnable(runnable);
return internal().wrapRunnable(runnable);
}
/**
@@ -192,13 +192,21 @@ public final class Ray extends RayCall {
* @return The wrapped callable.
*/
public static <T> Callable<T> wrapCallable(Callable<T> callable) {
return runtime.wrapCallable(callable);
return internal().wrapCallable(callable);
}
public static boolean isInitialized() {
return runtime != null;
}
/**
* Get the underlying runtime instance.
*/
public static RayRuntime internal() {
if (runtime == null) {
throw new IllegalStateException(
"Ray has not been started yet. You can start Ray with 'Ray.init()'");
}
return runtime;
}
@@ -207,21 +215,21 @@ public final class Ray extends RayCall {
* Set the resource for the specific node.
*/
public static void setResource(UniqueId nodeId, String resourceName, double capacity) {
runtime.setResource(resourceName, capacity, nodeId);
internal().setResource(resourceName, capacity, nodeId);
}
/**
* Set the resource for local node.
*/
public static void setResource(String resourceName, double capacity) {
runtime.setResource(resourceName, capacity, UniqueId.NIL);
internal().setResource(resourceName, capacity, UniqueId.NIL);
}
/**
* Get the runtime context.
*/
public static RuntimeContext getRuntimeContext() {
return runtime.getRuntimeContext();
return internal().getRuntimeContext();
}
/**
@@ -239,7 +247,7 @@ public final class Ray extends RayCall {
*/
public static PlacementGroup createPlacementGroup(List<Map<String, Double>> bundles,
PlacementStrategy strategy) {
return runtime.createPlacementGroup(bundles, strategy);
return internal().createPlacementGroup(bundles, strategy);
}
/**
@@ -40,7 +40,8 @@ public final class ObjectRefImpl<T> implements ObjectRef<T>, Externalizable {
addLocalReference();
}
public ObjectRefImpl() {}
public ObjectRefImpl() {
}
@Override
public synchronized T get() {
@@ -103,10 +104,10 @@ public final class ObjectRefImpl<T> implements ObjectRef<T>, Externalizable {
// unit tests). So if `workerId` is null, it means this method has been invoked.
if (!removed.getAndSet(true)) {
REFERENCES.remove(this);
RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal();
// It's possible that GC is executed after the runtime is shutdown.
if (runtime != null) {
runtime.getObjectStore().removeLocalReference(workerId, objectId);
if (Ray.isInitialized()) {
((RayRuntimeInternal) (Ray.internal())).getObjectStore()
.removeLocalReference(workerId, objectId);
}
}
}
@@ -91,7 +91,7 @@ public abstract class BaseMultiLanguageTest {
}
// Connect to the cluster.
Assert.assertNull(Ray.internal());
Assert.assertFalse(Ray.isInitialized());
System.setProperty("ray.redis.address", "127.0.0.1:6379");
System.setProperty("ray.object-store.socket-name", PLASMA_STORE_SOCKET_NAME);
System.setProperty("ray.raylet.socket-name", RAYLET_SOCKET_NAME);
@@ -19,7 +19,7 @@ public class BaseTest {
@BeforeMethod(alwaysRun = true)
public void setUpBase(Method method) {
Assert.assertNull(Ray.internal());
Assert.assertFalse(Ray.isInitialized());
Ray.init();
// These files need to be deleted after each test case.
filesToDelete = ImmutableList.of(