mirror of
https://github.com/wassname/talk.git
synced 2026-06-28 18:30:01 +08:00
194 lines
5.9 KiB
JavaScript
194 lines
5.9 KiB
JavaScript
const CommentModel = require('../../../models/comment');
|
|
const ActionsService = require('../../../services/actions');
|
|
const {arrayJoinBy, singleJoinBy} = require('../../../graph/loaders/util');
|
|
const sc = require('snake-case');
|
|
const debug = require('debug')('talk:cli:verify');
|
|
|
|
/**
 * Loads one page of comments for verification.
 *
 * Only the fields compared by the verifier (`id`, `action_counts` and
 * `reply_count`) are selected, and the page is ordered by `created_at`.
 *
 * @param {number} limit - maximum number of comments to load for this page
 * @param {number} offset - number of comments to skip before the page starts
 * @returns {Promise} resolves with the result of the query (callers treat it
 *   as an array of comments)
 */
const getBatch = async (limit, offset) => {
  const projection = {id: 1, action_counts: 1, reply_count: 1};

  return CommentModel
    .find({})
    .select(projection)
    .limit(limit)
    .skip(offset)
    .sort('created_at');
};
|
|
|
|
module.exports = async ({fix, limit, batch}) => {
|
|
let operations = [];
|
|
|
|
// Count how many comments there are to process.
|
|
const totalCount = await CommentModel.count();
|
|
|
|
let offset = 0;
|
|
let comments = [];
|
|
let commentIDs = [];
|
|
|
|
console.log(`Processing ${totalCount} comments in batches of ${limit}...`);
|
|
|
|
// Keep processing documents until there are is none left.
|
|
while (offset < totalCount) {
|
|
|
|
// Get a batch of comments.
|
|
comments = await getBatch(batch, offset);
|
|
commentIDs = comments.map(({id}) => id);
|
|
|
|
// Get their reply counts.
|
|
let allReplyCounts = await CommentModel
|
|
.aggregate([
|
|
{
|
|
$match: {
|
|
parent_id: {
|
|
$in: commentIDs,
|
|
},
|
|
status: {
|
|
$in: ['NONE', 'ACCEPTED']
|
|
}
|
|
}
|
|
},
|
|
{
|
|
$group: {
|
|
_id: '$parent_id',
|
|
count: {
|
|
$sum: 1
|
|
}
|
|
}
|
|
}
|
|
])
|
|
.then(singleJoinBy(commentIDs, '_id'))
|
|
.then((results) => results.map((result) => result ? result.count : 0));
|
|
|
|
// Get their action summaries.
|
|
let allActionSummaries = await ActionsService
|
|
.getActionSummaries(commentIDs)
|
|
.then(arrayJoinBy(commentIDs, 'item_id'));
|
|
|
|
// Loop over the comments, with their action summaries.
|
|
for (let i = 0; i < comments.length; i++) {
|
|
let comment = comments[i];
|
|
let actionSummaries = allActionSummaries[i];
|
|
let replyCount = allReplyCounts[i];
|
|
|
|
// And check to see if the action summaries we just computed match what is
|
|
// currently set for the comments.
|
|
let commentOperations = [];
|
|
|
|
// If the reply count needs to be updated, then update it!
|
|
if (comment.reply_count !== replyCount) {
|
|
commentOperations.push({
|
|
reply_count: replyCount,
|
|
});
|
|
}
|
|
|
|
// First we process all the group id's.
|
|
for (let actionSummary of actionSummaries) {
|
|
if (actionSummary.group_id === null) {
|
|
continue;
|
|
}
|
|
|
|
// And we generate the group id.
|
|
const ACTION_TYPE = sc(actionSummary.action_type.toLowerCase());
|
|
const GROUP_ID = sc(actionSummary.group_id.toLowerCase());
|
|
|
|
if (GROUP_ID.length <= 0) {
|
|
continue;
|
|
}
|
|
|
|
// And we add a new batch operation if the action summary is associated
|
|
// with a group.
|
|
const ACTION_COUNT_FIELD = `${ACTION_TYPE}_${GROUP_ID}`;
|
|
|
|
// Check that the action summaries match the cached counts.
|
|
if (!comment.action_counts || !(ACTION_COUNT_FIELD in comment.action_counts) || comment.action_counts[ACTION_COUNT_FIELD] !== actionSummary.count) {
|
|
|
|
// Batch updates for those changes.
|
|
commentOperations.push({
|
|
[`action_counts.${ACTION_COUNT_FIELD}`]: actionSummary.count,
|
|
});
|
|
}
|
|
}
|
|
|
|
// Group all the action summaries together from all the different group
|
|
// ids.
|
|
let groupedActionSummaries = actionSummaries.reduce((acc, actionSummary) => {
|
|
const ACTION_TYPE = sc(actionSummary.action_type.toLowerCase());
|
|
|
|
if (!(ACTION_TYPE in acc)) {
|
|
acc[ACTION_TYPE] = 0;
|
|
}
|
|
|
|
acc[ACTION_TYPE] += actionSummary.count;
|
|
|
|
return acc;
|
|
}, {});
|
|
|
|
for (const ACTION_COUNT_FIELD of Object.keys(groupedActionSummaries)) {
|
|
const count = groupedActionSummaries[ACTION_COUNT_FIELD];
|
|
|
|
// Check that the action summaries match the cached counts.
|
|
if (!comment.action_counts || !(ACTION_COUNT_FIELD in comment.action_counts) || comment.action_counts[ACTION_COUNT_FIELD] !== count) {
|
|
|
|
// Batch updates for those changes.
|
|
commentOperations.push({
|
|
[`action_counts.${ACTION_COUNT_FIELD}`]: count,
|
|
});
|
|
}
|
|
}
|
|
|
|
// If this comment has action summaries that should be updated, then
|
|
// perform an update!
|
|
if (commentOperations.length > 0) {
|
|
operations.push({
|
|
updateOne: {
|
|
filter: {
|
|
id: comment.id
|
|
},
|
|
update: {
|
|
$set: Object.assign({}, ...commentOperations),
|
|
},
|
|
},
|
|
});
|
|
}
|
|
}
|
|
|
|
debug(`Processed batch of ${comments.length} comments.`);
|
|
|
|
if (operations.length >= limit) {
|
|
debug(`Queued operations are ${operations.length}, reached limit of ${limit}, not processing any more.`);
|
|
|
|
if (operations.length > limit) {
|
|
debug(`${operations.length - limit} operations have been truncated to enforce the limit`);
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
offset += batch;
|
|
}
|
|
|
|
const OPERATIONS_LENGTH = operations.length;
|
|
|
|
if (limit < Infinity && offset + comments.length < totalCount) {
|
|
console.log(`Processed ${offset + comments.length}/${totalCount} comments because we reached the update limit of ${limit}.`);
|
|
} else {
|
|
console.log(`Processed all ${totalCount} comments.`);
|
|
}
|
|
|
|
console.log(`${OPERATIONS_LENGTH} documents need fixing.`);
|
|
|
|
// If fix was enabled, execute the batch writes.
|
|
if (OPERATIONS_LENGTH > 0) {
|
|
if (fix) {
|
|
debug(`Fixing ${OPERATIONS_LENGTH} documents...`);
|
|
|
|
while (operations.length) {
|
|
let batchOperations = operations.splice(0, batch);
|
|
let result = await CommentModel.collection.bulkWrite(batchOperations);
|
|
|
|
debug(`Fixed batch of ${result.modifiedCount} documents.`);
|
|
}
|
|
|
|
console.log(`Applied all ${OPERATIONS_LENGTH} fixes.`);
|
|
} else {
|
|
console.warn('Skipping fixing, --fix was not enabled, pass --fix to fix these errors');
|
|
}
|
|
}
|
|
};
|