yuleduiPay/service/mqtt.go

72 lines
1.8 KiB
Go

package service
import (
"encoding/json"
"errors"
"fmt"
"time"
"yuleduiPay/service/vo"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/guid"
)
// MQTT wraps the paho MQTT client used by this package to publish messages.
type MQTT struct {
// client is the underlying paho client; it is assigned by InitMQTT and is
// nil until then.
client mqtt.Client
}
// MQTTService is the package-level MQTT instance. Its client field is unset
// until InitMQTT has been called on it.
var MQTTService MQTT
// InitMQTT builds the paho client from the "MQTT.ip" and "MQTT.port"
// configuration keys, stores it on t and connects to the broker.
//
// It blocks until the connect token completes and returns a non-nil error
// (also written to the log) when the connection attempt fails. The client is
// stored on t even when connecting fails.
func (t *MQTT) InitMQTT() error {
	// The MQTT CONNECT packet carries keep-alive as a 16-bit count of
	// seconds, so 65535s is the largest value that can be announced to the
	// broker. The previous 24h setting (86400s) does not fit in 16 bits and
	// would wrap to a much shorter interval on the wire while the client
	// still scheduled its pings against 24h.
	const maxKeepAlive = 65535 * time.Second

	ip := g.Cfg().MustGet(nil, "MQTT.ip")
	port := g.Cfg().MustGet(nil, "MQTT.port")
	// Credentials are currently not used. To enable them, read
	// "MQTT.user" / "MQTT.passwd" here and pass them to
	// opts.SetUsername / opts.SetPassword below.

	mqttAddress := fmt.Sprintf("mqtt://%s:%d", ip.String(), port.Int())
	g.Log().Line().Print(nil, "MQTT连接地址:", mqttAddress)

	opts := mqtt.NewClientOptions()
	opts.AddBroker(mqttAddress)
	opts.SetKeepAlive(maxKeepAlive)
	opts.SetPingTimeout(time.Second * 30)
	// A fresh random client ID is generated for every initialization.
	opts.SetClientID(guid.S())
	// No default publish handler or OnConnect callback is registered; only
	// connection loss is observed.
	opts.OnConnectionLost = t.connectLostHandler

	t.client = mqtt.NewClient(opts)
	token := t.client.Connect()
	token.Wait()
	if connErr := token.Error(); connErr != nil {
		err := errors.New("MQTT初始化失败:" + connErr.Error())
		g.Log().Line().Error(nil, err)
		return err
	}
	return nil
}
// Publish pushes an order-result message over MQTT.
//
// It overwrites msg.MsgId with a freshly generated ID and msg.SendTime with
// the current Unix time, JSON-encodes msg and publishes it to msg.Topic with
// QoS 2 and the retained flag off. The call blocks until the publish token
// completes.
//
// It returns an error when msg is nil, when the client has not been created
// by InitMQTT, when JSON encoding fails, or when the publish token reports a
// failure.
func (t *MQTT) Publish(msg *vo.MQTTOrder) error {
	if msg == nil {
		return errors.New("mqtt publish: nil message")
	}
	if t.client == nil {
		return errors.New("mqtt publish: client not initialized")
	}

	msg.MsgId = guid.S()
	msg.SendTime = time.Now().Unix()

	payload, err := json.Marshal(msg)
	if err != nil {
		return err
	}

	token := t.client.Publish(msg.Topic, 2, false, payload)
	// Wait blocks until the token completes and always reports true, so its
	// return value says nothing about success; the outcome is only available
	// through token.Error().
	token.Wait()
	return token.Error()
}
// connectLostHandler is registered as the client's OnConnectionLost callback.
//
// It logs the reason the connection dropped, issues a Connect on the stored
// client, waits for that attempt to complete and then logs whether it
// actually succeeded or failed.
func (t *MQTT) connectLostHandler(client mqtt.Client, err error) {
	g.Log().Line().Error(nil, "MQTT 连接断开:", err)

	token := t.client.Connect()
	// Only report success after the attempt has finished and reported no
	// error.
	token.Wait()
	if connErr := token.Error(); connErr != nil {
		g.Log().Line().Error(nil, "MQTT 重新连接失败:", connErr)
		return
	}
	g.Log().Line().Print(nil, "MQTT 重新连接成功")
}