all.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package service
  2. import (
  3. "context"
  4. "go-common/app/job/main/search/model"
  5. "go-common/library/log"
  6. )
  7. // all all data to es
  8. func (s *Service) all(c context.Context, appid string, writeEntityIndex bool) {
  9. var stat = new(model.Stat)
  10. app := s.base.D.AppPool[appid]
  11. app.InitIndex(c)
  12. app.InitOffset(c)
  13. //app.Offset(c)
  14. app.Sleep(c)
  15. for {
  16. start := 0
  17. length, err := app.AllMessages(c)
  18. if err != nil {
  19. log.Error("AllMessages error(%v)", err)
  20. app.Sleep(c)
  21. continue
  22. }
  23. for {
  24. end := start + _bulkSize
  25. diff := length - start
  26. if diff > _bulkSize {
  27. if err := app.BulkIndex(c, start, end, writeEntityIndex); err != nil {
  28. log.Error("es:BulkIndex error(%v)", err)
  29. app.Sleep(c)
  30. continue
  31. }
  32. start = end
  33. } else if diff > 0 && diff <= _bulkSize {
  34. if err := app.BulkIndex(c, start, length, writeEntityIndex); err != nil {
  35. log.Error("BulkIndex error(%v)", err)
  36. app.Sleep(c)
  37. continue
  38. }
  39. if err := app.Commit(c); err != nil {
  40. log.Error("UpdateOffsetID error(%v)", err)
  41. app.Sleep(c)
  42. continue
  43. }
  44. app.Sleep(c)
  45. break
  46. } else {
  47. app.Sleep(c)
  48. break
  49. }
  50. }
  51. stat.Counts += length
  52. s.updateStat(appid, stat)
  53. if length < app.Size(c) {
  54. switch appid {
  55. case "pgc_media", "esports", "esports_contests", "academy_archive", "esports_fav_all", "activity_all":
  56. app.SetRecover(c, 0, "", 0)
  57. app.Sleep(c)
  58. continue
  59. }
  60. break
  61. }
  62. app.Sleep(c)
  63. }
  64. log.Info("appid:%s, all data to es successful!!!", appid)
  65. }