作者 tangxvhui

消息处理

{
// 使用 IntelliSense 了解相关属性。
// 悬停以查看现有属性的描述。
// 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Launch file",
"type": "go",
"request": "launch",
"mode": "debug",
"buildFlags": "--tags=local",
"program": "./main.go"
},
]
}
\ No newline at end of file
... ...
... ... @@ -6,17 +6,17 @@ import (
)
type ReceivedMessage struct {
MessageId int64
MessageType string
MessageBody string
OccurredOn time.Time
CreateAt time.Time
MessageId int64 `json:"MessageId"`
MessageType string `json:"MessageType"`
MessageBody string `json:"MessageBody"`
OccurredOn time.Time `json:"OccurredOn"`
CreateAt time.Time `json:"-"`
}
type MessageBody struct {
Module string `json:"module"`
Action string `json:"action"`
Datas json.RawMessage `json:"data"` // 具体的对象JSON数据
Datas json.RawMessage `json:"datas"` // 具体的对象JSON数据
}
type ReceivedMessageRepository interface {
... ...
... ... @@ -15,22 +15,23 @@ import (
func SyncDataBusinessAdmin(msgData *sarama.ConsumerMessage) error {
var receivedMsg domain.ReceivedMessage
log.Logger.Debug(string(msgData.Value))
err := json.Unmarshal(msgData.Value, &receivedMsg)
if err != nil {
log.Logger.Error("解析ReceivedMessage 失败" + err.Error())
return err
}
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return err
}
transactionContext.StartTransaction()
var transactionIsSucceed bool
_ = transactionContext.StartTransaction()
defer func() {
if transactionIsSucceed {
transactionContext.CommitTransaction()
} else {
transactionContext.RollbackTransaction()
}
_ = transactionContext.RollbackTransaction()
}()
msgRepo := factory.CreateReceivedMessageRepository(map[string]interface{}{
... ... @@ -87,5 +88,6 @@ func SyncDataBusinessAdmin(msgData *sarama.ConsumerMessage) error {
log.Logger.Error(" 保存新消息发生错误 " + err.Error())
log.Logger.Info("异常消息message_id=" + strconv.FormatInt(receivedMsg.MessageId, 10))
}
_ = transactionContext.CommitTransaction()
return nil
}
... ...