migration rewrite and removed verifications

This commit is contained in:
Wyatt Johnson
2018-01-25 16:12:26 -07:00
parent e8f73ddb87
commit 04390c5acd
17 changed files with 472 additions and 746 deletions
-1
View File
@@ -18,7 +18,6 @@ program
.command('token', 'work with the access tokens')
.command('users', 'work with the application auth')
.command('migration', 'provides utilities for migrating the database')
.command('verify', 'provides utilities for performing data verification')
.command(
'plugins',
'provides utilities for interacting with the plugin system'
-58
View File
@@ -1,58 +0,0 @@
#!/usr/bin/env node
/**
* Module dependencies.
*/
const util = require('./util');
const program = require('commander');
const mongoose = require('../services/mongoose');
const databaseVerifications = require('./verifications/database');
// Register the shutdown criteria.
util.onshutdown([() => mongoose.disconnect()]);
async function database({ fix = false, limit = Infinity, batch = 1000 }) {
try {
for (const verification of databaseVerifications) {
await verification({ fix, limit, batch });
}
} catch (err) {
console.error(
`Failed to process all the ${databaseVerifications.length} verifications`,
err
);
util.shutdown(1);
return;
}
util.shutdown();
}
//==============================================================================
// Setting up the program command line arguments.
//==============================================================================
program
.command('db')
.description('verifies the database integrity')
.option('-f, --fix', 'fix the problems found with database inconsistencies')
.option(
'-l, --limit [size]',
'limit the amount of documents to process in a single pass, this will ensure only a maximum number of batch operations are issued [default: inf]',
parseInt
)
.option(
'-b, --batch [size]',
'batch size to process verifications and repairs of documents [default: 1000]',
parseInt
)
.action(database);
program.parse(process.argv);
// If there is no command listed, output help.
if (!process.argv.slice(2).length) {
program.outputHelp();
util.shutdown();
}
-194
View File
@@ -1,194 +0,0 @@
const UserModel = require('../../../models/user');
const CommentModel = require('../../../models/comment');
const ActionsService = require('../../../services/actions');
const { arrayJoinBy } = require('../../../graph/loaders/util');
const { get } = require('lodash');
const debug = require('debug')('talk:cli:verify');
const MODELS = [UserModel, CommentModel];
async function processBatch(Model, documents) {
// Get an array of all the document id's.
const documentIDs = documents.map(({ id }) => id);
// Store all the operations on this batch in this array that we'll return
// later.
const operations = [];
// Get the action summaries for this batch.
const totalActionSummaries = await ActionsService.getActionSummaries(
documentIDs
).then(arrayJoinBy(documentIDs, 'item_id'));
// Iterate over the documents.
for (let i = 0; i < documents.length; i++) {
const document = documents[i];
const actionSummaries = totalActionSummaries[i];
let ops = [];
for (const actionSummary of actionSummaries) {
if (actionSummary.group_id === null) {
continue;
}
// And we generate the group id.
const ACTION_TYPE = actionSummary.action_type.toLowerCase();
const GROUP_ID = actionSummary.group_id.toLowerCase();
if (GROUP_ID.length <= 0) {
continue;
}
// And we add a new batch operation if the action summary is associated
// with a group.
const ACTION_COUNT_FIELD = `${ACTION_TYPE}_${GROUP_ID}`;
// Check that the action summaries match the cached counts.
if (
get(document, ['action_counts', ACTION_COUNT_FIELD]) !==
actionSummary.count
) {
// Batch updates for those changes.
ops.push({
[`action_counts.${ACTION_COUNT_FIELD}`]: actionSummary.count,
});
}
}
// Group all the action summaries together from all the different group
// ids.
const groupedActionSummaries = actionSummaries.reduce(
(acc, actionSummary) => {
// action_type is already snake cased (as it would have had to be when it
// was inserted in the database).
const ACTION_TYPE = actionSummary.action_type.toLowerCase();
if (!(ACTION_TYPE in acc)) {
acc[ACTION_TYPE] = 0;
}
acc[ACTION_TYPE] += actionSummary.count;
return acc;
},
{}
);
for (const ACTION_COUNT_FIELD of Object.keys(groupedActionSummaries)) {
const count = groupedActionSummaries[ACTION_COUNT_FIELD];
// Check that the action summaries match the cached counts.
if (get(document, ['action_counts', ACTION_COUNT_FIELD]) !== count) {
// Batch updates for those changes.
ops.push({
[`action_counts.${ACTION_COUNT_FIELD}`]: count,
});
}
}
// If this comment has action summaries that should be updated, then
// perform an update!
if (ops.length > 0) {
operations.push({
updateOne: {
filter: {
id: document.id,
},
update: {
$set: Object.assign({}, ...ops),
},
},
});
}
}
return operations;
}
module.exports = async ({ fix, batch }) => {
for (const Model of MODELS) {
const cursor = Model.collection
.find({})
.project({
id: 1,
action_counts: 1,
})
.sort({ created_at: 1 });
let operations = [];
let documents = [];
// While there are documents to process.
while (await cursor.hasNext()) {
// Load the document.
const document = await cursor.next();
// Push the document into the documents array.
documents.push(document);
// Check to see if the length of the documents array requires us to
// process it.
if (documents.length > batch) {
// Process this batch.
let batchOperations = await processBatch(Model, documents);
// Push the batch operations into the model operations.
operations.push(...batchOperations);
// Clear this batch contents.
documents = [];
}
}
// Check to see if there are any documents left over.
if (documents.length > 0) {
// Process this batch.
let batchOperations = await processBatch(Model, documents);
// Push the batch operations into the model operations.
operations.push(...batchOperations);
}
const OPERATIONS_LENGTH = operations.length;
console.log(
`action_counts.js: ${OPERATIONS_LENGTH} ${
Model.collection.name
} need their action counts fixed.`
);
// If fix was enabled, execute the batch writes.
if (OPERATIONS_LENGTH > 0) {
if (fix) {
debug(
`action_counts.js: fixing ${OPERATIONS_LENGTH} ${
Model.collection.name
}...`
);
while (operations.length) {
let result = await Model.collection.bulkWrite(
operations.splice(0, batch)
);
debug(
`action_counts.js: fixed batch of ${result.modifiedCount} ${
Model.collection.name
}.`
);
}
console.log(
`action_counts.js: applied all ${OPERATIONS_LENGTH} fixes to ${
Model.collection.name
}.`
);
} else {
console.warn(
'Skipping fixing, --fix was not enabled, pass --fix to fix these errors'
);
}
}
}
};
@@ -1,140 +0,0 @@
const CommentModel = require('../../../models/comment');
const { singleJoinBy } = require('../../../graph/loaders/util');
const debug = require('debug')('talk:cli:verify');
const getBatch = async (limit, offset) =>
CommentModel.find({})
.select({ id: 1, action_counts: 1, reply_count: 1 })
.limit(limit)
.skip(offset)
.sort('created_at');
module.exports = async ({ fix, limit, batch }) => {
let operations = [];
// Count how many comments there are to process.
const totalCount = await CommentModel.count();
let offset = 0;
let comments = [];
let commentIDs = [];
console.log(`Processing ${totalCount} comments in batches of ${limit}...`);
// Keep processing documents until there are is none left.
while (offset < totalCount) {
// Get a batch of comments.
comments = await getBatch(batch, offset);
commentIDs = comments.map(({ id }) => id);
// Get their reply counts.
let allReplyCounts = await CommentModel.aggregate([
{
$match: {
parent_id: {
$in: commentIDs,
},
status: {
$in: ['NONE', 'ACCEPTED'],
},
},
},
{
$group: {
_id: '$parent_id',
count: {
$sum: 1,
},
},
},
])
.then(singleJoinBy(commentIDs, '_id'))
.then(results => results.map(result => (result ? result.count : 0)));
// Loop over the comments, with their action summaries.
for (let i = 0; i < comments.length; i++) {
let comment = comments[i];
let replyCount = allReplyCounts[i];
// And check to see if the action summaries we just computed match what is
// currently set for the comments.
let commentOperations = [];
// If the reply count needs to be updated, then update it!
if (comment.reply_count !== replyCount) {
commentOperations.push({
reply_count: replyCount,
});
}
// If this comment has action summaries that should be updated, then
// perform an update!
if (commentOperations.length > 0) {
operations.push({
updateOne: {
filter: {
id: comment.id,
},
update: {
$set: Object.assign({}, ...commentOperations),
},
},
});
}
}
debug(`Processed batch of ${comments.length} comments.`);
if (operations.length >= limit) {
debug(
`Queued operations are ${
operations.length
}, reached limit of ${limit}, not processing any more.`
);
if (operations.length > limit) {
debug(
`${operations.length -
limit} operations have been truncated to enforce the limit`
);
}
break;
}
offset += batch;
}
const OPERATIONS_LENGTH = operations.length;
if (limit < Infinity && offset + comments.length < totalCount) {
console.log(
`Processed ${offset +
comments.length}/${totalCount} comments because we reached the update limit of ${limit}.`
);
} else {
console.log(`Processed all ${totalCount} comments.`);
}
console.log(`${OPERATIONS_LENGTH} documents need fixing.`);
// If fix was enabled, execute the batch writes.
if (OPERATIONS_LENGTH > 0) {
if (fix) {
debug(`Fixing ${OPERATIONS_LENGTH} documents...`);
while (operations.length) {
let batchOperations = operations.splice(0, batch);
let result = await CommentModel.collection.bulkWrite(batchOperations);
debug(`Fixed batch of ${result.modifiedCount} documents.`);
}
console.log(`Applied all ${OPERATIONS_LENGTH} fixes.`);
} else {
console.warn(
'Skipping fixing, --fix was not enabled, pass --fix to fix these errors'
);
}
}
};
-10
View File
@@ -1,10 +0,0 @@
// This will import all the verifications that should be run by the:
//
// cli verify database
//
// command. They exist in the form:
//
// async ({fix = false, batch = 1000}) => {}
//
// where their options are derived.
module.exports = [require('./comment_replies'), require('./action_counts')];
-22
View File
@@ -40,28 +40,6 @@ documents rather than performing a nice table alter. If the process crashes
during the migration, simply re-run it. The migration operations are designed
to act atomically, and be idempotent to documents already updated.
## Database Verifications
In `v3.*`, we introduced the concept of "verifying the database". Some of our
operations update cached values that live along side the original document to
improve performance. Running the cli command for verifying the database's cache
ensures that all the cached values are up to date.
Running the following will start the database verification process:
```bash
./bin/cli verify db --fix
```
You can notice the `--fix` option, without it, the tool should instead perform
a dry run of the operations it intends to perform.
{: .code-aside}
This process, like the migration process, should take some time to complete on
large databases.
Once you have updated your databases, that's all you have to do! Talk should now
function even better and faster with all the new features we poured into v4.0.0!
## Template Change
In `v4.0.0`, we introduced extensive support for compressing our javascript
+56 -75
View File
@@ -1,63 +1,38 @@
const CommentModel = require('../models/comment');
const { processUpdates } = require('./utils');
module.exports = {
async up({ queryBatchSize, updateBatchSize }) {
// Find all comments that have tags.
const cursor = await CommentModel.collection
.aggregate([
{
$match: {
tags: {
$exists: true,
$ne: [],
},
},
},
{
$project: {
id: true,
tags: true,
},
},
])
.batchSize(queryBatchSize);
// OLD
//
// [
// {
// name: 'OFF_TOPIC',
// assigned_by: '',
// created_at: new Date()
// }
// ]
let updates = [];
while (await cursor.hasNext()) {
let { id, tags } = await cursor.next();
// OLD
//
// [
// {
// name: 'OFF_TOPIC',
// assigned_by: '',
// created_at: new Date()
// }
// ]
// NEW
//
// [
// {
// tag: {
// name: 'OFF_TOPIC',
// permissions: {
// public: true,
// self: false,
// roles: []
// },
// models: ['COMMENTS'],
// created_at: new Date()
// },
// assigned_by: '',
// created_at: new Date()
// }
// ]
// Remap the tag structure.
tags = tags.map(({ name, assigned_by, created_at }) => ({
// NEW
//
// [
// {
// tag: {
// name: 'OFF_TOPIC',
// permissions: {
// public: true,
// self: false,
// roles: []
// },
// models: ['COMMENTS'],
// created_at: new Date()
// },
// assigned_by: '',
// created_at: new Date()
// }
// ]
const transformTags = ({ id, tags }) => ({
query: { id },
update: {
$set: {
tags: tags.map(({ name, assigned_by, created_at }) => ({
tag: {
name,
permissions: {
@@ -70,25 +45,31 @@ module.exports = {
},
assigned_by,
created_at,
}));
})),
},
},
});
updates.push({ query: { id }, update: { $set: { tags } } });
module.exports = {
async up({ transformSingleWithCursor }) {
// Find all comments that have tags.
const cursor = CommentModel.collection.aggregate([
{
$match: {
tags: {
$exists: true,
$ne: [],
},
},
},
{
$project: {
id: true,
tags: true,
},
},
]);
if (updates.length > updateBatchSize) {
// Process the updates.
await processUpdates(CommentModel, updates);
// Clear the updates array.
updates = [];
}
}
if (updates.length > 0) {
// Process the updates.
await processUpdates(CommentModel, updates);
// Clear the updates array.
updates = [];
}
await transformSingleWithCursor(cursor, transformTags, CommentModel);
},
};
+2 -3
View File
@@ -1,5 +1,4 @@
const ActionModel = require('../models/action');
const { processUpdates } = require('./utils');
const mapping = {
COMMENTS: {
@@ -13,7 +12,7 @@ const mapping = {
};
module.exports = {
async up() {
async up({ processManyUpdates }) {
const updates = [];
for (const item_type in mapping) {
const mappings = mapping[item_type];
@@ -45,7 +44,7 @@ module.exports = {
}
if (updates.length > 0) {
await processUpdates(ActionModel, updates);
await processManyUpdates(ActionModel, updates);
}
},
};
+164 -185
View File
@@ -1,56 +1,132 @@
const UserModel = require('../models/user');
const { processUpdates } = require('./utils');
const merge = require('lodash/merge');
module.exports = {
async up({ queryBatchSize, updateBatchSize }) {
const created_at = Date.now();
const transformUser = user => {
const created_at = Date.now();
// Get the first batch of users.
let cursor = await UserModel.collection
.find({
const { id, status, canEditName, suspension, disabled } = user;
let update = {
$unset: {
canEditName: '',
suspension: '',
disabled: '',
},
$set: {
status: {
// The username status is specific to each case.
username: {
history: [],
},
// The user is not banned by default.
banned: {
status: false,
history: [],
},
// The user is not suspended by default.
suspension: {
until: null,
history: [],
},
},
updated_at: created_at,
},
};
if (disabled) {
update = merge(update, {
$set: {
status: {
$in: ['ACTIVE', 'BANNED', 'PENDING', 'APPROVED'],
banned: {
status: true,
history: [
{
status: true,
created_at,
},
],
},
},
})
.batchSize(queryBatchSize);
},
});
}
let updates = [];
while (await cursor.hasNext()) {
const user = await cursor.next();
const { id, status, canEditName, suspension, disabled } = user;
let update = {
$unset: {
canEditName: '',
suspension: '',
disabled: '',
// If the user has an "until" property of their suspension, then we need
// to reflect that in the new status object.
if (suspension && suspension.until !== null) {
update = merge(update, {
$set: {
status: {
suspension: {
until: suspension.until,
history: [
{
until: suspension.until,
created_at,
},
],
},
},
$set: {
status: {
// The username status is specific to each case.
username: {
history: [],
},
},
});
}
// The user is not banned by default.
banned: {
status: false,
history: [],
},
// The user is not suspended by default.
suspension: {
until: null,
history: [],
switch (status) {
case 'ACTIVE':
if (canEditName) {
update = merge(update, {
$set: {
status: {
username: {
status: 'UNSET',
history: [
{
status: 'UNSET',
created_at,
},
],
},
},
},
updated_at: created_at,
},
};
if (disabled) {
});
} else {
update = merge(update, {
$set: {
status: {
username: {
status: 'SET',
history: [
{
status: 'SET',
created_at,
},
],
},
},
},
});
}
break;
case 'BANNED':
if (canEditName) {
update = merge(update, {
$set: {
status: {
username: {
status: 'REJECTED',
history: [
{
status: 'REJECTED',
created_at,
},
],
},
},
},
});
} else {
update = merge(update, {
$set: {
status: {
@@ -63,22 +139,11 @@ module.exports = {
},
],
},
},
},
});
}
// If the user has an "until" property of their suspension, then we need
// to reflect that in the new status object.
if (suspension && suspension.until !== null) {
update = merge(update, {
$set: {
status: {
suspension: {
until: suspension.until,
username: {
status: 'SET',
history: [
{
until: suspension.until,
status: 'SET',
created_at,
},
],
@@ -87,143 +152,57 @@ module.exports = {
},
});
}
switch (status) {
case 'ACTIVE':
if (canEditName) {
update = merge(update, {
$set: {
status: {
username: {
status: 'UNSET',
history: [
{
status: 'UNSET',
created_at,
},
],
},
},
},
});
} else {
update = merge(update, {
$set: {
status: {
username: {
status: 'SET',
history: [
{
status: 'SET',
created_at,
},
],
},
},
},
});
}
break;
case 'BANNED':
if (canEditName) {
update = merge(update, {
$set: {
status: {
username: {
status: 'REJECTED',
history: [
{
status: 'REJECTED',
created_at,
},
],
},
},
},
});
} else {
update = merge(update, {
$set: {
status: {
banned: {
status: true,
history: [
{
status: true,
created_at,
},
],
},
username: {
status: 'SET',
history: [
{
status: 'SET',
created_at,
},
],
},
},
},
});
}
break;
case 'PENDING':
update = merge(update, {
$set: {
status: {
username: {
break;
case 'PENDING':
update = merge(update, {
$set: {
status: {
username: {
status: 'CHANGED',
history: [
{
status: 'CHANGED',
history: [
{
status: 'CHANGED',
created_at,
},
],
created_at,
},
},
],
},
});
break;
case 'APPROVED':
update = merge(update, {
$set: {
status: {
username: {
},
},
});
break;
case 'APPROVED':
update = merge(update, {
$set: {
status: {
username: {
status: 'APPROVED',
history: [
{
status: 'APPROVED',
history: [
{
status: 'APPROVED',
created_at,
},
],
created_at,
},
},
],
},
});
break;
default:
throw new Error(`${status} is an invalid status`);
}
},
},
});
break;
default:
throw new Error(`${status} is an invalid status`);
}
updates.push({ query: { id }, update });
return { query: { id }, update };
};
// Process every 1000 users.
if (updates.length > updateBatchSize) {
// Process the updates.
await processUpdates(UserModel, updates);
module.exports = {
async up({ transformSingleWithCursor }) {
// Get the first batch of users.
const cursor = UserModel.collection.find({
status: {
$in: ['ACTIVE', 'BANNED', 'PENDING', 'APPROVED'],
},
});
// Clear the updates array.
updates = [];
}
}
if (updates.length > 0) {
// Process the updates.
await processUpdates(UserModel, updates);
// Clear the updates array.
updates = [];
}
await transformSingleWithCursor(cursor, transformUser, UserModel);
},
};
+12 -32
View File
@@ -1,5 +1,4 @@
const UserModel = require('../models/user');
const { processUpdates } = require('./utils');
const findNewRole = roles => {
if (roles.includes('ADMIN')) {
@@ -14,20 +13,16 @@ const findNewRole = roles => {
};
module.exports = {
async up({ queryBatchSize, updateBatchSize }) {
const cursor = await UserModel.collection
.find({
roles: {
$exists: true,
},
})
.batchSize(queryBatchSize);
async up({ transformSingleWithCursor }) {
const cursor = UserModel.collection.find({
roles: {
$exists: true,
},
});
let updates = [];
while (await cursor.hasNext()) {
const user = await cursor.next();
updates.push({
await transformSingleWithCursor(
cursor,
user => ({
query: {
id: user.id,
},
@@ -41,23 +36,8 @@ module.exports = {
roles: '',
},
},
});
if (updates.length > updateBatchSize) {
// Process the updates.
await processUpdates(UserModel, updates);
// Clear the updates array.
updates = [];
}
}
if (updates.length > 0) {
// Process the updates.
await processUpdates(UserModel, updates);
// Clear the updates array.
updates = [];
}
}),
UserModel
);
},
};
+118
View File
@@ -0,0 +1,118 @@
const ActionModel = require('../models/action');
const UserModel = require('../models/user');
const CommentModel = require('../models/comment');
module.exports = {
async up({ transformSingleWithCursor }) {
const models = [
{ Model: CommentModel, item_type: 'COMMENTS' },
{ Model: UserModel, item_type: 'USERS' },
];
for (const { Model, item_type } of models) {
let cursor = ActionModel.collection.aggregate([
{
$match: {
group_id: { $ne: null },
item_type,
},
},
{
$group: {
// group unique documents by these properties, we are leveraging the
// fact that each uuid is completely unique.
_id: {
item_id: '$item_id',
action_type: '$action_type',
group_id: '$group_id',
},
// and sum up all actions matching the above grouping criteria
count: {
$sum: 1,
},
},
},
{
$project: {
// suppress the _id field
_id: false,
// map the fields from the _id grouping down a level
item_id: '$_id.item_id',
action_type: { $toLower: '$_id.action_type' },
group_id: { $toLower: '$_id.group_id' },
// map the field directly
count: '$count',
},
},
]);
// Transform those documents.
await transformSingleWithCursor(
cursor,
({ item_id, action_type, group_id, count }) => ({
query: { id: item_id },
update: {
$set: {
[`action_counts.${action_type}_${group_id}`]: count,
},
},
}),
Model
);
// Secondly, we'll collect the group group id's (all the actions for a
// specific action type) to update counts of.
cursor = ActionModel.collection.aggregate([
{
$match: {
item_type,
},
},
{
$group: {
// group unique documents by these properties, we are leveraging the
// fact that each uuid is completely unique.
_id: {
item_id: '$item_id',
action_type: '$action_type',
},
// and sum up all actions matching the above grouping criteria
count: {
$sum: 1,
},
},
},
{
$project: {
// suppress the _id field
_id: false,
// map the fields from the _id grouping down a level
item_id: '$_id.item_id',
action_type: { $toLower: '$_id.action_type' },
// map the field directly
count: '$count',
},
},
]);
// Transform those documents.
await transformSingleWithCursor(
cursor,
({ item_id, action_type, count }) => ({
query: { id: item_id },
update: {
$set: {
[`action_counts.${action_type}`]: count,
},
},
}),
Model
);
}
},
};
+30
View File
@@ -0,0 +1,30 @@
const CommentModel = require('../models/comment');
const transformComments = ({ _id: parent_id, reply_count }) => ({
query: { id: parent_id, reply_count: { $ne: reply_count } },
update: { $set: { reply_count } },
});
module.exports = {
async up({ transformSingleWithCursor }) {
const cursor = CommentModel.collection.aggregate([
{
$match: {
parent_id: { $ne: null },
status: { $in: ['NONE', 'ACCEPTED'] },
},
},
{
$group: {
_id: '$parent_id',
reply_count: {
$sum: 1,
},
},
},
]);
// Transform those documents.
await transformSingleWithCursor(cursor, transformComments, CommentModel);
},
};
-19
View File
@@ -1,19 +0,0 @@
/**
* processUpdates processes batches of updates on the given model.
*
* @param {Object} model mongoose model that should perform the operations on
* @param {Array<Object>} updates array of updates to execute
*/
const processUpdates = async (model, updates) => {
// Create a new batch operation.
const bulk = model.collection.initializeUnorderedBulkOp();
for (const { query, update } of updates) {
bulk.find(query).updateOne(update);
}
// Execute the bulk update operation.
await bulk.execute();
};
module.exports = { processUpdates };
+1 -1
View File
@@ -29,7 +29,7 @@
},
"talk": {
"migration": {
"minVersion": 1511801783
"minVersion": 1516920160
}
},
"repository": {
+78
View File
@@ -0,0 +1,78 @@
/**
* processUpdates processes batches of updates on the given model.
*
* @param {Object} model mongoose model that should perform the operations on
* @param {Array<Object>} updates array of updates to execute
*/
const processUpdates = async (model, updates) => {
// Create a new batch operation.
const bulk = model.collection.initializeUnorderedBulkOp();
for (const { query, update } of updates) {
bulk.find(query).updateOne(update);
}
// Execute the bulk update operation.
await bulk.execute();
};
const transformSingleWithCursor = ({
queryBatchSize,
updateBatchSize,
}) => async (cursor, process, Model) => {
// We'll manage the updates that we store inside this object.
let updates = [];
// First we'll collect all the individual actions with specific group id's.
cursor = await cursor.batchSize(queryBatchSize);
while (await cursor.hasNext()) {
const result = await cursor.next();
const transformed = await process(result);
if (transformed) {
updates.push(transformed);
}
if (updates.length > updateBatchSize) {
// Process the updates.
await processUpdates(Model, updates);
// Clear the updates array.
updates = [];
}
}
if (updates.length > 0) {
// Process the updates.
await processUpdates(Model, updates);
// Clear the updates array.
updates = [];
}
};
/**
* processManyUpdates processes batches of updates on many models with the given
* model.
*
* @param {Object} model mongoose model that should perform the operations on
* @param {Array<Object>} updates array of updates to execute
*/
const processManyUpdates = async (model, updates) => {
// Create a new batch operation.
const bulk = model.collection.initializeUnorderedBulkOp();
for (const { query, update } of updates) {
bulk.find(query).update(update);
}
// Execute the bulk update operation.
await bulk.execute();
};
module.exports = ctx => ({
processManyUpdates,
processUpdates,
transformSingleWithCursor: transformSingleWithCursor(ctx),
});
@@ -1,12 +1,13 @@
const MigrationModel = require('../models/migration');
const MigrationModel = require('../../models/migration');
const fs = require('fs');
const ms = require('ms');
const path = require('path');
const Joi = require('joi');
const debug = require('debug')('talk:services:migration');
const sc = require('snake-case');
const helpers = require('./helpers');
const { stripIndent } = require('common-tags');
const { talk: { migration: { minVersion } } } = require('../package.json');
const { talk: { migration: { minVersion } } } = require('../../package.json');
const migrationTemplate = stripIndent`
module.exports = {
@@ -46,7 +47,7 @@ class MigrationService {
static async listPending() {
// Get all the migration files.
let migrationFiles = fs.readdirSync(
path.join(__dirname, '..', 'migrations')
path.join(__dirname, '..', '..', 'migrations')
);
// Ensure that all migrations follow this format.
@@ -78,7 +79,7 @@ class MigrationService {
}
// Read the migration from the filesystem.
let migration = require(`../migrations/${filename}`);
let migration = require(`../../migrations/${filename}`);
Joi.assert(
migration,
migrationSchema,
@@ -121,11 +122,14 @@ class MigrationService {
return;
}
// Create the context helpers.
const ctx = helpers({ queryBatchSize, updateBatchSize });
for (let { filename, version, migration } of migrations) {
try {
const startTime = new Date();
console.log(`Starting migration ${filename}`);
await migration.up({ queryBatchSize, updateBatchSize });
await migration.up(ctx);
const endTime = new Date();
const totalTime = endTime.getTime() - startTime.getTime();
console.log(`Finished migration ${filename} in ${ms(totalTime)}`);
@@ -1,12 +1,13 @@
const migration = require('../../../migrations/1510174676_user_status');
const UserModel = require('../../../models/user');
const helpers = require('../../../services/migration/helpers');
const chai = require('chai');
chai.use(require('chai-datetime'));
const { expect } = chai;
const performMigration = () =>
migration.up({ queryBatchSize: 100, updateBatchSize: 100 });
migration.up(helpers({ queryBatchSize: 100, updateBatchSize: 100 }));
describe('migration.1510174676_user_status', () => {
describe('active user', () => {