ftp.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. package service
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "os"
  11. "os/exec"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "syscall"
  16. "time"
  17. "go-common/app/job/bbq/video/model"
  18. "go-common/library/log"
  19. ftp "github.com/ftp-master"
  20. )
  21. const (
  22. filePathKey = "search"
  23. videoFileName = "bbqvideo"
  24. videoFileMD5Name = "bbqvideo.md5"
  25. userFileName = "bbquser"
  26. userFileMD5Name = "bbquser.md5"
  27. sugFileName = "bbqsug"
  28. sugFileMD5Name = "bbqsug.md5"
  29. sugSrcIDIdx = 0
  30. sugSrcTermIdx = 1
  31. sugSrcTypeIdx = 2
  32. sugSrcScoreIdx = 3
  33. defaultSugPath = "/src/go-common/app/job/bbq/video/cmd/sug"
  34. )
  35. // writeSug .
  36. func (s *Service) writeSug(path string, filename string, md5name string) (err error) {
  37. var num int64
  38. filePath := path + filename
  39. file, err := os.OpenFile(filePath, os.O_CREATE|syscall.O_TRUNC|os.O_WRONLY|os.O_APPEND, 0666)
  40. if err != nil {
  41. log.Error("writeSug os.OpenFile(%s) error(%v)", filename, err)
  42. return
  43. }
  44. defer file.Close()
  45. srcPath := s.c.FTP.LocalPath["sugsrc"]
  46. if srcPath == "" {
  47. srcPath = os.Getenv("GOPATH") + defaultSugPath
  48. }
  49. if srcPath == "" {
  50. log.Error("sugsrc path is empty")
  51. return
  52. }
  53. src, err := os.Open(srcPath)
  54. if err != nil {
  55. log.Error("writeSug os.Open source sug error(%v)", err)
  56. return
  57. }
  58. defer src.Close()
  59. br := bufio.NewReader(src)
  60. for {
  61. a, _, c := br.ReadLine()
  62. if c == io.EOF {
  63. break
  64. }
  65. strArr := strings.Split(string(a), ",")
  66. var bs []byte
  67. var b []byte
  68. id, _ := strconv.ParseInt(strArr[sugSrcIDIdx], 10, 64)
  69. score, _ := strconv.ParseInt(strArr[sugSrcScoreIdx], 10, 64)
  70. sd := &model.Sug{
  71. ID: id,
  72. Term: strArr[sugSrcTermIdx],
  73. Type: strArr[sugSrcTypeIdx],
  74. Score: score,
  75. }
  76. if b, err = json.Marshal(sd); err != nil {
  77. log.Warn("json.Marshal(%v) error(%v)", sd, err)
  78. continue
  79. }
  80. bs = append(bs, b...)
  81. bs = append(bs, '\n')
  82. if _, err = file.Write(bs); err != nil {
  83. log.Error("writeSug file.Write error(%v)", err)
  84. return
  85. }
  86. num++
  87. // log.Info("write sug term:[%s]", strArr[sugSrcTermIdx])
  88. }
  89. log.Info("writeSug success num (%d)", num)
  90. if err = s.writeMD5(path, filename, md5name); err != nil {
  91. return
  92. }
  93. s.uploadFile(path, filename, md5name)
  94. return
  95. }
  96. // writeUserInfo .
  97. func (s *Service) writeUserInfo(path string, filename string, md5name string) (err error) {
  98. var lastID int64
  99. var num int64
  100. filePath := path + filename
  101. file, err := os.OpenFile(filePath, os.O_CREATE|syscall.O_TRUNC|os.O_WRONLY|os.O_APPEND, 0666)
  102. if err != nil {
  103. log.Error("writeUserInfo os.OpenFile(%s) error(%v)", filename, err)
  104. return
  105. }
  106. defer file.Close()
  107. for {
  108. var (
  109. dataList []*model.UserBaseDB
  110. bs []byte
  111. )
  112. if dataList, err = s.dao.UsersByLast(context.Background(), lastID); err != nil {
  113. return
  114. }
  115. if len(dataList) == 0 {
  116. break
  117. }
  118. for _, data := range dataList {
  119. var b []byte
  120. sd := &model.UserSearch{
  121. ID: data.MID,
  122. Uname: data.Uname,
  123. }
  124. if b, err = json.Marshal(sd); err != nil {
  125. log.Warn("json.Marshal(%v) error(%v)", sd, err)
  126. continue
  127. }
  128. bs = append(bs, b...)
  129. bs = append(bs, '\n')
  130. lastID = data.ID
  131. num++
  132. // log.Info("append user mid:[%d]", data.MID)
  133. }
  134. if _, err = file.Write(bs); err != nil {
  135. log.Error("writeUserInfo file.Write error(%v)", err)
  136. return
  137. }
  138. }
  139. log.Info("writeUserInfo success num (%d)", num)
  140. if err = s.writeMD5(path, filename, md5name); err != nil {
  141. return
  142. }
  143. s.uploadFile(path, filename, md5name)
  144. return
  145. }
  146. // writeVideoInfo .
  147. func (s *Service) writeVideoInfo(path string) (err error) {
  148. var lastID int64
  149. var num int64
  150. var RelateID int64
  151. filePath := path + videoFileName
  152. file, err := os.OpenFile(filePath, os.O_CREATE|syscall.O_TRUNC|os.O_WRONLY|os.O_APPEND, 0666)
  153. if err != nil {
  154. log.Error("writeVideoInfo os.OpenFile(%s) error(%v)", videoFileName, err)
  155. return
  156. }
  157. defer file.Close()
  158. for {
  159. var (
  160. dataList []*model.VideoDB
  161. bs []byte
  162. )
  163. if dataList, err = s.dao.VideosByLast(context.Background(), lastID); err != nil {
  164. return
  165. }
  166. if len(dataList) == 0 {
  167. break
  168. }
  169. for _, data := range dataList {
  170. var (
  171. b []byte
  172. )
  173. RelateID, err = strconv.ParseInt(fmt.Sprintf("%d00", data.AutoID), 10, 32)
  174. if err != nil {
  175. log.Error("relate id parse err [%v]", err)
  176. continue
  177. }
  178. sd := &model.VideoSearch{
  179. ID: int32(RelateID),
  180. Title: data.Title,
  181. PubTime: int32(data.Pubtime),
  182. }
  183. if b, err = json.Marshal(sd); err != nil {
  184. log.Warn("json.Marshal(%v) error(%v)", sd, err)
  185. continue
  186. }
  187. bs = append(bs, b...)
  188. bs = append(bs, '\n')
  189. lastID = data.AutoID
  190. num++
  191. // log.Info("append video svid:[%d]", data.ID)
  192. }
  193. if _, err = file.Write(bs); err != nil {
  194. log.Error("writeVideoInfo file.Write error(%v)", err)
  195. return
  196. }
  197. }
  198. log.Info("writeVideoInfo success num (%d)", num)
  199. if err = s.writeMD5(path, videoFileName, videoFileMD5Name); err != nil {
  200. return
  201. }
  202. s.uploadFile(path, videoFileName, videoFileMD5Name)
  203. return
  204. }
  205. // writeMD5 .
  206. func (s *Service) writeMD5(path string, fname string, md5name string) (err error) {
  207. var out bytes.Buffer
  208. f, err := exec.LookPath("md5sum")
  209. if err != nil {
  210. log.Error("writeMD5 exec.LookPath(md5sum) error(%v)", err)
  211. f = "md5sum"
  212. }
  213. cmd := exec.Command(f, path+fname)
  214. cmd.Stdout = &out
  215. err = cmd.Run()
  216. if err != nil {
  217. log.Error("writeMD5 cmd.Run() error(%v)", err)
  218. return
  219. }
  220. outStrs := strings.Split(out.String(), " ")
  221. if len(outStrs) == 0 {
  222. log.Error("writeMD5 len(outStrs) == 0")
  223. return errors.New("NONE MD5")
  224. }
  225. md5FileName := path + md5name
  226. md5File, err := os.OpenFile(md5FileName, os.O_CREATE|syscall.O_TRUNC|os.O_WRONLY, 0600)
  227. if err != nil {
  228. log.Error("writeMD5 os.OpenFile(md5 file) error(%v)", err)
  229. return
  230. }
  231. defer md5File.Close()
  232. _, err = md5File.WriteString(outStrs[0])
  233. return
  234. }
  235. // SyncVideo2Search 同步视频
  236. func (s *Service) SyncVideo2Search() {
  237. err := s.writeVideoInfo(s.c.FTP.LocalPath[filePathKey])
  238. if err != nil {
  239. log.Error("SyncVideo err[%v]", err)
  240. } else {
  241. log.Info("SyncVideo complete")
  242. }
  243. }
  244. // SyncUser2Search 同步用户
  245. func (s *Service) SyncUser2Search() {
  246. err := s.writeUserInfo(s.c.FTP.LocalPath[filePathKey], userFileName, userFileMD5Name)
  247. if err != nil {
  248. log.Error("syncUser err[%v]", err)
  249. } else {
  250. log.Info("syncUser complete")
  251. }
  252. }
  253. // SyncSug2Search 同步sug
  254. func (s *Service) SyncSug2Search() {
  255. err := s.writeSug(s.c.FTP.LocalPath[filePathKey], sugFileName, sugFileMD5Name)
  256. if err != nil {
  257. log.Error("syncSug err[%v]", err)
  258. } else {
  259. log.Info("syncSug complete")
  260. }
  261. }
  262. // SyncSearch 全量同步搜索各文件
  263. func (s *Service) SyncSearch() {
  264. for {
  265. s.searchChan <- "SyncSearch Complete"
  266. var wg sync.WaitGroup
  267. fmt.Println("SyncSearch Start")
  268. stratAt := time.Now()
  269. wg.Add(3)
  270. go func() {
  271. defer wg.Done()
  272. s.SyncVideo2Search()
  273. }()
  274. go func() {
  275. defer wg.Done()
  276. s.SyncUser2Search()
  277. }()
  278. go func() {
  279. defer wg.Done()
  280. s.SyncSug2Search()
  281. }()
  282. wg.Wait()
  283. //耗时统计
  284. elapsed := time.Since(stratAt)
  285. log.Info("SyncSearch elapsed: %s", elapsed)
  286. fmt.Println(<-s.searchChan)
  287. //间隔1min
  288. time.Sleep(time.Minute * 1)
  289. }
  290. }
  291. // uploadFile .
  292. func (s *Service) uploadFile(path string, fname string, md5name string) (err error) {
  293. ftp, err := ftp.Connect(s.c.FTP.Addr)
  294. if err != nil {
  295. log.Error("connect to ftp(%s) error(%v)", s.c.FTP.Addr, err)
  296. return
  297. }
  298. defer ftp.Quit()
  299. err = ftp.Login(s.c.FTP.User, s.c.FTP.Password)
  300. if err != nil {
  301. log.Error("ftp login(user:%s) error(%v)", s.c.FTP.User, err)
  302. return
  303. }
  304. defer ftp.Logout()
  305. ftpDir := fmt.Sprintf(s.c.FTP.RemotePath[filePathKey], fname)
  306. err = ftp.ChangeDir(ftpDir)
  307. if err != nil {
  308. log.Error("enter ftp path [%s] err [%v]", ftpDir, err)
  309. return
  310. }
  311. // delete source file, and upload file to ftp.
  312. if err = ftp.Delete(fname); err != nil {
  313. log.Error("ftp.Delete(%s) error(%v)", fname, err)
  314. }
  315. filePath := path + fname
  316. file, err := os.Open(filePath)
  317. if err != nil {
  318. log.Error("os.Open(%s) error(%v)", filePath, err)
  319. return
  320. }
  321. defer file.Close()
  322. if err = ftp.Stor(fname, file); err != nil {
  323. log.Error("ftp.Stor(%s) error(%v)", fname, err)
  324. return
  325. }
  326. log.Info("ftp upload success(%s)", fname)
  327. // delete source.md5 file, and upload file to ftp.
  328. if err = ftp.Delete(md5name); err != nil {
  329. log.Error("ftp.Delete(%s) error(%v)", md5name, err)
  330. }
  331. md5FilePath := path + md5name
  332. md5File, err := os.Open(md5FilePath)
  333. if err != nil {
  334. log.Error("os.Open(%s) error(%v)", md5name, err)
  335. return
  336. }
  337. defer md5File.Close()
  338. if err = ftp.Stor(md5name, md5File); err != nil {
  339. log.Error("ftp.Stor(%s) error(%v)", md5name, err)
  340. return
  341. }
  342. log.Info("ftp upload success(%s)", md5name)
  343. return
  344. }