implement
/**
* Apply a partition to this stream.
*
- * @param partition The partitioning strategy. Returns This stream.
+ * @param partition The partitioning strategy.
+ * @return This stream.
*/
public PythonDataStream partitionBy(PythonPartition partition) {
checkPartitionCall();
diff --git a/streaming/java/streaming-api/src/main/java/io/ray/streaming/python/stream/PythonKeyDataStream.java b/streaming/java/streaming-api/src/main/java/io/ray/streaming/python/stream/PythonKeyDataStream.java
index 8116fd392..078f84ac4 100644
--- a/streaming/java/streaming-api/src/main/java/io/ray/streaming/python/stream/PythonKeyDataStream.java
+++ b/streaming/java/streaming-api/src/main/java/io/ray/streaming/python/stream/PythonKeyDataStream.java
@@ -31,7 +31,8 @@ public class PythonKeyDataStream extends PythonDataStream implements PythonStrea
/**
* Apply a reduce function to this stream.
*
- * @param func The reduce function. Returns A new DataStream.
+ * @param func The reduce function.
+ * @return A new DataStream.
*/
public PythonDataStream reduce(PythonFunction func) {
func.setFunctionInterface(FunctionInterface.REDUCE_FUNCTION);
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/global/CommonConfig.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/global/CommonConfig.java
index 0c555e7c5..2ec3b6dfb 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/global/CommonConfig.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/global/CommonConfig.java
@@ -11,7 +11,7 @@ public interface CommonConfig extends Config {
/**
* Ray streaming job id. Non-custom.
*
- * Returns Job id with string type.
+ * @return Job id with string type.
*/
@DefaultValue(value = "default-job-id")
@Key(value = JOB_ID)
@@ -20,7 +20,7 @@ public interface CommonConfig extends Config {
/**
* Ray streaming job name. Non-custom.
*
- *
Returns Job name with string type.
+ * @return Job name with string type.
*/
@DefaultValue(value = "default-job-name")
@Key(value = JOB_NAME)
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/master/SchedulerConfig.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/master/SchedulerConfig.java
index bc2fc2bd3..79189431a 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/master/SchedulerConfig.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/config/master/SchedulerConfig.java
@@ -11,7 +11,7 @@ public interface SchedulerConfig extends Config {
/**
* The timeout ms of worker initiation. Default is: 10000ms(10s).
*
- *
Returns timeout ms
+ * @return timeout ms
*/
@Key(WORKER_INITIATION_WAIT_TIMEOUT_MS)
@DefaultValue(value = "10000")
@@ -20,7 +20,7 @@ public interface SchedulerConfig extends Config {
/**
* The timeout ms of worker starting. Default is: 10000ms(10s).
*
- *
Returns timeout ms
+ * @return timeout ms
*/
@Key(WORKER_STARTING_WAIT_TIMEOUT_MS)
@DefaultValue(value = "10000")
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/context/ContextBackend.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/context/ContextBackend.java
index faf870390..83b62696e 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/context/ContextBackend.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/context/ContextBackend.java
@@ -12,14 +12,15 @@ public interface ContextBackend {
/**
* check if key exists in state
*
- *
Returns true if exists
+ * @return true if exists
*/
boolean exists(final String key) throws Exception;
/**
* get content by key
*
- * @param key key Returns the StateBackend
+ * @param key key
+ * @return the StateBackend
*/
byte[] get(final String key) throws Exception;
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionGraph.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionGraph.java
index b0d3b522e..2852e0f99 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionGraph.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionGraph.java
@@ -156,7 +156,7 @@ public class ExecutionGraph implements Serializable {
/**
* Get all execution vertices from current execution graph.
*
- *
Returns all execution vertices.
+ * @return all execution vertices.
*/
public List getAllExecutionVertices() {
return executionJobVertexMap.values().stream()
@@ -168,7 +168,7 @@ public class ExecutionGraph implements Serializable {
/**
* Get all execution vertices whose status is 'TO_ADD' from current execution graph.
*
- * Returns all added execution vertices.
+ * @return all added execution vertices.
*/
public List getAllAddedExecutionVertices() {
return executionJobVertexMap.values().stream()
@@ -181,7 +181,8 @@ public class ExecutionGraph implements Serializable {
/**
* Get specified execution vertex from current execution graph by execution vertex id.
*
- * @param executionVertexId execution vertex id. Returns the specified execution vertex.
+ * @param executionVertexId execution vertex id.
+ * @return the specified execution vertex.
*/
public ExecutionVertex getExecutionVertexByExecutionVertexId(int executionVertexId) {
if (executionVertexMap.containsKey(executionVertexId)) {
@@ -193,7 +194,8 @@ public class ExecutionGraph implements Serializable {
/**
* Get specified execution vertex from current execution graph by actor id.
*
- * @param actorId the actor id of execution vertex. Returns the specified execution vertex.
+ * @param actorId the actor id of execution vertex.
+ * @return the specified execution vertex.
*/
public ExecutionVertex getExecutionVertexByActorId(ActorId actorId) {
return actorIdExecutionVertexMap.get(actorId);
@@ -202,7 +204,8 @@ public class ExecutionGraph implements Serializable {
/**
* Get specified actor by actor id.
*
- * @param actorId the actor id of execution vertex. Returns the specified actor handle.
+ * @param actorId the actor id of execution vertex.
+ * @return the specified actor handle.
*/
public Optional getActorById(ActorId actorId) {
return getAllActors().stream().filter(actor -> actor.getId().equals(actorId)).findFirst();
@@ -212,7 +215,8 @@ public class ExecutionGraph implements Serializable {
* Get the peer actor in the other side of channelName of a given actor
*
* @param actor actor in this side
- * @param channelName the channel name Returns the peer actor in the other side
+ * @param channelName the channel name
+ * @return the peer actor in the other side
*/
public BaseActorHandle getPeerActor(BaseActorHandle actor, String channelName) {
Set set = getActorsByChannelId(channelName);
@@ -229,7 +233,8 @@ public class ExecutionGraph implements Serializable {
/**
* Get actors in both sides of a channelId
*
- * @param channelId the channelId Returns actors in both sides
+ * @param channelId the channelId
+ * @return actors in both sides
*/
public Set getActorsByChannelId(String channelId) {
return channelGroupedActors.getOrDefault(channelId, Sets.newHashSet());
@@ -238,7 +243,7 @@ public class ExecutionGraph implements Serializable {
/**
* Get all actors by graph.
*
- * Returns actor list
+ * @return actor list
*/
public List getAllActors() {
return getActorsFromJobVertices(getExecutionJobVertexList());
@@ -247,7 +252,7 @@ public class ExecutionGraph implements Serializable {
/**
* Get source actors by graph.
*
- * Returns actor list
+ * @return actor list
*/
public List getSourceActors() {
List executionJobVertices =
@@ -261,7 +266,7 @@ public class ExecutionGraph implements Serializable {
/**
* Get transformation and sink actors by graph.
*
- * Returns actor list
+ * @return actor list
*/
public List getNonSourceActors() {
List executionJobVertices =
@@ -278,7 +283,7 @@ public class ExecutionGraph implements Serializable {
/**
* Get sink actors by graph.
*
- * Returns actor list
+ * @return actor list
*/
public List getSinkActors() {
List executionJobVertices =
@@ -292,7 +297,8 @@ public class ExecutionGraph implements Serializable {
/**
* Get actors according to job vertices.
*
- * @param executionJobVertices specified job vertices Returns actor list
+ * @param executionJobVertices specified job vertices
+ * @return actor list
*/
public List getActorsFromJobVertices(
List executionJobVertices) {
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobVertex.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobVertex.java
index 0aa426672..cf869c0c4 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobVertex.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobVertex.java
@@ -109,7 +109,7 @@ public class ExecutionJobVertex implements Serializable {
/**
* e.g. 1-SourceOperator
*
- * Returns operator name with index
+ * @return operator name with index
*/
public String getExecutionJobVertexNameWithIndex() {
return executionJobVertexId + "-" + executionJobVertexName;
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/resource/Resources.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/resource/Resources.java
index b0dec4aef..9b07d131f 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/resource/Resources.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/resource/Resources.java
@@ -24,7 +24,7 @@ public class Resources implements Serializable {
/**
* Get registered containers, the container list is read-only.
*
- *
Returns container list.
+ * @return container list.
*/
public ImmutableList getRegisteredContainers() {
return ImmutableList.copyOf(registerContainers);
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/JobMaster.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/JobMaster.java
index a1dd5b6bc..fd672978a 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/JobMaster.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/JobMaster.java
@@ -101,7 +101,7 @@ public class JobMaster {
/**
* Init JobMaster. To initiate or recover other components(like metrics and extra coordinators).
*
- * Returns init result
+ * @return init result
*/
public Boolean init(boolean isRecover) {
LOG.info("Initializing job master, isRecover={}.", isRecover);
@@ -136,7 +136,8 @@ public class JobMaster {
*
*
* @param jobMasterActor JobMaster actor
- * @param jobGraph logical plan Returns submit result
+ * @param jobGraph logical plan
+ * @return submit result
*/
public boolean submitJob(ActorHandle jobMasterActor, JobGraph jobGraph) {
LOG.info("Begin submitting job using logical plan: {}.", jobGraph);
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/graphmanager/GraphManager.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/graphmanager/GraphManager.java
index ce8dd4741..b563917d9 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/graphmanager/GraphManager.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/graphmanager/GraphManager.java
@@ -19,21 +19,22 @@ public interface GraphManager {
/**
* Build execution graph from job graph.
*
- * @param jobGraph logical plan of streaming job. Returns physical plan of streaming job.
+ * @param jobGraph logical plan of streaming job.
+ * @return physical plan of streaming job.
*/
ExecutionGraph buildExecutionGraph(JobGraph jobGraph);
/**
* Get job graph.
*
- * Returns the job graph.
+ * @return the job graph.
*/
JobGraph getJobGraph();
/**
* Get execution graph.
*
- *
Returns the execution graph.
+ * @return the execution graph.
*/
ExecutionGraph getExecutionGraph();
}
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/resourcemanager/ResourceManager.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/resourcemanager/ResourceManager.java
index 43671eea1..fbe3f696a 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/resourcemanager/ResourceManager.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/resourcemanager/ResourceManager.java
@@ -10,7 +10,7 @@ public interface ResourceManager extends ResourceAssignStrategy {
/**
* Get registered containers, the container list is read-only.
*
- *
Returns the registered container list
+ * @return the registered container list
*/
ImmutableList getRegisteredContainers();
}
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/resourcemanager/strategy/ResourceAssignStrategy.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/resourcemanager/strategy/ResourceAssignStrategy.java
index 8df20790c..9ce131d25 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/resourcemanager/strategy/ResourceAssignStrategy.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/resourcemanager/strategy/ResourceAssignStrategy.java
@@ -13,7 +13,8 @@ public interface ResourceAssignStrategy {
* Assign {@link Container} for {@link ExecutionVertex}
*
* @param containers registered container
- * @param executionGraph execution graph Returns allocating view
+ * @param executionGraph execution graph
+ * @return allocating view
*/
ResourceAssignmentView assignResource(List containers, ExecutionGraph executionGraph);
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/resourcemanager/strategy/impl/PipelineFirstStrategy.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/resourcemanager/strategy/impl/PipelineFirstStrategy.java
index 74b646c67..48f2366cd 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/resourcemanager/strategy/impl/PipelineFirstStrategy.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/resourcemanager/strategy/impl/PipelineFirstStrategy.java
@@ -42,8 +42,8 @@ public class PipelineFirstStrategy implements ResourceAssignStrategy {
* Assign resource to each execution vertex in the given execution graph.
*
* @param containers registered containers
- * @param executionGraph execution graph Returns allocating map, key is container ID, value is
- * list of vertextId, and contains vertices
+ * @param executionGraph execution graph
+ * @return allocating map, key is container ID, value is list of vertextId, and contains vertices
*/
@Override
public ResourceAssignmentView assignResource(
@@ -133,7 +133,8 @@ public class PipelineFirstStrategy implements ResourceAssignStrategy {
* Find a container which matches required resource
*
* @param requiredResource required resource
- * @param containers registered containers Returns container that matches the required resource
+ * @param containers registered containers
+ * @return container that matches the required resource
*/
private Container findMatchedContainer(
Map requiredResource, List containers) {
@@ -159,7 +160,8 @@ public class PipelineFirstStrategy implements ResourceAssignStrategy {
* Check if current container has enough resource
*
* @param requiredResource required resource
- * @param container container Returns true if matches, false else
+ * @param container container
+ * @return true if matches, false else
*/
private boolean hasEnoughResource(Map requiredResource, Container container) {
LOG.info("Check resource for index: {}, container: {}", currentContainerIndex, container);
@@ -200,7 +202,8 @@ public class PipelineFirstStrategy implements ResourceAssignStrategy {
/**
* Forward to next container
*
- * @param containers registered container list Returns next container in the list
+ * @param containers registered container list
+ * @return next container in the list
*/
private Container forwardToNextContainer(List containers) {
this.currentContainerIndex = (this.currentContainerIndex + 1) % containers.size();
@@ -210,7 +213,8 @@ public class PipelineFirstStrategy implements ResourceAssignStrategy {
/**
* Get current container
*
- * @param containers registered container Returns current container to allocate actor
+ * @param containers registered container
+ * @return current container to allocate actor
*/
private Container getCurrentContainer(List containers) {
return containers.get(currentContainerIndex);
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobScheduler.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobScheduler.java
index 962c0bdfa..d0fb60d54 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobScheduler.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobScheduler.java
@@ -8,7 +8,8 @@ public interface JobScheduler {
/**
* Schedule streaming job using the physical plan.
*
- * @param executionGraph physical plan Returns scheduling result
+ * @param executionGraph physical plan
+ * @return scheduling result
*/
boolean scheduleJob(ExecutionGraph executionGraph);
}
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java
index 6309bb334..039715ccb 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java
@@ -95,7 +95,8 @@ public class JobSchedulerImpl implements JobScheduler {
/**
* Create JobWorker actors according to the physical plan.
*
- * @param executionGraph physical plan Returns actor creation result
+ * @param executionGraph physical plan
+ * @return actor creation result
*/
public boolean createWorkers(ExecutionGraph executionGraph) {
LOG.info("Begin creating workers.");
@@ -148,7 +149,8 @@ public class JobSchedulerImpl implements JobScheduler {
/**
* Build workers context.
*
- * @param executionGraph execution graph Returns vertex to worker context map
+ * @param executionGraph execution graph
+ * @return vertex to worker context map
*/
protected Map buildWorkersContext(
ExecutionGraph executionGraph) {
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/controller/WorkerLifecycleController.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/controller/WorkerLifecycleController.java
index f5c4be5f7..3cd3984b2 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/controller/WorkerLifecycleController.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/controller/WorkerLifecycleController.java
@@ -36,7 +36,8 @@ public class WorkerLifecycleController {
/**
* Create JobWorker actor according to the execution vertex.
*
- * @param executionVertex target execution vertex Returns creation result
+ * @param executionVertex target execution vertex
+ * @return creation result
*/
private boolean createWorker(ExecutionVertex executionVertex) {
LOG.info(
@@ -84,7 +85,8 @@ public class WorkerLifecycleController {
* Using context to init JobWorker.
*
* @param vertexToContextMap target JobWorker actor
- * @param timeout timeout for waiting, unit: ms Returns initiation result
+ * @param timeout timeout for waiting, unit: ms
+ * @return initiation result
*/
public boolean initWorkers(
Map vertexToContextMap, int timeout) {
@@ -120,7 +122,8 @@ public class WorkerLifecycleController {
* Start JobWorkers to run task.
*
* @param executionGraph physical plan
- * @param timeout timeout for waiting, unit: ms Returns starting result
+ * @param timeout timeout for waiting, unit: ms
+ * @return starting result
*/
public boolean startWorkers(ExecutionGraph executionGraph, long lastCheckpointId, int timeout) {
LOG.info("Begin starting workers.");
@@ -150,7 +153,8 @@ public class WorkerLifecycleController {
/**
* Stop and destroy JobWorkers' actor.
*
- * @param executionVertices target vertices Returns destroy result
+ * @param executionVertices target vertices
+ * @return destroy result
*/
public boolean destroyWorkers(List executionVertices) {
return asyncBatchExecute(this::destroyWorker, executionVertices);
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/rpc/RemoteCallWorker.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/rpc/RemoteCallWorker.java
index 5a5475350..6cd788138 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/rpc/RemoteCallWorker.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/rpc/RemoteCallWorker.java
@@ -25,7 +25,8 @@ public class RemoteCallWorker {
* Call JobWorker actor to init.
*
* @param actor target JobWorker actor
- * @param context JobWorker's context Returns init result
+ * @param context JobWorker's context
+ * @return init result
*/
public static ObjectRef initWorker(BaseActorHandle actor, JobWorkerContext context) {
LOG.info("Call worker to initiate, actor: {}, context: {}.", actor.getId(), context);
@@ -50,7 +51,8 @@ public class RemoteCallWorker {
* Call JobWorker actor to start.
*
* @param actor target JobWorker actor
- * @param checkpointId checkpoint ID to be rollback Returns start result
+ * @param checkpointId checkpoint ID to be rollback
+ * @return start result
*/
public static ObjectRef rollback(BaseActorHandle actor, final Long checkpointId) {
LOG.info("Call worker to start, actor: {}.", actor.getId());
@@ -79,7 +81,8 @@ public class RemoteCallWorker {
/**
* Call JobWorker actor to destroy without reconstruction.
*
- * @param actor target JobWorker actor Returns destroy result
+ * @param actor target JobWorker actor
+ * @return destroy result
*/
public static Boolean shutdownWithoutReconstruction(BaseActorHandle actor) {
LOG.info("Call worker to shutdown without reconstruction, actor is {}.", actor.getId());
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/DataReader.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/DataReader.java
index 17ab4fe1e..ff3c62fee 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/DataReader.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/DataReader.java
@@ -115,7 +115,8 @@ public class DataReader {
/**
* Read message from input channels, if timeout, return null.
*
- * @param timeoutMillis timeout Returns message or null
+ * @param timeoutMillis timeout
+ * @return message or null
*/
public ChannelMessage read(long timeoutMillis) {
if (buf.isEmpty()) {
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/channel/ChannelId.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/channel/ChannelId.java
index d3a4b8d71..731031d62 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/channel/ChannelId.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/channel/ChannelId.java
@@ -86,7 +86,8 @@ public class ChannelId {
* Generate channel name, which will be {@link ChannelId#ID_LENGTH} character
*
* @param fromTaskId upstream task id
- * @param toTaskId downstream task id Returns channel name
+ * @param toTaskId downstream task id
+ * @return channel name
*/
public static String genIdStr(int fromTaskId, int toTaskId, long ts) {
/*
@@ -116,7 +117,8 @@ public class ChannelId {
}
/**
- * @param id hex string representation of channel id Returns bytes representation of channel id
+ * @param id hex string representation of channel id
+ * @return bytes representation of channel id
*/
public static byte[] idStrToBytes(String id) {
byte[] idBytes = BaseEncoding.base16().decode(id.toUpperCase());
@@ -125,7 +127,8 @@ public class ChannelId {
}
/**
- * @param id bytes representation of channel id Returns hex string representation of channel id
+ * @param id bytes representation of channel id
+ * @return hex string representation of channel id
*/
public static String idBytesToStr(byte[] id) {
assert id.length == ChannelId.ID_LENGTH;
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/EnvUtil.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/EnvUtil.java
index 07fda18a6..29ac29f4d 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/EnvUtil.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/EnvUtil.java
@@ -36,7 +36,7 @@ public class EnvUtil {
/**
* Execute an external command.
*
- * Returns Whether the command succeeded.
+ * @return Whether the command succeeded.
*/
public static boolean executeCommand(List command, int waitTimeoutSeconds) {
try {
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/Platform.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/Platform.java
index effafcc54..324e1ab9d 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/Platform.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/Platform.java
@@ -77,7 +77,10 @@ public final class Platform {
buffer.clear();
}
- /** @param buffer a DirectBuffer backed by off-heap memory Returns address of off-heap memory */
+ /**
+ * @param buffer a DirectBuffer backed by off-heap memory
+ * @return address of off-heap memory
+ */
public static long getAddress(ByteBuffer buffer) {
return ((DirectBuffer) buffer).address();
}
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/RayUtils.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/RayUtils.java
index a97a2f5ba..b3243d69f 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/RayUtils.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/RayUtils.java
@@ -15,7 +15,7 @@ public class RayUtils {
/**
* Get all node info from GCS
*
- * Returns node info list
+ * @return node info list
*/
public static List getAllNodeInfo() {
if (Ray.getRuntimeContext().isSingleProcess()) {
@@ -28,7 +28,7 @@ public class RayUtils {
/**
* Get all alive node info map
*
- * Returns node info map, key is unique node id , value is node info
+ * @return node info map, key is unique node id , value is node info
*/
public static Map