教育行业A股IPO第一股(股票代码 003032)

全国咨询/投诉热线:400-618-4000

Paho是什么?Paho实现消息收发的操作流程

更新时间:2023年10月13日15时28分 来源:传智教育 浏览次数:

Paho Java客户端是用Java编写的MQTT客户端库,用于开发在JVM或其他Java兼容平台(例如Android)上运行的应用程序。

Paho不仅可以对接EMQ X Broker,还可以对接满足符合MQTT协议规范的消息代理服务端,目前Paho可以支持到MQTT5.0以下版本。MQTT3.3.1协议版本基本能满足百分之九十多的接入场景。

Paho Java客户端提供了两个API:

1:MqttAsyncClient提供了一个完全异步的API,其中活动的完成是通过注册的回调通知的。

2:MqttClient是MqttAsyncClient周围的同步包装器,在这里,功能似乎与应用程序同步。

Paho实现消息收发

(1)找到项目:emq-demo,添加坐标依赖

<dependency>
     <groupId>org.eclipse.paho</groupId>
     <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
     <version>1.2.2</version>
</dependency>

(2)编写客户端封装类的代码:com.itheima.mqtt.client.EmqClient

/**
* Created by 传智播客*黑马程序员.
*/
@Component
public class EmqClient {

    private Logger log = LoggerFactory.getLogger(EmqClient.class);
    
  private IMqttClient mqttClient;
  
  @Autowired
  private MqttProperties mqttProperties;
  
  @Autowired
  private MqttCallback mqttCallback;
  
  @PostConstruct
  private void init(){
      //MqttClientPersistence是接口 实现类有:MqttDefaultFilePersistence;MemoryPersistence
      MqttClientPersistence memoryPersistence = new MemoryPersistence();
      try {
          mqttClient = new
MqttClient(mqttProperties.getBrokerUrl(),mqttProperties.getClientId(),memoryPersistence);
      } catch (MqttException e) {
           log.error("MqttClient初始化失败,brokerurl={},clientId=
{}",mqttProperties.getBrokerUrl(),mqttProperties.getClientId());
      }
}

   /**
    * 连接broker
    * @param username
    * @param password
    */
public void connect(String username,String password){
    //创建MQTT连接选项对象--可配置mqtt连接相关选项
    MqttConnectOptions connectOptions = new MqttConnectOptions();
    
    //自动重连
    connectOptions.setAutomaticReconnect(true);
    /**
     * 设置为true后意味着:客户端断开连接后emq不保留会话保留会话,否则会产生订阅共享队列的存活
客户端收不到消息的情况
     * 因为断开的连接还被保留的话,emq会将队列中的消息负载到断开但还保留的客户端,导致存活的客户
端收不到消息
     * 解决该问题有两种方案:1.连接断开后不要保持;2.保证每个客户端有固定的clientId
     */
    connectOptions.setCleanSession(true);
    connectOptions.setUserName(username);
    connectOptions.setPassword(password.toCharArray());
    //设置mqtt消息回调
    mqttClient.setCallback(mqttCallback);
    //连接broker
    try {
         mqttClient.connect(connectOptions);
    } catch (MqttException e) {
        log.error("连接mqtt broker失败,失败原因:{}",e.getMessage());
    }
    
  }

   /**
    * 发布
    * @param topic
    * @param msg
    */
   public void publish(String topic, String msg, QosEnum qos, boolean retain){
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos.value());
        mqttMessage.setRetained(retain);
        mqttMessage.setPayload(msg.getBytes());
        if(mqttClient.isConnected()){
                try {
                    mqttClient.publish(topic,mqttMessage);
                } catch (MqttException e) {
                    log.error("mqtt消息发布失败,topic={},msg={},qos={},retain={},errormsg=
{}",topic,msg,qos,retain,e.getMessage());
                }
        }
   }
   
   /**
    * 订阅
    * @param topicFilter
    * @return
    */
   public void subscribe(String topicFilter,QosEnum qos){
      try {
          mqttClient.subscribe(topicFilter,qos.value());
      } catch (MqttException e) {
      
          log.error("订阅失败,topicfilter={},qos={},errormsg=
{}",topicFilter,qos,e.getMessage());
        }
    }
    
    /**
     * 断开连接
     */
    @PreDestroy
    public void disConnect(){
        try {
             mqttClient.disconnect();
        } catch (MqttException e) {
             log.error("断开连接出现异常,errormsg={}",e.getMessage());
        }
    }
}

需要在application.yml中添加自定义的配置:

mqtt:
broker-url: tcp://192.168.200.129:1883
client-id: demo-client
username: user
password: 123456

同时需要创建属性配置类来加载该配置数据,创建:com.itheima.mqtt.properties.MqttProperties

/**
 * Created by 传智播客*黑马程序员.
 */
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {

    private String brokerUrl;

    private String clientId;

    private String username;

    private String password;


    public String getBrokerUrl() {
        return brokerUrl;
    }

    public void setBrokerUrl(String brokerUrl) {
         this.brokerUrl = brokerUrl;
    }

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }
    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public String toString() {
        return "MqttProperties{" +
                "brokerUrl='" + brokerUrl + '\'' +
                ", clientId='" + clientId + '\'' +
                ", username='" + username + '\'' +
                ", password='" + password + '\'' +
                '}';
     }
}

还需创建QoS服务之类枚举:com.itheima.mqtt.enums.QosEnum

/**
 * Created by 传智播客*黑马程序员.
 */
public enum QosEnum {

   QoS0(0),QoS1(1),QoS2(2);

   QosEnum(int qos) {

       this.value = qos;
   }

   private final int value;


   public int value(){
   return this.value;
 }
}

(3)在连接接收到消息之后,我们需要将消息传入消息回调:com.itheima.mqtt.client.MessageCallback

/**
 * Created by 传智播客*黑马程序员.
 */
@Component
public class MessageCallback implements MqttCallback {
    private Logger log = LoggerFactory.getLogger(MessageCallback.class);
    @Override
    public void connectionLost(Throwable cause) {
        //丢失对服务端的连接后触发该方法回调,此处可以做一些特殊处理,比如重连
        log.info("丢失了对broker的连接");
    }
    /**
    * 订阅到消息后的回调
    * 该方法由mqtt客户端同步调用,在此方法未正确返回之前,不会发送ack确认消息到broker
    * 一旦该方法向外抛出了异常客户端将异常关闭,当再次连接时;所有QoS1,QoS2且客户端未进行ack确认的
    消息都将由
    * broker服务器再次发送到客户端
    * @param topic
    * @param message
    * @throws Exception
    */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("订阅到了消息;topic={},messageid={},qos={},msg={}", topic, message.getId(), message.getQos(), new String(message.getPayload()));
    }
    /**
     * 消息发布完成且收到ack确认后的回调
     * QoS0:消息被网络发出后触发一次
     * QoS1:当收到broker的PUBACK消息后触发
     * QoS2:当收到broer的PUBCOMP消息后触发
     * @param token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        int messageId = token.getMessageId();
        String[] topics = token.getTopics();
        log.info("消息发送完成,messageId={},topics={}", messageId, topics);
    }
}

(4)编写消息发布和订阅的测试,在启动类中添加如下代码。

@Autowired
private EmqClient emqClient;

@Autowired
private MqttProperties mqttProperties;

@PostConstruct
public void init(){
     emqClient.connect(mqttProperties.getUsername(),mqttProperties.getPassword());
     //订阅某一主题
     emqClient.subscribe("testtopic/#", QosEnum.QoS2);
     //开启一个新的线程向该主题发送消息
     new Thread(()->{
         while (true){
          emqClient.publish("testtopic/123","mqtt msg:"+
LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME),QosEnum.QoS2,false);
          try {
            TimeUnit.SECONDS.sleep(5);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }).start();
}

(5)测试:在Dashboard中开启使用username进行认证的组件,其他组件停止即可,然后启动项目,查看控制台输出即可。

0 分享到:
和我们在线交谈!