db.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "go-common/app/service/main/reply-feed/model"
  8. "go-common/library/log"
  9. "go-common/library/xstr"
  10. )
  // SQL statements used by this file against the reply_abtest_strategy and
  // reply_abtest_statistics tables. Placeholders (?) are bound at call time.
  const (
  	// _getSlotsStat reads name, slot and state of every strategy row.
  	_getSlotsStat = "SELECT name,slot,state FROM reply_abtest_strategy"
  	// _getSlotsStatManager additionally reads algorithm and weight.
  	_getSlotsStatManager = "SELECT name,slot,algorithm,weight,state FROM reply_abtest_strategy"
  	// _setSlot rewrites name, algorithm, weight and state for a list of slots;
  	// the %s is replaced with a comma-separated slot list before execution.
  	_setSlot = "UPDATE reply_abtest_strategy SET name=?,algorithm=?,weight=?,state=? WHERE slot IN (%s)"
  	// _setWeight updates the weight of every row with the given name.
  	_setWeight = "UPDATE reply_abtest_strategy SET weight=? WHERE name=?"
  	// _modifyState updates the state of every row with the given name.
  	_modifyState = "UPDATE reply_abtest_strategy SET state=? WHERE name=?"
  	// _getSlotsStatByName reads slot, algorithm and weight for one name.
  	_getSlotsStatByName = "SELECT slot,algorithm,weight FROM reply_abtest_strategy WHERE name=?"
  	// _getStatisticsDate reads all statistics columns for an inclusive date
  	// range, excluding rows whose name is 'default'.
  	_getStatisticsDate = "SELECT name,date,hour,view,total_view,hot_view,hot_click,hot_like,hot_hate,hot_child,hot_report,total_like,total_hate,total_report,total_root,total_child,hot_like_uv,hot_hate_uv,hot_report_uv,hot_child_uv,total_like_uv,total_hate_uv,total_report_uv,total_child_uv,total_root_uv" +
  		" FROM reply_abtest_statistics WHERE date>=? AND date<=? AND name!='default'"
  	// _upsertLog inserts a statistics row, or on a duplicate key adds the new
  	// view/hot_click/hot_view/total_view values to the existing counters.
  	_upsertLog = "INSERT INTO reply_abtest_statistics (name,date,hour,view,hot_click,hot_view,total_view) VALUES(?,?,?,?,?,?,?)" +
  		" ON DUPLICATE KEY UPDATE view=view+?,hot_click=hot_click+?,hot_view=hot_view+?,total_view=total_view+?"
  )
  // SQL statements that embed model.DefaultSlotName and therefore have to be
  // built with fmt.Sprintf at package initialisation instead of being constants.
  var (
  	// _countIdleSlot counts rows with state=1 whose name is model.DefaultSlotName.
  	_countIdleSlot = fmt.Sprintf("SELECT COUNT(*) FROM reply_abtest_strategy WHERE state=1 AND name='%s'", model.DefaultSlotName)
  	// _getIdelSlots selects up to LIMIT such slots in ascending slot order.
  	// NOTE(review): the identifier is spelled "Idel"; it is kept as-is because
  	// other code in this file refers to it by this name.
  	_getIdelSlots = fmt.Sprintf("SELECT slot FROM reply_abtest_strategy WHERE state=1 AND name='%s' ORDER BY slot ASC LIMIT ?", model.DefaultSlotName)
  )
  27. /*
  28. SlotsStat
  29. */
  30. // SlotsMapping get slot name stat from database.
  31. func (d *Dao) SlotsMapping(ctx context.Context) (slotsMap map[string]*model.SlotsMapping, err error) {
  32. slotsMap = make(map[string]*model.SlotsMapping)
  33. rows, err := d.db.Query(ctx, _getSlotsStat)
  34. if err != nil {
  35. log.Error("db.Query(%s) args(%s) error(%v)", _getSlotsStat, err)
  36. return
  37. }
  38. defer rows.Close()
  39. for rows.Next() {
  40. var (
  41. name string
  42. slot int
  43. state int
  44. )
  45. if err = rows.Scan(&name, &slot, &state); err != nil {
  46. log.Error("rows.Scan error(%v)", err)
  47. return
  48. }
  49. mapping, ok := slotsMap[name]
  50. if ok {
  51. mapping.Slots = append(mapping.Slots, slot)
  52. } else {
  53. mapping = &model.SlotsMapping{
  54. Name: name,
  55. Slots: []int{slot},
  56. State: state,
  57. }
  58. }
  59. slotsMap[name] = mapping
  60. }
  61. if err = rows.Err(); err != nil {
  62. log.Error("rows.Err() error(%v)", err)
  63. }
  64. return
  65. }
  66. // SlotsStatManager get slots stat from database, used by manager.
  67. func (d *Dao) SlotsStatManager(ctx context.Context) (s []*model.SlotsStat, err error) {
  68. slotsMap := make(map[string]*model.SlotsStat)
  69. rows, err := d.db.Query(ctx, _getSlotsStatManager)
  70. if err != nil {
  71. log.Error("db.Query(%s) error(%v)", _getSlotsStatManager, err)
  72. return
  73. }
  74. defer rows.Close()
  75. for rows.Next() {
  76. var (
  77. name, algorithm, weight string
  78. slot, state int
  79. )
  80. if err = rows.Scan(&name, &slot, &algorithm, &weight, &state); err != nil {
  81. log.Error("rows.Scan error(%v)", err)
  82. return
  83. }
  84. if mapping, ok := slotsMap[name]; ok {
  85. mapping.Slots = append(mapping.Slots, slot)
  86. } else {
  87. slotsMap[name] = &model.SlotsStat{
  88. Name: name,
  89. Slots: []int{slot},
  90. Algorithm: algorithm,
  91. Weight: weight,
  92. State: state,
  93. }
  94. }
  95. }
  96. if err = rows.Err(); err != nil {
  97. log.Error("rows.Err() error(%v)", err)
  98. return
  99. }
  100. for _, stat := range slotsMap {
  101. s = append(s, stat)
  102. }
  103. return
  104. }
  105. // CountIdleSlot count idle slot which name="default" and state=1
  106. func (d *Dao) CountIdleSlot(ctx context.Context) (count int, err error) {
  107. if err = d.db.QueryRow(ctx, _countIdleSlot).Scan(&count); err != nil {
  108. log.Error("db.QueryRow() error(%v)", err)
  109. }
  110. return
  111. }
  112. // IdleSlots get idle slots
  113. func (d *Dao) IdleSlots(ctx context.Context, count int) (slots []int64, err error) {
  114. rows, err := d.db.Query(ctx, _getIdelSlots, count)
  115. if err != nil {
  116. log.Error("db.Query(%s) args(%d) error(%v)", _getIdelSlots, count, err)
  117. return
  118. }
  119. defer rows.Close()
  120. for rows.Next() {
  121. var slot int64
  122. if err = rows.Scan(&slot); err != nil {
  123. log.Error("rows.Scan() error(%v)", err)
  124. return
  125. }
  126. slots = append(slots, slot)
  127. }
  128. // 槽位不够新创建实验组
  129. if len(slots) < count {
  130. slots = nil
  131. err = errors.New("out of slot")
  132. return
  133. }
  134. if err = rows.Err(); err != nil {
  135. log.Error("rows.Err() error(%v)", err)
  136. }
  137. return
  138. }
  139. // ModifyState ModifyState
  140. func (d *Dao) ModifyState(ctx context.Context, name string, state int) (err error) {
  141. if _, err = d.db.Exec(ctx, _modifyState, state, name); err != nil {
  142. log.Error("db.Exec(%s) args(%d, %s) error(%v)", _modifyState, state, name, err)
  143. }
  144. return
  145. }
  146. // UpdateSlotsStat UpdateSlotStat and set state inactive.
  147. func (d *Dao) UpdateSlotsStat(ctx context.Context, name, algorithm, weight string, slots []int64, state int) (err error) {
  148. if _, err = d.db.Exec(ctx, fmt.Sprintf(_setSlot, xstr.JoinInts(slots)), name, algorithm, weight, state); err != nil {
  149. log.Error("db.Exec() error(%v)", err)
  150. }
  151. return
  152. }
  153. // SlotsStatByName get slots stat by name.
  154. func (d *Dao) SlotsStatByName(ctx context.Context, name string) (slots []int64, algorithm, weight string, err error) {
  155. rows, err := d.db.Query(ctx, _getSlotsStatByName, name)
  156. if err != nil {
  157. log.Error("db.Query(%s) args(%s) error(%v)", _getSlotsStatByName, name, err)
  158. return
  159. }
  160. defer rows.Close()
  161. for rows.Next() {
  162. var (
  163. slot int64
  164. )
  165. if err = rows.Scan(&slot, &algorithm, &weight); err != nil {
  166. log.Error("rows.Scan() error(%v)", err)
  167. return
  168. }
  169. slots = append(slots, slot)
  170. }
  171. if err = rows.Err(); err != nil {
  172. log.Error("rows.Err() error(%v)", err)
  173. }
  174. return
  175. }
  176. // UpdateWeight update a test set weight by name and algorithm.
  177. func (d *Dao) UpdateWeight(ctx context.Context, name string, weight interface{}) (err error) {
  178. b, err := json.Marshal(weight)
  179. if err != nil {
  180. return
  181. }
  182. if _, err = d.db.Exec(ctx, _setWeight, string(b), name); err != nil {
  183. log.Error("db.Exec(%s), error(%v)", _setWeight, err)
  184. }
  185. return
  186. }
  187. /*
  188. Statistics
  189. */
  190. // UpsertStatistics insert or update statistics from database, if err != nil, retry
  191. func (d *Dao) UpsertStatistics(ctx context.Context, name string, date int, hour int, s *model.StatisticsStat) (err error) {
  192. if _, err = d.db.Exec(ctx, _upsertLog,
  193. name, date, hour,
  194. s.View, s.HotClick, s.HotView, s.TotalView,
  195. s.View, s.HotClick, s.HotView, s.TotalView); err != nil {
  196. return
  197. }
  198. return
  199. }
  200. // StatisticsByDate StatisticsByDate
  201. func (d *Dao) StatisticsByDate(ctx context.Context, begin, end int64) (stats model.StatisticsStats, err error) {
  202. rows, err := d.db.Query(ctx, _getStatisticsDate, begin, end)
  203. if err != nil {
  204. log.Error("db.Query(%s) args(%d, %d) error(%v)", _getStatisticsDate, begin, end, err)
  205. return
  206. }
  207. defer rows.Close()
  208. for rows.Next() {
  209. var s = new(model.StatisticsStat)
  210. err = rows.Scan(&s.Name, &s.Date, &s.Hour, &s.View, &s.TotalView, &s.HotView, &s.HotClick, &s.HotLike, &s.HotHate, &s.HotChildReply,
  211. &s.HotReport, &s.TotalLike, &s.TotalHate, &s.TotalReport, &s.TotalRootReply, &s.TotalChildReply,
  212. &s.HotLikeUV, &s.HotHateUV, &s.HotReportUV, &s.HotChildUV, &s.TotalLikeUV, &s.TotalHateUV, &s.TotalReportUV, &s.TotalChildUV, &s.TotalRootUV)
  213. if err != nil {
  214. log.Error("rows.Scan() error(%v)", err)
  215. return
  216. }
  217. stats = append(stats, s)
  218. }
  219. if err = rows.Err(); err != nil {
  220. log.Error("rows.Err() error(%v)", err)
  221. return
  222. }
  223. return
  224. }