appres.go 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package push
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "time"
  7. appres "go-common/app/interface/main/app-resource/api/v1"
  8. "go-common/library/log"
  9. "go-common/library/naming/discovery"
  10. "go-common/library/net/rpc/warden"
  11. )
  12. func errlog(step string, err error) {
  13. log.Error("CallPush Step [%s] Err [%v]", step, err)
  14. }
  15. // build grpc client for app-resource
  16. func (d *Dao) grpcClient(addr string) (appres.AppResourceClient, error) {
  17. client := warden.NewClient(d.c.AppresClient)
  18. cc, err := client.Dial(context.Background(), addr)
  19. if err != nil {
  20. return nil, err
  21. }
  22. return appres.NewAppResourceClient(cc), nil
  23. }
  24. // CallRefresh picks ip addrs from discovery, and call grpc method
  25. func (d *Dao) CallRefresh(ctx context.Context) (err error) {
  26. var (
  27. addrs []string
  28. client appres.AppResourceClient
  29. arg = &appres.NoArgRequest{}
  30. succCall int
  31. )
  32. if addrs, err = d.pickAddrs(); err != nil {
  33. errlog("pickAddrs", err)
  34. return
  35. }
  36. // loop grpc call, ignore error
  37. for _, addr := range addrs {
  38. if client, err = d.grpcClient(addr); err != nil {
  39. errlog(fmt.Sprintf("grpcDial Addr [%s]", addr), err)
  40. continue
  41. }
  42. if _, err = client.ModuleUpdateCache(ctx, arg); err != nil {
  43. errlog(fmt.Sprintf("grpcCall Addr [%s] ", addr), err)
  44. continue
  45. }
  46. succCall++
  47. }
  48. if succCall == 0 { // addrs must be greater than 0, if succ call is zero, return error
  49. return fmt.Errorf("CallRefresh Addrs [%d], Zero Succ!", len(addrs))
  50. }
  51. log.Info("CallPush Refresh Succ [%d], Total [%d]", succCall, len(addrs))
  52. err = nil
  53. return
  54. }
  55. // pick all the app-resource grpc instances from discovery
  56. func (d *Dao) pickAddrs() (grpcAddrs []string, err error) {
  57. dis := discovery.New(nil)
  58. defer dis.Close()
  59. b := dis.Build(d.c.Cfg.Grpc.ApiAppID)
  60. defer b.Close()
  61. ch := b.Watch()
  62. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
  63. defer cancel()
  64. select {
  65. case <-ch:
  66. case <-ctx.Done():
  67. err = fmt.Errorf("查找节点超时 请检查appid是否填写正确")
  68. return
  69. }
  70. ins, ok := b.Fetch(context.Background())
  71. if !ok {
  72. err = fmt.Errorf("discovery 拉取失败")
  73. return
  74. }
  75. for _, vs := range ins {
  76. for _, v := range vs {
  77. for _, addr := range v.Addrs {
  78. if strings.Contains(addr, "grpc://") {
  79. ip := strings.Replace(addr, "grpc://", "", -1)
  80. grpcAddrs = append(grpcAddrs, ip)
  81. }
  82. }
  83. }
  84. }
  85. if len(grpcAddrs) == 0 {
  86. err = fmt.Errorf("discovery 找不到服务节点")
  87. }
  88. return
  89. }