目 录CONTENT

文章目录

locust实战

懿曲折扇情
2022-05-06 / 0 评论 / 4 点赞 / 143 阅读 / 8,755 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-05-07,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。
广告 广告

一.依赖接口

panglu_test.py文件
# coding=utf-8
"""
    作者:gaojs
    功能:
    新增功能:
    日期:2022/4/8 18:03
"""
import json
import os.path
import pprint
import time

import requests
# 定义xml转json的函数
import xmltodict as xmltodict
# 强制去掉控制台InsecureRequestWarning
import urllib3

urllib3.disable_warnings()


class Stress:
    """
    旁路认证类
    """
    def __init__(self):
        self.headers = {
            'Content-Type': 'text/xml',
            'Connection': 'close',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.93 Safari/537.36'
        }
        self.session = requests.session()
        self.session.keep_alive = False
        self.proxies = {"http": None, "https": None}

    def xmltojson(self, xml):
        """
        xml转成json
        :param xml:
        :return:
        """
        xml_data = xmltodict.parse(xml)
        json_data = json.dumps(xml_data, indent=1)
        return json_data

    def post_random(self, vsiteIp, appId):
        """
        获取十位随机数方法
        :return:
        """
        url = 'https://%s/auth/getRandom' % vsiteIp
        data = '<?xml version="1.0" encoding="UTF-8"?>' \
                '<message>' \
                '<head>' \
                '<version>1.0</version>' \
                '<serviceType>OriginalService</serviceType>' \
                '</head>'\
                '<body>' \
                f'<appId>{appId}</appId>' \
                '</body>' \
                '</message>'
        try:
            rsp = self.session.post(url, data=data, headers=self.headers, verify=False, proxies=self.proxies)
            # print(rsp.text)
            json_data = self.xmltojson(rsp.text)
            json_data = json.loads(json_data)
            random_value = json_data['message']['body']['original']
        except requests.exceptions.ConnectionError:
            rsp.status_code = "Connection refused"
        # print(json_data, type(json_data))
        # print(random_value)
        return random_value

    def certificate_list(self):
        """
        获取证书列表
        :return:
        """
        url = r'https://127.0.0.1:63451/NSSkfGetCertsListInfo?DllFilePath=D:\旁路认证\GM3000\mtoken_GM.dll'
        rsp = self.session.get(url, verify=False, proxies=self.proxies)
        json_data = json.loads(rsp.text)
        # print(json_data[0])

    def skfAttach(self, PlainText, CertIndex0, ukeyPIN, sm3HashNum):
        """
        skfAttach签名
        :return:
        """
        url = f'https://127.0.0.1:63451/NSSkfAttachedSign?PlainText={PlainText}&CertIndex={CertIndex0}&UsbKeyPin={ukeyPIN}&DigestArithmetic={sm3HashNum}'
        try:
            rsp = self.session.get(url, headers=self.headers, verify=False, proxies=self.proxies)
            json_data = json.loads(rsp.text)
            signature_value = json_data[0]['signedData']
            # print(signature_value)
            # print(signature_value)
            # 接口返回的述职和界面上的述职不太一样了需要吧‘-’和‘_’替换一下才行
            s1 = signature_value.replace('-', '+')
            s2 = s1.replace('_', '/')
            return s2
        except requests.exceptions.ConnectionError:
            rsp.status_code = "Connection refused"

    def skfDetach(self, PlainText, CertIndex, ukeyPIN, sm3HashNum):
        """
        skfdetach签名
        PlainText = 十位随机数
        CertIndex = 1
        sm3HashNum = 1.2.156.10197.1.401
        ukeyPIN = 12345678
        :return:
        """
        url = f'https://127.0.0.1:63451/NSSkfDetachedSign?PlainText={PlainText}&CertIndex={CertIndex}&UsbKeyPin={ukeyPIN}&DigestArithmetic={sm3HashNum}'
        try:
            rsp = self.session.get(url, headers=self.headers, verify=False, proxies=self.proxies)
            time.sleep(3)
            json_data = json.loads(rsp.text)
            signature_value = json_data[0]['signedData']
            # print(signature_value)
            # 接口返回的述职和界面上的述职不太一样了需要吧‘-’和‘_’替换一下才行
            s1 = signature_value.replace('-', '+')
            s2 = s1.replace('_', '/')
            return s2
        except requests.exceptions.ConnectionError:
            rsp.status_code = "Connection refused"

    # print(signature_value)

    def detach_auth(self, vsiteIp, appId):
        """
        带着生成的数字签名去认证
        :return:
        """
        random_value = self.post_random(vsiteIp, appId)
        print(random_value)
        # 获取证书列表
        self.certificate_list()
        # detach数字签名
        detach_data = self.skfDetach(PlainText=random_value, CertIndex='0', ukeyPIN='12345678', sm3HashNum='1.2.156.10197.1.401')
        url = 'https://%s/auth/authUser' % vsiteIp
        data = f'''<?xml version="1.0" encoding="utf-8"?>
                    <message>
                        <head>
                            <version>1.0</version>
                            <serviceType>authenService</serviceType>
                        </head>
                        <body>
                            <appId>T1</appId>
                            <authen>
                                <authCredential authMode="password">
                                    <uname>test</uname>
                                    <pwd>test</pwd>
                                </authCredential>
                                <authCredential authMode="cert">
                                    <detach>{detach_data}</detach>
                                    <original>{random_value}</original>
                                </authCredential>
                            </authen>
                            <attributes attributeType="portion">
                                <attr name="X509Certificate.SubjectDN"></attr>
                            </attributes>
                        </body>
                    </message>
        '''
        rsp = self.session.post(url, headers=self.headers, data=data, verify=False, proxies=self.proxies)
        json_data = self.xmltojson(rsp.text)
        json_body = json.loads(json_data)['message']['body']['attributes']['attr']['#text']
        print(json_body)
        return json_data


# 测试接口代码
test = Stress()
# rsp = test.detach_auth(vsiteIp='192.168.120.209', appId='T1', CertIndex='0', ukeyPIN='12345678', sm3HashNum='1.2.156.10197.1.401')
# print(rsp)
for i in range(100000):
    rsp = test.detach_auth(vsiteIp='192.168.120.209', appId='T1')
    text = json.loads(rsp)['message']['body']['attributes']['attr']['#text']
    print(f'================================================第 {i+1} 次身份认证成功!===============================================')
    with open('log.txt', mode='a', encoding='utf-8') as f:
        f.write(f'===========================================第 {i+1} 次身份认证成功!============================================\n')
        f.write(text + '\n')
        print(text)

二、locust脚本

stress_test.py文件
# coding=utf-8
"""
    作者:gaojs
    功能:
    新增功能:
    日期:2022/4/8 18:25
"""
import json
import pprint
import time

from locust import HttpUser, between, task, TaskSet
from panglu_test import Stress
import os
import logging
# 强制去掉控制台InsecureRequestWarning
import urllib3

urllib3.disable_warnings()


class TaskTest(TaskSet, Stress):

    # 执行并发前置动作,比如清理当前所有session
    def __init__(self, parent: "User"):
        super().__init__(parent)
        self.signature = None
        self.random_value = None

    def on_start(self):
        """
        description:每个用户执行压测之前都会获取随机数和数字签名
        :return:
        """
        # 每个用户执行压测之前都会获取随机数和数字签名
        print('====================清理log结束,压测开始, 获取随机数和数字签名!!!========================')
        self.random_value = Stress().post_random('192.168.120.209', 'T1')
        # 获取证书列表
        Stress().certificate_list()
        # detach会根据证书列表下标去匹配证书,下发数字签名,并返回签名结果
        self.signature = Stress().skfDetach(PlainText=self.random_value, CertIndex='0', ukeyPIN='12345678', sm3HashNum='1.2.156.10197.1.401')
        # return self.random_value, self.signature

    # 压测任务,也可以是@task(10)啥的,这个数字是代表权重,数值越大,执行的频率就越高
    @task
    def stress(self):
        """
        旁路认证身份认证请求
        :return:
        """

        url = '/auth/authUser'
        data = f'''<?xml version="1.0" encoding="utf-8"?>
                    <message>
                        <head>
                            <version>1.0</version>
                            <serviceType>authenService</serviceType>
                        </head>
                        <body>
                            <appId>T1</appId>
                            <authen>
                                <authCredential authMode="password">
                                    <uname>t</uname>
                                    <pwd>t</pwd>
                                </authCredential>
                                <authCredential authMode="cert">
                                    <detach>{self.signature}</detach>
                                    <original>{self.random_value}</original>
                                </authCredential>
                            </authen>
                            <attributes attributeType="portion">
                                <attr name="X509Certificate.SubjectDN"></attr>
                            </attributes>
                        </body>
                    </message>
                '''
        headers = {"Content-Type": "text/xml;charset=UTF-8"}
        time.sleep(1)
        rsp = self.client.request(method='POST',
                                  url=url,
                                  data=data,
                                  catch_response=True,
                                  headers=headers,
                                  name='旁路认证',
                                  verify=False,
                                  allow_redirects=False)
        time.sleep(1)
        # 添加断言,是否成功认证
        rsp_json = self.xmltojson(rsp.text)
        rsp_str = json.loads(rsp_json)
        pprint.pprint(rsp_str)
        # msg_status = rsp_str['message']['body']['authResultSet']['authResult'][1]['@success']
        msg = rsp_str['message']['body']['attributes']['attr']['#text']
        # 认证接口返回200,但是业务未必是成功的,所以我们要做断言,如果认证请求返回的结果中messageState为true,则认为认证通过
        print(msg)
        # print(rsp_json)
        if not msg:
            print('认证失败,请重试!')
            print(rsp_str)
        # url = 'http://httpbin.org/get'
        # self.client.request(method='GET', url=url)

    # 执行并发测试后执行的动作,比如保存log等操作,查看报告http://localhost:8089/
    def on_stop(self):
        pass


class UserBehavior(HttpUser):
    host = 'https://192.168.120.209'
    # 每次请求停顿时间
    wait_time = between(5, 10)
    tasks = [TaskTest]


if __name__ == "__main__":
    os.system("locust -f stress_test.py --host=https://192.168.120.209 --web-host=127.0.0.1")

4

评论区