From 1b9c2404db489a1512c0fc2c6582649abb6cbbdf Mon Sep 17 00:00:00 2001 From: Wyatt Johnson Date: Wed, 29 Nov 2017 15:57:28 -0700 Subject: [PATCH] fixed job cli, reverted comment box pending beheviour --- bin/cli-jobs | 67 ++++++++++++------- .../src/tabs/stream/components/Stream.js | 3 +- services/kue.js | 15 +++++ 3 files changed, 60 insertions(+), 25 deletions(-) diff --git a/bin/cli-jobs b/bin/cli-jobs index c5aefc518..6048a69db 100755 --- a/bin/cli-jobs +++ b/bin/cli-jobs @@ -12,7 +12,7 @@ const mongoose = require('../services/mongoose'); const kue = require('../services/kue'); util.onshutdown([ - () => mongoose.disconnect() + () => mongoose.disconnect(), ]); /** @@ -20,17 +20,17 @@ util.onshutdown([ */ function processJobs() { - // Start the scraper processor. - scraper.process(); - - // Start the mail processor. - mailer.process(); - // The scraper only needs to shutdown when the scraper has actually been // started. util.onshutdown([ () => kue.Task.shutdown() ]); + + // Start the scraper processor. + scraper.process(); + + // Start the mail processor. + mailer.process(); } /** @@ -48,22 +48,13 @@ function removeJob(job) { })); } -/** - * Removes the jobs passed in and returns a promise. - * @param {Array} jobs array of jobs - * @return {Promise} - */ -function removeJobs(jobs) { - return Promise.all(jobs.map(removeJob)); -} - /** * Get the top n jobs with a specific state. * @param {String} [state='complete'] state to list jobs by * @param {Number} limit limit of jobs to load * @return {Promise} */ -function rangeJobsByState(state = 'complete', limit) { +function rangeJobsByState(state, limit) { return new Promise((resolve, reject) => { kue.Job.rangeByState(state, 0, limit, 'asc', (err, jobs) => { if (err) { @@ -75,22 +66,52 @@ function rangeJobsByState(state = 'complete', limit) { }); } +async function getJobBatch(n, includeStuck) { + let jobs = []; + + jobs = await rangeJobsByState('complete', n); + + if (includeStuck) { + jobs = jobs.concat(await rangeJobsByState('failed', n)); + } + + return jobs; +} + /** * Cleans up the jobs that are in the queue. */ async function cleanupJobs(options) { + + // The scraper only needs to shutdown when the scraper has actually been + // started. + util.onshutdown([ + () => kue.Task.shutdown() + ]); + const n = 100; try { - const joblists = await Promise.all([ - rangeJobsByState('complete', n), - options.stuck ? rangeJobsByState('failed', n) : false - ]); - await joblists.filter((jobs) => jobs).map(removeJobs); + // Connect to redis by establishing a queue. + kue.Task.connect(); + + let jobCount = 0; + let jobs = await getJobBatch(n, options.stuck); + + while (jobs.length > 0) { + + // Remove all the jobs. + await Promise.all(jobs.map((job) => removeJob(job))); + + jobCount += jobs.length; + + // Get the next batch of jobs. + jobs = await getJobBatch(n, options.stuck); + } util.shutdown(); - console.log('Removed old jobs'); + console.log(`Removed ${jobCount} jobs`); } catch (err) { console.error(err); util.shutdown(1); diff --git a/client/coral-embed-stream/src/tabs/stream/components/Stream.js b/client/coral-embed-stream/src/tabs/stream/components/Stream.js index b5f45b8e5..4e09ef86f 100644 --- a/client/coral-embed-stream/src/tabs/stream/components/Stream.js +++ b/client/coral-embed-stream/src/tabs/stream/components/Stream.js @@ -224,14 +224,13 @@ class Stream extends React.Component { const open = !asset.isClosed; const banned = user && user.status === 'BANNED'; - const pending = user && user.status === 'PENDING'; const temporarilySuspended = user && user.suspension.until && new Date(user.suspension.until) > new Date(); - const showCommentBox = loggedIn && ((!banned && !pending & !temporarilySuspended && !highlightedComment) || keepCommentBox); + const showCommentBox = loggedIn && ((!banned && !temporarilySuspended && !highlightedComment) || keepCommentBox); const slotProps = {data}; const slotQueryData = {root, asset}; diff --git a/services/kue.js b/services/kue.js index 0fa610faa..01806632e 100644 --- a/services/kue.js +++ b/services/kue.js @@ -21,6 +21,9 @@ const getQueue = () => { } }); + // Watch for stuck jobs to manage. + queue.watchStuckJobs(1000); + return queue; }; @@ -47,6 +50,7 @@ class Task { .attempts(this.attempts) .delay(this.delay) .backoff({type: 'exponential'}) + .removeOnComplete(true) .save((err) => { if (err) { return reject(err); @@ -66,6 +70,15 @@ class Task { return getQueue().process(this.name, callback); } + /** + * Connect to redis now by getting the queue. + */ + static connect() { + + // Force setup the redis connection for kue. + getQueue(); + } + /** * Shutdown running jobs. */ @@ -145,3 +158,5 @@ if (process.env.NODE_ENV === 'test') { module.exports.Task = Task; } +// Add the job reference to the exported params. +module.exports.Job = kue.Job;