produce.go 1.6 KB
package produce

import (
	"fmt"
	"strconv"
	"time"

	"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs"

	"github.com/Shopify/sarama"
	"github.com/astaxie/beego/logs"
)

// var (
// 	producer sarama.SyncProducer
// )

func produce(topic string, key string, content string) error {
	var (
		producer sarama.SyncProducer
	)
	logs.Info("init kafka producer, it may take a few seconds to init the connection\n")
	var err error
	mqConfig := sarama.NewConfig()
	mqConfig.Producer.Return.Successes = true
	mqConfig.Version = sarama.V0_10_2_1
	if err = mqConfig.Validate(); err != nil {
		msg := fmt.Sprintf("Kafka producer config invalidate. config: %v. err: %v", configs.Cfg, err)
		logs.Info(msg)
		panic(msg)
	}

	producer, err = sarama.NewSyncProducer(configs.Cfg.Servers, mqConfig)
	if err != nil {
		msg := fmt.Sprintf("Kafak producer create fail. err: %v", err)
		logs.Info(msg)
		panic(msg)
	}
	msg := &sarama.ProducerMessage{
		Topic:     topic,
		Key:       sarama.StringEncoder(key),
		Value:     sarama.StringEncoder(content),
		Timestamp: time.Now(),
	}

	_, _, err = producer.SendMessage(msg)
	if err != nil {
		msg := fmt.Sprintf("Send Error topic: %v. key: %v. content: %v", topic, key, content)
		logs.Info(msg)
		return err
	}
	logs.Info("Send OK topic:%s key:%s value:%s\n", topic, key, content)
	return nil
}

func Producer() error {
	key := strconv.FormatInt(time.Now().UTC().UnixNano(), 10)
	value := "this is a new kafka message123456!"
	err := produce("topic_test", key, value)
	if err != nil {
		logs.Info("producer err:%s \n", err)
		return err
	}
	return nil
}