Pytest源码分析
By:授客 QQ:1033553122
测试环境
pytest 5.4.3
测试脚本mytest.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Minimal pytest script: one passing test, runnable directly as a script."""
import pytest


def test_func():  # a test function whose name starts with "test"
    """Print a marker line, then pass a trivially true assertion."""
    print("test_func")
    assert 1  # the assertion succeeds


if __name__ == '__main__':
    pytest.main()  # run the tests
源码分析
测试脚本mytest.py
import pytest
运行pytest/__init__.py,主要做了两件事情:一是从_pytest的各个子模块中导入对外公开的名称(即__all__中列出的名称);二是调用_setup_collect_fakemodule()建立伪模块pytest.collect
pytest/__init__.py
# PYTHON_ARGCOMPLETE_OK
"""
pytest: unit and functional testing with Python.
"""
from _pytest import __version__
from _pytest.assertion import register_assert_rewrite
from _pytest.compat import _setup_collect_fakemodule
from _pytest.config import cmdline
from _pytest.config import ExitCode
from _pytest.config import hookimpl
from _pytest.config import hookspec
from _pytest.config import main
from _pytest.config import UsageError
from _pytest.debugging import pytestPDB as __pytestPDB
from _pytest.fixtures import fillfixtures as _fillfuncargs
from _pytest.fixtures import fixture
from _pytest.fixtures import yield_fixture
from _pytest.freeze_support import freeze_includes
from _pytest.main import Session
from _pytest.mark import MARK_GEN as mark
from _pytest.mark import param
from _pytest.nodes import Collector
from _pytest.nodes import File
from _pytest.nodes import Item
from _pytest.outcomes import exit
from _pytest.outcomes import fail
from _pytest.outcomes import importorskip
from _pytest.outcomes import skip
from _pytest.outcomes import xfail
from _pytest.python import Class
from _pytest.python import Function
from _pytest.python import Instance
from _pytest.python import Module
from _pytest.python import Package
from _pytest.python_api import approx
from _pytest.python_api import raises
from _pytest.recwarn import deprecated_call
from _pytest.recwarn import warns
from _pytest.warning_types import PytestAssertRewriteWarning
from _pytest.warning_types import PytestCacheWarning
from _pytest.warning_types import PytestCollectionWarning
from _pytest.warning_types import PytestConfigWarning
from _pytest.warning_types import PytestDeprecationWarning
from _pytest.warning_types import PytestExperimentalApiWarning
from _pytest.warning_types import PytestUnhandledCoroutineWarning
from _pytest.warning_types import PytestUnknownMarkWarning
from _pytest.warning_types import PytestWarning
# Re-export pytestPDB.set_trace under the name ``set_trace``.
set_trace = __pytestPDB.set_trace

# Names exported by ``from pytest import *``.
__all__ = [
    "__version__",
    "_fillfuncargs",
    "approx",
    "Class",
    "cmdline",
    "Collector",
    "deprecated_call",
    "exit",
    "ExitCode",
    "fail",
    "File",
    "fixture",
    "freeze_includes",
    "Function",
    "hookimpl",
    "hookspec",
    "importorskip",
    "Instance",
    "Item",
    "main",
    "mark",
    "Module",
    "Package",
    "param",
    "PytestAssertRewriteWarning",
    "PytestCacheWarning",
    "PytestCollectionWarning",
    "PytestConfigWarning",
    "PytestDeprecationWarning",
    "PytestExperimentalApiWarning",
    "PytestUnhandledCoroutineWarning",
    "PytestUnknownMarkWarning",
    "PytestWarning",
    "raises",
    "register_assert_rewrite",
    "Session",
    "set_trace",
    "skip",
    "UsageError",
    "warns",
    "xfail",
    "yield_fixture",
]

_setup_collect_fakemodule()  # build the fake module `pytest.collect`
# The helper has done its job; remove its name from this namespace.
del _setup_collect_fakemodule
_pytest/compat.py
_setup_collect_fakemodule函数
# Attribute names copied from the ``pytest`` package onto ``pytest.collect``.
COLLECT_FAKEMODULE_ATTRIBUTES = (
    "Collector",
    "Module",
    "Function",
    "Instance",
    "Session",
    "Item",
    "Class",
    "File",
    "_fillfuncargs",
)


def _setup_collect_fakemodule() -> None:
    """Create the dynamic module ``pytest.collect`` and populate it.

    A fresh ``ModuleType`` named ``pytest.collect`` is attached to the
    ``pytest`` package, given an empty ``__all__``, and every name listed in
    ``COLLECT_FAKEMODULE_ATTRIBUTES`` is copied onto it from ``pytest``.
    """
    from types import ModuleType
    import pytest

    # Types ignored because the module is created dynamically.
    pytest.collect = ModuleType("pytest.collect")  # type: ignore
    pytest.collect.__all__ = []  # type: ignore  # used for setns
    for attr_name in COLLECT_FAKEMODULE_ATTRIBUTES:
        setattr(pytest.collect, attr_name, getattr(pytest, attr_name))  # type: ignore
测试脚本mytest.py
pytest.main()
这里的main函数是_pytest/config/__init__.py中定义的模块级函数main(pytest/__init__.py中通过 from _pytest.config import main 导入)
_pytest/config/__init__.py
_pytest/config/__init__.py main函数定义
主要用于获取Config对象config,然后通过config.hook.pytest_cmdline_main执行测试
def main(args=None, plugins=None) -> Union[int, ExitCode]:
    """Return exit code, after performing an in-process test run.

    :arg args: list of command line arguments.
    :arg plugins: list of plugin objects to be auto-registered during
                  initialization.
    :returns: ``ExitCode.USAGE_ERROR`` when a conftest fails to import or a
        ``UsageError`` is raised; otherwise the ``pytest_cmdline_main`` hook
        result, converted to ``ExitCode`` when it is a valid member.
    """
    try:
        try:
            config = _prepareconfig(args, plugins)
        except ConftestImportFailure as e:
            # Report the conftest import failure on stderr, in red.
            exc_info = ExceptionInfo(e.excinfo)
            tw = TerminalWriter(sys.stderr)
            tw.line(
                "ImportError while loading conftest '{e.path}'.".format(e=e), red=True
            )
            exc_info.traceback = exc_info.traceback.filter(filter_traceback)
            # Fall back to the exception-only text if filtering left no entries.
            exc_repr = (
                exc_info.getrepr(style="short", chain=False)
                if exc_info.traceback
                else exc_info.exconly()
            )
            formatted_tb = str(exc_repr)
            for line in formatted_tb.splitlines():
                tw.line(line.rstrip(), red=True)
            return ExitCode.USAGE_ERROR
        else:
            try:
                ret = config.hook.pytest_cmdline_main(
                    config=config
                )  # type: Union[ExitCode, int]
                try:
                    return ExitCode(ret)
                except ValueError:
                    # Not a known ExitCode member: return the raw value.
                    return ret
            finally:
                config._ensure_unconfigure()
    except UsageError as e:
        tw = TerminalWriter(sys.stderr)
        for msg in e.args:
            tw.line("ERROR: {}\n".format(msg), red=True)
        return ExitCode.USAGE_ERROR
_pytest/config/__init__.py _prepareconfig函数定义
主要是获取并返回Config对象config,该对象通过函数pluginmanager.hook.pytest_cmdline_parse返回
def _prepareconfig(
    args: Optional[Union[py.path.local, List[str]]] = None, plugins=None
):
    """Build a config via ``get_config`` and return the parsed result.

    :param args: ``None`` (use ``sys.argv[1:]``), a ``py.path.local``
        (converted to a one-element list) or a list of strings.
    :param plugins: optional iterable of plugins; strings go through
        ``consider_pluginarg``, anything else is registered directly.
    :returns: the result of the ``pytest_cmdline_parse`` hook.
    :raises TypeError: if ``args`` is none of the accepted types.
    """
    if args is None:
        args = sys.argv[1:]
    elif isinstance(args, py.path.local):
        args = [str(args)]
    elif not isinstance(args, list):
        msg = "`args` parameter expected to be a list of strings, got: {!r} (type: {})"
        raise TypeError(msg.format(args, type(args)))

    config = get_config(args, plugins)
    pluginmanager = config.pluginmanager
    try:
        if plugins:
            for plugin in plugins:
                if isinstance(plugin, str):
                    pluginmanager.consider_pluginarg(plugin)
                else:
                    pluginmanager.register(plugin)
        return pluginmanager.hook.pytest_cmdline_parse(
            pluginmanager=pluginmanager, args=args
        )
    except BaseException:
        # Undo configuration on any failure, then propagate the error.
        config._ensure_unconfigure()
        raise
_pytest/config/__init__.py get_config函数定义
主要是构造Config对象
# Plugins that cannot be disabled via "-p no:X" currently.
essential_plugins = (
    "mark",
    "main",
    "runner",
    "fixtures",
    "helpconfig",  # Provides -p.
)

# Plugins imported for every config: the essential ones plus the rest.
default_plugins = essential_plugins + (
    "python",
    "terminal",
    "debugging",
    "unittest",
    "capture",
    "skipping",
    "tmpdir",
    "monkeypatch",
    "recwarn",
    "pastebin",
    "nose",
    "assertion",
    "junitxml",
    "resultlog",
    "doctest",
    "cacheprovider",
    "freeze_support",
    "setuponly",
    "setupplan",
    "stepwise",
    "warnings",
    "logging",
    "reports",
    "faulthandler",
)

# Every default plugin name, plus "pytester".
builtin_plugins = set(default_plugins)
builtin_plugins.add("pytester")
def get_config(args=None, plugins=None):
    """Create a fresh ``Config`` with all default plugins imported.

    :param args: command line arguments; when not ``None`` they are
        pre-parsed with ``exclude_only=True`` before plugins are imported.
    :param plugins: plugins recorded in the invocation parameters.
    :returns: the newly constructed ``Config`` object.
    """
    # subsequent calls to main will create a fresh instance
    pluginmanager = PytestPluginManager()  # PytestPluginManager inherits from PluginManager
    config = Config(
        pluginmanager,
        invocation_params=Config.InvocationParams(
            args=args or (), plugins=plugins, dir=Path().resolve()
        ),
    )

    if args is not None:
        # Handle any "-p no:plugin" args.
        pluginmanager.consider_preparse(args, exclude_only=True)

    for spec in default_plugins:
        pluginmanager.import_plugin(spec)  # import each default plugin
    return config
_pytest/config/__init__.py Config构造函数定义
构造函数参数pluginmanager接收了外部传入的PytestPluginManager实例对象,该参数被赋值给 self.pluginmanager,同时初始化self.hook值为self.pluginmanager.hook,这样Config对象就具备了pluggy的插件管理及hook能力,可通过"Config对象.hook.hook函数名(...)"的形式调用hook函数,例如config.hook.pytest_cmdline_main(config=config)
class Config:
    # ... (rest of the class omitted in this excerpt)

    def __init__(
        self,
        pluginmanager: PytestPluginManager,
        *,
        invocation_params: Optional[InvocationParams] = None,
    ) -> None:
        """Wire the config to its plugin manager and collect options.

        :param pluginmanager: stored as ``self.pluginmanager``; its ``hook``
            attribute becomes ``self.hook``.
        :param invocation_params: parameters pytest was invoked with; when
            ``None``, empty args, no plugins and the current directory.
        """
        from .argparsing import Parser, FILE_OR_DIR

        if invocation_params is None:
            invocation_params = self.InvocationParams(
                args=(), plugins=None, dir=Path.cwd()
            )

        self.option = argparse.Namespace()
        """Access to command line option as attributes.

        :type: argparse.Namespace
        """

        self.invocation_params = invocation_params
        """The parameters with which pytest was invoked.

        :type: InvocationParams
        """

        _a = FILE_OR_DIR
        self._parser = Parser(
            usage=f"%(prog)s [options] [{_a}] [{_a}] [...]",
            processopt=self._processopt,
        )
        self.pluginmanager = pluginmanager  # attach the plugin manager
        """The plugin manager handles plugin registration and hook invocation.

        :type: PytestPluginManager
        """

        self.trace = self.pluginmanager.trace.root.get("config")
        self.hook = self.pluginmanager.hook  # expose the hook attribute
        self._inicache: Dict[str, Any] = {}
        self._override_ini: Sequence[str] = ()
        self._opt2dest: Dict[str, str] = {}
        self._cleanup: List[Callable[[], None]] = []
        # A place where plugins can store information on the config for their
        # own use. Currently only intended for internal plugins.
        self._store = Store()
        # Register this config object itself as the "pytestconfig" plugin.
        self.pluginmanager.register(self, "pytestconfig")
        self._configured = False
        self.hook.pytest_addoption.call_historic(
            kwargs=dict(parser=self._parser, pluginmanager=self.pluginmanager)
        )

        if TYPE_CHECKING:
            from _pytest.cacheprovider import Cache

            self.cache: Optional[Cache] = None
_pytest/config/__init__.py PytestPluginManager类
初始化时,通过self.add_hookspecs(_pytest.hookspec) 添加hook函数声明(接口),同时通过self.register(self)把自己注册为插件实现;
import_plugin 中,如果模块名属于_pytest/config/__init__.py中定义的builtin_plugins,则在模块名前拼接"_pytest."前缀,拼接后形如_pytest.python,然后导入该模块并将其注册为插件
@final
class PytestPluginManager(PluginManager):
    """A :py:class:`pluggy.PluginManager <pluggy.PluginManager>` with
    additional pytest-specific functionality:

    * Loading plugins from the command line, ``PYTEST_PLUGINS`` env variable and
      ``pytest_plugins`` global variables found in plugins being loaded.
    * ``conftest.py`` loading during start-up.
    """

    def __init__(self) -> None:
        """Set up state, add the hook specs and register the manager itself."""
        import _pytest.assertion

        super().__init__("pytest")
        # The objects are module objects, only used generically.
        self._conftest_plugins: Set[types.ModuleType] = set()

        # State related to local conftest plugins.
        self._dirpath2confmods: Dict[Path, List[types.ModuleType]] = {}
        self._conftestpath2mod: Dict[Path, types.ModuleType] = {}
        self._confcutdir: Optional[Path] = None
        self._noconftest = False
        self._duplicatepaths: Set[Path] = set()

        # plugins that were explicitly skipped with pytest.skip
        # list of (module name, skip reason)
        # previously we would issue a warning when a plugin was skipped, but
        # since we refactored warnings as first citizens of Config, they are
        # just stored here to be used later.
        self.skipped_plugins: List[Tuple[str, str]] = []

        self.add_hookspecs(_pytest.hookspec)
        self.register(self)
        if os.environ.get("PYTEST_DEBUG"):
            err: IO[str] = sys.stderr
            encoding: str = getattr(err, "encoding", "utf8")
            try:
                err = open(
                    os.dup(err.fileno()),
                    mode=err.mode,
                    buffering=1,
                    encoding=encoding,
                )
            except Exception:
                # Best effort: keep writing to the original stderr.
                pass
            self.trace.root.setwriter(err.write)
            self.enable_tracing()

        # Config._consider_importhook will set a real object if required.
        self.rewrite_hook = _pytest.assertion.DummyRewriteHook()
        # Used to know when we are importing conftests after the pytest_configure stage.
        self._configured = False

    def parse_hookimpl_opts(self, plugin: _PluggyPlugin, name: str):
        """Return hook implementation options for ``plugin.<name>``, or None.

        Only routines whose name starts with ``pytest_`` (other than
        ``pytest_plugins``) are considered; unmarked ones get an empty dict
        so that they still count as hook implementations.
        """
        # pytest hooks are always prefixed with "pytest_",
        # so we avoid accessing possibly non-readable attributes
        # (see issue #1073).
        if not name.startswith("pytest_"):
            return
        # Ignore names which can not be hooks.
        if name == "pytest_plugins":
            return

        method = getattr(plugin, name)
        opts = super().parse_hookimpl_opts(plugin, name)

        # Consider only actual functions for hooks (#3775).
        if not inspect.isroutine(method):
            return

        # Collect unmarked hooks as long as they have the `pytest_' prefix.
        if opts is None and name.startswith("pytest_"):
            opts = {}
        if opts is not None:
            # TODO: DeprecationWarning, people should use hookimpl
            # https://github.com/pytest-dev/pytest/issues/4562
            known_marks = {m.name for m in getattr(method, "pytestmark", [])}

            for name in ("tryfirst", "trylast", "optionalhook", "hookwrapper"):
                opts.setdefault(name, hasattr(method, name) or name in known_marks)
        return opts

    def parse_hookspec_opts(self, module_or_class, name: str):
        """Return hook spec options for ``module_or_class.<name>``, or None.

        When the parent class finds none and the name starts with
        ``pytest_``, ``firstresult`` and ``historic`` are derived from
        attributes or ``pytestmark`` entries on the spec function.
        """
        opts = super().parse_hookspec_opts(module_or_class, name)
        if opts is None:
            method = getattr(module_or_class, name)

            if name.startswith("pytest_"):
                # todo: deprecate hookspec hacks
                # https://github.com/pytest-dev/pytest/issues/4562
                known_marks = {m.name for m in getattr(method, "pytestmark", [])}
                opts = {
                    "firstresult": hasattr(method, "firstresult")
                    or "firstresult" in known_marks,
                    "historic": hasattr(method, "historic")
                    or "historic" in known_marks,
                }
        return opts

    def register(
        self, plugin: _PluggyPlugin, name: Optional[str] = None
    ) -> Optional[str]:
        """Register ``plugin`` and return the parent ``register`` result.

        Names in ``DEPRECATED_EXTERNAL_PLUGINS`` only emit a
        ``PytestConfigWarning`` and return ``None``. After a successful
        registration ``pytest_plugin_registered`` is called historically and
        module plugins are passed to ``consider_module``.
        """
        if name in _pytest.deprecated.DEPRECATED_EXTERNAL_PLUGINS:
            warnings.warn(
                PytestConfigWarning(
                    "{} plugin has been merged into the core, "
                    "please remove it from your requirements.".format(
                        name.replace("_", "-")
                    )
                )
            )
            return None
        ret: Optional[str] = super().register(plugin, name)
        if ret:
            self.hook.pytest_plugin_registered.call_historic(
                kwargs=dict(plugin=plugin, manager=self)
            )

            if isinstance(plugin, types.ModuleType):
                self.consider_module(plugin)
        return ret

    # ... (other methods omitted in this excerpt)

    def import_plugin(self, modname: str, consider_entry_points: bool = False) -> None:
        """Import a plugin with ``modname``.

        If ``consider_entry_points`` is True, entry point names are also
        considered to find a plugin.
        """
        # Most often modname refers to builtin modules, e.g. "pytester",
        # "terminal" or "capture".  Those plugins are registered under their
        # basename for historic purposes but must be imported with the
        # _pytest prefix.
        assert isinstance(modname, str), (
            "module name as text required, got %r" % modname
        )
        if self.is_blocked(modname) or self.get_plugin(modname) is not None:
            return

        importspec = "_pytest." + modname if modname in builtin_plugins else modname
        self.rewrite_hook.mark_rewrite(importspec)

        if consider_entry_points:
            loaded = self.load_setuptools_entrypoints("pytest11", name=modname)
            if loaded:
                return

        try:
            __import__(importspec)
        except ImportError as e:
            raise ImportError(
                'Error importing plugin "{}": {}'.format(modname, str(e.args[0]))
            ).with_traceback(e.__traceback__) from e
        except Skipped as e:
            # Record the skip (module name, reason) instead of failing.
            self.skipped_plugins.append((modname, e.msg or ""))
        else:
            mod = sys.modules[importspec]
            self.register(mod, modname)
PytestPluginManager重写了父类的register函数(见上文),重写函数中也调用了父类的register函数(父类的实现见下文)
父类的register函数中,调用了self.parse_hookimpl_opts(plugin, name),这个函数在当前类即PytestPluginManager类中被重写了,所以运行时调用的是重写后的PytestPluginManager.parse_hookimpl_opts(plugin, name)。该函数中也会调用父类的PluginManager.parse_hookimpl_opts函数,如果父类函数的返回值为None,并且函数名称以pytest_开头,则将返回结果置为 {}。这样,PluginManager.register函数中hookimpl_opts is not None表达式的值为真,会继续往下执行代码,将没有使用hookimpl标记的、以pytest_打头的函数添加为对应hook函数的实现体。
pluggy/manage.py PluginManager类
class PluginManager(object):
    # ... (rest of the class omitted in this excerpt)

    def register(self, plugin, name=None):
        """ Register a plugin and return its canonical name or None if the name
        is blocked from registering.  Raise a ValueError if the plugin is already
        registered. """
        plugin_name = name or self.get_canonical_name(plugin)

        if plugin_name in self._name2plugin or plugin in self._plugin2hookcallers:
            if self._name2plugin.get(plugin_name, -1) is None:
                return  # blocked plugin, return None to indicate no registration
            raise ValueError(
                "Plugin already registered: %s=%s\n%s"
                % (plugin_name, plugin, self._name2plugin)
            )

        # XXX if an error happens we should make sure no state has been
        # changed at point of return
        self._name2plugin[plugin_name] = plugin

        # register matching hook implementations of the plugin
        self._plugin2hookcallers[plugin] = hookcallers = []
        for name in dir(plugin):
            hookimpl_opts = self.parse_hookimpl_opts(plugin, name)
            if hookimpl_opts is not None:
                normalize_hookimpl_opts(hookimpl_opts)
                method = getattr(plugin, name)
                hookimpl = HookImpl(plugin, plugin_name, method, hookimpl_opts)
                hook = getattr(self.hook, name, None)
                if hook is None:
                    # No caller for this name yet: create one and attach it.
                    hook = _HookCaller(name, self._hookexec)
                    setattr(self.hook, name, hook)
                elif hook.has_spec():
                    self._verify_hook(hook, hookimpl)
                    hook._maybe_apply_history(hookimpl)
                hook._add_hookimpl(hookimpl)
                hookcallers.append(hook)
        return plugin_name
_pytest/config/__init__.py main函数定义
获取Config对象config后,通过调用config.hook.pytest_cmdline_main,从上到下,执行以下.py脚本中的pytest_cmdline_main函数
_pytest/setupplan.py
_pytest/setuponly.py
_pytest/mark/__init__.py
_pytest/cacheprovider.py
_pytest/python.py
_pytest/helpconfig.py
_pytest/main.py
_pytest/main.py
该文件中的 pytest_cmdline_main函数,负责执行测试
def pytest_cmdline_main(config):
    """Run ``_main`` inside ``wrap_session`` and return its result."""
    return wrap_session(config, _main)
def wrap_session(
    config: Config, doit: Callable[[Config, "Session"], Optional[Union[int, ExitCode]]]
) -> Union[int, ExitCode]:
    """Skeleton command line program.

    Creates a session from ``config``, configures, fires
    ``pytest_sessionstart``, runs ``doit(config, session)`` and maps
    exceptions to an exit status. ``pytest_sessionfinish`` is called only if
    the session start hook completed; the config is always unconfigured.

    :param config: the config the session is built from.
    :param doit: callable run inside the session; a falsy result counts as 0.
    :returns: the final ``session.exitstatus``.
    :raises UsageError: re-raised after the exit status has been recorded.
    """
    session = Session.from_config(config)
    session.exitstatus = ExitCode.OK
    # 0: nothing done, 1: configured, 2: session start hook completed.
    initstate = 0
    try:
        try:
            config._do_configure()
            initstate = 1
            config.hook.pytest_sessionstart(session=session)
            initstate = 2
            session.exitstatus = doit(config, session) or 0
        except UsageError:
            session.exitstatus = ExitCode.USAGE_ERROR
            raise
        except Failed:
            session.exitstatus = ExitCode.TESTS_FAILED
        except (KeyboardInterrupt, exit.Exception):
            excinfo = _pytest._code.ExceptionInfo.from_current()
            exitstatus = ExitCode.INTERRUPTED  # type: Union[int, ExitCode]
            if isinstance(excinfo.value, exit.Exception):
                if excinfo.value.returncode is not None:
                    exitstatus = excinfo.value.returncode
                if initstate < 2:
                    sys.stderr.write(
                        "{}: {}\n".format(excinfo.typename, excinfo.value.msg)
                    )
            config.hook.pytest_keyboard_interrupt(excinfo=excinfo)
            session.exitstatus = exitstatus
        except:  # noqa
            session.exitstatus = ExitCode.INTERNAL_ERROR
            excinfo = _pytest._code.ExceptionInfo.from_current()
            try:
                config.notify_exception(excinfo, config.option)
            except exit.Exception as exc:
                if exc.returncode is not None:
                    session.exitstatus = exc.returncode
                sys.stderr.write("{}: {}\n".format(type(exc).__name__, exc))
            else:
                if excinfo.errisinstance(SystemExit):
                    sys.stderr.write("mainloop: caught unexpected SystemExit!\n")

    finally:
        # Explicitly break reference cycle.
        excinfo = None  # type: ignore
        session.startdir.chdir()
        if initstate >= 2:
            try:
                config.hook.pytest_sessionfinish(
                    session=session, exitstatus=session.exitstatus
                )
            except exit.Exception as exc:
                if exc.returncode is not None:
                    session.exitstatus = exc.returncode
                sys.stderr.write("{}: {}\n".format(type(exc).__name__, exc))
        config._ensure_unconfigure()
    return session.exitstatus
def _main(config: Config, session: "Session") -> Optional[Union[int, ExitCode]]:
    """ default command line protocol for initialization, session,
    running tests and reporting.

    Calls the ``pytest_collection`` and ``pytest_runtestloop`` hooks, then
    returns ``ExitCode.TESTS_FAILED`` if ``session.testsfailed`` is truthy,
    ``ExitCode.NO_TESTS_COLLECTED`` if nothing was collected, else ``None``.
    """
    config.hook.pytest_collection(session=session)
    config.hook.pytest_runtestloop(session=session)

    if session.testsfailed:
        return ExitCode.TESTS_FAILED
    elif session.testscollected == 0:
        return ExitCode.NO_TESTS_COLLECTED
    return None
def pytest_collection(session):
    """Return the result of ``session.perform_collect()``."""
    return session.perform_collect()
def pytest_runtestloop(session):
    """Run ``pytest_runtest_protocol`` for every item in ``session.items``.

    :returns: ``True``, either right away when ``collectonly`` is set or
        after all items have been run.
    :raises session.Interrupted: if collection had failures and
        ``continue_on_collection_errors`` is off, or ``session.shouldstop``
        is set after an item.
    :raises session.Failed: if ``session.shouldfail`` is set after an item.
    """
    if session.testsfailed and not session.config.option.continue_on_collection_errors:
        raise session.Interrupted(
            "%d error%s during collection"
            % (session.testsfailed, "s" if session.testsfailed != 1 else "")
        )

    if session.config.option.collectonly:
        return True

    # session.items holds the test functions from the pytest test script
    for i, item in enumerate(session.items):
        # Each item is run with its successor (None for the last item).
        nextitem = session.items[i + 1] if i + 1 < len(session.items) else None
        item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
        if session.shouldfail:
            raise session.Failed(session.shouldfail)
        if session.shouldstop:
            raise session.Interrupted(session.shouldstop)
    return True
item.config.hook.pytest_runtest_protocol执行顺序如下,从上到下执行各个脚本中对应的函数
pytest_runtest_protocol warnings.py
pytest_runtest_protocol assertion/__init__.py
pytest_runtest_protocol faulthandler.py
pytest_runtest_protocol unittest.py
pytest_runtest_protocol runner.py
_pytest/runner.py
pytest_runtest_protocol 负责执行pytest协议
def pytest_runtest_protocol(item, nextitem):
    """Run the test protocol for ``item`` between log start/finish hooks.

    :returns: always ``True``.
    """
    item.ihook.pytest_runtest_logstart(nodeid=item.nodeid, location=item.location)
    runtestprotocol(item, nextitem=nextitem)
    item.ihook.pytest_runtest_logfinish(nodeid=item.nodeid, location=item.location)
    return True
def runtestprotocol(item, log=True, nextitem=None):
    """Run the setup, call and teardown phases of ``item``.

    The call phase runs only if setup passed and the ``setuponly`` option is
    off; teardown always runs.

    :param log: passed through to every ``call_and_report`` invocation.
    :param nextitem: passed to the teardown phase.
    :returns: the list of reports, one per phase that ran.
    """
    hasrequest = hasattr(item, "_request")
    if hasrequest and not item._request:
        item._initrequest()
    rep = call_and_report(item, "setup", log)
    reports = [rep]
    if rep.passed:
        if item.config.getoption("setupshow", False):
            show_test_item(item)
        if not item.config.getoption("setuponly", False):
            reports.append(call_and_report(item, "call", log))
    reports.append(call_and_report(item, "teardown", log, nextitem=nextitem))
    # after all teardown hooks have been called
    # want funcargs and request info to go away
    if hasrequest:
        item._request = False
        item.funcargs = None
    return reports
来源:https://www.cnblogs.com/shouke/p/14466832.html
图文来源于网络,如有侵权请联系删除。