http.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package server
  2. import (
  3. "context"
  4. "encoding/json"
  5. "go-common/library/log"
  6. "io/ioutil"
  7. "net/http"
  8. "strconv"
  9. "sync"
  10. "time"
  11. )
  12. type BroadcastService struct {
  13. wg sync.WaitGroup
  14. server *http.Server
  15. proxy *BroadcastProxy
  16. dispatch *CometDispatcher
  17. }
  18. func NewBroadcastService(addr string, proxy *BroadcastProxy, dispatch *CometDispatcher) (*BroadcastService, error) {
  19. service := &BroadcastService{
  20. proxy: proxy,
  21. dispatch: dispatch,
  22. }
  23. service.wg.Add(1)
  24. go func() {
  25. defer service.wg.Done()
  26. service.httpServerProcess(addr)
  27. }()
  28. return service, nil
  29. }
  30. func (service *BroadcastService) Close() {
  31. service.server.Shutdown(context.Background())
  32. }
  33. func (service *BroadcastService) httpServerProcess(addr string) {
  34. mux := http.NewServeMux()
  35. mux.HandleFunc("/", service.Proxy)
  36. mux.HandleFunc("/monitor/ping", service.Ping)
  37. mux.HandleFunc("/dm/x/internal/v1/dispatch", service.Dispatch)
  38. mux.HandleFunc("/dm/x/internal/v1/set_angry_value", service.SetAngryValue)
  39. service.server = &http.Server{Addr: addr, Handler: mux}
  40. service.server.SetKeepAlivesEnabled(true)
  41. if err := service.server.ListenAndServe(); err != nil {
  42. if err != http.ErrServerClosed {
  43. panic(err)
  44. }
  45. }
  46. }
  47. func writeJsonResult(w http.ResponseWriter, r *http.Request, begin time.Time, v interface{}) {
  48. data, err := json.Marshal(v)
  49. if err != nil {
  50. log.Error("[Http] write result json.Marshal:%v error:%v", v, err)
  51. return
  52. }
  53. if _, err := w.Write([]byte(data)); err != nil {
  54. log.Error("[Http] write result socket error:%v", err)
  55. return
  56. }
  57. end := time.Now()
  58. log.Info("request %s, response:%s, time cost:%s", r.RequestURI, data, end.Sub(begin).String())
  59. }
  60. func (service *BroadcastService) Proxy(w http.ResponseWriter, r *http.Request) {
  61. service.proxy.HandleRequest(w, r)
  62. }
  63. func (service *BroadcastService) Ping(w http.ResponseWriter, r *http.Request) {
  64. w.WriteHeader(http.StatusOK)
  65. w.Write([]byte("pong"))
  66. }
  67. func (service *BroadcastService) Dispatch(w http.ResponseWriter, r *http.Request) {
  68. if r.Method != "GET" {
  69. http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  70. return
  71. }
  72. var result struct {
  73. Code int `json:"code"`
  74. Data struct {
  75. DanmakuServer []string `json:"dm_server"`
  76. DanmakuHost []string `json:"dm_host"`
  77. } `json:"data"`
  78. }
  79. defer writeJsonResult(w, r, time.Now(), &result)
  80. ip := r.URL.Query().Get("ip")
  81. uid, _ := strconv.ParseInt(r.URL.Query().Get("uid"), 10, 64)
  82. result.Code = 0
  83. result.Data.DanmakuServer, result.Data.DanmakuHost = service.dispatch.Dispatch(ip, uid)
  84. }
  85. func (service *BroadcastService) SetAngryValue(w http.ResponseWriter, r *http.Request) {
  86. if r.Method != "POST" {
  87. w.WriteHeader(http.StatusMethodNotAllowed)
  88. return
  89. }
  90. requestBody, err := ioutil.ReadAll(r.Body)
  91. if err != nil {
  92. w.WriteHeader(http.StatusInternalServerError)
  93. return
  94. }
  95. // 气人值的workaround
  96. //go func() {
  97. responseCollection, errCollection := service.proxy.RequestAllBackend(r.Method, "/dm/1/num/change", requestBody)
  98. for i := range responseCollection {
  99. if errCollection[i] != nil {
  100. log.Error("SetAngryValue server:%d error:%+v", i, errCollection[i])
  101. } else {
  102. log.Info("SetAngryValue server:%d result:%s", i, responseCollection[i])
  103. }
  104. }
  105. //}()
  106. w.WriteHeader(http.StatusOK)
  107. response, _ := json.Marshal(map[string]interface{}{"ret": 1})
  108. w.Write(response)
  109. return
  110. }
  111. func (service *BroadcastService) SetAngryValueV2(w http.ResponseWriter, r *http.Request) {
  112. if r.Method != "POST" {
  113. w.WriteHeader(http.StatusMethodNotAllowed)
  114. return
  115. }
  116. requestBody, err := ioutil.ReadAll(r.Body)
  117. if err != nil {
  118. w.WriteHeader(http.StatusInternalServerError)
  119. return
  120. }
  121. //go func() {
  122. responseCollection, errCollection := service.proxy.RequestAllBackend(r.Method,
  123. "/dm/x/internal/v2/set_angry_value", requestBody)
  124. for i := range responseCollection {
  125. if errCollection[i] != nil {
  126. log.Error("SetAngryValueV2 server:%d error:%+v", i, errCollection[i])
  127. w.WriteHeader(http.StatusServiceUnavailable)
  128. response, _ := json.Marshal(map[string]interface{}{"code": -1, "msg": errCollection[i].Error()})
  129. w.Write(response)
  130. return
  131. }
  132. var result struct {
  133. Code int `json:"code"`
  134. Message string `json:"msg"`
  135. }
  136. if err := json.Unmarshal([]byte(responseCollection[i]), &result); err != nil {
  137. log.Error("SetAngryValueV2 server:%d response:%s", i, responseCollection[i])
  138. w.WriteHeader(http.StatusServiceUnavailable)
  139. response, _ := json.Marshal(map[string]interface{}{"code": -2, "msg": responseCollection[i]})
  140. w.Write(response)
  141. return
  142. }
  143. if result.Code != 0 {
  144. w.WriteHeader(http.StatusOK)
  145. w.Write([]byte(responseCollection[i]))
  146. return
  147. }
  148. }
  149. //}()
  150. w.WriteHeader(http.StatusOK)
  151. response, _ := json.Marshal(map[string]interface{}{"code": 0})
  152. w.Write(response)
  153. return
  154. }