dao.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. package unicom
  2. import (
  3. "context"
  4. "database/sql"
  5. "time"
  6. "go-common/app/job/main/app-wall/conf"
  7. "go-common/app/job/main/app-wall/model/unicom"
  8. "go-common/library/cache/memcache"
  9. xsql "go-common/library/database/sql"
  10. "go-common/library/log"
  11. httpx "go-common/library/net/http/blademaster"
  12. )
  13. const (
  14. // unicom integral change
  15. _upUserIntegralSQL = `UPDATE unicom_user_bind SET integral=?,flow=?,monthlytime=? WHERE mid=? AND state=1`
  16. _orderUserSyncSQL = `SELECT usermob,spid,type,ordertime,endtime FROM unicom_order WHERE usermob=? AND type=0 ORDER BY type DESC`
  17. _bindAllSQL = `SELECT mid,usermob,monthlytime FROM unicom_user_bind WHERE state=1 LIMIT ?,?`
  18. _userBindSQL = `SELECT usermob,phone,mid,state,integral,flow,monthlytime FROM unicom_user_bind WHERE state=1 AND mid=?`
  19. // update unicom ip
  20. _inUnicomIPSyncSQL = `INSERT IGNORE INTO unicom_ip (ipbegion,ipend,isopen,ctime,mtime) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE
  21. ipbegion=?,ipend=?,isopen=?,mtime=?`
  22. _upUnicomIPSQL = `UPDATE unicom_ip SET isopen=?,mtime=? WHERE ipbegion=? AND ipend=?`
  23. _ipSyncSQL = `SELECT ipbegion,ipend FROM unicom_ip WHERE isopen=1`
  24. _inUserPackLogSQL = `INSERT INTO unicom_user_packs_log (phone,usermob,mid,request_no,ptype,integral,pdesc) VALUES (?,?,?,?,?,?,?)`
  25. _inUserIntegralLogSQL = `INSERT INTO unicom_user_integral_log (phone,mid,unicom_desc,ptype,integral,flow,pdesc) VALUES (?,?,?,?,?,?,?)`
  26. )
  27. type Dao struct {
  28. db *xsql.DB
  29. uclient *httpx.Client
  30. // memcache
  31. mc *memcache.Pool
  32. flowKeyExpired int32
  33. expire int32
  34. // unicom integral change
  35. upUserIntegralSQL *xsql.Stmt
  36. orderUserSyncSQL *xsql.Stmt
  37. bindAllSQL *xsql.Stmt
  38. userBindSQL *xsql.Stmt
  39. ipSyncSQL *xsql.Stmt
  40. inUserPackLogSQL *xsql.Stmt
  41. inUserIntegralLogSQL *xsql.Stmt
  42. // unicom url
  43. unicomFlowExchangeURL string
  44. unicomIPURL string
  45. }
  46. func New(c *conf.Config) (d *Dao) {
  47. d = &Dao{
  48. db: xsql.NewMySQL(c.MySQL.Show),
  49. uclient: httpx.NewClient(conf.Conf.HTTPUnicom),
  50. // memcache
  51. mc: memcache.NewPool(c.Memcache.Operator.Config),
  52. expire: int32(time.Duration(c.Unicom.PackKeyExpired) / time.Second),
  53. flowKeyExpired: int32(time.Duration(c.Unicom.KeyExpired) / time.Second),
  54. // unicom url
  55. unicomFlowExchangeURL: c.Host.UnicomFlow + _unicomFlowExchangeURL,
  56. unicomIPURL: c.Host.Unicom + _unicomIPURL,
  57. }
  58. // unicom integral change
  59. d.upUserIntegralSQL = d.db.Prepared(_upUserIntegralSQL)
  60. d.orderUserSyncSQL = d.db.Prepared(_orderUserSyncSQL)
  61. d.bindAllSQL = d.db.Prepared(_bindAllSQL)
  62. d.userBindSQL = d.db.Prepared(_userBindSQL)
  63. d.ipSyncSQL = d.db.Prepared(_ipSyncSQL)
  64. d.inUserPackLogSQL = d.db.Prepared(_inUserPackLogSQL)
  65. d.inUserIntegralLogSQL = d.db.Prepared(_inUserIntegralLogSQL)
  66. return
  67. }
  68. // UpUserIntegral update unicom user integral
  69. func (d *Dao) UpUserIntegral(ctx context.Context, ub *unicom.UserBind) (row int64, err error) {
  70. res, err := d.upUserIntegralSQL.Exec(ctx, ub.Integral, ub.Flow, ub.Monthly, ub.Mid)
  71. if err != nil {
  72. log.Error("update user integral sql error(%v)", err)
  73. return 0, err
  74. }
  75. return res.RowsAffected()
  76. }
  77. // OrdersUserFlow select user OrdersSync
  78. func (d *Dao) OrdersUserFlow(ctx context.Context, usermob string) (res []*unicom.Unicom, err error) {
  79. rows, err := d.orderUserSyncSQL.Query(ctx, usermob)
  80. if err != nil {
  81. log.Error("query error (%v)", err)
  82. return
  83. }
  84. defer rows.Close()
  85. for rows.Next() {
  86. u := &unicom.Unicom{}
  87. if err = rows.Scan(&u.Usermob, &u.Spid, &u.TypeInt, &u.Ordertime, &u.Endtime); err != nil {
  88. log.Error("OrdersUserFlow row.Scan err (%v)", err)
  89. return
  90. }
  91. res = append(res, u)
  92. }
  93. return
  94. }
  95. //BindAll select bind all mid state 1
  96. func (d *Dao) BindAll(ctx context.Context, start, end int) (res []*unicom.UserBind, err error) {
  97. rows, err := d.bindAllSQL.Query(ctx, start, end)
  98. if err != nil {
  99. log.Error("query error (%v)", err)
  100. return
  101. }
  102. defer rows.Close()
  103. for rows.Next() {
  104. u := &unicom.UserBind{}
  105. if err = rows.Scan(&u.Mid, &u.Usermob, &u.Monthly); err != nil {
  106. log.Error("BindAll rows.Scan error(%v)", err)
  107. return
  108. }
  109. res = append(res, u)
  110. }
  111. return
  112. }
  113. // UserBind unicom select user bind
  114. func (d *Dao) UserBind(ctx context.Context, mid int64) (res *unicom.UserBind, err error) {
  115. row := d.userBindSQL.QueryRow(ctx, mid)
  116. if row == nil {
  117. log.Error("userBindSQL is null")
  118. return
  119. }
  120. res = &unicom.UserBind{}
  121. if err = row.Scan(&res.Usermob, &res.Phone, &res.Mid, &res.State, &res.Integral, &res.Flow, &res.Monthly); err != nil {
  122. if err == sql.ErrNoRows {
  123. err = nil
  124. } else {
  125. log.Error("userBindSQL row.Scan error(%v)", err)
  126. }
  127. res = nil
  128. return
  129. }
  130. return
  131. }
  132. // InUnicomIPSync insert or update unicom_ip
  133. func (d *Dao) InUnicomIPSync(tx *xsql.Tx, u *unicom.UnicomIP, now time.Time) (row int64, err error) {
  134. res, err := tx.Exec(_inUnicomIPSyncSQL, u.Ipbegin, u.Ipend, 1, now, now,
  135. u.Ipbegin, u.Ipend, 1, now)
  136. if err != nil {
  137. log.Error("tx.inUnicomIPSyncSQL.Exec error(%v)", err)
  138. return 0, err
  139. }
  140. return res.RowsAffected()
  141. }
  142. // UpUnicomIP update unicom_ip state
  143. func (d *Dao) UpUnicomIP(tx *xsql.Tx, ipstart, ipend, state int, now time.Time) (row int64, err error) {
  144. res, err := tx.Exec(_upUnicomIPSQL, state, now, ipstart, ipend)
  145. if err != nil {
  146. log.Error("tx.upUnicomIPSQL.Exec error(%v)", err)
  147. return 0, err
  148. }
  149. return res.RowsAffected()
  150. }
  151. // IPSync select all ipSync
  152. func (d *Dao) IPSync(ctx context.Context) (res []*unicom.UnicomIP, err error) {
  153. rows, err := d.ipSyncSQL.Query(ctx)
  154. if err != nil {
  155. log.Error("query error (%v)", err)
  156. return
  157. }
  158. defer rows.Close()
  159. res = []*unicom.UnicomIP{}
  160. for rows.Next() {
  161. u := &unicom.UnicomIP{}
  162. if err = rows.Scan(&u.Ipbegin, &u.Ipend); err != nil {
  163. log.Error("rows.Scan error(%v)", err)
  164. return
  165. }
  166. u.UnicomIPChange()
  167. res = append(res, u)
  168. }
  169. return
  170. }
  171. // InUserPackLog insert unicom user pack log
  172. func (d *Dao) InUserPackLog(ctx context.Context, u *unicom.UserPackLog) (row int64, err error) {
  173. res, err := d.inUserPackLogSQL.Exec(ctx, u.Phone, u.Usermob, u.Mid, u.RequestNo, u.Type, u.Integral, u.Desc)
  174. if err != nil {
  175. log.Error("insert user pack log integral sql error(%v)", err)
  176. return 0, err
  177. }
  178. return res.RowsAffected()
  179. }
  180. // InUserIntegralLog insert unicom user add integral and flow log
  181. func (d *Dao) InUserIntegralLog(ctx context.Context, u *unicom.UserIntegralLog) (row int64, err error) {
  182. res, err := d.inUserIntegralLogSQL.Exec(ctx, u.Phone, u.Mid, u.UnicomDesc, u.Type, u.Integral, u.Flow, u.Desc)
  183. if err != nil {
  184. log.Error("insert user add integral and flow sql error(%v)", err)
  185. return 0, err
  186. }
  187. return res.RowsAffected()
  188. }
  189. // BeginTran begin a transacition
  190. func (d *Dao) BeginTran(ctx context.Context) (tx *xsql.Tx, err error) {
  191. return d.db.Begin(ctx)
  192. }