切换导航条
此项目
正在载入...
登录
mmm-go
/
partnermg
·
提交
转到一个项目
GitLab
转到群组
项目
活动
文件
提交
管道
0
构建
0
图表
里程碑
问题
0
合并请求
0
成员
标记
维基
派生
网络
创建新的问题
下载为
邮件补丁
差异文件
浏览文件
作者
唐旭辉
4 years ago
提交
eab3e6c41f8800ba8547833fe5dcfc53c134a313
1 个父辈
894cc6d2
调试
隐藏空白字符变更
内嵌
并排对比
正在显示
4 个修改的文件
包含
60 行增加
和
63 行删除
main.go
pkg/application/orderinfo/service/order_info.go
pkg/port/consumer/consumer.go
pkg/port/consumer/produce/produce.go
main.go
查看文件 @
eab3e6c
...
...
@@ -6,7 +6,6 @@ import (
"os/signal"
"sync"
"syscall"
"time"
"github.com/astaxie/beego"
"github.com/astaxie/beego/logs"
...
...
@@ -14,7 +13,6 @@ import (
_
"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/log"
_
"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/beego"
"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer"
"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/produce"
)
func
main
()
{
...
...
@@ -39,11 +37,6 @@ func main() {
<-
consumerRun
.
IsReady
()
logs
.
Info
(
"Sarama consumer up and running!..."
)
}()
go
func
()
{
t
:=
time
.
NewTimer
(
10
*
time
.
Second
)
<-
t
.
C
produce
.
Producer
()
}()
for
{
select
{
case
<-
sigs
:
...
...
pkg/application/orderinfo/service/order_info.go
查看文件 @
eab3e6c
...
...
@@ -922,6 +922,7 @@ func (service OrderInfoService) buildOrderBestshopInfoData(orderBase *domain.Ord
"receivedDividends"
:
orderBase
.
OrderCompute
.
PartnerBonusHas
,
"notReceivedDividend"
:
orderBase
.
OrderCompute
.
PartnerBonusNot
,
"dividendSpending"
:
orderBase
.
OrderCompute
.
PartnerBonusExpense
,
"orderNumber"
:
orderBase
.
OrderCode
,
}
//订单中的商品
product
:=
map
[
string
]
interface
{}{
...
...
pkg/port/consumer/consumer.go
查看文件 @
eab3e6c
...
...
@@ -97,7 +97,10 @@ func (r *Runer) InitConsumer() error {
config
:=
sarama
.
NewConfig
()
//config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config
.
Consumer
.
Offsets
.
Initial
=
sarama
.
OffsetOldest
config
.
Version
=
sarama
.
V0_10_2_0
config
.
Version
=
sarama
.
V0_10_2_1
// config.Version = sarama.KafkaVersion{
// version: [4]int{},
// }
consumerGroup
,
err
:=
sarama
.
NewConsumerGroup
(
r
.
msgConsumer
.
kafkaHosts
,
r
.
msgConsumer
.
groupId
,
config
)
if
err
!=
nil
{
return
err
...
...
pkg/port/consumer/produce/produce.go
查看文件 @
eab3e6c
package
produce
import
(
"fmt"
"strconv"
"time"
// import (
// "fmt"
// "strconv"
// "time"
"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs"
//
"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs"
"github.com/Shopify/sarama"
"github.com/astaxie/beego/logs"
)
// "github.com/Shopify/sarama"
// "github.com/astaxie/beego/logs"
// )
var
(
producer
sarama
.
SyncProducer
)
// var (
// producer sarama.SyncProducer
// )
func
init
()
{
//
func init() {
logs
.
Info
(
"init kafka producer, it may take a few seconds to init the connection
\n
"
)
var
err
error
mqConfig
:=
sarama
.
NewConfig
()
mqConfig
.
Producer
.
Return
.
Successes
=
true
mqConfig
.
Version
=
sarama
.
V0_10_2_0
if
err
=
mqConfig
.
Validate
();
err
!=
nil
{
msg
:=
fmt
.
Sprintf
(
"Kafka producer config invalidate. config: %v. err: %v"
,
configs
.
Cfg
,
err
)
logs
.
Info
(
msg
)
panic
(
msg
)
}
// logs.Info("init kafka producer, it may take a few seconds to init the connection\n")
// var err error
// mqConfig := sarama.NewConfig()
// mqConfig.Producer.Return.Successes = true
// mqConfig.Version = sarama.V0_10_2_1
// if err = mqConfig.Validate(); err != nil {
// msg := fmt.Sprintf("Kafka producer config invalidate. config: %v. err: %v", configs.Cfg, err)
// logs.Info(msg)
// panic(msg)
// }
producer
,
err
=
sarama
.
NewSyncProducer
(
configs
.
Cfg
.
Servers
,
mqConfig
)
if
err
!=
nil
{
msg
:=
fmt
.
Sprintf
(
"Kafak producer create fail. err: %v"
,
err
)
logs
.
Info
(
msg
)
panic
(
msg
)
}
// producer, err = sarama.NewSyncProducer(configs.Cfg.Servers, mqConfig)
// if err != nil {
// msg := fmt.Sprintf("Kafak producer create fail. err: %v", err)
// logs.Info(msg)
// panic(msg)
// }
}
//
}
func
produce
(
topic
string
,
key
string
,
content
string
)
error
{
msg
:=
&
sarama
.
ProducerMessage
{
Topic
:
topic
,
Key
:
sarama
.
StringEncoder
(
key
),
Value
:
sarama
.
StringEncoder
(
content
),
Timestamp
:
time
.
Now
(),
}
// func produce(topic string, key string, content string) error {
// msg := &sarama.ProducerMessage{
// Topic: topic,
// Key: sarama.StringEncoder(key),
// Value: sarama.StringEncoder(content),
// Timestamp: time.Now(),
// }
_
,
_
,
err
:=
producer
.
SendMessage
(
msg
)
if
err
!=
nil
{
msg
:=
fmt
.
Sprintf
(
"Send Error topic: %v. key: %v. content: %v"
,
topic
,
key
,
content
)
logs
.
Info
(
msg
)
return
err
}
logs
.
Info
(
"Send OK topic:%s key:%s value:%s
\n
"
,
topic
,
key
,
content
)
return
nil
}
// _, _, err := producer.SendMessage(msg)
// if err != nil {
// msg := fmt.Sprintf("Send Error topic: %v. key: %v. content: %v", topic, key, content)
// logs.Info(msg)
// return err
// }
// logs.Info("Send OK topic:%s key:%s value:%s\n", topic, key, content)
// return nil
// }
func
Producer
()
error
{
key
:=
strconv
.
FormatInt
(
time
.
Now
()
.
UTC
()
.
UnixNano
(),
10
)
value
:=
"this is a new kafka message!"
err
:=
produce
(
"topic_test"
,
key
,
value
)
if
err
!=
nil
{
logs
.
Info
(
"producer err:%s
\n
"
,
err
)
return
err
}
return
nil
}
// func Producer() error {
// key := strconv.FormatInt(time.Now().UTC().UnixNano(), 10)
// value := "this is a new kafka message!"
// err := produce("topic_test", key, value)
// if err != nil {
// logs.Info("producer err:%s \n", err)
// return err
// }
// return nil
// }
...
...
请
注册
或
登录
后发表评论