You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
go_mqtt/mqtt/main_mqtt.go

391 lines
11 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package mqtt
import (
"bytes"
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"encoding/base64"
"encoding/json"
"encoding/pem"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/google/uuid"
"go_mqtt/models"
"io/ioutil"
"log"
"os"
"strings"
"time"
)
// RSA_Decrypts RSA解密支持分段解密
func RSA_Decrypts(cipherText []byte, path string) []byte {
//打开文件
var bytesDecrypt []byte
//file, err := os.Open(path)
//if err != nil {
// fmt.Println(err)
//}
//
////获取文件内容
//info, _ := file.Stat()
//buf := make([]byte, info.Size())
//file.Read(buf)
// 读取私钥文件
keyData, err := ioutil.ReadFile(path)
if err != nil {
return nil
}
//pem解码
block, _ := pem.Decode(keyData)
//X509解码
privateKey, err := x509.ParsePKCS8PrivateKey(block.Bytes)
if err != nil {
fmt.Println(err.Error())
os.Exit(0)
}
p := privateKey.(*rsa.PrivateKey)
keySize := p.Size()
srcSize := len(cipherText)
log.Println("密钥长度", keySize, "密文长度", srcSize)
var offSet = 0
var buffer = bytes.Buffer{}
for offSet < srcSize {
endIndex := offSet + keySize
if endIndex > srcSize {
endIndex = srcSize
}
bytesOnce, err := rsa.DecryptPKCS1v15(rand.Reader, p, cipherText[offSet:endIndex])
if err != nil {
return nil
}
buffer.Write(bytesOnce)
offSet = endIndex
}
bytesDecrypt = buffer.Bytes()
return bytesDecrypt
}
// RSA_DecryptsOne RSA解密支持分段解密
func RSA_DecryptsOne(cipherText []byte, privateKeyStr string) []byte {
//打开文件
var bytesDecrypt []byte
// 读取私钥文件转成字节数组
keyData := []byte(privateKeyStr)
//pem解码
block, _ := pem.Decode(keyData)
//X509解码
privateKey, err := x509.ParsePKCS8PrivateKey(block.Bytes)
if err != nil {
fmt.Println(err.Error())
os.Exit(0)
}
p := privateKey.(*rsa.PrivateKey)
keySize := p.Size()
srcSize := len(cipherText)
log.Println("密钥长度", keySize, "密文长度", srcSize)
var offSet = 0
var buffer = bytes.Buffer{}
for offSet < srcSize {
endIndex := offSet + keySize
if endIndex > srcSize {
endIndex = srcSize
}
bytesOnce, err := rsa.DecryptPKCS1v15(rand.Reader, p, cipherText[offSet:endIndex])
if err != nil {
return nil
}
buffer.Write(bytesOnce)
offSet = endIndex
}
bytesDecrypt = buffer.Bytes()
return bytesDecrypt
}
// RsaEncryptBlock 公钥加密-分段
func RsaEncryptBlock(src []byte, path string) (bytesEncrypt []byte, err error) {
//打开文件
//file, err := os.Open(path)
//if err != nil {
// return
//}
//
////读取文件的内容
//info, _ := file.Stat()
//buf := make([]byte, info.Size())
//file.Read(buf)
// 读取公钥文件
keyData, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
}
//pem解码
block, _ := pem.Decode(keyData)
//x509解码
publicKeyInterface, err := x509.ParsePKIXPublicKey(block.Bytes)
if err != nil {
fmt.Println(err)
}
//类型断言
publicKey := publicKeyInterface.(*rsa.PublicKey)
keySize, srcSize := publicKey.Size(), len(src)
log.Println("密钥长度", keySize, "明文长度", srcSize)
offSet, once := 0, keySize-11
buffer := bytes.Buffer{}
for offSet < srcSize {
endIndex := offSet + once
if endIndex > srcSize {
endIndex = srcSize
}
// 加密一部分
bytesOnce, err := rsa.EncryptPKCS1v15(rand.Reader, publicKey, src[offSet:endIndex])
if err != nil {
return nil, err
}
buffer.Write(bytesOnce)
offSet = endIndex
}
bytesEncrypt = buffer.Bytes()
return
}
func timing(client mqtt.Client) {
//定时器10秒钟执行一次
ticker := time.NewTicker(10 * time.Second)
for {
time := <-ticker.C
fmt.Println("定时器====>", time.String())
if !client.IsConnectionOpen() {
client.Connect()
} else {
fmt.Println("client has connect")
}
}
}
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
message := fmt.Sprintf("%s", msg.Payload())
topic := fmt.Sprintf("%s", msg.Topic())
fmt.Println(message)
fmt.Println(topic)
//privatePath := "/Users/edao/GolandProjects/go_mqtt/privateKey.pem"
privateKeyPEM := `-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCYwvInDUICXbmWcOAR5hm86mz7WlKL3dDf12MSeIY2jm5QpUSSoMhrJOWLbOt5fXQepPaNO0M30A+3C4SFZrM/9WA5ehazO1u1m1LAkYzoSDRH3MMJsrJC3lCGDeOROteu+safuP9k/npU1YQu/+Ll2xEJNxyvUx4jLGM4LamiMI6ytM3gdnOAGP4YRQo9Etwo6I986yg/seCQC5rza9M4iBamoin7U8h9yOMKrM6xK/k9CcY/vn5+Uhe3Pvk4qj/2Ff3OXkkc1wfdILqdLwLOKL0Tb3ZciwG0p1CKO80yf5hyYoWjqZk5Rcd07nTo2gqTFGfLl9sqI9/+ipMDtnHlAgMBAAECggEATJd5yCC6lusdMRO5FOBUyUaUi9X2i1AU+RZKAynQySvSnbavUgExW58tRCHBUrGW9gJp59ft1N8J8hHhSO18NDY4H7laBlVdnwmYjRqtFo2VQO6sD4G8JRDION5f2iIxn/b2fYDI9H8vILfJRbNgtTSILyGlzTYUZzhLKxCh+8IsN96Nic8wa5COd1vZZmdhf2y8TG8clFWmozaScNSAATx7y+8XLVWjjWiIRZ6xQvx0uQPUParc9KihXXTKR2pA22yPIdz+U4MGD4kC0eczlcFKZ/dYv9e7OIGgnJfT0idSCu7nYb1pxJ1LxD9fS6IScNTF5dSe0OIL98e+XdyoAQKBgQDRep+5cW4iAKrEMH+djmcXAkoMiYtNVtnu0efLE8dP6vjYytQi368X9SdcASbfrQ31eEZmr/xQnlUF8oyHGkI38YS8dpAHzQcrkP3BljbbzB/3gJZaUdghGsDrK0xAJIzzmFKQpeKnGtr23vxUgaGrNsCYvQ0eQ7+5056KXS4r5QKBgQC6r8xtRSaje6L4WIydjWvYywsmRO0Of0aJLMDA/Wt2MWhHfh7ba9oI1cKGN80ap7xB2a9lQLgpv+C53wNtE5SpvjxsikAj96nUMMhGy9ojXrUith6HQhiINETz6Shnznd+AyrXP6KI/RpfA5nkDB5nrJxODwtYLP467IL7Cv7OAQKBgQCl4KxKdH/5fP28jYsAgJsxpSZt9xzQCU5Zxu396ZOSvUaApVyGoQpNtluMh3z48lhzYOKevgzW6gn5w69z7F8zXZT2iAxVoQ1kelP2z7RxKJrHqpNkwhqbXEwX7RlcUZUr8BqxYCqymJl7k+fMIzqaEalBSbLxnEReKi0I8/Bz4QKBgHK4b0ZCtVDHPEmimJ6E9l4dv/c/afF7swu+zaCK2ouiJvOwBCRQbYb6XPR/u/GCXASXUdpF4CX/vIhcDE3uN2/r8FO+zVWM7vbvF1OyF5WesG7pPW9e5ZZlkG3WvLa1wOZV6fCmMSo/ZwI2Q05JSDHrd43cXttLotrw1jiQ9C4BAoGBAKi4SOoOVQ5J5HQCDkBwPbG1AOLHFinzfoDl26GF/8Hy7fmmd1JiRTFldQp/A9VTAABz3sVYmMB92HSIaJhuDMoYJNI2Cf/cZifsv7vUL8cbLn+lPsKsebiuB0m0g4P2qLwLfegfNGEgA7lA5HIz3SELqbdp3iuqJeQl1fsJqD74
-----END PRIVATE KEY-----`
if topic == "app_push" {
fmt.Println("珠海电厂APP收到mqtt消息")
decodeByte, err := base64.StdEncoding.DecodeString(message)
if err != nil {
fmt.Println(err.Error())
}
//decrptCode := RSA_Decrypts(decodeByte, privatePath)
decrptCode := RSA_DecryptsOne(decodeByte, privateKeyPEM)
fmt.Println("解密后的字符串:")
fmt.Println(string(decrptCode))
fmt.Println("-----user--------")
var user models.User
json.Unmarshal(decrptCode, &user)
fmt.Println(user)
if models.GetUser(&user) {
models.UpdateUser(&user)
} else {
models.SaveUser(&user)
}
} else if topic == "app_push_dyw" {
fmt.Println("大亚湾电厂APP收到mqtt消息")
decodeByte, err := base64.StdEncoding.DecodeString(message)
if err != nil {
fmt.Println(err.Error())
}
decrptCode := RSA_DecryptsOne(decodeByte, privateKeyPEM)
fmt.Println("解密后的字符串:")
fmt.Println(string(decrptCode))
fmt.Println("-----userdyw--------")
var user models.Userdyw
json.Unmarshal(decrptCode, &user)
fmt.Println(user)
if models.GetUserdyw(&user) {
models.UpdateUserdyw(&user)
} else {
models.SaveUserdyw(&user)
}
} else if topic == "app_push_yf" {
fmt.Println("云浮电厂APP收到mqtt消息")
decodeByte, err := base64.StdEncoding.DecodeString(message)
if err != nil {
fmt.Println(err.Error())
}
decrptCode := RSA_DecryptsOne(decodeByte, privateKeyPEM)
fmt.Println("解密后的字符串:")
fmt.Println(string(decrptCode))
fmt.Println("-----userdyw--------")
var user models.Useryf
json.Unmarshal(decrptCode, &user)
fmt.Println(user)
if models.GetUseryf(&user) {
models.UpdateUseryf(&user)
} else {
models.SaveUseryf(&user)
}
} else {
result := strings.Split(message, " ")
fmt.Println(result[0])
fmt.Println(result[1])
now := time.Now()
fmt.Println("---------now--------")
fmt.Println(now)
temperature := new(models.Temperature)
temperature.CreateDate = now
temperature.DataDate = now.Format("2006-01-02")
temperature.DataHour = now.Format("2006-01-02 15")
temperature.DataMinute = now.Format("2006-01-02 15:04")
temperature.Humidity = result[0]
temperature.Temperature = result[1]
temperature.Topic = topic
if topic == "WifiSHT/7C87CE9CA4E6/SHT20" {
temperature.LocationDesc = "广东省珠海市高新区唐家湾镇东岸村水风三街28号501"
}
if topic == "WifiSHT/7C87CE9F5CBF/SHT20" {
temperature.LocationDesc = "广东省珠海市金湾区三灶镇百川路1号1栋1单元1508房"
}
if topic == "WifiSHT/4CEBD686B6AA/SHT20" {
temperature.LocationDesc = "广西壮族自治区崇左市天等县天等镇荣华村弄在屯113号"
}
fmt.Println("测温传感器地点:", temperature.LocationDesc)
models.SaveTemperature(temperature)
}
//uuid := uuid.New()
//models.SaveProduct()
//models.SaveUser(uuid.String())
//models.SaveOrder()
//var users []models.User
//var orders []models.Order
//var user models.User
//mydb.DB.Where("name = ?", "wenfei").First(&user)
//fmt.Println(user)
// 查询多条记录
//mydb.DB.Find(&users) // 查询所有产品信息
//mydb.DB.Order("id desc").Limit(2).Offset(0).Find(&users) // 按价格降序排序,取前 10 条记录
//fmt.Println("users:", users)
// 查询多条记录
//mydb.DB.Find(&users) // 查询所有产品信息
//mydb.DB.Where("id > ? and id < ?", 60,65).Find(&orders)
//fmt.Println("orders:", orders)
// 原生 SQL 查询
//var products []models.Product
//mydb.DB.Raw("SELECT * FROM product WHERE price > ?", 1000).Scan(&products) // 查询价格大于 1000 的产品信息
//fmt.Println("Products:", products)
}
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Println("Connected")
subTemperature(client)
subAppPush(client)
subAppPushDyw(client)
}
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Printf("Connect lost: %v", err)
}
func sub(client mqtt.Client) {
topic := "gomqtt01"
token := client.Subscribe(topic, 1, nil)
token.Wait()
fmt.Printf("Subscribed to topic: %s", topic)
}
func subTemperature(client mqtt.Client) {
topic := "WifiSHT/+/SHT20"
token := client.Subscribe(topic, 2, nil)
token.Wait()
fmt.Printf("Subscribed to topic: %s", topic)
}
func subAppPush(client mqtt.Client) {
topic := "app_push"
token := client.Subscribe(topic, 2, nil)
token.Wait()
fmt.Printf("Subscribed to topic: %s", topic)
}
func subAppPushDyw(client mqtt.Client) {
topic := "app_push_dyw"
token := client.Subscribe(topic, 2, nil)
token.Wait()
fmt.Printf("Subscribed to topic: %s", topic)
}
func publish(client mqtt.Client) {
num := 999999000000000000
for i := 0; i < num; i++ {
text := fmt.Sprintf("Message %d", i)
token := client.Publish("gomqtt01", 0, false, text)
token.Wait()
fmt.Println(i)
time.Sleep(6 * time.Second)
}
}
func main() {
uuid := uuid.New()
fmt.Println("Generated UUID:", uuid)
var broker = "47.242.184.139"
var port = 1883
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
opts.SetClientID(uuid.String())
opts.SetUsername("admin")
opts.SetPassword("publish452131wW452131wW$")
opts.SetDefaultPublishHandler(messagePubHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
}
//client.IsConnectionOpen()
go timing(client)
//sub(client)
subTemperature(client)
subAppPush(client)
subAppPushDyw(client)
//publish(client)
for true {
time.Sleep(5 * time.Second)
}
//client.Disconnect(1000)
}