2026/4/3 23:51:54
网站建设
项目流程
招商网站建设需要什么,微趋道小程序免费注册,课程设计登录页面,wordpress tdk消息队列消费场景下#xff0c;经常会发生预期外的异常#xff0c;比如#xff1a;消息处理时业务报错#xff08;比如数据格式异常或下游服务短暂不可用#xff09;、业务处理消息耗时超过ack最大等待时间等。为应对这些场景#xff0c;Pulsar 提供了消息重试和死信机制…消息队列消费场景下经常会发生预期外的异常比如消息处理时业务报错比如数据格式异常或下游服务短暂不可用、业务处理消息耗时超过ack最大等待时间等。为应对这些场景Pulsar 提供了消息重试和死信机制通过消费者客户端不同的配置在处理消息出现异常时可以实现有限重试和无限重试两种效果。有限重试有限重试是利用 Pulsar 的重试队列和死信队列机制保证业务的最终一致性。当某些消息第一次被消费者消费后没有得到正常的回应则会进入重试 Topic 中当重试达到一定次数后停止重试投递到死信 Topic 中。当消息进入到死信队列中一般这时就需要人为介入来处理这批消息。可以通过编写专门的客户端来订阅死信 Topic处理这批之前处理失败的消息。实现原理客户端处理消息失败后调用consumer.reconsumeLater接口开始走重试策略。首先客户端检查消息对应的重试次数如果达到指定的最大重试次数消息被投递到死信队列投递到死信队列的消息不会自动消费如果需要用户自己创建额外的消费者进行消费如果没有达到最大重试次数消费被投递到重试队列。重试间隔是通过延迟消息实现的投递到重试队列的实际上是一个延迟消息延迟时间就是用户在reconsumeLater中指定的时间。使用重试队列实现自动重试的关键点总结发送到 Retry Topic消息被发送到 Retry Topic并设置 deliverAfter(delayTime, unit) 延迟投递自动 ACK发送成功后通过 doAcknowledge() 自动确认原始消息原消息状态原始 Topic 中的消息变为已确认Acknowledged状态延迟重试延迟时间到达后消费者会从 Retry Topic 收到该消息注意事项当使用 Token 访问重试/死信队列时需要为消费者所使用角色赋予生产消息权限。代码示例自动重试的代码示例消费过程中出现某些异常进入重试 Topic 重试最后进入死信Topic中。注如果消费的消息 ack 超时会触发重新投递Redelivery消息会从原 Topic 重新发送给消费者消费者参数设置注意要重试队列和死信队列的topic需要在hulk云平台创建好并给token配置读写权限。PulsarClient pulsarClient Constant.getPulsarClient(); ConsumerString consumer pulsarClient.newConsumer(Schema.STRING) .topic(persistent://my-property/my-ns/test_retry_p2) .subscriptionName(sub1) .subscriptionType(SubscriptionType.Key_Shared) .enableRetry(true)//开启重试消费 .ackTimeout(30, TimeUnit.SECONDS) // 设置为最大处理时间的 2-3 倍 .ackTimeoutTickTime(5, TimeUnit.SECONDS) // 检查粒度默认 1 秒 .deadLetterPolicy(DeadLetterPolicy.builder() .maxRedeliverCount(3)//可以指定最大重试次数 .retryLetterTopic(persistent://my-property/my-ns/sub1-retry) //指定重试队列 .deadLetterTopic(persistent://my-property/my-ns/sub1-dlq) //指定死信队列 .build()) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); while (true) { MessageString message null; try { message consumer.receive(); log.info(Received message: {}, Properties{}, message.getValue(), message.getProperties()); doBusinessMaybeThrowException(message); consumer.acknowledge(message); log.info(Ack: {},{}, message.getTopicName(), message.getMessageId()); } catch (Exception e) { log.error(Consumer exception:{}, e.getMessage(), e); consumer.reconsumeLater(message, 2, TimeUnit.SECONDS);//延迟重试 } }Message对象中有个字段 property包含了重试相关的属性{ REAL_TOPICpersistent://my-property/my-ns/real-topic, #原 Topic REAL_SUBSCRIPTIONsub_topic1, ORIGIN_MESSAGE_ID8143097:5:0, #最初生产的消息 ID ORIGIN_MESSAGE_IDY_TIME8143097:5:0, DELAY_TIME1000, RECONSUMETIMES2 #消息重试的次数 }可以使用属性中的重试的次数实现指数级回退重试。无限重试无限重试是指客户端在处理消息失败后主动发一条否定应答让服务端重新推送。如果一直发送否定应答服务端会一直重推因此实现无限重试的效果。仅需2步即可开启无限重试1、初始化consumer时指定重试间隔negativeAckRedeliveryDelay默认1min2、捕获业务异常对处理失败的消息发送否定应答consumer.negativeAcknowledge以下为主动重试的代码示例Consumerbyte[] consumer client.newConsumer() .topic(persistent://my-property/my-ns/real-topic) .subscriptionName(my-subscription) .subscriptionType(SubscriptionType.Key_Shared) .ackTimeout(30, TimeUnit.SECONDS) // 设置为最大处理时间的 2-3 倍 .ackTimeoutTickTime(5, TimeUnit.SECONDS) // 检查粒度默认 1 秒 .negativeAckRedeliveryDelay(1, TimeUnit.MINUTES) // 默认1min .subscribe(); while (true) { MessageString message null; try { message consumer.receive(); log.info(Received message: {}, Redelivery count{}, message.getValue(), message.getRedeliveryCount()); doBusinessMaybeThrowException(message); consumer.acknowledge(message); log.info(ack: {},{}, message.getTopicName(), message.getMessageId()); } catch (Exception e) { log.error(consumer exception, e); //否定应答 consumer.negativeAcknowledge(message); } }值得注意的是当消费者 unack 的消息过多时Broker 会停止向消费者发送增量消息而是一直推送 unack 的消息。直到 unack 的消息消息数量低于阈值才会继续推送新消息。默认单个消费者unack消息数量上限是5万订阅的 unack 消息数量上限是 20万参考文档[1]. Pulsar 消费者客户端官方文档 https://pulsar.apache.org/docs/4.0.x/client-libraries-consumers/