From 4194b08a128e31ea4028e662f13dadd27aa1db70 Mon Sep 17 00:00:00 2001 From: Wyatt Johnson Date: Fri, 10 Apr 2020 16:15:09 +0000 Subject: [PATCH] [CORL-1002] Queue Improvements (#2931) * fix: improved count handler for stories not found * feat: removed performance-now * feat: cleaned up processors, exposed counts * feat: increased verb of schd jobs * fix: removed dead code --- package.json | 1 - src/core/server/app/handlers/api/graphql.ts | 6 +- .../server/app/handlers/api/story/count.ts | 9 +-- src/core/server/app/index.ts | 8 +- src/core/server/app/middleware/logging.ts | 9 +-- src/core/server/app/middleware/metrics.ts | 7 +- src/core/server/config.ts | 36 +++------ src/core/server/cron/scheduled/job.ts | 12 ++- src/core/server/graph/context.ts | 16 +++- .../graph/extensions/LoggerExtension.ts | 9 +-- .../graph/extensions/MetricsExtension.ts | 11 +-- src/core/server/graph/resolvers/Query.ts | 1 + src/core/server/graph/resolvers/Queue.ts | 12 +++ src/core/server/graph/resolvers/Queues.ts | 23 ++++++ src/core/server/graph/resolvers/index.ts | 4 + src/core/server/graph/schema/schema.graphql | 61 ++++++++++++++ src/core/server/helpers/createTimer.ts | 14 ++++ src/core/server/helpers/index.ts | 1 + src/core/server/index.ts | 20 +++-- src/core/server/models/comment/comment.ts | 9 +-- src/core/server/queue/Task.ts | 31 ++++--- src/core/server/queue/index.ts | 12 +-- src/core/server/queue/tasks/Task.ts | 70 ---------------- src/core/server/queue/tasks/mailer/index.ts | 11 ++- .../server/queue/tasks/mailer/processor.ts | 17 ++-- src/core/server/queue/tasks/notifier/index.ts | 62 ++++++-------- .../server/queue/tasks/notifier/processor.ts | 81 +++++++++---------- src/core/server/queue/tasks/rejector.ts | 64 +++++++-------- src/core/server/queue/tasks/scraper.ts | 17 +--- .../server/queue/tasks/webhook/processor.ts | 9 +-- .../services/comments/pipeline/wordList.ts | 12 ++- src/core/server/services/migrate/indexing.ts | 6 +- src/core/server/services/migrate/manager.ts | 24 +++--- 33 files changed, 353 insertions(+), 332 deletions(-) create mode 100644 src/core/server/graph/resolvers/Queue.ts create mode 100644 src/core/server/graph/resolvers/Queues.ts create mode 100644 src/core/server/helpers/createTimer.ts create mode 100644 src/core/server/helpers/index.ts delete mode 100644 src/core/server/queue/tasks/Task.ts diff --git a/package.json b/package.json index 87bcf3908..76330e326 100644 --- a/package.json +++ b/package.json @@ -128,7 +128,6 @@ "passport-local": "^1.0.0", "passport-oauth2": "^1.4.0", "passport-strategy": "^1.0.0", - "performance-now": "^2.1.0", "permit": "^0.2.4", "prom-client": "^11.3.0", "proxy-agent": "^3.1.1", diff --git a/src/core/server/app/handlers/api/graphql.ts b/src/core/server/app/handlers/api/graphql.ts index ccb0f3ef9..0800c6604 100644 --- a/src/core/server/app/handlers/api/graphql.ts +++ b/src/core/server/app/handlers/api/graphql.ts @@ -9,16 +9,18 @@ export type GraphMiddlewareOptions = Pick< | "config" | "i18n" | "mailerQueue" + | "scraperQueue" + | "rejectorQueue" + | "notifierQueue" + | "webhookQueue" | "mongo" | "redis" | "schema" - | "scraperQueue" | "signingConfig" | "pubsub" | "tenantCache" | "metrics" | "broker" - | "rejectorQueue" >; export const graphQLHandler = ({ diff --git a/src/core/server/app/handlers/api/story/count.ts b/src/core/server/app/handlers/api/story/count.ts index da857dbcb..28412c1b6 100644 --- a/src/core/server/app/handlers/api/story/count.ts +++ b/src/core/server/app/handlers/api/story/count.ts @@ -29,13 +29,10 @@ export const countHandler = ({ mongo, i18n }: CountOptions): RequestHandler => { id: req.query.id, url: req.query.url, }); - if (!story) { - throw new Error("Story not found"); - } - const count = calculateTotalPublishedCommentCount( - story.commentCounts.status - ); + const count = story + ? calculateTotalPublishedCommentCount(story.commentCounts.status) + : 0; let html = ""; if (req.query.notext === "true") { diff --git a/src/core/server/app/index.ts b/src/core/server/app/index.ts index 12373dd61..f7913682d 100644 --- a/src/core/server/app/index.ts +++ b/src/core/server/app/index.ts @@ -18,8 +18,10 @@ import { Config } from "coral-server/config"; import CoralEventListenerBroker from "coral-server/events/publisher"; import logger from "coral-server/logger"; import { MailerQueue } from "coral-server/queue/tasks/mailer"; +import { NotifierQueue } from "coral-server/queue/tasks/notifier"; import { RejectorQueue } from "coral-server/queue/tasks/rejector"; import { ScraperQueue } from "coral-server/queue/tasks/scraper"; +import { WebhookQueue } from "coral-server/queue/tasks/webhook"; import { I18n } from "coral-server/services/i18n"; import { JWTSigningConfig } from "coral-server/services/jwt"; import { Metrics } from "coral-server/services/metrics"; @@ -40,6 +42,10 @@ export interface AppOptions { disableClientRoutes: boolean; i18n: I18n; mailerQueue: MailerQueue; + scraperQueue: ScraperQueue; + rejectorQueue: RejectorQueue; + webhookQueue: WebhookQueue; + notifierQueue: NotifierQueue; metrics?: Metrics; mongo: Db; parent: Express; @@ -48,12 +54,10 @@ export interface AppOptions { pubsub: RedisPubSub; redis: AugmentedRedis; schema: GraphQLSchema; - scraperQueue: ScraperQueue; signingConfig: JWTSigningConfig; tenantCache: TenantCache; migrationManager: MigrationManager; broker: CoralEventListenerBroker; - rejectorQueue: RejectorQueue; } /** diff --git a/src/core/server/app/middleware/logging.ts b/src/core/server/app/middleware/logging.ts index cfd2f2b4e..354a8e119 100644 --- a/src/core/server/app/middleware/logging.ts +++ b/src/core/server/app/middleware/logging.ts @@ -1,12 +1,12 @@ +import { Request, Response } from "express"; import onFinished from "on-finished"; -import now from "performance-now"; +import { createTimer } from "coral-server/helpers"; import logger from "coral-server/logger"; import { ErrorRequestHandler, RequestHandler, } from "coral-server/types/express"; -import { Request, Response } from "express"; const extractMetadata = (req: Request, res: Response) => ({ url: req.originalUrl || req.url, @@ -18,11 +18,10 @@ const extractMetadata = (req: Request, res: Response) => ({ }); export const accessLogger: RequestHandler = (req, res, next) => { - const startTime = now(); - + const timer = createTimer(); onFinished(res, () => { // Compute the end time. - const responseTime = Math.round(now() - startTime); + const responseTime = timer(); // Grab the logger. const log = req.coral ? req.coral.logger : logger; diff --git a/src/core/server/app/middleware/metrics.ts b/src/core/server/app/middleware/metrics.ts index e472c51e3..621dca056 100644 --- a/src/core/server/app/middleware/metrics.ts +++ b/src/core/server/app/middleware/metrics.ts @@ -1,6 +1,6 @@ import onFinished from "on-finished"; -import now from "performance-now"; +import { createTimer } from "coral-server/helpers"; import { Metrics } from "coral-server/services/metrics"; import { RequestHandler } from "coral-server/types/express"; @@ -11,11 +11,10 @@ export const metricsRecorder = ({ httpRequestDurationMilliseconds, }: Metrics): RequestHandler => { return (req, res, next) => { - const startTime = now(); - + const timer = createTimer(); onFinished(res, () => { // Compute the end time. - const responseTime = Math.round(now() - startTime); + const responseTime = timer(); // Increment the request counter. httpRequestsTotal.labels(`${res.statusCode}`, req.method).inc(); diff --git a/src/core/server/config.ts b/src/core/server/config.ts index 20ab8155e..0dfd37ff7 100644 --- a/src/core/server/config.ts +++ b/src/core/server/config.ts @@ -94,49 +94,43 @@ const config = convict({ format: String, default: "", env: "TRUST_PROXY", - arg: "trustProxy", }, enable_graphiql: { doc: "When true, this will enable the GraphiQL interface at /graphiql", format: Boolean, default: false, env: "ENABLE_GRAPHIQL", - arg: "enableGraphiQL", }, concurrency: { doc: "The number of worker nodes to spawn to handle traffic", format: Number, default: os.cpus().length, env: "CONCURRENCY", - arg: "concurrency", }, port: { doc: "The port to bind.", format: "port", default: 3000, env: "PORT", - arg: "port", }, cluster_metrics_port: { doc: "The port to bind for cluster metrics.", format: "port", default: 3001, env: "CLUSTER_METRICS_PORT", - arg: "clusterMetricsPort", }, metrics_username: { doc: "The username to use to authenticate to the metrics endpoint.", format: "String", default: "", env: "METRICS_USERNAME", - arg: "metricsUsername", }, metrics_password: { doc: "The password to use to authenticate to the metrics endpoint.", format: "String", default: "", env: "METRICS_PASSWORD", - arg: "metricsPassword", + sensitive: true, }, dev_port: { @@ -144,14 +138,13 @@ const config = convict({ format: "port", default: 8080, env: "DEV_PORT", - arg: "dev-port", }, mongodb: { doc: "The MongoDB database to connect to.", format: "mongo-uri", default: "mongodb://127.0.0.1:27017/coral", env: "MONGODB_URI", - arg: "mongodb", + sensitive: true, }, redis: { @@ -159,7 +152,7 @@ const config = convict({ format: "redis-uri", default: "redis://127.0.0.1:6379", env: "REDIS_URI", - arg: "redis", + sensitive: true, }, redis_options: { @@ -167,7 +160,6 @@ const config = convict({ format: Object, default: {}, env: "REDIS_OPTIONS", - arg: "redisOptions", }, signing_secret: { doc: @@ -175,7 +167,7 @@ const config = convict({ format: "*", default: "keyboard cat", // TODO: (wyattjoh) evaluate best solution env: "SIGNING_SECRET", - arg: "signingSecret", + sensitive: true, }, signing_algorithm: { @@ -183,14 +175,13 @@ const config = convict({ format: algorithms, default: "HS256", env: "SIGNING_ALGORITHM", - arg: "signingAlgorithm", }, management_signing_secret: { doc: "The secret used to verify management API requests.", format: "*", default: null, env: "MANAGEMENT_SIGNING_SECRET", - arg: "managementSigningSecret", + sensitive: true, }, management_signing_algorithm: { @@ -198,21 +189,18 @@ const config = convict({ format: algorithms, default: "HS256", env: "MANAGEMENT_SIGNING_ALGORITHM", - arg: "managementSigningAlgorithm", }, logging_level: { doc: "The logging level to print to the console", format: ["fatal", "error", "warn", "info", "debug", "trace"], default: "info", env: "LOGGING_LEVEL", - arg: "logging", }, static_uri: { doc: "The URL that static assets will be hosted from", format: "optional-url", default: "", env: "STATIC_URI", - arg: "staticURI", }, websocket_keep_alive_timeout: { doc: @@ -220,7 +208,6 @@ const config = convict({ format: "ms", default: "30 seconds", env: "WEBSOCKET_KEEP_ALIVE_TIMEOUT", - arg: "websocketKeepAliveTimeout", }, disable_tenant_caching: { doc: @@ -228,7 +215,6 @@ const config = convict({ format: Boolean, default: false, env: "DISABLE_TENANT_CACHING", - arg: "disableTenantCaching", }, disable_live_updates: { doc: @@ -236,7 +222,6 @@ const config = convict({ format: Boolean, default: false, env: "DISABLE_LIVE_UPDATES", - arg: "disableLiveUpdates", }, disable_client_routes: { doc: @@ -244,7 +229,6 @@ const config = convict({ format: Boolean, default: false, env: "DISABLE_CLIENT_ROUTES", - arg: "disableClientRoutes", }, disable_rate_limiters: { doc: @@ -252,14 +236,12 @@ const config = convict({ format: Boolean, default: false, env: "DISABLE_RATE_LIMITERS", - arg: "disableRateLimiters", }, scrape_timeout: { doc: "The request timeout (in ms) for scraping operations.", format: "ms", default: "10 seconds", env: "SCRAPE_TIMEOUT", - arg: "scrapeTimeout", }, perspective_timeout: { doc: @@ -267,7 +249,6 @@ const config = convict({ format: "ms", default: "800 milliseconds", env: "PERSPECTIVE_TIMEOUT", - arg: "perspectiveTimeout", }, force_ssl: { doc: @@ -275,7 +256,12 @@ const config = convict({ format: Boolean, default: false, env: "FORCE_SSL", - arg: "forceSSL", + }, + disable_job_processors: { + doc: "Disables job processors when running.", + format: Boolean, + default: false, + env: "DISABLE_JOB_PROCESSORS", }, }); diff --git a/src/core/server/cron/scheduled/job.ts b/src/core/server/cron/scheduled/job.ts index bf11b65d9..b0fca0a7d 100644 --- a/src/core/server/cron/scheduled/job.ts +++ b/src/core/server/cron/scheduled/job.ts @@ -1,7 +1,7 @@ import { CronCommand, CronJob } from "cron"; -import now from "performance-now"; import uuid from "uuid"; +import { createTimer } from "coral-server/helpers"; import logger, { Logger } from "coral-server/logger"; export type ScheduledJobCommand = ( @@ -40,18 +40,16 @@ export class ScheduledJob { private command(command: ScheduledJobCommand): CronCommand { return async () => { const log = this.log.child({ scheduledExecutionID: uuid.v1() }, true); - log.debug("now starting scheduled job"); - const start = now(); + log.info("now starting scheduled job"); + const timer = createTimer(); try { await command({ ...this.context, log, }); - const processingTime = Math.floor(now() - start); - log.debug({ processingTime }, "now finished scheduled job"); + log.info({ took: timer() }, "now finished scheduled job"); } catch (err) { - const processingTime = Math.floor(now() - start); - log.error({ err, processingTime }, "failed to run scheduled job"); + log.error({ err, took: timer() }, "failed to run scheduled job"); } }; } diff --git a/src/core/server/graph/context.ts b/src/core/server/graph/context.ts index e688e4aa9..07146519a 100644 --- a/src/core/server/graph/context.ts +++ b/src/core/server/graph/context.ts @@ -12,8 +12,10 @@ import { PersistedQuery } from "coral-server/models/queries"; import { Tenant } from "coral-server/models/tenant"; import { User } from "coral-server/models/user"; import { MailerQueue } from "coral-server/queue/tasks/mailer"; +import { NotifierQueue } from "coral-server/queue/tasks/notifier"; import { RejectorQueue } from "coral-server/queue/tasks/rejector"; import { ScraperQueue } from "coral-server/queue/tasks/scraper"; +import { WebhookQueue } from "coral-server/queue/tasks/webhook"; import { I18n } from "coral-server/services/i18n"; import { JWTSigningConfig } from "coral-server/services/jwt"; import { AugmentedRedis } from "coral-server/services/redis"; @@ -38,11 +40,13 @@ export interface GraphContextOptions { config: Config; i18n: I18n; mailerQueue: MailerQueue; + rejectorQueue: RejectorQueue; + scraperQueue: ScraperQueue; + webhookQueue: WebhookQueue; + notifierQueue: NotifierQueue; mongo: Db; pubsub: RedisPubSub; redis: AugmentedRedis; - rejectorQueue: RejectorQueue; - scraperQueue: ScraperQueue; tenant: Tenant; tenantCache: TenantCache; broker: CoralEventListenerBroker; @@ -58,13 +62,15 @@ export default class GraphContext { public readonly loaders: ReturnType; public readonly logger: Logger; public readonly mailerQueue: MailerQueue; + public readonly rejectorQueue: RejectorQueue; + public readonly scraperQueue: ScraperQueue; + public readonly webhookQueue: WebhookQueue; + public readonly notifierQueue: NotifierQueue; public readonly mongo: Db; public readonly mutators: ReturnType; public readonly now: Date; public readonly pubsub: RedisPubSub; public readonly redis: AugmentedRedis; - public readonly rejectorQueue: RejectorQueue; - public readonly scraperQueue: ScraperQueue; public readonly tenant: Tenant; public readonly tenantCache: TenantCache; @@ -98,6 +104,8 @@ export default class GraphContext { this.scraperQueue = options.scraperQueue; this.mailerQueue = options.mailerQueue; this.rejectorQueue = options.rejectorQueue; + this.notifierQueue = options.notifierQueue; + this.webhookQueue = options.webhookQueue; this.signingConfig = options.signingConfig; this.clientID = options.clientID; diff --git a/src/core/server/graph/extensions/LoggerExtension.ts b/src/core/server/graph/extensions/LoggerExtension.ts index b7f65db0a..8c00737e6 100644 --- a/src/core/server/graph/extensions/LoggerExtension.ts +++ b/src/core/server/graph/extensions/LoggerExtension.ts @@ -4,9 +4,9 @@ import { GraphQLExtension, GraphQLResponse, } from "graphql-extensions"; -import now from "performance-now"; import GraphContext from "coral-server/graph/context"; +import { createTimer } from "coral-server/helpers"; import logger from "coral-server/logger"; import { getOperationMetadata, getPersistedQueryMetadata } from "./helpers"; @@ -84,17 +84,14 @@ export class LoggerExtension implements GraphQLExtension { if (o.executionArgs.contextValue) { // Grab the start time so we can calculate the time it takes to execute // the graph query. - const startTime = now(); + const timer = createTimer(); return () => { - // Compute the end time. - const responseTime = Math.round(now() - startTime); - // Log out the details of the request. logQuery( o.executionArgs.contextValue, o.executionArgs.document, undefined, - responseTime + timer() ); }; } diff --git a/src/core/server/graph/extensions/MetricsExtension.ts b/src/core/server/graph/extensions/MetricsExtension.ts index cef01957d..96d66f82f 100644 --- a/src/core/server/graph/extensions/MetricsExtension.ts +++ b/src/core/server/graph/extensions/MetricsExtension.ts @@ -1,8 +1,9 @@ -import GraphContext from "coral-server/graph/context"; -import { Metrics } from "coral-server/services/metrics"; import { ExecutionArgs } from "graphql"; import { EndHandler, GraphQLExtension } from "graphql-extensions"; -import now from "performance-now"; + +import GraphContext from "coral-server/graph/context"; +import { createTimer } from "coral-server/helpers"; +import { Metrics } from "coral-server/services/metrics"; import { getOperationMetadata } from "./helpers"; @@ -16,10 +17,10 @@ export class MetricsExtension implements GraphQLExtension { if (o.executionArgs.contextValue) { // Grab the start time so we can calculate the time it takes to execute // the graph query. - const startTime = now(); + const timer = createTimer(); return () => { // Compute the end time. - const responseTime = Math.round(now() - startTime); + const responseTime = timer(); // Get the request metadata. const { operation, operationName } = getOperationMetadata( diff --git a/src/core/server/graph/resolvers/Query.ts b/src/core/server/graph/resolvers/Query.ts index 1790d851b..9b14a1132 100644 --- a/src/core/server/graph/resolvers/Query.ts +++ b/src/core/server/graph/resolvers/Query.ts @@ -28,4 +28,5 @@ export const Query: Required> = { sites: (source, args, ctx) => ctx.loaders.Sites.connection(args), site: (source, { id }, ctx) => (id ? ctx.loaders.Sites.site.load(id) : null), webhookEndpoint: (source, { id }, ctx) => getWebhookEndpoint(ctx.tenant, id), + queues: () => ({}), }; diff --git a/src/core/server/graph/resolvers/Queue.ts b/src/core/server/graph/resolvers/Queue.ts new file mode 100644 index 000000000..d6b8b9991 --- /dev/null +++ b/src/core/server/graph/resolvers/Queue.ts @@ -0,0 +1,12 @@ +import { + GQLQueueCounts, + GQLQueueTypeResolver, +} from "../schema/__generated__/types"; + +export interface QueueInput { + counts(): Promise; +} + +export const Queue: Required> = { + counts: t => t.counts(), +}; diff --git a/src/core/server/graph/resolvers/Queues.ts b/src/core/server/graph/resolvers/Queues.ts new file mode 100644 index 000000000..bdb105422 --- /dev/null +++ b/src/core/server/graph/resolvers/Queues.ts @@ -0,0 +1,23 @@ +import { GQLQueuesTypeResolver } from "../schema/__generated__/types"; + +import GraphContext from "../context"; +import { QueueInput } from "./Queue"; + +/** + * get produces a resolver that maps the context to a QueueInput. + * + * @param fn the function to map the ctx to the queue. + */ +const get = (fn: (ctx: GraphContext) => QueueInput) => ( + parent: any, + args: any, + ctx: GraphContext +) => fn(ctx); + +export const Queues: Required = { + mailer: get(ctx => ctx.mailerQueue), + scraper: get(ctx => ctx.scraperQueue), + notifier: get(ctx => ctx.notifierQueue), + webhook: get(ctx => ctx.webhookQueue), + rejector: get(ctx => ctx.rejectorQueue), +}; diff --git a/src/core/server/graph/resolvers/index.ts b/src/core/server/graph/resolvers/index.ts index 7b78a0a38..905d26a76 100644 --- a/src/core/server/graph/resolvers/index.ts +++ b/src/core/server/graph/resolvers/index.ts @@ -35,6 +35,8 @@ import { PremodStatus } from "./PremodStatus"; import { PremodStatusHistory } from "./PremodStatusHistory"; import { Profile } from "./Profile"; import { Query } from "./Query"; +import { Queue } from "./Queue"; +import { Queues } from "./Queues"; import { RecentCommentHistory } from "./RecentCommentHistory"; import { RejectCommentPayload } from "./RejectCommentPayload"; import { Secret } from "./Secret"; @@ -99,6 +101,8 @@ const Resolvers: GQLResolver = { Tag, Time, User, + Queue, + Queues, UsernameHistory, UsernameStatus, UserStatus, diff --git a/src/core/server/graph/schema/schema.graphql b/src/core/server/graph/schema/schema.graphql index a88986aad..114c5cf81 100644 --- a/src/core/server/graph/schema/schema.graphql +++ b/src/core/server/graph/schema/schema.graphql @@ -2997,6 +2997,62 @@ type StoriesConnection { pageInfo: PageInfo! } +################################################################################ +## Queue +################################################################################ + +type QueueCounts { + """ + waiting is the number of jobs that are in line to be processed. + """ + waiting: Int! + + """ + active is the number of jobs that are being activly processed. + """ + active: Int! + + """ + delayed is the number of jobs that have been delayed due to a failure and are + waiting for a backoff. + """ + delayed: Int! +} + +type Queue { + """ + counts is the current counts associated with the Queue. + """ + counts: QueueCounts! +} + +type Queues { + """ + mailer is the Queue associated with the Mailer queue. + """ + mailer: Queue! + + """ + scraper is the Queue associated with the Scraper queue. + """ + scraper: Queue! + + """ + notifier is the Queue associated with the Notifier queue. + """ + notifier: Queue! + + """ + webhook is the Queue associated with the Webhook queue. + """ + webhook: Queue! + + """ + rejector is the Queue associated with the Rejector queue. + """ + rejector: Queue! +} + ################################################################################ ## Query ################################################################################ @@ -3119,6 +3175,11 @@ type Query { webhookEndpint will return a specific WebhookEndpoint if it exists. """ webhookEndpoint(id: ID!): WebhookEndpoint @auth(roles: [ADMIN]) + + """ + queues returns information on queues used in Coral to manage + """ + queues: Queues! @auth(roles: [ADMIN]) } ################################################################################ diff --git a/src/core/server/helpers/createTimer.ts b/src/core/server/helpers/createTimer.ts new file mode 100644 index 000000000..72296661f --- /dev/null +++ b/src/core/server/helpers/createTimer.ts @@ -0,0 +1,14 @@ +/** + * createTimer will create a new timer that can be called again to get the + * milliseconds since the timer was created. + * + */ +const createTimer = () => { + const start = Date.now(); + return () => { + const finish = Date.now(); + return finish - start; + }; +}; + +export default createTimer; diff --git a/src/core/server/helpers/index.ts b/src/core/server/helpers/index.ts new file mode 100644 index 000000000..f2469b295 --- /dev/null +++ b/src/core/server/helpers/index.ts @@ -0,0 +1 @@ +export { default as createTimer } from "./createTimer"; diff --git a/src/core/server/index.ts b/src/core/server/index.ts index 2e72777ff..e7f3ff73e 100644 --- a/src/core/server/index.ts +++ b/src/core/server/index.ts @@ -201,7 +201,7 @@ class Server { await this.tenantCache.primeAll(); // Create the Job Queue. - this.tasks = await createQueue({ + this.tasks = createQueue({ config: this.config, mongo: this.mongo, redis: this.redis, @@ -253,11 +253,17 @@ class Server { await this.persistedQueryCache.prime(); // Launch all of the job processors. - this.tasks.mailer.process(); - this.tasks.scraper.process(); - this.tasks.notifier.process(); - this.tasks.webhook.process(); - this.tasks.rejector.process(); + if (!this.config.get("disable_job_processors")) { + logger.info("job processing is enabled, starting job processors"); + + this.tasks.mailer.process(); + this.tasks.scraper.process(); + this.tasks.notifier.process(); + this.tasks.webhook.process(); + this.tasks.rejector.process(); + } else { + logger.info("job processing is disabled, not starting job processors"); + } // Start up the cron job processors. this.scheduledTasks = startScheduledTasks({ @@ -360,6 +366,8 @@ class Server { mailerQueue: this.tasks.mailer, scraperQueue: this.tasks.scraper, rejectorQueue: this.tasks.rejector, + webhookQueue: this.tasks.webhook, + notifierQueue: this.tasks.notifier, disableClientRoutes, persistedQueryCache: this.persistedQueryCache, persistedQueriesRequired: diff --git a/src/core/server/models/comment/comment.ts b/src/core/server/models/comment/comment.ts index cb8b3045a..12f838d90 100644 --- a/src/core/server/models/comment/comment.ts +++ b/src/core/server/models/comment/comment.ts @@ -1,11 +1,11 @@ import { isEmpty } from "lodash"; import { Db } from "mongodb"; -import performanceNow from "performance-now"; import uuid from "uuid"; import { Omit, Sub } from "coral-common/types"; import { dotize } from "coral-common/utils/dotize"; import { CommentNotFoundError } from "coral-server/errors"; +import { createTimer } from "coral-server/helpers"; import logger from "coral-server/logger"; import { EncodedCommentActionCounts, @@ -936,7 +936,7 @@ export async function retrieveStoryCommentTagCounts( } // Get the start time. - const startTime = performanceNow(); + const timer = createTimer(); // Load the counts from the database for this particular tag query. const cursor = collection<{ @@ -956,12 +956,9 @@ export async function retrieveStoryCommentTagCounts( // Get all of the counts. const tags = await cursor.toArray(); - // Compute the end time. - const responseTime = Math.round(performanceNow() - startTime); - // Logging at the info level here to ensure we track any degrading performance // issues from this query. - logger.info({ responseTime, filter: $match }, "counting tags"); + logger.info({ responseTime: timer(), filter: $match }, "counting tags"); // For each of the storyIDs... return storyIDs.map(storyID => { diff --git a/src/core/server/queue/Task.ts b/src/core/server/queue/Task.ts index 405f98075..96b9eb249 100644 --- a/src/core/server/queue/Task.ts +++ b/src/core/server/queue/Task.ts @@ -1,8 +1,10 @@ -import Queue, { Job, Queue as QueueType } from "bull"; +import Queue, { Job, JobCounts, Queue as QueueType } from "bull"; import Logger from "bunyan"; import TIME from "coral-common/time"; +import { createTimer } from "coral-server/helpers"; import logger from "coral-server/logger"; +import { TenantResource } from "coral-server/models/tenant"; export type JobProcessor = (job: Job) => Promise; @@ -13,10 +15,10 @@ export interface TaskOptions { queue: Queue.QueueOptions; } -export default class Task { - private options: TaskOptions; - private queue: QueueType; - private log: Logger; +export default class Task { + private readonly options: Required>; + private readonly queue: QueueType; + private readonly log: Logger; constructor({ jobName, @@ -34,6 +36,9 @@ export default class Task { // with completed entries if we don't need to. removeOnComplete: true, + // Remove the job if it fails after all attempts. + removeOnFail: true, + // By default, configure jobs to use an exponential backoff strategy. backoff: { type: "exponential", @@ -52,6 +57,10 @@ export default class Task { // TODO: (wyattjoh) attach event handlers to the queue for metrics via: https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#events } + public async counts(): Promise { + return this.queue.getJobCounts(); + } + /** * Add will add the job to the queue to get processed. It's not needed to * handle the job after it has been created. @@ -62,7 +71,7 @@ export default class Task { // Create the job. const job = await this.queue.add(data, this.options.jobOptions); - this.log.trace({ jobID: job.id }, "added job to queue"); + this.log.info({ jobID: job.id }, "added job to queue"); return job; } @@ -77,15 +86,19 @@ export default class Task { true ); - log.trace("processing job from queue"); + const timer = createTimer(); + log.info("processing job from queue"); try { // Send the job off to the job processor to be handled. const promise: U = await this.options.jobProcessor(job); - log.trace("processing completed"); + + // Log it! + log.info({ took: timer() }, "processing completed"); + return promise; } catch (err) { - log.error({ err }, "job failed to process"); + log.error({ err, took: timer() }, "job failed to process"); throw err; } }); diff --git a/src/core/server/queue/index.ts b/src/core/server/queue/index.ts index 3e366d2f4..83afd1291 100644 --- a/src/core/server/queue/index.ts +++ b/src/core/server/queue/index.ts @@ -13,9 +13,7 @@ import { createRejectorTask, RejectorQueue } from "./tasks/rejector"; import { createScraperTask, ScraperQueue } from "./tasks/scraper"; import { createWebhookTask, WebhookQueue } from "./tasks/webhook"; -const createQueueOptions = async ( - config: Config -): Promise => { +const createQueueOptions = (config: Config): Queue.QueueOptions => { const client = createRedisClient(config); const subscriber = createRedisClient(config); @@ -60,10 +58,13 @@ export interface TaskQueue { rejector: RejectorQueue; } -export async function createQueue(options: QueueOptions): Promise { +export function createQueue(options: QueueOptions): TaskQueue { + // Pull some options out. + const { config } = options; + // Create the processor queue options. This holds references to the Redis // clients that are shared per queue. - const queueOptions = await createQueueOptions(options.config); + const queueOptions = createQueueOptions(config); // Attach process functions to the various tasks in the queue. const mailer = createMailerTask(queueOptions, options); @@ -73,7 +74,6 @@ export async function createQueue(options: QueueOptions): Promise { ...options, }); const webhook = createWebhookTask(queueOptions, options); - const rejector = createRejectorTask(queueOptions, options); // Return the tasks + client. diff --git a/src/core/server/queue/tasks/Task.ts b/src/core/server/queue/tasks/Task.ts deleted file mode 100644 index df86ec389..000000000 --- a/src/core/server/queue/tasks/Task.ts +++ /dev/null @@ -1,70 +0,0 @@ -import Queue, { Job, Queue as QueueType } from "bull"; -import logger from "coral-server/logger"; - -export interface TaskOptions { - jobName: string; - jobProcessor: (job: Job) => Promise; - queue: Queue.QueueOptions; -} - -export default class Task { - private options: TaskOptions; - private queue: QueueType; - - constructor(options: TaskOptions) { - this.queue = new Queue(options.jobName, options.queue); - this.options = options; - - // Sets up and attaches the job processor to the queue. - this.setupAndAttachProcessor(); - } - - /** - * Add will add the job to the queue to get processed. It's not needed to - * handle the job after it has been created. - * - * @param data the data for the job to add. - */ - public async add(data: T) { - const job = await this.queue.add(data, { - // We always remove the job when it's complete, no need to fill up Redis - // with completed entries if we don't need to. - removeOnComplete: true, - }); - - logger.trace( - { jobID: job.id, jobName: this.options.jobName }, - "added job to queue" - ); - return job; - } - - private setupAndAttachProcessor() { - this.queue.process(async (job: Job) => { - logger.trace( - { jobID: job.id, jobName: this.options.jobName }, - "processing job from queue" - ); - - try { - // Send the job off to the job processor to be handled. - const promise: U = await this.options.jobProcessor(job); - logger.trace( - { jobID: job.id, jobName: this.options.jobName }, - "processing completed" - ); - - return promise; - } catch (err) { - logger.error({ err }, "failed to process job from queue"); - - throw err; - } - }); - - logger.trace( - { jobName: this.options.jobName }, - "registered processor for job type" - ); - } -} diff --git a/src/core/server/queue/tasks/mailer/index.ts b/src/core/server/queue/tasks/mailer/index.ts index db2c2be41..27d669a83 100644 --- a/src/core/server/queue/tasks/mailer/index.ts +++ b/src/core/server/queue/tasks/mailer/index.ts @@ -1,6 +1,6 @@ import Queue from "bull"; -import now from "performance-now"; +import { createTimer } from "coral-server/helpers"; import logger from "coral-server/logger"; import Task from "coral-server/queue/Task"; import MailerContent from "coral-server/queue/tasks/mailer/content"; @@ -37,6 +37,10 @@ export class MailerQueue { this.tenantCache = options.tenantCache; } + public async counts() { + return this.task.counts(); + } + public async add({ template, tenantID, message: { to } }: MailerInput) { const log = logger.child( { @@ -61,7 +65,7 @@ export class MailerQueue { return; } - const startTemplateGenerationTime = now(); + const timer = createTimer(); let html: string; try { @@ -74,8 +78,7 @@ export class MailerQueue { } // Compute the end time. - const responseTime = Math.round(now() - startTemplateGenerationTime); - log.trace({ responseTime }, "finished template generation"); + log.trace({ took: timer() }, "finished template generation"); // Return the job that'll add the email to the queue to be processed later. return this.task.add({ diff --git a/src/core/server/queue/tasks/mailer/processor.ts b/src/core/server/queue/tasks/mailer/processor.ts index d27d567a1..5a5e49f01 100644 --- a/src/core/server/queue/tasks/mailer/processor.ts +++ b/src/core/server/queue/tasks/mailer/processor.ts @@ -11,11 +11,11 @@ import { camelCase, isNil } from "lodash"; import { Db } from "mongodb"; import { createTransport } from "nodemailer"; import { Options } from "nodemailer/lib/smtp-connection"; -import now from "performance-now"; import { LanguageCode } from "coral-common/helpers/i18n/locales"; import { Config } from "coral-server/config"; import { InternalError } from "coral-server/errors"; +import { createTimer } from "coral-server/helpers"; import logger from "coral-server/logger"; import { Tenant } from "coral-server/models/tenant"; import { I18n, translate } from "coral-server/services/i18n"; @@ -245,7 +245,7 @@ export const createJobProcessor = (options: MailProcessorOptions) => { // Construct the fromAddress. const fromAddress = fromName ? `${fromName} <${fromEmail}>` : fromEmail; - const startTemplateGenerationTime = now(); + const templateGenerationTimer = createTimer(); // Get the message to send. let message: Message; @@ -261,9 +261,10 @@ export const createJobProcessor = (options: MailProcessorOptions) => { throw new InternalError(e, "could not translate the message"); } - // Compute the end time. - const responseTime = Math.round(now() - startTemplateGenerationTime); - log.trace({ responseTime }, "finished mail translation"); + log.trace( + { responseTime: templateGenerationTimer() }, + "finished mail translation" + ); let transport = cache.get(tenantID); if (!transport) { @@ -300,7 +301,7 @@ export const createJobProcessor = (options: MailProcessorOptions) => { log.debug("starting to send the email"); - const startMessageSendTime = now(); + const messageSendTimer = createTimer(); try { // Send the mail message. @@ -309,8 +310,6 @@ export const createJobProcessor = (options: MailProcessorOptions) => { throw new InternalError(e, "could not send email"); } - // Compute the end time. - const messageSendResponseTime = Math.round(now() - startMessageSendTime); - log.debug({ responseTime: messageSendResponseTime }, "sent the email"); + log.debug({ responseTime: messageSendTimer() }, "sent the email"); }; }; diff --git a/src/core/server/queue/tasks/notifier/index.ts b/src/core/server/queue/tasks/notifier/index.ts index 2ba223cf5..fe8317bf1 100644 --- a/src/core/server/queue/tasks/notifier/index.ts +++ b/src/core/server/queue/tasks/notifier/index.ts @@ -22,46 +22,30 @@ interface Options { signingConfig: JWTSigningConfig; } -/** - * NotifierQueue is designed to handle creating and queuing notifications - * that could be sent to users. - */ -export class NotifierQueue { - private task: Task; - - constructor(queue: Queue.QueueOptions, options: Options) { - const registry = new Map(); - - // Notification categories have been grouped by their event name so that - // each event emitted need only access the associated notification once. - for (const category of categories) { - for (const event of category.events as CoralEventType[]) { - let handlers = registry.get(event); - if (!handlers) { - handlers = []; - } - handlers.push(category); - registry.set(event, handlers); - } - } - - this.task = new Task({ - jobName: JOB_NAME, - jobProcessor: createJobProcessor({ registry, ...options }), - queue, - }); - } - - public async add(data: NotifierData) { - return this.task.add(data); - } - - public process() { - return this.task.process(); - } -} +export type NotifierQueue = Task; export const createNotifierTask = ( queue: Queue.QueueOptions, options: Options -) => new NotifierQueue(queue, options); +) => { + const registry = new Map(); + + // Notification categories have been grouped by their event name so that + // each event emitted need only access the associated notification once. + for (const category of categories) { + for (const event of category.events as CoralEventType[]) { + let handlers = registry.get(event); + if (!handlers) { + handlers = []; + } + handlers.push(category); + registry.set(event, handlers); + } + } + + return new Task({ + jobName: JOB_NAME, + jobProcessor: createJobProcessor({ registry, ...options }), + queue, + }); +}; diff --git a/src/core/server/queue/tasks/notifier/processor.ts b/src/core/server/queue/tasks/notifier/processor.ts index 4af95f810..db7fe0fb3 100644 --- a/src/core/server/queue/tasks/notifier/processor.ts +++ b/src/core/server/queue/tasks/notifier/processor.ts @@ -79,51 +79,42 @@ export const createJobProcessor = ({ log.debug("starting to handle a notify operation"); - try { - // Get all the handlers that are active for this channel. - const categories = registry.get(input.type); - if (!categories || categories.length === 0) { - return; - } - - // Grab the tenant from the cache. - const tenant = await tenantCache.retrieveByID(tenantID); - if (!tenant) { - throw new Error("tenant not found with ID"); - } - - // Create a notification context to handle processing notifications. - const ctx = new NotificationContext({ - mongo, - config, - signingConfig, - tenant, - now, - }); - - // For each of the handler's we need to process, we should iterate to - // generate their notifications. - let notifications = await handleHandlers(ctx, categories, input); - - // Check to see if some of the other notifications that are queued - // had this notification superseded. - notifications = notifications.filter(filterSuperseded); - - // Send all the notifications now. - await processNewNotifications( - ctx, - notifications.map(({ notification }) => notification), - mailerQueue - ); - - log.debug( - { notifications: notifications.length }, - "notifications handled" - ); - } catch (err) { - log.error({ err }, "could not handle the notifications"); - - throw err; + // Get all the handlers that are active for this channel. + const categories = registry.get(input.type); + if (!categories || categories.length === 0) { + return; } + + // Grab the tenant from the cache. + const tenant = await tenantCache.retrieveByID(tenantID); + if (!tenant) { + throw new Error("tenant not found with ID"); + } + + // Create a notification context to handle processing notifications. + const ctx = new NotificationContext({ + mongo, + config, + signingConfig, + tenant, + now, + }); + + // For each of the handler's we need to process, we should iterate to + // generate their notifications. + let notifications = await handleHandlers(ctx, categories, input); + + // Check to see if some of the other notifications that are queued + // had this notification superseded. + notifications = notifications.filter(filterSuperseded); + + // Send all the notifications now. + await processNewNotifications( + ctx, + notifications.map(({ notification }) => notification), + mailerQueue + ); + + log.debug({ notifications: notifications.length }, "notifications handled"); }; }; diff --git a/src/core/server/queue/tasks/rejector.ts b/src/core/server/queue/tasks/rejector.ts index ba9a56771..a5cd9c44e 100644 --- a/src/core/server/queue/tasks/rejector.ts +++ b/src/core/server/queue/tasks/rejector.ts @@ -1,7 +1,7 @@ import Queue, { Job } from "bull"; import { Db } from "mongodb"; -import now from "performance-now"; +import { createTimer } from "coral-server/helpers"; import logger from "coral-server/logger"; import { Comment, @@ -61,49 +61,49 @@ const createJobProcessor = ({ true ); // Mark the start time. - const startTime = now(); + const timer = createTimer(); + log.debug("starting to reject author comments"); + // Get the tenant. const tenant = await tenantCache.retrieveByID(tenantID); if (!tenant) { log.error("referenced tenant was not found"); return; } + // Get the current time. const currentTime = new Date(); - try { - // Find all comments written by the author that should be rejected. - let connection = await getBatch(mongo, tenantID, authorID); - while (connection.nodes.length > 0) { - for (const comment of connection.nodes) { - // Get the latest revision of the comment. - const revision = getLatestRevision(comment); - // Reject the comment. - await rejectComment( - mongo, - redis, - null, - tenant, - comment.id, - revision.id, - moderatorID, - currentTime - ); - } - // If there was not another page, abort processing. - if (!connection.pageInfo.hasNextPage) { - break; - } - // Load the next page. - connection = await getBatch(mongo, tenantID, authorID, connection); + + // Find all comments written by the author that should be rejected. + let connection = await getBatch(mongo, tenantID, authorID); + while (connection.nodes.length > 0) { + for (const comment of connection.nodes) { + // Get the latest revision of the comment. + const revision = getLatestRevision(comment); + + // Reject the comment. + await rejectComment( + mongo, + redis, + null, + tenant, + comment.id, + revision.id, + moderatorID, + currentTime + ); } - } catch (err) { - log.error({ err }, "could not reject the author's comments"); - throw err; + // If there was not another page, abort processing. + if (!connection.pageInfo.hasNextPage) { + break; + } + // Load the next page. + connection = await getBatch(mongo, tenantID, authorID, connection); } + // Compute the end time. - const took = Math.round(now() - startTime); - log.debug({ took }, "rejected the author's comments"); + log.debug({ took: timer() }, "rejected the author's comments"); }; export type RejectorQueue = Task; diff --git a/src/core/server/queue/tasks/scraper.ts b/src/core/server/queue/tasks/scraper.ts index 5e4f95560..8fc5bd0c3 100644 --- a/src/core/server/queue/tasks/scraper.ts +++ b/src/core/server/queue/tasks/scraper.ts @@ -1,8 +1,8 @@ import Queue, { Job } from "bull"; import { Db } from "mongodb"; -import now from "performance-now"; import { Config } from "coral-server/config"; +import { createTimer } from "coral-server/helpers"; import logger from "coral-server/logger"; import Task from "coral-server/queue/Task"; import { scrape } from "coral-server/services/stories/scraper"; @@ -39,22 +39,13 @@ const createJobProcessor = ({ ); // Mark the start time. - const startTime = now(); + const timer = createTimer(); log.debug("starting to scrape the story"); - - try { - await scrape(mongo, config, tenantID, storyID, storyURL); - } catch (err) { - log.error({ err }, "could not scrape the story"); - - throw err; - } + await scrape(mongo, config, tenantID, storyID, storyURL); // Compute the end time. - const responseTime = Math.round(now() - startTime); - - log.debug({ responseTime }, "scraped the story"); + log.debug({ responseTime: timer() }, "scraped the story"); }; export type ScraperQueue = Task; diff --git a/src/core/server/queue/tasks/webhook/processor.ts b/src/core/server/queue/tasks/webhook/processor.ts index 4dff50287..d5667fbfd 100644 --- a/src/core/server/queue/tasks/webhook/processor.ts +++ b/src/core/server/queue/tasks/webhook/processor.ts @@ -1,10 +1,10 @@ import crypto from "crypto"; import { Redis } from "ioredis"; import { Db } from "mongodb"; -import getNow from "performance-now"; import { Config } from "coral-server/config"; import { CoralEventPayload } from "coral-server/events/event"; +import { createTimer } from "coral-server/helpers"; import logger from "coral-server/logger"; import { filterActiveSecrets, @@ -170,17 +170,16 @@ export function createJobProcessor({ ); // Send the request. - const startedSendingAt = getNow(); + const timer = createTimer(); const res = await fetch(endpoint.url, options); - const took = getNow() - startedSendingAt; if (res.ok) { log.info( - { took, responseStatus: res.status }, + { took: timer(), responseStatus: res.status }, "finished sending webhook" ); } else { log.warn( - { took, responseStatus: res.status }, + { took: timer(), responseStatus: res.status }, "failed to deliver webhook" ); } diff --git a/src/core/server/services/comments/pipeline/wordList.ts b/src/core/server/services/comments/pipeline/wordList.ts index 985144df6..cb78e04ca 100644 --- a/src/core/server/services/comments/pipeline/wordList.ts +++ b/src/core/server/services/comments/pipeline/wordList.ts @@ -1,8 +1,6 @@ -import ms from "ms"; -import now from "performance-now"; - import { LanguageCode } from "coral-common/helpers"; import createWordListRegExp from "coral-common/utils/createWordListRegExp"; +import { createTimer } from "coral-server/helpers"; import logger from "coral-server/logger"; import { Tenant } from "coral-server/models/tenant"; @@ -54,10 +52,10 @@ export class WordList { // it. let lists = this.cache.get(options); if (!lists) { - const startedAt = now(); + const timer = createTimer(); lists = this.create(options); logger.info( - { tenantID: options.id, took: ms(now() - startedAt) }, + { tenantID: options.id, took: timer() }, "regenerated word list cache" ); @@ -88,10 +86,10 @@ export class WordList { return false; } - const startedAt = now(); + const timer = createTimer(); const result = list.test(testString); logger.info( - { tenantID: options.id, listName, took: ms(now() - startedAt) }, + { tenantID: options.id, listName, took: timer() }, "word list phrase test complete" ); diff --git a/src/core/server/services/migrate/indexing.ts b/src/core/server/services/migrate/indexing.ts index 040ebf130..4f5c9a504 100644 --- a/src/core/server/services/migrate/indexing.ts +++ b/src/core/server/services/migrate/indexing.ts @@ -1,8 +1,8 @@ import { merge } from "lodash"; import { Collection, Db, IndexOptions } from "mongodb"; -import now from "performance-now"; import { Writable } from "coral-common/types"; +import { createTimer } from "coral-server/helpers"; import logger from "coral-server/logger"; import collections from "../mongodb/collections"; @@ -33,11 +33,11 @@ export async function createIndex( try { // Try to create the index. - const start = now(); + const timer = createTimer(); log.debug({ indexSpec, indexOptions }, "creating index"); const indexName = await collection.createIndex(indexSpec, indexOptions); log.debug( - { indexName, indexSpec, indexOptions, took: Math.round(now() - start) }, + { indexName, indexSpec, indexOptions, took: timer() }, "index was created" ); diff --git a/src/core/server/services/migrate/manager.ts b/src/core/server/services/migrate/manager.ts index cb7a0b625..61c5c9ede 100644 --- a/src/core/server/services/migrate/manager.ts +++ b/src/core/server/services/migrate/manager.ts @@ -2,9 +2,9 @@ import fs from "fs-extra"; import { Redis } from "ioredis"; import { Db } from "mongodb"; import path from "path"; -import now from "performance-now"; import uuid from "uuid"; +import { createTimer } from "coral-server/helpers"; import logger from "coral-server/logger"; import { failMigration, @@ -177,7 +177,7 @@ export default class Manager { logger.info({ pending: pending.length }, "executing pending migrations"); - const migrationsStartTime = now(); + const migrationsStartTimer = createTimer(); for (const migration of pending) { let log = logger.child( @@ -196,11 +196,13 @@ export default class Manager { // Apply any index changes for the migration. if (migration.indexes) { - const migrationStartTime = now(); + const migrationStartTime = createTimer(); log.info("starting index migration"); await migration.indexes(mongo); - const executionTime = Math.round(now() - migrationStartTime); - log.info({ executionTime }, "finished index migration"); + log.info( + { executionTime: migrationStartTime() }, + "finished index migration" + ); } if (migration.up) { @@ -209,7 +211,7 @@ export default class Manager { for await (const tenant of this.tenantCache) { log = log.child({ tenantID: tenant.id }, true); - const migrationStartTime = now(); + const migrationStartTimer = createTimer(); log.info("starting migration"); try { @@ -241,8 +243,10 @@ export default class Manager { throw err; } - const executionTime = Math.round(now() - migrationStartTime); - log.info({ executionTime }, "finished migration"); + log.info( + { executionTime: migrationStartTimer() }, + "finished migration" + ); } } @@ -250,13 +254,11 @@ export default class Manager { await finishMigration(mongo, migration.id); } - const finishTime = Math.round(now() - migrationsStartTime); - currentMigration = await this.currentMigration(mongo); logger.info( { - finishTime, + finishTime: migrationsStartTimer(), currentMigrationID: currentMigration ? currentMigration.id : null, }, "finished running pending migrations"