package mqtt import ( "bytes" "crypto/rand" "crypto/rsa" "crypto/x509" "encoding/base64" "encoding/json" "encoding/pem" "fmt" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/google/uuid" "go_mqtt/models" "io/ioutil" "log" "os" "strings" "time" ) // RSA_Decrypts RSA解密支持分段解密 func RSA_Decrypts(cipherText []byte, path string) []byte { //打开文件 var bytesDecrypt []byte //file, err := os.Open(path) //if err != nil { // fmt.Println(err) //} // ////获取文件内容 //info, _ := file.Stat() //buf := make([]byte, info.Size()) //file.Read(buf) // 读取私钥文件 keyData, err := ioutil.ReadFile(path) if err != nil { return nil } //pem解码 block, _ := pem.Decode(keyData) //X509解码 privateKey, err := x509.ParsePKCS8PrivateKey(block.Bytes) if err != nil { fmt.Println(err.Error()) os.Exit(0) } p := privateKey.(*rsa.PrivateKey) keySize := p.Size() srcSize := len(cipherText) log.Println("密钥长度", keySize, "密文长度", srcSize) var offSet = 0 var buffer = bytes.Buffer{} for offSet < srcSize { endIndex := offSet + keySize if endIndex > srcSize { endIndex = srcSize } bytesOnce, err := rsa.DecryptPKCS1v15(rand.Reader, p, cipherText[offSet:endIndex]) if err != nil { return nil } buffer.Write(bytesOnce) offSet = endIndex } bytesDecrypt = buffer.Bytes() return bytesDecrypt } // RSA_DecryptsOne RSA解密支持分段解密 func RSA_DecryptsOne(cipherText []byte, privateKeyStr string) []byte { //打开文件 var bytesDecrypt []byte // 读取私钥文件转成字节数组 keyData := []byte(privateKeyStr) //pem解码 block, _ := pem.Decode(keyData) //X509解码 privateKey, err := x509.ParsePKCS8PrivateKey(block.Bytes) if err != nil { fmt.Println(err.Error()) os.Exit(0) } p := privateKey.(*rsa.PrivateKey) keySize := p.Size() srcSize := len(cipherText) log.Println("密钥长度", keySize, "密文长度", srcSize) var offSet = 0 var buffer = bytes.Buffer{} for offSet < srcSize { endIndex := offSet + keySize if endIndex > srcSize { endIndex = srcSize } bytesOnce, err := rsa.DecryptPKCS1v15(rand.Reader, p, cipherText[offSet:endIndex]) if err != nil { return nil } buffer.Write(bytesOnce) offSet = endIndex } bytesDecrypt = buffer.Bytes() return bytesDecrypt } // RsaEncryptBlock 公钥加密-分段 func RsaEncryptBlock(src []byte, path string) (bytesEncrypt []byte, err error) { //打开文件 //file, err := os.Open(path) //if err != nil { // return //} // ////读取文件的内容 //info, _ := file.Stat() //buf := make([]byte, info.Size()) //file.Read(buf) // 读取公钥文件 keyData, err := ioutil.ReadFile(path) if err != nil { return nil, err } //pem解码 block, _ := pem.Decode(keyData) //x509解码 publicKeyInterface, err := x509.ParsePKIXPublicKey(block.Bytes) if err != nil { fmt.Println(err) } //类型断言 publicKey := publicKeyInterface.(*rsa.PublicKey) keySize, srcSize := publicKey.Size(), len(src) log.Println("密钥长度", keySize, "明文长度", srcSize) offSet, once := 0, keySize-11 buffer := bytes.Buffer{} for offSet < srcSize { endIndex := offSet + once if endIndex > srcSize { endIndex = srcSize } // 加密一部分 bytesOnce, err := rsa.EncryptPKCS1v15(rand.Reader, publicKey, src[offSet:endIndex]) if err != nil { return nil, err } buffer.Write(bytesOnce) offSet = endIndex } bytesEncrypt = buffer.Bytes() return } func timing(client mqtt.Client) { //定时器,10秒钟执行一次 ticker := time.NewTicker(10 * time.Second) for { time := <-ticker.C fmt.Println("定时器====>", time.String()) if !client.IsConnectionOpen() { client.Connect() } else { fmt.Println("client has connect") } } } var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()) message := fmt.Sprintf("%s", msg.Payload()) topic := fmt.Sprintf("%s", msg.Topic()) fmt.Println(message) fmt.Println(topic) //privatePath := "/Users/edao/GolandProjects/go_mqtt/privateKey.pem" privateKeyPEM := `-----BEGIN PRIVATE KEY----- MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCYwvInDUICXbmWcOAR5hm86mz7WlKL3dDf12MSeIY2jm5QpUSSoMhrJOWLbOt5fXQepPaNO0M30A+3C4SFZrM/9WA5ehazO1u1m1LAkYzoSDRH3MMJsrJC3lCGDeOROteu+safuP9k/npU1YQu/+Ll2xEJNxyvUx4jLGM4LamiMI6ytM3gdnOAGP4YRQo9Etwo6I986yg/seCQC5rza9M4iBamoin7U8h9yOMKrM6xK/k9CcY/vn5+Uhe3Pvk4qj/2Ff3OXkkc1wfdILqdLwLOKL0Tb3ZciwG0p1CKO80yf5hyYoWjqZk5Rcd07nTo2gqTFGfLl9sqI9/+ipMDtnHlAgMBAAECggEATJd5yCC6lusdMRO5FOBUyUaUi9X2i1AU+RZKAynQySvSnbavUgExW58tRCHBUrGW9gJp59ft1N8J8hHhSO18NDY4H7laBlVdnwmYjRqtFo2VQO6sD4G8JRDION5f2iIxn/b2fYDI9H8vILfJRbNgtTSILyGlzTYUZzhLKxCh+8IsN96Nic8wa5COd1vZZmdhf2y8TG8clFWmozaScNSAATx7y+8XLVWjjWiIRZ6xQvx0uQPUParc9KihXXTKR2pA22yPIdz+U4MGD4kC0eczlcFKZ/dYv9e7OIGgnJfT0idSCu7nYb1pxJ1LxD9fS6IScNTF5dSe0OIL98e+XdyoAQKBgQDRep+5cW4iAKrEMH+djmcXAkoMiYtNVtnu0efLE8dP6vjYytQi368X9SdcASbfrQ31eEZmr/xQnlUF8oyHGkI38YS8dpAHzQcrkP3BljbbzB/3gJZaUdghGsDrK0xAJIzzmFKQpeKnGtr23vxUgaGrNsCYvQ0eQ7+5056KXS4r5QKBgQC6r8xtRSaje6L4WIydjWvYywsmRO0Of0aJLMDA/Wt2MWhHfh7ba9oI1cKGN80ap7xB2a9lQLgpv+C53wNtE5SpvjxsikAj96nUMMhGy9ojXrUith6HQhiINETz6Shnznd+AyrXP6KI/RpfA5nkDB5nrJxODwtYLP467IL7Cv7OAQKBgQCl4KxKdH/5fP28jYsAgJsxpSZt9xzQCU5Zxu396ZOSvUaApVyGoQpNtluMh3z48lhzYOKevgzW6gn5w69z7F8zXZT2iAxVoQ1kelP2z7RxKJrHqpNkwhqbXEwX7RlcUZUr8BqxYCqymJl7k+fMIzqaEalBSbLxnEReKi0I8/Bz4QKBgHK4b0ZCtVDHPEmimJ6E9l4dv/c/afF7swu+zaCK2ouiJvOwBCRQbYb6XPR/u/GCXASXUdpF4CX/vIhcDE3uN2/r8FO+zVWM7vbvF1OyF5WesG7pPW9e5ZZlkG3WvLa1wOZV6fCmMSo/ZwI2Q05JSDHrd43cXttLotrw1jiQ9C4BAoGBAKi4SOoOVQ5J5HQCDkBwPbG1AOLHFinzfoDl26GF/8Hy7fmmd1JiRTFldQp/A9VTAABz3sVYmMB92HSIaJhuDMoYJNI2Cf/cZifsv7vUL8cbLn+lPsKsebiuB0m0g4P2qLwLfegfNGEgA7lA5HIz3SELqbdp3iuqJeQl1fsJqD74 -----END PRIVATE KEY-----` if topic == "app_push" { fmt.Println("珠海电厂APP收到mqtt消息") decodeByte, err := base64.StdEncoding.DecodeString(message) if err != nil { fmt.Println(err.Error()) } //decrptCode := RSA_Decrypts(decodeByte, privatePath) decrptCode := RSA_DecryptsOne(decodeByte, privateKeyPEM) fmt.Println("解密后的字符串:") fmt.Println(string(decrptCode)) fmt.Println("-----user--------") var user models.User json.Unmarshal(decrptCode, &user) fmt.Println(user) if models.GetUser(&user) { models.UpdateUser(&user) } else { models.SaveUser(&user) } } else if topic == "app_push_dyw" { fmt.Println("大亚湾电厂APP收到mqtt消息") decodeByte, err := base64.StdEncoding.DecodeString(message) if err != nil { fmt.Println(err.Error()) } decrptCode := RSA_DecryptsOne(decodeByte, privateKeyPEM) fmt.Println("解密后的字符串:") fmt.Println(string(decrptCode)) fmt.Println("-----userdyw--------") var user models.Userdyw json.Unmarshal(decrptCode, &user) fmt.Println(user) if models.GetUserdyw(&user) { models.UpdateUserdyw(&user) } else { models.SaveUserdyw(&user) } } else if topic == "app_push_yf" { fmt.Println("云浮电厂APP收到mqtt消息") decodeByte, err := base64.StdEncoding.DecodeString(message) if err != nil { fmt.Println(err.Error()) } decrptCode := RSA_DecryptsOne(decodeByte, privateKeyPEM) fmt.Println("解密后的字符串:") fmt.Println(string(decrptCode)) fmt.Println("-----userdyw--------") var user models.Useryf json.Unmarshal(decrptCode, &user) fmt.Println(user) if models.GetUseryf(&user) { models.UpdateUseryf(&user) } else { models.SaveUseryf(&user) } } else { result := strings.Split(message, " ") fmt.Println(result[0]) fmt.Println(result[1]) now := time.Now() fmt.Println("---------now--------") fmt.Println(now) temperature := new(models.Temperature) temperature.CreateDate = now temperature.DataDate = now.Format("2006-01-02") temperature.DataHour = now.Format("2006-01-02 15") temperature.DataMinute = now.Format("2006-01-02 15:04") temperature.Humidity = result[0] temperature.Temperature = result[1] temperature.Topic = topic if topic == "WifiSHT/7C87CE9CA4E6/SHT20" { temperature.LocationDesc = "广东省珠海市高新区唐家湾镇东岸村水风三街28号501" } if topic == "WifiSHT/7C87CE9F5CBF/SHT20" { temperature.LocationDesc = "广东省珠海市金湾区三灶镇百川路1号1栋1单元1508房" } if topic == "WifiSHT/4CEBD686B6AA/SHT20" { temperature.LocationDesc = "广西壮族自治区崇左市天等县天等镇荣华村弄在屯113号" } fmt.Println("测温传感器地点:", temperature.LocationDesc) models.SaveTemperature(temperature) } //uuid := uuid.New() //models.SaveProduct() //models.SaveUser(uuid.String()) //models.SaveOrder() //var users []models.User //var orders []models.Order //var user models.User //mydb.DB.Where("name = ?", "wenfei").First(&user) //fmt.Println(user) // 查询多条记录 //mydb.DB.Find(&users) // 查询所有产品信息 //mydb.DB.Order("id desc").Limit(2).Offset(0).Find(&users) // 按价格降序排序,取前 10 条记录 //fmt.Println("users:", users) // 查询多条记录 //mydb.DB.Find(&users) // 查询所有产品信息 //mydb.DB.Where("id > ? and id < ?", 60,65).Find(&orders) //fmt.Println("orders:", orders) // 原生 SQL 查询 //var products []models.Product //mydb.DB.Raw("SELECT * FROM product WHERE price > ?", 1000).Scan(&products) // 查询价格大于 1000 的产品信息 //fmt.Println("Products:", products) } var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) { fmt.Println("Connected") subTemperature(client) subAppPush(client) subAppPushDyw(client) } var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) { fmt.Printf("Connect lost: %v", err) } func sub(client mqtt.Client) { topic := "gomqtt01" token := client.Subscribe(topic, 1, nil) token.Wait() fmt.Printf("Subscribed to topic: %s", topic) } func subTemperature(client mqtt.Client) { topic := "WifiSHT/+/SHT20" token := client.Subscribe(topic, 2, nil) token.Wait() fmt.Printf("Subscribed to topic: %s", topic) } func subAppPush(client mqtt.Client) { topic := "app_push" token := client.Subscribe(topic, 2, nil) token.Wait() fmt.Printf("Subscribed to topic: %s", topic) } func subAppPushDyw(client mqtt.Client) { topic := "app_push_dyw" token := client.Subscribe(topic, 2, nil) token.Wait() fmt.Printf("Subscribed to topic: %s", topic) } func publish(client mqtt.Client) { num := 999999000000000000 for i := 0; i < num; i++ { text := fmt.Sprintf("Message %d", i) token := client.Publish("gomqtt01", 0, false, text) token.Wait() fmt.Println(i) time.Sleep(6 * time.Second) } } func main() { uuid := uuid.New() fmt.Println("Generated UUID:", uuid) var broker = "47.242.184.139" var port = 1883 opts := mqtt.NewClientOptions() opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port)) opts.SetClientID(uuid.String()) opts.SetUsername("admin") opts.SetPassword("publish452131wW452131wW$") opts.SetDefaultPublishHandler(messagePubHandler) opts.OnConnect = connectHandler opts.OnConnectionLost = connectLostHandler client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { fmt.Println(token.Error()) } //client.IsConnectionOpen() go timing(client) //sub(client) subTemperature(client) subAppPush(client) subAppPushDyw(client) //publish(client) for true { time.Sleep(5 * time.Second) } //client.Disconnect(1000) }