ticket.go 12 KB


  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "go-common/app/service/openplatform/ticket-sales/model"
  8. "go-common/app/service/openplatform/ticket-sales/model/consts"
  9. "go-common/library/cache/redis"
  10. "go-common/library/log"
  11. "go-common/library/xstr"
  12. )
  13. //票号相关常量
  14. const (
  15. sqlCountRefundTicket = "SELECT COUNT(*) FROM ticket WHERE oid IN (%s) AND refund_status=?"
  16. _selectTicketByOrderIDSQL = "SELECT id, uid, oid, sid, price, src, type, status, qr, ref_id, sku_id, seat_id, seat, etime, refund_apply_time, ctime, mtime FROM ticket WHERE oid=?"
  17. _selectTicketByScreenIDSQL = "SELECT id, uid, oid, sid, price, src, type, status, qr, ref_id, sku_id, seat_id, seat, etime, refund_apply_time, ctime, mtime FROM ticket WHERE sid=? AND uid=? AND type != ?"
  18. _selectTicketByIDSQL = "SELECT id, uid, oid, sid, price, src, type, status, qr, ref_id, sku_id, seat_id, seat, etime, refund_apply_time, ctime, mtime FROM ticket WHERE id IN (%s)"
  19. _updateTicketStatusSQL = "UPDATE ticket set status=? WHERE id IN (%s)"
  20. _selectTicketSendBySendTID = "SELECT id, sid, send_tid, recv_tid, send_uid, recv_uid, recv_tel, status, ctime, mtime, oid FROM ticket_send WHERE send_tid IN (%s)"
  21. _selectTicketSendByRecvTID = "SELECT id, sid, send_tid, recv_tid, send_uid, recv_uid, recv_tel, status, ctime, mtime, oid FROM ticket_send WHERE recv_tid IN (%s)"
  22. )
  23. //rawRefundTicketCnt 统计用户已退票数
  24. func (d *Dao) rawRefundTicketCnt(ctx context.Context, oids []int64) (cnt int64, err error) {
  25. lo := len(oids)
  26. if lo == 0 {
  27. return
  28. }
  29. q := fmt.Sprintf(sqlCountRefundTicket, strings.Repeat(",?", lo)[1:])
  30. a := make([]interface{}, lo+1)
  31. a[lo] = consts.TkStatusRefunded
  32. for k, v := range oids {
  33. a[k] = v
  34. }
  35. err = d.db.QueryRow(ctx, q, a...).Scan(&cnt)
  36. return
  37. }
  38. // CacheTicketsByOrderID 通过 order_id 获取 tickets 取缓存
  39. func (d *Dao) CacheTicketsByOrderID(c context.Context, orderID int64) (res []*model.Ticket, err error) {
  40. conn := d.redis.Get(c)
  41. defer conn.Close()
  42. reply, err := redis.Bytes(conn.Do("GET", fmt.Sprintf(model.CacheKeyOrderTickets, orderID)))
  43. if err != nil {
  44. if err == redis.ErrNil {
  45. err = nil
  46. return
  47. }
  48. log.Error("d.CacheTicketsByOrderID(%d) error(%v)", orderID, err)
  49. return
  50. }
  51. if err = json.Unmarshal(reply, &res); err != nil {
  52. log.Error("d.CacheTicketsByOrderID(%d) json.Unmarshal() error(%v)", orderID, err)
  53. return
  54. }
  55. return
  56. }
  57. // RawTicketsByOrderID 通过 order_id 获取 tickets
  58. func (d *Dao) RawTicketsByOrderID(c context.Context, orderID int64) (res []*model.Ticket, err error) {
  59. rows, err := d.db.Query(c, _selectTicketByOrderIDSQL, orderID)
  60. if err != nil {
  61. log.Error("d.TicketsByOrderID(%v) d.db.Query() error(%v)", orderID, err)
  62. return
  63. }
  64. defer rows.Close()
  65. for rows.Next() {
  66. ticket := &model.Ticket{}
  67. if err = rows.Scan(&ticket.ID, &ticket.UID, &ticket.OID, &ticket.SID, &ticket.Price, &ticket.Src, &ticket.Type,
  68. &ticket.Status, &ticket.Qr, &ticket.RefID, &ticket.SkuID, &ticket.SeatID,
  69. &ticket.Seat, &ticket.ETime, &ticket.RefundApplyTime, &ticket.CTime, &ticket.MTime); err != nil {
  70. log.Error("d.TicketsByOrderID(%v) rows.Scan() error(%v)", orderID, err)
  71. return
  72. }
  73. res = append(res, ticket)
  74. }
  75. return
  76. }
  77. // AddCacheTicketsByOrderID 通过 order_id 获取 tickets 写缓存
  78. func (d *Dao) AddCacheTicketsByOrderID(c context.Context, orderID int64, tickets []*model.Ticket) (err error) {
  79. conn := d.redis.Get(c)
  80. defer conn.Close()
  81. val, err := json.Marshal(tickets)
  82. if err != nil {
  83. log.Error("d.AddCacheTicketsByScreen() error(%v)", err)
  84. return
  85. }
  86. if _, err = conn.Do("SETEX", fmt.Sprintf(model.CacheKeyOrderTickets, orderID), model.RedisExpireOneDayTmp, val); err != nil {
  87. log.Error("d.AddCacheTicketsByOrderID(%d, %+v) error(%v)", orderID, tickets, err)
  88. return
  89. }
  90. return
  91. }
  92. // CacheTicketsByScreen 通过 screen_id user_id 获取 tickets
  93. func (d *Dao) CacheTicketsByScreen(c context.Context, screenID int64, UID int64) (res []*model.Ticket, err error) {
  94. conn := d.redis.Get(c)
  95. defer conn.Close()
  96. reply, err := redis.Bytes(conn.Do("GET", fmt.Sprintf(model.CacheKeyScreenTickets, screenID, UID)))
  97. if err != nil {
  98. if err == redis.ErrNil {
  99. err = nil
  100. return
  101. }
  102. log.Error("d.CacheTicketsByScreen(%d, %d) error(%v)", screenID, UID, err)
  103. return
  104. }
  105. if err = json.Unmarshal(reply, &res); err != nil {
  106. log.Error("d.CacheTicketsByScreen(%d, %d) json.Unmarshal() error(%v)", screenID, UID, err)
  107. return
  108. }
  109. return
  110. }
  111. // RawTicketsByScreen 通过 screen_id user_id 获取 tickets
  112. func (d *Dao) RawTicketsByScreen(c context.Context, screenID int64, UID int64) (res []*model.Ticket, err error) {
  113. rows, err := d.db.Query(c, _selectTicketByScreenIDSQL, screenID, UID, consts.TkTypeDistrib)
  114. if err != nil {
  115. log.Error("d.RawTicketsByScreen(%d, %d) error(%v)", screenID, UID, err)
  116. return
  117. }
  118. defer rows.Close()
  119. for rows.Next() {
  120. ticket := &model.Ticket{}
  121. if err = rows.Scan(&ticket.ID, &ticket.UID, &ticket.OID, &ticket.SID, &ticket.Price, &ticket.Src, &ticket.Type,
  122. &ticket.Status, &ticket.Qr, &ticket.RefID, &ticket.SkuID, &ticket.SeatID,
  123. &ticket.Seat, &ticket.ETime, &ticket.RefundApplyTime, &ticket.CTime, &ticket.MTime); err != nil {
  124. log.Error("d.RawTicketsByScreen(%d, %d) rows.Scan() error(%v)", screenID, UID, err)
  125. return
  126. }
  127. res = append(res, ticket)
  128. }
  129. return
  130. }
  131. // AddCacheTicketsByScreen 通过 screen_id user_id 获取 tickets
  132. func (d *Dao) AddCacheTicketsByScreen(c context.Context, screenID int64, tickets []*model.Ticket, UID int64) (err error) {
  133. conn := d.redis.Get(c)
  134. defer conn.Close()
  135. val, err := json.Marshal(tickets)
  136. if err != nil {
  137. log.Error("d.AddCacheTicketsByScreen() error(%v)", err)
  138. return
  139. }
  140. if _, err = conn.Do("SETEX", fmt.Sprintf(model.CacheKeyScreenTickets, screenID, UID), model.RedisExpireOneDayTmp, val); err != nil {
  141. log.Error("d.AddCacheTicketsByScreen(%d, %d, %+v) error(%v)", screenID, UID, tickets, err)
  142. return
  143. }
  144. return
  145. }
  146. // CacheTicketsByID .
  147. func (d *Dao) CacheTicketsByID(c context.Context, ticketID []int64) (res map[int64]*model.Ticket, err error) {
  148. if len(ticketID) == 0 {
  149. return
  150. }
  151. conn := d.redis.Get(c)
  152. defer conn.Close()
  153. keys := make([]interface{}, 0)
  154. for _, ID := range ticketID {
  155. keys = append(keys, fmt.Sprintf(model.CacheKeyTicket, ID))
  156. }
  157. reply, err := redis.ByteSlices(conn.Do("MGET", keys...))
  158. if err != nil {
  159. if err == redis.ErrNil {
  160. err = nil
  161. return
  162. }
  163. log.Error("d.CacheTicketsByID(%v) conn.Do() error(%v)", ticketID, err)
  164. return
  165. }
  166. res = make(map[int64]*model.Ticket)
  167. for _, item := range reply {
  168. if len(item) == 0 {
  169. continue
  170. }
  171. ticket := &model.Ticket{}
  172. if err = json.Unmarshal(item, ticket); err != nil {
  173. log.Error("d.CacheTicketsByID(%v) json.Unmarshal(%s) error(%v)", ticketID, item, err)
  174. continue
  175. }
  176. res[ticket.ID] = ticket
  177. }
  178. return
  179. }
  180. // RawTicketsByID .
  181. func (d *Dao) RawTicketsByID(c context.Context, ticketID []int64) (res map[int64]*model.Ticket, err error) {
  182. if len(ticketID) == 0 {
  183. return
  184. }
  185. rows, err := d.db.Query(c, fmt.Sprintf(_selectTicketByIDSQL, xstr.JoinInts(ticketID)))
  186. if err != nil {
  187. log.Error("d.RawTicketsByID(%v) d.db.Query() error(%v)", ticketID, err)
  188. return
  189. }
  190. defer rows.Close()
  191. res = make(map[int64]*model.Ticket)
  192. for rows.Next() {
  193. ticket := &model.Ticket{}
  194. if err = rows.Scan(&ticket.ID, &ticket.UID, &ticket.OID, &ticket.SID, &ticket.Price, &ticket.Src, &ticket.Type,
  195. &ticket.Status, &ticket.Qr, &ticket.RefID, &ticket.SkuID, &ticket.SeatID,
  196. &ticket.Seat, &ticket.ETime, &ticket.RefundApplyTime, &ticket.CTime, &ticket.MTime); err != nil {
  197. log.Error("d.RawTicketsByID(%v) rows.Scan() error(%v)", ticketID, err)
  198. return
  199. }
  200. res[ticket.ID] = ticket
  201. }
  202. return
  203. }
  204. // AddCacheTicketsByID .
  205. func (d *Dao) AddCacheTicketsByID(c context.Context, tickets map[int64]*model.Ticket) (err error) {
  206. if len(tickets) == 0 {
  207. return
  208. }
  209. conn := d.redis.Get(c)
  210. defer func() {
  211. conn.Flush()
  212. conn.Close()
  213. }()
  214. args := make([]interface{}, 0)
  215. for ID, ticket := range tickets {
  216. var b []byte
  217. if b, err = json.Marshal(ticket); err != nil {
  218. log.Error("d.AddCacheTicketsByID(%+v) json.Marshal(%+v) error(%v)", tickets, ticket, err)
  219. continue
  220. }
  221. args = append(args, fmt.Sprintf(model.CacheKeyTicket, ID), b)
  222. }
  223. if err = conn.Send("MSET", args...); err != nil {
  224. log.Error("d.AddCacheTicketsByID(%+v) conn.Send() error(%v)", tickets, err)
  225. return
  226. }
  227. for ID := range tickets {
  228. conn.Send("EXPIRE", fmt.Sprintf(model.CacheKeyTicket, ID), model.RedisExpireTenMinTmp)
  229. }
  230. return
  231. }
  232. // UpdateTicketStatus 更新票状态
  233. func (d *Dao) UpdateTicketStatus(c context.Context, status int16, ticketID ...int64) (err error) {
  234. if len(ticketID) == 0 {
  235. return
  236. }
  237. if _, err = d.db.Exec(c, fmt.Sprintf(_updateTicketStatusSQL, xstr.JoinInts(ticketID)), status); err != nil {
  238. log.Error("d.UpdateTicketStatus(%d, %v) error(%v)", status, ticketID, err)
  239. }
  240. return
  241. }
  242. // DelTicketCache 删除单张电子票全部 cache
  243. func (d *Dao) DelTicketCache(c context.Context, tickets ...*model.Ticket) (err error) {
  244. if len(tickets) == 0 {
  245. return
  246. }
  247. var keys []interface{}
  248. for _, ticket := range tickets {
  249. keys = append(
  250. keys,
  251. fmt.Sprintf(model.CacheKeyOrderTickets, ticket.OID),
  252. fmt.Sprintf(model.CacheKeyScreenTickets, ticket.SID, ticket.UID),
  253. fmt.Sprintf(model.CacheKeyTicket, ticket.ID),
  254. fmt.Sprintf(model.CacheKeyTicketQr, ticket.Qr),
  255. )
  256. }
  257. if err = d.RedisDel(c, keys...); err != nil {
  258. log.Error("d.DelTicketCache() d.RedisDel(%v) error(%v)", keys, err)
  259. }
  260. return
  261. }
  262. // CacheTicketSend .
  263. func (d *Dao) CacheTicketSend(c context.Context, IDs []int64, TIDType string) (res map[int64]*model.TicketSend, err error) {
  264. var cacheKey string
  265. switch TIDType {
  266. case consts.TIDTypeSend:
  267. cacheKey = model.CacheKeyTicketSend
  268. case consts.TIDTypeRecv:
  269. cacheKey = model.CacheKeyTicketRecv
  270. default:
  271. return
  272. }
  273. conn := d.redis.Get(c)
  274. defer conn.Close()
  275. keys := make([]interface{}, 0)
  276. for _, ID := range IDs {
  277. keys = append(keys, fmt.Sprintf(cacheKey, ID))
  278. }
  279. reply, err := redis.ByteSlices(conn.Do("MGET", keys...))
  280. if err != nil {
  281. if err == redis.ErrNil {
  282. err = nil
  283. return
  284. }
  285. log.Error("d.CacheTicketSend(%v, %s) conn.Do() error(%v)", IDs, TIDType, err)
  286. return
  287. }
  288. res = make(map[int64]*model.TicketSend)
  289. for _, item := range reply {
  290. if len(item) == 0 {
  291. continue
  292. }
  293. tmp := &model.TicketSend{}
  294. if err = json.Unmarshal(item, tmp); err != nil {
  295. log.Error("d.CacheTicketSend() json.Unmarshal(%s) error(%v)", item, err)
  296. continue
  297. }
  298. switch TIDType {
  299. case consts.TIDTypeSend:
  300. res[tmp.SendTID] = tmp
  301. case consts.TIDTypeRecv:
  302. res[tmp.RecvTID] = tmp
  303. default:
  304. return
  305. }
  306. }
  307. return
  308. }
  309. // RawTicketSend .
  310. func (d *Dao) RawTicketSend(c context.Context, IDs []int64, TIDType string) (res map[int64]*model.TicketSend, err error) {
  311. if len(IDs) == 0 {
  312. return
  313. }
  314. var sql string
  315. switch TIDType {
  316. case consts.TIDTypeSend:
  317. sql = _selectTicketSendBySendTID
  318. case consts.TIDTypeRecv:
  319. sql = _selectTicketSendByRecvTID
  320. default:
  321. return
  322. }
  323. rows, err := d.db.Query(c, fmt.Sprintf(sql, xstr.JoinInts(IDs)))
  324. if err != nil {
  325. log.Error("d.RawTicketSend(%v, %s) d.db.Query() error(%v)", IDs, TIDType, err)
  326. return
  327. }
  328. defer rows.Close()
  329. res = make(map[int64]*model.TicketSend)
  330. for rows.Next() {
  331. tmp := &model.TicketSend{}
  332. if err = rows.Scan(&tmp.ID, &tmp.SID, &tmp.SendTID, &tmp.RecvTID, &tmp.SendUID, &tmp.RecvUID, &tmp.RecvTel, &tmp.Status, &tmp.CTime, &tmp.MTime, &tmp.OID); err != nil {
  333. log.Error("d.RawTicketSend(%v, %s) rows.Scan() error(%v)", IDs, TIDType, err)
  334. return
  335. }
  336. switch TIDType {
  337. case consts.TIDTypeSend:
  338. res[tmp.SendTID] = tmp
  339. case consts.TIDTypeRecv:
  340. res[tmp.RecvTID] = tmp
  341. default:
  342. return
  343. }
  344. }
  345. return
  346. }
  347. // AddCacheTicketSend .
  348. func (d *Dao) AddCacheTicketSend(c context.Context, tsMap map[int64]*model.TicketSend, TIDType string) (err error) {
  349. var cacheKey string
  350. switch TIDType {
  351. case consts.TIDTypeSend:
  352. cacheKey = model.CacheKeyTicketSend
  353. case consts.TIDTypeRecv:
  354. cacheKey = model.CacheKeyTicketRecv
  355. default:
  356. return
  357. }
  358. conn := d.redis.Get(c)
  359. defer func() {
  360. conn.Flush()
  361. conn.Close()
  362. }()
  363. var args []interface{}
  364. for ID, item := range tsMap {
  365. var b []byte
  366. if b, err = json.Marshal(item); err != nil {
  367. log.Error("d.AddCacheTicketSend(%v, %s), json.Marshal(%s) error(%v)", tsMap, TIDType, b, err)
  368. continue
  369. }
  370. args = append(args, fmt.Sprintf(cacheKey, ID), b)
  371. }
  372. if err = conn.Send("MSET", args...); err != nil {
  373. log.Error("d.AddCacheTicketsByID(%+v) conn.Send() error(%v)", tsMap, err)
  374. return
  375. }
  376. for ID := range tsMap {
  377. conn.Send("EXPIRE", fmt.Sprintf(cacheKey, ID), model.RedisExpireOneDayTmp)
  378. }
  379. return
  380. }