service.go 14 KB


  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "strconv"
  7. "sync"
  8. "time"
  9. "go-common/app/job/main/spy/conf"
  10. "go-common/app/job/main/spy/dao"
  11. "go-common/app/job/main/spy/model"
  12. cmmdl "go-common/app/service/main/spy/model"
  13. spyrpc "go-common/app/service/main/spy/rpc/client"
  14. "go-common/library/log"
  15. "go-common/library/queue/databus"
  16. "go-common/library/stat/prom"
  17. xtime "go-common/library/time"
  18. "github.com/robfig/cron"
  19. )
  20. const (
  21. _bigdataEvent = "bt"
  22. _secretEvent = "st"
  23. )
  24. // Service service def.
  25. type Service struct {
  26. c *conf.Config
  27. waiter sync.WaitGroup
  28. dao *dao.Dao
  29. eventDatabus *databus.Databus
  30. spystatDatabus *databus.Databus
  31. secLoginDatabus *databus.Databus
  32. spyRPC *spyrpc.Service
  33. quit chan struct{}
  34. cachech chan func()
  35. retrych chan func()
  36. spyConfig map[string]interface{}
  37. configLoadTick time.Duration
  38. promBlockInfo *prom.Prom
  39. blockTick time.Duration
  40. blockWaitTick time.Duration
  41. allEventName map[string]int64
  42. loadEventTick time.Duration
  43. // activity events
  44. activityEvents map[string]struct{}
  45. }
  46. // New create a instance of Service and return.
  47. func New(c *conf.Config) (s *Service) {
  48. s = &Service{
  49. c: c,
  50. dao: dao.New(c),
  51. eventDatabus: databus.New(c.Databus.EventData),
  52. spystatDatabus: databus.New(c.Databus.SpyStatData),
  53. secLoginDatabus: databus.New(c.Databus.SecLogin),
  54. spyRPC: spyrpc.New(c.SpyRPC),
  55. quit: make(chan struct{}),
  56. cachech: make(chan func(), 1024),
  57. retrych: make(chan func(), 128),
  58. spyConfig: make(map[string]interface{}),
  59. configLoadTick: time.Duration(c.Property.ConfigLoadTick),
  60. promBlockInfo: prom.New().WithCounter("spy_block_info", []string{"name"}),
  61. blockTick: time.Duration(c.Property.BlockTick),
  62. blockWaitTick: time.Duration(c.Property.BlockWaitTick),
  63. allEventName: make(map[string]int64),
  64. loadEventTick: time.Duration(c.Property.LoadEventTick),
  65. }
  66. if err := s.loadSystemConfig(); err != nil {
  67. panic(err)
  68. }
  69. if err := s.loadeventname(); err != nil {
  70. panic(err)
  71. }
  72. s.initActivityEvents()
  73. s.waiter.Add(1)
  74. go s.consumeproc()
  75. s.waiter.Add(1)
  76. go s.secloginproc()
  77. s.waiter.Add(1)
  78. go s.spystatproc()
  79. go s.retryproc()
  80. go s.cacheproc()
  81. go s.loadconfig()
  82. go s.blockcacheuser()
  83. go s.loadeventproc()
  84. t := cron.New()
  85. if err := t.AddFunc(s.c.Property.Block.CycleCron, s.cycleblock); err != nil {
  86. panic(err)
  87. }
  88. if err := t.AddFunc(s.c.Property.ReportCron, s.reportjob); err != nil {
  89. panic(err)
  90. }
  91. t.Start()
  92. return s
  93. }
  94. func (s *Service) consumeproc() {
  95. defer s.waiter.Done()
  96. defer func() {
  97. if x := recover(); x != nil {
  98. log.Error("eventproc unknown panic(%v)", x)
  99. }
  100. }()
  101. var (
  102. msg *databus.Message
  103. eventMsg *model.EventMessage
  104. ok bool
  105. err error
  106. msgChan = s.eventDatabus.Messages()
  107. c = context.TODO()
  108. preOffset int64
  109. )
  110. for {
  111. select {
  112. case msg, ok = <-msgChan:
  113. if !ok {
  114. log.Info("eventConsumeProc msgChan closed")
  115. return
  116. }
  117. case <-s.quit:
  118. log.Info("quit eventConsumeProc")
  119. return
  120. }
  121. if err = msg.Commit(); err != nil {
  122. log.Error("msg.Commit error(%v)", err)
  123. }
  124. if preOffset, err = s.dao.OffsetCache(c, _bigdataEvent, msg.Partition); err != nil {
  125. log.Error("s.dao.OffsetCache(%d) error(%v)", msg.Partition, err)
  126. preOffset = 0
  127. } else {
  128. if msg.Offset > preOffset {
  129. s.setOffset(_bigdataEvent, msg.Partition, msg.Offset)
  130. }
  131. }
  132. eventMsg = &model.EventMessage{}
  133. if err = json.Unmarshal([]byte(msg.Value), eventMsg); err != nil {
  134. log.Error("json.Unmarshall(%s) error(%v)", msg.Value, err)
  135. s.setOffset(_bigdataEvent, msg.Partition, msg.Offset)
  136. continue
  137. }
  138. if msg.Offset <= preOffset && s.isMsgExpiration(eventMsg.Time) {
  139. log.Error("drop expired msg (%+v) (now_offset : %d)", preOffset)
  140. continue
  141. }
  142. if err = s.handleEvent(c, eventMsg); err != nil {
  143. log.Error("s.HandleEvent(%v) error(%v)", eventMsg, err)
  144. continue
  145. }
  146. log.Info("s.handleEvent(%v) eventMsg", eventMsg)
  147. }
  148. }
  149. func (s *Service) setOffset(event string, partition int32, offset int64) {
  150. s.cachemiss(func() {
  151. var err error
  152. if err = s.dao.SetOffsetCache(context.TODO(), event, partition, offset); err != nil {
  153. log.Error("s.dao.SetOffsetCache(%d,%d) error(%v)", partition, offset, err)
  154. }
  155. })
  156. }
  157. func (s *Service) secloginproc() {
  158. defer s.waiter.Done()
  159. defer func() {
  160. if x := recover(); x != nil {
  161. log.Error("eventproc unknown panic(%v)", x)
  162. }
  163. }()
  164. var (
  165. msg *databus.Message
  166. eventMsg *model.EventMessage
  167. ok bool
  168. err error
  169. msgChan = s.secLoginDatabus.Messages()
  170. c = context.TODO()
  171. preOffset int64
  172. )
  173. for {
  174. select {
  175. case msg, ok = <-msgChan:
  176. if !ok {
  177. log.Info("secloginproc msgChan closed")
  178. return
  179. }
  180. case <-s.quit:
  181. log.Info("quit secloginproc")
  182. return
  183. }
  184. if err = msg.Commit(); err != nil {
  185. log.Error("msg.Commit error(%v)", err)
  186. }
  187. if preOffset, err = s.dao.OffsetCache(c, _secretEvent, msg.Partition); err != nil {
  188. log.Error("s.dao.OffsetCache(%d) error(%v)", msg.Partition, err)
  189. preOffset = 0
  190. } else {
  191. if msg.Offset > preOffset {
  192. s.setOffset(_secretEvent, msg.Partition, msg.Offset)
  193. }
  194. }
  195. eventMsg = &model.EventMessage{}
  196. if err = json.Unmarshal([]byte(msg.Value), eventMsg); err != nil {
  197. log.Error("json.Unmarshall(%s) error(%v)", msg.Value, err)
  198. s.setOffset(_secretEvent, msg.Partition, msg.Offset)
  199. continue
  200. }
  201. if msg.Offset <= preOffset && s.isMsgExpiration(eventMsg.Time) {
  202. log.Error("drop expired msg (%+v) (now_offset : %d)", preOffset)
  203. continue
  204. }
  205. if err = s.handleEvent(c, eventMsg); err != nil {
  206. log.Error("s.HandleEvent(%v) error(%v)", eventMsg, err)
  207. continue
  208. }
  209. log.Info("s.handleEvent(%v) eventMsg", eventMsg)
  210. }
  211. }
  212. func (s *Service) isMsgExpiration(timeStr string) bool {
  213. var (
  214. eventTime time.Time
  215. now = time.Now()
  216. err error
  217. )
  218. if eventTime, err = time.Parse("2006-01-02 15:04:05", timeStr); err != nil {
  219. return true
  220. }
  221. return eventTime.AddDate(0, 1, 1).Before(now)
  222. }
  223. func (s *Service) handleEvent(c context.Context, event *model.EventMessage) (err error) {
  224. var (
  225. eventTime time.Time
  226. argBytes []byte
  227. )
  228. if eventTime, err = time.Parse("2006-01-02 15:04:05", event.Time); err != nil {
  229. log.Error("time.Parse(%s) errore(%v)", event.Time, err)
  230. return
  231. }
  232. if argBytes, err = json.Marshal(event.Args); err != nil {
  233. log.Error("json.Marshal(%v) error(%v), so empty it", event.Args, err)
  234. argBytes = []byte("{}")
  235. }
  236. var argHandleEvent = &cmmdl.ArgHandleEvent{
  237. Time: xtime.Time(eventTime.Unix()),
  238. IP: event.IP,
  239. Service: event.Service,
  240. Event: event.Event,
  241. ActiveMid: event.ActiveMid,
  242. TargetMid: event.TargetMid,
  243. TargetID: event.TargetID,
  244. Args: string(argBytes),
  245. Result: event.Result,
  246. Effect: event.Effect,
  247. RiskLevel: event.RiskLevel,
  248. }
  249. if err = s.spyRPC.HandleEvent(c, argHandleEvent); err != nil {
  250. log.Error("s.spyRPC.HandleEvent(%v) error(%v)", argHandleEvent, err)
  251. // s.retrymiss(func() {
  252. // log.Info("Start retry rpc error(%v)", err)
  253. // for {
  254. // if err = s.spyRPC.HandleEvent(context.TODO(), argHandleEvent); err != nil {
  255. // log.Error("s.spyRPC.HandleEvent(%v) error(%v)", argHandleEvent, err)
  256. // } else {
  257. // break
  258. // }
  259. // }
  260. // log.Info("End retry error(%v)", err)
  261. // })
  262. return
  263. }
  264. return
  265. }
  266. func (s *Service) dataproc() {
  267. for {
  268. select {
  269. case <-s.quit:
  270. log.Info("quit handleData")
  271. return
  272. default:
  273. }
  274. if !s.c.Property.Debug {
  275. d := time.Now().AddDate(0, 0, 1)
  276. ts := time.Date(d.Year(), d.Month(), d.Day(), 5, 0, 0, 0, time.Local).Sub(time.Now())
  277. time.Sleep(ts)
  278. } else {
  279. time.Sleep(s.c.Property.TaskTimer * time.Second)
  280. }
  281. s.reBuild()
  282. }
  283. }
  284. // Ping check service health.
  285. func (s *Service) Ping(c context.Context) (err error) {
  286. if err = s.dao.Ping(c); err != nil {
  287. log.Error("s.db.Ping() error(%v)", err)
  288. return
  289. }
  290. return
  291. }
  292. // Close all resource.
  293. func (s *Service) Close() (err error) {
  294. close(s.quit)
  295. s.dao.Close()
  296. if err = s.eventDatabus.Close(); err != nil {
  297. log.Error("s.db.Close() error(%v)", err)
  298. return
  299. }
  300. return
  301. }
  302. // Wait wait all closed.
  303. func (s *Service) Wait() {
  304. s.waiter.Wait()
  305. }
  306. func (s *Service) cacheproc() {
  307. defer func() {
  308. if x := recover(); x != nil {
  309. log.Error("service.cacheproc panic(%v)", x)
  310. go s.cacheproc()
  311. log.Info("service.cacheproc recover")
  312. }
  313. }()
  314. for {
  315. f := <-s.cachech
  316. f()
  317. }
  318. }
  319. func (s *Service) retryproc() {
  320. defer func() {
  321. if x := recover(); x != nil {
  322. log.Error("service.retryproc panic(%v)", x)
  323. go s.retryproc()
  324. log.Info("service.retryproc recover")
  325. }
  326. }()
  327. for {
  328. f := <-s.retrych
  329. go f()
  330. }
  331. }
  332. func (s *Service) cachemiss(f func()) {
  333. defer func() {
  334. if x := recover(); x != nil {
  335. log.Error("service.cachemiss panic(%v)", x)
  336. }
  337. }()
  338. select {
  339. case s.cachech <- f:
  340. default:
  341. log.Error("service.cachech full")
  342. }
  343. }
  344. func (s *Service) retrymiss(f func()) {
  345. defer func() {
  346. if x := recover(); x != nil {
  347. log.Error("service.retrymiss panic(%v)", x)
  348. }
  349. }()
  350. select {
  351. case s.retrych <- f:
  352. default:
  353. log.Error("service.retrych full")
  354. }
  355. }
  356. func (s *Service) loadconfig() {
  357. defer func() {
  358. if x := recover(); x != nil {
  359. log.Error("service.cycleblock panic(%v)", x)
  360. }
  361. }()
  362. for {
  363. time.Sleep(s.configLoadTick)
  364. s.loadSystemConfig()
  365. }
  366. }
  367. func (s *Service) loadSystemConfig() (err error) {
  368. var (
  369. cdb map[string]string
  370. )
  371. cdb, err = s.dao.Configs(context.TODO())
  372. if err != nil {
  373. log.Error("sys config db get data err(%v)", err)
  374. return
  375. }
  376. if len(cdb) == 0 {
  377. err = errors.New("sys config no data")
  378. return
  379. }
  380. cs := make(map[string]interface{}, len(cdb))
  381. for k, v := range cdb {
  382. switch k {
  383. case model.LimitBlockCount:
  384. t, err1 := strconv.ParseInt(v, 10, 64)
  385. if err1 != nil {
  386. log.Error("sys config err(%s,%v,%v)", model.LimitBlockCount, t, err1)
  387. err = err1
  388. return
  389. }
  390. cs[k] = t
  391. case model.LessBlockScore:
  392. tmp, err1 := strconv.ParseInt(v, 10, 8)
  393. if err1 != nil {
  394. log.Error("sys config err(%s,%v,%v)", model.LessBlockScore, tmp, err1)
  395. err = err1
  396. return
  397. }
  398. cs[k] = int8(tmp)
  399. case model.AutoBlock:
  400. tmp, err1 := strconv.ParseInt(v, 10, 8)
  401. if err1 != nil {
  402. log.Error("sys config err(%s,%v,%v)", model.AutoBlock, tmp, err1)
  403. err = err1
  404. return
  405. }
  406. cs[k] = int8(tmp)
  407. default:
  408. cs[k] = v
  409. }
  410. }
  411. s.spyConfig = cs
  412. log.Info("loadSystemConfig success(%v)", cs)
  413. return
  414. }
  415. //Config get config.
  416. func (s *Service) Config(key string) (interface{}, bool) {
  417. if s.spyConfig == nil {
  418. return nil, false
  419. }
  420. v, ok := s.spyConfig[key]
  421. return v, ok
  422. }
  423. func (s *Service) cycleblock() {
  424. var (
  425. c = context.TODO()
  426. )
  427. log.Info("cycleblock start (%v)", time.Now())
  428. if b, err := s.dao.SetNXLockCache(c, model.BlockLockKey, model.DefLockTime); !b || err != nil {
  429. log.Error("cycleblock had run (%v,%v)", b, err)
  430. return
  431. }
  432. s.BlockTask(c)
  433. s.dao.DelLockCache(c, model.BlockLockKey)
  434. log.Info("cycleblock end (%v)", time.Now())
  435. }
  436. func (s *Service) blockcacheuser() {
  437. defer func() {
  438. if x := recover(); x != nil {
  439. log.Error("service.blockcacheuser panic(%v)", x)
  440. go s.blockcacheuser()
  441. log.Info("service.blockcacheuser recover")
  442. }
  443. }()
  444. for {
  445. mid, err := s.dao.SPOPBlockCache(context.TODO())
  446. if err != nil {
  447. log.Error("blockcacheuser err (%v,%v)", mid, err)
  448. continue
  449. }
  450. if mid != 0 {
  451. s.blockByMid(context.TODO(), mid)
  452. time.Sleep(s.blockTick)
  453. } else {
  454. // when no user should be block
  455. time.Sleep(s.blockWaitTick)
  456. }
  457. }
  458. }
  459. func (s *Service) reportjob() {
  460. var (
  461. c = context.TODO()
  462. )
  463. defer func() {
  464. if x := recover(); x != nil {
  465. log.Error("service.reportjob panic(%v)", x)
  466. go s.blockcacheuser()
  467. log.Info("service.reportjob recover")
  468. }
  469. }()
  470. log.Info("reportjob start (%v)", time.Now())
  471. if b, err := s.dao.SetNXLockCache(c, model.ReportJobKey, model.DefLockTime); !b || err != nil {
  472. log.Error("reportjob had run (%v,%v)", b, err)
  473. return
  474. }
  475. s.AddReport(c)
  476. log.Info("reportjob end (%v)", time.Now())
  477. }
  478. func (s *Service) spystatproc() {
  479. defer s.waiter.Done()
  480. defer func() {
  481. if x := recover(); x != nil {
  482. log.Error("sinstatproc unknown panic(%v)", x)
  483. }
  484. }()
  485. var (
  486. msg *databus.Message
  487. statMsg *model.SpyStatMessage
  488. ok bool
  489. err error
  490. msgChan = s.spystatDatabus.Messages()
  491. c = context.TODO()
  492. )
  493. for {
  494. select {
  495. case msg, ok = <-msgChan:
  496. if !ok {
  497. log.Info("spystatproc msgChan closed")
  498. return
  499. }
  500. case <-s.quit:
  501. log.Info("quit spystatproc")
  502. return
  503. }
  504. if err = msg.Commit(); err != nil {
  505. log.Error("msg.Commit error(%v)", err)
  506. }
  507. statMsg = &model.SpyStatMessage{}
  508. if err = json.Unmarshal([]byte(msg.Value), statMsg); err != nil {
  509. log.Error("json.Unmarshall(%s) error(%v)", msg.Value, err)
  510. continue
  511. }
  512. log.Info(" spystatproc (%v) start", statMsg)
  513. // check uuid
  514. unique, _ := s.dao.PfaddCache(c, statMsg.UUID)
  515. if !unique {
  516. log.Error("stat duplicate msg (%s) error", statMsg)
  517. continue
  518. }
  519. s.UpdateStatData(c, statMsg)
  520. log.Info(" spystatproc (%v) handle", statMsg)
  521. }
  522. }
  523. func (s *Service) loadeventname() (err error) {
  524. var (
  525. c = context.Background()
  526. es []*model.Event
  527. )
  528. es, err = s.dao.AllEvent(c)
  529. if err != nil {
  530. log.Error("loadeventname allevent error(%v)", err)
  531. return
  532. }
  533. tmp := make(map[string]int64, len(es))
  534. for _, e := range es {
  535. tmp[e.Name] = e.ID
  536. }
  537. s.allEventName = tmp
  538. log.Info("loadeventname (%v) load success", tmp)
  539. return
  540. }
  541. func (s *Service) loadeventproc() {
  542. for {
  543. time.Sleep(s.loadEventTick)
  544. s.loadeventname()
  545. }
  546. }
  547. func (s *Service) initActivityEvents() {
  548. tmp := make(map[string]struct{}, len(s.c.Property.ActivityEvents))
  549. for _, name := range s.c.Property.ActivityEvents {
  550. tmp[name] = struct{}{}
  551. }
  552. s.activityEvents = tmp
  553. }