package service import ( "encoding/json" "errors" "fmt" "time" "zhaoCaiMao/vo" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/util/guid" ) type MQTT struct { client mqtt.Client } var MQTTService MQTT /* func (t *MQTT) InitDriverMQTT() error { ip := g.Cfg().MustGet(nil, "driverMQTT.ip") //订单MQTT port := g.Cfg().MustGet(nil, "driverMQTT.port") // 创建MQTT客户端选项 var broker = server_ip var port = server_port opts := mqtt.NewClientOptions() // 创建MQTT客户端选项 opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port)) // 添加MQTT broker地址和端口 userName := "Signature" + "|" + AccessKey_ID + "|" + InstanceId password := HmacSha1(AccessKey_Secret, ClientId) opts.SetClientID(ClientId) // 设置客户端ID opts.SetUsername(userName) // 设置用户名(用于MQTT的身份验证) opts.SetPassword(string(Decode(password))) // 设置密码(用于MQTT的身份验证) opts.SetDefaultPublishHandler(AliMessagePubHandler) // 设置默认的消息发布处理函数 opts.SetAutoReconnect(true) //客户端与MQTT代理的连接断开时,是否自动尝试重新连接 opts.SetMaxReconnectInterval(3) //客户端在两次连续重连尝试之间的最大时间间隔秒 opts.OnConnect = AliConnectHandler // 设置连接成功的处理函数 opts.OnConnectionLost = AliConnectLostHandler // 设置连接丢失的处理函数 fmt.Println("opts:", opts) alimc = mqtt.NewClient(opts) // 创建新的MQTT客户端实例 if token := alimc.Connect(); token.Wait() && token.Error() != nil { // 尝试连接到MQTT broker fmt.Println("-------------------------") fmt.Println("链接异常:", token.Error().Error()) panic(token.Error()) // 如果连接失败,则抛出异常 } } */ func (t *MQTT) InitOrderMQTT() error { ip := g.Cfg().MustGet(nil, "orderMQTT.ip") //订单MQTT port := g.Cfg().MustGet(nil, "orderMQTT.port") /* user := g.Cfg().MustGet(nil, "MQTT.user") passwd := g.Cfg().MustGet(nil, "MQTT.passwd") */ opts := mqtt.NewClientOptions() mqttAddress := fmt.Sprintf("mqtt://%s:%d", ip.String(), port.Int()) g.Log().Line().Print(nil, "订单MQTT连接地址:", mqttAddress) opts.AddBroker(mqttAddress) opts.SetKeepAlive(time.Hour * 24) opts.SetPingTimeout(time.Second * 30) opts.SetClientID(guid.S()) //opts.SetUsername(user.String()) //使用用户名和密码 //opts.SetPassword(passwd.String()) opts.SetDefaultPublishHandler(t.reciveOrderMQTTHandler) // 设置消息回调处理函数 //opts.OnConnect = connectHandler opts.OnConnectionLost = t.connectLostHandler t.client = mqtt.NewClient(opts) token := t.client.Connect() if token.Wait() && token.Error() != nil { err := errors.New("订单MQTT初始化失败:" + token.Error().Error()) g.Log().Line().Error(nil, err) return err } payTopic := g.Cfg().MustGet(nil, "orderMQTT.topic").String() //订阅支付订单消息 token = t.client.Subscribe(payTopic, 2, nil) if !token.Wait() { return token.Error() } return nil } func (t *MQTT) reciveOrderMQTTHandler(client mqtt.Client, msg mqtt.Message) { logMsg := fmt.Sprintf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()) fmt.Println(logMsg) g.Log().Line().Print(nil, logMsg) mqttOrder := vo.MQTTOrder{} err := json.Unmarshal(msg.Payload(), &mqttOrder) if err != nil { g.Log().Line().Print(nil, err) return } //t.createMsgToCat(&mqttOrder) //to-----发送到招财猫设备 } func (t *MQTT) connectLostHandler(client mqtt.Client, err error) { token := t.client.Connect() g.Log().Line().Print(nil, "订单MQTT 重新连接成功") token.Wait() } /* // 组织发送招财猫的报文 func (t *MQTT) createMsgToCat(mqttOrder *vo.MQTTOrder) { mqttOrder. mqttDevice := vo.MQTTDevice{} mqttDevice.msgId = mqttOrder.msgId if mqttOrder.Channel == 1 { //微信渠道 mqttdevice.Type = 1 } else { //支付宝渠道 mqttdevice.Type = 2 } } */