更新时间:2023年10月13日15时28分 来源:传智教育 浏览次数:
Paho Java客户端是用Java编写的MQTT客户端库,用于开发在JVM或其他Java兼容平台(例如Android)上运行的应用程序。
Paho不仅可以对接EMQ X Broker,还可以对接满足符合MQTT协议规范的消息代理服务端,目前Paho可以支持到MQTT5.0以下版本。MQTT3.3.1协议版本基本能满足百分之九十多的接入场景。
Paho Java客户端提供了两个API:
1:MqttAsyncClient提供了一个完全异步的API,其中活动的完成是通过注册的回调通知的。
2:MqttClient是MqttAsyncClient周围的同步包装器,在这里,功能似乎与应用程序同步。
(1)找到项目:emq-demo,添加坐标依赖
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency>
(2)编写客户端封装类的代码:com.itheima.mqtt.client.EmqClient
/** * Created by 传智播客*黑马程序员. */ @Component public class EmqClient { private Logger log = LoggerFactory.getLogger(EmqClient.class); private IMqttClient mqttClient; @Autowired private MqttProperties mqttProperties; @Autowired private MqttCallback mqttCallback; @PostConstruct private void init(){ //MqttClientPersistence是接口 实现类有:MqttDefaultFilePersistence;MemoryPersistence MqttClientPersistence memoryPersistence = new MemoryPersistence(); try { mqttClient = new MqttClient(mqttProperties.getBrokerUrl(),mqttProperties.getClientId(),memoryPersistence); } catch (MqttException e) { log.error("MqttClient初始化失败,brokerurl={},clientId= {}",mqttProperties.getBrokerUrl(),mqttProperties.getClientId()); } } /** * 连接broker * @param username * @param password */ public void connect(String username,String password){ //创建MQTT连接选项对象--可配置mqtt连接相关选项 MqttConnectOptions connectOptions = new MqttConnectOptions(); //自动重连 connectOptions.setAutomaticReconnect(true); /** * 设置为true后意味着:客户端断开连接后emq不保留会话保留会话,否则会产生订阅共享队列的存活 客户端收不到消息的情况 * 因为断开的连接还被保留的话,emq会将队列中的消息负载到断开但还保留的客户端,导致存活的客户 端收不到消息 * 解决该问题有两种方案:1.连接断开后不要保持;2.保证每个客户端有固定的clientId */ connectOptions.setCleanSession(true); connectOptions.setUserName(username); connectOptions.setPassword(password.toCharArray()); //设置mqtt消息回调 mqttClient.setCallback(mqttCallback); //连接broker try { mqttClient.connect(connectOptions); } catch (MqttException e) { log.error("连接mqtt broker失败,失败原因:{}",e.getMessage()); } } /** * 发布 * @param topic * @param msg */ public void publish(String topic, String msg, QosEnum qos, boolean retain){ MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos.value()); mqttMessage.setRetained(retain); mqttMessage.setPayload(msg.getBytes()); if(mqttClient.isConnected()){ try { mqttClient.publish(topic,mqttMessage); } catch (MqttException e) { log.error("mqtt消息发布失败,topic={},msg={},qos={},retain={},errormsg= {}",topic,msg,qos,retain,e.getMessage()); } } } /** * 订阅 * @param topicFilter * @return */ public void subscribe(String topicFilter,QosEnum qos){ try { mqttClient.subscribe(topicFilter,qos.value()); } catch (MqttException e) { log.error("订阅失败,topicfilter={},qos={},errormsg= {}",topicFilter,qos,e.getMessage()); } } /** * 断开连接 */ @PreDestroy public void disConnect(){ try { mqttClient.disconnect(); } catch (MqttException e) { log.error("断开连接出现异常,errormsg={}",e.getMessage()); } } }
需要在application.yml中添加自定义的配置:
mqtt: broker-url: tcp://192.168.200.129:1883 client-id: demo-client username: user password: 123456
同时需要创建属性配置类来加载该配置数据,创建:com.itheima.mqtt.properties.MqttProperties
/** * Created by 传智播客*黑马程序员. */ @Configuration @ConfigurationProperties(prefix = "mqtt") public class MqttProperties { private String brokerUrl; private String clientId; private String username; private String password; public String getBrokerUrl() { return brokerUrl; } public void setBrokerUrl(String brokerUrl) { this.brokerUrl = brokerUrl; } public String getClientId() { return clientId; } public void setClientId(String clientId) { this.clientId = clientId; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } @Override public String toString() { return "MqttProperties{" + "brokerUrl='" + brokerUrl + '\'' + ", clientId='" + clientId + '\'' + ", username='" + username + '\'' + ", password='" + password + '\'' + '}'; } }
还需创建QoS服务之类枚举:com.itheima.mqtt.enums.QosEnum
/** * Created by 传智播客*黑马程序员. */ public enum QosEnum { QoS0(0),QoS1(1),QoS2(2); QosEnum(int qos) { this.value = qos; } private final int value; public int value(){ return this.value; } }
(3)在连接接收到消息之后,我们需要将消息传入消息回调:com.itheima.mqtt.client.MessageCallback
/** * Created by 传智播客*黑马程序员. */ @Component public class MessageCallback implements MqttCallback { private Logger log = LoggerFactory.getLogger(MessageCallback.class); @Override public void connectionLost(Throwable cause) { //丢失对服务端的连接后触发该方法回调,此处可以做一些特殊处理,比如重连 log.info("丢失了对broker的连接"); } /** * 订阅到消息后的回调 * 该方法由mqtt客户端同步调用,在此方法未正确返回之前,不会发送ack确认消息到broker * 一旦该方法向外抛出了异常客户端将异常关闭,当再次连接时;所有QoS1,QoS2且客户端未进行ack确认的 消息都将由 * broker服务器再次发送到客户端 * @param topic * @param message * @throws Exception */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { log.info("订阅到了消息;topic={},messageid={},qos={},msg={}", topic, message.getId(), message.getQos(), new String(message.getPayload())); } /** * 消息发布完成且收到ack确认后的回调 * QoS0:消息被网络发出后触发一次 * QoS1:当收到broker的PUBACK消息后触发 * QoS2:当收到broer的PUBCOMP消息后触发 * @param token */ @Override public void deliveryComplete(IMqttDeliveryToken token) { int messageId = token.getMessageId(); String[] topics = token.getTopics(); log.info("消息发送完成,messageId={},topics={}", messageId, topics); } }
(4)编写消息发布和订阅的测试,在启动类中添加如下代码。
@Autowired private EmqClient emqClient; @Autowired private MqttProperties mqttProperties; @PostConstruct public void init(){ emqClient.connect(mqttProperties.getUsername(),mqttProperties.getPassword()); //订阅某一主题 emqClient.subscribe("testtopic/#", QosEnum.QoS2); //开启一个新的线程向该主题发送消息 new Thread(()->{ while (true){ emqClient.publish("testtopic/123","mqtt msg:"+ LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME),QosEnum.QoS2,false); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); }
(5)测试:在Dashboard中开启使用username进行认证的组件,其他组件停止即可,然后启动项目,查看控制台输出即可。