diff --git a/bin/cli-migration b/bin/cli-migration index ca93360ad..24f41ac81 100755 --- a/bin/cli-migration +++ b/bin/cli-migration @@ -5,6 +5,7 @@ */ const util = require('./util'); +const _ = require('lodash'); const program = require('commander'); const inquirer = require('inquirer'); const mongoose = require('../services/mongoose'); @@ -25,46 +26,61 @@ async function createMigration(name) { } } -async function runMigrations() { +async function runMigrations(options) { + const { yes, queryBatchSize, updateBatchSize } = options; + console.log({ yes, queryBatchSize, updateBatchSize }); try { - let { backedUp } = await inquirer.prompt([ - { - type: 'confirm', - name: 'backedUp', - message: 'Did you perform a database backup', - default: false, - }, - ]); + if (!yes) { + const { backedUp } = await inquirer.prompt([ + { + type: 'confirm', + name: 'backedUp', + message: 'Did you perform a database backup', + default: false, + }, + ]); - if (!backedUp) { - throw new Error( - 'Please backup your databases prior to migrations occuring' - ); + if (!backedUp) { + throw new Error( + 'Please backup your databases prior to migrations occuring' + ); + } } // Get the migrations to run. - let migrations = await MigrationService.listPending(); + const migrations = await MigrationService.listPending(); console.log('Now going to run the following migrations:\n'); - for (let { filename } of migrations) { + for (const { filename } of migrations) { console.log(`\tmigrations/${filename}`); } - let { confirm } = await inquirer.prompt([ - { - type: 'confirm', - name: 'confirm', - message: 'Proceed with migrations', - default: false, - }, - ]); + if (!yes) { + const { confirm } = await inquirer.prompt([ + { + type: 'confirm', + name: 'confirm', + message: 'Proceed with migrations', + default: false, + }, + ]); - if (confirm) { - // Run the migrations. - await MigrationService.run(migrations); + if (confirm) { + // Run the migrations. + await MigrationService.run(migrations, { + queryBatchSize, + updateBatchSize, + }); + } else { + console.warn('Skipping migrations'); + } } else { - console.warn('Skipping migrations'); + // Run the migrations. + await MigrationService.run(migrations, { + queryBatchSize, + updateBatchSize, + }); } util.shutdown(); @@ -83,8 +99,25 @@ program .description('creates a new migration') .action(createMigration); +// Bypasses issue that defaults + coercion doesn't work well together. +// Ref: https://github.com/tj/commander.js/issues/400#issuecomment-310860869 +const parse10 = _.ary(_.partialRight(parseInt, 10), 1); + program .command('run') + .option( + '-q, --query-batch-size ', + 'will answer yes to all questions', + parse10, + 100 + ) + .option( + '-u, --update-batch-size ', + 'will answer yes to all questions', + parse10, + 1000 + ) + .option('-y, --yes', 'will answer yes to all questions') .description('runs all pending migrations') .action(runMigrations); diff --git a/migrations/1496771633_tags.js b/migrations/1496771633_tags.js index b21ff671e..142f10fc0 100644 --- a/migrations/1496771633_tags.js +++ b/migrations/1496771633_tags.js @@ -1,34 +1,32 @@ const CommentModel = require('../models/comment'); +const { processUpdates } = require('./utils'); module.exports = { - async up() { + async up({ queryBatchSize, updateBatchSize }) { // Find all comments that have tags. - let comments = await CommentModel.aggregate([ - { - $match: { - tags: { - $exists: true, - $ne: [], + const cursor = await CommentModel.collection + .aggregate([ + { + $match: { + tags: { + $exists: true, + $ne: [], + }, }, }, - }, - { - $project: { - id: true, - tags: true, + { + $project: { + id: true, + tags: true, + }, }, - }, - ]); + ]) + .batchSize(queryBatchSize); - // If no comments were found, nothing needs to be done! - if (comments.length <= 0) { - return; - } + let updates = []; + while (await cursor.hasNext()) { + let { id, tags } = await cursor.next(); - const updates = []; - - // Loop over the comments retrieved, updating the tag structure. - for (let { id, tags } of comments) { // OLD // // [ @@ -75,19 +73,22 @@ module.exports = { })); updates.push({ query: { id }, update: { $set: { tags } } }); + + if (updates.length > updateBatchSize) { + // Process the updates. + await processUpdates(CommentModel, updates); + + // Clear the updates array. + updates = []; + } } if (updates.length > 0) { - // Create a new batch operation. - let batch = CommentModel.collection.initializeUnorderedBulkOp(); + // Process the updates. + await processUpdates(CommentModel, updates); - for (const { query, update } of updates) { - // Execute the batch operation. - batch.find(query).updateOne(update); - } - - // Execute the batch update operation. - await batch.execute(); + // Clear the updates array. + updates = []; } }, }; diff --git a/migrations/1507310316_flags.js b/migrations/1507310316_flags.js index d29a2c25b..3b0277e95 100644 --- a/migrations/1507310316_flags.js +++ b/migrations/1507310316_flags.js @@ -1,4 +1,5 @@ const ActionModel = require('../models/action'); +const { processUpdates } = require('./utils'); const mapping = { COMMENTS: { @@ -44,15 +45,7 @@ module.exports = { } if (updates.length > 0) { - // Setup the batch operation. - const batch = ActionModel.collection.initializeUnorderedBulkOp(); - - for (const { query, update } of updates) { - batch.find(query).update(update); - } - - // Execute the batch update operation. - await batch.execute(); + await processUpdates(ActionModel, updates); } }, }; diff --git a/migrations/1510174676_user_status.js b/migrations/1510174676_user_status.js index 6ddd1e4b4..c8065bd87 100644 --- a/migrations/1510174676_user_status.js +++ b/migrations/1510174676_user_status.js @@ -1,35 +1,19 @@ const UserModel = require('../models/user'); +const { processUpdates } = require('./utils'); const merge = require('lodash/merge'); -const getUserBatch = async () => { - let query = { - status: { - $in: ['ACTIVE', 'BANNED', 'PENDING', 'APPROVED'], - }, - }; - - // Find all the users that need migrating. - return UserModel.collection.find(query).batchSize(100); -}; - -const processUpdates = async updates => { - // Create a new batch operation. - let bulk = UserModel.collection.initializeUnorderedBulkOp(); - - for (const { query, update } of updates) { - bulk.find(query).updateOne(update); - } - - // Execute the bulk update operation. - await bulk.execute(); -}; - module.exports = { - async up() { + async up({ queryBatchSize, updateBatchSize }) { const created_at = Date.now(); // Get the first batch of users. - let cursor = await getUserBatch(); + let cursor = await UserModel.collection + .find({ + status: { + $in: ['ACTIVE', 'BANNED', 'PENDING', 'APPROVED'], + }, + }) + .batchSize(queryBatchSize); let updates = []; while (await cursor.hasNext()) { @@ -225,9 +209,9 @@ module.exports = { updates.push({ query: { id }, update }); // Process every 1000 users. - if (updates.length > 1000) { + if (updates.length > updateBatchSize) { // Process the updates. - await processUpdates(updates); + await processUpdates(UserModel, updates); // Clear the updates array. updates = []; @@ -236,7 +220,7 @@ module.exports = { if (updates.length > 0) { // Process the updates. - await processUpdates(updates); + await processUpdates(UserModel, updates); // Clear the updates array. updates = []; diff --git a/migrations/1511801783_user_roles.js b/migrations/1511801783_user_roles.js index 2341463a5..384047fff 100644 --- a/migrations/1511801783_user_roles.js +++ b/migrations/1511801783_user_roles.js @@ -1,4 +1,5 @@ const UserModel = require('../models/user'); +const { processUpdates } = require('./utils'); const findNewRole = roles => { if (roles.includes('ADMIN')) { @@ -12,27 +13,15 @@ const findNewRole = roles => { return 'COMMENTER'; }; -const processUpdates = async updates => { - // Create a new batch operation. - const bulk = UserModel.collection.initializeUnorderedBulkOp(); - - for (const { query, update } of updates) { - bulk.find(query).updateOne(update); - } - - // Execute the bulk update operation. - await bulk.execute(); -}; - module.exports = { - async up() { + async up({ queryBatchSize, updateBatchSize }) { const cursor = await UserModel.collection .find({ roles: { $exists: true, }, }) - .batchSize(100); + .batchSize(queryBatchSize); let updates = []; while (await cursor.hasNext()) { @@ -54,9 +43,9 @@ module.exports = { }, }); - if (updates.length > 1000) { + if (updates.length > updateBatchSize) { // Process the updates. - await processUpdates(updates); + await processUpdates(UserModel, updates); // Clear the updates array. updates = []; @@ -65,7 +54,7 @@ module.exports = { if (updates.length > 0) { // Process the updates. - await processUpdates(updates); + await processUpdates(UserModel, updates); // Clear the updates array. updates = []; diff --git a/migrations/utils.js b/migrations/utils.js new file mode 100644 index 000000000..3a564ede8 --- /dev/null +++ b/migrations/utils.js @@ -0,0 +1,19 @@ +/** + * processUpdates processes batches of updates on the given model. + * + * @param {Object} model mongoose model that should perform the operations on + * @param {Array} updates array of updates to execute + */ +const processUpdates = async (model, updates) => { + // Create a new batch operation. + const bulk = model.collection.initializeUnorderedBulkOp(); + + for (const { query, update } of updates) { + bulk.find(query).updateOne(update); + } + + // Execute the bulk update operation. + await bulk.execute(); +}; + +module.exports = { processUpdates }; diff --git a/services/migration.js b/services/migration.js index f8f09d469..38ce5339b 100644 --- a/services/migration.js +++ b/services/migration.js @@ -1,17 +1,19 @@ const MigrationModel = require('../models/migration'); const fs = require('fs'); +const ms = require('ms'); const path = require('path'); const Joi = require('joi'); const debug = require('debug')('talk:services:migration'); const sc = require('snake-case'); +const { stripIndent } = require('common-tags'); const { talk: { migration: { minVersion } } } = require('../package.json'); -const migrationTemplate = `module.exports = { - async up() { - - } -}; +const migrationTemplate = stripIndent` + module.exports = { + async up({ queryBatchSize, updateBatchSize }) { + } + }; `; class MigrationService { @@ -61,6 +63,7 @@ class MigrationService { // Parse the migrations from the file listing. let migrations = migrationFiles + .filter(filename => versionRe.test(filename)) .map(filename => { // Parse the version from the filename. let matches = filename.match(versionRe); @@ -109,7 +112,10 @@ class MigrationService { * * @param {Array} migrations a list of migrations returned by `listPending` */ - static async run(migrations) { + static async run( + migrations, + { queryBatchSize = 100, updateBatchSize = 1000 } = {} + ) { if (migrations.length === 0) { console.log('No migrations to run!'); return; @@ -117,9 +123,12 @@ class MigrationService { for (let { filename, version, migration } of migrations) { try { + const startTime = new Date(); console.log(`Starting migration ${filename}`); - await migration.up(); - console.log(`Finished migration ${filename}`); + await migration.up({ queryBatchSize, updateBatchSize }); + const endTime = new Date(); + const totalTime = endTime.getTime() - startTime.getTime(); + console.log(`Finished migration ${filename} in ${ms(totalTime)}`); } catch (e) { console.error(`Migration ${filename} failed`); throw e; diff --git a/test/server/migrations/1510174676_user_status.js b/test/server/migrations/1510174676_user_status.js index 8e1ceca5d..8dbbf543f 100644 --- a/test/server/migrations/1510174676_user_status.js +++ b/test/server/migrations/1510174676_user_status.js @@ -5,6 +5,9 @@ const chai = require('chai'); chai.use(require('chai-datetime')); const { expect } = chai; +const performMigration = () => + migration.up({ queryBatchSize: 100, updateBatchSize: 100 }); + describe('migration.1510174676_user_status', () => { describe('active user', () => { beforeEach(async () => { @@ -24,7 +27,7 @@ describe('migration.1510174676_user_status', () => { expect(user).to.have.property('canEditName', false); // Perform the migration. - await migration.up(); + await performMigration(); user = await UserModel.collection.findOne({ id: '123' }); @@ -54,7 +57,7 @@ describe('migration.1510174676_user_status', () => { expect(user).to.have.property('canEditName', true); // Perform the migration. - await migration.up(); + await performMigration(); user = await UserModel.collection.findOne({ id: '123' }); @@ -85,7 +88,7 @@ describe('migration.1510174676_user_status', () => { expect(user.canEditName).to.equal(true); // Perform the migration. - await migration.up(); + await performMigration(); user = await UserModel.collection.findOne({ id: '123' }); @@ -117,7 +120,7 @@ describe('migration.1510174676_user_status', () => { expect(user.canEditName).to.equal(false); // Perform the migration. - await migration.up(); + await performMigration(); user = await UserModel.collection.findOne({ id: '123' }); @@ -153,7 +156,7 @@ describe('migration.1510174676_user_status', () => { const until = user.suspension.until; // Perform the migration. - await migration.up(); + await performMigration(); user = await UserModel.collection.findOne({ id: '123' }); @@ -187,7 +190,7 @@ describe('migration.1510174676_user_status', () => { expect(user.status).to.equal('BANNED'); // Perform the migration. - await migration.up(); + await performMigration(); user = await UserModel.collection.findOne({ id: '123' });