applied migration performance improvements

This commit is contained in:
Wyatt Johnson
2018-01-25 18:17:04 -07:00
parent 75751285fe
commit 6caae2df97
5 changed files with 123 additions and 86 deletions
+2 -3
View File
@@ -28,7 +28,6 @@ async function createMigration(name) {
async function runMigrations(options) {
const { yes, queryBatchSize, updateBatchSize } = options;
console.log({ yes, queryBatchSize, updateBatchSize });
try {
if (!yes) {
const { backedUp } = await inquirer.prompt([
@@ -109,13 +108,13 @@ program
'-q, --query-batch-size <n>',
'change the size of queried documents that are batched at a time',
parse10,
100
10000
)
.option(
'-u, --update-batch-size <n>',
'change the size of documents that are batched before the update is sent',
parse10,
1000
20000
)
.option('-y, --yes', 'will answer yes to all questions')
.description('runs all pending migrations')
+71 -65
View File
@@ -9,44 +9,47 @@ module.exports = {
{ Model: UserModel, item_type: 'USERS' },
];
for (const { Model, item_type } of models) {
let cursor = ActionModel.collection.aggregate([
{
$match: {
group_id: { $ne: null },
item_type,
},
},
{
$group: {
// group unique documents by these properties, we are leveraging the
// fact that each uuid is completely unique.
_id: {
item_id: '$item_id',
action_type: '$action_type',
group_id: '$group_id',
},
// and sum up all actions matching the above grouping criteria
count: {
$sum: 1,
let cursor = ActionModel.collection.aggregate(
[
{
$match: {
group_id: { $ne: null },
item_type,
},
},
},
{
$project: {
// suppress the _id field
_id: false,
{
$group: {
// group unique documents by these properties, we are leveraging the
// fact that each uuid is completely unique.
_id: {
item_id: '$item_id',
action_type: '$action_type',
group_id: '$group_id',
},
// map the fields from the _id grouping down a level
item_id: '$_id.item_id',
action_type: { $toLower: '$_id.action_type' },
group_id: { $toLower: '$_id.group_id' },
// map the field directly
count: '$count',
// and sum up all actions matching the above grouping criteria
count: {
$sum: 1,
},
},
},
},
]);
{
$project: {
// suppress the _id field
_id: false,
// map the fields from the _id grouping down a level
item_id: '$_id.item_id',
action_type: { $toLower: '$_id.action_type' },
group_id: { $toLower: '$_id.group_id' },
// map the field directly
count: '$count',
},
},
],
{ allowDiskUse: true }
);
// Transform those documents.
await transformSingleWithCursor(
@@ -64,41 +67,44 @@ module.exports = {
// Secondly, we'll collect the group group id's (all the actions for a
// specific action type) to update counts of.
cursor = ActionModel.collection.aggregate([
{
$match: {
item_type,
},
},
{
$group: {
// group unique documents by these properties, we are leveraging the
// fact that each uuid is completely unique.
_id: {
item_id: '$item_id',
action_type: '$action_type',
},
// and sum up all actions matching the above grouping criteria
count: {
$sum: 1,
cursor = ActionModel.collection.aggregate(
[
{
$match: {
item_type,
},
},
},
{
$project: {
// suppress the _id field
_id: false,
{
$group: {
// group unique documents by these properties, we are leveraging the
// fact that each uuid is completely unique.
_id: {
item_id: '$item_id',
action_type: '$action_type',
},
// map the fields from the _id grouping down a level
item_id: '$_id.item_id',
action_type: { $toLower: '$_id.action_type' },
// map the field directly
count: '$count',
// and sum up all actions matching the above grouping criteria
count: {
$sum: 1,
},
},
},
},
]);
{
$project: {
// suppress the _id field
_id: false,
// map the fields from the _id grouping down a level
item_id: '$_id.item_id',
action_type: { $toLower: '$_id.action_type' },
// map the field directly
count: '$count',
},
},
],
{ allowDiskUse: true }
);
// Transform those documents.
await transformSingleWithCursor(
+17 -14
View File
@@ -7,22 +7,25 @@ const transformComments = ({ _id: parent_id, reply_count }) => ({
module.exports = {
async up({ transformSingleWithCursor }) {
const cursor = CommentModel.collection.aggregate([
{
$match: {
parent_id: { $ne: null },
status: { $in: ['NONE', 'ACCEPTED'] },
},
},
{
$group: {
_id: '$parent_id',
reply_count: {
$sum: 1,
const cursor = CommentModel.collection.aggregate(
[
{
$match: {
parent_id: { $ne: null },
status: { $in: ['NONE', 'ACCEPTED'] },
},
},
},
]);
{
$group: {
_id: '$parent_id',
reply_count: {
$sum: 1,
},
},
},
],
{ allowDiskUse: true }
);
// Transform those documents.
await transformSingleWithCursor(cursor, transformComments, CommentModel);
+32 -3
View File
@@ -1,3 +1,5 @@
const debug = require('debug')('talk:services:migration');
/**
* processUpdates processes batches of updates on the given model.
*
@@ -16,16 +18,37 @@ const processUpdates = async (model, updates) => {
await bulk.execute();
};
const debugProcessStatistics = (count, totalCount) => {
if (totalCount > 0) {
debug(
`processed ${(count / totalCount * 100).toFixed(
2
)}% (${count}/${totalCount}) update`
);
} else {
debug(`processed ${count} updates`);
}
};
const transformSingleWithCursor = ({
queryBatchSize,
updateBatchSize,
}) => async (cursor, process, Model) => {
}) => async (query, process, Model) => {
debug('starting transform');
// We'll manage the updates that we store inside this object.
let updates = [];
// First we'll collect all the individual actions with specific group id's.
cursor = await cursor.batchSize(queryBatchSize);
// Count the elements in the transformation.
let totalCount = 0;
try {
totalCount = await query.count();
} catch (err) {}
// First we'll collect all the individual actions with specific group id's.
const cursor = await query.batchSize(queryBatchSize);
let count = 0;
while (await cursor.hasNext()) {
const result = await cursor.next();
@@ -37,6 +60,8 @@ const transformSingleWithCursor = ({
if (updates.length > updateBatchSize) {
// Process the updates.
await processUpdates(Model, updates);
count += updates.length;
debugProcessStatistics(count, totalCount);
// Clear the updates array.
updates = [];
@@ -46,10 +71,14 @@ const transformSingleWithCursor = ({
if (updates.length > 0) {
// Process the updates.
await processUpdates(Model, updates);
count += updates.length;
debugProcessStatistics(count, totalCount);
// Clear the updates array.
updates = [];
}
debug('finished transform');
};
/**
+1 -1
View File
@@ -115,7 +115,7 @@ class MigrationService {
*/
static async run(
migrations,
{ queryBatchSize = 100, updateBatchSize = 1000 } = {}
{ queryBatchSize = 10000, updateBatchSize = 20000 } = {}
) {
if (migrations.length === 0) {
console.log('No migrations to run!');