package service

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/util/guid"

	"yuleduiPay/service/vo"
)

// MQTT wraps the paho MQTT client that this package uses to publish messages.
// The zero value is not usable for publishing; call InitMQTT first.
type MQTT struct {
	client mqtt.Client
}

// MQTTService is the package-level MQTT instance.
var MQTTService MQTT

// InitMQTT reads the broker address from the "MQTT.ip" and "MQTT.port"
// configuration keys, creates a client with a freshly generated client ID and
// connects to the broker, blocking until the connection attempt completes.
//
// It returns a non-nil error (which wraps the underlying paho error) when the
// connection attempt fails; the same error is also written to the log.
func (t *MQTT) InitMQTT() error {
	ip := g.Cfg().MustGet(nil, "MQTT.ip")
	port := g.Cfg().MustGet(nil, "MQTT.port")
	/*
		user := g.Cfg().MustGet(nil, "MQTT.user")
		passwd := g.Cfg().MustGet(nil, "MQTT.passwd")
	*/

	mqttAddress := fmt.Sprintf("mqtt://%s:%d", ip.String(), port.Int())
	g.Log().Line().Print(nil, "MQTT连接地址:", mqttAddress)

	opts := mqtt.NewClientOptions()
	opts.AddBroker(mqttAddress)
	// NOTE(review): MQTT carries keep-alive as a 16-bit number of seconds
	// (max 65535s, about 18.2h), so 24h does not fit — confirm the intended value.
	opts.SetKeepAlive(time.Hour * 24)
	opts.SetPingTimeout(time.Second * 30)
	opts.SetClientID(guid.S())
	//opts.SetUsername(user.String()) // authenticate with user name and password
	//opts.SetPassword(passwd.String())
	//opts.SetDefaultPublishHandler(messagePubHandler) // set the message callback handler
	//opts.OnConnect = connectHandler
	opts.OnConnectionLost = t.connectLostHandler

	t.client = mqtt.NewClient(opts)
	token := t.client.Connect()
	token.Wait()
	if connErr := token.Error(); connErr != nil {
		// %w keeps the original message text while letting callers unwrap the cause.
		err := fmt.Errorf("MQTT初始化失败:%w", connErr)
		g.Log().Line().Error(nil, err)
		return err
	}
	return nil
}

// Publish pushes an order result message over MQTT.
//
// It stamps msg with a new MsgId and the current Unix time in SendTime,
// serialises it as JSON and publishes it to msg.Topic with QoS 2 and the
// retained flag off, blocking until the publish flow completes.
//
// It returns an error when msg is nil, when the client has not been
// initialised, when JSON encoding fails, or when the broker/client reports a
// publish failure.
func (t *MQTT) Publish(msg *vo.MQTTOrder) error {
	if msg == nil {
		return errors.New("mqtt publish: nil message")
	}
	if t.client == nil {
		return errors.New("mqtt publish: client is not initialized")
	}

	msg.MsgId = guid.S()
	msg.SendTime = time.Now().Unix()

	payload, err := json.Marshal(msg)
	if err != nil {
		return err
	}

	token := t.client.Publish(msg.Topic, 2, false, payload)
	// Wait only signals completion; the outcome must be read from Error.
	token.Wait()
	return token.Error()
}

// connectLostHandler is registered as the client's OnConnectionLost callback.
// It logs the cause of the disconnect, asks the client to connect again and
// logs whether that attempt succeeded.
func (t *MQTT) connectLostHandler(client mqtt.Client, err error) {
	g.Log().Line().Error(nil, "MQTT connection lost:", err)

	token := client.Connect()
	token.Wait()
	if connErr := token.Error(); connErr != nil {
		g.Log().Line().Error(nil, "MQTT reconnect failed:", connErr)
		return
	}
	g.Log().Line().Print(nil, "MQTT 重新连接成功")
}