Spring Cloud AWS & SQS: Streamline Your Development Workflow with Reliable Queuing | Learn Java Skills

Imran Shaikh

Building modern, scalable applications often requires effective communication across many components. However, typical synchronous communication can cause bottlenecks and reduce performance.


Asynchronous messaging enables programs to exchange messages without waiting for a response, resulting in greater responsiveness and robustness.


This blog article explains how to use Spring Cloud AWS and AWS SQS to establish strong asynchronous communication in your microservices architecture.


We'll walk you through integrating SQS with your Spring Boot application, sending and receiving messages, and dealing with any issues. By the end, you'll have the knowledge and skills you need to improve communication inside your apps and understand the benefits of an asynchronous approach.



The complete code for this project is available on my GitHub repository.


Amazon Simple Queue Service (SQS)


Amazon SQS offers queues for high-volume, system-to-system communications. Queues may be used to decouple heavyweight processes as well as buffer and batch operations.


Amazon SQS retains messages until microservices and serverless applications can process them.


Amazon SQS enables producers to put messages into a queue. The messages are then stored in an SQS queue. When consumers are ready to process new messages, they retrieve them from the queue.


Applications, microservices, and numerous AWS services can function as either producers or consumers.
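
The producer and consumer flow described above can be pictured with a small in-memory model. This is only a sketch under stated assumptions: ToyQueue and its methods are hypothetical names, not the SQS API, and time is passed in explicitly so the behavior is easy to follow. It shows a message staying in the queue until it is deleted, and becoming hidden for a visibility timeout after it is received (real SQS deletes by receipt handle rather than by body).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Toy in-memory model of a queue with a visibility timeout (not the SQS API).
class ToyQueue {
    // One stored message plus the time at which it becomes visible again.
    private static final class Entry {
        final String body;
        long visibleAtMillis; // 0 means visible immediately
        Entry(String body) { this.body = body; }
    }

    private final List<Entry> entries = new ArrayList<>();
    private final long visibilityTimeoutMillis;

    ToyQueue(long visibilityTimeoutMillis) {
        this.visibilityTimeoutMillis = visibilityTimeoutMillis;
    }

    // Producer side: the message is stored until a consumer deletes it.
    void send(String body) {
        entries.add(new Entry(body));
    }

    // Consumer side: return the first visible message and hide it for the timeout.
    Optional<String> receive(long nowMillis) {
        for (Entry e : entries) {
            if (e.visibleAtMillis <= nowMillis) {
                e.visibleAtMillis = nowMillis + visibilityTimeoutMillis;
                return Optional.of(e.body);
            }
        }
        return Optional.empty();
    }

    // Consumer side: deleting after successful processing removes the message for good.
    boolean delete(String body) {
        return entries.removeIf(e -> e.body.equals(body));
    }

    int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue(30_000);
        queue.send("order-1");
        System.out.println(queue.receive(0));      // Optional[order-1]
        System.out.println(queue.receive(10_000)); // Optional.empty (still hidden)
        System.out.println(queue.receive(30_000)); // Optional[order-1] (visible again)
        queue.delete("order-1");
        System.out.println(queue.size());          // 0
    }
}
```

If the consumer never deletes the message, it simply becomes visible again after the timeout, which is why a consumer that crashes mid-processing does not lose the message.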


Spring Cloud AWS


Spring Cloud for Amazon Web Services is an extension of the Spring Cloud umbrella project, which enables Spring Boot applications to easily communicate with Amazon Web Services.


Spring Cloud AWS provides a convenient way to access AWS services through well-known Spring idioms and APIs, such as the messaging or caching API.


Developers may design apps based on AWS hosted services without having to worry about infrastructure or maintenance.


Creating Your First Message Queue: A Step-by-Step Guide to Amazon SQS


Access the AWS Management Console:

Navigate to the AWS Management Console and sign in with your AWS credentials if you haven't already.


Open the Amazon SQS service:

Search for SQS in the search box or find it in the Messaging category of the service list.


Create a new queue:

Click on the Create queue button.


Choose an appropriate name for your queue. In my case, I named the queue my-first-queue. Amazon SQS offers two types of queues: Standard and FIFO. For this example, I'll be creating a standard queue.


You can leave the default settings for most use cases or customize them based on your specific needs (e.g., visibility timeout, message retention period).


Configure the access policy to determine who may send and receive messages from this queue. I created an IAM user, sqs-queue-read-write, to send and receive messages programmatically. You must provide the ARN of that IAM user here.


Click on Create queue to finalize the process.


Spring Cloud AWS and SQS: Essential Dependencies for Streamlining Your Messaging


spring-cloud-aws-starter-sqs: Provides messaging abstractions for sending and receiving messages.


 // spring cloud aws BOM(Bill of materials)
 implementation platform("io.awspring.cloud:spring-cloud-aws-dependencies:${springCloudAwsVersion}")
 // Amazon SQS 
 implementation 'io.awspring.cloud:spring-cloud-aws-starter-sqs'

Configuring Amazon SQS in Spring Cloud AWS


Securely Configuring AWS Credentials: Different Approaches

Spring Cloud AWS starter automatically configures a DefaultCredentialsProvider that checks for AWS credentials in the following order:


• Java system properties: aws.accessKeyId and aws.secretAccessKey.


• Environment variables: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.


• Web identity token credentials from system properties or environment variables.


• Credential profiles are stored in the default directory (~/.aws/credentials) used by all AWS SDKs and the AWS CLI.


• Credentials provided through the Amazon EC2 container service if the AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variable is defined and the security manager has access to the variable.


• Instance profile credentials are given via the Amazon EC2 metadata service.
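
The idea behind this ordered lookup is "the first source that yields credentials wins". The following is a minimal sketch of that idea using only the Java standard library. CredentialChainSketch, resolve, and resolveAccessKey are hypothetical names for illustration, not part of the AWS SDK, and only the first two sources from the list are modeled.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

// Simplified illustration of a "first source that yields a value wins" chain.
class CredentialChainSketch {

    // Tries each source in order and returns the first non-empty result.
    static Optional<String> resolve(List<Supplier<Optional<String>>> sources) {
        for (Supplier<Optional<String>> source : sources) {
            Optional<String> found = source.get();
            if (found.isPresent()) {
                return found;
            }
        }
        return Optional.empty();
    }

    // Models the first two links of the chain: system properties, then environment variables.
    // The maps are passed in explicitly so the lookup order is easy to test.
    static Optional<String> resolveAccessKey(Map<String, String> systemProps,
                                             Map<String, String> env) {
        List<Supplier<Optional<String>>> sources = List.of(
                () -> Optional.ofNullable(systemProps.get("aws.accessKeyId")),
                () -> Optional.ofNullable(env.get("AWS_ACCESS_KEY_ID")));
        return resolve(sources);
    }

    public static void main(String[] args) {
        // Only the environment variable is set, so the second source is used.
        System.out.println(resolveAccessKey(
                Map.of(), Map.of("AWS_ACCESS_KEY_ID", "from-env"))); // Optional[from-env]
    }
}
```

One practical consequence of the ordering: a value set as a Java system property shadows the same credential set as an environment variable.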


Configuring SQS Properties in application.properties

 # Amazon SQS configuration
spring.cloud.aws.credentials.access-key=${AWS_ACCESS_KEY_ID}
spring.cloud.aws.credentials.secret-key=${AWS_SECRET_ACCESS_KEY}
spring.cloud.aws.region.static=${AWS_REGION}
spring.cloud.aws.sqs.endpoint=${SQS_ENDPOINT}

Granting IAM User Access to an SQS Queue


Securely grant your IAM user the required permissions to interact with the SQS queue within your account.


 {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "sqs-read-and-write-policy",
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:GetQueueUrl",
                "sqs:ReceiveMessage",
                "sqs:SendMessage",
                "sqs:GetQueueAttributes"
            ],
            "Resource": "your sqs arn e.g: arn:aws:sqs:*:123456789012:*"
        }
    ]
 }

Sending a Message to an Amazon SQS Queue


Spring Cloud AWS offers the SqsTemplate for sending messages to Amazon SQS.


Using SqsTemplate for SQS Messaging

When using Spring Boot and autoconfiguration, a SqsTemplate instance is autowired by default if no other template bean is found in the context.


This template instance is backed by the autoconfigured SqsAsyncClient, with any configurations provided. SqsTemplate instances are immutable and thread-safe.


If more extensive setup is preferred, a builder is also provided along with a set of options.


 package in.learnjavaskills.sqsmessaging.configuration;

 import io.awspring.cloud.sqs.operations.SqsTemplate;
 import io.awspring.cloud.sqs.operations.TemplateAcknowledgementMode;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import software.amazon.awssdk.services.sqs.SqsAsyncClient;

 import java.time.Duration;

 @Configuration
 public class SqsTemplateConfiguration
 {
    @Bean
    public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient)
    {
        return SqsTemplate.builder()
                .sqsAsyncClient(sqsAsyncClient)
                .configure(options -> options
                        .acknowledgementMode(TemplateAcknowledgementMode.MANUAL)
                        .defaultQueue("my-first-queue")
                        .defaultPollTimeout(Duration.ofSeconds(5))
                        .defaultMaxNumberOfMessages(5)
                )
                .build();
    }
 }

Sending Messages with SqsTemplate

There are several methods for sending messages to SQS queues using the SqsTemplate. The SqsOperations interface provides the following methods, which have async equivalents in SqsAsyncOperations.


Method | Description
SendResult<T> send(String queue, T payload); | Send a message to the provided queue with the given payload
SendResult<T> send(T payload); | Send a message to the configured default endpoint
SendResult<T> send(String queue, Message<T> message); | Send the given Message to the provided queue
SendResult<T> send(Consumer<SqsSendOptions> to); | Send a message with the provided options
SendResult.Batch<T> sendMany(String queue, Collection<Message<T>> messages); | Send a batch of Messages to the provided queue

An example using the SqsSendOptions variant follows:


 SendResult<Object> sendResult = sqsTemplate.send(to -> to.queue("my-first-queue")
	.payload("Hello,  My first message to amazon SQS")
	.header("header1", "header-value1")
	.delaySeconds(2));

An example of sending a message using a Java class is:


 Employee employee = new Employee("name", "designation", "department");
 SendResult<Object> sendResult = sqsTemplate.send(to ->to.queue("my-first-queue")
	.payload(employee)
	.header("header1", "header-value1")
	.headers(Map.of("My-second-header", "my-second-header-value"))
	.delaySeconds(1));
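
The Employee class used above is not shown in this article. A minimal sketch might look like the following. The field names are an assumption inferred from the constructor arguments, and the no-argument constructor and setters are included because JSON libraries commonly rely on them when converting a payload back into an object. In a real project this would typically be a public class in its own file.

```java
// Hypothetical payload class; field names are inferred from the constructor call above.
class Employee {
    private String name;
    private String designation;
    private String department;

    // No-argument constructor so a JSON library can instantiate the class when deserializing.
    public Employee() {
    }

    public Employee(String name, String designation, String department) {
        this.name = name;
        this.designation = designation;
        this.department = department;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getDesignation() { return designation; }
    public void setDesignation(String designation) { this.designation = designation; }

    public String getDepartment() { return department; }
    public void setDepartment(String department) { this.department = department; }

    @Override
    public String toString() {
        return "Employee{name='" + name + "', designation='" + designation
                + "', department='" + department + "'}";
    }
}
```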

Understanding the SendResult of SqsTemplate.send

The SendResult record offers helpful information about the sending operation.


 public record SendResult<T>(UUID messageId, String endpoint, Message<T> message, Map<String, Object> additionalInformation) {
    public record Batch<T>(Collection<SendResult<T>> successful, Collection<SendResult.Failed<T>> failed) {}
    public record Failed<T> (String errorMessage, String endpoint, Message<T> message, Map<String, Object> additionalInformation) {}
 } 

When the send operation succeeds, the SendResult object is created with:


messageId: The messageId returned from SQS for the message
endpoint: The endpoint the message was sent to
message: The Message instance that was sent, with any additional headers that might have been added by the framework
additionalInformation: A map with the sequenceNumber generated for the message in FIFO queues

When the send operation for a single message fails, a MessagingOperationFailedException is thrown, which contains the message.


For Batch send operations, a SendResult.Batch object is returned. This object includes a collection of successful and failed results.


If any messages in a batch fail to be sent, the corresponding SendResult.Failed objects are created.
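
One way to act on a result that separates successful from failed entries is to retry only the failed ones. The sketch below illustrates that pattern with plain Java. BatchOutcome and BatchRetrySketch are hypothetical stand-ins rather than Spring Cloud AWS types, and the trySend predicate simulates a sender that reports success or failure per payload.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Stand-in for a batch result: payloads that were sent, and payloads that failed.
class BatchOutcome {
    final List<String> successful = new ArrayList<>();
    final List<String> failed = new ArrayList<>();
}

class BatchRetrySketch {

    // Sends every payload through trySend and sorts it into successful or failed.
    static BatchOutcome sendAll(List<String> payloads, Predicate<String> trySend) {
        BatchOutcome outcome = new BatchOutcome();
        for (String payload : payloads) {
            if (trySend.test(payload)) {
                outcome.successful.add(payload);
            } else {
                outcome.failed.add(payload);
            }
        }
        return outcome;
    }

    // Retries only the payloads that failed, for at most maxAttempts attempts in total.
    static BatchOutcome sendWithRetry(List<String> payloads, Predicate<String> trySend,
                                      int maxAttempts) {
        BatchOutcome total = new BatchOutcome();
        List<String> pending = payloads;
        for (int attempt = 1; attempt <= maxAttempts && !pending.isEmpty(); attempt++) {
            BatchOutcome outcome = sendAll(pending, trySend);
            total.successful.addAll(outcome.successful);
            pending = outcome.failed;
        }
        // Whatever is still pending after the last attempt is reported as failed.
        total.failed.addAll(pending);
        return total;
    }
}
```

Capping the number of attempts keeps a permanently failing message from being retried forever. What to do with the remaining failures (log, alert, park elsewhere) is left to the caller.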


Receiving Messages with SqsTemplate


The SqsTemplate provides convenient methods for receiving messages from Standard and FIFO SQS queues.


These methods are divided into two interfaces that SqsTemplate implements: SqsOperations and SqsAsyncOperations. If only sync or only async operations are needed, using the corresponding interface narrows down the available methods.


The SqsOperations interface provides the following methods, which have async equivalents in SqsAsyncOperations.


Method | Description
Optional<Message<?>> receive(); | Receive a message from the configured default endpoint and options
<T> Optional<Message<T>> receive(String queue, Class<T> payloadClass); | Receive a message from the provided queue and convert the payload to the provided class
Optional<Message<?>> receive(Consumer<SqsReceiveOptions> from); | Receive a message with the provided options
<T> Optional<Message<T>> receive(Consumer<SqsReceiveOptions> from, Class<T> payloadClass); | Receive a message with the provided options and convert the payload to the provided class
Collection<Message<?>> receiveMany(); | Receive a batch of messages from the configured default endpoint and options
<T> Collection<Message<T>> receiveMany(String queue, Class<T> payloadClass); | Receive a batch of messages from the provided queue and convert the payloads to the provided class
Collection<Message<?>> receiveMany(Consumer<SqsReceiveOptions> from); | Receive a batch of messages with the provided options
<T> Collection<Message<T>> receiveMany(Consumer<SqsReceiveOptions> from, Class<T> payloadClass); | Receive a batch of messages with the provided options and convert the payloads to the provided class

The following is an example for receiving a message with options:


 Optional<Message<Employee>> receivedMessage = sqsTemplate.receive(from -> from.queue("my-first-queue")
	.visibilityTimeout(Duration.ofSeconds(10))
	.pollTimeout(Duration.ofSeconds(10)), Employee.class); 

The SqsTemplate by default acknowledges all received messages, which can be changed by setting TemplateAcknowledgementMode.MANUAL in the template options:

 // sqsAsyncClient is the SqsAsyncClient bean, as in the configuration class shown earlier
 SqsTemplate.builder()
	.sqsAsyncClient(sqsAsyncClient)
	.configure(options ->
    	options.acknowledgementMode(TemplateAcknowledgementMode.MANUAL))
	.build();

Consuming Messages from SQS Queues using @SqsListener


The easiest approach to consume SQS messages is to use the @SqsListener annotation on a method in a @Component class.


The framework will then build the MessageListenerContainer and configure a MessagingMessageListenerAdapter to call the method whenever a message is received.


Defining Queue Names for @SqsListener Annotation (Spring Cloud AWS)

One or more queues can be provided in the annotation using either the queueNames or value attributes; there is no difference between the two.


Queue URLs can be used instead of queue names. Using URLs instead of queue names can result in faster startup times, since it saves the framework from looking up the queue URL when the containers start.


 @SqsListener({"my-first-queue", "${my.second.queue.url}"})
 public void listener(String message) {
    System.out.println("received message : "  + message);
 }

Any number of @SqsListener annotations can be used in a bean class, and each annotated method will be handled by a separate MessageListenerContainer.
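
To make the difference between a queue name and a queue URL concrete: an SQS queue URL has the form https://sqs.us-east-1.amazonaws.com/123456789012/my-first-queue, where the last path segment is the queue name. The sketch below tells the two apart and extracts the name. QueueIdentifierSketch is a hypothetical helper for illustration, not how Spring Cloud AWS implements this, and the account ID and region in the example are placeholders.

```java
import java.net.URI;

class QueueIdentifierSketch {

    // Treats anything with an http or https scheme as a queue URL, everything else as a name.
    static boolean isQueueUrl(String identifier) {
        try {
            String scheme = URI.create(identifier).getScheme();
            return "http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme);
        } catch (IllegalArgumentException e) {
            // Not parseable as a URI at all, so it cannot be a queue URL.
            return false;
        }
    }

    // Returns the queue name: the last path segment of a URL, or the identifier itself.
    static String queueName(String identifier) {
        if (!isQueueUrl(identifier)) {
            return identifier;
        }
        String path = URI.create(identifier).getPath();
        return path.substring(path.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) {
        String url = "https://sqs.us-east-1.amazonaws.com/123456789012/my-first-queue";
        System.out.println(isQueueUrl("my-first-queue")); // false
        System.out.println(isQueueUrl(url));              // true
        System.out.println(queueName(url));               // my-first-queue
    }
}
```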

Configuring Advanced Options with @SqsListener

The following properties can be supplied using the @SqsListener annotation. Such properties override the corresponding SqsContainerOptions for the generated MessageListenerContainer.


id: Specify the resulting container's id. This can be used to retrieve the container from the MessageListenerContainerRegistry, and it is also used by the container and its components for general logging and thread identification.
maxConcurrentMessages: Set the maximum number of messages that can be in flight at any given point in time.
pollTimeoutSeconds: Set the maximum amount of time to wait for a poll to return from SQS. Keep in mind that if there are messages available, the call may return before this time.
messageVisibilitySeconds: Set the minimum visibility for messages retrieved in a poll. Note that for FIFO single message listener methods, this visibility is applied to the whole batch before each message is delivered to the listener.
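
To give a feel for what a cap on in-flight messages means, here is a small standard-library sketch. It makes no claim about how Spring Cloud AWS enforces maxConcurrentMessages internally; it uses a Semaphore as one common way to apply this kind of limit. InFlightLimitSketch, the pool size of 8, and the 5 ms simulated processing time are all assumptions for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class InFlightLimitSketch {

    // Processes messageCount dummy messages while allowing at most maxConcurrent in flight.
    // Returns the highest number of messages that were being handled at the same time.
    static int run(int messageCount, int maxConcurrent) {
        Semaphore permits = new Semaphore(maxConcurrent);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService workers = Executors.newFixedThreadPool(8);

        for (int i = 0; i < messageCount; i++) {
            // The "poller" blocks here once the limit is reached.
            permits.acquireUninterruptibly();
            workers.submit(() -> {
                try {
                    int now = inFlight.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(5); // simulated processing time
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // Decrement before releasing so the counter never exceeds the permits held.
                    inFlight.decrementAndGet();
                    permits.release();
                }
            });
        }

        workers.shutdown();
        try {
            workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // The observed peak never exceeds the configured limit of 3.
        System.out.println("peak in flight: " + run(20, 3));
    }
}
```

A lower limit protects downstream resources such as database connections, while a higher one increases throughput when the handler mostly waits on I/O.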

Here's an example of @SqsListener annotation properties:


 @SqsListener(queueNames = "my-first-queue", id = "my-first-queue-id",
            maxConcurrentMessages = "10", pollTimeoutSeconds = "2",
            messageVisibilitySeconds = "2")
 public void listener(Message<Employee> message) {
   System.out.println("received message : "  + message.getPayload());
 }

Understanding Listener Method Arguments in @SqsListener

The listener method signature allows a variety of argument types, which are as follows:


Listener Method Argument | Description
MyPojo | POJO types are automatically deserialized from JSON.
Message<MyPojo> | Provides a Message<MyPojo> instance with the deserialized payload and MessageHeaders.
List<MyPojo> | Enables batch mode and receives the batch that was polled from SQS.
List<Message<MyPojo>> | Enables batch mode and receives the batch that was polled from SQS along with the headers.
@Header(String headerName) | Provides the specified header.
@Headers | Provides the MessageHeaders or a Map<String, Object>.
Acknowledgement | Provides methods for manually acknowledging messages for single message listeners. AcknowledgementMode must be set to MANUAL.
BatchAcknowledgement | Provides methods for manually acknowledging partial or whole message batches for batch listeners. AcknowledgementMode must be set to MANUAL.
Visibility | Provides the changeTo() method that enables changing the message's visibility to the provided value.
QueueAttributes | Provides queue attributes for the queue that received the message.
software.amazon.awssdk.services.sqs.model.Message | Provides the original Message from SQS.

Here's an example with multiple arguments:


 @SqsListener("my-first-queue")
 public void listener(Message<Employee> message, MessageHeaders messageHeaders,
		 Visibility visibility, QueueAttributes queueAttributes, Acknowledgement acknowledgement,
		 software.amazon.awssdk.services.sqs.model.Message originalMessage) {
         
	System.out.println("received message : "  + message.getPayload());
	System.out.println("message header : " + messageHeaders.toString());

	acknowledgement.acknowledge();
	visibility.changeTo(10);

	String queueUrl = queueAttributes.getQueueUrl();
	System.out.println("queue url : " + queueUrl);

	System.out.println("original message : " + originalMessage.toString());
 }

Batch listeners support a single List<MyPojo> or List<Message<MyPojo>> method argument, and an optional BatchAcknowledgement or AsyncBatchAcknowledgement argument. MessageHeaders should be extracted from the Message instances through the getHeaders() method.

Handling Failed Messages: Configuring a Dead-Letter Queue for Amazon SQS


Let's delve deeper into robust message handling. The Dead-Letter Queue for Amazon SQS post will guide you through configuring a Dead-Letter Queue (DLQ) for your SQS queues, ensuring that failed messages are properly captured and managed for troubleshooting and recovery.


Conclusion: Leveraging SQS with Spring Cloud AWS


In this blog post, we looked at how Amazon SQS and Spring Cloud AWS can work together to provide robust message queuing in your application. We looked into:


Creating an SQS Queue in the AWS Console: To establish the foundation, we demonstrated how to create your SQS queue.


Sending and Receiving Messages with SqsTemplate: We showed how to effectively send and retrieve messages using the Spring Cloud AWS SqsTemplate class.


Using the @SqsListener annotation: We looked into the @SqsListener annotation, which provides a convenient way to consume messages asynchronously from an SQS queue.


By combining these principles, you provide your Spring Cloud AWS application with the capability to:


Decouple services: Create loose coupling between services by delegating communication to an asynchronous message queue.


Increase scalability and fault tolerance: SQS makes it easier to absorb traffic surges, and because messages stay in the queue until they are processed, a service that fails can pick up where it left off once it recovers.


Improve message delivery reliability: Standard SQS queues provide at-least-once delivery, so messages are retained until a consumer processes them, though a message may occasionally be delivered more than once.


By incorporating SQS into your Spring Cloud AWS environment, you acquire a powerful tool for creating strong, scalable, and resilient messaging-driven systems. We urge you to experiment and discover how SQS might help your apps communicate and manage messages more efficiently.


Keep learning and keep growing.
