mirror of
https://github.com/wassname/talk.git
synced 2026-06-29 02:00:29 +08:00
273 lines
7.9 KiB
JavaScript
273 lines
7.9 KiB
JavaScript
const {
|
|
merge,
|
|
map,
|
|
get,
|
|
find,
|
|
groupBy,
|
|
forEach,
|
|
flatten,
|
|
property,
|
|
} = require('lodash');
|
|
const debug = require('debug')('talk-plugin-notifications');
|
|
const { DISABLE_REQUIRE_EMAIL_VERIFICATIONS } = require('./config');
|
|
const { CronJob } = require('cron');
|
|
const { getOrganizationName } = require('./util');
|
|
const {
|
|
processNewNotifications,
|
|
filterSuperseded,
|
|
filterVerified,
|
|
sendNotification,
|
|
} = require('./messages');
|
|
const { renderDigestMessage } = require('./digests');
|
|
|
|
// handleHandlers will call the handle method on each handler to determine if a
|
|
// notification should be sent for it.
|
|
const handleHandlers = (ctx, handlers, ...args) =>
|
|
Promise.all(
|
|
handlers.map(async handler => {
|
|
// Grab the handler reference.
|
|
const { handle, category, event } = handler;
|
|
|
|
try {
|
|
// Attempt to create a notification out of it.
|
|
const notification = await handle(ctx, ...args);
|
|
if (!notification) {
|
|
ctx.log.info(
|
|
{ category, event },
|
|
'no notification deemed by event handler'
|
|
);
|
|
return;
|
|
}
|
|
|
|
// Send the notification back.
|
|
ctx.log.info({ category, event }, 'notification detected for event');
|
|
return { handler, notification };
|
|
} catch (err) {
|
|
ctx.log.error({ err }, 'could not handle the event');
|
|
return;
|
|
}
|
|
})
|
|
);
|
|
|
|
class NotificationManager {
|
|
constructor(context) {
|
|
this.context = context;
|
|
this.registry = [];
|
|
this.digests = [];
|
|
}
|
|
|
|
/**
|
|
* register will include the notification handlers on the manager.
|
|
*
|
|
* @param {Array<Object>} handlers notification handlers to register
|
|
*/
|
|
register(...handlers) {
|
|
this.registry.push(...handlers);
|
|
}
|
|
|
|
/**
|
|
* attach will setup the notifications by walking the registry and loading all
|
|
* the notification types onto the handler.
|
|
*
|
|
* @param {Object} broker the event emitter for the Talk events
|
|
*/
|
|
attach(broker) {
|
|
const events = groupBy(this.registry, 'event');
|
|
|
|
forEach(events, (handlers, event) => {
|
|
debug(
|
|
`will now notify the [${handlers
|
|
.map(({ category }) => category)
|
|
.join(', ')}] handlers when the '${event}' event is emitted`
|
|
);
|
|
broker.on(event, this.handleUserEvent(handlers));
|
|
});
|
|
}
|
|
|
|
/**
|
|
* registerDigests will register the digest handlers.
|
|
*
|
|
* @param {Array<Object>} handlers digest handlers for options related to digesting
|
|
*/
|
|
registerDigests(...handlers) {
|
|
this.digests.push(...handlers);
|
|
}
|
|
|
|
/**
|
|
* startDigesting will register all the digests to run and setup the cron
|
|
* jobs.
|
|
*/
|
|
startDigesting() {
|
|
this.digests.forEach(({ frequency, config }) => {
|
|
new CronJob(
|
|
merge(config, {
|
|
start: true,
|
|
onTick: this.handleDigestEvent(frequency),
|
|
})
|
|
);
|
|
});
|
|
}
|
|
|
|
handleDigestEvent(frequency) {
|
|
return async () => {
|
|
// Create a system context to send down.
|
|
const ctx = this.context.forSystem();
|
|
|
|
try {
|
|
// Pull out some useful tools.
|
|
const {
|
|
connectors: { models: { User }, services: { I18n: { t } } },
|
|
} = ctx;
|
|
|
|
const organizationName = await getOrganizationName(ctx);
|
|
if (!organizationName) {
|
|
ctx.log.error(
|
|
'could not send the notification, organization name not in settings'
|
|
);
|
|
return;
|
|
}
|
|
|
|
const subject = t(
|
|
'talk-plugin-notifications.templates.digest.subject',
|
|
organizationName
|
|
);
|
|
|
|
// Continue to pull from the Users digest until the queue is empty.
|
|
while (true) {
|
|
// Pull notifications from a user that have notifications enabled for
|
|
// `frequency` and currently have notifications.
|
|
const user = await User.findOneAndUpdate(
|
|
{
|
|
'metadata.notifications.settings.digestFrequency': frequency,
|
|
'metadata.notifications.digests': { $exists: true, $ne: [] },
|
|
},
|
|
{ $set: { 'metadata.notifications.digests': [] } }
|
|
);
|
|
if (!user) {
|
|
// There are no more users that meet the search criteria! We're
|
|
// done!
|
|
ctx.log.info('no notifications from database');
|
|
break;
|
|
}
|
|
|
|
// Begin rendering the user's digest.
|
|
const digests = get(user, 'metadata.notifications.digests');
|
|
if (!digests) {
|
|
// We couldn't get the digest from the user (even after Mongo said
|
|
// we would get it?).
|
|
ctx.log.info(
|
|
{ userID: user.id },
|
|
'no notifications from user in database'
|
|
);
|
|
continue;
|
|
}
|
|
|
|
ctx.log.info(
|
|
{ userID: user.id, notifications: digests.length },
|
|
'generating notification digest email'
|
|
);
|
|
|
|
const flattenedDigestCategories = this.flattenDigests(ctx, digests);
|
|
|
|
// Get all the notifications together.
|
|
const allMessages = await renderDigestMessage(
|
|
ctx,
|
|
flattenedDigestCategories
|
|
);
|
|
|
|
// Send the email with the digested body.
|
|
await sendNotification(
|
|
ctx,
|
|
user.id,
|
|
subject,
|
|
flatten(allMessages),
|
|
'notification-digest'
|
|
);
|
|
}
|
|
} catch (err) {
|
|
ctx.log.error({ err }, 'could not handle digests');
|
|
}
|
|
};
|
|
}
|
|
|
|
flattenDigests(ctx, digests) {
|
|
// Digests are store in the database like:
|
|
//
|
|
// [{ notification: { userID, date, context }, category }, ...]
|
|
//
|
|
// So lets group our notifications by category, creating the
|
|
// following:
|
|
//
|
|
// {[category]: [{notification: { userID, date, context }}, ...], ...}
|
|
//
|
|
const groupedDigests = groupBy(digests, 'category');
|
|
|
|
// Lets attach the handler reference onto each of these, so we
|
|
// transform it again to the following:
|
|
//
|
|
// [{ handler, notifications: [{ userID, date, context }]}]
|
|
//
|
|
return Object.keys(groupedDigests)
|
|
.map(category => {
|
|
// Get the handler.
|
|
const handler = find(this.registry, ['category', category]);
|
|
if (!handler) {
|
|
ctx.log.info({ category }, 'notification category not found');
|
|
return;
|
|
}
|
|
// Get the notifications.
|
|
const notifications = map(get(groupedDigests, category), digests =>
|
|
get(digests, 'notification')
|
|
);
|
|
|
|
return { notifications, handler };
|
|
})
|
|
.filter(digest => digest)
|
|
.sort((a, b) => {
|
|
const aDigestOrder = get(a, 'handler.digestOrder', 0);
|
|
const bDigestOrder = get(b, 'handler.digestOrder', 0);
|
|
if (aDigestOrder < bDigestOrder) {
|
|
return -1;
|
|
}
|
|
if (aDigestOrder > bDigestOrder) {
|
|
return 1;
|
|
}
|
|
return 0;
|
|
});
|
|
}
|
|
|
|
/**
|
|
* handleUserEvent will wrap a notification handler and attach it to the
|
|
* notification stream system.
|
|
*
|
|
* @param {Object} handler a notification handler
|
|
*/
|
|
handleUserEvent(handlers) {
|
|
return async (...args) => {
|
|
// Create a system context to send down.
|
|
const ctx = this.context.forSystem();
|
|
|
|
// Get all the notifications to load.
|
|
let notifications = await handleHandlers(ctx, handlers, ...args);
|
|
|
|
// Only let handlers past that have a notification to send.
|
|
notifications = notifications.filter(property('notification'));
|
|
|
|
// Check to see if some of the other notifications that are queued
|
|
// had this notification superseded.
|
|
notifications = notifications.filter(filterSuperseded);
|
|
|
|
// Only let notifications through for users who have their email addresses
|
|
// verified if we are configured to do so.
|
|
if (!DISABLE_REQUIRE_EMAIL_VERIFICATIONS) {
|
|
notifications = await filterVerified(ctx, notifications);
|
|
}
|
|
|
|
// Send the remaining notifications.
|
|
return processNewNotifications(ctx, notifications);
|
|
};
|
|
}
|
|
}
|
|
|
|
module.exports = NotificationManager;
|