consumer_test.go
2.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package message
import (
"fmt"
"github.com/Shopify/sarama"
"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/constant"
"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/models"
"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg"
"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg/transaction"
"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/utils"
"log"
"testing"
)
// 消息持久化
func TestNewConsumer(t *testing.T) {
consumer := NewConsumer(constant.KAFKA_HOSTS, "0")
consumer.WithMessageReceiver(NewPgMessageReceiverRepository(transaction.NewPGTransactionContext(pg.DB)))
consumer.WithTopicHandler("mmm_xcx_orders", func(message interface{}) error {
m, ok := message.(*sarama.Message)
if !ok {
return nil
}
if len(m.Value) > 0 {
var msg models.Message
utils.JsonUnmarshal(string(m.Value), &msg)
t.Log("handler message :", string(m.Value), msg.Id, msg.Topic, msg.Value)
}
return nil
})
consumer.StartConsume()
}
// TestNewConsumerNoRepository consumes the "mmm_xcx_orders" topic without
// message persistence: no message receiver is attached to the consumer, only
// the topic handler.
//
// NOTE(review): presumably needs reachable Kafka brokers at
// constant.KAFKA_HOSTS — confirm before running in CI.
func TestNewConsumerNoRepository(t *testing.T) {
	c := NewConsumer(constant.KAFKA_HOSTS, "0")
	c.WithTopicHandler("mmm_xcx_orders", func(message interface{}) error {
		// Skip anything that is not a *sarama.Message or has no payload.
		kafkaMsg, matched := message.(*sarama.Message)
		if !matched || len(kafkaMsg.Value) == 0 {
			return nil
		}

		payload := string(kafkaMsg.Value)
		var body models.Message
		utils.JsonUnmarshal(payload, &body)
		t.Log("handler message :", string(kafkaMsg.Value), body.Id, body.Topic, body.Value)
		return nil
	})
	c.StartConsume()
}
// PgMessageReceiverRepository records consumed messages in the
// sys_message_consume table and updates their status, running its SQL
// through the wrapped transaction context.
type PgMessageReceiverRepository struct {
	// transactionContext supplies the PgDd handle every statement is run on.
	transactionContext *transaction.TransactionContext
}
// NewPgMessageReceiverRepository returns a repository that runs its
// statements through the given transaction context. The context is stored
// as-is; it is not validated or copied.
func NewPgMessageReceiverRepository(transactionContext *transaction.TransactionContext) *PgMessageReceiverRepository {
	repo := new(PgMessageReceiverRepository)
	repo.transactionContext = transactionContext
	return repo
}
// messageKeyFilter builds the WHERE predicate, and its positional arguments,
// that identifies one consumed message in sys_message_consume.
//
// Kafka offsets are only unique within a single partition of a topic, so the
// partition is part of the key whenever params carries a non-nil "partition"
// value. Without one the filter falls back to the offset+topic pair, which
// keeps callers that never supplied a partition working as before.
//
// The predicate is assembled from constant fragments only; every value from
// params is passed as a bound argument.
func messageKeyFilter(params map[string]interface{}) (string, []interface{}) {
	where := `"offset" =? and topic=?`
	args := []interface{}{params["offset"], params["topic"]}
	if partition := params["partition"]; partition != nil {
		where += ` and "partition"=?`
		args = append(args, partition)
	}
	return where, args
}

// ReceiveMessage stores a newly received message in sys_message_consume.
//
// params is read for the keys "topic", "partition", "offset", "key",
// "value", "msg_time", "create_at" and "status". Before inserting, the table
// is checked for a row with the same message key (see messageKeyFilter); if
// one exists an error is returned and nothing is inserted. Errors from the
// lookup or the insert are returned unchanged.
func (repository *PgMessageReceiverRepository) ReceiveMessage(params map[string]interface{}) error {
	where, keyArgs := messageKeyFilter(params)

	var num int
	checkSql := `select count(0) from sys_message_consume where ` + where
	if _, err := repository.transactionContext.PgDd.Query(&num, checkSql, keyArgs...); err != nil {
		return err
	}
	if num > 0 {
		return fmt.Errorf("receive repeate message [%v]", params)
	}

	sql := `insert into sys_message_consume(topic,partition,"offset",key,value,msg_time,create_at,status)values(?,?,?,?,?,?,?,?)`
	_, err := repository.transactionContext.PgDd.Exec(sql, params["topic"], params["partition"], params["offset"], params["key"], params["value"], params["msg_time"], params["create_at"], params["status"])
	return err
}

// ConfirmReceive sets the status of a stored message to models.Finished.
//
// The row is selected with the same message key as ReceiveMessage (see
// messageKeyFilter), so confirming a message in one partition does not touch
// a message with the same offset in another partition. params is logged
// before the update; the update's error, if any, is returned unchanged.
func (repository *PgMessageReceiverRepository) ConfirmReceive(params map[string]interface{}) error {
	log.Println(params)

	where, keyArgs := messageKeyFilter(params)
	// The status placeholder comes first in the statement, ahead of the key.
	args := append([]interface{}{int(models.Finished)}, keyArgs...)
	_, err := repository.transactionContext.PgDd.Exec(`update sys_message_consume set status=? where `+where, args...)
	return err
}