expanded on migration support

This commit is contained in:
Wyatt Johnson
2018-01-25 13:07:29 -07:00
parent ef7693afa5
commit e6f796e99c
8 changed files with 157 additions and 126 deletions
+60 -27
View File
@@ -5,6 +5,7 @@
*/
const util = require('./util');
const _ = require('lodash');
const program = require('commander');
const inquirer = require('inquirer');
const mongoose = require('../services/mongoose');
@@ -25,46 +26,61 @@ async function createMigration(name) {
}
}
async function runMigrations() {
async function runMigrations(options) {
const { yes, queryBatchSize, updateBatchSize } = options;
console.log({ yes, queryBatchSize, updateBatchSize });
try {
let { backedUp } = await inquirer.prompt([
{
type: 'confirm',
name: 'backedUp',
message: 'Did you perform a database backup',
default: false,
},
]);
if (!yes) {
const { backedUp } = await inquirer.prompt([
{
type: 'confirm',
name: 'backedUp',
message: 'Did you perform a database backup',
default: false,
},
]);
if (!backedUp) {
throw new Error(
'Please backup your databases prior to migrations occuring'
);
if (!backedUp) {
throw new Error(
'Please backup your databases prior to migrations occuring'
);
}
}
// Get the migrations to run.
let migrations = await MigrationService.listPending();
const migrations = await MigrationService.listPending();
console.log('Now going to run the following migrations:\n');
for (let { filename } of migrations) {
for (const { filename } of migrations) {
console.log(`\tmigrations/${filename}`);
}
let { confirm } = await inquirer.prompt([
{
type: 'confirm',
name: 'confirm',
message: 'Proceed with migrations',
default: false,
},
]);
if (!yes) {
const { confirm } = await inquirer.prompt([
{
type: 'confirm',
name: 'confirm',
message: 'Proceed with migrations',
default: false,
},
]);
if (confirm) {
// Run the migrations.
await MigrationService.run(migrations);
if (confirm) {
// Run the migrations.
await MigrationService.run(migrations, {
queryBatchSize,
updateBatchSize,
});
} else {
console.warn('Skipping migrations');
}
} else {
console.warn('Skipping migrations');
// Run the migrations.
await MigrationService.run(migrations, {
queryBatchSize,
updateBatchSize,
});
}
util.shutdown();
@@ -83,8 +99,25 @@ program
.description('creates a new migration')
.action(createMigration);
// Bypasses issue that defaults + coercion doesn't work well together.
// Ref: https://github.com/tj/commander.js/issues/400#issuecomment-310860869
const parse10 = _.ary(_.partialRight(parseInt, 10), 1);
program
.command('run')
.option(
'-q, --query-batch-size <n>',
'will answer yes to all questions',
parse10,
100
)
.option(
'-u, --update-batch-size <n>',
'will answer yes to all questions',
parse10,
1000
)
.option('-y, --yes', 'will answer yes to all questions')
.description('runs all pending migrations')
.action(runMigrations);
+32 -31
View File
@@ -1,34 +1,32 @@
const CommentModel = require('../models/comment');
const { processUpdates } = require('./utils');
module.exports = {
async up() {
async up({ queryBatchSize, updateBatchSize }) {
// Find all comments that have tags.
let comments = await CommentModel.aggregate([
{
$match: {
tags: {
$exists: true,
$ne: [],
const cursor = await CommentModel.collection
.aggregate([
{
$match: {
tags: {
$exists: true,
$ne: [],
},
},
},
},
{
$project: {
id: true,
tags: true,
{
$project: {
id: true,
tags: true,
},
},
},
]);
])
.batchSize(queryBatchSize);
// If no comments were found, nothing needs to be done!
if (comments.length <= 0) {
return;
}
let updates = [];
while (await cursor.hasNext()) {
let { id, tags } = await cursor.next();
const updates = [];
// Loop over the comments retrieved, updating the tag structure.
for (let { id, tags } of comments) {
// OLD
//
// [
@@ -75,19 +73,22 @@ module.exports = {
}));
updates.push({ query: { id }, update: { $set: { tags } } });
if (updates.length > updateBatchSize) {
// Process the updates.
await processUpdates(CommentModel, updates);
// Clear the updates array.
updates = [];
}
}
if (updates.length > 0) {
// Create a new batch operation.
let batch = CommentModel.collection.initializeUnorderedBulkOp();
// Process the updates.
await processUpdates(CommentModel, updates);
for (const { query, update } of updates) {
// Execute the batch operation.
batch.find(query).updateOne(update);
}
// Execute the batch update operation.
await batch.execute();
// Clear the updates array.
updates = [];
}
},
};
+2 -9
View File
@@ -1,4 +1,5 @@
const ActionModel = require('../models/action');
const { processUpdates } = require('./utils');
const mapping = {
COMMENTS: {
@@ -44,15 +45,7 @@ module.exports = {
}
if (updates.length > 0) {
// Setup the batch operation.
const batch = ActionModel.collection.initializeUnorderedBulkOp();
for (const { query, update } of updates) {
batch.find(query).update(update);
}
// Execute the batch update operation.
await batch.execute();
await processUpdates(ActionModel, updates);
}
},
};
+12 -28
View File
@@ -1,35 +1,19 @@
const UserModel = require('../models/user');
const { processUpdates } = require('./utils');
const merge = require('lodash/merge');
const getUserBatch = async () => {
let query = {
status: {
$in: ['ACTIVE', 'BANNED', 'PENDING', 'APPROVED'],
},
};
// Find all the users that need migrating.
return UserModel.collection.find(query).batchSize(100);
};
const processUpdates = async updates => {
// Create a new batch operation.
let bulk = UserModel.collection.initializeUnorderedBulkOp();
for (const { query, update } of updates) {
bulk.find(query).updateOne(update);
}
// Execute the bulk update operation.
await bulk.execute();
};
module.exports = {
async up() {
async up({ queryBatchSize, updateBatchSize }) {
const created_at = Date.now();
// Get the first batch of users.
let cursor = await getUserBatch();
let cursor = await UserModel.collection
.find({
status: {
$in: ['ACTIVE', 'BANNED', 'PENDING', 'APPROVED'],
},
})
.batchSize(queryBatchSize);
let updates = [];
while (await cursor.hasNext()) {
@@ -225,9 +209,9 @@ module.exports = {
updates.push({ query: { id }, update });
// Process every 1000 users.
if (updates.length > 1000) {
if (updates.length > updateBatchSize) {
// Process the updates.
await processUpdates(updates);
await processUpdates(UserModel, updates);
// Clear the updates array.
updates = [];
@@ -236,7 +220,7 @@ module.exports = {
if (updates.length > 0) {
// Process the updates.
await processUpdates(updates);
await processUpdates(UserModel, updates);
// Clear the updates array.
updates = [];
+6 -17
View File
@@ -1,4 +1,5 @@
const UserModel = require('../models/user');
const { processUpdates } = require('./utils');
const findNewRole = roles => {
if (roles.includes('ADMIN')) {
@@ -12,27 +13,15 @@ const findNewRole = roles => {
return 'COMMENTER';
};
const processUpdates = async updates => {
// Create a new batch operation.
const bulk = UserModel.collection.initializeUnorderedBulkOp();
for (const { query, update } of updates) {
bulk.find(query).updateOne(update);
}
// Execute the bulk update operation.
await bulk.execute();
};
module.exports = {
async up() {
async up({ queryBatchSize, updateBatchSize }) {
const cursor = await UserModel.collection
.find({
roles: {
$exists: true,
},
})
.batchSize(100);
.batchSize(queryBatchSize);
let updates = [];
while (await cursor.hasNext()) {
@@ -54,9 +43,9 @@ module.exports = {
},
});
if (updates.length > 1000) {
if (updates.length > updateBatchSize) {
// Process the updates.
await processUpdates(updates);
await processUpdates(UserModel, updates);
// Clear the updates array.
updates = [];
@@ -65,7 +54,7 @@ module.exports = {
if (updates.length > 0) {
// Process the updates.
await processUpdates(updates);
await processUpdates(UserModel, updates);
// Clear the updates array.
updates = [];
+19
View File
@@ -0,0 +1,19 @@
/**
* processUpdates processes batches of updates on the given model.
*
* @param {Object} model mongoose model that should perform the operations on
* @param {Array<Object>} updates array of updates to execute
*/
const processUpdates = async (model, updates) => {
// Create a new batch operation.
const bulk = model.collection.initializeUnorderedBulkOp();
for (const { query, update } of updates) {
bulk.find(query).updateOne(update);
}
// Execute the bulk update operation.
await bulk.execute();
};
module.exports = { processUpdates };
+17 -8
View File
@@ -1,17 +1,19 @@
const MigrationModel = require('../models/migration');
const fs = require('fs');
const ms = require('ms');
const path = require('path');
const Joi = require('joi');
const debug = require('debug')('talk:services:migration');
const sc = require('snake-case');
const { stripIndent } = require('common-tags');
const { talk: { migration: { minVersion } } } = require('../package.json');
const migrationTemplate = `module.exports = {
async up() {
}
};
const migrationTemplate = stripIndent`
module.exports = {
async up({ queryBatchSize, updateBatchSize }) {
}
};
`;
class MigrationService {
@@ -61,6 +63,7 @@ class MigrationService {
// Parse the migrations from the file listing.
let migrations = migrationFiles
.filter(filename => versionRe.test(filename))
.map(filename => {
// Parse the version from the filename.
let matches = filename.match(versionRe);
@@ -109,7 +112,10 @@ class MigrationService {
*
* @param {Array} migrations a list of migrations returned by `listPending`
*/
static async run(migrations) {
static async run(
migrations,
{ queryBatchSize = 100, updateBatchSize = 1000 } = {}
) {
if (migrations.length === 0) {
console.log('No migrations to run!');
return;
@@ -117,9 +123,12 @@ class MigrationService {
for (let { filename, version, migration } of migrations) {
try {
const startTime = new Date();
console.log(`Starting migration ${filename}`);
await migration.up();
console.log(`Finished migration ${filename}`);
await migration.up({ queryBatchSize, updateBatchSize });
const endTime = new Date();
const totalTime = endTime.getTime() - startTime.getTime();
console.log(`Finished migration ${filename} in ${ms(totalTime)}`);
} catch (e) {
console.error(`Migration ${filename} failed`);
throw e;
@@ -5,6 +5,9 @@ const chai = require('chai');
chai.use(require('chai-datetime'));
const { expect } = chai;
const performMigration = () =>
migration.up({ queryBatchSize: 100, updateBatchSize: 100 });
describe('migration.1510174676_user_status', () => {
describe('active user', () => {
beforeEach(async () => {
@@ -24,7 +27,7 @@ describe('migration.1510174676_user_status', () => {
expect(user).to.have.property('canEditName', false);
// Perform the migration.
await migration.up();
await performMigration();
user = await UserModel.collection.findOne({ id: '123' });
@@ -54,7 +57,7 @@ describe('migration.1510174676_user_status', () => {
expect(user).to.have.property('canEditName', true);
// Perform the migration.
await migration.up();
await performMigration();
user = await UserModel.collection.findOne({ id: '123' });
@@ -85,7 +88,7 @@ describe('migration.1510174676_user_status', () => {
expect(user.canEditName).to.equal(true);
// Perform the migration.
await migration.up();
await performMigration();
user = await UserModel.collection.findOne({ id: '123' });
@@ -117,7 +120,7 @@ describe('migration.1510174676_user_status', () => {
expect(user.canEditName).to.equal(false);
// Perform the migration.
await migration.up();
await performMigration();
user = await UserModel.collection.findOne({ id: '123' });
@@ -153,7 +156,7 @@ describe('migration.1510174676_user_status', () => {
const until = user.suspension.until;
// Perform the migration.
await migration.up();
await performMigration();
user = await UserModel.collection.findOne({ id: '123' });
@@ -187,7 +190,7 @@ describe('migration.1510174676_user_status', () => {
expect(user.status).to.equal('BANNED');
// Perform the migration.
await migration.up();
await performMigration();
user = await UserModel.collection.findOne({ id: '123' });