作者 kevin

fix bug

@@ -3,7 +3,6 @@ package es @@ -3,7 +3,6 @@ package es
3 import ( 3 import (
4 "context" 4 "context"
5 5
6 - jsoniter "github.com/json-iterator/go"  
7 "github.com/olivere/elastic" 6 "github.com/olivere/elastic"
8 "github.com/tal-tech/go-stash/stash/config" 7 "github.com/tal-tech/go-stash/stash/config"
9 "github.com/tal-tech/go-zero/core/executors" 8 "github.com/tal-tech/go-zero/core/executors"
@@ -14,7 +13,6 @@ type ( @@ -14,7 +13,6 @@ type (
14 Writer struct { 13 Writer struct {
15 docType string 14 docType string
16 client *elastic.Client 15 client *elastic.Client
17 - indexer *Index  
18 inserter *executors.ChunkExecutor 16 inserter *executors.ChunkExecutor
19 } 17 }
20 18
@@ -24,7 +22,7 @@ type ( @@ -24,7 +22,7 @@ type (
24 } 22 }
25 ) 23 )
26 24
27 -func NewWriter(c config.ElasticSearchConf, indexer *Index) (*Writer, error) { 25 +func NewWriter(c config.ElasticSearchConf) (*Writer, error) {
28 client, err := elastic.NewClient( 26 client, err := elastic.NewClient(
29 elastic.SetSniff(false), 27 elastic.SetSniff(false),
30 elastic.SetURL(c.Hosts...), 28 elastic.SetURL(c.Hosts...),
@@ -37,20 +35,12 @@ func NewWriter(c config.ElasticSearchConf, indexer *Index) (*Writer, error) { @@ -37,20 +35,12 @@ func NewWriter(c config.ElasticSearchConf, indexer *Index) (*Writer, error) {
37 writer := Writer{ 35 writer := Writer{
38 docType: c.DocType, 36 docType: c.DocType,
39 client: client, 37 client: client,
40 - indexer: indexer,  
41 } 38 }
42 writer.inserter = executors.NewChunkExecutor(writer.execute, executors.WithChunkBytes(c.MaxChunkBytes)) 39 writer.inserter = executors.NewChunkExecutor(writer.execute, executors.WithChunkBytes(c.MaxChunkBytes))
43 return &writer, nil 40 return &writer, nil
44 } 41 }
45 42
46 -func (w *Writer) Write(m map[string]interface{}) error {  
47 - bs, err := jsoniter.Marshal(m)  
48 - if err != nil {  
49 - return err  
50 - }  
51 -  
52 - index := w.indexer.GetIndex(m)  
53 - val := string(bs) 43 +func (w *Writer) Write(index, val string) error {
54 return w.inserter.Add(valueWithIndex{ 44 return w.inserter.Add(valueWithIndex{
55 index: index, 45 index: index,
56 val: val, 46 val: val,
@@ -8,12 +8,14 @@ import ( @@ -8,12 +8,14 @@ import (
8 8
9 type MessageHandler struct { 9 type MessageHandler struct {
10 writer *es.Writer 10 writer *es.Writer
  11 + indexer *es.Index
11 filters []filter.FilterFunc 12 filters []filter.FilterFunc
12 } 13 }
13 14
14 -func NewHandler(writer *es.Writer) *MessageHandler { 15 +func NewHandler(writer *es.Writer, indexer *es.Index) *MessageHandler {
15 return &MessageHandler{ 16 return &MessageHandler{
16 writer: writer, 17 writer: writer,
  18 + indexer: indexer,
17 } 19 }
18 } 20 }
19 21
@@ -29,11 +31,17 @@ func (mh *MessageHandler) Consume(_, val string) error { @@ -29,11 +31,17 @@ func (mh *MessageHandler) Consume(_, val string) error {
29 return err 31 return err
30 } 32 }
31 33
  34 + index := mh.indexer.GetIndex(m)
32 for _, proc := range mh.filters { 35 for _, proc := range mh.filters {
33 if m = proc(m); m == nil { 36 if m = proc(m); m == nil {
34 return nil 37 return nil
35 } 38 }
36 } 39 }
37 40
38 - return mh.writer.Write(m) 41 + bs, err := jsoniter.Marshal(m)
  42 + if err != nil {
  43 + return err
  44 + }
  45 +
  46 + return mh.writer.Write(index, string(bs))
39 } 47 }
@@ -35,6 +35,10 @@ func main() { @@ -35,6 +35,10 @@ func main() {
35 ) 35 )
36 logx.Must(err) 36 logx.Must(err)
37 37
  38 + filters := filter.CreateFilters(processor)
  39 + writer, err := es.NewWriter(processor.Output.ElasticSearch)
  40 + logx.Must(err)
  41 +
38 var loc *time.Location 42 var loc *time.Location
39 if len(processor.Output.ElasticSearch.TimeZone) > 0 { 43 if len(processor.Output.ElasticSearch.TimeZone) > 0 {
40 loc, err = time.LoadLocation(processor.Output.ElasticSearch.TimeZone) 44 loc, err = time.LoadLocation(processor.Output.ElasticSearch.TimeZone)
@@ -43,11 +47,7 @@ func main() { @@ -43,11 +47,7 @@ func main() {
43 loc = time.Local 47 loc = time.Local
44 } 48 }
45 indexer := es.NewIndex(client, processor.Output.ElasticSearch.Index, loc) 49 indexer := es.NewIndex(client, processor.Output.ElasticSearch.Index, loc)
46 - filters := filter.CreateFilters(processor)  
47 - writer, err := es.NewWriter(processor.Output.ElasticSearch, indexer)  
48 - logx.Must(err)  
49 -  
50 - handle := handler.NewHandler(writer) 50 + handle := handler.NewHandler(writer, indexer)
51 handle.AddFilters(filters...) 51 handle.AddFilters(filters...)
52 handle.AddFilters(filter.AddUriFieldFilter("url", "uri")) 52 handle.AddFilters(filter.AddUriFieldFilter("url", "uri"))
53 group.Add(kq.MustNewQueue(processor.Input.Kafka, handle)) 53 group.Add(kq.MustNewQueue(processor.Input.Kafka, handle))