Skip to content

Commit 542c801

Browse files
bgKgaryrussell
authored andcommitted
GH-753: Close transactional producer on error
Fixes #753 Improve exception handling for producer transaction commit / rollback * Close the producer if an exception is thrown while committing / rollbacking a transaction when synchronizing the Kafka transaction with another TransactionManager. * Don't reuse transactional producers if an exception is thrown when committing / rollbacking a transaction. Some of the exceptions are fatal and mean the producer cannot be reused. * Close the transactional producer if an exception occurs when calling beginTransaction when not using DefaultKafkaProducerFactory.
1 parent 4811ea8 commit 542c801

File tree

5 files changed

+212
-22
lines changed

5 files changed

+212
-22
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

+26-14
Original file line numberDiff line numberDiff line change
@@ -299,14 +299,6 @@ public void beginTransaction() throws ProducerFencedException {
299299
}
300300
catch (RuntimeException e) {
301301
this.txFailed = true;
302-
logger.error("Illegal transaction state; producer removed from cache; possible cause: "
303-
+ "broker restarted during transaction", e);
304-
try {
305-
this.delegate.close();
306-
}
307-
catch (Exception ee) {
308-
// empty
309-
}
310302
throw e;
311303
}
312304
}
@@ -319,20 +311,40 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
319311

320312
@Override
321313
public void commitTransaction() throws ProducerFencedException {
322-
this.delegate.commitTransaction();
314+
try {
315+
this.delegate.commitTransaction();
316+
}
317+
catch (RuntimeException e) {
318+
this.txFailed = true;
319+
throw e;
320+
}
323321
}
324322

325323
@Override
326324
public void abortTransaction() throws ProducerFencedException {
327-
this.delegate.abortTransaction();
325+
try {
326+
this.delegate.abortTransaction();
327+
}
328+
catch (RuntimeException e) {
329+
this.txFailed = true;
330+
throw e;
331+
}
328332
}
329333

330334
@Override
331335
public void close() {
332-
if (this.cache != null && !this.txFailed) {
333-
synchronized (this) {
334-
if (!this.cache.contains(this)) {
335-
this.cache.offer(this);
336+
if (this.cache != null) {
337+
if (this.txFailed) {
338+
logger.warn("Error during transactional operation; producer removed from cache; possible cause: "
339+
+ "broker restarted during transaction");
340+
341+
this.delegate.close();
342+
}
343+
else {
344+
synchronized (this) {
345+
if (!this.cache.contains(this)) {
346+
this.cache.offer(this);
347+
}
336348
}
337349
}
338350
}

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,15 @@ public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
237237
Producer<K, V> producer = this.producers.get();
238238
Assert.state(producer == null, "Nested calls to 'executeInTransaction' are not allowed");
239239
producer = this.producerFactory.createProducer();
240-
producer.beginTransaction();
240+
241+
try {
242+
producer.beginTransaction();
243+
}
244+
catch (Exception e) {
245+
closeProducer(producer, false);
246+
throw e;
247+
}
248+
241249
this.producers.set(producer);
242250
T result = null;
243251
try {

spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java

+18-7
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,15 @@ public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(
5757
.getResource(producerFactory);
5858
if (resourceHolder == null) {
5959
Producer<K, V> producer = producerFactory.createProducer();
60-
producer.beginTransaction();
60+
61+
try {
62+
producer.beginTransaction();
63+
}
64+
catch (RuntimeException e) {
65+
producer.close();
66+
throw e;
67+
}
68+
6169
resourceHolder = new KafkaResourceHolder<K, V>(producer);
6270
bindResourceToTransaction(resourceHolder, producerFactory);
6371
}
@@ -128,14 +136,17 @@ protected boolean shouldReleaseBeforeCompletion() {
128136

129137
@Override
130138
public void afterCompletion(int status) {
131-
if (status == TransactionSynchronization.STATUS_COMMITTED) {
132-
this.resourceHolder.commit();
139+
try {
140+
if (status == TransactionSynchronization.STATUS_COMMITTED) {
141+
this.resourceHolder.commit();
142+
}
143+
else {
144+
this.resourceHolder.rollback();
145+
}
133146
}
134-
else {
135-
this.resourceHolder.rollback();
147+
finally {
148+
super.afterCompletion(status);
136149
}
137-
138-
super.afterCompletion(status);
139150
}
140151

141152
@Override

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

+54
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.kafka.clients.consumer.ConsumerRecord;
3838
import org.apache.kafka.clients.consumer.ConsumerRecords;
3939
import org.apache.kafka.clients.producer.Callback;
40+
import org.apache.kafka.clients.producer.MockProducer;
4041
import org.apache.kafka.clients.producer.Producer;
4142
import org.apache.kafka.clients.producer.ProducerConfig;
4243
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -50,6 +51,7 @@
5051
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
5152
import org.springframework.context.annotation.Bean;
5253
import org.springframework.context.annotation.Configuration;
54+
import org.springframework.kafka.support.transaction.ResourcelessTransactionManager;
5355
import org.springframework.kafka.test.rule.KafkaEmbedded;
5456
import org.springframework.kafka.test.utils.KafkaTestUtils;
5557
import org.springframework.kafka.transaction.KafkaTransactionManager;
@@ -180,6 +182,58 @@ public void testNoTx() {
180182
.hasMessageContaining("No transaction is in process;");
181183
}
182184

185+
@Test
186+
public void testTransactionSynchronization() {
187+
MockProducer<String, String> producer = new MockProducer<>();
188+
producer.initTransactions();
189+
190+
@SuppressWarnings("unchecked")
191+
ProducerFactory<String, String> pf = mock(ProducerFactory.class);
192+
given(pf.transactionCapable()).willReturn(true);
193+
given(pf.createProducer()).willReturn(producer);
194+
195+
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
196+
template.setDefaultTopic(STRING_KEY_TOPIC);
197+
198+
ResourcelessTransactionManager tm = new ResourcelessTransactionManager();
199+
200+
new TransactionTemplate(tm).execute(s -> {
201+
template.sendDefault("foo", "bar");
202+
return null;
203+
});
204+
205+
assertThat(producer.history()).containsExactly(new ProducerRecord<>(STRING_KEY_TOPIC, "foo", "bar"));
206+
assertThat(producer.transactionCommitted()).isTrue();
207+
assertThat(producer.closed()).isTrue();
208+
}
209+
210+
@Test
211+
public void testTransactionSynchronizationExceptionOnCommit() {
212+
MockProducer<String, String> producer = new MockProducer<>();
213+
producer.initTransactions();
214+
215+
@SuppressWarnings("unchecked")
216+
ProducerFactory<String, String> pf = mock(ProducerFactory.class);
217+
given(pf.transactionCapable()).willReturn(true);
218+
given(pf.createProducer()).willReturn(producer);
219+
220+
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
221+
template.setDefaultTopic(STRING_KEY_TOPIC);
222+
223+
ResourcelessTransactionManager tm = new ResourcelessTransactionManager();
224+
225+
new TransactionTemplate(tm).execute(s -> {
226+
template.sendDefault("foo", "bar");
227+
228+
// Mark the mock producer as fenced so it throws when committing the transaction
229+
producer.fenceProducer();
230+
return null;
231+
});
232+
233+
assertThat(producer.transactionCommitted()).isFalse();
234+
assertThat(producer.closed()).isTrue();
235+
}
236+
183237
@Configuration
184238
@EnableTransactionManagement
185239
public static class DeclarativeConfig {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright 2017-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support.transaction;
18+
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
22+
import org.springframework.transaction.TransactionDefinition;
23+
import org.springframework.transaction.TransactionException;
24+
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
25+
import org.springframework.transaction.support.DefaultTransactionStatus;
26+
import org.springframework.transaction.support.TransactionSynchronizationManager;
27+
28+
@SuppressWarnings("serial")
29+
public class ResourcelessTransactionManager extends AbstractPlatformTransactionManager {
30+
31+
@Override
32+
protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
33+
((ResourcelessTransaction) transaction).begin();
34+
}
35+
36+
@Override
37+
protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
38+
if (logger.isDebugEnabled()) {
39+
logger.debug("Committing resourceless transaction on [" + status.getTransaction() + "]");
40+
}
41+
}
42+
43+
@Override
44+
protected Object doGetTransaction() throws TransactionException {
45+
Object transaction = new ResourcelessTransaction();
46+
List<Object> resources;
47+
if (!TransactionSynchronizationManager.hasResource(this)) {
48+
resources = new ArrayList<>();
49+
TransactionSynchronizationManager.bindResource(this, resources);
50+
}
51+
else {
52+
@SuppressWarnings("unchecked")
53+
List<Object> stack = (List<Object>) TransactionSynchronizationManager.getResource(this);
54+
resources = stack;
55+
}
56+
resources.add(transaction);
57+
return transaction;
58+
}
59+
60+
@Override
61+
protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
62+
if (logger.isDebugEnabled()) {
63+
logger.debug("Rolling back resourceless transaction on [" + status.getTransaction() + "]");
64+
}
65+
}
66+
67+
@Override
68+
protected boolean isExistingTransaction(Object transaction) throws TransactionException {
69+
if (TransactionSynchronizationManager.hasResource(this)) {
70+
List<?> stack = (List<?>) TransactionSynchronizationManager.getResource(this);
71+
return stack.size() > 1;
72+
}
73+
return ((ResourcelessTransaction) transaction).isActive();
74+
}
75+
76+
@Override
77+
protected void doSetRollbackOnly(DefaultTransactionStatus status) throws TransactionException {
78+
}
79+
80+
@Override
81+
protected void doCleanupAfterCompletion(Object transaction) {
82+
List<?> resources = (List<?>) TransactionSynchronizationManager.getResource(this);
83+
resources.clear();
84+
TransactionSynchronizationManager.unbindResource(this);
85+
((ResourcelessTransaction) transaction).clear();
86+
}
87+
88+
private static class ResourcelessTransaction {
89+
90+
private boolean active = false;
91+
92+
public boolean isActive() {
93+
return active;
94+
}
95+
96+
public void begin() {
97+
active = true;
98+
}
99+
100+
public void clear() {
101+
active = false;
102+
}
103+
104+
}
105+
}

0 commit comments

Comments
 (0)