diskqueue.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460
  1. package diskqueue
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "os"
  8. "path"
  9. "sort"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "sync/atomic"
  14. )
// Default settings used by New when the caller passes no Option.
const (
	// Maximum memory use equals BucketByte * (MemBucket + DynamicMemBucket).
	_defaultBucketByte = _blockByte * 2 * 1024 * 16 // 16MB
	// _defaultMemBucket is the default value of option.memBucket.
	_defaultMemBucket = 1
	// _defaultDynamicMemBucket is the default value of option.dynamicMemBucket.
	_defaultDynamicMemBucket = 3
	// _filePrefix is the name prefix of the bucket files kept in the queue
	// directory; the node id follows the prefix.
	_filePrefix = "disk_queue_"
)
// Node status: stored in node.kind.
const (
	// _inmem is the zero value; nodes created with a memory bucket have this kind.
	_inmem int8 = iota
	// _indisk marks a node that was loaded from a file or written to one.
	_indisk
	// _freed marks a node released by the reader while a flush was in progress.
	_freed
)
  28. var _globalID int64
  29. // ErrQueueFull .
  30. var ErrQueueFull = errors.New("error queue is full, can't create new membucket")
  31. func nextNodeID() int64 {
  32. return atomic.AddInt64(&_globalID, 1)
  33. }
// DiskQueue is a queue of byte slices. The implementation in this file keeps
// data in memory buckets and writes buckets to files in a directory.
type DiskQueue interface {
	// Push appends p to the queue. The implementation in this file rejects
	// payloads whose length is not smaller than the configured bucket size.
	Push(p []byte) error
	// Pop removes and returns the next item. The implementation in this file
	// returns io.EOF when no further node is available to read from.
	Pop() ([]byte, error)
	// Close marks the queue closed and stores the remaining in-memory data to
	// disk.
	Close() error
}
// Option configures a queue. Options are passed to New and applied, in order,
// on top of the default settings.
type Option func(opt *option)
  42. // SetBucketByte bucketbyte
  43. func SetBucketByte(n int) Option {
  44. return func(opt *option) {
  45. opt.bucketByte = (int32(n) / _blockByte) * _blockByte
  46. }
  47. }
  48. // SetMemBucket set the number of mem bucket
  49. func SetMemBucket(n int) Option {
  50. return func(opt *option) {
  51. opt.memBucket = int32(n)
  52. }
  53. }
  54. // SetDynamicMemBucket set the number of dynamic mem bucket
  55. func SetDynamicMemBucket(n int) Option {
  56. return func(opt *option) {
  57. opt.dynamicMemBucket = int32(n)
  58. }
  59. }
  60. // SetMaxBucket set the number of max bucket 0 represent unlimit
  61. func SetMaxBucket(n int) Option {
  62. return func(opt *option) {
  63. opt.maxBucket = int32(n)
  64. }
  65. }
  66. type option struct {
  67. bucketByte int32
  68. memBucket int32
  69. maxBucket int32
  70. dynamicMemBucket int32
  71. fpath string
  72. }
  73. func (o option) validate() error {
  74. if o.bucketByte <= 0 {
  75. return fmt.Errorf("bucket byte must > 0")
  76. }
  77. if o.memBucket <= 0 {
  78. return fmt.Errorf("mem bucket must > 0")
  79. }
  80. if o.dynamicMemBucket <= 0 {
  81. return fmt.Errorf("dynamic mem bucket must > 0")
  82. }
  83. return nil
  84. }
// _defaultOpt is the option set New starts from before applying the caller's
// options. maxBucket and fpath are left at their zero values here; New fills
// in fpath.
var _defaultOpt = option{
	bucketByte:       _defaultBucketByte,
	memBucket:        _defaultMemBucket,
	dynamicMemBucket: _defaultDynamicMemBucket,
}
  90. // New Ringbuffer
  91. func New(fpath string, options ...Option) (DiskQueue, error) {
  92. info, err := os.Stat(fpath)
  93. if err != nil {
  94. if !os.IsNotExist(err) {
  95. return nil, fmt.Errorf("stat %s error: %s", fpath, err)
  96. }
  97. if err = os.MkdirAll(fpath, 0755); err != nil {
  98. return nil, fmt.Errorf("fpath %s not exists try create directry error: %s", fpath, err)
  99. }
  100. } else if !info.IsDir() {
  101. return nil, fmt.Errorf("fpath: %s already exists and not a directory", fpath)
  102. }
  103. // TODO: check permission
  104. opt := _defaultOpt
  105. opt.fpath = fpath
  106. for _, fn := range options {
  107. fn(&opt)
  108. }
  109. if err = opt.validate(); err != nil {
  110. return nil, err
  111. }
  112. b := &base{
  113. opt: opt,
  114. }
  115. if opt.maxBucket == 0 {
  116. return &queue{base: b}, b.init()
  117. }
  118. return nil, nil
  119. }
// node is one element of the singly linked list that makes up the queue. Its
// data lives either in a memory bucket or in a file on disk.
type node struct {
	// id is the node id; storePath derives the file name from it.
	id int64
	// mx is held by setFlushing, pop and freeNode.
	mx sync.Mutex
	// flushing is set by storeNode while it writes the bucket to disk.
	flushing bool
	// bucket is the in-memory data; nil when the node has no memory bucket.
	bucket *memBucket
	// next is the following node in the list, nil for the last one.
	next *node
	// fpath is the path of the node's file; empty if there is none.
	fpath string
	// fbucket reads the file at fpath; pop opens it on first use.
	fbucket *fileBucket
	// kind is the node status (_inmem, _indisk or _freed).
	kind int8
}
  130. func (n *node) setFlushing(flushing bool) {
  131. n.mx.Lock()
  132. n.flushing = flushing
  133. n.mx.Unlock()
  134. }
  135. func (n *node) pop() ([]byte, error) {
  136. n.mx.Lock()
  137. defer n.mx.Unlock()
  138. if n.bucket != nil {
  139. return n.bucket.pop()
  140. }
  141. var err error
  142. if n.fbucket == nil {
  143. if n.fbucket, err = newFileBucket(n.fpath); err != nil {
  144. return nil, err
  145. }
  146. }
  147. return n.fbucket.pop()
  148. }
// base holds the state shared by queue implementations: the settings, the
// linked list of nodes and the bucket pool.
type base struct {
	// opt is the validated configuration.
	opt option
	// head is the node Pop reads from.
	head *node
	// tail is the node Push writes to.
	tail *node
	// pool hands out and takes back memory buckets.
	pool *memBucketPool
	// length is updated atomically by Push and Pop.
	length int32
	// memBucket counts memory buckets; it is raised in moveTail and lowered
	// in freeBucket.
	memBucket int32
}
  157. func (b *base) init() error {
  158. b.pool = newMemBucketPool(b.opt.bucketByte)
  159. if loaded, err := b.loadFromFile(); err != nil || loaded {
  160. return err
  161. }
  162. current := &node{
  163. id: nextNodeID(),
  164. bucket: b.pool.new(),
  165. }
  166. b.head = current
  167. b.tail = current
  168. return nil
  169. }
  170. func (b *base) loadFromFile() (bool, error) {
  171. infos, err := ioutil.ReadDir(b.opt.fpath)
  172. if err != nil {
  173. return false, fmt.Errorf("readdir %s error: %s", b.opt.fpath, err)
  174. }
  175. var files []string
  176. for _, info := range infos {
  177. if info.IsDir() || !strings.HasPrefix(info.Name(), _filePrefix) {
  178. continue
  179. }
  180. files = append(files, path.Join(b.opt.fpath, info.Name()))
  181. }
  182. if len(files) == 0 {
  183. return false, nil
  184. }
  185. nodeID := func(name string) int64 {
  186. id, err := strconv.ParseInt(path.Base(name)[len(_filePrefix):], 10, 64)
  187. if err != nil {
  188. panic(fmt.Errorf("invalid file name: %s error: %s", name, err))
  189. }
  190. return id
  191. }
  192. sort.Slice(files, func(i int, j int) bool {
  193. return nodeID(files[i]) < nodeID(files[j])
  194. })
  195. _globalID = nodeID(files[len(files)-1])
  196. current := &node{
  197. id: nodeID(files[0]),
  198. fpath: files[0],
  199. kind: _indisk,
  200. }
  201. b.head = current
  202. for _, file := range files[1:] {
  203. next := &node{
  204. id: nodeID(file),
  205. fpath: file,
  206. kind: _indisk,
  207. }
  208. current.next = next
  209. current = next
  210. }
  211. b.memBucket = 1
  212. next := &node{
  213. id: nextNodeID(),
  214. bucket: b.pool.new(),
  215. }
  216. current.next = next
  217. current = next
  218. b.tail = current
  219. return true, nil
  220. }
// queue is the DiskQueue implementation returned by New.
type queue struct {
	*base
	// mx is held while head or tail is moved (moveHead, moveTail) and while
	// storeNode compares a node against the head.
	mx sync.Mutex
	// closed is set by Close and checked by Push and Pop.
	closed bool
	// lastID is the id of the most recent node handed to storeNode by
	// notifyStore.
	lastID int64
	// wg tracks running storeNode calls; Close waits on it.
	wg sync.WaitGroup
}
  228. func (q *queue) Push(p []byte) (err error) {
  229. if len(p) >= int(q.opt.bucketByte) {
  230. return fmt.Errorf("data too large")
  231. }
  232. if q.closed {
  233. return fmt.Errorf("queue already closed")
  234. }
  235. for {
  236. err = q.tail.bucket.push(p)
  237. if err == nil {
  238. atomic.AddInt32(&q.length, 1)
  239. return
  240. }
  241. if err == errBucketFull {
  242. if err = q.moveTail(); err != nil {
  243. return err
  244. }
  245. continue
  246. }
  247. return
  248. }
  249. }
  250. func (q *queue) moveTail() error {
  251. bucket := atomic.LoadInt32(&q.memBucket)
  252. if bucket >= q.opt.memBucket+q.opt.dynamicMemBucket {
  253. return fmt.Errorf("can't assign memory bucket any more")
  254. }
  255. if bucket >= q.opt.maxBucket {
  256. q.notifyStore()
  257. }
  258. // take tail snapshot
  259. p := q.tail
  260. // lock queue
  261. q.mx.Lock()
  262. defer q.mx.Unlock()
  263. // tail alreay changed
  264. if p != q.tail {
  265. return nil
  266. }
  267. atomic.AddInt32(&q.memBucket, 1)
  268. n := &node{
  269. id: nextNodeID(),
  270. bucket: q.pool.new(),
  271. kind: _inmem,
  272. }
  273. // move to new tail
  274. q.tail.next = n
  275. q.tail = n
  276. return nil
  277. }
  278. func (q *queue) notifyStore() {
  279. n := q.head
  280. for n.next != nil {
  281. read := q.head
  282. if n.id > q.lastID && n.kind != _indisk && n != read {
  283. q.lastID = n.id
  284. go q.storeNode(n)
  285. return
  286. }
  287. n = n.next
  288. }
  289. }
  290. func (q *queue) Pop() (data []byte, err error) {
  291. defer func() {
  292. if err != nil {
  293. atomic.AddInt32(&q.length, -1)
  294. }
  295. }()
  296. if q.closed {
  297. return nil, fmt.Errorf("queue already closed")
  298. }
  299. data, err = q.head.pop()
  300. if err != nil {
  301. if err == io.EOF {
  302. if err = q.moveHead(); err != nil {
  303. return nil, err
  304. }
  305. return q.head.pop()
  306. }
  307. return nil, err
  308. }
  309. return data, nil
  310. }
  311. func (q *queue) moveHead() error {
  312. tail := q.tail
  313. if q.head == tail {
  314. return io.EOF
  315. }
  316. // move head to next
  317. q.mx.Lock()
  318. head := q.head
  319. q.head = q.head.next
  320. q.mx.Unlock()
  321. // reset head to new read node
  322. q.freeNode(head)
  323. return nil
  324. }
  325. func (q *queue) freeNode(n *node) {
  326. n.mx.Lock()
  327. defer n.mx.Unlock()
  328. if n.flushing {
  329. n.kind = _freed
  330. return
  331. }
  332. if n.bucket != nil {
  333. q.freeBucket(n.bucket)
  334. n.bucket = nil
  335. }
  336. if n.fbucket != nil {
  337. n.fbucket.close()
  338. }
  339. if n.fpath != "" {
  340. if err := os.Remove(n.fpath); err != nil {
  341. //fmt.Fprintf(os.Stderr, "[ERROR] diskqueue: remove file %s error: %s", n.fpath, err)
  342. }
  343. }
  344. }
  345. func (q *queue) storeNode(n *node) (err error) {
  346. fpath := storePath(q.opt.fpath, n)
  347. q.wg.Add(1)
  348. defer q.wg.Done()
  349. n.setFlushing(true)
  350. // if node already free return direct
  351. if n.bucket == nil {
  352. return
  353. }
  354. // if node be freed just release membucket
  355. if n.kind == _freed {
  356. q.freeBucket(n.bucket)
  357. return
  358. }
  359. // store bucket to disk
  360. if err = store(fpath, n); err != nil {
  361. fmt.Fprintf(os.Stderr, "[ERROR] diskqueue: store node error: %s", err)
  362. }
  363. n.fpath = fpath
  364. n.setFlushing(false)
  365. if n.kind == _freed {
  366. q.freeBucket(n.bucket)
  367. n.bucket = nil
  368. if err := os.Remove(fpath); err != nil {
  369. //fmt.Fprintf(os.Stderr, "[ERROR] diskqueue: remove file %s error: %s", n.fpath, err)
  370. }
  371. return
  372. }
  373. n.kind = _indisk
  374. q.mx.Lock()
  375. if q.head != n {
  376. q.freeBucket(n.bucket)
  377. n.bucket = nil
  378. }
  379. q.mx.Unlock()
  380. return
  381. }
// freeBucket hands bucket back to the pool and atomically decrements the
// memory bucket counter.
func (q *queue) freeBucket(bucket *memBucket) {
	q.pool.free(bucket)
	atomic.AddInt32(&q.memBucket, -1)
}
  386. func (q *queue) Close() error {
  387. // set closed
  388. q.closed = true
  389. // wait all store goroutines finish
  390. q.wg.Wait()
  391. var messages []string
  392. // store all leave node
  393. current := q.head
  394. for current != nil {
  395. if current.kind == _inmem && current.bucket != nil {
  396. fpath := storePath(q.opt.fpath, current)
  397. if err := store(fpath, current); err != nil {
  398. messages = append(messages, err.Error())
  399. }
  400. }
  401. current = current.next
  402. }
  403. if len(messages) == 0 {
  404. return nil
  405. }
  406. return fmt.Errorf("close queue error: %s", strings.Join(messages, "; "))
  407. }
  408. func store(fpath string, n *node) (err error) {
  409. // ignore empty bucket
  410. if n.bucket.writeAt == n.bucket.readAt {
  411. return nil
  412. }
  413. var fp *os.File
  414. fp, err = os.OpenFile(fpath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
  415. if err != nil {
  416. return fmt.Errorf("open file %s error: %s", fpath, err)
  417. }
  418. _, err = n.bucket.dump(fp)
  419. if err != nil {
  420. return fmt.Errorf("dump data to file %s error: %s", fpath, err)
  421. }
  422. return
  423. }
  424. func storePath(base string, n *node) string {
  425. return path.Join(base, _filePrefix+strconv.FormatInt(n.id, 10))
  426. }