mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 20:22:39 +08:00
[Java] Avoid failure of serializing a user-defined unserializable exception. (#13119)
This commit is contained in:
@@ -5,6 +5,10 @@ import io.ray.runtime.util.SystemUtil;
|
||||
|
||||
public class RayTaskException extends RayException {
|
||||
|
||||
public RayTaskException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public RayTaskException(String message, Throwable cause) {
|
||||
super(
|
||||
String.format(
|
||||
|
||||
@@ -17,6 +17,7 @@ import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -159,9 +160,26 @@ public abstract class TaskExecutor<T extends TaskExecutor.ActorContext> {
|
||||
boolean hasReturn = rayFunction != null && rayFunction.hasReturn();
|
||||
boolean isCrossLanguage = parseFunctionDescriptor(rayFunctionInfo).signature.equals("");
|
||||
if (hasReturn || isCrossLanguage) {
|
||||
returnObjects.add(
|
||||
ObjectSerializer.serialize(
|
||||
new RayTaskException("Error executing task " + taskId, e)));
|
||||
NativeRayObject serializedException;
|
||||
try {
|
||||
serializedException =
|
||||
ObjectSerializer.serialize(
|
||||
new RayTaskException("Error executing task " + taskId, e));
|
||||
} catch (Exception unserializable) {
|
||||
// We should try-catch `ObjectSerializer.serialize` here. Because otherwise if the
|
||||
// application-level exception is not serializable. `ObjectSerializer.serialize`
|
||||
// will throw an exception and crash the worker.
|
||||
// Refer to the case `TaskExceptionTest.java` for more details.
|
||||
LOGGER.warn("Failed to serialize the exception to a RayObject.", unserializable);
|
||||
serializedException =
|
||||
ObjectSerializer.serialize(
|
||||
new RayTaskException(
|
||||
String.format(
|
||||
"Error executing task %s with the exception: %s",
|
||||
taskId, ExceptionUtils.getStackTrace(e))));
|
||||
}
|
||||
Preconditions.checkNotNull(serializedException);
|
||||
returnObjects.add(serializedException);
|
||||
}
|
||||
} else {
|
||||
actorContext.actorCreationException = e;
|
||||
|
||||
@@ -0,0 +1,49 @@
|
||||
package io.ray.test;
|
||||
|
||||
import io.ray.api.ActorHandle;
|
||||
import io.ray.api.Ray;
|
||||
import org.testng.Assert;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
public class TaskExceptionTest extends BaseTest {
|
||||
|
||||
private static class UnserializableClass {}
|
||||
|
||||
private static class UnserializableException extends RuntimeException {
|
||||
|
||||
public UnserializableException() {
|
||||
super();
|
||||
}
|
||||
|
||||
private UnserializableClass unSerializableClass = new UnserializableClass();
|
||||
}
|
||||
|
||||
private static class MyActor {
|
||||
|
||||
public String sayHi() {
|
||||
return "Hi";
|
||||
}
|
||||
|
||||
public String throwUnserializableException() {
|
||||
throw new UnserializableException();
|
||||
}
|
||||
}
|
||||
|
||||
private static String throwUnserializableException() {
|
||||
throw new UnserializableException();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThrowUnserializableExceptionInNormalTask() {
|
||||
// Test that if a task throws an unserializable exception, the worker won't crash.
|
||||
Assert.assertThrows(
|
||||
(() -> Ray.task(TaskExceptionTest::throwUnserializableException).remote().get()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThrowUnserializableExceptionInActorTask() {
|
||||
ActorHandle<MyActor> myActor = Ray.actor(MyActor::new).remote();
|
||||
Assert.assertEquals("Hi", myActor.task(MyActor::sayHi).remote().get());
|
||||
Assert.assertThrows((() -> myActor.task(MyActor::throwUnserializableException).remote().get()));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user