计算机系统应用教程网站

网站首页 > 技术文章 正文

SpringBoot整合Emqtt实现消息推送,含EMQ集群搭建调优

btikc 2024-10-12 11:54:47 技术文章 2 ℃ 0 评论
如果你喜欢SpringBoot的干货,可以关注我,这都是我工作中用到的技术,都是切实可用的,我会继续分享,谢谢你

0、什么是EMQTT

Emqtt是一个开源的消息推送中间件,基于高并发的Erlang/OTP语言平台设计,支持百万级连接和分布式集群,发布订阅模式的开源MQTT消息服务器。完整支持MQTT V3.1/V3.1.1协议规范,扩展支持WebSocket、Stomp、CoAP、MQTT-SN或私有TCP协议。官网:http://www.emqtt.com/

曾经我们需要做消息推送时候使用到的这个中间件,再次分享我们使用的流程和部分解决方案,代码是可以跑的,但是软件版本可能不是最新的。可以自行改为最新版本。最后面有集群搭建方案和集群调优、以及Demo下载

1、添加依赖

<dependency>
 <groupId>org.eclipse.paho</groupId>
 <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
 <version>1.1.0</version>
</dependency>

2、具体Demo,里面使用了一个callback

public class EmqqClient {
 public static void main(String[] args) {
 EmqqClient.sendMsg("client/demo", "来自demo");
 }
 /**
 *
 * @param sendTopic 话题
 * @param content 发送的内容
 * @return
 */
 public static Boolean sendMsg(String sendTopic, String content) {
 int qos = 2;
 String serverHost = "tcp://45.78.4.000:1883"; //服务器主机
 //服务器主机列表,可以是集群中所有节点
 String hosts []= {"tcp://45.78.4.000:1883","tcp://emq.vsalw.com:1883"}; 
 MemoryPersistence persistence = new MemoryPersistence();
 try {
 MqttClient sampleClient = new MqttClient(serverHost, "server", persistence);
 MqttConnectOptions connOpts = new MqttConnectOptions();
 connOpts.setCleanSession(true);//会话保持,是否以新用户身份链接
 connOpts.setUserName("xxx");//客户端用户名
 connOpts.setPassword("xxxx".toCharArray());
 //设置会话心跳时间 秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
 connOpts.setKeepAliveInterval(20);
 connOpts.setWill("client/close", "已经关闭".getBytes(), 0, true);//断开链接的时候发消息通知到某个topic
 //此处设置了连接多个节点,一个节点挂掉会自动切换到其他节点,当节点恢复又会恢复连接原节点
 //注意在程序启动的时候,指定的一个serverHosts必须指定一个节点,如果此节点不可用,会自动连接其他可用节点 
 connOpts.setServerURLs(hosts);
 sampleClient.connect(connOpts);
 //自己订阅自己 可以订阅多个
 sampleClient.subscribe("client/#");//订阅的topic
 MqttMessage message = new MqttMessage(content.getBytes());
 message.setQos(qos);
 //只能接收到最新消息,如果需要接收全部消息,发生到 clinet/update/msg/系统时间 
 //接收端接收 clinet/update/msg/# 通配符接收所有
 message.setRetained(true); //离线期间的消息,上线后可以再次接收到,只接收最新的一条信息
 sampleClient.publish(sendTopic, message); //发布消息
 sampleClient.setCallback(new PushCallback());//设置接受到消息处理类
 } catch (MqttException me) {
 System.out.println("reason " + me.getReasonCode());
 System.out.println("msg " + me.getMessage());
 System.out.println("loc " + me.getLocalizedMessage());
 System.out.println("cause " + me.getCause());
 System.out.println("excep " + me);
 me.printStackTrace();
 return false;
 }
 return true;
 }
 /**
 * 三种消息传输方式QoS:
 0代表“至多一次”,消息发布完全依赖底层 TCP/IP 协议。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,
 丢失一次读记录无所谓,因为不久后还会有第二次发送。
 1代表“至少一次”,确保消息到达,但消息重复可能会发生。
 2代表“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
 备注:由于服务端采用Mosca实现,Mosca目前只支持到QoS 1
 */
}
//下面是一个类,PushCallback ,定义了几种对消息的处理,比如接收到消息,发送成功后,还是连接丢失时候。
public class PushCallback implements MqttCallback {
 @Override
 public void connectionLost(Throwable throwable) {
 System.out.println("链接丢失时候调用");
 }
 @Override
 public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
 System.out.println("接收到消息主题:"+s);
 System.out.println("接收到消息内容:"+new String(mqttMessage.getPayload()));
 System.out.println("接收到消息是否保留消息:"+mqttMessage.isRetained());
 }
 @Override
 public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
 System.out.println("消息发生成功后调用");
 }
}

3、Emqttd安装和集群搭建,系统调优(系统版本Centos 7.0 )

1、安装依赖的一个工具:

yum install lksctp-tools

2、Emqttd安装和集群搭建(系统版本Centos 7.0 ) 此处使用绿色版本安装

wget http://emqtt.com/static/brokers/emqttd-centos7-v2.2.0.zip

unzip emqttd-centos7-v2.2.0.zip

cd emqttd/bin

./emqttd start

3、开放防火墙端口1883和10183,4369,和6000-6999端口范围

Centos7开放端口

firewall-cmd --zone=public --add-port=1883/tcp --permanent

firewall-cmd --zone=public --add-port=10183/tcp --permanent

firewall-cmd --zone=public --add-port= 4369/tcp --permanent

firewall-cmd –reload

centos 6直接关闭防火墙,禁掉开机自启

service iptables stop

chkconfig iptables off

4、修改节点名称,分别修改成自己的公网IP,2台都要设置

修改文件 emqttd/etc/emq.conf

node.name = emqttd@45.78.4.253 注意是公网IP

5、启动停止命令

systemctl start emqttd.service

systemctl stop emqttd.service

systemctl status emqttd.service

6、启动成功后可以登录 http://ip:18083 账号admin/public 可以看到节点正在运行

7、节点A上加入节点B, 同样或者在节点B里面加入节点A.命令如下:

emqttd_ctl cluster join emqttd@45.78.4.253

离开集群的命令 emqttd_ctl cluster leave

如果找不到emqttd_ctl命令,可以使用whereis emqttd_ctl查找

8、启用客户端用户名密码认证

etc/emqttd/plugins/emq_auth_username.conf中配置默认用户:

auth.user.$N.username = admin

auth.user.$N.password = public

启用`emq_auth_username`_插件:

emqttd_ctl plugins load emq_auth_username

此时随便一个账号密码都能连接上,因为默认匿名访问是开的,需要关掉

etc/emq.conf配置启用匿名认证:

## Allow Anonymous authentication

mqtt.allow_anonymous = true 改为false

9、安装完毕后有可能在测试的时候,连接数在990左右就再也连接不上了,是因为系统的设置, 某些系统设置同时最大打开文件数是1024,所以要去掉这个限制。步骤如下:

1.以此执行命令:

sysctl -w fs.file-max=2097152

sysctl -w fs.nr_open=2097152

echo 2097152 > /proc/sys/fs/nr_open

ulimit -n 1048576

2.设置’fs.file-max’路径是/etc/sysctl.conf

fs.file-max = 1048576

3.设置limits 文件路径是/etc/security/limits.conf

持久化设置允许用户/进程打开文件句柄数:

* soft nofile 1048576

* hard nofile 1048576

4.让配置立即生效 sysctl -p

5.查看当前配置 ulimit -a

附:通用包安装和常用命令:

yum install lksctp-tools

wget http://emqtt.com/static/brokers/emqttd-centos7-v2.2.0.zip

unzip emqttd-centos7-v2.2.0.zip

./emqttd console 在控制台启动,日志打印在控制台

./emqttd start 以服务方式启动,不是注册到系统服务

./emqttd stop | restart 停止|重启

./emqttd_ctl status 查看当前状态

5、Demo下载

https://vsalw.com/1270.html

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表