sync_producer.go
4.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package sarama
import "sync"
// SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct
// broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer
// to avoid leaks; it may not be garbage-collected automatically when it passes out of scope.
//
// The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual
// durability guarantee provided when a message is acknowledged depends on the configured value of `Producer.RequiredAcks`.
// There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
//
// For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to
// be set to true in its configuration.
type SyncProducer interface {
	// SendMessage produces a given message, and returns only when it either has
	// succeeded or failed to produce. It will return the partition and the offset
	// of the produced message, or an error if the message failed to produce.
	// The syncProducer implementation in this file reports a failure as
	// partition -1 and offset -1 together with the error.
	SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
	// SendMessages produces a given set of messages, and returns only when all
	// messages in the set have either succeeded or failed. Note that messages
	// can succeed and fail individually; if some succeed and some fail,
	// SendMessages will return an error. The syncProducer implementation in
	// this file returns a ProducerErrors value holding one entry per failed
	// message, and nil when every message succeeded.
	SendMessages(msgs []*ProducerMessage) error
	// Close shuts down the producer and waits for any buffered messages to be
	// flushed. You must call this function before a producer object passes out of
	// scope, as it may otherwise leak memory. You must call this before calling
	// Close on the underlying client.
	Close() error
}
// syncProducer is the SyncProducer implementation returned by the constructors
// in this file. It feeds messages into an asyncProducer and blocks each caller
// on a per-message expectation channel until that message's result arrives.
type syncProducer struct {
	// producer is the underlying asynchronous producer that messages are sent through.
	producer *asyncProducer
	// wg tracks the handleSuccesses and handleErrors goroutines so that Close
	// can wait for both of them to exit.
	wg sync.WaitGroup
}
// NewSyncProducer builds a SyncProducer that talks to the given broker addresses.
//
// A nil config is replaced by NewConfig() with Producer.Return.Successes
// switched on. A non-nil config is used as given and must already pass
// verifyProducerConfig, otherwise that check's error is returned. Any error
// from NewAsyncProducer is returned unchanged.
func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
	cfg := config
	if cfg == nil {
		cfg = NewConfig()
		cfg.Producer.Return.Successes = true
	}

	verifyErr := verifyProducerConfig(cfg)
	if verifyErr != nil {
		return nil, verifyErr
	}

	async, err := NewAsyncProducer(addrs, cfg)
	if err != nil {
		return nil, err
	}

	return newSyncProducerFromAsyncProducer(async.(*asyncProducer)), nil
}
// NewSyncProducerFromClient builds a SyncProducer on top of an existing client.
// The client's configuration must pass verifyProducerConfig; otherwise that
// check's error is returned. Closing the returned producer does not replace
// closing the client: it is still necessary to call Close() on the underlying
// client when shutting down this producer.
func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
	verifyErr := verifyProducerConfig(client.Config())
	if verifyErr != nil {
		return nil, verifyErr
	}

	async, err := NewAsyncProducerFromClient(client)
	if err != nil {
		return nil, err
	}

	return newSyncProducerFromAsyncProducer(async.(*asyncProducer)), nil
}
// newSyncProducerFromAsyncProducer wraps p in a syncProducer and starts the two
// background goroutines (handleSuccesses and handleErrors) that deliver each
// message's result to its expectation channel.
func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
	sp := &syncProducer{}
	sp.producer = p

	// Register every handler with the WaitGroup before any of them starts, so
	// the counter is complete by the time a handler can call Done.
	handlers := [...]func(){sp.handleSuccesses, sp.handleErrors}
	sp.wg.Add(len(handlers))
	for _, h := range handlers {
		go withRecover(h)
	}

	return sp
}
// verifyProducerConfig checks that config has both Producer.Return.Errors and
// Producer.Return.Successes enabled. It returns a ConfigurationError naming
// the first missing setting (Errors is checked first), or nil when both are set.
func verifyProducerConfig(config *Config) error {
	switch {
	case !config.Producer.Return.Errors:
		return ConfigurationError("Producer.Return.Errors must be true to be used in a SyncProducer")
	case !config.Producer.Return.Successes:
		return ConfigurationError("Producer.Return.Successes must be true to be used in a SyncProducer")
	}
	return nil
}
// SendMessage hands msg to the underlying producer and blocks until a result
// for it arrives on its expectation channel. On success it returns the
// partition and offset recorded on msg; on failure it returns -1, -1 and the
// error carried by the reported ProducerError.
func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
	// Buffered with capacity one so the handler goroutine delivering the
	// result does not have to wait for this caller to be receiving.
	result := make(chan *ProducerError, 1)
	msg.expectation = result
	sp.producer.Input() <- msg

	pErr := <-result
	if pErr == nil {
		return msg.Partition, msg.Offset, nil
	}
	return -1, -1, pErr.Err
}
// SendMessages submits every message in msgs and blocks until each one has
// reported a result. It returns nil when all messages succeeded; otherwise it
// returns a ProducerErrors value containing one entry per failed message, in
// submission order.
func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
	// pending carries the per-message result channels in submission order. Its
	// capacity equals len(msgs), so the feeder never blocks on handing one over.
	pending := make(chan chan *ProducerError, len(msgs))

	// Feed the producer from a separate goroutine so that results can be
	// collected below while later messages are still being submitted.
	go func() {
		for i := range msgs {
			msg := msgs[i]
			reply := make(chan *ProducerError, 1)
			msg.expectation = reply
			sp.producer.Input() <- msg
			pending <- reply
		}
		close(pending)
	}()

	var failed ProducerErrors
	for reply := range pending {
		pErr := <-reply
		if pErr == nil {
			continue
		}
		failed = append(failed, pErr)
	}

	if len(failed) == 0 {
		return nil
	}
	return failed
}
// handleSuccesses drains the underlying producer's Successes channel and
// signals each delivered message's expectation channel with nil. It runs until
// that channel is closed, then marks itself done on sp.wg.
func (sp *syncProducer) handleSuccesses() {
	defer sp.wg.Done()

	successes := sp.producer.Successes()
	for delivered := range successes {
		delivered.expectation <- nil
	}
}
// handleErrors drains the underlying producer's Errors channel and forwards
// each ProducerError to the expectation channel of the message it belongs to.
// It runs until that channel is closed, then marks itself done on sp.wg.
func (sp *syncProducer) handleErrors() {
	defer sp.wg.Done()

	failures := sp.producer.Errors()
	for pErr := range failures {
		pErr.Msg.expectation <- pErr
	}
}
// Close asks the underlying producer to shut down via AsyncClose and then
// blocks until both handler goroutines tracked by sp.wg have exited. It always
// returns nil.
func (sp *syncProducer) Close() error {
	async := sp.producer
	async.AsyncClose()

	// Wait for handleSuccesses and handleErrors to finish before returning.
	sp.wg.Wait()
	return nil
}