From 616dd534d9cb1f4a7439f7b83c20780c307afd36 Mon Sep 17 00:00:00 2001 From: Wyatt Johnson Date: Mon, 28 Nov 2016 14:05:06 -0700 Subject: [PATCH] Moved kue redis connector --- app.js | 2 +- cache.js | 8 +++--- kue.js | 11 ++++++++ redis.js | 62 ++++++++++++++++++++++++--------------------- routes/api/index.js | 2 +- services/scraper.js | 9 +++---- 6 files changed, 55 insertions(+), 39 deletions(-) create mode 100644 kue.js diff --git a/app.js b/app.js index 8008cb01e..e84eb4b79 100644 --- a/app.js +++ b/app.js @@ -45,7 +45,7 @@ const session_opts = { }, store: new RedisStore({ ttl: 1800, - client: redis, + client: redis.createClient(), }) }; diff --git a/cache.js b/cache.js index efe689f9c..9345f8cde 100644 --- a/cache.js +++ b/cache.js @@ -1,6 +1,8 @@ const redis = require('./redis'); -const cache = module.exports = {}; +const cache = module.exports = { + client: redis.createClient() +}; /** * This collects a key that may either be an array or a string and creates a @@ -51,7 +53,7 @@ cache.wrap = (key, expiry, work) => { * @return {Promise} */ cache.get = (key) => new Promise((resolve, reject) => { - redis.get(keyfunc(key), (err, reply) => { + cache.client.get(keyfunc(key), (err, reply) => { if (err) { return reject(err); } @@ -87,7 +89,7 @@ cache.set = (key, value, expiry) => new Promise((resolve, reject) => { // Serialize the value as JSON. let reply = JSON.stringify(value); - redis.set(keyfunc(key), reply, 'EX', expiry, (err) => { + cache.client.set(keyfunc(key), reply, 'EX', expiry, (err) => { if (err) { return reject(err); } diff --git a/kue.js b/kue.js new file mode 100644 index 000000000..e2229d424 --- /dev/null +++ b/kue.js @@ -0,0 +1,11 @@ +const kue = require('kue'); +const redis = require('./redis'); + +module.exports = { + queue: kue.createQueue({ + redis: { + createClientFactory: () => redis.createClient() + } + }), + kue +}; diff --git a/redis.js b/redis.js index c37fcc64e..9f67c34bb 100644 --- a/redis.js +++ b/redis.js @@ -2,38 +2,42 @@ const redis = require('redis'); const debug = require('debug')('talk:redis'); const url = process.env.TALK_REDIS_URL || 'redis://localhost'; -const client = redis.createClient(url, { - retry_strategy: function(options) { - if (options.error && options.error.code === 'ECONNREFUSED') { +module.exports = { + createClient() { + let client = redis.createClient(url, { + retry_strategy: function(options) { + if (options.error && options.error.code === 'ECONNREFUSED') { - // End reconnecting on a specific error and flush all commands with a individual error - return new Error('The server refused the connection'); - } - if (options.total_retry_time > 1000 * 60 * 60) { + // End reconnecting on a specific error and flush all commands with a individual error + return new Error('The server refused the connection'); + } + if (options.total_retry_time > 1000 * 60 * 60) { - // End reconnecting after a specific timeout and flush all commands with a individual error - return new Error('Retry time exhausted'); - } + // End reconnecting after a specific timeout and flush all commands with a individual error + return new Error('Retry time exhausted'); + } - if (options.times_connected > 10) { + if (options.times_connected > 10) { - // End reconnecting with built in error - return undefined; - } + // End reconnecting with built in error + return undefined; + } - // reconnect after - return Math.max(options.attempt * 100, 3000); + // reconnect after + return Math.max(options.attempt * 100, 3000); + } + }); + + client.ping((err) => { + if (err) { + console.error('Can\'t ping the redis server!'); + + throw err; + } + + debug('connection established'); + }); + + return client; } -}); - -client.ping((err) => { - if (err) { - console.error('Can\'t ping the redis server!'); - - throw err; - } - - debug('connection established'); -}); - -module.exports = client; +}; diff --git a/routes/api/index.js b/routes/api/index.js index 8da3f791b..9b4f0432b 100644 --- a/routes/api/index.js +++ b/routes/api/index.js @@ -15,6 +15,6 @@ router.use('/stream', require('./stream')); router.use('/user', require('./user')); // Bind the kue handler to the /kue path. -router.use('/kue', authorization.needed('admin'), require('kue').app); +router.use('/kue', authorization.needed('admin'), require('../../kue').kue.app); module.exports = router; diff --git a/services/scraper.js b/services/scraper.js index 665d4fc66..922ef77bc 100644 --- a/services/scraper.js +++ b/services/scraper.js @@ -1,5 +1,4 @@ -const kue = require('kue'); -const queue = kue.createQueue(); +const kue = require('../kue'); const debug = require('debug')('talk:services:scraper'); const Asset = require('../models/asset'); const JOB_NAME = 'scraper'; @@ -19,7 +18,7 @@ const scraper = { return new Promise((resolve, reject) => { debug(`Creating job for Asset[${asset.id}]`); - let job = queue + let job = kue.queue .create(JOB_NAME, { title: `Scrape for asset ${asset.id}`, asset_id: asset.id @@ -72,7 +71,7 @@ const scraper = { debug(`Now processing ${JOB_NAME} jobs`); // Process jobs with the processJob function. - queue.process(JOB_NAME, (job, done) => { + kue.queue.process(JOB_NAME, (job, done) => { debug(`Starting on Job[${job.id}] for Asset[${job.data.asset_id}]`); @@ -123,7 +122,7 @@ const scraper = { // Shutdown and give the queue 5 seconds to shutdown before we start // killing jobs. - queue.shutdown(5000, (err) => { + kue.queue.shutdown(5000, (err) => { if (err) { return reject(err); }