You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

308 lines
9.1 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package main
import (
"database/sql"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
_ "github.com/go-sql-driver/mysql"
"github.com/google/uuid"
"go_mqtt/models"
_ "go_mqtt/models"
"go_mqtt/mydb"
_ "go_mqtt/mydb"
"log"
"strings"
"time"
)
// getMySqlDB opens a new MySQL connection pool for the "appserver" database
// and verifies it with a ping.
//
// Every call creates a fresh *sql.DB configured with a 5 minute connection
// lifetime, at most 50 open connections and at most 10 idle connections. The
// caller owns the returned pool and should Close it when finished.
//
// On failure the returned *sql.DB is nil and the error wraps the underlying
// cause. If connecting fails, check the DSN (user, password, host, port,
// database), that the MySQL server is running, and that firewalls allow
// access to the port.
//
// NOTE(review): the DSN, including credentials, is hard-coded here; consider
// loading it from configuration or the environment.
func getMySqlDB() (*sql.DB, error) {
	dsn := "root:Skyinno251,@tcp(47.242.184.139:3306)/appserver?charset=utf8mb4&parseTime=True&loc=Local"

	db, err := sql.Open("mysql", dsn)
	if err != nil {
		// Report and return instead of log.Fatal: a helper must not terminate
		// the whole process, and callers already handle a non-nil error.
		log.Println("opening mysql connection failed:", err)
		return nil, fmt.Errorf("opening mysql connection: %w", err)
	}

	db.SetConnMaxLifetime(time.Minute * 5) // maximum lifetime of a connection
	db.SetMaxOpenConns(50)                 // maximum number of open connections
	db.SetMaxIdleConns(10)                 // maximum number of idle connections

	// sql.Open does not necessarily connect; Ping checks that a connection can
	// actually be established.
	if err := db.Ping(); err != nil {
		log.Println("47.242.184.139:3306连接失败")
		fmt.Println("Connection failed:", err)
		// Release the pool so a failed attempt does not leak resources. The
		// Close error is irrelevant here: the ping error is what is reported.
		_ = db.Close()
		return nil, fmt.Errorf("pinging mysql: %w", err)
	}

	log.Println("47.242.184.139:3306连接成功")
	fmt.Println("Connected successfully")
	return db, nil
}
// moreDBConnect connects to the "appserver" database through
// mydb.NewDatabase, prints the name of the connected database, and closes the
// underlying Db handle when the function returns. A connection error is
// printed and the function returns early.
func moreDBConnect() {
	primary, connErr := mydb.NewDatabase("appserver", "root", "Skyinno251,", "47.242.184.139", 3306)
	if connErr != nil {
		fmt.Println("Error connecting to db1:", connErr)
		return
	}
	defer primary.Db.Close()

	fmt.Println("Connected to databases:", primary.Name)

	// A second database could be opened the same way (kept disabled):
	//
	//	db2, err := mydb.NewDatabase("db2", "user2", "password2", "localhost", 3306)
	//	if err != nil {
	//		fmt.Println("Error connecting to db2:", err)
	//		return
	//	}
	//	defer db2.Conn.Close()
	//
	//	fmt.Println("Connected to databases:", db1.Name, "and", db2.Name)
}
// queryOneDataByMySql reads the row with the highest id from the temperature
// table and prints it.
//
// DB.QueryRow is used because exactly one row is wanted: unlike Query it
// returns a single Row, and any error is reported by Scan. A failure is
// printed and the function returns without output.
func queryOneDataByMySql(db *sql.DB) {
	const latestRowQuery = "SELECT id,create_date,data_date,data_hour,data_minute,humidity,location_desc,temperature,topic FROM temperature order by id desc limit 0,?"

	var latest models.Temperature
	row := db.QueryRow(latestRowQuery, 1)
	scanErr := row.Scan(
		&latest.Id,
		&latest.CreateDate,
		&latest.DataDate,
		&latest.DataHour,
		&latest.DataMinute,
		&latest.Humidity,
		&latest.LocationDesc,
		&latest.Temperature,
		&latest.Topic,
	)
	if scanErr != nil {
		fmt.Println("QueryRow failed:", scanErr)
		return
	}

	fmt.Printf("Temperature Id:%d\n", latest.Id)
	fmt.Printf("Temperature: %+v\n", latest)
}
// queryMoreDataByMySql reads the temperature rows whose id lies strictly
// between 0 and 30 and prints the collected slice once all rows were read.
//
// DB.Query is used because several rows may be returned. Query, scan and
// iteration errors are printed and end the function early.
func queryMoreDataByMySql(db *sql.DB) {
	rows, err := db.Query("SELECT id,create_date,data_date,data_hour,data_minute,humidity,location_desc,temperature,topic FROM temperature WHERE id > ? and id < ?", 0, 30)
	if err != nil {
		fmt.Println("Query failed:", err)
		return
	}
	defer rows.Close() // release the result set when done

	var items []models.Temperature
	for rows.Next() {
		var temperature models.Temperature
		if err := rows.Scan(&temperature.Id, &temperature.CreateDate, &temperature.DataDate, &temperature.DataHour, &temperature.DataMinute, &temperature.Humidity, &temperature.LocationDesc, &temperature.Temperature, &temperature.Topic); err != nil {
			fmt.Println("Scan failed:", err)
			return
		}
		items = append(items, temperature)
	}
	// rows.Next returns false both at the end of the result set and on error;
	// rows.Err tells the two apart.
	if err := rows.Err(); err != nil {
		fmt.Println("Rows iteration failed:", err)
		return
	}

	// Print the result once instead of re-printing the growing slice on every
	// iteration.
	fmt.Println(items)
}
// insertDataByMySql inserts a single row into the temperature table using the
// fields of temperature and prints the id of the inserted row.
//
// A nil temperature is reported and ignored. Errors from the insert or from
// reading the generated id are printed; nothing is returned to the caller.
func insertDataByMySql(db *sql.DB, temperature *models.Temperature) {
	if temperature == nil {
		fmt.Println("Exec failed: nil temperature")
		return
	}

	// db.Exec is used instead of Prepare+Exec: the statement runs only once,
	// and a prepared statement that is never closed leaks.
	res, err := db.Exec(
		"INSERT INTO temperature(create_date,data_date,data_hour,data_minute,humidity,location_desc,temperature,topic) VALUES(?, ?,?, ?,?, ?,?, ?)",
		&temperature.CreateDate, &temperature.DataDate, &temperature.DataHour, &temperature.DataMinute, &temperature.Humidity, &temperature.LocationDesc, &temperature.Temperature, &temperature.Topic,
	)
	if err != nil {
		fmt.Println("Exec failed:", err)
		return
	}

	lastID, err := res.LastInsertId()
	if err != nil {
		fmt.Println("LastInsertId failed:", err)
		return
	}
	fmt.Printf("Inserted Temperature with ID: %d\n", lastID)
}
// insertMoreDataByMySql demonstrates transaction handling, which keeps data
// consistent: it begins a transaction, runs one UPDATE on the users table, and
// commits. If the statement fails the transaction is rolled back.
//
// The temperature parameter is not used by the current body. Every failure is
// printed and ends the function early.
func insertMoreDataByMySql(db *sql.DB, temperature *models.Temperature) {
	txn, beginErr := db.Begin()
	if beginErr != nil {
		fmt.Println("Begin transaction failed:", beginErr)
		return
	}

	if _, execErr := txn.Exec("UPDATE users SET age = ? WHERE id = ?", 35, 1); execErr != nil {
		// Undo whatever the transaction did before reporting the failure.
		txn.Rollback()
		fmt.Println("Exec failed:", execErr)
		return
	}

	if commitErr := txn.Commit(); commitErr != nil {
		fmt.Println("Commit failed:", commitErr)
		return
	}

	fmt.Println("Transaction committed successfully")
}
// dbTest is a self-contained smoke test against a local MySQL "test"
// database: it opens and pings a connection pool, prints every (id, name) row
// of the users table, inserts one user, and deletes the user with id 1.
//
// Each failure is printed. Failures to open, ping or query end the function
// early; the insert and delete are attempted independently of each other.
func dbTest() {
	// Initialise the connection pool.
	db, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/test")
	if err != nil {
		fmt.Println("Error opening DB:", err)
		return
	}
	defer db.Close()

	db.SetConnMaxLifetime(time.Minute * 5)
	db.SetMaxOpenConns(50)
	db.SetMaxIdleConns(10)

	// Verify that a connection can actually be established.
	if err := db.Ping(); err != nil {
		fmt.Println("Connection failed:", err)
		return
	}
	fmt.Println("Connected successfully")

	// Query data. The error must be checked: on failure rows is nil and
	// calling Close or Next on it would panic.
	rows, err := db.Query("SELECT id, name FROM users")
	if err != nil {
		fmt.Println("Query failed:", err)
		return
	}
	defer rows.Close()

	for rows.Next() {
		var id int
		var name string
		if err := rows.Scan(&id, &name); err != nil {
			fmt.Println("Scan failed:", err)
			return
		}
		fmt.Printf("ID: %d, Name: %s\n", id, name)
	}
	if err := rows.Err(); err != nil {
		fmt.Println("Rows iteration failed:", err)
		return
	}

	// Insert data. Exec is used directly so no prepared statement is left
	// open.
	if _, err := db.Exec("INSERT INTO users(name, age) VALUES(?, ?)", "Jane Doe", 28); err != nil {
		fmt.Println("Insert failed:", err)
	}

	// Delete data.
	if _, err := db.Exec("DELETE FROM users WHERE id = ?", 1); err != nil {
		fmt.Println("Delete failed:", err)
	}
}
// timing runs forever, waking up every 5 seconds. On each tick it prints the
// tick time; if the MQTT connection is not open it tries to reconnect,
// otherwise it opens a database pool, prints the newest temperature row and
// closes the pool again.
//
// It is meant to be started in its own goroutine and never returns.
func timing(client mqtt.Client) {
	// The ticker fires every 5 seconds.
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for tick := range ticker.C {
		fmt.Println("定时器====>", tick.String())

		if !client.IsConnectionOpen() {
			// Wait for the attempt to finish so overlapping Connect calls are
			// not issued on the next tick, and report a failed attempt.
			if token := client.Connect(); token.Wait() && token.Error() != nil {
				fmt.Println("Reconnect failed:", token.Error())
			}
			continue
		}

		fmt.Println("client has connect")
		db, err := getMySqlDB()
		if err != nil {
			// getMySqlDB has already reported the failure.
			continue
		}
		queryOneDataByMySql(db)
		// getMySqlDB creates a new pool on every call; close it so pools do
		// not pile up tick after tick.
		if err := db.Close(); err != nil {
			fmt.Println("Closing DB failed:", err)
		}
	}
}
// messagePubHandler is the default MQTT message callback. It expects the
// payload to hold two whitespace-separated values, humidity first and
// temperature second, stores them together with the topic, a location
// description for known sensors and the current time, and inserts the record
// into the database.
//
// Payloads with fewer than two values are reported and dropped instead of
// crashing the process with an index-out-of-range panic.
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())

	message := string(msg.Payload())
	topic := msg.Topic()
	fmt.Println(message)

	// Fields tolerates repeated spaces and a trailing newline in the payload.
	fields := strings.Fields(message)
	if len(fields) < 2 {
		fmt.Printf("Ignoring malformed payload %q from topic: %s\n", message, topic)
		return
	}
	humidity, reading := fields[0], fields[1]
	fmt.Println(humidity)
	fmt.Println(reading)

	now := time.Now()
	temperature := new(models.Temperature)
	temperature.CreateDate = now
	temperature.DataDate = now.Format("2006-01-02")
	temperature.DataHour = now.Format("2006-01-02 15")
	temperature.DataMinute = now.Format("2006-01-02 15:04")
	temperature.Humidity = humidity
	temperature.Temperature = reading
	temperature.Topic = topic

	// Known sensors get a fixed location description; any other topic leaves
	// the field at its zero value.
	switch topic {
	case "WifiSHT/7C87CE9CA4E6/SHT20":
		temperature.LocationDesc = "广东省珠海市高新区唐家湾镇东岸村水风三街28号501"
	case "WifiSHT/7C87CE9F5CBF/SHT20":
		temperature.LocationDesc = "广东省珠海市金湾区三灶镇百川路1号1栋1单元1508房"
	case "WifiSHT/4CEBD686B6AA/SHT20":
		temperature.LocationDesc = "广西壮族自治区崇左市天等县天等镇荣华村弄在屯113号"
	}

	db, err := getMySqlDB()
	if err != nil {
		// getMySqlDB has already reported the failure.
		return
	}
	// getMySqlDB creates a new pool per call; close it so every received
	// message does not leak a pool.
	// NOTE(review): sharing one long-lived pool would avoid reconnecting for
	// each message.
	defer db.Close()

	insertDataByMySql(db, temperature)
}
// connectHandler is the OnConnect callback; it announces on standard output
// that the client has connected. The client argument is not used.
var connectHandler mqtt.OnConnectHandler = func(_ mqtt.Client) {
	fmt.Println("Connected")
}
// connectLostHandler is the connection-lost callback; it prints the error
// that caused the connection to drop. The line is newline-terminated so it
// does not run into the next piece of output.
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	fmt.Printf("Connect lost: %v\n", err)
}
// sub subscribes client to the "gomqtt01" topic with QoS 1 and no dedicated
// callback, waits for the subscription to complete, and prints the outcome.
func sub(client mqtt.Client) {
	const topic = "gomqtt01"

	token := client.Subscribe(topic, 1, nil)
	token.Wait()
	// Only report success when the broker actually accepted the subscription.
	if err := token.Error(); err != nil {
		fmt.Printf("Subscribe to topic %s failed: %v\n", topic, err)
		return
	}
	fmt.Printf("Subscribed to topic: %s\n", topic)
}
// subTemperature subscribes client to the wildcard topic "WifiSHT/+/SHT20"
// with QoS 1 and no dedicated callback, waits for the subscription to
// complete, and prints the outcome.
func subTemperature(client mqtt.Client) {
	const topic = "WifiSHT/+/SHT20"

	token := client.Subscribe(topic, 1, nil)
	token.Wait()
	// Only report success when the broker actually accepted the subscription.
	if err := token.Error(); err != nil {
		fmt.Printf("Subscribe to topic %s failed: %v\n", topic, err)
		return
	}
	fmt.Printf("Subscribed to topic: %s\n", topic)
}
// publish sends "Message <i>" to the "gomqtt01" topic with QoS 0, one message
// every 6 seconds, printing the counter after each attempt. The loop bound is
// so large that the function effectively runs until the process exits.
func publish(client mqtt.Client) {
	// The bound is typed int64 explicitly: as a plain int the constant
	// overflows on 32-bit platforms and the file fails to compile there.
	const total int64 = 999999000000000000

	for i := int64(0); i < total; i++ {
		text := fmt.Sprintf("Message %d", i)
		token := client.Publish("gomqtt01", 0, false, text)
		token.Wait()
		if err := token.Error(); err != nil {
			// Keep going: a later attempt may succeed once the connection is
			// back.
			fmt.Println("Publish failed:", err)
		}
		fmt.Println(i)
		time.Sleep(6 * time.Second)
	}
}
// main builds an MQTT client with a random UUID as its client id, connects to
// the broker, starts the periodic timing goroutine, subscribes to the
// temperature topics and then publishes test messages in the foreground.
//
// A failed initial connection is printed but does not stop the program.
func main() {
	const (
		brokerHost = "47.242.184.139"
		brokerPort = 1883
	)

	clientID := uuid.New()
	fmt.Println("Generated UUID:", clientID)

	opts := mqtt.NewClientOptions().
		AddBroker(fmt.Sprintf("tcp://%s:%d", brokerHost, brokerPort)).
		SetClientID(clientID.String()).
		SetUsername("admin").
		SetPassword("publish452131wW452131wW$").
		SetDefaultPublishHandler(messagePubHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler

	client := mqtt.NewClient(opts)
	connectToken := client.Connect()
	if connectToken.Wait() && connectToken.Error() != nil {
		fmt.Println(connectToken.Error())
	}

	// Periodic connection check and database read run in the background.
	go timing(client)

	// Only the temperature subscription is active; sub(client) is left
	// disabled.
	subTemperature(client)

	// publish blocks, which keeps the process alive; the client is never
	// explicitly disconnected.
	publish(client)
}