#!/usr/bin/env python # -*- coding:utf-8 -*- import datetime import threading import time import uuid import logging def generate_task_id(): return uuid.uuid1().hex class Timing: def __init__(self, sleep_interval=1): self.task = {} self.is_running = False self.lock = threading.Lock() self.sleep_interval = sleep_interval def add_task(self, name, interval, func, count, task_id=generate_task_id(), *args, **kwargs): """ Add a timing task and schedule will execute it. :param task_id: Unique task id. :param name: Task name. :param interval: The interval between the next task execution. :param func: The function of timing task execution. :param count: The total number of times the task is executed. If it is 0, there is no limit. :param args: :param kwargs: :return: """ if not isinstance(task_id, str): raise TypeError('task_id must be str') if not isinstance(interval, int): raise TypeError('interval must be int') if interval < 0: raise ValueError('interval must be bigger than 0') if not callable(func): raise TypeError('func must be func') if not isinstance(count, int): raise TypeError('count must be int') if count < 0: count = 0 self.lock.acquire() # exec_num: It is times which task has executed. task = {'name': name, 'interval': interval, 'func': func, 'exec_num': 0, 'args': args, 'kwargs': kwargs} if count > 0: task['count'] = count self.task[task_id] = task self.lock.release() def delete_tasks(self, task_ids): """ Delete the task from schedule by multi task_ids, if exist, return these. :param task_ids: multi task id. :return: """ elements = [] for key in task_ids: if self.task.__contains__(key): element = self.task.pop(key) elements.append(element) return elements def set_interval(self, interval): self.sleep_interval = interval def sleep(self): time.sleep(self.sleep_interval) def stop(self): self.is_running = False def run(self): self.is_running = True while True: if not self.is_running: logging.debug('[TIMING] timing server will end.') print('[TIMING] timing server will end.') return logging.debug('[TIMING] run with tasks length: %s', len(self.task)) clear_keys = [] self.lock.acquire() tasks = self.task self.lock.release() for task_id in tasks.keys(): task_detail = tasks[task_id] try: now = int(datetime.datetime.now().timestamp() * 1000) interval = task_detail['interval'] if interval - (now % interval) > 1: continue t = threading.Thread(target=task_detail['func'], name=task_detail['name']) t.setDaemon(True) t.start() task_detail['exec_num'] = task_detail['exec_num'] + 1 if task_detail.__contains__('count') and task_detail['exec_num'] >= task_detail['count']: clear_keys.append(task_id) except Exception as e: logging.error('[TIMING] timing meet error, id: %s, name: %s, error: %s', task_id, task_detail['name'], e) # clear past due keys self.lock.acquire() self.delete_tasks(clear_keys) self.lock.release() self.sleep() defaultTiming = Timing() def add_task(name, interval, func, count, task_id=generate_task_id(), *args, **kwargs): defaultTiming.add_task(name, interval, func, count, task_id, *args, **kwargs) def delete_tasks(task_ids): defaultTiming.delete_tasks(task_ids) def set_interval(interval): defaultTiming.set_interval(interval) def run(): defaultTiming.run() def stop(): defaultTiming.stop()