service.go 7.9 KB


  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "sync"
  7. "time"
  8. "go-common/app/job/main/point/conf"
  9. "go-common/app/job/main/point/dao"
  10. "go-common/app/job/main/point/model"
  11. rpcmdl "go-common/app/service/main/point/model"
  12. "go-common/app/service/main/point/rpc/client"
  13. xsql "go-common/library/database/sql"
  14. "go-common/library/log"
  15. "go-common/library/queue/databus"
  16. xtime "go-common/library/time"
  17. )
  // Canal binlog identifiers matched by the consumers in this file.
  const (
  	// _vipPointChange is the table name syncoldpointdataproc looks for in
  	// messages from the old VIP binlog stream.
  	_vipPointChange = "vip_point_change_history"

  	// _pointChange is the table name pointbinlogproc looks for in messages
  	// from the point binlog stream.
  	_pointChange = "point_change_history"

  	// _insert is the only binlog action the consumers act on.
  	_insert = "insert"
  )
  23. // Service struct
  24. type Service struct {
  25. c *conf.Config
  26. dao *dao.Dao
  27. oldVipPointDatabus *databus.Databus
  28. pointDatabus *databus.Databus
  29. pointUpdate *databus.Databus
  30. waiter sync.WaitGroup
  31. closed bool
  32. pointRPC *client.Service
  33. }
  34. // New init
  35. func New(c *conf.Config) (s *Service) {
  36. s = &Service{
  37. c: c,
  38. dao: dao.New(c),
  39. pointRPC: client.New(c.PointRPC),
  40. }
  41. if c.DataBus.OldVipBinlog != nil {
  42. s.oldVipPointDatabus = databus.New(c.DataBus.OldVipBinlog)
  43. s.waiter.Add(1)
  44. go s.syncoldpointdataproc()
  45. }
  46. if c.DataBus.PointBinlog != nil {
  47. s.pointDatabus = databus.New(c.DataBus.PointBinlog)
  48. s.waiter.Add(1)
  49. go s.pointbinlogproc()
  50. }
  51. if c.DataBus.PointUpdate != nil {
  52. s.pointUpdate = databus.New(c.DataBus.PointUpdate)
  53. s.waiter.Add(1)
  54. go s.pointupdateproc()
  55. }
  56. return s
  57. }
  58. // Ping Service
  59. func (s *Service) Ping(c context.Context) (err error) {
  60. return s.dao.Ping(c)
  61. }
  62. // Close Service
  63. func (s *Service) Close() (err error) {
  64. defer s.waiter.Wait()
  65. s.closed = true
  66. s.dao.Close()
  67. if err = s.oldVipPointDatabus.Close(); err != nil {
  68. log.Error("s.oldVipPointDatabus.Close() error(%v)", err)
  69. return
  70. }
  71. if err = s.pointDatabus.Close(); err != nil {
  72. log.Error("s.pointDatabus.Close() error(%v)", err)
  73. return
  74. }
  75. if err = s.pointUpdate.Close(); err != nil {
  76. log.Error("s.pointUpdate.Close() error(%v)", err)
  77. return
  78. }
  79. return
  80. }
  81. func (s *Service) syncoldpointdataproc() {
  82. defer s.waiter.Done()
  83. var (
  84. err error
  85. msg *databus.Message
  86. msgChan = s.oldVipPointDatabus.Messages()
  87. ok bool
  88. c = context.Background()
  89. )
  90. for {
  91. msg, ok = <-msgChan
  92. if s.closed || !ok {
  93. log.Info("syncoldpointdataproc msgChan closed")
  94. return
  95. }
  96. if err = msg.Commit(); err != nil {
  97. log.Error("msg.Commit err(%v)", err)
  98. }
  99. message := &model.MsgCanal{}
  100. if err = json.Unmarshal([]byte(msg.Value), message); err != nil {
  101. log.Error("json.Unmarshal(%v) err(%v)", message, err)
  102. continue
  103. }
  104. if message.Table == _vipPointChange {
  105. history := new(model.VipPointChangeHistoryMsg)
  106. if err = json.Unmarshal(message.New, history); err != nil {
  107. log.Error("json.Unmarshal val(%v) error(%v)", string(message.New), err)
  108. continue
  109. }
  110. if message.Action == _insert {
  111. var count int64
  112. if count, err = s.dao.HistoryCount(c, history.Mid, history.OrderID); err != nil {
  113. log.Error("update point(%v) history(%v)", err, history)
  114. continue
  115. }
  116. if count > 0 {
  117. log.Error("update point change history had repeat record(%v)", history)
  118. continue
  119. }
  120. changeTime, err := time.ParseInLocation("2006-01-02 15:04:05", history.ChangeTime, time.Local)
  121. if err != nil {
  122. log.Error("update point time.ParseInLocation(%s) error(%v)", history.ChangeTime, err)
  123. continue
  124. }
  125. ph := &model.PointHistory{
  126. Mid: history.Mid,
  127. Point: history.Point,
  128. OrderID: history.OrderID,
  129. ChangeType: int(history.ChangeType),
  130. ChangeTime: xtime.Time(changeTime.Unix()),
  131. RelationID: history.RelationID,
  132. PointBalance: history.PointBalance,
  133. Remark: history.Remark,
  134. Operator: history.Operator,
  135. }
  136. s.updatePointWithHistory(c, ph)
  137. }
  138. }
  139. }
  140. }
  141. func (s *Service) pointbinlogproc() {
  142. defer s.waiter.Done()
  143. var (
  144. err error
  145. msg *databus.Message
  146. msgChan = s.pointDatabus.Messages()
  147. ok bool
  148. c = context.Background()
  149. )
  150. for {
  151. msg, ok = <-msgChan
  152. if s.closed || !ok {
  153. log.Info("pointbinlogproc msgChan closed")
  154. return
  155. }
  156. if err = msg.Commit(); err != nil {
  157. log.Error("msg.Commit err(%v)", err)
  158. }
  159. v := &model.MsgCanal{}
  160. if err = json.Unmarshal([]byte(msg.Value), v); err != nil {
  161. log.Error("json.Unmarshal(%v) err(%v)", v, err)
  162. continue
  163. }
  164. if v.Table != _pointChange || v.Action != _insert {
  165. continue
  166. }
  167. h := new(model.VipPointChangeHistoryMsg)
  168. if err = json.Unmarshal(v.New, h); err != nil {
  169. log.Error("json.Unmarshal val(%v) error(%v)", string(v.New), err)
  170. continue
  171. }
  172. log.Info("point change log %+v", h)
  173. s.Notify(c, h)
  174. }
  175. }
  176. func (s *Service) updatePointWithHistory(c context.Context, ph *model.PointHistory) (pointBalance int64, activePoint int64, err error) {
  177. var (
  178. tx *xsql.Tx
  179. )
  180. if tx, err = s.dao.BeginTran(c); err != nil {
  181. log.Error("update point mid(%d) %+v record(%v)", ph.Mid, err, ph)
  182. return
  183. }
  184. defer func() {
  185. if err == nil {
  186. if err = tx.Commit(); err != nil {
  187. log.Error("update point tx.Commit %+v record(%v)", err, ph)
  188. tx.Rollback()
  189. }
  190. } else {
  191. tx.Rollback()
  192. }
  193. }()
  194. if pointBalance, err = s.updatePoint(c, tx, ph.Mid, ph.Point, ph); err != nil {
  195. log.Error("update point mid(%d) %+v record(%v)", ph.Mid, err, ph)
  196. return
  197. }
  198. if _, err = s.dao.InsertPointHistory(c, tx, ph); err != nil {
  199. log.Error("update point mid(%d) %+v record(%v)", ph.Mid, err, ph)
  200. return
  201. }
  202. return
  203. }
  204. func (s *Service) updatePoint(c context.Context, tx *xsql.Tx, mid, point int64, ph *model.PointHistory) (pb int64, err error) {
  205. var (
  206. pi *model.PointInfo
  207. ver int64
  208. a int64
  209. )
  210. if pi, err = s.dao.TxPointInfo(c, tx, mid); err != nil {
  211. log.Error("%+v", err)
  212. return
  213. }
  214. if point == 0 {
  215. return
  216. }
  217. if pi == nil {
  218. if point < 0 {
  219. point = 0
  220. log.Error("update point<0 mid(%d) (%v)", mid, ph)
  221. }
  222. pb = point
  223. pi = new(model.PointInfo)
  224. pi.Ver = 1
  225. pi.PointBalance = point
  226. pi.Mid = mid
  227. if a, err = s.dao.InsertPoint(c, tx, pi); err != nil {
  228. log.Error("%v", err)
  229. return
  230. }
  231. if a != 1 {
  232. err = fmt.Errorf("operation failed")
  233. return
  234. }
  235. } else {
  236. pb = pi.PointBalance + point
  237. if pb < 0 {
  238. pb = 0
  239. log.Error("update point<0 mid(%d)(%v)", mid, ph)
  240. }
  241. pi.PointBalance = pb
  242. ver = pi.Ver
  243. pi.Ver++
  244. if a, err = s.dao.UpdatePointInfo(c, tx, pi, ver); err != nil {
  245. log.Error("%v", err)
  246. return
  247. }
  248. if a != 1 {
  249. err = fmt.Errorf("operation failed")
  250. return
  251. }
  252. }
  253. return
  254. }
  255. func (s *Service) fixdata(mtimeStr string) (err error) {
  256. var (
  257. pis []*model.PointInfo
  258. c = context.TODO()
  259. mtime time.Time
  260. )
  261. log.Info("fixdata start ")
  262. if mtime, err = time.ParseInLocation("2006-01-02 15:04:05", mtimeStr, time.Local); err != nil {
  263. log.Error("fixdata ParseInLocation error(%v)", err)
  264. return
  265. }
  266. if pis, err = s.dao.MidsByMtime(c, mtime); err != nil {
  267. log.Error("fixdata err %+v ", err)
  268. return
  269. }
  270. log.Info("fixdata count %d ", len(pis))
  271. for _, pi := range pis {
  272. log.Info("fixdata ing %v ", pi)
  273. var point int64
  274. if point, err = s.dao.LastOneHistory(c, pi.Mid); err != nil {
  275. log.Error("fixdata history err %+v ", err)
  276. return
  277. }
  278. if point == pi.PointBalance {
  279. continue
  280. }
  281. log.Info("fixdata mid(%d) %+v ", pi.Mid, err)
  282. var af int64
  283. if af, err = s.dao.FixPointInfo(c, pi.Mid, point); err != nil {
  284. log.Error("fixdata FixPointInfo mid(%d) %+v ", pi.Mid, err)
  285. return
  286. }
  287. if af != 1 {
  288. log.Error("fixdata af!=1 mid(%d)", pi.Mid)
  289. return
  290. }
  291. }
  292. log.Info("fixdata end ")
  293. return
  294. }
  295. func (s *Service) pointupdateproc() {
  296. defer s.waiter.Done()
  297. var (
  298. err error
  299. msg *databus.Message
  300. msgChan = s.pointUpdate.Messages()
  301. ok bool
  302. c = context.Background()
  303. )
  304. for {
  305. msg, ok = <-msgChan
  306. if !ok {
  307. log.Info("pointupdateproc msgChan closed")
  308. return
  309. }
  310. if err = msg.Commit(); err != nil {
  311. log.Error("msg.Commit err(%v)", err)
  312. }
  313. v := &rpcmdl.ArgPointAdd{}
  314. if err = json.Unmarshal([]byte(msg.Value), v); err != nil {
  315. log.Error("json.Unmarshal(%v) err(%v)", v, err)
  316. continue
  317. }
  318. s.pointRPC.PointAddByBp(c, v)
  319. }
  320. }