MqttServer.java 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package com.gyee.dataadapter.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.eclipse.paho.client.mqttv3.*;
  4. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import org.springframework.stereotype.Service;
  8. import javax.annotation.Resource;
  9. /**
  10. * 发布端:主要实现发布消息和订阅主题,接收消息在回调类PushCallback中
  11. * 要发布消息的时候只需要调用sendMQTTMessage方法就OK了
  12. */
  13. @Slf4j
  14. @Service
  15. public class MqttServer {
  16. /* 订阅者客户端对象 */
  17. private MqttClient subsribeClient;
  18. /**
  19. * 发布者客户端对象
  20. * 这里订阅者和发布者的MqttClient对象分别命名是为了让发布者和订阅者分开,
  21. * 如果订阅者和发布者都用一个MqttClient链接对象,则会出现两方都订阅了某个主题后,
  22. * 谁发送了消息,都会自己接收到自己发的消息,所以分开写,里面主要就是回调类的设置setCallback
  23. */
  24. private MqttClient publishClient;
  25. /* 主题对象 */
  26. public MqttTopic topic;
  27. /* 消息内容对象 */
  28. public MqttMessage message;
  29. @Resource
  30. private MqttConnect mqttConnect;
  31. @Resource
  32. private MqttConfig config;
  33. private String topicStr;
  34. private int qos;
  35. public String getTopicStr() {
  36. return topicStr;
  37. }
  38. public void setTopicStr(String topicStr) {
  39. this.topicStr = topicStr;
  40. }
  41. public int getQos() {
  42. return qos;
  43. }
  44. public void setQos(int qos) {
  45. this.qos = qos;
  46. }
  47. public MqttServer() {
  48. log.info("9011上线了");
  49. }
  50. /**
  51. * 发布者客户端和服务端建立连接
  52. */
  53. public MqttClient publishConnect() throws MqttException {
  54. //防止重复创建MQTTClient实例
  55. if (publishClient == null) {
  56. //先让客户端和服务器建立连接,MemoryPersistence设置clientid的保存形式,默认为以内存保存
  57. publishClient = new MqttClient(config.getHost(), config.getClientid(), new MemoryPersistence());
  58. //发布消息不需要回调连接
  59. //client.setCallback(new PushCallback());
  60. }
  61. MqttConnectOptions options = mqttConnect.getOptions();
  62. //判断拦截状态*
  63. if (!publishClient.isConnected()) {
  64. publishClient.connect(options);
  65. log.info("---------------------连接成功");
  66. } else {//这里的逻辑是如果连接成功就重新连接
  67. publishClient.disconnect();
  68. publishClient.connect(mqttConnect.getOptions(options));
  69. log.info("---------------------连接成功");
  70. }
  71. return publishClient;
  72. }
  73. /**
  74. * 订阅端的链接方法,关键是回调类的设置,要对订阅的主题消息进行处理
  75. * 断线重连方法,如果是持久订阅,重连时不需要再次订阅
  76. * 如果是非持久订阅,重连是需要重新订阅主题 取决于options.setCleanSession(true);
  77. * true为非持久订阅
  78. */
  79. public void subsribeConnect() {
  80. try {
  81. //防止重复创建MQTTClient实例
  82. if (subsribeClient == null) {
  83. //clientId不能和其它的clientId一样,否则会出现频繁断开连接和重连的问题
  84. System.out.println("host: " + config.getHost() + " " + "客户端id: " + config.getClientid());
  85. subsribeClient = new MqttClient(config.getHost(), config.getClientid(), new MemoryPersistence());// MemoryPersistence设置clientid的保存形式,默认为以内存保存
  86. //如果是订阅者则添加回调类,发布不需要,PushCallback类在后面
  87. subsribeClient.setCallback(new PushCallback(MqttServer.this));
  88. }
  89. MqttConnectOptions options = mqttConnect.getOptions();
  90. //判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
  91. if (!subsribeClient.isConnected()) {
  92. subsribeClient.connect(options);
  93. } else {//这里的逻辑是如果连接成功就重新连接
  94. subsribeClient.disconnect();
  95. subsribeClient.connect(mqttConnect.getOptions(options));
  96. }
  97. log.info("----------Mqtt客户端连接成功");
  98. } catch (MqttException e) {
  99. log.info(e.getMessage(), e);
  100. }
  101. }
  102. /**
  103. * 把组装好的消息发出去
  104. */
  105. public boolean publish(MqttTopic topic, MqttMessage message) {
  106. MqttDeliveryToken token = null;
  107. try {
  108. //把消息发送给对应的主题
  109. token = topic.publish(message);
  110. token.waitForCompletion();
  111. //检查发送是否成功
  112. boolean flag = token.isComplete();
  113. StringBuffer sbf = new StringBuffer(200);
  114. sbf.append("给主题为'" + topic.getName());
  115. sbf.append("'发布消息:");
  116. if (flag) {
  117. sbf.append("成功!消息内容是:" + new String(message.getPayload()));
  118. } else {
  119. sbf.append("失败!");
  120. }
  121. log.info(sbf.toString());
  122. } catch (MqttException e) {
  123. log.info(e.toString());
  124. }
  125. return token.isComplete();
  126. }
  127. /**
  128. * MQTT发送指令:主要是组装消息体
  129. *
  130. * @param topic 主题
  131. * @param data 消息内容
  132. * @param qos 消息级别
  133. */
  134. public void sendMQTTMessage(String topic, String data, int qos) {
  135. try {
  136. this.publishClient = publishConnect();
  137. this.topic = this.publishClient.getTopic(topic);
  138. message = new MqttMessage();
  139. //消息等级
  140. //level 0:消息最多传递一次,不再关心它有没有发送到对方,也不设置任何重发机制
  141. //level 1:包含了简单的重发机制,发送消息之后等待接收者的回复,如果没收到回复则重新发送消息。这种模式能保证消息至少能到达一次,但无法保证消息重复
  142. //level 2: 有了重发和重复消息发现机制,保证消息到达对方并且严格只到达一次
  143. message.setQos(qos);
  144. //如果重复消费,则把值改为true,然后发送一条空的消息,之前的消息就会覆盖,然后在改为false
  145. message.setRetained(false);
  146. message.setPayload(data.getBytes());
  147. //将组装好的消息发出去
  148. publish(this.topic, message);
  149. } catch (Exception e) {
  150. log.info(e.toString());
  151. e.printStackTrace();
  152. }
  153. }
  154. /**
  155. * 订阅端订阅消息
  156. *
  157. * @param topic 要订阅的主题
  158. * @param qos 订阅消息的级别
  159. */
  160. public void init(String topic, int qos) {
  161. //建立连接
  162. subsribeConnect();
  163. //以某个消息级别订阅某个主题
  164. try {
  165. subsribeClient.subscribe(topic, qos);
  166. } catch (MqttException e) {
  167. log.info(e.getMessage(), e);
  168. }
  169. this.topicStr = topic;
  170. this.qos = qos;
  171. }
  172. public void init() throws MqttException {
  173. //建立连接
  174. subsribeConnect();
  175. //以某个消息级别订阅某个主题
  176. subsribeClient.subscribe(topicStr, qos);
  177. }
  178. /**
  179. * 订阅端取消订阅消息
  180. *
  181. * @param topic 要订阅的主题
  182. */
  183. public void unionInit(String topic) {
  184. //建立连接
  185. subsribeConnect();
  186. //取消订阅某个主题
  187. try {
  188. //MQTT 协议中订阅关系是持久化的,因此如果不需要订阅某些 Topic,需要调用 unsubscribe 方法取消订阅关系。
  189. subsribeClient.unsubscribe(topic);
  190. } catch (MqttException e) {
  191. log.info(e.getMessage(), e);
  192. }
  193. }
  194. }