名称 最后更新
..
.gitignore 正在载入提交数据...
.travis.yml 正在载入提交数据...
Gopkg.lock 正在载入提交数据...
Gopkg.toml 正在载入提交数据...
LICENSE 正在载入提交数据...
Makefile 正在载入提交数据...
README.md 正在载入提交数据...
README.md.tpl 正在载入提交数据...
balancer.go 正在载入提交数据...
client.go 正在载入提交数据...
cluster.go 正在载入提交数据...
config.go 正在载入提交数据...
consumer.go 正在载入提交数据...
doc.go 正在载入提交数据...
offsets.go 正在载入提交数据...
partitions.go 正在载入提交数据...
util.go 正在载入提交数据...

Sarama Cluster

GoDoc Build Status Go Report Card License

Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 (and later).

Documentation

Documentation and example are available via godoc at http://godoc.org/github.com/bsm/sarama-cluster

Examples

Consumers have two modes of operation. In the default multiplexed mode messages (and errors) of multiple topics and partitions are all passed to the single channel:

package main

import (
    "fmt"
    "log"
    "os"
    "os/signal"

    cluster "github.com/bsm/sarama-cluster"
)

func main() {

    // init (custom) config, enable errors and notifications
    config := cluster.NewConfig()
    config.Consumer.Return.Errors = true
    config.Group.Return.Notifications = true

    // init consumer
    brokers := []string{"127.0.0.1:9092"}
    topics := []string{"my_topic", "other_topic"}
    consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
    if err != nil {
        panic(err)
    }
    defer consumer.Close()

    // trap SIGINT to trigger a shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    // consume errors
    go func() {
        for err := range consumer.Errors() {
            log.Printf("Error: %s\n", err.Error())
        }
    }()

    // consume notifications
    go func() {
        for ntf := range consumer.Notifications() {
            log.Printf("Rebalanced: %+v\n", ntf)
        }
    }()

    // consume messages, watch signals
    for {
        select {
        case msg, ok := <-consumer.Messages():
            if ok {
                fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
                consumer.MarkOffset(msg, "")    // mark message as processed
            }
        case <-signals:
            return
        }
    }
}

Users who require access to individual partitions can use the partitioned mode which exposes access to partition-level consumers:

package main

import (
  "fmt"
  "log"
  "os"
  "os/signal"

  cluster "github.com/bsm/sarama-cluster"
)

func main() {

    // init (custom) config, set mode to ConsumerModePartitions
    config := cluster.NewConfig()
    config.Group.Mode = cluster.ConsumerModePartitions

    // init consumer
    brokers := []string{"127.0.0.1:9092"}
    topics := []string{"my_topic", "other_topic"}
    consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
    if err != nil {
        panic(err)
    }
    defer consumer.Close()

    // trap SIGINT to trigger a shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    // consume partitions
    for {
        select {
        case part, ok := <-consumer.Partitions():
            if !ok {
                return
            }

            // start a separate goroutine to consume messages
            go func(pc cluster.PartitionConsumer) {
                for msg := range pc.Messages() {
                    fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
                    consumer.MarkOffset(msg, "")    // mark message as processed
                }
            }(part)
        case <-signals:
            return
        }
    }
}

Running tests

You need to install Ginkgo & Gomega to run tests. Please see http://onsi.github.io/ginkgo for more details.

To run tests, call:

$ make test

Troubleshooting

Consumer not receiving any messages?

By default, sarama's Config.Consumer.Offsets.Initial is set to sarama.OffsetNewest. This means that in the event that a brand new consumer is created, and it has never committed any offsets to kafka, it will only receive messages starting from the message after the current one that was written.

If you wish to receive all messages (from the start of all messages in the topic) in the event that a consumer does not have any offsets committed to kafka, you need to set Config.Consumer.Offsets.Initial to sarama.OffsetOldest.