123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268 |
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import unittest
- from plugins import base
- import uuid
- import logging
- from threading import Lock
- import datetime
- import schedule
- import time
- mock_lock = Lock()
- fail_num = success_num = 0
- class MockPlugin(base.Base):
- def __init__(self, *args, **kwargs):
- super(MockPlugin, self).__init__(*args, **kwargs)
- if kwargs.__contains__('name'):
- self.name = kwargs['name']
- else:
- self.name = 'ipc' + uuid.uuid1().hex[0:10]
- self.host = kwargs['host']
- self.port = kwargs['port']
- self.user = kwargs['username']
- self.password = kwargs['pwd']
- self.args = args
- self.kwargs = kwargs
- def name(self):
- return self.name
- def config_check(self, *args, **kwargs):
- if kwargs.__contains__('error'):
- return kwargs['error']
- return None
- @staticmethod
- def record(is_success):
- global success_num, fail_num
- mock_lock.acquire()
- if is_success:
- success_num += 1
- else:
- fail_num += 1
- mock_lock.release()
- def exec(self):
- err = self.config_check(**self.kwargs)
- if err is not None:
- logging.error('[MockPlugin] check config error: %s', err)
- self.record(False)
- return
- self.record(True)
- class ScheduleTest(unittest.TestCase):
- @staticmethod
- def generate_valid_config(success_count, fail_count):
- config = {
- 'id': int(datetime.datetime.now().timestamp())
- }
- data = []
- for i in range(success_count):
- item = {
- 'host': 'local.pc',
- 'port': 2000,
- 'dev_id': uuid.uuid1(),
- 'type': 'mock_plugin',
- 'schema': 'onvif',
- 'username': 'admin',
- 'pwd': '123456',
- 'expr': 10,
- 'kpi': {
- 'key1': 1
- }
- }
- data.append(item)
- for i in range(fail_count):
- item = {
- 'host': 'local.pc',
- 'port': 2000,
- 'dev_id': uuid.uuid1(),
- 'type': 'mock_plugin',
- 'schema': 'onvif',
- 'username': 'admin',
- 'pwd': '123456',
- 'expr': 10,
- 'kpi': {
- 'key1': 1
- },
- 'error': 'data error'
- }
- data.append(item)
- config['data'] = data
- return config
- @staticmethod
- def generate_invalid_config(count):
- config = {
- 'id': int(datetime.datetime.now().timestamp())
- }
- keys = ['host', 'port', 'dev_id', 'type', 'schema', 'username', 'pwd']
- data = []
- for count_index in range(count):
- item = {}
- index = count_index % len(keys)
- for i, k in enumerate(keys):
- if index == i:
- continue
- if k == 'port':
- item['port'] = 2000
- if k == 'type':
- item['type'] = 'invalid_type'
- else:
- item[k] = k
- data.append(item)
- config['data'] = data
- return config
- @staticmethod
- def generate_non_object(count):
- config = {
- 'id': int(datetime.datetime.now().timestamp())
- }
- data = []
- for index in range(count):
- data.append(index)
- config['data'] = data
- return config
- @staticmethod
- def reset():
- global success_num, fail_num
- success_num = 0
- fail_num = 0
- def test_schedule_serial(self):
- # config is None
- scheduler = schedule.Schedule(None)
- scheduler.plugins = {
- 'mock_plugin': MockPlugin
- }
- scheduler.deliver()
- time.sleep(1)
- self.assertEqual(0, success_num)
- self.assertEqual(0, fail_num)
- self.reset()
- # config have no field data
- scheduler = schedule.Schedule({'id': int(datetime.datetime.now().timestamp())})
- scheduler.plugins = {
- 'mock_plugin': MockPlugin
- }
- scheduler.deliver()
- time.sleep(1)
- self.assertEqual(0, success_num)
- self.assertEqual(0, fail_num)
- self.reset()
- # config.data is not list
- scheduler = schedule.Schedule({
- 'id': int(datetime.datetime.now().timestamp()),
- 'data': 10
- })
- scheduler.plugins = {
- 'mock_plugin': MockPlugin
- }
- scheduler.deliver()
- time.sleep(1)
- self.assertEqual(0, success_num)
- self.assertEqual(0, fail_num)
- self.reset()
- # valid config, 10 success, 10 fail.
- config = self.generate_valid_config(10, 10)
- scheduler = schedule.Schedule(config)
- scheduler.plugins = {
- 'mock_plugin': MockPlugin
- }
- scheduler.deliver()
- time.sleep(1)
- self.assertEqual(10, success_num)
- self.assertEqual(10, fail_num)
- self.reset()
- # invalid config, 0 success, 0 fail.
- config = self.generate_invalid_config(10)
- scheduler = schedule.Schedule(config)
- scheduler.plugins = {
- 'mock_plugin': MockPlugin
- }
- scheduler.deliver()
- time.sleep(1)
- # As invalid config, plugin won't be execute.
- self.assertEqual(0, success_num)
- self.assertEqual(0, fail_num)
- self.reset()
- # non-object config, 0 success, 0 fail.
- config = self.generate_non_object(10)
- scheduler = schedule.Schedule(config)
- scheduler.plugins = {
- 'mock_plugin': MockPlugin
- }
- scheduler.deliver()
- time.sleep(1)
- self.assertEqual(0, success_num)
- self.assertEqual(0, fail_num)
- self.reset()
- # invalid plugin type, 5 success, 0 fail
- config = self.generate_valid_config(10, 0)
- config_data = config['data']
- for index, item in enumerate(config_data):
- if index == 5:
- break
- config_data[index]['type'] = 'mock_type'
- del config_data[index]['expr']
- del config_data[index]['kpi']
- config['data'] = config_data
- scheduler = schedule.Schedule(config)
- scheduler.plugins = {
- 'mock_plugin': MockPlugin
- }
- scheduler.deliver()
- time.sleep(1)
- self.assertEqual(5, success_num)
- self.assertEqual(0, fail_num)
- self.reset()
- # multi config, 10 success, 10 fail
- config_data = [None]
- config = self.generate_valid_config(10, 10)
- invalid_config = self.generate_invalid_config(10)
- non_object_config = self.generate_non_object(10)
- invalid_type_config = self.generate_valid_config(10, 0)
- invalid_type_data = invalid_type_config['data']
- for index, item in enumerate(invalid_type_data):
- invalid_type_data[index]['type'] = 'mock_type'
- config_data = config_data + config['data']
- config_data = config_data + invalid_config['data']
- config_data = config_data + non_object_config['data']
- config_data = config_data + invalid_type_data
- config['data'] = config_data
- # check config
- self.assertEqual(len(config_data), 51)
- scheduler = schedule.Schedule(config)
- scheduler.plugins = {
- 'mock_plugin': MockPlugin
- }
- scheduler.deliver()
- time.sleep(1)
- self.assertEqual(10, success_num)
- self.assertEqual(10, fail_num)
- self.reset()
- def test_schedule_concurrent(self):
- pass
- if __name__ == '__main__':
- unittest.main()
|