diff --git a/python/ray/tests/test_tempfile.py b/python/ray/tests/test_tempfile.py index 9215d74eb..0a8000490 100644 --- a/python/ray/tests/test_tempfile.py +++ b/python/ray/tests/test_tempfile.py @@ -188,11 +188,11 @@ def test_tempdir_privilege(shutdown_only): def test_session_dir_uniqueness(): session_dirs = set() - for _ in range(3): + for i in range(2): ray.init(num_cpus=1) session_dirs.add(ray.worker._global_node.get_session_dir_path) ray.shutdown() - assert len(session_dirs) == 3 + assert len(session_dirs) == 2 if __name__ == "__main__": diff --git a/src/ray/stats/stats.h b/src/ray/stats/stats.h index 807271afe..4f001b7aa 100644 --- a/src/ray/stats/stats.h +++ b/src/ray/stats/stats.h @@ -109,6 +109,7 @@ static inline void Shutdown() { return; } metrics_io_service_pool->Stop(); + opencensus::stats::DeltaProducer::Get()->Shutdown(); opencensus::stats::StatsExporter::Shutdown(); metrics_io_service_pool = nullptr; exporter = nullptr; diff --git a/src/ray/stats/stats_test.cc b/src/ray/stats/stats_test.cc index 327d681af..47ed0d68a 100644 --- a/src/ray/stats/stats_test.cc +++ b/src/ray/stats/stats_test.cc @@ -119,6 +119,62 @@ TEST_F(StatsTest, InitializationTest) { ASSERT_TRUE(new_first_tag.second == test_tag_value_that_shouldnt_be_applied); } +TEST_F(StatsTest, MultiThreadedInitializationTest) { + // Make sure stats module is thread-safe. + // Shutdown the stats module first. + ray::stats::Shutdown(); + // Spawn 10 threads that init and shutdown again and again. + // The test will have memory corruption if it doesn't work as expected. + const stats::TagsType global_tags = {{stats::LanguageKey, "CPP"}, + {stats::WorkerPidKey, "1000"}}; + std::vector threads; + for (int i = 0; i < 5; i++) { + threads.emplace_back([global_tags]() { + for (int i = 0; i < 5; i++) { + std::shared_ptr exporter( + new stats::StdoutExporterClient()); + unsigned int upper_bound = 100; + unsigned int init_or_shutdown = (rand() % upper_bound); + if (init_or_shutdown >= (upper_bound / 2)) { + ray::stats::Init(global_tags, MetricsAgentPort, exporter); + } else { + ray::stats::Shutdown(); + } + } + }); + } + for (auto &thread : threads) { + thread.join(); + } + ray::stats::Shutdown(); + ASSERT_FALSE(ray::stats::StatsConfig::instance().IsInitialized()); + std::shared_ptr exporter( + new stats::StdoutExporterClient()); + ray::stats::Init(global_tags, MetricsAgentPort, exporter); + ASSERT_TRUE(ray::stats::StatsConfig::instance().IsInitialized()); +} + +TEST_F(StatsTest, TestShutdownTakesLongTime) { + // Make sure it doesn't take long time to shutdown when harvestor / export interval is + // large. + ray::stats::Shutdown(); + // Spawn 10 threads that init and shutdown again and again. + // The test will have memory corruption if it doesn't work as expected. + const stats::TagsType global_tags = {{stats::LanguageKey, "CPP"}, + {stats::WorkerPidKey, "1000"}}; + std::shared_ptr exporter( + new stats::StdoutExporterClient()); + + // Flush interval is 30 seconds. Shutdown should not take 30 seconds in this case. + uint32_t kReportFlushInterval = 30000; + absl::Duration report_interval = absl::Milliseconds(kReportFlushInterval); + absl::Duration harvest_interval = absl::Milliseconds(kReportFlushInterval); + ray::stats::StatsConfig::instance().SetReportInterval(report_interval); + ray::stats::StatsConfig::instance().SetHarvestInterval(harvest_interval); + ray::stats::Init(global_tags, MetricsAgentPort, exporter); + ray::stats::Shutdown(); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/thirdparty/patches/opencensus-cpp-shutdown-api.patch b/thirdparty/patches/opencensus-cpp-shutdown-api.patch index 62f7777ff..5fa92a206 100644 --- a/thirdparty/patches/opencensus-cpp-shutdown-api.patch +++ b/thirdparty/patches/opencensus-cpp-shutdown-api.patch @@ -1,7 +1,91 @@ -diff --git opencensus/stats/internal/stats_exporter.cc b/opencensus/stats/internal/stats_exporter.cc +diff --git opencensus/stats/internal/delta_producer.cc opencensus/stats/internal/delta_producer.cc +index c61b4d9..b3e4ef2 100644 +--- opencensus/stats/internal/delta_producer.cc ++++ opencensus/stats/internal/delta_producer.cc +@@ -75,6 +75,20 @@ DeltaProducer* DeltaProducer::Get() { + return global_delta_producer; + } + ++void DeltaProducer::Shutdown() { ++ { ++ absl::MutexLock l(&mu_); ++ if (!thread_started_) { ++ return; ++ } ++ thread_started_ = false; ++ } ++ // Join loop thread when shutdown. ++ if (harvester_thread_.joinable()) { ++ harvester_thread_.join(); ++ } ++} ++ + void DeltaProducer::AddMeasure() { + delta_mu_.Lock(); + absl::MutexLock harvester_lock(&harvester_mu_); +@@ -115,7 +129,10 @@ void DeltaProducer::Flush() { + } + + DeltaProducer::DeltaProducer() +- : harvester_thread_(&DeltaProducer::RunHarvesterLoop, this) {} ++ : harvester_thread_(&DeltaProducer::RunHarvesterLoop, this) { ++ absl::MutexLock l(&mu_); ++ thread_started_ = true; ++} + + void DeltaProducer::SwapDeltas() { + ABSL_ASSERT(last_delta_.delta().empty() && "Last delta was not consumed."); +@@ -131,11 +148,19 @@ void DeltaProducer::RunHarvesterLoop() { + absl::Time next_harvest_time = absl::Now() + harvest_interval_; + while (true) { + const absl::Time now = absl::Now(); +- absl::SleepFor(next_harvest_time - now); ++ absl::SleepFor(absl::Seconds(0.1)); + // Account for the possibility that the last harvest took longer than + // harvest_interval_ and we are already past next_harvest_time. +- next_harvest_time = std::max(next_harvest_time, now) + harvest_interval_; +- Flush(); ++ if (absl::Now() > next_harvest_time) { ++ next_harvest_time = std::max(next_harvest_time, now) + harvest_interval_; ++ Flush(); ++ } ++ { ++ absl::MutexLock l(&mu_); ++ if (!thread_started_) { ++ break; ++ } ++ } + } + } + +diff --git opencensus/stats/internal/delta_producer.h opencensus/stats/internal/delta_producer.h +index 2cff522..c8e2e95 100644 +--- opencensus/stats/internal/delta_producer.h ++++ opencensus/stats/internal/delta_producer.h +@@ -71,6 +71,8 @@ class DeltaProducer final { + // Returns a pointer to the singleton DeltaProducer. + static DeltaProducer* Get(); + ++ void Shutdown(); ++ + // Adds a new Measure. + void AddMeasure(); + +@@ -122,6 +124,9 @@ class DeltaProducer final { + // thread when calling a flush during harvesting. + Delta last_delta_ GUARDED_BY(harvester_mu_); + std::thread harvester_thread_ GUARDED_BY(harvester_mu_); ++ ++ mutable absl::Mutex mu_; ++ bool thread_started_ GUARDED_BY(mu_) = false; + }; + + } // namespace stats +diff --git opencensus/stats/internal/stats_exporter.cc opencensus/stats/internal/stats_exporter.cc +index 43ddbc7..37b4ae1 100644 --- opencensus/stats/internal/stats_exporter.cc +++ opencensus/stats/internal/stats_exporter.cc -@@ -95,25 +95,51 @@ void StatsExporterImpl::ClearHandlersForTesting() { +@@ -95,25 +95,52 @@ void StatsExporterImpl::ClearHandlersForTesting() { } void StatsExporterImpl::StartExportThread() EXCLUSIVE_LOCKS_REQUIRED(mu_) { @@ -11,11 +95,13 @@ diff --git opencensus/stats/internal/stats_exporter.cc b/opencensus/stats/intern +} + +void StatsExporterImpl::Shutdown() { -+ absl::MutexLock l(&mu_); -+ if (!thread_started_) { -+ return; ++ { ++ absl::MutexLock l(&mu_); ++ if (!thread_started_) { ++ return; ++ } ++ thread_started_ = false; + } -+ thread_started_ = false; + // Join loop thread when shutdown. + if (t_.joinable()) { + t_.join(); @@ -24,22 +110,23 @@ diff --git opencensus/stats/internal/stats_exporter.cc b/opencensus/stats/intern void StatsExporterImpl::RunWorkerLoop() { absl::Time next_export_time = GetNextExportTime(); -- while (true) { -+ bool thread_started = false; -+ { -+ absl::MutexLock l(&mu_); -+ bool thread_started = thread_started_; -+ } -+ while (thread_started) { + while (true) { // SleepFor() returns immediately when given a negative duration. - absl::SleepFor(next_export_time - absl::Now()); +- absl::SleepFor(next_export_time - absl::Now()); ++ absl::SleepFor(absl::Seconds(0.1)); // In case the last export took longer than the export interval, we // calculate the next time from now. - next_export_time = GetNextExportTime(); - Export(); +- next_export_time = GetNextExportTime(); +- Export(); ++ if (absl::Now() > next_export_time) { ++ next_export_time = GetNextExportTime(); ++ Export(); ++ } + { + absl::MutexLock l(&mu_); -+ thread_started = thread_started_; ++ if (!thread_started_) { ++ break; ++ } + } } } @@ -55,18 +142,20 @@ diff --git opencensus/stats/internal/stats_exporter.cc b/opencensus/stats/intern // static void StatsExporter::SetInterval(absl::Duration interval) { StatsExporterImpl::Get()->SetInterval(interval); -diff --git opencensus/stats/internal/stats_exporter_impl.h b/opencensus/stats/internal/stats_exporter_impl.h +diff --git opencensus/stats/internal/stats_exporter_impl.h opencensus/stats/internal/stats_exporter_impl.h +index 11ae3c4..ebe9c4d 100644 --- opencensus/stats/internal/stats_exporter_impl.h +++ opencensus/stats/internal/stats_exporter_impl.h -@@ -35,6 +35,7 @@ class StatsExporterImpl { +@@ -34,6 +34,7 @@ class StatsExporterImpl { + public: static StatsExporterImpl* Get(); void SetInterval(absl::Duration interval); - absl::Time GetNextExportTime() const; + void Shutdown(); + absl::Time GetNextExportTime() const; void AddView(const ViewDescriptor& view); void RemoveView(absl::string_view name); - -diff --git opencensus/stats/stats_exporter.h b/opencensus/stats/stats_exporter.h +diff --git opencensus/stats/stats_exporter.h opencensus/stats/stats_exporter.h +index 6756858..65e0262 100644 --- opencensus/stats/stats_exporter.h +++ opencensus/stats/stats_exporter.h @@ -45,6 +45,8 @@ class StatsExporter final {