123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import datetime
- import threading
- import time
- import uuid
- import logging
def generate_task_id():
    """
    Build a fresh task identifier.
    :return: The 32-character hexadecimal form of a newly generated ``uuid1`` value.
    """
    fresh_uuid = uuid.uuid1()
    return fresh_uuid.hex
class Timing:
    """
    A small polling scheduler.

    Tasks live in ``self.task`` (task id -> task detail dict). ``run`` blocks the
    calling thread, looks at every task once per polling round and starts the
    function of each due task in its own daemon thread.
    """

    def __init__(self, sleep_interval=1):
        """
        :param sleep_interval: Value handed to ``time.sleep`` between two polling rounds.
        """
        self.task = {}
        self.is_running = False
        # Guards changes to, and snapshots of, ``self.task``.
        self.lock = threading.Lock()
        self.sleep_interval = sleep_interval

    def add_task(self, name, interval, func, count, task_id=None, *args, **kwargs):
        """
        Add a timing task and schedule will execute it.
        :param name: Task name, also used as the name of the worker thread.
        :param interval: Positive int; ``run`` takes the current epoch time in
            milliseconds modulo this value to decide whether the task is due.
        :param func: The function of timing task execution.
        :param count: The total number of times the task is executed. If it is 0
            (or negative), there is no limit.
        :param task_id: Unique task id. When omitted (``None``) a new id is
            generated for every call. Adding a task under an existing id
            replaces that task.
        :param args: Positional arguments passed to ``func`` on every execution.
        :param kwargs: Keyword arguments passed to ``func`` on every execution.
        :return: None
        :raises TypeError: If ``task_id`` is not a str, ``interval`` or ``count``
            is not an int, or ``func`` is not callable.
        :raises ValueError: If ``interval`` is not bigger than 0.
        """
        # The id must be created per call: a default written as
        # ``task_id=generate_task_id()`` is evaluated only once, so every task
        # added without an explicit id would share (and overwrite) one slot.
        if task_id is None:
            task_id = generate_task_id()
        if not isinstance(task_id, str):
            raise TypeError('task_id must be str')
        if not isinstance(interval, int):
            raise TypeError('interval must be int')
        # 0 is rejected as well: ``run`` computes ``now % interval``, which
        # would raise ZeroDivisionError on every polling round.
        if interval <= 0:
            raise ValueError('interval must be bigger than 0')
        if not callable(func):
            raise TypeError('func must be func')
        if not isinstance(count, int):
            raise TypeError('count must be int')

        # exec_num: It is times which task has executed.
        task = {'name': name, 'interval': interval, 'func': func, 'exec_num': 0,
                'args': args, 'kwargs': kwargs}
        # The 'count' key is only present for tasks with a limited number of runs.
        if count > 0:
            task['count'] = count
        with self.lock:
            self.task[task_id] = task

    def delete_tasks(self, task_ids):
        """
        Delete the task from schedule by multi task_ids, if exist, return these.
        :param task_ids: Iterable of task ids; unknown ids are ignored.
        :return: List with the detail dicts of the tasks that were removed.
        """
        removed = []
        with self.lock:
            for key in task_ids:
                if key in self.task:
                    removed.append(self.task.pop(key))
        return removed

    def set_interval(self, interval):
        """
        :param interval: New value handed to ``time.sleep`` between polling rounds.
        """
        self.sleep_interval = interval

    def sleep(self):
        """Pause the calling thread for ``self.sleep_interval``."""
        time.sleep(self.sleep_interval)

    def stop(self):
        """Ask ``run`` to return; the flag is checked at the start of each round."""
        self.is_running = False

    def run(self):
        """
        Poll the task table until ``stop`` is called. Blocks the calling thread.

        Each round starts a daemon thread for every due task, counts the
        execution, removes tasks that reached their ``count`` and then sleeps.
        :return: None
        """
        self.is_running = True
        while self.is_running:
            logging.debug('[TIMING] run with tasks length: %s', len(self.task))
            # Iterate over a snapshot so that tasks added or deleted from other
            # threads cannot change the dict while it is being walked. The
            # detail dicts themselves are shared, so exec_num updates persist.
            with self.lock:
                snapshot = list(self.task.items())
            clear_keys = []
            for task_id, task_detail in snapshot:
                try:
                    now = int(datetime.datetime.now().timestamp() * 1000)
                    interval = task_detail['interval']
                    # NOTE(review): a task only fires when the current millisecond
                    # timestamp satisfies now % interval == interval - 1 (always
                    # true for interval == 1). Confirm this is the intended rule.
                    if interval - (now % interval) > 1:
                        continue
                    worker = threading.Thread(target=task_detail['func'],
                                              name=task_detail['name'],
                                              args=task_detail['args'],
                                              kwargs=task_detail['kwargs'],
                                              daemon=True)
                    worker.start()
                    task_detail['exec_num'] += 1
                    if 'count' in task_detail and task_detail['exec_num'] >= task_detail['count']:
                        clear_keys.append(task_id)
                except Exception as e:
                    # Best effort: one failing task must not stop the scheduler.
                    logging.error('[TIMING] timing meet error, id: %s, name: %s, error: %s',
                                  task_id, task_detail['name'], e)
            # clear past due keys (delete_tasks takes the lock itself)
            self.delete_tasks(clear_keys)
            self.sleep()
        logging.debug('[TIMING] timing server will end.')
        print('[TIMING] timing server will end.')
# Module-level scheduler instance that the module-level helper functions delegate to.
defaultTiming = Timing()
def add_task(name, interval, func, count, task_id=None, *args, **kwargs):
    """
    Add a timing task to the module-level ``defaultTiming`` scheduler.
    :param name: Task name.
    :param interval: The interval between the next task execution.
    :param func: The function of timing task execution.
    :param count: The total number of times the task is executed. If it is 0, there is no limit.
    :param task_id: Unique task id. When omitted (``None``) a new id is generated for every call.
    :param args: Forwarded to ``defaultTiming.add_task``.
    :param kwargs: Forwarded to ``defaultTiming.add_task``.
    :return: None
    """
    # Generate the id per call; a default of ``generate_task_id()`` in the
    # signature is evaluated only once, so all tasks added without an explicit
    # id would end up under the same id.
    if task_id is None:
        task_id = generate_task_id()
    defaultTiming.add_task(name, interval, func, count, task_id, *args, **kwargs)
def delete_tasks(task_ids):
    """
    Delete tasks from the module-level ``defaultTiming`` scheduler.
    :param task_ids: multi task id.
    :return: Whatever ``defaultTiming.delete_tasks`` returns for these ids
        (previously this result was dropped and ``None`` was returned).
    """
    return defaultTiming.delete_tasks(task_ids)
def set_interval(interval):
    """
    Forward a new interval to ``set_interval`` of the module-level ``defaultTiming``.
    :param interval: Value passed through unchanged.
    """
    scheduler = defaultTiming
    scheduler.set_interval(interval)
def run():
    """Call ``run`` on the module-level ``defaultTiming`` scheduler."""
    scheduler = defaultTiming
    scheduler.run()
def stop():
    """Call ``stop`` on the module-level ``defaultTiming`` scheduler."""
    scheduler = defaultTiming
    scheduler.stop()
|