作者 yangfu

消息订阅

@@ -20,6 +20,8 @@ require ( @@ -20,6 +20,8 @@ require (
20 github.com/sergi/go-diff v1.2.0 // indirect 20 github.com/sergi/go-diff v1.2.0 // indirect
21 github.com/smartystreets/goconvey v1.6.4 // indirect 21 github.com/smartystreets/goconvey v1.6.4 // indirect
22 github.com/stretchr/testify v1.7.0 22 github.com/stretchr/testify v1.7.0
  23 + github.com/tal-tech/go-queue v1.0.5
  24 + github.com/tal-tech/go-zero v1.0.27
23 github.com/valyala/fasthttp v1.23.0 // indirect 25 github.com/valyala/fasthttp v1.23.0 // indirect
24 github.com/xeipuuv/gojsonschema v1.2.0 // indirect 26 github.com/xeipuuv/gojsonschema v1.2.0 // indirect
25 github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0 // indirect 27 github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0 // indirect
@@ -21,6 +21,7 @@ github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+Dx @@ -21,6 +21,7 @@ github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+Dx
21 github.com/andybalholm/brotli v1.0.1 h1:KqhlKozYbRtJvsPrrEeXcO+N2l6NYT5A2QAFmSULpEc= 21 github.com/andybalholm/brotli v1.0.1 h1:KqhlKozYbRtJvsPrrEeXcO+N2l6NYT5A2QAFmSULpEc=
22 github.com/andybalholm/brotli v1.0.1/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= 22 github.com/andybalholm/brotli v1.0.1/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
23 github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q= 23 github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q=
  24 +github.com/beanstalkd/go-beanstalk v0.1.0/go.mod h1:/G8YTyChOtpOArwLTQPY1CHB+i212+av35bkPXXj56Y=
24 github.com/beego/beego/v2 v2.0.1 h1:07a7Z0Ok5vbqyqh+q53sDPl9LdhKh0ZDy3gbyGrhFnE= 25 github.com/beego/beego/v2 v2.0.1 h1:07a7Z0Ok5vbqyqh+q53sDPl9LdhKh0ZDy3gbyGrhFnE=
25 github.com/beego/beego/v2 v2.0.1/go.mod h1:8zyHi1FnWO1mZLwTn62aKRIZF/aIKvkCBB2JYs+eqQI= 26 github.com/beego/beego/v2 v2.0.1/go.mod h1:8zyHi1FnWO1mZLwTn62aKRIZF/aIKvkCBB2JYs+eqQI=
26 github.com/beego/goyaml2 v0.0.0-20130207012346-5545475820dd/go.mod h1:1b+Y/CofkYwXMUU0OhQqGvsY2Bvgr4j6jfT699wyZKQ= 27 github.com/beego/goyaml2 v0.0.0-20130207012346-5545475820dd/go.mod h1:1b+Y/CofkYwXMUU0OhQqGvsY2Bvgr4j6jfT699wyZKQ=
@@ -179,6 +180,7 @@ github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b @@ -179,6 +180,7 @@ github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b
179 github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= 180 github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
180 github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= 181 github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
181 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= 182 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
  183 +github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334/go.mod h1:SK73tn/9oHe+/Y0h39VT4UCxmurVJkR5NA7kMEAOgSE=
182 github.com/iancoleman/strcase v0.1.2/go.mod h1:SK73tn/9oHe+/Y0h39VT4UCxmurVJkR5NA7kMEAOgSE= 184 github.com/iancoleman/strcase v0.1.2/go.mod h1:SK73tn/9oHe+/Y0h39VT4UCxmurVJkR5NA7kMEAOgSE=
183 github.com/imkira/go-interpol v1.1.0 h1:KIiKr0VSG2CUW1hl1jpiyuzuJeKUUpC8iM1AIE7N1Vk= 185 github.com/imkira/go-interpol v1.1.0 h1:KIiKr0VSG2CUW1hl1jpiyuzuJeKUUpC8iM1AIE7N1Vk=
184 github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA= 186 github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA=
@@ -204,6 +206,7 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW @@ -204,6 +206,7 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW
204 github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= 206 github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
205 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= 207 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
206 github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= 208 github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
  209 +github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
207 github.com/klauspost/compress v1.11.8 h1:difgzQsp5mdAz9v8lm3P/I+EpDKMU/6uTMw1y1FObuo= 210 github.com/klauspost/compress v1.11.8 h1:difgzQsp5mdAz9v8lm3P/I+EpDKMU/6uTMw1y1FObuo=
208 github.com/klauspost/compress v1.11.8/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= 211 github.com/klauspost/compress v1.11.8/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
209 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= 212 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@@ -312,6 +315,8 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L @@ -312,6 +315,8 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L
312 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= 315 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
313 github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= 316 github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
314 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= 317 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
  318 +github.com/segmentio/kafka-go v0.4.2 h1:QXZ6q9Bu1JkAJQ/CQBb2Av8pFRG8LQ0kWCrLXgQyL8c=
  319 +github.com/segmentio/kafka-go v0.4.2/go.mod h1:Inh7PqOsxmfgasV8InZYKVXWsdjcCq2d9tFV75GLbuM=
315 github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ= 320 github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
316 github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= 321 github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
317 github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644 h1:X+yvsM2yrEktyI+b2qND5gpH8YhURn0k8OCaeRnkINo= 322 github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644 h1:X+yvsM2yrEktyI+b2qND5gpH8YhURn0k8OCaeRnkINo=
@@ -347,6 +352,9 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc @@ -347,6 +352,9 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc
347 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= 352 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
348 github.com/syndtr/goleveldb v0.0.0-20160425020131-cfa635847112/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0= 353 github.com/syndtr/goleveldb v0.0.0-20160425020131-cfa635847112/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
349 github.com/syndtr/goleveldb v0.0.0-20181127023241-353a9fca669c/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0= 354 github.com/syndtr/goleveldb v0.0.0-20181127023241-353a9fca669c/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
  355 +github.com/tal-tech/go-queue v1.0.5 h1:cd2o0lPjAFJKIXuEbQvsGypUhzz6FLib4FVVAyxsMtY=
  356 +github.com/tal-tech/go-queue v1.0.5/go.mod h1:gQK4Eg8pqel8Z9r1hjlSXbJFavLeJQVyTSwBKeAnpm8=
  357 +github.com/tal-tech/go-zero v1.0.21/go.mod h1:llP5PQjnATfnzZo/lo5unjR41njzoL3lkGO/KXbnisw=
350 github.com/tal-tech/go-zero v1.0.27 h1:QMIbaTxibMc/OsO5RTAuKZ8ndbl2dGN6pITQEtp2x/A= 358 github.com/tal-tech/go-zero v1.0.27 h1:QMIbaTxibMc/OsO5RTAuKZ8ndbl2dGN6pITQEtp2x/A=
351 github.com/tal-tech/go-zero v1.0.27/go.mod h1:JtNXlsh/CgeIHyQnt5C5M2IcSevW7V0NAnqO93TQgm8= 359 github.com/tal-tech/go-zero v1.0.27/go.mod h1:JtNXlsh/CgeIHyQnt5C5M2IcSevW7V0NAnqO93TQgm8=
352 github.com/tiptok/egglib-go v0.0.0-20210608073225-c852ce95ae34 h1:9iDNyYbfpv5KLWDLpDywD/aIODg+PNnwn+v9on7KGlE= 360 github.com/tiptok/egglib-go v0.0.0-20210608073225-c852ce95ae34 h1:9iDNyYbfpv5KLWDLpDywD/aIODg+PNnwn+v9on7KGlE=
@@ -357,6 +365,7 @@ github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc h1:9lRDQMhESg+zvGYm @@ -357,6 +365,7 @@ github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc h1:9lRDQMhESg+zvGYm
357 github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc/go.mod h1:bciPuU6GHm1iF1pBvUfxfsH0Wmnc2VbpgvbI9ZWuIRs= 365 github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc/go.mod h1:bciPuU6GHm1iF1pBvUfxfsH0Wmnc2VbpgvbI9ZWuIRs=
358 github.com/ugorji/go v0.0.0-20171122102828-84cb69a8af83/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ= 366 github.com/ugorji/go v0.0.0-20171122102828-84cb69a8af83/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ=
359 github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= 367 github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
  368 +github.com/urfave/cli v1.22.4/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
360 github.com/urfave/cli v1.22.5/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= 369 github.com/urfave/cli v1.22.5/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
361 github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= 370 github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
362 github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= 371 github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
@@ -373,7 +382,9 @@ github.com/vmihailenco/tagparser v0.1.2/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgq @@ -373,7 +382,9 @@ github.com/vmihailenco/tagparser v0.1.2/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgq
373 github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= 382 github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
374 github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= 383 github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
375 github.com/wendal/errors v0.0.0-20130201093226-f66c77a7882b/go.mod h1:Q12BUT7DqIlHRmgv3RskH+UCM/4eqVMgI0EMmlSpAXc= 384 github.com/wendal/errors v0.0.0-20130201093226-f66c77a7882b/go.mod h1:Q12BUT7DqIlHRmgv3RskH+UCM/4eqVMgI0EMmlSpAXc=
  385 +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
376 github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= 386 github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
  387 +github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
377 github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= 388 github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
378 github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f h1:J9EGpcZtP0E/raorCMxlFGSTBrsSlaDGf3jU/qvAE2c= 389 github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f h1:J9EGpcZtP0E/raorCMxlFGSTBrsSlaDGf3jU/qvAE2c=
379 github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= 390 github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
@@ -423,6 +434,7 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf @@ -423,6 +434,7 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf
423 golang.org/x/crypto v0.0.0-20180910181607-0e37d006457b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= 434 golang.org/x/crypto v0.0.0-20180910181607-0e37d006457b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
424 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= 435 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
425 golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= 436 golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
  437 +golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
426 golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= 438 golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
427 golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= 439 golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
428 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= 440 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
@@ -11,6 +11,7 @@ import ( @@ -11,6 +11,7 @@ import (
11 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/log" 11 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/log"
12 _ "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/log" 12 _ "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/log"
13 _ "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/port/beego" 13 _ "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/port/beego"
  14 + "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/port/goqueue"
14 ) 15 )
15 16
16 func main() { 17 func main() {
@@ -28,6 +29,8 @@ func main() { @@ -28,6 +29,8 @@ func main() {
28 }) 29 })
29 log.Logger.AddHook(bw) 30 log.Logger.AddHook(bw)
30 31
  32 + goqueue.SetUp()
  33 +
31 log.Logger.Info("server start!") 34 log.Logger.Info("server start!")
32 web.Run() 35 web.Run()
33 } 36 }
@@ -4,9 +4,11 @@ import "os" @@ -4,9 +4,11 @@ import "os"
4 4
5 var ( 5 var (
6 // kafka 地址 6 // kafka 地址
7 - KAFKA_HOST = "192.168.0.250:9092,192.168.0.251:9092,192.168.0.252:9092" 7 + KAFKA_HOST = "192.168.0.250:9092,192.168.0.251:9092,192.168.0.252:9092" //"106.75.231.90:9092"
8 // kafka topic log stash 8 // kafka topic log stash
9 - TOPIC_LOG_STASH = "go_stash_dev" 9 + TOPIC_LOG_STASH = "go_stash_dev" //"pushMessage"
  10 + // kafka topic up_block_chain
  11 + TOPIC_UP_BLOCK_CHAIN = "pushMessage"
10 // 是否启用日志收集 (本地不启用) 12 // 是否启用日志收集 (本地不启用)
11 ENABLE_KAFKA_LOG = false 13 ENABLE_KAFKA_LOG = false
12 ) 14 )
  1 +package blockchain
  2 +
  3 +import (
  4 + "bytes"
  5 + "encoding/base64"
  6 + rawjson "encoding/json"
  7 + "fmt"
  8 + "github.com/beego/beego/v2/client/httplib"
  9 + "github.com/linmadan/egglib-go/utils/json"
  10 + "net/http"
  11 + "net/url"
  12 + "sort"
  13 + "time"
  14 +)
  15 +
  16 +type (
  17 + BSNBlockChain struct {
  18 + PublicPem []byte
  19 + privatePem []byte
  20 + PublicKey string
  21 + Host string
  22 + }
  23 + UpToChainRequest struct {
  24 + // 上链数据的数据库、数据表等的标识值 (非必填)
  25 + InnerDBTable string `json:"innerDBTable,omitempty"`
  26 + // 上链数据的唯一标识主键值 (非必填)
  27 + InnerPrimaryKey string `json:"innerPrimaryKey,omitempty"`
  28 + // 上链记录的一个标记值(IssueId), 数据溯源出所有相关事件内容,例如快递单号,过滤出该快递的所有相关事件内容并用于展示 (非必填)
  29 + InnerPrimaryIssueId string `json:"innerPrimaryIssueId,omitempty"`
  30 + // 作用与key1相同 (非必填)
  31 + InnerSecondIssueId string `json:"innerSecondIssueId,omitempty"`
  32 + // 数据原文 (必填)
  33 + Value string `json:"value,omitempty"`
  34 + // 数据描述: 对value的描述,无论needHash为何值,本字段均会原文存储到链上
  35 + Desc string `json:"desc,omitempty"`
  36 + // 是否哈希: true: 需要哈希,会将value进行hash上链,false:不需要哈希,明文上链,链上所有用户都可看到明文,默认false
  37 + NeedHash bool `json:"needHash"`
  38 + }
  39 + UpToChainResponse string
  40 +
  41 + GetTokenRequest struct {
  42 + // 操作类型:
  43 + //1-交易哈希溯源
  44 + //2-溯源ID溯源
  45 + //3-验真
  46 + Type int `json:"type"`
  47 + // type为1或者3时必填
  48 + TsTxId string `json:"tsTxId,omitempty"`
  49 + // type为2时必填
  50 + InnerPrimaryKey string `json:"innerPrimaryKey,omitempty"`
  51 + // type为3时必填
  52 + Value string `json:"value,omitempty"`
  53 + }
  54 + GetTokenResponse struct {
  55 + Token string `json:"token"`
  56 + }
  57 +
  58 + Response struct {
  59 + Data rawjson.RawMessage `json:"data"`
  60 + Code int `json:"code"`
  61 + Message string `json:"message"`
  62 + }
  63 +)
  64 +
  65 +// 上链
  66 +func (c *BSNBlockChain) UpToChain(options *UpToChainOptions) (*UpToChainResponse, error) {
  67 + req, err := c.MakeRequest(options, "/chainApi/upToChain", "upToChain", http.MethodPost)
  68 + if err != nil {
  69 + return nil, err
  70 + }
  71 + var upToChainResponse UpToChainResponse
  72 + _, err = c.HandlerResponse(req, &upToChainResponse)
  73 + return &upToChainResponse, err
  74 +}
  75 +
  76 +// 浏览器溯源验真申请
  77 +func (c *BSNBlockChain) GetToken(options *GetTokenRequest) (*GetTokenResponse, error) {
  78 + req, err := c.MakeRequest(options, "/getToken", "getToken", http.MethodPost)
  79 + if err != nil {
  80 + return nil, err
  81 + }
  82 + var getTokenResponse = GetTokenResponse{}
  83 + _, err = c.HandlerResponse(req, &getTokenResponse)
  84 + return &getTokenResponse, err
  85 +}
  86 +
  87 +// 签名
  88 +func (c *BSNBlockChain) Signature(body map[string]interface{}, method string) (string, error) {
  89 + var keys []string
  90 + for key, _ := range body {
  91 + keys = append(keys, key)
  92 + }
  93 + sort.Strings(keys)
  94 + encryptString := bytes.NewBuffer(nil)
  95 + for i := range keys {
  96 + key := keys[i]
  97 + if v, ok := body[key]; ok {
  98 + encryptString.WriteString(fmt.Sprintf("%s=%v&", key, v))
  99 + }
  100 + }
  101 + encryptString.WriteString(fmt.Sprintf("method=%v", method))
  102 +
  103 + encryptData, err := RsaEncrypt(c.PublicPem, encryptString.Bytes())
  104 + if err != nil {
  105 + return "", err
  106 + }
  107 +
  108 + return base64.StdEncoding.EncodeToString(encryptData), nil
  109 +}
  110 +
  111 +func (c *BSNBlockChain) MakeRequest(obj interface{}, action string, signAction, httpMethod string) (*httplib.BeegoHTTPRequest, error) {
  112 + var mapBlockInfo = make(map[string]interface{})
  113 + json.UnmarshalFromString(json.MarshalToString(obj), &mapBlockInfo)
  114 + secret, err := c.Signature(mapBlockInfo, signAction)
  115 + if err != nil {
  116 + return nil, err
  117 + }
  118 + req := httplib.NewBeegoRequest(c.Host+action, httpMethod)
  119 + req.Header("pubKey", url.QueryEscape(string(c.PublicKey)))
  120 + req.Header("signature", url.QueryEscape(secret))
  121 + req.SetTimeout(time.Second*5, time.Second*5)
  122 + if httpMethod == http.MethodPost || httpMethod == http.MethodPut {
  123 + req.JSONBody(obj)
  124 + }
  125 + return req, nil
  126 +}
  127 +
  128 +func (c *BSNBlockChain) HandlerResponse(req *httplib.BeegoHTTPRequest, value interface{}) (*Response, error) {
  129 + response := &Response{}
  130 + data, err := req.Bytes()
  131 + if err != nil {
  132 + return nil, err
  133 + }
  134 + rsp, err := req.Response()
  135 + if err != nil {
  136 + return nil, err
  137 + }
  138 + if rsp.StatusCode != http.StatusOK {
  139 + return nil, fmt.Errorf("response code:%v status:%v", rsp.StatusCode, rsp.Status)
  140 + }
  141 + err = json.Unmarshal(data, response)
  142 + if err != nil {
  143 + return nil, err
  144 + }
  145 + json.Unmarshal(response.Data, value)
  146 + return response, nil
  147 +}
  148 +
  149 +func (b *UpToChainRequest) Complete(options *UpToChainOptions) {
  150 + b.InnerDBTable = options.InnerDBTable
  151 + b.InnerPrimaryKey = options.InnerPrimaryKey
  152 + b.InnerPrimaryIssueId = options.InnerPrimaryIssueId
  153 + b.InnerSecondIssueId = options.InnerSecondIssueId
  154 + b.Value = options.Value
  155 + b.Desc = options.Desc
  156 + b.NeedHash = options.NeedHash
  157 +}
  1 +package blockchain
  2 +
  3 +import (
  4 + "bytes"
  5 + "crypto/rand"
  6 + "crypto/rsa"
  7 + "crypto/x509"
  8 + "encoding/pem"
  9 + "fmt"
  10 + "github.com/linmadan/egglib-go/utils/json"
  11 + "github.com/stretchr/testify/assert"
  12 + "log"
  13 + "os"
  14 + "testing"
  15 +)
  16 +
  17 +var priK = []byte(`-----BEGIN RSA PRIVATE KEY-----
  18 +MIIBVAIBADANBgkqhkiG9w0BAQEFAASCAT4wggE6AgEAAkEA2H6x0D1mg5QbXfU7
  19 +MZKltypRj+eZktPKIApyEqRsyLqe3sRSd1Eh+VqKlQ9QFI8dae3t0USWlVmyfIDM
  20 +0ly85QIDAQABAkAPnKNJ9wOLfYSzs9l+66pTmROkovjqI6exw88SFRVbLCgM8maa
  21 +GOWEP/nhZDlQYBKHUqG0/KsLkeyLGkE8N7JBAiEA8lM3npA3q+Kmhy+lmQbfHFPQ
  22 +31OSkA+RaW/LPn0lP50CIQDktlF3iDk5kxnzgT/3lvvKhHInUh+pH5F19C6MymMD
  23 +6QIgLxDct655MahnAdDOUCeWhBD/e7DmwZZUfu8Ywb1a070CIArsUjO9Q85mIiUp
  24 +FR8EDP59GN6b43s2UMIraVW8DMKRAiEAnnMPbDsD2HsQbgmNNEqETUxYGVyO+p7w
  25 +OZZReuOyvCM=
  26 +-----END RSA PRIVATE KEY-----`)
  27 +var pubPem = `-----BEGIN PUBLIC KEY-----
  28 +MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANh+sdA9ZoOUG131OzGSpbcqUY/nmZLT
  29 +yiAKchKkbMi6nt7EUndRIflaipUPUBSPHWnt7dFElpVZsnyAzNJcvOUCAwEAAQ==
  30 +-----END PUBLIC KEY-----`
  31 +var pubKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBANh+sdA9ZoOUG131OzGSpbcqUY/nmZLT\nyiAKchKkbMi6nt7EUndRIflaipUPUBSPHWnt7dFElpVZsnyAzNJcvOUCAwEAAQ=="
  32 +
  33 +var host = "http://allied-creation-gateway-dev.fjmaimaimai.com"
  34 +
  35 +func TestSignature(t *testing.T) {
  36 + options := NewUpToChainOptions("table", "1", "149848948").WithDesc("desc")
  37 + //options := NewUpToChainOptions("table", "", "").WithDesc("")
  38 + bsn := &BSNBlockChain{
  39 + privatePem: priK,
  40 + PublicPem: []byte(pubPem),
  41 + }
  42 + bInfo := &UpToChainRequest{}
  43 + bInfo.Complete(options)
  44 + var mapBlockInfo = make(map[string]interface{})
  45 + json.UnmarshalFromString(json.MarshalToString(bInfo), &mapBlockInfo)
  46 + secret, err := bsn.Signature(mapBlockInfo, "upToChain")
  47 + assert.Nil(t, err)
  48 + t.Log(secret)
  49 + decryptSecret, err := RsaDecrypt(priK, []byte(secret))
  50 + if err != nil {
  51 + t.Log(err.Error())
  52 + }
  53 + t.Log(decryptSecret)
  54 +}
  55 +
  56 +func TestGenerateRSA(t *testing.T) {
  57 + // generate key
  58 + privatekey, err := rsa.GenerateKey(rand.Reader, 512)
  59 + if err != nil {
  60 + fmt.Printf("Cannot generate RSA key\n")
  61 + os.Exit(1)
  62 + }
  63 + publickey := &privatekey.PublicKey
  64 +
  65 + // dump private key to file
  66 + var privateKeyBytes []byte = x509.MarshalPKCS1PrivateKey(privatekey)
  67 + privateKeyBlock := &pem.Block{
  68 + Type: "RSA PRIVATE KEY",
  69 + Bytes: privateKeyBytes,
  70 + }
  71 + privatePem := bytes.NewBuffer(nil)
  72 + if err != nil {
  73 + fmt.Printf("error when create private.pem: %s \n", err)
  74 + os.Exit(1)
  75 + }
  76 + err = pem.Encode(privatePem, privateKeyBlock)
  77 + if err != nil {
  78 + fmt.Printf("error when encode private pem: %s \n", err)
  79 + os.Exit(1)
  80 + }
  81 +
  82 + // dump public key to file
  83 + publicKeyBytes, err := x509.MarshalPKIXPublicKey(publickey)
  84 + if err != nil {
  85 + fmt.Printf("error when dumping publickey: %s \n", err)
  86 + os.Exit(1)
  87 + }
  88 + publicKeyBlock := &pem.Block{
  89 + Type: "PUBLIC KEY",
  90 + Bytes: publicKeyBytes,
  91 + }
  92 + publicPem := bytes.NewBuffer(nil)
  93 + if err != nil {
  94 + fmt.Printf("error when create public.pem: %s \n", err)
  95 + os.Exit(1)
  96 + }
  97 + err = pem.Encode(publicPem, publicKeyBlock)
  98 + if err != nil {
  99 + fmt.Printf("error when encode public pem: %s \n", err)
  100 + os.Exit(1)
  101 + }
  102 + log.Println(privatePem.String())
  103 + log.Println(publicPem.String())
  104 +}
  105 +
  106 +func TestBSNBlockChain_UpToChain(t *testing.T) {
  107 + bc := &BSNBlockChain{
  108 + PublicPem: []byte(pubPem),
  109 + Host: host,
  110 + PublicKey: pubKey,
  111 + }
  112 + options := NewUpToChainOptions("table", "1", "149848948").WithDesc("desc")
  113 + _, err := bc.UpToChain(options)
  114 + if err != nil {
  115 + t.Fatal(err)
  116 + }
  117 +}
  118 +
  119 +func TestBSNBlockChain_GetToken(t *testing.T) {
  120 + bc := &BSNBlockChain{
  121 + PublicPem: []byte(pubPem),
  122 + Host: host,
  123 + PublicKey: pubKey,
  124 + }
  125 + options := &GetTokenRequest{
  126 + Type: 1,
  127 + TsTxId: "",
  128 + }
  129 + _, err := bc.GetToken(options)
  130 + if err != nil {
  131 + t.Fatal(err)
  132 + }
  133 +}
  1 +package blockchain
  2 +
  3 +type UpToChainOptions struct {
  4 + // 上链数据的数据库、数据表等的标识值 (非必填)
  5 + InnerDBTable string `json:"innerDBTable"`
  6 + // 上链数据的唯一标识主键值 (非必填)
  7 + InnerPrimaryKey string `json:"innerPrimaryKey"`
  8 + // 上链记录的一个标记值(IssueId), 数据溯源出所有相关事件内容,例如快递单号,过滤出该快递的所有相关事件内容并用于展示 (非必填)
  9 + InnerPrimaryIssueId string `json:"innerPrimaryIssueId"`
  10 + // 作用与key1相同 (非必填)
  11 + InnerSecondIssueId string `json:"innerSecondIssueId"`
  12 + // 数据原文 (必填)
  13 + Value string `json:"value"`
  14 + // 数据描述: 对value的描述,无论needHash为何值,本字段均会原文存储到链上
  15 + Desc string `json:"desc"`
  16 + // 是否哈希: true: 需要哈希,会将value进行hash上链,false:不需要哈希,明文上链,链上所有用户都可看到明文,默认false
  17 + NeedHash bool `json:"needHash"`
  18 +}
  19 +
  20 +func NewUpToChainOptions(table, primaryKey, value string) *UpToChainOptions {
  21 + return &UpToChainOptions{InnerDBTable: table, InnerPrimaryKey: primaryKey, Value: value, NeedHash: false}
  22 +}
  23 +
  24 +func (o *UpToChainOptions) WithInnerDBTable(innerDBTable string) *UpToChainOptions {
  25 + o.InnerDBTable = innerDBTable
  26 + return o
  27 +}
  28 +
  29 +func (o *UpToChainOptions) WithInnerPrimaryKey(innerPrimaryKey string) *UpToChainOptions {
  30 + o.InnerPrimaryKey = innerPrimaryKey
  31 + return o
  32 +}
  33 +
  34 +func (o *UpToChainOptions) WithInnerPrimaryIssueId(innerPrimaryIssueId string) *UpToChainOptions {
  35 + o.InnerPrimaryIssueId = innerPrimaryIssueId
  36 + return o
  37 +}
  38 +
  39 +func (o *UpToChainOptions) WithInnerSecondIssueId(innerSecondIssueId string) *UpToChainOptions {
  40 + o.InnerSecondIssueId = innerSecondIssueId
  41 + return o
  42 +}
  43 +
  44 +func (o *UpToChainOptions) WithValue(Value string) *UpToChainOptions {
  45 + o.Value = Value
  46 + return o
  47 +}
  48 +
  49 +func (o *UpToChainOptions) WithDesc(Desc string) *UpToChainOptions {
  50 + o.Desc = Desc
  51 + return o
  52 +}
  53 +
  54 +func (o *UpToChainOptions) WithNeedHash() *UpToChainOptions {
  55 + o.NeedHash = true
  56 + return o
  57 +}
  1 +package blockchain
  2 +
  3 +import (
  4 + "crypto/md5"
  5 + "crypto/rand"
  6 + "crypto/rsa"
  7 + "crypto/x509"
  8 + "encoding/base64"
  9 + "encoding/pem"
  10 + "errors"
  11 + "fmt"
  12 +)
  13 +
  14 +// 加密
  15 +func RsaEncrypt(publicKey []byte, origData []byte) ([]byte, error) {
  16 + block, _ := pem.Decode(publicKey)
  17 + if block == nil {
  18 + return nil, errors.New("public key error")
  19 + }
  20 + pubInterface, err := x509.ParsePKIXPublicKey(block.Bytes)
  21 + if err != nil {
  22 + return nil, err
  23 + }
  24 +
  25 + // md5
  26 + hash := md5.New()
  27 + hash.Write([]byte(origData))
  28 + pub := pubInterface.(*rsa.PublicKey)
  29 + fmt.Println(hash.Sum(nil))
  30 + return rsa.EncryptPKCS1v15(rand.Reader, pub, hash.Sum(nil))
  31 + //pub := pubInterface.(*rsa.PublicKey)
  32 + //return rsa.EncryptPKCS1v15(rand.Reader, pub, origData)
  33 +}
  34 +
  35 +// 解密
  36 +func RsaDecrypt(privateKey []byte, ciphertext []byte) ([]byte, error) {
  37 + block, _ := pem.Decode(privateKey)
  38 + if block == nil {
  39 + return nil, errors.New("private key error!")
  40 + }
  41 + encryptData, _ := base64.StdEncoding.DecodeString(string(ciphertext))
  42 + priv, err := x509.ParsePKCS1PrivateKey(block.Bytes)
  43 + if err != nil {
  44 + // pkcs1 是标准但裸奔,pkcs8升级支持密码
  45 + pri2, err := x509.ParsePKCS8PrivateKey(block.Bytes)
  46 + if err != nil {
  47 + return nil, err
  48 + }
  49 + priv = pri2.(*rsa.PrivateKey)
  50 + }
  51 + return rsa.DecryptPKCS1v15(rand.Reader, priv, encryptData)
  52 +}
  1 +package goqueue
  2 +
  3 +import (
  4 + "fmt"
  5 + "github.com/tal-tech/go-queue/kq"
  6 + "github.com/tal-tech/go-zero/core/logx"
  7 + "github.com/tal-tech/go-zero/core/service"
  8 + "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/constant"
  9 + "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/log"
  10 + "strings"
  11 +)
  12 +
  13 +func SetUp() {
  14 + go func() {
  15 + q := kq.MustNewQueue(NewConfig(constant.TOPIC_UP_BLOCK_CHAIN, constant.TOPIC_UP_BLOCK_CHAIN, 2), kq.WithHandle(UpToChainHandler))
  16 + defer func() {
  17 + q.Stop()
  18 + log.Logger.Info(fmt.Sprintf("goqueue:%v stop!", constant.TOPIC_UP_BLOCK_CHAIN))
  19 + }()
  20 + q.Start()
  21 + }()
  22 + log.Logger.Info("goqueue start!")
  23 +}
  24 +
  25 +func NewConfig(topic, group string, consumers int) kq.KqConf {
  26 + brokers := strings.Split(constant.KAFKA_HOST, ",")
  27 + return kq.KqConf{
  28 + ServiceConf: service.ServiceConf{
  29 + Name: topic,
  30 + Log: logx.LogConf{
  31 + Mode: "console",
  32 + },
  33 + Mode: "pro",
  34 + },
  35 + Brokers: brokers,
  36 + Group: group,
  37 + Topic: topic,
  38 + Offset: "first",
  39 + Conns: 1,
  40 + Consumers: consumers,
  41 + Processors: 4,
  42 + MinBytes: 10200,
  43 + MaxBytes: 10485760,
  44 + }
  45 +}
  1 +package goqueue
  2 +
  3 +import (
  4 + "fmt"
  5 + "gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/log"
  6 +)
  7 +
  8 +func UpToChainHandler(k, v string) error {
  9 + log.Logger.Debug(fmt.Sprintf("%s", v), map[string]interface{}{"handler": "UptoChain"})
  10 + return nil
  11 +}