package dao import ( "context" "fmt" "sync" "go-common/app/job/main/dm2/model" "go-common/library/database/sql" "go-common/library/log" "go-common/library/sync/errgroup" "go-common/library/xstr" ) const ( _subtitleSharding = 100 _subjectSharding = 100 _indexSharding = 1000 _contentSharding = 1000 _addSubjectSQL = "INSERT INTO dm_subject_%02d(type,oid,pid,mid,maxlimit,attr) VALUES(?,?,?,?,?,?)" _updateChildpoolSQL = "UPDATE dm_subject_%02d SET childpool=? WHERE type=? AND oid=?" _updateSubMidSQL = "UPDATE dm_subject_%02d SET mid=? WHERE type=? AND oid=?" _updateSubAttrSQL = "UPDATE dm_subject_%02d SET attr=? WHERE type=? AND oid=?" _incrSubMCountSQL = "UPDATE dm_subject_%02d SET mcount=mcount+1 WHERE type=? AND oid=?" _incrSubCountSQL = "UPDATE dm_subject_%02d SET acount=acount+?,count=count+?,childpool=? WHERE type=? AND oid=?" _getSubjectSQL = "SELECT id,type,oid,pid,mid,state,attr,acount,count,mcount,move_count,maxlimit,childpool,ctime,mtime FROM dm_subject_%02d WHERE type=? AND oid=?" _addIndexSQL = "INSERT INTO dm_index_%03d(id,type,oid,mid,progress,state,pool,attr,ctime) VALUES(?,?,?,?,?,?,?,?,?)" _getIndexSQL = "SELECT id,type,oid,mid,progress,state,pool,attr,ctime,mtime FROM dm_index_%03d WHERE type=? AND oid=? AND state IN(0,6)" _idxSegIDSQL = "SELECT id FROM dm_index_%03d WHERE type=? AND oid=? AND progress>=? AND progress=? 
AND progress 0 { pageNum = pageNum + 1 } for i := 0; i < pageNum; i++ { start := i * d.pageSize end := (i + 1) * d.pageSize if end > len(dmids) { end = len(dmids) } wg.Go(func() (err error) { rows, err := d.dmReader.Query(c, fmt.Sprintf(_getContentsSQL, d.hitContent(oid), xstr.JoinInts(dmids[start:end]))) if err != nil { log.Error("db.Query(%s) error(%v)", fmt.Sprintf(_getContentsSQL, d.hitContent(oid), xstr.JoinInts(dmids)), err) return } defer rows.Close() for rows.Next() { ct := &model.Content{} if err = rows.Scan(&ct.ID, &ct.FontSize, &ct.Color, &ct.Mode, &ct.IP, &ct.Plat, &ct.Msg, &ct.Ctime, &ct.Mtime); err != nil { log.Error("rows.Scan() error(%v)", err) return } lock.Lock() ctsMap[ct.ID] = ct lock.Unlock() } err = rows.Err() return }) } if err = wg.Wait(); err != nil { log.Error("wg.Wait() error(%v)", err) } return }

// ContentsSpecial multi get special dm content by dmids.
//
// The ids are joined with xstr.JoinInts and formatted into _getContentsSpeSQL,
// and the statement is run on d.dmReader. The result maps each scanned
// content ID to its *model.ContentSpecial.
//
// A non-nil err is returned when the query fails, when a row cannot be
// scanned, or when row iteration stops because of an error. In the last two
// cases res still holds the rows collected before the failure, so callers
// must check err before trusting res.
func (d *Dao) ContentsSpecial(c context.Context, dmids []int64) (res map[int64]*model.ContentSpecial, err error) {
	res = make(map[int64]*model.ContentSpecial, len(dmids))
	rows, err := d.dmReader.Query(c, fmt.Sprintf(_getContentsSpeSQL, xstr.JoinInts(dmids)))
	if err != nil {
		log.Error("db.Query() error(%v)", err)
		return
	}
	defer rows.Close()
	for rows.Next() {
		content := &model.ContentSpecial{}
		if err = rows.Scan(&content.ID, &content.Msg, &content.Ctime, &content.Mtime); err != nil {
			log.Error("rows.Scan() error(%v)", err)
			return
		}
		res[content.ID] = content
	}
	// rows.Next() returns false both at the end of the result set and when
	// iteration is aborted by an error; without this check an aborted read
	// would be reported as a complete, successful one.
	if err = rows.Err(); err != nil {
		log.Error("rows.Err() error(%v)", err)
	}
	return
}

// ContentSpecial get special dm content by dmid.
func (d *Dao) ContentSpecial(c context.Context, dmid int64) (contentSpe *model.ContentSpecial, err error) { contentSpe = &model.ContentSpecial{} row := d.dmReader.QueryRow(c, _getContentSpeSQL, dmid) if err = row.Scan(&contentSpe.ID, &contentSpe.Msg, &contentSpe.Ctime, &contentSpe.Mtime); err != nil { log.Error("rows.Scan() error(%v)", err) } return } // DelDMHideState del dm hide state func (d *Dao) DelDMHideState(c context.Context, tp int32, oid int64, dmid int64) (affect int64, err error) { res, err := d.dmWriter.Exec(c, fmt.Sprintf(_delDMHideState, d.hitIndex(oid)), model.StateNormal, oid, dmid, model.StateHide) if err != nil { log.Error("dmWriter.Exec(%s %d dmid=%d) error(%v)", _delDMHideState, oid, dmid, err) return } return res.RowsAffected() } // TxIncrSubMCount update monitor dm count. func (d *Dao) TxIncrSubMCount(tx *sql.Tx, tp int32, oid int64) (affect int64, err error) { res, err := tx.Exec(fmt.Sprintf(_incrSubMCountSQL, d.hitSubject(oid)), tp, oid) if err != nil { log.Error("tx.Exec(%s,%d,%d) error(%v)", _incrSubMCountSQL, tp, oid, err) return } return res.RowsAffected() } // UpdateSubtitle update subtitle mid func (d *Dao) UpdateSubtitle(c context.Context, subtitle *model.Subtitle) (err error) { if _, err = d.biliDMWriter.Exec(c, fmt.Sprintf(_updateSubtitle, d.hitSubtile(subtitle.Oid)), subtitle.UpMid, subtitle.Status, subtitle.PubTime, subtitle.RejectComment, subtitle.ID); err != nil { log.Error("biliDMWriter.Exec(query:%v,subtitle:%+v) error(%v)", _updateSubtitle, subtitle, err) return } return } // GetSubtitles . 
func (d *Dao) GetSubtitles(c context.Context, tp int32, oid int64) (subtitles []*model.Subtitle, err error) { rows, err := d.biliDMWriter.Query(c, fmt.Sprintf(_getSubtitles, d.hitSubtile(oid)), oid, tp) if err != nil { log.Error("biliDMWriter.Query(%s,%d,%d) error(%v)", _getSubtitles, oid, tp, err) return } defer rows.Close() for rows.Next() { var subtitle = &model.Subtitle{} if err = rows.Scan(&subtitle.ID, &subtitle.Oid, &subtitle.Type, &subtitle.Lan, &subtitle.Status, &subtitle.Mid, &subtitle.UpMid, &subtitle.SubtitleURL, &subtitle.PubTime, &subtitle.RejectComment); err != nil { log.Error("biliDMWriter.Scan(%s,%d,%d) error(%v)", _getSubtitles, oid, tp, err) return } subtitles = append(subtitles, subtitle) } if err = rows.Err(); err != nil { log.Error("biliDMWriter.rows.Err()(%s,%d,%d) error(%v)", _getSubtitles, oid, tp, err) return } return } // GetSubtitle . func (d *Dao) GetSubtitle(c context.Context, oid int64, subtitleID int64) (subtitle *model.Subtitle, err error) { subtitle = &model.Subtitle{} row := d.biliDMWriter.QueryRow(c, fmt.Sprintf(_getSubtitle, d.hitSubtile(oid)), subtitleID) if err = row.Scan(&subtitle.ID, &subtitle.Oid, &subtitle.Type, &subtitle.Lan, &subtitle.Status, &subtitle.Mid, &subtitle.UpMid, &subtitle.SubtitleURL, &subtitle.PubTime, &subtitle.RejectComment); err != nil { if err == sql.ErrNoRows { subtitle = nil err = nil } else { log.Error("row.Scan() error(%v)", err) } } return } // TxUpdateSubtitle . func (d *Dao) TxUpdateSubtitle(tx *sql.Tx, subtitle *model.Subtitle) (err error) { if _, err = tx.Exec(fmt.Sprintf(_updateSubtitle, d.hitSubtile(subtitle.Oid)), subtitle.UpMid, subtitle.Status, subtitle.PubTime, subtitle.RejectComment, subtitle.ID); err != nil { log.Error("params(%+v),error(%v)", subtitle, err) return } return } // TxAddSubtitlePub . 
func (d *Dao) TxAddSubtitlePub(tx *sql.Tx, subtitlePub *model.SubtitlePub) (err error) { if _, err = tx.Exec(_addSubtitlePub, subtitlePub.Oid, subtitlePub.Type, subtitlePub.Lan, subtitlePub.SubtitleID, subtitlePub.IsDelete, subtitlePub.SubtitleID, subtitlePub.IsDelete); err != nil { log.Error("params(%+v),error(%v)", subtitlePub, err) return } return } // MaskMids get mask mids from db. func (d *Dao) MaskMids(c context.Context) (mids []int64, err error) { mids = make([]int64, 0, 100) rows, err := d.biliDMWriter.Query(c, _getMaskMids) if err != nil { log.Error("biliDMWriter.Query(%s) error(%v)", _getMaskMids, err) return } defer rows.Close() for rows.Next() { var mid int64 if err = rows.Scan(&mid); err != nil { log.Error("biliDMWriter.Scan(%s) error(%v)", _getMaskMids, err) return } mids = append(mids, mid) } if err = rows.Err(); err != nil { log.Error("biliDMWriter.rows.Err() error(%v)", err) } return }