123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298 |
- package test
- import (
- "context"
- "io"
- "log"
- "os"
- "sync"
- "testing"
- "time"
- "go-common/library/conf/env"
- "go-common/library/naming"
- "go-common/library/net/netutil/breaker"
- "go-common/library/net/rpc/warden"
- "go-common/library/net/rpc/warden/balancer/wrr"
- pb "go-common/library/net/rpc/warden/proto/testproto"
- "go-common/library/net/rpc/warden/resolver"
- xtime "go-common/library/time"
- "google.golang.org/grpc"
- )
- type testBuilder struct {
- addrs []*naming.Instance
- }
- type testDiscovery struct {
- mu sync.Mutex
- b *testBuilder
- id string
- ch chan struct{}
- }
- func (b *testBuilder) Build(id string) naming.Resolver {
- return &testDiscovery{id: id, b: b}
- }
- func (b *testBuilder) Scheme() string {
- return "testbuilder"
- }
- func (d *testDiscovery) Fetch(ctx context.Context) (map[string][]*naming.Instance, bool) {
- d.mu.Lock()
- addrs := d.b.addrs
- d.mu.Unlock()
- if len(addrs) == 0 {
- return nil, false
- }
- return map[string][]*naming.Instance{env.Zone: addrs}, true
- }
- func (d *testDiscovery) Watch() <-chan struct{} {
- d.mu.Lock()
- defer d.mu.Unlock()
- if d.ch == nil {
- d.ch = make(chan struct{}, 1)
- }
- return d.ch
- }
- func (d *testDiscovery) Close() error {
- return nil
- }
- func (d *testDiscovery) Scheme() string {
- return "discovery"
- }
- func (d *testDiscovery) set(addrs []*naming.Instance) {
- d.mu.Lock()
- defer d.mu.Unlock()
- d.b.addrs = addrs
- select {
- case d.ch <- struct{}{}:
- default:
- return
- }
- }
- func TestMain(m *testing.M) {
- s1 := runServer(":18080")
- s2 := runServer(":18081")
- s3 := runServer(":18082")
- b = &testBuilder{}
- resolver.Register(b)
- dis = b.Build("test_app").(*testDiscovery)
- go func() {
- time.Sleep(time.Millisecond * 10)
- dis.set([]*naming.Instance{{
- Addrs: []string{"grpc://127.0.0.1:18080"},
- AppID: "test_app",
- Metadata: map[string]string{"weight": "100"},
- }, {
- Addrs: []string{"grpc://127.0.0.1:18081"},
- AppID: "test_app",
- Metadata: map[string]string{"color": "red"},
- }, {
- Addrs: []string{"grpc://127.0.0.1:18082"},
- AppID: "test_app",
- }})
- }()
- c = newClient()
- time.Sleep(time.Millisecond * 30)
- ret := m.Run()
- ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
- defer cancel()
- s1.Shutdown(ctx)
- s2.Shutdown(ctx)
- s3.Shutdown(ctx)
- os.Exit(ret)
- }
// helloServer is the greeter service implementation used by the test servers.
type helloServer struct {
	// addr is the listen address; SayHello echoes it back so tests can tell
	// which server handled a call.
	addr string
}
- func (s *helloServer) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
- return &pb.HelloReply{Message: s.addr}, nil
- }
- func (s *helloServer) StreamHello(ss pb.Greeter_StreamHelloServer) error {
- for i := 0; i < 3; i++ {
- in, err := ss.Recv()
- if err == io.EOF {
- return nil
- }
- if err != nil {
- return err
- }
- ret := &pb.HelloReply{Message: "Hello " + in.Name, Success: true}
- err = ss.Send(ret)
- if err != nil {
- return err
- }
- }
- return nil
- }
- func runServer(addr string) *warden.Server {
- server := warden.NewServer(&warden.ServerConfig{Timeout: xtime.Duration(time.Second)})
- pb.RegisterGreeterServer(server.Server(), &helloServer{addr: addr})
- go func() {
- err := server.Run(addr)
- if err != nil {
- panic("run server failed!" + err.Error())
- }
- }()
- return server
- }
- // NewClient returns a new blank Client instance with a default client interceptor.
- // opt can be used to add grpc dial options.
- func newClient() (client pb.GreeterClient) {
- c := warden.NewClient(&warden.ClientConfig{
- Dial: xtime.Duration(time.Second * 10),
- Timeout: xtime.Duration(time.Second * 10),
- Breaker: &breaker.Config{
- Window: xtime.Duration(3 * time.Second),
- Sleep: xtime.Duration(3 * time.Second),
- Bucket: 10,
- Ratio: 0.3,
- Request: 20,
- },
- },
- grpc.WithBalancerName(wrr.Name),
- )
- conn, err := c.Dial(context.Background(), "discovery://authority/111")
- if err != nil {
- log.Fatalf("can't not connect: %v", err)
- }
- client = pb.NewGreeterClient(conn)
- return
- }
- var b *testBuilder
- var dis *testDiscovery
- var c pb.GreeterClient
- func TestBalancer(t *testing.T) {
- testBalancerBasic(t)
- testBalancerFailover(t)
- testBalancerUpdateColor(t)
- testBalancerUpdateScore(t)
- }
- func testBalancerBasic(t *testing.T) {
- time.Sleep(time.Millisecond * 10)
- var idx8080 int
- var idx8082 int
- for i := 0; i < 6; i++ {
- resp, err := c.SayHello(context.Background(), &pb.HelloRequest{Age: 123, Name: "asdasd"})
- if err != nil {
- t.Fatalf("testBalancerBasic: say hello failed!err:=%v", err)
- }
- if resp.Message == ":18082" {
- idx8082++
- } else if resp.Message == ":18080" {
- idx8080++
- }
- }
- if idx8080 != 3 {
- t.Fatalf("testBalancerBasic: server 18080 response times should be 3")
- }
- if idx8082 != 3 {
- t.Fatalf("testBalancerBasic: server 18082 response times should be 3")
- }
- }
- func testBalancerFailover(t *testing.T) {
- dis.set([]*naming.Instance{{
- Addrs: []string{"grpc://127.0.0.1:18080"},
- AppID: "test_app",
- Metadata: map[string]string{"weight": "100"},
- }, {
- Addrs: []string{"grpc://127.0.0.1:18081"},
- AppID: "test_app",
- Metadata: map[string]string{"color": "red"},
- }})
- time.Sleep(time.Millisecond * 20)
- var idx8080 int
- var idx8082 int
- for i := 0; i < 4; i++ {
- resp, err := c.SayHello(context.Background(), &pb.HelloRequest{Age: 123, Name: "asdasd"})
- if err != nil {
- t.Fatalf("testBalancerFailover: say hello failed!err:=%v", err)
- }
- if resp.Message == ":18082" {
- idx8082++
- } else if resp.Message == ":18080" {
- idx8080++
- }
- }
- if idx8080 != 4 {
- t.Fatalf("testBalancerFailover: server 8080 response should be 4")
- }
- }
- func testBalancerUpdateColor(t *testing.T) {
- dis.set([]*naming.Instance{{
- Addrs: []string{"grpc://127.0.0.1:18080"},
- AppID: "test_app",
- Metadata: map[string]string{"weight": "100"},
- }, {
- Addrs: []string{"grpc://127.0.0.1:18081"},
- AppID: "test_app",
- }})
- time.Sleep(time.Millisecond * 30)
- var idx8080 int
- var idx8081 int
- for i := 0; i < 4; i++ {
- resp, err := c.SayHello(context.Background(), &pb.HelloRequest{Age: 123, Name: "asdasd"})
- if err != nil {
- t.Fatalf("testBalancerUpdateColor: say hello failed!err:=%v", err)
- }
- if resp.Message == ":18081" {
- idx8081++
- } else if resp.Message == ":18080" {
- idx8080++
- }
- }
- if idx8080 != 2 {
- t.Fatalf("testBalancerUpdateColor: server 8080 response should be 2")
- }
- if idx8081 != 2 {
- t.Fatalf("testBalancerUpdateColor: server 8081 response should be 2")
- }
- }
- func testBalancerUpdateScore(t *testing.T) {
- dis.set([]*naming.Instance{{
- Addrs: []string{"grpc://127.0.0.1:18080"},
- AppID: "test_app",
- Metadata: map[string]string{"weight": "100"},
- }, {
- Addrs: []string{"grpc://127.0.0.1:18081"},
- AppID: "test_app",
- Metadata: map[string]string{"weight": "300"},
- }})
- time.Sleep(time.Millisecond * 10)
- var idx8080 int
- var idx8081 int
- for i := 0; i < 4; i++ {
- resp, err := c.SayHello(context.Background(), &pb.HelloRequest{Age: 123, Name: "asdasd"})
- if err != nil {
- t.Fatalf("testBalancerUpdateScore: say hello failed!err:=%v", err)
- }
- if resp.Message == ":18081" {
- idx8081++
- } else if resp.Message == ":18080" {
- idx8080++
- }
- }
- if idx8080 != 1 {
- t.Fatalf("testBalancerUpdateScore: server 8080 response should be 2")
- }
- if idx8081 != 3 {
- t.Fatalf("testBalancerUpdateScore: server 8081 response should be 2")
- }
- }
|