package com.rehome.adminclientappmqttserver; import com.google.gson.Gson; import com.rehome.adminclientappmqttserver.controller.UseryfController; import com.rehome.adminclientappmqttserver.entity.Useryf; import com.rehome.adminclientappmqttserver.utils.UUIDUtil; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import java.util.Timer; import java.util.TimerTask; public class MqttRSAClientYFAPP { /** * 代理服务器ip地址 */ public static final String MQTT_BROKER_HOST = "tcp://47.242.184.139:1883"; /** * 客户端唯一标识 */ public static String MQTT_CLIENT_ID = "AppServer_YF_APP_server_03"; /** * */ public static final String USERNAME = "admin"; /** * 密码 */ public static final String PASSWORD = "publish452131wW452131wW$"; /** * 订阅标识 */ public static final String TOPIC_FILTER = "app_push_yf"; private volatile static MqttClient mqttClient; private static MqttConnectOptions options; private static int qos = 2; //定时器 private Timer timer; public MqttRSAClientYFAPP(){ try { MQTT_CLIENT_ID = UUIDUtil.getUUID(); // host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示, // MemoryPersistence设置clientid的保存形式,默认为以内存保存 mqttClient = new MqttClient(MQTT_BROKER_HOST, MQTT_CLIENT_ID, new MemoryPersistence()); // 配置参数信息 options = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, // 这里设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(true); // 设置用户名 options.setUserName(USERNAME); // 设置密码 options.setPassword(PASSWORD.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(20); //断线重连 options.setAutomaticReconnect(true); } catch (Exception e) { e.printStackTrace(); } } public void start(UseryfController userController) { try { // 连接 mqttClient.connect(options); // 订阅 mqttClient.subscribe(TOPIC_FILTER,qos); // 设置回调 mqttClient.setCallback(new MqttCallbackExtended(){ @Override public void connectionLost(Throwable throwable) { System.out.println("connectionLost"); try { mqttClient.reconnect(); } catch (MqttException e) { e.printStackTrace(); } } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { System.out.println("topic:"+s); System.out.println("Qos:"+mqttMessage.getQos()); System.out.println("message RSA:"+new String(mqttMessage.getPayload())); System.out.println("云浮APP收到mqtt消息"); try { String messageDe = RSAAndroid.decryptByPrivateKeyForSpiltStr(new String(mqttMessage.getPayload()), RSAAndroid.privateRsaKey); System.out.println("message content:"+messageDe); Gson gson = new Gson(); Useryf userInfo = gson.fromJson(messageDe, Useryf.class); System.out.println(userInfo.getUsername()); System.out.println(userInfo.getPassword()); System.out.println(userInfo.getDate()); System.out.println(userInfo.getNfc()); System.out.println(userInfo.getName()); userController.saveUserYF(userInfo); }catch (Exception e){ e.printStackTrace(); } } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println("deliveryComplete---------"+ iMqttDeliveryToken.isComplete()); } @Override public void connectComplete(boolean b, String s) { //连接成功后调用 try { mqttClient.subscribe(TOPIC_FILTER,qos);//具体订阅代码 } catch (MqttException e) { e.printStackTrace(); } } }); timer = new Timer(); timer.schedule(new TimerTask() { public void run() { System.out.println("-------设定要指定任务--------"); try { //判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的 if (!mqttClient.isConnected()) { System.out.println("***** 没有连接到服务器 *****"); System.out.println("***** client to connect *****"); // 重新连接 mqttClient.connect(options); } if (mqttClient.isConnected()) {//连接成功,跳出连接 System.out.println("***** connect success *****"); } } catch (MqttException e1) { e1.printStackTrace(); } } }, 10000,10000); } catch (Exception e) { e.printStackTrace(); } } }