diff --git a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java index b1c29b513..b3adaa11c 100644 --- a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java @@ -39,6 +39,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { private static final int GET_TIMEOUT_MS = 1000; private static final int FETCH_BATCH_SIZE = 1000; + private static final int LIMITED_RETRY_COUNTER = 10; protected RayConfig rayConfig; protected WorkerContext workerContext; @@ -137,7 +138,9 @@ public abstract class AbstractRayRuntime implements RayRuntime { // Try reconstructing any objects we haven't gotten yet. Try to get them // until at least PlasmaLink.GET_TIMEOUT_MS milliseconds passes, then repeat. + int retryCounter = 0; while (unreadys.size() > 0) { + retryCounter++; List unreadyList = new ArrayList<>(unreadys.keySet()); List> reconstructBatches = splitIntoBatches(unreadyList, FETCH_BATCH_SIZE); @@ -159,11 +162,20 @@ public abstract class AbstractRayRuntime implements RayRuntime { unreadys.remove(id); } } + + if (retryCounter % LIMITED_RETRY_COUNTER == 0) { + LOGGER.warn("Attempted {} times to reconstruct objects {}, " + + "but haven't received response. If this message continues to print," + + " it may indicate that the task is hanging, or someting wrong " + + "happened in raylet backend.", + retryCounter, unreadys.keySet()); + } } if (LOGGER.isDebugEnabled()) { LOGGER.debug("Got objects {} for task {}.", Arrays.toString(objectIds.toArray()), taskId); } + List finalRet = new ArrayList<>(); for (Pair value : ret) { diff --git a/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java b/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java index 5fa9ced27..f658d3b16 100644 --- a/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java +++ b/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java @@ -265,7 +265,7 @@ public class RayletClientImpl implements RayletClient { LOGGER.error( "Allocated buffer is not enough to transfer the task specification: {}vs {}", TASK_SPEC_BUFFER_SIZE, buffer.remaining()); - assert (false); + throw new RuntimeException("Allocated buffer is not enough to transfer to task."); } return buffer; }