mirror of
https://github.com/wassname/talk.git
synced 2026-06-30 13:48:15 +08:00
Notification Digesting
This commit is contained in:
@@ -43,6 +43,8 @@ plugins/*
|
||||
!plugins/talk-plugin-notifications-category-featured
|
||||
!plugins/talk-plugin-notifications-category-reply
|
||||
!plugins/talk-plugin-notifications-category-staff
|
||||
!plugins/talk-plugin-notifications-digest-daily
|
||||
!plugins/talk-plugin-notifications-digest-hourly
|
||||
!plugins/talk-plugin-offtopic
|
||||
!plugins/talk-plugin-permalink
|
||||
!plugins/talk-plugin-profile-settings
|
||||
|
||||
@@ -84,6 +84,7 @@ const handler = {
|
||||
category: 'featured',
|
||||
event: 'commentFeatured',
|
||||
hydrate,
|
||||
digestOrder: 10,
|
||||
};
|
||||
|
||||
module.exports = {
|
||||
|
||||
@@ -2,5 +2,5 @@ en:
|
||||
talk-plugin-notifications:
|
||||
categories:
|
||||
featured:
|
||||
subject: "One of your comments was featured on [{0}]"
|
||||
subject: "One of your comments was featured on {0}"
|
||||
body: "{0}\nA member of our team has selected this comment to be featured for other readers: {1}"
|
||||
@@ -90,7 +90,13 @@ const hydrate = async (ctx, category, context) => {
|
||||
return [headline, replier, permalink];
|
||||
};
|
||||
|
||||
const handler = { handle, category: 'reply', event: 'commentAdded', hydrate };
|
||||
const handler = {
|
||||
handle,
|
||||
category: 'reply',
|
||||
event: 'commentAdded',
|
||||
hydrate,
|
||||
digestOrder: 30,
|
||||
};
|
||||
|
||||
module.exports = {
|
||||
typeDefs: `
|
||||
|
||||
@@ -117,6 +117,7 @@ const handler = {
|
||||
event: 'commentAdded',
|
||||
hydrate,
|
||||
supersedesCategories: ['reply'],
|
||||
digestOrder: 20,
|
||||
};
|
||||
|
||||
module.exports = {
|
||||
|
||||
@@ -0,0 +1,11 @@
|
||||
module.exports = {
|
||||
typeDefs: `
|
||||
enum DIGEST_FREQUENCY {
|
||||
# DAILY will queue up the notifications and send them daily.
|
||||
DAILY
|
||||
}
|
||||
`,
|
||||
notificationDigests: {
|
||||
DAILY: { cronTime: '0 0 * * *', timeZone: 'America/New_York' },
|
||||
},
|
||||
};
|
||||
@@ -0,0 +1,11 @@
|
||||
module.exports = {
|
||||
typeDefs: `
|
||||
enum DIGEST_FREQUENCY {
|
||||
# HOURLY will queue up the notifications and send them hourly.
|
||||
HOURLY
|
||||
}
|
||||
`,
|
||||
notificationDigests: {
|
||||
HOURLY: { cronTime: '0 * * * *', timeZone: 'America/New_York' },
|
||||
},
|
||||
};
|
||||
@@ -6,6 +6,7 @@
|
||||
"license": "Apache-2.0",
|
||||
"private": false,
|
||||
"dependencies": {
|
||||
"cron": "^1.3.0",
|
||||
"linkifyjs": "^2.1.5"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,24 @@
|
||||
const { get, find, groupBy, forEach, property } = require('lodash');
|
||||
const debug = require('debug')('talk-plugin-notifications');
|
||||
const uuid = require('uuid/v4');
|
||||
const {
|
||||
UNSUBSCRIBE_SUBJECT,
|
||||
DISABLE_REQUIRE_EMAIL_VERIFICATIONS,
|
||||
} = require('./config');
|
||||
merge,
|
||||
map,
|
||||
get,
|
||||
find,
|
||||
groupBy,
|
||||
forEach,
|
||||
flatten,
|
||||
property,
|
||||
} = require('lodash');
|
||||
const debug = require('debug')('talk-plugin-notifications');
|
||||
const { DISABLE_REQUIRE_EMAIL_VERIFICATIONS } = require('./config');
|
||||
const { CronJob } = require('cron');
|
||||
const { getOrganizationName } = require('./util');
|
||||
const {
|
||||
processNewNotifications,
|
||||
filterSuperseded,
|
||||
filterVerified,
|
||||
sendNotification,
|
||||
} = require('./messages');
|
||||
const { renderDigestMessage } = require('./digests');
|
||||
|
||||
// handleHandlers will call the handle method on each handler to determine if a
|
||||
// notification should be sent for it.
|
||||
@@ -32,97 +46,11 @@ const handleHandlers = (ctx, handlers, ...args) =>
|
||||
})
|
||||
);
|
||||
|
||||
// filterSuperseded will filter all the possible notifications and only send
|
||||
// those notifications that are not superseded by another type of notification.
|
||||
const filterSuperseded = (
|
||||
{ handler: { category }, notification: { userID: destinationUserID } },
|
||||
index,
|
||||
notifications
|
||||
) =>
|
||||
!notifications.some(
|
||||
({
|
||||
handler: { supersedesCategories = [] },
|
||||
notification: { userID: notificationUserID },
|
||||
}) =>
|
||||
// Only allow notifications to supersede another notification if that
|
||||
// notification is also destined for the same user.
|
||||
notificationUserID === destinationUserID &&
|
||||
// If another notification that is destined for the same user also exists
|
||||
// and declares that it supersedes this one, return true so we can filter
|
||||
// this one from the list.
|
||||
supersedesCategories.some(
|
||||
supersededCategory => supersededCategory === category
|
||||
)
|
||||
);
|
||||
|
||||
const USER_CONFIRMATION_QUERY = `
|
||||
query CheckUserConfirmation($userID: ID!) {
|
||||
user(id: $userID) {
|
||||
profiles {
|
||||
provider
|
||||
... on LocalUserProfile {
|
||||
confirmedAt
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
`;
|
||||
|
||||
// filterVerifiedNotification checks to see if a user has a verified email
|
||||
// address, and if they do, returns the notification payload again, otherwise,
|
||||
// returns undefined.
|
||||
const filterVerifiedNotification = ctx => async notification => {
|
||||
// Grab the user that we're supposed to be sending the notification to.
|
||||
const { notification: { userID } } = notification;
|
||||
|
||||
// Check their confirmed status. This should have already been hit by the
|
||||
// loaders, so we shouldn't make any more database requests.
|
||||
const { errors, data } = await ctx.graphql(USER_CONFIRMATION_QUERY, {
|
||||
userID,
|
||||
});
|
||||
if (errors) {
|
||||
ctx.log.error(
|
||||
{ err: errors },
|
||||
'could not query for user confirmation status'
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Get the first local profile from the user.
|
||||
const profile = find(get(data, 'user.profiles', []), ['provider', 'local']);
|
||||
if (!profile) {
|
||||
ctx.log.warn({ user_id: userID }, 'user did not have a local profile');
|
||||
return;
|
||||
}
|
||||
|
||||
// Pull out the confirmed status from the profile.
|
||||
const confirmed = get(profile, 'confirmedAt', null) !== null;
|
||||
if (!confirmed) {
|
||||
ctx.log.info(
|
||||
{ user_id: userID },
|
||||
'user did not have their local profile confirmed, but had settings enabled, not mailing'
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
return notification;
|
||||
};
|
||||
|
||||
// filterVerified performs filtering in a complicated way because we can't use
|
||||
// Promise.all on a Array.prototype.filter call.
|
||||
const filterVerified = async (ctx, notifications) => {
|
||||
notifications = await Promise.all(
|
||||
notifications.map(filterVerifiedNotification(ctx))
|
||||
);
|
||||
|
||||
// This acts as a poor-mans identity filter to remove all falsy values.
|
||||
return notifications.filter(property('notification'));
|
||||
};
|
||||
|
||||
class NotificationManager {
|
||||
constructor(context) {
|
||||
this.context = context;
|
||||
this.registry = [];
|
||||
this.digests = [];
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -149,17 +77,170 @@ class NotificationManager {
|
||||
.map(({ category }) => category)
|
||||
.join(', ')}] handlers when the '${event}' event is emitted`
|
||||
);
|
||||
broker.on(event, this.handle(handlers));
|
||||
broker.on(event, this.handleUserEvent(handlers));
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* handle will wrap a notification handler and attach it to the notification
|
||||
* stream system.
|
||||
* registerDigests will register the digest handlers.
|
||||
*
|
||||
* @param {Array<Object>} handlers digest handlers for options related to digesting
|
||||
*/
|
||||
registerDigests(...handlers) {
|
||||
this.digests.push(...handlers);
|
||||
}
|
||||
|
||||
/**
|
||||
* startDigesting will register all the digests to run and setup the cron
|
||||
* jobs.
|
||||
*/
|
||||
startDigesting() {
|
||||
this.digests.forEach(({ frequency, config }) => {
|
||||
new CronJob(
|
||||
merge(config, {
|
||||
start: true,
|
||||
onTick: this.handleDigestEvent(frequency),
|
||||
})
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
handleDigestEvent(frequency) {
|
||||
return async () => {
|
||||
// Create a system context to send down.
|
||||
const ctx = this.context.forSystem();
|
||||
|
||||
try {
|
||||
// Pull out some useful tools.
|
||||
const {
|
||||
connectors: { models: { User }, services: { I18n: { t } } },
|
||||
} = ctx;
|
||||
|
||||
const organizationName = await getOrganizationName(ctx);
|
||||
if (!organizationName) {
|
||||
ctx.log.error(
|
||||
'could not send the notification, organization name not in settings'
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const subject = t(
|
||||
'talk-plugin-notifications.templates.digest.subject',
|
||||
organizationName
|
||||
);
|
||||
|
||||
// Continue to pull from the Users digest until the queue is empty.
|
||||
while (true) {
|
||||
// Pull notifications from a user that have notifications enabled for
|
||||
// `frequency` and currently have notifications.
|
||||
const user = await User.findOneAndUpdate(
|
||||
{
|
||||
'metadata.notifications.settings.digestFrequency': frequency,
|
||||
'metadata.notifications.digests': { $exists: true, $ne: [] },
|
||||
},
|
||||
{ $set: { 'metadata.notifications.digests': [] } }
|
||||
);
|
||||
if (!user) {
|
||||
// There are no more users that meet the search criteria! We're
|
||||
// done!
|
||||
ctx.log.info('no notifications from database');
|
||||
break;
|
||||
}
|
||||
|
||||
// Begin rendering the user's digest.
|
||||
const digests = get(user, 'metadata.notifications.digests');
|
||||
if (!digests) {
|
||||
// We couldn't get the digest from the user (even after Mongo said
|
||||
// we would get it?).
|
||||
ctx.log.info(
|
||||
{ userID: user.id },
|
||||
'no notifications from user in database'
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
ctx.log.info(
|
||||
{ userID: user.id, notifications: digests.length },
|
||||
'generating notification digest email'
|
||||
);
|
||||
|
||||
const flattenedDigestCategories = this.flattenDigests(ctx, digests);
|
||||
console.log(JSON.stringify(flattenedDigestCategories));
|
||||
|
||||
// Get all the notifications together.
|
||||
const allMessages = await renderDigestMessage(
|
||||
ctx,
|
||||
flattenedDigestCategories
|
||||
);
|
||||
|
||||
// Send the email with the digested body.
|
||||
await sendNotification(
|
||||
ctx,
|
||||
user.id,
|
||||
subject,
|
||||
flatten(allMessages),
|
||||
'notification-digest'
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
ctx.log.error({ err }, 'could not handle digests');
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
flattenDigests(ctx, digests) {
|
||||
// Digests are store in the database like:
|
||||
//
|
||||
// [{ notification: { userID, date, context }, category }, ...]
|
||||
//
|
||||
// So lets group our notifications by category, creating the
|
||||
// following:
|
||||
//
|
||||
// {[category]: [{notification: { userID, date, context }}, ...], ...}
|
||||
//
|
||||
const groupedDigests = groupBy(digests, 'category');
|
||||
|
||||
// Lets attach the handler reference onto each of these, so we
|
||||
// transform it again to the following:
|
||||
//
|
||||
// [{ handler, notifications: [{ userID, date, context }]}]
|
||||
//
|
||||
return Object.keys(groupedDigests)
|
||||
.map(category => {
|
||||
// Get the handler.
|
||||
const handler = find(this.registry, ['category', category]);
|
||||
if (!handler) {
|
||||
ctx.log.info({ category }, 'notification category not found');
|
||||
return;
|
||||
}
|
||||
// Get the notifications.
|
||||
const notifications = map(get(groupedDigests, category), digests =>
|
||||
get(digests, 'notification')
|
||||
);
|
||||
|
||||
return { notifications, handler };
|
||||
})
|
||||
.filter(digest => digest)
|
||||
.sort((a, b) => {
|
||||
const aDigestOrder = get(a, 'handler.digestOrder', 0);
|
||||
const bDigestOrder = get(b, 'handler.digestOrder', 0);
|
||||
if (aDigestOrder < bDigestOrder) {
|
||||
return -1;
|
||||
}
|
||||
if (aDigestOrder > bDigestOrder) {
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* handleUserEvent will wrap a notification handler and attach it to the
|
||||
* notification stream system.
|
||||
*
|
||||
* @param {Object} handler a notification handler
|
||||
*/
|
||||
handle(handlers) {
|
||||
handleUserEvent(handlers) {
|
||||
return async (...args) => {
|
||||
// Create a system context to send down.
|
||||
const ctx = this.context.forSystem();
|
||||
@@ -181,95 +262,9 @@ class NotificationManager {
|
||||
}
|
||||
|
||||
// Send the remaining notifications.
|
||||
return Promise.all(
|
||||
notifications.map(
|
||||
({ handler, notification: { userID, date, context } }) =>
|
||||
this.send(ctx, userID, date, handler, context)
|
||||
)
|
||||
);
|
||||
return processNewNotifications(ctx, notifications);
|
||||
};
|
||||
}
|
||||
|
||||
async send(ctx, userID, date, handler, context) {
|
||||
const {
|
||||
connectors: {
|
||||
secrets: { jwt },
|
||||
config: { JWT_ISSUER, JWT_AUDIENCE },
|
||||
services: { Mailer, I18n: { t } },
|
||||
},
|
||||
loaders: { Settings },
|
||||
} = ctx;
|
||||
const { category } = handler;
|
||||
|
||||
try {
|
||||
// Get the settings.
|
||||
const { organizationName = null } = await Settings.load(
|
||||
'organizationName'
|
||||
);
|
||||
if (organizationName === null) {
|
||||
ctx.log.error(
|
||||
'could not send the notification, organization name not in settings'
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// unsubscribeToken is the token used to perform the one-click
|
||||
// unsubscribe.
|
||||
const unsubscribeToken = jwt.sign({
|
||||
jti: uuid(),
|
||||
iss: JWT_ISSUER,
|
||||
aud: JWT_AUDIENCE,
|
||||
sub: UNSUBSCRIBE_SUBJECT,
|
||||
user: userID,
|
||||
});
|
||||
|
||||
// Compose the subject for the email.
|
||||
const subject = t(
|
||||
`talk-plugin-notifications.categories.${category}.subject`,
|
||||
organizationName
|
||||
);
|
||||
|
||||
// Load the content into the comment.
|
||||
const body = await this.getBody(ctx, handler, context);
|
||||
|
||||
// Send the notification to the user.
|
||||
const task = await Mailer.send({
|
||||
template: 'notification',
|
||||
locals: { body, organizationName, unsubscribeToken },
|
||||
subject,
|
||||
user: userID,
|
||||
});
|
||||
|
||||
ctx.log.info(`Sent the notification for Job.ID[${task.id}]`);
|
||||
} catch (err) {
|
||||
ctx.log.error(
|
||||
{ err, message: err.message },
|
||||
'could not send the notification, an error occurred'
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* getBody will return the body for the notification payload.
|
||||
*
|
||||
* @param {Object} ctx the graph context
|
||||
* @param {Object} handler the notification handler
|
||||
* @param {Mixed} context the notification context
|
||||
*/
|
||||
async getBody(ctx, handler, context) {
|
||||
const { connectors: { services: { I18n: { t } } } } = ctx;
|
||||
const { category, hydrate = () => [] } = handler;
|
||||
|
||||
// Get the body replacement variables for the translation key.
|
||||
const replacements = await hydrate(ctx, category, context);
|
||||
|
||||
// Generate the body.
|
||||
return t(
|
||||
`talk-plugin-notifications.categories.${category}.body`,
|
||||
...replacements
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = NotificationManager;
|
||||
|
||||
@@ -2,6 +2,7 @@ const debug = require('debug')('talk-plugin-notifications');
|
||||
const path = require('path');
|
||||
const linkify = require('linkifyjs/html');
|
||||
const NotificationManager = require('./NotificationManager');
|
||||
const { map, reduce } = require('lodash');
|
||||
|
||||
module.exports = connectors => {
|
||||
const {
|
||||
@@ -12,17 +13,15 @@ module.exports = connectors => {
|
||||
// Setup the mailer. Other plugins registered before this one can replace the
|
||||
// notification template by passing the same name + format for the template
|
||||
// registration.
|
||||
Mailer.templates.register(
|
||||
path.join(__dirname, 'emails', 'notification.html.ejs'),
|
||||
'notification',
|
||||
'html'
|
||||
);
|
||||
Mailer.templates.register(
|
||||
path.join(__dirname, 'emails', 'notification.txt.ejs'),
|
||||
'notification',
|
||||
'txt'
|
||||
);
|
||||
|
||||
['notification', 'notification-digest'].forEach(name => {
|
||||
['txt', 'html'].forEach(format => {
|
||||
Mailer.templates.register(
|
||||
path.join(__dirname, 'emails', `${name}.${format}.ejs`),
|
||||
name,
|
||||
format
|
||||
);
|
||||
});
|
||||
});
|
||||
// Register the mail helpers. You can register your own helpers by calling
|
||||
// this function in another plugin.
|
||||
Mailer.registerHelpers({ linkify });
|
||||
@@ -67,6 +66,47 @@ module.exports = connectors => {
|
||||
// Attach all the notification handlers.
|
||||
manager.register(...notificationHandlers);
|
||||
|
||||
// Digest handlers should export the following to the `notificationDigests`
|
||||
// plugin hook:
|
||||
//
|
||||
// {DAILY: { cronTime: '0 0 * * *', timeZone: 'America/New_York' }}
|
||||
//
|
||||
// Where `DAILY` is the key referenced in the typeDefs as a new type of
|
||||
// `DIGEST_FREQUENCY`, and the value of that key is the one provided to the
|
||||
// constructor for the Cron object:
|
||||
//
|
||||
// https://github.com/kelektiv/node-cron
|
||||
//
|
||||
// Which is used to trigger the digest operation for those uses setup with
|
||||
// that type of digesting.
|
||||
const digestHandlers = Plugins.get('server', 'notificationDigests').reduce(
|
||||
(handlers, { plugin, notificationDigests }) => {
|
||||
debug(
|
||||
`registered the ${plugin.name} plugin for digest notifications ${map(
|
||||
notificationDigests,
|
||||
(config, frequency) => frequency
|
||||
)}`
|
||||
);
|
||||
|
||||
return reduce(
|
||||
notificationDigests,
|
||||
(handlers, config, frequency) => {
|
||||
handlers.push({ config, frequency });
|
||||
|
||||
return handlers;
|
||||
},
|
||||
handlers
|
||||
);
|
||||
},
|
||||
[]
|
||||
);
|
||||
|
||||
// Attach all the notification digest handlers.
|
||||
manager.registerDigests(...digestHandlers);
|
||||
|
||||
// Attach the broker to the manager so it can listen for the events.
|
||||
manager.attach(broker);
|
||||
|
||||
// Start processing digests.
|
||||
manager.startDigesting();
|
||||
};
|
||||
|
||||
@@ -0,0 +1,60 @@
|
||||
const { get, flatten } = require('lodash');
|
||||
const { getNotificationBody } = require('./util');
|
||||
|
||||
const QUEUE_DIGEST_NOTIFICATION_QUERY = `
|
||||
query CheckDigest($userID: ID!) {
|
||||
user(id: $userID) {
|
||||
notificationSettings {
|
||||
digestFrequency
|
||||
}
|
||||
}
|
||||
}
|
||||
`;
|
||||
|
||||
// checkDigests will return a boolean indicating if the user has digesting
|
||||
// enabled.
|
||||
const checkDigests = async (ctx, userID) => {
|
||||
const { data, errors } = await ctx.graphql(QUEUE_DIGEST_NOTIFICATION_QUERY, {
|
||||
userID,
|
||||
});
|
||||
if (errors) {
|
||||
ctx.log.error(
|
||||
{ err: errors },
|
||||
'could not check the digest status of the user, skipping notifications'
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
return get(data, 'user.notificationSettings.digestFrequency') !== 'NONE';
|
||||
};
|
||||
|
||||
// renderDigestMessage will render the notification body value for a digest
|
||||
// message. It expects that the digestCategories are parsed into a list grouped
|
||||
// by category with the handler available.
|
||||
const renderDigestMessage = async (ctx, flattenedDigestCategories) => {
|
||||
// Render the messages in this format:
|
||||
//
|
||||
// [{handler, notifications: [{ context }, ...]}, ...]
|
||||
//
|
||||
// To:
|
||||
//
|
||||
// [['body', 'body'], ['body']]
|
||||
//
|
||||
const notifications = await Promise.all(
|
||||
flattenedDigestCategories.map(async ({ handler, notifications }) =>
|
||||
Promise.all(
|
||||
notifications.map(async ({ context }) =>
|
||||
getNotificationBody(ctx, handler, context)
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
// Flatten the array of categories:
|
||||
//
|
||||
// [[..., ...], [..., ...], ...] -> [..., ..., ...]
|
||||
//
|
||||
return flatten(notifications);
|
||||
};
|
||||
|
||||
module.exports = { renderDigestMessage, checkDigests };
|
||||
@@ -0,0 +1,6 @@
|
||||
<% body.forEach((message) => { %>
|
||||
<p><%= linkify(message, {nl2br: true}) %></p>
|
||||
<% }) %>
|
||||
<br/>
|
||||
<p><%= t('talk-plugin-notifications.templates.footer', organizationName) %></p>
|
||||
<p><a href="<%= BASE_URL %>account/unsubscribe-notifications#<%= unsubscribeToken %>" target="_blank"><%= t('talk-plugin-notifications.templates.links.unsubscribe') %></a></p>
|
||||
@@ -0,0 +1,9 @@
|
||||
<% body.forEach((message) => { %>
|
||||
<%= message %>
|
||||
|
||||
<% }) %>
|
||||
<%= t('talk-plugin-notifications.templates.footer', organizationName) %>
|
||||
|
||||
<%= t('talk-plugin-notifications.templates.links.unsubscribe') %>
|
||||
|
||||
<%= BASE_URL %>account/unsubscribe-notifications#<%= unsubscribeToken %>
|
||||
@@ -0,0 +1,238 @@
|
||||
const { map, get, find, groupBy, property } = require('lodash');
|
||||
const uuid = require('uuid/v4');
|
||||
const { UNSUBSCRIBE_SUBJECT } = require('./config');
|
||||
const { getOrganizationName, getNotificationBody } = require('./util');
|
||||
const { checkDigests } = require('./digests');
|
||||
|
||||
// processNewNotifications will handle notifications that are collected after an
|
||||
// event hook. These notifications will be batched by user and optionally
|
||||
// queued for digesting or sent immediately depending on the user's settings.
|
||||
const processNewNotifications = async (ctx, notifications) =>
|
||||
Promise.all(
|
||||
map(
|
||||
// Group all the notifications so we don't have to redo the digest check
|
||||
// multiple times for the same user.
|
||||
groupBy(notifications, 'notification.userID'),
|
||||
async (notifications, userID) => {
|
||||
// Check to see if the user has digesting enabled.
|
||||
const hasDigesting = await checkDigests(ctx, userID);
|
||||
if (hasDigesting) {
|
||||
// User has digesting enabled, queue the notifications to be sent
|
||||
// at a later time.
|
||||
return queueNotifications(ctx, userID, notifications);
|
||||
}
|
||||
|
||||
// User does not have digesting enabled, send the messages the old
|
||||
// way.
|
||||
return sendNotificationsBatch(ctx, notifications);
|
||||
}
|
||||
)
|
||||
);
|
||||
|
||||
// queueNotifications will queue the notifications onto the User.
|
||||
const queueNotifications = async (ctx, userID, notifications) => {
|
||||
// Mutate the notification payloads to what we can store in Mongo safely.
|
||||
const digests = notifications.map(
|
||||
({ notification, handler: { category } }) => ({ notification, category })
|
||||
);
|
||||
|
||||
// Pull out some useful tools.
|
||||
const { connectors: { models: { User } } } = ctx;
|
||||
|
||||
ctx.log.info(
|
||||
{ notifications: notifications.length, userID },
|
||||
'now queueing notifications for digesting'
|
||||
);
|
||||
|
||||
// Push the digests into Mongo.
|
||||
await User.update(
|
||||
{ id: userID },
|
||||
{ $push: { 'metadata.notifications.digests': { $each: digests } } }
|
||||
);
|
||||
};
|
||||
|
||||
// sendNotificationBatch will send a given set of notifications for several
|
||||
// users that do not have digesting enabled.
|
||||
const sendNotificationsBatch = async (ctx, notifications) => {
|
||||
// Get the notification name for the subject.
|
||||
const organizationName = await getOrganizationName(ctx);
|
||||
if (!organizationName) {
|
||||
ctx.log.error(
|
||||
'could not send the notification, organization name not in settings'
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(notifications);
|
||||
|
||||
return Promise.all(
|
||||
map(
|
||||
notifications,
|
||||
async ({ handler, notification: { userID, context } }) => {
|
||||
const { connectors: { services: { I18n: { t } } } } = ctx;
|
||||
const { category } = handler;
|
||||
|
||||
// Compose the subject for the email.
|
||||
const subject = t(
|
||||
`talk-plugin-notifications.categories.${category}.subject`,
|
||||
organizationName
|
||||
);
|
||||
|
||||
// Load the content into the comment.
|
||||
const body = await getNotificationBody(ctx, handler, context);
|
||||
|
||||
// Send the email now.
|
||||
return sendNotification(ctx, userID, subject, body);
|
||||
}
|
||||
)
|
||||
);
|
||||
};
|
||||
|
||||
// sendNotification will send the notification to the specified user with the
|
||||
// given context.
|
||||
const sendNotification = async (
|
||||
ctx,
|
||||
userID,
|
||||
subject,
|
||||
body,
|
||||
template = 'notification'
|
||||
) => {
|
||||
const {
|
||||
connectors: {
|
||||
secrets: { jwt },
|
||||
config: { JWT_ISSUER, JWT_AUDIENCE },
|
||||
services: { Mailer },
|
||||
},
|
||||
} = ctx;
|
||||
|
||||
try {
|
||||
const organizationName = await getOrganizationName(ctx);
|
||||
if (!organizationName) {
|
||||
ctx.log.error(
|
||||
'could not send the notification, organization name not in settings'
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// unsubscribeToken is the token used to perform the one-click
|
||||
// unsubscribe.
|
||||
const unsubscribeToken = jwt.sign({
|
||||
jti: uuid(),
|
||||
iss: JWT_ISSUER,
|
||||
aud: JWT_AUDIENCE,
|
||||
sub: UNSUBSCRIBE_SUBJECT,
|
||||
user: userID,
|
||||
});
|
||||
|
||||
// Send the notification to the user.
|
||||
const task = await Mailer.send({
|
||||
template,
|
||||
locals: { body, organizationName, unsubscribeToken },
|
||||
subject,
|
||||
user: userID,
|
||||
});
|
||||
|
||||
ctx.log.info({ jobID: task.id }, 'sent the notification');
|
||||
} catch (err) {
|
||||
ctx.log.error(
|
||||
{ err, message: err.message },
|
||||
'could not send the notification, an error occurred'
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// filterSuperseded will filter all the possible notifications and only send
|
||||
// those notifications that are not superseded by another type of notification.
|
||||
const filterSuperseded = (
|
||||
{ handler: { category }, notification: { userID: destinationUserID } },
|
||||
index,
|
||||
notifications
|
||||
) =>
|
||||
!notifications.some(
|
||||
({
|
||||
handler: { supersedesCategories = [] },
|
||||
notification: { userID: notificationUserID },
|
||||
}) =>
|
||||
// Only allow notifications to supersede another notification if that
|
||||
// notification is also destined for the same user.
|
||||
notificationUserID === destinationUserID &&
|
||||
// If another notification that is destined for the same user also exists
|
||||
// and declares that it supersedes this one, return true so we can filter
|
||||
// this one from the list.
|
||||
supersedesCategories.some(
|
||||
supersededCategory => supersededCategory === category
|
||||
)
|
||||
);
|
||||
|
||||
const USER_CONFIRMATION_QUERY = `
|
||||
query CheckUserConfirmation($userID: ID!) {
|
||||
user(id: $userID) {
|
||||
profiles {
|
||||
provider
|
||||
... on LocalUserProfile {
|
||||
confirmedAt
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
`;
|
||||
|
||||
// filterVerifiedNotification checks to see if a user has a verified email
|
||||
// address, and if they do, returns the notification payload again, otherwise,
|
||||
// returns undefined.
|
||||
const filterVerifiedNotification = ctx => async notification => {
|
||||
// Grab the user that we're supposed to be sending the notification to.
|
||||
const { notification: { userID } } = notification;
|
||||
|
||||
// Check their confirmed status. This should have already been hit by the
|
||||
// loaders, so we shouldn't make any more database requests.
|
||||
const { errors, data } = await ctx.graphql(USER_CONFIRMATION_QUERY, {
|
||||
userID,
|
||||
});
|
||||
if (errors) {
|
||||
ctx.log.error(
|
||||
{ err: errors },
|
||||
'could not query for user confirmation status'
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Get the first local profile from the user.
|
||||
const profile = find(get(data, 'user.profiles', []), ['provider', 'local']);
|
||||
if (!profile) {
|
||||
ctx.log.warn({ user_id: userID }, 'user did not have a local profile');
|
||||
return;
|
||||
}
|
||||
|
||||
// Pull out the confirmed status from the profile.
|
||||
const confirmed = get(profile, 'confirmedAt', null) !== null;
|
||||
if (!confirmed) {
|
||||
ctx.log.info(
|
||||
{ user_id: userID },
|
||||
'user did not have their local profile confirmed, but had settings enabled, not mailing'
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
return notification;
|
||||
};
|
||||
|
||||
// filterVerified performs filtering in a complicated way because we can't use
|
||||
// Promise.all on a Array.prototype.filter call.
|
||||
const filterVerified = async (ctx, notifications) => {
|
||||
notifications = await Promise.all(
|
||||
notifications.map(filterVerifiedNotification(ctx))
|
||||
);
|
||||
|
||||
// This acts as a poor-mans identity filter to remove all falsy values.
|
||||
return notifications.filter(property('notification'));
|
||||
};
|
||||
|
||||
module.exports = {
|
||||
processNewNotifications,
|
||||
filterSuperseded,
|
||||
filterVerified,
|
||||
getNotificationBody,
|
||||
sendNotification,
|
||||
};
|
||||
@@ -12,6 +12,9 @@ module.exports = {
|
||||
}
|
||||
},
|
||||
},
|
||||
NotificationSettings: {
|
||||
digestFrequency: settings => get(settings, 'digestFrequency', 'NONE'),
|
||||
},
|
||||
RootMutation: {
|
||||
async updateNotificationSettings(obj, { input }, { mutators: { User } }) {
|
||||
await User.updateNotificationSettings(input);
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
en:
|
||||
talk-plugin-notifications:
|
||||
templates:
|
||||
digest:
|
||||
subject: "Your latest comment activity at {0}"
|
||||
footer: "You received this notification because you are a commenter on {0} and you opted in to receive notifications."
|
||||
links:
|
||||
unsubscribe: "Unsubscribe from comment notifications"
|
||||
|
||||
@@ -1,5 +1,13 @@
|
||||
enum DIGEST_FREQUENCY {
|
||||
# NONE will have the notifications send immediatly rather than bundling
|
||||
# for digesting.
|
||||
NONE
|
||||
}
|
||||
|
||||
# NotificationSettings stores all the preferences related to notifications.
|
||||
type NotificationSettings { }
|
||||
type NotificationSettings {
|
||||
digestFrequency: DIGEST_FREQUENCY
|
||||
}
|
||||
|
||||
type User {
|
||||
notificationSettings: NotificationSettings
|
||||
@@ -18,7 +26,8 @@ type UpdateNotificationSettingsResponse implements Response {
|
||||
}
|
||||
|
||||
input NotificationSettingsInput {
|
||||
|
||||
# digestFrequency is the frequency to send notifications.
|
||||
digestFrequency: DIGEST_FREQUENCY
|
||||
}
|
||||
|
||||
type RootMutation {
|
||||
|
||||
@@ -0,0 +1,32 @@
|
||||
const getOrganizationName = async ctx => {
|
||||
// Grab some useful tools.
|
||||
const { loaders: { Settings } } = ctx;
|
||||
|
||||
// Get the settings.
|
||||
const { organizationName = null } = await Settings.load('organizationName');
|
||||
|
||||
return organizationName;
|
||||
};
|
||||
|
||||
/**
|
||||
* getNotificationBody will return the body for the notification payload.
|
||||
*
|
||||
* @param {Object} ctx the graph context
|
||||
* @param {Object} handler the notification handler
|
||||
* @param {Mixed} context the notification context
|
||||
*/
|
||||
const getNotificationBody = async (ctx, handler, context) => {
|
||||
const { connectors: { services: { I18n: { t } } } } = ctx;
|
||||
const { category, hydrate = () => [] } = handler;
|
||||
|
||||
// Get the body replacement variables for the translation key.
|
||||
const replacements = await hydrate(ctx, category, context);
|
||||
|
||||
// Generate the body.
|
||||
return t(
|
||||
`talk-plugin-notifications.categories.${category}.body`,
|
||||
...replacements
|
||||
);
|
||||
};
|
||||
|
||||
module.exports = { getNotificationBody, getOrganizationName };
|
||||
+1
-1
@@ -11,7 +11,7 @@ function createLogger(name, id = uuid()) {
|
||||
id,
|
||||
version,
|
||||
level: LOGGING_LEVEL,
|
||||
serializers: { req: Logger.stdSerializers.req },
|
||||
serializers: Logger.stdSerializers,
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -2319,6 +2319,12 @@ create-react-class@^15.5.1, create-react-class@^15.5.2, create-react-class@^15.6
|
||||
loose-envify "^1.3.1"
|
||||
object-assign "^4.1.1"
|
||||
|
||||
cron@^1.3.0:
|
||||
version "1.3.0"
|
||||
resolved "https://registry.yarnpkg.com/cron/-/cron-1.3.0.tgz#7e459968eaf94e1a445be796ce402166c234659d"
|
||||
dependencies:
|
||||
moment-timezone "^0.5.x"
|
||||
|
||||
cross-spawn-promise@^0.10.1:
|
||||
version "0.10.1"
|
||||
resolved "https://registry.yarnpkg.com/cross-spawn-promise/-/cross-spawn-promise-0.10.1.tgz#db9cb4c50c60b72a15be049b78122ce382d87b10"
|
||||
@@ -6756,10 +6762,20 @@ mocha@^3.1.2:
|
||||
mkdirp "0.5.1"
|
||||
supports-color "3.1.2"
|
||||
|
||||
moment-timezone@^0.5.x:
|
||||
version "0.5.14"
|
||||
resolved "https://registry.yarnpkg.com/moment-timezone/-/moment-timezone-0.5.14.tgz#4eb38ff9538b80108ba467a458f3ed4268ccfcb1"
|
||||
dependencies:
|
||||
moment ">= 2.9.0"
|
||||
|
||||
moment@2.x.x:
|
||||
version "2.19.4"
|
||||
resolved "https://registry.yarnpkg.com/moment/-/moment-2.19.4.tgz#17e5e2c6ead8819c8ecfad83a0acccb312e94682"
|
||||
|
||||
"moment@>= 2.9.0":
|
||||
version "2.21.0"
|
||||
resolved "https://registry.yarnpkg.com/moment/-/moment-2.21.0.tgz#2a114b51d2a6ec9e6d83cf803f838a878d8a023a"
|
||||
|
||||
moment@^2.10.3:
|
||||
version "2.19.1"
|
||||
resolved "https://registry.yarnpkg.com/moment/-/moment-2.19.1.tgz#56da1a2d1cbf01d38b7e1afc31c10bcfa1929167"
|
||||
|
||||
Reference in New Issue
Block a user