Skip to content

Commit 0087883

Browse files
authored
Example on how to use Kafka config providers in Flink Kafka connectors, using Table API & SQL(Java), with mTLS authentication. (#72)
Authored-by: “Ravtej” <“sravtej”@amazon.co.uk”>
1 parent 336a970 commit 0087883

File tree

18 files changed

+524
-40
lines changed

18 files changed

+524
-40
lines changed

java/KafkaConfigProviders/Kafka-SASL_SSL-ConfigProviders/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ is controlled via networking (SecurityGroups, NACL) and by the SASL credentials
162162

163163
When running on Amazon Managed Service for Apache Flink the runtime configuration is read from *Runtime Properties*.
164164

165-
When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](resources/flink-application-properties-dev.json) file located in the resources folder.
165+
When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](src/main/resources/flink-application-properties-dev.json) file located in the resources folder.
166166

167167
Runtime parameters:
168168

@@ -186,4 +186,4 @@ All parameters are case-sensitive.
186186
187187
You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.
188188

189-
See [Running examples locally](../running-examples-locally.md) for details.
189+
See [Running examples locally](../../running-examples-locally.md) for details.
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
!*.jar
1+
local-repo/**/*.jar

java/KafkaConfigProviders/Kafka-mTLS-Keystore-ConfigProviders/README.md

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
## Sample illustrating how to use MSK config providers in Flink Kafka connectors, for mTLS authentication
22

3-
* Flink version: 1.19
3+
* Flink version: 1.20
44
* Flink API: DataStream API
55
* Language: Java (11)
66
* Flink connectors: Kafka (mTLS authentication)
77

88
This sample illustrates how to configure the Flink Kafka connectors (KafkaSource and KafkaSink)
9-
retrieving custom KeyStore and TrustStore at runtime, using Config Providers.
9+
retrieving custom KeyStore at runtime, using Config Providers.
1010
More details on the MSK Config Providers in [this repo](https://github.com/aws-samples/msk-config-providers).
1111

12-
* KeyStore and TrustStore are fetched from S3, when the job starts.
13-
* The passwords to open both KeyStore and TrustStore are also fetched when the job starts, from AWS Secret Manager.
12+
* KeyStore is fetched from S3, when the job starts.
13+
* The password to open KeyStore is also fetched when the job starts, from AWS Secret Manager.
1414
* No secret is packaged with the application.
1515

1616
### High level approach
@@ -45,16 +45,13 @@ builder.setProperty("config.providers.s3import.class", "com.amazonaws.kafka.conf
4545
String region = appProperties.get("S3BucketRegion");
4646
String keystoreS3Bucket = appProperties.get("KeystoreS3Bucket");
4747
String keystoreS3Path = appProperties.get("KeystoreS3Path");
48-
String truststoreS3Bucket = appProperties.get("TruststoreS3Bucket");
49-
String truststoreS3Path = appProperties.get("TruststoreS3Path");
5048
String keystorePassSecret = appProperties.get("KeystorePassSecret");
5149
String keystorePassSecretField = appProperties.get("KeystorePassSecretField");
5250

5351
// region, etc..
5452
builder.setProperty("config.providers.s3import.param.region", region);
5553

5654
// properties
57-
builder.setProperty("ssl.truststore.location", "${s3import:" + region + ":" + truststoreS3Bucket + "/" + truststoreS3Path + "}");
5855
builder.setProperty("ssl.keystore.type", "PKCS12");
5956
builder.setProperty("ssl.keystore.location", "${s3import:" + region + ":" + keystoreS3Bucket + "/" + keystoreS3Path + "}");
6057
builder.setProperty("ssl.keystore.password", "${secretsmanager:" + keystorePassSecret + ":" + keystorePassSecretField + "}");
@@ -73,7 +70,7 @@ Access Policy/Role associated with the application that is running a config prov
7370

7471
When running on Amazon Managed Service for Apache Flink the runtime configuration is read from *Runtime Properties*.
7572

76-
When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](resources/flink-application-properties-dev.json) file located in the resources folder.
73+
When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](src/main/resources/flink-application-properties-dev.json) file located in the resources folder.
7774

7875
Runtime parameters:
7976

@@ -82,11 +79,9 @@ Runtime parameters:
8279
| `Input0` | `bootstrap.servers` | | kafka cluster boostrap servers |
8380
| `Input0` | `topic` | `source` | source topic name |
8481
| `Input0` | `group.id` | `flink-app` | kafka consumer group id |
85-
| `Input0` | `bucket.region` | | region of the S3 bucket(s) containing the keystore and truststore |
82+
| `Input0` | `bucket.region` | | region of the S3 bucket(s) containing the keystore |
8683
| `Input0` | `keystore.bucket` | | name of the S3 bucket containing the keystore |
8784
| `Input0` | `keystore.path` | | path to the keystore object, omitting any trailing `/` |
88-
| `Input0` | `truststore.bucket` | | name of the S3 bucket containing the truststore |
89-
| `Input0` | `truststore.path` | | path to the truststore object, omitting any trailing `/` |
9085
| `Input0` | `keystore.secret` | | SecretManager secret ID containing the password of the keystore |
9186
| `Input0` | `keystore.secret.field` | | SecretManager secret key containing the password of the keystore |
9287

@@ -99,4 +94,4 @@ All parameters are case-sensitive.
9994
10095
You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.
10196

102-
See [Running examples locally](../running-examples-locally.md) for details.
97+
See [Running examples locally](../../running-examples-locally.md) for details.

java/KafkaConfigProviders/Kafka-mTLS-Keystore-ConfigProviders/docs/step-by-step.md

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ Also add a self-reference inbound rule to the Security Group for port 9094, that
2222

2323
### 2. Setup client bastion
2424

25-
We can use the AWS Cloud9 environment and `KafkaClientEC2Instance` setup as described in the [MSK Labs Workshop](https://catalog.workshops.aws/msk-labs/en-US/overview/setup) for testing.
25+
We can setup the Kafka client EC2 instance using the steps described in the [MSK Labs Workshop](https://catalog.workshops.aws/msk-labs/en-US/overview/prerequisites) for testing.
2626

27-
SSH into `KafkaClientEC2Instance` from Cloud9 terminal.
27+
Connect to the Kafka client EC2 instance via SSM.
2828

2929
Export the brokers and zookeeper connection strings for the Amazon MSK cluster-
3030
```
@@ -109,13 +109,12 @@ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=$zkeeper --add --all
109109

110110
### 6. Setup Amazon S3 Bucket to be used by Amazon Managed Service for Apache Flink
111111

112-
Create an S3 bucket in the required AWS region e.g. us-east-1. We will use this bucket e.g. `kafkaclientstore` to get the keystore/truststore and the Flink application jar in subsequent steps.
112+
Create an S3 bucket in the required AWS region e.g. us-east-1. We will use this bucket e.g. `kafkaclientstore` to get the keystore and Flink application jar in subsequent steps.
113113

114-
Now copy the keystore and truststore from `/tmp` to this S3 bucket
114+
Now copy the keystore from `/tmp` to this S3 bucket
115115
```
116116
cd /tmp
117117
aws s3 cp kafka.client.keystore.jks s3://kafkaclientstore
118-
aws s3 cp kafka.client.truststore.jks s3://kafkaclientstore
119118
aws s3 cp private_key.pem s3://kafkaclientstore
120119
aws s3 cp client_cert.pem s3://kafkaclientstore
121120
```
@@ -165,11 +164,9 @@ Under `Runtime properties`, add the following properties:
165164
| `Input0` | `bootstrap.servers` | | kafka cluster boostrap servers |
166165
| `Input0` | `topic` | `source` | source topic name |
167166
| `Input0` | `group.id` | `flink-app` | kafka consumer group id |
168-
| `Input0` | `bucket.region` | | region of the S3 bucket(s) containing the keystore and truststore |
167+
| `Input0` | `bucket.region` | | region of the S3 bucket(s) containing the keystore |
169168
| `Input0` | `keystore.bucket` | | name of the S3 bucket containing the keystore |
170169
| `Input0` | `keystore.path` | | path to the keystore object, omitting any trailing `/` |
171-
| `Input0` | `truststore.bucket` | | name of the S3 bucket containing the truststore |
172-
| `Input0` | `truststore.path` | | path to the truststore object, omitting any trailing `/` |
173170
| `Input0` | `keystore.secret` | | SecretManager secret ID containing the password of the keystore |
174171
| `Input0` | `keystore.secret.field` | | SecretManager secret key containing the password of the keystore |
175172

java/KafkaConfigProviders/Kafka-mTLS-Keystore-ConfigProviders/docs/troubleshoot-guide.md

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@ Few important points to consider that will help troubleshoot or prevent any pote
1414

1515
2. While configuring Managed Apache Flink application, please ensure that the VPC connectivity, Subnets and Security Groups (under ‘Networking’ section) are correctly selected and are allowing access to the required resources, e.g. Kafka cluster and brokers. Depending on the setup e.g. for mTLS you may need to add a self-reference inbound rule to the security group for port 9094. Also, check if there are any Kafka ACLs set on the respective topic(s) for authorization and if the required operation(s) have the `ALLOW` permissionType.
1616

17-
3. While configuring the Runtime properties for the Apache Flink application, please ensure that the values for `keystore.bucket` and `truststore.bucket` do not contain the prefix `s3://` . This is different from `Application code location` section where the specified Amazon S3 bucket needs to have the format `s3://bucket`.Also, the path to S3 object(s) e.g. `keystore.path` doesn't need a trailing slash. For e.g., if the keystore `kafka.client.keystore.jks` is stored within the S3 bucket `my-bucket`, then the runtime properties can be as follows:
18-
19-
![Runtime Properties](../images/flink-keystore-s3-example.png)
17+
3. While configuring the Runtime properties for the Apache Flink application, please ensure that the value for `keystore.bucket` does not contain the prefix `s3://` . This is different from `Application code location` section where the specified Amazon S3 bucket needs to have the format `s3://bucket`.Also, the path to S3 object(s) e.g. `keystore.path` doesn't need a trailing slash.
2018

2119
4. When running the Apache Flink application, if you are getting a `SecretsManagerException` with Status Code 400 (e.g. not authorized to perform: `secretsmanager:GetSecretValue` on resource: SSL_KEYSTORE_PASS because no identity-based policy allows the `secretsmanager:GetSecretValue` action), please make sure that the IAM Role for the application has the necessary permission policy for SecretsManager.

java/KafkaConfigProviders/Kafka-mTLS-Keystore-ConfigProviders/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
<target.java.version>11</target.java.version>
1717
<maven.compiler.source>${target.java.version}</maven.compiler.source>
1818
<maven.compiler.target>${target.java.version}</maven.compiler.target>
19-
<flink.version>1.19.1</flink.version>
19+
<flink.version>1.20.0</flink.version>
2020
<kafka.connector.version>3.2.0-1.19</kafka.connector.version>
2121
<kda.runtime.version>1.2.0</kda.runtime.version>
2222
<jackson.databind.version>2.13.4.2</jackson.databind.version>

java/KafkaConfigProviders/Kafka-mTLS-Keystore-ConfigProviders/src/main/java/com/amazonaws/services/msf/StreamingJob.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,14 @@ public class StreamingJob {
2626
private static final String DEFAULT_SOURCE_TOPIC = "source";
2727
private static final String DEFAULT_CONSUMER_GROUP = "flink-app";
2828

29-
public static final String KAFKA_SOURCE_TOPIC_KEY = "topic";
30-
public static final String MSKBOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
31-
public static final String KAFKA_CONSUMER_GROUP_ID_KEY = "group.id";
32-
public static final String S3_BUCKET_REGION_KEY = "bucket.region";
33-
public static final String KEYSTORE_S3_BUCKET_KEY = "keystore.bucket";
34-
public static final String KEYSTORE_S3_PATH_KEY = "keystore.path";
35-
public static final String TRUSTSTORE_S3_BUCKET_KEY = "truststore.bucket";
36-
public static final String TRUSTSTORE_S3_PATH_KEY = "truststore.path";
37-
public static final String KEYSTORE_PASS_SECRET_KEY = "keystore.secret";
38-
public static final String KEYSTORE_PASS_SECRET_FIELD_KEY = "keystore.secret.field";
29+
private static final String KAFKA_SOURCE_TOPIC_KEY = "topic";
30+
private static final String MSKBOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
31+
private static final String KAFKA_CONSUMER_GROUP_ID_KEY = "group.id";
32+
private static final String S3_BUCKET_REGION_KEY = "bucket.region";
33+
private static final String KEYSTORE_S3_BUCKET_KEY = "keystore.bucket";
34+
private static final String KEYSTORE_S3_PATH_KEY = "keystore.path";
35+
private static final String KEYSTORE_PASS_SECRET_KEY = "keystore.secret";
36+
private static final String KEYSTORE_PASS_SECRET_FIELD_KEY = "keystore.secret.field";
3937

4038
private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json";
4139

@@ -90,16 +88,13 @@ private static void configureConnectorPropsWithConfigProviders(KafkaSourceBuilde
9088
String region = appProperties.getProperty(S3_BUCKET_REGION_KEY);
9189
String keystoreS3Bucket = appProperties.getProperty(KEYSTORE_S3_BUCKET_KEY);
9290
String keystoreS3Path = appProperties.getProperty(KEYSTORE_S3_PATH_KEY);
93-
String truststoreS3Bucket = appProperties.getProperty(TRUSTSTORE_S3_BUCKET_KEY);
94-
String truststoreS3Path = appProperties.getProperty(TRUSTSTORE_S3_PATH_KEY);
9591
String keystorePassSecret = appProperties.getProperty(KEYSTORE_PASS_SECRET_KEY);
9692
String keystorePassSecretField = appProperties.getProperty(KEYSTORE_PASS_SECRET_FIELD_KEY);
9793

9894
// region, etc..
9995
builder.setProperty("config.providers.s3import.param.region", region);
10096

10197
// properties
102-
builder.setProperty("ssl.truststore.location", "${s3import:" + region + ":" + truststoreS3Bucket + "/" + truststoreS3Path + "}");
10398
builder.setProperty("ssl.keystore.type", "PKCS12");
10499
builder.setProperty("ssl.keystore.location", "${s3import:" + region + ":" + keystoreS3Bucket + "/" + keystoreS3Path + "}");
105100
builder.setProperty("ssl.keystore.password", "${secretsmanager:" + keystorePassSecret + ":" + keystorePassSecretField + "}");
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
local-repo/**/*.jar

0 commit comments

Comments
 (0)