|
|
package message
|
|
|
|
|
|
import (
|
|
|
"github.com/tiptok/gocomm/common"
|
|
|
"github.com/tiptok/gocomm/pkg/broker/kafkax"
|
|
|
"github.com/tiptok/gocomm/pkg/broker/local"
|
|
|
"github.com/tiptok/gocomm/pkg/broker/models"
|
|
|
"github.com/tiptok/gocomm/pkg/log"
|
|
|
"gitlab.fjmaimaimai.com/mmm-go/godevp/pkg/domain"
|
|
|
"gitlab.fjmaimaimai.com/mmm-go/godevp/pkg/infrastructure/pg"
|
|
|
)
|
|
|
|
|
|
const (
|
|
|
KAFKA_HOSTS = "106.52.15.41:9092" // "192.168.0.250:9092,192.168.0.251:9092,192.168.0.252:9092" //"106.52.15.41:9092" //
|
|
|
GroupId = "godevp_test" // "calf_basic_test" //"godevp"
|
|
|
TOPIC_BUSSINESS_ADMIN = "mmm_business_test"
|
|
|
)
|
|
|
|
|
|
func RunConsumer() {
|
|
|
saramaConsumer := kafkax.NewSaramaConsumer(KAFKA_HOSTS, GroupId) // models.WithVersion("0.10.2.1")
|
|
|
saramaConsumer.WithTopicHandler(TOPIC_BUSSINESS_ADMIN, UserLoginHandler)
|
|
|
saramaConsumer.WithMessageReceiver(local.NewPgMessageReceiverRepository(pg.DB, nil)) // 持久化
|
|
|
|
|
|
err := saramaConsumer.StartConsume()
|
|
|
if err != nil {
|
|
|
log.Error(err)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func UserLoginHandler(message interface{}) error {
|
|
|
msg, ok := message.(*models.Message)
|
|
|
if ok {
|
|
|
var user = &domain.Users{}
|
|
|
common.JsonUnmarshal(string(msg.Value), user)
|
|
|
log.Info("消费消息:", msg.Id, msg.Topic, msg.Value)
|
|
|
log.Info("登录用户信息:", user.Id, user.Name)
|
|
|
}
|
|
|
return nil
|
|
|
} |
...
|
...
|
|