package message

import (
	"github.com/tiptok/gocomm/common"
	"github.com/tiptok/gocomm/pkg/broker/kafkax"
	"github.com/tiptok/gocomm/pkg/broker/local"
	"github.com/tiptok/gocomm/pkg/broker/models"
	"github.com/tiptok/gocomm/pkg/log"
	"gitlab.fjmaimaimai.com/mmm-go/godevp/pkg/domain"
	"gitlab.fjmaimaimai.com/mmm-go/godevp/pkg/infrastructure/pg"
)

// Kafka settings consumed by RunConsumer:
//   - KAFKA_HOSTS is the broker address handed to kafkax.NewSaramaConsumer.
//   - GroupId is the consumer group id handed to kafkax.NewSaramaConsumer.
//   - TOPIC_BUSSINESS_ADMIN is the topic whose messages are routed to
//     UserLoginHandler.
//
// The trailing comments on each line keep alternative values that are
// currently disabled.
// NOTE(review): these values are hard-coded; confirm whether they should come
// from configuration instead.
const (
	KAFKA_HOSTS           = "106.52.15.41:9092" // "192.168.0.250:9092,192.168.0.251:9092,192.168.0.252:9092" //"106.52.15.41:9092" //
	GroupId               = "godevp_test"       // "calf_basic_test"                                          //"godevp"
	TOPIC_BUSSINESS_ADMIN = "mmm_business_test"
)

// RunConsumer creates a consumer via kafkax.NewSaramaConsumer for
// KAFKA_HOSTS and GroupId, routes TOPIC_BUSSINESS_ADMIN to UserLoginHandler,
// attaches the message receiver built from pg.DB, and starts consuming.
//
// An error returned by StartConsume is logged, not returned to the caller.
func RunConsumer() {
	// Options left disabled by the original author:
	// models.WithHandlerOriginalMessageFlag(true), models.WithVersion("0.10.2.1").
	consumer := kafkax.NewSaramaConsumer(KAFKA_HOSTS, GroupId)
	consumer.WithTopicHandler(TOPIC_BUSSINESS_ADMIN, UserLoginHandler)

	// Persistence: the receiver repository is backed by pg.DB.
	consumer.WithMessageReceiver(local.NewPgMessageReceiverRepository(pg.DB, nil))

	if err := consumer.StartConsume(); err != nil {
		log.Error(err)
	}
}

// UserLoginHandler is the handler RunConsumer registers for
// TOPIC_BUSSINESS_ADMIN. It expects message to be a *models.Message, decodes
// the message value as JSON into a domain.Users, and logs the message and the
// decoded user's Id and Name.
//
// A message of any other dynamic type, or a nil *models.Message, is ignored.
// The function always returns nil.
//
// NOTE(review): the outcome of common.JsonUnmarshal is not checked here, so a
// malformed payload would log zero-valued user fields; confirm whether that
// helper reports failures and whether they should be surfaced.
func UserLoginHandler(message interface{}) error {
	msg, ok := message.(*models.Message)
	// A typed nil *models.Message still satisfies the type assertion, so the
	// pointer itself must be checked before its fields are read.
	if !ok || msg == nil {
		return nil
	}

	user := &domain.Users{}
	common.JsonUnmarshal(string(msg.Value), user)
	log.Info("消费消息:", msg.Id, msg.Topic, msg.Value)
	log.Info("登录用户信息:", user.Id, user.Name)
	return nil
}