作者 yangfu

1. 数据上链修改

@@ -7,6 +7,7 @@ import ( @@ -7,6 +7,7 @@ import (
7 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/application/factory" 7 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/application/factory"
8 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/domain" 8 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/domain"
9 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/infrastructure/blockchain" 9 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/infrastructure/blockchain"
  10 + "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/log"
10 "time" 11 "time"
11 ) 12 )
12 13
@@ -41,20 +42,28 @@ func (blockChainService *BlockChainService) UpChain(upChainCommand *command.UpCh @@ -41,20 +42,28 @@ func (blockChainService *BlockChainService) UpChain(upChainCommand *command.UpCh
41 42
42 // 1. 查重 43 // 1. 查重
43 upChainRepository, _, _ := factory.FastPgUpChain(transactionContext, 0) 44 upChainRepository, _, _ := factory.FastPgUpChain(transactionContext, 0)
44 - if item, err := upChainRepository.FindOne(map[string]interface{}{"source": upChain.Source, "primaryId": upChain.PrimaryId}); err == nil && item != nil {  
45 - return nil, fmt.Errorf("duplicate message %v %v", upChain.Source, upChain.PrimaryId) 45 + // 可溯源数据,可重复上传,可以追溯历史修改记录
  46 + if len(upChain.IssueId) == 0 {
  47 + if item, err := upChainRepository.FindOne(map[string]interface{}{"source": upChain.Source, "primaryId": upChain.PrimaryId}); err == nil && item != nil {
  48 + return nil, fmt.Errorf("duplicate message %v %v", upChain.Source, upChain.PrimaryId)
  49 + }
46 } 50 }
47 51
48 // 2.上链 52 // 2.上链
49 bc := &blockchain.BSNBlockChain{ 53 bc := &blockchain.BSNBlockChain{
50 - PublicPem: []byte(""),  
51 - Host: "",  
52 - PublicKey: "", 54 + PublicPem: []byte(blockchain.PubPem),
  55 + Host: blockchain.Host,
  56 + PublicKey: blockchain.PubKey,
  57 + PrivatePem: blockchain.PriK,
  58 + EnableDebugLog: true,
53 } 59 }
54 options := blockchain.NewUpToChainOptions(upChain.Source, upChain.PrimaryId, upChain.Data).WithInnerPrimaryIssueId(upChain.IssueId) 60 options := blockchain.NewUpToChainOptions(upChain.Source, upChain.PrimaryId, upChain.Data).WithInnerPrimaryIssueId(upChain.IssueId)
55 - upToChainResponse, err := bc.UpToChain(options)  
56 - if err != nil || upToChainResponse == nil { 61 + upToChainResponse, e := bc.UpToChain(options)
  62 + if e != nil || upToChainResponse == nil {
57 upChain.UpFail() 63 upChain.UpFail()
  64 + if e != nil {
  65 + log.Logger.Error("up chain err:" + e.Error())
  66 + }
58 } else { 67 } else {
59 upChain.UpSuccess(string(*upToChainResponse)) 68 upChain.UpSuccess(string(*upToChainResponse))
60 } 69 }
@@ -4,11 +4,11 @@ import "os" @@ -4,11 +4,11 @@ import "os"
4 4
5 var ( 5 var (
6 // kafka 地址 6 // kafka 地址
7 - KAFKA_HOST = "192.168.0.250:9092,192.168.0.251:9092,192.168.0.252:9092" //"106.75.231.90:9092" 7 + KAFKA_HOST = "106.75.231.90:9092" //"192.168.0.250:9092,192.168.0.251:9092,192.168.0.252:9092"
8 // kafka topic log stash 8 // kafka topic log stash
9 TOPIC_LOG_STASH = "go_stash_dev" //"pushMessage" 9 TOPIC_LOG_STASH = "go_stash_dev" //"pushMessage"
10 // kafka topic up_block_chain 10 // kafka topic up_block_chain
11 - TOPIC_UP_BLOCK_CHAIN = "pushMessage" 11 + TOPIC_UP_BLOCK_CHAIN = "up_block_chain"
12 // 是否启用日志收集 (本地不启用) 12 // 是否启用日志收集 (本地不启用)
13 ENABLE_KAFKA_LOG = false 13 ENABLE_KAFKA_LOG = false
14 ) 14 )
  1 +package blockchain
  2 +
  3 +var PriK = []byte(`-----BEGIN RSA PRIVATE KEY-----
  4 +MIIBVAIBADANBgkqhkiG9w0BAQEFAASCAT4wggE6AgEAAkEA2H6x0D1mg5QbXfU7
  5 +MZKltypRj+eZktPKIApyEqRsyLqe3sRSd1Eh+VqKlQ9QFI8dae3t0USWlVmyfIDM
  6 +0ly85QIDAQABAkAPnKNJ9wOLfYSzs9l+66pTmROkovjqI6exw88SFRVbLCgM8maa
  7 +GOWEP/nhZDlQYBKHUqG0/KsLkeyLGkE8N7JBAiEA8lM3npA3q+Kmhy+lmQbfHFPQ
  8 +31OSkA+RaW/LPn0lP50CIQDktlF3iDk5kxnzgT/3lvvKhHInUh+pH5F19C6MymMD
  9 +6QIgLxDct655MahnAdDOUCeWhBD/e7DmwZZUfu8Ywb1a070CIArsUjO9Q85mIiUp
  10 +FR8EDP59GN6b43s2UMIraVW8DMKRAiEAnnMPbDsD2HsQbgmNNEqETUxYGVyO+p7w
  11 +OZZReuOyvCM=
  12 +-----END RSA PRIVATE KEY-----`)
  13 +var PubPem = `-----BEGIN PUBLIC KEY-----
  14 +MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANh+sdA9ZoOUG131OzGSpbcqUY/nmZLT
  15 +yiAKchKkbMi6nt7EUndRIflaipUPUBSPHWnt7dFElpVZsnyAzNJcvOUCAwEAAQ==
  16 +-----END PUBLIC KEY-----`
  17 +var PubKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANh+sdA9ZoOUG131OzGSpbcqUY/nmZLTyiAKchKkbMi6nt7EUndRIflaipUPUBSPHWnt7dFElpVZsnyAzNJcvOUCAwEAAQ=="
  18 +
  19 +var Host = "http://101.34.29.149:9092/test"
@@ -35,6 +35,7 @@ func init() { @@ -35,6 +35,7 @@ func init() {
35 (*models.User)(nil), 35 (*models.User)(nil),
36 (*models.UserBase)(nil), 36 (*models.UserBase)(nil),
37 (*models.AccountDestroyRecord)(nil), 37 (*models.AccountDestroyRecord)(nil),
  38 + (*models.UpChain)(nil),
38 } { 39 } {
39 err := DB.Model(model).CreateTable(&orm.CreateTableOptions{ 40 err := DB.Model(model).CreateTable(&orm.CreateTableOptions{
40 Temp: false, 41 Temp: false,
@@ -3,7 +3,7 @@ package models @@ -3,7 +3,7 @@ package models
3 import "time" 3 import "time"
4 4
5 type UpChain struct { 5 type UpChain struct {
6 - tableName string `comment:"上链数据" pg:"up_chain,alias:up_chain"` 6 + tableName string `comment:"上链数据" pg:"business.up_chain"`
7 // 上链数据唯一标识 7 // 上链数据唯一标识
8 UpChainId int64 `comment:"上链数据唯一标识" pg:"pk:up_chain_id"` 8 UpChainId int64 `comment:"上链数据唯一标识" pg:"pk:up_chain_id"`
9 // 数据来源 例如:app.model 9 // 数据来源 例如:app.model
@@ -35,10 +35,10 @@ func (repository *UpChainRepository) Save(upChain *domain.UpChain) (*domain.UpCh @@ -35,10 +35,10 @@ func (repository *UpChainRepository) Save(upChain *domain.UpChain) (*domain.UpCh
35 "up_chain_status", 35 "up_chain_status",
36 "created_at", 36 "created_at",
37 } 37 }
38 - insertFieldsSnippet := sqlbuilder.SqlFieldsSnippet(sqlBuildFields)  
39 - insertPlaceHoldersSnippet := sqlbuilder.SqlPlaceHoldersSnippet(sqlBuildFields) 38 + insertFieldsSnippet := sqlbuilder.SqlFieldsSnippet(sqlbuilder.RemoveSqlFields(sqlBuildFields, "up_chain_id"))
  39 + insertPlaceHoldersSnippet := sqlbuilder.SqlPlaceHoldersSnippet(sqlbuilder.RemoveSqlFields(sqlBuildFields, "up_chain_id"))
40 returningFieldsSnippet := sqlbuilder.SqlFieldsSnippet(sqlBuildFields) 40 returningFieldsSnippet := sqlbuilder.SqlFieldsSnippet(sqlBuildFields)
41 - updateFields := sqlbuilder.RemoveSqlFields(sqlBuildFields, "upChain_id") 41 + updateFields := sqlbuilder.RemoveSqlFields(sqlBuildFields, "up_chain_id")
42 updateFieldsSnippet := sqlbuilder.SqlUpdateFieldsSnippet(updateFields) 42 updateFieldsSnippet := sqlbuilder.SqlUpdateFieldsSnippet(updateFields)
43 tx := repository.transactionContext.PgTx 43 tx := repository.transactionContext.PgTx
44 if upChain.Identify() == nil { 44 if upChain.Identify() == nil {
@@ -59,8 +59,8 @@ func (repository *UpChainRepository) Save(upChain *domain.UpChain) (*domain.UpCh @@ -59,8 +59,8 @@ func (repository *UpChainRepository) Save(upChain *domain.UpChain) (*domain.UpCh
59 &upChain.UpChainStatus, 59 &upChain.UpChainStatus,
60 &upChain.CreatedAt, 60 &upChain.CreatedAt,
61 ), 61 ),
62 - fmt.Sprintf("INSERT INTO up_chains (%s) VALUES (%s) RETURNING %s", insertFieldsSnippet, insertPlaceHoldersSnippet, returningFieldsSnippet),  
63 - upChain.UpChainId, 62 + fmt.Sprintf("INSERT INTO business.up_chain (%s) VALUES (%s) RETURNING %s", insertFieldsSnippet, insertPlaceHoldersSnippet, returningFieldsSnippet),
  63 + //upChain.UpChainId,
64 upChain.Source, 64 upChain.Source,
65 upChain.PrimaryId, 65 upChain.PrimaryId,
66 upChain.IssueId, 66 upChain.IssueId,
@@ -83,7 +83,7 @@ func (repository *UpChainRepository) Save(upChain *domain.UpChain) (*domain.UpCh @@ -83,7 +83,7 @@ func (repository *UpChainRepository) Save(upChain *domain.UpChain) (*domain.UpCh
83 &upChain.UpChainStatus, 83 &upChain.UpChainStatus,
84 &upChain.CreatedAt, 84 &upChain.CreatedAt,
85 ), 85 ),
86 - fmt.Sprintf("UPDATE up_chains SET %s WHERE up_chain_id=? RETURNING %s", updateFieldsSnippet, returningFieldsSnippet), 86 + fmt.Sprintf("UPDATE business.up_chain SET %s WHERE up_chain_id=? RETURNING %s", updateFieldsSnippet, returningFieldsSnippet),
87 upChain.Source, 87 upChain.Source,
88 upChain.PrimaryId, 88 upChain.PrimaryId,
89 upChain.IssueId, 89 upChain.IssueId,
  1 +package goqueue
  2 +
  3 +import (
  4 + "fmt"
  5 + "github.com/linmadan/egglib-go/utils/json"
  6 + "github.com/tal-tech/go-queue/kq"
  7 + "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/constant"
  8 + "strings"
  9 + "testing"
  10 + "time"
  11 +)
  12 +
  13 +func Test_UpChain(t *testing.T) {
  14 + pusher := kq.NewPusher(strings.Split(constant.KAFKA_HOST, ","), constant.TOPIC_UP_BLOCK_CHAIN)
  15 + err := pusher.Push(json.MarshalToString(map[string]interface{}{
  16 + "source": "allied-creation.cooperation",
  17 + "primaryId": fmt.Sprintf("%v", time.Now().Unix()),
  18 + "issueId": "key12345",
  19 + "data": "{}",
  20 + }))
  21 + if err != nil {
  22 + t.Fatal(err)
  23 + }
  24 + time.Sleep(time.Second * 5)
  25 +}