pushMessageCommmand.go 4.2 KB
package messageHandler

import (
	"github.com/Shopify/sarama"
	"github.com/linmadan/egglib-go/core/application"
	"github.com/linmadan/egglib-go/utils/json"
	"gitlab.fjmaimaimai.com/mmm-go-pp/egglib-go-demos/pkg/application/factory"
	"gitlab.fjmaimaimai.com/mmm-go-pp/egglib-go-demos/pkg/application/message/command"
	"gitlab.fjmaimaimai.com/mmm-go-pp/egglib-go-demos/pkg/application/message/service"
	. "gitlab.fjmaimaimai.com/mmm-go-pp/egglib-go-demos/pkg/log"
)

func PushMessageCommandHandler(message *sarama.ConsumerMessage) error {
	transactionContext, err := factory.CreateTransactionContext(nil)
	if err != nil {
		return err
	}
	transactionContext.StartTransaction()
	transactionIsSucceed := false
	defer func() {
		if transactionIsSucceed {
			transactionContext.CommitTransaction()
		} else {
			transactionContext.RollbackTransaction()
		}
	}()
	messageReceiverOptions := make(map[string]interface{})
	localMessageOptions := make(map[string]interface{})
	localMessageOptions["storeType"] = "pg"
	localMessageOptions["converterType"] = "sarama"
	storeOptions := make(map[string]interface{})
	storeOptions["transactionContext"] = transactionContext
	localMessageOptions["storeOptions"] = storeOptions
	messageReceiverOptions["localMessageOptions"] = localMessageOptions
	if receiver, err := factory.CreateMessageReceiver(messageReceiverOptions); err != nil {
		return err
	} else {
		message, isRepeat, err := receiver.ReceiveMessage(message, nil)
		if err != nil {
			return err
		}
		if isRepeat {
			var append = make(map[string]interface{})
			append["messageId"] = message.MessageId
			Logger.Warn("接收到重复消息")
		} else {
			/*填充业务 开始*/
			pushMessageCommand := &command.PushMessageCommand{}
			json.Unmarshal([]byte(message.MessageBody), pushMessageCommand)
			messageServiceOptions := make(map[string]interface{})
			messageServiceOptions["transactionContext"] = transactionContext
			messageService := service.NewMessageService(messageServiceOptions)
			if _, err := messageService.PushMessage(pushMessageCommand); err != nil {
				return err
			}
			/*填充业务 结束*/
			if err := receiver.ConfirmReceive(message); err != nil {
				return err
			}
		}
	}
	transactionIsSucceed = true
	return nil
}

func PgWrapperConsumerHandler(workHandler func(*application.Message, application.TransactionContext) error) func(*sarama.ConsumerMessage) error {
	return func(message *sarama.ConsumerMessage) error {
		transactionContext, err := factory.CreateTransactionContext(nil)
		if err != nil {
			return err
		}
		transactionContext.StartTransaction()
		transactionIsSucceed := false
		defer func() {
			if transactionIsSucceed {
				transactionContext.CommitTransaction()
			} else {
				transactionContext.RollbackTransaction()
			}
		}()
		messageReceiverOptions := make(map[string]interface{})
		localMessageOptions := make(map[string]interface{})
		localMessageOptions["storeType"] = "pg"
		localMessageOptions["converterType"] = "sarama"
		storeOptions := make(map[string]interface{})
		storeOptions["transactionContext"] = transactionContext
		localMessageOptions["storeOptions"] = storeOptions
		messageReceiverOptions["localMessageOptions"] = localMessageOptions
		if receiver, err := factory.CreateMessageReceiver(messageReceiverOptions); err != nil {
			return err
		} else {
			message, isRepeat, err := receiver.ReceiveMessage(message, nil)
			if err != nil {
				return err
			}
			if isRepeat {
				var append = make(map[string]interface{})
				append["messageId"] = message.MessageId
				Logger.Warn("接收到重复消息")
			} else {
				err = workHandler(message, transactionContext)
				if err := receiver.ConfirmReceive(message); err != nil {
					return err
				}
			}
		}
		transactionIsSucceed = true
		return nil
	}
}

func PushMessageHandler(message *application.Message, transactionContext application.TransactionContext) error {
	pushMessageCommand := &command.PushMessageCommand{}
	json.Unmarshal([]byte(message.MessageBody), pushMessageCommand)
	messageServiceOptions := make(map[string]interface{})
	messageServiceOptions["transactionContext"] = transactionContext
	messageService := service.NewMessageService(messageServiceOptions)
	if _, err := messageService.PushMessage(pushMessageCommand); err != nil {
		return err
	}
	return nil
}