service.go 6.0 KB


  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "strconv"
  6. "time"
  7. "go-common/app/interface/main/credit/conf"
  8. dao "go-common/app/interface/main/credit/dao"
  9. model "go-common/app/interface/main/credit/model"
  10. accgrpc "go-common/app/service/main/account/api"
  11. arcrpc "go-common/app/service/main/archive/api/gorpc"
  12. fligrpc "go-common/app/service/main/filter/api/grpc/v1"
  13. memrpc "go-common/app/service/main/member/api/gorpc"
  14. "go-common/library/log"
  15. "github.com/pkg/errors"
  16. )
  17. // Service struct of service.
  18. type Service struct {
  19. dao *dao.Dao
  20. // rpc
  21. arcRPC *arcrpc.Service2
  22. memRPC *memrpc.Service
  23. // grpc
  24. accountClient accgrpc.AccountClient
  25. fliClient fligrpc.FilterClient
  26. // conf
  27. c *conf.Config
  28. question []*model.LabourQs
  29. avIDs []int64
  30. missch chan func()
  31. // announcement
  32. announcement *announcement
  33. managers map[string]int64
  34. tagMap map[int8]int64
  35. }
  36. type announcement struct {
  37. def []*model.BlockedAnnouncement
  38. alist map[int8][]*model.BlockedAnnouncement
  39. amap map[int64]*model.BlockedAnnouncement
  40. }
  41. // New create service instance and return.
  42. func New(c *conf.Config) (s *Service) {
  43. s = &Service{
  44. c: c,
  45. dao: dao.New(c),
  46. missch: make(chan func(), 1024000),
  47. arcRPC: arcrpc.New2(c.RPCClient2.Archive),
  48. memRPC: memrpc.New(c.RPCClient2.Member),
  49. tagMap: make(map[int8]int64),
  50. announcement: &announcement{
  51. def: make([]*model.BlockedAnnouncement, 0, 4),
  52. alist: make(map[int8][]*model.BlockedAnnouncement),
  53. amap: make(map[int64]*model.BlockedAnnouncement),
  54. },
  55. }
  56. var err error
  57. if s.fliClient, err = fligrpc.NewClient(c.GRPCClient.Filter); err != nil {
  58. panic(errors.WithMessage(err, "Failed to dial filter service"))
  59. }
  60. if s.accountClient, err = accgrpc.NewClient(c.GRPCClient.Account); err != nil {
  61. panic(errors.WithMessage(err, "Failed to dial account service"))
  62. }
  63. s.initTag()
  64. s.loadConf()
  65. s.loadQuestion()
  66. s.loadManager()
  67. s.LoadAnnouncement(context.TODO())
  68. go s.loadConfproc()
  69. go s.loadQuestionproc()
  70. go s.loadManagerproc()
  71. go s.loadAnnouncementproc()
  72. go s.cacheproc()
  73. return
  74. }
  75. func (s *Service) loadConfproc() {
  76. for {
  77. time.Sleep(time.Duration(s.c.Judge.ConfTimer))
  78. s.loadConf()
  79. }
  80. }
  81. func (s *Service) loadQuestionproc() {
  82. for {
  83. time.Sleep(time.Duration(s.c.Judge.ConfTimer))
  84. s.loadQuestion()
  85. }
  86. }
  87. func (s *Service) loadManagerproc() {
  88. for {
  89. time.Sleep(time.Duration(s.c.Judge.LoadManagerTime))
  90. s.loadManager()
  91. }
  92. }
  93. func (s *Service) loadAnnouncementproc() {
  94. for {
  95. time.Sleep(time.Duration(s.c.Judge.ConfTimer))
  96. s.LoadAnnouncement(context.TODO())
  97. }
  98. }
  99. func (s *Service) loadConf() {
  100. m, err := s.dao.LoadConf(context.TODO())
  101. if err != nil {
  102. log.Error("loadConf error(%v)", err)
  103. return
  104. }
  105. if s.c.Judge.CaseGiveHours, err = strconv.ParseInt(m["case_give_hours"], 10, 64); err != nil {
  106. log.Error("loadConf CaseGiveHours error(%v)", err)
  107. }
  108. if s.c.Judge.CaseCheckTime, err = strconv.ParseInt(m["case_check_hours"], 10, 64); err != nil {
  109. log.Error("loadConf CaseCheckTime error(%v)", err)
  110. }
  111. if s.c.Judge.JuryRatio, err = strconv.ParseInt(m["jury_vote_radio"], 10, 64); err != nil {
  112. log.Error("loadConf JuryRatio error(%v)", err)
  113. }
  114. if s.c.Judge.JudgeRadio, err = strconv.ParseInt(m["case_judge_radio"], 10, 64); err != nil {
  115. log.Error("loadConf JudgeRadio error(%v)", err)
  116. }
  117. if s.c.Judge.CaseVoteMin, err = strconv.ParseInt(m["case_vote_min"], 10, 64); err != nil {
  118. log.Error("loadConf CaseVoteMin error(%v)", err)
  119. }
  120. if s.c.Judge.CaseObtainMax, err = strconv.ParseInt(m["case_obtain_max"], 10, 64); err != nil {
  121. log.Error("loadConf CaseObtainMax error(%v)", err)
  122. }
  123. if s.c.Judge.CaseVoteMax, err = strconv.ParseInt(m["case_vote_max"], 10, 64); err != nil {
  124. log.Error("loadConf CaseVoteMax error(%v)", err)
  125. }
  126. if s.c.Judge.JuryApplyMax, err = strconv.ParseInt(m["jury_apply_max"], 10, 64); err != nil {
  127. log.Error("loadConf JuryApplyMax error(%v)", err)
  128. }
  129. if s.c.Judge.CaseLoadMax, err = strconv.Atoi(m["case_load_max"]); err != nil {
  130. log.Error("loadConf CaseLoadMax error(%v)", err)
  131. }
  132. var caseLoadSwitch int64
  133. if caseLoadSwitch, err = strconv.ParseInt(m["case_load_switch"], 10, 64); err != nil {
  134. log.Error("loadConf CaseLoadSwitch error(%v)", err)
  135. }
  136. s.c.Judge.CaseLoadSwitch = int8(caseLoadSwitch)
  137. if _, ok := m["vote_num"]; !ok {
  138. s.c.Judge.VoteNum.RateS = 1
  139. s.c.Judge.VoteNum.RateA = 1
  140. s.c.Judge.VoteNum.RateB = 1
  141. s.c.Judge.VoteNum.RateC = 1
  142. s.c.Judge.VoteNum.RateD = 1
  143. return
  144. }
  145. if err = json.Unmarshal([]byte(m["vote_num"]), &s.c.Judge.VoteNum); err != nil {
  146. log.Error("loadConf vote_num error(%v)", err)
  147. }
  148. }
  149. func (s *Service) initTag() {
  150. s.tagMap[model.OriginReply] = s.c.TagID.Reply
  151. s.tagMap[model.OriginDM] = s.c.TagID.DM
  152. s.tagMap[model.OriginMsg] = s.c.TagID.Msg
  153. s.tagMap[model.OriginTag] = s.c.TagID.Tag
  154. s.tagMap[model.OriginMember] = s.c.TagID.Member
  155. s.tagMap[model.OriginArchive] = s.c.TagID.Archive
  156. s.tagMap[model.OriginMusic] = s.c.TagID.Music
  157. s.tagMap[model.OriginArticle] = s.c.TagID.Article
  158. s.tagMap[model.OriginSpaceTop] = s.c.TagID.SpaceTop
  159. }
  160. func (s *Service) loadManager() {
  161. managers, err := s.dao.Managers(context.TODO())
  162. if err != nil {
  163. log.Error("s.dao.Managers error(%v)", err)
  164. return
  165. }
  166. s.managers = managers
  167. }
  168. func (s *Service) loadQuestion() {
  169. audit, avIDs, err := s.dao.LastAuditQuestion(context.TODO())
  170. if err != nil {
  171. log.Error("s.dao.LastAuditQuestion error(%v)", err)
  172. return
  173. }
  174. noAudit, noAvIDs, err := s.dao.LastNoAuditQuestion(context.TODO())
  175. if err != nil {
  176. log.Error("s.dao.LastNoAuditQuestion error(%v)", err)
  177. return
  178. }
  179. audit = append(audit, noAudit...)
  180. avIDs = append(avIDs, noAvIDs...)
  181. s.question = audit
  182. s.avIDs = avIDs
  183. }
  184. // Ping check server ok.
  185. func (s *Service) Ping(c context.Context) (err error) {
  186. if err = s.dao.Ping(c); err != nil {
  187. return
  188. }
  189. return s.dao.Ping(c)
  190. }
  191. // Close dao.
  192. func (s *Service) Close() {
  193. s.dao.Close()
  194. }
  195. func (s *Service) addCache(f func()) {
  196. select {
  197. case s.missch <- f:
  198. default:
  199. log.Warn("cacheproc chan full")
  200. }
  201. }
  202. // cacheproc is a routine for executing closure.
  203. func (s *Service) cacheproc() {
  204. for {
  205. f := <-s.missch
  206. f()
  207. }
  208. }