// business_admin.go 3.5 KB

package handle

import (
	"encoding/json"
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/position/service"
	"strconv"

	"github.com/Shopify/sarama"
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/company"
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/department"
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/factory"
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/user"
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/domain"
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/log"
)

// SyncDataBusinessAdmin handles one Kafka message carrying a sync event.
//
// The message value is decoded into a domain.ReceivedMessage. If a message
// with the same MessageId has already been stored, the call is a no-op and
// returns nil. Otherwise the embedded MessageBody is decoded and dispatched
// by its Module field ("company", "department", "employee" or "position") to
// the matching sync service; any other module is not dispatched. Finally the
// received message is stored through the received-message repository.
//
// It returns an error when decoding fails, when the transaction cannot be
// created or started, when the duplicate lookup fails, or when the module
// service reports a failure. A failure to store the message or to commit the
// transaction is logged only and nil is returned.
func SyncDataBusinessAdmin(msgData *sarama.ConsumerMessage) error {
	var receivedMsg domain.ReceivedMessage
	if err := json.Unmarshal(msgData.Value, &receivedMsg); err != nil {
		log.Logger.Error("解析ReceivedMessage 失败" + err.Error())
		return err
	}
	log.Logger.Debug("解析message:", map[string]interface{}{"data": receivedMsg})

	// Formatted once; reused by every log line that mentions the message id.
	messageID := strconv.FormatInt(receivedMsg.MessageId, 10)

	transactionContext, err := factory.CreateTransactionContext(nil)
	if err != nil {
		return err
	}
	// Do not continue when the transaction could not be started: every
	// repository call below is bound to this transaction context.
	if err = transactionContext.StartTransaction(); err != nil {
		return err
	}
	defer func() {
		// Runs on every exit path, including after the commit below; the
		// result is deliberately ignored.
		// NOTE(review): presumably a rollback after a successful commit has
		// no effect — confirm against the transaction context implementation.
		_ = transactionContext.RollbackTransaction()
	}()

	msgRepo := factory.CreateReceivedMessageRepository(map[string]interface{}{
		"transactionContext": transactionContext,
	})

	// Skip messages that were already recorded.
	oldMsg, err := msgRepo.FindMessage(receivedMsg.MessageId)
	if err != nil {
		log.Logger.Error(" 查询旧消息发生错误  " + err.Error())
		log.Logger.Info("异常消息message_id=" + messageID)
		return err
	}
	if oldMsg.MessageId == receivedMsg.MessageId {
		log.Logger.Info("消息重复,message_id=" + messageID)
		return nil
	}

	var msgBody domain.MessageBody
	if err = json.Unmarshal([]byte(receivedMsg.MessageBody), &msgBody); err != nil {
		log.Logger.Error("解析messageBody发生错误" + err.Error())
		return err
	}

	// Captured before dispatch so the failure log names the module that
	// selected the service.
	module := msgBody.Module
	switch module {
	case "company":
		companySrv := company.SyncDataCompanyService{}
		err = companySrv.FromBusinessAdmin(&msgBody)
	case "department":
		departmentSrv := department.SyncDataDepartmentService{}
		err = departmentSrv.FromBusinessAdmin(&msgBody)
	case "employee":
		employeeSrv := user.SyncDataUserService{}
		err = employeeSrv.FromBusinessAdmin(&msgBody)
	case "position":
		positionSrv := service.SyncDataPositionService{}
		err = positionSrv.FromBusinessAdmin(&msgBody)
	}
	// No default case: a message for any other module is not dispatched but
	// is still stored below.
	if err != nil {
		log.Logger.Error("处理"+module+"消息失败"+err.Error(), map[string]interface{}{
			"module": msgBody.Module,
			"action": msgBody.Action,
			"data":   string(msgBody.Data),
		})
		return err
	}

	// Storing the message is best-effort: a failure here is logged but is
	// not reported to the caller.
	if err = msgRepo.SaveMessage(&receivedMsg); err != nil {
		log.Logger.Error(" 保存新消息发生错误  " + err.Error())
		log.Logger.Info("异常消息message_id=" + messageID)
	}
	// A failed commit is logged rather than silently dropped; like the save
	// above it is not reported to the caller.
	if err = transactionContext.CommitTransaction(); err != nil {
		log.Logger.Error(" 提交事务发生错误  " + err.Error())
		log.Logger.Info("异常消息message_id=" + messageID)
	}
	return nil
}