timing.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. import datetime
  4. import threading
  5. import time
  6. import uuid
  7. import logging
  8. def generate_task_id():
  9. return uuid.uuid1().hex
  10. class Timing:
  11. def __init__(self, sleep_interval=1):
  12. self.task = {}
  13. self.is_running = False
  14. self.lock = threading.Lock()
  15. self.sleep_interval = sleep_interval
  16. def add_task(self, name, interval, func, count, task_id=generate_task_id(), *args, **kwargs):
  17. """
  18. Add a timing task and schedule will execute it.
  19. :param task_id: Unique task id.
  20. :param name: Task name.
  21. :param interval: The interval between the next task execution.
  22. :param func: The function of timing task execution.
  23. :param count: The total number of times the task is executed. If it is 0, there is no limit.
  24. :param args:
  25. :param kwargs:
  26. :return:
  27. """
  28. if not isinstance(task_id, str):
  29. raise TypeError('task_id must be str')
  30. if not isinstance(interval, int):
  31. raise TypeError('interval must be int')
  32. if interval < 0:
  33. raise ValueError('interval must be bigger than 0')
  34. if not callable(func):
  35. raise TypeError('func must be func')
  36. if not isinstance(count, int):
  37. raise TypeError('count must be int')
  38. if count < 0:
  39. count = 0
  40. self.lock.acquire()
  41. # exec_num: It is times which task has executed.
  42. task = {'name': name, 'interval': interval, 'func': func, 'exec_num': 0,
  43. 'args': args, 'kwargs': kwargs}
  44. if count > 0:
  45. task['count'] = count
  46. self.task[task_id] = task
  47. self.lock.release()
  48. def delete_tasks(self, task_ids):
  49. """
  50. Delete the task from schedule by multi task_ids, if exist, return these.
  51. :param task_ids: multi task id.
  52. :return:
  53. """
  54. elements = []
  55. for key in task_ids:
  56. if self.task.__contains__(key):
  57. element = self.task.pop(key)
  58. elements.append(element)
  59. return elements
  60. def set_interval(self, interval):
  61. self.sleep_interval = interval
  62. def sleep(self):
  63. time.sleep(self.sleep_interval)
  64. def stop(self):
  65. self.is_running = False
  66. def run(self):
  67. self.is_running = True
  68. while True:
  69. if not self.is_running:
  70. logging.debug('[TIMING] timing server will end.')
  71. print('[TIMING] timing server will end.')
  72. return
  73. logging.debug('[TIMING] run with tasks length: %s', len(self.task))
  74. clear_keys = []
  75. self.lock.acquire()
  76. tasks = self.task
  77. self.lock.release()
  78. for task_id in tasks.keys():
  79. task_detail = tasks[task_id]
  80. try:
  81. now = int(datetime.datetime.now().timestamp() * 1000)
  82. interval = task_detail['interval']
  83. if interval - (now % interval) > 1:
  84. continue
  85. t = threading.Thread(target=task_detail['func'], name=task_detail['name'])
  86. t.setDaemon(True)
  87. t.start()
  88. task_detail['exec_num'] = task_detail['exec_num'] + 1
  89. if task_detail.__contains__('count') and task_detail['exec_num'] >= task_detail['count']:
  90. clear_keys.append(task_id)
  91. except Exception as e:
  92. logging.error('[TIMING] timing meet error, id: %s, name: %s, error: %s',
  93. task_id, task_detail['name'], e)
  94. # clear past due keys
  95. self.lock.acquire()
  96. self.delete_tasks(clear_keys)
  97. self.lock.release()
  98. self.sleep()
  99. defaultTiming = Timing()
  100. def add_task(name, interval, func, count, task_id=generate_task_id(), *args, **kwargs):
  101. defaultTiming.add_task(name, interval, func, count, task_id, *args, **kwargs)
  102. def delete_tasks(task_ids):
  103. defaultTiming.delete_tasks(task_ids)
  104. def set_interval(interval):
  105. defaultTiming.set_interval(interval)
  106. def run():
  107. defaultTiming.run()
  108. def stop():
  109. defaultTiming.stop()