作者 yangfu

feat: P1 case query set cascade update

package service
import (
"errors"
"fmt"
"github.com/linmadan/egglib-go/core/application"
pgTransaction "github.com/linmadan/egglib-go/transaction/pg"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/event/command"
... ... @@ -10,6 +12,7 @@ import (
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/cache"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/domainService"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/log"
)
type TableEventService struct {
... ... @@ -68,6 +71,62 @@ func (tableEventService *TableEventService) Handler(ctx *domain.Context, cmd *co
return nil, nil
}
func (tableEventService *TableEventService) HandlerTableAffectedMarkToConflictStatus(ctx *domain.Context, cmd *command.TableEventCommand) (interface{}, error) {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
if err := transactionContext.StartTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
defer func() {
transactionContext.RollbackTransaction()
}()
data := cmd.EventTable
tableId := 0
switch data.Type {
case domain.TableDataEditEvent:
tableId = data.Table.TableId
case domain.QuerySetUpdateEvent, domain.QuerySetUpdateRenameEvent:
tableId = data.QuerySet.QuerySetInfo.BindTableId
default:
return nil, err
}
if tableId == 0 {
return nil, nil
}
// tableId 相关联的
tableRepository, _, _ := factory.FastPgTable(transactionContext, 0)
_, tables, err := tableRepository.Find(map[string]interface{}{"context": ctx, "dependencyTable": tableId, "tableTypesNotIn": []string{domain.TemporaryTable.ToString()}})
if errors.Is(err, domain.ErrorNotFound) {
return nil, nil
}
tableIds := make([]int, 0)
for _, table := range tables {
tableIds = append(tableIds, table.TableId)
}
if len(tableIds) == 0 {
return nil, nil
}
querySetRepository, _, _ := factory.FastPgQuerySet(transactionContext, 0)
_, querySets, _ := querySetRepository.Find(map[string]interface{}{"context": ctx, "bindTableIds": tableIds})
for _, querySet := range querySets {
log.Logger.Debug(fmt.Sprintf("【集合状态更新】 id:%v name:%v ReadyStatus:1", querySet.QuerySetId, querySet.Name))
querySet.QuerySetInfo.WithConflictStatus()
_, err = querySetRepository.Save(querySet)
if err != nil {
return nil, err
}
}
if err := transactionContext.CommitTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
return nil, nil
}
func NewTableEventService(options map[string]interface{}) *TableEventService {
svr := &TableEventService{}
return svr
... ...
... ... @@ -12,6 +12,18 @@ func FastError(err error) error {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
func FastErrorResponse(err error, args ...interface{}) interface{} {
var response = make(map[string]interface{})
response["internalErr"] = err.Error()
for i := 0; i < len(args); i += 2 {
if i+1 >= len(args) {
break
}
response[args[i].(string)] = args[i+1]
}
return response
}
func FastDataTable(options starrocks.QueryOptions) (*domain.DataTable, error) {
var err error
// 待优化分批下载,压缩
... ...
... ... @@ -109,12 +109,13 @@ func (fileService *FileService) GetFile(ctx *domain.Context, getFileQuery *query
options["fileId"] = getFileQuery.FileId
}
if len(getFileQuery.FileName) > 0 {
options["fileName"] = getFileQuery.FileName
options["fileName"] = domain.FileName(getFileQuery.FileName)
}
if len(getFileQuery.FileType) > 0 {
options["fileType"] = getFileQuery.FileType
}
if len(options) == 0 {
// 未传递参数
if len(options) == 1 {
return response, nil
}
file, _ := fileRepository.FindOne(options)
... ...
package command
import (
"fmt"
"reflect"
"strings"
"github.com/beego/beego/v2/core/validation"
)
type RefreshQuerySetCommand struct {
// 查询集合ID
QuerySetId int `cname:"查询集合ID" json:"querySetId" valid:"Required"`
}
func (updateQuerySetCommand *RefreshQuerySetCommand) Valid(validation *validation.Validation) {
//validation.SetError("CustomValid", "未实现的自定义认证")
}
func (updateQuerySetCommand *RefreshQuerySetCommand) ValidateCommand() error {
valid := validation.Validation{}
b, err := valid.Valid(updateQuerySetCommand)
if err != nil {
return err
}
if !b {
elem := reflect.TypeOf(updateQuerySetCommand).Elem()
for _, validErr := range valid.Errors {
field, isExist := elem.FieldByName(validErr.Field)
if isExist {
return fmt.Errorf(strings.Replace(validErr.Message, validErr.Field, field.Tag.Get("cname"), -1))
} else {
return fmt.Errorf(validErr.Message)
}
}
}
return nil
}
... ...
... ... @@ -15,6 +15,8 @@ type QuerySetDetailDto struct {
QueryComponents []*domain.QueryComponent `json:"queryComponents"`
// 查询集绑定的表
TableId int `json:"tableId"`
// 在冲突状态 true:冲突异常 false:正常
InConflict bool `json:"inConflict"`
}
func (d *QuerySetDetailDto) Load(m *domain.QuerySet, mapTables map[int]*domain.Table) *QuerySetDetailDto {
... ... @@ -26,15 +28,31 @@ func (d *QuerySetDetailDto) Load(m *domain.QuerySet, mapTables map[int]*domain.T
if m.QuerySetInfo != nil {
d.TableId = m.QuerySetInfo.BindTableId
}
hasUpdateTable := false
for i, q := range d.QueryComponents {
if q.MasterTable != nil && q.MasterTable.TableId != 0 {
if t, ok := mapTables[q.MasterTable.TableId]; ok {
d.QueryComponents[i].MasterTable = domain.NewQueryComponentTable(t)
d.QueryComponents[i].UpdateTables(t)
hasUpdateTable = true
}
}
if d.QueryComponents[i].Aggregation != nil {
d.QueryComponents[i].Aggregation.Aggregation.AllFields = d.QueryComponents[i].Aggregation.AggregationFields()
}
if !hasUpdateTable && len(mapTables) == 1 {
for _, t := range mapTables {
d.QueryComponents[i].UpdateTables(t)
hasUpdateTable = true
break
}
}
}
d.InConflict = false
if m.QuerySetInfo != nil {
if m.QuerySetInfo.ReadyStatus == 1 {
d.InConflict = true
}
}
return d
}
... ...
... ... @@ -21,6 +21,8 @@ type QuerySetDto struct {
Time string `json:"time"`
// 绑定的表ID
BindTableId int `json:"tableId"`
// 在冲突状态 true:冲突异常 false:正常
InConflict bool `json:"inConflict"`
}
func (d *QuerySetDto) Load(m *domain.QuerySet) *QuerySetDto {
... ... @@ -33,6 +35,7 @@ func (d *QuerySetDto) Load(m *domain.QuerySet) *QuerySetDto {
d.Sort = m.Sort
d.Time = m.CreatedAt.Local().Format("2006-01-02 15:04:05")
d.BindTableId = m.QuerySetInfo.BindTableId
d.InConflict = m.QuerySetInfo.ReadyStatus == 1
return d
}
... ...
... ... @@ -396,6 +396,36 @@ func (querySetService *QuerySetService) UpdateQuerySet(ctx *domain.Context, upda
return struct{}{}, nil
}
// 更新查询集合服务
func (querySetService *QuerySetService) RefreshQuerySet(ctx *domain.Context, updateQuerySetCommand *command.RefreshQuerySetCommand) (interface{}, error) {
if err := updateQuerySetCommand.ValidateCommand(); err != nil {
return nil, application.ThrowError(application.ARG_ERROR, err.Error())
}
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
if err := transactionContext.StartTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
defer func() {
transactionContext.RollbackTransaction()
}()
svr, _ := factory.FastQuerySetServices(transactionContext)
_, querySet, err := factory.FastPgQuerySet(transactionContext, updateQuerySetCommand.QuerySetId)
if err != nil {
return nil, factory.FastError(err)
}
if err := svr.Update(ctx, updateQuerySetCommand.QuerySetId, querySet.QueryComponents); err != nil {
return factory.FastErrorResponse(err, "title", fmt.Sprintf("%v:%v", domain.EnumsDescription(domain.ObjectTypeMap, querySet.Type), querySet.Name), "result", "更新失败"), nil
}
if err := transactionContext.CommitTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
return struct{}{}, nil
}
func (querySetService *QuerySetService) PreviewPrepare(ctx *domain.Context, updateQuerySetCommand *command.UpdateQuerySetCommand) (interface{}, error) {
if err := updateQuerySetCommand.ValidateCommand(); err != nil {
return nil, application.ThrowError(application.ARG_ERROR, err.Error())
... ...
... ... @@ -23,6 +23,8 @@ type TableObjectDto struct {
Flag string `json:"flag,omitempty"`
// 启用状态
Status int `json:"status"`
// 冲突状态
InConflict bool `json:"inConflict"`
// 表字段
Fields []*domain.Field `json:"fields"`
}
... ... @@ -61,6 +63,10 @@ func (d *TableObjectDto) Update(m *domain.QuerySet) *TableObjectDto {
d.Flag = m.Flag
d.Status = m.Status
d.ParentId = m.ParentId
d.InConflict = false
if m.QuerySetInfo != nil {
d.InConflict = m.QuerySetInfo.ReadyStatus == 1
}
return d
}
... ...
... ... @@ -362,13 +362,14 @@ func (tableService *TableService) UpdateTableStruct(ctx *domain.Context, cmd *co
}()
UpdateTableStructService, _ := factory.CreateUpdateTableStructService(transactionContext)
if _, err := UpdateTableStructService.UpdateTableStruct(ctx, cmd.TableId, cmd.Fields, cmd.Name); err != nil {
response, err := UpdateTableStructService.UpdateTableStruct(ctx, cmd.TableId, cmd.Fields, cmd.Name)
if err != nil {
return nil, application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
if err := transactionContext.CommitTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
return struct{}{}, nil
return response, nil
}
// 更新表服务
... ...
package service
import (
"github.com/linmadan/egglib-go/core/application"
pgTransaction "github.com/linmadan/egglib-go/transaction/pg"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/factory"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/table/query"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/domainService"
)
func (tableService *TableService) DependencyGraph(ctx *domain.Context, cmd *query.GetTableQuery) (interface{}, error) {
if err := cmd.ValidateQuery(); err != nil {
return nil, application.ThrowError(application.ARG_ERROR, err.Error())
}
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
if err := transactionContext.StartTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
defer func() {
transactionContext.RollbackTransaction()
}()
response, err := domainService.DependencyTables(transactionContext.(*pgTransaction.TransactionContext), ctx, cmd.TableId)
if err := transactionContext.CommitTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
return map[string]interface{}{
"nodes": response,
}, nil
}
... ...
... ... @@ -13,6 +13,7 @@ const (
TableDataImportEvent EventType = "table.data.import"
TableDeleteEvent EventType = "table.delete"
QuerySetUpdateEvent EventType = "table.query.set.update"
QuerySetUpdateRenameEvent EventType = "table.query.set.update.rename"
)
type EventTable struct {
... ...
... ... @@ -77,6 +77,7 @@ func (querySet *QuerySet) Update(queryComponents []*QueryComponent, tableId int)
if querySet.QuerySetInfo.BindTableId == 0 {
querySet.QuerySetInfo.BindTableId = tableId
}
querySet.QuerySetInfo.ResolveConflictStatus()
querySet.UpdatedAt = time.Now()
return nil
}
... ...
... ... @@ -38,7 +38,7 @@ type QueryComponent struct {
Layout *LayoutRule `json:"layout"`
}
func (qc QueryComponent) AllSelectExpr() []SelectExpr {
func (qc *QueryComponent) AllSelectExpr() []SelectExpr {
var res = make([]SelectExpr, 0)
for _, s := range qc.Selects {
res = append(res, s.SelectExpr)
... ... @@ -113,6 +113,24 @@ func (expr *FieldExpr) Complete() string {
return exprSql
}
func (expr *FieldExpr) UpdateTable(t *Table) {
if t == nil || t.TableId == 0 {
return
}
matchTableName := ""
for i, f := range expr.TableFields {
if f.TableId == t.TableId {
matchTableName = f.TableName
if f.TableId == t.TableId {
expr.TableFields[i].TableName = t.Name
}
}
}
if len(matchTableName) > 0 {
expr.ExprHuman = strings.ReplaceAll(expr.ExprHuman, matchTableName, t.Name)
}
}
func (expr *FieldExpr) Tables() []int {
set := collection.NewSet()
for _, f := range expr.TableFields {
... ...
package domain
func (qc *QueryComponent) UpdateTables(tables ...*Table) {
for _, table := range tables {
qc.updateTable(table)
}
}
func (qc *QueryComponent) updateTable(table *Table) {
for i := range qc.Conditions {
qc.Conditions[i].FieldLeft.UpdateTable(table)
qc.Conditions[i].FieldRight.UpdateTable(table)
}
for i := range qc.Selects {
qc.Selects[i].FieldLeft.UpdateTable(table)
qc.Selects[i].FieldRight.UpdateTable(table)
for j := range qc.Selects[i].SubSelects {
qc.Selects[i].SubSelects[j].FieldLeft.UpdateTable(table)
qc.Selects[i].SubSelects[j].FieldRight.UpdateTable(table)
}
}
if qc.Formula != nil && len(qc.Formula.ExprHuman) > 0 {
qc.Formula.FieldExpr.UpdateTable(table)
}
if qc.Aggregation != nil {
for i := range qc.Aggregation.RowFields {
qc.Aggregation.RowFields[i].Expr.UpdateTable(table)
}
for i := range qc.Aggregation.ValueFields {
qc.Aggregation.ValueFields[i].Expr.UpdateTable(table)
}
}
if qc.Layout != nil {
for i := range qc.Layout.Cells {
if qc.Layout.Cells[i].Data == nil || qc.Layout.Cells[i].Data.TableField == nil {
continue
}
if qc.Layout.Cells[i].Data.TableField.TableId == table.TableId {
qc.Layout.Cells[i].Data.TableField.TableName = table.Name
}
}
}
}
... ...
... ... @@ -2,4 +2,13 @@ package domain
type QuerySetInfo struct {
BindTableId int // 查询集绑定的表
ReadyStatus int // 准备状态 0:正常 1:冲突状态
}
func (q *QuerySetInfo) WithConflictStatus() {
q.ReadyStatus = 1
}
func (q *QuerySetInfo) ResolveConflictStatus() {
q.ReadyStatus = 0
}
... ...
... ... @@ -878,7 +878,7 @@ func (ptr *QuerySetService) Rename(ctx *domain.Context, querySetId int, name str
return ErrQuerySetNameExists
}
qs.Name = name
_, err = querySetRepository.Save(qs)
qs, err = querySetRepository.Save(qs)
if err != nil {
return err
}
... ... @@ -896,6 +896,9 @@ func (ptr *QuerySetService) Rename(ctx *domain.Context, querySetId int, name str
return err
}
}
defer func() {
AsyncEvent(domain.NewEventTable(ctx, domain.QuerySetUpdateRenameEvent).WithQuerySet(qs))
}()
// 日志
if err = FastLog(ptr.transactionContext, domain.QuerySetLog, qs.QuerySetId, &RenameQuerySetLog{
LogEntry: domain.NewLogEntry(qs.Name, qs.Type, domain.UnKnown, ctx),
... ...
... ... @@ -3,8 +3,10 @@ package domainService
import (
"fmt"
pgTransaction "github.com/linmadan/egglib-go/transaction/pg"
"github.com/zeromicro/go-zero/core/collection"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/repository"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/utils"
"time"
)
... ... @@ -78,9 +80,66 @@ func (ptr *UpdateTableStructService) UpdateTableStruct(ctx *domain.Context, tabl
if _, err = ByteCore.SplitTable(domain.ReqSplitTable{FromTable: mainTable, ToSubTable: table}); err != nil {
return nil, err
}
//var tablesAffected []TableNode
//if len(adds) > 0 {
// tablesAffected, _ = DependencyTables(ptr.transactionContext, ctx, tableId)
//}
//return map[string]interface{}{
// "tablesAffected": tablesAffected,
//}, nil
return struct{}{}, nil
}
func DependencyTables(ptr *pgTransaction.TransactionContext, context *domain.Context, tableId int) ([]TableNode, error) {
ret := make([]TableNode, 0)
// tableId 相关联的
tableRepository, _ := repository.NewTableRepository(ptr)
_, tables, err := tableRepository.Find(map[string]interface{}{"context": context, "tableTypesNotIn": []string{domain.TemporaryTable.ToString(), domain.ExcelTable.ToString()}})
if err != nil {
return nil, err
}
tableDependencyService, _ := NewTableDependencyService(ptr)
tableDependTree := tableDependencyService.TableDependTree(tables, tableId)
set := collection.NewSet()
stack := utils.NewEmptyStack()
list := make([]int, 0)
stack.Push(tableId)
for {
item := stack.Pop()
if item == nil {
break
}
id := item.(int)
for _, edge := range tableDependTree.Edges {
if edge.DependChildId == id {
stack.Push(edge.Id)
if !set.Contains(edge.Id) {
set.Add(edge.Id)
list = append(list, edge.Id)
}
}
}
}
if len(list) > 0 {
querySetRepository, _ := repository.NewQuerySetRepository(ptr)
_, querySets, _ := querySetRepository.Find(map[string]interface{}{"context": context, "bindTableIds": list})
for _, id := range list {
if v, ok := tableDependencyService.TableMap[id]; ok {
node := NewTableNode(v)
for _, q := range querySets {
if q.QuerySetInfo.BindTableId == node.TableId {
node.QuerySetId = q.QuerySetId
break
}
}
ret = append(ret, node)
}
}
}
return ret, nil
}
//func MappingFields(mainTable *domain.Table, fields []*domain.Field) []*domain.Field {
// tableFields := mainTable.Fields(false)
// tableFieldsMap := (domain.Fields)(tableFields).ToMap()
... ...
... ... @@ -29,6 +29,14 @@ func (controller *QuerySetController) UpdateQuerySet() {
controller.Response(data, err)
}
func (controller *QuerySetController) RefreshQuerySet() {
querySetService := service.NewQuerySetService(nil)
updateQuerySetCommand := &command.RefreshQuerySetCommand{}
Must(controller.Unmarshal(updateQuerySetCommand))
data, err := querySetService.RefreshQuerySet(ParseContext(controller.BaseController), updateQuerySetCommand)
controller.Response(data, err)
}
func (controller *QuerySetController) PreviewPrepare() {
querySetService := service.NewQuerySetService(nil)
updateQuerySetCommand := &command.UpdateQuerySetCommand{}
... ...
... ... @@ -276,3 +276,11 @@ func (controller *TableController) RowEdit() {
data, err := tableService.RowEditV2(ParseContext(controller.BaseController), cmd)
controller.Response(data, err)
}
func (controller *TableController) DependencyGraph() {
tableService := service.NewTableService(nil)
updateTableCommand := &query.GetTableQuery{}
controller.Unmarshal(updateTableCommand)
data, err := tableService.DependencyGraph(ParseContext(controller.BaseController), updateTableCommand)
controller.Response(data, err)
}
... ...
... ... @@ -19,6 +19,7 @@ func init() {
web.Router("/data/query-sets/rename", &controllers.QuerySetController{}, "Post:Rename")
web.Router("/data/query-sets/search", &controllers.QuerySetController{}, "Post:SearchQuerySet")
web.Router("/data/query-sets/preview-prepare", &controllers.QuerySetController{}, "Post:PreviewPrepare")
web.Router("/data/query-sets/refresh", &controllers.QuerySetController{}, "Post:RefreshQuerySet")
web.Router("/data/query-sets/formula/", &controllers.QuerySetController{}, "Post:CreateQuerySet")
web.Router("/data/query-sets/formula/:querySetId", &controllers.QuerySetController{}, "Put:UpdateQuerySet")
... ...
... ... @@ -21,6 +21,7 @@ func init() {
web.Router("/data/tables/search-appended-list", &controllers.TableController{}, "Post:SearchAppendedList")
web.Router("/data/tables/search-sub-table-list", &controllers.TableController{}, "Post:SearchSubTableList")
web.Router("/data/tables/search-query-set-tables", &controllers.TableController{}, "Post:SearchQuerySetTables")
web.Router("/data/tables/dependency-graph", &controllers.TableController{}, "Post:DependencyGraph")
//web.Router("/data/tables/split-data-table", &controllers.TableController{}, "Post:SplitDataTable")
//web.Router("/data/tables/batch-edit-sub-table", &controllers.TableController{}, "Post:BatchEditSubTable")
... ...
... ... @@ -14,4 +14,5 @@ func RegisterEvent() {
event.On(domain.TableDataImportEvent.ToString(), event.ListenerFunc(tableDataChangeHandler), event.High)
event.On(domain.TableDeleteEvent.ToString(), event.ListenerFunc(tableDataChangeHandler), event.High)
event.On(domain.QuerySetUpdateEvent.ToString(), event.ListenerFunc(tableDataChangeHandler), event.High)
event.On(domain.QuerySetUpdateRenameEvent.ToString(), event.ListenerFunc(tableDataChangeHandler), event.High)
}
... ...
... ... @@ -14,5 +14,9 @@ func tableDataChangeHandler(e event.Event) error {
_, err := svr.Handler(nil, &command.TableEventCommand{
EventTable: et,
})
svr.HandlerTableAffectedMarkToConflictStatus(et.Context, &command.TableEventCommand{
EventTable: et,
})
return err
}
... ...