From 2eec87aa80e5f10787c66cef725e6f26c0786953 Mon Sep 17 00:00:00 2001 From: Wyatt Johnson Date: Wed, 6 Dec 2017 16:10:42 -0700 Subject: [PATCH] modified asset loading to be less lazy --- graph/loaders/assets.js | 164 ++++++++++++++++++++++++-------------- graph/loaders/comments.js | 36 ++++----- graph/loaders/settings.js | 16 +++- models/comment.js | 28 +++++-- services/cache.js | 104 ++++++++++++++++++++---- services/hcache.js | 61 -------------- services/settings.js | 6 +- 7 files changed, 245 insertions(+), 170 deletions(-) delete mode 100644 services/hcache.js diff --git a/graph/loaders/assets.js b/graph/loaders/assets.js index 832b7b473..6ea7e6599 100644 --- a/graph/loaders/assets.js +++ b/graph/loaders/assets.js @@ -1,31 +1,14 @@ const DataLoader = require('dataloader'); -const url = require('url'); +const {URL} = require('url'); +const {singleJoinBy, SingletonResolver} = require('./util'); -const errors = require('../../errors'); -const scraper = require('../../services/scraper'); -const util = require('./util'); - -const AssetModel = require('../../models/asset'); -const AssetsService = require('../../services/assets'); - -/** - * Retrieves assets by an array of ids. - * @param {Object} context the context of the request - * @param {Array} ids array of ids to lookup - */ -const genAssetsByID = (context, ids) => AssetModel.find({ +const genAssetsByID = ({connectors: {models: {Asset}}}, ids) => Asset.find({ id: { $in: ids } -}).then(util.singleJoinBy(ids, 'id')); +}).then(singleJoinBy(ids, 'id')); -/** - * [getAssetsByQuery description] - * @param {Object} context the context of the request - * @param {Object} query the query - * @return {Promise} resolves the assets - */ -const getAssetsByQuery = async (context, query) => { +const getAssetsByQuery = async ({connectors: {services: {Assets}}}, query) => { // If we are requesting based on a limit, ask for one more than we want. const limit = query.limit; @@ -33,7 +16,7 @@ const getAssetsByQuery = async (context, query) => { query.limit += 1; } - const nodes = await AssetsService.search(query); + const nodes = await Assets.search(query); // The hasNextPage is always handled the same (ask for one more than we need, // if there is one more, than there is more). @@ -54,58 +37,121 @@ const getAssetsByQuery = async (context, query) => { }; }; -/** - * This endpoint find or creates an asset at the given url when it is loaded. - * @param {Object} context the context of the request - * @param {String} asset_url the url passed in from the query - * @returns {Promise} resolves to the asset - */ -const findOrCreateAssetByURL = async (context, asset_url) => { +const findOrCreateAssetByURL = async (ctx, url) => { - // Verify that the asset_url is parsable. - let parsed_asset_url = url.parse(asset_url); - if (!parsed_asset_url.protocol) { - throw errors.ErrInvalidAssetURL; + // Pull our connectors out of the context. + const { + loaders: { + Assets, + Settings, + }, + connectors: { + models: { + Asset, + }, + services: { + DomainList, + Scraper, + }, + errors: { + ErrInvalidAssetURL, + }, + }, + } = ctx; + + // Try to validate that the url is valid. If the URL constructor throws an + // error, throw our internal ErrInvalidAssetURL instead. This will validate + // that the url contains a valid scheme. + try { + new URL(url); + } catch (err) { + throw ErrInvalidAssetURL; } - let asset = await AssetsService.findOrCreateByUrl(asset_url); + // Try the easy lookup first. + let asset = await Assets.findByUrl(url); + if (asset) { + return asset; + } - // If the asset wasn't scraped before, scrape it! Otherwise just return - // the asset. + // Seems the asset wasn't here yet.. We should do some validation. + + // Check for whitelisting + get the settings at the same time. + const [ + whitelisted, + settings, + ] = await Promise.all([ + DomainList.urlCheck(url), + Settings.load('autoCloseStream closedTimeout'), + ]); + + // If the domain wasn't whitelisted, then we shouldn't create this asset! + if (!whitelisted) { + throw ErrInvalidAssetURL; + } + + // Construct the update operator that we'll use to create the asset. + const update = { + $setOnInsert: { + url, + }, + }; + + // If the auto-close stream is enabled, close the stream after the designated + // timeout. + if (settings.autoCloseStream) { + update.$setOnInsert.closedAt = new Date(Date.now() + settings.closedTimeout * 1000); + } + + // We're using the findOneAndUpdate here instead of a insert to protect + // against race conditions. + asset = await Asset.findOneAndUpdate({ + url, + }, update, { + + // Ensure that if it's new, we return the new object created. + new: true, + + // Perform an upsert in the event that this doesn't exist. + upsert: true, + + // Set the default values if not provided based on the mongoose models. + setDefaultsOnInsert: true, + + // Ensure that we validate the input that we do have. + runValidators: true, + }); + + // If this is a new asset, then we need to scrape it! if (!asset.scraped) { - await scraper.create(asset); + + // Create the Scraper job. + await Scraper.create(asset); } return asset; }; -const findByUrl = async (context, asset_url) => { +const findByUrl = async ({connectors: {errors, services: {Assets}}}, asset_url) => { - // Verify that the asset_url is parsable. - let parsed_asset_url = url.parse(asset_url); - if (!parsed_asset_url.protocol) { + // Try to validate that the url is valid. If the URL constructor throws an + // error, throw our internal ErrInvalidAssetURL instead. This will validate + // that the url contains a valid scheme. + try { + new URL(asset_url); + } catch (err) { throw errors.ErrInvalidAssetURL; } - return AssetsService.findByUrl(asset_url); + return Assets.findByUrl(asset_url); }; -/** - * Creates a set of loaders based on a GraphQL context. - * @param {Object} context the context of the GraphQL request - * @return {Object} object of loaders - */ - -module.exports = (context) => ({ +module.exports = (ctx) => ({ Assets: { - - // TODO: decide whether we want to move these to mutators or not, as in fact - // this operation create a new asset if one isn't found. - getByURL: (url) => findOrCreateAssetByURL(context, url), - - findByUrl: (url) => findByUrl(context, url), - getByQuery: (query) => getAssetsByQuery(context, query), - getByID: new DataLoader((ids) => genAssetsByID(context, ids)), - getAll: new util.SingletonResolver(() => AssetModel.find({})) + getByURL: (url) => findOrCreateAssetByURL(ctx, url), + findByUrl: (url) => findByUrl(ctx, url), + getByQuery: (query) => getAssetsByQuery(ctx, query), + getByID: new DataLoader((ids) => genAssetsByID(ctx, ids)), + getAll: new SingletonResolver(() => ctx.connectors.models.Asset.find({})) } }); diff --git a/graph/loaders/comments.js b/graph/loaders/comments.js index 2ad94deac..050bd6340 100644 --- a/graph/loaders/comments.js +++ b/graph/loaders/comments.js @@ -82,54 +82,52 @@ const getParentCountsByAssetID = (context, asset_ids) => { /** * Retrieves the count of comments based on the passed in query. - * @param {Object} context graph context + * @param {Object} ctx graph context * @param {Object} query query to execute against the comments collection * to compute the counts * @return {Promise} resolves to the counts of the comments from the * query */ -const getCommentCountByQuery = (context, {ids, statuses, asset_id, parent_id, author_id, tags, action_type}) => { - let query = CommentModel.find(); +const getCommentCountByQuery = (ctx, options) => { + const {statuses, asset_id, parent_id, author_id, tags, action_type} = options; // If user queries for statuses other than NONE and/or ACCEPTED statuses, it needs // special privileges. if ( (!statuses || statuses.some((status) => !['NONE', 'ACCEPTED'].includes(status))) && - (context.user == null || !context.user.can(SEARCH_NON_NULL_OR_ACCEPTED_COMMENTS)) + (ctx.user == null || !ctx.user.can(SEARCH_NON_NULL_OR_ACCEPTED_COMMENTS)) ) { return null; } - if (statuses && statuses.length > 0) { - query = query.where({status: {$in: statuses}}); - } - - if (ids) { - query = query.where({id: {$in: ids}}); - } + const query = CommentModel.find(); if (asset_id != null) { - query = query.where({asset_id}); + query.merge({asset_id}); } if (parent_id !== undefined) { - query = query.where({parent_id}); + query.merge({parent_id}); } if (author_id) { - query = query.where({author_id}); + query.merge({author_id}); } - if (context.user != null && context.user.can(SEARCH_OTHERS_COMMENTS) && action_type) { - query = query.where({ + if (ctx.user != null && ctx.user.can(SEARCH_OTHERS_COMMENTS) && action_type) { + query.merge({ [`action_counts.${sc(action_type.toLowerCase())}`]: { $gt: 0, }, }); } - if (tags) { - query = query.find({ + if (statuses && statuses.length > 0) { + query.merge({status: {$in: statuses}}); + } + + if (tags && tags.length > 0) { + query.merge({ 'tags.tag.name': { $in: tags, }, @@ -289,7 +287,7 @@ const getCommentsByQuery = async (ctx, {ids, statuses, asset_id, parent_id, auth let comments = CommentModel.find(); // If user queries for statuses other than NONE and/or ACCEPTED statuses, it needs - // special priviledges. + // special privileges. if ( (!statuses || statuses.some((status) => !['NONE', 'ACCEPTED'].includes(status))) && (ctx.user == null || !ctx.user.can(SEARCH_NON_NULL_OR_ACCEPTED_COMMENTS)) diff --git a/graph/loaders/settings.js b/graph/loaders/settings.js index 2d6189b7d..a7a4b74e1 100644 --- a/graph/loaders/settings.js +++ b/graph/loaders/settings.js @@ -1,11 +1,19 @@ const SettingsService = require('../../services/settings'); -const {SingletonResolver} = require('./util'); +const DataLoader = require('dataloader'); /** * Creates a set of loaders based on a GraphQL context. * @param {Object} context the context of the GraphQL request * @return {Object} object of loaders */ -module.exports = () => ({ - Settings: new SingletonResolver(() => SettingsService.retrieve()) -}); +module.exports = () => { + const loader = new DataLoader((selections) => Promise.all(selections.map((fields) => { + return SettingsService.retrieve(fields); + }))); + + return { + Settings: { + load: (fields = false) => loader.load(fields), + } + }; +}; diff --git a/models/comment.js b/models/comment.js index bc4cecd5d..ae6aa558f 100644 --- a/models/comment.js +++ b/models/comment.js @@ -123,20 +123,23 @@ CommentSchema.index({ background: true, }); -// Add an index that is optimized for sorting based on the action count data. -CommentSchema.index({ - 'created_at': 1, - 'action_counts.flag': 1, -}, { - background: true, -}); - +// Create a sparse index to search across. CommentSchema.index({ 'created_at': 1, 'action_counts.flag': 1, 'status': 1, }, { background: true, + sparse: true, +}); + +// Create a sparse index to search across. +CommentSchema.index({ + 'action_counts.flag': 1, + 'status': 1, +}, { + background: true, + sparse: true, }); // Add an index that is optimized for finding flagged comments. @@ -166,6 +169,15 @@ CommentSchema.index({ background: true, }); +// Optimize for tag searches/counts. +CommentSchema.index({ + 'tags.tag.name': 1, + 'status': 1, +}, { + background: true, + sparse: true, +}); + // Add an index that is optimized for sorting based on the created_at timestamp // but also good at locating comments that have a specific asset id. CommentSchema.index({ diff --git a/services/cache.js b/services/cache.js index 155992fe0..cac05c091 100644 --- a/services/cache.js +++ b/services/cache.js @@ -61,30 +61,36 @@ cache.init = async () => { // This is designed to increment a key and add an expiry iff the key already // exists. - const INCR_SCRIPT = ` - if redis.call('GET', KEYS[1]) ~= false then - redis.call('INCR', KEYS[1]) - redis.call('EXPIRE', KEYS[1], ARGV[1]) - end - `; - cache.client.defineCommand('increx', { numberOfKeys: 1, - lua: INCR_SCRIPT, + lua: ` + if redis.call('GET', KEYS[1]) ~= false then + redis.call('INCR', KEYS[1]) + redis.call('EXPIRE', KEYS[1], ARGV[1]) + end + `, }); // This is designed to decrement a key and add an expiry iff the key already // exists. - const DECR_SCRIPT = ` - if redis.call('GET', KEYS[1]) ~= false then - redis.call('DECR', KEYS[1]) - redis.call('EXPIRE', KEYS[1], ARGV[1]) - end - `; - cache.client.defineCommand('decrex', { numberOfKeys: 1, - lua: DECR_SCRIPT, + lua: ` + if redis.call('GET', KEYS[1]) ~= false then + redis.call('DECR', KEYS[1]) + redis.call('EXPIRE', KEYS[1], ARGV[1]) + end + `, + }); + + cache.client.defineCommand('hincrbyex', { + numberOfKeys: 2, + lua: ` + if redis.call('HGET', KEYS[1], KEYS[2]) ~= false then + redis.call('HINCRBY', KEYS[1], KEYS[2], ARGV[1]) + redis.call('EXPIRE', KEYS[1], ARGV[2]) + end + `, }); }; @@ -270,3 +276,69 @@ cache.set = async (key, value, expiry, kf = keyfunc) => { return cache.client.set(kf(key), reply, 'EX', expiry); }; + +/** + * h is the hash form of the cache. + */ +cache.h = {}; + +cache.h.get = async (key, field = '__default__') => { + + // Get the current value from redis. + const reply = await cache.client.hget(keyfunc(key), field); + + if (typeof reply !== 'undefined' && reply !== null) { + return JSON.parse(reply); + } + + return null; +}; + +cache.h.set = async (key, field = '__default__', value, expiry = 60) => { + + // Serialize the value as JSON. + let reply = JSON.stringify(value); + + return cache.client + .pipeline() + .hset(keyfunc(key), field, reply) + .expire(keyfunc(key), expiry) + .exec(); +}; + +cache.h.invalidate = async (key, field = null) => { + if (field === null) { + return cache.invalidate(key); + } + + debug(`invalidate: ${keyfunc(key)} ${field}`); + + return cache.client.hdel(keyfunc(key), field); +}; + +cache.h.wrap = async (key, field, expiry, work) => { + let value = await cache.h.get(key, field); + if (value !== null) { + debug('wrap: hit', keyfunc(key)); + return value; + } + + debug('wrap: miss', keyfunc(key)); + + value = await work(); + + process.nextTick(async () => { + try { + await cache.h.set(key, field, value, expiry); + debug('wrap: set complete'); + } catch (err) { + console.error(err); + } + }); + + return value; +}; + +cache.h.incr = async (key, field = '__default__', expiry) => cache.client.hincrbyex(keyfunc(key), field, 1, expiry); + +cache.h.decr = async (key, field = '__default__', expiry) => cache.client.hincrbyex(keyfunc(key), field, -1, expiry); diff --git a/services/hcache.js b/services/hcache.js deleted file mode 100644 index 72e52d9eb..000000000 --- a/services/hcache.js +++ /dev/null @@ -1,61 +0,0 @@ -const cache = require('./cache'); -const debug = require('debug')('talk:services:hcache'); - -const kf = (key) => `hcache:${key}`; - -const hcache = module.exports = {}; - -hcache.get = async (key, field = '__default__') => { - - // Get the current value from redis. - const reply = await cache.client.hget(kf(key), field); - - if (typeof reply !== 'undefined' && reply !== null) { - return JSON.parse(reply); - } - - return null; -}; - -hcache.set = async (key, field = '__default__', value, expiry = 60) => { - - // Serialize the value as JSON. - let reply = JSON.stringify(value); - - return cache.client - .pipeline() - .hset(kf(key), field, reply) - .expire(kf(key), expiry) - .exec(); -}; - -hcache.del = async (key, field = null) => { - if (field === null) { - return cache.client.del(kf(key)); - } - - return cache.client.hdel(kf(key), field); -}; - -hcache.wrap = async (key, field, expiry, work) => { - let value = await hcache.get(key, field); - if (value !== null) { - debug('wrap: hit', kf(key)); - return value; - } - - debug('wrap: miss', kf(key)); - - value = await work(); - - process.nextTick(async () => { - try { - await hcache.set(key, field, value, expiry); - debug('wrap: set complete'); - } catch (err) { - console.error(err); - } - }); - - return value; -}; diff --git a/services/settings.js b/services/settings.js index 90ff6bbd6..e8537c9c9 100644 --- a/services/settings.js +++ b/services/settings.js @@ -1,5 +1,5 @@ const SettingModel = require('../models/setting'); -const hcache = require('./hcache'); +const cache = require('./cache'); const errors = require('../errors'); const {dotize} = require('./utils'); @@ -35,7 +35,7 @@ module.exports = class SettingsService { if (process.env.NODE_ENV === 'production') { // When in production, wrap the settings retrieval with a cache. - const settings = await hcache.wrap('settings', fields, 60, () => retrieve(fields)); + const settings = await cache.h.wrap('settings', fields, 60, () => retrieve(fields)); return new SettingModel(settings); } @@ -58,7 +58,7 @@ module.exports = class SettingsService { }); if (process.env.NODE_ENV === 'production') { - await hcache.del('settings'); + await cache.h.invalidate('settings'); } return updatedSettings;