sync_table_logic.go 5.6 KB
package table

import (
	"context"
	"encoding/json"
	"fmt"
	"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/types"
	"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/db/transaction"
	"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/domain"
	"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/gateway/bytelib"
	"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/pkg/xerr"

	"github.com/zeromicro/go-zero/core/logx"
	"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/svc"
)

type SyncTableLogic struct {
	logx.Logger
	ctx    context.Context
	svcCtx *svc.ServiceContext
	conn   transaction.Conn
}

func NewSyncTableLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SyncTableLogic {
	return &SyncTableLogic{
		Logger: logx.WithContext(ctx),
		ctx:    ctx,
		svcCtx: svcCtx,
		conn:   svcCtx.DefaultDBConn(),
	}
}

func (l *SyncTableLogic) SyncTable() (resp interface{}, err error) {
	companyIds, err := l.svcCtx.ChartRepository.FindCompanyIds(l.ctx, l.conn)
	if err != nil {
		return nil, xerr.NewErrMsg("查询公司ID失败:" + err.Error())
	}
	tables := make([]*domain.ObjectTable, 0)
	for _, companyId := range companyIds {
		accessToken, _ := types.TableAccessToken{CompanyId: companyId}.GenerateToken()
		//获取导入模块
		response, err := l.svcCtx.ByteMetadataService.ObjectTableSearch(l.ctx, bytelib.ObjectTableSearchRequest{
			Token:      accessToken,
			TableTypes: []string{bytelib.MainTable, bytelib.SubTable, bytelib.SideTable},
			Module:     bytelib.ModuleChartTemplate,
		})
		if err == nil {
			tables = append(tables, l.getTables(companyId, response)...)
		}
		//拆解模块
		response, err = l.svcCtx.ByteMetadataService.ObjectTableSearch(l.ctx, bytelib.ObjectTableSearchRequest{
			Token:           accessToken,
			TableTypes:      []string{bytelib.SchemaTable},
			Module:          bytelib.ModuleQuerySetCenter,
			ReturnGroupItem: true,
		})
		if err == nil {
			tables = append(tables, l.getTables(companyId, response)...)
		}
		//计算项 计算集
		response, err = l.svcCtx.ByteMetadataService.ObjectTableSearch(l.ctx, bytelib.ObjectTableSearchRequest{
			Token:           accessToken,
			TableTypes:      []string{bytelib.CalculateItem, bytelib.CalculateSet},
			Module:          bytelib.ModuleCalculateCenter,
			ReturnGroupItem: true,
			ExcludeTables:   []int{0},
		})
		if err == nil {
			tables = append(tables, l.getTables(companyId, response)...)
		}
	}
	//获取字段信息
	fields := l.getFields(tables)
	//保存数据
	err = transaction.UseTrans(l.ctx, l.conn.DB(), func(ctx context.Context, conn transaction.Conn) error {
		err = l.SaveTables(conn, tables)
		if err != nil {
			return err
		}
		err = l.SaveFields(conn, fields)
		if err != nil {
			return err
		}
		return nil
	}, true)
	if err != nil {
		return nil, xerr.NewErrMsg("保存表失败:" + err.Error())
	}
	//同步表数据
	for _, item := range tables {
		if item.TableId <= 0 {
			continue
		}
		used, err := l.svcCtx.ChartSettingRepository.CheckUseDataSource(l.ctx, l.conn, item.TableId)
		if err == nil && used {
			pusher := &types.SyncTableDataPusher{
				CompanyId: item.CompanyId,
				ObjectId:  int(item.TableId),
			}
			mBytes, _ := json.Marshal(pusher)
			_, _ = l.svcCtx.Redis.Lpush(l.svcCtx.Config.Name+":table_data", string(mBytes))
			fmt.Println(" ===========================> 加入数据下载队列 " + string(mBytes))
		}
	}
	fmt.Println("========>数据同步完成")
	return companyIds, err
}

func (l *SyncTableLogic) getTables(companyId int64, response bytelib.ObjectTableSearchResponse) []*domain.ObjectTable {
	tables := make([]*domain.ObjectTable, 0)
	if len(response.List) > 0 {
		for _, item := range response.List {
			tables = append(tables, &domain.ObjectTable{
				Id:            item.Id,
				TableId:       item.TableId,
				Name:          item.Name,
				TableType:     item.TableType,
				CompanyId:     companyId,
				ParentId:      item.ParentId,
				Flag:          item.Flag,
				Version:       1,
				IsLocal:       false,
				RemoteDeleted: 0,
			})
		}
	}
	return tables
}

func (l *SyncTableLogic) getFields(list []*domain.ObjectTable) []*domain.ObjectField {
	fields := make([]*domain.ObjectField, 0)
	for _, item := range list {
		accessToken, _ := types.TableAccessToken{CompanyId: item.CompanyId}.GenerateToken()
		if item.TableId > 0 {
			response, err := l.svcCtx.ByteMetadataService.TableInfo(l.ctx, &bytelib.TableInfoRequest{
				Token:   accessToken,
				TableId: item.TableId,
			})
			if err == nil {
				fields = append(fields, &domain.ObjectField{
					Id:      int64(item.TableId),
					Name:    response.Name,
					Fields:  response.Fields,
					Version: 1,
				})
			}
		}
	}
	return fields
}

// SaveTables 保存数据
func (l *SyncTableLogic) SaveTables(conn transaction.Conn, list []*domain.ObjectTable) error {
	for _, item := range list {
		objectTable, err := l.svcCtx.ObjectTableRepository.FindOne(l.ctx, conn, item.Id)
		if err == nil && objectTable.Id > 0 {
			item.IsLocal = objectTable.IsLocal
			item.Version = objectTable.Version + 1
		}
		_, err = l.svcCtx.ObjectTableRepository.Insert(l.ctx, conn, item)
		if err != nil {
			return err
		}
	}
	return nil
}

func (l *SyncTableLogic) SaveFields(conn transaction.Conn, list []*domain.ObjectField) error {
	for _, item := range list {
		objectTable, err := l.svcCtx.ObjectFieldRepository.FindOne(l.ctx, conn, item.Id)
		if err == nil && objectTable.Id > 0 {
			item.Version = objectTable.Version + 1
		}
		_, err = l.svcCtx.ObjectFieldRepository.Insert(l.ctx, conn, item)
		if err != nil {
			return err
		}
	}
	return nil
}