参考指南
本指南介绍了 Spring Cloud Stream Binder 的 RabbitMQ 实现。 它包含有关其设计、用法和配置选项的信息,以及有关 Stream Cloud Stream 概念如何映射到 RabbitMQ 特定构造的信息。
1. 用途
要使用 RabbitMQ 绑定器,您可以使用以下 Maven 坐标将其添加到 Spring Cloud Stream 应用程序中:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
或者,您可以使用 Spring Cloud Stream RabbitMQ Starter,如下所示:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
2. RabbitMQ Binder 概述
以下简化图显示了 RabbitMQ 绑定程序的运行方式:
默认情况下,RabbitMQ Binder 实现将每个目标映射到TopicExchange.
对于每个消费者组,一个Queue与此绑定TopicExchange.
每个消费者实例都有对应的 RabbitMQConsumer实例的Queue.
对于分区的生产者和消费者,队列以分区索引为后缀,并使用分区索引作为路由键。
对于匿名消费者(那些没有group属性),则使用自动删除队列(具有随机的唯一名称)。
通过使用可选的autoBindDlq选项,您可以配置活页夹以创建和配置死信队列 (DLQ)(以及死信交换DLX,以及路由基础设施)。
默认情况下,死信队列具有目标名称,并附加.dlq.
如果启用重试 (maxAttempts > 1),在重试用尽后,失败的消息将传递到 DLQ。
如果禁用重试 (maxAttempts = 1),您应该将requeueRejected自false(缺省值),以便将失败的消息路由到 DLQ,而不是重新排队。
另外republishToDlq导致 Binder 将失败的消息发布到 DLQ(而不是拒绝它)。
此功能允许其他信息(例如x-exception-stacktraceheader)添加到 headers 中的消息中。
请参阅frameMaxHeadroom属性有关截断堆栈跟踪的信息。
此选项不需要启用重试。
只需尝试一次即可重新发布失败的邮件。
从 1.2 版本开始,您可以配置重新发布的消息的传递方式。
请参阅republishDeliveryMode属性.
如果流侦听器抛出ImmediateAcknowledgeAmqpException,则绕过 DLQ 并直接丢弃消息。
从 2.1 版开始,无论republishToDlq;以前只有在以下情况下才会出现这种情况republishToDlq是false.
设置requeueRejected自true(与republishToDlq=false) 会导致消息不断重新排队和重新传递,这可能不是您想要的,除非失败原因是暂时的。
通常,应通过设置maxAttempts设置为大于 1 或通过设置republishToDlq自true. |
有关这些属性的更多信息,请参阅 RabbitMQ Binder 属性。
该框架不提供任何标准机制来使用死信消息(或将它们重新路由回主队列)。 死信队列处理中介绍了一些选项。
当在 Spring Cloud Stream 应用程序中使用多个 RabbitMQ 绑定器时,请务必禁用“RabbitAutoConfiguration”以避免从RabbitAutoConfiguration应用于两个粘合剂。
您可以使用@SpringBootApplication注解。 |
从 2.0 版开始,RabbitMessageChannelBinder将RabbitTemplate.userPublisherConnection属性设置为true这样,非事务生产者就可以避免消费者的死锁,如果缓存的连接由于代理上的内存警报而被阻止,则可能会发生这种情况。
目前,一个multiplexConsumer(监听多个队列的单个消费者)仅支持消息驱动的消费者;轮询的使用者只能从单个队列中检索消息。 |
3. 配置选项
本节包含特定于 RabbitMQ Binder 和绑定通道的设置。
有关常规绑定配置选项和属性,请参阅 Spring Cloud Stream 核心文档。
3.1. RabbitMQ Binder 属性
默认情况下,RabbitMQ 绑定器使用 Spring Boot 的ConnectionFactory.
因此,它支持 RabbitMQ 的所有 Spring Boot 配置选项。
(有关参考,请参阅 Spring Boot 文档)。
RabbitMQ 配置选项使用spring.rabbitmq前缀。
除了 Spring Boot 选项之外,RabbitMQ 绑定器还支持以下属性:
- spring.cloud.stream.rabbit.binder.admin地址
-
RabbitMQ 管理插件 URL 的逗号分隔列表。 仅在以下情况下使用
nodes包含多个条目。 此列表中的每个条目都必须在spring.rabbitmq.addresses. 仅当您使用 RabbitMQ 集群并希望从托管队列的节点使用时才需要。 有关更多信息,请参阅队列关联性和 LocalizedQueueConnectionFactory。默认值:空。
- spring.cloud.stream.rabbit.binder.nodes
-
以逗号分隔的 RabbitMQ 节点名称列表。 当有多个条目时,用于查找队列所在的服务器地址。 此列表中的每个条目都必须在
spring.rabbitmq.addresses. 仅当您使用 RabbitMQ 集群并希望从托管队列的节点使用时才需要。 有关更多信息,请参阅队列关联性和 LocalizedQueueConnectionFactory。默认值:空。
- spring.cloud.stream.rabbit.binder.compressionLevel
-
压缩绑定的压缩级别。 看
java.util.zip.Deflater.违约:
1(BEST_LEVEL)。 - spring.cloud.stream.binder.connection-name-prefix
-
用于命名此活页夹创建的连接的连接名称前缀。 名称是这个前缀,后跟
#n哪里n每次打开新连接时递增。默认值:none(Spring AMQP 默认值)。
3.2. RabbitMQ 消费者属性
以下属性仅适用于 Rabbit 使用者,并且必须以spring.cloud.stream.rabbit.bindings.<channelName>.consumer..
但是,如果需要将同一组属性应用于大多数绑定,请将
避免重复,Spring Cloud Stream 支持为所有通道设置值,
以spring.cloud.stream.rabbit.default.<property>=<value>.
另外,请记住,绑定特定属性将覆盖其默认中的等效属性。
- 确认模式
-
确认模式。
违约:
AUTO. - 匿名组前缀
-
当绑定没有
group属性,则匿名的自动删除队列绑定到目标交换。 此类队列的默认命名策略会导致名为anonymous.<base64 representation of a UUID>. 设置此属性可将前缀更改为默认值以外的内容。违约:
anonymous.. - 自动绑定Dlq
-
是否自动声明 DLQ 并将其绑定到绑定程序 DLX。
违约:
false. - bindingRoutingKey
-
用于将队列绑定到交换的路由密钥(如果
bindQueue是true). 可以是多个键 - 请参阅bindingRoutingKeyDelimiter. 对于分区目标,-<instanceIndex>附加到每个键。违约:。
# - bindingRoutingKeyDelimiter
-
当它不为空时,'bindingRoutingKey' 被视为由此值分隔的键列表;通常使用逗号。
违约:
null. - 绑定队列
-
是否声明队列并将其绑定到目标交换。 将其设置为
false如果您已经设置了自己的基础设施,并且之前已经创建并绑定了队列。违约:
true. - consumerTagPrefix
-
用于创建消费者标签;将附加
#n哪里n为创建的每个消费者增量。 例:${spring.application.name}-${spring.cloud.stream.bindings.input.group}-${spring.cloud.stream.instance-index}.默认值:无 - 代理将生成随机消费者标签。
- 容器类型
-
选择要使用的侦听器容器的类型。 有关更多信息,请参阅 Spring AMQP 文档中的选择容器。
违约:
simple - deadLetter队列名称
-
DLQ 的名称
违约:
prefix+destination.dlq - 死信交换
-
要分配给队列的 DLX。 仅当
autoBindDlq是true.默认值:“前缀+DLX”
- deadLetter交换类型
-
要分配给队列的 DLX 类型。 仅当
autoBindDlq是true.默认值:“直接”
- deadLetterRouting键
-
要分配给队列的死信路由键。 仅当
autoBindDlq是true.违约:
destination - 声明Dlx
-
是否为目的地申报死信交换。 仅当
autoBindDlq是true. 设置为false如果您有预配置的 DLX。违约:
true. - 声明交换
-
是否声明目标的交换。
违约:
true. - 延迟交换
-
是否将交易所声明为
Delayed Message Exchange. 需要代理上的延迟消息交换插件。 这x-delayed-type参数设置为exchangeType.违约:
false. - dlqBinding参数
-
将 dlq 绑定到死信交换时应用的参数;与
headersdeadLetterExchangeType以指定要匹配的标头。 例如…dlqBindingArguments.x-match=any,…dlqBindingArguments.someHeader=someValue.默认值:空
- dlq死信交换
-
如果声明了 DLQ,则要分配给该队列的 DLX。
违约:
none - dlqDeadLetterRoutingKey
-
如果声明了 DLQ,则要分配给该队列的死信路由密钥。
违约:
none - dlq过期
-
删除未使用的死信队列之前多长时间(以毫秒为单位)。
违约:
no expiration - dlq懒惰
-
使用
x-queue-mode=lazy论点。 请参阅“延迟队列”。 请考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。违约:
false. - dlq最大长度
-
死信队列中的最大消息数。
违约:
no limit - dlqMaxLength字节
-
所有消息的死信队列中的最大总字节数。
违约:
no limit - dlqMax优先级
-
死信队列中邮件的最大优先级 (0-255)。
违约:
none - dlqOverflow行为
-
在以下情况下要采取的作
dlqMaxLength或dlqMaxLengthBytes已超过;现在drop-head或reject-publish但请参阅 RabbitMQ 文档。违约:
none - dlqQuorum.deliveryLimit
-
什么时候
quorum.enabled=true,设置传递限制,在此之后,邮件将被丢弃或死信。默认值:无 - 代理默认值将适用。
- dlqQuorum.enabled
-
如果为 true,请创建仲裁死信队列,而不是经典队列。
默认值:false
- dlqQuorum.initialQuorum大小
-
什么时候
quorum.enabled=true,设置初始仲裁大小。默认值:无 - 代理默认值将适用。
- dlqSingleActiveConsumer
-
设置为 true 以设置
x-single-active-consumerqueue 属性设置为 true。违约:
false - dlqTtl
-
声明时应用于死信队列的默认生存时间(以毫秒为单位)。
违约:
no limit - 持久订阅
-
订阅是否应持久。 仅当
group也设置了。违约:
true. - 交易所自动删除
-
如果
declareExchange为 true,则是否应自动删除交换(即在删除最后一个队列后删除)。违约:
true. - 交换耐用
-
如果
declareExchange是否为 true,则交换是否应该是持久的(即,它在代理重启后仍然存在)。违约:
true. - 交换类型
-
交易所类型:
direct,fanout,headers或topic对于未分区的目标和direct、标题或topic用于分区目标。违约:
topic. - 独家
-
是否创建专属消费者。 当这是
true. 通常在需要严格排序但允许热备用实例在故障后接管时使用。 看recoveryInterval,控制备用实例尝试使用的频率。 考虑使用singleActiveConsumer而是在使用 RabbitMQ 3.8 或更高版本时。违约:
false. - 到期
-
删除未使用的队列之前多长时间(以毫秒为单位)。
违约:
no expiration - failedDeclarationRetryInterval (失败声明重试间隔)
-
尝试从队列中消耗(如果缺少)之间的间隔(以毫秒为单位)。
默认值:5000
- 框架最大净空
-
将堆栈跟踪添加到 DLQ 消息标头时要为其他标头保留的字节数。 所有标头都必须适合
frame_maxsize 配置在代理上。 堆栈跟踪可能很大;如果大小加上此属性超过frame_max则堆栈跟踪将被截断。 将写入 WARN 日志;考虑增加frame_max或者通过捕获异常并抛出具有较小堆栈跟踪的异常来减少堆栈跟踪。默认值:20000
- headerPatterns
-
要从入站消息映射的标头的模式。
默认值:(所有标头)。
['*'] - 懒惰
-
使用
x-queue-mode=lazy论点。 请参阅“延迟队列”。 请考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。违约:
false. - 最大并发
-
使用者的最大数量。 当
containerType是direct.违约:
1. - 最大长度
-
队列中的最大消息数。
违约:
no limit - maxLength字节
-
队列中所有消息的最大总字节数。
违约:
no limit - 最大优先级
-
队列中消息的最大优先级 (0-255)。
违约:
none - missingQueues致命
-
当找不到队列时,是否将该条件视为致命并停止监听器容器。默认为
false以便容器不断尝试从队列中消费——例如,当使用集群并且托管非 HA 队列的节点关闭时。违约:
false - 溢出行为
-
在以下情况下要采取的作
maxLength或maxLengthBytes已超过;现在drop-head或reject-publish但请参阅 RabbitMQ 文档。违约:
none - 预取
-
预取计数。
违约:
1. - 前缀
-
要添加到
destination和队列。默认值:“”。
- queueBindingArguments
-
将队列绑定到交换时应用的参数;与
headersexchangeType以指定要匹配的标头。 例如…queueBindingArguments.x-match=any,…queueBindingArguments.someHeader=someValue.默认值:空
- queueDeclarationRetries
-
如果缺少队列,则重试从队列中使用的次数。 仅在以下情况下相关
missingQueuesFatal是true. 否则,容器会无限期地重试。 当containerType是direct.违约:
3 - 队列名称仅组
-
当 true 时,从名称等于
group. 否则队列名称为destination.group. 例如,当使用 Spring Cloud Stream 从现有 RabbitMQ 队列中消费时,这很有用。默认值:false。
- quorum.deliveryLimit
-
什么时候
quorum.enabled=true,设置传递限制,在此之后,邮件将被丢弃或死信。默认值:无 - 代理默认值将适用。
- 法定人数已启用
-
如果为 true,请创建仲裁队列而不是经典队列。
默认值:false
- quorum.initialQuorum大小
-
什么时候
quorum.enabled=true,设置初始仲裁大小。默认值:无 - 代理默认值将适用。
- 恢复间隔
-
连接恢复尝试之间的间隔(以毫秒为单位)。
违约:
5000. - requeue已拒绝
-
禁用重试时是否应将传递失败重新排队,或者
republishToDlq是false.违约:
false.
- 重新发布交付模式
-
什么时候
republishToDlq是true,指定重新发布的邮件的传递方式。违约:
DeliveryMode.PERSISTENT - 重新发布到Dlq
-
默认情况下,重试用尽后失败的邮件将被拒绝。 如果配置了死信队列 (DLQ),则 RabbitMQ 会将失败的消息(原封不动)路由到 DLQ。 如果设置为
true,则 Binder 会使用其他标头将失败的消息重新发布到 DLQ,包括异常消息和最终失败原因的堆栈跟踪。 另请参阅 frameMaxHeadroom 属性。默认值:false
- 单主动消费者
-
设置为 true 以设置
x-single-active-consumerqueue 属性设置为 true。违约:
false - 交易
-
是否使用交易通道。
违约:
false. - TTL的
-
声明时应用于队列的默认生存时间(以毫秒为单位)。
违约:
no limit - tx大小
-
确认之间的交付次数。 当
containerType是direct.违约:
1.
3.3. 高级侦听器容器配置
要设置未公开为绑定器或绑定属性的侦听器容器属性,请添加类型为ListenerContainerCustomizer到应用程序上下文。
将设置活页夹和绑定属性,然后调用定制器。
定制器 (configure()方法)与队列名称以及消费者组作为参数一起提供。
3.4. 高级队列/交换/绑定配置
RabbitMQ 团队会不时添加新功能,这些功能是通过在声明队列时设置一些参数来启用的。通常,通过添加适当的属性在绑定器中启用此类功能,但这在当前版本中可能无法立即可用。从版本 3.0.1 开始,您现在可以将DeclarableCustomizerbean(s) 添加到应用程序上下文中以修改Declarable (Queue,Exchange或Binding) 在执行声明之前。这允许您添加绑定器当前不直接支持的参数。
3.5. 接收批处理消息
通常,如果生产者绑定具有batch-enabled=true(参见 Rabbit Producer 属性),或者消息是由BatchingRabbitTemplate,批处理的元素将作为对侦听器方法的单独调用返回。从 3.0 版开始,任何此类批处理都可以表示为List<?>如果spring.cloud.stream.bindings.<name>.consumer.batch-mode设置为true.
3.6. 兔子生产者属性
以下属性仅适用于 Rabbit 生产者,并且必须以spring.cloud.stream.rabbit.bindings.<channelName>.producer..
但是,如果需要将同一组属性应用于大多数绑定,请将
避免重复,Spring Cloud Stream 支持为所有通道设置值,
以spring.cloud.stream.rabbit.default.<property>=<value>.
另外,请记住,绑定特定属性将覆盖其默认中的等效属性。
- 自动绑定Dlq
-
是否自动声明 DLQ 并将其绑定到绑定程序 DLX。
违约:
false. - 批处理已启用
-
是否由生产者启用消息批处理。 消息根据以下属性批处理为一条消息(在此列表中的接下来的三个条目中描述): 'batchSize',
batchBufferLimit和batchTimeout. 有关详细信息,请参阅批处理。 另请参阅接收批处理消息。违约:
false. - batch大小
-
启用批处理时要缓冲的消息数。
违约:
100. - batch缓冲限制
-
启用批处理时的最大缓冲区大小。
违约:
10000. - batchTimeout
-
启用批处理时的批处理超时。
违约:
5000. - bindingRoutingKey
-
用于将队列绑定到交换的路由密钥(如果
bindQueue是true). 可以是多个键 - 请参阅bindingRoutingKeyDelimiter. 对于分区目标,-n附加到每个键。 仅适用于以下情况requiredGroups提供,然后仅提供给这些组。违约:。
# - bindingRoutingKeyDelimiter
-
当它不为空时,'bindingRoutingKey' 被视为由此值分隔的键列表;通常使用逗号。 仅适用于以下情况
requiredGroups提供,然后仅提供给这些组。违约:
null. - 绑定队列
-
是否声明队列并将其绑定到目标交换。 将其设置为
false如果您已经设置了自己的基础设施,并且之前已经创建并绑定了队列。 仅适用于以下情况requiredGroups提供,然后仅提供给这些组。违约:
true. - 压缩
-
发送时是否应压缩数据。
违约:
false. - 确认AckChannel
-
什么时候
errorChannelEnabled为 true,则是向其发送正面投放确认(又称发布者确认)的通道。 如果通道不存在,则DirectChannel以此名称注册。 必须将连接工厂配置为启用发布者确认。违约:
nullChannel(acks 被丢弃)。 - deadLetter队列名称
-
DLQ 的名称 仅适用于以下情况
requiredGroups提供,然后仅提供给这些组。违约:
prefix+destination.dlq - 死信交换
-
要分配给队列的 DLX。 仅在以下情况下相关
autoBindDlq是true. 仅适用于以下情况requiredGroups提供,然后仅提供给这些组。默认值:“前缀+DLX”
- deadLetter交换类型
-
要分配给队列的 DLX 类型。 仅当
autoBindDlq是true. 仅适用于以下情况requiredGroups提供,然后仅提供给这些组。默认值:“直接”
- deadLetterRouting键
-
要分配给队列的死信路由键。 仅在以下情况下相关
autoBindDlq是true. 仅适用于以下情况requiredGroups提供,然后仅提供给这些组。违约:
destination - 声明Dlx
-
是否为目的地申报死信交换。 仅当
autoBindDlq是true. 设置为false如果您有预配置的 DLX。 仅适用于以下情况requiredGroups提供,然后仅提供给这些组。违约:
true. - 声明交换
-
是否声明目标的交换。
违约:
true. - 延迟表达式
-
用于评估要应用于消息的延迟的 SpEL 表达式 (
x-delay标头)。 如果交换不是延迟消息交换,则无效。默认值:否
x-delayheader 已设置。 - 延迟交换
-
是否将交易所声明为
Delayed Message Exchange. 需要代理上的延迟消息交换插件。 这x-delayed-type参数设置为exchangeType.违约:
false. - 交付模式
-
交付模式。
违约:
PERSISTENT. - dlqBinding参数
-
将 dlq 绑定到死信交换时应用的参数;与
headersdeadLetterExchangeType以指定要匹配的标头。 例如…dlqBindingArguments.x-match=any,…dlqBindingArguments.someHeader=someValue. 仅适用于以下情况requiredGroups提供,然后仅提供给这些组。默认值:空
- dlq死信交换
-
声明 DLQ 时,要分配给该队列的 DLX。仅在以下情况下适用
requiredGroups提供,然后仅提供给这些组。违约:
none - dlqDeadLetterRoutingKey
-
声明 DLQ 时,要分配给该队列的死信路由密钥。 仅适用于以下情况
requiredGroups提供,然后仅提供给这些组。违约:
none - dlq过期
-
删除未使用的死信队列之前多长时间(以毫秒为单位)。 仅适用于以下情况
requiredGroups提供,然后仅提供给这些组。违约:
no expiration - dlq懒惰
-
使用
x-queue-mode=lazy论点。 请参阅“延迟队列”。 请考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。 仅适用于以下情况requiredGroups提供,然后仅提供给这些组。 - dlq最大长度
-
死信队列中的最大消息数。 仅适用于以下情况
requiredGroups提供,然后仅提供给这些组。违约:
no limit - dlqMaxLength字节
-
所有消息的死信队列中的最大总字节数。 仅适用于以下情况
requiredGroups提供,然后仅提供给这些组。违约:
no limit - dlqMax优先级
-
死信队列中消息的最大优先级 (0-255) 仅适用于以下情况
requiredGroups提供,然后仅提供给这些组。违约:
none - dlqQuorum.deliveryLimit
-
什么时候
quorum.enabled=true,设置传递限制,在此之后,邮件将被丢弃或死信。 仅适用于以下情况requiredGroups提供,然后仅提供给这些组。默认值:无 - 代理默认值将适用。
- dlqQuorum.enabled
-
如果为 true,请创建仲裁死信队列,而不是经典队列。 仅适用于以下情况
requiredGroups提供,然后仅提供给这些组。默认值:false
- dlqQuorum.initialQuorum大小
-
什么时候
quorum.enabled=true,设置初始仲裁大小。 仅适用于以下情况requiredGroups提供,然后仅提供给这些组。默认值:无 - 代理默认值将适用。
- dlqSingleActiveConsumer
-
设置为 true 以设置
x-single-active-consumerqueue 属性设置为 true。 仅适用于以下情况requiredGroups提供,然后仅提供给这些组。违约:
false - dlqTtl
-
声明时应用于死信队列的默认生存时间(以毫秒为单位)。 仅适用于以下情况
requiredGroups提供,然后仅提供给这些组。违约:
no limit - 交易所自动删除
-
如果
declareExchange是true,交换是否应自动删除(在删除最后一个队列后将其删除)。违约:
true. - 交换耐用
-
如果
declareExchange是true,交易所是否应该持久(在代理重启后幸存)。违约:
true. - 交换类型
-
交易所类型:
direct,fanout,headers或topic对于未分区的目标和direct,headers或topic用于分区目标。违约:
topic. - 到期
-
删除未使用的队列之前的时间(以毫秒为单位)。仅适用于以下情况
requiredGroups提供,然后仅提供给这些组。违约:
no expiration - headerPatterns
-
要映射到出站邮件的标头的模式。
默认值:(所有标头)。
['*'] - 懒惰
-
使用
x-queue-mode=lazy论点。 请参阅“延迟队列”。 请考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。 仅适用于以下情况requiredGroups提供,然后仅提供给这些组。违约:
false. - 最大长度
-
队列中的最大消息数。 仅适用于以下情况
requiredGroups提供,然后仅提供给这些组。违约:
no limit - maxLength字节
-
队列中所有消息的最大总字节数。 仅适用于以下情况
requiredGroups提供,然后仅提供给这些组。违约:
no limit - 最大优先级
-
队列中消息的最大优先级 (0-255)。 仅适用于以下情况
requiredGroups提供,然后仅提供给这些组。违约:
none - 前缀
-
要添加到
destination交换。默认值:“”。
- queueBindingArguments
-
将队列绑定到交换时应用的参数;与
headersexchangeType以指定要匹配的标头。 例如…queueBindingArguments.x-match=any,…queueBindingArguments.someHeader=someValue. 仅适用于以下情况requiredGroups提供,然后仅提供给这些组。默认值:空
- 队列名称仅组
-
什么时候
true,从名称等于group. 否则队列名称为destination.group. 例如,当使用 Spring Cloud Stream 从现有 RabbitMQ 队列中消费时,这很有用。 仅适用于以下情况requiredGroups提供,然后仅提供给这些组。默认值:false。
- quorum.deliveryLimit
-
什么时候
quorum.enabled=true,设置传递限制,在此之后,邮件将被丢弃或死信。 仅适用于以下情况requiredGroups提供,然后仅提供给这些组。默认值:无 - 代理默认值将适用。
- 法定人数已启用
-
如果为 true,请创建仲裁队列而不是经典队列。 仅适用于以下情况
requiredGroups提供,然后仅提供给这些组。默认值:false
- quorum.initialQuorum大小
-
什么时候
quorum.enabled=true,设置初始仲裁大小。 仅适用于以下情况requiredGroups提供,然后仅提供给这些组。默认值:无 - 代理默认值将适用。
- 路由键表达式
-
用于确定发布消息时要使用的路由键的 SpEL 表达式。 对于固定路由键,请使用文字表达式,例如
routingKeyExpression='my.routingKey'在属性文件中或routingKeyExpression: '''my.routingKey'''在 YAML 文件中。违约:
destination或destination-<partition>用于分区目标。 - 单主动消费者
-
设置为 true 以设置
x-single-active-consumerqueue 属性设置为 true。 仅适用于以下情况requiredGroups提供,然后仅提供给这些组。违约:
false - 交易
-
是否使用交易通道。
违约:
false. - TTL的
-
声明时应用于队列的默认生存时间(以毫秒为单位)。 仅适用于以下情况
requiredGroups提供,然后仅提供给这些组。违约:
no limit
| 对于 RabbitMQ,内容类型标头可以由外部应用程序设置。 Spring Cloud Stream 将它们作为用于任何类型传输的扩展内部协议的一部分,包括本机不支持标头的传输,例如 Kafka(0.11 之前)。 |
4. 使用现有队列/交换
默认情况下,绑定程序将自动预配主题交换,其名称派生自目标绑定属性的值<prefix><destination>.
目标默认为绑定名称(如果未提供)。
绑定使用者时,将自动预配一个队列,名称为<prefix><destination>.<group>(如果groupbinding 属性),或者如果没有group.
队列将使用“match-all”通配符路由键 () 绑定到交换,用于非分区绑定或#<destination>-<instanceIndex>用于分区绑定。
前缀为空String默认情况下。
如果输出绑定指定为requiredGroups,将为每个组预配一个队列/绑定。
有许多特定于 rabbit 的绑定属性允许您修改此默认行为。
如果您希望使用现有的交换/队列,则可以完全禁用自动配置,如下所示,假设交换名为myExchange队列命名为myQueue:
-
spring.cloud.stream.bindings.<binding name>.destination=myExhange -
spring.cloud.stream.bindings.<binding name>.group=myQueue -
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindQueue=false -
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.declareExchange=false -
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.queueNameGroupOnly=true
如果您希望 Binder 预配队列/交换,但又想使用此处讨论的默认值以外的其他内容来执行此作,请使用以下属性。 有关更多信息,请参阅上面的属性文档。
-
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindingRoutingKey=myRoutingKey -
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.exchangeType=<type> -
spring.cloud.stream.rabbit.bindings.<binding name>.producer.routingKeyExpression='myRoutingKey'
声明死信交换/队列时使用类似的属性,当autoBindDlq是true.
5. 使用 RabbitMQ Binder 重试
在绑定程序中启用重试时,侦听器容器线程将在配置的任何回退期内挂起。 当需要对单个使用者进行严格排序时,这可能很重要。但是,对于其他用例,它会阻止在该线程上处理其他消息。 使用活页夹重试的替代方法是设置死信,并在死信队列 (DLQ) 上设置生存时间,以及在 DLQ 本身上设置死信配置。 有关此处讨论的属性的更多信息,请参阅“RabbitMQ Binder 属性”。 您可以使用以下示例配置来启用此功能:
-
设置
autoBindDlq自true. 活页夹创建 DLQ。 或者,您可以在deadLetterQueueName. -
设置
dlqTtl到您要在重新投放之间等待的回退时间。 -
将
dlqDeadLetterExchange到默认交易所。 来自 DLQ 的过期消息将路由到原始队列,因为默认的deadLetterRoutingKey是队列名称 (destination.group). 设置为默认交换是通过设置没有值的属性来实现的,如下一个示例所示。
要强制消息为死信,请抛出AmqpRejectAndDontRequeueException或将requeueRejected自true(默认值)并抛出任何异常。
循环无休止地继续下去,这对于暂时性问题来说很好,但您可能想在尝试几次后放弃。
幸运的是,RabbitMQ 提供了x-death标头,可让您确定发生了多少个周期。
要在放弃后确认消息,请抛出一个ImmediateAcknowledgeAmqpException.
5.1. 把它们放在一起
以下配置创建交换myDestination带队列myDestination.consumerGroup绑定到具有通配符路由密钥的主题交换:#
---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---
此配置创建绑定到直接交换 (DLX),路由键为myDestination.consumerGroup.
当消息被拒绝时,它们将被路由到 DLQ。
5秒后,消息过期,使用队列名称作为路由键路由到原始队列,如以下示例所示:
@SpringBootApplication
@EnableBinding(Sink.class)
public class XDeathApplication {
public static void main(String[] args) {
SpringApplication.run(XDeathApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String in, @Header(name = "x-death", required = false) Map<?,?> death) {
if (death != null && death.get("count").equals(3L)) {
// giving up - don't send to DLX
throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
}
throw new AmqpRejectAndDontRequeueException("failed");
}
}
请注意,count 属性中的x-deathheader 是一个Long.
6. 错误通道
从 1.3 版开始,绑定器无条件地将异常发送到每个使用者目标的错误通道,也可以配置为将异步生产者发送失败发送到错误通道。 有关更多信息,请参阅“[spring-cloud-stream-overview-error-handling]”。
RabbitMQ 有两种类型的发送失败:
-
返回的消息,
-
负面确认的出版商确认。
后者很少见。 根据 RabbitMQ 文档,“只有当负责队列的 Erlang 进程中发生内部错误时,才会传递 [a nack]。
除了启用生产者错误通道(如“[spring-cloud-stream-overview-error-handling]”中所述)外,RabbitMQ 绑定器仅在正确配置连接工厂时才向通道发送消息,如下所示:
-
ccf.setPublisherConfirms(true); -
ccf.setPublisherReturns(true);
对连接工厂使用 Spring Boot 配置时,请设置以下属性:
-
spring.rabbitmq.publisher-confirms -
spring.rabbitmq.publisher-returns
的有效负载ErrorMessage对于返回的消息,是ReturnedAmqpMessageException具有以下属性:
-
failedMessage:春季消息Message<?>未能发送。 -
amqpMessage:原始 spring-amqpMessage. -
replyCode:指示失败原因的整数值(例如,312 - 无路由)。 -
replyText:指示失败原因的文本值(例如,NO_ROUTE). -
exchange:将消息发布到的交换。 -
routingKey:发布邮件时使用的路由密钥。
对于否定确认,有效负载是NackedAmqpMessageException具有以下属性:
-
failedMessage:春季消息Message<?>未能发送。 -
nackReason:原因(如果可用 - 您可能需要检查代理日志以获取更多信息)。
没有自动处理这些异常(例如发送到死信队列)。您可以使用自己的 Spring Integration 流使用这些异常。
7. 死信队列处理
因为您无法预测用户希望如何处理死信消息,所以该框架没有提供任何标准机制来处理它们。如果死信的原因是暂时的,您可能希望将消息路由回原始队列。但是,如果问题是永久性问题,则可能会导致无限循环。以下 Spring Boot 应用程序显示了一个示例,说明如何将这些消息路由回原始队列,但在三次尝试后将它们移动到第三个“停车场”队列。第二个示例使用 RabbitMQ 延迟消息交换为重新排队的消息引入延迟。在此示例中,每次尝试的延迟都会增加。这些示例使用@RabbitListener接收来自 DLQ 的消息。
您还可以使用RabbitTemplate.receive()在批处理中。
这些示例假定原始目标为so8400in消费群体是so8400.
7.1. 非分区目标
前两个示例适用于未对目标进行分区的情况:
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String DELAY_EXCHANGE = "dlqReRouter";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
headers.put("x-delay", 5000 * retriesHeader);
this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public DirectExchange delayExchange() {
DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
exchange.setDelayed(true);
return exchange;
}
@Bean
public Binding bindOriginalToDelay() {
return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
7.2. 分区目标
使用分区目标时,所有分区都有一个 DLQ。我们从标头确定原始队列。
7.2.1.republishToDlq=false
什么时候republishToDlq是false,RabbitMQ 将消息发布到 DLX/DLQ,并使用x-death标头,其中包含有关原始目标的信息,如以下示例所示:
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_DEATH_HEADER = "x-death";
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@SuppressWarnings("unchecked")
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
String exchange = (String) xDeath.get(0).get("exchange");
List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
7.2.2.republishToDlq=true
什么时候republishToDlq是true,则重新发布恢复器会将原始交换和路由密钥添加到标头,如以下示例所示:
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;
private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
8. 使用 RabbitMQ Binder 进行分区
RabbitMQ 原生不支持分区。
有时,将数据发送到特定分区是有利的 — 例如,当您想要严格排序消息处理时,特定客户的所有消息都应转到同一分区。
这RabbitMessageChannelBinder通过将每个分区的队列绑定到目标交换来提供分区。
以下 Java 和 YAML 示例显示了如何配置生产者:
@SpringBootApplication
@EnableBinding(Source.class)
public class RabbitPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"abc1", "def1", "qux1",
"abc2", "def2", "qux2",
"abc3", "def3", "qux3",
"abc4", "def4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
.web(false)
.run(args);
}
@InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
public Message<?> generate() {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
}
}
spring:
cloud:
stream:
bindings:
output:
destination: partitioned.destination
producer:
partitioned: true
partition-key-expression: headers['partitionKey']
partition-count: 2
required-groups:
- myGroup
|
前面示例中的配置使用默认分区 ( 这 |
以下配置预配主题交换:
以下队列绑定到该交换:
以下绑定将队列与交换相关联:
以下 Java 和 YAML 示例延续了前面的示例,并展示了如何配置使用者:
@SpringBootApplication
@EnableBinding(Sink.class)
public class RabbitPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
.web(false)
.run(args);
}
@StreamListener(Sink.INPUT)
public void listen(@Payload String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
System.out.println(in + " received from queue " + queue);
}
}
spring:
cloud:
stream:
bindings:
input:
destination: partitioned.destination
group: myGroup
consumer:
partitioned: true
instance-index: 0
这RabbitMessageChannelBinder不支持动态缩放。
每个分区必须至少有一个使用者。
消费者的instanceIndex用于指示使用哪个分区。
Cloud Foundry 等平台只能有一个实例,具有instanceIndex. |