sync_all.go 8.2 KB


  1. package service
  2. //var (
  3. //may = time.Now().AddDate(0, -1, 0)
  4. //apr = time.Now().AddDate(0, -2, 0)
  5. //)
  6. /*
  7. func (s *Service) syncToken(tabIdx string, min, max int64) {
  8. var minID int64
  9. var t time.Time
  10. if tabIdx == "201805" {
  11. t = may
  12. minID = 167003927
  13. } else if tabIdx == "201804" {
  14. t = apr
  15. minID = min
  16. }
  17. for {
  18. tokens, err := s.dao.TokenMultiQuery(context.Background(), tabIdx, minID, s.c.SyncLines)
  19. if err != nil {
  20. log.Error("s.dao.TokenMultiQuery err, sleep 1s %v", err)
  21. time.Sleep(1 * time.Second)
  22. continue
  23. }
  24. if len(tokens) == 0 {
  25. break
  26. }
  27. minID = setToken(tokens, s, minID, t)
  28. if max > 0 && max <= minID {
  29. break
  30. }
  31. }
  32. }
  33. func setToken(tokens []*model.OldToken, s *Service, minID int64, t time.Time) int64 {
  34. newTokens := make([]*model.Token, 0, 51)
  35. newRefreshs := make([]*model.Refresh, 0, 51)
  36. for _, token := range tokens {
  37. newToken := &model.Token{
  38. Mid: token.Mid,
  39. AppID: token.AppID,
  40. Token: token.AccessToken,
  41. Expires: token.Expires,
  42. Type: token.Type,
  43. }
  44. newTokens = append(newTokens, newToken)
  45. if len(newTokens) == 50 {
  46. s.dao.BatchAddToken(context.Background(), newTokens, t)
  47. newTokens = make([]*model.Token, 0, 51)
  48. }
  49. if minID < token.ID {
  50. minID = token.ID
  51. }
  52. if token.RefreshToken == "" || token.Expires < time.Now().Unix() {
  53. continue
  54. }
  55. newRefresh := &model.Refresh{
  56. Mid: token.Mid,
  57. AppID: token.AppID,
  58. Refresh: token.RefreshToken,
  59. Token: token.AccessToken,
  60. Expires: token.Expires,
  61. }
  62. newRefreshs = append(newRefreshs, newRefresh)
  63. if len(newRefreshs) == 50 {
  64. s.dao.BatchAddRefresh(context.Background(), newRefreshs, t)
  65. newRefreshs = make([]*model.Refresh, 0, 51)
  66. }
  67. }
  68. if len(newTokens) != 0 {
  69. for _, token := range newTokens {
  70. tokenByte, err := hex.DecodeString(token.Token)
  71. if err != nil {
  72. log.Error("setNewToken error: tokenByte decode hex err: %v", token)
  73. continue
  74. }
  75. s.dao.AddToken(context.Background(), token, tokenByte, t)
  76. }
  77. }
  78. if len(newRefreshs) != 0 {
  79. for _, refresh := range newRefreshs {
  80. refreshByte, err := hex.DecodeString(refresh.Refresh)
  81. tokenByteR, err1 := hex.DecodeString(refresh.Refresh)
  82. if err != nil || err1 != nil {
  83. log.Error("setNewToken error: refreshByte decode hex err: %v", refresh)
  84. continue
  85. }
  86. s.dao.AddRefresh(context.Background(), refresh, refreshByte, tokenByteR, t)
  87. }
  88. }
  89. return minID
  90. }
  91. func (s *Service) setNewToken(token *model.OldToken) (err error) {
  92. newToken := &model.Token{
  93. Mid: token.Mid,
  94. AppID: token.AppID,
  95. Token: token.AccessToken,
  96. Expires: token.Expires,
  97. Type: token.Type,
  98. }
  99. var t time.Time
  100. t, err = time.ParseInLocation("2006-01-02T15:04:05Z07:00", token.CTime, time.Local)
  101. if err != nil {
  102. log.Error("setNewToken error: ctime parse err. %v", token)
  103. return
  104. }
  105. tokenByte, err := hex.DecodeString(token.AccessToken)
  106. if err != nil {
  107. log.Error("setNewToken error: tokenByte decode hex err: %v", token)
  108. return
  109. }
  110. s.dao.AddToken(context.Background(), newToken, tokenByte, t)
  111. if token.RefreshToken == "" || t.Unix() < 1525104000 {
  112. return
  113. }
  114. newRefresh := &model.Refresh{
  115. Mid: token.Mid,
  116. AppID: token.AppID,
  117. Refresh: token.RefreshToken,
  118. Token: token.AccessToken,
  119. Expires: token.Expires,
  120. }
  121. refreshByte, err := hex.DecodeString(token.RefreshToken)
  122. if err != nil {
  123. log.Error("setNewToken error: refreshByte decode hex err: %v", token)
  124. return
  125. }
  126. tokenByteR, err := hex.DecodeString(token.AccessToken)
  127. if err != nil {
  128. log.Error("setNewToken error: tokenByte decode hex err: %v", token)
  129. return
  130. }
  131. s.dao.AddRefresh(context.Background(), newRefresh, refreshByte, tokenByteR, t)
  132. return
  133. }
  134. func (s *Service) syncCookie(tabIdx int64) {
  135. var minID int64
  136. for {
  137. now := time.Now().Unix()
  138. cookies, err := s.dao.CookieMultiQuery(context.Background(), tabIdx, minID, now, s.c.SyncLines)
  139. if err != nil {
  140. log.Error("s.dao.CookieMultiQuery err, sleep 1s %v", err)
  141. time.Sleep(1 * time.Second)
  142. continue
  143. }
  144. if len(cookies) == 0 {
  145. log.Info("syncCookie finished tabIdx : %d", tabIdx)
  146. break
  147. }
  148. minID = s.setCookie(cookies, now, minID)
  149. }
  150. }
  151. func (s *Service) setCookie(cookies []*model.OldCookie, now int64, minID int64) int64 {
  152. today := time.Now()
  153. newCookies := make([]*model.Cookie, 0, 51)
  154. var count int64
  155. for _, cookie := range cookies {
  156. if count%1000 == 0 {
  157. // for down cpu
  158. time.Sleep(10 * time.Millisecond)
  159. }
  160. if minID < cookie.ID {
  161. minID = cookie.ID
  162. }
  163. if len(newCookies) == 50 {
  164. s.dao.AddMultiCookie(context.Background(), newCookies, today)
  165. newCookies = make([]*model.Cookie, 0, 51)
  166. }
  167. var t time.Time
  168. t, err := time.ParseInLocation("2006-01-02T15:04:05Z07:00", cookie.ModifyTime, time.Local)
  169. if err != nil {
  170. log.Error("setNewToken error: ctime parse err. %v", cookie)
  171. continue
  172. }
  173. if cookie.Expires > now && t.Unix() > 1527782400 {
  174. newCookies = append(newCookies, &model.Cookie{
  175. Mid: cookie.Mid,
  176. Session: cookie.Session,
  177. CSRF: cookie.CSRFToken,
  178. Type: cookie.Type,
  179. Expires: cookie.Expires,
  180. })
  181. }
  182. count = count + 1
  183. }
  184. if len(newCookies) != 0 {
  185. for _, newCookie := range newCookies {
  186. csrfByte, err := hex.DecodeString(newCookie.CSRF)
  187. if err != nil {
  188. log.Error("hex.DecodeString csrf err: %v, %v", newCookie, err)
  189. continue
  190. }
  191. s.dao.AddCookie(context.Background(), newCookie, []byte(newCookie.Session), csrfByte, today)
  192. }
  193. }
  194. return minID
  195. }
  196. */
  197. // func (s *Service) setNewCookie(cookie *model.OldCookie) (err error) {
  198. // newCookie := &model.Cookie{
  199. // Mid: cookie.Mid,
  200. // Session: cookie.Session,
  201. // CSRF: cookie.CSRFToken,
  202. // Type: cookie.Type,
  203. // Expires: cookie.Expires,
  204. // }
  205. // csrfByte, err := hex.DecodeString(newCookie.CSRF)
  206. // if err != nil {
  207. // log.Error("hex.DecodeString csrf err: %v, %v", cookie, err)
  208. // return
  209. // }
  210. // if _, err = s.dao.AddCookie(context.Background(), newCookie, []byte(newCookie.Session), csrfByte, time.Now()); err != nil {
  211. // log.Error("s.dao.AddCookie %v, %v", cookie, err)
  212. // return
  213. // }
  214. // return nil
  215. // }
  216. /*
  217. func (s *Service) compareToken(tabIdx string, min int64, max int64) {
  218. var t time.Time
  219. if tabIdx == "201805" {
  220. t = may
  221. } else if tabIdx == "201804" {
  222. t = apr
  223. } else {
  224. t = time.Now()
  225. }
  226. minID := min
  227. for {
  228. tokens, err := s.dao.BatchGetToken(context.Background(), minID, s.c.SyncLines, t)
  229. if err != nil {
  230. log.Error("compareToken error: s.dao.BatchGetToken err, sleep 1s %v", err)
  231. time.Sleep(1 * time.Second)
  232. continue
  233. }
  234. if len(tokens) == 0 {
  235. break
  236. }
  237. tokenMap, err := s.dao.TokenBatchQuery(context.Background(), tabIdx, tokens)
  238. if err != nil {
  239. log.Error("compareToken error: s.dao.TokenBatchQuery err, sleep 1s %v", err)
  240. time.Sleep(1 * time.Second)
  241. continue
  242. }
  243. for _, newToken := range tokens {
  244. if tokenMap[newToken.Token] == "" {
  245. tokenByte, err := hex.DecodeString(newToken.Token)
  246. if err != nil {
  247. log.Error("compareToken error: tokenByte decode hex err: %v", err)
  248. continue
  249. }
  250. s.dao.DelToken(context.Background(), tokenByte, t)
  251. }
  252. }
  253. minID = tokens[len(tokens)-1].ID
  254. if max > 0 && max <= minID {
  255. break
  256. }
  257. }
  258. log.Info("token compare finished %s, %d - %d", tabIdx, minID, max)
  259. }
  260. func (s *Service) compareCookie(tabIdx string, min int64) {
  261. var t time.Time
  262. if tabIdx == "201805" {
  263. t = may
  264. } else if tabIdx == "201804" {
  265. t = apr
  266. } else {
  267. t = time.Now()
  268. }
  269. minID := min
  270. for {
  271. cookies, err := s.dao.BatchGetCookie(context.Background(), minID, s.c.SyncLines, t)
  272. if err != nil {
  273. log.Error("compareCookie error: s.dao.BatchGetCookie err, sleep 1s %v", err)
  274. time.Sleep(1 * time.Second)
  275. continue
  276. }
  277. if len(cookies) == 0 {
  278. break
  279. }
  280. tabIdxMap := make(map[int64][]string)
  281. for _, newCookie := range cookies {
  282. oldTabIdx := newCookie.Mid % 30
  283. tabIdxMap[oldTabIdx] = append(tabIdxMap[oldTabIdx], newCookie.Session)
  284. }
  285. sessionMap, err := s.dao.CookieBatchQuery(context.Background(), tabIdxMap)
  286. if err != nil {
  287. log.Error("compareCookie error: s.dao.CookieBatchQuery err, sleep 1s %v", err)
  288. time.Sleep(1 * time.Second)
  289. continue
  290. }
  291. for _, newCookie := range cookies {
  292. if sessionMap[newCookie.Session] == "" {
  293. s.dao.DelCookie(context.Background(), []byte(newCookie.Session), t)
  294. }
  295. }
  296. minID = cookies[len(cookies)-1].ID
  297. }
  298. }*/