Merge branch 'master' into document-optimization-v2

This commit is contained in:
Wyatt Johnson
2017-12-08 15:08:15 -07:00
committed by GitHub
7 changed files with 245 additions and 170 deletions
+105 -59
View File
@@ -1,31 +1,14 @@
const DataLoader = require('dataloader');
const url = require('url');
const {URL} = require('url');
const {singleJoinBy, SingletonResolver} = require('./util');
const errors = require('../../errors');
const scraper = require('../../services/scraper');
const util = require('./util');
const AssetModel = require('../../models/asset');
const AssetsService = require('../../services/assets');
/**
* Retrieves assets by an array of ids.
* @param {Object} context the context of the request
* @param {Array} ids array of ids to lookup
*/
const genAssetsByID = (context, ids) => AssetModel.find({
const genAssetsByID = ({connectors: {models: {Asset}}}, ids) => Asset.find({
id: {
$in: ids
}
}).then(util.singleJoinBy(ids, 'id'));
}).then(singleJoinBy(ids, 'id'));
/**
* [getAssetsByQuery description]
* @param {Object} context the context of the request
* @param {Object} query the query
* @return {Promise} resolves the assets
*/
const getAssetsByQuery = async (context, query) => {
const getAssetsByQuery = async ({connectors: {services: {Assets}}}, query) => {
// If we are requesting based on a limit, ask for one more than we want.
const limit = query.limit;
@@ -33,7 +16,7 @@ const getAssetsByQuery = async (context, query) => {
query.limit += 1;
}
const nodes = await AssetsService.search(query);
const nodes = await Assets.search(query);
// The hasNextPage is always handled the same way (ask for one more than we
// need; if there is one more, then there is a next page).
@@ -54,58 +37,121 @@ const getAssetsByQuery = async (context, query) => {
};
};
/**
* This endpoint find or creates an asset at the given url when it is loaded.
* @param {Object} context the context of the request
* @param {String} asset_url the url passed in from the query
* @returns {Promise} resolves to the asset
*/
const findOrCreateAssetByURL = async (context, asset_url) => {
const findOrCreateAssetByURL = async (ctx, url) => {
// Verify that the asset_url is parsable.
let parsed_asset_url = url.parse(asset_url);
if (!parsed_asset_url.protocol) {
throw errors.ErrInvalidAssetURL;
// Pull our connectors out of the context.
const {
loaders: {
Assets,
Settings,
},
connectors: {
models: {
Asset,
},
services: {
DomainList,
Scraper,
},
errors: {
ErrInvalidAssetURL,
},
},
} = ctx;
// Try to validate that the url is valid. If the URL constructor throws an
// error, throw our internal ErrInvalidAssetURL instead. This will validate
// that the url contains a valid scheme.
try {
new URL(url);
} catch (err) {
throw ErrInvalidAssetURL;
}
let asset = await AssetsService.findOrCreateByUrl(asset_url);
// Try the easy lookup first.
let asset = await Assets.findByUrl(url);
if (asset) {
return asset;
}
// If the asset wasn't scraped before, scrape it! Otherwise just return
// the asset.
// Seems the asset wasn't here yet.. We should do some validation.
// Check for whitelisting + get the settings at the same time.
const [
whitelisted,
settings,
] = await Promise.all([
DomainList.urlCheck(url),
Settings.load('autoCloseStream closedTimeout'),
]);
// If the domain wasn't whitelisted, then we shouldn't create this asset!
if (!whitelisted) {
throw ErrInvalidAssetURL;
}
// Construct the update operator that we'll use to create the asset.
const update = {
$setOnInsert: {
url,
},
};
// If the auto-close stream is enabled, close the stream after the designated
// timeout.
if (settings.autoCloseStream) {
update.$setOnInsert.closedAt = new Date(Date.now() + settings.closedTimeout * 1000);
}
// We're using findOneAndUpdate here instead of an insert to protect
// against race conditions.
asset = await Asset.findOneAndUpdate({
url,
}, update, {
// Ensure that if it's new, we return the new object created.
new: true,
// Perform an upsert in the event that this doesn't exist.
upsert: true,
// Set the default values if not provided based on the mongoose models.
setDefaultsOnInsert: true,
// Ensure that we validate the input that we do have.
runValidators: true,
});
// If this is a new asset, then we need to scrape it!
if (!asset.scraped) {
await scraper.create(asset);
// Create the Scraper job.
await Scraper.create(asset);
}
return asset;
};
const findByUrl = async (context, asset_url) => {
const findByUrl = async ({connectors: {errors, services: {Assets}}}, asset_url) => {
// Verify that the asset_url is parsable.
let parsed_asset_url = url.parse(asset_url);
if (!parsed_asset_url.protocol) {
// Try to validate that the url is valid. If the URL constructor throws an
// error, throw our internal ErrInvalidAssetURL instead. This will validate
// that the url contains a valid scheme.
try {
new URL(asset_url);
} catch (err) {
throw errors.ErrInvalidAssetURL;
}
return AssetsService.findByUrl(asset_url);
return Assets.findByUrl(asset_url);
};
/**
* Creates a set of loaders based on a GraphQL context.
* @param {Object} context the context of the GraphQL request
* @return {Object} object of loaders
*/
module.exports = (context) => ({
module.exports = (ctx) => ({
Assets: {
// TODO: decide whether we want to move these to mutators or not, as in fact
// this operation creates a new asset if one isn't found.
getByURL: (url) => findOrCreateAssetByURL(context, url),
findByUrl: (url) => findByUrl(context, url),
getByQuery: (query) => getAssetsByQuery(context, query),
getByID: new DataLoader((ids) => genAssetsByID(context, ids)),
getAll: new util.SingletonResolver(() => AssetModel.find({}))
getByURL: (url) => findOrCreateAssetByURL(ctx, url),
findByUrl: (url) => findByUrl(ctx, url),
getByQuery: (query) => getAssetsByQuery(ctx, query),
getByID: new DataLoader((ids) => genAssetsByID(ctx, ids)),
getAll: new SingletonResolver(() => ctx.connectors.models.Asset.find({}))
}
});
+17 -19
View File
@@ -82,54 +82,52 @@ const getParentCountsByAssetID = (context, asset_ids) => {
/**
* Retrieves the count of comments based on the passed in query.
* @param {Object} context graph context
* @param {Object} ctx graph context
* @param {Object} query query to execute against the comments collection
* to compute the counts
* @return {Promise} resolves to the counts of the comments from the
* query
*/
const getCommentCountByQuery = (context, {ids, statuses, asset_id, parent_id, author_id, tags, action_type}) => {
let query = CommentModel.find();
const getCommentCountByQuery = (ctx, options) => {
const {statuses, asset_id, parent_id, author_id, tags, action_type} = options;
// If user queries for statuses other than NONE and/or ACCEPTED statuses, it needs
// special privileges.
if (
(!statuses || statuses.some((status) => !['NONE', 'ACCEPTED'].includes(status))) &&
(context.user == null || !context.user.can(SEARCH_NON_NULL_OR_ACCEPTED_COMMENTS))
(ctx.user == null || !ctx.user.can(SEARCH_NON_NULL_OR_ACCEPTED_COMMENTS))
) {
return null;
}
if (statuses && statuses.length > 0) {
query = query.where({status: {$in: statuses}});
}
if (ids) {
query = query.where({id: {$in: ids}});
}
const query = CommentModel.find();
if (asset_id != null) {
query = query.where({asset_id});
query.merge({asset_id});
}
if (parent_id !== undefined) {
query = query.where({parent_id});
query.merge({parent_id});
}
if (author_id) {
query = query.where({author_id});
query.merge({author_id});
}
if (context.user != null && context.user.can(SEARCH_OTHERS_COMMENTS) && action_type) {
query = query.where({
if (ctx.user != null && ctx.user.can(SEARCH_OTHERS_COMMENTS) && action_type) {
query.merge({
[`action_counts.${sc(action_type.toLowerCase())}`]: {
$gt: 0,
},
});
}
if (tags) {
query = query.find({
if (statuses && statuses.length > 0) {
query.merge({status: {$in: statuses}});
}
if (tags && tags.length > 0) {
query.merge({
'tags.tag.name': {
$in: tags,
},
@@ -289,7 +287,7 @@ const getCommentsByQuery = async (ctx, {ids, statuses, asset_id, parent_id, auth
let comments = CommentModel.find();
// If user queries for statuses other than NONE and/or ACCEPTED statuses, it needs
// special priviledges.
// special privileges.
if (
(!statuses || statuses.some((status) => !['NONE', 'ACCEPTED'].includes(status))) &&
(ctx.user == null || !ctx.user.can(SEARCH_NON_NULL_OR_ACCEPTED_COMMENTS))
+12 -4
View File
@@ -1,11 +1,19 @@
const SettingsService = require('../../services/settings');
const {SingletonResolver} = require('./util');
const DataLoader = require('dataloader');
/**
* Creates a set of loaders based on a GraphQL context.
* @param {Object} context the context of the GraphQL request
* @return {Object} object of loaders
*/
module.exports = () => ({
Settings: new SingletonResolver(() => SettingsService.retrieve())
});
module.exports = () => {
const loader = new DataLoader((selections) => Promise.all(selections.map((fields) => {
return SettingsService.retrieve(fields);
})));
return {
Settings: {
load: (fields = false) => loader.load(fields),
}
};
};
+20 -8
View File
@@ -123,20 +123,23 @@ CommentSchema.index({
background: true,
});
// Add an index that is optimized for sorting based on the action count data.
CommentSchema.index({
'created_at': 1,
'action_counts.flag': 1,
}, {
background: true,
});
// Create a sparse index to search across.
CommentSchema.index({
'created_at': 1,
'action_counts.flag': 1,
'status': 1,
}, {
background: true,
sparse: true,
});
// Create a sparse index to search across.
CommentSchema.index({
'action_counts.flag': 1,
'status': 1,
}, {
background: true,
sparse: true,
});
// Add an index that is optimized for finding flagged comments.
@@ -166,6 +169,15 @@ CommentSchema.index({
background: true,
});
// Optimize for tag searches/counts.
CommentSchema.index({
'tags.tag.name': 1,
'status': 1,
}, {
background: true,
sparse: true,
});
// Add an index that is optimized for sorting based on the created_at timestamp
// but also good at locating comments that have a specific asset id.
CommentSchema.index({
+88 -16
View File
@@ -61,30 +61,36 @@ cache.init = async () => {
// This is designed to increment a key and add an expiry iff the key already
// exists.
const INCR_SCRIPT = `
if redis.call('GET', KEYS[1]) ~= false then
redis.call('INCR', KEYS[1])
redis.call('EXPIRE', KEYS[1], ARGV[1])
end
`;
cache.client.defineCommand('increx', {
numberOfKeys: 1,
lua: INCR_SCRIPT,
lua: `
if redis.call('GET', KEYS[1]) ~= false then
redis.call('INCR', KEYS[1])
redis.call('EXPIRE', KEYS[1], ARGV[1])
end
`,
});
// This is designed to decrement a key and add an expiry iff the key already
// exists.
const DECR_SCRIPT = `
if redis.call('GET', KEYS[1]) ~= false then
redis.call('DECR', KEYS[1])
redis.call('EXPIRE', KEYS[1], ARGV[1])
end
`;
cache.client.defineCommand('decrex', {
numberOfKeys: 1,
lua: DECR_SCRIPT,
lua: `
if redis.call('GET', KEYS[1]) ~= false then
redis.call('DECR', KEYS[1])
redis.call('EXPIRE', KEYS[1], ARGV[1])
end
`,
});
cache.client.defineCommand('hincrbyex', {
numberOfKeys: 2,
lua: `
if redis.call('HGET', KEYS[1], KEYS[2]) ~= false then
redis.call('HINCRBY', KEYS[1], KEYS[2], ARGV[1])
redis.call('EXPIRE', KEYS[1], ARGV[2])
end
`,
});
};
@@ -270,3 +276,69 @@ cache.set = async (key, value, expiry, kf = keyfunc) => {
return cache.client.set(kf(key), reply, 'EX', expiry);
};
/**
 * h is the hash form of the cache: each cache key addresses a Redis hash and
 * each field of that hash holds one JSON-serialized value.
 */
cache.h = {};

/**
 * Reads a single field from the hash stored under the given key.
 *
 * @param  {String} key    cache key (passed through keyfunc)
 * @param  {String} field  hash field to read, defaults to '__default__'
 * @return {Promise}       resolves to the parsed value, or null on a miss
 */
cache.h.get = async (key, field = '__default__') => {

  // Get the current value from redis.
  const reply = await cache.client.hget(keyfunc(key), field);
  if (reply == null) {
    return null;
  }

  return JSON.parse(reply);
};

/**
 * Writes a single field into the hash stored under the given key and
 * (re)applies the expiry to the whole hash.
 *
 * Note that `undefined` does not serialize to a JSON string, so callers should
 * only pass JSON-serializable values.
 *
 * @param  {String} key     cache key (passed through keyfunc)
 * @param  {String} field   hash field to write, defaults to '__default__'
 * @param  {*}      value   value to serialize as JSON
 * @param  {Number} expiry  value handed to EXPIRE for the hash, defaults to 60
 * @return {Promise}        resolves to the result of the pipeline exec
 */
cache.h.set = async (key, field = '__default__', value, expiry = 60) => {
  const hashKey = keyfunc(key);

  // Serialize the value as JSON.
  const serialized = JSON.stringify(value);

  // HSET and EXPIRE are sent together in one pipeline.
  return cache.client
    .pipeline()
    .hset(hashKey, field, serialized)
    .expire(hashKey, expiry)
    .exec();
};

/**
 * Invalidates either a single field of the hash, or, when no field is given,
 * the whole key via cache.invalidate.
 *
 * @param  {String}      key    cache key
 * @param  {String|null} field  hash field to remove, or null for the whole key
 * @return {Promise}            resolves to the underlying command's result
 */
cache.h.invalidate = async (key, field = null) => {
  if (field === null) {
    return cache.invalidate(key);
  }

  debug(`invalidate: ${keyfunc(key)} ${field}`);

  return cache.client.hdel(keyfunc(key), field);
};

/**
 * Returns the cached value for key/field when present, otherwise runs `work`,
 * returns its result and stores it in the background.
 *
 * A result of null or undefined is returned but never stored: undefined has
 * no JSON representation, and a stored null would read back as a miss anyway.
 *
 * @param  {String}   key     cache key
 * @param  {String}   field   hash field
 * @param  {Number}   expiry  expiry handed to cache.h.set
 * @param  {Function} work    async producer invoked on a miss
 * @return {Promise}          resolves to the cached or freshly produced value
 */
cache.h.wrap = async (key, field, expiry, work) => {
  const cached = await cache.h.get(key, field);
  if (cached !== null) {
    debug('wrap: hit', keyfunc(key), field);
    return cached;
  }

  debug('wrap: miss', keyfunc(key), field);

  const value = await work();

  // Nothing cacheable was produced, so skip the write entirely.
  if (value == null) {
    debug('wrap: skip set, nothing to cache');
    return value;
  }

  // Store the value off the request path; a failed write is only logged so
  // that it can never fail the caller.
  process.nextTick(async () => {
    try {
      await cache.h.set(key, field, value, expiry);
      debug('wrap: set complete');
    } catch (err) {
      console.error(err);
    }
  });

  return value;
};

// incr/decr go through the custom `hincrbyex` command, passing the hash key,
// the field, the delta and the expiry.
// NOTE(review): unlike cache.h.set, no default expiry is applied here; confirm
// that callers always pass one.
cache.h.incr = async (key, field = '__default__', expiry) => cache.client.hincrbyex(keyfunc(key), field, 1, expiry);
cache.h.decr = async (key, field = '__default__', expiry) => cache.client.hincrbyex(keyfunc(key), field, -1, expiry);
-61
View File
@@ -1,61 +0,0 @@
const cache = require('./cache');
const debug = require('debug')('talk:services:hcache');
const kf = (key) => `hcache:${key}`;
const hcache = module.exports = {};
hcache.get = async (key, field = '__default__') => {
// Get the current value from redis.
const reply = await cache.client.hget(kf(key), field);
if (typeof reply !== 'undefined' && reply !== null) {
return JSON.parse(reply);
}
return null;
};
hcache.set = async (key, field = '__default__', value, expiry = 60) => {
// Serialize the value as JSON.
let reply = JSON.stringify(value);
return cache.client
.pipeline()
.hset(kf(key), field, reply)
.expire(kf(key), expiry)
.exec();
};
hcache.del = async (key, field = null) => {
if (field === null) {
return cache.client.del(kf(key));
}
return cache.client.hdel(kf(key), field);
};
hcache.wrap = async (key, field, expiry, work) => {
let value = await hcache.get(key, field);
if (value !== null) {
debug('wrap: hit', kf(key));
return value;
}
debug('wrap: miss', kf(key));
value = await work();
process.nextTick(async () => {
try {
await hcache.set(key, field, value, expiry);
debug('wrap: set complete');
} catch (err) {
console.error(err);
}
});
return value;
};
+3 -3
View File
@@ -1,5 +1,5 @@
const SettingModel = require('../models/setting');
const hcache = require('./hcache');
const cache = require('./cache');
const errors = require('../errors');
const {dotize} = require('./utils');
@@ -35,7 +35,7 @@ module.exports = class SettingsService {
if (process.env.NODE_ENV === 'production') {
// When in production, wrap the settings retrieval with a cache.
const settings = await hcache.wrap('settings', fields, 60, () => retrieve(fields));
const settings = await cache.h.wrap('settings', fields, 60, () => retrieve(fields));
return new SettingModel(settings);
}
@@ -58,7 +58,7 @@ module.exports = class SettingsService {
});
if (process.env.NODE_ENV === 'production') {
await hcache.del('settings');
await cache.h.invalidate('settings');
}
return updatedSettings;