123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451 |
- package dao
import (
	"context"
	"encoding/json"
	"fmt"
	"regexp"
	"sort"
	"strings"
	"time"

	"go-common/app/job/main/search/model"
	xsql "go-common/library/database/sql"
	"go-common/library/log"
	"go-common/library/queue/databus"
)
- // AppMultipleDatabus .
- type AppMultipleDatabus struct {
- d *Dao
- appid string
- attrs *model.Attrs
- db *xsql.DB
- dtb *databus.Databus
- offsets model.LoopOffsets
- mapData []model.MapData
- tableName []string
- indexNameSuffix []string
- commits map[int32]*databus.Message
- }
- // IndexNameSuffix .
- func (amd *AppMultipleDatabus) IndexNameSuffix(format string, startDate string) (res []string, err error) {
- var (
- sTime time.Time
- eTime = time.Now()
- )
- sTime, err = time.Parse(format, startDate)
- if err != nil {
- log.Error("d.LogAuditIndexName(%v)", startDate)
- return
- }
- resDict := map[string]bool{}
- if strings.Contains(format, "02") {
- for {
- resDict[amd.getIndexName(format, eTime)] = true
- eTime = eTime.AddDate(0, 0, -1)
- if sTime.After(eTime) {
- break
- }
- }
- } else if strings.Contains(format, "week") {
- for {
- resDict[amd.getIndexName(format, eTime)] = true
- eTime = eTime.AddDate(0, 0, -7)
- if sTime.After(eTime) {
- break
- }
- }
- } else if strings.Contains(format, "01") {
- // 1月31日时AddDate(0, -1, 0)会出现错误
- year, month, _ := eTime.Date()
- hour, min, sec := eTime.Clock()
- eTime = time.Date(year, month, 1, hour, min, sec, 0, eTime.Location())
- for {
- resDict[amd.getIndexName(format, eTime)] = true
- eTime = eTime.AddDate(0, -1, 0)
- if sTime.After(eTime) {
- break
- }
- }
- } else if strings.Contains(format, "2006") {
- // 2月29日时AddDate(-1, 0, 0)会出现错误
- year, _, _ := eTime.Date()
- hour, min, sec := eTime.Clock()
- eTime = time.Date(year, 1, 1, hour, min, sec, 0, eTime.Location())
- for {
- resDict[amd.getIndexName(format, eTime)] = true
- eTime = eTime.AddDate(-1, 0, 0)
- if sTime.After(eTime) {
- break
- }
- }
- }
- for k := range resDict {
- res = append(res, k)
- }
- return
- }
- func (amd *AppMultipleDatabus) getIndexName(format string, time time.Time) (index string) {
- var (
- week = map[int]string{
- 0: "0108",
- 1: "0916",
- 2: "1724",
- 3: "2531",
- }
- )
- return strings.Replace(time.Format(format), "week", week[time.Day()/9], -1)
- }
- // NewAppMultipleDatabus .
- func NewAppMultipleDatabus(d *Dao, appid string) (amd *AppMultipleDatabus) {
- var err error
- amd = &AppMultipleDatabus{
- d: d,
- appid: appid,
- attrs: d.AttrPool[appid],
- offsets: make(map[int]*model.LoopOffset),
- tableName: []string{},
- indexNameSuffix: []string{},
- commits: make(map[int32]*databus.Message),
- }
- amd.db = d.DBPool[amd.attrs.DBName]
- amd.dtb = d.DatabusPool[amd.attrs.Databus.Databus]
- if amd.attrs.Table.TableSplit == "int" || amd.attrs.Table.TableSplit == "single" {
- for i := amd.attrs.Table.TableFrom; i <= amd.attrs.Table.TableTo; i++ {
- tableName := fmt.Sprintf("%s%0"+amd.attrs.Table.TableZero+"d", amd.attrs.Table.TablePrefix, i)
- amd.tableName = append(amd.tableName, tableName)
- amd.offsets[i] = &model.LoopOffset{}
- }
- } else {
- var tableNameSuffix []string
- tableFormat := strings.Split(amd.attrs.Table.TableFormat, ",")
- if tableNameSuffix, err = amd.IndexNameSuffix(tableFormat[0], tableFormat[1]); err != nil {
- log.Error("amd.IndexNameSuffix(%v)", err)
- return
- }
- for _, v := range tableNameSuffix {
- amd.tableName = append(amd.tableName, amd.attrs.Table.TablePrefix+v)
- }
- for i := range amd.tableName {
- amd.offsets[i] = &model.LoopOffset{}
- }
- }
- return
- }
- // Business return business.
- func (amd *AppMultipleDatabus) Business() string {
- return amd.attrs.Business
- }
- // InitIndex .
- func (amd *AppMultipleDatabus) InitIndex(c context.Context) {
- var (
- err error
- indexAliasName string
- indexEntityName string
- )
- indexFormat := strings.Split(amd.attrs.Index.IndexFormat, ",")
- aliases, aliasErr := amd.d.GetAliases(amd.attrs.ESName, amd.attrs.Index.IndexAliasPrefix)
- if indexFormat[0] == "int" || indexFormat[0] == "single" {
- for i := amd.attrs.Index.IndexFrom; i <= amd.attrs.Index.IndexTo; i++ {
- // == "0" 有问题,不通用
- if amd.attrs.Index.IndexZero == "0" {
- indexAliasName = amd.attrs.Index.IndexAliasPrefix
- indexEntityName = amd.attrs.Index.IndexEntityPrefix
- } else {
- indexAliasName = fmt.Sprintf("%s%0"+amd.attrs.Index.IndexZero+"d", amd.attrs.Index.IndexAliasPrefix, i)
- indexEntityName = fmt.Sprintf("%s%0"+amd.attrs.Index.IndexZero+"d", amd.attrs.Index.IndexEntityPrefix, i)
- }
- if aliasErr != nil {
- amd.d.InitIndex(c, nil, amd.attrs.ESName, indexAliasName, indexEntityName, amd.attrs.Index.IndexMapping)
- } else {
- amd.d.InitIndex(c, aliases, amd.attrs.ESName, indexAliasName, indexEntityName, amd.attrs.Index.IndexMapping)
- }
- }
- } else {
- if amd.indexNameSuffix, err = amd.IndexNameSuffix(indexFormat[0], indexFormat[1]); err != nil {
- log.Error("amd.IndexNameSuffix(%v)", err)
- return
- }
- for _, v := range amd.indexNameSuffix {
- if aliasErr != nil {
- amd.d.InitIndex(c, nil, amd.attrs.ESName, amd.attrs.Index.IndexAliasPrefix+v, amd.attrs.Index.IndexEntityPrefix+v, amd.attrs.Index.IndexMapping)
- } else {
- amd.d.InitIndex(c, aliases, amd.attrs.ESName, amd.attrs.Index.IndexAliasPrefix+v, amd.attrs.Index.IndexEntityPrefix+v, amd.attrs.Index.IndexMapping)
- }
- }
- }
- }
- // InitOffset insert init value to offset.
- func (amd *AppMultipleDatabus) InitOffset(c context.Context) {
- amd.d.InitOffset(c, amd.offsets[0], amd.attrs, amd.tableName)
- }
- // Offset .
- func (amd *AppMultipleDatabus) Offset(c context.Context) {
- for i, v := range amd.tableName {
- offset, err := amd.d.Offset(c, amd.attrs.AppID, v)
- if err != nil {
- log.Error("amd.d.offset error(%v)", err)
- time.Sleep(time.Second * 3)
- }
- amd.offsets[i].SetReview(offset.ReviewID, offset.ReviewTime)
- amd.offsets[i].SetOffset(offset.OffsetID(), offset.OffsetTime())
- }
- }
- // SetRecover set recover
- func (amd *AppMultipleDatabus) SetRecover(c context.Context, recoverID int64, recoverTime string, i int) {
- amd.offsets.SetRecoverOffsets(i, recoverID, recoverTime)
- }
- // IncrMessages .
- func (amd *AppMultipleDatabus) IncrMessages(c context.Context) (length int, err error) {
- ticker := time.NewTicker(time.Duration(time.Millisecond * time.Duration(amd.attrs.Databus.Ticker)))
- defer ticker.Stop()
- for {
- select {
- case msg, ok := <-amd.dtb.Messages():
- if !ok {
- log.Error("databus: %s binlog consumer exit!!!", amd.attrs.Databus)
- break
- }
- m := &model.Message{}
- amd.commits[msg.Partition] = msg
- if err = json.Unmarshal(msg.Value, m); err != nil {
- log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
- continue
- }
- if amd.attrs.Business == "creative_reply" {
- r, _ := regexp.Compile("reply_\\d+")
- if !r.MatchString(m.Table) {
- continue
- }
- }
- if (amd.attrs.Table.TableSplit == "string" && m.Table == amd.attrs.Table.TablePrefix) ||
- (amd.attrs.Table.TableSplit != "string" && strings.HasPrefix(m.Table, amd.attrs.Table.TablePrefix)) {
- if m.Action == "insert" || m.Action == "update" {
- var parseMap map[string]interface{}
- parseMap, err = amd.d.JSON2map(m.New)
- if err != nil {
- log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
- continue
- }
- // esports fav type filter
- if amd.attrs.AppID == "esports_fav" {
- if t, ok := parseMap["type"]; ok && t.(int64) != 10 {
- continue
- }
- }
- // playlist fav type and attr filter
- if amd.attrs.AppID == "fav_playlist" {
- if t, ok := parseMap["type"]; ok && t.(int64) != 2 {
- continue
- }
- if t, ok := parseMap["attr"]; ok {
- if t.(int64)>>0&1 == 0 || (m.Action == "insert" && t.(int64)>>1&1 == 1) {
- continue
- }
- }
- }
- var newParseMap map[string]interface{}
- newParseMap, err = amd.newParseMap(c, m.Table, parseMap)
- if err != nil {
- if amd.attrs.AppID == "creative_reply" {
- continue
- }
- log.Error("amd.newParseMap error(%v)", err)
- continue
- }
- amd.mapData = append(amd.mapData, newParseMap)
- }
- }
- if len(amd.mapData) < amd.attrs.Databus.AggCount {
- continue
- }
- case <-ticker.C:
- }
- break
- }
- if len(amd.mapData) > 0 {
- amd.mapData, err = amd.d.ExtraData(c, amd.mapData, amd.attrs, "dtb", []string{})
- }
- length = len(amd.mapData)
- //amd.d.extraData(c, amd, "dtb")
- return
- }
- // AllMessages .
- func (amd *AppMultipleDatabus) AllMessages(c context.Context) (length int, err error) {
- amd.mapData = []model.MapData{}
- for i, v := range amd.tableName {
- var (
- rows *xsql.Rows
- sql string
- )
- tableFormat := strings.Split(amd.attrs.Table.TableFormat, ",")
- if amd.attrs.AppID == "dm_search" || amd.attrs.AppID == "dm" {
- sql = fmt.Sprintf(amd.attrs.DataSQL.SQLByID, amd.attrs.DataSQL.SQLFields, i, i)
- } else if tableFormat[0] == "int" || tableFormat[0] == "single" { // 兼容只传后缀,不传表名
- sql = fmt.Sprintf(amd.attrs.DataSQL.SQLByID, amd.attrs.DataSQL.SQLFields, i)
- log.Info(sql, amd.offsets[i].OffsetID, amd.attrs.Other.Size)
- } else {
- sql = fmt.Sprintf(amd.attrs.DataSQL.SQLByID, amd.attrs.DataSQL.SQLFields, v)
- }
- if rows, err = amd.db.Query(c, sql, amd.offsets[i].OffsetID, amd.attrs.Other.Size); err != nil {
- log.Error("AllMessages db.Query error(%v)", err)
- return
- }
- tempList := []model.MapData{}
- for rows.Next() {
- item, row := InitMapData(amd.attrs.DataSQL.DataIndexFields)
- if err = rows.Scan(row...); err != nil {
- log.Error("AppMultipleDatabus.AllMessages rows.Scan() error(%v)", err)
- continue
- }
- var newParseMap map[string]interface{}
- newParseMap, err = amd.newParseMap(c, v, item)
- if err != nil {
- log.Error("amd.newParseMap error(%v)", err)
- continue
- }
- tempList = append(tempList, newParseMap)
- amd.mapData = append(amd.mapData, newParseMap)
- }
- rows.Close()
- tmpLength := len(tempList)
- if tmpLength > 0 {
- amd.offsets[i].SetTempOffset(tempList[tmpLength-1].PrimaryID(), tempList[tmpLength-1].StrMTime())
- }
- }
- if len(amd.mapData) > 0 {
- amd.mapData, err = amd.d.ExtraData(c, amd.mapData, amd.attrs, "db", []string{})
- }
- length = len(amd.mapData)
- //amd.d.extraData(c, amd, "db")
- return
- }
- // BulkIndex .
- func (amd *AppMultipleDatabus) BulkIndex(c context.Context, start int, end int, writeEntityIndex bool) (err error) {
- partData := amd.mapData[start:end]
- if amd.d.c.Business.Index {
- err = amd.d.BulkDBData(c, amd.attrs, writeEntityIndex, partData...)
- } else {
- err = amd.d.BulkDatabusData(c, amd.attrs, writeEntityIndex, partData...)
- }
- return
- }
- // Commit .
- func (amd *AppMultipleDatabus) Commit(c context.Context) (err error) {
- if amd.d.c.Business.Index {
- if amd.attrs.Table.TableSplit == "int" || amd.attrs.Table.TableSplit == "single" { // 兼容只传后缀,不传表名
- for i := amd.attrs.Table.TableFrom; i <= amd.attrs.Table.TableTo; i++ {
- tableName := fmt.Sprintf("%s%0"+amd.attrs.Table.TableZero+"d", amd.attrs.Table.TablePrefix, i)
- if err = amd.d.CommitOffset(c, amd.offsets[i], amd.attrs.AppID, tableName); err != nil {
- log.Error("AppMultipleDatabus.Commit error(%v)", err)
- continue
- }
- }
- } else {
- for i, v := range amd.indexNameSuffix {
- if err = amd.d.CommitOffset(c, amd.offsets[i], amd.attrs.AppID, v); err != nil {
- log.Error("Commit error(%v)", err)
- continue
- }
- }
- }
- } else {
- for k, c := range amd.commits {
- if err = c.Commit(); err != nil {
- log.Error("AppMultipleDatabus.Commit error(%v)", err)
- continue
- }
- delete(amd.commits, k)
- }
- }
- amd.mapData = []model.MapData{}
- return
- }
- // Sleep .
- func (amd *AppMultipleDatabus) Sleep(c context.Context) {
- time.Sleep(time.Second * time.Duration(amd.attrs.Other.Sleep))
- }
- // Size .
- func (amd *AppMultipleDatabus) Size(c context.Context) (size int) {
- return amd.attrs.Other.Size
- }
- // indexField .
- // func (amd *AppMultipleDatabus) indexField(c context.Context, tableName string) (fieldName string, fieldValue int) {
- // suffix, _ := strconv.Atoi(strings.Split(tableName, "_")[2])
- // s := strings.Split(amd.attrs.DataSQL.DataIndexSuffix, ";")
- // v := strings.Split(s[1], ":")
- // fieldName = v[0]
- // indexNum, _ := strconv.Atoi(v[2])
- // fieldValue = suffix + indexNum
- // return
- // }
- // newParseMap .
- func (amd *AppMultipleDatabus) newParseMap(c context.Context, table string, parseMap map[string]interface{}) (res map[string]interface{}, err error) {
- res = parseMap
- //TODO 实体索引写不进去
- if (amd.attrs.AppID == "dm_search" || amd.attrs.AppID == "dm") && !amd.d.c.Business.Index {
- indexSuffix := strings.Split(table, "_")[2]
- res["index_name"] = amd.attrs.Index.IndexAliasPrefix + indexSuffix
- if _, ok := res["msg"]; ok {
- // dm_content_
- res["index_field"] = true // 删除ctime
- res["index_id"] = fmt.Sprintf("%v", res["dmid"])
- } else {
- // dm_index_
- res["index_id"] = fmt.Sprintf("%v", res["id"])
- }
- } else if amd.attrs.AppID == "dmreport" {
- if ztime, ok := res["ctime"].(*interface{}); ok { // 数据库
- if ctime, cok := (*ztime).(time.Time); cok {
- res["index_name"] = amd.attrs.Index.IndexAliasPrefix + ctime.Format("2006")
- }
- } else if ztime, ok := res["ctime"].(string); ok { // databus
- var ctime time.Time
- if ctime, err = time.Parse("2006-01-02 15:04:05", ztime); err == nil {
- res["index_name"] = amd.attrs.Index.IndexAliasPrefix + ctime.Format("2006")
- }
- }
- } else if amd.attrs.AppID == "creative_reply" && !amd.d.c.Business.Index {
- if replyType, ok := res["type"].(int64); ok {
- if replyType == 1 || replyType == 12 || replyType == 14 {
- } else {
- err = fmt.Errorf("多余数据")
- }
- } else {
- err = fmt.Errorf("错误数据")
- }
- } else if amd.attrs.Index.IndexSplit == "single" {
- res["index_name"] = amd.attrs.Index.IndexAliasPrefix
- } else {
- indexSuffix := string([]rune(table)[strings.Count(amd.attrs.Table.TablePrefix, "")-1:])
- res["index_name"] = amd.attrs.Index.IndexAliasPrefix + indexSuffix
- }
- //dtb index_id
- if amd.attrs.AppID == "favorite" && !amd.d.c.Business.Index {
- if fid, ok := res["fid"].(int64); ok {
- if oid, ok := res["oid"].(int64); ok {
- res["index_id"] = fmt.Sprintf("%d_%d", fid, oid)
- return
- }
- }
- res["index_id"] = "err"
- res["indexName"] = ""
- }
- return
- }
|