registry_test.go 14 KB


  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "sync"
  7. "testing"
  8. "time"
  9. "go-common/app/infra/discovery/model"
  10. "go-common/library/ecode"
  11. . "github.com/smartystreets/goconvey/convey"
  12. )
  13. var reg = &model.ArgRegister{Appid: "main.arch.test", Hostname: "reg", RPC: "127.0.0.1:8080", Region: "shsb", Zone: "sh0001", Env: "pre", Status: 1}
  14. var regH1 = &model.ArgRegister{Appid: "main.arch.test", Hostname: "regH1", RPC: "127.0.0.1:8080", Region: "shsb", Zone: "sh0001", Env: "pre", Status: 1}
  15. var reg2 = &model.ArgRegister{Appid: "main.arch.test2", Hostname: "reg2", RPC: "127.0.0.1:8080", Region: "shsb", Zone: "sh0001", Env: "pre", Status: 1}
  16. var arg = &model.ArgRenew{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Hostname: "reg"}
  17. var cancel = &model.ArgCancel{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Hostname: "reg"}
  18. var cancel2 = &model.ArgCancel{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Hostname: "regH1"}
  19. func TestReigster(t *testing.T) {
  20. i := model.NewInstance(reg)
  21. register(t, i)
  22. }
  23. func TestDiscovery(t *testing.T) {
  24. i1 := model.NewInstance(reg)
  25. i2 := model.NewInstance(regH1)
  26. fmt.Println(_evictThreshold)
  27. r := register(t, i1, i2)
  28. Convey("test discovery", t, func() {
  29. pollArg := &model.ArgPolls{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: []string{"main.arch.test"}, Hostname: "test"}
  30. fetchArg := &model.ArgFetch{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Status: 3}
  31. info, err := r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.Appid, 0, fetchArg.Status)
  32. So(err, ShouldBeNil)
  33. So(len(info.Instances), ShouldEqual, 2)
  34. ch, _, err := r.Polls(pollArg)
  35. So(err, ShouldBeNil)
  36. apps := <-ch
  37. So(len(apps["main.arch.test"].Instances), ShouldEqual, 2)
  38. pollArg.LatestTimestamp[0] = apps["main.arch.test"].LatestTimestamp
  39. fmt.Println(apps["main.arch.test"])
  40. r.Cancel(cancel)
  41. ch, _, err = r.Polls(pollArg)
  42. So(err, ShouldBeNil)
  43. apps = <-ch
  44. So(len(apps["main.arch.test"].Instances), ShouldEqual, 1)
  45. pollArg.LatestTimestamp[0] = apps["main.arch.test"].LatestTimestamp
  46. r.Cancel(cancel2)
  47. })
  48. }
  49. func TestRenew(t *testing.T) {
  50. src := model.NewInstance(reg)
  51. r := register(t, src)
  52. Convey("test renew", t, func() {
  53. i, ok := r.Renew(arg)
  54. So(ok, ShouldBeTrue)
  55. So(i, ShouldResemble, src)
  56. })
  57. }
  58. func BenchmarkRenew(b *testing.B) {
  59. var (
  60. i *model.Instance
  61. ok bool
  62. )
  63. b.RunParallel(func(pb *testing.PB) {
  64. for pb.Next() {
  65. r, src := benchRegister(b)
  66. if i, ok = r.Renew(arg); !ok {
  67. b.Errorf("Renew(%v)", src.Appid)
  68. }
  69. benchCompareInstance(b, src, i)
  70. }
  71. })
  72. }
  73. func TestCancel(t *testing.T) {
  74. src := model.NewInstance(reg)
  75. r := register(t, src)
  76. Convey("test cancel", t, func() {
  77. i, ok := r.Cancel(cancel)
  78. So(ok, ShouldBeTrue)
  79. So(i, ShouldResemble, src)
  80. fetchArg := &model.ArgFetch{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Status: 3}
  81. _, err := r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.Appid, 0, fetchArg.Status)
  82. So(err, ShouldResemble, ecode.NothingFound)
  83. })
  84. }
  85. func BenchmarkCancel(b *testing.B) {
  86. var (
  87. i *model.Instance
  88. ok bool
  89. err error
  90. )
  91. b.RunParallel(func(pb *testing.PB) {
  92. for pb.Next() {
  93. r, src := benchRegister(b)
  94. if i, ok = r.Cancel(cancel); !ok {
  95. b.Errorf("Cancel(%v) error", src.Appid)
  96. }
  97. benchCompareInstance(b, src, i)
  98. fetchArg := &model.ArgFetch{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Status: 3}
  99. if _, err = r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.Appid, 0, fetchArg.Status); err != ecode.NothingFound {
  100. b.Errorf("Fetch(%v) error(%v)", src.Appid, err)
  101. }
  102. }
  103. })
  104. }
  105. func TestFetchAll(t *testing.T) {
  106. i := model.NewInstance(reg)
  107. r := register(t, i)
  108. Convey("test fetch all", t, func() {
  109. am := r.FetchAll()
  110. So(len(am), ShouldResemble, 1)
  111. })
  112. }
  113. func BenchmarkFetchAll(b *testing.B) {
  114. b.RunParallel(func(pb *testing.PB) {
  115. for pb.Next() {
  116. r, _ := benchRegister(b)
  117. if am := r.FetchAll(); len(am) != 1 {
  118. b.Errorf("FetchAll() error")
  119. }
  120. }
  121. })
  122. }
  123. func TestFetch(t *testing.T) {
  124. i := model.NewInstance(reg)
  125. r := register(t, i)
  126. Convey("test fetch", t, func() {
  127. fetchArg2 := &model.ArgFetch{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Status: 1}
  128. c, err := r.Fetch(fetchArg2.Zone, fetchArg2.Env, fetchArg2.Appid, 0, fetchArg2.Status)
  129. So(err, ShouldBeNil)
  130. So(len(c.Instances), ShouldResemble, 1)
  131. })
  132. }
  133. func BenchmarkFetch(b *testing.B) {
  134. var (
  135. err error
  136. c *model.InstanceInfo
  137. )
  138. b.RunParallel(func(pb *testing.PB) {
  139. for pb.Next() {
  140. r, _ := benchRegister(b)
  141. fetchArg := &model.ArgFetch{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Status: 1}
  142. if c, err = r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.Appid, 0, fetchArg.Status); err != nil {
  143. b.Errorf("Fetch(%v) error(%v)", arg.Appid, err)
  144. }
  145. fetchArg2 := &model.ArgFetch{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Status: 2}
  146. if c, err = r.Fetch(fetchArg2.Zone, fetchArg2.Env, fetchArg2.Appid, 0, fetchArg2.Status); err != nil {
  147. b.Errorf("Fetch(%v) error(%v)", arg.Appid, err)
  148. }
  149. _ = c
  150. }
  151. })
  152. }
  153. func TestPoll(t *testing.T) {
  154. i := model.NewInstance(reg)
  155. r := register(t, i)
  156. Convey("test poll", t, func() {
  157. pollArg := &model.ArgPolls{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: []string{"main.arch.test"}, Hostname: "csq"}
  158. ch, _, err := r.Polls(pollArg)
  159. So(err, ShouldBeNil)
  160. c := <-ch
  161. So(len(c[pollArg.Appid[0]].Instances), ShouldEqual, 1)
  162. })
  163. }
  164. func TestPolls(t *testing.T) {
  165. i1 := model.NewInstance(reg)
  166. i2 := model.NewInstance(reg2)
  167. r := register(t, i1, i2)
  168. Convey("test polls", t, func() {
  169. pollArg := &model.ArgPolls{Region: "shsb", Zone: "sh0001", Env: "pre", LatestTimestamp: []int64{0, 0}, Appid: []string{"main.arch.test", "main.arch.test2"}, Hostname: "csq"}
  170. ch, new, err := r.Polls(pollArg)
  171. So(err, ShouldBeNil)
  172. So(new, ShouldBeTrue)
  173. c := <-ch
  174. So(len(c), ShouldResemble, 2)
  175. })
  176. }
  177. func TestPollsParallel(t *testing.T) {
  178. i1 := model.NewInstance(reg)
  179. i2 := model.NewInstance(reg2)
  180. r := register(t, i1, i2)
  181. Convey("test polls parallel", t, func(c C) {
  182. var (
  183. wg sync.WaitGroup
  184. ch1, ch2 chan map[string]*model.InstanceInfo
  185. new bool
  186. err error
  187. )
  188. pollArg := &model.ArgPolls{Region: "shsb", Zone: "sh0001", Env: "pre", LatestTimestamp: []int64{time.Now().UnixNano(), time.Now().UnixNano()}, Appid: []string{"main.arch.test", "main.arch.test2"}, Hostname: "csq"}
  189. ch1, new, err = r.Polls(pollArg)
  190. c.So(err, ShouldEqual, ecode.NotModified)
  191. c.So(new, ShouldBeFalse)
  192. c.So(ch1, ShouldNotBeNil)
  193. ch2, new, err = r.Polls(pollArg)
  194. c.So(err, ShouldEqual, ecode.NotModified)
  195. c.So(new, ShouldBeFalse)
  196. c.So(ch2, ShouldNotBeNil)
  197. // wait group
  198. wg.Add(2)
  199. go func() {
  200. res := <-ch1
  201. c.So(len(res), ShouldResemble, 1)
  202. ress, _ := json.Marshal(res)
  203. fmt.Println("chenggongle 1!!!", string(ress))
  204. wg.Done()
  205. }()
  206. go func() {
  207. res := <-ch2
  208. c.So(len(res), ShouldResemble, 1)
  209. ress, _ := json.Marshal(res)
  210. fmt.Println("chenggongle 2!!!", string(ress))
  211. wg.Done()
  212. }()
  213. // re register when 1s later, make sure latest_timestamp changed
  214. time.Sleep(time.Second)
  215. h1 := model.NewInstance(regH1)
  216. r.Register(h1, 0)
  217. // wait
  218. wg.Wait()
  219. })
  220. }
  221. func BenchmarkPoll(b *testing.B) {
  222. b.RunParallel(func(pb *testing.PB) {
  223. for pb.Next() {
  224. var (
  225. err error
  226. ch chan map[string]*model.InstanceInfo
  227. c map[string]*model.InstanceInfo
  228. )
  229. r, _ := benchRegister(b)
  230. pollArg := &model.ArgPolls{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: []string{"main.arch.test"}, Hostname: "csq"}
  231. if ch, _, err = r.Polls(pollArg); err != nil {
  232. b.Errorf("Poll(%v) error(%v)", arg.Appid, err)
  233. }
  234. if c = <-ch; len(c[pollArg.Appid[0]].Instances) != 1 {
  235. b.Errorf("Poll(%v) lenth error", arg.Appid)
  236. }
  237. }
  238. })
  239. }
  240. func TestBroadcast(t *testing.T) {
  241. i := model.NewInstance(reg)
  242. r := register(t, i)
  243. Convey("test poll push connection", t, func() {
  244. go func() {
  245. Convey("must poll ahead of time", t, func() {
  246. time.Sleep(time.Microsecond * 5)
  247. var arg2 = &model.ArgRegister{Appid: "main.arch.test", Hostname: "go", RPC: "127.0.0.1:8080", Region: "shsb", Zone: "sh0001", Env: "pre", Status: 1}
  248. m2 := model.NewInstance(arg2)
  249. err2 := r.Register(m2, 0)
  250. So(err2, ShouldBeNil)
  251. })
  252. }()
  253. pollArg := &model.ArgPolls{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: []string{"main.arch.test"}, LatestTimestamp: []int64{time.Now().UnixNano()}}
  254. ch, _, err := r.Polls(pollArg)
  255. So(err, ShouldResemble, ecode.NotModified)
  256. c := <-ch
  257. So(len(c[pollArg.Appid[0]].Instances), ShouldResemble, 2)
  258. So(c[pollArg.Appid[0]].ZoneInstances, ShouldNotBeNil)
  259. So(len(c[pollArg.Appid[0]].ZoneInstances["sh0001"]), ShouldResemble, 2)
  260. })
  261. }
  262. func BenchmarkBroadcast(b *testing.B) {
  263. for i := 0; i < b.N; i++ {
  264. var (
  265. err error
  266. err2 error
  267. ch chan map[string]*model.InstanceInfo
  268. c map[string]*model.InstanceInfo
  269. )
  270. r, _ := benchRegister(b)
  271. go func() {
  272. time.Sleep(time.Millisecond * 1)
  273. var arg2 = &model.ArgRegister{Appid: "main.arch.test", Hostname: "go", RPC: "127.0.0.1:8080", Region: "shsb", Zone: "sh0001", Env: "pre", Status: 1}
  274. m2 := model.NewInstance(arg2)
  275. if err2 = r.Register(m2, 0); err2 != nil {
  276. b.Errorf("Reigster(%v) error(%v)", m2.Appid, err2)
  277. }
  278. }()
  279. pollArg := &model.ArgPolls{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: []string{"main.arch.test"}, LatestTimestamp: []int64{time.Now().UnixNano()}}
  280. if ch, _, err = r.Polls(pollArg); err != nil && err != ecode.NotModified {
  281. b.Errorf("Poll(%v) error(%v)", pollArg.Appid, err)
  282. }
  283. c = <-ch
  284. if len(c[pollArg.Appid[0]].Instances) != 2 {
  285. b.Errorf("Poll(%v) length error", pollArg.Appid)
  286. }
  287. if c[pollArg.Appid[0]].ZoneInstances == nil {
  288. b.Errorf("Poll(%v) zone instances nil error", pollArg.Appid)
  289. }
  290. if len(c[pollArg.Appid[0]].ZoneInstances["sh0001"]) != 2 {
  291. b.Errorf("Poll(%v) zone instances length error", pollArg.Appid)
  292. }
  293. }
  294. }
  295. func TestRegistrySet(t *testing.T) {
  296. i := model.NewInstance(reg)
  297. r := register(t, i)
  298. changes := make(map[string]string)
  299. changes["reg"] = "1"
  300. Convey("test set weight to 1", t, func() {
  301. set := &model.ArgSet{
  302. Region: "shsb",
  303. Env: "pre",
  304. Appid: "main.arch.test",
  305. Hostname: []string{"reg"},
  306. Metadata: []string{`{"weight":"1"}`},
  307. }
  308. ok := r.Set(context.TODO(), set)
  309. So(ok, ShouldBeTrue)
  310. fetchArg := &model.ArgFetch{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Status: 3}
  311. c, err := r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.Appid, 0, fetchArg.Status)
  312. So(err, ShouldBeNil)
  313. So(c.Instances[0].Metadata["weight"], ShouldResemble, "1")
  314. })
  315. }
  316. func BenchmarkSet(b *testing.B) {
  317. b.RunParallel(func(pb *testing.PB) {
  318. for pb.Next() {
  319. var (
  320. c *model.InstanceInfo
  321. err error
  322. ok bool
  323. )
  324. r, _ := benchRegister(b)
  325. set := &model.ArgSet{
  326. Region: "shsb",
  327. Env: "pre",
  328. Appid: "main.arch.account-service",
  329. Hostname: []string{"test1"},
  330. Status: []int64{1},
  331. Metadata: []string{`{"weight":"1"}`},
  332. }
  333. if ok = r.Set(context.TODO(), set); !ok {
  334. b.Errorf("SetWeight(%v) error", arg.Appid)
  335. }
  336. fetchArg := &model.ArgFetch{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Status: 3}
  337. if c, err = r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.Appid, 0, fetchArg.Status); err != nil {
  338. b.Errorf("Fetch(%v) error(%v)", fetchArg.Appid, err)
  339. }
  340. if c.Instances[0].Metadata["weight"] != "1" {
  341. b.Errorf("SetWeight(%v) change error", fetchArg.Appid)
  342. }
  343. }
  344. })
  345. }
  346. func TestResetExp(t *testing.T) {
  347. i := model.NewInstance(reg)
  348. r := register(t, i)
  349. Convey("test ResetExp", t, func() {
  350. r.resetExp()
  351. So(r.gd.expPerMin, ShouldResemble, int64(2))
  352. })
  353. }
  354. func benchCompareInstance(b *testing.B, src *model.Instance, i *model.Instance) {
  355. if src.Appid != i.Appid || src.Env != i.Env || src.Hostname != i.Hostname ||
  356. src.Region != i.Region {
  357. b.Errorf("instance compare error")
  358. }
  359. }
  360. func register(t *testing.T, is ...*model.Instance) (r *Registry) {
  361. Convey("test register", t, func() {
  362. r = NewRegistry()
  363. var num int
  364. for _, i := range is {
  365. err := r.Register(i, 0)
  366. So(err, ShouldBeNil)
  367. if i.Appid == "main.arch.test" {
  368. num++
  369. }
  370. }
  371. fetchArg := &model.ArgFetch{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Status: 3}
  372. instancesInfo, err := r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.Appid, 0, fetchArg.Status)
  373. So(err, ShouldBeNil)
  374. So(len(instancesInfo.Instances), ShouldResemble, num)
  375. })
  376. return r
  377. }
  378. func benchRegister(b *testing.B) (r *Registry, i *model.Instance) {
  379. r = NewRegistry()
  380. i = model.NewInstance(reg)
  381. if err := r.Register(i, 0); err != nil {
  382. b.Errorf("Reigster(%v) error(%v)", i.Appid, err)
  383. }
  384. return r, i
  385. }
  386. func TestEvict(t *testing.T) {
  387. Convey("test evict for protect", t, func() {
  388. r := NewRegistry()
  389. m := model.NewInstance(reg)
  390. // promise the renewtime of instance is expire
  391. m.RenewTimestamp -= 100
  392. err := r.Register(m, 0)
  393. So(err, ShouldBeNil)
  394. // move up the statistics of heartbeat for evict
  395. r.gd.facLastMin = r.gd.facInMin
  396. r.evict()
  397. fetchArg := &model.ArgFetch{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Status: 3}
  398. c, err := r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.Appid, 0, fetchArg.Status)
  399. So(err, ShouldBeNil)
  400. // protect
  401. So(len(c.Instances), ShouldResemble, 1)
  402. })
  403. }
  404. func TestEvict2(t *testing.T) {
  405. Convey("test evict for cancel", t, func() {
  406. r := NewRegistry()
  407. m := model.NewInstance(reg)
  408. err := r.Register(m, 0)
  409. So(err, ShouldBeNil)
  410. _, ok := r.Renew(arg)
  411. So(ok, ShouldBeTrue)
  412. // promise the renewtime of instance is expire
  413. m.RenewTimestamp -= int64(time.Second * 100)
  414. r.Register(m, 0)
  415. // move up the statistics of heartbeat for evict
  416. r.gd.facLastMin = r.gd.facInMin
  417. r.evict()
  418. fetchArg := &model.ArgFetch{Region: "shsb", Zone: "sh0001", Env: "pre", Appid: "main.arch.test", Status: 1}
  419. _, err = r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.Appid, 0, fetchArg.Status)
  420. So(err, ShouldResemble, ecode.NothingFound)
  421. })
  422. }