export_table.go 7.4 KB
package service

import (
	"fmt"
	"github.com/linmadan/egglib-go/core/application"
	"github.com/zeromicro/go-zero/core/mr"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/factory"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/table/command"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/domain"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/excel"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/pg"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/redis"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/starrocks"
	"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/log"
	"time"
)

func (tableService *TableService) ExportDataTable(ctx *domain.Context, cmd *command.TablePreviewCommand) (interface{}, error) {
	if err := cmd.ValidateCommand(); err != nil {
		return nil, application.ThrowError(application.ARG_ERROR, err.Error())
	}
	transactionContext, err := factory.CreateTransactionContext(nil)
	if err != nil {
		return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
	}
	if err := transactionContext.StartTransaction(); err != nil {
		return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
	}
	defer func() {
		transactionContext.RollbackTransaction()
	}()

	var table *domain.Table
	//var mainTable *domain.Table
	_, table, err = factory.FastPgTable(transactionContext, cmd.TableId)
	if err != nil {
		return nil, factory.FastError(err)
	}
	var options = starrocks.QueryOptions{
		TableName: table.SQLName,
		Select:    table.Fields(true),
	}
	// 待优化分批下载,压缩
	var dataTable *domain.DataTable
	dataTable, err = starrocks.Query(options, starrocks.DefaultQueryFunc)
	if err != nil {
		return nil, factory.FastError(err)
	}
	count, err := starrocks.QueryCount(options)
	if err != nil {
		return nil, factory.FastError(err)
	}
	filename := fmt.Sprintf("%v_%v.xlsx", table.Name, time.Now().Format("060102150405"))
	path := fmt.Sprintf("public/%v", filename)
	excelWriter := excel.NewXLXSWriterTo(domain.Fields(table.Fields(false)).NameArrayString(), dataTable.Data) //
	if err = excelWriter.Save(path); err != nil {
		return nil, factory.FastError(err)
	}

	if err := transactionContext.CommitTransaction(); err != nil {
		return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
	}
	return map[string]interface{}{
		"url":   domain.DownloadUrl(filename),
		"count": count,
	}, nil
}

func MakeToInterfaces(fields []*domain.Field) func([]string) []interface{} {
	return func(input []string) []interface{} {
		output := make([]interface{}, len(input))
		for i, v := range input {
			if i < len(fields) {
				convValue, err := domain.ValueToType(v, fields[i].SQLType)
				if err == nil {
					output[i] = convValue
					continue
				}
			}
			output[i] = v
		}
		return output
	}
}

func (tableService *TableService) ExportDataTableV2(ctx *domain.Context, cmd *command.TablePreviewCommand) (interface{}, error) {
	if err := cmd.ValidateCommand(); err != nil {
		return nil, application.ThrowError(application.ARG_ERROR, err.Error())
	}
	transactionContext, err := factory.CreateTransactionContext(nil)
	if err != nil {
		return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
	}
	if err := transactionContext.StartTransaction(); err != nil {
		return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
	}
	defer func() {
		transactionContext.RollbackTransaction()
	}()

	locker := redis.NewLock(redis.KeyExportTable(ctx, cmd.TableId))
	locker.SetExpire(60 * 2)
	ok, err := locker.Acquire()
	if err != nil {
		return nil, factory.FastError(err)
	}
	if !ok {
		return nil, factory.FastError(fmt.Errorf("点击过于频繁,请稍后再试"))
	}
	defer locker.Release()
	var table *domain.Table
	//var mainTable *domain.Table
	if cmd.ObjectType == domain.ObjectDBTable {
		table = domain.DBTables[cmd.TableId]
		if table == nil {
			return nil, factory.FastError(fmt.Errorf("表不存在"))
		}
	} else {
		_, table, err = factory.FastPgTable(transactionContext, cmd.TableId)
		if err != nil {
			return nil, factory.FastError(err)
		}
	}

	data, err := exportTableTo(ctx, cmd, table, 10000)
	if err != nil {
		return nil, factory.FastError(err)
	}

	if err := transactionContext.CommitTransaction(); err != nil {
		return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
	}
	return data, nil
}

func exportTableTo(ctx *domain.Context, cmd *command.TablePreviewCommand, table *domain.Table, blockSize int) (interface{}, error) {
	var options = starrocks.QueryOptions{
		TableName: table.SQLName,
		Select:    table.Fields(false),
		//Table:     table,
	}

	db := starrocks.DB
	if table.TableType == domain.ObjectDBTable {
		db = pg.GormDB
	}
	count, err := starrocks.WrapQueryCountWithDB(options, db)()
	if err != nil {
		return nil, factory.FastError(err)
	}
	generate := func(source chan<- interface{}) {
		// generator
		t := int(count)/blockSize + 1
		for i := 0; i < t; i++ {
			options := starrocks.QueryOptions{
				TableName: table.SQLName,
				Select:    table.Fields(false),
				//Where:     cmd.Where.Conditions,
				Offset: i * blockSize,
				Limit:  blockSize,
				Table:  table,
			}
			options.SetCondition(cmd.Where.Conditions).SetDefaultOrder()
			source <- Query{
				Index:   i,
				Options: options,
			}
		}
	}
	mapper := func(item interface{}, writer mr.Writer, cancel func(error)) {
		// mapper
		query := item.(Query)
		var dataTable *domain.DataTable
		dataTable, err = starrocks.Query(query.Options, starrocks.WrapQueryFuncWithDB(db))
		if err != nil {
			log.Logger.Error(err.Error(), map[string]interface{}{"mapper": query})
			return
		}
		writer.Write(QueryResult{
			Index:     query.Index,
			DataTable: dataTable,
		})
	}
	reducer := func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
		// reducer
		var data = make(map[int]QueryResult)
		for i := range pipe {
			item := i.(QueryResult)
			data[item.Index] = item
		}
		writer.Write(data)
	}
	result, err := mr.MapReduce(generate, mapper, reducer, mr.WithWorkers(5))
	if err != nil {
		return nil, err
	}
	dataTable := resultToDataTable(result)

	filename := fmt.Sprintf("%v_%v.xlsx", table.Name, time.Now().Format("060102150405"))
	path := fmt.Sprintf("public/%v", filename)
	excelWriter := excel.NewXLXSWriterTo(domain.Fields(table.Fields(false)).NameArrayString(), dataTable.Data) //
	excelWriter.ToInterfaces = MakeToInterfaces(table.Fields(false))
	if err = excelWriter.Save(path); err != nil {
		return nil, factory.FastError(err)
	}

	return map[string]interface{}{
		"url":   domain.DownloadUrl(filename),
		"count": count,
	}, err
}

func resultToDataTable(result interface{}) *domain.DataTable {
	var dataTable *domain.DataTable = &domain.DataTable{}
	queryResult := result.(map[int]QueryResult)
	for i := 0; i < len(queryResult); i++ {
		if v, ok := queryResult[i]; ok {
			if dataTable == nil {
				dataTable = v.DataTable
				continue
			}
			dataTable.Total += v.DataTable.Total
			dataTable.Data = append(dataTable.Data, v.DataTable.Data...)
		}
	}
	return dataTable
}

type Query struct {
	Index   int
	Data    struct{}
	Options starrocks.QueryOptions
}

type QueryResult struct {
	Index     int
	DataTable *domain.DataTable
}