作者 kevin

support attributes transfer

@@ -49,10 +49,9 @@ Processors: @@ -49,10 +49,9 @@ Processors:
49 - index 49 - index
50 - beat 50 - beat
51 - docker_container 51 - docker_container
52 - - offset  
53 - - prospector  
54 - - source  
55 - - stream 52 + - Action: transfer
  53 + Field: message
  54 + Target: data
56 Output: 55 Output:
57 ElasticSearch: 56 ElasticSearch:
58 Hosts: 57 Hosts:
@@ -49,10 +49,9 @@ Processors: @@ -49,10 +49,9 @@ Processors:
49 - index 49 - index
50 - beat 50 - beat
51 - docker_container 51 - docker_container
52 - - offset  
53 - - prospector  
54 - - source  
55 - - stream 52 + - Action: transfer
  53 + Field: message
  54 + Target: data
56 Output: 55 Output:
57 ElasticSearch: 56 ElasticSearch:
58 Hosts: 57 Hosts:
@@ -24,9 +24,11 @@ type ( @@ -24,9 +24,11 @@ type (
24 } 24 }
25 25
26 Filter struct { 26 Filter struct {
27 - Action string `json:",options=drop|remove_field"` 27 + Action string `json:",options=drop|remove_field|transfer"`
28 Conditions []Condition `json:",optional"` 28 Conditions []Condition `json:",optional"`
29 Fields []string `json:",optional"` 29 Fields []string `json:",optional"`
  30 + Field string `json:",optional"`
  31 + Target string `json:",optional"`
30 } 32 }
31 33
32 Processor struct { 34 Processor struct {
@@ -34,6 +34,9 @@ Processors: @@ -34,6 +34,9 @@ Processors:
34 - prospector 34 - prospector
35 - source 35 - source
36 - stream 36 - stream
  37 + - Action: transfer
  38 + Field: message
  39 + Target: data
37 Output: 40 Output:
38 ElasticSearch: 41 ElasticSearch:
39 Hosts: 42 Hosts:
@@ -5,6 +5,7 @@ import "github.com/tal-tech/go-stash/stash/config" @@ -5,6 +5,7 @@ import "github.com/tal-tech/go-stash/stash/config"
5 const ( 5 const (
6 filterDrop = "drop" 6 filterDrop = "drop"
7 filterRemoveFields = "remove_field" 7 filterRemoveFields = "remove_field"
  8 + filterTransfer = "transfer"
8 opAnd = "and" 9 opAnd = "and"
9 opOr = "or" 10 opOr = "or"
10 typeContains = "contains" 11 typeContains = "contains"
@@ -22,6 +23,8 @@ func CreateFilters(p config.Processor) []FilterFunc { @@ -22,6 +23,8 @@ func CreateFilters(p config.Processor) []FilterFunc {
22 filters = append(filters, DropFilter(f.Conditions)) 23 filters = append(filters, DropFilter(f.Conditions))
23 case filterRemoveFields: 24 case filterRemoveFields:
24 filters = append(filters, RemoveFieldFilter(f.Fields)) 25 filters = append(filters, RemoveFieldFilter(f.Fields))
  26 + case filterTransfer:
  27 + filters = append(filters, TransferFilter(f.Field, f.Target))
25 } 28 }
26 } 29 }
27 30
  1 +package filter
  2 +
  3 +import (
  4 + "testing"
  5 +
  6 + "github.com/stretchr/testify/assert"
  7 +)
  8 +
  9 +func TestRemoveFieldFilter(t *testing.T) {
  10 + tests := []struct {
  11 + name string
  12 + input map[string]interface{}
  13 + fields []string
  14 + expect map[string]interface{}
  15 + }{
  16 + {
  17 + name: "remove field",
  18 + input: map[string]interface{}{
  19 + "a": "aa",
  20 + "b": `{"c":"cc"}`,
  21 + },
  22 + fields: []string{"b"},
  23 + expect: map[string]interface{}{
  24 + "a": "aa",
  25 + },
  26 + },
  27 + {
  28 + name: "remove field",
  29 + input: map[string]interface{}{
  30 + "a": "aa",
  31 + "b": `{"c":"cc"}`,
  32 + },
  33 + fields: []string{"c"},
  34 + expect: map[string]interface{}{
  35 + "a": "aa",
  36 + "b": `{"c":"cc"}`,
  37 + },
  38 + },
  39 + }
  40 +
  41 + for _, test := range tests {
  42 + t.Run(test.name, func(t *testing.T) {
  43 + actual := RemoveFieldFilter(test.fields)(test.input)
  44 + assert.EqualValues(t, test.expect, actual)
  45 + })
  46 + }
  47 +}
  1 +package filter
  2 +
  3 +import (
  4 + jsoniter "github.com/json-iterator/go"
  5 +)
  6 +
  7 +func TransferFilter(field, target string) FilterFunc {
  8 + return func(m map[string]interface{}) map[string]interface{} {
  9 + val, ok := m[field]
  10 + if !ok {
  11 + return m
  12 + }
  13 +
  14 + s, ok := val.(string)
  15 + if !ok {
  16 + return m
  17 + }
  18 +
  19 + var nm map[string]interface{}
  20 + if err := jsoniter.Unmarshal([]byte(s), &nm); err != nil {
  21 + return m
  22 + }
  23 +
  24 + delete(m, field)
  25 + if len(target) > 0 {
  26 + m[target] = nm
  27 + } else {
  28 + for k, v := range nm {
  29 + m[k] = v
  30 + }
  31 + }
  32 +
  33 + return m
  34 + }
  35 +}
  1 +package filter
  2 +
  3 +import (
  4 + "testing"
  5 +
  6 + "github.com/stretchr/testify/assert"
  7 +)
  8 +
  9 +func TestTransferFilter(t *testing.T) {
  10 + tests := []struct {
  11 + name string
  12 + input map[string]interface{}
  13 + field string
  14 + target string
  15 + expect map[string]interface{}
  16 + }{
  17 + {
  18 + name: "with target",
  19 + input: map[string]interface{}{
  20 + "a": "aa",
  21 + "b": `{"c":"cc"}`,
  22 + },
  23 + field: "b",
  24 + target: "data",
  25 + expect: map[string]interface{}{
  26 + "a": "aa",
  27 + "data": map[string]interface{}{
  28 + "c": "cc",
  29 + },
  30 + },
  31 + },
  32 + {
  33 + name: "without target",
  34 + input: map[string]interface{}{
  35 + "a": "aa",
  36 + "b": `{"c":"cc"}`,
  37 + },
  38 + field: "b",
  39 + expect: map[string]interface{}{
  40 + "a": "aa",
  41 + "c": "cc",
  42 + },
  43 + },
  44 + {
  45 + name: "without field",
  46 + input: map[string]interface{}{
  47 + "a": "aa",
  48 + "b": `{"c":"cc"}`,
  49 + },
  50 + field: "c",
  51 + expect: map[string]interface{}{
  52 + "a": "aa",
  53 + "b": `{"c":"cc"}`,
  54 + },
  55 + },
  56 + {
  57 + name: "with not json",
  58 + input: map[string]interface{}{
  59 + "a": "aa",
  60 + "b": `{"c":"cc"`,
  61 + },
  62 + field: "b",
  63 + expect: map[string]interface{}{
  64 + "a": "aa",
  65 + "b": `{"c":"cc"`,
  66 + },
  67 + },
  68 + {
  69 + name: "with not string",
  70 + input: map[string]interface{}{
  71 + "a": "aa",
  72 + "b": map[string]interface{}{"c": "cc"},
  73 + },
  74 + field: "b",
  75 + expect: map[string]interface{}{
  76 + "a": "aa",
  77 + "b": map[string]interface{}{"c": "cc"},
  78 + },
  79 + },
  80 + }
  81 +
  82 + for _, test := range tests {
  83 + t.Run(test.name, func(t *testing.T) {
  84 + actual := TransferFilter(test.field, test.target)(test.input)
  85 + assert.EqualValues(t, test.expect, actual)
  86 + })
  87 + }
  88 +}