stash.go
1.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package main
import (
"flag"
"time"
"github.com/olivere/elastic"
"github.com/tal-tech/go-queue/kq"
"github.com/tal-tech/go-stash/stash/config"
"github.com/tal-tech/go-stash/stash/es"
"github.com/tal-tech/go-stash/stash/filter"
"github.com/tal-tech/go-stash/stash/handler"
"github.com/tal-tech/go-zero/core/conf"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/service"
)
const (
	// dateFormat is the Go time layout (year.month.day, dot separated) used
	// for the date portion of daily Elasticsearch index names.
	dateFormat = "2006.01.02"
)

var (
	// configFile holds the path given with the -f command-line flag; it
	// defaults to etc/config.json and is read by main after flag.Parse.
	configFile = flag.String("f", "etc/config.json", "Specify the config file")
)
// main parses the command-line flags, loads the configuration file named by
// -f, and wires up one pipeline per configured processor: a Kafka queue
// (processor.Input.Kafka) whose messages go through a handler with the
// processor's filters and are written to Elasticsearch
// (processor.Output.ElasticSearch). All queues are added to a single service
// group, which is started at the end and stopped via defer when main returns.
//
// Every setup error is passed to logx.Must.
// NOTE(review): logx.Must is expected to abort the process on a non-nil
// error — confirm against go-zero.
func main() {
	flag.Parse()

	var c config.Config
	conf.MustLoad(*configFile, &c)
	proc.SetTimeoutToForceQuit(c.GracePeriod)

	group := service.NewServiceGroup()
	defer group.Stop()

	for _, processor := range c.Processors {
		// One Elasticsearch client per processor, with sniffing switched off
		// and the configured hosts as its URLs.
		client, err := elastic.NewClient(
			elastic.SetSniff(false),
			elastic.SetURL(processor.Output.ElasticSearch.Hosts...),
		)
		logx.Must(err)

		// Use the process-local zone unless the processor names one.
		loc := time.Local
		if len(processor.Output.ElasticSearch.TimeZone) > 0 {
			loc, err = time.LoadLocation(processor.Output.ElasticSearch.TimeZone)
			logx.Must(err)
		}

		// Only the date suffix is run through time.Format; the prefix is
		// prepended verbatim. Formatting prefix+dateFormat as one layout would
		// rewrite any layout token that happens to occur in the prefix (for
		// example "1", "2", "06", "Jan" or "MST") and yield a wrong index name.
		prefix := processor.Output.ElasticSearch.DailyIndexPrefix
		indexer := es.NewIndex(client, func(t time.Time) string {
			return prefix + t.In(loc).Format(dateFormat)
		})

		filters := filter.CreateFilters(processor)
		writer, err := es.NewWriter(processor.Output.ElasticSearch, indexer)
		logx.Must(err)

		handle := handler.NewHandler(writer)
		handle.AddFilters(filters...)
		// Always appended after the configured filters.
		// NOTE(review): presumably derives a "uri" field from "url" — confirm
		// in the filter package.
		handle.AddFilters(filter.AddUriFieldFilter("url", "uri"))

		group.Add(kq.MustNewQueue(processor.Input.Kafka, handle))
	}

	group.Start()
}