// digital_platform_event_subscribe.go 8.8 KB

package service

import (
	"fmt"
	"github.com/linmadan/egglib-go/core/application"
	pgTransaction "github.com/linmadan/egglib-go/transaction/pg"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/event/command"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/factory"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/constant"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/domain"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/api/digitalLib"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/broker"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/domainService"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/log"
	"math"
	"time"
)

// DigitalPlatformEventSubscribe reacts to a table / query-set event by working
// out which tables are affected and publishing a NotifyData message for them.
//
// Steps, in order:
//  1. Open a transaction. The deferred rollback runs on every exit path; the
//     commit happens only after the notification was sent successfully.
//  2. Resolve the table targeted by the event. An id of 0 means there is
//     nothing to notify and the call returns (nil, nil).
//  3. Load the tables of the event's context (temporary and Excel tables
//     excluded) and build the dependency tree rooted at the target table.
//  4. Filter the tree by table type and collect the ids that must be notified;
//     the target table itself is always included.
//  5. Send the notification through the broker.
//
// ctx is not read by this method. The first return value is always nil; the
// error is non-nil when the transaction, a repository lookup or the send fails.
func (tableEventService *TableEventService) DigitalPlatformEventSubscribe(ctx *domain.Context, cmd *command.TableEventCommand) (interface{}, error) {
	transactionContext, err := factory.CreateTransactionContext(nil)
	if err != nil {
		return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
	}
	if err := transactionContext.StartTransaction(); err != nil {
		return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
	}
	defer func() {
		transactionContext.RollbackTransaction()
	}()

	var (
		dataChanged   = true
		structChanged = true
		ok            bool
		tableId       int
	)
	event := cmd.EventTable
	if tableId = resolveTableId(event); tableId == 0 {
		return nil, nil
	}
	// An "apply on" event only changes where the table is used, not its data.
	if event.Type == domain.TableApplyOnEvent {
		dataChanged = false
	}
	var notifyData = &NotifyData{
		DataChanged:   dataChanged,
		StructChanged: structChanged,
		TableId:       tableId,
		Event:         event.Type.ToString(),
		Metadata:      event.Metadata,
	}

	// Table type. A lookup error is tolerated as long as a repository came
	// back: the table carried by the event is used as a fallback below.
	tableRepository, table, err := factory.FastPgTable(transactionContext, tableId)
	if tableRepository == nil {
		// Without a repository the dependency query below would dereference nil.
		if err == nil {
			err = fmt.Errorf("table repository unavailable for table %d", tableId)
		}
		return nil, err
	}
	if table == nil && event.Table != nil {
		table = event.Table
		notifyData.CompanyId = table.Context.CompanyId
	}
	notifyData.SetType(event, table)

	// Dependent tables / dependent query sets.
	_, tables, err := tableRepository.Find(map[string]interface{}{"context": event.Context, "tableTypesNotIn": []string{domain.TemporaryTable.ToString(), domain.ExcelTable.ToString()}})
	if err != nil {
		return nil, err
	}
	tableDependencyService, err := domainService.NewTableDependencyService(transactionContext.(*pgTransaction.TransactionContext))
	if err != nil {
		return nil, err
	}
	tableDependTree := tableDependencyService.TableDependTree(tables, tableId)
	tree := tableDependTree.Tree
	querySetRepository, _, querySetErr := factory.FastPgQuerySet(transactionContext, 0)
	var mapTableQuerySet = make(map[int]*domain.QuerySet)
	if len(tree) > 0 {
		if querySetRepository == nil {
			if querySetErr == nil {
				querySetErr = fmt.Errorf("query set repository unavailable")
			}
			return nil, querySetErr
		}
		// A failed lookup must not be swallowed: with an empty map every
		// schema / calculation table in the tree would be silently skipped.
		_, querySets, err := querySetRepository.Find(map[string]interface{}{
			"types":        []string{domain.SchemaTable.ToString(), domain.CalculateItem.ToString(), domain.CalculateSet.ToString()},
			"bindTableIds": tree,
			"status":       domain.StatusOn,
		})
		if err != nil {
			return nil, err
		}
		for _, q := range querySets {
			mapTableQuerySet[q.QuerySetInfo.BindTableId] = q
		}
	}

	// Keep only the tables that need to be pushed.
	for i := range tree {
		table, ok = tableDependencyService.TableMap[tree[i]]
		if !ok {
			continue
		}
		// Fall back to the first known table's company when none is set yet.
		if notifyData.CompanyId == 0 {
			notifyData.CompanyId = table.Context.CompanyId
		}
		switch table.TableType {
		case domain.MainTable.ToString(), domain.SubTable.ToString(), domain.SideTable.ToString():
			// Imported tables are pushed only when applied to the digital
			// center or chart template module (or when no TableInfo is set).
			if table.TableInfo != nil {
				applyOn := domain.ModuleDigitalCenter | domain.ModuleChartTemplate
				if table.TableInfo.ApplyOnModule&applyOn == 0 {
					continue
				}
			}
		case domain.SubProcessTable.ToString(), domain.CalculateTable.ToString():
			continue
		case domain.SchemaTable.ToString(), domain.CalculateSet.ToString(), domain.CalculateItem.ToString():
			var querySet *domain.QuerySet
			if querySet, ok = mapTableQuerySet[tree[i]]; !ok {
				continue
			}
			// Query sets other than the one that triggered the event are not
			// pushed while their status is off.
			if querySet.Status != domain.StatusOn && querySet.QuerySetInfo.BindTableId != 0 && querySet.QuerySetInfo.BindTableId != tableId {
				continue
			}
		}
		notifyData.TableAffectedList = append(notifyData.TableAffectedList, tree[i])
	}
	// Always include the triggering table itself.
	if !exist(notifyData.TableAffectedList, tableId) {
		notifyData.TableAffectedList = append(notifyData.TableAffectedList, tableId)
	}
	// Send through the message queue.
	if err = tableEventService.send(notifyData, tableEventService.sendBroker); err != nil {
		return nil, err
	}
	if err = transactionContext.CommitTransaction(); err != nil {
		return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
	}
	return nil, nil
}

// send delivers notifyData through sendFunc. When delivery fails the error is
// logged and, as long as notifyData.Retry still allows another attempt, a
// delayed retry is registered via TimingWheel.SetTimer under notifyData.Key().
// The error from sendFunc is returned unchanged (nil on success), whether or
// not a retry was scheduled.
func (tableEventService *TableEventService) send(notifyData *NotifyData, sendFunc func(*NotifyData) error) error {
	sendErr := sendFunc(notifyData)
	if sendErr == nil {
		return nil
	}
	log.Logger.Error(fmt.Sprintf("通知数控失败:%s", sendErr.Error()))

	wait, retryable := notifyData.Retry()
	if !retryable {
		return sendErr
	}
	key := notifyData.Key()
	// NOTE(review): the timer payload is the address of the local pointer
	// variable (a **NotifyData) — confirm the timer handler expects that.
	tableEventService.TimingWheel.SetTimer(key, &notifyData, wait)
	log.Logger.Debug(fmt.Sprintf("通知数控重试 key:%s wait:%vs", key, wait.Seconds()))
	return sendErr
}

// sendHttp posts notifyData to the digital platform by calling SyncNotice on a
// client built for constant.DIGITAL_SERVER_HOST. The response is discarded;
// only the error is returned.
func (tableEventService *TableEventService) sendHttp(notifyData *NotifyData) error {
	client := digitalLib.NewDigitalLib(constant.DIGITAL_SERVER_HOST)
	request := digitalLib.RequestSyncNotice{Body: notifyData}
	_, err := client.SyncNotice(request)
	return err
}

// sendBroker publishes notifyData to the constant.TOPIC_TABLE_DATA_SYNC topic
// on constant.KAFKA_HOST via broker.Push and returns whatever error Push reports.
func (tableEventService *TableEventService) sendBroker(notifyData *NotifyData) error {
	return broker.Push(constant.KAFKA_HOST, constant.TOPIC_TABLE_DATA_SYNC, notifyData)
}

// resolveTableId returns the id of the table an event refers to, or 0 when the
// event should not produce a notification.
//
//   - Table events (data import/edit, delete, struct edit, apply-on) and the
//     query-set delete event use event.Table.TableId.
//   - Query-set update / rename events use the bound table id, but only for a
//     query set whose status is on and whose type is schema, calculate item or
//     calculate set.
//   - The query-set status event uses the bound table id for those same types,
//     regardless of the current status.
//   - Any other event type resolves to 0.
//
// A nil event, or an event that lacks the Table / QuerySet its type requires,
// resolves to 0 instead of panicking.
func resolveTableId(event *domain.EventTable) (tableId int) {
	if event == nil {
		return 0
	}
	switch event.Type {
	case domain.TableDataImportEvent, domain.TableDataEditEvent, domain.TableDeleteEvent, domain.TableStructEditEvent,
		domain.TableApplyOnEvent, domain.QuerySetDeleteEvent:
		if event.Table == nil {
			return 0
		}
		return event.Table.TableId
	case domain.QuerySetUpdateEvent, domain.QuerySetUpdateRenameEvent:
		if event.QuerySet == nil || event.QuerySet.Status != domain.StatusOn {
			return 0
		}
		if !domain.AssertTableType(event.QuerySet.Type, domain.SchemaTable, domain.CalculateItem, domain.CalculateSet) {
			return 0
		}
		return event.QuerySet.QuerySetInfo.BindTableId
	case domain.QuerySetUpdateStatusEvent:
		if event.QuerySet == nil {
			return 0
		}
		if !domain.AssertTableType(event.QuerySet.Type, domain.SchemaTable, domain.CalculateItem, domain.CalculateSet) {
			return 0
		}
		return event.QuerySet.QuerySetInfo.BindTableId
	}
	return 0
}

// exist reports whether target occurs in idList. A nil or empty list yields false.
func exist(idList []int, target int) bool {
	for i := 0; i < len(idList); i++ {
		if idList[i] == target {
			return true
		}
	}
	return false
}

// NotifyData is the notification payload describing a table change and the
// tables it cascades to. Exported fields carry JSON tags; sendRetry is
// internal bookkeeping for the retry logic in Retry.
type NotifyData struct {
	DataChanged       bool                   `json:"dataChanged"`       // data has changed
	StructChanged     bool                   `json:"structChanged"`     // structure has changed
	TableId           int                    `json:"tableId"`           // table ID
	TableType         string                 `json:"tableType"`         // table type: import module (main table, side table, sub table), decomposition module (schema, sub-process, calculation table), calculation module (calculation item, calculation set)
	ObjectType        string                 `json:"objectType"`        // import module, decomposition module or calculation module
	CompanyId         int                    `json:"companyId"`         // company
	Event             string                 `json:"event"`             // event name
	TableAffectedList []int                  `json:"tableAffectedList"` // tables affected through cascading
	Metadata          map[string]interface{} `json:"metadata"`          // metadata
	// sendRetry counts how many times Retry has been called; it is unexported,
	// so encoding/json does not serialize it.
	sendRetry         int
}

// SetType fills in n.TableType and n.ObjectType.
//
// The raw type is taken from table.TableType when table is non-nil; if that is
// empty (or table is nil) and the event carries a query set, the query set's
// type is used instead. TableType is set to the description of that raw type
// looked up in domain.ObjectTypeMap. ObjectType is assigned only when table is
// non-nil and the raw type is one of the known table types; otherwise it is
// left untouched.
func (n *NotifyData) SetType(event *domain.EventTable, table *domain.Table) {
	rawType := ""
	if table != nil {
		rawType = table.TableType
	}
	if rawType == "" && event.QuerySet != nil {
		rawType = event.QuerySet.Type
	}
	n.TableType = domain.EnumsDescription(domain.ObjectTypeMap, rawType)

	if table == nil {
		return
	}
	switch domain.TableType(rawType) {
	case domain.MainTable, domain.SubTable, domain.SideTable:
		n.ObjectType = "导入模块"
	case domain.SchemaTable, domain.SubProcessTable, domain.CalculateTable:
		n.ObjectType = "拆解模块"
	case domain.CalculateItem, domain.CalculateSet:
		n.ObjectType = "计算模块"
	}
}

// Key returns the identifier under which a delayed retry for this
// notification is registered; it is derived from TableId only.
func (n *NotifyData) Key() string {
	const keyFormat = "delay:notify:table:%d"
	return fmt.Sprintf(keyFormat, n.TableId)
}

// Retry records one more send attempt and reports how long to wait before the
// next one and whether another attempt is allowed.
//
// The first call waits Delay(); later calls wait Delay() * 2^attempt (so 4x on
// the second call and 8x on the third). From the fourth call on the boolean is
// false and the returned duration is just Delay().
func (n *NotifyData) Retry() (time.Duration, bool) {
	n.sendRetry++
	switch {
	case n.sendRetry > 3:
		return n.Delay(), false
	case n.sendRetry == 1:
		return n.Delay(), true
	default:
		factor := int(math.Pow(float64(2), float64(n.sendRetry)))
		return n.Delay() * time.Duration(factor), true
	}
}

// Delay returns the base wait between send attempts: a fixed ten seconds.
func (n *NotifyData) Delay() time.Duration {
	const baseDelay = 10 * time.Second
	return baseDelay
}

// RetryTime returns the number of attempts recorded so far, i.e. how many
// times Retry has been called on this notification.
func (n *NotifyData) RetryTime() int {
	attempts := n.sendRetry
	return attempts
}