Commit 3deebe5

Added reconnect_backoff_max_ms option to set the maximum reconnection time
for the exponential reconnection backoff
1 parent 4adfec6 commit 3deebe5

File tree: 4 files changed, +58 -33 lines

CHANGELOG.md (+32 -25)

@@ -1,24 +1,30 @@
+## 9.0.2
+  - Added `reconnect_backoff_max_ms` option to set maximum reconnection time to exponential reconnection backoff.
+    [#308](https://github.com/logstash-plugins/logstash-input-kafka/pull/308)
+
 ## 9.0.0
   - Removed obsolete `ssl` option

 ## 8.3.1
-  - Added support for kafka property ssl.endpoint.identification.algorithm #302(https://github.com/logstash-plugins/logstash-input-kafka/pull/302)
+  - Added support for kafka property ssl.endpoint.identification.algorithm
+    [#302](https://github.com/logstash-plugins/logstash-input-kafka/pull/302)

 ## 8.3.0
   - Changed Kafka client version to 2.1.0

 ## 8.2.1
-  - Changed Kafka client version to 2.0.1 [#295](https://github.com/logstash-plugins/logstash-input-kafka/pull/295)
+  - Changed Kafka client version to 2.0.1
+    [#295](https://github.com/logstash-plugins/logstash-input-kafka/pull/295)

 ## 8.2.0
   - Upgrade Kafka client to version 2.0.0

 ## 8.1.2
-  - Docs: Correct list formatting for `decorate_events`
-  - Docs: Add kafka default to `partition_assignment_strategy`
+  - Docs: Correct list formatting for `decorate_events`
+  - Docs: Add kafka default to `partition_assignment_strategy`

 ## 8.1.1
-  - Fix race-condition where shutting down a Kafka Input before it has finished starting could cause Logstash to crash
+  - Fix race-condition where shutting down a Kafka Input before it has finished starting could cause Logstash to crash

 ## 8.1.0
   - Internal: Update build to gradle
@@ -115,35 +121,36 @@

 ## 4.0.0
   - Republish all the gems under jruby.
-  - Update the plugin to the version 2.0 of the plugin api, this change is required for Logstash 5.0 compatibility. See https://github.com/elastic/logstash/issues/5141
+  - Update the plugin to the version 2.0 of the plugin api, this change is required for Logstash 5.0 compatibility.
+    See https://github.com/elastic/logstash/issues/5141
   - Support for Kafka 0.9 for LS 5.x

 ## 3.0.0.beta7
-  - Fix Log4j warnings by setting up the logger
+  - Fix Log4j warnings by setting up the logger

 ## 3.0.0.beta5 and 3.0.0.beta6
-  - Internal: Use jar dependency
-  - Fixed issue with snappy compression
+  - Internal: Use jar dependency
+  - Fixed issue with snappy compression

 ## 3.0.0.beta3 and 3.0.0.beta4
-  - Internal: Update gemspec dependency
+  - Internal: Update gemspec dependency

 ## 3.0.0.beta2
-  - internal: Use jar dependencies library instead of manually downloading jars
-  - Fixes "java.lang.ClassNotFoundException: org.xerial.snappy.SnappyOutputStream" issue (#50)
+  - internal: Use jar dependencies library instead of manually downloading jars
+  - Fixes "java.lang.ClassNotFoundException: org.xerial.snappy.SnappyOutputStream" issue (#50)

 ## 3.0.0.beta2
-  - Added SSL/TLS connection support to Kafka
-  - Breaking: Changed default codec to plain instead of SSL. Json codec is really slow when used
-    with inputs because inputs by default are single threaded. This makes it a bad
-    first user experience. Plain codec is a much better default.
+  - Added SSL/TLS connection support to Kafka
+  - Breaking: Changed default codec to plain instead of SSL. Json codec is really slow when used
+    with inputs because inputs by default are single threaded. This makes it a bad
+    first user experience. Plain codec is a much better default.

 ## 3.0.0.beta1
-  - Refactor to use new Java based consumer, bypassing jruby-kafka
-  - Breaking: Change configuration to match Kafka's configuration. This version is not backward compatible
+  - Refactor to use new Java based consumer, bypassing jruby-kafka
+  - Breaking: Change configuration to match Kafka's configuration. This version is not backward compatible

 ## 2.0.7
-  - Update to jruby-kafka 1.6 which includes Kafka 0.8.2.2 enabling LZ4 decompression.
+  - Update to jruby-kafka 1.6 which includes Kafka 0.8.2.2 enabling LZ4 decompression.

 ## 2.0.6
   - Depend on logstash-core-plugin-api instead of logstash-core, removing the need to mass update plugins on major releases of logstash
@@ -152,13 +159,13 @@
   - New dependency requirements for logstash-core for the 5.0 release

 ## 2.0.4
-  - Fix safe shutdown while plugin waits on Kafka for new events
-  - Expose auto_commit_interval_ms to control offset commit frequency
+  - Fix safe shutdown while plugin waits on Kafka for new events
+  - Expose auto_commit_interval_ms to control offset commit frequency

 ## 2.0.3
-  - Fix infinite loop when no new messages are found in Kafka
+  - Fix infinite loop when no new messages are found in Kafka

 ## 2.0.0
-  - Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully,
-    instead of using Thread.raise on the plugins' threads. Ref: https://github.com/elastic/logstash/pull/3895
-  - Dependency on logstash-core update to 2.0
+  - Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully,
+    instead of using Thread.raise on the plugins' threads. Ref: https://github.com/elastic/logstash/pull/3895
+  - Dependency on logstash-core update to 2.0

docs/index.asciidoc (+16 -4)

@@ -102,7 +102,8 @@ https://kafka.apache.org/documentation for more details.
 | <<plugins-{type}s-{plugin}-partition_assignment_strategy>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-poll_timeout_ms>> |<<number,number>>|No
 | <<plugins-{type}s-{plugin}-receive_buffer_bytes>> |<<string,string>>|No
-| <<plugins-{type}s-{plugin}-reconnect_backoff_ms>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-reconnect_backoff_ms>> |<<number,number>>|No
+| <<plugins-{type}s-{plugin}-reconnect_backoff_max_ms>> |<<number,number>>|No
 | <<plugins-{type}s-{plugin}-request_timeout_ms>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-retry_backoff_ms>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-sasl_kerberos_service_name>> |<<string,string>>|No
@@ -394,12 +395,23 @@ The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
 [id="plugins-{type}s-{plugin}-reconnect_backoff_ms"]
 ===== `reconnect_backoff_ms`

-* Value type is <<string,string>>
+* Value type is <<number,number>>
 * There is no default value for this setting.

-The amount of time to wait before attempting to reconnect to a given host.
+The base amount of time to wait before attempting to reconnect to a given host.
 This avoids repeatedly connecting to a host in a tight loop.
-This backoff applies to all requests sent by the consumer to the broker.
+This backoff applies to all connection attempts by the client to a broker.
+
+[id="plugins-{type}s-{plugin}-reconnect_backoff_max_ms"]
+===== `reconnect_backoff_max_ms`
+
+* Value type is <<number,number>>
+* There is no default value for this setting.
+
+The maximum amount of time to wait when reconnecting to a broker that has repeatedly failed to connect.
+If provided, the backoff per host will increase exponentially for each consecutive connection failure,
+up to this maximum.
+After calculating the backoff increase, 20% random jitter is added to avoid connection storms.

 [id="plugins-{type}s-{plugin}-request_timeout_ms"]
 ===== `request_timeout_ms`
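
To make the semantics above concrete, here is a rough Ruby sketch of the schedule these two settings describe. It is an illustration only, not the Kafka client's actual implementation (the real backoff logic lives in the Java client); the 50 ms base and 10000 ms cap are assumed example values standing in for `reconnect_backoff_ms` and `reconnect_backoff_max_ms`, and the exact jitter formula is assumed.

    # Illustration only: exponential growth from a base, capped at a maximum,
    # with roughly 20% random jitter, as the docs above describe.
    BASE_MS = 50      # stand-in for reconnect_backoff_ms
    MAX_MS  = 10_000  # stand-in for reconnect_backoff_max_ms

    def backoff_ms(consecutive_failures)
      raw = [BASE_MS * (2**consecutive_failures), MAX_MS].min
      jitter = raw * 0.2 * (2 * rand - 1)  # +/-20% jitter to avoid connection storms
      (raw + jitter).round
    end

    (0..8).each { |n| puts "failure #{n}: ~#{backoff_ms(n)} ms" }

With these example values, consecutive failures wait roughly 50, 100, 200, 400 ms and so on, flattening out at the 10-second cap.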

lib/logstash/inputs/kafka.rb (+9 -3)

@@ -133,10 +133,15 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
   config :partition_assignment_strategy, :validate => :string
   # The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
   config :receive_buffer_bytes, :validate => :string
-  # The amount of time to wait before attempting to reconnect to a given host.
+  # The base amount of time to wait before attempting to reconnect to a given host.
   # This avoids repeatedly connecting to a host in a tight loop.
-  # This backoff applies to all requests sent by the consumer to the broker.
-  config :reconnect_backoff_ms, :validate => :string
+  # This backoff applies to all connection attempts by the client to a broker.
+  config :reconnect_backoff_ms, :validate => :number
+  # The maximum amount of time to wait when reconnecting to a broker that has repeatedly failed to connect.
+  # If provided, the backoff per host will increase exponentially for each consecutive connection failure,
+  # up to this maximum.
+  # After calculating the backoff increase, 20% random jitter is added to avoid connection storms.
+  config :reconnect_backoff_max_ms, :validate => :number
   # The configuration controls the maximum amount of time the client will wait
   # for the response of a request. If the response is not received before the timeout
   # elapses the client will resend the request if necessary or fail the request if
@@ -306,6 +311,7 @@ def create_consumer(client_id)
       props.put(kafka::PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partition_assignment_strategy) unless partition_assignment_strategy.nil?
       props.put(kafka::RECEIVE_BUFFER_CONFIG, receive_buffer_bytes) unless receive_buffer_bytes.nil?
       props.put(kafka::RECONNECT_BACKOFF_MS_CONFIG, reconnect_backoff_ms) unless reconnect_backoff_ms.nil?
+      props.put(kafka::RECONNECT_BACKOFF_MAX_MS_CONFIG, reconnect_backoff_max_ms) unless reconnect_backoff_max_ms.nil?
      props.put(kafka::REQUEST_TIMEOUT_MS_CONFIG, request_timeout_ms) unless request_timeout_ms.nil?
      props.put(kafka::RETRY_BACKOFF_MS_CONFIG, retry_backoff_ms) unless retry_backoff_ms.nil?
      props.put(kafka::SEND_BUFFER_CONFIG, send_buffer_bytes) unless send_buffer_bytes.nil?
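
The new option follows the same pattern as its neighbours in `create_consumer`: a numeric Logstash setting that, when set, is copied into the consumer's `java.util.Properties` under Kafka's own key name. Below is a minimal standalone JRuby sketch of that mapping, with assumed example values; the key strings are what Kafka's `RECONNECT_BACKOFF_MS_CONFIG` and `RECONNECT_BACKOFF_MAX_MS_CONFIG` constants resolve to.

    # Standalone JRuby sketch of the property mapping used in create_consumer
    # above; values are assumed examples, not defaults.
    require "java"

    reconnect_backoff_ms     = 50      # hypothetical pipeline setting, in ms
    reconnect_backoff_max_ms = 10_000  # hypothetical pipeline setting, in ms

    props = java.util.Properties.new
    # ConsumerConfig::RECONNECT_BACKOFF_MS_CONFIG     resolves to "reconnect.backoff.ms"
    # ConsumerConfig::RECONNECT_BACKOFF_MAX_MS_CONFIG resolves to "reconnect.backoff.max.ms"
    props.put("reconnect.backoff.ms", reconnect_backoff_ms.to_s) unless reconnect_backoff_ms.nil?
    props.put("reconnect.backoff.max.ms", reconnect_backoff_max_ms.to_s) unless reconnect_backoff_max_ms.nil?

In a pipeline configuration this corresponds to setting, for example, `reconnect_backoff_ms => 50` and `reconnect_backoff_max_ms => 10000` on the kafka input.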

logstash-input-kafka.gemspec (+1 -1)

@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name = 'logstash-input-kafka'
-  s.version = '9.0.0'
+  s.version = '9.0.2'
   s.licenses = ['Apache-2.0']
   s.summary = "Reads events from a Kafka topic"
   s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
