operation.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. package server
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "net/http"
  8. "time"
  9. iModel "go-common/app/interface/main/broadcast/model"
  10. pb "go-common/app/service/main/broadcast/api/grpc/v1"
  11. "go-common/app/service/main/broadcast/model"
  12. "go-common/library/ecode"
  13. "go-common/library/log"
  14. "go-common/library/net/http/blademaster/render"
  15. "github.com/gogo/protobuf/proto"
  16. "github.com/gogo/protobuf/types"
  17. "google.golang.org/grpc"
  18. "google.golang.org/grpc/encoding/gzip"
  19. )
  20. const (
  21. _apiConnect = "/x/broadcast/conn/connect"
  22. _apiDisconnect = "/x/broadcast/conn/disconnect"
  23. _apiHeartbeat = "/x/broadcast/conn/heartbeat"
  24. _apiRenewOnline = "/x/broadcast/online/renew"
  25. )
  26. func (s *Server) failedForword(ctx context.Context, url string, req, reply proto.Message) (err error) {
  27. var (
  28. b []byte
  29. httpReq *http.Request
  30. res = new(render.PB)
  31. api = fmt.Sprintf("%s%s?token=%s", s.c.Broadcast.APIHost, url, s.c.Broadcast.APIToken)
  32. )
  33. if b, err = proto.Marshal(req); err != nil {
  34. return
  35. }
  36. if httpReq, err = http.NewRequest("POST", api, bytes.NewBuffer(b)); err != nil {
  37. return
  38. }
  39. if err = s.httpCli.PB(ctx, httpReq, res); err != nil {
  40. return
  41. }
  42. if int(res.Code) != ecode.OK.Code() {
  43. err = ecode.Int(int(res.Code))
  44. return
  45. }
  46. err = types.UnmarshalAny(res.Data, reply)
  47. return
  48. }
  49. // Connect .
  50. func (s *Server) Connect(ctx context.Context, p *model.Proto, cookie string) (mid int64, key, rid, platform string, accepts []int32, err error) {
  51. var (
  52. req = &pb.ConnectReq{
  53. Server: s.serverID,
  54. ServerKey: s.NextKey(),
  55. Cookie: cookie,
  56. Token: p.Body,
  57. }
  58. reply *pb.ConnectReply
  59. )
  60. if !s.c.Broadcast.Failover {
  61. reply, err = s.rpcClient.Connect(ctx, req)
  62. }
  63. if s.c.Broadcast.Failover || err != nil {
  64. reply = new(pb.ConnectReply)
  65. if err = s.failedForword(ctx, _apiConnect, req, reply); err != nil {
  66. return
  67. }
  68. }
  69. return reply.Mid, reply.Key, reply.RoomID, reply.Platform, reply.Accepts, nil
  70. }
  71. // Disconnect .
  72. func (s *Server) Disconnect(ctx context.Context, mid int64, key string) (err error) {
  73. var (
  74. req = &pb.DisconnectReq{
  75. Mid: mid,
  76. Server: s.serverID,
  77. Key: key,
  78. }
  79. reply *pb.DisconnectReply
  80. )
  81. if !s.c.Broadcast.Failover {
  82. reply, err = s.rpcClient.Disconnect(ctx, req)
  83. }
  84. if s.c.Broadcast.Failover || err != nil {
  85. reply = new(pb.DisconnectReply)
  86. if err = s.failedForword(ctx, _apiDisconnect, req, reply); err != nil {
  87. return
  88. }
  89. }
  90. return
  91. }
  92. // Heartbeat .
  93. func (s *Server) Heartbeat(ctx context.Context, mid int64, key string) (err error) {
  94. var (
  95. req = &pb.HeartbeatReq{
  96. Mid: mid,
  97. Server: s.serverID,
  98. Key: key,
  99. }
  100. reply *pb.HeartbeatReply
  101. )
  102. if !s.c.Broadcast.Failover {
  103. reply, err = s.rpcClient.Heartbeat(ctx, req)
  104. }
  105. if s.c.Broadcast.Failover || err != nil {
  106. reply = new(pb.HeartbeatReply)
  107. if err = s.failedForword(ctx, _apiHeartbeat, req, reply); err != nil {
  108. return
  109. }
  110. }
  111. return
  112. }
  113. // RenewOnline .
  114. func (s *Server) RenewOnline(ctx context.Context, serverID string, shard int32, rommCount map[string]int32) (allRoom map[string]int32, err error) {
  115. var (
  116. req = &pb.OnlineReq{
  117. Server: s.serverID,
  118. RoomCount: rommCount,
  119. Sharding: shard,
  120. }
  121. reply *pb.OnlineReply
  122. )
  123. if !s.c.Broadcast.Failover {
  124. for r := 0; r < s.c.Broadcast.OnlineRetries; r++ {
  125. if reply, err = s.rpcClient.RenewOnline(ctx, req, grpc.UseCompressor(gzip.Name)); err != nil {
  126. time.Sleep(s.backoff.Backoff(r))
  127. continue
  128. }
  129. break
  130. }
  131. }
  132. if s.c.Broadcast.Failover || err != nil {
  133. reply = new(pb.OnlineReply)
  134. if err = s.failedForword(ctx, _apiRenewOnline, req, reply); err != nil {
  135. return
  136. }
  137. }
  138. return reply.RoomCount, nil
  139. }
  140. // Report .
  141. func (s *Server) Report(mid int64, proto *model.Proto) (rp *model.Proto, err error) {
  142. var (
  143. reply *pb.ReceiveReply
  144. )
  145. if reply, err = s.rpcClient.Receive(context.Background(), &pb.ReceiveReq{
  146. Mid: mid,
  147. Proto: proto,
  148. }); err != nil {
  149. return
  150. }
  151. return reply.Proto, nil
  152. }
  153. // Operate .
  154. func (s *Server) Operate(p *model.Proto, ch *Channel, b *Bucket) error {
  155. var err error
  156. switch {
  157. case p.Operation >= model.MinBusinessOp && p.Operation <= model.MaxBusinessOp:
  158. _, err = s.Report(ch.Mid, p)
  159. if err != nil {
  160. log.Error("s.Reprot(%d,%v) error(%v)", ch.Mid, p, err)
  161. return nil
  162. }
  163. p.Body = nil
  164. // ignore down message
  165. case p.Operation == model.OpChangeRoom:
  166. p.Operation = model.OpChangeRoomReply
  167. var req iModel.ChangeRoomReq
  168. if err = json.Unmarshal(p.Body, &req); err == nil {
  169. if err = b.ChangeRoom(req.RoomID, ch); err == nil {
  170. p.Body = iModel.Message(map[string]interface{}{"room_id": string(p.Body)}, nil)
  171. }
  172. }
  173. case p.Operation == model.OpRegister:
  174. p.Operation = model.OpRegisterReply
  175. var req iModel.RegisterOpReq
  176. if err = json.Unmarshal(p.Body, &req); err == nil {
  177. if len(req.Operations) > 0 {
  178. ch.Watch(req.Operations...)
  179. p.Body = iModel.Message(map[string]interface{}{"operations": req.Operations}, nil)
  180. } else {
  181. ch.Watch(req.Operation)
  182. p.Body = iModel.Message(map[string]interface{}{"operation": req.Operation}, nil)
  183. }
  184. }
  185. case p.Operation == model.OpUnregister:
  186. p.Operation = model.OpUnregisterReply
  187. var req iModel.UnregisterOpReq
  188. if err = json.Unmarshal(p.Body, &req); err == nil {
  189. if len(req.Operations) > 0 {
  190. ch.UnWatch(req.Operations...)
  191. p.Body = iModel.Message(map[string]interface{}{"operations": req.Operations}, nil)
  192. } else {
  193. ch.UnWatch(req.Operation)
  194. p.Body = iModel.Message(map[string]interface{}{"operation": req.Operation}, nil)
  195. }
  196. }
  197. default:
  198. err = ErrOperation
  199. }
  200. if err != nil {
  201. log.Error("Operate (%+v) failed!err:=%v", p, err)
  202. p.Body = iModel.Message(nil, err)
  203. }
  204. return nil
  205. }