agent.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package config
  14. import (
  15. "os"
  16. "sync"
  17. "time"
  18. "github.com/sirupsen/logrus"
  19. )
  20. // Agent watches a path and automatically loads the config stored
  21. // therein.
  22. type Agent struct {
  23. sync.Mutex
  24. c *Config
  25. subscriptions []chan<- ConfigDelta
  26. }
  27. // Start will begin polling the config file at the path. If the first load
  28. // fails, Start with return the error and abort. Future load failures will log
  29. // the failure message but continue attempting to load.
  30. func (ca *Agent) Start(prowConfig, jobConfig string) error {
  31. c, err := Load(prowConfig, jobConfig)
  32. if err != nil {
  33. return err
  34. }
  35. ca.Set(c)
  36. go func() {
  37. var lastModTime time.Time
  38. // Rarely, if two changes happen in the same second, mtime will
  39. // be the same for the second change, and an mtime-based check would
  40. // fail. Reload periodically just in case.
  41. skips := 0
  42. for range time.Tick(1 * time.Second) {
  43. if skips < 600 {
  44. // Check if the file changed to see if it needs to be re-read.
  45. // os.Stat follows symbolic links, which is how ConfigMaps work.
  46. prowStat, err := os.Stat(prowConfig)
  47. if err != nil {
  48. logrus.WithField("prowConfig", prowConfig).WithError(err).Error("Error loading prow config.")
  49. continue
  50. }
  51. recentModTime := prowStat.ModTime()
  52. // TODO(krzyzacy): allow empty jobConfig till fully migrate config to subdirs
  53. if jobConfig != "" {
  54. jobConfigStat, err := os.Stat(jobConfig)
  55. if err != nil {
  56. logrus.WithField("jobConfig", jobConfig).WithError(err).Error("Error loading job configs.")
  57. continue
  58. }
  59. if jobConfigStat.ModTime().After(recentModTime) {
  60. recentModTime = jobConfigStat.ModTime()
  61. }
  62. }
  63. if !recentModTime.After(lastModTime) {
  64. skips++
  65. continue // file hasn't been modified
  66. }
  67. lastModTime = recentModTime
  68. }
  69. if c, err := Load(prowConfig, jobConfig); err != nil {
  70. logrus.WithField("prowConfig", prowConfig).
  71. WithField("jobConfig", jobConfig).
  72. WithError(err).Error("Error loading config.")
  73. } else {
  74. skips = 0
  75. ca.Set(c)
  76. }
  77. }
  78. }()
  79. return nil
  80. }
  81. type ConfigDelta struct {
  82. Before, After Config
  83. }
  84. // Subscribe registers the channel for messages on config reload.
  85. // The caller can expect a copy of the previous and current config
  86. // to be sent down the subscribed channel when a new configuration
  87. // is loaded.
  88. func (ca *Agent) Subscribe(subscription chan<- ConfigDelta) {
  89. ca.Lock()
  90. defer ca.Unlock()
  91. ca.subscriptions = append(ca.subscriptions, subscription)
  92. }
  93. // Config returns the latest config. Do not modify the config.
  94. func (ca *Agent) Config() *Config {
  95. ca.Lock()
  96. defer ca.Unlock()
  97. return ca.c
  98. }
  99. // Set sets the config. Useful for testing.
  100. func (ca *Agent) Set(c *Config) {
  101. ca.Lock()
  102. defer ca.Unlock()
  103. var oldConfig Config
  104. if ca.c != nil {
  105. oldConfig = *ca.c
  106. }
  107. delta := ConfigDelta{oldConfig, *c}
  108. for _, subscription := range ca.subscriptions {
  109. // we can't let unbuffered channels for subscriptions lock us up
  110. // here, so we will send events best-effort into the channels we have
  111. go func(out chan<- ConfigDelta) { out <- delta }(subscription)
  112. }
  113. ca.c = c
  114. }