offsets.go 2.0 KB
package cluster

import (
	"sync"

	"github.com/Shopify/sarama"
)

// OffsetStash allows to accumulate offsets and
// mark them as processed in a bulk
type OffsetStash struct {
	offsets map[topicPartition]offsetInfo
	mu      sync.Mutex
}

// NewOffsetStash inits a blank stash
func NewOffsetStash() *OffsetStash {
	return &OffsetStash{offsets: make(map[topicPartition]offsetInfo)}
}

// MarkOffset stashes the provided message offset
func (s *OffsetStash) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
	s.MarkPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
}

// MarkPartitionOffset stashes the offset for the provided topic/partition combination
func (s *OffsetStash) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	key := topicPartition{Topic: topic, Partition: partition}
	if info := s.offsets[key]; offset >= info.Offset {
		info.Offset = offset
		info.Metadata = metadata
		s.offsets[key] = info
	}
}

// ResetPartitionOffset stashes the offset for the provided topic/partition combination.
// Difference between ResetPartitionOffset and MarkPartitionOffset is that, ResetPartitionOffset supports earlier offsets
func (s *OffsetStash) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	key := topicPartition{Topic: topic, Partition: partition}
	if info := s.offsets[key]; offset <= info.Offset {
		info.Offset = offset
		info.Metadata = metadata
		s.offsets[key] = info
	}
}

// ResetOffset stashes the provided message offset
// See ResetPartitionOffset for explanation
func (s *OffsetStash) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
	s.ResetPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
}

// Offsets returns the latest stashed offsets by topic-partition
func (s *OffsetStash) Offsets() map[string]int64 {
	s.mu.Lock()
	defer s.mu.Unlock()

	res := make(map[string]int64, len(s.offsets))
	for tp, info := range s.offsets {
		res[tp.String()] = info.Offset
	}
	return res
}