flow.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. package service
import (
	"context"
	"encoding/json"
	"sort"
	"strings"

	"go-common/app/admin/main/videoup/model/archive"
	"go-common/library/database/sql"
	"go-common/library/log"
)
  10. // txUpFlowID update flow_id by videoParam.
  11. func (s *Service) txUpFlowID(tx *sql.Tx, ap *archive.ArcParam) (err error) {
  12. if _, err = s.arc.TxUpFlowID(tx, ap.Aid, ap.OnFlowID); err != nil {
  13. log.Error("archive_forbid.on_flow_id s.TxUpFlowID(%d,%d) error(%v)", ap.Aid, ap.OnFlowID, err) // NOTE: update es index question , if after update archive table
  14. return
  15. }
  16. var flowID int64
  17. if flowID, err = s.arc.FlowByPool(archive.PoolPrivateOrder, ap.Aid); err != nil {
  18. log.Error("flow_design s.arc.FlowByPool(%d,%d,%d) error(%v)", ap.Aid, ap.UID, ap.OnFlowID, err)
  19. return
  20. }
  21. if flowID > 0 {
  22. if _, err = s.arc.TxUpFlow(tx, flowID, ap.OnFlowID, ap.UID); err != nil {
  23. log.Error("flow_design s.arc.TxUpFlow(%d,%d,%d) error(%v)", ap.Aid, ap.UID, ap.OnFlowID, err)
  24. return
  25. }
  26. if _, err = s.arc.TxAddFlowLog(tx, archive.PoolPrivateOrder, archive.FlowLogUpdate, ap.Aid, ap.UID, ap.OnFlowID, "审核后台修改稿件私单类型"); err != nil {
  27. log.Error("s.arc.TxAddFlowLog(%d,%d,%d) error(%v)", ap.Aid, ap.UID, ap.OnFlowID, err)
  28. return
  29. }
  30. } else {
  31. if _, err = s.txAddFlow(tx, archive.PoolPrivateOrder, ap.Aid, ap.OnFlowID, ap.UID, "审核后台添加稿件私单类型"); err != nil {
  32. log.Error("flow_design s.arc.TxAddFlow(%d,%d,%d) error(%v)", ap.Aid, ap.UID, ap.OnFlowID, err)
  33. return
  34. }
  35. }
  36. log.Info("aid(%d) flowid(%d)", ap.Aid, ap.OnFlowID)
  37. return
  38. }
  39. /**
  40. * txAddFlow 新增流量套餐的记录
  41. * return int64, error/nil
  42. */
  43. func (s *Service) txAddFlow(tx *sql.Tx, pool int8, oid, groupID, uid int64, remark string) (id int64, err error) {
  44. if id, err = s.arc.TxAddFlow(tx, pool, oid, uid, groupID, remark); err != nil {
  45. log.Error("txAddFlow s.arc.TxAddFlow(%d,%d,%d,%d,%s) error(%v)", pool, oid, uid, groupID, remark, err)
  46. return
  47. }
  48. if id <= 0 {
  49. return
  50. }
  51. if _, err = s.arc.TxAddFlowLog(tx, pool, archive.FlowLogAdd, oid, uid, groupID, remark); err != nil {
  52. log.Error("txAddFlow s.arc.TxAddFlowLog(%d,%d,%d,%d,%s) error(%v)", pool, oid, uid, groupID, remark, err)
  53. return
  54. }
  55. return
  56. }
  57. /**
  58. * txUpFlowState 更新流量套餐的状态
  59. * return error/nil
  60. */
  61. func (s *Service) txUpFlowState(tx *sql.Tx, state int8, uid int64, f *archive.FlowData) (err error) {
  62. if f == nil {
  63. return
  64. }
  65. var rows int64
  66. if rows, err = s.arc.TxUpFlowState(tx, f.ID, state); err != nil {
  67. log.Error("updateFlowState s.arc.TxUpFlowState error(%v) id(%d) state(%d)", err, f.ID, state)
  68. return
  69. }
  70. if rows <= 0 {
  71. return
  72. }
  73. action := archive.FlowLogUpdate
  74. if state == archive.FlowDelete {
  75. action = archive.FlowLogDel
  76. }
  77. if _, err = s.arc.TxAddFlowLog(tx, f.Pool, action, f.OID, uid, f.GroupID, "审核后台修改状态"); err != nil {
  78. log.Error("updateFlowState s.arc.TxAddFlowLog error(%v) pool(%d) oid(%d) uid(%d) state(%d)", err, f.Pool, f.OID, uid, state)
  79. return
  80. }
  81. return
  82. }
  83. /**
  84. * getFlowsByOID 命中哪些流量套餐
  85. * return []*archive.FlowData, error/nil
  86. */
  87. func (s *Service) getFlowsByOID(c context.Context, oid int64) (flows []*archive.FlowData, err error) {
  88. if flows, err = s.arc.FlowsByOID(c, oid); err != nil {
  89. log.Error("getFlowsByOID s.arc.FlowsByOID error(%v) oid(%d)", err, oid)
  90. return
  91. }
  92. return
  93. }
  94. /**
  95. * txAddOrUpdateFlowState 新增或更新流量套餐的状态
  96. * return *archive.FlowData/nil, bool, error/nil
  97. */
  98. func (s *Service) txAddOrUpdateFlowState(c context.Context, tx *sql.Tx, oid, groupID, uid int64, pool, state int8, remark string) (flow *archive.FlowData, diff string, err error) {
  99. var (
  100. old, nw int8
  101. )
  102. defer func() {
  103. if err == nil && old != nw {
  104. tagID := archive.FlowOperType[groupID]
  105. if tagID > 0 {
  106. stateMap := map[int8]string{archive.FlowOpen: "是", archive.FlowDelete: "否"}
  107. diff = strings.TrimSpace(archive.Operformat(tagID, stateMap[old], stateMap[nw], archive.OperStyleOne))
  108. }
  109. }
  110. }()
  111. if flow, err = s.arc.FlowUnique(c, oid, groupID, pool); err != nil {
  112. log.Error("txAddOrUpdateFlowState s.arc.FlowUnique(%d,%d,%d) error(%v) state(%d)", oid, groupID, pool, err, state)
  113. return
  114. }
  115. //无数据前提下,新状态=state就没必要添加数据啦
  116. if flow == nil && state == archive.FlowDelete {
  117. return
  118. }
  119. if flow == nil {
  120. flow = &archive.FlowData{Pool: pool, OID: oid, GroupID: groupID, State: archive.FlowOpen}
  121. if flow.ID, err = s.txAddFlow(tx, flow.Pool, flow.OID, flow.GroupID, uid, remark); err != nil {
  122. log.Error("txAddOrUpdateFlowState s.txAddFlow error(%v) flow(%+v) state(%d)", err, flow, state)
  123. return
  124. }
  125. old = archive.FlowDelete
  126. nw = archive.FlowOpen
  127. } else {
  128. old = flow.State
  129. nw = state
  130. }
  131. if flow.State == state {
  132. return
  133. }
  134. if err = s.txUpFlowState(tx, state, uid, flow); err != nil {
  135. log.Error("txAddOrUpdateFlowState s.txUpdateFlowState error(%v) flow(%+v) state(%d) ", err, flow, state)
  136. return
  137. }
  138. flow.State = state
  139. nw = state
  140. return
  141. }
  142. //HitFlowGroups 命中哪些指定的流量套餐
  143. func (s *Service) HitFlowGroups(c context.Context, oid int64, includePools []int8) (res map[string]int, err error) {
  144. var (
  145. flows []*archive.FlowData
  146. )
  147. res = map[string]int{}
  148. includes := map[int8]int8{}
  149. if flows, err = s.getFlowsByOID(c, oid); err != nil {
  150. return
  151. }
  152. for _, p := range includePools {
  153. includes[p] = 1
  154. }
  155. for _, f := range flows {
  156. if includes[f.Pool] != 1 {
  157. continue
  158. }
  159. //merge their values
  160. value := map[string]int{}
  161. if err = json.Unmarshal(f.GroupValue, &value); err != nil {
  162. log.Error("HitFlowGroups json.Unmarshal error(%v) value(%s) oid(%d) flow.id(%d)", err, string(f.GroupValue), oid, f.ID)
  163. return
  164. }
  165. for attr, val := range value {
  166. if val == 1 {
  167. res[attr] = val
  168. }
  169. }
  170. }
  171. log.Info("HitFlowGroups oid(%d) includepools(%v) res(%+v)", oid, includePools, res)
  172. return
  173. }
  174. func (s *Service) txBatchUpFlowsState(c context.Context, tx *sql.Tx, aid, uid int64, pm map[string]int32) (conts []string, err error) {
  175. var (
  176. diff string
  177. groups []int64
  178. pools map[int64]int8
  179. )
  180. groupStates := map[int64]int8{}
  181. for attr, state := range pm {
  182. groupID := archive.FlowAttrMap[attr]
  183. if groupID <= 0 {
  184. continue
  185. }
  186. groups = append(groups, groupID)
  187. if state == 0 {
  188. groupStates[groupID] = archive.FlowDelete
  189. } else {
  190. groupStates[groupID] = archive.FlowOpen
  191. }
  192. }
  193. if len(groups) <= 0 {
  194. return
  195. }
  196. if pools, err = s.arc.FlowGroupPools(c, groups); err != nil {
  197. log.Error("txBatchUpFlowsState s.arc.FlowGroupPools(%v) error(%v) params(%+v)", groups, err, pm)
  198. return
  199. }
  200. for groupID, pool := range pools {
  201. if _, diff, err = s.txAddOrUpdateFlowState(c, tx, aid, groupID, uid, pool, groupStates[groupID], "审核后台修改"); err != nil {
  202. log.Error("txBatchUpFlowsState s.txAddOrUpdateFlowState(%d,%d,%d,%d,%d) error(%v) params(%+v)", aid, groupID, uid, pool, groupStates[groupID], err, pm)
  203. return
  204. }
  205. if diff != "" {
  206. conts = append(conts, diff)
  207. }
  208. }
  209. log.Info("txBatchUpFlowsState aid(%d) params(%+v) conts(%v)", aid, pm, conts)
  210. return
  211. }