正在显示
7 个修改的文件
包含
160 行增加
和
37 行删除
| @@ -8,6 +8,8 @@ require ( | @@ -8,6 +8,8 @@ require ( | ||
| 8 | github.com/json-iterator/go v1.1.10 | 8 | github.com/json-iterator/go v1.1.10 |
| 9 | github.com/mailru/easyjson v0.7.3 // indirect | 9 | github.com/mailru/easyjson v0.7.3 // indirect |
| 10 | github.com/olivere/elastic v6.2.34+incompatible | 10 | github.com/olivere/elastic v6.2.34+incompatible |
| 11 | + github.com/stretchr/testify v1.5.1 | ||
| 11 | github.com/tal-tech/go-queue v1.0.1 | 12 | github.com/tal-tech/go-queue v1.0.1 |
| 12 | github.com/tal-tech/go-zero v1.0.13 | 13 | github.com/tal-tech/go-zero v1.0.13 |
| 14 | + github.com/vjeantet/jodaTime v1.0.0 | ||
| 13 | ) | 15 | ) |
| @@ -215,6 +215,7 @@ github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3 | @@ -215,6 +215,7 @@ github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3 | ||
| 215 | github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= | 215 | github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= |
| 216 | github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= | 216 | github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= |
| 217 | github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= | 217 | github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= |
| 218 | +github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= | ||
| 218 | github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= | 219 | github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= |
| 219 | github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= | 220 | github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= |
| 220 | github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= | 221 | github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= |
| @@ -236,6 +237,8 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1 | @@ -236,6 +237,8 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1 | ||
| 236 | github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= | 237 | github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= |
| 237 | github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= | 238 | github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= |
| 238 | github.com/urfave/cli v1.22.4/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= | 239 | github.com/urfave/cli v1.22.4/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= |
| 240 | +github.com/vjeantet/jodaTime v1.0.0 h1:Fq2K9UCsbTFtKbHpe/L7C57XnSgbZ5z+gyGpn7cTE3s= | ||
| 241 | +github.com/vjeantet/jodaTime v1.0.0/go.mod h1:gA+i8InPfZxL1ToHaDpzi6QT/npjl3uPlcV4cxDNerI= | ||
| 239 | github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= | 242 | github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= |
| 240 | github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= | 243 | github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= |
| 241 | github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= | 244 | github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= |
| @@ -58,7 +58,9 @@ Processors: | @@ -58,7 +58,9 @@ Processors: | ||
| 58 | Hosts: | 58 | Hosts: |
| 59 | - "172.16.141.4:9200" | 59 | - "172.16.141.4:9200" |
| 60 | - "172.16.141.5:9200" | 60 | - "172.16.141.5:9200" |
| 61 | - Index: {.event}-2006.01.02 | 61 | + # {.event}是json输入的event属性值 |
| 62 | + # {{yyyy-MM-dd}}表示日期,比如2020-09-09 | ||
| 63 | + Index: {.event}-{{yyyy-MM-dd}} | ||
| 62 | ``` | 64 | ``` |
| 63 | 65 | ||
| 64 | ### 微信交流群 | 66 | ### 微信交流群 |
| @@ -58,5 +58,7 @@ Processors: | @@ -58,5 +58,7 @@ Processors: | ||
| 58 | Hosts: | 58 | Hosts: |
| 59 | - "172.16.141.4:9200" | 59 | - "172.16.141.4:9200" |
| 60 | - "172.16.141.5:9200" | 60 | - "172.16.141.5:9200" |
| 61 | - Index: {.event}-2006.01.02 | 61 | + # {.event} is the value of the json attribute from input |
| 62 | + # {{yyyy-MM-dd}} means date, like 2020-09-09 | ||
| 63 | + Index: {.event}-{{yyyy-MM-dd}} | ||
| 62 | ``` | 64 | ``` |
| @@ -12,16 +12,21 @@ import ( | @@ -12,16 +12,21 @@ import ( | ||
| 12 | "github.com/tal-tech/go-zero/core/lang" | 12 | "github.com/tal-tech/go-zero/core/lang" |
| 13 | "github.com/tal-tech/go-zero/core/logx" | 13 | "github.com/tal-tech/go-zero/core/logx" |
| 14 | "github.com/tal-tech/go-zero/core/syncx" | 14 | "github.com/tal-tech/go-zero/core/syncx" |
| 15 | + "github.com/vjeantet/jodaTime" | ||
| 15 | ) | 16 | ) |
| 16 | 17 | ||
| 17 | const ( | 18 | const ( |
| 18 | timestampFormat = "2006-01-02T15:04:05.000Z" | 19 | timestampFormat = "2006-01-02T15:04:05.000Z" |
| 19 | timestampKey = "@timestamp" | 20 | timestampKey = "@timestamp" |
| 21 | + leftBrace = '{' | ||
| 22 | + rightBrace = '}' | ||
| 23 | + dot = '.' | ||
| 20 | ) | 24 | ) |
| 21 | 25 | ||
| 22 | const ( | 26 | const ( |
| 23 | stateNormal = iota | 27 | stateNormal = iota |
| 24 | stateWrap | 28 | stateWrap |
| 29 | + stateVar | ||
| 25 | stateDot | 30 | stateDot |
| 26 | ) | 31 | ) |
| 27 | 32 | ||
| @@ -39,29 +44,9 @@ type ( | @@ -39,29 +44,9 @@ type ( | ||
| 39 | ) | 44 | ) |
| 40 | 45 | ||
| 41 | func NewIndex(client *elastic.Client, indexFormat string, loc *time.Location) *Index { | 46 | func NewIndex(client *elastic.Client, indexFormat string, loc *time.Location) *Index { |
| 42 | - var formatter func(map[string]interface{}) string | ||
| 43 | - format, attrs := getFormat(indexFormat) | ||
| 44 | - if len(attrs) > 0 { | ||
| 45 | - formatter = func(m map[string]interface{}) string { | ||
| 46 | - var vals []interface{} | ||
| 47 | - for _, attr := range attrs { | ||
| 48 | - if val, ok := m[attr]; ok { | ||
| 49 | - vals = append(vals, val) | ||
| 50 | - } else { | ||
| 51 | - vals = append(vals, "") | ||
| 52 | - } | ||
| 53 | - } | ||
| 54 | - return getTime(m).In(loc).Format(fmt.Sprintf(format, vals...)) | ||
| 55 | - } | ||
| 56 | - } else { | ||
| 57 | - formatter = func(m map[string]interface{}) string { | ||
| 58 | - return getTime(m).In(loc).Format(format) | ||
| 59 | - } | ||
| 60 | - } | ||
| 61 | - | ||
| 62 | return &Index{ | 47 | return &Index{ |
| 63 | client: client, | 48 | client: client, |
| 64 | - indexFormat: formatter, | 49 | + indexFormat: buildIndexFormatter(indexFormat, loc), |
| 65 | indices: make(map[string]lang.PlaceholderType), | 50 | indices: make(map[string]lang.PlaceholderType), |
| 66 | sharedCalls: syncx.NewSharedCalls(), | 51 | sharedCalls: syncx.NewSharedCalls(), |
| 67 | } | 52 | } |
| @@ -121,6 +106,36 @@ func (idx *Index) ensureIndex(index string) error { | @@ -121,6 +106,36 @@ func (idx *Index) ensureIndex(index string) error { | ||
| 121 | return err | 106 | return err |
| 122 | } | 107 | } |
| 123 | 108 | ||
| 109 | +func buildIndexFormatter(indexFormat string, loc *time.Location) func(map[string]interface{}) string { | ||
| 110 | + format, attrs, timePos := getFormat(indexFormat) | ||
| 111 | + if len(attrs) == 0 { | ||
| 112 | + return func(m map[string]interface{}) string { | ||
| 113 | + return format | ||
| 114 | + } | ||
| 115 | + } | ||
| 116 | + | ||
| 117 | + return func(m map[string]interface{}) string { | ||
| 118 | + var vals []interface{} | ||
| 119 | + for i, attr := range attrs { | ||
| 120 | + if i == timePos { | ||
| 121 | + vals = append(vals, formatTime(attr, getTime(m).In(loc))) | ||
| 122 | + continue | ||
| 123 | + } | ||
| 124 | + | ||
| 125 | + if val, ok := m[attr]; ok { | ||
| 126 | + vals = append(vals, val) | ||
| 127 | + } else { | ||
| 128 | + vals = append(vals, "") | ||
| 129 | + } | ||
| 130 | + } | ||
| 131 | + return fmt.Sprintf(format, vals...) | ||
| 132 | + } | ||
| 133 | +} | ||
| 134 | + | ||
| 135 | +func formatTime(format string, t time.Time) string { | ||
| 136 | + return jodaTime.Format(format, t) | ||
| 137 | +} | ||
| 138 | + | ||
| 124 | func getTime(m map[string]interface{}) time.Time { | 139 | func getTime(m map[string]interface{}) time.Time { |
| 125 | if ti, ok := m[timestampKey]; ok { | 140 | if ti, ok := m[timestampKey]; ok { |
| 126 | if ts, ok := ti.(string); ok { | 141 | if ts, ok := ti.(string); ok { |
| @@ -133,32 +148,59 @@ func getTime(m map[string]interface{}) time.Time { | @@ -133,32 +148,59 @@ func getTime(m map[string]interface{}) time.Time { | ||
| 133 | return time.Now() | 148 | return time.Now() |
| 134 | } | 149 | } |
| 135 | 150 | ||
| 136 | -func getFormat(indexFormat string) (format string, attrs []string) { | 151 | +func getFormat(indexFormat string) (format string, attrs []string, timePos int) { |
| 137 | var state = stateNormal | 152 | var state = stateNormal |
| 138 | var builder strings.Builder | 153 | var builder strings.Builder |
| 139 | var keyBuf strings.Builder | 154 | var keyBuf strings.Builder |
| 155 | + timePos = -1 | ||
| 156 | + writeHolder := func() { | ||
| 157 | + if keyBuf.Len() > 0 { | ||
| 158 | + attrs = append(attrs, keyBuf.String()) | ||
| 159 | + keyBuf.Reset() | ||
| 160 | + builder.WriteString("%s") | ||
| 161 | + } | ||
| 162 | + } | ||
| 163 | + | ||
| 140 | for _, ch := range indexFormat { | 164 | for _, ch := range indexFormat { |
| 165 | + switch state { | ||
| 166 | + case stateNormal: | ||
| 141 | switch ch { | 167 | switch ch { |
| 142 | - case '{': | 168 | + case leftBrace: |
| 143 | state = stateWrap | 169 | state = stateWrap |
| 144 | - case '.': | ||
| 145 | - if state == stateWrap { | ||
| 146 | - state = stateDot | ||
| 147 | - } else { | 170 | + default: |
| 148 | builder.WriteRune(ch) | 171 | builder.WriteRune(ch) |
| 149 | } | 172 | } |
| 150 | - case '}': | 173 | + case stateWrap: |
| 174 | + switch ch { | ||
| 175 | + case leftBrace: | ||
| 176 | + state = stateVar | ||
| 177 | + case dot: | ||
| 178 | + state = stateDot | ||
| 179 | + keyBuf.Reset() | ||
| 180 | + case rightBrace: | ||
| 151 | state = stateNormal | 181 | state = stateNormal |
| 152 | - if keyBuf.Len() > 0 { | ||
| 153 | - attrs = append(attrs, keyBuf.String()) | ||
| 154 | - builder.WriteString("%s") | 182 | + timePos = len(attrs) |
| 183 | + writeHolder() | ||
| 184 | + default: | ||
| 185 | + keyBuf.WriteRune(ch) | ||
| 155 | } | 186 | } |
| 187 | + case stateVar: | ||
| 188 | + switch ch { | ||
| 189 | + case rightBrace: | ||
| 190 | + state = stateWrap | ||
| 156 | default: | 191 | default: |
| 157 | - if state == stateDot { | ||
| 158 | keyBuf.WriteRune(ch) | 192 | keyBuf.WriteRune(ch) |
| 159 | - } else { | ||
| 160 | - builder.WriteRune(ch) | ||
| 161 | } | 193 | } |
| 194 | + case stateDot: | ||
| 195 | + switch ch { | ||
| 196 | + case rightBrace: | ||
| 197 | + state = stateNormal | ||
| 198 | + writeHolder() | ||
| 199 | + default: | ||
| 200 | + keyBuf.WriteRune(ch) | ||
| 201 | + } | ||
| 202 | + default: | ||
| 203 | + builder.WriteRune(ch) | ||
| 162 | } | 204 | } |
| 163 | } | 205 | } |
| 164 | 206 |
stash/es/index_test.go
0 → 100644
| 1 | +package es | ||
| 2 | + | ||
| 3 | +import ( | ||
| 4 | + "testing" | ||
| 5 | + "time" | ||
| 6 | + | ||
| 7 | + "github.com/stretchr/testify/assert" | ||
| 8 | +) | ||
| 9 | + | ||
| 10 | +const testTime = "2020-09-13T08:22:29.294Z" | ||
| 11 | + | ||
| 12 | +func TestBuildIndexFormatter(t *testing.T) { | ||
| 13 | + tests := []struct { | ||
| 14 | + name string | ||
| 15 | + val string | ||
| 16 | + attrs map[string]interface{} | ||
| 17 | + expect string | ||
| 18 | + }{ | ||
| 19 | + { | ||
| 20 | + name: "plain text only", | ||
| 21 | + val: "yyyy/MM/dd", | ||
| 22 | + expect: "yyyy/MM/dd", | ||
| 23 | + }, | ||
| 24 | + { | ||
| 25 | + name: "time only", | ||
| 26 | + val: "{{yyyy/MM/dd}}", | ||
| 27 | + expect: time.Now().Format("2006/01/02"), | ||
| 28 | + }, | ||
| 29 | + { | ||
| 30 | + name: "attr without time", | ||
| 31 | + val: "{.event}", | ||
| 32 | + attrs: map[string]interface{}{ | ||
| 33 | + "event": "foo", | ||
| 34 | + }, | ||
| 35 | + expect: "foo", | ||
| 36 | + }, | ||
| 37 | + { | ||
| 38 | + name: "attr with time", | ||
| 39 | + val: "{.event}-{{yyyy/MM/dd}}", | ||
| 40 | + attrs: map[string]interface{}{ | ||
| 41 | + "event": "foo", | ||
| 42 | + timestampKey: testTime, | ||
| 43 | + }, | ||
| 44 | + expect: "foo-2020/09/13", | ||
| 45 | + }, | ||
| 46 | + { | ||
| 47 | + name: "attr with time, with missing", | ||
| 48 | + val: "{.event}-{.foo}-{{yyyy/MM/dd}}", | ||
| 49 | + attrs: map[string]interface{}{ | ||
| 50 | + "event": "foo", | ||
| 51 | + timestampKey: testTime, | ||
| 52 | + }, | ||
| 53 | + expect: "foo--2020/09/13", | ||
| 54 | + }, | ||
| 55 | + { | ||
| 56 | + name: "attr with time, leading alphas", | ||
| 57 | + val: "{the.event}-{{yyyy/MM/dd}}", | ||
| 58 | + attrs: map[string]interface{}{ | ||
| 59 | + "event": "foo", | ||
| 60 | + timestampKey: testTime, | ||
| 61 | + }, | ||
| 62 | + expect: "foo-2020/09/13", | ||
| 63 | + }, | ||
| 64 | + } | ||
| 65 | + | ||
| 66 | + for _, test := range tests { | ||
| 67 | + t.Run(test.name, func(t *testing.T) { | ||
| 68 | + formatter := buildIndexFormatter(test.val, time.Local) | ||
| 69 | + assert.Equal(t, test.expect, formatter(test.attrs)) | ||
| 70 | + }) | ||
| 71 | + } | ||
| 72 | +} |
-
请 注册 或 登录 后发表评论