consumer_metadata_response.go 1.7 KB
package sarama

import (
	"net"
	"strconv"
)

//ConsumerMetadataResponse holds the response for a consumer group meta data requests
type ConsumerMetadataResponse struct {
	Err             KError
	Coordinator     *Broker
	CoordinatorID   int32  // deprecated: use Coordinator.ID()
	CoordinatorHost string // deprecated: use Coordinator.Addr()
	CoordinatorPort int32  // deprecated: use Coordinator.Addr()
}

func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err error) {
	tmp := new(FindCoordinatorResponse)

	if err := tmp.decode(pd, version); err != nil {
		return err
	}

	r.Err = tmp.Err

	r.Coordinator = tmp.Coordinator
	if tmp.Coordinator == nil {
		return nil
	}

	// this can all go away in 2.0, but we have to fill in deprecated fields to maintain
	// backwards compatibility
	host, portstr, err := net.SplitHostPort(r.Coordinator.Addr())
	if err != nil {
		return err
	}
	port, err := strconv.ParseInt(portstr, 10, 32)
	if err != nil {
		return err
	}
	r.CoordinatorID = r.Coordinator.ID()
	r.CoordinatorHost = host
	r.CoordinatorPort = int32(port)

	return nil
}

func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {
	if r.Coordinator == nil {
		r.Coordinator = new(Broker)
		r.Coordinator.id = r.CoordinatorID
		r.Coordinator.addr = net.JoinHostPort(r.CoordinatorHost, strconv.Itoa(int(r.CoordinatorPort)))
	}

	tmp := &FindCoordinatorResponse{
		Version:     0,
		Err:         r.Err,
		Coordinator: r.Coordinator,
	}

	if err := tmp.encode(pe); err != nil {
		return err
	}

	return nil
}

func (r *ConsumerMetadataResponse) key() int16 {
	return 10
}

func (r *ConsumerMetadataResponse) version() int16 {
	return 0
}

func (r *ConsumerMetadataResponse) requiredVersion() KafkaVersion {
	return V0_8_2_0
}