需求描述
项目需求测试过程中,需要向Nginx服务器发送一些用例请求,然后查看对应的Nginx日志,判断是否存在特征内容,来判断任务是否执行成功。为了提升效率,需要将这一过程实现自动化。
实践环境
Python 3.6.5
代码设计与实现
#!/usr/bin/env python
# -*- coding:utf-8 -*-
\'\'\'
@CreateTime: 2021/06/26 9:05
@Author : shouke
\'\'\'
import time
import threading
import subprocess
from collections import deque
def collect_nginx_log():
global nginx_log_queue
global is_tasks_compete
global task_status
args = \'tail -0f /usr/local/openresty/nginx/logs/access.log\'
while task_status != \'req_log_got\':
with subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, universal_newlines = True) as proc:
log_for_req = \'\'
outs, errs = \'\', \'\'
try:
outs, errs = proc.communicate(timeout=2)
except subprocess.TimeoutExpired:
print(\'获取nginx日志超时,正在重试\')
proc.kill()
try:
outs, errs = proc.communicate(timeout=5)
except subprocess.TimeoutExpired:
print(\'获取nginx日志超时,再次超时,停止重试\')
break
finally:
for line in outs.split(\'\\n\'):
flag = \'\\\"client_ip\\\":\\\"10.118.0.77\\\"\' # 特征
if flag in line: # 查找包含特征内容的日志
log_for_req += line
if task_status == \'req_finished\':
nginx_log_queue.append(log_for_req)
task_status = \'req_log_got\'
def run_tasks(task_list):
\'\'\'
运行任务
:param task_list 任务列表
\'\'\'
global nginx_log_queue
global is_tasks_compete
global task_status
for task in task_list:
thread = threading.Thread(target=collect_nginx_log,
name=\"collect_nginx_log\")
thread.start()
time.sleep(1) # 执行任务前,让收集日志线程先做好准备
print(\'正在执行任务:%s\' % task.get(\'name\'))
# 执行Nginx任务请求
# ...
task_status = \'req_finished\'
time_to_wait = 0.1
while task_status != \'req_log_got\': # 请求触发的nginx日志收集未完成
time.sleep(time_to_wait)
time_to_wait += 0.01
else:# 获取到用例请求触发的nginx日志
if nginx_log_queue:
nginx_log = nginx_log_queue.popleft()
task_status = \'req_ready\'
# 解析日志
# do something here
# ...
else:
print(\'存储请求日志的队列为空\')
# do something here
# ...
if __name__ == \'__main__\':
nginx_log_queue = deque()
is_tasks_compete = False # 所有任务是否执行完成
task_status = \'req_ready\' # req_ready,req_finished,req_log_got # 存放执行次任务任务的一些状态
print(\'###########################任务开始###########################\')
tast_list = [{\'name\':\'test_task\', \'other\':\'...\'}]
run_tasks(tast_list)
is_tasks_compete = True
current_active_thread_num = len(threading.enumerate())
while current_active_thread_num != 1:
time.sleep(2)
current_active_thread_num = len(threading.enumerate())
print(\'###########################任务完成###########################\')
注意:
1、上述代码为啥不一步到位,直接 tail -0f /usr/local/openresty/nginx/logs/access.log | grep \"特征内容\"呢?这是因为这样做无法获取到Nginx的日志
2、实践时发现,第一次执行proc.communicate(timeout=2)获取日志时,总是无法获取,会超时,需要二次获取,并且timeout设置太小时(实践时尝试过设置为1秒),也会导致第二次执行时无法获取Nginx日志。
来源:https://www.cnblogs.com/shouke/p/14993393.html
图文来源于网络,如有侵权请联系删除。