handler.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "strings"
  8. "sync"
  9. "go-common/app/job/main/aegis/model"
  10. "go-common/library/log"
  11. "go-common/library/queue/databus"
  12. pkgerr "github.com/pkg/errors"
  13. )
  14. //RscHandler .
  15. type RscHandler interface {
  16. CheckMessage(json.RawMessage) (interface{}, error)
  17. HandleMessage(context.Context, interface{}) error
  18. }
  19. //TaskHandler .
  20. type TaskHandler interface {
  21. CheckMessage(*databus.Message) (interface{}, error)
  22. HandleMessage(context.Context, interface{}) error
  23. }
  24. var (
  25. _ TaskHandler = baseTaskHandler{}
  26. _ TaskHandler = dynamicTaskHandler{}
  27. _ RscHandler = baseResourceAddHandler{}
  28. _ RscHandler = mangaResourceAddHandler{}
  29. _ RscHandler = baseResourceUpdateHandler{}
  30. _ RscHandler = baseResourceCancelHandler{}
  31. )
  32. //单例
  33. var (
  34. basehandleTask *baseTaskHandler
  35. basehandleRscAdd *baseResourceAddHandler
  36. basehandleRscUpdate *baseResourceUpdateHandler
  37. basehandleRscCancel *baseResourceCancelHandler
  38. dynamicHandleTask *dynamicTaskHandler
  39. mangaHandelRscAdd *mangaResourceAddHandler
  40. once sync.Once
  41. )
  42. //ERROR
  43. var (
  44. ErrTaskDuplicate = errors.New("重复任务")
  45. ErrTaskFlowInvalid = errors.New("流程失效")
  46. ErrTaskResourceInvalid = errors.New("资源失效")
  47. ErrInvalidMsg = errors.New("无效消息")
  48. ErrHandlerMiss = errors.New("handler NotFound")
  49. )
  50. //prefix
  51. var (
  52. _prefixTask = "task_"
  53. _prefixRscAdd = "add_"
  54. _prefixRscUpdate = "update_"
  55. _prefixRscCancel = "cancel_"
  56. )
  57. //业务ID
  58. var (
  59. _bizidDynamic = 1
  60. _bizidManga = 2
  61. )
  62. func (s *Service) registerRscHandler(key string, handler RscHandler) {
  63. s.rschandle[key] = handler
  64. }
  65. func (s *Service) registerTaskHandler(key string, handler TaskHandler) {
  66. s.taskhandle[key] = handler
  67. }
  68. func (s *Service) findTaskHandler(key string) TaskHandler {
  69. if handler, ok := s.taskhandle[key]; ok {
  70. return handler
  71. }
  72. log.Warn("key(%s)没找到任务的处理器,根据类型使用默认handler", key)
  73. return s.getdynamicTaskHandler()
  74. }
  75. func (s *Service) findRscHandler(key string) RscHandler {
  76. if handler, ok := s.rschandle[key]; ok {
  77. return handler
  78. }
  79. log.Warn("key(%s)没找到业务的处理器,根据类型使用默认handler", key)
  80. switch {
  81. case strings.HasPrefix(key, _prefixRscAdd):
  82. return s.getbaseResourceAddHandler()
  83. case strings.HasPrefix(key, _prefixRscUpdate):
  84. return s.getbaseResourceUpdateHandler()
  85. case strings.HasPrefix(key, _prefixRscCancel):
  86. return s.getbaseResourceCancelHandler()
  87. default:
  88. return nil
  89. }
  90. }
  91. //TODO 先写死吧,之后可以根据配置里面的类名用反射实例化
  92. func initHandler(s *Service) {
  93. var (
  94. dynamicTask = fmt.Sprintf("%s%d", _prefixTask, _bizidDynamic)
  95. dynamicRscAdd = fmt.Sprintf("%s%d", _prefixRscAdd, _bizidDynamic)
  96. dynamicRscUpdate = fmt.Sprintf("%s%d", _prefixRscUpdate, _bizidDynamic)
  97. dynamicRscCancel = fmt.Sprintf("%s%d", _prefixRscCancel, _bizidDynamic)
  98. managaTask = fmt.Sprintf("%s%d", _prefixTask, _bizidManga)
  99. managaRscAdd = fmt.Sprintf("%s%d", _prefixRscAdd, _bizidManga)
  100. managaRscUpdate = fmt.Sprintf("%s%d", _prefixRscUpdate, _bizidManga)
  101. managaRscCancel = fmt.Sprintf("%s%d", _prefixRscCancel, _bizidManga)
  102. )
  103. s.rschandle = make(map[string]RscHandler)
  104. s.taskhandle = make(map[string]TaskHandler)
  105. once.Do(func() {
  106. basehandleTask = &baseTaskHandler{Service: s}
  107. basehandleRscAdd = &baseResourceAddHandler{Service: s}
  108. basehandleRscUpdate = &baseResourceUpdateHandler{Service: s}
  109. basehandleRscCancel = &baseResourceCancelHandler{Service: s}
  110. dynamicHandleTask = &dynamicTaskHandler{baseTaskHandler: baseTaskHandler{Service: s}}
  111. mangaHandelRscAdd = &mangaResourceAddHandler{baseResourceAddHandler: baseResourceAddHandler{Service: s}}
  112. })
  113. s.registerRscHandler(dynamicRscAdd, s.getbaseResourceAddHandler())
  114. s.registerRscHandler(dynamicRscUpdate, s.getbaseResourceUpdateHandler())
  115. s.registerRscHandler(dynamicRscCancel, s.getbaseResourceCancelHandler())
  116. s.registerRscHandler(managaRscAdd, s.getmangaResourceAddHandler())
  117. s.registerRscHandler(managaRscUpdate, s.getbaseResourceUpdateHandler())
  118. s.registerRscHandler(managaRscCancel, s.getbaseResourceCancelHandler())
  119. s.registerTaskHandler(managaTask, s.getbaseTaskHandler())
  120. s.registerTaskHandler(dynamicTask, s.getdynamicTaskHandler())
  121. }
  122. func (s *Service) getbaseTaskHandler() *baseTaskHandler {
  123. return basehandleTask
  124. }
  125. func (s *Service) getbaseResourceAddHandler() *baseResourceAddHandler {
  126. return basehandleRscAdd
  127. }
  128. func (s *Service) getbaseResourceUpdateHandler() *baseResourceUpdateHandler {
  129. return basehandleRscUpdate
  130. }
  131. func (s *Service) getbaseResourceCancelHandler() *baseResourceCancelHandler {
  132. return basehandleRscCancel
  133. }
  134. func (s *Service) getdynamicTaskHandler() *dynamicTaskHandler {
  135. return dynamicHandleTask
  136. }
  137. func (s *Service) getmangaResourceAddHandler() *mangaResourceAddHandler {
  138. return mangaHandelRscAdd
  139. }
  140. //解析验证message
  141. /*
  142. TODO
  143. 根据DispatchLimit,动态设置分发数量
  144. */
  145. func (s *Service) checkTaskMsg(msg *databus.Message) (*model.Task, error) {
  146. taskMsg := new(model.CreateTaskMsg)
  147. if err := json.Unmarshal(msg.Value, taskMsg); err != nil {
  148. log.Error("checkTaskMsg key(%s) value(%s)", msg.Key, string(msg.Value))
  149. return nil, err
  150. }
  151. if taskMsg.DispatchLimit == 0 || taskMsg.FlowID == 0 || taskMsg.RID == 0 {
  152. log.Error("checkTaskMsg key(%s) value(%s)", msg.Key, string(msg.Value))
  153. return nil, ErrTaskResourceInvalid
  154. }
  155. if s.dao.CheckTask(context.Background(), taskMsg.FlowID, taskMsg.RID) > 0 {
  156. return nil, ErrTaskDuplicate
  157. }
  158. ok, err := s.dao.CheckFlow(context.TODO(), taskMsg.RID, taskMsg.FlowID)
  159. if !ok || err != nil {
  160. return nil, ErrTaskFlowInvalid
  161. }
  162. //先兼容旧的task消息,没有传bizid
  163. if taskMsg.BizID == 0 {
  164. res, err := s.dao.Resource(context.Background(), taskMsg.RID)
  165. if err != nil || res == nil {
  166. return nil, ErrTaskResourceInvalid
  167. }
  168. taskMsg.BizID = res.BusinessID
  169. }
  170. return &model.Task{
  171. BusinessID: taskMsg.BizID,
  172. FlowID: taskMsg.FlowID,
  173. RID: taskMsg.RID,
  174. }, nil
  175. }
  176. func (s *Service) writeTaskToDB(c context.Context, task *model.Task) error {
  177. return s.dao.CreateTask(c, task)
  178. }
  179. func (s *Service) checkRscAddMsg(msg json.RawMessage) (*model.AddOption, error) {
  180. addMsg := new(model.AddOption)
  181. if err := json.Unmarshal(msg, addMsg); err != nil {
  182. return nil, err
  183. }
  184. if addMsg.BusinessID == 0 || len(addMsg.OID) == 0 {
  185. return nil, ErrInvalidMsg
  186. }
  187. return addMsg, nil
  188. }
  189. func (s *Service) writeRscAdd(c context.Context, opt *model.AddOption) error {
  190. //TODO 根据错误号重试
  191. return s.dao.RscAdd(c, opt)
  192. }
  193. func (s *Service) checkRscUpdateMsg(msg json.RawMessage) (*model.UpdateOption, error) {
  194. updateMsg := new(model.UpdateOption)
  195. if err := json.Unmarshal(msg, updateMsg); err != nil {
  196. return nil, err
  197. }
  198. if updateMsg.BusinessID == 0 || len(updateMsg.OID) == 0 || len(updateMsg.Update) == 0 {
  199. return nil, ErrInvalidMsg
  200. }
  201. return updateMsg, nil
  202. }
  203. func (s *Service) writeRscUpdate(c context.Context, opt *model.UpdateOption) error {
  204. return s.dao.RscUpdate(c, opt)
  205. }
  206. func (s *Service) checkRscCancelMsg(msg json.RawMessage) (*model.CancelOption, error) {
  207. cancelMsg := new(model.CancelOption)
  208. if err := json.Unmarshal(msg, cancelMsg); err != nil {
  209. return nil, err
  210. }
  211. if cancelMsg.BusinessID == 0 || len(cancelMsg.Oids) == 0 {
  212. return nil, ErrInvalidMsg
  213. }
  214. return cancelMsg, nil
  215. }
  216. func (s *Service) writeRscCancel(c context.Context, opt *model.CancelOption) error {
  217. return s.dao.RscCancel(c, opt)
  218. }
  219. func (s *Service) newrsc(msg *databus.Message) (interface{}, error) {
  220. log.Info("databusgroup new msg key(%+v) partition(%d) offset(%d) value(%s) ", msg.Key, msg.Partition, msg.Offset, string(msg.Value))
  221. rscmsg := new(model.RscMsg)
  222. if err := json.Unmarshal(msg.Value, rscmsg); err != nil {
  223. log.Error("databusgroup json.Unmarshal for msg(%+v)", string(msg.Value))
  224. return nil, ErrInvalidMsg
  225. }
  226. key := fmt.Sprintf("%s_%d", rscmsg.Action, rscmsg.BizID)
  227. handler := s.findRscHandler(key)
  228. if handler == nil {
  229. log.Error("databusgroup can not find handler for msg key(%+v)", key)
  230. return nil, ErrHandlerMiss
  231. }
  232. data, err := handler.CheckMessage(rscmsg.Raw)
  233. if err != nil {
  234. log.Error("databusgroup new msg key(%+v) partition(%d) offset(%d) value(%s) CheckMessage(%v)", msg.Key, msg.Partition, msg.Offset, string(msg.Value), pkgerr.WithStack(err))
  235. }
  236. return data, err
  237. }
  238. func (s *Service) splitrsc(msg *databus.Message, data interface{}) int {
  239. switch t := data.(type) {
  240. case *model.AddOption:
  241. return int(t.BusinessID)
  242. case *model.UpdateOption:
  243. return int(t.BusinessID)
  244. case *model.CancelOption:
  245. return int(t.BusinessID)
  246. default:
  247. return 0
  248. }
  249. }
  250. func (s *Service) dorsc(bmsgs []interface{}) {
  251. for _, msg := range bmsgs {
  252. log.Info("databusgroup do msg(%+v)", msg)
  253. var key string
  254. switch t := msg.(type) {
  255. case *model.AddOption:
  256. key = fmt.Sprintf("%s%d", _prefixRscAdd, t.BusinessID)
  257. case *model.UpdateOption:
  258. key = fmt.Sprintf("%s%d", _prefixRscUpdate, t.BusinessID)
  259. case *model.CancelOption:
  260. key = fmt.Sprintf("%s%d", _prefixRscCancel, t.BusinessID)
  261. default:
  262. log.Error("databusgroup unknow msg(%+v)", msg)
  263. continue
  264. }
  265. handler := s.findRscHandler(key)
  266. if handler == nil {
  267. log.Error("databusgroup msg(%+v) handler NotFound", msg)
  268. continue
  269. }
  270. if err := handler.HandleMessage(context.Background(), msg); err != nil {
  271. log.Error("databusgroup msg(%+v) handler err(%v)", msg, pkgerr.WithStack(err))
  272. continue
  273. }
  274. }
  275. }
  276. func (s *Service) newtask(msg *databus.Message) (interface{}, error) {
  277. log.Info("databusgroup newtask msg key(%+v) partition(%d) offset(%d) value(%s) ", msg.Key, msg.Partition, msg.Offset, string(msg.Value))
  278. taskmsg := new(model.CreateTaskMsg)
  279. if err := json.Unmarshal(msg.Value, taskmsg); err != nil {
  280. log.Error("databusgroup newtask json.Unmarshal for msg(%+v)", string(msg.Value))
  281. return nil, ErrInvalidMsg
  282. }
  283. key := fmt.Sprintf("%s%d", _prefixTask, taskmsg.BizID)
  284. handler := s.findTaskHandler(key)
  285. if handler == nil {
  286. log.Error("databusgroup can not find handler for msg key(%+v)", key)
  287. return nil, ErrHandlerMiss
  288. }
  289. data, err := handler.CheckMessage(msg)
  290. if err != nil {
  291. errmsg := fmt.Sprintf("databusgroup new msg key(%+v) partition(%d) offset(%d) value(%s) CheckMessage(%v)", msg.Key, msg.Partition, msg.Offset, string(msg.Value), pkgerr.WithStack(err))
  292. if err == ErrTaskDuplicate {
  293. log.Warn(errmsg)
  294. } else {
  295. log.Error(errmsg)
  296. }
  297. }
  298. return data, err
  299. }
  300. func (s *Service) splittask(msg *databus.Message, data interface{}) int {
  301. if t, ok := data.(*model.Task); ok {
  302. return int(t.BusinessID)
  303. }
  304. return 0
  305. }
  306. func (s *Service) dotask(bmsgs []interface{}) {
  307. for _, msg := range bmsgs {
  308. log.Info("databusgroup dotask msg(%+v)", msg)
  309. var key string
  310. if t, ok := msg.(*model.Task); ok {
  311. key = fmt.Sprintf("%s%d", _prefixTask, t.BusinessID)
  312. } else {
  313. log.Error("databusgroup dotask unknow msg(%+v)", msg)
  314. continue
  315. }
  316. handler := s.findTaskHandler(key)
  317. if handler == nil {
  318. log.Error("databusgroup dotask msg(%+v) handler NotFound", msg)
  319. continue
  320. }
  321. if err := handler.HandleMessage(context.Background(), msg); err != nil {
  322. log.Error("databusgroup dotask msg(%+v) handler err(%v)", msg, pkgerr.WithStack(err))
  323. continue
  324. }
  325. }
  326. }