From 4584c3d4fb37eb2cb7415dba2a3b00cc44f784ea Mon Sep 17 00:00:00 2001 From: Wyatt Johnson Date: Thu, 6 Dec 2018 23:37:33 +0000 Subject: [PATCH] [next] Moderation Queues (#2098) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: edit comment fix for reactions * feat: Comment Queue Counts - Removed "remove flag" - Rearranged moderation services - Rearranged comment counts on stories - Added moderation queue counts to stories - Added comments edge - Improved Cursor/Connection types - Improved count updators for stories * feat: added shared comment counts and queues - Added new AugmentedRedis type - Added more log calls - Update counts in shared counter as well - Return a Query level Moderation Queue * fix: fixed test --- package-lock.json | 4 +- .../stream/mutations/EditCommentMutation.ts | 3 + .../stream/test/comments/editComment.spec.tsx | 3 + src/core/server/app/index.ts | 4 +- .../server/app/middleware/context/tenant.ts | 4 +- .../server/app/middleware/passport/index.ts | 12 +- .../passport/strategies/verifiers/jwt.ts | 9 + src/core/server/graph/tenant/context.ts | 8 +- .../server/graph/tenant/loaders/Comments.ts | 11 +- .../server/graph/tenant/mutators/Actions.ts | 6 +- .../server/graph/tenant/mutators/Comment.ts | 18 +- .../server/graph/tenant/mutators/Story.ts | 8 +- .../graph/tenant/resolvers/ModerationQueue.ts | 25 + .../tenant/resolvers/ModerationQueues.ts | 54 ++ .../server/graph/tenant/resolvers/Mutation.ts | 4 - .../server/graph/tenant/resolvers/Query.ts | 17 + .../server/graph/tenant/resolvers/Story.ts | 15 +- .../server/graph/tenant/resolvers/index.ts | 4 + .../server/graph/tenant/schema/schema.graphql | 119 ++-- src/core/server/models/action/comment.spec.ts | 35 ++ src/core/server/models/action/comment.ts | 24 +- .../models/action/moderation/comment.ts | 14 +- src/core/server/models/comment.ts | 317 ++++++---- src/core/server/models/connection.ts | 26 + src/core/server/models/story/counts/empty.ts | 24 + src/core/server/models/story/counts/index.ts | 246 ++++++++ src/core/server/models/story/counts/shared.ts | 554 ++++++++++++++++++ .../models/{story.ts => story/index.ts} | 180 +----- src/core/server/services/comments/actions.ts | 150 ++--- src/core/server/services/comments/index.ts | 256 ++++++-- .../comments/moderation/counts.spec.ts | 194 ++++++ .../services/comments/moderation/counts.ts | 58 ++ .../comments/moderation/index.spec.ts | 296 ++++++---- .../services/comments/moderation/index.ts | 174 +++--- .../services/comments/pipeline/index.spec.ts | 108 ++++ .../services/comments/pipeline/index.ts | 91 +++ .../phases/commentLength.ts | 2 +- .../phases/commentingDisabled.ts | 2 +- .../{moderation => pipeline}/phases/index.ts | 2 +- .../{moderation => pipeline}/phases/karma.ts | 2 +- .../{moderation => pipeline}/phases/links.ts | 2 +- .../phases/preModerate.ts | 2 +- .../{moderation => pipeline}/phases/spam.ts | 2 +- .../{moderation => pipeline}/phases/staff.ts | 2 +- .../phases/storyClosed.spec.ts | 2 +- .../phases/storyClosed.ts | 2 +- .../{moderation => pipeline}/phases/toxic.ts | 2 +- .../phases/wordList.ts | 4 +- .../{moderation => pipeline}/wordList.spec.ts | 2 +- .../{moderation => pipeline}/wordList.ts | 3 +- src/core/server/services/moderation/index.ts | 81 --- src/core/server/services/redis/index.ts | 36 +- src/core/server/services/stories/index.ts | 19 +- 53 files changed, 2459 insertions(+), 783 deletions(-) create mode 100644 src/core/server/graph/tenant/resolvers/ModerationQueue.ts create mode 100644 src/core/server/graph/tenant/resolvers/ModerationQueues.ts create mode 100644 src/core/server/models/story/counts/empty.ts create mode 100644 src/core/server/models/story/counts/index.ts create mode 100644 src/core/server/models/story/counts/shared.ts rename src/core/server/models/{story.ts => story/index.ts} (54%) create mode 100644 src/core/server/services/comments/moderation/counts.spec.ts create mode 100644 src/core/server/services/comments/moderation/counts.ts create mode 100644 src/core/server/services/comments/pipeline/index.spec.ts create mode 100644 src/core/server/services/comments/pipeline/index.ts rename src/core/server/services/comments/{moderation => pipeline}/phases/commentLength.ts (96%) rename src/core/server/services/comments/{moderation => pipeline}/phases/commentingDisabled.ts (92%) rename src/core/server/services/comments/{moderation => pipeline}/phases/index.ts (96%) rename src/core/server/services/comments/{moderation => pipeline}/phases/karma.ts (96%) rename src/core/server/services/comments/{moderation => pipeline}/phases/links.ts (96%) rename src/core/server/services/comments/{moderation => pipeline}/phases/preModerate.ts (94%) rename src/core/server/services/comments/{moderation => pipeline}/phases/spam.ts (98%) rename src/core/server/services/comments/{moderation => pipeline}/phases/staff.ts (90%) rename src/core/server/services/comments/{moderation => pipeline}/phases/storyClosed.spec.ts (94%) rename src/core/server/services/comments/{moderation => pipeline}/phases/storyClosed.ts (93%) rename src/core/server/services/comments/{moderation => pipeline}/phases/toxic.ts (98%) rename src/core/server/services/comments/{moderation => pipeline}/phases/wordList.ts (95%) rename src/core/server/services/comments/{moderation => pipeline}/wordList.spec.ts (97%) rename src/core/server/services/comments/{moderation => pipeline}/wordList.ts (93%) delete mode 100644 src/core/server/services/moderation/index.ts diff --git a/package-lock.json b/package-lock.json index 806f7d05e..6ac28d8eb 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12334,7 +12334,7 @@ }, "handle-thing": { "version": "1.2.5", - "resolved": "http://registry.npmjs.org/handle-thing/-/handle-thing-1.2.5.tgz", + "resolved": "https://registry.npmjs.org/handle-thing/-/handle-thing-1.2.5.tgz", "integrity": "sha1-/Xqtcmvxpf0W38KbL3pmAdJxOcQ=", "dev": true }, @@ -12803,7 +12803,7 @@ }, "http-proxy-middleware": { "version": "0.18.0", - "resolved": "http://registry.npmjs.org/http-proxy-middleware/-/http-proxy-middleware-0.18.0.tgz", + "resolved": "https://registry.npmjs.org/http-proxy-middleware/-/http-proxy-middleware-0.18.0.tgz", "integrity": "sha512-Fs25KVMPAIIcgjMZkVHJoKg9VcXcC1C8yb9JUgeDvVXY0S/zgVIhMb+qVswDIgtJe2DfckMSY2d6TuTEutlk6Q==", "dev": true, "requires": { diff --git a/src/core/client/stream/mutations/EditCommentMutation.ts b/src/core/client/stream/mutations/EditCommentMutation.ts index 3eee01303..0b85b6a26 100644 --- a/src/core/client/stream/mutations/EditCommentMutation.ts +++ b/src/core/client/stream/mutations/EditCommentMutation.ts @@ -19,6 +19,9 @@ const mutation = graphql` editComment(input: $input) { comment { body + revision { + id + } editing { edited } diff --git a/src/core/client/stream/test/comments/editComment.spec.tsx b/src/core/client/stream/test/comments/editComment.spec.tsx index a669501f0..f9e626b27 100644 --- a/src/core/client/stream/test/comments/editComment.spec.tsx +++ b/src/core/client/stream/test/comments/editComment.spec.tsx @@ -40,6 +40,9 @@ function createTestRenderer() { editing: { edited: true, }, + revision: { + id: stories[0].comments.edges[0].node.revision.id, + }, }, clientMutationId: "0", }) diff --git a/src/core/server/app/index.ts b/src/core/server/app/index.ts index d9d6160a8..263cee64e 100644 --- a/src/core/server/app/index.ts +++ b/src/core/server/app/index.ts @@ -2,7 +2,6 @@ import cons from "consolidate"; import cors from "cors"; import { Express } from "express"; import http from "http"; -import { Redis } from "ioredis"; import { Db } from "mongodb"; import nunjucks from "nunjucks"; import path from "path"; @@ -16,6 +15,7 @@ import { handleSubscriptions } from "talk-server/graph/common/subscriptions/midd import { Schemas } from "talk-server/graph/schemas"; import { TaskQueue } from "talk-server/queue"; import { JWTSigningConfig } from "talk-server/services/jwt"; +import { AugmentedRedis } from "talk-server/services/redis"; import TenantCache from "talk-server/services/tenant/cache"; import { accessLogger, errorLogger } from "./middleware/logging"; @@ -27,7 +27,7 @@ export interface AppOptions { queue: TaskQueue; config: Config; mongo: Db; - redis: Redis; + redis: AugmentedRedis; schemas: Schemas; signingConfig: JWTSigningConfig; tenantCache: TenantCache; diff --git a/src/core/server/app/middleware/context/tenant.ts b/src/core/server/app/middleware/context/tenant.ts index f887ac71d..7aca25892 100644 --- a/src/core/server/app/middleware/context/tenant.ts +++ b/src/core/server/app/middleware/context/tenant.ts @@ -1,15 +1,15 @@ import { RequestHandler } from "express-jwt"; -import { Redis } from "ioredis"; import { Db } from "mongodb"; import { Config } from "talk-common/config"; import TenantContext from "talk-server/graph/tenant/context"; import { TaskQueue } from "talk-server/queue"; +import { AugmentedRedis } from "talk-server/services/redis"; import { Request } from "talk-server/types/express"; export interface TenantContextMiddlewareOptions { mongo: Db; - redis: Redis; + redis: AugmentedRedis; queue: TaskQueue; config: Config; } diff --git a/src/core/server/app/middleware/passport/index.ts b/src/core/server/app/middleware/passport/index.ts index 85cf5c8fa..22ecf0a42 100644 --- a/src/core/server/app/middleware/passport/index.ts +++ b/src/core/server/app/middleware/passport/index.ts @@ -4,6 +4,7 @@ import Joi from "joi"; import jwt from "jsonwebtoken"; import { Db } from "mongodb"; import passport, { Authenticator } from "passport"; +import now from "performance-now"; import { Config } from "talk-common/config"; import FacebookStrategy from "talk-server/app/middleware/passport/strategies/facebook"; @@ -12,6 +13,7 @@ import { JWTStrategy } from "talk-server/app/middleware/passport/strategies/jwt" import { createLocalStrategy } from "talk-server/app/middleware/passport/strategies/local"; import OIDCStrategy from "talk-server/app/middleware/passport/strategies/oidc"; import { validate } from "talk-server/app/request/body"; +import logger from "talk-server/logger"; import { User } from "talk-server/models/user"; import { blacklistJWT, @@ -149,11 +151,18 @@ export const wrapAuthn = ( signingConfig: JWTSigningConfig, name: string, options?: any -): RequestHandler => (req: Request, res, next) => +): RequestHandler => (req: Request, res, next) => { + const startTime = now(); + authenticator.authenticate( name, { ...options, session: false }, (err: Error | null, user: User | null) => { + // Compute the end time. + const responseTime = Math.round(now() - startTime); + + logger.debug({ responseTime }, "user token generated"); + if (err) { return next(err); } @@ -165,3 +174,4 @@ export const wrapAuthn = ( handleSuccessfulLogin(user, signingConfig, req, res, next); } )(req, res, next); +}; diff --git a/src/core/server/app/middleware/passport/strategies/verifiers/jwt.ts b/src/core/server/app/middleware/passport/strategies/verifiers/jwt.ts index 94c040a1c..904955bfb 100644 --- a/src/core/server/app/middleware/passport/strategies/verifiers/jwt.ts +++ b/src/core/server/app/middleware/passport/strategies/verifiers/jwt.ts @@ -1,7 +1,9 @@ import { Redis } from "ioredis"; import jwt from "jsonwebtoken"; import { Db } from "mongodb"; +import now from "performance-now"; +import logger from "talk-server/logger"; import { Tenant } from "talk-server/models/tenant"; import { retrieveUser } from "talk-server/models/user"; import { checkBlacklistJWT, JWTSigningConfig } from "talk-server/services/jwt"; @@ -46,12 +48,19 @@ export class JWTVerifier { } public async verify(tokenString: string, token: JWTToken, tenant: Tenant) { + const startTime = now(); + // Verify that the token is valid. This will throw an error if it isn't. jwt.verify(tokenString, this.signingConfig.secret, { issuer: tenant.id, algorithms: [this.signingConfig.algorithm], }); + // Compute the end time. + const responseTime = Math.round(now() - startTime); + + logger.trace({ responseTime }, "jwt verification complete"); + // Check to see if the token has been blacklisted, as these tokens can be // revoked. await checkBlacklistJWT(this.redis, token.jti); diff --git a/src/core/server/graph/tenant/context.ts b/src/core/server/graph/tenant/context.ts index 03f30d72a..d4e39f62e 100644 --- a/src/core/server/graph/tenant/context.ts +++ b/src/core/server/graph/tenant/context.ts @@ -1,20 +1,20 @@ -import { Redis } from "ioredis"; import { Db } from "mongodb"; +import { Config } from "talk-common/config"; import CommonContext from "talk-server/graph/common/context"; import { Tenant } from "talk-server/models/tenant"; import { User } from "talk-server/models/user"; import { TaskQueue } from "talk-server/queue"; +import { AugmentedRedis } from "talk-server/services/redis"; import TenantCache from "talk-server/services/tenant/cache"; import { Request } from "talk-server/types/express"; -import { Config } from "talk-common/config"; import loaders from "./loaders"; import mutators from "./mutators"; export interface TenantContextOptions { mongo: Db; - redis: Redis; + redis: AugmentedRedis; tenant: Tenant; tenantCache: TenantCache; queue: TaskQueue; @@ -28,7 +28,7 @@ export default class TenantContext extends CommonContext { public readonly tenantCache: TenantCache; public readonly user?: User; public readonly mongo: Db; - public readonly redis: Redis; + public readonly redis: AugmentedRedis; public readonly queue: TaskQueue; public readonly loaders: ReturnType; public readonly mutators: ReturnType; diff --git a/src/core/server/graph/tenant/loaders/Comments.ts b/src/core/server/graph/tenant/loaders/Comments.ts index d3845bd24..7db234394 100644 --- a/src/core/server/graph/tenant/loaders/Comments.ts +++ b/src/core/server/graph/tenant/loaders/Comments.ts @@ -6,11 +6,13 @@ import { CommentToRepliesArgs, GQLActionPresence, GQLCOMMENT_SORT, + QueryToCommentsArgs, StoryToCommentsArgs, } from "talk-server/graph/tenant/schema/__generated__/types"; import { retrieveManyUserActionPresence } from "talk-server/models/action/comment"; import { Comment, + retrieveCommentConnection, retrieveCommentParentsConnection, retrieveCommentRepliesConnection, retrieveCommentStoryConnection, @@ -40,6 +42,13 @@ export default (ctx: Context) => ({ comment: new DataLoader((ids: string[]) => retrieveManyComments(ctx.mongo, ctx.tenant.id, ids) ), + forFilter: ({ first = 10, after, filter }: QueryToCommentsArgs) => + retrieveCommentConnection(ctx.mongo, ctx.tenant.id, { + first, + after, + orderBy: GQLCOMMENT_SORT.CREATED_AT_DESC, + filter, + }).then(primeCommentsFromConnection(ctx)), retrieveMyActionPresence: new DataLoader( (commentIDs: string[]) => retrieveManyUserActionPresence( @@ -63,7 +72,7 @@ export default (ctx: Context) => ({ first, orderBy, after, - }), + }).then(primeCommentsFromConnection(ctx)), forStory: ( storyID: string, // Apply the graph schema defaults at the loader. diff --git a/src/core/server/graph/tenant/mutators/Actions.ts b/src/core/server/graph/tenant/mutators/Actions.ts index f165f783f..d30eb4e6d 100644 --- a/src/core/server/graph/tenant/mutators/Actions.ts +++ b/src/core/server/graph/tenant/mutators/Actions.ts @@ -1,5 +1,5 @@ import TenantContext from "talk-server/graph/tenant/context"; -import { accept, reject } from "talk-server/services/moderation"; +import { accept, reject } from "talk-server/services/comments/moderation"; import { GQLAcceptCommentInput, GQLRejectCommentInput, @@ -7,13 +7,13 @@ import { export const Actions = (ctx: TenantContext) => ({ acceptComment: (input: GQLAcceptCommentInput) => - accept(ctx.mongo, ctx.tenant, { + accept(ctx.mongo, ctx.redis, ctx.tenant, { commentID: input.commentID, commentRevisionID: input.commentRevisionID, moderatorID: ctx.user!.id, }), rejectComment: (input: GQLRejectCommentInput) => - reject(ctx.mongo, ctx.tenant, { + reject(ctx.mongo, ctx.redis, ctx.tenant, { commentID: input.commentID, commentRevisionID: input.commentRevisionID, moderatorID: ctx.user!.id, diff --git a/src/core/server/graph/tenant/mutators/Comment.ts b/src/core/server/graph/tenant/mutators/Comment.ts index 1b984df89..072f46631 100644 --- a/src/core/server/graph/tenant/mutators/Comment.ts +++ b/src/core/server/graph/tenant/mutators/Comment.ts @@ -7,7 +7,6 @@ import { GQLCreateCommentReplyInput, GQLEditCommentInput, GQLRemoveCommentDontAgreeInput, - GQLRemoveCommentFlagInput, GQLRemoveCommentReactionInput, } from "talk-server/graph/tenant/schema/__generated__/types"; import { create, edit } from "talk-server/services/comments"; @@ -16,7 +15,6 @@ import { createFlag, createReaction, removeDontAgree, - removeFlag, removeReaction, } from "talk-server/services/comments/actions"; @@ -27,6 +25,7 @@ export const Comment = (ctx: TenantContext) => ({ }: GQLCreateCommentInput | GQLCreateCommentReplyInput) => create( ctx.mongo, + ctx.redis, ctx.tenant, ctx.user!, { authorID: ctx.user!.id, ...comment }, @@ -35,6 +34,7 @@ export const Comment = (ctx: TenantContext) => ({ edit: ({ commentID, body }: GQLEditCommentInput) => edit( ctx.mongo, + ctx.redis, ctx.tenant, ctx.user!, { @@ -47,24 +47,24 @@ export const Comment = (ctx: TenantContext) => ({ commentID, commentRevisionID, }: GQLCreateCommentReactionInput) => - createReaction(ctx.mongo, ctx.tenant, ctx.user!, { + createReaction(ctx.mongo, ctx.redis, ctx.tenant, ctx.user!, { commentID, commentRevisionID, }), removeReaction: ({ commentID }: GQLRemoveCommentReactionInput) => - removeReaction(ctx.mongo, ctx.tenant, ctx.user!, { + removeReaction(ctx.mongo, ctx.redis, ctx.tenant, ctx.user!, { commentID, }), createDontAgree: ({ commentID, commentRevisionID, }: GQLCreateCommentDontAgreeInput) => - createDontAgree(ctx.mongo, ctx.tenant, ctx.user!, { + createDontAgree(ctx.mongo, ctx.redis, ctx.tenant, ctx.user!, { commentID, commentRevisionID, }), removeDontAgree: ({ commentID }: GQLRemoveCommentDontAgreeInput) => - removeDontAgree(ctx.mongo, ctx.tenant, ctx.user!, { + removeDontAgree(ctx.mongo, ctx.redis, ctx.tenant, ctx.user!, { commentID, }), createFlag: ({ @@ -72,13 +72,9 @@ export const Comment = (ctx: TenantContext) => ({ commentRevisionID, reason, }: GQLCreateCommentFlagInput) => - createFlag(ctx.mongo, ctx.tenant, ctx.user!, { + createFlag(ctx.mongo, ctx.redis, ctx.tenant, ctx.user!, { commentID, commentRevisionID, reason, }), - removeFlag: ({ commentID }: GQLRemoveCommentFlagInput) => - removeFlag(ctx.mongo, ctx.tenant, ctx.user!, { - commentID, - }), }); diff --git a/src/core/server/graph/tenant/mutators/Story.ts b/src/core/server/graph/tenant/mutators/Story.ts index 014d5aa56..524791f99 100644 --- a/src/core/server/graph/tenant/mutators/Story.ts +++ b/src/core/server/graph/tenant/mutators/Story.ts @@ -30,7 +30,13 @@ export const Story = (ctx: TenantContext) => ({ merge: async ( input: GQLMergeStoriesInput ): Promise | null> => - merge(ctx.mongo, ctx.tenant, input.destinationID, input.sourceIDs), + merge( + ctx.mongo, + ctx.redis, + ctx.tenant, + input.destinationID, + input.sourceIDs + ), remove: async ( input: GQLRemoveStoryInput ): Promise | null> => diff --git a/src/core/server/graph/tenant/resolvers/ModerationQueue.ts b/src/core/server/graph/tenant/resolvers/ModerationQueue.ts new file mode 100644 index 000000000..40d0950a2 --- /dev/null +++ b/src/core/server/graph/tenant/resolvers/ModerationQueue.ts @@ -0,0 +1,25 @@ +import { + CommentConnectionInput, + retrieveCommentConnection, +} from "talk-server/models/comment"; +import { + GQLCOMMENT_SORT, + GQLModerationQueueTypeResolver, +} from "../schema/__generated__/types"; + +export interface ModerationQueueInput { + connection: Partial; + count: number; +} + +export const ModerationQueue: GQLModerationQueueTypeResolver< + ModerationQueueInput +> = { + comments: ({ connection }, { first = 10, after }, { mongo, tenant }) => + retrieveCommentConnection(mongo, tenant.id, { + ...connection, + first, + after, + orderBy: GQLCOMMENT_SORT.CREATED_AT_DESC, + }), +}; diff --git a/src/core/server/graph/tenant/resolvers/ModerationQueues.ts b/src/core/server/graph/tenant/resolvers/ModerationQueues.ts new file mode 100644 index 000000000..03bc6ca56 --- /dev/null +++ b/src/core/server/graph/tenant/resolvers/ModerationQueues.ts @@ -0,0 +1,54 @@ +import { CommentConnectionInput } from "talk-server/models/comment"; +import { FilterQuery } from "talk-server/models/query"; +import { CommentModerationCountsPerQueue } from "talk-server/models/story"; +import { + PENDING_STATUS, + REPORTED_STATUS, + UNMODERATED_STATUSES, +} from "talk-server/services/comments/moderation/counts"; + +import { GQLModerationQueuesTypeResolver } from "../schema/__generated__/types"; +import { ModerationQueueInput } from "./ModerationQueue"; + +export interface ModerationQueuesInput { + connection: Partial; + counts: CommentModerationCountsPerQueue; +} + +const mergeModerationInputFilters = ( + filter: FilterQuery, + selector: keyof CommentModerationCountsPerQueue +) => (input: ModerationQueuesInput): ModerationQueueInput => ({ + connection: { + ...input.connection, + filter: { + ...input.connection.filter, + ...filter, + }, + }, + count: input.counts[selector], +}); + +export const ModerationQueues: GQLModerationQueuesTypeResolver< + ModerationQueuesInput +> = { + unmoderated: mergeModerationInputFilters( + { + status: { $in: UNMODERATED_STATUSES }, + }, + "unmoderated" + ), + reported: mergeModerationInputFilters( + { + status: { $in: REPORTED_STATUS }, + "actionCounts.FLAG": { $gt: 0 }, + }, + "reported" + ), + pending: mergeModerationInputFilters( + { + status: { $in: PENDING_STATUS }, + }, + "pending" + ), +}; diff --git a/src/core/server/graph/tenant/resolvers/Mutation.ts b/src/core/server/graph/tenant/resolvers/Mutation.ts index ccd54174c..3bf571c42 100644 --- a/src/core/server/graph/tenant/resolvers/Mutation.ts +++ b/src/core/server/graph/tenant/resolvers/Mutation.ts @@ -49,10 +49,6 @@ export const Mutation: Required> = { comment: await ctx.mutators.Comment.createFlag(input), clientMutationId: input.clientMutationId, }), - removeCommentFlag: async (source, { input }, ctx) => ({ - comment: await ctx.mutators.Comment.removeFlag(input), - clientMutationId: input.clientMutationId, - }), regenerateSSOKey: async (source, { input }, ctx) => ({ settings: await ctx.mutators.Settings.regenerateSSOKey(), clientMutationId: input.clientMutationId, diff --git a/src/core/server/graph/tenant/resolvers/Query.ts b/src/core/server/graph/tenant/resolvers/Query.ts index 879fe2f43..e6fec375c 100644 --- a/src/core/server/graph/tenant/resolvers/Query.ts +++ b/src/core/server/graph/tenant/resolvers/Query.ts @@ -1,13 +1,30 @@ import { GQLQueryTypeResolver } from "talk-server/graph/tenant/schema/__generated__/types"; +import { retrieveSharedModerationQueueQueuesCounts } from "talk-server/models/story"; +import { ModerationQueuesInput } from "./ModerationQueues"; export const Query: GQLQueryTypeResolver = { story: (source, args, ctx) => ctx.loaders.Stories.findOrCreate(args), comment: (source, { id }, ctx) => id ? ctx.loaders.Comments.comment.load(id) : null, + comments: (source, args, ctx) => ctx.loaders.Comments.forFilter(args), settings: (source, args, ctx) => ctx.tenant, me: (source, args, ctx) => ctx.user, discoverOIDCConfiguration: (source, { issuer }, ctx) => ctx.loaders.Auth.discoverOIDCConfiguration.load(issuer), debugScrapeStoryMetadata: (source, { url }, ctx) => ctx.loaders.Stories.debugScrapeMetadata.load(url), + moderationQueues: async ( + source, + args, + ctx + ): Promise => ({ + // We don't need to filter the connection, as this is tenant wide (tenant + // filtering is completed at the model layer). + connection: {}, + counts: await retrieveSharedModerationQueueQueuesCounts( + ctx.mongo, + ctx.redis, + ctx.tenant.id + ), + }), }; diff --git a/src/core/server/graph/tenant/resolvers/Story.ts b/src/core/server/graph/tenant/resolvers/Story.ts index b2996e2e1..b9cba2f53 100644 --- a/src/core/server/graph/tenant/resolvers/Story.ts +++ b/src/core/server/graph/tenant/resolvers/Story.ts @@ -3,6 +3,7 @@ import { DateTime } from "luxon"; import { GQLStoryTypeResolver } from "talk-server/graph/tenant/schema/__generated__/types"; import { decodeActionCounts } from "talk-server/models/action/comment"; import * as story from "talk-server/models/story"; +import { ModerationQueuesInput } from "./ModerationQueues"; export const Story: GQLStoryTypeResolver = { comments: (s, input, ctx) => ctx.loaders.Comments.forStory(s.id, input), @@ -20,6 +21,16 @@ export const Story: GQLStoryTypeResolver = { return null; }, - commentActionCounts: s => decodeActionCounts(s.commentActionCounts), - commentCounts: s => s.commentCounts, + commentActionCounts: s => decodeActionCounts(s.commentCounts.action), + commentCounts: s => s.commentCounts.status, + moderationQueues: (s): ModerationQueuesInput => ({ + connection: { + filter: { + // This moderationQueues is being sourced from the Story, so require + // that all the comments for theses queues are also for this Story. + storyID: s.id, + }, + }, + counts: s.commentCounts.moderationQueue.queues, + }), }; diff --git a/src/core/server/graph/tenant/resolvers/index.ts b/src/core/server/graph/tenant/resolvers/index.ts index 50266a9be..e131b355d 100644 --- a/src/core/server/graph/tenant/resolvers/index.ts +++ b/src/core/server/graph/tenant/resolvers/index.ts @@ -10,6 +10,8 @@ import { CommentModerationAction } from "./CommentModerationAction"; import { CommentRevision } from "./CommentRevision"; import { FacebookAuthIntegration } from "./FacebookAuthIntegration"; import { GoogleAuthIntegration } from "./GoogleAuthIntegration"; +import { ModerationQueue } from "./ModerationQueue"; +import { ModerationQueues } from "./ModerationQueues"; import { Mutation } from "./Mutation"; import { OIDCAuthIntegration } from "./OIDCAuthIntegration"; import { Profile } from "./Profile"; @@ -25,6 +27,8 @@ const Resolvers: GQLResolver = { CommentRevision, Cursor, Mutation, + ModerationQueue, + ModerationQueues, OIDCAuthIntegration, FacebookAuthIntegration, GoogleAuthIntegration, diff --git a/src/core/server/graph/tenant/schema/schema.graphql b/src/core/server/graph/tenant/schema/schema.graphql index f09176cc4..d39819904 100644 --- a/src/core/server/graph/tenant/schema/schema.graphql +++ b/src/core/server/graph/tenant/schema/schema.graphql @@ -240,6 +240,50 @@ type WordList { suspect: [String!]! } +################################################################################ +## Moderation +################################################################################ + +""" +ModerationQueue returns the Comments associated with a Moderation Queue. +""" +type ModerationQueue { + """ + count will return the number of Comments that are in this queue. + """ + count: Int! + + """ + comments are the comments on the ModerationQueue. + """ + comments(first: Int = 10, after: Cursor): CommentsConnection! +} + +""" +ModerationQueues are the list of ModerationQueue's that are supported inside +Talk that can be used to moderate Comments. +""" +type ModerationQueues { + """ + unmoderated will return a ModerationQueue for all Comments that have not been + moderated yet. + """ + unmoderated: ModerationQueue! + + """ + reported will return a ModerationQueue for all Comments that have been + published, have not been moderated by a human yet, and have been reported by + a User via a flag. + """ + reported: ModerationQueue! + + """ + pending will return a ModerationQueue for all Comments that were held back by + the system and require moderation in order to be published. + """ + pending: ModerationQueue! +} + ################################################################################ ## Auth ################################################################################ @@ -1249,7 +1293,7 @@ type PageInfo { } """ -CommentEdge represents a unique Comment in a CommentConnection. +CommentEdge represents a unique Comment in a CommentsConnection. """ type CommentEdge { """ @@ -1415,6 +1459,12 @@ type Story { """ commentActionCounts: ActionCounts! @auth(roles: [ADMIN, MODERATOR]) + """ + moderationQueues returns the set of ModerationQueues that are available for + this Story. + """ + moderationQueues: ModerationQueues @auth(roles: [ADMIN, MODERATOR]) + """ closedAt is the Time that the Story is closed for commenting. """ @@ -1466,6 +1516,23 @@ type StoriesConnection { pageInfo: PageInfo! } +################################################################################ +## CommentsFilterInput +################################################################################ + +input CommentsFilterInput { + """ + storyID when specified, will filter to show only Comment's on that Story. + """ + storyID: ID + + """ + status when specified, will filter to show only Comment's with that + COMMENT_STATUS. + """ + status: COMMENT_STATUS +} + ################################################################################ ## Query ################################################################################ @@ -1476,6 +1543,15 @@ type Query { """ comment(id: ID!): Comment + """ + comments returns a filtered comments connection that can be paginated. + """ + comments( + first: Int = 10 + after: Cursor + filter: CommentsFilterInput! + ): CommentsConnection @auth(roles: [ADMIN, MODERATOR]) + """ story is the Story specified by its ID/URL. """ @@ -1510,6 +1586,12 @@ type Query { on this scrape. """ debugScrapeStoryMetadata(url: String!): StoryMetadata @auth(roles: [ADMIN]) + + """ + moderationQueues returns the set of ModerationQueues that are available for + all stories. + """ + moderationQueues: ModerationQueues @auth(roles: [ADMIN, MODERATOR]) } ################################################################################ @@ -2233,34 +2315,6 @@ type CreateCommentFlagPayload { clientMutationId: String! } -################## -## removeCommentFlag -################## - -input RemoveCommentFlagInput { - """ - commentID is the Comment's ID that we want to remove a Flag on. - """ - commentID: ID! - - """ - clientMutationId is required for Relay support. - """ - clientMutationId: String! -} - -type RemoveCommentFlagPayload { - """ - comment is the Comment that the Flag was removed on. - """ - comment: Comment - - """ - clientMutationId is required for Relay support. - """ - clientMutationId: String! -} - ################## ## regenerateSSOKey ################## @@ -2936,13 +2990,6 @@ type Mutation { createCommentFlag(input: CreateCommentFlagInput!): CreateCommentFlagPayload @auth - """ - removeCommentFlag will create a Flag authored by the current logged in User on - a given Comment. - """ - removeCommentFlag(input: RemoveCommentFlagInput!): RemoveCommentFlagPayload - @auth - """ createStory will create the provided Story. """ diff --git a/src/core/server/models/action/comment.spec.ts b/src/core/server/models/action/comment.spec.ts index 40c1cf837..fbac9adb7 100644 --- a/src/core/server/models/action/comment.spec.ts +++ b/src/core/server/models/action/comment.spec.ts @@ -2,8 +2,10 @@ import { GQLCOMMENT_FLAG_REASON } from "talk-server/graph/tenant/schema/__genera import { ACTION_TYPE, CommentAction, + CreateActionInput, decodeActionCounts, encodeActionCounts, + filterDuplicateActions, validateAction, } from "talk-server/models/action/comment"; @@ -119,3 +121,36 @@ describe("#validateAction", () => { } }); }); + +describe("#filterDuplicateActions", () => { + it("removes duplicate action items", () => { + const actions: CreateActionInput[] = [ + { + storyID: "1", + actionType: ACTION_TYPE.FLAG, + reason: GQLCOMMENT_FLAG_REASON.COMMENT_DETECTED_BANNED_WORD, + commentID: "1", + commentRevisionID: "1", + userID: null, + }, + { + storyID: "1", + actionType: ACTION_TYPE.FLAG, + reason: GQLCOMMENT_FLAG_REASON.COMMENT_DETECTED_BANNED_WORD, + commentID: "1", + commentRevisionID: "1", + userID: null, + }, + { + storyID: "1", + actionType: ACTION_TYPE.FLAG, + reason: GQLCOMMENT_FLAG_REASON.COMMENT_DETECTED_SUSPECT_WORD, + commentID: "1", + commentRevisionID: "1", + userID: null, + }, + ]; + + expect(filterDuplicateActions(actions)).toHaveLength(2); + }); +}); diff --git a/src/core/server/models/action/comment.ts b/src/core/server/models/action/comment.ts index 98926b4b2..1f4c6d645 100644 --- a/src/core/server/models/action/comment.ts +++ b/src/core/server/models/action/comment.ts @@ -1,5 +1,5 @@ import Joi from "joi"; -import { camelCase, pick } from "lodash"; +import { camelCase, isEqual, omit, pick, uniqWith } from "lodash"; import { Db } from "mongodb"; import uuid from "uuid"; @@ -76,11 +76,9 @@ export interface CommentAction extends TenantResource { reason?: FLAG_REASON; /** - * storyID represents the identifier for the item's associated item. In - * the case of a REACTION left on a Comment, this ID would be the Stories ID. - * In the case of a FLAG left on a User, this ID would be null. + * storyID represents the ID of the Story where the comment was left on. */ - storyID?: string; + storyID: string; /** * userID is the ID of the User that left this Action. In the event that the @@ -159,6 +157,12 @@ export interface CreateActionResultObject { wasUpserted: boolean; } +export function filterDuplicateActions(actions: T[]): T[] { + return uniqWith(actions, (first, second) => + isEqual(omit(first, "metadata"), omit(second, "metadata")) + ); +} + export async function createAction( mongo: Db, tenantID: string, @@ -351,7 +355,7 @@ export const ACTION_COUNT_JOIN_CHAR = "__"; * @param actions list of actions to generate the action counts from */ export function encodeActionCounts( - ...actions: CommentAction[] + ...actions: Array> ): EncodedCommentActionCounts { const actionCounts: EncodedCommentActionCounts = {}; @@ -394,7 +398,9 @@ export function invertEncodedActionCounts( * encodeActionCountKeys encodes the action into string keys which represents * the groupings as seen in `EncodedActionCounts`. */ -function encodeActionCountKeys(action: CommentAction): string[] { +function encodeActionCountKeys( + action: Pick +): string[] { const keys = [action.actionType as string]; if (action.reason) { keys.push( @@ -495,7 +501,7 @@ function createEmptyActionCounts(): GQLActionCounts { } export function mergeCommentActionCounts( - actionCounts: EncodedCommentActionCounts[] + ...actionCounts: EncodedCommentActionCounts[] ): EncodedCommentActionCounts { const mergedActionCounts: EncodedCommentActionCounts = {}; @@ -504,7 +510,7 @@ export function mergeCommentActionCounts( if (key in mergedActionCounts) { mergedActionCounts[key] += count; } else { - mergedActionCounts[key] += 1; + mergedActionCounts[key] = count; } } } diff --git a/src/core/server/models/action/moderation/comment.ts b/src/core/server/models/action/moderation/comment.ts index 50a00fb89..09afd9c0e 100644 --- a/src/core/server/models/action/moderation/comment.ts +++ b/src/core/server/models/action/moderation/comment.ts @@ -5,7 +5,7 @@ import { Omit, Sub } from "talk-common/types"; import { GQLCOMMENT_STATUS } from "talk-server/graph/tenant/schema/__generated__/types"; import { Connection, - Cursor, + ConnectionInput, getPageInfo, nodesToEdges, } from "talk-server/models/connection"; @@ -107,16 +107,14 @@ export async function retrieveCommentModerationActions( return result.toArray(); } -export interface ConnectionInput { - first: number; - after?: Cursor; - filter?: CommentModerationActionFilter; -} +export type CommentModerationActionConnectionInput = ConnectionInput< + CommentModerationAction +>; export async function retrieveCommentModerationActionConnection( mongo: Db, tenantID: string, - input: ConnectionInput + input: CommentModerationActionConnectionInput ): Promise>>> { // Create the query. const query = new Query(collection(mongo)).where({ tenantID }); @@ -130,7 +128,7 @@ export async function retrieveCommentModerationActionConnection( } async function retrieveConnection( - input: ConnectionInput, + input: CommentModerationActionConnectionInput, query: Query ): Promise>>> { // Apply the cursor to the query. Currently only supporting sorting by the diff --git a/src/core/server/models/comment.ts b/src/core/server/models/comment.ts index a30cf65c6..4dad31a5a 100644 --- a/src/core/server/models/comment.ts +++ b/src/core/server/models/comment.ts @@ -1,3 +1,4 @@ +import { isEmpty, merge } from "lodash"; import { Db } from "mongodb"; import uuid from "uuid"; @@ -7,20 +8,23 @@ import { GQLCOMMENT_SORT, GQLCOMMENT_STATUS, } from "talk-server/graph/tenant/schema/__generated__/types"; -import { EncodedCommentActionCounts } from "talk-server/models/action/comment"; +import { + EncodedCommentActionCounts, + mergeCommentActionCounts, +} from "talk-server/models/action/comment"; import { Connection, createConnection, - Cursor, getPageInfo, nodesToEdges, NodeToCursorTransformer, + OrderedConnectionInput, } from "talk-server/models/connection"; import Query from "talk-server/models/query"; import { TenantResource } from "talk-server/models/tenant"; -function collection(db: Db) { - return db.collection>("comments"); +function collection(mongo: Db) { + return mongo.collection>("comments"); } /** @@ -143,23 +147,24 @@ export type CreateCommentInput = Omit< | "actionCounts" | "revisions" > & - Required>; + Required> & + Partial>; export async function createComment( - db: Db, + mongo: Db, tenantID: string, input: CreateCommentInput ) { const createdAt = new Date(); // Pull out some useful properties from the input. - const { body, ...rest } = input; + const { body, actionCounts = {}, ...rest } = input; // Generate the revision. const revision: Revision = { id: uuid.v4(), body, - actionCounts: {}, + actionCounts, createdAt, }; @@ -168,21 +173,24 @@ export async function createComment( const defaults: Sub = { id: uuid.v4(), tenantID, - createdAt, replyIDs: [], replyCount: 0, - actionCounts: {}, revisions: [revision], + createdAt, }; // Merge the defaults and the input together. const comment: Readonly = { + // Defaults for things that always stay the same, or are computed. ...defaults, + // Rest for things that are passed in and are not actionCounts. ...rest, + // ActionCounts because they may be passed in! + actionCounts, }; // Insert it into the database. - await collection(db).insertOne(comment); + await collection(mongo).insertOne(comment); return comment; } @@ -222,22 +230,70 @@ export type EditCommentInput = Pick< * `editCommentWindowLength` property. */ lastEditableCommentCreatedAt: Date; -} & Required>; +} & Required> & + Partial>; +// Only comments with the following status's can be edited. +const EDITABLE_STATUSES = [ + GQLCOMMENT_STATUS.NONE, + GQLCOMMENT_STATUS.PREMOD, + GQLCOMMENT_STATUS.ACCEPTED, +]; + +export function validateEditable( + comment: Comment, + { + authorID, + lastEditableCommentCreatedAt, + }: Pick +) { + if (comment.authorID !== authorID) { + // TODO: (wyattjoh) return better error + throw new Error("comment author mismatch"); + } + + // Check to see if the comment had a status that was editable. + if (!EDITABLE_STATUSES.includes(comment.status)) { + // TODO: (wyattjoh) return better error + throw new Error("comment status is not editable"); + } + + // Check to see if the edit window expired. + if (comment.createdAt <= lastEditableCommentCreatedAt) { + // TODO: (wyattjoh) return better error + throw new Error("edit window expired"); + } +} + +export interface EditComment { + /** + * oldComment is the Comment that was previously set. + */ + oldComment: Comment; + + /** + * editedComment is the Comment after the edit was performed. + */ + editedComment: Comment; + + /** + * newRevision returns the new revision that was created in the Comment. + */ + newRevision: Revision; +} + +/** + * editComment will edit a comment if it's within the time allotment. + * + * @param mongo MongoDB database handle + * @param tenantID ID for the Tenant where the Comment exists + * @param input input for editing the comment + */ export async function editComment( - db: Db, + mongo: Db, tenantID: string, input: EditCommentInput -) { - // TODO: (wyattjoh) now that we have revisions, do we really have this restriction? - - // Only comments with the following status's can be edited. - const EDITABLE_STATUSES = [ - GQLCOMMENT_STATUS.NONE, - GQLCOMMENT_STATUS.PREMOD, - GQLCOMMENT_STATUS.ACCEPTED, - ]; - +): Promise { const createdAt = new Date(); const { @@ -247,17 +303,35 @@ export async function editComment( status, authorID, metadata, + actionCounts = {}, } = input; // Generate the revision. const revision: Revision = { id: uuid.v4(), body, - actionCounts: {}, + actionCounts, createdAt, }; - const result = await collection(db).findOneAndUpdate( + const update: Record = { + $set: { + status, + // Embed all the metadata properties, this may override the existing + // metadata, but we won't replace metadata that has been recalculated. + // TODO: (wyattjoh) consider if we want to replace the metadata for edited comments instead of supplementing it + ...dotize({ metadata }), + }, + $push: { + revisions: revision, + }, + }; + if (!isEmpty(actionCounts)) { + // Action counts are being provided! Increment the base action counts too! + update.$inc = dotize({ actionCounts }); + } + + const result = await collection(mongo).findOneAndUpdate( { id, tenantID, @@ -270,64 +344,59 @@ export async function editComment( $gt: lastEditableCommentCreatedAt, }, }, + update, { - $set: { - status, - // Embed all the metadata properties, this may override the existing - // metadata, but we won't replace metadata that has been recalculated. - // TODO: (wyattjoh) consider if we want to replace the metadata for edited comments instead of supplementing it - ...dotize({ metadata }), - }, - $push: { - revisions: revision, - }, - }, - // False to return the updated document instead of the original - // document. - { returnOriginal: false } + // True to return the original document instead of the updated document. + returnOriginal: true, + } ); if (!result.value) { // Try to get the comment. - const comment = await retrieveComment(db, tenantID, id); + const comment = await retrieveComment(mongo, tenantID, id); if (!comment) { // TODO: (wyattjoh) return better error throw new Error("comment not found"); } - if (comment.authorID !== authorID) { - // TODO: (wyattjoh) return better error - throw new Error("comment author mismatch"); - } - - // Check to see if the comment had a status that was editable. - if (!EDITABLE_STATUSES.includes(comment.status)) { - // TODO: (wyattjoh) return better error - throw new Error("comment status is not editable"); - } - - // Check to see if the edit window expired. - if (comment.createdAt <= lastEditableCommentCreatedAt) { - // TODO: (wyattjoh) return better error - throw new Error("edit window expired"); - } + // Validate and potentially return with a more useful error. + validateEditable(comment, input); // TODO: (wyattjoh) return better error throw new Error("comment edit failed for an unexpected reason"); } - return result.value; + // Create a new "editedComment" where the same changes were applied to it as + // we did to the MongoDB document. + const editedComment: Comment = merge({}, result.value, { + // Add in all the $set operations. + status, + metadata, + // Merge the actionCounts from the old Comment with the new actionCounts. + actionCounts: mergeCommentActionCounts( + result.value.actionCounts, + actionCounts + ), + // Add in the $push operations. + revisions: [...result.value.revisions, revision], + }); + + return { + oldComment: result.value, + editedComment, + newRevision: revision, + }; } -export async function retrieveComment(db: Db, tenantID: string, id: string) { - return collection(db).findOne({ id, tenantID }); +export async function retrieveComment(mongo: Db, tenantID: string, id: string) { + return collection(mongo).findOne({ id, tenantID }); } export async function retrieveManyComments( - db: Db, + mongo: Db, tenantID: string, ids: string[] ) { - const cursor = await collection(db).find({ + const cursor = await collection(mongo).find({ id: { $in: ids, }, @@ -339,14 +408,13 @@ export async function retrieveManyComments( return ids.map(id => comments.find(comment => comment.id === id) || null); } -export interface ConnectionInput { - first: number; - orderBy: GQLCOMMENT_SORT; - after?: Cursor; -} +export type CommentConnectionInput = OrderedConnectionInput< + Comment, + GQLCOMMENT_SORT +>; function cursorGetterFactory( - input: Pick + input: Pick ): NodeToCursorTransformer { switch (input.orderBy) { case GQLCOMMENT_SORT.CREATED_AT_DESC: @@ -363,28 +431,25 @@ function cursorGetterFactory( * retrieveRepliesConnection returns a Connection for a given comments * replies. * - * @param db database connection + * @param mongo database connection * @param parentID the parent id for the comment to retrieve * @param input connection configuration */ -export async function retrieveCommentRepliesConnection( - db: Db, +export const retrieveCommentRepliesConnection = ( + mongo: Db, tenantID: string, storyID: string, parentID: string, - input: ConnectionInput -) { - // Create the query. - const query = new Query(collection(db)).where({ - tenantID, - storyID, - parentID, + input: CommentConnectionInput +) => + retrieveCommentConnection(mongo, tenantID, { + ...input, + filter: { + storyID, + parentID, + }, }); - // Return a connection for the comments query. - return retrieveConnection(input, query); -} - /** * retrieveCommentParentsConnection will return a comment connection used to * represent the parents of a given comment. @@ -485,54 +550,64 @@ export async function retrieveCommentParentsConnection( * retrieveStoryConnection returns a Connection for a given Stories * comments. * - * @param db database connection + * @param mongo database connection * @param storyID the Story id for the comment to retrieve * @param input connection configuration */ -export async function retrieveCommentStoryConnection( - db: Db, +export const retrieveCommentStoryConnection = ( + mongo: Db, tenantID: string, storyID: string, - input: ConnectionInput -) { - // Create the query. - const query = new Query(collection(db)).where({ - tenantID, - storyID, - // Only get Comments that are top level. If the client wants to load another - // layer, they can request another nested connection. - parentID: null, - // Only get Comment's that are visible. - status: { - $in: [GQLCOMMENT_STATUS.NONE, GQLCOMMENT_STATUS.ACCEPTED], + input: CommentConnectionInput +) => + retrieveCommentConnection(mongo, tenantID, { + ...input, + filter: { + storyID, + // Only get Comments that are top level. If the client wants to load another + // layer, they can request another nested connection. + parentID: null, + // Only get Comment's that are visible. + status: { + $in: [GQLCOMMENT_STATUS.NONE, GQLCOMMENT_STATUS.ACCEPTED], + }, }, }); - // Return a connection for the comments query. - return retrieveConnection(input, query); -} - /** * retrieveCommentUserConnection returns a Connection for a given User's * comments. * - * @param db database connection + * @param mongo database connection * @param userID the User id for the comment to retrieve * @param input connection configuration */ -export async function retrieveCommentUserConnection( - db: Db, +export const retrieveCommentUserConnection = ( + mongo: Db, tenantID: string, userID: string, - input: ConnectionInput -) { - // Create the query. - const query = new Query(collection(db)).where({ - tenantID, - authorID: userID, + input: CommentConnectionInput +) => + retrieveCommentConnection(mongo, tenantID, { + ...input, + filter: { + authorID: userID, + }, }); - // Return a connection for the comments query. +export async function retrieveCommentConnection( + mongo: Db, + tenantID: string, + input: CommentConnectionInput +): Promise>>> { + // Create the query. + const query = new Query(collection(mongo)).where({ tenantID }); + + // If a filter is being applied, filter it as well. + if (input.filter) { + query.where(input.filter); + } + return retrieveConnection(input, query); } @@ -545,7 +620,7 @@ export async function retrieveCommentUserConnection( * configuration applied */ async function retrieveConnection( - input: ConnectionInput, + input: CommentConnectionInput, query: Query ): Promise>>> { // Apply some sorting options. @@ -562,6 +637,14 @@ async function retrieveConnection( // Get the comments from the cursor. const nodes = await cursor.toArray(); + // Return a connection. + return convertNodesToConnection(input, nodes); +} + +export function convertNodesToConnection( + input: CommentConnectionInput, + nodes: Array> +) { // Convert the nodes to edges (which will include the extra edge we don't need // if there is more results). const edges = nodesToEdges(nodes, cursorGetterFactory(input)); @@ -583,7 +666,10 @@ async function retrieveConnection( }; } -function applyInputToQuery(input: ConnectionInput, query: Query) { +function applyInputToQuery( + input: CommentConnectionInput, + query: Query +) { switch (input.orderBy) { case GQLCOMMENT_SORT.CREATED_AT_DESC: query.orderBy({ createdAt: -1 }); @@ -613,7 +699,14 @@ function applyInputToQuery(input: ConnectionInput, query: Query) { } export interface UpdateCommentStatus { + /** + * comment is the updated Comment with the new status associated with it. + */ comment: Readonly; + + /** + * oldStatus is the previous status that the given Comment had. + */ oldStatus: GQLCOMMENT_STATUS; } diff --git a/src/core/server/models/connection.ts b/src/core/server/models/connection.ts index f58a850e7..9a1949e57 100644 --- a/src/core/server/models/connection.ts +++ b/src/core/server/models/connection.ts @@ -1,4 +1,5 @@ import { merge } from "lodash"; +import { FilterQuery } from "./query"; export type Cursor = Date | number | string | null; @@ -19,6 +20,31 @@ export interface Connection { pageInfo: PageInfo; } +export interface ConnectionInput { + /** + * first is the number of items to load for the connection. The returned + * amount of items may be less. + */ + first: number; + + /** + * after is an optional cursor that can be used to paginate the result set. + */ + after?: Cursor; + + /** + * filter is an optional query that can be used to constrain the result set. + */ + filter?: FilterQuery; +} + +export interface OrderedConnectionInput extends ConnectionInput { + /** + * orderBy allows ordering of the returned connection. + */ + orderBy: U; +} + /** * createConnection will create a base Connection that can be used to satisfy * the Connection interface. diff --git a/src/core/server/models/story/counts/empty.ts b/src/core/server/models/story/counts/empty.ts new file mode 100644 index 000000000..e7826319e --- /dev/null +++ b/src/core/server/models/story/counts/empty.ts @@ -0,0 +1,24 @@ +import { GQLCOMMENT_STATUS } from "talk-server/graph/tenant/schema/__generated__/types"; + +import { CommentModerationQueueCounts, CommentStatusCounts } from "."; + +export function createEmptyCommentModerationQueueCounts(): CommentModerationQueueCounts { + return { + total: 0, + queues: { + unmoderated: 0, + reported: 0, + pending: 0, + }, + }; +} + +export function createEmptyCommentStatusCounts(): CommentStatusCounts { + return { + [GQLCOMMENT_STATUS.ACCEPTED]: 0, + [GQLCOMMENT_STATUS.NONE]: 0, + [GQLCOMMENT_STATUS.PREMOD]: 0, + [GQLCOMMENT_STATUS.REJECTED]: 0, + [GQLCOMMENT_STATUS.SYSTEM_WITHHELD]: 0, + }; +} diff --git a/src/core/server/models/story/counts/index.ts b/src/core/server/models/story/counts/index.ts new file mode 100644 index 000000000..1c17c9bca --- /dev/null +++ b/src/core/server/models/story/counts/index.ts @@ -0,0 +1,246 @@ +export * from "./empty"; +export * from "./shared"; + +import { Db } from "mongodb"; + +import { identity, isEmpty, pickBy } from "lodash"; +import { DeepPartial } from "talk-common/types"; +import { dotize } from "talk-common/utils/dotize"; +import { GQLCOMMENT_STATUS } from "talk-server/graph/tenant/schema/__generated__/types"; +import logger from "talk-server/logger"; +import { EncodedCommentActionCounts } from "talk-server/models/action/comment"; +import { AugmentedRedis } from "talk-server/services/redis"; + +import { retrieveStory, Story } from ".."; +import { createEmptyCommentStatusCounts } from "./empty"; +import { updateSharedCommentCounts } from "./shared"; + +/** + * collection provides a reference to the stories collection used by the + * counting system. + */ +function collection(db: Db) { + return db.collection>("stories"); +} + +// TODO: (wyattjoh) write a test to verify that this set of counts is always in sync with GQLCOMMENT_STATUS. + +/** + * CommentStatusCounts stores the count of Comments that have the particular + * statuses. + */ +export interface CommentStatusCounts { + [GQLCOMMENT_STATUS.ACCEPTED]: number; + [GQLCOMMENT_STATUS.NONE]: number; + [GQLCOMMENT_STATUS.PREMOD]: number; + [GQLCOMMENT_STATUS.REJECTED]: number; + [GQLCOMMENT_STATUS.SYSTEM_WITHHELD]: number; +} + +/** + * CommentModerationCountsPerQueue stores the number of Comments that exist in + * each of the Moderation Queues. + */ +export interface CommentModerationCountsPerQueue { + /** + * unmoderated is the number of Comment's that have not been moderated. This + * includes all Comment's that are NONE, PREMOD, SYSTEM_WITHHELD. + */ + unmoderated: number; + + /** + * pending is the number of Comment's that are not published and are pending + * moderation. + */ + pending: number; + + /** + * reported is the number of Comment's that have not been moderated but have + * been flagged. + */ + reported: number; +} + +/** + * CommentModerationQueueCounts stores the number of Comments that exist in each + * of the ModerationQueue's on this Story. + */ +export interface CommentModerationQueueCounts { + /** + * total is the number of Comment's that exist in the below moderation queues. + */ + total: number; + + /** + * queues contains all the queue specific counts. + */ + queues: CommentModerationCountsPerQueue; +} + +/** + * StoryCommentCounts stores all the Comment Counts that will be stored on each + * Story. + */ +export interface StoryCommentCounts { + /** + * actionCounts stores all the action counts for all Comment's on this Story. + */ + action: EncodedCommentActionCounts; + + /** + * commentCounts stores the different counts for each comment on the Story + * according to their statuses. + */ + status: CommentStatusCounts; + + /** + * moderationQueue stores the number of Comments that exist in + * each of the ModerationQueue's on this Story. + */ + moderationQueue: CommentModerationQueueCounts; +} + +/** + * updateStoryCommentStatusCount will update a given Story's status counts. + * + * @param mongo database handle + * @param redis redis database handle + * @param tenantID ID of the Tenant where this Story is + * @param id ID of the Story where we're updating the counts + * @param status the set of counts that we will update + */ +export const updateStoryCommentStatusCount = ( + mongo: Db, + redis: AugmentedRedis, + tenantID: string, + id: string, + status: Partial +) => updateStoryCounts(mongo, redis, tenantID, id, { status }); + +/** + * updateStoryCommentModerationQueueCounts will update the moderation queue + * counts on a Story. + * + * @param mongo mongo database handle + * @param redis redis database handle + * @param tenantID ID of the Tenant where this Story is + * @param id ID of the Story where we're updating the counts + * @param moderationQueue the set of counts that we will update + */ +export const updateStoryCommentModerationQueueCounts = ( + mongo: Db, + redis: AugmentedRedis, + tenantID: string, + id: string, + moderationQueue: DeepPartial +) => updateStoryCounts(mongo, redis, tenantID, id, { moderationQueue }); + +export const updateStoryActionCounts = ( + mongo: Db, + redis: AugmentedRedis, + tenantID: string, + id: string, + action: EncodedCommentActionCounts +) => updateStoryCounts(mongo, redis, tenantID, id, { action }); + +export type StoryCounts = DeepPartial; + +/** + * updateStoryCounts will update the comment counts for the story indicated. + * + * @param mongo mongodb database handle + * @param redis redis database handle + * @param tenantID ID of the Tenant where the Story is on + * @param id the ID of the Story that we are updating counts on + * @param commentCounts the counts that we are updating + */ +export async function updateStoryCounts( + mongo: Db, + redis: AugmentedRedis, + tenantID: string, + id: string, + commentCounts: StoryCounts +) { + // Update all the specific comment moderation queue counts. + const update: DeepPartial = { commentCounts }; + const $inc = pickBy(dotize(update), identity); + if (isEmpty($inc)) { + return retrieveStory(mongo, tenantID, id); + } + + logger.trace({ update: { $inc } }, "incrementing story counts"); + + const result = await collection(mongo).findOneAndUpdate( + { id, tenantID }, + { $inc }, + // False to return the updated document instead of the original + // document. + { returnOriginal: false } + ); + + // Update the shared counts. + await updateSharedCommentCounts(redis, tenantID, commentCounts); + + return result.value || null; +} + +/** + * mergeCommentStatusCount will merge an array of commentStatusCount's into one. + */ +export function mergeCommentStatusCount( + statusCounts: CommentStatusCounts[] +): CommentStatusCounts { + const mergedStatusCounts = createEmptyCommentStatusCounts(); + for (const commentCounts of statusCounts) { + for (const status in commentCounts) { + if (!commentCounts.hasOwnProperty(status)) { + continue; + } + + // Because the CommentStatusCounts are not indexable, it should be accessed + // by walking the structure. + switch (status) { + case GQLCOMMENT_STATUS.ACCEPTED: + case GQLCOMMENT_STATUS.NONE: + case GQLCOMMENT_STATUS.PREMOD: + case GQLCOMMENT_STATUS.REJECTED: + case GQLCOMMENT_STATUS.SYSTEM_WITHHELD: + mergedStatusCounts[status] += commentCounts[status]; + break; + default: + throw new Error("unrecognized status"); + } + } + } + return mergedStatusCounts; +} + +/** + * calculateTotalCommentCount will compute the total amount of comments left on + * an Asset by parsing the `CommentStatusCounts`. + */ +export function calculateTotalCommentCount( + commentCounts: CommentStatusCounts +): number { + let count = 0; + for (const status in commentCounts) { + if (!commentCounts.hasOwnProperty(status)) { + continue; + } + + // Because the CommentStatusCounts are not indexable, it should be accessed + // by walking the structure. + switch (status) { + case GQLCOMMENT_STATUS.ACCEPTED: + case GQLCOMMENT_STATUS.NONE: + case GQLCOMMENT_STATUS.PREMOD: + case GQLCOMMENT_STATUS.REJECTED: + case GQLCOMMENT_STATUS.SYSTEM_WITHHELD: + count += commentCounts[status]; + break; + default: + throw new Error("unrecognized status"); + } + } + return count; +} diff --git a/src/core/server/models/story/counts/shared.ts b/src/core/server/models/story/counts/shared.ts new file mode 100644 index 000000000..f7272e847 --- /dev/null +++ b/src/core/server/models/story/counts/shared.ts @@ -0,0 +1,554 @@ +import { flattenDeep, identity, isEmpty, pickBy } from "lodash"; +import { Db } from "mongodb"; +import ms from "ms"; + +import logger from "talk-server/logger"; +import { EncodedCommentActionCounts } from "talk-server/models/action/comment"; +import { AugmentedPipeline, AugmentedRedis } from "talk-server/services/redis"; + +import { + CommentModerationCountsPerQueue, + CommentStatusCounts, + StoryCounts, +} from "."; +import { Story } from ".."; +import { + createEmptyCommentModerationQueueCounts, + createEmptyCommentStatusCounts, +} from "./empty"; + +/** + * COUNT_FRESHNESS_EXPIRY will expire the :fresh key for cached counts that will + * trigger a recalculation of the cached count values after the time that it + * last computed it plus the below time in seconds elapsed. + */ +const COUNT_FRESHNESS_EXPIRY_SECONDS = Math.floor(ms("24h") / 1000); + +/** + * shared keys + */ + +const freshenKey = (key: string) => `${key}:fresh`; + +const commentCountsActionKey = (tenantID: string) => + `${tenantID}:commentCounts:action`; + +const commentCountsStatusKey = (tenantID: string) => + `${tenantID}:commentCounts:status`; + +const commentCountsModerationQueueTotalKey = (tenantID: string) => + `${tenantID}:commentCounts:moderationQueue:total`; + +const commentCountsModerationQueueQueuesKey = (tenantID: string) => + `${tenantID}:commentCounts:moderationQueue:queues`; + +/** + * collection provides a reference to the stories collection used by the + * counting system. + */ +function collection(db: Db) { + return db.collection>("stories"); +} + +/** + * recalculateSharedModerationQueueQueueCounts will reset the counts stored for + * this Tenant. + * + * @param mongo mongodb database handle + * @param redis redis database handle + * @param tenantID the tenant ID that we are resetting the counts for + */ +export async function recalculateSharedModerationQueueQueueCounts( + mongo: Db, + redis: AugmentedRedis, + tenantID: string +) { + const now = new Date(); + const key = commentCountsModerationQueueQueuesKey(tenantID); + const freshKey = freshenKey(key); + + // Clear the existing cached queues. + await redis.del(key, freshKey); + + // Fetch all the moderation queue counts. + const queueResults = await collection<{ + _id: string; + total: number; + }>(mongo).aggregate([ + { + $match: { tenantID, createdAt: { $lt: now } }, + }, + { + $project: { + moderationQueue: { + $objectToArray: "$commentCounts.moderationQueue.queues", + }, + }, + }, + { + $unwind: "$moderationQueue", + }, + { + $group: { + _id: "$moderationQueue.k", + total: { + $sum: "$moderationQueue.v", + }, + }, + }, + ]); + + // Convert the cursor from the results into an array. + const queues = await queueResults.toArray(); + + // Increment the hash key values. + const pipeline = redis.pipeline(); + + // Mark the key as fresh, and update the values! + pipeline.mhincrby( + key, + ...queues.reduce((acc, queue) => [...acc, queue._id, queue.total], []) + ); + pipeline.set(freshKey, now.getTime(), "EX", COUNT_FRESHNESS_EXPIRY_SECONDS); + + await pipeline.exec(); + + const queueCounts: CommentModerationCountsPerQueue = queues.reduce( + (acc, queue) => ({ + ...acc, + [queue._id]: queue.total, + }), + createEmptyCommentModerationQueueCounts().queues + ); + + return queueCounts; +} + +/** + * recalculateSharedModerationQueueTotalCounts will reset the counts stored for + * this Tenant. + * + * @param mongo mongodb database handle + * @param redis redis database handle + * @param tenantID the tenant ID that we are resetting the counts for + */ +export async function recalculateSharedModerationQueueTotalCounts( + mongo: Db, + redis: AugmentedRedis, + tenantID: string +) { + const now = new Date(); + const key = commentCountsModerationQueueTotalKey(tenantID); + const freshKey = freshenKey(key); + + // Clear the existing cached queues. + await redis.del(key, freshKey); + + // Fetch all the totals for the moderation queues. + const totalResults = await collection<{ + total: number; + }>(mongo).aggregate([ + { + $match: { tenantID, createdAt: { $lt: now } }, + }, + { + $group: { + _id: "total", + total: { + $sum: "$commentCounts.moderationQueue.total", + }, + }, + }, + ]); + + const totals = await totalResults.toArray(); + if (totals.length !== 1) { + throw new Error("total results returned incorrect"); + } + + const [{ total }] = totals; + + // Mark the key as fresh, and update the values! + const pipeline = redis.pipeline(); + + pipeline.incrby(key, total); + pipeline.set(freshKey, now.getTime(), "EX", COUNT_FRESHNESS_EXPIRY_SECONDS); + + await pipeline.exec(); + + return total; +} + +/** + * recalculateSharedStatusCommentCounts will reset the counts stored for this + * Tenant. + * + * @param mongo mongodb database handle + * @param redis redis database handle + * @param tenantID the tenant ID that we are resetting the counts for + */ +export async function recalculateSharedStatusCommentCounts( + mongo: Db, + redis: AugmentedRedis, + tenantID: string +) { + const now = new Date(); + const key = commentCountsStatusKey(tenantID); + const freshKey = freshenKey(key); + + // Clear the existing cached queues. + await redis.del(key, freshKey); + + // Fetch all the comments of each status. + const statusResults = await collection<{ + _id: string; + total: number; + }>(mongo).aggregate([ + { + $match: { tenantID, createdAt: { $lt: now } }, + }, + { + $project: { + status: { + $objectToArray: "$commentCounts.status", + }, + }, + }, + { + $unwind: "$status", + }, + { + $group: { + _id: "$status.k", + total: { + $sum: "$status.v", + }, + }, + }, + ]); + + // Convert the cursor from the results into an array. + const statuses = await statusResults.toArray(); + + // Mark the key as fresh, and update the values! + const pipeline = redis.pipeline(); + + pipeline.mhincrby( + key, + ...statuses.reduce((acc, status) => [...acc, status._id, status.total], []) + ); + pipeline.set(freshKey, now.getTime(), "EX", COUNT_FRESHNESS_EXPIRY_SECONDS); + + await pipeline.exec(); + + // Now, reconstruct the status counts so we can return it. + const statusCounts: CommentStatusCounts = statuses.reduce( + (acc, status) => ({ + ...acc, + [status._id]: status.total, + }), + createEmptyCommentStatusCounts() + ); + + return statusCounts; +} + +/** + * recalculateSharedActionCommentCounts will reset the counts stored for this + * Tenant. + * + * @param mongo mongodb database handle + * @param redis redis database handle + * @param tenantID the tenant ID that we are resetting the counts for + */ +export async function recalculateSharedActionCommentCounts( + mongo: Db, + redis: AugmentedRedis, + tenantID: string +) { + const now = new Date(); + const key = commentCountsActionKey(tenantID); + const freshKey = freshenKey(key); + + // Clear the existing cached queues. + await redis.del(key, freshKey); + + // Fetch all the comments of each status. + const actionResults = await collection<{ + _id: string; + total: number; + }>(mongo).aggregate([ + { + $match: { tenantID, createdAt: { $lt: now } }, + }, + { + $project: { + status: { + $objectToArray: "$commentCounts.status", + }, + }, + }, + { + $unwind: "$status", + }, + { + $group: { + _id: "$status.k", + total: { + $sum: "$status.v", + }, + }, + }, + ]); + + // Convert the cursor from the results into an array. + const actions = await actionResults.toArray(); + + // Mark the key as fresh, and update the values! + const pipeline = redis.pipeline(); + + pipeline.mhincrby( + key, + ...actions.reduce((acc, action) => [...acc, action._id, action.total], []) + ); + pipeline.set(freshKey, now.getTime(), "EX", COUNT_FRESHNESS_EXPIRY_SECONDS); + + await pipeline.exec(); + + // Now, compile the action counts into EncodedActionCounts. + const actionCounts: EncodedCommentActionCounts = actions.reduce( + (acc, action) => ({ + ...acc, + [action._id]: action.total, + }), + {} + ); + + return actionCounts; +} + +/** + * recalculateSharedCommentCounts will reset the counts stored for this + * Tenant. + * + * @param mongo mongodb database handle + * @param redis redis database handle + * @param tenantID the tenant ID that we are resetting the counts for + */ +export async function recalculateSharedCommentCounts( + mongo: Db, + redis: AugmentedRedis, + tenantID: string +) { + await Promise.all([ + recalculateSharedModerationQueueQueueCounts(mongo, redis, tenantID), + recalculateSharedModerationQueueTotalCounts(mongo, redis, tenantID), + recalculateSharedStatusCommentCounts(mongo, redis, tenantID), + recalculateSharedActionCommentCounts(mongo, redis, tenantID), + ]); +} + +/** + * stringObjectToNumber will convert an object that has string keys and string + * values into string keys and number values. + */ +function stringObjectToNumber( + input: Record, + defaultValue: number = 0 +): T { + return Object.entries(input).reduce( + (acc, [key, value]) => ({ + ...acc, + [key]: parseInt(value, 10) || defaultValue, + }), + {} + ) as T; +} + +/** + * retrieveSharedActionCommentCounts will retrieve the comment counts based on + * actions for this Tenant. + * + * @param mongo mongodb database handle + * @param redis redis database handle + * @param tenantID the ID of the Tenant that we are getting the shared action + * counts from + */ +export async function retrieveSharedActionCommentCounts( + mongo: Db, + redis: AugmentedRedis, + tenantID: string +) { + const key = commentCountsActionKey(tenantID); + const freshKey = freshenKey(key); + + // Get the values, and the freshness key. + const [[, actions], [, fresh]]: [ + [Error | undefined, Record | null], + [Error | undefined, string | null] + ] = await redis + .pipeline() + .hgetall(key) + .get(freshKey) + .exec(); + if (!fresh || !actions) { + return recalculateSharedActionCommentCounts(mongo, redis, tenantID); + } + + return stringObjectToNumber(actions); +} + +/** + * retrieveSharedStatusCommentCounts will retrieve the comment counts based on + * the status for this Tenant. + * + * @param mongo mongodb database handle + * @param redis redis database handle + * @param tenantID the ID of the Tenant that we are getting the shared status + * counts from + */ +export async function retrieveSharedStatusCommentCounts( + mongo: Db, + redis: AugmentedRedis, + tenantID: string +) { + const key = commentCountsStatusKey(tenantID); + const freshKey = freshenKey(key); + + // Get the values, and the freshness key. + const [[, statuses], [, fresh]]: [ + [Error | undefined, Record | null], + [Error | undefined, string | null] + ] = await redis + .pipeline() + .hgetall(key) + .get(freshKey) + .exec(); + if (!fresh || !statuses) { + return recalculateSharedStatusCommentCounts(mongo, redis, tenantID); + } + + return stringObjectToNumber(statuses); +} + +/** + * retrieveSharedModerationQueueTotal will retrieve the count of comments in + * this Tenant that are in need of moderation. + * + * @param mongo mongodb database handle + * @param redis redis database handle + * @param tenantID the ID of the Tenant that we are getting the shared + * moderation counts from + */ +export async function retrieveSharedModerationQueueTotal( + mongo: Db, + redis: AugmentedRedis, + tenantID: string +) { + const key = commentCountsModerationQueueTotalKey(tenantID); + const freshKey = freshenKey(key); + + // Get the values, and the freshness key. + const [total, fresh]: [string | null, string | null] = await redis.mget( + key, + freshKey + ); + if (fresh === null || total === null) { + return recalculateSharedModerationQueueTotalCounts(mongo, redis, tenantID); + } + + return parseInt(total, 10) || 0; +} + +/** + * retrieveSharedModerationQueueQueuesCounts will retrieve the count of comments + * in each moderation queue for this Tenant. + * + * @param mongo mongodb database handle + * @param redis redis database handle + * @param tenantID the ID of the Tenant that we are getting the shared + * moderation queue counts from + */ +export async function retrieveSharedModerationQueueQueuesCounts( + mongo: Db, + redis: AugmentedRedis, + tenantID: string +) { + const key = commentCountsModerationQueueQueuesKey(tenantID); + const freshKey = freshenKey(key); + + // Get the values, and the freshness key. + const [[, queues], [, fresh]]: [ + [Error | undefined, Record | null], + [Error | undefined, string | null] + ] = await redis + .pipeline() + .hgetall(key) + .get(freshKey) + .exec(); + if (!fresh || !queues) { + logger.debug({ tenantID }, "comment moderation counts were not cached"); + return recalculateSharedModerationQueueQueueCounts(mongo, redis, tenantID); + } + + logger.debug({ tenantID }, "comment moderation counts were cached"); + + return stringObjectToNumber(queues); +} + +export async function updateSharedCommentCounts( + redis: AugmentedRedis, + tenantID: string, + commentCounts: StoryCounts +) { + const pipeline: AugmentedPipeline = redis.pipeline(); + + // HASH ${tenantID}:commentCounts:action + + const action = pickBy(commentCounts.action || {}, identity); + if (!isEmpty(action)) { + // Determine the arguments that we will increment. + const args = flattenDeep(Object.entries(action)); + + // Add the command to the pipeline. + pipeline.mhincrby(commentCountsActionKey(tenantID), ...args); + } + + // HASH ${tenantID}:commentCounts:status + + const status = pickBy(commentCounts.status || {}, identity); + if (!isEmpty(status)) { + // Determine the arguments that we will increment. + const args = flattenDeep(Object.entries(status)); + + // Add the command to the pipeline. + pipeline.mhincrby(commentCountsStatusKey(tenantID), ...args); + } + + // HASH ${tenantID}:commentCounts:moderationQueue:total + + const moderationQueue = commentCounts.moderationQueue || {}; + const moderationQueueTotal = + typeof moderationQueue.total === "number" ? moderationQueue.total : 0; + if (moderationQueueTotal !== 0) { + // Add the command to the pipeline. + pipeline.incrby( + commentCountsModerationQueueTotalKey(tenantID), + moderationQueueTotal + ); + } + + // HASH ${tenantID}:commentCounts:moderationQueue:queues + + const moderationQueueQueues = pickBy(moderationQueue.queues || {}, identity); + if (!isEmpty(moderationQueueQueues)) { + // Determine the arguments that we will increment. + const args = flattenDeep(Object.entries(moderationQueueQueues)); + + // Add the command to the pipeline. + pipeline.mhincrby(commentCountsModerationQueueQueuesKey(tenantID), ...args); + } + + // Execute the pipeline. + await pipeline.exec(); +} diff --git a/src/core/server/models/story.ts b/src/core/server/models/story/index.ts similarity index 54% rename from src/core/server/models/story.ts rename to src/core/server/models/story/index.ts index f947343b7..471f7c320 100644 --- a/src/core/server/models/story.ts +++ b/src/core/server/models/story/index.ts @@ -3,25 +3,20 @@ import uuid from "uuid"; import { Omit } from "talk-common/types"; import { dotize } from "talk-common/utils/dotize"; -import { - GQLCOMMENT_STATUS, - GQLStoryMetadata, -} from "talk-server/graph/tenant/schema/__generated__/types"; -import { EncodedCommentActionCounts } from "talk-server/models/action/comment"; +import { GQLStoryMetadata } from "talk-server/graph/tenant/schema/__generated__/types"; import { ModerationSettings } from "talk-server/models/settings"; import { TenantResource } from "talk-server/models/tenant"; +import { + createEmptyCommentModerationQueueCounts, + createEmptyCommentStatusCounts, + StoryCommentCounts, +} from "./counts"; -function collection(db: Db) { - return db.collection>("stories"); -} +// Export everything under counts. +export * from "./counts"; -// TODO: (wyattjoh) write a test to verify that this set of counts is always in sync with GQLCOMMENT_STATUS. -export interface CommentStatusCounts { - [GQLCOMMENT_STATUS.ACCEPTED]: number; - [GQLCOMMENT_STATUS.NONE]: number; - [GQLCOMMENT_STATUS.PREMOD]: number; - [GQLCOMMENT_STATUS.REJECTED]: number; - [GQLCOMMENT_STATUS.SYSTEM_WITHHELD]: number; +function collection(db: Db) { + return db.collection>("stories"); } export interface Story extends TenantResource { @@ -43,15 +38,9 @@ export interface Story extends TenantResource { scrapedAt?: Date; /** - * actionCounts stores all the action counts for all Comment's on this Story. + * commentCounts stores all the comment counters. */ - commentActionCounts: EncodedCommentActionCounts; - - /** - * commentCounts stores the different counts for each comment on the Story - * according to their statuses. - */ - commentCounts: CommentStatusCounts; + commentCounts: StoryCommentCounts; /** * settings provides a point where the settings can be overridden for a @@ -91,8 +80,11 @@ export async function upsertStory( url, tenantID, createdAt: now, - commentActionCounts: {}, - commentCounts: createEmptyCommentCounts(), + commentCounts: { + action: {}, + status: createEmptyCommentStatusCounts(), + moderationQueue: createEmptyCommentModerationQueueCounts(), + }, }, }; @@ -117,79 +109,6 @@ export async function upsertStory( return result.value || null; } -/** - * updateCommentStatusCount increments the number of status counts for the - * given Story ID. - * - * @param mongo the database handle - * @param tenantID the tenant that the Story is on. - * @param id the ID of the Story. - * @param commentStatusCounts the update document that contains a positive or - * negative number of comments to increment on the given Story. - */ -export async function updateCommentStatusCount( - mongo: Db, - tenantID: string, - id: string, - commentStatusCounts: Partial -) { - const result = await collection(mongo).findOneAndUpdate( - { - id, - tenantID, - }, - // Update all the specific comment status counts that are associated with - // each of the counts. - { $inc: dotize({ commentCounts: commentStatusCounts }) }, - // False to return the updated document instead of the original - // document. - { returnOriginal: false } - ); - - return result.value || null; -} - -/** - * mergeCommentStatusCount will merge an array of commentStatusCount's into one. - */ -export function mergeCommentStatusCount( - commentStatusCounts: CommentStatusCounts[] -): CommentStatusCounts { - const statusCounts = createEmptyCommentCounts(); - for (const commentCounts of commentStatusCounts) { - for (const status in commentCounts) { - if (!commentCounts.hasOwnProperty(status)) { - continue; - } - - // Because the CommentStatusCounts are not indexable, it should be accessed - // by walking the structure. - switch (status) { - case GQLCOMMENT_STATUS.ACCEPTED: - case GQLCOMMENT_STATUS.NONE: - case GQLCOMMENT_STATUS.PREMOD: - case GQLCOMMENT_STATUS.REJECTED: - case GQLCOMMENT_STATUS.SYSTEM_WITHHELD: - statusCounts[status] += commentCounts[status]; - break; - default: - throw new Error("unrecognized status"); - } - } - } - return statusCounts; -} - -function createEmptyCommentCounts(): CommentStatusCounts { - return { - [GQLCOMMENT_STATUS.ACCEPTED]: 0, - [GQLCOMMENT_STATUS.NONE]: 0, - [GQLCOMMENT_STATUS.PREMOD]: 0, - [GQLCOMMENT_STATUS.REJECTED]: 0, - [GQLCOMMENT_STATUS.SYSTEM_WITHHELD]: 0, - }; -} - export interface FindOrCreateStoryInput { id?: string; url?: string; @@ -240,8 +159,11 @@ export async function createStory( url, tenantID, createdAt: now, - commentActionCounts: {}, - commentCounts: createEmptyCommentCounts(), + commentCounts: { + action: {}, + moderationQueue: createEmptyCommentModerationQueueCounts(), + status: createEmptyCommentStatusCounts(), + }, }; try { @@ -346,34 +268,6 @@ export async function updateStory( } } -/** - * updateStoryActionCounts will update the given comment's action counts on - * the Story. - * - * @param mongo the database handle - * @param tenantID the id of the Tenant - * @param id the id of the Story being updated - * @param actionCounts the action counts to merge into the Story - */ -export async function updateStoryActionCounts( - mongo: Db, - tenantID: string, - id: string, - actionCounts: EncodedCommentActionCounts -) { - const result = await collection(mongo).findOneAndUpdate( - { id, tenantID }, - // Update all the specific action counts that are associated with each of - // the counts. - { $inc: dotize({ actionCounts }) }, - // False to return the updated document instead of the original - // document. - { returnOriginal: false } - ); - - return result.value || null; -} - export async function removeStory(mongo: Db, tenantID: string, id: string) { const result = await collection(mongo).findOneAndDelete({ id, @@ -398,33 +292,3 @@ export async function removeStories( }, }); } - -/** - * calculateTotalCommentCount will compute the total amount of comments left on - * an Asset by parsing the `CommentStatusCounts`. - */ -export function calculateTotalCommentCount( - commentCounts: CommentStatusCounts -): number { - let count = 0; - for (const status in commentCounts) { - if (!commentCounts.hasOwnProperty(status)) { - continue; - } - - // Because the CommentStatusCounts are not indexable, it should be accessed - // by walking the structure. - switch (status) { - case GQLCOMMENT_STATUS.ACCEPTED: - case GQLCOMMENT_STATUS.NONE: - case GQLCOMMENT_STATUS.PREMOD: - case GQLCOMMENT_STATUS.REJECTED: - case GQLCOMMENT_STATUS.SYSTEM_WITHHELD: - count += commentCounts[status]; - break; - default: - throw new Error("unrecognized status"); - } - } - return count; -} diff --git a/src/core/server/services/comments/actions.ts b/src/core/server/services/comments/actions.ts index 1884c1552..d86c74b16 100644 --- a/src/core/server/services/comments/actions.ts +++ b/src/core/server/services/comments/actions.ts @@ -4,6 +4,7 @@ import { Omit } from "talk-common/types"; import { GQLCOMMENT_FLAG_REPORTED_REASON } from "talk-server/graph/tenant/schema/__generated__/types"; import { ACTION_TYPE, + CommentAction, CreateActionInput, createActions, encodeActionCounts, @@ -18,89 +19,107 @@ import { updateCommentActionCounts, } from "talk-server/models/comment"; import { Comment } from "talk-server/models/comment"; -import { updateStoryActionCounts } from "talk-server/models/story"; +import { + updateStoryActionCounts, + updateStoryCounts, +} from "talk-server/models/story"; import { Tenant } from "talk-server/models/tenant"; import { User } from "talk-server/models/user"; -export type CreateAction = Omit & - Required>; +import { AugmentedRedis } from "../redis"; +import { calculateCountsDiff } from "./moderation/counts"; + +export type CreateAction = CreateActionInput; export async function addCommentActions( mongo: Db, tenant: Tenant, - comment: Readonly, - inputs: CreateAction[] -): Promise> { + ...inputs: CreateAction[] +) { // Create each of the actions, returning each of the action results. const results = await createActions(mongo, tenant.id, inputs); // Get the actions that were upserted, we only want to increment the action // counts of actions that were just created. - const upsertedActions = results + return results .filter(({ wasUpserted }) => wasUpserted) .map(({ action }) => action); +} - if (upsertedActions.length > 0) { - // Compute the action counts. - const actionCounts = encodeActionCounts(...upsertedActions); +export async function addCommentActionCounts( + mongo: Db, + tenant: Tenant, + oldComment: Readonly, + ...actions: CommentAction[] +) { + // Compute the action counts. + const action = encodeActionCounts(...actions); - // Grab the last revision (the most recent). - const revision = getLatestRevision(comment); + // Grab the last revision (the most recent). + const revision = getLatestRevision(oldComment); - // Update the comment action counts here. - const updatedComment = await updateCommentActionCounts( - mongo, - tenant.id, - comment.id, - revision.id, - actionCounts - ); - - // Update the Story with the updated action counts. - await updateStoryActionCounts( - mongo, - tenant.id, - comment.storyID, - actionCounts - ); - - // Check to see if there was an actual comment returned (there should - // have been, we just created it!). - if (!updatedComment) { - // TODO: (wyattjoh) return a better error. - throw new Error("could not update comment action counts"); - } - - return updatedComment; + // Update the comment action counts here. + const updatedComment = await updateCommentActionCounts( + mongo, + tenant.id, + oldComment.id, + revision.id, + action + ); + if (!updatedComment) { + // TODO: (wyattjoh) return a better error. + throw new Error("could not update comment action counts"); } - return comment; + return updatedComment; } async function addCommentAction( mongo: Db, + redis: AugmentedRedis, tenant: Tenant, - input: CreateActionInput + input: Omit ): Promise> { - const comment = await retrieveComment(mongo, tenant.id, input.commentID); - if (!comment) { + const oldComment = await retrieveComment(mongo, tenant.id, input.commentID); + if (!oldComment) { // TODO: replace to match error returned by the models/comments.ts throw new Error("comment not found"); } - // Store the story ID on the action as a story_id. - input.storyID = comment.storyID; + // Create the action creator input. + const action: CreateAction = { + ...input, + storyID: oldComment.storyID, + }; - // We have to perform a type assertion here because for some reason, the type - // coercion is not determining that because we filled in the `storyID` - // above, that at this point, it satisfies the CreateAction type. - return addCommentActions(mongo, tenant, comment, [input as CreateAction]); + // Update the actions for the comment. + const commentActions = await addCommentActions(mongo, tenant, action); + if (commentActions.length > 0) { + // Update the comment action counts. + const updatedComment = await addCommentActionCounts( + mongo, + tenant, + oldComment, + ...commentActions + ); + + // Calculate the new story counts. + await updateStoryCounts(mongo, redis, tenant.id, updatedComment.storyID, { + action: encodeActionCounts(...commentActions), + moderationQueue: calculateCountsDiff(oldComment, updatedComment), + }); + + return updatedComment; + } + + return oldComment; } export async function removeCommentAction( mongo: Db, + redis: AugmentedRedis, tenant: Tenant, - input: Omit + input: Omit ): Promise> { // Get the Comment that we are leaving the Action on. const comment = await retrieveComment(mongo, tenant.id, input.commentID); @@ -144,9 +163,14 @@ export async function removeCommentAction( actionCounts ); + // Flags can't be removed, an that is the only type of operation that will + // affect the moderation queue counts, so we don't have to interact with + // updating the moderation queue counts. + // Update the Story with the updated action counts. await updateStoryActionCounts( mongo, + redis, tenant.id, comment.storyID, actionCounts @@ -171,11 +195,12 @@ export type CreateCommentReaction = Pick< export async function createReaction( mongo: Db, + redis: AugmentedRedis, tenant: Tenant, author: User, input: CreateCommentReaction ) { - return addCommentAction(mongo, tenant, { + return addCommentAction(mongo, redis, tenant, { actionType: ACTION_TYPE.REACTION, commentID: input.commentID, commentRevisionID: input.commentRevisionID, @@ -187,11 +212,12 @@ export type RemoveCommentReaction = Pick; export async function removeReaction( mongo: Db, + redis: AugmentedRedis, tenant: Tenant, author: User, input: RemoveCommentReaction ) { - return removeCommentAction(mongo, tenant, { + return removeCommentAction(mongo, redis, tenant, { actionType: ACTION_TYPE.REACTION, commentID: input.commentID, userID: author.id, @@ -205,11 +231,12 @@ export type CreateCommentDontAgree = Pick< export async function createDontAgree( mongo: Db, + redis: AugmentedRedis, tenant: Tenant, author: User, input: CreateCommentDontAgree ) { - return addCommentAction(mongo, tenant, { + return addCommentAction(mongo, redis, tenant, { actionType: ACTION_TYPE.DONT_AGREE, commentID: input.commentID, commentRevisionID: input.commentRevisionID, @@ -221,11 +248,12 @@ export type RemoveCommentDontAgree = Pick; export async function removeDontAgree( mongo: Db, + redis: AugmentedRedis, tenant: Tenant, author: User, input: RemoveCommentDontAgree ) { - return removeCommentAction(mongo, tenant, { + return removeCommentAction(mongo, redis, tenant, { actionType: ACTION_TYPE.DONT_AGREE, commentID: input.commentID, userID: author.id, @@ -241,11 +269,12 @@ export type CreateCommentFlag = Pick< export async function createFlag( mongo: Db, + redis: AugmentedRedis, tenant: Tenant, author: User, input: CreateCommentFlag ) { - return addCommentAction(mongo, tenant, { + return addCommentAction(mongo, redis, tenant, { actionType: ACTION_TYPE.FLAG, reason: input.reason, commentID: input.commentID, @@ -253,18 +282,3 @@ export async function createFlag( userID: author.id, }); } - -export type RemoveCommentFlag = Pick; - -export async function removeFlag( - mongo: Db, - tenant: Tenant, - author: User, - input: RemoveCommentFlag -) { - return removeCommentAction(mongo, tenant, { - actionType: ACTION_TYPE.FLAG, - commentID: input.commentID, - userID: author.id, - }); -} diff --git a/src/core/server/services/comments/index.ts b/src/core/server/services/comments/index.ts index 165e0230c..7a06a84a1 100644 --- a/src/core/server/services/comments/index.ts +++ b/src/core/server/services/comments/index.ts @@ -1,6 +1,11 @@ import { Db } from "mongodb"; import { Omit } from "talk-common/types"; +import logger from "talk-server/logger"; +import { + encodeActionCounts, + filterDuplicateActions, +} from "talk-server/models/action/comment"; import { createComment, CreateCommentInput, @@ -9,32 +14,46 @@ import { getLatestRevision, pushChildCommentIDOntoParent, retrieveComment, + validateEditable, } from "talk-server/models/comment"; import { retrieveStory, - updateCommentStatusCount, + StoryCounts, + updateStoryCounts, } from "talk-server/models/story"; import { Tenant } from "talk-server/models/tenant"; import { User } from "talk-server/models/user"; -import { - addCommentActions, - CreateAction, -} from "talk-server/services/comments/actions"; -import { processForModeration } from "talk-server/services/comments/moderation"; import { Request } from "talk-server/types/express"; +import { AugmentedRedis } from "../redis"; +import { addCommentActions, CreateAction } from "./actions"; +import { calculateCounts, calculateCountsDiff } from "./moderation/counts"; +import { processForModeration } from "./pipeline"; + export type CreateComment = Omit< CreateCommentInput, - "status" | "metadata" | "grandparentIDs" + "status" | "metadata" | "grandparentIDs" | "actionCounts" >; export async function create( mongo: Db, + redis: AugmentedRedis, tenant: Tenant, author: User, input: CreateComment, req?: Request ) { + let log = logger.child({ + authorID: author.id, + tenantID: tenant.id, + storyID: input.storyID, + parentID: input.parentID, + }); + + // TODO: (wyattjoh) perform rate limiting based on the user? + + log.trace("creating comment on story"); + // Grab the story that we'll use to check moderation pieces with. const story = await retrieveStory(mongo, tenant.id, input.storyID); if (!story) { @@ -42,8 +61,6 @@ export async function create( throw new Error("story referenced does not exist"); } - // TODO: (wyattjoh) Check that the story was visible. - const grandparentIDs: string[] = []; if (input.parentID) { // Check to see that the reference parent ID exists. @@ -53,7 +70,7 @@ export async function create( throw new Error("parent comment referenced does not exist"); } - // TODO: (wyattjoh) Check that the parent comment was visible. + // FIXME: (wyattjoh) Check that the parent comment was visible! // Push the parent's parent id's into the comment's grandparent id's. grandparentIDs.push(...parent.grandparentIDs); @@ -61,6 +78,11 @@ export async function create( // If this parent has a parent, push it down as well. grandparentIDs.push(parent.parentID); } + + log.trace( + { grandparentIDs: grandparentIDs.length }, + "pushed grandparent id's into comment creation" + ); } // Run the comment through the moderation phases. @@ -72,32 +94,42 @@ export async function create( req, }); + // This is the first time this comment is being published.. So we need to + // ensure we don't run into any race conditions when we create the comment. + // One of the situations where we could encounter a race is when the comment + // is created, and does not have it's flag data associated with it. This would + // cause the comment to not be added to the flagged queue. If a flag is + // pending, and a user flags this comment before the next step can proceed, + // then we would end up double adding the comment to the flagged queue. + // Instead, we need to add the action metadata to the comment before we add it + // for the first time to ensure that the data is there for when the next flag + // is added, that it can already know that the comment is already in the + // queue. + let actionCounts = {}; + if (actions.length > 0) { + // Determine the unique actions, we will use this to compute the comment + // action counts. This should match what is added below. + const deDuplicatedActions = filterDuplicateActions(actions); + + // Encode the action counts. + actionCounts = encodeActionCounts(...deDuplicatedActions); + } + // Create the comment! - let comment = await createComment(mongo, tenant.id, { + const comment = await createComment(mongo, tenant.id, { ...input, status, grandparentIDs, metadata, + actionCounts, }); - if (actions.length > 0) { - // The actions coming from the moderation phases didn't know the commentID - // at the time, and we didn't want the repetitive nature of adding the - // item_type each time, so this mapping function adds them! - const inputs = actions.map( - (action): CreateAction => ({ - ...action, - commentID: comment.id, - commentRevisionID: getLatestRevision(comment!).id, + // Pull the revision out. + const revision = getLatestRevision(comment); - // Store the Story ID on the action. - storyID: story.id, - }) - ); + log = log.child({ commentID: comment.id, status, revisionID: revision.id }); - // Insert and handle creating the actions. - comment = await addCommentActions(mongo, tenant, comment, inputs); - } + log.trace("comment created"); if (input.parentID) { // Push the child's ID onto the parent. @@ -107,12 +139,44 @@ export async function create( input.parentID, comment.id ); + + log.trace("pushed child comment id onto parent"); } + if (actions.length > 0) { + // Actually add the actions to the database. This will not interact with the + // counts at all. + const upsertedActions = await addCommentActions( + mongo, + tenant, + ...actions.map( + (action): CreateAction => ({ + ...action, + commentID: comment.id, + commentRevisionID: revision.id, + + // Store the Story ID on the action. + storyID: story.id, + }) + ) + ); + + log.trace({ actions: upsertedActions.length }, "added actions to comment"); + } + + // Compile the changes we want to apply to the story counts. + const storyCounts: Required> = { + // This is a new comment, so we need to increment for this status. + status: { [status]: 1 }, + // This comment is being created, so we can compute it raw from the comment + // that we created. + moderationQueue: calculateCounts(comment), + }; + + log.trace({ storyCounts }, "updating story status counts"); + // Increment the status count for the particular status on the Story. - await updateCommentStatusCount(mongo, tenant.id, story.id, { - [status]: 1, - }); + await updateStoryCounts(mongo, redis, tenant.id, story.id, storyCounts); return comment; } @@ -124,20 +188,46 @@ export type EditComment = Omit< export async function edit( mongo: Db, + redis: AugmentedRedis, tenant: Tenant, author: User, input: EditComment, req?: Request ) { - // Get the comment that we're editing. - const comment = await retrieveComment(mongo, tenant.id, input.id); - if (!comment) { + let log = logger.child({ commentID: input.id, tenantID: tenant.id }); + + // Get the comment that we're editing. This comment is considered stale, + // because it wasn't involved in the atomic transaction. + const originalStaleComment = await retrieveComment( + mongo, + tenant.id, + input.id + ); + if (!originalStaleComment) { // TODO: replace to match error returned by the models/comments.ts throw new Error("comment not found"); } + // The editable time is based on the current time, and the edit window + // length. By subtracting the current date from the edit window length, we + // get the maximum value for the `created_at` time that would be permitted + // for the comment edit to succeed. + const lastEditableCommentCreatedAt = new Date( + Date.now() - tenant.editCommentWindowLength + ); + + // Validate and potentially return with a more useful error. + validateEditable(originalStaleComment, { + authorID: author.id, + lastEditableCommentCreatedAt, + }); + // Grab the story that we'll use to check moderation pieces with. - const story = await retrieveStory(mongo, tenant.id, comment.storyID); + const story = await retrieveStory( + mongo, + tenant.id, + originalStaleComment.storyID + ); if (!story) { // TODO: (wyattjoh) return better error. throw new Error("story referenced does not exist"); @@ -152,54 +242,90 @@ export async function edit( req, }); - let editedComment = await editComment(mongo, tenant.id, { + let actionCounts = {}; + if (actions.length > 0) { + // Encode the new action counts that are going to be added to the new + // revision. + actionCounts = encodeActionCounts(...filterDuplicateActions(actions)); + } + + log.trace( + { predictedActionCounts: actionCounts }, + "associating action counts with comment" + ); + + // Perform the edit. + const result = await editComment(mongo, tenant.id, { id: input.id, authorID: author.id, body: input.body, status, metadata, - // The editable time is based on the current time, and the edit window - // length. By subtracting the current date from the edit window length, we - // get the maximum value for the `created_at` time that would be permitted - // for the comment edit to succeed. - lastEditableCommentCreatedAt: new Date( - Date.now() - tenant.editCommentWindowLength - ), + actionCounts, + lastEditableCommentCreatedAt, }); - if (!comment) { + if (!result) { // TODO: replace to match error returned by the models/comments.ts throw new Error("comment not found"); } - if (actions.length > 0) { - // The actions coming from the moderation phases didn't know the commentID - // at the time, and we didn't want the repetitive nature of adding the - // item_type each time, so this mapping function adds them! - const inputs = actions.map( - (action): CreateAction => ({ - ...action, - // Strict null check seems to have failed here... Null checking was done - // above where we errored if the comment was falsely. - commentID: comment!.id, - commentRevisionID: getLatestRevision(comment!).id, + // Pull the old/edited comments out of the edit result. + const { oldComment, editedComment, newRevision } = result; - // Store the Story ID on the action. - storyID: story.id, - }) + log = log.child({ revisionID: newRevision.id }); + + if (actions.length > 0) { + // Insert and handle creating the actions. + const upsertedActions = await addCommentActions( + mongo, + tenant, + ...actions.map( + (action): CreateAction => ({ + ...action, + commentID: editedComment.id, + commentRevisionID: newRevision.id, + storyID: story.id, + }) + ) ); - // Insert and handle creating the actions. - editedComment = await addCommentActions(mongo, tenant, comment, inputs); + log.trace( + { + actualActionCounts: encodeActionCounts(...upsertedActions), + actions: upsertedActions.length, + }, + "added actions to comment" + ); } - if (comment.status !== editedComment.status) { + // Compile the changes we want to apply to the story counts. + const storyCounts: Required> = { + // Status is updated below if it has been changed. + status: {}, + // Compute the changes in queue counts. This looks at the action counts that + // are encoded, as well as the comment status's. We however may have had the + // comment status when we grabbed the updated comment after changing the + // action counts, so we extract the action counts out of the edited comment + // and use the status from the moderation decision. + moderationQueue: calculateCountsDiff(oldComment, { + status, + actionCounts: editedComment.actionCounts, + }), + }; + + if (oldComment.status !== editedComment.status) { // Increment the status count for the particular status on the Story, and - // decrement the status on the comment's previous status. - await updateCommentStatusCount(mongo, tenant.id, story.id, { - [comment.status]: -1, - [editedComment.status]: 1, - }); + // decrement the status on the comment's previous status. The old comment + // status was only there before the atomic mutation. The new status is based + // on the moderation pipeline. + storyCounts.status[oldComment.status] = -1; + storyCounts.status[status] = 1; } + log.trace({ storyCounts }, "updating story status counts"); + + // Update the story counts as a result. + await updateStoryCounts(mongo, redis, tenant.id, story.id, storyCounts); + return editedComment; } diff --git a/src/core/server/services/comments/moderation/counts.spec.ts b/src/core/server/services/comments/moderation/counts.spec.ts new file mode 100644 index 000000000..4c4e2c170 --- /dev/null +++ b/src/core/server/services/comments/moderation/counts.spec.ts @@ -0,0 +1,194 @@ +import { GQLCOMMENT_STATUS } from "talk-server/graph/tenant/schema/__generated__/types"; +import { calculateCountsDiff } from "./counts"; + +it("allows transition from NONE to ACCEPTED", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.NONE, actionCounts: {} }, + { status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: {} } + ) + ).toEqual({ + total: -1, + queues: { + reported: 0, + pending: 0, + unmoderated: -1, + }, + }); +}); + +it("allows transition from NONE to REJECTED", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.NONE, actionCounts: {} }, + { status: GQLCOMMENT_STATUS.REJECTED, actionCounts: {} } + ) + ).toEqual({ + total: -1, + queues: { + reported: 0, + pending: 0, + unmoderated: -1, + }, + }); +}); + +it("allows transition from NONE to FLAGGED*", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.NONE, actionCounts: {} }, + { status: GQLCOMMENT_STATUS.NONE, actionCounts: { FLAG: 1 } } + ) + ).toEqual({ + total: 0, + queues: { + reported: 1, + pending: 0, + unmoderated: 0, + }, + }); +}); + +it("allows transition from FLAGGED* to ACCEPTED", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.NONE, actionCounts: { FLAG: 1 } }, + { status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: { FLAG: 1 } } + ) + ).toEqual({ + total: -1, + queues: { + reported: -1, + pending: 0, + unmoderated: -1, + }, + }); +}); + +it("allows transition from FLAGGED* to REJECTED", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.NONE, actionCounts: { FLAG: 1 } }, + { status: GQLCOMMENT_STATUS.REJECTED, actionCounts: { FLAG: 1 } } + ) + ).toEqual({ + total: -1, + queues: { + reported: -1, + pending: 0, + unmoderated: -1, + }, + }); +}); + +it("allows transition from PREMOD to ACCEPTED", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.PREMOD, actionCounts: {} }, + { status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: {} } + ) + ).toEqual({ + total: -1, + queues: { + reported: 0, + pending: -1, + unmoderated: -1, + }, + }); +}); + +it("allows transition from PREMOD to REJECTED", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.PREMOD, actionCounts: {} }, + { status: GQLCOMMENT_STATUS.REJECTED, actionCounts: {} } + ) + ).toEqual({ + total: -1, + queues: { + reported: 0, + pending: -1, + unmoderated: -1, + }, + }); +}); + +it("allows transition from SYSTEM_WITHHELD to ACCEPTED", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.SYSTEM_WITHHELD, actionCounts: {} }, + { status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: {} } + ) + ).toEqual({ + total: -1, + queues: { + reported: 0, + pending: -1, + unmoderated: -1, + }, + }); +}); + +it("allows transition from SYSTEM_WITHHELD to REJECTED", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.SYSTEM_WITHHELD, actionCounts: {} }, + { status: GQLCOMMENT_STATUS.REJECTED, actionCounts: {} } + ) + ).toEqual({ + total: -1, + queues: { + reported: 0, + pending: -1, + unmoderated: -1, + }, + }); +}); + +it("allows transition from ACCEPTED to REJECTED", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: {} }, + { status: GQLCOMMENT_STATUS.REJECTED, actionCounts: {} } + ) + ).toEqual({ + total: 0, + queues: { + reported: 0, + pending: 0, + unmoderated: 0, + }, + }); +}); + +it("allows transition from REJECTED to ACCEPTED", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.REJECTED, actionCounts: {} }, + { status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: {} } + ) + ).toEqual({ + total: 0, + queues: { + reported: 0, + pending: 0, + unmoderated: 0, + }, + }); +}); + +it("allows no transition once a comment has been flagged more than once", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.NONE, actionCounts: { FLAG: 1 } }, + { status: GQLCOMMENT_STATUS.NONE, actionCounts: { FLAG: 2 } } + ) + ).toEqual({ + total: 0, + queues: { + reported: 0, + pending: 0, + unmoderated: 0, + }, + }); +}); diff --git a/src/core/server/services/comments/moderation/counts.ts b/src/core/server/services/comments/moderation/counts.ts new file mode 100644 index 000000000..26a196281 --- /dev/null +++ b/src/core/server/services/comments/moderation/counts.ts @@ -0,0 +1,58 @@ +import { GQLCOMMENT_STATUS } from "talk-server/graph/tenant/schema/__generated__/types"; +import { decodeActionCounts } from "talk-server/models/action/comment"; +import { Comment } from "talk-server/models/comment"; +import { CommentModerationQueueCounts } from "talk-server/models/story"; + +export const UNMODERATED_STATUSES = [ + GQLCOMMENT_STATUS.NONE, + GQLCOMMENT_STATUS.PREMOD, + GQLCOMMENT_STATUS.SYSTEM_WITHHELD, +]; + +export const isUnmoderated = (comment: Pick) => + UNMODERATED_STATUSES.includes(comment.status); + +export const PENDING_STATUS = [ + GQLCOMMENT_STATUS.PREMOD, + GQLCOMMENT_STATUS.SYSTEM_WITHHELD, +]; + +export const isPending = (comment: Pick) => + PENDING_STATUS.includes(comment.status); + +export const REPORTED_STATUS = [GQLCOMMENT_STATUS.NONE]; + +export const isReported = (comment: Pick) => + decodeActionCounts(comment.actionCounts).flag.total > 0 && + REPORTED_STATUS.includes(comment.status); + +export const isQueued = (comment: Pick) => + isUnmoderated(comment) || isPending(comment) || isReported(comment); + +export const calculateCounts = ( + comment: Pick +): CommentModerationQueueCounts => ({ + total: isQueued(comment) ? 1 : 0, + queues: { + reported: isReported(comment) ? 1 : 0, + pending: isPending(comment) ? 1 : 0, + unmoderated: isUnmoderated(comment) ? 1 : 0, + }, +}); + +export const calculateCountsDiff = ( + oldComment: Pick, + newComment: Pick +): CommentModerationQueueCounts => { + const oldCounts = calculateCounts(oldComment); + const newCounts = calculateCounts(newComment); + + return { + total: newCounts.total - oldCounts.total, + queues: { + reported: newCounts.queues.reported - oldCounts.queues.reported, + pending: newCounts.queues.pending - oldCounts.queues.pending, + unmoderated: newCounts.queues.unmoderated - oldCounts.queues.unmoderated, + }, + }; +}; diff --git a/src/core/server/services/comments/moderation/index.spec.ts b/src/core/server/services/comments/moderation/index.spec.ts index 3a641fc3b..4c4e2c170 100644 --- a/src/core/server/services/comments/moderation/index.spec.ts +++ b/src/core/server/services/comments/moderation/index.spec.ts @@ -1,108 +1,194 @@ -import { - GQLCOMMENT_FLAG_REASON, - GQLCOMMENT_STATUS, -} from "talk-server/graph/tenant/schema/__generated__/types"; -import { ACTION_TYPE } from "talk-server/models/action/comment"; -import { - compose, - ModerationPhaseContext, -} from "talk-server/services/comments/moderation"; +import { GQLCOMMENT_STATUS } from "talk-server/graph/tenant/schema/__generated__/types"; +import { calculateCountsDiff } from "./counts"; -describe("compose", () => { - it("handles when a phase throws an error", async () => { - const err = new Error("this is an error"); - const enhanced = compose([ - () => { - throw err; - }, - ]); - - await expect(enhanced({} as ModerationPhaseContext)).rejects.toEqual(err); - }); - - it("handles when it returns a status", async () => { - const status = GQLCOMMENT_STATUS.ACCEPTED; - const enhanced = compose([() => ({ status })]); - - await expect(enhanced({} as ModerationPhaseContext)).resolves.toEqual({ - status, - metadata: {}, - actions: [], - }); - }); - - it("merges the metadata", async () => { - const status = GQLCOMMENT_STATUS.ACCEPTED; - const enhanced = compose([ - () => ({ metadata: { first: true } }), - () => ({ status, metadata: { second: true } }), - () => ({ metadata: { third: true } }), - ]); - - await expect(enhanced({} as ModerationPhaseContext)).resolves.toEqual({ - status, - metadata: { first: true, second: true }, - actions: [], - }); - }); - - it("merges actions", async () => { - const status = GQLCOMMENT_STATUS.ACCEPTED; - - const flags = [ - { - userID: null, - actionType: ACTION_TYPE.FLAG, - reason: GQLCOMMENT_FLAG_REASON.COMMENT_DETECTED_TOXIC, - }, - { - userID: null, - actionType: ACTION_TYPE.FLAG, - reason: GQLCOMMENT_FLAG_REASON.COMMENT_DETECTED_SPAM, - }, - ]; - - const enhanced = compose([ - () => ({ - actions: [flags[0]], - }), - () => ({ - status, - actions: [flags[1]], - }), - () => ({ - actions: [ - { - userID: null, - actionType: ACTION_TYPE.FLAG, - reason: GQLCOMMENT_FLAG_REASON.COMMENT_DETECTED_LINKS, - }, - ], - }), - ]); - - const final = await enhanced({} as ModerationPhaseContext); - - for (const flag of flags) { - expect(final.actions).toContainEqual(flag); - } - - expect(final.actions).not.toContainEqual({ - actionType: ACTION_TYPE.FLAG, - reason: GQLCOMMENT_FLAG_REASON.COMMENT_DETECTED_LINKS, - }); - }); - - it("handles when it does not return a status", async () => { - const enhanced = compose([ - () => ({ metadata: { first: true } }), - () => ({ metadata: { second: true } }), - ]); - - await expect(enhanced({} as ModerationPhaseContext)).resolves.toEqual({ - status: GQLCOMMENT_STATUS.NONE, - metadata: { first: true, second: true }, - actions: [], - }); +it("allows transition from NONE to ACCEPTED", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.NONE, actionCounts: {} }, + { status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: {} } + ) + ).toEqual({ + total: -1, + queues: { + reported: 0, + pending: 0, + unmoderated: -1, + }, + }); +}); + +it("allows transition from NONE to REJECTED", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.NONE, actionCounts: {} }, + { status: GQLCOMMENT_STATUS.REJECTED, actionCounts: {} } + ) + ).toEqual({ + total: -1, + queues: { + reported: 0, + pending: 0, + unmoderated: -1, + }, + }); +}); + +it("allows transition from NONE to FLAGGED*", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.NONE, actionCounts: {} }, + { status: GQLCOMMENT_STATUS.NONE, actionCounts: { FLAG: 1 } } + ) + ).toEqual({ + total: 0, + queues: { + reported: 1, + pending: 0, + unmoderated: 0, + }, + }); +}); + +it("allows transition from FLAGGED* to ACCEPTED", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.NONE, actionCounts: { FLAG: 1 } }, + { status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: { FLAG: 1 } } + ) + ).toEqual({ + total: -1, + queues: { + reported: -1, + pending: 0, + unmoderated: -1, + }, + }); +}); + +it("allows transition from FLAGGED* to REJECTED", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.NONE, actionCounts: { FLAG: 1 } }, + { status: GQLCOMMENT_STATUS.REJECTED, actionCounts: { FLAG: 1 } } + ) + ).toEqual({ + total: -1, + queues: { + reported: -1, + pending: 0, + unmoderated: -1, + }, + }); +}); + +it("allows transition from PREMOD to ACCEPTED", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.PREMOD, actionCounts: {} }, + { status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: {} } + ) + ).toEqual({ + total: -1, + queues: { + reported: 0, + pending: -1, + unmoderated: -1, + }, + }); +}); + +it("allows transition from PREMOD to REJECTED", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.PREMOD, actionCounts: {} }, + { status: GQLCOMMENT_STATUS.REJECTED, actionCounts: {} } + ) + ).toEqual({ + total: -1, + queues: { + reported: 0, + pending: -1, + unmoderated: -1, + }, + }); +}); + +it("allows transition from SYSTEM_WITHHELD to ACCEPTED", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.SYSTEM_WITHHELD, actionCounts: {} }, + { status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: {} } + ) + ).toEqual({ + total: -1, + queues: { + reported: 0, + pending: -1, + unmoderated: -1, + }, + }); +}); + +it("allows transition from SYSTEM_WITHHELD to REJECTED", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.SYSTEM_WITHHELD, actionCounts: {} }, + { status: GQLCOMMENT_STATUS.REJECTED, actionCounts: {} } + ) + ).toEqual({ + total: -1, + queues: { + reported: 0, + pending: -1, + unmoderated: -1, + }, + }); +}); + +it("allows transition from ACCEPTED to REJECTED", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: {} }, + { status: GQLCOMMENT_STATUS.REJECTED, actionCounts: {} } + ) + ).toEqual({ + total: 0, + queues: { + reported: 0, + pending: 0, + unmoderated: 0, + }, + }); +}); + +it("allows transition from REJECTED to ACCEPTED", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.REJECTED, actionCounts: {} }, + { status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: {} } + ) + ).toEqual({ + total: 0, + queues: { + reported: 0, + pending: 0, + unmoderated: 0, + }, + }); +}); + +it("allows no transition once a comment has been flagged more than once", () => { + expect( + calculateCountsDiff( + { status: GQLCOMMENT_STATUS.NONE, actionCounts: { FLAG: 1 } }, + { status: GQLCOMMENT_STATUS.NONE, actionCounts: { FLAG: 2 } } + ) + ).toEqual({ + total: 0, + queues: { + reported: 0, + pending: 0, + unmoderated: 0, + }, }); }); diff --git a/src/core/server/services/comments/moderation/index.ts b/src/core/server/services/comments/moderation/index.ts index f2b729dc4..194823092 100644 --- a/src/core/server/services/comments/moderation/index.ts +++ b/src/core/server/services/comments/moderation/index.ts @@ -1,91 +1,103 @@ -import { Omit, Promiseable } from "talk-common/types"; +import { Db } from "mongodb"; + +import { Omit } from "talk-common/types"; import { GQLCOMMENT_STATUS } from "talk-server/graph/tenant/schema/__generated__/types"; -import { CreateActionInput } from "talk-server/models/action/comment"; -import { EditCommentInput } from "talk-server/models/comment"; -import { Story } from "talk-server/models/story"; +import logger from "talk-server/logger"; +import { + createCommentModerationAction, + CreateCommentModerationActionInput, +} from "talk-server/models/action/moderation/comment"; +import { updateCommentStatus } from "talk-server/models/comment"; +import { updateStoryCounts } from "talk-server/models/story"; import { Tenant } from "talk-server/models/tenant"; -import { User } from "talk-server/models/user"; -import { Request } from "talk-server/types/express"; +import { AugmentedRedis } from "talk-server/services/redis"; +import { calculateCountsDiff } from "./counts"; -import { moderationPhases } from "./phases"; +export type Moderate = Omit; -export type ModerationAction = Omit< - CreateActionInput, - "commentID" | "commentRevisionID" ->; +const moderate = ( + status: GQLCOMMENT_STATUS.ACCEPTED | GQLCOMMENT_STATUS.REJECTED +) => async ( + mongo: Db, + redis: AugmentedRedis, + tenant: Tenant, + input: Moderate +) => { + // TODO: wrap these operations in a transaction? -export interface PhaseResult { - actions: ModerationAction[]; - status: GQLCOMMENT_STATUS; - metadata: Record; -} + // Create the logger. + const log = logger.child({ + ...input, + tenantID: tenant.id, + newStatus: status, + }); -export interface ModerationPhaseContext { - story: Story; - tenant: Tenant; - comment: Partial; - author: User; - req?: Request; -} - -export type ModerationPhase = ( - context: ModerationPhaseContext -) => Promiseable; - -export type IntermediatePhaseResult = Partial | void; - -export type IntermediateModerationPhase = ( - context: ModerationPhaseContext -) => Promiseable; - -/** - * compose will create a moderation pipeline for which is executable with the - * passed actions. - */ -export const compose = ( - phases: IntermediateModerationPhase[] -): ModerationPhase => async context => { - const final: PhaseResult = { - status: GQLCOMMENT_STATUS.NONE, - actions: [], - metadata: {}, - }; - - // Loop over all the moderation phases and see if we've resolved the status. - for (const phase of phases) { - const result = await phase(context); - if (result) { - // If this result contained actions, then we should push it into the - // other actions. - const { actions } = result; - if (actions) { - final.actions.push(...actions); - } - - // If this result contained metadata, then we should merge it into the - // other metadata. - const { metadata } = result; - if (metadata) { - final.metadata = { - ...final.metadata, - ...metadata, - }; - } - - // If this result contained a status, then we've finished resolving - // phases! - const { status } = result; - if (status) { - final.status = status; - break; - } - } + // Update the Comment's status. + const result = await updateCommentStatus( + mongo, + tenant.id, + input.commentID, + input.commentRevisionID, + status + ); + if (!result) { + // TODO: wrap in better error? + throw new Error("specified comment not found"); } - return final; + log.trace("updated comment status"); + + // Create the moderation action in the audit log. + const action = await createCommentModerationAction(mongo, tenant.id, { + ...input, + status, + }); + if (!action) { + // TODO: wrap in better error? + throw new Error("could not create moderation action"); + } + + log.trace( + { commentModerationActionID: action.id }, + "created the moderation action" + ); + + // Update the story comment counts. + const story = await updateStoryCounts( + mongo, + redis, + tenant.id, + result.comment.storyID, + { + // Update the comment counts. + status: { + [result.oldStatus]: -1, + [status]: 1, + }, + // Compute the queue difference as a result of the old status and the new + // status. + moderationQueue: calculateCountsDiff( + { + status: result.oldStatus, + actionCounts: result.comment.actionCounts, + }, + { + status, + actionCounts: result.comment.actionCounts, + } + ), + } + ); + if (!story) { + // TODO: wrap in better error? + throw new Error("specified story not found"); + } + + log.trace({ oldStatus: result.oldStatus }, "adjusted story comment counts"); + + return result.comment; }; -/** - * process the comment and return moderation details. - */ -export const processForModeration: ModerationPhase = compose(moderationPhases); +export const accept = moderate(GQLCOMMENT_STATUS.ACCEPTED); + +export const reject = moderate(GQLCOMMENT_STATUS.REJECTED); diff --git a/src/core/server/services/comments/pipeline/index.spec.ts b/src/core/server/services/comments/pipeline/index.spec.ts new file mode 100644 index 000000000..15545e3e1 --- /dev/null +++ b/src/core/server/services/comments/pipeline/index.spec.ts @@ -0,0 +1,108 @@ +import { + GQLCOMMENT_FLAG_REASON, + GQLCOMMENT_STATUS, +} from "talk-server/graph/tenant/schema/__generated__/types"; +import { ACTION_TYPE } from "talk-server/models/action/comment"; +import { + compose, + ModerationPhaseContext, +} from "talk-server/services/comments/pipeline"; + +describe("compose", () => { + it("handles when a phase throws an error", async () => { + const err = new Error("this is an error"); + const enhanced = compose([ + () => { + throw err; + }, + ]); + + await expect(enhanced({} as ModerationPhaseContext)).rejects.toEqual(err); + }); + + it("handles when it returns a status", async () => { + const status = GQLCOMMENT_STATUS.ACCEPTED; + const enhanced = compose([() => ({ status })]); + + await expect(enhanced({} as ModerationPhaseContext)).resolves.toEqual({ + status, + metadata: {}, + actions: [], + }); + }); + + it("merges the metadata", async () => { + const status = GQLCOMMENT_STATUS.ACCEPTED; + const enhanced = compose([ + () => ({ metadata: { first: true } }), + () => ({ status, metadata: { second: true } }), + () => ({ metadata: { third: true } }), + ]); + + await expect(enhanced({} as ModerationPhaseContext)).resolves.toEqual({ + status, + metadata: { first: true, second: true }, + actions: [], + }); + }); + + it("merges actions", async () => { + const status = GQLCOMMENT_STATUS.ACCEPTED; + + const flags = [ + { + userID: null, + actionType: ACTION_TYPE.FLAG, + reason: GQLCOMMENT_FLAG_REASON.COMMENT_DETECTED_TOXIC, + }, + { + userID: null, + actionType: ACTION_TYPE.FLAG, + reason: GQLCOMMENT_FLAG_REASON.COMMENT_DETECTED_SPAM, + }, + ]; + + const enhanced = compose([ + () => ({ + actions: [flags[0]], + }), + () => ({ + status, + actions: [flags[1]], + }), + () => ({ + actions: [ + { + userID: null, + actionType: ACTION_TYPE.FLAG, + reason: GQLCOMMENT_FLAG_REASON.COMMENT_DETECTED_LINKS, + }, + ], + }), + ]); + + const final = await enhanced({} as ModerationPhaseContext); + + for (const flag of flags) { + expect(final.actions).toContainEqual(flag); + } + + expect(final.actions).not.toContainEqual({ + actionType: ACTION_TYPE.FLAG, + reason: GQLCOMMENT_FLAG_REASON.COMMENT_DETECTED_LINKS, + }); + }); + + it("handles when it does not return a status", async () => { + const enhanced = compose([ + () => ({ metadata: { first: true } }), + () => ({ metadata: { second: true } }), + ]); + + await expect(enhanced({} as ModerationPhaseContext)).resolves.toEqual({ + status: GQLCOMMENT_STATUS.NONE, + metadata: { first: true, second: true }, + actions: [], + }); + }); +}); diff --git a/src/core/server/services/comments/pipeline/index.ts b/src/core/server/services/comments/pipeline/index.ts new file mode 100644 index 000000000..131865b28 --- /dev/null +++ b/src/core/server/services/comments/pipeline/index.ts @@ -0,0 +1,91 @@ +import { Omit, Promiseable } from "talk-common/types"; +import { GQLCOMMENT_STATUS } from "talk-server/graph/tenant/schema/__generated__/types"; +import { CreateActionInput } from "talk-server/models/action/comment"; +import { EditCommentInput } from "talk-server/models/comment"; +import { Story } from "talk-server/models/story"; +import { Tenant } from "talk-server/models/tenant"; +import { User } from "talk-server/models/user"; +import { Request } from "talk-server/types/express"; + +import { moderationPhases } from "./phases"; + +export type ModerationAction = Omit< + CreateActionInput, + "commentID" | "commentRevisionID" | "storyID" +>; + +export interface PhaseResult { + actions: ModerationAction[]; + status: GQLCOMMENT_STATUS; + metadata: Record; +} + +export interface ModerationPhaseContext { + story: Story; + tenant: Tenant; + comment: Partial; + author: User; + req?: Request; +} + +export type ModerationPhase = ( + context: ModerationPhaseContext +) => Promiseable; + +export type IntermediatePhaseResult = Partial | void; + +export type IntermediateModerationPhase = ( + context: ModerationPhaseContext +) => Promiseable; + +/** + * compose will create a moderation pipeline for which is executable with the + * passed actions. + */ +export const compose = ( + phases: IntermediateModerationPhase[] +): ModerationPhase => async context => { + const final: PhaseResult = { + status: GQLCOMMENT_STATUS.NONE, + actions: [], + metadata: {}, + }; + + // Loop over all the moderation phases and see if we've resolved the status. + for (const phase of phases) { + const result = await phase(context); + if (result) { + // If this result contained actions, then we should push it into the + // other actions. + const { actions } = result; + if (actions) { + final.actions.push(...actions); + } + + // If this result contained metadata, then we should merge it into the + // other metadata. + const { metadata } = result; + if (metadata) { + final.metadata = { + ...final.metadata, + ...metadata, + }; + } + + // If this result contained a status, then we've finished resolving + // phases! + const { status } = result; + if (status) { + final.status = status; + break; + } + } + } + + return final; +}; + +/** + * process the comment and return moderation details. + */ +export const processForModeration: ModerationPhase = compose(moderationPhases); diff --git a/src/core/server/services/comments/moderation/phases/commentLength.ts b/src/core/server/services/comments/pipeline/phases/commentLength.ts similarity index 96% rename from src/core/server/services/comments/moderation/phases/commentLength.ts rename to src/core/server/services/comments/pipeline/phases/commentLength.ts index 1b61990f1..d1eb4a7bc 100644 --- a/src/core/server/services/comments/moderation/phases/commentLength.ts +++ b/src/core/server/services/comments/pipeline/phases/commentLength.ts @@ -10,7 +10,7 @@ import { ModerationSettings } from "talk-server/models/settings"; import { IntermediateModerationPhase, IntermediatePhaseResult, -} from "talk-server/services/comments/moderation"; +} from "talk-server/services/comments/pipeline"; const testCharCount = ( settings: Partial, diff --git a/src/core/server/services/comments/moderation/phases/commentingDisabled.ts b/src/core/server/services/comments/pipeline/phases/commentingDisabled.ts similarity index 92% rename from src/core/server/services/comments/moderation/phases/commentingDisabled.ts rename to src/core/server/services/comments/pipeline/phases/commentingDisabled.ts index dc91f813b..c17cc4869 100644 --- a/src/core/server/services/comments/moderation/phases/commentingDisabled.ts +++ b/src/core/server/services/comments/pipeline/phases/commentingDisabled.ts @@ -2,7 +2,7 @@ import { ModerationSettings } from "talk-server/models/settings"; import { IntermediateModerationPhase, IntermediatePhaseResult, -} from "talk-server/services/comments/moderation"; +} from "talk-server/services/comments/pipeline"; const testDisabledCommenting = (settings: Partial) => settings.disableCommenting; diff --git a/src/core/server/services/comments/moderation/phases/index.ts b/src/core/server/services/comments/pipeline/phases/index.ts similarity index 96% rename from src/core/server/services/comments/moderation/phases/index.ts rename to src/core/server/services/comments/pipeline/phases/index.ts index 595436f39..8948d8210 100644 --- a/src/core/server/services/comments/moderation/phases/index.ts +++ b/src/core/server/services/comments/pipeline/phases/index.ts @@ -1,4 +1,4 @@ -import { IntermediateModerationPhase } from "talk-server/services/comments/moderation"; +import { IntermediateModerationPhase } from "talk-server/services/comments/pipeline"; import { commentingDisabled } from "./commentingDisabled"; import { commentLength } from "./commentLength"; diff --git a/src/core/server/services/comments/moderation/phases/karma.ts b/src/core/server/services/comments/pipeline/phases/karma.ts similarity index 96% rename from src/core/server/services/comments/moderation/phases/karma.ts rename to src/core/server/services/comments/pipeline/phases/karma.ts index 8a1e15734..38db01ee7 100755 --- a/src/core/server/services/comments/moderation/phases/karma.ts +++ b/src/core/server/services/comments/pipeline/phases/karma.ts @@ -6,7 +6,7 @@ import { ACTION_TYPE } from "talk-server/models/action/comment"; import { IntermediateModerationPhase, IntermediatePhaseResult, -} from "talk-server/services/comments/moderation"; +} from "talk-server/services/comments/pipeline"; import { getCommentTrustScore, isReliableCommenter, diff --git a/src/core/server/services/comments/moderation/phases/links.ts b/src/core/server/services/comments/pipeline/phases/links.ts similarity index 96% rename from src/core/server/services/comments/moderation/phases/links.ts rename to src/core/server/services/comments/pipeline/phases/links.ts index 13486d9ee..e6ec86952 100755 --- a/src/core/server/services/comments/moderation/phases/links.ts +++ b/src/core/server/services/comments/pipeline/phases/links.ts @@ -10,7 +10,7 @@ import { ModerationSettings } from "talk-server/models/settings"; import { IntermediateModerationPhase, IntermediatePhaseResult, -} from "talk-server/services/comments/moderation"; +} from "talk-server/services/comments/pipeline"; /** * The preloaded linkify instance with common tlds. diff --git a/src/core/server/services/comments/moderation/phases/preModerate.ts b/src/core/server/services/comments/pipeline/phases/preModerate.ts similarity index 94% rename from src/core/server/services/comments/moderation/phases/preModerate.ts rename to src/core/server/services/comments/pipeline/phases/preModerate.ts index 130112aef..9da8954bd 100755 --- a/src/core/server/services/comments/moderation/phases/preModerate.ts +++ b/src/core/server/services/comments/pipeline/phases/preModerate.ts @@ -6,7 +6,7 @@ import { ModerationSettings } from "talk-server/models/settings"; import { IntermediateModerationPhase, IntermediatePhaseResult, -} from "talk-server/services/comments/moderation"; +} from "talk-server/services/comments/pipeline"; const testModerationMode = (settings: Partial) => settings.moderation === GQLMODERATION_MODE.PRE; diff --git a/src/core/server/services/comments/moderation/phases/spam.ts b/src/core/server/services/comments/pipeline/phases/spam.ts similarity index 98% rename from src/core/server/services/comments/moderation/phases/spam.ts rename to src/core/server/services/comments/pipeline/phases/spam.ts index 90aac04c1..4f38068c5 100644 --- a/src/core/server/services/comments/moderation/phases/spam.ts +++ b/src/core/server/services/comments/pipeline/phases/spam.ts @@ -9,7 +9,7 @@ import { ACTION_TYPE } from "talk-server/models/action/comment"; import { IntermediateModerationPhase, IntermediatePhaseResult, -} from "talk-server/services/comments/moderation"; +} from "talk-server/services/comments/pipeline"; export const spam: IntermediateModerationPhase = async ({ story, diff --git a/src/core/server/services/comments/moderation/phases/staff.ts b/src/core/server/services/comments/pipeline/phases/staff.ts similarity index 90% rename from src/core/server/services/comments/moderation/phases/staff.ts rename to src/core/server/services/comments/pipeline/phases/staff.ts index 13d130df1..58c5fa73e 100755 --- a/src/core/server/services/comments/moderation/phases/staff.ts +++ b/src/core/server/services/comments/pipeline/phases/staff.ts @@ -5,7 +5,7 @@ import { import { IntermediateModerationPhase, IntermediatePhaseResult, -} from "talk-server/services/comments/moderation"; +} from "talk-server/services/comments/pipeline"; // If a given user is a staff member, always approve their comment. export const staff: IntermediateModerationPhase = ({ diff --git a/src/core/server/services/comments/moderation/phases/storyClosed.spec.ts b/src/core/server/services/comments/pipeline/phases/storyClosed.spec.ts similarity index 94% rename from src/core/server/services/comments/moderation/phases/storyClosed.spec.ts rename to src/core/server/services/comments/pipeline/phases/storyClosed.spec.ts index aa20ba2fc..c300b088a 100644 --- a/src/core/server/services/comments/moderation/phases/storyClosed.spec.ts +++ b/src/core/server/services/comments/pipeline/phases/storyClosed.spec.ts @@ -4,7 +4,7 @@ import { Comment } from "talk-server/models/comment"; import { Story } from "talk-server/models/story"; import { Tenant } from "talk-server/models/tenant"; import { User } from "talk-server/models/user"; -import { storyClosed } from "talk-server/services/comments/moderation/phases/storyClosed"; +import { storyClosed } from "talk-server/services/comments/pipeline/phases/storyClosed"; describe("storyClosed", () => { it("throws an error when the story is closed", () => { diff --git a/src/core/server/services/comments/moderation/phases/storyClosed.ts b/src/core/server/services/comments/pipeline/phases/storyClosed.ts similarity index 93% rename from src/core/server/services/comments/moderation/phases/storyClosed.ts rename to src/core/server/services/comments/pipeline/phases/storyClosed.ts index a51b44d2d..e031f221c 100644 --- a/src/core/server/services/comments/moderation/phases/storyClosed.ts +++ b/src/core/server/services/comments/pipeline/phases/storyClosed.ts @@ -1,7 +1,7 @@ import { IntermediateModerationPhase, IntermediatePhaseResult, -} from "talk-server/services/comments/moderation"; +} from "talk-server/services/comments/pipeline"; // This phase checks to see if the story being processed is closed or not. export const storyClosed: IntermediateModerationPhase = ({ diff --git a/src/core/server/services/comments/moderation/phases/toxic.ts b/src/core/server/services/comments/pipeline/phases/toxic.ts similarity index 98% rename from src/core/server/services/comments/moderation/phases/toxic.ts rename to src/core/server/services/comments/pipeline/phases/toxic.ts index 431a8d42c..0261ed699 100644 --- a/src/core/server/services/comments/moderation/phases/toxic.ts +++ b/src/core/server/services/comments/pipeline/phases/toxic.ts @@ -13,7 +13,7 @@ import { ACTION_TYPE } from "talk-server/models/action/comment"; import { IntermediateModerationPhase, IntermediatePhaseResult, -} from "talk-server/services/comments/moderation"; +} from "talk-server/services/comments/pipeline"; export const toxic: IntermediateModerationPhase = async ({ tenant, diff --git a/src/core/server/services/comments/moderation/phases/wordList.ts b/src/core/server/services/comments/pipeline/phases/wordList.ts similarity index 95% rename from src/core/server/services/comments/moderation/phases/wordList.ts rename to src/core/server/services/comments/pipeline/phases/wordList.ts index 30ffea763..e27424f22 100755 --- a/src/core/server/services/comments/moderation/phases/wordList.ts +++ b/src/core/server/services/comments/pipeline/phases/wordList.ts @@ -6,8 +6,8 @@ import { ACTION_TYPE } from "talk-server/models/action/comment"; import { IntermediateModerationPhase, IntermediatePhaseResult, -} from "talk-server/services/comments/moderation"; -import { containsMatchingPhraseMemoized } from "talk-server/services/comments/moderation/wordList"; +} from "talk-server/services/comments/pipeline"; +import { containsMatchingPhraseMemoized } from "talk-server/services/comments/pipeline/wordList"; // This phase checks the comment against the wordList. export const wordList: IntermediateModerationPhase = ({ diff --git a/src/core/server/services/comments/moderation/wordList.spec.ts b/src/core/server/services/comments/pipeline/wordList.spec.ts similarity index 97% rename from src/core/server/services/comments/moderation/wordList.spec.ts rename to src/core/server/services/comments/pipeline/wordList.spec.ts index 5e68398b1..a2b6d54c5 100644 --- a/src/core/server/services/comments/moderation/wordList.spec.ts +++ b/src/core/server/services/comments/pipeline/wordList.spec.ts @@ -1,4 +1,4 @@ -import { containsMatchingPhrase } from "talk-server/services/comments/moderation/wordList"; +import { containsMatchingPhrase } from "talk-server/services/comments/pipeline/wordList"; const phrases = [ "cookies", diff --git a/src/core/server/services/comments/moderation/wordList.ts b/src/core/server/services/comments/pipeline/wordList.ts similarity index 93% rename from src/core/server/services/comments/moderation/wordList.ts rename to src/core/server/services/comments/pipeline/wordList.ts index f4a3d7e5d..418086054 100644 --- a/src/core/server/services/comments/moderation/wordList.ts +++ b/src/core/server/services/comments/pipeline/wordList.ts @@ -22,8 +22,7 @@ export function generateRegExp(phrases: string[]) { .join('[\\s"?!.]+') ) .join("|"); - - return new RegExp(`(^|[^\\w])(${inner})(?=[^\\w]|$)`, "iu"); + return new RegExp(`(^|[^\\w])(${inner})(?=[^\\w]|$)`, "gmiu"); } export const generateRegExpMemoized = memoize(generateRegExp); diff --git a/src/core/server/services/moderation/index.ts b/src/core/server/services/moderation/index.ts deleted file mode 100644 index 185abe372..000000000 --- a/src/core/server/services/moderation/index.ts +++ /dev/null @@ -1,81 +0,0 @@ -import { Db } from "mongodb"; -import { Omit } from "talk-common/types"; -import { GQLCOMMENT_STATUS } from "talk-server/graph/tenant/schema/__generated__/types"; -import logger from "talk-server/logger"; -import { - createCommentModerationAction, - CreateCommentModerationActionInput, -} from "talk-server/models/action/moderation/comment"; -import { updateCommentStatus } from "talk-server/models/comment"; -import { updateCommentStatusCount } from "talk-server/models/story"; -import { Tenant } from "talk-server/models/tenant"; - -export type Moderate = Omit; - -const moderate = (status: GQLCOMMENT_STATUS) => async ( - mongo: Db, - tenant: Tenant, - input: Moderate -) => { - // TODO: wrap these operations in a transaction? - - // Create the logger. - const log = logger.child({ - ...input, - tenantID: tenant.id, - newStatus: status, - }); - - // Update the Comment's status. - const result = await updateCommentStatus( - mongo, - tenant.id, - input.commentID, - input.commentRevisionID, - status - ); - if (!result) { - // TODO: wrap in better error? - throw new Error("specified comment not found"); - } - - log.trace("updated comment status"); - - // Create the moderation action in the audit log. - const action = await createCommentModerationAction(mongo, tenant.id, { - ...input, - status, - }); - if (!action) { - // TODO: wrap in better error? - throw new Error("could not create moderation action"); - } - - log.trace( - { commentModerationActionID: action.id }, - "created the moderation action" - ); - - // Update the story comment counts. - const story = await updateCommentStatusCount( - mongo, - tenant.id, - result.comment.storyID, - { - [result.oldStatus]: -1, - [status]: 1, - } - ); - if (!story) { - // TODO: wrap in better error? - throw new Error("specified story not found"); - } - - log.trace({ oldStatus: result.oldStatus }, "adjusted story comment counts"); - - return result.comment; -}; - -export const accept = moderate(GQLCOMMENT_STATUS.ACCEPTED); - -export const reject = moderate(GQLCOMMENT_STATUS.REJECTED); diff --git a/src/core/server/services/redis/index.ts b/src/core/server/services/redis/index.ts index 78d8e4d0d..8fe20a0fa 100644 --- a/src/core/server/services/redis/index.ts +++ b/src/core/server/services/redis/index.ts @@ -1,11 +1,41 @@ -import RedisClient, { Redis } from "ioredis"; +import RedisClient, { Pipeline, Redis } from "ioredis"; + import { Config } from "talk-common/config"; +import { Omit } from "talk-common/types"; + +export interface AugmentedRedisCommands { + mhincrby(key: string, ...args: any[]): Promise; +} + +export type AugmentedPipeline = Pipeline & AugmentedRedisCommands; + +export type AugmentedRedis = Omit & + AugmentedRedisCommands & { + pipeline(commands?: string[][]): AugmentedPipeline; + }; + +function configureRedisClient(redis: Redis) { + // mhincrby will increment many hash values. + redis.defineCommand("mhincrby", { + numberOfKeys: 1, + lua: ` + for i = 1, #ARGV, 2 do + redis.call('HINCRBY', KEYS[1], ARGV[i], ARGV[i + 1]) + end + `, + }); +} /** * create will connect to the Redis instance identified in the configuration. * * @param config application configuration. */ -export function createRedisClient(config: Config): Redis { - return new RedisClient(config.get("redis"), {}); +export function createRedisClient(config: Config): AugmentedRedis { + const redis = new RedisClient(config.get("redis"), {}); + + // Configure the redis client for use with the custom commands. + configureRedisClient(redis); + + return redis as AugmentedRedis; } diff --git a/src/core/server/services/stories/index.ts b/src/core/server/services/stories/index.ts index faa5e8d90..cddcd3a43 100644 --- a/src/core/server/services/stories/index.ts +++ b/src/core/server/services/stories/index.ts @@ -30,9 +30,9 @@ import { retrieveManyStories, retrieveStory, Story, - updateCommentStatusCount, updateStory, updateStoryActionCounts, + updateStoryCommentStatusCount, UpdateStoryInput, } from "talk-server/models/story"; import { Tenant } from "talk-server/models/tenant"; @@ -40,6 +40,8 @@ import Task from "talk-server/queue/Task"; import { ScraperData } from "talk-server/queue/tasks/scraper"; import { scrape } from "talk-server/services/stories/scraper"; +import { AugmentedRedis } from "../redis"; + export type FindOrCreateStory = FindOrCreateStoryInput; export async function findOrCreate( @@ -155,7 +157,7 @@ export async function remove( ); log.debug({ removedComments }, "removed comments while deleting story"); - } else if (calculateTotalCommentCount(story.commentCounts) > 0) { + } else if (calculateTotalCommentCount(story.commentCounts.status) > 0) { log.warn( "attempted to remove story that has linked comments without consent for deleting comments" ); @@ -227,6 +229,7 @@ export async function update( export async function merge( mongo: Db, + redis: AugmentedRedis, tenant: Tenant, destinationID: string, sourceIDs: string[] @@ -288,27 +291,27 @@ export async function merge( // Merge the comment and action counts for all the source stories. const [, ...sourceStories] = stories; - let destinationStory = await updateCommentStatusCount( + let destinationStory = await updateStoryCommentStatusCount( mongo, + redis, tenant.id, destinationID, mergeCommentStatusCount( // We perform the type assertion here because above, we already verified // that none of the stories are null. - (sourceStories as Story[]).map(({ commentCounts }) => commentCounts) + (sourceStories as Story[]).map(({ commentCounts: { status } }) => status) ) ); const mergedActionCounts = mergeCommentActionCounts( // We perform the type assertion here because above, we already verified // that none of the stories are null. - (sourceStories as Story[]).map( - ({ commentActionCounts }) => commentActionCounts - ) + ...(sourceStories as Story[]).map(({ commentCounts: { action } }) => action) ); if (countTotalActionCounts(mergedActionCounts) > 0) { destinationStory = await updateStoryActionCounts( mongo, + redis, tenant.id, destinationID, mergedActionCounts @@ -321,7 +324,7 @@ export async function merge( } log.debug( - { commentCounts: destinationStory.commentCounts }, + { commentCounts: destinationStory.commentCounts.status }, "updated destination story with new comment counts" );