|
|
package sarama
|
|
|
|
|
|
import (
|
|
|
"fmt"
|
|
|
"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/constant"
|
|
|
"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/kafkax"
|
|
|
"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/models"
|
|
|
"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg/transaction"
|
|
|
"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/log"
|
|
|
"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/port/sarama/messageHandler"
|
|
|
//"suplus-message/pkg/constant"
|
|
|
//"suplus-message/pkg/port/sarama/messageHandler"
|
|
|
)
|
|
|
|
|
|
type PgMessageReceiverRepository struct {
|
|
|
transactionContext *transaction.TransactionContext
|
|
|
}
|
|
|
|
|
|
func NewPgMessageReceiverRepository(transactionContext *transaction.TransactionContext) *PgMessageReceiverRepository {
|
|
|
return &PgMessageReceiverRepository{
|
|
|
transactionContext: transactionContext,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func (repository *PgMessageReceiverRepository) ReceiveMessage(params map[string]interface{}) error {
|
|
|
var num int
|
|
|
checkSql := `select count(0) from sys_message_consume where "offset" =? and topic=?`
|
|
|
_, err := repository.transactionContext.PgDd.Query(&num, checkSql, params["offset"], params["topic"])
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
if num > 0 {
|
|
|
return fmt.Errorf("receive repeate message [%v]", params)
|
|
|
}
|
|
|
|
|
|
sql := `insert into sys_message_consume(topic,partition,"offset",key,value,msg_time,create_at,status)values(?,?,?,?,?,?,?,?)`
|
|
|
_, err = repository.transactionContext.PgDd.Exec(sql, params["topic"], params["partition"], params["offset"], params["key"], params["value"], params["msg_time"], params["create_at"], params["status"])
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
func (repository *PgMessageReceiverRepository) ConfirmReceive(params map[string]interface{}) error {
|
|
|
fmt.Println(params)
|
|
|
_, err := repository.transactionContext.PgDd.Exec(`update sys_message_consume set status=? where "offset" =? and topic=?`, int(models.Finished), params["offset"], params["topic"])
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
func Run() {
|
|
|
var (
|
|
|
ucenterMessage = &messageHandler.UcenterMessageCommand{}
|
|
|
uCenterMessage = &messageHandler.UCenterMessageCommand{}
|
|
|
)
|
|
|
|
|
|
saramaConsumer := kafkax.NewSaramaConsumer(constant.KAFKA_HOSTS, constant.SERVICE_NAME)
|
|
|
saramaConsumer.WithTopicHandler(constant.TOPIC_UCENT_USER_CHANGE_PHONE, ucenterMessage.ChangePhoneHandler)
|
|
|
saramaConsumer.WithTopicHandler(constant.TOPIC_UCENT_USER_CHANGE_PHONE, uCenterMessage.ChangePhoneHandler)
|
|
|
saramaConsumer.WithMessageReceiver(NewPgMessageReceiverRepository(nil)) // 持久化
|
|
|
|
|
|
err := saramaConsumer.StartConsume()
|
|
|
if err != nil {
|
...
|
...
|
|