Merge pull request #106 from coralproject/kue-redis-fix

Moved kue redis connector
This commit is contained in:
Wyatt Johnson
2016-11-28 14:14:15 -07:00
committed by GitHub
6 changed files with 55 additions and 39 deletions
+1 -1
View File
@@ -45,7 +45,7 @@ const session_opts = {
},
store: new RedisStore({
ttl: 1800,
client: redis,
client: redis.createClient(),
})
};
+5 -3
View File
@@ -1,6 +1,8 @@
const redis = require('./redis');
const cache = module.exports = {};
const cache = module.exports = {
client: redis.createClient()
};
/**
* This collects a key that may either be an array or a string and creates a
@@ -51,7 +53,7 @@ cache.wrap = (key, expiry, work) => {
* @return {Promise}
*/
cache.get = (key) => new Promise((resolve, reject) => {
redis.get(keyfunc(key), (err, reply) => {
cache.client.get(keyfunc(key), (err, reply) => {
if (err) {
return reject(err);
}
@@ -87,7 +89,7 @@ cache.set = (key, value, expiry) => new Promise((resolve, reject) => {
// Serialize the value as JSON.
let reply = JSON.stringify(value);
redis.set(keyfunc(key), reply, 'EX', expiry, (err) => {
cache.client.set(keyfunc(key), reply, 'EX', expiry, (err) => {
if (err) {
return reject(err);
}
+11
View File
@@ -0,0 +1,11 @@
const kue = require('kue');
const redis = require('./redis');
module.exports = {
queue: kue.createQueue({
redis: {
createClientFactory: () => redis.createClient()
}
}),
kue
};
+33 -29
View File
@@ -2,38 +2,42 @@ const redis = require('redis');
const debug = require('debug')('talk:redis');
const url = process.env.TALK_REDIS_URL || 'redis://localhost';
const client = redis.createClient(url, {
retry_strategy: function(options) {
if (options.error && options.error.code === 'ECONNREFUSED') {
module.exports = {
createClient() {
let client = redis.createClient(url, {
retry_strategy: function(options) {
if (options.error && options.error.code === 'ECONNREFUSED') {
// End reconnecting on a specific error and flush all commands with a individual error
return new Error('The server refused the connection');
}
if (options.total_retry_time > 1000 * 60 * 60) {
// End reconnecting on a specific error and flush all commands with a individual error
return new Error('The server refused the connection');
}
if (options.total_retry_time > 1000 * 60 * 60) {
// End reconnecting after a specific timeout and flush all commands with a individual error
return new Error('Retry time exhausted');
}
// End reconnecting after a specific timeout and flush all commands with a individual error
return new Error('Retry time exhausted');
}
if (options.times_connected > 10) {
if (options.times_connected > 10) {
// End reconnecting with built in error
return undefined;
}
// End reconnecting with built in error
return undefined;
}
// reconnect after
return Math.max(options.attempt * 100, 3000);
// reconnect after
return Math.max(options.attempt * 100, 3000);
}
});
client.ping((err) => {
if (err) {
console.error('Can\'t ping the redis server!');
throw err;
}
debug('connection established');
});
return client;
}
});
client.ping((err) => {
if (err) {
console.error('Can\'t ping the redis server!');
throw err;
}
debug('connection established');
});
module.exports = client;
};
+1 -1
View File
@@ -15,6 +15,6 @@ router.use('/stream', require('./stream'));
router.use('/user', require('./user'));
// Bind the kue handler to the /kue path.
router.use('/kue', authorization.needed('admin'), require('kue').app);
router.use('/kue', authorization.needed('admin'), require('../../kue').kue.app);
module.exports = router;
+4 -5
View File
@@ -1,5 +1,4 @@
const kue = require('kue');
const queue = kue.createQueue();
const kue = require('../kue');
const debug = require('debug')('talk:services:scraper');
const Asset = require('../models/asset');
const JOB_NAME = 'scraper';
@@ -19,7 +18,7 @@ const scraper = {
return new Promise((resolve, reject) => {
debug(`Creating job for Asset[${asset.id}]`);
let job = queue
let job = kue.queue
.create(JOB_NAME, {
title: `Scrape for asset ${asset.id}`,
asset_id: asset.id
@@ -72,7 +71,7 @@ const scraper = {
debug(`Now processing ${JOB_NAME} jobs`);
// Process jobs with the processJob function.
queue.process(JOB_NAME, (job, done) => {
kue.queue.process(JOB_NAME, (job, done) => {
debug(`Starting on Job[${job.id}] for Asset[${job.data.asset_id}]`);
@@ -123,7 +122,7 @@ const scraper = {
// Shutdown and give the queue 5 seconds to shutdown before we start
// killing jobs.
queue.shutdown(5000, (err) => {
kue.queue.shutdown(5000, (err) => {
if (err) {
return reject(err);
}