123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- package dao
- import (
- "context"
- "time"
- "go-common/app/job/main/search/model"
- xsql "go-common/library/database/sql"
- "go-common/library/log"
- )
- // AppSingle .
- type AppSingle struct {
- d *Dao
- appid string
- attrs *model.Attrs
- db *xsql.DB
- offset *model.LoopOffset
- mapData []model.MapData
- }
- // NewAppSingle .
- func NewAppSingle(d *Dao, appid string) (as *AppSingle) {
- as = &AppSingle{
- d: d,
- appid: appid,
- attrs: d.AttrPool[appid],
- offset: &model.LoopOffset{},
- mapData: []model.MapData{},
- db: d.DBPool[d.AttrPool[appid].DBName],
- }
- return
- }
- // Business return business.
- func (as *AppSingle) Business() string {
- return as.attrs.Business
- }
- // InitIndex init index.
- func (as *AppSingle) InitIndex(c context.Context) {
- if aliases, err := as.d.GetAliases(as.attrs.ESName, as.attrs.Index.IndexAliasPrefix); err != nil {
- as.d.InitIndex(c, nil, as.attrs.ESName, as.attrs.Index.IndexAliasPrefix, as.attrs.Index.IndexEntityPrefix, as.attrs.Index.IndexMapping)
- } else {
- as.d.InitIndex(c, aliases, as.attrs.ESName, as.attrs.Index.IndexAliasPrefix, as.attrs.Index.IndexEntityPrefix, as.attrs.Index.IndexMapping)
- }
- }
- // InitOffset insert init value to offset.
- func (as *AppSingle) InitOffset(c context.Context) {
- as.d.InitOffset(c, as.offset, as.attrs, []string{})
- nowFormat := time.Now().Format("2006-01-02 15:04:05")
- as.offset.SetOffset(0, nowFormat)
- }
- // Offset get offset.
- func (as *AppSingle) Offset(c context.Context) {
- for {
- offset, err := as.d.Offset(c, as.appid, as.attrs.Table.TablePrefix)
- if err != nil {
- log.Error("ac.d.Offset error(%v)", err)
- time.Sleep(time.Second * 3)
- continue
- }
- as.offset.SetReview(offset.ReviewID, offset.ReviewTime)
- as.offset.SetOffset(offset.OffsetID(), offset.OffsetTime())
- break
- }
- }
- // SetRecover set recover
- func (as *AppSingle) SetRecover(c context.Context, recoverID int64, recoverTime string, i int) {
- as.offset.SetRecoverOffset(recoverID, recoverTime)
- }
- // IncrMessages .
- func (as *AppSingle) IncrMessages(c context.Context) (length int, err error) {
- var rows *xsql.Rows
- //fmt.Println("start", as.offset.OffsetTime)
- if !as.offset.IsLoop {
- rows, err = as.db.Query(c, as.attrs.DataSQL.SQLByMTime, as.offset.OffsetTime, as.attrs.Other.Size)
- } else {
- rows, err = as.db.Query(c, as.attrs.DataSQL.SQLByIDMTime, as.offset.OffsetID, as.offset.OffsetTime, as.attrs.Other.Size)
- }
- if err != nil {
- log.Error("db.Query error(%v)", err)
- return
- }
- defer rows.Close()
- tempPartList := []model.MapData{}
- for rows.Next() {
- item, row := InitMapData(as.attrs.DataSQL.DataIndexFields)
- if err = rows.Scan(row...); err != nil {
- log.Error("IncrMessages rows.Scan() error(%v)", err)
- return
- }
- as.mapData = append(as.mapData, item)
- tempPartList = append(tempPartList, item)
- }
- if len(as.mapData) > 0 {
- // extra relevant data
- as.mapData, err = as.d.ExtraData(c, as.mapData, as.attrs, "db", []string{})
- // offset
- UpdateOffsetByMap(as.offset, tempPartList...)
- }
- length = len(as.mapData)
- return
- }
- // AllMessages .
- func (as *AppSingle) AllMessages(c context.Context) (length int, err error) {
- var rows *xsql.Rows
- if rows, err = as.db.Query(c, as.attrs.DataSQL.SQLByID, as.offset.RecoverID, as.attrs.Other.Size); err != nil {
- log.Error("AllMessages db.Query error(%v)", err)
- return
- }
- defer rows.Close()
- for rows.Next() {
- item, row := InitMapData(as.attrs.DataSQL.DataIndexFields)
- if err = rows.Scan(row...); err != nil {
- log.Error("AllMessages rows.Scan() error(%v)", err)
- continue
- }
- as.mapData = append(as.mapData, item)
- }
- if len(as.mapData) > 0 {
- // extra relevant data
- as.mapData, err = as.d.ExtraData(c, as.mapData, as.attrs, "db", []string{})
- // offset
- if v, ok := as.mapData[len(as.mapData)-1]["_id"]; ok && v != nil {
- if v2, ok := v.(interface{}); ok {
- as.offset.SetTempOffset((v2).(int64), "")
- as.offset.SetRecoverTempOffset((v2).(int64), "")
- } else {
- log.Error("single.all._id interface error")
- }
- } else {
- log.Error("single.all._id nil error")
- }
- }
- length = len(as.mapData)
- return
- }
- // BulkIndex .
- func (as *AppSingle) BulkIndex(c context.Context, start int, end int, writeEntityIndex bool) (err error) {
- if len(as.mapData) >= (start+1) && len(as.mapData) >= end {
- partData := as.mapData[start:end]
- err = as.d.BulkDBData(c, as.attrs, writeEntityIndex, partData...)
- }
- return
- }
- // Commit commit offset.
- func (as *AppSingle) Commit(c context.Context) (err error) {
- err = as.d.CommitOffset(c, as.offset, as.attrs.AppID, as.attrs.Table.TablePrefix)
- as.mapData = []model.MapData{}
- return
- }
- // Sleep interval duration.
- func (as *AppSingle) Sleep(c context.Context) {
- time.Sleep(time.Second * time.Duration(as.attrs.Other.Sleep))
- }
- // Size return size.
- func (as *AppSingle) Size(c context.Context) int {
- return as.attrs.Other.Size
- }
|