mysql.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. package dao
  2. import (
  3. "context"
  4. "database/sql"
  5. "encoding/json"
  6. "strconv"
  7. "time"
  8. "go-common/app/service/main/push/model"
  9. xsql "go-common/library/database/sql"
  10. "go-common/library/log"
  11. )
  // SQL statements for the push_tasks table.
  //
  // The identifier spellings (including "upadte") are kept exactly as they
  // are; presumably other files in the package reference them — verify before
  // renaming.
  const (
  	// _addTaskSQL inserts one task row and takes 21 positional arguments.
  	_addTaskSQL = "INSERT INTO push_tasks (job,type,app_id,business_id,platform,title,summary,link_type,link_value,build,sound,vibration,pass_through,mid_file,progress,push_time,expire_time,status,`group`,image_url,extra) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
  	// _upadteTaskStatusSQL sets the status column of the task with the given id.
  	_upadteTaskStatusSQL = "UPDATE push_tasks SET status=? WHERE id=?"
  	// _upadteTaskProgressSQL sets the progress column of the task with the given id.
  	_upadteTaskProgressSQL = "UPDATE push_tasks SET progress=? WHERE id=?"
  	// _taskByIDSQL reads a single task by primary key.
  	_taskByIDSQL = "SELECT id,job,type,app_id,business_id,platform,title,summary,link_type,link_value,build,sound,vibration,pass_through,mid_file,progress,push_time,expire_time,status,`group`,image_url FROM push_tasks WHERE id=?"
  	// _taskByPlatformSQL locks (FOR UPDATE) at most one non-deleted task that
  	// matches a status, a push_time upper bound, a platform and an mtime
  	// lower bound.
  	_taskByPlatformSQL = "SELECT id,job,type,app_id,business_id,platform_id,title,summary,link_type,link_value,build,sound,vibration,pass_through,mid_file,progress,push_time,expire_time,status,`group`,image_url FROM push_tasks WHERE status=? AND push_time<=? AND dtime=0 and platform_id=? and mtime>? LIMIT 1 FOR UPDATE"
  )

  // _businessesSQL lists every push_business row with dtime=0.
  const _businessesSQL = "SELECT id,app_id,name,description,token,sound,vibration,receive_switch,push_switch,silent_time,push_limit_user,whitelist FROM push_business WHERE dtime=0"

  // _authsSQL lists every push_auths row with dtime=0.
  const _authsSQL = "SELECT app_id,platform_id,name,`key`,value,bundle_id FROM push_auths WHERE dtime=0"

  // _addCallbackSQL upserts a push_callbacks row: 10 arguments for the insert
  // followed by 6 arguments for the ON DUPLICATE KEY UPDATE clause.
  const _addCallbackSQL = "INSERT INTO push_callbacks (task,app,platform,mid,buvid,token,pid,click,brand,extra) VALUES(?,?,?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE app=?,platform=?,mid=?,buvid=?,pid=?,click=?"
  26. // AddTask adds task.
  27. func (d *Dao) AddTask(c context.Context, t *model.Task) (id int64, err error) {
  28. var (
  29. res sql.Result
  30. platform = model.JoinInts(t.Platform)
  31. build, _ = json.Marshal(t.Build)
  32. progress, _ = json.Marshal(t.Progress)
  33. extra, _ = json.Marshal(t.Extra)
  34. )
  35. if res, err = d.addTaskStmt.Exec(c, t.Job, t.Type, t.APPID, t.BusinessID, platform, t.Title, t.Summary, t.LinkType, t.LinkValue,
  36. build, t.Sound, t.Vibration, t.PassThrough, t.MidFile, progress, t.PushTime, t.ExpireTime, t.Status, t.Group, t.ImageURL, extra); err != nil {
  37. log.Error("d.AddTask(%+v) error(%v)", t, err)
  38. PromError("mysql:添加推送任务")
  39. return
  40. }
  41. id, err = res.LastInsertId()
  42. return
  43. }
  44. // TxTaskByPlatform gets prepared task by platform.
  45. func (d *Dao) TxTaskByPlatform(tx *xsql.Tx, platform int) (t *model.Task, err error) {
  46. var (
  47. id int64
  48. build string
  49. progress string
  50. now = time.Now()
  51. since = now.Add(-7 * 24 * time.Hour)
  52. )
  53. t = &model.Task{Progress: &model.Progress{}}
  54. if err = tx.QueryRow(_taskByPlatformSQL, model.TaskStatusPrepared, now, platform, since).Scan(&id, &t.Job, &t.Type, &t.APPID, &t.BusinessID, &t.PlatformID, &t.Title, &t.Summary, &t.LinkType, &t.LinkValue, &build,
  55. &t.Sound, &t.Vibration, &t.PassThrough, &t.MidFile, &progress, &t.PushTime, &t.ExpireTime, &t.Status, &t.Group, &t.ImageURL); err != nil {
  56. t = nil
  57. if err == sql.ErrNoRows {
  58. err = nil
  59. return
  60. }
  61. log.Error("d.TxPreparedTask() QueryRow(%d,%v) error(%v)", platform, now, err)
  62. PromError("mysql:按状态查询任务")
  63. return
  64. }
  65. t.ID = strconv.FormatInt(id, 10)
  66. t.Build = model.ParseBuild(build)
  67. if progress != "" {
  68. if err = json.Unmarshal([]byte(progress), t.Progress); err != nil {
  69. log.Error("json.Unmarshal(%s) error(%v)", progress, err)
  70. PromError("mysql:unmarshal进度")
  71. }
  72. }
  73. return
  74. }
  75. // Task loads task by task id.
  76. func (d *Dao) Task(c context.Context, taskID string) (t *model.Task, err error) {
  77. var (
  78. platform string
  79. build string
  80. progress string
  81. id, _ = strconv.ParseInt(taskID, 10, 64)
  82. )
  83. t = &model.Task{Progress: &model.Progress{}}
  84. if err = d.taskStmt.QueryRow(c, id).Scan(&id, &t.Job, &t.Type, &t.APPID, &t.BusinessID, &platform, &t.Title, &t.Summary, &t.LinkType, &t.LinkValue, &build,
  85. &t.Sound, &t.Vibration, &t.PassThrough, &t.MidFile, &progress, &t.PushTime, &t.ExpireTime, &t.Status, &t.Group, &t.ImageURL); err != nil {
  86. if err == sql.ErrNoRows {
  87. t = nil
  88. err = nil
  89. return
  90. }
  91. log.Error("d.taskStmt.QueryRow(%s) error(%v)", taskID, err)
  92. PromError("mysql:按ID查询任务")
  93. return
  94. }
  95. t.ID = strconv.FormatInt(id, 10)
  96. t.Platform = model.SplitInts(platform)
  97. t.Build = model.ParseBuild(build)
  98. if progress != "" {
  99. if err = json.Unmarshal([]byte(progress), t.Progress); err != nil {
  100. log.Error("json.Unmarshal(%s) error(%v)", progress, err)
  101. PromError("mysql:unmarshal进度")
  102. }
  103. }
  104. if t.Progress == nil {
  105. t.Progress = &model.Progress{}
  106. }
  107. return
  108. }
  109. // UpdateTaskStatus update task status.
  110. func (d *Dao) UpdateTaskStatus(c context.Context, taskID string, status int8) (err error) {
  111. id, _ := strconv.ParseInt(taskID, 10, 64)
  112. if _, err = d.updateTaskStatusStmt.Exec(c, status, id); err != nil {
  113. log.Error("d.updateTaskStatusStmt.Exec(%s,%d) error(%v)", taskID, status, err)
  114. PromError("mysql:更新推送任务状态")
  115. }
  116. return
  117. }
  118. // TxUpdateTaskStatus updates task status by tx.
  119. func (d *Dao) TxUpdateTaskStatus(tx *xsql.Tx, taskID string, status int8) (err error) {
  120. id, _ := strconv.ParseInt(taskID, 10, 64)
  121. if _, err = tx.Exec(_upadteTaskStatusSQL, status, id); err != nil {
  122. log.Error("d.TxUpdateTaskStatus() Exec(%s,%d) error(%v)", taskID, status, err)
  123. PromError("mysql:更新推送任务状态")
  124. }
  125. return
  126. }
  127. // UpdateTaskProgress updates task's progress.
  128. func (d *Dao) UpdateTaskProgress(c context.Context, taskID string, p *model.Progress) (err error) {
  129. var b []byte
  130. if b, err = json.Marshal(p); err != nil {
  131. log.Error("json.Marshal(%+v) error(%v)", p, err)
  132. return
  133. }
  134. id, _ := strconv.ParseInt(taskID, 10, 64)
  135. if _, err = d.updateTaskProgressStmt.Exec(c, string(b), id); err != nil {
  136. log.Error("d.updateTaskProgress.Exec(%s,%+v) error(%v)", taskID, p, err)
  137. PromError("mysql:更新推送任务进度")
  138. }
  139. return
  140. }
  141. // Businesses gets all business info.
  142. func (d *Dao) Businesses(c context.Context) (res map[int64]*model.Business, err error) {
  143. rows, err := d.businessesStmt.Query(c)
  144. if err != nil {
  145. log.Error("d.businessesStmt.Query() error(%v)", err)
  146. PromError("mysql:查询业务方")
  147. return
  148. }
  149. defer rows.Close()
  150. res = make(map[int64]*model.Business)
  151. for rows.Next() {
  152. var (
  153. silentTime string
  154. b = &model.Business{}
  155. )
  156. if err = rows.Scan(&b.ID, &b.APPID, &b.Name, &b.Desc, &b.Token,
  157. &b.Sound, &b.Vibration, &b.ReceiveSwitch, &b.PushSwitch, &silentTime, &b.PushLimitUser, &b.Whitelist); err != nil {
  158. PromError("mysql:查询业务方Scan")
  159. log.Error("d.Business() Scan() error(%v)", err)
  160. return
  161. }
  162. b.SilentTime = model.ParseSilentTime(silentTime)
  163. res[b.ID] = b
  164. }
  165. return
  166. }
  167. func (d *Dao) auths(c context.Context) (res []*model.Auth, err error) {
  168. var rows *xsql.Rows
  169. if rows, err = d.authsStmt.Query(c); err != nil {
  170. log.Error("d.authsStmt.Query() error(%v)", err)
  171. PromError("mysql:获取 auths")
  172. return
  173. }
  174. defer rows.Close()
  175. for rows.Next() {
  176. a := &model.Auth{}
  177. if err = rows.Scan(&a.APPID, &a.PlatformID, &a.Name, &a.Key, &a.Value, &a.BundleID); err != nil {
  178. PromError("mysql:获取 auths Scan")
  179. log.Error("d.auths() Scan() error(%v)", err)
  180. return
  181. }
  182. res = append(res, a)
  183. }
  184. return
  185. }
  186. // AddCallback adds callback.
  187. func (d *Dao) AddCallback(c context.Context, cb *model.Callback) (err error) {
  188. var extra []byte
  189. if cb.Extra != nil {
  190. extra, _ = json.Marshal(cb.Extra)
  191. }
  192. if _, err = d.addCallbackStmt.Exec(c, cb.Task, cb.APP, cb.Platform, cb.Mid, cb.Buvid, cb.Token, cb.Pid, cb.Click, cb.Brand, string(extra),
  193. cb.APP, cb.Platform, cb.Mid, cb.Buvid, cb.Pid, cb.Click); err != nil {
  194. log.Error("d.AddCallback(%+v) error(%v)", cb, err)
  195. PromError("mysql:新增callback")
  196. return
  197. }
  198. PromInfo("mysql:新增callback")
  199. return
  200. }