http_search.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476
  1. package dao
  2. import (
  3. "context"
  4. "encoding/binary"
  5. "encoding/json"
  6. "fmt"
  7. "net"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "go-common/app/admin/main/reply/conf"
  13. "go-common/app/admin/main/reply/model"
  14. "go-common/library/database/elastic"
  15. "go-common/library/log"
  16. )
  17. const (
  18. // api
  19. _apiSearch = "/api/reply/internal/search"
  20. _apiSearchUpdate = "/api/reply/internal/update"
  21. // index
  22. _searchIdxReply = "reply"
  23. _searchIdxReport = "replyreport"
  24. _searchIdxMonitor = "replymonitor"
  25. _searchIdxTimeFormat = "2006-01-02 15:03:04"
  26. )
  27. var zeroTime = time.Time{}
  28. func (d *Dao) SearchReplyV3(c context.Context, sp *model.SearchParams, page, pageSize int64) (res *model.SearchResult, err error) {
  29. var (
  30. end = sp.End
  31. begin = sp.Begin
  32. business = "reply_list"
  33. )
  34. if end == zeroTime {
  35. end = time.Now()
  36. }
  37. if begin == zeroTime {
  38. begin = end.Add(-time.Hour * 24 * 30)
  39. }
  40. r := d.es.NewRequest(business).IndexByTime("reply_list", elastic.IndexTypeWeek, begin, end).
  41. WhereEq("type", fmt.Sprint(sp.Type)).Order(sp.Order, sp.Sort).Pn(int(page)).Ps(int(pageSize)).
  42. WhereRange("ctime", begin.Format("2006-01-02 15:04:05"), end.Format("2006-01-02 15:04:05"), elastic.RangeScopeLcRc)
  43. if sp.Oid != 0 {
  44. r = r.WhereEq("oid", strconv.FormatInt(sp.Oid, 10))
  45. }
  46. if sp.TypeIds != "" {
  47. r = r.WhereIn("typeid", strings.Split(sp.TypeIds, ","))
  48. }
  49. if sp.Keyword != "" {
  50. r = r.WhereLike([]string{"message"}, []string{sp.Keyword}, true, elastic.LikeLevelLow)
  51. }
  52. if sp.KeywordHigh != "" {
  53. r = r.WhereLike([]string{"message_middle"}, []string{sp.KeywordHigh}, true, elastic.LikeLevelMiddle)
  54. r = r.OrderScoreFirst(false)
  55. }
  56. if sp.UID != 0 {
  57. r = r.WhereEq("mid", sp.UID)
  58. }
  59. if sp.Uname != "" {
  60. r = r.WhereEq("replier", sp.Uname)
  61. }
  62. if sp.AdminID != 0 {
  63. r = r.WhereEq("adminid", sp.AdminID)
  64. }
  65. if sp.States != "" {
  66. r = r.WhereIn("state", strings.Split(sp.States, ","))
  67. }
  68. if sp.IP != 0 {
  69. var ip = make([]byte, 4)
  70. binary.BigEndian.PutUint32(ip, uint32(sp.IP))
  71. r = r.WhereEq("ip", net.IPv4(ip[0], ip[1], ip[2], ip[3]).String())
  72. }
  73. if sp.Attr != "" {
  74. r = r.WhereIn("attr", strings.Split(sp.Attr, ","))
  75. }
  76. if sp.AdminName != "" {
  77. r = r.WhereEq("admin_name", sp.AdminName)
  78. }
  79. result := new(struct {
  80. Code int
  81. Page *model.Page
  82. Order string
  83. Sort string
  84. Result []*model.SearchReply
  85. Message string
  86. })
  87. log.Warn("search params: %s", r.Params())
  88. err = r.Scan(c, &result)
  89. if err != nil || result.Code != 0 {
  90. log.Error("SearchReplyV3 r.Scan(%v) error:(%v)", c, err)
  91. return
  92. }
  93. res = new(model.SearchResult)
  94. res.Result = result.Result
  95. res.Code = result.Code
  96. res.Page = result.Page.Num
  97. res.PageSize = result.Page.Size
  98. if res.PageSize > 0 {
  99. res.PageCount = result.Page.Total / result.Page.Size
  100. }
  101. res.Total = result.Page.Total
  102. res.Order = result.Order
  103. res.Message = result.Message
  104. return
  105. }
  106. // SearchAdminLog search adminlog
  107. func (d *Dao) SearchAdminLog(c context.Context, rpids []int64) (res []*model.SearchAdminLog, err error) {
  108. if len(rpids) == 0 {
  109. return
  110. }
  111. r := d.es.NewRequest("reply_admin_log").Index("replyadminlog").Pn(int(1)).Ps(len(rpids)).WhereIn("rpid", rpids)
  112. result := new(struct {
  113. Code int
  114. Page *model.Page
  115. Order string
  116. Sort string
  117. Result []*model.SearchAdminLog
  118. Message string
  119. })
  120. log.Warn("search params: %s", r.Params())
  121. err = r.Scan(c, &result)
  122. if err != nil || result.Code != 0 {
  123. log.Error("SearchAdminLog r.Scan(%v) error:(%v)", c, err)
  124. return
  125. }
  126. res = result.Result
  127. return
  128. }
  129. // SearchReply search reply from ES.
  130. func (d *Dao) SearchReply(c context.Context, p *model.SearchParams, page, pageSize int64) (res *model.SearchResult, err error) {
  131. params := url.Values{}
  132. params.Set("appid", _searchIdxReply)
  133. params.Set("type", fmt.Sprint(p.Type))
  134. params.Set("sort", p.Sort)
  135. params.Set("order", p.Order)
  136. params.Set("page", fmt.Sprint(page))
  137. params.Set("pagesize", fmt.Sprint(pageSize))
  138. if p.Oid != 0 {
  139. params.Set("oid", strconv.FormatInt(p.Oid, 10))
  140. }
  141. if p.TypeIds != "" {
  142. params.Set("typeids", p.TypeIds)
  143. }
  144. if p.Keyword != "" {
  145. params.Set("keyword", p.Keyword)
  146. }
  147. if p.UID != 0 {
  148. params.Set("uid", strconv.FormatInt(p.UID, 10))
  149. }
  150. if p.Uname != "" {
  151. params.Set("nickname", p.Uname)
  152. }
  153. if p.AdminID != 0 {
  154. params.Set("adminid", strconv.FormatInt(p.AdminID, 10))
  155. }
  156. if p.Begin != zeroTime {
  157. params.Set("start_time", p.Begin.Format(model.DateFormat))
  158. }
  159. if p.End != zeroTime {
  160. params.Set("end_time", p.End.Format(model.DateFormat))
  161. }
  162. if p.States != "" {
  163. params.Set("states", p.States)
  164. }
  165. if p.IP != 0 {
  166. params.Set("ip", strconv.FormatInt(p.IP, 10))
  167. }
  168. if p.Attr != "" {
  169. params.Set("attr", p.Attr)
  170. }
  171. if p.AdminName != "" {
  172. params.Set("admin_name", p.AdminName)
  173. }
  174. res = &model.SearchResult{}
  175. uri := conf.Conf.Host.Search + _apiSearch
  176. if err = d.httpClient.Get(c, uri, "", params, res); err != nil {
  177. log.Error("searchReply error(%v)", err)
  178. return
  179. }
  180. if res.Code != 0 {
  181. err = model.ErrSearchReply
  182. log.Error("searchReply:%+v error(%v)", res, err)
  183. }
  184. return
  185. }
  186. // SearchMonitor return search monitor reply from ES.
  187. func (d *Dao) SearchMonitor(c context.Context, sp *model.SearchMonitorParams, page, pageSize int64) (res *model.SearchMonitorResult, err error) {
  188. var (
  189. fields []string
  190. keywords []string
  191. order string
  192. sort = "desc"
  193. )
  194. // NOTE:这里之前order 跟 sort 搞反了
  195. if sp.Sort != "" {
  196. order = sp.Sort
  197. }
  198. if sp.Order != "" {
  199. sort = sp.Order
  200. }
  201. r := d.es.NewRequest("reply_monitor").Index(_searchIdxMonitor).
  202. WhereEq("type", fmt.Sprint(sp.Type)).
  203. Order(order, sort).Pn(int(page)).Ps(int(pageSize))
  204. // mode=0 所有监控方式, mode=1 monitor, mode=2, 先审后发
  205. if sp.Mode == 0 {
  206. r = r.WhereOr("monitor", true).WhereOr("audit", true)
  207. } else if sp.Mode == 1 {
  208. r = r.WhereEq("monitor", true)
  209. } else if sp.Mode == 2 {
  210. r = r.WhereEq("audit", true)
  211. }
  212. if sp.Oid > 0 {
  213. r = r.WhereEq("oid", fmt.Sprint(sp.Oid))
  214. }
  215. if sp.UID > 0 {
  216. r = r.WhereEq("mid", fmt.Sprint(sp.UID))
  217. }
  218. if sp.NickName != "" {
  219. fields = append(fields, "uname")
  220. keywords = append(keywords, sp.NickName)
  221. }
  222. if sp.Keyword != "" {
  223. fields = append(fields, "title")
  224. keywords = append(keywords, sp.Keyword)
  225. }
  226. if fields != nil && keywords != nil {
  227. r = r.WhereLike(fields, keywords, true, elastic.LikeLevelLow)
  228. }
  229. result := new(struct {
  230. Code int
  231. Page *model.Page
  232. Order string
  233. Sort string
  234. Result []*model.SearchMonitor
  235. Message string
  236. })
  237. res = &model.SearchMonitorResult{}
  238. log.Warn(r.Params())
  239. err = r.Scan(c, &result)
  240. if err != nil || result.Code != 0 {
  241. log.Error("r.Scan(%v) error:(%v)", c, err)
  242. return
  243. }
  244. res.Result = result.Result
  245. res.Code = result.Code
  246. res.Page = result.Page.Num
  247. res.PageSize = result.Page.Size
  248. if res.PageSize > 0 {
  249. res.PageCount = result.Page.Total / result.Page.Size
  250. }
  251. res.Total = result.Page.Total
  252. res.Order = result.Order
  253. res.Message = result.Message
  254. oids := make([]int64, len(res.Result))
  255. var tp int32
  256. for idx, r := range res.Result {
  257. oids[idx] = r.Oid
  258. tp = int32(r.Type)
  259. }
  260. results, err := d.SubMCount(c, oids, tp)
  261. if err != nil {
  262. log.Error("SubMCount(%v,%v) error", oids, tp)
  263. return
  264. }
  265. for i, reply := range res.Result {
  266. res.Result[i].MCount = results[reply.Oid]
  267. res.Result[i].OidStr = strconv.FormatInt(res.Result[i].Oid, 10)
  268. }
  269. return
  270. }
  271. // UpSearchMonitor update monitor to search data.
  272. func (d *Dao) UpSearchMonitor(c context.Context, sub *model.Subject, remark string) (err error) {
  273. m := make(map[string]interface{})
  274. m["oid"] = sub.Oid
  275. m["type"] = sub.Type
  276. if sub.AttrVal(model.SubAttrMonitor) == model.AttrYes {
  277. m["monitor"] = true
  278. } else {
  279. m["monitor"] = false
  280. }
  281. if sub.AttrVal(model.SubAttrAudit) == model.AttrYes {
  282. m["audit"] = true
  283. } else {
  284. m["audit"] = false
  285. }
  286. m["remark"] = remark
  287. us := d.es.NewUpdate("reply_monitor").Insert()
  288. us.AddData("replymonitor", m)
  289. err = us.Do(c)
  290. if err != nil {
  291. err = model.ErrSearchReport
  292. log.Error("upSearchMonitor error(%v)", err)
  293. return
  294. }
  295. return
  296. }
  297. // SearchReport search reports from ES.
  298. func (d *Dao) SearchReport(c context.Context, sp *model.SearchReportParams, page, pageSize int64) (res *model.SearchReportResult, err error) {
  299. params := url.Values{}
  300. params.Set("appid", "replyreport")
  301. params.Set("type", fmt.Sprint(sp.Type))
  302. params.Set("page", fmt.Sprint(page))
  303. params.Set("pagesize", fmt.Sprint(pageSize))
  304. if sp.Oid != 0 {
  305. params.Set("oid", fmt.Sprint(sp.Oid))
  306. }
  307. if sp.UID != 0 {
  308. params.Set("uid", fmt.Sprint(sp.UID))
  309. }
  310. if sp.Reason != "" {
  311. params.Set("reason", sp.Reason)
  312. }
  313. if sp.Typeids != "" {
  314. params.Set("typeids", sp.Typeids)
  315. }
  316. if sp.Keyword != "" {
  317. params.Set("keyword", sp.Keyword)
  318. }
  319. if sp.Nickname != "" {
  320. params.Set("nickname", sp.Nickname)
  321. }
  322. if sp.States != "" {
  323. params.Set("states", sp.States)
  324. }
  325. if sp.StartTime != "" {
  326. params.Set("start_time", sp.StartTime)
  327. }
  328. if sp.EndTime != "" {
  329. params.Set("end_time", sp.EndTime)
  330. }
  331. if sp.Order != "" {
  332. params.Set("order", sp.Order)
  333. }
  334. if sp.Sort != "" {
  335. params.Set("sort", sp.Sort)
  336. }
  337. res = &model.SearchReportResult{}
  338. uri := conf.Conf.Host.Search + _apiSearch
  339. if err = d.httpClient.Get(c, uri, "", params, res); err != nil {
  340. log.Error("searchReport error(%v)", err)
  341. return
  342. }
  343. if res.Code != 0 {
  344. err = model.ErrSearchReport
  345. log.Error("searchReport:%+v error(%v)", res, err)
  346. }
  347. return
  348. }
  349. // MonitorStats return search monitor stats from ES.
  350. func (d *Dao) MonitorStats(c context.Context, mode, page, pageSize int64, adminIDs, sort, order, startTime, endTime string) (res *model.StatsMonitorResult, err error) {
  351. params := url.Values{}
  352. params.Set("appid", "replymonista")
  353. params.Set("mode", fmt.Sprint(mode))
  354. params.Set("page", fmt.Sprint(page))
  355. params.Set("pagesize", fmt.Sprint(pageSize))
  356. if adminIDs != "" {
  357. params.Set("adminids", adminIDs)
  358. params.Set("typeid", fmt.Sprint(model.MonitorStatsUser))
  359. } else {
  360. params.Set("typeid", fmt.Sprint(model.MonitorStatsAll))
  361. }
  362. if sort != "" {
  363. params.Set("sort", sort)
  364. }
  365. if order != "" {
  366. params.Set("order", order)
  367. }
  368. if startTime != "" {
  369. params.Set("start_time", startTime)
  370. }
  371. if endTime != "" {
  372. params.Set("end_time", endTime)
  373. }
  374. res = &model.StatsMonitorResult{}
  375. uri := conf.Conf.Host.Search + _apiSearch
  376. if err = d.httpClient.Get(c, uri, "", params, res); err != nil {
  377. log.Error("monitorStats error(%v)", err)
  378. return
  379. }
  380. if res.Code != 0 {
  381. err = model.ErrSearchMonitor
  382. log.Error("searchStats:%+v error(%v)", res, err)
  383. }
  384. return
  385. }
  386. // UpSearchReply update search reply index.
  387. func (d *Dao) UpSearchReply(c context.Context, rps map[int64]*model.Reply, newState int32) (err error) {
  388. if len(rps) <= 0 {
  389. return
  390. }
  391. stales := d.es.NewUpdate("reply_list")
  392. for _, rp := range rps {
  393. m := make(map[string]interface{})
  394. m["id"] = rp.ID
  395. m["state"] = newState
  396. m["mtime"] = rp.MTime.Time().Format("2006-01-02 15:04:05")
  397. m["oid"] = rp.Oid
  398. m["type"] = rp.Type
  399. if rp.Content != nil {
  400. m["message"] = rp.Content.Message
  401. }
  402. stales = stales.AddData(d.es.NewUpdate("reply_list").IndexByTime("reply_list", elastic.IndexTypeWeek, rp.CTime.Time()), m)
  403. }
  404. err = stales.Do(c)
  405. if err != nil {
  406. log.Error("upSearchReply update stales(%s) failed!err:=%v", stales.Params(), err)
  407. return
  408. }
  409. log.Info("upSearchReply:stale:%s ret:%+v", stales.Params(), err)
  410. return
  411. }
  412. // UpSearchReport update search report index.
  413. func (d *Dao) UpSearchReport(c context.Context, rpts map[int64]*model.Report, rpState *int32) (err error) {
  414. var res struct {
  415. Code int `json:"code"`
  416. Msg string `json:"msg"`
  417. }
  418. params := url.Values{}
  419. params.Set("appid", _searchIdxReport)
  420. values := make([]map[string]interface{}, 0)
  421. rps := make(map[int64]*model.Reply)
  422. for _, rpt := range rpts {
  423. if int64(rpt.ReplyCtime) != 0 && rpState != nil {
  424. rps[rpt.RpID] = &model.Reply{
  425. ID: rpt.RpID,
  426. Oid: rpt.Oid,
  427. Type: rpt.Type,
  428. MTime: rpt.MTime,
  429. CTime: rpt.ReplyCtime,
  430. State: *rpState,
  431. }
  432. }
  433. v := make(map[string]interface{})
  434. v["id"] = fmt.Sprintf("%d_%d_%d", rpt.RpID, rpt.Oid, rpt.Type)
  435. v["content"] = rpt.Content
  436. v["reason"] = rpt.Reason
  437. v["state"] = rpt.State
  438. v["mtime"] = rpt.MTime.Time().Format(_searchIdxTimeFormat)
  439. v["index_time"] = rpt.CTime.Time().Format(_searchIdxTimeFormat)
  440. if rpt.Attr == 1 {
  441. v["attr"] = []int{1}
  442. } else {
  443. v["attr"] = []int{}
  444. }
  445. if rpState != nil {
  446. v["reply_state"] = *rpState
  447. }
  448. values = append(values, v)
  449. }
  450. b, _ := json.Marshal(values)
  451. params.Set("val", string(b))
  452. // http post
  453. uri := conf.Conf.Host.Search + _apiSearchUpdate
  454. if err = d.httpClient.Post(c, uri, "", params, &res); err != nil {
  455. log.Error("upSearchReport error(%v)", err)
  456. }
  457. log.Info("upSearchReport:%s post:%s ret:%+v", uri, params.Encode(), res)
  458. if len(rps) != 0 && rpState != nil {
  459. err = d.UpSearchReply(c, rps, *rpState)
  460. }
  461. return
  462. }