作者 yangfu

fix:增加日志,区块链消费组可配置

@@ -101,6 +101,8 @@ spec: @@ -101,6 +101,8 @@ spec:
101 value: "8082" 101 value: "8082"
102 - name: SERVICE_ENV 102 - name: SERVICE_ENV
103 value: "dev" 103 value: "dev"
  104 + - name: GROUP_UP_BLOCK_CHAIN
  105 + value: "allied_creation_message_dev"
104 - name: REDIS_HOST 106 - name: REDIS_HOST
105 valueFrom: 107 valueFrom:
106 configMapKeyRef: 108 configMapKeyRef:
@@ -105,6 +105,8 @@ spec: @@ -105,6 +105,8 @@ spec:
105 value: "8082" 105 value: "8082"
106 - name: SERVICE_ENV 106 - name: SERVICE_ENV
107 value: "test" 107 value: "test"
  108 + - name: GROUP_UP_BLOCK_CHAIN
  109 + value: "allied_creation_message_test"
108 - name: REDIS_HOST 110 - name: REDIS_HOST
109 valueFrom: 111 valueFrom:
110 configMapKeyRef: 112 configMapKeyRef:
@@ -9,6 +9,8 @@ var ( @@ -9,6 +9,8 @@ var (
9 TOPIC_LOG_STASH = "go_stash_dev" //"pushMessage" 9 TOPIC_LOG_STASH = "go_stash_dev" //"pushMessage"
10 // kafka topic up_block_chain 10 // kafka topic up_block_chain
11 TOPIC_UP_BLOCK_CHAIN = "allied_creation_message" 11 TOPIC_UP_BLOCK_CHAIN = "allied_creation_message"
  12 + // 区块链消息 消费组
  13 + GROUP_UP_BLOCK_CHAIN = "allied_creation_message"
12 // 是否启用日志收集 (本地不启用) 14 // 是否启用日志收集 (本地不启用)
13 ENABLE_KAFKA_LOG = false 15 ENABLE_KAFKA_LOG = false
14 ) 16 )
@@ -23,4 +25,10 @@ func init() { @@ -23,4 +25,10 @@ func init() {
23 if os.Getenv("ENABLE_KAFKA_LOG") != "" { 25 if os.Getenv("ENABLE_KAFKA_LOG") != "" {
24 ENABLE_KAFKA_LOG = true 26 ENABLE_KAFKA_LOG = true
25 } 27 }
  28 + if os.Getenv("TOPIC_UP_BLOCK_CHAIN") != "" {
  29 + TOPIC_UP_BLOCK_CHAIN = os.Getenv("TOPIC_UP_BLOCK_CHAIN")
  30 + }
  31 + if os.Getenv("GROUP_UP_BLOCK_CHAIN") != "" {
  32 + GROUP_UP_BLOCK_CHAIN = os.Getenv("GROUP_UP_BLOCK_CHAIN")
  33 + }
26 } 34 }
@@ -109,7 +109,7 @@ func TestBSNBlockChain_UpToChain(t *testing.T) { @@ -109,7 +109,7 @@ func TestBSNBlockChain_UpToChain(t *testing.T) {
109 PublicKey: pubKey, 109 PublicKey: pubKey,
110 PrivatePem: priK, 110 PrivatePem: priK,
111 } 111 }
112 - options := NewUpToChainOptions("table", "2", "149848948").WithDesc("desc") 112 + options := NewUpToChainOptions("table", "2", "149848948000").WithDesc("desc")
113 rsp, err := bc.UpToChain(options) 113 rsp, err := bc.UpToChain(options)
114 if err != nil { 114 if err != nil {
115 t.Fatal(err) 115 t.Fatal(err)
@@ -12,10 +12,10 @@ import ( @@ -12,10 +12,10 @@ import (
12 12
13 func SetUp() { 13 func SetUp() {
14 go func() { 14 go func() {
15 - q := kq.MustNewQueue(NewConfig(constant.TOPIC_UP_BLOCK_CHAIN, constant.TOPIC_UP_BLOCK_CHAIN, 2), kq.WithHandle(FilterTopicHandler("up_block_chain", UpToChainHandler))) 15 + q := kq.MustNewQueue(NewConfig(constant.TOPIC_UP_BLOCK_CHAIN, constant.GROUP_UP_BLOCK_CHAIN, 1), kq.WithHandle(FilterTopicHandler("up_block_chain", UpToChainHandler)))
16 defer func() { 16 defer func() {
17 q.Stop() 17 q.Stop()
18 - log.Logger.Info(fmt.Sprintf("goqueue:%v stop!", constant.TOPIC_UP_BLOCK_CHAIN)) 18 + log.Logger.Info(fmt.Sprintf("goqueue:%v stop!", constant.GROUP_UP_BLOCK_CHAIN))
19 }() 19 }()
20 q.Start() 20 q.Start()
21 }() 21 }()
@@ -38,7 +38,7 @@ func NewConfig(topic, group string, consumers int) kq.KqConf { @@ -38,7 +38,7 @@ func NewConfig(topic, group string, consumers int) kq.KqConf {
38 Offset: "first", 38 Offset: "first",
39 Conns: 1, 39 Conns: 1,
40 Consumers: consumers, 40 Consumers: consumers,
41 - Processors: 4, 41 + Processors: 1,
42 MinBytes: 10200, 42 MinBytes: 10200,
43 MaxBytes: 10485760, 43 MaxBytes: 10485760,
44 } 44 }
@@ -10,11 +10,11 @@ import ( @@ -10,11 +10,11 @@ import (
10 ) 10 )
11 11
12 func UpToChainHandler(k, v string) error { 12 func UpToChainHandler(k, v string) error {
13 - log.Logger.Debug(fmt.Sprintf("%s", v), map[string]interface{}{"handler": "UptoChain"})  
14 blockChainService := service.NewBlockChainService(nil) 13 blockChainService := service.NewBlockChainService(nil)
15 upChainCommand := &command.UpChainCommand{} 14 upChainCommand := &command.UpChainCommand{}
16 err := json.UnmarshalFromString(v, upChainCommand) 15 err := json.UnmarshalFromString(v, upChainCommand)
17 if err != nil { 16 if err != nil {
  17 + log.Logger.Error(err.Error(), map[string]interface{}{"info": "UpToChainHandler 解析json错误", "data": v})
18 return err 18 return err
19 } 19 }
20 _, err = blockChainService.UpChain(upChainCommand) 20 _, err = blockChainService.UpChain(upChainCommand)
@@ -26,9 +26,12 @@ func FilterTopicHandler(topic string, handler func(string, string) error) func(s @@ -26,9 +26,12 @@ func FilterTopicHandler(topic string, handler func(string, string) error) func(s
26 raw := &RawData{} 26 raw := &RawData{}
27 err := json.UnmarshalFromString(v, raw) 27 err := json.UnmarshalFromString(v, raw)
28 if err != nil { 28 if err != nil {
  29 + log.Logger.Error("【GoQueue】消息解析错误:"+err.Error(), map[string]interface{}{"info": "FilterTopicHandler 解析json错误", "data": v})
29 return err 30 return err
30 } 31 }
  32 + log.Logger.Debug(fmt.Sprintf("【GoQueue】收到消息 Topic:%s", raw.Topic), map[string]interface{}{"data": v})
31 if raw.Topic != topic { 33 if raw.Topic != topic {
  34 + log.Logger.Debug(fmt.Sprintf("【GoQueue】 topic not equal get:%v want:%v", raw.Topic, topic), map[string]interface{}{"data": v})
32 return nil 35 return nil
33 } 36 }
34 return handler(k, string(raw.Data)) 37 return handler(k, string(raw.Data))