计算机系统应用教程网站

网站首页 > 技术文章 正文

一文搞懂消息推送技术选型 消息推送的几种实现方式

btikc 2024-10-17 08:46:24 技术文章 6 ℃ 0 评论


  1. Ajax短轮询
  2. MQ
  3. redis 订阅/发布

Ajax短轮询

优点:

简单高效、浏览器使用循环不断地、间隔地发送请求获取数据

缺点:

频繁创建/断开连接,每次请求都会查询一遍数据不管有无都返回,对服务器业务处理的性能有很大的需求和压力;因为请求间有间隔时间,获取的数据是伪实时的,不适应对实时性要求很高的项目。

典型运用:

扫码登录

MQ

MQ的引入虽然 会造成技术的复杂度提升,但是合理的使用会极大的提高系统的 容错能力。

  • 优点:
  • 一般MQ都用于 系统解耦、流量削峰、数据分发
  • 缺点:

如果MQ服务挂了,导致消息发送和接收就无法使用了

复杂度提高。

MQ的对比

特性

ActiveMQ

RabbitMQ

RocketMQ

Kafka

单机吞吐量

万级,吞吐量比RocketMQ和Kafka要低了一个数量级

万级,吞吐量比RocketMQ和Kafka要低了一个数量级

10万级,RocketMQ也是可以支撑高吞吐的一种MQ

10万级别,这是kafka最大的优点,就是吞吐量高。 一般配合大数据类的系统来进行实时数据计算、日志采集等场景

topic数量对吞吐量的影响



topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降 这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topic

topic从几十个到几百个的时候,吞吐量会大幅度下降 所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源

时效性

ms级

微秒级,这是rabbitmq的一大特点,延迟是最低的

ms级

延迟在ms级以内

可用性

高,基于主从架构实现高可用性

高,基于主从架构实现高可用性

非常高,分布式架构

非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用

消息可靠性

有较低的概率丢失数据

有较低的概率丢失数据

经过参数优化配置,可以做到0丢失

经过参数优化配置,消息可以做到0丢失

rabbitmq 基于Elang语言编写 虽然提供了天然的高并发能力,但是 不利于 深入了解与掌握。

MQ的引入 需要保证两点:可靠性、高可用、幂等性。

mq想如果需要保证可靠性、在某些 对于实时性要求较高的 业务中,那么需要对消息进行持久化、以及保证消息的不丢失。

可靠性

结合三点就是生产者丢失消息、mq自身丢失消息、消费者丢失消息

MQ 若要保障消息的不丢失,对于rabbitmq来讲,常用的有两种方式:

1、开启事务

// 开启事务
channel.txSelect
try {
// 这里发送消息
} catch (Exception e) {
channel.txRollback
// 这里再次重发这条消息
}
// 提交事务
channel.txCommit

但是 对于rabbitmq 来说 开启事务 造成性能上的 浪费是很大的

消息数量

开启事务

未开启事务

10w

320796ms

10246ms

开启事务 与不开启事务 对于 性能上的开销是 320倍。因为 其被 @Transaction注解 标注过,对于每条消息都会被事务拦截器拦截处理。

2、ACK机制

关闭自动ACK,使用手动ACK。RabbitMQ中有一个ACK机制,默认情况下消费者接收到到消息,RabbitMQ会自动提交ACK,之后这条消息就不会再发送给消费者了。我们可以更改为手动ACK模式,每次处理完消息之后,再手动ack一下。不过这样可能会出现刚处理完还没手动ack确认,消费者挂了,导致消息重复消费。

spring:
rabbitmq:
addresses: 127.0.0.1
port: 5672
username: guest
password: guest
# 发送者开启 confirm 确认机制
publisher-confirm-type: correlated
# 发送者开启 return 确认机制
publisher-returns: true
listener:
simple:
concurrency: 10
max-concurrency: 10
prefetch: 1
auto-startup: true
default-requeue-rejected: true
# 设置消费端手动 ack
acknowledge-mode: manual
# 是否支持重试
retry:
enabled: true

@RabbitHandler
public void handlerMq(String msg, Channel channel, Message message) throws IOException {
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重复处理失败,拒绝再次接收...", e);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
} else {
log.error("消息即将再次返回队列处理...", e);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}

}

rabbitq提供了两个回调方法

confirm 与return 回调

confirm 是用于生产者发送消息,保证交换机exchange能正常收到,但是无法保证 从exchange的消息 正常发送给队列去消费。

return回调是处理一些 不可正确路由的消息,如exchange 不存在,或者就是路由key 无法正确找到队列。

这两种机制 是可靠性的 重要保障,可以保证消息正常的在mq中传递。

@Component
@Slf4j
public class RabbitMQConfirmAndReturn implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
/**
* confirm机制只保证消息到达exchange,不保证消息可以路由到正确的queue,如果exchange错误,就会触发confirm机制
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("rabbitmq confirm fail,cause:{}", cause);
}
}
/**
* Return 消息机制用于处理一个不可路由的消息。在某些情况下,如果我们在发送消息的时候,当前的 exchange 不存在或者指定路由 key 路由不到,这个时候我们需要监听这种不可达的消息
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("mq消息不可达,message:{},replyCode:{},replyText:{},exchange:{},routing:{}", message.toString(), replyCode, replyText, exchange, routingKey);
String messageId = message.getMessageProperties().getMessageId();
}
}

MQ的幂等性搭建:

ACK机制能保证消息一定能被消费但是无法保证消息被消息了几次,这就需要额外编码来保证幂等性,而rabbitmq没有提供额外的幂等操作需要额外代码保证。

MQ的高可用

rabbitmq的消息是储存在一个节点中,让mq的节点崩溃后 其存储的消息就会丢失,会造成服务的不可用,如果使用缓存使用一个持久化的queue,但是在message发送并写入磁盘之间会存在一个虽然短暂的时间差。

为了避免节点失效,将mq节点进行集群处理,当一个节点失效后 就有第二个节点接替前一个节点工作。单失效的那个节点上的消息无法被找回。

镜像队列的配置:

rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
-p Vhost: 可选参数,针对指定vhost下的queue进行设置
Name: policy的名称
Pattern: queue的匹配模式(正则表达式)
Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
ha-mode:指n明镜像队列的模式,有效值为 all/exactly/nodes
all:表示在集群中所有的节点上进行镜像
exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
ha-params:ha-mode模式需要用到的参数
ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
priority:可选参数,policy的优先级

rabbitmqctl set_policy --priority 0 --apply-to queues mirror_queue "^queue_" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

可以通过下面命令判断那些slaves已经完成同步

rabbitmqctl list_queues name slave_pids synchronised_slave_pids

镜像队列的原理:

redis 订阅/通知

优缺点:

redis的 订阅通知 与rabbitmq相比,其优势体现在不想要搭建复杂笨重的 MQ ,简单轻量。但是由于redis没有类似于mq的消息持久化与ACK的保证,所以redis实现的发布/订阅功能并不可靠,仅适用于实时、且可靠性不高的场景(因为redis的订阅/发布目前是发送即忘的形式,如果客户端短线即会丢失)。如一些列消息的弹窗通知、有效期等等。

实现方式之一:

redis的键空间通知

配置:

  1. 首先找到redis.conf配置文件,打开文件,查找notify-keyspace-events,将前面的#去掉便可。注意:这里配置的是notify-keyspace-events的Ex参数,即说明,当键过时的时候会触发通知,若是只须要哈希命令键触发通知则能够设置为notify-keyspace-events Eh。
  2. 重启redis-server。
  3. 配置完成。

redis:
localhost: localhost
port: 6379
database: 7
password:
# 过期事件订阅,接收7号数据库中所有key的过期事件
listen-pattern: __keyevent@7__:expired

@Configuration
public class RedisListenerConfiguration {
@Value("${spring.redis.listen-pattern}")
public String pattern;
@Bean
public RedisMessageListenerContainer listenerContainer(RedisConnectionFactory redisConnection) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnection);
/**
* Topic是消息发布(Pub)者和订阅(Sub)者之间的传输中介
*/
Topic topic = new PatternTopic(this.pattern);
container.addMessageListener(new RedisMessageListener(), topic);
return container;
}
}

监听:
public class RedisMessageListener implements MessageListener {
/**
* Redis 事件监听回调
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {

}
}

当向redis订阅一个 过期时间的时候,当key过期的时候 redis会发送一个通知高速服务器,key事件已过期,然后 服务器可以执行自己的相关逻辑,可以在key过期的时候 执行一系列操作

4】三种通信方式的优缺点


短轮询

长轮询

WebSocket

浏览器支持

几乎所有现代浏览器

几乎所有现代浏览器

IE 10+ Edge Firefox 4+ Chrome 4+ Safari 5+ Opera 11.5+

服务器负载

较少的CPU资源,较多的内存资源和带宽资源

与传统轮询相似,但是占用带宽较少

无需循环等待(长轮询),CPU和内存资源不以客户端数量衡量,而是以客户端事件数衡量。三种方式里性能最佳。

客户端负载

占用较多的内存资源与请求数。

与传统轮询相似。

同Server-Sent Event。

延迟

非实时,延迟取决于请求间隔。

同传统轮询。

实时。

实现复杂度

非常简单。

需要服务器配合,客户端实现非常简单。

需要Socket程序实现和额外端口,客户端实现简单。

技术方案的选型:

关于业务场景,如果并发量不大,请求频率不高的情况下 选用轮询难度实现上小很多,而且容错率更高。如果在频繁请求资源,一次请求无法返回所有数据的情况下 适合使用websocket。具体需要看业务场景决定。

MQ的典型应用:

哔哩哔哩的弹幕技术架构

  • Kafka(第三方服务)
    消息队列系统。Kafka 是一个分布式的基于发布/订阅的消息系统,它是支持水平扩展的。每条发布到 Kafka 集群的消息都会打上一个名为 Topic(逻辑上可以被认为是一个 queue)的类别,起到消息分布式分发的作用。
  • Router
    存储消息。Comet 将信息传送给 Logic 之后,Logic 会对所收到的信息进行存储,采用 register session 的方式在 Router 上进行存储。Router 里面会收录用户的注册信息,这样就可以知道用户是与哪个机器建立的连接。
  • Logic
    对消息进行逻辑处理。用户建立连接之后会将消息转发给 Logic ,在 Logic 上可以进行账号验证。当然,类似于 IP 过滤以及黑名单设置此类的操作也可以经由 Logic 进行。
  • Comet
    维护客户端长链接。在上面可以规定一些业务需求,比如可以规定用户传送的信息的内容、输送用户信息等。Comet 提供并维持服务端与客户端之间的链接,这里保证链接可用性的方法主要是发送链接协议(如 Socket 等)。
  • Client
    客户端。与 Comet 建立链接。
  • Jop
    消息分发。可以起多个 Jop 模块放到不同的机器上进行覆盖,将消息收录之后,分发到所有的 Comet 上,之后再由 Comet 转发出去。

MQ的实现延迟的方式

rabbitmq:

rabbitmq并没有提供 原生的 延迟队列的实现方式,如果要实现延迟的效果可以使用 死信队列的方式

“死信”是RabbitMQ中的一种消息机制,死信是当MQ出现以下情况的时候:

  1. 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
  2. 消息在队列的存活时间超过设置的TTL时间。
  3. 消息队列的消息数量已经超过最大队列长度

如何配置死信队列

  1. 配置业务队列,绑定到业务交换机上
  2. 为业务队列配置死信交换机和路由key
  3. 为死信交换机配置死信队列

@Configuration
public class RabbitMQConfig {
public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";
public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea";
public static final String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb";
public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";
public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey";
public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey";
public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea";
public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb";
// 声明业务Exchange
@Bean("businessExchange")
public FanoutExchange businessExchange(){
return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
}
// 声明死信Exchange
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 声明业务队列A
@Bean("businessQueueA")
public Queue businessQueueA(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
}
// 声明死信队列A
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA(){
return new Queue(DEAD_LETTER_QUEUEA_NAME);
}
// 声明业务队列A绑定关系
@Bean
public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
// 声明死信队列A绑定关系
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
}


最后总结:选用哪种推送技术需要根据具体的业务场景,一句话一切脱离业务的设计都是耍流氓,不能杀鸡用牛刀,也不能不考虑未来。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表