subscribe_client.go
2.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package mqtt
import (
"fmt"
pahomqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/linmadan/egglib-go/log"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
"runtime/debug"
"time"
)
// SubscribeClient bundles a paho MQTT client with the single topic and
// message handler it subscribes to, plus the logger used for status output.
type SubscribeClient struct {
// topic is the topic filter recorded by Subscribe; the OnConnect callback
// installed by Connect reads it to subscribe whenever a connection is made.
topic string
// handler is the message callback recorded by Subscribe alongside topic.
handler pahomqtt.MessageHandler
// client is the underlying paho client; it is nil until Connect is called.
client pahomqtt.Client
// log receives connection, reconnection and subscription status messages.
log log.Logger
}
// MessageHandler is a package-local named type with the same underlying type
// as pahomqtt.MessageHandler. StartSubscribe accepts it and converts it back
// to pahomqtt.MessageHandler before subscribing.
type MessageHandler pahomqtt.MessageHandler
// NewSubscribeClient returns a SubscribeClient that reports through log.
// The returned value has no underlying MQTT client, topic or handler yet;
// Connect and Subscribe fill those in.
func NewSubscribeClient(log log.Logger) *SubscribeClient {
	sc := new(SubscribeClient)
	sc.log = log
	return sc
}
// options assembles the paho client options from the constant package: the
// broker address (tcp://MQTT_HOST:MQTT_PORT), the username and password, and
// SERVICE_NAME as the client ID. The keep-alive interval is 2s and the ping
// timeout is 1s. CleanSession and Order are not overridden here.
func (sc *SubscribeClient) options() *pahomqtt.ClientOptions {
	broker := fmt.Sprintf("tcp://%v:%v", constant.MQTT_HOST, constant.MQTT_PORT)

	return pahomqtt.NewClientOptions().
		AddBroker(broker).
		SetUsername(constant.MQTT_USER).
		SetPassword(constant.MQTT_PASSWORD).
		SetKeepAlive(2 * time.Second).
		SetPingTimeout(1 * time.Second).
		SetClientID(constant.SERVICE_NAME)
}
// Connect builds a paho client from options(), installs the connection
// callbacks, starts the connection and blocks until the initial attempt has
// completed.
//
// Callbacks installed:
//   - OnConnectionLost logs the error; no manual reconnect is attempted here.
//   - OnConnect subscribes to the topic/handler recorded by Subscribe, so the
//     subscription is issued again every time a connection is established.
//   - OnReconnecting logs that a reconnect is in progress.
//
// A failed initial connection is logged rather than returned. The receiver is
// returned in every case so that calls can be chained.
func (sc *SubscribeClient) Connect() *SubscribeClient {
	opts := sc.options()
	sc.log.Info("mqtt start connect......")

	opts.OnConnectionLost = func(c pahomqtt.Client, err error) {
		sc.log.Info("mqtt connect lost,error:" + err.Error())
	}

	opts.OnConnect = func(c pahomqtt.Client) {
		sc.log.Info("mqtt connected")

		topic, handler := sc.topic, sc.handler
		// Until Subscribe has run there is no topic recorded, and an empty
		// topic filter is not a valid subscription; nothing to do yet.
		if topic == "" {
			return
		}

		token := c.Subscribe(topic, 0, handler)
		// Wait for the result off the callback so the callback itself
		// returns promptly.
		go func() {
			token.Wait()
			if err := token.Error(); err != nil {
				sc.log.Error("mqtt subscribe on connect failed,error:" + err.Error())
			}
		}()
	}

	opts.OnReconnecting = func(client pahomqtt.Client, options *pahomqtt.ClientOptions) {
		sc.log.Info("mqtt reconnecting...")
	}

	sc.client = pahomqtt.NewClient(opts)

	token := sc.client.Connect()
	token.Wait()
	// Wait only signals completion; the outcome must be read from Error.
	if err := token.Error(); err != nil {
		sc.log.Error("mqtt connect failed,error:" + err.Error())
	}
	return sc
}
// Subscribe records topic and messageHandler on the receiver (the OnConnect
// callback installed by Connect reads them) and then subscribes to topic at
// QoS 0, blocking until the request has completed.
//
// A failed subscription is logged; the method has no error return. Connect
// must be called first, because the underlying client is nil until then.
func (sc *SubscribeClient) Subscribe(topic string, messageHandler pahomqtt.MessageHandler) {
	sc.topic = topic
	sc.handler = messageHandler

	token := sc.client.Subscribe(topic, 0, messageHandler)
	token.Wait()
	// Wait only signals completion; the outcome must be read from Error.
	if err := token.Error(); err != nil {
		sc.log.Error("mqtt subscribe failed,error:" + err.Error())
	}
}
// StartSubscribe creates a SubscribeClient, connects it and subscribes handler
// to topic, logging through log.
//
// If an attempt panics, the panic value and stack trace are logged and the
// whole sequence is retried after a short pause, until an attempt completes
// without panicking. Retrying in a loop (rather than recursing from the
// deferred recover) keeps the stack flat, and the pause prevents a persistent
// failure from spinning.
func StartSubscribe(topic string, handler MessageHandler, log log.Logger) {
	const retryDelay = 3 * time.Second

	// attempt runs one connect+subscribe cycle and reports whether it
	// finished without panicking.
	attempt := func() (ok bool) {
		defer func() {
			if r := recover(); r != nil {
				log.Error(fmt.Sprintf("%v %s", r, debug.Stack()))
				ok = false
			}
		}()

		log.Info("mqtt start subscribe...")
		NewSubscribeClient(log).Connect().Subscribe(topic, pahomqtt.MessageHandler(handler))
		return true
	}

	for !attempt() {
		time.Sleep(retryDelay)
	}
}