Merge branch 'master' into docs-plugin-tutorial-part-1

This commit is contained in:
Wyatt Johnson
2018-03-14 09:19:16 -06:00
committed by GitHub
8 changed files with 59 additions and 37 deletions
+7 -1
View File
@@ -10,6 +10,12 @@ const serve = require('../serve');
program
.option('-j, --jobs', 'enable job processing on this thread')
.option(
'--disabled-jobs <jobs>',
'disable jobs specified if the -j option is passed, specified as a comma separated list',
val => val.split(','),
[]
)
.option(
'-w, --websockets',
'enable the websocket (subscriptions) handler on this thread'
@@ -17,7 +23,7 @@ program
.parse(process.argv);
// Start serving.
serve({ jobs: program.jobs, websockets: program.websockets }).catch(err => {
serve(program).catch(err => {
console.error(err);
util.shutdown(1);
});
+5 -8
View File
@@ -2,6 +2,7 @@
require('../services/env');
const debug = require('debug')('talk:util');
const { uniq } = require('lodash');
const util = (module.exports = {});
@@ -23,11 +24,7 @@ util.shutdown = (defaultCode = 0, signal = null) => {
debug(`${util.toshutdown.length} jobs now being called`);
Promise.all(
util.toshutdown
.map(func => (func ? func(signal) : null))
.filter(func => func)
)
Promise.all(util.toshutdown.map(func => (func ? func(signal) : null)))
.then(() => {
debug('Shutdown complete, now exiting');
process.exit(defaultCode);
@@ -49,14 +46,14 @@ util.onshutdown = jobs => {
debug(`${jobs.length} jobs registered to be called during shutdown`);
// Add the new jobs to shutdown to the object reference.
util.toshutdown = util.toshutdown.concat(jobs);
util.toshutdown = uniq(util.toshutdown.concat(jobs));
};
// Attach to the SIGTERM + SIGINT handles to ensure a clean shutdown in the
// event that we have an external event. SIGUSR2 is called when the app is asked
// to be 'killed', same procedure here.
process.on('SIGTERM', () => util.shutdown(0, 'SIGTERM'));
process.on('SIGINT', () => util.shutdown(0, 'SIGINT'));
process.once('SIGTERM', () => util.shutdown(0, 'SIGTERM'));
process.once('SIGINT', () => util.shutdown(0, 'SIGINT'));
process.once('SIGUSR2', () => util.shutdown(0, 'SIGUSR2'));
// Makes the script crash on unhandled rejections instead of silently
+17 -2
View File
@@ -1,5 +1,20 @@
const jobs = [require('./mailer'), require('./scraper')];
const mailer = require('./mailer');
const scraper = require('./scraper');
const { createLogger } = require('../services/logging');
const logger = createLogger('jobs');
const process = () => jobs.forEach(job => job());
const jobs = { mailer, scraper };
const process = (...disabledJobs) =>
Object.entries(jobs).forEach(([taskName, taskFnc]) => {
if (disabledJobs.includes(taskName)) {
logger.info({ taskName }, 'Not starting job, disabled');
return;
}
logger.info({ taskName }, 'Starting job');
taskFnc();
});
module.exports = { process };
+9 -9
View File
@@ -1,6 +1,7 @@
const { task } = require('../services/mailer');
const nodemailer = require('nodemailer');
const debug = require('debug')('talk:jobs:mailer');
const { createLogger } = require('../services/logging');
const logger = createLogger('jobs:mailer');
const Context = require('../graph/context');
const { get } = require('lodash');
@@ -112,16 +113,17 @@ const processJob = transport => async ({ id, data }, done) => {
// Get the email address from the job data.
message.to = await getEmailAddress(data);
debug(`Starting to send mail for Job[${id}]`);
const log = logger.child({ jobID: id });
log.info('Starting to send mail');
// Actually send the email.
transport.sendMail(message, err => {
if (err) {
debug(`Failed to send mail for Job[${id}]:`, err);
logger.error({ err }, 'Failed to send mail');
return done(err);
}
debug(`Finished sending mail for Job[${id}]`);
logger.info('Finished sending mail');
return done();
});
};
@@ -133,15 +135,13 @@ module.exports = () => {
// Get a transport.
const transport = getTransport();
if (transport === null) {
console.warn(
new Error(
'sending email is not enabled because required configuration is not available'
)
logger.warn(
'Sending email is not enabled because required configuration is not available'
);
return;
}
debug(`Now processing ${task.name} jobs`);
logger.info({ taskName: task.name }, 'Now processing jobs');
return task.process(processJob(transport));
};
@@ -165,7 +165,6 @@ class NotificationManager {
);
const flattenedDigestCategories = this.flattenDigests(ctx, digests);
console.log(JSON.stringify(flattenedDigestCategories));
// Get all the notifications together.
const allMessages = await renderDigestMessage(
@@ -7,6 +7,6 @@
"author": "The Coral Project Team <coral@mozillafoundation.org>",
"license": "Apache-2.0",
"dependencies": {
"pell": "^0.7.0"
"pell": "^1.0.1"
}
}
+17 -12
View File
@@ -1,5 +1,4 @@
const app = require('./app');
const debug = require('debug')('talk:cli:serve');
const errors = require('./errors');
const { createServer } = require('http');
const jobs = require('./jobs');
@@ -12,6 +11,8 @@ const cache = require('./services/cache');
const util = require('./bin/util');
const { createSubscriptionManager } = require('./graph/subscriptions');
const { PORT } = require('./config');
const { createLogger } = require('./services/logging');
const logger = createLogger('jobs');
const port = normalizePort(PORT);
@@ -70,15 +71,17 @@ function normalizePort(val) {
*/
async function onListening() {
let addr = server.address();
let bind = typeof addr === 'string' ? `pipe ${addr}` : `port ${addr.port}`;
console.log(`API Server Listening on ${bind}`);
logger.info({ port }, 'API server started');
}
/**
* Start the app.
*/
async function serve({ jobs: processJobs = false, websockets = false } = {}) {
async function serve({
jobs: enableJobs = false,
disabledJobs = [],
websockets = false,
} = {}) {
// Run the deferred plugins.
PluginsService.runDeferred();
@@ -91,13 +94,15 @@ async function serve({ jobs: processJobs = false, websockets = false } = {}) {
// just means we don't have to check that the migrations have run.
await SetupService.isAvailable();
debug('setup is currently available, migrations not being checked');
logger.info('Setup is currently available, migrations not being checked');
} catch (e) {
// Check the error.
switch (e) {
case errors.ErrInstallLock:
case errors.ErrSettingsInit:
debug('setup is not currently available, migrations now being checked');
logger.info(
'Setup is not currently available, migrations now being checked'
);
// The error was expected, just continue.
break;
@@ -115,7 +120,7 @@ async function serve({ jobs: processJobs = false, websockets = false } = {}) {
process.exit(1);
}
debug('migrations do not have to be run');
logger.info('Migrations do not have to be run');
}
/**
@@ -127,7 +132,7 @@ async function serve({ jobs: processJobs = false, websockets = false } = {}) {
server.listen(port, () => {
// Mount the websocket server if requested.
if (websockets) {
console.log(`Websocket Server Listening on ${port}`);
logger.info({ port }, 'Websocket server started');
// Mount the subscriptions server on the application server.
createSubscriptionManager(server);
@@ -135,16 +140,16 @@ async function serve({ jobs: processJobs = false, websockets = false } = {}) {
});
// Enable job processing on the thread if enabled.
if (processJobs) {
if (enableJobs) {
// Start the mail processor.
jobs.process();
jobs.process(...disabledJobs);
}
// Define a safe shutdown function to call in the event we need to shutdown
// because the node hooks are below which will interrupt the shutdown process.
// Shutdown the mongoose connection, the app server, and the scraper.
util.onshutdown([
() => (processJobs ? kue.Task.shutdown() : null),
() => (enableJobs ? kue.Task.shutdown() : null),
() => mongoose.disconnect(),
() => server.close(),
]);
+3 -3
View File
@@ -8205,9 +8205,9 @@ pbkdf2@^3.0.3:
safe-buffer "^5.0.1"
sha.js "^2.4.8"
pell@^0.7.0:
version "0.7.0"
resolved "https://registry.yarnpkg.com/pell/-/pell-0.7.0.tgz#46b3fcdfa8dd7e5999f73c550a337ecc80193dcc"
pell@^1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/pell/-/pell-1.0.1.tgz#8f1e97165001024e5f371e0ce0b329457c847b5d"
pend@~1.2.0:
version "1.2.0"