2024-11-14 10:07:39 +08:00
|
|
|
|
package service
|
|
|
|
|
|
|
|
|
|
import (
|
2024-11-25 14:14:16 +08:00
|
|
|
|
"encoding/json"
|
2024-11-14 10:07:39 +08:00
|
|
|
|
"errors"
|
|
|
|
|
"fmt"
|
|
|
|
|
"time"
|
|
|
|
|
"zhaoCaiMao/vo"
|
|
|
|
|
|
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
|
|
|
"github.com/gogf/gf/v2/frame/g"
|
|
|
|
|
"github.com/gogf/gf/v2/util/guid"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type MQTT struct {
|
|
|
|
|
client mqtt.Client
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var MQTTService MQTT
|
|
|
|
|
|
2024-11-25 14:14:16 +08:00
|
|
|
|
/*
Disabled draft of the driver MQTT initialisation. It is not compiled.

func (t *MQTT) InitDriverMQTT() error {
	ip := g.Cfg().MustGet(nil, "driverMQTT.ip") // order MQTT
	port := g.Cfg().MustGet(nil, "driverMQTT.port")
	// Create the MQTT client options
	var broker = server_ip
	var port = server_port
	opts := mqtt.NewClientOptions()                           // Create the MQTT client options
	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port)) // Add the MQTT broker address and port
	userName := "Signature" + "|" + AccessKey_ID + "|" + InstanceId
	password := HmacSha1(AccessKey_Secret, ClientId)
	opts.SetClientID(ClientId)                          // Set the client ID
	opts.SetUsername(userName)                          // Set the username (used for MQTT authentication)
	opts.SetPassword(string(Decode(password)))          // Set the password (used for MQTT authentication)
	opts.SetDefaultPublishHandler(AliMessagePubHandler) // Set the default publish message handler
	opts.SetAutoReconnect(true)                         // Whether to reconnect automatically when the connection to the MQTT broker drops
	opts.SetMaxReconnectInterval(3)                     // Maximum interval, in seconds, between two consecutive reconnect attempts
	opts.OnConnect = AliConnectHandler                  // Set the on-connect handler
	opts.OnConnectionLost = AliConnectLostHandler       // Set the connection-lost handler
	fmt.Println("opts:", opts)
	alimc = mqtt.NewClient(opts)                                        // Create a new MQTT client instance
	if token := alimc.Connect(); token.Wait() && token.Error() != nil { // Try to connect to the MQTT broker
		fmt.Println("-------------------------")
		fmt.Println("链接异常:", token.Error().Error())
		panic(token.Error()) // Panic if the connection fails
	}
}
*/
|
|
|
|
|
func (t *MQTT) InitOrderMQTT() error {
|
|
|
|
|
ip := g.Cfg().MustGet(nil, "orderMQTT.ip") //订单MQTT
|
|
|
|
|
port := g.Cfg().MustGet(nil, "orderMQTT.port")
|
2024-11-14 10:07:39 +08:00
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
user := g.Cfg().MustGet(nil, "MQTT.user")
|
|
|
|
|
passwd := g.Cfg().MustGet(nil, "MQTT.passwd")
|
|
|
|
|
*/
|
|
|
|
|
opts := mqtt.NewClientOptions()
|
|
|
|
|
mqttAddress := fmt.Sprintf("mqtt://%s:%d", ip.String(), port.Int())
|
|
|
|
|
g.Log().Line().Print(nil, "订单MQTT连接地址:", mqttAddress)
|
|
|
|
|
opts.AddBroker(mqttAddress)
|
|
|
|
|
opts.SetKeepAlive(time.Hour * 24)
|
|
|
|
|
opts.SetPingTimeout(time.Second * 30)
|
|
|
|
|
opts.SetClientID(guid.S())
|
|
|
|
|
//opts.SetUsername(user.String()) //使用用户名和密码
|
|
|
|
|
//opts.SetPassword(passwd.String())
|
|
|
|
|
opts.SetDefaultPublishHandler(t.reciveOrderMQTTHandler) // 设置消息回调处理函数
|
|
|
|
|
//opts.OnConnect = connectHandler
|
|
|
|
|
opts.OnConnectionLost = t.connectLostHandler
|
|
|
|
|
t.client = mqtt.NewClient(opts)
|
|
|
|
|
token := t.client.Connect()
|
|
|
|
|
if token.Wait() && token.Error() != nil {
|
|
|
|
|
err := errors.New("订单MQTT初始化失败:" + token.Error().Error())
|
|
|
|
|
g.Log().Line().Error(nil, err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
2024-11-25 14:14:16 +08:00
|
|
|
|
payTopic := g.Cfg().MustGet(nil, "orderMQTT.topic").String()
|
2024-11-14 10:07:39 +08:00
|
|
|
|
//订阅支付订单消息
|
|
|
|
|
token = t.client.Subscribe(payTopic, 2, nil)
|
|
|
|
|
if !token.Wait() {
|
|
|
|
|
return token.Error()
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
func (t *MQTT) reciveOrderMQTTHandler(client mqtt.Client, msg mqtt.Message) {
|
|
|
|
|
logMsg := fmt.Sprintf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
|
|
|
|
|
fmt.Println(logMsg)
|
|
|
|
|
g.Log().Line().Print(nil, logMsg)
|
|
|
|
|
mqttOrder := vo.MQTTOrder{}
|
2024-11-25 14:14:16 +08:00
|
|
|
|
err := json.Unmarshal(msg.Payload(), &mqttOrder)
|
2024-11-14 10:07:39 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
g.Log().Line().Print(nil, err)
|
2024-11-25 14:14:16 +08:00
|
|
|
|
return
|
2024-11-14 10:07:39 +08:00
|
|
|
|
}
|
2024-11-25 14:14:16 +08:00
|
|
|
|
//t.createMsgToCat(&mqttOrder)
|
2024-11-14 10:07:39 +08:00
|
|
|
|
|
|
|
|
|
//to-----发送到招财猫设备
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (t *MQTT) connectLostHandler(client mqtt.Client, err error) {
|
|
|
|
|
token := t.client.Connect()
|
|
|
|
|
g.Log().Line().Print(nil, "订单MQTT 重新连接成功")
|
|
|
|
|
token.Wait()
|
|
|
|
|
}
|
2024-11-25 14:14:16 +08:00
|
|
|
|
|
|
|
|
|
/*
Disabled draft. It is not compiled.

// Build the message to send to the ZhaoCaiMao device
func (t *MQTT) createMsgToCat(mqttOrder *vo.MQTTOrder) {
	mqttOrder.
	mqttDevice := vo.MQTTDevice{}
	mqttDevice.msgId = mqttOrder.msgId
	if mqttOrder.Channel == 1 { // WeChat channel
		mqttdevice.Type = 1
	} else { // Alipay channel
		mqttdevice.Type = 2
	}
}
*/
|