service.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "io/ioutil"
  6. "os"
  7. "runtime/debug"
  8. "strings"
  9. "time"
  10. "go-common/app/job/main/figure/conf"
  11. "go-common/app/job/main/figure/dao"
  12. "go-common/app/job/main/figure/model"
  13. coinm "go-common/app/service/main/coin/model"
  14. spym "go-common/app/service/main/spy/model"
  15. "go-common/library/log"
  16. "go-common/library/queue/databus"
  17. )
  18. const (
  19. _insertAction = "insert"
  20. _vipTable = "vip_user_info"
  21. _payTable = "pay_pay_order"
  22. _blockedTable = "blocked_kpi"
  23. // pay success status.
  24. _paySuccess = 2
  25. )
  26. // Service biz service def.
  27. type Service struct {
  28. c *conf.Config
  29. figureDao *dao.Dao
  30. accExpDatabus *databus.Databus
  31. accRegDatabus *databus.Databus
  32. vipDatabus *databus.Databus
  33. spyDatabus *databus.Databus
  34. coinDatabus *databus.Databus
  35. replyInfoDatabus *databus.Databus
  36. payDatabus *databus.Databus
  37. danmakuDatabus *databus.Databus
  38. blockedDatabus *databus.Databus
  39. }
  40. // New new a Service and return.
  41. func New(c *conf.Config) (s *Service) {
  42. s = &Service{
  43. c: c,
  44. figureDao: dao.New(c),
  45. }
  46. if c.DataSource.AccountExp != nil {
  47. s.accExpDatabus = databus.New(c.DataSource.AccountExp)
  48. go s.accexpproc()
  49. }
  50. if c.DataSource.AccountReg != nil {
  51. s.accRegDatabus = databus.New(c.DataSource.AccountReg)
  52. go s.accregproc()
  53. }
  54. if c.DataSource.Vip != nil {
  55. s.vipDatabus = databus.New(c.DataSource.Vip)
  56. go s.vipproc()
  57. }
  58. if c.DataSource.Spy != nil {
  59. s.spyDatabus = databus.New(c.DataSource.Spy)
  60. go s.spyproc()
  61. }
  62. if c.DataSource.Coin != nil {
  63. s.coinDatabus = databus.New(c.DataSource.Coin)
  64. go s.coinproc()
  65. }
  66. if c.DataSource.ReplyInfo != nil {
  67. s.replyInfoDatabus = databus.New(c.DataSource.ReplyInfo)
  68. go s.replyinfoproc()
  69. }
  70. if c.DataSource.Pay != nil {
  71. s.payDatabus = databus.New(c.DataSource.Pay)
  72. go s.payproc()
  73. }
  74. if c.DataSource.Danmaku != nil {
  75. s.danmakuDatabus = databus.New(c.DataSource.Danmaku)
  76. go s.danmakuproc()
  77. }
  78. if c.DataSource.Blocked != nil {
  79. s.blockedDatabus = databus.New(c.DataSource.Blocked)
  80. go s.blockedproc()
  81. }
  82. go s.syncproc()
  83. return s
  84. }
  85. // Ping check dao health.
  86. func (s *Service) Ping(c context.Context) (err error) {
  87. s.figureDao.Ping(c)
  88. return
  89. }
  90. // Close close all dao.
  91. func (s *Service) Close() {
  92. s.figureDao.Close()
  93. }
  94. func (s *Service) accexpproc() {
  95. defer func() {
  96. if x := recover(); x != nil {
  97. log.Error("s.accexpproc panic(%v) %s", x, debug.Stack())
  98. go s.accexpproc()
  99. log.Info("s.accexpproc recover")
  100. }
  101. }()
  102. var (
  103. err error
  104. msg *databus.Message
  105. msgChan = s.accExpDatabus.Messages()
  106. ok bool
  107. exp *model.MsgAccountLog
  108. )
  109. for {
  110. select {
  111. case msg, ok = <-msgChan:
  112. if !ok {
  113. log.Info("accproc msgChan closed")
  114. }
  115. }
  116. if err = msg.Commit(); err != nil {
  117. log.Error("msg.Commit err(%v)", err)
  118. }
  119. exp = &model.MsgAccountLog{}
  120. if err = json.Unmarshal([]byte(msg.Value), exp); err != nil {
  121. log.Error("json.Unmarshal(%v) err(%v)", msg, err)
  122. continue
  123. }
  124. log.Info("exp Info (%+v)", exp)
  125. if err = s.AccountExp(context.Background(), exp.Mid, int64(exp.ExpTo())); err != nil {
  126. log.Error("s.AccountExp(%v) err(%v)", exp, err)
  127. continue
  128. }
  129. s.figureDao.SetWaiteUserCache(context.Background(), exp.Mid, s.figureDao.Version(time.Now()))
  130. if exp.IsViewExp() {
  131. if err = s.AccountViewVideo(context.Background(), exp.Mid); err != nil {
  132. log.Error("s.AccountExp(%v) err(%v)", exp, err)
  133. continue
  134. }
  135. }
  136. }
  137. }
  138. func (s *Service) accregproc() {
  139. defer func() {
  140. if x := recover(); x != nil {
  141. log.Error("s.accregproc panic(%v) %s", x, debug.Stack())
  142. go s.accregproc()
  143. log.Info("s.accregproc recover")
  144. }
  145. }()
  146. var (
  147. err error
  148. msg *databus.Message
  149. msgChan = s.accRegDatabus.Messages()
  150. ok bool
  151. )
  152. for {
  153. select {
  154. case msg, ok = <-msgChan:
  155. if !ok {
  156. log.Info("accproc msgChan closed")
  157. }
  158. }
  159. if err = msg.Commit(); err != nil {
  160. log.Error("msg.Commit err(%v)", err)
  161. }
  162. reg := &model.MsgCanal{}
  163. if err = json.Unmarshal([]byte(msg.Value), reg); err != nil {
  164. log.Error("json.Unmarshal(%v) err(%v)", msg, err)
  165. continue
  166. }
  167. log.Info("reg log %+v", reg)
  168. if reg.Action == _insertAction {
  169. var info struct {
  170. Mid int64 `json:"mid"`
  171. }
  172. if err = json.Unmarshal(reg.New, &info); err != nil {
  173. log.Error("json.Unmarshal(%s) error(%v)", string(reg.New), err)
  174. return
  175. }
  176. if err = s.AccountReg(context.Background(), info.Mid); err != nil {
  177. log.Error("s.AccountReg(%v) err(%v)", reg, err)
  178. continue
  179. }
  180. s.figureDao.SetWaiteUserCache(context.Background(), info.Mid, s.figureDao.Version(time.Now()))
  181. }
  182. }
  183. }
  184. func (s *Service) vipproc() {
  185. defer func() {
  186. if x := recover(); x != nil {
  187. log.Error("s.vipproc panic(%v) %s", x, debug.Stack())
  188. go s.vipproc()
  189. log.Info("s.vipproc recover")
  190. }
  191. }()
  192. var (
  193. err error
  194. msg *databus.Message
  195. msgChan = s.vipDatabus.Messages()
  196. ok bool
  197. )
  198. for {
  199. select {
  200. case msg, ok = <-msgChan:
  201. if !ok {
  202. log.Info("vipproc msgChan closed")
  203. }
  204. }
  205. if err = msg.Commit(); err != nil {
  206. log.Error("msg.Commit err(%v)", err)
  207. }
  208. v := &model.MsgCanal{}
  209. if err = json.Unmarshal([]byte(msg.Value), v); err != nil {
  210. log.Error("json.Unmarshal(%v) err(%v)", v, err)
  211. continue
  212. }
  213. log.Info("vip log %+v", v)
  214. if v.Table == _vipTable {
  215. var vipInfo struct {
  216. Mid int64 `json:"mid"`
  217. VipStatus int32 `json:"vip_status"`
  218. }
  219. if err = json.Unmarshal(v.New, &vipInfo); err != nil {
  220. log.Error("json.Unmarshal(%v) err(%v)", v.New, err)
  221. continue
  222. }
  223. if err = s.UpdateVipStatus(context.Background(), vipInfo.Mid, vipInfo.VipStatus); err != nil {
  224. log.Error("s.UpdateVipStatus(%v) err(%v)", v, err)
  225. continue
  226. }
  227. s.figureDao.SetWaiteUserCache(context.Background(), vipInfo.Mid, s.figureDao.Version(time.Now()))
  228. }
  229. }
  230. }
  231. func (s *Service) spyproc() {
  232. defer func() {
  233. if x := recover(); x != nil {
  234. log.Error("s.spyproc panic(%v) %s", x, debug.Stack())
  235. go s.spyproc()
  236. log.Info("s.spyproc recover")
  237. }
  238. }()
  239. var (
  240. err error
  241. msg *databus.Message
  242. msgChan = s.spyDatabus.Messages()
  243. ok bool
  244. )
  245. for {
  246. select {
  247. case msg, ok = <-msgChan:
  248. if !ok {
  249. log.Info("spyproc msgChan closed")
  250. }
  251. }
  252. if err = msg.Commit(); err != nil {
  253. log.Error("msg.Commit err(%v)", err)
  254. }
  255. sc := &spym.ScoreChange{}
  256. if err = json.Unmarshal([]byte(msg.Value), sc); err != nil {
  257. log.Error("json.Unmarshal(%v) err(%v)", sc, err)
  258. continue
  259. }
  260. log.Info("spy log %+v", sc)
  261. if err = s.PutSpyScore(context.Background(), sc); err != nil {
  262. log.Error("s.PutSpyScore(%v) err(%v)", sc, err)
  263. continue
  264. }
  265. s.figureDao.SetWaiteUserCache(context.Background(), sc.Mid, s.figureDao.Version(time.Now()))
  266. }
  267. }
  268. func (s *Service) coinproc() {
  269. defer func() {
  270. if x := recover(); x != nil {
  271. log.Error("s.coinproc panic(%v) %s", x, debug.Stack())
  272. go s.coinproc()
  273. log.Info("s.coinproc recover")
  274. }
  275. }()
  276. var (
  277. err error
  278. msg *databus.Message
  279. msgChan = s.coinDatabus.Messages()
  280. ok bool
  281. )
  282. for {
  283. select {
  284. case msg, ok = <-msgChan:
  285. if !ok {
  286. log.Info("coinproc msgChan closed")
  287. }
  288. }
  289. if err = msg.Commit(); err != nil {
  290. log.Error("msg.Commit err(%v)", err)
  291. }
  292. cd := &coinm.DataBus{}
  293. if err = json.Unmarshal([]byte(msg.Value), cd); err != nil {
  294. log.Error("json.Unmarshal(%v) err(%v)", cd, err)
  295. continue
  296. }
  297. log.Info("coin log %+v", cd)
  298. if err = s.PutCoinInfo(context.Background(), cd); err != nil {
  299. log.Error("s.PutCoinInfo(%v) err(%v)", cd, err)
  300. continue
  301. }
  302. s.figureDao.SetWaiteUserCache(context.Background(), cd.Mid, s.figureDao.Version(time.Now()))
  303. }
  304. }
  305. func (s *Service) replyinfoproc() {
  306. defer func() {
  307. if x := recover(); x != nil {
  308. log.Error("s.replyinfoproc panic(%v) %s", x, debug.Stack())
  309. go s.replyinfoproc()
  310. log.Info("s.replyinfoproc recover")
  311. }
  312. }()
  313. var (
  314. err error
  315. msg *databus.Message
  316. msgChan = s.replyInfoDatabus.Messages()
  317. ok bool
  318. )
  319. for {
  320. select {
  321. case msg, ok = <-msgChan:
  322. if !ok {
  323. log.Info("replyinfoproc msgChan closed")
  324. }
  325. }
  326. if err = msg.Commit(); err != nil {
  327. log.Error("msg.Commit err(%v)", err)
  328. }
  329. m := &model.ReplyEvent{}
  330. if err = json.Unmarshal([]byte(msg.Value), m); err != nil {
  331. log.Error("json.Unmarshal(%v) err(%v)", m, err)
  332. continue
  333. }
  334. log.Info("reply log %+v", m)
  335. if err = s.PutReplyInfo(context.Background(), m); err != nil {
  336. log.Error("s.PutCoinInfo(%v) err(%v)", m, err)
  337. continue
  338. }
  339. if m.Action == model.EventAdd {
  340. s.figureDao.SetWaiteUserCache(context.Background(), m.Mid, s.figureDao.Version(time.Now()))
  341. } else {
  342. s.figureDao.SetWaiteUserCache(context.Background(), m.Reply.Mid, s.figureDao.Version(time.Now()))
  343. }
  344. }
  345. }
  346. func (s *Service) syncproc() {
  347. defer func() {
  348. if x := recover(); x != nil {
  349. log.Error("s.syncproc panic(%v) %s", x, debug.Stack())
  350. go s.syncproc()
  351. log.Info("s.syncproc recover")
  352. }
  353. }()
  354. if s.c.Figure.Sync {
  355. log.Info("start import data after half hour.")
  356. time.Sleep(5 * time.Minute)
  357. log.Info("start import data.")
  358. var (
  359. vipPath = s.c.Figure.VipPath
  360. files []os.FileInfo
  361. err error
  362. )
  363. if files, err = ioutil.ReadDir(vipPath); err != nil {
  364. log.Error("ioutile.ReadDir(%s) err [%s]", vipPath, err)
  365. }
  366. for _, f := range files {
  367. s.SyncUserVIP(context.TODO(), vipPath+"/"+f.Name())
  368. }
  369. log.Info("end import data.")
  370. }
  371. }
  372. func (s *Service) payproc() {
  373. defer func() {
  374. if x := recover(); x != nil {
  375. log.Error("s.payproc panic(%v) %s", x, debug.Stack())
  376. go s.payproc()
  377. log.Info("s.payproc recover")
  378. }
  379. }()
  380. var (
  381. err error
  382. msg *databus.Message
  383. msgChan = s.payDatabus.Messages()
  384. ok bool
  385. )
  386. for {
  387. select {
  388. case msg, ok = <-msgChan:
  389. if !ok {
  390. log.Info("payproc msgChan closed")
  391. }
  392. }
  393. if err = msg.Commit(); err != nil {
  394. log.Error("msg.Commit err(%v)", err)
  395. }
  396. v := &model.MsgCanal{}
  397. if err = json.Unmarshal([]byte(msg.Value), v); err != nil {
  398. log.Error("json.Unmarshal(%v) err(%v)", v, err)
  399. continue
  400. }
  401. log.Info("pay log %+v", v)
  402. if strings.HasPrefix(v.Table, _payTable) {
  403. var payOrder struct {
  404. Mid int64 `json:"pay_mid"`
  405. Money float64 `json:"bp"`
  406. MerchantID int8 `json:"merchant_id"`
  407. Status int32 `json:"status"`
  408. }
  409. if err = json.Unmarshal(v.New, &payOrder); err != nil {
  410. log.Error("json.Unmarshal(%v) err(%v)", v.New, err)
  411. continue
  412. }
  413. if payOrder.Status != _paySuccess {
  414. continue
  415. }
  416. // update YUAN to Fen
  417. money := int64(payOrder.Money * 100)
  418. if err = s.PayOrderInfo(context.Background(), payOrder.Mid, money, payOrder.MerchantID); err != nil {
  419. log.Error("s.PayOrderInfo(%v) err(%v)", payOrder, err)
  420. continue
  421. }
  422. s.figureDao.SetWaiteUserCache(context.Background(), payOrder.Mid, s.figureDao.Version(time.Now()))
  423. }
  424. }
  425. }
  426. // 风纪委相关
  427. func (s *Service) blockedproc() {
  428. defer func() {
  429. if x := recover(); x != nil {
  430. log.Error("s.blockedproc panic(%v) %s", x, debug.Stack())
  431. go s.blockedproc()
  432. log.Info("s.blockedproc recover")
  433. }
  434. }()
  435. var (
  436. err error
  437. msg *databus.Message
  438. msgChan = s.blockedDatabus.Messages()
  439. ok bool
  440. )
  441. for {
  442. select {
  443. case msg, ok = <-msgChan:
  444. if !ok {
  445. log.Info("blockedproc msgChan closed")
  446. }
  447. }
  448. if err = msg.Commit(); err != nil {
  449. log.Error("msg.Commit err(%v)", err)
  450. }
  451. v := &model.MsgCanal{}
  452. if err = json.Unmarshal([]byte(msg.Value), v); err != nil {
  453. log.Error("json.Unmarshal(%v) err(%v)", v, err)
  454. continue
  455. }
  456. log.Info("blocked log %+v", v)
  457. if v.Table == _blockedTable {
  458. var KPIInfo struct {
  459. Mid int64 `json:"mid"`
  460. Rate int16 `json:"rate"`
  461. }
  462. if err = json.Unmarshal(v.New, &KPIInfo); err != nil {
  463. log.Error("json.Unmarshal(%v) err(%v)", v.New, err)
  464. continue
  465. }
  466. if err = s.BlockedKPIInfo(context.Background(), KPIInfo.Mid, KPIInfo.Rate); err != nil {
  467. log.Error("s.BlockedKPIInfo(%v) err(%v)", KPIInfo, err)
  468. continue
  469. }
  470. s.figureDao.SetWaiteUserCache(context.Background(), KPIInfo.Mid, s.figureDao.Version(time.Now()))
  471. }
  472. }
  473. }
  474. func (s *Service) danmakuproc() {
  475. defer func() {
  476. if x := recover(); x != nil {
  477. log.Error("s.danmakuproc panic(%v) %s", x, debug.Stack())
  478. go s.danmakuproc()
  479. log.Info("s.danmakuproc recover")
  480. }
  481. }()
  482. var (
  483. err error
  484. msg *databus.Message
  485. msgChan = s.danmakuDatabus.Messages()
  486. ok bool
  487. )
  488. for {
  489. select {
  490. case msg, ok = <-msgChan:
  491. if !ok {
  492. log.Info("danmakuproc msgChan closed")
  493. }
  494. }
  495. if err = msg.Commit(); err != nil {
  496. log.Error("msg.Commit err(%+v)", err)
  497. }
  498. m := &model.DMAction{}
  499. if err = json.Unmarshal([]byte(msg.Value), m); err != nil {
  500. log.Error("json.Unmarshal(%v) err(%+v)", m, err)
  501. continue
  502. }
  503. log.Info("danmaku msg %+v", m)
  504. if m.Action == _reportDel {
  505. if err = s.DanmakuReport(context.Background(), m); err != nil {
  506. log.Error("s.DanmakuReport(%v) err(%+v)", m, err)
  507. continue
  508. }
  509. s.figureDao.SetWaiteUserCache(context.Background(), m.Data.OwnerUID, s.figureDao.Version(time.Now()))
  510. s.figureDao.SetWaiteUserCache(context.Background(), m.Data.ReportUID, s.figureDao.Version(time.Now()))
  511. }
  512. }
  513. }