作者 yangfu

wrapper handler

@@ -11,6 +11,9 @@ import ( @@ -11,6 +11,9 @@ import (
11 11
12 func main() { 12 func main() {
13 messageHandlerMap := make(map[string]func(message *sarama.ConsumerMessage) error) 13 messageHandlerMap := make(map[string]func(message *sarama.ConsumerMessage) error)
14 - messageHandlerMap[constant.PUSH_MESSAGE_TOPIC] = messageHandler.PushMessageCommandHandler 14 + // 方法1 直接引用
  15 + // messageHandlerMap[constant.PUSH_MESSAGE_TOPIC] = messageHandler.PushMessageCommandHandler
  16 + // 方法2 函数包装
  17 + messageHandlerMap[constant.PUSH_MESSAGE_TOPIC] = messageHandler.PgWrapperConsumerHandler(messageHandler.PushMessageHandler)
15 saramaConsumer.StartConsume(constant.KAFKA_HOSTS, constant.SERVICE_NAME, messageHandlerMap, log.Logger) 18 saramaConsumer.StartConsume(constant.KAFKA_HOSTS, constant.SERVICE_NAME, messageHandlerMap, log.Logger)
16 } 19 }
@@ -2,6 +2,7 @@ package messageHandler @@ -2,6 +2,7 @@ package messageHandler
2 2
3 import ( 3 import (
4 "github.com/Shopify/sarama" 4 "github.com/Shopify/sarama"
  5 + "github.com/linmadan/egglib-go/core/application"
5 "github.com/linmadan/egglib-go/utils/json" 6 "github.com/linmadan/egglib-go/utils/json"
6 "gitlab.fjmaimaimai.com/mmm-go-pp/egglib-go-demos/pkg/application/factory" 7 "gitlab.fjmaimaimai.com/mmm-go-pp/egglib-go-demos/pkg/application/factory"
7 "gitlab.fjmaimaimai.com/mmm-go-pp/egglib-go-demos/pkg/application/message/command" 8 "gitlab.fjmaimaimai.com/mmm-go-pp/egglib-go-demos/pkg/application/message/command"
@@ -61,3 +62,61 @@ func PushMessageCommandHandler(message *sarama.ConsumerMessage) error { @@ -61,3 +62,61 @@ func PushMessageCommandHandler(message *sarama.ConsumerMessage) error {
61 transactionIsSucceed = true 62 transactionIsSucceed = true
62 return nil 63 return nil
63 } 64 }
  65 +
  66 +func PgWrapperConsumerHandler(workHandler func(*application.Message, application.TransactionContext) error) func(*sarama.ConsumerMessage) error {
  67 + return func(message *sarama.ConsumerMessage) error {
  68 + transactionContext, err := factory.CreateTransactionContext(nil)
  69 + if err != nil {
  70 + return err
  71 + }
  72 + transactionContext.StartTransaction()
  73 + transactionIsSucceed := false
  74 + defer func() {
  75 + if transactionIsSucceed {
  76 + transactionContext.CommitTransaction()
  77 + } else {
  78 + transactionContext.RollbackTransaction()
  79 + }
  80 + }()
  81 + messageReceiverOptions := make(map[string]interface{})
  82 + localMessageOptions := make(map[string]interface{})
  83 + localMessageOptions["storeType"] = "pg"
  84 + localMessageOptions["converterType"] = "sarama"
  85 + storeOptions := make(map[string]interface{})
  86 + storeOptions["transactionContext"] = transactionContext
  87 + localMessageOptions["storeOptions"] = storeOptions
  88 + messageReceiverOptions["localMessageOptions"] = localMessageOptions
  89 + if receiver, err := factory.CreateMessageReceiver(messageReceiverOptions); err != nil {
  90 + return err
  91 + } else {
  92 + message, isRepeat, err := receiver.ReceiveMessage(message, nil)
  93 + if err != nil {
  94 + return err
  95 + }
  96 + if isRepeat {
  97 + var append = make(map[string]interface{})
  98 + append["messageId"] = message.MessageId
  99 + Logger.Warn("接收到重复消息")
  100 + } else {
  101 + err = workHandler(message, transactionContext)
  102 + if err := receiver.ConfirmReceive(message); err != nil {
  103 + return err
  104 + }
  105 + }
  106 + }
  107 + transactionIsSucceed = true
  108 + return nil
  109 + }
  110 +}
  111 +
  112 +func PushMessageHandler(message *application.Message, transactionContext application.TransactionContext) error {
  113 + pushMessageCommand := &command.PushMessageCommand{}
  114 + json.Unmarshal([]byte(message.MessageBody), pushMessageCommand)
  115 + messageServiceOptions := make(map[string]interface{})
  116 + messageServiceOptions["transactionContext"] = transactionContext
  117 + messageService := service.NewMessageService(messageServiceOptions)
  118 + if _, err := messageService.PushMessage(pushMessageCommand); err != nil {
  119 + return err
  120 + }
  121 + return nil
  122 +}