log.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. package business
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "go-common/app/job/main/search/dao"
  11. "go-common/app/job/main/search/model"
  12. "go-common/library/log"
  13. "go-common/library/log/infoc"
  14. "go-common/library/queue/databus"
  15. "gopkg.in/olivere/elastic.v5"
  16. )
  17. const _sql = "SELECT id, index_format, index_version, index_cluster, additional_mapping, data_center FROM digger_"
  18. // Log .
  19. type Log struct {
  20. d *dao.Dao
  21. appid string
  22. attrs *model.Attrs
  23. databus *databus.Databus
  24. infoC *infoc.Infoc
  25. infoCField []string
  26. mapData []model.MapData
  27. commits map[int32]*databus.Message
  28. business map[int]*info
  29. week map[int]string
  30. additionalMapping map[int]map[string]string
  31. defaultMapping map[string]string
  32. mapping map[int]map[string]string
  33. }
  34. type info struct {
  35. Format string
  36. Cluster string
  37. Version string
  38. DataCenter int8
  39. }
  40. // NewLog .
  41. func NewLog(d *dao.Dao, appid string) (l *Log) {
  42. l = &Log{
  43. d: d,
  44. appid: appid,
  45. attrs: d.AttrPool[appid],
  46. databus: d.DatabusPool[appid],
  47. infoC: d.InfoCPool[appid],
  48. infoCField: []string{},
  49. mapData: []model.MapData{},
  50. commits: map[int32]*databus.Message{},
  51. business: map[int]*info{},
  52. additionalMapping: map[int]map[string]string{},
  53. mapping: map[int]map[string]string{},
  54. week: map[int]string{
  55. 0: "0107",
  56. 1: "0815",
  57. 2: "1623",
  58. 3: "2431",
  59. },
  60. }
  61. switch appid {
  62. case "log_audit":
  63. l.defaultMapping = map[string]string{
  64. "uname": "string",
  65. "uid": "string",
  66. "business": "string",
  67. "type": "string",
  68. "oid": "string",
  69. "action": "string",
  70. "ctime": "time",
  71. "int_0": "int",
  72. "int_1": "int",
  73. "int_2": "int",
  74. "str_0": "string",
  75. "str_1": "string",
  76. "str_2": "string",
  77. "extra_data": "string",
  78. }
  79. l.infoCField = []string{"uname", "uid", "business", "type", "oid", "action", "ctime",
  80. "int_0", "int_1", "int_2", "str_0", "str_1", "str_2", "str_3", "str_4", "extra_data"}
  81. case "log_user_action":
  82. l.defaultMapping = map[string]string{
  83. "mid": "string",
  84. "platform": "string",
  85. "build": "string",
  86. "buvid": "string",
  87. "business": "string",
  88. "type": "string",
  89. "oid": "string",
  90. "action": "string",
  91. "ip": "string",
  92. "ctime": "time",
  93. "int_0": "int",
  94. "int_1": "int",
  95. "int_2": "int",
  96. "str_0": "string",
  97. "str_1": "string",
  98. "str_2": "string",
  99. "extra_data": "string",
  100. }
  101. l.infoCField = []string{"mid", "platform", "build", "buvid", "business", "type", "oid", "action", "ip", "ctime",
  102. "int_0", "int_1", "int_2", "str_0", "str_1", "str_2", "extra_data"}
  103. default:
  104. log.Error("log appid error(%v)", appid)
  105. return
  106. }
  107. rows, err := d.SearchDB.Query(context.TODO(), _sql+appid)
  108. if err != nil {
  109. log.Error("log Query error(%v)", appid)
  110. return
  111. }
  112. defer rows.Close()
  113. for rows.Next() {
  114. var (
  115. id int
  116. additionalMapping string
  117. )
  118. info := &info{}
  119. if err = rows.Scan(&id, &info.Format, &info.Version, &info.Cluster, &additionalMapping, &info.DataCenter); err != nil {
  120. log.Error("Log New DB (%v)(%v)", id, err)
  121. continue
  122. }
  123. l.business[id] = info
  124. if additionalMapping != "" {
  125. var additionalMappingDict map[string]string
  126. if err = json.Unmarshal([]byte(additionalMapping), &additionalMappingDict); err != nil {
  127. log.Error("Log New Json (%v)(%v)", id, err)
  128. continue
  129. }
  130. l.additionalMapping[id] = additionalMappingDict
  131. }
  132. }
  133. for b := range l.business {
  134. l.mapping[b] = map[string]string{}
  135. for k, v := range l.defaultMapping {
  136. l.mapping[b][k] = v
  137. }
  138. if a, ok := l.additionalMapping[b]; ok {
  139. for k, v := range a {
  140. l.mapping[b][k] = v
  141. }
  142. }
  143. }
  144. return
  145. }
  146. // Business return business.
  147. func (l *Log) Business() string {
  148. return l.attrs.Business
  149. }
  150. // InitIndex .
  151. func (l *Log) InitIndex(c context.Context) {
  152. }
  153. // InitOffset .
  154. func (l *Log) InitOffset(c context.Context) {
  155. }
  156. // Offset .
  157. func (l *Log) Offset(c context.Context) {
  158. }
  159. // MapData .
  160. func (l *Log) MapData(c context.Context) (mapData []model.MapData) {
  161. return l.mapData
  162. }
  163. // Attrs .
  164. func (l *Log) Attrs(c context.Context) (attrs *model.Attrs) {
  165. return l.attrs
  166. }
  167. // SetRecover .
  168. func (l *Log) SetRecover(c context.Context, recoverID int64, recoverTime string, i int) {
  169. }
  170. // IncrMessages .
  171. func (l *Log) IncrMessages(c context.Context) (length int, err error) {
  172. var jErr error
  173. ticker := time.NewTicker(time.Duration(time.Millisecond * time.Duration(l.attrs.Databus.Ticker)))
  174. defer ticker.Stop()
  175. for {
  176. select {
  177. case msg, ok := <-l.databus.Messages():
  178. if !ok {
  179. log.Error("databus: %s binlog consumer exit!!!", l.attrs.Databus)
  180. break
  181. }
  182. l.commits[msg.Partition] = msg
  183. var result map[string]interface{}
  184. decoder := json.NewDecoder(bytes.NewReader(msg.Value))
  185. decoder.UseNumber()
  186. if jErr = decoder.Decode(&result); jErr != nil {
  187. log.Error("appid(%v) json.Unmarshal(%s) error(%v)", l.appid, msg.Value, jErr)
  188. continue
  189. }
  190. // json.Number转int64
  191. for k, v := range result {
  192. switch t := v.(type) {
  193. case json.Number:
  194. if result[k], jErr = t.Int64(); jErr != nil {
  195. log.Error("appid(%v) log.bulkDatabusData.json.Number(%v)(%v)", l.appid, t, jErr)
  196. }
  197. }
  198. }
  199. l.mapData = append(l.mapData, result)
  200. if len(l.mapData) < l.attrs.Databus.AggCount {
  201. continue
  202. }
  203. case <-ticker.C:
  204. }
  205. break
  206. }
  207. // todo: 额外的参数
  208. length = len(l.mapData)
  209. return
  210. }
  211. // AllMessages .
  212. func (l *Log) AllMessages(c context.Context) (length int, err error) {
  213. return
  214. }
  215. // BulkIndex .
  216. func (l *Log) BulkIndex(c context.Context, start, end int, writeEntityIndex bool) (err error) {
  217. partData := l.mapData[start:end]
  218. if err = l.bulkDatabusData(c, l.attrs, writeEntityIndex, partData...); err != nil {
  219. log.Error("appid(%v) json.bulkDatabusData error(%v)", l.appid, err)
  220. return
  221. }
  222. return
  223. }
  224. // Commit .
  225. func (l *Log) Commit(c context.Context) (err error) {
  226. for k, msg := range l.commits {
  227. if err = msg.Commit(); err != nil {
  228. log.Error("appid(%v) Commit error(%v)", l.appid, err)
  229. continue
  230. }
  231. delete(l.commits, k)
  232. }
  233. l.mapData = []model.MapData{}
  234. return
  235. }
  236. // Sleep .
  237. func (l *Log) Sleep(c context.Context) {
  238. time.Sleep(time.Second * time.Duration(l.attrs.Other.Sleep))
  239. }
  240. // Size .
  241. func (l *Log) Size(c context.Context) (size int) {
  242. return l.attrs.Other.Size
  243. }
  244. func (l *Log) bulkDatabusData(c context.Context, attrs *model.Attrs, writeEntityIndex bool, bulkData ...model.MapData) (err error) {
  245. var (
  246. request elastic.BulkableRequest
  247. bulkRequest map[string]*elastic.BulkService
  248. businessID int
  249. )
  250. bulkRequest = map[string]*elastic.BulkService{}
  251. for _, b := range bulkData {
  252. indexName := ""
  253. if business, ok := b["business"].(int64); ok {
  254. businessID = int(business)
  255. if v, ok := b["ctime"].(string); ok {
  256. if cTime, timeErr := time.Parse("2006-01-02 15:04:05", v); timeErr == nil {
  257. if info, ok := l.business[businessID]; ok {
  258. suffix := strings.Replace(cTime.Format(info.Format), "week", l.week[cTime.Day()/8], -1) + "_" + info.Version
  259. if !writeEntityIndex {
  260. indexName = attrs.Index.IndexAliasPrefix + "_" + strconv.Itoa(businessID) + "_" + suffix
  261. } else {
  262. indexName = attrs.Index.IndexEntityPrefix + "_" + strconv.Itoa(businessID) + "_" + suffix
  263. }
  264. }
  265. }
  266. }
  267. }
  268. if indexName == "" {
  269. log.Error("appid(%v) ac.d.bulkDatabusData business business(%v) data(%+v)", l.appid, b["business"], b)
  270. continue
  271. }
  272. esCluster := l.business[businessID].Cluster // 上方已经判断l.business[businessID]是否存在
  273. if _, ok := bulkRequest[esCluster]; !ok {
  274. if _, eok := l.d.ESPool[esCluster]; eok {
  275. bulkRequest[esCluster] = l.d.ESPool[esCluster].Bulk()
  276. } else {
  277. log.Error("appid(%v) ac.d.bulkDatabusData cluster no find error(%v)", l.appid, esCluster)
  278. continue //忽略这条数据
  279. }
  280. }
  281. //发送数据中心
  282. if l.business[businessID].DataCenter == 1 {
  283. arr := make([]interface{}, len(l.infoCField))
  284. for i, f := range l.infoCField {
  285. if v, ok := b[f]; ok {
  286. arr[i] = fmt.Sprintf("%v", v)
  287. }
  288. }
  289. if er := l.infoC.Info(arr...); er != nil {
  290. log.Error("appid(%v) ac.infoC.Info error(%v)", l.appid, er)
  291. }
  292. }
  293. //数据处理
  294. for k, v := range b {
  295. if t, ok := l.mapping[businessID][k]; ok {
  296. switch t {
  297. case "int_to_bin":
  298. if item, ok := v.(int64); ok {
  299. item := int(item)
  300. arr := []string{}
  301. for i := 0; item != 0; i++ {
  302. if item&1 == 1 {
  303. arr = append(arr, strconv.Itoa(item&1<<uint(i)))
  304. }
  305. item = item >> 1
  306. }
  307. b[k] = arr
  308. } else {
  309. delete(b, k)
  310. }
  311. case "array":
  312. if arr, ok := v.([]interface{}); ok {
  313. b[k] = arr
  314. } else {
  315. delete(b, k)
  316. }
  317. }
  318. } else {
  319. delete(b, k)
  320. }
  321. }
  322. request = elastic.NewBulkIndexRequest().Index(indexName).Type(attrs.Index.IndexType).Doc(b)
  323. bulkRequest[esCluster].Add(request)
  324. }
  325. for _, v := range bulkRequest {
  326. if v.NumberOfActions() == 0 {
  327. continue
  328. }
  329. if _, err = v.Do(c); err != nil {
  330. log.Error("appid(%s) bulk error(%v)", attrs.AppID, err)
  331. }
  332. }
  333. return
  334. }