produce.go
1.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// Package produce holds a Kafka producer sketch built on sarama's
// SyncProducer. In this file the entire implementation (imports, the init
// hook, and the produce/Producer functions) is commented out, so nothing
// here is compiled or exported.
package produce
// import (
// "fmt"
// "strconv"
// "time"
// "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs"
// "github.com/Shopify/sarama"
// "github.com/astaxie/beego/logs"
// )
// var (
// producer sarama.SyncProducer
// )
// func init() {
// logs.Info("init kafka producer, it may take a few seconds to init the connection\n")
// var err error
// mqConfig := sarama.NewConfig()
// mqConfig.Producer.Return.Successes = true
// mqConfig.Version = sarama.V0_10_2_1
// if err = mqConfig.Validate(); err != nil {
// msg := fmt.Sprintf("Kafka producer config invalid. config: %v. err: %v", configs.Cfg, err)
// logs.Info(msg)
// panic(msg)
// }
// producer, err = sarama.NewSyncProducer(configs.Cfg.Servers, mqConfig)
// if err != nil {
// msg := fmt.Sprintf("Kafka producer create fail. err: %v", err)
// logs.Info(msg)
// panic(msg)
// }
// }
// func produce(topic string, key string, content string) error {
// msg := &sarama.ProducerMessage{
// Topic: topic,
// Key: sarama.StringEncoder(key),
// Value: sarama.StringEncoder(content),
// Timestamp: time.Now(),
// }
// _, _, err := producer.SendMessage(msg)
// if err != nil {
// msg := fmt.Sprintf("Send Error topic: %v. key: %v. content: %v", topic, key, content)
// logs.Info(msg)
// return err
// }
// logs.Info("Send OK topic:%s key:%s value:%s\n", topic, key, content)
// return nil
// }
// func Producer() error {
// key := strconv.FormatInt(time.Now().UTC().UnixNano(), 10)
// value := "this is a new kafka message!"
// err := produce("topic_test", key, value)
// if err != nil {
// logs.Info("producer err:%s \n", err)
// return err
// }
// return nil
// }