subscribe_client.go 2.3 KB
package mqtt

import (
	"fmt"
	pahomqtt "github.com/eclipse/paho.mqtt.golang"
	"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
	"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log"
	"time"
)

type SubscribeClient struct {
	topic   string
	handler pahomqtt.MessageHandler
	client  pahomqtt.Client
}

type MessageHandler pahomqtt.MessageHandler

func NewSubscribeClient() *SubscribeClient {
	return &SubscribeClient{}
}

func (subscribeClient *SubscribeClient) options() *pahomqtt.ClientOptions {
	opts := pahomqtt.NewClientOptions().AddBroker(fmt.Sprintf("tcp://%v:%v", constant.MQTT_HOST, constant.MQTT_PORT))
	opts.SetUsername(constant.MQTT_USER)
	opts.SetPassword(constant.MQTT_PASSWORD)
	opts.SetKeepAlive(2 * time.Second)
	opts.SetPingTimeout(1 * time.Second)
	opts.CleanSession = false
	//opts.SetClientID("test")
	//opts.Order = true
	return opts
}

func (subscribeClient *SubscribeClient) Connect() *SubscribeClient {
	opts := subscribeClient.options()
	fmt.Println("start connect......")
	opts.OnConnectionLost = func(c pahomqtt.Client, err error) {
		defer func() {
			if r := recover(); r != nil {
				log.Logger.Error(fmt.Sprintf("%v", r))
			}
		}()
		fmt.Println("Connect error:", err)
		for {
			fmt.Println("reconnect server")
			token := subscribeClient.client.Connect()
			token.Wait()
			fmt.Println("server Connect status:", subscribeClient.client.IsConnectionOpen())
			if subscribeClient.client.IsConnectionOpen() {
				break
			}
			time.Sleep(3 * time.Second)
		}
	}
	opts.OnConnect = func(c pahomqtt.Client) {
		c.Subscribe(subscribeClient.topic, 0, subscribeClient.handler)
	}
	subscribeClient.client = pahomqtt.NewClient(opts)
	token := subscribeClient.client.Connect()
	token.Wait()
	return subscribeClient
}

func (subscribeClient *SubscribeClient) Subscribe(topic string, messageHandler pahomqtt.MessageHandler) {
	subscribeClient.topic = topic
	subscribeClient.handler = messageHandler
	token := subscribeClient.client.Subscribe(topic, 0, messageHandler)
	token.Wait()
	token.Done()
}

func StartSubscribe(topic string, handler MessageHandler) {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println(err)
			StartSubscribe(topic, handler)
		}
	}()
	fmt.Println("start subscribe...")
	NewSubscribeClient().Connect().Subscribe(topic, pahomqtt.MessageHandler(handler))
}