[next] Moderation Queues (#2098)

* fix: edit comment fix for reactions

* feat: Comment Queue Counts

- Removed "remove flag"
- Rearranged moderation services
- Rearranged comment counts on stories
- Added moderation queue counts to stories
- Added comments edge
- Improved Cursor/Connection types
- Improved count updators for stories

* feat: added shared comment counts and queues

- Added new AugmentedRedis type
- Added more log calls
- Update counts in shared counter as well
- Return a Query level Moderation Queue

* fix: fixed test
This commit is contained in:
Wyatt Johnson
2018-12-06 23:37:33 +00:00
committed by Kiwi
parent 5b62082693
commit 4584c3d4fb
53 changed files with 2459 additions and 783 deletions
+2 -2
View File
@@ -12334,7 +12334,7 @@
},
"handle-thing": {
"version": "1.2.5",
"resolved": "http://registry.npmjs.org/handle-thing/-/handle-thing-1.2.5.tgz",
"resolved": "https://registry.npmjs.org/handle-thing/-/handle-thing-1.2.5.tgz",
"integrity": "sha1-/Xqtcmvxpf0W38KbL3pmAdJxOcQ=",
"dev": true
},
@@ -12803,7 +12803,7 @@
},
"http-proxy-middleware": {
"version": "0.18.0",
"resolved": "http://registry.npmjs.org/http-proxy-middleware/-/http-proxy-middleware-0.18.0.tgz",
"resolved": "https://registry.npmjs.org/http-proxy-middleware/-/http-proxy-middleware-0.18.0.tgz",
"integrity": "sha512-Fs25KVMPAIIcgjMZkVHJoKg9VcXcC1C8yb9JUgeDvVXY0S/zgVIhMb+qVswDIgtJe2DfckMSY2d6TuTEutlk6Q==",
"dev": true,
"requires": {
@@ -19,6 +19,9 @@ const mutation = graphql`
editComment(input: $input) {
comment {
body
revision {
id
}
editing {
edited
}
@@ -40,6 +40,9 @@ function createTestRenderer() {
editing: {
edited: true,
},
revision: {
id: stories[0].comments.edges[0].node.revision.id,
},
},
clientMutationId: "0",
})
+2 -2
View File
@@ -2,7 +2,6 @@ import cons from "consolidate";
import cors from "cors";
import { Express } from "express";
import http from "http";
import { Redis } from "ioredis";
import { Db } from "mongodb";
import nunjucks from "nunjucks";
import path from "path";
@@ -16,6 +15,7 @@ import { handleSubscriptions } from "talk-server/graph/common/subscriptions/midd
import { Schemas } from "talk-server/graph/schemas";
import { TaskQueue } from "talk-server/queue";
import { JWTSigningConfig } from "talk-server/services/jwt";
import { AugmentedRedis } from "talk-server/services/redis";
import TenantCache from "talk-server/services/tenant/cache";
import { accessLogger, errorLogger } from "./middleware/logging";
@@ -27,7 +27,7 @@ export interface AppOptions {
queue: TaskQueue;
config: Config;
mongo: Db;
redis: Redis;
redis: AugmentedRedis;
schemas: Schemas;
signingConfig: JWTSigningConfig;
tenantCache: TenantCache;
@@ -1,15 +1,15 @@
import { RequestHandler } from "express-jwt";
import { Redis } from "ioredis";
import { Db } from "mongodb";
import { Config } from "talk-common/config";
import TenantContext from "talk-server/graph/tenant/context";
import { TaskQueue } from "talk-server/queue";
import { AugmentedRedis } from "talk-server/services/redis";
import { Request } from "talk-server/types/express";
export interface TenantContextMiddlewareOptions {
mongo: Db;
redis: Redis;
redis: AugmentedRedis;
queue: TaskQueue;
config: Config;
}
@@ -4,6 +4,7 @@ import Joi from "joi";
import jwt from "jsonwebtoken";
import { Db } from "mongodb";
import passport, { Authenticator } from "passport";
import now from "performance-now";
import { Config } from "talk-common/config";
import FacebookStrategy from "talk-server/app/middleware/passport/strategies/facebook";
@@ -12,6 +13,7 @@ import { JWTStrategy } from "talk-server/app/middleware/passport/strategies/jwt"
import { createLocalStrategy } from "talk-server/app/middleware/passport/strategies/local";
import OIDCStrategy from "talk-server/app/middleware/passport/strategies/oidc";
import { validate } from "talk-server/app/request/body";
import logger from "talk-server/logger";
import { User } from "talk-server/models/user";
import {
blacklistJWT,
@@ -149,11 +151,18 @@ export const wrapAuthn = (
signingConfig: JWTSigningConfig,
name: string,
options?: any
): RequestHandler => (req: Request, res, next) =>
): RequestHandler => (req: Request, res, next) => {
const startTime = now();
authenticator.authenticate(
name,
{ ...options, session: false },
(err: Error | null, user: User | null) => {
// Compute the end time.
const responseTime = Math.round(now() - startTime);
logger.debug({ responseTime }, "user token generated");
if (err) {
return next(err);
}
@@ -165,3 +174,4 @@ export const wrapAuthn = (
handleSuccessfulLogin(user, signingConfig, req, res, next);
}
)(req, res, next);
};
@@ -1,7 +1,9 @@
import { Redis } from "ioredis";
import jwt from "jsonwebtoken";
import { Db } from "mongodb";
import now from "performance-now";
import logger from "talk-server/logger";
import { Tenant } from "talk-server/models/tenant";
import { retrieveUser } from "talk-server/models/user";
import { checkBlacklistJWT, JWTSigningConfig } from "talk-server/services/jwt";
@@ -46,12 +48,19 @@ export class JWTVerifier {
}
public async verify(tokenString: string, token: JWTToken, tenant: Tenant) {
const startTime = now();
// Verify that the token is valid. This will throw an error if it isn't.
jwt.verify(tokenString, this.signingConfig.secret, {
issuer: tenant.id,
algorithms: [this.signingConfig.algorithm],
});
// Compute the end time.
const responseTime = Math.round(now() - startTime);
logger.trace({ responseTime }, "jwt verification complete");
// Check to see if the token has been blacklisted, as these tokens can be
// revoked.
await checkBlacklistJWT(this.redis, token.jti);
+4 -4
View File
@@ -1,20 +1,20 @@
import { Redis } from "ioredis";
import { Db } from "mongodb";
import { Config } from "talk-common/config";
import CommonContext from "talk-server/graph/common/context";
import { Tenant } from "talk-server/models/tenant";
import { User } from "talk-server/models/user";
import { TaskQueue } from "talk-server/queue";
import { AugmentedRedis } from "talk-server/services/redis";
import TenantCache from "talk-server/services/tenant/cache";
import { Request } from "talk-server/types/express";
import { Config } from "talk-common/config";
import loaders from "./loaders";
import mutators from "./mutators";
export interface TenantContextOptions {
mongo: Db;
redis: Redis;
redis: AugmentedRedis;
tenant: Tenant;
tenantCache: TenantCache;
queue: TaskQueue;
@@ -28,7 +28,7 @@ export default class TenantContext extends CommonContext {
public readonly tenantCache: TenantCache;
public readonly user?: User;
public readonly mongo: Db;
public readonly redis: Redis;
public readonly redis: AugmentedRedis;
public readonly queue: TaskQueue;
public readonly loaders: ReturnType<typeof loaders>;
public readonly mutators: ReturnType<typeof mutators>;
@@ -6,11 +6,13 @@ import {
CommentToRepliesArgs,
GQLActionPresence,
GQLCOMMENT_SORT,
QueryToCommentsArgs,
StoryToCommentsArgs,
} from "talk-server/graph/tenant/schema/__generated__/types";
import { retrieveManyUserActionPresence } from "talk-server/models/action/comment";
import {
Comment,
retrieveCommentConnection,
retrieveCommentParentsConnection,
retrieveCommentRepliesConnection,
retrieveCommentStoryConnection,
@@ -40,6 +42,13 @@ export default (ctx: Context) => ({
comment: new DataLoader((ids: string[]) =>
retrieveManyComments(ctx.mongo, ctx.tenant.id, ids)
),
forFilter: ({ first = 10, after, filter }: QueryToCommentsArgs) =>
retrieveCommentConnection(ctx.mongo, ctx.tenant.id, {
first,
after,
orderBy: GQLCOMMENT_SORT.CREATED_AT_DESC,
filter,
}).then(primeCommentsFromConnection(ctx)),
retrieveMyActionPresence: new DataLoader<string, GQLActionPresence>(
(commentIDs: string[]) =>
retrieveManyUserActionPresence(
@@ -63,7 +72,7 @@ export default (ctx: Context) => ({
first,
orderBy,
after,
}),
}).then(primeCommentsFromConnection(ctx)),
forStory: (
storyID: string,
// Apply the graph schema defaults at the loader.
@@ -1,5 +1,5 @@
import TenantContext from "talk-server/graph/tenant/context";
import { accept, reject } from "talk-server/services/moderation";
import { accept, reject } from "talk-server/services/comments/moderation";
import {
GQLAcceptCommentInput,
GQLRejectCommentInput,
@@ -7,13 +7,13 @@ import {
export const Actions = (ctx: TenantContext) => ({
acceptComment: (input: GQLAcceptCommentInput) =>
accept(ctx.mongo, ctx.tenant, {
accept(ctx.mongo, ctx.redis, ctx.tenant, {
commentID: input.commentID,
commentRevisionID: input.commentRevisionID,
moderatorID: ctx.user!.id,
}),
rejectComment: (input: GQLRejectCommentInput) =>
reject(ctx.mongo, ctx.tenant, {
reject(ctx.mongo, ctx.redis, ctx.tenant, {
commentID: input.commentID,
commentRevisionID: input.commentRevisionID,
moderatorID: ctx.user!.id,
@@ -7,7 +7,6 @@ import {
GQLCreateCommentReplyInput,
GQLEditCommentInput,
GQLRemoveCommentDontAgreeInput,
GQLRemoveCommentFlagInput,
GQLRemoveCommentReactionInput,
} from "talk-server/graph/tenant/schema/__generated__/types";
import { create, edit } from "talk-server/services/comments";
@@ -16,7 +15,6 @@ import {
createFlag,
createReaction,
removeDontAgree,
removeFlag,
removeReaction,
} from "talk-server/services/comments/actions";
@@ -27,6 +25,7 @@ export const Comment = (ctx: TenantContext) => ({
}: GQLCreateCommentInput | GQLCreateCommentReplyInput) =>
create(
ctx.mongo,
ctx.redis,
ctx.tenant,
ctx.user!,
{ authorID: ctx.user!.id, ...comment },
@@ -35,6 +34,7 @@ export const Comment = (ctx: TenantContext) => ({
edit: ({ commentID, body }: GQLEditCommentInput) =>
edit(
ctx.mongo,
ctx.redis,
ctx.tenant,
ctx.user!,
{
@@ -47,24 +47,24 @@ export const Comment = (ctx: TenantContext) => ({
commentID,
commentRevisionID,
}: GQLCreateCommentReactionInput) =>
createReaction(ctx.mongo, ctx.tenant, ctx.user!, {
createReaction(ctx.mongo, ctx.redis, ctx.tenant, ctx.user!, {
commentID,
commentRevisionID,
}),
removeReaction: ({ commentID }: GQLRemoveCommentReactionInput) =>
removeReaction(ctx.mongo, ctx.tenant, ctx.user!, {
removeReaction(ctx.mongo, ctx.redis, ctx.tenant, ctx.user!, {
commentID,
}),
createDontAgree: ({
commentID,
commentRevisionID,
}: GQLCreateCommentDontAgreeInput) =>
createDontAgree(ctx.mongo, ctx.tenant, ctx.user!, {
createDontAgree(ctx.mongo, ctx.redis, ctx.tenant, ctx.user!, {
commentID,
commentRevisionID,
}),
removeDontAgree: ({ commentID }: GQLRemoveCommentDontAgreeInput) =>
removeDontAgree(ctx.mongo, ctx.tenant, ctx.user!, {
removeDontAgree(ctx.mongo, ctx.redis, ctx.tenant, ctx.user!, {
commentID,
}),
createFlag: ({
@@ -72,13 +72,9 @@ export const Comment = (ctx: TenantContext) => ({
commentRevisionID,
reason,
}: GQLCreateCommentFlagInput) =>
createFlag(ctx.mongo, ctx.tenant, ctx.user!, {
createFlag(ctx.mongo, ctx.redis, ctx.tenant, ctx.user!, {
commentID,
commentRevisionID,
reason,
}),
removeFlag: ({ commentID }: GQLRemoveCommentFlagInput) =>
removeFlag(ctx.mongo, ctx.tenant, ctx.user!, {
commentID,
}),
});
@@ -30,7 +30,13 @@ export const Story = (ctx: TenantContext) => ({
merge: async (
input: GQLMergeStoriesInput
): Promise<Readonly<story.Story> | null> =>
merge(ctx.mongo, ctx.tenant, input.destinationID, input.sourceIDs),
merge(
ctx.mongo,
ctx.redis,
ctx.tenant,
input.destinationID,
input.sourceIDs
),
remove: async (
input: GQLRemoveStoryInput
): Promise<Readonly<story.Story> | null> =>
@@ -0,0 +1,25 @@
import {
CommentConnectionInput,
retrieveCommentConnection,
} from "talk-server/models/comment";
import {
GQLCOMMENT_SORT,
GQLModerationQueueTypeResolver,
} from "../schema/__generated__/types";
export interface ModerationQueueInput {
connection: Partial<CommentConnectionInput>;
count: number;
}
export const ModerationQueue: GQLModerationQueueTypeResolver<
ModerationQueueInput
> = {
comments: ({ connection }, { first = 10, after }, { mongo, tenant }) =>
retrieveCommentConnection(mongo, tenant.id, {
...connection,
first,
after,
orderBy: GQLCOMMENT_SORT.CREATED_AT_DESC,
}),
};
@@ -0,0 +1,54 @@
import { CommentConnectionInput } from "talk-server/models/comment";
import { FilterQuery } from "talk-server/models/query";
import { CommentModerationCountsPerQueue } from "talk-server/models/story";
import {
PENDING_STATUS,
REPORTED_STATUS,
UNMODERATED_STATUSES,
} from "talk-server/services/comments/moderation/counts";
import { GQLModerationQueuesTypeResolver } from "../schema/__generated__/types";
import { ModerationQueueInput } from "./ModerationQueue";
export interface ModerationQueuesInput {
connection: Partial<CommentConnectionInput>;
counts: CommentModerationCountsPerQueue;
}
const mergeModerationInputFilters = (
filter: FilterQuery<Comment>,
selector: keyof CommentModerationCountsPerQueue
) => (input: ModerationQueuesInput): ModerationQueueInput => ({
connection: {
...input.connection,
filter: {
...input.connection.filter,
...filter,
},
},
count: input.counts[selector],
});
export const ModerationQueues: GQLModerationQueuesTypeResolver<
ModerationQueuesInput
> = {
unmoderated: mergeModerationInputFilters(
{
status: { $in: UNMODERATED_STATUSES },
},
"unmoderated"
),
reported: mergeModerationInputFilters(
{
status: { $in: REPORTED_STATUS },
"actionCounts.FLAG": { $gt: 0 },
},
"reported"
),
pending: mergeModerationInputFilters(
{
status: { $in: PENDING_STATUS },
},
"pending"
),
};
@@ -49,10 +49,6 @@ export const Mutation: Required<GQLMutationTypeResolver<void>> = {
comment: await ctx.mutators.Comment.createFlag(input),
clientMutationId: input.clientMutationId,
}),
removeCommentFlag: async (source, { input }, ctx) => ({
comment: await ctx.mutators.Comment.removeFlag(input),
clientMutationId: input.clientMutationId,
}),
regenerateSSOKey: async (source, { input }, ctx) => ({
settings: await ctx.mutators.Settings.regenerateSSOKey(),
clientMutationId: input.clientMutationId,
@@ -1,13 +1,30 @@
import { GQLQueryTypeResolver } from "talk-server/graph/tenant/schema/__generated__/types";
import { retrieveSharedModerationQueueQueuesCounts } from "talk-server/models/story";
import { ModerationQueuesInput } from "./ModerationQueues";
export const Query: GQLQueryTypeResolver<void> = {
story: (source, args, ctx) => ctx.loaders.Stories.findOrCreate(args),
comment: (source, { id }, ctx) =>
id ? ctx.loaders.Comments.comment.load(id) : null,
comments: (source, args, ctx) => ctx.loaders.Comments.forFilter(args),
settings: (source, args, ctx) => ctx.tenant,
me: (source, args, ctx) => ctx.user,
discoverOIDCConfiguration: (source, { issuer }, ctx) =>
ctx.loaders.Auth.discoverOIDCConfiguration.load(issuer),
debugScrapeStoryMetadata: (source, { url }, ctx) =>
ctx.loaders.Stories.debugScrapeMetadata.load(url),
moderationQueues: async (
source,
args,
ctx
): Promise<ModerationQueuesInput> => ({
// We don't need to filter the connection, as this is tenant wide (tenant
// filtering is completed at the model layer).
connection: {},
counts: await retrieveSharedModerationQueueQueuesCounts(
ctx.mongo,
ctx.redis,
ctx.tenant.id
),
}),
};
@@ -3,6 +3,7 @@ import { DateTime } from "luxon";
import { GQLStoryTypeResolver } from "talk-server/graph/tenant/schema/__generated__/types";
import { decodeActionCounts } from "talk-server/models/action/comment";
import * as story from "talk-server/models/story";
import { ModerationQueuesInput } from "./ModerationQueues";
export const Story: GQLStoryTypeResolver<story.Story> = {
comments: (s, input, ctx) => ctx.loaders.Comments.forStory(s.id, input),
@@ -20,6 +21,16 @@ export const Story: GQLStoryTypeResolver<story.Story> = {
return null;
},
commentActionCounts: s => decodeActionCounts(s.commentActionCounts),
commentCounts: s => s.commentCounts,
commentActionCounts: s => decodeActionCounts(s.commentCounts.action),
commentCounts: s => s.commentCounts.status,
moderationQueues: (s): ModerationQueuesInput => ({
connection: {
filter: {
// This moderationQueues is being sourced from the Story, so require
// that all the comments for theses queues are also for this Story.
storyID: s.id,
},
},
counts: s.commentCounts.moderationQueue.queues,
}),
};
@@ -10,6 +10,8 @@ import { CommentModerationAction } from "./CommentModerationAction";
import { CommentRevision } from "./CommentRevision";
import { FacebookAuthIntegration } from "./FacebookAuthIntegration";
import { GoogleAuthIntegration } from "./GoogleAuthIntegration";
import { ModerationQueue } from "./ModerationQueue";
import { ModerationQueues } from "./ModerationQueues";
import { Mutation } from "./Mutation";
import { OIDCAuthIntegration } from "./OIDCAuthIntegration";
import { Profile } from "./Profile";
@@ -25,6 +27,8 @@ const Resolvers: GQLResolver = {
CommentRevision,
Cursor,
Mutation,
ModerationQueue,
ModerationQueues,
OIDCAuthIntegration,
FacebookAuthIntegration,
GoogleAuthIntegration,
@@ -240,6 +240,50 @@ type WordList {
suspect: [String!]!
}
################################################################################
## Moderation
################################################################################
"""
ModerationQueue returns the Comments associated with a Moderation Queue.
"""
type ModerationQueue {
"""
count will return the number of Comments that are in this queue.
"""
count: Int!
"""
comments are the comments on the ModerationQueue.
"""
comments(first: Int = 10, after: Cursor): CommentsConnection!
}
"""
ModerationQueues are the list of ModerationQueue's that are supported inside
Talk that can be used to moderate Comments.
"""
type ModerationQueues {
"""
unmoderated will return a ModerationQueue for all Comments that have not been
moderated yet.
"""
unmoderated: ModerationQueue!
"""
reported will return a ModerationQueue for all Comments that have been
published, have not been moderated by a human yet, and have been reported by
a User via a flag.
"""
reported: ModerationQueue!
"""
pending will return a ModerationQueue for all Comments that were held back by
the system and require moderation in order to be published.
"""
pending: ModerationQueue!
}
################################################################################
## Auth
################################################################################
@@ -1249,7 +1293,7 @@ type PageInfo {
}
"""
CommentEdge represents a unique Comment in a CommentConnection.
CommentEdge represents a unique Comment in a CommentsConnection.
"""
type CommentEdge {
"""
@@ -1415,6 +1459,12 @@ type Story {
"""
commentActionCounts: ActionCounts! @auth(roles: [ADMIN, MODERATOR])
"""
moderationQueues returns the set of ModerationQueues that are available for
this Story.
"""
moderationQueues: ModerationQueues @auth(roles: [ADMIN, MODERATOR])
"""
closedAt is the Time that the Story is closed for commenting.
"""
@@ -1466,6 +1516,23 @@ type StoriesConnection {
pageInfo: PageInfo!
}
################################################################################
## CommentsFilterInput
################################################################################
input CommentsFilterInput {
"""
storyID when specified, will filter to show only Comment's on that Story.
"""
storyID: ID
"""
status when specified, will filter to show only Comment's with that
COMMENT_STATUS.
"""
status: COMMENT_STATUS
}
################################################################################
## Query
################################################################################
@@ -1476,6 +1543,15 @@ type Query {
"""
comment(id: ID!): Comment
"""
comments returns a filtered comments connection that can be paginated.
"""
comments(
first: Int = 10
after: Cursor
filter: CommentsFilterInput!
): CommentsConnection @auth(roles: [ADMIN, MODERATOR])
"""
story is the Story specified by its ID/URL.
"""
@@ -1510,6 +1586,12 @@ type Query {
on this scrape.
"""
debugScrapeStoryMetadata(url: String!): StoryMetadata @auth(roles: [ADMIN])
"""
moderationQueues returns the set of ModerationQueues that are available for
all stories.
"""
moderationQueues: ModerationQueues @auth(roles: [ADMIN, MODERATOR])
}
################################################################################
@@ -2233,34 +2315,6 @@ type CreateCommentFlagPayload {
clientMutationId: String!
}
##################
## removeCommentFlag
##################
input RemoveCommentFlagInput {
"""
commentID is the Comment's ID that we want to remove a Flag on.
"""
commentID: ID!
"""
clientMutationId is required for Relay support.
"""
clientMutationId: String!
}
type RemoveCommentFlagPayload {
"""
comment is the Comment that the Flag was removed on.
"""
comment: Comment
"""
clientMutationId is required for Relay support.
"""
clientMutationId: String!
}
##################
## regenerateSSOKey
##################
@@ -2936,13 +2990,6 @@ type Mutation {
createCommentFlag(input: CreateCommentFlagInput!): CreateCommentFlagPayload
@auth
"""
removeCommentFlag will create a Flag authored by the current logged in User on
a given Comment.
"""
removeCommentFlag(input: RemoveCommentFlagInput!): RemoveCommentFlagPayload
@auth
"""
createStory will create the provided Story.
"""
@@ -2,8 +2,10 @@ import { GQLCOMMENT_FLAG_REASON } from "talk-server/graph/tenant/schema/__genera
import {
ACTION_TYPE,
CommentAction,
CreateActionInput,
decodeActionCounts,
encodeActionCounts,
filterDuplicateActions,
validateAction,
} from "talk-server/models/action/comment";
@@ -119,3 +121,36 @@ describe("#validateAction", () => {
}
});
});
describe("#filterDuplicateActions", () => {
it("removes duplicate action items", () => {
const actions: CreateActionInput[] = [
{
storyID: "1",
actionType: ACTION_TYPE.FLAG,
reason: GQLCOMMENT_FLAG_REASON.COMMENT_DETECTED_BANNED_WORD,
commentID: "1",
commentRevisionID: "1",
userID: null,
},
{
storyID: "1",
actionType: ACTION_TYPE.FLAG,
reason: GQLCOMMENT_FLAG_REASON.COMMENT_DETECTED_BANNED_WORD,
commentID: "1",
commentRevisionID: "1",
userID: null,
},
{
storyID: "1",
actionType: ACTION_TYPE.FLAG,
reason: GQLCOMMENT_FLAG_REASON.COMMENT_DETECTED_SUSPECT_WORD,
commentID: "1",
commentRevisionID: "1",
userID: null,
},
];
expect(filterDuplicateActions(actions)).toHaveLength(2);
});
});
+15 -9
View File
@@ -1,5 +1,5 @@
import Joi from "joi";
import { camelCase, pick } from "lodash";
import { camelCase, isEqual, omit, pick, uniqWith } from "lodash";
import { Db } from "mongodb";
import uuid from "uuid";
@@ -76,11 +76,9 @@ export interface CommentAction extends TenantResource {
reason?: FLAG_REASON;
/**
* storyID represents the identifier for the item's associated item. In
* the case of a REACTION left on a Comment, this ID would be the Stories ID.
* In the case of a FLAG left on a User, this ID would be null.
* storyID represents the ID of the Story where the comment was left on.
*/
storyID?: string;
storyID: string;
/**
* userID is the ID of the User that left this Action. In the event that the
@@ -159,6 +157,12 @@ export interface CreateActionResultObject {
wasUpserted: boolean;
}
export function filterDuplicateActions<T extends {}>(actions: T[]): T[] {
return uniqWith(actions, (first, second) =>
isEqual(omit(first, "metadata"), omit(second, "metadata"))
);
}
export async function createAction(
mongo: Db,
tenantID: string,
@@ -351,7 +355,7 @@ export const ACTION_COUNT_JOIN_CHAR = "__";
* @param actions list of actions to generate the action counts from
*/
export function encodeActionCounts(
...actions: CommentAction[]
...actions: Array<Pick<CommentAction, "actionType" | "reason">>
): EncodedCommentActionCounts {
const actionCounts: EncodedCommentActionCounts = {};
@@ -394,7 +398,9 @@ export function invertEncodedActionCounts(
* encodeActionCountKeys encodes the action into string keys which represents
* the groupings as seen in `EncodedActionCounts`.
*/
function encodeActionCountKeys(action: CommentAction): string[] {
function encodeActionCountKeys(
action: Pick<CommentAction, "actionType" | "reason">
): string[] {
const keys = [action.actionType as string];
if (action.reason) {
keys.push(
@@ -495,7 +501,7 @@ function createEmptyActionCounts(): GQLActionCounts {
}
export function mergeCommentActionCounts(
actionCounts: EncodedCommentActionCounts[]
...actionCounts: EncodedCommentActionCounts[]
): EncodedCommentActionCounts {
const mergedActionCounts: EncodedCommentActionCounts = {};
@@ -504,7 +510,7 @@ export function mergeCommentActionCounts(
if (key in mergedActionCounts) {
mergedActionCounts[key] += count;
} else {
mergedActionCounts[key] += 1;
mergedActionCounts[key] = count;
}
}
}
@@ -5,7 +5,7 @@ import { Omit, Sub } from "talk-common/types";
import { GQLCOMMENT_STATUS } from "talk-server/graph/tenant/schema/__generated__/types";
import {
Connection,
Cursor,
ConnectionInput,
getPageInfo,
nodesToEdges,
} from "talk-server/models/connection";
@@ -107,16 +107,14 @@ export async function retrieveCommentModerationActions(
return result.toArray();
}
export interface ConnectionInput {
first: number;
after?: Cursor;
filter?: CommentModerationActionFilter;
}
export type CommentModerationActionConnectionInput = ConnectionInput<
CommentModerationAction
>;
export async function retrieveCommentModerationActionConnection(
mongo: Db,
tenantID: string,
input: ConnectionInput
input: CommentModerationActionConnectionInput
): Promise<Readonly<Connection<Readonly<CommentModerationAction>>>> {
// Create the query.
const query = new Query(collection(mongo)).where({ tenantID });
@@ -130,7 +128,7 @@ export async function retrieveCommentModerationActionConnection(
}
async function retrieveConnection(
input: ConnectionInput,
input: CommentModerationActionConnectionInput,
query: Query<CommentModerationAction>
): Promise<Readonly<Connection<Readonly<CommentModerationAction>>>> {
// Apply the cursor to the query. Currently only supporting sorting by the
+205 -112
View File
@@ -1,3 +1,4 @@
import { isEmpty, merge } from "lodash";
import { Db } from "mongodb";
import uuid from "uuid";
@@ -7,20 +8,23 @@ import {
GQLCOMMENT_SORT,
GQLCOMMENT_STATUS,
} from "talk-server/graph/tenant/schema/__generated__/types";
import { EncodedCommentActionCounts } from "talk-server/models/action/comment";
import {
EncodedCommentActionCounts,
mergeCommentActionCounts,
} from "talk-server/models/action/comment";
import {
Connection,
createConnection,
Cursor,
getPageInfo,
nodesToEdges,
NodeToCursorTransformer,
OrderedConnectionInput,
} from "talk-server/models/connection";
import Query from "talk-server/models/query";
import { TenantResource } from "talk-server/models/tenant";
function collection(db: Db) {
return db.collection<Readonly<Comment>>("comments");
function collection(mongo: Db) {
return mongo.collection<Readonly<Comment>>("comments");
}
/**
@@ -143,23 +147,24 @@ export type CreateCommentInput = Omit<
| "actionCounts"
| "revisions"
> &
Required<Pick<Revision, "body">>;
Required<Pick<Revision, "body">> &
Partial<Pick<Comment, "actionCounts">>;
export async function createComment(
db: Db,
mongo: Db,
tenantID: string,
input: CreateCommentInput
) {
const createdAt = new Date();
// Pull out some useful properties from the input.
const { body, ...rest } = input;
const { body, actionCounts = {}, ...rest } = input;
// Generate the revision.
const revision: Revision = {
id: uuid.v4(),
body,
actionCounts: {},
actionCounts,
createdAt,
};
@@ -168,21 +173,24 @@ export async function createComment(
const defaults: Sub<Comment, CreateCommentInput> = {
id: uuid.v4(),
tenantID,
createdAt,
replyIDs: [],
replyCount: 0,
actionCounts: {},
revisions: [revision],
createdAt,
};
// Merge the defaults and the input together.
const comment: Readonly<Comment> = {
// Defaults for things that always stay the same, or are computed.
...defaults,
// Rest for things that are passed in and are not actionCounts.
...rest,
// ActionCounts because they may be passed in!
actionCounts,
};
// Insert it into the database.
await collection(db).insertOne(comment);
await collection(mongo).insertOne(comment);
return comment;
}
@@ -222,22 +230,70 @@ export type EditCommentInput = Pick<
* `editCommentWindowLength` property.
*/
lastEditableCommentCreatedAt: Date;
} & Required<Pick<Revision, "body">>;
} & Required<Pick<Revision, "body">> &
Partial<Pick<Comment, "actionCounts">>;
// Only comments with the following status's can be edited.
const EDITABLE_STATUSES = [
GQLCOMMENT_STATUS.NONE,
GQLCOMMENT_STATUS.PREMOD,
GQLCOMMENT_STATUS.ACCEPTED,
];
export function validateEditable(
comment: Comment,
{
authorID,
lastEditableCommentCreatedAt,
}: Pick<EditCommentInput, "authorID" | "lastEditableCommentCreatedAt">
) {
if (comment.authorID !== authorID) {
// TODO: (wyattjoh) return better error
throw new Error("comment author mismatch");
}
// Check to see if the comment had a status that was editable.
if (!EDITABLE_STATUSES.includes(comment.status)) {
// TODO: (wyattjoh) return better error
throw new Error("comment status is not editable");
}
// Check to see if the edit window expired.
if (comment.createdAt <= lastEditableCommentCreatedAt) {
// TODO: (wyattjoh) return better error
throw new Error("edit window expired");
}
}
export interface EditComment {
/**
* oldComment is the Comment that was previously set.
*/
oldComment: Comment;
/**
* editedComment is the Comment after the edit was performed.
*/
editedComment: Comment;
/**
* newRevision returns the new revision that was created in the Comment.
*/
newRevision: Revision;
}
/**
* editComment will edit a comment if it's within the time allotment.
*
* @param mongo MongoDB database handle
* @param tenantID ID for the Tenant where the Comment exists
* @param input input for editing the comment
*/
export async function editComment(
db: Db,
mongo: Db,
tenantID: string,
input: EditCommentInput
) {
// TODO: (wyattjoh) now that we have revisions, do we really have this restriction?
// Only comments with the following status's can be edited.
const EDITABLE_STATUSES = [
GQLCOMMENT_STATUS.NONE,
GQLCOMMENT_STATUS.PREMOD,
GQLCOMMENT_STATUS.ACCEPTED,
];
): Promise<EditComment> {
const createdAt = new Date();
const {
@@ -247,17 +303,35 @@ export async function editComment(
status,
authorID,
metadata,
actionCounts = {},
} = input;
// Generate the revision.
const revision: Revision = {
id: uuid.v4(),
body,
actionCounts: {},
actionCounts,
createdAt,
};
const result = await collection(db).findOneAndUpdate(
const update: Record<string, any> = {
$set: {
status,
// Embed all the metadata properties, this may override the existing
// metadata, but we won't replace metadata that has been recalculated.
// TODO: (wyattjoh) consider if we want to replace the metadata for edited comments instead of supplementing it
...dotize({ metadata }),
},
$push: {
revisions: revision,
},
};
if (!isEmpty(actionCounts)) {
// Action counts are being provided! Increment the base action counts too!
update.$inc = dotize({ actionCounts });
}
const result = await collection(mongo).findOneAndUpdate(
{
id,
tenantID,
@@ -270,64 +344,59 @@ export async function editComment(
$gt: lastEditableCommentCreatedAt,
},
},
update,
{
$set: {
status,
// Embed all the metadata properties, this may override the existing
// metadata, but we won't replace metadata that has been recalculated.
// TODO: (wyattjoh) consider if we want to replace the metadata for edited comments instead of supplementing it
...dotize({ metadata }),
},
$push: {
revisions: revision,
},
},
// False to return the updated document instead of the original
// document.
{ returnOriginal: false }
// True to return the original document instead of the updated document.
returnOriginal: true,
}
);
if (!result.value) {
// Try to get the comment.
const comment = await retrieveComment(db, tenantID, id);
const comment = await retrieveComment(mongo, tenantID, id);
if (!comment) {
// TODO: (wyattjoh) return better error
throw new Error("comment not found");
}
if (comment.authorID !== authorID) {
// TODO: (wyattjoh) return better error
throw new Error("comment author mismatch");
}
// Check to see if the comment had a status that was editable.
if (!EDITABLE_STATUSES.includes(comment.status)) {
// TODO: (wyattjoh) return better error
throw new Error("comment status is not editable");
}
// Check to see if the edit window expired.
if (comment.createdAt <= lastEditableCommentCreatedAt) {
// TODO: (wyattjoh) return better error
throw new Error("edit window expired");
}
// Validate and potentially return with a more useful error.
validateEditable(comment, input);
// TODO: (wyattjoh) return better error
throw new Error("comment edit failed for an unexpected reason");
}
return result.value;
// Create a new "editedComment" where the same changes were applied to it as
// we did to the MongoDB document.
const editedComment: Comment = merge({}, result.value, {
// Add in all the $set operations.
status,
metadata,
// Merge the actionCounts from the old Comment with the new actionCounts.
actionCounts: mergeCommentActionCounts(
result.value.actionCounts,
actionCounts
),
// Add in the $push operations.
revisions: [...result.value.revisions, revision],
});
return {
oldComment: result.value,
editedComment,
newRevision: revision,
};
}
export async function retrieveComment(db: Db, tenantID: string, id: string) {
return collection(db).findOne({ id, tenantID });
export async function retrieveComment(mongo: Db, tenantID: string, id: string) {
return collection(mongo).findOne({ id, tenantID });
}
export async function retrieveManyComments(
db: Db,
mongo: Db,
tenantID: string,
ids: string[]
) {
const cursor = await collection(db).find({
const cursor = await collection(mongo).find({
id: {
$in: ids,
},
@@ -339,14 +408,13 @@ export async function retrieveManyComments(
return ids.map(id => comments.find(comment => comment.id === id) || null);
}
export interface ConnectionInput {
first: number;
orderBy: GQLCOMMENT_SORT;
after?: Cursor;
}
export type CommentConnectionInput = OrderedConnectionInput<
Comment,
GQLCOMMENT_SORT
>;
function cursorGetterFactory(
input: Pick<ConnectionInput, "orderBy" | "after">
input: Pick<CommentConnectionInput, "orderBy" | "after">
): NodeToCursorTransformer<Comment> {
switch (input.orderBy) {
case GQLCOMMENT_SORT.CREATED_AT_DESC:
@@ -363,28 +431,25 @@ function cursorGetterFactory(
* retrieveRepliesConnection returns a Connection<Comment> for a given comments
* replies.
*
* @param db database connection
* @param mongo database connection
* @param parentID the parent id for the comment to retrieve
* @param input connection configuration
*/
export async function retrieveCommentRepliesConnection(
db: Db,
export const retrieveCommentRepliesConnection = (
mongo: Db,
tenantID: string,
storyID: string,
parentID: string,
input: ConnectionInput
) {
// Create the query.
const query = new Query(collection(db)).where({
tenantID,
storyID,
parentID,
input: CommentConnectionInput
) =>
retrieveCommentConnection(mongo, tenantID, {
...input,
filter: {
storyID,
parentID,
},
});
// Return a connection for the comments query.
return retrieveConnection(input, query);
}
/**
* retrieveCommentParentsConnection will return a comment connection used to
* represent the parents of a given comment.
@@ -485,54 +550,64 @@ export async function retrieveCommentParentsConnection(
* retrieveStoryConnection returns a Connection<Comment> for a given Stories
* comments.
*
* @param db database connection
* @param mongo database connection
* @param storyID the Story id for the comment to retrieve
* @param input connection configuration
*/
export async function retrieveCommentStoryConnection(
db: Db,
export const retrieveCommentStoryConnection = (
mongo: Db,
tenantID: string,
storyID: string,
input: ConnectionInput
) {
// Create the query.
const query = new Query(collection(db)).where({
tenantID,
storyID,
// Only get Comments that are top level. If the client wants to load another
// layer, they can request another nested connection.
parentID: null,
// Only get Comment's that are visible.
status: {
$in: [GQLCOMMENT_STATUS.NONE, GQLCOMMENT_STATUS.ACCEPTED],
input: CommentConnectionInput
) =>
retrieveCommentConnection(mongo, tenantID, {
...input,
filter: {
storyID,
// Only get Comments that are top level. If the client wants to load another
// layer, they can request another nested connection.
parentID: null,
// Only get Comment's that are visible.
status: {
$in: [GQLCOMMENT_STATUS.NONE, GQLCOMMENT_STATUS.ACCEPTED],
},
},
});
// Return a connection for the comments query.
return retrieveConnection(input, query);
}
/**
* retrieveCommentUserConnection returns a Connection<Comment> for a given User's
* comments.
*
* @param db database connection
* @param mongo database connection
* @param userID the User id for the comment to retrieve
* @param input connection configuration
*/
export async function retrieveCommentUserConnection(
db: Db,
export const retrieveCommentUserConnection = (
mongo: Db,
tenantID: string,
userID: string,
input: ConnectionInput
) {
// Create the query.
const query = new Query(collection(db)).where({
tenantID,
authorID: userID,
input: CommentConnectionInput
) =>
retrieveCommentConnection(mongo, tenantID, {
...input,
filter: {
authorID: userID,
},
});
// Return a connection for the comments query.
export async function retrieveCommentConnection(
mongo: Db,
tenantID: string,
input: CommentConnectionInput
): Promise<Readonly<Connection<Readonly<Comment>>>> {
// Create the query.
const query = new Query(collection(mongo)).where({ tenantID });
// If a filter is being applied, filter it as well.
if (input.filter) {
query.where(input.filter);
}
return retrieveConnection(input, query);
}
@@ -545,7 +620,7 @@ export async function retrieveCommentUserConnection(
* configuration applied
*/
async function retrieveConnection(
input: ConnectionInput,
input: CommentConnectionInput,
query: Query<Comment>
): Promise<Readonly<Connection<Readonly<Comment>>>> {
// Apply some sorting options.
@@ -562,6 +637,14 @@ async function retrieveConnection(
// Get the comments from the cursor.
const nodes = await cursor.toArray();
// Return a connection.
return convertNodesToConnection(input, nodes);
}
export function convertNodesToConnection(
input: CommentConnectionInput,
nodes: Array<Readonly<Comment>>
) {
// Convert the nodes to edges (which will include the extra edge we don't need
// if there is more results).
const edges = nodesToEdges(nodes, cursorGetterFactory(input));
@@ -583,7 +666,10 @@ async function retrieveConnection(
};
}
function applyInputToQuery(input: ConnectionInput, query: Query<Comment>) {
function applyInputToQuery(
input: CommentConnectionInput,
query: Query<Comment>
) {
switch (input.orderBy) {
case GQLCOMMENT_SORT.CREATED_AT_DESC:
query.orderBy({ createdAt: -1 });
@@ -613,7 +699,14 @@ function applyInputToQuery(input: ConnectionInput, query: Query<Comment>) {
}
export interface UpdateCommentStatus {
/**
* comment is the updated Comment with the new status associated with it.
*/
comment: Readonly<Comment>;
/**
* oldStatus is the previous status that the given Comment had.
*/
oldStatus: GQLCOMMENT_STATUS;
}
+26
View File
@@ -1,4 +1,5 @@
import { merge } from "lodash";
import { FilterQuery } from "./query";
export type Cursor = Date | number | string | null;
@@ -19,6 +20,31 @@ export interface Connection<T> {
pageInfo: PageInfo;
}
export interface ConnectionInput<T> {
/**
* first is the number of items to load for the connection. The returned
* amount of items may be less.
*/
first: number;
/**
* after is an optional cursor that can be used to paginate the result set.
*/
after?: Cursor;
/**
* filter is an optional query that can be used to constrain the result set.
*/
filter?: FilterQuery<T>;
}
export interface OrderedConnectionInput<T, U> extends ConnectionInput<T> {
/**
* orderBy allows ordering of the returned connection.
*/
orderBy: U;
}
/**
* createConnection will create a base Connection that can be used to satisfy
* the Connection<T> interface.
@@ -0,0 +1,24 @@
import { GQLCOMMENT_STATUS } from "talk-server/graph/tenant/schema/__generated__/types";
import { CommentModerationQueueCounts, CommentStatusCounts } from ".";
export function createEmptyCommentModerationQueueCounts(): CommentModerationQueueCounts {
return {
total: 0,
queues: {
unmoderated: 0,
reported: 0,
pending: 0,
},
};
}
export function createEmptyCommentStatusCounts(): CommentStatusCounts {
return {
[GQLCOMMENT_STATUS.ACCEPTED]: 0,
[GQLCOMMENT_STATUS.NONE]: 0,
[GQLCOMMENT_STATUS.PREMOD]: 0,
[GQLCOMMENT_STATUS.REJECTED]: 0,
[GQLCOMMENT_STATUS.SYSTEM_WITHHELD]: 0,
};
}
@@ -0,0 +1,246 @@
export * from "./empty";
export * from "./shared";
import { Db } from "mongodb";
import { identity, isEmpty, pickBy } from "lodash";
import { DeepPartial } from "talk-common/types";
import { dotize } from "talk-common/utils/dotize";
import { GQLCOMMENT_STATUS } from "talk-server/graph/tenant/schema/__generated__/types";
import logger from "talk-server/logger";
import { EncodedCommentActionCounts } from "talk-server/models/action/comment";
import { AugmentedRedis } from "talk-server/services/redis";
import { retrieveStory, Story } from "..";
import { createEmptyCommentStatusCounts } from "./empty";
import { updateSharedCommentCounts } from "./shared";
/**
* collection provides a reference to the stories collection used by the
* counting system.
*/
function collection<T = Story>(db: Db) {
return db.collection<Readonly<T>>("stories");
}
// TODO: (wyattjoh) write a test to verify that this set of counts is always in sync with GQLCOMMENT_STATUS.
/**
* CommentStatusCounts stores the count of Comments that have the particular
* statuses.
*/
export interface CommentStatusCounts {
[GQLCOMMENT_STATUS.ACCEPTED]: number;
[GQLCOMMENT_STATUS.NONE]: number;
[GQLCOMMENT_STATUS.PREMOD]: number;
[GQLCOMMENT_STATUS.REJECTED]: number;
[GQLCOMMENT_STATUS.SYSTEM_WITHHELD]: number;
}
/**
* CommentModerationCountsPerQueue stores the number of Comments that exist in
* each of the Moderation Queues.
*/
export interface CommentModerationCountsPerQueue {
/**
* unmoderated is the number of Comment's that have not been moderated. This
* includes all Comment's that are NONE, PREMOD, SYSTEM_WITHHELD.
*/
unmoderated: number;
/**
* pending is the number of Comment's that are not published and are pending
* moderation.
*/
pending: number;
/**
* reported is the number of Comment's that have not been moderated but have
* been flagged.
*/
reported: number;
}
/**
* CommentModerationQueueCounts stores the number of Comments that exist in each
* of the ModerationQueue's on this Story.
*/
export interface CommentModerationQueueCounts {
/**
* total is the number of Comment's that exist in the below moderation queues.
*/
total: number;
/**
* queues contains all the queue specific counts.
*/
queues: CommentModerationCountsPerQueue;
}
/**
* StoryCommentCounts stores all the Comment Counts that will be stored on each
* Story.
*/
export interface StoryCommentCounts {
/**
* actionCounts stores all the action counts for all Comment's on this Story.
*/
action: EncodedCommentActionCounts;
/**
* commentCounts stores the different counts for each comment on the Story
* according to their statuses.
*/
status: CommentStatusCounts;
/**
* moderationQueue stores the number of Comments that exist in
* each of the ModerationQueue's on this Story.
*/
moderationQueue: CommentModerationQueueCounts;
}
/**
* updateStoryCommentStatusCount will update a given Story's status counts.
*
* @param mongo database handle
* @param redis redis database handle
* @param tenantID ID of the Tenant where this Story is
* @param id ID of the Story where we're updating the counts
* @param status the set of counts that we will update
*/
export const updateStoryCommentStatusCount = (
mongo: Db,
redis: AugmentedRedis,
tenantID: string,
id: string,
status: Partial<CommentStatusCounts>
) => updateStoryCounts(mongo, redis, tenantID, id, { status });
/**
* updateStoryCommentModerationQueueCounts will update the moderation queue
* counts on a Story.
*
* @param mongo mongo database handle
* @param redis redis database handle
* @param tenantID ID of the Tenant where this Story is
* @param id ID of the Story where we're updating the counts
* @param moderationQueue the set of counts that we will update
*/
export const updateStoryCommentModerationQueueCounts = (
mongo: Db,
redis: AugmentedRedis,
tenantID: string,
id: string,
moderationQueue: DeepPartial<CommentModerationQueueCounts>
) => updateStoryCounts(mongo, redis, tenantID, id, { moderationQueue });
export const updateStoryActionCounts = (
mongo: Db,
redis: AugmentedRedis,
tenantID: string,
id: string,
action: EncodedCommentActionCounts
) => updateStoryCounts(mongo, redis, tenantID, id, { action });
export type StoryCounts = DeepPartial<StoryCommentCounts>;
/**
* updateStoryCounts will update the comment counts for the story indicated.
*
* @param mongo mongodb database handle
* @param redis redis database handle
* @param tenantID ID of the Tenant where the Story is on
* @param id the ID of the Story that we are updating counts on
* @param commentCounts the counts that we are updating
*/
export async function updateStoryCounts(
mongo: Db,
redis: AugmentedRedis,
tenantID: string,
id: string,
commentCounts: StoryCounts
) {
// Update all the specific comment moderation queue counts.
const update: DeepPartial<Story> = { commentCounts };
const $inc = pickBy(dotize(update), identity);
if (isEmpty($inc)) {
return retrieveStory(mongo, tenantID, id);
}
logger.trace({ update: { $inc } }, "incrementing story counts");
const result = await collection(mongo).findOneAndUpdate(
{ id, tenantID },
{ $inc },
// False to return the updated document instead of the original
// document.
{ returnOriginal: false }
);
// Update the shared counts.
await updateSharedCommentCounts(redis, tenantID, commentCounts);
return result.value || null;
}
/**
* mergeCommentStatusCount will merge an array of commentStatusCount's into one.
*/
export function mergeCommentStatusCount(
statusCounts: CommentStatusCounts[]
): CommentStatusCounts {
const mergedStatusCounts = createEmptyCommentStatusCounts();
for (const commentCounts of statusCounts) {
for (const status in commentCounts) {
if (!commentCounts.hasOwnProperty(status)) {
continue;
}
// Because the CommentStatusCounts are not indexable, it should be accessed
// by walking the structure.
switch (status) {
case GQLCOMMENT_STATUS.ACCEPTED:
case GQLCOMMENT_STATUS.NONE:
case GQLCOMMENT_STATUS.PREMOD:
case GQLCOMMENT_STATUS.REJECTED:
case GQLCOMMENT_STATUS.SYSTEM_WITHHELD:
mergedStatusCounts[status] += commentCounts[status];
break;
default:
throw new Error("unrecognized status");
}
}
}
return mergedStatusCounts;
}
/**
* calculateTotalCommentCount will compute the total amount of comments left on
* an Asset by parsing the `CommentStatusCounts`.
*/
export function calculateTotalCommentCount(
commentCounts: CommentStatusCounts
): number {
let count = 0;
for (const status in commentCounts) {
if (!commentCounts.hasOwnProperty(status)) {
continue;
}
// Because the CommentStatusCounts are not indexable, it should be accessed
// by walking the structure.
switch (status) {
case GQLCOMMENT_STATUS.ACCEPTED:
case GQLCOMMENT_STATUS.NONE:
case GQLCOMMENT_STATUS.PREMOD:
case GQLCOMMENT_STATUS.REJECTED:
case GQLCOMMENT_STATUS.SYSTEM_WITHHELD:
count += commentCounts[status];
break;
default:
throw new Error("unrecognized status");
}
}
return count;
}
@@ -0,0 +1,554 @@
import { flattenDeep, identity, isEmpty, pickBy } from "lodash";
import { Db } from "mongodb";
import ms from "ms";
import logger from "talk-server/logger";
import { EncodedCommentActionCounts } from "talk-server/models/action/comment";
import { AugmentedPipeline, AugmentedRedis } from "talk-server/services/redis";
import {
CommentModerationCountsPerQueue,
CommentStatusCounts,
StoryCounts,
} from ".";
import { Story } from "..";
import {
createEmptyCommentModerationQueueCounts,
createEmptyCommentStatusCounts,
} from "./empty";
/**
* COUNT_FRESHNESS_EXPIRY will expire the :fresh key for cached counts that will
* trigger a recalculation of the cached count values after the time that it
* last computed it plus the below time in seconds elapsed.
*/
const COUNT_FRESHNESS_EXPIRY_SECONDS = Math.floor(ms("24h") / 1000);
/**
* shared keys
*/
const freshenKey = (key: string) => `${key}:fresh`;
const commentCountsActionKey = (tenantID: string) =>
`${tenantID}:commentCounts:action`;
const commentCountsStatusKey = (tenantID: string) =>
`${tenantID}:commentCounts:status`;
const commentCountsModerationQueueTotalKey = (tenantID: string) =>
`${tenantID}:commentCounts:moderationQueue:total`;
const commentCountsModerationQueueQueuesKey = (tenantID: string) =>
`${tenantID}:commentCounts:moderationQueue:queues`;
/**
* collection provides a reference to the stories collection used by the
* counting system.
*/
function collection<T = Story>(db: Db) {
return db.collection<Readonly<T>>("stories");
}
/**
* recalculateSharedModerationQueueQueueCounts will reset the counts stored for
* this Tenant.
*
* @param mongo mongodb database handle
* @param redis redis database handle
* @param tenantID the tenant ID that we are resetting the counts for
*/
export async function recalculateSharedModerationQueueQueueCounts(
mongo: Db,
redis: AugmentedRedis,
tenantID: string
) {
const now = new Date();
const key = commentCountsModerationQueueQueuesKey(tenantID);
const freshKey = freshenKey(key);
// Clear the existing cached queues.
await redis.del(key, freshKey);
// Fetch all the moderation queue counts.
const queueResults = await collection<{
_id: string;
total: number;
}>(mongo).aggregate([
{
$match: { tenantID, createdAt: { $lt: now } },
},
{
$project: {
moderationQueue: {
$objectToArray: "$commentCounts.moderationQueue.queues",
},
},
},
{
$unwind: "$moderationQueue",
},
{
$group: {
_id: "$moderationQueue.k",
total: {
$sum: "$moderationQueue.v",
},
},
},
]);
// Convert the cursor from the results into an array.
const queues = await queueResults.toArray();
// Increment the hash key values.
const pipeline = redis.pipeline();
// Mark the key as fresh, and update the values!
pipeline.mhincrby(
key,
...queues.reduce((acc, queue) => [...acc, queue._id, queue.total], [])
);
pipeline.set(freshKey, now.getTime(), "EX", COUNT_FRESHNESS_EXPIRY_SECONDS);
await pipeline.exec();
const queueCounts: CommentModerationCountsPerQueue = queues.reduce(
(acc, queue) => ({
...acc,
[queue._id]: queue.total,
}),
createEmptyCommentModerationQueueCounts().queues
);
return queueCounts;
}
/**
* recalculateSharedModerationQueueTotalCounts will reset the counts stored for
* this Tenant.
*
* @param mongo mongodb database handle
* @param redis redis database handle
* @param tenantID the tenant ID that we are resetting the counts for
*/
export async function recalculateSharedModerationQueueTotalCounts(
mongo: Db,
redis: AugmentedRedis,
tenantID: string
) {
const now = new Date();
const key = commentCountsModerationQueueTotalKey(tenantID);
const freshKey = freshenKey(key);
// Clear the existing cached queues.
await redis.del(key, freshKey);
// Fetch all the totals for the moderation queues.
const totalResults = await collection<{
total: number;
}>(mongo).aggregate([
{
$match: { tenantID, createdAt: { $lt: now } },
},
{
$group: {
_id: "total",
total: {
$sum: "$commentCounts.moderationQueue.total",
},
},
},
]);
const totals = await totalResults.toArray();
if (totals.length !== 1) {
throw new Error("total results returned incorrect");
}
const [{ total }] = totals;
// Mark the key as fresh, and update the values!
const pipeline = redis.pipeline();
pipeline.incrby(key, total);
pipeline.set(freshKey, now.getTime(), "EX", COUNT_FRESHNESS_EXPIRY_SECONDS);
await pipeline.exec();
return total;
}
/**
* recalculateSharedStatusCommentCounts will reset the counts stored for this
* Tenant.
*
* @param mongo mongodb database handle
* @param redis redis database handle
* @param tenantID the tenant ID that we are resetting the counts for
*/
export async function recalculateSharedStatusCommentCounts(
mongo: Db,
redis: AugmentedRedis,
tenantID: string
) {
const now = new Date();
const key = commentCountsStatusKey(tenantID);
const freshKey = freshenKey(key);
// Clear the existing cached queues.
await redis.del(key, freshKey);
// Fetch all the comments of each status.
const statusResults = await collection<{
_id: string;
total: number;
}>(mongo).aggregate([
{
$match: { tenantID, createdAt: { $lt: now } },
},
{
$project: {
status: {
$objectToArray: "$commentCounts.status",
},
},
},
{
$unwind: "$status",
},
{
$group: {
_id: "$status.k",
total: {
$sum: "$status.v",
},
},
},
]);
// Convert the cursor from the results into an array.
const statuses = await statusResults.toArray();
// Mark the key as fresh, and update the values!
const pipeline = redis.pipeline();
pipeline.mhincrby(
key,
...statuses.reduce((acc, status) => [...acc, status._id, status.total], [])
);
pipeline.set(freshKey, now.getTime(), "EX", COUNT_FRESHNESS_EXPIRY_SECONDS);
await pipeline.exec();
// Now, reconstruct the status counts so we can return it.
const statusCounts: CommentStatusCounts = statuses.reduce(
(acc, status) => ({
...acc,
[status._id]: status.total,
}),
createEmptyCommentStatusCounts()
);
return statusCounts;
}
/**
* recalculateSharedActionCommentCounts will reset the counts stored for this
* Tenant.
*
* @param mongo mongodb database handle
* @param redis redis database handle
* @param tenantID the tenant ID that we are resetting the counts for
*/
export async function recalculateSharedActionCommentCounts(
mongo: Db,
redis: AugmentedRedis,
tenantID: string
) {
const now = new Date();
const key = commentCountsActionKey(tenantID);
const freshKey = freshenKey(key);
// Clear the existing cached queues.
await redis.del(key, freshKey);
// Fetch all the comments of each status.
const actionResults = await collection<{
_id: string;
total: number;
}>(mongo).aggregate([
{
$match: { tenantID, createdAt: { $lt: now } },
},
{
$project: {
status: {
$objectToArray: "$commentCounts.status",
},
},
},
{
$unwind: "$status",
},
{
$group: {
_id: "$status.k",
total: {
$sum: "$status.v",
},
},
},
]);
// Convert the cursor from the results into an array.
const actions = await actionResults.toArray();
// Mark the key as fresh, and update the values!
const pipeline = redis.pipeline();
pipeline.mhincrby(
key,
...actions.reduce((acc, action) => [...acc, action._id, action.total], [])
);
pipeline.set(freshKey, now.getTime(), "EX", COUNT_FRESHNESS_EXPIRY_SECONDS);
await pipeline.exec();
// Now, compile the action counts into EncodedActionCounts.
const actionCounts: EncodedCommentActionCounts = actions.reduce(
(acc, action) => ({
...acc,
[action._id]: action.total,
}),
{}
);
return actionCounts;
}
/**
* recalculateSharedCommentCounts will reset the counts stored for this
* Tenant.
*
* @param mongo mongodb database handle
* @param redis redis database handle
* @param tenantID the tenant ID that we are resetting the counts for
*/
export async function recalculateSharedCommentCounts(
mongo: Db,
redis: AugmentedRedis,
tenantID: string
) {
await Promise.all([
recalculateSharedModerationQueueQueueCounts(mongo, redis, tenantID),
recalculateSharedModerationQueueTotalCounts(mongo, redis, tenantID),
recalculateSharedStatusCommentCounts(mongo, redis, tenantID),
recalculateSharedActionCommentCounts(mongo, redis, tenantID),
]);
}
/**
* stringObjectToNumber will convert an object that has string keys and string
* values into string keys and number values.
*/
function stringObjectToNumber<T>(
input: Record<string, string>,
defaultValue: number = 0
): T {
return Object.entries(input).reduce(
(acc, [key, value]) => ({
...acc,
[key]: parseInt(value, 10) || defaultValue,
}),
{}
) as T;
}
/**
* retrieveSharedActionCommentCounts will retrieve the comment counts based on
* actions for this Tenant.
*
* @param mongo mongodb database handle
* @param redis redis database handle
* @param tenantID the ID of the Tenant that we are getting the shared action
* counts from
*/
export async function retrieveSharedActionCommentCounts(
mongo: Db,
redis: AugmentedRedis,
tenantID: string
) {
const key = commentCountsActionKey(tenantID);
const freshKey = freshenKey(key);
// Get the values, and the freshness key.
const [[, actions], [, fresh]]: [
[Error | undefined, Record<string, string> | null],
[Error | undefined, string | null]
] = await redis
.pipeline()
.hgetall(key)
.get(freshKey)
.exec();
if (!fresh || !actions) {
return recalculateSharedActionCommentCounts(mongo, redis, tenantID);
}
return stringObjectToNumber<EncodedCommentActionCounts>(actions);
}
/**
* retrieveSharedStatusCommentCounts will retrieve the comment counts based on
* the status for this Tenant.
*
* @param mongo mongodb database handle
* @param redis redis database handle
* @param tenantID the ID of the Tenant that we are getting the shared status
* counts from
*/
export async function retrieveSharedStatusCommentCounts(
mongo: Db,
redis: AugmentedRedis,
tenantID: string
) {
const key = commentCountsStatusKey(tenantID);
const freshKey = freshenKey(key);
// Get the values, and the freshness key.
const [[, statuses], [, fresh]]: [
[Error | undefined, Record<string, string> | null],
[Error | undefined, string | null]
] = await redis
.pipeline()
.hgetall(key)
.get(freshKey)
.exec();
if (!fresh || !statuses) {
return recalculateSharedStatusCommentCounts(mongo, redis, tenantID);
}
return stringObjectToNumber<CommentStatusCounts>(statuses);
}
/**
* retrieveSharedModerationQueueTotal will retrieve the count of comments in
* this Tenant that are in need of moderation.
*
* @param mongo mongodb database handle
* @param redis redis database handle
* @param tenantID the ID of the Tenant that we are getting the shared
* moderation counts from
*/
export async function retrieveSharedModerationQueueTotal(
mongo: Db,
redis: AugmentedRedis,
tenantID: string
) {
const key = commentCountsModerationQueueTotalKey(tenantID);
const freshKey = freshenKey(key);
// Get the values, and the freshness key.
const [total, fresh]: [string | null, string | null] = await redis.mget(
key,
freshKey
);
if (fresh === null || total === null) {
return recalculateSharedModerationQueueTotalCounts(mongo, redis, tenantID);
}
return parseInt(total, 10) || 0;
}
/**
* retrieveSharedModerationQueueQueuesCounts will retrieve the count of comments
* in each moderation queue for this Tenant.
*
* @param mongo mongodb database handle
* @param redis redis database handle
* @param tenantID the ID of the Tenant that we are getting the shared
* moderation queue counts from
*/
export async function retrieveSharedModerationQueueQueuesCounts(
mongo: Db,
redis: AugmentedRedis,
tenantID: string
) {
const key = commentCountsModerationQueueQueuesKey(tenantID);
const freshKey = freshenKey(key);
// Get the values, and the freshness key.
const [[, queues], [, fresh]]: [
[Error | undefined, Record<string, string> | null],
[Error | undefined, string | null]
] = await redis
.pipeline()
.hgetall(key)
.get(freshKey)
.exec();
if (!fresh || !queues) {
logger.debug({ tenantID }, "comment moderation counts were not cached");
return recalculateSharedModerationQueueQueueCounts(mongo, redis, tenantID);
}
logger.debug({ tenantID }, "comment moderation counts were cached");
return stringObjectToNumber<CommentModerationCountsPerQueue>(queues);
}
export async function updateSharedCommentCounts(
redis: AugmentedRedis,
tenantID: string,
commentCounts: StoryCounts
) {
const pipeline: AugmentedPipeline = redis.pipeline();
// HASH ${tenantID}:commentCounts:action
const action = pickBy(commentCounts.action || {}, identity);
if (!isEmpty(action)) {
// Determine the arguments that we will increment.
const args = flattenDeep(Object.entries(action));
// Add the command to the pipeline.
pipeline.mhincrby(commentCountsActionKey(tenantID), ...args);
}
// HASH ${tenantID}:commentCounts:status
const status = pickBy(commentCounts.status || {}, identity);
if (!isEmpty(status)) {
// Determine the arguments that we will increment.
const args = flattenDeep(Object.entries(status));
// Add the command to the pipeline.
pipeline.mhincrby(commentCountsStatusKey(tenantID), ...args);
}
// HASH ${tenantID}:commentCounts:moderationQueue:total
const moderationQueue = commentCounts.moderationQueue || {};
const moderationQueueTotal =
typeof moderationQueue.total === "number" ? moderationQueue.total : 0;
if (moderationQueueTotal !== 0) {
// Add the command to the pipeline.
pipeline.incrby(
commentCountsModerationQueueTotalKey(tenantID),
moderationQueueTotal
);
}
// HASH ${tenantID}:commentCounts:moderationQueue:queues
const moderationQueueQueues = pickBy(moderationQueue.queues || {}, identity);
if (!isEmpty(moderationQueueQueues)) {
// Determine the arguments that we will increment.
const args = flattenDeep(Object.entries(moderationQueueQueues));
// Add the command to the pipeline.
pipeline.mhincrby(commentCountsModerationQueueQueuesKey(tenantID), ...args);
}
// Execute the pipeline.
await pipeline.exec();
}
@@ -3,25 +3,20 @@ import uuid from "uuid";
import { Omit } from "talk-common/types";
import { dotize } from "talk-common/utils/dotize";
import {
GQLCOMMENT_STATUS,
GQLStoryMetadata,
} from "talk-server/graph/tenant/schema/__generated__/types";
import { EncodedCommentActionCounts } from "talk-server/models/action/comment";
import { GQLStoryMetadata } from "talk-server/graph/tenant/schema/__generated__/types";
import { ModerationSettings } from "talk-server/models/settings";
import { TenantResource } from "talk-server/models/tenant";
import {
createEmptyCommentModerationQueueCounts,
createEmptyCommentStatusCounts,
StoryCommentCounts,
} from "./counts";
function collection(db: Db) {
return db.collection<Readonly<Story>>("stories");
}
// Export everything under counts.
export * from "./counts";
// TODO: (wyattjoh) write a test to verify that this set of counts is always in sync with GQLCOMMENT_STATUS.
export interface CommentStatusCounts {
[GQLCOMMENT_STATUS.ACCEPTED]: number;
[GQLCOMMENT_STATUS.NONE]: number;
[GQLCOMMENT_STATUS.PREMOD]: number;
[GQLCOMMENT_STATUS.REJECTED]: number;
[GQLCOMMENT_STATUS.SYSTEM_WITHHELD]: number;
function collection<T = Story>(db: Db) {
return db.collection<Readonly<T>>("stories");
}
export interface Story extends TenantResource {
@@ -43,15 +38,9 @@ export interface Story extends TenantResource {
scrapedAt?: Date;
/**
* actionCounts stores all the action counts for all Comment's on this Story.
* commentCounts stores all the comment counters.
*/
commentActionCounts: EncodedCommentActionCounts;
/**
* commentCounts stores the different counts for each comment on the Story
* according to their statuses.
*/
commentCounts: CommentStatusCounts;
commentCounts: StoryCommentCounts;
/**
* settings provides a point where the settings can be overridden for a
@@ -91,8 +80,11 @@ export async function upsertStory(
url,
tenantID,
createdAt: now,
commentActionCounts: {},
commentCounts: createEmptyCommentCounts(),
commentCounts: {
action: {},
status: createEmptyCommentStatusCounts(),
moderationQueue: createEmptyCommentModerationQueueCounts(),
},
},
};
@@ -117,79 +109,6 @@ export async function upsertStory(
return result.value || null;
}
/**
* updateCommentStatusCount increments the number of status counts for the
* given Story ID.
*
* @param mongo the database handle
* @param tenantID the tenant that the Story is on.
* @param id the ID of the Story.
* @param commentStatusCounts the update document that contains a positive or
* negative number of comments to increment on the given Story.
*/
export async function updateCommentStatusCount(
mongo: Db,
tenantID: string,
id: string,
commentStatusCounts: Partial<CommentStatusCounts>
) {
const result = await collection(mongo).findOneAndUpdate(
{
id,
tenantID,
},
// Update all the specific comment status counts that are associated with
// each of the counts.
{ $inc: dotize({ commentCounts: commentStatusCounts }) },
// False to return the updated document instead of the original
// document.
{ returnOriginal: false }
);
return result.value || null;
}
/**
* mergeCommentStatusCount will merge an array of commentStatusCount's into one.
*/
export function mergeCommentStatusCount(
commentStatusCounts: CommentStatusCounts[]
): CommentStatusCounts {
const statusCounts = createEmptyCommentCounts();
for (const commentCounts of commentStatusCounts) {
for (const status in commentCounts) {
if (!commentCounts.hasOwnProperty(status)) {
continue;
}
// Because the CommentStatusCounts are not indexable, it should be accessed
// by walking the structure.
switch (status) {
case GQLCOMMENT_STATUS.ACCEPTED:
case GQLCOMMENT_STATUS.NONE:
case GQLCOMMENT_STATUS.PREMOD:
case GQLCOMMENT_STATUS.REJECTED:
case GQLCOMMENT_STATUS.SYSTEM_WITHHELD:
statusCounts[status] += commentCounts[status];
break;
default:
throw new Error("unrecognized status");
}
}
}
return statusCounts;
}
function createEmptyCommentCounts(): CommentStatusCounts {
return {
[GQLCOMMENT_STATUS.ACCEPTED]: 0,
[GQLCOMMENT_STATUS.NONE]: 0,
[GQLCOMMENT_STATUS.PREMOD]: 0,
[GQLCOMMENT_STATUS.REJECTED]: 0,
[GQLCOMMENT_STATUS.SYSTEM_WITHHELD]: 0,
};
}
export interface FindOrCreateStoryInput {
id?: string;
url?: string;
@@ -240,8 +159,11 @@ export async function createStory(
url,
tenantID,
createdAt: now,
commentActionCounts: {},
commentCounts: createEmptyCommentCounts(),
commentCounts: {
action: {},
moderationQueue: createEmptyCommentModerationQueueCounts(),
status: createEmptyCommentStatusCounts(),
},
};
try {
@@ -346,34 +268,6 @@ export async function updateStory(
}
}
/**
* updateStoryActionCounts will update the given comment's action counts on
* the Story.
*
* @param mongo the database handle
* @param tenantID the id of the Tenant
* @param id the id of the Story being updated
* @param actionCounts the action counts to merge into the Story
*/
export async function updateStoryActionCounts(
mongo: Db,
tenantID: string,
id: string,
actionCounts: EncodedCommentActionCounts
) {
const result = await collection(mongo).findOneAndUpdate(
{ id, tenantID },
// Update all the specific action counts that are associated with each of
// the counts.
{ $inc: dotize({ actionCounts }) },
// False to return the updated document instead of the original
// document.
{ returnOriginal: false }
);
return result.value || null;
}
export async function removeStory(mongo: Db, tenantID: string, id: string) {
const result = await collection(mongo).findOneAndDelete({
id,
@@ -398,33 +292,3 @@ export async function removeStories(
},
});
}
/**
* calculateTotalCommentCount will compute the total amount of comments left on
* an Asset by parsing the `CommentStatusCounts`.
*/
export function calculateTotalCommentCount(
commentCounts: CommentStatusCounts
): number {
let count = 0;
for (const status in commentCounts) {
if (!commentCounts.hasOwnProperty(status)) {
continue;
}
// Because the CommentStatusCounts are not indexable, it should be accessed
// by walking the structure.
switch (status) {
case GQLCOMMENT_STATUS.ACCEPTED:
case GQLCOMMENT_STATUS.NONE:
case GQLCOMMENT_STATUS.PREMOD:
case GQLCOMMENT_STATUS.REJECTED:
case GQLCOMMENT_STATUS.SYSTEM_WITHHELD:
count += commentCounts[status];
break;
default:
throw new Error("unrecognized status");
}
}
return count;
}
+82 -68
View File
@@ -4,6 +4,7 @@ import { Omit } from "talk-common/types";
import { GQLCOMMENT_FLAG_REPORTED_REASON } from "talk-server/graph/tenant/schema/__generated__/types";
import {
ACTION_TYPE,
CommentAction,
CreateActionInput,
createActions,
encodeActionCounts,
@@ -18,89 +19,107 @@ import {
updateCommentActionCounts,
} from "talk-server/models/comment";
import { Comment } from "talk-server/models/comment";
import { updateStoryActionCounts } from "talk-server/models/story";
import {
updateStoryActionCounts,
updateStoryCounts,
} from "talk-server/models/story";
import { Tenant } from "talk-server/models/tenant";
import { User } from "talk-server/models/user";
export type CreateAction = Omit<CreateActionInput, "storyID"> &
Required<Pick<CreateActionInput, "storyID">>;
import { AugmentedRedis } from "../redis";
import { calculateCountsDiff } from "./moderation/counts";
export type CreateAction = CreateActionInput;
export async function addCommentActions(
mongo: Db,
tenant: Tenant,
comment: Readonly<Comment>,
inputs: CreateAction[]
): Promise<Readonly<Comment>> {
...inputs: CreateAction[]
) {
// Create each of the actions, returning each of the action results.
const results = await createActions(mongo, tenant.id, inputs);
// Get the actions that were upserted, we only want to increment the action
// counts of actions that were just created.
const upsertedActions = results
return results
.filter(({ wasUpserted }) => wasUpserted)
.map(({ action }) => action);
}
if (upsertedActions.length > 0) {
// Compute the action counts.
const actionCounts = encodeActionCounts(...upsertedActions);
export async function addCommentActionCounts(
mongo: Db,
tenant: Tenant,
oldComment: Readonly<Comment>,
...actions: CommentAction[]
) {
// Compute the action counts.
const action = encodeActionCounts(...actions);
// Grab the last revision (the most recent).
const revision = getLatestRevision(comment);
// Grab the last revision (the most recent).
const revision = getLatestRevision(oldComment);
// Update the comment action counts here.
const updatedComment = await updateCommentActionCounts(
mongo,
tenant.id,
comment.id,
revision.id,
actionCounts
);
// Update the Story with the updated action counts.
await updateStoryActionCounts(
mongo,
tenant.id,
comment.storyID,
actionCounts
);
// Check to see if there was an actual comment returned (there should
// have been, we just created it!).
if (!updatedComment) {
// TODO: (wyattjoh) return a better error.
throw new Error("could not update comment action counts");
}
return updatedComment;
// Update the comment action counts here.
const updatedComment = await updateCommentActionCounts(
mongo,
tenant.id,
oldComment.id,
revision.id,
action
);
if (!updatedComment) {
// TODO: (wyattjoh) return a better error.
throw new Error("could not update comment action counts");
}
return comment;
return updatedComment;
}
async function addCommentAction(
mongo: Db,
redis: AugmentedRedis,
tenant: Tenant,
input: CreateActionInput
input: Omit<CreateActionInput, "storyID">
): Promise<Readonly<Comment>> {
const comment = await retrieveComment(mongo, tenant.id, input.commentID);
if (!comment) {
const oldComment = await retrieveComment(mongo, tenant.id, input.commentID);
if (!oldComment) {
// TODO: replace to match error returned by the models/comments.ts
throw new Error("comment not found");
}
// Store the story ID on the action as a story_id.
input.storyID = comment.storyID;
// Create the action creator input.
const action: CreateAction = {
...input,
storyID: oldComment.storyID,
};
// We have to perform a type assertion here because for some reason, the type
// coercion is not determining that because we filled in the `storyID`
// above, that at this point, it satisfies the CreateAction type.
return addCommentActions(mongo, tenant, comment, [input as CreateAction]);
// Update the actions for the comment.
const commentActions = await addCommentActions(mongo, tenant, action);
if (commentActions.length > 0) {
// Update the comment action counts.
const updatedComment = await addCommentActionCounts(
mongo,
tenant,
oldComment,
...commentActions
);
// Calculate the new story counts.
await updateStoryCounts(mongo, redis, tenant.id, updatedComment.storyID, {
action: encodeActionCounts(...commentActions),
moderationQueue: calculateCountsDiff(oldComment, updatedComment),
});
return updatedComment;
}
return oldComment;
}
export async function removeCommentAction(
mongo: Db,
redis: AugmentedRedis,
tenant: Tenant,
input: Omit<RemoveActionInput, "commentRevisionID">
input: Omit<RemoveActionInput, "commentRevisionID" | "reason">
): Promise<Readonly<Comment>> {
// Get the Comment that we are leaving the Action on.
const comment = await retrieveComment(mongo, tenant.id, input.commentID);
@@ -144,9 +163,14 @@ export async function removeCommentAction(
actionCounts
);
// Flags can't be removed, an that is the only type of operation that will
// affect the moderation queue counts, so we don't have to interact with
// updating the moderation queue counts.
// Update the Story with the updated action counts.
await updateStoryActionCounts(
mongo,
redis,
tenant.id,
comment.storyID,
actionCounts
@@ -171,11 +195,12 @@ export type CreateCommentReaction = Pick<
export async function createReaction(
mongo: Db,
redis: AugmentedRedis,
tenant: Tenant,
author: User,
input: CreateCommentReaction
) {
return addCommentAction(mongo, tenant, {
return addCommentAction(mongo, redis, tenant, {
actionType: ACTION_TYPE.REACTION,
commentID: input.commentID,
commentRevisionID: input.commentRevisionID,
@@ -187,11 +212,12 @@ export type RemoveCommentReaction = Pick<RemoveActionInput, "commentID">;
export async function removeReaction(
mongo: Db,
redis: AugmentedRedis,
tenant: Tenant,
author: User,
input: RemoveCommentReaction
) {
return removeCommentAction(mongo, tenant, {
return removeCommentAction(mongo, redis, tenant, {
actionType: ACTION_TYPE.REACTION,
commentID: input.commentID,
userID: author.id,
@@ -205,11 +231,12 @@ export type CreateCommentDontAgree = Pick<
export async function createDontAgree(
mongo: Db,
redis: AugmentedRedis,
tenant: Tenant,
author: User,
input: CreateCommentDontAgree
) {
return addCommentAction(mongo, tenant, {
return addCommentAction(mongo, redis, tenant, {
actionType: ACTION_TYPE.DONT_AGREE,
commentID: input.commentID,
commentRevisionID: input.commentRevisionID,
@@ -221,11 +248,12 @@ export type RemoveCommentDontAgree = Pick<RemoveActionInput, "commentID">;
export async function removeDontAgree(
mongo: Db,
redis: AugmentedRedis,
tenant: Tenant,
author: User,
input: RemoveCommentDontAgree
) {
return removeCommentAction(mongo, tenant, {
return removeCommentAction(mongo, redis, tenant, {
actionType: ACTION_TYPE.DONT_AGREE,
commentID: input.commentID,
userID: author.id,
@@ -241,11 +269,12 @@ export type CreateCommentFlag = Pick<
export async function createFlag(
mongo: Db,
redis: AugmentedRedis,
tenant: Tenant,
author: User,
input: CreateCommentFlag
) {
return addCommentAction(mongo, tenant, {
return addCommentAction(mongo, redis, tenant, {
actionType: ACTION_TYPE.FLAG,
reason: input.reason,
commentID: input.commentID,
@@ -253,18 +282,3 @@ export async function createFlag(
userID: author.id,
});
}
export type RemoveCommentFlag = Pick<RemoveActionInput, "commentID">;
export async function removeFlag(
mongo: Db,
tenant: Tenant,
author: User,
input: RemoveCommentFlag
) {
return removeCommentAction(mongo, tenant, {
actionType: ACTION_TYPE.FLAG,
commentID: input.commentID,
userID: author.id,
});
}
+191 -65
View File
@@ -1,6 +1,11 @@
import { Db } from "mongodb";
import { Omit } from "talk-common/types";
import logger from "talk-server/logger";
import {
encodeActionCounts,
filterDuplicateActions,
} from "talk-server/models/action/comment";
import {
createComment,
CreateCommentInput,
@@ -9,32 +14,46 @@ import {
getLatestRevision,
pushChildCommentIDOntoParent,
retrieveComment,
validateEditable,
} from "talk-server/models/comment";
import {
retrieveStory,
updateCommentStatusCount,
StoryCounts,
updateStoryCounts,
} from "talk-server/models/story";
import { Tenant } from "talk-server/models/tenant";
import { User } from "talk-server/models/user";
import {
addCommentActions,
CreateAction,
} from "talk-server/services/comments/actions";
import { processForModeration } from "talk-server/services/comments/moderation";
import { Request } from "talk-server/types/express";
import { AugmentedRedis } from "../redis";
import { addCommentActions, CreateAction } from "./actions";
import { calculateCounts, calculateCountsDiff } from "./moderation/counts";
import { processForModeration } from "./pipeline";
export type CreateComment = Omit<
CreateCommentInput,
"status" | "metadata" | "grandparentIDs"
"status" | "metadata" | "grandparentIDs" | "actionCounts"
>;
export async function create(
mongo: Db,
redis: AugmentedRedis,
tenant: Tenant,
author: User,
input: CreateComment,
req?: Request
) {
let log = logger.child({
authorID: author.id,
tenantID: tenant.id,
storyID: input.storyID,
parentID: input.parentID,
});
// TODO: (wyattjoh) perform rate limiting based on the user?
log.trace("creating comment on story");
// Grab the story that we'll use to check moderation pieces with.
const story = await retrieveStory(mongo, tenant.id, input.storyID);
if (!story) {
@@ -42,8 +61,6 @@ export async function create(
throw new Error("story referenced does not exist");
}
// TODO: (wyattjoh) Check that the story was visible.
const grandparentIDs: string[] = [];
if (input.parentID) {
// Check to see that the reference parent ID exists.
@@ -53,7 +70,7 @@ export async function create(
throw new Error("parent comment referenced does not exist");
}
// TODO: (wyattjoh) Check that the parent comment was visible.
// FIXME: (wyattjoh) Check that the parent comment was visible!
// Push the parent's parent id's into the comment's grandparent id's.
grandparentIDs.push(...parent.grandparentIDs);
@@ -61,6 +78,11 @@ export async function create(
// If this parent has a parent, push it down as well.
grandparentIDs.push(parent.parentID);
}
log.trace(
{ grandparentIDs: grandparentIDs.length },
"pushed grandparent id's into comment creation"
);
}
// Run the comment through the moderation phases.
@@ -72,32 +94,42 @@ export async function create(
req,
});
// This is the first time this comment is being published.. So we need to
// ensure we don't run into any race conditions when we create the comment.
// One of the situations where we could encounter a race is when the comment
// is created, and does not have it's flag data associated with it. This would
// cause the comment to not be added to the flagged queue. If a flag is
// pending, and a user flags this comment before the next step can proceed,
// then we would end up double adding the comment to the flagged queue.
// Instead, we need to add the action metadata to the comment before we add it
// for the first time to ensure that the data is there for when the next flag
// is added, that it can already know that the comment is already in the
// queue.
let actionCounts = {};
if (actions.length > 0) {
// Determine the unique actions, we will use this to compute the comment
// action counts. This should match what is added below.
const deDuplicatedActions = filterDuplicateActions(actions);
// Encode the action counts.
actionCounts = encodeActionCounts(...deDuplicatedActions);
}
// Create the comment!
let comment = await createComment(mongo, tenant.id, {
const comment = await createComment(mongo, tenant.id, {
...input,
status,
grandparentIDs,
metadata,
actionCounts,
});
if (actions.length > 0) {
// The actions coming from the moderation phases didn't know the commentID
// at the time, and we didn't want the repetitive nature of adding the
// item_type each time, so this mapping function adds them!
const inputs = actions.map(
(action): CreateAction => ({
...action,
commentID: comment.id,
commentRevisionID: getLatestRevision(comment!).id,
// Pull the revision out.
const revision = getLatestRevision(comment);
// Store the Story ID on the action.
storyID: story.id,
})
);
log = log.child({ commentID: comment.id, status, revisionID: revision.id });
// Insert and handle creating the actions.
comment = await addCommentActions(mongo, tenant, comment, inputs);
}
log.trace("comment created");
if (input.parentID) {
// Push the child's ID onto the parent.
@@ -107,12 +139,44 @@ export async function create(
input.parentID,
comment.id
);
log.trace("pushed child comment id onto parent");
}
if (actions.length > 0) {
// Actually add the actions to the database. This will not interact with the
// counts at all.
const upsertedActions = await addCommentActions(
mongo,
tenant,
...actions.map(
(action): CreateAction => ({
...action,
commentID: comment.id,
commentRevisionID: revision.id,
// Store the Story ID on the action.
storyID: story.id,
})
)
);
log.trace({ actions: upsertedActions.length }, "added actions to comment");
}
// Compile the changes we want to apply to the story counts.
const storyCounts: Required<Omit<StoryCounts, "action">> = {
// This is a new comment, so we need to increment for this status.
status: { [status]: 1 },
// This comment is being created, so we can compute it raw from the comment
// that we created.
moderationQueue: calculateCounts(comment),
};
log.trace({ storyCounts }, "updating story status counts");
// Increment the status count for the particular status on the Story.
await updateCommentStatusCount(mongo, tenant.id, story.id, {
[status]: 1,
});
await updateStoryCounts(mongo, redis, tenant.id, story.id, storyCounts);
return comment;
}
@@ -124,20 +188,46 @@ export type EditComment = Omit<
export async function edit(
mongo: Db,
redis: AugmentedRedis,
tenant: Tenant,
author: User,
input: EditComment,
req?: Request
) {
// Get the comment that we're editing.
const comment = await retrieveComment(mongo, tenant.id, input.id);
if (!comment) {
let log = logger.child({ commentID: input.id, tenantID: tenant.id });
// Get the comment that we're editing. This comment is considered stale,
// because it wasn't involved in the atomic transaction.
const originalStaleComment = await retrieveComment(
mongo,
tenant.id,
input.id
);
if (!originalStaleComment) {
// TODO: replace to match error returned by the models/comments.ts
throw new Error("comment not found");
}
// The editable time is based on the current time, and the edit window
// length. By subtracting the current date from the edit window length, we
// get the maximum value for the `created_at` time that would be permitted
// for the comment edit to succeed.
const lastEditableCommentCreatedAt = new Date(
Date.now() - tenant.editCommentWindowLength
);
// Validate and potentially return with a more useful error.
validateEditable(originalStaleComment, {
authorID: author.id,
lastEditableCommentCreatedAt,
});
// Grab the story that we'll use to check moderation pieces with.
const story = await retrieveStory(mongo, tenant.id, comment.storyID);
const story = await retrieveStory(
mongo,
tenant.id,
originalStaleComment.storyID
);
if (!story) {
// TODO: (wyattjoh) return better error.
throw new Error("story referenced does not exist");
@@ -152,54 +242,90 @@ export async function edit(
req,
});
let editedComment = await editComment(mongo, tenant.id, {
let actionCounts = {};
if (actions.length > 0) {
// Encode the new action counts that are going to be added to the new
// revision.
actionCounts = encodeActionCounts(...filterDuplicateActions(actions));
}
log.trace(
{ predictedActionCounts: actionCounts },
"associating action counts with comment"
);
// Perform the edit.
const result = await editComment(mongo, tenant.id, {
id: input.id,
authorID: author.id,
body: input.body,
status,
metadata,
// The editable time is based on the current time, and the edit window
// length. By subtracting the current date from the edit window length, we
// get the maximum value for the `created_at` time that would be permitted
// for the comment edit to succeed.
lastEditableCommentCreatedAt: new Date(
Date.now() - tenant.editCommentWindowLength
),
actionCounts,
lastEditableCommentCreatedAt,
});
if (!comment) {
if (!result) {
// TODO: replace to match error returned by the models/comments.ts
throw new Error("comment not found");
}
if (actions.length > 0) {
// The actions coming from the moderation phases didn't know the commentID
// at the time, and we didn't want the repetitive nature of adding the
// item_type each time, so this mapping function adds them!
const inputs = actions.map(
(action): CreateAction => ({
...action,
// Strict null check seems to have failed here... Null checking was done
// above where we errored if the comment was falsely.
commentID: comment!.id,
commentRevisionID: getLatestRevision(comment!).id,
// Pull the old/edited comments out of the edit result.
const { oldComment, editedComment, newRevision } = result;
// Store the Story ID on the action.
storyID: story.id,
})
log = log.child({ revisionID: newRevision.id });
if (actions.length > 0) {
// Insert and handle creating the actions.
const upsertedActions = await addCommentActions(
mongo,
tenant,
...actions.map(
(action): CreateAction => ({
...action,
commentID: editedComment.id,
commentRevisionID: newRevision.id,
storyID: story.id,
})
)
);
// Insert and handle creating the actions.
editedComment = await addCommentActions(mongo, tenant, comment, inputs);
log.trace(
{
actualActionCounts: encodeActionCounts(...upsertedActions),
actions: upsertedActions.length,
},
"added actions to comment"
);
}
if (comment.status !== editedComment.status) {
// Compile the changes we want to apply to the story counts.
const storyCounts: Required<Omit<StoryCounts, "action">> = {
// Status is updated below if it has been changed.
status: {},
// Compute the changes in queue counts. This looks at the action counts that
// are encoded, as well as the comment status's. We however may have had the
// comment status when we grabbed the updated comment after changing the
// action counts, so we extract the action counts out of the edited comment
// and use the status from the moderation decision.
moderationQueue: calculateCountsDiff(oldComment, {
status,
actionCounts: editedComment.actionCounts,
}),
};
if (oldComment.status !== editedComment.status) {
// Increment the status count for the particular status on the Story, and
// decrement the status on the comment's previous status.
await updateCommentStatusCount(mongo, tenant.id, story.id, {
[comment.status]: -1,
[editedComment.status]: 1,
});
// decrement the status on the comment's previous status. The old comment
// status was only there before the atomic mutation. The new status is based
// on the moderation pipeline.
storyCounts.status[oldComment.status] = -1;
storyCounts.status[status] = 1;
}
log.trace({ storyCounts }, "updating story status counts");
// Update the story counts as a result.
await updateStoryCounts(mongo, redis, tenant.id, story.id, storyCounts);
return editedComment;
}
@@ -0,0 +1,194 @@
import { GQLCOMMENT_STATUS } from "talk-server/graph/tenant/schema/__generated__/types";
import { calculateCountsDiff } from "./counts";
it("allows transition from NONE to ACCEPTED", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.NONE, actionCounts: {} },
{ status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: {} }
)
).toEqual({
total: -1,
queues: {
reported: 0,
pending: 0,
unmoderated: -1,
},
});
});
it("allows transition from NONE to REJECTED", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.NONE, actionCounts: {} },
{ status: GQLCOMMENT_STATUS.REJECTED, actionCounts: {} }
)
).toEqual({
total: -1,
queues: {
reported: 0,
pending: 0,
unmoderated: -1,
},
});
});
it("allows transition from NONE to FLAGGED*", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.NONE, actionCounts: {} },
{ status: GQLCOMMENT_STATUS.NONE, actionCounts: { FLAG: 1 } }
)
).toEqual({
total: 0,
queues: {
reported: 1,
pending: 0,
unmoderated: 0,
},
});
});
it("allows transition from FLAGGED* to ACCEPTED", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.NONE, actionCounts: { FLAG: 1 } },
{ status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: { FLAG: 1 } }
)
).toEqual({
total: -1,
queues: {
reported: -1,
pending: 0,
unmoderated: -1,
},
});
});
it("allows transition from FLAGGED* to REJECTED", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.NONE, actionCounts: { FLAG: 1 } },
{ status: GQLCOMMENT_STATUS.REJECTED, actionCounts: { FLAG: 1 } }
)
).toEqual({
total: -1,
queues: {
reported: -1,
pending: 0,
unmoderated: -1,
},
});
});
it("allows transition from PREMOD to ACCEPTED", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.PREMOD, actionCounts: {} },
{ status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: {} }
)
).toEqual({
total: -1,
queues: {
reported: 0,
pending: -1,
unmoderated: -1,
},
});
});
it("allows transition from PREMOD to REJECTED", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.PREMOD, actionCounts: {} },
{ status: GQLCOMMENT_STATUS.REJECTED, actionCounts: {} }
)
).toEqual({
total: -1,
queues: {
reported: 0,
pending: -1,
unmoderated: -1,
},
});
});
it("allows transition from SYSTEM_WITHHELD to ACCEPTED", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.SYSTEM_WITHHELD, actionCounts: {} },
{ status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: {} }
)
).toEqual({
total: -1,
queues: {
reported: 0,
pending: -1,
unmoderated: -1,
},
});
});
it("allows transition from SYSTEM_WITHHELD to REJECTED", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.SYSTEM_WITHHELD, actionCounts: {} },
{ status: GQLCOMMENT_STATUS.REJECTED, actionCounts: {} }
)
).toEqual({
total: -1,
queues: {
reported: 0,
pending: -1,
unmoderated: -1,
},
});
});
it("allows transition from ACCEPTED to REJECTED", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: {} },
{ status: GQLCOMMENT_STATUS.REJECTED, actionCounts: {} }
)
).toEqual({
total: 0,
queues: {
reported: 0,
pending: 0,
unmoderated: 0,
},
});
});
it("allows transition from REJECTED to ACCEPTED", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.REJECTED, actionCounts: {} },
{ status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: {} }
)
).toEqual({
total: 0,
queues: {
reported: 0,
pending: 0,
unmoderated: 0,
},
});
});
it("allows no transition once a comment has been flagged more than once", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.NONE, actionCounts: { FLAG: 1 } },
{ status: GQLCOMMENT_STATUS.NONE, actionCounts: { FLAG: 2 } }
)
).toEqual({
total: 0,
queues: {
reported: 0,
pending: 0,
unmoderated: 0,
},
});
});
@@ -0,0 +1,58 @@
import { GQLCOMMENT_STATUS } from "talk-server/graph/tenant/schema/__generated__/types";
import { decodeActionCounts } from "talk-server/models/action/comment";
import { Comment } from "talk-server/models/comment";
import { CommentModerationQueueCounts } from "talk-server/models/story";
export const UNMODERATED_STATUSES = [
GQLCOMMENT_STATUS.NONE,
GQLCOMMENT_STATUS.PREMOD,
GQLCOMMENT_STATUS.SYSTEM_WITHHELD,
];
export const isUnmoderated = (comment: Pick<Comment, "status">) =>
UNMODERATED_STATUSES.includes(comment.status);
export const PENDING_STATUS = [
GQLCOMMENT_STATUS.PREMOD,
GQLCOMMENT_STATUS.SYSTEM_WITHHELD,
];
export const isPending = (comment: Pick<Comment, "status">) =>
PENDING_STATUS.includes(comment.status);
export const REPORTED_STATUS = [GQLCOMMENT_STATUS.NONE];
export const isReported = (comment: Pick<Comment, "status" | "actionCounts">) =>
decodeActionCounts(comment.actionCounts).flag.total > 0 &&
REPORTED_STATUS.includes(comment.status);
export const isQueued = (comment: Pick<Comment, "status" | "actionCounts">) =>
isUnmoderated(comment) || isPending(comment) || isReported(comment);
export const calculateCounts = (
comment: Pick<Comment, "status" | "actionCounts">
): CommentModerationQueueCounts => ({
total: isQueued(comment) ? 1 : 0,
queues: {
reported: isReported(comment) ? 1 : 0,
pending: isPending(comment) ? 1 : 0,
unmoderated: isUnmoderated(comment) ? 1 : 0,
},
});
export const calculateCountsDiff = (
oldComment: Pick<Comment, "status" | "actionCounts">,
newComment: Pick<Comment, "status" | "actionCounts">
): CommentModerationQueueCounts => {
const oldCounts = calculateCounts(oldComment);
const newCounts = calculateCounts(newComment);
return {
total: newCounts.total - oldCounts.total,
queues: {
reported: newCounts.queues.reported - oldCounts.queues.reported,
pending: newCounts.queues.pending - oldCounts.queues.pending,
unmoderated: newCounts.queues.unmoderated - oldCounts.queues.unmoderated,
},
};
};
@@ -1,108 +1,194 @@
import {
GQLCOMMENT_FLAG_REASON,
GQLCOMMENT_STATUS,
} from "talk-server/graph/tenant/schema/__generated__/types";
import { ACTION_TYPE } from "talk-server/models/action/comment";
import {
compose,
ModerationPhaseContext,
} from "talk-server/services/comments/moderation";
import { GQLCOMMENT_STATUS } from "talk-server/graph/tenant/schema/__generated__/types";
import { calculateCountsDiff } from "./counts";
describe("compose", () => {
it("handles when a phase throws an error", async () => {
const err = new Error("this is an error");
const enhanced = compose([
() => {
throw err;
},
]);
await expect(enhanced({} as ModerationPhaseContext)).rejects.toEqual(err);
});
it("handles when it returns a status", async () => {
const status = GQLCOMMENT_STATUS.ACCEPTED;
const enhanced = compose([() => ({ status })]);
await expect(enhanced({} as ModerationPhaseContext)).resolves.toEqual({
status,
metadata: {},
actions: [],
});
});
it("merges the metadata", async () => {
const status = GQLCOMMENT_STATUS.ACCEPTED;
const enhanced = compose([
() => ({ metadata: { first: true } }),
() => ({ status, metadata: { second: true } }),
() => ({ metadata: { third: true } }),
]);
await expect(enhanced({} as ModerationPhaseContext)).resolves.toEqual({
status,
metadata: { first: true, second: true },
actions: [],
});
});
it("merges actions", async () => {
const status = GQLCOMMENT_STATUS.ACCEPTED;
const flags = [
{
userID: null,
actionType: ACTION_TYPE.FLAG,
reason: GQLCOMMENT_FLAG_REASON.COMMENT_DETECTED_TOXIC,
},
{
userID: null,
actionType: ACTION_TYPE.FLAG,
reason: GQLCOMMENT_FLAG_REASON.COMMENT_DETECTED_SPAM,
},
];
const enhanced = compose([
() => ({
actions: [flags[0]],
}),
() => ({
status,
actions: [flags[1]],
}),
() => ({
actions: [
{
userID: null,
actionType: ACTION_TYPE.FLAG,
reason: GQLCOMMENT_FLAG_REASON.COMMENT_DETECTED_LINKS,
},
],
}),
]);
const final = await enhanced({} as ModerationPhaseContext);
for (const flag of flags) {
expect(final.actions).toContainEqual(flag);
}
expect(final.actions).not.toContainEqual({
actionType: ACTION_TYPE.FLAG,
reason: GQLCOMMENT_FLAG_REASON.COMMENT_DETECTED_LINKS,
});
});
it("handles when it does not return a status", async () => {
const enhanced = compose([
() => ({ metadata: { first: true } }),
() => ({ metadata: { second: true } }),
]);
await expect(enhanced({} as ModerationPhaseContext)).resolves.toEqual({
status: GQLCOMMENT_STATUS.NONE,
metadata: { first: true, second: true },
actions: [],
});
it("allows transition from NONE to ACCEPTED", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.NONE, actionCounts: {} },
{ status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: {} }
)
).toEqual({
total: -1,
queues: {
reported: 0,
pending: 0,
unmoderated: -1,
},
});
});
it("allows transition from NONE to REJECTED", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.NONE, actionCounts: {} },
{ status: GQLCOMMENT_STATUS.REJECTED, actionCounts: {} }
)
).toEqual({
total: -1,
queues: {
reported: 0,
pending: 0,
unmoderated: -1,
},
});
});
it("allows transition from NONE to FLAGGED*", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.NONE, actionCounts: {} },
{ status: GQLCOMMENT_STATUS.NONE, actionCounts: { FLAG: 1 } }
)
).toEqual({
total: 0,
queues: {
reported: 1,
pending: 0,
unmoderated: 0,
},
});
});
it("allows transition from FLAGGED* to ACCEPTED", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.NONE, actionCounts: { FLAG: 1 } },
{ status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: { FLAG: 1 } }
)
).toEqual({
total: -1,
queues: {
reported: -1,
pending: 0,
unmoderated: -1,
},
});
});
it("allows transition from FLAGGED* to REJECTED", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.NONE, actionCounts: { FLAG: 1 } },
{ status: GQLCOMMENT_STATUS.REJECTED, actionCounts: { FLAG: 1 } }
)
).toEqual({
total: -1,
queues: {
reported: -1,
pending: 0,
unmoderated: -1,
},
});
});
it("allows transition from PREMOD to ACCEPTED", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.PREMOD, actionCounts: {} },
{ status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: {} }
)
).toEqual({
total: -1,
queues: {
reported: 0,
pending: -1,
unmoderated: -1,
},
});
});
it("allows transition from PREMOD to REJECTED", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.PREMOD, actionCounts: {} },
{ status: GQLCOMMENT_STATUS.REJECTED, actionCounts: {} }
)
).toEqual({
total: -1,
queues: {
reported: 0,
pending: -1,
unmoderated: -1,
},
});
});
it("allows transition from SYSTEM_WITHHELD to ACCEPTED", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.SYSTEM_WITHHELD, actionCounts: {} },
{ status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: {} }
)
).toEqual({
total: -1,
queues: {
reported: 0,
pending: -1,
unmoderated: -1,
},
});
});
it("allows transition from SYSTEM_WITHHELD to REJECTED", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.SYSTEM_WITHHELD, actionCounts: {} },
{ status: GQLCOMMENT_STATUS.REJECTED, actionCounts: {} }
)
).toEqual({
total: -1,
queues: {
reported: 0,
pending: -1,
unmoderated: -1,
},
});
});
it("allows transition from ACCEPTED to REJECTED", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: {} },
{ status: GQLCOMMENT_STATUS.REJECTED, actionCounts: {} }
)
).toEqual({
total: 0,
queues: {
reported: 0,
pending: 0,
unmoderated: 0,
},
});
});
it("allows transition from REJECTED to ACCEPTED", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.REJECTED, actionCounts: {} },
{ status: GQLCOMMENT_STATUS.ACCEPTED, actionCounts: {} }
)
).toEqual({
total: 0,
queues: {
reported: 0,
pending: 0,
unmoderated: 0,
},
});
});
it("allows no transition once a comment has been flagged more than once", () => {
expect(
calculateCountsDiff(
{ status: GQLCOMMENT_STATUS.NONE, actionCounts: { FLAG: 1 } },
{ status: GQLCOMMENT_STATUS.NONE, actionCounts: { FLAG: 2 } }
)
).toEqual({
total: 0,
queues: {
reported: 0,
pending: 0,
unmoderated: 0,
},
});
});
@@ -1,91 +1,103 @@
import { Omit, Promiseable } from "talk-common/types";
import { Db } from "mongodb";
import { Omit } from "talk-common/types";
import { GQLCOMMENT_STATUS } from "talk-server/graph/tenant/schema/__generated__/types";
import { CreateActionInput } from "talk-server/models/action/comment";
import { EditCommentInput } from "talk-server/models/comment";
import { Story } from "talk-server/models/story";
import logger from "talk-server/logger";
import {
createCommentModerationAction,
CreateCommentModerationActionInput,
} from "talk-server/models/action/moderation/comment";
import { updateCommentStatus } from "talk-server/models/comment";
import { updateStoryCounts } from "talk-server/models/story";
import { Tenant } from "talk-server/models/tenant";
import { User } from "talk-server/models/user";
import { Request } from "talk-server/types/express";
import { AugmentedRedis } from "talk-server/services/redis";
import { calculateCountsDiff } from "./counts";
import { moderationPhases } from "./phases";
export type Moderate = Omit<CreateCommentModerationActionInput, "status">;
export type ModerationAction = Omit<
CreateActionInput,
"commentID" | "commentRevisionID"
>;
const moderate = (
status: GQLCOMMENT_STATUS.ACCEPTED | GQLCOMMENT_STATUS.REJECTED
) => async (
mongo: Db,
redis: AugmentedRedis,
tenant: Tenant,
input: Moderate
) => {
// TODO: wrap these operations in a transaction?
export interface PhaseResult {
actions: ModerationAction[];
status: GQLCOMMENT_STATUS;
metadata: Record<string, any>;
}
// Create the logger.
const log = logger.child({
...input,
tenantID: tenant.id,
newStatus: status,
});
export interface ModerationPhaseContext {
story: Story;
tenant: Tenant;
comment: Partial<EditCommentInput>;
author: User;
req?: Request;
}
export type ModerationPhase = (
context: ModerationPhaseContext
) => Promiseable<PhaseResult>;
export type IntermediatePhaseResult = Partial<PhaseResult> | void;
export type IntermediateModerationPhase = (
context: ModerationPhaseContext
) => Promiseable<IntermediatePhaseResult>;
/**
* compose will create a moderation pipeline for which is executable with the
* passed actions.
*/
export const compose = (
phases: IntermediateModerationPhase[]
): ModerationPhase => async context => {
const final: PhaseResult = {
status: GQLCOMMENT_STATUS.NONE,
actions: [],
metadata: {},
};
// Loop over all the moderation phases and see if we've resolved the status.
for (const phase of phases) {
const result = await phase(context);
if (result) {
// If this result contained actions, then we should push it into the
// other actions.
const { actions } = result;
if (actions) {
final.actions.push(...actions);
}
// If this result contained metadata, then we should merge it into the
// other metadata.
const { metadata } = result;
if (metadata) {
final.metadata = {
...final.metadata,
...metadata,
};
}
// If this result contained a status, then we've finished resolving
// phases!
const { status } = result;
if (status) {
final.status = status;
break;
}
}
// Update the Comment's status.
const result = await updateCommentStatus(
mongo,
tenant.id,
input.commentID,
input.commentRevisionID,
status
);
if (!result) {
// TODO: wrap in better error?
throw new Error("specified comment not found");
}
return final;
log.trace("updated comment status");
// Create the moderation action in the audit log.
const action = await createCommentModerationAction(mongo, tenant.id, {
...input,
status,
});
if (!action) {
// TODO: wrap in better error?
throw new Error("could not create moderation action");
}
log.trace(
{ commentModerationActionID: action.id },
"created the moderation action"
);
// Update the story comment counts.
const story = await updateStoryCounts(
mongo,
redis,
tenant.id,
result.comment.storyID,
{
// Update the comment counts.
status: {
[result.oldStatus]: -1,
[status]: 1,
},
// Compute the queue difference as a result of the old status and the new
// status.
moderationQueue: calculateCountsDiff(
{
status: result.oldStatus,
actionCounts: result.comment.actionCounts,
},
{
status,
actionCounts: result.comment.actionCounts,
}
),
}
);
if (!story) {
// TODO: wrap in better error?
throw new Error("specified story not found");
}
log.trace({ oldStatus: result.oldStatus }, "adjusted story comment counts");
return result.comment;
};
/**
* process the comment and return moderation details.
*/
export const processForModeration: ModerationPhase = compose(moderationPhases);
export const accept = moderate(GQLCOMMENT_STATUS.ACCEPTED);
export const reject = moderate(GQLCOMMENT_STATUS.REJECTED);
@@ -0,0 +1,108 @@
import {
GQLCOMMENT_FLAG_REASON,
GQLCOMMENT_STATUS,
} from "talk-server/graph/tenant/schema/__generated__/types";
import { ACTION_TYPE } from "talk-server/models/action/comment";
import {
compose,
ModerationPhaseContext,
} from "talk-server/services/comments/pipeline";
describe("compose", () => {
it("handles when a phase throws an error", async () => {
const err = new Error("this is an error");
const enhanced = compose([
() => {
throw err;
},
]);
await expect(enhanced({} as ModerationPhaseContext)).rejects.toEqual(err);
});
it("handles when it returns a status", async () => {
const status = GQLCOMMENT_STATUS.ACCEPTED;
const enhanced = compose([() => ({ status })]);
await expect(enhanced({} as ModerationPhaseContext)).resolves.toEqual({
status,
metadata: {},
actions: [],
});
});
it("merges the metadata", async () => {
const status = GQLCOMMENT_STATUS.ACCEPTED;
const enhanced = compose([
() => ({ metadata: { first: true } }),
() => ({ status, metadata: { second: true } }),
() => ({ metadata: { third: true } }),
]);
await expect(enhanced({} as ModerationPhaseContext)).resolves.toEqual({
status,
metadata: { first: true, second: true },
actions: [],
});
});
it("merges actions", async () => {
const status = GQLCOMMENT_STATUS.ACCEPTED;
const flags = [
{
userID: null,
actionType: ACTION_TYPE.FLAG,
reason: GQLCOMMENT_FLAG_REASON.COMMENT_DETECTED_TOXIC,
},
{
userID: null,
actionType: ACTION_TYPE.FLAG,
reason: GQLCOMMENT_FLAG_REASON.COMMENT_DETECTED_SPAM,
},
];
const enhanced = compose([
() => ({
actions: [flags[0]],
}),
() => ({
status,
actions: [flags[1]],
}),
() => ({
actions: [
{
userID: null,
actionType: ACTION_TYPE.FLAG,
reason: GQLCOMMENT_FLAG_REASON.COMMENT_DETECTED_LINKS,
},
],
}),
]);
const final = await enhanced({} as ModerationPhaseContext);
for (const flag of flags) {
expect(final.actions).toContainEqual(flag);
}
expect(final.actions).not.toContainEqual({
actionType: ACTION_TYPE.FLAG,
reason: GQLCOMMENT_FLAG_REASON.COMMENT_DETECTED_LINKS,
});
});
it("handles when it does not return a status", async () => {
const enhanced = compose([
() => ({ metadata: { first: true } }),
() => ({ metadata: { second: true } }),
]);
await expect(enhanced({} as ModerationPhaseContext)).resolves.toEqual({
status: GQLCOMMENT_STATUS.NONE,
metadata: { first: true, second: true },
actions: [],
});
});
});
@@ -0,0 +1,91 @@
import { Omit, Promiseable } from "talk-common/types";
import { GQLCOMMENT_STATUS } from "talk-server/graph/tenant/schema/__generated__/types";
import { CreateActionInput } from "talk-server/models/action/comment";
import { EditCommentInput } from "talk-server/models/comment";
import { Story } from "talk-server/models/story";
import { Tenant } from "talk-server/models/tenant";
import { User } from "talk-server/models/user";
import { Request } from "talk-server/types/express";
import { moderationPhases } from "./phases";
export type ModerationAction = Omit<
CreateActionInput,
"commentID" | "commentRevisionID" | "storyID"
>;
export interface PhaseResult {
actions: ModerationAction[];
status: GQLCOMMENT_STATUS;
metadata: Record<string, any>;
}
export interface ModerationPhaseContext {
story: Story;
tenant: Tenant;
comment: Partial<EditCommentInput>;
author: User;
req?: Request;
}
export type ModerationPhase = (
context: ModerationPhaseContext
) => Promiseable<PhaseResult>;
export type IntermediatePhaseResult = Partial<PhaseResult> | void;
export type IntermediateModerationPhase = (
context: ModerationPhaseContext
) => Promiseable<IntermediatePhaseResult>;
/**
* compose will create a moderation pipeline for which is executable with the
* passed actions.
*/
export const compose = (
phases: IntermediateModerationPhase[]
): ModerationPhase => async context => {
const final: PhaseResult = {
status: GQLCOMMENT_STATUS.NONE,
actions: [],
metadata: {},
};
// Loop over all the moderation phases and see if we've resolved the status.
for (const phase of phases) {
const result = await phase(context);
if (result) {
// If this result contained actions, then we should push it into the
// other actions.
const { actions } = result;
if (actions) {
final.actions.push(...actions);
}
// If this result contained metadata, then we should merge it into the
// other metadata.
const { metadata } = result;
if (metadata) {
final.metadata = {
...final.metadata,
...metadata,
};
}
// If this result contained a status, then we've finished resolving
// phases!
const { status } = result;
if (status) {
final.status = status;
break;
}
}
}
return final;
};
/**
* process the comment and return moderation details.
*/
export const processForModeration: ModerationPhase = compose(moderationPhases);
@@ -10,7 +10,7 @@ import { ModerationSettings } from "talk-server/models/settings";
import {
IntermediateModerationPhase,
IntermediatePhaseResult,
} from "talk-server/services/comments/moderation";
} from "talk-server/services/comments/pipeline";
const testCharCount = (
settings: Partial<ModerationSettings>,
@@ -2,7 +2,7 @@ import { ModerationSettings } from "talk-server/models/settings";
import {
IntermediateModerationPhase,
IntermediatePhaseResult,
} from "talk-server/services/comments/moderation";
} from "talk-server/services/comments/pipeline";
const testDisabledCommenting = (settings: Partial<ModerationSettings>) =>
settings.disableCommenting;
@@ -1,4 +1,4 @@
import { IntermediateModerationPhase } from "talk-server/services/comments/moderation";
import { IntermediateModerationPhase } from "talk-server/services/comments/pipeline";
import { commentingDisabled } from "./commentingDisabled";
import { commentLength } from "./commentLength";
@@ -6,7 +6,7 @@ import { ACTION_TYPE } from "talk-server/models/action/comment";
import {
IntermediateModerationPhase,
IntermediatePhaseResult,
} from "talk-server/services/comments/moderation";
} from "talk-server/services/comments/pipeline";
import {
getCommentTrustScore,
isReliableCommenter,
@@ -10,7 +10,7 @@ import { ModerationSettings } from "talk-server/models/settings";
import {
IntermediateModerationPhase,
IntermediatePhaseResult,
} from "talk-server/services/comments/moderation";
} from "talk-server/services/comments/pipeline";
/**
* The preloaded linkify instance with common tlds.
@@ -6,7 +6,7 @@ import { ModerationSettings } from "talk-server/models/settings";
import {
IntermediateModerationPhase,
IntermediatePhaseResult,
} from "talk-server/services/comments/moderation";
} from "talk-server/services/comments/pipeline";
const testModerationMode = (settings: Partial<ModerationSettings>) =>
settings.moderation === GQLMODERATION_MODE.PRE;
@@ -9,7 +9,7 @@ import { ACTION_TYPE } from "talk-server/models/action/comment";
import {
IntermediateModerationPhase,
IntermediatePhaseResult,
} from "talk-server/services/comments/moderation";
} from "talk-server/services/comments/pipeline";
export const spam: IntermediateModerationPhase = async ({
story,
@@ -5,7 +5,7 @@ import {
import {
IntermediateModerationPhase,
IntermediatePhaseResult,
} from "talk-server/services/comments/moderation";
} from "talk-server/services/comments/pipeline";
// If a given user is a staff member, always approve their comment.
export const staff: IntermediateModerationPhase = ({
@@ -4,7 +4,7 @@ import { Comment } from "talk-server/models/comment";
import { Story } from "talk-server/models/story";
import { Tenant } from "talk-server/models/tenant";
import { User } from "talk-server/models/user";
import { storyClosed } from "talk-server/services/comments/moderation/phases/storyClosed";
import { storyClosed } from "talk-server/services/comments/pipeline/phases/storyClosed";
describe("storyClosed", () => {
it("throws an error when the story is closed", () => {
@@ -1,7 +1,7 @@
import {
IntermediateModerationPhase,
IntermediatePhaseResult,
} from "talk-server/services/comments/moderation";
} from "talk-server/services/comments/pipeline";
// This phase checks to see if the story being processed is closed or not.
export const storyClosed: IntermediateModerationPhase = ({
@@ -13,7 +13,7 @@ import { ACTION_TYPE } from "talk-server/models/action/comment";
import {
IntermediateModerationPhase,
IntermediatePhaseResult,
} from "talk-server/services/comments/moderation";
} from "talk-server/services/comments/pipeline";
export const toxic: IntermediateModerationPhase = async ({
tenant,
@@ -6,8 +6,8 @@ import { ACTION_TYPE } from "talk-server/models/action/comment";
import {
IntermediateModerationPhase,
IntermediatePhaseResult,
} from "talk-server/services/comments/moderation";
import { containsMatchingPhraseMemoized } from "talk-server/services/comments/moderation/wordList";
} from "talk-server/services/comments/pipeline";
import { containsMatchingPhraseMemoized } from "talk-server/services/comments/pipeline/wordList";
// This phase checks the comment against the wordList.
export const wordList: IntermediateModerationPhase = ({
@@ -1,4 +1,4 @@
import { containsMatchingPhrase } from "talk-server/services/comments/moderation/wordList";
import { containsMatchingPhrase } from "talk-server/services/comments/pipeline/wordList";
const phrases = [
"cookies",
@@ -22,8 +22,7 @@ export function generateRegExp(phrases: string[]) {
.join('[\\s"?!.]+')
)
.join("|");
return new RegExp(`(^|[^\\w])(${inner})(?=[^\\w]|$)`, "iu");
return new RegExp(`(^|[^\\w])(${inner})(?=[^\\w]|$)`, "gmiu");
}
export const generateRegExpMemoized = memoize(generateRegExp);
@@ -1,81 +0,0 @@
import { Db } from "mongodb";
import { Omit } from "talk-common/types";
import { GQLCOMMENT_STATUS } from "talk-server/graph/tenant/schema/__generated__/types";
import logger from "talk-server/logger";
import {
createCommentModerationAction,
CreateCommentModerationActionInput,
} from "talk-server/models/action/moderation/comment";
import { updateCommentStatus } from "talk-server/models/comment";
import { updateCommentStatusCount } from "talk-server/models/story";
import { Tenant } from "talk-server/models/tenant";
export type Moderate = Omit<CreateCommentModerationActionInput, "status">;
const moderate = (status: GQLCOMMENT_STATUS) => async (
mongo: Db,
tenant: Tenant,
input: Moderate
) => {
// TODO: wrap these operations in a transaction?
// Create the logger.
const log = logger.child({
...input,
tenantID: tenant.id,
newStatus: status,
});
// Update the Comment's status.
const result = await updateCommentStatus(
mongo,
tenant.id,
input.commentID,
input.commentRevisionID,
status
);
if (!result) {
// TODO: wrap in better error?
throw new Error("specified comment not found");
}
log.trace("updated comment status");
// Create the moderation action in the audit log.
const action = await createCommentModerationAction(mongo, tenant.id, {
...input,
status,
});
if (!action) {
// TODO: wrap in better error?
throw new Error("could not create moderation action");
}
log.trace(
{ commentModerationActionID: action.id },
"created the moderation action"
);
// Update the story comment counts.
const story = await updateCommentStatusCount(
mongo,
tenant.id,
result.comment.storyID,
{
[result.oldStatus]: -1,
[status]: 1,
}
);
if (!story) {
// TODO: wrap in better error?
throw new Error("specified story not found");
}
log.trace({ oldStatus: result.oldStatus }, "adjusted story comment counts");
return result.comment;
};
export const accept = moderate(GQLCOMMENT_STATUS.ACCEPTED);
export const reject = moderate(GQLCOMMENT_STATUS.REJECTED);
+33 -3
View File
@@ -1,11 +1,41 @@
import RedisClient, { Redis } from "ioredis";
import RedisClient, { Pipeline, Redis } from "ioredis";
import { Config } from "talk-common/config";
import { Omit } from "talk-common/types";
export interface AugmentedRedisCommands {
mhincrby(key: string, ...args: any[]): Promise<void>;
}
export type AugmentedPipeline = Pipeline & AugmentedRedisCommands;
export type AugmentedRedis = Omit<Redis, "pipeline"> &
AugmentedRedisCommands & {
pipeline(commands?: string[][]): AugmentedPipeline;
};
function configureRedisClient(redis: Redis) {
// mhincrby will increment many hash values.
redis.defineCommand("mhincrby", {
numberOfKeys: 1,
lua: `
for i = 1, #ARGV, 2 do
redis.call('HINCRBY', KEYS[1], ARGV[i], ARGV[i + 1])
end
`,
});
}
/**
* create will connect to the Redis instance identified in the configuration.
*
* @param config application configuration.
*/
export function createRedisClient(config: Config): Redis {
return new RedisClient(config.get("redis"), {});
export function createRedisClient(config: Config): AugmentedRedis {
const redis = new RedisClient(config.get("redis"), {});
// Configure the redis client for use with the custom commands.
configureRedisClient(redis);
return redis as AugmentedRedis;
}
+11 -8
View File
@@ -30,9 +30,9 @@ import {
retrieveManyStories,
retrieveStory,
Story,
updateCommentStatusCount,
updateStory,
updateStoryActionCounts,
updateStoryCommentStatusCount,
UpdateStoryInput,
} from "talk-server/models/story";
import { Tenant } from "talk-server/models/tenant";
@@ -40,6 +40,8 @@ import Task from "talk-server/queue/Task";
import { ScraperData } from "talk-server/queue/tasks/scraper";
import { scrape } from "talk-server/services/stories/scraper";
import { AugmentedRedis } from "../redis";
export type FindOrCreateStory = FindOrCreateStoryInput;
export async function findOrCreate(
@@ -155,7 +157,7 @@ export async function remove(
);
log.debug({ removedComments }, "removed comments while deleting story");
} else if (calculateTotalCommentCount(story.commentCounts) > 0) {
} else if (calculateTotalCommentCount(story.commentCounts.status) > 0) {
log.warn(
"attempted to remove story that has linked comments without consent for deleting comments"
);
@@ -227,6 +229,7 @@ export async function update(
export async function merge(
mongo: Db,
redis: AugmentedRedis,
tenant: Tenant,
destinationID: string,
sourceIDs: string[]
@@ -288,27 +291,27 @@ export async function merge(
// Merge the comment and action counts for all the source stories.
const [, ...sourceStories] = stories;
let destinationStory = await updateCommentStatusCount(
let destinationStory = await updateStoryCommentStatusCount(
mongo,
redis,
tenant.id,
destinationID,
mergeCommentStatusCount(
// We perform the type assertion here because above, we already verified
// that none of the stories are null.
(sourceStories as Story[]).map(({ commentCounts }) => commentCounts)
(sourceStories as Story[]).map(({ commentCounts: { status } }) => status)
)
);
const mergedActionCounts = mergeCommentActionCounts(
// We perform the type assertion here because above, we already verified
// that none of the stories are null.
(sourceStories as Story[]).map(
({ commentActionCounts }) => commentActionCounts
)
...(sourceStories as Story[]).map(({ commentCounts: { action } }) => action)
);
if (countTotalActionCounts(mergedActionCounts) > 0) {
destinationStory = await updateStoryActionCounts(
mongo,
redis,
tenant.id,
destinationID,
mergedActionCounts
@@ -321,7 +324,7 @@ export async function merge(
}
log.debug(
{ commentCounts: destinationStory.commentCounts },
{ commentCounts: destinationStory.commentCounts.status },
"updated destination story with new comment counts"
);