[Java] Fix bug when actor creation task fails (#3740)

* [Java] Fix bug when actor creation task fails

* remove imports
This commit is contained in:
Hao Chen
2019-01-14 11:09:15 +08:00
committed by Yuhong Guo
parent 27c20a41a9
commit 1bb20badec
3 changed files with 95 additions and 8 deletions
@@ -48,11 +48,6 @@ public abstract class AbstractRayRuntime implements RayRuntime {
protected ObjectStoreProxy objectStoreProxy;
protected FunctionManager functionManager;
/**
* Actor ID -> local actor instance.
*/
Map<UniqueId, Object> localActors = new HashMap<>();
public AbstractRayRuntime(RayConfig rayConfig) {
this.rayConfig = rayConfig;
functionManager = new FunctionManager(rayConfig.driverResourcePath);
@@ -1,5 +1,6 @@
package org.ray.runtime;
import com.google.common.base.Preconditions;
import org.ray.api.exception.RayException;
import org.ray.api.id.UniqueId;
import org.ray.runtime.functionmanager.RayFunction;
@@ -18,6 +19,21 @@ public class Worker {
private final AbstractRayRuntime runtime;
/**
* The current actor object, if this worker is an actor, otherwise null.
*/
private Object currentActor = null;
/**
* Id of the current actor object, if the worker is an actor, otherwise NIL.
*/
private UniqueId currentActorId = UniqueId.NIL;
/**
* The exception that failed the actor creation task, if any.
*/
private Exception actorCreationException = null;
public Worker(AbstractRayRuntime runtime) {
this.runtime = runtime;
}
@@ -46,7 +62,14 @@ public class Worker {
runtime.getWorkerContext().setCurrentTask(spec, rayFunction.classLoader);
Thread.currentThread().setContextClassLoader(rayFunction.classLoader);
// Get local actor object and arguments.
Object actor = spec.isActorTask() ? runtime.localActors.get(spec.actorId) : null;
Object actor = null;
if (spec.isActorTask()) {
Preconditions.checkState(spec.actorId.equals(currentActorId));
if (actorCreationException != null) {
throw actorCreationException;
}
actor = currentActor;
}
Object[] args = ArgumentsBuilder.unwrap(spec, rayFunction.classLoader);
// Execute the task.
Object result;
@@ -59,12 +82,18 @@ public class Worker {
if (!spec.isActorCreationTask()) {
runtime.put(returnId, result);
} else {
runtime.localActors.put(returnId, result);
currentActor = result;
currentActorId = returnId;
}
LOGGER.info("Finished executing task {}", spec.taskId);
} catch (Exception e) {
LOGGER.error("Error executing task " + spec, e);
runtime.put(returnId, new RayException("Error executing task " + spec, e));
if (!spec.isActorCreationTask()) {
runtime.put(returnId, new RayException("Error executing task " + spec, e));
} else {
actorCreationException = e;
currentActorId = returnId;
}
} finally {
runtime.getWorkerContext().setCurrentTask(null, null);
Thread.currentThread().setContextClassLoader(oldLoader);