batchwrite.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package batchwrite
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/pkg/errors"
  7. "go-common/app/service/main/dapper/model"
  8. "go-common/library/log"
  9. )
  10. var (
  11. _writeTimeout = time.Second
  12. // ErrClosed .
  13. ErrClosed = errors.New("batchwriter already closed")
  14. )
  15. // BatchWriter BatchWriter
  16. type BatchWriter interface {
  17. WriteSpan(span *model.Span) error
  18. Close() error
  19. // internale queue length
  20. QueueLen() int
  21. }
  22. type rawBundle struct {
  23. key string
  24. data map[string][]byte
  25. }
  26. // NewRawDataBatchWriter NewRawDataBatchWriter
  27. func NewRawDataBatchWriter(writeFunc func(context.Context, string, map[string][]byte) error, bufSize, chanSize, workers int, interval time.Duration) BatchWriter {
  28. if workers <= 0 {
  29. workers = 1
  30. }
  31. if interval <= 0 {
  32. interval = 5 * time.Second
  33. }
  34. rbw := &rawDataBatchWrite{
  35. maxBufSize: bufSize,
  36. ch: make(chan *rawBundle, chanSize),
  37. bufMap: make(map[string]map[string][]byte),
  38. timeout: 10 * time.Second,
  39. writeFunc: writeFunc,
  40. }
  41. rbw.wg.Add(workers)
  42. for i := 0; i < workers; i++ {
  43. go rbw.worker()
  44. }
  45. rbw.flushTicker = time.NewTicker(interval)
  46. go rbw.daemonFlush()
  47. return rbw
  48. }
  49. type rawDataBatchWrite struct {
  50. mx sync.Mutex
  51. closed bool
  52. maxBufSize int
  53. sizeCount int
  54. bufMap map[string]map[string][]byte
  55. ch chan *rawBundle
  56. timeout time.Duration
  57. writeFunc func(context.Context, string, map[string][]byte) error
  58. wg sync.WaitGroup
  59. flushTicker *time.Ticker
  60. }
  61. func (r *rawDataBatchWrite) WriteSpan(span *model.Span) error {
  62. data, err := span.Marshal()
  63. if err != nil {
  64. return err
  65. }
  66. traceID := span.TraceIDStr()
  67. spanID := span.SpanIDStr()
  68. kind := "_s"
  69. if !span.IsServer() {
  70. kind = "_c"
  71. }
  72. key := spanID + kind
  73. var bufMap map[string]map[string][]byte
  74. r.mx.Lock()
  75. if r.sizeCount > r.maxBufSize {
  76. bufMap = r.bufMap
  77. r.bufMap = make(map[string]map[string][]byte)
  78. r.sizeCount = 0
  79. }
  80. r.sizeCount += len(data)
  81. if _, ok := r.bufMap[traceID]; !ok {
  82. r.bufMap[traceID] = make(map[string][]byte)
  83. }
  84. r.bufMap[traceID][key] = data
  85. closed := r.closed
  86. r.mx.Unlock()
  87. if closed {
  88. return ErrClosed
  89. }
  90. if bufMap != nil {
  91. return r.flushBufMap(bufMap)
  92. }
  93. return nil
  94. }
  95. func (r *rawDataBatchWrite) QueueLen() int {
  96. return len(r.ch)
  97. }
  98. func (r *rawDataBatchWrite) daemonFlush() {
  99. for range r.flushTicker.C {
  100. if err := r.flush(); err != nil {
  101. log.Error("flush raw data error: %s", err)
  102. }
  103. }
  104. }
  105. func (r *rawDataBatchWrite) flush() error {
  106. var bufMap map[string]map[string][]byte
  107. r.mx.Lock()
  108. if r.sizeCount != 0 {
  109. bufMap = r.bufMap
  110. r.bufMap = make(map[string]map[string][]byte)
  111. r.sizeCount = 0
  112. }
  113. r.mx.Unlock()
  114. if bufMap != nil {
  115. return r.flushBufMap(bufMap)
  116. }
  117. return nil
  118. }
  119. func (r *rawDataBatchWrite) flushBufMap(bufMap map[string]map[string][]byte) error {
  120. timer := time.NewTimer(_writeTimeout)
  121. for traceID, data := range bufMap {
  122. select {
  123. case <-timer.C:
  124. return errors.New("write span timeout, raw data buffer channel is full")
  125. case r.ch <- &rawBundle{
  126. key: traceID,
  127. data: data,
  128. }:
  129. }
  130. }
  131. return nil
  132. }
  133. func (r *rawDataBatchWrite) Close() error {
  134. r.mx.Lock()
  135. defer r.mx.Unlock()
  136. r.closed = true
  137. r.flushTicker.Stop()
  138. bufMap := r.bufMap
  139. r.bufMap = make(map[string]map[string][]byte)
  140. r.sizeCount = 0
  141. r.flushBufMap(bufMap)
  142. close(r.ch)
  143. r.wg.Wait()
  144. return nil
  145. }
  146. func (r *rawDataBatchWrite) worker() {
  147. for bundle := range r.ch {
  148. if err := r.write(bundle); err != nil {
  149. log.Error("batch write raw data error: %s", err)
  150. }
  151. }
  152. r.wg.Done()
  153. }
  154. func (r *rawDataBatchWrite) write(bundle *rawBundle) error {
  155. ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
  156. defer cancel()
  157. return r.writeFunc(ctx, bundle.key, bundle.data)
  158. }