Files
talk/graph/subscriptions/index.js
T
Kim Gardner 197fbc33e0 Bump version 4.6.3 (#1871)
* Bump version 4.6.3

* Update index.js
2018-09-17 15:12:30 -06:00

194 lines
5.1 KiB
JavaScript

const { SubscriptionManager } = require('graphql-subscriptions');
const { SubscriptionServer } = require('subscriptions-transport-ws');
const debug = require('debug')('talk:graph:subscriptions');
const DataLoader = require('dataloader');
const { getPubsub } = require('./pubsub');
const schema = require('../schema');
const Context = require('../context');
const plugins = require('../../services/plugins');
const User = require('../../models/user');
const { singleJoinBy } = require('../loaders/util');
const { deserializeUser } = require('../../services/subscriptions');
const setupFunctions = require('./setupFunctions');
const ms = require('ms');
const { KEEP_ALIVE } = require('../../config');
const { BASE_PATH } = require('../../url');
// Collect all the plugin hooks that should be executed onConnect and
// onDisconnect.
const hooks = plugins
.get('server', 'websockets')
.map(({ plugin, websockets }) => {
debug(
`added websocket hooks ${Object.keys(websockets)} from plugin '${
plugin.name
}'`
);
return websockets;
})
.reduce(
(hooks, { onConnect = null, onDisconnect = null }) => {
if (onConnect) {
hooks.onConnect.push(onConnect);
}
if (onDisconnect) {
hooks.onDisconnect.push(onDisconnect);
}
return hooks;
},
{
onConnect: [],
onDisconnect: [],
}
);
const onConnect = async (connectionParams, connection) => {
// Attach the token from the connection options if it was provided.
if (connectionParams.token) {
debug(
'token sent via onConnect, attaching to the headers of the upgrade request'
);
// Attach it to the upgrade request.
connection.upgradeReq.headers['authorization'] = `Bearer ${
connectionParams.token
}`;
}
try {
// Pull the user off of the upgrade request.
const hydratedRequest = await deserializeUser(connection.upgradeReq);
// Update the connections upgrade request, as we'll use that to verify that
// the user is allowed each operation.
connection.upgradeReq = hydratedRequest;
} catch (err) {
console.error(err);
}
// Call all the hooks.
await Promise.all(
hooks.onConnect.map(hook => hook(connectionParams, connection))
);
};
/**
* batchedUserRefresher will get users based on ID for websocket user refresh
* operations to reduce load related to user refreshing.
*/
const batchedUserRefresher = new DataLoader(
userIDs => {
debug(`OPERATION: refreshing ${userIDs.length} users.`);
return User.find({ id: { $in: userIDs } }).then(
singleJoinBy(userIDs, 'id')
);
},
{
// Disable the cache, as this dataloader is long lived, and the point of
// using this dataloader is to batch refetch operations rather than caching
// then as we normally would.
cache: false,
}
);
const contextGenerator = req => {
// Pull the user(?) off the request.
const { user, jwt } = req;
if (!user || !jwt) {
// There is no valid user on the request, let it continue as is then.
return async () => new Context(req);
}
// Provide a flag that can be used to short circuit invalid requests.
let expiredLogin = false;
async function refreshUser() {
// Check to see if this request has been short circuited.
if (expiredLogin) {
// It has, let's exit here.
return null;
}
// Validate that the JWT for this user has not expired.
const { exp = false } = jwt;
if (exp && exp < Date.now() / 1000) {
// Mark that this token has expired, don't bother performing this syscall
// again to check the time.
expiredLogin = true;
return null;
}
try {
// Let's refresh the user from the database, as they may have changed.
const refreshedUser = await batchedUserRefresher.load(user.id);
if (!refreshedUser) {
return null;
}
return refreshedUser;
} catch (err) {
return null;
}
}
// Return the context builder function that'll use the passed context to
// generate future contexts.
return async () => {
// Refresh the user (potentially null).
const refreshedUser = await refreshUser();
// Attach the refreshedUser to the request.
req.user = refreshedUser;
// Return the new context.
return new Context(req);
};
};
const onOperation = async (parsedMessage, baseParams, connection) => {
// Pull the upgrade request off of the connection.
const upgradeReq = connection.upgradeReq;
// Attach the context handler to the request.
baseParams.context = contextGenerator(upgradeReq);
return baseParams;
};
const onDisconnect = connection =>
Promise.all(hooks.onDisconnect.map(hook => hook(connection)));
/**
* This creates a new subscription manager.
*/
const createSubscriptionManager = server =>
new SubscriptionServer(
{
subscriptionManager: new SubscriptionManager({
schema,
pubsub: getPubsub(),
setupFunctions,
}),
onConnect,
onDisconnect,
onOperation,
keepAlive: ms(KEEP_ALIVE),
},
{
server,
path: `${BASE_PATH}api/v1/live`,
}
);
module.exports = {
createSubscriptionManager,
};