123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760 |
- package service
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "go-common/library/database/elastic"
- "net/url"
- "regexp"
- "strings"
- "time"
- "go-common/app/job/main/reply/conf"
- "go-common/app/job/main/reply/model/reply"
- model "go-common/app/job/main/reply/model/reply"
- accmdl "go-common/app/service/main/account/api"
- assmdl "go-common/app/service/main/assist/model/assist"
- relmdl "go-common/app/service/main/relation/model"
- xsql "go-common/library/database/sql"
- "go-common/library/log"
- xhttp "go-common/library/net/http/blademaster"
- )
- var (
- _atReg = regexp.MustCompile(`@([^\s^:^,^@]+)`)
- _topicReg = regexp.MustCompile(`#([^\n^@^#^\x{1F000}-\x{1F02F}^\x{1F0A0}-\x{1F0FF}^\x{1F100}-\x{1F64F}^\x{1F680}-\x{1F6FF}^\x{1F910}-\x{1F96B}^\x{1F980}-\x{1F9E0}]{1,32})#`)
- _urlReg = regexp.MustCompile(`(((http:\/\/|https:\/\/)[a-z0-9A-Z]+\.(bilibili|biligame)\.com[a-z0-9A-Z\/\.\$\*\?~=#!%@&-]*)|((http:\/\/|https:\/\/)(acg|b23)\.tv[a-z0-9A-Z\/\.\$\*\?~=#!@&]*))`)
- _avReg = regexp.MustCompile(`#(cv\d+)|#(av\d+)|#(vc\d+)`)
- searchHTTPClient *xhttp.Client
- errReplyContentNotFound = errors.New("reply content not found")
- )
- const (
- _appIDReply = "reply"
- _appIDReport = "replyreport"
- timeFormat = "2006-01-02 15:03:04"
- // event
- _eventReply = "reply"
- _eventHate = "hate"
- _eventLike = "like"
- _eventLikeCancel = "like_cancel"
- _eventHateCancel = "hate_cancel"
- )
- func (s *Service) beginTran(c context.Context) (*xsql.Tx, error) {
- return s.dao.BeginTran(c)
- }
- func (s *Service) actionAdd(c context.Context, msg *consumerMsg) {
- var rp *model.Reply
- if err := json.Unmarshal([]byte(msg.Data), &rp); err != nil {
- log.Error("json.Unmarshal() error(%v)", err)
- return
- }
- if rp.RpID == 0 || rp.Oid == 0 || rp.Content == nil {
- log.Error("The structure of reply(%s) from rpCh was wrong", msg.Data)
- return
- }
- if rp.Root == 0 && rp.Parent == 0 {
- s.addReply(c, rp)
- } else {
- s.addReplyReply(c, rp)
- }
- }
- func (s *Service) tranAdd(c context.Context, rp *model.Reply, is bool) (err error) {
- tx, err := s.beginTran(c)
- if err != nil {
- log.Error("reply(%s) beginTran error(%v)", rp, err)
- return
- }
- var rows int64
- defer func() {
- if err == nil && rows == 0 {
- err = errors.New("sql: transaction add reply failed")
- }
- }()
- if is {
- if rp.IsNormal() {
- rows, err = s.dao.Subject.TxIncrCount(tx, rp.Oid, rp.Type, rp.CTime.Time())
- if err != nil || rows == 0 {
- tx.Rollback()
- log.Error("dao.Subject.TxIncrCount(%v) error(%v) or rows==0", rp, err)
- return
- }
- } else {
- rows, err = s.dao.Subject.TxIncrFCount(tx, rp.Oid, rp.Type, rp.CTime.Time())
- if err != nil || rows == 0 {
- tx.Rollback()
- log.Error("dao.Subject.TxIncrCount(%v) error(%v) or rows==0", rp, err)
- return
- }
- }
- } else {
- var rootReply *model.Reply
- if rootReply, err = s.dao.Reply.GetForUpdate(tx, rp.Oid, rp.Root); err != nil {
- tx.Rollback()
- return err
- }
- if rootReply.IsDeleted() {
- return fmt.Errorf("the root reply is deleted(%d,%d,%d)", rp.Oid, rp.Type, rp.Root)
- }
- if rp.IsNormal() {
- rows, err = s.dao.Reply.TxIncrCount(tx, rp.Oid, rp.Root, rp.CTime.Time())
- if err != nil || rows == 0 {
- tx.Rollback()
- log.Error("dao.Reply.TxIncrCount(%v) error(%v) or rows==0", rp, err)
- return
- }
- rows, err = s.dao.Subject.TxIncrACount(tx, rp.Oid, rp.Type, 1, rp.CTime.Time())
- if err != nil || rows == 0 {
- tx.Rollback()
- log.Error("dao.Subject.TxIncrACount(%v) error(%v) or rows==0", rp, err)
- return
- }
- } else {
- rows, err = s.dao.Reply.TxIncrFCount(tx, rp.Oid, rp.Root, rp.CTime.Time())
- if err != nil || rows == 0 {
- tx.Rollback()
- log.Error("dao.Reply.TxIncrCount(%v) error(%v) or rows==0", rp, err)
- return
- }
- }
- }
- if rp.State == model.ReplyStateAudit || rp.State == model.ReplyStateMonitor {
- if rows, err = s.dao.Subject.TxIncrMCount(tx, rp.Oid, rp.Type, rp.CTime.Time()); err != nil || rows == 0 {
- tx.Rollback()
- log.Error("dao.Subject.TxIncrMCount(%v) error(%v) or rows==0", rp, err)
- return
- }
- }
- rows, err = s.dao.Content.TxInsert(tx, rp.Oid, rp.Content)
- if err != nil || rows == 0 {
- tx.Rollback()
- log.Error("dao.Content.TxInContent(%v) error(%v) or rows==0", rp, err)
- return
- }
- rows, err = s.dao.Reply.TxInsert(tx, rp)
- if err != nil || rows == 0 {
- tx.Rollback()
- log.Error("dao.Reply.TxInReply(%v) error(%v) or rows==0", rp, err)
- return
- }
- return tx.Commit()
- }
- func (s *Service) regTopic(c context.Context, msg string) (topics []string) {
- msg = _urlReg.ReplaceAllString(msg, "")
- msg = _avReg.ReplaceAllString(msg, "#")
- ss := _topicReg.FindAllStringSubmatch(msg, -1)
- if len(ss) == 0 {
- return
- }
- for _, nns := range ss {
- if len(nns) == 2 {
- topic := strings.TrimSpace(nns[1])
- if len(topic) > 0 {
- topics = append(topics, topic)
- }
- }
- if len(topics) >= 5 {
- break
- }
- }
- return
- }
- func (s *Service) regAt(c context.Context, msg string, over, self int64) (ats []int64) {
- var err error
- ss := _atReg.FindAllStringSubmatch(msg, 10)
- if len(ss) == 0 {
- return
- }
- names := make([]string, 0, len(ss))
- for _, nns := range ss {
- if len(nns) == 2 {
- names = append(names, nns[1])
- }
- }
- if len(names) == 0 {
- return
- }
- us, err := s.accSrv.InfosByName3(c, &accmdl.NamesReq{Names: names})
- if err != nil {
- log.Error("s.accSrv.InfosByName2 failed, err(%v)", err)
- return
- }
- ats = make([]int64, 0, len(us.Infos))
- for mid := range us.Infos {
- if mid != over && mid != self {
- ats = append(ats, mid)
- }
- }
- if len(ats) == 0 {
- return
- }
- ats = s.getFilterBlacklist(c, self, ats)
- return
- }
- func (s *Service) addReply(c context.Context, rp *model.Reply) {
- var (
- err error
- ok bool
- )
- sub, err := s.getSubject(c, rp.Oid, rp.Type)
- if err != nil || sub == nil {
- log.Error("s.getSubject failed , oid(%d,%d) err(%v)", rp.Oid, rp.Type, err)
- return
- }
- if sub == nil {
- log.Error("get subject is nil oid(%d) type(%d)", rp.Oid, rp.Type)
- return
- }
- // init some field
- if rp.IsNormal() {
- sub.RCount = sub.RCount + 1
- sub.ACount = sub.ACount + 1
- }
- sub.Count = sub.Count + 1
- rp.Floor = sub.Count
- rp.MTime = rp.CTime
- rp.Content.RpID = rp.RpID
- rp.Content.CTime = rp.CTime
- rp.Content.MTime = rp.MTime
- if len(rp.Content.Ats) == 0 {
- rp.Content.Ats = s.regAt(c, rp.Content.Message, 0, rp.Mid)
- }
- rp.Content.Topics = s.regTopic(c, rp.Content.Message)
- // begin transaction
- if err = s.tranAdd(c, rp, true); err != nil {
- log.Error("Transaction add reply(%v) error(%v)", rp, err)
- return
- }
- // add cache
- if err = s.dao.Mc.AddSubject(c, sub); err != nil {
- log.Error("s.dao.Mc.AddSubject failed , oid(%d) err(%v)", sub.Oid, err)
- }
- if err = s.dao.Mc.AddReply(c, rp); err != nil {
- log.Error("s.dao.Mc.AddReply failed , RpID(%d) err(%v)", rp.RpID, err)
- }
- if rp.IsNormal() {
- // update reply count
- s.upAcount(c, sub.Oid, sub.Type, sub.ACount, rp.CTime.Time())
- // add index cache
- if ok, err = s.dao.Redis.ExpireIndex(c, sub.Oid, sub.Type, model.SortByFloor); err == nil && ok {
- if err = s.dao.Redis.AddFloorIndex(c, sub.Oid, sub.Type, rp); err != nil {
- log.Error("s.dao.Redis.AddFloorIndex failed , oid(%d) type(%d) err(%v)", sub.Oid, sub.Type, err)
- }
- }
- if ok, err = s.dao.Redis.ExpireIndex(c, sub.Oid, sub.Type, model.SortByCount); err == nil && ok {
- if err = s.dao.Redis.AddCountIndex(c, sub.Oid, sub.Type, rp); err != nil {
- log.Error("s.dao.Redis.AddCountIndex failed , oid(%d) type(%d) err(%v)", sub.Oid, sub.Type, err)
- }
- }
- if ok, err = s.dao.Redis.ExpireIndex(c, sub.Oid, sub.Type, model.SortByLike); err == nil && ok {
- rpts := make(map[int64]*reply.Report, 1)
- if rpt, _ := s.dao.Report.Get(c, rp.Oid, rp.RpID); rpt != nil {
- rpts[rp.RpID] = rpt
- }
- if err = s.dao.Redis.AddLikeIndex(c, sub.Oid, sub.Type, rpts, rp); err != nil {
- log.Error("s.dao.Redis.AddLikeIndex failed , oid(%d) type(%d) err(%v)", sub.Oid, sub.Type, err)
- }
- }
- s.notifyReply(c, sub, rp)
- } else if rp.State == model.ReplyStateAudit {
- if err = s.dao.Redis.AddAuditIndex(c, rp); err != nil {
- log.Error("s.dao.Redis.AddAUditIndex(%d,%d,%d) error(%v)", rp.Oid, rp.RpID, rp.Type, err)
- }
- }
- if err = s.dao.PubEvent(c, _eventReply, rp.Mid, sub, rp, nil); err != nil {
- return
- }
- }
- func (s *Service) addReplyReply(c context.Context, rp *model.Reply) {
- var (
- err error
- ok bool
- )
- sub, err := s.getSubject(c, rp.Oid, rp.Type)
- if err != nil || sub == nil {
- log.Error("s.getSubject failed , oid(%d,%d) err(%v)", rp.Oid, rp.Type, err)
- return
- }
- // NOTE:depend on db,do not get from cache
- rootRp, err := s.getReply(c, rp.Oid, rp.Root)
- if err != nil {
- log.Error("s.getReply failed , oid(%d), root(%d) err(%v)", rp.Oid, rp.Root, err)
- return
- }
- if rootRp == nil {
- log.Error("get reply is nil oid(%d) type(%d) rpid(%d)", rp.Oid, rp.Type, rp.Root)
- return
- }
- var parentRp *model.Reply
- if rp.Root != rp.Parent {
- parentRp, err = s.getReply(c, rp.Oid, rp.Parent)
- if err != nil {
- log.Error("s.getReply failed , oid(%d), parent(%d) err(%v)", rp.Oid, rp.Parent, err)
- return
- }
- if parentRp == nil {
- log.Error("get reply is nil oid(%d) type(%d) rpid(%d)", rp.Oid, rp.Type, rp.Parent)
- return
- }
- if parentRp.Dialog == 0 {
- log.Warn("Dialog Need Migration oid(%d) type(%d) rootID(%d)", rp.Oid, rp.Type, rootRp.RpID)
- // s.setDialogByRoot(context.Background(), rp.Oid, rp.Type, rp.Root)
- }
- rp.Dialog = parentRp.Dialog
- } else {
- parentRp = rootRp
- if rp.Dialog != rp.RpID {
- rp.Dialog = rp.RpID
- }
- }
- // init some field
- if rp.IsNormal() {
- sub.ACount = sub.ACount + 1
- rootRp.RCount = rootRp.RCount + 1
- }
- rootRp.Count = rootRp.Count + 1
- rootRp.MTime = rp.CTime
- rp.Floor = rootRp.Count
- rp.MTime = rp.CTime
- rp.Content.RpID = rp.RpID
- rp.Content.CTime = rp.CTime
- rp.Content.MTime = rp.MTime
- if len(rp.Content.Ats) == 0 {
- rp.Content.Ats = s.regAt(c, rp.Content.Message, 0, rp.Mid)
- }
- rp.Content.Topics = s.regTopic(c, rp.Content.Message)
- // begin transaction
- if err = s.tranAdd(c, rp, false); err != nil {
- log.Error("Transaction add reply(%v) error(%v)", rp, err)
- return
- }
- // add cache
- if err = s.dao.Mc.AddSubject(c, sub); err != nil {
- log.Error("s.dao.Mc.AddSubject failed , oid(%d), err(%v)", sub.Oid, err)
- }
- if err = s.dao.Mc.AddReply(c, rp); err != nil {
- log.Error("s.dao.Mc.AddReply failed , RpID(%d), err(%v)", rp.RpID, err)
- }
- if err = s.dao.Mc.AddReply(c, rootRp); err != nil {
- log.Error("s.dao.Mc.AddReply failed , RpID(%d), err(%v)", rootRp.RpID, err)
- }
- if rootRp.IsTop() {
- if err = s.dao.Mc.AddTop(c, rootRp); err != nil {
- log.Error("s.dao.Mc.AddReply failed , RpID(%d), err(%v)", rootRp.RpID, err)
- }
- } else if rootRp.IsNormal() {
- if ok, err = s.dao.Redis.ExpireIndex(c, rp.Oid, rp.Type, model.SortByCount); err == nil && ok {
- s.dao.Redis.AddCountIndex(c, rp.Oid, rp.Type, rootRp)
- }
- }
- if rp.IsNormal() {
- // update reply count
- s.upAcount(c, sub.Oid, sub.Type, sub.ACount, rp.CTime.Time())
- // add index cache
- if ok, err = s.dao.Redis.ExpireNewChildIndex(c, rootRp.RpID); err == nil && ok {
- if err = s.dao.Redis.AddNewChildIndex(c, rootRp.RpID, rp); err != nil {
- log.Error("s.dao.Redis.AddFloorIndexByRoot failed , RpID(%d), err(%v)", rootRp.RpID, err)
- }
- }
- // add dialog cache
- if rp.Dialog != 0 {
- if ok, err = s.dao.Redis.ExpireDialogIndex(c, rp.Dialog); err == nil && ok {
- rps := []*model.Reply{rp}
- if err = s.dao.Redis.AddDialogIndex(c, rp.Dialog, rps); err != nil {
- log.Error("s.dao.Redis.AddDialogIndex failed , RpID(%d), Dialog(%d), Floor(%d) err(%v)", rp.RpID, rp.Dialog, rp.Floor, err)
- }
- }
- }
- s.notifyReplyReply(c, sub, rootRp, parentRp, rp)
- } else if rp.State == model.ReplyStateAudit {
- if err = s.dao.Redis.AddAuditIndex(c, rp); err != nil {
- log.Error("s.dao.Redis.AddAUditIndex(%d,%d,%d) error(%v)", rp.Oid, rp.RpID, rp.Type, err)
- }
- }
- if err = s.dao.PubEvent(c, _eventReply, rp.Mid, sub, rp, nil); err != nil {
- return
- }
- }
- func (s *Service) addTopCache(c context.Context, msg *consumerMsg) {
- var (
- err error
- sub *model.Subject
- rp *model.Reply
- )
- var d struct {
- Oid int64 `json:"oid"`
- Tp int8 `json:"tp"`
- Top uint32 `json:"top"`
- }
- if err = json.Unmarshal([]byte(msg.Data), &d); err != nil {
- log.Error("json.Unmarshal() error(%v)", err)
- return
- }
- if rp, err = s.dao.Mc.GetTop(c, d.Oid, d.Tp, d.Top); err != nil {
- log.Error("s.dao.Mc.GetTop(oid %v,top %v) err(%v)", d.Oid, d.Top, err)
- return
- } else if rp == nil {
- if rp, err = s.dao.Reply.GetTop(c, d.Oid, d.Tp, d.Top); err != nil || rp == nil {
- log.Error("s.dao.Reply.GetTop(%d, %d) error(%v)", d.Oid, d.Tp, err)
- return
- }
- if rp.Content, err = s.dao.Content.Get(c, d.Oid, rp.RpID); err != nil {
- return
- }
- s.dao.Mc.AddTop(c, rp)
- sub, err = s.dao.Subject.Get(c, d.Oid, d.Tp)
- if err != nil {
- log.Error("s.dao.Subject.Get(%d, %d) error(%v)", d.Oid, d.Tp, err)
- return
- }
- err = sub.TopSet(rp.RpID, d.Top, 1)
- if err != nil {
- return
- }
- _, err = s.dao.Subject.UpMeta(c, d.Oid, d.Tp, sub.Meta, time.Now())
- if err != nil {
- log.Error("s.dao.Subject.UpMeta(%d,%d,%d) failed!err:=%v ", rp.RpID, rp.Oid, d.Tp, err)
- return
- }
- s.dao.Mc.AddSubject(c, sub)
- }
- }
- func (s *Service) actionRpt(c context.Context, msg *consumerMsg) {
- var (
- err error
- ok bool
- )
- var d struct {
- Oid int64 `json:"oid"`
- RpID int64 `json:"rpid"`
- Tp int8 `json:"tp"`
- }
- if err = json.Unmarshal([]byte(msg.Data), &d); err != nil {
- log.Error("json.Unmarshal() error(%v)", err)
- return
- }
- rp, err := s.getReplyCache(c, d.Oid, d.RpID)
- if err != nil {
- log.Error("s.getReply failed , oid(%d), RpID(%d) err(%v)", d.Oid, d.RpID, err)
- return
- }
- if rp == nil {
- return
- }
- sub, err := s.getSubject(c, d.Oid, d.Tp)
- if err != nil || sub == nil {
- log.Error("s.getSubject failed , oid(%d),tp(%d), RpID(%d) err(%v)", d.Oid, d.Tp, d.RpID, err)
- return
- }
- // update like index
- if rp.Root == 0 && rp.Parent == 0 && !rp.IsDeleted() {
- if ok, err = s.dao.Redis.ExpireIndex(c, d.Oid, rp.Type, model.SortByLike); err == nil && ok {
- rpts := make(map[int64]*reply.Report, 1)
- if rpt, _ := s.dao.Report.Get(c, rp.Oid, rp.RpID); rpt != nil {
- rpts[rp.RpID] = rpt
- }
- if err = s.dao.Redis.AddLikeIndex(c, d.Oid, rp.Type, rpts, rp); err != nil {
- log.Error("s.dao.Redis.AddLikeIndex(%d, %d) error(%v)", d.Oid, rp.Type, err)
- }
- }
- }
- report, err := s.dao.Report.Get(c, d.Oid, d.RpID)
- if err != nil || report == nil {
- log.Error("dao.Report.GetReport(%d, %d) met error (%v)", rp.Oid, rp.RpID, err)
- return
- }
- if err = s.dao.PubEvent(c, _eventReportAdd, report.Mid, sub, rp, report); err != nil {
- return
- }
- }
- func (s *Service) setLike(c context.Context, cmsg *StatMsg) {
- var (
- event string
- )
- rp, err := s.getReply(c, cmsg.Oid, cmsg.ID)
- if err != nil || rp == nil || rp.Content == nil {
- log.Error("s.getReply(%d, %d) reply:%+v error(%v)", cmsg.Oid, cmsg.ID, rp, err)
- return
- }
- sub, err := s.getSubject(c, rp.Oid, rp.Type)
- if err != nil || sub == nil {
- log.Error("s.getSubject failed , oid(%d) type(%d) err(%v)", rp.Oid, rp.Type, err)
- return
- }
- _, err = s.dao.Reply.UpLike(c, cmsg.Oid, cmsg.ID, cmsg.Count, cmsg.DislikeCount, time.Now())
- if err != nil {
- log.Error("s.dao.Reply.UpLike (%v) failed!err:=%v", cmsg, err)
- return
- }
- if cmsg.Count > rp.Like {
- event = _eventLike
- var max int
- if max, err = s.dao.Redis.MaxLikeCnt(c, rp.RpID); err == nil && cmsg.Count > max {
- if err = s.dao.Redis.SetMaxLikeCnt(c, rp.RpID, int64(cmsg.Count)); err == nil {
- rp.Like = cmsg.Count
- rp.Hate = cmsg.DislikeCount
- s.notifyLike(c, cmsg.Mid, rp)
- }
- }
- } else if cmsg.DislikeCount > rp.Hate {
- event = _eventHate
- } else if cmsg.Count < rp.Like {
- event = _eventLikeCancel
- } else {
- event = _eventHateCancel
- }
- rp.Like = cmsg.Count
- rp.Hate = cmsg.DislikeCount
- s.dao.Mc.AddReply(c, rp)
- if rp.AttrVal(model.ReplyAttrAdminTop) == 1 || rp.AttrVal(model.ReplyAttrUpperTop) == 1 {
- s.dao.Mc.AddTop(c, rp)
- return
- }
- // if have root, then update root's index
- if rp.Root == 0 && rp.IsNormal() {
- var ok bool
- if ok, err = s.dao.Redis.ExpireIndex(c, rp.Oid, rp.Type, model.SortByLike); err == nil && ok {
- rpts := make(map[int64]*reply.Report, 1)
- if rpt, _ := s.dao.Report.Get(c, rp.Oid, rp.RpID); rpt != nil {
- rpts[rp.RpID] = rpt
- }
- if err = s.dao.Redis.AddLikeIndex(c, rp.Oid, rp.Type, rpts, rp); err != nil {
- log.Error("s.dao.Redis.AddLikeIndex(%d, %d) error(%v)", rp.Oid, rp.Type, err)
- }
- }
- }
- if err = s.dao.PubEvent(c, event, cmsg.Mid, sub, rp, nil); err != nil {
- return
- }
- }
- func (s *Service) adminLog(c context.Context, rp *model.Reply, adid int64, isreport, state int8, result, remark string) {
- // admin log
- s.dao.Admin.UpIsNotNew(c, rp.RpID, time.Now())
- s.dao.Admin.Insert(c, adid, rp.Oid, rp.RpID, rp.Type, result, remark, model.AdminIsNew, isreport, state, time.Now())
- }
- // getSubject get reply subject from mysql .
- // NOTE : note get from mc,count must depend on mysql
- func (s *Service) getSubject(c context.Context, oid int64, tp int8) (sub *model.Subject, err error) {
- if sub, err = s.dao.Subject.Get(c, oid, tp); err != nil {
- log.Error("dao.Subject.Get(%d, %d) error(%v)", oid, tp, err)
- }
- return
- }
- func (s *Service) getReply(c context.Context, oid, RpID int64) (rp *model.Reply, err error) {
- if rp, err = s.dao.Reply.Get(c, oid, RpID); err != nil {
- log.Error("s.dao.Reply.Get(%d, %d) error(%v)", oid, RpID, err)
- return
- } else if rp == nil {
- return
- }
- if rp.Content, err = s.dao.Content.Get(c, rp.Oid, rp.RpID); err != nil {
- log.Error("s.dao.Content.Get(%d,%d) error(%v)", rp.Oid, rp.RpID, err)
- } else if rp.Content == nil {
- err = errReplyContentNotFound
- }
- return
- }
- func (s *Service) getReplyCache(c context.Context, oid, RpID int64) (rp *model.Reply, err error) {
- if rp, err = s.dao.Mc.GetReply(c, RpID); err != nil {
- log.Error("replyCacheDao.GetReply(%d, %d) error(%v)", oid, RpID, err)
- }
- if rp != nil {
- return
- }
- if rp, err = s.dao.Reply.Get(c, oid, RpID); err != nil {
- log.Error("dao.Reply.GetReply(%d, %d) error(%v)", oid, RpID, err)
- }
- if rp != nil {
- rp.Content, _ = s.dao.Content.Get(c, rp.Oid, rp.RpID)
- // NOTE not add member info to cache
- }
- return
- }
- func (s *Service) upAcount(c context.Context, oid int64, tp int8, count int, now time.Time) {
- s.statDao.Send(c, tp, oid, count)
- }
- func (s *Service) callSearchUp(c context.Context, res map[string]*searchFlush) (err error) {
- var (
- rps []*searchFlush
- rpts []*searchFlush
- )
- for _, r := range res {
- if r.Report != nil {
- rpts = append(rpts, r)
- } else {
- rps = append(rps, r)
- }
- }
- if len(rps) > 0 {
- err = s.callSearch(c, rps, false)
- }
- if len(rpts) > 0 {
- err = s.callSearch(c, rpts, true)
- }
- return
- }
- // callSearch update reply or report info to ES search.
- func (s *Service) callSearch(c context.Context, params []*searchFlush, isRpt bool) (err error) {
- var (
- b []byte
- ms []map[string]interface{}
- p = url.Values{}
- urlStr = conf.Conf.Host.Search + "/api/reply/internal/update"
- res struct {
- Code int `json:"code"`
- Msg string `json:"msg"`
- }
- )
- // 更新搜索ES数据字段
- if isRpt {
- // report
- p.Set("appid", _appIDReport)
- for _, p := range params {
- m := make(map[string]interface{})
- m["id"] = fmt.Sprintf("%d_%d_%d", p.Reply.RpID, p.Reply.Oid, p.Reply.Type)
- m["reply_state"] = fmt.Sprintf("%d", p.Reply.State)
- m["reason"] = fmt.Sprintf("%d", p.Report.Reason)
- m["content"] = p.Report.Content
- m["state"] = fmt.Sprintf("%d", p.Report.State)
- m["mtime"] = p.Report.MTime.Time().Format(timeFormat)
- m["index_time"] = p.Report.CTime.Time().Format(timeFormat)
- if p.Report.Attr == 1 {
- m["attr"] = []int{1}
- } else {
- m["attr"] = []int{}
- }
- ms = append(ms, m)
- }
- if b, err = json.Marshal(ms); err != nil {
- log.Error("json.Marshal(%v) error(%v)", ms, err)
- return
- }
- p.Set("val", string(b))
- if err = searchHTTPClient.Post(c, urlStr, "", p, &res); err != nil {
- log.Error("xhttp.Post(%s) failed error(%v)", urlStr+"?"+p.Encode(), err)
- }
- log.Info("updateSearch: %s post:%s ret:%v", urlStr, p.Encode(), res)
- } else {
- // reply
- var rps = make(map[int64]*model.Reply)
- for _, p := range params {
- rps[p.Reply.RpID] = p.Reply
- }
- err = s.UpSearchReply(c, rps)
- }
- return
- }
- // UpSearchReply update search reply index.
- func (s *Service) UpSearchReply(c context.Context, rps map[int64]*model.Reply) (err error) {
- if len(rps) <= 0 {
- return
- }
- stales := s.es.NewUpdate("reply_list")
- for _, rp := range rps {
- m := make(map[string]interface{})
- m["id"] = rp.RpID
- m["state"] = rp.State
- m["mtime"] = rp.MTime.Time().Format("2006-01-02 15:04:05")
- m["oid"] = rp.Oid
- m["type"] = rp.Type
- if rp.Content != nil {
- m["message"] = rp.Content.Message
- }
- stales = stales.AddData(s.es.NewUpdate("reply_list").IndexByTime("reply_list", elastic.IndexTypeWeek, rp.CTime.Time()), m)
- }
- err = stales.Do(c)
- if err != nil {
- log.Error("upSearchReply update stales(%s) failed!err:=%v", stales.Params(), err)
- return
- }
- log.Info("upSearchReply:stale:%s ret:%+v", stales.Params(), err)
- return
- }
- // getBlackListRelation check if the source user blacklisted the target user
- func (s *Service) getBlackListRelation(c context.Context, srcID, targetID int64) (rel bool) {
- relMap, err := s.accSrv.RichRelations3(c, &accmdl.RichRelationReq{Owner: srcID, Mids: []int64{targetID}, RealIp: ""})
- if err != nil {
- log.Error("s.acc.RichRelations2 sourceId(%v) targetId(%v)error(%v)", srcID, targetID, err)
- err = nil
- return false
- }
- if len(relMap.RichRelations) == 0 {
- return false
- }
- if rel, ok := relMap.RichRelations[targetID]; ok && relmdl.Attr(uint32(rel)) == relmdl.AttrBlack {
- return true
- }
- return false
- }
- // getFilterBlacklist filters the user list that the mid user can notify message for
- func (s *Service) getFilterBlacklist(c context.Context, mid int64, targetIds []int64) (filterIds []int64) {
- filterIds = make([]int64, 0, len(targetIds))
- for _, tmp := range targetIds {
- if !s.getBlackListRelation(c, tmp, mid) {
- filterIds = append(filterIds, tmp)
- }
- }
- return
- }
- func (s *Service) addAssistLog(c context.Context, mid, uid, subjectID, typeID, action int64, objectID, content string) (err error) {
- if len(content) > 50 {
- content = substr2(content, 0, 50) + "..."
- }
- arg := &assmdl.ArgAssistLogAdd{
- Mid: mid,
- AssistMid: uid,
- Type: 1,
- Action: 1,
- SubjectID: subjectID,
- ObjectID: objectID,
- Detail: content,
- RealIP: "",
- }
- if err = s.assistSrv.AssistLogAdd(c, arg); err != nil {
- log.Error("s.assistSrv.Assist(%d, %d, %d, %d, %d) error(%v)", mid, uid, subjectID, typeID, action, err)
- }
- return
- }
- func substr2(str string, start int, subLength int) string {
- rs := []rune(str)
- length := len(rs)
- if start < 0 || start > length {
- start = 0
- }
- if subLength < 0 || subLength > length {
- subLength = length
- }
- return string(rs[start:subLength])
- }
|