4
4
package com .example .sns ;
5
5
6
6
// snippet-start:[sns.java2.PriceUpdateExample.import]
7
+
7
8
import software .amazon .awssdk .policybuilder .iam .IamConditionOperator ;
8
9
import software .amazon .awssdk .policybuilder .iam .IamEffect ;
9
10
import software .amazon .awssdk .policybuilder .iam .IamPolicy ;
@@ -39,14 +40,14 @@ public class PriceUpdateExample {
39
40
public static void main (String [] args ) {
40
41
41
42
final String usage = "\n " +
42
- "Usage: " +
43
- " <topicName> <wholesaleQueueFifoName> <retailQueueFifoName> <analyticsQueueName>\n \n " +
44
- "Where:\n " +
45
- " fifoTopicName - The name of the FIFO topic that you want to create. \n \n " +
46
- " wholesaleQueueARN - The name of a SQS FIFO queue that will be created for the wholesale consumer. \n \n "
47
- +
48
- " retailQueueARN - The name of a SQS FIFO queue that will created for the retail consumer. \n \n " +
49
- " analyticsQueueARN - The name of a SQS standard queue that will be created for the analytics consumer. \n \n " ;
43
+ "Usage: " +
44
+ " <topicName> <wholesaleQueueFifoName> <retailQueueFifoName> <analyticsQueueName>\n \n " +
45
+ "Where:\n " +
46
+ " fifoTopicName - The name of the FIFO topic that you want to create. \n \n " +
47
+ " wholesaleQueueARN - The name of a SQS FIFO queue that will be created for the wholesale consumer. \n \n "
48
+ +
49
+ " retailQueueARN - The name of a SQS FIFO queue that will created for the retail consumer. \n \n " +
50
+ " analyticsQueueARN - The name of a SQS standard queue that will be created for the analytics consumer. \n \n " ;
50
51
if (args .length != 4 ) {
51
52
System .out .println (usage );
52
53
System .exit (1 );
@@ -60,9 +61,9 @@ public static void main(String[] args) {
60
61
// For convenience, the QueueData class holds metadata about a queue: ARN, URL,
61
62
// name and type.
62
63
List <QueueData > queues = List .of (
63
- new QueueData (wholeSaleQueueName , QueueType .FIFO ),
64
- new QueueData (retailQueueName , QueueType .FIFO ),
65
- new QueueData (analyticsQueueName , QueueType .Standard ));
64
+ new QueueData (wholeSaleQueueName , QueueType .FIFO ),
65
+ new QueueData (retailQueueName , QueueType .FIFO ),
66
+ new QueueData (analyticsQueueName , QueueType .Standard ));
66
67
67
68
// Create queues.
68
69
createQueues (queues );
@@ -95,9 +96,9 @@ public static String createFIFOTopic(String topicName) {
95
96
"FifoThroughputScope" , "MessageGroup" );
96
97
97
98
CreateTopicRequest topicRequest = CreateTopicRequest .builder ()
98
- .name (topicName )
99
- .attributes (topicAttributes )
100
- .build ();
99
+ .name (topicName )
100
+ .attributes (topicAttributes )
101
+ .build ();
101
102
102
103
CreateTopicResponse response = snsClient .createTopic (topicRequest );
103
104
String topicArn = response .topicArn ();
@@ -115,10 +116,10 @@ public static String createFIFOTopic(String topicName) {
115
116
public static void subscribeQueues (List <QueueData > queues , String topicARN ) {
116
117
queues .forEach (queue -> {
117
118
SubscribeRequest subscribeRequest = SubscribeRequest .builder ()
118
- .topicArn (topicARN )
119
- .endpoint (queue .queueARN )
120
- .protocol ("sqs" )
121
- .build ();
119
+ .topicArn (topicARN )
120
+ .endpoint (queue .queueARN )
121
+ .protocol ("sqs" )
122
+ .build ();
122
123
123
124
// Subscribe to the endpoint by using the SNS service client.
124
125
// Only Amazon SQS queues can receive notifications from an Amazon SNS FIFO
@@ -139,20 +140,20 @@ public static void publishPriceUpdate(String topicArn, String payload, String gr
139
140
String attributeValue = "wholesale" ;
140
141
141
142
MessageAttributeValue msgAttValue = MessageAttributeValue .builder ()
142
- .dataType ("String" )
143
- .stringValue (attributeValue )
144
- .build ();
143
+ .dataType ("String" )
144
+ .stringValue (attributeValue )
145
+ .build ();
145
146
146
147
Map <String , MessageAttributeValue > attributes = new HashMap <>();
147
148
attributes .put (attributeName , msgAttValue );
148
149
PublishRequest pubRequest = PublishRequest .builder ()
149
- .topicArn (topicArn )
150
- .subject (subject )
151
- .message (payload )
152
- .messageGroupId (groupId )
153
- .messageDeduplicationId (dedupId )
154
- .messageAttributes (attributes )
155
- .build ();
150
+ .topicArn (topicArn )
151
+ .subject (subject )
152
+ .message (payload )
153
+ .messageGroupId (groupId )
154
+ .messageDeduplicationId (dedupId )
155
+ .messageAttributes (attributes )
156
+ .build ();
156
157
157
158
final PublishResponse response = snsClient .publish (pubRequest );
158
159
System .out .println (response .messageId ());
@@ -174,17 +175,17 @@ public static void createQueues(List<QueueData> queueData) {
174
175
CreateQueueResponse response ;
175
176
if (isFifoQueue ) {
176
177
response = sqsClient .createQueue (r -> r
177
- .queueName (queue .queueName )
178
- .attributes (Map .of (
179
- QueueAttributeName .FIFO_QUEUE , "true" )));
178
+ .queueName (queue .queueName )
179
+ .attributes (Map .of (
180
+ QueueAttributeName .FIFO_QUEUE , "true" )));
180
181
} else {
181
182
response = sqsClient .createQueue (r -> r
182
- .queueName (queue .queueName ));
183
+ .queueName (queue .queueName ));
183
184
}
184
185
queue .queueURL = response .queueUrl ();
185
186
queue .queueARN = sqsClient .getQueueAttributes (b -> b
186
- .queueUrl (queue .queueURL )
187
- .attributeNames (QueueAttributeName .QUEUE_ARN )).attributes ().get (QueueAttributeName .QUEUE_ARN );
187
+ .queueUrl (queue .queueURL )
188
+ .attributeNames (QueueAttributeName .QUEUE_ARN )).attributes ().get (QueueAttributeName .QUEUE_ARN );
188
189
});
189
190
}
190
191
@@ -195,25 +196,25 @@ public static void addAccessPolicyToQueuesFINAL(List<QueueData> queues, String t
195
196
}
196
197
queues .forEach (queue -> {
197
198
IamPolicy policy = IamPolicy .builder ()
198
- .addStatement (b -> b // Allow account user to send messages to the queue.
199
- .effect (IamEffect .ALLOW )
200
- .addPrincipal (IamPrincipalType .AWS , account )
201
- .addAction ("SQS:*" )
202
- .addResource (queue .queueARN ))
203
- .addStatement (b -> b // Allow the SNS FIFO topic to send messages to the queue.
204
- .effect (IamEffect .ALLOW )
205
- .addPrincipal (IamPrincipalType .AWS , "*" )
206
- .addAction ("SQS:SendMessage" )
207
- .addResource (queue .queueARN )
208
- .addCondition (b1 -> b1
209
- .operator (IamConditionOperator .ARN_LIKE )
210
- .key ("aws:SourceArn" ).value (topicARN )))
211
- .build ();
199
+ .addStatement (b -> b // Allow account user to send messages to the queue.
200
+ .effect (IamEffect .ALLOW )
201
+ .addPrincipal (IamPrincipalType .AWS , account )
202
+ .addAction ("SQS:*" )
203
+ .addResource (queue .queueARN ))
204
+ .addStatement (b -> b // Allow the SNS FIFO topic to send messages to the queue.
205
+ .effect (IamEffect .ALLOW )
206
+ .addPrincipal (IamPrincipalType .AWS , "*" )
207
+ .addAction ("SQS:SendMessage" )
208
+ .addResource (queue .queueARN )
209
+ .addCondition (b1 -> b1
210
+ .operator (IamConditionOperator .ARN_LIKE )
211
+ .key ("aws:SourceArn" ).value (topicARN )))
212
+ .build ();
212
213
sqsClient .setQueueAttributes (b -> b
213
- .queueUrl (queue .queueURL )
214
- .attributes (Map .of (
215
- QueueAttributeName .POLICY ,
216
- policy .toJson ())));
214
+ .queueUrl (queue .queueURL )
215
+ .attributes (Map .of (
216
+ QueueAttributeName .POLICY ,
217
+ policy .toJson ())));
217
218
});
218
219
}
219
220
0 commit comments