proxy.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package server
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "math/rand"
  9. "net/http"
  10. "net/http/httptest"
  11. "net/http/httputil"
  12. "net/url"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. "go-common/library/log"
  17. )
  18. type BroadcastProxy struct {
  19. backend []string
  20. probePath string
  21. reverseProxy []*httputil.ReverseProxy
  22. bestClientIndex int32
  23. probeSample int
  24. wg sync.WaitGroup
  25. ctx context.Context
  26. cancel context.CancelFunc
  27. }
  28. func NewBroadcastProxy(backend []string, probePath string, maxIdleConns int, probeSample int) (*BroadcastProxy, error) {
  29. if len(backend) == 0 {
  30. return nil, errors.New("Require at least one backend")
  31. }
  32. proxy := new(BroadcastProxy)
  33. if probeSample > 0 {
  34. proxy.probeSample = probeSample
  35. } else {
  36. proxy.probeSample = 1
  37. }
  38. for _, addr := range backend {
  39. proxy.backend = append(proxy.backend, addr)
  40. proxy.probePath = probePath
  41. p := httputil.NewSingleHostReverseProxy(&url.URL{
  42. Scheme: "http",
  43. Host: addr,
  44. })
  45. p.Transport = &http.Transport{
  46. DisableKeepAlives: false,
  47. MaxIdleConns: maxIdleConns,
  48. MaxIdleConnsPerHost: maxIdleConns,
  49. }
  50. proxy.reverseProxy = append(proxy.reverseProxy, p)
  51. }
  52. proxy.ctx, proxy.cancel = context.WithCancel(context.Background())
  53. proxy.wg.Add(1)
  54. go func() {
  55. defer proxy.wg.Done()
  56. proxy.mainProbeProcess()
  57. }()
  58. return proxy, nil
  59. }
  60. func (proxy *BroadcastProxy) Close() {
  61. proxy.cancel()
  62. proxy.wg.Wait()
  63. }
  64. func (proxy *BroadcastProxy) HandleRequest(w http.ResponseWriter, r *http.Request) {
  65. t1 := time.Now()
  66. i := atomic.LoadInt32(&proxy.bestClientIndex)
  67. proxy.reverseProxy[i].ServeHTTP(w, r)
  68. t2 := time.Now()
  69. log.V(3).Info("proxy process req:%s,backend id:%d, timecost:%s", r.RequestURI, i, t2.Sub(t1).String())
  70. }
  71. func (proxy *BroadcastProxy) RequestAllBackend(method, uri string, requestBody []byte) ([]string, []error) {
  72. responseCollection := make([]string, len(proxy.reverseProxy))
  73. errCollection := make([]error, len(proxy.reverseProxy))
  74. var wg sync.WaitGroup
  75. for i := range proxy.reverseProxy {
  76. wg.Add(1)
  77. go func(index int) {
  78. defer wg.Done()
  79. req, err := http.NewRequest(method, uri, bytes.NewReader(requestBody))
  80. if err != nil {
  81. errCollection[index] = err
  82. return
  83. }
  84. httpRecorder := httptest.NewRecorder()
  85. proxy.reverseProxy[index].ServeHTTP(httpRecorder, req)
  86. resp := httpRecorder.Result()
  87. defer resp.Body.Close()
  88. if resp.StatusCode != http.StatusOK {
  89. errCollection[index] = errors.New(fmt.Sprintf("http response:%s", resp.Status))
  90. return
  91. }
  92. body, err := ioutil.ReadAll(resp.Body)
  93. if err != nil {
  94. errCollection[index] = err
  95. return
  96. }
  97. responseCollection[index] = string(body)
  98. }(i)
  99. }
  100. wg.Wait()
  101. return responseCollection, errCollection
  102. }
  103. func (proxy *BroadcastProxy) mainProbeProcess() {
  104. var wg sync.WaitGroup
  105. interval := make([]time.Duration, len(proxy.backend))
  106. for {
  107. fast := -1
  108. for i, probe := range proxy.reverseProxy {
  109. wg.Add(1)
  110. go func(p *httputil.ReverseProxy, index int) {
  111. defer wg.Done()
  112. interval[index] = proxy.unitProbeProcess(p, index)
  113. }(probe, i)
  114. }
  115. wg.Wait()
  116. for i, v := range interval {
  117. if fast < 0 {
  118. fast = i
  119. } else {
  120. if v < interval[fast] {
  121. fast = i
  122. }
  123. }
  124. }
  125. atomic.StoreInt32(&proxy.bestClientIndex, int32(fast))
  126. log.Info("[probe result]best server id:%d,addr:%s", fast, proxy.backend[fast])
  127. for i, d := range interval {
  128. log.Info("[probe log]server id:%d,addr:%s,avg time cost:%fms", i, proxy.backend[i], 1000*d.Seconds())
  129. }
  130. select {
  131. case <-time.After(time.Second):
  132. case <-proxy.ctx.Done():
  133. return
  134. }
  135. }
  136. }
  137. func (proxy *BroadcastProxy) unitProbeProcess(p *httputil.ReverseProxy, backendIndex int) time.Duration {
  138. var (
  139. wg sync.WaitGroup
  140. duration int64
  141. )
  142. for i := 0; i < proxy.probeSample; i++ {
  143. wg.Add(1)
  144. go func() {
  145. defer wg.Done()
  146. timeout := time.Second
  147. <-time.After(time.Duration(rand.Intn(proxy.probeSample)) * time.Millisecond)
  148. if timeCost, err := proxy.checkMonitorPing(p, backendIndex, timeout); err == nil {
  149. atomic.AddInt64(&duration, timeCost.Nanoseconds())
  150. } else {
  151. atomic.AddInt64(&duration, timeout.Nanoseconds())
  152. }
  153. }()
  154. }
  155. wg.Wait()
  156. return time.Duration(duration/int64(proxy.probeSample)) * time.Nanosecond
  157. }
  158. func (proxy *BroadcastProxy) checkMonitorPing(p *httputil.ReverseProxy, backendIndex int, timeout time.Duration) (time.Duration, error) {
  159. req, err := http.NewRequest("GET", proxy.probePath, nil)
  160. if err != nil {
  161. return timeout, err
  162. }
  163. ctx, cancel := context.WithTimeout(proxy.ctx, timeout)
  164. defer cancel()
  165. req = req.WithContext(ctx)
  166. recorder := httptest.NewRecorder()
  167. beginTime := time.Now()
  168. p.ServeHTTP(recorder, req)
  169. resp := recorder.Result()
  170. defer resp.Body.Close()
  171. if resp.StatusCode != http.StatusOK {
  172. log.Error("probe:server id:%d,addr:%s,send requset error:%s", backendIndex,
  173. proxy.backend[backendIndex], resp.Status)
  174. return timeout, errors.New("http response:" + resp.Status)
  175. }
  176. result, err := ioutil.ReadAll(resp.Body)
  177. if err != nil {
  178. log.Error("probe:server id:%d,addr:%s,read response error:%v", backendIndex,
  179. proxy.backend[backendIndex], err)
  180. return timeout, err
  181. }
  182. if !bytes.Equal(result, []byte("pong")) {
  183. log.Error("probe:server id:%d,addr:%s,not match response:%s", backendIndex,
  184. proxy.backend[backendIndex], result)
  185. return timeout, err
  186. }
  187. endTime := time.Now()
  188. return endTime.Sub(beginTime), nil
  189. }