memcache.go 6.2 KB


  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "go-common/app/job/main/dm2/model"
  7. "go-common/library/cache/memcache"
  8. "go-common/library/log"
  9. )
  10. const (
  11. _prefixXML = "dm_xml_"
  12. _prefixSub = "s_"
  13. _prefixAjax = "dm_ajax_"
  14. _keyDuration = "d_" // video duration
  15. )
  16. func keyXML(oid int64) string {
  17. return _prefixXML + strconv.FormatInt(oid, 10)
  18. }
  19. func keySubject(tp int32, oid int64) string {
  20. return _prefixSub + fmt.Sprintf("%d_%d", tp, oid)
  21. }
  22. func keyAjax(oid int64) string {
  23. return _prefixAjax + strconv.FormatInt(oid, 10)
  24. }
  25. // keyDuration return video duration key.
  26. func keyDuration(oid int64) string {
  27. return _keyDuration + strconv.FormatInt(oid, 10)
  28. }
  29. func keyTransferLock() string {
  30. return "dm_transfer_lock"
  31. }
  32. // DelXMLCache delete xml content.
  33. func (d *Dao) DelXMLCache(c context.Context, oid int64) (err error) {
  34. conn := d.mc.Get(c)
  35. key := keyXML(oid)
  36. if err = conn.Delete(key); err != nil {
  37. if err == memcache.ErrNotFound {
  38. err = nil
  39. } else {
  40. log.Error("conn.Delete(%s) error(%v)", key, err)
  41. }
  42. }
  43. conn.Close()
  44. return
  45. }
  46. // AddXMLCache add xml content to memcache.
  47. func (d *Dao) AddXMLCache(c context.Context, oid int64, value []byte) (err error) {
  48. conn := d.mc.Get(c)
  49. defer conn.Close()
  50. item := &memcache.Item{
  51. Key: keyXML(oid),
  52. Value: value,
  53. Expiration: d.mcExpire,
  54. }
  55. if err = conn.Set(item); err != nil {
  56. log.Error("conn.Set(%s) error(%v)", keyXML(oid), err)
  57. }
  58. return
  59. }
  60. // XMLCache get xml content.
  61. func (d *Dao) XMLCache(c context.Context, oid int64) (data []byte, err error) {
  62. key := keyXML(oid)
  63. conn := d.mc.Get(c)
  64. defer conn.Close()
  65. item, err := conn.Get(key)
  66. if err != nil {
  67. if err == memcache.ErrNotFound {
  68. err = nil
  69. } else {
  70. log.Error("mc.Get(%s) error(%v)", key, err)
  71. }
  72. return
  73. }
  74. data = item.Value
  75. return
  76. }
  77. // SubjectCache get subject from memcache.
  78. func (d *Dao) SubjectCache(c context.Context, tp int32, oid int64) (sub *model.Subject, err error) {
  79. var (
  80. conn = d.mc.Get(c)
  81. key = keySubject(tp, oid)
  82. rp *memcache.Item
  83. )
  84. defer conn.Close()
  85. if rp, err = conn.Get(key); err != nil {
  86. if err == memcache.ErrNotFound {
  87. sub = nil
  88. err = nil
  89. } else {
  90. log.Error("mc.Get(%s) error(%v)", key, err)
  91. }
  92. return
  93. }
  94. sub = &model.Subject{}
  95. if err = conn.Scan(rp, &sub); err != nil {
  96. log.Error("mc.Scan(%d) error(%v)", oid, err)
  97. }
  98. return
  99. }
  100. // SubjectsCache multi get subject from memcache.
  101. func (d *Dao) SubjectsCache(c context.Context, tp int32, oids []int64) (cached map[int64]*model.Subject, missed []int64, err error) {
  102. var (
  103. conn = d.mc.Get(c)
  104. keys []string
  105. oidMap = make(map[string]int64, len(oids))
  106. )
  107. cached = make(map[int64]*model.Subject, len(oids))
  108. defer conn.Close()
  109. for _, oid := range oids {
  110. k := keySubject(tp, oid)
  111. if _, ok := oidMap[k]; !ok {
  112. keys = append(keys, k)
  113. oidMap[k] = oid
  114. }
  115. }
  116. rs, err := conn.GetMulti(keys)
  117. if err != nil {
  118. log.Error("conn.GetMulti(%v) error(%v)", keys, err)
  119. return
  120. }
  121. for k, r := range rs {
  122. sub := &model.Subject{}
  123. if err = conn.Scan(r, sub); err != nil {
  124. log.Error("conn.Scan(%s) error(%v)", r.Value, err)
  125. err = nil
  126. continue
  127. }
  128. cached[oidMap[k]] = sub
  129. // delete hit key
  130. delete(oidMap, k)
  131. }
  132. // missed key
  133. missed = make([]int64, 0, len(oidMap))
  134. for _, oid := range oidMap {
  135. missed = append(missed, oid)
  136. }
  137. return
  138. }
  139. // AddSubjectCache add subject cache.
  140. func (d *Dao) AddSubjectCache(c context.Context, sub *model.Subject) (err error) {
  141. var (
  142. conn = d.mc.Get(c)
  143. key = keySubject(sub.Type, sub.Oid)
  144. )
  145. defer conn.Close()
  146. item := &memcache.Item{
  147. Key: key,
  148. Object: sub,
  149. Flags: memcache.FlagJSON,
  150. Expiration: d.mcExpire,
  151. }
  152. if err = conn.Set(item); err != nil {
  153. log.Error("conn.Set(%v) error(%v)", item, err)
  154. }
  155. return
  156. }
  157. // DelSubjectCache delete subject memcache cache.
  158. func (d *Dao) DelSubjectCache(c context.Context, tp int32, oid int64) (err error) {
  159. conn := d.mc.Get(c)
  160. key := keySubject(tp, oid)
  161. if err = conn.Delete(key); err != nil {
  162. if err == memcache.ErrNotFound {
  163. err = nil
  164. } else {
  165. log.Error("conn.Delete(%s) error(%v)", key, err)
  166. }
  167. }
  168. conn.Close()
  169. return
  170. }
  171. // AddTransferLock 添加弹幕转移并发锁
  172. func (d *Dao) AddTransferLock(c context.Context) (succeed bool) {
  173. var (
  174. key = keyTransferLock()
  175. conn = d.mc.Get(c)
  176. )
  177. defer conn.Close()
  178. item := &memcache.Item{
  179. Key: key,
  180. Value: []byte("0"),
  181. Expiration: 60,
  182. }
  183. if err := conn.Add(item); err != nil {
  184. if err != memcache.ErrNotStored {
  185. log.Error("conn.Add(%s) error(%v)", key, err)
  186. }
  187. } else {
  188. succeed = true
  189. }
  190. return
  191. }
  192. // DelTransferLock 删除弹幕转移并发锁
  193. func (d *Dao) DelTransferLock(c context.Context) (err error) {
  194. var (
  195. key = keyTransferLock()
  196. conn = d.mc.Get(c)
  197. )
  198. if err = conn.Delete(key); err != nil {
  199. if err == memcache.ErrNotFound {
  200. err = nil
  201. } else {
  202. log.Error("conn.Delete(%s) error(%v)", key, err)
  203. }
  204. }
  205. conn.Close()
  206. return
  207. }
  208. // DelAjaxDMCache delete ajax dm from memcache.
  209. func (d *Dao) DelAjaxDMCache(c context.Context, oid int64) (err error) {
  210. conn := d.mc.Get(c)
  211. defer conn.Close()
  212. key := keyAjax(oid)
  213. if err = conn.Delete(key); err != nil {
  214. if err == memcache.ErrNotFound {
  215. err = nil
  216. } else {
  217. log.Error("DelAjaxDMCache.conn.Delete(%s) error(%v)", key, err)
  218. }
  219. }
  220. return
  221. }
  222. // DurationCache return duration of video.
  223. func (d *Dao) DurationCache(c context.Context, oid int64) (duration int64, err error) {
  224. var (
  225. key = keyDuration(oid)
  226. conn = d.mc.Get(c)
  227. item *memcache.Item
  228. )
  229. defer conn.Close()
  230. if item, err = conn.Get(key); err != nil {
  231. if err == memcache.ErrNotFound {
  232. duration = model.NotFound
  233. err = nil
  234. } else {
  235. log.Error("conn.Get(%s) error(%v)", key, err)
  236. }
  237. return
  238. }
  239. if duration, err = strconv.ParseInt(string(item.Value), 10, 64); err != nil {
  240. log.Error("strconv.ParseInt(%s) error(%v)", item.Value, err)
  241. }
  242. return
  243. }
  244. // SetDurationCache set video duration to redis.
  245. func (d *Dao) SetDurationCache(c context.Context, oid, duration int64) (err error) {
  246. key := keyDuration(oid)
  247. conn := d.mc.Get(c)
  248. item := memcache.Item{
  249. Key: key,
  250. Value: []byte(fmt.Sprint(duration)),
  251. Expiration: d.mcExpire,
  252. Flags: memcache.FlagRAW,
  253. }
  254. if err = conn.Set(&item); err != nil {
  255. log.Error("mc.Set(%v) error(%v)", item, err)
  256. }
  257. conn.Close()
  258. return
  259. }