proxy.go 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. /*
  2. Copyright 2018 The Knative Authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package duck
  14. import (
  15. "sync"
  16. "k8s.io/apimachinery/pkg/watch"
  17. )
  18. // NewProxyWatcher is based on the same concept from Kubernetes apimachinery in 1.12 here:
  19. // https://github.com/kubernetes/apimachinery/blob/c6dd271be/pkg/watch/watch.go#L272
  20. // Replace this copy once we've update our client libraries.
  21. // proxyWatcher lets you wrap your channel in watch.Interface. Threadsafe.
  22. type proxyWatcher struct {
  23. result chan watch.Event
  24. stopCh chan struct{}
  25. mutex sync.Mutex
  26. stopped bool
  27. }
  28. var _ watch.Interface = (*proxyWatcher)(nil)
  29. // NewProxyWatcher creates new proxyWatcher by wrapping a channel
  30. func NewProxyWatcher(ch chan watch.Event) watch.Interface {
  31. return &proxyWatcher{
  32. result: ch,
  33. stopCh: make(chan struct{}),
  34. stopped: false,
  35. }
  36. }
  37. // Stop implements Interface
  38. func (pw *proxyWatcher) Stop() {
  39. pw.mutex.Lock()
  40. defer pw.mutex.Unlock()
  41. if !pw.stopped {
  42. pw.stopped = true
  43. close(pw.stopCh)
  44. }
  45. }
  46. // Stopping returns true if Stop() has been called
  47. func (pw *proxyWatcher) Stopping() bool {
  48. pw.mutex.Lock()
  49. defer pw.mutex.Unlock()
  50. return pw.stopped
  51. }
  52. // ResultChan implements watch.Interface
  53. func (pw *proxyWatcher) ResultChan() <-chan watch.Event {
  54. return pw.result
  55. }
  56. // StopChan returns stop channel
  57. func (pw *proxyWatcher) StopChan() <-chan struct{} {
  58. return pw.stopCh
  59. }