diff --git a/graph/subscriptions.js b/graph/subscriptions.js index 59dd6e923..60afc9c45 100644 --- a/graph/subscriptions.js +++ b/graph/subscriptions.js @@ -5,6 +5,7 @@ const debug = require('debug')('talk:graph:subscriptions'); const pubsub = require('../services/pubsub'); const schema = require('./schema'); const Context = require('./context'); +const plugins = require('../services/plugins'); const {deserializeUser} = require('../services/subscriptions'); const setupFunctions = require('./setupFunctions'); @@ -16,16 +17,42 @@ const { const {BASE_PATH} = require('../url'); -const onConnect = ({token}, connection) => { +// Collect all the plugin hooks that should be executed onConnect and +// onDisconnect. +const hooks = plugins.get('server', 'websockets') + .map(({plugin, websockets}) => { + debug(`added websocket hooks ${Object.keys(websockets)} from plugin '${plugin.name}'`); + + return websockets; + }) + .reduce((hooks, {onConnect = null, onDisconnect = null}) => { + if (onConnect) { + hooks.onConnect.push(onConnect); + } + + if (onDisconnect) { + hooks.onDisconnect.push(onDisconnect); + } + + return hooks; + }, { + onConnect: [], + onDisconnect: [], + }); + +const onConnect = async (connectionParams, connection) => { // Attach the token from the connection options if it was provided. - if (token) { + if (connectionParams.token) { debug('token sent via onConnect, attaching to the headers of the upgrade request'); // Attach it to the upgrade request. - connection.upgradeReq.headers['authorization'] = `Bearer ${token}`; + connection.upgradeReq.headers['authorization'] = `Bearer ${connectionParams.token}`; } + + // Call all the hooks. + await Promise.all(hooks.onConnect.map((hook) => hook(connectionParams, connection))); }; const onOperation = (parsedMessage, baseParams, connection) => { @@ -52,6 +79,9 @@ const onOperation = (parsedMessage, baseParams, connection) => { return baseParams; }; +const onDisconnect = (connection) => + Promise.all(hooks.onDisconnect.map((hook) => hook(connection))); + /** * This creates a new subscription manager. */ @@ -62,6 +92,7 @@ const createSubscriptionManager = (server) => new SubscriptionServer({ setupFunctions, }), onConnect, + onDisconnect, onOperation, keepAlive: ms(KEEP_ALIVE) }, { diff --git a/plugins.js b/plugins.js index 487825be7..577289b25 100644 --- a/plugins.js +++ b/plugins.js @@ -57,6 +57,10 @@ const hookSchemas = { resolvers: Joi.object().pattern(/\w/, Joi.object().pattern(/(?:__resolveType|\w+)/, Joi.func())), typeDefs: Joi.string(), schemaLevelResolveFunction: Joi.func(), + websockets: Joi.object({ + onConnect: Joi.func(), + onDisconnect: Joi.func(), + }), }; /**