mysql.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. package dao
  2. import (
  3. "context"
  4. xsql "database/sql"
  5. "fmt"
  6. "time"
  7. "go-common/app/job/main/spy/conf"
  8. "go-common/app/job/main/spy/model"
  9. "go-common/library/database/sql"
  10. "go-common/library/log"
  11. "github.com/pkg/errors"
  12. )
// SQL statements used by this dao. Statements whose table name ends in %02d
// are templates: the two-digit table suffix is filled in with fmt.Sprintf
// before the statement is executed.
const (
	// Count / list mids in one spy_user_info table by state, score>=30 and an mtime range.
	_reBuildMidCountSQL = "SELECT count(mid) FROM spy_user_info_%02d WHERE state=? AND score>=30 AND mtime BETWEEN ? AND ?;"
	_reBuildMidSQL = "SELECT mid FROM spy_user_info_%02d WHERE state=? AND score>=30 AND mtime BETWEEN ? AND ? LIMIT ?;"
	// Read every row of spy_system_config.
	_getAllConfigSQL = "SELECT id, property,name,val,ctime FROM spy_system_config;"
	// Insert one row into a spy_user_event_history table.
	_addEventHistorySQL = "INSERT INTO spy_user_event_history_%02d (mid,event_id,score,base_score,event_score,remark,reason,factor_val,ctime) VALUES (?,?,?,?,?,?,?,?,?);"
	// Insert one row into spy_punishment.
	_addPunishmentSQL = "INSERT INTO spy_punishment (mid,type,reason,batch_no,ctime) VALUES (?,?,?,?,?);"
	// Set the state column of a user's spy_user_info row.
	_updateUserStateSQL = "UPDATE spy_user_info_%02d SET state=? WHERE mid=?"
	// Newest history row (highest id) for a mid.
	_getLastHistorySQL = "SELECT id,mid,event_id,score,base_score,event_score,remark,reason,factor_val,ctime FROM spy_user_event_history_%02d WHERE mid=? ORDER BY id DESC LIMIT 1;"
	// remark/reason of the newest history rows for a mid, limited by the last placeholder.
	_getHistoryListSQL = "SELECT remark,reason FROM spy_user_event_history_%02d WHERE mid= ? ORDER BY id DESC LIMIT ?;"
	// Set event_score and score of a user's spy_user_info row.
	_updateEventScoreSQL = "UPDATE spy_user_info_%02d SET event_score=?, score=? WHERE mid=?;"
	// Read a single spy_user_info row by mid.
	_userInfoSQL = "SELECT id,mid,score,base_score,event_score,state,ctime,mtime FROM spy_user_info_%02d WHERE mid=? LIMIT 1;"
	// Count spy_punishment rows with mtime strictly inside a range.
	_punishmentCountSQL = "SELECT COUNT(1) FROM spy_punishment where mtime > ? and mtime < ?;"
	// Count history rows with a given reason and mtime strictly inside a range.
	_securityLoginCountSQL = "SELECT COUNT(1) FROM spy_user_event_history_%02d where reason = ? and mtime > ? and mtime < ?;"
	// Insert one row into spy_report.
	_insertReportSQL = "INSERT INTO `spy_report`(`name`,`date_version`,`val`,`ctime`)VALUES(?,?,?,?);"
	// Upsert into spy_statistics; on duplicate key the quantity is incremented.
	_insertIncrStatSQL = "INSERT INTO `spy_statistics`(`target_mid`,`target_id`,`event_id`,`state`,`type`,`quantity`,`ctime`)VALUES(?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE quantity=quantity+?;"
	// Upsert into spy_statistics; on duplicate key the quantity is overwritten.
	_insertStatSQL = "INSERT INTO `spy_statistics`(`target_mid`,`target_id`,`event_id`,`state`,`type`,`quantity`,`ctime`)VALUES(?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE quantity=?;"
	// Every spy_event row whose status is not 0.
	_allEventSQL = "SELECT id,name,nick_name,service_id,status,ctime,mtime FROM spy_event WHERE status<>0"
)
  31. func hitHistory(id int64) int64 {
  32. return id % conf.Conf.Property.HistoryShard
  33. }
  34. func hitInfo(id int64) int64 {
  35. return id % conf.Conf.Property.UserInfoShard
  36. }
  37. // BeginTran begin transaction.
  38. func (d *Dao) BeginTran(c context.Context) (*sql.Tx, error) {
  39. return d.db.Begin(c)
  40. }
  41. // ReBuildMidCount count for need reBuild user.
  42. func (d *Dao) ReBuildMidCount(c context.Context, i int, state int8, start, end time.Time) (res int64, err error) {
  43. row := d.db.QueryRow(c, fmt.Sprintf(_reBuildMidCountSQL, i), state, start, end)
  44. if err = row.Scan(&res); err != nil {
  45. if err == sql.ErrNoRows {
  46. err = nil
  47. } else {
  48. log.Error("row.Scan error(%v)", err)
  49. }
  50. }
  51. return
  52. }
  53. // ReBuildMidList query reBuild user mid list by page.
  54. func (d *Dao) ReBuildMidList(c context.Context, i int, t int8, start, end time.Time, ps int64) (res []int64, err error) {
  55. var rows *sql.Rows
  56. if rows, err = d.db.Query(c, fmt.Sprintf(_reBuildMidSQL, i), t, start, end, ps); err != nil {
  57. log.Error("d.reBuildMidSQL.Query(%d, %s, %s, %d) error(%v)", t, start, end, ps, err)
  58. return
  59. }
  60. defer rows.Close()
  61. res = []int64{}
  62. for rows.Next() {
  63. var r int64
  64. if err = rows.Scan(&r); err != nil {
  65. log.Error("row.Scan() error(%v)", err)
  66. res = nil
  67. return
  68. }
  69. res = append(res, r)
  70. }
  71. err = rows.Err()
  72. return
  73. }
  74. // Configs spy system configs.
  75. func (d *Dao) Configs(c context.Context) (res map[string]string, err error) {
  76. var rows *sql.Rows
  77. if rows, err = d.db.Query(c, _getAllConfigSQL); err != nil {
  78. log.Error("d.getAllConfigSQL.Query error(%v)", err)
  79. return
  80. }
  81. defer rows.Close()
  82. res = map[string]string{}
  83. for rows.Next() {
  84. var r model.Config
  85. if err = rows.Scan(&r.ID, &r.Property, &r.Name, &r.Val, &r.Ctime); err != nil {
  86. log.Error("row.Scan() error(%v)", err)
  87. res = nil
  88. return
  89. }
  90. res[r.Property] = r.Val
  91. }
  92. err = rows.Err()
  93. return
  94. }
  95. // TxAddEventHistory insert user_event_history.
  96. func (d *Dao) TxAddEventHistory(c context.Context, tx *sql.Tx, ueh *model.UserEventHistory) (err error) {
  97. var (
  98. now = time.Now()
  99. )
  100. if _, err = tx.Exec(fmt.Sprintf(_addEventHistorySQL, hitHistory(ueh.Mid)), ueh.Mid, ueh.EventID, ueh.Score, ueh.BaseScore, ueh.EventScore, ueh.Remark, ueh.Reason, ueh.FactorVal, now); err != nil {
  101. log.Error("db.Exec(%v) error(%v)", ueh, err)
  102. return
  103. }
  104. return
  105. }
  106. // TxAddPunishment insert punishment.
  107. func (d *Dao) TxAddPunishment(c context.Context, tx *sql.Tx, mid int64, t int8, reason string, blockNo int64) (err error) {
  108. var (
  109. now = time.Now()
  110. )
  111. if _, err = tx.Exec(_addPunishmentSQL, mid, t, reason, blockNo, now); err != nil {
  112. log.Error("db.Exec(%d, %d, %s) error(%v)", mid, t, reason, err)
  113. return
  114. }
  115. return
  116. }
  117. // History get last one user history.
  118. func (d *Dao) History(c context.Context, mid int64) (h *model.UserEventHistory, err error) {
  119. var (
  120. row *sql.Row
  121. )
  122. h = &model.UserEventHistory{}
  123. row = d.db.QueryRow(c, fmt.Sprintf(_getLastHistorySQL, hitHistory(mid)), mid)
  124. if err = row.Scan(&h.ID, &h.Mid, &h.EventID, &h.Score, &h.BaseScore, &h.EventScore, &h.Remark, &h.Reason, &h.FactorVal, &h.CTime); err != nil {
  125. if err == sql.ErrNoRows {
  126. err = nil
  127. h = nil
  128. return
  129. }
  130. log.Error("History row.Scan(%d) error(%v)", mid, err)
  131. }
  132. return
  133. }
  134. // TxUpdateUserState insert or update user state by mid.
  135. func (d *Dao) TxUpdateUserState(c context.Context, tx *sql.Tx, info *model.UserInfo) (err error) {
  136. if _, err = d.db.Exec(c, fmt.Sprintf(_updateUserStateSQL, hitInfo(info.Mid)), info.State, info.Mid); err != nil {
  137. log.Error("TxUpdateUserState db.Exec(%d, %v) error(%v)", info.Mid, info, err)
  138. return
  139. }
  140. return
  141. }
  142. // HistoryList query .
  143. func (d *Dao) HistoryList(c context.Context, mid int64, size int) (res []*model.UserEventHistory, err error) {
  144. var rows *sql.Rows
  145. if rows, err = d.db.Query(c, fmt.Sprintf(_getHistoryListSQL, hitHistory(mid)), mid, size); err != nil {
  146. log.Error("d.HistoryList.Query(%d, %d) error(%v)", mid, size, err)
  147. return
  148. }
  149. defer rows.Close()
  150. for rows.Next() {
  151. r := &model.UserEventHistory{}
  152. if err = rows.Scan(&r.Remark, &r.Reason); err != nil {
  153. log.Error("row.Scan() error(%v)", err)
  154. res = nil
  155. return
  156. }
  157. res = append(res, r)
  158. }
  159. err = rows.Err()
  160. return
  161. }
  162. // UserInfo get info by mid.
  163. func (d *Dao) UserInfo(c context.Context, mid int64) (res *model.UserInfo, err error) {
  164. var (
  165. row *sql.Row
  166. )
  167. res = &model.UserInfo{}
  168. row = d.db.QueryRow(c, fmt.Sprintf(_userInfoSQL, hitInfo(mid)), mid)
  169. if err = row.Scan(&res.ID, &res.Mid, &res.Score, &res.BaseScore, &res.EventScore, &res.State, &res.CTime, &res.MTime); err != nil {
  170. if err == sql.ErrNoRows {
  171. err = nil
  172. res = nil
  173. return
  174. }
  175. log.Error("row.Scan() error(%v)", err)
  176. }
  177. return
  178. }
  179. // TxUpdateEventScore update event score.
  180. func (d *Dao) TxUpdateEventScore(c context.Context, tx *sql.Tx, mid int64, escore, score int8) (err error) {
  181. if _, err = tx.Exec(fmt.Sprintf(_updateEventScoreSQL, hitInfo(mid)), escore, score, mid); err != nil {
  182. log.Error("db.TxUpdateEventScore(%s, %d, %d, %d) error(%v)", _updateEventScoreSQL, escore, score, mid, err)
  183. return
  184. }
  185. return
  186. }
  187. // AddReport add report info.
  188. func (d *Dao) AddReport(c context.Context, r *model.Report) (affected int64, err error) {
  189. var res xsql.Result
  190. if res, err = d.db.Exec(c, _insertReportSQL, r.Name, r.DateVersion, r.Val, r.Ctime); err != nil {
  191. log.Error("AddReport: db.Exec(%v) error(%v)", r, err)
  192. return
  193. }
  194. return res.RowsAffected()
  195. }
  196. // PunishmentCount punishment count.
  197. func (d *Dao) PunishmentCount(c context.Context, start, end time.Time) (res int64, err error) {
  198. row := d.db.QueryRow(c, _punishmentCountSQL, start, end)
  199. if err = row.Scan(&res); err != nil {
  200. if err == sql.ErrNoRows {
  201. err = nil
  202. } else {
  203. log.Error("row.Scan error(%v)", err)
  204. }
  205. }
  206. return
  207. }
  208. // SecurityLoginCount security login count.
  209. func (d *Dao) SecurityLoginCount(c context.Context, index int64, reason string, stime, etime time.Time) (res int64, err error) {
  210. row := d.db.QueryRow(c, fmt.Sprintf(_securityLoginCountSQL, hitHistory(index)), reason, stime, etime)
  211. if err = row.Scan(&res); err != nil {
  212. if err == sql.ErrNoRows {
  213. err = nil
  214. } else {
  215. log.Error("row.Scan error(%v)", err)
  216. }
  217. }
  218. return
  219. }
  220. // AddStatistics add statistics info.
  221. func (d *Dao) AddStatistics(c context.Context, s *model.Statistics) (id int64, err error) {
  222. var res xsql.Result
  223. if res, err = d.db.Exec(c, _insertStatSQL, s.TargetMid, s.TargetID, s.EventID, s.State, s.Type, s.Quantity, s.Ctime, s.Quantity); err != nil {
  224. return
  225. }
  226. if id, err = res.LastInsertId(); err != nil {
  227. err = errors.WithStack(err)
  228. return
  229. }
  230. return
  231. }
  232. // AddIncrStatistics add increase statistics info.
  233. func (d *Dao) AddIncrStatistics(c context.Context, s *model.Statistics) (id int64, err error) {
  234. var res xsql.Result
  235. if res, err = d.db.Exec(c, _insertIncrStatSQL, s.TargetMid, s.TargetID, s.EventID, s.State, s.Type, s.Quantity, s.Ctime, s.Quantity); err != nil {
  236. return
  237. }
  238. if id, err = res.LastInsertId(); err != nil {
  239. err = errors.WithStack(err)
  240. return
  241. }
  242. return
  243. }
  244. //AllEvent all event.
  245. func (d *Dao) AllEvent(c context.Context) (list []*model.Event, err error) {
  246. var (
  247. rows *sql.Rows
  248. )
  249. list = make([]*model.Event, 0)
  250. if rows, err = d.db.Query(c, _allEventSQL); err != nil {
  251. log.Error("d.db.Query(%s) error(%v)", _allEventSQL, err)
  252. return
  253. }
  254. defer rows.Close()
  255. for rows.Next() {
  256. var event = &model.Event{}
  257. if err = rows.Scan(&event.ID, &event.Name, &event.NickName, &event.ServiceID, &event.Status, &event.Ctime, &event.Mtime); err != nil {
  258. log.Error("rows.Scan() error(%v)", err)
  259. return
  260. }
  261. list = append(list, &model.Event{
  262. ID: event.ID,
  263. Name: event.Name,
  264. NickName: event.NickName,
  265. ServiceID: event.ServiceID,
  266. Status: event.Status,
  267. Ctime: event.Ctime,
  268. Mtime: event.Mtime,
  269. })
  270. }
  271. return
  272. }