作者 yangfu

refactor: optimize some core operate

... ... @@ -155,14 +155,14 @@
"params": ["产品名2"]
}
```
- [ ] 数据预览 1
- [x] 数据预览 1
- [ ] 表格编辑 1
- [ ] 保存校验文件 (文件地址) 1
- [ ] 生成主表 1
- [ ] 表复制 (副表)1
- [ ] 追加数据 (主表、副表)
- [x] 生成主表 1
- [x] 表复制 (副表)1
- [x] 追加数据 (主表、副表)
- [ ] 表删除 (主表、副表)~~、分表~~
- [ ] 表拆分 1
- [x] 表拆分 1
- [ ] 更新表结构(分表)1
- [ ] 编辑、添加、删除表数据(副表) 1
- [ ] 取消校验
... ... @@ -171,3 +171,13 @@
- [x] 隔天清理校验中的文件
- [x] 隔天清理public临时文件
## 表数据导出
- [ ] 加锁,只允许当前用户同时只能发起一次导出命令 ,3min过期
- [ ] 单次拉取数量 MR
- [ ] 100W ..
- [ ] 50W 120s 读取数据库:30s 保存文件:10s 下载:30M/500K=60S;RAR压缩 24M/500k=50S
- [ ] 20W ..
- [ ] 10W ..
- [ ] 保存单个文件、压缩 | 保存多个文件、压缩
\ No newline at end of file
... ...
... ... @@ -3,7 +3,6 @@ module gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion
go 1.16
require (
github.com/Shopify/sarama v1.30.0 // indirect
github.com/ajg/form v1.5.1 // indirect
github.com/beego/beego/v2 v2.0.1
github.com/bwmarrin/snowflake v0.3.0
... ... @@ -13,18 +12,13 @@ require (
github.com/gavv/httpexpect v2.0.0+incompatible
github.com/go-pg/pg/v10 v10.10.6
github.com/go-redis/redis v6.15.9+incompatible
github.com/google/go-cmp v0.5.7 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/uuid v1.3.0
github.com/imkira/go-interpol v1.1.0 // indirect
github.com/linmadan/egglib-go v0.0.0-20210313060205-8b5e456b11f7
github.com/mattn/go-colorable v0.1.9 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/moul/http2curl v1.0.0 // indirect
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.18.1
github.com/prometheus/client_golang v1.12.2 // indirect
github.com/sergi/go-diff v1.2.0 // indirect
github.com/shopspring/decimal v1.3.1
github.com/smartystreets/goconvey v1.7.2 // indirect
... ... @@ -36,12 +30,8 @@ require (
github.com/yudai/gojsondiff v1.0.0 // indirect
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect
github.com/yudai/pp v2.0.1+incompatible // indirect
go.uber.org/automaxprocs v1.5.1 // indirect
golang.org/x/net v0.0.0-20220421235706-1d1ef9303861 // indirect
golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32 // indirect
github.com/zeromicro/go-zero v1.3.4
golang.org/x/text v0.3.7
golang.org/x/tools v0.1.5 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gorm.io/driver/mysql v1.3.6
gorm.io/driver/postgres v1.3.9
gorm.io/gorm v1.23.8
... ...
... ... @@ -28,6 +28,7 @@ func main() {
log.InitLogHook(constant.ENABLE_KAFKA_LOG, true)
redis.InitRedis()
redis.InitZeroCoreRedis()
pg.Init()
if err := starrocks.Init(); err != nil {
log.Logger.Error(err.Error())
... ...
package command
import (
"fmt"
"reflect"
"strings"
"github.com/beego/beego/v2/core/validation"
)
type PrepareTemporaryFileCommand struct {
// 文件ID
FileId int `cname:"文件ID" json:"fileId" valid:"Required"`
}
func (cmd *PrepareTemporaryFileCommand) Valid(validation *validation.Validation) {
}
func (cmd *PrepareTemporaryFileCommand) ValidateCommand() error {
valid := validation.Validation{}
b, err := valid.Valid(cmd)
if err != nil {
return err
}
if !b {
elem := reflect.TypeOf(cmd).Elem()
for _, validErr := range valid.Errors {
field, isExist := elem.FieldByName(validErr.Field)
if isExist {
return fmt.Errorf(strings.Replace(validErr.Message, validErr.Field, field.Tag.Get("cname"), -1))
} else {
return fmt.Errorf(validErr.Message)
}
}
}
return nil
}
... ...
... ... @@ -4,6 +4,7 @@ import (
"github.com/linmadan/egglib-go/core/application"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/factory"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/file/command"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/file/dto"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/domain"
)
... ... @@ -37,6 +38,37 @@ func (fileService *FileService) FilePreview(ctx *domain.Context, loadDataTableCo
return data, nil
}
// PrepareTemporaryFile 准备临时文件
func (fileService *FileService) PrepareTemporaryFile(ctx *domain.Context, cmd *command.PrepareTemporaryFileCommand) (interface{}, error) {
if err := cmd.ValidateCommand(); err != nil {
return nil, application.ThrowError(application.ARG_ERROR, err.Error())
}
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
if err := transactionContext.StartTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
defer func() {
transactionContext.RollbackTransaction()
}()
loadDataTableService, _ := factory.CreateLoadDataTableService(transactionContext)
data, err := loadDataTableService.CreateTemporaryFile(ctx, cmd.FileId)
if err != nil {
return nil, application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
if err := transactionContext.CommitTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
fileDto := &dto.FileDto{}
fileDto.Load(data)
return fileDto, nil
}
// EditDataTable 编辑表格数据
func (fileService *FileService) EditDataTable(ctx *domain.Context, editDataTableCommand *command.EditDataTableCommand) (interface{}, error) {
if err := editDataTableCommand.ValidateCommand(); err != nil {
... ...
... ... @@ -12,12 +12,16 @@ import (
type TablePreviewCommand struct {
// 表Id
TableId int `cname:"表Id" json:"objectId" valid:"Required"`
PageNumber int `json:"pageNumber"`
PageSize int `json:"pageSize"`
Where domain.Where `json:"where"`
}
func (cmd *TablePreviewCommand) Valid(validation *validation.Validation) {
if cmd.PageSize > 0 {
cmd.Where.PageNumber = cmd.PageNumber
cmd.Where.PageSize = cmd.PageSize
}
}
func (cmd *TablePreviewCommand) ValidateCommand() error {
... ...
... ... @@ -13,11 +13,17 @@ type DBTablePreviewCommand struct {
// 表Id
ObjectId int `cname:"表Id" json:"objectId" valid:"Required"`
//ObjectType string `json:"objectType"`
// 适配外层
PageNumber int `json:"pageNumber"`
PageSize int `json:"pageSize"`
Where domain.Where `json:"where"`
}
func (cmd *DBTablePreviewCommand) Valid(validation *validation.Validation) {
if cmd.PageSize > 0 {
cmd.Where.PageNumber = cmd.PageNumber
cmd.Where.PageSize = cmd.PageSize
}
}
func (cmd *DBTablePreviewCommand) ValidateCommand() error {
... ...
... ... @@ -3,11 +3,14 @@ package service
import (
"fmt"
"github.com/linmadan/egglib-go/core/application"
"github.com/zeromicro/go-zero/core/mr"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/factory"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/table/command"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/excel"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/redis"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/starrocks"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/log"
"time"
)
... ... @@ -29,21 +32,21 @@ func (tableService *TableService) ExportDataTable(ctx *domain.Context, cmd *comm
// TODO:加锁 同一个用户同一个时间点只允许一次下载
var table *domain.Table
var mainTable *domain.Table
//var mainTable *domain.Table
_, table, err = factory.FastPgTable(transactionContext, cmd.TableId)
if err != nil {
return nil, factory.FastError(err)
}
if table.TableType == domain.SubTable.ToString() {
_, mainTable, err = factory.FastPgTable(transactionContext, cmd.TableId)
if err != nil {
return nil, factory.FastError(err)
}
} else {
mainTable = table
}
//if table.TableType == domain.SubTable.ToString() {
// _, mainTable, err = factory.FastPgTable(transactionContext, cmd.TableId)
// if err != nil {
// return nil, factory.FastError(err)
// }
//} else {
// mainTable = table
//}
var options = starrocks.QueryOptions{
TableName: mainTable.SQLName,
TableName: table.SQLName,
Select: table.Fields(true),
}
// 待优化分批下载,压缩
... ... @@ -71,3 +74,143 @@ func (tableService *TableService) ExportDataTable(ctx *domain.Context, cmd *comm
"count": count,
}, nil
}
func (tableService *TableService) ExportDataTableV2(ctx *domain.Context, cmd *command.TablePreviewCommand) (interface{}, error) {
if err := cmd.ValidateCommand(); err != nil {
return nil, application.ThrowError(application.ARG_ERROR, err.Error())
}
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
if err := transactionContext.StartTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
defer func() {
transactionContext.RollbackTransaction()
}()
// TODO:加锁 同一个用户同一个时间点只允许一次下载
locker := redis.NewLock(redis.KeyExportTable(ctx, cmd.TableId))
locker.SetExpire(60 * 2)
ok, err := locker.Acquire()
if err != nil {
return nil, factory.FastError(err)
}
if !ok {
return nil, factory.FastError(fmt.Errorf("点击过于频繁,请稍后再试"))
}
defer locker.Release()
var table *domain.Table
//var mainTable *domain.Table
_, table, err = factory.FastPgTable(transactionContext, cmd.TableId)
if err != nil {
return nil, factory.FastError(err)
}
data, err := exportTableTo(ctx, cmd, table, 10000)
if err != nil {
return nil, factory.FastError(err)
}
if err := transactionContext.CommitTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
return data, nil
}
func exportTableTo(ctx *domain.Context, cmd *command.TablePreviewCommand, table *domain.Table, blockSize int) (interface{}, error) {
var options = starrocks.QueryOptions{
TableName: table.SQLName,
Select: table.Fields(false),
}
count, err := starrocks.QueryCount(options)
if err != nil {
return nil, factory.FastError(err)
}
generate := func(source chan<- interface{}) {
// generator
t := int(count)/blockSize + 1
for i := 0; i < t; i++ {
options := starrocks.QueryOptions{
TableName: table.SQLName,
Select: table.Fields(false),
//Where: cmd.Where.Conditions,
Offset: i * blockSize,
Limit: blockSize,
}
options.SetCondition(cmd.Where.Conditions).SetDefaultOrder()
source <- Query{
Index: i,
Options: options,
}
}
}
mapper := func(item interface{}, writer mr.Writer, cancel func(error)) {
// mapper
query := item.(Query)
var dataTable *domain.DataTable
dataTable, err = starrocks.Query(query.Options, starrocks.WrapQueryFuncWithDB(starrocks.DB))
if err != nil {
log.Logger.Error(err.Error(), map[string]interface{}{"mapper": query})
return
}
writer.Write(QueryResult{
Index: query.Index,
DataTable: dataTable,
})
}
reducer := func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
// reducer
var data = make(map[int]QueryResult)
for i := range pipe {
item := i.(QueryResult)
data[item.Index] = item
}
writer.Write(data)
}
result, err := mr.MapReduce(generate, mapper, reducer, mr.WithWorkers(5))
if err != nil {
return nil, err
}
dataTable := resultToDataTable(result)
filename := fmt.Sprintf("%v_%v.xlsx", table.Name, time.Now().Format("060102150405"))
path := fmt.Sprintf("public/%v", filename)
excelWriter := excel.NewXLXSWriterTo(domain.Fields(table.Fields(false)).NameArrayString(), dataTable.Data) //
if err = excelWriter.Save(path); err != nil {
return nil, factory.FastError(err)
}
return map[string]interface{}{
"url": domain.DownloadUrl(filename),
"count": count,
}, err
}
func resultToDataTable(result interface{}) *domain.DataTable {
var dataTable *domain.DataTable = &domain.DataTable{}
queryResult := result.(map[int]QueryResult)
for i := 0; i < len(queryResult); i++ {
if v, ok := queryResult[i]; ok {
if dataTable == nil {
dataTable = v.DataTable
continue
}
dataTable.Total += v.DataTable.Total
dataTable.Data = append(dataTable.Data, v.DataTable.Data...)
}
}
return dataTable
}
type Query struct {
Index int
Data struct{}
Options starrocks.QueryOptions
}
type QueryResult struct {
Index int
DataTable *domain.DataTable
}
... ...
... ... @@ -34,7 +34,7 @@ func (tableService *TableService) TablePreview(ctx *domain.Context, cmd *command
TableName: table.SQLName,
Select: table.Fields(true),
}
options.SetCondition(cmd.Where.Conditions)
options.SetCondition(cmd.Where.Conditions).SetDefaultOrder()
options.SetOffsetLimit(cmd.Where.PageNumber, cmd.Where.PageSize)
var dataTable *domain.DataTable
dataTable, err = factory.FastDataTable(options)
... ...
... ... @@ -19,6 +19,7 @@ type TableService interface {
type PreviewDataTableService interface {
Preview(ctx *Context, fileId int, fields []*Field, where Where) (interface{}, error)
CreateTemporaryFile(ctx *Context, fileId int) (*File, error)
GetFileId() int
}
... ...
... ... @@ -202,3 +202,14 @@ func GripData(data []map[string]string, total int64) map[string]interface{} {
"total": total,
}
}
func PK() *Field {
return &Field{
Index: 0,
Name: "序号",
SQLName: "id",
SQLType: String.ToString(),
Description: "主键",
Flag: PKField,
}
}
... ...
... ... @@ -18,21 +18,24 @@ func (ptr *PreviewDataTableService) Preview(ctx *domain.Context, fileId int, fie
fileRepository, _ := repository.NewFileRepository(ptr.transactionContext)
file, err := fileRepository.FindOne(map[string]interface{}{"fileId": fileId})
if err != nil {
return nil, fmt.Errorf("文件不存在")
return nil, fmt.Errorf("校验文件不存在")
}
isSourceFile := false
fileUrl := ""
fileUrl := file.FileInfo.Url
// Copy to TemporaryFile
if file.FileType != domain.TemporaryFile.ToString() {
file = file.CopyTo(domain.TemporaryFile, ctx)
if file, err = fileRepository.Save(file); err != nil {
return nil, err
//file = file.CopyTo(domain.TemporaryFile, ctx)
//if file, err = fileRepository.Save(file); err != nil {
// return nil, err
//}
//isSourceFile = true
//fileUrl = file.FileInfo.Url
return nil, fmt.Errorf("校验文件不存在")
}
ptr.FileId = file.FileId
if len(fields) == 0 {
isSourceFile = true
fileUrl = file.FileInfo.Url
}
//TEST
ptr.FileId = file.FileId
// Load Data From Excel(python api)
byteCore, _ := CreateByteCoreService()
... ... @@ -58,6 +61,22 @@ func (ptr *PreviewDataTableService) Preview(ctx *domain.Context, fileId int, fie
return responseDto, nil
}
func (ptr *PreviewDataTableService) CreateTemporaryFile(ctx *domain.Context, fileId int) (*domain.File, error) {
fileRepository, _ := repository.NewFileRepository(ptr.transactionContext)
file, err := fileRepository.FindOne(map[string]interface{}{"fileId": fileId})
if err != nil {
return nil, fmt.Errorf("文件不存在")
}
if !(file.FileType == domain.SourceFile.ToString() || file.FileType == domain.VerifiedFile.ToString()) {
return nil, fmt.Errorf("源文件/校验文件才可以创建临时文件")
}
file = file.CopyTo(domain.TemporaryFile, ctx)
if file, err = fileRepository.Save(file); err != nil {
return nil, err
}
return file, nil
}
type FilePreviewDto struct {
ObjectId int `json:"objectId"`
ObjectType string `json:"objectType"`
... ...
... ... @@ -8,11 +8,11 @@ import (
"github.com/go-pg/pg/v10/orm"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/constant"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/pg/models"
liblog "gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/log"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/logger"
"log"
"os"
"time"
)
... ... @@ -50,7 +50,7 @@ func Init() {
var err error
newLogger := logger.New(
log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer
liblog.GormLogWriter{Module: "【StarRocks】"}, // io writer
logger.Config{
SlowThreshold: time.Second, // Slow SQL threshold
LogLevel: logger.Info, // Log level
... ...
package redis
import (
"fmt"
"github.com/zeromicro/go-zero/core/stores/redis"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/constant"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/domain"
)
func KeyExportTable(ctx *domain.Context, tableId int) string {
return fmt.Sprintf("%v.lock-table.%v,%v", constant.SERVICE_ENV, tableId, ctx.OperatorId)
}
var ZeroCoreRedis *redis.Redis
func InitZeroCoreRedis() {
ZeroCoreRedis = redis.New(constant.REDIS_HOST+":"+constant.REDIS_PORT, redis.WithPass(constant.REDIS_AUTH))
}
func NewLock(key string) *redis.RedisLock {
return redis.NewRedisLock(ZeroCoreRedis, key)
}
... ...
... ... @@ -42,12 +42,32 @@ func (o *QueryOptions) SetOffsetLimit(pageNumber, pageSize int) {
o.Limit = pageSize
}
func (o *QueryOptions) SetCondition(conditions []domain.Condition) {
func (o *QueryOptions) SetCondition(conditions []domain.Condition) *QueryOptions {
for _, c := range conditions {
o.Where = append(o.Where, Condition{
Condition: c,
})
}
return o
}
func (o *QueryOptions) SetDefaultOrder() *QueryOptions {
hasOrder := false
for _, c := range o.Where {
if len(c.Order) > 0 {
hasOrder = true
}
}
// 没有排序的加一个排序,才能分页
if !hasOrder {
o.Where = append(o.Where, Condition{
Condition: domain.Condition{
Field: domain.PK(),
Order: "ASC",
},
})
}
return o
}
type Condition struct {
... ... @@ -179,6 +199,10 @@ func WrapQueryCountWithDB(params QueryOptions, db *gorm.DB) func() (int64, error
var total int64
query := db.Table(params.TableName)
queryWithoutLimitOffset(query, params)
if params.Context != nil {
query.Where(fmt.Sprintf("context->>'companyId'='%v'", params.Context.CompanyId))
//query.Where("context->>'companyId'='?'", params.Context.CompanyId)
}
query.Count(&total)
return total, query.Error
}
... ...
... ... @@ -98,6 +98,14 @@ func (controller *FileController) FilePreview() {
controller.Response(data, err)
}
func (controller *FileController) PrepareTemporaryFile() {
fileService := service.NewFileService(nil)
loadDataTableCommand := &command.PrepareTemporaryFileCommand{}
controller.Unmarshal(loadDataTableCommand)
data, err := fileService.PrepareTemporaryFile(ParseContext(controller.BaseController), loadDataTableCommand)
controller.Response(data, err)
}
func (controller *FileController) EditDataTable() {
fileService := service.NewFileService(nil)
editDataTableCommand := &command.EditDataTableCommand{}
... ...
... ... @@ -157,7 +157,7 @@ func (controller *TableController) ExportDataTable() {
tableService := service.NewTableService(nil)
cmd := &command.TablePreviewCommand{}
controller.Unmarshal(cmd)
data, err := tableService.ExportDataTable(ParseContext(controller.BaseController), cmd)
data, err := tableService.ExportDataTableV2(ParseContext(controller.BaseController), cmd)
controller.Response(data, err)
}
... ...
... ... @@ -15,6 +15,7 @@ func init() {
web.Router("/data/files/search-source-file", &controllers.FileController{}, "Post:SearchSourceFile")
web.Router("/data/files/search-verified-file", &controllers.FileController{}, "Post:SearchVerifiedFile")
web.Router("/data/files/cancel-verifying-file", &controllers.FileController{}, "Post:CancelVerifyingFile")
web.Router("/data/files/prepare-temporary-file", &controllers.FileController{}, "Post:PrepareTemporaryFile")
web.Router("/data/file-preview", &controllers.FileController{}, "Post:FilePreview")
web.Router("/data/edit-data-table", &controllers.FileController{}, "Post:EditDataTable")
... ...