zhaocaimao/service/mqtt.go

120 lines
4.1 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"encoding/json"
"errors"
"fmt"
"time"
"zhaoCaiMao/vo"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/guid"
)
type MQTT struct {
client mqtt.Client
}
var MQTTService MQTT
/*
func (t *MQTT) InitDriverMQTT() error {
ip := g.Cfg().MustGet(nil, "driverMQTT.ip") //订单MQTT
port := g.Cfg().MustGet(nil, "driverMQTT.port")
// 创建MQTT客户端选项
var broker = server_ip
var port = server_port
opts := mqtt.NewClientOptions() // 创建MQTT客户端选项
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port)) // 添加MQTT broker地址和端口
userName := "Signature" + "|" + AccessKey_ID + "|" + InstanceId
password := HmacSha1(AccessKey_Secret, ClientId)
opts.SetClientID(ClientId) // 设置客户端ID
opts.SetUsername(userName) // 设置用户名用于MQTT的身份验证
opts.SetPassword(string(Decode(password))) // 设置密码用于MQTT的身份验证
opts.SetDefaultPublishHandler(AliMessagePubHandler) // 设置默认的消息发布处理函数
opts.SetAutoReconnect(true) //客户端与MQTT代理的连接断开时,是否自动尝试重新连接
opts.SetMaxReconnectInterval(3) //客户端在两次连续重连尝试之间的最大时间间隔秒
opts.OnConnect = AliConnectHandler // 设置连接成功的处理函数
opts.OnConnectionLost = AliConnectLostHandler // 设置连接丢失的处理函数
fmt.Println("opts:", opts)
alimc = mqtt.NewClient(opts) // 创建新的MQTT客户端实例
if token := alimc.Connect(); token.Wait() && token.Error() != nil { // 尝试连接到MQTT broker
fmt.Println("-------------------------")
fmt.Println("链接异常:", token.Error().Error())
panic(token.Error()) // 如果连接失败,则抛出异常
}
}
*/
func (t *MQTT) InitOrderMQTT() error {
ip := g.Cfg().MustGet(nil, "orderMQTT.ip") //订单MQTT
port := g.Cfg().MustGet(nil, "orderMQTT.port")
/*
user := g.Cfg().MustGet(nil, "MQTT.user")
passwd := g.Cfg().MustGet(nil, "MQTT.passwd")
*/
opts := mqtt.NewClientOptions()
mqttAddress := fmt.Sprintf("mqtt://%s:%d", ip.String(), port.Int())
g.Log().Line().Print(nil, "订单MQTT连接地址:", mqttAddress)
opts.AddBroker(mqttAddress)
opts.SetKeepAlive(time.Hour * 24)
opts.SetPingTimeout(time.Second * 30)
opts.SetClientID(guid.S())
//opts.SetUsername(user.String()) //使用用户名和密码
//opts.SetPassword(passwd.String())
opts.SetDefaultPublishHandler(t.reciveOrderMQTTHandler) // 设置消息回调处理函数
//opts.OnConnect = connectHandler
opts.OnConnectionLost = t.connectLostHandler
t.client = mqtt.NewClient(opts)
token := t.client.Connect()
if token.Wait() && token.Error() != nil {
err := errors.New("订单MQTT初始化失败:" + token.Error().Error())
g.Log().Line().Error(nil, err)
return err
}
payTopic := g.Cfg().MustGet(nil, "orderMQTT.topic").String()
//订阅支付订单消息
token = t.client.Subscribe(payTopic, 2, nil)
if !token.Wait() {
return token.Error()
}
return nil
}
func (t *MQTT) reciveOrderMQTTHandler(client mqtt.Client, msg mqtt.Message) {
logMsg := fmt.Sprintf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
fmt.Println(logMsg)
g.Log().Line().Print(nil, logMsg)
mqttOrder := vo.MQTTOrder{}
err := json.Unmarshal(msg.Payload(), &mqttOrder)
if err != nil {
g.Log().Line().Print(nil, err)
return
}
//t.createMsgToCat(&mqttOrder)
//to-----发送到招财猫设备
}
func (t *MQTT) connectLostHandler(client mqtt.Client, err error) {
token := t.client.Connect()
g.Log().Line().Print(nil, "订单MQTT 重新连接成功")
token.Wait()
}
/*
// 组织发送招财猫的报文
func (t *MQTT) createMsgToCat(mqttOrder *vo.MQTTOrder) {
mqttOrder.
mqttDevice := vo.MQTTDevice{}
mqttDevice.msgId = mqttOrder.msgId
if mqttOrder.Channel == 1 { //微信渠道
mqttdevice.Type = 1
} else { //支付宝渠道
mqttdevice.Type = 2
}
}
*/