123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355 |
- package business
- import (
- "bytes"
- "context"
- "encoding/json"
- "fmt"
- "strconv"
- "strings"
- "time"
- "go-common/app/job/main/search/dao"
- "go-common/app/job/main/search/model"
- "go-common/library/log"
- "go-common/library/log/infoc"
- "go-common/library/queue/databus"
- "gopkg.in/olivere/elastic.v5"
- )
// _sql selects the per-business index configuration; the table name is
// completed by appending the appid (e.g. "digger_log_audit").
const _sql = "SELECT id, index_format, index_version, index_cluster, additional_mapping, data_center FROM digger_"
// Log consumes audit / user-action log messages from databus and
// bulk-indexes them into per-business Elasticsearch indices.
type Log struct {
	d                 *dao.Dao
	appid             string            // "log_audit" or "log_user_action"
	attrs             *model.Attrs      // per-appid configuration from d.AttrPool
	databus           *databus.Databus  // message source for this appid
	infoC             *infoc.Infoc      // data-center reporting client — only used when a business has DataCenter == 1
	infoCField        []string          // ordered field names forwarded to infoC
	mapData           []model.MapData   // decoded rows buffered between IncrMessages and Commit
	commits           map[int32]*databus.Message // latest message per partition, acked in Commit
	business          map[int]*info     // per-business index config loaded from the search DB
	week              map[int]string    // day-of-month/8 -> day-range suffix substituted for "week" in index names
	additionalMapping map[int]map[string]string // per-business extra field typing (JSON column in DB)
	defaultMapping    map[string]string // field typing shared by every business of this appid
	mapping           map[int]map[string]string // merged defaultMapping + additionalMapping, keyed by business id
}
// info is one business's index configuration row.
type info struct {
	Format     string // time layout for the index suffix; may contain the literal "week" placeholder
	Cluster    string // Elasticsearch cluster name, key into d.ESPool
	Version    string // index version, appended to the suffix
	DataCenter int8   // 1 -> rows are additionally reported via infoC
}
- // NewLog .
- func NewLog(d *dao.Dao, appid string) (l *Log) {
- l = &Log{
- d: d,
- appid: appid,
- attrs: d.AttrPool[appid],
- databus: d.DatabusPool[appid],
- infoC: d.InfoCPool[appid],
- infoCField: []string{},
- mapData: []model.MapData{},
- commits: map[int32]*databus.Message{},
- business: map[int]*info{},
- additionalMapping: map[int]map[string]string{},
- mapping: map[int]map[string]string{},
- week: map[int]string{
- 0: "0107",
- 1: "0815",
- 2: "1623",
- 3: "2431",
- },
- }
- switch appid {
- case "log_audit":
- l.defaultMapping = map[string]string{
- "uname": "string",
- "uid": "string",
- "business": "string",
- "type": "string",
- "oid": "string",
- "action": "string",
- "ctime": "time",
- "int_0": "int",
- "int_1": "int",
- "int_2": "int",
- "str_0": "string",
- "str_1": "string",
- "str_2": "string",
- "extra_data": "string",
- }
- l.infoCField = []string{"uname", "uid", "business", "type", "oid", "action", "ctime",
- "int_0", "int_1", "int_2", "str_0", "str_1", "str_2", "str_3", "str_4", "extra_data"}
- case "log_user_action":
- l.defaultMapping = map[string]string{
- "mid": "string",
- "platform": "string",
- "build": "string",
- "buvid": "string",
- "business": "string",
- "type": "string",
- "oid": "string",
- "action": "string",
- "ip": "string",
- "ctime": "time",
- "int_0": "int",
- "int_1": "int",
- "int_2": "int",
- "str_0": "string",
- "str_1": "string",
- "str_2": "string",
- "extra_data": "string",
- }
- l.infoCField = []string{"mid", "platform", "build", "buvid", "business", "type", "oid", "action", "ip", "ctime",
- "int_0", "int_1", "int_2", "str_0", "str_1", "str_2", "extra_data"}
- default:
- log.Error("log appid error(%v)", appid)
- return
- }
- rows, err := d.SearchDB.Query(context.TODO(), _sql+appid)
- if err != nil {
- log.Error("log Query error(%v)", appid)
- return
- }
- defer rows.Close()
- for rows.Next() {
- var (
- id int
- additionalMapping string
- )
- info := &info{}
- if err = rows.Scan(&id, &info.Format, &info.Version, &info.Cluster, &additionalMapping, &info.DataCenter); err != nil {
- log.Error("Log New DB (%v)(%v)", id, err)
- continue
- }
- l.business[id] = info
- if additionalMapping != "" {
- var additionalMappingDict map[string]string
- if err = json.Unmarshal([]byte(additionalMapping), &additionalMappingDict); err != nil {
- log.Error("Log New Json (%v)(%v)", id, err)
- continue
- }
- l.additionalMapping[id] = additionalMappingDict
- }
- }
- for b := range l.business {
- l.mapping[b] = map[string]string{}
- for k, v := range l.defaultMapping {
- l.mapping[b][k] = v
- }
- if a, ok := l.additionalMapping[b]; ok {
- for k, v := range a {
- l.mapping[b][k] = v
- }
- }
- }
- return
- }
// Business returns the business name configured in this appid's attrs.
func (l *Log) Business() string {
	return l.attrs.Business
}
// InitIndex is a no-op: indices are created lazily per business/time bucket
// during bulkDatabusData; this stub only satisfies the processor interface.
func (l *Log) InitIndex(c context.Context) {
}
// InitOffset is a no-op: databus manages consumption offsets, so there is
// no offset state to initialize here (interface stub).
func (l *Log) InitOffset(c context.Context) {
}
// Offset is a no-op interface stub; offsets are handled by databus commits.
func (l *Log) Offset(c context.Context) {
}
// MapData returns the rows buffered so far.
// NOTE: this is the internal slice, not a copy; it is reset by Commit.
func (l *Log) MapData(c context.Context) (mapData []model.MapData) {
	return l.mapData
}
// Attrs returns this appid's configuration attributes.
func (l *Log) Attrs(c context.Context) (attrs *model.Attrs) {
	return l.attrs
}
// SetRecover is a no-op interface stub; this worker has no recovery replay.
func (l *Log) SetRecover(c context.Context, recoverID int64, recoverTime string, i int) {
}
- // IncrMessages .
- func (l *Log) IncrMessages(c context.Context) (length int, err error) {
- var jErr error
- ticker := time.NewTicker(time.Duration(time.Millisecond * time.Duration(l.attrs.Databus.Ticker)))
- defer ticker.Stop()
- for {
- select {
- case msg, ok := <-l.databus.Messages():
- if !ok {
- log.Error("databus: %s binlog consumer exit!!!", l.attrs.Databus)
- break
- }
- l.commits[msg.Partition] = msg
- var result map[string]interface{}
- decoder := json.NewDecoder(bytes.NewReader(msg.Value))
- decoder.UseNumber()
- if jErr = decoder.Decode(&result); jErr != nil {
- log.Error("appid(%v) json.Unmarshal(%s) error(%v)", l.appid, msg.Value, jErr)
- continue
- }
- // json.Number转int64
- for k, v := range result {
- switch t := v.(type) {
- case json.Number:
- if result[k], jErr = t.Int64(); jErr != nil {
- log.Error("appid(%v) log.bulkDatabusData.json.Number(%v)(%v)", l.appid, t, jErr)
- }
- }
- }
- l.mapData = append(l.mapData, result)
- if len(l.mapData) < l.attrs.Databus.AggCount {
- continue
- }
- case <-ticker.C:
- }
- break
- }
- // todo: 额外的参数
- length = len(l.mapData)
- return
- }
// AllMessages is a no-op interface stub: this worker only supports
// incremental consumption (IncrMessages), not full rebuilds.
func (l *Log) AllMessages(c context.Context) (length int, err error) {
	return
}
- // BulkIndex .
- func (l *Log) BulkIndex(c context.Context, start, end int, writeEntityIndex bool) (err error) {
- partData := l.mapData[start:end]
- if err = l.bulkDatabusData(c, l.attrs, writeEntityIndex, partData...); err != nil {
- log.Error("appid(%v) json.bulkDatabusData error(%v)", l.appid, err)
- return
- }
- return
- }
// Commit acks every buffered databus message (one per partition) and resets
// the row buffer. Messages that fail to commit stay in l.commits for the
// next round; the buffer is cleared regardless.
// NOTE(review): err is reassigned on every iteration, so a failure is masked
// if a later partition commits successfully — confirm this is intentional.
func (l *Log) Commit(c context.Context) (err error) {
	for k, msg := range l.commits {
		if err = msg.Commit(); err != nil {
			log.Error("appid(%v) Commit error(%v)", l.appid, err)
			continue
		}
		delete(l.commits, k)
	}
	l.mapData = []model.MapData{}
	return
}
// Sleep pauses the worker for the configured number of seconds between rounds.
func (l *Log) Sleep(c context.Context) {
	time.Sleep(time.Second * time.Duration(l.attrs.Other.Sleep))
}
// Size returns the configured batch size for bulk indexing.
func (l *Log) Size(c context.Context) (size int) {
	return l.attrs.Other.Size
}
// bulkDatabusData routes a batch of decoded rows into per-cluster
// Elasticsearch bulk requests and executes them.
// For each row it:
//  1. derives the target index name from the row's business id and ctime —
//     the business's Format layout is applied to ctime, any literal "week"
//     in it is replaced by the day-range bucket (Day()/8 -> l.week), and
//     "_<Version>" is appended; rows lacking a valid business/ctime/config
//     are logged and skipped;
//  2. optionally forwards the row's infoCField values to the data center
//     when the business has DataCenter == 1;
//  3. sanitizes fields against the business's type mapping (unknown keys are
//     deleted) before adding the document to its cluster's bulk request.
// All bulk requests are then executed; the last Do error (if any) is returned.
func (l *Log) bulkDatabusData(c context.Context, attrs *model.Attrs, writeEntityIndex bool, bulkData ...model.MapData) (err error) {
	var (
		request     elastic.BulkableRequest
		bulkRequest map[string]*elastic.BulkService
		businessID  int
	)
	bulkRequest = map[string]*elastic.BulkService{}
	for _, b := range bulkData {
		indexName := ""
		// "business" is int64 because IncrMessages normalized json.Number.
		if business, ok := b["business"].(int64); ok {
			businessID = int(business)
			if v, ok := b["ctime"].(string); ok {
				if cTime, timeErr := time.Parse("2006-01-02 15:04:05", v); timeErr == nil {
					if info, ok := l.business[businessID]; ok {
						suffix := strings.Replace(cTime.Format(info.Format), "week", l.week[cTime.Day()/8], -1) + "_" + info.Version
						if !writeEntityIndex {
							indexName = attrs.Index.IndexAliasPrefix + "_" + strconv.Itoa(businessID) + "_" + suffix
						} else {
							indexName = attrs.Index.IndexEntityPrefix + "_" + strconv.Itoa(businessID) + "_" + suffix
						}
					}
				}
			}
		}
		if indexName == "" {
			// any failure above (bad business, bad ctime, unknown config)
			// lands here: drop the row with a log line.
			log.Error("appid(%v) ac.d.bulkDatabusData business business(%v) data(%+v)", l.appid, b["business"], b)
			continue
		}
		esCluster := l.business[businessID].Cluster // l.business[businessID] was verified above
		if _, ok := bulkRequest[esCluster]; !ok {
			if _, eok := l.d.ESPool[esCluster]; eok {
				bulkRequest[esCluster] = l.d.ESPool[esCluster].Bulk()
			} else {
				log.Error("appid(%v) ac.d.bulkDatabusData cluster no find error(%v)", l.appid, esCluster)
				continue // skip this row
			}
		}
		// forward to the data center
		if l.business[businessID].DataCenter == 1 {
			arr := make([]interface{}, len(l.infoCField))
			for i, f := range l.infoCField {
				if v, ok := b[f]; ok {
					arr[i] = fmt.Sprintf("%v", v)
				}
			}
			if er := l.infoC.Info(arr...); er != nil {
				log.Error("appid(%v) ac.infoC.Info error(%v)", l.appid, er)
			}
		}
		// field sanitation: keys absent from the business's mapping are
		// dropped; deleting during range is safe in Go.
		for k, v := range b {
			if t, ok := l.mapping[businessID][k]; ok {
				switch t {
				case "int_to_bin":
					// split a bitmask into the decimal strings of its set
					// bits, e.g. 6 -> ["2", "4"]; non-int64 values are dropped.
					if item, ok := v.(int64); ok {
						item := int(item)
						arr := []string{}
						for i := 0; item != 0; i++ {
							if item&1 == 1 {
								arr = append(arr, strconv.Itoa(item&1<<uint(i)))
							}
							item = item >> 1
						}
						b[k] = arr
					} else {
						delete(b, k)
					}
				case "array":
					if arr, ok := v.([]interface{}); ok {
						b[k] = arr
					} else {
						delete(b, k)
					}
				}
			} else {
				delete(b, k)
			}
		}
		request = elastic.NewBulkIndexRequest().Index(indexName).Type(attrs.Index.IndexType).Doc(b)
		bulkRequest[esCluster].Add(request)
	}
	for _, v := range bulkRequest {
		if v.NumberOfActions() == 0 {
			continue
		}
		if _, err = v.Do(c); err != nil {
			log.Error("appid(%s) bulk error(%v)", attrs.AppID, err)
		}
	}
	return
}
|