contact_bind_log.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. package service
  2. import (
  3. "context"
  4. "crypto/sha1"
  5. "encoding/base64"
  6. "encoding/json"
  7. "strings"
  8. "time"
  9. "go-common/app/job/main/passport/model"
  10. "go-common/library/log"
  11. "go-common/library/queue/databus"
  12. )
  13. func (s *Service) contactBindLogconsumeproc() {
  14. mergeRoutineNum := int64(s.c.Group.ContactBindLog.Num)
  15. for {
  16. msg, ok := <-s.dsContactBindLog.Messages()
  17. if !ok {
  18. log.Error("s.telBindlogconsumeproc closed")
  19. return
  20. }
  21. m := &message{data: msg}
  22. p := &model.BMsg{}
  23. if err := json.Unmarshal(msg.Value, p); err != nil {
  24. log.Error("json.Unmarshal(%s) error(%v)", string(msg.Value), err)
  25. continue
  26. }
  27. //m.object = p
  28. mid := int64(0)
  29. switch {
  30. case strings.HasPrefix(p.Table, _telBindTable):
  31. t := new(model.TelBindLog)
  32. if err := json.Unmarshal(p.New, t); err != nil {
  33. log.Error("json.Unmarshal(%s) error(%v)", string(p.New), err)
  34. continue
  35. }
  36. mid = t.Mid
  37. m.object = p
  38. log.Info("contactBindLogconsumeproc table:%s key:%s partition:%d offset:%d", p.Table, msg.Key, msg.Partition, msg.Offset)
  39. case strings.HasPrefix(p.Table, _emailBindTable):
  40. t := new(model.EmailBindLog)
  41. if err := json.Unmarshal(p.New, t); err != nil {
  42. log.Error("json.Unmarshal(%s) error(%v)", string(p.New), err)
  43. continue
  44. }
  45. mid = t.Mid
  46. m.object = p
  47. log.Info("contactBindLogconsumeproc table:%s key:%s partition:%d offset:%d", p.Table, msg.Key, msg.Partition, msg.Offset)
  48. default:
  49. log.Warn("unrecognized message: %+v", p)
  50. continue
  51. }
  52. if mid == 0 {
  53. log.Warn("invalid message: %+v", p)
  54. continue
  55. }
  56. s.contactBindLogMu.Lock()
  57. if s.contactBindLogHead == nil {
  58. s.contactBindLogHead = m
  59. s.contactBindLogLast = m
  60. } else {
  61. s.contactBindLogLast.next = m
  62. s.contactBindLogLast = m
  63. }
  64. s.contactBindLogMu.Unlock()
  65. // use specify goroutine to merge messages
  66. s.contactBindLogMergeChans[mid%mergeRoutineNum] <- m
  67. log.Info("contactBindLogconsumeproc key:%s partition:%d offset:%d", msg.Key, msg.Partition, msg.Offset)
  68. }
  69. }
  70. func (s *Service) contactBindLogcommitproc() {
  71. commits := make(map[int32]*databus.Message, s.c.Group.Log.Size)
  72. for {
  73. done := <-s.contactBindLogDoneChan
  74. // merge partitions to commit offset
  75. for _, d := range done {
  76. d.done = true
  77. }
  78. s.contactBindLogMu.Lock()
  79. for ; s.contactBindLogHead != nil && s.contactBindLogHead.done; s.contactBindLogHead = s.contactBindLogHead.next {
  80. commits[s.contactBindLogHead.data.Partition] = s.contactBindLogHead.data
  81. }
  82. s.contactBindLogMu.Unlock()
  83. for k, m := range commits {
  84. log.Info("logcommitproc committed, key:%s partition:%d offset:%d", m.Key, m.Partition, m.Offset)
  85. m.Commit()
  86. delete(commits, k)
  87. }
  88. }
  89. }
  90. func (s *Service) contactBindLogMergeproc(c chan *message) {
  91. var (
  92. max = s.c.Group.ContactBindLog.Size
  93. merges = make([]*model.BMsg, 0, max)
  94. marked = make([]*message, 0, max)
  95. ticker = time.NewTicker(time.Duration(s.c.Group.Log.Ticker))
  96. )
  97. for {
  98. select {
  99. case msg, ok := <-c:
  100. if !ok {
  101. log.Error("s.contactBindLogMergeproc closed")
  102. return
  103. }
  104. p, assertOk := msg.object.(*model.BMsg)
  105. if !assertOk {
  106. log.Warn("s.contactBindLogMergeproc cannot convert BMsg")
  107. continue
  108. }
  109. //if p.Action != "insert" {
  110. // continue
  111. //}
  112. if p.Action == "delete" {
  113. continue
  114. }
  115. log.Info("s.contactBindLogMergeproc: %+v", msg)
  116. switch {
  117. case strings.HasPrefix(p.Table, _telBindTable) || strings.HasPrefix(p.Table, _emailBindTable):
  118. merges = append(merges, p)
  119. default:
  120. log.Warn("unrecognized the message: %+v", p)
  121. }
  122. marked = append(marked, msg)
  123. if len(marked) < max && len(merges) < max {
  124. continue
  125. }
  126. case <-ticker.C:
  127. }
  128. if len(merges) > 0 {
  129. s.contactBindLogProcessMerges(merges)
  130. merges = make([]*model.BMsg, 0, max)
  131. }
  132. if len(marked) > 0 {
  133. s.contactBindLogDoneChan <- marked
  134. marked = make([]*message, 0, max)
  135. }
  136. }
  137. }
  138. func (s *Service) contactBindLogProcessMerges(bmsgs []*model.BMsg) {
  139. for _, msg := range bmsgs {
  140. log.Info("contactBindLogProcessMerges: %+v", msg.Table)
  141. switch {
  142. case strings.HasPrefix(msg.Table, _telBindTable):
  143. t := new(model.TelBindLog)
  144. if err := json.Unmarshal(msg.New, t); err != nil {
  145. log.Error("json.Unmarshal(%s) error(%v)", string(msg.New), err)
  146. continue
  147. }
  148. s.handleTelBindLog(t)
  149. case strings.HasPrefix(msg.Table, _emailBindTable):
  150. t := new(model.EmailBindLog)
  151. if err := json.Unmarshal(msg.New, t); err != nil {
  152. log.Error("json.Unmarshal(%s) error(%v)", string(msg.New), err)
  153. continue
  154. }
  155. s.handleEmailBindLog(t)
  156. }
  157. }
  158. }
  159. type userLogExtra struct {
  160. EncryptTel string `json:"tel"`
  161. EncryptEmail string `json:"email"`
  162. }
  163. type userLog struct {
  164. Action string `json:"action"`
  165. Mid int64 `json:"mid"`
  166. Str0 string `json:"str_0"`
  167. ExtraData string `json:"extra_data"`
  168. Business int `json:"business"`
  169. CTime string `json:"ctime"`
  170. }
  171. func (s *Service) handleTelBindLog(telLog *model.TelBindLog) (err error) {
  172. var bindLog *model.TelBindLog
  173. for {
  174. bindLog, err = s.d.QueryTelBindLog(telLog.ID)
  175. if err != nil {
  176. log.Error("QueryTelBindLog (%v) err(%v)", telLog, err)
  177. time.Sleep(100 * time.Millisecond)
  178. continue
  179. }
  180. break
  181. }
  182. if bindLog == nil || bindLog.ID == 0 {
  183. log.Warn("telephone log (%v) nil", bindLog)
  184. return
  185. }
  186. rt, err := s.encrypt(bindLog.Tel)
  187. if err != nil {
  188. log.Error("aesEncrypt(%v) error(%v)", bindLog, err)
  189. return
  190. }
  191. extraData := userLogExtra{
  192. EncryptTel: rt,
  193. }
  194. hash := sha1.New()
  195. hash.Write([]byte(bindLog.Tel))
  196. extraDataBytes, err := json.Marshal(extraData)
  197. if err != nil {
  198. log.Error("extraData (%v) json marshal err(%v)", extraData, err)
  199. return
  200. }
  201. uLog := userLog{
  202. Action: "telBindLog",
  203. Mid: bindLog.Mid,
  204. Str0: base64.StdEncoding.EncodeToString(hash.Sum(s.hashSalt)),
  205. ExtraData: string(extraDataBytes),
  206. Business: 54,
  207. CTime: time.Unix(bindLog.Timestamp, 0).Format("2006-01-02 15:04:05"),
  208. }
  209. for {
  210. if err = s.userLogPub.Send(context.Background(), bindLog.Tel, uLog); err != nil {
  211. log.Error("databus send(%v) error(%v)", uLog, err)
  212. time.Sleep(100 * time.Millisecond)
  213. continue
  214. }
  215. log.Info("uselog pub uLog: %+v", uLog)
  216. break
  217. }
  218. return
  219. }
  220. func (s *Service) handleEmailBindLog(emailLog *model.EmailBindLog) (err error) {
  221. var bindLog *model.EmailBindLog
  222. for {
  223. bindLog, err = s.d.QueryEmailBindLog(emailLog.ID)
  224. if err != nil {
  225. log.Error("QueryEmailBindLog (%v) err(%v)", emailLog, err)
  226. time.Sleep(100 * time.Millisecond)
  227. continue
  228. }
  229. break
  230. }
  231. if bindLog == nil || bindLog.ID == 0 {
  232. log.Warn("email log (%v) nil", bindLog)
  233. return
  234. }
  235. rt, err := s.encrypt(bindLog.Email)
  236. if err != nil {
  237. log.Error("aesEncrypt(%v) error(%v)", bindLog, err)
  238. return
  239. }
  240. extraData := userLogExtra{
  241. EncryptEmail: rt,
  242. }
  243. hash := sha1.New()
  244. hash.Write([]byte(bindLog.Email))
  245. extraDataBytes, err := json.Marshal(extraData)
  246. if err != nil {
  247. log.Error("extraData (%v) json marshal err(%v)", extraData, err)
  248. return
  249. }
  250. uLog := userLog{
  251. Action: "emailBindLog",
  252. Mid: bindLog.Mid,
  253. Str0: base64.StdEncoding.EncodeToString(hash.Sum(s.hashSalt)),
  254. ExtraData: string(extraDataBytes),
  255. Business: 54,
  256. CTime: time.Unix(bindLog.Timestamp, 0).Format("2006-01-02 15:04:05"),
  257. }
  258. for {
  259. if err = s.userLogPub.Send(context.Background(), bindLog.Email, uLog); err != nil {
  260. log.Error("databus send(%v) error(%v)", uLog, err)
  261. time.Sleep(100 * time.Millisecond)
  262. continue
  263. }
  264. log.Info("uselog pub uLog: %+v", uLog)
  265. break
  266. }
  267. return
  268. }