mirror of
https://github.com/wassname/ray.git
synced 2026-07-02 19:22:51 +08:00
Fix actor_registry_ copied on each heartbeat; Improve receive object chunk debug messages (#12187)
This commit is contained in:
@@ -770,14 +770,17 @@ ray::Status ObjectManager::ReceiveObjectChunk(const NodeID &client_id,
|
||||
chunk_index);
|
||||
ray::Status status;
|
||||
ObjectBufferPool::ChunkInfo chunk_info = chunk_status.first;
|
||||
num_chunks_received_total_++;
|
||||
if (chunk_status.second.ok()) {
|
||||
// Avoid handling this chunk if it's already being handled by another process.
|
||||
std::memcpy(chunk_info.data, data.data(), chunk_info.buffer_length);
|
||||
buffer_pool_.SealChunk(object_id, chunk_index);
|
||||
} else {
|
||||
RAY_LOG(WARNING) << "ReceiveObjectChunk index " << chunk_index << " of object "
|
||||
<< object_id << " failed: " << chunk_status.second.message();
|
||||
// TODO(hme): If the object isn't local, create a pull request for this chunk.
|
||||
num_chunks_received_failed_++;
|
||||
RAY_LOG(INFO) << "ReceiveObjectChunk index " << chunk_index << " of object "
|
||||
<< object_id << " failed: " << chunk_status.second.message()
|
||||
<< ", overall " << num_chunks_received_failed_ << "/"
|
||||
<< num_chunks_received_total_ << " failed";
|
||||
}
|
||||
return status;
|
||||
}
|
||||
@@ -899,6 +902,8 @@ std::string ObjectManager::DebugString() const {
|
||||
result << "\n- num unfulfilled push requests: " << unfulfilled_push_requests_.size();
|
||||
result << "\n- num pull requests: " << pull_requests_.size();
|
||||
result << "\n- num buffered profile events: " << profile_events_.size();
|
||||
result << "\n- num chunks received total: " << num_chunks_received_total_;
|
||||
result << "\n- num chunks received failed: " << num_chunks_received_failed_;
|
||||
result << "\n" << push_manager_->DebugString();
|
||||
result << "\n" << object_directory_->DebugString();
|
||||
result << "\n" << store_notification_->DebugString();
|
||||
|
||||
@@ -481,6 +481,12 @@ class ObjectManager : public ObjectManagerInterface,
|
||||
|
||||
/// Running sum of the amount of memory used in the object store.
|
||||
int64_t used_memory_ = 0;
|
||||
|
||||
/// Running total of received chunks.
|
||||
int64_t num_chunks_received_total_ = 0;
|
||||
|
||||
/// Running total of received chunks that failed (duplicated).
|
||||
int64_t num_chunks_received_failed_ = 0;
|
||||
};
|
||||
|
||||
} // namespace ray
|
||||
|
||||
@@ -40,25 +40,6 @@ struct ActorStats {
|
||||
int restarting_actors = 0;
|
||||
};
|
||||
|
||||
/// A helper function to return the statistical data of actors in this node manager.
|
||||
ActorStats GetActorStatisticalData(
|
||||
std::unordered_map<ray::ActorID, ray::raylet::ActorRegistration> actor_registry) {
|
||||
ActorStats item;
|
||||
/* TODO(ekl) this gets slower and slower over time since we never clean up dead actors.
|
||||
* https://github.com/ray-project/ray/issues/11239
|
||||
for (auto &pair : actor_registry) {
|
||||
if (pair.second.GetState() == ray::rpc::ActorTableData::ALIVE) {
|
||||
item.live_actors += 1;
|
||||
} else if (pair.second.GetState() == ray::rpc::ActorTableData::RESTARTING) {
|
||||
item.restarting_actors += 1;
|
||||
} else {
|
||||
item.dead_actors += 1;
|
||||
}
|
||||
}
|
||||
*/
|
||||
return item;
|
||||
}
|
||||
|
||||
inline ray::rpc::ObjectReference FlatbufferToSingleObjectReference(
|
||||
const flatbuffers::String &object_id, const ray::protocol::Address &address) {
|
||||
ray::rpc::ObjectReference ref;
|
||||
@@ -3058,12 +3039,14 @@ std::string NodeManager::DebugString() const {
|
||||
result << "\nnum async plasma notifications: "
|
||||
<< async_plasma_objects_notification_.size();
|
||||
}
|
||||
/* Disabled for now #11239.
|
||||
result << "\nActorRegistry:";
|
||||
|
||||
auto statistical_data = GetActorStatisticalData(actor_registry_);
|
||||
result << "\n- num live actors: " << statistical_data.live_actors;
|
||||
result << "\n- num restarting actors: " << statistical_data.restarting_actors;
|
||||
result << "\n- num dead actors: " << statistical_data.dead_actors;
|
||||
*/
|
||||
|
||||
result << "\nRemote node managers: ";
|
||||
for (const auto &entry : remote_node_manager_addresses_) {
|
||||
@@ -3398,9 +3381,11 @@ void NodeManager::RecordMetrics() {
|
||||
object_manager_.RecordMetrics();
|
||||
local_queues_.RecordMetrics();
|
||||
|
||||
/* Disabled for now #11239.
|
||||
auto statistical_data = GetActorStatisticalData(actor_registry_);
|
||||
stats::LiveActors().Record(statistical_data.live_actors);
|
||||
stats::RestartingActors().Record(statistical_data.restarting_actors);
|
||||
*/
|
||||
}
|
||||
|
||||
bool NodeManager::ReturnBundleResources(const BundleSpecification &bundle_spec) {
|
||||
|
||||
Reference in New Issue
Block a user