app_single.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package dao
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/job/main/search/model"
  6. xsql "go-common/library/database/sql"
  7. "go-common/library/log"
  8. )
  9. // AppSingle .
  10. type AppSingle struct {
  11. d *Dao
  12. appid string
  13. attrs *model.Attrs
  14. db *xsql.DB
  15. offset *model.LoopOffset
  16. mapData []model.MapData
  17. }
  18. // NewAppSingle .
  19. func NewAppSingle(d *Dao, appid string) (as *AppSingle) {
  20. as = &AppSingle{
  21. d: d,
  22. appid: appid,
  23. attrs: d.AttrPool[appid],
  24. offset: &model.LoopOffset{},
  25. mapData: []model.MapData{},
  26. db: d.DBPool[d.AttrPool[appid].DBName],
  27. }
  28. return
  29. }
  30. // Business return business.
  31. func (as *AppSingle) Business() string {
  32. return as.attrs.Business
  33. }
  34. // InitIndex init index.
  35. func (as *AppSingle) InitIndex(c context.Context) {
  36. if aliases, err := as.d.GetAliases(as.attrs.ESName, as.attrs.Index.IndexAliasPrefix); err != nil {
  37. as.d.InitIndex(c, nil, as.attrs.ESName, as.attrs.Index.IndexAliasPrefix, as.attrs.Index.IndexEntityPrefix, as.attrs.Index.IndexMapping)
  38. } else {
  39. as.d.InitIndex(c, aliases, as.attrs.ESName, as.attrs.Index.IndexAliasPrefix, as.attrs.Index.IndexEntityPrefix, as.attrs.Index.IndexMapping)
  40. }
  41. }
  42. // InitOffset insert init value to offset.
  43. func (as *AppSingle) InitOffset(c context.Context) {
  44. as.d.InitOffset(c, as.offset, as.attrs, []string{})
  45. nowFormat := time.Now().Format("2006-01-02 15:04:05")
  46. as.offset.SetOffset(0, nowFormat)
  47. }
  48. // Offset get offset.
  49. func (as *AppSingle) Offset(c context.Context) {
  50. for {
  51. offset, err := as.d.Offset(c, as.appid, as.attrs.Table.TablePrefix)
  52. if err != nil {
  53. log.Error("ac.d.Offset error(%v)", err)
  54. time.Sleep(time.Second * 3)
  55. continue
  56. }
  57. as.offset.SetReview(offset.ReviewID, offset.ReviewTime)
  58. as.offset.SetOffset(offset.OffsetID(), offset.OffsetTime())
  59. break
  60. }
  61. }
  62. // SetRecover set recover
  63. func (as *AppSingle) SetRecover(c context.Context, recoverID int64, recoverTime string, i int) {
  64. as.offset.SetRecoverOffset(recoverID, recoverTime)
  65. }
  66. // IncrMessages .
  67. func (as *AppSingle) IncrMessages(c context.Context) (length int, err error) {
  68. var rows *xsql.Rows
  69. //fmt.Println("start", as.offset.OffsetTime)
  70. if !as.offset.IsLoop {
  71. rows, err = as.db.Query(c, as.attrs.DataSQL.SQLByMTime, as.offset.OffsetTime, as.attrs.Other.Size)
  72. } else {
  73. rows, err = as.db.Query(c, as.attrs.DataSQL.SQLByIDMTime, as.offset.OffsetID, as.offset.OffsetTime, as.attrs.Other.Size)
  74. }
  75. if err != nil {
  76. log.Error("db.Query error(%v)", err)
  77. return
  78. }
  79. defer rows.Close()
  80. tempPartList := []model.MapData{}
  81. for rows.Next() {
  82. item, row := InitMapData(as.attrs.DataSQL.DataIndexFields)
  83. if err = rows.Scan(row...); err != nil {
  84. log.Error("IncrMessages rows.Scan() error(%v)", err)
  85. return
  86. }
  87. as.mapData = append(as.mapData, item)
  88. tempPartList = append(tempPartList, item)
  89. }
  90. if len(as.mapData) > 0 {
  91. // extra relevant data
  92. as.mapData, err = as.d.ExtraData(c, as.mapData, as.attrs, "db", []string{})
  93. // offset
  94. UpdateOffsetByMap(as.offset, tempPartList...)
  95. }
  96. length = len(as.mapData)
  97. return
  98. }
  99. // AllMessages .
  100. func (as *AppSingle) AllMessages(c context.Context) (length int, err error) {
  101. var rows *xsql.Rows
  102. if rows, err = as.db.Query(c, as.attrs.DataSQL.SQLByID, as.offset.RecoverID, as.attrs.Other.Size); err != nil {
  103. log.Error("AllMessages db.Query error(%v)", err)
  104. return
  105. }
  106. defer rows.Close()
  107. for rows.Next() {
  108. item, row := InitMapData(as.attrs.DataSQL.DataIndexFields)
  109. if err = rows.Scan(row...); err != nil {
  110. log.Error("AllMessages rows.Scan() error(%v)", err)
  111. continue
  112. }
  113. as.mapData = append(as.mapData, item)
  114. }
  115. if len(as.mapData) > 0 {
  116. // extra relevant data
  117. as.mapData, err = as.d.ExtraData(c, as.mapData, as.attrs, "db", []string{})
  118. // offset
  119. if v, ok := as.mapData[len(as.mapData)-1]["_id"]; ok && v != nil {
  120. if v2, ok := v.(interface{}); ok {
  121. as.offset.SetTempOffset((v2).(int64), "")
  122. as.offset.SetRecoverTempOffset((v2).(int64), "")
  123. } else {
  124. log.Error("single.all._id interface error")
  125. }
  126. } else {
  127. log.Error("single.all._id nil error")
  128. }
  129. }
  130. length = len(as.mapData)
  131. return
  132. }
  133. // BulkIndex .
  134. func (as *AppSingle) BulkIndex(c context.Context, start int, end int, writeEntityIndex bool) (err error) {
  135. if len(as.mapData) >= (start+1) && len(as.mapData) >= end {
  136. partData := as.mapData[start:end]
  137. err = as.d.BulkDBData(c, as.attrs, writeEntityIndex, partData...)
  138. }
  139. return
  140. }
  141. // Commit commit offset.
  142. func (as *AppSingle) Commit(c context.Context) (err error) {
  143. err = as.d.CommitOffset(c, as.offset, as.attrs.AppID, as.attrs.Table.TablePrefix)
  144. as.mapData = []model.MapData{}
  145. return
  146. }
  147. // Sleep interval duration.
  148. func (as *AppSingle) Sleep(c context.Context) {
  149. time.Sleep(time.Second * time.Duration(as.attrs.Other.Sleep))
  150. }
  151. // Size return size.
  152. func (as *AppSingle) Size(c context.Context) int {
  153. return as.attrs.Other.Size
  154. }