incr.go 884 B

123456789101112131415161718192021222324252627282930313233343536373839
  1. package service
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/job/main/search/dao"
  6. )
  7. // incr increment data
  8. func (s *Service) incr(c context.Context, app dao.App) {
  9. app.InitIndex(c)
  10. app.Sleep(c)
  11. app.Offset(c)
  12. Loop:
  13. for {
  14. length, err := app.IncrMessages(c)
  15. if err != nil {
  16. s.base.D.PromError("IncrMessages", "IncrMessages error(%v)", err)
  17. app.Sleep(c)
  18. continue
  19. }
  20. for start := 0; start < length; start += _bulkSize {
  21. diff := length - start
  22. if diff > _bulkSize {
  23. diff = _bulkSize
  24. }
  25. if err := app.BulkIndex(c, start, start+diff, false); err != nil {
  26. s.base.D.PromError("BulkIndex", "BulkIndex error(%v)", err)
  27. time.Sleep(120 * time.Second) // 使databus readTimeout重新消费
  28. continue Loop
  29. }
  30. }
  31. if err := app.Commit(c); err != nil {
  32. s.base.D.PromError("UpdateOffsetID", "UpdateOffsetID error(%v)", err)
  33. }
  34. app.Sleep(c)
  35. }
  36. }