Skip to content

Commit 56888f0

Browse files
author
Gareth Jones
committed
fix: getting shutdown to work sensibly
1 parent 2296fb2 commit 56888f0

File tree

6 files changed

+216
-90
lines changed

6 files changed

+216
-90
lines changed

README.md

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ Push log events to a [Rabbitmq](https://www.rabbitmq.com/) MQ. Require [log4js-n
66
npm install @log4js-node/rabbitmq
77
```
88

9+
If you want to be sure that all messages have been sent before your programme exits, remember to call `log4js.shutdown(<callback function>)`.
10+
911
## Configuration
1012

1113
* `type` - `@log4js-ndoe/rabbitmq`
@@ -15,11 +17,13 @@ npm install @log4js-node/rabbitmq
1517
* `password` - `string` (optional, defaults to `guest`) - password to use when authenticating connection to rabbitmq
1618
* `routing_key` - `string` (optional, defaults to `logstash`) - rabbitmq message's routing_key
1719
* `durable` - `string` (optional, defaults to false) - will that RabbitMQ lose our queue.
18-
* `exchange` - `string` - rabbitmq send message's exchange
19-
* `mq_type` - `string` - rabbitmq message's mq_type
20+
* `exchange` - `string` (optional, defaults to `log`)- rabbitmq send message's exchange
21+
* `mq_type` - `string` (optional, defaults to `direct`) - rabbitmq message's mq_type
22+
* `vhost` - `string` (optional, defaults to `/`) - vhost to use
2023
* `layout` - `object` (optional, defaults to `messagePassThroughLayout`) - the layout to use for log events (see [layouts](layouts.md)).
24+
* `shutdownTimeout` - `integer` (optional, defaults to `10000`) - maximum time in milliseconds to wait for messages to be sent during log4js shutdown.
2125

22-
The appender will use the Rabbitmq Routing model command to send the log event messages to the channel.
26+
The appender will use the RabbitMQ Routing model command to send the log event messages to the channel.
2327

2428
## Example
2529

lib/index.js

Lines changed: 92 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
'use strict';
22

33
const amqplib = require('amqplib');
4+
const debug = require('debug')('log4js:rabbitmq');
45

56
function rabbitmqAppender(config, layout) {
67
const host = config.host || '127.0.0.1';
78
const port = config.port || 5672;
89
const username = config.username || 'guest';
910
const password = config.password || 'guest';
10-
const exchange = config.exchange || '';
11-
const type = config.mq_type || '';
11+
const exchange = config.exchange || 'log';
12+
const type = config.mq_type || 'direct';
1213
const durable = config.durable || false;
1314
const routingKey = config.routing_key || 'logstash';
15+
const vhost = config.vhost || '/';
16+
const shutdownTimeout = config.shutdownTimeout || 10000;
1417
const con = {
1518
protocol: 'amqp',
1619
hostname: host,
@@ -20,30 +23,106 @@ function rabbitmqAppender(config, layout) {
2023
locale: 'en_US',
2124
frameMax: 0,
2225
heartbeat: 0,
23-
vhost: '/',
26+
vhost: vhost,
2427
routing_key: routingKey,
2528
exchange: exchange,
2629
mq_type: type,
2730
durable: durable,
2831
};
29-
const client = amqplib.connect(con);
30-
const publish = (message) => {
31-
client.then((conn) => {
32-
const rn = conn.createChannel().then((ch) => {
33-
const ok = ch.assertExchange(exchange, type, { durable: durable });
34-
return ok.then(() => {
32+
const messagesToSend = [];
33+
let promisesWaiting = 0;
34+
let waitingToConnect = true;
35+
let connection;
36+
37+
debug('Connecting...');
38+
amqplib.connect(con).then((c) => {
39+
connection = c;
40+
waitingToConnect = false;
41+
debug('Connected.');
42+
}).catch((e) => {
43+
debug('connect failed.');
44+
waitingToConnect = false;
45+
console.error(e); // eslint-disable-line
46+
});
47+
48+
const send = (messages) => {
49+
const rn = connection.createChannel().then((ch) => {
50+
const ok = ch.assertExchange(exchange, type, { durable: durable });
51+
return ok.then(() => {
52+
messages.forEach((message) => {
53+
debug('Sending message.');
3554
ch.publish(exchange, routingKey, Buffer.from(message));
36-
return ch.close();
3755
});
56+
messages.length = 0;
57+
return ch.close();
3858
});
39-
return rn;
40-
}).catch(e => console.error(e)); //eslint-disable-line
59+
});
60+
promisesWaiting += 1;
61+
debug(`Promises waiting: ${promisesWaiting}`);
62+
rn.then(() => {
63+
promisesWaiting -= 1;
64+
debug(`Promise resolved. Waiting is now: ${promisesWaiting}`);
65+
});
66+
};
67+
68+
const publish = (message) => {
69+
if (message) {
70+
messagesToSend.push(message);
71+
debug(`Added message to buffer. Buffer length: ${messagesToSend.length}`);
72+
}
73+
if (!waitingToConnect && connection) {
74+
debug('Sending buffer.');
75+
send(messagesToSend);
76+
}
77+
};
78+
79+
const waitForPromises = (done) => {
80+
let howLongWaiting = 0;
81+
const checker = () => {
82+
howLongWaiting += 100;
83+
debug(`waitingToConnect? ${waitingToConnect}`);
84+
if (messagesToSend.length > 0) {
85+
debug('Messages to send.');
86+
publish();
87+
}
88+
if (howLongWaiting > shutdownTimeout) {
89+
debug(`Done waiting for promises. Waiting: ${promisesWaiting}`);
90+
if (connection) {
91+
connection.close().then(done);
92+
return;
93+
}
94+
done();
95+
return;
96+
}
97+
if (!waitingToConnect && connection) {
98+
if (messagesToSend.length > 0 || promisesWaiting > 0) {
99+
debug('Promises to wait for.');
100+
setTimeout(checker, 100);
101+
return;
102+
}
103+
connection.close().then(done);
104+
return;
105+
}
106+
debug('Nothing to wait for, shutdown now.');
107+
done();
108+
};
109+
setTimeout(checker, 100);
41110
};
42111

43112
const appender = loggingEvent => publish(layout(loggingEvent));
44113

45114
appender.shutdown = function (done) {
46-
client.close().then(done);
115+
debug('Appender shutdown.');
116+
debug(`waitingToConnect: ${waitingToConnect},
117+
messagesToSend: ${messagesToSend},
118+
promisesWaiting: ${promisesWaiting}`);
119+
if (promisesWaiting > 0 || messagesToSend.length > 0) {
120+
debug(`Things to do, will wait up to ${shutdownTimeout}ms.`);
121+
waitForPromises(done);
122+
} else {
123+
debug('Nothing to wait for, shutdown now.');
124+
done();
125+
}
47126
};
48127
return appender;
49128
}

test/fakeAmqpLib.js

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
const assert = require('assert');
2+
3+
const state = {
4+
params: {},
5+
msgs: [],
6+
msgCallbacks: [],
7+
closed: false
8+
};
9+
10+
const channel = () => ({
11+
assertExchange: (exch, type, opts) => new Promise((resolve) => {
12+
assert.strictEqual(exch, state.params.exchange);
13+
assert.strictEqual(type, state.params.mq_type);
14+
assert.strictEqual(opts.durable, state.params.durable);
15+
resolve();
16+
}),
17+
publish: (exch, key, msg) => {
18+
state.msgs.push(msg);
19+
},
20+
close: () => new Promise((resolve) => {
21+
state.msgCallbacks.push(resolve);
22+
})
23+
});
24+
25+
const channelPromise = () => new Promise((resolve) => {
26+
resolve(channel());
27+
});
28+
29+
const connection = () => ({
30+
createChannel: function () {
31+
return channelPromise();
32+
},
33+
close: function () {
34+
state.closed = true;
35+
return new Promise((resolve) => {
36+
resolve();
37+
});
38+
}
39+
});
40+
41+
const connectionPromise = connectionError => new Promise((resolve, reject) => {
42+
if (connectionError) {
43+
reject(connectionError);
44+
}
45+
resolve(connection());
46+
});
47+
48+
module.exports = (connectionError) => {
49+
state.params = {};
50+
state.msgs = [];
51+
state.msgCallbacks = [];
52+
state.closed = false;
53+
const fakeRabbitmq = {
54+
state,
55+
connect: function (conn) {
56+
state.params = conn;
57+
return connectionPromise(connectionError);
58+
}
59+
};
60+
return fakeRabbitmq;
61+
};

test/integration.js

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
const debug = require('debug')('test');
2+
const appender = require('../lib').configure({}, {
3+
messagePassThroughLayout: e => e.data[0]
4+
});
5+
6+
debug('sending message');
7+
appender({
8+
data: ['some log message']
9+
});
10+
11+
debug('shutting down');
12+
appender.shutdown(() => {
13+
debug('all done');
14+
});

test/integration.sh

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#!/usr/bin/env bash
2+
docker run -d -p 5672:5672 --hostname rabbitmq --name rabbitmq rabbitmq:alpine
3+
sleep 10
4+
DEBUG='*' node integration.js
5+
6+
docker stop rabbitmq && docker rm rabbitmq

0 commit comments

Comments
 (0)