网站首页 > 技术文章 正文
如果你喜欢SpringBoot的干货,可以关注我,这都是我工作中用到的技术,都是切实可用的,我会继续分享,谢谢你
0、什么是EMQTT
Emqtt是一个开源的消息推送中间件,基于高并发的Erlang/OTP语言平台设计,支持百万级连接和分布式集群,发布订阅模式的开源MQTT消息服务器。完整支持MQTT V3.1/V3.1.1协议规范,扩展支持WebSocket、Stomp、CoAP、MQTT-SN或私有TCP协议。官网:http://www.emqtt.com/
曾经我们需要做消息推送时候使用到的这个中间件,再次分享我们使用的流程和部分解决方案,代码是可以跑的,但是软件版本可能不是最新的。可以自行改为最新版本。最后面有集群搭建方案和集群调优、以及Demo下载
1、添加依赖
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.1.0</version> </dependency>
2、具体Demo,里面使用了一个callback
public class EmqqClient { public static void main(String[] args) { EmqqClient.sendMsg("client/demo", "来自demo"); } /** * * @param sendTopic 话题 * @param content 发送的内容 * @return */ public static Boolean sendMsg(String sendTopic, String content) { int qos = 2; String serverHost = "tcp://45.78.4.000:1883"; //服务器主机 //服务器主机列表,可以是集群中所有节点 String hosts []= {"tcp://45.78.4.000:1883","tcp://emq.vsalw.com:1883"}; MemoryPersistence persistence = new MemoryPersistence(); try { MqttClient sampleClient = new MqttClient(serverHost, "server", persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true);//会话保持,是否以新用户身份链接 connOpts.setUserName("xxx");//客户端用户名 connOpts.setPassword("xxxx".toCharArray()); //设置会话心跳时间 秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 connOpts.setKeepAliveInterval(20); connOpts.setWill("client/close", "已经关闭".getBytes(), 0, true);//断开链接的时候发消息通知到某个topic //此处设置了连接多个节点,一个节点挂掉会自动切换到其他节点,当节点恢复又会恢复连接原节点 //注意在程序启动的时候,指定的一个serverHosts必须指定一个节点,如果此节点不可用,会自动连接其他可用节点 connOpts.setServerURLs(hosts); sampleClient.connect(connOpts); //自己订阅自己 可以订阅多个 sampleClient.subscribe("client/#");//订阅的topic MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); //只能接收到最新消息,如果需要接收全部消息,发生到 clinet/update/msg/系统时间 //接收端接收 clinet/update/msg/# 通配符接收所有 message.setRetained(true); //离线期间的消息,上线后可以再次接收到,只接收最新的一条信息 sampleClient.publish(sendTopic, message); //发布消息 sampleClient.setCallback(new PushCallback());//设置接受到消息处理类 } catch (MqttException me) { System.out.println("reason " + me.getReasonCode()); System.out.println("msg " + me.getMessage()); System.out.println("loc " + me.getLocalizedMessage()); System.out.println("cause " + me.getCause()); System.out.println("excep " + me); me.printStackTrace(); return false; } return true; } /** * 三种消息传输方式QoS: 0代表“至多一次”,消息发布完全依赖底层 TCP/IP 协议。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据, 丢失一次读记录无所谓,因为不久后还会有第二次发送。 1代表“至少一次”,确保消息到达,但消息重复可能会发生。 2代表“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。 备注:由于服务端采用Mosca实现,Mosca目前只支持到QoS 1 */ } //下面是一个类,PushCallback ,定义了几种对消息的处理,比如接收到消息,发送成功后,还是连接丢失时候。 public class PushCallback implements MqttCallback { @Override public void connectionLost(Throwable throwable) { System.out.println("链接丢失时候调用"); } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { System.out.println("接收到消息主题:"+s); System.out.println("接收到消息内容:"+new String(mqttMessage.getPayload())); System.out.println("接收到消息是否保留消息:"+mqttMessage.isRetained()); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println("消息发生成功后调用"); } }
3、Emqttd安装和集群搭建,系统调优(系统版本Centos 7.0 )
1、安装依赖的一个工具:
yum install lksctp-tools
2、Emqttd安装和集群搭建(系统版本Centos 7.0 ) 此处使用绿色版本安装
wget http://emqtt.com/static/brokers/emqttd-centos7-v2.2.0.zip
unzip emqttd-centos7-v2.2.0.zip
cd emqttd/bin
./emqttd start
3、开放防火墙端口1883和10183,4369,和6000-6999端口范围
Centos7开放端口
firewall-cmd --zone=public --add-port=1883/tcp --permanent
firewall-cmd --zone=public --add-port=10183/tcp --permanent
firewall-cmd --zone=public --add-port= 4369/tcp --permanent
firewall-cmd –reload
centos 6直接关闭防火墙,禁掉开机自启
service iptables stop
chkconfig iptables off
4、修改节点名称,分别修改成自己的公网IP,2台都要设置
修改文件 emqttd/etc/emq.conf
node.name = emqttd@45.78.4.253 注意是公网IP
5、启动停止命令
systemctl start emqttd.service
systemctl stop emqttd.service
systemctl status emqttd.service
6、启动成功后可以登录 http://ip:18083 账号admin/public 可以看到节点正在运行
7、节点A上加入节点B, 同样或者在节点B里面加入节点A.命令如下:
emqttd_ctl cluster join emqttd@45.78.4.253
离开集群的命令 emqttd_ctl cluster leave
如果找不到emqttd_ctl命令,可以使用whereis emqttd_ctl查找
8、启用客户端用户名密码认证
etc/emqttd/plugins/emq_auth_username.conf中配置默认用户:
auth.user.$N.username = admin
auth.user.$N.password = public
启用`emq_auth_username`_插件:
emqttd_ctl plugins load emq_auth_username
此时随便一个账号密码都能连接上,因为默认匿名访问是开的,需要关掉
etc/emq.conf配置启用匿名认证:
## Allow Anonymous authentication
mqtt.allow_anonymous = true 改为false
9、安装完毕后有可能在测试的时候,连接数在990左右就再也连接不上了,是因为系统的设置, 某些系统设置同时最大打开文件数是1024,所以要去掉这个限制。步骤如下:
1.以此执行命令:
sysctl -w fs.file-max=2097152
sysctl -w fs.nr_open=2097152
echo 2097152 > /proc/sys/fs/nr_open
ulimit -n 1048576
2.设置’fs.file-max’路径是/etc/sysctl.conf
fs.file-max = 1048576
3.设置limits 文件路径是/etc/security/limits.conf
持久化设置允许用户/进程打开文件句柄数:
* soft nofile 1048576
* hard nofile 1048576
4.让配置立即生效 sysctl -p
5.查看当前配置 ulimit -a
附:通用包安装和常用命令:
yum install lksctp-tools
wget http://emqtt.com/static/brokers/emqttd-centos7-v2.2.0.zip
unzip emqttd-centos7-v2.2.0.zip
./emqttd console 在控制台启动,日志打印在控制台
./emqttd start 以服务方式启动,不是注册到系统服务
./emqttd stop | restart 停止|重启
./emqttd_ctl status 查看当前状态
5、Demo下载
https://vsalw.com/1270.html
猜你喜欢
- 2024-10-12 EMQ百万级MQTT消息服务(小技巧) mqtt消息级别
- 2024-10-12 开源物联网MQTT 5.0服务器——EMQX服务集群搭建教程
- 2024-10-12 EMQX+阿里云飞天洛神云网络NLB:MQTT消息亿级并发 千万级吞吐性能
- 2024-10-12 QUIC 协议:特性、应用场景及其对物联网/车联网的影响
- 2024-10-12 Nordic助力智能插头以蜂窝物联网和低功耗蓝牙提供电器远程能源管理
- 2024-10-12 Windows环境下安装配置Mosquitto服务及入门操作介绍
- 2024-10-12 共享订阅--MQTT 5.0新特性 mqtt协议 共享单车
- 2024-10-12 EMQX——架构设计解析 im 架构设计
- 2024-10-12 两款常用的 MQTT 调试工具 mqtt客户端调试工具
- 2024-10-12 物联网MQTT在电动车共享充电项目中的应用
你 发表评论:
欢迎- 最近发表
-
- 在 Spring Boot 项目中使用 activiti
- 开箱即用-activiti流程引擎(active 流程引擎)
- 在springBoot项目中整合使用activiti
- activiti中的网关是干什么的?(activiti包含网关)
- SpringBoot集成工作流Activiti(完整源码和配套文档)
- Activiti工作流介绍及使用(activiti工作流会签)
- SpringBoot集成工作流Activiti(实际项目演示)
- activiti工作流引擎(activiti工作流引擎怎么用)
- 工作流Activiti初体验及在数据库中生成的表
- Activiti工作流浅析(activiti6.0工作流引擎深度解析)
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)