progress.go 653 B

1234567891011121314151617181920212223242526272829303132333435363738
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "go-common/app/interface/main/history/model"
  6. "go-common/library/log"
  7. )
  8. func (s *Service) subproc() {
  9. for {
  10. msg, ok := <-s.sub.Messages()
  11. if !ok {
  12. log.Info("subproc exit")
  13. return
  14. }
  15. msg.Commit()
  16. m := &model.History{}
  17. if err := json.Unmarshal(msg.Value, &m); err != nil {
  18. log.Error("json.Unmarshal() error(%v)", err)
  19. continue
  20. }
  21. if m.Mid != 0 && m.Aid != 0 {
  22. s.add(m)
  23. }
  24. }
  25. }
  26. func (s *Service) add(m *model.History) {
  27. for j := 0; j < 3; j++ {
  28. err := s.dao.Add(context.Background(), m)
  29. if err == nil {
  30. return
  31. }
  32. log.Error("s.dao.Add() err:%+v", err)
  33. }
  34. }