package service

import (
	"context"
	"encoding/json"
	"strconv"
	"sync"
	"time"

	"go-common/app/job/main/history/conf"
	"go-common/app/job/main/history/dao"
	"go-common/app/job/main/history/model"
	hmdl "go-common/app/service/main/history/model"
	"go-common/library/log"
	"go-common/library/queue/databus"
	"go-common/library/sync/pipeline"
	"go-common/library/sync/pipeline/fanout"
	"go-common/library/xstr"

	"golang.org/x/time/rate"
)

const (
	_chanSize    = 1024
	_routineSize = 32
	_retryCnt    = 3
)
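
// message is a node in the linked list that consumeproc uses to track
// which databus messages have been fully merged and flushed.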
type message struct {
	next *message
	data *databus.Message
	done bool
}

// Service is the history job service.
type Service struct {
	c             *conf.Config
	waiter        *sync.WaitGroup
	dao           *dao.Dao
	hisSub        *databus.Databus
	serviceHisSub *databus.Databus
	sub           *databus.Databus
	mergeChan     []chan *message
	doneChan      chan []*message
	merge         *pipeline.Pipeline
	businesses    map[int64]*hmdl.Business
	cache         *fanout.Fanout
	limit         *rate.Limiter
}

// New creates a service instance and returns it.
func New(c *conf.Config) (s *Service) {
	s = &Service{
		c:             c,
		dao:           dao.New(c),
		waiter:        new(sync.WaitGroup),
		hisSub:        databus.New(c.HisSub),
		serviceHisSub: databus.New(c.ServiceHisSub),
		sub:           databus.New(c.Sub),
		mergeChan:     make([]chan *message, _routineSize), // one channel per merge worker
		doneChan:      make(chan []*message, _chanSize),
		cache:         fanout.New("cache"),
		limit:         rate.NewLimiter(rate.Limit(c.Job.QPSLimit), c.Job.ServiceBatch*2),
	}
	s.businesses = s.dao.BusinessesMap
	go s.subproc()
	go s.consumeproc()
	go s.serviceConsumeproc()
	go s.deleteproc()
	s.initMerge()
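	// start one merge worker per shard; consumeproc routes each message
	// by its numeric key, so messages with the same key always land on
	// the same worker.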
	for i := 0; i < _routineSize; i++ {
		c := make(chan *message, _chanSize)
		s.mergeChan[i] = c
		go s.mergeproc(c)
	}
	return
}
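
// consumeproc consumes history messages from hisSub, chains them into a
// linked list so offsets can be tracked in arrival order, and dispatches
// each message to a merge worker chosen by its key.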
func (s *Service) consumeproc() {
	var (
		err        error
		n          int
		head, last *message
		msgs       = s.hisSub.Messages()
	)
	for {
		select {
		case msg, ok := <-msgs:
			if !ok {
				log.Error("s.consumeproc closed")
				return
			}
			// append to the pending list; head is the first message whose
			// offset has not been accounted for yet.
			m := &message{data: msg}
			if head == nil {
				head = m
				last = m
			} else {
				last.next = m
				last = m
			}
			if n, err = strconv.Atoi(msg.Key); err != nil {
				log.Error("strconv.Atoi(%s) error(%v)", msg.Key, err)
			}
			// use a specific goroutine to flush
			s.mergeChan[n%_routineSize] <- m
			msg.Commit()
			log.Info("consumeproc key:%s partition:%d offset:%d", msg.Key, msg.Partition, msg.Offset)
		case done := <-s.doneChan:
			// merge partitions to commit offset
			commits := make(map[int32]*databus.Message)
			for _, d := range done {
				d.done = true
			}
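			// advance head past the contiguous prefix of done messages and
			// record the last done message per partition; offsets are
			// already committed eagerly after dispatch above, so the
			// batched commit below is left disabled.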
			for ; head != nil && head.done; head = head.next {
				commits[head.data.Partition] = head.data
			}
			// for _, m := range commits {
			// 	m.Commit()
			// }
		}
	}
}
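
// mergeproc accumulates history updates per mid and flushes them either
// when the number of distinct mids reaches s.c.Job.Max or when the
// configured expire ticker fires, whichever comes first.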
func (s *Service) mergeproc(c chan *message) {
	var (
		err    error
		max    = s.c.Job.Max
		merges = make(map[int64]int64, 10240)
		marked = make([]*message, 0, 10240)
		ticker = time.NewTicker(time.Duration(s.c.Job.Expire))
	)
	for {
		select {
		case msg, ok := <-c:
			if !ok {
				log.Error("s.mergeproc closed")
				return
			}
			ms := make([]*model.Merge, 0, 32)
			if err = json.Unmarshal(msg.data.Value, &ms); err != nil {
				log.Error("json.Unmarshal(%s) error(%v)", msg.data.Value, err)
				continue
			}
			// keep the earliest timestamp seen for each mid
			for _, m := range ms {
				if now, ok := merges[m.Mid]; !ok || now > m.Now {
					merges[m.Mid] = m.Now
				}
			}
			marked = append(marked, msg)
			if len(merges) < max {
				continue
			}
		case <-ticker.C:
		}
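		// reached when the batch hits max distinct mids or the ticker
		// fires: flush the merged mids and release the marked messages
		// for offset accounting.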
		if len(merges) > 0 {
			s.flush(merges)
			s.doneChan <- marked
			merges = make(map[int64]int64, 10240)
			marked = make([]*message, 0, 10240)
		}
	}
}
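
// flush writes the merged mids to the dao in batches of s.c.Job.Batch,
// using the earliest timestamp across all mids and retrying each batch
// up to _retryCnt times.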
func (s *Service) flush(res map[int64]int64) {
	var (
		err   error
		ts    int64
		mids  []int64
		batch = s.c.Job.Batch
	)
	for mid, now := range res {
		if now < ts || ts == 0 {
			ts = now
		}
		mids = append(mids, mid)
	}
	for len(mids) > 0 {
		if len(mids) < batch {
			batch = len(mids)
		}
		for i := 0; i < _retryCnt; i++ {
			if err = s.dao.Flush(context.Background(), xstr.JoinInts(mids[:batch]), ts); err == nil {
				break
			}
			time.Sleep(time.Millisecond * 100)
		}
		mids = mids[batch:]
	}
}

// Ping checks service health (currently a no-op).
func (s *Service) Ping() {}

// Close closes the databus consumers and the merge pipeline, then waits
// for in-flight workers to finish.
func (s *Service) Close() {
	if s.sub != nil {
		s.sub.Close()
	}
	if s.serviceHisSub != nil {
		s.serviceHisSub.Close()
	}
	s.merge.Close()
	s.waiter.Wait()
}