app_multiple_databus.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "regexp"
  7. "strings"
  8. "time"
  9. "go-common/app/job/main/search/model"
  10. xsql "go-common/library/database/sql"
  11. "go-common/library/log"
  12. "go-common/library/queue/databus"
  13. )
  14. // AppMultipleDatabus .
  15. type AppMultipleDatabus struct {
  16. d *Dao
  17. appid string
  18. attrs *model.Attrs
  19. db *xsql.DB
  20. dtb *databus.Databus
  21. offsets model.LoopOffsets
  22. mapData []model.MapData
  23. tableName []string
  24. indexNameSuffix []string
  25. commits map[int32]*databus.Message
  26. }
  27. // IndexNameSuffix .
  28. func (amd *AppMultipleDatabus) IndexNameSuffix(format string, startDate string) (res []string, err error) {
  29. var (
  30. sTime time.Time
  31. eTime = time.Now()
  32. )
  33. sTime, err = time.Parse(format, startDate)
  34. if err != nil {
  35. log.Error("d.LogAuditIndexName(%v)", startDate)
  36. return
  37. }
  38. resDict := map[string]bool{}
  39. if strings.Contains(format, "02") {
  40. for {
  41. resDict[amd.getIndexName(format, eTime)] = true
  42. eTime = eTime.AddDate(0, 0, -1)
  43. if sTime.After(eTime) {
  44. break
  45. }
  46. }
  47. } else if strings.Contains(format, "week") {
  48. for {
  49. resDict[amd.getIndexName(format, eTime)] = true
  50. eTime = eTime.AddDate(0, 0, -7)
  51. if sTime.After(eTime) {
  52. break
  53. }
  54. }
  55. } else if strings.Contains(format, "01") {
  56. // 1月31日时AddDate(0, -1, 0)会出现错误
  57. year, month, _ := eTime.Date()
  58. hour, min, sec := eTime.Clock()
  59. eTime = time.Date(year, month, 1, hour, min, sec, 0, eTime.Location())
  60. for {
  61. resDict[amd.getIndexName(format, eTime)] = true
  62. eTime = eTime.AddDate(0, -1, 0)
  63. if sTime.After(eTime) {
  64. break
  65. }
  66. }
  67. } else if strings.Contains(format, "2006") {
  68. // 2月29日时AddDate(-1, 0, 0)会出现错误
  69. year, _, _ := eTime.Date()
  70. hour, min, sec := eTime.Clock()
  71. eTime = time.Date(year, 1, 1, hour, min, sec, 0, eTime.Location())
  72. for {
  73. resDict[amd.getIndexName(format, eTime)] = true
  74. eTime = eTime.AddDate(-1, 0, 0)
  75. if sTime.After(eTime) {
  76. break
  77. }
  78. }
  79. }
  80. for k := range resDict {
  81. res = append(res, k)
  82. }
  83. return
  84. }
  85. func (amd *AppMultipleDatabus) getIndexName(format string, time time.Time) (index string) {
  86. var (
  87. week = map[int]string{
  88. 0: "0108",
  89. 1: "0916",
  90. 2: "1724",
  91. 3: "2531",
  92. }
  93. )
  94. return strings.Replace(time.Format(format), "week", week[time.Day()/9], -1)
  95. }
  96. // NewAppMultipleDatabus .
  97. func NewAppMultipleDatabus(d *Dao, appid string) (amd *AppMultipleDatabus) {
  98. var err error
  99. amd = &AppMultipleDatabus{
  100. d: d,
  101. appid: appid,
  102. attrs: d.AttrPool[appid],
  103. offsets: make(map[int]*model.LoopOffset),
  104. tableName: []string{},
  105. indexNameSuffix: []string{},
  106. commits: make(map[int32]*databus.Message),
  107. }
  108. amd.db = d.DBPool[amd.attrs.DBName]
  109. amd.dtb = d.DatabusPool[amd.attrs.Databus.Databus]
  110. if amd.attrs.Table.TableSplit == "int" || amd.attrs.Table.TableSplit == "single" {
  111. for i := amd.attrs.Table.TableFrom; i <= amd.attrs.Table.TableTo; i++ {
  112. tableName := fmt.Sprintf("%s%0"+amd.attrs.Table.TableZero+"d", amd.attrs.Table.TablePrefix, i)
  113. amd.tableName = append(amd.tableName, tableName)
  114. amd.offsets[i] = &model.LoopOffset{}
  115. }
  116. } else {
  117. var tableNameSuffix []string
  118. tableFormat := strings.Split(amd.attrs.Table.TableFormat, ",")
  119. if tableNameSuffix, err = amd.IndexNameSuffix(tableFormat[0], tableFormat[1]); err != nil {
  120. log.Error("amd.IndexNameSuffix(%v)", err)
  121. return
  122. }
  123. for _, v := range tableNameSuffix {
  124. amd.tableName = append(amd.tableName, amd.attrs.Table.TablePrefix+v)
  125. }
  126. for i := range amd.tableName {
  127. amd.offsets[i] = &model.LoopOffset{}
  128. }
  129. }
  130. return
  131. }
  132. // Business return business.
  133. func (amd *AppMultipleDatabus) Business() string {
  134. return amd.attrs.Business
  135. }
  136. // InitIndex .
  137. func (amd *AppMultipleDatabus) InitIndex(c context.Context) {
  138. var (
  139. err error
  140. indexAliasName string
  141. indexEntityName string
  142. )
  143. indexFormat := strings.Split(amd.attrs.Index.IndexFormat, ",")
  144. aliases, aliasErr := amd.d.GetAliases(amd.attrs.ESName, amd.attrs.Index.IndexAliasPrefix)
  145. if indexFormat[0] == "int" || indexFormat[0] == "single" {
  146. for i := amd.attrs.Index.IndexFrom; i <= amd.attrs.Index.IndexTo; i++ {
  147. // == "0" 有问题,不通用
  148. if amd.attrs.Index.IndexZero == "0" {
  149. indexAliasName = amd.attrs.Index.IndexAliasPrefix
  150. indexEntityName = amd.attrs.Index.IndexEntityPrefix
  151. } else {
  152. indexAliasName = fmt.Sprintf("%s%0"+amd.attrs.Index.IndexZero+"d", amd.attrs.Index.IndexAliasPrefix, i)
  153. indexEntityName = fmt.Sprintf("%s%0"+amd.attrs.Index.IndexZero+"d", amd.attrs.Index.IndexEntityPrefix, i)
  154. }
  155. if aliasErr != nil {
  156. amd.d.InitIndex(c, nil, amd.attrs.ESName, indexAliasName, indexEntityName, amd.attrs.Index.IndexMapping)
  157. } else {
  158. amd.d.InitIndex(c, aliases, amd.attrs.ESName, indexAliasName, indexEntityName, amd.attrs.Index.IndexMapping)
  159. }
  160. }
  161. } else {
  162. if amd.indexNameSuffix, err = amd.IndexNameSuffix(indexFormat[0], indexFormat[1]); err != nil {
  163. log.Error("amd.IndexNameSuffix(%v)", err)
  164. return
  165. }
  166. for _, v := range amd.indexNameSuffix {
  167. if aliasErr != nil {
  168. amd.d.InitIndex(c, nil, amd.attrs.ESName, amd.attrs.Index.IndexAliasPrefix+v, amd.attrs.Index.IndexEntityPrefix+v, amd.attrs.Index.IndexMapping)
  169. } else {
  170. amd.d.InitIndex(c, aliases, amd.attrs.ESName, amd.attrs.Index.IndexAliasPrefix+v, amd.attrs.Index.IndexEntityPrefix+v, amd.attrs.Index.IndexMapping)
  171. }
  172. }
  173. }
  174. }
  175. // InitOffset insert init value to offset.
  176. func (amd *AppMultipleDatabus) InitOffset(c context.Context) {
  177. amd.d.InitOffset(c, amd.offsets[0], amd.attrs, amd.tableName)
  178. }
  179. // Offset .
  180. func (amd *AppMultipleDatabus) Offset(c context.Context) {
  181. for i, v := range amd.tableName {
  182. offset, err := amd.d.Offset(c, amd.attrs.AppID, v)
  183. if err != nil {
  184. log.Error("amd.d.offset error(%v)", err)
  185. time.Sleep(time.Second * 3)
  186. }
  187. amd.offsets[i].SetReview(offset.ReviewID, offset.ReviewTime)
  188. amd.offsets[i].SetOffset(offset.OffsetID(), offset.OffsetTime())
  189. }
  190. }
  191. // SetRecover set recover
  192. func (amd *AppMultipleDatabus) SetRecover(c context.Context, recoverID int64, recoverTime string, i int) {
  193. amd.offsets.SetRecoverOffsets(i, recoverID, recoverTime)
  194. }
  195. // IncrMessages .
  196. func (amd *AppMultipleDatabus) IncrMessages(c context.Context) (length int, err error) {
  197. ticker := time.NewTicker(time.Duration(time.Millisecond * time.Duration(amd.attrs.Databus.Ticker)))
  198. defer ticker.Stop()
  199. for {
  200. select {
  201. case msg, ok := <-amd.dtb.Messages():
  202. if !ok {
  203. log.Error("databus: %s binlog consumer exit!!!", amd.attrs.Databus)
  204. break
  205. }
  206. m := &model.Message{}
  207. amd.commits[msg.Partition] = msg
  208. if err = json.Unmarshal(msg.Value, m); err != nil {
  209. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  210. continue
  211. }
  212. if amd.attrs.Business == "creative_reply" {
  213. r, _ := regexp.Compile("reply_\\d+")
  214. if !r.MatchString(m.Table) {
  215. continue
  216. }
  217. }
  218. if (amd.attrs.Table.TableSplit == "string" && m.Table == amd.attrs.Table.TablePrefix) ||
  219. (amd.attrs.Table.TableSplit != "string" && strings.HasPrefix(m.Table, amd.attrs.Table.TablePrefix)) {
  220. if m.Action == "insert" || m.Action == "update" {
  221. var parseMap map[string]interface{}
  222. parseMap, err = amd.d.JSON2map(m.New)
  223. if err != nil {
  224. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  225. continue
  226. }
  227. // esports fav type filter
  228. if amd.attrs.AppID == "esports_fav" {
  229. if t, ok := parseMap["type"]; ok && t.(int64) != 10 {
  230. continue
  231. }
  232. }
  233. // playlist fav type and attr filter
  234. if amd.attrs.AppID == "fav_playlist" {
  235. if t, ok := parseMap["type"]; ok && t.(int64) != 2 {
  236. continue
  237. }
  238. if t, ok := parseMap["attr"]; ok {
  239. if t.(int64)>>0&1 == 0 || (m.Action == "insert" && t.(int64)>>1&1 == 1) {
  240. continue
  241. }
  242. }
  243. }
  244. var newParseMap map[string]interface{}
  245. newParseMap, err = amd.newParseMap(c, m.Table, parseMap)
  246. if err != nil {
  247. if amd.attrs.AppID == "creative_reply" {
  248. continue
  249. }
  250. log.Error("amd.newParseMap error(%v)", err)
  251. continue
  252. }
  253. amd.mapData = append(amd.mapData, newParseMap)
  254. }
  255. }
  256. if len(amd.mapData) < amd.attrs.Databus.AggCount {
  257. continue
  258. }
  259. case <-ticker.C:
  260. }
  261. break
  262. }
  263. if len(amd.mapData) > 0 {
  264. amd.mapData, err = amd.d.ExtraData(c, amd.mapData, amd.attrs, "dtb", []string{})
  265. }
  266. length = len(amd.mapData)
  267. //amd.d.extraData(c, amd, "dtb")
  268. return
  269. }
  270. // AllMessages .
  271. func (amd *AppMultipleDatabus) AllMessages(c context.Context) (length int, err error) {
  272. amd.mapData = []model.MapData{}
  273. for i, v := range amd.tableName {
  274. var (
  275. rows *xsql.Rows
  276. sql string
  277. )
  278. tableFormat := strings.Split(amd.attrs.Table.TableFormat, ",")
  279. if amd.attrs.AppID == "dm_search" || amd.attrs.AppID == "dm" {
  280. sql = fmt.Sprintf(amd.attrs.DataSQL.SQLByID, amd.attrs.DataSQL.SQLFields, i, i)
  281. } else if tableFormat[0] == "int" || tableFormat[0] == "single" { // 兼容只传后缀,不传表名
  282. sql = fmt.Sprintf(amd.attrs.DataSQL.SQLByID, amd.attrs.DataSQL.SQLFields, i)
  283. log.Info(sql, amd.offsets[i].OffsetID, amd.attrs.Other.Size)
  284. } else {
  285. sql = fmt.Sprintf(amd.attrs.DataSQL.SQLByID, amd.attrs.DataSQL.SQLFields, v)
  286. }
  287. if rows, err = amd.db.Query(c, sql, amd.offsets[i].OffsetID, amd.attrs.Other.Size); err != nil {
  288. log.Error("AllMessages db.Query error(%v)", err)
  289. return
  290. }
  291. tempList := []model.MapData{}
  292. for rows.Next() {
  293. item, row := InitMapData(amd.attrs.DataSQL.DataIndexFields)
  294. if err = rows.Scan(row...); err != nil {
  295. log.Error("AppMultipleDatabus.AllMessages rows.Scan() error(%v)", err)
  296. continue
  297. }
  298. var newParseMap map[string]interface{}
  299. newParseMap, err = amd.newParseMap(c, v, item)
  300. if err != nil {
  301. log.Error("amd.newParseMap error(%v)", err)
  302. continue
  303. }
  304. tempList = append(tempList, newParseMap)
  305. amd.mapData = append(amd.mapData, newParseMap)
  306. }
  307. rows.Close()
  308. tmpLength := len(tempList)
  309. if tmpLength > 0 {
  310. amd.offsets[i].SetTempOffset(tempList[tmpLength-1].PrimaryID(), tempList[tmpLength-1].StrMTime())
  311. }
  312. }
  313. if len(amd.mapData) > 0 {
  314. amd.mapData, err = amd.d.ExtraData(c, amd.mapData, amd.attrs, "db", []string{})
  315. }
  316. length = len(amd.mapData)
  317. //amd.d.extraData(c, amd, "db")
  318. return
  319. }
  320. // BulkIndex .
  321. func (amd *AppMultipleDatabus) BulkIndex(c context.Context, start int, end int, writeEntityIndex bool) (err error) {
  322. partData := amd.mapData[start:end]
  323. if amd.d.c.Business.Index {
  324. err = amd.d.BulkDBData(c, amd.attrs, writeEntityIndex, partData...)
  325. } else {
  326. err = amd.d.BulkDatabusData(c, amd.attrs, writeEntityIndex, partData...)
  327. }
  328. return
  329. }
  330. // Commit .
  331. func (amd *AppMultipleDatabus) Commit(c context.Context) (err error) {
  332. if amd.d.c.Business.Index {
  333. if amd.attrs.Table.TableSplit == "int" || amd.attrs.Table.TableSplit == "single" { // 兼容只传后缀,不传表名
  334. for i := amd.attrs.Table.TableFrom; i <= amd.attrs.Table.TableTo; i++ {
  335. tableName := fmt.Sprintf("%s%0"+amd.attrs.Table.TableZero+"d", amd.attrs.Table.TablePrefix, i)
  336. if err = amd.d.CommitOffset(c, amd.offsets[i], amd.attrs.AppID, tableName); err != nil {
  337. log.Error("AppMultipleDatabus.Commit error(%v)", err)
  338. continue
  339. }
  340. }
  341. } else {
  342. for i, v := range amd.indexNameSuffix {
  343. if err = amd.d.CommitOffset(c, amd.offsets[i], amd.attrs.AppID, v); err != nil {
  344. log.Error("Commit error(%v)", err)
  345. continue
  346. }
  347. }
  348. }
  349. } else {
  350. for k, c := range amd.commits {
  351. if err = c.Commit(); err != nil {
  352. log.Error("AppMultipleDatabus.Commit error(%v)", err)
  353. continue
  354. }
  355. delete(amd.commits, k)
  356. }
  357. }
  358. amd.mapData = []model.MapData{}
  359. return
  360. }
  361. // Sleep .
  362. func (amd *AppMultipleDatabus) Sleep(c context.Context) {
  363. time.Sleep(time.Second * time.Duration(amd.attrs.Other.Sleep))
  364. }
  365. // Size .
  366. func (amd *AppMultipleDatabus) Size(c context.Context) (size int) {
  367. return amd.attrs.Other.Size
  368. }
  369. // indexField .
  370. // func (amd *AppMultipleDatabus) indexField(c context.Context, tableName string) (fieldName string, fieldValue int) {
  371. // suffix, _ := strconv.Atoi(strings.Split(tableName, "_")[2])
  372. // s := strings.Split(amd.attrs.DataSQL.DataIndexSuffix, ";")
  373. // v := strings.Split(s[1], ":")
  374. // fieldName = v[0]
  375. // indexNum, _ := strconv.Atoi(v[2])
  376. // fieldValue = suffix + indexNum
  377. // return
  378. // }
  379. // newParseMap .
  380. func (amd *AppMultipleDatabus) newParseMap(c context.Context, table string, parseMap map[string]interface{}) (res map[string]interface{}, err error) {
  381. res = parseMap
  382. //TODO 实体索引写不进去
  383. if (amd.attrs.AppID == "dm_search" || amd.attrs.AppID == "dm") && !amd.d.c.Business.Index {
  384. indexSuffix := strings.Split(table, "_")[2]
  385. res["index_name"] = amd.attrs.Index.IndexAliasPrefix + indexSuffix
  386. if _, ok := res["msg"]; ok {
  387. // dm_content_
  388. res["index_field"] = true // 删除ctime
  389. res["index_id"] = fmt.Sprintf("%v", res["dmid"])
  390. } else {
  391. // dm_index_
  392. res["index_id"] = fmt.Sprintf("%v", res["id"])
  393. }
  394. } else if amd.attrs.AppID == "dmreport" {
  395. if ztime, ok := res["ctime"].(*interface{}); ok { // 数据库
  396. if ctime, cok := (*ztime).(time.Time); cok {
  397. res["index_name"] = amd.attrs.Index.IndexAliasPrefix + ctime.Format("2006")
  398. }
  399. } else if ztime, ok := res["ctime"].(string); ok { // databus
  400. var ctime time.Time
  401. if ctime, err = time.Parse("2006-01-02 15:04:05", ztime); err == nil {
  402. res["index_name"] = amd.attrs.Index.IndexAliasPrefix + ctime.Format("2006")
  403. }
  404. }
  405. } else if amd.attrs.AppID == "creative_reply" && !amd.d.c.Business.Index {
  406. if replyType, ok := res["type"].(int64); ok {
  407. if replyType == 1 || replyType == 12 || replyType == 14 {
  408. } else {
  409. err = fmt.Errorf("多余数据")
  410. }
  411. } else {
  412. err = fmt.Errorf("错误数据")
  413. }
  414. } else if amd.attrs.Index.IndexSplit == "single" {
  415. res["index_name"] = amd.attrs.Index.IndexAliasPrefix
  416. } else {
  417. indexSuffix := string([]rune(table)[strings.Count(amd.attrs.Table.TablePrefix, "")-1:])
  418. res["index_name"] = amd.attrs.Index.IndexAliasPrefix + indexSuffix
  419. }
  420. //dtb index_id
  421. if amd.attrs.AppID == "favorite" && !amd.d.c.Business.Index {
  422. if fid, ok := res["fid"].(int64); ok {
  423. if oid, ok := res["oid"].(int64); ok {
  424. res["index_id"] = fmt.Sprintf("%d_%d", fid, oid)
  425. return
  426. }
  427. }
  428. res["index_id"] = "err"
  429. res["indexName"] = ""
  430. }
  431. return
  432. }