作者 kevin

add {.event} in index format

@@ -15,12 +15,12 @@ type ( @@ -15,12 +15,12 @@ type (
15 } 15 }
16 16
17 ElasticSearchConf struct { 17 ElasticSearchConf struct {
18 - Hosts []string  
19 - DailyIndexPrefix string  
20 - DocType string `json:",default=doc"`  
21 - TimeZone string `json:",optional"`  
22 - MaxChunkBytes int `json:",default=1048576"`  
23 - Compress bool `json:",default=false"` 18 + Hosts []string
  19 + Index string
  20 + DocType string `json:",default=doc"`
  21 + TimeZone string `json:",optional"`
  22 + MaxChunkBytes int `json:",default=1048576"`
  23 + Compress bool `json:",default=false"`
24 } 24 }
25 25
26 Filter struct { 26 Filter struct {
@@ -2,40 +2,79 @@ package es @@ -2,40 +2,79 @@ package es
2 2
3 import ( 3 import (
4 "context" 4 "context"
  5 + "fmt"
  6 + "strings"
5 "sync" 7 "sync"
6 "time" 8 "time"
7 9
8 "github.com/olivere/elastic" 10 "github.com/olivere/elastic"
9 "github.com/tal-tech/go-zero/core/fx" 11 "github.com/tal-tech/go-zero/core/fx"
  12 + "github.com/tal-tech/go-zero/core/lang"
10 "github.com/tal-tech/go-zero/core/logx" 13 "github.com/tal-tech/go-zero/core/logx"
11 "github.com/tal-tech/go-zero/core/syncx" 14 "github.com/tal-tech/go-zero/core/syncx"
12 ) 15 )
13 16
14 -const sharedCallsKey = "ensureIndex" 17 +const (
  18 + sharedCallsKey = "ensureIndex"
  19 + timestampFormat = "2006-01-02T15:04:05.000Z"
  20 + timestampKey = "@timestamp"
  21 +)
  22 +
  23 +const (
  24 + stateNormal = iota
  25 + stateWrap
  26 + stateDot
  27 +)
15 28
16 type ( 29 type (
17 - IndexFormat func(time.Time) string 30 + IndexFormat func(m map[string]interface{}) string
18 IndexFunc func() string 31 IndexFunc func() string
19 32
20 Index struct { 33 Index struct {
21 client *elastic.Client 34 client *elastic.Client
22 indexFormat IndexFormat 35 indexFormat IndexFormat
23 - index string 36 + indices map[string]lang.PlaceholderType
24 lock sync.RWMutex 37 lock sync.RWMutex
25 sharedCalls syncx.SharedCalls 38 sharedCalls syncx.SharedCalls
26 } 39 }
27 ) 40 )
28 41
29 -func NewIndex(client *elastic.Client, indexFormat IndexFormat) *Index { 42 +func NewIndex(client *elastic.Client, indexFormat string, loc *time.Location) *Index {
  43 + var formatter func(map[string]interface{}) string
  44 + format, attrs := getFormat(indexFormat)
  45 + if len(attrs) > 0 {
  46 + formatter = func(m map[string]interface{}) string {
  47 + var vals []interface{}
  48 + for _, attr := range attrs {
  49 + if val, ok := m[attr]; ok {
  50 + vals = append(vals, val)
  51 + }
  52 + }
  53 + return getTime(m).In(loc).Format(fmt.Sprintf(format, vals...))
  54 + }
  55 + } else {
  56 + formatter = func(m map[string]interface{}) string {
  57 + return getTime(m).In(loc).Format(format)
  58 + }
  59 + }
  60 +
30 return &Index{ 61 return &Index{
31 client: client, 62 client: client,
32 - indexFormat: indexFormat, 63 + indexFormat: formatter,
  64 + indices: make(map[string]lang.PlaceholderType),
33 sharedCalls: syncx.NewSharedCalls(), 65 sharedCalls: syncx.NewSharedCalls(),
34 } 66 }
35 } 67 }
36 68
37 -func (idx *Index) GetIndex(t time.Time) string {  
38 - index := idx.indexFormat(t) 69 +func (idx *Index) GetIndex(m map[string]interface{}) string {
  70 + index := idx.indexFormat(m)
  71 + idx.lock.RLock()
  72 + if _, ok := idx.indices[index]; ok {
  73 + idx.lock.RUnlock()
  74 + return index
  75 + }
  76 +
  77 + idx.lock.RUnlock()
39 if err := idx.ensureIndex(index); err != nil { 78 if err := idx.ensureIndex(index); err != nil {
40 logx.Error(err) 79 logx.Error(err)
41 } 80 }
@@ -43,17 +82,14 @@ func (idx *Index) GetIndex(t time.Time) string { @@ -43,17 +82,14 @@ func (idx *Index) GetIndex(t time.Time) string {
43 } 82 }
44 83
45 func (idx *Index) ensureIndex(index string) error { 84 func (idx *Index) ensureIndex(index string) error {
46 - idx.lock.RLock()  
47 - if index == idx.index {  
48 - idx.lock.RUnlock()  
49 - return nil  
50 - }  
51 - idx.lock.RUnlock()  
52 -  
53 _, err := idx.sharedCalls.Do(sharedCallsKey, func() (i interface{}, err error) { 85 _, err := idx.sharedCalls.Do(sharedCallsKey, func() (i interface{}, err error) {
54 idx.lock.Lock() 86 idx.lock.Lock()
55 defer idx.lock.Unlock() 87 defer idx.lock.Unlock()
56 88
  89 + if _, ok := idx.indices[index]; ok {
  90 + return nil, nil
  91 + }
  92 +
57 existsService := elastic.NewIndicesExistsService(idx.client) 93 existsService := elastic.NewIndicesExistsService(idx.client)
58 existsService.Index([]string{index}) 94 existsService.Index([]string{index})
59 exist, err := existsService.Do(context.Background()) 95 exist, err := existsService.Do(context.Background())
@@ -61,7 +97,6 @@ func (idx *Index) ensureIndex(index string) error { @@ -61,7 +97,6 @@ func (idx *Index) ensureIndex(index string) error {
61 return nil, err 97 return nil, err
62 } 98 }
63 if exist { 99 if exist {
64 - idx.index = index  
65 return nil, nil 100 return nil, nil
66 } 101 }
67 102
@@ -74,8 +109,53 @@ func (idx *Index) ensureIndex(index string) error { @@ -74,8 +109,53 @@ func (idx *Index) ensureIndex(index string) error {
74 return nil, err 109 return nil, err
75 } 110 }
76 111
77 - idx.index = index 112 + idx.indices[index] = lang.Placeholder
78 return nil, nil 113 return nil, nil
79 }) 114 })
80 return err 115 return err
81 } 116 }
  117 +
  118 +func getTime(m map[string]interface{}) time.Time {
  119 + if ti, ok := m[timestampKey]; ok {
  120 + if ts, ok := ti.(string); ok {
  121 + if t, err := time.Parse(timestampFormat, ts); err == nil {
  122 + return t
  123 + }
  124 + }
  125 + }
  126 +
  127 + return time.Now()
  128 +}
  129 +
  130 +func getFormat(indexFormat string) (format string, attrs []string) {
  131 + var state = stateNormal
  132 + var builder strings.Builder
  133 + var keyBuf strings.Builder
  134 + for _, ch := range indexFormat {
  135 + switch ch {
  136 + case '{':
  137 + state = stateWrap
  138 + case '.':
  139 + if state == stateWrap {
  140 + state = stateDot
  141 + } else {
  142 + builder.WriteRune(ch)
  143 + }
  144 + case '}':
  145 + state = stateNormal
  146 + if keyBuf.Len() > 0 {
  147 + attrs = append(attrs, keyBuf.String())
  148 + builder.WriteString("%s")
  149 + }
  150 + default:
  151 + if state == stateDot {
  152 + keyBuf.WriteRune(ch)
  153 + } else {
  154 + builder.WriteRune(ch)
  155 + }
  156 + }
  157 + }
  158 +
  159 + format = builder.String()
  160 + return
  161 +}
@@ -2,8 +2,8 @@ package es @@ -2,8 +2,8 @@ package es
2 2
3 import ( 3 import (
4 "context" 4 "context"
5 - "time"  
6 5
  6 + jsoniter "github.com/json-iterator/go"
7 "github.com/olivere/elastic" 7 "github.com/olivere/elastic"
8 "github.com/tal-tech/go-stash/stash/config" 8 "github.com/tal-tech/go-stash/stash/config"
9 "github.com/tal-tech/go-zero/core/executors" 9 "github.com/tal-tech/go-zero/core/executors"
@@ -18,9 +18,9 @@ type ( @@ -18,9 +18,9 @@ type (
18 inserter *executors.ChunkExecutor 18 inserter *executors.ChunkExecutor
19 } 19 }
20 20
21 - valueWithTime struct {  
22 - t time.Time  
23 - val string 21 + valueWithIndex struct {
  22 + index string
  23 + val string
24 } 24 }
25 ) 25 )
26 26
@@ -43,18 +43,25 @@ func NewWriter(c config.ElasticSearchConf, indexer *Index) (*Writer, error) { @@ -43,18 +43,25 @@ func NewWriter(c config.ElasticSearchConf, indexer *Index) (*Writer, error) {
43 return &writer, nil 43 return &writer, nil
44 } 44 }
45 45
46 -func (w *Writer) Write(t time.Time, val string) error {  
47 - return w.inserter.Add(valueWithTime{  
48 - t: t,  
49 - val: val, 46 +func (w *Writer) Write(m map[string]interface{}) error {
  47 + bs, err := jsoniter.Marshal(m)
  48 + if err != nil {
  49 + return err
  50 + }
  51 +
  52 + index := w.indexer.GetIndex(m)
  53 + val := string(bs)
  54 + return w.inserter.Add(valueWithIndex{
  55 + index: index,
  56 + val: val,
50 }, len(val)) 57 }, len(val))
51 } 58 }
52 59
53 func (w *Writer) execute(vals []interface{}) { 60 func (w *Writer) execute(vals []interface{}) {
54 var bulk = w.client.Bulk() 61 var bulk = w.client.Bulk()
55 for _, val := range vals { 62 for _, val := range vals {
56 - pair := val.(valueWithTime)  
57 - req := elastic.NewBulkIndexRequest().Index(w.indexer.GetIndex(pair.t)).Type(w.docType).Doc(pair.val) 63 + pair := val.(valueWithIndex)
  64 + req := elastic.NewBulkIndexRequest().Index(pair.index).Type(w.docType).Doc(pair.val)
58 bulk.Add(req) 65 bulk.Add(req)
59 } 66 }
60 _, err := bulk.Do(context.Background()) 67 _, err := bulk.Do(context.Background())
@@ -39,4 +39,4 @@ Processors: @@ -39,4 +39,4 @@ Processors:
39 Hosts: 39 Hosts:
40 - "172.16.141.4:9200" 40 - "172.16.141.4:9200"
41 - "172.16.141.5:9200" 41 - "172.16.141.5:9200"
42 - DailyIndexPrefix: "k8s_pro-" 42 + Index: "{.event}-{{yy-mm-dd}}"
1 package handler 1 package handler
2 2
3 import ( 3 import (
4 - "time"  
5 -  
6 jsoniter "github.com/json-iterator/go" 4 jsoniter "github.com/json-iterator/go"
7 "github.com/tal-tech/go-stash/stash/es" 5 "github.com/tal-tech/go-stash/stash/es"
8 "github.com/tal-tech/go-stash/stash/filter" 6 "github.com/tal-tech/go-stash/stash/filter"
9 ) 7 )
10 8
11 -const (  
12 - timestampFormat = "2006-01-02T15:04:05.000Z"  
13 - timestampKey = "@timestamp"  
14 -)  
15 -  
16 type MessageHandler struct { 9 type MessageHandler struct {
17 writer *es.Writer 10 writer *es.Writer
18 filters []filter.FilterFunc 11 filters []filter.FilterFunc
@@ -42,22 +35,5 @@ func (mh *MessageHandler) Consume(_, val string) error { @@ -42,22 +35,5 @@ func (mh *MessageHandler) Consume(_, val string) error {
42 } 35 }
43 } 36 }
44 37
45 - bs, err := jsoniter.Marshal(m)  
46 - if err != nil {  
47 - return err  
48 - }  
49 -  
50 - return mh.writer.Write(mh.getTime(m), string(bs))  
51 -}  
52 -  
53 -func (mh *MessageHandler) getTime(m map[string]interface{}) time.Time {  
54 - if ti, ok := m[timestampKey]; ok {  
55 - if ts, ok := ti.(string); ok {  
56 - if t, err := time.Parse(timestampFormat, ts); err == nil {  
57 - return t  
58 - }  
59 - }  
60 - }  
61 -  
62 - return time.Now() 38 + return mh.writer.Write(m)
63 } 39 }
@@ -37,7 +37,6 @@ func main() { @@ -37,7 +37,6 @@ func main() {
37 ) 37 )
38 logx.Must(err) 38 logx.Must(err)
39 39
40 - indexFormat := processor.Output.ElasticSearch.DailyIndexPrefix + dateFormat  
41 var loc *time.Location 40 var loc *time.Location
42 if len(processor.Output.ElasticSearch.TimeZone) > 0 { 41 if len(processor.Output.ElasticSearch.TimeZone) > 0 {
43 loc, err = time.LoadLocation(processor.Output.ElasticSearch.TimeZone) 42 loc, err = time.LoadLocation(processor.Output.ElasticSearch.TimeZone)
@@ -45,10 +44,7 @@ func main() { @@ -45,10 +44,7 @@ func main() {
45 } else { 44 } else {
46 loc = time.Local 45 loc = time.Local
47 } 46 }
48 - indexer := es.NewIndex(client, func(t time.Time) string {  
49 - return t.In(loc).Format(indexFormat)  
50 - })  
51 - 47 + indexer := es.NewIndex(client, processor.Output.ElasticSearch.Index, loc)
52 filters := filter.CreateFilters(processor) 48 filters := filter.CreateFilters(processor)
53 writer, err := es.NewWriter(processor.Output.ElasticSearch, indexer) 49 writer, err := es.NewWriter(processor.Output.ElasticSearch, indexer)
54 logx.Must(err) 50 logx.Must(err)