You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

702 lines
21 KiB
Go

9 months ago
package main
import (
8 months ago
"bytes"
"crypto/rand"
"crypto/rsa"
"crypto/x509"
9 months ago
"database/sql"
8 months ago
"encoding/base64"
"encoding/json"
"encoding/pem"
9 months ago
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
_ "github.com/go-sql-driver/mysql"
"github.com/google/uuid"
"go_mqtt/models"
_ "go_mqtt/models"
"go_mqtt/mydb"
_ "go_mqtt/mydb"
8 months ago
"io/ioutil"
9 months ago
"log"
8 months ago
"os"
9 months ago
"strings"
"time"
)
// getMySqlDB opens a connection pool to the appserver MySQL database,
// configures the pool limits and verifies connectivity with a ping.
//
// It returns the ready-to-use pool, or a nil pool and the error when the
// driver cannot be opened or the server cannot be reached. The caller owns the
// returned *sql.DB and is responsible for closing it.
//
// NOTE(review): the DSN embeds the database host and root credentials in
// source; they should be rotated and loaded from configuration instead.
func getMySqlDB() (*sql.DB, error) {
	dsn := "root:Skyinno251,@tcp(47.242.184.139:3306)/appserver?charset=utf8mb4&parseTime=True&loc=Local"
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		// Report the failure to the caller. The previous log.Fatal ended the
		// whole process, so the error return below it could never run.
		log.Println(err)
		return nil, err
	}
	db.SetConnMaxLifetime(time.Minute * 5) // maximum lifetime of a connection
	db.SetMaxOpenConns(50)                 // maximum number of open connections
	db.SetMaxIdleConns(10)                 // maximum number of idle connections

	// Ping checks that a connection can actually be established.
	if err = db.Ping(); err != nil {
		log.Println("47.242.184.139:3306连接失败")
		fmt.Println("Connection failed:", err)
		// Release the pool so a failed attempt does not leak it.
		db.Close()
		return nil, err
	}

	log.Println("47.242.184.139:3306连接成功")
	fmt.Println("Connected successfully")
	return db, nil
}
// moreDBConnect demonstrates opening a named database handle through the
// project's mydb package. It connects to the "appserver" database, prints the
// handle's name and calls Close on the handle's Db field before returning.
// A connection error is printed and the function returns.
func moreDBConnect() {
	appDB, connErr := mydb.NewDatabase("appserver", "root", "Skyinno251,", "47.242.184.139", 3306)
	if connErr != nil {
		fmt.Println("Error connecting to db1:", connErr)
		return
	}
	defer appDB.Db.Close()

	fmt.Println("Connected to databases:", appDB.Name)

	// A second handle could be opened the same way, for example
	//   db2, err := mydb.NewDatabase("db2", "user2", "password2", "localhost", 3306)
	// followed by a matching deferred Close.
}
func queryOneDataByMySql(db *sql.DB) {
//DB.QueryRow 用于查询单行数据,返回的是一个单独的 Row 对象。和 Query 不同,它只返回一行结果,适合用于查询根据条件唯一的数据:
var temperature models.Temperature
err := db.QueryRow("SELECT id,create_date,data_date,data_hour,data_minute,humidity,location_desc,temperature,topic FROM temperature order by id desc limit 0,?", 1).Scan(&temperature.Id, &temperature.CreateDate, &temperature.DataDate, &temperature.DataHour, &temperature.DataMinute, &temperature.Humidity, &temperature.LocationDesc, &temperature.Temperature, &temperature.Topic)
9 months ago
if err != nil {
fmt.Println("QueryRow failed:", err)
return
}
fmt.Printf("Temperature Id:%d\n", temperature.Id)
fmt.Printf("Temperature: %+v\n", temperature)
}
func queryMoreDataByMySql(db *sql.DB) {
//DB.QueryRow 用于查询单行数据,返回的是一个单独的 Row 对象。和 Query 不同,它只返回一行结果,适合用于查询根据条件唯一的数据:
rows, err := db.Query("SELECT id,create_date,data_date,data_hour,data_minute,humidity,location_desc,temperature,topic FROM temperature WHERE id > ? and id < ?", 0, 30)
9 months ago
if err != nil {
fmt.Println("Query failed:", err)
return
}
defer rows.Close() // 确保在使用完毕后关闭资源
9 months ago
items := make([]models.Temperature, 0)
for rows.Next() {
var temperature models.Temperature
if err := rows.Scan(&temperature.Id, &temperature.CreateDate, &temperature.DataDate, &temperature.DataHour, &temperature.DataMinute, &temperature.Humidity, &temperature.LocationDesc, &temperature.Temperature, &temperature.Topic); err != nil {
fmt.Println("Scan failed:", err)
return
}
items = append(items, temperature)
fmt.Println(items)
}
}
// 插入单条数据
func insertDataByMySql(db *sql.DB, temperature *models.Temperature) {
9 months ago
//插入操作:
stmt, err := db.Prepare("INSERT INTO temperature(create_date,data_date,data_hour,data_minute,humidity,location_desc,temperature,topic) VALUES(?, ?,?, ?,?, ?,?, ?)")
if err != nil {
fmt.Println("Prepare failed:", err)
return
}
res, err := stmt.Exec(&temperature.CreateDate, &temperature.DataDate, &temperature.DataHour, &temperature.DataMinute, &temperature.Humidity, &temperature.LocationDesc, &temperature.Temperature, &temperature.Topic)
if err != nil {
fmt.Println("Exec failed:", err)
return
}
lastID, _ := res.LastInsertId()
fmt.Printf("Inserted Temperature with ID: %d\n", lastID)
}
// 事务处理:保证数据一致性
func insertMoreDataByMySql(db *sql.DB, temperature *models.Temperature) {
9 months ago
tx, err := db.Begin()
if err != nil {
fmt.Println("Begin transaction failed:", err)
return
}
_, err = tx.Exec("UPDATE users SET age = ? WHERE id = ?", 35, 1)
if err != nil {
tx.Rollback() // 如果发生错误,回滚事务
9 months ago
fmt.Println("Exec failed:", err)
return
}
err = tx.Commit() // 提交事务
9 months ago
if err != nil {
fmt.Println("Commit failed:", err)
return
}
fmt.Println("Transaction committed successfully")
}
// dbTest is a self-contained database/sql walkthrough against a local "test"
// database: it opens and pings the pool, lists the users table, inserts one
// user and deletes the user with id 1.
//
// Every step reports its error. A failure to open, ping, query or scan ends
// the function; a failed insert or delete is printed and the remaining step
// is still attempted.
func dbTest() {
	// Initialise the connection pool.
	db, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/test")
	if err != nil {
		fmt.Println("Error opening DB:", err)
		return
	}
	defer db.Close()

	db.SetConnMaxLifetime(time.Minute * 5)
	db.SetMaxOpenConns(50)
	db.SetMaxIdleConns(10)

	// Verify the connection.
	if err := db.Ping(); err != nil {
		fmt.Println("Connection failed:", err)
		return
	}
	fmt.Println("Connected successfully")

	// Query data. The error must be checked: on failure rows is nil and
	// calling Close or Next on it would panic.
	rows, err := db.Query("SELECT id, name FROM users")
	if err != nil {
		fmt.Println("Query failed:", err)
		return
	}
	defer rows.Close()
	for rows.Next() {
		var id int
		var name string
		if err := rows.Scan(&id, &name); err != nil {
			fmt.Println("Scan failed:", err)
			return
		}
		fmt.Printf("ID: %d, Name: %s\n", id, name)
	}
	if err := rows.Err(); err != nil {
		fmt.Println("Rows iteration failed:", err)
		return
	}

	// Insert data. DB.Exec needs no explicit statement, so nothing is left
	// unclosed.
	if _, err := db.Exec("INSERT INTO users(name, age) VALUES(?, ?)", "Jane Doe", 28); err != nil {
		fmt.Println("Insert failed:", err)
	}

	// Delete data.
	if _, err := db.Exec("DELETE FROM users WHERE id = ?", 1); err != nil {
		fmt.Println("Delete failed:", err)
	}
}
8 months ago
// GenRsaKey generates an RSA key pair of the given bit size and returns both
// halves PEM-encoded.
//
// The private key is PKCS#1 DER in an "RSA PRIVATE KEY" block. The public key
// is PKIX (SubjectPublicKeyInfo) DER and is therefore labelled "PUBLIC KEY";
// the "RSA PUBLIC KEY" label denotes a PKCS#1 public key and does not match
// PKIX content.
//
// On failure both slices are nil and err is non-nil.
func GenRsaKey(bits int) (prvkey, pubkey []byte, err error) {
	// Generate the private key.
	privateKey, err := rsa.GenerateKey(rand.Reader, bits)
	if err != nil {
		return nil, nil, err
	}

	// Derive the public key encoding first, so no partial result is returned
	// if it fails.
	derPkix, err := x509.MarshalPKIXPublicKey(&privateKey.PublicKey)
	if err != nil {
		return nil, nil, err
	}

	prvkey = pem.EncodeToMemory(&pem.Block{
		Type:  "RSA PRIVATE KEY",
		Bytes: x509.MarshalPKCS1PrivateKey(privateKey),
	})
	pubkey = pem.EncodeToMemory(&pem.Block{
		Type:  "PUBLIC KEY",
		Bytes: derPkix,
	})
	return prvkey, pubkey, nil
}
// ParseRSAPrivateKeyFromPEM decodes the first PEM block found in pemData and
// parses its bytes as a PKCS#1 RSA private key.
//
// It returns an error when pemData contains no PEM block, or when the block
// does not hold a valid PKCS#1 private key.
func ParseRSAPrivateKeyFromPEM(pemData string) (*rsa.PrivateKey, error) {
	pemBlock, _ := pem.Decode([]byte(pemData))
	if pemBlock == nil {
		return nil, fmt.Errorf("private key error not block in file")
	}

	key, parseErr := x509.ParsePKCS1PrivateKey(pemBlock.Bytes)
	if parseErr != nil {
		return nil, parseErr
	}
	return key, nil
}
8 months ago
// parsePrivateKey reads the file at privateKeyFile, decodes the first PEM
// block in it and parses that block as a PKCS#1 RSA private key.
//
// It returns an error when the file cannot be read, when it contains no PEM
// block, or when the block is not a valid PKCS#1 private key.
func parsePrivateKey(privateKeyFile string) (*rsa.PrivateKey, error) {
	raw, readErr := ioutil.ReadFile(privateKeyFile)
	if readErr != nil {
		return nil, readErr
	}

	pemBlock, _ := pem.Decode(raw)
	if pemBlock == nil {
		return nil, fmt.Errorf("private key error not block in file")
	}

	key, parseErr := x509.ParsePKCS1PrivateKey(pemBlock.Bytes)
	if parseErr != nil {
		return nil, parseErr
	}
	return key, nil
}
// GenerateRSAKey generates an RSA key pair of the given bit size and writes it
// to the current working directory: the private key as PKCS#8 DER in
// "private.pem" ("PRIVATE KEY" block) and the public key as PKIX DER in
// "public.pem" ("PUBLIC KEY" block).
//
// The function has no return value, so every failure is printed and ends the
// function. It no longer terminates the process on a marshalling error.
func GenerateRSAKey(bits int) {
	// rand.Reader is the shared cryptographically secure random source.
	privateKey, err := rsa.GenerateKey(rand.Reader, bits)
	if err != nil {
		fmt.Println("generate RSA key failed:", err)
		return
	}

	// Serialise the private key as PKCS#8 (not PKCS#1) DER.
	pkcs8, err := x509.MarshalPKCS8PrivateKey(privateKey)
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	// The private key file is created readable by its owner only.
	if err := writePEMFile("private.pem", "PRIVATE KEY", pkcs8, 0600); err != nil {
		fmt.Println("write private.pem failed:", err)
		return
	}

	// Serialise the public key as PKIX DER.
	pkix, err := x509.MarshalPKIXPublicKey(&privateKey.PublicKey)
	if err != nil {
		fmt.Println("marshal public key failed:", err)
		return
	}
	if err := writePEMFile("public.pem", "PUBLIC KEY", pkix, 0644); err != nil {
		fmt.Println("write public.pem failed:", err)
	}
}

// writePEMFile creates or truncates name, writes der into it as a single PEM
// block of the given type and closes the file, returning the first error.
func writePEMFile(name, blockType string, der []byte, perm os.FileMode) error {
	f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
	if err != nil {
		return err
	}
	if err := pem.Encode(f, &pem.Block{Type: blockType, Bytes: der}); err != nil {
		f.Close()
		return err
	}
	// Close can report a deferred write error, so its result is returned.
	return f.Close()
}
// RSA_Decrypts decrypts cipherText with the PKCS#8 RSA private key stored in
// the PEM file at path, using PKCS#1 v1.5 padding.
//
// Segmented decryption is supported: cipherText is processed in consecutive
// chunks of the key size in bytes and the decrypted chunks are concatenated.
//
// It returns nil when the key file cannot be read, contains no PEM block, does
// not hold a PKCS#8 RSA private key, or when any chunk fails to decrypt. Key
// problems are printed and no longer terminate the process.
func RSA_Decrypts(cipherText []byte, path string) []byte {
	// Read the private key file.
	keyData, err := ioutil.ReadFile(path)
	if err != nil {
		return nil
	}

	// PEM decode. Decode yields a nil block when no PEM data is found.
	block, _ := pem.Decode(keyData)
	if block == nil {
		fmt.Println("private key error not block in file")
		return nil
	}

	// X.509 decode.
	privateKey, err := x509.ParsePKCS8PrivateKey(block.Bytes)
	if err != nil {
		fmt.Println(err.Error())
		return nil
	}
	// PKCS#8 can carry other key types; only RSA is usable here.
	p, ok := privateKey.(*rsa.PrivateKey)
	if !ok {
		fmt.Println("private key is not an RSA key")
		return nil
	}

	keySize := p.Size()
	srcSize := len(cipherText)
	log.Println("密钥长度", keySize, "密文长度", srcSize)

	var buffer bytes.Buffer
	for offSet := 0; offSet < srcSize; {
		endIndex := offSet + keySize
		if endIndex > srcSize {
			endIndex = srcSize
		}
		bytesOnce, err := rsa.DecryptPKCS1v15(rand.Reader, p, cipherText[offSet:endIndex])
		if err != nil {
			return nil
		}
		buffer.Write(bytesOnce)
		offSet = endIndex
	}
	return buffer.Bytes()
}
// RSA_DecryptsOne decrypts cipherText with the PKCS#8 RSA private key given
// as PEM text in privateKeyStr, using PKCS#1 v1.5 padding.
//
// Segmented decryption is supported: cipherText is processed in consecutive
// chunks of the key size in bytes and the decrypted chunks are concatenated.
//
// It returns nil when privateKeyStr contains no PEM block, does not hold a
// PKCS#8 RSA private key, or when any chunk fails to decrypt. Key problems
// are printed and no longer terminate the process.
func RSA_DecryptsOne(cipherText []byte, privateKeyStr string) []byte {
	// PEM decode. Decode yields a nil block when no PEM data is found.
	block, _ := pem.Decode([]byte(privateKeyStr))
	if block == nil {
		fmt.Println("private key error not block in file")
		return nil
	}

	// X.509 decode.
	privateKey, err := x509.ParsePKCS8PrivateKey(block.Bytes)
	if err != nil {
		fmt.Println(err.Error())
		return nil
	}
	// PKCS#8 can carry other key types; only RSA is usable here.
	p, ok := privateKey.(*rsa.PrivateKey)
	if !ok {
		fmt.Println("private key is not an RSA key")
		return nil
	}

	keySize := p.Size()
	srcSize := len(cipherText)
	log.Println("密钥长度", keySize, "密文长度", srcSize)

	var buffer bytes.Buffer
	for offSet := 0; offSet < srcSize; {
		endIndex := offSet + keySize
		if endIndex > srcSize {
			endIndex = srcSize
		}
		bytesOnce, err := rsa.DecryptPKCS1v15(rand.Reader, p, cipherText[offSet:endIndex])
		if err != nil {
			return nil
		}
		buffer.Write(bytesOnce)
		offSet = endIndex
	}
	return buffer.Bytes()
}
8 months ago
// RsaEncryptBlock encrypts src with the PKIX RSA public key stored in the PEM
// file at path, using PKCS#1 v1.5 padding.
//
// Segmented encryption is supported: src is split into chunks of at most
// (key size in bytes - 11) and the encrypted chunks are concatenated.
//
// It returns an error when the key file cannot be read, contains no PEM block,
// does not hold a PKIX RSA public key, or when a chunk fails to encrypt.
func RsaEncryptBlock(src []byte, path string) (bytesEncrypt []byte, err error) {
	// Read the public key file.
	keyData, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, err
	}

	// PEM decode. Decode yields a nil block when no PEM data is found.
	block, _ := pem.Decode(keyData)
	if block == nil {
		return nil, fmt.Errorf("public key error not block in file")
	}

	// X.509 decode. The error is returned instead of printed, so execution
	// never reaches the type assertion with a nil value.
	publicKeyInterface, err := x509.ParsePKIXPublicKey(block.Bytes)
	if err != nil {
		return nil, err
	}
	publicKey, ok := publicKeyInterface.(*rsa.PublicKey)
	if !ok {
		return nil, fmt.Errorf("public key is not an RSA key")
	}

	keySize, srcSize := publicKey.Size(), len(src)
	log.Println("密钥长度", keySize, "明文长度", srcSize)

	// Each chunk carries at most keySize-11 bytes of plaintext.
	once := keySize - 11
	var buffer bytes.Buffer
	for offSet := 0; offSet < srcSize; {
		endIndex := offSet + once
		if endIndex > srcSize {
			endIndex = srcSize
		}
		// Encrypt one chunk.
		bytesOnce, err := rsa.EncryptPKCS1v15(rand.Reader, publicKey, src[offSet:endIndex])
		if err != nil {
			return nil, err
		}
		buffer.Write(bytesOnce)
		offSet = endIndex
	}
	return buffer.Bytes(), nil
}
9 months ago
func timing(client mqtt.Client) {
//定时器10秒钟执行一次
ticker := time.NewTicker(10 * time.Second)
9 months ago
for {
time := <-ticker.C
fmt.Println("定时器====>", time.String())
if !client.IsConnectionOpen() {
client.Connect()
} else {
fmt.Println("client has connect")
//moreDBConnect()
//db, err := getMySqlDB()
//if err == nil {
// queryOneDataByMySql(db)
// //mydb.Close()
//}
8 months ago
//GenerateRSAKey(2048)
//publicPath := "public.pem"
//privatePath := "private.pem"
//
//var a = []byte("jack")
//encrptTxt, err := RsaEncryptBlock(a, publicPath)
//if err != nil {
// fmt.Println(err.Error())
//}
//fmt.Println("加密后的字符串:")
//encodeString := base64.StdEncoding.EncodeToString(encrptTxt)
//fmt.Println(encodeString)
//fmt.Println("-------------------")
//decodeByte, err := base64.StdEncoding.DecodeString(encodeString)
//if err != nil {
// fmt.Println(err.Error())
//}
//decrptCode := RSA_Decrypts(decodeByte, privatePath)
//fmt.Println("解密后的字符串:")
//fmt.Println(string(decrptCode))
9 months ago
}
}
}
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
message := fmt.Sprintf("%s", msg.Payload())
topic := fmt.Sprintf("%s", msg.Topic())
9 months ago
fmt.Println(message)
8 months ago
fmt.Println(topic)
8 months ago
//privatePath := "/Users/edao/GolandProjects/go_mqtt/privateKey.pem"
privateKeyPEM := `-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCYwvInDUICXbmWcOAR5hm86mz7WlKL3dDf12MSeIY2jm5QpUSSoMhrJOWLbOt5fXQepPaNO0M30A+3C4SFZrM/9WA5ehazO1u1m1LAkYzoSDRH3MMJsrJC3lCGDeOROteu+safuP9k/npU1YQu/+Ll2xEJNxyvUx4jLGM4LamiMI6ytM3gdnOAGP4YRQo9Etwo6I986yg/seCQC5rza9M4iBamoin7U8h9yOMKrM6xK/k9CcY/vn5+Uhe3Pvk4qj/2Ff3OXkkc1wfdILqdLwLOKL0Tb3ZciwG0p1CKO80yf5hyYoWjqZk5Rcd07nTo2gqTFGfLl9sqI9/+ipMDtnHlAgMBAAECggEATJd5yCC6lusdMRO5FOBUyUaUi9X2i1AU+RZKAynQySvSnbavUgExW58tRCHBUrGW9gJp59ft1N8J8hHhSO18NDY4H7laBlVdnwmYjRqtFo2VQO6sD4G8JRDION5f2iIxn/b2fYDI9H8vILfJRbNgtTSILyGlzTYUZzhLKxCh+8IsN96Nic8wa5COd1vZZmdhf2y8TG8clFWmozaScNSAATx7y+8XLVWjjWiIRZ6xQvx0uQPUParc9KihXXTKR2pA22yPIdz+U4MGD4kC0eczlcFKZ/dYv9e7OIGgnJfT0idSCu7nYb1pxJ1LxD9fS6IScNTF5dSe0OIL98e+XdyoAQKBgQDRep+5cW4iAKrEMH+djmcXAkoMiYtNVtnu0efLE8dP6vjYytQi368X9SdcASbfrQ31eEZmr/xQnlUF8oyHGkI38YS8dpAHzQcrkP3BljbbzB/3gJZaUdghGsDrK0xAJIzzmFKQpeKnGtr23vxUgaGrNsCYvQ0eQ7+5056KXS4r5QKBgQC6r8xtRSaje6L4WIydjWvYywsmRO0Of0aJLMDA/Wt2MWhHfh7ba9oI1cKGN80ap7xB2a9lQLgpv+C53wNtE5SpvjxsikAj96nUMMhGy9ojXrUith6HQhiINETz6Shnznd+AyrXP6KI/RpfA5nkDB5nrJxODwtYLP467IL7Cv7OAQKBgQCl4KxKdH/5fP28jYsAgJsxpSZt9xzQCU5Zxu396ZOSvUaApVyGoQpNtluMh3z48lhzYOKevgzW6gn5w69z7F8zXZT2iAxVoQ1kelP2z7RxKJrHqpNkwhqbXEwX7RlcUZUr8BqxYCqymJl7k+fMIzqaEalBSbLxnEReKi0I8/Bz4QKBgHK4b0ZCtVDHPEmimJ6E9l4dv/c/afF7swu+zaCK2ouiJvOwBCRQbYb6XPR/u/GCXASXUdpF4CX/vIhcDE3uN2/r8FO+zVWM7vbvF1OyF5WesG7pPW9e5ZZlkG3WvLa1wOZV6fCmMSo/ZwI2Q05JSDHrd43cXttLotrw1jiQ9C4BAoGBAKi4SOoOVQ5J5HQCDkBwPbG1AOLHFinzfoDl26GF/8Hy7fmmd1JiRTFldQp/A9VTAABz3sVYmMB92HSIaJhuDMoYJNI2Cf/cZifsv7vUL8cbLn+lPsKsebiuB0m0g4P2qLwLfegfNGEgA7lA5HIz3SELqbdp3iuqJeQl1fsJqD74
-----END PRIVATE KEY-----`
8 months ago
if topic == "app_push" {
fmt.Println("珠海电厂APP收到mqtt消息")
decodeByte, err := base64.StdEncoding.DecodeString(message)
if err != nil {
fmt.Println(err.Error())
}
//decrptCode := RSA_Decrypts(decodeByte, privatePath)
8 months ago
decrptCode := RSA_DecryptsOne(decodeByte, privateKeyPEM)
8 months ago
fmt.Println("解密后的字符串:")
fmt.Println(string(decrptCode))
fmt.Println("-----user--------")
var user models.User
json.Unmarshal(decrptCode, &user)
fmt.Println(user)
if models.GetUser(&user) {
models.UpdateUser(&user)
}else{
models.SaveUser(&user)
}
}else if topic == "app_push_dyw" {
fmt.Println("大亚湾电厂APP收到mqtt消息")
decodeByte, err := base64.StdEncoding.DecodeString(message)
if err != nil {
fmt.Println(err.Error())
}
8 months ago
decrptCode := RSA_DecryptsOne(decodeByte, privateKeyPEM)
8 months ago
fmt.Println("解密后的字符串:")
fmt.Println(string(decrptCode))
8 months ago
fmt.Println("-----userdyw--------")
var user models.Userdyw
json.Unmarshal(decrptCode, &user)
fmt.Println(user)
if models.GetUserdyw(&user) {
models.UpdateUserdyw(&user)
}else{
models.SaveUserdyw(&user)
}
8 months ago
7 months ago
}else if topic == "app_push_yf" {
fmt.Println("云浮电厂APP收到mqtt消息")
decodeByte, err := base64.StdEncoding.DecodeString(message)
if err != nil {
fmt.Println(err.Error())
}
decrptCode := RSA_DecryptsOne(decodeByte, privateKeyPEM)
fmt.Println("解密后的字符串:")
fmt.Println(string(decrptCode))
fmt.Println("-----userdyw--------")
var user models.Useryf
json.Unmarshal(decrptCode, &user)
fmt.Println(user)
if models.GetUseryf(&user) {
models.UpdateUseryf(&user)
}else{
models.SaveUseryf(&user)
}
8 months ago
}else{
result := strings.Split(message, " ")
fmt.Println(result[0])
fmt.Println(result[1])
now := time.Now()
temperature := new(models.Temperature)
temperature.CreateDate = now
temperature.DataDate = now.Format("2006-01-02")
temperature.DataHour = now.Format("2006-01-02 15")
temperature.DataMinute = now.Format("2006-01-02 15:04")
temperature.Humidity = result[0]
temperature.Temperature = result[1]
temperature.Topic = topic
if topic == "WifiSHT/7C87CE9CA4E6/SHT20" {
temperature.LocationDesc = "广东省珠海市高新区唐家湾镇东岸村水风三街28号501"
}
if topic == "WifiSHT/7C87CE9F5CBF/SHT20" {
temperature.LocationDesc = "广东省珠海市金湾区三灶镇百川路1号1栋1单元1508房"
}
if topic == "WifiSHT/4CEBD686B6AA/SHT20" {
temperature.LocationDesc = "广西壮族自治区崇左市天等县天等镇荣华村弄在屯113号"
}
fmt.Println("测温传感器地点:", temperature.LocationDesc)
models.SaveTemperature(temperature)
}
//uuid := uuid.New()
//models.SaveProduct()
//models.SaveUser(uuid.String())
//models.SaveOrder()
//var users []models.User
//var orders []models.Order
//var user models.User
//mydb.DB.Where("name = ?", "wenfei").First(&user)
//fmt.Println(user)
// 查询多条记录
//mydb.DB.Find(&users) // 查询所有产品信息
//mydb.DB.Order("id desc").Limit(2).Offset(0).Find(&users) // 按价格降序排序,取前 10 条记录
//fmt.Println("users:", users)
// 查询多条记录
//mydb.DB.Find(&users) // 查询所有产品信息
//mydb.DB.Where("id > ? and id < ?", 60,65).Find(&orders)
//fmt.Println("orders:", orders)
// 原生 SQL 查询
//var products []models.Product
//mydb.DB.Raw("SELECT * FROM product WHERE price > ?", 1000).Scan(&products) // 查询价格大于 1000 的产品信息
//fmt.Println("Products:", products)
9 months ago
}
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Println("Connected")
8 months ago
subTemperature(client)
8 months ago
subAppPush(client)
subAppPushDyw(client)
9 months ago
}
// connectLostHandler is installed as the client's OnConnectionLost callback in
// main. It prints the error it is given; the client argument is unused.
var connectLostHandler mqtt.ConnectionLostHandler = func(_ mqtt.Client, cause error) {
	fmt.Printf("Connect lost: %v", cause)
}
// sub subscribes client to the "gomqtt01" topic at QoS 1 without a
// per-subscription callback and waits for the request to complete. It prints
// a confirmation on success and the error on failure.
func sub(client mqtt.Client) {
	topic := "gomqtt01"
	token := client.Subscribe(topic, 1, nil)
	token.Wait()
	// Wait only signals completion; Error tells whether it succeeded.
	if err := token.Error(); err != nil {
		fmt.Printf("Subscribe to topic %s failed: %v\n", topic, err)
		return
	}
	fmt.Printf("Subscribed to topic: %s", topic)
}
func subTemperature(client mqtt.Client) {
topic := "WifiSHT/+/SHT20"
token := client.Subscribe(topic, 2, nil)
9 months ago
token.Wait()
fmt.Printf("Subscribed to topic: %s", topic)
}
8 months ago
// subAppPush subscribes client to the "app_push" topic at QoS 2 without a
// per-subscription callback and waits for the request to complete. It prints
// a confirmation on success and the error on failure.
func subAppPush(client mqtt.Client) {
	topic := "app_push"
	token := client.Subscribe(topic, 2, nil)
	token.Wait()
	// Wait only signals completion; Error tells whether it succeeded.
	if err := token.Error(); err != nil {
		fmt.Printf("Subscribe to topic %s failed: %v\n", topic, err)
		return
	}
	fmt.Printf("Subscribed to topic: %s", topic)
}
// subAppPushDyw subscribes client to the "app_push_dyw" topic at QoS 2 without
// a per-subscription callback and waits for the request to complete. It
// prints a confirmation on success and the error on failure.
func subAppPushDyw(client mqtt.Client) {
	topic := "app_push_dyw"
	token := client.Subscribe(topic, 2, nil)
	token.Wait()
	// Wait only signals completion; Error tells whether it succeeded.
	if err := token.Error(); err != nil {
		fmt.Printf("Subscribe to topic %s failed: %v\n", topic, err)
		return
	}
	fmt.Printf("Subscribed to topic: %s", topic)
}
9 months ago
// publish sends "Message <i>" to the "gomqtt01" topic at QoS 0, not retained,
// once every 6 seconds, printing the counter after each attempt. A failed
// publish is printed and the loop carries on. The iteration bound is so large
// that in practice the function does not return.
func publish(client mqtt.Client) {
	// Typed as int64 so the constant also compiles where int is 32 bits wide.
	const num int64 = 999999000000000000
	for i := int64(0); i < num; i++ {
		text := fmt.Sprintf("Message %d", i)
		token := client.Publish("gomqtt01", 0, false, text)
		token.Wait()
		if err := token.Error(); err != nil {
			fmt.Println("Publish failed:", err)
		}
		fmt.Println(i)
		time.Sleep(6 * time.Second)
	}
}
func main() {
uuid := uuid.New()
fmt.Println("Generated UUID:", uuid)
var broker = "47.242.184.139"
var port = 1883
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
opts.SetClientID(uuid.String())
opts.SetUsername("admin")
opts.SetPassword("publish452131wW452131wW$")
opts.SetDefaultPublishHandler(messagePubHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
}
//client.IsConnectionOpen()
go timing(client)
//sub(client)
subTemperature(client)
8 months ago
subAppPush(client)
subAppPushDyw(client)
9 months ago
publish(client)
//client.Disconnect(1000)
}