网站首页 > 技术文章 正文
写了一个小demo, 用于发送和接收消息,下面创建一个工作队列,向多个消费者分发耗时的任务
图是这样子的
代码学习
生产者
package com.tgb.kwy.workqueues;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
/**
* Description
*生产
* @author kongwy 15732621629@163.com
* @version 1.0
* @date 2018-07-08-20 -58
*/
public class NewTask {
private static final String TASK_QUEUE_NAME="task_queue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.159.132");/*设置rabbitmq所在主机ip或主机名*/
/*指定用户名和密码*/
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
String message = getMessage(args);
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
private static String getMessage(String[] strings){
if(strings.length<1){
return "Hello World!";
}
return joinStrings(strings," ");
}
private static String joinStrings(String[] strings,String delimiter){
int length=strings.length;
if(length==0){
return "";
}
StringBuilder words=new StringBuilder(strings[0]);
for (int i = 0; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
消费者代码
package com.tgb.kwy.workqueues;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* Description
*
* @author kongwy 15732621629@163.com
* @version 1.0
* @date 2018-07-08-21 -03
*/
public class Worker {
private static final String TASK_QUEUE_NAME="task_queue";
public static void main(String[] args) throws Exception{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.159.132");/*设置rabbitmq所在主机ip或主机名*/
/*指定用户名和密码*/
factory.setUsername("admin");
factory.setPassword("admin");
final Connection connection=factory.newConnection();
final Channel channel=connection.createChannel();
//声明队列,主要为了防止消息接受者先运行此程序,队列还不存在时创建队列
channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//channel.basicQos(1); // accept only one unack-ed message at a time (see below)
int prefetchCount=1;
channel.basicQos(prefetchCount);
final Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumeTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{
String message=new String(body,"UTF-8");
System.out.println(" [x] Received '" + message + "'");
try{
doWork(message);
}finally {
System.out.println("[x] Done");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME,autoAck,consumer);
}
private static void doWork(String task){
for(char ch: task.toCharArray()){
if(ch == '.'){
try {
Thread.sleep(1000);
}catch (InterruptedException _ignored){
Thread.currentThread().interrupt();
}
}
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
运行结果
任务分发
轮询分发
rabbitmq将逐个发送消息到序列中的写一个消费者,平均每个消费者获得的数量是相同的
Fair dispatch
如果这里有两个消费者的话,可能一个消费者非常的忙碌,但是另一个消费者,几乎一直闲着.但是rabbitmq,还是会均匀地发送消息. 因为当消息进入队列后, rabbitmq就会分配消息,不会看消费者未确认消息的数量.所以为了解决这个问题,就使用了basicQos(prefetchCount=1)方法,来限制rabbitmq只发不超过1条的消息给同一个消费者.当消费者处理完毕后,得到反馈,再发送下一次
注:代码来自官网.官网对于概念性的内容,讲解的还是很清楚的
- 上一篇: RabbitMQ消息反序列化失败问题回顾
- 下一篇: RabbitMQ原理和架构图解(附6大工作模式)
猜你喜欢
- 2024-10-02 「MQ实战」RabbitMQ 延迟队列,消息延迟推送
- 2024-10-02 未读消息(小红点),RabbitMQ实时消息推送实践,贼简单
- 2024-10-02 RabbitMQ——延迟队列,消息延迟推送
- 2024-10-02 RabbitMQ实现即时通讯居然如此简单
- 2024-10-02 RabbitMQ没有延时队列?我就教你一招,玩转延时队列
- 2024-10-02 RabbitMQ详解,用心看完这一篇就够了
- 2024-10-02 RabbitMQ原理与相关操作(三)消息持久化
- 2024-10-02 Java互联网架构-互联网大厂面试必备RabbitMQ
- 2024-10-02 RabbitMQ的介绍及使用进阶(Docker+.Net Core)
- 2024-10-02 快速尝鲜:RabbitMQ 搭建完就得用起来
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)