package dao import ( "context" "database/sql" "encoding/json" "fmt" "strings" "time" "go-common/app/service/openplatform/ticket-sales/model" "go-common/library/cache/redis" xsql "go-common/library/database/sql" "go-common/library/log" "go-common/library/xstr" ) const ( _lockStockSQL = "UPDATE sku_stock SET stock=stock-?, locked_stock=locked_stock+? WHERE sku_id=? AND stock>=?" _unlockStockSQL = "UPDATE sku_stock SET stock=stock+?, locked_stock=locked_stock-? WHERE sku_id=? AND stock<=total_stock-? AND locked_stock>=?" _decrStockSQL = "UPDATE sku_stock SET stock=stock-? WHERE sku_id=? AND stock>=?" _incrStockSQL = "UPDATE sku_stock SET stock=stock+? WHERE sku_id=? AND stock<=total_stock-?" _decrStockLockedSQL = "UPDATE sku_stock SET locked_stock=locked_stock-? WHERE sku_id=? AND total_stock>=?" _getStockBySkuIDSQL = "SELECT sku_id, parent_sku_id, item_id, specs, total_stock, stock, locked_stock, sk_alert, ctime, mtime FROM sku_stock WHERE sku_id=? LIMIT 1" _getStocksBySkuIDSQL = "SELECT sku_id, parent_sku_id, item_id, specs, total_stock, stock, locked_stock, sk_alert, ctime, mtime FROM sku_stock WHERE sku_id IN (%s)" _getStockByItemIDSQL = "SELECT sku_id, parent_sku_id, item_id, specs, total_stock, stock, locked_stock, sk_alert, ctime, mtime FROM sku_stock WHERE item_id=?" _getStockBySpecsSQL = "SELECT sku_id, parent_sku_id, item_id, specs, total_stock, stock, locked_stock, sk_alert, ctime, mtime FROM sku_stock WHERE item_id=? AND specs=? LIMIT 1" _insertStockSQL = "INSERT INTO sku_stock (sku_id, parent_sku_id, item_id, specs, total_stock, stock, locked_stock, sk_alert) VALUES %s" _resetStockSQL = "UPDATE sku_stock SET stock=stock+(?-total_stock),total_stock=?, sk_alert=? WHERE sku_id=? AND stock>=(total_stock-?)" _getStockBySkuID = "SELECT sku_id, stock, locked_stock FROM sku_stock WHERE sku_id IN (%s)" _insertSKUStockLog = "INSERT INTO sku_stock_log (sku_id, op_type, src_id, stock) VALUES %s" _selectSKUStockLog = "SELECT id, sku_id, op_type, src_id, stock FROM sku_stock_log WHERE src_id=? AND op_type=? AND sku_id IN(%s)" _rollbackSKUStockLog = "UPDATE sku_stock_log set canceled_at=? where id=? AND canceled_at=0" ) // StockLock lock stock func (d *Dao) StockLock(c context.Context, skuID int64, cnt int64) (err error) { _, err = d.db.Exec(c, _lockStockSQL, cnt, cnt, skuID, cnt) if err != nil { log.Error("d.StockLock(%d, %d) error(%v)", skuID, cnt, err) } return } // TxStockLock lock stock with tx func (d *Dao) TxStockLock(tx *xsql.Tx, skuID int64, cnt int64) (affected int64, err error) { res, err := tx.Exec(_lockStockSQL, cnt, cnt, skuID, cnt) if err != nil { log.Error("d.TxStockLock(%d, %d) error(%v)", skuID, cnt, err) return } affected, err = res.RowsAffected() if err != nil { log.Error("d.TxStockLock(%d, %d) res.RowsAffected() error(%v)", skuID, cnt, err) return } return } // StockDecr 减库存 DB func (d *Dao) StockDecr(c context.Context, skuID int64, num int64) (affected int64, err error) { res, err := d.db.Exec(c, _decrStockSQL, num, skuID, num) if err != nil { log.Error("d.StockDecr(%d, %d) error(%v)", skuID, num, err) return } affected, err = res.RowsAffected() if err != nil { log.Error("d.StockDecr(%d, %d) res.RowsAffected() error(%v)", skuID, num, err) return } return } // TxStockDecr 减库存 DB func (d *Dao) TxStockDecr(tx *xsql.Tx, skuID int64, num int64) (affected int64, err error) { res, err := tx.Exec(_decrStockSQL, num, skuID, num) if err != nil { log.Error("d.TxStockDecr(%d, %d) error(%v)", skuID, num, err) return } affected, err = res.RowsAffected() if err != nil { log.Error("d.TxStockDecr(%d, %d) res.RowsAffected() error(%v)", skuID, num, err) return } return } // StockCacheDecr 减库存缓存 func (d *Dao) StockCacheDecr(c context.Context, skuID int64, total int64) (err error) { if err = d.RedisDecrExist(c, fmt.Sprintf(model.CacheKeyStock, skuID), total); err != nil { log.Error("d.StockCacheDecr(%d) error(%v)", skuID, err) } return } // StockLockedCacheDecr 减锁定库存缓存 func (d *Dao) StockLockedCacheDecr(c context.Context, skuID int64, total int64) (err error) { if err = d.RedisDecrExist(c, fmt.Sprintf(model.CacheKeyStockL, skuID), total); err != nil { log.Error("d.StockLockedCacheDecr(%d) error(%v)", skuID, err) } return } // StockCacheDel 删除库存缓存 func (d *Dao) StockCacheDel(c context.Context, skuID int64) (err error) { if err = d.RedisDel(c, fmt.Sprintf(model.CacheKeyStock, skuID)); err != nil { log.Error("d.StockCacheDel(%d) error(%v)", skuID, err) } return } // DelCacheSku 删除 skuId => sku 缓存 func (d *Dao) DelCacheSku(c context.Context, skuID int64) (err error) { if err = d.RedisDel(c, fmt.Sprintf(model.CacheKeySku, skuID)); err != nil { log.Error("d.StockCacheDel(%d) error(%v)", skuID, err) } return } // AddStockLog TxAddStockLog func (d *Dao) AddStockLog() { } // TxAddStockLog 添加库存操作日志 func (d *Dao) TxAddStockLog(tx *xsql.Tx, stockLogs ...*model.SKUStockLog) (err error) { if len(stockLogs) == 0 { return } placeholder := strings.Trim(strings.Repeat("(?, ?, ?, ?),", len(stockLogs)), ",") var values []interface{} for _, stockLog := range stockLogs { values = append(values, stockLog.SKUID, stockLog.OpType, stockLog.SrcID, stockLog.Stock) } if _, err = tx.Exec(fmt.Sprintf(_insertSKUStockLog, placeholder), values...); err != nil { log.Error("d.TxAddStockLog() error(%v)", err) return } return } // TxStockUnlock 解锁库存(减去锁定库存增加库存) func (d *Dao) TxStockUnlock(tx *xsql.Tx, skuID int64, count int64) (affected int64, err error) { res, err := tx.Exec(_unlockStockSQL, count, count, skuID, count, count) if err != nil { log.Error("d.TxStockUnlock(%d, %d) error(%v)", skuID, count, err) return } if affected, err = res.RowsAffected(); err != nil { log.Error("d.TxStockUnlock(%d, %d) res.RowsAffected() error(%v)", skuID, count, err) } return } // TxStockIncr 增加库存 func (d *Dao) TxStockIncr(tx *xsql.Tx, skuID int64, count int64) (affected int64, err error) { res, err := tx.Exec(_incrStockSQL, count, skuID, count) if err != nil { log.Error("d.TxStockIncr(%d, %d) error(%v)", skuID, count, err) return } if affected, err = res.RowsAffected(); err != nil { log.Error("d.TxStockIncr(%d, %d) res.RowsAffected() error(%v)", skuID, count, err) } return } // TxStockLockedDecr 减去锁定库存 func (d *Dao) TxStockLockedDecr(tx *xsql.Tx, skuID int64, count int64) (affected int64, err error) { res, err := tx.Exec(_decrStockLockedSQL, count, skuID, count) if err != nil { log.Error("d.TxStockLockedDecr(%d, %d) error(%v)", skuID, count, err) return } if affected, err = res.RowsAffected(); err != nil { log.Error("d.TxStockLockedDecr(%d, %d) res.RowsAffected() error(%v)", skuID, count, err) } return } // Stock 查询库存信息 func (d *Dao) Stock(c context.Context, skuID int64) (stock *model.SKUStock, err error) { stock = new(model.SKUStock) if err = d.db.QueryRow(c, _getStockBySkuIDSQL, skuID).Scan(&stock.SKUID, &stock.ParentSKUID, &stock.ItemID, &stock.Specs, &stock.TotalStock, &stock.Stock, &stock.LockedStock, &stock.SkAlert, &stock.Ctime, &stock.Mtime); err != nil { log.Error("d.Stock(%d), error(%v)", skuID, err) } return } // StockLogs 查询库存操作记录 func (d *Dao) StockLogs(c context.Context, opType int16, srcID int64, skuIDs ...int64) (stockLogs []*model.SKUStockLog, err error) { if len(skuIDs) == 0 { return } rows, err := d.db.Query(c, fmt.Sprintf(_selectSKUStockLog, xstr.JoinInts(skuIDs)), srcID, opType) if err != nil { log.Error("d.StockLogs() error(%v)", err) return } defer rows.Close() stockLogs = make([]*model.SKUStockLog, 0) for rows.Next() { stockLog := &model.SKUStockLog{} if err = rows.Scan(&stockLog.ID, &stockLog.SKUID, &stockLog.OpType, &stockLog.SrcID, &stockLog.Stock); err != nil { log.Error("d.StockLogs() rows.Scan() error(%v)", err) return } stockLogs = append(stockLogs) } return } // TxAddStockInsert 插入 stock 数据 func (d *Dao) TxAddStockInsert(tx *xsql.Tx, stocks ...*model.SKUStock) (affected int64, err error) { placeholder := strings.Trim(strings.Repeat("(?, ?, ?, ?, ?, ?, ?, ?),", len(stocks)), ",") var values []interface{} for _, stock := range stocks { values = append(values, stock.SKUID, stock.ParentSKUID, stock.ItemID, stock.Specs, stock.TotalStock, stock.Stock, stock.LockedStock, stock.SkAlert) } res, err := tx.Exec(fmt.Sprintf(_insertStockSQL, placeholder), values...) if err != nil { log.Error("d.TxStockInsert() error(%v)", err) return } if affected, err = res.RowsAffected(); err != nil { log.Error("d.TxStockInsert() res.RowsAffected() error(%v)", err) return } return } // TxStockReset 重置库存 func (d *Dao) TxStockReset(tx *xsql.Tx, stock *model.SKUStock) (affected int64, err error) { res, err := tx.Exec(_resetStockSQL, stock.TotalStock, stock.TotalStock, stock.SkAlert, stock.SKUID, stock.TotalStock) fmt.Println(_resetStockSQL, stock.TotalStock, stock.TotalStock, stock.SkAlert, stock.SKUID, stock.TotalStock) if err != nil { log.Error("d.TxStockReset() error(%v)", err) return } if affected, err = res.RowsAffected(); err != nil { log.Error("d.TxStockReset() res.RowsAffected() error(%v)", err) return } return } // TxStockLogRollBack 回滚操作日志 func (d *Dao) TxStockLogRollBack(tx *xsql.Tx, stockLogID int64) (affected int64, err error) { res, err := tx.Exec(_rollbackSKUStockLog, time.Now().Unix(), stockLogID) if err != nil { log.Error("d.TxStockLogRollBack(%d) error(%v)", stockLogID, err) return } if affected, err = res.RowsAffected(); err != nil { log.Error("d.TxStockLogRollBack(%d) res.RowsAffected() error(%v)", stockLogID, err) } return } // SkuItemCacheDel 删除 itemId => sku 缓存 func (d *Dao) SkuItemCacheDel(c context.Context, itemID int64) (err error) { if err = d.RedisDel(c, fmt.Sprintf(model.CacheKeyItemSku, itemID)); err != nil { log.Error("d.SkuItemCacheDel(%d) error(%v)", itemID, err) } return } // SkuByItemSpecs 通过 itemID specs 获取单个 sku func (d *Dao) SkuByItemSpecs(c context.Context, itemID int64, specs string) (stock *model.SKUStock, err error) { res, err := d.SkuByItemID(c, itemID) if err != nil { log.Error("d.SkuByItemSpecs(%d, %s) d.SkuByItemID() error(%v)", itemID, specs, err) return } if item, ok := res[specs]; ok { stock = item } return } // RawSkuByItemSpecs 根据 itemID 和规格获取单个 sku func (d *Dao) RawSkuByItemSpecs(c context.Context, itemID int64, specs string) (stock *model.SKUStock, err error) { stock = new(model.SKUStock) if err = d.db.QueryRow(c, _getStockBySpecsSQL, itemID, specs).Scan(&stock.SKUID, &stock.ParentSKUID, &stock.ItemID, &stock.Specs, &stock.TotalStock, &stock.Stock, &stock.LockedStock, &stock.SkAlert, &stock.Ctime, &stock.Mtime); err != nil { if err != sql.ErrNoRows { log.Error("d.SkuByItemSpecs(%d, %s) error(%v)", itemID, specs, err) } } return } // RawSkuByItemID 根据规格获取 sku func (d *Dao) RawSkuByItemID(c context.Context, itemID int64) (stocks map[string]*model.SKUStock, err error) { rows, err := d.db.Query(c, _getStockByItemIDSQL, itemID) if err != nil { log.Error("d.RawSkuByItemID(%d) error(%v)", itemID, err) return } defer rows.Close() stocks = make(map[string]*model.SKUStock) for rows.Next() { stock := new(model.SKUStock) if err = rows.Scan(&stock.SKUID, &stock.ParentSKUID, &stock.ItemID, &stock.Specs, &stock.TotalStock, &stock.Stock, &stock.LockedStock, &stock.SkAlert, &stock.Ctime, &stock.Mtime); err != nil { log.Error("d.RawSkuByItemID(%d) rows.Scan() error(%v)", itemID, err) return } stocks[stock.Specs] = stock } return } // CacheSkuByItemID 根据 itemID 获取 sku 缓存 func (d *Dao) CacheSkuByItemID(c context.Context, itemID int64) (stocks map[string]*model.SKUStock, err error) { conn := d.redis.Get(c) defer conn.Close() reply, err := redis.Bytes(conn.Do("GET", fmt.Sprintf(model.CacheKeyItemSku, itemID))) if err != nil { if err == redis.ErrNil { err = nil return } log.Error("d.CacheSkuByItemID(%d) redis.Bytes() error(%v)", itemID, err) return } stocks = make(map[string]*model.SKUStock) if err = json.Unmarshal(reply, &stocks); err != nil { log.Error("d.CacheSkuByItemID(%d) json.Unmarshal() error(%v)", itemID, err) return } return } // AddCacheSkuByItemID 添加 itemId => sku 缓存 func (d *Dao) AddCacheSkuByItemID(c context.Context, itemID int64, stocks map[string]*model.SKUStock) (err error) { if stocks == nil { return } conn := d.redis.Get(c) defer conn.Close() s, err := json.Marshal(stocks) if err != nil { log.Error("d.AddCacheSkuByItemID(%d, %+v) json.Marshal() error(%v)", itemID, stocks, err) return } if _, err = conn.Do("SETEX", fmt.Sprintf(model.CacheKeyItemSku, itemID), model.RedisExpireSku, s); err != nil { log.Error("d.AddCacheSkuByItemID(%d, %+v) conn.Do() error(%v)", itemID, stocks, err) return } return } // CacheStocks 获取 skuID => stock 库存缓存 func (d *Dao) CacheStocks(c context.Context, keys []int64, isLocked bool) (res map[int64]int64, err error) { if len(keys) == 0 { return } conn := d.redis.Get(c) defer conn.Close() cacheKey := model.CacheKeyStock if isLocked { cacheKey = model.CacheKeyStockL } args := make([]interface{}, 0) for _, key := range keys { args = append(args, fmt.Sprintf(cacheKey, key)) } int64s, err := redis.Int64s(conn.Do("MGET", args...)) if err != nil { if err == redis.ErrNil { err = nil return } log.Error("d.CacheStocks(%v, %t) error(%v)", keys, isLocked, err) return } res = make(map[int64]int64) for index, val := range int64s { if val < 0 { val = 0 } res[keys[index]] = val } return } // RawStocks skuID => stock 缓存回源 func (d *Dao) RawStocks(c context.Context, keys []int64, isLocked bool) (res map[int64]int64, err error) { if len(keys) == 0 { return } rows, err := d.db.Query(c, fmt.Sprintf(_getStockBySkuID, xstr.JoinInts(keys))) if err != nil { log.Error("d.RawStocks() error(%v)", err) return } defer rows.Close() res = make(map[int64]int64) for rows.Next() { var skuID, stock, lockedStock int64 if err = rows.Scan(&skuID, &stock, &lockedStock); err != nil { log.Error("d.RawStocks() rows.Scan() error(%v)", err) return } if isLocked { res[skuID] = lockedStock } else { res[skuID] = stock } } return } // AddCacheStocks skuID => stock 加入缓存 func (d *Dao) AddCacheStocks(c context.Context, stocks map[int64]int64, isLocked bool) (err error) { cacheKey := model.CacheKeyStock if isLocked { cacheKey = model.CacheKeyStockL } for skuID, stock := range stocks { if err1 := d.RedisSetnx(c, fmt.Sprintf(cacheKey, skuID), stock, model.RedisExpireStock); err1 != nil { log.Warn("d.AddCacheStocks() d.RedisSetnx(%s, %d, %d) error(%v)", fmt.Sprintf(cacheKey, skuID), stock, model.RedisExpireStock, err) } } return } // CacheGetSKUs 根据 skuID 获取 sku // withNewStock 是否获取最新库存信息 func (d *Dao) CacheGetSKUs(c context.Context, skuIds []int64, withNewStock bool) (skuMap map[int64]*model.SKUStock, err error) { if len(skuIds) == 0 { return } conn := d.redis.Get(c) defer conn.Close() args := make([]interface{}, 0) for _, skuID := range skuIds { args = append(args, fmt.Sprintf(model.CacheKeySku, skuID)) } res, err := redis.ByteSlices(conn.Do("MGET", args...)) if err != nil { if err == redis.ErrNil { err = nil return } log.Error("d.CacheGetSKUs(%v, %t) error(%v)", skuIds, withNewStock, err) return } skuMap = make(map[int64]*model.SKUStock, len(res)) for _, v := range res { if len(v) == 0 { continue } sku := &model.SKUStock{} if err = json.Unmarshal(v, sku); err != nil { log.Error("d.CacheGetSKUs() json.Unmarshal(%s) error(%v)", v, err) return } skuMap[sku.SKUID] = sku } if withNewStock { var stockMap map[int64]int64 if stockMap, err = d.Stocks(c, skuIds, false); err != nil { log.Error("d.CacheGetSKUs() d.Stocks(%v) error(%v)", skuIds, err) return } for _, sku := range skuMap { sku.Stock = stockMap[sku.SKUID] } } return } // RawGetSKUs . func (d *Dao) RawGetSKUs(c context.Context, skuIds []int64, withNewStock bool) (skuMap map[int64]*model.SKUStock, err error) { if len(skuIds) == 0 { return } rows, err := d.db.Query(c, fmt.Sprintf(_getStocksBySkuIDSQL, xstr.JoinInts(skuIds))) if err != nil { log.Error("d.RawGetSKUs() error(%v)", err) } defer rows.Close() skuMap = make(map[int64]*model.SKUStock) for rows.Next() { sku := &model.SKUStock{} if err = rows.Scan(&sku.SKUID, &sku.ParentSKUID, &sku.ItemID, &sku.Specs, &sku.TotalStock, &sku.Stock, &sku.LockedStock, &sku.SkAlert, &sku.Ctime, &sku.Mtime); err != nil { log.Error("d.RawGetSKUs() rows.Scan error(%v)", err) return } skuMap[sku.SKUID] = sku } return } // AddCacheGetSKUs . func (d *Dao) AddCacheGetSKUs(c context.Context, skuMap map[int64]*model.SKUStock, withNewStock bool) (err error) { conn := d.redis.Get(c) defer func() { conn.Flush() conn.Close() }() for skuID, sku := range skuMap { var v []byte if v, err = json.Marshal(sku); err != nil { log.Warn("d.AddCacheGetSKUs() json.Marshal(%v) error(%v)", sku, err) err = nil continue } conn.Send("SETEX", fmt.Sprintf(model.CacheKeySku, skuID), model.RedisExpireSkuTmp, v) } return }