package main import ( "bytes" "crypto/rand" "crypto/rsa" "crypto/x509" "database/sql" "encoding/base64" "encoding/json" "encoding/pem" "fmt" mqtt "github.com/eclipse/paho.mqtt.golang" _ "github.com/go-sql-driver/mysql" "github.com/google/uuid" "go_mqtt/models" _ "go_mqtt/models" "go_mqtt/mydb" _ "go_mqtt/mydb" "io/ioutil" "log" "os" "strings" "time" ) func getMySqlDB() (*sql.DB, error) { dsn := "root:Skyinno251,@tcp(47.242.184.139:3306)/appserver?charset=utf8mb4&parseTime=True&loc=Local" db, err := sql.Open("mysql", dsn) if err != nil { log.Fatal(err) return nil, err } db.SetConnMaxLifetime(time.Minute * 5) // 连接最大生命周期 db.SetMaxOpenConns(50) // 最大连接数 db.SetMaxIdleConns(10) // 最大空闲连接数 //defer mydb.Close() err = db.Ping() // 检查连接是否成功建立。如果失败,则返回错误。 if err != nil { // 处理连接失败的情况。例如,打印错误信息并退出程序。 log.Println("47.242.184.139:3306连接失败") fmt.Println("Connection failed:", err) return nil, err } else { log.Println("47.242.184.139:3306连接成功") fmt.Println("Connected successfully") //复制代码到你的Go文件中,并根据你的实际情况修改DSN中的用户名、密码、主机名、端口和数据库名。 //然后运行你的程序。如果一切正常,你应该能够看到查询结果被打印出来。 //如果遇到任何问题,检查你的DSN是否正确,以及MySQL服务器是否正在运行并允许连接。 //确保你的防火墙设置允许你的应用程序访问MySQL端口(默认是3306)。 //如果你使用的是远程数据库,确保网络设置允许你从当前位置访问数据库服务器。 return db, nil } } func moreDBConnect() { db1, err := mydb.NewDatabase("appserver", "root", "Skyinno251,", "47.242.184.139", 3306) if err != nil { fmt.Println("Error connecting to db1:", err) return } defer db1.Db.Close() fmt.Println("Connected to databases:", db1.Name) //db2, err := mydb.NewDatabase("db2", "user2", "password2", "localhost", 3306) //if err != nil { // fmt.Println("Error connecting to db2:", err) // return //} //defer db2.Conn.Close() // //fmt.Println("Connected to databases:", db1.Name, "and", db2.Name) } func queryOneDataByMySql(db *sql.DB) { //DB.QueryRow 用于查询单行数据,返回的是一个单独的 Row 对象。和 Query 不同,它只返回一行结果,适合用于查询根据条件唯一的数据: var temperature models.Temperature err := db.QueryRow("SELECT id,create_date,data_date,data_hour,data_minute,humidity,location_desc,temperature,topic FROM temperature order by id desc limit 0,?", 1).Scan(&temperature.Id, &temperature.CreateDate, &temperature.DataDate, &temperature.DataHour, &temperature.DataMinute, &temperature.Humidity, &temperature.LocationDesc, &temperature.Temperature, &temperature.Topic) if err != nil { fmt.Println("QueryRow failed:", err) return } fmt.Printf("Temperature Id:%d\n", temperature.Id) fmt.Printf("Temperature: %+v\n", temperature) } func queryMoreDataByMySql(db *sql.DB) { //DB.QueryRow 用于查询单行数据,返回的是一个单独的 Row 对象。和 Query 不同,它只返回一行结果,适合用于查询根据条件唯一的数据: rows, err := db.Query("SELECT id,create_date,data_date,data_hour,data_minute,humidity,location_desc,temperature,topic FROM temperature WHERE id > ? and id < ?", 0, 30) if err != nil { fmt.Println("Query failed:", err) return } defer rows.Close() // 确保在使用完毕后关闭资源 items := make([]models.Temperature, 0) for rows.Next() { var temperature models.Temperature if err := rows.Scan(&temperature.Id, &temperature.CreateDate, &temperature.DataDate, &temperature.DataHour, &temperature.DataMinute, &temperature.Humidity, &temperature.LocationDesc, &temperature.Temperature, &temperature.Topic); err != nil { fmt.Println("Scan failed:", err) return } items = append(items, temperature) fmt.Println(items) } } // 插入单条数据 func insertDataByMySql(db *sql.DB, temperature *models.Temperature) { //插入操作: stmt, err := db.Prepare("INSERT INTO temperature(create_date,data_date,data_hour,data_minute,humidity,location_desc,temperature,topic) VALUES(?, ?,?, ?,?, ?,?, ?)") if err != nil { fmt.Println("Prepare failed:", err) return } res, err := stmt.Exec(&temperature.CreateDate, &temperature.DataDate, &temperature.DataHour, &temperature.DataMinute, &temperature.Humidity, &temperature.LocationDesc, &temperature.Temperature, &temperature.Topic) if err != nil { fmt.Println("Exec failed:", err) return } lastID, _ := res.LastInsertId() fmt.Printf("Inserted Temperature with ID: %d\n", lastID) } // 事务处理:保证数据一致性 func insertMoreDataByMySql(db *sql.DB, temperature *models.Temperature) { tx, err := db.Begin() if err != nil { fmt.Println("Begin transaction failed:", err) return } _, err = tx.Exec("UPDATE users SET age = ? WHERE id = ?", 35, 1) if err != nil { tx.Rollback() // 如果发生错误,回滚事务 fmt.Println("Exec failed:", err) return } err = tx.Commit() // 提交事务 if err != nil { fmt.Println("Commit failed:", err) return } fmt.Println("Transaction committed successfully") } func dbTest() { // 初始化数据库连接 DB, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/test") if err != nil { fmt.Println("Error opening DB:", err) return } defer DB.Close() DB.SetConnMaxLifetime(time.Minute * 5) DB.SetMaxOpenConns(50) DB.SetMaxIdleConns(10) // 测试数据库连接 if err := DB.Ping(); err != nil { fmt.Println("Connection failed:", err) return } fmt.Println("Connected successfully") // 查询数据 rows, _ := DB.Query("SELECT id, name FROM users") defer rows.Close() for rows.Next() { var id int var name string rows.Scan(&id, &name) fmt.Printf("ID: %d, Name: %s\n", id, name) } // 插入数据 stmt, _ := DB.Prepare("INSERT INTO users(name, age) VALUES(?, ?)") stmt.Exec("Jane Doe", 28) // 删除数据 stmt, _ = DB.Prepare("DELETE FROM users WHERE id = ?") stmt.Exec(1) } // GenRsaKey generates an PKCS#1 RSA keypair of the given bit size in PEM format. func GenRsaKey(bits int) (prvkey, pubkey []byte, err error) { // Generates private key. privateKey, err := rsa.GenerateKey(rand.Reader, bits) if err != nil { return } derStream := x509.MarshalPKCS1PrivateKey(privateKey) block := &pem.Block{ Type: "RSA PRIVATE KEY", Bytes: derStream, } prvkey = pem.EncodeToMemory(block) // Generates public key from private key. publicKey := &privateKey.PublicKey derPkix, err := x509.MarshalPKIXPublicKey(publicKey) if err != nil { return } block = &pem.Block{ Type: "RSA PUBLIC KEY", Bytes: derPkix, } pubkey = pem.EncodeToMemory(block) return } func parsePrivateKey(privateKeyFile string) (*rsa.PrivateKey, error) { // 读取私钥文件 keyData, err := ioutil.ReadFile(privateKeyFile) if err != nil { return nil, err } // 解析PEM块 block, _ := pem.Decode(keyData) if block == nil { return nil, fmt.Errorf("private key error not block in file") } // 解析RSA私钥 privateKey, err := x509.ParsePKCS1PrivateKey(block.Bytes) if err != nil { return nil, err } return privateKey, nil } //生成RSA私钥和公钥,保存到文件中 func GenerateRSAKey(bits int) { //GenerateKey函数使用随机数据生成器random生成一对具有指定字位数的RSA密钥 //Reader是一个全局、共享的密码用强随机数生成器 privateKey, err := rsa.GenerateKey(rand.Reader, bits) if err != nil { return } //保存私钥 //通过x509标准将得到的ras私钥序列化为ASN.1 的 DER编码字符串 // X509PrivateKey := x509.MarshalPKCS1PrivateKey(privateKey) // PKCS1 和 9 是不一致的 X509PrivateKey, err := x509.MarshalPKCS8PrivateKey(privateKey) if err != nil { fmt.Println(err.Error()) os.Exit(0) } //使用pem格式对x509输出的内容进行编码 //创建文件保存私钥 privateFile, err := os.Create("private.pem") if err != nil { return } //构建一个pem.Block结构体对象 privateBlock := pem.Block{Type: "PRIVATE KEY", Bytes: X509PrivateKey} //将数据保存到文件 pem.Encode(privateFile, &privateBlock) //保存公钥 //获取公钥的数据 publicKey := privateKey.PublicKey //X509对公钥编码 X509PublicKey, err := x509.MarshalPKIXPublicKey(&publicKey) if err != nil { return } //pem格式编码 //创建用于保存公钥的文件 publicFile, err := os.Create("public.pem") if err != nil { return } //创建一个pem.Block结构体对象 publicBlock := pem.Block{Type: "Public Key", Bytes: X509PublicKey} //保存到文件 pem.Encode(publicFile, &publicBlock) } // RSA_Decrypts RSA解密支持分段解密 func RSA_Decrypts(cipherText []byte, path string) []byte { //打开文件 var bytesDecrypt []byte //file, err := os.Open(path) //if err != nil { // fmt.Println(err) //} // ////获取文件内容 //info, _ := file.Stat() //buf := make([]byte, info.Size()) //file.Read(buf) // 读取私钥文件 keyData, err := ioutil.ReadFile(path) if err != nil { return nil } //pem解码 block, _ := pem.Decode(keyData) //X509解码 privateKey, err := x509.ParsePKCS8PrivateKey(block.Bytes) if err != nil { fmt.Println(err.Error()) os.Exit(0) } p := privateKey.(*rsa.PrivateKey) keySize := p.Size() srcSize := len(cipherText) log.Println("密钥长度", keySize, "密文长度", srcSize) var offSet = 0 var buffer = bytes.Buffer{} for offSet < srcSize { endIndex := offSet + keySize if endIndex > srcSize { endIndex = srcSize } bytesOnce, err := rsa.DecryptPKCS1v15(rand.Reader, p, cipherText[offSet:endIndex]) if err != nil { return nil } buffer.Write(bytesOnce) offSet = endIndex } bytesDecrypt = buffer.Bytes() return bytesDecrypt } // RsaEncryptBlock 公钥加密-分段 func RsaEncryptBlock(src []byte, path string) (bytesEncrypt []byte, err error) { //打开文件 //file, err := os.Open(path) //if err != nil { // return //} // ////读取文件的内容 //info, _ := file.Stat() //buf := make([]byte, info.Size()) //file.Read(buf) // 读取公钥文件 keyData, err := ioutil.ReadFile(path) if err != nil { return nil,err } //pem解码 block, _ := pem.Decode(keyData) //x509解码 publicKeyInterface, err := x509.ParsePKIXPublicKey(block.Bytes) if err != nil { fmt.Println(err) } //类型断言 publicKey := publicKeyInterface.(*rsa.PublicKey) keySize, srcSize := publicKey.Size(), len(src) log.Println("密钥长度", keySize, "明文长度", srcSize) offSet, once := 0, keySize-11 buffer := bytes.Buffer{} for offSet < srcSize { endIndex := offSet + once if endIndex > srcSize { endIndex = srcSize } // 加密一部分 bytesOnce, err := rsa.EncryptPKCS1v15(rand.Reader, publicKey, src[offSet:endIndex]) if err != nil { return nil, err } buffer.Write(bytesOnce) offSet = endIndex } bytesEncrypt = buffer.Bytes() return } func timing(client mqtt.Client) { //定时器,10秒钟执行一次 ticker := time.NewTicker(5 * time.Second) for { time := <-ticker.C fmt.Println("定时器====>", time.String()) if !client.IsConnectionOpen() { client.Connect() } else { fmt.Println("client has connect") //moreDBConnect() //db, err := getMySqlDB() //if err == nil { // queryOneDataByMySql(db) // //mydb.Close() //} //GenerateRSAKey(2048) //publicPath := "public.pem" //privatePath := "private.pem" // //var a = []byte("jack") //encrptTxt, err := RsaEncryptBlock(a, publicPath) //if err != nil { // fmt.Println(err.Error()) //} //fmt.Println("加密后的字符串:") //encodeString := base64.StdEncoding.EncodeToString(encrptTxt) //fmt.Println(encodeString) //fmt.Println("-------------------") //decodeByte, err := base64.StdEncoding.DecodeString(encodeString) //if err != nil { // fmt.Println(err.Error()) //} //decrptCode := RSA_Decrypts(decodeByte, privatePath) //fmt.Println("解密后的字符串:") //fmt.Println(string(decrptCode)) } } } var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()) message := fmt.Sprintf("%s", msg.Payload()) topic := fmt.Sprintf("%s", msg.Topic()) fmt.Println(message) fmt.Println(topic) privatePath := "/Users/edao/GolandProjects/go_mqtt/privateKey.pem" if topic == "app_push" { fmt.Println("珠海电厂APP收到mqtt消息") decodeByte, err := base64.StdEncoding.DecodeString(message) if err != nil { fmt.Println(err.Error()) } decrptCode := RSA_Decrypts(decodeByte, privatePath) fmt.Println("解密后的字符串:") fmt.Println(string(decrptCode)) fmt.Println("-----user--------") var user models.User json.Unmarshal(decrptCode, &user) fmt.Println(user) //models.SaveUser(&user) if models.GetUser(&user) { models.UpdateUser(&user) }else{ models.SaveUser(&user) } }else if topic == "app_push_dyw" { fmt.Println("大亚湾电厂APP收到mqtt消息") decodeByte, err := base64.StdEncoding.DecodeString(message) if err != nil { fmt.Println(err.Error()) } decrptCode := RSA_Decrypts(decodeByte, privatePath) fmt.Println("解密后的字符串:") fmt.Println(string(decrptCode)) fmt.Println("-----user--------") //var user models.User //json.Unmarshal(decrptCode, &user) //fmt.Println(user) // //if models.GetUser(&user) { // models.UpdateUser(&user) //}else{ // models.SaveUser(&user) //} }else{ result := strings.Split(message, " ") fmt.Println(result[0]) fmt.Println(result[1]) now := time.Now() temperature := new(models.Temperature) temperature.CreateDate = now temperature.DataDate = now.Format("2006-01-02") temperature.DataHour = now.Format("2006-01-02 15") temperature.DataMinute = now.Format("2006-01-02 15:04") temperature.Humidity = result[0] temperature.Temperature = result[1] temperature.Topic = topic if topic == "WifiSHT/7C87CE9CA4E6/SHT20" { temperature.LocationDesc = "广东省珠海市高新区唐家湾镇东岸村水风三街28号501" } if topic == "WifiSHT/7C87CE9F5CBF/SHT20" { temperature.LocationDesc = "广东省珠海市金湾区三灶镇百川路1号1栋1单元1508房" } if topic == "WifiSHT/4CEBD686B6AA/SHT20" { temperature.LocationDesc = "广西壮族自治区崇左市天等县天等镇荣华村弄在屯113号" } fmt.Println("测温传感器地点:", temperature.LocationDesc) models.SaveTemperature(temperature) } //uuid := uuid.New() //models.SaveProduct() //models.SaveUser(uuid.String()) //models.SaveOrder() //var users []models.User //var orders []models.Order //var user models.User //mydb.DB.Where("name = ?", "wenfei").First(&user) //fmt.Println(user) // 查询多条记录 //mydb.DB.Find(&users) // 查询所有产品信息 //mydb.DB.Order("id desc").Limit(2).Offset(0).Find(&users) // 按价格降序排序,取前 10 条记录 //fmt.Println("users:", users) // 查询多条记录 //mydb.DB.Find(&users) // 查询所有产品信息 //mydb.DB.Where("id > ? and id < ?", 60,65).Find(&orders) //fmt.Println("orders:", orders) // 原生 SQL 查询 //var products []models.Product //mydb.DB.Raw("SELECT * FROM product WHERE price > ?", 1000).Scan(&products) // 查询价格大于 1000 的产品信息 //fmt.Println("Products:", products) } var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) { fmt.Println("Connected") subTemperature(client) subAppPush(client) subAppPushDyw(client) } var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) { fmt.Printf("Connect lost: %v", err) } func sub(client mqtt.Client) { topic := "gomqtt01" token := client.Subscribe(topic, 1, nil) token.Wait() fmt.Printf("Subscribed to topic: %s", topic) } func subTemperature(client mqtt.Client) { topic := "WifiSHT/+/SHT20" token := client.Subscribe(topic, 2, nil) token.Wait() fmt.Printf("Subscribed to topic: %s", topic) } func subAppPush(client mqtt.Client) { topic := "app_push" token := client.Subscribe(topic, 2, nil) token.Wait() fmt.Printf("Subscribed to topic: %s", topic) } func subAppPushDyw(client mqtt.Client) { topic := "app_push_dyw" token := client.Subscribe(topic, 2, nil) token.Wait() fmt.Printf("Subscribed to topic: %s", topic) } func publish(client mqtt.Client) { num := 999999000000000000 for i := 0; i < num; i++ { text := fmt.Sprintf("Message %d", i) token := client.Publish("gomqtt01", 0, false, text) token.Wait() fmt.Println(i) time.Sleep(6 * time.Second) } } func main() { uuid := uuid.New() fmt.Println("Generated UUID:", uuid) var broker = "47.242.184.139" var port = 1883 opts := mqtt.NewClientOptions() opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port)) opts.SetClientID(uuid.String()) opts.SetUsername("admin") opts.SetPassword("publish452131wW452131wW$") opts.SetDefaultPublishHandler(messagePubHandler) opts.OnConnect = connectHandler opts.OnConnectionLost = connectLostHandler client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { fmt.Println(token.Error()) } //client.IsConnectionOpen() go timing(client) //sub(client) subTemperature(client) subAppPush(client) subAppPushDyw(client) publish(client) //client.Disconnect(1000) }