client2.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. package rpc
  2. import (
  3. "context"
  4. "fmt"
  5. "net/url"
  6. "strconv"
  7. "sync/atomic"
  8. "time"
  9. "go-common/library/conf/env"
  10. "go-common/library/log"
  11. "go-common/library/naming"
  12. "go-common/library/naming/discovery"
  13. "go-common/library/net/netutil/breaker"
  14. xtime "go-common/library/time"
  15. )
  // Constants shared by the discovery-backed rpc client.
  const (
  	// scheme is the URL scheme an instance address must carry for disc to
  	// dial it; addresses with any other scheme are ignored.
  	scheme = "gorpc"
  	// _policySharding is the ClientConfig.Policy value that makes disc build
  	// a sharding balancer instead of the default wrr balancer.
  	_policySharding = "sharding"
  )
  20. // ClientConfig rpc client config.
  21. type ClientConfig struct {
  22. Policy string
  23. Zone string
  24. Cluster string
  25. Color string
  26. Timeout xtime.Duration
  27. Breaker *breaker.Config
  28. }
  29. // Client2 support for load balancing and service discovery.
  30. type Client2 struct {
  31. c *ClientConfig
  32. appID string
  33. dis naming.Resolver
  34. balancer atomic.Value
  35. }
  36. // NewDiscoveryCli new discovery client.
  37. func NewDiscoveryCli(appID string, cf *ClientConfig) (c *Client2) {
  38. if cf == nil {
  39. cf = &ClientConfig{Timeout: xtime.Duration(300 * time.Millisecond)}
  40. } else if cf.Timeout <= 0 {
  41. cf.Timeout = xtime.Duration(300 * time.Millisecond)
  42. }
  43. c = &Client2{
  44. c: cf,
  45. appID: appID,
  46. dis: discovery.Build(appID),
  47. }
  48. var pools = make(map[string]*Client)
  49. fmt.Printf("开始创建:%s 的gorpc client,等待从discovery拉取节点:%s\n", c.appID, time.Now().Format("2006-01-02 15:04:05"))
  50. event := c.dis.Watch()
  51. select {
  52. case _, ok := <-event:
  53. if ok {
  54. c.disc(pools)
  55. fmt.Printf("结束创建:%s 的gorpc client,从discovery拉取节点和创建成功:%s\n", c.appID, time.Now().Format("2006-01-02 15:04:05"))
  56. } else {
  57. panic("刚启动就从discovery拉到了关闭的event")
  58. }
  59. case <-time.After(10 * time.Second):
  60. fmt.Printf("失败创建:%s 的gorpc client,竟然从discovery拉取节点超时了:%s\n", c.appID, time.Now().Format("2006-01-02 15:04:05"))
  61. if env.DeployEnv == env.DeployEnvProd {
  62. panic("刚启动就从discovery拉节点超时,请检查配置或联系Discovery维护者")
  63. }
  64. }
  65. go c.discproc(event, pools)
  66. return
  67. }
  68. // Boardcast boardcast all rpc client.
  69. func (c *Client2) Boardcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) {
  70. var (
  71. ok bool
  72. b balancer
  73. )
  74. if b, ok = c.balancer.Load().(balancer); ok {
  75. if err = b.Boardcast(ctx, serviceMethod, args, reply); err != ErrNoClient {
  76. return
  77. }
  78. }
  79. return nil
  80. }
  81. // Call invokes the named function, waits for it to complete, and returns its error status.
  82. // this include rpc.Client.Call method, and takes a timeout.
  83. func (c *Client2) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) {
  84. var (
  85. ok bool
  86. b balancer
  87. )
  88. if b, ok = c.balancer.Load().(balancer); ok {
  89. if err = b.Call(ctx, serviceMethod, args, reply); err != ErrNoClient {
  90. return
  91. }
  92. }
  93. stats.Incr(serviceMethod, "no_rpc_client")
  94. return ErrNoClient
  95. }
  96. func (c *Client2) removeAndClose(pools, dcs map[string]*Client) {
  97. if len(dcs) == 0 {
  98. return
  99. }
  100. // after rpc timeout(double duration), close no used cliens
  101. if c.c != nil {
  102. to := c.c.Timeout
  103. time.Sleep(2 * time.Duration(to))
  104. }
  105. for key, cli := range dcs {
  106. delete(pools, key)
  107. cli.Close()
  108. }
  109. }
  110. func (c *Client2) discproc(event <-chan struct{}, pools map[string]*Client) {
  111. for {
  112. if _, ok := <-event; ok {
  113. c.disc(pools)
  114. continue
  115. }
  116. return
  117. }
  118. }
  119. func (c *Client2) disc(pools map[string]*Client) (err error) {
  120. var (
  121. weights int64
  122. key string
  123. i, j, idx int
  124. nodes map[string]struct{}
  125. dcs map[string]*Client
  126. blc balancer
  127. cli *Client
  128. cs, wcs []*Client
  129. svr *naming.Instance
  130. )
  131. insMap, ok := c.dis.Fetch(context.Background())
  132. if !ok {
  133. log.Error("discovery fetch instance fail(%s)", c.appID)
  134. return
  135. }
  136. zone := env.Zone
  137. if c.c.Zone != "" {
  138. zone = c.c.Zone
  139. }
  140. tinstance, ok := insMap[zone]
  141. if !ok {
  142. for _, value := range insMap {
  143. tinstance = value
  144. break
  145. }
  146. }
  147. instance := make([]*naming.Instance, 0, len(tinstance))
  148. for _, svr := range tinstance {
  149. nsvr := new(naming.Instance)
  150. *nsvr = *svr
  151. cluster := svr.Metadata[naming.MetaCluster]
  152. if c.c.Cluster != "" && c.c.Cluster != cluster {
  153. continue
  154. }
  155. instance = append(instance, nsvr)
  156. }
  157. log.Info("discovery get %d instances ", len(instance))
  158. if len(instance) > 0 {
  159. nodes = make(map[string]struct{}, len(instance))
  160. cs = make([]*Client, 0, len(instance))
  161. dcs = make(map[string]*Client, len(pools))
  162. svrWeights := make([]int, 0, len(instance))
  163. weights = 0
  164. for _, svr = range instance {
  165. weight, err := strconv.ParseInt(svr.Metadata["weight"], 10, 64)
  166. if err != nil {
  167. weight = 10
  168. }
  169. key = svr.Hostname
  170. nodes[key] = struct{}{}
  171. var addr string
  172. if cli, ok = pools[key]; !ok {
  173. for _, saddr := range svr.Addrs {
  174. u, err := url.Parse(saddr)
  175. if err == nil && u.Scheme == scheme {
  176. addr = u.Host
  177. }
  178. }
  179. if addr == "" {
  180. log.Warn("net/rpc: invalid rpc address(%s,%s,%v) found!", svr.AppID, svr.Hostname, svr.Addrs)
  181. continue
  182. }
  183. cli = Dial(addr, c.c.Timeout, c.c.Breaker)
  184. pools[key] = cli
  185. }
  186. svrWeights = append(svrWeights, int(weight))
  187. weights += weight // calc all weight
  188. log.Info("new cli %+v instance info %+v", addr, svr)
  189. cs = append(cs, cli)
  190. }
  191. // delete old nodes
  192. for key, cli = range pools {
  193. if _, ok = nodes[key]; !ok {
  194. log.Info("syncproc will delete node: %s", key)
  195. dcs[key] = cli
  196. }
  197. }
  198. // new client slice by weights
  199. wcs = make([]*Client, 0, weights)
  200. for i, j = 0, 0; i < int(weights); j++ { // j++ means next svr
  201. idx = j % len(cs)
  202. if svrWeights[idx] > 0 {
  203. i++ // i++ means all weights must fill wrrClis
  204. svrWeights[idx]--
  205. wcs = append(wcs, cs[idx])
  206. }
  207. }
  208. switch c.c.Policy {
  209. case _policySharding:
  210. blc = &sharding{
  211. pool: wcs,
  212. weight: int64(weights),
  213. server: int64(len(instance)),
  214. }
  215. log.Info("discovery syncproc sharding weights:%d size:%d raw:%d", weights, weights, len(instance))
  216. default:
  217. blc = &wrr{
  218. pool: wcs,
  219. weight: int64(weights),
  220. server: int64(len(instance)),
  221. }
  222. log.Info("discovery %s syncproc wrr weights:%d size:%d raw:%d", c.appID, weights, weights, len(instance))
  223. }
  224. c.balancer.Store(blc)
  225. c.removeAndClose(pools, dcs)
  226. }
  227. return
  228. }