From 6caae2df9734f46528d34d616064548ee7673a5a Mon Sep 17 00:00:00 2001 From: Wyatt Johnson Date: Thu, 25 Jan 2018 18:17:04 -0700 Subject: [PATCH] applied migration performance improvements --- bin/cli-migration | 5 +- migrations/1516920154_action_counts.js | 136 +++++++++++++------------ migrations/1516920160_reply_counts.js | 31 +++--- services/migration/helpers.js | 35 ++++++- services/migration/index.js | 2 +- 5 files changed, 123 insertions(+), 86 deletions(-) diff --git a/bin/cli-migration b/bin/cli-migration index 32c0cb4dd..63e16f97f 100755 --- a/bin/cli-migration +++ b/bin/cli-migration @@ -28,7 +28,6 @@ async function createMigration(name) { async function runMigrations(options) { const { yes, queryBatchSize, updateBatchSize } = options; - console.log({ yes, queryBatchSize, updateBatchSize }); try { if (!yes) { const { backedUp } = await inquirer.prompt([ @@ -109,13 +108,13 @@ program '-q, --query-batch-size ', 'change the size of queried documents that are batched at a time', parse10, - 100 + 10000 ) .option( '-u, --update-batch-size ', 'change the size of documents that are batched before the update is sent', parse10, - 1000 + 20000 ) .option('-y, --yes', 'will answer yes to all questions') .description('runs all pending migrations') diff --git a/migrations/1516920154_action_counts.js b/migrations/1516920154_action_counts.js index 5aa95614c..7558b5eb0 100644 --- a/migrations/1516920154_action_counts.js +++ b/migrations/1516920154_action_counts.js @@ -9,44 +9,47 @@ module.exports = { { Model: UserModel, item_type: 'USERS' }, ]; for (const { Model, item_type } of models) { - let cursor = ActionModel.collection.aggregate([ - { - $match: { - group_id: { $ne: null }, - item_type, - }, - }, - { - $group: { - // group unique documents by these properties, we are leveraging the - // fact that each uuid is completely unique. - _id: { - item_id: '$item_id', - action_type: '$action_type', - group_id: '$group_id', - }, - - // and sum up all actions matching the above grouping criteria - count: { - $sum: 1, + let cursor = ActionModel.collection.aggregate( + [ + { + $match: { + group_id: { $ne: null }, + item_type, }, }, - }, - { - $project: { - // suppress the _id field - _id: false, + { + $group: { + // group unique documents by these properties, we are leveraging the + // fact that each uuid is completely unique. + _id: { + item_id: '$item_id', + action_type: '$action_type', + group_id: '$group_id', + }, - // map the fields from the _id grouping down a level - item_id: '$_id.item_id', - action_type: { $toLower: '$_id.action_type' }, - group_id: { $toLower: '$_id.group_id' }, - - // map the field directly - count: '$count', + // and sum up all actions matching the above grouping criteria + count: { + $sum: 1, + }, + }, }, - }, - ]); + { + $project: { + // suppress the _id field + _id: false, + + // map the fields from the _id grouping down a level + item_id: '$_id.item_id', + action_type: { $toLower: '$_id.action_type' }, + group_id: { $toLower: '$_id.group_id' }, + + // map the field directly + count: '$count', + }, + }, + ], + { allowDiskUse: true } + ); // Transform those documents. await transformSingleWithCursor( @@ -64,41 +67,44 @@ module.exports = { // Secondly, we'll collect the group group id's (all the actions for a // specific action type) to update counts of. - cursor = ActionModel.collection.aggregate([ - { - $match: { - item_type, - }, - }, - { - $group: { - // group unique documents by these properties, we are leveraging the - // fact that each uuid is completely unique. - _id: { - item_id: '$item_id', - action_type: '$action_type', - }, - - // and sum up all actions matching the above grouping criteria - count: { - $sum: 1, + cursor = ActionModel.collection.aggregate( + [ + { + $match: { + item_type, }, }, - }, - { - $project: { - // suppress the _id field - _id: false, + { + $group: { + // group unique documents by these properties, we are leveraging the + // fact that each uuid is completely unique. + _id: { + item_id: '$item_id', + action_type: '$action_type', + }, - // map the fields from the _id grouping down a level - item_id: '$_id.item_id', - action_type: { $toLower: '$_id.action_type' }, - - // map the field directly - count: '$count', + // and sum up all actions matching the above grouping criteria + count: { + $sum: 1, + }, + }, }, - }, - ]); + { + $project: { + // suppress the _id field + _id: false, + + // map the fields from the _id grouping down a level + item_id: '$_id.item_id', + action_type: { $toLower: '$_id.action_type' }, + + // map the field directly + count: '$count', + }, + }, + ], + { allowDiskUse: true } + ); // Transform those documents. await transformSingleWithCursor( diff --git a/migrations/1516920160_reply_counts.js b/migrations/1516920160_reply_counts.js index 7dd18e8f8..fc81bd912 100644 --- a/migrations/1516920160_reply_counts.js +++ b/migrations/1516920160_reply_counts.js @@ -7,22 +7,25 @@ const transformComments = ({ _id: parent_id, reply_count }) => ({ module.exports = { async up({ transformSingleWithCursor }) { - const cursor = CommentModel.collection.aggregate([ - { - $match: { - parent_id: { $ne: null }, - status: { $in: ['NONE', 'ACCEPTED'] }, - }, - }, - { - $group: { - _id: '$parent_id', - reply_count: { - $sum: 1, + const cursor = CommentModel.collection.aggregate( + [ + { + $match: { + parent_id: { $ne: null }, + status: { $in: ['NONE', 'ACCEPTED'] }, }, }, - }, - ]); + { + $group: { + _id: '$parent_id', + reply_count: { + $sum: 1, + }, + }, + }, + ], + { allowDiskUse: true } + ); // Transform those documents. await transformSingleWithCursor(cursor, transformComments, CommentModel); diff --git a/services/migration/helpers.js b/services/migration/helpers.js index 38df5c128..f2d959ca4 100644 --- a/services/migration/helpers.js +++ b/services/migration/helpers.js @@ -1,3 +1,5 @@ +const debug = require('debug')('talk:services:migration'); + /** * processUpdates processes batches of updates on the given model. * @@ -16,16 +18,37 @@ const processUpdates = async (model, updates) => { await bulk.execute(); }; +const debugProcessStatistics = (count, totalCount) => { + if (totalCount > 0) { + debug( + `processed ${(count / totalCount * 100).toFixed( + 2 + )}% (${count}/${totalCount}) update` + ); + } else { + debug(`processed ${count} updates`); + } +}; + const transformSingleWithCursor = ({ queryBatchSize, updateBatchSize, -}) => async (cursor, process, Model) => { +}) => async (query, process, Model) => { + debug('starting transform'); + // We'll manage the updates that we store inside this object. let updates = []; - // First we'll collect all the individual actions with specific group id's. - cursor = await cursor.batchSize(queryBatchSize); + // Count the elements in the transformation. + let totalCount = 0; + try { + totalCount = await query.count(); + } catch (err) {} + // First we'll collect all the individual actions with specific group id's. + const cursor = await query.batchSize(queryBatchSize); + + let count = 0; while (await cursor.hasNext()) { const result = await cursor.next(); @@ -37,6 +60,8 @@ const transformSingleWithCursor = ({ if (updates.length > updateBatchSize) { // Process the updates. await processUpdates(Model, updates); + count += updates.length; + debugProcessStatistics(count, totalCount); // Clear the updates array. updates = []; @@ -46,10 +71,14 @@ const transformSingleWithCursor = ({ if (updates.length > 0) { // Process the updates. await processUpdates(Model, updates); + count += updates.length; + debugProcessStatistics(count, totalCount); // Clear the updates array. updates = []; } + + debug('finished transform'); }; /** diff --git a/services/migration/index.js b/services/migration/index.js index 312282666..e0da3cddf 100644 --- a/services/migration/index.js +++ b/services/migration/index.js @@ -115,7 +115,7 @@ class MigrationService { */ static async run( migrations, - { queryBatchSize = 100, updateBatchSize = 1000 } = {} + { queryBatchSize = 10000, updateBatchSize = 20000 } = {} ) { if (migrations.length === 0) { console.log('No migrations to run!');