作者 唐旭辉

调试

@@ -7,6 +7,7 @@ require ( @@ -7,6 +7,7 @@ require (
7 github.com/Shopify/sarama v1.23.1 7 github.com/Shopify/sarama v1.23.1
8 github.com/ajg/form v1.5.1 // indirect 8 github.com/ajg/form v1.5.1 // indirect
9 github.com/astaxie/beego v1.12.2 9 github.com/astaxie/beego v1.12.2
  10 + github.com/bsm/sarama-cluster v2.1.15+incompatible
10 github.com/dgrijalva/jwt-go v3.2.0+incompatible 11 github.com/dgrijalva/jwt-go v3.2.0+incompatible
11 github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072 // indirect 12 github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072 // indirect
12 github.com/fatih/structs v1.1.0 // indirect 13 github.com/fatih/structs v1.1.0 // indirect
@@ -57,7 +57,25 @@ func (s SyncOrderService) SyncOrderFromBestshop(cmd command.CreateOrderFromBests @@ -57,7 +57,25 @@ func (s SyncOrderService) SyncOrderFromBestshop(cmd command.CreateOrderFromBests
57 logs.Info("订单已存在,order_code=%s", cmd.OrderCode) 57 logs.Info("订单已存在,order_code=%s", cmd.OrderCode)
58 return nil 58 return nil
59 } 59 }
  60 + err = transactionContext.CommitTransaction()
  61 + if err != nil {
  62 + return lib.ThrowError(lib.INTERNAL_SERVER_ERROR, err.Error())
  63 + }
  64 + err = s.CreateOrderFromBestshop(cmd)
  65 + return err
  66 +}
60 67
  68 +func (s SyncOrderService) CreateOrderFromBestshop(cmd command.CreateOrderFromBestshop) error {
  69 + var (
  70 + transactionContext, _ = factory.CreateTransactionContext(nil)
  71 + err error
  72 + )
  73 + if err = transactionContext.StartTransaction(); err != nil {
  74 + return lib.ThrowError(lib.INTERNAL_SERVER_ERROR, err.Error())
  75 + }
  76 + defer func() {
  77 + transactionContext.RollbackTransaction()
  78 + }()
61 var ( 79 var (
62 orderBestshopRepository domain.OrderBestshopRepository 80 orderBestshopRepository domain.OrderBestshopRepository
63 orderGoodBestshopRepository domain.OrderGoodBestshopRepository 81 orderGoodBestshopRepository domain.OrderGoodBestshopRepository
@@ -237,3 +255,85 @@ func (s SyncOrderService) copyOrderBestshopToOrderBase(orderBestshop *domain.Ord @@ -237,3 +255,85 @@ func (s SyncOrderService) copyOrderBestshopToOrderBase(orderBestshop *domain.Ord
237 } 255 }
238 return nil 256 return nil
239 } 257 }
  258 +
  259 +// func (s SyncOrderService) UpdateOrderFromBestshop(cmd command.CreateOrderFromBestshop) error {
  260 +// var (
  261 +// transactionContext, _ = factory.CreateTransactionContext(nil)
  262 +// err error
  263 +// )
  264 +// if err = transactionContext.StartTransaction(); err != nil {
  265 +// return lib.ThrowError(lib.INTERNAL_SERVER_ERROR, err.Error())
  266 +// }
  267 +// defer func() {
  268 +// transactionContext.RollbackTransaction()
  269 +// }()
  270 +// var (
  271 +// orderBestshopRepository domain.OrderBestshopRepository
  272 +// orderGoodBestshopRepository domain.OrderGoodBestshopRepository
  273 +// )
  274 +// if orderBestshopRepository, err = factory.CreateOrderBestshopRepository(map[string]interface{}{
  275 +// "transactionContext": transactionContext,
  276 +// }); err != nil {
  277 +// return lib.ThrowError(lib.INTERNAL_SERVER_ERROR, err.Error())
  278 +// }
  279 +// if orderGoodBestshopRepository, err = factory.CreateOrderGoodBestshopRepository(map[string]interface{}{
  280 +// "transactionContext": transactionContext,
  281 +// }); err != nil {
  282 +// return lib.ThrowError(lib.INTERNAL_SERVER_ERROR, err.Error())
  283 +// }
  284 +// var (
  285 +// orderData *domain.OrderBestShop
  286 +// orderGoods []domain.OrderGoodBestShop
  287 +// )
  288 +// orderData, err = orderBestshopRepository.FindOne(domain.OrderBestshopFindOneQuery{
  289 +// OrderCode: cmd.OrderCode,
  290 +// })
  291 +// if err != nil {
  292 +// return lib.ThrowError(lib.INTERNAL_SERVER_ERROR, "获取orderBestshop(order_code=%s)数据失败,err=%s", cmd.OrderCode, err.Error())
  293 +// }
  294 +
  295 +// orderData.OrderCode= cmd.OrderCode,
  296 +// orderData.OrderTime=cmd.OrderTime,
  297 +// orderData.OrderState: cmd.OrderState,
  298 +// orderData.OrderCount: cmd.OrderCount,
  299 +// orderData.OrderAmount: cmd.OrderAmount,
  300 +// orderData.CreateTime: time.Now(),
  301 +// orderData.PartnerId: cmd.PartnerId,
  302 +// BuyerName: cmd.BuyerName,
  303 +// BuyerPhone: cmd.BuyerPhone,
  304 +// BuyerAddress: cmd.BuyerAddress,
  305 +// BuyerRemark: cmd.BuyerRemark,
  306 +// BuyerId: cmd.BuyerId,
  307 +// DeliveryState: cmd.DeliveryState,
  308 +// DeliveryTime: cmd.DeliveryTime,
  309 +// IsCopy: false,
  310 +// CompanyId: cmd.CompanyId,
  311 +// }
  312 +// err = orderBestshopRepository.Add(&order)
  313 +// if err != nil {
  314 +// return lib.ThrowError(lib.INTERNAL_SERVER_ERROR, "添加order_bestshop失败:"+err.Error())
  315 +// }
  316 +// goods := []domain.OrderGoodBestShop{}
  317 +// for i := range cmd.Goods {
  318 +// good := domain.OrderGoodBestShop{
  319 +// OrderId: order.Id,
  320 +// Sn: cmd.Goods[i].Sn,
  321 +// Bn: cmd.Goods[i].Bn,
  322 +// Name: cmd.Goods[i].Name,
  323 +// Price: cmd.Goods[i].Price,
  324 +// Nums: cmd.Goods[i].Nums,
  325 +// Amount: cmd.Goods[i].Amount,
  326 +// }
  327 +// err = orderGoodBestshopRepository.Add(&good)
  328 +// if err != nil {
  329 +// return lib.ThrowError(lib.INTERNAL_SERVER_ERROR, "添加order_good失败:"+err.Error())
  330 +// }
  331 +// goods = append(goods, good)
  332 +// }
  333 +// order.Goods = goods
  334 +// err = transactionContext.CommitTransaction()
  335 +// if err != nil {
  336 +// return lib.ThrowError(lib.INTERNAL_SERVER_ERROR, err.Error())
  337 +// }
  338 +// return nil
  339 +// }
@@ -59,6 +59,7 @@ func (order OrderBestShop) CopyToOrderBase(o *OrderBase) { @@ -59,6 +59,7 @@ func (order OrderBestShop) CopyToOrderBase(o *OrderBase) {
59 59
60 type OrderBestshopFindOneQuery struct { 60 type OrderBestshopFindOneQuery struct {
61 OrderId int64 61 OrderId int64
  62 + OrderCode string
62 } 63 }
63 64
64 type OrderBestshopRepository interface { 65 type OrderBestshopRepository interface {
@@ -3,12 +3,13 @@ package consumer @@ -3,12 +3,13 @@ package consumer
3 import ( 3 import (
4 "context" 4 "context"
5 "errors" 5 "errors"
6 - "time" 6 + "fmt"
7 7
8 "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs" 8 "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs"
9 9
10 "github.com/Shopify/sarama" 10 "github.com/Shopify/sarama"
11 "github.com/astaxie/beego/logs" 11 "github.com/astaxie/beego/logs"
  12 + cluster "github.com/bsm/sarama-cluster"
12 ) 13 )
13 14
14 //MessageConsumer 消息消费者 15 //MessageConsumer 消息消费者
@@ -65,6 +66,7 @@ func (c *MessageConsumer) FindTopichandle(topic string) (TopicHandle, error) { @@ -65,6 +66,7 @@ func (c *MessageConsumer) FindTopichandle(topic string) (TopicHandle, error) {
65 type Runer struct { 66 type Runer struct {
66 msgConsumer *MessageConsumer 67 msgConsumer *MessageConsumer
67 consumerGroup sarama.ConsumerGroup 68 consumerGroup sarama.ConsumerGroup
  69 + Consumer *cluster.Consumer
68 } 70 }
69 71
70 func NewRuner() *Runer { 72 func NewRuner() *Runer {
@@ -88,43 +90,89 @@ func NewRuner() *Runer { @@ -88,43 +90,89 @@ func NewRuner() *Runer {
88 return r 90 return r
89 } 91 }
90 92
  93 +// func (r *Runer) InitConsumer() error {
  94 +// config := sarama.NewConfig()
  95 +// //config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
  96 +// config.Consumer.Offsets.Initial = sarama.OffsetOldest
  97 +// config.Version = sarama.V0_10_2_1
  98 +// if err := config.Validate(); err != nil {
  99 +// msg := fmt.Sprintf("Kafka producer config invalidate. config: %v. err: %v", configs.Cfg, err)
  100 +// logs.Error(msg)
  101 +// panic(msg)
  102 +// }
  103 +
  104 +// consumerGroup, err := sarama.NewConsumerGroup(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, config)
  105 +// if err != nil {
  106 +// return err
  107 +// }
  108 +// r.consumerGroup = consumerGroup
  109 +// return nil
  110 +// }
91 func (r *Runer) InitConsumer() error { 111 func (r *Runer) InitConsumer() error {
92 - config := sarama.NewConfig()  
93 - //config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin  
94 - config.Consumer.Offsets.Initial = sarama.OffsetOldest  
95 - config.Version = sarama.V0_10_2_1  
96 - consumerGroup, err := sarama.NewConsumerGroup(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, config) 112 + clusterCfg := cluster.NewConfig()
  113 + clusterCfg.Consumer.Return.Errors = true
  114 + clusterCfg.Consumer.Offsets.Initial = sarama.OffsetOldest
  115 + clusterCfg.Group.Return.Notifications = true
  116 + clusterCfg.Version = sarama.V0_10_2_1
  117 + consumer, err := cluster.NewConsumer(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, r.msgConsumer.topics, clusterCfg)
97 if err != nil { 118 if err != nil {
98 - return err 119 + msg := fmt.Sprintf("Create kafka consumer error: %v. config: %v", err, clusterCfg)
  120 + logs.Error(msg)
  121 + panic(msg)
99 } 122 }
100 - r.consumerGroup = consumerGroup 123 + r.Consumer = consumer
101 return nil 124 return nil
102 } 125 }
103 126
  127 +// func (r *Runer) Start(ctx context.Context) {
  128 +// defer func() {
  129 +// if e := recover(); e != nil {
  130 +// logs.Error(e)
  131 +// }
  132 +// }()
  133 +// for {
  134 +// select {
  135 +// case <-ctx.Done():
  136 +// logs.Warning("ctx cancel;consumerGroup.Close()")
  137 +// r.consumerGroup.Close()
  138 +// return
  139 +// default:
  140 +// if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {
  141 +// logs.Error("consumerGroup err:%s \n", err)
  142 +// //等待重试
  143 +// timer := time.NewTimer(5 * time.Second)
  144 +// <-timer.C
  145 +// }
  146 +// r.msgConsumer.ready = make(chan struct{})
  147 +// }
  148 +
  149 +// }
  150 +// }
  151 +
104 func (r *Runer) Start(ctx context.Context) { 152 func (r *Runer) Start(ctx context.Context) {
105 - defer func() {  
106 - if e := recover(); e != nil {  
107 - logs.Error(e)  
108 - }  
109 - }()  
110 for { 153 for {
111 select { 154 select {
  155 + case msg, more := <-r.Consumer.Messages():
  156 + if more {
  157 + fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s Timestamp:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp)
  158 + r.Consumer.MarkOffset(msg, "") // mark message as processed
  159 + }
  160 + case err, more := <-r.Consumer.Errors():
  161 + if more {
  162 + fmt.Println("Kafka consumer error: %v", err.Error())
  163 + }
  164 + case ntf, more := <-r.Consumer.Notifications():
  165 + if more {
  166 + fmt.Println("Kafka consumer rebalance: %v", ntf)
  167 + }
112 case <-ctx.Done(): 168 case <-ctx.Done():
113 - logs.Warning("ctx cancel;consumerGroup.Close()")  
114 - r.consumerGroup.Close() 169 + fmt.Errorf("Stop consumer server...")
  170 + r.Consumer.Close()
115 return 171 return
116 - default:  
117 - if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {  
118 - logs.Error("consumerGroup err:%s \n", err)  
119 - //等待重试  
120 - timer := time.NewTimer(5 * time.Second)  
121 - <-timer.C  
122 } 172 }
123 - r.msgConsumer.ready = make(chan struct{})  
124 - }  
125 -  
126 } 173 }
127 } 174 }
  175 +
128 func (r *Runer) IsReady() <-chan struct{} { 176 func (r *Runer) IsReady() <-chan struct{} {
129 return r.msgConsumer.ready 177 return r.msgConsumer.ready
130 } 178 }
1 module github.com/Shopify/sarama 1 module github.com/Shopify/sarama
2 2
3 -go 1.14  
4 -  
5 require ( 3 require (
6 github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798 4 github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798
7 github.com/Shopify/toxiproxy v2.1.4+incompatible 5 github.com/Shopify/toxiproxy v2.1.4+incompatible
  1 +*.log
  2 +*.pid
  3 +kafka*/
  4 +vendor/
  1 +sudo: false
  2 +language: go
  3 +go:
  4 + - 1.10.x
  5 + - 1.9.x
  6 +install:
  7 + - go get -u github.com/golang/dep/cmd/dep
  8 + - dep ensure
  9 +env:
  10 + - SCALA_VERSION=2.12 KAFKA_VERSION=0.11.0.1
  11 + - SCALA_VERSION=2.12 KAFKA_VERSION=1.0.1
  12 + - SCALA_VERSION=2.12 KAFKA_VERSION=1.1.0
  13 +script:
  14 + - make default test-race
  15 +addons:
  16 + apt:
  17 + packages:
  18 + - oracle-java8-set-default
  1 +# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
  2 +
  3 +
  4 +[[projects]]
  5 + name = "github.com/Shopify/sarama"
  6 + packages = ["."]
  7 + revision = "35324cf48e33d8260e1c7c18854465a904ade249"
  8 + version = "v1.17.0"
  9 +
  10 +[[projects]]
  11 + name = "github.com/davecgh/go-spew"
  12 + packages = ["spew"]
  13 + revision = "346938d642f2ec3594ed81d874461961cd0faa76"
  14 + version = "v1.1.0"
  15 +
  16 +[[projects]]
  17 + name = "github.com/eapache/go-resiliency"
  18 + packages = ["breaker"]
  19 + revision = "ea41b0fad31007accc7f806884dcdf3da98b79ce"
  20 + version = "v1.1.0"
  21 +
  22 +[[projects]]
  23 + branch = "master"
  24 + name = "github.com/eapache/go-xerial-snappy"
  25 + packages = ["."]
  26 + revision = "bb955e01b9346ac19dc29eb16586c90ded99a98c"
  27 +
  28 +[[projects]]
  29 + name = "github.com/eapache/queue"
  30 + packages = ["."]
  31 + revision = "44cc805cf13205b55f69e14bcb69867d1ae92f98"
  32 + version = "v1.1.0"
  33 +
  34 +[[projects]]
  35 + branch = "master"
  36 + name = "github.com/golang/snappy"
  37 + packages = ["."]
  38 + revision = "2e65f85255dbc3072edf28d6b5b8efc472979f5a"
  39 +
  40 +[[projects]]
  41 + name = "github.com/onsi/ginkgo"
  42 + packages = [
  43 + ".",
  44 + "config",
  45 + "extensions/table",
  46 + "internal/codelocation",
  47 + "internal/containernode",
  48 + "internal/failer",
  49 + "internal/leafnodes",
  50 + "internal/remote",
  51 + "internal/spec",
  52 + "internal/spec_iterator",
  53 + "internal/specrunner",
  54 + "internal/suite",
  55 + "internal/testingtproxy",
  56 + "internal/writer",
  57 + "reporters",
  58 + "reporters/stenographer",
  59 + "reporters/stenographer/support/go-colorable",
  60 + "reporters/stenographer/support/go-isatty",
  61 + "types"
  62 + ]
  63 + revision = "fa5fabab2a1bfbd924faf4c067d07ae414e2aedf"
  64 + version = "v1.5.0"
  65 +
  66 +[[projects]]
  67 + name = "github.com/onsi/gomega"
  68 + packages = [
  69 + ".",
  70 + "format",
  71 + "internal/assertion",
  72 + "internal/asyncassertion",
  73 + "internal/oraclematcher",
  74 + "internal/testingtsupport",
  75 + "matchers",
  76 + "matchers/support/goraph/bipartitegraph",
  77 + "matchers/support/goraph/edge",
  78 + "matchers/support/goraph/node",
  79 + "matchers/support/goraph/util",
  80 + "types"
  81 + ]
  82 + revision = "62bff4df71bdbc266561a0caee19f0594b17c240"
  83 + version = "v1.4.0"
  84 +
  85 +[[projects]]
  86 + name = "github.com/pierrec/lz4"
  87 + packages = [
  88 + ".",
  89 + "internal/xxh32"
  90 + ]
  91 + revision = "6b9367c9ff401dbc54fabce3fb8d972e799b702d"
  92 + version = "v2.0.2"
  93 +
  94 +[[projects]]
  95 + branch = "master"
  96 + name = "github.com/rcrowley/go-metrics"
  97 + packages = ["."]
  98 + revision = "e2704e165165ec55d062f5919b4b29494e9fa790"
  99 +
  100 +[[projects]]
  101 + branch = "master"
  102 + name = "golang.org/x/net"
  103 + packages = [
  104 + "html",
  105 + "html/atom",
  106 + "html/charset"
  107 + ]
  108 + revision = "afe8f62b1d6bbd81f31868121a50b06d8188e1f9"
  109 +
  110 +[[projects]]
  111 + branch = "master"
  112 + name = "golang.org/x/sys"
  113 + packages = ["unix"]
  114 + revision = "63fc586f45fe72d95d5240a5d5eb95e6503907d3"
  115 +
  116 +[[projects]]
  117 + name = "golang.org/x/text"
  118 + packages = [
  119 + "encoding",
  120 + "encoding/charmap",
  121 + "encoding/htmlindex",
  122 + "encoding/internal",
  123 + "encoding/internal/identifier",
  124 + "encoding/japanese",
  125 + "encoding/korean",
  126 + "encoding/simplifiedchinese",
  127 + "encoding/traditionalchinese",
  128 + "encoding/unicode",
  129 + "internal/gen",
  130 + "internal/tag",
  131 + "internal/utf8internal",
  132 + "language",
  133 + "runes",
  134 + "transform",
  135 + "unicode/cldr"
  136 + ]
  137 + revision = "f21a4dfb5e38f5895301dc265a8def02365cc3d0"
  138 + version = "v0.3.0"
  139 +
  140 +[[projects]]
  141 + name = "gopkg.in/yaml.v2"
  142 + packages = ["."]
  143 + revision = "5420a8b6744d3b0345ab293f6fcba19c978f1183"
  144 + version = "v2.2.1"
  145 +
  146 +[solve-meta]
  147 + analyzer-name = "dep"
  148 + analyzer-version = 1
  149 + inputs-digest = "2fa33a2d1ae87e0905ef09332bb4b3fda29179f6bcd48fd3b94070774b9e458b"
  150 + solver-name = "gps-cdcl"
  151 + solver-version = 1
  1 +
  2 +# Gopkg.toml example
  3 +#
  4 +# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md
  5 +# for detailed Gopkg.toml documentation.
  6 +#
  7 +# required = ["github.com/user/thing/cmd/thing"]
  8 +# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
  9 +#
  10 +# [[constraint]]
  11 +# name = "github.com/user/project"
  12 +# version = "1.0.0"
  13 +#
  14 +# [[constraint]]
  15 +# name = "github.com/user/project2"
  16 +# branch = "dev"
  17 +# source = "github.com/myfork/project2"
  18 +#
  19 +# [[override]]
  20 +# name = "github.com/x/y"
  21 +# version = "2.4.0"
  22 +
  23 +
  24 +[[constraint]]
  25 + name = "github.com/Shopify/sarama"
  26 + version = "^1.14.0"
  1 +(The MIT License)
  2 +
  3 +Copyright (c) 2017 Black Square Media Ltd
  4 +
  5 +Permission is hereby granted, free of charge, to any person obtaining
  6 +a copy of this software and associated documentation files (the
  7 +'Software'), to deal in the Software without restriction, including
  8 +without limitation the rights to use, copy, modify, merge, publish,
  9 +distribute, sublicense, and/or sell copies of the Software, and to
  10 +permit persons to whom the Software is furnished to do so, subject to
  11 +the following conditions:
  12 +
  13 +The above copyright notice and this permission notice shall be
  14 +included in all copies or substantial portions of the Software.
  15 +
  16 +THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
  17 +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  18 +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
  19 +IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
  20 +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
  21 +TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
  22 +SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  1 +SCALA_VERSION?= 2.12
  2 +KAFKA_VERSION?= 1.1.0
  3 +KAFKA_DIR= kafka_$(SCALA_VERSION)-$(KAFKA_VERSION)
  4 +KAFKA_SRC= https://archive.apache.org/dist/kafka/$(KAFKA_VERSION)/$(KAFKA_DIR).tgz
  5 +KAFKA_ROOT= testdata/$(KAFKA_DIR)
  6 +PKG=$(shell go list ./... | grep -v vendor)
  7 +
  8 +default: vet test
  9 +
  10 +vet:
  11 + go vet $(PKG)
  12 +
  13 +test: testdeps
  14 + KAFKA_DIR=$(KAFKA_DIR) go test $(PKG) -ginkgo.slowSpecThreshold=60
  15 +
  16 +test-verbose: testdeps
  17 + KAFKA_DIR=$(KAFKA_DIR) go test $(PKG) -ginkgo.slowSpecThreshold=60 -v
  18 +
  19 +test-race: testdeps
  20 + KAFKA_DIR=$(KAFKA_DIR) go test $(PKG) -ginkgo.slowSpecThreshold=60 -v -race
  21 +
  22 +testdeps: $(KAFKA_ROOT)
  23 +
  24 +doc: README.md
  25 +
  26 +.PHONY: test testdeps vet doc
  27 +
  28 +# ---------------------------------------------------------------------
  29 +
  30 +$(KAFKA_ROOT):
  31 + @mkdir -p $(dir $@)
  32 + cd $(dir $@) && curl -sSL $(KAFKA_SRC) | tar xz
  33 +
  34 +README.md: README.md.tpl $(wildcard *.go)
  35 + becca -package $(subst $(GOPATH)/src/,,$(PWD))
  1 +# Sarama Cluster
  2 +
  3 +[![GoDoc](https://godoc.org/github.com/bsm/sarama-cluster?status.svg)](https://godoc.org/github.com/bsm/sarama-cluster)
  4 +[![Build Status](https://travis-ci.org/bsm/sarama-cluster.svg?branch=master)](https://travis-ci.org/bsm/sarama-cluster)
  5 +[![Go Report Card](https://goreportcard.com/badge/github.com/bsm/sarama-cluster)](https://goreportcard.com/report/github.com/bsm/sarama-cluster)
  6 +[![License](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
  7 +
  8 +Cluster extensions for [Sarama](https://github.com/Shopify/sarama), the Go client library for Apache Kafka 0.9 (and later).
  9 +
  10 +## Documentation
  11 +
  12 +Documentation and example are available via godoc at http://godoc.org/github.com/bsm/sarama-cluster
  13 +
  14 +## Examples
  15 +
  16 +Consumers have two modes of operation. In the default multiplexed mode messages (and errors) of multiple
  17 +topics and partitions are all passed to the single channel:
  18 +
  19 +```go
  20 +package main
  21 +
  22 +import (
  23 + "fmt"
  24 + "log"
  25 + "os"
  26 + "os/signal"
  27 +
  28 + cluster "github.com/bsm/sarama-cluster"
  29 +)
  30 +
  31 +func main() {
  32 +
  33 + // init (custom) config, enable errors and notifications
  34 + config := cluster.NewConfig()
  35 + config.Consumer.Return.Errors = true
  36 + config.Group.Return.Notifications = true
  37 +
  38 + // init consumer
  39 + brokers := []string{"127.0.0.1:9092"}
  40 + topics := []string{"my_topic", "other_topic"}
  41 + consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
  42 + if err != nil {
  43 + panic(err)
  44 + }
  45 + defer consumer.Close()
  46 +
  47 + // trap SIGINT to trigger a shutdown.
  48 + signals := make(chan os.Signal, 1)
  49 + signal.Notify(signals, os.Interrupt)
  50 +
  51 + // consume errors
  52 + go func() {
  53 + for err := range consumer.Errors() {
  54 + log.Printf("Error: %s\n", err.Error())
  55 + }
  56 + }()
  57 +
  58 + // consume notifications
  59 + go func() {
  60 + for ntf := range consumer.Notifications() {
  61 + log.Printf("Rebalanced: %+v\n", ntf)
  62 + }
  63 + }()
  64 +
  65 + // consume messages, watch signals
  66 + for {
  67 + select {
  68 + case msg, ok := <-consumer.Messages():
  69 + if ok {
  70 + fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
  71 + consumer.MarkOffset(msg, "") // mark message as processed
  72 + }
  73 + case <-signals:
  74 + return
  75 + }
  76 + }
  77 +}
  78 +```
  79 +
  80 +Users who require access to individual partitions can use the partitioned mode which exposes access to partition-level
  81 +consumers:
  82 +
  83 +```go
  84 +package main
  85 +
  86 +import (
  87 + "fmt"
  88 + "log"
  89 + "os"
  90 + "os/signal"
  91 +
  92 + cluster "github.com/bsm/sarama-cluster"
  93 +)
  94 +
  95 +func main() {
  96 +
  97 + // init (custom) config, set mode to ConsumerModePartitions
  98 + config := cluster.NewConfig()
  99 + config.Group.Mode = cluster.ConsumerModePartitions
  100 +
  101 + // init consumer
  102 + brokers := []string{"127.0.0.1:9092"}
  103 + topics := []string{"my_topic", "other_topic"}
  104 + consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
  105 + if err != nil {
  106 + panic(err)
  107 + }
  108 + defer consumer.Close()
  109 +
  110 + // trap SIGINT to trigger a shutdown.
  111 + signals := make(chan os.Signal, 1)
  112 + signal.Notify(signals, os.Interrupt)
  113 +
  114 + // consume partitions
  115 + for {
  116 + select {
  117 + case part, ok := <-consumer.Partitions():
  118 + if !ok {
  119 + return
  120 + }
  121 +
  122 + // start a separate goroutine to consume messages
  123 + go func(pc cluster.PartitionConsumer) {
  124 + for msg := range pc.Messages() {
  125 + fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
  126 + consumer.MarkOffset(msg, "") // mark message as processed
  127 + }
  128 + }(part)
  129 + case <-signals:
  130 + return
  131 + }
  132 + }
  133 +}
  134 +```
  135 +
  136 +## Running tests
  137 +
  138 +You need to install Ginkgo & Gomega to run tests. Please see
  139 +http://onsi.github.io/ginkgo for more details.
  140 +
  141 +To run tests, call:
  142 +
  143 + $ make test
  144 +
  145 +## Troubleshooting
  146 +
  147 +### Consumer not receiving any messages?
  148 +
  149 +By default, sarama's `Config.Consumer.Offsets.Initial` is set to `sarama.OffsetNewest`. This means that in the event that a brand new consumer is created, and it has never committed any offsets to kafka, it will only receive messages starting from the message after the current one that was written.
  150 +
  151 +If you wish to receive all messages (from the start of all messages in the topic) in the event that a consumer does not have any offsets committed to kafka, you need to set `Config.Consumer.Offsets.Initial` to `sarama.OffsetOldest`.
  1 +# Sarama Cluster
  2 +
  3 +[![GoDoc](https://godoc.org/github.com/bsm/sarama-cluster?status.svg)](https://godoc.org/github.com/bsm/sarama-cluster)
  4 +[![Build Status](https://travis-ci.org/bsm/sarama-cluster.svg?branch=master)](https://travis-ci.org/bsm/sarama-cluster)
  5 +[![Go Report Card](https://goreportcard.com/badge/github.com/bsm/sarama-cluster)](https://goreportcard.com/report/github.com/bsm/sarama-cluster)
  6 +[![License](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
  7 +
  8 +Cluster extensions for [Sarama](https://github.com/Shopify/sarama), the Go client library for Apache Kafka 0.9 (and later).
  9 +
  10 +## Documentation
  11 +
  12 +Documentation and example are available via godoc at http://godoc.org/github.com/bsm/sarama-cluster
  13 +
  14 +## Examples
  15 +
  16 +Consumers have two modes of operation. In the default multiplexed mode messages (and errors) of multiple
  17 +topics and partitions are all passed to the single channel:
  18 +
  19 +```go
  20 +package main
  21 +
  22 +import (
  23 + "fmt"
  24 + "log"
  25 + "os"
  26 + "os/signal"
  27 +
  28 + cluster "github.com/bsm/sarama-cluster"
  29 +)
  30 +
  31 +func main() {{ "ExampleConsumer" | code }}
  32 +```
  33 +
  34 +Users who require access to individual partitions can use the partitioned mode which exposes access to partition-level
  35 +consumers:
  36 +
  37 +```go
  38 +package main
  39 +
  40 +import (
  41 + "fmt"
  42 + "log"
  43 + "os"
  44 + "os/signal"
  45 +
  46 + cluster "github.com/bsm/sarama-cluster"
  47 +)
  48 +
  49 +func main() {{ "ExampleConsumer_Partitions" | code }}
  50 +```
  51 +
  52 +## Running tests
  53 +
  54 +You need to install Ginkgo & Gomega to run tests. Please see
  55 +http://onsi.github.io/ginkgo for more details.
  56 +
  57 +To run tests, call:
  58 +
  59 + $ make test
  60 +
  61 +## Troubleshooting
  62 +
  63 +### Consumer not receiving any messages?
  64 +
  65 +By default, sarama's `Config.Consumer.Offsets.Initial` is set to `sarama.OffsetNewest`. This means that in the event that a brand new consumer is created, and it has never committed any offsets to kafka, it will only receive messages starting from the message after the current one that was written.
  66 +
  67 +If you wish to receive all messages (from the start of all messages in the topic) in the event that a consumer does not have any offsets committed to kafka, you need to set `Config.Consumer.Offsets.Initial` to `sarama.OffsetOldest`.
  1 +package cluster
  2 +
  3 +import (
  4 + "math"
  5 + "sort"
  6 +
  7 + "github.com/Shopify/sarama"
  8 +)
  9 +
  10 +// NotificationType defines the type of notification
  11 +type NotificationType uint8
  12 +
  13 +// String describes the notification type
  14 +func (t NotificationType) String() string {
  15 + switch t {
  16 + case RebalanceStart:
  17 + return "rebalance start"
  18 + case RebalanceOK:
  19 + return "rebalance OK"
  20 + case RebalanceError:
  21 + return "rebalance error"
  22 + }
  23 + return "unknown"
  24 +}
  25 +
  26 +const (
  27 + UnknownNotification NotificationType = iota
  28 + RebalanceStart
  29 + RebalanceOK
  30 + RebalanceError
  31 +)
  32 +
  33 +// Notification are state events emitted by the consumers on rebalance
  34 +type Notification struct {
  35 + // Type exposes the notification type
  36 + Type NotificationType
  37 +
  38 + // Claimed contains topic/partitions that were claimed by this rebalance cycle
  39 + Claimed map[string][]int32
  40 +
  41 + // Released contains topic/partitions that were released as part of this rebalance cycle
  42 + Released map[string][]int32
  43 +
  44 + // Current are topic/partitions that are currently claimed to the consumer
  45 + Current map[string][]int32
  46 +}
  47 +
  48 +func newNotification(current map[string][]int32) *Notification {
  49 + return &Notification{
  50 + Type: RebalanceStart,
  51 + Current: current,
  52 + }
  53 +}
  54 +
  55 +func (n *Notification) success(current map[string][]int32) *Notification {
  56 + o := &Notification{
  57 + Type: RebalanceOK,
  58 + Claimed: make(map[string][]int32),
  59 + Released: make(map[string][]int32),
  60 + Current: current,
  61 + }
  62 + for topic, partitions := range current {
  63 + o.Claimed[topic] = int32Slice(partitions).Diff(int32Slice(n.Current[topic]))
  64 + }
  65 + for topic, partitions := range n.Current {
  66 + o.Released[topic] = int32Slice(partitions).Diff(int32Slice(current[topic]))
  67 + }
  68 + return o
  69 +}
  70 +
  71 +// --------------------------------------------------------------------
  72 +
  73 +type topicInfo struct {
  74 + Partitions []int32
  75 + MemberIDs []string
  76 +}
  77 +
  78 +func (info topicInfo) Perform(s Strategy) map[string][]int32 {
  79 + if s == StrategyRoundRobin {
  80 + return info.RoundRobin()
  81 + }
  82 + return info.Ranges()
  83 +}
  84 +
  85 +func (info topicInfo) Ranges() map[string][]int32 {
  86 + sort.Strings(info.MemberIDs)
  87 +
  88 + mlen := len(info.MemberIDs)
  89 + plen := len(info.Partitions)
  90 + res := make(map[string][]int32, mlen)
  91 +
  92 + for pos, memberID := range info.MemberIDs {
  93 + n, i := float64(plen)/float64(mlen), float64(pos)
  94 + min := int(math.Floor(i*n + 0.5))
  95 + max := int(math.Floor((i+1)*n + 0.5))
  96 + sub := info.Partitions[min:max]
  97 + if len(sub) > 0 {
  98 + res[memberID] = sub
  99 + }
  100 + }
  101 + return res
  102 +}
  103 +
  104 +func (info topicInfo) RoundRobin() map[string][]int32 {
  105 + sort.Strings(info.MemberIDs)
  106 +
  107 + mlen := len(info.MemberIDs)
  108 + res := make(map[string][]int32, mlen)
  109 + for i, pnum := range info.Partitions {
  110 + memberID := info.MemberIDs[i%mlen]
  111 + res[memberID] = append(res[memberID], pnum)
  112 + }
  113 + return res
  114 +}
  115 +
  116 +// --------------------------------------------------------------------
  117 +
  118 +type balancer struct {
  119 + client sarama.Client
  120 + topics map[string]topicInfo
  121 +}
  122 +
  123 +func newBalancerFromMeta(client sarama.Client, members map[string]sarama.ConsumerGroupMemberMetadata) (*balancer, error) {
  124 + balancer := newBalancer(client)
  125 + for memberID, meta := range members {
  126 + for _, topic := range meta.Topics {
  127 + if err := balancer.Topic(topic, memberID); err != nil {
  128 + return nil, err
  129 + }
  130 + }
  131 + }
  132 + return balancer, nil
  133 +}
  134 +
  135 +func newBalancer(client sarama.Client) *balancer {
  136 + return &balancer{
  137 + client: client,
  138 + topics: make(map[string]topicInfo),
  139 + }
  140 +}
  141 +
  142 +func (r *balancer) Topic(name string, memberID string) error {
  143 + topic, ok := r.topics[name]
  144 + if !ok {
  145 + nums, err := r.client.Partitions(name)
  146 + if err != nil {
  147 + return err
  148 + }
  149 + topic = topicInfo{
  150 + Partitions: nums,
  151 + MemberIDs: make([]string, 0, 1),
  152 + }
  153 + }
  154 + topic.MemberIDs = append(topic.MemberIDs, memberID)
  155 + r.topics[name] = topic
  156 + return nil
  157 +}
  158 +
  159 +func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 {
  160 + res := make(map[string]map[string][]int32, 1)
  161 + for topic, info := range r.topics {
  162 + for memberID, partitions := range info.Perform(s) {
  163 + if _, ok := res[memberID]; !ok {
  164 + res[memberID] = make(map[string][]int32, 1)
  165 + }
  166 + res[memberID][topic] = partitions
  167 + }
  168 + }
  169 + return res
  170 +}
  1 +package cluster
  2 +
  3 +import (
  4 + "errors"
  5 + "sync/atomic"
  6 +
  7 + "github.com/Shopify/sarama"
  8 +)
  9 +
  10 +var errClientInUse = errors.New("cluster: client is already used by another consumer")
  11 +
  12 +// Client is a group client
  13 +type Client struct {
  14 + sarama.Client
  15 + config Config
  16 +
  17 + inUse uint32
  18 +}
  19 +
  20 +// NewClient creates a new client instance
  21 +func NewClient(addrs []string, config *Config) (*Client, error) {
  22 + if config == nil {
  23 + config = NewConfig()
  24 + }
  25 +
  26 + if err := config.Validate(); err != nil {
  27 + return nil, err
  28 + }
  29 +
  30 + client, err := sarama.NewClient(addrs, &config.Config)
  31 + if err != nil {
  32 + return nil, err
  33 + }
  34 +
  35 + return &Client{Client: client, config: *config}, nil
  36 +}
  37 +
  38 +// ClusterConfig returns the cluster configuration.
  39 +func (c *Client) ClusterConfig() *Config {
  40 + cfg := c.config
  41 + return &cfg
  42 +}
  43 +
  44 +func (c *Client) claim() bool {
  45 + return atomic.CompareAndSwapUint32(&c.inUse, 0, 1)
  46 +}
  47 +
  48 +func (c *Client) release() {
  49 + atomic.CompareAndSwapUint32(&c.inUse, 1, 0)
  50 +}
  1 +package cluster
  2 +
  3 +// Strategy for partition to consumer assignement
  4 +type Strategy string
  5 +
  6 +const (
  7 + // StrategyRange is the default and assigns partition ranges to consumers.
  8 + // Example with six partitions and two consumers:
  9 + // C1: [0, 1, 2]
  10 + // C2: [3, 4, 5]
  11 + StrategyRange Strategy = "range"
  12 +
  13 + // StrategyRoundRobin assigns partitions by alternating over consumers.
  14 + // Example with six partitions and two consumers:
  15 + // C1: [0, 2, 4]
  16 + // C2: [1, 3, 5]
  17 + StrategyRoundRobin Strategy = "roundrobin"
  18 +)
  19 +
  20 +// Error instances are wrappers for internal errors with a context and
  21 +// may be returned through the consumer's Errors() channel
  22 +type Error struct {
  23 + Ctx string
  24 + error
  25 +}
  1 +package cluster
  2 +
  3 +import (
  4 + "regexp"
  5 + "time"
  6 +
  7 + "github.com/Shopify/sarama"
  8 +)
  9 +
  10 +var minVersion = sarama.V0_9_0_0
  11 +
  12 +type ConsumerMode uint8
  13 +
  14 +const (
  15 + ConsumerModeMultiplex ConsumerMode = iota
  16 + ConsumerModePartitions
  17 +)
  18 +
  19 +// Config extends sarama.Config with Group specific namespace
  20 +type Config struct {
  21 + sarama.Config
  22 +
  23 + // Group is the namespace for group management properties
  24 + Group struct {
  25 +
  26 + // The strategy to use for the allocation of partitions to consumers (defaults to StrategyRange)
  27 + PartitionStrategy Strategy
  28 +
  29 + // By default, messages and errors from the subscribed topics and partitions are all multiplexed and
  30 + // made available through the consumer's Messages() and Errors() channels.
  31 + //
  32 + // Users who require low-level access can enable ConsumerModePartitions where individual partitions
  33 + // are exposed on the Partitions() channel. Messages and errors must then be consumed on the partitions
  34 + // themselves.
  35 + Mode ConsumerMode
  36 +
  37 + Offsets struct {
  38 + Retry struct {
  39 + // The numer retries when committing offsets (defaults to 3).
  40 + Max int
  41 + }
  42 + Synchronization struct {
  43 + // The duration allowed for other clients to commit their offsets before resumption in this client, e.g. during a rebalance
  44 + // NewConfig sets this to the Consumer.MaxProcessingTime duration of the Sarama configuration
  45 + DwellTime time.Duration
  46 + }
  47 + }
  48 +
  49 + Session struct {
  50 + // The allowed session timeout for registered consumers (defaults to 30s).
  51 + // Must be within the allowed server range.
  52 + Timeout time.Duration
  53 + }
  54 +
  55 + Heartbeat struct {
  56 + // Interval between each heartbeat (defaults to 3s). It should be no more
  57 + // than 1/3rd of the Group.Session.Timout setting
  58 + Interval time.Duration
  59 + }
  60 +
  61 + // Return specifies which group channels will be populated. If they are set to true,
  62 + // you must read from the respective channels to prevent deadlock.
  63 + Return struct {
  64 + // If enabled, rebalance notification will be returned on the
  65 + // Notifications channel (default disabled).
  66 + Notifications bool
  67 + }
  68 +
  69 + Topics struct {
  70 + // An additional whitelist of topics to subscribe to.
  71 + Whitelist *regexp.Regexp
  72 + // An additional blacklist of topics to avoid. If set, this will precede over
  73 + // the Whitelist setting.
  74 + Blacklist *regexp.Regexp
  75 + }
  76 +
  77 + Member struct {
  78 + // Custom metadata to include when joining the group. The user data for all joined members
  79 + // can be retrieved by sending a DescribeGroupRequest to the broker that is the
  80 + // coordinator for the group.
  81 + UserData []byte
  82 + }
  83 + }
  84 +}
  85 +
  86 +// NewConfig returns a new configuration instance with sane defaults.
  87 +func NewConfig() *Config {
  88 + c := &Config{
  89 + Config: *sarama.NewConfig(),
  90 + }
  91 + c.Group.PartitionStrategy = StrategyRange
  92 + c.Group.Offsets.Retry.Max = 3
  93 + c.Group.Offsets.Synchronization.DwellTime = c.Consumer.MaxProcessingTime
  94 + c.Group.Session.Timeout = 30 * time.Second
  95 + c.Group.Heartbeat.Interval = 3 * time.Second
  96 + c.Config.Version = minVersion
  97 + return c
  98 +}
  99 +
  100 +// Validate checks a Config instance. It will return a
  101 +// sarama.ConfigurationError if the specified values don't make sense.
  102 +func (c *Config) Validate() error {
  103 + if c.Group.Heartbeat.Interval%time.Millisecond != 0 {
  104 + sarama.Logger.Println("Group.Heartbeat.Interval only supports millisecond precision; nanoseconds will be truncated.")
  105 + }
  106 + if c.Group.Session.Timeout%time.Millisecond != 0 {
  107 + sarama.Logger.Println("Group.Session.Timeout only supports millisecond precision; nanoseconds will be truncated.")
  108 + }
  109 + if c.Group.PartitionStrategy != StrategyRange && c.Group.PartitionStrategy != StrategyRoundRobin {
  110 + sarama.Logger.Println("Group.PartitionStrategy is not supported; range will be assumed.")
  111 + }
  112 + if !c.Version.IsAtLeast(minVersion) {
  113 + sarama.Logger.Println("Version is not supported; 0.9. will be assumed.")
  114 + c.Version = minVersion
  115 + }
  116 + if err := c.Config.Validate(); err != nil {
  117 + return err
  118 + }
  119 +
  120 + // validate the Group values
  121 + switch {
  122 + case c.Group.Offsets.Retry.Max < 0:
  123 + return sarama.ConfigurationError("Group.Offsets.Retry.Max must be >= 0")
  124 + case c.Group.Offsets.Synchronization.DwellTime <= 0:
  125 + return sarama.ConfigurationError("Group.Offsets.Synchronization.DwellTime must be > 0")
  126 + case c.Group.Offsets.Synchronization.DwellTime > 10*time.Minute:
  127 + return sarama.ConfigurationError("Group.Offsets.Synchronization.DwellTime must be <= 10m")
  128 + case c.Group.Heartbeat.Interval <= 0:
  129 + return sarama.ConfigurationError("Group.Heartbeat.Interval must be > 0")
  130 + case c.Group.Session.Timeout <= 0:
  131 + return sarama.ConfigurationError("Group.Session.Timeout must be > 0")
  132 + case !c.Metadata.Full && c.Group.Topics.Whitelist != nil:
  133 + return sarama.ConfigurationError("Metadata.Full must be enabled when Group.Topics.Whitelist is used")
  134 + case !c.Metadata.Full && c.Group.Topics.Blacklist != nil:
  135 + return sarama.ConfigurationError("Metadata.Full must be enabled when Group.Topics.Blacklist is used")
  136 + }
  137 +
  138 + // ensure offset is correct
  139 + switch c.Consumer.Offsets.Initial {
  140 + case sarama.OffsetOldest, sarama.OffsetNewest:
  141 + default:
  142 + return sarama.ConfigurationError("Consumer.Offsets.Initial must be either OffsetOldest or OffsetNewest")
  143 + }
  144 +
  145 + return nil
  146 +}
  1 +package cluster
  2 +
  3 +import (
  4 + "sort"
  5 + "sync"
  6 + "sync/atomic"
  7 + "time"
  8 +
  9 + "github.com/Shopify/sarama"
  10 +)
  11 +
  12 +// Consumer is a cluster group consumer
  13 +type Consumer struct {
  14 + client *Client
  15 + ownClient bool
  16 +
  17 + consumer sarama.Consumer
  18 + subs *partitionMap
  19 +
  20 + consumerID string
  21 + groupID string
  22 +
  23 + memberID string
  24 + generationID int32
  25 + membershipMu sync.RWMutex
  26 +
  27 + coreTopics []string
  28 + extraTopics []string
  29 +
  30 + dying, dead chan none
  31 + closeOnce sync.Once
  32 +
  33 + consuming int32
  34 + messages chan *sarama.ConsumerMessage
  35 + errors chan error
  36 + partitions chan PartitionConsumer
  37 + notifications chan *Notification
  38 +
  39 + commitMu sync.Mutex
  40 +}
  41 +
  42 +// NewConsumer initializes a new consumer
  43 +func NewConsumer(addrs []string, groupID string, topics []string, config *Config) (*Consumer, error) {
  44 + client, err := NewClient(addrs, config)
  45 + if err != nil {
  46 + return nil, err
  47 + }
  48 +
  49 + consumer, err := NewConsumerFromClient(client, groupID, topics)
  50 + if err != nil {
  51 + return nil, err
  52 + }
  53 + consumer.ownClient = true
  54 + return consumer, nil
  55 +}
  56 +
  57 +// NewConsumerFromClient initializes a new consumer from an existing client.
  58 +//
  59 +// Please note that clients cannot be shared between consumers (due to Kafka internals),
  60 +// they can only be re-used which requires the user to call Close() on the first consumer
  61 +// before using this method again to initialize another one. Attempts to use a client with
  62 +// more than one consumer at a time will return errors.
  63 +func NewConsumerFromClient(client *Client, groupID string, topics []string) (*Consumer, error) {
  64 + if !client.claim() {
  65 + return nil, errClientInUse
  66 + }
  67 +
  68 + consumer, err := sarama.NewConsumerFromClient(client.Client)
  69 + if err != nil {
  70 + client.release()
  71 + return nil, err
  72 + }
  73 +
  74 + sort.Strings(topics)
  75 + c := &Consumer{
  76 + client: client,
  77 + consumer: consumer,
  78 + subs: newPartitionMap(),
  79 + groupID: groupID,
  80 +
  81 + coreTopics: topics,
  82 +
  83 + dying: make(chan none),
  84 + dead: make(chan none),
  85 +
  86 + messages: make(chan *sarama.ConsumerMessage),
  87 + errors: make(chan error, client.config.ChannelBufferSize),
  88 + partitions: make(chan PartitionConsumer, 1),
  89 + notifications: make(chan *Notification),
  90 + }
  91 + if err := c.client.RefreshCoordinator(groupID); err != nil {
  92 + client.release()
  93 + return nil, err
  94 + }
  95 +
  96 + go c.mainLoop()
  97 + return c, nil
  98 +}
  99 +
  100 +// Messages returns the read channel for the messages that are returned by
  101 +// the broker.
  102 +//
  103 +// This channel will only return if Config.Group.Mode option is set to
  104 +// ConsumerModeMultiplex (default).
  105 +func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage { return c.messages }
  106 +
  107 +// Partitions returns the read channels for individual partitions of this broker.
  108 +//
  109 +// This will channel will only return if Config.Group.Mode option is set to
  110 +// ConsumerModePartitions.
  111 +//
  112 +// The Partitions() channel must be listened to for the life of this consumer;
  113 +// when a rebalance happens old partitions will be closed (naturally come to
  114 +// completion) and new ones will be emitted. The returned channel will only close
  115 +// when the consumer is completely shut down.
  116 +func (c *Consumer) Partitions() <-chan PartitionConsumer { return c.partitions }
  117 +
  118 +// Errors returns a read channel of errors that occur during offset management, if
  119 +// enabled. By default, errors are logged and not returned over this channel. If
  120 +// you want to implement any custom error handling, set your config's
  121 +// Consumer.Return.Errors setting to true, and read from this channel.
  122 +func (c *Consumer) Errors() <-chan error { return c.errors }
  123 +
  124 +// Notifications returns a channel of Notifications that occur during consumer
  125 +// rebalancing. Notifications will only be emitted over this channel, if your config's
  126 +// Group.Return.Notifications setting to true.
  127 +func (c *Consumer) Notifications() <-chan *Notification { return c.notifications }
  128 +
  129 +// HighWaterMarks returns the current high water marks for each topic and partition
  130 +// Consistency between partitions is not guaranteed since high water marks are updated separately.
  131 +func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 { return c.consumer.HighWaterMarks() }
  132 +
  133 +// MarkOffset marks the provided message as processed, alongside a metadata string
  134 +// that represents the state of the partition consumer at that point in time. The
  135 +// metadata string can be used by another consumer to restore that state, so it
  136 +// can resume consumption.
  137 +//
  138 +// Note: calling MarkOffset does not necessarily commit the offset to the backend
  139 +// store immediately for efficiency reasons, and it may never be committed if
  140 +// your application crashes. This means that you may end up processing the same
  141 +// message twice, and your processing should ideally be idempotent.
  142 +func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
  143 + if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil {
  144 + sub.MarkOffset(msg.Offset, metadata)
  145 + }
  146 +}
  147 +
  148 +// MarkPartitionOffset marks an offset of the provided topic/partition as processed.
  149 +// See MarkOffset for additional explanation.
  150 +func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
  151 + if sub := c.subs.Fetch(topic, partition); sub != nil {
  152 + sub.MarkOffset(offset, metadata)
  153 + }
  154 +}
  155 +
  156 +// MarkOffsets marks stashed offsets as processed.
  157 +// See MarkOffset for additional explanation.
  158 +func (c *Consumer) MarkOffsets(s *OffsetStash) {
  159 + s.mu.Lock()
  160 + defer s.mu.Unlock()
  161 +
  162 + for tp, info := range s.offsets {
  163 + if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil {
  164 + sub.MarkOffset(info.Offset, info.Metadata)
  165 + }
  166 + delete(s.offsets, tp)
  167 + }
  168 +}
  169 +
  170 +// ResetOffsets marks the provided message as processed, alongside a metadata string
  171 +// that represents the state of the partition consumer at that point in time. The
  172 +// metadata string can be used by another consumer to restore that state, so it
  173 +// can resume consumption.
  174 +//
  175 +// Difference between ResetOffset and MarkOffset is that it allows to rewind to an earlier offset
  176 +func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
  177 + if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil {
  178 + sub.ResetOffset(msg.Offset, metadata)
  179 + }
  180 +}
  181 +
  182 +// ResetPartitionOffset marks an offset of the provided topic/partition as processed.
  183 +// See ResetOffset for additional explanation.
  184 +func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
  185 + sub := c.subs.Fetch(topic, partition)
  186 + if sub != nil {
  187 + sub.ResetOffset(offset, metadata)
  188 + }
  189 +}
  190 +
  191 +// ResetOffsets marks stashed offsets as processed.
  192 +// See ResetOffset for additional explanation.
  193 +func (c *Consumer) ResetOffsets(s *OffsetStash) {
  194 + s.mu.Lock()
  195 + defer s.mu.Unlock()
  196 +
  197 + for tp, info := range s.offsets {
  198 + if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil {
  199 + sub.ResetOffset(info.Offset, info.Metadata)
  200 + }
  201 + delete(s.offsets, tp)
  202 + }
  203 +}
  204 +
  205 +// Subscriptions returns the consumed topics and partitions
  206 +func (c *Consumer) Subscriptions() map[string][]int32 {
  207 + return c.subs.Info()
  208 +}
  209 +
  210 +// CommitOffsets allows to manually commit previously marked offsets. By default there is no
  211 +// need to call this function as the consumer will commit offsets automatically
  212 +// using the Config.Consumer.Offsets.CommitInterval setting.
  213 +//
  214 +// Please be aware that calling this function during an internal rebalance cycle may return
  215 +// broker errors (e.g. sarama.ErrUnknownMemberId or sarama.ErrIllegalGeneration).
  216 +func (c *Consumer) CommitOffsets() error {
  217 + c.commitMu.Lock()
  218 + defer c.commitMu.Unlock()
  219 +
  220 + memberID, generationID := c.membership()
  221 + req := &sarama.OffsetCommitRequest{
  222 + Version: 2,
  223 + ConsumerGroup: c.groupID,
  224 + ConsumerGroupGeneration: generationID,
  225 + ConsumerID: memberID,
  226 + RetentionTime: -1,
  227 + }
  228 +
  229 + if ns := c.client.config.Consumer.Offsets.Retention; ns != 0 {
  230 + req.RetentionTime = int64(ns / time.Millisecond)
  231 + }
  232 +
  233 + snap := c.subs.Snapshot()
  234 + dirty := false
  235 + for tp, state := range snap {
  236 + if state.Dirty {
  237 + dirty = true
  238 + req.AddBlock(tp.Topic, tp.Partition, state.Info.Offset, 0, state.Info.Metadata)
  239 + }
  240 + }
  241 + if !dirty {
  242 + return nil
  243 + }
  244 +
  245 + broker, err := c.client.Coordinator(c.groupID)
  246 + if err != nil {
  247 + c.closeCoordinator(broker, err)
  248 + return err
  249 + }
  250 +
  251 + resp, err := broker.CommitOffset(req)
  252 + if err != nil {
  253 + c.closeCoordinator(broker, err)
  254 + return err
  255 + }
  256 +
  257 + for topic, errs := range resp.Errors {
  258 + for partition, kerr := range errs {
  259 + if kerr != sarama.ErrNoError {
  260 + err = kerr
  261 + } else if state, ok := snap[topicPartition{topic, partition}]; ok {
  262 + if sub := c.subs.Fetch(topic, partition); sub != nil {
  263 + sub.markCommitted(state.Info.Offset)
  264 + }
  265 + }
  266 + }
  267 + }
  268 + return err
  269 +}
  270 +
  271 +// Close safely closes the consumer and releases all resources
  272 +func (c *Consumer) Close() (err error) {
  273 + c.closeOnce.Do(func() {
  274 + close(c.dying)
  275 + <-c.dead
  276 +
  277 + if e := c.release(); e != nil {
  278 + err = e
  279 + }
  280 + if e := c.consumer.Close(); e != nil {
  281 + err = e
  282 + }
  283 + close(c.messages)
  284 + close(c.errors)
  285 +
  286 + if e := c.leaveGroup(); e != nil {
  287 + err = e
  288 + }
  289 + close(c.partitions)
  290 + close(c.notifications)
  291 +
  292 + // drain
  293 + for range c.messages {
  294 + }
  295 + for range c.errors {
  296 + }
  297 + for p := range c.partitions {
  298 + _ = p.Close()
  299 + }
  300 + for range c.notifications {
  301 + }
  302 +
  303 + c.client.release()
  304 + if c.ownClient {
  305 + if e := c.client.Close(); e != nil {
  306 + err = e
  307 + }
  308 + }
  309 + })
  310 + return
  311 +}
  312 +
  313 +func (c *Consumer) mainLoop() {
  314 + defer close(c.dead)
  315 + defer atomic.StoreInt32(&c.consuming, 0)
  316 +
  317 + for {
  318 + atomic.StoreInt32(&c.consuming, 0)
  319 +
  320 + // Check if close was requested
  321 + select {
  322 + case <-c.dying:
  323 + return
  324 + default:
  325 + }
  326 +
  327 + // Start next consume cycle
  328 + c.nextTick()
  329 + }
  330 +}
  331 +
  332 +func (c *Consumer) nextTick() {
  333 + // Remember previous subscriptions
  334 + var notification *Notification
  335 + if c.client.config.Group.Return.Notifications {
  336 + notification = newNotification(c.subs.Info())
  337 + }
  338 +
  339 + // Refresh coordinator
  340 + if err := c.refreshCoordinator(); err != nil {
  341 + c.rebalanceError(err, nil)
  342 + return
  343 + }
  344 +
  345 + // Release subscriptions
  346 + if err := c.release(); err != nil {
  347 + c.rebalanceError(err, nil)
  348 + return
  349 + }
  350 +
  351 + // Issue rebalance start notification
  352 + if c.client.config.Group.Return.Notifications {
  353 + c.handleNotification(notification)
  354 + }
  355 +
  356 + // Rebalance, fetch new subscriptions
  357 + subs, err := c.rebalance()
  358 + if err != nil {
  359 + c.rebalanceError(err, notification)
  360 + return
  361 + }
  362 +
  363 + // Coordinate loops, make sure everything is
  364 + // stopped on exit
  365 + tomb := newLoopTomb()
  366 + defer tomb.Close()
  367 +
  368 + // Start the heartbeat
  369 + tomb.Go(c.hbLoop)
  370 +
  371 + // Subscribe to topic/partitions
  372 + if err := c.subscribe(tomb, subs); err != nil {
  373 + c.rebalanceError(err, notification)
  374 + return
  375 + }
  376 +
  377 + // Update/issue notification with new claims
  378 + if c.client.config.Group.Return.Notifications {
  379 + notification = notification.success(subs)
  380 + c.handleNotification(notification)
  381 + }
  382 +
  383 + // Start topic watcher loop
  384 + tomb.Go(c.twLoop)
  385 +
  386 + // Start consuming and committing offsets
  387 + tomb.Go(c.cmLoop)
  388 + atomic.StoreInt32(&c.consuming, 1)
  389 +
  390 + // Wait for signals
  391 + select {
  392 + case <-tomb.Dying():
  393 + case <-c.dying:
  394 + }
  395 +}
  396 +
  397 +// heartbeat loop, triggered by the mainLoop
  398 +func (c *Consumer) hbLoop(stopped <-chan none) {
  399 + ticker := time.NewTicker(c.client.config.Group.Heartbeat.Interval)
  400 + defer ticker.Stop()
  401 +
  402 + for {
  403 + select {
  404 + case <-ticker.C:
  405 + switch err := c.heartbeat(); err {
  406 + case nil, sarama.ErrNoError:
  407 + case sarama.ErrNotCoordinatorForConsumer, sarama.ErrRebalanceInProgress:
  408 + return
  409 + default:
  410 + c.handleError(&Error{Ctx: "heartbeat", error: err})
  411 + return
  412 + }
  413 + case <-stopped:
  414 + return
  415 + case <-c.dying:
  416 + return
  417 + }
  418 + }
  419 +}
  420 +
  421 +// topic watcher loop, triggered by the mainLoop
  422 +func (c *Consumer) twLoop(stopped <-chan none) {
  423 + ticker := time.NewTicker(c.client.config.Metadata.RefreshFrequency / 2)
  424 + defer ticker.Stop()
  425 +
  426 + for {
  427 + select {
  428 + case <-ticker.C:
  429 + topics, err := c.client.Topics()
  430 + if err != nil {
  431 + c.handleError(&Error{Ctx: "topics", error: err})
  432 + return
  433 + }
  434 +
  435 + for _, topic := range topics {
  436 + if !c.isKnownCoreTopic(topic) &&
  437 + !c.isKnownExtraTopic(topic) &&
  438 + c.isPotentialExtraTopic(topic) {
  439 + return
  440 + }
  441 + }
  442 + case <-stopped:
  443 + return
  444 + case <-c.dying:
  445 + return
  446 + }
  447 + }
  448 +}
  449 +
  450 +// commit loop, triggered by the mainLoop
  451 +func (c *Consumer) cmLoop(stopped <-chan none) {
  452 + ticker := time.NewTicker(c.client.config.Consumer.Offsets.CommitInterval)
  453 + defer ticker.Stop()
  454 +
  455 + for {
  456 + select {
  457 + case <-ticker.C:
  458 + if err := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); err != nil {
  459 + c.handleError(&Error{Ctx: "commit", error: err})
  460 + return
  461 + }
  462 + case <-stopped:
  463 + return
  464 + case <-c.dying:
  465 + return
  466 + }
  467 + }
  468 +}
  469 +
  470 +func (c *Consumer) rebalanceError(err error, n *Notification) {
  471 + if n != nil {
  472 + n.Type = RebalanceError
  473 + c.handleNotification(n)
  474 + }
  475 +
  476 + switch err {
  477 + case sarama.ErrRebalanceInProgress:
  478 + default:
  479 + c.handleError(&Error{Ctx: "rebalance", error: err})
  480 + }
  481 +
  482 + select {
  483 + case <-c.dying:
  484 + case <-time.After(c.client.config.Metadata.Retry.Backoff):
  485 + }
  486 +}
  487 +
  488 +func (c *Consumer) handleNotification(n *Notification) {
  489 + if c.client.config.Group.Return.Notifications {
  490 + select {
  491 + case c.notifications <- n:
  492 + case <-c.dying:
  493 + return
  494 + }
  495 + }
  496 +}
  497 +
  498 +func (c *Consumer) handleError(e *Error) {
  499 + if c.client.config.Consumer.Return.Errors {
  500 + select {
  501 + case c.errors <- e:
  502 + case <-c.dying:
  503 + return
  504 + }
  505 + } else {
  506 + sarama.Logger.Printf("%s error: %s\n", e.Ctx, e.Error())
  507 + }
  508 +}
  509 +
  510 +// Releases the consumer and commits offsets, called from rebalance() and Close()
  511 +func (c *Consumer) release() (err error) {
  512 + // Stop all consumers
  513 + c.subs.Stop()
  514 +
  515 + // Clear subscriptions on exit
  516 + defer c.subs.Clear()
  517 +
  518 + // Wait for messages to be processed
  519 + timeout := time.NewTimer(c.client.config.Group.Offsets.Synchronization.DwellTime)
  520 + defer timeout.Stop()
  521 +
  522 + select {
  523 + case <-c.dying:
  524 + case <-timeout.C:
  525 + }
  526 +
  527 + // Commit offsets, continue on errors
  528 + if e := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); e != nil {
  529 + err = e
  530 + }
  531 +
  532 + return
  533 +}
  534 +
  535 +// --------------------------------------------------------------------
  536 +
  537 +// Performs a heartbeat, part of the mainLoop()
  538 +func (c *Consumer) heartbeat() error {
  539 + broker, err := c.client.Coordinator(c.groupID)
  540 + if err != nil {
  541 + c.closeCoordinator(broker, err)
  542 + return err
  543 + }
  544 +
  545 + memberID, generationID := c.membership()
  546 + resp, err := broker.Heartbeat(&sarama.HeartbeatRequest{
  547 + GroupId: c.groupID,
  548 + MemberId: memberID,
  549 + GenerationId: generationID,
  550 + })
  551 + if err != nil {
  552 + c.closeCoordinator(broker, err)
  553 + return err
  554 + }
  555 + return resp.Err
  556 +}
  557 +
  558 +// Performs a rebalance, part of the mainLoop()
  559 +func (c *Consumer) rebalance() (map[string][]int32, error) {
  560 + memberID, _ := c.membership()
  561 + sarama.Logger.Printf("cluster/consumer %s rebalance\n", memberID)
  562 +
  563 + allTopics, err := c.client.Topics()
  564 + if err != nil {
  565 + return nil, err
  566 + }
  567 + c.extraTopics = c.selectExtraTopics(allTopics)
  568 + sort.Strings(c.extraTopics)
  569 +
  570 + // Re-join consumer group
  571 + strategy, err := c.joinGroup()
  572 + switch {
  573 + case err == sarama.ErrUnknownMemberId:
  574 + c.membershipMu.Lock()
  575 + c.memberID = ""
  576 + c.membershipMu.Unlock()
  577 + return nil, err
  578 + case err != nil:
  579 + return nil, err
  580 + }
  581 +
  582 + // Sync consumer group state, fetch subscriptions
  583 + subs, err := c.syncGroup(strategy)
  584 + switch {
  585 + case err == sarama.ErrRebalanceInProgress:
  586 + return nil, err
  587 + case err != nil:
  588 + _ = c.leaveGroup()
  589 + return nil, err
  590 + }
  591 + return subs, nil
  592 +}
  593 +
  594 +// Performs the subscription, part of the mainLoop()
  595 +func (c *Consumer) subscribe(tomb *loopTomb, subs map[string][]int32) error {
  596 + // fetch offsets
  597 + offsets, err := c.fetchOffsets(subs)
  598 + if err != nil {
  599 + _ = c.leaveGroup()
  600 + return err
  601 + }
  602 +
  603 + // create consumers in parallel
  604 + var mu sync.Mutex
  605 + var wg sync.WaitGroup
  606 +
  607 + for topic, partitions := range subs {
  608 + for _, partition := range partitions {
  609 + wg.Add(1)
  610 +
  611 + info := offsets[topic][partition]
  612 + go func(topic string, partition int32) {
  613 + if e := c.createConsumer(tomb, topic, partition, info); e != nil {
  614 + mu.Lock()
  615 + err = e
  616 + mu.Unlock()
  617 + }
  618 + wg.Done()
  619 + }(topic, partition)
  620 + }
  621 + }
  622 + wg.Wait()
  623 +
  624 + if err != nil {
  625 + _ = c.release()
  626 + _ = c.leaveGroup()
  627 + }
  628 + return err
  629 +}
  630 +
  631 +// --------------------------------------------------------------------
  632 +
  633 +// Send a request to the broker to join group on rebalance()
  634 +func (c *Consumer) joinGroup() (*balancer, error) {
  635 + memberID, _ := c.membership()
  636 + req := &sarama.JoinGroupRequest{
  637 + GroupId: c.groupID,
  638 + MemberId: memberID,
  639 + SessionTimeout: int32(c.client.config.Group.Session.Timeout / time.Millisecond),
  640 + ProtocolType: "consumer",
  641 + }
  642 +
  643 + meta := &sarama.ConsumerGroupMemberMetadata{
  644 + Version: 1,
  645 + Topics: append(c.coreTopics, c.extraTopics...),
  646 + UserData: c.client.config.Group.Member.UserData,
  647 + }
  648 + err := req.AddGroupProtocolMetadata(string(StrategyRange), meta)
  649 + if err != nil {
  650 + return nil, err
  651 + }
  652 + err = req.AddGroupProtocolMetadata(string(StrategyRoundRobin), meta)
  653 + if err != nil {
  654 + return nil, err
  655 + }
  656 +
  657 + broker, err := c.client.Coordinator(c.groupID)
  658 + if err != nil {
  659 + c.closeCoordinator(broker, err)
  660 + return nil, err
  661 + }
  662 +
  663 + resp, err := broker.JoinGroup(req)
  664 + if err != nil {
  665 + c.closeCoordinator(broker, err)
  666 + return nil, err
  667 + } else if resp.Err != sarama.ErrNoError {
  668 + c.closeCoordinator(broker, resp.Err)
  669 + return nil, resp.Err
  670 + }
  671 +
  672 + var strategy *balancer
  673 + if resp.LeaderId == resp.MemberId {
  674 + members, err := resp.GetMembers()
  675 + if err != nil {
  676 + return nil, err
  677 + }
  678 +
  679 + strategy, err = newBalancerFromMeta(c.client, members)
  680 + if err != nil {
  681 + return nil, err
  682 + }
  683 + }
  684 +
  685 + c.membershipMu.Lock()
  686 + c.memberID = resp.MemberId
  687 + c.generationID = resp.GenerationId
  688 + c.membershipMu.Unlock()
  689 +
  690 + return strategy, nil
  691 +}
  692 +
  693 +// Send a request to the broker to sync the group on rebalance().
  694 +// Returns a list of topics and partitions to consume.
  695 +func (c *Consumer) syncGroup(strategy *balancer) (map[string][]int32, error) {
  696 + memberID, generationID := c.membership()
  697 + req := &sarama.SyncGroupRequest{
  698 + GroupId: c.groupID,
  699 + MemberId: memberID,
  700 + GenerationId: generationID,
  701 + }
  702 +
  703 + if strategy != nil {
  704 + for memberID, topics := range strategy.Perform(c.client.config.Group.PartitionStrategy) {
  705 + if err := req.AddGroupAssignmentMember(memberID, &sarama.ConsumerGroupMemberAssignment{
  706 + Topics: topics,
  707 + }); err != nil {
  708 + return nil, err
  709 + }
  710 + }
  711 + }
  712 +
  713 + broker, err := c.client.Coordinator(c.groupID)
  714 + if err != nil {
  715 + c.closeCoordinator(broker, err)
  716 + return nil, err
  717 + }
  718 +
  719 + resp, err := broker.SyncGroup(req)
  720 + if err != nil {
  721 + c.closeCoordinator(broker, err)
  722 + return nil, err
  723 + } else if resp.Err != sarama.ErrNoError {
  724 + c.closeCoordinator(broker, resp.Err)
  725 + return nil, resp.Err
  726 + }
  727 +
  728 + // Return if there is nothing to subscribe to
  729 + if len(resp.MemberAssignment) == 0 {
  730 + return nil, nil
  731 + }
  732 +
  733 + // Get assigned subscriptions
  734 + members, err := resp.GetMemberAssignment()
  735 + if err != nil {
  736 + return nil, err
  737 + }
  738 +
  739 + // Sort partitions, for each topic
  740 + for topic := range members.Topics {
  741 + sort.Sort(int32Slice(members.Topics[topic]))
  742 + }
  743 + return members.Topics, nil
  744 +}
  745 +
  746 +// Fetches latest committed offsets for all subscriptions
  747 +func (c *Consumer) fetchOffsets(subs map[string][]int32) (map[string]map[int32]offsetInfo, error) {
  748 + offsets := make(map[string]map[int32]offsetInfo, len(subs))
  749 + req := &sarama.OffsetFetchRequest{
  750 + Version: 1,
  751 + ConsumerGroup: c.groupID,
  752 + }
  753 +
  754 + for topic, partitions := range subs {
  755 + offsets[topic] = make(map[int32]offsetInfo, len(partitions))
  756 + for _, partition := range partitions {
  757 + offsets[topic][partition] = offsetInfo{Offset: -1}
  758 + req.AddPartition(topic, partition)
  759 + }
  760 + }
  761 +
  762 + broker, err := c.client.Coordinator(c.groupID)
  763 + if err != nil {
  764 + c.closeCoordinator(broker, err)
  765 + return nil, err
  766 + }
  767 +
  768 + resp, err := broker.FetchOffset(req)
  769 + if err != nil {
  770 + c.closeCoordinator(broker, err)
  771 + return nil, err
  772 + }
  773 +
  774 + for topic, partitions := range subs {
  775 + for _, partition := range partitions {
  776 + block := resp.GetBlock(topic, partition)
  777 + if block == nil {
  778 + return nil, sarama.ErrIncompleteResponse
  779 + }
  780 +
  781 + if block.Err == sarama.ErrNoError {
  782 + offsets[topic][partition] = offsetInfo{Offset: block.Offset, Metadata: block.Metadata}
  783 + } else {
  784 + return nil, block.Err
  785 + }
  786 + }
  787 + }
  788 + return offsets, nil
  789 +}
  790 +
  791 +// Send a request to the broker to leave the group on failes rebalance() and on Close()
  792 +func (c *Consumer) leaveGroup() error {
  793 + broker, err := c.client.Coordinator(c.groupID)
  794 + if err != nil {
  795 + c.closeCoordinator(broker, err)
  796 + return err
  797 + }
  798 +
  799 + memberID, _ := c.membership()
  800 + if _, err = broker.LeaveGroup(&sarama.LeaveGroupRequest{
  801 + GroupId: c.groupID,
  802 + MemberId: memberID,
  803 + }); err != nil {
  804 + c.closeCoordinator(broker, err)
  805 + }
  806 + return err
  807 +}
  808 +
  809 +// --------------------------------------------------------------------
  810 +
  811 +func (c *Consumer) createConsumer(tomb *loopTomb, topic string, partition int32, info offsetInfo) error {
  812 + memberID, _ := c.membership()
  813 + sarama.Logger.Printf("cluster/consumer %s consume %s/%d from %d\n", memberID, topic, partition, info.NextOffset(c.client.config.Consumer.Offsets.Initial))
  814 +
  815 + // Create partitionConsumer
  816 + pc, err := newPartitionConsumer(c.consumer, topic, partition, info, c.client.config.Consumer.Offsets.Initial)
  817 + if err != nil {
  818 + return err
  819 + }
  820 +
  821 + // Store in subscriptions
  822 + c.subs.Store(topic, partition, pc)
  823 +
  824 + // Start partition consumer goroutine
  825 + tomb.Go(func(stopper <-chan none) {
  826 + if c.client.config.Group.Mode == ConsumerModePartitions {
  827 + pc.waitFor(stopper, c.errors)
  828 + } else {
  829 + pc.multiplex(stopper, c.messages, c.errors)
  830 + }
  831 + })
  832 +
  833 + if c.client.config.Group.Mode == ConsumerModePartitions {
  834 + c.partitions <- pc
  835 + }
  836 + return nil
  837 +}
  838 +
  839 +func (c *Consumer) commitOffsetsWithRetry(retries int) error {
  840 + err := c.CommitOffsets()
  841 + if err != nil && retries > 0 {
  842 + return c.commitOffsetsWithRetry(retries - 1)
  843 + }
  844 + return err
  845 +}
  846 +
  847 +func (c *Consumer) closeCoordinator(broker *sarama.Broker, err error) {
  848 + if broker != nil {
  849 + _ = broker.Close()
  850 + }
  851 +
  852 + switch err {
  853 + case sarama.ErrConsumerCoordinatorNotAvailable, sarama.ErrNotCoordinatorForConsumer:
  854 + _ = c.client.RefreshCoordinator(c.groupID)
  855 + }
  856 +}
  857 +
  858 +func (c *Consumer) selectExtraTopics(allTopics []string) []string {
  859 + extra := allTopics[:0]
  860 + for _, topic := range allTopics {
  861 + if !c.isKnownCoreTopic(topic) && c.isPotentialExtraTopic(topic) {
  862 + extra = append(extra, topic)
  863 + }
  864 + }
  865 + return extra
  866 +}
  867 +
  868 +func (c *Consumer) isKnownCoreTopic(topic string) bool {
  869 + pos := sort.SearchStrings(c.coreTopics, topic)
  870 + return pos < len(c.coreTopics) && c.coreTopics[pos] == topic
  871 +}
  872 +
  873 +func (c *Consumer) isKnownExtraTopic(topic string) bool {
  874 + pos := sort.SearchStrings(c.extraTopics, topic)
  875 + return pos < len(c.extraTopics) && c.extraTopics[pos] == topic
  876 +}
  877 +
  878 +func (c *Consumer) isPotentialExtraTopic(topic string) bool {
  879 + rx := c.client.config.Group.Topics
  880 + if rx.Blacklist != nil && rx.Blacklist.MatchString(topic) {
  881 + return false
  882 + }
  883 + if rx.Whitelist != nil && rx.Whitelist.MatchString(topic) {
  884 + return true
  885 + }
  886 + return false
  887 +}
  888 +
  889 +func (c *Consumer) refreshCoordinator() error {
  890 + if err := c.refreshMetadata(); err != nil {
  891 + return err
  892 + }
  893 + return c.client.RefreshCoordinator(c.groupID)
  894 +}
  895 +
  896 +func (c *Consumer) refreshMetadata() (err error) {
  897 + if c.client.config.Metadata.Full {
  898 + err = c.client.RefreshMetadata()
  899 + } else {
  900 + var topics []string
  901 + if topics, err = c.client.Topics(); err == nil && len(topics) != 0 {
  902 + err = c.client.RefreshMetadata(topics...)
  903 + }
  904 + }
  905 +
  906 + // maybe we didn't have authorization to describe all topics
  907 + switch err {
  908 + case sarama.ErrTopicAuthorizationFailed:
  909 + err = c.client.RefreshMetadata(c.coreTopics...)
  910 + }
  911 + return
  912 +}
  913 +
  914 +func (c *Consumer) membership() (memberID string, generationID int32) {
  915 + c.membershipMu.RLock()
  916 + memberID, generationID = c.memberID, c.generationID
  917 + c.membershipMu.RUnlock()
  918 + return
  919 +}
  1 +/*
  2 +Package cluster provides cluster extensions for Sarama, enabing users
  3 +to consume topics across from multiple, balanced nodes.
  4 +
  5 +It requires Kafka v0.9+ and follows the steps guide, described in:
  6 +https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
  7 +*/
  8 +package cluster
  1 +package cluster
  2 +
  3 +import (
  4 + "sync"
  5 +
  6 + "github.com/Shopify/sarama"
  7 +)
  8 +
  9 +// OffsetStash allows to accumulate offsets and
  10 +// mark them as processed in a bulk
  11 +type OffsetStash struct {
  12 + offsets map[topicPartition]offsetInfo
  13 + mu sync.Mutex
  14 +}
  15 +
  16 +// NewOffsetStash inits a blank stash
  17 +func NewOffsetStash() *OffsetStash {
  18 + return &OffsetStash{offsets: make(map[topicPartition]offsetInfo)}
  19 +}
  20 +
  21 +// MarkOffset stashes the provided message offset
  22 +func (s *OffsetStash) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
  23 + s.MarkPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
  24 +}
  25 +
  26 +// MarkPartitionOffset stashes the offset for the provided topic/partition combination
  27 +func (s *OffsetStash) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
  28 + s.mu.Lock()
  29 + defer s.mu.Unlock()
  30 +
  31 + key := topicPartition{Topic: topic, Partition: partition}
  32 + if info := s.offsets[key]; offset >= info.Offset {
  33 + info.Offset = offset
  34 + info.Metadata = metadata
  35 + s.offsets[key] = info
  36 + }
  37 +}
  38 +
  39 +// ResetPartitionOffset stashes the offset for the provided topic/partition combination.
  40 +// Difference between ResetPartitionOffset and MarkPartitionOffset is that, ResetPartitionOffset supports earlier offsets
  41 +func (s *OffsetStash) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
  42 + s.mu.Lock()
  43 + defer s.mu.Unlock()
  44 +
  45 + key := topicPartition{Topic: topic, Partition: partition}
  46 + if info := s.offsets[key]; offset <= info.Offset {
  47 + info.Offset = offset
  48 + info.Metadata = metadata
  49 + s.offsets[key] = info
  50 + }
  51 +}
  52 +
  53 +// ResetOffset stashes the provided message offset
  54 +// See ResetPartitionOffset for explanation
  55 +func (s *OffsetStash) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
  56 + s.ResetPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
  57 +}
  58 +
  59 +// Offsets returns the latest stashed offsets by topic-partition
  60 +func (s *OffsetStash) Offsets() map[string]int64 {
  61 + s.mu.Lock()
  62 + defer s.mu.Unlock()
  63 +
  64 + res := make(map[string]int64, len(s.offsets))
  65 + for tp, info := range s.offsets {
  66 + res[tp.String()] = info.Offset
  67 + }
  68 + return res
  69 +}
  1 +package cluster
  2 +
  3 +import (
  4 + "sort"
  5 + "sync"
  6 + "time"
  7 +
  8 + "github.com/Shopify/sarama"
  9 +)
  10 +
  11 +// PartitionConsumer allows code to consume individual partitions from the cluster.
  12 +//
  13 +// See docs for Consumer.Partitions() for more on how to implement this.
  14 +type PartitionConsumer interface {
  15 + sarama.PartitionConsumer
  16 +
  17 + // Topic returns the consumed topic name
  18 + Topic() string
  19 +
  20 + // Partition returns the consumed partition
  21 + Partition() int32
  22 +
  23 + // InitialOffset returns the offset used for creating the PartitionConsumer instance.
  24 + // The returned offset can be a literal offset, or OffsetNewest, or OffsetOldest
  25 + InitialOffset() int64
  26 +
  27 + // MarkOffset marks the offset of a message as preocessed.
  28 + MarkOffset(offset int64, metadata string)
  29 +
  30 + // ResetOffset resets the offset to a previously processed message.
  31 + ResetOffset(offset int64, metadata string)
  32 +}
  33 +
  34 +type partitionConsumer struct {
  35 + sarama.PartitionConsumer
  36 +
  37 + state partitionState
  38 + mu sync.Mutex
  39 +
  40 + topic string
  41 + partition int32
  42 + initialOffset int64
  43 +
  44 + closeOnce sync.Once
  45 + closeErr error
  46 +
  47 + dying, dead chan none
  48 +}
  49 +
  50 +func newPartitionConsumer(manager sarama.Consumer, topic string, partition int32, info offsetInfo, defaultOffset int64) (*partitionConsumer, error) {
  51 + offset := info.NextOffset(defaultOffset)
  52 + pcm, err := manager.ConsumePartition(topic, partition, offset)
  53 +
  54 + // Resume from default offset, if requested offset is out-of-range
  55 + if err == sarama.ErrOffsetOutOfRange {
  56 + info.Offset = -1
  57 + offset = defaultOffset
  58 + pcm, err = manager.ConsumePartition(topic, partition, offset)
  59 + }
  60 + if err != nil {
  61 + return nil, err
  62 + }
  63 +
  64 + return &partitionConsumer{
  65 + PartitionConsumer: pcm,
  66 + state: partitionState{Info: info},
  67 +
  68 + topic: topic,
  69 + partition: partition,
  70 + initialOffset: offset,
  71 +
  72 + dying: make(chan none),
  73 + dead: make(chan none),
  74 + }, nil
  75 +}
  76 +
  77 +// Topic implements PartitionConsumer
  78 +func (c *partitionConsumer) Topic() string { return c.topic }
  79 +
  80 +// Partition implements PartitionConsumer
  81 +func (c *partitionConsumer) Partition() int32 { return c.partition }
  82 +
  83 +// InitialOffset implements PartitionConsumer
  84 +func (c *partitionConsumer) InitialOffset() int64 { return c.initialOffset }
  85 +
  86 +// AsyncClose implements PartitionConsumer
  87 +func (c *partitionConsumer) AsyncClose() {
  88 + c.closeOnce.Do(func() {
  89 + c.closeErr = c.PartitionConsumer.Close()
  90 + close(c.dying)
  91 + })
  92 +}
  93 +
  94 +// Close implements PartitionConsumer
  95 +func (c *partitionConsumer) Close() error {
  96 + c.AsyncClose()
  97 + <-c.dead
  98 + return c.closeErr
  99 +}
  100 +
  101 +func (c *partitionConsumer) waitFor(stopper <-chan none, errors chan<- error) {
  102 + defer close(c.dead)
  103 +
  104 + for {
  105 + select {
  106 + case err, ok := <-c.Errors():
  107 + if !ok {
  108 + return
  109 + }
  110 + select {
  111 + case errors <- err:
  112 + case <-stopper:
  113 + return
  114 + case <-c.dying:
  115 + return
  116 + }
  117 + case <-stopper:
  118 + return
  119 + case <-c.dying:
  120 + return
  121 + }
  122 + }
  123 +}
  124 +
  125 +func (c *partitionConsumer) multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) {
  126 + defer close(c.dead)
  127 +
  128 + for {
  129 + select {
  130 + case msg, ok := <-c.Messages():
  131 + if !ok {
  132 + return
  133 + }
  134 + select {
  135 + case messages <- msg:
  136 + case <-stopper:
  137 + return
  138 + case <-c.dying:
  139 + return
  140 + }
  141 + case err, ok := <-c.Errors():
  142 + if !ok {
  143 + return
  144 + }
  145 + select {
  146 + case errors <- err:
  147 + case <-stopper:
  148 + return
  149 + case <-c.dying:
  150 + return
  151 + }
  152 + case <-stopper:
  153 + return
  154 + case <-c.dying:
  155 + return
  156 + }
  157 + }
  158 +}
  159 +
  160 +func (c *partitionConsumer) getState() partitionState {
  161 + c.mu.Lock()
  162 + state := c.state
  163 + c.mu.Unlock()
  164 +
  165 + return state
  166 +}
  167 +
  168 +func (c *partitionConsumer) markCommitted(offset int64) {
  169 + c.mu.Lock()
  170 + if offset == c.state.Info.Offset {
  171 + c.state.Dirty = false
  172 + }
  173 + c.mu.Unlock()
  174 +}
  175 +
  176 +// MarkOffset implements PartitionConsumer
  177 +func (c *partitionConsumer) MarkOffset(offset int64, metadata string) {
  178 + c.mu.Lock()
  179 + if next := offset + 1; next > c.state.Info.Offset {
  180 + c.state.Info.Offset = next
  181 + c.state.Info.Metadata = metadata
  182 + c.state.Dirty = true
  183 + }
  184 + c.mu.Unlock()
  185 +}
  186 +
  187 +// ResetOffset implements PartitionConsumer
  188 +func (c *partitionConsumer) ResetOffset(offset int64, metadata string) {
  189 + c.mu.Lock()
  190 + if next := offset + 1; next <= c.state.Info.Offset {
  191 + c.state.Info.Offset = next
  192 + c.state.Info.Metadata = metadata
  193 + c.state.Dirty = true
  194 + }
  195 + c.mu.Unlock()
  196 +}
  197 +
  198 +// --------------------------------------------------------------------
  199 +
  200 +type partitionState struct {
  201 + Info offsetInfo
  202 + Dirty bool
  203 + LastCommit time.Time
  204 +}
  205 +
  206 +// --------------------------------------------------------------------
  207 +
  208 +type partitionMap struct {
  209 + data map[topicPartition]*partitionConsumer
  210 + mu sync.RWMutex
  211 +}
  212 +
  213 +func newPartitionMap() *partitionMap {
  214 + return &partitionMap{
  215 + data: make(map[topicPartition]*partitionConsumer),
  216 + }
  217 +}
  218 +
  219 +func (m *partitionMap) IsSubscribedTo(topic string) bool {
  220 + m.mu.RLock()
  221 + defer m.mu.RUnlock()
  222 +
  223 + for tp := range m.data {
  224 + if tp.Topic == topic {
  225 + return true
  226 + }
  227 + }
  228 + return false
  229 +}
  230 +
  231 +func (m *partitionMap) Fetch(topic string, partition int32) *partitionConsumer {
  232 + m.mu.RLock()
  233 + pc, _ := m.data[topicPartition{topic, partition}]
  234 + m.mu.RUnlock()
  235 + return pc
  236 +}
  237 +
  238 +func (m *partitionMap) Store(topic string, partition int32, pc *partitionConsumer) {
  239 + m.mu.Lock()
  240 + m.data[topicPartition{topic, partition}] = pc
  241 + m.mu.Unlock()
  242 +}
  243 +
  244 +func (m *partitionMap) Snapshot() map[topicPartition]partitionState {
  245 + m.mu.RLock()
  246 + defer m.mu.RUnlock()
  247 +
  248 + snap := make(map[topicPartition]partitionState, len(m.data))
  249 + for tp, pc := range m.data {
  250 + snap[tp] = pc.getState()
  251 + }
  252 + return snap
  253 +}
  254 +
  255 +func (m *partitionMap) Stop() {
  256 + m.mu.RLock()
  257 + defer m.mu.RUnlock()
  258 +
  259 + var wg sync.WaitGroup
  260 + for tp := range m.data {
  261 + wg.Add(1)
  262 + go func(p *partitionConsumer) {
  263 + _ = p.Close()
  264 + wg.Done()
  265 + }(m.data[tp])
  266 + }
  267 + wg.Wait()
  268 +}
  269 +
  270 +func (m *partitionMap) Clear() {
  271 + m.mu.Lock()
  272 + for tp := range m.data {
  273 + delete(m.data, tp)
  274 + }
  275 + m.mu.Unlock()
  276 +}
  277 +
  278 +func (m *partitionMap) Info() map[string][]int32 {
  279 + info := make(map[string][]int32)
  280 + m.mu.RLock()
  281 + for tp := range m.data {
  282 + info[tp.Topic] = append(info[tp.Topic], tp.Partition)
  283 + }
  284 + m.mu.RUnlock()
  285 +
  286 + for topic := range info {
  287 + sort.Sort(int32Slice(info[topic]))
  288 + }
  289 + return info
  290 +}
  1 +package cluster
  2 +
  3 +import (
  4 + "fmt"
  5 + "sort"
  6 + "sync"
  7 +)
  8 +
  9 +type none struct{}
  10 +
  11 +type topicPartition struct {
  12 + Topic string
  13 + Partition int32
  14 +}
  15 +
  16 +func (tp *topicPartition) String() string {
  17 + return fmt.Sprintf("%s-%d", tp.Topic, tp.Partition)
  18 +}
  19 +
  20 +type offsetInfo struct {
  21 + Offset int64
  22 + Metadata string
  23 +}
  24 +
  25 +func (i offsetInfo) NextOffset(fallback int64) int64 {
  26 + if i.Offset > -1 {
  27 + return i.Offset
  28 + }
  29 + return fallback
  30 +}
  31 +
  32 +type int32Slice []int32
  33 +
  34 +func (p int32Slice) Len() int { return len(p) }
  35 +func (p int32Slice) Less(i, j int) bool { return p[i] < p[j] }
  36 +func (p int32Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
  37 +
  38 +func (p int32Slice) Diff(o int32Slice) (res []int32) {
  39 + on := len(o)
  40 + for _, x := range p {
  41 + n := sort.Search(on, func(i int) bool { return o[i] >= x })
  42 + if n < on && o[n] == x {
  43 + continue
  44 + }
  45 + res = append(res, x)
  46 + }
  47 + return
  48 +}
  49 +
  50 +// --------------------------------------------------------------------
  51 +
  52 +type loopTomb struct {
  53 + c chan none
  54 + o sync.Once
  55 + w sync.WaitGroup
  56 +}
  57 +
  58 +func newLoopTomb() *loopTomb {
  59 + return &loopTomb{c: make(chan none)}
  60 +}
  61 +
  62 +func (t *loopTomb) stop() { t.o.Do(func() { close(t.c) }) }
  63 +func (t *loopTomb) Close() { t.stop(); t.w.Wait() }
  64 +
  65 +func (t *loopTomb) Dying() <-chan none { return t.c }
  66 +func (t *loopTomb) Go(f func(<-chan none)) {
  67 + t.w.Add(1)
  68 +
  69 + go func() {
  70 + defer t.stop()
  71 + defer t.w.Done()
  72 +
  73 + f(t.c)
  74 + }()
  75 +}
@@ -25,6 +25,9 @@ github.com/astaxie/beego/toolbox @@ -25,6 +25,9 @@ github.com/astaxie/beego/toolbox
25 github.com/astaxie/beego/utils 25 github.com/astaxie/beego/utils
26 # github.com/beorn7/perks v1.0.1 26 # github.com/beorn7/perks v1.0.1
27 github.com/beorn7/perks/quantile 27 github.com/beorn7/perks/quantile
  28 +# github.com/bsm/sarama-cluster v2.1.15+incompatible
  29 +## explicit
  30 +github.com/bsm/sarama-cluster
28 # github.com/cespare/xxhash/v2 v2.1.1 31 # github.com/cespare/xxhash/v2 v2.1.1
29 github.com/cespare/xxhash/v2 32 github.com/cespare/xxhash/v2
30 # github.com/codemodus/kace v0.5.1 33 # github.com/codemodus/kace v0.5.1