作者 唐旭辉

调测

... ... @@ -154,19 +154,19 @@ func (r *Runer) Start(ctx context.Context) {
select {
case msg, more := <-r.Consumer.Messages():
if more {
fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s Timestamp:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp)
logs.Info("Partition:%d, Offset:%d, Key:%s, Value:%s Timestamp:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp)
r.Consumer.MarkOffset(msg, "") // mark message as processed
}
case err, more := <-r.Consumer.Errors():
if more {
fmt.Println("Kafka consumer error: %v", err.Error())
logs.Info("Kafka consumer error: %v", err.Error())
}
case ntf, more := <-r.Consumer.Notifications():
if more {
fmt.Println("Kafka consumer rebalance: %v", ntf)
logs.Info("Kafka consumer rebalance: %v", ntf)
}
case <-ctx.Done():
fmt.Errorf("Stop consumer server...")
logs.Info("Stop consumer server...")
r.Consumer.Close()
return
}
... ...