shot.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/hmac"
  6. "crypto/sha1"
  7. "encoding/base64"
  8. "encoding/json"
  9. "fmt"
  10. "io/ioutil"
  11. "net/http"
  12. "net/url"
  13. "path/filepath"
  14. "strconv"
  15. "strings"
  16. "time"
  17. "go-common/app/job/main/videoup/model/archive"
  18. "go-common/app/job/main/videoup/model/redis"
  19. "go-common/library/ecode"
  20. "go-common/library/log"
  21. )
  // BFS upload settings used by videoshotUp and videocoverUp.
  //
  // NOTE(review): the access keys and secrets below are hard-coded in source;
  // consider moving them to configuration.

  // videoshotPath is the URL prefix to which "<bucket>/<filename>" is appended
  // when an upload URL is built.
  const videoshotPath = "http://bfs.bilibili.co/bfs/"

  // Bucket name and signing credentials used by videoshotUp.
  const (
  	videoshotBucket    = "videoshot"
  	videoshotBfsKey    = "624480b94ec1e0eb"
  	videoshotBfsSecret = "303c6cfa85cf2d550fa5050d123cd2"
  )

  // Bucket name and signing credentials used by videocoverUp.
  const (
  	archiveBucket    = "archive"
  	archiveBfsKey    = "8d4e593ba7555502"
  	archiveBfsSecret = "0bdbd4c7caeeddf587c3c4daec0475"
  )
  // videoshotHTTPClient is the shared HTTP client for every download and upload
  // in this file; each request is bounded by a five-second timeout.
  var videoshotHTTPClient = &http.Client{Timeout: 5 * time.Second}
  34. // videoshotSHConsumer is videoshot consumer in shanghai 云立方.
  35. func (s *Service) videoshotSHConsumer() {
  36. defer s.wg.Done()
  37. var (
  38. msgs = s.videoshotSub2.Messages()
  39. )
  40. for {
  41. msg, ok := <-msgs
  42. if !ok {
  43. log.Error("s.videoshotSub2.Message closed")
  44. return
  45. }
  46. msg.Commit()
  47. var res struct {
  48. Cid int64 `json:"cid"`
  49. Image []string `json:"image"`
  50. Bin string `json:"bin"`
  51. }
  52. if err := json.Unmarshal([]byte(msg.Value), &res); err != nil {
  53. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  54. continue
  55. }
  56. log.Info("videoshotSHConsumer videoshotMessage in shanghai yunlifang key(%s) cid(%d) count(%d) partition(%d) offset(%d) commit", msg.Key, res.Cid, len(res.Image), msg.Partition, msg.Offset)
  57. s.videoshotAdd(res.Cid, res.Bin, res.Image)
  58. }
  59. }
  60. func (s *Service) videocoverCopy(aid, count int64, a *archive.Archive) {
  61. if a == nil {
  62. log.Warn("archive(%d) is not exist", aid)
  63. return
  64. }
  65. cover := a.Cover
  66. _, err := url.ParseRequestURI(cover)
  67. if err != nil {
  68. log.Error("videocoverCopy aid(%s) cover(%s) parse error(%v)", aid, cover, err)
  69. return
  70. }
  71. count = count + 1
  72. _, bs, err := s.videoshotDown(cover)
  73. if err != nil {
  74. log.Error("videocoverCopy aid(%d) cover(%s) videoshotDown err(%v)", aid, cover, err)
  75. if err == ecode.NothingFound {
  76. return
  77. }
  78. s.syncRetry(context.TODO(), aid, count, redis.ActionForVideocovers, cover, cover)
  79. return
  80. }
  81. if cover, err = s.videocoverUp("", bs); err != nil {
  82. log.Error("videocoverCopy aid(%d) cover(%s) videocoverUp err(%v)", aid, cover, err)
  83. s.syncRetry(context.TODO(), aid, count, redis.ActionForVideocovers, a.Cover, a.Cover)
  84. return
  85. }
  86. u, err := url.Parse(cover)
  87. if err != nil {
  88. log.Error("videocoverCopy aid(%d) url(%s) Parse(%s) error", aid, cover, err)
  89. return
  90. }
  91. if _, err := s.arc.UpCover(context.TODO(), aid, u.Path); err != nil {
  92. s.syncRetry(context.TODO(), aid, count, redis.ActionForVideocovers, cover, cover)
  93. return
  94. }
  95. log.Info("videocoverCopy aid(%d) new cover(%s) sub(%s) success", aid, cover, u.Path)
  96. }
  97. func (s *Service) videoshotAdd(cid int64, bin string, imgs []string) {
  98. _, err := url.ParseRequestURI(bin)
  99. if err != nil {
  100. log.Error("videoshotAdd cid(%s) add bin(%s) parse error(%v)", cid, bin, err)
  101. return
  102. }
  103. fn, bs, err := s.videoshotDown(bin)
  104. if err != nil {
  105. if err == ecode.NothingFound {
  106. return
  107. }
  108. s.syncRetry(context.TODO(), cid, 0, redis.ActionForVideoshot, bin, strings.Join(imgs, ","))
  109. return
  110. }
  111. if _, err := s.videoshotUp(fn, bs); err != nil {
  112. s.syncRetry(context.TODO(), cid, 0, redis.ActionForVideoshot, bin, strings.Join(imgs, ","))
  113. return
  114. }
  115. for _, img := range imgs {
  116. if len(img) == 0 {
  117. continue
  118. }
  119. fn, bs, err := s.videoshotDown(img)
  120. if err != nil {
  121. if err == ecode.NothingFound {
  122. return
  123. }
  124. s.syncRetry(context.TODO(), cid, 0, redis.ActionForVideoshot, bin, strings.Join(imgs, ","))
  125. return
  126. }
  127. if _, err := s.videoshotUp(fn, bs); err != nil {
  128. s.syncRetry(context.TODO(), cid, 0, redis.ActionForVideoshot, bin, strings.Join(imgs, ","))
  129. return
  130. }
  131. }
  132. if _, err := s.arc.AddVideoShot(context.TODO(), cid, len(imgs)); err != nil {
  133. s.syncRetry(context.TODO(), cid, 0, "", bin, strings.Join(imgs, ","))
  134. return
  135. }
  136. log.Info("videoshotAdd cid(%d) success", cid)
  137. }
  138. func (s *Service) videoshotDown(uri string) (filename string, bs []byte, err error) {
  139. fileURL, err := url.ParseRequestURI(uri)
  140. if err != nil {
  141. log.Error("videoshotDown(%s) url parse error(%v)", uri, err)
  142. return
  143. }
  144. filename = filepath.Base(fileURL.Path)
  145. resp, err := videoshotHTTPClient.Get(uri)
  146. if err != nil {
  147. log.Error("videoshotDown download from url(%s) error(%v)", uri, err)
  148. return
  149. }
  150. defer resp.Body.Close()
  151. if resp.StatusCode != http.StatusOK {
  152. err = fmt.Errorf("videoshowDown(%s) status error(%d)", uri, resp.StatusCode)
  153. log.Error("%v", err)
  154. if resp.StatusCode == http.StatusNotFound {
  155. err = ecode.NothingFound
  156. }
  157. return
  158. }
  159. if bs, err = ioutil.ReadAll(resp.Body); err != nil {
  160. log.Error("videoshowDown(%s) read body error(%v)", uri, err)
  161. }
  162. return
  163. }
  164. func (s *Service) videoshotUp(filename string, bs []byte) (url string, err error) {
  165. var (
  166. uri = fmt.Sprintf("%s%s/%s", videoshotPath, videoshotBucket, filename)
  167. req *http.Request
  168. resp *http.Response
  169. code int
  170. )
  171. if req, err = http.NewRequest(http.MethodPut, uri, bytes.NewReader(bs)); err != nil {
  172. return
  173. }
  174. var sign = func() string {
  175. expire := time.Now().Unix()
  176. content := fmt.Sprintf("%s\n%s\n%s\n%d\n", http.MethodPut, videoshotBucket, filename, expire)
  177. mac := hmac.New(sha1.New, []byte(videoshotBfsSecret))
  178. mac.Write([]byte(content))
  179. return fmt.Sprintf("%s:%s:%d", videoshotBfsKey, base64.StdEncoding.EncodeToString(mac.Sum(nil)), expire)
  180. }
  181. req.Header.Set("Content-Type", "image/jpeg")
  182. req.Header.Set("Authorization", sign())
  183. if resp, err = videoshotHTTPClient.Do(req); err != nil {
  184. log.Error("videoshotUp client.Do(%s) error(%v)", filename, err)
  185. return
  186. }
  187. defer resp.Body.Close()
  188. if resp.StatusCode != http.StatusOK {
  189. err = fmt.Errorf("videoshotUp client.Do(%s) status: %d", filename, resp.StatusCode)
  190. log.Error("%v", err)
  191. return
  192. }
  193. if code, err = strconv.Atoi(resp.Header.Get("code")); err != nil {
  194. err = fmt.Errorf("videoshotUp conv(%s) code to int error(%v)", filename, err)
  195. log.Error("%v", err)
  196. return
  197. }
  198. if code == http.StatusOK {
  199. url = resp.Header.Get("location")
  200. } else {
  201. err = fmt.Errorf("videoshotUp client.Do(%s) code: %d", filename, code)
  202. log.Error("%v", err)
  203. }
  204. return
  205. }
  206. func (s *Service) videocoverUp(filename string, bs []byte) (url string, err error) {
  207. var (
  208. uri = fmt.Sprintf("%s%s/%s", videoshotPath, archiveBucket, filename)
  209. req *http.Request
  210. resp *http.Response
  211. code int
  212. )
  213. if req, err = http.NewRequest(http.MethodPut, uri, bytes.NewReader(bs)); err != nil {
  214. return
  215. }
  216. var sign = func() string {
  217. expire := time.Now().Unix()
  218. content := fmt.Sprintf("%s\n%s\n%s\n%d\n", http.MethodPut, archiveBucket, filename, expire)
  219. mac := hmac.New(sha1.New, []byte(archiveBfsSecret))
  220. mac.Write([]byte(content))
  221. return fmt.Sprintf("%s:%s:%d", archiveBfsKey, base64.StdEncoding.EncodeToString(mac.Sum(nil)), expire)
  222. }
  223. req.Header.Set("Content-Type", "image/jpeg")
  224. req.Header.Set("Authorization", sign())
  225. if resp, err = videoshotHTTPClient.Do(req); err != nil {
  226. log.Error("videocoverUp client.Do(%s) error(%v)", filename, err)
  227. return
  228. }
  229. defer resp.Body.Close()
  230. if resp.StatusCode != http.StatusOK {
  231. err = fmt.Errorf("videocoverUp client.Do(%s) status: %d", filename, resp.StatusCode)
  232. log.Error("%v", err)
  233. return
  234. }
  235. if code, err = strconv.Atoi(resp.Header.Get("code")); err != nil {
  236. err = fmt.Errorf("videocoverUp conv(%s) code to int error(%v)", filename, err)
  237. log.Error("%v", err)
  238. return
  239. }
  240. if code == http.StatusOK {
  241. url = resp.Header.Get("location")
  242. } else {
  243. err = fmt.Errorf("videocoverUp client.Do(%s) code: %d", filename, code)
  244. log.Error("%v", err)
  245. }
  246. return
  247. }