作者 唐旭辉

提交

... ... @@ -6,6 +6,7 @@ import (
"os/signal"
"sync"
"syscall"
"time"
"github.com/astaxie/beego"
"github.com/astaxie/beego/logs"
... ... @@ -39,6 +40,8 @@ func main() {
logs.Info("Sarama consumer up and running!...")
}()
go func() {
t := time.NewTimer(10 * time.Second)
<-t.C
produce.Producer()
}()
for {
... ...
... ... @@ -14,7 +14,7 @@ var KafkaCfg KafkaConfig
func init() {
KafkaCfg = KafkaConfig{
Servers: []string{"114.55.200.59:30092"},
Servers: []string{"192.168.190.136:9092"},
ConsumerId: "partnermg_local",
}
if os.Getenv("KAFKA_HOST") != "" {
... ...
... ... @@ -97,7 +97,7 @@ func (r *Runer) InitConsumer() error {
config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Offsets.Initial = sarama.OffsetNewest
config.Version = sarama.V0_11_0_2
config.Version = sarama.V0_10_2_1
consumerGroup, err := sarama.NewConsumerGroup(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, config)
if err != nil {
return err
... ... @@ -119,15 +119,15 @@ func (r *Runer) Start(ctx context.Context) {
r.consumerGroup.Close()
return
default:
}
if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {
logs.Error("consumerGroup err:%s \n", err)
//等待重试
timer := time.NewTimer(5 * time.Second)
<-timer.C
if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {
logs.Error("consumerGroup err:%s \n", err)
//等待重试
timer := time.NewTimer(5 * time.Second)
<-timer.C
}
r.msgConsumer.ready = make(chan struct{})
}
r.msgConsumer.ready = make(chan struct{})
}
}
func (r *Runer) IsReady() <-chan struct{} {
... ...