mysql.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package dao
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "go-common/app/service/main/rank/model"
  7. "go-common/library/log"
  8. xtime "go-common/library/time"
  9. "go-common/library/xstr"
  10. )
  11. const (
  12. _maxArchiveIDSQL = `SELECT MAX(id) FROM archive`
  13. _archiveMetasSQL = `SELECT id,typeid,pubtime FROM archive WHERE id>? ORDER BY id LIMIT ?`
  14. _archiveMetasMtimeSQL = `SELECT id,typeid,pubtime FROM archive WHERE id>? AND mtime BETWEEN ? AND ? ORDER BY mtime,id LIMIT ?`
  15. _archiveTypesSQL = `SELECT id,pid FROM archive_type WHERE id in (%s)`
  16. _archiveStatsSQL = `SELECT aid,click FROM archive_stat_%s WHERE aid in (%s)`
  17. _archiveStatsMtimeSQL = `SELECT id,aid,click FROM archive_stat_%s WHERE id > ? AND mtime BETWEEN ? AND ? ORDER BY mtime,id LIMIT ?`
  18. _archiveTVsSQL = `SELECT aid,result,deleted,valid FROM ugc_archive WHERE aid in (%s)`
  19. _archiveTVsMtimeSQL = `SELECT id,aid,result,deleted,valid FROM ugc_archive WHERE id > ? AND mtime BETWEEN ? AND ? ORDER BY mtime,id LIMIT ?`
  20. _archiveStatSharding = 100
  21. )
  22. // MaxOid .
  23. func (d *Dao) MaxOid(c context.Context) (oid int64, err error) {
  24. row := d.dbArchive.QueryRow(c, _maxArchiveIDSQL)
  25. if err = row.Scan(&oid); err != nil {
  26. if err == sql.ErrNoRows {
  27. err = nil
  28. } else {
  29. log.Error("row.Scan error(%v)", err)
  30. }
  31. }
  32. return
  33. }
  34. // ArchiveMetas .
  35. func (d *Dao) ArchiveMetas(c context.Context, id int64, limit int) ([]*model.ArchiveMeta, error) {
  36. rows, err := d.dbArchive.Query(c, _archiveMetasSQL, id, limit)
  37. if err != nil {
  38. log.Error("d.dbArchive.Query(%s,%d,%d) error()", _archiveMetasSQL, id, limit, err)
  39. return nil, err
  40. }
  41. defer rows.Close()
  42. as := make([]*model.ArchiveMeta, 0)
  43. for rows.Next() {
  44. a := new(model.ArchiveMeta)
  45. if err = rows.Scan(&a.ID, &a.Typeid, &a.Pubtime); err != nil {
  46. log.Error("rows.Scan() error(%v)", err)
  47. return nil, err
  48. }
  49. as = append(as, a)
  50. }
  51. return as, rows.Err()
  52. }
  53. // ArchiveMetasIncrs .
  54. func (d *Dao) ArchiveMetasIncrs(c context.Context, aid int64, begin, end xtime.Time, limit int) ([]*model.ArchiveMeta, error) {
  55. rows, err := d.dbArchive.Query(c, _archiveMetasMtimeSQL, aid, begin, end, limit)
  56. if err != nil {
  57. log.Error("d.dbArchive.Query(%s,%d,%s,%s,%d) error()", _archiveMetasMtimeSQL, aid, begin, end, limit, err)
  58. return nil, err
  59. }
  60. defer rows.Close()
  61. as := make([]*model.ArchiveMeta, 0)
  62. for rows.Next() {
  63. a := new(model.ArchiveMeta)
  64. if err = rows.Scan(&a.ID, &a.Typeid, &a.Pubtime); err != nil {
  65. log.Error("rows.Scan() error(%v)", err)
  66. return nil, err
  67. }
  68. as = append(as, a)
  69. }
  70. return as, rows.Err()
  71. }
  72. // ArchiveTypes .
  73. func (d *Dao) ArchiveTypes(c context.Context, ids []int64) (map[int64]*model.ArchiveType, error) {
  74. idsStr := xstr.JoinInts(ids)
  75. rows, err := d.dbArchive.Query(c, fmt.Sprintf(_archiveTypesSQL, idsStr))
  76. if err != nil {
  77. log.Error("d.dbArchive.Query(%s) error(%v)", fmt.Sprintf(_archiveTypesSQL, idsStr), err)
  78. return nil, err
  79. }
  80. defer rows.Close()
  81. as := make(map[int64]*model.ArchiveType)
  82. for rows.Next() {
  83. a := new(model.ArchiveType)
  84. if err = rows.Scan(&a.ID, &a.Pid); err != nil {
  85. log.Error("rows.Scan() error(%v)", err)
  86. return nil, err
  87. }
  88. as[a.ID] = a
  89. }
  90. return as, rows.Err()
  91. }
  92. // ArchiveStats .
  93. func (d *Dao) ArchiveStats(c context.Context, aids []int64) (map[int64]*model.ArchiveStat, error) {
  94. tableMap := make(map[int64][]int64)
  95. for _, aid := range aids {
  96. mod := aid % _archiveStatSharding
  97. tableMap[mod] = append(tableMap[mod], aid)
  98. }
  99. as := make(map[int64]*model.ArchiveStat)
  100. for tbl, aids := range tableMap {
  101. aidsStr := xstr.JoinInts(aids)
  102. rows, err := d.dbStat.Query(c, fmt.Sprintf(_archiveStatsSQL, fmt.Sprintf("%02d", tbl), aidsStr))
  103. if err != nil {
  104. log.Error("d.dbStat.Query(%s) error(%v)", fmt.Sprintf(_archiveStatsSQL, fmt.Sprintf("%02d", tbl), aidsStr), err)
  105. return nil, err
  106. }
  107. defer rows.Close()
  108. for rows.Next() {
  109. a := new(model.ArchiveStat)
  110. if err = rows.Scan(&a.Aid, &a.Click); err != nil {
  111. log.Error("rows.Scan() error(%v)", err)
  112. return nil, err
  113. }
  114. as[a.Aid] = a
  115. }
  116. if err = rows.Err(); err != nil {
  117. return nil, err
  118. }
  119. }
  120. return as, nil
  121. }
  122. // ArchiveStatsIncrs .
  123. func (d *Dao) ArchiveStatsIncrs(c context.Context, tbl int, id int64, begin, end xtime.Time, limit int) ([]*model.ArchiveStat, error) {
  124. rows, err := d.dbStat.Query(c, fmt.Sprintf(_archiveStatsMtimeSQL, fmt.Sprintf("%02d", tbl)), id, begin, end, limit)
  125. if err != nil {
  126. log.Error("d.dbStat.Query(%s,%d,%s,%s,%d) error(%v)", fmt.Sprintf(_archiveStatsMtimeSQL, fmt.Sprintf("%02d", tbl)), id, begin, end, limit, err)
  127. return nil, err
  128. }
  129. defer rows.Close()
  130. as := make([]*model.ArchiveStat, 0)
  131. for rows.Next() {
  132. a := new(model.ArchiveStat)
  133. if err = rows.Scan(&a.ID, &a.Aid, &a.Click); err != nil {
  134. log.Error("rows.Scan() error(%v)", err)
  135. return nil, err
  136. }
  137. as = append(as, a)
  138. }
  139. return as, rows.Err()
  140. }
  141. // ArchiveTVs .
  142. func (d *Dao) ArchiveTVs(c context.Context, aids []int64) (map[int64]*model.ArchiveTv, error) {
  143. aidsStr := xstr.JoinInts(aids)
  144. rows, err := d.dbTV.Query(c, fmt.Sprintf(_archiveTVsSQL, aidsStr))
  145. if err != nil {
  146. log.Error("d.dbTV.Query(%s) error(%v)", fmt.Sprintf(_archiveTVsSQL, aidsStr), err)
  147. return nil, err
  148. }
  149. defer rows.Close()
  150. as := make(map[int64]*model.ArchiveTv)
  151. for rows.Next() {
  152. a := new(model.ArchiveTv)
  153. if err = rows.Scan(&a.Aid, &a.Result, &a.Deleted, &a.Valid); err != nil {
  154. log.Error("rows.Scan() error(%v)", err)
  155. return nil, err
  156. }
  157. as[a.Aid] = a
  158. }
  159. return as, rows.Err()
  160. }
  161. // ArchiveTVsIncrs .
  162. func (d *Dao) ArchiveTVsIncrs(c context.Context, id int64, begin, end xtime.Time, limit int) ([]*model.ArchiveTv, error) {
  163. rows, err := d.dbTV.Query(c, _archiveTVsMtimeSQL, id, begin, end, limit)
  164. if err != nil {
  165. log.Error("d.dbTV.Query(%s,%d,%s,%s,%d) error(%v)", _archiveTVsMtimeSQL, id, begin, end, limit, err)
  166. return nil, err
  167. }
  168. defer rows.Close()
  169. as := make([]*model.ArchiveTv, 0)
  170. for rows.Next() {
  171. a := new(model.ArchiveTv)
  172. if err = rows.Scan(&a.ID, &a.Aid, &a.Result, &a.Deleted, &a.Valid); err != nil {
  173. log.Error("rows.Scan() error(%v)", err)
  174. return nil, err
  175. }
  176. as = append(as, a)
  177. }
  178. return as, rows.Err()
  179. }