From a54a240ae61a3fc205dc5fb2a9075cee35b72741 Mon Sep 17 00:00:00 2001 From: Wyatt Johnson Date: Thu, 27 Jul 2017 15:14:11 +1000 Subject: [PATCH] fixed bug with job init --- app.js | 4 +++- graph/subscriptions.js | 4 +++- routes/api/index.js | 11 ---------- secrets.js | 4 ++-- services/kue.js | 47 +++++++++++++++++------------------------- services/mailer.js | 9 +++----- services/pubsub.js | 23 ++++++++++++++------- services/scraper.js | 9 +++----- 8 files changed, 48 insertions(+), 63 deletions(-) diff --git a/app.js b/app.js index a6b669b0e..108ad2e11 100644 --- a/app.js +++ b/app.js @@ -96,12 +96,14 @@ app.use(passport.initialize()); // (if present) the JWT on the request. app.use('/api', authentication); +const pubsubClient = pubsub.createClientFactory(); + // To handle dependancy injection safer, we inject the pubsub handle onto the // request object. app.use('/api', (req, res, next) => { // Attach the pubsub handle to the requests. - req.pubsub = pubsub.createClient(); + req.pubsub = pubsubClient(); // Forward on the request. next(); diff --git a/graph/subscriptions.js b/graph/subscriptions.js index b8b4f2c0f..adac849e0 100644 --- a/graph/subscriptions.js +++ b/graph/subscriptions.js @@ -127,13 +127,15 @@ const setupFunctions = plugins.get('server', 'setupFunctions').reduce((acc, {plu }), }); +const pubsubClient = pubsub.createClientFactory(); + /** * This creates a new subscription manager. */ const createSubscriptionManager = (server) => new SubscriptionServer({ subscriptionManager: new SubscriptionManager({ schema, - pubsub: pubsub.createClient(), + pubsub: pubsubClient(), setupFunctions, }), onConnect: ({token}, connection) => { diff --git a/routes/api/index.js b/routes/api/index.js index a3bc552f6..eccd8d435 100644 --- a/routes/api/index.js +++ b/routes/api/index.js @@ -1,9 +1,6 @@ const express = require('express'); const authorization = require('../../middleware/authorization'); const pkg = require('../../package.json'); -const { - WEBPACK -} = require('../../config'); const router = express.Router(); @@ -18,12 +15,4 @@ router.use('/users', require('./users')); router.use('/account', require('./account')); router.use('/setup', require('./setup')); -// Enable the kue app only if we aren't in webpack mode. -if (!WEBPACK) { - - // Bind the kue handler to the /kue path. - router.use('/kue', authorization.needed('ADMIN'), require('../../services/kue').kue.app); - -} - module.exports = router; diff --git a/secrets.js b/secrets.js index ee484a420..14b5fc922 100644 --- a/secrets.js +++ b/secrets.js @@ -34,7 +34,7 @@ if (JWT_SECRETS) { return new jwt.AsymmetricSecret(secret, JWT_ALG); })); - debug(`loaded ${JWT_SECRET.length} secrets`); + debug(`loaded ${JWT_SECRET.length} ${JWT_ALG.startsWith('HS') ? 'shared' : 'asymmetric'} secrets`); } else if (JWT_SECRET) { if (JWT_ALG.startsWith('HS')) { module.exports.jwt = new jwt.SharedSecret({ @@ -44,5 +44,5 @@ if (JWT_SECRETS) { module.exports.jwt = new jwt.AsymmetricSecret(JSON.parse(JWT_SECRET), JWT_ALG); } - debug('loaded 1 secret'); + debug(`loaded a ${JWT_ALG.startsWith('HS') ? 'shared' : 'asymmetric'} secret`); } diff --git a/services/kue.js b/services/kue.js index 1b3fb605a..0fa610faa 100644 --- a/services/kue.js +++ b/services/kue.js @@ -3,12 +3,26 @@ const redis = require('./redis'); module.exports = {}; -const kue = module.exports.kue = require('kue'); +const kue = require('kue'); // Note that unlike what the name createQueue suggests, it currently returns a // singleton Queue instance. So you can configure and use only a single Queue // object within your node.js process. -let Queue = module.exports.queue = null; +let queue = null; +const getQueue = () => { + if (queue) { + return queue; + } + + debug('init the queue'); + queue = kue.createQueue({ + redis: { + createClientFactory: () => redis.createClient() + } + }); + + return queue; +}; class Task { @@ -18,14 +32,6 @@ class Task { this.name = name; this.attempts = attempts; this.delay = delay; - - if (!Queue) { - module.exports.queue = Queue = kue.createQueue({ - redis: { - createClientFactory: redis.createClientFactory() - } - }); - } } /** @@ -36,7 +42,7 @@ class Task { debug(`Creating new job for Queue[${this.name}]`); return new Promise((resolve, reject) => { - let job = Queue + let job = getQueue() .create(this.name, data) .attempts(this.attempts) .delay(this.delay) @@ -57,7 +63,7 @@ class Task { * Process jobs for the queue. */ process(callback) { - return Queue.process(this.name, callback); + return getQueue().process(this.name, callback); } /** @@ -71,7 +77,7 @@ class Task { // Shutdown and give the queue 5 seconds to shutdown before we start // killing jobs. - Queue.shutdown(5000, (err) => { + getQueue().shutdown(5000, (err) => { if (err) { return reject(err); } @@ -139,18 +145,3 @@ if (process.env.NODE_ENV === 'test') { module.exports.Task = Task; } -module.exports.createTaskFactory = () => { - let taskInstance = null; - - return (options) => { - if (taskInstance) { - return taskInstance; - } - - options = Object.assign({}, options); - - taskInstance = new module.exports.Task(options); - - return taskInstance; - }; -}; diff --git a/services/mailer.js b/services/mailer.js index 7212979df..27672a35c 100644 --- a/services/mailer.js +++ b/services/mailer.js @@ -1,7 +1,6 @@ const debug = require('debug')('talk:services:mailer'); const nodemailer = require('nodemailer'); const kue = require('./kue'); -const taskFactory = kue.createTaskFactory(); const path = require('path'); const fs = require('fs'); const _ = require('lodash'); @@ -76,11 +75,9 @@ const mailer = module.exports = { /** * Create the new Task kue. */ - get task() { - return taskFactory({ - name: 'mailer' - }); - }, + task: new kue.Task({ + name: 'mailer' + }), sendSimple({template, locals, to, subject}) { diff --git a/services/pubsub.js b/services/pubsub.js index 56e0f3e4d..780aeba16 100644 --- a/services/pubsub.js +++ b/services/pubsub.js @@ -2,15 +2,22 @@ const {RedisPubSub} = require('graphql-redis-subscriptions'); const {connectionOptions} = require('./redis'); -let pubsubInstance = null; -module.exports = { - createClient: () => { - if (pubsubInstance) { - return pubsubInstance; +const createClient = () => new RedisPubSub({connection: connectionOptions}); + +const createClientFactory = () => { + let ins = null; + return () => { + if (ins) { + return ins; } - pubsubInstance = new RedisPubSub({connection: connectionOptions}); + ins = createClient(); - return pubsubInstance; - } + return ins; + }; +}; + +module.exports = { + createClient, + createClientFactory }; diff --git a/services/scraper.js b/services/scraper.js index 40aac29ad..3d9a2c894 100644 --- a/services/scraper.js +++ b/services/scraper.js @@ -1,5 +1,4 @@ const kue = require('./kue'); -const taskFactory = kue.createTaskFactory(); const debug = require('debug')('talk:services:scraper'); const AssetModel = require('../models/asset'); const AssetsService = require('./assets'); @@ -15,11 +14,9 @@ const scraper = { /** * Create the new Task kue singleton. */ - get task() { - return taskFactory({ - name: 'scraper' - }); - }, + task: new kue.Task({ + name: 'scraper' + }), /** * Creates a new scraper job and scrapes the url when it gets processed.