123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import logging
- import math
- import cv2
- from PIL import Image, ImageStat
- from plugins import base
- from onvif import ONVIFCamera
- import datetime
- import zeep
- import uuid
- import upload
def zeep_pythonvalue(self, xmlvalue):
    """Return ``xmlvalue`` unchanged instead of converting it to a Python value.

    Written with a ``self`` parameter because it is installed below as a method
    of zeep's ``AnySimpleType``.
    """
    return xmlvalue


# Replace zeep's AnySimpleType.pythonvalue with the pass-through above, so values
# of that xsd type are handed back exactly as they were received.
# NOTE(review): presumably a workaround for zeep not being able to convert
# AnySimpleType values in ONVIF responses -- confirm against the zeep version in use.
zeep.xsd.simple.AnySimpleType.pythonvalue = zeep_pythonvalue
class IPC(base.Base):
    """Plugin that takes a snapshot from an ONVIF IP camera and scores the image.

    Configuration is read from the keyword arguments given to ``__init__``:

    * ``host``, ``port``, ``user``, ``password`` -- passed to ``ONVIFCamera``.
    * ``image_dir`` -- directory the snapshot file name is joined onto.
    * ``kpi`` -- container of KPI names to compute (only membership is tested).
    * ``name`` (optional) -- plugin name; generated when absent.
    * ``wsdl_dir`` (optional) -- WSDL directory; ``DEFAULT_WSDL_DIR`` when
      absent, ``None`` or empty.
    """

    # WSDL directory used when the configuration does not provide one.
    DEFAULT_WSDL_DIR = 'resource/wsdl'

    def __init__(self, *args, **kwargs):
        """Store the camera settings found in ``kwargs``.

        Raises:
            KeyError: if ``host``, ``port``, ``user``, ``password``,
                ``image_dir`` or ``kpi`` is missing.
        """
        super().__init__(*args, **kwargs)
        if 'name' in kwargs:
            self.name = kwargs['name']
        else:
            # No name configured: derive a unique one from a time-based UUID.
            self.name = 'ipc' + uuid.uuid1().hex[0:10]
        self.host = kwargs['host']
        self.port = kwargs['port']
        self.user = kwargs['user']
        self.password = kwargs['password']
        self.image_dir = kwargs['image_dir']
        self.wsdl_dir = kwargs.get('wsdl_dir') or self.DEFAULT_WSDL_DIR
        self.kpi = kwargs['kpi']

    def name(self):
        """Return the plugin name.

        NOTE(review): ``__init__`` assigns an instance attribute that is also
        called ``name``, and that attribute shadows this method on every
        instance: ``ipc.name`` evaluates to the string and ``ipc.name()``
        raises ``TypeError``. Left unchanged because callers may rely on the
        attribute access; confirm what ``base.Base`` expects before renaming.
        """
        return self.name

    def config_check(self, *args, **kwargs):
        """Validate the plugin configuration (currently a no-op)."""
        pass

    def exec(self):
        """Take a snapshot, compute the configured KPIs and build the payload.

        Returns ``None``. The payload is assembled but not sent yet because
        the upload call is still disabled.
        """
        import os

        image_name = self.snapshot()
        if image_name is None:
            logging.error('[IPC] %s: snapshot produced no image, skip analyze', self.name)
            return
        # analyze() needs the location of the image file, not just its name.
        image_path = os.path.join(self.image_dir, image_name)
        analyze_result = self.analyze(image_path)
        data = {'data': analyze_result, 'image': image_name}
        # TODO: hand the payload to the uploader once uploading is enabled.
        # upload.add_task(data)
        return

    def snapshot(self):
        """Ask the camera for a snapshot URI and return the target file name.

        Returns:
            A ``<milliseconds since epoch>.png`` file name, or ``None`` when
            the camera reports no media profile.
        """
        camera = ONVIFCamera(self.host, self.port, self.user, self.password, self.wsdl_dir)
        media = camera.create_media_service()
        profiles = media.GetProfiles()
        if not profiles:
            return None
        token = profiles[0]['token']
        # NOTE(review): the ONVIF Media WSDL names the request field
        # ``ProfileToken`` and the response field ``Uri``; confirm that the
        # positional token and the 'url' key work with the onvif client in use.
        snapshot_detail = media.GetSnapshotUri(token)
        snapshot_url = snapshot_detail['url']
        image_name = str(int(datetime.datetime.now().timestamp() * 1000)) + ".png"
        # TODO: download the image via the snapshot url (not enabled yet).
        # urlretrieve(snapshot_url, path.join(self.image_dir, image_name))
        return image_name

    def analyze(self, path):
        """Compute every KPI listed in ``self.kpi`` for the image at ``path``.

        Returns:
            A dict mapping KPI name to its value, or ``None`` when no KPI is
            configured.
        """
        if len(self.kpi) < 1:
            return None
        # Ordered so the result keys keep a stable, documented order.
        kpi_functions = (
            ('img_cast', self.get_img_cast),
            ('img_ybcast', self.get_img_ybcast),
            ('img_light', self.get_img_light),
            ('img_rgcast', self.get_img_rgcast),
            ('img_lightcast', self.get_img_lightcast),
            ('img_clear', self.get_img_clear),
            ('img_quality', self.get_img_quality),
        )
        return {
            kpi_name: compute(path)
            for kpi_name, compute in kpi_functions
            if kpi_name in self.kpi
        }

    # https://blog.csdn.net/sparrowwf/article/details/86595162
    @staticmethod
    def get_img_cast(path):
        """Return the colour-cast factor of the image at ``path``.

        The factor is the distance of the mean (a, b) chroma from neutral,
        divided by the mean absolute deviation of the chroma around that mean.

        Returns:
            The factor as a float. ``0`` (with an error log) when the file
            cannot be read. When the chroma has no spread at all, ``math.inf``
            if the mean is off neutral and ``0.0`` otherwise.
        """
        image = cv2.imread(path)
        if image is None:
            # cv2.imread signals an unreadable file by returning None.
            logging.error('[IPC] calculate image cast with file: %s error: cannot read image', path)
            return 0
        lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
        _, a_channel, b_channel = cv2.split(lab)
        # 8-bit Lab stores a and b shifted by +128; undo that so neutral is 0.
        a_offsets = a_channel.astype(float) - 128
        b_offsets = b_channel.astype(float) - 128
        da = a_offsets.mean()
        db = b_offsets.mean()
        # Mean absolute deviation of each chroma channel around its own mean.
        msq_a = abs(a_offsets - da).mean()
        msq_b = abs(b_offsets - db).mean()
        chroma_shift = math.sqrt(da * da + db * db)
        chroma_spread = math.sqrt(msq_a * msq_a + msq_b * msq_b)
        if chroma_spread == 0:
            # Every pixel has the same chroma: the ratio is undefined, so
            # report an unbounded cast when off neutral and none otherwise.
            return math.inf if chroma_shift > 0 else 0.0
        return chroma_shift / chroma_spread

    @staticmethod
    def get_img_ybcast(path):
        """Placeholder for the ``img_ybcast`` KPI; always returns 1."""
        return 1

    # https://stackoverflow.com/questions/3490727/what-are-some-methods-to-analyze-image-brightness-using-python
    @staticmethod
    def get_img_light(path):
        """Return the perceived brightness of the image at ``path``.

        Computed from the per-band means as
        ``sqrt(0.241 * r^2 + 0.691 * g^2 + 0.068 * b^2)``.

        Returns:
            The brightness as a float, or ``0`` (with an error log) when the
            image cannot be opened or processed.
        """
        try:
            with Image.open(path) as im:
                # Convert so grayscale/palette/RGBA files also yield three bands.
                stat = ImageStat.Stat(im.convert('RGB'))
            r, g, b = stat.mean
            return math.sqrt(0.241 * (r ** 2) + 0.691 * (g ** 2) + 0.068 * (b ** 2))
        except Exception as e:
            logging.error('[IPC] calculate image light with file: %s error: %s', path, e)
            return 0

    @staticmethod
    def get_img_rgcast(path):
        """Placeholder for the ``img_rgcast`` KPI; always returns 1."""
        return 1

    @staticmethod
    def get_img_lightcast(path):
        """Placeholder for the ``img_lightcast`` KPI; always returns 1."""
        return 1

    # https://blog.csdn.net/luolinll1212/article/details/84107066
    @staticmethod
    def get_img_clear(path):
        """Return the sharpness of the image at ``path``.

        Sharpness is the variance of the Laplacian of the grayscale image.

        Returns:
            The variance as a float, or ``0`` (with an error log) when the
            file cannot be read.
        """
        image = cv2.imread(path)
        if image is None:
            # cv2.imread signals an unreadable file by returning None.
            logging.error('[IPC] calculate image clear with file: %s error: cannot read image', path)
            return 0
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        return float(cv2.Laplacian(gray, cv2.CV_64F).var())

    @staticmethod
    def get_img_quality(path):
        """Placeholder for the ``img_quality`` KPI; always returns 1."""
        return 1
|