网站首页 > 技术文章 正文
- Ajax短轮询
- MQ
- redis 订阅/发布
Ajax短轮询
优点:
简单高效、浏览器使用循环不断地、间隔地发送请求获取数据
缺点:
频繁创建/断开连接,每次请求都会查询一遍数据不管有无都返回,对服务器业务处理的性能有很大的需求和压力;因为请求间有间隔时间,获取的数据是伪实时的,不适应对实时性要求很高的项目。
典型运用:
扫码登录
MQ
MQ的引入虽然 会造成技术的复杂度提升,但是合理的使用会极大的提高系统的 容错能力。
- 优点:
- 一般MQ都用于 系统解耦、流量削峰、数据分发
- 缺点:
如果MQ服务挂了,导致消息发送和接收就无法使用了
复杂度提高。
MQ的对比
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
单机吞吐量 | 万级,吞吐量比RocketMQ和Kafka要低了一个数量级 | 万级,吞吐量比RocketMQ和Kafka要低了一个数量级 | 10万级,RocketMQ也是可以支撑高吞吐的一种MQ | 10万级别,这是kafka最大的优点,就是吞吐量高。 一般配合大数据类的系统来进行实时数据计算、日志采集等场景 |
topic数量对吞吐量的影响 | topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降 这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topic | topic从几十个到几百个的时候,吞吐量会大幅度下降 所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源 | ||
时效性 | ms级 | 微秒级,这是rabbitmq的一大特点,延迟是最低的 | ms级 | 延迟在ms级以内 |
可用性 | 高,基于主从架构实现高可用性 | 高,基于主从架构实现高可用性 | 非常高,分布式架构 | 非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 | 有较低的概率丢失数据 | 有较低的概率丢失数据 | 经过参数优化配置,可以做到0丢失 | 经过参数优化配置,消息可以做到0丢失 |
rabbitmq 基于Elang语言编写 虽然提供了天然的高并发能力,但是 不利于 深入了解与掌握。
MQ的引入 需要保证两点:可靠性、高可用、幂等性。
mq想如果需要保证可靠性、在某些 对于实时性要求较高的 业务中,那么需要对消息进行持久化、以及保证消息的不丢失。
可靠性
结合三点就是生产者丢失消息、mq自身丢失消息、消费者丢失消息
MQ 若要保障消息的不丢失,对于rabbitmq来讲,常用的有两种方式:
1、开启事务
// 开启事务
channel.txSelect
try {
// 这里发送消息
} catch (Exception e) {
channel.txRollback
// 这里再次重发这条消息
}
// 提交事务
channel.txCommit
但是 对于rabbitmq 来说 开启事务 造成性能上的 浪费是很大的
消息数量 | 开启事务 | 未开启事务 |
10w | 320796ms | 10246ms |
开启事务 与不开启事务 对于 性能上的开销是 320倍。因为 其被 @Transaction注解 标注过,对于每条消息都会被事务拦截器拦截处理。
2、ACK机制
关闭自动ACK,使用手动ACK。RabbitMQ中有一个ACK机制,默认情况下消费者接收到到消息,RabbitMQ会自动提交ACK,之后这条消息就不会再发送给消费者了。我们可以更改为手动ACK模式,每次处理完消息之后,再手动ack一下。不过这样可能会出现刚处理完还没手动ack确认,消费者挂了,导致消息重复消费。
spring:
rabbitmq:
addresses: 127.0.0.1
port: 5672
username: guest
password: guest
# 发送者开启 confirm 确认机制
publisher-confirm-type: correlated
# 发送者开启 return 确认机制
publisher-returns: true
listener:
simple:
concurrency: 10
max-concurrency: 10
prefetch: 1
auto-startup: true
default-requeue-rejected: true
# 设置消费端手动 ack
acknowledge-mode: manual
# 是否支持重试
retry:
enabled: true
@RabbitHandler
public void handlerMq(String msg, Channel channel, Message message) throws IOException {
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重复处理失败,拒绝再次接收...", e);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
} else {
log.error("消息即将再次返回队列处理...", e);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
rabbitq提供了两个回调方法
confirm 与return 回调
confirm 是用于生产者发送消息,保证交换机exchange能正常收到,但是无法保证 从exchange的消息 正常发送给队列去消费。
return回调是处理一些 不可正确路由的消息,如exchange 不存在,或者就是路由key 无法正确找到队列。
这两种机制 是可靠性的 重要保障,可以保证消息正常的在mq中传递。
@Component
@Slf4j
public class RabbitMQConfirmAndReturn implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
/**
* confirm机制只保证消息到达exchange,不保证消息可以路由到正确的queue,如果exchange错误,就会触发confirm机制
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("rabbitmq confirm fail,cause:{}", cause);
}
}
/**
* Return 消息机制用于处理一个不可路由的消息。在某些情况下,如果我们在发送消息的时候,当前的 exchange 不存在或者指定路由 key 路由不到,这个时候我们需要监听这种不可达的消息
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("mq消息不可达,message:{},replyCode:{},replyText:{},exchange:{},routing:{}", message.toString(), replyCode, replyText, exchange, routingKey);
String messageId = message.getMessageProperties().getMessageId();
}
}
MQ的幂等性搭建:
ACK机制能保证消息一定能被消费但是无法保证消息被消息了几次,这就需要额外编码来保证幂等性,而rabbitmq没有提供额外的幂等操作需要额外代码保证。
MQ的高可用
rabbitmq的消息是储存在一个节点中,让mq的节点崩溃后 其存储的消息就会丢失,会造成服务的不可用,如果使用缓存使用一个持久化的queue,但是在message发送并写入磁盘之间会存在一个虽然短暂的时间差。
为了避免节点失效,将mq节点进行集群处理,当一个节点失效后 就有第二个节点接替前一个节点工作。单失效的那个节点上的消息无法被找回。
镜像队列的配置:
rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
-p Vhost: 可选参数,针对指定vhost下的queue进行设置
Name: policy的名称
Pattern: queue的匹配模式(正则表达式)
Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
ha-mode:指n明镜像队列的模式,有效值为 all/exactly/nodes
all:表示在集群中所有的节点上进行镜像
exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
ha-params:ha-mode模式需要用到的参数
ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
priority:可选参数,policy的优先级
rabbitmqctl set_policy --priority 0 --apply-to queues mirror_queue "^queue_" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
可以通过下面命令判断那些slaves已经完成同步
rabbitmqctl list_queues name slave_pids synchronised_slave_pids
镜像队列的原理:
redis 订阅/通知
优缺点:
redis的 订阅通知 与rabbitmq相比,其优势体现在不想要搭建复杂笨重的 MQ ,简单轻量。但是由于redis没有类似于mq的消息持久化与ACK的保证,所以redis实现的发布/订阅功能并不可靠,仅适用于实时、且可靠性不高的场景(因为redis的订阅/发布目前是发送即忘的形式,如果客户端短线即会丢失)。如一些列消息的弹窗通知、有效期等等。
实现方式之一:
redis的键空间通知
配置:
- 首先找到redis.conf配置文件,打开文件,查找notify-keyspace-events,将前面的#去掉便可。注意:这里配置的是notify-keyspace-events的Ex参数,即说明,当键过时的时候会触发通知,若是只须要哈希命令键触发通知则能够设置为notify-keyspace-events Eh。
- 重启redis-server。
- 配置完成。
redis:
localhost: localhost
port: 6379
database: 7
password:
# 过期事件订阅,接收7号数据库中所有key的过期事件
listen-pattern: __keyevent@7__:expired
@Configuration
public class RedisListenerConfiguration {
@Value("${spring.redis.listen-pattern}")
public String pattern;
@Bean
public RedisMessageListenerContainer listenerContainer(RedisConnectionFactory redisConnection) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnection);
/**
* Topic是消息发布(Pub)者和订阅(Sub)者之间的传输中介
*/
Topic topic = new PatternTopic(this.pattern);
container.addMessageListener(new RedisMessageListener(), topic);
return container;
}
}
监听:
public class RedisMessageListener implements MessageListener {
/**
* Redis 事件监听回调
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
}
}
当向redis订阅一个 过期时间的时候,当key过期的时候 redis会发送一个通知高速服务器,key事件已过期,然后 服务器可以执行自己的相关逻辑,可以在key过期的时候 执行一系列操作
4】三种通信方式的优缺点
短轮询 | 长轮询 | WebSocket | |
浏览器支持 | 几乎所有现代浏览器 | 几乎所有现代浏览器 | IE 10+ Edge Firefox 4+ Chrome 4+ Safari 5+ Opera 11.5+ |
服务器负载 | 较少的CPU资源,较多的内存资源和带宽资源 | 与传统轮询相似,但是占用带宽较少 | 无需循环等待(长轮询),CPU和内存资源不以客户端数量衡量,而是以客户端事件数衡量。三种方式里性能最佳。 |
客户端负载 | 占用较多的内存资源与请求数。 | 与传统轮询相似。 | 同Server-Sent Event。 |
延迟 | 非实时,延迟取决于请求间隔。 | 同传统轮询。 | 实时。 |
实现复杂度 | 非常简单。 | 需要服务器配合,客户端实现非常简单。 | 需要Socket程序实现和额外端口,客户端实现简单。 |
技术方案的选型:
关于业务场景,如果并发量不大,请求频率不高的情况下 选用轮询难度实现上小很多,而且容错率更高。如果在频繁请求资源,一次请求无法返回所有数据的情况下 适合使用websocket。具体需要看业务场景决定。
MQ的典型应用:
哔哩哔哩的弹幕技术架构
- Kafka(第三方服务)
消息队列系统。Kafka 是一个分布式的基于发布/订阅的消息系统,它是支持水平扩展的。每条发布到 Kafka 集群的消息都会打上一个名为 Topic(逻辑上可以被认为是一个 queue)的类别,起到消息分布式分发的作用。 - Router
存储消息。Comet 将信息传送给 Logic 之后,Logic 会对所收到的信息进行存储,采用 register session 的方式在 Router 上进行存储。Router 里面会收录用户的注册信息,这样就可以知道用户是与哪个机器建立的连接。 - Logic
对消息进行逻辑处理。用户建立连接之后会将消息转发给 Logic ,在 Logic 上可以进行账号验证。当然,类似于 IP 过滤以及黑名单设置此类的操作也可以经由 Logic 进行。 - Comet
维护客户端长链接。在上面可以规定一些业务需求,比如可以规定用户传送的信息的内容、输送用户信息等。Comet 提供并维持服务端与客户端之间的链接,这里保证链接可用性的方法主要是发送链接协议(如 Socket 等)。 - Client
客户端。与 Comet 建立链接。 - Jop
消息分发。可以起多个 Jop 模块放到不同的机器上进行覆盖,将消息收录之后,分发到所有的 Comet 上,之后再由 Comet 转发出去。
MQ的实现延迟的方式
rabbitmq:
rabbitmq并没有提供 原生的 延迟队列的实现方式,如果要实现延迟的效果可以使用 死信队列的方式
“死信”是RabbitMQ中的一种消息机制,死信是当MQ出现以下情况的时候:
- 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
- 消息在队列的存活时间超过设置的TTL时间。
- 消息队列的消息数量已经超过最大队列长度
如何配置死信队列
- 配置业务队列,绑定到业务交换机上
- 为业务队列配置死信交换机和路由key
- 为死信交换机配置死信队列
@Configuration
public class RabbitMQConfig {
public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";
public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea";
public static final String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb";
public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";
public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey";
public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey";
public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea";
public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb";
// 声明业务Exchange
@Bean("businessExchange")
public FanoutExchange businessExchange(){
return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
}
// 声明死信Exchange
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 声明业务队列A
@Bean("businessQueueA")
public Queue businessQueueA(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
}
// 声明死信队列A
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA(){
return new Queue(DEAD_LETTER_QUEUEA_NAME);
}
// 声明业务队列A绑定关系
@Bean
public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
// 声明死信队列A绑定关系
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
}
最后总结:选用哪种推送技术需要根据具体的业务场景,一句话一切脱离业务的设计都是耍流氓,不能杀鸡用牛刀,也不能不考虑未来。
猜你喜欢
- 2024-10-17 一次教会你如何解决RabbitMQ消息丢失问题
- 2024-10-17 RabbitMQ消息可靠性分析和应用 rabbitmq消息数据类型
- 2024-10-17 基于本地消息表实现MQ最终一致性 本地消息表(异步确保)
- 2024-10-17 MQ怎么确保不丢数据 mq防止数据丢失
- 2024-10-17 Java面试必备!RabbitMQ 常用知识点总结,纯手绘23张图带你拿下
- 2024-10-17 SpringBoot+RabbitMQ 实现 RPC 调用
- 2024-10-17 RabbitMQ消息更多细节 rabbitmq消息堆积怎么解决
- 2024-10-17 springboot+rabbitmq+消息发送确认
- 2024-10-17 Rabbitmq消费端实战 rabbitmq官网
- 2024-10-17 每日学习~RabbitMQ消息应答机制 rabbit mq五种消息模型
你 发表评论:
欢迎- 11-19零基础学习!数据分析分类模型「支持向量机」
- 11-19机器学习 | 算法笔记(三)- 支持向量机算法以及代码实现
- 11-19我以前一直没有真正理解支持向量机,直到我画了一张图
- 11-19研一小姑娘分享机器学习之SVM支持向量机
- 11-19[机器学习] sklearn支持向量机
- 11-19支持向量机
- 11-19初探支持向量机:用大白话解释、原理详解、Python实现
- 11-19支持向量机的核函数
- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)