2026/5/31 22:28:54
网站建设
项目流程
百度蜘蛛开发网站,批量查询网站是否正常,门户网站开发费用,网站建设摊销方法RabbitMQ 的文章之前写过#xff0c;但是当时给的示例是 Demo 版的#xff0c;这篇文章主要是结合之前写的理论知识#xff0c;将 RabbitMQ 集成到技术派项目中。
话不多说#xff0c;上文章目录#xff1a; 下面我们先回顾一下理论知识#xff0c;如果对这块知识已经清…RabbitMQ 的文章之前写过但是当时给的示例是 Demo 版的这篇文章主要是结合之前写的理论知识将 RabbitMQ 集成到技术派项目中。话不多说上文章目录下面我们先回顾一下理论知识如果对这块知识已经清楚的同学可以直接跳到实战部分。1. 消息队列1.1 消息队列模式消息队列目前主要 2 种模式分别为“点对点模式”和“发布/订阅模式”。点对点模式一个具体的消息只能由一个消费者消费多个生产者可以向同一个消息队列发送消息但是一个消息在被一个消息者处理的时候这个消息在队列上会被锁住或者被移除并且其他消费者无法处理该消息。需要额外注意的是如果消费者处理一个消息失败了消息系统一般会把这个消息放回队列这样其他消费者可以继续处理。发布/订阅模式单个消息可以被多个订阅者并发的获取和处理。一般来说订阅有两种类型临时ephemeral订阅这种订阅只有在消费者启动并且运行的时候才存在。一旦消费者退出相应的订阅以及尚未处理的消息就会丢失。持久durable订阅这种订阅会一直存在除非主动去删除。消费者退出后消息系统会继续维护该订阅并且后续消息可以被继续处理。1.2 RabbitMQ 特征消息路由支持RabbitMQ可以通过不同的交换器支持不同种类的消息路由消息有序不支持当消费消息时如果消费失败消息会被放回队列然后重新消费这样会导致消息无序消息时序非常好通过延时队列可以指定消息的延时时间过期时间TTL等容错处理非常好通过交付重试和死信交换器DLX来处理消息处理故障伸缩一般伸缩其实没有非常智能因为即使伸缩了master queue还是只有一个负载还是只有这一个master queue去抗所以我理解RabbitMQ的伸缩很弱个人理解。持久化不太好没有消费的消息可以支持持久化这个是为了保证机器宕机时消息可以恢复但是消费过的消息就会被马上删除因为RabbitMQ设计时就不是为了去存储历史数据的。消息回溯支持因为消息不支持永久保存所以自然就不支持回溯。高吞吐中等因为所有的请求的执行最后都是在master queue它的这个设计导致单机性能达不到十万级的标准。2. RabbitMQ 原理初探RabbitMQ 2007 年发布是使用 Erlang 语言开发的开源消息队列系统基于 AMQP 协议来实现。2.1 基本概念提到RabbitMQ就不得不提AMQP协议。AMQP协议是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议是应用层协议的一个开放标准为面向消息的中间件设计。先了解一下AMQP协议中间的几个重要概念Server接收客户端的连接实现AMQP实体服务。Connection连接应用程序与Server的网络连接TCP连接。Channel信道消息读写等操作在信道中进行。客户端可以建立多个信道每个信道代表一个会话任务。Message消息应用程序和服务器之间传送的数据消息可以非常简单也可以很复杂。由Properties和Body组成。Properties为外包装可以对消息进行修饰比如消息的优先级、延迟等高级特性Body就是消息体内容。Virtual Host虚拟主机用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue同一个虚拟主机里面不能有相同名称的Exchange或Queue。Exchange交换器接收消息按照路由规则将消息路由到一个或者多个队列。如果路由不到或者返回给生产者或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种后面详细介绍。Binding绑定交换器和消息队列之间的虚拟连接绑定中可以包含一个或者多个RoutingKey。RoutingKey路由键生产者将消息发送给交换器的时候会发送一个RoutingKey用来指定路由规则这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串例如“com.rabbitmq”。Queue消息队列用来保存消息供消费者消费。2.2 工作原理AMQP 协议模型由三部分组成生产者、消费者和服务端执行流程如下生产者是连接到 Server建立一个连接开启一个信道。生产者声明交换器和队列设置相关属性并通过路由键将交换器和队列进行绑定。消费者也需要进行建立连接开启信道等操作便于接收消息。生产者发送消息发送到服务端中的虚拟主机。虚拟主机中的交换器根据路由键选择路由规则发送到不同的消息队列中。订阅了消息队列的消费者就可以获取到消息进行消费。2.3 常用交换器RabbitMQ常用的交换器类型有direct、topic、fanout、headers四种Direct Exchange见文知意直连交换机意思是此交换机需要绑定一个队列要求该消息与一个特定的路由键完全匹配。简单点说就是一对一的点对点的发送。Fanout Exchange这种类型的交换机需要将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播每台子网内的主机都获得了一份复制的消息。简单点说就是发布订阅。Topic Exchange直接翻译的话叫做主题交换机如果从用法上面翻译可能叫通配符交换机会更加贴切。这种交换机是使用通配符去匹配路由到对应的队列。通配符有两种* 、 #。需要注意的是通配符前面必须要加上.符号。*符号有且只匹配一个词。比如 a.*可以匹配到a.b、a.c但是匹配不了a.b.c。#符号匹配一个或多个词。比如rabbit.#既可以匹配到rabbit.a.b、rabbit.a也可以匹配到rabbit.a.b.c。Headers Exchange这种交换机用的相对没这么多。它跟上面三种有点区别它的路由不是用routingKey进行路由匹配而是在匹配请求头中所带的键值进行路由。创建队列需要设置绑定的头部信息有两种模式全部匹配和部分匹配。如上图所示交换机会根据生产者发送过来的头部信息携带的键值去匹配队列绑定的键值路由到对应的队列。3. RabbitMQ环境搭建因为我用的是Mac所以直接可以参考官网https://www.rabbitmq.com/install-homebrew.html需要注意的是一定需要先执行brew update然后再执行brew install rabbitmq之前没有执行brew update直接执行brew install rabbitmq时会报各种各样奇怪的错误其中“403 Forbidde”居多。但是在执行“brew install rabbitmq”会自动安装其它的程序如果你使用源码安装Rabbitmq因为启动该服务依赖erlang环境所以你还需手动安装erlang但是目前官方已经一键给你搞定会自动安装Rabbitmq依赖的所有程序是不是很棒最后执行成功的输出如下启动服务# 启动方式1后台启动 brew services start rabbitmq # 启动方式2当前窗口启动 cd /usr/local/Cellar/rabbitmq/3.8.19 rabbitmq-server在浏览器输入http://localhost:15672/会出现RabbitMQ后台管理界面用户名和密码都为guest通过brew安装一行命令搞定真香4. RabbitMQ 集成4.1 前置工作添加账号## 添加账号 ./rabbitmqctl add_user admin admin ## 添加访问权限 ./rabbitmqctl set_permissions -p / admin .* .* .* ## 设置超级权限 ./rabbitmqctl set_user_tags admin administratorpom 引入依赖dependency groupIdcom.rabbitmq/groupId artifactIdamqp-client/artifactId version5.5.1/version /dependency4.2 代码实现核心代码先整一个 ConnectionFactory 单例每台机器都有自己的 ConnectionFactory防止每次都初始化在后面的迭代中我会把这个去掉整成连接池。/** * author Louzai * date 2023/5/10 */ public class RabbitmqUtil { /** * 每个key都有自己的工厂 */ private static MapString, ConnectionFactory executors new ConcurrentHashMap(); /** * 初始化一个工厂 * * param host * param port * param username * param passport * param virtualhost * return */ public static ConnectionFactory init(String host, Integer port, String username, String passport, String virtualhost) { ConnectionFactory factory new ConnectionFactory(); factory.setHost(host); factory.setPort(port); factory.setUsername(username); factory.setPassword(passport); factory.setVirtualHost(virtualhost); return factory; } /** * 工厂单例每个key都有属于自己的工厂 * * param key * param host * param port * param username * param passport * param virtualhost * return */ public static ConnectionFactory getOrInitConnectionFactory(String key, String host, Integer port, String username, String passport, String virtualhost) { ConnectionFactory connectionFactory executors.get(key); if (null connectionFactory) { synchronized (RabbitmqUtil.class) { connectionFactory executors.get(key); if (null connectionFactory) { connectionFactory init(host, port, username, passport, virtualhost); executors.put(key, connectionFactory); } } } return connectionFactory; } }获取 RabbitmqClient/** * author Louzai * date 2023/5/10 */ Component public class RabbitmqClient { Autowired private RabbitmqProperties rabbitmqProperties; /** * 创建一个工厂 * param key * return */ public ConnectionFactory getConnectionFactory(String key) { String host rabbitmqProperties.getHost(); Integer port rabbitmqProperties.getPort(); String userName rabbitmqProperties.getUsername(); String password rabbitmqProperties.getPassport(); String virtualhost rabbitmqProperties.getVirtualhost(); return RabbitmqUtil.getOrInitConnectionFactory(key, host, port, userName,password, virtualhost); } }重点敲黑板这里就是 RabbmitMQ 的核心逻辑了。我们使用的交换机类型是 Direct Exchange此交换机需要绑定一个队列要求该消息与一个特定的路由键完全匹配简单点说就是一对一的点对点的发送。至于为什么不用广播和主题交换机模式因为技术派的使用场景就是发送单个消息点到点发送和消费的模式完全可以满足我们的需求。下面 3 个方法都很简单发送消息拿到工厂 - 创建链接 - 创建通道 - 声明交换机 - 发送消息 - 关闭链接消费消息拿到工厂 - 创建链接 - 创建通道 - 确定消息队列 - 绑定队列到交换机 - 接受并消费消息消费消息永动模式非阻塞模式消费 RabbitMQ 消息。Component public class RabbitmqServiceImpl implements RabbitmqService { Autowired private RabbitmqClient rabbitmqClient; Autowired private NotifyService notifyService; Override public void publishMsg(String exchange, BuiltinExchangeType exchangeType, String toutingKey, String message) throws IOException, TimeoutException { ConnectionFactory factory rabbitmqClient.getConnectionFactory(toutingKey); // TODO: 这种并发量起不来需要改造成连接池 //创建连接 Connection connection factory.newConnection(); //创建消息通道 Channel channel connection.createChannel(); // 声明exchange中的消息为可持久化不自动删除 channel.exchangeDeclare(exchange, exchangeType, true, false, null); // 发布消息 channel.basicPublish(exchange, toutingKey, null, message.getBytes()); System.out.println(Publish msg: message); channel.close(); connection.close(); } Override public void consumerMsg(String exchange, String queue, String routingKey) throws IOException, TimeoutException { ConnectionFactory factory rabbitmqClient.getConnectionFactory(routingKey); // TODO: 这种并发量起不来需要改造成连接池 //创建连接 Connection connection factory.newConnection(); //创建消息信道 final Channel channel connection.createChannel(); //消息队列 channel.queueDeclare(queue, true, false, false, null); //绑定队列到交换机 channel.queueBind(queue, exchange, routingKey); Consumer consumer new DefaultConsumer(channel) { Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message new String(body, UTF-8); System.out.println(Consumer msg: message); // 获取Rabbitmq消息并保存到DB // 说明这里仅作为示例如果有多种类型的消息可以根据消息判定简单的用 if...else 处理复杂的用工厂 策略模式 notifyService.saveArticleNotify(JsonUtil.toObj(message, UserFootDO.class), NotifyTypeEnum.PRAISE); channel.basicAck(envelope.getDeliveryTag(), false); } }; // 取消自动ack channel.basicConsume(queue, false, consumer); } Override public void processConsumerMsg() { System.out.println(Begin to processConsumerMsg.); Integer stepTotal 1; Integer step 0; // TODO: 这种方式非常 Low后续会改造成阻塞 I/O 模式 while (true) { step ; try { System.out.println(processConsumerMsg cycle.); consumerMsg(CommonConstants.EXCHANGE_NAME_DIRECT, CommonConstants.QUERE_NAME_PRAISE, CommonConstants.QUERE_KEY_PRAISE); if (step.equals(stepTotal)) { Thread.sleep(10000); step 0; } } catch (Exception e) { } } } }这里只是给个示例如果要真正用到生产环境你觉得有哪些问题呢 你自己先想想文末再告诉你。调用入口其实之前我们是通过 Java 的内置异步调用方式为了方便验证我把文章点赞的功能迁移到 RabbitMQ 中只要是点赞就走 RabbitMQ 模式。// 点赞消息走 RabbitMQ其它走 Java 内置消息机制 if (notifyType.equals(NotifyTypeEnum.PRAISE) rabbitmqProperties.getSwitchFlag()) { rabbitmqService.publishMsg( CommonConstants.EXCHANGE_NAME_DIRECT, BuiltinExchangeType.DIRECT, CommonConstants.QUERE_KEY_PRAISE, JsonUtil.toStr(foot)); } else { Optional.ofNullable(notifyType).ifPresent(notify - SpringUtil.publishEvent(new NotifyMsgEvent(this, notify, foot))); }那消费入口放哪里呢其实是在程序启动的时候我们就启动 RabbitMQ 进行消费然后整个进程一直在程序中跑。Override public void run(ApplicationArguments args) { // 设置类型转换, 主要用于mybatis读取varchar/json类型数据据并写入到json格式的实体Entity中 JacksonTypeHandler.setObjectMapper(new ObjectMapper()); // 应用启动之后执行 GlobalViewConfig config SpringUtil.getBean(GlobalViewConfig.class); if (webPort ! null) { config.setHost(http://127.0.0.1: webPort); } // 启动 RabbitMQ 进行消费 if (rabbitmqProperties.getSwitchFlag()) { taskExecutor.execute(() - rabbitmqService.processConsumerMsg()); } log.info(启动成功点击进入首页: {}, config.getHost()); }4.3 演示一下我们多次点击“点赞”按钮触发 RammitMQ 消息发送。可以通过日志也可以看到发送和消费过的消息。我靠好多没有关闭的链接。。。还有一堆没有关闭的 channel。。。估计再多跑一会内存全部吃光机器就死机了怎么破答案是连接池4.4 代码分支为了方便大家学习功能演变的过程每个模块都会单独开个分支包括后面的升级版代码仓库https://github.com/itwanger/paicoding代码分支feature/add_rabbitmq_20230506如果需要运行 RabbitMQ下面的配置需要改成 true因为代码默认是 false。5 后记这篇文章让大家知道 RabbitMQ 的基本原理以及如何去集成 RabbitMQ但是还不能用到实际生产环境但是这个确实是我写的第一个版本存粹是搞着玩的因为里面存在的问题还非常多。我简单列举一下需要给 Connection 加个连接池否则内存会持续消耗机器肯定扛不住需要对 RabbitMQ 的消费方式进行改造因为 while sleep 的方式过于简单粗暴假如消费的任务挂掉了你需要有重启 RabbitMQ 的消费机制假如机器挂了重启后RabbitMQ 内部的消息不能丢失。如果你对上面的问题也非常感兴趣可以直接基于分支 feature/add_rabbitmq_20230506然后给我提 PR技术嘛我喜欢边玩边学。预告一下我后面会给 RabbitMQ 加个连接池代码已经写完了还是用 ChatGPT 帮忙完成的下一篇文章发出敬请期待