grpc.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "go-common/library/log"
  8. "go-common/library/naming/discovery"
  9. "github.com/gogo/protobuf/jsonpb"
  10. "google.golang.org/grpc"
  11. "google.golang.org/grpc/encoding"
  12. )
  13. // Reply .
  14. type Reply struct {
  15. res []byte
  16. }
  17. // Reference https://jbrandhorst.com/post/grpc-json/
  18. func init() {
  19. encoding.RegisterCodec(JSON{
  20. Marshaler: jsonpb.Marshaler{
  21. EmitDefaults: true,
  22. OrigName: true,
  23. },
  24. })
  25. }
  26. func ipFromDiscovery(appid string) (ip string, err error) {
  27. d := discovery.New(nil)
  28. defer d.Close()
  29. b := d.Build(appid)
  30. defer b.Close()
  31. ch := b.Watch()
  32. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
  33. defer cancel()
  34. select {
  35. case <-ch:
  36. case <-ctx.Done():
  37. err = fmt.Errorf("查找节点超时 请检查appid是否填写正确")
  38. return
  39. }
  40. ins, ok := b.Fetch(context.Background())
  41. if !ok {
  42. err = fmt.Errorf("discovery 拉取:%s 失败", appid)
  43. return
  44. }
  45. for _, vs := range ins {
  46. for _, v := range vs {
  47. for _, addr := range v.Addrs {
  48. if strings.Contains(addr, "grpc://") {
  49. ip = strings.Replace(addr, "grpc://", "", -1)
  50. return
  51. }
  52. }
  53. }
  54. }
  55. err = fmt.Errorf("discovery 找不到服务节点:%s", appid)
  56. return
  57. }
  58. func callGrpc(addr, method string, body []byte) (res []byte, err error) {
  59. opts := []grpc.DialOption{
  60. grpc.WithInsecure(),
  61. grpc.WithDefaultCallOptions(grpc.CallContentSubtype(JSON{}.Name())),
  62. }
  63. var conn *grpc.ClientConn
  64. if !strings.Contains(addr, ":") {
  65. addr, err = ipFromDiscovery(addr)
  66. }
  67. if err != nil {
  68. return
  69. }
  70. conn, err = grpc.Dial(addr, opts...)
  71. if err != nil {
  72. return
  73. }
  74. var reply Reply
  75. log.Info("callrpc method: %s body: %s", method, body)
  76. if err = conn.Invoke(context.Background(), method, body, &reply); err != nil {
  77. return
  78. }
  79. res = reply.res
  80. return
  81. }
  82. // JSON is impl of encoding.Codec
  83. type JSON struct {
  84. jsonpb.Marshaler
  85. jsonpb.Unmarshaler
  86. }
  87. // Name is name of JSON
  88. func (j JSON) Name() string {
  89. return "json"
  90. }
  91. // Marshal is json marshal
  92. func (j JSON) Marshal(v interface{}) (out []byte, err error) {
  93. return v.([]byte), nil
  94. }
  95. // Unmarshal is json unmarshal
  96. func (j JSON) Unmarshal(data []byte, v interface{}) (err error) {
  97. v.(*Reply).res = data
  98. return nil
  99. }