Files
talk/bin/verifications/database/comments.js
T
2017-08-28 14:06:54 -06:00

194 lines
5.9 KiB
JavaScript

const CommentModel = require('../../../models/comment');
const ActionsService = require('../../../services/actions');
const {arrayJoinBy, singleJoinBy} = require('../../../graph/loaders/util');
const sc = require('snake-case');
const debug = require('debug')('talk:cli:verify');
/**
 * Loads one page of comments, ordered by `created_at`, selecting only the
 * fields the verification needs (`id`, `action_counts` and `reply_count`).
 *
 * @param {number} limit - maximum number of comments to return.
 * @param {number} offset - number of comments to skip before this page.
 * @returns {Promise} resolves with whatever the built query yields.
 */
const getBatch = async (limit, offset) => {
  // Only the cached counters (and the id used to address the document) are
  // compared by the verifier, so nothing else is selected.
  const projection = {id: 1, action_counts: 1, reply_count: 1};
  const query = CommentModel.find({}).select(projection);

  return query
    .limit(limit)
    .skip(offset)
    .sort('created_at');
};
module.exports = async ({fix, limit, batch}) => {
let operations = [];
// Count how many comments there are to process.
const totalCount = await CommentModel.count();
let offset = 0;
let comments = [];
let commentIDs = [];
console.log(`Processing ${totalCount} comments in batches of ${limit}...`);
// Keep processing documents until there are is none left.
while (offset < totalCount) {
// Get a batch of comments.
comments = await getBatch(batch, offset);
commentIDs = comments.map(({id}) => id);
// Get their reply counts.
let allReplyCounts = await CommentModel
.aggregate([
{
$match: {
parent_id: {
$in: commentIDs,
},
status: {
$in: ['NONE', 'ACCEPTED']
}
}
},
{
$group: {
_id: '$parent_id',
count: {
$sum: 1
}
}
}
])
.then(singleJoinBy(commentIDs, '_id'))
.then((results) => results.map((result) => result ? result.count : 0));
// Get their action summaries.
let allActionSummaries = await ActionsService
.getActionSummaries(commentIDs)
.then(arrayJoinBy(commentIDs, 'item_id'));
// Loop over the comments, with their action summaries.
for (let i = 0; i < comments.length; i++) {
let comment = comments[i];
let actionSummaries = allActionSummaries[i];
let replyCount = allReplyCounts[i];
// And check to see if the action summaries we just computed match what is
// currently set for the comments.
let commentOperations = [];
// If the reply count needs to be updated, then update it!
if (comment.reply_count !== replyCount) {
commentOperations.push({
reply_count: replyCount,
});
}
// First we process all the group id's.
for (let actionSummary of actionSummaries) {
if (actionSummary.group_id === null) {
continue;
}
// And we generate the group id.
const ACTION_TYPE = sc(actionSummary.action_type.toLowerCase());
const GROUP_ID = sc(actionSummary.group_id.toLowerCase());
if (GROUP_ID.length <= 0) {
continue;
}
// And we add a new batch operation if the action summary is associated
// with a group.
const ACTION_COUNT_FIELD = `${ACTION_TYPE}_${GROUP_ID}`;
// Check that the action summaries match the cached counts.
if (!comment.action_counts || !(ACTION_COUNT_FIELD in comment.action_counts) || comment.action_counts[ACTION_COUNT_FIELD] !== actionSummary.count) {
// Batch updates for those changes.
commentOperations.push({
[`action_counts.${ACTION_COUNT_FIELD}`]: actionSummary.count,
});
}
}
// Group all the action summaries together from all the different group
// ids.
let groupedActionSummaries = actionSummaries.reduce((acc, actionSummary) => {
const ACTION_TYPE = sc(actionSummary.action_type.toLowerCase());
if (!(ACTION_TYPE in acc)) {
acc[ACTION_TYPE] = 0;
}
acc[ACTION_TYPE] += actionSummary.count;
return acc;
}, {});
for (const ACTION_COUNT_FIELD of Object.keys(groupedActionSummaries)) {
const count = groupedActionSummaries[ACTION_COUNT_FIELD];
// Check that the action summaries match the cached counts.
if (!comment.action_counts || !(ACTION_COUNT_FIELD in comment.action_counts) || comment.action_counts[ACTION_COUNT_FIELD] !== count) {
// Batch updates for those changes.
commentOperations.push({
[`action_counts.${ACTION_COUNT_FIELD}`]: count,
});
}
}
// If this comment has action summaries that should be updated, then
// perform an update!
if (commentOperations.length > 0) {
operations.push({
updateOne: {
filter: {
id: comment.id
},
update: {
$set: Object.assign({}, ...commentOperations),
},
},
});
}
}
debug(`Processed batch of ${comments.length} comments.`);
if (operations.length >= limit) {
debug(`Queued operations are ${operations.length}, reached limit of ${limit}, not processing any more.`);
if (operations.length > limit) {
debug(`${operations.length - limit} operations have been truncated to enforce the limit`);
}
break;
}
offset += batch;
}
const OPERATIONS_LENGTH = operations.length;
if (limit < Infinity && offset + comments.length < totalCount) {
console.log(`Processed ${offset + comments.length}/${totalCount} comments because we reached the update limit of ${limit}.`);
} else {
console.log(`Processed all ${totalCount} comments.`);
}
console.log(`${OPERATIONS_LENGTH} documents need fixing.`);
// If fix was enabled, execute the batch writes.
if (OPERATIONS_LENGTH > 0) {
if (fix) {
debug(`Fixing ${OPERATIONS_LENGTH} documents...`);
while (operations.length) {
let batchOperations = operations.splice(0, batch);
let result = await CommentModel.collection.bulkWrite(batchOperations);
debug(`Fixed batch of ${result.modifiedCount} documents.`);
}
console.log(`Applied all ${OPERATIONS_LENGTH} fixes.`);
} else {
console.warn('Skipping fixing, --fix was not enabled, pass --fix to fix these errors');
}
}
};