[Metrics]Ray java worker metric registry (#9636)

* ray worker metrics gauge init

* ray java metric mapping

* add jni source files for gauge and tagkey

* mapping all metric classes to stats object

* check non-null for tags and name

* lint

* add symbol for native metric JNI

* extern c for symbol

* add tests for all metrics

* Update Metric.java

use metricNativePointer instead.

* unify metric native stuff to one class

* fix jni file

* add comments for metric transform function in jni utils

* move metric function to native metric file

* remove unused disconnect jni

* Add a metric registry for java metircs

* Restore install-bazel.sh

* Add some comments for metric registry

* Fix thread safe problem of metrics

* Fix metric tests and remove sleep code from tests

* Fix comments of metrics

Co-authored-by: lingxuan.zlx <skyzlxuan@gmail.com>
This commit is contained in:
bermaker
2020-07-28 21:29:33 +08:00
committed by GitHub
parent ff9c1dac88
commit 6e23aff723
11 changed files with 675 additions and 73 deletions
@@ -3,15 +3,19 @@ package io.ray.runtime.metric;
import com.google.common.base.Preconditions;
import java.util.Map;
import java.util.concurrent.atomic.DoubleAdder;
import java.util.stream.Collectors;
/**
* Count measurement is mapped to count object in stats and counts the number.
*/
public class Count extends Metric {
private double count;
private DoubleAdder count;
public Count(String name, String description, String unit, Map<TagKey, String> tags) {
super(name, tags);
count = 0.0d;
count = new DoubleAdder();
metricNativePointer = NativeMetric.registerCountNative(name, description, unit,
tags.keySet().stream().map(TagKey::getTagKey).collect(Collectors.toList()));
Preconditions.checkState(metricNativePointer != 0, "Count native pointer must not be 0.");
@@ -19,28 +23,19 @@ public class Count extends Metric {
@Override
public void update(double value) {
super.update(value);
count += value;
count.add(value);
this.value.addAndGet(value);
}
@Override
public void update(double value, Map<TagKey, String> tags) {
super.update(value, tags);
count += value;
}
@Override
public void reset() {
protected double getAndReset() {
return count.sumThenReset();
}
public double getCount() {
return count;
return this.value.get();
}
/**
* @param delta add delta for counter
*/
public void inc(double delta) {
update(delta);
}
@@ -6,7 +6,7 @@ import java.util.stream.Collectors;
/**
* Gauge metric for recording last value and mapping object from stats.
* Gauge measurement is mapped to gauge object in stats and is recording the last value.
*/
public class Gauge extends Metric {
@@ -17,9 +17,18 @@ public class Gauge extends Metric {
Preconditions.checkState(metricNativePointer != 0, "Gauge native pointer must not be 0.");
}
@Override
public void reset() {
public double getValue() {
return value.doubleValue();
}
@Override
protected double getAndReset() {
return value.doubleValue();
}
@Override
public void update(double value) {
this.value.set(value);
}
}
@@ -3,6 +3,7 @@ package io.ray.runtime.metric;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -25,7 +26,7 @@ public class Histogram extends Metric {
tags.keySet().stream().map(TagKey::getTagKey).collect(Collectors.toList()));
Preconditions.checkState(metricNativePointer != 0,
"Histogram native pointer must not be 0.");
histogramWindow = new ArrayList<>();
histogramWindow = Collections.synchronizedList(new ArrayList<>());
}
private void updateForWindow(double value) {
@@ -37,22 +38,21 @@ public class Histogram extends Metric {
@Override
public void update(double value) {
super.update(value);
updateForWindow(value);
this.value.set(value);
}
@Override
public void update(double value, Map<TagKey, String> tags) {
super.update(value, tags);
updateForWindow(value);
}
@Override
public void reset() {
protected double getAndReset() {
histogramWindow.clear();
return value.doubleValue();
}
public List<Double> getHistogramWindow() {
return histogramWindow;
}
public double getValue() {
return value.get();
}
}
@@ -1,7 +1,7 @@
package io.ray.runtime.metric;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AtomicDouble;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -14,7 +14,8 @@ import java.util.stream.Collectors;
public abstract class Metric {
protected String name;
protected double value;
protected AtomicDouble value;
// Native pointer mapping to gauge object of stats.
protected long metricNativePointer = 0L;
@@ -25,7 +26,7 @@ public abstract class Metric {
Preconditions.checkNotNull(name, "Metric name must not be null.");
this.name = name;
this.tags = tags;
this.value = 0.0d;
this.value = new AtomicDouble();
}
// Sync metric with core worker stats for registry.
@@ -44,25 +45,28 @@ public abstract class Metric {
tagValues.add(entry.getValue());
}
// Get tag value list from map;
NativeMetric.recordNative(metricNativePointer, value, nativeTagKeyList.stream()
NativeMetric.recordNative(metricNativePointer, getAndReset(), nativeTagKeyList.stream()
.map(TagKey::getTagKey).collect(Collectors.toList()), tagValues);
}
/**
* Get the value to record and then reset.
* @return latest updating value.
*/
protected abstract double getAndReset();
/** Update gauge value without tags.
* Update metric info for user.
* @param value lastest value for updating
* @param value latest value for updating
*/
public void update(double value) {
this.value = value;
}
public abstract void update(double value);
/** Update gauge value with dynamic tag values.
* @param value lastest value for updating
* @param value latest value for updating
* @param tags tag map
*/
public void update(double value, Map<TagKey, String> tags) {
this.value = value;
update(value);
this.tags = tags;
}
@@ -76,16 +80,4 @@ public abstract class Metric {
metricNativePointer = 0;
}
/**
* @return lastest updating value.
*/
public double getValue() {
return value;
}
/**
* It's abstract method for each metric measurements, so metric registry can store transient
* value and aggregate historical data for flushing.
*/
public abstract void reset();
}
@@ -0,0 +1,79 @@
package io.ray.runtime.metric;
import com.google.common.base.MoreObjects;
/**
* Configurations of the metric.
*/
public class MetricConfig {
private static final long DEFAULT_TIME_INTERVAL_MS = 5000L;
private static final int DEFAULT_THREAD_POLL_SIZE = 1;
private static final long DEFAULT_SHUTDOWN_WAIT_TIME_MS = 3000L;
public static final MetricConfig DEFAULT_CONFIG =
new MetricConfig(DEFAULT_TIME_INTERVAL_MS, DEFAULT_THREAD_POLL_SIZE,
DEFAULT_SHUTDOWN_WAIT_TIME_MS);
private final long timeIntervalMs;
private final int threadPoolSize;
private final long shutdownWaitTimeMs;
public MetricConfig(long timeIntervalMs, int threadPoolSize, long shutdownWaitTimeMs) {
this.timeIntervalMs = timeIntervalMs;
this.threadPoolSize = threadPoolSize;
this.shutdownWaitTimeMs = shutdownWaitTimeMs;
}
public long timeIntervalMs() {
return timeIntervalMs;
}
public int threadPoolSize() {
return threadPoolSize;
}
public long shutdownWaitTimeMs() {
return shutdownWaitTimeMs;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("timeIntervalMs", timeIntervalMs)
.add("threadPoolSize", threadPoolSize)
.add("shutdownWaitTimeMs", shutdownWaitTimeMs)
.toString();
}
public static MetricConfigBuilder builder() {
return new MetricConfigBuilder();
}
public static class MetricConfigBuilder {
private long timeIntervalMs = DEFAULT_TIME_INTERVAL_MS;
private int threadPooSize = DEFAULT_THREAD_POLL_SIZE;
private long shutdownWaitTimeMs = DEFAULT_SHUTDOWN_WAIT_TIME_MS;
public MetricConfig create() {
return new MetricConfig(timeIntervalMs, threadPooSize, shutdownWaitTimeMs);
}
public MetricConfigBuilder timeIntervalMs(long timeIntervalMs) {
this.timeIntervalMs = timeIntervalMs;
return this;
}
public MetricConfigBuilder threadPoolSize(int threadPooSize) {
this.threadPooSize = threadPooSize;
return this;
}
public MetricConfigBuilder shutdownWaitTimeMs(long shutdownWaitTimeMs) {
this.shutdownWaitTimeMs = shutdownWaitTimeMs;
return this;
}
}
}
@@ -0,0 +1,64 @@
package io.ray.runtime.metric;
import com.google.common.base.MoreObjects;
import java.util.Map;
import java.util.Objects;
/**
* MetricId represents a metric with a given type, name and tags.
* If two metrics have the same type and name but different tags(including key and value), they have
* a different MetricId. And in this way, {@link MetricRegistry} can register two metrics with same
* name but different tags.
*/
public class MetricId {
private final MetricType type;
private final String name;
private final Map<TagKey, String> tags;
public MetricId(MetricType type, String name, Map<TagKey, String> tags) {
this.type = type;
this.name = name;
this.tags = tags;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof MetricId)) {
return false;
}
MetricId metricId = (MetricId) o;
return type == metricId.type &&
Objects.equals(name, metricId.name) &&
Objects.equals(tags, metricId.tags);
}
@Override
public int hashCode() {
return Objects.hash(type, name, tags);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("type", type)
.add("name", name)
.add("tags", tags)
.toString();
}
public MetricType getType() {
return type;
}
public String getName() {
return name;
}
public Map<TagKey, String> getTags() {
return tags;
}
}
@@ -0,0 +1,135 @@
package io.ray.runtime.metric;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* MetricRegistry is a registry for metrics to be registered and updates metrics.
*/
public class MetricRegistry {
public static final MetricRegistry DEFAULT_REGISTRY = new MetricRegistry();
private static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class);
private final Map<MetricId, Metric> registeredMetrics = new HashMap<>(64);
private MetricConfig metricConfig;
private ScheduledExecutorService scheduledExecutorService;
private volatile boolean isRunning = false;
public void startup() {
startup(MetricConfig.DEFAULT_CONFIG);
}
public void startup(MetricConfig metricConfig) {
synchronized (this) {
if (!isRunning) {
this.metricConfig = metricConfig;
scheduledExecutorService = new ScheduledThreadPoolExecutor(metricConfig.threadPoolSize(),
new ThreadFactoryBuilder().setNameFormat("metric-registry-%d").build());
scheduledExecutorService.scheduleAtFixedRate(this::update, metricConfig.timeIntervalMs(),
metricConfig.timeIntervalMs(), TimeUnit.MILLISECONDS);
isRunning = true;
LOG.info("Finished startup metric registry, metricConfig is {}.", metricConfig);
}
}
}
public void shutdown() {
synchronized (this) {
if (isRunning && scheduledExecutorService != null) {
try {
scheduledExecutorService.shutdownNow();
if (!scheduledExecutorService
.awaitTermination(metricConfig.shutdownWaitTimeMs(), TimeUnit.MILLISECONDS)) {
LOG.warn("Metric registry did not shut down in {}ms time, so try to shut down again.",
metricConfig.shutdownWaitTimeMs());
scheduledExecutorService.shutdownNow();
}
} catch (InterruptedException e) {
LOG.warn("Interrupted when shutting down metric registry, so try to shut down again.",
e.getMessage(), e);
scheduledExecutorService.shutdownNow();
}
if (scheduledExecutorService.isShutdown()) {
isRunning = false;
scheduledExecutorService = null;
LOG.info("Metric registry has been shut down.");
} else {
LOG.warn("Failed to shut down metric registry service.");
}
}
}
}
public Metric register(Metric metric) {
synchronized (this) {
if (!isRunning) {
LOG.warn("Failed to register a metric, because the metric registry is not running.");
return null;
}
try {
MetricId id = genMetricIdByMetric(metric);
Metric ori = registeredMetrics.putIfAbsent(id, metric);
if (ori == null) {
return metric;
} else {
LOG.info("Metric {} has already registered, so use the previous one.", id);
return ori;
}
} catch (Exception e) {
LOG.warn("Failed to register a metric: ", e.getMessage(), e);
return null;
}
}
}
public void unregister(Metric metric) {
synchronized (this) {
if (!isRunning) {
LOG.warn("Failed to unregister a metric, because the metric registry is not running.");
}
try {
MetricId id = genMetricIdByMetric(metric);
registeredMetrics.remove(id);
} catch (Exception e) {
LOG.warn("Failed to unregister a metric: ", e.getMessage(), e);
}
}
}
private void update() {
registeredMetrics.forEach((id, metric) -> {
metric.record();
});
}
private MetricType getMetricType(Metric metric) {
if (metric instanceof Count) {
return MetricType.COUNT;
}
if (metric instanceof Gauge) {
return MetricType.GAUGE;
}
if (metric instanceof Sum) {
return MetricType.SUM;
}
if (metric instanceof Histogram) {
return MetricType.HISTOGRAM;
}
throw new RuntimeException(
"Unknown metric type, the metric is " + metric.getClass().getSimpleName());
}
private MetricId genMetricIdByMetric(Metric metric) {
return new MetricId(getMetricType(metric), metric.name, metric.tags);
}
}
@@ -0,0 +1,16 @@
package io.ray.runtime.metric;
/**
* Types of the metric.
*/
public enum MetricType {
COUNT,
GAUGE,
SUM,
HISTOGRAM;
}
@@ -0,0 +1,148 @@
package io.ray.runtime.metric;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* The entry of metrics for easy use.
*/
public final class Metrics {
private static MetricRegistry metricRegistry;
public static MetricRegistry init(MetricConfig metricConfig) {
synchronized (Metrics.class) {
metricRegistry = new MetricRegistry();
metricRegistry.startup(metricConfig);
return metricRegistry;
}
}
public static void shutdown() {
synchronized (Metrics.class) {
if (metricRegistry != null) {
metricRegistry.shutdown();
metricRegistry = null;
}
}
}
public static CountBuilder count() {
return new CountBuilder();
}
public static GaugeBuilder gauge() {
return new GaugeBuilder();
}
public static SumBuilder sum() {
return new SumBuilder();
}
public static HistogramBuilder histogram() {
return new HistogramBuilder();
}
public static class CountBuilder extends AbstractBuilder<CountBuilder, Count> {
@Override
protected Count create() {
return new Count(name, description, unit, generateTagKeysMap(tags));
}
}
public static class GaugeBuilder extends AbstractBuilder<GaugeBuilder, Gauge> {
@Override
protected Gauge create() {
return new Gauge(name, description, unit, generateTagKeysMap(tags));
}
}
public static class SumBuilder extends AbstractBuilder<SumBuilder, Sum> {
@Override
protected Sum create() {
return new Sum(name, description, unit, generateTagKeysMap(tags));
}
}
public static class HistogramBuilder extends AbstractBuilder<HistogramBuilder, Histogram> {
private List<Double> boundaries;
public HistogramBuilder boundaries(List<Double> boundaries) {
this.boundaries = boundaries;
return this;
}
@Override
protected Histogram create() {
return new Histogram(name, description, unit, boundaries, generateTagKeysMap(tags));
}
}
public abstract static class AbstractBuilder<B extends AbstractBuilder, M extends Metric> {
protected String name;
protected String description;
protected String unit;
protected Map<String, String> tags;
public B name(String name) {
this.name = Preconditions.checkNotNull(name);
return (B) this;
}
public B description(String description) {
this.description = Preconditions.checkNotNull(description);
return (B) this;
}
public B unit(String unit) {
this.unit = Preconditions.checkNotNull(unit);
return (B) this;
}
public B tags(Map<String, String> tags) {
this.tags = Preconditions.checkNotNull(tags);
return (B) this;
}
/**
* Creates a metric by sub-class.
*
* @return a metric
*/
protected abstract M create();
public M register() {
M m = create();
maybeInitRegistry();
return (M) metricRegistry.register(m);
}
}
private static Map<TagKey, String> generateTagKeysMap(Map<String, String> tags) {
Map<TagKey, String> tagKeys = new HashMap<>(tags.size() * 2);
tags.forEach((key, value) -> {
TagKey tagKey = new TagKey(key);
tagKeys.put(tagKey, value);
});
return tagKeys;
}
private static MetricRegistry maybeInitRegistry() {
synchronized (Metrics.class) {
if (metricRegistry != null) {
return metricRegistry;
} else {
metricRegistry = MetricRegistry.DEFAULT_REGISTRY;
metricRegistry.startup();
return metricRegistry;
}
}
}
}
@@ -1,9 +1,9 @@
package io.ray.runtime.metric;
import com.google.common.base.Preconditions;
import java.util.Map;
import java.util.concurrent.atomic.DoubleAdder;
import java.util.stream.Collectors;
/**
@@ -11,34 +11,29 @@ import java.util.stream.Collectors;
* Property sum is used for storing transient sum for registry aggregation.
*/
public class Sum extends Metric {
private double sum;
private DoubleAdder sum;
public Sum(String name, String description, String unit, Map<TagKey, String> tags) {
super(name, tags);
metricNativePointer = NativeMetric.registerSumNative(name, description, unit,
tags.keySet().stream().map(TagKey::getTagKey).collect(Collectors.toList()));
Preconditions.checkState(metricNativePointer != 0,"Count native pointer must not be 0.");
this.sum = 0.0d;
Preconditions.checkState(metricNativePointer != 0, "Count native pointer must not be 0.");
this.sum = new DoubleAdder();
}
@Override
public void update(double value) {
super.update(value);
sum += value;
sum.add(value);
this.value.addAndGet(value);
}
@Override
public void update(double value, Map<TagKey, String> tags) {
super.update(value, tags);
sum += value;
}
@Override
public void reset() {
protected double getAndReset() {
return sum.sumThenReset();
}
public double getSum() {
return sum;
return value.get();
}
}