// stock.go (17 KB)

  1. package dao
  2. import (
  3. "context"
  4. "database/sql"
  5. "encoding/json"
  6. "fmt"
  7. "strings"
  8. "time"
  9. "go-common/app/service/openplatform/ticket-sales/model"
  10. "go-common/library/cache/redis"
  11. xsql "go-common/library/database/sql"
  12. "go-common/library/log"
  13. "go-common/library/xstr"
  14. )
  15. const (
  16. _lockStockSQL = "UPDATE sku_stock SET stock=stock-?, locked_stock=locked_stock+? WHERE sku_id=? AND stock>=?"
  17. _unlockStockSQL = "UPDATE sku_stock SET stock=stock+?, locked_stock=locked_stock-? WHERE sku_id=? AND stock<=total_stock-? AND locked_stock>=?"
  18. _decrStockSQL = "UPDATE sku_stock SET stock=stock-? WHERE sku_id=? AND stock>=?"
  19. _incrStockSQL = "UPDATE sku_stock SET stock=stock+? WHERE sku_id=? AND stock<=total_stock-?"
  20. _decrStockLockedSQL = "UPDATE sku_stock SET locked_stock=locked_stock-? WHERE sku_id=? AND total_stock>=?"
  21. _getStockBySkuIDSQL = "SELECT sku_id, parent_sku_id, item_id, specs, total_stock, stock, locked_stock, sk_alert, ctime, mtime FROM sku_stock WHERE sku_id=? LIMIT 1"
  22. _getStocksBySkuIDSQL = "SELECT sku_id, parent_sku_id, item_id, specs, total_stock, stock, locked_stock, sk_alert, ctime, mtime FROM sku_stock WHERE sku_id IN (%s)"
  23. _getStockByItemIDSQL = "SELECT sku_id, parent_sku_id, item_id, specs, total_stock, stock, locked_stock, sk_alert, ctime, mtime FROM sku_stock WHERE item_id=?"
  24. _getStockBySpecsSQL = "SELECT sku_id, parent_sku_id, item_id, specs, total_stock, stock, locked_stock, sk_alert, ctime, mtime FROM sku_stock WHERE item_id=? AND specs=? LIMIT 1"
  25. _insertStockSQL = "INSERT INTO sku_stock (sku_id, parent_sku_id, item_id, specs, total_stock, stock, locked_stock, sk_alert) VALUES %s"
  26. _resetStockSQL = "UPDATE sku_stock SET stock=stock+(?-total_stock),total_stock=?, sk_alert=? WHERE sku_id=? AND stock>=(total_stock-?)"
  27. _getStockBySkuID = "SELECT sku_id, stock, locked_stock FROM sku_stock WHERE sku_id IN (%s)"
  28. _insertSKUStockLog = "INSERT INTO sku_stock_log (sku_id, op_type, src_id, stock) VALUES %s"
  29. _selectSKUStockLog = "SELECT id, sku_id, op_type, src_id, stock FROM sku_stock_log WHERE src_id=? AND op_type=? AND sku_id IN(%s)"
  30. _rollbackSKUStockLog = "UPDATE sku_stock_log set canceled_at=? where id=? AND canceled_at=0"
  31. )
  32. // StockLock lock stock
  33. func (d *Dao) StockLock(c context.Context, skuID int64, cnt int64) (err error) {
  34. _, err = d.db.Exec(c, _lockStockSQL, cnt, cnt, skuID, cnt)
  35. if err != nil {
  36. log.Error("d.StockLock(%d, %d) error(%v)", skuID, cnt, err)
  37. }
  38. return
  39. }
  40. // TxStockLock lock stock with tx
  41. func (d *Dao) TxStockLock(tx *xsql.Tx, skuID int64, cnt int64) (affected int64, err error) {
  42. res, err := tx.Exec(_lockStockSQL, cnt, cnt, skuID, cnt)
  43. if err != nil {
  44. log.Error("d.TxStockLock(%d, %d) error(%v)", skuID, cnt, err)
  45. return
  46. }
  47. affected, err = res.RowsAffected()
  48. if err != nil {
  49. log.Error("d.TxStockLock(%d, %d) res.RowsAffected() error(%v)", skuID, cnt, err)
  50. return
  51. }
  52. return
  53. }
  54. // StockDecr 减库存 DB
  55. func (d *Dao) StockDecr(c context.Context, skuID int64, num int64) (affected int64, err error) {
  56. res, err := d.db.Exec(c, _decrStockSQL, num, skuID, num)
  57. if err != nil {
  58. log.Error("d.StockDecr(%d, %d) error(%v)", skuID, num, err)
  59. return
  60. }
  61. affected, err = res.RowsAffected()
  62. if err != nil {
  63. log.Error("d.StockDecr(%d, %d) res.RowsAffected() error(%v)", skuID, num, err)
  64. return
  65. }
  66. return
  67. }
  68. // TxStockDecr 减库存 DB
  69. func (d *Dao) TxStockDecr(tx *xsql.Tx, skuID int64, num int64) (affected int64, err error) {
  70. res, err := tx.Exec(_decrStockSQL, num, skuID, num)
  71. if err != nil {
  72. log.Error("d.TxStockDecr(%d, %d) error(%v)", skuID, num, err)
  73. return
  74. }
  75. affected, err = res.RowsAffected()
  76. if err != nil {
  77. log.Error("d.TxStockDecr(%d, %d) res.RowsAffected() error(%v)", skuID, num, err)
  78. return
  79. }
  80. return
  81. }
  82. // StockCacheDecr 减库存缓存
  83. func (d *Dao) StockCacheDecr(c context.Context, skuID int64, total int64) (err error) {
  84. if err = d.RedisDecrExist(c, fmt.Sprintf(model.CacheKeyStock, skuID), total); err != nil {
  85. log.Error("d.StockCacheDecr(%d) error(%v)", skuID, err)
  86. }
  87. return
  88. }
  89. // StockLockedCacheDecr 减锁定库存缓存
  90. func (d *Dao) StockLockedCacheDecr(c context.Context, skuID int64, total int64) (err error) {
  91. if err = d.RedisDecrExist(c, fmt.Sprintf(model.CacheKeyStockL, skuID), total); err != nil {
  92. log.Error("d.StockLockedCacheDecr(%d) error(%v)", skuID, err)
  93. }
  94. return
  95. }
  96. // StockCacheDel 删除库存缓存
  97. func (d *Dao) StockCacheDel(c context.Context, skuID int64) (err error) {
  98. if err = d.RedisDel(c, fmt.Sprintf(model.CacheKeyStock, skuID)); err != nil {
  99. log.Error("d.StockCacheDel(%d) error(%v)", skuID, err)
  100. }
  101. return
  102. }
  103. // DelCacheSku 删除 skuId => sku 缓存
  104. func (d *Dao) DelCacheSku(c context.Context, skuID int64) (err error) {
  105. if err = d.RedisDel(c, fmt.Sprintf(model.CacheKeySku, skuID)); err != nil {
  106. log.Error("d.StockCacheDel(%d) error(%v)", skuID, err)
  107. }
  108. return
  109. }
  110. // AddStockLog TxAddStockLog
  111. func (d *Dao) AddStockLog() {
  112. }
  113. // TxAddStockLog 添加库存操作日志
  114. func (d *Dao) TxAddStockLog(tx *xsql.Tx, stockLogs ...*model.SKUStockLog) (err error) {
  115. if len(stockLogs) == 0 {
  116. return
  117. }
  118. placeholder := strings.Trim(strings.Repeat("(?, ?, ?, ?),", len(stockLogs)), ",")
  119. var values []interface{}
  120. for _, stockLog := range stockLogs {
  121. values = append(values, stockLog.SKUID, stockLog.OpType, stockLog.SrcID, stockLog.Stock)
  122. }
  123. if _, err = tx.Exec(fmt.Sprintf(_insertSKUStockLog, placeholder), values...); err != nil {
  124. log.Error("d.TxAddStockLog() error(%v)", err)
  125. return
  126. }
  127. return
  128. }
  129. // TxStockUnlock 解锁库存(减去锁定库存增加库存)
  130. func (d *Dao) TxStockUnlock(tx *xsql.Tx, skuID int64, count int64) (affected int64, err error) {
  131. res, err := tx.Exec(_unlockStockSQL, count, count, skuID, count, count)
  132. if err != nil {
  133. log.Error("d.TxStockUnlock(%d, %d) error(%v)", skuID, count, err)
  134. return
  135. }
  136. if affected, err = res.RowsAffected(); err != nil {
  137. log.Error("d.TxStockUnlock(%d, %d) res.RowsAffected() error(%v)", skuID, count, err)
  138. }
  139. return
  140. }
  141. // TxStockIncr 增加库存
  142. func (d *Dao) TxStockIncr(tx *xsql.Tx, skuID int64, count int64) (affected int64, err error) {
  143. res, err := tx.Exec(_incrStockSQL, count, skuID, count)
  144. if err != nil {
  145. log.Error("d.TxStockIncr(%d, %d) error(%v)", skuID, count, err)
  146. return
  147. }
  148. if affected, err = res.RowsAffected(); err != nil {
  149. log.Error("d.TxStockIncr(%d, %d) res.RowsAffected() error(%v)", skuID, count, err)
  150. }
  151. return
  152. }
  153. // TxStockLockedDecr 减去锁定库存
  154. func (d *Dao) TxStockLockedDecr(tx *xsql.Tx, skuID int64, count int64) (affected int64, err error) {
  155. res, err := tx.Exec(_decrStockLockedSQL, count, skuID, count)
  156. if err != nil {
  157. log.Error("d.TxStockLockedDecr(%d, %d) error(%v)", skuID, count, err)
  158. return
  159. }
  160. if affected, err = res.RowsAffected(); err != nil {
  161. log.Error("d.TxStockLockedDecr(%d, %d) res.RowsAffected() error(%v)", skuID, count, err)
  162. }
  163. return
  164. }
  165. // Stock 查询库存信息
  166. func (d *Dao) Stock(c context.Context, skuID int64) (stock *model.SKUStock, err error) {
  167. stock = new(model.SKUStock)
  168. if err = d.db.QueryRow(c, _getStockBySkuIDSQL, skuID).Scan(&stock.SKUID, &stock.ParentSKUID, &stock.ItemID, &stock.Specs, &stock.TotalStock, &stock.Stock, &stock.LockedStock, &stock.SkAlert, &stock.Ctime, &stock.Mtime); err != nil {
  169. log.Error("d.Stock(%d), error(%v)", skuID, err)
  170. }
  171. return
  172. }
  173. // StockLogs 查询库存操作记录
  174. func (d *Dao) StockLogs(c context.Context, opType int16, srcID int64, skuIDs ...int64) (stockLogs []*model.SKUStockLog, err error) {
  175. if len(skuIDs) == 0 {
  176. return
  177. }
  178. rows, err := d.db.Query(c, fmt.Sprintf(_selectSKUStockLog, xstr.JoinInts(skuIDs)), srcID, opType)
  179. if err != nil {
  180. log.Error("d.StockLogs() error(%v)", err)
  181. return
  182. }
  183. defer rows.Close()
  184. stockLogs = make([]*model.SKUStockLog, 0)
  185. for rows.Next() {
  186. stockLog := &model.SKUStockLog{}
  187. if err = rows.Scan(&stockLog.ID, &stockLog.SKUID, &stockLog.OpType, &stockLog.SrcID, &stockLog.Stock); err != nil {
  188. log.Error("d.StockLogs() rows.Scan() error(%v)", err)
  189. return
  190. }
  191. stockLogs = append(stockLogs)
  192. }
  193. return
  194. }
  195. // TxAddStockInsert 插入 stock 数据
  196. func (d *Dao) TxAddStockInsert(tx *xsql.Tx, stocks ...*model.SKUStock) (affected int64, err error) {
  197. placeholder := strings.Trim(strings.Repeat("(?, ?, ?, ?, ?, ?, ?, ?),", len(stocks)), ",")
  198. var values []interface{}
  199. for _, stock := range stocks {
  200. values = append(values, stock.SKUID, stock.ParentSKUID, stock.ItemID, stock.Specs, stock.TotalStock, stock.Stock, stock.LockedStock, stock.SkAlert)
  201. }
  202. res, err := tx.Exec(fmt.Sprintf(_insertStockSQL, placeholder), values...)
  203. if err != nil {
  204. log.Error("d.TxStockInsert() error(%v)", err)
  205. return
  206. }
  207. if affected, err = res.RowsAffected(); err != nil {
  208. log.Error("d.TxStockInsert() res.RowsAffected() error(%v)", err)
  209. return
  210. }
  211. return
  212. }
  213. // TxStockReset 重置库存
  214. func (d *Dao) TxStockReset(tx *xsql.Tx, stock *model.SKUStock) (affected int64, err error) {
  215. res, err := tx.Exec(_resetStockSQL, stock.TotalStock, stock.TotalStock, stock.SkAlert, stock.SKUID, stock.TotalStock)
  216. fmt.Println(_resetStockSQL, stock.TotalStock, stock.TotalStock, stock.SkAlert, stock.SKUID, stock.TotalStock)
  217. if err != nil {
  218. log.Error("d.TxStockReset() error(%v)", err)
  219. return
  220. }
  221. if affected, err = res.RowsAffected(); err != nil {
  222. log.Error("d.TxStockReset() res.RowsAffected() error(%v)", err)
  223. return
  224. }
  225. return
  226. }
  227. // TxStockLogRollBack 回滚操作日志
  228. func (d *Dao) TxStockLogRollBack(tx *xsql.Tx, stockLogID int64) (affected int64, err error) {
  229. res, err := tx.Exec(_rollbackSKUStockLog, time.Now().Unix(), stockLogID)
  230. if err != nil {
  231. log.Error("d.TxStockLogRollBack(%d) error(%v)", stockLogID, err)
  232. return
  233. }
  234. if affected, err = res.RowsAffected(); err != nil {
  235. log.Error("d.TxStockLogRollBack(%d) res.RowsAffected() error(%v)", stockLogID, err)
  236. }
  237. return
  238. }
  239. // SkuItemCacheDel 删除 itemId => sku 缓存
  240. func (d *Dao) SkuItemCacheDel(c context.Context, itemID int64) (err error) {
  241. if err = d.RedisDel(c, fmt.Sprintf(model.CacheKeyItemSku, itemID)); err != nil {
  242. log.Error("d.SkuItemCacheDel(%d) error(%v)", itemID, err)
  243. }
  244. return
  245. }
  246. // SkuByItemSpecs 通过 itemID specs 获取单个 sku
  247. func (d *Dao) SkuByItemSpecs(c context.Context, itemID int64, specs string) (stock *model.SKUStock, err error) {
  248. res, err := d.SkuByItemID(c, itemID)
  249. if err != nil {
  250. log.Error("d.SkuByItemSpecs(%d, %s) d.SkuByItemID() error(%v)", itemID, specs, err)
  251. return
  252. }
  253. if item, ok := res[specs]; ok {
  254. stock = item
  255. }
  256. return
  257. }
  258. // RawSkuByItemSpecs 根据 itemID 和规格获取单个 sku
  259. func (d *Dao) RawSkuByItemSpecs(c context.Context, itemID int64, specs string) (stock *model.SKUStock, err error) {
  260. stock = new(model.SKUStock)
  261. if err = d.db.QueryRow(c, _getStockBySpecsSQL, itemID, specs).Scan(&stock.SKUID, &stock.ParentSKUID, &stock.ItemID, &stock.Specs, &stock.TotalStock, &stock.Stock, &stock.LockedStock, &stock.SkAlert, &stock.Ctime, &stock.Mtime); err != nil {
  262. if err != sql.ErrNoRows {
  263. log.Error("d.SkuByItemSpecs(%d, %s) error(%v)", itemID, specs, err)
  264. }
  265. }
  266. return
  267. }
  268. // RawSkuByItemID 根据规格获取 sku
  269. func (d *Dao) RawSkuByItemID(c context.Context, itemID int64) (stocks map[string]*model.SKUStock, err error) {
  270. rows, err := d.db.Query(c, _getStockByItemIDSQL, itemID)
  271. if err != nil {
  272. log.Error("d.RawSkuByItemID(%d) error(%v)", itemID, err)
  273. return
  274. }
  275. defer rows.Close()
  276. stocks = make(map[string]*model.SKUStock)
  277. for rows.Next() {
  278. stock := new(model.SKUStock)
  279. if err = rows.Scan(&stock.SKUID, &stock.ParentSKUID, &stock.ItemID, &stock.Specs, &stock.TotalStock, &stock.Stock, &stock.LockedStock, &stock.SkAlert, &stock.Ctime, &stock.Mtime); err != nil {
  280. log.Error("d.RawSkuByItemID(%d) rows.Scan() error(%v)", itemID, err)
  281. return
  282. }
  283. stocks[stock.Specs] = stock
  284. }
  285. return
  286. }
  287. // CacheSkuByItemID 根据 itemID 获取 sku 缓存
  288. func (d *Dao) CacheSkuByItemID(c context.Context, itemID int64) (stocks map[string]*model.SKUStock, err error) {
  289. conn := d.redis.Get(c)
  290. defer conn.Close()
  291. reply, err := redis.Bytes(conn.Do("GET", fmt.Sprintf(model.CacheKeyItemSku, itemID)))
  292. if err != nil {
  293. if err == redis.ErrNil {
  294. err = nil
  295. return
  296. }
  297. log.Error("d.CacheSkuByItemID(%d) redis.Bytes() error(%v)", itemID, err)
  298. return
  299. }
  300. stocks = make(map[string]*model.SKUStock)
  301. if err = json.Unmarshal(reply, &stocks); err != nil {
  302. log.Error("d.CacheSkuByItemID(%d) json.Unmarshal() error(%v)", itemID, err)
  303. return
  304. }
  305. return
  306. }
  307. // AddCacheSkuByItemID 添加 itemId => sku 缓存
  308. func (d *Dao) AddCacheSkuByItemID(c context.Context, itemID int64, stocks map[string]*model.SKUStock) (err error) {
  309. if stocks == nil {
  310. return
  311. }
  312. conn := d.redis.Get(c)
  313. defer conn.Close()
  314. s, err := json.Marshal(stocks)
  315. if err != nil {
  316. log.Error("d.AddCacheSkuByItemID(%d, %+v) json.Marshal() error(%v)", itemID, stocks, err)
  317. return
  318. }
  319. if _, err = conn.Do("SETEX", fmt.Sprintf(model.CacheKeyItemSku, itemID), model.RedisExpireSku, s); err != nil {
  320. log.Error("d.AddCacheSkuByItemID(%d, %+v) conn.Do() error(%v)", itemID, stocks, err)
  321. return
  322. }
  323. return
  324. }
  325. // CacheStocks 获取 skuID => stock 库存缓存
  326. func (d *Dao) CacheStocks(c context.Context, keys []int64, isLocked bool) (res map[int64]int64, err error) {
  327. if len(keys) == 0 {
  328. return
  329. }
  330. conn := d.redis.Get(c)
  331. defer conn.Close()
  332. cacheKey := model.CacheKeyStock
  333. if isLocked {
  334. cacheKey = model.CacheKeyStockL
  335. }
  336. args := make([]interface{}, 0)
  337. for _, key := range keys {
  338. args = append(args, fmt.Sprintf(cacheKey, key))
  339. }
  340. int64s, err := redis.Int64s(conn.Do("MGET", args...))
  341. if err != nil {
  342. if err == redis.ErrNil {
  343. err = nil
  344. return
  345. }
  346. log.Error("d.CacheStocks(%v, %t) error(%v)", keys, isLocked, err)
  347. return
  348. }
  349. res = make(map[int64]int64)
  350. for index, val := range int64s {
  351. if val < 0 {
  352. val = 0
  353. }
  354. res[keys[index]] = val
  355. }
  356. return
  357. }
  358. // RawStocks skuID => stock 缓存回源
  359. func (d *Dao) RawStocks(c context.Context, keys []int64, isLocked bool) (res map[int64]int64, err error) {
  360. if len(keys) == 0 {
  361. return
  362. }
  363. rows, err := d.db.Query(c, fmt.Sprintf(_getStockBySkuID, xstr.JoinInts(keys)))
  364. if err != nil {
  365. log.Error("d.RawStocks() error(%v)", err)
  366. return
  367. }
  368. defer rows.Close()
  369. res = make(map[int64]int64)
  370. for rows.Next() {
  371. var skuID, stock, lockedStock int64
  372. if err = rows.Scan(&skuID, &stock, &lockedStock); err != nil {
  373. log.Error("d.RawStocks() rows.Scan() error(%v)", err)
  374. return
  375. }
  376. if isLocked {
  377. res[skuID] = lockedStock
  378. } else {
  379. res[skuID] = stock
  380. }
  381. }
  382. return
  383. }
  384. // AddCacheStocks skuID => stock 加入缓存
  385. func (d *Dao) AddCacheStocks(c context.Context, stocks map[int64]int64, isLocked bool) (err error) {
  386. cacheKey := model.CacheKeyStock
  387. if isLocked {
  388. cacheKey = model.CacheKeyStockL
  389. }
  390. for skuID, stock := range stocks {
  391. if err1 := d.RedisSetnx(c, fmt.Sprintf(cacheKey, skuID), stock, model.RedisExpireStock); err1 != nil {
  392. log.Warn("d.AddCacheStocks() d.RedisSetnx(%s, %d, %d) error(%v)", fmt.Sprintf(cacheKey, skuID), stock, model.RedisExpireStock, err)
  393. }
  394. }
  395. return
  396. }
  397. // CacheGetSKUs 根据 skuID 获取 sku
  398. // withNewStock 是否获取最新库存信息
  399. func (d *Dao) CacheGetSKUs(c context.Context, skuIds []int64, withNewStock bool) (skuMap map[int64]*model.SKUStock, err error) {
  400. if len(skuIds) == 0 {
  401. return
  402. }
  403. conn := d.redis.Get(c)
  404. defer conn.Close()
  405. args := make([]interface{}, 0)
  406. for _, skuID := range skuIds {
  407. args = append(args, fmt.Sprintf(model.CacheKeySku, skuID))
  408. }
  409. res, err := redis.ByteSlices(conn.Do("MGET", args...))
  410. if err != nil {
  411. if err == redis.ErrNil {
  412. err = nil
  413. return
  414. }
  415. log.Error("d.CacheGetSKUs(%v, %t) error(%v)", skuIds, withNewStock, err)
  416. return
  417. }
  418. skuMap = make(map[int64]*model.SKUStock, len(res))
  419. for _, v := range res {
  420. if len(v) == 0 {
  421. continue
  422. }
  423. sku := &model.SKUStock{}
  424. if err = json.Unmarshal(v, sku); err != nil {
  425. log.Error("d.CacheGetSKUs() json.Unmarshal(%s) error(%v)", v, err)
  426. return
  427. }
  428. skuMap[sku.SKUID] = sku
  429. }
  430. if withNewStock {
  431. var stockMap map[int64]int64
  432. if stockMap, err = d.Stocks(c, skuIds, false); err != nil {
  433. log.Error("d.CacheGetSKUs() d.Stocks(%v) error(%v)", skuIds, err)
  434. return
  435. }
  436. for _, sku := range skuMap {
  437. sku.Stock = stockMap[sku.SKUID]
  438. }
  439. }
  440. return
  441. }
  442. // RawGetSKUs .
  443. func (d *Dao) RawGetSKUs(c context.Context, skuIds []int64, withNewStock bool) (skuMap map[int64]*model.SKUStock, err error) {
  444. if len(skuIds) == 0 {
  445. return
  446. }
  447. rows, err := d.db.Query(c, fmt.Sprintf(_getStocksBySkuIDSQL, xstr.JoinInts(skuIds)))
  448. if err != nil {
  449. log.Error("d.RawGetSKUs() error(%v)", err)
  450. }
  451. defer rows.Close()
  452. skuMap = make(map[int64]*model.SKUStock)
  453. for rows.Next() {
  454. sku := &model.SKUStock{}
  455. if err = rows.Scan(&sku.SKUID, &sku.ParentSKUID, &sku.ItemID, &sku.Specs, &sku.TotalStock, &sku.Stock, &sku.LockedStock, &sku.SkAlert, &sku.Ctime, &sku.Mtime); err != nil {
  456. log.Error("d.RawGetSKUs() rows.Scan error(%v)", err)
  457. return
  458. }
  459. skuMap[sku.SKUID] = sku
  460. }
  461. return
  462. }
  463. // AddCacheGetSKUs .
  464. func (d *Dao) AddCacheGetSKUs(c context.Context, skuMap map[int64]*model.SKUStock, withNewStock bool) (err error) {
  465. conn := d.redis.Get(c)
  466. defer func() {
  467. conn.Flush()
  468. conn.Close()
  469. }()
  470. for skuID, sku := range skuMap {
  471. var v []byte
  472. if v, err = json.Marshal(sku); err != nil {
  473. log.Warn("d.AddCacheGetSKUs() json.Marshal(%v) error(%v)", sku, err)
  474. err = nil
  475. continue
  476. }
  477. conn.Send("SETEX", fmt.Sprintf(model.CacheKeySku, skuID), model.RedisExpireSkuTmp, v)
  478. }
  479. return
  480. }