log.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "net/url"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "go-common/app/admin/main/search/model"
  11. "go-common/library/log"
  12. "github.com/pkg/errors"
  13. "gopkg.in/olivere/elastic.v5"
  14. )
  // SQL statements used by the log DAO.
  15. const (
  // _sql selects the columns that describe one log business. It ends with the
  // table-name prefix "digger_"; initMapping appends the app ID to complete the
  // table name before running the query.
  16. _sql = "SELECT id, name, index_format, index_cluster, additional_mapping, permission_point FROM digger_"
  // _count inserts a digger_count row of type 'inc' with count 1 for the given
  // (business, time) and increments count instead when the row's key already exists.
  17. _count = "INSERT INTO digger_count (`business`,`type`,`time`,`count`) values (?, 'inc', ?, 1) ON DUPLICATE KEY UPDATE count=count+1"
  // _percent is the same upsert as _count but additionally stores a name column,
  // so one counter is kept per (business, time, name).
  18. _percent = "INSERT INTO digger_count (`business`,`type`,`time`,`name`,`count`) values (?, 'inc', ?, ?, 1) ON DUPLICATE KEY UPDATE count=count+1"
  19. )
  // Package-level caches of the log business definitions, keyed by business ID.
  // Both maps are (re)assigned by NewLog and read by GetLogInfo.
  20. var (
  // logAuditBusiness holds the businesses loaded for app ID "log_audit".
  21. logAuditBusiness map[int]*model.Business
  // logUserActionBusiness holds the businesses loaded for app ID "log_user_action".
  22. logUserActionBusiness map[int]*model.Business
  23. )
  24. func (d *Dao) NewLogProcess() {
  25. for {
  26. if err := d.NewLog(); err != nil {
  27. time.Sleep(time.Second)
  28. continue
  29. }
  30. time.Sleep(time.Minute)
  31. }
  32. }
  33. // NewLog .
  34. func (d *Dao) NewLog() (err error) {
  35. if logAuditBusiness, err = d.initMapping("log_audit"); err != nil {
  36. return
  37. }
  38. for k, v := range logAuditBusiness {
  39. if _, ok := d.esPool[v.IndexCluster]; !ok {
  40. log.Error("logAudit esPool no exist(%v)", k)
  41. delete(logAuditBusiness, k)
  42. }
  43. }
  44. if logUserActionBusiness, err = d.initMapping("log_user_action"); err != nil {
  45. return
  46. }
  47. for k, v := range logUserActionBusiness {
  48. if _, ok := d.esPool[v.IndexCluster]; !ok {
  49. log.Error("logUserAction esPool no exist(%v)", k)
  50. delete(logUserActionBusiness, k)
  51. }
  52. }
  53. return
  54. }
  55. // GetLogInfo .
  56. func (d *Dao) GetLogInfo(appID string, id int) (business *model.Business, ok bool) {
  57. switch appID {
  58. case "log_audit":
  59. business, ok = logAuditBusiness[id]
  60. return
  61. case "log_user_action":
  62. business, ok = logUserActionBusiness[id]
  63. return
  64. }
  65. return &model.Business{}, false
  66. }
  67. func (d *Dao) initMapping(appID string) (business map[int]*model.Business, err error) {
  68. defaultMapping := map[string]string{}
  69. switch appID {
  70. case "log_audit":
  71. defaultMapping = model.LogAuditDefaultMapping
  72. case "log_user_action":
  73. defaultMapping = model.LogUserActionDefaultMapping
  74. }
  75. business = map[int]*model.Business{}
  76. rows, err := d.db.Query(context.Background(), _sql+appID)
  77. if err != nil {
  78. return
  79. }
  80. defer rows.Close()
  81. for rows.Next() {
  82. var value = &model.Business{
  83. AppID: appID,
  84. Mapping: map[string]string{},
  85. }
  86. if err = rows.Scan(&value.ID, &value.Name, &value.IndexFormat, &value.IndexCluster, &value.AdditionalMapping, &value.PermissionPoint); err != nil {
  87. log.Error("Log New DB (%v)(%v)", appID, err)
  88. continue
  89. }
  90. if appID == "log_audit" {
  91. value.IndexCluster = "log"
  92. }
  93. for k, v := range defaultMapping {
  94. value.Mapping[k] = v
  95. }
  96. if value.AdditionalMapping != "" {
  97. var additionalMappingDict map[string]string
  98. if err = json.Unmarshal([]byte(value.AdditionalMapping), &additionalMappingDict); err != nil {
  99. log.Error("Log New Json (%v)(%v)", value.ID, err)
  100. continue
  101. }
  102. for k, v := range additionalMappingDict {
  103. value.Mapping[k] = v
  104. }
  105. }
  106. business[value.ID] = value
  107. }
  108. err = rows.Err()
  109. return
  110. }
  111. /**
  112. 获取es索引名 多个用逗号分隔
  113. 按日分最多7天
  114. 按周分最多2月
  115. 按月分最多6月
  116. 按年分最多3年
  117. */
  118. func (d *Dao) logIndexName(c context.Context, p *model.LogParams, business *model.Business) (res string, err error) {
  119. var (
  120. sTime = time.Now()
  121. eTime = time.Now()
  122. resArr []string
  123. )
  124. if p.CTimeFrom != "" {
  125. sTime, err = time.Parse("2006-01-02 15:04:05", p.CTimeFrom)
  126. if err != nil {
  127. log.Error("d.LogAuditIndexName(%v)", p.CTimeFrom)
  128. return
  129. }
  130. }
  131. if p.CTimeTo != "" {
  132. eTime, err = time.Parse("2006-01-02 15:04:05", p.CTimeTo)
  133. if err != nil {
  134. log.Error("d.LogAuditIndexName(p.CTimeTo)(%v)", p.CTimeTo)
  135. return
  136. }
  137. }
  138. resDict := map[string]bool{}
  139. if strings.Contains(business.IndexFormat, "02") {
  140. for a := 0; a <= 60; a++ {
  141. resDict[getLogAuditIndexName(p.Business, business.AppID, business.IndexFormat, eTime)] = true
  142. eTime = eTime.AddDate(0, 0, -1)
  143. if (p.CTimeFrom == "" && a >= 1) || (p.CTimeFrom != "" && sTime.After(eTime)) {
  144. break
  145. }
  146. }
  147. } else if strings.Contains(business.IndexFormat, "week") {
  148. for a := 0; a <= 366; a++ {
  149. resDict[getLogAuditIndexName(p.Business, business.AppID, business.IndexFormat, eTime)] = true
  150. eTime = eTime.AddDate(0, 0, -1)
  151. if (p.CTimeFrom == "" && a >= 1) || (p.CTimeFrom != "" && sTime.After(eTime)) {
  152. resDict[getLogAuditIndexName(p.Business, business.AppID, business.IndexFormat, sTime)] = true
  153. break
  154. }
  155. }
  156. } else if strings.Contains(business.IndexFormat, "01") {
  157. // 1月31日时AddDate(0, -1, 0)会出现错误
  158. year, month, _ := eTime.Date()
  159. hour, min, sec := eTime.Clock()
  160. eTime = time.Date(year, month, 1, hour, min, sec, 0, eTime.Location())
  161. for a := 0; a <= 360; a++ {
  162. resDict[getLogAuditIndexName(p.Business, business.AppID, business.IndexFormat, eTime)] = true
  163. eTime = eTime.AddDate(0, -1, 0)
  164. if (p.CTimeFrom == "" && a >= 1) || p.CTimeFrom != "" && sTime.After(eTime) {
  165. break
  166. }
  167. }
  168. } else if strings.Contains(business.IndexFormat, "2006") {
  169. // 2月29日时AddDate(-1, 0, 0)会出现错误
  170. year, _, _ := eTime.Date()
  171. hour, min, sec := eTime.Clock()
  172. eTime = time.Date(year, 1, 1, hour, min, sec, 0, eTime.Location())
  173. for a := 0; a <= 100; a++ {
  174. resDict[getLogAuditIndexName(p.Business, business.AppID, business.IndexFormat, eTime)] = true
  175. eTime = eTime.AddDate(-1, 0, 0)
  176. if (p.CTimeFrom == "" && a >= 1) || (p.CTimeFrom != "" && sTime.After(eTime)) {
  177. break
  178. }
  179. }
  180. } else if business.IndexFormat == "all" {
  181. resDict[getLogAuditIndexName(p.Business, business.AppID, business.IndexFormat, eTime)] = true
  182. }
  183. for k := range resDict {
  184. if exist, e := d.ExistIndex(c, business.IndexCluster, k); exist && e == nil {
  185. resArr = append(resArr, k)
  186. }
  187. }
  188. res = strings.Join(resArr, ",")
  189. return
  190. }
  // getLogAuditIndexName builds the ES index name
  // "<indexName>_<business>_<suffix>" for the instant t.
  //
  // The suffix is t formatted with the Go time layout format; every literal
  // "week" in the result is then replaced by the day-of-month bucket of t:
  // "0107" (days 1-7), "0815" (8-15), "1623" (16-23) or "2431" (24-31).
  func getLogAuditIndexName(business int, indexName string, format string, t time.Time) (index string) {
  	// Day() is in 1..31, so Day()/8 is always a valid index 0..3.
  	weekBuckets := [...]string{"0107", "0815", "1623", "2431"}
  	suffix := strings.Replace(t.Format(format), "week", weekBuckets[t.Day()/8], -1)
  	return indexName + "_" + strconv.Itoa(business) + "_" + suffix
  }
  204. func (d *Dao) getQuery(pr map[string][]interface{}, indexMapping map[string]string) (query *elastic.BoolQuery) {
  205. query = elastic.NewBoolQuery()
  206. for k, t := range indexMapping {
  207. switch t {
  208. case "int", "int64":
  209. if v, ok := pr[k]; ok {
  210. query = query.Filter(elastic.NewTermsQuery(k, v...))
  211. }
  212. if v, ok := pr[k+"_from"]; ok {
  213. query = query.Filter(elastic.NewRangeQuery(k).Gte(v[0]))
  214. }
  215. if v, ok := pr[k+"_to"]; ok {
  216. query = query.Filter(elastic.NewRangeQuery(k).Lte(v[0]))
  217. }
  218. case "string":
  219. if v, ok := pr[k]; ok {
  220. query = query.Filter(elastic.NewTermsQuery(k, v...))
  221. }
  222. if v, ok := pr[k+"_like"]; ok {
  223. likeMap := []model.QueryBodyWhereLike{
  224. {
  225. KWFields: []string{k},
  226. KW: []string{fmt.Sprintf("%v", v)},
  227. Level: model.LikeLevelHigh,
  228. },
  229. }
  230. if o, e := d.queryBasicLike(likeMap, ""); e == nil {
  231. query = query.Must(o...)
  232. }
  233. }
  234. case "time":
  235. if v, ok := pr[k+"_from"]; ok {
  236. query = query.Filter(elastic.NewRangeQuery(k).Gte(v[0]))
  237. }
  238. if v, ok := pr[k+"_to"]; ok {
  239. query = query.Filter(elastic.NewRangeQuery(k).Lte(v[0]))
  240. }
  241. case "int_to_bin":
  242. if v, ok := pr[k]; ok {
  243. var arr []elastic.Query
  244. for _, i := range v {
  245. item, err := strconv.ParseUint(i.(string), 10, 64)
  246. if err != nil {
  247. break
  248. }
  249. arr = append(arr, elastic.NewTermsQuery(k, 1<<(item-1)))
  250. }
  251. query = query.Filter(arr...)
  252. }
  253. case "array":
  254. if v, ok := pr[k+"_and"]; ok {
  255. for _, n := range v {
  256. query = query.Filter(elastic.NewTermsQuery(k, n))
  257. }
  258. }
  259. if v, ok := pr[k+"_or"]; ok {
  260. query = query.Filter(elastic.NewTermsQuery(k, v...))
  261. }
  262. }
  263. }
  264. return query
  265. }
  266. // LogAudit .
  267. func (d *Dao) LogAudit(c context.Context, pr map[string][]interface{}, sp *model.LogParams, business *model.Business) (res *model.SearchResult, err error) {
  268. var indexName string
  269. res = &model.SearchResult{
  270. Result: []json.RawMessage{},
  271. Page: &model.Page{},
  272. }
  273. query := d.getQuery(pr, business.Mapping)
  274. indexName, err = d.logIndexName(c, sp, business)
  275. if err != nil {
  276. log.Error("d.LogAudit.logIndexName(%v)(%v)", err, indexName)
  277. return
  278. }
  279. if indexName == "" {
  280. return
  281. }
  282. if res, err = d.searchResult(c, business.IndexCluster, indexName, query, sp.Bsp); err != nil {
  283. PromError(fmt.Sprintf("es:%s ", sp.Bsp.AppID), "%v", err)
  284. }
  285. return
  286. }
  287. // LogAuditGroupBy .
  288. func (d *Dao) LogAuditGroupBy(c context.Context, pr map[string][]interface{}, sp *model.LogParams, business *model.Business) (res *model.SearchResult, err error) {
  289. res = &model.SearchResult{
  290. Result: []json.RawMessage{},
  291. Page: &model.Page{},
  292. }
  293. var (
  294. indexName = ""
  295. searchResult *elastic.SearchResult
  296. )
  297. group := pr["group"][0].(string)
  298. if _, ok := d.esPool[business.IndexCluster]; !ok {
  299. PromError(fmt.Sprintf("es:集群不存在%s", "LogAuditGroupBy"), "s.dao.LogAuditGroupBy indexName:%s", "LogAuditGroupBy")
  300. return
  301. }
  302. query := d.getQuery(pr, business.Mapping)
  303. indexName, err = d.logIndexName(c, sp, business)
  304. if err != nil {
  305. log.Error("d.LogAuditGroupBy.logIndexName(%v)(%v)", err, indexName)
  306. return
  307. }
  308. if indexName == "" {
  309. return
  310. }
  311. collapse := elastic.NewCollapseBuilder(group).MaxConcurrentGroupRequests(1)
  312. searchResult, err = d.esPool[business.IndexCluster].Search().Index(indexName).Type("base").Query(query).
  313. Sort("ctime", false).Collapse(collapse).Size(1000).Do(c)
  314. if err != nil {
  315. log.Error("d.LogAuditGroupBy(%v)", err)
  316. return
  317. }
  318. for _, hit := range searchResult.Hits.Hits {
  319. var t json.RawMessage
  320. err = json.Unmarshal(*hit.Source, &t)
  321. if err != nil {
  322. log.Error("es:%s 返回不是json!!!", business.IndexCluster)
  323. return
  324. }
  325. res.Result = append(res.Result, t)
  326. }
  327. res.Page.Ps = sp.Bsp.Ps
  328. res.Page.Pn = sp.Bsp.Pn
  329. res.Page.Total = int64(len(res.Result))
  330. return
  331. }
  332. // LogAuditDelete .
  333. func (d *Dao) LogAuditDelete(c context.Context, pr map[string][]interface{}, sp *model.LogParams, business *model.Business) (res *model.SearchResult, err error) {
  334. var (
  335. indexName string
  336. searchResult *elastic.BulkIndexByScrollResponse
  337. )
  338. res = &model.SearchResult{
  339. Result: []json.RawMessage{},
  340. Page: &model.Page{},
  341. }
  342. query := d.getQuery(pr, business.Mapping)
  343. indexName, err = d.logIndexName(c, sp, business)
  344. if err != nil {
  345. log.Error("d.LogAuditDelete.logIndexName(%v)(%v)", err, indexName)
  346. return
  347. }
  348. if indexName == "" {
  349. return
  350. }
  351. searchResult, err = d.esPool[business.IndexCluster].DeleteByQuery().Index(indexName).Type("base").Query(query).Size(10000).Do(c)
  352. if err != nil {
  353. log.Error("d.LogAuditDelete.DeleteByQuery(%v)(%v)", err, indexName)
  354. return
  355. }
  356. res.Page.Total = searchResult.Total
  357. return
  358. }
  359. // LogUserAction .
  360. func (d *Dao) LogUserAction(c context.Context, pr map[string][]interface{}, sp *model.LogParams, business *model.Business) (res *model.SearchResult, err error) {
  361. var indexName string
  362. res = &model.SearchResult{
  363. Result: []json.RawMessage{},
  364. Page: &model.Page{},
  365. }
  366. query := d.getQuery(pr, business.Mapping)
  367. indexName, err = d.logIndexName(c, sp, business)
  368. if err != nil {
  369. log.Error("d.LogUserAction.logIndexName(%v)(%v)", err, indexName)
  370. return
  371. }
  372. if indexName == "" {
  373. return
  374. }
  375. if res, err = d.searchResult(c, business.IndexCluster, indexName, query, sp.Bsp); err != nil {
  376. PromError(fmt.Sprintf("es:%s ", sp.Bsp.AppID), "%v", err)
  377. }
  378. return
  379. }
  380. // LogUserActionDelete .
  381. func (d *Dao) LogUserActionDelete(c context.Context, pr map[string][]interface{}, sp *model.LogParams, business *model.Business) (res *model.SearchResult, err error) {
  382. var (
  383. indexName string
  384. searchResult *elastic.BulkIndexByScrollResponse
  385. )
  386. res = &model.SearchResult{
  387. Result: []json.RawMessage{},
  388. Page: &model.Page{},
  389. }
  390. query := d.getQuery(pr, business.Mapping)
  391. indexName, err = d.logIndexName(c, sp, business)
  392. if err != nil {
  393. log.Error("d.LogUserActionDelete.logIndexName(%v)(%v)", err, indexName)
  394. return
  395. }
  396. if indexName == "" {
  397. return
  398. }
  399. searchResult, err = d.esPool[business.IndexCluster].DeleteByQuery().Index(indexName).Type("base").Query(query).Size(10000).Do(c)
  400. if err != nil {
  401. log.Error("d.LogUserActionDelete.DeleteByQuery(%v)(%v)", err, indexName)
  402. return
  403. }
  404. res.Page.Total = searchResult.Total
  405. return
  406. }
  407. // UDepTs .
  408. func (d *Dao) UDepTs(c context.Context, uids []string) (res *model.UDepTsData, err error) {
  409. params := url.Values{}
  410. params.Set("uids", strings.Join(uids, ","))
  411. if err = d.client.Get(c, d.managerDep, "", params, &res); err != nil {
  412. err = errors.Wrapf(err, "d.httpSearch url(%s)", d.managerDep+"?"+params.Encode())
  413. log.Error("d.httpSearch url(%s)", d.managerDep+"?"+params.Encode())
  414. return
  415. }
  416. if res.Code != 0 {
  417. err = errors.Wrapf(err, "response url(%s) code(%d)", d.managerDep+"?"+params.Encode(), res.Code)
  418. log.Error("response url(%s) code(%d)", d.managerDep+"?"+params.Encode(), res.Code)
  419. return
  420. }
  421. return
  422. }
  423. // IP .
  424. func (d *Dao) IP(c context.Context, ip []string) (res *model.IPData, err error) {
  425. params := url.Values{}
  426. params.Set("ips", strings.Join(ip, ","))
  427. if err = d.client.Get(c, d.managerIP, "", params, &res); err != nil {
  428. err = errors.Wrapf(err, "d.httpSearch url(%s)", d.managerIP+"?"+params.Encode())
  429. log.Error("d.httpSearch url(%s)", d.managerIP+"?"+params.Encode())
  430. return
  431. }
  432. if res.Code != 0 {
  433. err = errors.Wrapf(err, "response url(%s) code(%d)", d.managerDep+"?"+params.Encode(), res.Code)
  434. log.Error("response url(%s) code(%d)", d.managerIP+"?"+params.Encode(), res.Code)
  435. return
  436. }
  437. return
  438. }
  439. // LogCount .
  440. func (d *Dao) LogCount(c context.Context, name string, business int, uid interface{}) {
  441. date := time.Now().Format("2006-01-02")
  442. if _, err := d.db.Exec(c, _count, name+"_access", date); err != nil {
  443. log.Error("d.db.Exec err(%v)", err)
  444. return
  445. }
  446. if _, err := d.db.Exec(c, _percent, name+"_uid", date, uid); err != nil {
  447. log.Error("d.db.Exec err(%v)", err)
  448. return
  449. }
  450. if _, err := d.db.Exec(c, _percent, name+"_business", date, business); err != nil {
  451. log.Error("d.db.Exec err(%v)", err)
  452. return
  453. }
  454. }