首页 > Python资料 博客日记

pytest+allure+excel实现python调CANoe进行诊断自动化测试(二)

2024-08-08 09:00:06Python资料围观97

本篇文章分享pytest+allure+excel实现python调CANoe进行诊断自动化测试(二),对你有帮助的话记得收藏一下,看Python资料网收获更多编程知识

一、前言

本文主要介绍整体框架构成及python调CANoe COM接口封装的方法

二、测试框架的好处

当我们开发软件时,需要确保软件的各个部分能正常工作。为了做到这一点,我们需要测试软件,即检查软件是否按照我们预期的方式运行。传统的测试通常是手动的,即人工点击按钮、输入数据来模拟用户行为,来检查软件功能是否正常。但是这种方式效率低下且容易出错。
为了提高测试效率和质量,我们可以构建一个自动化测试框架。这个框架就像一个机器人,能自动执行测试用例,验证软件的功能是否正常。构建自动化测试框架的好处是可以节省时间和人力资源,并确保每次更新后软件依然正常运行。同时,自动化测试框架可以帮助我们更方便地合作,共享测试用例和结果,确保软件质量。因此,构建自动化测试框架可以帮助我们更快速、更准确地测试软件,确保软件质量。

主要好处分为以下几点:

1. 提高测试效率:自动化测试框架可以自动执行测试用例,省去了手动测试的时间和劳动成本。测试团队可以更快速地执行测试,并及时发现问题。
2. 提高测试覆盖率:自动化测试框架可以帮助测试团队更全面地覆盖功能、边界条件和各种场景,确保软件质量。
3. 降低测试成本:虽然构建自动化测试框架需要一定的投入,但长期来看,自动化测试可以减少重复劳动和人力资源成本。同时,自动化测试框架可以更快速地应对软件变更,减少因人为错误导致的质量问题。
4. 提高持续集成与交付软件的速度:自动化测试框架可以与持续集成工具(如 Jenkins)和持续交付流程结合,确保每次代码提交都能进行自动化测试,从而提高软件交付速度和质量。
5. 方便协作与回归测试:构建自动化测试框架可以使测试团队更容易共享测试用例和测试结果,方便团队合作。同时,自动化测试框架可以帮助团队更方便地进行回归测试,确保每次更新后软件依然符合预期。

三、框架构成

从上往下依次介绍:

allure-report

主要存放生成完的allure测试报告,之后执行结束后打开index.html即可查看完整的测试报告

comm

存放公用方法,因为是自动化测试框架嘛,后面后其他自动化测试要实现时需要用到的公用方法都在此处存放使用

canoe_application.py主要是python调CANoe的方法封装

execute_excel.py主要是读写excel的方法

security_key.py主要是进行安全解锁的方法

log

用于存放日志,目前只存放了CANoe生成的trace日志

report

存放allure生成的json文件。

Allure生成的.json文件是一种用于存储测试结果数据的格式化文件。通常,当运行测试并生成Allure测试报告时,Allure会生成.json文件来存储测试执行的详细信息,包括测试用例的执行结果、测试用例的状态、测试步骤的日志、附加信息等

里面的environment.properties是allure报告的配置文件

testcase

存放的是测试脚本。

testdata

存放测试数据。

根目录下的文件

config.ini:配置路径、设备等

environment.properties:测试报告环境配置,方便复制到allure生成的json文件中

main.py:执行文件

四、python调CANoe COM封装

canoe_application.py

import os
import time
from win32com.client import *
from pathlib import Path

class CANoe:
    def __init__(self):
        # self.application = None
        self.application = DispatchEx("CANoe.Application")
        self.ver = self.application.Version
        print('Loaded CANoe version', self.ver.major, '.', self.ver.minor, '.', self.ver.Build, '...')
        self.logging_collection = None
        self.logger = None
        self.log_target_dir = None
        self.Measurement = self.application.Measurement.Running

    def open_cfg(self, cfgname):
        """
        打开工程
        :param cfgname:
        :return:
        """
        if(self.application != None):
            if os.path.isfile(cfgname) and (os.path.splitext(cfgname)[1] == '.cfg'):
                self.application.Open(cfgname)
                print("opening..."+cfgname)
            else:
                raise NameError("Can't find CANoe cfg file")
        else:
            raise RuntimeError("CANoe Application is missing, unable to open simulation")


    def close_cfg(self):
        """
        关闭工程
        :return:
        """
        if(self.application != None):
            self.application.Quit()
            self.application = None

    def start_Measurement(self):
        """
        运行
        :return:
        """
        retry = 0
        retry_counter = 5  # 尝试5次
        while not self.application.Measurement.Running and (retry < retry_counter):
            self.application.Measurement.Start()
            time.sleep(1)
            retry += 1
        if(retry == retry_counter):
            raise RuntimeWarning("CANoe start measurement failed, Please Check Connection!")

    def stop_Measurement(self):
        """
        停止
        :return:
        """
        if self.application.Measurement.Running:
            self.application.Measurement.Stop()
        else:
            pass

    def send_Diag_Request(self, qualifier, stream_flag=False):
        """
        发送诊断请求
        :param qualifier: 限定符
        :param stream_flag: 是否是给定的限定符
        :return:
        """
        data = qualifier
        my_bytearray = bytearray.fromhex(data)
        qualifier_list = list(my_bytearray)
        self._diag_Request(qualifier_list, stream_flag)

        # self.DiagDev = self.application.Networks("CAN").Devices("EPS").Diagnostic
        # self.DiagReq = self.DiagDev.CreateRequest('0x10 0x01')
        # self.DiagReq.Send()
        # self.Diag.DiagStartTesterPresent()

    def init_service(self, network, device):
        """
        建立诊断服务
        :param network: 网段
        :param device: ECU qualifier
        :return:
        """
        time.sleep(10)
        self.__gDiag = self.application.Networks(network).Devices(device).Diagnostic
        # self.__gDiag.DiagStartTesterPresent()


    def start_TesterPresent(self):
        """
        开启诊断仪在线
        :return:
        """

        self.__gDiag.DiagStartTesterPresent()

    def stop_TesterPresent(self):
        """
        停止诊断仪在线
        :return:
        """
        self.__gDiag.DiagStopTesterPresent()

    def _diag_Request(self, qualifier, stream_flag=False):
        """
        执行服务
        :param qualifier:
        :param stream_flag: True以字节流形式发送,False根据具体限定符发送
        :return:
        """

        global diag_request
        if stream_flag:
            pass
        else:
            for a in qualifier:
                print('%x' % a, end=' ')
        if not stream_flag:
            diag_request = self.__gDiag.CreateRequest(qualifier)
        else:
            diag_request = self.__gDiag.CreateRequestFromStream(bytearray(qualifier))
            # else:
            #     diag_request = self.__gDiag.SetComplexParameter(bytearray(qualifier), iteration, subParameter)

        diag_request.Send()

    def check_Diag_Response(self):
        return self._diag_Response()

    def check_positive(self):
        global diag_request
        Res = ''
        while diag_request.Pending:
            time.sleep(1)
        if diag_request.Responses.Count == 0:
            Res = "NO RESPONSE"
        else:
            for k in range(1, diag_request.Responses.Count + 1):
                diag_resp = diag_request.Responses(k)
                stream = diag_resp.Stream
            if diag_resp.Positive:
                return True
            else:
                return False

    def _diag_Response(self):
        """
        诊断响应
        :return:
        """
        global diag_request
        Res = ''
        positive = True
        while diag_request.Pending:
            time.sleep(1)
        if diag_request.Responses.Count == 0:
            Res = "NO RESPONSE"
        else:
            for k in range(1, diag_request.Responses.Count + 1):
                diag_resp = diag_request.Responses(k)
                stream = diag_resp.Stream
            if diag_resp.Positive:
                positive = True
            else:
                positive = False

            for i in stream:
                Res = Res + '{:02X} '.format(i)
        return Res, positive


    def init_logging_collection(self):
        """

        :return:
        """
        self.logging_collection = self.application.Configuration.OnlineSetup.LoggingCollection

    def select_logger(self, logger_number:int = None):
        """

        :param logger_number:
        :return:
        """
        self.logger = self.logging_collection.Item(logger_number)

    def set_log_target_dir(self, abs_path_to_dir = None):
        """

        :param abs_path_to_dir: log存放位置
        :return:
        """
        path = Path(abs_path_to_dir)
        if path.exists():
            self.log_target_dir = path
        else:
            raise Exception("CANoe Wrapper: Log target directory does not exist")

    def set_logfile_name(self, file_name:str = None):
        """

        设置日志名字
        :param file_name:
        :return:
        """
        file = Path(file_name)
        if file.suffix == '.blf':
            full_path = Path.joinpath(self.log_target_dir, file)
            self.logger.fullName = str(full_path)
        else:
            raise Exception("CANoe Wrapper: Log target file has incorrect extension")

    def start_logging(self):
        """
        开始记录日志
        :return:
        """
        if self.logger is not None:
            self.logger.Trigger.Start()
        else:
            raise Exception("CANoe Wrapper: No active logger was set")

    def stop_logging(self):
        """
        停止记录日志
        :return:
        """
        if self.logger is not None:
            self.logger.Trigger.Stop()
        else:
            raise Exception("CANoe Wrapper: No active logger was set")

    def get_logger_status(self):
        """
        获取日志状态
        :return: 返回TRUE 表示log状态为running,返回FALSE 表示log状态为stopped
        """
        if self.logger is not None:
            return str(self.logger.Trigger.Active)

    def sleep(self, ms:int):
        time.sleep(ms/1000)



if __name__ == '__main__':
    app = CANoe()
    app.open_cfg(r"D:\work\Partial_SDA\workspace\canoeworkspace\ECUTestwork.cfg")
    time.sleep(1)

CANoe的COM接口在帮助文档中可以找到:


版权声明:本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!

标签:

相关文章

本站推荐