作者 kevin

support multiple topics

module github.com/tal-tech/go-stash
go 1.14
go 1.13
require (
github.com/fortytw2/leaktest v1.3.0 // indirect
... ... @@ -9,5 +9,5 @@ require (
github.com/mailru/easyjson v0.7.3 // indirect
github.com/olivere/elastic v6.2.34+incompatible
github.com/tal-tech/go-queue v0.0.0-20200901073541-0da84ebed328
github.com/tal-tech/go-zero v1.0.8
github.com/tal-tech/go-zero v1.0.12
)
... ...
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/ClickHouse/clickhouse-go v1.4.3/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/StackExchange/wmi v0.0.0-20170410192909-ea383cf3ba6e/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
... ... @@ -59,6 +60,7 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8=
github.com/go-redis/redis v6.15.7+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-xorm/builder v0.3.4/go.mod h1:KxkQkNN1DpPKTedxXyTQcmH+rXfvk4LZ9SOOBoZBAxw=
... ... @@ -109,6 +111,7 @@ github.com/grpc-ecosystem/grpc-gateway v1.14.3/go.mod h1:6CwZWGDSPRJidgKAtJVvND6
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334/go.mod h1:SK73tn/9oHe+/Y0h39VT4UCxmurVJkR5NA7kMEAOgSE=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
... ... @@ -137,6 +140,7 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kshvakov/clickhouse v1.3.11/go.mod h1:/SVBAcqF3u7rxQ9sTWCZwf8jzzvxiZGeQvtmSF2BBEc=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/mailru/easyjson v0.7.3 h1:M6wcO9gFHCIPynXGu4iA+NMs//FCgFUWR2jxqV3/+Xk=
... ... @@ -151,6 +155,7 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
... ... @@ -219,6 +224,8 @@ github.com/tal-tech/go-queue v0.0.0-20200901073541-0da84ebed328 h1:rWeU2ZWx/Jwmd
github.com/tal-tech/go-queue v0.0.0-20200901073541-0da84ebed328/go.mod h1:vffND44z/ta5szc5rD0cEAK7qphrgT2rrTxghQ1JWDE=
github.com/tal-tech/go-zero v1.0.8 h1:Wca6UVi5+Pr1GOFCpvcB178EzlecLlKEgGb3v8Ln1dQ=
github.com/tal-tech/go-zero v1.0.8/go.mod h1:/e0i8rMFzFO6Lha+UG9/nkzLSvv5dyYCCN+TFP1JcB0=
github.com/tal-tech/go-zero v1.0.12 h1:ymdKjhNOBaO5Ddqa/XVo4qRwMNsB3LOKWj6UdFJN5rc=
github.com/tal-tech/go-zero v1.0.12/go.mod h1:y2wBHTkxNJw79K9/wCSeDKzv2pCT6x45oOmXEsJdQK8=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
... ...
... ... @@ -28,7 +28,7 @@ type (
Fields []string `json:",optional"`
}
Config struct {
Processor struct {
Input struct {
Kafka kq.KqConf
}
... ... @@ -36,6 +36,10 @@ type (
Output struct {
ElasticSearch ElasticSearchConf
}
}
Config struct {
Processors []Processor
GracePeriod time.Duration `json:",default=10s"`
}
)
... ...
---
Input:
Kafka:
Name: gostash
Brokers:
- 172.16.186.16:19092
- 172.16.186.17:19092
Topic: k8slog
Group: pro
NumProducers: 16
Filters:
- Action: drop
Conditions:
- Key: k8s_container_name
Value: "-rpc"
Type: contains
- Key: level
Value: info
Type: match
Op: and
- Action: remove_field
Fields:
- message
- _source
- _type
- _score
- _id
- "@version"
- topic
- index
- beat
- docker_container
- offset
- prospector
- source
- stream
Output:
ElasticSearch:
Hosts:
- 172.16.141.4:9200
- 172.16.141.5:9200
DailyIndexPrefix: k8s_pro-
Processors:
- Input:
Kafka:
Name: gostash
Brokers:
- ":172.16.186.16:19092"
- "172.16.186.17:19092"
Topic: k8slog
Group: pro
NumProducers: 16
Filters:
- Action: drop
Conditions:
- Key: k8s_container_name
Value: "-rpc"
Type: contains
- Key: level
Value: info
Type: match
Op: and
- Action: remove_field
Fields:
- message
- _source
- _type
- _score
- _id
- "@version"
- topic
- index
- beat
- docker_container
- offset
- prospector
- source
- stream
Output:
ElasticSearch:
Hosts:
- "172.16.141.4:9200"
- "172.16.141.5:9200"
DailyIndexPrefix: "k8s_pro-"
... ...
... ... @@ -13,10 +13,10 @@ const (
type FilterFunc func(map[string]interface{}) map[string]interface{}
func CreateFilters(c config.Config) []FilterFunc {
func CreateFilters(p config.Processor) []FilterFunc {
var filters []FilterFunc
for _, f := range c.Filters {
for _, f := range p.Filters {
switch f.Action {
case filterDrop:
filters = append(filters, DropFilter(f.Conditions))
... ...
... ... @@ -13,6 +13,7 @@ import (
"github.com/tal-tech/go-zero/core/conf"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/service"
)
const dateFormat = "2006.01.02"
... ... @@ -26,31 +27,37 @@ func main() {
conf.MustLoad(*configFile, &c)
proc.SetTimeoutToForceQuit(c.GracePeriod)
client, err := elastic.NewClient(
elastic.SetSniff(false),
elastic.SetURL(c.Output.ElasticSearch.Hosts...),
)
logx.Must(err)
group := service.NewServiceGroup()
defer group.Stop()
indexFormat := c.Output.ElasticSearch.DailyIndexPrefix + dateFormat
var loc *time.Location
if len(c.Output.ElasticSearch.TimeZone) > 0 {
loc, err = time.LoadLocation(c.Output.ElasticSearch.TimeZone)
for _, processor := range c.Processors {
client, err := elastic.NewClient(
elastic.SetSniff(false),
elastic.SetURL(processor.Output.ElasticSearch.Hosts...),
)
logx.Must(err)
} else {
loc = time.Local
indexFormat := processor.Output.ElasticSearch.DailyIndexPrefix + dateFormat
var loc *time.Location
if len(processor.Output.ElasticSearch.TimeZone) > 0 {
loc, err = time.LoadLocation(processor.Output.ElasticSearch.TimeZone)
logx.Must(err)
} else {
loc = time.Local
}
indexer := es.NewIndex(client, func(t time.Time) string {
return t.In(loc).Format(indexFormat)
})
filters := filter.CreateFilters(processor)
writer, err := es.NewWriter(processor.Output.ElasticSearch, indexer)
logx.Must(err)
handle := handler.NewHandler(writer)
handle.AddFilters(filters...)
handle.AddFilters(filter.AddUriFieldFilter("url", "uri"))
group.Add(kq.MustNewQueue(processor.Input.Kafka, handle))
}
indexer := es.NewIndex(client, func(t time.Time) string {
return t.In(loc).Format(indexFormat)
})
filters := filter.CreateFilters(c)
writer, err := es.NewWriter(c.Output.ElasticSearch, indexer)
logx.Must(err)
handle := handler.NewHandler(writer)
handle.AddFilters(filters...)
handle.AddFilters(filter.AddUriFieldFilter("url", "uri"))
q := kq.MustNewQueue(c.Input.Kafka, handle)
q.Start()
group.Start()
}
... ...