
Commit 2770316

Modified KafkaStreamingJob to operate on JSON (#87)
* Modified KafkaStreamingJob to operate on JSON instead of strings, following the schema in Stock.java
* Modified READMEs to make the instructions clearer and changed the signature of createKafkaSink
1 parent d413469 commit 2770316

File tree: 9 files changed, +151 -19 lines


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -12,5 +12,6 @@ env/
 venv/
 .java-version
 /pyflink/
+__pycache__/
 
 /.run/

java/KafkaConnectors/README.md

Lines changed: 4 additions & 0 deletions
@@ -11,6 +11,10 @@ source and sink.
 
 This example uses KafkaSource and KafkaSink.
 
+This example expects a JSON payload as input and outputs a corresponding JSON output.
+The JSON input follows the structure set in `Stock.java` and can be automatically generated with
+[`stock_kafka.py`](../../python/data-generator/stock_kafka.py) under the `python/data-generator` directory.
+
 ![Flink Example](images/flink-example.png)
 
 > In this example, the Kafka Sink uses *exactly-once* delivery guarantees. This leverages Kafka transactions under the hood, improving guarantees but
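
For reference, a record in this format looks like the following (an illustrative sample, not taken from the repository; the field names follow `Stock.java`):

```json
{"event_time": "2024-05-28T19:53:17", "ticker": "AMZN", "price": 42.88}
```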

java/KafkaConnectors/pom.xml

Lines changed: 7 additions & 0 deletions
@@ -66,6 +66,13 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- Logging framework, to produce console output when running in the IDE. -->
         <!-- These dependencies are excluded from the application JAR by default. -->
         <dependency>
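
Note the `provided` scope on the new `flink-json` dependency: like the other Flink dependencies in this pom, it is available at compile time but excluded from the application JAR, on the assumption that the runtime supplies it.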

java/KafkaConnectors/src/main/java/com/amazonaws/services/msf/KafkaStreamingJob.java

Lines changed: 24 additions & 15 deletions
@@ -2,12 +2,15 @@
 
 import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.formats.json.JsonDeserializationSchema;
+import org.apache.flink.formats.json.JsonSerializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -53,31 +56,26 @@ private static Map<String, Properties> loadApplicationProperties(StreamExecution
     }
 
 
-    private static KafkaSource<String> createKafkaSource(Properties inputProperties) {
+    private static <T> KafkaSource<T> createKafkaSource(Properties inputProperties, final DeserializationSchema<T> valueDeserializationSchema) {
         OffsetsInitializer startingOffsetsInitializer = inputProperties.containsKey("startTimestamp") ? OffsetsInitializer.timestamp(
                 Long.parseLong(inputProperties.getProperty("startTimestamp"))) : DEFAULT_OFFSETS_INITIALIZER;
 
-        return KafkaSource.<String>builder()
+        return KafkaSource.<T>builder()
                 .setBootstrapServers(inputProperties.getProperty("bootstrap.servers"))
                 .setTopics(inputProperties.getProperty("topic", DEFAULT_SOURCE_TOPIC))
                 .setGroupId(inputProperties.getProperty("group.id", DEFAULT_GROUP_ID))
                 .setStartingOffsets(startingOffsetsInitializer) // Used when the application starts with no state
-                .setValueOnlyDeserializer(new SimpleStringSchema())
+                .setValueOnlyDeserializer(valueDeserializationSchema)
                 .setProperties(inputProperties)
                 .build();
     }
 
 
-    private static KafkaSink<String> createKafkaSink(Properties outputProperties) {
-        return KafkaSink.<String>builder()
+    private static <T> KafkaSink<T> createKafkaSink(Properties outputProperties, KafkaRecordSerializationSchema<T> recordSerializationSchema) {
+        return KafkaSink.<T>builder()
                 .setBootstrapServers(outputProperties.getProperty("bootstrap.servers"))
                 .setKafkaProducerConfig(outputProperties)
-                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
-                        .setTopic(outputProperties.getProperty("topic", DEFAULT_SINK_TOPIC))
-                        .setKeySerializationSchema(new SimpleStringSchema())
-                        .setValueSerializationSchema(new SimpleStringSchema())
-                        .build()
-                )
+                .setRecordSerializer(recordSerializationSchema)
                 .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                 .build();
     }
@@ -91,6 +89,7 @@ private static Properties mergeProperties(Properties properties, Properties auth
     public static void main(String[] args) throws Exception {
         // Set up the streaming execution environment
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(1000);
 
         // Load the application properties
         final Map<String, Properties> applicationProperties = loadApplicationProperties(env);
@@ -105,11 +104,21 @@ public static void main(String[] args) throws Exception {
         Properties outputProperties = mergeProperties(applicationProperties.get("Output0"), authProperties);
 
         // Create and add the Source
-        KafkaSource<String> source = createKafkaSource(inputProperties);
-        DataStream<String> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka source");
+        KafkaSource<Stock> source = createKafkaSource(inputProperties, new JsonDeserializationSchema<>(Stock.class));
+        DataStream<Stock> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka source");
+
+        KafkaRecordSerializationSchema<Stock> recordSerializationSchema = KafkaRecordSerializationSchema.<Stock>builder()
+                .setTopic(outputProperties.getProperty("topic", DEFAULT_SINK_TOPIC))
+                // Use a field as the Kafka record key
+                // (define no keySerializationSchema to publish records with no key)
+                .setKeySerializationSchema(stock -> stock.getTicker().getBytes())
+                // Serialize the Kafka record value (payload) as JSON
+                .setValueSerializationSchema(new JsonSerializationSchema<>())
+                .build();
+
 
         // Create and add the Sink
-        KafkaSink<String> sink = createKafkaSink(outputProperties);
+        KafkaSink<Stock> sink = createKafkaSink(outputProperties, recordSerializationSchema);
         input.sinkTo(sink);
 
         env.execute("Flink Kafka Source and Sink examples");
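
As the inline comments note, the key serialization schema is optional. A minimal sketch of the keyless variant, for comparison (the `KeylessSchemaExample` wrapper and topic parameter are illustrative, not part of the commit):

```java
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.formats.json.JsonSerializationSchema;

public class KeylessSchemaExample {
    // Omitting setKeySerializationSchema publishes records with no key,
    // so Kafka spreads them across partitions instead of keying by ticker.
    static KafkaRecordSerializationSchema<Stock> keylessSchema(String topic) {
        return KafkaRecordSerializationSchema.<Stock>builder()
                .setTopic(topic)
                .setValueSerializationSchema(new JsonSerializationSchema<>())
                .build();
    }
}
```

Keying by ticker, as the commit does, keeps all records for a given ticker in one partition and therefore in order; keyless records trade that ordering for even distribution.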
java/KafkaConnectors/src/main/java/com/amazonaws/services/msf/Stock.java

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
+package com.amazonaws.services.msf;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+public class Stock {
+    @JsonProperty("event_time")
+    private String eventTime;
+    private String ticker;
+    private float price;
+
+    public Stock() {}
+
+    public String getEventTime() {
+        return eventTime;
+    }
+
+    public void setEventTime(String eventTime) {
+        this.eventTime = eventTime;
+    }
+
+    public String getTicker() {
+        return ticker;
+    }
+
+    public void setTicker(String ticker) {
+        this.ticker = ticker;
+    }
+
+    public float getPrice() {
+        return price;
+    }
+
+    public void setPrice(float price) {
+        this.price = price;
+    }
+
+    @Override
+    public String toString() {
+        return "Stock{" +
+                "event_time='" + eventTime + '\'' +
+                ", ticker='" + ticker + '\'' +
+                ", price=" + price +
+                '}';
+    }
+}
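
A quick way to verify the field mapping is a Jackson round trip (a hypothetical `StockJsonCheck` helper, not part of the commit; it uses the same shaded Jackson that the annotation above comes from):

```java
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

public class StockJsonCheck {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // @JsonProperty("event_time") binds the snake_case key to eventTime;
        // ticker and price bind by name.
        String json = "{\"event_time\": \"2024-05-28T19:53:17\", \"ticker\": \"AMZN\", \"price\": 42.88}";
        Stock stock = mapper.readValue(json, Stock.class);
        System.out.println(stock);
        // Stock{event_time='2024-05-28T19:53:17', ticker='AMZN', price=42.88}
    }
}
```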

java/KinesisConnectors/README.md

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@ This example uses `FlinkKinesisConsumer` and `KinesisStreamsSink` connectors.
 
 This example expects a JSON payload as input and outputs a corresponding JSON output.
 The JSON input follows the structure set in `Stock.java` and can be automatically generated with
-`stock.py` under the python directory.
+[`stock.py`](../../python/data-generator/stock.py) under the `python/data-generator` directory.
 
 ![Flink Example](images/flink-kinesis-example.png)
 
python/data-generator/README.md

Lines changed: 20 additions & 3 deletions
@@ -1,17 +1,34 @@
 ## Sample data generator
 
-Simple data generator application, publishing stock data to Kinesis Data Stream.
+Simple data generator application, publishing stock data to a Kinesis Data Stream or a Kafka topic.
 This data generator is used by some of the examples in this repository.
 
-Requires `boto3`
+Requires `boto3` and `kafka-python-ng`.
+> `kafka-python-ng` is used instead of `kafka-python` because `kafka-python` has not been updated for Python 3.8 and above.
 
-### Usage
+### Installation Instructions
+Install the required dependencies with the following command:
+
+```bash
+python -m pip install -r requirements.txt
+```
+
+### Kinesis Usage
 
 ```
 python stock.py <stream_name>
 ```
 If no stream name is specified, `ExampleInputStream` will be used.
 
+### Kafka Usage
+
+```
+python stock_kafka.py <topic_name> <bootstrap_server>
+```
+
+If no topic name is specified, `StockInputTopic` will be used.
+If no bootstrap server is specified, `localhost:9092` will be used.
+
 ### Data example
 
 ```
python/data-generator/requirements.txt

Lines changed: 3 additions & 0 deletions

@@ -0,0 +1,3 @@
+boto3==1.35.90
+botocore==1.35.90
+kafka-python-ng==2.2.3

python/data-generator/stock_kafka.py

Lines changed: 46 additions & 0 deletions
@@ -0,0 +1,46 @@
+import json
+import sys
+import time
+
+from kafka import KafkaProducer
+from stock import get_data
+from traceback import print_exc
+
+DEFAULT_STREAM_NAME = "StockInputTopic"
+DEFAULT_BOOTSTRAP_SERVERS = ['localhost:9092']
+
+def generate_kafka_data(topic_name, bootstrap_servers):
+    try:
+        # Create producer instance
+        producer = KafkaProducer(
+            bootstrap_servers=bootstrap_servers,  # Replace with your Kafka broker(s)
+            value_serializer=lambda x: json.dumps(x).encode('utf-8')  # Serialize data to JSON
+        )
+        while True:
+            data = get_data()
+            future = producer.send(topic_name, data, key=data['ticker'].encode('utf-8'))
+            future.get(timeout=60)
+
+            print(f"{data} sent to {topic_name}")
+            time.sleep(1)
+
+    except Exception as e:
+        print_exc()
+        print(f"Error sending message: {e}")
+
+    finally:
+        # Close the producer
+        producer.close()
+
+
+if __name__ == '__main__':
+    topic_name = DEFAULT_STREAM_NAME
+    bootstrap_servers = DEFAULT_BOOTSTRAP_SERVERS
+    if len(sys.argv) == 2:
+        topic_name = sys.argv[1]
+    elif len(sys.argv) == 3:
+        topic_name = sys.argv[1]
+        bootstrap_servers = [sys.argv[2]]
+
+    print(f"Sending data to '{topic_name}'")
+    generate_kafka_data(topic_name, bootstrap_servers)
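
For example, `python stock_kafka.py StockInputTopic localhost:9092` sends one record per second to a local broker; both arguments are optional and fall back to the defaults defined at the top of the script.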
