目 录CONTENT

文章目录

unittest自动化框架实战案例

懿曲折扇情
2022-04-20 / 0 评论 / 3 点赞 / 449 阅读 / 9,518 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-05-07,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。
广告 广告

一、框架思路

(此代码只作为简单演示使用,因为好多问题没有考虑到,时间有限,没有做参数化,没有重跑机制,代码规范等等,请各位仅供参考。)

base:是基于seleniium的二次封装的点击、输入、刷新等操作
common:是基于业务的底层公共方法
config:配置文件
log:收集log的方法,以及生成的截图
excute_logs:生成的日志都会打印在一个文件
page_object:webui登录的方法和一些二次封装的方法
testcase:是testcase
reports:是生成reports的方法和生成的报告

二、框架代码展示

base_page.py文件

# coding=utf-8


import time
from time import sleep
from log.getLogStream import logStream

log = logStream()


# 创建基类
class BasePage:
    # driver = webdriver.Chrome()
    # 构造函数
    def __init__(self, driver):
        log.info('初始化driver{}'.format(driver))
        self.driver = driver

    # 访问URL
    def open(self, url):
        """
        function: 打开浏览器,访问url
        description:
        arg:
        return:
        """
        log.info('访问网址')
        self.driver.get(url)
        self.driver.maximize_window()
        sleep(3)

    # 元素定位
    def locator(self, loc):
        """
        function: 定位元素
        description:
        arg:
        return:
        """
        log.info('正在定位{}元素'.format(loc))
        return self.driver.find_element(*loc)

    # 输入
    def input_(self, loc, txt):
        """
        function: 输入
        description:
        arg:
        return:
        """
        try:
            log.info('正在定位{}元素, 输入{}内容'.format(loc, txt))
            self.locator(loc).send_keys(txt)
            sleep(2)
        except Exception as e:
            self.screenShot()
            log.error('错误日志' % e)

    # 点击
    def click(self, loc):
        """
        function: 点击
        description:
        arg:
        return:
        """
        try:
            log.info('正在点击{}元素'.format(loc))
            self.locator(loc).click()
        except Exception as e:
            self.screenShot()
            log.error('错误日志' % e)

    # 等待
    def wait(self, time_):
        """
        function: 等待
        description:
        arg:
        return:
        """
        log.info('等待时间{}秒'.format(time_))
        sleep(time_)

    # 关闭
    def quit(self):
        """
        function: 退出
        description:
        arg:
        return:
        """
        log.info('退出')
        self.driver.quit()

    # 最大化
    def maxSize(self):
        """
        function: 最大化
        description:
        arg:
        return:
        """
        log.info('最大化')
        self.driver.maximize_window()

    # 截图并保存
    def screenShot(self):
        """
        function: 绑定服务器
        description: bind服务器
        arg:
        return:
        """
        current_time = time.strftime('%Y-%m-%d %H-%M-%S')
        print(current_time)
        pic_path = '../log/screenshot' + '/' + current_time + '.png'
        self.driver.save_screenshot(pic_path)

    # 关闭浏览器
    def close(self):
        """
        function: 关闭当前浏览器
        description:
        arg:
        return:
        """
        self.driver.close()

    # 刷新浏览器
    def refresh(self):
        """
        function: 刷新浏览器
        description:
        arg:
        return:
        """
        self.driver.refresh()

    def waitUntilPageContains(self, message):
        """
        function: 等待界面出现某个字段
        description:
        arg:
        :return:
        example: self.waitUntilPageContains('vsite_automation')
        """
        log.info('正在获取页面%s字段' % message)
        sleep(3)
        msg = self.driver.find_element_by_xpath('//*[contains(text(), "%s")]' % message).text
        print(msg)
        return msg

common目录下的业务文件

import paramiko
from time import sleep
import re


class CLI:

    def ssh_ag(self):
        """

        :param self:
        :return:
        """
        # 创建ssh对象
        self.ssh = paramiko.SSHClient()
        # 允许连接不在know_hosts文件中的主机
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy)
        # 连接AG
        self.ssh.connect(hostname='192.168.120.220', port=22, username='gaojs', password='123')
        sleep(5)
        channel = self.ssh.invoke_shell()
        self.channel = channel
        channel.settimeout(5)
        sleep(5)
        self.cli_cmd('enable')
        self.cli_cmd('')
        self.cli_cmd('config ter')

    def print_step(self):
        """

        :return:
        """
        result = self.channel.recv(2048)
        print(result.decode())

getLogStream.py收集日志文件

import logging


def logStream():
    # 创建一个日志器
    logger = logging.getLogger()
    # 设置日志级别为info
    logger.setLevel(logging.INFO)
    # 日志想要输出到哪,就要制定输出目的地,控制台   要创建一个控制台处理器
    console = logging.StreamHandler()
    logger.addHandler(console)

    # 创建一个格式器
    # 设置日志格式
    fmt = '%(asctime)s %(filename)s %(levelname)s %(module)s %(funcName)s %(message)s'
    # 生成日志信息 生成时间  文件名   日志的状态  类名   方法名  日志内容
    fomator = logging.Formatter(fmt)
    # 优化及控制台格式
    console.setFormatter(fomator)

    # 创建一个处理器   文本处理器   制定日期信息输出到文本处理器
    filehandler = logging.FileHandler('../excute_logs/logger.log', encoding='utf-8')
    logger.addHandler(filehandler)

    # 设置logger.log文本格式
    filehandler.setFormatter(fomator)
    return logger

page_object目录下的login_page.py文件

from time import sleep

from selenium.webdriver.common.by import By
from base.base_page import BasePage
from selenium import webdriver


class LoginPage(BasePage):

    def login(self, username, password):
        """
        description:登录webui界面
        :param username:
        :param password:
        :return:
        example:
            self.login('array', 'admin')
        """
        self.driver = webdriver.Chrome()
        # URL
        # self.url = "https://%s:%s" % ip, port
        self.url = "https://192.168.120.209:8888"
        # 页面元素
        self.user = (By.NAME, "username")
        self.passwd = (By.ID, 'password')
        self.button = (By.ID, 'loginID')
        # 访问URL,最大化
        self.open(self.url)
        # 点击页面上不是隐私连接提示
        try:
            self.waitUntilPageContains('不是私密连接')
            self.driver.find_element_by_xpath('//button[@id="details-button"]').click()
            sleep(1)
            self.driver.find_element_by_xpath('//a[@id="proceed-link"]').click()
        except:
            pass
        # 输入账号
        sleep(5)
        self.input_(self.user, username)
        # 输入密码
        self.input_(self.passwd, password)
        # 点击登录按钮
        self.click(self.button)
        sleep(5)
    
    def login_L3vpn_test(self, ip, method, username, password, challenge=None, challenge_passwd1=None, challenge_passwd2=None):
        """
        function:登录L3vpn
        :argument:
            ip: 虚拟站点IP
            method:方法名
            username:用户名
            password:密码
        :return:
        examlpe:
            self.login_L3vpn_test('192.168.120.x', 'http_challenge', 'array', 'admin')
            self.login_L3vpn_test(self.vsiteip, 'http_challenge_test', self.username, self.passwd,
                                    challenge=True, challenge_passwd1='chal1', challenge_passwd2='chal2')
        """
        self.driver = webdriver.Chrome()
        # URL
        self.url = "https://%s" % ip
        # 页面元素
        self.user = (By.NAME, "uname")
        self.passwd = (By.NAME, 'pwd')
        self.button = (By.NAME, 'submitbutton')
        self.select_method = (By.NAME, "method")
        self.challenge_signin = (By.NAME, "option")
        # 访问URL,最大化
        self.open(self.url)
        sleep(5)
        try:
            self.driver.find_element_by_xpath('//button[@id="details-button"]').click()
            sleep(1)
            self.driver.find_element_by_xpath('//a[@id="proceed-link"]').click()
        except:
            pass
        # 切换方法
        self.click(self.select_method)
        self.driver.find_element_by_xpath('//option[@value="%s"]' % method).click()
        # 输入账号
        sleep(5)
        self.input_(self.user, username)
        # 输入密码
        self.input_(self.passwd, password)
        # 点击登录按钮
        self.click(self.button)
        sleep(5)
        # 挑战模式
        if challenge:
            try:
                self.input_(self.passwd, challenge_passwd1)
                self.click(self.challenge_signin)
                self.input_(self.passwd, challenge_passwd2)
                self.click(self.challenge_signin)
                self.waitUntilPageContains('welcome to the ArrayOS')
            except Exception as e:
                print(e)
                print('挑战失败,请重试!')
        else:
            print('不符合挑战条件,请检查配置!')

reports目录下的testReports.py文件

import time
import unittest
from BeautifulReport import BeautifulReport


# 找到用例defaultTestLoader默认加载
# import HTMLTestRunner


def testReports():
    """
    function: 生成测试报告方法
    description: 生成测试报告
    arg:
    return:
    """
    case_dir = '../testcase/aaa_http/'
    discover = unittest.defaultTestLoader.discover(case_dir, 'test*.py')

    # 用时间命名测试报告   测试报告生成时间  +  后缀名   2021-11-20 14-49-30test_report.html
    report_dir = '../reports/'
    now = time.strftime('%Y-%m-%d %H-%M-%S')
    report_name = report_dir + '/' +now+ '_test_report.html'

    with open(report_name, 'wb') as f:
        # 执行用例
        # HTMLTestRunner.HTMLTestRunner(stream=f, verbosity=2, title='unittest测试报告练习', description='练习HTMLTestRunner使用').run(discover)
        BeautifulReport(discover).report(description=u'UAG每日构建测试报告', filename=report_name, report_dir='../reports/')

        

三、用例代码

testcase目录下的test_01文件

import unittest
from common.agCli import *
from page_object.login_page import *


class Testcase(unittest.TestCase, LoginPage, CLI):

    def setUp(self) -> None:
        self.ssh_ag()
        self.vsitename = 'vsite_automation'
        self.username = 'array'
        self.passwd = 'admin'
        self.message = 'vsite_automation'

    def test_01(self):
        """
        description:  cli创建虚拟站点,登录webui去查看是否创建成功,是否有vsite_automation虚拟站点信息
        :return:
        date: 2022/02/17
        author: gaojs
        """
        self.cli_cmd('virtual site name vsite_automation')
        self.login(self.username, self.passwd)
        self.switch_vsite(self.vsitename)
        msg = self.waitUntilPageContains(self.message)
        print(msg)
        if msg not in(self.message):
            raise Exception('切换虚拟站点失败,请重试!')

    # 恢复环境
    def tearDown(self) -> None:
        self.cli_cmd('no virtual site name vsite_automation')
        self.cli_cmd('YES')
        self.quit_enable()
        self.close()

压力测试用例test_02.py

from locust import HttpUser, between, task, TaskSet
import os
from common.agCli import *
import logging


class TaskTest(TaskSet, CLI):

    # 执行并发前置动作,比如清理当前所有session
    def on_start(self):
        """
        description:登录ag, 清理log
        :return:
        """
        self.ssh_ag()
        self.clear_log()
        logging.info('清理log结束,压测开始!!!')

    # 压测任务,也可以是@task(10)啥的,这个数字是代表权重,数值越大,执行的频率就越高
    @task
    def login(self):
        url = '/prx/000/http/localh/login'
        data = {
            "method": "http1",
            "uname": "gaojs",
            "pwd1": "",
            "pwd2": "",
            "pwd": "admin",
            "submitbutton": "Sign"
        }
        header = {"Content-Type": "application/json;charset=UTF-8"}
        self.client.request(method='POST', url=url, data=data, headers=header, name='登录虚拟站点', verify=False, allow_redirects=False)

    # 执行并发测试后执行的动作,比如保存log等操作,查看报告http://localhost:8089/
    def on_stop(self):
        self.ssh_ag()
        self.cli_cmd('switch vsite')
        self.cli_cmd('session kill all')
        logging.info('清理session结束,压测结束,请查看report, http://localhost:8089!!!')


class Login(HttpUser):
    host = 'https://192.168.120.206'
    # 每次请求停顿时间
    wait_time = between(1, 3)
    tasks = [TaskTest]


if __name__ == "__main__":
    os.system("locust -f locust_test.py --host=https://192.168.120.206")

四、测试报告

生成报告展示:

image-1650384632880

image-1650384642969

3

评论区