package main
import (
"bytes"
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"database/sql"
"encoding/base64"
"encoding/json"
"encoding/pem"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
_ "github.com/go-sql-driver/mysql"
"github.com/google/uuid"
"go_mqtt/models"
_ "go_mqtt/models"
"go_mqtt/mydb"
_ "go_mqtt/mydb"
"io/ioutil"
"log"
"os"
"strings"
"time"
)
// getMySqlDB opens a connection pool to the application's MySQL server and
// verifies it with a ping.
//
// On success it returns the ready-to-use pool; the caller owns it and is
// responsible for closing it. On failure it returns a nil pool together with
// the error from sql.Open or Ping.
//
// NOTE(review): the DSN below embeds the root password in source; it should be
// moved to configuration or the environment.
func getMySqlDB() (*sql.DB, error) {
	dsn := "root:Skyinno251,@tcp(47.242.184.139:3306)/appserver?charset=utf8mb4&parseTime=True&loc=Local"
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		// Report the failure to the caller. The previous log.Fatal terminated
		// the whole process here, which made the error return unreachable.
		log.Println(err)
		return nil, err
	}
	db.SetConnMaxLifetime(time.Minute * 5) // maximum lifetime of a connection
	db.SetMaxOpenConns(50)                 // maximum number of open connections
	db.SetMaxIdleConns(10)                 // maximum number of idle connections

	// Ping checks that a connection can actually be established.
	if err := db.Ping(); err != nil {
		log.Println("47.242.184.139:3306连接失败")
		fmt.Println("Connection failed:", err)
		// The pool is not handed to the caller on failure, so release it here
		// instead of leaking it.
		if closeErr := db.Close(); closeErr != nil {
			log.Println(closeErr)
		}
		return nil, err
	}

	log.Println("47.242.184.139:3306连接成功")
	fmt.Println("Connected successfully")
	return db, nil
}
// moreDBConnect connects to the "appserver" database through
// mydb.NewDatabase, prints the name of the connected database and releases
// the handle via Db.Close before returning. A connection error is printed and
// the function returns without doing anything else.
//
// The commented-out second connection ("db2") that used to live here showed
// how further databases could be opened the same way.
//
// NOTE(review): host and root credentials are hard-coded in the call below.
func moreDBConnect() {
	appDB, connErr := mydb.NewDatabase("appserver", "root", "Skyinno251,", "47.242.184.139", 3306)
	if connErr == nil {
		defer appDB.Db.Close()
		fmt.Println("Connected to databases:", appDB.Name)
		return
	}
	fmt.Println("Error connecting to db1:", connErr)
}
// queryOneDataByMySql reads the row with the highest id from the temperature
// table using QueryRow (which yields at most one row) and prints it.
// A query or scan error is printed and the function returns early.
func queryOneDataByMySql(db *sql.DB) {
	const latestRowSQL = "SELECT id,create_date,data_date,data_hour,data_minute,humidity,location_desc,temperature,topic FROM temperature order by id desc limit 0,?"

	var latest models.Temperature
	row := db.QueryRow(latestRowSQL, 1)
	scanErr := row.Scan(
		&latest.Id,
		&latest.CreateDate,
		&latest.DataDate,
		&latest.DataHour,
		&latest.DataMinute,
		&latest.Humidity,
		&latest.LocationDesc,
		&latest.Temperature,
		&latest.Topic,
	)
	if scanErr != nil {
		fmt.Println("QueryRow failed:", scanErr)
		return
	}

	fmt.Printf("Temperature Id:%d\n", latest.Id)
	fmt.Printf("Temperature: %+v\n", latest)
}
// queryMoreDataByMySql reads every temperature row whose id lies strictly
// between 0 and 30 using Query (multi-row result) and prints the collected
// rows once after iteration has finished.
//
// Any query, scan or iteration error is printed and nothing else is output.
func queryMoreDataByMySql(db *sql.DB) {
	rows, err := db.Query("SELECT id,create_date,data_date,data_hour,data_minute,humidity,location_desc,temperature,topic FROM temperature WHERE id > ? and id < ?", 0, 30)
	if err != nil {
		fmt.Println("Query failed:", err)
		return
	}
	defer rows.Close() // always release the result set

	items := make([]models.Temperature, 0)
	for rows.Next() {
		var temperature models.Temperature
		if err := rows.Scan(&temperature.Id, &temperature.CreateDate, &temperature.DataDate, &temperature.DataHour, &temperature.DataMinute, &temperature.Humidity, &temperature.LocationDesc, &temperature.Temperature, &temperature.Topic); err != nil {
			fmt.Println("Scan failed:", err)
			return
		}
		items = append(items, temperature)
	}
	// Next returns false both at end-of-data and on error; Err tells the two
	// apart so a broken connection is not mistaken for a short result.
	if err := rows.Err(); err != nil {
		fmt.Println("Rows iteration failed:", err)
		return
	}

	// Print the result once; printing inside the loop dumped the growing
	// slice again for every row.
	fmt.Println(items)
}
// insertDataByMySql inserts a single row into the temperature table from the
// fields of temperature and prints the id reported for the new row.
//
// Errors (including a nil temperature) are printed and the function returns
// without inserting.
func insertDataByMySql(db *sql.DB, temperature *models.Temperature) {
	if temperature == nil {
		fmt.Println("Exec failed:", "temperature is nil")
		return
	}

	stmt, err := db.Prepare("INSERT INTO temperature(create_date,data_date,data_hour,data_minute,humidity,location_desc,temperature,topic) VALUES(?, ?,?, ?,?, ?,?, ?)")
	if err != nil {
		fmt.Println("Prepare failed:", err)
		return
	}
	// A prepared statement holds resources until it is closed.
	defer stmt.Close()

	res, err := stmt.Exec(&temperature.CreateDate, &temperature.DataDate, &temperature.DataHour, &temperature.DataMinute, &temperature.Humidity, &temperature.LocationDesc, &temperature.Temperature, &temperature.Topic)
	if err != nil {
		fmt.Println("Exec failed:", err)
		return
	}

	lastID, err := res.LastInsertId()
	if err != nil {
		fmt.Println("LastInsertId failed:", err)
		return
	}
	fmt.Printf("Inserted Temperature with ID: %d\n", lastID)
}
// insertMoreDataByMySql demonstrates transaction handling: it begins a
// transaction, runs a fixed UPDATE on the users table (age 35 for id 1) and
// commits. If the UPDATE fails the transaction is rolled back. Each failure is
// printed and ends the function.
//
// The temperature argument is not read by the current implementation.
func insertMoreDataByMySql(db *sql.DB, temperature *models.Temperature) {
	tx, beginErr := db.Begin()
	if beginErr != nil {
		fmt.Println("Begin transaction failed:", beginErr)
		return
	}

	if _, execErr := tx.Exec("UPDATE users SET age = ? WHERE id = ?", 35, 1); execErr != nil {
		// Undo whatever the transaction did before reporting the failure.
		tx.Rollback()
		fmt.Println("Exec failed:", execErr)
		return
	}

	if commitErr := tx.Commit(); commitErr != nil {
		fmt.Println("Commit failed:", commitErr)
		return
	}
	fmt.Println("Transaction committed successfully")
}
// dbTest is a self-contained smoke test against a local MySQL "test" database:
// it opens and pings the pool, lists id/name of all users, inserts one user
// and deletes the user with id 1.
//
// Every step's error is printed and stops the run, so a failed step can no
// longer lead to a nil rows/statement being dereferenced.
func dbTest() {
	// Set up the connection pool.
	db, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/test")
	if err != nil {
		fmt.Println("Error opening DB:", err)
		return
	}
	defer db.Close()
	db.SetConnMaxLifetime(time.Minute * 5)
	db.SetMaxOpenConns(50)
	db.SetMaxIdleConns(10)

	// Verify the server is reachable.
	if err := db.Ping(); err != nil {
		fmt.Println("Connection failed:", err)
		return
	}
	fmt.Println("Connected successfully")

	// Query.
	rows, err := db.Query("SELECT id, name FROM users")
	if err != nil {
		fmt.Println("Query failed:", err)
		return
	}
	defer rows.Close()
	for rows.Next() {
		var id int
		var name string
		if err := rows.Scan(&id, &name); err != nil {
			fmt.Println("Scan failed:", err)
			return
		}
		fmt.Printf("ID: %d, Name: %s\n", id, name)
	}
	if err := rows.Err(); err != nil {
		fmt.Println("Rows iteration failed:", err)
		return
	}

	// Insert. DB.Exec is used for these one-shot statements so that no
	// prepared statement is left open.
	if _, err := db.Exec("INSERT INTO users(name, age) VALUES(?, ?)", "Jane Doe", 28); err != nil {
		fmt.Println("Exec failed:", err)
		return
	}

	// Delete.
	if _, err := db.Exec("DELETE FROM users WHERE id = ?", 1); err != nil {
		fmt.Println("Exec failed:", err)
		return
	}
}
// GenRsaKey generates an RSA key pair of the given bit size and returns both
// halves PEM-encoded.
//
// prvkey is the PKCS#1 DER encoding in an "RSA PRIVATE KEY" block. pubkey is
// the PKIX DER encoding (as produced by x509.MarshalPKIXPublicKey) in a
// "PUBLIC KEY" block; the label now matches the encoding, whereas the former
// "RSA PUBLIC KEY" label announced PKCS#1 content the block did not contain.
//
// On error both byte slices are nil.
func GenRsaKey(bits int) (prvkey, pubkey []byte, err error) {
	privateKey, err := rsa.GenerateKey(rand.Reader, bits)
	if err != nil {
		return nil, nil, err
	}
	prvkey = pem.EncodeToMemory(&pem.Block{
		Type:  "RSA PRIVATE KEY",
		Bytes: x509.MarshalPKCS1PrivateKey(privateKey),
	})

	// Derive the public half from the private key.
	derPkix, err := x509.MarshalPKIXPublicKey(&privateKey.PublicKey)
	if err != nil {
		return nil, nil, err
	}
	pubkey = pem.EncodeToMemory(&pem.Block{
		Type:  "PUBLIC KEY",
		Bytes: derPkix,
	})
	return prvkey, pubkey, nil
}
// ParseRSAPrivateKeyFromPEM decodes the first PEM block in pemData and parses
// it as an RSA private key.
//
// The DER payload is tried as PKCS#1 first and, if that fails, as PKCS#8, so
// both "RSA PRIVATE KEY" and "PRIVATE KEY" files are accepted. It returns an
// error if no PEM block is found, if neither encoding parses, or if a PKCS#8
// key is not an RSA key.
func ParseRSAPrivateKeyFromPEM(pemData string) (*rsa.PrivateKey, error) {
	block, _ := pem.Decode([]byte(pemData))
	if block == nil {
		return nil, fmt.Errorf("private key error not block in file")
	}

	privateKey, pkcs1Err := x509.ParsePKCS1PrivateKey(block.Bytes)
	if pkcs1Err == nil {
		return privateKey, nil
	}

	// Fall back to PKCS#8, the encoding used by the other key helpers in this
	// file.
	parsed, pkcs8Err := x509.ParsePKCS8PrivateKey(block.Bytes)
	if pkcs8Err != nil {
		return nil, fmt.Errorf("parsing RSA private key: %w (as PKCS#8: %v)", pkcs1Err, pkcs8Err)
	}
	rsaKey, ok := parsed.(*rsa.PrivateKey)
	if !ok {
		return nil, fmt.Errorf("private key is %T, not an RSA key", parsed)
	}
	return rsaKey, nil
}
// parsePrivateKey reads privateKeyFile, decodes its first PEM block and parses
// it as an RSA private key.
//
// The DER payload is tried as PKCS#1 first and, if that fails, as PKCS#8, so
// both "RSA PRIVATE KEY" and "PRIVATE KEY" files are accepted. It returns an
// error if the file cannot be read, contains no PEM block, parses under
// neither encoding, or holds a PKCS#8 key that is not RSA.
func parsePrivateKey(privateKeyFile string) (*rsa.PrivateKey, error) {
	keyData, err := ioutil.ReadFile(privateKeyFile)
	if err != nil {
		return nil, err
	}

	block, _ := pem.Decode(keyData)
	if block == nil {
		return nil, fmt.Errorf("private key error not block in file")
	}

	privateKey, pkcs1Err := x509.ParsePKCS1PrivateKey(block.Bytes)
	if pkcs1Err == nil {
		return privateKey, nil
	}

	// Fall back to PKCS#8, the encoding used by the other key helpers in this
	// file.
	parsed, pkcs8Err := x509.ParsePKCS8PrivateKey(block.Bytes)
	if pkcs8Err != nil {
		return nil, fmt.Errorf("parsing RSA private key: %w (as PKCS#8: %v)", pkcs1Err, pkcs8Err)
	}
	rsaKey, ok := parsed.(*rsa.PrivateKey)
	if !ok {
		return nil, fmt.Errorf("private key is %T, not an RSA key", parsed)
	}
	return rsaKey, nil
}
// GenerateRSAKey generates an RSA key pair of the given bit size and stores it
// in the current working directory:
//
//   - private.pem: PKCS#8 DER in a "PRIVATE KEY" PEM block, created with
//     owner-only permissions;
//   - public.pem: PKIX DER in a "PUBLIC KEY" PEM block.
//
// The function has no return value, so any failure is printed and ends the
// run; it no longer terminates the process, and both files are closed after
// writing.
func GenerateRSAKey(bits int) {
	// rand.Reader is the shared cryptographically secure random source.
	privateKey, err := rsa.GenerateKey(rand.Reader, bits)
	if err != nil {
		fmt.Println(err.Error())
		return
	}

	// Private key: PKCS#8 is used here (it differs from PKCS#1).
	pkcs8Private, err := x509.MarshalPKCS8PrivateKey(privateKey)
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	if err := writePEMFile("private.pem", "PRIVATE KEY", pkcs8Private, 0600); err != nil {
		fmt.Println(err.Error())
		return
	}

	// Public key.
	pkixPublic, err := x509.MarshalPKIXPublicKey(&privateKey.PublicKey)
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	if err := writePEMFile("public.pem", "PUBLIC KEY", pkixPublic, 0644); err != nil {
		fmt.Println(err.Error())
	}
}

// writePEMFile creates (or truncates) the file name with permission perm and
// writes der into it as a single PEM block of the given type. The file is
// always closed; a close error is reported if writing itself succeeded.
func writePEMFile(name, blockType string, der []byte, perm os.FileMode) (err error) {
	f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
	if err != nil {
		return err
	}
	defer func() {
		if closeErr := f.Close(); err == nil {
			err = closeErr
		}
	}()
	return pem.Encode(f, &pem.Block{Type: blockType, Bytes: der})
}
// RSA_Decrypts decrypts cipherText with the RSA private key stored in the PEM
// file at path, supporting segmented (multi-block) ciphertext.
//
// The file must hold a PKCS#8 encoded RSA private key. cipherText is processed
// in consecutive chunks of the key size; each chunk is decrypted with PKCS#1
// v1.5 padding and the plaintexts are concatenated.
//
// It returns nil if the key cannot be read or parsed, is not an RSA key, or if
// any chunk fails to decrypt. Key problems are printed; they no longer
// terminate the process or panic.
func RSA_Decrypts(cipherText []byte, path string) []byte {
	keyData, err := ioutil.ReadFile(path)
	if err != nil {
		fmt.Println(err.Error())
		return nil
	}

	// PEM decoding.
	block, _ := pem.Decode(keyData)
	if block == nil {
		fmt.Println("no PEM block found in private key file")
		return nil
	}

	// X.509 (PKCS#8) decoding.
	parsed, err := x509.ParsePKCS8PrivateKey(block.Bytes)
	if err != nil {
		fmt.Println(err.Error())
		return nil
	}
	p, ok := parsed.(*rsa.PrivateKey)
	if !ok {
		fmt.Printf("private key is %T, not an RSA key\n", parsed)
		return nil
	}

	keySize := p.Size()
	srcSize := len(cipherText)
	log.Println("密钥长度", keySize, "密文长度", srcSize)

	var buffer bytes.Buffer
	for offSet := 0; offSet < srcSize; offSet += keySize {
		endIndex := offSet + keySize
		if endIndex > srcSize {
			endIndex = srcSize
		}
		bytesOnce, err := rsa.DecryptPKCS1v15(rand.Reader, p, cipherText[offSet:endIndex])
		if err != nil {
			return nil
		}
		buffer.Write(bytesOnce)
	}
	return buffer.Bytes()
}
// RSA_DecryptsOne decrypts cipherText with the RSA private key given as PEM
// text in privateKeyStr, supporting segmented (multi-block) ciphertext.
//
// privateKeyStr must hold a PKCS#8 encoded RSA private key. cipherText is
// processed in consecutive chunks of the key size; each chunk is decrypted
// with PKCS#1 v1.5 padding and the plaintexts are concatenated.
//
// It returns nil if the key cannot be parsed, is not an RSA key, or if any
// chunk fails to decrypt. Key problems are printed; they no longer terminate
// the process or panic.
func RSA_DecryptsOne(cipherText []byte, privateKeyStr string) []byte {
	// PEM decoding.
	block, _ := pem.Decode([]byte(privateKeyStr))
	if block == nil {
		fmt.Println("no PEM block found in private key")
		return nil
	}

	// X.509 (PKCS#8) decoding.
	parsed, err := x509.ParsePKCS8PrivateKey(block.Bytes)
	if err != nil {
		fmt.Println(err.Error())
		return nil
	}
	p, ok := parsed.(*rsa.PrivateKey)
	if !ok {
		fmt.Printf("private key is %T, not an RSA key\n", parsed)
		return nil
	}

	keySize := p.Size()
	srcSize := len(cipherText)
	log.Println("密钥长度", keySize, "密文长度", srcSize)

	var buffer bytes.Buffer
	for offSet := 0; offSet < srcSize; offSet += keySize {
		endIndex := offSet + keySize
		if endIndex > srcSize {
			endIndex = srcSize
		}
		bytesOnce, err := rsa.DecryptPKCS1v15(rand.Reader, p, cipherText[offSet:endIndex])
		if err != nil {
			return nil
		}
		buffer.Write(bytesOnce)
	}
	return buffer.Bytes()
}
// RsaEncryptBlock encrypts src in segments with the RSA public key stored in
// the PEM file at path.
//
// The file must hold a PKIX encoded RSA public key. src is split into chunks
// of at most keySize-11 bytes (the PKCS#1 v1.5 padding overhead); each chunk
// is encrypted separately and the ciphertexts are concatenated.
//
// It returns an error if the file cannot be read, contains no PEM block, does
// not parse as a PKIX public key, holds a non-RSA key, or if encryption of a
// chunk fails. Malformed key files are reported as errors instead of causing a
// panic.
func RsaEncryptBlock(src []byte, path string) (bytesEncrypt []byte, err error) {
	keyData, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, err
	}

	// PEM decoding.
	block, _ := pem.Decode(keyData)
	if block == nil {
		return nil, fmt.Errorf("no PEM block found in public key file %q", path)
	}

	// X.509 (PKIX) decoding.
	publicKeyInterface, err := x509.ParsePKIXPublicKey(block.Bytes)
	if err != nil {
		return nil, err
	}
	publicKey, ok := publicKeyInterface.(*rsa.PublicKey)
	if !ok {
		return nil, fmt.Errorf("public key in %q is %T, not an RSA key", path, publicKeyInterface)
	}

	keySize, srcSize := publicKey.Size(), len(src)
	log.Println("密钥长度", keySize, "明文长度", srcSize)

	once := keySize - 11
	if once <= 0 {
		// Guards the loop below against a non-advancing offset.
		return nil, fmt.Errorf("RSA key of %d bytes is too small for PKCS#1 v1.5 encryption", keySize)
	}

	var buffer bytes.Buffer
	for offSet := 0; offSet < srcSize; offSet += once {
		endIndex := offSet + once
		if endIndex > srcSize {
			endIndex = srcSize
		}
		// Encrypt one chunk.
		bytesOnce, encErr := rsa.EncryptPKCS1v15(rand.Reader, publicKey, src[offSet:endIndex])
		if encErr != nil {
			return nil, encErr
		}
		buffer.Write(bytesOnce)
	}
	return buffer.Bytes(), nil
}
// timing is a watchdog loop that never returns: every 10 seconds it prints the
// tick time and, if the client's connection is not open, calls Connect on it;
// otherwise it prints that the client is connected.
//
// The connected branch used to host commented-out experiments (database
// queries and an RSA encrypt/decrypt round trip); they were dead code and are
// not reproduced here.
func timing(client mqtt.Client) {
	ticker := time.NewTicker(10 * time.Second)
	for tick := range ticker.C {
		fmt.Println("定时器====>", tick.String())
		if client.IsConnectionOpen() {
			fmt.Println("client has connect")
			continue
		}
		client.Connect()
	}
}
// messagePubHandler handles every incoming MQTT message and dispatches on the
// topic:
//
//   - "app_push": the payload is base64 text holding RSA ciphertext; it is
//     decrypted with the embedded private key, unmarshalled into models.User
//     and then updated (if models.GetUser reports it exists) or saved.
//   - "app_push_dyw": same flow with the private key file at privatePath and
//     models.Userdyw.
//   - anything else: the payload is treated as a sensor reading of the form
//     "<humidity> <temperature>" and stored as a models.Temperature stamped
//     with the current time; known sensor topics get a location description.
//
// A message that cannot be decoded, parsed or split is reported and dropped;
// it is never persisted as an empty record.
//
// NOTE(review): the RSA private key below is embedded in source code. It
// should be loaded from protected configuration, and this key should be
// treated as compromised.
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
	message := string(msg.Payload())
	topic := msg.Topic()
	fmt.Println(message)
	fmt.Println(topic)

	switch topic {
	case "app_push":
		fmt.Println("珠海电厂APP收到mqtt消息")
		decodeByte, err := base64.StdEncoding.DecodeString(message)
		if err != nil {
			fmt.Println(err.Error())
			return
		}
		// The armor lines must read exactly "-----BEGIN/END PRIVATE KEY-----"
		// for pem.Decode to recognise the block.
		privateKeyPEM := `-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCYwvInDUICXbmWcOAR5hm86mz7WlKL3dDf12MSeIY2jm5QpUSSoMhrJOWLbOt5fXQepPaNO0M30A+3C4SFZrM/9WA5ehazO1u1m1LAkYzoSDRH3MMJsrJC3lCGDeOROteu+safuP9k/npU1YQu/+Ll2xEJNxyvUx4jLGM4LamiMI6ytM3gdnOAGP4YRQo9Etwo6I986yg/seCQC5rza9M4iBamoin7U8h9yOMKrM6xK/k9CcY/vn5+Uhe3Pvk4qj/2Ff3OXkkc1wfdILqdLwLOKL0Tb3ZciwG0p1CKO80yf5hyYoWjqZk5Rcd07nTo2gqTFGfLl9sqI9/+ipMDtnHlAgMBAAECggEATJd5yCC6lusdMRO5FOBUyUaUi9X2i1AU+RZKAynQySvSnbavUgExW58tRCHBUrGW9gJp59ft1N8J8hHhSO18NDY4H7laBlVdnwmYjRqtFo2VQO6sD4G8JRDION5f2iIxn/b2fYDI9H8vILfJRbNgtTSILyGlzTYUZzhLKxCh+8IsN96Nic8wa5COd1vZZmdhf2y8TG8clFWmozaScNSAATx7y+8XLVWjjWiIRZ6xQvx0uQPUParc9KihXXTKR2pA22yPIdz+U4MGD4kC0eczlcFKZ/dYv9e7OIGgnJfT0idSCu7nYb1pxJ1LxD9fS6IScNTF5dSe0OIL98e+XdyoAQKBgQDRep+5cW4iAKrEMH+djmcXAkoMiYtNVtnu0efLE8dP6vjYytQi368X9SdcASbfrQ31eEZmr/xQnlUF8oyHGkI38YS8dpAHzQcrkP3BljbbzB/3gJZaUdghGsDrK0xAJIzzmFKQpeKnGtr23vxUgaGrNsCYvQ0eQ7+5056KXS4r5QKBgQC6r8xtRSaje6L4WIydjWvYywsmRO0Of0aJLMDA/Wt2MWhHfh7ba9oI1cKGN80ap7xB2a9lQLgpv+C53wNtE5SpvjxsikAj96nUMMhGy9ojXrUith6HQhiINETz6Shnznd+AyrXP6KI/RpfA5nkDB5nrJxODwtYLP467IL7Cv7OAQKBgQCl4KxKdH/5fP28jYsAgJsxpSZt9xzQCU5Zxu396ZOSvUaApVyGoQpNtluMh3z48lhzYOKevgzW6gn5w69z7F8zXZT2iAxVoQ1kelP2z7RxKJrHqpNkwhqbXEwX7RlcUZUr8BqxYCqymJl7k+fMIzqaEalBSbLxnEReKi0I8/Bz4QKBgHK4b0ZCtVDHPEmimJ6E9l4dv/c/afF7swu+zaCK2ouiJvOwBCRQbYb6XPR/u/GCXASXUdpF4CX/vIhcDE3uN2/r8FO+zVWM7vbvF1OyF5WesG7pPW9e5ZZlkG3WvLa1wOZV6fCmMSo/ZwI2Q05JSDHrd43cXttLotrw1jiQ9C4BAoGBAKi4SOoOVQ5J5HQCDkBwPbG1AOLHFinzfoDl26GF/8Hy7fmmd1JiRTFldQp/A9VTAABz3sVYmMB92HSIaJhuDMoYJNI2Cf/cZifsv7vUL8cbLn+lPsKsebiuB0m0g4P2qLwLfegfNGEgA7lA5HIz3SELqbdp3iuqJeQl1fsJqD74
-----END PRIVATE KEY-----`
		decrptCode := RSA_DecryptsOne(decodeByte, privateKeyPEM)
		fmt.Println("解密后的字符串:")
		fmt.Println(string(decrptCode))
		fmt.Println("-----user--------")
		var user models.User
		if err := json.Unmarshal(decrptCode, &user); err != nil {
			// Covers failed decryption too: a nil plaintext is not valid JSON.
			fmt.Println(err.Error())
			return
		}
		fmt.Println(user)
		if models.GetUser(&user) {
			models.UpdateUser(&user)
		} else {
			models.SaveUser(&user)
		}

	case "app_push_dyw":
		fmt.Println("大亚湾电厂APP收到mqtt消息")
		privatePath := "/Users/edao/GolandProjects/go_mqtt/privateKey.pem"
		decodeByte, err := base64.StdEncoding.DecodeString(message)
		if err != nil {
			fmt.Println(err.Error())
			return
		}
		decrptCode := RSA_Decrypts(decodeByte, privatePath)
		fmt.Println("解密后的字符串:")
		fmt.Println(string(decrptCode))
		fmt.Println("-----userdyw--------")
		var user models.Userdyw
		if err := json.Unmarshal(decrptCode, &user); err != nil {
			// Covers failed decryption too: a nil plaintext is not valid JSON.
			fmt.Println(err.Error())
			return
		}
		fmt.Println(user)
		if models.GetUserdyw(&user) {
			models.UpdateUserdyw(&user)
		} else {
			models.SaveUserdyw(&user)
		}

	default:
		result := strings.Split(message, " ")
		if len(result) < 2 {
			// Without this guard a payload lacking a space would panic on
			// result[1].
			fmt.Println("unexpected sensor payload:", message)
			return
		}
		fmt.Println(result[0])
		fmt.Println(result[1])

		now := time.Now()
		temperature := new(models.Temperature)
		temperature.CreateDate = now
		temperature.DataDate = now.Format("2006-01-02")
		temperature.DataHour = now.Format("2006-01-02 15")
		temperature.DataMinute = now.Format("2006-01-02 15:04")
		temperature.Humidity = result[0]
		temperature.Temperature = result[1]
		temperature.Topic = topic

		// Known sensors get a location description.
		switch topic {
		case "WifiSHT/7C87CE9CA4E6/SHT20":
			temperature.LocationDesc = "广东省珠海市高新区唐家湾镇东岸村水风三街28号501"
		case "WifiSHT/7C87CE9F5CBF/SHT20":
			temperature.LocationDesc = "广东省珠海市金湾区三灶镇百川路1号1栋1单元1508房"
		case "WifiSHT/4CEBD686B6AA/SHT20":
			temperature.LocationDesc = "广西壮族自治区崇左市天等县天等镇荣华村弄在屯113号"
		}
		fmt.Println("测温传感器地点:", temperature.LocationDesc)
		models.SaveTemperature(temperature)
	}
}
// connectHandler announces the connection and then (re)establishes the
// application's subscriptions in a fixed order: temperature sensors,
// "app_push" and "app_push_dyw".
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	fmt.Println("Connected")
	subscribers := []func(mqtt.Client){subTemperature, subAppPush, subAppPushDyw}
	for _, subscribe := range subscribers {
		subscribe(client)
	}
}
// connectLostHandler prints the error that caused the connection to drop.
// The message ends with a newline so that later output does not run onto the
// same line.
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	fmt.Printf("Connect lost: %v\n", err)
}
// sub subscribes client to the "gomqtt01" topic at QoS 1 with no
// topic-specific callback, waits for the operation to finish and prints
// whether it succeeded.
func sub(client mqtt.Client) {
	topic := "gomqtt01"
	token := client.Subscribe(topic, 1, nil)
	// Success is only reported after the token confirms there was no error.
	if token.Wait() && token.Error() != nil {
		fmt.Printf("Subscribe to topic %s failed: %v\n", topic, token.Error())
		return
	}
	fmt.Printf("Subscribed to topic: %s\n", topic)
}
// subTemperature subscribes client to the sensor topic pattern
// "WifiSHT/+/SHT20" at QoS 2 with no topic-specific callback, waits for the
// operation to finish and prints whether it succeeded.
func subTemperature(client mqtt.Client) {
	topic := "WifiSHT/+/SHT20"
	token := client.Subscribe(topic, 2, nil)
	// Success is only reported after the token confirms there was no error.
	if token.Wait() && token.Error() != nil {
		fmt.Printf("Subscribe to topic %s failed: %v\n", topic, token.Error())
		return
	}
	fmt.Printf("Subscribed to topic: %s\n", topic)
}
// subAppPush subscribes client to the "app_push" topic at QoS 2 with no
// topic-specific callback, waits for the operation to finish and prints
// whether it succeeded.
func subAppPush(client mqtt.Client) {
	topic := "app_push"
	token := client.Subscribe(topic, 2, nil)
	// Success is only reported after the token confirms there was no error.
	if token.Wait() && token.Error() != nil {
		fmt.Printf("Subscribe to topic %s failed: %v\n", topic, token.Error())
		return
	}
	fmt.Printf("Subscribed to topic: %s\n", topic)
}
// subAppPushDyw subscribes client to the "app_push_dyw" topic at QoS 2 with no
// topic-specific callback, waits for the operation to finish and prints
// whether it succeeded.
func subAppPushDyw(client mqtt.Client) {
	topic := "app_push_dyw"
	token := client.Subscribe(topic, 2, nil)
	// Success is only reported after the token confirms there was no error.
	if token.Wait() && token.Error() != nil {
		fmt.Printf("Subscribe to topic %s failed: %v\n", topic, token.Error())
		return
	}
	fmt.Printf("Subscribed to topic: %s\n", topic)
}
// publish sends "Message <i>" to the "gomqtt01" topic at QoS 0 (not retained)
// every 6 seconds, printing the counter after each attempt. The bound is so
// large that the loop effectively runs for the lifetime of the process.
//
// A failed publish is printed and the loop carries on with the next message.
func publish(client mqtt.Client) {
	// Typed as int64 so the bound also compiles where int is 32 bits wide.
	const num int64 = 999999000000000000
	for i := int64(0); i < num; i++ {
		text := fmt.Sprintf("Message %d", i)
		token := client.Publish("gomqtt01", 0, false, text)
		if token.Wait() && token.Error() != nil {
			fmt.Println("Publish failed:", token.Error())
		}
		fmt.Println(i)
		time.Sleep(6 * time.Second)
	}
}
// main wires up the MQTT client and runs the application:
//
//  1. generates a UUID and uses it as the client id;
//  2. configures broker address, credentials and the message / connect /
//     connection-lost handlers;
//  3. connects (a connection error is printed, execution continues);
//  4. starts the timing watchdog in a goroutine;
//  5. subscribes to the sensor and app-push topics;
//  6. enters the publish loop, which keeps the process alive.
//
// NOTE(review): broker address, user name and password are hard-coded below;
// they should come from configuration.
func main() {
	clientID := uuid.New()
	fmt.Println("Generated UUID:", clientID)

	const (
		broker = "47.242.184.139"
		port   = 1883
	)

	opts := mqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
	opts.SetClientID(clientID.String())
	opts.SetUsername("admin")
	opts.SetPassword("publish452131wW452131wW$")
	opts.SetDefaultPublishHandler(messagePubHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler

	client := mqtt.NewClient(opts)
	token := client.Connect()
	if token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
	}

	// Background watchdog that retries the connection when it is down.
	go timing(client)

	subTemperature(client)
	subAppPush(client)
	subAppPushDyw(client)

	// Blocks for the lifetime of the process.
	publish(client)
}