mirror of
https://github.com/wassname/talk.git
synced 2026-06-28 19:02:05 +08:00
197fbc33e0
* Bump version 4.6.3 * Update index.js
194 lines
5.1 KiB
JavaScript
194 lines
5.1 KiB
JavaScript
const { SubscriptionManager } = require('graphql-subscriptions');
|
|
const { SubscriptionServer } = require('subscriptions-transport-ws');
|
|
const debug = require('debug')('talk:graph:subscriptions');
|
|
const DataLoader = require('dataloader');
|
|
|
|
const { getPubsub } = require('./pubsub');
|
|
const schema = require('../schema');
|
|
const Context = require('../context');
|
|
const plugins = require('../../services/plugins');
|
|
const User = require('../../models/user');
|
|
const { singleJoinBy } = require('../loaders/util');
|
|
|
|
const { deserializeUser } = require('../../services/subscriptions');
|
|
const setupFunctions = require('./setupFunctions');
|
|
|
|
const ms = require('ms');
|
|
const { KEEP_ALIVE } = require('../../config');
|
|
|
|
const { BASE_PATH } = require('../../url');
|
|
|
|
// Collect all the plugin hooks that should be executed onConnect and
|
|
// onDisconnect.
|
|
const hooks = plugins
|
|
.get('server', 'websockets')
|
|
.map(({ plugin, websockets }) => {
|
|
debug(
|
|
`added websocket hooks ${Object.keys(websockets)} from plugin '${
|
|
plugin.name
|
|
}'`
|
|
);
|
|
|
|
return websockets;
|
|
})
|
|
.reduce(
|
|
(hooks, { onConnect = null, onDisconnect = null }) => {
|
|
if (onConnect) {
|
|
hooks.onConnect.push(onConnect);
|
|
}
|
|
|
|
if (onDisconnect) {
|
|
hooks.onDisconnect.push(onDisconnect);
|
|
}
|
|
|
|
return hooks;
|
|
},
|
|
{
|
|
onConnect: [],
|
|
onDisconnect: [],
|
|
}
|
|
);
|
|
|
|
const onConnect = async (connectionParams, connection) => {
|
|
// Attach the token from the connection options if it was provided.
|
|
if (connectionParams.token) {
|
|
debug(
|
|
'token sent via onConnect, attaching to the headers of the upgrade request'
|
|
);
|
|
|
|
// Attach it to the upgrade request.
|
|
connection.upgradeReq.headers['authorization'] = `Bearer ${
|
|
connectionParams.token
|
|
}`;
|
|
}
|
|
|
|
try {
|
|
// Pull the user off of the upgrade request.
|
|
const hydratedRequest = await deserializeUser(connection.upgradeReq);
|
|
|
|
// Update the connections upgrade request, as we'll use that to verify that
|
|
// the user is allowed each operation.
|
|
connection.upgradeReq = hydratedRequest;
|
|
} catch (err) {
|
|
console.error(err);
|
|
}
|
|
|
|
// Call all the hooks.
|
|
await Promise.all(
|
|
hooks.onConnect.map(hook => hook(connectionParams, connection))
|
|
);
|
|
};
|
|
|
|
/**
|
|
* batchedUserRefresher will get users based on ID for websocket user refresh
|
|
* operations to reduce load related to user refreshing.
|
|
*/
|
|
const batchedUserRefresher = new DataLoader(
|
|
userIDs => {
|
|
debug(`OPERATION: refreshing ${userIDs.length} users.`);
|
|
return User.find({ id: { $in: userIDs } }).then(
|
|
singleJoinBy(userIDs, 'id')
|
|
);
|
|
},
|
|
{
|
|
// Disable the cache, as this dataloader is long lived, and the point of
|
|
// using this dataloader is to batch refetch operations rather than caching
|
|
// then as we normally would.
|
|
cache: false,
|
|
}
|
|
);
|
|
|
|
const contextGenerator = req => {
|
|
// Pull the user(?) off the request.
|
|
const { user, jwt } = req;
|
|
|
|
if (!user || !jwt) {
|
|
// There is no valid user on the request, let it continue as is then.
|
|
return async () => new Context(req);
|
|
}
|
|
|
|
// Provide a flag that can be used to short circuit invalid requests.
|
|
let expiredLogin = false;
|
|
|
|
async function refreshUser() {
|
|
// Check to see if this request has been short circuited.
|
|
if (expiredLogin) {
|
|
// It has, let's exit here.
|
|
return null;
|
|
}
|
|
|
|
// Validate that the JWT for this user has not expired.
|
|
const { exp = false } = jwt;
|
|
if (exp && exp < Date.now() / 1000) {
|
|
// Mark that this token has expired, don't bother performing this syscall
|
|
// again to check the time.
|
|
expiredLogin = true;
|
|
return null;
|
|
}
|
|
|
|
try {
|
|
// Let's refresh the user from the database, as they may have changed.
|
|
const refreshedUser = await batchedUserRefresher.load(user.id);
|
|
if (!refreshedUser) {
|
|
return null;
|
|
}
|
|
|
|
return refreshedUser;
|
|
} catch (err) {
|
|
return null;
|
|
}
|
|
}
|
|
|
|
// Return the context builder function that'll use the passed context to
|
|
// generate future contexts.
|
|
return async () => {
|
|
// Refresh the user (potentially null).
|
|
const refreshedUser = await refreshUser();
|
|
|
|
// Attach the refreshedUser to the request.
|
|
req.user = refreshedUser;
|
|
|
|
// Return the new context.
|
|
return new Context(req);
|
|
};
|
|
};
|
|
|
|
const onOperation = async (parsedMessage, baseParams, connection) => {
|
|
// Pull the upgrade request off of the connection.
|
|
const upgradeReq = connection.upgradeReq;
|
|
|
|
// Attach the context handler to the request.
|
|
baseParams.context = contextGenerator(upgradeReq);
|
|
|
|
return baseParams;
|
|
};
|
|
|
|
const onDisconnect = connection =>
|
|
Promise.all(hooks.onDisconnect.map(hook => hook(connection)));
|
|
|
|
/**
|
|
* This creates a new subscription manager.
|
|
*/
|
|
const createSubscriptionManager = server =>
|
|
new SubscriptionServer(
|
|
{
|
|
subscriptionManager: new SubscriptionManager({
|
|
schema,
|
|
pubsub: getPubsub(),
|
|
setupFunctions,
|
|
}),
|
|
onConnect,
|
|
onDisconnect,
|
|
onOperation,
|
|
keepAlive: ms(KEEP_ALIVE),
|
|
},
|
|
{
|
|
server,
|
|
path: `${BASE_PATH}api/v1/live`,
|
|
}
|
|
);
|
|
|
|
module.exports = {
|
|
createSubscriptionManager,
|
|
};
|