一、依赖接口
panglu_test.py文件
# coding=utf-8
"""
作者:gaojs
功能:
新增功能:
日期:2022/4/8 18:03
"""
import json
import os.path
import pprint
import time
import requests
# xmltodict parses XML text into a dict; it is used by Stress.xmltojson below
import xmltodict as xmltodict
# 强制去掉控制台InsecureRequestWarning
import urllib3
urllib3.disable_warnings()
class Stress:
    """Client for the bypass-authentication (旁路认证) test flow.

    The flow implemented by :meth:`detach_auth` is:

    1. ask the gateway for a random challenge (:meth:`post_random`);
    2. query the local signing service for its certificate list
       (:meth:`certificate_list`);
    3. have the local signing service sign the challenge
       (:meth:`skfDetach` / :meth:`skfAttach`);
    4. post the signature and challenge to the gateway's ``authUser`` endpoint.
    """

    def __init__(self):
        # Headers sent with every request that passes ``headers=self.headers``.
        self.headers = {
            'Content-Type': 'text/xml',
            'Connection': 'close',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.93 Safari/537.36'
        }
        self.session = requests.session()
        # NOTE(review): presumably intended to disable keep-alive; confirm that
        # requests actually honours this attribute.
        self.session.keep_alive = False
        # Explicitly pass "no proxy" for both schemes on every request.
        self.proxies = {"http": None, "https": None}

    def xmltojson(self, xml):
        """Convert an XML document to a JSON string.

        :param xml: XML text to parse.
        :return: JSON text (``indent=1``) of the mapping produced by
            ``xmltodict.parse``.
        """
        return json.dumps(xmltodict.parse(xml), indent=1)

    @staticmethod
    def _normalize_signature(signature):
        """Return *signature* with '-' replaced by '+' and '_' replaced by '/'.

        The signing service returns the signature with '-' and '_' where the
        value shown in the UI has '+' and '/', so they are swapped back here.
        """
        return signature.replace('-', '+').replace('_', '/')

    def _skf_sign(self, endpoint, PlainText, CertIndex, ukeyPIN, sm3HashNum, delay=0):
        """Call a signing endpoint of the local service and return the signature.

        :param endpoint: path component of the service URL
            (``NSSkfAttachedSign`` or ``NSSkfDetachedSign``).
        :param PlainText: text to sign.
        :param CertIndex: index of the certificate to sign with.
        :param ukeyPIN: PIN of the USB key.
        :param sm3HashNum: value sent as ``DigestArithmetic``.
        :param delay: seconds to sleep after the request returns.
        :return: the normalized ``signedData`` of the first result, or ``None``
            when the service cannot be reached.
        """
        url = f'https://127.0.0.1:63451/{endpoint}?PlainText={PlainText}&CertIndex={CertIndex}&UsbKeyPin={ukeyPIN}&DigestArithmetic={sm3HashNum}'
        try:
            rsp = self.session.get(url, headers=self.headers, verify=False, proxies=self.proxies)
        except requests.exceptions.ConnectionError as err:
            # The URL carries the PIN, so only the endpoint name is printed.
            print(f'Connection refused: {endpoint} ({type(err).__name__})')
            return None
        if delay:
            time.sleep(delay)
        signature_value = json.loads(rsp.text)[0]['signedData']
        return self._normalize_signature(signature_value)

    def post_random(self, vsiteIp, appId):
        """Request a random challenge value from the gateway.

        :param vsiteIp: host (IP) of the gateway; used to build the URL.
        :param appId: application id placed in the request body.
        :return: the ``message/body/original`` value of the response, or
            ``None`` when the gateway cannot be reached.
        """
        url = 'https://%s/auth/getRandom' % vsiteIp
        data = (
            '<?xml version="1.0" encoding="UTF-8"?>'
            '<message>'
            '<head>'
            '<version>1.0</version>'
            '<serviceType>OriginalService</serviceType>'
            '</head>'
            '<body>'
            f'<appId>{appId}</appId>'
            '</body>'
            '</message>'
        )
        try:
            rsp = self.session.post(url, data=data, headers=self.headers, verify=False, proxies=self.proxies)
        except requests.exceptions.ConnectionError as err:
            print(f'Connection refused: {url} ({type(err).__name__})')
            return None
        json_data = json.loads(self.xmltojson(rsp.text))
        return json_data['message']['body']['original']

    def certificate_list(self):
        """Fetch the certificate list from the local signing service.

        :return: the parsed JSON body of the response.
        """
        url = r'https://127.0.0.1:63451/NSSkfGetCertsListInfo?DllFilePath=D:\旁路认证\GM3000\mtoken_GM.dll'
        rsp = self.session.get(url, verify=False, proxies=self.proxies)
        return json.loads(rsp.text)

    def skfAttach(self, PlainText, CertIndex0, ukeyPIN, sm3HashNum):
        """Produce an attached signature via ``NSSkfAttachedSign``.

        :param PlainText: text to sign.
        :param CertIndex0: index of the certificate to sign with.
        :param ukeyPIN: PIN of the USB key.
        :param sm3HashNum: value sent as ``DigestArithmetic``.
        :return: the normalized signature, or ``None`` when the signing
            service cannot be reached.
        """
        return self._skf_sign('NSSkfAttachedSign', PlainText, CertIndex0, ukeyPIN, sm3HashNum)

    def skfDetach(self, PlainText, CertIndex, ukeyPIN, sm3HashNum):
        """Produce a detached signature via ``NSSkfDetachedSign``.

        Example values from the original notes: ``PlainText`` is the ten-digit
        random challenge, ``CertIndex = 1``,
        ``sm3HashNum = 1.2.156.10197.1.401``, ``ukeyPIN = 12345678``.

        :param PlainText: text to sign.
        :param CertIndex: index of the certificate to sign with.
        :param ukeyPIN: PIN of the USB key.
        :param sm3HashNum: value sent as ``DigestArithmetic``.
        :return: the normalized signature, or ``None`` when the signing
            service cannot be reached.
        """
        # The 3-second pause after the request is kept from the original flow.
        return self._skf_sign('NSSkfDetachedSign', PlainText, CertIndex, ukeyPIN, sm3HashNum, delay=3)

    def detach_auth(self, vsiteIp, appId):
        """Authenticate against the gateway with a freshly generated signature.

        :param vsiteIp: host (IP) of the gateway.
        :param appId: application id used for both the challenge request and
            the authentication request.
        :return: the authentication response converted to a JSON string.
        :raises requests.exceptions.ConnectionError: when the challenge or the
            signature could not be obtained.
        """
        random_value = self.post_random(vsiteIp, appId)
        if random_value is None:
            raise requests.exceptions.ConnectionError('could not obtain a random challenge from %s' % vsiteIp)
        print(random_value)
        # Query the certificate list before signing, as the original flow does.
        self.certificate_list()
        # Detached signature over the challenge.
        detach_data = self.skfDetach(PlainText=random_value, CertIndex='0', ukeyPIN='12345678', sm3HashNum='1.2.156.10197.1.401')
        if detach_data is None:
            raise requests.exceptions.ConnectionError('could not obtain a detached signature from the signing service')
        url = 'https://%s/auth/authUser' % vsiteIp
        # The payload lines stay at column 0 so the bytes sent match the source.
        data = f'''<?xml version="1.0" encoding="utf-8"?>
<message>
<head>
<version>1.0</version>
<serviceType>authenService</serviceType>
</head>
<body>
<appId>{appId}</appId>
<authen>
<authCredential authMode="password">
<uname>test</uname>
<pwd>test</pwd>
</authCredential>
<authCredential authMode="cert">
<detach>{detach_data}</detach>
<original>{random_value}</original>
</authCredential>
</authen>
<attributes attributeType="portion">
<attr name="X509Certificate.SubjectDN"></attr>
</attributes>
</body>
</message>
'''
        rsp = self.session.post(url, headers=self.headers, data=data, verify=False, proxies=self.proxies)
        json_data = self.xmltojson(rsp.text)
        json_body = json.loads(json_data)['message']['body']['attributes']['attr']['#text']
        print(json_body)
        return json_data
# Manual smoke test of the interface. It is guarded so that importing this
# module (e.g. ``from panglu_test import Stress``) does not start the loop.
if __name__ == "__main__":
    test = Stress()
    for i in range(100000):
        rsp = test.detach_auth(vsiteIp='192.168.120.209', appId='T1')
        text = json.loads(rsp)['message']['body']['attributes']['attr']['#text']
        print(f'================================================第 {i+1} 次身份认证成功!===============================================')
        # Append to the log on every iteration so results survive an aborted run.
        with open('log.txt', mode='a', encoding='utf-8') as f:
            f.write(f'===========================================第 {i+1} 次身份认证成功!============================================\n')
            f.write(text + '\n')
        print(text)
二、locust脚本
stress_test.py文件
# coding=utf-8
"""
作者:gaojs
功能:
新增功能:
日期:2022/4/8 18:25
"""
import json
import pprint
import time
from locust import HttpUser, between, task, TaskSet
from panglu_test import Stress
import os
import logging
# 强制去掉控制台InsecureRequestWarning
import urllib3
urllib3.disable_warnings()
class TaskTest(TaskSet, Stress):
    """Locust task set that posts bypass-authentication requests.

    ``Stress`` is mixed in for its ``xmltojson`` helper; the challenge and
    signature are fetched through a separate ``Stress`` instance in
    :meth:`on_start`.
    """

    def __init__(self, parent: "User"):
        super().__init__(parent)
        # Filled in by on_start() and reused by every stress() request.
        self.signature = None
        self.random_value = None

    def on_start(self):
        """Fetch the random challenge and its detached signature.

        Runs before the task set starts executing tasks.
        """
        print('====================清理log结束,压测开始, 获取随机数和数字签名!!!========================')
        # One helper (and therefore one HTTP session) serves all three calls.
        helper = Stress()
        self.random_value = helper.post_random('192.168.120.209', 'T1')
        # Query the certificate list before signing.
        helper.certificate_list()
        # Sign the challenge with the certificate at index '0'.
        self.signature = helper.skfDetach(PlainText=self.random_value, CertIndex='0', ukeyPIN='12345678', sm3HashNum='1.2.156.10197.1.401')

    # Load-test task; ``@task(10)`` etc. would give it a weight, and a larger
    # weight means it is executed more often.
    @task
    def stress(self):
        """Post one authentication request and record its business result.

        The HTTP status alone does not prove that authentication succeeded,
        so the response body is inspected and the request is marked as failed
        in Locust when the expected attribute is missing or empty.
        """
        # Imported locally so this block does not depend on file-level imports.
        from xml.parsers.expat import ExpatError

        url = '/auth/authUser'
        # The payload lines stay at column 0 so the bytes sent match the source.
        data = f'''<?xml version="1.0" encoding="utf-8"?>
<message>
<head>
<version>1.0</version>
<serviceType>authenService</serviceType>
</head>
<body>
<appId>T1</appId>
<authen>
<authCredential authMode="password">
<uname>t</uname>
<pwd>t</pwd>
</authCredential>
<authCredential authMode="cert">
<detach>{self.signature}</detach>
<original>{self.random_value}</original>
</authCredential>
</authen>
<attributes attributeType="portion">
<attr name="X509Certificate.SubjectDN"></attr>
</attributes>
</body>
</message>
'''
        headers = {"Content-Type": "text/xml;charset=UTF-8"}
        time.sleep(1)
        # catch_response=True returns a context manager, so it must be used in
        # a ``with`` block for failure() below to take effect.
        with self.client.request(method='POST',
                                 url=url,
                                 data=data,
                                 catch_response=True,
                                 headers=headers,
                                 name='旁路认证',
                                 verify=False,
                                 allow_redirects=False) as rsp:
            time.sleep(1)
            try:
                rsp_str = json.loads(self.xmltojson(rsp.text))
            except ExpatError as err:
                rsp.failure(f'response is not valid XML: {err}')
                return
            pprint.pprint(rsp_str)
            try:
                msg = rsp_str['message']['body']['attributes']['attr']['#text']
            except (KeyError, TypeError):
                print('认证失败,请重试!')
                print(rsp_str)
                rsp.failure('authentication response has no attribute text')
                return
            print(msg)
            if not msg:
                print('认证失败,请重试!')
                print(rsp_str)
                rsp.failure('authentication response attribute text is empty')
            # Otherwise the outcome is left to Locust's default status check.

    def on_stop(self):
        """Hook that runs when the task set stops; nothing to clean up.

        The report can be viewed at http://localhost:8089/.
        """
        pass
class UserBehavior(HttpUser):
    """Simulated HTTP user that runs the ``TaskTest`` task set."""

    # Task set executed by this user.
    tasks = [TaskTest]
    # Pause between requests, configured with between(5, 10).
    wait_time = between(5, 10)
    # Base URL that the relative request paths are sent to.
    host = 'https://192.168.120.209'
if __name__ == "__main__":
    # Start locust for this file through the shell; the web UI is bound to
    # 127.0.0.1.
    launch_command = "locust -f stress_test.py --host=https://192.168.120.209 --web-host=127.0.0.1"
    os.system(launch_command)
评论区