123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- package rpc
- import (
- "context"
- "reflect"
- "sync/atomic"
- "go-common/library/log"
- "go-common/library/sync/errgroup"
- )
- const (
- _clientsPool = 3
- )
- // balancer interface.
- type balancer interface {
- Boardcast(context.Context, string, interface{}, interface{}) error
- Call(context.Context, string, interface{}, interface{}) error
- }
- // wrr get avaliable rpc client by wrr strategy.
- type wrr struct {
- pool []*Client
- weight int64
- server int64
- idx int64
- }
- // Boardcast broad cast to all Client.
- // NOTE: reply must be ptr.
- func (r *wrr) Boardcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) {
- if r.weight == 0 {
- log.Error("wrr get() error weight:%d server:%d idx:%d", len(r.pool), r.server, r.idx)
- return ErrNoClient
- }
- rtp := reflect.TypeOf(reply).Elem()
- g, ctx := errgroup.WithContext(ctx)
- for i := 0; i < int(r.server); i++ {
- j := i
- g.Go(func() error {
- nrp := reflect.New(rtp).Interface()
- return r.pool[j].Call(ctx, serviceMethod, args, nrp)
- })
- }
- return g.Wait()
- }
- func (r *wrr) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) {
- if r.weight == 0 {
- log.Error("wrr get() error weight:%d server:%d idx:%d", len(r.pool), r.server, r.idx)
- return ErrNoClient
- }
- v := atomic.AddInt64(&r.idx, 1)
- for i := int64(0); i < r.server; i++ {
- cli := r.pool[int((v+i)%r.weight)]
- if err = cli.Call(ctx, serviceMethod, args, reply); err != ErrNoClient {
- return
- }
- }
- return ErrNoClient
- }
- type key interface {
- Key() int64
- }
- type sharding struct {
- pool []*Client
- weight int64
- server int64
- idx int64
- }
- // Boardcast broad cast to all clients.
- // NOTE: reply must be ptr.
- func (r *sharding) Boardcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) {
- if r.weight == 0 {
- log.Error("wrr get() error weight:%d server:%d idx:%d", len(r.pool), r.server, r.idx)
- return ErrNoClient
- }
- rtp := reflect.TypeOf(reply).Elem()
- g, ctx := errgroup.WithContext(ctx)
- for i := 0; i < int(r.server); i++ {
- j := i
- g.Go(func() error {
- nrp := reflect.New(rtp).Interface()
- return r.pool[j].Call(ctx, serviceMethod, args, nrp)
- })
- }
- return g.Wait()
- }
- func (r *sharding) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) {
- if r.weight == 0 {
- log.Error("wrr get() error weight:%d server:%d idx:%d", len(r.pool), r.server, r.idx)
- return ErrNoClient
- }
- if k, ok := args.(key); ok {
- if err = r.pool[int(k.Key()%r.server)].Call(ctx, serviceMethod, args, reply); err != ErrNoClient {
- return
- }
- }
- return ErrNoClient
- }
|