= SQS Integration
:source-highlighter: highlight.js
:highlightjs-theme: googlecode

Amazon `Simple Queue Service` is a messaging service that provides point-to-point communication with queues.
Spring Cloud AWS SQS integration offers support to receive and send messages using common `Spring` abstractions such as `@SqsListener`, `MessageListenerContainer` and `MessageListenerContainerFactory`.

Compared to JMS or other message services, Amazon SQS has limitations that should be taken into consideration.

* Amazon SQS allows only `String` payloads, so any `Object` must be transformed into a String representation.
Spring Cloud AWS has dedicated support to transfer Java objects with Amazon SQS messages by converting them to JSON.
* Amazon SQS has a maximum message size of 256 KB per message, so bigger messages will fail to be sent.

A Spring Boot starter is provided to auto-configure SQS integration beans.
Maven coordinates, using <>:

[source,xml]
----
<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>
----

== Sample Listener Application

Below is a minimal sample application leveraging auto-configuration from `Spring Boot`.

[source,java]
----
@SpringBootApplication
public class SqsApplication {

    public static void main(String[] args) {
        SpringApplication.run(SqsApplication.class, args);
    }

    @SqsListener("myQueue")
    public void listen(String message) {
        System.out.println(message);
    }
}
----

Without Spring Boot, it's necessary to import the `SqsBootstrapConfiguration` class in a `@Configuration`, as well as declare a `SqsMessageListenerContainerFactory` bean.
[source, java]
----
public class Listener {

    @SqsListener("myQueue")
    public void listen(String message) {
        System.out.println(message);
    }
}

@Import(SqsBootstrapConfiguration.class)
@Configuration
public class SQSConfiguration {

    @Bean
    public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
        return SqsMessageListenerContainerFactory
            .builder()
            .sqsAsyncClient(sqsAsyncClient())
            .build();
    }

    @Bean
    public SqsAsyncClient sqsAsyncClient() {
        return SqsAsyncClient.builder().build();
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}
----

== Sending Messages

`Spring Cloud AWS` provides the `SqsTemplate` to send messages to `SQS`.

[[sqs-template-send]]
=== SqsTemplate

When using `Spring Boot` and autoconfiguration, a `SqsTemplate` instance is autowired by default in case no other template bean is found in the context.
This template instance is backed by the autoconfigured `SqsAsyncClient`, with any configurations provided.
`SqsTemplate` instances are immutable and thread-safe.

NOTE: The endpoint to which the message will be sent can be either the queue name or URL.

=== Creating a SqsTemplate Instance

`SqsTemplate` implements two `Operations` interfaces: `SqsOperations` contains blocking methods, and `SqsAsyncOperations` contains async methods that return `CompletableFuture` instances.
If only sync or only async operations are needed, the corresponding interface can be used to hide the unneeded methods.

The following methods can be used to create new `SqsTemplate` instances with default options:

```java
SqsTemplate template = SqsTemplate.newTemplate(sqsAsyncClient);
SqsOperations blockingTemplate = SqsTemplate.newSyncTemplate(sqsAsyncClient);
SqsAsyncOperations asyncTemplate = SqsTemplate.newAsyncTemplate(sqsAsyncClient);
```

NOTE: The returned object is always the `SqsTemplate`, and the separate methods are only for convenience of the interface return type.
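The note above can be pictured with a small plain-Java model. This is only a sketch under stated assumptions: `SyncOps`, `AsyncOps` and `DemoTemplate` are hypothetical stand-ins and not the library's classes. It shows one implementation exposed through narrower interface types by its factory methods.

```java
import java.util.concurrent.CompletableFuture;

public class TemplateViewsSketch {

    /** Hypothetical stand-in for a blocking operations interface. */
    public interface SyncOps {
        String send(String queue, String payload);
    }

    /** Hypothetical stand-in for an async operations interface. */
    public interface AsyncOps {
        CompletableFuture<String> sendAsync(String queue, String payload);
    }

    /** One stateless implementation backs both views. */
    public static final class DemoTemplate implements SyncOps, AsyncOps {

        @Override
        public CompletableFuture<String> sendAsync(String queue, String payload) {
            // Pretend the message was sent and echo what went where.
            return CompletableFuture.completedFuture(queue + ":" + payload);
        }

        @Override
        public String send(String queue, String payload) {
            // The blocking method simply waits on the async one.
            return sendAsync(queue, payload).join();
        }

        public static DemoTemplate newTemplate() {
            return new DemoTemplate();
        }

        // Same object, narrower declared type.
        public static SyncOps newSyncTemplate() {
            return newTemplate();
        }

        public static AsyncOps newAsyncTemplate() {
            return newTemplate();
        }
    }

    public static void main(String[] args) {
        SyncOps sync = DemoTemplate.newSyncTemplate();
        System.out.println(sync instanceof DemoTemplate);
        System.out.println(sync.send("myQueue", "hello"));
    }
}
```

Declaring a field as the narrower type keeps the unused half of the API out of reach at compile time, while the runtime object stays the same.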
In case more complex configuration is required, a builder is also provided, along with a set of options:

```java
SqsTemplate template = SqsTemplate.builder()
    .sqsAsyncClient(this.asyncClient)
    .configure(options -> options
        .acknowledgementMode(TemplateAcknowledgementMode.MANUAL)
        .defaultEndpointName("my-queue"))
    .build();
```

The builder also offers the `buildSyncTemplate()` method to return the template as `SqsOperations`, and `buildAsyncTemplate()` to return it as `SqsAsyncOperations`.

=== Template Options

The following options can be configured through the `options` object.
The defaults are applied in case no other value is provided as a parameter in the operation method.

==== TemplateOptions Descriptions

[cols="12,5,5,16", options="header"]
|===
| Name | Type | Default | Description

|`acknowledgementMode`
|TemplateAcknowledgementMode
|TemplateAcknowledgementMode #ACKNOWLEDGE_ON_RECEIVE
|Whether messages should be acknowledged by the template after being received.
Messages can be acknowledged later by using the `Acknowledgement#acknowledge` methods.
See <> for more information.

|`sendBatchFailureHandlingStrategy`
|SendBatchFailureStrategy
|SendBatchFailureStrategy #THROW
|Whether a `SendBatchOperationFailedException` containing a `SendResult.Batch` instance should be thrown if at least one message from a sent batch fails to be delivered.
With SendBatchFailureStrategy#DO_NOT_THROW, the `SendResult.Batch` object is returned.

|`defaultPollTimeout`
|Duration
|10 seconds
|The default maximum time to wait for messages when performing a receive request to SQS.
See <> for more information.

|`defaultMaxNumberOfMessages`
|Integer
|10
|The default maximum number of messages to be returned by a receive request to SQS.
See <> for more information.

|`defaultEndpointName`
|String
|blank
|The default endpoint name to be used by the template.
See <> for more information.

|`defaultPayloadClass`
|Class
|null
|The default class to which payloads should be converted.
Note that messages sent with the `SqsTemplate` by default contain a header with the type information, so no configuration is needed.
See <> for more information.

|`additionalHeaderForReceive`
|String, Object
|empty
|Set a single header to be added to all messages received by this template.

|`additionalHeadersForReceive`
|Map
|empty
|Set headers to be added to all messages received by this template.

|`queueNotFoundStrategy`
|QueueNotFoundStrategy
|QueueNotFoundStrategy #CREATE
|Set the strategy to use in case a queue is not found.
With `QueueNotFoundStrategy#FAIL`, an exception is thrown in case a queue is not found.

|`queueAttributeNames`
|Collection
|empty
|Set the queue attribute names that will be retrieved.
Such attributes are available as `MessageHeaders` in received messages.

|`messageAttributeNames`
|Collection
|All
|Set the message attribute names that will be retrieved with messages on receive operations.
Such attributes are available as `MessageHeaders` in received messages.

|`messageSystemAttributeNames`
|Collection
|All
|Set the message system attribute names that will be retrieved with messages on receive operations.
Such attributes are available as `MessageHeaders` in received messages.
|===

=== Sending Messages

There are a number of available methods to send messages to SQS queues using the `SqsTemplate`.
The following methods are available through the `SqsOperations` interface, with the respective `async` counterparts available in the `SqsAsyncOperations`.

```java
// Send a message to the configured default endpoint.
<T> SendResult<T> send(T payload);

// Send a message to the provided queue with the given payload.
<T> SendResult<T> send(String queue, T payload);

// Send the given Message to the provided queue.
<T> SendResult<T> send(String queue, Message<T> message);

// Send a message with the provided options.
<T> SendResult<T> send(Consumer<SqsSendOptions<T>> to);

// Send a batch of Messages to the provided queue
<T> SendResult.Batch<T> sendMany(String queue, Collection<Message<T>> messages);
```

An example using the `options` variant follows:

```java
SendResult<String> result = template.send(to -> to.queue("myQueue")
    .payload("myPayload")
    .header("myHeaderName", "myHeaderValue")
    .headers(Map.of("myOtherHeaderName", "myOtherHeaderValue"))
    .delaySeconds(10)
);
```

NOTE: To send messages to a Fifo queue, the options include `messageDeduplicationId` and `messageGroupId` properties.
If `messageGroupId` is not provided, a random UUID is generated by the framework.
If `messageDeduplicationId` is not provided and content deduplication is disabled on AWS, a random UUID is generated.
The generated values can be retrieved in the headers of the `Message` contained in the `SendResult` object.

==== SendResult

The `SendResult` record contains useful information on the send operation.

```java
public record SendResult<T>(UUID messageId, String endpoint, Message<T> message, Map<String, Object> additionalInformation) {

    public record Batch<T>(Collection<SendResult<T>> successful, Collection<SendResult.Failed<T>> failed) {}

    public record Failed<T>(String errorMessage, String endpoint, Message<T> message, Map<String, Object> additionalInformation) {}

}
```

When the send operation is successful, the `SendResult` object is created with:

* the `messageId` returned from `SQS` for the message
* the `endpoint` the message was sent to
* the `Message` instance that was sent, with any additional headers that might have been added by the framework
* an `additionalInformation` map with the `sequenceNumber` generated for the message in `Fifo` queues.

When the send operation fails for single message operations, a `MessagingOperationFailedException` containing the message is thrown.

For `Batch` send operations, a `SendResult.Batch` object is returned.
This object contains a `Collection` of `successful` and `failed` results.
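To make the shape of a batch outcome concrete, here is a plain-Java model of it. It is a sketch under stated assumptions, not the library's implementation: `Batch`, `FailureStrategy` and `BatchFailedException` are hypothetical, simplified stand-ins, and the `delivery` predicate replaces the real call to SQS. The two strategy names mirror the `sendBatchFailureHandlingStrategy` values listed in the template options table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class BatchResultSketch {

    /** Hypothetical model of a batch outcome: what was delivered and what was not. */
    public record Batch(List<String> successful, List<String> failed) {}

    /** Mirrors the two strategy names from the template options table. */
    public enum FailureStrategy { THROW, DO_NOT_THROW }

    /** Hypothetical exception that carries the complete batch outcome. */
    public static class BatchFailedException extends RuntimeException {
        private final Batch batch;

        public BatchFailedException(Batch batch) {
            super(batch.failed().size() + " message(s) failed");
            this.batch = batch;
        }

        public Batch batch() {
            return batch;
        }
    }

    /** Runs each payload through the predicate (true = delivered), then applies the strategy. */
    public static Batch sendMany(List<String> payloads, Predicate<String> delivery, FailureStrategy strategy) {
        List<String> delivered = new ArrayList<>();
        List<String> rejected = new ArrayList<>();
        for (String payload : payloads) {
            (delivery.test(payload) ? delivered : rejected).add(payload);
        }
        Batch batch = new Batch(List.copyOf(delivered), List.copyOf(rejected));
        if (!rejected.isEmpty() && strategy == FailureStrategy.THROW) {
            // Callers can still inspect both collections through the exception.
            throw new BatchFailedException(batch);
        }
        return batch;
    }

    public static void main(String[] args) {
        // In this toy model an empty payload counts as a failed delivery.
        Batch batch = sendMany(List.of("a", "", "c"), p -> !p.isEmpty(), FailureStrategy.DO_NOT_THROW);
        System.out.println(batch);
    }
}
```

Either way the caller sees both collections; the strategy only decides whether they arrive as a return value or inside an exception.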
In case there are messages that failed to be sent within a batch, corresponding `SendResult.Failed` objects are generated.
The `SendResult.Failed` object contains:

* the `errorMessage` returned by SQS
* the `endpoint` the message was to be sent to
* the `Message` instance that the framework tried to send, with any additional headers that might have been added by the framework
* an `additionalInformation` map with the `code` and `senderFault` parameters returned by SQS.

By default, if there's at least one failed message in a send batch operation, a `SendBatchOperationFailedException` will be thrown.
This exception has a `SendResult.Batch` property containing both successful and failed messages.

This behavior can be configured using the `sendBatchFailureHandlingStrategy` option when creating the template.
If `SendBatchFailureStrategy#DO_NOT_THROW` is configured, no exception is thrown and a `SendResult.Batch` object containing both successful and failed messages is returned.

For convenience, the `additionalInformation` parameters can be found as constants in the `SqsTemplateParameters` class.

[[template-message-conversion]]
=== Template Message Conversion

Message conversion by default is handled by a `SqsMessagingMessageConverter` instance, which contains:

* `SqsHeaderMapper` for mapping headers to and from `messageAttributes`
* `CompositeMessageConverter` with a `StringMessageConverter` and a `MappingJackson2MessageConverter` for converting payloads to and from JSON.
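The composite arrangement in the list above follows a simple delegation pattern: converters are tried in order and the first one that can handle the payload wins. The plain-Java sketch below illustrates only that pattern. `PayloadConverter`, `STRING` and `FLAT_JSON` are hypothetical names, and the tiny JSON writer (flat maps with string keys, no escaping) is an assumption standing in for a real JSON library.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

public class CompositeConverterSketch {

    /** Hypothetical converter contract: empty means "I cannot handle this payload". */
    public interface PayloadConverter {
        Optional<String> toBody(Object payload);
    }

    /** Passes String payloads through untouched. */
    public static final PayloadConverter STRING = payload ->
        payload instanceof String ? Optional.of((String) payload) : Optional.empty();

    /** Minimal JSON writer for flat maps with string keys; does no escaping. */
    public static final PayloadConverter FLAT_JSON = payload -> {
        if (!(payload instanceof Map)) {
            return Optional.empty();
        }
        StringBuilder json = new StringBuilder("{");
        // Sort keys so the output is deterministic.
        for (Map.Entry<Object, Object> entry : new TreeMap<Object, Object>((Map<?, ?>) payload).entrySet()) {
            if (json.length() > 1) {
                json.append(',');
            }
            json.append('"').append(entry.getKey()).append("\":\"").append(entry.getValue()).append('"');
        }
        return Optional.of(json.append('}').toString());
    };

    /** Tries each delegate in order and returns the first successful conversion. */
    public static String convert(List<PayloadConverter> delegates, Object payload) {
        for (PayloadConverter delegate : delegates) {
            Optional<String> body = delegate.toBody(payload);
            if (body.isPresent()) {
                return body.get();
            }
        }
        throw new IllegalArgumentException("No converter for " + payload.getClass().getName());
    }

    public static void main(String[] args) {
        List<PayloadConverter> chain = List.of(STRING, FLAT_JSON);
        System.out.println(convert(chain, "plain text"));
        System.out.println(convert(chain, Map.of("id", "42")));
    }
}
```

Order matters in such a chain: putting the string converter first means plain strings are sent as-is rather than being wrapped in JSON quotes.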
A custom `MessagingMessageConverter` implementation can be provided in the `SqsTemplate.builder()`:

```java
SqsOperations template = SqsTemplate
    .builder()
    .sqsAsyncClient(sqsAsyncClient)
    .messageConverter(converter)
    .buildSyncTemplate();
```

The default `SqsMessagingMessageConverter` instance can also be configured in the builder:

```java
SqsOperations template = SqsTemplate
    .builder()
    .sqsAsyncClient(sqsAsyncClient)
    .configureDefaultConverter(converter -> {
            converter.setObjectMapper(objectMapper);
            converter.setHeaderMapper(headerMapper);
            converter.setPayloadTypeHeader("my-custom-type-header");
        }
    )
    .buildSyncTemplate();
```

==== Specifying a Payload Class for Receive Operations

By default, the `SqsTemplate` adds a header with name `JavaType` containing the fully qualified name of the payload class to all messages sent.

This header is used in receive operations by the `SqsTemplate`, `SqsMessageListenerContainer` and `@SqsListener` to identify the class to which the payload should be deserialized.

This behavior can be configured in the `SqsMessagingMessageConverter` using the `setPayloadTypeHeaderValueFunction` method.
The function receives a `Message` object and returns a `String` with the value to be used in the header, the payload's class `FQCN` by default.
If `null` is returned by the function, no header with type information is added.

The `typeHeaderName` can be configured using the `setPayloadTypeHeader` method.

In case type mapping information is not available, the payload class can be specified either in the <> or in the `receive()` method variants:

```java
Optional<Message<SampleRecord>> receivedMessage = template
    .receive(queue, SampleRecord.class);
```

== Receiving Messages

The framework offers the following options to receive messages from a queue.

[[template-receive]]
=== SqsTemplate

The `SqsTemplate` offers convenient methods to receive messages from `Standard` and `Fifo` SQS queues.
These methods are separated into two interfaces that are implemented by `SqsTemplate`: `SqsOperations` and `SqsAsyncOperations`.
If only `sync` or `async` operations are to be used, using the specific interface can narrow down the methods.

See <> for more information on the interfaces, <> and <>.

The following methods are available through the `SqsOperations` interface, with the respective `async` counterparts available in the `SqsAsyncOperations`.

```java
// Receive a message from the configured default endpoint and options.
Optional<Message<?>> receive();

// Receive a message from the provided queue and convert the payload to the provided class.
<T> Optional<Message<T>> receive(String queue, Class<T> payloadClass);

// Receive a message with the provided options.
Optional<Message<?>> receive(Consumer<SqsReceiveOptions> from);

// Receive a message with the provided options and convert the payload to the provided class.
<T> Optional<Message<T>> receive(Consumer<SqsReceiveOptions> from, Class<T> payloadClass);

// Receive a batch of messages from the configured default endpoint and options.
Collection<Message<?>> receiveMany();

// Receive a batch of messages from the provided queue and convert the payloads to the provided class.
<T> Collection<Message<T>> receiveMany(String queue, Class<T> payloadClass);

// Receive a batch of messages with the provided options.
Collection<Message<?>> receiveMany(Consumer<SqsReceiveOptions> from);

// Receive a batch of messages with the provided options and convert the payloads to the provided class.
<T> Collection<Message<T>> receiveMany(Consumer<SqsReceiveOptions> from, Class<T> payloadClass);
```

The following is an example for receiving a message with options:

```java
Optional<Message<SampleRecord>> receivedMessage = template
    .receive(from -> from.queue("my-queue")
        .visibilityTimeout(Duration.ofSeconds(10))
        .pollTimeout(Duration.ofSeconds(5))
        .additionalHeaders(Map.of("my-custom-header-name", "my-custom-header-value")),
        SampleRecord.class);
```

NOTE: To receive messages from a Fifo queue, the options include a `receiveRequestAttemptId` parameter.
If no such parameter is provided, a random one is generated.
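The interplay between per-call receive options and template defaults can be summarised as "use the caller's value if present, otherwise the default". The following plain-Java sketch models just that rule. `Defaults`, `CallOptions` and `Resolved` are hypothetical types rather than library classes; the default values (10 seconds, 10 messages) are taken from the template options table, and the attempt id is only meaningful for Fifo queues.

```java
import java.time.Duration;
import java.util.UUID;

public class ReceiveDefaultsSketch {

    /** Hypothetical template-level defaults, using the values from the options table. */
    public record Defaults(Duration pollTimeout, int maxNumberOfMessages) {
        public static Defaults standard() {
            return new Defaults(Duration.ofSeconds(10), 10);
        }
    }

    /** Hypothetical per-call options; null means "not provided by the caller". */
    public record CallOptions(Duration pollTimeout, Integer maxNumberOfMessages, UUID receiveRequestAttemptId) {}

    /** Fully resolved values that would go into a single receive request. */
    public record Resolved(Duration pollTimeout, int maxNumberOfMessages, UUID receiveRequestAttemptId) {}

    /** Per-call values win; anything left unset falls back to the defaults. */
    public static Resolved resolve(Defaults defaults, CallOptions call) {
        Duration pollTimeout = call.pollTimeout() != null ? call.pollTimeout() : defaults.pollTimeout();
        int max = call.maxNumberOfMessages() != null ? call.maxNumberOfMessages() : defaults.maxNumberOfMessages();
        // A random attempt id is generated when the caller did not supply one.
        UUID attemptId = call.receiveRequestAttemptId() != null ? call.receiveRequestAttemptId() : UUID.randomUUID();
        return new Resolved(pollTimeout, max, attemptId);
    }

    public static void main(String[] args) {
        Resolved resolved = resolve(Defaults.standard(), new CallOptions(Duration.ofSeconds(5), null, null));
        System.out.println(resolved.pollTimeout() + " / " + resolved.maxNumberOfMessages());
    }
}
```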
[[sqs-template-acknowledgement]]
==== SqsTemplate Acknowledgement

The `SqsTemplate` by default acknowledges all received messages, which can be changed by setting `TemplateAcknowledgementMode.MANUAL` in the template options:

```java
SqsTemplate.builder().configure(options -> options.acknowledgementMode(TemplateAcknowledgementMode.MANUAL));
```

If an error occurs during acknowledgement, a `SqsAcknowledgementException` is thrown, containing both the messages that were successfully acknowledged and those which failed.

To acknowledge messages received with `MANUAL` acknowledgement, the `Acknowledgement#acknowledge` and `Acknowledgement#acknowledgeAsync` methods can be used.

=== Message Listeners

To receive messages in a manually created container, a `MessageListener` or `AsyncMessageListener` must be provided.
Both interfaces come with `single message` and `batch` methods.
These are functional interfaces and a lambda or method reference can be provided for the single message methods.

Single message / batch modes and message payload conversion can be configured via `SqsContainerOptions`.
See <> for more information.

[source, java]
----
@FunctionalInterface
public interface MessageListener<T> {

    void onMessage(Message<T> message);

    default void onMessage(Collection<Message<T>> messages) {
        throw new UnsupportedOperationException("Batch not implemented by this MessageListener");
    }

}
----

[source, java]
----
@FunctionalInterface
public interface AsyncMessageListener<T> {

    CompletableFuture<Void> onMessage(Message<T> message);

    default CompletableFuture<Void> onMessage(Collection<Message<T>> messages) {
        return CompletableFutures
            .failedFuture(new UnsupportedOperationException("Batch not implemented by this AsyncMessageListener"));
    }

}
----

=== SqsMessageListenerContainer

The `MessageListenerContainer` manages the entire messages' lifecycle, from polling, to processing, to acknowledging.

It can be instantiated directly, using a `SqsMessageListenerContainerFactory`, or using `@SqsListener` annotations.
If declared as a `@Bean`, the `Spring` context will manage its lifecycle, starting the container on application startup and stopping it on application shutdown.
See <> for more information.

It implements the `MessageListenerContainer` interface:

[source,java]
----
public interface MessageListenerContainer<T> extends SmartLifecycle {

    String getId();

    void setId(String id);

    void setMessageListener(MessageListener<T> messageListener);

    void setAsyncMessageListener(AsyncMessageListener<T> asyncMessageListener);

}
----

NOTE: The generic parameter `<T>` stands for the `payload type` of messages to be consumed by this container.
This allows ensuring at compile-time that all components used with the container are for the same type.
If more than one payload type is to be used by the same container or factory, simply type it as `Object`.
This type is not considered for payload conversion.

A container can be instantiated in a familiar Spring way in a `@Configuration` annotated class.
For example:

[source,java]
----
@Bean
MessageListenerContainer<Object> listenerContainer(SqsAsyncClient sqsAsyncClient) {
    SqsMessageListenerContainer<Object> container = new SqsMessageListenerContainer<>(sqsAsyncClient);
    container.setMessageListener(System.out::println);
    container.setQueueNames("myTestQueue");
    return container;
}
----

This framework also provides a convenient `Builder` that allows a different approach, such as:

[source,java]
----
@Bean
MessageListenerContainer<Object> listenerContainer(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainer
        .builder()
        .sqsAsyncClient(sqsAsyncClient)
        .messageListener(System.out::println)
        .queueNames("myTestQueue")
        .build();
}
----

The container's lifecycle can also be managed manually:

[source,java]
----
void myMethod(SqsAsyncClient sqsAsyncClient) {
    SqsMessageListenerContainer<Object> container = SqsMessageListenerContainer
        .builder()
        .sqsAsyncClient(sqsAsyncClient)
        .messageListener(System.out::println)
        .queueNames("myTestQueue")
        .build();
    container.start();
    container.stop();
}
----

=== SqsMessageListenerContainerFactory

A `MessageListenerContainerFactory` can be used to create `MessageListenerContainer` instances, either directly or through `@SqsListener` annotations.

It can be created in a familiar `Spring` way, such as:

[source, java,indent=0]
----
@Bean
SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
    SqsMessageListenerContainerFactory<Object> factory = new SqsMessageListenerContainerFactory<>();
    factory.setSqsAsyncClient(sqsAsyncClient);
    return factory;
}
----

Or through the `Builder`:

[source,java]
----
@Bean
SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainerFactory
        .builder()
        .sqsAsyncClient(sqsAsyncClient)
        .build();
}
----

IMPORTANT: When the `SqsAsyncClient` instance is set in the factory this way, all containers created by this factory will share the same `SqsAsyncClient` instance.
For high-throughput applications, a `Supplier<SqsAsyncClient>` can be provided instead through the factory's `setSqsAsyncClientSupplier` or the builder's `sqsAsyncSupplier` methods.
In this case each container will receive its own `SqsAsyncClient` instance.
Alternatively, a single `SqsAsyncClient` instance can be configured for higher throughput.
See the AWS documentation for more information on tradeoffs of each approach.

The factory can also be used to create a container directly, such as:

[source,java]
----
@Bean
MessageListenerContainer<Object> myListenerContainer(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainerFactory
        .builder()
        .sqsAsyncClient(sqsAsyncClient)
        .messageListener(System.out::println)
        .build()
        .createContainer("myQueue");
}
----

=== @SqsListener Annotation

The simplest way to consume `SQS` messages is by annotating a method in a `@Component` class with the `@SqsListener` annotation.
The framework will then create the `MessageListenerContainer` and set a `MessagingMessageListenerAdapter` to invoke the method when a message is received.

When using `Spring Boot` with `auto-configuration`, no configuration is necessary.

Most attributes on the annotation can be resolved from SpEL `(#{...})` or property placeholders `(${...})`.

==== Queue Names

One or more queues can be specified in the annotation through the `queueNames` or `value` properties - there's no distinction between the two properties.

Instead of queue names, queue URLs can also be provided.
Using URLs instead of queue names can result in slightly faster startup times since it prevents the framework from looking up the queue URL when the containers start.

[source, java]
----
@SqsListener({"${my.queue.url}", "myOtherQueue"})
public void listenTwoQueues(String message) {
    System.out.println(message);
}
----

Any number of `@SqsListener` annotations can be used in a bean class, and each annotated method will be handled by a separate `MessageListenerContainer`.

NOTE: Queues declared in the same annotation will share the container, though each will have separate throughput and acknowledgement controls.

===== Specifying a MessageListenerContainerFactory

A `MessageListenerContainerFactory` can be specified through the `factory` property.
This factory will then be used to create the container for the annotated method.

If not specified, a factory with the `defaultSqsListenerContainerFactory` name will be looked up.
For changing this default name, see <>.

[source,java]
----
@SqsListener(queueNames = "myQueue", factory = "myFactory")
public void listen(String message) {
    System.out.println(message);
}
----

When using a `Spring Boot` application with `auto-configuration`, a default factory is provided if there are no other factory beans declared in the context.

==== Other Annotation Properties

The following properties can be specified in the `@SqsListener` annotation.
Such properties override the equivalent `SqsContainerOptions` for the resulting `MessageListenerContainer`.

- `id` - Specify the resulting container's id.
This can be used for fetching the container from the `MessageListenerContainerRegistry`, and is used by the container and its components for general logging and thread naming.
- `maxConcurrentMessages` - Set the maximum number of messages that can be `inflight` at any given moment.
See <> for more information.
- `pollTimeoutSeconds` - Set the maximum time to wait before a poll returns from SQS.
Note that if there are messages available the call may return earlier than this setting.
- `messageVisibilitySeconds` - Set the minimum visibility for the messages retrieved in a poll.
Note that for `FIFO` single message listener methods, this visibility is applied to the whole batch before each message is sent to the listener.
See <> for more information.
- `acknowledgementMode` - Set the acknowledgement mode for the container.
If any value is set, it will take precedence over the acknowledgement mode defined for the container factory options.
See <> for more information.

==== Listener Method Arguments

A number of possible argument types are allowed in the listener method's signature.

- `MyPojo` - POJO types are automatically deserialized from JSON.
- `Message<MyPojo>` - Provides a `Message<MyPojo>` instance with the deserialized payload and `MessageHeaders`.
- `List<MyPojo>` - Enables batch mode and receives the batch that was polled from SQS.
- `List<Message<MyPojo>>` - Enables batch mode and receives the batch that was polled from SQS along with headers.
- `@Header(String headerName)` - provides the specified header.
- `@Headers` - provides the `MessageHeaders` or a `Map`
- `Acknowledgement` - provides methods for manually acknowledging messages for single message listeners.
AcknowledgementMode must be set to `MANUAL` (see <>)
- `BatchAcknowledgement` - provides methods for manually acknowledging partial or whole message batches for batch listeners.
AcknowledgementMode must be set to `MANUAL` (see <>)
- `Visibility` - provides the `changeTo()` method that enables changing the message's visibility to the provided value.
- `QueueAttributes` - provides queue attributes for the queue that received the message.
See <> for how to specify the queue attributes that will be fetched from `SQS`
- `software.amazon.awssdk.services.sqs.model.Message` - provides the original `Message` from `SQS`

Here's a sample with many arguments:

[source, java]
----
@SqsListener("${my-queue-name}")
public void listen(Message<MyPojo> message, MyPojo pojo, MessageHeaders headers, Acknowledgement ack,
        Visibility visibility, QueueAttributes queueAttributes,
        software.amazon.awssdk.services.sqs.model.Message originalMessage) {
    // The two-argument form of Assert.notNull is used so the sample compiles on current Spring versions.
    Assert.notNull(message, "message must not be null");
    Assert.notNull(pojo, "pojo must not be null");
    Assert.notNull(headers, "headers must not be null");
    Assert.notNull(ack, "ack must not be null");
    Assert.notNull(visibility, "visibility must not be null");
    Assert.notNull(queueAttributes, "queueAttributes must not be null");
    Assert.notNull(originalMessage, "originalMessage must not be null");
}
----

IMPORTANT: Batch listeners support a single `List<MyPojo>` or `List<Message<MyPojo>>` method argument, and an optional `BatchAcknowledgement` or `AsyncBatchAcknowledgement` argument.
`MessageHeaders` should be extracted from the `Message` instances through the `getHeaders()` method.

=== Batch Processing

All message processing interfaces have both `single message` and `batch` methods.
This means the same set of components can be used to process both single messages and batches, and share logic where applicable.

When batch mode is enabled, the framework will serve the entire result of a poll to the listener.
If a value greater than 10 is provided for `maxMessagesPerPoll`, the result of multiple polls will be combined and up to the respective amount of messages will be served to the listener.

To enable batch processing using `@SqsListener`, a single `List<MyPojo>` or `List<Message<MyPojo>>` method argument should be provided in the listener method.
The listener method can also have an optional `BatchAcknowledgement` argument for `AcknowledgementMode.MANUAL`.
Alternatively, `listenerMode` can be set to `ListenerMode.BATCH` in the `SqsContainerOptions` of the factory or container.

NOTE: The same factory can be used to create both `single message` and `batch` containers for `@SqsListener` methods.

IMPORTANT: In case the same factory is shared by both delivery methods, any supplied `ErrorHandler`, `MessageInterceptor` or `MessageListener` should implement the proper methods.

=== Container Options

Each `MessageListenerContainer` can have a different set of options.
`MessageListenerContainerFactory` instances have a `SqsContainerOptions.Builder` instance property that is used as a template for the containers it creates.

Both factory and container offer a `configure` method that can be used to change the options:

[source, java]
----
@Bean
SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainerFactory
        .builder()
        .configure(options -> options
            .maxMessagesPerPoll(5)
            .pollTimeout(Duration.ofSeconds(10)))
        .sqsAsyncClient(sqsAsyncClient)
        .build();
}
----

[source, java]
----
@Bean
MessageListenerContainer<Object> listenerContainer(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainer
        .builder()
        .configure(options -> options
            .maxMessagesPerPoll(5)
            .pollTimeout(Duration.ofSeconds(10)))
        .sqsAsyncClient(sqsAsyncClient)
        .messageListener(System.out::println)
        .queueNames("myTestQueue")
        .build();
}
----

The `SqsContainerOptions` instance is immutable and can be retrieved via the `container.getContainerOptions()` method.
If more complex configurations are necessary, the `toBuilder` and `fromBuilder` methods provide ways to create a new copy of the options, and then set it back to the factory or container:

[source, java]
----
void myMethod(MessageListenerContainer<Object> container) {
    SqsContainerOptions.Builder modifiedOptions = container.getContainerOptions()
        .toBuilder()
        .pollTimeout(Duration.ofSeconds(5))
        .shutdownTimeout(Duration.ofSeconds(20));
    container.configure(options -> options.fromBuilder(modifiedOptions));
}
----

A copy of the options can also be created with `containerOptions.createCopy()` or `containerOptionsBuilder.createCopy()`.

==== Using Auto-Configuration

The Spring Boot Starter for SQS provides the following auto-configuration properties:

[cols="2,3,1,1"]
|===
| Name | Description | Required | Default value

| `spring.cloud.aws.sqs.enabled`
| Enables the SQS integration.
| No
| `true`

| `spring.cloud.aws.sqs.endpoint`
| Configures endpoint used by `SqsAsyncClient`.
| No
| `http://localhost:4566`

| `spring.cloud.aws.sqs.region`
| Configures region used by `SqsAsyncClient`.
| No
| `eu-west-1`

| <>
| Maximum number of inflight messages per queue.
| No
| 10

| <>
| Maximum number of messages to be received per poll.
| No
| 10

| <>
| Maximum amount of time to wait for messages in a poll.
| No
| 10 seconds
|===

==== SqsContainerOptions Descriptions

[cols="13,9,9,16", options="header"]
|===
| Property | Range | Default | Description

|<>
|1 - `Integer.MAX_VALUE`
|10
|The maximum number of messages from each queue that can be processed simultaneously in this container.
This number will be used for defining the thread pool size for the container following (maxConcurrentMessages * number of queues).
For batching acknowledgements a message is considered as no longer inflight when it's handed to the acknowledgement queue.
See <>.

|<>
|1 - `Integer.MAX_VALUE`
|10
|The maximum number of messages that will be received by a poll to a SQS queue in this container.
If a value greater than 10 is provided, the result of multiple polls will be combined, which can be useful for batch listeners.
See AWS documentation for more information.

|<>
|1 - 10 seconds
|10 seconds
|The maximum duration for a poll to a SQS queue before returning empty.
Longer polls decrease the chance of empty polls when messages are available.
See AWS documentation for more information.

|<>
|1 - 10 seconds
|10 seconds
|The maximum time the framework will wait for permits to be available for a queue before attempting the next poll.
After that period, the framework will try to perform a partial acquire with the available permits, resulting in a poll for less than `maxMessagesPerPoll` messages, unless otherwise configured.
See <>.

|`autoStartup`
|true, false
|true
|Determines whether the container should start automatically.
When set to false the container will not launch on startup, requiring manual intervention to start it.
See <>.

|`listenerShutdownTimeout`
|0 - undefined
|10 seconds
|The amount of time the container will wait for a queue to complete message processing before attempting to forcefully shutdown.
See <>.

|`acknowledgementShutdownTimeout`
|0 - undefined
|10 seconds
|The amount of time the container will wait for acknowledgements to complete for a queue after message processing has ended.
See <>.

|`backPressureMode`
|`AUTO`, `ALWAYS_POLL_MAX_MESSAGES`, `FIXED_HIGH_THROUGHPUT`
|`AUTO`
|Configures the backpressure strategy to be used by the container.
See <>.

|`listenerMode`
|`SINGLE_MESSAGE`, `BATCH`
|`SINGLE_MESSAGE`
|Configures whether this container will use `single message` or `batch` listeners.
This value is overridden by `@SqsListener` depending on whether the listener method contains a `List` argument.
See <>.

|`queueAttributeNames`
|`Collection`
|Empty list
|Configures the `QueueAttributes` that will be retrieved from SQS when a container starts.
See <>.
|`messageAttributeNames`
|`Collection`
|`ALL`
|Configures the `MessageAttributes` that will be retrieved from SQS for each message.
See <>.

|`messageSystemAttributeNames`
|`Collection`
|`ALL`
|Configures the `MessageSystemAttribute` that will be retrieved from SQS for each message.
See <>.

|`messageConverter`
|`MessagingMessageConverter`
|`SqsMessagingMessageConverter`
|Configures the `MessagingMessageConverter` that will be used to convert SQS messages into Spring Messaging Messages.
See <>.

|`acknowledgementMode`
|`ON_SUCCESS`, `ALWAYS`, `MANUAL`
|`ON_SUCCESS`
|Configures the processing outcomes that will trigger automatic acknowledging of messages.
See <>.

|`acknowledgementInterval`
|0 - undefined
|`1 second` for `Standard SQS`, `Duration.ZERO` for `FIFO SQS`
|Configures the interval between acknowledges for batching.
Set to `Duration.ZERO` along with `acknowledgementThreshold` to zero to enable `immediate acknowledgement`.
See <>.

|`acknowledgementThreshold`
|0 - undefined
|`10` for `Standard SQS`, `0` for `FIFO SQS`
|Configures the minimal amount of messages in the acknowledgement queue to trigger acknowledgement of a batch.
Set to zero along with `acknowledgementInterval` to `Duration.ZERO` to enable `immediate acknowledgement`.
See <>.

|`acknowledgementOrdering`
|`PARALLEL`, `ORDERED`
|`PARALLEL` for `Standard SQS` and `FIFO` queues with immediate acknowledgement, `ORDERED` for `FIFO` queues with acknowledgement batching enabled.
|Configures the order acknowledgements should be made.
Fifo queues can be acknowledged in parallel for immediate acknowledgement since the next message for a message group will only start being processed after the previous one has been acknowledged.
See <>.

|`componentsTaskExecutor`
|`TaskExecutor`
|`null`
|Provides a `TaskExecutor` instance to be used by the `MessageListenerContainer` internal components.
See <>.

|`acknowledgementResultTaskExecutor`
|`TaskExecutor`
|`null`
|Provides a `TaskExecutor` instance to be used by the `AcknowledgementProcessor` for blocking `AcknowledgementResultCallback`.
See <>.

|`messageVisibility`
|`Duration`
|`null`
|Specify the message visibility duration for messages polled in this container.
For `FIFO` queues, visibility is extended for all messages in a message group before each message is processed.
See <>.
Otherwise, visibility is specified once when polling SQS.

|`queueNotFoundStrategy`
|`FAIL`, `CREATE`
|`CREATE`
|Configures the behavior when a queue is not found at container startup.
See <>.
|===

=== Retrieving Attributes from SQS

`QueueAttributes`, `MessageAttributes` and `MessageSystemAttributes` can be retrieved from SQS.
These can be configured using the `SqsContainerOptions` `queueAttributeNames`, `messageAttributeNames` and `messageSystemAttributeNames` methods.

`QueueAttributes` for a queue are retrieved when containers start, and can be looked up by adding the `QueueAttributes` method parameter in a `@SqsListener` method, or by getting the `SqsHeaders.SQS_QUEUE_ATTRIBUTES_HEADER` header.

`MessageAttributes` and `MessageSystemAttributes` are retrieved with each message, and are mapped to message headers.
Those can be retrieved with `@Header` parameters, or directly in the `Message`.
The message headers are prefixed with `SqsHeaders.SQS_MA_HEADER_PREFIX` ("Sqs_MA_") for message attributes and `SqsHeaders.SQS_MSA_HEADER_PREFIX` ("Sqs_MSA_") for message system attributes.

NOTE: By default, no `QueueAttributes` and `ALL` `MessageAttributes` and `MessageSystemAttributes` are retrieved.

=== Container Lifecycle

The `MessageListenerContainer` interface extends `SmartLifecycle`, which provides methods to control the container's lifecycle.

Containers created from `@SqsListener` annotations are registered in a `MessageListenerContainerRegistry` bean that is registered by the framework.
The containers themselves are not Spring-managed beans, and the registry is responsible for managing these containers' lifecycle in application startup and shutdown.

NOTE: The `DefaultListenerContainerRegistry` implementation provided by the framework allows the phase value to be set through the `setPhase` method. The default value is `MessageListenerContainer.DEFAULT_PHASE`.

At startup, the containers will make requests to `SQS` to retrieve the queues' URLs for the provided queue names or ARNs, and to retrieve `QueueAttributes` if so configured.
Providing queue URLs instead of names and not requesting queue attributes can result in slightly better startup times since there's no need for such requests.

NOTE: If retrieving the queue URL fails because the queue does not exist, the framework can be configured to either create the queue or fail.
If a URL is provided instead of a queue name, the framework will not make this request at startup, and thus if the queue does not exist it will fail at runtime.
This configuration is available in `SqsContainerOptions` `queueNotFoundStrategy`.

At shutdown, by default containers will wait for all polling, processing and acknowledging operations to finish, up to `SqsContainerOptions.getShutdownTimeout()`.
After this period, operations will be canceled and the container will attempt to forcefully shut down.

==== Containers as Spring Beans

Manually created containers can be registered as beans, e.g. by declaring a `@Bean` in a `@Configuration` annotated class.
In these cases the container's lifecycle will be managed by the `Spring` context at application startup and shutdown.

[source, java]
----
@Bean
MessageListenerContainer<Object> listenerContainer(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainer
            .builder()
            .sqsAsyncClient(sqsAsyncClient)
            .messageListener(System.out::println)
            .queueNames("myTestQueue")
            .build();
}
----

NOTE: The `SqsMessageListenerContainer.builder()` allows specifying the `SmartLifecycle.phase`, overriding the default value defined in `MessageListenerContainer.DEFAULT_PHASE`.

==== Retrieving Containers from the Registry

Containers can be retrieved by fetching the `MessageListenerContainerRegistry` bean from the context and using the `getListenerContainers` and `getContainerById` methods.
Then lifecycle methods can be used to start and stop instances.

[source,java]
----
@Autowired
MessageListenerContainerRegistry registry;

public void myLifecycleMethod() {
    MessageListenerContainer<?> container = registry.getContainerById("myId");
    container.stop();
    container.start();
}
----

==== Lifecycle Execution

By default, all lifecycle actions performed by the `MessageListenerContainerRegistry` and internally by the `MessageListenerContainer` instances are executed in parallel.

This behavior can be disabled by setting `LifecycleHandler.get().setParallelLifecycle(false)`.

NOTE: Spring-managed `MessageListenerContainer` beans' lifecycle actions are always performed sequentially.

=== FIFO Support

`FIFO` SQS queues are fully supported for receiving messages - queues with names that end in `.fifo` will automatically be set up as such.

* Messages are polled with a `receiveRequestAttemptId`, and the received batch of messages is split according to the message's `MessageGroupId`.
* Each message from a given group will then be processed in order, while each group is processed in parallel.
* If processing fails for a message, the following messages from the same message group are discarded so they will be served again after their `message visibility` expires.
* Messages which were already successfully processed and acknowledged will not be served again.
* If a `batch` listener is used, each message group from a poll will be served as a batch to the listener method.
* `FIFO` queues also have different defaults for acknowledging messages, see <> for more information.
* If a `message visibility` is set through `@SqsListener` or `SqsContainerOptions`, visibility will be extended for all messages in the message group before each message is processed.

IMPORTANT: A `MessageListenerContainer` can either have only `Standard` queues or `FIFO` queues - not both.
This is valid both for manually created containers and `@SqsListener` annotated methods.

== Message Interceptor

The framework offers the `MessageInterceptor` and the `AsyncMessageInterceptor` interfaces:

[source, java]
----
public interface MessageInterceptor<T> {

    default Message<T> intercept(Message<T> message) {
        return message;
    }

    default Collection<Message<T>> intercept(Collection<Message<T>> messages) {
        return messages;
    }

    default void afterProcessing(Message<T> message, Throwable t) {
    }

    default void afterProcessing(Collection<Message<T>> messages, Throwable t) {
    }

}
----

[source, java]
----
public interface AsyncMessageInterceptor<T> {

    default CompletableFuture<Message<T>> intercept(Message<T> message) {
        return CompletableFuture.completedFuture(message);
    }

    default CompletableFuture<Collection<Message<T>>> intercept(Collection<Message<T>> messages) {
        return CompletableFuture.completedFuture(messages);
    }

    default CompletableFuture<Void> afterProcessing(Message<T> message, Throwable t) {
        return CompletableFuture.completedFuture(null);
    }

    default CompletableFuture<Void> afterProcessing(Collection<Message<T>> messages, Throwable t) {
        return CompletableFuture.completedFuture(null);
    }

}
----

When using the auto-configured factory, simply declare a `@Bean` and the interceptor will be set:

[source, java]
----
@Bean
public MessageInterceptor<Object> messageInterceptor() {
    return new MessageInterceptor<Object>() {
        @Override
        public Message<Object> intercept(Message<Object> message) {
            return MessageBuilder
                .fromMessage(message)
                .setHeader("newHeader", "newValue")
                .build();
        }
    };
}
----

Alternatively, implementations can be set in the `MessageListenerContainerFactory` or directly in the `MessageListenerContainer`:

[source, java]
----
@Bean
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
    return SqsMessageListenerContainerFactory
        .builder()
        .sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient)
        .messageInterceptor(new MessageInterceptor<Object>() {
            @Override
            public Message<Object> intercept(Message<Object> message) {
                return MessageBuilder
                    .fromMessage(message)
                    .setHeader("newHeader", "newValue")
                    .build();
            }
        })
        .build();
}
----

NOTE: Multiple interceptors can be added to the same factory or container.

The `intercept` methods are executed `before` a message is processed, and a different message can be returned.

IMPORTANT: In case a different message is returned, it's important to add the `SqsHeaders.SQS_RECEIPT_HANDLE_HEADER` with the value of the original receipt handle so the original message is acknowledged after processing.
Also, a `SqsHeaders.SQS_MESSAGE_ID_HEADER` must always be present.

IMPORTANT: The `intercept` methods must not return null.

The `afterProcessing` methods are executed after the message is processed and the `ErrorHandler` is invoked, but before the message is acknowledged.

== Error Handling

By default, messages that have an error thrown by the listener will not be acknowledged, and the message can be polled again after the `visibility timeout` expires.

Alternatively, the framework offers the `ErrorHandler` and `AsyncErrorHandler` interfaces, which are invoked after a listener execution fails.

[source, java]
----
public interface ErrorHandler<T> {

    default void handle(Message<T> message, Throwable t) {
    }

    default void handle(Collection<Message<T>> messages, Throwable t) {
    }

}
----

[source, java]
----
public interface AsyncErrorHandler<T> {

    default CompletableFuture<Void> handle(Message<T> message, Throwable t) {
        return CompletableFutures.failedFuture(t);
    }

    default CompletableFuture<Void> handle(Collection<Message<T>> messages, Throwable t) {
        return CompletableFutures.failedFuture(t);
    }

}
----

When using the auto-configured factory, simply declare a `@Bean` and the error handler will be set:

[source, java]
----
@Bean
public ErrorHandler<Object> errorHandler() {
    return new ErrorHandler<Object>() {
        @Override
        public void handle(Message<Object> message, Throwable t) {
            // error handling logic
            // throw if the message should not be acknowledged
        }
    };
}
----

Alternatively, implementations can be set in the `MessageListenerContainerFactory` or directly in the `MessageListenerContainer`:

[source, java]
----
@Bean
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
    return SqsMessageListenerContainerFactory
        .builder()
        .sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient)
        .errorHandler(new ErrorHandler<Object>() {
            @Override
            public void handle(Message<Object> message, Throwable t) {
                // error handling logic
            }
        })
        .build();
}
----

If the error handler execution succeeds, i.e. no error is thrown from the error handler, the message is considered to be recovered and is acknowledged according to the acknowledgement configuration.

IMPORTANT: If the message should not be acknowledged and the `ON_SUCCESS` acknowledgement mode is set, it's important to propagate the error.
For simply executing an action in case of errors, an `interceptor` should be used instead, checking the presence of the `throwable` argument for detecting a failed execution.

== Message Conversion and Payload Deserialization

Payloads are automatically deserialized from `JSON` for `@SqsListener` annotated methods using a `MappingJackson2MessageConverter`.

NOTE: When using Spring Boot's auto-configuration, if there's a single `ObjectMapper` in the Spring context, that object mapper will be used for converting messages.
This includes the one provided by Spring Boot's auto-configuration itself.
For configuring a different `ObjectMapper`, see <>.

For manually created `MessageListeners`, `MessageInterceptor` and `ErrorHandler` components, or more fine-grained conversion such as using `interfaces` or `inheritance` in listener methods, type mapping is required for payload deserialization.

By default, the framework looks for a `MessageHeader` named `Sqs_MA_JavaType` containing the fully qualified class name (`FQCN`) to which the payload should be deserialized.
If such a header is found, the message is automatically deserialized to the provided class.

Further configuration can be achieved by providing a configured `MessagingMessageConverter` instance in the `SqsContainerOptions`.

NOTE: If type mapping is set up or type information is added to the headers, payloads are deserialized right after the message is polled.
Otherwise, for `@SqsListener` annotated methods, payloads are deserialized right before the message is sent to the listener.
For providing custom `MessageConverter` instances to be used by `@SqsListener` methods, see <>

=== Configuring a MessagingMessageConverter

The framework provides the `SqsMessagingMessageConverter`, which implements the `MessagingMessageConverter` interface.

[source, java]
----
public interface MessagingMessageConverter<S> {

    Message<?> toMessagingMessage(S source);

    S fromMessagingMessage(Message<?> message);

}
----

The default header-based type mapping can be configured to use a different header name by using the `setPayloadTypeHeader` method.

More complex mapping can be achieved by using the `setPayloadTypeMapper` method, which overrides the default header-based mapping.
This method receives a `Function<Message<?>, Class<?>> payloadTypeMapper` that will be applied to incoming messages.

The default `MappingJackson2MessageConverter` can be replaced by using the `setPayloadMessageConverter` method.

The framework also provides the `SqsHeaderMapper`, which implements the `HeaderMapper` interface and is invoked by the `SqsMessagingMessageConverter`.
To provide a different `HeaderMapper` implementation, use the `setHeaderMapper` method.

An example of such configuration follows:

[source, java]
----
// Create converter instance
SqsMessagingMessageConverter messageConverter = new SqsMessagingMessageConverter();

// Configure Type Header
messageConverter.setPayloadTypeHeader("myTypeHeader");

// Configure Header Mapper
SqsHeaderMapper headerMapper = new SqsHeaderMapper();
headerMapper.setAdditionalHeadersFunction(((sqsMessage, accessor) -> {
    accessor.setHeader("myCustomHeader", "myValue");
    return accessor.toMessageHeaders();
}));
messageConverter.setHeaderMapper(headerMapper);

// Configure Payload Converter
MappingJackson2MessageConverter payloadConverter = new MappingJackson2MessageConverter();
payloadConverter.setPrettyPrint(true);
messageConverter.setPayloadMessageConverter(payloadConverter);

// Set MessageConverter to the factory or container
factory.configure(options -> options.messageConverter(messageConverter));
----

=== Interfaces and Subclasses in Listener Methods

Interfaces and subclasses can be used in `@SqsListener` annotated methods by configuring a `type mapper`:

[source, java]
----
messageConverter.setPayloadTypeMapper(message -> {
    String eventTypeHeader = message.getHeaders().get("myEventTypeHeader", String.class);
    return "eventTypeA".equals(eventTypeHeader) ?
        MyTypeA.class : MyTypeB.class;
});
----

And then, in the listener method:

[source, java]
----
@SpringBootApplication
public class SqsApplication {

    public static void main(String[] args) {
        SpringApplication.run(SqsApplication.class, args);
    }

    // Retrieve the converted payload
    @SqsListener("myQueue")
    public void listen(MyInterface message) {
        System.out.println(message);
    }

    // Or retrieve a Message with the converted payload
    @SqsListener("myOtherQueue")
    public void listen(Message<MyInterface> message) {
        System.out.println(message);
    }
}
----

== Acknowledging Messages

In `SQS` acknowledging a message is the same as deleting the message from the queue.
A number of `Acknowledgement` strategies are available and can be configured via `SqsContainerOptions`.
Optionally, a callback action can be added to be executed after either a successful or failed acknowledgement.

Here's an example of a possible configuration:

[source, java]
----
@Bean
SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainerFactory
            .builder()
            .configure(options -> options
                    .acknowledgementMode(AcknowledgementMode.ALWAYS)
                    .acknowledgementInterval(Duration.ofSeconds(3))
                    .acknowledgementThreshold(5)
                    .acknowledgementOrdering(AcknowledgementOrdering.ORDERED)
            )
            .sqsAsyncClient(sqsAsyncClient)
            .build();
}
----

Each option is explained in the following sections.

NOTE: All options are available for both `single message` and `batch` message listeners.

=== Acknowledgement Mode

- `ON_SUCCESS` - Acknowledges a message or batch of messages after successful processing.
- `ALWAYS` - Acknowledges a message or batch of messages after processing returns success or error.
- `MANUAL` - The framework won't acknowledge messages automatically and `Acknowledgement` objects can be received in the listener method.

The `Acknowledgement` strategy can be configured in the `SqsContainerOptions` or in the `@SqsListener` annotation.
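
To illustrate the `MANUAL` mode listed above, the following is a minimal sketch of a listener that receives an `Acknowledgement` argument and acknowledges only after its own processing has finished. The queue name `myManualAckQueue`, the class names and the `process` helper are hypothetical, and the import packages are assumed from the 3.x module layout, so they should be checked against the version in use.

```java
import io.awspring.cloud.sqs.annotation.SqsListener;
import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
import io.awspring.cloud.sqs.listener.acknowledgement.Acknowledgement;
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

@Configuration
public class ManualAckConfiguration {

    // Factory configured so the framework never acknowledges automatically
    @Bean
    SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
        return SqsMessageListenerContainerFactory
                .builder()
                .configure(options -> options.acknowledgementMode(AcknowledgementMode.MANUAL))
                .sqsAsyncClient(sqsAsyncClient)
                .build();
    }

    @Bean
    ManualAckListener manualAckListener() {
        return new ManualAckListener();
    }
}

class ManualAckListener {

    // Hypothetical queue name; the Acknowledgement argument is resolved by the framework
    @SqsListener("myManualAckQueue")
    public void listen(String message, Acknowledgement acknowledgement) {
        process(message);
        // Reached only if process(...) did not throw
        acknowledgement.acknowledge();
    }

    // Hypothetical business logic placeholder
    private void process(String message) {
        System.out.println(message);
    }
}
```

If `process` throws, `acknowledge()` is never called, so the message should become visible again once its visibility timeout expires.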

=== Acknowledgement Batching

The `acknowledgementInterval` and `acknowledgementThreshold` options enable acknowledgement batching.
Acknowledgements will be executed either when the amount of time specified in the `interval` elapses or when the number of messages to acknowledge reaches the `threshold`.

Setting `acknowledgementInterval` to `Duration.ZERO` will disable the periodic acknowledgement, which will be executed only when the number of messages to acknowledge reaches the specified `acknowledgementThreshold`.

Setting `acknowledgementThreshold` to `0` will disable acknowledging per number of messages, and messages will be acknowledged only on the specified `acknowledgementInterval`.

IMPORTANT: When using acknowledgement batching, messages stay inflight for SQS purposes until their respective batch is acknowledged.
`MessageVisibility` should be taken into consideration when configuring this strategy.

==== Immediate Acknowledging

Setting both `acknowledgementInterval` and `acknowledgementThreshold` to `Duration.ZERO` and `0` respectively enables `Immediate Acknowledging`.

With this configuration, messages are acknowledged sequentially after being processed, and the message is only considered processed after the message is successfully acknowledged.

IMPORTANT: If an immediate acknowledgement fails, message processing is considered failed and will be retried after the specified `visibilityTimeout`.

==== Manual Acknowledgement

Acknowledgements can be handled manually by setting `AcknowledgementMode.MANUAL` in the `SqsContainerOptions`.

Manual acknowledgement can be used in conjunction with acknowledgement batching - the message will be queued for acknowledgement, but the acknowledgement won't be executed until one of the acknowledgement thresholds is reached.

It can also be used in conjunction with immediate acknowledgement.

The `Acknowledgement#acknowledge` and `Acknowledgement#acknowledgeAsync` methods are also available to acknowledge messages received in `MANUAL` acknowledgement mode.

The following arguments can be used in listener methods to manually acknowledge:

==== `Acknowledgement`

The `Acknowledgement` interface can be used to acknowledge messages in `ListenerMode.SINGLE_MESSAGE`.

```java
public interface Acknowledgement {

    /**
     * Acknowledge the message.
     */
    void acknowledge();

    /**
     * Asynchronously acknowledge the message.
     */
    CompletableFuture<Void> acknowledgeAsync();

}
```

==== `BatchAcknowledgement`

The `BatchAcknowledgement` interface can be used to acknowledge messages in `ListenerMode.BATCH`.

The `acknowledge(Collection)` method enables acknowledging partial batches.

```java
public interface BatchAcknowledgement<T> {

    /**
     * Acknowledge all messages from the batch.
     */
    void acknowledge();

    /**
     * Asynchronously acknowledge all messages from the batch.
     */
    CompletableFuture<Void> acknowledgeAsync();

    /**
     * Acknowledge the provided messages.
     */
    void acknowledge(Collection<Message<T>> messagesToAcknowledge);

    /**
     * Asynchronously acknowledge the provided messages.
     */
    CompletableFuture<Void> acknowledgeAsync(Collection<Message<T>> messagesToAcknowledge);

}
```

=== Acknowledgement Ordering

- `PARALLEL` - Acknowledges the messages as soon as one of the above criteria is met - many acknowledgement calls can be made in parallel.
- `ORDERED` - One batch of acknowledgements will be executed after the previous one is completed, ensuring `FIFO` ordering for `batching` acknowledgements.
- `ORDERED_BY_GROUP` - One batch of acknowledgements will be executed after the previous one for the same group is completed, ensuring `FIFO` ordering of acknowledgements with parallelism between message groups. Only available for `FIFO` queues.

=== Acknowledgement Defaults

The defaults for acknowledging differ for `Standard` and `FIFO` SQS queues.

==== Standard SQS

- Acknowledgement Interval: One second
- Acknowledgement Threshold: Ten messages
- Acknowledgement Ordering: `PARALLEL`

==== FIFO SQS

- Acknowledgement Interval: Zero (Immediate)
- Acknowledgement Threshold: Zero (Immediate)
- Acknowledgement Ordering: `PARALLEL` if immediate acknowledgement, `ORDERED` if batching is enabled (one or both above defaults are overridden).

NOTE: PARALLEL is the default for FIFO because ordering is guaranteed for processing.
This assures no messages from a given `MessageGroup` will be polled until the previous batch is acknowledged.

=== Acknowledgement Result Callback

The framework offers the `AcknowledgementResultCallback` and `AsyncAcknowledgementResultCallback` interfaces that can be added to a `SqsMessageListenerContainer` or `SqsMessageListenerContainerFactory`.
Implementations of these interfaces will be executed after an acknowledgement execution completes with either success or failure.

```java
public interface AcknowledgementResultCallback<T> {

    default void onSuccess(Collection<Message<T>> messages) {
    }

    default void onFailure(Collection<Message<T>> messages, Throwable t) {
    }

}
```

```java
public interface AsyncAcknowledgementResultCallback<T> {

    default CompletableFuture<Void> onSuccess(Collection<Message<T>> messages) {
        return CompletableFuture.completedFuture(null);
    }

    default CompletableFuture<Void> onFailure(Collection<Message<T>> messages, Throwable t) {
        return CompletableFuture.completedFuture(null);
    }

}
```

```java
@Bean
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainerFactory
        .builder()
        .sqsAsyncClient(sqsAsyncClient)
        .acknowledgementResultCallback(getAcknowledgementResultCallback())
        .build();
}
```

NOTE: When `immediate acknowledgement` is set, as is the default for `FIFO` queues, the callback will be executed **before** the next message in the batch is processed, and next message processing will wait for the callback completion.
This can be useful for taking action such as retrying to delete the messages, or stopping the container to prevent duplicate processing in case an acknowledgement fails in a FIFO queue.
For `batch parallel processing`, as is the default for `Standard` queues, the callback execution happens asynchronously.

== Global Configuration for @SqsListeners

A set of configurations can be set for all containers from `@SqsListener` by providing `SqsListenerConfigurer` beans.

[source, java]
----
@FunctionalInterface
public interface SqsListenerConfigurer {

    void configure(EndpointRegistrar registrar);

}
----

The following attributes can be configured in the registrar:

- `setMessageHandlerMethodFactory` - provide a different factory to be used to create the `InvocableHandlerMethod` instances that wrap the listener methods.
- `setListenerContainerRegistry` - provide a different `MessageListenerContainerRegistry` implementation to be used to register the `MessageListenerContainers`.
- `setMessageListenerContainerRegistryBeanName` - provide a different bean name to be used to retrieve the `MessageListenerContainerRegistry`.
- `setObjectMapper` - set the `ObjectMapper` instance that will be used to deserialize payloads in listener methods.
See <> for more information on where this is used.
- `setValidator` - set the `Validator` instance that will be used for payload validation in listener methods.
- `manageMessageConverters` - gives access to the list of message converters that will be used to convert messages.
By default, `StringMessageConverter`, `SimpleMessageConverter` and `MappingJackson2MessageConverter` are used.
- `manageArgumentResolvers` - gives access to the list of argument resolvers that will be used to resolve the listener method arguments.
The order of resolvers is important - `PayloadMethodArgumentResolver` should generally be last since it's used as default.

A simple example would be:

[source, java]
----
@Bean
SqsListenerConfigurer configurer(ObjectMapper objectMapper) {
    return registrar -> registrar.setObjectMapper(objectMapper);
}
----

NOTE: Any number of `SqsListenerConfigurer` beans can be registered in the context.
All instances will be looked up at application startup and iterated through.

== Message Processing Throughput

The following options are available for tuning the application's throughput.
When a configuration is available both in the `SqsContainerOptions` and `@SqsListener` annotation, the annotation value takes precedence, if any.

=== SqsContainerOptions and `@SqsListener` properties

==== maxConcurrentMessages

Can be set in either the `SqsContainerOptions` or the `@SqsListener` annotation.
Represents the maximum number of messages being processed by the container at a given time.
Defaults to 10.

This value is enforced per queue, meaning the number of inflight messages in a container can be up to (number of queues in container * maxConcurrentMessages).

NOTE: When using acknowledgement batching, a message is considered as no longer inflight when it's delivered to the acknowledgement queue.
In this case, the actual number of inflight messages on AWS SQS console can be higher than the configured value.
When using immediate acknowledgement, a message is considered as no longer inflight after it's been acknowledged or throws an error.

==== maxMessagesPerPoll

Set in `SqsContainerOptions` or the `@SqsListener` annotation.
Represents the maximum number of messages returned by a single poll to an SQS queue, to a maximum of 10.
This value has to be less than or equal to `maxConcurrentMessages`.
Defaults to 10.

Note that even if the queue has more messages, a poll can return fewer messages than specified.
See the AWS documentation for more information.

==== pollTimeout

Can be set in either the `SqsContainerOptions` or the `@SqsListener` annotation.
Represents the maximum duration of a poll.
Higher values represent `long polls` and increase the probability of receiving full batches of messages.
Defaults to 10 seconds.

==== maxDelayBetweenPolls

Set in `SqsContainerOptions`.
Represents the maximum amount of time the container will wait for `maxMessagesPerPoll` permits to be available before trying to acquire a partial batch if so configured.
This wait is applied per queue and one queue has no interference in another in this regard.
Defaults to 10 seconds.

=== Default Polling Behavior

By default, the framework starts all queues in `low throughput mode`, where it will perform one poll for messages at a time.
When a poll returns at least one message, the queue enters a `high throughput mode` where it will try to fulfill `maxConcurrentMessages` messages by making (maxConcurrentMessages / maxMessagesPerPoll) parallel polls to the queue.
Any poll that returns no messages will trigger a `low throughput mode` again, until at least one message is returned, triggering `high throughput mode` again, and so forth.

After `maxDelayBetweenPolls`, if `maxMessagesPerPoll` permits are not available, it'll poll for the difference, i.e. as many messages as have been processed so far, if any.

E.g. Let's consider a scenario where the container is configured for: `maxConcurrentMessages` = 20, `maxMessagesPerPoll` = 10, `maxDelayBetweenPolls` = 5 seconds, and a `pollTimeout` = 10 seconds.

The container starts in `low throughput mode`, meaning it'll attempt a single poll for 10 messages.
If any messages are returned, it'll switch to `high throughput mode`, and will make up to 2 simultaneous polls for 10 messages each.
If all 20 messages are retrieved, it'll not attempt any more polls until messages are processed.
If after the 5 seconds for `maxDelayBetweenPolls` 6 messages have been processed, the framework will poll for the 6 messages.
If the queue is depleted and a poll returns no messages, it'll enter `low throughput` mode again and perform only one poll at a time.
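
The arithmetic in the scenario above can be checked with a small standalone sketch. It is a simplified model of the permit math only, not framework code: the class `PollingSketch` and its methods are hypothetical names introduced for illustration, and mode switching and timing are deliberately left out.

```java
// Simplified model of the permit arithmetic described above; not framework code.
class PollingSketch {

    // Number of parallel polls attempted in high throughput mode.
    static int parallelPolls(int maxConcurrentMessages, int maxMessagesPerPoll) {
        return maxConcurrentMessages / maxMessagesPerPoll;
    }

    // Messages requested once maxDelayBetweenPolls has elapsed: a full batch if
    // enough permits are free, otherwise a partial batch (0 means no poll is made).
    static int nextPollSize(int freePermits, int maxMessagesPerPoll) {
        return Math.min(freePermits, maxMessagesPerPoll);
    }

    public static void main(String[] args) {
        int maxConcurrentMessages = 20;
        int maxMessagesPerPoll = 10;

        // High throughput mode: 20 / 10 parallel polls
        System.out.println(parallelPolls(maxConcurrentMessages, maxMessagesPerPoll)); // 2

        // All 20 messages in flight and none processed yet: nothing to poll for
        System.out.println(nextPollSize(0, maxMessagesPerPoll)); // 0

        // 6 messages processed after the delay: partial poll for 6
        System.out.println(nextPollSize(6, maxMessagesPerPoll)); // 6
    }
}
```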

=== Configuring BackPressureMode

The following `BackPressureMode` values can be set in `SqsContainerOptions` to configure polling behavior:

* `AUTO` - The default mode, as described in the previous section.
* `ALWAYS_POLL_MAX_MESSAGES` - Disables partial batch polling, i.e. if the container is configured for 10 messages per poll, it'll wait for 10 messages to be processed before attempting to poll for the next 10 messages.
Useful for optimizing for fewer polls at the expense of throughput.
* `FIXED_HIGH_THROUGHPUT` - Disables `low throughput mode`, while still attempting partial batch polling as described in the previous section.
Useful for really high throughput scenarios where the risk of making parallel polls to an idle queue is preferable to an eventual switch to `low throughput mode`.

NOTE: The `AUTO` setting should be balanced for most use cases, including high throughput ones.

== Blocking and Non-Blocking (Async) Components

The SQS integration leverages the `CompletableFuture`-based async capabilities of `AWS SDK 2.0` to deliver a fully non-blocking infrastructure.
All processing involved in polling for messages, changing message visibilities and acknowledging messages is done in an async, non-blocking fashion.
This allows a higher overall throughput for the application.

When a `MessageListener`, `MessageInterceptor`, or `ErrorHandler` implementation is set to a `MessageListenerContainer` or `MessageListenerContainerFactory`, it is adapted by the framework.
This way, blocking and non-blocking components can be used in conjunction with each other.

Listener methods annotated with `@SqsListener` can either return a simple value, e.g. `void`, or a `CompletableFuture`.
The listener method will then be wrapped in either a `MessagingMessageListenerAdapter` or an `AsyncMessagingMessageListenerAdapter` respectively.
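
As a minimal sketch of the two listener styles just described, the class below declares one blocking and one async listener. The class name and queue names are hypothetical, and `CompletableFuture.runAsync` without an executor runs on the JVM common pool, which is chosen here only to keep the example short.

```java
import io.awspring.cloud.sqs.annotation.SqsListener;
import java.util.concurrent.CompletableFuture;
import org.springframework.stereotype.Component;

@Component
public class BlockingAndAsyncListeners {

    // Blocking style: returns void, so the framework adapts it as a blocking listener
    @SqsListener("myQueue")
    public void listenBlocking(String message) {
        System.out.println("blocking: " + message);
    }

    // Async style: returns a CompletableFuture that completes when processing is done
    @SqsListener("myOtherQueue")
    public CompletableFuture<Void> listenAsync(String message) {
        return CompletableFuture.runAsync(() -> System.out.println("async: " + message));
    }
}
```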

NOTE: In order to achieve higher throughput, it's encouraged that, at least for simpler logic in message listeners, `interceptors` and `error handlers`, the async variants are used.

=== Threading and Blocking Components

Message processing always starts in a framework thread from the default or provided `TaskExecutor`.

If an async component is invoked and the execution returns to the framework on a different thread, such thread will be used until a `blocking` component is found, when the execution switches back to a `TaskExecutor` thread to avoid blocking e.g. `SqsAsyncClient` or `HttpClient` threads.

If by the time the execution reaches a `blocking` component it's already on a framework thread, it remains in the same thread to avoid excessive thread allocation and hopping.

IMPORTANT: When using `async` methods it's critical not to block the incoming thread, which might be very detrimental to overall performance.
If thread-blocking logic has to be used, the blocking logic should be executed on another thread, e.g. using `CompletableFuture.supplyAsync(() -> myLogic(), myExecutor)`.
Otherwise, a `sync` interface should be used.

=== Providing a TaskExecutor

The default `TaskExecutor` is a `ThreadPoolTaskExecutor`, and a different `componentsTaskExecutor` can be set in the `SqsContainerOptions`.

When providing a custom executor, it's important that it's configured to support all threads that will be created, which should be (maxConcurrentMessages * total number of queues).

IMPORTANT: To avoid unnecessary thread hopping between blocking components, a `MessageExecutionThreadFactory` MUST be set to the executor.
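
The following is a sketch of one way a custom executor could be wired under the rules above. The bean and class names, and the figures of 10 concurrent messages and 2 queues, are hypothetical; the import package of `MessageExecutionThreadFactory` is assumed from the 3.x module layout and should be verified against the version in use.

```java
import io.awspring.cloud.sqs.MessageExecutionThreadFactory;
import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

@Configuration
public class SqsExecutorConfiguration {

    // Hypothetical sizing inputs for this sketch
    private static final int MAX_CONCURRENT_MESSAGES = 10;
    private static final int NUMBER_OF_QUEUES = 2;

    @Bean
    ThreadPoolTaskExecutor sqsComponentsTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // maxConcurrentMessages * total number of queues, as stated above
        int poolSize = MAX_CONCURRENT_MESSAGES * NUMBER_OF_QUEUES;
        executor.setCorePoolSize(poolSize);
        executor.setMaxPoolSize(poolSize);
        // Lets the framework recognize its own threads and avoid extra thread hops
        executor.setThreadFactory(new MessageExecutionThreadFactory());
        return executor;
    }

    @Bean
    SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(
            SqsAsyncClient sqsAsyncClient, ThreadPoolTaskExecutor sqsComponentsTaskExecutor) {
        return SqsMessageListenerContainerFactory
                .builder()
                .configure(options -> options
                        .maxConcurrentMessages(MAX_CONCURRENT_MESSAGES)
                        .componentsTaskExecutor(sqsComponentsTaskExecutor))
                .sqsAsyncClient(sqsAsyncClient)
                .build();
    }
}
```

Declaring the executor as a bean lets the Spring context initialize and shut it down; an executor created outside the context would need to be initialized explicitly before use.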

== IAM Permissions

The following IAM permissions are required by Spring Cloud AWS SQS:

[cols="2"]
|===
| Send message to queue
| `sqs:SendMessage`

| Receive message from queue
| `sqs:ReceiveMessage`

| Delete message from queue
| `sqs:DeleteMessage`

| To use `@SqsListener` with SimpleMessageListenerContainerFactory, you will also need
| `sqs:GetQueueAttributes`

| To use `@SqsListener` with a queue name instead of an ARN, you will need
| `sqs:GetQueueUrl`
|===

Sample IAM policy granting access to SQS:

[source,json,indent=0]
----
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ReceiveMessage",
                "sqs:SendMessage",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl"
            ],
            "Resource": "yourARN"
        }
    ]
}
----