mirror of
https://github.com/wassname/ray.git
synced 2026-07-02 02:42:52 +08:00
[Metrics] Java metric API (#9377)
This commit is contained in:
@@ -135,8 +135,6 @@ public class GlobalStateAccessor {
|
||||
|
||||
private native boolean nativeConnect(long nativePtr);
|
||||
|
||||
private native void nativeDisconnect(long nativePtr);
|
||||
|
||||
private native List<byte[]> nativeGetAllJobInfo(long nativePtr);
|
||||
|
||||
private native List<byte[]> nativeGetAllNodeInfo(long nativePtr);
|
||||
|
||||
@@ -0,0 +1,47 @@
|
||||
package io.ray.runtime.metric;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class Count extends Metric {
|
||||
|
||||
private double count;
|
||||
|
||||
public Count(String name, String description, String unit, Map<TagKey, String> tags) {
|
||||
super(name, tags);
|
||||
count = 0.0d;
|
||||
metricNativePointer = NativeMetric.registerCountNative(name, description, unit,
|
||||
tags.keySet().stream().map(TagKey::getTagKey).collect(Collectors.toList()));
|
||||
Preconditions.checkState(metricNativePointer != 0, "Count native pointer must not be 0.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(double value) {
|
||||
super.update(value);
|
||||
count += value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(double value, Map<TagKey, String> tags) {
|
||||
super.update(value, tags);
|
||||
count += value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
|
||||
}
|
||||
|
||||
public double getCount() {
|
||||
return count;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param delta add delta for counter
|
||||
*/
|
||||
public void inc(double delta) {
|
||||
update(delta);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
package io.ray.runtime.metric;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
/**
|
||||
* Gauge metric for recording last value and mapping object from stats.
|
||||
*/
|
||||
public class Gauge extends Metric {
|
||||
|
||||
public Gauge(String name, String description, String unit, Map<TagKey, String> tags) {
|
||||
super(name, tags);
|
||||
metricNativePointer = NativeMetric.registerGaugeNative(name, description, unit,
|
||||
tags.keySet().stream().map(TagKey::getTagKey).collect(Collectors.toList()));
|
||||
Preconditions.checkState(metricNativePointer != 0, "Gauge native pointer must not be 0.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,58 @@
|
||||
package io.ray.runtime.metric;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Histogram measurement is mapped to histogram object in stats.
|
||||
* In order to reduce JNI calls overhead, a memory historical window is used
|
||||
* for storing transient value and we assume its max size is 100.
|
||||
*/
|
||||
public class Histogram extends Metric {
|
||||
|
||||
private List<Double> histogramWindow;
|
||||
public static final int HISTOGRAM_WINDOW_SIZE = 100;
|
||||
|
||||
public Histogram(String name, String description, String unit, List<Double> boundaries,
|
||||
Map<TagKey, String> tags) {
|
||||
super(name, tags);
|
||||
metricNativePointer = NativeMetric.registerHistogramNative(name, description, unit,
|
||||
boundaries.stream().mapToDouble(Double::doubleValue).toArray(),
|
||||
tags.keySet().stream().map(TagKey::getTagKey).collect(Collectors.toList()));
|
||||
Preconditions.checkState(metricNativePointer != 0,
|
||||
"Histogram native pointer must not be 0.");
|
||||
histogramWindow = new ArrayList<>();
|
||||
}
|
||||
|
||||
private void updateForWindow(double value) {
|
||||
if (histogramWindow.size() == HISTOGRAM_WINDOW_SIZE) {
|
||||
histogramWindow.remove(0);
|
||||
}
|
||||
histogramWindow.add(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(double value) {
|
||||
super.update(value);
|
||||
updateForWindow(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(double value, Map<TagKey, String> tags) {
|
||||
super.update(value, tags);
|
||||
updateForWindow(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
|
||||
}
|
||||
|
||||
public List<Double> getHistogramWindow() {
|
||||
return histogramWindow;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,91 @@
|
||||
package io.ray.runtime.metric;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Class metric is mapped to stats metric object in core worker.
|
||||
* it must be in categories set [Gague, Count, Sum, Histogram].
|
||||
*/
|
||||
public abstract class Metric {
|
||||
protected String name;
|
||||
|
||||
protected double value;
|
||||
// Native pointer mapping to gauge object of stats.
|
||||
protected long metricNativePointer = 0L;
|
||||
|
||||
protected Map<TagKey, String> tags;
|
||||
|
||||
public Metric(String name, Map<TagKey, String> tags) {
|
||||
Preconditions.checkNotNull(tags, "Metric tags map must not be null.");
|
||||
Preconditions.checkNotNull(name, "Metric name must not be null.");
|
||||
this.name = name;
|
||||
this.tags = tags;
|
||||
this.value = 0.0d;
|
||||
}
|
||||
|
||||
// Sync metric with core worker stats for registry.
|
||||
// Metric data will be flushed into stats view data inside core worker immediately after
|
||||
// record is called.
|
||||
/**
|
||||
* Flush records to stats in last aggregator.
|
||||
*/
|
||||
public void record() {
|
||||
Preconditions.checkState(metricNativePointer != 0, "Metric native pointer must not be 0.");
|
||||
// Get tag key list from map;
|
||||
List<TagKey> nativeTagKeyList = new ArrayList<>();
|
||||
List<String> tagValues = new ArrayList<>();
|
||||
for (Map.Entry<TagKey, String> entry : tags.entrySet()) {
|
||||
nativeTagKeyList.add(entry.getKey());
|
||||
tagValues.add(entry.getValue());
|
||||
}
|
||||
// Get tag value list from map;
|
||||
NativeMetric.recordNative(metricNativePointer, value, nativeTagKeyList.stream()
|
||||
.map(TagKey::getTagKey).collect(Collectors.toList()), tagValues);
|
||||
}
|
||||
|
||||
/** Update gauge value without tags.
|
||||
* Update metric info for user.
|
||||
* @param value lastest value for updating
|
||||
*/
|
||||
public void update(double value) {
|
||||
this.value = value;
|
||||
|
||||
}
|
||||
|
||||
/** Update gauge value with dynamic tag values.
|
||||
* @param value lastest value for updating
|
||||
* @param tags tag map
|
||||
*/
|
||||
public void update(double value, Map<TagKey, String> tags) {
|
||||
this.value = value;
|
||||
this.tags = tags;
|
||||
}
|
||||
|
||||
/**
|
||||
* Deallocate object from stats and reset native pointer in null.
|
||||
*/
|
||||
public void unregister() {
|
||||
if (0 != metricNativePointer) {
|
||||
NativeMetric.unregisterMetricNative(metricNativePointer);
|
||||
}
|
||||
metricNativePointer = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return lastest updating value.
|
||||
*/
|
||||
public double getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
/**
|
||||
* It's abstract method for each metric measurements, so metric registry can store transient
|
||||
* value and aggregate historical data for flushing.
|
||||
*/
|
||||
public abstract void reset();
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package io.ray.runtime.metric;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Native metric provide a native interface to register tag or metric for current metric package.
|
||||
*/
|
||||
class NativeMetric {
|
||||
public static native void registerTagkeyNative(String tagKey);
|
||||
|
||||
public static native long registerCountNative(String name, String description,
|
||||
String unit, List<String> tagKeys);
|
||||
|
||||
public static native long registerGaugeNative(String name, String description,
|
||||
String unit, List<String> tagKeys);
|
||||
|
||||
public static native long registerHistogramNative(String name, String description,
|
||||
String unit, double[] boundaries,
|
||||
List<String> tagKeys);
|
||||
|
||||
public static native long registerSumNative(String name, String description,
|
||||
String unit, List<String> tagKeys);
|
||||
|
||||
public static native void recordNative(long metricNativePointer, double value,
|
||||
List tagKeys, List<String> tagValues);
|
||||
|
||||
public static native void unregisterMetricNative(long gaugePtr);
|
||||
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,44 @@
|
||||
package io.ray.runtime.metric;
|
||||
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Sum measurement is mapped to sum object in stats.
|
||||
* Property sum is used for storing transient sum for registry aggregation.
|
||||
*/
|
||||
public class Sum extends Metric {
|
||||
private double sum;
|
||||
|
||||
public Sum(String name, String description, String unit, Map<TagKey, String> tags) {
|
||||
super(name, tags);
|
||||
metricNativePointer = NativeMetric.registerSumNative(name, description, unit,
|
||||
tags.keySet().stream().map(TagKey::getTagKey).collect(Collectors.toList()));
|
||||
Preconditions.checkState(metricNativePointer != 0,"Count native pointer must not be 0.");
|
||||
this.sum = 0.0d;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(double value) {
|
||||
super.update(value);
|
||||
sum += value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(double value, Map<TagKey, String> tags) {
|
||||
super.update(value, tags);
|
||||
sum += value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
|
||||
}
|
||||
|
||||
public double getSum() {
|
||||
return sum;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,44 @@
|
||||
package io.ray.runtime.metric;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Tagkey is mapping java object to stats tagkey object.
|
||||
*/
|
||||
public class TagKey {
|
||||
|
||||
private String tagKey;
|
||||
|
||||
public TagKey(String key) {
|
||||
tagKey = key;
|
||||
NativeMetric.registerTagkeyNative(key);
|
||||
}
|
||||
|
||||
public String getTagKey() {
|
||||
return tagKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (!(o instanceof TagKey)) {
|
||||
return false;
|
||||
}
|
||||
TagKey tagKey1 = (TagKey) o;
|
||||
return Objects.equals(tagKey, tagKey1.tagKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(tagKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TagKey{" +
|
||||
", tagKey='" + tagKey + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user