registrar.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. package file
  2. import (
  3. "sync"
  4. "os"
  5. "fmt"
  6. "time"
  7. "io"
  8. "path/filepath"
  9. "encoding/json"
  10. "context"
  11. "go-common/library/log"
  12. )
  13. type Registrar struct {
  14. Channel chan State
  15. registryFile string // Path to the Registry File
  16. wg sync.WaitGroup
  17. states *States // Map with all file paths inside and the corresponding state
  18. bufferedStateUpdates int
  19. flushInterval time.Duration
  20. harvesters map[uint64]*Harvester
  21. hLock sync.RWMutex
  22. ctx context.Context
  23. cancel context.CancelFunc
  24. }
  25. // New creates a new Registrar instance, updating the registry file on
  26. // `file.State` updates. New fails if the file can not be opened or created.
  27. func NewRegistry(ctx context.Context, registryFile string) (*Registrar, error) {
  28. r := &Registrar{
  29. registryFile: registryFile,
  30. states: NewStates(),
  31. Channel: make(chan State, 100),
  32. wg: sync.WaitGroup{},
  33. flushInterval: time.Second * 5,
  34. harvesters: make(map[uint64]*Harvester),
  35. }
  36. r.ctx, r.cancel = context.WithCancel(ctx)
  37. err := r.Init()
  38. if err != nil {
  39. return nil, err
  40. }
  41. go r.Run()
  42. return r, err
  43. }
  44. func (r *Registrar) RegisterHarvester(h *Harvester) error {
  45. r.hLock.Lock()
  46. defer r.hLock.Unlock()
  47. if _, ok := r.harvesters[h.state.Inode]; !ok {
  48. r.harvesters[h.state.Inode] = h
  49. return nil
  50. }
  51. return fmt.Errorf("harvestor of inode %s Re registered", h.state.Inode)
  52. }
  53. func (r *Registrar) UnRegisterHarvester(h *Harvester) error {
  54. r.hLock.Lock()
  55. defer r.hLock.Unlock()
  56. if _, ok := r.harvesters[h.state.Inode]; ok {
  57. delete(r.harvesters, h.state.Inode)
  58. return nil
  59. }
  60. return fmt.Errorf("harvestor of inode %d not found", h.state.Inode)
  61. }
  62. func (r *Registrar) GetHarvester(i uint64) *Harvester {
  63. r.hLock.RLock()
  64. defer r.hLock.RUnlock()
  65. if h, ok := r.harvesters[i]; ok {
  66. return h
  67. }
  68. return nil
  69. }
  70. // Init sets up the Registrar and make sure the registry file is setup correctly
  71. func (r *Registrar) Init() (err error) {
  72. // Create directory if it does not already exist.
  73. registryPath := filepath.Dir(r.registryFile)
  74. err = os.MkdirAll(registryPath, 0750)
  75. if err != nil {
  76. return fmt.Errorf("Failed to created registry file dir %s: %v", registryPath, err)
  77. }
  78. // Check if files exists
  79. fileInfo, err := os.Lstat(r.registryFile)
  80. if os.IsNotExist(err) {
  81. log.Info("No registry file found under: %s. Creating a new registry file.", r.registryFile)
  82. // No registry exists yet, write empty state to check if registry can be written
  83. return r.writeRegistry()
  84. }
  85. if err != nil {
  86. return err
  87. }
  88. // Check if regular file, no dir, no symlink
  89. if !fileInfo.Mode().IsRegular() {
  90. // Special error message for directory
  91. if fileInfo.IsDir() {
  92. return fmt.Errorf("Registry file path must be a file. %s is a directory.", r.registryFile)
  93. }
  94. return fmt.Errorf("Registry file path is not a regular file: %s", r.registryFile)
  95. }
  96. log.Info("Registry file set to: %s", r.registryFile)
  97. // load states
  98. if err = r.loadStates(); err != nil {
  99. return err
  100. }
  101. return nil
  102. }
  103. // writeRegistry writes the new json registry file to disk.
  104. func (r *Registrar) writeRegistry() error {
  105. // First clean up states
  106. r.gcStates()
  107. // TODO lock for reading r.states.states
  108. tempfile, err := writeTmpFile(r.registryFile, r.states.states)
  109. if err != nil {
  110. return err
  111. }
  112. err = SafeFileRotate(r.registryFile, tempfile)
  113. if err != nil {
  114. return err
  115. }
  116. log.V(1).Info("Registry file %s updated. %d states written.", r.registryFile, len(r.states.states))
  117. return nil
  118. }
  // SafeFileRotate replaces the file at path with tempfile using os.Rename and
// returns any rename error. Afterwards it makes a best-effort attempt to fsync
// the parent directory, because some filesystems need that before the
// directory entry for the renamed file is durable. Failures of that fsync step
// are deliberately ignored.
func SafeFileRotate(path, tempfile string) error {
	if err := os.Rename(tempfile, path); err != nil {
		return err
	}

	dir, openErr := os.Open(filepath.Dir(path))
	if openErr != nil {
		return nil // best effort only
	}
	defer dir.Close()
	_ = dir.Sync() // best effort only
	return nil
}
  136. // loadStates fetches the previous reading state from the configure RegistryFile file
  137. // The default file is `registry` in the data path.
  138. func (r *Registrar) loadStates() error {
  139. f, err := os.Open(r.registryFile)
  140. if err != nil {
  141. return err
  142. }
  143. defer f.Close()
  144. log.Info("Loading registrar data from %s", r.registryFile)
  145. states, err := readStatesFrom(f)
  146. if err != nil {
  147. return err
  148. }
  149. states = r.preProcessStates(states)
  150. r.states.SetStates(states)
  151. log.V(1).Info("States Loaded from registrar%s : %+v", r.registryFile, len(states))
  152. return nil
  153. }
  154. func (r *Registrar) preProcessStates(states map[uint64]State) map[uint64]State {
  155. for key, state := range states {
  156. // set all states to finished
  157. state.Finished = true
  158. states[key] = state
  159. }
  160. return states
  161. }
  162. func readStatesFrom(in io.Reader) (map[uint64]State, error) {
  163. states := make(map[uint64]State)
  164. decoder := json.NewDecoder(in)
  165. if err := decoder.Decode(&states); err != nil {
  166. return nil, fmt.Errorf("Error decoding states: %s", err)
  167. }
  168. return states, nil
  169. }
  170. func writeTmpFile(baseName string, states map[uint64]State) (string, error) {
  171. tempfile := baseName + ".new"
  172. f, err := os.OpenFile(tempfile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_SYNC, 0640)
  173. if err != nil {
  174. log.Error("Failed to create tempfile (%s) for writing: %s", tempfile, err)
  175. return "", err
  176. }
  177. defer f.Close()
  178. encoder := json.NewEncoder(f)
  179. if err := encoder.Encode(states); err != nil {
  180. log.Error("Error when encoding the states: %s", err)
  181. return "", err
  182. }
  183. // Commit the changes to storage to avoid corrupt registry files
  184. if err = f.Sync(); err != nil {
  185. log.Error("Error when syncing new registry file contents: %s", err)
  186. return "", err
  187. }
  188. return tempfile, nil
  189. }
  190. // gcStates runs a registry Cleanup. The method check if more event in the
  191. // registry can be gc'ed in the future. If no potential removable state is found,
  192. // the gcEnabled flag is set to false, indicating the current registrar state being
  193. // stable. New registry update events can re-enable state gc'ing.
  194. func (r *Registrar) gcStates() {
  195. //if !r.gcRequired {
  196. // return
  197. //}
  198. beforeCount := len(r.states.states)
  199. cleanedStates := r.states.Cleanup()
  200. log.V(1).Info(
  201. "Registrar states %s cleaned up. Before: %d, After: %d", r.registryFile,
  202. beforeCount, beforeCount-cleanedStates)
  203. }
  204. // FindPrevious lookups a registered state, that matching the new state.
  205. // Returns a zero-state if no match is found.
  206. func (r *Registrar) FindPrevious(newState State) State {
  207. return r.states.FindPrevious(newState)
  208. }
  209. func (r *Registrar) Run() {
  210. log.Info("Starting Registrar for: %s", r.registryFile)
  211. flushC := time.Tick(r.flushInterval)
  212. for {
  213. select {
  214. case <-flushC:
  215. r.flushRegistry()
  216. case state := <-r.Channel:
  217. r.processEventStates(state)
  218. case <-r.ctx.Done():
  219. r.flushRegistry()
  220. return
  221. }
  222. }
  223. }
  224. func (r *Registrar) flushRegistry() {
  225. if err := r.writeRegistry(); err != nil {
  226. log.Error("Writing of registry returned error: %v. Continuing...", err)
  227. }
  228. }
  229. // processEventStates gets the states from the events and writes them to the registrar state
  230. func (r *Registrar) processEventStates(state State) {
  231. r.states.UpdateWithTs(state, time.Now())
  232. }
  233. func (r *Registrar) SendStateUpdate(state State) {
  234. r.Channel <- state
  235. //select {
  236. //case r.Channel <- state:
  237. //default:
  238. // log.Warn("state update receiving chan full")
  239. //}
  240. }