正在显示
2 个修改的文件
包含
71 行增加
和
0 行删除
@@ -13,6 +13,7 @@ import ( | @@ -13,6 +13,7 @@ import ( | ||
13 | _ "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/log" | 13 | _ "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/log" |
14 | _ "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/beego" | 14 | _ "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/beego" |
15 | "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer" | 15 | "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer" |
16 | + "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/produce" | ||
16 | ) | 17 | ) |
17 | 18 | ||
18 | func main() { | 19 | func main() { |
@@ -37,6 +38,9 @@ func main() { | @@ -37,6 +38,9 @@ func main() { | ||
37 | <-consumerRun.IsReady() | 38 | <-consumerRun.IsReady() |
38 | logs.Info("Sarama consumer up and running!...") | 39 | logs.Info("Sarama consumer up and running!...") |
39 | }() | 40 | }() |
41 | + go func() { | ||
42 | + produce.Producer() | ||
43 | + }() | ||
40 | for { | 44 | for { |
41 | select { | 45 | select { |
42 | case <-sigs: | 46 | case <-sigs: |
pkg/port/consumer/produce/produce.go
0 → 100644
1 | +package produce | ||
2 | + | ||
3 | +import ( | ||
4 | + "fmt" | ||
5 | + "strconv" | ||
6 | + "time" | ||
7 | + | ||
8 | + "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs" | ||
9 | + | ||
10 | + "github.com/Shopify/sarama" | ||
11 | + "github.com/astaxie/beego/logs" | ||
12 | +) | ||
13 | + | ||
14 | +var ( | ||
15 | + producer sarama.SyncProducer | ||
16 | +) | ||
17 | + | ||
18 | +func init() { | ||
19 | + | ||
20 | + logs.Info("init kafka producer, it may take a few seconds to init the connection\n") | ||
21 | + var err error | ||
22 | + mqConfig := sarama.NewConfig() | ||
23 | + mqConfig.Producer.Return.Successes = true | ||
24 | + mqConfig.Version = sarama.V0_10_2_1 | ||
25 | + if err = mqConfig.Validate(); err != nil { | ||
26 | + msg := fmt.Sprintf("Kafka producer config invalidate. config: %v. err: %v", configs.Cfg, err) | ||
27 | + logs.Info(msg) | ||
28 | + panic(msg) | ||
29 | + } | ||
30 | + | ||
31 | + producer, err = sarama.NewSyncProducer(configs.Cfg.Servers, mqConfig) | ||
32 | + if err != nil { | ||
33 | + msg := fmt.Sprintf("Kafak producer create fail. err: %v", err) | ||
34 | + logs.Info(msg) | ||
35 | + panic(msg) | ||
36 | + } | ||
37 | + | ||
38 | +} | ||
39 | + | ||
40 | +func produce(topic string, key string, content string) error { | ||
41 | + msg := &sarama.ProducerMessage{ | ||
42 | + Topic: topic, | ||
43 | + Key: sarama.StringEncoder(key), | ||
44 | + Value: sarama.StringEncoder(content), | ||
45 | + Timestamp: time.Now(), | ||
46 | + } | ||
47 | + | ||
48 | + _, _, err := producer.SendMessage(msg) | ||
49 | + if err != nil { | ||
50 | + msg := fmt.Sprintf("Send Error topic: %v. key: %v. content: %v", topic, key, content) | ||
51 | + logs.Info(msg) | ||
52 | + return err | ||
53 | + } | ||
54 | + logs.Info("Send OK topic:%s key:%s value:%s\n", topic, key, content) | ||
55 | + return nil | ||
56 | +} | ||
57 | + | ||
58 | +func Producer() error { | ||
59 | + key := strconv.FormatInt(time.Now().UTC().UnixNano(), 10) | ||
60 | + value := "this is a new kafka message!" | ||
61 | + err := produce("topic_test", key, value) | ||
62 | + if err != nil { | ||
63 | + logs.Info("producer err:%s \n", err) | ||
64 | + return err | ||
65 | + } | ||
66 | + return nil | ||
67 | +} |
-
请 注册 或 登录 后发表评论