作者 yangfu

redis 更新

正在显示 26 个修改的文件 包含 230 行增加136 行删除
package common
import "testing"
func Test_Jwt(t *testing.T){
}
\ No newline at end of file
... ...
... ... @@ -7,9 +7,9 @@ require (
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/garyburd/redigo v1.6.0
github.com/gin-gonic/gin v1.4.0
github.com/go-sql-driver/mysql v1.4.1 // indirect
github.com/gomodule/redigo v1.7.0
github.com/google/go-cmp v0.2.0
github.com/gorilla/websocket v1.4.1
github.com/lib/pq v1.2.0 // indirect
... ...
... ... @@ -189,6 +189,12 @@ func (u UUID) String() string {
return string(buf)
}
func (u UUID) StringNoDash() string {
buf := make([]byte, 32)
hex.Encode(buf, u[0:])
return string(buf)
}
//0001 时间的版本
//0010 DCE Security
//0011 MD5哈希
... ...
... ... @@ -2,6 +2,7 @@ package uid
import (
"fmt"
"strings"
"testing"
)
... ... @@ -15,4 +16,39 @@ func TestUID(t *testing.T){
}
fmt.Println("MarshalBinary:",udata)
fmt.Println("uuid version:",uid.Version())
uidStr36 :=uid.String()
uidStr32 :=uid.StringNoDash()
if strings.Replace(uidStr36,"-","",-1) != uidStr32{
t.Fatal("no equal",uidStr36,uidStr32)
}
t.Log(uidStr36,uidStr32)
}
func Test_StringNoDash(t *testing.T){
for i:=0;i<100;i++{
uid :=NewV1()
uidStr36 :=uid.String()
uidStr32 :=uid.StringNoDash()
if strings.Replace(uidStr36,"-","",-1) != uidStr32{
t.Fatal("no equal",uidStr36,uidStr32)
}
}
}
func Test_NewV1(t *testing.T){
num :=10000
mUid :=make(map[string]int,num)
for i:=0;i<num;i++{
uid :=NewV1()
uidStr36 :=uid.String()
if _,ok:=mUid[uidStr36];ok{
t.Fatal("repeat uid",uidStr36)
}else{
mUid[uidStr36]=0
}
}
if len(mUid)!=num{
t.Fatal("map num error")
}
}
... ...
... ... @@ -2,7 +2,7 @@ package redis
import (
"errors"
"github.com/garyburd/redigo/redis"
"github.com/gomodule/redigo/redis"
)
//设置指定hash指定key的值
... ...
... ... @@ -3,7 +3,7 @@ package redis
import (
"fmt"
"github.com/astaxie/beego"
"github.com/garyburd/redigo/redis"
"github.com/gomodule/redigo/redis"
"sync"
)
... ...
... ... @@ -3,7 +3,7 @@ package redis
import (
"errors"
"fmt"
"github.com/garyburd/redigo/redis"
"github.com/gomodule/redigo/redis"
"gitlab.fjmaimaimai.com/mmm-go/gocomm/config"
"time"
)
... ...
... ... @@ -2,8 +2,7 @@ package redis
import (
"errors"
"github.com/garyburd/redigo/redis"
"github.com/gomodule/redigo/redis"
)
//设置集合
... ...
... ... @@ -3,8 +3,7 @@ package redis
import (
"encoding/json"
"errors"
"github.com/garyburd/redigo/redis"
"github.com/gomodule/redigo/redis"
)
func Set(key string, v interface{}, timeout int64) error {
... ...
... ... @@ -2,7 +2,7 @@ package redis
import (
"errors"
"github.com/garyburd/redigo/redis"
"github.com/gomodule/redigo/redis"
)
//设置集合
... ...
... ... @@ -12,32 +12,33 @@
// License for the specific language governing permissions and limitations
// under the License.
package internal // import "github.com/garyburd/redigo/internal"
package redis
import (
"strings"
)
const (
WatchState = 1 << iota
MultiState
SubscribeState
MonitorState
connectionWatchState = 1 << iota
connectionMultiState
connectionSubscribeState
connectionMonitorState
)
type CommandInfo struct {
type commandInfo struct {
// Set or Clear these states on connection.
Set, Clear int
}
var commandInfos = map[string]CommandInfo{
"WATCH": {Set: WatchState},
"UNWATCH": {Clear: WatchState},
"MULTI": {Set: MultiState},
"EXEC": {Clear: WatchState | MultiState},
"DISCARD": {Clear: WatchState | MultiState},
"PSUBSCRIBE": {Set: SubscribeState},
"SUBSCRIBE": {Set: SubscribeState},
"MONITOR": {Set: MonitorState},
var commandInfos = map[string]commandInfo{
"WATCH": {Set: connectionWatchState},
"UNWATCH": {Clear: connectionWatchState},
"MULTI": {Set: connectionMultiState},
"EXEC": {Clear: connectionWatchState | connectionMultiState},
"DISCARD": {Clear: connectionWatchState | connectionMultiState},
"PSUBSCRIBE": {Set: connectionSubscribeState},
"SUBSCRIBE": {Set: connectionSubscribeState},
"MONITOR": {Set: connectionMonitorState},
}
func init() {
... ... @@ -46,7 +47,7 @@ func init() {
}
}
func LookupCommandInfo(commandName string) CommandInfo {
func lookupCommandInfo(commandName string) commandInfo {
if ci, ok := commandInfos[commandName]; ok {
return ci
}
... ...
... ... @@ -427,10 +427,21 @@ func (pe protocolError) Error() string {
return fmt.Sprintf("redigo: %s (possible server error or unsupported concurrent read by application)", string(pe))
}
// readLine reads a line of input from the RESP stream.
func (c *conn) readLine() ([]byte, error) {
// To avoid allocations, attempt to read the line using ReadSlice. This
// call typically succeeds. The known case where the call fails is when
// reading the output from the MONITOR command.
p, err := c.br.ReadSlice('\n')
if err == bufio.ErrBufferFull {
return nil, protocolError("long response line")
// The line does not fit in the bufio.Reader's buffer. Fall back to
// allocating a buffer for the line.
buf := append([]byte{}, p...)
for err == bufio.ErrBufferFull {
p, err = c.br.ReadSlice('\n')
buf = append(buf, p...)
}
p = buf
}
if err != nil {
return nil, err
... ...
... ... @@ -14,7 +14,7 @@
// Package redis is a client for the Redis database.
//
// The Redigo FAQ (https://github.com/garyburd/redigo/wiki/FAQ) contains more
// The Redigo FAQ (https://github.com/gomodule/redigo/wiki/FAQ) contains more
// documentation about this package.
//
// Connections
... ... @@ -174,4 +174,4 @@
// non-recoverable error such as a network error or protocol parsing error. If
// Err() returns a non-nil value, then the connection is not usable and should
// be closed.
package redis // import "github.com/garyburd/redigo/redis"
package redis
... ...
... ... @@ -30,13 +30,22 @@ func NewLoggingConn(conn Conn, logger *log.Logger, prefix string) Conn {
if prefix != "" {
prefix = prefix + "."
}
return &loggingConn{conn, logger, prefix}
return &loggingConn{conn, logger, prefix, nil}
}
//NewLoggingConnFilter returns a logging wrapper around a connection and a filter function.
func NewLoggingConnFilter(conn Conn, logger *log.Logger, prefix string, skip func(cmdName string) bool) Conn {
if prefix != "" {
prefix = prefix + "."
}
return &loggingConn{conn, logger, prefix, skip}
}
type loggingConn struct {
Conn
logger *log.Logger
prefix string
skip func(cmdName string) bool
}
func (c *loggingConn) Close() error {
... ... @@ -85,6 +94,9 @@ func (c *loggingConn) printValue(buf *bytes.Buffer, v interface{}) {
}
func (c *loggingConn) print(method, commandName string, args []interface{}, reply interface{}, err error) {
if c.skip != nil && c.skip(commandName) {
return
}
var buf bytes.Buffer
fmt.Fprintf(&buf, "%s%s(", c.prefix, method)
if method != "Receive" {
... ...
... ... @@ -24,13 +24,11 @@ import (
"sync"
"sync/atomic"
"time"
"github.com/garyburd/redigo/internal"
)
var (
_ ConnWithTimeout = (*pooledConnection)(nil)
_ ConnWithTimeout = (*errorConnection)(nil)
_ ConnWithTimeout = (*activeConn)(nil)
_ ConnWithTimeout = (*errorConn)(nil)
)
var nowFunc = time.Now // for testing
... ... @@ -150,6 +148,10 @@ type Pool struct {
// for a connection to be returned to the pool before returning.
Wait bool
// Close connections older than this duration. If the value is zero, then
// the pool does not close connections based on age.
MaxConnLifetime time.Duration
chInitialized uint32 // set to 1 when field ch is initialized
mu sync.Mutex // mu protects the following fields
... ... @@ -172,11 +174,11 @@ func NewPool(newFn func() (Conn, error), maxIdle int) *Pool {
// getting an underlying connection, then the connection Err, Do, Send, Flush
// and Receive methods return that error.
func (p *Pool) Get() Conn {
c, err := p.get(nil)
pc, err := p.get(nil)
if err != nil {
return errorConnection{err}
return errorConn{err}
}
return &pooledConnection{p: p, c: c}
return &activeConn{p: p, pc: pc}
}
// PoolStats contains pool statistics.
... ... @@ -226,15 +228,15 @@ func (p *Pool) Close() error {
}
p.closed = true
p.active -= p.idle.count
ic := p.idle.front
pc := p.idle.front
p.idle.count = 0
p.idle.front, p.idle.back = nil, nil
if p.ch != nil {
close(p.ch)
}
p.mu.Unlock()
for ; ic != nil; ic = ic.next {
ic.c.Close()
for ; pc != nil; pc = pc.next {
pc.c.Close()
}
return nil
}
... ... @@ -265,7 +267,7 @@ func (p *Pool) lazyInit() {
func (p *Pool) get(ctx interface {
Done() <-chan struct{}
Err() error
}) (Conn, error) {
}) (*poolConn, error) {
// Handle limit for p.Wait == true.
if p.Wait && p.MaxActive > 0 {
... ... @@ -287,10 +289,10 @@ func (p *Pool) get(ctx interface {
if p.IdleTimeout > 0 {
n := p.idle.count
for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
c := p.idle.back.c
pc := p.idle.back
p.idle.popBack()
p.mu.Unlock()
c.Close()
pc.c.Close()
p.mu.Lock()
p.active--
}
... ... @@ -298,13 +300,14 @@ func (p *Pool) get(ctx interface {
// Get idle connection from the front of idle list.
for p.idle.front != nil {
ic := p.idle.front
pc := p.idle.front
p.idle.popFront()
p.mu.Unlock()
if p.TestOnBorrow == nil || p.TestOnBorrow(ic.c, ic.t) == nil {
return ic.c, nil
if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&
(p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) {
return pc, nil
}
ic.c.Close()
pc.c.Close()
p.mu.Lock()
p.active--
}
... ... @@ -333,24 +336,25 @@ func (p *Pool) get(ctx interface {
}
p.mu.Unlock()
}
return c, err
return &poolConn{c: c, created: nowFunc()}, err
}
func (p *Pool) put(c Conn, forceClose bool) error {
func (p *Pool) put(pc *poolConn, forceClose bool) error {
p.mu.Lock()
if !p.closed && !forceClose {
p.idle.pushFront(&idleConn{t: nowFunc(), c: c})
pc.t = nowFunc()
p.idle.pushFront(pc)
if p.idle.count > p.MaxIdle {
c = p.idle.back.c
pc = p.idle.back
p.idle.popBack()
} else {
c = nil
pc = nil
}
}
if c != nil {
if pc != nil {
p.mu.Unlock()
c.Close()
pc.c.Close()
p.mu.Lock()
p.active--
}
... ... @@ -362,9 +366,9 @@ func (p *Pool) put(c Conn, forceClose bool) error {
return nil
}
type pooledConnection struct {
type activeConn struct {
p *Pool
c Conn
pc *poolConn
state int
}
... ... @@ -385,79 +389,107 @@ func initSentinel() {
}
}
func (pc *pooledConnection) Close() error {
c := pc.c
if _, ok := c.(errorConnection); ok {
func (ac *activeConn) Close() error {
pc := ac.pc
if pc == nil {
return nil
}
pc.c = errorConnection{errConnClosed}
if pc.state&internal.MultiState != 0 {
c.Send("DISCARD")
pc.state &^= (internal.MultiState | internal.WatchState)
} else if pc.state&internal.WatchState != 0 {
c.Send("UNWATCH")
pc.state &^= internal.WatchState
ac.pc = nil
if ac.state&connectionMultiState != 0 {
pc.c.Send("DISCARD")
ac.state &^= (connectionMultiState | connectionWatchState)
} else if ac.state&connectionWatchState != 0 {
pc.c.Send("UNWATCH")
ac.state &^= connectionWatchState
}
if pc.state&internal.SubscribeState != 0 {
c.Send("UNSUBSCRIBE")
c.Send("PUNSUBSCRIBE")
if ac.state&connectionSubscribeState != 0 {
pc.c.Send("UNSUBSCRIBE")
pc.c.Send("PUNSUBSCRIBE")
// To detect the end of the message stream, ask the server to echo
// a sentinel value and read until we see that value.
sentinelOnce.Do(initSentinel)
c.Send("ECHO", sentinel)
c.Flush()
pc.c.Send("ECHO", sentinel)
pc.c.Flush()
for {
p, err := c.Receive()
p, err := pc.c.Receive()
if err != nil {
break
}
if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) {
pc.state &^= internal.SubscribeState
ac.state &^= connectionSubscribeState
break
}
}
}
c.Do("")
pc.p.put(c, pc.state != 0 || c.Err() != nil)
pc.c.Do("")
ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil)
return nil
}
func (pc *pooledConnection) Err() error {
func (ac *activeConn) Err() error {
pc := ac.pc
if pc == nil {
return errConnClosed
}
return pc.c.Err()
}
func (pc *pooledConnection) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
ci := internal.LookupCommandInfo(commandName)
pc.state = (pc.state | ci.Set) &^ ci.Clear
func (ac *activeConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
pc := ac.pc
if pc == nil {
return nil, errConnClosed
}
ci := lookupCommandInfo(commandName)
ac.state = (ac.state | ci.Set) &^ ci.Clear
return pc.c.Do(commandName, args...)
}
func (pc *pooledConnection) DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (reply interface{}, err error) {
func (ac *activeConn) DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (reply interface{}, err error) {
pc := ac.pc
if pc == nil {
return nil, errConnClosed
}
cwt, ok := pc.c.(ConnWithTimeout)
if !ok {
return nil, errTimeoutNotSupported
}
ci := internal.LookupCommandInfo(commandName)
pc.state = (pc.state | ci.Set) &^ ci.Clear
ci := lookupCommandInfo(commandName)
ac.state = (ac.state | ci.Set) &^ ci.Clear
return cwt.DoWithTimeout(timeout, commandName, args...)
}
func (pc *pooledConnection) Send(commandName string, args ...interface{}) error {
ci := internal.LookupCommandInfo(commandName)
pc.state = (pc.state | ci.Set) &^ ci.Clear
func (ac *activeConn) Send(commandName string, args ...interface{}) error {
pc := ac.pc
if pc == nil {
return errConnClosed
}
ci := lookupCommandInfo(commandName)
ac.state = (ac.state | ci.Set) &^ ci.Clear
return pc.c.Send(commandName, args...)
}
func (pc *pooledConnection) Flush() error {
func (ac *activeConn) Flush() error {
pc := ac.pc
if pc == nil {
return errConnClosed
}
return pc.c.Flush()
}
func (pc *pooledConnection) Receive() (reply interface{}, err error) {
func (ac *activeConn) Receive() (reply interface{}, err error) {
pc := ac.pc
if pc == nil {
return nil, errConnClosed
}
return pc.c.Receive()
}
func (pc *pooledConnection) ReceiveWithTimeout(timeout time.Duration) (reply interface{}, err error) {
func (ac *activeConn) ReceiveWithTimeout(timeout time.Duration) (reply interface{}, err error) {
pc := ac.pc
if pc == nil {
return nil, errConnClosed
}
cwt, ok := pc.c.(ConnWithTimeout)
if !ok {
return nil, errTimeoutNotSupported
... ... @@ -465,63 +497,64 @@ func (pc *pooledConnection) ReceiveWithTimeout(timeout time.Duration) (reply int
return cwt.ReceiveWithTimeout(timeout)
}
type errorConnection struct{ err error }
type errorConn struct{ err error }
func (ec errorConnection) Do(string, ...interface{}) (interface{}, error) { return nil, ec.err }
func (ec errorConnection) DoWithTimeout(time.Duration, string, ...interface{}) (interface{}, error) {
func (ec errorConn) Do(string, ...interface{}) (interface{}, error) { return nil, ec.err }
func (ec errorConn) DoWithTimeout(time.Duration, string, ...interface{}) (interface{}, error) {
return nil, ec.err
}
func (ec errorConnection) Send(string, ...interface{}) error { return ec.err }
func (ec errorConnection) Err() error { return ec.err }
func (ec errorConnection) Close() error { return nil }
func (ec errorConnection) Flush() error { return ec.err }
func (ec errorConnection) Receive() (interface{}, error) { return nil, ec.err }
func (ec errorConnection) ReceiveWithTimeout(time.Duration) (interface{}, error) { return nil, ec.err }
func (ec errorConn) Send(string, ...interface{}) error { return ec.err }
func (ec errorConn) Err() error { return ec.err }
func (ec errorConn) Close() error { return nil }
func (ec errorConn) Flush() error { return ec.err }
func (ec errorConn) Receive() (interface{}, error) { return nil, ec.err }
func (ec errorConn) ReceiveWithTimeout(time.Duration) (interface{}, error) { return nil, ec.err }
type idleList struct {
count int
front, back *idleConn
front, back *poolConn
}
type idleConn struct {
type poolConn struct {
c Conn
t time.Time
next, prev *idleConn
created time.Time
next, prev *poolConn
}
func (l *idleList) pushFront(ic *idleConn) {
ic.next = l.front
ic.prev = nil
func (l *idleList) pushFront(pc *poolConn) {
pc.next = l.front
pc.prev = nil
if l.count == 0 {
l.back = ic
l.back = pc
} else {
l.front.prev = ic
l.front.prev = pc
}
l.front = ic
l.front = pc
l.count++
return
}
func (l *idleList) popFront() {
ic := l.front
pc := l.front
l.count--
if l.count == 0 {
l.front, l.back = nil, nil
} else {
ic.next.prev = nil
l.front = ic.next
pc.next.prev = nil
l.front = pc.next
}
ic.next, ic.prev = nil, nil
pc.next, pc.prev = nil, nil
}
func (l *idleList) popBack() {
ic := l.back
pc := l.back
l.count--
if l.count == 0 {
l.front, l.back = nil, nil
} else {
ic.prev.next = nil
l.back = ic.prev
pc.prev.next = nil
l.back = pc.prev
}
ic.next, ic.prev = nil, nil
pc.next, pc.prev = nil, nil
}
... ...
... ... @@ -27,9 +27,9 @@ import "context"
// If the function completes without error, then the application must close the
// returned connection.
func (p *Pool) GetContext(ctx context.Context) (Conn, error) {
c, err := p.get(ctx)
pc, err := p.get(ctx)
if err != nil {
return errorConnection{err}, err
return errorConn{err}, err
}
return &pooledConnection{p: p, c: c}, nil
return &activeConn{p: p, pc: pc}, nil
}
... ...
... ... @@ -36,18 +36,9 @@ type Message struct {
// The originating channel.
Channel string
// The message data.
Data []byte
}
// PMessage represents a pmessage notification.
type PMessage struct {
// The matched pattern.
// The matched pattern, if any
Pattern string
// The originating channel.
Channel string
// The message data.
Data []byte
}
... ... @@ -102,9 +93,9 @@ func (c PubSubConn) Ping(data string) error {
return c.Conn.Flush()
}
// Receive returns a pushed message as a Subscription, Message, PMessage, Pong
// or error. The return value is intended to be used directly in a type switch
// as illustrated in the PubSubConn example.
// Receive returns a pushed message as a Subscription, Message, Pong or error.
// The return value is intended to be used directly in a type switch as
// illustrated in the PubSubConn example.
func (c PubSubConn) Receive() interface{} {
return c.receiveInternal(c.Conn.Receive())
}
... ... @@ -135,11 +126,11 @@ func (c PubSubConn) receiveInternal(replyArg interface{}, errArg error) interfac
}
return m
case "pmessage":
var pm PMessage
if _, err := Scan(reply, &pm.Pattern, &pm.Channel, &pm.Data); err != nil {
var m Message
if _, err := Scan(reply, &m.Pattern, &m.Channel, &m.Data); err != nil {
return err
}
return pm
return m
case "subscribe", "psubscribe", "unsubscribe", "punsubscribe":
s := Subscription{Kind: kind}
if _, err := Scan(reply, &s.Channel, &s.Count); err != nil {
... ...
... ... @@ -17,9 +17,6 @@ github.com/davecgh/go-spew/spew
github.com/dgrijalva/jwt-go
# github.com/fsnotify/fsnotify v1.4.7
github.com/fsnotify/fsnotify
# github.com/garyburd/redigo v1.6.0
github.com/garyburd/redigo/internal
github.com/garyburd/redigo/redis
# github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3
github.com/gin-contrib/sse
# github.com/gin-gonic/gin v1.4.0
... ... @@ -29,6 +26,8 @@ github.com/gin-gonic/gin/internal/json
github.com/gin-gonic/gin/render
# github.com/golang/protobuf v1.3.1
github.com/golang/protobuf/proto
# github.com/gomodule/redigo v1.7.0
github.com/gomodule/redigo/redis
# github.com/google/go-cmp v0.2.0
github.com/google/go-cmp/cmp
github.com/google/go-cmp/cmp/internal/diff
... ...