dm_special.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "sort"
  7. "go-common/app/job/main/dm2/model"
  8. )
  9. const (
  10. _bfsMaxSize = 16 * 1024 * 1024 // size MediumText
  11. _specialJSONItemSize = 20 + 1 // {"id":,"content":""},
  12. _specialJSONAtLeastSize = 2 // []
  13. )
  14. // buildSpeicalDms build when db is no record
  15. func (s *Service) speicalDms(c context.Context, tp int32, oid int64) (dms []*model.DM, err error) {
  16. var (
  17. dmids []int64
  18. spContentMap map[int64]*model.ContentSpecial
  19. contentMap map[int64]*model.Content
  20. )
  21. if dms, dmids, err = s.dao.IndexsByPool(c, tp, oid, model.PoolSpecial); err != nil {
  22. return
  23. }
  24. if len(dmids) == 0 {
  25. return
  26. }
  27. if contentMap, err = s.dao.Contents(c, oid, dmids); err != nil {
  28. return
  29. }
  30. if spContentMap, err = s.dao.ContentsSpecial(c, dmids); err != nil {
  31. return
  32. }
  33. for _, dm := range dms {
  34. if v, ok := contentMap[dm.ID]; ok {
  35. dm.Content = v
  36. }
  37. if v, ok := spContentMap[dm.ID]; ok {
  38. dm.ContentSpe = v
  39. }
  40. }
  41. sort.Slice(dms, func(i, j int) bool {
  42. return dms[i].Progress < dms[j].Progress
  43. })
  44. return
  45. }
  46. func (s *Service) buildSpecialDms(c context.Context, dms []*model.DM) (bss [][]byte, err error) {
  47. var (
  48. dmSpecialContents []*model.DmSpecialContent
  49. bs []byte
  50. length int
  51. )
  52. if len(dms) == 0 {
  53. return
  54. }
  55. dmSpecialContents = make([]*model.DmSpecialContent, 0, len(dms))
  56. length = _specialJSONAtLeastSize
  57. for _, dm := range dms {
  58. if len(dm.GetSpecialSeg()) == 0 {
  59. continue
  60. }
  61. itemSize := len(fmt.Sprint(dm.ID)) + len(dm.GetSpecialSeg()) + _specialJSONItemSize
  62. if length+itemSize > _bfsMaxSize {
  63. if bs, err = json.Marshal(dmSpecialContents); err != nil {
  64. return
  65. }
  66. bss = append(bss, bs)
  67. dmSpecialContents = make([]*model.DmSpecialContent, 0, len(dms))
  68. length = _specialJSONAtLeastSize
  69. }
  70. length += itemSize
  71. dmSpecialContents = append(dmSpecialContents, &model.DmSpecialContent{
  72. ID: dm.ID,
  73. Content: dm.GetSpecialSeg(),
  74. })
  75. }
  76. if len(dmSpecialContents) > 0 {
  77. if bs, err = json.Marshal(dmSpecialContents); err != nil {
  78. return
  79. }
  80. bss = append(bss, bs)
  81. }
  82. return
  83. }
  84. func (s *Service) updateSpecualDms(c context.Context, tp int32, oid int64, bss [][]byte) (err error) {
  85. var (
  86. location string
  87. locations []string
  88. ds *model.DmSpecial
  89. )
  90. for _, bs := range bss {
  91. if len(bs) == 0 {
  92. continue
  93. }
  94. if location, err = s.dao.BfsDmUpload(c, "", bs); err != nil {
  95. return
  96. }
  97. locations = append(locations, location)
  98. }
  99. ds = &model.DmSpecial{
  100. Type: tp,
  101. Oid: oid,
  102. }
  103. ds.Join(locations)
  104. if err = s.dao.UpsertDmSpecialLocation(c, ds.Type, ds.Oid, ds.Locations); err != nil {
  105. return
  106. }
  107. return
  108. }
  109. func (s *Service) specialLocationUpdate(c context.Context, tp int32, oid int64) (err error) {
  110. var (
  111. dms []*model.DM
  112. bss [][]byte
  113. )
  114. if dms, err = s.speicalDms(c, tp, oid); err != nil {
  115. return
  116. }
  117. if bss, err = s.buildSpecialDms(c, dms); err != nil {
  118. return
  119. }
  120. if err = s.updateSpecualDms(c, tp, oid, bss); err != nil {
  121. return
  122. }
  123. return
  124. }