// digital_platform_event_subscribe.go (7.5 KB)

package service

import (
	"fmt"
	"github.com/linmadan/egglib-go/core/application"
	pgTransaction "github.com/linmadan/egglib-go/transaction/pg"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/event/command"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/factory"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/constant"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/domain"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/api/digitalLib"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/domainService"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/log"
	"math"
	"time"
)

// DigitalPlatformEventSubscribe pushes a change notification for the table
// referenced by cmd.EventTable to the digital platform via digitalLib.SyncNotice.
//
// The affected table id is read from the event payload according to the event
// type: data.Table for table events and query-set delete events, and the query
// set's bound table for query-set update/rename/status events. The call is a
// no-op returning (nil, nil) when:
//   - the payload object required by the event type is nil,
//   - no table id can be resolved (id 0 or unknown event type),
//   - a query-set update/rename event refers to a query set that is not on, or
//   - a query-set update/rename/status event refers to a query set whose type
//     is not schema table, calculate item or calculate set.
//
// The notification lists the table itself plus those entries of its dependency
// tree that pass the per-type filter in the loop below. A failed SyncNotice is
// logged and, while NotifyData.Retry allows it, scheduled on the timing wheel;
// it does not fail this call.
//
// ctx is not used by this method. The first return value is always nil. The
// error is non-nil when the transaction cannot be created, started or
// committed, when the table repository, query-set repository or dependency
// service cannot be obtained, or when listing the tables fails.
func (tableEventService *TableEventService) DigitalPlatformEventSubscribe(ctx *domain.Context, cmd *command.TableEventCommand) (interface{}, error) {
	transactionContext, err := factory.CreateTransactionContext(nil)
	if err != nil {
		return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
	}
	if err := transactionContext.StartTransaction(); err != nil {
		return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
	}
	defer func() {
		transactionContext.RollbackTransaction()
	}()

	var (
		dataChanged   = true
		structChanged = true
		tableType     string
		tableId       int
	)

	data := cmd.EventTable
	switch data.Type {
	case domain.TableDataImportEvent, domain.TableDataEditEvent, domain.TableDeleteEvent, domain.QuerySetDeleteEvent:
		// data.Table is nullable (see the fallback further down); a missing
		// payload leaves tableId at 0 and the event is ignored.
		if data.Table != nil {
			tableId = data.Table.TableId
		}
	case domain.TableApplyOnEvent:
		if data.Table != nil {
			tableId = data.Table.TableId
		}
		dataChanged = false
	case domain.QuerySetUpdateEvent, domain.QuerySetUpdateRenameEvent:
		if data.QuerySet == nil {
			return nil, nil
		}
		if data.QuerySet.Status != domain.StatusOn {
			return nil, nil
		}
		if !domain.AssertTableType(data.QuerySet.Type, domain.SchemaTable, domain.CalculateItem, domain.CalculateSet) {
			return nil, nil
		}
		tableId = data.QuerySet.QuerySetInfo.BindTableId
	case domain.QuerySetUpdateStatusEvent:
		if data.QuerySet == nil {
			return nil, nil
		}
		if !domain.AssertTableType(data.QuerySet.Type, domain.SchemaTable, domain.CalculateItem, domain.CalculateSet) {
			return nil, nil
		}
		tableId = data.QuerySet.QuerySetInfo.BindTableId
	}
	if tableId == 0 {
		return nil, nil
	}

	notifyData := NotifyData{
		DataChanged:   dataChanged,
		StructChanged: structChanged,
		TableId:       tableId,
		Event:         data.Type.ToString(),
		Metadata:      cmd.EventTable.Metadata,
	}

	// Resources associated with tableId. A failed lookup is not fatal by
	// itself: a nil table is handled below by falling back to the event
	// payload. A missing repository, however, cannot be worked around.
	tableRepository, table, tableErr := factory.FastPgTable(transactionContext, tableId)
	if tableRepository == nil {
		return nil, fmt.Errorf("table repository unavailable for table %d: %v", tableId, tableErr)
	}
	if table == nil && data.Table != nil {
		table = data.Table
		notifyData.CompanyId = table.Context.CompanyId
	}
	if table != nil {
		tableType = table.TableType
	}
	if tableType == "" && data.QuerySet != nil {
		tableType = data.QuerySet.Type
	}
	if table != nil {
		notifyData.TableType = domain.EnumsDescription(domain.ObjectTypeMap, tableType)
		switch domain.TableType(tableType) {
		case domain.MainTable, domain.SubTable, domain.SideTable:
			notifyData.ObjectType = "导入模块" // import module
		case domain.SchemaTable, domain.SubProcessTable, domain.CalculateTable:
			notifyData.ObjectType = "拆解模块" // decomposition module
		case domain.CalculateItem, domain.CalculateSet:
			notifyData.ObjectType = "计算模块" // calculation module
		}
	}

	_, tables, err := tableRepository.Find(map[string]interface{}{"context": data.Context, "tableTypesNotIn": []string{domain.TemporaryTable.ToString(), domain.ExcelTable.ToString()}})
	if err != nil {
		return nil, err
	}

	tableDependencyService, depErr := domainService.NewTableDependencyService(transactionContext.(*pgTransaction.TransactionContext))
	if depErr != nil {
		return nil, depErr
	}
	tableDependTree := tableDependencyService.TableDependTree(tables, tableId)
	tree := tableDependTree.Tree

	// Index the switched-on query sets bound to tables of the dependency tree.
	// NOTE(review): the index is only built when the event carries a query set,
	// so for pure table events every query-set-backed table in the tree is
	// skipped by the loop below — confirm this is intended.
	querySetRepository, _, querySetErr := factory.FastPgQuerySet(transactionContext, 0)
	mapTableQuerySet := make(map[int]*domain.QuerySet)
	if len(tree) > 0 && cmd.EventTable.QuerySet != nil {
		if querySetRepository == nil {
			return nil, fmt.Errorf("query set repository unavailable: %v", querySetErr)
		}
		_, querySets, findErr := querySetRepository.Find(map[string]interface{}{
			"types":        []string{domain.SchemaTable.ToString(), domain.CalculateItem.ToString(), domain.CalculateSet.ToString()},
			"bindTableIds": tree,
			"status":       domain.StatusOn,
		})
		if findErr != nil {
			// Best effort: the notification is still sent, only without the
			// query-set-backed tables of the tree.
			log.Logger.Error(fmt.Sprintf("query sets lookup failed for table %d: %v", tableId, findErr))
		}
		for _, q := range querySets {
			mapTableQuerySet[q.QuerySetInfo.BindTableId] = q
		}
	}

	for i := range tree {
		affectedId := tree[i]
		depTable, ok := tableDependencyService.TableMap[affectedId]
		if !ok {
			continue
		}
		if notifyData.CompanyId == 0 {
			notifyData.CompanyId = depTable.Context.CompanyId
		}
		switch depTable.TableType {
		case domain.MainTable.ToString(), domain.SubTable.ToString(), domain.SideTable.ToString():
			// Imported tables are reported unless their TableInfo is present
			// and lacks the digital-center module flag.
			if depTable.TableInfo != nil && depTable.TableInfo.ApplyOnModule&domain.ModuleDigitalCenter == 0 {
				continue
			}
		case domain.SubProcessTable.ToString(), domain.CalculateTable.ToString():
			continue
		case domain.SchemaTable.ToString(), domain.CalculateSet.ToString(), domain.CalculateItem.ToString():
			querySet, ok := mapTableQuerySet[affectedId]
			if !ok {
				continue
			}
			// Skip query sets that are switched off and bound to a table
			// other than the one this event is about.
			if querySet.Status != domain.StatusOn && querySet.QuerySetInfo.BindTableId != 0 && querySet.QuerySetInfo.BindTableId != tableId {
				continue
			}
		}
		notifyData.TableAffectedList = append(notifyData.TableAffectedList, affectedId)
	}

	// The table that triggered the event is always part of the affected list.
	selfListed := false
	for _, id := range notifyData.TableAffectedList {
		if id == tableId {
			selfListed = true
			break
		}
	}
	if !selfListed {
		notifyData.TableAffectedList = append(notifyData.TableAffectedList, tableId)
	}

	lib := digitalLib.NewDigitalLib(constant.DIGITAL_SERVER_HOST)
	if _, err = lib.SyncNotice(digitalLib.RequestSyncNotice{Body: notifyData}); err != nil {
		log.Logger.Error(fmt.Sprintf("通知数控失败:%s", err.Error()))
		if t, ok := notifyData.Retry(); ok {
			tableEventService.TimingWheel.SetTimer(notifyData.Key(), &notifyData, t)
			log.Logger.Debug(fmt.Sprintf("通知数控重试 key:%s wait:%vs", notifyData.Key(), t.Seconds()))
		}
	}
	if err := transactionContext.CommitTransaction(); err != nil {
		return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
	}
	return nil, nil
}

// NotifyData is the payload sent to the digital platform when a table changes.
// It also carries the number of retry attempts made so far, which is not
// serialized.
type NotifyData struct {
	DataChanged       bool                   `json:"dataChanged"`       // the data has changed
	StructChanged     bool                   `json:"structChanged"`     // the structure has changed
	TableId           int                    `json:"tableId"`           // table ID
	TableType         string                 `json:"tableType"`         // table type: import module (main table, side table, sub table), decomposition module (schema, sub-process, calculate table), calculation module (calculate item, calculate set)
	ObjectType        string                 `json:"objectType"`        // import module, decomposition module or calculation module
	CompanyId         int                    `json:"companyId"`         // company
	Event             string                 `json:"event"`             // event name
	TableAffectedList []int                  `json:"tableAffectedList"` // tables affected in cascade
	Metadata          map[string]interface{} `json:"metadata"`          // metadata
	sendRetry         int                    // number of times Retry has been called
}

// Key returns the delayed-notification key for this payload; it depends only
// on TableId.
func (n *NotifyData) Key() string {
	const keyFormat = "delay:notify:table:%d"
	return fmt.Sprintf(keyFormat, n.TableId)
}

// Retry records one more attempt and reports how long to wait before it and
// whether it is still allowed.
//
// The first attempt waits Delay(); the second and third wait Delay() times
// 2^attempt. From the fourth call on it returns (Delay(), false).
func (n *NotifyData) Retry() (time.Duration, bool) {
	n.sendRetry++
	base := n.Delay()
	switch {
	case n.sendRetry > 3:
		return base, false
	case n.sendRetry == 1:
		return base, true
	default:
		factor := int(math.Pow(float64(2), float64(n.sendRetry)))
		return base * time.Duration(factor), true
	}
}

// Delay returns the base wait between attempts (ten seconds).
func (n *NotifyData) Delay() time.Duration {
	return 10 * time.Second
}

// RetryTime returns how many times Retry has been called on this payload.
func (n *NotifyData) RetryTime() int {
	attempts := n.sendRetry
	return attempts
}