[CORL-1002] Queue Improvements (#2931)

* fix: improved count handler for stories not found

* feat: removed performance-now

* feat: cleaned up processors, exposed counts

* feat: increased verb of schd jobs

* fix: removed dead code
This commit is contained in:
Wyatt Johnson
2020-04-10 16:15:09 +00:00
committed by GitHub
parent 98e6a3ccc7
commit 4194b08a12
33 changed files with 353 additions and 332 deletions
-1
View File
@@ -128,7 +128,6 @@
"passport-local": "^1.0.0",
"passport-oauth2": "^1.4.0",
"passport-strategy": "^1.0.0",
"performance-now": "^2.1.0",
"permit": "^0.2.4",
"prom-client": "^11.3.0",
"proxy-agent": "^3.1.1",
+4 -2
View File
@@ -9,16 +9,18 @@ export type GraphMiddlewareOptions = Pick<
| "config"
| "i18n"
| "mailerQueue"
| "scraperQueue"
| "rejectorQueue"
| "notifierQueue"
| "webhookQueue"
| "mongo"
| "redis"
| "schema"
| "scraperQueue"
| "signingConfig"
| "pubsub"
| "tenantCache"
| "metrics"
| "broker"
| "rejectorQueue"
>;
export const graphQLHandler = ({
@@ -29,13 +29,10 @@ export const countHandler = ({ mongo, i18n }: CountOptions): RequestHandler => {
id: req.query.id,
url: req.query.url,
});
if (!story) {
throw new Error("Story not found");
}
const count = calculateTotalPublishedCommentCount(
story.commentCounts.status
);
const count = story
? calculateTotalPublishedCommentCount(story.commentCounts.status)
: 0;
let html = "";
if (req.query.notext === "true") {
+6 -2
View File
@@ -18,8 +18,10 @@ import { Config } from "coral-server/config";
import CoralEventListenerBroker from "coral-server/events/publisher";
import logger from "coral-server/logger";
import { MailerQueue } from "coral-server/queue/tasks/mailer";
import { NotifierQueue } from "coral-server/queue/tasks/notifier";
import { RejectorQueue } from "coral-server/queue/tasks/rejector";
import { ScraperQueue } from "coral-server/queue/tasks/scraper";
import { WebhookQueue } from "coral-server/queue/tasks/webhook";
import { I18n } from "coral-server/services/i18n";
import { JWTSigningConfig } from "coral-server/services/jwt";
import { Metrics } from "coral-server/services/metrics";
@@ -40,6 +42,10 @@ export interface AppOptions {
disableClientRoutes: boolean;
i18n: I18n;
mailerQueue: MailerQueue;
scraperQueue: ScraperQueue;
rejectorQueue: RejectorQueue;
webhookQueue: WebhookQueue;
notifierQueue: NotifierQueue;
metrics?: Metrics;
mongo: Db;
parent: Express;
@@ -48,12 +54,10 @@ export interface AppOptions {
pubsub: RedisPubSub;
redis: AugmentedRedis;
schema: GraphQLSchema;
scraperQueue: ScraperQueue;
signingConfig: JWTSigningConfig;
tenantCache: TenantCache;
migrationManager: MigrationManager;
broker: CoralEventListenerBroker;
rejectorQueue: RejectorQueue;
}
/**
+4 -5
View File
@@ -1,12 +1,12 @@
import { Request, Response } from "express";
import onFinished from "on-finished";
import now from "performance-now";
import { createTimer } from "coral-server/helpers";
import logger from "coral-server/logger";
import {
ErrorRequestHandler,
RequestHandler,
} from "coral-server/types/express";
import { Request, Response } from "express";
const extractMetadata = (req: Request, res: Response) => ({
url: req.originalUrl || req.url,
@@ -18,11 +18,10 @@ const extractMetadata = (req: Request, res: Response) => ({
});
export const accessLogger: RequestHandler = (req, res, next) => {
const startTime = now();
const timer = createTimer();
onFinished(res, () => {
// Compute the end time.
const responseTime = Math.round(now() - startTime);
const responseTime = timer();
// Grab the logger.
const log = req.coral ? req.coral.logger : logger;
+3 -4
View File
@@ -1,6 +1,6 @@
import onFinished from "on-finished";
import now from "performance-now";
import { createTimer } from "coral-server/helpers";
import { Metrics } from "coral-server/services/metrics";
import { RequestHandler } from "coral-server/types/express";
@@ -11,11 +11,10 @@ export const metricsRecorder = ({
httpRequestDurationMilliseconds,
}: Metrics): RequestHandler => {
return (req, res, next) => {
const startTime = now();
const timer = createTimer();
onFinished(res, () => {
// Compute the end time.
const responseTime = Math.round(now() - startTime);
const responseTime = timer();
// Increment the request counter.
httpRequestsTotal.labels(`${res.statusCode}`, req.method).inc();
+11 -25
View File
@@ -94,49 +94,43 @@ const config = convict({
format: String,
default: "",
env: "TRUST_PROXY",
arg: "trustProxy",
},
enable_graphiql: {
doc: "When true, this will enable the GraphiQL interface at /graphiql",
format: Boolean,
default: false,
env: "ENABLE_GRAPHIQL",
arg: "enableGraphiQL",
},
concurrency: {
doc: "The number of worker nodes to spawn to handle traffic",
format: Number,
default: os.cpus().length,
env: "CONCURRENCY",
arg: "concurrency",
},
port: {
doc: "The port to bind.",
format: "port",
default: 3000,
env: "PORT",
arg: "port",
},
cluster_metrics_port: {
doc: "The port to bind for cluster metrics.",
format: "port",
default: 3001,
env: "CLUSTER_METRICS_PORT",
arg: "clusterMetricsPort",
},
metrics_username: {
doc: "The username to use to authenticate to the metrics endpoint.",
format: "String",
default: "",
env: "METRICS_USERNAME",
arg: "metricsUsername",
},
metrics_password: {
doc: "The password to use to authenticate to the metrics endpoint.",
format: "String",
default: "",
env: "METRICS_PASSWORD",
arg: "metricsPassword",
sensitive: true,
},
dev_port: {
@@ -144,14 +138,13 @@ const config = convict({
format: "port",
default: 8080,
env: "DEV_PORT",
arg: "dev-port",
},
mongodb: {
doc: "The MongoDB database to connect to.",
format: "mongo-uri",
default: "mongodb://127.0.0.1:27017/coral",
env: "MONGODB_URI",
arg: "mongodb",
sensitive: true,
},
redis: {
@@ -159,7 +152,7 @@ const config = convict({
format: "redis-uri",
default: "redis://127.0.0.1:6379",
env: "REDIS_URI",
arg: "redis",
sensitive: true,
},
redis_options: {
@@ -167,7 +160,6 @@ const config = convict({
format: Object,
default: {},
env: "REDIS_OPTIONS",
arg: "redisOptions",
},
signing_secret: {
doc:
@@ -175,7 +167,7 @@ const config = convict({
format: "*",
default: "keyboard cat", // TODO: (wyattjoh) evaluate best solution
env: "SIGNING_SECRET",
arg: "signingSecret",
sensitive: true,
},
signing_algorithm: {
@@ -183,14 +175,13 @@ const config = convict({
format: algorithms,
default: "HS256",
env: "SIGNING_ALGORITHM",
arg: "signingAlgorithm",
},
management_signing_secret: {
doc: "The secret used to verify management API requests.",
format: "*",
default: null,
env: "MANAGEMENT_SIGNING_SECRET",
arg: "managementSigningSecret",
sensitive: true,
},
management_signing_algorithm: {
@@ -198,21 +189,18 @@ const config = convict({
format: algorithms,
default: "HS256",
env: "MANAGEMENT_SIGNING_ALGORITHM",
arg: "managementSigningAlgorithm",
},
logging_level: {
doc: "The logging level to print to the console",
format: ["fatal", "error", "warn", "info", "debug", "trace"],
default: "info",
env: "LOGGING_LEVEL",
arg: "logging",
},
static_uri: {
doc: "The URL that static assets will be hosted from",
format: "optional-url",
default: "",
env: "STATIC_URI",
arg: "staticURI",
},
websocket_keep_alive_timeout: {
doc:
@@ -220,7 +208,6 @@ const config = convict({
format: "ms",
default: "30 seconds",
env: "WEBSOCKET_KEEP_ALIVE_TIMEOUT",
arg: "websocketKeepAliveTimeout",
},
disable_tenant_caching: {
doc:
@@ -228,7 +215,6 @@ const config = convict({
format: Boolean,
default: false,
env: "DISABLE_TENANT_CACHING",
arg: "disableTenantCaching",
},
disable_live_updates: {
doc:
@@ -236,7 +222,6 @@ const config = convict({
format: Boolean,
default: false,
env: "DISABLE_LIVE_UPDATES",
arg: "disableLiveUpdates",
},
disable_client_routes: {
doc:
@@ -244,7 +229,6 @@ const config = convict({
format: Boolean,
default: false,
env: "DISABLE_CLIENT_ROUTES",
arg: "disableClientRoutes",
},
disable_rate_limiters: {
doc:
@@ -252,14 +236,12 @@ const config = convict({
format: Boolean,
default: false,
env: "DISABLE_RATE_LIMITERS",
arg: "disableRateLimiters",
},
scrape_timeout: {
doc: "The request timeout (in ms) for scraping operations.",
format: "ms",
default: "10 seconds",
env: "SCRAPE_TIMEOUT",
arg: "scrapeTimeout",
},
perspective_timeout: {
doc:
@@ -267,7 +249,6 @@ const config = convict({
format: "ms",
default: "800 milliseconds",
env: "PERSPECTIVE_TIMEOUT",
arg: "perspectiveTimeout",
},
force_ssl: {
doc:
@@ -275,7 +256,12 @@ const config = convict({
format: Boolean,
default: false,
env: "FORCE_SSL",
arg: "forceSSL",
},
disable_job_processors: {
doc: "Disables job processors when running.",
format: Boolean,
default: false,
env: "DISABLE_JOB_PROCESSORS",
},
});
+5 -7
View File
@@ -1,7 +1,7 @@
import { CronCommand, CronJob } from "cron";
import now from "performance-now";
import uuid from "uuid";
import { createTimer } from "coral-server/helpers";
import logger, { Logger } from "coral-server/logger";
export type ScheduledJobCommand<T extends {}> = (
@@ -40,18 +40,16 @@ export class ScheduledJob<T extends {} = {}> {
private command(command: ScheduledJobCommand<T>): CronCommand {
return async () => {
const log = this.log.child({ scheduledExecutionID: uuid.v1() }, true);
log.debug("now starting scheduled job");
const start = now();
log.info("now starting scheduled job");
const timer = createTimer();
try {
await command({
...this.context,
log,
});
const processingTime = Math.floor(now() - start);
log.debug({ processingTime }, "now finished scheduled job");
log.info({ took: timer() }, "now finished scheduled job");
} catch (err) {
const processingTime = Math.floor(now() - start);
log.error({ err, processingTime }, "failed to run scheduled job");
log.error({ err, took: timer() }, "failed to run scheduled job");
}
};
}
+12 -4
View File
@@ -12,8 +12,10 @@ import { PersistedQuery } from "coral-server/models/queries";
import { Tenant } from "coral-server/models/tenant";
import { User } from "coral-server/models/user";
import { MailerQueue } from "coral-server/queue/tasks/mailer";
import { NotifierQueue } from "coral-server/queue/tasks/notifier";
import { RejectorQueue } from "coral-server/queue/tasks/rejector";
import { ScraperQueue } from "coral-server/queue/tasks/scraper";
import { WebhookQueue } from "coral-server/queue/tasks/webhook";
import { I18n } from "coral-server/services/i18n";
import { JWTSigningConfig } from "coral-server/services/jwt";
import { AugmentedRedis } from "coral-server/services/redis";
@@ -38,11 +40,13 @@ export interface GraphContextOptions {
config: Config;
i18n: I18n;
mailerQueue: MailerQueue;
rejectorQueue: RejectorQueue;
scraperQueue: ScraperQueue;
webhookQueue: WebhookQueue;
notifierQueue: NotifierQueue;
mongo: Db;
pubsub: RedisPubSub;
redis: AugmentedRedis;
rejectorQueue: RejectorQueue;
scraperQueue: ScraperQueue;
tenant: Tenant;
tenantCache: TenantCache;
broker: CoralEventListenerBroker;
@@ -58,13 +62,15 @@ export default class GraphContext {
public readonly loaders: ReturnType<typeof loaders>;
public readonly logger: Logger;
public readonly mailerQueue: MailerQueue;
public readonly rejectorQueue: RejectorQueue;
public readonly scraperQueue: ScraperQueue;
public readonly webhookQueue: WebhookQueue;
public readonly notifierQueue: NotifierQueue;
public readonly mongo: Db;
public readonly mutators: ReturnType<typeof mutators>;
public readonly now: Date;
public readonly pubsub: RedisPubSub;
public readonly redis: AugmentedRedis;
public readonly rejectorQueue: RejectorQueue;
public readonly scraperQueue: ScraperQueue;
public readonly tenant: Tenant;
public readonly tenantCache: TenantCache;
@@ -98,6 +104,8 @@ export default class GraphContext {
this.scraperQueue = options.scraperQueue;
this.mailerQueue = options.mailerQueue;
this.rejectorQueue = options.rejectorQueue;
this.notifierQueue = options.notifierQueue;
this.webhookQueue = options.webhookQueue;
this.signingConfig = options.signingConfig;
this.clientID = options.clientID;
@@ -4,9 +4,9 @@ import {
GraphQLExtension,
GraphQLResponse,
} from "graphql-extensions";
import now from "performance-now";
import GraphContext from "coral-server/graph/context";
import { createTimer } from "coral-server/helpers";
import logger from "coral-server/logger";
import { getOperationMetadata, getPersistedQueryMetadata } from "./helpers";
@@ -84,17 +84,14 @@ export class LoggerExtension implements GraphQLExtension<GraphContext> {
if (o.executionArgs.contextValue) {
// Grab the start time so we can calculate the time it takes to execute
// the graph query.
const startTime = now();
const timer = createTimer();
return () => {
// Compute the end time.
const responseTime = Math.round(now() - startTime);
// Log out the details of the request.
logQuery(
o.executionArgs.contextValue,
o.executionArgs.document,
undefined,
responseTime
timer()
);
};
}
@@ -1,8 +1,9 @@
import GraphContext from "coral-server/graph/context";
import { Metrics } from "coral-server/services/metrics";
import { ExecutionArgs } from "graphql";
import { EndHandler, GraphQLExtension } from "graphql-extensions";
import now from "performance-now";
import GraphContext from "coral-server/graph/context";
import { createTimer } from "coral-server/helpers";
import { Metrics } from "coral-server/services/metrics";
import { getOperationMetadata } from "./helpers";
@@ -16,10 +17,10 @@ export class MetricsExtension implements GraphQLExtension<GraphContext> {
if (o.executionArgs.contextValue) {
// Grab the start time so we can calculate the time it takes to execute
// the graph query.
const startTime = now();
const timer = createTimer();
return () => {
// Compute the end time.
const responseTime = Math.round(now() - startTime);
const responseTime = timer();
// Get the request metadata.
const { operation, operationName } = getOperationMetadata(
+1
View File
@@ -28,4 +28,5 @@ export const Query: Required<GQLQueryTypeResolver<void>> = {
sites: (source, args, ctx) => ctx.loaders.Sites.connection(args),
site: (source, { id }, ctx) => (id ? ctx.loaders.Sites.site.load(id) : null),
webhookEndpoint: (source, { id }, ctx) => getWebhookEndpoint(ctx.tenant, id),
queues: () => ({}),
};
+12
View File
@@ -0,0 +1,12 @@
import {
GQLQueueCounts,
GQLQueueTypeResolver,
} from "../schema/__generated__/types";
export interface QueueInput {
counts(): Promise<GQLQueueCounts>;
}
export const Queue: Required<GQLQueueTypeResolver<QueueInput>> = {
counts: t => t.counts(),
};
+23
View File
@@ -0,0 +1,23 @@
import { GQLQueuesTypeResolver } from "../schema/__generated__/types";
import GraphContext from "../context";
import { QueueInput } from "./Queue";
/**
* get produces a resolver that maps the context to a QueueInput.
*
* @param fn the function to map the ctx to the queue.
*/
const get = (fn: (ctx: GraphContext) => QueueInput) => (
parent: any,
args: any,
ctx: GraphContext
) => fn(ctx);
export const Queues: Required<GQLQueuesTypeResolver> = {
mailer: get(ctx => ctx.mailerQueue),
scraper: get(ctx => ctx.scraperQueue),
notifier: get(ctx => ctx.notifierQueue),
webhook: get(ctx => ctx.webhookQueue),
rejector: get(ctx => ctx.rejectorQueue),
};
+4
View File
@@ -35,6 +35,8 @@ import { PremodStatus } from "./PremodStatus";
import { PremodStatusHistory } from "./PremodStatusHistory";
import { Profile } from "./Profile";
import { Query } from "./Query";
import { Queue } from "./Queue";
import { Queues } from "./Queues";
import { RecentCommentHistory } from "./RecentCommentHistory";
import { RejectCommentPayload } from "./RejectCommentPayload";
import { Secret } from "./Secret";
@@ -99,6 +101,8 @@ const Resolvers: GQLResolver = {
Tag,
Time,
User,
Queue,
Queues,
UsernameHistory,
UsernameStatus,
UserStatus,
@@ -2997,6 +2997,62 @@ type StoriesConnection {
pageInfo: PageInfo!
}
################################################################################
## Queue
################################################################################
type QueueCounts {
"""
waiting is the number of jobs that are in line to be processed.
"""
waiting: Int!
"""
active is the number of jobs that are being activly processed.
"""
active: Int!
"""
delayed is the number of jobs that have been delayed due to a failure and are
waiting for a backoff.
"""
delayed: Int!
}
type Queue {
"""
counts is the current counts associated with the Queue.
"""
counts: QueueCounts!
}
type Queues {
"""
mailer is the Queue associated with the Mailer queue.
"""
mailer: Queue!
"""
scraper is the Queue associated with the Scraper queue.
"""
scraper: Queue!
"""
notifier is the Queue associated with the Notifier queue.
"""
notifier: Queue!
"""
webhook is the Queue associated with the Webhook queue.
"""
webhook: Queue!
"""
rejector is the Queue associated with the Rejector queue.
"""
rejector: Queue!
}
################################################################################
## Query
################################################################################
@@ -3119,6 +3175,11 @@ type Query {
webhookEndpint will return a specific WebhookEndpoint if it exists.
"""
webhookEndpoint(id: ID!): WebhookEndpoint @auth(roles: [ADMIN])
"""
queues returns information on queues used in Coral to manage
"""
queues: Queues! @auth(roles: [ADMIN])
}
################################################################################
+14
View File
@@ -0,0 +1,14 @@
/**
* createTimer will create a new timer that can be called again to get the
* milliseconds since the timer was created.
*
*/
const createTimer = () => {
const start = Date.now();
return () => {
const finish = Date.now();
return finish - start;
};
};
export default createTimer;
+1
View File
@@ -0,0 +1 @@
export { default as createTimer } from "./createTimer";
+14 -6
View File
@@ -201,7 +201,7 @@ class Server {
await this.tenantCache.primeAll();
// Create the Job Queue.
this.tasks = await createQueue({
this.tasks = createQueue({
config: this.config,
mongo: this.mongo,
redis: this.redis,
@@ -253,11 +253,17 @@ class Server {
await this.persistedQueryCache.prime();
// Launch all of the job processors.
this.tasks.mailer.process();
this.tasks.scraper.process();
this.tasks.notifier.process();
this.tasks.webhook.process();
this.tasks.rejector.process();
if (!this.config.get("disable_job_processors")) {
logger.info("job processing is enabled, starting job processors");
this.tasks.mailer.process();
this.tasks.scraper.process();
this.tasks.notifier.process();
this.tasks.webhook.process();
this.tasks.rejector.process();
} else {
logger.info("job processing is disabled, not starting job processors");
}
// Start up the cron job processors.
this.scheduledTasks = startScheduledTasks({
@@ -360,6 +366,8 @@ class Server {
mailerQueue: this.tasks.mailer,
scraperQueue: this.tasks.scraper,
rejectorQueue: this.tasks.rejector,
webhookQueue: this.tasks.webhook,
notifierQueue: this.tasks.notifier,
disableClientRoutes,
persistedQueryCache: this.persistedQueryCache,
persistedQueriesRequired:
+3 -6
View File
@@ -1,11 +1,11 @@
import { isEmpty } from "lodash";
import { Db } from "mongodb";
import performanceNow from "performance-now";
import uuid from "uuid";
import { Omit, Sub } from "coral-common/types";
import { dotize } from "coral-common/utils/dotize";
import { CommentNotFoundError } from "coral-server/errors";
import { createTimer } from "coral-server/helpers";
import logger from "coral-server/logger";
import {
EncodedCommentActionCounts,
@@ -936,7 +936,7 @@ export async function retrieveStoryCommentTagCounts(
}
// Get the start time.
const startTime = performanceNow();
const timer = createTimer();
// Load the counts from the database for this particular tag query.
const cursor = collection<{
@@ -956,12 +956,9 @@ export async function retrieveStoryCommentTagCounts(
// Get all of the counts.
const tags = await cursor.toArray();
// Compute the end time.
const responseTime = Math.round(performanceNow() - startTime);
// Logging at the info level here to ensure we track any degrading performance
// issues from this query.
logger.info({ responseTime, filter: $match }, "counting tags");
logger.info({ responseTime: timer(), filter: $match }, "counting tags");
// For each of the storyIDs...
return storyIDs.map(storyID => {
+22 -9
View File
@@ -1,8 +1,10 @@
import Queue, { Job, Queue as QueueType } from "bull";
import Queue, { Job, JobCounts, Queue as QueueType } from "bull";
import Logger from "bunyan";
import TIME from "coral-common/time";
import { createTimer } from "coral-server/helpers";
import logger from "coral-server/logger";
import { TenantResource } from "coral-server/models/tenant";
export type JobProcessor<T, U = void> = (job: Job<T>) => Promise<U>;
@@ -13,10 +15,10 @@ export interface TaskOptions<T, U = void> {
queue: Queue.QueueOptions;
}
export default class Task<T, U = any> {
private options: TaskOptions<T, U>;
private queue: QueueType<T>;
private log: Logger;
export default class Task<T extends TenantResource, U = any> {
private readonly options: Required<TaskOptions<T, U>>;
private readonly queue: QueueType<T>;
private readonly log: Logger;
constructor({
jobName,
@@ -34,6 +36,9 @@ export default class Task<T, U = any> {
// with completed entries if we don't need to.
removeOnComplete: true,
// Remove the job if it fails after all attempts.
removeOnFail: true,
// By default, configure jobs to use an exponential backoff strategy.
backoff: {
type: "exponential",
@@ -52,6 +57,10 @@ export default class Task<T, U = any> {
// TODO: (wyattjoh) attach event handlers to the queue for metrics via: https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#events
}
public async counts(): Promise<JobCounts> {
return this.queue.getJobCounts();
}
/**
* Add will add the job to the queue to get processed. It's not needed to
* handle the job after it has been created.
@@ -62,7 +71,7 @@ export default class Task<T, U = any> {
// Create the job.
const job = await this.queue.add(data, this.options.jobOptions);
this.log.trace({ jobID: job.id }, "added job to queue");
this.log.info({ jobID: job.id }, "added job to queue");
return job;
}
@@ -77,15 +86,19 @@ export default class Task<T, U = any> {
true
);
log.trace("processing job from queue");
const timer = createTimer();
log.info("processing job from queue");
try {
// Send the job off to the job processor to be handled.
const promise: U = await this.options.jobProcessor(job);
log.trace("processing completed");
// Log it!
log.info({ took: timer() }, "processing completed");
return promise;
} catch (err) {
log.error({ err }, "job failed to process");
log.error({ err, took: timer() }, "job failed to process");
throw err;
}
});
+6 -6
View File
@@ -13,9 +13,7 @@ import { createRejectorTask, RejectorQueue } from "./tasks/rejector";
import { createScraperTask, ScraperQueue } from "./tasks/scraper";
import { createWebhookTask, WebhookQueue } from "./tasks/webhook";
const createQueueOptions = async (
config: Config
): Promise<Queue.QueueOptions> => {
const createQueueOptions = (config: Config): Queue.QueueOptions => {
const client = createRedisClient(config);
const subscriber = createRedisClient(config);
@@ -60,10 +58,13 @@ export interface TaskQueue {
rejector: RejectorQueue;
}
export async function createQueue(options: QueueOptions): Promise<TaskQueue> {
export function createQueue(options: QueueOptions): TaskQueue {
// Pull some options out.
const { config } = options;
// Create the processor queue options. This holds references to the Redis
// clients that are shared per queue.
const queueOptions = await createQueueOptions(options.config);
const queueOptions = createQueueOptions(config);
// Attach process functions to the various tasks in the queue.
const mailer = createMailerTask(queueOptions, options);
@@ -73,7 +74,6 @@ export async function createQueue(options: QueueOptions): Promise<TaskQueue> {
...options,
});
const webhook = createWebhookTask(queueOptions, options);
const rejector = createRejectorTask(queueOptions, options);
// Return the tasks + client.
-70
View File
@@ -1,70 +0,0 @@
import Queue, { Job, Queue as QueueType } from "bull";
import logger from "coral-server/logger";
export interface TaskOptions<T, U = any> {
jobName: string;
jobProcessor: (job: Job<T>) => Promise<U>;
queue: Queue.QueueOptions;
}
export default class Task<T, U = any> {
private options: TaskOptions<T, U>;
private queue: QueueType<T>;
constructor(options: TaskOptions<T, U>) {
this.queue = new Queue(options.jobName, options.queue);
this.options = options;
// Sets up and attaches the job processor to the queue.
this.setupAndAttachProcessor();
}
/**
* Add will add the job to the queue to get processed. It's not needed to
* handle the job after it has been created.
*
* @param data the data for the job to add.
*/
public async add(data: T) {
const job = await this.queue.add(data, {
// We always remove the job when it's complete, no need to fill up Redis
// with completed entries if we don't need to.
removeOnComplete: true,
});
logger.trace(
{ jobID: job.id, jobName: this.options.jobName },
"added job to queue"
);
return job;
}
private setupAndAttachProcessor() {
this.queue.process(async (job: Job<T>) => {
logger.trace(
{ jobID: job.id, jobName: this.options.jobName },
"processing job from queue"
);
try {
// Send the job off to the job processor to be handled.
const promise: U = await this.options.jobProcessor(job);
logger.trace(
{ jobID: job.id, jobName: this.options.jobName },
"processing completed"
);
return promise;
} catch (err) {
logger.error({ err }, "failed to process job from queue");
throw err;
}
});
logger.trace(
{ jobName: this.options.jobName },
"registered processor for job type"
);
}
}
+7 -4
View File
@@ -1,6 +1,6 @@
import Queue from "bull";
import now from "performance-now";
import { createTimer } from "coral-server/helpers";
import logger from "coral-server/logger";
import Task from "coral-server/queue/Task";
import MailerContent from "coral-server/queue/tasks/mailer/content";
@@ -37,6 +37,10 @@ export class MailerQueue {
this.tenantCache = options.tenantCache;
}
public async counts() {
return this.task.counts();
}
public async add({ template, tenantID, message: { to } }: MailerInput) {
const log = logger.child(
{
@@ -61,7 +65,7 @@ export class MailerQueue {
return;
}
const startTemplateGenerationTime = now();
const timer = createTimer();
let html: string;
try {
@@ -74,8 +78,7 @@ export class MailerQueue {
}
// Compute the end time.
const responseTime = Math.round(now() - startTemplateGenerationTime);
log.trace({ responseTime }, "finished template generation");
log.trace({ took: timer() }, "finished template generation");
// Return the job that'll add the email to the queue to be processed later.
return this.task.add({
@@ -11,11 +11,11 @@ import { camelCase, isNil } from "lodash";
import { Db } from "mongodb";
import { createTransport } from "nodemailer";
import { Options } from "nodemailer/lib/smtp-connection";
import now from "performance-now";
import { LanguageCode } from "coral-common/helpers/i18n/locales";
import { Config } from "coral-server/config";
import { InternalError } from "coral-server/errors";
import { createTimer } from "coral-server/helpers";
import logger from "coral-server/logger";
import { Tenant } from "coral-server/models/tenant";
import { I18n, translate } from "coral-server/services/i18n";
@@ -245,7 +245,7 @@ export const createJobProcessor = (options: MailProcessorOptions) => {
// Construct the fromAddress.
const fromAddress = fromName ? `${fromName} <${fromEmail}>` : fromEmail;
const startTemplateGenerationTime = now();
const templateGenerationTimer = createTimer();
// Get the message to send.
let message: Message;
@@ -261,9 +261,10 @@ export const createJobProcessor = (options: MailProcessorOptions) => {
throw new InternalError(e, "could not translate the message");
}
// Compute the end time.
const responseTime = Math.round(now() - startTemplateGenerationTime);
log.trace({ responseTime }, "finished mail translation");
log.trace(
{ responseTime: templateGenerationTimer() },
"finished mail translation"
);
let transport = cache.get(tenantID);
if (!transport) {
@@ -300,7 +301,7 @@ export const createJobProcessor = (options: MailProcessorOptions) => {
log.debug("starting to send the email");
const startMessageSendTime = now();
const messageSendTimer = createTimer();
try {
// Send the mail message.
@@ -309,8 +310,6 @@ export const createJobProcessor = (options: MailProcessorOptions) => {
throw new InternalError(e, "could not send email");
}
// Compute the end time.
const messageSendResponseTime = Math.round(now() - startMessageSendTime);
log.debug({ responseTime: messageSendResponseTime }, "sent the email");
log.debug({ responseTime: messageSendTimer() }, "sent the email");
};
};
+23 -39
View File
@@ -22,46 +22,30 @@ interface Options {
signingConfig: JWTSigningConfig;
}
/**
* NotifierQueue is designed to handle creating and queuing notifications
* that could be sent to users.
*/
export class NotifierQueue {
private task: Task<NotifierData>;
constructor(queue: Queue.QueueOptions, options: Options) {
const registry = new Map<CoralEventType, NotificationCategory[]>();
// Notification categories have been grouped by their event name so that
// each event emitted need only access the associated notification once.
for (const category of categories) {
for (const event of category.events as CoralEventType[]) {
let handlers = registry.get(event);
if (!handlers) {
handlers = [];
}
handlers.push(category);
registry.set(event, handlers);
}
}
this.task = new Task({
jobName: JOB_NAME,
jobProcessor: createJobProcessor({ registry, ...options }),
queue,
});
}
public async add(data: NotifierData) {
return this.task.add(data);
}
public process() {
return this.task.process();
}
}
export type NotifierQueue = Task<NotifierData>;
export const createNotifierTask = (
queue: Queue.QueueOptions,
options: Options
) => new NotifierQueue(queue, options);
) => {
const registry = new Map<CoralEventType, NotificationCategory[]>();
// Notification categories have been grouped by their event name so that
// each event emitted need only access the associated notification once.
for (const category of categories) {
for (const event of category.events as CoralEventType[]) {
let handlers = registry.get(event);
if (!handlers) {
handlers = [];
}
handlers.push(category);
registry.set(event, handlers);
}
}
return new Task({
jobName: JOB_NAME,
jobProcessor: createJobProcessor({ registry, ...options }),
queue,
});
};
@@ -79,51 +79,42 @@ export const createJobProcessor = ({
log.debug("starting to handle a notify operation");
try {
// Get all the handlers that are active for this channel.
const categories = registry.get(input.type);
if (!categories || categories.length === 0) {
return;
}
// Grab the tenant from the cache.
const tenant = await tenantCache.retrieveByID(tenantID);
if (!tenant) {
throw new Error("tenant not found with ID");
}
// Create a notification context to handle processing notifications.
const ctx = new NotificationContext({
mongo,
config,
signingConfig,
tenant,
now,
});
// For each of the handler's we need to process, we should iterate to
// generate their notifications.
let notifications = await handleHandlers(ctx, categories, input);
// Check to see if some of the other notifications that are queued
// had this notification superseded.
notifications = notifications.filter(filterSuperseded);
// Send all the notifications now.
await processNewNotifications(
ctx,
notifications.map(({ notification }) => notification),
mailerQueue
);
log.debug(
{ notifications: notifications.length },
"notifications handled"
);
} catch (err) {
log.error({ err }, "could not handle the notifications");
throw err;
// Get all the handlers that are active for this channel.
const categories = registry.get(input.type);
if (!categories || categories.length === 0) {
return;
}
// Grab the tenant from the cache.
const tenant = await tenantCache.retrieveByID(tenantID);
if (!tenant) {
throw new Error("tenant not found with ID");
}
// Create a notification context to handle processing notifications.
const ctx = new NotificationContext({
mongo,
config,
signingConfig,
tenant,
now,
});
// For each of the handler's we need to process, we should iterate to
// generate their notifications.
let notifications = await handleHandlers(ctx, categories, input);
// Check to see if some of the other notifications that are queued
// had this notification superseded.
notifications = notifications.filter(filterSuperseded);
// Send all the notifications now.
await processNewNotifications(
ctx,
notifications.map(({ notification }) => notification),
mailerQueue
);
log.debug({ notifications: notifications.length }, "notifications handled");
};
};
+32 -32
View File
@@ -1,7 +1,7 @@
import Queue, { Job } from "bull";
import { Db } from "mongodb";
import now from "performance-now";
import { createTimer } from "coral-server/helpers";
import logger from "coral-server/logger";
import {
Comment,
@@ -61,49 +61,49 @@ const createJobProcessor = ({
true
);
// Mark the start time.
const startTime = now();
const timer = createTimer();
log.debug("starting to reject author comments");
// Get the tenant.
const tenant = await tenantCache.retrieveByID(tenantID);
if (!tenant) {
log.error("referenced tenant was not found");
return;
}
// Get the current time.
const currentTime = new Date();
try {
// Find all comments written by the author that should be rejected.
let connection = await getBatch(mongo, tenantID, authorID);
while (connection.nodes.length > 0) {
for (const comment of connection.nodes) {
// Get the latest revision of the comment.
const revision = getLatestRevision(comment);
// Reject the comment.
await rejectComment(
mongo,
redis,
null,
tenant,
comment.id,
revision.id,
moderatorID,
currentTime
);
}
// If there was not another page, abort processing.
if (!connection.pageInfo.hasNextPage) {
break;
}
// Load the next page.
connection = await getBatch(mongo, tenantID, authorID, connection);
// Find all comments written by the author that should be rejected.
let connection = await getBatch(mongo, tenantID, authorID);
while (connection.nodes.length > 0) {
for (const comment of connection.nodes) {
// Get the latest revision of the comment.
const revision = getLatestRevision(comment);
// Reject the comment.
await rejectComment(
mongo,
redis,
null,
tenant,
comment.id,
revision.id,
moderatorID,
currentTime
);
}
} catch (err) {
log.error({ err }, "could not reject the author's comments");
throw err;
// If there was not another page, abort processing.
if (!connection.pageInfo.hasNextPage) {
break;
}
// Load the next page.
connection = await getBatch(mongo, tenantID, authorID, connection);
}
// Compute the end time.
const took = Math.round(now() - startTime);
log.debug({ took }, "rejected the author's comments");
log.debug({ took: timer() }, "rejected the author's comments");
};
export type RejectorQueue = Task<RejectorData>;
+4 -13
View File
@@ -1,8 +1,8 @@
import Queue, { Job } from "bull";
import { Db } from "mongodb";
import now from "performance-now";
import { Config } from "coral-server/config";
import { createTimer } from "coral-server/helpers";
import logger from "coral-server/logger";
import Task from "coral-server/queue/Task";
import { scrape } from "coral-server/services/stories/scraper";
@@ -39,22 +39,13 @@ const createJobProcessor = ({
);
// Mark the start time.
const startTime = now();
const timer = createTimer();
log.debug("starting to scrape the story");
try {
await scrape(mongo, config, tenantID, storyID, storyURL);
} catch (err) {
log.error({ err }, "could not scrape the story");
throw err;
}
await scrape(mongo, config, tenantID, storyID, storyURL);
// Compute the end time.
const responseTime = Math.round(now() - startTime);
log.debug({ responseTime }, "scraped the story");
log.debug({ responseTime: timer() }, "scraped the story");
};
export type ScraperQueue = Task<ScraperData>;
@@ -1,10 +1,10 @@
import crypto from "crypto";
import { Redis } from "ioredis";
import { Db } from "mongodb";
import getNow from "performance-now";
import { Config } from "coral-server/config";
import { CoralEventPayload } from "coral-server/events/event";
import { createTimer } from "coral-server/helpers";
import logger from "coral-server/logger";
import {
filterActiveSecrets,
@@ -170,17 +170,16 @@ export function createJobProcessor({
);
// Send the request.
const startedSendingAt = getNow();
const timer = createTimer();
const res = await fetch(endpoint.url, options);
const took = getNow() - startedSendingAt;
if (res.ok) {
log.info(
{ took, responseStatus: res.status },
{ took: timer(), responseStatus: res.status },
"finished sending webhook"
);
} else {
log.warn(
{ took, responseStatus: res.status },
{ took: timer(), responseStatus: res.status },
"failed to deliver webhook"
);
}
@@ -1,8 +1,6 @@
import ms from "ms";
import now from "performance-now";
import { LanguageCode } from "coral-common/helpers";
import createWordListRegExp from "coral-common/utils/createWordListRegExp";
import { createTimer } from "coral-server/helpers";
import logger from "coral-server/logger";
import { Tenant } from "coral-server/models/tenant";
@@ -54,10 +52,10 @@ export class WordList {
// it.
let lists = this.cache.get(options);
if (!lists) {
const startedAt = now();
const timer = createTimer();
lists = this.create(options);
logger.info(
{ tenantID: options.id, took: ms(now() - startedAt) },
{ tenantID: options.id, took: timer() },
"regenerated word list cache"
);
@@ -88,10 +86,10 @@ export class WordList {
return false;
}
const startedAt = now();
const timer = createTimer();
const result = list.test(testString);
logger.info(
{ tenantID: options.id, listName, took: ms(now() - startedAt) },
{ tenantID: options.id, listName, took: timer() },
"word list phrase test complete"
);
+3 -3
View File
@@ -1,8 +1,8 @@
import { merge } from "lodash";
import { Collection, Db, IndexOptions } from "mongodb";
import now from "performance-now";
import { Writable } from "coral-common/types";
import { createTimer } from "coral-server/helpers";
import logger from "coral-server/logger";
import collections from "../mongodb/collections";
@@ -33,11 +33,11 @@ export async function createIndex<T>(
try {
// Try to create the index.
const start = now();
const timer = createTimer();
log.debug({ indexSpec, indexOptions }, "creating index");
const indexName = await collection.createIndex(indexSpec, indexOptions);
log.debug(
{ indexName, indexSpec, indexOptions, took: Math.round(now() - start) },
{ indexName, indexSpec, indexOptions, took: timer() },
"index was created"
);
+13 -11
View File
@@ -2,9 +2,9 @@ import fs from "fs-extra";
import { Redis } from "ioredis";
import { Db } from "mongodb";
import path from "path";
import now from "performance-now";
import uuid from "uuid";
import { createTimer } from "coral-server/helpers";
import logger from "coral-server/logger";
import {
failMigration,
@@ -177,7 +177,7 @@ export default class Manager {
logger.info({ pending: pending.length }, "executing pending migrations");
const migrationsStartTime = now();
const migrationsStartTimer = createTimer();
for (const migration of pending) {
let log = logger.child(
@@ -196,11 +196,13 @@ export default class Manager {
// Apply any index changes for the migration.
if (migration.indexes) {
const migrationStartTime = now();
const migrationStartTime = createTimer();
log.info("starting index migration");
await migration.indexes(mongo);
const executionTime = Math.round(now() - migrationStartTime);
log.info({ executionTime }, "finished index migration");
log.info(
{ executionTime: migrationStartTime() },
"finished index migration"
);
}
if (migration.up) {
@@ -209,7 +211,7 @@ export default class Manager {
for await (const tenant of this.tenantCache) {
log = log.child({ tenantID: tenant.id }, true);
const migrationStartTime = now();
const migrationStartTimer = createTimer();
log.info("starting migration");
try {
@@ -241,8 +243,10 @@ export default class Manager {
throw err;
}
const executionTime = Math.round(now() - migrationStartTime);
log.info({ executionTime }, "finished migration");
log.info(
{ executionTime: migrationStartTimer() },
"finished migration"
);
}
}
@@ -250,13 +254,11 @@ export default class Manager {
await finishMigration(mongo, migration.id);
}
const finishTime = Math.round(now() - migrationsStartTime);
currentMigration = await this.currentMigration(mongo);
logger.info(
{
finishTime,
finishTime: migrationsStartTimer(),
currentMigrationID: currentMigration ? currentMigration.id : null,
},
"finished running pending migrations"