subproc.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "runtime/debug"
  6. "strings"
  7. "time"
  8. "go-common/app/job/main/member/model"
  9. share "go-common/app/service/main/share/model"
  10. "go-common/library/log"
  11. "go-common/library/net/ip"
  12. "github.com/pkg/errors"
  13. )
  14. const (
  15. _DedeMember = "dede_member"
  16. _AsoAccount = "aso_account"
  17. _DedeMemberPerson = "dede_member_person"
  18. _DedeMemberSpace = "dede_member_space"
  19. _MemberBaseInfo = "user_base_"
  20. _MemberMoral = "user_moral"
  21. _memberExp = "user_exp_"
  22. _DedeMemberTags = "dede_member_tags"
  23. _dedeMemberMoral = "dede_member_moral"
  24. _memberRealnameApply = "realname_apply"
  25. _memberRealnameInfo = "realname_info"
  26. _retry = 3
  27. )
  28. var (
  29. _shareVideoType = map[int]struct{}{
  30. 1: {},
  31. 3: {},
  32. }
  33. )
  34. // subproc databus sub
  35. func (s *Service) subproc() {
  36. var err error
  37. var c = context.TODO()
  38. for res := range s.ds.Messages() {
  39. mu := &model.Message{}
  40. if err = json.Unmarshal(res.Value, mu); err != nil {
  41. log.Error("member-job,json.Unmarshal (%v) error(%v)", string(res.Value), err)
  42. continue
  43. }
  44. for i := 0; i < _retry; i++ {
  45. if strings.HasPrefix(mu.Table, _MemberBaseInfo) {
  46. var (
  47. rank = &struct {
  48. Mid int64 `json:"mid"`
  49. Rank int64 `json:"rank"`
  50. }{}
  51. )
  52. oldRank := int64(5000)
  53. if mu.Old != nil && len(mu.Old) != 0 {
  54. if err = json.Unmarshal(mu.Old, rank); err != nil {
  55. log.Error("json.Unmarsha(%s) error(%v)", string(mu.Old), err)
  56. break
  57. }
  58. oldRank = rank.Rank
  59. }
  60. if err = json.Unmarshal(mu.New, rank); err != nil {
  61. log.Error("json.Unmarsha(%s) error(%v)", string(mu.New), err)
  62. break
  63. }
  64. newRank := rank.Rank
  65. if oldRank <= 5000 && newRank >= 10000 {
  66. if err = s.initExp(c, rank.Mid); err != nil {
  67. log.Error("s.initExp(%d) error(%v)", rank.Mid, err)
  68. }
  69. }
  70. if err = s.dao.DelBaseInfoCache(c, rank.Mid); err != nil {
  71. continue
  72. }
  73. // sync face to old account .
  74. s.updateAccFace(c, rank.Mid) //todo delete
  75. // TODO: with update face or name to purge cache at the same time
  76. if err = s.dao.NotifyPurgeCache(c, rank.Mid, model.ActUpdateFace); err != nil {
  77. log.Error("s.dao.NotifyPurgeCache(%d, %s) error(%v)", rank.Mid, model.ActUpdateFace, err)
  78. break
  79. }
  80. item := &Item{
  81. Mid: rank.Mid,
  82. Time: time.Now(),
  83. Action: model.ActUpdateUname,
  84. }
  85. if err = s.cachepq.Put(item); err != nil {
  86. log.Error("Failed to put into cachepq with item: %+v: %+v", item, err)
  87. err = nil
  88. }
  89. log.Info("Notify to purge cache with mid(%d) action(%s) message(old: %s, new: %s)", rank.Mid, model.ActUpdateFace, string(mu.Old), string(mu.New))
  90. } else if mu.Table == _MemberMoral {
  91. var p = &model.MemberMid{}
  92. if err = json.Unmarshal(mu.New, p); err != nil {
  93. log.Error("member-job,json.Unmarshal (%v) error(%v)", string(mu.New), err)
  94. break
  95. }
  96. if err = s.dao.DelMoralCache(c, p.Mid); err != nil {
  97. continue
  98. }
  99. if err = s.dao.NotifyPurgeCache(c, p.Mid, model.ActUpdateMoral); err != nil {
  100. log.Error("s.dao.NotifyPurgeCache(%d, %s) error(%v)", p.Mid, model.ActUpdateMoral, err)
  101. break
  102. }
  103. } else if mu.Table == _memberRealnameInfo || mu.Table == _memberRealnameApply {
  104. var p = &struct {
  105. Mid int64 `json:"mid"`
  106. Status model.RealnameApplyStatus `json:"status"`
  107. }{}
  108. if err = json.Unmarshal(mu.New, p); err != nil {
  109. log.Error("member-job,json.Unmarshal (%v) error(%+v)", string(mu.New), err)
  110. break
  111. }
  112. if err = s.dao.DeleteRealnameCache(c, p.Mid); err != nil {
  113. log.Error("Delete RealnameCache cache err : %+v", err)
  114. continue
  115. }
  116. if err = s.dao.NotifyPurgeCache(c, p.Mid, "updateRealname"); err != nil {
  117. log.Error("s.dao.NotifyPurgeCache(%d, %s) error(%+v)", p.Mid, "updateRealname", err)
  118. break
  119. }
  120. log.Info("Notify to purge realname cache with mid(%d) action(%s) message(old: %s, new: %s)", p.Mid, "updateRealname", string(mu.Old), string(mu.New))
  121. if p.Status.IsPass() {
  122. // 尝试补发一次经验
  123. s.addExp(context.TODO(), &model.AddExp{
  124. Mid: p.Mid,
  125. IP: ip.InternalIP(),
  126. Ts: time.Now().Unix(),
  127. Event: "identify",
  128. })
  129. }
  130. if mu.Table == _memberRealnameInfo {
  131. s.syncParsedRealnameInfo(c, p.Mid)
  132. }
  133. } else if strings.HasPrefix(mu.Table, _memberExp) {
  134. var (
  135. p = &model.MemberMid{}
  136. exp *model.NewExp
  137. )
  138. if err = json.Unmarshal(mu.New, p); err != nil {
  139. log.Error("s.subproc() table(%s) json.Unmarshal() error(%v)", mu.Table, err)
  140. break
  141. }
  142. if exp, err = s.dao.SelExp(c, p.Mid); err != nil {
  143. log.Error("s.dao.SelNewExp(%d) error(%v)", p.Mid, err)
  144. break
  145. }
  146. if err = s.dao.SetExpCache(c, exp.Mid, exp.Exp); err != nil {
  147. log.Error("Failed to set exp cache: %+v: %+v", exp, err)
  148. break
  149. }
  150. log.Info("s.dao.SetExpCache(%d) set exp cache complete. exp(%d)", exp.Mid, exp.Exp)
  151. expChange, levelChange := isExpAndLevelChange(mu)
  152. if expChange {
  153. log.Info("Notify to purge cache with mid(%d) action(%s) message(old: %s, new: %s)", p.Mid, model.ActUpdateExp, string(mu.Old), string(mu.New))
  154. if err = s.dao.NotifyPurgeCache(c, p.Mid, model.ActUpdateExp); err != nil {
  155. log.Error("s.dao.NotifyPurgeCache(%d, %s) error(%v)", p.Mid, model.ActUpdateExp, err)
  156. break
  157. }
  158. }
  159. if levelChange {
  160. log.Info("Notify to purge cache with mid(%d) action(%s) message(old: %s, new: %s)", p.Mid, model.ActUpdateLevel, string(mu.Old), string(mu.New))
  161. if err = s.dao.NotifyPurgeCache(c, p.Mid, model.ActUpdateLevel); err != nil {
  162. log.Error("s.dao.NotifyPurgeCache(%d, %s) error(%v)", p.Mid, model.ActUpdateLevel, err)
  163. break
  164. }
  165. }
  166. }
  167. if err == nil {
  168. break
  169. }
  170. }
  171. if err = res.Commit(); err != nil {
  172. log.Error("databus.Commit err(%v)", err)
  173. }
  174. log.Info("subproc key:%v,topic: %v, part:%v offset:%v,message %s,", res.Key, res.Topic, res.Partition, res.Offset, res.Value)
  175. }
  176. }
  177. // subproc databus sub
  178. func (s *Service) accSubproc() {
  179. var (
  180. err error
  181. )
  182. for res := range s.accDs.Messages() {
  183. mu := &model.Message{}
  184. ms := &struct {
  185. Mid int64 `json:"mid"`
  186. }{}
  187. if err = json.Unmarshal(res.Value, mu); err != nil {
  188. log.Error("member-job,json.Unmarshal (%v) error(%v)", string(res.Value), err)
  189. continue
  190. }
  191. if err = json.Unmarshal(mu.New, ms); err != nil {
  192. log.Error("json.Unmarsha(%s) error(%v)", string(mu.New), ms)
  193. continue
  194. }
  195. for num := 0; num < 3; num++ {
  196. switch {
  197. //sex,face,rank
  198. case strings.HasPrefix(mu.Table, _DedeMember) && len(mu.Table) < 14:
  199. //err = s.setFace(c, ms.Mid)
  200. //birthday,dating,place,marital
  201. case mu.Table == _DedeMemberPerson:
  202. //sign
  203. case mu.Table == _DedeMemberSpace:
  204. //err = s.setSign(c, ms.Mid)
  205. //tag
  206. case strings.HasPrefix(mu.Table, _DedeMemberTags):
  207. //moral
  208. case mu.Table == _dedeMemberMoral:
  209. }
  210. if err != nil {
  211. log.Error("accSubproc err(%v)", err)
  212. time.Sleep(1 * time.Second)
  213. continue
  214. }
  215. break
  216. }
  217. if err = res.Commit(); err != nil {
  218. log.Error("databus.Commit err(%v)", err)
  219. }
  220. log.Info("subproc key:%v,topic: %v, part:%v offset:%v,message %s,", res.Key, res.Topic, res.Partition, res.Offset, res.Value)
  221. }
  222. }
  223. // subproc databus sub.
  224. func (s *Service) passportSubproc() {
  225. var err error
  226. for res := range s.passortDs.Messages() {
  227. mu := &model.Message{}
  228. ms := &struct {
  229. Mid int64 `json:"mid"`
  230. }{}
  231. if err = json.Unmarshal(res.Value, mu); err != nil {
  232. log.Error("member-job,json.Unmarshal (%v) error(%v)", string(res.Value), err)
  233. continue
  234. }
  235. if err = json.Unmarshal(mu.New, ms); err != nil {
  236. log.Error("json.Unmarsha(%s) error(%v)", string(mu.New), ms)
  237. continue
  238. }
  239. for num := 0; num < 3; num++ {
  240. //name
  241. if mu.Table == _AsoAccount {
  242. if mu.Action != "delete" {
  243. err = s.setName(ms.Mid)
  244. }
  245. }
  246. if err != nil {
  247. log.Error("passportSubproc err(%v)", err)
  248. time.Sleep(1 * time.Second)
  249. continue
  250. }
  251. break
  252. }
  253. if err = res.Commit(); err != nil {
  254. log.Error("databus.Commit err(%v)", err)
  255. }
  256. log.Info("subproc key:%v,topic: %v, part:%v offset:%v,message %s,", res.Key, res.Topic, res.Partition, res.Offset, res.Value)
  257. }
  258. }
  259. func (s *Service) logproc() {
  260. for {
  261. mu := <-s.logDatabus.Messages()
  262. l := &model.UserLog{}
  263. err := json.Unmarshal(mu.Value, l)
  264. if err != nil {
  265. log.Error("Failed to parse log databus message value: value(%s): err: %+v", string(mu.Value), err)
  266. continue
  267. }
  268. // send log to report
  269. s.dao.AddExpLog(context.TODO(), l)
  270. // send log to origin hbase
  271. // content := make(map[string][]byte, len(l.Content))
  272. // for k, v := range l.Content {
  273. // content[k] = []byte(v)
  274. // }
  275. // content["ip"] = []byte(l.IP)
  276. // for i := 0; i < 3; i++ {
  277. // err := s.dao.AddLog(context.TODO(), l.Mid, l.TS, content, model.TableExpLog)
  278. // if err == nil {
  279. // break
  280. // }
  281. // log.Error("addlog mid %d err %v", l.Mid, err)
  282. // time.Sleep(time.Millisecond * 500)
  283. // }
  284. log.Info("consumer key:%s,message:%s", mu.Key, mu.Value)
  285. mu.Commit()
  286. }
  287. }
  288. func (s *Service) expproc() {
  289. for {
  290. mu := <-s.expDatabus.Messages()
  291. ex := new(model.AddExp)
  292. err := json.Unmarshal(mu.Value, ex)
  293. if err != nil {
  294. log.Error("s.expproc() json.Unmarshal error(%v)", err)
  295. mu.Commit()
  296. continue
  297. }
  298. try := 0
  299. success := false
  300. for {
  301. if err = s.addExp(context.TODO(), ex); err == nil {
  302. success = true
  303. break
  304. }
  305. try++
  306. if try > 3 {
  307. log.Error("Failed to add exp, try 3 times mid: %d error: %+v", ex.Mid, err)
  308. mu.Commit()
  309. break
  310. }
  311. time.Sleep(time.Millisecond * 500)
  312. }
  313. if !success {
  314. continue
  315. }
  316. // 如果是一个观看视频的消息就尝试补发一下登录奖励
  317. if ex.Event == "view" {
  318. s.addExp(context.TODO(), &model.AddExp{
  319. Mid: ex.Mid,
  320. IP: ex.IP,
  321. Ts: ex.Ts,
  322. Event: "login",
  323. })
  324. s.recoverMoral(context.TODO(), ex.Mid)
  325. }
  326. log.Info("expproc consumer key:%s,value: %s", mu.Key, mu.Value)
  327. mu.Commit()
  328. }
  329. }
  330. func isVideoShare(shareType int) bool {
  331. _, ok := _shareVideoType[shareType]
  332. return ok
  333. }
  334. func (s *Service) shareMidproc() {
  335. for {
  336. mu := <-s.shareMidDatabus.Messages()
  337. sh := new(share.MIDShare)
  338. err := json.Unmarshal(mu.Value, sh)
  339. if err != nil {
  340. log.Error("s.shareMidproc() json.Unmarshal error(%v)", err)
  341. mu.Commit()
  342. continue
  343. }
  344. if !isVideoShare(sh.TP) {
  345. log.Warn("Not a video share, skip to add exp: %+v", sh)
  346. mu.Commit()
  347. continue
  348. }
  349. try := 0
  350. success := false
  351. ex := &model.AddExp{
  352. Event: "share",
  353. Mid: sh.MID,
  354. IP: ip.InternalIP(),
  355. Ts: sh.Time,
  356. }
  357. for {
  358. if err = s.addExp(context.TODO(), ex); err == nil {
  359. success = true
  360. break
  361. }
  362. try++
  363. if try > 3 {
  364. log.Error("Failed to add share exp, try 3 times mid: %d error: %+v", ex.Mid, err)
  365. mu.Commit()
  366. break
  367. }
  368. time.Sleep(time.Millisecond * 500)
  369. }
  370. if !success {
  371. continue
  372. }
  373. log.Info("shareMidproc consumer key:%s,value: %s", mu.Key, mu.Value)
  374. mu.Commit()
  375. }
  376. }
  377. func (s *Service) realnameSubproc() {
  378. defer func() {
  379. if x := recover(); x != nil {
  380. log.Error("realnameSubproc panic(%+v) :\n %s", x, debug.Stack())
  381. go s.realnameSubproc()
  382. }
  383. }()
  384. log.Info("realnameSubproc run")
  385. var (
  386. c = context.TODO()
  387. err error
  388. )
  389. for res := range s.realnameDatabus.Messages() {
  390. msg := &model.Message{}
  391. if err = json.Unmarshal(res.Value, msg); err != nil {
  392. log.Error("member-job,json.Unmarshal (%v) error(%v)", string(res.Value), errors.WithStack(err))
  393. continue
  394. }
  395. switch msg.Table {
  396. case "dede_identification_card_apply":
  397. if msg.Action == "delete" {
  398. log.Error("dede_identification_card_apply got delete msg (%s)", msg.New)
  399. continue
  400. }
  401. ms := &model.RealnameApplyMessage{}
  402. if err = json.Unmarshal(msg.New, ms); err != nil {
  403. err = errors.Wrapf(err, "dede_identification_card_apply , %s", msg.New)
  404. log.Error("%+v", err)
  405. continue
  406. }
  407. log.Info("upsert realname apply : (%+v)", ms)
  408. if err = s.dao.UpdateRealnameFromMSG(c, ms); err != nil {
  409. log.Error("%+v", err)
  410. continue
  411. }
  412. if err = s.dao.DeleteRealnameCache(c, ms.MID); err != nil {
  413. log.Error("Delete RealnameApplyStatus cache err : %+v", err)
  414. continue
  415. }
  416. if err = s.dao.NotifyPurgeCache(c, ms.MID, "updateRealname"); err != nil {
  417. log.Error("s.dao.NotifyPurgeCache(%d, %s) error(%+v)", ms.MID, "updateRealname", err)
  418. continue
  419. }
  420. log.Info("Notify to purge realname cache with mid(%d) action(%s) message(old: %s, new: %s)", ms.MID, "updateRealname", string(msg.Old), string(msg.New))
  421. case "dede_identification_card_apply_img":
  422. if msg.Action == "delete" {
  423. log.Error("dede_identification_card_apply_img got delete msg (%s)", msg.New)
  424. continue
  425. }
  426. ms := &model.RealnameApplyImgMessage{}
  427. if err = json.Unmarshal(msg.New, ms); err != nil {
  428. err = errors.Wrapf(err, "dede_identification_card_apply_img , %s", msg.New)
  429. log.Error("%+v", err)
  430. continue
  431. }
  432. log.Info("upsert realname apply img : (%+v)", ms)
  433. if err = s.dao.UpsertRealnameApplyImg(c, ms); err != nil {
  434. log.Error("%+v", err)
  435. continue
  436. }
  437. }
  438. if err = res.Commit(); err != nil {
  439. err = errors.Wrapf(err, "realnameSubproc commit")
  440. log.Error("%+v", err)
  441. }
  442. log.Info("Realname subproc key:%v,topic: %v, part:%v offset:%v,message %s,", res.Key, res.Topic, res.Partition, res.Offset, res.Value)
  443. }
  444. }
  445. func (s *Service) syncParsedRealnameInfo(ctx context.Context, mid int64) {
  446. info, err := s.dao.RealnameInfo(ctx, mid)
  447. if err != nil {
  448. log.Error("Failed to fetch realname info with mid: %d: %+v", mid, err)
  449. return
  450. }
  451. if info.Country != model.RealnameCountryChina ||
  452. info.CardType != model.RealnameCardTypeIdentity {
  453. log.Info("Skip to sync parsed realname info with mid: %d", mid)
  454. return
  455. }
  456. card, err := info.DecryptedCard()
  457. if err != nil {
  458. log.Error("Failed to decrypt realname card with mid: %d: %+v", mid, err)
  459. return
  460. }
  461. birth, gender, err := ParseIdentity(card)
  462. if err != nil {
  463. log.Error("Failed to parse idenitfy with mid: %d: %+v", mid, err)
  464. return
  465. }
  466. // sync to hive
  467. log.Infov(ctx,
  468. log.KV("action", "Syning realname parsed info"),
  469. log.KV("mid", info.MID),
  470. log.KV("birthday", birth.Format("2006-01-02")),
  471. log.KV("status", info.Status),
  472. log.KV("gender", gender),
  473. log.KV("mtime", info.MTime.Format("2006-01-02 15:04:05")),
  474. )
  475. s.ParsedRealnameInfoc.Infov(ctx,
  476. birth.Format("2006-01-02"),
  477. info.MTime.Format("2006-01-02 15:04:05"),
  478. int64(info.Status),
  479. int64(info.MID),
  480. gender,
  481. )
  482. }