[Java] Fix return of java doc (#13601)

This commit is contained in:
Kai Yang
2021-01-21 23:57:20 +08:00
committed by GitHub
parent 587f207c2f
commit 92f1e0902e
67 changed files with 350 additions and 229 deletions
@@ -11,7 +11,7 @@ public interface Function extends Serializable {
* storage, and load it back when in fail-over through. {@link
* Function#loadCheckpoint(Serializable)}.
*
* <p>Returns A serializable object which represents function state.
* @return A serializable object which represents function state.
*/
default Serializable saveCheckpoint() {
return null;
@@ -14,8 +14,8 @@ public interface FilterFunction<T> extends Function {
/**
* The filter function that evaluates the predicate.
*
* @param value The value to be filtered. Returns True for values that should be retained, false
* for values to be filtered out.
* @param value The value to be filtered.
* @return True for values that should be retained, false for values to be filtered out.
*/
boolean filter(T value) throws Exception;
}
@@ -15,8 +15,8 @@ public interface Partition<T> extends Function {
* record.
*
* @param record The record.
* @param numPartition num of partitions Returns IDs of the downstream partitions that should
* receive the record.
* @param numPartition num of partitions
* @return IDs of the downstream partitions that should receive the record.
*/
int[] partition(T record, int numPartition);
}
@@ -59,7 +59,8 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
* Apply a map function to this stream.
*
* @param mapFunction The map function.
* @param <R> Type of data returned by the map function. Returns A new DataStream.
* @param <R> Type of data returned by the map function.
* @return A new DataStream.
*/
public <R> DataStream<R> map(MapFunction<T, R> mapFunction) {
return new DataStream<>(this, new MapOperator<>(mapFunction));
@@ -69,7 +70,8 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
* Apply a flat-map function to this stream.
*
* @param flatMapFunction The FlatMapFunction
* @param <R> Type of data returned by the flatmap function. Returns A new DataStream
* @param <R> Type of data returned by the flatmap function.
* @return A new DataStream
*/
public <R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapFunction) {
return new DataStream<>(this, new FlatMapOperator<>(flatMapFunction));
@@ -84,7 +86,8 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
* type with each other.
*
* @param stream The DataStream to union output with.
* @param others The other DataStreams to union output with. Returns A new UnionStream.
* @param others The other DataStreams to union output with.
* @return A new UnionStream.
*/
@SafeVarargs
public final DataStream<T> union(DataStream<T> stream, DataStream<T>... others) {
@@ -98,7 +101,8 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
* Apply union transformations to this stream by merging {@link DataStream} outputs of the same
* type with each other.
*
* @param streams The DataStreams to union output with. Returns A new UnionStream.
* @param streams The DataStreams to union output with.
* @return A new UnionStream.
*/
public final DataStream<T> union(List<DataStream<T>> streams) {
if (this instanceof UnionStream) {
@@ -115,7 +119,8 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
*
* @param other Another stream.
* @param <O> The type of the other stream data.
* @param <R> The type of the data in the joined stream. Returns A new JoinStream.
* @param <R> The type of the data in the joined stream.
* @return A new JoinStream.
*/
public <O, R> JoinStream<T, O, R> join(DataStream<O> other) {
return new JoinStream<>(this, other);
@@ -129,7 +134,8 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
/**
* Apply a sink function and get a StreamSink.
*
* @param sinkFunction The sink function. Returns A new StreamSink.
* @param sinkFunction The sink function.
* @return A new StreamSink.
*/
public DataStreamSink<T> sink(SinkFunction<T> sinkFunction) {
return new DataStreamSink<>(this, new SinkOperator<>(sinkFunction));
@@ -139,7 +145,8 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
* Apply a key-by function to this stream.
*
* @param keyFunction the key function.
* @param <K> The type of the key. Returns A new KeyDataStream.
* @param <K> The type of the key.
* @return A new KeyDataStream.
*/
public <K> KeyDataStream<K, T> keyBy(KeyFunction<T, K> keyFunction) {
checkPartitionCall();
@@ -149,7 +156,7 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
/**
* Apply broadcast to this stream.
*
* <p>Returns This stream.
* @return This stream.
*/
public DataStream<T> broadcast() {
checkPartitionCall();
@@ -159,7 +166,8 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
/**
* Apply a partition to this stream.
*
* @param partition The partitioning strategy. Returns This stream.
* @param partition The partitioning strategy.
* @return This stream.
*/
public DataStream<T> partitionBy(Partition<T> partition) {
checkPartitionCall();
@@ -27,7 +27,8 @@ public class DataStreamSource<T> extends DataStream<T> implements StreamSource<T
*
* @param context Stream context.
* @param values A collection of values.
* @param <T> The type of source data. Returns A DataStreamSource.
* @param <T> The type of source data.
* @return A DataStreamSource.
*/
public static <T> DataStreamSource<T> fromCollection(
StreamingContext context, Collection<T> values) {
@@ -33,7 +33,8 @@ public class KeyDataStream<K, T> extends DataStream<T> {
/**
* Apply a reduce function to this stream.
*
* @param reduceFunction The reduce function. Returns A new DataStream.
* @param reduceFunction The reduce function.
* @return A new DataStream.
*/
public DataStream<T> reduce(ReduceFunction reduceFunction) {
return new DataStream<>(this, new ReduceOperator(reduceFunction));
@@ -44,7 +45,8 @@ public class KeyDataStream<K, T> extends DataStream<T> {
*
* @param aggregateFunction The aggregate function
* @param <A> The type of aggregated intermediate data.
* @param <O> The type of result data. Returns A new DataStream.
* @param <O> The type of result data.
* @return A new DataStream.
*/
public <A, O> DataStream<O> aggregate(AggregateFunction<T, A, O> aggregateFunction) {
return new DataStream<>(this, null);
@@ -43,7 +43,7 @@ public class JobGraph implements Serializable {
* Generate direct-graph(made up of a set of vertices and connected by edges) by current job graph
* for simple log printing.
*
* <p>Returns Digraph in string type.
* @return Digraph in string type.
*/
public String generateDigraph() {
StringBuilder digraph = new StringBuilder();
@@ -51,7 +51,8 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
/**
* Apply a map function to this stream.
*
* @param func The python MapFunction. Returns A new PythonDataStream.
* @param func The python MapFunction.
* @return A new PythonDataStream.
*/
public PythonDataStream map(PythonFunction func) {
func.setFunctionInterface(FunctionInterface.MAP_FUNCTION);
@@ -65,7 +66,8 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
/**
* Apply a flat-map function to this stream.
*
* @param func The python FlapMapFunction. Returns A new PythonDataStream
* @param func The python FlapMapFunction.
* @return A new PythonDataStream
*/
public PythonDataStream flatMap(PythonFunction func) {
func.setFunctionInterface(FunctionInterface.FLAT_MAP_FUNCTION);
@@ -79,8 +81,9 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
/**
* Apply a filter function to this stream.
*
* @param func The python FilterFunction. Returns A new PythonDataStream that contains only the
* elements satisfying the given filter predicate.
* @param func The python FilterFunction.
* @return A new PythonDataStream that contains only the elements satisfying the given filter
* predicate.
*/
public PythonDataStream filter(PythonFunction func) {
func.setFunctionInterface(FunctionInterface.FILTER_FUNCTION);
@@ -92,7 +95,8 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
* same type with each other.
*
* @param stream The DataStream to union output with.
* @param others The other DataStreams to union output with. Returns A new UnionStream.
* @param others The other DataStreams to union output with.
* @return A new UnionStream.
*/
public final PythonDataStream union(PythonDataStream stream, PythonDataStream... others) {
List<PythonDataStream> streams = new ArrayList<>();
@@ -105,7 +109,8 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
* Apply union transformations to this stream by merging {@link PythonDataStream} outputs of the
* same type with each other.
*
* @param streams The DataStreams to union output with. Returns A new UnionStream.
* @param streams The DataStreams to union output with.
* @return A new UnionStream.
*/
public final PythonDataStream union(List<PythonDataStream> streams) {
if (this instanceof PythonUnionStream) {
@@ -124,7 +129,8 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
/**
* Apply a sink function and get a StreamSink.
*
* @param func The python SinkFunction. Returns A new StreamSink.
* @param func The python SinkFunction.
* @return A new StreamSink.
*/
public PythonStreamSink sink(PythonFunction func) {
func.setFunctionInterface(FunctionInterface.SINK_FUNCTION);
@@ -138,7 +144,8 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
/**
* Apply a key-by function to this stream.
*
* @param func the python keyFunction. Returns A new KeyDataStream.
* @param func the python keyFunction.
* @return A new KeyDataStream.
*/
public PythonKeyDataStream keyBy(PythonFunction func) {
checkPartitionCall();
@@ -149,7 +156,7 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
/**
* Apply broadcast to this stream.
*
* <p>Returns This stream.
* @return This stream.
*/
public PythonDataStream broadcast() {
checkPartitionCall();
@@ -159,7 +166,8 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
/**
* Apply a partition to this stream.
*
* @param partition The partitioning strategy. Returns This stream.
* @param partition The partitioning strategy.
* @return This stream.
*/
public PythonDataStream partitionBy(PythonPartition partition) {
checkPartitionCall();
@@ -31,7 +31,8 @@ public class PythonKeyDataStream extends PythonDataStream implements PythonStrea
/**
* Apply a reduce function to this stream.
*
* @param func The reduce function. Returns A new DataStream.
* @param func The reduce function.
* @return A new DataStream.
*/
public PythonDataStream reduce(PythonFunction func) {
func.setFunctionInterface(FunctionInterface.REDUCE_FUNCTION);
@@ -11,7 +11,7 @@ public interface CommonConfig extends Config {
/**
* Ray streaming job id. Non-custom.
*
* <p>Returns Job id with string type.
* @return Job id with string type.
*/
@DefaultValue(value = "default-job-id")
@Key(value = JOB_ID)
@@ -20,7 +20,7 @@ public interface CommonConfig extends Config {
/**
* Ray streaming job name. Non-custom.
*
* <p>Returns Job name with string type.
* @return Job name with string type.
*/
@DefaultValue(value = "default-job-name")
@Key(value = JOB_NAME)
@@ -11,7 +11,7 @@ public interface SchedulerConfig extends Config {
/**
* The timeout ms of worker initiation. Default is: 10000ms(10s).
*
* <p>Returns timeout ms
* @return timeout ms
*/
@Key(WORKER_INITIATION_WAIT_TIMEOUT_MS)
@DefaultValue(value = "10000")
@@ -20,7 +20,7 @@ public interface SchedulerConfig extends Config {
/**
* The timeout ms of worker starting. Default is: 10000ms(10s).
*
* <p>Returns timeout ms
* @return timeout ms
*/
@Key(WORKER_STARTING_WAIT_TIMEOUT_MS)
@DefaultValue(value = "10000")
@@ -12,14 +12,15 @@ public interface ContextBackend {
/**
* check if key exists in state
*
* <p>Returns true if exists
* @return true if exists
*/
boolean exists(final String key) throws Exception;
/**
* get content by key
*
* @param key key Returns the StateBackend
* @param key key
* @return the StateBackend
*/
byte[] get(final String key) throws Exception;
@@ -156,7 +156,7 @@ public class ExecutionGraph implements Serializable {
/**
* Get all execution vertices from current execution graph.
*
* <p>Returns all execution vertices.
* @return all execution vertices.
*/
public List<ExecutionVertex> getAllExecutionVertices() {
return executionJobVertexMap.values().stream()
@@ -168,7 +168,7 @@ public class ExecutionGraph implements Serializable {
/**
* Get all execution vertices whose status is 'TO_ADD' from current execution graph.
*
* <p>Returns all added execution vertices.
* @return all added execution vertices.
*/
public List<ExecutionVertex> getAllAddedExecutionVertices() {
return executionJobVertexMap.values().stream()
@@ -181,7 +181,8 @@ public class ExecutionGraph implements Serializable {
/**
* Get specified execution vertex from current execution graph by execution vertex id.
*
* @param executionVertexId execution vertex id. Returns the specified execution vertex.
* @param executionVertexId execution vertex id.
* @return the specified execution vertex.
*/
public ExecutionVertex getExecutionVertexByExecutionVertexId(int executionVertexId) {
if (executionVertexMap.containsKey(executionVertexId)) {
@@ -193,7 +194,8 @@ public class ExecutionGraph implements Serializable {
/**
* Get specified execution vertex from current execution graph by actor id.
*
* @param actorId the actor id of execution vertex. Returns the specified execution vertex.
* @param actorId the actor id of execution vertex.
* @return the specified execution vertex.
*/
public ExecutionVertex getExecutionVertexByActorId(ActorId actorId) {
return actorIdExecutionVertexMap.get(actorId);
@@ -202,7 +204,8 @@ public class ExecutionGraph implements Serializable {
/**
* Get specified actor by actor id.
*
* @param actorId the actor id of execution vertex. Returns the specified actor handle.
* @param actorId the actor id of execution vertex.
* @return the specified actor handle.
*/
public Optional<BaseActorHandle> getActorById(ActorId actorId) {
return getAllActors().stream().filter(actor -> actor.getId().equals(actorId)).findFirst();
@@ -212,7 +215,8 @@ public class ExecutionGraph implements Serializable {
* Get the peer actor in the other side of channelName of a given actor
*
* @param actor actor in this side
* @param channelName the channel name Returns the peer actor in the other side
* @param channelName the channel name
* @return the peer actor in the other side
*/
public BaseActorHandle getPeerActor(BaseActorHandle actor, String channelName) {
Set<BaseActorHandle> set = getActorsByChannelId(channelName);
@@ -229,7 +233,8 @@ public class ExecutionGraph implements Serializable {
/**
* Get actors in both sides of a channelId
*
* @param channelId the channelId Returns actors in both sides
* @param channelId the channelId
* @return actors in both sides
*/
public Set<BaseActorHandle> getActorsByChannelId(String channelId) {
return channelGroupedActors.getOrDefault(channelId, Sets.newHashSet());
@@ -238,7 +243,7 @@ public class ExecutionGraph implements Serializable {
/**
* Get all actors by graph.
*
* <p>Returns actor list
* @return actor list
*/
public List<BaseActorHandle> getAllActors() {
return getActorsFromJobVertices(getExecutionJobVertexList());
@@ -247,7 +252,7 @@ public class ExecutionGraph implements Serializable {
/**
* Get source actors by graph.
*
* <p>Returns actor list
* @return actor list
*/
public List<BaseActorHandle> getSourceActors() {
List<ExecutionJobVertex> executionJobVertices =
@@ -261,7 +266,7 @@ public class ExecutionGraph implements Serializable {
/**
* Get transformation and sink actors by graph.
*
* <p>Returns actor list
* @return actor list
*/
public List<BaseActorHandle> getNonSourceActors() {
List<ExecutionJobVertex> executionJobVertices =
@@ -278,7 +283,7 @@ public class ExecutionGraph implements Serializable {
/**
* Get sink actors by graph.
*
* <p>Returns actor list
* @return actor list
*/
public List<BaseActorHandle> getSinkActors() {
List<ExecutionJobVertex> executionJobVertices =
@@ -292,7 +297,8 @@ public class ExecutionGraph implements Serializable {
/**
* Get actors according to job vertices.
*
* @param executionJobVertices specified job vertices Returns actor list
* @param executionJobVertices specified job vertices
* @return actor list
*/
public List<BaseActorHandle> getActorsFromJobVertices(
List<ExecutionJobVertex> executionJobVertices) {
@@ -109,7 +109,7 @@ public class ExecutionJobVertex implements Serializable {
/**
* e.g. 1-SourceOperator
*
* <p>Returns operator name with index
* @return operator name with index
*/
public String getExecutionJobVertexNameWithIndex() {
return executionJobVertexId + "-" + executionJobVertexName;
@@ -24,7 +24,7 @@ public class Resources implements Serializable {
/**
* Get registered containers, the container list is read-only.
*
* <p>Returns container list.
* @return container list.
*/
public ImmutableList<Container> getRegisteredContainers() {
return ImmutableList.copyOf(registerContainers);
@@ -101,7 +101,7 @@ public class JobMaster {
/**
* Init JobMaster. To initiate or recover other components(like metrics and extra coordinators).
*
* <p>Returns init result
* @return init result
*/
public Boolean init(boolean isRecover) {
LOG.info("Initializing job master, isRecover={}.", isRecover);
@@ -136,7 +136,8 @@ public class JobMaster {
* </ol>
*
* @param jobMasterActor JobMaster actor
* @param jobGraph logical plan Returns submit result
* @param jobGraph logical plan
* @return submit result
*/
public boolean submitJob(ActorHandle<JobMaster> jobMasterActor, JobGraph jobGraph) {
LOG.info("Begin submitting job using logical plan: {}.", jobGraph);
@@ -19,21 +19,22 @@ public interface GraphManager {
/**
* Build execution graph from job graph.
*
* @param jobGraph logical plan of streaming job. Returns physical plan of streaming job.
* @param jobGraph logical plan of streaming job.
* @return physical plan of streaming job.
*/
ExecutionGraph buildExecutionGraph(JobGraph jobGraph);
/**
* Get job graph.
*
* <p>Returns the job graph.
* @return the job graph.
*/
JobGraph getJobGraph();
/**
* Get execution graph.
*
* <p>Returns the execution graph.
* @return the execution graph.
*/
ExecutionGraph getExecutionGraph();
}
@@ -10,7 +10,7 @@ public interface ResourceManager extends ResourceAssignStrategy {
/**
* Get registered containers, the container list is read-only.
*
* <p>Returns the registered container list
* @return the registered container list
*/
ImmutableList<Container> getRegisteredContainers();
}
@@ -13,7 +13,8 @@ public interface ResourceAssignStrategy {
* Assign {@link Container} for {@link ExecutionVertex}
*
* @param containers registered container
* @param executionGraph execution graph Returns allocating view
* @param executionGraph execution graph
* @return allocating view
*/
ResourceAssignmentView assignResource(List<Container> containers, ExecutionGraph executionGraph);
@@ -42,8 +42,8 @@ public class PipelineFirstStrategy implements ResourceAssignStrategy {
* Assign resource to each execution vertex in the given execution graph.
*
* @param containers registered containers
* @param executionGraph execution graph Returns allocating map, key is container ID, value is
* list of vertextId, and contains vertices
* @param executionGraph execution graph
* @return allocating map, key is container ID, value is list of vertextId, and contains vertices
*/
@Override
public ResourceAssignmentView assignResource(
@@ -133,7 +133,8 @@ public class PipelineFirstStrategy implements ResourceAssignStrategy {
* Find a container which matches required resource
*
* @param requiredResource required resource
* @param containers registered containers Returns container that matches the required resource
* @param containers registered containers
* @return container that matches the required resource
*/
private Container findMatchedContainer(
Map<String, Double> requiredResource, List<Container> containers) {
@@ -159,7 +160,8 @@ public class PipelineFirstStrategy implements ResourceAssignStrategy {
* Check if current container has enough resource
*
* @param requiredResource required resource
* @param container container Returns true if matches, false else
* @param container container
* @return true if matches, false else
*/
private boolean hasEnoughResource(Map<String, Double> requiredResource, Container container) {
LOG.info("Check resource for index: {}, container: {}", currentContainerIndex, container);
@@ -200,7 +202,8 @@ public class PipelineFirstStrategy implements ResourceAssignStrategy {
/**
* Forward to next container
*
* @param containers registered container list Returns next container in the list
* @param containers registered container list
* @return next container in the list
*/
private Container forwardToNextContainer(List<Container> containers) {
this.currentContainerIndex = (this.currentContainerIndex + 1) % containers.size();
@@ -210,7 +213,8 @@ public class PipelineFirstStrategy implements ResourceAssignStrategy {
/**
* Get current container
*
* @param containers registered container Returns current container to allocate actor
* @param containers registered container
* @return current container to allocate actor
*/
private Container getCurrentContainer(List<Container> containers) {
return containers.get(currentContainerIndex);
@@ -8,7 +8,8 @@ public interface JobScheduler {
/**
* Schedule streaming job using the physical plan.
*
* @param executionGraph physical plan Returns scheduling result
* @param executionGraph physical plan
* @return scheduling result
*/
boolean scheduleJob(ExecutionGraph executionGraph);
}
@@ -95,7 +95,8 @@ public class JobSchedulerImpl implements JobScheduler {
/**
* Create JobWorker actors according to the physical plan.
*
* @param executionGraph physical plan Returns actor creation result
* @param executionGraph physical plan
* @return actor creation result
*/
public boolean createWorkers(ExecutionGraph executionGraph) {
LOG.info("Begin creating workers.");
@@ -148,7 +149,8 @@ public class JobSchedulerImpl implements JobScheduler {
/**
* Build workers context.
*
* @param executionGraph execution graph Returns vertex to worker context map
* @param executionGraph execution graph
* @return vertex to worker context map
*/
protected Map<ExecutionVertex, JobWorkerContext> buildWorkersContext(
ExecutionGraph executionGraph) {
@@ -36,7 +36,8 @@ public class WorkerLifecycleController {
/**
* Create JobWorker actor according to the execution vertex.
*
* @param executionVertex target execution vertex Returns creation result
* @param executionVertex target execution vertex
* @return creation result
*/
private boolean createWorker(ExecutionVertex executionVertex) {
LOG.info(
@@ -84,7 +85,8 @@ public class WorkerLifecycleController {
* Using context to init JobWorker.
*
* @param vertexToContextMap target JobWorker actor
* @param timeout timeout for waiting, unit: ms Returns initiation result
* @param timeout timeout for waiting, unit: ms
* @return initiation result
*/
public boolean initWorkers(
Map<ExecutionVertex, JobWorkerContext> vertexToContextMap, int timeout) {
@@ -120,7 +122,8 @@ public class WorkerLifecycleController {
* Start JobWorkers to run task.
*
* @param executionGraph physical plan
* @param timeout timeout for waiting, unit: ms Returns starting result
* @param timeout timeout for waiting, unit: ms
* @return starting result
*/
public boolean startWorkers(ExecutionGraph executionGraph, long lastCheckpointId, int timeout) {
LOG.info("Begin starting workers.");
@@ -150,7 +153,8 @@ public class WorkerLifecycleController {
/**
* Stop and destroy JobWorkers' actor.
*
* @param executionVertices target vertices Returns destroy result
* @param executionVertices target vertices
* @return destroy result
*/
public boolean destroyWorkers(List<ExecutionVertex> executionVertices) {
return asyncBatchExecute(this::destroyWorker, executionVertices);
@@ -25,7 +25,8 @@ public class RemoteCallWorker {
* Call JobWorker actor to init.
*
* @param actor target JobWorker actor
* @param context JobWorker's context Returns init result
* @param context JobWorker's context
* @return init result
*/
public static ObjectRef<Boolean> initWorker(BaseActorHandle actor, JobWorkerContext context) {
LOG.info("Call worker to initiate, actor: {}, context: {}.", actor.getId(), context);
@@ -50,7 +51,8 @@ public class RemoteCallWorker {
* Call JobWorker actor to start.
*
* @param actor target JobWorker actor
* @param checkpointId checkpoint ID to be rollback Returns start result
* @param checkpointId checkpoint ID to be rollback
* @return start result
*/
public static ObjectRef rollback(BaseActorHandle actor, final Long checkpointId) {
LOG.info("Call worker to start, actor: {}.", actor.getId());
@@ -79,7 +81,8 @@ public class RemoteCallWorker {
/**
* Call JobWorker actor to destroy without reconstruction.
*
* @param actor target JobWorker actor Returns destroy result
* @param actor target JobWorker actor
* @return destroy result
*/
public static Boolean shutdownWithoutReconstruction(BaseActorHandle actor) {
LOG.info("Call worker to shutdown without reconstruction, actor is {}.", actor.getId());
@@ -115,7 +115,8 @@ public class DataReader {
/**
* Read message from input channels, if timeout, return null.
*
* @param timeoutMillis timeout Returns message or null
* @param timeoutMillis timeout
* @return message or null
*/
public ChannelMessage read(long timeoutMillis) {
if (buf.isEmpty()) {
@@ -86,7 +86,8 @@ public class ChannelId {
* Generate channel name, which will be {@link ChannelId#ID_LENGTH} character
*
* @param fromTaskId upstream task id
* @param toTaskId downstream task id Returns channel name
* @param toTaskId downstream task id
* @return channel name
*/
public static String genIdStr(int fromTaskId, int toTaskId, long ts) {
/*
@@ -116,7 +117,8 @@ public class ChannelId {
}
/**
* @param id hex string representation of channel id Returns bytes representation of channel id
* @param id hex string representation of channel id
* @return bytes representation of channel id
*/
public static byte[] idStrToBytes(String id) {
byte[] idBytes = BaseEncoding.base16().decode(id.toUpperCase());
@@ -125,7 +127,8 @@ public class ChannelId {
}
/**
* @param id bytes representation of channel id Returns hex string representation of channel id
* @param id bytes representation of channel id
* @return hex string representation of channel id
*/
public static String idBytesToStr(byte[] id) {
assert id.length == ChannelId.ID_LENGTH;
@@ -36,7 +36,7 @@ public class EnvUtil {
/**
* Execute an external command.
*
* <p>Returns Whether the command succeeded.
* @return Whether the command succeeded.
*/
public static boolean executeCommand(List<String> command, int waitTimeoutSeconds) {
try {
@@ -77,7 +77,10 @@ public final class Platform {
buffer.clear();
}
/** @param buffer a DirectBuffer backed by off-heap memory Returns address of off-heap memory */
/**
* @param buffer a DirectBuffer backed by off-heap memory
* @return address of off-heap memory
*/
public static long getAddress(ByteBuffer buffer) {
return ((DirectBuffer) buffer).address();
}
@@ -15,7 +15,7 @@ public class RayUtils {
/**
* Get all node info from GCS
*
* <p>Returns node info list
* @return node info list
*/
public static List<NodeInfo> getAllNodeInfo() {
if (Ray.getRuntimeContext().isSingleProcess()) {
@@ -28,7 +28,7 @@ public class RayUtils {
/**
* Get all alive node info map
*
* <p>Returns node info map, key is unique node id , value is node info
* @return node info map, key is unique node id , value is node info
*/
public static Map<UniqueId, NodeInfo> getAliveNodeInfoMap() {
return getAllNodeInfo().stream()
@@ -20,7 +20,7 @@ public class ReflectionUtils {
/**
* For covariant return type, return the most specific method.
*
* <p>Returns all methods named by {@code methodName},
* @return all methods named by {@code methodName},
*/
public static List<Method> findMethods(Class<?> cls, String methodName) {
List<Class<?>> classes = new ArrayList<>();
@@ -52,8 +52,8 @@ public class ResourceUtil {
}
/**
* Returns jvm heap usage ratio. note that one of the survivor space is not include in total
* memory while calculating this ratio.
* @return jvm heap usage ratio. note that one of the survivor space is not include in total
* memory while calculating this ratio.
*/
public static double getJvmHeapUsageRatio() {
Runtime runtime = Runtime.getRuntime();
@@ -61,8 +61,8 @@ public class ResourceUtil {
}
/**
* Returns jvm heap usage(in bytes). note that this value doesn't include one of the survivor
* space.
* @return jvm heap usage(in bytes). note that this value doesn't include one of the survivor
* space.
*/
public static long getJvmHeapUsageInBytes() {
Runtime runtime = Runtime.getRuntime();
@@ -95,8 +95,8 @@ public class ResourceUtil {
}
/**
* Returns the system cpu usage. This value is a double in the [0.0,1.0] We will try to use `vsar`
* to get cpu usage by default, and use MXBean if any exception raised.
* @return the system cpu usage. This value is a double in the [0.0,1.0] We will try to use `vsar`
* to get cpu usage by default, and use MXBean if any exception raised.
*/
public static double getSystemCpuUsage() {
double cpuUsage = 0.0;
@@ -109,10 +109,10 @@ public class ResourceUtil {
}
/**
* Returns the "recent cpu usage" for the whole system. This value is a double in the [0.0,1.0]
* interval. A value of 0.0 means that all CPUs were idle during the recent period of time
* observed, while a value of 1.0 means that all CPUs were actively running 100% of the time
* during the recent period being observed
* @return the "recent cpu usage" for the whole system. This value is a double in the [0.0,1.0]
* interval. A value of 0.0 means that all CPUs were idle during the recent period of time
* observed, while a value of 1.0 means that all CPUs were actively running 100% of the time
* during the recent period being observed
*/
public static double getSystemCpuUtilByMXBean() {
return osmxb.getSystemCpuLoad();
@@ -144,7 +144,7 @@ public class ResourceUtil {
return cpuUsageFromVsar;
}
/** Returnss the system load average for the last minute */
/** Returns the system load average for the last minute */
public static double getSystemLoadAverage() {
return osmxb.getSystemLoadAverage();
}
@@ -158,7 +158,8 @@ public class ResourceUtil {
* Get containers by hostname of address
*
* @param containers container list
* @param containerHosts container hostname or address set Returns matched containers
* @param containerHosts container hostname or address set
* @return matched containers
*/
public static List<Container> getContainersByHostname(
List<Container> containers, Collection<String> containerHosts) {
@@ -174,7 +175,8 @@ public class ResourceUtil {
/**
* Get container by hostname
*
* @param hostName container hostname Returns container
* @param hostName container hostname
* @return container
*/
public static Optional<Container> getContainerByHostname(
List<Container> containers, String hostName) {
@@ -188,7 +190,8 @@ public class ResourceUtil {
/**
* Get container by id
*
* @param containerID container id Returns container
* @param containerID container id
* @return container
*/
public static Optional<Container> getContainerById(
List<Container> containers, ContainerId containerID) {
@@ -137,8 +137,8 @@ public class JobWorker implements Serializable {
/**
* Start worker's stream tasks with specific checkpoint ID.
*
* <p>Returns a {@link CallResult} with {@link ChannelRecoverInfo}, contains {@link
* ChannelCreationStatus} of each input queue.
* @return a {@link CallResult} with {@link ChannelRecoverInfo}, contains {@link
* ChannelCreationStatus} of each input queue.
*/
public CallResult<ChannelRecoverInfo> rollback(Long checkpointId, Long startRollbackTs) {
synchronized (initialStateChangeLock) {
@@ -49,8 +49,8 @@ public class Mockitools {
/**
* Mock get node info map
*
* @param nodeInfos all node infos fetched from GCS Returns node info map, key is node unique id,
* value is node info
* @param nodeInfos all node infos fetched from GCS
* @return node info map, key is node unique id, value is node info
*/
public static Map<UniqueId, NodeInfo> mockGetNodeInfoMap(List<NodeInfo> nodeInfos) {
return nodeInfos.stream()
@@ -50,8 +50,8 @@ public final class KeyGroupAssignment {
* Assigning the key to a key-group index.
*
* @param key the key to assign.
* @param maxParallelism the maximum parallelism. Returns the key-group index to which the given
* key is assigned.
* @param maxParallelism the maximum parallelism.
* @return the key-group index to which the given key is assigned.
*/
public static int assignKeyGroupIndexForKey(Object key, int maxParallelism) {
return Math.abs(key.hashCode() % maxParallelism);
@@ -28,7 +28,8 @@ public interface MapState<K, V> extends UnaryState<Map<K, V>> {
/**
* Returns the current value associated with the given key.
*
* @param key The key of the mapping Returns The value of the mapping with the given key
* @param key The key of the mapping
* @return The value of the mapping with the given key
*/
V get(K key);
@@ -64,8 +65,8 @@ public interface MapState<K, V> extends UnaryState<Map<K, V>> {
/**
* Returns whether there exists the given mapping.
*
* @param key The key of the mapping Returns True if there exists a mapping whose key equals to
* the given key
* @param key The key of the mapping
* @return True if there exists a mapping whose key equals to the given key
*/
default boolean contains(K key) {
return get().containsKey(key);
@@ -74,7 +75,7 @@ public interface MapState<K, V> extends UnaryState<Map<K, V>> {
/**
* Returns all the mappings in the state
*
* <p>Returns An iterable view of all the key-value pairs in the state.
* @return An iterable view of all the key-value pairs in the state.
*/
default Iterable<Entry<K, V>> entries() {
return get().entrySet();
@@ -83,7 +84,7 @@ public interface MapState<K, V> extends UnaryState<Map<K, V>> {
/**
* Returns all the keys in the state
*
* <p>Returns An iterable view of all the keys in the state.
* @return An iterable view of all the keys in the state.
*/
default Iterable<K> keys() {
return get().keySet();
@@ -92,7 +93,7 @@ public interface MapState<K, V> extends UnaryState<Map<K, V>> {
/**
* Returns all the values in the state.
*
* <p>Returns An iterable view of all the values in the state.
* @return An iterable view of all the values in the state.
*/
default Iterable<V> values() {
return get().values();
@@ -101,7 +102,7 @@ public interface MapState<K, V> extends UnaryState<Map<K, V>> {
/**
* Iterates over all the mappings in the state.
*
* <p>Returns An iterator over all the mappings in the state
* @return An iterator over all the mappings in the state
*/
default Iterator<Entry<K, V>> iterator() {
return get().entrySet().iterator();
@@ -24,7 +24,7 @@ public interface UnaryState<O> extends State {
/**
* get the value in state
*
* <p>Returns the value in state
* @return the value in state
*/
O get();
}