[Stats] Fix harvestor threads + Fix flaky stats shutdown. (#9745)

This commit is contained in:
SangBin Cho
2020-07-29 16:57:59 -07:00
committed by GitHub
parent 07022f3f11
commit 826f14c824
4 changed files with 170 additions and 24 deletions
+2 -2
View File
@@ -188,11 +188,11 @@ def test_tempdir_privilege(shutdown_only):
def test_session_dir_uniqueness():
session_dirs = set()
for _ in range(3):
for i in range(2):
ray.init(num_cpus=1)
session_dirs.add(ray.worker._global_node.get_session_dir_path)
ray.shutdown()
assert len(session_dirs) == 3
assert len(session_dirs) == 2
if __name__ == "__main__":
+1
View File
@@ -109,6 +109,7 @@ static inline void Shutdown() {
return;
}
metrics_io_service_pool->Stop();
opencensus::stats::DeltaProducer::Get()->Shutdown();
opencensus::stats::StatsExporter::Shutdown();
metrics_io_service_pool = nullptr;
exporter = nullptr;
+56
View File
@@ -119,6 +119,62 @@ TEST_F(StatsTest, InitializationTest) {
ASSERT_TRUE(new_first_tag.second == test_tag_value_that_shouldnt_be_applied);
}
TEST_F(StatsTest, MultiThreadedInitializationTest) {
// Make sure stats module is thread-safe.
// Shutdown the stats module first.
ray::stats::Shutdown();
// Spawn 10 threads that init and shutdown again and again.
// The test will have memory corruption if it doesn't work as expected.
const stats::TagsType global_tags = {{stats::LanguageKey, "CPP"},
{stats::WorkerPidKey, "1000"}};
std::vector<std::thread> threads;
for (int i = 0; i < 5; i++) {
threads.emplace_back([global_tags]() {
for (int i = 0; i < 5; i++) {
std::shared_ptr<stats::MetricExporterClient> exporter(
new stats::StdoutExporterClient());
unsigned int upper_bound = 100;
unsigned int init_or_shutdown = (rand() % upper_bound);
if (init_or_shutdown >= (upper_bound / 2)) {
ray::stats::Init(global_tags, MetricsAgentPort, exporter);
} else {
ray::stats::Shutdown();
}
}
});
}
for (auto &thread : threads) {
thread.join();
}
ray::stats::Shutdown();
ASSERT_FALSE(ray::stats::StatsConfig::instance().IsInitialized());
std::shared_ptr<stats::MetricExporterClient> exporter(
new stats::StdoutExporterClient());
ray::stats::Init(global_tags, MetricsAgentPort, exporter);
ASSERT_TRUE(ray::stats::StatsConfig::instance().IsInitialized());
}
TEST_F(StatsTest, TestShutdownTakesLongTime) {
// Make sure it doesn't take long time to shutdown when harvestor / export interval is
// large.
ray::stats::Shutdown();
// Spawn 10 threads that init and shutdown again and again.
// The test will have memory corruption if it doesn't work as expected.
const stats::TagsType global_tags = {{stats::LanguageKey, "CPP"},
{stats::WorkerPidKey, "1000"}};
std::shared_ptr<stats::MetricExporterClient> exporter(
new stats::StdoutExporterClient());
// Flush interval is 30 seconds. Shutdown should not take 30 seconds in this case.
uint32_t kReportFlushInterval = 30000;
absl::Duration report_interval = absl::Milliseconds(kReportFlushInterval);
absl::Duration harvest_interval = absl::Milliseconds(kReportFlushInterval);
ray::stats::StatsConfig::instance().SetReportInterval(report_interval);
ray::stats::StatsConfig::instance().SetHarvestInterval(harvest_interval);
ray::stats::Init(global_tags, MetricsAgentPort, exporter);
ray::stats::Shutdown();
}
} // namespace ray
int main(int argc, char **argv) {
+111 -22
View File
@@ -1,7 +1,91 @@
diff --git opencensus/stats/internal/stats_exporter.cc b/opencensus/stats/internal/stats_exporter.cc
diff --git opencensus/stats/internal/delta_producer.cc opencensus/stats/internal/delta_producer.cc
index c61b4d9..b3e4ef2 100644
--- opencensus/stats/internal/delta_producer.cc
+++ opencensus/stats/internal/delta_producer.cc
@@ -75,6 +75,20 @@ DeltaProducer* DeltaProducer::Get() {
return global_delta_producer;
}
+void DeltaProducer::Shutdown() {
+ {
+ absl::MutexLock l(&mu_);
+ if (!thread_started_) {
+ return;
+ }
+ thread_started_ = false;
+ }
+ // Join loop thread when shutdown.
+ if (harvester_thread_.joinable()) {
+ harvester_thread_.join();
+ }
+}
+
void DeltaProducer::AddMeasure() {
delta_mu_.Lock();
absl::MutexLock harvester_lock(&harvester_mu_);
@@ -115,7 +129,10 @@ void DeltaProducer::Flush() {
}
DeltaProducer::DeltaProducer()
- : harvester_thread_(&DeltaProducer::RunHarvesterLoop, this) {}
+ : harvester_thread_(&DeltaProducer::RunHarvesterLoop, this) {
+ absl::MutexLock l(&mu_);
+ thread_started_ = true;
+}
void DeltaProducer::SwapDeltas() {
ABSL_ASSERT(last_delta_.delta().empty() && "Last delta was not consumed.");
@@ -131,11 +148,19 @@ void DeltaProducer::RunHarvesterLoop() {
absl::Time next_harvest_time = absl::Now() + harvest_interval_;
while (true) {
const absl::Time now = absl::Now();
- absl::SleepFor(next_harvest_time - now);
+ absl::SleepFor(absl::Seconds(0.1));
// Account for the possibility that the last harvest took longer than
// harvest_interval_ and we are already past next_harvest_time.
- next_harvest_time = std::max(next_harvest_time, now) + harvest_interval_;
- Flush();
+ if (absl::Now() > next_harvest_time) {
+ next_harvest_time = std::max(next_harvest_time, now) + harvest_interval_;
+ Flush();
+ }
+ {
+ absl::MutexLock l(&mu_);
+ if (!thread_started_) {
+ break;
+ }
+ }
}
}
diff --git opencensus/stats/internal/delta_producer.h opencensus/stats/internal/delta_producer.h
index 2cff522..c8e2e95 100644
--- opencensus/stats/internal/delta_producer.h
+++ opencensus/stats/internal/delta_producer.h
@@ -71,6 +71,8 @@ class DeltaProducer final {
// Returns a pointer to the singleton DeltaProducer.
static DeltaProducer* Get();
+ void Shutdown();
+
// Adds a new Measure.
void AddMeasure();
@@ -122,6 +124,9 @@ class DeltaProducer final {
// thread when calling a flush during harvesting.
Delta last_delta_ GUARDED_BY(harvester_mu_);
std::thread harvester_thread_ GUARDED_BY(harvester_mu_);
+
+ mutable absl::Mutex mu_;
+ bool thread_started_ GUARDED_BY(mu_) = false;
};
} // namespace stats
diff --git opencensus/stats/internal/stats_exporter.cc opencensus/stats/internal/stats_exporter.cc
index 43ddbc7..37b4ae1 100644
--- opencensus/stats/internal/stats_exporter.cc
+++ opencensus/stats/internal/stats_exporter.cc
@@ -95,25 +95,51 @@ void StatsExporterImpl::ClearHandlersForTesting() {
@@ -95,25 +95,52 @@ void StatsExporterImpl::ClearHandlersForTesting() {
}
void StatsExporterImpl::StartExportThread() EXCLUSIVE_LOCKS_REQUIRED(mu_) {
@@ -11,11 +95,13 @@ diff --git opencensus/stats/internal/stats_exporter.cc b/opencensus/stats/intern
+}
+
+void StatsExporterImpl::Shutdown() {
+ absl::MutexLock l(&mu_);
+ if (!thread_started_) {
+ return;
+ {
+ absl::MutexLock l(&mu_);
+ if (!thread_started_) {
+ return;
+ }
+ thread_started_ = false;
+ }
+ thread_started_ = false;
+ // Join loop thread when shutdown.
+ if (t_.joinable()) {
+ t_.join();
@@ -24,22 +110,23 @@ diff --git opencensus/stats/internal/stats_exporter.cc b/opencensus/stats/intern
void StatsExporterImpl::RunWorkerLoop() {
absl::Time next_export_time = GetNextExportTime();
- while (true) {
+ bool thread_started = false;
+ {
+ absl::MutexLock l(&mu_);
+ bool thread_started = thread_started_;
+ }
+ while (thread_started) {
while (true) {
// SleepFor() returns immediately when given a negative duration.
absl::SleepFor(next_export_time - absl::Now());
- absl::SleepFor(next_export_time - absl::Now());
+ absl::SleepFor(absl::Seconds(0.1));
// In case the last export took longer than the export interval, we
// calculate the next time from now.
next_export_time = GetNextExportTime();
Export();
- next_export_time = GetNextExportTime();
- Export();
+ if (absl::Now() > next_export_time) {
+ next_export_time = GetNextExportTime();
+ Export();
+ }
+ {
+ absl::MutexLock l(&mu_);
+ thread_started = thread_started_;
+ if (!thread_started_) {
+ break;
+ }
+ }
}
}
@@ -55,18 +142,20 @@ diff --git opencensus/stats/internal/stats_exporter.cc b/opencensus/stats/intern
// static
void StatsExporter::SetInterval(absl::Duration interval) {
StatsExporterImpl::Get()->SetInterval(interval);
diff --git opencensus/stats/internal/stats_exporter_impl.h b/opencensus/stats/internal/stats_exporter_impl.h
diff --git opencensus/stats/internal/stats_exporter_impl.h opencensus/stats/internal/stats_exporter_impl.h
index 11ae3c4..ebe9c4d 100644
--- opencensus/stats/internal/stats_exporter_impl.h
+++ opencensus/stats/internal/stats_exporter_impl.h
@@ -35,6 +35,7 @@ class StatsExporterImpl {
@@ -34,6 +34,7 @@ class StatsExporterImpl {
public:
static StatsExporterImpl* Get();
void SetInterval(absl::Duration interval);
absl::Time GetNextExportTime() const;
+ void Shutdown();
absl::Time GetNextExportTime() const;
void AddView(const ViewDescriptor& view);
void RemoveView(absl::string_view name);
diff --git opencensus/stats/stats_exporter.h b/opencensus/stats/stats_exporter.h
diff --git opencensus/stats/stats_exporter.h opencensus/stats/stats_exporter.h
index 6756858..65e0262 100644
--- opencensus/stats/stats_exporter.h
+++ opencensus/stats/stats_exporter.h
@@ -45,6 +45,8 @@ class StatsExporter final {