作者 唐旭辉

调试

@@ -6,6 +6,7 @@ import ( @@ -6,6 +6,7 @@ import (
6 "os/signal" 6 "os/signal"
7 "sync" 7 "sync"
8 "syscall" 8 "syscall"
  9 + "time"
9 10
10 "github.com/astaxie/beego" 11 "github.com/astaxie/beego"
11 "github.com/astaxie/beego/logs" 12 "github.com/astaxie/beego/logs"
@@ -13,6 +14,7 @@ import ( @@ -13,6 +14,7 @@ import (
13 _ "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/log" 14 _ "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/log"
14 _ "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/beego" 15 _ "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/beego"
15 "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer" 16 "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer"
  17 + "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/produce"
16 ) 18 )
17 19
18 func main() { 20 func main() {
@@ -37,6 +39,12 @@ func main() { @@ -37,6 +39,12 @@ func main() {
37 <-consumerRun.IsReady() 39 <-consumerRun.IsReady()
38 logs.Info("Sarama consumer up and running!...") 40 logs.Info("Sarama consumer up and running!...")
39 }() 41 }()
  42 + go func() {
  43 + t := time.NewTimer(7 * time.Second)
  44 + <-t.C
  45 + err := produce.Producer()
  46 + logs.Info("err:%s", err)
  47 + }()
40 for { 48 for {
41 select { 49 select {
42 case <-sigs: 50 case <-sigs:
  1 +package handles
  2 +
  3 +import (
  4 + "encoding/json"
  5 + "fmt"
  6 +
  7 + "github.com/Shopify/sarama"
  8 + "github.com/astaxie/beego/logs"
  9 + syncOrderCmd "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/application/syncOrder/command"
  10 + syncOrderSrv "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/application/syncOrder/service"
  11 +)
  12 +
  13 +type DataFromMessage struct {
  14 + Module string `json:"module"`
  15 + Action string `json:"action"`
  16 + Data json.RawMessage `json:"data"`
  17 +}
  18 +
  19 +func DataFromXiangMi(message *sarama.ConsumerMessage) error {
  20 + logs.Info("Done Message claimed: timestamp = %v, topic = %s offset = %v value = %v \n",
  21 + message.Timestamp, message.Topic, message.Offset, string(message.Value))
  22 + var (
  23 + msgData DataFromMessage
  24 + err error
  25 + )
  26 + err = json.Unmarshal(message.Value, &msgData)
  27 + if err != nil {
  28 + return fmt.Errorf("[Consumer][SyncBestshopOrder] 解析kafka数据失败;%s", err)
  29 + }
  30 + err = SyncBestshopOrder(msgData.Data)
  31 + if err != nil {
  32 + e := fmt.Errorf("[Consumer][SyncBestshopOrder] %s", err)
  33 + return e
  34 + }
  35 + return nil
  36 +}
  37 +
  38 +//SyncBestshopOrder 同步
  39 +func SyncBestshopOrder(data []byte) error {
  40 +
  41 + var (
  42 + cmd syncOrderCmd.CreateOrderFromBestshop
  43 + err error
  44 + )
  45 + err = json.Unmarshal(data, &cmd)
  46 + if err != nil {
  47 + return fmt.Errorf("[Consumer][SyncBestshopOrder] 解析kafka数据失败;%s", err)
  48 + }
  49 + if cmd.PartnerId <= 0 {
  50 + logs.Info("[Consumer][SyncBestshopOrder] PartnerId<=0 ,不处理消息")
  51 + return nil
  52 + }
  53 + srv := syncOrderSrv.NewOrderInfoService(nil)
  54 + err = srv.SyncOrderFromBestshop(cmd)
  55 + if err != nil {
  56 + e := fmt.Errorf("[Consumer][SyncBestshopOrder] %s", err)
  57 + return e
  58 + }
  59 + return err
  60 +}
1 package produce 1 package produce
2 2
3 -// import (  
4 -// "fmt"  
5 -// "strconv"  
6 -// "time" 3 +import (
  4 + "fmt"
  5 + "strconv"
  6 + "time"
7 7
8 -// "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs" 8 + "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs"
9 9
10 -// "github.com/Shopify/sarama"  
11 -// "github.com/astaxie/beego/logs"  
12 -// ) 10 + "github.com/Shopify/sarama"
  11 + "github.com/astaxie/beego/logs"
  12 +)
13 13
14 -// var (  
15 -// producer sarama.SyncProducer  
16 -// ) 14 +var (
  15 + producer sarama.SyncProducer
  16 +)
17 17
18 -// func init() { 18 +func init() {
19 19
20 -// logs.Info("init kafka producer, it may take a few seconds to init the connection\n")  
21 -// var err error  
22 -// mqConfig := sarama.NewConfig()  
23 -// mqConfig.Producer.Return.Successes = true  
24 -// mqConfig.Version = sarama.V0_10_2_1  
25 -// if err = mqConfig.Validate(); err != nil {  
26 -// msg := fmt.Sprintf("Kafka producer config invalidate. config: %v. err: %v", configs.Cfg, err)  
27 -// logs.Info(msg)  
28 -// panic(msg)  
29 -// } 20 + logs.Info("init kafka producer, it may take a few seconds to init the connection\n")
  21 + var err error
  22 + mqConfig := sarama.NewConfig()
  23 + mqConfig.Producer.Return.Successes = true
  24 + mqConfig.Version = sarama.V0_10_2_1
  25 + if err = mqConfig.Validate(); err != nil {
  26 + msg := fmt.Sprintf("Kafka producer config invalidate. config: %v. err: %v", configs.Cfg, err)
  27 + logs.Info(msg)
  28 + panic(msg)
  29 + }
30 30
31 -// producer, err = sarama.NewSyncProducer(configs.Cfg.Servers, mqConfig)  
32 -// if err != nil {  
33 -// msg := fmt.Sprintf("Kafak producer create fail. err: %v", err)  
34 -// logs.Info(msg)  
35 -// panic(msg)  
36 -// } 31 + producer, err = sarama.NewSyncProducer(configs.Cfg.Servers, mqConfig)
  32 + if err != nil {
  33 + msg := fmt.Sprintf("Kafak producer create fail. err: %v", err)
  34 + logs.Info(msg)
  35 + panic(msg)
  36 + }
37 37
38 -// } 38 +}
39 39
40 -// func produce(topic string, key string, content string) error {  
41 -// msg := &sarama.ProducerMessage{  
42 -// Topic: topic,  
43 -// Key: sarama.StringEncoder(key),  
44 -// Value: sarama.StringEncoder(content),  
45 -// Timestamp: time.Now(),  
46 -// } 40 +func produce(topic string, key string, content string) error {
  41 + msg := &sarama.ProducerMessage{
  42 + Topic: topic,
  43 + Key: sarama.StringEncoder(key),
  44 + Value: sarama.StringEncoder(content),
  45 + Timestamp: time.Now(),
  46 + }
47 47
48 -// _, _, err := producer.SendMessage(msg)  
49 -// if err != nil {  
50 -// msg := fmt.Sprintf("Send Error topic: %v. key: %v. content: %v", topic, key, content)  
51 -// logs.Info(msg)  
52 -// return err  
53 -// }  
54 -// logs.Info("Send OK topic:%s key:%s value:%s\n", topic, key, content)  
55 -// return nil  
56 -// } 48 + _, _, err := producer.SendMessage(msg)
  49 + if err != nil {
  50 + msg := fmt.Sprintf("Send Error topic: %v. key: %v. content: %v", topic, key, content)
  51 + logs.Info(msg)
  52 + return err
  53 + }
  54 + logs.Info("Send OK topic:%s key:%s value:%s\n", topic, key, content)
  55 + return nil
  56 +}
57 57
58 -// func Producer() error {  
59 -// key := strconv.FormatInt(time.Now().UTC().UnixNano(), 10)  
60 -// value := "this is a new kafka message!"  
61 -// err := produce("topic_test", key, value)  
62 -// if err != nil {  
63 -// logs.Info("producer err:%s \n", err)  
64 -// return err  
65 -// }  
66 -// return nil  
67 -// } 58 +func Producer() error {
  59 + key := strconv.FormatInt(time.Now().UTC().UnixNano(), 10)
  60 + value := "this is a new kafka message123456!"
  61 + err := produce("topic_test", key, value)
  62 + if err != nil {
  63 + logs.Info("producer err:%s \n", err)
  64 + return err
  65 + }
  66 + return nil
  67 +}
@@ -3,6 +3,7 @@ package consumer @@ -3,6 +3,7 @@ package consumer
3 import ( 3 import (
4 "github.com/Shopify/sarama" 4 "github.com/Shopify/sarama"
5 "github.com/astaxie/beego/logs" 5 "github.com/astaxie/beego/logs"
  6 + "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/handles"
6 ) 7 )
7 8
8 //TopicHandle 处理kafka中得消息 9 //TopicHandle 处理kafka中得消息
@@ -18,5 +19,5 @@ var TopicHandleRouters = map[string]TopicHandle{ @@ -18,5 +19,5 @@ var TopicHandleRouters = map[string]TopicHandle{
18 message.Timestamp, message.Topic, message.Offset, string(message.Value)) 19 message.Timestamp, message.Topic, message.Offset, string(message.Value))
19 return nil 20 return nil
20 }, 21 },
21 - "xiangmi-orders": SyncBestshopOrder, 22 + "xiangmi-orders": handles.DataFromXiangMi,
22 } 23 }