作者 yangfu

Merge branch 'feature_bsn' into feature_sign_up

... ... @@ -20,6 +20,8 @@ require (
github.com/sergi/go-diff v1.2.0 // indirect
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/stretchr/testify v1.7.0
github.com/tal-tech/go-queue v1.0.5
github.com/tal-tech/go-zero v1.0.27
github.com/valyala/fasthttp v1.23.0 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0 // indirect
... ...
... ... @@ -21,6 +21,7 @@ github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+Dx
github.com/andybalholm/brotli v1.0.1 h1:KqhlKozYbRtJvsPrrEeXcO+N2l6NYT5A2QAFmSULpEc=
github.com/andybalholm/brotli v1.0.1/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q=
github.com/beanstalkd/go-beanstalk v0.1.0/go.mod h1:/G8YTyChOtpOArwLTQPY1CHB+i212+av35bkPXXj56Y=
github.com/beego/beego/v2 v2.0.1 h1:07a7Z0Ok5vbqyqh+q53sDPl9LdhKh0ZDy3gbyGrhFnE=
github.com/beego/beego/v2 v2.0.1/go.mod h1:8zyHi1FnWO1mZLwTn62aKRIZF/aIKvkCBB2JYs+eqQI=
github.com/beego/goyaml2 v0.0.0-20130207012346-5545475820dd/go.mod h1:1b+Y/CofkYwXMUU0OhQqGvsY2Bvgr4j6jfT699wyZKQ=
... ... @@ -179,6 +180,7 @@ github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334/go.mod h1:SK73tn/9oHe+/Y0h39VT4UCxmurVJkR5NA7kMEAOgSE=
github.com/iancoleman/strcase v0.1.2/go.mod h1:SK73tn/9oHe+/Y0h39VT4UCxmurVJkR5NA7kMEAOgSE=
github.com/imkira/go-interpol v1.1.0 h1:KIiKr0VSG2CUW1hl1jpiyuzuJeKUUpC8iM1AIE7N1Vk=
github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA=
... ... @@ -204,6 +206,7 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.11.8 h1:difgzQsp5mdAz9v8lm3P/I+EpDKMU/6uTMw1y1FObuo=
github.com/klauspost/compress v1.11.8/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
... ... @@ -312,6 +315,8 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/segmentio/kafka-go v0.4.2 h1:QXZ6q9Bu1JkAJQ/CQBb2Av8pFRG8LQ0kWCrLXgQyL8c=
github.com/segmentio/kafka-go v0.4.2/go.mod h1:Inh7PqOsxmfgasV8InZYKVXWsdjcCq2d9tFV75GLbuM=
github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644 h1:X+yvsM2yrEktyI+b2qND5gpH8YhURn0k8OCaeRnkINo=
... ... @@ -347,6 +352,9 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/syndtr/goleveldb v0.0.0-20160425020131-cfa635847112/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
github.com/syndtr/goleveldb v0.0.0-20181127023241-353a9fca669c/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
github.com/tal-tech/go-queue v1.0.5 h1:cd2o0lPjAFJKIXuEbQvsGypUhzz6FLib4FVVAyxsMtY=
github.com/tal-tech/go-queue v1.0.5/go.mod h1:gQK4Eg8pqel8Z9r1hjlSXbJFavLeJQVyTSwBKeAnpm8=
github.com/tal-tech/go-zero v1.0.21/go.mod h1:llP5PQjnATfnzZo/lo5unjR41njzoL3lkGO/KXbnisw=
github.com/tal-tech/go-zero v1.0.27 h1:QMIbaTxibMc/OsO5RTAuKZ8ndbl2dGN6pITQEtp2x/A=
github.com/tal-tech/go-zero v1.0.27/go.mod h1:JtNXlsh/CgeIHyQnt5C5M2IcSevW7V0NAnqO93TQgm8=
github.com/tiptok/egglib-go v0.0.0-20210608073225-c852ce95ae34 h1:9iDNyYbfpv5KLWDLpDywD/aIODg+PNnwn+v9on7KGlE=
... ... @@ -357,6 +365,7 @@ github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc h1:9lRDQMhESg+zvGYm
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc/go.mod h1:bciPuU6GHm1iF1pBvUfxfsH0Wmnc2VbpgvbI9ZWuIRs=
github.com/ugorji/go v0.0.0-20171122102828-84cb69a8af83/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.4/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli v1.22.5/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
... ... @@ -373,7 +382,9 @@ github.com/vmihailenco/tagparser v0.1.2/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgq
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/wendal/errors v0.0.0-20130201093226-f66c77a7882b/go.mod h1:Q12BUT7DqIlHRmgv3RskH+UCM/4eqVMgI0EMmlSpAXc=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f h1:J9EGpcZtP0E/raorCMxlFGSTBrsSlaDGf3jU/qvAE2c=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
... ... @@ -423,6 +434,7 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20180910181607-0e37d006457b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
... ...
... ... @@ -11,6 +11,7 @@ import (
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/log"
_ "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/log"
_ "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/port/beego"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/port/goqueue"
)
func main() {
... ... @@ -28,6 +29,8 @@ func main() {
})
log.Logger.AddHook(bw)
goqueue.SetUp()
log.Logger.Info("server start!")
web.Run()
}
... ...
package command
import (
"fmt"
"reflect"
"strings"
"github.com/beego/beego/v2/core/validation"
)
type UpChainCommand struct {
// 数据来源 例如:app.model
Source string `cname:"数据来源 例如:app.model" json:"source" valid:"Required"`
// 来源数据唯一ID
PrimaryId string `cname:"来源数据唯一ID" json:"primaryId" valid:"Required"`
// 溯源ID 标记同一个系列的数据;例如订单相关事件
IssueId string `cname:"溯源ID 标记同一个系列的数据;例如订单相关事件" json:"issueId"`
// 数据体
Data string `cname:"数据体" json:"data" valid:"Required"`
}
func (upChainCommand *UpChainCommand) Valid(validation *validation.Validation) {
//validation.SetError("CustomValid", "未实现的自定义认证")
}
func (upChainCommand *UpChainCommand) ValidateCommand() error {
valid := validation.Validation{}
b, err := valid.Valid(upChainCommand)
if err != nil {
return err
}
if !b {
elem := reflect.TypeOf(upChainCommand).Elem()
for _, validErr := range valid.Errors {
field, isExist := elem.FieldByName(validErr.Field)
if isExist {
return fmt.Errorf(strings.Replace(validErr.Message, validErr.Field, field.Tag.Get("cname"), -1))
} else {
return fmt.Errorf(validErr.Message)
}
}
}
return nil
}
... ...
package dto
import "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/domain"
type BlockChain struct {
UpChainId int64 `json:"upChainId"`
PrimaryId string `json:"primaryId"`
// 数据块hash
BlockHash string `json:"blockHash"`
}
type BlockChains []*BlockChain
func (b *BlockChain) LoadDto(upChain *domain.UpChain) {
b.PrimaryId = upChain.PrimaryId
b.BlockHash = upChain.Hash
b.UpChainId = upChain.UpChainId
}
func NewBlockChains(upChains []*domain.UpChain) BlockChains {
var rsp = make([]*BlockChain, 0)
for i := range upChains {
item := new(BlockChain)
item.LoadDto(upChains[i])
rsp = append(rsp, item)
}
return rsp
}
... ...
package query
import (
"fmt"
"reflect"
"strings"
"github.com/beego/beego/v2/core/validation"
)
type GetBlockChainTokenQuery struct {
// 操作类型:
//1-交易哈希溯源
//2-溯源ID溯源
//3-验真
Type int `cname:"操作类型" json:"type" valid:"Required"`
// 参数
UpChainId int64 `cname:"上链Id" json:"upChainId,omitempty" valid:"Required"`
}
func (listBlockChain *GetBlockChainTokenQuery) Valid(validation *validation.Validation) {
//validation.SetError("CustomValid", "未实现的自定义认证")
}
func (listBlockChain *GetBlockChainTokenQuery) ValidateQuery() error {
valid := validation.Validation{}
b, err := valid.Valid(listBlockChain)
if err != nil {
return err
}
if !b {
elem := reflect.TypeOf(listBlockChain).Elem()
for _, validErr := range valid.Errors {
field, isExist := elem.FieldByName(validErr.Field)
if isExist {
return fmt.Errorf(strings.Replace(validErr.Message, validErr.Field, field.Tag.Get("cname"), -1))
} else {
return fmt.Errorf(validErr.Message)
}
}
}
return nil
}
... ...
package query
import (
"fmt"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/domain"
"reflect"
"strings"
"github.com/beego/beego/v2/core/validation"
)
type ListBlockChainQuery struct {
OperateInfo *domain.OperateInfo `json:"-"`
// 查询偏离量
Offset int `cname:"查询偏离量" json:"offset,omitempty"`
// 查询限制
Limit int `cname:"查询限制" json:"limit,omitempty"`
// 数据来源 例如:app.model
Source string `cname:"数据来源 例如:app.model" json:"source" valid:"Required"`
// 来源数据唯一ID列表
PrimaryIdList []string `cname:"来源数据唯一ID列表" json:"primaryIdList"`
// 过滤重复的primaryId
EnableDistinctPrimaryId bool `cname:"过滤重复的primaryId" json:"enableDistinctPrimaryId"`
// 关闭查询限制
DisableLimit bool `cname:"关闭查询限制" json:"disableLimit,omitempty"`
}
func (listBlockChain *ListBlockChainQuery) Valid(validation *validation.Validation) {
//validation.SetError("CustomValid", "未实现的自定义认证")
}
func (listBlockChain *ListBlockChainQuery) ValidateQuery() error {
valid := validation.Validation{}
b, err := valid.Valid(listBlockChain)
if err != nil {
return err
}
if !b {
elem := reflect.TypeOf(listBlockChain).Elem()
for _, validErr := range valid.Errors {
field, isExist := elem.FieldByName(validErr.Field)
if isExist {
return fmt.Errorf(strings.Replace(validErr.Message, validErr.Field, field.Tag.Get("cname"), -1))
} else {
return fmt.Errorf(validErr.Message)
}
}
}
return nil
}
... ...
package service
import (
"fmt"
"github.com/linmadan/egglib-go/core/application"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/application/blockChain/command"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/application/blockChain/dto"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/application/blockChain/query"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/application/factory"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/infrastructure/blockchain"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/infrastructure/utils"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/log"
"strconv"
"time"
)
// 区块链服务
type BlockChainService struct {
}
// 数据上链
func (blockChainService *BlockChainService) UpChain(upChainCommand *command.UpChainCommand) (interface{}, error) {
if err := upChainCommand.ValidateCommand(); err != nil {
return nil, application.ThrowError(application.ARG_ERROR, err.Error())
}
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
if err := transactionContext.StartTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
defer func() {
transactionContext.RollbackTransaction()
}()
upChain := &domain.UpChain{
Source: upChainCommand.Source,
PrimaryId: upChainCommand.PrimaryId,
IssueId: upChainCommand.IssueId,
Data: upChainCommand.Data,
UpChainStatus: 2,
CreatedAt: time.Now(),
}
// 1. 查重
upChainRepository, _, _ := factory.FastPgUpChain(transactionContext, 0)
// 可溯源数据,可重复上传,可以追溯历史修改记录
if len(upChain.IssueId) == 0 {
if item, err := upChainRepository.FindOne(map[string]interface{}{"source": upChain.Source, "primaryId": upChain.PrimaryId}); err == nil && item != nil {
return nil, fmt.Errorf("duplicate message %v %v", upChain.Source, upChain.PrimaryId)
}
}
// 2.上链
bc := &blockchain.BSNBlockChain{
PublicPem: []byte(blockchain.PubPem),
Host: blockchain.Host,
PublicKey: blockchain.PubKey,
PrivatePem: blockchain.PriK,
EnableDebugLog: true,
}
options := blockchain.NewUpToChainOptions(upChain.Source, upChain.PrimaryId, upChain.Data).WithInnerPrimaryIssueId(upChain.IssueId)
upToChainResponse, e := bc.UpToChain(options)
if e != nil || upToChainResponse == nil {
upChain.UpFail()
if e != nil {
log.Logger.Error("up chain err:" + e.Error())
}
} else {
upChain.UpSuccess(string(*upToChainResponse))
}
// 3.保存记录
if upChain, err = upChainRepository.Save(upChain); err != nil {
return nil, err
}
if err := transactionContext.CommitTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
return upChain, nil
}
// 区块链列表
func (blockChainService *BlockChainService) ListBlockChain(listBlockChain *query.ListBlockChainQuery) (interface{}, error) {
if err := listBlockChain.ValidateQuery(); err != nil {
return nil, application.ThrowError(application.ARG_ERROR, err.Error())
}
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
if err := transactionContext.StartTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
defer func() {
transactionContext.RollbackTransaction()
}()
upChainRepository, _, _ := factory.FastPgUpChain(transactionContext, 0)
queryOptions := utils.ObjectToMap(listBlockChain)
_, upChains, err := upChainRepository.Find(queryOptions)
if err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
response := dto.NewBlockChains(upChains)
if err := transactionContext.CommitTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
return response, nil
}
func (blockChainService *BlockChainService) GetBlockChainToken(listBlockChain *query.GetBlockChainTokenQuery) (interface{}, error) {
if err := listBlockChain.ValidateQuery(); err != nil {
return nil, application.ThrowError(application.ARG_ERROR, err.Error())
}
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
if err := transactionContext.StartTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
defer func() {
transactionContext.RollbackTransaction()
}()
upChainRepository, _, _ := factory.FastPgUpChain(transactionContext, 0)
upChain, err := upChainRepository.FindOne(map[string]interface{}{"upChainId": listBlockChain.UpChainId})
if err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
var request = blockchain.GetTokenRequest{
Type: listBlockChain.Type,
ShowValue: true,
}
bsn := newBSNBlockChain()
switch listBlockChain.Type {
case blockchain.QueryByHashId:
request.TsTxId = upChain.Hash
case blockchain.QueryByIssueId:
request.TsTxId = upChain.Hash
request.IssueId = upChain.IssueId
default:
return nil, application.ThrowError(application.ARG_ERROR, "unknown type "+strconv.Itoa(listBlockChain.Type))
}
getTokenResponse, err := bsn.GetToken(&request)
if err != nil {
log.Logger.Error(err.Error())
return nil, application.ThrowError(application.BUSINESS_ERROR, "不存在")
}
if err := transactionContext.CommitTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
return map[string]interface{}{
"token": getTokenResponse.Token,
"browseUrl": fmt.Sprintf("%v?token=%v", blockchain.BlockBrowserAddress, getTokenResponse.Token),
}, nil
}
func newBSNBlockChain() *blockchain.BSNBlockChain {
// 2.上链
bc := &blockchain.BSNBlockChain{
PublicPem: []byte(blockchain.PubPem),
Host: blockchain.Host,
PublicKey: blockchain.PubKey,
PrivatePem: blockchain.PriK,
EnableDebugLog: true,
}
return bc
}
func NewBlockChainService(options map[string]interface{}) *BlockChainService {
newBlockChainService := &BlockChainService{}
return newBlockChainService
}
... ...
... ... @@ -169,6 +169,32 @@ func FastPgMenu(transactionContext application.TransactionContext, menuId int64)
return rep, mod, err
}
// FastPgUpChain 快速返回区块链
//
// transactionContext 事务
// upChain upChainId
func FastPgUpChain(transactionContext application.TransactionContext, upChainId int64) (domain.UpChainRepository, *domain.UpChain, error) {
var rep domain.UpChainRepository
var mod *domain.UpChain
var err error
if value, err := CreateUpChainRepository(map[string]interface{}{
"transactionContext": transactionContext,
}); err != nil {
return nil, nil, application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
} else {
rep = value
}
if upChainId > 0 {
if mod, err = rep.FindOne(map[string]interface{}{"upChainId": upChainId}); err != nil {
if err == domain.ErrorNotFound {
return nil, nil, application.ThrowError(application.RES_NO_FIND_ERROR, "该组织不存在")
}
return nil, nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
}
return rep, mod, err
}
// FastPgCustomizeMenu 快速返回领域自定义菜单
//
// transactionContext 事务
... ...
... ... @@ -69,3 +69,11 @@ func CreateAccountDestroyRecordRepository(options map[string]interface{}) (domai
}
return repository.NewAccountDestroyRecordRepository(transactionContext)
}
func CreateUpChainRepository(options map[string]interface{}) (domain.UpChainRepository, error) {
var transactionContext *pg.TransactionContext
if value, ok := options["transactionContext"]; ok {
transactionContext = value.(*pg.TransactionContext)
}
return repository.NewUpChainRepository(transactionContext)
}
... ...
... ... @@ -4,9 +4,11 @@ import "os"
var (
// kafka 地址
KAFKA_HOST = "192.168.0.250:9092,192.168.0.251:9092,192.168.0.252:9092"
KAFKA_HOST = "106.75.231.90:9092" //"192.168.0.250:9092,192.168.0.251:9092,192.168.0.252:9092"
// kafka topic log stash
TOPIC_LOG_STASH = "go_stash_dev"
TOPIC_LOG_STASH = "go_stash_dev" //"pushMessage"
// kafka topic up_block_chain
TOPIC_UP_BLOCK_CHAIN = "up_block_chain"
// 是否启用日志收集 (本地不启用)
ENABLE_KAFKA_LOG = false
)
... ...
package domain
import "time"
// 上链数据
type UpChain struct {
// 上链数据唯一标识
UpChainId int64 `json:"upChainId,string"`
// 数据来源 例如:app.model
Source string `json:"source"`
// 来源数据唯一ID
PrimaryId string `json:"primaryId"`
// 溯源ID 标记同一个系列的数据;例如订单相关事件
IssueId string `json:"issueId"`
// 数据体
Data string `json:"data"`
// 数据块hash
Hash string `json:"hash"`
// 上链状态 1:成功 2:失败
UpChainStatus int `json:"upChainStatus"`
// 创建时间
CreatedAt time.Time `json:"createdAt"`
}
type UpChainRepository interface {
Save(upChain *UpChain) (*UpChain, error)
Remove(upChain *UpChain) (*UpChain, error)
FindOne(queryOptions map[string]interface{}) (*UpChain, error)
Find(queryOptions map[string]interface{}) (int64, []*UpChain, error)
}
func (upChain *UpChain) Identify() interface{} {
if upChain.UpChainId == 0 {
return nil
}
return upChain.UpChainId
}
func (upChain *UpChain) Update(data map[string]interface{}) error {
//if source, ok := data["source"]; ok {
// upChain.Source = source.(string)
//}
//if primaryId, ok := data["primaryId"]; ok {
// upChain.PrimaryId = primaryId.(string)
//}
//if issueId, ok := data["issueId"]; ok {
// upChain.IssueId = issueId.(string)
//}
//if data, ok := data["data"]; ok {
// upChain.Data = data.(string)
//}
if hash, ok := data["hash"]; ok {
upChain.Hash = hash.(string)
}
if upChainStatus, ok := data["upChainStatus"]; ok {
upChain.UpChainStatus = upChainStatus.(int)
}
if createdAt, ok := data["createdAt"]; ok {
upChain.CreatedAt = createdAt.(time.Time)
}
return nil
}
func (upChain *UpChain) UpSuccess(hashData string) {
if len(upChain.Hash) > 0 {
return
}
upChain.Hash = hashData
upChain.UpChainStatus = 1
}
func (upChain *UpChain) UpFail() {
if len(upChain.Hash) > 0 {
return
}
upChain.Hash = ""
upChain.UpChainStatus = 2
}
... ...
package blockchain
import (
"bytes"
"encoding/base64"
rawjson "encoding/json"
"fmt"
"github.com/beego/beego/v2/client/httplib"
"github.com/linmadan/egglib-go/utils/json"
"net/http"
"net/http/httputil"
"sort"
"time"
)
type (
BSNBlockChain struct {
PublicPem []byte
PrivatePem []byte
PublicKey string
Host string
EnableDebugLog bool
}
UpToChainRequest struct {
// 上链数据的数据库、数据表等的标识值 (非必填)
InnerDBTable string `json:"innerDBTable,omitempty"`
// 上链数据的唯一标识主键值 (非必填)
InnerPrimaryKey string `json:"innerPrimaryKey,omitempty"`
// 上链记录的一个标记值(IssueId), 数据溯源出所有相关事件内容,例如快递单号,过滤出该快递的所有相关事件内容并用于展示 (非必填)
InnerPrimaryIssueId string `json:"innerPrimaryIssueId,omitempty"`
// 作用与key1相同 (非必填)
InnerSecondIssueId string `json:"innerSecondIssueId,omitempty"`
// 数据原文 (必填)
Value string `json:"value,omitempty"`
// 数据描述: 对value的描述,无论needHash为何值,本字段均会原文存储到链上
Desc string `json:"desc,omitempty"`
// 是否哈希: true: 需要哈希,会将value进行hash上链,false:不需要哈希,明文上链,链上所有用户都可看到明文,默认false
NeedHash bool `json:"needHash"`
}
UpToChainResponse string
GetTokenRequest struct {
// 操作类型:
//1-交易哈希溯源
//2-溯源ID溯源
//3-验真
Type int `json:"type"`
// type为1或者3时必填
TsTxId string `json:"tsTxId,omitempty"`
// type为2时必填
IssueId string `json:"issueId,omitempty"`
// type为3时必填
Value string `json:"value,omitempty"`
// 当type=1或者2必填,为false只显示密文,为true溯源才会显示原文
ShowValue bool `json:"showValue"`
}
GetTokenResponse struct {
Token string `json:"token"`
}
Response struct {
Data rawjson.RawMessage `json:"data"`
Code int `json:"code"`
Message string `json:"message"`
}
)
// 上链
func (c *BSNBlockChain) UpToChain(options *UpToChainOptions) (*UpToChainResponse, error) {
req, err := c.MakeRequest(options, "/chainApi/upToChain", "upToChain", http.MethodPost)
if err != nil {
return nil, err
}
var upToChainResponse UpToChainResponse
_, err = c.HandlerResponse(req, &upToChainResponse)
return &upToChainResponse, err
}
// 浏览器溯源验真申请
func (c *BSNBlockChain) GetToken(options *GetTokenRequest) (*GetTokenResponse, error) {
req, err := c.MakeRequest(options, "/chainApi/getToken", "getToken", http.MethodPost)
if err != nil {
return nil, err
}
var getTokenResponse = GetTokenResponse{}
_, err = c.HandlerResponse(req, &getTokenResponse)
return &getTokenResponse, err
}
// 签名
func (c *BSNBlockChain) Signature(body map[string]interface{}, method string) (string, error) {
var keys []string
for key, _ := range body {
keys = append(keys, key)
}
sort.Strings(keys)
encryptString := bytes.NewBuffer(nil)
for i := range keys {
key := keys[i]
if v, ok := body[key]; ok {
encryptString.WriteString(fmt.Sprintf("%s=%v&", key, v))
}
}
encryptString.WriteString(fmt.Sprintf("method=%v", method))
// 此处用私钥签名
encryptData, err := RsaSign(c.PrivatePem, encryptString.Bytes())
if err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(encryptData), nil
}
func (c *BSNBlockChain) MakeRequest(obj interface{}, action string, signAction, httpMethod string) (*httplib.BeegoHTTPRequest, error) {
var mapBlockInfo = make(map[string]interface{})
json.UnmarshalFromString(json.MarshalToString(obj), &mapBlockInfo)
secret, err := c.Signature(mapBlockInfo, signAction)
if err != nil {
return nil, err
}
req := httplib.NewBeegoRequest(c.Host+action, httpMethod)
req.Header("pubKey", c.PublicKey) //url.QueryEscape(string(c.PublicKey))
req.Header("signature", secret) //url.QueryEscape(secret)
req.SetTimeout(time.Second*5, time.Second*5)
if httpMethod == http.MethodPost || httpMethod == http.MethodPut {
req.JSONBody(obj)
}
if c.EnableDebugLog {
data, _ := httputil.DumpRequest(req.GetRequest(), true)
fmt.Println(string(data))
}
return req, nil
}
func (c *BSNBlockChain) HandlerResponse(req *httplib.BeegoHTTPRequest, value interface{}) (*Response, error) {
response := &Response{}
data, err := req.Bytes()
if err != nil {
return nil, err
}
rsp, err := req.Response()
if err != nil {
return nil, err
}
if rsp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("response code:%v status:%v", rsp.StatusCode, rsp.Status)
}
err = json.Unmarshal(data, response)
if err != nil {
return nil, err
}
if c.EnableDebugLog {
fmt.Println("\nHttp Response-> \n", string(data))
}
if response.Code != 0 {
return nil, fmt.Errorf("upchain code:%v msg:%v", response.Code, response.Message)
}
json.Unmarshal(response.Data, value)
return response, nil
}
func (b *UpToChainRequest) Complete(options *UpToChainOptions) {
b.InnerDBTable = options.InnerDBTable
b.InnerPrimaryKey = options.InnerPrimaryKey
b.InnerPrimaryIssueId = options.InnerPrimaryIssueId
b.InnerSecondIssueId = options.InnerSecondIssueId
b.Value = options.Value
b.Desc = options.Desc
b.NeedHash = options.NeedHash
}
... ...
package blockchain
import (
"bytes"
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"encoding/pem"
"fmt"
"github.com/linmadan/egglib-go/utils/json"
"github.com/stretchr/testify/assert"
"log"
"os"
"testing"
)
//var priK = []byte(`-----BEGIN RSA PRIVATE KEY-----
//MIIBVAIBADANBgkqhkiG9w0BAQEFAASCAT4wggE6AgEAAkEA2H6x0D1mg5QbXfU7
//MZKltypRj+eZktPKIApyEqRsyLqe3sRSd1Eh+VqKlQ9QFI8dae3t0USWlVmyfIDM
//0ly85QIDAQABAkAPnKNJ9wOLfYSzs9l+66pTmROkovjqI6exw88SFRVbLCgM8maa
//GOWEP/nhZDlQYBKHUqG0/KsLkeyLGkE8N7JBAiEA8lM3npA3q+Kmhy+lmQbfHFPQ
//31OSkA+RaW/LPn0lP50CIQDktlF3iDk5kxnzgT/3lvvKhHInUh+pH5F19C6MymMD
//6QIgLxDct655MahnAdDOUCeWhBD/e7DmwZZUfu8Ywb1a070CIArsUjO9Q85mIiUp
//FR8EDP59GN6b43s2UMIraVW8DMKRAiEAnnMPbDsD2HsQbgmNNEqETUxYGVyO+p7w
//OZZReuOyvCM=
//-----END RSA PRIVATE KEY-----`)
//var pubPem = `-----BEGIN PUBLIC KEY-----
//MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANh+sdA9ZoOUG131OzGSpbcqUY/nmZLT
//yiAKchKkbMi6nt7EUndRIflaipUPUBSPHWnt7dFElpVZsnyAzNJcvOUCAwEAAQ==
//-----END PUBLIC KEY-----`
//var pubKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANh+sdA9ZoOUG131OzGSpbcqUY/nmZLT\nyiAKchKkbMi6nt7EUndRIflaipUPUBSPHWnt7dFElpVZsnyAzNJcvOUCAwEAAQ=="
//
//var host = "http://allied-creation-gateway-dev.fjmaimaimai.com"
var priK = []byte(`-----BEGIN RSA PRIVATE KEY-----
MIIBVAIBADANBgkqhkiG9w0BAQEFAASCAT4wggE6AgEAAkEA2H6x0D1mg5QbXfU7
MZKltypRj+eZktPKIApyEqRsyLqe3sRSd1Eh+VqKlQ9QFI8dae3t0USWlVmyfIDM
0ly85QIDAQABAkAPnKNJ9wOLfYSzs9l+66pTmROkovjqI6exw88SFRVbLCgM8maa
GOWEP/nhZDlQYBKHUqG0/KsLkeyLGkE8N7JBAiEA8lM3npA3q+Kmhy+lmQbfHFPQ
31OSkA+RaW/LPn0lP50CIQDktlF3iDk5kxnzgT/3lvvKhHInUh+pH5F19C6MymMD
6QIgLxDct655MahnAdDOUCeWhBD/e7DmwZZUfu8Ywb1a070CIArsUjO9Q85mIiUp
FR8EDP59GN6b43s2UMIraVW8DMKRAiEAnnMPbDsD2HsQbgmNNEqETUxYGVyO+p7w
OZZReuOyvCM=
-----END RSA PRIVATE KEY-----`)
var pubPem = `-----BEGIN PUBLIC KEY-----
MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANh+sdA9ZoOUG131OzGSpbcqUY/nmZLT
yiAKchKkbMi6nt7EUndRIflaipUPUBSPHWnt7dFElpVZsnyAzNJcvOUCAwEAAQ==
-----END PUBLIC KEY-----`
var pubKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANh+sdA9ZoOUG131OzGSpbcqUY/nmZLTyiAKchKkbMi6nt7EUndRIflaipUPUBSPHWnt7dFElpVZsnyAzNJcvOUCAwEAAQ=="
var host = "http://101.34.29.149:9092/test"
func TestSignature(t *testing.T) {
options := NewUpToChainOptions("table", "1", "149848948").WithDesc("desc")
bsn := &BSNBlockChain{
PrivatePem: priK,
PublicPem: []byte(pubPem),
}
bInfo := &UpToChainRequest{}
bInfo.Complete(options)
var mapBlockInfo = make(map[string]interface{})
json.UnmarshalFromString(json.MarshalToString(bInfo), &mapBlockInfo)
secret, err := bsn.Signature(mapBlockInfo, "upToChain")
assert.Nil(t, err)
t.Log(secret)
decryptSecret, err := RsaDecrypt(priK, []byte(secret))
if err != nil {
t.Log(err.Error())
}
t.Log(decryptSecret)
}
func TestGenerateRSA(t *testing.T) {
// generate key
privatekey, err := rsa.GenerateKey(rand.Reader, 512)
if err != nil {
fmt.Printf("Cannot generate RSA key\n")
os.Exit(1)
}
publickey := &privatekey.PublicKey
// dump private key to file
var privateKeyBytes []byte = x509.MarshalPKCS1PrivateKey(privatekey)
privateKeyBlock := &pem.Block{
Type: "RSA PRIVATE KEY",
Bytes: privateKeyBytes,
}
privatePem := bytes.NewBuffer(nil)
if err != nil {
fmt.Printf("error when create private.pem: %s \n", err)
os.Exit(1)
}
err = pem.Encode(privatePem, privateKeyBlock)
if err != nil {
fmt.Printf("error when encode private pem: %s \n", err)
os.Exit(1)
}
// dump public key to file
publicKeyBytes, err := x509.MarshalPKIXPublicKey(publickey)
if err != nil {
fmt.Printf("error when dumping publickey: %s \n", err)
os.Exit(1)
}
publicKeyBlock := &pem.Block{
Type: "PUBLIC KEY",
Bytes: publicKeyBytes,
}
publicPem := bytes.NewBuffer(nil)
if err != nil {
fmt.Printf("error when create public.pem: %s \n", err)
os.Exit(1)
}
err = pem.Encode(publicPem, publicKeyBlock)
if err != nil {
fmt.Printf("error when encode public pem: %s \n", err)
os.Exit(1)
}
log.Println(privatePem.String())
log.Println(publicPem.String())
}
func TestBSNBlockChain_UpToChain(t *testing.T) {
bc := &BSNBlockChain{
PublicPem: []byte(pubPem),
Host: host,
PublicKey: pubKey,
PrivatePem: priK,
}
options := NewUpToChainOptions("table", "2", "149848948").WithDesc("desc")
rsp, err := bc.UpToChain(options)
if err != nil {
t.Fatal(err)
}
t.Log(string(*rsp))
}
func TestBSNBlockChain_GetToken(t *testing.T) {
bc := &BSNBlockChain{
PublicPem: []byte(pubPem),
Host: host,
PublicKey: pubKey,
PrivatePem: priK,
EnableDebugLog: true,
}
options := &GetTokenRequest{
Type: 1,
TsTxId: "54df75d3bead65d144a1123d1f18af8bb4db65420c5c449631e9a93b81fcdb93",
ShowValue: true,
}
token, err := bc.GetToken(options)
if err != nil {
t.Fatal(err)
}
fmt.Println(token.Token)
}
func TestBSNBlockChain_UpToChain_All_Type(t *testing.T) {
bc := &BSNBlockChain{
PublicPem: []byte(pubPem),
Host: host,
PublicKey: pubKey,
PrivatePem: priK,
EnableDebugLog: true,
}
inputs := []struct {
name string
option *UpToChainOptions
t int
}{
{
"1.交易哈希溯源",
NewUpToChainOptions("app.order", "793745u988434", `{"orderId":"793745u988434"}`).WithDesc(""),
1,
},
{
"2.交易哈希溯源",
NewUpToChainOptions("app.order", "793745u988435", `{"orderId":"793745u988435"}`).WithDesc("").WithInnerPrimaryIssueId("893745u988435"),
2,
},
{
"3.验真",
NewUpToChainOptions("app.order", "793745u988436", `{"orderId":"793745u988436"}`).WithDesc("").WithInnerPrimaryIssueId("893745u988436"),
3,
},
}
for i := range inputs {
input := inputs[i]
fmt.Println(input.name)
rsp, err := bc.UpToChain(input.option)
if err != nil {
t.Fatal(err)
}
fmt.Println()
options := &GetTokenRequest{
Type: input.t,
ShowValue: true,
}
switch input.t {
case 1:
options.TsTxId = string(*rsp)
case 2:
options.IssueId = input.option.InnerPrimaryIssueId
case 3:
options.TsTxId = string(*rsp)
options.Value = input.option.Value
}
token, err := bc.GetToken(options)
if err != nil {
t.Fatal(err)
}
fmt.Println(fmt.Sprintf("Token:%v \n", token.Token))
}
}
... ...
package blockchain
var PriK = []byte(`-----BEGIN RSA PRIVATE KEY-----
MIIBVAIBADANBgkqhkiG9w0BAQEFAASCAT4wggE6AgEAAkEA2H6x0D1mg5QbXfU7
MZKltypRj+eZktPKIApyEqRsyLqe3sRSd1Eh+VqKlQ9QFI8dae3t0USWlVmyfIDM
0ly85QIDAQABAkAPnKNJ9wOLfYSzs9l+66pTmROkovjqI6exw88SFRVbLCgM8maa
GOWEP/nhZDlQYBKHUqG0/KsLkeyLGkE8N7JBAiEA8lM3npA3q+Kmhy+lmQbfHFPQ
31OSkA+RaW/LPn0lP50CIQDktlF3iDk5kxnzgT/3lvvKhHInUh+pH5F19C6MymMD
6QIgLxDct655MahnAdDOUCeWhBD/e7DmwZZUfu8Ywb1a070CIArsUjO9Q85mIiUp
FR8EDP59GN6b43s2UMIraVW8DMKRAiEAnnMPbDsD2HsQbgmNNEqETUxYGVyO+p7w
OZZReuOyvCM=
-----END RSA PRIVATE KEY-----`)
var PubPem = `-----BEGIN PUBLIC KEY-----
MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANh+sdA9ZoOUG131OzGSpbcqUY/nmZLT
yiAKchKkbMi6nt7EUndRIflaipUPUBSPHWnt7dFElpVZsnyAzNJcvOUCAwEAAQ==
-----END PUBLIC KEY-----`
var PubKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANh+sdA9ZoOUG131OzGSpbcqUY/nmZLTyiAKchKkbMi6nt7EUndRIflaipUPUBSPHWnt7dFElpVZsnyAzNJcvOUCAwEAAQ=="
var Host = "http://101.34.29.149:9092/test"
var BlockBrowserAddress = "http://101.34.29.149/browser"
... ...
package blockchain
type UpToChainOptions struct {
// 上链数据的数据库、数据表等的标识值 (非必填)
InnerDBTable string `json:"innerDBTable,omitempty"`
// 上链数据的唯一标识主键值 (非必填)
InnerPrimaryKey string `json:"innerPrimaryKey,omitempty"`
// 上链记录的一个标记值(IssueId), 数据溯源出所有相关事件内容,例如快递单号,过滤出该快递的所有相关事件内容并用于展示 (非必填)
InnerPrimaryIssueId string `json:"innerPrimaryIssueId,omitempty"`
// 作用与key1相同 (非必填)
InnerSecondIssueId string `json:"innerSecondIssueId,omitempty"`
// 数据原文 (必填)
Value string `json:"value"`
// 数据描述: 对value的描述,无论needHash为何值,本字段均会原文存储到链上
Desc string `json:"desc,omitempty"`
// 是否哈希: true: 需要哈希,会将value进行hash上链,false:不需要哈希,明文上链,链上所有用户都可看到明文,默认false
NeedHash bool `json:"needHash"`
}
func NewUpToChainOptions(table, primaryKey, value string) *UpToChainOptions {
return &UpToChainOptions{InnerDBTable: table, InnerPrimaryKey: primaryKey, Value: value, NeedHash: false}
}
func (o *UpToChainOptions) WithInnerDBTable(innerDBTable string) *UpToChainOptions {
o.InnerDBTable = innerDBTable
return o
}
func (o *UpToChainOptions) WithInnerPrimaryKey(innerPrimaryKey string) *UpToChainOptions {
o.InnerPrimaryKey = innerPrimaryKey
return o
}
func (o *UpToChainOptions) WithInnerPrimaryIssueId(innerPrimaryIssueId string) *UpToChainOptions {
o.InnerPrimaryIssueId = innerPrimaryIssueId
return o
}
func (o *UpToChainOptions) WithInnerSecondIssueId(innerSecondIssueId string) *UpToChainOptions {
o.InnerSecondIssueId = innerSecondIssueId
return o
}
func (o *UpToChainOptions) WithValue(Value string) *UpToChainOptions {
o.Value = Value
return o
}
func (o *UpToChainOptions) WithDesc(Desc string) *UpToChainOptions {
o.Desc = Desc
return o
}
func (o *UpToChainOptions) WithNeedHash() *UpToChainOptions {
o.NeedHash = true
return o
}
const (
QueryByHashId = iota + 1
QueryByIssueId
)
... ...
package blockchain
import (
"crypto"
"crypto/md5"
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"encoding/base64"
"encoding/pem"
"errors"
"fmt"
)
// rsa签名
func RsaSign(publicKey []byte, origData []byte) ([]byte, error) {
block, _ := pem.Decode(publicKey)
if block == nil {
return nil, errors.New("public key error")
}
pubInterface, err := x509.ParsePKCS8PrivateKey(block.Bytes)
if err != nil {
return nil, err
}
// md5
hash := md5.New()
hash.Write([]byte(origData))
pub := pubInterface.(*rsa.PrivateKey)
return rsa.SignPKCS1v15(rand.Reader, pub, crypto.MD5, hash.Sum(nil))
//pub := pubInterface.(*rsa.PublicKey)
//return rsa.EncryptPKCS1v15(rand.Reader, pub, origData)
}
func RsaEncrypt(publicKey []byte, origData []byte) ([]byte, error) {
block, _ := pem.Decode(publicKey)
if block == nil {
return nil, errors.New("public key error")
}
pubInterface, err := x509.ParsePKIXPublicKey(block.Bytes)
if err != nil {
return nil, err
}
fmt.Println(string(origData))
// md5
hash := md5.New()
hash.Write([]byte(origData))
pub := pubInterface.(*rsa.PublicKey)
fmt.Println(hash.Sum(nil))
return rsa.EncryptPKCS1v15(rand.Reader, pub, hash.Sum(nil))
}
// 解密
func RsaDecrypt(privateKey []byte, ciphertext []byte) ([]byte, error) {
block, _ := pem.Decode(privateKey)
if block == nil {
return nil, errors.New("private key error!")
}
encryptData, _ := base64.StdEncoding.DecodeString(string(ciphertext))
priv, err := x509.ParsePKCS1PrivateKey(block.Bytes)
if err != nil {
// pkcs1 是标准但裸奔,pkcs8升级支持密码
pri2, err := x509.ParsePKCS8PrivateKey(block.Bytes)
if err != nil {
return nil, err
}
priv = pri2.(*rsa.PrivateKey)
}
return rsa.DecryptPKCS1v15(rand.Reader, priv, encryptData)
}
... ...
... ... @@ -35,6 +35,7 @@ func init() {
(*models.User)(nil),
(*models.UserBase)(nil),
(*models.AccountDestroyRecord)(nil),
(*models.UpChain)(nil),
} {
err := DB.Model(model).CreateTable(&orm.CreateTableOptions{
Temp: false,
... ...
package models
import "time"
type UpChain struct {
tableName string `comment:"上链数据" pg:"business.up_chain"`
// 上链数据唯一标识
UpChainId int64 `comment:"上链数据唯一标识" pg:"pk:up_chain_id"`
// 数据来源 例如:app.model
Source string `comment:"数据来源 例如:app.model"`
// 来源数据唯一ID
PrimaryId string `comment:"来源数据唯一ID"`
// 溯源ID 标记同一个系列的数据;例如订单相关事件
IssueId string `comment:"溯源ID 标记同一个系列的数据;例如订单相关事件"`
// 数据体
Data string `comment:"数据体"`
// 数据块hash
Hash string `comment:"数据块hash"`
// 上链状态 1:成功 2:失败
UpChainStatus int `comment:"上链状态 1:成功 2:失败"`
// 创建时间
CreatedAt time.Time `comment:"创建时间"`
}
... ...
package transform
import (
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/infrastructure/pg/models"
)
func TransformToUpChainDomainModelFromPgModels(upChainModel *models.UpChain) (*domain.UpChain, error) {
return &domain.UpChain{
UpChainId: upChainModel.UpChainId,
Source: upChainModel.Source,
PrimaryId: upChainModel.PrimaryId,
IssueId: upChainModel.IssueId,
Data: upChainModel.Data,
Hash: upChainModel.Hash,
UpChainStatus: upChainModel.UpChainStatus,
CreatedAt: upChainModel.CreatedAt,
}, nil
}
... ...
package repository
import (
"fmt"
"github.com/go-pg/pg/v10"
"github.com/linmadan/egglib-go/persistent/pg/sqlbuilder"
pgTransaction "github.com/linmadan/egglib-go/transaction/pg"
"github.com/linmadan/egglib-go/utils/snowflake"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/infrastructure/pg/models"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/infrastructure/pg/transform"
)
type UpChainRepository struct {
transactionContext *pgTransaction.TransactionContext
}
func (repository *UpChainRepository) nextIdentify() (int64, error) {
IdWorker, err := snowflake.NewIdWorker(1)
if err != nil {
return 0, err
}
id, err := IdWorker.NextId()
return id, err
}
func (repository *UpChainRepository) Save(upChain *domain.UpChain) (*domain.UpChain, error) {
sqlBuildFields := []string{
"up_chain_id",
"source",
"primary_id",
"issue_id",
"data",
"hash",
"up_chain_status",
"created_at",
}
insertFieldsSnippet := sqlbuilder.SqlFieldsSnippet(sqlbuilder.RemoveSqlFields(sqlBuildFields, "up_chain_id"))
insertPlaceHoldersSnippet := sqlbuilder.SqlPlaceHoldersSnippet(sqlbuilder.RemoveSqlFields(sqlBuildFields, "up_chain_id"))
returningFieldsSnippet := sqlbuilder.SqlFieldsSnippet(sqlBuildFields)
updateFields := sqlbuilder.RemoveSqlFields(sqlBuildFields, "up_chain_id")
updateFieldsSnippet := sqlbuilder.SqlUpdateFieldsSnippet(updateFields)
tx := repository.transactionContext.PgTx
if upChain.Identify() == nil {
upChainId, err := repository.nextIdentify()
if err != nil {
return upChain, err
} else {
upChain.UpChainId = upChainId
}
if _, err := tx.QueryOne(
pg.Scan(
&upChain.UpChainId,
&upChain.Source,
&upChain.PrimaryId,
&upChain.IssueId,
&upChain.Data,
&upChain.Hash,
&upChain.UpChainStatus,
&upChain.CreatedAt,
),
fmt.Sprintf("INSERT INTO business.up_chain (%s) VALUES (%s) RETURNING %s", insertFieldsSnippet, insertPlaceHoldersSnippet, returningFieldsSnippet),
//upChain.UpChainId,
upChain.Source,
upChain.PrimaryId,
upChain.IssueId,
upChain.Data,
upChain.Hash,
upChain.UpChainStatus,
upChain.CreatedAt,
); err != nil {
return upChain, err
}
} else {
if _, err := tx.QueryOne(
pg.Scan(
&upChain.UpChainId,
&upChain.Source,
&upChain.PrimaryId,
&upChain.IssueId,
&upChain.Data,
&upChain.Hash,
&upChain.UpChainStatus,
&upChain.CreatedAt,
),
fmt.Sprintf("UPDATE business.up_chain SET %s WHERE up_chain_id=? RETURNING %s", updateFieldsSnippet, returningFieldsSnippet),
upChain.Source,
upChain.PrimaryId,
upChain.IssueId,
upChain.Data,
upChain.Hash,
upChain.UpChainStatus,
upChain.CreatedAt,
upChain.Identify(),
); err != nil {
return upChain, err
}
}
return upChain, nil
}
func (repository *UpChainRepository) Remove(upChain *domain.UpChain) (*domain.UpChain, error) {
tx := repository.transactionContext.PgTx
upChainModel := new(models.UpChain)
upChainModel.UpChainId = upChain.Identify().(int64)
if _, err := tx.Model(upChainModel).WherePK().Delete(); err != nil {
return upChain, err
}
return upChain, nil
}
func (repository *UpChainRepository) FindOne(queryOptions map[string]interface{}) (*domain.UpChain, error) {
tx := repository.transactionContext.PgTx
upChainModel := new(models.UpChain)
query := sqlbuilder.BuildQuery(tx.Model(upChainModel), queryOptions)
query.SetWhereByQueryOption("up_chain.up_chain_id = ?", "upChainId")
query.SetWhereByQueryOption("up_chain.source = ?", "source")
query.SetWhereByQueryOption("up_chain.primary_id = ?", "primaryId")
if err := query.First(); err != nil {
if err.Error() == "pg: no rows in result set" {
return nil, fmt.Errorf("没有此资源")
} else {
return nil, err
}
}
if upChainModel.UpChainId == 0 {
return nil, nil
} else {
return transform.TransformToUpChainDomainModelFromPgModels(upChainModel)
}
}
func (repository *UpChainRepository) Find(queryOptions map[string]interface{}) (int64, []*domain.UpChain, error) {
tx := repository.transactionContext.PgTx
var upChainModels []*models.UpChain
upChains := make([]*domain.UpChain, 0)
query := sqlbuilder.BuildQuery(tx.Model(&upChainModels), queryOptions)
if v, ok := queryOptions["disableLimit"]; !(ok && v.(bool)) {
query.SetOffsetAndLimit(20)
}
if v, ok := queryOptions["source"]; ok && len(v.(string)) > 0 {
query.Where(`source = ?`, v)
}
if v, ok := queryOptions["primaryIdList"]; ok && len(v.([]string)) > 0 {
query.Where(`primary_id in (?)`, pg.In(v.([]string)))
}
if v, ok := queryOptions["enableDistinctPrimaryId"]; ok && v.(bool) {
query.DistinctOn(`primary_id`)
query.SetOrderDirect("primary_id", "DESC")
}
query.SetOrderDirect("up_chain_id", "DESC")
if count, err := query.SelectAndCount(); err != nil {
return 0, upChains, err
} else {
for _, upChainModel := range upChainModels {
if upChain, err := transform.TransformToUpChainDomainModelFromPgModels(upChainModel); err != nil {
return 0, upChains, err
} else {
upChains = append(upChains, upChain)
}
}
return int64(count), upChains, nil
}
}
func NewUpChainRepository(transactionContext *pgTransaction.TransactionContext) (*UpChainRepository, error) {
if transactionContext == nil {
return nil, fmt.Errorf("transactionContext参数不能为nil")
} else {
return &UpChainRepository{
transactionContext: transactionContext,
}, nil
}
}
... ...
package controllers
import (
"github.com/linmadan/egglib-go/web/beego"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/application/blockChain/query"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/application/blockChain/service"
)
type BlockChainController struct {
beego.BaseController
}
//func (controller *BlockChainController) CreateBlockChain() {
// blockChainService := service.NewBlockChainService(nil)
// createBlockChainCommand := &command.CreateBlockChainCommand{}
// controller.Unmarshal(createBlockChainCommand)
// data, err := blockChainService.CreateBlockChain(createBlockChainCommand)
// controller.Response(data, err)
//}
//
//func (controller *BlockChainController) UpdateBlockChain() {
// blockChainService := service.NewBlockChainService(nil)
// updateBlockChainCommand := &command.UpdateBlockChainCommand{}
// controller.Unmarshal(updateBlockChainCommand)
// blockChainId, _ := controller.GetString(":blockChainId")
// updateBlockChainCommand.BlockChainId = blockChainId
// data, err := blockChainService.UpdateBlockChain(updateBlockChainCommand)
// controller.Response(data, err)
//}
//
//func (controller *BlockChainController) GetBlockChain() {
// blockChainService := service.NewBlockChainService(nil)
// getBlockChainQuery := &query.GetBlockChainQuery{}
// blockChainId, _ := controller.GetString(":blockChainId")
// getBlockChainQuery.BlockChainId = blockChainId
// data, err := blockChainService.GetBlockChain(getBlockChainQuery)
// controller.Response(data, err)
//}
//
//func (controller *BlockChainController) RemoveBlockChain() {
// blockChainService := service.NewBlockChainService(nil)
// removeBlockChainCommand := &command.RemoveBlockChainCommand{}
// controller.Unmarshal(removeBlockChainCommand)
// blockChainId, _ := controller.GetString(":blockChainId")
// removeBlockChainCommand.BlockChainId = blockChainId
// data, err := blockChainService.RemoveBlockChain(removeBlockChainCommand)
// controller.Response(data, err)
//}
func (controller *BlockChainController) ListBlockChain() {
blockChainService := service.NewBlockChainService(nil)
listBlockChainQuery := &query.ListBlockChainQuery{}
Must(controller.Unmarshal(listBlockChainQuery))
listBlockChainQuery.OperateInfo = ParseOperateInfo(controller.BaseController)
data, err := blockChainService.ListBlockChain(listBlockChainQuery)
controller.Response(data, err)
}
func (controller *BlockChainController) GetBlockChainToken() {
blockChainService := service.NewBlockChainService(nil)
listBlockChainQuery := &query.GetBlockChainTokenQuery{}
Must(controller.Unmarshal(listBlockChainQuery))
//listBlockChainQuery.OperateInfo = ParseOperateInfo(controller.BaseController)
data, err := blockChainService.GetBlockChainToken(listBlockChainQuery)
controller.Response(data, err)
}
... ...
package routers
import (
"github.com/beego/beego/v2/server/web"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/port/beego/controllers"
)
func init() {
//web.Router("/block-chains/", &controllers.BlockChainController{}, "Post:CreateBlockChain")
//web.Router("/block-chains/:blockChainId", &controllers.BlockChainController{}, "Put:UpdateBlockChain")
//web.Router("/block-chains/:blockChainId", &controllers.BlockChainController{}, "Get:GetBlockChain")
//web.Router("/block-chains/:blockChainId", &controllers.BlockChainController{}, "Delete:RemoveBlockChain")
web.Router("/block-chains/", &controllers.BlockChainController{}, "Post:ListBlockChain")
web.Router("/block-chains/token", &controllers.BlockChainController{}, "Post:GetBlockChainToken")
}
... ...
package goqueue
import (
"fmt"
"github.com/tal-tech/go-queue/kq"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/service"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/constant"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/log"
"strings"
)
func SetUp() {
go func() {
q := kq.MustNewQueue(NewConfig(constant.TOPIC_UP_BLOCK_CHAIN, constant.TOPIC_UP_BLOCK_CHAIN, 2), kq.WithHandle(UpToChainHandler))
defer func() {
q.Stop()
log.Logger.Info(fmt.Sprintf("goqueue:%v stop!", constant.TOPIC_UP_BLOCK_CHAIN))
}()
q.Start()
}()
log.Logger.Info("goqueue start!")
}
func NewConfig(topic, group string, consumers int) kq.KqConf {
brokers := strings.Split(constant.KAFKA_HOST, ",")
return kq.KqConf{
ServiceConf: service.ServiceConf{
Name: topic,
Log: logx.LogConf{
Mode: "console",
},
Mode: "pro",
},
Brokers: brokers,
Group: group,
Topic: topic,
Offset: "first",
Conns: 1,
Consumers: consumers,
Processors: 4,
MinBytes: 10200,
MaxBytes: 10485760,
}
}
... ...
package goqueue
import (
"fmt"
"github.com/linmadan/egglib-go/utils/json"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/application/blockChain/command"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/application/blockChain/service"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/log"
)
func UpToChainHandler(k, v string) error {
log.Logger.Debug(fmt.Sprintf("%s", v), map[string]interface{}{"handler": "UptoChain"})
blockChainService := service.NewBlockChainService(nil)
upChainCommand := &command.UpChainCommand{}
err := json.UnmarshalFromString(v, upChainCommand)
if err != nil {
return err
}
_, err = blockChainService.UpChain(upChainCommand)
return err
}
... ...
package block_chain
import (
"net/http"
"net/http/httptest"
"testing"
"github.com/beego/beego/v2/server/web"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
_ "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/infrastructure/pg"
_ "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/port/beego"
)
func TestBlockChain(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Beego Port BlockChain Correlations Test Case Suite")
}
var handler http.Handler
var server *httptest.Server
var _ = BeforeSuite(func() {
handler = web.BeeApp.Handlers
server = httptest.NewServer(handler)
})
var _ = AfterSuite(func() {
server.Close()
})
... ...
package block_chain
import (
"net/http"
"github.com/gavv/httpexpect"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pG "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/infrastructure/pg"
)
var _ = Describe("创建", func() {
Describe("提交数据创建", func() {
Context("提交正确的新上链数据数据", func() {
It("返回上链数据数据", func() {
httpExpect := httpexpect.New(GinkgoT(), server.URL)
body := map[string]interface{}{
"upChainId": "int64",
}
httpExpect.POST("/block-chains/").
WithJSON(body).
Expect().
Status(http.StatusOK).
JSON().
Object().
ContainsKey("code").ValueEqual("code", 0).
ContainsKey("msg").ValueEqual("msg", "ok").
ContainsKey("data").Value("data").Object().
ContainsKey("upChainId").ValueNotEqual("upChainId", BeZero())
})
})
})
AfterEach(func() {
_, err := pG.DB.Exec("DELETE FROM up_chains WHERE true")
Expect(err).NotTo(HaveOccurred())
})
})
... ...
package block_chain
import (
"github.com/go-pg/pg/v10"
"net/http"
"github.com/gavv/httpexpect"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pG "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/infrastructure/pg"
)
var _ = Describe("返回", func() {
var upChainId int64
BeforeEach(func() {
_, err := pG.DB.QueryOne(
pg.Scan(&upChainId),
"INSERT INTO up_chains (up_chain_id, source, primary_id, issue_id, data, hash, up_chain_status, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING up_chain_id",
"testUpChainId", "testSource", "testPrimaryId", "testIssueId", "testData", "testHash", "testUpChainStatus", "testCreatedAt")
Expect(err).NotTo(HaveOccurred())
})
Describe("根据upChainId参数返回上链数据", func() {
Context("传入有效的upChainId", func() {
It("返回上链数据数据", func() {
httpExpect := httpexpect.New(GinkgoT(), server.URL)
httpExpect.GET("/block-chains/{blockChainId}").
Expect().
Status(http.StatusOK).
JSON().
Object().
ContainsKey("code").ValueEqual("code", 0).
ContainsKey("msg").ValueEqual("msg", "ok").
ContainsKey("data").Value("data").Object()
})
})
})
AfterEach(func() {
_, err := pG.DB.Exec("DELETE FROM up_chains WHERE true")
Expect(err).NotTo(HaveOccurred())
})
})
... ...
package block_chain
import (
"github.com/go-pg/pg/v10"
"net/http"
"github.com/gavv/httpexpect"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pG "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/infrastructure/pg"
)
var _ = Describe("返回列表", func() {
var upChainId int64
BeforeEach(func() {
_, err := pG.DB.QueryOne(
pg.Scan(&upChainId),
"INSERT INTO up_chains (up_chain_id, source, primary_id, issue_id, data, hash, up_chain_status, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING up_chain_id",
"testUpChainId", "testSource", "testPrimaryId", "testIssueId", "testData", "testHash", "testUpChainStatus", "testCreatedAt")
Expect(err).NotTo(HaveOccurred())
})
Describe("根据参数返回上链数据列表", func() {
Context("传入有效的参数", func() {
It("返回上链数据数据列表", func() {
httpExpect := httpexpect.New(GinkgoT(), server.URL)
httpExpect.GET("/block-chains/").
WithQuery("offset", "int").
WithQuery("limit", "int").
Expect().
Status(http.StatusOK).
JSON().
Object().
ContainsKey("code").ValueEqual("code", 0).
ContainsKey("msg").ValueEqual("msg", "ok").
ContainsKey("data").Value("data").Object().
ContainsKey("count").ValueEqual("count", 1).
ContainsKey("upChains").Value("upChains").Array()
})
})
})
AfterEach(func() {
_, err := pG.DB.Exec("DELETE FROM up_chains WHERE true")
Expect(err).NotTo(HaveOccurred())
})
})
... ...
package block_chain
import (
"github.com/go-pg/pg/v10"
"net/http"
"github.com/gavv/httpexpect"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pG "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/infrastructure/pg"
)
var _ = Describe("移除", func() {
var upChainId int64
BeforeEach(func() {
_, err := pG.DB.QueryOne(
pg.Scan(&upChainId),
"INSERT INTO up_chains (up_chain_id, source, primary_id, issue_id, data, hash, up_chain_status, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING up_chain_id",
"testUpChainId", "testSource", "testPrimaryId", "testIssueId", "testData", "testHash", "testUpChainStatus", "testCreatedAt")
Expect(err).NotTo(HaveOccurred())
})
Describe("根据参数移除", func() {
Context("传入有效的upChainId", func() {
It("返回被移除上链数据的数据", func() {
httpExpect := httpexpect.New(GinkgoT(), server.URL)
httpExpect.DELETE("/block-chains/{blockChainId}").
Expect().
Status(http.StatusOK).
JSON().
Object().
ContainsKey("code").ValueEqual("code", 0).
ContainsKey("msg").ValueEqual("msg", "ok").
ContainsKey("data").Value("data").Object()
})
})
})
AfterEach(func() {
_, err := pG.DB.Exec("DELETE FROM up_chains WHERE true")
Expect(err).NotTo(HaveOccurred())
})
})
... ...
package block_chain
import (
"github.com/go-pg/pg/v10"
"net/http"
"github.com/gavv/httpexpect"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pG "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/infrastructure/pg"
)
var _ = Describe("更新", func() {
var upChainId int64
BeforeEach(func() {
_, err := pG.DB.QueryOne(
pg.Scan(&upChainId),
"INSERT INTO up_chains (up_chain_id, source, primary_id, issue_id, data, hash, up_chain_status, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING up_chain_id",
"testUpChainId", "testSource", "testPrimaryId", "testIssueId", "testData", "testHash", "testUpChainStatus", "testCreatedAt")
Expect(err).NotTo(HaveOccurred())
})
Describe("提交数据更新", func() {
Context("提交正确的上链数据数据", func() {
It("返回更新后的上链数据数据", func() {
httpExpect := httpexpect.New(GinkgoT(), server.URL)
body := map[string]interface{}{
"upChainId": "int64",
}
httpExpect.PUT("/block-chains/{blockChainId}").
WithJSON(body).
Expect().
Status(http.StatusOK).
JSON().
Object().
ContainsKey("code").ValueEqual("code", 0).
ContainsKey("msg").ValueEqual("msg", "ok").
ContainsKey("data").Value("data").Object().
ContainsKey("upChainId").ValueEqual("upChainId", upChainId)
})
})
})
AfterEach(func() {
_, err := pG.DB.Exec("DELETE FROM up_chains WHERE true")
Expect(err).NotTo(HaveOccurred())
})
})
... ...
package goqueue
import (
"fmt"
"github.com/linmadan/egglib-go/utils/json"
"github.com/tal-tech/go-queue/kq"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/constant"
"strings"
"testing"
"time"
)
func Test_UpChain(t *testing.T) {
pusher := kq.NewPusher(strings.Split(constant.KAFKA_HOST, ","), constant.TOPIC_UP_BLOCK_CHAIN)
err := pusher.Push(json.MarshalToString(map[string]interface{}{
"source": "allied-creation.cooperation",
"primaryId": fmt.Sprintf("%v", time.Now().Unix()),
"issueId": "key12345",
"data": "{}",
}))
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second * 5)
}
... ...