package com.rehome.mqttclienttemperature; import com.rehome.mqttclienttemperature.service.TemperatureService; import com.rehome.mqttclienttemperature.utils.MqttSSLSocketFactory; import com.rehome.mqttclienttemperature.utils.UUIDUtil; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import javax.net.ssl.SSLSocketFactory; import java.io.InputStream; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Timer; import java.util.TimerTask; public class MqttRSAClient { /** * 代理服务器ip地址 */ public static final String MQTT_BROKER_HOST = "ssl://47.242.184.139:8883"; /** * 客户端唯一标识 */ public static String MQTT_CLIENT_ID = "AppServer_admin-client-temperature_APP_server_01"; /** *帐号 */ public static String USERNAME = "admin"; /** * 密码 */ public static String PASSWORD = "publish452131wW452131wW$"; /** * 订阅标识 */ public static String TOPIC_FILTER = "WifiSHT/+/SHT20"; private volatile static MqttClient mqttClient; private static MqttConnectOptions options; private static int qos = 2; //定时器 private Timer timer; public MqttRSAClient(){ try { MQTT_CLIENT_ID = UUIDUtil.getUUID(); // host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示, // MemoryPersistence设置clientid的保存形式,默认为以内存保存 mqttClient = new MqttClient(MQTT_BROKER_HOST, MQTT_CLIENT_ID, new MemoryPersistence()); // 配置参数信息 options = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, // 这里设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(false); // 设置用户名 options.setUserName(USERNAME); // 设置密码 options.setPassword(PASSWORD.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(20); //断线重连 options.setAutomaticReconnect(true); //mqtt服务器端单双向加密 InputStream caCrtFile = this.getClass().getResourceAsStream("/ssl/my_root_ca.crt"); InputStream crtFile = this.getClass().getResourceAsStream("/ssl/client.crt"); InputStream keyFile = this.getClass().getResourceAsStream("/ssl/client.key"); String password = ""; SSLSocketFactory socketFactory = MqttSSLSocketFactory.getTwoDirSocketFactory(caCrtFile,crtFile,keyFile,password); options.setSocketFactory(socketFactory); } catch (Exception e) { e.printStackTrace(); } } public void start(TemperatureService temperatureService) { try { // 连接 mqttClient.connect(options); // 订阅 mqttClient.subscribe(TOPIC_FILTER,qos); // 设置回调 mqttClient.setCallback(new MqttCallbackExtended(){ @Override public void connectionLost(Throwable throwable) { System.out.println("connectionLost"); try { mqttClient.reconnect(); } catch (MqttException e) { e.printStackTrace(); } } @Override public void messageArrived(String s, MqttMessage mqttMessage) { String strData = new String(mqttMessage.getPayload()); System.out.println("topic:"+s); System.out.println("Qos:"+mqttMessage.getQos()); System.out.println("message RSA:"+strData); temperatureService.saveTemperature(strData,s); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println("deliveryComplete---------"+ iMqttDeliveryToken.isComplete()); } @Override public void connectComplete(boolean b, String s) { //连接成功后调用 try { mqttClient.subscribe(TOPIC_FILTER,qos);//具体订阅代码 } catch (MqttException e) { e.printStackTrace(); } } }); timer = new Timer(); timer.schedule(new TimerTask() { public void run() { System.out.println("-------设定要指定任务--------"); try { //判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的 if (!mqttClient.isConnected()) { System.out.println("***** 没有连接到服务器 *****"); System.out.println("***** client to connect *****"); // 重新连接 mqttClient.connect(options); } if (mqttClient.isConnected()) {//连接成功,跳出连接 System.out.println("***** connect success *****"); } } catch (MqttException e1) { e1.printStackTrace(); } } }, 10000,10000); // 设定指定的时间time,此处为10000毫秒 } catch (Exception e) { e.printStackTrace(); } } }