正在显示
1 个修改的文件
包含
2 行增加
和
3 行删除
| 1 | package consumer | 1 | package consumer |
| 2 | 2 | ||
| 3 | import ( | 3 | import ( |
| 4 | - "fmt" | ||
| 5 | - | ||
| 6 | "github.com/Shopify/sarama" | 4 | "github.com/Shopify/sarama" |
| 5 | + "github.com/astaxie/beego/logs" | ||
| 7 | ) | 6 | ) |
| 8 | 7 | ||
| 9 | //TopicHandle 处理kafka中得消息 | 8 | //TopicHandle 处理kafka中得消息 |
| @@ -12,7 +11,7 @@ type TopicHandle func(*sarama.ConsumerMessage) error | @@ -12,7 +11,7 @@ type TopicHandle func(*sarama.ConsumerMessage) error | ||
| 12 | //TopicHandleRouters 根据topic区分消息并进行处理 | 11 | //TopicHandleRouters 根据topic区分消息并进行处理 |
| 13 | var TopicHandleRouters = map[string]TopicHandle{ | 12 | var TopicHandleRouters = map[string]TopicHandle{ |
| 14 | "topic_test": func(message *sarama.ConsumerMessage) error { | 13 | "topic_test": func(message *sarama.ConsumerMessage) error { |
| 15 | - fmt.Printf("Done Message claimed: timestamp = %v, topic = %s offset = %v value = %v \n", | 14 | + logs.Info("Done Message claimed: timestamp = %v, topic = %s offset = %v value = %v \n", |
| 16 | message.Timestamp, message.Topic, message.Offset, string(message.Value)) | 15 | message.Timestamp, message.Topic, message.Offset, string(message.Value)) |
| 17 | return nil | 16 | return nil |
| 18 | }, | 17 | }, |
-
请 注册 或 登录 后发表评论