package handle import ( "encoding/json" "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/position/service" "strconv" "github.com/Shopify/sarama" "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/company" "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/department" "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/factory" "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/user" "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/domain" "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/log" ) func SyncDataBusinessAdmin(msgData *sarama.ConsumerMessage) error { var receivedMsg domain.ReceivedMessage err := json.Unmarshal(msgData.Value, &receivedMsg) if err != nil { log.Logger.Error("解析ReceivedMessage 失败" + err.Error()) return err } log.Logger.Debug("解析message:", map[string]interface{}{"data": receivedMsg}) transactionContext, err := factory.CreateTransactionContext(nil) if err != nil { return err } _ = transactionContext.StartTransaction() defer func() { _ = transactionContext.RollbackTransaction() }() msgRepo := factory.CreateReceivedMessageRepository(map[string]interface{}{ "transactionContext": transactionContext, }) oldMsg, err := msgRepo.FindMessage(receivedMsg.MessageId) if err != nil { log.Logger.Error(" 查询旧消息发生错误 " + err.Error()) log.Logger.Info("异常消息message_id=" + strconv.FormatInt(receivedMsg.MessageId, 10)) return err } if oldMsg.MessageId == receivedMsg.MessageId { log.Logger.Info("消息重复,message_id=" + strconv.FormatInt(receivedMsg.MessageId, 10)) return nil } var msgBody domain.MessageBody err = json.Unmarshal([]byte(receivedMsg.MessageBody), &msgBody) if err != nil { log.Logger.Error("解析messageBody发生错误" + err.Error()) return err } switch msgBody.Module { case "company": companySrv := company.SyncDataCompanyService{} err = companySrv.FromBusinessAdmin(&msgBody) if err != nil { log.Logger.Error("处理company消息失败"+err.Error(), map[string]interface{}{ "module": msgBody.Module, "action": msgBody.Action, "data": string(msgBody.Data), }) return err } case "department": departmentSrv := department.SyncDataDepartmentService{} err = departmentSrv.FromBusinessAdmin(&msgBody) if err != nil { log.Logger.Error("处理department消息失败"+err.Error(), map[string]interface{}{ "module": msgBody.Module, "action": msgBody.Action, "data": string(msgBody.Data), }) return err } case "employee": employeeSrv := user.SyncDataUserService{} err = employeeSrv.FromBusinessAdmin(&msgBody) if err != nil { log.Logger.Error("处理employee消息失败"+err.Error(), map[string]interface{}{ "module": msgBody.Module, "action": msgBody.Action, "data": string(msgBody.Data), }) return err } case "position": positionSrv := service.SyncDataPositionService{} err = positionSrv.FromBusinessAdmin(&msgBody) if err != nil { log.Logger.Error("处理position消息失败"+err.Error(), map[string]interface{}{ "module": msgBody.Module, "action": msgBody.Action, "data": string(msgBody.Data), }) return err } } err = msgRepo.SaveMessage(&receivedMsg) if err != nil { log.Logger.Error(" 保存新消息发生错误 " + err.Error()) log.Logger.Info("异常消息message_id=" + strconv.FormatInt(receivedMsg.MessageId, 10)) } _ = transactionContext.CommitTransaction() return nil }