123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460 |
- package diskqueue
- import (
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "os"
- "path"
- "sort"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- )
- const (
- // max memroy use equal to BucketByte * (MemBucket + DynamicMemBucket)
- _defaultBucketByte = _blockByte * 2 * 1024 * 16 // 16MB
- _defaultMemBucket = 1
- _defaultDynamicMemBucket = 3
- _filePrefix = "disk_queue_"
- )
- // node status
- const (
- _inmem int8 = iota
- _indisk
- _freed
- )
- var _globalID int64
- // ErrQueueFull .
- var ErrQueueFull = errors.New("error queue is full, can't create new membucket")
- func nextNodeID() int64 {
- return atomic.AddInt64(&_globalID, 1)
- }
- // DiskQueue disk queue
- type DiskQueue interface {
- Push(p []byte) error
- Pop() ([]byte, error)
- Close() error
- }
- // Option Ringbuffer option
- type Option func(opt *option)
- // SetBucketByte bucketbyte
- func SetBucketByte(n int) Option {
- return func(opt *option) {
- opt.bucketByte = (int32(n) / _blockByte) * _blockByte
- }
- }
- // SetMemBucket set the number of mem bucket
- func SetMemBucket(n int) Option {
- return func(opt *option) {
- opt.memBucket = int32(n)
- }
- }
- // SetDynamicMemBucket set the number of dynamic mem bucket
- func SetDynamicMemBucket(n int) Option {
- return func(opt *option) {
- opt.dynamicMemBucket = int32(n)
- }
- }
- // SetMaxBucket set the number of max bucket 0 represent unlimit
- func SetMaxBucket(n int) Option {
- return func(opt *option) {
- opt.maxBucket = int32(n)
- }
- }
- type option struct {
- bucketByte int32
- memBucket int32
- maxBucket int32
- dynamicMemBucket int32
- fpath string
- }
- func (o option) validate() error {
- if o.bucketByte <= 0 {
- return fmt.Errorf("bucket byte must > 0")
- }
- if o.memBucket <= 0 {
- return fmt.Errorf("mem bucket must > 0")
- }
- if o.dynamicMemBucket <= 0 {
- return fmt.Errorf("dynamic mem bucket must > 0")
- }
- return nil
- }
- var _defaultOpt = option{
- bucketByte: _defaultBucketByte,
- memBucket: _defaultMemBucket,
- dynamicMemBucket: _defaultDynamicMemBucket,
- }
- // New Ringbuffer
- func New(fpath string, options ...Option) (DiskQueue, error) {
- info, err := os.Stat(fpath)
- if err != nil {
- if !os.IsNotExist(err) {
- return nil, fmt.Errorf("stat %s error: %s", fpath, err)
- }
- if err = os.MkdirAll(fpath, 0755); err != nil {
- return nil, fmt.Errorf("fpath %s not exists try create directry error: %s", fpath, err)
- }
- } else if !info.IsDir() {
- return nil, fmt.Errorf("fpath: %s already exists and not a directory", fpath)
- }
- // TODO: check permission
- opt := _defaultOpt
- opt.fpath = fpath
- for _, fn := range options {
- fn(&opt)
- }
- if err = opt.validate(); err != nil {
- return nil, err
- }
- b := &base{
- opt: opt,
- }
- if opt.maxBucket == 0 {
- return &queue{base: b}, b.init()
- }
- return nil, nil
- }
- type node struct {
- id int64
- mx sync.Mutex
- flushing bool
- bucket *memBucket
- next *node
- fpath string
- fbucket *fileBucket
- kind int8
- }
- func (n *node) setFlushing(flushing bool) {
- n.mx.Lock()
- n.flushing = flushing
- n.mx.Unlock()
- }
- func (n *node) pop() ([]byte, error) {
- n.mx.Lock()
- defer n.mx.Unlock()
- if n.bucket != nil {
- return n.bucket.pop()
- }
- var err error
- if n.fbucket == nil {
- if n.fbucket, err = newFileBucket(n.fpath); err != nil {
- return nil, err
- }
- }
- return n.fbucket.pop()
- }
- type base struct {
- opt option
- head *node
- tail *node
- pool *memBucketPool
- length int32
- memBucket int32
- }
- func (b *base) init() error {
- b.pool = newMemBucketPool(b.opt.bucketByte)
- if loaded, err := b.loadFromFile(); err != nil || loaded {
- return err
- }
- current := &node{
- id: nextNodeID(),
- bucket: b.pool.new(),
- }
- b.head = current
- b.tail = current
- return nil
- }
- func (b *base) loadFromFile() (bool, error) {
- infos, err := ioutil.ReadDir(b.opt.fpath)
- if err != nil {
- return false, fmt.Errorf("readdir %s error: %s", b.opt.fpath, err)
- }
- var files []string
- for _, info := range infos {
- if info.IsDir() || !strings.HasPrefix(info.Name(), _filePrefix) {
- continue
- }
- files = append(files, path.Join(b.opt.fpath, info.Name()))
- }
- if len(files) == 0 {
- return false, nil
- }
- nodeID := func(name string) int64 {
- id, err := strconv.ParseInt(path.Base(name)[len(_filePrefix):], 10, 64)
- if err != nil {
- panic(fmt.Errorf("invalid file name: %s error: %s", name, err))
- }
- return id
- }
- sort.Slice(files, func(i int, j int) bool {
- return nodeID(files[i]) < nodeID(files[j])
- })
- _globalID = nodeID(files[len(files)-1])
- current := &node{
- id: nodeID(files[0]),
- fpath: files[0],
- kind: _indisk,
- }
- b.head = current
- for _, file := range files[1:] {
- next := &node{
- id: nodeID(file),
- fpath: file,
- kind: _indisk,
- }
- current.next = next
- current = next
- }
- b.memBucket = 1
- next := &node{
- id: nextNodeID(),
- bucket: b.pool.new(),
- }
- current.next = next
- current = next
- b.tail = current
- return true, nil
- }
- type queue struct {
- *base
- mx sync.Mutex
- closed bool
- lastID int64
- wg sync.WaitGroup
- }
- func (q *queue) Push(p []byte) (err error) {
- if len(p) >= int(q.opt.bucketByte) {
- return fmt.Errorf("data too large")
- }
- if q.closed {
- return fmt.Errorf("queue already closed")
- }
- for {
- err = q.tail.bucket.push(p)
- if err == nil {
- atomic.AddInt32(&q.length, 1)
- return
- }
- if err == errBucketFull {
- if err = q.moveTail(); err != nil {
- return err
- }
- continue
- }
- return
- }
- }
- func (q *queue) moveTail() error {
- bucket := atomic.LoadInt32(&q.memBucket)
- if bucket >= q.opt.memBucket+q.opt.dynamicMemBucket {
- return fmt.Errorf("can't assign memory bucket any more")
- }
- if bucket >= q.opt.maxBucket {
- q.notifyStore()
- }
- // take tail snapshot
- p := q.tail
- // lock queue
- q.mx.Lock()
- defer q.mx.Unlock()
- // tail alreay changed
- if p != q.tail {
- return nil
- }
- atomic.AddInt32(&q.memBucket, 1)
- n := &node{
- id: nextNodeID(),
- bucket: q.pool.new(),
- kind: _inmem,
- }
- // move to new tail
- q.tail.next = n
- q.tail = n
- return nil
- }
- func (q *queue) notifyStore() {
- n := q.head
- for n.next != nil {
- read := q.head
- if n.id > q.lastID && n.kind != _indisk && n != read {
- q.lastID = n.id
- go q.storeNode(n)
- return
- }
- n = n.next
- }
- }
- func (q *queue) Pop() (data []byte, err error) {
- defer func() {
- if err != nil {
- atomic.AddInt32(&q.length, -1)
- }
- }()
- if q.closed {
- return nil, fmt.Errorf("queue already closed")
- }
- data, err = q.head.pop()
- if err != nil {
- if err == io.EOF {
- if err = q.moveHead(); err != nil {
- return nil, err
- }
- return q.head.pop()
- }
- return nil, err
- }
- return data, nil
- }
- func (q *queue) moveHead() error {
- tail := q.tail
- if q.head == tail {
- return io.EOF
- }
- // move head to next
- q.mx.Lock()
- head := q.head
- q.head = q.head.next
- q.mx.Unlock()
- // reset head to new read node
- q.freeNode(head)
- return nil
- }
- func (q *queue) freeNode(n *node) {
- n.mx.Lock()
- defer n.mx.Unlock()
- if n.flushing {
- n.kind = _freed
- return
- }
- if n.bucket != nil {
- q.freeBucket(n.bucket)
- n.bucket = nil
- }
- if n.fbucket != nil {
- n.fbucket.close()
- }
- if n.fpath != "" {
- if err := os.Remove(n.fpath); err != nil {
- //fmt.Fprintf(os.Stderr, "[ERROR] diskqueue: remove file %s error: %s", n.fpath, err)
- }
- }
- }
- func (q *queue) storeNode(n *node) (err error) {
- fpath := storePath(q.opt.fpath, n)
- q.wg.Add(1)
- defer q.wg.Done()
- n.setFlushing(true)
- // if node already free return direct
- if n.bucket == nil {
- return
- }
- // if node be freed just release membucket
- if n.kind == _freed {
- q.freeBucket(n.bucket)
- return
- }
- // store bucket to disk
- if err = store(fpath, n); err != nil {
- fmt.Fprintf(os.Stderr, "[ERROR] diskqueue: store node error: %s", err)
- }
- n.fpath = fpath
- n.setFlushing(false)
- if n.kind == _freed {
- q.freeBucket(n.bucket)
- n.bucket = nil
- if err := os.Remove(fpath); err != nil {
- //fmt.Fprintf(os.Stderr, "[ERROR] diskqueue: remove file %s error: %s", n.fpath, err)
- }
- return
- }
- n.kind = _indisk
- q.mx.Lock()
- if q.head != n {
- q.freeBucket(n.bucket)
- n.bucket = nil
- }
- q.mx.Unlock()
- return
- }
- func (q *queue) freeBucket(bucket *memBucket) {
- q.pool.free(bucket)
- atomic.AddInt32(&q.memBucket, -1)
- }
- func (q *queue) Close() error {
- // set closed
- q.closed = true
- // wait all store goroutines finish
- q.wg.Wait()
- var messages []string
- // store all leave node
- current := q.head
- for current != nil {
- if current.kind == _inmem && current.bucket != nil {
- fpath := storePath(q.opt.fpath, current)
- if err := store(fpath, current); err != nil {
- messages = append(messages, err.Error())
- }
- }
- current = current.next
- }
- if len(messages) == 0 {
- return nil
- }
- return fmt.Errorf("close queue error: %s", strings.Join(messages, "; "))
- }
- func store(fpath string, n *node) (err error) {
- // ignore empty bucket
- if n.bucket.writeAt == n.bucket.readAt {
- return nil
- }
- var fp *os.File
- fp, err = os.OpenFile(fpath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
- if err != nil {
- return fmt.Errorf("open file %s error: %s", fpath, err)
- }
- _, err = n.bucket.dump(fp)
- if err != nil {
- return fmt.Errorf("dump data to file %s error: %s", fpath, err)
- }
- return
- }
- func storePath(base string, n *node) string {
- return path.Join(base, _filePrefix+strconv.FormatInt(n.id, 10))
- }
|