// MqttManager.java 4.07 KB
package com.example.dahua.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;

/**
 * Thin wrapper around a single, process-wide Paho {@link MqttClient}.
 *
 * <p>{@link #init()} creates and connects the shared client once; {@link #sendMq(String, String)}
 * publishes a point-to-point message to another client of the same group.
 *
 * <p>All state is held in static fields and no synchronization is performed here, so callers
 * that invoke {@link #init()} from several threads must coordinate that themselves.
 */
public class MqttManager {

    private static final String BROKER_URL = "tcp://post-cn-4590mq2hr03.mqtt.aliyuncs.com:1883";

    /** Separator between the group id and the device id inside a client id. */
    private static final String CLIENT_ID_SEPARATOR = "@@@";

    /** Device part of this process's own client id. */
    private static final String LOCAL_DEVICE_ID = "9ED96FB6D72C1698";

    // NOTE(review): the access key and secret key are hard-coded in source. They are kept here
    // only to preserve existing behaviour; they should be moved to external configuration.
    private static final String ACCESS_KEY = "UimvLVp0Wj90P88u";
    private static final String SECRET_KEY = "TE4rZenITG27tiQqHx9qINjx71Nws7";

    private static final int KEEP_ALIVE_SECONDS = 90;

    private static String accessKey;

    private static String sign;

    /** Shared client; {@code null} until {@link #init()} has successfully created it. */
    public static MqttClient mqttClient;

    /** Group id used as the prefix of every client id; set by {@link #init()}. */
    public static String groupId;

    /** Parent topic under which point-to-point topics are built; set by {@link #init()}. */
    public static String topic;

    private static int qosLevel;

    /**
     * Creates, configures and connects the shared {@link #mqttClient} if it does not exist yet.
     *
     * <p>The static configuration fields are (re)assigned on every call. If the client already
     * exists the method returns without touching the connection. If the signature cannot be
     * computed or the client cannot be constructed, the failure is reported and
     * {@link #mqttClient} is left {@code null} so that a later call can try again. A failed
     * {@code connect} is reported but leaves the client in place, as before.
     */
    public void init() {
        groupId = "GID_HFJSIURFHAQO110";
        topic = "Topic_Quene_Test";
        qosLevel = 1;
        accessKey = ACCESS_KEY;

        if (mqttClient != null) {
            return;
        }

        final String clientId = groupId + CLIENT_ID_SEPARATOR + LOCAL_DEVICE_ID;

        // The password is a signature: sign = BASE64(MAC.SHA1(groupId, secretKey)).
        // Compute it before creating the client so a failure leaves nothing half-initialised.
        final String signature;
        try {
            signature = Tools.macSignature(groupId, SECRET_KEY);
        } catch (InvalidKeyException | NoSuchAlgorithmException e) {
            System.out.println("mqtt: cannot compute signature: " + e);
            e.printStackTrace();
            return;
        }
        if (signature == null) {
            System.out.println("mqtt: cannot compute signature: result is null");
            return;
        }
        sign = signature;

        final MqttClient client;
        try {
            client = new MqttClient(BROKER_URL, clientId, new MemoryPersistence());
        } catch (MqttException e) {
            System.out.println("mqtt: cannot create client: " + e);
            e.printStackTrace();
            return;
        }

        final MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName(accessKey);
        connOpts.setPassword(sign.toCharArray());
        connOpts.setCleanSession(false);
        connOpts.setKeepAliveInterval(KEEP_ALIVE_SECONDS);
        connOpts.setAutomaticReconnect(true);

        client.setCallback(new LoggingCallback());

        // Publish the client only once it is fully configured.
        mqttClient = client;

        try {
            client.connect(connOpts);
        } catch (MqttException e) {
            System.out.println("mqtt:" + e.toString());
            e.printStackTrace();
        }
    }

    /**
     * Publishes {@code content} to the point-to-point topic of another device in the same group.
     *
     * <p>The topic is {@code <topic>/p2p/<groupId>@@@<deviceId>} and the payload is the UTF-8
     * encoding of {@code content}. An earlier, commented-out example built the payload as JSON of
     * the form {@code {"cmd":"...","clientId":"","data":"6:00-19:25"}}; this method itself sends
     * whatever string it is given.
     *
     * <p>Nothing is sent if {@link #init()} has not yet created the client. Publish failures are
     * reported and not rethrown.
     *
     * @param deviceId device part of the receiving client id
     * @param content  message body; must not be {@code null}
     */
    public void sendMq(String deviceId, String content) {
        final String recvClientId = groupId + CLIENT_ID_SEPARATOR + deviceId;
        final String p2pSendTopic = topic + "/p2p/" + recvClientId;

        // Encode explicitly as UTF-8 instead of relying on the platform default charset.
        final MqttMessage message = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));
        message.setQos(qosLevel);
        System.out.println("发送内容:" + p2pSendTopic);

        final MqttClient client = mqttClient;
        if (client == null) {
            System.err.println("mqtt: client not initialised, message not sent");
            return;
        }
        try {
            client.publish(p2pSendTopic, message);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /** Callback that only logs connection and message events to standard output. */
    private static final class LoggingCallback implements MqttCallbackExtended {

        @Override
        public void connectComplete(boolean reconnect, String serverURI) {
            System.out.println("connect success");
        }

        @Override
        public void connectionLost(Throwable throwable) {
            System.out.println("connect lost:" + throwable);
            if (throwable != null) {
                throwable.printStackTrace();
            }
            // No manual re-initialisation here: the previous init() call at this point was a
            // no-op because the client already existed. Reconnection is left to the
            // automatic-reconnect option enabled in init().
        }

        @Override
        public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
            System.out.println("receive msg from topic " + s + " , body is "
                    + new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            // Only meaningful when QoS > 0; intentionally nothing to do.
        }
    }
}