|
@@ -10,10 +10,9 @@ import ( |
|
@@ -10,10 +10,9 @@ import ( |
10
|
"github.com/tal-tech/go-zero/core/logx"
|
10
|
"github.com/tal-tech/go-zero/core/logx"
|
11
|
)
|
11
|
)
|
12
|
|
12
|
|
13
|
-const docType = "doc"
|
|
|
14
|
-
|
|
|
15
|
type (
|
13
|
type (
|
16
|
Writer struct {
|
14
|
Writer struct {
|
|
|
15
|
+ docType string
|
17
|
client *elastic.Client
|
16
|
client *elastic.Client
|
18
|
indexer *Index
|
17
|
indexer *Index
|
19
|
inserter *executors.ChunkExecutor
|
18
|
inserter *executors.ChunkExecutor
|
|
@@ -36,6 +35,7 @@ func NewWriter(c config.ElasticSearchConf, indexer *Index) (*Writer, error) { |
|
@@ -36,6 +35,7 @@ func NewWriter(c config.ElasticSearchConf, indexer *Index) (*Writer, error) { |
36
|
}
|
35
|
}
|
37
|
|
36
|
|
38
|
writer := Writer{
|
37
|
writer := Writer{
|
|
|
38
|
+ docType: c.DocType,
|
39
|
client: client,
|
39
|
client: client,
|
40
|
indexer: indexer,
|
40
|
indexer: indexer,
|
41
|
}
|
41
|
}
|
|
@@ -54,7 +54,7 @@ func (w *Writer) execute(vals []interface{}) { |
|
@@ -54,7 +54,7 @@ func (w *Writer) execute(vals []interface{}) { |
54
|
var bulk = w.client.Bulk()
|
54
|
var bulk = w.client.Bulk()
|
55
|
for _, val := range vals {
|
55
|
for _, val := range vals {
|
56
|
pair := val.(valueWithTime)
|
56
|
pair := val.(valueWithTime)
|
57
|
- req := elastic.NewBulkIndexRequest().Index(w.indexer.GetIndex(pair.t)).Type(docType).Doc(pair.val)
|
57
|
+ req := elastic.NewBulkIndexRequest().Index(w.indexer.GetIndex(pair.t)).Type(w.docType).Doc(pair.val)
|
58
|
bulk.Add(req)
|
58
|
bulk.Add(req)
|
59
|
}
|
59
|
}
|
60
|
_, err := bulk.Do(context.Background())
|
60
|
_, err := bulk.Do(context.Background())
|