archive_video_relation.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. package business
  2. import (
  3. "context"
  4. "encoding/json"
  5. "time"
  6. "go-common/app/job/main/search/conf"
  7. "go-common/app/job/main/search/dao"
  8. "go-common/app/job/main/search/model"
  9. xsql "go-common/library/database/sql"
  10. "go-common/library/log"
  11. "go-common/library/queue/databus"
  12. )
  13. // Avr single table consume databus.
  14. type Avr struct {
  15. c *conf.Config
  16. d *dao.Dao
  17. appid string
  18. attrs *model.Attrs
  19. db *xsql.DB
  20. dtb *databus.Databus
  21. offset *model.LoopOffset
  22. mapData []model.MapData
  23. commits map[int32]*databus.Message
  24. }
  25. // NewAvr .
  26. func NewAvr(d *dao.Dao, appid string, c *conf.Config) (a *Avr) {
  27. a = &Avr{
  28. c: c,
  29. d: d,
  30. appid: appid,
  31. attrs: d.AttrPool[appid],
  32. offset: &model.LoopOffset{},
  33. mapData: []model.MapData{},
  34. db: d.DBPool[d.AttrPool[appid].DBName],
  35. dtb: d.DatabusPool[d.AttrPool[appid].Databus.Databus],
  36. commits: make(map[int32]*databus.Message),
  37. }
  38. return
  39. }
  40. // Business return business.
  41. func (a *Avr) Business() string {
  42. return a.attrs.Business
  43. }
  44. // InitIndex init index.
  45. func (a *Avr) InitIndex(c context.Context) {
  46. if aliases, err := a.d.GetAliases(a.attrs.ESName, a.attrs.Index.IndexAliasPrefix); err != nil {
  47. a.d.InitIndex(c, nil, a.attrs.ESName, a.attrs.Index.IndexAliasPrefix, a.attrs.Index.IndexEntityPrefix, a.attrs.Index.IndexMapping)
  48. } else {
  49. a.d.InitIndex(c, aliases, a.attrs.ESName, a.attrs.Index.IndexAliasPrefix, a.attrs.Index.IndexEntityPrefix, a.attrs.Index.IndexMapping)
  50. }
  51. }
  52. // InitOffset insert init value to offset.
  53. func (a *Avr) InitOffset(c context.Context) {
  54. a.d.InitOffset(c, a.offset, a.attrs, []string{})
  55. nowFormat := time.Now().Format("2006-01-02 15:04:05")
  56. a.offset.SetOffset(0, nowFormat)
  57. }
  58. // Offset get offset.
  59. func (a *Avr) Offset(c context.Context) {
  60. for {
  61. offset, err := a.d.Offset(c, a.appid, a.attrs.Table.TablePrefix)
  62. if err != nil {
  63. log.Error("a.d.Offset error(%v)", err)
  64. time.Sleep(time.Second * 3)
  65. continue
  66. }
  67. a.offset.SetReview(offset.ReviewID, offset.ReviewTime)
  68. a.offset.SetOffset(offset.OffsetID(), offset.OffsetTime())
  69. break
  70. }
  71. }
  72. // SetRecover set recover
  73. func (a *Avr) SetRecover(c context.Context, recoverID int64, recoverTime string, i int) {
  74. a.offset.SetRecoverOffset(recoverID, recoverTime)
  75. }
  76. // IncrMessages .
  77. func (a *Avr) IncrMessages(c context.Context) (length int, err error) {
  78. ticker := time.NewTicker(time.Duration(time.Millisecond * time.Duration(a.attrs.Databus.Ticker)))
  79. defer ticker.Stop()
  80. for {
  81. select {
  82. case msg, ok := <-a.dtb.Messages():
  83. if !ok {
  84. log.Error("databus: %s binlog consumer exit!!!", a.attrs.Databus)
  85. break
  86. }
  87. m := &model.Message{}
  88. a.commits[msg.Partition] = msg
  89. if err = json.Unmarshal(msg.Value, m); err != nil {
  90. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  91. continue
  92. }
  93. //fmt.Println("origin msg", m)
  94. //log.Info("origin msg: (%v)", m.Table, a.mapData)
  95. if m.Table == a.attrs.Table.TablePrefix {
  96. if m.Action == "insert" || m.Action == "update" {
  97. var parseMap map[string]interface{}
  98. parseMap, err = a.d.JSON2map(m.New)
  99. if err != nil {
  100. log.Error("a.JSON2map error(%v)", err)
  101. continue
  102. }
  103. a.mapData = append(a.mapData, parseMap)
  104. }
  105. }
  106. if len(a.mapData) < a.attrs.Databus.AggCount {
  107. continue
  108. }
  109. case <-ticker.C:
  110. }
  111. break
  112. }
  113. //log.Info("origin msg: (%v)", a.mapData)
  114. if len(a.mapData) > 0 {
  115. //fmt.Println("before", a.mapData)
  116. a.mapData, err = a.d.ExtraData(c, a.mapData, a.attrs, "dtb", []string{"archive", "video", "audit", "ups"})
  117. //fmt.Println("after", a.mapData)
  118. log.Info("dtb msg: (%v)", a.mapData)
  119. }
  120. length = len(a.mapData)
  121. return
  122. }
  123. // AllMessages .
  124. func (a *Avr) AllMessages(c context.Context) (length int, err error) {
  125. rows, err := a.db.Query(c, a.attrs.DataSQL.SQLByID, a.offset.RecoverID, a.attrs.Other.Size)
  126. log.Info("appid: %s allMessages Current RecoverID: %d", a.appid, a.offset.RecoverID)
  127. if err != nil {
  128. log.Error("AllMessages db.Query error(%v)", err)
  129. return
  130. }
  131. defer rows.Close()
  132. for rows.Next() {
  133. item, row := dao.InitMapData(a.attrs.DataSQL.DataIndexFields)
  134. if err = rows.Scan(row...); err != nil {
  135. log.Error("AllMessages rows.Scan() error(%v)", err)
  136. return
  137. }
  138. a.mapData = append(a.mapData, item)
  139. }
  140. if len(a.mapData) > 0 {
  141. //fmt.Println("before", a.mapData)
  142. a.mapData, err = a.d.ExtraData(c, a.mapData, a.attrs, "db", []string{"audit", "ups"})
  143. if v, ok := a.mapData[len(a.mapData)-1]["_id"]; ok {
  144. a.offset.SetTempOffset(v.(int64), "")
  145. a.offset.SetRecoverTempOffset(v.(int64), "")
  146. }
  147. }
  148. length = len(a.mapData)
  149. return
  150. }
  151. // BulkIndex .
  152. func (a *Avr) BulkIndex(c context.Context, start, end int, writeEntityIndex bool) (err error) {
  153. if len(a.mapData) > 0 {
  154. partData := a.mapData[start:end]
  155. err = a.d.BulkDBData(c, a.attrs, writeEntityIndex, partData...)
  156. }
  157. return
  158. }
  159. // Commit commit offset.
  160. func (a *Avr) Commit(c context.Context) (err error) {
  161. if a.c.Business.Index {
  162. err = a.d.CommitOffset(c, a.offset, a.attrs.AppID, a.attrs.Table.TablePrefix)
  163. } else {
  164. for k, cos := range a.commits {
  165. if err = cos.Commit(); err != nil {
  166. log.Error("appid(%s) commit error(%v)", a.attrs.AppID, err)
  167. continue
  168. }
  169. delete(a.commits, k)
  170. }
  171. }
  172. a.mapData = []model.MapData{}
  173. return
  174. }
  175. // Sleep interval duration.
  176. func (a *Avr) Sleep(c context.Context) {
  177. time.Sleep(time.Second * time.Duration(a.attrs.Other.Sleep))
  178. }
  179. // Size return size.
  180. func (a *Avr) Size(c context.Context) int {
  181. return a.attrs.Other.Size
  182. }