...
|
...
|
@@ -4,6 +4,7 @@ import ( |
|
|
"context"
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
"github.com/pkg/errors"
|
|
|
"github.com/zeromicro/go-zero/core/stores/redis"
|
|
|
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/svc"
|
|
|
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/types"
|
...
|
...
|
@@ -39,7 +40,9 @@ func (logic *ByteNoticeLogic) Consume(key, value string) error { |
|
|
return err
|
|
|
}
|
|
|
//处理消息
|
|
|
err = logic.handleNotice(notice)
|
|
|
err = transaction.UseTrans(logic.ctx, logic.conn.DB(), func(ctx context.Context, conn transaction.Conn) error {
|
|
|
return logic.handleNotice(conn, notice)
|
|
|
}, true)
|
|
|
//更新处理结果
|
|
|
if err != nil {
|
|
|
notice.Status = domain.ObjectNoticeStatusError
|
...
|
...
|
@@ -53,10 +56,10 @@ func (logic *ByteNoticeLogic) Consume(key, value string) error { |
|
|
}
|
|
|
|
|
|
// handleNotice 处理消息
|
|
|
func (logic *ByteNoticeLogic) handleNotice(notice *domain.ObjectNotice) error {
|
|
|
func (logic *ByteNoticeLogic) handleNotice(conn transaction.Conn, notice *domain.ObjectNotice) error {
|
|
|
//是否删除消息
|
|
|
if notice.IsDeletedEvent() {
|
|
|
return logic.handleDelete(notice)
|
|
|
return logic.handleDelete(conn, notice)
|
|
|
}
|
|
|
accessToken, _ := types.TableAccessToken{CompanyId: notice.CompanyId}.GenerateToken()
|
|
|
//结构变更
|
...
|
...
|
@@ -100,17 +103,25 @@ func (logic *ByteNoticeLogic) handleNotice(notice *domain.ObjectNotice) error { |
|
|
CompanyId: notice.CompanyId,
|
|
|
ParentId: item.ParentId,
|
|
|
Flag: item.Flag,
|
|
|
Fields: tableInfo.Fields,
|
|
|
})
|
|
|
//父级节点
|
|
|
objectTables = append(objectTables, logic.getParents(notice.CompanyId, item.ParentId, list.List)...)
|
|
|
}
|
|
|
}
|
|
|
err = logic.saveTables(objectTables)
|
|
|
err = logic.saveTables(conn, objectTables)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
}
|
|
|
//保存字段
|
|
|
_, err = logic.saveFields(conn, &domain.ObjectField{
|
|
|
Id: int64(tableInfo.TableId),
|
|
|
Name: tableInfo.Name,
|
|
|
Fields: tableInfo.Fields,
|
|
|
})
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
}
|
|
|
//数据变更
|
|
|
if notice.DataChanged {
|
...
|
...
|
@@ -136,10 +147,15 @@ func (logic *ByteNoticeLogic) handleNotice(notice *domain.ObjectNotice) error { |
|
|
acquire, err := lock.Acquire()
|
|
|
fmt.Println(acquire, err)
|
|
|
defer lock.Release()
|
|
|
err = transaction.UseTrans(logic.ctx, logic.conn.DB(), func(ctx context.Context, conn transaction.Conn) error {
|
|
|
return logic.svcCtx.ObjectTableDataRepository.InsertWithTableData(logic.ctx, conn, bytelib.TableData(tableDataPreview))
|
|
|
}, true)
|
|
|
fmt.Println(err)
|
|
|
err = logic.svcCtx.ObjectTableDataRepository.InsertWithTableData(logic.ctx, conn, bytelib.TableData(tableDataPreview))
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
//更新标记本地存储
|
|
|
err = logic.updateTableWithLocal(conn, int(tableDataPreview.ObjectId))
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
}
|
|
|
return nil
|
|
|
}
|
...
|
...
|
@@ -156,7 +172,6 @@ func (logic *ByteNoticeLogic) getParents(companyId int64, parentId int, list []* |
|
|
CompanyId: companyId,
|
|
|
ParentId: item.ParentId,
|
|
|
Flag: item.Flag,
|
|
|
Fields: item.Fields,
|
|
|
})
|
|
|
if item.ParentId > 0 {
|
|
|
result = append(result, logic.getParents(companyId, item.ParentId, list)...)
|
...
|
...
|
@@ -167,11 +182,11 @@ func (logic *ByteNoticeLogic) getParents(companyId int64, parentId int, list []* |
|
|
}
|
|
|
|
|
|
// handleDelete 删除
|
|
|
func (logic *ByteNoticeLogic) handleDelete(notice *domain.ObjectNotice) error {
|
|
|
objectTable, err := logic.svcCtx.ObjectTableRepository.FindOneByTableId(logic.ctx, logic.conn, notice.TableId)
|
|
|
func (logic *ByteNoticeLogic) handleDelete(conn transaction.Conn, notice *domain.ObjectNotice) error {
|
|
|
objectTable, err := logic.svcCtx.ObjectTableRepository.FindOneByTableId(logic.ctx, conn, notice.TableId)
|
|
|
if err == nil && objectTable.Id > 0 {
|
|
|
objectTable.RemoteDeleted = 1
|
|
|
_, err := logic.svcCtx.ObjectTableRepository.Update(logic.ctx, logic.conn, objectTable)
|
|
|
_, err := logic.svcCtx.ObjectTableRepository.Update(logic.ctx, conn, objectTable)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
...
|
...
|
@@ -180,26 +195,46 @@ func (logic *ByteNoticeLogic) handleDelete(notice *domain.ObjectNotice) error { |
|
|
}
|
|
|
|
|
|
// saveTables 保存表结构
|
|
|
func (logic *ByteNoticeLogic) saveTables(tables []*domain.ObjectTable) error {
|
|
|
err := transaction.UseTrans(logic.ctx, logic.conn.DB(), func(ctx context.Context, conn transaction.Conn) error {
|
|
|
if len(tables) > 0 {
|
|
|
for _, item := range tables {
|
|
|
objectTable, err := logic.svcCtx.ObjectTableRepository.FindOne(logic.ctx, conn, item.Id)
|
|
|
if err == nil && objectTable.Id > 0 {
|
|
|
item.Id = objectTable.Id
|
|
|
_, err := logic.svcCtx.ObjectTableRepository.Update(logic.ctx, conn, item)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
} else {
|
|
|
_, err := logic.svcCtx.ObjectTableRepository.Insert(logic.ctx, conn, item)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
func (logic *ByteNoticeLogic) saveTables(conn transaction.Conn, tables []*domain.ObjectTable) error {
|
|
|
if len(tables) > 0 {
|
|
|
for _, item := range tables {
|
|
|
objectTable, err := logic.svcCtx.ObjectTableRepository.FindOne(logic.ctx, conn, item.Id)
|
|
|
if err == nil && objectTable.Id > 0 {
|
|
|
item.Id = objectTable.Id
|
|
|
item.Version = objectTable.Version + 1
|
|
|
_, err := logic.svcCtx.ObjectTableRepository.Update(logic.ctx, conn, item)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
} else {
|
|
|
_, err := logic.svcCtx.ObjectTableRepository.Insert(logic.ctx, conn, item)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
return nil
|
|
|
}, true)
|
|
|
return err
|
|
|
}
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
// saveFields 保存表字段
|
|
|
func (logic *ByteNoticeLogic) saveFields(conn transaction.Conn, objectField *domain.ObjectField) (*domain.ObjectField, error) {
|
|
|
mField, err := logic.svcCtx.ObjectFieldRepository.FindOne(logic.ctx, conn, objectField.Id)
|
|
|
if err == nil && mField.Id > 0 { //已存在 - 更新
|
|
|
objectField.Version = mField.Version + 1
|
|
|
return logic.svcCtx.ObjectFieldRepository.Update(logic.ctx, conn, objectField)
|
|
|
} else {
|
|
|
return logic.svcCtx.ObjectFieldRepository.Insert(logic.ctx, conn, objectField)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// updateTableWithLocal 更新表标记本地存储
|
|
|
func (logic *ByteNoticeLogic) updateTableWithLocal(conn transaction.Conn, tableId int) error {
|
|
|
objectTable, err := logic.svcCtx.ObjectTableRepository.FindOneByTableId(logic.ctx, conn, tableId)
|
|
|
if err == nil && objectTable.Id > 0 {
|
|
|
objectTable.IsLocal = true
|
|
|
_, err = logic.svcCtx.ObjectTableRepository.Update(logic.ctx, conn, objectTable)
|
|
|
return err
|
|
|
}
|
|
|
return errors.New("表不存在")
|
|
|
} |
...
|
...
|
|