sarama.go 2.3 KB
package sarama

import (
	"fmt"
	"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/constant"
	"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/kafkax"
	"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/models"
	"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg/transaction"
	"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/log"
	"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/port/sarama/messageHandler"
	//"suplus-message/pkg/constant"
	//"suplus-message/pkg/port/sarama/messageHandler"
)

// PgMessageReceiverRepository records consumed messages in the
// sys_message_consume table through the transaction context it holds.
type PgMessageReceiverRepository struct {
	// transactionContext supplies the PgDd handle that ReceiveMessage and
	// ConfirmReceive run their statements on.
	transactionContext *transaction.TransactionContext
}

// NewPgMessageReceiverRepository returns a repository bound to the given
// transaction context. The context is stored as-is, without validation.
func NewPgMessageReceiverRepository(transactionContext *transaction.TransactionContext) *PgMessageReceiverRepository {
	repo := new(PgMessageReceiverRepository)
	repo.transactionContext = transactionContext
	return repo
}

// ReceiveMessage records a consumed message in sys_message_consume and rejects
// messages that have already been recorded.
//
// params is read for the keys "topic", "partition", "offset", "key", "value",
// "msg_time", "create_at" and "status"; every value is passed to the database
// as a bind parameter.
//
// It returns an error when the repository has no transaction context, when
// either statement fails, or when a matching row already exists.
func (repository *PgMessageReceiverRepository) ReceiveMessage(params map[string]interface{}) error {
	// Fail with an error rather than a nil-pointer panic when the repository
	// was constructed without a transaction context.
	if repository == nil || repository.transactionContext == nil {
		return fmt.Errorf("receive message: transaction context is nil")
	}

	// Kafka offsets are only unique within a single partition of a topic, so
	// the duplicate check also matches on partition whenever one is supplied.
	// Without it, a message from another partition that happens to share the
	// offset would be rejected as a repeat.
	checkSql := `select count(0) from sys_message_consume where "offset" =? and topic=?`
	checkArgs := []interface{}{params["offset"], params["topic"]}
	if partition := params["partition"]; partition != nil {
		checkSql += ` and "partition"=?`
		checkArgs = append(checkArgs, partition)
	}

	var num int
	_, err := repository.transactionContext.PgDd.Query(&num, checkSql, checkArgs...)
	if err != nil {
		return err
	}
	if num > 0 {
		return fmt.Errorf("receive repeate message [%v]", params)
	}

	sql := `insert into sys_message_consume(topic,partition,"offset",key,value,msg_time,create_at,status)values(?,?,?,?,?,?,?,?)`
	_, err = repository.transactionContext.PgDd.Exec(sql, params["topic"], params["partition"], params["offset"], params["key"], params["value"], params["msg_time"], params["create_at"], params["status"])
	return err
}

// ConfirmReceive sets the status of a recorded message to models.Finished.
//
// params is read for the keys "offset" and "topic", and for "partition" when
// present; every value is passed to the database as a bind parameter.
//
// It returns an error when the repository has no transaction context or when
// the update statement fails.
func (repository *PgMessageReceiverRepository) ConfirmReceive(params map[string]interface{}) error {
	// Fail with an error rather than a nil-pointer panic when the repository
	// was constructed without a transaction context.
	if repository == nil || repository.transactionContext == nil {
		return fmt.Errorf("confirm receive: transaction context is nil")
	}

	updateSql := `update sys_message_consume set status=? where "offset" =? and topic=?`
	updateArgs := []interface{}{int(models.Finished), params["offset"], params["topic"]}
	// Kafka offsets repeat across partitions of one topic; when the caller
	// supplies the partition, restrict the update to that partition's row.
	// NOTE(review): whether callers include "partition" here is not visible
	// in this file — without it the update matches on topic and offset only.
	if partition := params["partition"]; partition != nil {
		updateSql += ` and "partition"=?`
		updateArgs = append(updateArgs, partition)
	}

	_, err := repository.transactionContext.PgDd.Exec(updateSql, updateArgs...)
	return err
}

// Run builds a sarama consumer for constant.KAFKA_HOSTS under
// constant.SERVICE_NAME, registers the change-phone topic handler and the
// Postgres message receiver, then starts consuming. An error returned by
// StartConsume is logged, not returned.
func Run() {
	handler := new(messageHandler.UCenterMessageCommand)

	consumer := kafkax.NewSaramaConsumer(constant.KAFKA_HOSTS, constant.SERVICE_NAME)
	consumer.WithTopicHandler(constant.TOPIC_UCENT_USER_CHANGE_PHONE, handler.ChangePhoneHandler)
	// Persistence of consumed messages.
	// NOTE(review): the repository is created with a nil transaction context,
	// so its methods have no database handle to work with — confirm whether a
	// real context should be supplied here.
	consumer.WithMessageReceiver(NewPgMessageReceiverRepository(nil))

	if err := consumer.StartConsume(); err != nil {
		log.Error(err)
	}
}