// byte_notice_logic.go (6.5 KB)

package consumer

import (
	"context"
	"encoding/json"
	"fmt"
	"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/svc"
	"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/types"
	"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/db/transaction"
	"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/domain"
	"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/gateway/bytelib"
)

// ByteNoticeLogic consumes pushed object notices: it stores each notice,
// applies it through the repositories and services on the service context,
// and records the processing result on the stored notice.
type ByteNoticeLogic struct {
	// ctx is passed to every repository, service and Redis call made by this type.
	ctx context.Context
	// svcCtx provides the repositories, remote metadata service, Redis client and config.
	svcCtx *svc.ServiceContext
	// conn is the connection used outside the processing transaction
	// (inserting the notice and updating its status).
	conn transaction.Conn
}

// NewByteNoticeLogic builds a ByteNoticeLogic bound to svcCtx. The instance
// uses a background context and the service context's default database
// connection.
func NewByteNoticeLogic(svcCtx *svc.ServiceContext) *ByteNoticeLogic {
	logic := new(ByteNoticeLogic)
	logic.ctx = context.Background()
	logic.svcCtx = svcCtx
	logic.conn = svcCtx.DefaultDBConn()
	return logic
}

// Consume handles one pushed message. value is decoded as a JSON
// domain.ObjectNotice; key is only printed. The notice is inserted first,
// then processed by handleNotice inside a transaction, and finally updated
// with the processing status and message.
//
// It returns the decode or insert error if either fails, otherwise the error
// (possibly nil) produced while processing the notice. The result of the final
// status update is ignored.
func (logic *ByteNoticeLogic) Consume(key, value string) error {
	fmt.Println(key, value)

	var notice = new(domain.ObjectNotice)
	if decodeErr := json.Unmarshal([]byte(value), notice); decodeErr != nil {
		return decodeErr
	}

	// Store the pushed notice before processing it.
	if _, insertErr := logic.svcCtx.ObjectNoticeRepository.Insert(logic.ctx, logic.conn, notice); insertErr != nil {
		return insertErr
	}

	// Process the notice within a transaction.
	process := func(ctx context.Context, conn transaction.Conn) error {
		return logic.handleNotice(conn, notice)
	}
	handleErr := transaction.UseTrans(logic.ctx, logic.conn.DB(), process, true)

	// Record the processing result on the stored notice.
	switch {
	case handleErr != nil:
		notice.Status = domain.ObjectNoticeStatusError
		notice.Message = handleErr.Error()
	default:
		notice.Status = domain.ObjectNoticeStatusDone
		notice.Message = "OK"
	}
	_, _ = logic.svcCtx.ObjectNoticeRepository.Update(logic.ctx, logic.conn, notice)

	return handleErr
}

// handleNotice applies a single notice using conn.
//
// Deletion events are delegated to handleDelete. Otherwise:
//   - when notice.StructChanged is set, the remote table list and table info
//     are fetched, the list entries whose TableId matches the notice (plus
//     their parent entries) are saved, and the table's fields are saved;
//   - when notice.DataChanged is set, a SyncTableDataPusher message is pushed
//     onto the "<Config.Name>:table_data" Redis list.
//
// Any failure along the way is returned to the caller, including failures to
// generate the access token, to fetch the table info, or to encode the
// data-sync message.
func (logic *ByteNoticeLogic) handleNotice(conn transaction.Conn, notice *domain.ObjectNotice) error {
	// Deletion event?
	if notice.IsDeletedEvent() {
		return logic.handleDelete(conn, notice)
	}

	// Structure changed.
	if notice.StructChanged {
		// The token is only needed for the remote metadata lookups below.
		accessToken, err := types.TableAccessToken{CompanyId: notice.CompanyId}.GenerateToken()
		if err != nil {
			return fmt.Errorf("generating table access token: %w", err)
		}

		request := bytelib.ObjectTableSearchRequest{
			Token:  accessToken,
			Module: bytelib.ModuleDigitalCenter,
		}
		// The search filter depends on the module that produced the notice.
		switch notice.ObjectType {
		case "导入模块":
			request.TableTypes = []string{bytelib.MainTable, bytelib.SubTable, bytelib.SideTable}
		case "拆解模块":
			request.TableTypes = []string{bytelib.SchemaTable}
			request.ReturnGroupItem = true
		case "计算模块":
			request.TableTypes = []string{bytelib.CalculateItem, bytelib.CalculateSet}
			request.ReturnGroupItem = true
			request.ExcludeTables = []int{0}
		}

		list, err := logic.svcCtx.ByteMetadataService.ObjectTableSearch(logic.ctx, request)
		if err != nil {
			return err
		}
		tableInfo, err := logic.svcCtx.ByteMetadataService.TableInfo(logic.ctx, &bytelib.TableInfoRequest{
			Token:   accessToken,
			TableId: notice.TableId,
		})
		if err != nil {
			// Propagate the failure so the notice is not recorded as processed
			// when nothing was actually synced.
			return err
		}

		var objectTables []*domain.ObjectTable
		for _, item := range list.List {
			if item.TableId != notice.TableId {
				continue
			}
			objectTables = append(objectTables, &domain.ObjectTable{
				Id:        item.Id,
				TableId:   item.TableId,
				Name:      item.Name,
				TableType: item.TableType,
				CompanyId: notice.CompanyId,
				ParentId:  item.ParentId,
				Flag:      item.Flag,
			})
			// Parent nodes.
			objectTables = append(objectTables, logic.getParents(notice.CompanyId, item.ParentId, list.List)...)
		}
		// saveTables does nothing for an empty slice.
		err = logic.saveTables(conn, objectTables)
		if err != nil {
			return err
		}

		// Save the fields.
		_, err = logic.saveFields(conn, &domain.ObjectField{
			Id:     int64(tableInfo.TableId),
			Name:   tableInfo.Name,
			Fields: tableInfo.Fields,
		})
		if err != nil {
			return err
		}
	}

	// Data changed.
	if notice.DataChanged {
		payload, err := json.Marshal(&types.SyncTableDataPusher{
			CompanyId: notice.CompanyId,
			ObjectId:  notice.TableId,
		})
		if err != nil {
			return fmt.Errorf("encoding table data sync message: %w", err)
		}
		_, err = logic.svcCtx.Redis.LpushCtx(logic.ctx, logic.svcCtx.Config.Name+":table_data", string(payload))
		return err
	}
	return nil
}

// getParents collects the ancestors of a node from list. Starting at parentId,
// every entry whose Id equals the wanted id is converted to a
// domain.ObjectTable (tagged with companyId) and, when that entry's ParentId
// is greater than zero, its own ancestors are appended right after it.
// The returned slice is never nil.
func (logic *ByteNoticeLogic) getParents(companyId int64, parentId int, list []*bytelib.Table) []*domain.ObjectTable {
	ancestors := make([]*domain.ObjectTable, 0)

	var collect func(id int)
	collect = func(id int) {
		for _, node := range list {
			if node.Id != id {
				continue
			}
			ancestors = append(ancestors, &domain.ObjectTable{
				Id:        node.Id,
				TableId:   node.TableId,
				Name:      node.Name,
				TableType: node.TableType,
				CompanyId: companyId,
				ParentId:  node.ParentId,
				Flag:      node.Flag,
			})
			// Walk further up only when the node has a parent of its own.
			if node.ParentId > 0 {
				collect(node.ParentId)
			}
		}
	}
	collect(parentId)

	return ancestors
}

// handleDelete processes a deletion notice using conn.
//
// If a local object table is found for notice.TableId it is flagged with
// RemoteDeleted = 1 and updated; a failed lookup is skipped rather than
// treated as an error. Then, unless some chart setting still uses the table
// as a data source, the table's data table is dropped.
//
// It returns an error if the update, the usage check or the drop fails.
func (logic *ByteNoticeLogic) handleDelete(conn transaction.Conn, notice *domain.ObjectNotice) error {
	objectTable, err := logic.svcCtx.ObjectTableRepository.FindOneByTableId(logic.ctx, conn, notice.TableId)
	if err == nil && objectTable.Id > 0 {
		objectTable.RemoteDeleted = 1
		if _, err = logic.svcCtx.ObjectTableRepository.Update(logic.ctx, conn, objectTable); err != nil {
			return err
		}
	}

	// Is the table still used as a data source?
	used, err := logic.svcCtx.ChartSettingRepository.CheckUseDataSource(logic.ctx, conn, notice.TableId)
	if err != nil {
		// Report the failure instead of silently skipping the drop and
		// letting the notice be recorded as processed.
		return fmt.Errorf("checking data source usage for table %v: %w", notice.TableId, err)
	}
	if used {
		return nil
	}
	return logic.svcCtx.ObjectTableDataRepository.DropTable(logic.ctx, conn, notice.TableId)
}

// saveTables stores the given table definitions using conn. For each entry it
// looks up an existing record by Id: when one is found (no lookup error and a
// positive Id) the entry is written as an update with the stored version
// incremented by one; otherwise the entry is inserted. The first insert or
// update error stops the loop and is returned.
func (logic *ByteNoticeLogic) saveTables(conn transaction.Conn, tables []*domain.ObjectTable) error {
	for _, table := range tables {
		existing, findErr := logic.svcCtx.ObjectTableRepository.FindOne(logic.ctx, conn, table.Id)
		if findErr != nil || existing.Id <= 0 {
			// No usable existing record: insert.
			if _, insertErr := logic.svcCtx.ObjectTableRepository.Insert(logic.ctx, conn, table); insertErr != nil {
				return insertErr
			}
			continue
		}

		// Existing record: update it and bump the version.
		table.Id = existing.Id
		table.Version = existing.Version + 1
		if _, updateErr := logic.svcCtx.ObjectTableRepository.Update(logic.ctx, conn, table); updateErr != nil {
			return updateErr
		}
	}
	return nil
}

// saveFields stores a table's field definition using conn. When a record with
// the same Id is found (no lookup error and a positive Id), objectField is
// written as an update with the stored version incremented by one; otherwise
// it is inserted. The repository's result and error are returned unchanged.
func (logic *ByteNoticeLogic) saveFields(conn transaction.Conn, objectField *domain.ObjectField) (*domain.ObjectField, error) {
	stored, findErr := logic.svcCtx.ObjectFieldRepository.FindOne(logic.ctx, conn, objectField.Id)
	if findErr != nil || stored.Id <= 0 {
		// Not present yet: insert.
		return logic.svcCtx.ObjectFieldRepository.Insert(logic.ctx, conn, objectField)
	}

	// Already present: update.
	objectField.Version = stored.Version + 1
	return logic.svcCtx.ObjectFieldRepository.Update(logic.ctx, conn, objectField)
}