1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 |
- package service
- import (
- "context"
- "go-common/app/job/main/search/model"
- "go-common/library/log"
- )
- // all all data to es
- func (s *Service) all(c context.Context, appid string, writeEntityIndex bool) {
- var stat = new(model.Stat)
- app := s.base.D.AppPool[appid]
- app.InitIndex(c)
- app.InitOffset(c)
- //app.Offset(c)
- app.Sleep(c)
- for {
- start := 0
- length, err := app.AllMessages(c)
- if err != nil {
- log.Error("AllMessages error(%v)", err)
- app.Sleep(c)
- continue
- }
- for {
- end := start + _bulkSize
- diff := length - start
- if diff > _bulkSize {
- if err := app.BulkIndex(c, start, end, writeEntityIndex); err != nil {
- log.Error("es:BulkIndex error(%v)", err)
- app.Sleep(c)
- continue
- }
- start = end
- } else if diff > 0 && diff <= _bulkSize {
- if err := app.BulkIndex(c, start, length, writeEntityIndex); err != nil {
- log.Error("BulkIndex error(%v)", err)
- app.Sleep(c)
- continue
- }
- if err := app.Commit(c); err != nil {
- log.Error("UpdateOffsetID error(%v)", err)
- app.Sleep(c)
- continue
- }
- app.Sleep(c)
- break
- } else {
- app.Sleep(c)
- break
- }
- }
- stat.Counts += length
- s.updateStat(appid, stat)
- if length < app.Size(c) {
- switch appid {
- case "pgc_media", "esports", "esports_contests", "academy_archive", "esports_fav_all", "activity_all":
- app.SetRecover(c, 0, "", 0)
- app.Sleep(c)
- continue
- }
- break
- }
- app.Sleep(c)
- }
- log.Info("appid:%s, all data to es successful!!!", appid)
- }
|