mirror of
https://github.com/wassname/talk.git
synced 2026-07-02 23:22:24 +08:00
Merge pull request #805 from coralproject/redis-fixes
Fixed bug with Kue
This commit is contained in:
@@ -96,12 +96,14 @@ app.use(passport.initialize());
|
||||
// (if present) the JWT on the request.
|
||||
app.use('/api', authentication);
|
||||
|
||||
const pubsubClient = pubsub.createClientFactory();
|
||||
|
||||
// To handle dependancy injection safer, we inject the pubsub handle onto the
|
||||
// request object.
|
||||
app.use('/api', (req, res, next) => {
|
||||
|
||||
// Attach the pubsub handle to the requests.
|
||||
req.pubsub = pubsub.createClient();
|
||||
req.pubsub = pubsubClient();
|
||||
|
||||
// Forward on the request.
|
||||
next();
|
||||
|
||||
@@ -127,13 +127,15 @@ const setupFunctions = plugins.get('server', 'setupFunctions').reduce((acc, {plu
|
||||
}),
|
||||
});
|
||||
|
||||
const pubsubClient = pubsub.createClientFactory();
|
||||
|
||||
/**
|
||||
* This creates a new subscription manager.
|
||||
*/
|
||||
const createSubscriptionManager = (server) => new SubscriptionServer({
|
||||
subscriptionManager: new SubscriptionManager({
|
||||
schema,
|
||||
pubsub: pubsub.createClient(),
|
||||
pubsub: pubsubClient(),
|
||||
setupFunctions,
|
||||
}),
|
||||
onConnect: ({token}, connection) => {
|
||||
|
||||
@@ -1,9 +1,6 @@
|
||||
const express = require('express');
|
||||
const authorization = require('../../middleware/authorization');
|
||||
const pkg = require('../../package.json');
|
||||
const {
|
||||
WEBPACK
|
||||
} = require('../../config');
|
||||
|
||||
const router = express.Router();
|
||||
|
||||
@@ -18,12 +15,4 @@ router.use('/users', require('./users'));
|
||||
router.use('/account', require('./account'));
|
||||
router.use('/setup', require('./setup'));
|
||||
|
||||
// Enable the kue app only if we aren't in webpack mode.
|
||||
if (!WEBPACK) {
|
||||
|
||||
// Bind the kue handler to the /kue path.
|
||||
router.use('/kue', authorization.needed('ADMIN'), require('../../services/kue').kue.app);
|
||||
|
||||
}
|
||||
|
||||
module.exports = router;
|
||||
|
||||
+2
-2
@@ -34,7 +34,7 @@ if (JWT_SECRETS) {
|
||||
return new jwt.AsymmetricSecret(secret, JWT_ALG);
|
||||
}));
|
||||
|
||||
debug(`loaded ${JWT_SECRET.length} secrets`);
|
||||
debug(`loaded ${JWT_SECRET.length} ${JWT_ALG.startsWith('HS') ? 'shared' : 'asymmetric'} secrets`);
|
||||
} else if (JWT_SECRET) {
|
||||
if (JWT_ALG.startsWith('HS')) {
|
||||
module.exports.jwt = new jwt.SharedSecret({
|
||||
@@ -44,5 +44,5 @@ if (JWT_SECRETS) {
|
||||
module.exports.jwt = new jwt.AsymmetricSecret(JSON.parse(JWT_SECRET), JWT_ALG);
|
||||
}
|
||||
|
||||
debug('loaded 1 secret');
|
||||
debug(`loaded a ${JWT_ALG.startsWith('HS') ? 'shared' : 'asymmetric'} secret`);
|
||||
}
|
||||
|
||||
+19
-28
@@ -3,12 +3,26 @@ const redis = require('./redis');
|
||||
|
||||
module.exports = {};
|
||||
|
||||
const kue = module.exports.kue = require('kue');
|
||||
const kue = require('kue');
|
||||
|
||||
// Note that unlike what the name createQueue suggests, it currently returns a
|
||||
// singleton Queue instance. So you can configure and use only a single Queue
|
||||
// object within your node.js process.
|
||||
let Queue = module.exports.queue = null;
|
||||
let queue = null;
|
||||
const getQueue = () => {
|
||||
if (queue) {
|
||||
return queue;
|
||||
}
|
||||
|
||||
debug('init the queue');
|
||||
queue = kue.createQueue({
|
||||
redis: {
|
||||
createClientFactory: () => redis.createClient()
|
||||
}
|
||||
});
|
||||
|
||||
return queue;
|
||||
};
|
||||
|
||||
class Task {
|
||||
|
||||
@@ -18,14 +32,6 @@ class Task {
|
||||
this.name = name;
|
||||
this.attempts = attempts;
|
||||
this.delay = delay;
|
||||
|
||||
if (!Queue) {
|
||||
module.exports.queue = Queue = kue.createQueue({
|
||||
redis: {
|
||||
createClientFactory: redis.createClientFactory()
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -36,7 +42,7 @@ class Task {
|
||||
debug(`Creating new job for Queue[${this.name}]`);
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
let job = Queue
|
||||
let job = getQueue()
|
||||
.create(this.name, data)
|
||||
.attempts(this.attempts)
|
||||
.delay(this.delay)
|
||||
@@ -57,7 +63,7 @@ class Task {
|
||||
* Process jobs for the queue.
|
||||
*/
|
||||
process(callback) {
|
||||
return Queue.process(this.name, callback);
|
||||
return getQueue().process(this.name, callback);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -71,7 +77,7 @@ class Task {
|
||||
|
||||
// Shutdown and give the queue 5 seconds to shutdown before we start
|
||||
// killing jobs.
|
||||
Queue.shutdown(5000, (err) => {
|
||||
getQueue().shutdown(5000, (err) => {
|
||||
if (err) {
|
||||
return reject(err);
|
||||
}
|
||||
@@ -139,18 +145,3 @@ if (process.env.NODE_ENV === 'test') {
|
||||
module.exports.Task = Task;
|
||||
}
|
||||
|
||||
module.exports.createTaskFactory = () => {
|
||||
let taskInstance = null;
|
||||
|
||||
return (options) => {
|
||||
if (taskInstance) {
|
||||
return taskInstance;
|
||||
}
|
||||
|
||||
options = Object.assign({}, options);
|
||||
|
||||
taskInstance = new module.exports.Task(options);
|
||||
|
||||
return taskInstance;
|
||||
};
|
||||
};
|
||||
|
||||
+3
-6
@@ -1,7 +1,6 @@
|
||||
const debug = require('debug')('talk:services:mailer');
|
||||
const nodemailer = require('nodemailer');
|
||||
const kue = require('./kue');
|
||||
const taskFactory = kue.createTaskFactory();
|
||||
const path = require('path');
|
||||
const fs = require('fs');
|
||||
const _ = require('lodash');
|
||||
@@ -76,11 +75,9 @@ const mailer = module.exports = {
|
||||
/**
|
||||
* Create the new Task kue.
|
||||
*/
|
||||
get task() {
|
||||
return taskFactory({
|
||||
name: 'mailer'
|
||||
});
|
||||
},
|
||||
task: new kue.Task({
|
||||
name: 'mailer'
|
||||
}),
|
||||
|
||||
sendSimple({template, locals, to, subject}) {
|
||||
|
||||
|
||||
+15
-8
@@ -2,15 +2,22 @@ const {RedisPubSub} = require('graphql-redis-subscriptions');
|
||||
|
||||
const {connectionOptions} = require('./redis');
|
||||
|
||||
let pubsubInstance = null;
|
||||
module.exports = {
|
||||
createClient: () => {
|
||||
if (pubsubInstance) {
|
||||
return pubsubInstance;
|
||||
const createClient = () => new RedisPubSub({connection: connectionOptions});
|
||||
|
||||
const createClientFactory = () => {
|
||||
let ins = null;
|
||||
return () => {
|
||||
if (ins) {
|
||||
return ins;
|
||||
}
|
||||
|
||||
pubsubInstance = new RedisPubSub({connection: connectionOptions});
|
||||
ins = createClient();
|
||||
|
||||
return pubsubInstance;
|
||||
}
|
||||
return ins;
|
||||
};
|
||||
};
|
||||
|
||||
module.exports = {
|
||||
createClient,
|
||||
createClientFactory
|
||||
};
|
||||
|
||||
+3
-6
@@ -1,5 +1,4 @@
|
||||
const kue = require('./kue');
|
||||
const taskFactory = kue.createTaskFactory();
|
||||
const debug = require('debug')('talk:services:scraper');
|
||||
const AssetModel = require('../models/asset');
|
||||
const AssetsService = require('./assets');
|
||||
@@ -15,11 +14,9 @@ const scraper = {
|
||||
/**
|
||||
* Create the new Task kue singleton.
|
||||
*/
|
||||
get task() {
|
||||
return taskFactory({
|
||||
name: 'scraper'
|
||||
});
|
||||
},
|
||||
task: new kue.Task({
|
||||
name: 'scraper'
|
||||
}),
|
||||
|
||||
/**
|
||||
* Creates a new scraper job and scrapes the url when it gets processed.
|
||||
|
||||
Reference in New Issue
Block a user