Merge pull request #1184 from coralproject/jobs-stream-fix

Jobs + Pending Box Fixes
This commit is contained in:
Kim Gardner
2017-11-30 17:06:56 +00:00
committed by GitHub
3 changed files with 60 additions and 25 deletions
+44 -23
View File
@@ -12,7 +12,7 @@ const mongoose = require('../services/mongoose');
const kue = require('../services/kue');
util.onshutdown([
() => mongoose.disconnect()
() => mongoose.disconnect(),
]);
/**
@@ -20,17 +20,17 @@ util.onshutdown([
*/
function processJobs() {
// Start the scraper processor.
scraper.process();
// Start the mail processor.
mailer.process();
// The scraper only needs to shutdown when the scraper has actually been
// started.
util.onshutdown([
() => kue.Task.shutdown()
]);
// Start the scraper processor.
scraper.process();
// Start the mail processor.
mailer.process();
}
/**
@@ -48,22 +48,13 @@ function removeJob(job) {
}));
}
/**
* Removes the jobs passed in and returns a promise.
* @param {Array} jobs array of jobs
* @return {Promise}
*/
function removeJobs(jobs) {
return Promise.all(jobs.map(removeJob));
}
/**
* Get the top n jobs with a specific state.
* @param {String} [state='complete'] state to list jobs by
* @param {Number} limit limit of jobs to load
* @return {Promise}
*/
function rangeJobsByState(state = 'complete', limit) {
function rangeJobsByState(state, limit) {
return new Promise((resolve, reject) => {
kue.Job.rangeByState(state, 0, limit, 'asc', (err, jobs) => {
if (err) {
@@ -75,22 +66,52 @@ function rangeJobsByState(state = 'complete', limit) {
});
}
async function getJobBatch(n, includeStuck) {
let jobs = [];
jobs = await rangeJobsByState('complete', n);
if (includeStuck) {
jobs = jobs.concat(await rangeJobsByState('failed', n));
}
return jobs;
}
/**
* Cleans up the jobs that are in the queue.
*/
async function cleanupJobs(options) {
// The scraper only needs to shutdown when the scraper has actually been
// started.
util.onshutdown([
() => kue.Task.shutdown()
]);
const n = 100;
try {
const joblists = await Promise.all([
rangeJobsByState('complete', n),
options.stuck ? rangeJobsByState('failed', n) : false
]);
await joblists.filter((jobs) => jobs).map(removeJobs);
// Connect to redis by establishing a queue.
kue.Task.connect();
let jobCount = 0;
let jobs = await getJobBatch(n, options.stuck);
while (jobs.length > 0) {
// Remove all the jobs.
await Promise.all(jobs.map((job) => removeJob(job)));
jobCount += jobs.length;
// Get the next batch of jobs.
jobs = await getJobBatch(n, options.stuck);
}
util.shutdown();
console.log('Removed old jobs');
console.log(`Removed ${jobCount} jobs`);
} catch (err) {
console.error(err);
util.shutdown(1);
@@ -224,14 +224,13 @@ class Stream extends React.Component {
const open = !asset.isClosed;
const banned = user && user.status === 'BANNED';
const pending = user && user.status === 'PENDING';
const temporarilySuspended =
user &&
user.suspension.until &&
new Date(user.suspension.until) > new Date();
const showCommentBox = loggedIn && ((!banned && !pending & !temporarilySuspended && !highlightedComment) || keepCommentBox);
const showCommentBox = loggedIn && ((!banned && !temporarilySuspended && !highlightedComment) || keepCommentBox);
const slotProps = {data};
const slotQueryData = {root, asset};
+15
View File
@@ -21,6 +21,9 @@ const getQueue = () => {
}
});
// Watch for stuck jobs to manage.
queue.watchStuckJobs(1000);
return queue;
};
@@ -47,6 +50,7 @@ class Task {
.attempts(this.attempts)
.delay(this.delay)
.backoff({type: 'exponential'})
.removeOnComplete(true)
.save((err) => {
if (err) {
return reject(err);
@@ -66,6 +70,15 @@ class Task {
return getQueue().process(this.name, callback);
}
/**
* Connect to redis now by getting the queue.
*/
static connect() {
// Force setup the redis connection for kue.
getQueue();
}
/**
* Shutdown running jobs.
*/
@@ -145,3 +158,5 @@ if (process.env.NODE_ENV === 'test') {
module.exports.Task = Task;
}
// Add the job reference to the exported params.
module.exports.Job = kue.Job;