cron.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. // Package cron implements a cron spec parser and runner.
  2. package cron // import "gopkg.in/robfig/cron.v2"
  3. import (
  4. "sort"
  5. "time"
  6. )
  7. // Cron keeps track of any number of entries, invoking the associated func as
  8. // specified by the schedule. It may be started, stopped, and the entries may
  9. // be inspected while running.
  10. type Cron struct {
  11. entries []*Entry
  12. stop chan struct{}
  13. add chan *Entry
  14. remove chan EntryID
  15. snapshot chan []Entry
  16. running bool
  17. nextID EntryID
  18. }
  19. // Job is an interface for submitted cron jobs.
  20. type Job interface {
  21. Run()
  22. }
  23. // Schedule describes a job's duty cycle.
  24. type Schedule interface {
  25. // Next returns the next activation time, later than the given time.
  26. // Next is invoked initially, and then each time the job is run.
  27. Next(time.Time) time.Time
  28. }
  29. // EntryID identifies an entry within a Cron instance
  30. type EntryID int
  31. // Entry consists of a schedule and the func to execute on that schedule.
  32. type Entry struct {
  33. // ID is the cron-assigned ID of this entry, which may be used to look up a
  34. // snapshot or remove it.
  35. ID EntryID
  36. // Schedule on which this job should be run.
  37. Schedule Schedule
  38. // Next time the job will run, or the zero time if Cron has not been
  39. // started or this entry's schedule is unsatisfiable
  40. Next time.Time
  41. // Prev is the last time this job was run, or the zero time if never.
  42. Prev time.Time
  43. // Job is the thing to run when the Schedule is activated.
  44. Job Job
  45. }
  46. // Valid returns true if this is not the zero entry.
  47. func (e Entry) Valid() bool { return e.ID != 0 }
  48. // byTime is a wrapper for sorting the entry array by time
  49. // (with zero time at the end).
  50. type byTime []*Entry
  51. func (s byTime) Len() int { return len(s) }
  52. func (s byTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  53. func (s byTime) Less(i, j int) bool {
  54. // Two zero times should return false.
  55. // Otherwise, zero is "greater" than any other time.
  56. // (To sort it at the end of the list.)
  57. if s[i].Next.IsZero() {
  58. return false
  59. }
  60. if s[j].Next.IsZero() {
  61. return true
  62. }
  63. return s[i].Next.Before(s[j].Next)
  64. }
  65. // New returns a new Cron job runner.
  66. func New() *Cron {
  67. return &Cron{
  68. entries: nil,
  69. add: make(chan *Entry),
  70. stop: make(chan struct{}),
  71. snapshot: make(chan []Entry),
  72. remove: make(chan EntryID),
  73. running: false,
  74. }
  75. }
  76. // FuncJob is a wrapper that turns a func() into a cron.Job
  77. type FuncJob func()
  78. func (f FuncJob) Run() { f() }
  79. // AddFunc adds a func to the Cron to be run on the given schedule.
  80. func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
  81. return c.AddJob(spec, FuncJob(cmd))
  82. }
  83. // AddJob adds a Job to the Cron to be run on the given schedule.
  84. func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
  85. schedule, err := Parse(spec)
  86. if err != nil {
  87. return 0, err
  88. }
  89. return c.Schedule(schedule, cmd), nil
  90. }
  91. // Schedule adds a Job to the Cron to be run on the given schedule.
  92. func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
  93. c.nextID++
  94. entry := &Entry{
  95. ID: c.nextID,
  96. Schedule: schedule,
  97. Job: cmd,
  98. }
  99. if !c.running {
  100. c.entries = append(c.entries, entry)
  101. } else {
  102. c.add <- entry
  103. }
  104. return entry.ID
  105. }
  106. // Entries returns a snapshot of the cron entries.
  107. func (c *Cron) Entries() []Entry {
  108. if c.running {
  109. c.snapshot <- nil
  110. return <-c.snapshot
  111. }
  112. return c.entrySnapshot()
  113. }
  114. // Entry returns a snapshot of the given entry, or nil if it couldn't be found.
  115. func (c *Cron) Entry(id EntryID) Entry {
  116. for _, entry := range c.Entries() {
  117. if id == entry.ID {
  118. return entry
  119. }
  120. }
  121. return Entry{}
  122. }
  123. // Remove an entry from being run in the future.
  124. func (c *Cron) Remove(id EntryID) {
  125. if c.running {
  126. c.remove <- id
  127. } else {
  128. c.removeEntry(id)
  129. }
  130. }
  131. // Start the cron scheduler in its own go-routine.
  132. func (c *Cron) Start() {
  133. c.running = true
  134. go c.run()
  135. }
  136. // run the scheduler.. this is private just due to the need to synchronize
  137. // access to the 'running' state variable.
  138. func (c *Cron) run() {
  139. // Figure out the next activation times for each entry.
  140. now := time.Now().Local()
  141. for _, entry := range c.entries {
  142. entry.Next = entry.Schedule.Next(now)
  143. }
  144. for {
  145. // Determine the next entry to run.
  146. sort.Sort(byTime(c.entries))
  147. var effective time.Time
  148. if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
  149. // If there are no entries yet, just sleep - it still handles new entries
  150. // and stop requests.
  151. effective = now.AddDate(10, 0, 0)
  152. } else {
  153. effective = c.entries[0].Next
  154. }
  155. select {
  156. case now = <-time.After(effective.Sub(now)):
  157. // Run every entry whose next time was this effective time.
  158. for _, e := range c.entries {
  159. if e.Next != effective {
  160. break
  161. }
  162. go e.Job.Run()
  163. e.Prev = e.Next
  164. e.Next = e.Schedule.Next(effective)
  165. }
  166. continue
  167. case newEntry := <-c.add:
  168. c.entries = append(c.entries, newEntry)
  169. newEntry.Next = newEntry.Schedule.Next(now)
  170. case <-c.snapshot:
  171. c.snapshot <- c.entrySnapshot()
  172. case id := <-c.remove:
  173. c.removeEntry(id)
  174. case <-c.stop:
  175. return
  176. }
  177. now = time.Now().Local()
  178. }
  179. }
  180. // Stop the cron scheduler.
  181. func (c *Cron) Stop() {
  182. c.stop <- struct{}{}
  183. c.running = false
  184. }
  185. // entrySnapshot returns a copy of the current cron entry list.
  186. func (c *Cron) entrySnapshot() []Entry {
  187. var entries = make([]Entry, len(c.entries))
  188. for i, e := range c.entries {
  189. entries[i] = *e
  190. }
  191. return entries
  192. }
  193. func (c *Cron) removeEntry(id EntryID) {
  194. var entries []*Entry
  195. for _, e := range c.entries {
  196. if e.ID != id {
  197. entries = append(entries, e)
  198. }
  199. }
  200. c.entries = entries
  201. }