subscribe_client.go
2.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package mqtt
import (
"fmt"
pahomqtt "github.com/eclipse/paho.mqtt.golang"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log"
"time"
)
type SubscribeClient struct {
topic string
handler pahomqtt.MessageHandler
client pahomqtt.Client
}
type MessageHandler pahomqtt.MessageHandler
func NewSubscribeClient() *SubscribeClient {
return &SubscribeClient{}
}
func (subscribeClient *SubscribeClient) options() *pahomqtt.ClientOptions {
opts := pahomqtt.NewClientOptions().AddBroker(fmt.Sprintf("tcp://%v:%v", constant.MQTT_HOST, constant.MQTT_PORT))
opts.SetUsername(constant.MQTT_USER)
opts.SetPassword(constant.MQTT_PASSWORD)
opts.SetKeepAlive(2 * time.Second)
opts.SetPingTimeout(1 * time.Second)
opts.CleanSession = false
//opts.SetClientID("test")
//opts.Order = true
return opts
}
func (subscribeClient *SubscribeClient) Connect() *SubscribeClient {
opts := subscribeClient.options()
fmt.Println("start connect......")
opts.OnConnectionLost = func(c pahomqtt.Client, err error) {
defer func() {
if r := recover(); r != nil {
log.Logger.Error(fmt.Sprintf("%v", r))
}
}()
fmt.Println("Connect error:", err)
for {
fmt.Println("reconnect server")
token := subscribeClient.client.Connect()
token.Wait()
fmt.Println("server Connect status:", subscribeClient.client.IsConnectionOpen())
if subscribeClient.client.IsConnectionOpen() {
break
}
time.Sleep(3 * time.Second)
}
}
opts.OnConnect = func(c pahomqtt.Client) {
c.Subscribe(subscribeClient.topic, 0, subscribeClient.handler)
}
subscribeClient.client = pahomqtt.NewClient(opts)
token := subscribeClient.client.Connect()
token.Wait()
return subscribeClient
}
func (subscribeClient *SubscribeClient) Subscribe(topic string, messageHandler pahomqtt.MessageHandler) {
subscribeClient.topic = topic
subscribeClient.handler = messageHandler
token := subscribeClient.client.Subscribe(topic, 0, messageHandler)
token.Wait()
token.Done()
}
func StartSubscribe(topic string, handler MessageHandler) {
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
StartSubscribe(topic, handler)
}
}()
fmt.Println("start subscribe...")
NewSubscribeClient().Connect().Subscribe(topic, pahomqtt.MessageHandler(handler))
}