作者 kevin

support multiple topics in one kafka

@@ -24,7 +24,8 @@ Processors: @@ -24,7 +24,8 @@ Processors:
24 Brokers: 24 Brokers:
25 - "172.16.186.16:19092" 25 - "172.16.186.16:19092"
26 - "172.16.186.17:19092" 26 - "172.16.186.17:19092"
27 - Topic: k8slog 27 + Topics:
  28 + - k8slog
28 Group: pro 29 Group: pro
29 NumProducers: 16 30 NumProducers: 16
30 Filters: 31 Filters:
@@ -24,7 +24,8 @@ Processors: @@ -24,7 +24,8 @@ Processors:
24 Brokers: 24 Brokers:
25 - "172.16.186.16:19092" 25 - "172.16.186.16:19092"
26 - "172.16.186.17:19092" 26 - "172.16.186.17:19092"
27 - Topic: k8slog 27 + Topics:
  28 + - k8slog
28 Group: pro 29 Group: pro
29 NumProducers: 16 30 NumProducers: 16
30 Filters: 31 Filters:
@@ -3,7 +3,7 @@ package config @@ -3,7 +3,7 @@ package config
3 import ( 3 import (
4 "time" 4 "time"
5 5
6 - "github.com/tal-tech/go-queue/kq" 6 + "github.com/tal-tech/go-zero/core/service"
7 ) 7 )
8 8
9 type ( 9 type (
@@ -31,9 +31,22 @@ type ( @@ -31,9 +31,22 @@ type (
31 Target string `json:",optional"` 31 Target string `json:",optional"`
32 } 32 }
33 33
  34 + KafkaConf struct {
  35 + service.ServiceConf
  36 + Brokers []string
  37 + Group string
  38 + Topics []string
  39 + Offset string `json:",options=first|last,default=last"`
  40 + NumConns int `json:",default=1"`
  41 + NumProducers int `json:",default=8"`
  42 + NumConsumers int `json:",default=8"`
  43 + MinBytes int `json:",default=10240"` // 10K
  44 + MaxBytes int `json:",default=10485760"` // 10M
  45 + }
  46 +
34 Processor struct { 47 Processor struct {
35 Input struct { 48 Input struct {
36 - Kafka kq.KqConf 49 + Kafka KafkaConf
37 } 50 }
38 Filters []Filter `json:",optional"` 51 Filters []Filter `json:",optional"`
39 Output struct { 52 Output struct {
@@ -5,7 +5,8 @@ Processors: @@ -5,7 +5,8 @@ Processors:
5 Brokers: 5 Brokers:
6 - ":172.16.186.16:19092" 6 - ":172.16.186.16:19092"
7 - "172.16.186.17:19092" 7 - "172.16.186.17:19092"
8 - Topic: k8slog 8 + Topics:
  9 + - k8slog
9 Group: pro 10 Group: pro
10 NumProducers: 16 11 NumProducers: 16
11 Filters: 12 Filters:
@@ -18,6 +18,27 @@ import ( @@ -18,6 +18,27 @@ import (
18 18
19 var configFile = flag.String("f", "etc/config.yaml", "Specify the config file") 19 var configFile = flag.String("f", "etc/config.yaml", "Specify the config file")
20 20
  21 +func toKqConf(c config.KafkaConf) []kq.KqConf {
  22 + var ret []kq.KqConf
  23 +
  24 + for _, topic := range c.Topics {
  25 + ret = append(ret, kq.KqConf{
  26 + ServiceConf: c.ServiceConf,
  27 + Brokers: c.Brokers,
  28 + Group: c.Group,
  29 + Topic: topic,
  30 + Offset: c.Offset,
  31 + NumConns: c.NumConns,
  32 + NumProducers: c.NumProducers,
  33 + NumConsumers: c.NumConsumers,
  34 + MinBytes: c.MinBytes,
  35 + MaxBytes: c.MaxBytes,
  36 + })
  37 + }
  38 +
  39 + return ret
  40 +}
  41 +
21 func main() { 42 func main() {
22 flag.Parse() 43 flag.Parse()
23 44
@@ -50,7 +71,9 @@ func main() { @@ -50,7 +71,9 @@ func main() {
50 handle := handler.NewHandler(writer, indexer) 71 handle := handler.NewHandler(writer, indexer)
51 handle.AddFilters(filters...) 72 handle.AddFilters(filters...)
52 handle.AddFilters(filter.AddUriFieldFilter("url", "uri")) 73 handle.AddFilters(filter.AddUriFieldFilter("url", "uri"))
53 - group.Add(kq.MustNewQueue(processor.Input.Kafka, handle)) 74 + for _, k := range toKqConf(processor.Input.Kafka) {
  75 + group.Add(kq.MustNewQueue(k, handle))
  76 + }
54 } 77 }
55 78
56 group.Start() 79 group.Start()