From b499100a88b6c6d7331fa5761afe65404805cb92 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Wed, 18 Mar 2020 22:39:21 -0700 Subject: [PATCH] Enable distributed ref counting by default (#7628) * enable * Turn on eager eviction * Shorten tests and drain ReferenceCounter * Don't force kill actor handles that have gone out of scope, lint * Fix locks * Cleanup Plasma Async Callback (#7452) * [rllib][tune] fix some nans (#7611) * Change /tmp to platform-specific temporary directory (#7529) * [Serve] UI Improvements (#7569) * bugfix about test_dynres.py (#7615) Co-authored-by: senlin.zsl * Java call Python actor method use actor.call (#7614) * bug fix about usage of absl::flat_hash_map::erase and absl::flat_hash_set::erase (#7633) Co-authored-by: senlin.zsl * [Java] Make both `RayActor` and `RayPyActor` inheriting from `BaseActor` (#7462) * [Java] Fix the issue that the cached value in `RayObject` is serialized (#7613) * Add failure tests to test_reference_counting (#7400) * Fix typo in asyncio documentation (#7602) * Fix segfault * debug * Force kill actor * Fix test --- python/ray/tests/conftest.py | 2 ++ python/ray/tests/test_global_gc.py | 1 + python/ray/tests/test_reference_counting.py | 11 +----- src/ray/common/ray_config_def.h | 4 +-- src/ray/core_worker/core_worker.cc | 3 ++ src/ray/core_worker/reference_count.cc | 21 +++++++++++ src/ray/core_worker/reference_count.h | 13 +++++++ src/ray/core_worker/task_manager.cc | 39 ++++++++++++++------- 8 files changed, 70 insertions(+), 24 deletions(-) diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index 80d014ff2..e06c655f3 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -22,6 +22,8 @@ def get_default_fixure_internal_config(): internal_config = json.dumps({ "initial_reconstruction_timeout_milliseconds": 200, "num_heartbeats_timeout": 10, + "object_store_full_max_retries": 3, + "object_store_full_initial_delay_ms": 100, }) return internal_config diff --git 
a/python/ray/tests/test_global_gc.py b/python/ray/tests/test_global_gc.py index d922436b3..6bb7c9540 100644 --- a/python/ray/tests/test_global_gc.py +++ b/python/ray/tests/test_global_gc.py @@ -112,6 +112,7 @@ def test_global_gc_when_full(shutdown_only): # Remote workers. actors = [GarbageHolder.remote() for _ in range(2)] + assert all(ray.get([a.has_garbage.remote() for a in actors])) # GC should be triggered for all workers, including the local driver, # when a remote task tries to put a return value that doesn't fit in diff --git a/python/ray/tests/test_reference_counting.py b/python/ray/tests/test_reference_counting.py index 1f24654f6..ef3462bb1 100644 --- a/python/ray/tests/test_reference_counting.py +++ b/python/ray/tests/test_reference_counting.py @@ -18,16 +18,7 @@ logger = logging.getLogger(__name__) @pytest.fixture def one_worker_100MiB(request): - config = json.dumps({ - "distributed_ref_counting_enabled": 1, - "task_retry_delay_ms": 0, - "object_store_full_max_retries": 3, - "object_store_full_initial_delay_ms": 100, - }) - yield ray.init( - num_cpus=1, - object_store_memory=100 * 1024 * 1024, - _internal_config=config) + yield ray.init(num_cpus=1, object_store_memory=100 * 1024 * 1024) ray.shutdown() diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 5934813a4..a57da6984 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -72,7 +72,7 @@ RAY_CONFIG(bool, object_pinning_enabled, true) /// cluster and all objects that contain it are also out of scope. If this flag /// is off and object_pinning_enabled is turned on, then an object will not be /// LRU evicted until it is out of scope on the CREATOR of the ObjectID. -RAY_CONFIG(bool, distributed_ref_counting_enabled, false) +RAY_CONFIG(bool, distributed_ref_counting_enabled, true) /// Whether to record the creation sites of object references. 
This adds more /// information to `ray memstat`, but introduces a little extra overhead when @@ -91,7 +91,7 @@ RAY_CONFIG(bool, record_ref_creation_sites, true) /// NOTE(swang): The timer is checked by the raylet during every heartbeat, so /// this should be set to a value larger than /// raylet_heartbeat_timeout_milliseconds. -RAY_CONFIG(int64_t, free_objects_period_milliseconds, -1) +RAY_CONFIG(int64_t, free_objects_period_milliseconds, 1000) /// If object_pinning_enabled is on, then objects that have been unpinned are /// added to a local cache. When the cache is flushed, all objects in the cache diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index aaede2c53..cd3292aee 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -288,6 +288,9 @@ void CoreWorker::Disconnect() { } void CoreWorker::Exit(bool intentional) { + RAY_LOG(INFO) + << "Exit signal " << (intentional ? "(intentional)" : "") + << " received, this process will exit after all outstanding tasks have finished"; exiting_ = true; // Release the resources early in case draining takes a long time. 
RAY_CHECK_OK(local_raylet_client_->NotifyDirectCallTaskBlocked()); diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index fc670432f..a47b542d6 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -29,6 +29,26 @@ namespace {} // namespace namespace ray { +void ReferenceCounter::DrainAndShutdown(std::function shutdown) { + absl::MutexLock lock(&mutex_); + if (object_id_refs_.empty()) { + shutdown(); + } else { + RAY_LOG(WARNING) + << "This worker is still managing " << object_id_refs_.size() + << " objects, waiting for them to go out of scope before shutting down."; + } + shutdown_hook_ = shutdown; +} + +void ReferenceCounter::ShutdownIfNeeded() { + if (shutdown_hook_ && object_id_refs_.empty()) { + RAY_LOG(WARNING) + << "All object references have gone out of scope, shutting down worker."; + shutdown_hook_(); + } +} + ReferenceCounter::ReferenceTable ReferenceCounter::ReferenceTableFromProto( const ReferenceTableProto &proto) { ReferenceTable refs; @@ -342,6 +362,7 @@ void ReferenceCounter::DeleteReferenceInternal(ReferenceTable::iterator it, if (should_delete_reference) { RAY_LOG(DEBUG) << "Deleting Reference to object " << id; object_id_refs_.erase(it); + ShutdownIfNeeded(); } } diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index 95255c17b..97957f909 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -46,6 +46,11 @@ class ReferenceCounter { ~ReferenceCounter() {} + /// Wait for all object references to go out of scope, and then shutdown. + /// + /// \param shutdown The shutdown callback to call. + void DrainAndShutdown(std::function shutdown); + /// Increase the reference count for the ObjectID by one. If there is no /// entry for the ObjectID, one will be created. The object ID will not have /// any owner information, since we don't know how it was created. 
@@ -365,6 +370,10 @@ class ReferenceCounter { using ReferenceTable = absl::flat_hash_map; + /// Shutdown if all references have gone out of scope and shutdown + /// is scheduled. + void ShutdownIfNeeded() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + /// Deserialize a ReferenceTable. static ReferenceTable ReferenceTableFromProto(const ReferenceTableProto &proto); @@ -487,6 +496,10 @@ class ReferenceCounter { /// Holds all reference counts and dependency information for tracked ObjectIDs. ReferenceTable object_id_refs_ GUARDED_BY(mutex_); + + /// Optional shutdown hook to call when all references have gone + /// out of scope. + std::function shutdown_hook_ GUARDED_BY(mutex_) = nullptr; }; } // namespace ray diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 1acbcacfa..da463753a 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -74,15 +74,22 @@ void TaskManager::AddPendingTask(const TaskID &caller_id, } void TaskManager::DrainAndShutdown(std::function shutdown) { - absl::MutexLock lock(&mu_); - if (pending_tasks_.empty()) { - shutdown(); - } else { - RAY_LOG(WARNING) - << "This worker is still managing " << pending_tasks_.size() - << " in flight tasks, waiting for them to finish before shutting down."; + bool has_pending_tasks = false; + { + absl::MutexLock lock(&mu_); + if (!pending_tasks_.empty()) { + has_pending_tasks = true; + RAY_LOG(WARNING) + << "This worker is still managing " << pending_tasks_.size() + << " in flight tasks, waiting for them to finish before shutting down."; + shutdown_hook_ = shutdown; + } + } + + // Do not hold the lock when calling into the reference counter. 
+ if (!has_pending_tasks) { + reference_counter_->DrainAndShutdown(shutdown); } - shutdown_hook_ = shutdown; } bool TaskManager::IsTaskPending(const TaskID &task_id) const { @@ -201,10 +208,18 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_ } void TaskManager::ShutdownIfNeeded() { - absl::MutexLock lock(&mu_); - if (shutdown_hook_ && pending_tasks_.empty()) { - RAY_LOG(WARNING) << "All in flight tasks finished, shutting down worker."; - shutdown_hook_(); + std::function shutdown_hook = nullptr; + { + absl::MutexLock lock(&mu_); + if (shutdown_hook_ && pending_tasks_.empty()) { + RAY_LOG(WARNING) << "All in flight tasks finished, worker will shut down after " + "draining references."; + std::swap(shutdown_hook_, shutdown_hook); + } + } + // Do not hold the lock when calling into the reference counter. + if (shutdown_hook != nullptr) { + reference_counter_->DrainAndShutdown(shutdown_hook); } }