service.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "sync"
  6. "time"
  7. "go-common/app/job/main/answer/conf"
  8. "go-common/app/job/main/answer/dao"
  9. "go-common/app/job/main/answer/model"
  10. "go-common/library/log"
  11. "go-common/library/queue/databus"
  12. )
  13. const (
  14. _insertAction = "insert"
  15. _updateAction = "update"
  16. _labourTable = "blocked_labour_question"
  17. )
  18. // Service service def.
  19. type Service struct {
  20. c *conf.Config
  21. dao *dao.Dao
  22. labourDatabus *databus.Databus
  23. accountFormal *databus.Databus
  24. waiter sync.WaitGroup
  25. uploadInterval time.Duration
  26. closed bool
  27. }
  28. // New create a instance of Service and return.
  29. func New(c *conf.Config) (s *Service) {
  30. s = &Service{
  31. c: c,
  32. dao: dao.New(c),
  33. uploadInterval: time.Duration(c.Properties.UploadInterval),
  34. }
  35. if c.Databus.Labour != nil {
  36. s.labourDatabus = databus.New(c.Databus.Labour)
  37. s.waiter.Add(1)
  38. go s.labourproc()
  39. }
  40. if c.Databus.Account != nil {
  41. s.accountFormal = databus.New(c.Databus.Account)
  42. go s.formalproc()
  43. }
  44. s.waiter.Add(1)
  45. go s.loadextarqueproc()
  46. return s
  47. }
  48. func (s *Service) labourproc() {
  49. defer s.waiter.Done()
  50. var (
  51. err error
  52. msg *databus.Message
  53. msgChan = s.labourDatabus.Messages()
  54. ok bool
  55. )
  56. for {
  57. msg, ok = <-msgChan
  58. if !ok {
  59. log.Info("labour msgChan closed")
  60. }
  61. if s.closed {
  62. return
  63. }
  64. if err = msg.Commit(); err != nil {
  65. log.Error("msg.Commit err(%v)", err)
  66. }
  67. v := &model.MsgCanal{}
  68. if err = json.Unmarshal([]byte(msg.Value), v); err != nil {
  69. log.Error("json.Unmarshal(%v) err(%v)", v, err)
  70. continue
  71. }
  72. if v.Table == _labourTable {
  73. switch v.Action {
  74. case _insertAction:
  75. s.AddLabourQuestion(context.Background(), v)
  76. case _updateAction:
  77. s.ModifyLabourQuestion(context.Background(), v)
  78. }
  79. }
  80. }
  81. }
  82. func (s *Service) formalproc() {
  83. var (
  84. ok bool
  85. err error
  86. msg *databus.Message
  87. msgChan = s.accountFormal.Messages()
  88. )
  89. for {
  90. msg, ok = <-msgChan
  91. if !ok {
  92. log.Info("account formal msgChan closed")
  93. }
  94. if s.closed {
  95. return
  96. }
  97. v := &model.Formal{}
  98. if err = json.Unmarshal([]byte(msg.Value), v); err != nil {
  99. log.Error("json.Unmarshal(%v) err(%v)", v, err)
  100. continue
  101. }
  102. for retries := 0; retries < s.c.Properties.MaxRetries; retries++ {
  103. if err = s.dao.BeFormal(context.Background(), v.Mid, v.IP); err != nil {
  104. sleep := s.c.Backoff.Backoff(retries)
  105. log.Error("s.dao.BeFormal(%+v) sleep(%d) err(%+v)", v, sleep, err)
  106. time.Sleep(sleep * time.Second)
  107. continue
  108. }
  109. break
  110. }
  111. if err = msg.Commit(); err != nil {
  112. log.Error("msg.Commit err(%v)", err)
  113. }
  114. }
  115. }
  116. // Close all resource.
  117. func (s *Service) Close() (err error) {
  118. defer s.waiter.Wait()
  119. s.closed = true
  120. s.dao.Close()
  121. if err = s.labourDatabus.Close(); err != nil {
  122. log.Error("s.labourDatabus.Close() error(%v)", err)
  123. return
  124. }
  125. return
  126. }
  127. // Ping check dao health.
  128. func (s *Service) Ping(c context.Context) (err error) {
  129. return s.dao.Ping(c)
  130. }
  131. func (s *Service) loadextarqueproc() {
  132. defer s.waiter.Done()
  133. for {
  134. time.Sleep(s.uploadInterval)
  135. if s.closed {
  136. return
  137. }
  138. res, err := s.dao.QidsExtraByState(context.Background(), model.LimitSize)
  139. if err != nil {
  140. log.Error("s.dao.QidsExtraByState() error(%v)", err)
  141. continue
  142. }
  143. if len(res) == 0 {
  144. continue
  145. }
  146. for _, q := range res {
  147. s.UploadQueImg(context.Background(), q)
  148. }
  149. }
  150. }