diskqueue_test.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. package diskqueue
  2. import (
  3. "bytes"
  4. "crypto/rand"
  5. "io"
  6. mrand "math/rand"
  7. "os"
  8. "os/exec"
  9. "sync"
  10. "testing"
  11. "time"
  12. )
  13. func init() {
  14. mrand.Seed(time.Now().UnixNano())
  15. }
  16. func TestDiskQueuePushPopMem(t *testing.T) {
  17. dirname := "testdata/d1"
  18. defer os.RemoveAll(dirname)
  19. queue, err := New(dirname)
  20. if err != nil {
  21. t.Fatal(err)
  22. }
  23. N := 10
  24. p := []byte("hello world")
  25. for i := 0; i < N; i++ {
  26. if err := queue.Push(p); err != nil {
  27. t.Error(err)
  28. }
  29. }
  30. count := 0
  31. for {
  32. data, err := queue.Pop()
  33. if err == io.EOF {
  34. break
  35. }
  36. if err != nil {
  37. t.Error(err)
  38. }
  39. if !bytes.Equal(data, p) {
  40. t.Errorf("invalid data: %s", data)
  41. }
  42. count++
  43. }
  44. if count != N {
  45. t.Errorf("wrong count %d", count)
  46. }
  47. }
  48. func TestDiskQueueDisk(t *testing.T) {
  49. data := make([]byte, 2233)
  50. rand.Read(data)
  51. count := 1024 * 256
  52. dirname := "testdata/d2"
  53. defer os.RemoveAll(dirname)
  54. t.Run("test write disk", func(t *testing.T) {
  55. queue, err := New(dirname)
  56. if err != nil {
  57. t.Fatal(err)
  58. }
  59. for i := 0; i < count; i++ {
  60. if err := queue.Push(data); err != nil {
  61. time.Sleep(time.Second)
  62. if err := queue.Push(data); err != nil {
  63. t.Error(err)
  64. }
  65. }
  66. }
  67. queue.Close()
  68. })
  69. t.Run("test read disk", func(t *testing.T) {
  70. n := 0
  71. queue, err := New(dirname)
  72. if err != nil {
  73. t.Fatal(err)
  74. }
  75. for {
  76. ret, err := queue.Pop()
  77. if err == io.EOF {
  78. break
  79. }
  80. if !bytes.Equal(data, ret) {
  81. t.Errorf("invalid data unequal")
  82. }
  83. n++
  84. }
  85. if n != count {
  86. t.Errorf("want %d get %d", count, n)
  87. }
  88. })
  89. }
  90. func TestDiskQueueTrans(t *testing.T) {
  91. dirname := "testdata/d3"
  92. defer os.RemoveAll(dirname)
  93. queue, err := New(dirname)
  94. if err != nil {
  95. t.Fatal(err)
  96. }
  97. data := make([]byte, 1890)
  98. rand.Read(data)
  99. cycles := 512
  100. var wg sync.WaitGroup
  101. wg.Add(2)
  102. done := false
  103. writed := 0
  104. readed := 0
  105. go func() {
  106. defer wg.Done()
  107. for i := 0; i < cycles; i++ {
  108. ms := mrand.Intn(40) + 10
  109. time.Sleep(time.Duration(ms) * time.Millisecond)
  110. for i := 0; i < 128; i++ {
  111. if err := queue.Push(data); err != nil {
  112. t.Error(err)
  113. } else {
  114. writed++
  115. }
  116. }
  117. }
  118. done = true
  119. }()
  120. go func() {
  121. defer wg.Done()
  122. for {
  123. ret, err := queue.Pop()
  124. if err == io.EOF && done {
  125. break
  126. }
  127. if err == io.EOF {
  128. ms := mrand.Intn(10)
  129. time.Sleep(time.Duration(ms) * time.Millisecond)
  130. continue
  131. }
  132. if !bytes.Equal(ret, data) {
  133. t.Fatalf("invalid data, data length: %d, want: %d, data: %v, want: %v", len(ret), len(data), ret, data)
  134. }
  135. readed++
  136. }
  137. }()
  138. wg.Wait()
  139. os.RemoveAll(dirname)
  140. if writed != readed {
  141. t.Errorf("readed: %d != writed: %d", readed, writed)
  142. }
  143. }
  144. func TestEmpty(t *testing.T) {
  145. dirname := "testdata/d4"
  146. defer os.RemoveAll(dirname)
  147. queue, err := New(dirname)
  148. if err != nil {
  149. t.Fatal(err)
  150. }
  151. for i := 0; i < 5; i++ {
  152. _, err := queue.Pop()
  153. if err != io.EOF {
  154. t.Errorf("expect err == io.EOF, get %v", err)
  155. }
  156. }
  157. }
  158. func TestEmptyCache(t *testing.T) {
  159. datadir := "testdata/emptycache"
  160. dirname := "testdata/de"
  161. if err := exec.Command("cp", "-r", datadir, dirname).Run(); err != nil {
  162. t.Error(err)
  163. }
  164. defer os.RemoveAll(dirname)
  165. queue, err := New(dirname)
  166. if err != nil {
  167. t.Fatal(err)
  168. }
  169. for i := 0; i < 5; i++ {
  170. _, err := queue.Pop()
  171. if err != io.EOF {
  172. t.Errorf("expect err == io.EOF, get %v", err)
  173. }
  174. }
  175. }
  176. func BenchmarkDiskQueue(b *testing.B) {
  177. queue, err := New("testdata/d5")
  178. if err != nil {
  179. b.Fatal(err)
  180. }
  181. done := make(chan bool, 1)
  182. go func() {
  183. for {
  184. if _, err := queue.Pop(); err != nil {
  185. if err == io.EOF {
  186. break
  187. }
  188. }
  189. }
  190. done <- true
  191. }()
  192. data := make([]byte, 768)
  193. rand.Read(data)
  194. for i := 0; i < b.N; i++ {
  195. queue.Push(data)
  196. }
  197. <-done
  198. }