123456789101112131415161718192021222324252627282930313233343536373839 |
- package service
- import (
- "context"
- "time"
- "go-common/app/job/main/search/dao"
- )
- // incr increment data
- func (s *Service) incr(c context.Context, app dao.App) {
- app.InitIndex(c)
- app.Sleep(c)
- app.Offset(c)
- Loop:
- for {
- length, err := app.IncrMessages(c)
- if err != nil {
- s.base.D.PromError("IncrMessages", "IncrMessages error(%v)", err)
- app.Sleep(c)
- continue
- }
- for start := 0; start < length; start += _bulkSize {
- diff := length - start
- if diff > _bulkSize {
- diff = _bulkSize
- }
- if err := app.BulkIndex(c, start, start+diff, false); err != nil {
- s.base.D.PromError("BulkIndex", "BulkIndex error(%v)", err)
- time.Sleep(120 * time.Second) // 使databus readTimeout重新消费
- continue Loop
- }
- }
- if err := app.Commit(c); err != nil {
- s.base.D.PromError("UpdateOffsetID", "UpdateOffsetID error(%v)", err)
- }
- app.Sleep(c)
- }
- }
|