package dao import ( "context" "encoding/json" "fmt" "sync" "testing" "time" "go-common/app/infra/discovery/model" "go-common/library/ecode" . "github.com/smartystreets/goconvey/convey" ) var reg = &model.ArgRegister{Appid: "main.arch.test", Hostname: "reg", RPC: "127.0.0.1:8080", Region: "shsb", Zone: "sh0001", Env: "pre", Status: 1} var regH1 = &model.ArgRegister{Appid: "main.arch.test", Hostname: "regH1", RPC: "127.0.0.1:8080", Region: "shsb", Zone: "sh0001", Env: "pre", Status: 1} var reg2 = &model.ArgRegister{Appid: "main.arch.test2", Hostname: "reg2", RPC: "127.0.0.1:8080", Region: "shsb", Zone: "sh0001", Env: "pre", Status: 1} var arg = &model.ArgRenew{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Hostname: "reg"} var cancel = &model.ArgCancel{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Hostname: "reg"} var cancel2 = &model.ArgCancel{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Hostname: "regH1"} func TestReigster(t *testing.T) { i := model.NewInstance(reg) register(t, i) } func TestDiscovery(t *testing.T) { i1 := model.NewInstance(reg) i2 := model.NewInstance(regH1) fmt.Println(_evictThreshold) r := register(t, i1, i2) Convey("test discovery", t, func() { pollArg := &model.ArgPolls{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: []string{"main.arch.test"}, Hostname: "test"} fetchArg := &model.ArgFetch{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Status: 3} info, err := r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.Appid, 0, fetchArg.Status) So(err, ShouldBeNil) So(len(info.Instances), ShouldEqual, 2) ch, _, err := r.Polls(pollArg) So(err, ShouldBeNil) apps := <-ch So(len(apps["main.arch.test"].Instances), ShouldEqual, 2) pollArg.LatestTimestamp[0] = apps["main.arch.test"].LatestTimestamp fmt.Println(apps["main.arch.test"]) r.Cancel(cancel) ch, _, err = r.Polls(pollArg) So(err, ShouldBeNil) apps = <-ch So(len(apps["main.arch.test"].Instances), ShouldEqual, 1) pollArg.LatestTimestamp[0] = apps["main.arch.test"].LatestTimestamp r.Cancel(cancel2) }) } func TestRenew(t *testing.T) { src := model.NewInstance(reg) r := register(t, src) Convey("test renew", t, func() { i, ok := r.Renew(arg) So(ok, ShouldBeTrue) So(i, ShouldResemble, src) }) } func BenchmarkRenew(b *testing.B) { var ( i *model.Instance ok bool ) b.RunParallel(func(pb *testing.PB) { for pb.Next() { r, src := benchRegister(b) if i, ok = r.Renew(arg); !ok { b.Errorf("Renew(%v)", src.Appid) } benchCompareInstance(b, src, i) } }) } func TestCancel(t *testing.T) { src := model.NewInstance(reg) r := register(t, src) Convey("test cancel", t, func() { i, ok := r.Cancel(cancel) So(ok, ShouldBeTrue) So(i, ShouldResemble, src) fetchArg := &model.ArgFetch{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Status: 3} _, err := r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.Appid, 0, fetchArg.Status) So(err, ShouldResemble, ecode.NothingFound) }) } func BenchmarkCancel(b *testing.B) { var ( i *model.Instance ok bool err error ) b.RunParallel(func(pb *testing.PB) { for pb.Next() { r, src := benchRegister(b) if i, ok = r.Cancel(cancel); !ok { b.Errorf("Cancel(%v) error", src.Appid) } benchCompareInstance(b, src, i) fetchArg := &model.ArgFetch{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Status: 3} if _, err = r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.Appid, 0, fetchArg.Status); err != ecode.NothingFound { b.Errorf("Fetch(%v) error(%v)", src.Appid, err) } } }) } func TestFetchAll(t *testing.T) { i := model.NewInstance(reg) r := register(t, i) Convey("test fetch all", t, func() { am := r.FetchAll() So(len(am), ShouldResemble, 1) }) } func BenchmarkFetchAll(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { r, _ := benchRegister(b) if am := r.FetchAll(); len(am) != 1 { b.Errorf("FetchAll() error") } } }) } func TestFetch(t *testing.T) { i := model.NewInstance(reg) r := register(t, i) Convey("test fetch", t, func() { fetchArg2 := &model.ArgFetch{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Status: 1} c, err := r.Fetch(fetchArg2.Zone, fetchArg2.Env, fetchArg2.Appid, 0, fetchArg2.Status) So(err, ShouldBeNil) So(len(c.Instances), ShouldResemble, 1) }) } func BenchmarkFetch(b *testing.B) { var ( err error c *model.InstanceInfo ) b.RunParallel(func(pb *testing.PB) { for pb.Next() { r, _ := benchRegister(b) fetchArg := &model.ArgFetch{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Status: 1} if c, err = r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.Appid, 0, fetchArg.Status); err != nil { b.Errorf("Fetch(%v) error(%v)", arg.Appid, err) } fetchArg2 := &model.ArgFetch{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Status: 2} if c, err = r.Fetch(fetchArg2.Zone, fetchArg2.Env, fetchArg2.Appid, 0, fetchArg2.Status); err != nil { b.Errorf("Fetch(%v) error(%v)", arg.Appid, err) } _ = c } }) } func TestPoll(t *testing.T) { i := model.NewInstance(reg) r := register(t, i) Convey("test poll", t, func() { pollArg := &model.ArgPolls{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: []string{"main.arch.test"}, Hostname: "csq"} ch, _, err := r.Polls(pollArg) So(err, ShouldBeNil) c := <-ch So(len(c[pollArg.Appid[0]].Instances), ShouldEqual, 1) }) } func TestPolls(t *testing.T) { i1 := model.NewInstance(reg) i2 := model.NewInstance(reg2) r := register(t, i1, i2) Convey("test polls", t, func() { pollArg := &model.ArgPolls{Region: "shsb", Zone: "sh0001", Env: "pre", LatestTimestamp: []int64{0, 0}, Appid: []string{"main.arch.test", "main.arch.test2"}, Hostname: "csq"} ch, new, err := r.Polls(pollArg) So(err, ShouldBeNil) So(new, ShouldBeTrue) c := <-ch So(len(c), ShouldResemble, 2) }) } func TestPollsParallel(t *testing.T) { i1 := model.NewInstance(reg) i2 := model.NewInstance(reg2) r := register(t, i1, i2) Convey("test polls parallel", t, func(c C) { var ( wg sync.WaitGroup ch1, ch2 chan map[string]*model.InstanceInfo new bool err error ) pollArg := &model.ArgPolls{Region: "shsb", Zone: "sh0001", Env: "pre", LatestTimestamp: []int64{time.Now().UnixNano(), time.Now().UnixNano()}, Appid: []string{"main.arch.test", "main.arch.test2"}, Hostname: "csq"} ch1, new, err = r.Polls(pollArg) c.So(err, ShouldEqual, ecode.NotModified) c.So(new, ShouldBeFalse) c.So(ch1, ShouldNotBeNil) ch2, new, err = r.Polls(pollArg) c.So(err, ShouldEqual, ecode.NotModified) c.So(new, ShouldBeFalse) c.So(ch2, ShouldNotBeNil) // wait group wg.Add(2) go func() { res := <-ch1 c.So(len(res), ShouldResemble, 1) ress, _ := json.Marshal(res) fmt.Println("chenggongle 1!!!", string(ress)) wg.Done() }() go func() { res := <-ch2 c.So(len(res), ShouldResemble, 1) ress, _ := json.Marshal(res) fmt.Println("chenggongle 2!!!", string(ress)) wg.Done() }() // re register when 1s later, make sure latest_timestamp changed time.Sleep(time.Second) h1 := model.NewInstance(regH1) r.Register(h1, 0) // wait wg.Wait() }) } func BenchmarkPoll(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { var ( err error ch chan map[string]*model.InstanceInfo c map[string]*model.InstanceInfo ) r, _ := benchRegister(b) pollArg := &model.ArgPolls{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: []string{"main.arch.test"}, Hostname: "csq"} if ch, _, err = r.Polls(pollArg); err != nil { b.Errorf("Poll(%v) error(%v)", arg.Appid, err) } if c = <-ch; len(c[pollArg.Appid[0]].Instances) != 1 { b.Errorf("Poll(%v) lenth error", arg.Appid) } } }) } func TestBroadcast(t *testing.T) { i := model.NewInstance(reg) r := register(t, i) Convey("test poll push connection", t, func() { go func() { Convey("must poll ahead of time", t, func() { time.Sleep(time.Microsecond * 5) var arg2 = &model.ArgRegister{Appid: "main.arch.test", Hostname: "go", RPC: "127.0.0.1:8080", Region: "shsb", Zone: "sh0001", Env: "pre", Status: 1} m2 := model.NewInstance(arg2) err2 := r.Register(m2, 0) So(err2, ShouldBeNil) }) }() pollArg := &model.ArgPolls{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: []string{"main.arch.test"}, LatestTimestamp: []int64{time.Now().UnixNano()}} ch, _, err := r.Polls(pollArg) So(err, ShouldResemble, ecode.NotModified) c := <-ch So(len(c[pollArg.Appid[0]].Instances), ShouldResemble, 2) So(c[pollArg.Appid[0]].ZoneInstances, ShouldNotBeNil) So(len(c[pollArg.Appid[0]].ZoneInstances["sh0001"]), ShouldResemble, 2) }) } func BenchmarkBroadcast(b *testing.B) { for i := 0; i < b.N; i++ { var ( err error err2 error ch chan map[string]*model.InstanceInfo c map[string]*model.InstanceInfo ) r, _ := benchRegister(b) go func() { time.Sleep(time.Millisecond * 1) var arg2 = &model.ArgRegister{Appid: "main.arch.test", Hostname: "go", RPC: "127.0.0.1:8080", Region: "shsb", Zone: "sh0001", Env: "pre", Status: 1} m2 := model.NewInstance(arg2) if err2 = r.Register(m2, 0); err2 != nil { b.Errorf("Reigster(%v) error(%v)", m2.Appid, err2) } }() pollArg := &model.ArgPolls{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: []string{"main.arch.test"}, LatestTimestamp: []int64{time.Now().UnixNano()}} if ch, _, err = r.Polls(pollArg); err != nil && err != ecode.NotModified { b.Errorf("Poll(%v) error(%v)", pollArg.Appid, err) } c = <-ch if len(c[pollArg.Appid[0]].Instances) != 2 { b.Errorf("Poll(%v) length error", pollArg.Appid) } if c[pollArg.Appid[0]].ZoneInstances == nil { b.Errorf("Poll(%v) zone instances nil error", pollArg.Appid) } if len(c[pollArg.Appid[0]].ZoneInstances["sh0001"]) != 2 { b.Errorf("Poll(%v) zone instances length error", pollArg.Appid) } } } func TestRegistrySet(t *testing.T) { i := model.NewInstance(reg) r := register(t, i) changes := make(map[string]string) changes["reg"] = "1" Convey("test set weight to 1", t, func() { set := &model.ArgSet{ Region: "shsb", Env: "pre", Appid: "main.arch.test", Hostname: []string{"reg"}, Metadata: []string{`{"weight":"1"}`}, } ok := r.Set(context.TODO(), set) So(ok, ShouldBeTrue) fetchArg := &model.ArgFetch{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Status: 3} c, err := r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.Appid, 0, fetchArg.Status) So(err, ShouldBeNil) So(c.Instances[0].Metadata["weight"], ShouldResemble, "1") }) } func BenchmarkSet(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { var ( c *model.InstanceInfo err error ok bool ) r, _ := benchRegister(b) set := &model.ArgSet{ Region: "shsb", Env: "pre", Appid: "main.arch.account-service", Hostname: []string{"test1"}, Status: []int64{1}, Metadata: []string{`{"weight":"1"}`}, } if ok = r.Set(context.TODO(), set); !ok { b.Errorf("SetWeight(%v) error", arg.Appid) } fetchArg := &model.ArgFetch{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Status: 3} if c, err = r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.Appid, 0, fetchArg.Status); err != nil { b.Errorf("Fetch(%v) error(%v)", fetchArg.Appid, err) } if c.Instances[0].Metadata["weight"] != "1" { b.Errorf("SetWeight(%v) change error", fetchArg.Appid) } } }) } func TestResetExp(t *testing.T) { i := model.NewInstance(reg) r := register(t, i) Convey("test ResetExp", t, func() { r.resetExp() So(r.gd.expPerMin, ShouldResemble, int64(2)) }) } func benchCompareInstance(b *testing.B, src *model.Instance, i *model.Instance) { if src.Appid != i.Appid || src.Env != i.Env || src.Hostname != i.Hostname || src.Region != i.Region { b.Errorf("instance compare error") } } func register(t *testing.T, is ...*model.Instance) (r *Registry) { Convey("test register", t, func() { r = NewRegistry() var num int for _, i := range is { err := r.Register(i, 0) So(err, ShouldBeNil) if i.Appid == "main.arch.test" { num++ } } fetchArg := &model.ArgFetch{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Status: 3} instancesInfo, err := r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.Appid, 0, fetchArg.Status) So(err, ShouldBeNil) So(len(instancesInfo.Instances), ShouldResemble, num) }) return r } func benchRegister(b *testing.B) (r *Registry, i *model.Instance) { r = NewRegistry() i = model.NewInstance(reg) if err := r.Register(i, 0); err != nil { b.Errorf("Reigster(%v) error(%v)", i.Appid, err) } return r, i } func TestEvict(t *testing.T) { Convey("test evict for protect", t, func() { r := NewRegistry() m := model.NewInstance(reg) // promise the renewtime of instance is expire m.RenewTimestamp -= 100 err := r.Register(m, 0) So(err, ShouldBeNil) // move up the statistics of heartbeat for evict r.gd.facLastMin = r.gd.facInMin r.evict() fetchArg := &model.ArgFetch{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Status: 3} c, err := r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.Appid, 0, fetchArg.Status) So(err, ShouldBeNil) // protect So(len(c.Instances), ShouldResemble, 1) }) } func TestEvict2(t *testing.T) { Convey("test evict for cancel", t, func() { r := NewRegistry() m := model.NewInstance(reg) err := r.Register(m, 0) So(err, ShouldBeNil) _, ok := r.Renew(arg) So(ok, ShouldBeTrue) // promise the renewtime of instance is expire m.RenewTimestamp -= int64(time.Second * 100) r.Register(m, 0) // move up the statistics of heartbeat for evict r.gd.facLastMin = r.gd.facInMin r.evict() fetchArg := &model.ArgFetch{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Status: 1} _, err = r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.Appid, 0, fetchArg.Status) So(err, ShouldResemble, ecode.NothingFound) }) }