作者 yangfu

1. 对接数据 2.本地存储链数据

package command
import (
"fmt"
"reflect"
"strings"
"github.com/beego/beego/v2/core/validation"
)
type UpChainCommand struct {
// 数据来源 例如:app.model
Source string `cname:"数据来源 例如:app.model" json:"source" valid:"Required"`
// 来源数据唯一ID
PrimaryId string `cname:"来源数据唯一ID" json:"primaryId" valid:"Required"`
// 溯源ID 标记同一个系列的数据;例如订单相关事件
IssueId string `cname:"溯源ID 标记同一个系列的数据;例如订单相关事件" json:"issueId"`
// 数据体
Data string `cname:"数据体" json:"data" valid:"Required"`
}
func (upChainCommand *UpChainCommand) Valid(validation *validation.Validation) {
//validation.SetError("CustomValid", "未实现的自定义认证")
}
func (upChainCommand *UpChainCommand) ValidateCommand() error {
valid := validation.Validation{}
b, err := valid.Valid(upChainCommand)
if err != nil {
return err
}
if !b {
elem := reflect.TypeOf(upChainCommand).Elem()
for _, validErr := range valid.Errors {
field, isExist := elem.FieldByName(validErr.Field)
if isExist {
return fmt.Errorf(strings.Replace(validErr.Message, validErr.Field, field.Tag.Get("cname"), -1))
} else {
return fmt.Errorf(validErr.Message)
}
}
}
return nil
}
... ...
package service
import (
"fmt"
"github.com/linmadan/egglib-go/core/application"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/application/blockChain/command"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/application/factory"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/infrastructure/blockchain"
"time"
)
// 区块链服务
type BlockChainService struct {
}
// 数据上链
func (blockChainService *BlockChainService) UpChain(upChainCommand *command.UpChainCommand) (interface{}, error) {
if err := upChainCommand.ValidateCommand(); err != nil {
return nil, application.ThrowError(application.ARG_ERROR, err.Error())
}
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
if err := transactionContext.StartTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
defer func() {
transactionContext.RollbackTransaction()
}()
upChain := &domain.UpChain{
Source: upChainCommand.Source,
PrimaryId: upChainCommand.PrimaryId,
IssueId: upChainCommand.IssueId,
Data: upChainCommand.Data,
UpChainStatus: 2,
CreatedAt: time.Now(),
}
// 1. 查重
upChainRepository, _, _ := factory.FastPgUpChain(transactionContext, 0)
if item, err := upChainRepository.FindOne(map[string]interface{}{"source": upChain.Source, "primaryId": upChain.PrimaryId}); err == nil && item != nil {
return nil, fmt.Errorf("duplicate message %v %v", upChain.Source, upChain.PrimaryId)
}
// 2.上链
bc := &blockchain.BSNBlockChain{
PublicPem: []byte(""),
Host: "",
PublicKey: "",
}
options := blockchain.NewUpToChainOptions(upChain.Source, upChain.PrimaryId, upChain.Data).WithInnerPrimaryIssueId(upChain.IssueId)
upToChainResponse, err := bc.UpToChain(options)
if err != nil || upToChainResponse == nil {
upChain.UpFail()
} else {
upChain.UpSuccess(string(*upToChainResponse))
}
// 3.保存记录
if upChain, err = upChainRepository.Save(upChain); err != nil {
return nil, err
}
if err := transactionContext.CommitTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
return upChain, nil
}
func NewBlockChainService(options map[string]interface{}) *BlockChainService {
newBlockChainService := &BlockChainService{}
return newBlockChainService
}
... ...
... ... @@ -169,6 +169,32 @@ func FastPgMenu(transactionContext application.TransactionContext, menuId int64)
return rep, mod, err
}
// FastPgUpChain 快速返回区块链
//
// transactionContext 事务
// upChain upChainId
func FastPgUpChain(transactionContext application.TransactionContext, upChainId int64) (domain.UpChainRepository, *domain.UpChain, error) {
var rep domain.UpChainRepository
var mod *domain.UpChain
var err error
if value, err := CreateUpChainRepository(map[string]interface{}{
"transactionContext": transactionContext,
}); err != nil {
return nil, nil, application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
} else {
rep = value
}
if upChainId > 0 {
if mod, err = rep.FindOne(map[string]interface{}{"upChainId": upChainId}); err != nil {
if err == domain.ErrorNotFound {
return nil, nil, application.ThrowError(application.RES_NO_FIND_ERROR, "该组织不存在")
}
return nil, nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
}
return rep, mod, err
}
// FastPgCustomizeMenu 快速返回领域自定义菜单
//
// transactionContext 事务
... ...
... ... @@ -69,3 +69,11 @@ func CreateAccountDestroyRecordRepository(options map[string]interface{}) (domai
}
return repository.NewAccountDestroyRecordRepository(transactionContext)
}
func CreateUpChainRepository(options map[string]interface{}) (domain.UpChainRepository, error) {
var transactionContext *pg.TransactionContext
if value, ok := options["transactionContext"]; ok {
transactionContext = value.(*pg.TransactionContext)
}
return repository.NewUpChainRepository(transactionContext)
}
... ...
package domain
import "time"
// 上链数据
type UpChain struct {
// 上链数据唯一标识
UpChainId int64 `json:"upChainId,string"`
// 数据来源 例如:app.model
Source string `json:"source"`
// 来源数据唯一ID
PrimaryId string `json:"primaryId"`
// 溯源ID 标记同一个系列的数据;例如订单相关事件
IssueId string `json:"issueId"`
// 数据体
Data string `json:"data"`
// 数据块hash
Hash string `json:"hash"`
// 上链状态 1:成功 2:失败
UpChainStatus int `json:"upChainStatus"`
// 创建时间
CreatedAt time.Time `json:"createdAt"`
}
type UpChainRepository interface {
Save(upChain *UpChain) (*UpChain, error)
Remove(upChain *UpChain) (*UpChain, error)
FindOne(queryOptions map[string]interface{}) (*UpChain, error)
Find(queryOptions map[string]interface{}) (int64, []*UpChain, error)
}
func (upChain *UpChain) Identify() interface{} {
if upChain.UpChainId == 0 {
return nil
}
return upChain.UpChainId
}
func (upChain *UpChain) Update(data map[string]interface{}) error {
//if source, ok := data["source"]; ok {
// upChain.Source = source.(string)
//}
//if primaryId, ok := data["primaryId"]; ok {
// upChain.PrimaryId = primaryId.(string)
//}
//if issueId, ok := data["issueId"]; ok {
// upChain.IssueId = issueId.(string)
//}
//if data, ok := data["data"]; ok {
// upChain.Data = data.(string)
//}
if hash, ok := data["hash"]; ok {
upChain.Hash = hash.(string)
}
if upChainStatus, ok := data["upChainStatus"]; ok {
upChain.UpChainStatus = upChainStatus.(int)
}
if createdAt, ok := data["createdAt"]; ok {
upChain.CreatedAt = createdAt.(time.Time)
}
return nil
}
func (upChain *UpChain) UpSuccess(hashData string) {
if len(upChain.Hash) > 0 {
return
}
upChain.Hash = hashData
upChain.UpChainStatus = 1
}
func (upChain *UpChain) UpFail() {
if len(upChain.Hash) > 0 {
return
}
upChain.Hash = ""
upChain.UpChainStatus = 2
}
... ...
... ... @@ -8,7 +8,7 @@ import (
"github.com/beego/beego/v2/client/httplib"
"github.com/linmadan/egglib-go/utils/json"
"net/http"
"net/url"
"net/http/httputil"
"sort"
"time"
)
... ... @@ -16,7 +16,7 @@ import (
type (
BSNBlockChain struct {
PublicPem []byte
privatePem []byte
PrivatePem []byte
PublicKey string
Host string
}
... ... @@ -50,6 +50,8 @@ type (
InnerPrimaryKey string `json:"innerPrimaryKey,omitempty"`
// type为3时必填
Value string `json:"value,omitempty"`
// 当type=1或者2必填,为false只显示密文,为true溯源才会显示原文
ShowValue bool `json:"showValue"`
}
GetTokenResponse struct {
Token string `json:"token"`
... ... @@ -68,17 +70,22 @@ func (c *BSNBlockChain) UpToChain(options *UpToChainOptions) (*UpToChainResponse
if err != nil {
return nil, err
}
data, _ := httputil.DumpRequest(req.GetRequest(), true)
fmt.Println(string(data))
var upToChainResponse UpToChainResponse
_, err = c.HandlerResponse(req, &upToChainResponse)
return &upToChainResponse, err
}
// 浏览器溯源验真申请
func (c *BSNBlockChain) GetToken(options *GetTokenRequest) (*GetTokenResponse, error) {
req, err := c.MakeRequest(options, "/getToken", "getToken", http.MethodPost)
req, err := c.MakeRequest(options, "/chainApi/getToken", "getToken", http.MethodPost)
if err != nil {
return nil, err
}
data, _ := httputil.DumpRequest(req.GetRequest(), true)
fmt.Println(string(data))
var getTokenResponse = GetTokenResponse{}
_, err = c.HandlerResponse(req, &getTokenResponse)
return &getTokenResponse, err
... ... @@ -100,7 +107,8 @@ func (c *BSNBlockChain) Signature(body map[string]interface{}, method string) (s
}
encryptString.WriteString(fmt.Sprintf("method=%v", method))
encryptData, err := RsaEncrypt(c.PublicPem, encryptString.Bytes())
// 此处用私钥签名
encryptData, err := RsaSign(c.PrivatePem, encryptString.Bytes())
if err != nil {
return "", err
}
... ... @@ -116,8 +124,8 @@ func (c *BSNBlockChain) MakeRequest(obj interface{}, action string, signAction,
return nil, err
}
req := httplib.NewBeegoRequest(c.Host+action, httpMethod)
req.Header("pubKey", url.QueryEscape(string(c.PublicKey)))
req.Header("signature", url.QueryEscape(secret))
req.Header("pubKey", c.PublicKey) //url.QueryEscape(string(c.PublicKey))
req.Header("signature", secret) //url.QueryEscape(secret)
req.SetTimeout(time.Second*5, time.Second*5)
if httpMethod == http.MethodPost || httpMethod == http.MethodPut {
req.JSONBody(obj)
... ... @@ -127,6 +135,7 @@ func (c *BSNBlockChain) MakeRequest(obj interface{}, action string, signAction,
func (c *BSNBlockChain) HandlerResponse(req *httplib.BeegoHTTPRequest, value interface{}) (*Response, error) {
response := &Response{}
//req.DumpBody(true)
data, err := req.Bytes()
if err != nil {
return nil, err
... ... @@ -142,6 +151,9 @@ func (c *BSNBlockChain) HandlerResponse(req *httplib.BeegoHTTPRequest, value int
if err != nil {
return nil, err
}
if response.Code != 0 {
return nil, fmt.Errorf("upchain code:%v msg:%v", response.Code, response.Message)
}
json.Unmarshal(response.Data, value)
return response, nil
}
... ...
... ... @@ -14,6 +14,24 @@ import (
"testing"
)
//var priK = []byte(`-----BEGIN RSA PRIVATE KEY-----
//MIIBVAIBADANBgkqhkiG9w0BAQEFAASCAT4wggE6AgEAAkEA2H6x0D1mg5QbXfU7
//MZKltypRj+eZktPKIApyEqRsyLqe3sRSd1Eh+VqKlQ9QFI8dae3t0USWlVmyfIDM
//0ly85QIDAQABAkAPnKNJ9wOLfYSzs9l+66pTmROkovjqI6exw88SFRVbLCgM8maa
//GOWEP/nhZDlQYBKHUqG0/KsLkeyLGkE8N7JBAiEA8lM3npA3q+Kmhy+lmQbfHFPQ
//31OSkA+RaW/LPn0lP50CIQDktlF3iDk5kxnzgT/3lvvKhHInUh+pH5F19C6MymMD
//6QIgLxDct655MahnAdDOUCeWhBD/e7DmwZZUfu8Ywb1a070CIArsUjO9Q85mIiUp
//FR8EDP59GN6b43s2UMIraVW8DMKRAiEAnnMPbDsD2HsQbgmNNEqETUxYGVyO+p7w
//OZZReuOyvCM=
//-----END RSA PRIVATE KEY-----`)
//var pubPem = `-----BEGIN PUBLIC KEY-----
//MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANh+sdA9ZoOUG131OzGSpbcqUY/nmZLT
//yiAKchKkbMi6nt7EUndRIflaipUPUBSPHWnt7dFElpVZsnyAzNJcvOUCAwEAAQ==
//-----END PUBLIC KEY-----`
//var pubKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANh+sdA9ZoOUG131OzGSpbcqUY/nmZLT\nyiAKchKkbMi6nt7EUndRIflaipUPUBSPHWnt7dFElpVZsnyAzNJcvOUCAwEAAQ=="
//
//var host = "http://allied-creation-gateway-dev.fjmaimaimai.com"
var priK = []byte(`-----BEGIN RSA PRIVATE KEY-----
MIIBVAIBADANBgkqhkiG9w0BAQEFAASCAT4wggE6AgEAAkEA2H6x0D1mg5QbXfU7
MZKltypRj+eZktPKIApyEqRsyLqe3sRSd1Eh+VqKlQ9QFI8dae3t0USWlVmyfIDM
... ... @@ -28,15 +46,15 @@ var pubPem = `-----BEGIN PUBLIC KEY-----
MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANh+sdA9ZoOUG131OzGSpbcqUY/nmZLT
yiAKchKkbMi6nt7EUndRIflaipUPUBSPHWnt7dFElpVZsnyAzNJcvOUCAwEAAQ==
-----END PUBLIC KEY-----`
var pubKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANh+sdA9ZoOUG131OzGSpbcqUY/nmZLT\nyiAKchKkbMi6nt7EUndRIflaipUPUBSPHWnt7dFElpVZsnyAzNJcvOUCAwEAAQ=="
var pubKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANh+sdA9ZoOUG131OzGSpbcqUY/nmZLTyiAKchKkbMi6nt7EUndRIflaipUPUBSPHWnt7dFElpVZsnyAzNJcvOUCAwEAAQ=="
var host = "http://allied-creation-gateway-dev.fjmaimaimai.com"
var host = "http://101.34.29.149:9092/test"
func TestSignature(t *testing.T) {
options := NewUpToChainOptions("table", "1", "149848948").WithDesc("desc")
//options := NewUpToChainOptions("table", "", "").WithDesc("")
bsn := &BSNBlockChain{
privatePem: priK,
PrivatePem: priK,
PublicPem: []byte(pubPem),
}
bInfo := &UpToChainRequest{}
... ... @@ -108,12 +126,14 @@ func TestBSNBlockChain_UpToChain(t *testing.T) {
PublicPem: []byte(pubPem),
Host: host,
PublicKey: pubKey,
PrivatePem: priK,
}
options := NewUpToChainOptions("table", "1", "149848948").WithDesc("desc")
_, err := bc.UpToChain(options)
options := NewUpToChainOptions("table", "2", "149848948").WithDesc("desc")
rsp, err := bc.UpToChain(options)
if err != nil {
t.Fatal(err)
}
t.Log(string(*rsp))
}
func TestBSNBlockChain_GetToken(t *testing.T) {
... ... @@ -121,13 +141,16 @@ func TestBSNBlockChain_GetToken(t *testing.T) {
PublicPem: []byte(pubPem),
Host: host,
PublicKey: pubKey,
PrivatePem: priK,
}
options := &GetTokenRequest{
Type: 1,
TsTxId: "",
TsTxId: "54df75d3bead65d144a1123d1f18af8bb4db65420c5c449631e9a93b81fcdb93",
ShowValue: true,
}
_, err := bc.GetToken(options)
token, err := bc.GetToken(options)
if err != nil {
t.Fatal(err)
}
t.Log(token.Token)
}
... ...
... ... @@ -2,17 +2,17 @@ package blockchain
type UpToChainOptions struct {
// 上链数据的数据库、数据表等的标识值 (非必填)
InnerDBTable string `json:"innerDBTable"`
InnerDBTable string `json:"innerDBTable,omitempty"`
// 上链数据的唯一标识主键值 (非必填)
InnerPrimaryKey string `json:"innerPrimaryKey"`
InnerPrimaryKey string `json:"innerPrimaryKey,omitempty"`
// 上链记录的一个标记值(IssueId), 数据溯源出所有相关事件内容,例如快递单号,过滤出该快递的所有相关事件内容并用于展示 (非必填)
InnerPrimaryIssueId string `json:"innerPrimaryIssueId"`
InnerPrimaryIssueId string `json:"innerPrimaryIssueId,omitempty"`
// 作用与key1相同 (非必填)
InnerSecondIssueId string `json:"innerSecondIssueId"`
InnerSecondIssueId string `json:"innerSecondIssueId,omitempty"`
// 数据原文 (必填)
Value string `json:"value"`
// 数据描述: 对value的描述,无论needHash为何值,本字段均会原文存储到链上
Desc string `json:"desc"`
Desc string `json:"desc,omitempty"`
// 是否哈希: true: 需要哈希,会将value进行hash上链,false:不需要哈希,明文上链,链上所有用户都可看到明文,默认false
NeedHash bool `json:"needHash"`
}
... ...
package blockchain
import (
"crypto"
"crypto/md5"
"crypto/rand"
"crypto/rsa"
... ... @@ -11,8 +12,29 @@ import (
"fmt"
)
// 加密
func RsaEncrypt(publicKey []byte, origData []byte) ([]byte, error) {
// rsa签名
func RsaSign(publicKey []byte, origData []byte) ([]byte, error) {
block, _ := pem.Decode(publicKey)
if block == nil {
return nil, errors.New("public key error")
}
pubInterface, err := x509.ParsePKCS8PrivateKey(block.Bytes)
if err != nil {
return nil, err
}
fmt.Println(string(origData))
// md5
hash := md5.New()
hash.Write([]byte(origData))
pub := pubInterface.(*rsa.PrivateKey)
fmt.Println(hash.Sum(nil))
return rsa.SignPKCS1v15(rand.Reader, pub, crypto.MD5, hash.Sum(nil))
//pub := pubInterface.(*rsa.PublicKey)
//return rsa.EncryptPKCS1v15(rand.Reader, pub, origData)
}
func RsaEncryptBak(publicKey []byte, origData []byte) ([]byte, error) {
block, _ := pem.Decode(publicKey)
if block == nil {
return nil, errors.New("public key error")
... ... @@ -22,6 +44,7 @@ func RsaEncrypt(publicKey []byte, origData []byte) ([]byte, error) {
return nil, err
}
fmt.Println(string(origData))
// md5
hash := md5.New()
hash.Write([]byte(origData))
... ...
package models
import "time"
type UpChain struct {
tableName string `comment:"上链数据" pg:"up_chain,alias:up_chain"`
// 上链数据唯一标识
UpChainId int64 `comment:"上链数据唯一标识" pg:"pk:up_chain_id"`
// 数据来源 例如:app.model
Source string `comment:"数据来源 例如:app.model"`
// 来源数据唯一ID
PrimaryId string `comment:"来源数据唯一ID"`
// 溯源ID 标记同一个系列的数据;例如订单相关事件
IssueId string `comment:"溯源ID 标记同一个系列的数据;例如订单相关事件"`
// 数据体
Data string `comment:"数据体"`
// 数据块hash
Hash string `comment:"数据块hash"`
// 上链状态 1:成功 2:失败
UpChainStatus int `comment:"上链状态 1:成功 2:失败"`
// 创建时间
CreatedAt time.Time `comment:"创建时间"`
}
... ...
package transform
import (
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/infrastructure/pg/models"
)
func TransformToUpChainDomainModelFromPgModels(upChainModel *models.UpChain) (*domain.UpChain, error) {
return &domain.UpChain{
UpChainId: upChainModel.UpChainId,
Source: upChainModel.Source,
PrimaryId: upChainModel.PrimaryId,
IssueId: upChainModel.IssueId,
Data: upChainModel.Data,
Hash: upChainModel.Hash,
UpChainStatus: upChainModel.UpChainStatus,
CreatedAt: upChainModel.CreatedAt,
}, nil
}
... ...
package repository
import (
"fmt"
"github.com/go-pg/pg/v10"
"github.com/linmadan/egglib-go/persistent/pg/sqlbuilder"
pgTransaction "github.com/linmadan/egglib-go/transaction/pg"
"github.com/linmadan/egglib-go/utils/snowflake"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/infrastructure/pg/models"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/infrastructure/pg/transform"
)
type UpChainRepository struct {
transactionContext *pgTransaction.TransactionContext
}
func (repository *UpChainRepository) nextIdentify() (int64, error) {
IdWorker, err := snowflake.NewIdWorker(1)
if err != nil {
return 0, err
}
id, err := IdWorker.NextId()
return id, err
}
func (repository *UpChainRepository) Save(upChain *domain.UpChain) (*domain.UpChain, error) {
sqlBuildFields := []string{
"up_chain_id",
"source",
"primary_id",
"issue_id",
"data",
"hash",
"up_chain_status",
"created_at",
}
insertFieldsSnippet := sqlbuilder.SqlFieldsSnippet(sqlBuildFields)
insertPlaceHoldersSnippet := sqlbuilder.SqlPlaceHoldersSnippet(sqlBuildFields)
returningFieldsSnippet := sqlbuilder.SqlFieldsSnippet(sqlBuildFields)
updateFields := sqlbuilder.RemoveSqlFields(sqlBuildFields, "upChain_id")
updateFieldsSnippet := sqlbuilder.SqlUpdateFieldsSnippet(updateFields)
tx := repository.transactionContext.PgTx
if upChain.Identify() == nil {
upChainId, err := repository.nextIdentify()
if err != nil {
return upChain, err
} else {
upChain.UpChainId = upChainId
}
if _, err := tx.QueryOne(
pg.Scan(
&upChain.UpChainId,
&upChain.Source,
&upChain.PrimaryId,
&upChain.IssueId,
&upChain.Data,
&upChain.Hash,
&upChain.UpChainStatus,
&upChain.CreatedAt,
),
fmt.Sprintf("INSERT INTO up_chains (%s) VALUES (%s) RETURNING %s", insertFieldsSnippet, insertPlaceHoldersSnippet, returningFieldsSnippet),
upChain.UpChainId,
upChain.Source,
upChain.PrimaryId,
upChain.IssueId,
upChain.Data,
upChain.Hash,
upChain.UpChainStatus,
upChain.CreatedAt,
); err != nil {
return upChain, err
}
} else {
if _, err := tx.QueryOne(
pg.Scan(
&upChain.UpChainId,
&upChain.Source,
&upChain.PrimaryId,
&upChain.IssueId,
&upChain.Data,
&upChain.Hash,
&upChain.UpChainStatus,
&upChain.CreatedAt,
),
fmt.Sprintf("UPDATE up_chains SET %s WHERE up_chain_id=? RETURNING %s", updateFieldsSnippet, returningFieldsSnippet),
upChain.Source,
upChain.PrimaryId,
upChain.IssueId,
upChain.Data,
upChain.Hash,
upChain.UpChainStatus,
upChain.CreatedAt,
upChain.Identify(),
); err != nil {
return upChain, err
}
}
return upChain, nil
}
func (repository *UpChainRepository) Remove(upChain *domain.UpChain) (*domain.UpChain, error) {
tx := repository.transactionContext.PgTx
upChainModel := new(models.UpChain)
upChainModel.UpChainId = upChain.Identify().(int64)
if _, err := tx.Model(upChainModel).WherePK().Delete(); err != nil {
return upChain, err
}
return upChain, nil
}
func (repository *UpChainRepository) FindOne(queryOptions map[string]interface{}) (*domain.UpChain, error) {
tx := repository.transactionContext.PgTx
upChainModel := new(models.UpChain)
query := sqlbuilder.BuildQuery(tx.Model(upChainModel), queryOptions)
query.SetWhereByQueryOption("up_chain.up_chain_id = ?", "upChainId")
query.SetWhereByQueryOption("up_chain.source = ?", "source")
query.SetWhereByQueryOption("up_chain.primary_id = ?", "primaryId")
if err := query.First(); err != nil {
if err.Error() == "pg: no rows in result set" {
return nil, fmt.Errorf("没有此资源")
} else {
return nil, err
}
}
if upChainModel.UpChainId == 0 {
return nil, nil
} else {
return transform.TransformToUpChainDomainModelFromPgModels(upChainModel)
}
}
func (repository *UpChainRepository) Find(queryOptions map[string]interface{}) (int64, []*domain.UpChain, error) {
tx := repository.transactionContext.PgTx
var upChainModels []*models.UpChain
upChains := make([]*domain.UpChain, 0)
query := sqlbuilder.BuildQuery(tx.Model(&upChainModels), queryOptions)
query.SetOffsetAndLimit(20)
query.SetOrderDirect("up_chain_id", "DESC")
if count, err := query.SelectAndCount(); err != nil {
return 0, upChains, err
} else {
for _, upChainModel := range upChainModels {
if upChain, err := transform.TransformToUpChainDomainModelFromPgModels(upChainModel); err != nil {
return 0, upChains, err
} else {
upChains = append(upChains, upChain)
}
}
return int64(count), upChains, nil
}
}
func NewUpChainRepository(transactionContext *pgTransaction.TransactionContext) (*UpChainRepository, error) {
if transactionContext == nil {
return nil, fmt.Errorf("transactionContext参数不能为nil")
} else {
return &UpChainRepository{
transactionContext: transactionContext,
}, nil
}
}
... ...
... ... @@ -2,10 +2,20 @@ package goqueue
import (
"fmt"
"github.com/linmadan/egglib-go/utils/json"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/application/blockChain/command"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/application/blockChain/service"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/log"
)
func UpToChainHandler(k, v string) error {
log.Logger.Debug(fmt.Sprintf("%s", v), map[string]interface{}{"handler": "UptoChain"})
return nil
blockChainService := service.NewBlockChainService(nil)
upChainCommand := &command.UpChainCommand{}
err := json.UnmarshalFromString(v, upChainCommand)
if err != nil {
return err
}
_, err = blockChainService.UpChain(upChainCommand)
return err
}
... ...