// offset_request.go 3.0 KB

package sarama

// offsetRequestBlock holds the per-partition payload of an offset request.
// Blocks are stored per topic and partition and are written in the order
// time, then maxOffsets (version 0 only).
type offsetRequestBlock struct {
	// time is encoded as an int64 for every request version.
	time       int64
	maxOffsets int32 // Only used in version 0
}

// encode writes the block to pe: the time field first, followed by maxOffsets
// when (and only when) version is 0. The encoder calls used here do not
// report errors, so the result is always nil.
func (b *offsetRequestBlock) encode(pe packetEncoder, version int16) error {
	pe.putInt64(b.time)
	if version != 0 {
		// Later versions carry no maxOffsets field.
		return nil
	}
	pe.putInt32(b.maxOffsets)
	return nil
}

// decode reads the block from pd: an int64 into time, then, for version 0
// only, an int32 into maxOffsets. The first decoder error is returned as-is.
func (b *offsetRequestBlock) decode(pd packetDecoder, version int16) (err error) {
	b.time, err = pd.getInt64()
	if err != nil {
		return err
	}
	if version != 0 {
		// Nothing else to read for later versions.
		return nil
	}
	b.maxOffsets, err = pd.getInt32()
	return err
}

// OffsetRequest is an offset request made of a protocol version, an optional
// replica ID and one offsetRequestBlock per topic/partition pair. Blocks are
// added with AddBlock; the replica ID is set with SetReplicaID.
type OffsetRequest struct {
	// Version is the protocol version; it is handed to every block when
	// encoding and is overwritten by decode.
	Version        int16
	// replicaID is only meaningful while isReplicaIDSet is true.
	replicaID      int32
	// isReplicaIDSet records whether SetReplicaID was called; when false,
	// -1 is encoded and reported as the replica ID.
	isReplicaIDSet bool
	// blocks maps topic -> partition -> block. It stays nil until the first
	// block is added or decoded.
	blocks         map[string]map[int32]*offsetRequestBlock
}

// encode writes the request to pe in this order: the replica ID, the number
// of topics, and then for each topic its name, its partition count and, per
// partition, the partition ID followed by the encoded block. Topics and
// partitions are emitted in map iteration order. The first encoder error
// aborts encoding and is returned unchanged.
func (r *OffsetRequest) encode(pe packetEncoder) error {
	// default replica ID is always -1 for clients
	replicaID := int32(-1)
	if r.isReplicaIDSet {
		replicaID = r.replicaID
	}
	pe.putInt32(replicaID)

	if err := pe.putArrayLength(len(r.blocks)); err != nil {
		return err
	}
	for topic, partitions := range r.blocks {
		if err := pe.putString(topic); err != nil {
			return err
		}
		if err := pe.putArrayLength(len(partitions)); err != nil {
			return err
		}
		for partition, block := range partitions {
			pe.putInt32(partition)
			if err := block.encode(pe, r.Version); err != nil {
				return err
			}
		}
	}
	return nil
}

// decode populates r from pd using the given protocol version.
//
// It reads, in order: the replica ID, the number of topic entries and, for
// each entry, the topic name, the partition count and then one partition ID
// plus block per partition. r.Version is set to version before anything is
// read. A non-negative replica ID is recorded through SetReplicaID; a
// negative one leaves the request in its "not set" state, so ReplicaID
// reports -1. When the topic count is zero, r.blocks is left untouched.
//
// The topic list on the wire is an array rather than a map, so the same topic
// may appear in more than one entry. Partitions from such repeated entries
// are merged into a single map instead of the later entry discarding the
// earlier one; a repeated partition keeps the last block decoded for it.
//
// The first decoder error is returned unchanged; r may be partially
// populated in that case.
func (r *OffsetRequest) decode(pd packetDecoder, version int16) error {
	r.Version = version

	replicaID, err := pd.getInt32()
	if err != nil {
		return err
	}
	if replicaID >= 0 {
		r.SetReplicaID(replicaID)
	}

	blockCount, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	if blockCount == 0 {
		return nil
	}
	r.blocks = make(map[string]map[int32]*offsetRequestBlock)
	for i := 0; i < blockCount; i++ {
		topic, err := pd.getString()
		if err != nil {
			return err
		}
		partitionCount, err := pd.getArrayLength()
		if err != nil {
			return err
		}
		// Reuse the map of an earlier entry for the same topic so that its
		// partitions are not silently dropped.
		partitions := r.blocks[topic]
		if partitions == nil {
			partitions = make(map[int32]*offsetRequestBlock)
			r.blocks[topic] = partitions
		}
		for j := 0; j < partitionCount; j++ {
			partition, err := pd.getInt32()
			if err != nil {
				return err
			}
			block := &offsetRequestBlock{}
			if err := block.decode(pd, version); err != nil {
				return err
			}
			partitions[partition] = block
		}
	}
	return nil
}

// key returns the numeric key identifying this request type, which is
// always 2 (the ListOffsets API key in the Kafka protocol).
func (r *OffsetRequest) key() int16 {
	const offsetRequestKey int16 = 2
	return offsetRequestKey
}

// version returns the protocol version held in the Version field.
func (r *OffsetRequest) version() int16 {
	return r.Version
}

// requiredVersion returns the KafkaVersion associated with the request's
// protocol version: V0_10_1_0 for Version 1 and MinVersion for every other
// value.
func (r *OffsetRequest) requiredVersion() KafkaVersion {
	if r.Version == 1 {
		return V0_10_1_0
	}
	return MinVersion
}

// SetReplicaID stores id as the request's replica ID and marks it as
// explicitly set, so that encode and ReplicaID use id instead of -1.
func (r *OffsetRequest) SetReplicaID(id int32) {
	r.replicaID, r.isReplicaIDSet = id, true
}

// ReplicaID returns the replica ID stored by SetReplicaID, or -1 when no
// replica ID has been set.
func (r *OffsetRequest) ReplicaID() int32 {
	if !r.isReplicaIDSet {
		return -1
	}
	return r.replicaID
}

// AddBlock registers a block for the given topic and partition, creating the
// topic and partition maps on first use and replacing any block already
// stored for that pair. time is always stored; maxOffsets is stored only
// when r.Version is 0 at the time of the call and is otherwise left at zero.
func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) {
	if r.blocks == nil {
		r.blocks = make(map[string]map[int32]*offsetRequestBlock)
	}

	partitions := r.blocks[topic]
	if partitions == nil {
		partitions = make(map[int32]*offsetRequestBlock)
		r.blocks[topic] = partitions
	}

	block := &offsetRequestBlock{time: time}
	if r.Version == 0 {
		block.maxOffsets = maxOffsets
	}
	partitions[partitionID] = block
}