wrr.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. package wrr
  2. import (
  3. "context"
  4. "math"
  5. "strconv"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "go-common/library/log"
  10. nmd "go-common/library/net/metadata"
  11. wmeta "go-common/library/net/rpc/warden/metadata"
  12. "go-common/library/stat/summary"
  13. "google.golang.org/grpc"
  14. "google.golang.org/grpc/balancer"
  15. "google.golang.org/grpc/balancer/base"
  16. "google.golang.org/grpc/codes"
  17. "google.golang.org/grpc/metadata"
  18. "google.golang.org/grpc/resolver"
  19. "google.golang.org/grpc/status"
  20. )
  21. var _ base.PickerBuilder = &wrrPickerBuilder{}
  22. var _ balancer.Picker = &wrrPicker{}
  // var dwrrFeature feature.Feature = "dwrr"

// Name is the name of the weighted round-robin ("wrr") balancer; init
// registers the balancer with gRPC under this name.
const Name = "wrr"
  26. // newBuilder creates a new weighted-roundrobin balancer builder.
  27. func newBuilder() balancer.Builder {
  28. return base.NewBalancerBuilder(Name, &wrrPickerBuilder{})
  29. }
  30. func init() {
  31. //feature.DefaultGate.Add(map[feature.Feature]feature.Spec{
  32. // dwrrFeature: {Default: false},
  33. //})
  34. balancer.Register(newBuilder())
  35. }
  // serverInfo holds load figures reported by a server. The Stats interceptor
// writes them from response trailers and the picker reads them when it
// recomputes weights. Both fields are only touched through sync/atomic, so
// they are plain 64-bit words; their alignment on 32-bit platforms depends on
// where serverInfo is placed inside its parent struct.
type serverInfo struct {
	cpu     int64  // server CPU usage (Build seeds it with 500)
	success uint64 // server success ratio, stored as math.Float64bits
}
  40. type subConn struct {
  41. conn balancer.SubConn
  42. addr resolver.Address
  43. meta wmeta.MD
  44. err summary.Summary
  45. lantency summary.Summary
  46. si serverInfo
  47. // effective weight
  48. ewt int64
  49. // current weight
  50. cwt int64
  51. // last score
  52. score float64
  53. }
  // statistics is one per-address entry of the weight-update log line. The
// entries are printed with %+v, so field names and their order are part of
// the log output.
type statistics struct {
	addr     string  // SubConn address
	ewt      int64   // effective weight assigned by the update
	cs, ss   float64 // client-side and server-reported success ratios
	lantency float64 // mean latency, in units of 100µs
	cpu      float64 // server-reported CPU usage
	req      int64   // requests counted by the error summary
}
  64. // Stats is grpc Interceptor for client to collect server stats
  65. func Stats() grpc.UnaryClientInterceptor {
  66. return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) {
  67. var (
  68. trailer metadata.MD
  69. md nmd.MD
  70. ok bool
  71. )
  72. if md, ok = nmd.FromContext(ctx); !ok {
  73. md = nmd.MD{}
  74. } else {
  75. md = md.Copy()
  76. }
  77. ctx = nmd.NewContext(ctx, md)
  78. opts = append(opts, grpc.Trailer(&trailer))
  79. err = invoker(ctx, method, req, reply, cc, opts...)
  80. conn, ok := md["conn"].(*subConn)
  81. if !ok {
  82. return
  83. }
  84. if strs, ok := trailer[nmd.CPUUsage]; ok {
  85. if cpu, err2 := strconv.ParseInt(strs[0], 10, 64); err2 == nil && cpu > 0 {
  86. atomic.StoreInt64(&conn.si.cpu, cpu)
  87. }
  88. }
  89. var reqs, errs int64
  90. if strs, ok := trailer[nmd.Requests]; ok {
  91. reqs, _ = strconv.ParseInt(strs[0], 10, 64)
  92. }
  93. if strs, ok := trailer[nmd.Errors]; ok {
  94. errs, _ = strconv.ParseInt(strs[0], 10, 64)
  95. }
  96. if reqs > 0 && reqs >= errs {
  97. success := float64(reqs-errs) / float64(reqs)
  98. if success == 0 {
  99. success = 0.1
  100. }
  101. atomic.StoreUint64(&conn.si.success, math.Float64bits(success))
  102. }
  103. return
  104. }
  105. }
  106. type wrrPickerBuilder struct{}
  107. func (*wrrPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
  108. p := &wrrPicker{
  109. colors: make(map[string]*wrrPicker),
  110. }
  111. for addr, sc := range readySCs {
  112. meta, ok := addr.Metadata.(wmeta.MD)
  113. if !ok {
  114. meta = wmeta.MD{
  115. Weight: 10,
  116. }
  117. }
  118. subc := &subConn{
  119. conn: sc,
  120. addr: addr,
  121. meta: meta,
  122. ewt: meta.Weight,
  123. score: -1,
  124. err: summary.New(time.Second, 10),
  125. lantency: summary.New(time.Second, 10),
  126. si: serverInfo{cpu: 500, success: math.Float64bits(1)},
  127. }
  128. if meta.Color == "" {
  129. p.subConns = append(p.subConns, subc)
  130. continue
  131. }
  132. // if color not empty, use color picker
  133. cp, ok := p.colors[meta.Color]
  134. if !ok {
  135. cp = &wrrPicker{}
  136. p.colors[meta.Color] = cp
  137. }
  138. cp.subConns = append(cp.subConns, subc)
  139. }
  140. return p
  141. }
  142. type wrrPicker struct {
  143. // subConns is the snapshot of the weighted-roundrobin balancer when this picker was
  144. // created. The slice is immutable. Each Get() will do a round robin
  145. // selection from it and return the selected SubConn.
  146. subConns []*subConn
  147. colors map[string]*wrrPicker
  148. updateAt int64
  149. mu sync.Mutex
  150. }
  151. func (p *wrrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
  152. if color := nmd.String(ctx, nmd.Color); color != "" {
  153. if cp, ok := p.colors[color]; ok {
  154. return cp.pick(ctx, opts)
  155. }
  156. }
  157. return p.pick(ctx, opts)
  158. }
  159. func (p *wrrPicker) pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
  160. var (
  161. conn *subConn
  162. totalWeight int64
  163. )
  164. if len(p.subConns) <= 0 {
  165. return nil, nil, balancer.ErrNoSubConnAvailable
  166. }
  167. p.mu.Lock()
  168. // nginx wrr load balancing algorithm: http://blog.csdn.net/zhangskd/article/details/50194069
  169. for _, sc := range p.subConns {
  170. totalWeight += sc.ewt
  171. sc.cwt += sc.ewt
  172. if conn == nil || conn.cwt < sc.cwt {
  173. conn = sc
  174. }
  175. }
  176. conn.cwt -= totalWeight
  177. p.mu.Unlock()
  178. start := time.Now()
  179. if cmd, ok := nmd.FromContext(ctx); ok {
  180. cmd["conn"] = conn
  181. }
  182. //if !feature.DefaultGate.Enabled(dwrrFeature) {
  183. // return conn.conn, nil, nil
  184. //}
  185. return conn.conn, func(di balancer.DoneInfo) {
  186. ev := int64(0) // error value ,if error set 1
  187. if di.Err != nil {
  188. if st, ok := status.FromError(di.Err); ok {
  189. // only counter the local grpc error, ignore any business error
  190. if st.Code() != codes.Unknown && st.Code() != codes.OK {
  191. ev = 1
  192. }
  193. }
  194. }
  195. conn.err.Add(ev)
  196. now := time.Now()
  197. conn.lantency.Add(now.Sub(start).Nanoseconds() / 1e5)
  198. u := atomic.LoadInt64(&p.updateAt)
  199. if now.UnixNano()-u < int64(time.Second) {
  200. return
  201. }
  202. if !atomic.CompareAndSwapInt64(&p.updateAt, u, now.UnixNano()) {
  203. return
  204. }
  205. var (
  206. stats = make([]statistics, len(p.subConns))
  207. count int
  208. total float64
  209. )
  210. for i, conn := range p.subConns {
  211. cpu := float64(atomic.LoadInt64(&conn.si.cpu))
  212. ss := math.Float64frombits(atomic.LoadUint64(&conn.si.success))
  213. errc, req := conn.err.Value()
  214. lagv, lagc := conn.lantency.Value()
  215. if req > 0 && lagc > 0 && lagv > 0 {
  216. // client-side success ratio
  217. cs := 1 - (float64(errc) / float64(req))
  218. if cs <= 0 {
  219. cs = 0.1
  220. } else if cs <= 0.2 && req <= 5 {
  221. cs = 0.2
  222. }
  223. lag := float64(lagv) / float64(lagc)
  224. conn.score = math.Sqrt((cs * ss * ss * 1e9) / (lag * cpu))
  225. stats[i] = statistics{cs: cs, ss: ss, lantency: lag, cpu: cpu, req: req}
  226. }
  227. stats[i].addr = conn.addr.Addr
  228. if conn.score > 0 {
  229. total += conn.score
  230. count++
  231. }
  232. }
  233. // count must be greater than 1,otherwise will lead ewt to 0
  234. if count < 2 {
  235. return
  236. }
  237. avgscore := total / float64(count)
  238. p.mu.Lock()
  239. for i, conn := range p.subConns {
  240. if conn.score <= 0 {
  241. conn.score = avgscore
  242. }
  243. conn.ewt = int64(conn.score * float64(conn.meta.Weight))
  244. stats[i].ewt = conn.ewt
  245. }
  246. p.mu.Unlock()
  247. log.Info("warden wrr(%s): %+v", conn.addr.ServerName, stats)
  248. }, nil
  249. }