作者 yangfu

feat: file header row

package service
import (
"github.com/linmadan/egglib-go/core/application"
pgTransaction "github.com/linmadan/egglib-go/transaction/pg"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/event/command"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/factory"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/cache"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/domainService"
)
func (tableEventService *TableEventService) DigitalPlatformEventSubscribe(ctx *domain.Context, cmd *command.TableEventCommand) (interface{}, error) {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
//if err := transactionContext.StartTransaction(); err != nil {
// return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
//}
//defer func() {
// transactionContext.RollbackTransaction()
//}()
var (
dataChanged = true
structChanged = true
)
data := cmd.EventTable
tableId := 0
switch data.Type {
case domain.TableDataImportEvent, domain.TableDataEditEvent, domain.TableDeleteEvent:
// dataChanged = true
tableId = data.Table.TableId
case domain.QuerySetUpdateEvent:
tableId = data.QuerySet.QuerySetInfo.BindTableId
// structChanged = true
}
if tableId == 0 {
return nil, nil
}
var notifyData = struct {
DataChanged bool `json:"dataChanged"`
StructChanged bool `json:"structChanged"`
TableId int `json:"tableId"`
Event string `json:"event"`
TableAffectedList []int `json:"tableAffectedList"`
}{
DataChanged: dataChanged,
StructChanged: structChanged,
TableId: tableId,
Event: data.Type.ToString(),
}
// tableId 相关联的
tableRepository, _, _ := factory.FastPgTable(transactionContext, 0)
_, tables, err := tableRepository.Find(map[string]interface{}{"context": data.Context, "tableTypesNotIn": []string{domain.TemporaryTable.ToString(), domain.ExcelTable.ToString()}})
if err != nil {
return nil, err
}
tableDependencyService, _ := domainService.NewTableDependencyService(transactionContext.(*pgTransaction.TransactionContext))
tableDependTree := tableDependencyService.TableDependTree(tables, tableId)
tree := tableDependTree.Tree
//tableService := tableservice.NewTableService(nil)
for i := range tree {
cache.DefaultDataTableCacheService.DeleteDataTable(tree[i])
// fresh cache
//tableService.TablePreview(data.Context, &tablecommand.TablePreviewCommand{
// TableId: tree[i],
// ObjectType: domain.ObjectMetaTable,
// PageSize: 10000,
// PageNumber: 0,
// UseCache: true,
//})
notifyData.TableAffectedList = append(notifyData.TableAffectedList, tree[i])
}
//if err := transactionContext.CommitTransaction(); err != nil {
// return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
//}
return nil, nil
}
... ...
... ... @@ -15,6 +15,7 @@ type EditDataTableCommand struct {
//
//Fields []*domain.Field
domain.EditTableRequest
HeaderRow int `json:"headerRow"` // 行号 默认:0
}
func (editDataTableCommand *EditDataTableCommand) Valid(validation *validation.Validation) {
... ... @@ -30,6 +31,7 @@ func (editDataTableCommand *EditDataTableCommand) Valid(validation *validation.V
validation.Error("文件ID不能为空")
return
}
editDataTableCommand.Where.HeaderRow = domain.SetHeaderRow(editDataTableCommand.HeaderRow)
}
func (editDataTableCommand *EditDataTableCommand) ValidateCommand() error {
... ...
... ... @@ -23,6 +23,7 @@ func (loadDataTableCommand *LoadDataTableCommand) Valid(validation *validation.V
if loadDataTableCommand.PageSize == 0 {
loadDataTableCommand.PageSize = 20
}
loadDataTableCommand.HeaderRow = domain.SetHeaderRow(loadDataTableCommand.HeaderRow)
}
func (loadDataTableCommand *LoadDataTableCommand) ValidateCommand() error {
... ...
package command
import (
"fmt"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/domain"
"reflect"
"strings"
"github.com/beego/beego/v2/core/validation"
)
type ResetTableHeaderCommand struct {
// 文件ID
FileId int `cname:"文件ID" json:"objectId" valid:"Required"`
domain.Where
}
func (loadDataTableCommand *ResetTableHeaderCommand) Valid(validation *validation.Validation) {
loadDataTableCommand.HeaderRow = domain.SetHeaderRow(loadDataTableCommand.HeaderRow)
}
func (loadDataTableCommand *ResetTableHeaderCommand) ValidateCommand() error {
valid := validation.Validation{}
b, err := valid.Valid(loadDataTableCommand)
if err != nil {
return err
}
if !b {
elem := reflect.TypeOf(loadDataTableCommand).Elem()
for _, validErr := range valid.Errors {
field, isExist := elem.FieldByName(validErr.Field)
if isExist {
return fmt.Errorf(strings.Replace(validErr.Message, validErr.Field, field.Tag.Get("cname"), -1))
} else {
return fmt.Errorf(validErr.Message)
}
}
}
return nil
}
... ...
... ... @@ -18,6 +18,8 @@ type FileDto struct {
Ext string `json:"ext"`
// 创建时间
Time string `json:"time"`
// 行号
HeaderRow int `json:"headerRow"`
}
func (d *FileDto) Load(f *domain.File) *FileDto {
... ... @@ -30,5 +32,6 @@ func (d *FileDto) Load(f *domain.File) *FileDto {
d.FileType = f.FileType
d.Ext = f.FileInfo.Ext
d.Time = xtime.New(f.UpdatedAt).Local().Format("2006-01-02 15:04:05")
d.HeaderRow = domain.GetHeaderRow(f.FileInfo.HeaderRow)
return d
}
... ...
... ... @@ -44,6 +44,38 @@ func (fileService *FileService) FilePreview(ctx *domain.Context, loadDataTableCo
return data, nil
}
func (fileService *FileService) ResetHeaderRow(ctx *domain.Context, loadDataTableCommand *command.ResetTableHeaderCommand) (interface{}, error) {
if err := loadDataTableCommand.ValidateCommand(); err != nil {
return nil, application.ThrowError(application.ARG_ERROR, err.Error())
}
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
if err := transactionContext.StartTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
defer func() {
transactionContext.RollbackTransaction()
}()
cache := redis.NewFileCacheService()
temporaryFile, err := cache.Get(redis.KeyTemporaryFileInfo(loadDataTableCommand.FileId))
if err != nil {
return nil, factory.FastError(err)
}
loadDataTableService, _ := factory.CreateLoadDataTableService(transactionContext)
data, err := loadDataTableService.RePreview(ctx, loadDataTableCommand.FileId, temporaryFile.Fields, loadDataTableCommand.Where)
if err != nil {
return nil, application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
if err := transactionContext.CommitTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
return data, nil
}
// PrepareTemporaryFile 准备临时文件
func (fileService *FileService) PrepareTemporaryFile(ctx *domain.Context, cmd *command.PrepareTemporaryFileCommand) (interface{}, error) {
if err := cmd.ValidateCommand(); err != nil {
... ... @@ -152,6 +184,7 @@ func (fileService *FileService) FlushDataTable(ctx *domain.Context, flushDataTab
if _, err := flushDataTableService.Flush(ctx, flushDataTableCommand.ObjectId, &domain.Table{
DataFields: temporaryFile.Fields,
RowCount: temporaryFile.Total,
HeaderRow: temporaryFile.HeaderRow,
}); err != nil {
return nil, application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
... ...
... ... @@ -117,7 +117,9 @@ func (tableService *TableService) RowsDelete(ctx *domain.Context, cmd *command.R
if err != nil {
return nil, factory.FastError(err)
}
defer func() {
domainService.AsyncEvent(domain.NewEventTable(ctx, domain.TableDataEditEvent).WithTable(table))
}()
var options = starrocks.QueryOptions{
TableName: table.SQLName,
Select: []*domain.Field{domain.PK()}, //table.Fields(true),
... ...
... ... @@ -111,6 +111,7 @@ type (
ProcessFields []*Field `json:"processFields"`
Action string `json:"action"`
Params map[string]interface{} `json:"params"`
HeaderRow int `json:"headerRow"`
}
DataEditDataTable struct {
... ...
... ... @@ -11,6 +11,7 @@ type DataTable struct {
type Where struct {
PageNumber int `json:"pageNumber"`
PageSize int `json:"pageSize"`
HeaderRow int `json:"headerRow"` // 行号 默认:0
Conditions []Condition `json:"conditions"`
}
... ...
... ... @@ -19,6 +19,7 @@ type TableService interface {
type PreviewDataTableService interface {
Preview(ctx *Context, fileId int, fields []*Field, where Where) (interface{}, error)
RePreview(ctx *Context, fileId int, fields []*Field, where Where) (interface{}, error)
CreateTemporaryFile(ctx *Context, fileId int) (*File, error)
GetFileId() int
}
... ...
... ... @@ -85,3 +85,25 @@ func (file *File) CopyTo(fileType FileType, ctx *Context) *File {
}
return t
}
func (file *File) SetHeaderRow(headerRow int) {
//file.FileInfo.HeaderRow = headerRow
}
func (file *File) GetHeaderRow() int {
return file.FileInfo.HeaderRow
}
func SetHeaderRow(headerRow int) int {
if headerRow-1 < 0 {
return 0
}
return headerRow - 1
}
func GetHeaderRow(headerRow int) int {
if headerRow == 0 {
return 1
}
return headerRow + 1
}
... ...
... ... @@ -14,4 +14,6 @@ type FileInfo struct {
RowCount int `json:"rowCount"`
// 表结构ID
TableId int `json:"tableId"`
// 行号
HeaderRow int `json:"headerRow"`
}
... ...
... ... @@ -45,6 +45,9 @@ type Table struct {
Context *Context `json:"context"`
// 表信息
TableInfo *TableInfo `json:"tableInfo"`
// 表头行号 从0开始
HeaderRow int `json:"-"`
}
type TableRepository interface {
... ...
... ... @@ -16,6 +16,7 @@ type RequestCheckoutTablesQuery struct {
QueryParameters []domain.QueryParameter `json:"queryParameters"`
//QueryParameters map[string]interface{} `json:"queryParameters"`
SortParameters map[string]interface{} `json:"sortParameters"`
HeaderRow int `json:"headerRow"`
}
type DataCheckoutTables struct {
... ... @@ -37,6 +38,9 @@ func NewRequestCheckoutTablesQuery(param domain.ReqLoadDataTable) RequestCheckou
if param.IsFromOriginalTable {
tableFileUrl = param.TableFileUrl
}
if param.HeaderRow > 0 {
isSourceFile = true
}
return RequestCheckoutTablesQuery{
OriginalTableId: param.OriginalTableId,
IsFromOriginalTable: isSourceFile,
... ... @@ -48,6 +52,7 @@ func NewRequestCheckoutTablesQuery(param domain.ReqLoadDataTable) RequestCheckou
QueryParameters: make([]domain.QueryParameter, 0),
//QueryParameters: make(map[string]interface{}),
SortParameters: param.SortParameters,
HeaderRow: param.HeaderRow,
}
}
... ... @@ -61,6 +66,7 @@ type RequestCheckoutTablesPreProccess struct {
PageSize int `json:"pageSize"`
QueryParameters interface{} `json:"queryParameters"`
SortParameters map[string]interface{} `json:"sortParameters"`
HeaderRow int `json:"headerRow"`
}
func NewRequestCheckoutTablesPreProccess(param domain.ReqEditDataTable) RequestCheckoutTablesPreProccess {
... ... @@ -73,6 +79,7 @@ func NewRequestCheckoutTablesPreProccess(param domain.ReqEditDataTable) RequestC
PageSize: 20, //param.PageSize,
QueryParameters: []interface{}{},
SortParameters: make(map[string]interface{}),
HeaderRow: param.HeaderRow,
}
if len(param.Params) == 0 {
... ...
... ... @@ -31,6 +31,7 @@ func (ptr *EditDataTableService) Edit(ctx *domain.Context, req domain.EditTableR
ProcessFields: domain.ToFields(req.ProcessFields),
Action: req.Action,
Params: req.Params,
HeaderRow: req.Where.HeaderRow,
})
if err != nil {
return nil, err
... ...
... ... @@ -29,6 +29,7 @@ func (ptr *FlushDataTableService) Flush(ctx *domain.Context, fileId int, table *
if err != nil {
return nil, fmt.Errorf("源文件不存在")
}
sourceFile.SetHeaderRow(table.HeaderRow)
// New Table
table = NewTable(domain.ExcelTable, file.FileInfo.Name, table.DataFields, table.RowCount).WithContext(ctx)
// 通知底层保存、进行回调
... ... @@ -72,9 +73,13 @@ func (ptr *FlushDataTableService) flushSourceFile(ctx *domain.Context, table *do
if err != nil {
return err
}
if _, err = fileRepository.Save(sourceFile); err != nil {
return err
}
file.FileInfo.TableId = table.TableId
file.FileType = domain.VerifiedFile.ToString()
file.UpdateFileUrl(url)
file.SetHeaderRow(sourceFile.FileInfo.HeaderRow)
if file, err = fileRepository.Save(file); err != nil {
return err
}
... ...
... ... @@ -14,6 +14,50 @@ type PreviewDataTableService struct {
transactionContext *pgTransaction.TransactionContext
}
// RePreview 重新预览
func (ptr *PreviewDataTableService) RePreview(ctx *domain.Context, fileId int, fields []*domain.Field, where domain.Where) (interface{}, error) {
fileRepository, _ := repository.NewFileRepository(ptr.transactionContext)
file, err := fileRepository.FindOne(map[string]interface{}{"fileId": fileId})
if err != nil {
return nil, fmt.Errorf("校验文件不存在")
}
isSourceFile := true
fileUrl := file.FileInfo.Url
fileCache := redis.NewFileCacheService()
tempFile, _ := fileCache.Get(redis.KeyTemporaryFileInfo(fileId))
if tempFile == nil {
return nil, fmt.Errorf("临时文件不存在")
}
// Load Data From Excel(python api)
byteCore, _ := CreateByteCoreService()
response, err := byteCore.LoadDataTable(domain.ReqLoadDataTable{
FileId: file.FileId,
FileName: file.FileInfo.Name,
Url: file.FileInfo.Url,
Ext: file.FileInfo.Ext,
Where: where,
OriginalTableId: fmt.Sprintf("%v", file.FileId),
IsFromOriginalTable: isSourceFile,
TableFileUrl: fileUrl,
ColumnSchemas: bytelib.DomainFieldsToColumnSchemas(fields),
SortParameters: make(map[string]interface{}),
})
if err != nil {
return nil, err
}
// 强制刷新列
fields = response.Fields
cache := redis.NewFileCacheService()
tempFile, err = cache.Update(redis.KeyTemporaryFileInfo(file.FileId), file, fields, response.Total, redis.WithHeaderRow(where.HeaderRow))
if err != nil {
return nil, err
}
var responseDto = &FilePreviewDto{}
//responseDto.Load(file.FileId, response, tempFile)
return responseDto, nil
}
// Preview 预览 【data-table】
func (ptr *PreviewDataTableService) Preview(ctx *domain.Context, fileId int, fields []*domain.Field, where domain.Where) (interface{}, error) {
fileRepository, _ := repository.NewFileRepository(ptr.transactionContext)
... ... @@ -98,6 +142,7 @@ type FilePreviewDto struct {
Data interface{} `json:"grid"`
PageNumber int `json:"pageNumber"`
InValidCells []domain.InValidCell `json:"inValidCells"`
HeaderRow int `json:"headerRow"`
}
func (d *FilePreviewDto) Load(fileId int, m *domain.DataLoadDataTable, file *redis.TemporaryFileInfo) {
... ... @@ -137,6 +182,7 @@ func (d *FilePreviewDto) Load(fileId int, m *domain.DataLoadDataTable, file *red
}
d.InValidCells = domain.NewInValidCells(fields, mapData)
d.HeaderRow = domain.GetHeaderRow(file.HeaderRow)
}
func (ptr *PreviewDataTableService) GetFileId() int {
... ...
... ... @@ -20,6 +20,7 @@ type TemporaryFileInfo struct {
FileId int `json:"fileId"`
FileType string `json:"fileType"`
Total int `json:"total"`
HeaderRow int `json:"headerRow"`
Fields []*domain.Field `json:"fields"`
// 编辑表错误,有错误不允许保存成校验文件
// 行记录错误
... ... @@ -66,6 +67,11 @@ func (f *TemporaryFileInfo) SetTotal(total int) *TemporaryFileInfo {
return f
}
func (f *TemporaryFileInfo) SetHeaderRow(headerRow int) *TemporaryFileInfo {
f.HeaderRow = headerRow
return f
}
func (f *TemporaryFileInfo) AddConvertTypeError(e ConvertTypeError) *TemporaryFileInfo {
f.RemoveConvertTypeError(e)
f.addConvertTypeError(e)
... ... @@ -99,46 +105,52 @@ type ConvertTypeError struct {
type FileCacheService struct {
}
func (s *FileCacheService) Update(key string, file *domain.File, fields []*domain.Field, total int, errors ...FileCacheOptionsFunc) (*TemporaryFileInfo, error) {
func (s *FileCacheService) Update(key string, file *domain.File, fields []*domain.Field, total int, option ...FileCacheOptionsFunc) (*TemporaryFileInfo, error) {
options := NewFileCacheOptions(option...)
ok, err := ZeroCoreRedis.Exists(key)
var response = &TemporaryFileInfo{}
var tmpFile = &TemporaryFileInfo{}
if err != nil {
return response, err
return tmpFile, err
}
if !ok {
response.SetFile(file).SetFields(fields).SetTotal(total)
return response, ZeroCoreRedis.Setex(key, json.MarshalToString(response), TemporaryFileExpire)
tmpFile.SetFile(file).SetFields(fields).SetTotal(total)
if options.HasSetHeaderRow {
tmpFile.SetHeaderRow(options.HeaderRow)
}
return tmpFile, ZeroCoreRedis.Setex(key, json.MarshalToString(tmpFile), TemporaryFileExpire)
}
data, err := ZeroCoreRedis.Get(key)
if err != nil {
return nil, err
}
err = json.UnmarshalFromString(data, response)
err = json.UnmarshalFromString(data, tmpFile)
if err != nil {
return nil, err
}
response.SetFields(fields)
tmpFile.SetFields(fields)
if options.HasSetHeaderRow {
tmpFile.SetHeaderRow(options.HeaderRow)
}
options := NewFileCacheOptions(errors...)
for i := range options.AddConvertTypeErrors {
response.AddConvertTypeError(options.AddConvertTypeErrors[i])
tmpFile.AddConvertTypeError(options.AddConvertTypeErrors[i])
}
for i := range options.RemoveConvertTypeErrors {
convertType := options.RemoveConvertTypeErrors[i]
response.RemoveConvertTypeError(options.RemoveConvertTypeErrors[i])
for j := range response.Fields {
if response.Fields[j].Name == convertType.FieldName {
response.Fields[j].SQLType = convertType.ToType
tmpFile.RemoveConvertTypeError(options.RemoveConvertTypeErrors[i])
for j := range tmpFile.Fields {
if tmpFile.Fields[j].Name == convertType.FieldName {
tmpFile.Fields[j].SQLType = convertType.ToType
break
}
}
}
err = ZeroCoreRedis.Setex(key, json.MarshalToString(response), TemporaryFileExpire)
err = ZeroCoreRedis.Setex(key, json.MarshalToString(tmpFile), TemporaryFileExpire)
if err != nil {
return nil, err
}
return response, err
return tmpFile, err
}
func (s *FileCacheService) UpdateField(key string, file *domain.File, errors ...FileCacheOptionsFunc) (*TemporaryFileInfo, error) {
... ... @@ -209,6 +221,8 @@ type FileCacheOptions struct {
//OriginalFileId int
RemoveConvertTypeErrors []ConvertTypeError
AddConvertTypeErrors []ConvertTypeError
HeaderRow int
HasSetHeaderRow bool
}
type FileCacheOptionsFunc func(o *FileCacheOptions)
... ... @@ -225,6 +239,13 @@ func WithAddConvertTypeErrors(errors []ConvertTypeError) FileCacheOptionsFunc {
}
}
func WithHeaderRow(headerRow int) FileCacheOptionsFunc {
return func(o *FileCacheOptions) {
o.HeaderRow = headerRow
o.HasSetHeaderRow = true
}
}
//func WithOriginalFileId(originalFileId int) FileCacheOptionsFunc {
// return func(o *FileCacheOptions) {
// o.OriginalFileId = originalFileId
... ...
... ... @@ -368,6 +368,12 @@ func WrapDeleteFuncWithDB(db *gorm.DB) func(QueryOptions) (int64, error) {
continue
}
idList = append(idList, row[0])
if len(idList) > 5000 {
c := Condition{}
sql := fmt.Sprintf("delete from %v where id in %v", params.TableName, c.InArgs(idList))
query = db.Exec(sql)
idList = make([]string, 0)
}
}
if len(idList) == 0 {
return 0, nil
... ...
... ... @@ -248,6 +248,7 @@ func (controller *TableController) Preview() {
fileService := fileservice.NewFileService(nil)
cmd := &struct {
ObjectType string `json:"objectType"`
Action string `json:"action"`
}{}
Must(controller.Unmarshal(cmd))
var data interface{}
... ... @@ -271,6 +272,14 @@ func (controller *TableController) Preview() {
controller.Response(data, err)
}
func (controller *TableController) TableResetHeaderRow() {
fileService := fileservice.NewFileService(nil)
cmd := &filecommand.ResetTableHeaderCommand{}
Must(controller.Unmarshal(cmd))
data, err := fileService.ResetHeaderRow(ParseContext(controller.BaseController), cmd)
controller.Response(data, err)
}
func (controller *TableController) RowEdit() {
tableService := service.NewTableService(nil)
cmd := &command.RowEditCommandV2{}
... ...
... ... @@ -42,5 +42,7 @@ func init() {
web.Router("/business/db-table-preview", tableController, "Post:DBTablePreview")
web.Router("/data/table-preview", tableController, "Post:Preview")
web.Router("/data/reset-header-row", tableController, "Post:TableResetHeaderRow")
web.Router("/data/tables/exec/:name", tableController, "Get:ExecScript")
}
... ...
... ... @@ -19,5 +19,9 @@ func tableDataChangeHandler(e event.Event) error {
_, err := svr.Handler(nil, &command.TableEventCommand{
EventTable: et,
})
_, err = svr.DigitalPlatformEventSubscribe(nil, &command.TableEventCommand{
EventTable: et,
})
return err
}
... ...