作者 kevin

rename to match producer/consumer in kafka

... ... @@ -3,13 +3,14 @@ module github.com/tal-tech/go-stash
go 1.13
require (
github.com/dsymonds/gotoc v0.0.0-20160928043926-5aebcfc91819 // indirect
github.com/fortytw2/leaktest v1.3.0 // indirect
github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8
github.com/json-iterator/go v1.1.10
github.com/mailru/easyjson v0.7.3 // indirect
github.com/olivere/elastic v6.2.34+incompatible
github.com/stretchr/testify v1.5.1
github.com/tal-tech/go-queue v1.0.1
github.com/tal-tech/go-zero v1.0.20
github.com/tal-tech/go-queue v1.0.2
github.com/tal-tech/go-zero v1.0.21
github.com/vjeantet/jodaTime v1.0.0
)
... ...
... ... @@ -40,6 +40,7 @@ github.com/dsymonds/gotoc v0.0.0-20160928043926-5aebcfc91819/go.mod h1:MvzMVHq8B
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/emicklei/proto v1.9.0/go.mod h1:rn1FgRS/FANiZdD2djyH7TMA9jdRDcYQ9IEN9yvjX0A=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
... ... @@ -229,6 +230,8 @@ github.com/tal-tech/go-queue v1.0.0 h1:FRRHuWbSUwyfXpQgK4rNa+9iJT0jN6+WBj5OjZWst
github.com/tal-tech/go-queue v1.0.0/go.mod h1:vffND44z/ta5szc5rD0cEAK7qphrgT2rrTxghQ1JWDE=
github.com/tal-tech/go-queue v1.0.1 h1:TSWauhHGxOX1lAQ5p2xQn86eI1GtamiP0kAIQAiS8DY=
github.com/tal-tech/go-queue v1.0.1/go.mod h1:V3cpvoQDKfVpPDvCiPDAZTdR/6RYuFhNGhJeXl4ak9w=
github.com/tal-tech/go-queue v1.0.2 h1:A9V321RMHNITfRvmIvKrBAK3BfoykDNooAttk7pUg1o=
github.com/tal-tech/go-queue v1.0.2/go.mod h1:gQK4Eg8pqel8Z9r1hjlSXbJFavLeJQVyTSwBKeAnpm8=
github.com/tal-tech/go-zero v1.0.8 h1:Wca6UVi5+Pr1GOFCpvcB178EzlecLlKEgGb3v8Ln1dQ=
github.com/tal-tech/go-zero v1.0.8/go.mod h1:/e0i8rMFzFO6Lha+UG9/nkzLSvv5dyYCCN+TFP1JcB0=
github.com/tal-tech/go-zero v1.0.12 h1:ymdKjhNOBaO5Ddqa/XVo4qRwMNsB3LOKWj6UdFJN5rc=
... ... @@ -239,6 +242,8 @@ github.com/tal-tech/go-zero v1.0.14 h1:a8vH2pMCEt+sackLDZEIYL/u7DhRY7SBbN04ATgzU
github.com/tal-tech/go-zero v1.0.14/go.mod h1:y2wBHTkxNJw79K9/wCSeDKzv2pCT6x45oOmXEsJdQK8=
github.com/tal-tech/go-zero v1.0.20 h1:BluZPak0minwoNqhSvtlBuwjDK3Q7Vgke8YSQJS2S+w=
github.com/tal-tech/go-zero v1.0.20/go.mod h1:NgINotJQNboHp9OrqhNzgmk8WbklEkx13tvT4MSPVe4=
github.com/tal-tech/go-zero v1.0.21 h1:IB0c6zmkhUERcagSLPrlVT4oA74yE+l2w2K1ujW0sY8=
github.com/tal-tech/go-zero v1.0.21/go.mod h1:llP5PQjnATfnzZo/lo5unjR41njzoL3lkGO/KXbnisw=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
... ...
... ... @@ -17,7 +17,7 @@ gostash -f etc/config.yaml
config.yaml example as below:
```yaml
Processors:
Clusters:
- Input:
Kafka:
Name: gostash
... ... @@ -27,7 +27,7 @@ Processors:
Topics:
- k8slog
Group: pro
NumProducers: 16
Consumers: 16
Filters:
- Action: drop
Conditions:
... ...
... ... @@ -33,18 +33,18 @@ type (
KafkaConf struct {
service.ServiceConf
Brokers []string
Group string
Topics []string
Offset string `json:",options=first|last,default=last"`
NumConns int `json:",default=1"`
NumProducers int `json:",default=8"`
NumConsumers int `json:",default=8"`
MinBytes int `json:",default=10240"` // 10K
MaxBytes int `json:",default=10485760"` // 10M
Brokers []string
Group string
Topics []string
Offset string `json:",options=first|last,default=last"`
Conns int `json:",default=1"`
Consumers int `json:",default=8"`
Processors int `json:",default=8"`
MinBytes int `json:",default=10240"` // 10K
MaxBytes int `json:",default=10485760"` // 10M
}
Processor struct {
Cluster struct {
Input struct {
Kafka KafkaConf
}
... ... @@ -55,7 +55,7 @@ type (
}
Config struct {
Processors []Processor
Clusters []Cluster
GracePeriod time.Duration `json:",default=10s"`
}
)
... ...
Processors:
Clusters:
- Input:
Kafka:
Name: gostash
... ... @@ -8,7 +8,7 @@ Processors:
Topics:
- k8slog
Group: pro
NumProducers: 16
Consumers: 16
Filters:
- Action: drop
Conditions:
... ...
... ... @@ -14,7 +14,7 @@ const (
type FilterFunc func(map[string]interface{}) map[string]interface{}
func CreateFilters(p config.Processor) []FilterFunc {
func CreateFilters(p config.Cluster) []FilterFunc {
var filters []FilterFunc
for _, f := range p.Filters {
... ...
... ... @@ -23,16 +23,16 @@ func toKqConf(c config.KafkaConf) []kq.KqConf {
for _, topic := range c.Topics {
ret = append(ret, kq.KqConf{
ServiceConf: c.ServiceConf,
Brokers: c.Brokers,
Group: c.Group,
Topic: topic,
Offset: c.Offset,
NumConns: c.NumConns,
NumProducers: c.NumProducers,
NumConsumers: c.NumConsumers,
MinBytes: c.MinBytes,
MaxBytes: c.MaxBytes,
ServiceConf: c.ServiceConf,
Brokers: c.Brokers,
Group: c.Group,
Topic: topic,
Offset: c.Offset,
Conns: c.Conns,
Consumers: c.Consumers,
Processors: c.Processors,
MinBytes: c.MinBytes,
MaxBytes: c.MaxBytes,
})
}
... ... @@ -49,7 +49,7 @@ func main() {
group := service.NewServiceGroup()
defer group.Stop()
for _, processor := range c.Processors {
for _, processor := range c.Clusters {
client, err := elastic.NewClient(
elastic.SetSniff(false),
elastic.SetURL(processor.Output.ElasticSearch.Hosts...),
... ...