subscribe_client.go 2.7 KB
package mqtt

import (
	"fmt"
	pahomqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/linmadan/egglib-go/log"
	"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
	"runtime/debug"
	"time"
)

type SubscribeClient struct {
	topic   string
	handler pahomqtt.MessageHandler
	client  pahomqtt.Client
	log     log.Logger
}

type MessageHandler pahomqtt.MessageHandler

func NewSubscribeClient(log log.Logger) *SubscribeClient {
	return &SubscribeClient{
		log: log,
	}
}

func (subscribeClient *SubscribeClient) options() *pahomqtt.ClientOptions {
	opts := pahomqtt.NewClientOptions().AddBroker(fmt.Sprintf("tcp://%v:%v", constant.MQTT_HOST, constant.MQTT_PORT))
	opts.SetUsername(constant.MQTT_USER)
	opts.SetPassword(constant.MQTT_PASSWORD)
	opts.SetKeepAlive(2 * time.Second)
	opts.SetPingTimeout(1 * time.Second)
	//opts.CleanSession = false
	opts.SetClientID(constant.SERVICE_NAME)
	//opts.Order = true
	return opts
}

func (subscribeClient *SubscribeClient) Connect() *SubscribeClient {
	opts := subscribeClient.options()
	subscribeClient.log.Info("mqtt start connect......")
	opts.OnConnectionLost = func(c pahomqtt.Client, err error) {
		//defer func() {
		//	if r := recover(); r != nil {
		//		subscribeClient.log.Info(fmt.Sprintf("%v %s", r, debug.Stack()))
		//	}
		//}()
		subscribeClient.log.Info("mqtt connect lost,error:" + err.Error())
		//for {
		//	subscribeClient.log.Info("reconnect server")
		//	token := subscribeClient.client.Connect()
		//	token.Wait()
		//	subscribeClient.log.Info(fmt.Sprintf("server Connect status:%v", subscribeClient.client.IsConnectionOpen()))
		//	if subscribeClient.client.IsConnectionOpen() {
		//		break
		//	}
		//	time.Sleep(3 * time.Second)
		//}
	}
	opts.OnConnect = func(c pahomqtt.Client) {
		subscribeClient.log.Info("mqtt connected")
		c.Subscribe(subscribeClient.topic, 0, subscribeClient.handler)
	}
	opts.OnReconnecting = func(client pahomqtt.Client, options *pahomqtt.ClientOptions) {
		subscribeClient.log.Info("mqtt reconnecting...")
	}
	subscribeClient.client = pahomqtt.NewClient(opts)
	token := subscribeClient.client.Connect()
	token.Wait()
	return subscribeClient
}

func (subscribeClient *SubscribeClient) Subscribe(topic string, messageHandler pahomqtt.MessageHandler) {
	subscribeClient.topic = topic
	subscribeClient.handler = messageHandler
	token := subscribeClient.client.Subscribe(topic, 0, messageHandler)
	token.Wait()
	token.Done()
}

func StartSubscribe(topic string, handler MessageHandler, log log.Logger) {
	defer func() {
		if err := recover(); err != nil {
			log.Error(fmt.Sprintf("%s", debug.Stack()))
			StartSubscribe(topic, handler, log)
		}
	}()
	log.Info("mqtt start subscribe...")
	NewSubscribeClient(log).Connect().Subscribe(topic, pahomqtt.MessageHandler(handler))
}