作者 唐旭辉

更新

... ... @@ -41,7 +41,7 @@ func main() {
select {
case <-sigs:
cancel()
wg.wait()
wg.Wait()
return
default:
}
... ...
... ... @@ -3,6 +3,7 @@ package consumer
import (
"context"
"errors"
"time"
"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs"
... ... @@ -110,12 +111,15 @@ func (r *Runer) Start(ctx context.Context) {
for {
select {
case <-ctx.Done():
r.consumerGroup.Close()
logs.Warning("ctx cancel;consumerGroup.Close()")
err := r.consumerGroup.Close()
logs.Warning("ctx cancel;consumerGroup.Close();err:%s", err)
return
default:
if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {
logs.Error("consumerGroup err:%s \n", err)
//等待重试
timer := time.NewTimer(5 * time.Second)
<-timer.C
}
r.msgConsumer.ready = make(chan struct{})
}
... ...