From ce3f5427394af288f9699bed3d479c47181b252f Mon Sep 17 00:00:00 2001
From: Lingxuan Zuo
Date: Sun, 19 Jul 2020 10:43:21 +0800
Subject: [PATCH] [Metric] new cython interface for python worker metric (#9469)

---
 BUILD.bazel                        |   1 +
 python/ray/_raylet.pyx             |   1 +
 python/ray/experimental/metrics.py |  38 +++++++
 python/ray/includes/metric.pxd     |  45 ++++++++
 python/ray/includes/metric.pxi     | 170 +++++++++++++++++++++++++++++
 src/ray/stats/metric.cc            |  11 ++
 src/ray/stats/metric.h             |   8 +-
 7 files changed, 273 insertions(+), 1 deletion(-)
 create mode 100644 python/ray/experimental/metrics.py
 create mode 100644 python/ray/includes/metric.pxd
 create mode 100644 python/ray/includes/metric.pxi

diff --git a/BUILD.bazel b/BUILD.bazel
index 21076fc2c..1449e2478 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -1778,6 +1778,7 @@ pyx_library(
         "//:serialization_cc_proto",
         "//:src/ray/ray_exported_symbols.lds",
         "//:src/ray/ray_version_script.lds",
+        "//:stats_lib",
     ],
 )
 
diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx
index de2cff63e..c70e76c54 100644
--- a/python/ray/_raylet.pyx
+++ b/python/ray/_raylet.pyx
@@ -112,6 +112,7 @@ include "includes/common.pxi"
 include "includes/serialization.pxi"
 include "includes/libcoreworker.pxi"
 include "includes/global_state_accessor.pxi"
+include "includes/metric.pxi"
 
 # Expose GCC & Clang macro to report
 # whether C++ optimizations were enabled during compilation.
diff --git a/python/ray/experimental/metrics.py b/python/ray/experimental/metrics.py
new file mode 100644
index 000000000..db7079828
--- /dev/null
+++ b/python/ray/experimental/metrics.py
@@ -0,0 +1,38 @@
+from ray._raylet import (
+    Count,
+    Histogram,
+    Gauge,
+    Sum,
+)  # noqa: E402
+"""Metric/Stats module for worker.
+
+This module is responsible for providing four classes mapping from stats of
+cpp.
+
+How to use:
+    For Count, Gauge and Sum, we may define a metric like this following:
+    gauge = Gauge(
+        'ray.worker.metric',
+        'description',
+        'unit',
+        ['tagk1', 'tagk2']).
+    The last parameter is default tag map. You can use gauge.record(1.0) with
+    default tags or gauge.record(1.0, {'tagk1': 'tagv1'}) that means the tagk1
+    is updating in tagv1.
+
+    It's additional boundaries to Histogram measurement,
+    histogram = Histogram(
+        'ray.worker.histogram1',
+        'a', 'b', [1.0, 2.0],
+        ['tagk1'])
+
+    Recommended metric name pattern : ray.{component_name}.{module_name}, and
+    name format must be in [0-9a-zA-Z].
+"""
+
+__all__ = [
+    "Count",
+    "Histogram",
+    "Gauge",
+    "Sum",
+]
diff --git a/python/ray/includes/metric.pxd b/python/ray/includes/metric.pxd
new file mode 100644
index 000000000..018c85231
--- /dev/null
+++ b/python/ray/includes/metric.pxd
@@ -0,0 +1,45 @@
+from libcpp.string cimport string as c_string
+from libcpp.unordered_map cimport unordered_map
+from libcpp.vector cimport vector as c_vector
+
+cdef extern from "opencensus/tags/tag_key.h" nogil:
+    cdef cppclass CTagKey "opencensus::tags::TagKey":
+        @staticmethod
+        CTagKey Register(c_string &name)
+        const c_string &name() const
+
+cdef extern from "ray/stats/metric.h" nogil:
+    cdef cppclass CMetric "ray::stats::Metric":
+        CMetric(const c_string &name,
+                const c_string &description,
+                const c_string &unit,
+                const c_vector[CTagKey] &tag_keys)
+        c_string GetName() const
+        void Record(double value)
+        void Record(double value,
+                    unordered_map[c_string, c_string] &tags)
+
+    cdef cppclass CGauge "ray::stats::Gauge":
+        CGauge(const c_string &name,
+               const c_string &description,
+               const c_string &unit,
+               const c_vector[CTagKey] &tag_keys)
+
+    cdef cppclass CCount "ray::stats::Count":
+        CCount(const c_string &name,
+               const c_string &description,
+               const c_string &unit,
+               const c_vector[CTagKey] &tag_keys)
+
+    cdef cppclass CSum "ray::stats::Sum":
+        CSum(const c_string &name,
+             const c_string &description,
+             const c_string &unit,
+             const c_vector[CTagKey] &tag_keys)
+
+    cdef cppclass CHistogram "ray::stats::Histogram":
+        CHistogram(const c_string &name,
+                   const c_string &description,
+                   const c_string &unit,
+                   const c_vector[double] &boundaries,
+                   const c_vector[CTagKey] &tag_keys)
\ No newline at end of file
diff --git a/python/ray/includes/metric.pxi b/python/ray/includes/metric.pxi
new file mode 100644
index 000000000..f7e5cd6b1
--- /dev/null
+++ b/python/ray/includes/metric.pxi
@@ -0,0 +1,170 @@
+from ray.includes.metric cimport (
+    CCount,
+    CGauge,
+    CHistogram,
+    CTagKey,
+    CSum,
+    CMetric,
+)
+from libcpp.memory cimport unique_ptr
+from libcpp.string cimport string as c_string
+from libcpp.unordered_map cimport unordered_map
+from libcpp.vector cimport vector as c_vector
+
+cdef class TagKey:
+    """Cython wrapper class of C++ `opencensus::tags::TagKey`."""
+    cdef c_string name
+
+    def __init__(self, name):
+        self.name = name.encode("ascii")
+        CTagKey.Register(self.name)
+
+    def name(self):
+        return self.name
+
+
+cdef class Metric:
+    """Cython wrapper class of C++ `ray::stats::Metric`.
+
+    It's an abstract class of all metric types.
+    """
+    cdef:
+        unique_ptr[CMetric] metric
+        c_vector[CTagKey] c_tag_keys
+
+    def __init__(self, tag_keys):
+        for tag_key in tag_keys:
+            self.c_tag_keys.push_back(CTagKey.Register(tag_key.encode("ascii")))
+
+    def record(self, value, tags=None):
+        """Record a measurement of metric.
+
+        Flush a metric raw point to stats module with a key-value dict tags.
+        Args:
+            value (double): metric value to record.
+            tags (dict): tag key -> tag value strings; default None.
+        """
+        cdef unordered_map[c_string, c_string] c_tags
+        cdef double c_value
+        # Default tags will be exported if it's empty map.
+        if tags:
+            for tag_k, tag_v in tags.items():
+                c_tags[tag_k.encode("ascii")] = tag_v.encode("ascii")
+        c_value = value
+        with nogil:
+            self.metric.get().Record(c_value, c_tags)
+
+    def get_name(self):
+        return self.metric.get().GetName()
+
+
+cdef class Gauge(Metric):
+    """Cython wrapper class of C++ `ray::stats::Gauge`.
+
+    Gauge: Keeps the last recorded value, drops everything before.
+    """
+    def __init__(self, name, description, unit, tag_keys):
+        """Create a gauge metric
+
+        Args:
+            name (string): metric name.
+            description (string): description of this metric.
+            unit (string): measure unit of this metric.
+            tag_keys (list): a list of tag keys in string format.
+        """
+        super().__init__(tag_keys)
+
+        self.metric.reset(
+            new CGauge(
+                name.encode("ascii"),
+                description.encode("ascii"),
+                unit.encode("ascii"),
+                self.c_tag_keys
+            )
+        )
+
+
+cdef class Count(Metric):
+    """Cython wrapper class of C++ `ray::stats::Count`.
+
+    Count: The count of the number of metric points.
+    """
+    def __init__(self, name, description, unit, tag_keys):
+        """Create a count metric
+
+        Args:
+            name (string): metric name.
+            description (string): description of this metric.
+            unit (string): measure unit of this metric.
+            tag_keys (list): a list of tag keys in string format.
+        """
+        super().__init__(tag_keys)
+
+        self.metric.reset(
+            new CCount(
+                name.encode("ascii"),
+                description.encode("ascii"),
+                unit.encode("ascii"),
+                self.c_tag_keys
+            )
+        )
+
+
+cdef class Sum(Metric):
+    """Cython wrapper class of C++ `ray::stats::Sum`.
+
+    Sum: A sum up of the metric points.
+    """
+    def __init__(self, name, description, unit, tag_keys):
+        """Create a sum metric
+
+        Args:
+            name (string): metric name.
+            description (string): description of this metric.
+            unit (string): measure unit of this metric.
+            tag_keys (list): a list of tag keys in string format.
+        """
+
+        super().__init__(tag_keys)
+
+        self.metric.reset(
+            new CSum(
+                name.encode("ascii"),
+                description.encode("ascii"),
+                unit.encode("ascii"),
+                self.c_tag_keys
+            )
+        )
+
+
+cdef class Histogram(Metric):
+    """Cython wrapper class of C++ `ray::stats::Histogram`.
+
+    Histogram: Histogram distribution of metric points.
+    """
+    def __init__(self, name, description, unit, boundaries, tag_keys):
+        """Create a histogram metric
+
+        Args:
+            name (string): metric name.
+            description (string): description of this metric.
+            unit (string): measure unit of this metric.
+            boundaries (list): a double type list boundaries of histogram.
+            tag_keys (list): a list of tag keys in string format.
+        """
+
+        super().__init__(tag_keys)
+
+        cdef c_vector[double] c_boundaries
+        for value in boundaries:
+            c_boundaries.push_back(value)
+
+        self.metric.reset(
+            new CHistogram(
+                name.encode("ascii"),
+                description.encode("ascii"),
+                unit.encode("ascii"),
+                c_boundaries,
+                self.c_tag_keys
+            )
+        )
diff --git a/src/ray/stats/metric.cc b/src/ray/stats/metric.cc
index a5c2bd466..53fd22ed6 100644
--- a/src/ray/stats/metric.cc
+++ b/src/ray/stats/metric.cc
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 #include "ray/stats/metric.h"
+
 #include "opencensus/stats/internal/aggregation_window.h"
 #include "opencensus/stats/internal/set_aggregation_window.h"
 
@@ -85,6 +86,16 @@ void Metric::Record(double value, const TagsType &tags) {
   opencensus::stats::Record({{*measure_, value}}, combined_tags);
 }
 
+void Metric::Record(double value, std::unordered_map<std::string, std::string> &tags) {
+  TagsType tags_pair_vec;
+  std::for_each(
+      tags.begin(), tags.end(),
+      [&tags_pair_vec](const std::pair<const std::string, std::string> &tag) {
+        tags_pair_vec.push_back({TagKeyType::Register(tag.first), tag.second});
+      });
+  Record(value, tags_pair_vec);
+}
+
 void Gauge::RegisterView() {
   opencensus::stats::ViewDescriptor view_descriptor =
       opencensus::stats::ViewDescriptor()
diff --git a/src/ray/stats/metric.h b/src/ray/stats/metric.h
index 20520f271..751bfd9b6 100644
--- a/src/ray/stats/metric.h
+++ b/src/ray/stats/metric.h
@@ -92,7 +92,7 @@ class Metric {
   std::string GetName() const { return name_; }
 
   /// Record the value for this metric.
-  void Record(double value) { Record(value, {}); }
+  void Record(double value) { Record(value, TagsType{}); }
 
   /// Record the value for this metric.
   ///
@@ -100,6 +100,12 @@ class Metric {
   /// \param tags The tag values that we want to record for this metric record.
   void Record(double value, const TagsType &tags);
 
+  /// Record the value for this metric.
+  ///
+  /// \param value The value that we record.
+  /// \param tags The map tag values that we want to record for this metric record.
+  void Record(double value, std::unordered_map<std::string, std::string> &tags);
+
  protected:
   virtual void RegisterView() = 0;
 