Files
talk/src/core/server/queue/index.ts
T
Wyatt Johnson efea0e8e1c [CORL-498, CORL-495, CORL-539, CORL-496, CORL-494] Email Notifications Support & Framework (#2498)
* chore: renamed old templates

* feat: initial notifications support

* feat: email enhancements

* fix: linting

* feat: initial digesting beheviour

* feat: added notification configuration

* feat: added unsubscribe routes

* fix: fixed failing snapshots/tests bc random ids

* feat: adjusted the save beheviour, added tests

* feat: added tests

* feat: added staff replies

* feat: renamed E-Mail to Email

* feat: enhanced cron processing

* fix: linting + updating tests

* feat: enhanced cron context

* fix: added staff replies back in
2019-09-05 07:02:26 +00:00

78 lines
2.3 KiB
TypeScript

import Queue from "bull";
import { Db } from "mongodb";
import { Config } from "coral-server/config";
import { I18n } from "coral-server/services/i18n";
import { JWTSigningConfig } from "coral-server/services/jwt";
import { createRedisClient } from "coral-server/services/redis";
import TenantCache from "coral-server/services/tenant/cache";
import { createMailerTask, MailerQueue } from "./tasks/mailer";
import { createNotifierTask, NotifierQueue } from "./tasks/notifier";
import { createScraperTask, ScraperQueue } from "./tasks/scraper";
const createQueueOptions = async (
config: Config
): Promise<Queue.QueueOptions> => {
const client = createRedisClient(config);
const subscriber = createRedisClient(config);
// Return the options that can be used by the Queue.
return {
// Here, we are reusing the clients based on the requested types. This way,
// any time we need a specific client, we get to use one of the ones that
// already have been created.
createClient: type => {
switch (type) {
case "subscriber":
return subscriber;
case "client":
return client;
case "bclient":
return createRedisClient(config);
}
},
// Because bull uses atomic operations across separate keys, we need to add
// a prefix to the keys to help the Redis cluster place all those elements
// together to support the atomic operations. See:
// https://redis.io/topics/cluster-tutorial
prefix: "{queue}",
};
};
export interface QueueOptions {
mongo: Db;
config: Config;
tenantCache: TenantCache;
i18n: I18n;
signingConfig: JWTSigningConfig;
}
export interface TaskQueue {
mailer: MailerQueue;
scraper: ScraperQueue;
notifier: NotifierQueue;
}
export async function createQueue(options: QueueOptions): Promise<TaskQueue> {
// Create the processor queue options. This holds references to the Redis
// clients that are shared per queue.
const queueOptions = await createQueueOptions(options.config);
// Attach process functions to the various tasks in the queue.
const mailer = createMailerTask(queueOptions, options);
const scraper = createScraperTask(queueOptions, options);
const notifier = createNotifierTask(queueOptions, {
mailerQueue: mailer,
...options,
});
// Return the tasks + client.
return {
mailer,
scraper,
notifier,
};
}