mirror of
https://github.com/wassname/talk.git
synced 2026-07-02 08:30:46 +08:00
added subscription hooks
This commit is contained in:
+34
-3
@@ -5,6 +5,7 @@ const debug = require('debug')('talk:graph:subscriptions');
|
||||
const pubsub = require('../services/pubsub');
|
||||
const schema = require('./schema');
|
||||
const Context = require('./context');
|
||||
const plugins = require('../services/plugins');
|
||||
|
||||
const {deserializeUser} = require('../services/subscriptions');
|
||||
const setupFunctions = require('./setupFunctions');
|
||||
@@ -16,16 +17,42 @@ const {
|
||||
|
||||
const {BASE_PATH} = require('../url');
|
||||
|
||||
const onConnect = ({token}, connection) => {
|
||||
// Collect all the plugin hooks that should be executed onConnect and
|
||||
// onDisconnect.
|
||||
const hooks = plugins.get('server', 'websockets')
|
||||
.map(({plugin, websockets}) => {
|
||||
debug(`added websocket hooks ${Object.keys(websockets)} from plugin '${plugin.name}'`);
|
||||
|
||||
return websockets;
|
||||
})
|
||||
.reduce((hooks, {onConnect = null, onDisconnect = null}) => {
|
||||
if (onConnect) {
|
||||
hooks.onConnect.push(onConnect);
|
||||
}
|
||||
|
||||
if (onDisconnect) {
|
||||
hooks.onDisconnect.push(onDisconnect);
|
||||
}
|
||||
|
||||
return hooks;
|
||||
}, {
|
||||
onConnect: [],
|
||||
onDisconnect: [],
|
||||
});
|
||||
|
||||
const onConnect = async (connectionParams, connection) => {
|
||||
|
||||
// Attach the token from the connection options if it was provided.
|
||||
if (token) {
|
||||
if (connectionParams.token) {
|
||||
|
||||
debug('token sent via onConnect, attaching to the headers of the upgrade request');
|
||||
|
||||
// Attach it to the upgrade request.
|
||||
connection.upgradeReq.headers['authorization'] = `Bearer ${token}`;
|
||||
connection.upgradeReq.headers['authorization'] = `Bearer ${connectionParams.token}`;
|
||||
}
|
||||
|
||||
// Call all the hooks.
|
||||
await Promise.all(hooks.onConnect.map((hook) => hook(connectionParams, connection)));
|
||||
};
|
||||
|
||||
const onOperation = (parsedMessage, baseParams, connection) => {
|
||||
@@ -52,6 +79,9 @@ const onOperation = (parsedMessage, baseParams, connection) => {
|
||||
return baseParams;
|
||||
};
|
||||
|
||||
const onDisconnect = (connection) =>
|
||||
Promise.all(hooks.onDisconnect.map((hook) => hook(connection)));
|
||||
|
||||
/**
|
||||
* This creates a new subscription manager.
|
||||
*/
|
||||
@@ -62,6 +92,7 @@ const createSubscriptionManager = (server) => new SubscriptionServer({
|
||||
setupFunctions,
|
||||
}),
|
||||
onConnect,
|
||||
onDisconnect,
|
||||
onOperation,
|
||||
keepAlive: ms(KEEP_ALIVE)
|
||||
}, {
|
||||
|
||||
@@ -57,6 +57,10 @@ const hookSchemas = {
|
||||
resolvers: Joi.object().pattern(/\w/, Joi.object().pattern(/(?:__resolveType|\w+)/, Joi.func())),
|
||||
typeDefs: Joi.string(),
|
||||
schemaLevelResolveFunction: Joi.func(),
|
||||
websockets: Joi.object({
|
||||
onConnect: Joi.func(),
|
||||
onDisconnect: Joi.func(),
|
||||
}),
|
||||
};
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user