base_test.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. package test
  2. import (
  3. "context"
  4. "io"
  5. "log"
  6. "os"
  7. "sync"
  8. "testing"
  9. "time"
  10. "go-common/library/conf/env"
  11. "go-common/library/naming"
  12. "go-common/library/net/netutil/breaker"
  13. "go-common/library/net/rpc/warden"
  14. "go-common/library/net/rpc/warden/balancer/wrr"
  15. pb "go-common/library/net/rpc/warden/proto/testproto"
  16. "go-common/library/net/rpc/warden/resolver"
  17. xtime "go-common/library/time"
  18. "google.golang.org/grpc"
  19. )
  20. type testBuilder struct {
  21. addrs []*naming.Instance
  22. }
  23. type testDiscovery struct {
  24. mu sync.Mutex
  25. b *testBuilder
  26. id string
  27. ch chan struct{}
  28. }
  29. func (b *testBuilder) Build(id string) naming.Resolver {
  30. return &testDiscovery{id: id, b: b}
  31. }
  32. func (b *testBuilder) Scheme() string {
  33. return "testbuilder"
  34. }
  35. func (d *testDiscovery) Fetch(ctx context.Context) (map[string][]*naming.Instance, bool) {
  36. d.mu.Lock()
  37. addrs := d.b.addrs
  38. d.mu.Unlock()
  39. if len(addrs) == 0 {
  40. return nil, false
  41. }
  42. return map[string][]*naming.Instance{env.Zone: addrs}, true
  43. }
  44. func (d *testDiscovery) Watch() <-chan struct{} {
  45. d.mu.Lock()
  46. defer d.mu.Unlock()
  47. if d.ch == nil {
  48. d.ch = make(chan struct{}, 1)
  49. }
  50. return d.ch
  51. }
  52. func (d *testDiscovery) Close() error {
  53. return nil
  54. }
  55. func (d *testDiscovery) Scheme() string {
  56. return "discovery"
  57. }
  58. func (d *testDiscovery) set(addrs []*naming.Instance) {
  59. d.mu.Lock()
  60. defer d.mu.Unlock()
  61. d.b.addrs = addrs
  62. select {
  63. case d.ch <- struct{}{}:
  64. default:
  65. return
  66. }
  67. }
  68. func TestMain(m *testing.M) {
  69. s1 := runServer(":18080")
  70. s2 := runServer(":18081")
  71. s3 := runServer(":18082")
  72. b = &testBuilder{}
  73. resolver.Register(b)
  74. dis = b.Build("test_app").(*testDiscovery)
  75. go func() {
  76. time.Sleep(time.Millisecond * 10)
  77. dis.set([]*naming.Instance{{
  78. Addrs: []string{"grpc://127.0.0.1:18080"},
  79. AppID: "test_app",
  80. Metadata: map[string]string{"weight": "100"},
  81. }, {
  82. Addrs: []string{"grpc://127.0.0.1:18081"},
  83. AppID: "test_app",
  84. Metadata: map[string]string{"color": "red"},
  85. }, {
  86. Addrs: []string{"grpc://127.0.0.1:18082"},
  87. AppID: "test_app",
  88. }})
  89. }()
  90. c = newClient()
  91. time.Sleep(time.Millisecond * 30)
  92. ret := m.Run()
  93. ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
  94. defer cancel()
  95. s1.Shutdown(ctx)
  96. s2.Shutdown(ctx)
  97. s3.Shutdown(ctx)
  98. os.Exit(ret)
  99. }
  100. type helloServer struct {
  101. addr string
  102. }
  103. func (s *helloServer) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
  104. return &pb.HelloReply{Message: s.addr}, nil
  105. }
  106. func (s *helloServer) StreamHello(ss pb.Greeter_StreamHelloServer) error {
  107. for i := 0; i < 3; i++ {
  108. in, err := ss.Recv()
  109. if err == io.EOF {
  110. return nil
  111. }
  112. if err != nil {
  113. return err
  114. }
  115. ret := &pb.HelloReply{Message: "Hello " + in.Name, Success: true}
  116. err = ss.Send(ret)
  117. if err != nil {
  118. return err
  119. }
  120. }
  121. return nil
  122. }
  123. func runServer(addr string) *warden.Server {
  124. server := warden.NewServer(&warden.ServerConfig{Timeout: xtime.Duration(time.Second)})
  125. pb.RegisterGreeterServer(server.Server(), &helloServer{addr: addr})
  126. go func() {
  127. err := server.Run(addr)
  128. if err != nil {
  129. panic("run server failed!" + err.Error())
  130. }
  131. }()
  132. return server
  133. }
  134. // NewClient returns a new blank Client instance with a default client interceptor.
  135. // opt can be used to add grpc dial options.
  136. func newClient() (client pb.GreeterClient) {
  137. c := warden.NewClient(&warden.ClientConfig{
  138. Dial: xtime.Duration(time.Second * 10),
  139. Timeout: xtime.Duration(time.Second * 10),
  140. Breaker: &breaker.Config{
  141. Window: xtime.Duration(3 * time.Second),
  142. Sleep: xtime.Duration(3 * time.Second),
  143. Bucket: 10,
  144. Ratio: 0.3,
  145. Request: 20,
  146. },
  147. },
  148. grpc.WithBalancerName(wrr.Name),
  149. )
  150. conn, err := c.Dial(context.Background(), "discovery://authority/111")
  151. if err != nil {
  152. log.Fatalf("can't not connect: %v", err)
  153. }
  154. client = pb.NewGreeterClient(conn)
  155. return
  156. }
  157. var b *testBuilder
  158. var dis *testDiscovery
  159. var c pb.GreeterClient
  160. func TestBalancer(t *testing.T) {
  161. testBalancerBasic(t)
  162. testBalancerFailover(t)
  163. testBalancerUpdateColor(t)
  164. testBalancerUpdateScore(t)
  165. }
  166. func testBalancerBasic(t *testing.T) {
  167. time.Sleep(time.Millisecond * 10)
  168. var idx8080 int
  169. var idx8082 int
  170. for i := 0; i < 6; i++ {
  171. resp, err := c.SayHello(context.Background(), &pb.HelloRequest{Age: 123, Name: "asdasd"})
  172. if err != nil {
  173. t.Fatalf("testBalancerBasic: say hello failed!err:=%v", err)
  174. }
  175. if resp.Message == ":18082" {
  176. idx8082++
  177. } else if resp.Message == ":18080" {
  178. idx8080++
  179. }
  180. }
  181. if idx8080 != 3 {
  182. t.Fatalf("testBalancerBasic: server 18080 response times should be 3")
  183. }
  184. if idx8082 != 3 {
  185. t.Fatalf("testBalancerBasic: server 18082 response times should be 3")
  186. }
  187. }
  188. func testBalancerFailover(t *testing.T) {
  189. dis.set([]*naming.Instance{{
  190. Addrs: []string{"grpc://127.0.0.1:18080"},
  191. AppID: "test_app",
  192. Metadata: map[string]string{"weight": "100"},
  193. }, {
  194. Addrs: []string{"grpc://127.0.0.1:18081"},
  195. AppID: "test_app",
  196. Metadata: map[string]string{"color": "red"},
  197. }})
  198. time.Sleep(time.Millisecond * 20)
  199. var idx8080 int
  200. var idx8082 int
  201. for i := 0; i < 4; i++ {
  202. resp, err := c.SayHello(context.Background(), &pb.HelloRequest{Age: 123, Name: "asdasd"})
  203. if err != nil {
  204. t.Fatalf("testBalancerFailover: say hello failed!err:=%v", err)
  205. }
  206. if resp.Message == ":18082" {
  207. idx8082++
  208. } else if resp.Message == ":18080" {
  209. idx8080++
  210. }
  211. }
  212. if idx8080 != 4 {
  213. t.Fatalf("testBalancerFailover: server 8080 response should be 4")
  214. }
  215. }
  216. func testBalancerUpdateColor(t *testing.T) {
  217. dis.set([]*naming.Instance{{
  218. Addrs: []string{"grpc://127.0.0.1:18080"},
  219. AppID: "test_app",
  220. Metadata: map[string]string{"weight": "100"},
  221. }, {
  222. Addrs: []string{"grpc://127.0.0.1:18081"},
  223. AppID: "test_app",
  224. }})
  225. time.Sleep(time.Millisecond * 30)
  226. var idx8080 int
  227. var idx8081 int
  228. for i := 0; i < 4; i++ {
  229. resp, err := c.SayHello(context.Background(), &pb.HelloRequest{Age: 123, Name: "asdasd"})
  230. if err != nil {
  231. t.Fatalf("testBalancerUpdateColor: say hello failed!err:=%v", err)
  232. }
  233. if resp.Message == ":18081" {
  234. idx8081++
  235. } else if resp.Message == ":18080" {
  236. idx8080++
  237. }
  238. }
  239. if idx8080 != 2 {
  240. t.Fatalf("testBalancerUpdateColor: server 8080 response should be 2")
  241. }
  242. if idx8081 != 2 {
  243. t.Fatalf("testBalancerUpdateColor: server 8081 response should be 2")
  244. }
  245. }
  246. func testBalancerUpdateScore(t *testing.T) {
  247. dis.set([]*naming.Instance{{
  248. Addrs: []string{"grpc://127.0.0.1:18080"},
  249. AppID: "test_app",
  250. Metadata: map[string]string{"weight": "100"},
  251. }, {
  252. Addrs: []string{"grpc://127.0.0.1:18081"},
  253. AppID: "test_app",
  254. Metadata: map[string]string{"weight": "300"},
  255. }})
  256. time.Sleep(time.Millisecond * 10)
  257. var idx8080 int
  258. var idx8081 int
  259. for i := 0; i < 4; i++ {
  260. resp, err := c.SayHello(context.Background(), &pb.HelloRequest{Age: 123, Name: "asdasd"})
  261. if err != nil {
  262. t.Fatalf("testBalancerUpdateScore: say hello failed!err:=%v", err)
  263. }
  264. if resp.Message == ":18081" {
  265. idx8081++
  266. } else if resp.Message == ":18080" {
  267. idx8080++
  268. }
  269. }
  270. if idx8080 != 1 {
  271. t.Fatalf("testBalancerUpdateScore: server 8080 response should be 2")
  272. }
  273. if idx8081 != 3 {
  274. t.Fatalf("testBalancerUpdateScore: server 8081 response should be 2")
  275. }
  276. }