mirror of
https://github.com/wassname/ray.git
synced 2026-07-03 03:27:50 +08:00
Enable distributed ref counting by default (#7628)
* enable * Turn on eager eviction * Shorten tests and drain ReferenceCounter * Don't force kill actor handles that have gone out of scope, lint * Fix locks * Cleanup Plasma Async Callback (#7452) * [rllib][tune] fix some nans (#7611) * Change /tmp to platform-specific temporary directory (#7529) * [Serve] UI Improvements (#7569) * bugfix about test_dynres.py (#7615) Co-authored-by: senlin.zsl <senlin.zsl@antfin.com> * Java call Python actor method use actor.call (#7614) * bug fix about useage of absl::flat_hash_map::erase and absl::flat_hash_set::erase (#7633) Co-authored-by: senlin.zsl <senlin.zsl@antfin.com> * [Java] Make both `RayActor` and `RayPyActor` inheriting from `BaseActor` (#7462) * [Java] Fix the issue that the cached value in `RayObject` is serialized (#7613) * Add failure tests to test_reference_counting (#7400) * Fix typo in asyncio documentation (#7602) * Fix segfault * debug * Force kill actor * Fix test
This commit is contained in:
@@ -22,6 +22,8 @@ def get_default_fixure_internal_config():
|
||||
internal_config = json.dumps({
|
||||
"initial_reconstruction_timeout_milliseconds": 200,
|
||||
"num_heartbeats_timeout": 10,
|
||||
"object_store_full_max_retries": 3,
|
||||
"object_store_full_initial_delay_ms": 100,
|
||||
})
|
||||
return internal_config
|
||||
|
||||
|
||||
@@ -112,6 +112,7 @@ def test_global_gc_when_full(shutdown_only):
|
||||
|
||||
# Remote workers.
|
||||
actors = [GarbageHolder.remote() for _ in range(2)]
|
||||
assert all(ray.get([a.has_garbage.remote() for a in actors]))
|
||||
|
||||
# GC should be triggered for all workers, including the local driver,
|
||||
# when a remote task tries to put a return value that doesn't fit in
|
||||
|
||||
@@ -18,16 +18,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
@pytest.fixture
|
||||
def one_worker_100MiB(request):
|
||||
config = json.dumps({
|
||||
"distributed_ref_counting_enabled": 1,
|
||||
"task_retry_delay_ms": 0,
|
||||
"object_store_full_max_retries": 3,
|
||||
"object_store_full_initial_delay_ms": 100,
|
||||
})
|
||||
yield ray.init(
|
||||
num_cpus=1,
|
||||
object_store_memory=100 * 1024 * 1024,
|
||||
_internal_config=config)
|
||||
yield ray.init(num_cpus=1, object_store_memory=100 * 1024 * 1024)
|
||||
ray.shutdown()
|
||||
|
||||
|
||||
|
||||
@@ -72,7 +72,7 @@ RAY_CONFIG(bool, object_pinning_enabled, true)
|
||||
/// cluster and all objects that contain it are also out of scope. If this flag
|
||||
/// is off and object_pinning_enabled is turned on, then an object will not be
|
||||
/// LRU evicted until it is out of scope on the CREATOR of the ObjectID.
|
||||
RAY_CONFIG(bool, distributed_ref_counting_enabled, false)
|
||||
RAY_CONFIG(bool, distributed_ref_counting_enabled, true)
|
||||
|
||||
/// Whether to record the creation sites of object references. This adds more
|
||||
/// information to `ray memstat`, but introduces a little extra overhead when
|
||||
@@ -91,7 +91,7 @@ RAY_CONFIG(bool, record_ref_creation_sites, true)
|
||||
/// NOTE(swang): The timer is checked by the raylet during every heartbeat, so
|
||||
/// this should be set to a value larger than
|
||||
/// raylet_heartbeat_timeout_milliseconds.
|
||||
RAY_CONFIG(int64_t, free_objects_period_milliseconds, -1)
|
||||
RAY_CONFIG(int64_t, free_objects_period_milliseconds, 1000)
|
||||
|
||||
/// If object_pinning_enabled is on, then objects that have been unpinned are
|
||||
/// added to a local cache. When the cache is flushed, all objects in the cache
|
||||
|
||||
@@ -288,6 +288,9 @@ void CoreWorker::Disconnect() {
|
||||
}
|
||||
|
||||
void CoreWorker::Exit(bool intentional) {
|
||||
RAY_LOG(INFO)
|
||||
<< "Exit signal " << (intentional ? "(intentional)" : "")
|
||||
<< " received, this process will exit after all outstanding tasks have finished";
|
||||
exiting_ = true;
|
||||
// Release the resources early in case draining takes a long time.
|
||||
RAY_CHECK_OK(local_raylet_client_->NotifyDirectCallTaskBlocked());
|
||||
|
||||
@@ -29,6 +29,26 @@ namespace {} // namespace
|
||||
|
||||
namespace ray {
|
||||
|
||||
/// Run `shutdown` once this worker no longer tracks any object references.
///
/// If the reference table is already empty, the callback is invoked
/// immediately (while `mutex_` is still held). Otherwise a warning is logged
/// and the callback is stored in `shutdown_hook_`, so that ShutdownIfNeeded()
/// can invoke it after the last reference has been erased.
///
/// \param shutdown The shutdown callback to call.
void ReferenceCounter::DrainAndShutdown(std::function<void()> shutdown) {
  absl::MutexLock lock(&mutex_);
  if (object_id_refs_.empty()) {
    // Nothing left to drain. Deliberately do NOT record the hook on this
    // path: if it were stored, a reference that is added and deleted later
    // would make ShutdownIfNeeded() fire the callback a second time.
    shutdown();
    return;
  }

  RAY_LOG(WARNING)
      << "This worker is still managing " << object_id_refs_.size()
      << " objects, waiting for them to go out of scope before shutting down.";
  // Deferred shutdown: ShutdownIfNeeded() runs the hook once the table drains.
  shutdown_hook_ = shutdown;
}
|
||||
|
||||
void ReferenceCounter::ShutdownIfNeeded() {
|
||||
if (shutdown_hook_ && object_id_refs_.empty()) {
|
||||
RAY_LOG(WARNING)
|
||||
<< "All object references have gone out of scope, shutting down worker.";
|
||||
shutdown_hook_();
|
||||
}
|
||||
}
|
||||
|
||||
ReferenceCounter::ReferenceTable ReferenceCounter::ReferenceTableFromProto(
|
||||
const ReferenceTableProto &proto) {
|
||||
ReferenceTable refs;
|
||||
@@ -342,6 +362,7 @@ void ReferenceCounter::DeleteReferenceInternal(ReferenceTable::iterator it,
|
||||
if (should_delete_reference) {
|
||||
RAY_LOG(DEBUG) << "Deleting Reference to object " << id;
|
||||
object_id_refs_.erase(it);
|
||||
ShutdownIfNeeded();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -46,6 +46,11 @@ class ReferenceCounter {
|
||||
|
||||
~ReferenceCounter() {}
|
||||
|
||||
/// Wait for all object references to go out of scope, and then shutdown.
|
||||
///
|
||||
/// \param shutdown The shutdown callback to call.
|
||||
void DrainAndShutdown(std::function<void()> shutdown);
|
||||
|
||||
/// Increase the reference count for the ObjectID by one. If there is no
|
||||
/// entry for the ObjectID, one will be created. The object ID will not have
|
||||
/// any owner information, since we don't know how it was created.
|
||||
@@ -365,6 +370,10 @@ class ReferenceCounter {
|
||||
|
||||
using ReferenceTable = absl::flat_hash_map<ObjectID, Reference>;
|
||||
|
||||
/// Shutdown if all references have gone out of scope and shutdown
|
||||
/// is scheduled.
|
||||
void ShutdownIfNeeded() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
|
||||
|
||||
/// Deserialize a ReferenceTable.
|
||||
static ReferenceTable ReferenceTableFromProto(const ReferenceTableProto &proto);
|
||||
|
||||
@@ -487,6 +496,10 @@ class ReferenceCounter {
|
||||
|
||||
/// Holds all reference counts and dependency information for tracked ObjectIDs.
|
||||
ReferenceTable object_id_refs_ GUARDED_BY(mutex_);
|
||||
|
||||
/// Optional shutdown hook to call when all references have gone
|
||||
/// out of scope.
|
||||
std::function<void()> shutdown_hook_ GUARDED_BY(mutex_) = nullptr;
|
||||
};
|
||||
|
||||
} // namespace ray
|
||||
|
||||
@@ -74,15 +74,22 @@ void TaskManager::AddPendingTask(const TaskID &caller_id,
|
||||
}
|
||||
|
||||
void TaskManager::DrainAndShutdown(std::function<void()> shutdown) {
|
||||
absl::MutexLock lock(&mu_);
|
||||
if (pending_tasks_.empty()) {
|
||||
shutdown();
|
||||
} else {
|
||||
RAY_LOG(WARNING)
|
||||
<< "This worker is still managing " << pending_tasks_.size()
|
||||
<< " in flight tasks, waiting for them to finish before shutting down.";
|
||||
bool has_pending_tasks = false;
|
||||
{
|
||||
absl::MutexLock lock(&mu_);
|
||||
if (!pending_tasks_.empty()) {
|
||||
has_pending_tasks = true;
|
||||
RAY_LOG(WARNING)
|
||||
<< "This worker is still managing " << pending_tasks_.size()
|
||||
<< " in flight tasks, waiting for them to finish before shutting down.";
|
||||
shutdown_hook_ = shutdown;
|
||||
}
|
||||
}
|
||||
|
||||
// Do not hold the lock when calling into the reference counter.
|
||||
if (!has_pending_tasks) {
|
||||
reference_counter_->DrainAndShutdown(shutdown);
|
||||
}
|
||||
shutdown_hook_ = shutdown;
|
||||
}
|
||||
|
||||
bool TaskManager::IsTaskPending(const TaskID &task_id) const {
|
||||
@@ -201,10 +208,18 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_
|
||||
}
|
||||
|
||||
void TaskManager::ShutdownIfNeeded() {
|
||||
absl::MutexLock lock(&mu_);
|
||||
if (shutdown_hook_ && pending_tasks_.empty()) {
|
||||
RAY_LOG(WARNING) << "All in flight tasks finished, shutting down worker.";
|
||||
shutdown_hook_();
|
||||
std::function<void()> shutdown_hook = nullptr;
|
||||
{
|
||||
absl::MutexLock lock(&mu_);
|
||||
if (shutdown_hook_ && pending_tasks_.empty()) {
|
||||
RAY_LOG(WARNING) << "All in flight tasks finished, worker will shut down after "
|
||||
"draining references.";
|
||||
std::swap(shutdown_hook_, shutdown_hook);
|
||||
}
|
||||
}
|
||||
// Do not hold the lock when calling into the reference counter.
|
||||
if (shutdown_hook != nullptr) {
|
||||
reference_counter_->DrainAndShutdown(shutdown_hook);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user