作者 yangfu
... ... @@ -101,6 +101,8 @@ spec:
value: "8082"
- name: SERVICE_ENV
value: "dev"
- name: GROUP_UP_BLOCK_CHAIN
value: "allied_creation_message_dev"
- name: REDIS_HOST
valueFrom:
configMapKeyRef:
... ...
... ... @@ -105,6 +105,8 @@ spec:
value: "8082"
- name: SERVICE_ENV
value: "test"
- name: GROUP_UP_BLOCK_CHAIN
value: "allied_creation_message_test"
- name: REDIS_HOST
valueFrom:
configMapKeyRef:
... ...
... ... @@ -9,6 +9,8 @@ var (
TOPIC_LOG_STASH = "go_stash_dev" //"pushMessage"
// kafka topic up_block_chain
TOPIC_UP_BLOCK_CHAIN = "allied_creation_message"
// 区块链消息 消费组
GROUP_UP_BLOCK_CHAIN = "allied_creation_message"
// 是否启用日志收集 (本地不启用)
ENABLE_KAFKA_LOG = false
)
... ... @@ -23,4 +25,10 @@ func init() {
if os.Getenv("ENABLE_KAFKA_LOG") != "" {
ENABLE_KAFKA_LOG = true
}
if os.Getenv("TOPIC_UP_BLOCK_CHAIN") != "" {
TOPIC_UP_BLOCK_CHAIN = os.Getenv("TOPIC_UP_BLOCK_CHAIN")
}
if os.Getenv("GROUP_UP_BLOCK_CHAIN") != "" {
GROUP_UP_BLOCK_CHAIN = os.Getenv("GROUP_UP_BLOCK_CHAIN")
}
}
... ...
... ... @@ -109,7 +109,7 @@ func TestBSNBlockChain_UpToChain(t *testing.T) {
PublicKey: pubKey,
PrivatePem: priK,
}
options := NewUpToChainOptions("table", "2", "149848948").WithDesc("desc")
options := NewUpToChainOptions("table", "2", "149848948000").WithDesc("desc")
rsp, err := bc.UpToChain(options)
if err != nil {
t.Fatal(err)
... ...
... ... @@ -12,10 +12,10 @@ import (
func SetUp() {
go func() {
q := kq.MustNewQueue(NewConfig(constant.TOPIC_UP_BLOCK_CHAIN, constant.TOPIC_UP_BLOCK_CHAIN, 2), kq.WithHandle(FilterTopicHandler("up_block_chain", UpToChainHandler)))
q := kq.MustNewQueue(NewConfig(constant.TOPIC_UP_BLOCK_CHAIN, constant.GROUP_UP_BLOCK_CHAIN, 1), kq.WithHandle(FilterTopicHandler("up_block_chain", UpToChainHandler)))
defer func() {
q.Stop()
log.Logger.Info(fmt.Sprintf("goqueue:%v stop!", constant.TOPIC_UP_BLOCK_CHAIN))
log.Logger.Info(fmt.Sprintf("goqueue:%v stop!", constant.GROUP_UP_BLOCK_CHAIN))
}()
q.Start()
}()
... ... @@ -38,7 +38,7 @@ func NewConfig(topic, group string, consumers int) kq.KqConf {
Offset: "first",
Conns: 1,
Consumers: consumers,
Processors: 4,
Processors: 1,
MinBytes: 10200,
MaxBytes: 10485760,
}
... ...
... ... @@ -10,11 +10,11 @@ import (
)
func UpToChainHandler(k, v string) error {
log.Logger.Debug(fmt.Sprintf("%s", v), map[string]interface{}{"handler": "UptoChain"})
blockChainService := service.NewBlockChainService(nil)
upChainCommand := &command.UpChainCommand{}
err := json.UnmarshalFromString(v, upChainCommand)
if err != nil {
log.Logger.Error(err.Error(), map[string]interface{}{"info": "UpToChainHandler 解析json错误", "data": v})
return err
}
_, err = blockChainService.UpChain(upChainCommand)
... ... @@ -26,9 +26,12 @@ func FilterTopicHandler(topic string, handler func(string, string) error) func(s
raw := &RawData{}
err := json.UnmarshalFromString(v, raw)
if err != nil {
log.Logger.Error("【GoQueue】消息解析错误:"+err.Error(), map[string]interface{}{"info": "FilterTopicHandler 解析json错误", "data": v})
return err
}
log.Logger.Debug(fmt.Sprintf("【GoQueue】收到消息 Topic:%s", raw.Topic), map[string]interface{}{"data": v})
if raw.Topic != topic {
log.Logger.Debug(fmt.Sprintf("【GoQueue】 topic not equal get:%v want:%v", raw.Topic, topic), map[string]interface{}{"data": v})
return nil
}
return handler(k, string(raw.Data))
... ...