package produce // import ( // "fmt" // "strconv" // "time" // "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs" // "github.com/Shopify/sarama" // "github.com/astaxie/beego/logs" // ) // var ( // producer sarama.SyncProducer // ) // func init() { // logs.Info("init kafka producer, it may take a few seconds to init the connection\n") // var err error // mqConfig := sarama.NewConfig() // mqConfig.Producer.Return.Successes = true // mqConfig.Version = sarama.V0_10_2_1 // if err = mqConfig.Validate(); err != nil { // msg := fmt.Sprintf("Kafka producer config invalidate. config: %v. err: %v", configs.Cfg, err) // logs.Info(msg) // panic(msg) // } // producer, err = sarama.NewSyncProducer(configs.Cfg.Servers, mqConfig) // if err != nil { // msg := fmt.Sprintf("Kafak producer create fail. err: %v", err) // logs.Info(msg) // panic(msg) // } // } // func produce(topic string, key string, content string) error { // msg := &sarama.ProducerMessage{ // Topic: topic, // Key: sarama.StringEncoder(key), // Value: sarama.StringEncoder(content), // Timestamp: time.Now(), // } // _, _, err := producer.SendMessage(msg) // if err != nil { // msg := fmt.Sprintf("Send Error topic: %v. key: %v. content: %v", topic, key, content) // logs.Info(msg) // return err // } // logs.Info("Send OK topic:%s key:%s value:%s\n", topic, key, content) // return nil // } // func Producer() error { // key := strconv.FormatInt(time.Now().UTC().UnixNano(), 10) // value := "this is a new kafka message!" // err := produce("topic_test", key, value) // if err != nil { // logs.Info("producer err:%s \n", err) // return err // } // return nil // }