作者 唐旭辉

kafka 消息提交当前位移

@@ -81,7 +81,7 @@ spec: @@ -81,7 +81,7 @@ spec:
81 - name: KAFKA_HOST 81 - name: KAFKA_HOST
82 value: "" 82 value: ""
83 - name: KAFKA_CONSUMER_ID 83 - name: KAFKA_CONSUMER_ID
84 - value: "partnermg_test" 84 + value: "partnermg_prd"
85 volumes: 85 volumes:
86 - name: accesslogs 86 - name: accesslogs
87 emptyDir: {} 87 emptyDir: {}
@@ -61,9 +61,9 @@ func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession, @@ -61,9 +61,9 @@ func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession,
61 } 61 }
62 if err = topicHandle(message); err != nil { 62 if err = topicHandle(message); err != nil {
63 logs.Error("Message claimed: kafka消息处理错误 topic =", message.Topic, message.Offset, err) 63 logs.Error("Message claimed: kafka消息处理错误 topic =", message.Topic, message.Offset, err)
64 - } 64 + } else {
65 groupSession.MarkMessage(message, "") 65 groupSession.MarkMessage(message, "")
66 - 66 + }
67 } 67 }
68 return nil 68 return nil
69 } 69 }