作者 yangfu

表同步kafka推送

... ... @@ -13,6 +13,9 @@ ALLIED_CREATION_USER_HOST = http://allied-creation-user-dev.fjmaimaimai.com
BYTE_CORE_HOST = http://47.97.5.102:8303
METADATA_BASTION_HOST = http://106.75.231.90:9999
KAFKA_HOST =47.97.5.102:9092
#192.168.100.35:9092
STARROCKS_DB_NAME = character_library
STARROCKS_USER = root
STARROCKS_PASSWORD = eagle1010
... ...
... ... @@ -93,8 +93,8 @@ spec:
value: "1"
- name: ERROR_BASE_CODE_MULTIPLE
value: "2000"
- name: ENABLE_KAFKA_LOG
value: "false"
- name: KAFKA_HOST
value: "47.97.5.102:9092"
- name: HTTP_PORT
value: "8082"
- name: SERVICE_ENV
... ...
... ... @@ -46,6 +46,7 @@ require (
github.com/yudai/gojsondiff v1.0.0 // indirect
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect
github.com/yudai/pp v2.0.1+incompatible // indirect
github.com/zeromicro/go-queue v1.1.6
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
)
... ...
... ... @@ -9,6 +9,7 @@ import (
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/constant"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/api/digitalLib"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/broker"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/domainService"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/log"
"math"
... ... @@ -30,79 +31,42 @@ func (tableEventService *TableEventService) DigitalPlatformEventSubscribe(ctx *d
var (
dataChanged = true
structChanged = true
tableType string
ok bool
tableId int
)
data := cmd.EventTable
tableId := 0
switch data.Type {
case domain.TableDataImportEvent, domain.TableDataEditEvent, domain.TableDeleteEvent, domain.TableStructEditEvent:
tableId = data.Table.TableId
case domain.QuerySetUpdateEvent, domain.QuerySetUpdateRenameEvent:
tableId = data.QuerySet.QuerySetInfo.BindTableId
if data.QuerySet.Status != domain.StatusOn {
return nil, nil
}
if !domain.AssertTableType(data.QuerySet.Type, domain.SchemaTable, domain.CalculateItem, domain.CalculateSet) {
event := cmd.EventTable
if tableId = resolveTableId(event); tableId == 0 {
return nil, nil
}
case domain.QuerySetUpdateStatusEvent:
tableId = data.QuerySet.QuerySetInfo.BindTableId
if !domain.AssertTableType(data.QuerySet.Type, domain.SchemaTable, domain.CalculateItem, domain.CalculateSet) {
return nil, nil
}
case domain.TableApplyOnEvent:
tableId = data.Table.TableId
if event.Type == domain.TableApplyOnEvent {
dataChanged = false
case domain.QuerySetDeleteEvent:
tableId = data.Table.TableId
}
if tableId == 0 {
return nil, nil
}
var notifyData = NotifyData{
var notifyData = &NotifyData{
DataChanged: dataChanged,
StructChanged: structChanged,
TableId: tableId,
Event: data.Type.ToString(),
Metadata: cmd.EventTable.Metadata,
Event: event.Type.ToString(),
Metadata: event.Metadata,
}
// tableId 相关联的
// 表类型
tableRepository, table, _ := factory.FastPgTable(transactionContext, tableId)
if table == nil && data.Table != nil {
table = data.Table
if table == nil && event.Table != nil {
table = event.Table
notifyData.CompanyId = table.Context.CompanyId
}
if tableType == "" && table != nil {
tableType = table.TableType
}
if tableType == "" && data.QuerySet != nil {
tableType = data.QuerySet.Type
}
if table != nil {
notifyData.TableType = domain.EnumsDescription(domain.ObjectTypeMap, tableType)
switch domain.TableType(tableType) {
case domain.MainTable, domain.SubTable, domain.SideTable:
notifyData.ObjectType = "导入模块"
case domain.SchemaTable, domain.SubProcessTable, domain.CalculateTable:
notifyData.ObjectType = "拆解模块"
case domain.CalculateItem, domain.CalculateSet:
notifyData.ObjectType = "计算模块"
}
}
notifyData.SetType(event, table)
_, tables, err := tableRepository.Find(map[string]interface{}{"context": data.Context, "tableTypesNotIn": []string{domain.TemporaryTable.ToString(), domain.ExcelTable.ToString()}})
// 依赖的表 \ 依赖的查询集合
_, tables, err := tableRepository.Find(map[string]interface{}{"context": event.Context, "tableTypesNotIn": []string{domain.TemporaryTable.ToString(), domain.ExcelTable.ToString()}})
if err != nil {
return nil, err
}
tableDependencyService, _ := domainService.NewTableDependencyService(transactionContext.(*pgTransaction.TransactionContext))
tableDependTree := tableDependencyService.TableDependTree(tables, tableId)
tree := tableDependTree.Tree
querySetRepository, _, _ := factory.FastPgQuerySet(transactionContext, 0)
var mapTableQuerySet = make(map[int]*domain.QuerySet)
if len(tree) > 0 { // && cmd.EventTable.QuerySet != nil
if len(tree) > 0 {
_, querySets, _ := querySetRepository.Find(map[string]interface{}{
"types": []string{domain.SchemaTable.ToString(), domain.CalculateItem.ToString(), domain.CalculateSet.ToString()},
"bindTableIds": tree,
... ... @@ -113,8 +77,9 @@ func (tableEventService *TableEventService) DigitalPlatformEventSubscribe(ctx *d
}
}
// 过滤出需要推送的表
for i := range tree {
table, ok := tableDependencyService.TableMap[tree[i]]
table, ok = tableDependencyService.TableMap[tree[i]]
if !ok {
continue
}
... ... @@ -123,46 +88,103 @@ func (tableEventService *TableEventService) DigitalPlatformEventSubscribe(ctx *d
}
switch table.TableType {
case domain.MainTable.ToString(), domain.SubTable.ToString(), domain.SideTable.ToString():
if table.TableInfo != nil && table.TableInfo.ApplyOnModule&domain.ModuleDigitalCenter == 0 {
if table.TableInfo != nil {
applyOn := domain.ModuleDigitalCenter | domain.ModuleChartTemplate
if table.TableInfo.ApplyOnModule&applyOn == 0 {
continue
}
}
break
case domain.SubProcessTable.ToString(), domain.CalculateTable.ToString():
continue
case domain.SchemaTable.ToString(), domain.CalculateSet.ToString(), domain.CalculateItem.ToString():
if querySet, ok := mapTableQuerySet[tree[i]]; !ok {
var querySet *domain.QuerySet
if querySet, ok = mapTableQuerySet[tree[i]]; !ok {
continue
} else {
}
// 不是当前的查询集。且状态为关闭的都补推送
if querySet.Status != domain.StatusOn && querySet.QuerySetInfo.BindTableId != 0 && querySet.QuerySetInfo.BindTableId != tableId {
continue
}
}
}
notifyData.TableAffectedList = append(notifyData.TableAffectedList, tree[i])
}
found := false
for _, id := range notifyData.TableAffectedList {
if id == tableId {
found = true
// 包含自己
if !exist(notifyData.TableAffectedList, tableId) {
notifyData.TableAffectedList = append(notifyData.TableAffectedList, tableId)
}
// 通过消息队列发送
if err = tableEventService.send(notifyData, tableEventService.sendBroker); err != nil {
return nil, err
}
if !found {
notifyData.TableAffectedList = append(notifyData.TableAffectedList, tableId)
if err = transactionContext.CommitTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
lib := digitalLib.NewDigitalLib(constant.DIGITAL_SERVER_HOST)
if _, err = lib.SyncNotice(digitalLib.RequestSyncNotice{Body: notifyData}); err != nil {
return nil, nil
}
func (tableEventService *TableEventService) send(notifyData *NotifyData, sendFunc func(*NotifyData) error) error {
var err error
if err = sendFunc(notifyData); err != nil {
log.Logger.Error(fmt.Sprintf("通知数控失败:%s", err.Error()))
if t, ok := notifyData.Retry(); ok {
tableEventService.TimingWheel.SetTimer(notifyData.Key(), &notifyData, t)
log.Logger.Debug(fmt.Sprintf("通知数控重试 key:%s wait:%vs", notifyData.Key(), t.Seconds()))
}
}
return err
}
func (tableEventService *TableEventService) sendHttp(notifyData *NotifyData) error {
var err error
lib := digitalLib.NewDigitalLib(constant.DIGITAL_SERVER_HOST)
if _, err = lib.SyncNotice(digitalLib.RequestSyncNotice{Body: notifyData}); err != nil {
return err
}
if err := transactionContext.CommitTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
return nil
}
func (tableEventService *TableEventService) sendBroker(notifyData *NotifyData) error {
var err error
if err = broker.Push(constant.KAFKA_HOST, constant.TOPIC_TABLE_DATA_SYNC, notifyData); err != nil {
return err
}
return nil, nil
return nil
}
func resolveTableId(event *domain.EventTable) (tableId int) {
switch event.Type {
case domain.TableDataImportEvent, domain.TableDataEditEvent, domain.TableDeleteEvent, domain.TableStructEditEvent:
tableId = event.Table.TableId
case domain.QuerySetUpdateEvent, domain.QuerySetUpdateRenameEvent:
if event.QuerySet.Status != domain.StatusOn {
return
}
if !domain.AssertTableType(event.QuerySet.Type, domain.SchemaTable, domain.CalculateItem, domain.CalculateSet) {
return
}
tableId = event.QuerySet.QuerySetInfo.BindTableId
case domain.QuerySetUpdateStatusEvent:
if !domain.AssertTableType(event.QuerySet.Type, domain.SchemaTable, domain.CalculateItem, domain.CalculateSet) {
return
}
tableId = event.QuerySet.QuerySetInfo.BindTableId
case domain.TableApplyOnEvent:
tableId = event.Table.TableId
case domain.QuerySetDeleteEvent:
tableId = event.Table.TableId
}
return tableId
}
func exist(idList []int, target int) bool {
found := false
for _, id := range idList {
if id == target {
found = true
}
}
return found
}
type NotifyData struct {
... ... @@ -178,6 +200,28 @@ type NotifyData struct {
sendRetry int
}
func (n *NotifyData) SetType(event *domain.EventTable, table *domain.Table) {
var tableType string
// 表类型
if tableType == "" && table != nil {
tableType = table.TableType
}
if tableType == "" && event.QuerySet != nil {
tableType = event.QuerySet.Type
}
n.TableType = domain.EnumsDescription(domain.ObjectTypeMap, tableType)
if table != nil {
switch domain.TableType(tableType) {
case domain.MainTable, domain.SubTable, domain.SideTable:
n.ObjectType = "导入模块"
case domain.SchemaTable, domain.SubProcessTable, domain.CalculateTable:
n.ObjectType = "拆解模块"
case domain.CalculateItem, domain.CalculateSet:
n.ObjectType = "计算模块"
}
}
}
func (n *NotifyData) Key() string {
return fmt.Sprintf("delay:notify:table:%d", n.TableId)
}
... ...
package constant
var (
// kafka 地址
KAFKA_HOST = "192.168.0.250:9092,192.168.0.251:9092,192.168.0.252:9092" //"106.75.231.90:9092"
// kafka topic log stash
TOPIC_LOG_STASH = "go_stash_dev" //"pushMessage"
// kafka topic up_block_chain
TOPIC_UP_BLOCK_CHAIN = "allied_creation_message"
// 是否启用日志收集 (本地不启用)
KAFKA_HOST = "192.168.0.250:9092,192.168.0.251:9092,192.168.0.252:9092"
TOPIC_TABLE_DATA_SYNC = "allied_creation_metadata_table_sync_notice"
TOPIC_LOG_STASH = "go_stash_dev"
ENABLE_KAFKA_LOG = false
)
func init() {
KAFKA_HOST = Configurator.DefaultString("KAFKA_HOST", KAFKA_HOST)
TOPIC_TABLE_DATA_SYNC = Configurator.DefaultString("TOPIC_TABLE_DATA_SYNC", TOPIC_TABLE_DATA_SYNC)
ENABLE_KAFKA_LOG = Configurator.DefaultBool("ENABLE_KAFKA_LOG", ENABLE_KAFKA_LOG)
}
... ...
package broker
import (
"fmt"
"github.com/linmadan/egglib-go/utils/json"
"github.com/zeromicro/go-queue/kq"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/log"
"strings"
"sync"
)
var defaultClientMap = make(map[string]*kq.Pusher)
var locker sync.RWMutex
type client struct {
pusher *kq.Pusher
}
func Register(host, topic string) *kq.Pusher {
if v, ok := defaultClientMap[topic]; ok {
return v
}
locker.Lock()
defer locker.Unlock()
pusher := kq.NewPusher(strings.Split(host, ","), topic)
defaultClientMap[topic] = pusher
return pusher
}
// Push 异步推送到队列
func Push(host string, topic string, o interface{}) error {
var pusher *kq.Pusher
if pusher = Register(host, topic); pusher == nil {
return fmt.Errorf("pusher client [%v] not found ", topic)
}
if err := pusher.Push(json.MarshalToString(o)); err != nil {
log.Logger.Debug(fmt.Sprintf("topic:%v error:%v", topic, err.Error()), map[string]interface{}{"message": o})
return err
} else {
log.Logger.Debug(fmt.Sprintf("topic:%v 发送成功", topic), map[string]interface{}{"message": o})
}
return nil
}
... ...