app_databus.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "time"
  7. "go-common/app/job/main/search/model"
  8. xsql "go-common/library/database/sql"
  9. "go-common/library/log"
  10. "go-common/library/queue/databus"
  11. )
  12. // AppDatabus single table consume databus.
  13. type AppDatabus struct {
  14. d *Dao
  15. appid string
  16. attrs *model.Attrs
  17. db *xsql.DB
  18. dtb *databus.Databus
  19. offset *model.LoopOffset
  20. mapData []model.MapData
  21. commits map[int32]*databus.Message
  22. }
  23. // NewAppDatabus .
  24. func NewAppDatabus(d *Dao, appid string) (a *AppDatabus) {
  25. a = &AppDatabus{
  26. d: d,
  27. appid: appid,
  28. attrs: d.AttrPool[appid],
  29. offset: &model.LoopOffset{},
  30. mapData: []model.MapData{},
  31. db: d.DBPool[d.AttrPool[appid].DBName],
  32. dtb: d.DatabusPool[d.AttrPool[appid].Databus.Databus],
  33. commits: make(map[int32]*databus.Message),
  34. }
  35. return
  36. }
  37. // Business return business.
  38. func (a *AppDatabus) Business() string {
  39. return a.attrs.Business
  40. }
  41. // InitIndex init index.
  42. func (a *AppDatabus) InitIndex(c context.Context) {
  43. if aliases, err := a.d.GetAliases(a.attrs.ESName, a.attrs.Index.IndexAliasPrefix); err != nil {
  44. a.d.InitIndex(c, nil, a.attrs.ESName, a.attrs.Index.IndexAliasPrefix, a.attrs.Index.IndexEntityPrefix, a.attrs.Index.IndexMapping)
  45. } else {
  46. a.d.InitIndex(c, aliases, a.attrs.ESName, a.attrs.Index.IndexAliasPrefix, a.attrs.Index.IndexAliasPrefix, a.attrs.Index.IndexMapping)
  47. }
  48. }
  49. // InitOffset insert init value to offset.
  50. func (a *AppDatabus) InitOffset(c context.Context) {
  51. a.d.InitOffset(c, a.offset, a.attrs, []string{})
  52. nowFormat := time.Now().Format("2006-01-02 15:04:05")
  53. a.offset.SetOffset(0, nowFormat)
  54. }
  55. // Offset get offset.
  56. func (a *AppDatabus) Offset(c context.Context) {
  57. for {
  58. offset, err := a.d.Offset(c, a.appid, a.attrs.Table.TablePrefix)
  59. if err != nil {
  60. log.Error("a.d.Offset error(%v)", err)
  61. time.Sleep(time.Second * 3)
  62. continue
  63. }
  64. a.offset.SetReview(offset.ReviewID, offset.ReviewTime)
  65. a.offset.SetOffset(offset.OffsetID(), offset.OffsetTime())
  66. break
  67. }
  68. }
  69. // SetRecover set recover
  70. func (a *AppDatabus) SetRecover(c context.Context, recoverID int64, recoverTime string, i int) {
  71. a.offset.SetRecoverOffset(recoverID, recoverTime)
  72. }
  73. // IncrMessages .
  74. func (a *AppDatabus) IncrMessages(c context.Context) (length int, err error) {
  75. ticker := time.NewTicker(time.Duration(time.Millisecond * time.Duration(a.attrs.Databus.Ticker)))
  76. defer ticker.Stop()
  77. for {
  78. select {
  79. case msg, ok := <-a.dtb.Messages():
  80. if !ok {
  81. log.Error("databus: %s binlog consumer exit!!!", a.attrs.Databus)
  82. break
  83. }
  84. m := &model.Message{}
  85. a.commits[msg.Partition] = msg
  86. if err = json.Unmarshal(msg.Value, m); err != nil {
  87. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  88. continue
  89. }
  90. if m.Table == a.attrs.Table.TablePrefix {
  91. if m.Action == "insert" || m.Action == "update" {
  92. var parseMap map[string]interface{}
  93. parseMap, err = a.d.JSON2map(m.New)
  94. log.Info(fmt.Sprintf("%v: %+v", a.attrs.AppID, parseMap))
  95. if err != nil {
  96. log.Error("a.JSON2map error(%v)", err)
  97. continue
  98. }
  99. a.mapData = append(a.mapData, parseMap)
  100. }
  101. }
  102. if len(a.mapData) < a.attrs.Databus.AggCount {
  103. continue
  104. }
  105. case <-ticker.C:
  106. }
  107. break
  108. }
  109. if len(a.mapData) > 0 {
  110. a.mapData, err = a.d.ExtraData(c, a.mapData, a.attrs, "dtb", []string{})
  111. }
  112. length = len(a.mapData)
  113. return
  114. }
  115. // AllMessages .
  116. func (a *AppDatabus) AllMessages(c context.Context) (length int, err error) {
  117. rows, err := a.db.Query(c, a.attrs.DataSQL.SQLByID, a.offset.OffsetID, a.attrs.Other.Size)
  118. log.Info("appid: %s allMessages Current offsetID: %d", a.appid, a.offset.OffsetID)
  119. if err != nil {
  120. log.Error("AllMessages db.Query error(%v)", err)
  121. return
  122. }
  123. defer rows.Close()
  124. for rows.Next() {
  125. item, row := InitMapData(a.attrs.DataSQL.DataIndexFields)
  126. if err = rows.Scan(row...); err != nil {
  127. log.Error("AllMessages rows.Scan() error(%v)", err)
  128. return
  129. }
  130. a.mapData = append(a.mapData, item)
  131. }
  132. if len(a.mapData) > 0 {
  133. a.mapData, err = a.d.ExtraData(c, a.mapData, a.attrs, "db", []string{})
  134. // offset
  135. if v, ok := a.mapData[len(a.mapData)-1]["_id"]; ok && v != nil {
  136. if v2, ok := v.(interface{}); ok {
  137. a.offset.SetTempOffset((v2).(int64), "")
  138. a.offset.SetRecoverTempOffset((v2).(int64), "")
  139. } else {
  140. log.Error("dtb.all._id interface error")
  141. }
  142. } else {
  143. log.Error("dtb.all._id nil error")
  144. }
  145. }
  146. length = len(a.mapData)
  147. return
  148. }
  149. // BulkIndex .
  150. func (a *AppDatabus) BulkIndex(c context.Context, start, end int, writeEntityIndex bool) (err error) {
  151. partData := a.mapData[start:end]
  152. if a.d.c.Business.Index {
  153. err = a.d.BulkDBData(c, a.attrs, writeEntityIndex, partData...)
  154. } else {
  155. err = a.d.BulkDatabusData(c, a.attrs, writeEntityIndex, partData...)
  156. }
  157. return
  158. }
  159. // Commit commit offset.
  160. func (a *AppDatabus) Commit(c context.Context) (err error) {
  161. if a.d.c.Business.Index {
  162. err = a.d.CommitOffset(c, a.offset, a.attrs.AppID, a.attrs.Table.TablePrefix)
  163. } else {
  164. for k, cos := range a.commits {
  165. if err = cos.Commit(); err != nil {
  166. log.Error("appid(%s) commit error(%v)", a.attrs.AppID, err)
  167. continue
  168. }
  169. delete(a.commits, k)
  170. }
  171. }
  172. a.mapData = []model.MapData{}
  173. return
  174. }
  175. // Sleep interval duration.
  176. func (a *AppDatabus) Sleep(c context.Context) {
  177. time.Sleep(time.Second * time.Duration(a.attrs.Other.Sleep))
  178. }
  179. // Size return size.
  180. func (a *AppDatabus) Size(c context.Context) int {
  181. return a.attrs.Other.Size
  182. }