package service import ( "bufio" "bytes" "context" "encoding/json" "errors" "fmt" "io" "os" "os/exec" "strconv" "strings" "sync" "syscall" "time" "go-common/app/job/bbq/video/model" "go-common/library/log" ftp "github.com/ftp-master" ) const ( filePathKey = "search" videoFileName = "bbqvideo" videoFileMD5Name = "bbqvideo.md5" userFileName = "bbquser" userFileMD5Name = "bbquser.md5" sugFileName = "bbqsug" sugFileMD5Name = "bbqsug.md5" sugSrcIDIdx = 0 sugSrcTermIdx = 1 sugSrcTypeIdx = 2 sugSrcScoreIdx = 3 defaultSugPath = "/src/go-common/app/job/bbq/video/cmd/sug" ) // writeSug . func (s *Service) writeSug(path string, filename string, md5name string) (err error) { var num int64 filePath := path + filename file, err := os.OpenFile(filePath, os.O_CREATE|syscall.O_TRUNC|os.O_WRONLY|os.O_APPEND, 0666) if err != nil { log.Error("writeSug os.OpenFile(%s) error(%v)", filename, err) return } defer file.Close() srcPath := s.c.FTP.LocalPath["sugsrc"] if srcPath == "" { srcPath = os.Getenv("GOPATH") + defaultSugPath } if srcPath == "" { log.Error("sugsrc path is empty") return } src, err := os.Open(srcPath) if err != nil { log.Error("writeSug os.Open source sug error(%v)", err) return } defer src.Close() br := bufio.NewReader(src) for { a, _, c := br.ReadLine() if c == io.EOF { break } strArr := strings.Split(string(a), ",") var bs []byte var b []byte id, _ := strconv.ParseInt(strArr[sugSrcIDIdx], 10, 64) score, _ := strconv.ParseInt(strArr[sugSrcScoreIdx], 10, 64) sd := &model.Sug{ ID: id, Term: strArr[sugSrcTermIdx], Type: strArr[sugSrcTypeIdx], Score: score, } if b, err = json.Marshal(sd); err != nil { log.Warn("json.Marshal(%v) error(%v)", sd, err) continue } bs = append(bs, b...) bs = append(bs, '\n') if _, err = file.Write(bs); err != nil { log.Error("writeSug file.Write error(%v)", err) return } num++ // log.Info("write sug term:[%s]", strArr[sugSrcTermIdx]) } log.Info("writeSug success num (%d)", num) if err = s.writeMD5(path, filename, md5name); err != nil { return } s.uploadFile(path, filename, md5name) return } // writeUserInfo . func (s *Service) writeUserInfo(path string, filename string, md5name string) (err error) { var lastID int64 var num int64 filePath := path + filename file, err := os.OpenFile(filePath, os.O_CREATE|syscall.O_TRUNC|os.O_WRONLY|os.O_APPEND, 0666) if err != nil { log.Error("writeUserInfo os.OpenFile(%s) error(%v)", filename, err) return } defer file.Close() for { var ( dataList []*model.UserBaseDB bs []byte ) if dataList, err = s.dao.UsersByLast(context.Background(), lastID); err != nil { return } if len(dataList) == 0 { break } for _, data := range dataList { var b []byte sd := &model.UserSearch{ ID: data.MID, Uname: data.Uname, } if b, err = json.Marshal(sd); err != nil { log.Warn("json.Marshal(%v) error(%v)", sd, err) continue } bs = append(bs, b...) bs = append(bs, '\n') lastID = data.ID num++ // log.Info("append user mid:[%d]", data.MID) } if _, err = file.Write(bs); err != nil { log.Error("writeUserInfo file.Write error(%v)", err) return } } log.Info("writeUserInfo success num (%d)", num) if err = s.writeMD5(path, filename, md5name); err != nil { return } s.uploadFile(path, filename, md5name) return } // writeVideoInfo . func (s *Service) writeVideoInfo(path string) (err error) { var lastID int64 var num int64 var RelateID int64 filePath := path + videoFileName file, err := os.OpenFile(filePath, os.O_CREATE|syscall.O_TRUNC|os.O_WRONLY|os.O_APPEND, 0666) if err != nil { log.Error("writeVideoInfo os.OpenFile(%s) error(%v)", videoFileName, err) return } defer file.Close() for { var ( dataList []*model.VideoDB bs []byte ) if dataList, err = s.dao.VideosByLast(context.Background(), lastID); err != nil { return } if len(dataList) == 0 { break } for _, data := range dataList { var ( b []byte ) RelateID, err = strconv.ParseInt(fmt.Sprintf("%d00", data.AutoID), 10, 32) if err != nil { log.Error("relate id parse err [%v]", err) continue } sd := &model.VideoSearch{ ID: int32(RelateID), Title: data.Title, PubTime: int32(data.Pubtime), } if b, err = json.Marshal(sd); err != nil { log.Warn("json.Marshal(%v) error(%v)", sd, err) continue } bs = append(bs, b...) bs = append(bs, '\n') lastID = data.AutoID num++ // log.Info("append video svid:[%d]", data.ID) } if _, err = file.Write(bs); err != nil { log.Error("writeVideoInfo file.Write error(%v)", err) return } } log.Info("writeVideoInfo success num (%d)", num) if err = s.writeMD5(path, videoFileName, videoFileMD5Name); err != nil { return } s.uploadFile(path, videoFileName, videoFileMD5Name) return } // writeMD5 . func (s *Service) writeMD5(path string, fname string, md5name string) (err error) { var out bytes.Buffer f, err := exec.LookPath("md5sum") if err != nil { log.Error("writeMD5 exec.LookPath(md5sum) error(%v)", err) f = "md5sum" } cmd := exec.Command(f, path+fname) cmd.Stdout = &out err = cmd.Run() if err != nil { log.Error("writeMD5 cmd.Run() error(%v)", err) return } outStrs := strings.Split(out.String(), " ") if len(outStrs) == 0 { log.Error("writeMD5 len(outStrs) == 0") return errors.New("NONE MD5") } md5FileName := path + md5name md5File, err := os.OpenFile(md5FileName, os.O_CREATE|syscall.O_TRUNC|os.O_WRONLY, 0600) if err != nil { log.Error("writeMD5 os.OpenFile(md5 file) error(%v)", err) return } defer md5File.Close() _, err = md5File.WriteString(outStrs[0]) return } // SyncVideo2Search 同步视频 func (s *Service) SyncVideo2Search() { err := s.writeVideoInfo(s.c.FTP.LocalPath[filePathKey]) if err != nil { log.Error("SyncVideo err[%v]", err) } else { log.Info("SyncVideo complete") } } // SyncUser2Search 同步用户 func (s *Service) SyncUser2Search() { err := s.writeUserInfo(s.c.FTP.LocalPath[filePathKey], userFileName, userFileMD5Name) if err != nil { log.Error("syncUser err[%v]", err) } else { log.Info("syncUser complete") } } // SyncSug2Search 同步sug func (s *Service) SyncSug2Search() { err := s.writeSug(s.c.FTP.LocalPath[filePathKey], sugFileName, sugFileMD5Name) if err != nil { log.Error("syncSug err[%v]", err) } else { log.Info("syncSug complete") } } // SyncSearch 全量同步搜索各文件 func (s *Service) SyncSearch() { for { s.searchChan <- "SyncSearch Complete" var wg sync.WaitGroup fmt.Println("SyncSearch Start") stratAt := time.Now() wg.Add(3) go func() { defer wg.Done() s.SyncVideo2Search() }() go func() { defer wg.Done() s.SyncUser2Search() }() go func() { defer wg.Done() s.SyncSug2Search() }() wg.Wait() //耗时统计 elapsed := time.Since(stratAt) log.Info("SyncSearch elapsed: %s", elapsed) fmt.Println(<-s.searchChan) //间隔1min time.Sleep(time.Minute * 1) } } // uploadFile . func (s *Service) uploadFile(path string, fname string, md5name string) (err error) { ftp, err := ftp.Connect(s.c.FTP.Addr) if err != nil { log.Error("connect to ftp(%s) error(%v)", s.c.FTP.Addr, err) return } defer ftp.Quit() err = ftp.Login(s.c.FTP.User, s.c.FTP.Password) if err != nil { log.Error("ftp login(user:%s) error(%v)", s.c.FTP.User, err) return } defer ftp.Logout() ftpDir := fmt.Sprintf(s.c.FTP.RemotePath[filePathKey], fname) err = ftp.ChangeDir(ftpDir) if err != nil { log.Error("enter ftp path [%s] err [%v]", ftpDir, err) return } // delete source file, and upload file to ftp. if err = ftp.Delete(fname); err != nil { log.Error("ftp.Delete(%s) error(%v)", fname, err) } filePath := path + fname file, err := os.Open(filePath) if err != nil { log.Error("os.Open(%s) error(%v)", filePath, err) return } defer file.Close() if err = ftp.Stor(fname, file); err != nil { log.Error("ftp.Stor(%s) error(%v)", fname, err) return } log.Info("ftp upload success(%s)", fname) // delete source.md5 file, and upload file to ftp. if err = ftp.Delete(md5name); err != nil { log.Error("ftp.Delete(%s) error(%v)", md5name, err) } md5FilePath := path + md5name md5File, err := os.Open(md5FilePath) if err != nil { log.Error("os.Open(%s) error(%v)", md5name, err) return } defer md5File.Close() if err = ftp.Stor(md5name, md5File); err != nil { log.Error("ftp.Stor(%s) error(%v)", md5name, err) return } log.Info("ftp upload success(%s)", md5name) return }