byte_notice_logic.go 7.5 KB
package consumer

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/pkg/errors"
	"github.com/zeromicro/go-zero/core/stores/redis"
	"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/svc"
	"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/types"
	"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/db/transaction"
	"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/domain"
	"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/gateway/bytelib"
)

// ByteNoticeLogic consumes object-change notices and mirrors the changes
// (table structure, fields and table data) into local storage.
type ByteNoticeLogic struct {
	// ctx is handed to every repository and metadata-service call made by this type.
	ctx    context.Context
	// svcCtx supplies the repositories, the metadata service client, Redis and the config.
	svcCtx *svc.ServiceContext
	// conn is the default connection; it stores the notice itself and provides the DB
	// handle on which the processing transaction is opened.
	conn   transaction.Conn
}

// NewByteNoticeLogic returns a ByteNoticeLogic bound to svcCtx. It uses a
// background context and the default database connection of svcCtx.
func NewByteNoticeLogic(svcCtx *svc.ServiceContext) *ByteNoticeLogic {
	logic := new(ByteNoticeLogic)
	logic.ctx = context.Background()
	logic.svcCtx = svcCtx
	logic.conn = svcCtx.DefaultDBConn()
	return logic
}

// Consume handles one message whose value is a JSON-encoded domain.ObjectNotice.
//
// The key and value are printed to stdout, the notice is decoded and inserted,
// and handleNotice is then run through transaction.UseTrans. Afterwards the
// notice is updated with the outcome: status Done and message "OK" on success,
// or status Error and the error text on failure.
//
// It returns the decode error, the insert error, or the processing error;
// it returns nil when processing succeeded.
func (logic *ByteNoticeLogic) Consume(key, value string) error {
	fmt.Println(key, value)

	notice := new(domain.ObjectNotice)
	if err := json.Unmarshal([]byte(value), notice); err != nil {
		return err
	}

	// Store the pushed notice before it is processed.
	if _, err := logic.svcCtx.ObjectNoticeRepository.Insert(logic.ctx, logic.conn, notice); err != nil {
		return err
	}

	// Process the notice on the connection supplied by UseTrans.
	process := func(ctx context.Context, conn transaction.Conn) error {
		return logic.handleNotice(conn, notice)
	}
	procErr := transaction.UseTrans(logic.ctx, logic.conn.DB(), process, true)

	// Record the processing result on the stored notice.
	notice.Status, notice.Message = domain.ObjectNoticeStatusDone, "OK"
	if procErr != nil {
		notice.Status, notice.Message = domain.ObjectNoticeStatusError, procErr.Error()
	}
	// NOTE(review): the result of this update is discarded on purpose in the
	// existing code, so a failed status write is never reported — confirm that
	// best-effort is acceptable here.
	_, _ = logic.svcCtx.ObjectNoticeRepository.Update(logic.ctx, logic.conn, notice)

	return procErr
}

// handleNotice applies a single notice using the connection conn.
//
// A deleted event is delegated to handleDelete and nothing else is done.
// Otherwise:
//   - if notice.StructChanged is set, the remote table list and the table info
//     for notice.TableId are fetched; the matching table and its parents are
//     saved, followed by the table's field definition;
//   - if notice.DataChanged is set, the first page of the table's data is
//     fetched, written to local storage under a Redis lock, and the table is
//     flagged as locally stored.
//
// The first error encountered is returned. This includes a failing TableInfo
// call, which previously returned nil and let the notice be reported as
// handled although nothing had been saved.
func (logic *ByteNoticeLogic) handleNotice(conn transaction.Conn, notice *domain.ObjectNotice) error {
	// Deleted events only touch the local record.
	if notice.IsDeletedEvent() {
		return logic.handleDelete(conn, notice)
	}
	// NOTE(review): the second return value of GenerateToken is discarded, as in
	// the existing code. If it is an error, a failure here leaves accessToken
	// unusable for the remote calls below — confirm and handle it.
	accessToken, _ := types.TableAccessToken{CompanyId: notice.CompanyId}.GenerateToken()

	// Structure change.
	if notice.StructChanged {
		request := bytelib.ObjectTableSearchRequest{
			Token:  accessToken,
			Module: bytelib.ModuleDigitalCenter,
		}
		// The object type decides which table types are searched.
		switch notice.ObjectType {
		case "导入模块":
			request.TableTypes = []string{bytelib.MainTable, bytelib.SubTable, bytelib.SideTable}
		case "拆解模块":
			request.TableTypes = []string{bytelib.SchemaTable}
			request.ReturnGroupItem = true
		case "计算模块":
			request.TableTypes = []string{bytelib.CalculateItem, bytelib.CalculateSet}
			request.ReturnGroupItem = true
			request.ExcludeTables = []int{0}
		}
		list, err := logic.svcCtx.ByteMetadataService.ObjectTableSearch(logic.ctx, request)
		if err != nil {
			return err
		}
		tableInfo, err := logic.svcCtx.ByteMetadataService.TableInfo(logic.ctx, &bytelib.TableInfoRequest{
			Token:   accessToken,
			TableId: notice.TableId,
		})
		if err != nil {
			// Report the failure instead of returning nil: without the table
			// info neither the tables nor the fields below can be saved.
			return err
		}

		// Collect the table named by the notice together with its parent nodes.
		// An empty search result leaves the slice empty, for which saveTables
		// does nothing.
		objectTables := make([]*domain.ObjectTable, 0)
		for _, item := range list.List {
			if item.TableId != notice.TableId {
				continue
			}
			objectTables = append(objectTables, &domain.ObjectTable{
				Id:        item.Id,
				TableId:   item.TableId,
				Name:      item.Name,
				TableType: item.TableType,
				CompanyId: notice.CompanyId,
				ParentId:  item.ParentId,
				Flag:      item.Flag,
			})
			// Parent nodes.
			objectTables = append(objectTables, logic.getParents(notice.CompanyId, item.ParentId, list.List)...)
		}
		if err = logic.saveTables(conn, objectTables); err != nil {
			return err
		}

		// Save the fields.
		_, err = logic.saveFields(conn, &domain.ObjectField{
			Id:     int64(tableInfo.TableId),
			Name:   tableInfo.Name,
			Fields: tableInfo.Fields,
		})
		if err != nil {
			return err
		}
	}

	// Data change.
	if notice.DataChanged {
		tableDataPreview, err := logic.svcCtx.ByteMetadataService.TableDataPreview(logic.ctx, &bytelib.TableDataPreviewRequest{
			Token:      accessToken,
			ObjectId:   int64(notice.TableId),
			ObjectType: bytelib.ObjectMetaTable,
			Where: &bytelib.TableQueryWhere{
				PageNumber: 1,
				PageSize:   bytelib.MaxPageSize,
			},
			UseCache:   true,
			HiddenData: false,
		})
		if err != nil {
			return err
		}

		// Take a lock so that charts reading the local data at the same time do
		// not fail.
		key := logic.svcCtx.Config.Name + ":bytelib:" + fmt.Sprintf("%v", tableDataPreview.ObjectId)
		lock := redis.NewRedisLock(logic.svcCtx.Redis, key)
		// Set the lock expiry (10*60; presumably seconds, i.e. 10 minutes — confirm
		// against go-zero's RedisLock).
		lock.SetExpire(10 * 60)
		// NOTE(review): the outcome of Acquire is only printed; the write below
		// proceeds even when the lock was not obtained or Redis failed. Decide
		// whether that case should wait, retry or fail.
		acquire, err := lock.Acquire()
		fmt.Println(acquire, err)
		defer lock.Release()

		err = logic.svcCtx.ObjectTableDataRepository.InsertWithTableData(logic.ctx, conn, bytelib.TableData(tableDataPreview))
		if err != nil {
			return err
		}
		// Flag the table as stored locally.
		err = logic.updateTableWithLocal(conn, int(tableDataPreview.ObjectId))
		if err != nil {
			return err
		}
	}
	return nil
}

// getParents collects, from list, every entry whose Id equals parentId and
// then, recursively, the entries referenced by that entry's own ParentId while
// it is greater than zero. Each returned ObjectTable is stamped with companyId.
// The result is empty (never nil) when nothing matches.
func (logic *ByteNoticeLogic) getParents(companyId int64, parentId int, list []*bytelib.Table) []*domain.ObjectTable {
	ancestors := []*domain.ObjectTable{}
	for _, candidate := range list {
		if candidate.Id != parentId {
			continue
		}
		node := &domain.ObjectTable{
			Id:        candidate.Id,
			TableId:   candidate.TableId,
			Name:      candidate.Name,
			TableType: candidate.TableType,
			CompanyId: companyId,
			ParentId:  candidate.ParentId,
			Flag:      candidate.Flag,
		}
		ancestors = append(ancestors, node)
		if candidate.ParentId <= 0 {
			// No further parent to follow from this entry.
			continue
		}
		upper := logic.getParents(companyId, candidate.ParentId, list)
		ancestors = append(ancestors, upper...)
	}
	return ancestors
}

// handleDelete marks the local table identified by notice.TableId as remotely
// deleted (RemoteDeleted = 1). If the lookup fails or yields a record without
// a positive Id, nothing is changed and nil is returned; only an error from
// the update itself is reported.
func (logic *ByteNoticeLogic) handleDelete(conn transaction.Conn, notice *domain.ObjectNotice) error {
	found, err := logic.svcCtx.ObjectTableRepository.FindOneByTableId(logic.ctx, conn, notice.TableId)
	if err != nil || found.Id <= 0 {
		// Nothing stored locally for this table.
		return nil
	}
	found.RemoteDeleted = 1
	_, err = logic.svcCtx.ObjectTableRepository.Update(logic.ctx, conn, found)
	return err
}

// saveTables stores each table in tables. A table whose Id is already present
// is updated with its Version set to the stored version plus one; any other
// table (including when the lookup fails) is inserted. The first insert or
// update error stops the loop and is returned.
func (logic *ByteNoticeLogic) saveTables(conn transaction.Conn, tables []*domain.ObjectTable) error {
	repo := logic.svcCtx.ObjectTableRepository
	for _, table := range tables {
		existing, findErr := repo.FindOne(logic.ctx, conn, table.Id)
		if findErr != nil || existing.Id <= 0 {
			// Not stored yet: insert.
			if _, err := repo.Insert(logic.ctx, conn, table); err != nil {
				return err
			}
			continue
		}
		// Already stored: update and bump the version.
		table.Id = existing.Id
		table.Version = existing.Version + 1
		if _, err := repo.Update(logic.ctx, conn, table); err != nil {
			return err
		}
	}
	return nil
}

// saveFields stores the field definition objectField. If a record with the
// same Id already exists it is updated with Version set to the stored version
// plus one; otherwise (including when the lookup fails) it is inserted. The
// repository's result and error are returned unchanged.
func (logic *ByteNoticeLogic) saveFields(conn transaction.Conn, objectField *domain.ObjectField) (*domain.ObjectField, error) {
	repo := logic.svcCtx.ObjectFieldRepository
	existing, err := repo.FindOne(logic.ctx, conn, objectField.Id)
	if err != nil || existing.Id <= 0 {
		// Not stored yet: insert.
		return repo.Insert(logic.ctx, conn, objectField)
	}
	// Already stored: update.
	objectField.Version = existing.Version + 1
	return repo.Update(logic.ctx, conn, objectField)
}

// updateTableWithLocal sets IsLocal on the table identified by tableId and
// saves it, returning the update's error. If the lookup fails or yields a
// record without a positive Id, the error "表不存在" is returned instead.
func (logic *ByteNoticeLogic) updateTableWithLocal(conn transaction.Conn, tableId int) error {
	table, err := logic.svcCtx.ObjectTableRepository.FindOneByTableId(logic.ctx, conn, tableId)
	if err != nil || table.Id <= 0 {
		return errors.New("表不存在")
	}
	table.IsLocal = true
	_, err = logic.svcCtx.ObjectTableRepository.Update(logic.ctx, conn, table)
	return err
}