Skip to content

Commit c17a487

Browse files
committedOct 12, 2019
Producer: add generic class producer
1 parent a27f85f commit c17a487

File tree

2 files changed

+70
-24
lines changed

2 files changed

+70
-24
lines changed
 

‎Producer/src/main/java/com/telkomdev/producer/App.java

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,9 @@
2020
import com.telkomdev.producer.serializer.ProductAvroSerializer;
2121
import com.telkomdev.producer.serializer.ProductJsonSerializer;
2222
import com.telkomdev.producer.serializer.ProductProtobufSerializer;
23-
import org.apache.kafka.clients.producer.KafkaProducer;
24-
import org.apache.kafka.clients.producer.Producer;
25-
import org.apache.kafka.clients.producer.ProducerConfig;
26-
import org.apache.kafka.clients.producer.ProducerRecord;
2723

2824
import java.util.ArrayList;
2925
import java.util.List;
30-
import java.util.Properties;
3126
import java.util.Scanner;
3227

3328
public class App {
@@ -48,34 +43,27 @@ public static void main(String[] args) {
4843
System.exit(0);
4944
}
5045

51-
Properties producerConfig = new Properties();
52-
53-
producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "client-1");
54-
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
55-
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
56-
57-
// kafka brokers
58-
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
59-
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.ByteArraySerializer.class.getName());
60-
6146
// send String data
62-
//producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
47+
//String valueSerializer = org.apache.kafka.common.serialization.StringSerializer.class.getName();
6348

6449
// send Protocol Buffer data
65-
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProductProtobufSerializer.class.getName());
50+
String valueSerializer = ProductProtobufSerializer.class.getName();
6651

6752
// send JSON data
68-
//producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProductJsonSerializer.class.getName());
53+
// String valueSerializer = ProductJsonSerializer.class.getName());
6954

7055
// send AVRO data
71-
//producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProductAvroSerializer.class.getName());
56+
// String valueSerializer = producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProductAvroSerializer.class.getName();
57+
58+
final MyKafkaProducer<String, Product> producer = new MyKafkaProducer("client-1",
59+
org.apache.kafka.common.serialization.StringSerializer.class.getName(),
60+
valueSerializer, brokers);
7261

73-
Producer producer = new KafkaProducer<String, String>(producerConfig);
7462

7563
// read input
7664
Scanner in = new Scanner(System.in);
7765

78-
System.out.println("Type Message (type 'exit' to quit)");
66+
System.out.println("Click Enter to send message (type 'exit' to quit)");
7967
String input = in.nextLine();
8068

8169
while (!input.equals("exit")) {
@@ -91,12 +79,11 @@ public static void main(String[] args) {
9179
images.add("wuriyanto.com/img2");
9280
p.setImages(images);
9381

94-
ProducerRecord<String, Product> record = new ProducerRecord<String, Product>(topic, p);
9582

9683
try {
9784
System.out.println(input);
9885

99-
producer.send(record);
86+
producer.send(topic, p);
10087
input = in.nextLine();
10188
} catch (Exception ex) {
10289
System.out.println("error send data to kafka: " + ex.getMessage());
@@ -105,6 +92,6 @@ public static void main(String[] args) {
10592
}
10693

10794
in.close();
108-
producer.close();
95+
producer.getProducer().close();
10996
}
11097
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2019 wuriyanto.com
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.telkomdev.producer;
18+
19+
import org.apache.kafka.clients.producer.KafkaProducer;
20+
import org.apache.kafka.clients.producer.Producer;
21+
import org.apache.kafka.clients.producer.ProducerConfig;
22+
import org.apache.kafka.clients.producer.ProducerRecord;
23+
24+
import java.util.Properties;
25+
26+
public class MyKafkaProducer<K, V> {
27+
28+
private Producer<K, V> producer;
29+
30+
public MyKafkaProducer() {
31+
}
32+
33+
public MyKafkaProducer(String clientId, String keySerializerClass, String valueSerializerClass, String brokers) {
34+
Properties producerConfig = new Properties();
35+
36+
producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
37+
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
38+
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
39+
40+
// kafka brokers
41+
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
42+
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
43+
44+
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
45+
46+
this.producer = new KafkaProducer(producerConfig);
47+
}
48+
49+
public void send(String topic, V v) {
50+
ProducerRecord<K, V> record = new ProducerRecord(topic, v);
51+
this.producer.send(record);
52+
}
53+
54+
public Producer<K, V> getProducer() {
55+
return this.producer;
56+
}
57+
58+
59+
}

0 commit comments

Comments
 (0)
Please sign in to comment.