|
|
package produce
|
|
|
|
|
|
import (
|
|
|
"fmt"
|
|
|
"strconv"
|
|
|
"time"
|
|
|
|
|
|
"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs"
|
|
|
|
|
|
"github.com/Shopify/sarama"
|
|
|
"github.com/astaxie/beego/logs"
|
|
|
)
|
|
|
|
|
|
// var (
|
|
|
// producer sarama.SyncProducer
|
|
|
// )
|
|
|
|
|
|
func produce(topic string, key string, content string) error {
|
|
|
var (
|
|
|
producer sarama.SyncProducer
|
|
|
)
|
|
|
logs.Info("init kafka producer, it may take a few seconds to init the connection\n")
|
|
|
var err error
|
|
|
mqConfig := sarama.NewConfig()
|
|
|
mqConfig.Producer.Return.Successes = true
|
|
|
mqConfig.Version = sarama.V0_10_2_1
|
|
|
if err = mqConfig.Validate(); err != nil {
|
|
|
msg := fmt.Sprintf("Kafka producer config invalidate. config: %v. err: %v", configs.Cfg, err)
|
|
|
logs.Info(msg)
|
|
|
panic(msg)
|
|
|
}
|
|
|
|
|
|
producer, err = sarama.NewSyncProducer(configs.Cfg.Servers, mqConfig)
|
|
|
if err != nil {
|
|
|
msg := fmt.Sprintf("Kafak producer create fail. err: %v", err)
|
|
|
logs.Info(msg)
|
|
|
panic(msg)
|
|
|
}
|
|
|
msg := &sarama.ProducerMessage{
|
|
|
Topic: topic,
|
|
|
Key: sarama.StringEncoder(key),
|
|
|
Value: sarama.StringEncoder(content),
|
|
|
Timestamp: time.Now(),
|
|
|
}
|
|
|
|
|
|
_, _, err = producer.SendMessage(msg)
|
|
|
if err != nil {
|
|
|
msg := fmt.Sprintf("Send Error topic: %v. key: %v. content: %v", topic, key, content)
|
|
|
logs.Info(msg)
|
|
|
return err
|
|
|
}
|
|
|
logs.Info("Send OK topic:%s key:%s value:%s\n", topic, key, content)
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
func Producer() error {
|
|
|
key := strconv.FormatInt(time.Now().UTC().UnixNano(), 10)
|
|
|
value := "this is a new kafka message123456!"
|
|
|
err := produce("topic_test", key, value)
|
|
|
if err != nil {
|
|
|
logs.Info("producer err:%s \n", err)
|
|
|
return err
|
|
|
}
|
|
|
return nil
|
|
|
} |