diff --git a/java/runtime/src/main/java/io/ray/runtime/metric/Count.java b/java/runtime/src/main/java/io/ray/runtime/metric/Count.java index b196621e0..986af6c5e 100644 --- a/java/runtime/src/main/java/io/ray/runtime/metric/Count.java +++ b/java/runtime/src/main/java/io/ray/runtime/metric/Count.java @@ -3,15 +3,19 @@ package io.ray.runtime.metric; import com.google.common.base.Preconditions; import java.util.Map; +import java.util.concurrent.atomic.DoubleAdder; import java.util.stream.Collectors; +/** + * Count measurement is mapped to count object in stats and counts the number. + */ public class Count extends Metric { - private double count; + private DoubleAdder count; public Count(String name, String description, String unit, Map tags) { super(name, tags); - count = 0.0d; + count = new DoubleAdder(); metricNativePointer = NativeMetric.registerCountNative(name, description, unit, tags.keySet().stream().map(TagKey::getTagKey).collect(Collectors.toList())); Preconditions.checkState(metricNativePointer != 0, "Count native pointer must not be 0."); @@ -19,28 +23,19 @@ public class Count extends Metric { @Override public void update(double value) { - super.update(value); - count += value; + count.add(value); + this.value.addAndGet(value); } @Override - public void update(double value, Map tags) { - super.update(value, tags); - count += value; - } - - @Override - public void reset() { - + protected double getAndReset() { + return count.sumThenReset(); } public double getCount() { - return count; + return this.value.get(); } - /** - * @param delta add delta for counter - */ public void inc(double delta) { update(delta); } diff --git a/java/runtime/src/main/java/io/ray/runtime/metric/Gauge.java b/java/runtime/src/main/java/io/ray/runtime/metric/Gauge.java index 2cafd6903..330349337 100644 --- a/java/runtime/src/main/java/io/ray/runtime/metric/Gauge.java +++ b/java/runtime/src/main/java/io/ray/runtime/metric/Gauge.java @@ -6,7 +6,7 @@ import java.util.stream.Collectors; /** - * Gauge metric for recording last value and mapping object from stats. + * Gauge measurement is mapped to gauge object in stats and is recording the last value. */ public class Gauge extends Metric { @@ -17,9 +17,18 @@ public class Gauge extends Metric { Preconditions.checkState(metricNativePointer != 0, "Gauge native pointer must not be 0."); } - @Override - public void reset() { + public double getValue() { + return value.doubleValue(); + } + @Override + protected double getAndReset() { + return value.doubleValue(); + } + + @Override + public void update(double value) { + this.value.set(value); } } diff --git a/java/runtime/src/main/java/io/ray/runtime/metric/Histogram.java b/java/runtime/src/main/java/io/ray/runtime/metric/Histogram.java index abc3d2fb0..1b506cdfe 100644 --- a/java/runtime/src/main/java/io/ray/runtime/metric/Histogram.java +++ b/java/runtime/src/main/java/io/ray/runtime/metric/Histogram.java @@ -3,6 +3,7 @@ package io.ray.runtime.metric; import com.google.common.base.Preconditions; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -25,7 +26,7 @@ public class Histogram extends Metric { tags.keySet().stream().map(TagKey::getTagKey).collect(Collectors.toList())); Preconditions.checkState(metricNativePointer != 0, "Histogram native pointer must not be 0."); - histogramWindow = new ArrayList<>(); + histogramWindow = Collections.synchronizedList(new ArrayList<>()); } private void updateForWindow(double value) { @@ -37,22 +38,21 @@ public class Histogram extends Metric { @Override public void update(double value) { - super.update(value); updateForWindow(value); + this.value.set(value); } @Override - public void update(double value, Map tags) { - super.update(value, tags); - updateForWindow(value); - } - - @Override - public void reset() { - + protected double getAndReset() { + histogramWindow.clear(); + return value.doubleValue(); } public List getHistogramWindow() { return histogramWindow; } + + public double getValue() { + return value.get(); + } } diff --git a/java/runtime/src/main/java/io/ray/runtime/metric/Metric.java b/java/runtime/src/main/java/io/ray/runtime/metric/Metric.java index da599d860..c89ef37a9 100644 --- a/java/runtime/src/main/java/io/ray/runtime/metric/Metric.java +++ b/java/runtime/src/main/java/io/ray/runtime/metric/Metric.java @@ -1,7 +1,7 @@ package io.ray.runtime.metric; import com.google.common.base.Preconditions; - +import com.google.common.util.concurrent.AtomicDouble; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -14,7 +14,8 @@ import java.util.stream.Collectors; public abstract class Metric { protected String name; - protected double value; + protected AtomicDouble value; + // Native pointer mapping to gauge object of stats. protected long metricNativePointer = 0L; @@ -25,7 +26,7 @@ public abstract class Metric { Preconditions.checkNotNull(name, "Metric name must not be null."); this.name = name; this.tags = tags; - this.value = 0.0d; + this.value = new AtomicDouble(); } // Sync metric with core worker stats for registry. @@ -44,25 +45,28 @@ public abstract class Metric { tagValues.add(entry.getValue()); } // Get tag value list from map; - NativeMetric.recordNative(metricNativePointer, value, nativeTagKeyList.stream() + NativeMetric.recordNative(metricNativePointer, getAndReset(), nativeTagKeyList.stream() .map(TagKey::getTagKey).collect(Collectors.toList()), tagValues); } + /** + * Get the value to record and then reset. + * @return latest updating value. + */ + protected abstract double getAndReset(); + /** Update gauge value without tags. * Update metric info for user. - * @param value lastest value for updating + * @param value latest value for updating */ - public void update(double value) { - this.value = value; - - } + public abstract void update(double value); /** Update gauge value with dynamic tag values. - * @param value lastest value for updating + * @param value latest value for updating * @param tags tag map */ public void update(double value, Map tags) { - this.value = value; + update(value); this.tags = tags; } @@ -76,16 +80,4 @@ public abstract class Metric { metricNativePointer = 0; } - /** - * @return lastest updating value. - */ - public double getValue() { - return value; - } - - /** - * It's abstract method for each metric measurements, so metric registry can store transient - * value and aggregate historical data for flushing. - */ - public abstract void reset(); } diff --git a/java/runtime/src/main/java/io/ray/runtime/metric/MetricConfig.java b/java/runtime/src/main/java/io/ray/runtime/metric/MetricConfig.java new file mode 100644 index 000000000..85bb9dc11 --- /dev/null +++ b/java/runtime/src/main/java/io/ray/runtime/metric/MetricConfig.java @@ -0,0 +1,79 @@ +package io.ray.runtime.metric; + +import com.google.common.base.MoreObjects; + +/** + * Configurations of the metric. + */ +public class MetricConfig { + + private static final long DEFAULT_TIME_INTERVAL_MS = 5000L; + private static final int DEFAULT_THREAD_POLL_SIZE = 1; + private static final long DEFAULT_SHUTDOWN_WAIT_TIME_MS = 3000L; + + public static final MetricConfig DEFAULT_CONFIG = + new MetricConfig(DEFAULT_TIME_INTERVAL_MS, DEFAULT_THREAD_POLL_SIZE, + DEFAULT_SHUTDOWN_WAIT_TIME_MS); + + private final long timeIntervalMs; + private final int threadPoolSize; + private final long shutdownWaitTimeMs; + + public MetricConfig(long timeIntervalMs, int threadPoolSize, long shutdownWaitTimeMs) { + this.timeIntervalMs = timeIntervalMs; + this.threadPoolSize = threadPoolSize; + this.shutdownWaitTimeMs = shutdownWaitTimeMs; + } + + public long timeIntervalMs() { + return timeIntervalMs; + } + + public int threadPoolSize() { + return threadPoolSize; + } + + public long shutdownWaitTimeMs() { + return shutdownWaitTimeMs; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("timeIntervalMs", timeIntervalMs) + .add("threadPoolSize", threadPoolSize) + .add("shutdownWaitTimeMs", shutdownWaitTimeMs) + .toString(); + } + + public static MetricConfigBuilder builder() { + return new MetricConfigBuilder(); + } + + public static class MetricConfigBuilder { + private long timeIntervalMs = DEFAULT_TIME_INTERVAL_MS; + private int threadPooSize = DEFAULT_THREAD_POLL_SIZE; + private long shutdownWaitTimeMs = DEFAULT_SHUTDOWN_WAIT_TIME_MS; + + public MetricConfig create() { + return new MetricConfig(timeIntervalMs, threadPooSize, shutdownWaitTimeMs); + } + + public MetricConfigBuilder timeIntervalMs(long timeIntervalMs) { + this.timeIntervalMs = timeIntervalMs; + return this; + } + + public MetricConfigBuilder threadPoolSize(int threadPooSize) { + this.threadPooSize = threadPooSize; + return this; + } + + public MetricConfigBuilder shutdownWaitTimeMs(long shutdownWaitTimeMs) { + this.shutdownWaitTimeMs = shutdownWaitTimeMs; + return this; + } + } + + +} \ No newline at end of file diff --git a/java/runtime/src/main/java/io/ray/runtime/metric/MetricId.java b/java/runtime/src/main/java/io/ray/runtime/metric/MetricId.java new file mode 100644 index 000000000..07e27cc0b --- /dev/null +++ b/java/runtime/src/main/java/io/ray/runtime/metric/MetricId.java @@ -0,0 +1,64 @@ +package io.ray.runtime.metric; + +import com.google.common.base.MoreObjects; +import java.util.Map; +import java.util.Objects; + +/** + * MetricId represents a metric with a given type, name and tags. + * If two metrics have the same type and name but different tags(including key and value), they have + * a different MetricId. And in this way, {@link MetricRegistry} can register two metrics with same + * name but different tags. + */ +public class MetricId { + + private final MetricType type; + private final String name; + private final Map tags; + + public MetricId(MetricType type, String name, Map tags) { + this.type = type; + this.name = name; + this.tags = tags; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof MetricId)) { + return false; + } + MetricId metricId = (MetricId) o; + return type == metricId.type && + Objects.equals(name, metricId.name) && + Objects.equals(tags, metricId.tags); + } + + @Override + public int hashCode() { + return Objects.hash(type, name, tags); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("type", type) + .add("name", name) + .add("tags", tags) + .toString(); + } + + public MetricType getType() { + return type; + } + + public String getName() { + return name; + } + + public Map getTags() { + return tags; + } +} \ No newline at end of file diff --git a/java/runtime/src/main/java/io/ray/runtime/metric/MetricRegistry.java b/java/runtime/src/main/java/io/ray/runtime/metric/MetricRegistry.java new file mode 100644 index 000000000..88c887599 --- /dev/null +++ b/java/runtime/src/main/java/io/ray/runtime/metric/MetricRegistry.java @@ -0,0 +1,135 @@ +package io.ray.runtime.metric; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MetricRegistry is a registry for metrics to be registered and updates metrics. + */ +public class MetricRegistry { + + public static final MetricRegistry DEFAULT_REGISTRY = new MetricRegistry(); + + private static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class); + + private final Map registeredMetrics = new HashMap<>(64); + + private MetricConfig metricConfig; + private ScheduledExecutorService scheduledExecutorService; + private volatile boolean isRunning = false; + + public void startup() { + startup(MetricConfig.DEFAULT_CONFIG); + } + + public void startup(MetricConfig metricConfig) { + synchronized (this) { + if (!isRunning) { + this.metricConfig = metricConfig; + scheduledExecutorService = new ScheduledThreadPoolExecutor(metricConfig.threadPoolSize(), + new ThreadFactoryBuilder().setNameFormat("metric-registry-%d").build()); + scheduledExecutorService.scheduleAtFixedRate(this::update, metricConfig.timeIntervalMs(), + metricConfig.timeIntervalMs(), TimeUnit.MILLISECONDS); + isRunning = true; + LOG.info("Finished startup metric registry, metricConfig is {}.", metricConfig); + } + } + } + + public void shutdown() { + synchronized (this) { + if (isRunning && scheduledExecutorService != null) { + try { + scheduledExecutorService.shutdownNow(); + if (!scheduledExecutorService + .awaitTermination(metricConfig.shutdownWaitTimeMs(), TimeUnit.MILLISECONDS)) { + LOG.warn("Metric registry did not shut down in {}ms time, so try to shut down again.", + metricConfig.shutdownWaitTimeMs()); + scheduledExecutorService.shutdownNow(); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted when shutting down metric registry, so try to shut down again.", + e.getMessage(), e); + scheduledExecutorService.shutdownNow(); + } + if (scheduledExecutorService.isShutdown()) { + isRunning = false; + scheduledExecutorService = null; + LOG.info("Metric registry has been shut down."); + } else { + LOG.warn("Failed to shut down metric registry service."); + } + } + } + } + + public Metric register(Metric metric) { + synchronized (this) { + if (!isRunning) { + LOG.warn("Failed to register a metric, because the metric registry is not running."); + return null; + } + try { + MetricId id = genMetricIdByMetric(metric); + Metric ori = registeredMetrics.putIfAbsent(id, metric); + if (ori == null) { + return metric; + } else { + LOG.info("Metric {} has already registered, so use the previous one.", id); + return ori; + } + } catch (Exception e) { + LOG.warn("Failed to register a metric: ", e.getMessage(), e); + return null; + } + } + } + + public void unregister(Metric metric) { + synchronized (this) { + if (!isRunning) { + LOG.warn("Failed to unregister a metric, because the metric registry is not running."); + } + try { + MetricId id = genMetricIdByMetric(metric); + registeredMetrics.remove(id); + } catch (Exception e) { + LOG.warn("Failed to unregister a metric: ", e.getMessage(), e); + } + } + } + + private void update() { + registeredMetrics.forEach((id, metric) -> { + metric.record(); + }); + } + + private MetricType getMetricType(Metric metric) { + if (metric instanceof Count) { + return MetricType.COUNT; + } + if (metric instanceof Gauge) { + return MetricType.GAUGE; + } + if (metric instanceof Sum) { + return MetricType.SUM; + } + if (metric instanceof Histogram) { + return MetricType.HISTOGRAM; + } + throw new RuntimeException( + "Unknown metric type, the metric is " + metric.getClass().getSimpleName()); + } + + private MetricId genMetricIdByMetric(Metric metric) { + return new MetricId(getMetricType(metric), metric.name, metric.tags); + } + +} \ No newline at end of file diff --git a/java/runtime/src/main/java/io/ray/runtime/metric/MetricType.java b/java/runtime/src/main/java/io/ray/runtime/metric/MetricType.java new file mode 100644 index 000000000..0f9c772d2 --- /dev/null +++ b/java/runtime/src/main/java/io/ray/runtime/metric/MetricType.java @@ -0,0 +1,16 @@ +package io.ray.runtime.metric; + +/** + * Types of the metric. + */ +public enum MetricType { + + COUNT, + + GAUGE, + + SUM, + + HISTOGRAM; + +} \ No newline at end of file diff --git a/java/runtime/src/main/java/io/ray/runtime/metric/Metrics.java b/java/runtime/src/main/java/io/ray/runtime/metric/Metrics.java new file mode 100644 index 000000000..eda3b3760 --- /dev/null +++ b/java/runtime/src/main/java/io/ray/runtime/metric/Metrics.java @@ -0,0 +1,148 @@ +package io.ray.runtime.metric; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * The entry of metrics for easy use. + */ +public final class Metrics { + + private static MetricRegistry metricRegistry; + + public static MetricRegistry init(MetricConfig metricConfig) { + synchronized (Metrics.class) { + metricRegistry = new MetricRegistry(); + metricRegistry.startup(metricConfig); + return metricRegistry; + } + } + + public static void shutdown() { + synchronized (Metrics.class) { + if (metricRegistry != null) { + metricRegistry.shutdown(); + metricRegistry = null; + } + } + } + + public static CountBuilder count() { + return new CountBuilder(); + } + + public static GaugeBuilder gauge() { + return new GaugeBuilder(); + } + + public static SumBuilder sum() { + return new SumBuilder(); + } + + public static HistogramBuilder histogram() { + return new HistogramBuilder(); + } + + public static class CountBuilder extends AbstractBuilder { + + @Override + protected Count create() { + return new Count(name, description, unit, generateTagKeysMap(tags)); + } + } + + public static class GaugeBuilder extends AbstractBuilder { + + @Override + protected Gauge create() { + return new Gauge(name, description, unit, generateTagKeysMap(tags)); + } + } + + public static class SumBuilder extends AbstractBuilder { + + @Override + protected Sum create() { + return new Sum(name, description, unit, generateTagKeysMap(tags)); + } + } + + public static class HistogramBuilder extends AbstractBuilder { + + private List boundaries; + + public HistogramBuilder boundaries(List boundaries) { + this.boundaries = boundaries; + return this; + } + + @Override + protected Histogram create() { + return new Histogram(name, description, unit, boundaries, generateTagKeysMap(tags)); + } + } + + public abstract static class AbstractBuilder { + protected String name; + protected String description; + protected String unit; + protected Map tags; + + public B name(String name) { + this.name = Preconditions.checkNotNull(name); + return (B) this; + } + + public B description(String description) { + this.description = Preconditions.checkNotNull(description); + return (B) this; + } + + public B unit(String unit) { + this.unit = Preconditions.checkNotNull(unit); + return (B) this; + } + + public B tags(Map tags) { + this.tags = Preconditions.checkNotNull(tags); + return (B) this; + } + + /** + * Creates a metric by sub-class. + * + * @return a metric + */ + protected abstract M create(); + + public M register() { + M m = create(); + maybeInitRegistry(); + return (M) metricRegistry.register(m); + } + } + + private static Map generateTagKeysMap(Map tags) { + Map tagKeys = new HashMap<>(tags.size() * 2); + tags.forEach((key, value) -> { + TagKey tagKey = new TagKey(key); + tagKeys.put(tagKey, value); + }); + return tagKeys; + } + + private static MetricRegistry maybeInitRegistry() { + synchronized (Metrics.class) { + if (metricRegistry != null) { + return metricRegistry; + } else { + metricRegistry = MetricRegistry.DEFAULT_REGISTRY; + metricRegistry.startup(); + return metricRegistry; + } + } + } + +} \ No newline at end of file diff --git a/java/runtime/src/main/java/io/ray/runtime/metric/Sum.java b/java/runtime/src/main/java/io/ray/runtime/metric/Sum.java index d55116388..e42425011 100644 --- a/java/runtime/src/main/java/io/ray/runtime/metric/Sum.java +++ b/java/runtime/src/main/java/io/ray/runtime/metric/Sum.java @@ -1,9 +1,9 @@ package io.ray.runtime.metric; - import com.google.common.base.Preconditions; import java.util.Map; +import java.util.concurrent.atomic.DoubleAdder; import java.util.stream.Collectors; /** @@ -11,34 +11,29 @@ import java.util.stream.Collectors; * Property sum is used for storing transient sum for registry aggregation. */ public class Sum extends Metric { - private double sum; + + private DoubleAdder sum; public Sum(String name, String description, String unit, Map tags) { super(name, tags); metricNativePointer = NativeMetric.registerSumNative(name, description, unit, tags.keySet().stream().map(TagKey::getTagKey).collect(Collectors.toList())); - Preconditions.checkState(metricNativePointer != 0,"Count native pointer must not be 0."); - this.sum = 0.0d; + Preconditions.checkState(metricNativePointer != 0, "Count native pointer must not be 0."); + this.sum = new DoubleAdder(); } @Override public void update(double value) { - super.update(value); - sum += value; + sum.add(value); + this.value.addAndGet(value); } @Override - public void update(double value, Map tags) { - super.update(value, tags); - sum += value; - } - - @Override - public void reset() { - + protected double getAndReset() { + return sum.sumThenReset(); } public double getSum() { - return sum; + return value.get(); } } diff --git a/java/test/src/main/java/io/ray/test/MetricTest.java b/java/test/src/main/java/io/ray/test/MetricTest.java index ec3a46ef6..a439d2102 100644 --- a/java/test/src/main/java/io/ray/test/MetricTest.java +++ b/java/test/src/main/java/io/ray/test/MetricTest.java @@ -1,17 +1,20 @@ package io.ray.test; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import io.ray.runtime.metric.Count; import io.ray.runtime.metric.Gauge; import io.ray.runtime.metric.Histogram; +import io.ray.runtime.metric.MetricConfig; +import io.ray.runtime.metric.Metrics; import io.ray.runtime.metric.Sum; import io.ray.runtime.metric.TagKey; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.testng.Assert; +import org.testng.annotations.AfterMethod; import org.testng.annotations.Test; public class MetricTest extends BaseTest { @@ -20,6 +23,60 @@ public class MetricTest extends BaseTest { return value <= other + 1e-5 && value >= other - 1e-5; } + private MetricConfig initRayMetrics(long timeIntervalMs, + int threadPoolSize, + long shutdownWaitTimeMs) { + MetricConfig config = MetricConfig.builder() + .timeIntervalMs(timeIntervalMs) + .threadPoolSize(threadPoolSize) + .shutdownWaitTimeMs(shutdownWaitTimeMs) + .create(); + Metrics.init(config); + return config; + } + + private Gauge registerGauge() { + return Metrics.gauge() + .name("metric_gauge") + .description("gauge") + .unit("") + .tags(ImmutableMap.of("tag1", "value1")) + .register(); + } + + private Count registerCount() { + return Metrics.count() + .name("metric_count") + .description("counter") + .unit("1pc") + .tags(ImmutableMap.of("tag1", "value1", "count_tag", "default")) + .register(); + } + + private Sum registerSum() { + return Metrics.sum() + .name("metric_sum") + .description("sum") + .unit("1pc") + .tags(ImmutableMap.of("tag1", "value1", "sum_tag", "default")) + .register(); + } + + private Histogram registerHistogram() { + return Metrics.histogram() + .name("metric_histogram") + .description("histogram") + .unit("1pc") + .boundaries(ImmutableList.of(10.0, 15.0, 20.0)) + .tags(ImmutableMap.of("tag1", "value1", "histogram_tag", "default")) + .register(); + } + + @AfterMethod + public void maybeShutdownMetrics() { + Metrics.shutdown(); + } + @Test public void testAddGauge() { TestUtils.skipTestUnderSingleProcess(); @@ -44,7 +101,6 @@ public class MetricTest extends BaseTest { count.inc(10.0); count.inc(20.0); count.record(); - Assert.assertTrue(doubleEqual(count.getValue(), 20.0)); Assert.assertTrue(doubleEqual(count.getCount(), 30.0)); } @@ -59,7 +115,6 @@ public class MetricTest extends BaseTest { sum.update(10.0); sum.update(20.0); sum.record(); - Assert.assertTrue(doubleEqual(sum.getValue(), 20.0)); Assert.assertTrue(doubleEqual(sum.getSum(), 30.0)); } @@ -74,14 +129,128 @@ public class MetricTest extends BaseTest { boundaries.add(15.0); boundaries.add(12.0); Histogram histogram = new Histogram("metric_histogram", "histogram", "1pc", - boundaries, tags); + boundaries, tags); for (int i = 1; i <= 200; ++i) { histogram.update(i * 1.0d); - histogram.record(); } + Assert.assertTrue(doubleEqual(200.0d, histogram.getValue())); List window = histogram.getHistogramWindow(); for (int i = 0; i < Histogram.HISTOGRAM_WINDOW_SIZE; ++i) { Assert.assertTrue(doubleEqual(i + 101.0d, window.get(i))); } + histogram.record(); + Assert.assertTrue(doubleEqual(200.0d, histogram.getValue())); + Assert.assertEquals(window.size(), 0); } + + @Test + public void testRegisterGauge() throws InterruptedException { + TestUtils.skipTestUnderSingleProcess(); + Gauge gauge = registerGauge(); + + gauge.update(2.0); + Assert.assertTrue(doubleEqual(gauge.getValue(), 2.0)); + gauge.update(5.0); + Assert.assertTrue(doubleEqual(gauge.getValue(), 5.0)); + } + + @Test + public void testRegisterCount() throws InterruptedException { + TestUtils.skipTestUnderSingleProcess(); + Count count = registerCount(); + + count.inc(10.0); + count.inc(20.0); + Assert.assertTrue(doubleEqual(count.getCount(), 30.0)); + count.inc(1.0); + count.inc(2.0); + Assert.assertTrue(doubleEqual(count.getCount(), 33.0)); + } + + @Test + public void testRegisterSum() throws InterruptedException { + TestUtils.skipTestUnderSingleProcess(); + Sum sum = registerSum(); + + sum.update(10.0); + sum.update(20.0); + Assert.assertTrue(doubleEqual(sum.getSum(), 30.0)); + sum.update(1.0); + sum.update(2.0); + Assert.assertTrue(doubleEqual(sum.getSum(), 33.0)); + } + + @Test + public void testRegisterHistogram() throws InterruptedException { + TestUtils.skipTestUnderSingleProcess(); + Histogram histogram = registerHistogram(); + + for (int i = 1; i <= 200; ++i) { + histogram.update(i * 1.0d); + } + Assert.assertTrue(doubleEqual(histogram.getValue(), 200.0d)); + List window = histogram.getHistogramWindow(); + for (int i = 0; i < Histogram.HISTOGRAM_WINDOW_SIZE; ++i) { + Assert.assertTrue(doubleEqual(i + 101.0d, window.get(i))); + } + Assert.assertTrue(doubleEqual(histogram.getValue(), 200.0d)); + } + + @Test + public void testRegisterGaugeWithConfig() throws InterruptedException { + TestUtils.skipTestUnderSingleProcess(); + initRayMetrics(2000L, 1, 1000L); + Gauge gauge = registerGauge(); + + gauge.update(2.0); + Assert.assertTrue(doubleEqual(gauge.getValue(), 2.0)); + gauge.update(5.0); + Assert.assertTrue(doubleEqual(gauge.getValue(), 5.0)); + } + + @Test + public void testRegisterCountWithConfig() throws InterruptedException { + TestUtils.skipTestUnderSingleProcess(); + initRayMetrics(2000L, 1, 1000L); + Count count = registerCount(); + + count.inc(10.0); + count.inc(20.0); + Assert.assertTrue(doubleEqual(count.getCount(), 30.0)); + count.inc(1.0); + count.inc(2.0); + Assert.assertTrue(doubleEqual(count.getCount(), 33.0)); + } + + @Test + public void testRegisterSumWithConfig() throws InterruptedException { + TestUtils.skipTestUnderSingleProcess(); + initRayMetrics(2000L, 1, 1000L); + Sum sum = registerSum(); + + sum.update(10.0); + sum.update(20.0); + Assert.assertTrue(doubleEqual(sum.getSum(), 30.0)); + sum.update(1.0); + sum.update(2.0); + Assert.assertTrue(doubleEqual(sum.getSum(), 33.0)); + } + + @Test + public void testRegisterHistogramWithConfig() throws InterruptedException { + TestUtils.skipTestUnderSingleProcess(); + initRayMetrics(2000L, 1, 1000L); + Histogram histogram = registerHistogram(); + + for (int i = 1; i <= 200; ++i) { + histogram.update(i * 1.0d); + } + Assert.assertTrue(doubleEqual(histogram.getValue(), 200.0d)); + List window = histogram.getHistogramWindow(); + for (int i = 0; i < Histogram.HISTOGRAM_WINDOW_SIZE; ++i) { + Assert.assertTrue(doubleEqual(i + 101.0d, window.get(i))); + } + Assert.assertTrue(doubleEqual(histogram.getValue(), 200.0d)); + } + }