describe_configs_response.go 5.5 KB
package sarama

import (
	"fmt"
	"time"
)

type ConfigSource int8

func (s ConfigSource) String() string {
	switch s {
	case SourceUnknown:
		return "Unknown"
	case SourceTopic:
		return "Topic"
	case SourceDynamicBroker:
		return "DynamicBroker"
	case SourceDynamicDefaultBroker:
		return "DynamicDefaultBroker"
	case SourceStaticBroker:
		return "StaticBroker"
	case SourceDefault:
		return "Default"
	}
	return fmt.Sprintf("Source Invalid: %d", int(s))
}

const (
	SourceUnknown ConfigSource = iota
	SourceTopic
	SourceDynamicBroker
	SourceDynamicDefaultBroker
	SourceStaticBroker
	SourceDefault
)

type DescribeConfigsResponse struct {
	Version      int16
	ThrottleTime time.Duration
	Resources    []*ResourceResponse
}

type ResourceResponse struct {
	ErrorCode int16
	ErrorMsg  string
	Type      ConfigResourceType
	Name      string
	Configs   []*ConfigEntry
}

type ConfigEntry struct {
	Name      string
	Value     string
	ReadOnly  bool
	Default   bool
	Source    ConfigSource
	Sensitive bool
	Synonyms  []*ConfigSynonym
}

type ConfigSynonym struct {
	ConfigName  string
	ConfigValue string
	Source      ConfigSource
}

func (r *DescribeConfigsResponse) encode(pe packetEncoder) (err error) {
	pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
	if err = pe.putArrayLength(len(r.Resources)); err != nil {
		return err
	}

	for _, c := range r.Resources {
		if err = c.encode(pe, r.Version); err != nil {
			return err
		}
	}

	return nil
}

func (r *DescribeConfigsResponse) decode(pd packetDecoder, version int16) (err error) {
	r.Version = version
	throttleTime, err := pd.getInt32()
	if err != nil {
		return err
	}
	r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond

	n, err := pd.getArrayLength()
	if err != nil {
		return err
	}

	r.Resources = make([]*ResourceResponse, n)
	for i := 0; i < n; i++ {
		rr := &ResourceResponse{}
		if err := rr.decode(pd, version); err != nil {
			return err
		}
		r.Resources[i] = rr
	}

	return nil
}

func (r *DescribeConfigsResponse) key() int16 {
	return 32
}

func (r *DescribeConfigsResponse) version() int16 {
	return r.Version
}

func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion {
	switch r.Version {
	case 1:
		return V1_0_0_0
	case 2:
		return V2_0_0_0
	default:
		return V0_11_0_0
	}
}

func (r *ResourceResponse) encode(pe packetEncoder, version int16) (err error) {
	pe.putInt16(r.ErrorCode)

	if err = pe.putString(r.ErrorMsg); err != nil {
		return err
	}

	pe.putInt8(int8(r.Type))

	if err = pe.putString(r.Name); err != nil {
		return err
	}

	if err = pe.putArrayLength(len(r.Configs)); err != nil {
		return err
	}

	for _, c := range r.Configs {
		if err = c.encode(pe, version); err != nil {
			return err
		}
	}
	return nil
}

func (r *ResourceResponse) decode(pd packetDecoder, version int16) (err error) {
	ec, err := pd.getInt16()
	if err != nil {
		return err
	}
	r.ErrorCode = ec

	em, err := pd.getString()
	if err != nil {
		return err
	}
	r.ErrorMsg = em

	t, err := pd.getInt8()
	if err != nil {
		return err
	}
	r.Type = ConfigResourceType(t)

	name, err := pd.getString()
	if err != nil {
		return err
	}
	r.Name = name

	n, err := pd.getArrayLength()
	if err != nil {
		return err
	}

	r.Configs = make([]*ConfigEntry, n)
	for i := 0; i < n; i++ {
		c := &ConfigEntry{}
		if err := c.decode(pd, version); err != nil {
			return err
		}
		r.Configs[i] = c
	}
	return nil
}

func (r *ConfigEntry) encode(pe packetEncoder, version int16) (err error) {
	if err = pe.putString(r.Name); err != nil {
		return err
	}

	if err = pe.putString(r.Value); err != nil {
		return err
	}

	pe.putBool(r.ReadOnly)

	if version <= 0 {
		pe.putBool(r.Default)
		pe.putBool(r.Sensitive)
	} else {
		pe.putInt8(int8(r.Source))
		pe.putBool(r.Sensitive)

		if err := pe.putArrayLength(len(r.Synonyms)); err != nil {
			return err
		}
		for _, c := range r.Synonyms {
			if err = c.encode(pe, version); err != nil {
				return err
			}
		}
	}

	return nil
}

//https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration
func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) {
	if version == 0 {
		r.Source = SourceUnknown
	}
	name, err := pd.getString()
	if err != nil {
		return err
	}
	r.Name = name

	value, err := pd.getString()
	if err != nil {
		return err
	}
	r.Value = value

	read, err := pd.getBool()
	if err != nil {
		return err
	}
	r.ReadOnly = read

	if version == 0 {
		defaultB, err := pd.getBool()
		if err != nil {
			return err
		}
		r.Default = defaultB
	} else {
		source, err := pd.getInt8()
		if err != nil {
			return err
		}
		r.Source = ConfigSource(source)
	}

	sensitive, err := pd.getBool()
	if err != nil {
		return err
	}
	r.Sensitive = sensitive

	if version > 0 {
		n, err := pd.getArrayLength()
		if err != nil {
			return err
		}
		r.Synonyms = make([]*ConfigSynonym, n)

		for i := 0; i < n; i++ {
			s := &ConfigSynonym{}
			if err := s.decode(pd, version); err != nil {
				return err
			}
			r.Synonyms[i] = s
		}

	}
	return nil
}

func (c *ConfigSynonym) encode(pe packetEncoder, version int16) (err error) {
	err = pe.putString(c.ConfigName)
	if err != nil {
		return err
	}

	err = pe.putString(c.ConfigValue)
	if err != nil {
		return err
	}

	pe.putInt8(int8(c.Source))

	return nil
}

func (c *ConfigSynonym) decode(pd packetDecoder, version int16) error {
	name, err := pd.getString()
	if err != nil {
		return nil
	}
	c.ConfigName = name

	value, err := pd.getString()
	if err != nil {
		return nil
	}
	c.ConfigValue = value

	source, err := pd.getInt8()
	if err != nil {
		return nil
	}
	c.Source = ConfigSource(source)
	return nil
}