|
- package dao
- import (
- "context"
- "encoding/binary"
- "encoding/json"
- "fmt"
- "net"
- "net/url"
- "strconv"
- "strings"
- "time"
- "go-common/app/admin/main/reply/conf"
- "go-common/app/admin/main/reply/model"
- "go-common/library/database/elastic"
- "go-common/library/log"
- )
- const (
- // api
- _apiSearch = "/api/reply/internal/search"
- _apiSearchUpdate = "/api/reply/internal/update"
- // index
- _searchIdxReply = "reply"
- _searchIdxReport = "replyreport"
- _searchIdxMonitor = "replymonitor"
- _searchIdxTimeFormat = "2006-01-02 15:03:04"
- )
- var zeroTime = time.Time{}
- func (d *Dao) SearchReplyV3(c context.Context, sp *model.SearchParams, page, pageSize int64) (res *model.SearchResult, err error) {
- var (
- end = sp.End
- begin = sp.Begin
- business = "reply_list"
- )
- if end == zeroTime {
- end = time.Now()
- }
- if begin == zeroTime {
- begin = end.Add(-time.Hour * 24 * 30)
- }
- r := d.es.NewRequest(business).IndexByTime("reply_list", elastic.IndexTypeWeek, begin, end).
- WhereEq("type", fmt.Sprint(sp.Type)).Order(sp.Order, sp.Sort).Pn(int(page)).Ps(int(pageSize)).
- WhereRange("ctime", begin.Format("2006-01-02 15:04:05"), end.Format("2006-01-02 15:04:05"), elastic.RangeScopeLcRc)
- if sp.Oid != 0 {
- r = r.WhereEq("oid", strconv.FormatInt(sp.Oid, 10))
- }
- if sp.TypeIds != "" {
- r = r.WhereIn("typeid", strings.Split(sp.TypeIds, ","))
- }
- if sp.Keyword != "" {
- r = r.WhereLike([]string{"message"}, []string{sp.Keyword}, true, elastic.LikeLevelLow)
- }
- if sp.KeywordHigh != "" {
- r = r.WhereLike([]string{"message_middle"}, []string{sp.KeywordHigh}, true, elastic.LikeLevelMiddle)
- r = r.OrderScoreFirst(false)
- }
- if sp.UID != 0 {
- r = r.WhereEq("mid", sp.UID)
- }
- if sp.Uname != "" {
- r = r.WhereEq("replier", sp.Uname)
- }
- if sp.AdminID != 0 {
- r = r.WhereEq("adminid", sp.AdminID)
- }
- if sp.States != "" {
- r = r.WhereIn("state", strings.Split(sp.States, ","))
- }
- if sp.IP != 0 {
- var ip = make([]byte, 4)
- binary.BigEndian.PutUint32(ip, uint32(sp.IP))
- r = r.WhereEq("ip", net.IPv4(ip[0], ip[1], ip[2], ip[3]).String())
- }
- if sp.Attr != "" {
- r = r.WhereIn("attr", strings.Split(sp.Attr, ","))
- }
- if sp.AdminName != "" {
- r = r.WhereEq("admin_name", sp.AdminName)
- }
- result := new(struct {
- Code int
- Page *model.Page
- Order string
- Sort string
- Result []*model.SearchReply
- Message string
- })
- log.Warn("search params: %s", r.Params())
- err = r.Scan(c, &result)
- if err != nil || result.Code != 0 {
- log.Error("SearchReplyV3 r.Scan(%v) error:(%v)", c, err)
- return
- }
- res = new(model.SearchResult)
- res.Result = result.Result
- res.Code = result.Code
- res.Page = result.Page.Num
- res.PageSize = result.Page.Size
- if res.PageSize > 0 {
- res.PageCount = result.Page.Total / result.Page.Size
- }
- res.Total = result.Page.Total
- res.Order = result.Order
- res.Message = result.Message
- return
- }
- // SearchAdminLog search adminlog
- func (d *Dao) SearchAdminLog(c context.Context, rpids []int64) (res []*model.SearchAdminLog, err error) {
- if len(rpids) == 0 {
- return
- }
- r := d.es.NewRequest("reply_admin_log").Index("replyadminlog").Pn(int(1)).Ps(len(rpids)).WhereIn("rpid", rpids)
- result := new(struct {
- Code int
- Page *model.Page
- Order string
- Sort string
- Result []*model.SearchAdminLog
- Message string
- })
- log.Warn("search params: %s", r.Params())
- err = r.Scan(c, &result)
- if err != nil || result.Code != 0 {
- log.Error("SearchAdminLog r.Scan(%v) error:(%v)", c, err)
- return
- }
- res = result.Result
- return
- }
- // SearchReply search reply from ES.
- func (d *Dao) SearchReply(c context.Context, p *model.SearchParams, page, pageSize int64) (res *model.SearchResult, err error) {
- params := url.Values{}
- params.Set("appid", _searchIdxReply)
- params.Set("type", fmt.Sprint(p.Type))
- params.Set("sort", p.Sort)
- params.Set("order", p.Order)
- params.Set("page", fmt.Sprint(page))
- params.Set("pagesize", fmt.Sprint(pageSize))
- if p.Oid != 0 {
- params.Set("oid", strconv.FormatInt(p.Oid, 10))
- }
- if p.TypeIds != "" {
- params.Set("typeids", p.TypeIds)
- }
- if p.Keyword != "" {
- params.Set("keyword", p.Keyword)
- }
- if p.UID != 0 {
- params.Set("uid", strconv.FormatInt(p.UID, 10))
- }
- if p.Uname != "" {
- params.Set("nickname", p.Uname)
- }
- if p.AdminID != 0 {
- params.Set("adminid", strconv.FormatInt(p.AdminID, 10))
- }
- if p.Begin != zeroTime {
- params.Set("start_time", p.Begin.Format(model.DateFormat))
- }
- if p.End != zeroTime {
- params.Set("end_time", p.End.Format(model.DateFormat))
- }
- if p.States != "" {
- params.Set("states", p.States)
- }
- if p.IP != 0 {
- params.Set("ip", strconv.FormatInt(p.IP, 10))
- }
- if p.Attr != "" {
- params.Set("attr", p.Attr)
- }
- if p.AdminName != "" {
- params.Set("admin_name", p.AdminName)
- }
- res = &model.SearchResult{}
- uri := conf.Conf.Host.Search + _apiSearch
- if err = d.httpClient.Get(c, uri, "", params, res); err != nil {
- log.Error("searchReply error(%v)", err)
- return
- }
- if res.Code != 0 {
- err = model.ErrSearchReply
- log.Error("searchReply:%+v error(%v)", res, err)
- }
- return
- }
- // SearchMonitor return search monitor reply from ES.
- func (d *Dao) SearchMonitor(c context.Context, sp *model.SearchMonitorParams, page, pageSize int64) (res *model.SearchMonitorResult, err error) {
- var (
- fields []string
- keywords []string
- order string
- sort = "desc"
- )
- // NOTE:这里之前order 跟 sort 搞反了
- if sp.Sort != "" {
- order = sp.Sort
- }
- if sp.Order != "" {
- sort = sp.Order
- }
- r := d.es.NewRequest("reply_monitor").Index(_searchIdxMonitor).
- WhereEq("type", fmt.Sprint(sp.Type)).
- Order(order, sort).Pn(int(page)).Ps(int(pageSize))
- // mode=0 所有监控方式, mode=1 monitor, mode=2, 先审后发
- if sp.Mode == 0 {
- r = r.WhereOr("monitor", true).WhereOr("audit", true)
- } else if sp.Mode == 1 {
- r = r.WhereEq("monitor", true)
- } else if sp.Mode == 2 {
- r = r.WhereEq("audit", true)
- }
- if sp.Oid > 0 {
- r = r.WhereEq("oid", fmt.Sprint(sp.Oid))
- }
- if sp.UID > 0 {
- r = r.WhereEq("mid", fmt.Sprint(sp.UID))
- }
- if sp.NickName != "" {
- fields = append(fields, "uname")
- keywords = append(keywords, sp.NickName)
- }
- if sp.Keyword != "" {
- fields = append(fields, "title")
- keywords = append(keywords, sp.Keyword)
- }
- if fields != nil && keywords != nil {
- r = r.WhereLike(fields, keywords, true, elastic.LikeLevelLow)
- }
- result := new(struct {
- Code int
- Page *model.Page
- Order string
- Sort string
- Result []*model.SearchMonitor
- Message string
- })
- res = &model.SearchMonitorResult{}
- log.Warn(r.Params())
- err = r.Scan(c, &result)
- if err != nil || result.Code != 0 {
- log.Error("r.Scan(%v) error:(%v)", c, err)
- return
- }
- res.Result = result.Result
- res.Code = result.Code
- res.Page = result.Page.Num
- res.PageSize = result.Page.Size
- if res.PageSize > 0 {
- res.PageCount = result.Page.Total / result.Page.Size
- }
- res.Total = result.Page.Total
- res.Order = result.Order
- res.Message = result.Message
- oids := make([]int64, len(res.Result))
- var tp int32
- for idx, r := range res.Result {
- oids[idx] = r.Oid
- tp = int32(r.Type)
- }
- results, err := d.SubMCount(c, oids, tp)
- if err != nil {
- log.Error("SubMCount(%v,%v) error", oids, tp)
- return
- }
- for i, reply := range res.Result {
- res.Result[i].MCount = results[reply.Oid]
- res.Result[i].OidStr = strconv.FormatInt(res.Result[i].Oid, 10)
- }
- return
- }
- // UpSearchMonitor update monitor to search data.
- func (d *Dao) UpSearchMonitor(c context.Context, sub *model.Subject, remark string) (err error) {
- m := make(map[string]interface{})
- m["oid"] = sub.Oid
- m["type"] = sub.Type
- if sub.AttrVal(model.SubAttrMonitor) == model.AttrYes {
- m["monitor"] = true
- } else {
- m["monitor"] = false
- }
- if sub.AttrVal(model.SubAttrAudit) == model.AttrYes {
- m["audit"] = true
- } else {
- m["audit"] = false
- }
- m["remark"] = remark
- us := d.es.NewUpdate("reply_monitor").Insert()
- us.AddData("replymonitor", m)
- err = us.Do(c)
- if err != nil {
- err = model.ErrSearchReport
- log.Error("upSearchMonitor error(%v)", err)
- return
- }
- return
- }
- // SearchReport search reports from ES.
- func (d *Dao) SearchReport(c context.Context, sp *model.SearchReportParams, page, pageSize int64) (res *model.SearchReportResult, err error) {
- params := url.Values{}
- params.Set("appid", "replyreport")
- params.Set("type", fmt.Sprint(sp.Type))
- params.Set("page", fmt.Sprint(page))
- params.Set("pagesize", fmt.Sprint(pageSize))
- if sp.Oid != 0 {
- params.Set("oid", fmt.Sprint(sp.Oid))
- }
- if sp.UID != 0 {
- params.Set("uid", fmt.Sprint(sp.UID))
- }
- if sp.Reason != "" {
- params.Set("reason", sp.Reason)
- }
- if sp.Typeids != "" {
- params.Set("typeids", sp.Typeids)
- }
- if sp.Keyword != "" {
- params.Set("keyword", sp.Keyword)
- }
- if sp.Nickname != "" {
- params.Set("nickname", sp.Nickname)
- }
- if sp.States != "" {
- params.Set("states", sp.States)
- }
- if sp.StartTime != "" {
- params.Set("start_time", sp.StartTime)
- }
- if sp.EndTime != "" {
- params.Set("end_time", sp.EndTime)
- }
- if sp.Order != "" {
- params.Set("order", sp.Order)
- }
- if sp.Sort != "" {
- params.Set("sort", sp.Sort)
- }
- res = &model.SearchReportResult{}
- uri := conf.Conf.Host.Search + _apiSearch
- if err = d.httpClient.Get(c, uri, "", params, res); err != nil {
- log.Error("searchReport error(%v)", err)
- return
- }
- if res.Code != 0 {
- err = model.ErrSearchReport
- log.Error("searchReport:%+v error(%v)", res, err)
- }
- return
- }
- // MonitorStats return search monitor stats from ES.
- func (d *Dao) MonitorStats(c context.Context, mode, page, pageSize int64, adminIDs, sort, order, startTime, endTime string) (res *model.StatsMonitorResult, err error) {
- params := url.Values{}
- params.Set("appid", "replymonista")
- params.Set("mode", fmt.Sprint(mode))
- params.Set("page", fmt.Sprint(page))
- params.Set("pagesize", fmt.Sprint(pageSize))
- if adminIDs != "" {
- params.Set("adminids", adminIDs)
- params.Set("typeid", fmt.Sprint(model.MonitorStatsUser))
- } else {
- params.Set("typeid", fmt.Sprint(model.MonitorStatsAll))
- }
- if sort != "" {
- params.Set("sort", sort)
- }
- if order != "" {
- params.Set("order", order)
- }
- if startTime != "" {
- params.Set("start_time", startTime)
- }
- if endTime != "" {
- params.Set("end_time", endTime)
- }
- res = &model.StatsMonitorResult{}
- uri := conf.Conf.Host.Search + _apiSearch
- if err = d.httpClient.Get(c, uri, "", params, res); err != nil {
- log.Error("monitorStats error(%v)", err)
- return
- }
- if res.Code != 0 {
- err = model.ErrSearchMonitor
- log.Error("searchStats:%+v error(%v)", res, err)
- }
- return
- }
- // UpSearchReply update search reply index.
- func (d *Dao) UpSearchReply(c context.Context, rps map[int64]*model.Reply, newState int32) (err error) {
- if len(rps) <= 0 {
- return
- }
- stales := d.es.NewUpdate("reply_list")
- for _, rp := range rps {
- m := make(map[string]interface{})
- m["id"] = rp.ID
- m["state"] = newState
- m["mtime"] = rp.MTime.Time().Format("2006-01-02 15:04:05")
- m["oid"] = rp.Oid
- m["type"] = rp.Type
- if rp.Content != nil {
- m["message"] = rp.Content.Message
- }
- stales = stales.AddData(d.es.NewUpdate("reply_list").IndexByTime("reply_list", elastic.IndexTypeWeek, rp.CTime.Time()), m)
- }
- err = stales.Do(c)
- if err != nil {
- log.Error("upSearchReply update stales(%s) failed!err:=%v", stales.Params(), err)
- return
- }
- log.Info("upSearchReply:stale:%s ret:%+v", stales.Params(), err)
- return
- }
- // UpSearchReport update search report index.
- func (d *Dao) UpSearchReport(c context.Context, rpts map[int64]*model.Report, rpState *int32) (err error) {
- var res struct {
- Code int `json:"code"`
- Msg string `json:"msg"`
- }
- params := url.Values{}
- params.Set("appid", _searchIdxReport)
- values := make([]map[string]interface{}, 0)
- rps := make(map[int64]*model.Reply)
- for _, rpt := range rpts {
- if int64(rpt.ReplyCtime) != 0 && rpState != nil {
- rps[rpt.RpID] = &model.Reply{
- ID: rpt.RpID,
- Oid: rpt.Oid,
- Type: rpt.Type,
- MTime: rpt.MTime,
- CTime: rpt.ReplyCtime,
- State: *rpState,
- }
- }
- v := make(map[string]interface{})
- v["id"] = fmt.Sprintf("%d_%d_%d", rpt.RpID, rpt.Oid, rpt.Type)
- v["content"] = rpt.Content
- v["reason"] = rpt.Reason
- v["state"] = rpt.State
- v["mtime"] = rpt.MTime.Time().Format(_searchIdxTimeFormat)
- v["index_time"] = rpt.CTime.Time().Format(_searchIdxTimeFormat)
- if rpt.Attr == 1 {
- v["attr"] = []int{1}
- } else {
- v["attr"] = []int{}
- }
- if rpState != nil {
- v["reply_state"] = *rpState
- }
- values = append(values, v)
- }
- b, _ := json.Marshal(values)
- params.Set("val", string(b))
- // http post
- uri := conf.Conf.Host.Search + _apiSearchUpdate
- if err = d.httpClient.Post(c, uri, "", params, &res); err != nil {
- log.Error("upSearchReport error(%v)", err)
- }
- log.Info("upSearchReport:%s post:%s ret:%+v", uri, params.Encode(), res)
- if len(rps) != 0 && rpState != nil {
- err = d.UpSearchReply(c, rps, *rpState)
- }
- return
- }
|