计算机系统应用教程网站

网站首页 > 技术文章 正文

从放弃到入门:消息队列之RocketMQ

btikc 2025-01-23 15:32:33 技术文章 18 ℃ 0 评论


一、消息队列(MQ)基本概念

1、什么是 MQ ?

MQ:message queen,消息队列(先进先出的数据结构)

2、应用场景

① 应用解耦

系统的耦合性越高,容错率越低。比如一个订单系统,包含支付系统,库存系统,物流系统,如果耦合调用,任何一个子系统出现故障或者升级等原因不可用,会造成下单异常,影响用户体验。


② 流量削峰

比如“秒杀活动”,系统请求流量瞬间猛增,消息队列可以把大量请求缓存起来,分散到很长时间去处理,从而提高系统稳定性和用户体验。

③ 数据分发

需求总是在不断变化,比如 A 系统 需要向 B,C,D,E 分发数据,如果 D, E系统有调整,需要修改 A 系统代码。

3、MQ 的优缺点

① 优点

解耦、削峰、数据分发

② 缺点

I、系统可用性降低

系统引入的外部依赖越多,系统稳定性越差。如果 MQ 宕机,业务会影响。

II、系统复杂度提高

MQ 增加了系统的复杂度,未使用前是同步远程调用,使用后异步调用。

III、一致性问题

A 系统处理完业务,通过 MQ 分发 B、C、D、E,如何保证 B、C、D、E之间的一致性问题

4、各种 MQ 产品比较

二、RocketMQ

RocketMQ 是阿里 2016 年 MQ 中间件,使用 Java 语言开发

1、RocketMQ 环境

2020 最新版本是 4.7.1

2、启动 RocketMQ

① 启动 NameServer

② 启动 Broker

3、RocketMQ 集群角色

官方文档:http://jm.taobao.org/2017/01/12/rocketmq-quick-start-in-10-minutes/


四个组成部分:

① Name Server:管理 Broker,类似服务员;

Name Server 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

② Broker:暂存和传输信息,类似菜单;

Broker 部署相对复杂,Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master,Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。每个 Broker 与 Name Server 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 Name Server。

③ Producer 集群:消息的发送者,类似厨师;

Producer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。

④ Consumer 集群:消息的接受者,类似顾客;

Consumer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。

⑤ Topic :区分消息的种类

一个发送者可以发消息给 1-N 个 Topic,一个消息的接受者可以订阅 1-N 个 Topic 消息

⑥ Message Queue :相当于 Topic 的子类别,用于并行发送和接收消息


4、RocketMQ 集群架构(2m-2s同步双写)

工作流程:

① 启动 NameServer,NameServer 启动成功监听端口,等待 Broker、Producer、Consumer 连接,相当于路由中心;

② Broker 启动,跟所有 NameServer 保持长连接,定时发送心跳包(包含 Broker 信息以及存储的所有 Topic 信息)。注册成功后,NameServer 集群就有 Topic 跟 Broker 的映射关系;

③ 收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 存储在那些 Broker 上,也可以在发送消息时自动创建(启动 broker 需要加上 autoCreateTopicEnable=true );

④ Producer 发送消息,启动时先跟 NameServer 集群中的一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接,从而向 Broker 发消息;

⑤ Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 中,然后直接跟 Broker 建立通道,开始消费消息。

5、RocketMQ 可视化界面

Github 项目地址:https://github.com/apache/rocketmq-externals

application.properties 修改成自己的配置

执行编译后的 jar 包 :java -jar 包名

6、Java RocketMQ 使用

① Maven 依赖

② 生产者发送消息步骤

  • 创建 Producer,并制定生产者组名
  • 制定 NameServer 地址
  • 启动 Producer
  • 创建消息对象,指定主题 Topic 、Tag 和 消息体
  • 发送消息
  • 关闭 Producer

③ 消费者消费消息步骤

  • 创建 Consumer,并制定消费者组名
  • 制定 NameServer 地址
  • 订阅主题 Topic 和 Tag
  • 设置回调函数,处理消息
  • 启动 Consumer

7、SpringBoot 集成 RocketMQ 同步发送消息

以下展示的是 MQ 发送同步消息,可靠性高,使用场景:重要消息通知,短信通知

provider 项目

I. application.properties

II、RocketMQ MQProducerConfiguration 配置类

III、发送消息

打印日志:

8、SpringBoot 集成 RocketMQ 异步发送消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间等待 Broker 的响应,可靠性比同步低。

9、发送单向消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送


10、SpringBoot 集成 RocketMQ 同步接受消息

Consumer

I. application.properties

II、RocketMQ MQConsumerConfiguration 配置类

注意:消息类有 2 种,一是 BROADCASTING(广播模式),二是 CLUSTERING(默认是集群模式)

当 Consumer 使用集群模式时,每条消息只会被 Consumer 集群内的任意一个 Consumer 实例消费一次。

当 Consumer 使用广播模式时,每条消息都会被 Consumer 集群内所有的 Consumer 实例消费一次。

III、消费消息

打印日志:

欢迎关注 @Python 大星 ,一个会点 Python 的 Java 程序员。如果你有更好的想法,欢迎留言,一起探讨,想说你就说啊!后面继续分享 Java 的相关开发,人少就散了吧!@Python 大星

@Python 大星 | 文


本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表