mirror of
https://github.com/wassname/talk.git
synced 2026-06-28 20:41:01 +08:00
135 lines
2.7 KiB
JavaScript
135 lines
2.7 KiB
JavaScript
const debug = require('debug')('talk:services:kue');
|
|
const redis = require('./redis');
|
|
|
|
// Reset the exports object up front; everything public is attached to it
// below (and at the bottom of the file).
module.exports = {};

// Expose the raw kue library alongside our wrappers.
const kue = require('kue');
module.exports.kue = kue;

// Despite its name, kue.createQueue currently hands back a singleton Queue
// instance, so only one Queue object can be configured and used within a
// single node.js process. Redis clients for it are produced by our own redis
// service.
const Queue = kue.createQueue({
  redis: {
    createClientFactory() {
      return redis.createClient();
    }
  }
});
module.exports.queue = Queue;
|
|
|
|
/**
 * Task is a named job type on the shared Queue. It can enqueue new jobs,
 * register a processor for them, and shut the Queue down.
 */
class Task {

  /**
   * @param {Object} options
   * @param {String} options.name           job type used when creating and
   *                                        processing jobs on the Queue
   * @param {Number} [options.attempts=3]   value handed to the job's
   *                                        `attempts()` setting
   * @param {Number} [options.delay=1000]   value handed to the job's `delay()`
   *                                        setting (presumably milliseconds)
   */
  constructor({name, attempts = 3, delay = 1000}) {
    this.name = name;
    this.attempts = attempts;
    this.delay = delay;
  }

  /**
   * Add a new job to the queue.
   *
   * @param  {Object} data  payload stored with the job
   * @return {Promise}      resolves with the saved job, rejects with the error
   *                        reported by the save callback
   */
  create(data) {

    debug(`Creating new job for Queue[${this.name}]`);

    return new Promise((resolve, reject) => {

      // Configure the job first and save it in a separate statement. This
      // guarantees `job` is already bound when the save callback fires (even
      // if the callback is invoked synchronously) and does not depend on
      // what `save` itself returns.
      const job = Queue
        .create(this.name, data)
        .attempts(this.attempts)
        .delay(this.delay)
        .backoff({type: 'exponential'});

      job.save((err) => {
        if (err) {
          return reject(err);
        }

        debug(`Job[${job.id}] created on Queue[${this.name}]`);

        return resolve(job);
      });
    });
  }

  /**
   * Process jobs for the queue.
   *
   * @param  {Function} callback  handler registered for jobs of this task's
   *                              name
   * @return {*}                  whatever `Queue.process` returns
   */
  process(callback) {
    return Queue.process(this.name, callback);
  }

  /**
   * Shutdown running jobs.
   *
   * @return {Promise} resolves once the Queue reports it has shut down,
   *                   rejects with the error passed to the shutdown callback
   */
  static shutdown() {

    debug('Shutting down the Queue');

    return new Promise((resolve, reject) => {

      // Shutdown and give the queue 5 seconds to shutdown before we start
      // killing jobs.
      Queue.shutdown(5000, (err) => {
        if (err) {
          return reject(err);
        }

        debug('Queue shut down.');

        resolve();
      });
    });
  }
}
|
|
|
|
/**
 * In-memory store holding every task created while testing.
 * @type {Array}
 */
const TestQueue = [];

/**
 * TestTask stands in for Task when the application runs in test mode. Rather
 * than sending jobs to redis, it appends them to the TestQueue array so they
 * can be inspected afterwards.
 */
class TestTask {

  /**
   * @param {Object} options
   * @param {String} options.name  name recorded on every task this instance
   *                               queues
   */
  constructor({name}) {
    this.name = name;
  }

  /**
   * Push the task into the fake queue.
   *
   * @param  {*} task   payload to record
   * @return {Promise}  resolves with `{id}`, where id is the value returned by
   *                    the array push (the queue's new length)
   */
  create(task) {
    const entry = {name: this.name, task};
    const id = TestQueue.push(entry);

    return Promise.resolve({id});
  }

  // This is a NO-OP action simply provided to match the Task interface.
  process() {
    return null;
  }

  /**
   * Returns the current tasks for this queue.
   * @return {Array} the tasks in the queue
   */
  get tasks() {
    const mine = [];

    for (const entry of TestQueue) {
      if (entry.name === this.name) {
        mine.push(entry.task);
      }
    }

    return mine;
  }

  // Delegates to the real Task so the underlying Queue is shut down.
  static shutdown() {
    return Task.shutdown();
  }

}
|
|
|
|
// Choose which Task implementation to export. Outside of test mode the real,
// queue-backed Task is used; in test mode the in-memory TestTask is exported
// together with the TestQueue array backing it.
if (process.env.NODE_ENV !== 'test') {
  module.exports.Task = Task;
} else {
  Object.assign(module.exports, {Task: TestTask, TestQueue});
}
|