sarama.go
2.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package sarama
import (
"fmt"
"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/constant"
"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/kafkax"
"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/models"
"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg/transaction"
"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/log"
"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/port/sarama/messageHandler"
//"suplus-message/pkg/constant"
//"suplus-message/pkg/port/sarama/messageHandler"
)
type PgMessageReceiverRepository struct {
transactionContext *transaction.TransactionContext
}
func NewPgMessageReceiverRepository(transactionContext *transaction.TransactionContext) *PgMessageReceiverRepository {
return &PgMessageReceiverRepository{
transactionContext: transactionContext,
}
}
func (repository *PgMessageReceiverRepository) ReceiveMessage(params map[string]interface{}) error {
var num int
checkSql := `select count(0) from sys_message_consume where "offset" =? and topic=?`
_, err := repository.transactionContext.PgDd.Query(&num, checkSql, params["offset"], params["topic"])
if err != nil {
return err
}
if num > 0 {
return fmt.Errorf("receive repeate message [%v]", params)
}
sql := `insert into sys_message_consume(topic,partition,"offset",key,value,msg_time,create_at,status)values(?,?,?,?,?,?,?,?)`
_, err = repository.transactionContext.PgDd.Exec(sql, params["topic"], params["partition"], params["offset"], params["key"], params["value"], params["msg_time"], params["create_at"], params["status"])
return err
}
func (repository *PgMessageReceiverRepository) ConfirmReceive(params map[string]interface{}) error {
fmt.Println(params)
_, err := repository.transactionContext.PgDd.Exec(`update sys_message_consume set status=? where "offset" =? and topic=?`, int(models.Finished), params["offset"], params["topic"])
return err
}
func Run() {
var (
uCenterMessage = &messageHandler.UCenterMessageCommand{}
)
saramaConsumer := kafkax.NewSaramaConsumer(constant.KAFKA_HOSTS, constant.SERVICE_NAME)
saramaConsumer.WithTopicHandler(constant.TOPIC_UCENT_USER_CHANGE_PHONE, uCenterMessage.ChangePhoneHandler)
saramaConsumer.WithMessageReceiver(NewPgMessageReceiverRepository(nil)) // 持久化
err := saramaConsumer.StartConsume()
if err != nil {
log.Error(err)
}
}