做推送的网站有哪些网页制作公司广州
2026/6/1 10:40:47 网站建设 项目流程
做推送的网站有哪些,网页制作公司广州,双11各大电商平台销售数据,安装百度一下Kafka本身只保证单个分区内的消息是有序的作者#xff1a;淘书创始人摘要Kafka本身只保证单个分区内的消息是有序的在 Kafka 集群环境下#xff0c;当多个相同的服务实例同时消费消息时#xff0c;确保消息落地到数据库时能保持原有的顺序性。这是分布式消费中非常常见且关键…Kafka本身只保证单个分区内的消息是有序的作者淘书创始人摘要Kafka本身只保证单个分区内的消息是有序的在 Kafka 集群环境下当多个相同的服务实例同时消费消息时确保消息落地到数据库时能保持原有的顺序性。这是分布式消费中非常常见且关键的需求。问题分析与核心思路要保证落库顺序性核心是让需要保证顺序的消息始终由同一个消费实例处理并在消费端和数据库端做好配套的顺序控制。具体拆解为以下关键步骤1. Kafka 层面保证分区内消息的有序性Kafka 本身只保证单个分区内的消息是有序的跨分区无法保证。因此第一步是将需要保证顺序的消息发送到同一个分区•发送消息时指定相同的key比如业务ID订单ID、用户IDKafka 会根据key的哈希值将消息路由到固定分区。•示例所有属于同一个订单的消息都用订单ID作为key发送确保进入同一个分区。2. 消费端单分区单线程消费即使消息进入同一个分区如果消费端用多线程消费该分区依然会打乱顺序。因此•每个消费实例的每个分区仅用单线程消费Kafka Consumer 默认就是单线程消费分区只需避免手动开启多线程消费单个分区。•多个服务实例会消费不同的分区由 Kafka 消费者组自动分配但单个分区始终只被一个实例消费保证分区内顺序。3. 数据库层面防止并发写入打乱顺序即使消费端顺序消费若数据库写入时存在并发比如消费线程异步落库仍可能导致顺序错乱。需通过以下方式控制•单线程落库消费到消息后不异步写入而是同步按顺序落库最简单直接。•数据库锁/事务若必须异步可针对业务ID加行锁如SELECT ... FOR UPDATE确保同一业务ID的写入串行执行。•版本号/时间戳校验在数据库表中增加版本号version或消息时间戳字段落库前校验当前版本是否为预期值避免旧消息覆盖新消息。完整实现示例以下是基于 Java Spring Kafka 的完整示例最常用的技术栈涵盖消息生产、消费、落库的全流程顺序控制1. 生产者指定业务Key发送消息import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class OrderProducer { private static final String TOPIC_NAME order_topic; public static void main(String[] args) { Properties props new Properties(); props.put(bootstrap.servers, kafka1:9092,kafka2:9092,kafka3:9092); props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer); props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer); KafkaProducerString, String producer new KafkaProducer(props); // 模拟发送同一个订单的3条消息指定订单ID为key确保进入同一分区 String orderId ORDER_001; producer.send(new ProducerRecord(TOPIC_NAME, orderId, 订单创建ORDER_001)); producer.send(new ProducerRecord(TOPIC_NAME, orderId, 订单支付ORDER_001)); producer.send(new ProducerRecord(TOPIC_NAME, orderId, 订单完成ORDER_001)); producer.close(); } }2. 消费者单线程消费 同步落库import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.jdbc.core.JdbcTemplate; import javax.sql.DataSource; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class OrderConsumer { private static final String TOPIC_NAME order_topic; private final JdbcTemplate jdbcTemplate; // 初始化数据库连接 public OrderConsumer(DataSource dataSource) { this.jdbcTemplate new JdbcTemplate(dataSource); } public void startConsume() { Properties props new Properties(); props.put(bootstrap.servers, kafka1:9092,kafka2:9092,kafka3:9092); props.put(group.id, order_consumer_group); // 消费者组 props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer); props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer); // 关键禁止自动提交offset确保消息落库成功后再提交避免重复消费 props.put(enable.auto.commit, false); // 关键每次poll的最大消息数避免单次拉取过多导致顺序错乱 props.put(max.poll.records, 100); KafkaConsumerString, String consumer new KafkaConsumer(props); consumer.subscribe(Collections.singletonList(TOPIC_NAME)); while (true) { // 拉取消息单线程 ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100)); for (var record : records) { String orderId record.key(); String message record.value(); long offset record.offset(); // 记录offset用于提交 try { // 核心同步落库确保顺序 insertToDb(orderId, message); // 落库成功后手动提交offset按分区提交保证顺序 consumer.commitSync(); } catch (Exception e) { // 落库失败不提交offset下次重新消费 System.err.println(消息落库失败orderId orderId 错误 e.getMessage()); // 可选重试机制避免直接丢弃 retryInsert(orderId, message, 3); } } } } // 同步落库方法单线程执行 private void insertToDb(String orderId, String message) { String sql INSERT INTO order_message (order_id, content, create_time) VALUES (?, ?, NOW()); // 若需更强的顺序保证可加行锁SELECT * FROM order_message WHERE order_id? FOR UPDATE jdbcTemplate.update(sql, orderId, message); } // 重试落库 private void retryInsert(String orderId, String message, int retryCount) { int count 0; while (count retryCount) { try { insertToDb(orderId, message); return; } catch (Exception e) { count; try { Thread.sleep(1000 * count); // 指数退避重试 } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } } // 重试失败可记录到死信队列 System.err.println(重试落库失败orderId orderId); } public static void main(String[] args) { // 初始化数据源实际项目中用Spring配置 DataSource dataSource getDataSource(); OrderConsumer consumer new OrderConsumer(dataSource); consumer.startConsume(); } private static DataSource getDataSource() { // 配置数据库连接示例MySQL org.apache.tomcat.dbcp.dbcp2.BasicDataSource ds new org.apache.tomcat.dbcp.dbcp2.BasicDataSource(); ds.setUrl(jdbc:mysql://db:3306/test?useUnicodetruecharacterEncodingutf8); ds.setUsername(root); ds.setPassword(123456); return ds; } }3. 数据库表设计增加版本号强化顺序CREATE TABLE order_message ( id bigint NOT NULL AUTO_INCREMENT, order_id varchar(64) NOT NULL COMMENT 订单ID业务Key, content varchar(255) NOT NULL COMMENT 消息内容, version int NOT NULL DEFAULT 0 COMMENT 版本号用于顺序校验, create_time datetime NOT NULL, PRIMARY KEY (id), UNIQUE KEY uk_order_version (order_id, version) COMMENT 确保同一订单的版本号唯一 ) ENGINEInnoDB DEFAULT CHARSETutf8;落库时可基于版本号递增确保顺序// 改进后的insertToDb方法基于版本号保证顺序 private void insertToDb(String orderId, String message) { // 1. 获取当前订单的最大版本号 Integer maxVersion jdbcTemplate.queryForObject( SELECT IFNULL(MAX(version), 0) FROM order_message WHERE order_id?, new Object[]{orderId}, Integer.class ); int newVersion maxVersion 1; // 2. 插入新消息版本号1 String sql INSERT INTO order_message (order_id, content, version, create_time) VALUES (?, ?, ?, NOW()); jdbcTemplate.update(sql, orderId, message, newVersion); }关键补充说明消费者组配置多个服务实例必须加入同一个消费者组Kafka 会自动将分区均匀分配给组内实例确保单个分区仅被一个实例消费。避免重复消费关闭自动提交 offset仅在落库成功后手动提交防止消息落库失败但 offset 已提交导致的丢失或重复提交导致的重复消费。性能与顺序的平衡单分区单线程消费会限制吞吐量若需高性能可按业务维度拆分多个分区比如按订单ID尾号分10个分区每个分区独立保证顺序整体提升并发。异常处理落库失败时需重试重试失败则写入死信队列避免阻塞消费死信队列可单独处理人工介入修复后重新发送。总结要保证 Kafka 多实例消费时落库的顺序性核心要点如下Kafka 层面通过相同的业务 Key 将需顺序的消息路由到同一个分区利用 Kafka 分区内有序的特性。消费层面单个分区仅由一个消费实例的单线程消费关闭自动提交 offset落库成功后再手动提交。数据库层面同步落库或加行锁/版本号确保同一业务ID的消息串行写入避免并发打乱顺序。这三个环节缺一不可既利用了 Kafka 的分区特性保证消费顺序又通过数据库的控制确保落库顺序是分布式场景下保证顺序性的标准方案。原文链接 https://1024bat.cn/article/78来源 淘书1024bat

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询