作者 tangxvhui

更新

@@ -14,17 +14,17 @@ import ( @@ -14,17 +14,17 @@ import (
14 type SyncDataCompanyService struct { 14 type SyncDataCompanyService struct {
15 } 15 }
16 16
17 -type BusinessAdminCommand struct {  
18 - //company:公司  
19 - Module string `json:"module"`  
20 - // add:添加,edit:编辑,setCompanyCharge:更改公司主管,changeAdmin换管理员  
21 - Action string `json:"action"`  
22 - // 具体的对象JSON数据  
23 - Datas json.RawMessage `json:"data"`  
24 -} 17 +// type BusinessAdminCommand struct {
  18 +// //company:公司
  19 +// Module string `json:"module"`
  20 +// // add:添加,edit:编辑,setCompanyCharge:更改公司主管,changeAdmin换管理员
  21 +// Action string `json:"action"`
  22 +// // 具体的对象JSON数据
  23 +// Datas json.RawMessage `json:"data"`
  24 +// }
25 25
26 //从BusinessAdmins 接收消息,变更公司数据 26 //从BusinessAdmins 接收消息,变更公司数据
27 -func (c SyncDataCompanyService) FromBusinessAdminCompany(param *BusinessAdminCommand) error { 27 +func (c SyncDataCompanyService) FromBusinessAdmin(param *domain.MessageBody) error {
28 action := param.Module + "/" + param.Action 28 action := param.Module + "/" + param.Action
29 var err error 29 var err error
30 switch action { 30 switch action {
@@ -12,16 +12,16 @@ import ( @@ -12,16 +12,16 @@ import (
12 12
13 type SyncDataDepartmentService struct{} 13 type SyncDataDepartmentService struct{}
14 14
15 -type BusinessAdminCommand struct {  
16 - // department:部门  
17 - Module string `json:"module"`  
18 - // add:添加,edit:编辑,batchDelete:批量删除,import:导入部门  
19 - Action string `json:"action"`  
20 - // 具体的对象JSON数据  
21 - Datas json.RawMessage `json:"data"`  
22 -} 15 +// type BusinessAdminCommand struct {
  16 +// // department:部门
  17 +// Module string `json:"module"`
  18 +// // add:添加,edit:编辑,batchDelete:批量删除,import:导入部门
  19 +// Action string `json:"action"`
  20 +// // 具体的对象JSON数据
  21 +// Datas json.RawMessage `json:"data"`
  22 +// }
23 23
24 -func (srv SyncDataDepartmentService) FromBusinessAdminCompany(param BusinessAdminCommand) error { 24 +func (srv SyncDataDepartmentService) FromBusinessAdmin(param *domain.MessageBody) error {
25 action := param.Module + "/" + param.Action 25 action := param.Module + "/" + param.Action
26 var err error 26 var err error
27 switch action { 27 switch action {
@@ -35,3 +35,11 @@ func CreateDepartmentRepository(options map[string]interface{}) domain.Departmen @@ -35,3 +35,11 @@ func CreateDepartmentRepository(options map[string]interface{}) domain.Departmen
35 } 35 }
36 return repository.NewDepartmentRepository(transactionContext) 36 return repository.NewDepartmentRepository(transactionContext)
37 } 37 }
  38 +
  39 +func CreateReceivedMessageRepository(options map[string]interface{}) domain.ReceivedMessageRepository {
  40 + var transactionContext *pg.TransactionContext
  41 + if value, ok := options["transactionContext"]; ok {
  42 + transactionContext = value.(*pg.TransactionContext)
  43 + }
  44 + return repository.NewReceivedMessageRepository(transactionContext)
  45 +}
@@ -14,16 +14,16 @@ import ( @@ -14,16 +14,16 @@ import (
14 14
15 type SyncDataUserService struct{} 15 type SyncDataUserService struct{}
16 16
17 -type BusinessAdminCommand struct {  
18 - // employee:员工  
19 - Module string `json:"module"`  
20 - // add:添加,edit:编辑,batchDelete:批量删除,batchForbid:批量禁用用户,batchRemove:批量更改用户部门,import:导入用户  
21 - Action string `json:"action"`  
22 - // 具体的对象JSON数据  
23 - Datas json.RawMessage `json:"data"`  
24 -} 17 +// type BusinessAdminCommand struct {
  18 +// // employee:员工
  19 +// Module string `json:"module"`
  20 +// // add:添加,edit:编辑,batchDelete:批量删除,batchForbid:批量禁用用户,batchRemove:批量更改用户部门,import:导入用户
  21 +// Action string `json:"action"`
  22 +// // 具体的对象JSON数据
  23 +// Datas json.RawMessage `json:"data"`
  24 +// }
25 25
26 -func (srv SyncDataUserService) FromBusinessAdminCompany(param *BusinessAdminCommand) error { 26 +func (srv SyncDataUserService) FromBusinessAdmin(param *domain.MessageBody) error {
27 action := param.Module + "/" + param.Action 27 action := param.Module + "/" + param.Action
28 var err error 28 var err error
29 switch action { 29 switch action {
1 package domain 1 package domain
2 2
3 -import "time" 3 +import (
  4 + "encoding/json"
  5 + "time"
  6 +)
4 7
5 type ReceivedMessage struct { 8 type ReceivedMessage struct {
6 MessageId int64 9 MessageId int64
@@ -10,6 +13,12 @@ type ReceivedMessage struct { @@ -10,6 +13,12 @@ type ReceivedMessage struct {
10 CreateAt time.Time 13 CreateAt time.Time
11 } 14 }
12 15
  16 +type MessageBody struct {
  17 + Module string `json:"module"`
  18 + Action string `json:"action"`
  19 + Datas json.RawMessage `json:"data"` // 具体的对象JSON数据
  20 +}
  21 +
13 type ReceivedMessageRepository interface { 22 type ReceivedMessageRepository interface {
14 SaveMessage(param *ReceivedMessage) error 23 SaveMessage(param *ReceivedMessage) error
15 FindMessage(id int64) (*ReceivedMessage, error) 24 FindMessage(id int64) (*ReceivedMessage, error)
@@ -30,6 +30,7 @@ func init() { @@ -30,6 +30,7 @@ func init() {
30 &models.Company{}, 30 &models.Company{},
31 &models.Department{}, 31 &models.Department{},
32 &models.User{}, 32 &models.User{},
  33 + &models.ReceivedMessage{},
33 } 34 }
34 for _, model := range tables { 35 for _, model := range tables {
35 err := DB.Model(model).CreateTable(&orm.CreateTableOptions{ 36 err := DB.Model(model).CreateTable(&orm.CreateTableOptions{
@@ -3,6 +3,7 @@ package repository @@ -3,6 +3,7 @@ package repository
3 import ( 3 import (
4 "time" 4 "time"
5 5
  6 + "github.com/go-pg/pg/v10"
6 pgTransaction "github.com/linmadan/egglib-go/transaction/pg" 7 pgTransaction "github.com/linmadan/egglib-go/transaction/pg"
7 "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/domain" 8 "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/domain"
8 "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/infrastructure/pg/models" 9 "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/infrastructure/pg/models"
@@ -14,6 +15,12 @@ type ReceivedMessageRepository struct { @@ -14,6 +15,12 @@ type ReceivedMessageRepository struct {
14 15
15 var _ domain.ReceivedMessageRepository = (*ReceivedMessageRepository)(nil) 16 var _ domain.ReceivedMessageRepository = (*ReceivedMessageRepository)(nil)
16 17
  18 +func NewReceivedMessageRepository(tx *pgTransaction.TransactionContext) *ReceivedMessageRepository {
  19 + return &ReceivedMessageRepository{
  20 + transactionContext: tx,
  21 + }
  22 +}
  23 +
17 func (repo *ReceivedMessageRepository) SaveMessage(param *domain.ReceivedMessage) error { 24 func (repo *ReceivedMessageRepository) SaveMessage(param *domain.ReceivedMessage) error {
18 message := &models.ReceivedMessage{ 25 message := &models.ReceivedMessage{
19 MessageId: param.MessageId, 26 MessageId: param.MessageId,
@@ -31,9 +38,10 @@ func (repo *ReceivedMessageRepository) SaveMessage(param *domain.ReceivedMessage @@ -31,9 +38,10 @@ func (repo *ReceivedMessageRepository) SaveMessage(param *domain.ReceivedMessage
31 func (repo *ReceivedMessageRepository) FindMessage(messageId int64) (*domain.ReceivedMessage, error) { 38 func (repo *ReceivedMessageRepository) FindMessage(messageId int64) (*domain.ReceivedMessage, error) {
32 tx := repo.transactionContext.PgTx 39 tx := repo.transactionContext.PgTx
33 receivedMessageModel := new(models.ReceivedMessage) 40 receivedMessageModel := new(models.ReceivedMessage)
34 - query := tx.Model(receivedMessageModel).Where("message.id = ?", messageId) 41 + query := tx.Model(receivedMessageModel).
  42 + Where("message_id = ?", messageId)
35 if err := query.First(); err != nil { 43 if err := query.First(); err != nil {
36 - if err.Error() != "pg: no rows in result set" { 44 + if err != pg.ErrNoRows {
37 return nil, err 45 return nil, err
38 } 46 }
39 } 47 }
  1 +package handle
  2 +
  3 +import (
  4 + "encoding/json"
  5 + "strconv"
  6 +
  7 + "github.com/Shopify/sarama"
  8 + "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/company"
  9 + "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/department"
  10 + "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/factory"
  11 + "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/user"
  12 + "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/domain"
  13 + "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/log"
  14 +)
  15 +
  16 +func SyncDataBusinessAdmin(msgData *sarama.ConsumerMessage) error {
  17 + var receivedMsg domain.ReceivedMessage
  18 + err := json.Unmarshal(msgData.Value, &receivedMsg)
  19 + if err != nil {
  20 + return err
  21 + }
  22 + transactionContext, err := factory.CreateTransactionContext(nil)
  23 + if err != nil {
  24 + return err
  25 + }
  26 + transactionContext.StartTransaction()
  27 + var transactionIsSucceed bool
  28 + defer func() {
  29 + if transactionIsSucceed {
  30 + transactionContext.CommitTransaction()
  31 + } else {
  32 + transactionContext.RollbackTransaction()
  33 + }
  34 + }()
  35 +
  36 + msgRepo := factory.CreateReceivedMessageRepository(map[string]interface{}{
  37 + "transactionContext": transactionContext,
  38 + })
  39 + oldMsg, err := msgRepo.FindMessage(receivedMsg.MessageId)
  40 + if err != nil {
  41 + log.Logger.Error(" 查询旧消息发生错误 " + err.Error())
  42 + log.Logger.Info("异常消息message_id=" + strconv.FormatInt(receivedMsg.MessageId, 10))
  43 + return err
  44 + }
  45 + if oldMsg.MessageId == receivedMsg.MessageId {
  46 + log.Logger.Info("消息重复,message_id=" + strconv.FormatInt(receivedMsg.MessageId, 10))
  47 + return nil
  48 + }
  49 + var msgBody domain.MessageBody
  50 + err = json.Unmarshal([]byte(receivedMsg.MessageBody), &msgBody)
  51 + if err != nil {
  52 + log.Logger.Error("解析messageBody发生错误" + err.Error())
  53 + return err
  54 + }
  55 +
  56 + switch msgBody.Module {
  57 + case "company":
  58 + companySrv := company.SyncDataCompanyService{}
  59 + err = companySrv.FromBusinessAdmin(&msgBody)
  60 + if err != nil {
  61 + log.Logger.Error("处理company消息失败"+err.Error(), map[string]interface{}{
  62 + "data": msgBody,
  63 + })
  64 + return err
  65 + }
  66 + case "department":
  67 + departmentSrv := department.SyncDataDepartmentService{}
  68 + err = departmentSrv.FromBusinessAdmin(&msgBody)
  69 + if err != nil {
  70 + log.Logger.Error("处理department消息失败"+err.Error(), map[string]interface{}{
  71 + "data": msgBody,
  72 + })
  73 + return err
  74 + }
  75 + case "employee":
  76 + employeeSrv := user.SyncDataUserService{}
  77 + err = employeeSrv.FromBusinessAdmin(&msgBody)
  78 + if err != nil {
  79 + log.Logger.Error("处理employee消息失败"+err.Error(), map[string]interface{}{
  80 + "data": msgBody,
  81 + })
  82 + return err
  83 + }
  84 + }
  85 + err = msgRepo.SaveMessage(&receivedMsg)
  86 + if err != nil {
  87 + log.Logger.Error(" 保存新消息发生错误 " + err.Error())
  88 + log.Logger.Info("异常消息message_id=" + strconv.FormatInt(receivedMsg.MessageId, 10))
  89 + }
  90 + return nil
  91 +}
@@ -5,12 +5,14 @@ import ( @@ -5,12 +5,14 @@ import (
5 saramaConsumer "github.com/linmadan/egglib-go/mom/kafka/sarama" 5 saramaConsumer "github.com/linmadan/egglib-go/mom/kafka/sarama"
6 "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/constant" 6 "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/constant"
7 "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/log" 7 "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/log"
  8 + "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/port/consumer/handle"
8 ) 9 )
9 10
10 func Run() { 11 func Run() {
11 messageHandlerMap := make(map[string]func(message *sarama.ConsumerMessage) error) 12 messageHandlerMap := make(map[string]func(message *sarama.ConsumerMessage) error)
12 messageHandlerMap["demo-v1"] = Demo 13 messageHandlerMap["demo-v1"] = Demo
13 - 14 + //"指定topic" => 对应的处理方法
  15 + messageHandlerMap["mmm-business-admin-dev"] = handle.SyncDataBusinessAdmin
14 err := saramaConsumer.StartConsume(constant.KAFKA_HOSTS, constant.SERVICE_NAME, messageHandlerMap, log.Logger) 16 err := saramaConsumer.StartConsume(constant.KAFKA_HOSTS, constant.SERVICE_NAME, messageHandlerMap, log.Logger)
15 log.Logger.Error(err.Error()) 17 log.Logger.Error(err.Error())
16 } 18 }