一、场景要求
我们在使用locust时,有时候默认的场景无法满足我们的要求时,这时后我们需要自定义场景
比如我们要设置每一段时间启动10个用户运行,执行60s后再一次启动10个用户服务器托管网,总共运行10分钟,默认的场景是无法满足这样的要求的,我们可以使用LoadTestshape类,LoadTestshape类提供了几种负载测试策略
二、用法
在脚本文件中定义一个类继承LoadTestshape类,locust在启动时发现文件中有使用这个类会自动启动。
在该类中需要定义tick()方法,该方法返回用户数以及产生率的元组(如果没有返回这两个测试将停止),locust启动后每秒调用一次该函数。
在LoadTestshape类中可以使用get_run_time()方法来获取测试运行的时间,使用此方法可以用来控制压测的总时间。
三、基于时间峰值策略
需求:比如我们要设置每一段时间启动10个用户运行,过一段时间后再一次启动10个用户,持续加压 60s
四、代码实现
import os
from locust import *
'''实现目标:每隔一段时间增加十个用户,实现持续加压'''
class CustomTaskSet(LoadTestShape):
# 设置压测时间60s
time_limit=60
#设置启动/停止的用户数
spawn_rate=10
def tick(self):
"""
返回一个元组,包含两值:
user_count -- 总用户数
spawn_rate -- 每秒启动/停止用户数
返回None时,停止负载测试
服务器托管网 """
#获取压测时间
run_time=self.get_run_time()
if run_time #每隔一段时间启动10个用户;为-1时,表示将个位变为0,逢5进一
user_count=round(run_time,-1)
print(f'当前用户数{user_count},当前时间{run_time}')
return user_count,self.spawn_rate
return None
class IncrementalPressureMeasurement(HttpUser):
wait_time =between(1,2)
host="http://localhost:8080"
def on_start(self):
print("负载加压开始")
def on_stop(self):
print("负载加压结束")
@task
def increment_pressure(self):
self.client.post('/measurement',data={'measurement':''})
if __name__ == '__main__':
file_path = os.path.abspath(__file__)
os.system(f'locust -f {file_path} --web-host=127.0.0.1')
五、实现效果
最后我们欣赏下劳动成果吧,haha!
最后,还请大家可以点个免费的赞,你们的点赞才是我更新的动力!
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net
环境 管理节点:Ubuntu 22.04 控制节点:CentOS 8 Ansible:2.15.6 module_defaults 如果某个module被多次调用,且每次调用时的参数也一样,则可以通过 module_defaults 为该module指定缺省参…