什么是AutoLine开源平台
AutoLine开源平台是一个开源自动化测试解决方案,基于RobotFramework进行二次开发,支持RobotFramework几乎所有的库。
源码地址
github地址: https://github.com/small99/AutoLine
码 云 地 址:https://gitee.com/lym51/AutoLine
运行器源码路径及源码结构
在AutoLine中我们自定义实现了RobotFramework的运行器,其路径如下图所示:
源码结构如下图所示:
说明:
- 一些已经实现的运行器,用于调试测试用
- 运行器分为自动化运行器、调试运行器、手工运行器三种模式
下面我们对源码进行注释
__author__ = "苦叶子"
"""
公众号: 开源优测
Email: lymking@foxmail.com
"""
import os
import platform
import codecs
import time
import json
import subprocess
from datetime import datetime
from threading import Thread, Timer
import xml.etree.ElementTree as ET
from flask import current_app
from flask_login import current_user
from sqlalchemy import and_
from ..models import AutoTask, AutoProject, User
from .. import db
from ..auto.builder import Builder
from .process import Process
# 同步运行器,即阻塞模式,一次只能运行一个RF进程
def robot_run(category, id):
app = current_app._get_current_object()
if len(app.config["RESULTS"]) > int(app.config['AUTO_PROCESS_COUNT']):
return json.dumps({"status": "busying", "msg": "任务池已满!!!"})
builder = Builder(category, id)
builder.build()
app.config["RESULTS"].append(app.config["POOL"].apply_async(builder.test_run, (app, current_user.get_id(),)))
# app.config["POOL"].join()
return json.dumps({"status": "success", "msg": "任务启动成功"})
# 异步运行器,采用多线程方式,可以启动多个RF进程
def robot_async_run(category, id):
app = current_app._get_current_object()
builder = Builder(category, id)
builder.build()
thr = Thread(target=builder.test_run, args=[app, current_user.get_id()])
#app.config["RESULTS"].append(thr)
thr.start()
return json.dumps({"status": "success", "msg": "任务启动成功"})
# 用于检查RF运行进程的状态
def check_process_status(app):
print("timer to check ....%d" % len(app.config["RUNNERS"]))
with app.app_context():
try:
for runner in app.config["RUNNERS"]:
if runner.is_finish():
runner.write_result()
app.config["RUNNERS"].remove(runner)
except Exception as e:
print(e)
# debug模式运行器
def debug_run(id):
builder = Builder(id)
builder.build()
runner = Runner(builder.id, builder.build_no)
runner.debug()
return (builder.id, builder.build_no)
# RF运行器
def run_process(category, id):
builder = Builder(id)
builder.build()
if builder.has_test_case():
runner = Runner(builder.id, builder.build_no)
if category == "auto":
runner.auto_run()
else:
runner.run()
app = current_app._get_current_object()
app.config["TRIGGER"].update_job(id)
app.config["RUNNERS"].append({
"project_id": builder.id,
"task_id": builder.build_no,
"runner": runner
})
return json.dumps({"status": "success", "msg": "任务启动成功"})
else:
return json.dumps({"status": "fail", "msg": "项目中没有创建关键字步骤,任务启动失败,请新增关键字步骤!!!"})
# 执行器类
class Runner:
def __init__(self, project_id, build_no):
self.project_id = project_id
self.build_no = build_no
self._process = None
self._timer = None
self._out_fd = None
# 用于调度器自动执行
def auto_run(self):
try:
user_id = User.query.filter_by(username="AutoExecutor").first().id
name = AutoProject.query.filter_by(id=self.project_id).first().name
task = AutoTask(project_id=self.project_id,
build_no=self.build_no,
status="running",
create_author_id=user_id,
create_timestamp=datetime.now())
db.session.add(task)
db.session.commit()
output_dir = os.getcwd() + "/logs/%s/%s" % (self.project_id, self.build_no)
output_dir = output_dir.replace("\", "/")
# -x result/output.xml -l result/log.html -r result/report.html
shell = False
if "Windows" in platform.platform():
self._out_fd = codecs.open(output_dir + "/logs.log", "a+", "cp936")
command = "pybot -d %s -L DEBUG -N %s %s/testcase.robot" % (output_dir, name, output_dir)
shell = True
else:
self._out_fd = codecs.open(output_dir + "/logs.log", "a+", "utf-8")
command = ["pybot", "-d", "%s" % output_dir, "-L", "DEBUG", "-N", "%s" % name, "%s/testcase.robot" % output_dir]
#print(command)
self._process = subprocess.Popen(command, shell=shell, stdout=self._out_fd, stderr=subprocess.STDOUT)
#self._process = Process(command)
#self._process.start()
except Exception as e:
print(str(e))
pass
return {"status": "success",
"msg": "任务启动成功",
"project_id": self.project_id,
"build_no": self.build_no}
# 用于手工在页面触发执行
def run(self):
#
try:
name = AutoProject.query.filter_by(id=self.project_id).first().name
task = AutoTask(project_id=self.project_id,
build_no=self.build_no,
status="running",
create_author_id=current_user.get_id(),
create_timestamp=datetime.now())
db.session.add(task)
db.session.commit()
output_dir = os.getcwd() + "/logs/%s/%s" % (self.project_id, self.build_no)
output_dir = output_dir.replace("\", "/")
shell = False
if "Windows" in platform.platform():
self._out_fd = codecs.open(output_dir + "/logs.log", "a+", "cp936")
command = "pybot -d %s -L DEBUG -N %s %s/testcase.robot" % (output_dir, name, output_dir)
shell = True
else:
self._out_fd = codecs.open(output_dir + "/logs.log", "a+", "utf-8")
command = ["pybot", "-d", "%s" % output_dir, "-L", "DEBUG", "-N", "%s" % name,
"%s/testcase.robot" % output_dir]
# print(command)
self._process = subprocess.Popen(command, shell=shell, stdout=self._out_fd, stderr=subprocess.STDOUT)
except Exception as e:
print(str(e))
pass
return {"status": "success",
"msg": "任务启动成功",
"project_id": self.project_id,
"build_no": self.build_no}
# 调试模式,用于检查是否符合RF语法
def debug(self):
try:
output_dir = os.getcwd() + "/logs/%s/%s" % (self.project_id, self.build_no)
output_dir = output_dir.replace("\", "/")
# -x result/output.xml -l result/log.html -r result/report.html
command = ["pybot", "-d", "%s" % output_dir, "--dryrun", "-N", "调试输出", "%s/testcase.robot" % output_dir]
self._out_fd = open(output_dir + "/debug.log", "a+")
self._process = subprocess.Popen(command, shell=False, stdout=self._out_fd, stderr=subprocess.STDOUT)
while True:
if self._process.poll() == 0: # 判断子进程是否结束
break
else:
time.sleep(0.2)
except Exception as e:
print(str(e))
pass
return {"status": "success",
"msg": "任务启动成功",
"project_id": self.project_id,
"build_no": self.build_no}
# 停止任务
def stop(self):
status = "success"
msg = "任务终止"
try:
self._process.stop()
msg += "成功"
except Exception as e:
status = "fail"
msg = msg + "异常" + str(e)
return {"status": status,
"msg": msg,
"project_id": self.project_id,
"build_no": self.build_no}
def get_output(self, wait_until_finished=False):
return self._process.get_output(wait_until_finished)
def is_finish(self):
return self._process.is_finished()
def write_result(self):
output_dir = os.getcwd() + "/logs/%s/%s" % (self.project_id, self.build_no)
output_dir = output_dir.replace("\", "/")
print("write ... result ...")
print(os.path.exists(output_dir + "/log.html"))
if os.path.exists(output_dir + "/log.html"):
time.sleep(0.2)
task = AutoTask.query.filter(and_(AutoTask.project_id == self.project_id,
AutoTask.build_no == self.build_no)).first()
tree = ET.parse(output_dir + "/output.xml")
root = tree.getroot()
passed = root.find("./statistics/suite/stat").attrib["pass"]
fail = root.find("./statistics/suite/stat").attrib["fail"]
if int(fail) != 0:
task.status = 'fail'
else:
task.status = 'pass'
db.session.merge(task)
db.session.commit()
self._timer.canel()
说明:
- 在运行器中,关键的是一个Builder类,该类实现了从数据库读取数据,并序列号为RF语法的文件
- Runner执行器根据类型(web、app、http)调用Builder加载不同的RobotFramework支持库和通用的库,实现对RobotFramework的完整的支持
- 大家主要看Runner类,这里不对代码一一解释,因为代码本身没什么难度,关键在于细节的看上几遍就懂了的
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.e1idc.net