From d7cdd6c36b4efac969525cf4e3a63a3a7675fe43 Mon Sep 17 00:00:00 2001 From: wenfei Date: Wed, 12 Nov 2025 20:53:43 +0800 Subject: [PATCH] add huaweiyun mqtt --- admin-client-temperature.iml | 117 +++++++++++++- .../AdminClientTemperatureApplication.java | 20 ++- .../MqttDianDengTechClient.java | 2 +- .../MqttHuaWeiYunClient.java | 148 ++++++++++++++++++ .../utils/MqttSSLSocketFactory.java | 34 ++++ src/main/resources/ssn/client.p12 | Bin 0 -> 2659 bytes src/main/resources/ssn/rootCA.crt | 23 +++ 7 files changed, 333 insertions(+), 11 deletions(-) create mode 100644 src/main/java/com/rehome/mqttclienttemperature/MqttHuaWeiYunClient.java create mode 100644 src/main/resources/ssn/client.p12 create mode 100644 src/main/resources/ssn/rootCA.crt diff --git a/admin-client-temperature.iml b/admin-client-temperature.iml index c93e2f9..4239187 100644 --- a/admin-client-temperature.iml +++ b/admin-client-temperature.iml @@ -1,5 +1,5 @@ - + @@ -10,10 +10,7 @@ - - - - + @@ -21,4 +18,114 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/main/java/com/rehome/mqttclienttemperature/AdminClientTemperatureApplication.java b/src/main/java/com/rehome/mqttclienttemperature/AdminClientTemperatureApplication.java index b95910e..d6e73aa 100644 --- a/src/main/java/com/rehome/mqttclienttemperature/AdminClientTemperatureApplication.java +++ b/src/main/java/com/rehome/mqttclienttemperature/AdminClientTemperatureApplication.java @@ -66,14 +66,24 @@ public class AdminClientTemperatureApplication implements CommandLineRunner, App // }else { // log.info("temperatureService is empty"); // } - if (temperatureEspService != null) { + + if (temperatureService != null) { log.info("------------------------"); - log.info("temperatureEspService is not empty"); - MqttDianDengTechClient client = new MqttDianDengTechClient(); - client.start(temperatureEspService); + log.info("TemperatureController is not empty"); + MqttHuaWeiYunClient client = new MqttHuaWeiYunClient(); + client.start(temperatureService); }else { - log.info("temperatureEspService is empty"); + log.info("temperatureService is empty"); } + +// if (temperatureEspService != null) { +// log.info("------------------------"); +// log.info("temperatureEspService is not empty"); +// MqttDianDengTechClient client = new MqttDianDengTechClient(); +// client.start(temperatureEspService); +// }else { +// log.info("temperatureEspService is empty"); +// } } catch (Exception ex) { ex.printStackTrace(); } diff --git a/src/main/java/com/rehome/mqttclienttemperature/MqttDianDengTechClient.java b/src/main/java/com/rehome/mqttclienttemperature/MqttDianDengTechClient.java index 1513634..653003f 100644 --- a/src/main/java/com/rehome/mqttclienttemperature/MqttDianDengTechClient.java +++ b/src/main/java/com/rehome/mqttclienttemperature/MqttDianDengTechClient.java @@ -112,7 +112,7 @@ public class MqttDianDengTechClient { log.info("message RSA:"+strData); if(strData!=null&&strData.length()>=11){ String[] strDataTemperature = strData.split(" "); - temperatureEspService.saveTemperature(strDataTemperature[0],strDataTemperature[1],s); + //temperatureEspService.saveTemperature(strDataTemperature[0],strDataTemperature[1],s); } } diff --git a/src/main/java/com/rehome/mqttclienttemperature/MqttHuaWeiYunClient.java b/src/main/java/com/rehome/mqttclienttemperature/MqttHuaWeiYunClient.java new file mode 100644 index 0000000..3caf020 --- /dev/null +++ b/src/main/java/com/rehome/mqttclienttemperature/MqttHuaWeiYunClient.java @@ -0,0 +1,148 @@ +package com.rehome.mqttclienttemperature; + + + +import com.rehome.mqttclienttemperature.service.TemperatureService; +import com.rehome.mqttclienttemperature.utils.MqttSSLSocketFactory; +import com.rehome.mqttclienttemperature.utils.UUIDUtil; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import javax.net.ssl.SSLSocketFactory; +import java.io.InputStream; +import java.util.Timer; +import java.util.TimerTask; + + +@Slf4j +public class MqttHuaWeiYunClient { + /** + * 代理服务器ip地址 + */ + public static final String MQTT_BROKER_HOST = "ssl://mqtt.fileview123.com:8883"; + + /** + * 客户端唯一标识 + */ + public static String MQTT_CLIENT_ID = "AppServer_temperature_APP_server_02"; + + /** + *帐号 + */ + public static String USERNAME = "admin"; + /** + * 密码 + */ + public static String PASSWORD = "publish452131wW452131wW$"; + /** + * 订阅标识 + */ + public static String TOPIC_FILTER = "/device/esp8266/+"; + + private volatile static MqttClient mqttClient; + private static MqttConnectOptions options; + private static int qos = 2; + + //定时器 + private Timer timer; + + public MqttHuaWeiYunClient(){ + try { + MQTT_CLIENT_ID = UUIDUtil.getUUID(); + // host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示, + // MemoryPersistence设置clientid的保存形式,默认为以内存保存 + mqttClient = new MqttClient(MQTT_BROKER_HOST, MQTT_CLIENT_ID, new MemoryPersistence()); + // 配置参数信息 + options = new MqttConnectOptions(); + // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, + // 这里设置为true表示每次连接到服务器都以新的身份连接 + options.setCleanSession(false); + // 设置用户名 + options.setUserName(USERNAME); + // 设置密码 + options.setPassword(PASSWORD.toCharArray()); + // 设置超时时间 单位为秒 + options.setConnectionTimeout(10); + // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 + options.setKeepAliveInterval(20); + //断线重连 + options.setAutomaticReconnect(true); + //mqtt服务器端单双向加密 + InputStream certInput = this.getClass().getResourceAsStream("/ssn/rootCA.crt"); + InputStream clientCertInput = this.getClass().getResourceAsStream("/ssn/client.p12"); + String password = "12345678"; + SSLSocketFactory socketFactory = MqttSSLSocketFactory.getThreeSocketFactory(certInput,clientCertInput,password); + options.setSocketFactory(socketFactory); + } catch (Exception e) { + e.printStackTrace(); + } + } + public void start(TemperatureService temperatureService) { + try { + // 连接 + mqttClient.connect(options); + // 订阅 + mqttClient.subscribe(TOPIC_FILTER,qos); + // 设置回调 + mqttClient.setCallback(new MqttCallbackExtended(){ + + @Override + public void connectionLost(Throwable throwable) { + log.info("connectionLost"); + try { + mqttClient.reconnect(); + } catch (MqttException e) { + e.printStackTrace(); + } + } + + @Override + public void messageArrived(String s, MqttMessage mqttMessage) { + String strData = new String(mqttMessage.getPayload()); + log.info("topic:"+s); + log.info("Qos:"+mqttMessage.getQos()); + log.info("message RSA:"+strData); + //temperatureService.saveTemperature(strData,s); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + log.info("deliveryComplete---------"+ iMqttDeliveryToken.isComplete()); + } + + @Override + public void connectComplete(boolean b, String s) { + //连接成功后调用 + try { + mqttClient.subscribe(TOPIC_FILTER,qos);//具体订阅代码 + } catch (MqttException e) { + e.printStackTrace(); + } + } + }); + timer = new Timer(); + timer.schedule(new TimerTask() { + public void run() { + log.info("-------设定要指定任务--------"); + try { + //判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的 + if (!mqttClient.isConnected()) { + log.info("***** 没有连接到服务器 *****"); + log.info("***** client to connect *****"); + // 重新连接 + mqttClient.connect(options); + } + if (mqttClient.isConnected()) {//连接成功,跳出连接 + log.info("***** connect success *****"); + } + } catch (MqttException e1) { + e1.printStackTrace(); + } + } + }, 10000,10000); + // 设定指定的时间time,此处为10000毫秒 + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/com/rehome/mqttclienttemperature/utils/MqttSSLSocketFactory.java b/src/main/java/com/rehome/mqttclienttemperature/utils/MqttSSLSocketFactory.java index 8809702..855ccc5 100644 --- a/src/main/java/com/rehome/mqttclienttemperature/utils/MqttSSLSocketFactory.java +++ b/src/main/java/com/rehome/mqttclienttemperature/utils/MqttSSLSocketFactory.java @@ -10,13 +10,16 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManagerFactory; import java.io.BufferedInputStream; +import java.io.FileInputStream; import java.io.InputStream; import java.io.InputStreamReader; import java.security.KeyPair; import java.security.KeyStore; import java.security.Security; +import java.security.cert.Certificate; import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; +import java.util.Collection; public class MqttSSLSocketFactory { public static SSLSocketFactory getSingleSocketFactory(InputStream caCrtFileInputStream) throws Exception { @@ -133,4 +136,35 @@ public class MqttSSLSocketFactory { return sslContext.getSocketFactory(); } + public static SSLSocketFactory getThreeSocketFactory(InputStream certInput, InputStream clientCertInput,String password) throws Exception { + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + Collection certs = cf.generateCertificates(certInput); + + // 将服务器端 CA 证书存入 KeyStore + KeyStore tmKs = KeyStore.getInstance(KeyStore.getDefaultType()); + tmKs.load(null, null); + int index = 0; + for (Certificate cert : certs) { + tmKs.setCertificateEntry("server_ca_" + index++, cert); + } + + // 创建 TrustManager + TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(tmKs); + + // 将客户端证书存入 KeyStore + //String password = "mypassword"; //与导出密码一致 + KeyStore kmKs = KeyStore.getInstance("PKCS12"); + kmKs.load(clientCertInput, password.toCharArray()); + + // 创建 KeyManager + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(kmKs, password.toCharArray()); + + SSLContext sslContext = SSLContext.getInstance("TLS"); + // SSLContext 中设置好 KeyManager 和 TrustManager + sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); + //SSLSocketFactory socketFactory = sslContext.getSocketFactory(); + return sslContext.getSocketFactory(); + } } diff --git a/src/main/resources/ssn/client.p12 b/src/main/resources/ssn/client.p12 new file mode 100644 index 0000000000000000000000000000000000000000..cec741867b0d23fe0f2b102b8b4ba8ce7d601007 GIT binary patch literal 2659 zcmai$cQhLc8^(#)R8X}_&>}{ORk5WtQ!7EOw$vVt5flwYj2bm#R$J;)>)L9s+7z*x zBB9i%5-oM{)%@J^eW&-&`^S6U^E}Ua&->r+JWw2yJ%E}9ieuuUW0Z_Fh&^Pbrl&5z zG0_2WOaLg3@eGOsV*ZKf3UEND3+v*D>S9^{lTbsgehXSwC;}?+k7R*f1u{&CVAD$W zLi+#!1}Z2E9mD^=0T^kipaOJ^4zUK*E;K-DNg!yfYlE7t4;QoYo$r}XOw(Zmj%J6x z+Em~f?R8|Ej9TOYlQ68@!7kD)>}AzGFQT?rGZS1} zK6_%xMwj~7el5B}owR7Y>564L*<)2mlKin$MVs?j2~5~{o7-a!8Wx!H!l!;lCYT}g zs}kak8b}E`1Q;qRLqB8mli#cH*l;4VlW|!;z~v*)c8C^;+QIH_Pr1owX;*@CFVF-N zxrcg_p2N!Hm)^!~Kv?j?2(~m`6OFtA)}8Md`c$3-#zLoG;>k00%ib07kQ?&jxpeh^fv!_2v@>(Pn*t0I&fP5`> zF63e8Ney!6F&12iJXJ6@>qaHsoomb=M+>1#;`Ey%V41`%jQ!ML(WWG~Buq-IV;j?=u~Ok z@XT~nyLA(PRaDvdKg*E*D!V3^y8IpN3o@BaX}YKs?K#p<4E`&^J#JoJ#aroG`ptS9 zp@SA|Vn~PSy9Z#%8eFgTYz?EnZgyfDsnyG;ge9knI;6z4xHMO(OMT67nJg$VU*J~o zECEtvw|}Nv*WH&n!>#lGNWZ1BlU4%KRAjmZ{;hV3{mhzQFpI^HSNX9V=RXU089!vhECia`-euq<+pD)g}59qYy(JL5f z2)=zF^g_W+Iewz#JiPh!tpE`*NJfPmZ2Sym_Pv=8q;WT{dAjFaJnL-M!xh9OeD0oBtXj1OpCfTh?EM)V~{!;RR^Md;5 z2F#IA?+svA3q{hMRQM<9>z|g z2s7^mI~9SKqQd0ijS-D^M?!b`=PpxAU7e9sWi2!)h-k-2TakSCD`SCZ9Xaj*bTE7b$eK8(%%>QQ<3llZqB9yx@ zZ~Pm8Tmv;cYf2b^f_tZh(43>R~X*}<28NrFlZ z(O^oeRT$@upEmc3rnT)W$}0E0l${HT#~mERkA6w>;}wZ~e{lcEE~(Og>~n5XahKmj zslBGl%-8y8+<({8*7Ae*$U+*64!`*GT+HebjSi3Bs(KgUrlbrna}-T8uu0b52K!c^ z!MXeC0?>VRwii?L?j;)Y+63wzY2EzIcp5Nl)Md9U+!dX&0$a;-Mk*mYT+Fw;#6)eGL{64e3`|AhIBfZyu98KeOv{pCU2V{ zsUV3-)6v~4SvTdc(S;Bc+Vb~M?0}!{e3nu8M;;@8{d59BG%FVeW>_un@LFx|#oB&v zBR2b*@gK{KUD@u`0^?b**LP}1MRmj}t`q{@8ET+Gv_h{4lcby7CYoQJ8qTmzWJjDY z)DD!P7?ULwggAa$W?At-%0gEE+OW7|Dj)Nyv(&qsJ6&fo`m`>|6J?@`jLa7WJyR=| z0ne_rEDO(y8!7MFHpvb4b z27_l^XbBF#SiRL+-$79)wTK*q6K z>sE(M^EsvMlyyWfUIBB<`z(7qdzDb$pJ19;e3;`zPovd^LN(5;OFN ztpMK#&O$9ygKBUW?~Qd_Vm?G*~-jIp|_(;#IK-A8r;^= zoA87J>vWUEGnbHuih8Q$^bd!7zDK2Zl0YxwWU87v$ac$X=r3|Y;Wi~reQ{Xl!+Yd` z;e>h5`siWHp5sT|C<2^nF5!lXv4OQI6}t5423xI%A`M%fp1Q<9=BEb{FpcPsRL~n)HD=$Kf}rs3nRo!q&^7i8zn0wuNe;A)bru8 zE`o=5hK|K=0mz333MFem{N2A_CBA@^xO$MLcH5x zGzB8OI90C%qEjWs{b=--x&1+?>JV@UPA=69#vYg#jk1kU(tgS6=P9#zW!KO56WW^< zgx%cni7Ikw64$TK%ZWm-{RS)}m)sTLW#rT`Fb^W1y2)kZrp{{cnqQlT6AFjQL0Ny- zGXOO$6;O!q+1XcISsk+@5z=bzD3N29Q{^