diff --git a/bin/cli b/bin/cli index 6237d4740..a0a0a24a0 100755 --- a/bin/cli +++ b/bin/cli @@ -18,7 +18,6 @@ program .command('token', 'work with the access tokens') .command('users', 'work with the application auth') .command('migration', 'provides utilities for migrating the database') - .command('verify', 'provides utilities for performing data verification') .command( 'plugins', 'provides utilities for interacting with the plugin system' diff --git a/bin/cli-verify b/bin/cli-verify deleted file mode 100755 index e32c7d169..000000000 --- a/bin/cli-verify +++ /dev/null @@ -1,58 +0,0 @@ -#!/usr/bin/env node - -/** - * Module dependencies. - */ - -const util = require('./util'); -const program = require('commander'); -const mongoose = require('../services/mongoose'); -const databaseVerifications = require('./verifications/database'); - -// Register the shutdown criteria. -util.onshutdown([() => mongoose.disconnect()]); - -async function database({ fix = false, limit = Infinity, batch = 1000 }) { - try { - for (const verification of databaseVerifications) { - await verification({ fix, limit, batch }); - } - } catch (err) { - console.error( - `Failed to process all the ${databaseVerifications.length} verifications`, - err - ); - util.shutdown(1); - return; - } - - util.shutdown(); -} - -//============================================================================== -// Setting up the program command line arguments. 
-//============================================================================== - -program - .command('db') - .description('verifies the database integrity') - .option('-f, --fix', 'fix the problems found with database inconsistencies') - .option( - '-l, --limit [size]', - 'limit the amount of documents to process in a single pass, this will ensure only a maximum number of batch operations are issued [default: inf]', - parseInt - ) - .option( - '-b, --batch [size]', - 'batch size to process verifications and repairs of documents [default: 1000]', - parseInt - ) - .action(database); - -program.parse(process.argv); - -// If there is no command listed, output help. -if (!process.argv.slice(2).length) { - program.outputHelp(); - util.shutdown(); -} diff --git a/bin/verifications/database/action_counts.js b/bin/verifications/database/action_counts.js deleted file mode 100644 index 684c17bca..000000000 --- a/bin/verifications/database/action_counts.js +++ /dev/null @@ -1,194 +0,0 @@ -const UserModel = require('../../../models/user'); -const CommentModel = require('../../../models/comment'); -const ActionsService = require('../../../services/actions'); -const { arrayJoinBy } = require('../../../graph/loaders/util'); -const { get } = require('lodash'); -const debug = require('debug')('talk:cli:verify'); - -const MODELS = [UserModel, CommentModel]; - -async function processBatch(Model, documents) { - // Get an array of all the document id's. - const documentIDs = documents.map(({ id }) => id); - - // Store all the operations on this batch in this array that we'll return - // later. - const operations = []; - - // Get the action summaries for this batch. - const totalActionSummaries = await ActionsService.getActionSummaries( - documentIDs - ).then(arrayJoinBy(documentIDs, 'item_id')); - - // Iterate over the documents. 
- for (let i = 0; i < documents.length; i++) { - const document = documents[i]; - const actionSummaries = totalActionSummaries[i]; - - let ops = []; - - for (const actionSummary of actionSummaries) { - if (actionSummary.group_id === null) { - continue; - } - - // And we generate the group id. - const ACTION_TYPE = actionSummary.action_type.toLowerCase(); - const GROUP_ID = actionSummary.group_id.toLowerCase(); - - if (GROUP_ID.length <= 0) { - continue; - } - - // And we add a new batch operation if the action summary is associated - // with a group. - const ACTION_COUNT_FIELD = `${ACTION_TYPE}_${GROUP_ID}`; - - // Check that the action summaries match the cached counts. - if ( - get(document, ['action_counts', ACTION_COUNT_FIELD]) !== - actionSummary.count - ) { - // Batch updates for those changes. - ops.push({ - [`action_counts.${ACTION_COUNT_FIELD}`]: actionSummary.count, - }); - } - } - - // Group all the action summaries together from all the different group - // ids. - const groupedActionSummaries = actionSummaries.reduce( - (acc, actionSummary) => { - // action_type is already snake cased (as it would have had to be when it - // was inserted in the database). - const ACTION_TYPE = actionSummary.action_type.toLowerCase(); - - if (!(ACTION_TYPE in acc)) { - acc[ACTION_TYPE] = 0; - } - - acc[ACTION_TYPE] += actionSummary.count; - - return acc; - }, - {} - ); - - for (const ACTION_COUNT_FIELD of Object.keys(groupedActionSummaries)) { - const count = groupedActionSummaries[ACTION_COUNT_FIELD]; - - // Check that the action summaries match the cached counts. - if (get(document, ['action_counts', ACTION_COUNT_FIELD]) !== count) { - // Batch updates for those changes. - ops.push({ - [`action_counts.${ACTION_COUNT_FIELD}`]: count, - }); - } - } - - // If this comment has action summaries that should be updated, then - // perform an update! 
- if (ops.length > 0) { - operations.push({ - updateOne: { - filter: { - id: document.id, - }, - update: { - $set: Object.assign({}, ...ops), - }, - }, - }); - } - } - - return operations; -} - -module.exports = async ({ fix, batch }) => { - for (const Model of MODELS) { - const cursor = Model.collection - .find({}) - .project({ - id: 1, - action_counts: 1, - }) - .sort({ created_at: 1 }); - - let operations = []; - let documents = []; - - // While there are documents to process. - while (await cursor.hasNext()) { - // Load the document. - const document = await cursor.next(); - - // Push the document into the documents array. - documents.push(document); - - // Check to see if the length of the documents array requires us to - // process it. - if (documents.length > batch) { - // Process this batch. - let batchOperations = await processBatch(Model, documents); - - // Push the batch operations into the model operations. - operations.push(...batchOperations); - - // Clear this batch contents. - documents = []; - } - } - - // Check to see if there are any documents left over. - if (documents.length > 0) { - // Process this batch. - let batchOperations = await processBatch(Model, documents); - - // Push the batch operations into the model operations. - operations.push(...batchOperations); - } - - const OPERATIONS_LENGTH = operations.length; - - console.log( - `action_counts.js: ${OPERATIONS_LENGTH} ${ - Model.collection.name - } need their action counts fixed.` - ); - - // If fix was enabled, execute the batch writes. 
- if (OPERATIONS_LENGTH > 0) { - if (fix) { - debug( - `action_counts.js: fixing ${OPERATIONS_LENGTH} ${ - Model.collection.name - }...` - ); - - while (operations.length) { - let result = await Model.collection.bulkWrite( - operations.splice(0, batch) - ); - - debug( - `action_counts.js: fixed batch of ${result.modifiedCount} ${ - Model.collection.name - }.` - ); - } - - console.log( - `action_counts.js: applied all ${OPERATIONS_LENGTH} fixes to ${ - Model.collection.name - }.` - ); - } else { - console.warn( - 'Skipping fixing, --fix was not enabled, pass --fix to fix these errors' - ); - } - } - } -}; diff --git a/bin/verifications/database/comment_replies.js b/bin/verifications/database/comment_replies.js deleted file mode 100644 index 309023c75..000000000 --- a/bin/verifications/database/comment_replies.js +++ /dev/null @@ -1,140 +0,0 @@ -const CommentModel = require('../../../models/comment'); -const { singleJoinBy } = require('../../../graph/loaders/util'); -const debug = require('debug')('talk:cli:verify'); - -const getBatch = async (limit, offset) => - CommentModel.find({}) - .select({ id: 1, action_counts: 1, reply_count: 1 }) - .limit(limit) - .skip(offset) - .sort('created_at'); - -module.exports = async ({ fix, limit, batch }) => { - let operations = []; - - // Count how many comments there are to process. - const totalCount = await CommentModel.count(); - - let offset = 0; - let comments = []; - let commentIDs = []; - - console.log(`Processing ${totalCount} comments in batches of ${limit}...`); - - // Keep processing documents until there are is none left. - while (offset < totalCount) { - // Get a batch of comments. - comments = await getBatch(batch, offset); - commentIDs = comments.map(({ id }) => id); - - // Get their reply counts. 
- let allReplyCounts = await CommentModel.aggregate([ - { - $match: { - parent_id: { - $in: commentIDs, - }, - status: { - $in: ['NONE', 'ACCEPTED'], - }, - }, - }, - { - $group: { - _id: '$parent_id', - count: { - $sum: 1, - }, - }, - }, - ]) - .then(singleJoinBy(commentIDs, '_id')) - .then(results => results.map(result => (result ? result.count : 0))); - - // Loop over the comments, with their action summaries. - for (let i = 0; i < comments.length; i++) { - let comment = comments[i]; - let replyCount = allReplyCounts[i]; - - // And check to see if the action summaries we just computed match what is - // currently set for the comments. - let commentOperations = []; - - // If the reply count needs to be updated, then update it! - if (comment.reply_count !== replyCount) { - commentOperations.push({ - reply_count: replyCount, - }); - } - - // If this comment has action summaries that should be updated, then - // perform an update! - if (commentOperations.length > 0) { - operations.push({ - updateOne: { - filter: { - id: comment.id, - }, - update: { - $set: Object.assign({}, ...commentOperations), - }, - }, - }); - } - } - - debug(`Processed batch of ${comments.length} comments.`); - - if (operations.length >= limit) { - debug( - `Queued operations are ${ - operations.length - }, reached limit of ${limit}, not processing any more.` - ); - - if (operations.length > limit) { - debug( - `${operations.length - - limit} operations have been truncated to enforce the limit` - ); - } - - break; - } - - offset += batch; - } - - const OPERATIONS_LENGTH = operations.length; - - if (limit < Infinity && offset + comments.length < totalCount) { - console.log( - `Processed ${offset + - comments.length}/${totalCount} comments because we reached the update limit of ${limit}.` - ); - } else { - console.log(`Processed all ${totalCount} comments.`); - } - - console.log(`${OPERATIONS_LENGTH} documents need fixing.`); - - // If fix was enabled, execute the batch writes. 
- if (OPERATIONS_LENGTH > 0) { - if (fix) { - debug(`Fixing ${OPERATIONS_LENGTH} documents...`); - - while (operations.length) { - let batchOperations = operations.splice(0, batch); - let result = await CommentModel.collection.bulkWrite(batchOperations); - - debug(`Fixed batch of ${result.modifiedCount} documents.`); - } - - console.log(`Applied all ${OPERATIONS_LENGTH} fixes.`); - } else { - console.warn( - 'Skipping fixing, --fix was not enabled, pass --fix to fix these errors' - ); - } - } -}; diff --git a/bin/verifications/database/index.js b/bin/verifications/database/index.js deleted file mode 100644 index 58a46bd85..000000000 --- a/bin/verifications/database/index.js +++ /dev/null @@ -1,10 +0,0 @@ -// This will import all the verifications that should be run by the: -// -// cli verify database -// -// command. They exist in the form: -// -// async ({fix = false, batch = 1000}) => {} -// -// where their options are derived. -module.exports = [require('./comment_replies'), require('./action_counts')]; diff --git a/docs/_docs/06-01-migrating-4.md b/docs/_docs/06-01-migrating-4.md index b66921cf0..c8af6844d 100644 --- a/docs/_docs/06-01-migrating-4.md +++ b/docs/_docs/06-01-migrating-4.md @@ -40,28 +40,6 @@ documents rather than performing a nice table alter. If the process crashes during the migration, simply re-run it. The migration operations are designed to act atomically, and be idempotent to documents already updated. -## Database Verifications - -In `v3.*`, we introduced the concept of "verifying the database". Some of our -operations update cached values that live along side the original document to -improve performance. Running the cli command for verifying the database's cache -ensures that all the cached values are up to date. 
- -Running the following will start the database verification process: - -```bash -./bin/cli verify db --fix -``` -You can notice the `--fix` option, without it, the tool should instead perform -a dry run of the operations it intends to perform. -{: .code-aside} - -This process, like the migration process, should take some time to complete on -large databases. - -Once you have updated your databases, that's all you have to do! Talk should now -function even better and faster with all the new features we poured into v4.0.0! - ## Template Change In `v4.0.0`, we introduced extensive support for compressing our javascript diff --git a/migrations/1496771633_tags.js b/migrations/1496771633_tags.js index 142f10fc0..a7155e2d1 100644 --- a/migrations/1496771633_tags.js +++ b/migrations/1496771633_tags.js @@ -1,63 +1,38 @@ const CommentModel = require('../models/comment'); -const { processUpdates } = require('./utils'); -module.exports = { - async up({ queryBatchSize, updateBatchSize }) { - // Find all comments that have tags. - const cursor = await CommentModel.collection - .aggregate([ - { - $match: { - tags: { - $exists: true, - $ne: [], - }, - }, - }, - { - $project: { - id: true, - tags: true, - }, - }, - ]) - .batchSize(queryBatchSize); +// OLD +// +// [ +// { +// name: 'OFF_TOPIC', +// assigned_by: '', +// created_at: new Date() +// } +// ] - let updates = []; - while (await cursor.hasNext()) { - let { id, tags } = await cursor.next(); - - // OLD - // - // [ - // { - // name: 'OFF_TOPIC', - // assigned_by: '', - // created_at: new Date() - // } - // ] - - // NEW - // - // [ - // { - // tag: { - // name: 'OFF_TOPIC', - // permissions: { - // public: true, - // self: false, - // roles: [] - // }, - // models: ['COMMENTS'], - // created_at: new Date() - // }, - // assigned_by: '', - // created_at: new Date() - // } - // ] - - // Remap the tag structure. 
- tags = tags.map(({ name, assigned_by, created_at }) => ({ +// NEW +// +// [ +// { +// tag: { +// name: 'OFF_TOPIC', +// permissions: { +// public: true, +// self: false, +// roles: [] +// }, +// models: ['COMMENTS'], +// created_at: new Date() +// }, +// assigned_by: '', +// created_at: new Date() +// } +// ] +const transformTags = ({ id, tags }) => ({ + query: { id }, + update: { + $set: { + tags: tags.map(({ name, assigned_by, created_at }) => ({ tag: { name, permissions: { @@ -70,25 +45,31 @@ module.exports = { }, assigned_by, created_at, - })); + })), + }, + }, +}); - updates.push({ query: { id }, update: { $set: { tags } } }); +module.exports = { + async up({ transformSingleWithCursor }) { + // Find all comments that have tags. + const cursor = CommentModel.collection.aggregate([ + { + $match: { + tags: { + $exists: true, + $ne: [], + }, + }, + }, + { + $project: { + id: true, + tags: true, + }, + }, + ]); - if (updates.length > updateBatchSize) { - // Process the updates. - await processUpdates(CommentModel, updates); - - // Clear the updates array. - updates = []; - } - } - - if (updates.length > 0) { - // Process the updates. - await processUpdates(CommentModel, updates); - - // Clear the updates array. 
- updates = []; - } + await transformSingleWithCursor(cursor, transformTags, CommentModel); }, }; diff --git a/migrations/1507310316_flags.js b/migrations/1507310316_flags.js index 3b0277e95..1ff3b4248 100644 --- a/migrations/1507310316_flags.js +++ b/migrations/1507310316_flags.js @@ -1,5 +1,4 @@ const ActionModel = require('../models/action'); -const { processUpdates } = require('./utils'); const mapping = { COMMENTS: { @@ -13,7 +12,7 @@ const mapping = { }; module.exports = { - async up() { + async up({ processManyUpdates }) { const updates = []; for (const item_type in mapping) { const mappings = mapping[item_type]; @@ -45,7 +44,7 @@ module.exports = { } if (updates.length > 0) { - await processUpdates(ActionModel, updates); + await processManyUpdates(ActionModel, updates); } }, }; diff --git a/migrations/1510174676_user_status.js b/migrations/1510174676_user_status.js index c8065bd87..80271557b 100644 --- a/migrations/1510174676_user_status.js +++ b/migrations/1510174676_user_status.js @@ -1,56 +1,132 @@ const UserModel = require('../models/user'); -const { processUpdates } = require('./utils'); const merge = require('lodash/merge'); -module.exports = { - async up({ queryBatchSize, updateBatchSize }) { - const created_at = Date.now(); +const transformUser = user => { + const created_at = Date.now(); - // Get the first batch of users. - let cursor = await UserModel.collection - .find({ + const { id, status, canEditName, suspension, disabled } = user; + + let update = { + $unset: { + canEditName: '', + suspension: '', + disabled: '', + }, + $set: { + status: { + // The username status is specific to each case. + username: { + history: [], + }, + + // The user is not banned by default. + banned: { + status: false, + history: [], + }, + + // The user is not suspended by default. 
+ suspension: { + until: null, + history: [], + }, + }, + updated_at: created_at, + }, + }; + + if (disabled) { + update = merge(update, { + $set: { status: { - $in: ['ACTIVE', 'BANNED', 'PENDING', 'APPROVED'], + banned: { + status: true, + history: [ + { + status: true, + created_at, + }, + ], + }, }, - }) - .batchSize(queryBatchSize); + }, + }); + } - let updates = []; - while (await cursor.hasNext()) { - const user = await cursor.next(); - - const { id, status, canEditName, suspension, disabled } = user; - - let update = { - $unset: { - canEditName: '', - suspension: '', - disabled: '', + // If the user has an "until" property of their suspension, then we need + // to reflect that in the new status object. + if (suspension && suspension.until !== null) { + update = merge(update, { + $set: { + status: { + suspension: { + until: suspension.until, + history: [ + { + until: suspension.until, + created_at, + }, + ], + }, }, - $set: { - status: { - // The username status is specific to each case. - username: { - history: [], - }, + }, + }); + } - // The user is not banned by default. - banned: { - status: false, - history: [], - }, - - // The user is not suspended by default. 
- suspension: { - until: null, - history: [], + switch (status) { + case 'ACTIVE': + if (canEditName) { + update = merge(update, { + $set: { + status: { + username: { + status: 'UNSET', + history: [ + { + status: 'UNSET', + created_at, + }, + ], + }, }, }, - updated_at: created_at, - }, - }; - - if (disabled) { + }); + } else { + update = merge(update, { + $set: { + status: { + username: { + status: 'SET', + history: [ + { + status: 'SET', + created_at, + }, + ], + }, + }, + }, + }); + } + break; + case 'BANNED': + if (canEditName) { + update = merge(update, { + $set: { + status: { + username: { + status: 'REJECTED', + history: [ + { + status: 'REJECTED', + created_at, + }, + ], + }, + }, + }, + }); + } else { update = merge(update, { $set: { status: { @@ -63,22 +139,11 @@ module.exports = { }, ], }, - }, - }, - }); - } - - // If the user has an "until" property of their suspension, then we need - // to reflect that in the new status object. - if (suspension && suspension.until !== null) { - update = merge(update, { - $set: { - status: { - suspension: { - until: suspension.until, + username: { + status: 'SET', history: [ { - until: suspension.until, + status: 'SET', created_at, }, ], @@ -87,143 +152,57 @@ module.exports = { }, }); } - - switch (status) { - case 'ACTIVE': - if (canEditName) { - update = merge(update, { - $set: { - status: { - username: { - status: 'UNSET', - history: [ - { - status: 'UNSET', - created_at, - }, - ], - }, - }, - }, - }); - } else { - update = merge(update, { - $set: { - status: { - username: { - status: 'SET', - history: [ - { - status: 'SET', - created_at, - }, - ], - }, - }, - }, - }); - } - break; - case 'BANNED': - if (canEditName) { - update = merge(update, { - $set: { - status: { - username: { - status: 'REJECTED', - history: [ - { - status: 'REJECTED', - created_at, - }, - ], - }, - }, - }, - }); - } else { - update = merge(update, { - $set: { - status: { - banned: { - status: true, - history: [ - { - status: true, - 
created_at, - }, - ], - }, - username: { - status: 'SET', - history: [ - { - status: 'SET', - created_at, - }, - ], - }, - }, - }, - }); - } - break; - case 'PENDING': - update = merge(update, { - $set: { - status: { - username: { + break; + case 'PENDING': + update = merge(update, { + $set: { + status: { + username: { + status: 'CHANGED', + history: [ + { status: 'CHANGED', - history: [ - { - status: 'CHANGED', - created_at, - }, - ], + created_at, }, - }, + ], }, - }); - break; - case 'APPROVED': - update = merge(update, { - $set: { - status: { - username: { + }, + }, + }); + break; + case 'APPROVED': + update = merge(update, { + $set: { + status: { + username: { + status: 'APPROVED', + history: [ + { status: 'APPROVED', - history: [ - { - status: 'APPROVED', - created_at, - }, - ], + created_at, }, - }, + ], }, - }); - break; - default: - throw new Error(`${status} is an invalid status`); - } + }, + }, + }); + break; + default: + throw new Error(`${status} is an invalid status`); + } - updates.push({ query: { id }, update }); + return { query: { id }, update }; +}; - // Process every 1000 users. - if (updates.length > updateBatchSize) { - // Process the updates. - await processUpdates(UserModel, updates); +module.exports = { + async up({ transformSingleWithCursor }) { + // Get the first batch of users. + const cursor = UserModel.collection.find({ + status: { + $in: ['ACTIVE', 'BANNED', 'PENDING', 'APPROVED'], + }, + }); - // Clear the updates array. - updates = []; - } - } - - if (updates.length > 0) { - // Process the updates. - await processUpdates(UserModel, updates); - - // Clear the updates array. 
- updates = []; - } + await transformSingleWithCursor(cursor, transformUser, UserModel); }, }; diff --git a/migrations/1511801783_user_roles.js b/migrations/1511801783_user_roles.js index 384047fff..961a696e2 100644 --- a/migrations/1511801783_user_roles.js +++ b/migrations/1511801783_user_roles.js @@ -1,5 +1,4 @@ const UserModel = require('../models/user'); -const { processUpdates } = require('./utils'); const findNewRole = roles => { if (roles.includes('ADMIN')) { @@ -14,20 +13,16 @@ const findNewRole = roles => { }; module.exports = { - async up({ queryBatchSize, updateBatchSize }) { - const cursor = await UserModel.collection - .find({ - roles: { - $exists: true, - }, - }) - .batchSize(queryBatchSize); + async up({ transformSingleWithCursor }) { + const cursor = UserModel.collection.find({ + roles: { + $exists: true, + }, + }); - let updates = []; - while (await cursor.hasNext()) { - const user = await cursor.next(); - - updates.push({ + await transformSingleWithCursor( + cursor, + user => ({ query: { id: user.id, }, @@ -41,23 +36,8 @@ module.exports = { roles: '', }, }, - }); - - if (updates.length > updateBatchSize) { - // Process the updates. - await processUpdates(UserModel, updates); - - // Clear the updates array. - updates = []; - } - } - - if (updates.length > 0) { - // Process the updates. - await processUpdates(UserModel, updates); - - // Clear the updates array. 
- updates = []; - } + }), + UserModel + ); }, }; diff --git a/migrations/1516920154_action_counts.js b/migrations/1516920154_action_counts.js new file mode 100644 index 000000000..5aa95614c --- /dev/null +++ b/migrations/1516920154_action_counts.js @@ -0,0 +1,118 @@ +const ActionModel = require('../models/action'); +const UserModel = require('../models/user'); +const CommentModel = require('../models/comment'); + +module.exports = { + async up({ transformSingleWithCursor }) { + const models = [ + { Model: CommentModel, item_type: 'COMMENTS' }, + { Model: UserModel, item_type: 'USERS' }, + ]; + for (const { Model, item_type } of models) { + let cursor = ActionModel.collection.aggregate([ + { + $match: { + group_id: { $ne: null }, + item_type, + }, + }, + { + $group: { + // group unique documents by these properties, we are leveraging the + // fact that each uuid is completely unique. + _id: { + item_id: '$item_id', + action_type: '$action_type', + group_id: '$group_id', + }, + + // and sum up all actions matching the above grouping criteria + count: { + $sum: 1, + }, + }, + }, + { + $project: { + // suppress the _id field + _id: false, + + // map the fields from the _id grouping down a level + item_id: '$_id.item_id', + action_type: { $toLower: '$_id.action_type' }, + group_id: { $toLower: '$_id.group_id' }, + + // map the field directly + count: '$count', + }, + }, + ]); + + // Transform those documents. + await transformSingleWithCursor( + cursor, + ({ item_id, action_type, group_id, count }) => ({ + query: { id: item_id }, + update: { + $set: { + [`action_counts.${action_type}_${group_id}`]: count, + }, + }, + }), + Model + ); + + // Secondly, we'll collect the totals across all group id's (all the + // actions for a specific action type) to update counts of. + cursor = ActionModel.collection.aggregate([ + { + $match: { + item_type, + }, + }, + { + $group: { + // group unique documents by these properties, we are leveraging the + // fact that each uuid is completely unique. 
+ _id: { + item_id: '$item_id', + action_type: '$action_type', + }, + + // and sum up all actions matching the above grouping criteria + count: { + $sum: 1, + }, + }, + }, + { + $project: { + // suppress the _id field + _id: false, + + // map the fields from the _id grouping down a level + item_id: '$_id.item_id', + action_type: { $toLower: '$_id.action_type' }, + + // map the field directly + count: '$count', + }, + }, + ]); + + // Transform those documents. + await transformSingleWithCursor( + cursor, + ({ item_id, action_type, count }) => ({ + query: { id: item_id }, + update: { + $set: { + [`action_counts.${action_type}`]: count, + }, + }, + }), + Model + ); + } + }, +}; diff --git a/migrations/1516920160_reply_counts.js b/migrations/1516920160_reply_counts.js new file mode 100644 index 000000000..7dd18e8f8 --- /dev/null +++ b/migrations/1516920160_reply_counts.js @@ -0,0 +1,30 @@ +const CommentModel = require('../models/comment'); + +const transformComments = ({ _id: parent_id, reply_count }) => ({ + query: { id: parent_id, reply_count: { $ne: reply_count } }, + update: { $set: { reply_count } }, +}); + +module.exports = { + async up({ transformSingleWithCursor }) { + const cursor = CommentModel.collection.aggregate([ + { + $match: { + parent_id: { $ne: null }, + status: { $in: ['NONE', 'ACCEPTED'] }, + }, + }, + { + $group: { + _id: '$parent_id', + reply_count: { + $sum: 1, + }, + }, + }, + ]); + + // Transform those documents. + await transformSingleWithCursor(cursor, transformComments, CommentModel); + }, +}; diff --git a/migrations/utils.js b/migrations/utils.js deleted file mode 100644 index 3a564ede8..000000000 --- a/migrations/utils.js +++ /dev/null @@ -1,19 +0,0 @@ -/** - * processUpdates processes batches of updates on the given model. 
- * - * @param {Object} model mongoose model that should perform the operations on - * @param {Array} updates array of updates to execute - */ -const processUpdates = async (model, updates) => { - // Create a new batch operation. - const bulk = model.collection.initializeUnorderedBulkOp(); - - for (const { query, update } of updates) { - bulk.find(query).updateOne(update); - } - - // Execute the bulk update operation. - await bulk.execute(); -}; - -module.exports = { processUpdates }; diff --git a/package.json b/package.json index a0cc594ab..b3afdc9b6 100644 --- a/package.json +++ b/package.json @@ -29,7 +29,7 @@ }, "talk": { "migration": { - "minVersion": 1511801783 + "minVersion": 1516920160 } }, "repository": { diff --git a/services/migration/helpers.js b/services/migration/helpers.js new file mode 100644 index 000000000..38df5c128 --- /dev/null +++ b/services/migration/helpers.js @@ -0,0 +1,87 @@ +/** + * processUpdates processes batches of updates on the given model. + * + * @param {Object} model mongoose model to perform the operations on + * @param {Array} updates array of { query, update } operations to execute + */ +const processUpdates = async (model, updates) => { + // Create a new batch operation. + const bulk = model.collection.initializeUnorderedBulkOp(); + + for (const { query, update } of updates) { + bulk.find(query).updateOne(update); + } + + // Execute the bulk update operation. + await bulk.execute(); +}; + +/** + * transformSingleWithCursor walks the cursor, passes each result to + * transform, and applies the returned { query, update } objects to the Model + * through processUpdates, flushing once updateBatchSize updates are queued. + * + * @param {Object} ctx holds the queryBatchSize and updateBatchSize settings + * @returns {Function} async (cursor, transform, Model); a falsy result from + * transform skips that document + */ +const transformSingleWithCursor = ({ + queryBatchSize, + updateBatchSize, +}) => async (cursor, transform, Model) => { + // We'll collect the pending updates inside this array. + let updates = []; + + // Apply the configured query batch size to the cursor before iterating. 
+ cursor = await cursor.batchSize(queryBatchSize); + + while (await cursor.hasNext()) { + const result = await cursor.next(); + + const transformed = await transform(result); + if (transformed) { + updates.push(transformed); + } + + if (updates.length > 0 && updates.length >= updateBatchSize) { + // Flush once a full batch is queued (never an empty one). + await processUpdates(Model, updates); + + // Clear the updates array. + updates = []; + } + } + + if (updates.length > 0) { + // Process the updates. + await processUpdates(Model, updates); + + // Clear the updates array. + updates = []; + } +}; + +/** + * processManyUpdates processes batches of updates on the given model, where + * each update is applied to every document matching its query. + * + * @param {Object} model mongoose model to perform the operations on + * @param {Array} updates array of { query, update } operations to execute + */ +const processManyUpdates = async (model, updates) => { + // Create a new batch operation. + const bulk = model.collection.initializeUnorderedBulkOp(); + + for (const { query, update } of updates) { + bulk.find(query).update(update); + } + + // Execute the bulk update operation. 
+ await bulk.execute(); +}; + +module.exports = ctx => ({ + processManyUpdates, + processUpdates, + transformSingleWithCursor: transformSingleWithCursor(ctx), +}); diff --git a/services/migration.js b/services/migration/index.js similarity index 92% rename from services/migration.js rename to services/migration/index.js index 38ce5339b..312282666 100644 --- a/services/migration.js +++ b/services/migration/index.js @@ -1,12 +1,13 @@ -const MigrationModel = require('../models/migration'); +const MigrationModel = require('../../models/migration'); const fs = require('fs'); const ms = require('ms'); const path = require('path'); const Joi = require('joi'); const debug = require('debug')('talk:services:migration'); const sc = require('snake-case'); +const helpers = require('./helpers'); const { stripIndent } = require('common-tags'); -const { talk: { migration: { minVersion } } } = require('../package.json'); +const { talk: { migration: { minVersion } } } = require('../../package.json'); const migrationTemplate = stripIndent` module.exports = { @@ -46,7 +47,7 @@ class MigrationService { static async listPending() { // Get all the migration files. let migrationFiles = fs.readdirSync( - path.join(__dirname, '..', 'migrations') + path.join(__dirname, '..', '..', 'migrations') ); // Ensure that all migrations follow this format. @@ -78,7 +79,7 @@ class MigrationService { } // Read the migration from the filesystem. - let migration = require(`../migrations/${filename}`); + let migration = require(`../../migrations/${filename}`); Joi.assert( migration, migrationSchema, @@ -121,11 +122,14 @@ class MigrationService { return; } + // Create the context helpers. 
+ const ctx = helpers({ queryBatchSize, updateBatchSize }); + for (let { filename, version, migration } of migrations) { try { const startTime = new Date(); console.log(`Starting migration ${filename}`); - await migration.up({ queryBatchSize, updateBatchSize }); + await migration.up(ctx); const endTime = new Date(); const totalTime = endTime.getTime() - startTime.getTime(); console.log(`Finished migration ${filename} in ${ms(totalTime)}`); diff --git a/test/server/migrations/1510174676_user_status.js b/test/server/migrations/1510174676_user_status.js index 8dbbf543f..f9af655d8 100644 --- a/test/server/migrations/1510174676_user_status.js +++ b/test/server/migrations/1510174676_user_status.js @@ -1,12 +1,13 @@ const migration = require('../../../migrations/1510174676_user_status'); const UserModel = require('../../../models/user'); +const helpers = require('../../../services/migration/helpers'); const chai = require('chai'); chai.use(require('chai-datetime')); const { expect } = chai; const performMigration = () => - migration.up({ queryBatchSize: 100, updateBatchSize: 100 }); + migration.up(helpers({ queryBatchSize: 100, updateBatchSize: 100 })); describe('migration.1510174676_user_status', () => { describe('active user', () => {