作者 yangfu

ws 链路管理修改

  1 +package websocket
  2 +
  3 +import (
  4 + "encoding/json"
  5 + "fmt"
  6 + "github.com/astaxie/beego"
  7 + "github.com/gorilla/websocket"
  8 + "gitlab.fjmaimaimai.com/mmm-go/gocomm/pkg/mybeego"
  9 + "reflect"
  10 + "sync"
  11 +)
  12 +
  13 +var (
  14 + ErrorNotFound =fmt.Errorf("conn not exist")
  15 +)
  16 +
  17 +var DefaultConnmgrs Connmgrs
  18 +//初始化websocket链路管理器数量,通过uid取模放入不同的管理器
  19 +func InitWebsocketConnmgrs(mgrsSize int){
  20 + connmgrs :=make(map[int]IConnmgr,mgrsSize)
  21 + for i:=0;i<mgrsSize;i++{
  22 + connmgrs[i] = NewMemoryConnmgr()
  23 + }
  24 + DefaultConnmgrs = Connmgrs(connmgrs)
  25 +}
  26 +
  27 +type IConnmgr interface {
  28 + Put(key,value interface{})(bool)
  29 + Remove(key interface{}) error
  30 + Get(key interface{})(value interface{},err error)
  31 +}
  32 +//连接管理器
  33 +type Connmgrs map[int]IConnmgr
  34 +//将连接从指定连接管理器中移除
  35 +func (m Connmgrs)Remove(connmgrId int,key interface{})(err error){
  36 + //删除特定链接管理的连接
  37 + if mgr,ok:= m[connmgrId];ok{
  38 + err =mgr.Remove(key)
  39 + if err!=nil{
  40 + return err
  41 + }
  42 + }
  43 + return
  44 +}
  45 +//将连接装载到指定 连接管理器
  46 +func (m Connmgrs)Put(connmgrId int,key,value interface{})(result bool){
  47 + result =false
  48 + if mgr,ok:= m[connmgrId];ok{
  49 + result =mgr.Put(key,value)
  50 + if !result{
  51 + return
  52 + }
  53 + }
  54 + return
  55 +}
  56 +
  57 +
  58 +type MemoryConnmgr struct {
  59 + mutex sync.RWMutex
  60 + Connections *JMap //conn
  61 + Clients *JMap // key=uid(int64) value(*WebsocketConnection)
  62 + //rooms //房间
  63 +}
  64 +
  65 +func NewMemoryConnmgr()*MemoryConnmgr{
  66 + keyType := reflect.TypeOf(&websocket.Conn{})
  67 + valueType := reflect.TypeOf(&WebsocketConnection{})
  68 + return &MemoryConnmgr{
  69 + Connections:NewJMap(keyType, valueType),
  70 + Clients:NewJMap(reflect.TypeOf("1:1"), valueType),
  71 + }
  72 +}
  73 +
  74 +func(m *MemoryConnmgr)Put(key,value interface{})(result bool){
  75 + m.mutex.Lock()
  76 + defer m.mutex.Unlock()
  77 + if c,ok :=value.(*WebsocketConnection);ok{
  78 + idKey := fmt.Sprintf("%d:%d", c.Uid, c.AppId)
  79 + return m.Connections.Put(c.Conn,c) && m.Clients.Put(idKey,c)
  80 + }
  81 + return false
  82 +}
  83 +
  84 +func(m *MemoryConnmgr)Get(key interface{})(value interface{},err error){
  85 + var ok bool
  86 + switch reflect.TypeOf(key).Kind() {
  87 + case reflect.String:
  88 + if value,ok = m.Clients.Get(key);!ok{
  89 + err = ErrorNotFound
  90 + return
  91 + }
  92 + case reflect.Struct:
  93 + if value,ok = m.Connections.Get(key);!ok{
  94 + err = ErrorNotFound
  95 + return
  96 + }
  97 + default:
  98 + err = ErrorNotFound
  99 + }
  100 + return
  101 +}
  102 +
  103 +func(m *MemoryConnmgr)Remove(key interface{})(err error){
  104 + m.mutex.Lock()
  105 + defer m.mutex.Unlock()
  106 + if c,ok :=key.(*WebsocketConnection);ok{
  107 + key := fmt.Sprintf("%d:%d", c.Uid, c.AppId)
  108 + m.Connections.Remove(c.Conn)
  109 + m.Clients.Remove(key)
  110 + }
  111 + return
  112 +}
  113 +
  114 +//发送数据
  115 +func SendDataByConnmgr(uid int64, appId int, sendMsg interface{}) bool {
  116 + if sendMsg == nil || uid < 1 || appId < 1 {
  117 + return false
  118 + }
  119 + var mgrId int =int(uid % int64(len(DefaultConnmgrs)))
  120 + connmgr,ok :=(DefaultConnmgrs)[mgrId]
  121 + if !ok{
  122 + return false
  123 + }
  124 + msg := &mybeego.Message{
  125 + Errno: 0,
  126 + Errmsg: mybeego.NewMessage(0).Errmsg,
  127 + Data: sendMsg,
  128 + }
  129 + msgByte, err := json.Marshal(msg)
  130 + if err != nil {
  131 + beego.Error(err)
  132 + return false
  133 + }
  134 + key := fmt.Sprintf("%d:%d", uid, appId)
  135 + if connI,err := connmgr.Get(key); err==nil {
  136 + if conn, ok := connI.(*WebsocketConnection); ok {
  137 + conn.Send(string(msgByte))
  138 + return true
  139 + }
  140 + }
  141 + return false
  142 +}
  1 +package websocket
  2 +
  3 +import (
  4 + "github.com/gorilla/websocket"
  5 + "testing"
  6 +)
  7 +
  8 +func TestNewMemoryConnmgr(t *testing.T){
  9 + connmgr :=NewMemoryConnmgr()
  10 + num:=100
  11 + var listConn []*WebsocketConnection
  12 + for i:=0;i<num;i++{
  13 + listConn = append(listConn,&WebsocketConnection{
  14 + Uid:int64(i),
  15 + AppId:1,
  16 + Conn:&websocket.Conn{},
  17 + })
  18 + }
  19 + for i:=range listConn{
  20 + connmgr.Put(listConn[i],listConn[i])
  21 + }
  22 + if connmgr.Clients.Size() !=num && connmgr.Connections.Size()!=num{
  23 + t.Fatal("size error :",connmgr.Clients.Size(),connmgr.Connections.Size())
  24 + }
  25 + for i:=range listConn{
  26 + connmgr.Remove(listConn[i])
  27 + }
  28 + if connmgr.Clients.Size() !=0 && connmgr.Connections.Size()!=0{
  29 + t.Fatal("size error :",connmgr.Clients.Size(),connmgr.Connections.Size())
  30 + }
  31 +}
  32 +
  33 +func BenchmarkNewMemoryConnmgr(b *testing.B) {
  34 + connmgr :=NewMemoryConnmgr()
  35 + var listConn []*WebsocketConnection
  36 + for i:=0;i<b.N;i++{
  37 + listConn = append(listConn,&WebsocketConnection{
  38 + Uid:int64(i),
  39 + AppId:1,
  40 + Conn:&websocket.Conn{},
  41 + })
  42 + }
  43 + b.ResetTimer()
  44 + for i:=0;i<b.N;i++{
  45 + connmgr.Put(listConn[i],listConn[i])
  46 + }
  47 + for i:=0;i<b.N;i++{
  48 + connmgr.Remove(listConn[i])
  49 + }
  50 + if connmgr.Clients.Size() !=0 && connmgr.Connections.Size()!=0{
  51 + b.Fatal("size error :",connmgr.Clients.Size(),connmgr.Connections.Size())
  52 + }
  53 +}
@@ -17,12 +17,12 @@ const ( @@ -17,12 +17,12 @@ const (
17 Connected 17 Connected
18 ) 18 )
19 19
20 -func init() {  
21 - keyType := reflect.TypeOf(&websocket.Conn{})  
22 - valueType := reflect.TypeOf(&WebsocketConnection{})  
23 - Connections = NewJMap(keyType, valueType)  
24 - Clients = NewJMap(reflect.TypeOf("1:1"), valueType)  
25 -} 20 +//func init() {
  21 +// keyType := reflect.TypeOf(&websocket.Conn{})
  22 +// valueType := reflect.TypeOf(&WebsocketConnection{})
  23 +// Connections = NewJMap(keyType, valueType)
  24 +// Clients = NewJMap(reflect.TypeOf("1:1"), valueType)
  25 +//}
26 26
27 type ReceiveHandler (func([]byte) *mybeego.Message) 27 type ReceiveHandler (func([]byte) *mybeego.Message)
28 28
@@ -35,6 +35,7 @@ type WebsocketConnection struct { @@ -35,6 +35,7 @@ type WebsocketConnection struct {
35 State ConnState 35 State ConnState
36 OnReceive ReceiveHandler 36 OnReceive ReceiveHandler
37 OnceClose sync.Once 37 OnceClose sync.Once
  38 + Connmgrg *Connmgrs
38 } 39 }
39 40
40 func NewWebsocketConnection(conn *websocket.Conn,head *mybeego.RequestHead,recv ReceiveHandler)*WebsocketConnection{ 41 func NewWebsocketConnection(conn *websocket.Conn,head *mybeego.RequestHead,recv ReceiveHandler)*WebsocketConnection{
@@ -46,14 +47,15 @@ func NewWebsocketConnection(conn *websocket.Conn,head *mybeego.RequestHead,recv @@ -46,14 +47,15 @@ func NewWebsocketConnection(conn *websocket.Conn,head *mybeego.RequestHead,recv
46 Wchan: make(chan string, 10), 47 Wchan: make(chan string, 10),
47 State: Connected, 48 State: Connected,
48 OnReceive: recv, 49 OnReceive: recv,
  50 + Connmgrg:&DefaultConnmgrs,
49 } 51 }
50 } 52 }
51 53
52 //声明了两个cliets 管理 一个通过uid 一个通过conn管理 54 //声明了两个cliets 管理 一个通过uid 一个通过conn管理
53 // key(*websocket.Conn) value(*WebsocketConnection) 55 // key(*websocket.Conn) value(*WebsocketConnection)
54 -var Connections *JMap 56 +//var Connections *JMap
55 // key=uid(int64) value(*WebsocketConnection) 57 // key=uid(int64) value(*WebsocketConnection)
56 -var Clients *JMap 58 +//var Clients *JMap
57 59
58 type JMap struct { 60 type JMap struct {
59 sync.RWMutex 61 sync.RWMutex
@@ -107,13 +109,12 @@ func (this *JMap) Put(k interface{}, v interface{}) bool { @@ -107,13 +109,12 @@ func (this *JMap) Put(k interface{}, v interface{}) bool {
107 if !this.acceptable(k, v) { 109 if !this.acceptable(k, v) {
108 return false 110 return false
109 } 111 }
110 - if connI, ok := Clients.Get(k); ok { 112 + if connI, ok := this.m[k]; ok {
111 beego.Debug("key:", k, "已经连接,先剔除下线") 113 beego.Debug("key:", k, "已经连接,先剔除下线")
112 if conn, ok := connI.(*WebsocketConnection); ok { 114 if conn, ok := connI.(*WebsocketConnection); ok {
113 //conn.Conn.WriteMessage(websocket.TextMessage, []byte("您的帐号在其它地方登录,您被剔除下线")) 115 //conn.Conn.WriteMessage(websocket.TextMessage, []byte("您的帐号在其它地方登录,您被剔除下线"))
114 conn.Close() 116 conn.Close()
115 } 117 }
116 -  
117 } 118 }
118 this.Lock() 119 this.Lock()
119 this.m[k] = v 120 this.m[k] = v
@@ -152,14 +153,18 @@ func (this *JMap) Contains(k interface{}) bool { @@ -152,14 +153,18 @@ func (this *JMap) Contains(k interface{}) bool {
152 153
153 func (c *WebsocketConnection) Serve() { 154 func (c *WebsocketConnection) Serve() {
154 c.State = Connected 155 c.State = Connected
155 - Connections.Put(c.Conn, c)  
156 - key := fmt.Sprintf("%d:%d", c.Uid, c.AppId)  
157 - Clients.Put(key, c)  
158 - 156 + //Connections.Put(c.Conn, c)
  157 + //key := fmt.Sprintf("%d:%d", c.Uid, c.AppId)
  158 + //Clients.Put(key, c)
  159 + c.Connmgrg.Put(c.GetConnmgrId(c.Uid),c,c)
159 go doWrite(c) 160 go doWrite(c)
160 doRead(c) 161 doRead(c)
161 } 162 }
162 163
  164 +func(c *WebsocketConnection)GetConnmgrId(uid int64)int{
  165 + return (int)(uid % int64(len(*c.Connmgrg)))
  166 +}
  167 +
163 func (c *WebsocketConnection) Send(msg string) { 168 func (c *WebsocketConnection) Send(msg string) {
164 //panic("panic in websocket.send...") 169 //panic("panic in websocket.send...")
165 c.Wchan <- msg 170 c.Wchan <- msg
@@ -167,14 +172,15 @@ func (c *WebsocketConnection) Send(msg string) { @@ -167,14 +172,15 @@ func (c *WebsocketConnection) Send(msg string) {
167 172
168 func (c *WebsocketConnection) Close() { 173 func (c *WebsocketConnection) Close() {
169 c.OnceClose.Do(func(){ 174 c.OnceClose.Do(func(){
170 - beego.Info("ws:close----uid:", c.Uid, "appid:", c.AppId, "state:", c.State) 175 + beego.Info("ws:close----uid:", c.Uid, "appid:", c.AppId, "state:", c.State ,"connmgr-id:",c.GetConnmgrId(c.Uid))
171 if c.State == Disconnected { 176 if c.State == Disconnected {
172 return 177 return
173 } 178 }
174 179
175 - Connections.Remove(c.Conn)  
176 - key := fmt.Sprintf("%d:%d", c.Uid, c.AppId)  
177 - Clients.Remove(key) 180 + //Connections.Remove(c.Conn)
  181 + //key := fmt.Sprintf("%d:%d", c.Uid, c.AppId)
  182 + //Clients.Remove(key)
  183 + c.Connmgrg.Remove(c.GetConnmgrId(c.Uid),c)
178 184
179 c.State = Disconnected 185 c.State = Disconnected
180 close(c.Echan) 186 close(c.Echan)
@@ -236,27 +242,27 @@ func doWrite(c *WebsocketConnection) { @@ -236,27 +242,27 @@ func doWrite(c *WebsocketConnection) {
236 } 242 }
237 } 243 }
238 244
239 -func SendDataByWs(uid int64, appId int, sendMsg interface{}) bool {  
240 - if sendMsg == nil || uid < 1 || appId < 1 {  
241 - return false  
242 - }  
243 - msg := &mybeego.Message{  
244 - Errno: 0,  
245 - Errmsg: mybeego.NewMessage(0).Errmsg,  
246 - Data: sendMsg,  
247 - }  
248 - msgByte, err := json.Marshal(msg)  
249 - if err != nil {  
250 - beego.Error(err)  
251 - return false  
252 - }  
253 - key := fmt.Sprintf("%d:%d", uid, appId)  
254 - if connI, ok := Clients.Get(key); ok {  
255 - beego.Debug(ok)  
256 - if conn, ok := connI.(*WebsocketConnection); ok {  
257 - conn.Send(string(msgByte))  
258 - return true  
259 - }  
260 - }  
261 - return false  
262 -} 245 +//func SendDataByWs(uid int64, appId int, sendMsg interface{}) bool {
  246 +// if sendMsg == nil || uid < 1 || appId < 1 {
  247 +// return false
  248 +// }
  249 +// msg := &mybeego.Message{
  250 +// Errno: 0,
  251 +// Errmsg: mybeego.NewMessage(0).Errmsg,
  252 +// Data: sendMsg,
  253 +// }
  254 +// msgByte, err := json.Marshal(msg)
  255 +// if err != nil {
  256 +// beego.Error(err)
  257 +// return false
  258 +// }
  259 +// key := fmt.Sprintf("%d:%d", uid, appId)
  260 +// if connI, ok := Clients.Get(key); ok {
  261 +// beego.Debug(ok)
  262 +// if conn, ok := connI.(*WebsocketConnection); ok {
  263 +// conn.Send(string(msgByte))
  264 +// return true
  265 +// }
  266 +// }
  267 +// return false
  268 +//}
@@ -2,20 +2,37 @@ package websocket @@ -2,20 +2,37 @@ package websocket
2 2
3 import ( 3 import (
4 "github.com/gorilla/websocket" 4 "github.com/gorilla/websocket"
  5 + "gitlab.fjmaimaimai.com/mmm-go/gocomm/pkg/log"
5 "gitlab.fjmaimaimai.com/mmm-go/gocomm/pkg/mybeego" 6 "gitlab.fjmaimaimai.com/mmm-go/gocomm/pkg/mybeego"
6 "html/template" 7 "html/template"
7 - "log"  
8 "net/http" 8 "net/http"
9 "strconv" 9 "strconv"
10 "testing" 10 "testing"
  11 + "time"
11 ) 12 )
12 13
13 func Test_RunWebSocket(t *testing.T) { 14 func Test_RunWebSocket(t *testing.T) {
  15 + //InitWebsocketConnmgrs(10)
14 //http.HandleFunc("/join",join) 16 //http.HandleFunc("/join",join)
15 //http.HandleFunc("/",home) 17 //http.HandleFunc("/",home)
  18 + //
  19 + //go TimerWork()
16 //log.Fatal(http.ListenAndServe(":8080",nil)) 20 //log.Fatal(http.ListenAndServe(":8080",nil))
17 } 21 }
18 22
  23 +func TimerWork(){
  24 + t :=time.NewTicker(10*time.Second)
  25 + ch :=make(chan int,1)
  26 + for {
  27 + select {
  28 + case <-t.C:
  29 + //log.Info(DefaultConnmgrs[0])
  30 + SendDataByConnmgr(0,0,time.Now())
  31 + }
  32 + }
  33 + <-ch
  34 +}
  35 +
19 var upgrader = websocket.Upgrader{} 36 var upgrader = websocket.Upgrader{}
20 37
21 func join(w http.ResponseWriter, r *http.Request) { 38 func join(w http.ResponseWriter, r *http.Request) {