aegis_resource.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package business
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "time"
  7. "go-common/app/job/main/search/conf"
  8. "go-common/app/job/main/search/dao"
  9. "go-common/app/job/main/search/model"
  10. xsql "go-common/library/database/sql"
  11. "go-common/library/log"
  12. "go-common/library/queue/databus"
  13. )
  14. // AegisResource single table consume databus.
  15. type AegisResource struct {
  16. d *dao.Dao
  17. c *conf.Config
  18. appid string
  19. attrs *model.Attrs
  20. db *xsql.DB
  21. dtb *databus.Databus
  22. offset *model.LoopOffset
  23. mapData []model.MapData
  24. commits map[int32]*databus.Message
  25. }
  26. // NewAppDatabus .
  27. func NewAegisResource(d *dao.Dao, appid string, c *conf.Config) (a *AegisResource) {
  28. a = &AegisResource{
  29. d: d,
  30. c: c,
  31. appid: appid,
  32. attrs: d.AttrPool[appid],
  33. offset: &model.LoopOffset{},
  34. mapData: []model.MapData{},
  35. db: d.DBPool[d.AttrPool[appid].DBName],
  36. dtb: d.DatabusPool[d.AttrPool[appid].Databus.Databus],
  37. commits: make(map[int32]*databus.Message),
  38. }
  39. return
  40. }
  41. // Business return business.
  42. func (a *AegisResource) Business() string {
  43. return a.attrs.Business
  44. }
  45. // InitIndex init index.
  46. func (a *AegisResource) InitIndex(c context.Context) {
  47. if aliases, err := a.d.GetAliases(a.attrs.ESName, a.attrs.Index.IndexAliasPrefix); err != nil {
  48. a.d.InitIndex(c, nil, a.attrs.ESName, a.attrs.Index.IndexAliasPrefix, a.attrs.Index.IndexEntityPrefix, a.attrs.Index.IndexMapping)
  49. } else {
  50. a.d.InitIndex(c, aliases, a.attrs.ESName, a.attrs.Index.IndexAliasPrefix, a.attrs.Index.IndexAliasPrefix, a.attrs.Index.IndexMapping)
  51. }
  52. }
  53. // InitOffset insert init value to offset.
  54. func (a *AegisResource) InitOffset(c context.Context) {
  55. a.d.InitOffset(c, a.offset, a.attrs, []string{})
  56. nowFormat := time.Now().Format("2006-01-02 15:04:05")
  57. a.offset.SetOffset(0, nowFormat)
  58. }
  59. // Offset get offset.
  60. func (a *AegisResource) Offset(c context.Context) {
  61. for {
  62. offset, err := a.d.Offset(c, a.appid, a.attrs.Table.TablePrefix)
  63. if err != nil {
  64. log.Error("a.d.Offset error(%v)", err)
  65. time.Sleep(time.Second * 3)
  66. continue
  67. }
  68. a.offset.SetReview(offset.ReviewID, offset.ReviewTime)
  69. a.offset.SetOffset(offset.OffsetID(), offset.OffsetTime())
  70. break
  71. }
  72. }
  73. // SetRecover set recover
  74. func (a *AegisResource) SetRecover(c context.Context, recoverID int64, recoverTime string, i int) {
  75. a.offset.SetRecoverOffset(recoverID, recoverTime)
  76. }
  77. // IncrMessages .
  78. func (a *AegisResource) IncrMessages(c context.Context) (length int, err error) {
  79. ticker := time.NewTicker(time.Duration(time.Millisecond * time.Duration(a.attrs.Databus.Ticker)))
  80. defer ticker.Stop()
  81. for {
  82. select {
  83. case msg, ok := <-a.dtb.Messages():
  84. if !ok {
  85. log.Error("databus: %s binlog consumer exit!!!", a.attrs.Databus)
  86. break
  87. }
  88. m := &model.Message{}
  89. a.commits[msg.Partition] = msg
  90. if err = json.Unmarshal(msg.Value, m); err != nil {
  91. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  92. continue
  93. }
  94. if m.Table == "resource" || m.Table == "resource_result" || m.Table == "net_flow_resource" {
  95. if m.Action == "insert" || m.Action == "update" {
  96. var parseMap map[string]interface{}
  97. parseMap, err = a.d.JSON2map(m.New)
  98. if _, ok := parseMap["rid"]; ok {
  99. parseMap["_id"] = parseMap["rid"]
  100. parseMap["id"] = parseMap["rid"]
  101. }
  102. if _, sok := parseMap["state"]; m.Table != "resource_result" && sok {
  103. delete(parseMap, "state")
  104. }
  105. log.Info(fmt.Sprintf("%v: %+v", a.attrs.AppID, parseMap))
  106. if err != nil {
  107. log.Error("a.JSON2map error(%v)", err)
  108. continue
  109. }
  110. a.mapData = append(a.mapData, parseMap)
  111. }
  112. }
  113. if len(a.mapData) < a.attrs.Databus.AggCount {
  114. continue
  115. }
  116. case <-ticker.C:
  117. }
  118. break
  119. }
  120. if len(a.mapData) > 0 {
  121. a.mapData, err = a.d.ExtraData(c, a.mapData, a.attrs, "dtb", []string{})
  122. }
  123. length = len(a.mapData)
  124. return
  125. }
  126. // AllMessages .
  127. func (a *AegisResource) AllMessages(c context.Context) (length int, err error) {
  128. rows, err := a.db.Query(c, a.attrs.DataSQL.SQLByID, a.offset.OffsetID, a.attrs.Other.Size)
  129. log.Info("appid: %s allMessages Current offsetID: %d", a.appid, a.offset.OffsetID)
  130. if err != nil {
  131. log.Error("AllMessages db.Query error(%v)", err)
  132. return
  133. }
  134. defer rows.Close()
  135. for rows.Next() {
  136. item, row := dao.InitMapData(a.attrs.DataSQL.DataIndexFields)
  137. if err = rows.Scan(row...); err != nil {
  138. log.Error("AllMessages rows.Scan() error(%v)", err)
  139. return
  140. }
  141. a.mapData = append(a.mapData, item)
  142. }
  143. if len(a.mapData) > 0 {
  144. a.mapData, err = a.d.ExtraData(c, a.mapData, a.attrs, "db", []string{})
  145. // offset
  146. if v, ok := a.mapData[len(a.mapData)-1]["_id"]; ok && v != nil {
  147. if v2, ok := v.(interface{}); ok {
  148. a.offset.SetTempOffset((v2).(int64), "")
  149. a.offset.SetRecoverTempOffset((v2).(int64), "")
  150. } else {
  151. log.Error("dtb.all._id interface error")
  152. }
  153. } else {
  154. log.Error("dtb.all._id nil error")
  155. }
  156. }
  157. length = len(a.mapData)
  158. return
  159. }
  160. // BulkIndex .
  161. func (a *AegisResource) BulkIndex(c context.Context, start, end int, writeEntityIndex bool) (err error) {
  162. partData := a.mapData[start:end]
  163. if a.c.Business.Index {
  164. err = a.d.BulkDBData(c, a.attrs, writeEntityIndex, partData...)
  165. } else {
  166. err = a.d.BulkDatabusData(c, a.attrs, writeEntityIndex, partData...)
  167. }
  168. return
  169. }
  170. // Commit commit offset.
  171. func (a *AegisResource) Commit(c context.Context) (err error) {
  172. if a.c.Business.Index {
  173. err = a.d.CommitOffset(c, a.offset, a.attrs.AppID, a.attrs.Table.TablePrefix)
  174. } else {
  175. for k, cos := range a.commits {
  176. if err = cos.Commit(); err != nil {
  177. log.Error("appid(%s) commit error(%v)", a.attrs.AppID, err)
  178. continue
  179. }
  180. delete(a.commits, k)
  181. }
  182. }
  183. a.mapData = []model.MapData{}
  184. return
  185. }
  186. // Sleep interval duration.
  187. func (a *AegisResource) Sleep(c context.Context) {
  188. time.Sleep(time.Second * time.Duration(a.attrs.Other.Sleep))
  189. }
  190. // Size return size.
  191. func (a *AegisResource) Size(c context.Context) int {
  192. return a.attrs.Other.Size
  193. }