From eab3e6c41f8800ba8547833fe5dcfc53c134a313 Mon Sep 17 00:00:00 2001 From: tangxuhui <987654321@qq.com> Date: Mon, 19 Oct 2020 17:14:12 +0800 Subject: [PATCH] 调试 --- main.go | 7 ------- pkg/application/orderinfo/service/order_info.go | 1 + pkg/port/consumer/consumer.go | 5 ++++- pkg/port/consumer/produce/produce.go | 110 +++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------------------------------- 4 files changed, 60 insertions(+), 63 deletions(-) diff --git a/main.go b/main.go index c18a574..2b0ff15 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,6 @@ import ( "os/signal" "sync" "syscall" - "time" "github.com/astaxie/beego" "github.com/astaxie/beego/logs" @@ -14,7 +13,6 @@ import ( _ "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/log" _ "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/beego" "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer" - "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/produce" ) func main() { @@ -39,11 +37,6 @@ func main() { <-consumerRun.IsReady() logs.Info("Sarama consumer up and running!...") }() - go func() { - t := time.NewTimer(10 * time.Second) - <-t.C - produce.Producer() - }() for { select { case <-sigs: diff --git a/pkg/application/orderinfo/service/order_info.go b/pkg/application/orderinfo/service/order_info.go index 748ba21..843624a 100644 --- a/pkg/application/orderinfo/service/order_info.go +++ b/pkg/application/orderinfo/service/order_info.go @@ -922,6 +922,7 @@ func (service OrderInfoService) buildOrderBestshopInfoData(orderBase *domain.Ord "receivedDividends": orderBase.OrderCompute.PartnerBonusHas, "notReceivedDividend": orderBase.OrderCompute.PartnerBonusNot, "dividendSpending": orderBase.OrderCompute.PartnerBonusExpense, + "orderNumber": orderBase.OrderCode, } //订单中的商品 product := map[string]interface{}{ diff --git a/pkg/port/consumer/consumer.go b/pkg/port/consumer/consumer.go index f367933..546ec3a 100644 --- a/pkg/port/consumer/consumer.go +++ b/pkg/port/consumer/consumer.go @@ -97,7 +97,10 @@ func (r *Runer) InitConsumer() error { config := sarama.NewConfig() //config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin config.Consumer.Offsets.Initial = sarama.OffsetOldest - config.Version = sarama.V0_10_2_0 + config.Version = sarama.V0_10_2_1 + // config.Version = sarama.KafkaVersion{ + // version: [4]int{}, + // } consumerGroup, err := sarama.NewConsumerGroup(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, config) if err != nil { return err diff --git a/pkg/port/consumer/produce/produce.go b/pkg/port/consumer/produce/produce.go index a638754..8fc2d77 100644 --- a/pkg/port/consumer/produce/produce.go +++ b/pkg/port/consumer/produce/produce.go @@ -1,67 +1,67 @@ package produce -import ( - "fmt" - "strconv" - "time" +// import ( +// "fmt" +// "strconv" +// "time" - "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs" +// "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs" - "github.com/Shopify/sarama" - "github.com/astaxie/beego/logs" -) +// "github.com/Shopify/sarama" +// "github.com/astaxie/beego/logs" +// ) -var ( - producer sarama.SyncProducer -) +// var ( +// producer sarama.SyncProducer +// ) -func init() { +// func init() { - logs.Info("init kafka producer, it may take a few seconds to init the connection\n") - var err error - mqConfig := sarama.NewConfig() - mqConfig.Producer.Return.Successes = true - mqConfig.Version = sarama.V0_10_2_0 - if err = mqConfig.Validate(); err != nil { - msg := fmt.Sprintf("Kafka producer config invalidate. config: %v. err: %v", configs.Cfg, err) - logs.Info(msg) - panic(msg) - } +// logs.Info("init kafka producer, it may take a few seconds to init the connection\n") +// var err error +// mqConfig := sarama.NewConfig() +// mqConfig.Producer.Return.Successes = true +// mqConfig.Version = sarama.V0_10_2_1 +// if err = mqConfig.Validate(); err != nil { +// msg := fmt.Sprintf("Kafka producer config invalidate. config: %v. err: %v", configs.Cfg, err) +// logs.Info(msg) +// panic(msg) +// } - producer, err = sarama.NewSyncProducer(configs.Cfg.Servers, mqConfig) - if err != nil { - msg := fmt.Sprintf("Kafak producer create fail. err: %v", err) - logs.Info(msg) - panic(msg) - } +// producer, err = sarama.NewSyncProducer(configs.Cfg.Servers, mqConfig) +// if err != nil { +// msg := fmt.Sprintf("Kafak producer create fail. err: %v", err) +// logs.Info(msg) +// panic(msg) +// } -} +// } -func produce(topic string, key string, content string) error { - msg := &sarama.ProducerMessage{ - Topic: topic, - Key: sarama.StringEncoder(key), - Value: sarama.StringEncoder(content), - Timestamp: time.Now(), - } +// func produce(topic string, key string, content string) error { +// msg := &sarama.ProducerMessage{ +// Topic: topic, +// Key: sarama.StringEncoder(key), +// Value: sarama.StringEncoder(content), +// Timestamp: time.Now(), +// } - _, _, err := producer.SendMessage(msg) - if err != nil { - msg := fmt.Sprintf("Send Error topic: %v. key: %v. content: %v", topic, key, content) - logs.Info(msg) - return err - } - logs.Info("Send OK topic:%s key:%s value:%s\n", topic, key, content) - return nil -} +// _, _, err := producer.SendMessage(msg) +// if err != nil { +// msg := fmt.Sprintf("Send Error topic: %v. key: %v. content: %v", topic, key, content) +// logs.Info(msg) +// return err +// } +// logs.Info("Send OK topic:%s key:%s value:%s\n", topic, key, content) +// return nil +// } -func Producer() error { - key := strconv.FormatInt(time.Now().UTC().UnixNano(), 10) - value := "this is a new kafka message!" - err := produce("topic_test", key, value) - if err != nil { - logs.Info("producer err:%s \n", err) - return err - } - return nil -} +// func Producer() error { +// key := strconv.FormatInt(time.Now().UTC().UnixNano(), 10) +// value := "this is a new kafka message!" +// err := produce("topic_test", key, value) +// if err != nil { +// logs.Info("producer err:%s \n", err) +// return err +// } +// return nil +// } -- libgit2 0.24.0