作者 庄敏学

本地存储

1 package main 1 package main
2 2
3 import ( 3 import (
4 - "encoding/json"  
5 "flag" 4 "flag"
6 - "fmt"  
7 "github.com/golang-jwt/jwt/v4/request" 5 "github.com/golang-jwt/jwt/v4/request"
8 "github.com/zeromicro/go-queue/kq" 6 "github.com/zeromicro/go-queue/kq"
9 "github.com/zeromicro/go-zero/core/logx" 7 "github.com/zeromicro/go-zero/core/logx"
@@ -69,26 +67,26 @@ func main() { @@ -69,26 +67,26 @@ func main() {
69 func startConsume(c config.Config) { 67 func startConsume(c config.Config) {
70 //svcCtx := svc.NewServiceContext(c) 68 //svcCtx := svc.NewServiceContext(c)
71 go func() { 69 go func() {
72 - for {  
73 - notice := &domain.ObjectNotice{  
74 - CompanyId: 1598224576532189184,  
75 - TableId: 521,  
76 - TableType: "主表",  
77 - ObjectType: "导入模块",  
78 - Event: "table.data.edit",  
79 - TableAffectedList: []int{521},  
80 - DataChanged: true,  
81 - StructChanged: true,  
82 - MetaData: domain.ObjectNoticeMetaData{  
83 - Module: 0,  
84 - Status: 0,  
85 - },  
86 - }  
87 - mBytes, _ := json.Marshal(notice)  
88 - err := kq.NewPusher(c.KqConsumerConf.Brokers, c.KqConsumerConf.Topic).Push(string(mBytes))  
89 - fmt.Println(err)  
90 - time.Sleep(10 * 10 * time.Second)  
91 - } 70 + //for {
  71 + // notice := &domain.ObjectNotice{
  72 + // CompanyId: 1598224576532189184,
  73 + // TableId: 521,
  74 + // TableType: "主表",
  75 + // ObjectType: "导入模块",
  76 + // Event: "table.data.edit",
  77 + // TableAffectedList: []int{521},
  78 + // DataChanged: true,
  79 + // StructChanged: true,
  80 + // MetaData: domain.ObjectNoticeMetaData{
  81 + // Module: 0,
  82 + // Status: 0,
  83 + // },
  84 + // }
  85 + // mBytes, _ := json.Marshal(notice)
  86 + // err := kq.NewPusher(c.KqConsumerConf.Brokers, c.KqConsumerConf.Topic).Push(string(mBytes))
  87 + // fmt.Println(err)
  88 + // time.Sleep(10 * 10 * time.Second)
  89 + //}
92 }() 90 }()
93 //kafka消费队列 处理字库推送事件 91 //kafka消费队列 处理字库推送事件
94 go func() { 92 go func() {
@@ -33,7 +33,7 @@ ByteMetadata: @@ -33,7 +33,7 @@ ByteMetadata:
33 KqConsumerConf: 33 KqConsumerConf:
34 Name: KqConsumer 34 Name: KqConsumer
35 Brokers: 35 Brokers:
36 - - 192.168.100.221:9092  
37 - Group: sumifcc  
38 - Topic: sumifcc 36 + - 47.97.5.102:9092
  37 + Group: bchart_dev
  38 + Topic: allied_creation_metadata_table_sync_notice
39 Processors: 1 39 Processors: 1
@@ -95,6 +95,11 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { @@ -95,6 +95,11 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
95 Path: "/table/data", 95 Path: "/table/data",
96 Handler: table.SearchTableDataHandler(serverCtx), 96 Handler: table.SearchTableDataHandler(serverCtx),
97 }, 97 },
  98 + {
  99 + Method: http.MethodGet,
  100 + Path: "/table/sync",
  101 + Handler: table.SyncTableHandler(serverCtx),
  102 + },
98 }, 103 },
99 rest.WithPrefix("/v1"), 104 rest.WithPrefix("/v1"),
100 ) 105 )
  1 +package table
  2 +
  3 +import (
  4 + "net/http"
  5 +
  6 + "github.com/zeromicro/go-zero/rest/httpx"
  7 + "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/logic/table"
  8 + "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/svc"
  9 +)
  10 +
  11 +func SyncTableHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
  12 + return func(w http.ResponseWriter, r *http.Request) {
  13 + l := table.NewSyncTableLogic(r.Context(), svcCtx)
  14 + resp, err := l.SyncTable()
  15 + if err != nil {
  16 + httpx.ErrorCtx(r.Context(), w, err)
  17 + } else {
  18 + httpx.OkJsonCtx(r.Context(), w, resp)
  19 + }
  20 + }
  21 +}
@@ -133,8 +133,8 @@ func (l *SearchTableByModuleLogic) getTableByLocal(modules []string) (types.Sear @@ -133,8 +133,8 @@ func (l *SearchTableByModuleLogic) getTableByLocal(modules []string) (types.Sear
133 List: make([]types.SearchTableByModuleItem, 0), 133 List: make([]types.SearchTableByModuleItem, 0),
134 } 134 }
135 total, list, err := l.svcCtx.ObjectTableRepository.Find(l.ctx, l.svcCtx.DefaultDBConn(), map[string]interface{}{ 135 total, list, err := l.svcCtx.ObjectTableRepository.Find(l.ctx, l.svcCtx.DefaultDBConn(), map[string]interface{}{
136 - "companyId": tenantId,  
137 - "tableType": modules, 136 + "companyId": tenantId,
  137 + "tableTypeIn": modules,
138 }) 138 })
139 if err != nil { 139 if err != nil {
140 return response, err 140 return response, err
  1 +package table
  2 +
  3 +import (
  4 + "context"
  5 + "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/types"
  6 + "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/db/transaction"
  7 + "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/domain"
  8 + "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/gateway/bytelib"
  9 + "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/pkg/xerr"
  10 +
  11 + "github.com/zeromicro/go-zero/core/logx"
  12 + "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/svc"
  13 +)
  14 +
  15 +type SyncTableLogic struct {
  16 + logx.Logger
  17 + ctx context.Context
  18 + svcCtx *svc.ServiceContext
  19 + conn transaction.Conn
  20 +}
  21 +
  22 +func NewSyncTableLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SyncTableLogic {
  23 + return &SyncTableLogic{
  24 + Logger: logx.WithContext(ctx),
  25 + ctx: ctx,
  26 + svcCtx: svcCtx,
  27 + conn: svcCtx.DefaultDBConn(),
  28 + }
  29 +}
  30 +
  31 +func (l *SyncTableLogic) SyncTable() (resp interface{}, err error) {
  32 + companyIds, err := l.svcCtx.ChartRepository.FindCompanyIds(l.ctx, l.conn)
  33 + if err != nil {
  34 + return nil, xerr.NewErrMsg("查询公司ID失败:" + err.Error())
  35 + }
  36 + tables := make([]*domain.ObjectTable, 0)
  37 + for _, companyId := range companyIds {
  38 + accessToken, _ := types.TableAccessToken{CompanyId: companyId}.GenerateToken()
  39 + //获取导入模块
  40 + response, err := l.svcCtx.ByteMetadataService.ObjectTableSearch(l.ctx, bytelib.ObjectTableSearchRequest{
  41 + Token: accessToken,
  42 + TableTypes: []string{bytelib.MainTable, bytelib.SubTable, bytelib.SideTable},
  43 + Module: bytelib.ModuleChartTemplate,
  44 + })
  45 + if err == nil {
  46 + tables = append(tables, l.getTables(companyId, response)...)
  47 + }
  48 + //拆解模块
  49 + response, err = l.svcCtx.ByteMetadataService.ObjectTableSearch(l.ctx, bytelib.ObjectTableSearchRequest{
  50 + Token: accessToken,
  51 + TableTypes: []string{bytelib.SchemaTable},
  52 + Module: bytelib.ModuleQuerySetCenter,
  53 + ReturnGroupItem: true,
  54 + })
  55 + if err == nil {
  56 + tables = append(tables, l.getTables(companyId, response)...)
  57 + }
  58 + //计算项 计算集
  59 + response, err = l.svcCtx.ByteMetadataService.ObjectTableSearch(l.ctx, bytelib.ObjectTableSearchRequest{
  60 + Token: accessToken,
  61 + TableTypes: []string{bytelib.CalculateItem, bytelib.CalculateSet},
  62 + Module: bytelib.ModuleCalculateCenter,
  63 + ReturnGroupItem: true,
  64 + ExcludeTables: []int{0},
  65 + })
  66 + if err == nil {
  67 + tables = append(tables, l.getTables(companyId, response)...)
  68 + }
  69 + }
  70 + //获取字段信息
  71 + fields := l.getFields(tables)
  72 + //保存数据
  73 + err = transaction.UseTrans(l.ctx, l.conn.DB(), func(ctx context.Context, conn transaction.Conn) error {
  74 + err = l.SaveTables(conn, tables)
  75 + if err != nil {
  76 + return err
  77 + }
  78 + err = l.SaveFields(conn, fields)
  79 + if err != nil {
  80 + return err
  81 + }
  82 + return nil
  83 + }, true)
  84 + return companyIds, err
  85 +}
  86 +
  87 +func (l *SyncTableLogic) getTables(companyId int64, response bytelib.ObjectTableSearchResponse) []*domain.ObjectTable {
  88 + tables := make([]*domain.ObjectTable, 0)
  89 + if len(response.List) > 0 {
  90 + for _, item := range response.List {
  91 + tables = append(tables, &domain.ObjectTable{
  92 + Id: item.Id,
  93 + TableId: item.TableId,
  94 + Name: item.Name,
  95 + TableType: item.TableType,
  96 + CompanyId: companyId,
  97 + ParentId: item.ParentId,
  98 + Flag: item.Flag,
  99 + Version: 1,
  100 + IsLocal: false,
  101 + RemoteDeleted: 0,
  102 + })
  103 + }
  104 + }
  105 + return tables
  106 +}
  107 +
  108 +func (l *SyncTableLogic) getFields(list []*domain.ObjectTable) []*domain.ObjectField {
  109 + fields := make([]*domain.ObjectField, 0)
  110 + for _, item := range list {
  111 + accessToken, _ := types.TableAccessToken{CompanyId: item.CompanyId}.GenerateToken()
  112 + if item.TableId > 0 {
  113 + response, err := l.svcCtx.ByteMetadataService.TableInfo(l.ctx, &bytelib.TableInfoRequest{
  114 + Token: accessToken,
  115 + TableId: item.TableId,
  116 + })
  117 + if err == nil {
  118 + fields = append(fields, &domain.ObjectField{
  119 + Id: int64(item.TableId),
  120 + Name: response.Name,
  121 + Fields: response.Fields,
  122 + Version: 1,
  123 + })
  124 + }
  125 + }
  126 + }
  127 + return fields
  128 +}
  129 +
  130 +// SaveTables 保存数据
  131 +func (l *SyncTableLogic) SaveTables(conn transaction.Conn, list []*domain.ObjectTable) error {
  132 + for _, item := range list {
  133 + objectTable, err := l.svcCtx.ObjectTableRepository.FindOne(l.ctx, conn, item.Id)
  134 + if err == nil && objectTable.Id > 0 {
  135 + item.IsLocal = objectTable.IsLocal
  136 + item.Version = objectTable.Version + 1
  137 + }
  138 + _, err = l.svcCtx.ObjectTableRepository.Insert(l.ctx, conn, item)
  139 + if err != nil {
  140 + return err
  141 + }
  142 + }
  143 + return nil
  144 +}
  145 +
  146 +func (l *SyncTableLogic) SaveFields(conn transaction.Conn, list []*domain.ObjectField) error {
  147 + for _, item := range list {
  148 + objectTable, err := l.svcCtx.ObjectFieldRepository.FindOne(l.ctx, conn, item.Id)
  149 + if err == nil && objectTable.Id > 0 {
  150 + item.Version = objectTable.Version + 1
  151 + }
  152 + _, err = l.svcCtx.ObjectFieldRepository.Insert(l.ctx, conn, item)
  153 + if err != nil {
  154 + return err
  155 + }
  156 + }
  157 + return nil
  158 +}
@@ -247,6 +247,9 @@ type SyncTableDataPusher struct { @@ -247,6 +247,9 @@ type SyncTableDataPusher struct {
247 ObjectId int `json:"objectId"` 247 ObjectId int `json:"objectId"`
248 } 248 }
249 249
  250 +type SyncTableResponse struct {
  251 +}
  252 +
250 type AppPageGetRequest struct { 253 type AppPageGetRequest struct {
251 Id int64 `path:"id"` 254 Id int64 `path:"id"`
252 } 255 }
@@ -201,6 +201,18 @@ func (repository *ChartRepository) FindOneByGroup(ctx context.Context, conn tran @@ -201,6 +201,18 @@ func (repository *ChartRepository) FindOneByGroup(ctx context.Context, conn tran
201 return repository.ModelToDomainModel(m) 201 return repository.ModelToDomainModel(m)
202 } 202 }
203 203
  204 +// FindCompanyIds 获取所有的公司ID
  205 +func (repository *ChartRepository) FindCompanyIds(ctx context.Context, conn transaction.Conn) ([]int64, error) {
  206 + var (
  207 + err error
  208 + tx = conn.DB()
  209 + m = new(models.Chart)
  210 + )
  211 + list := make([]int64, 0)
  212 + err = tx.Model(&m).Group("tenant_id").Pluck("tenant_id", &list).Error
  213 + return list, err
  214 +}
  215 +
204 func (repository *ChartRepository) ModelToDomainModel(from *models.Chart) (*domain.Chart, error) { 216 func (repository *ChartRepository) ModelToDomainModel(from *models.Chart) (*domain.Chart, error) {
205 to := &domain.Chart{} 217 to := &domain.Chart{}
206 err := copier.Copy(to, from) 218 err := copier.Copy(to, from)
@@ -29,7 +29,6 @@ func (repository *ObjectFieldRepository) Insert(ctx context.Context, conn transa @@ -29,7 +29,6 @@ func (repository *ObjectFieldRepository) Insert(ctx context.Context, conn transa
29 } 29 }
30 dm.Id = m.Id 30 dm.Id = m.Id
31 return repository.ModelToDomainModel(m) 31 return repository.ModelToDomainModel(m)
32 -  
33 } 32 }
34 33
35 func (repository *ObjectFieldRepository) Update(ctx context.Context, conn transaction.Conn, dm *domain.ObjectField) (*domain.ObjectField, error) { 34 func (repository *ObjectFieldRepository) Update(ctx context.Context, conn transaction.Conn, dm *domain.ObjectField) (*domain.ObjectField, error) {
@@ -81,7 +80,7 @@ func (repository *ObjectFieldRepository) Delete(ctx context.Context, conn transa @@ -81,7 +80,7 @@ func (repository *ObjectFieldRepository) Delete(ctx context.Context, conn transa
81 m = &models.ObjectField{Id: dm.Identify().(int64)} 80 m = &models.ObjectField{Id: dm.Identify().(int64)}
82 ) 81 )
83 queryFunc := func() (interface{}, error) { 82 queryFunc := func() (interface{}, error) {
84 - tx = tx.Where("id = ?", m.Id).Delete(m) 83 + tx = tx.Where("id = ?", m.Id).Unscoped().Delete(m)
85 return m, tx.Error 84 return m, tx.Error
86 } 85 }
87 if _, err := repository.Query(queryFunc, m.CacheKeyFunc()); err != nil { 86 if _, err := repository.Query(queryFunc, m.CacheKeyFunc()); err != nil {
@@ -81,7 +81,7 @@ func (repository *ObjectTableRepository) Delete(ctx context.Context, conn transa @@ -81,7 +81,7 @@ func (repository *ObjectTableRepository) Delete(ctx context.Context, conn transa
81 m = &models.ObjectTable{Id: dm.Identify().(int)} 81 m = &models.ObjectTable{Id: dm.Identify().(int)}
82 ) 82 )
83 queryFunc := func() (interface{}, error) { 83 queryFunc := func() (interface{}, error) {
84 - tx = tx.Where("id = ?", m.Id).Delete(m) 84 + tx = tx.Where("id = ?", m.Id).Unscoped().Delete(m)
85 return m, tx.Error 85 return m, tx.Error
86 } 86 }
87 if _, err := repository.Query(queryFunc, m.CacheKeyFunc()); err != nil { 87 if _, err := repository.Query(queryFunc, m.CacheKeyFunc()); err != nil {
@@ -35,6 +35,7 @@ type ChartRepository interface { @@ -35,6 +35,7 @@ type ChartRepository interface {
35 Find(ctx context.Context, conn transaction.Conn, queryOptions map[string]interface{}) (int64, []*Chart, error) 35 Find(ctx context.Context, conn transaction.Conn, queryOptions map[string]interface{}) (int64, []*Chart, error)
36 FindOneByGroup(ctx context.Context, conn transaction.Conn, tenantId, pid int64) (*Chart, error) 36 FindOneByGroup(ctx context.Context, conn transaction.Conn, tenantId, pid int64) (*Chart, error)
37 FindByTypeAndName(ctx context.Context, conn transaction.Conn, tenantId int64, t string, name string) (int64, []*Chart, error) 37 FindByTypeAndName(ctx context.Context, conn transaction.Conn, tenantId int64, t string, name string) (int64, []*Chart, error)
  38 + FindCompanyIds(ctx context.Context, conn transaction.Conn) ([]int64, error)
38 } 39 }
39 40
40 /*************** 索引函数 开始****************/ 41 /*************** 索引函数 开始****************/
@@ -3,6 +3,7 @@ package domain @@ -3,6 +3,7 @@ package domain
3 import ( 3 import (
4 "context" 4 "context"
5 "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/db/transaction" 5 "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/db/transaction"
  6 + "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/gateway/bytelib"
6 "gorm.io/plugin/soft_delete" 7 "gorm.io/plugin/soft_delete"
7 ) 8 )
8 9
@@ -69,14 +70,14 @@ func (m *ObjectNotice) IsDeletedEvent() bool { @@ -69,14 +70,14 @@ func (m *ObjectNotice) IsDeletedEvent() bool {
69 } 70 }
70 //导入模块 取消应用和应用于 71 //导入模块 取消应用和应用于
71 if m.Event == "table.apply-on" { 72 if m.Event == "table.apply-on" {
72 - if m.MetaData.Module&1 == 0 { 73 + if m.MetaData.Module&bytelib.ModuleChartTemplate == 0 {
73 return true 74 return true
74 } 75 }
75 } 76 }
76 //拆解模块 方案启用、禁用 77 //拆解模块 方案启用、禁用
77 //计算模块 计算项和计算集启用、禁用 78 //计算模块 计算项和计算集启用、禁用
78 if m.Event == "table.query.set.update.status" { 79 if m.Event == "table.query.set.update.status" {
79 - if m.MetaData.Status&1 == 0 { 80 + if m.MetaData.Status&bytelib.ModuleChartTemplate == 0 {
80 return true 81 return true
81 } 82 }
82 } 83 }
@@ -26,6 +26,10 @@ service Core { @@ -26,6 +26,10 @@ service Core {
26 @doc "源数据表-数据" 26 @doc "源数据表-数据"
27 @handler searchTableData 27 @handler searchTableData
28 post /table/data (SearchTableDataRequest) returns (SearchTableDataResponse) 28 post /table/data (SearchTableDataRequest) returns (SearchTableDataResponse)
  29 +
  30 + @doc "源数据表-初始化同步表(首次使用本地存储执行一次)"
  31 + @handler syncTable
  32 + get /table/sync returns (SyncTableResponse)
29 } 33 }
30 34
31 @server( 35 @server(
@@ -101,4 +105,7 @@ type ( @@ -101,4 +105,7 @@ type (
101 CompanyId int64 `json:"companyId,string"` //公司ID 105 CompanyId int64 `json:"companyId,string"` //公司ID
102 ObjectId int `json:"objectId"` 106 ObjectId int `json:"objectId"`
103 } 107 }
  108 + SyncTableResponse {
  109 +
  110 + }
104 ) 111 )