package diskqueue import ( "bytes" "crypto/rand" "io" mrand "math/rand" "os" "os/exec" "sync" "testing" "time" ) func init() { mrand.Seed(time.Now().UnixNano()) } func TestDiskQueuePushPopMem(t *testing.T) { dirname := "testdata/d1" defer os.RemoveAll(dirname) queue, err := New(dirname) if err != nil { t.Fatal(err) } N := 10 p := []byte("hello world") for i := 0; i < N; i++ { if err := queue.Push(p); err != nil { t.Error(err) } } count := 0 for { data, err := queue.Pop() if err == io.EOF { break } if err != nil { t.Error(err) } if !bytes.Equal(data, p) { t.Errorf("invalid data: %s", data) } count++ } if count != N { t.Errorf("wrong count %d", count) } } func TestDiskQueueDisk(t *testing.T) { data := make([]byte, 2233) rand.Read(data) count := 1024 * 256 dirname := "testdata/d2" defer os.RemoveAll(dirname) t.Run("test write disk", func(t *testing.T) { queue, err := New(dirname) if err != nil { t.Fatal(err) } for i := 0; i < count; i++ { if err := queue.Push(data); err != nil { time.Sleep(time.Second) if err := queue.Push(data); err != nil { t.Error(err) } } } queue.Close() }) t.Run("test read disk", func(t *testing.T) { n := 0 queue, err := New(dirname) if err != nil { t.Fatal(err) } for { ret, err := queue.Pop() if err == io.EOF { break } if !bytes.Equal(data, ret) { t.Errorf("invalid data unequal") } n++ } if n != count { t.Errorf("want %d get %d", count, n) } }) } func TestDiskQueueTrans(t *testing.T) { dirname := "testdata/d3" defer os.RemoveAll(dirname) queue, err := New(dirname) if err != nil { t.Fatal(err) } data := make([]byte, 1890) rand.Read(data) cycles := 512 var wg sync.WaitGroup wg.Add(2) done := false writed := 0 readed := 0 go func() { defer wg.Done() for i := 0; i < cycles; i++ { ms := mrand.Intn(40) + 10 time.Sleep(time.Duration(ms) * time.Millisecond) for i := 0; i < 128; i++ { if err := queue.Push(data); err != nil { t.Error(err) } else { writed++ } } } done = true }() go func() { defer wg.Done() for { ret, err := queue.Pop() if err == io.EOF && done { break } if err == io.EOF { ms := mrand.Intn(10) time.Sleep(time.Duration(ms) * time.Millisecond) continue } if !bytes.Equal(ret, data) { t.Fatalf("invalid data, data length: %d, want: %d, data: %v, want: %v", len(ret), len(data), ret, data) } readed++ } }() wg.Wait() os.RemoveAll(dirname) if writed != readed { t.Errorf("readed: %d != writed: %d", readed, writed) } } func TestEmpty(t *testing.T) { dirname := "testdata/d4" defer os.RemoveAll(dirname) queue, err := New(dirname) if err != nil { t.Fatal(err) } for i := 0; i < 5; i++ { _, err := queue.Pop() if err != io.EOF { t.Errorf("expect err == io.EOF, get %v", err) } } } func TestEmptyCache(t *testing.T) { datadir := "testdata/emptycache" dirname := "testdata/de" if err := exec.Command("cp", "-r", datadir, dirname).Run(); err != nil { t.Error(err) } defer os.RemoveAll(dirname) queue, err := New(dirname) if err != nil { t.Fatal(err) } for i := 0; i < 5; i++ { _, err := queue.Pop() if err != io.EOF { t.Errorf("expect err == io.EOF, get %v", err) } } } func BenchmarkDiskQueue(b *testing.B) { queue, err := New("testdata/d5") if err != nil { b.Fatal(err) } done := make(chan bool, 1) go func() { for { if _, err := queue.Pop(); err != nil { if err == io.EOF { break } } } done <- true }() data := make([]byte, 768) rand.Read(data) for i := 0; i < b.N; i++ { queue.Push(data) } <-done }